1use crate::client_factory::ClientFactoryAsync;
12use crate::index::{IndexRecord, RECORD_SIZE};
13use crate::segment::reader::{AsyncSegmentReader, AsyncSegmentReaderImpl};
14
15use pravega_client_shared::{ScopedSegment, ScopedStream};
16
17use crate::index::writer::INDEX_RECORD_SIZE_ATTRIBUTE_ID;
18use crate::segment::metadata::SegmentMetadataClient;
19use crate::segment::raw_client::RawClient;
20use crate::util::get_request_id;
21use async_stream::try_stream;
22use futures::stream::Stream;
23use pravega_wire_protocol::commands::GetSegmentAttributeCommand;
24use pravega_wire_protocol::wire_commands::{Replies, Requests};
25use snafu::{ensure, Snafu};
26use std::io::SeekFrom;
27use tracing::info;
28
/// Errors that can be returned by the index reader APIs.
#[derive(Debug, Snafu)]
#[snafu(visibility = "pub")]
pub enum IndexReaderError {
    /// No index record satisfies the requested field key/value predicate.
    #[snafu(display("Field {} does not exist", msg))]
    FieldNotFound { msg: String },

    /// A supplied offset is unusable (e.g. not aligned to the record size,
    /// or seeking to it failed).
    #[snafu(display("Invalid offset: {}", msg))]
    InvalidOffset { msg: String },

    /// Catch-all for internal failures: segment metadata lookups, segment
    /// reads, and record deserialization.
    #[snafu(display("Internal error: {}", msg))]
    Internal { msg: String },
}
41
/// Reads records from an index stream.
///
/// The index stream is expected to consist of a single segment holding
/// fixed-size records; `record_size` is discovered from a segment
/// attribute at construction time (see `IndexReader::new`).
pub struct IndexReader {
    /// The index stream this reader consumes.
    stream: ScopedStream,
    /// Factory used to create byte readers for sequential streaming reads.
    factory: ClientFactoryAsync,
    /// Metadata client for the backing segment; supplies head/tail offsets.
    meta: SegmentMetadataClient,
    /// Segment reader used for random-offset record reads.
    segment_reader: AsyncSegmentReaderImpl,
    /// Size in bytes of each fixed-size index record.
    record_size: usize,
}
97
impl IndexReader {
    /// Creates an `IndexReader` for the given index stream.
    ///
    /// Resolves the stream's single head segment, builds a segment reader and
    /// a metadata client for it, then fetches the record-size segment
    /// attribute directly from the owning segment store over a raw
    /// connection. Falls back to the default `RECORD_SIZE` when the attribute
    /// has never been set.
    ///
    /// # Panics
    /// Panics on controller or wire-protocol failures, when the stream has
    /// more than one head segment, or when the attribute reply is not
    /// `SegmentAttribute`.
    pub(crate) async fn new(factory: ClientFactoryAsync, stream: ScopedStream) -> Self {
        let segments = factory
            .controller_client()
            .get_head_segments(&stream)
            .await
            .expect("get head segments");
        // An index stream must be fixed-scaled to exactly one segment.
        assert_eq!(
            segments.len(),
            1,
            "Index stream is configured with more than one segment"
        );
        let segment = segments.iter().next().unwrap().0.clone();
        let scoped_segment = ScopedSegment {
            scope: stream.scope.clone(),
            stream: stream.stream.clone(),
            segment,
        };
        let segment_reader = factory.create_async_segment_reader(scoped_segment.clone()).await;
        let meta = factory
            .create_segment_metadata_client(scoped_segment.clone())
            .await;

        // Query the record-size attribute from the segment store that owns
        // the segment, authenticating with a delegation token.
        let controller_client = factory.controller_client();
        let endpoint = controller_client
            .get_endpoint_for_segment(&scoped_segment)
            .await
            .expect("get endpoint for segment");
        let raw_client = factory.create_raw_client_for_endpoint(endpoint);
        let segment_name = scoped_segment.to_string();
        let token = controller_client
            .get_or_refresh_delegation_token_for(stream.clone())
            .await
            .expect("controller error when refreshing token");
        let request = Requests::GetSegmentAttribute(GetSegmentAttributeCommand {
            request_id: get_request_id(),
            segment_name: segment_name.clone(),
            attribute_id: INDEX_RECORD_SIZE_ATTRIBUTE_ID,
            delegation_token: token,
        });
        let reply = raw_client
            .send_request(&request)
            .await
            .expect("get segment attribute");

        let record_size = match reply {
            Replies::SegmentAttribute(cmd) => {
                // i64::MIN is the sentinel meaning "attribute was never set".
                if cmd.value == i64::MIN {
                    info!("record_size segment attribute for Segment = {} is not set.Falling back to default RECORD_SIZE = {:?}", segment_name.clone() ,RECORD_SIZE);
                    RECORD_SIZE as usize
                } else {
                    info!(
                        "record_size segment attribute for Segment = {} is already set to {:?}",
                        segment_name.clone(),
                        cmd.value
                    );
                    cmd.value as usize
                }
            }
            _ => {
                panic!("get segment attribute for record_size failed due to {:?}", reply);
            }
        };
        IndexReader {
            stream,
            factory,
            meta,
            segment_reader,
            record_size,
        }
    }

