pravega_client/index/
reader.rs

//
// Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//

use crate::client_factory::ClientFactoryAsync;
use crate::index::writer::INDEX_RECORD_SIZE_ATTRIBUTE_ID;
use crate::index::{IndexRecord, RECORD_SIZE};
use crate::segment::metadata::SegmentMetadataClient;
use crate::segment::raw_client::RawClient;
use crate::segment::reader::{AsyncSegmentReader, AsyncSegmentReaderImpl};
use crate::util::get_request_id;

use pravega_client_shared::{ScopedSegment, ScopedStream};
use pravega_wire_protocol::commands::GetSegmentAttributeCommand;
use pravega_wire_protocol::wire_commands::{Replies, Requests};

use async_stream::try_stream;
use futures::stream::Stream;
use snafu::{ensure, Snafu};
use std::io::SeekFrom;
use tracing::info;

/// Errors that can be returned by the [`IndexReader`] APIs.
#[derive(Debug, Snafu)]
#[snafu(visibility = "pub")]
pub enum IndexReaderError {
    /// The searched Field name/value pair does not exist in the index stream.
    #[snafu(display("Field {} does not exist", msg))]
    FieldNotFound { msg: String },

    /// A supplied or computed offset is invalid (e.g. not aligned to the
    /// record size, or seeking to it failed).
    #[snafu(display("Invalid offset: {}", msg))]
    InvalidOffset { msg: String },