    /// Binary-searches the index for `field = (key, value)` and returns the
    /// byte offset of the first record whose entry for that key is greater
    /// than or equal to the given value.
    ///
    /// NOTE(review): correctness of the binary search relies on the field's
    /// values being monotonically non-decreasing across records — confirm
    /// the index writer guarantees this.
    ///
    /// # Errors
    /// * `FieldNotFound` — no record satisfies the predicate.
    /// * `Internal` — head/tail metadata fetch or a record read failed.
    pub async fn search_offset(&self, field: (&'static str, u64)) -> Result<u64, IndexReaderError> {
        let record_size_signed: i64 = self.record_size as i64;

        // Field keys are stored hashed; hash the caller's key the same way.
        let target_key = IndexRecord::hash_key_to_u128(field.0);
        let target_value = field.1;

        let head = self.head_offset().await.map_err(|e| IndexReaderError::Internal {
            msg: format!("error when fetching head offset: {:?}", e),
        })? as i64;
        let tail = self.tail_offset().await.map_err(|e| IndexReaderError::Internal {
            msg: format!("error when fetching tail offset: {:?}", e),
        })? as i64;
        // Binary search over record indices in [0, num_of_record).
        let mut start = 0;
        let num_of_record = (tail - head) / record_size_signed;
        let mut end = num_of_record - 1;

        while start <= end {
            let mid = start + (end - start) / 2;
            let record = self
                .read_record_from_random_offset((head + mid * record_size_signed) as u64)
                .await?;

            if let Some(e) = record.fields.iter().find(|&e| e.0 == target_key) {
                if e.1 >= target_value {
                    // Candidate found; keep searching left for an earlier match.
                    end = mid - 1;
                } else {
                    start = mid + 1;
                }
            } else {
                // Key absent in this record: treat it like a too-small value.
                start = mid + 1;
            }
        }

        if start == num_of_record {
            // Search ran off the tail — no record satisfies the predicate.
            Err(IndexReaderError::FieldNotFound {
                msg: format!("key/value: {}/{}", field.0, field.1),
            })
        } else {
            Ok((head + start * record_size_signed) as u64)
        }
    }

    /// Returns a stream yielding the data payload of each record in
    /// `[start_offset, end_offset)`. Pass `u64::MAX` as `end_offset` for an
    /// unbounded tail read that never terminates on its own.
    ///
    /// Both offsets must be multiples of the record size.
    ///
    /// # Errors
    /// `InvalidOffset` if either offset is misaligned; individual stream
    /// items may carry `InvalidOffset`/`Internal` errors from seek, read,
    /// or deserialization failures.
    pub fn read<'stream, 'reader: 'stream>(
        &'reader self,
        start_offset: u64,
        end_offset: u64,
    ) -> Result<impl Stream<Item = Result<Vec<u8>, IndexReaderError>> + 'stream, IndexReaderError> {
        ensure!(
            start_offset % (self.record_size as u64) == 0,
            InvalidOffset {
                msg: format!(
                    "Start offset {} is invalid as it cannot be divided by the record size {}",
                    start_offset, self.record_size
                )
            }
        );
        if end_offset != u64::MAX {
            ensure!(
                end_offset % (self.record_size as u64) == 0,
                InvalidOffset {
                    msg: format!(
                        "End offset {} is invalid as it cannot be divided by the record size {}",
                        end_offset, self.record_size
                    )
                }
            );
        }
        Ok(try_stream! {
            let stream = self.stream.clone();
            let record_size = self.record_size;
            let mut byte_reader = self.factory.create_byte_reader(stream).await;
            // u64::MAX doubles as the "unbounded" marker checked below.
            let mut num_of_records_to_read = if end_offset == u64::MAX {
                u64::MAX
            } else {
                (end_offset - start_offset) / (record_size as u64)
            };
            byte_reader.seek(SeekFrom::Start(start_offset))
                .await
                .map_err(|e| IndexReaderError::InvalidOffset {
                    msg: format!("invalid seeking offset {:?}", e)
                })?;
            loop {
                let mut buf = vec!{};
                let mut size_to_read = record_size as usize;
                // Accumulate partial reads until one full record is buffered.
                // NOTE(review): if `read` ever returns 0 (EOF), this inner
                // loop spins; presumably the byte reader blocks awaiting new
                // data instead — confirm.
                while size_to_read != 0 {
                    let mut tmp_buf = vec![0; size_to_read];
                    let size = byte_reader
                        .read(&mut tmp_buf)
                        .await
                        .map_err(|e| IndexReaderError::Internal {
                            msg: format!("byte reader read error {:?}", e),
                        })?;
                    buf.extend_from_slice(&tmp_buf[..size]);
                    size_to_read -= size;
                }
                let record = IndexRecord::read_from(&buf).map_err(|e| IndexReaderError::Internal {
                    msg: format!("deserialize record {:?}", e),
                })?;
                yield record.data;
                if num_of_records_to_read != u64::MAX {
                    num_of_records_to_read -= 1;
                }
                if num_of_records_to_read == 0 {
                    break;
                }
            }
        })
    }

    /// Reads and returns the data payload of the first (oldest readable)
    /// record, i.e. the record at the current head offset.
    pub async fn first_record_data(&self) -> Result<Vec<u8>, IndexReaderError> {
        let head_offset = self.head_offset().await?;
        let first_record = self.read_record_from_random_offset(head_offset).await?;
        Ok(first_record.data)
    }

    /// Reads and returns the data payload of the last (newest) record.
    ///
    /// NOTE(review): if the segment holds no records (tail < record_size),
    /// the subtraction below underflows — confirm callers only invoke this
    /// on a non-empty index.
    pub async fn last_record_data(&self) -> Result<Vec<u8>, IndexReaderError> {
        let last_offset = self.tail_offset().await?;
        let last_record_offset = last_offset - self.record_size as u64;
        let last_record = self.read_record_from_random_offset(last_record_offset).await?;
        Ok(last_record.data)
    }

    /// Returns the current head (earliest readable) byte offset of the
    /// underlying segment, as reported by the segment metadata client.
    pub async fn head_offset(&self) -> Result<u64, IndexReaderError> {
        self.meta
            .fetch_current_starting_head()
            .await
            .map(|i| i as u64)
            .map_err(|e| IndexReaderError::Internal {
                msg: format!("failed to get head offset: {:?}", e),
            })
    }

    /// Returns the current tail byte offset (the segment's written length),
    /// as reported by the segment metadata client.
    pub async fn tail_offset(&self) -> Result<u64, IndexReaderError> {
        self.meta
            .fetch_current_segment_length()
            .await
            .map(|i| i as u64)
            .map_err(|e| IndexReaderError::Internal {
                msg: format!("failed to get tail offset: {:?}", e),
            })
    }

    /// Reads and deserializes exactly one record starting at `offset` using
    /// the random-access segment reader. `offset` should be record-aligned.
    pub(crate) async fn read_record_from_random_offset(
        &self,
        offset: u64,
    ) -> Result<IndexRecord, IndexReaderError> {
        let segment_read_cmd = self
            .segment_reader
            .read(offset as i64, self.record_size as i32)
            .await
            .map_err(|e| IndexReaderError::Internal {
                msg: format!("segment reader error: {:?}", e),
            })?;
        let record =
            IndexRecord::read_from(&segment_read_cmd.data).map_err(|e| IndexReaderError::Internal {
                msg: format!("record deserialization error: {:?}", e),
            })?;
        Ok(record)
    }
}
356
#[cfg(test)]
mod test {
    use super::*;
    use crate::client_factory::ClientFactory;
    use crate::util::create_stream;
    use pravega_client_config::connection_type::{ConnectionType, MockType};
    use pravega_client_config::ClientConfigBuilder;
    use pravega_client_shared::PravegaNodeUri;