    /// Any other failure: segment reader errors, metadata fetch errors,
    /// or record deserialization errors.
    #[snafu(display("Internal error: {}", msg))]
    Internal { msg: String },
}
41
/// Index Reader reads the Index Record from Stream.
///
/// The Stream has to be fixed size single segment stream like byte stream.
///
/// # Examples
/// ```no_run
/// use pravega_client_config::ClientConfigBuilder;
/// use pravega_client::client_factory::ClientFactory;
/// use pravega_client_shared::ScopedStream;
/// use futures_util::pin_mut;
/// use futures_util::StreamExt;
/// use std::io::Write;
/// use tokio;
///
/// // Suppose the existing Fields in the stream is like below.
/// // #[derive(Fields, Debug, PartialOrd, PartialEq)]
/// // struct MyFields {
/// //    id: u64,
/// //    timestamp: u64,
/// // }
///
/// #[tokio::main]
/// async fn main() {
///     // assuming Pravega controller is running at endpoint `localhost:9090`
///     let config = ClientConfigBuilder::default()
///         .controller_uri("localhost:9090")
///         .build()
///         .expect("creating config");
///
///     let client_factory = ClientFactory::new(config);
///
///     // assuming scope:myscope, stream:mystream exist.
///     let stream = ScopedStream::from("myscope/mystream");
///
///     let mut index_reader = client_factory.create_index_reader(stream).await;
///
///     // search data
///     let offset = index_reader.search_offset(("id", 10)).await.expect("get offset");
///
///     // read data
///     let s = index_reader.read(offset, u64::MAX).expect("get read slice");
///     pin_mut!(s);
///     while let Some(res) = s.next().await {
///         // do something with the read result
///         res.expect("read next event");
///     }
/// }
/// ```
pub struct IndexReader {
    // The index stream being read.
    stream: ScopedStream,
    // Factory used to create byte readers for the streaming `read()` API.
    factory: ClientFactoryAsync,
    // Metadata client used to fetch the segment's head/tail offsets.
    meta: SegmentMetadataClient,
    // Segment reader used for random-offset record reads (binary search).
    segment_reader: AsyncSegmentReaderImpl,
    // Size in bytes of each index record; fetched from the segment attribute
    // written by the index writer, falling back to RECORD_SIZE when unset.
    record_size: usize,
}
97
98impl IndexReader {
99    pub(crate) async fn new(factory: ClientFactoryAsync, stream: ScopedStream) -> Self {
100        let segments = factory
101            .controller_client()
102            .get_head_segments(&stream)
103            .await
104            .expect("get head segments");
105        assert_eq!(
106            segments.len(),
107            1,
108            "Index stream is configured with more than one segment"
109        );
110        let segment = segments.iter().next().unwrap().0.clone();
111        let scoped_segment = ScopedSegment {
112            scope: stream.scope.clone(),
113            stream: stream.stream.clone(),
114            segment,
115        };
116        let segment_reader = factory.create_async_segment_reader(scoped_segment.clone()).await;
117        let meta = factory
118            .create_segment_metadata_client(scoped_segment.clone())
119            .await;
120
121        let controller_client = factory.controller_client();
122        let endpoint = controller_client
123            .get_endpoint_for_segment(&scoped_segment)
124            .await
125            .expect("get endpoint for segment");
126        let raw_client = factory.create_raw_client_for_endpoint(endpoint);
127        let segment_name = scoped_segment.to_string();
128        let token = controller_client
129            .get_or_refresh_delegation_token_for(stream.clone())
130            .await
131            .expect("controller error when refreshing token");
132        let request = Requests::GetSegmentAttribute(GetSegmentAttributeCommand {
133            request_id: get_request_id(),
134            segment_name: segment_name.clone(),
135            attribute_id: INDEX_RECORD_SIZE_ATTRIBUTE_ID,
136            delegation_token: token,
137        });
138        let reply = raw_client
139            .send_request(&request)
140            .await
141            .expect("get segment attribute");
142
143        let record_size = match reply {
144            Replies::SegmentAttribute(cmd) => {
145                if cmd.value == i64::MIN {
146                    info!("record_size segment attribute for Segment = {} is not set.Falling back to default RECORD_SIZE = {:?}", segment_name.clone() ,RECORD_SIZE);
147                    RECORD_SIZE as usize
148                } else {
149                    info!(
150                        "record_size segment attribute for Segment = {} is already set to {:?}",
151                        segment_name.clone(),
152                        cmd.value
153                    );
154                    cmd.value as usize
155                }
156            }
157            _ => {
158                panic!("get segment attribute for record_size failed due to {:?}", reply);
159            }
160        };
161        IndexReader {
162            stream,
163            factory,
164            meta,
165            segment_reader,
166            record_size,
167        }
168    }
169
170    /// Given an Field (name, v), find the offset of the first record that contains the given Field
171    /// that has value >= v.
172    ///
173    /// Note that if there are multiple entries that have the same Field name and value, this method will find and return
174    /// the first one.
175    /// If the value of searching field is smaller than the first readable Record's field in the
176    /// stream, the first record data will be returned.
177    /// If the value of searching field is larger than the latest Record, a FieldNotFound error will be returned.
178    pub async fn search_offset(&self, field: (&'static str, u64)) -> Result<u64, IndexReaderError> {
179        let record_size_signed: i64 = self.record_size as i64;
180
181        let target_key = IndexRecord::hash_key_to_u128(field.0);
182        let target_value = field.1;
183
184        let head = self.head_offset().await.map_err(|e| IndexReaderError::Internal {
185            msg: format!("error when fetching head offset: {:?}", e),
186        })? as i64;
187        let tail = self.tail_offset().await.map_err(|e| IndexReaderError::Internal {
188            msg: format!("error when fetching tail offset: {:?}", e),
189        })? as i64;
190        let mut start = 0;
191        let num_of_record = (tail - head) / record_size_signed;
192        let mut end = num_of_record - 1;
193
194        while start <= end {
195            let mid = start + (end - start) / 2;
196            let record = self
197                .read_record_from_random_offset((head + mid * record_size_signed) as u64)
198                .await?;
199
200            if let Some(e) = record.fields.iter().find(|&e| e.0 == target_key) {
201                // record contains the field, compare value with the target value.
202                if e.1 >= target_value {
203                    // value is large than or equal to the target value, check the first half.
204                    end = mid - 1;
205                } else {
206                    // value is smaller than the target value, check the second half.
207                    start = mid + 1;
208                }
209                // field does not exist in the current record.
210                // it might exist in the second half.
211            } else {
212                start = mid + 1;
213            }
214        }
215
216        if start == num_of_record {
217            Err(IndexReaderError::FieldNotFound {
218                msg: format!("key/value: {}/{}", field.0, field.1),
219            })
220        } else {
221            Ok((head + start * record_size_signed) as u64)
222        }
223    }
224
225    /// Reads records starting from the given offset.
226    ///
227    /// This method returns a slice of stream that implements an iterator. Application can iterate on
228    /// this slice to get the data. When `next()` is invoked on the iterator, a read request
229    /// will be issued by the underlying reader.
230    ///
231    /// If we want to do tail read instead of reading just a slice of the data, we can set end_offset
232    /// to be u64::MAX.
233    pub fn read<'stream, 'reader: 'stream>(
234        &'reader self,
235        start_offset: u64,
236        end_offset: u64,
237    ) -> Result<impl Stream<Item = Result<Vec<u8>, IndexReaderError>> + 'stream, IndexReaderError> {
238        ensure!(
239            start_offset % (self.record_size as u64) == 0,
240            InvalidOffset {
241                msg: format!(
242                    "Start offset {} is invalid as it cannot be divided by the record size {}",
243                    start_offset, self.record_size
244                )
245            }
246        );
247        if end_offset != u64::MAX {
248            ensure!(
249                end_offset % (self.record_size as u64) == 0,
250                InvalidOffset {
251                    msg: format!(
252                        "End offset {} is invalid as it cannot be divided by the record size {}",
253                        end_offset, self.record_size
254                    )
255                }
256            );
257        }
258        Ok(try_stream! {
259            let stream = self.stream.clone();
260            let record_size = self.record_size;
261            let mut byte_reader = self.factory.create_byte_reader(stream).await;
262            let mut num_of_records_to_read = if end_offset == u64::MAX {
263                u64::MAX
264            } else {
265                (end_offset - start_offset) / (record_size as u64)
266            };
267            byte_reader.seek(SeekFrom::Start(start_offset))
268                .await
269                .map_err(|e| IndexReaderError::InvalidOffset {
270                    msg: format!("invalid seeking offset {:?}", e)
271            })?;
272            loop {
273                let mut buf = vec!{};
274                let mut size_to_read = record_size as usize;
275                while size_to_read != 0 {
276                    let mut tmp_buf = vec![0; size_to_read];
277                    let size = byte_reader
278                        .read(&mut tmp_buf)
279                        .await
280                        .map_err(|e| IndexReaderError::Internal {
281                            msg: format!("byte reader read error {:?}", e),
282                        })?;
283                    buf.extend_from_slice(&tmp_buf[..size]);
284                    size_to_read -= size;
285                }
286                let record = IndexRecord::read_from(&buf).map_err(|e| IndexReaderError::Internal {
287                    msg: format!("deserialize record {:?}", e),
288                })?;
289                yield record.data;
290                if num_of_records_to_read != u64::MAX {
291                    num_of_records_to_read -= 1;
292                }
293                if num_of_records_to_read == 0 {
294                    break;
295                }
296            }
297        })
298    }
299
300    /// Data in the first readable record.
301    pub async fn first_record_data(&self) -> Result<Vec<u8>, IndexReaderError> {
302        let head_offset = self.head_offset().await?;
303        let first_record = self.read_record_from_random_offset(head_offset).await?;
304        Ok(first_record.data)
305    }
306
307    /// Data in the last record.
308    pub async fn last_record_data(&self) -> Result<Vec<u8>, IndexReaderError> {
309        let last_offset = self.tail_offset().await?;
310        let last_record_offset = last_offset - self.record_size as u64;
311        let last_record = self.read_record_from_random_offset(last_record_offset).await?;
312        Ok(last_record.data)
313    }
314
315    /// Get the readable head offset.
316    pub async fn head_offset(&self) -> Result<u64, IndexReaderError> {
317        self.meta
318            .fetch_current_starting_head()
319            .await
320            .map(|i| i as u64)
321            .map_err(|e| IndexReaderError::Internal {
322                msg: format!("failed to get head offset: {:?}", e),
323            })
324    }
325
326    /// Get the tail offset.
327    pub async fn tail_offset(&self) -> Result<u64, IndexReaderError> {
328        self.meta
329            .fetch_current_segment_length()
330            .await
331            .map(|i| i as u64)
332            .map_err(|e| IndexReaderError::Internal {
333                msg: format!("failed to get tail offset: {:?}", e),
334            })
335    }
336
337    // Read a record from a given offset.
338    pub(crate) async fn read_record_from_random_offset(
339        &self,
340        offset: u64,
341    ) -> Result<IndexRecord, IndexReaderError> {
342        let segment_read_cmd = self
343            .segment_reader
344            .read(offset as i64, self.record_size as i32)
345            .await
346            .map_err(|e| IndexReaderError::Internal {
347                msg: format!("segment reader error: {:?}", e),
348            })?;
349        let record =
350            IndexRecord::read_from(&segment_read_cmd.data).map_err(|e| IndexReaderError::Internal {
351                msg: format!("record deserialization error: {:?}", e),
352            })?;
353        Ok(record)
354    }
355}
356
#[cfg(test)]
mod test {
    use super::*;
    use crate::client_factory::ClientFactory;
    use crate::util::create_stream;
    use pravega_client_config::connection_type::{ConnectionType, MockType};
    use pravega_client_config::ClientConfigBuilder;
    use pravega_client_shared::PravegaNodeUri;