    /// Builds a mock-backed client factory and provisions the
    /// single-segment test stream that both tests read from.
    fn mock_factory(mock_type: MockType) -> ClientFactory {
        let config = ClientConfigBuilder::default()
            .connection_type(ConnectionType::Mock(mock_type))
            .mock(true)
            .controller_uri(PravegaNodeUri::from("127.0.0.2:9091".to_string()))
            .build()
            .unwrap();
        let factory = ClientFactory::new(config);
        factory.runtime().block_on(create_stream(
            &factory,
            "testScopeInvalid",
            "testStreamInvalid",
            1,
        ));
        factory
    }

    /// Constructing the reader against a wrong-host mock must panic while
    /// fetching the record-size segment attribute.
    #[test]
    #[should_panic(expected = "get segment attribute for record_size failed due to WrongHost")]
    fn test_index_reader_wrong_host() {
        let factory = mock_factory(MockType::WrongHost);
        let stream = ScopedStream::from("testScopeInvalid/testStreamInvalid");
        factory
            .runtime()
            .block_on(factory.create_index_reader(stream.clone()));
    }

    /// With a happy mock (attribute unset), the reader falls back to the
    /// default RECORD_SIZE.
    #[test]
    fn test_default_index_reader_size() {
        let factory = mock_factory(MockType::Happy);
        let stream = ScopedStream::from("testScopeInvalid/testStreamInvalid");
        let reader = factory
            .runtime()
            .block_on(factory.create_index_reader(stream.clone()));
        assert_eq!(reader.record_size, RECORD_SIZE as usize);
    }
}