    // Builds a mock-backed ClientFactory and creates the shared test stream.
    fn mock_factory(mode: MockType) -> ClientFactory {
        let config = ClientConfigBuilder::default()
            .connection_type(ConnectionType::Mock(mode))
            .mock(true)
            .controller_uri(PravegaNodeUri::from("127.0.0.2:9091".to_string()))
            .build()
            .unwrap();
        let factory = ClientFactory::new(config);
        factory.runtime().block_on(create_stream(
            &factory,
            "testScopeInvalid",
            "testStreamInvalid",
            1,
        ));
        factory
    }

    // Constructing an IndexReader against a WrongHost mock must panic while
    // fetching the record_size segment attribute.
    #[test]
    #[should_panic(expected = "get segment attribute for record_size failed due to WrongHost")]
    fn test_index_reader_wrong_host() {
        let factory = mock_factory(MockType::WrongHost);
        let stream = ScopedStream::from("testScopeInvalid/testStreamInvalid");
        factory
            .runtime()
            .block_on(factory.create_index_reader(stream.clone()));
    }

    // With a Happy mock the record_size attribute is unset, so the reader
    // falls back to the default RECORD_SIZE.
    #[test]
    fn test_default_index_reader_size() {
        let factory = mock_factory(MockType::Happy);
        let stream = ScopedStream::from("testScopeInvalid/testStreamInvalid");
        let reader = factory
            .runtime()
            .block_on(factory.create_index_reader(stream.clone()));
        assert_eq!(reader.record_size, RECORD_SIZE as usize);
    }
}