pravega_client/event/
reader.rs

1//
2// Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10
11use crate::client_factory::ClientFactoryAsync;
12use crate::event::reader_group_state::ReaderGroupStateError::SyncError;
13use crate::event::reader_group_state::{Offset, ReaderGroupStateError};
14use crate::segment::reader::ReaderError::SegmentSealed;
15use crate::segment::reader::{AsyncSegmentReader, ReaderError};
16use snafu::{ResultExt, Snafu};
17
18use pravega_client_retry::retry_result::Retryable;
19use pravega_client_shared::{Reader, ScopedSegment, Segment, SegmentWithRange};
20use pravega_wire_protocol::commands::{Command, EventCommand, TYPE_PLUS_LENGTH_SIZE};
21
22use crate::sync::synchronizer::SynchronizerError;
23use bytes::{Buf, BufMut, BytesMut};
24use core::fmt;
25use im::HashMap as ImHashMap;
26use std::collections::{HashMap, HashSet};
27use std::mem;
28use std::sync::Arc;
29use std::time::{Duration, Instant};
30use tokio::runtime::Handle;
31use tokio::sync::mpsc::{Receiver, Sender};
32use tokio::sync::oneshot;
33use tokio::sync::oneshot::error::TryRecvError;
34use tokio::sync::{mpsc, Mutex};
35use tokio::time::timeout;
36use tracing::{debug, error, info, warn};
37
38type ReaderErrorWithOffset = (ReaderError, i64);
39type SegmentReadResult = Result<SegmentDataBuffer, ReaderErrorWithOffset>;
40
/// Minimum time between two attempts of a reader to rebalance segments with the other readers of
/// its reader group (tracked separately for segment acquisition and segment release).
const REBALANCE_INTERVAL: Duration = Duration::from_secs(10);

/// Minimum time between two publications of a reader's read offsets to the reader group state.
const UPDATE_OFFSET_INTERVAL: Duration = Duration::from_secs(3);

/// Read buffer size of 8 MiB, sized as the maximum size of a single event.
const READ_BUFFER_SIZE: i32 = 8 * 1024 * 1024;
46
47cfg_if::cfg_if! {
48    if #[cfg(test)] {
49        use crate::event::reader_group_state::MockReaderGroupState as ReaderGroupState;
50    } else {
51        use crate::event::reader_group_state::ReaderGroupState;
52    }
53}
54
55/// Read events from Stream.
56///
57/// An event reader fetches data from its assigned segments as a SegmentSlice,
58/// where a SegmentSlice represents data from a Pravega Segment. It provides the following APIs.
59/// 1. A method to initialize the event reader [EventReader#init](EventReader#init)
60/// 2. A method to obtain a SegmentSlice to read events from a Pravega segment. The user can use the
61/// SegmentSlice's iterator API to fetch individual events from a given Segment Slice.
62/// [EventReader#acquire_segment](EventReader#acquire_segment).
63/// 3. A method to release the Segment back at the given offset. [EventReader#release_segment_at](EventReader#release_segment_at).
64///    This method needs to be invoked only the user does not consume all the events in a SegmentSlice.
65/// 4. A method to mark the reader as offline.[EventReader#reader_offline](EventReader#reader_offline).
66///    This method ensures the segments owned by this readers are transferred to other readers
67///    in the reader group.
68///
69/// # Examples
70///
71/// ```no_run
72/// use pravega_client_config::{ClientConfigBuilder, MOCK_CONTROLLER_URI};
73/// use pravega_client::client_factory::ClientFactory;
74/// use pravega_client_shared::{ScopedStream, Scope, Stream};
75///
76/// #[tokio::main]
77/// async fn main() {
78///    let config = ClientConfigBuilder::default()
79///         .controller_uri(MOCK_CONTROLLER_URI)
80///         .build()
81///         .expect("creating config");
82///     let client_factory = ClientFactory::new(config);
83///     let stream = ScopedStream {
84///         scope: Scope::from("scope".to_string()),
85///         stream: Stream::from("stream".to_string()),
86///     };
87///     // Create a reader group to read data from the Pravega stream.
88///     let rg = client_factory.create_reader_group("rg".to_string(), stream).await;
89///     // Create a reader under the reader group. The segments of the stream are assigned among the
90///     // readers which are part of the reader group.
91///     let mut reader1 = rg.create_reader("r1".to_string()).await;
92///     // read all events from a given segment slice.
93///     if let Some(mut segment_slice) =  reader1.acquire_segment().await.expect("Failed to acquire segment since the reader is offline") {
94///         while let Some(event) = segment_slice.next() {
95///             println!("Event read is {:?}", event);
96///         }
97///     }
98///     // read one event from the a given  segment slice and return it back.
99///     if let Some(mut segment_slice) = reader1.acquire_segment().await.expect("Failed to acquire segment since the reader is offline") {
100///         if let Some(event) = segment_slice.next() {
101///             println!("Event read is {:?}", event);
102///             // release the segment slice back to the reader.
103///             reader1.release_segment(segment_slice).await;
104///         }
105///     }
106/// }
107/// ```
108pub struct EventReader {
109    pub id: Reader,
110    factory: ClientFactoryAsync,
111    rx: Receiver<SegmentReadResult>,
112    tx: Sender<SegmentReadResult>,
113    meta: ReaderState,
114    rg_state: Arc<Mutex<ReaderGroupState>>,
115}
116
117#[derive(Debug, Snafu)]
118pub enum EventReaderError {
119    #[snafu(display("ReaderGroup State error: {}", source))]
120    StateError { source: ReaderGroupStateError },
121}
122
123impl Drop for EventReader {
124    /// Destructor for reader invoked. This will automatically invoke reader_offline().
125    fn drop(&mut self) {
126        info!("Reader {:?} is dropped", self.id);
127        // try fetching the currently running Runtime.
128        let r = Handle::try_current();
129        let rg_state = self.rg_state.clone();
130        let id = self.id.clone();
131        let mut meta = mem::take(&mut self.meta);
132        match r {
133            Ok(handle) => {
134                // enter the runtime context.
135                let _ = handle.enter();
136                // ensure we block until the reader_offline method completes.
137                tokio::spawn(async move {
138                    EventReader::reader_offline_internal(id, rg_state, &mut meta)
139                        .await
140                        .expect("Reader Offline");
141                });
142                info!("Reader {:?} is marked as offline.", self.id);
143            }
144            Err(_) => {
145                // ensure we block until the reader_offline executes successfully.
146                let rt = tokio::runtime::Runtime::new().expect("Create tokio runtime to drop reader");
147                rt.spawn(async move {
148                    EventReader::reader_offline_internal(id, rg_state, &mut meta)
149                        .await
150                        .expect("Reader Offline");
151                });
152                info!("Reader {:?} is marked as offline.", self.id);
153            }
154        }
155    }
156}
157
158impl EventReader {
159    /// Initialize the reader. This fetches the assigned segments from the Synchronizer and
160    /// spawns background tasks to start reads from those Segments.
161    pub(crate) async fn init_reader(
162        id: String,
163        rg_state: Arc<Mutex<ReaderGroupState>>,
164        factory: ClientFactoryAsync,
165    ) -> Self {
166        let reader = Reader::from(id);
167        let new_segments_to_acquire = rg_state
168            .lock()
169            .await
170            .compute_segments_to_acquire_or_release(&reader)
171            .await
172            .expect("should compute segments");
173        // attempt acquiring the desired number of segments.
174        if new_segments_to_acquire > 0 {
175            for _ in 0..new_segments_to_acquire {
176                if let Some(seg) = rg_state
177                    .lock()
178                    .await
179                    .assign_segment_to_reader(&reader)
180                    .await
181                    .expect("Error while waiting for segments to be assigned")
182                {
183                    debug!("Acquiring segment {:?} for reader {:?}", seg, reader);
184                } else {
185                    // There are no new unassigned segments to be acquired.
186                    debug!(
187                        "No unassigned segments that can be acquired by the reader {:?}",
188                        reader
189                    );
190                    break;
191                }
192            }
193        }
194        // Get all assigned segments for the reader.
195        let mut assigned_segments = rg_state
196            .lock()
197            .await
198            .get_segments_for_reader(&reader)
199            .await
200            .expect("Error while fetching currently assigned segments");
201
202        let mut slice_meta_map: HashMap<ScopedSegment, SliceMetadata> = HashMap::new();
203        slice_meta_map.extend(assigned_segments.drain().map(|(seg, offset)| {
204            (
205                seg.clone(),
206                SliceMetadata {
207                    scoped_segment: seg.to_string(),
208                    start_offset: offset.read,
209                    read_offset: offset.read,
210                    ..Default::default()
211                },
212            )
213        }));
214
215        let (tx, rx) = mpsc::channel(1);
216        let mut stop_reading_map: HashMap<ScopedSegment, oneshot::Sender<()>> = HashMap::new();
217        // spawn background fetch tasks.
218        slice_meta_map.iter().for_each(|(segment, meta)| {
219            let (tx_stop, rx_stop) = oneshot::channel();
220            stop_reading_map.insert(segment.clone(), tx_stop);
221            factory.runtime_handle().spawn(SegmentSlice::get_segment_data(
222                segment.clone(),
223                meta.start_offset,
224                tx.clone(),
225                rx_stop,
226                factory.clone(),
227            ));
228        });
229
230        // initialize the event reader.
231        EventReader::init_event_reader(
232            rg_state,
233            reader,
234            factory,
235            tx,
236            rx,
237            slice_meta_map,
238            stop_reading_map,
239        )
240    }
241
242    #[doc(hidden)]
243    fn init_event_reader(
244        rg_state: Arc<Mutex<ReaderGroupState>>,
245        id: Reader,
246        factory: ClientFactoryAsync,
247        tx: Sender<SegmentReadResult>,
248        rx: Receiver<SegmentReadResult>,
249        segment_slice_map: HashMap<ScopedSegment, SliceMetadata>,
250        slice_stop_reading: HashMap<ScopedSegment, oneshot::Sender<()>>,
251    ) -> Self {
252        EventReader {
253            id,
254            factory,
255            rx,
256            tx,
257            meta: ReaderState {
258                slices: segment_slice_map,
259                slices_dished_out: Default::default(),
260                slice_release_receiver: HashMap::new(),
261                slice_stop_reading,
262                last_segment_release: Instant::now(),
263                last_segment_acquire: Instant::now(),
264                last_offset_update: Instant::now(),
265                reader_offline: false,
266            },
267            rg_state,
268        }
269    }
270
271    // for testing purposes.
272    #[doc(hidden)]
273    #[cfg(feature = "integration-test")]
274    pub fn set_last_acquire_release_time(&mut self, time: Instant) {
275        self.meta.last_segment_release = time;
276        self.meta.last_segment_acquire = time;
277    }
278
279    /// Release a partially read segment slice back to event reader.
280    ///
281    /// Note: it may return an error indicating that the reader has already been removed. This means
282    /// that another thread removes this reader from the ReaderGroup probably due to the host of this reader
283    /// is assumed dead.
284    pub async fn release_segment(&mut self, mut slice: SegmentSlice) -> Result<(), EventReaderError> {
285        info!(
286            "releasing segment slice {} from reader {:?}",
287            slice.meta.scoped_segment, self.id
288        );
289        // check if the reader is already offline.
290        if self.meta.reader_offline {
291            return Err(EventReaderError::StateError {
292                source: ReaderGroupStateError::ReaderAlreadyOfflineError {
293                    error_msg: format!("Reader already marked offline {:?}", self.id),
294                    source: SynchronizerError::SyncPreconditionError {
295                        error_msg: String::from("Precondition failure"),
296                    },
297                },
298            });
299        }
300        //update meta data.
301        let scoped_segment = ScopedSegment::from(slice.meta.scoped_segment.clone().as_str());
302        self.meta.add_slices(slice.meta.clone());
303        self.meta.slices_dished_out.remove(&scoped_segment);
304        if self.meta.last_segment_release.elapsed() > REBALANCE_INTERVAL {
305            debug!("try to rebalance segments across readers");
306            let read_offset = slice.meta.read_offset;
307            // Note: reader may not online
308            self.release_segment_from_reader(slice, read_offset).await?;
309            self.meta.last_segment_release = Instant::now();
310        } else {
311            //send an indication to the waiting rx that slice has been returned.
312            debug!(" slice return to rx success {:?}  ", slice.meta);
313            if let Some(tx) = slice.slice_return_tx.take() {
314                if let Err(_e) = tx.send(Some(slice.meta.clone())) {
315                    warn!(
316                        "Failed to send segment slice release data for slice {:?}",
317                        slice.meta
318                    );
319                }
320            } else {
321                panic!("This is unexpected, No sender for SegmentSlice present.");
322            }
323        }
324
325        //Update latest reader positions if UPDATE_OFFSET_INTERVAL is elapsed
326        if self.meta.last_offset_update.elapsed() > UPDATE_OFFSET_INTERVAL {
327            let mut offset_map: HashMap<ScopedSegment, Offset> = HashMap::new();
328            for metadata in self.meta.slices.values() {
329                offset_map.insert(
330                    ScopedSegment::from(metadata.scoped_segment.as_str()),
331                    Offset::new(metadata.read_offset),
332                );
333            }
334            debug!(
335                " update reader position {:?}  for reader  {:?} ",
336                offset_map, self.id
337            );
338            self.rg_state
339                .lock()
340                .await
341                .update_reader_positions(&self.id, offset_map)
342                .await
343                .context(StateError {})?;
344
345            self.meta.last_offset_update = Instant::now();
346        }
347        Ok(())
348    }
349
350    /// Release a segment back to the reader and also indicate the offset up to which the segment slice is consumed.
351    ///
352    /// Note: it may return an error indicating that the reader has already been removed. This means
353    /// that another thread removes this reader from the ReaderGroup probably due to the host of this reader
354    /// is assumed dead.
355    pub async fn release_segment_at(
356        &mut self,
357        slice: SegmentSlice,
358        offset: i64,
359    ) -> Result<(), EventReaderError> {
360        info!(
361            "releasing segment slice {} at offset {}",
362            slice.meta.scoped_segment, offset
363        );
364        assert!(
365            offset >= 0,
366            "the offset where the segment slice is released should be a positive number"
367        );
368        assert!(
369            slice.meta.start_offset <= offset,
370            "the offset where the segment slice is released should be greater than the start offset"
371        );
372        assert!(
373            slice.meta.end_offset >= offset,
374            "the offset where the segment slice is released should be less than the end offset"
375        );
376        if self.meta.reader_offline {
377            return Err(EventReaderError::StateError {
378                source: ReaderGroupStateError::ReaderAlreadyOfflineError {
379                    error_msg: format!("Reader already marked offline {:?}", self.id),
380                    source: SynchronizerError::SyncPreconditionError {
381                        error_msg: String::from("Precondition failure"),
382                    },
383                },
384            });
385        }
386        let segment = ScopedSegment::from(slice.meta.scoped_segment.as_str());
387        if slice.meta.read_offset != offset {
388            self.meta.stop_reading(&segment);
389
390            let slice_meta = SliceMetadata {
391                start_offset: slice.meta.read_offset,
392                scoped_segment: slice.meta.scoped_segment.clone(),
393                last_event_offset: slice.meta.last_event_offset,
394                read_offset: offset,
395                end_offset: slice.meta.end_offset,
396                segment_data: SegmentDataBuffer::empty(),
397                partial_data_present: false,
398            };
399
400            // reinitialize the segment data reactor.
401            let (tx_drop_fetch, rx_drop_fetch) = oneshot::channel();
402            tokio::spawn(SegmentSlice::get_segment_data(
403                segment.clone(),
404                slice_meta.read_offset, // start reading from the offset provided.
405                self.tx.clone(),
406                rx_drop_fetch,
407                self.factory.clone(),
408            ));
409            self.meta.add_stop_reading_tx(segment.clone(), tx_drop_fetch);
410            self.meta.add_slices(slice_meta);
411            self.meta.slices_dished_out.remove(&segment);
412        } else {
413            self.release_segment(slice).await?;
414        }
415        Ok(())
416    }
417
418    /// Mark the reader as offline.
419    /// This will ensure the segments owned by this reader is distributed to other readers in the ReaderGroup.
420    ///
421    /// Note: it may return an error indicating that the reader has already been removed. This means
422    /// that another thread removes this reader from the ReaderGroup probably due to the host of this reader
423    /// is assumed dead.
424    async fn reader_offline_internal(
425        reader_id: Reader,
426        rg_state: Arc<Mutex<ReaderGroupState>>,
427        meta: &mut ReaderState,
428    ) -> Result<(), EventReaderError> {
429        if !meta.reader_offline && rg_state.lock().await.check_online(&reader_id).await {
430            info!("static Putting reader {:?} offline", reader_id);
431            // stop reading from all the segments.
432            meta.stop_reading_all();
433            // close all slice return Receivers.
434            meta.close_all_slice_return_channel();
435            // use the updated map to return the data.
436            let mut offset_map: HashMap<ScopedSegment, Offset> = HashMap::new();
437            for (seg, slice_meta) in meta.slices_dished_out.drain() {
438                offset_map.insert(seg, Offset::new(slice_meta.read_offset));
439            }
440            for meta in meta.slices.values() {
441                offset_map.insert(
442                    ScopedSegment::from(meta.scoped_segment.as_str()),
443                    Offset::new(meta.read_offset),
444                );
445            }
446
447            match rg_state.lock().await.remove_reader(&reader_id, offset_map).await {
448                Ok(()) => {
449                    meta.reader_offline = true;
450                    Ok(())
451                }
452                Err(e) => match e {
453                    ReaderGroupStateError::ReaderAlreadyOfflineError { .. } => {
454                        meta.reader_offline = true;
455                        info!("staticReader {:?} is already offline", reader_id);
456                        Ok(())
457                    }
458                    state_err => Err(EventReaderError::StateError { source: state_err }),
459                },
460            }?
461        }
462        Ok(())
463    }
464
465    /// Mark the reader as offline after calling the reader_offline_internal.
466    /// Note: it may return an error indicating that the reader has already been removed. This means
467    /// that another thread removes this reader from the ReaderGroup probably due to the host of this reader
468    /// is assumed dead.
469    pub async fn reader_offline(&mut self) -> Result<(), EventReaderError> {
470        let rg_state = self.rg_state.clone();
471        let id = self.id.clone();
472        let mut meta = mem::take(&mut self.meta);
473        Self::reader_offline_internal(id, rg_state, &mut meta).await
474    }
475
476    /// Release the segment of the provided SegmentSlice from the reader. This segment is marked as
477    /// unassigned in the reader group state and other reads can acquire it.
478    async fn release_segment_from_reader(
479        &mut self,
480        mut slice: SegmentSlice,
481        read_offset: i64,
482    ) -> Result<(), EventReaderError> {
483        if self.meta.reader_offline {
484            return Err(EventReaderError::StateError {
485                source: ReaderGroupStateError::ReaderAlreadyOfflineError {
486                    error_msg: format!("Reader already marked offline {:?}", self.id),
487                    source: SynchronizerError::SyncPreconditionError {
488                        error_msg: String::from("Precondition failure"),
489                    },
490                },
491            });
492        }
493        let new_segments_to_release = self
494            .rg_state
495            .lock()
496            .await
497            .compute_segments_to_acquire_or_release(&self.id)
498            .await
499            .map_err(|err| EventReaderError::StateError { source: err })?;
500        let segment = ScopedSegment::from(slice.meta.scoped_segment.as_str());
501        // check if segments needs to be released from the reader
502        if new_segments_to_release < 0 {
503            // Stop reading from the segment.
504            self.meta.stop_reading(&segment);
505            self.meta
506                .slices
507                .remove(&segment)
508                .expect("Segment missing in meta while releasing from reader");
509            // Send None to the waiting slice_return_rx.
510            if let Some(tx) = slice.slice_return_tx.take() {
511                if let Err(_e) = tx.send(None) {
512                    warn!(
513                        "Failed to send segment slice release data for slice {:?}",
514                        slice.meta
515                    );
516                }
517            } else {
518                panic!("This is unexpected, No sender for SegmentSlice present.");
519            }
520            self.rg_state
521                .lock()
522                .await
523                .release_segment(&self.id, &segment, &Offset::new(read_offset))
524                .await
525                .context(StateError {})?;
526        }
527        Ok(())
528    }
529
530    /// This function returns a SegmentSlice from the data received from the SegmentStore(s).
531    /// Individual events can be read from the data received using `SegmentSlice.next()`.
532    ///
533    /// Invoking this function multiple times ensure multiple SegmentSlices corresponding
534    /// to different Segments of the stream are received. In-case we receive data for an already
535    /// acquired SegmentSlice this method waits until SegmentSlice is completely consumed before
536    /// returning the data.
537    ///
538    /// Note: it may return an error indicating that the reader is not online. This means
539    /// that another thread removes this reader from the ReaderGroup probably because the host of this reader
540    /// is assumed dead.
541    pub async fn acquire_segment(&mut self) -> Result<Option<SegmentSlice>, EventReaderError> {
542        if self.meta.reader_offline || !self.rg_state.lock().await.check_online(&self.id).await {
543            return Err(EventReaderError::StateError {
544                source: ReaderGroupStateError::ReaderAlreadyOfflineError {
545                    error_msg: format!(
546                        "Reader already marked offline {:?} or the ReaderGroup is deleted",
547                        self.id
548                    ),
549                    source: SynchronizerError::SyncPreconditionError {
550                        error_msg: String::from("Precondition failure"),
551                    },
552                },
553            });
554        }
555        //Update latest reader positions if UPDATE_OFFSET_INTERVAL is elapsed
556        if self.meta.last_offset_update.elapsed() > UPDATE_OFFSET_INTERVAL {
557            let mut offset_map: HashMap<ScopedSegment, Offset> = HashMap::new();
558            for metadata in self.meta.slices.values() {
559                offset_map.insert(
560                    ScopedSegment::from(metadata.scoped_segment.as_str()),
561                    Offset::new(metadata.read_offset),
562                );
563            }
564            debug!(
565                " update reader position {:?}  for reader  {:?} ",
566                offset_map, self.id
567            );
568            self.rg_state
569                .lock()
570                .await
571                .update_reader_positions(&self.id, offset_map)
572                .await
573                .context(StateError {})?;
574
575            self.meta.last_offset_update = Instant::now();
576        }
577        info!("acquiring segment for reader {:?}", self.id);
578        // Check if newer segments should be acquired.
579        if self.meta.last_segment_acquire.elapsed() > REBALANCE_INTERVAL {
580            info!("need to rebalance segments across readers");
581            // assign newer segments to this reader if available.
582            // Note: reader may not online.
583            let res = self.assign_segments_to_reader().await.context(StateError {})?;
584            if let Some(new_segments) = res {
585                // fetch current segments.
586                // Note: reader may not online.
587                let current_segments = self
588                    .rg_state
589                    .lock()
590                    .await
591                    .get_segments_for_reader(&self.id)
592                    .await
593                    .map_err(|e| SyncError {
594                        error_msg: format!("failed to get segments for reader {:?}", self.id),
595                        source: e,
596                    })
597                    .context(StateError {})?;
598                let new_segments: HashSet<(ScopedSegment, Offset)> = current_segments
599                    .into_iter()
600                    .filter(|(seg, _off)| new_segments.contains(seg))
601                    .collect();
602                debug!("segments which can be read next are {:?}", new_segments);
603                // Initiate segment reads to the newer segments.
604                self.initiate_segment_reads(new_segments);
605                self.meta.last_segment_acquire = Instant::now();
606            }
607        }
608        // Check if any of the segments already has event data and return it.
609        if let Some(segment_with_data) = self.meta.get_segment_id_with_data() {
610            info!("segment {} has data ready to read", segment_with_data);
611            let slice_meta = self.meta.slices.remove(&segment_with_data).unwrap();
612            let segment = ScopedSegment::from(slice_meta.scoped_segment.as_str());
613            // Create an one-shot channel to receive SegmentSlice return.
614            let (slice_return_tx, slice_return_rx) = oneshot::channel();
615            self.meta.add_slice_release_receiver(segment, slice_return_rx);
616
617            info!(
618                "segment slice for {:?} is ready for consumption by reader {}",
619                slice_meta.scoped_segment, self.id,
620            );
621            self.meta
622                .slices_dished_out
623                .insert(segment_with_data, slice_meta.copy_meta());
624            Ok(Some(SegmentSlice {
625                meta: slice_meta,
626                slice_return_tx: Some(slice_return_tx),
627            }))
628        } else if let Ok(option) = timeout(Duration::from_millis(1000), self.rx.recv()).await {
629            if let Some(read_result) = option {
630                match read_result {
631                    // received segment data
632                    Ok(data) => {
633                        let segment = ScopedSegment::from(data.segment.clone().as_str());
634                        info!("new data fetched from server for segment {:?}", segment);
635                        if let Some(mut slice_meta) = self.meta.remove_segment(segment.clone()).await {
636                            if data.offset_in_segment
637                                != slice_meta.read_offset + slice_meta.segment_data.value.len() as i64
638                            {
639                                info!("Data from an invalid offset {:?} observed. Expected offset {:?}. Ignoring this data", data.offset_in_segment, slice_meta.read_offset);
640                                Ok(None)
641                            } else {
642                                // add received data to Segment slice.
643                                EventReader::add_data_to_segment_slice(data, &mut slice_meta);
644
645                                // Create an one-shot channel to receive SegmentSlice return.
646                                let (slice_return_tx, slice_return_rx) = oneshot::channel();
647                                self.meta
648                                    .add_slice_release_receiver(segment.clone(), slice_return_rx);
649                                self.meta
650                                    .slices_dished_out
651                                    .insert(segment.clone(), slice_meta.copy_meta());
652
653                                info!(
654                                    "segment slice for {:?} is ready for consumption by reader {}",
655                                    slice_meta, self.id,
656                                );
657
658                                Ok(Some(SegmentSlice {
659                                    meta: slice_meta,
660                                    slice_return_tx: Some(slice_return_tx),
661                                }))
662                            }
663                        } else {
664                            //None is sent if the the segment is released from the reader.
665                            debug!("ignore the received data since None was returned");
666                            Ok(None)
667                        }
668                    }
669                    Err((e, offset)) => {
670                        let segment = ScopedSegment::from(e.get_segment().as_str());
671                        debug!(
672                            "Reader Error observed {:?} on segment {:?} at offset {:?} ",
673                            e, segment, offset
674                        );
675                        // Remove the slice from the reader meta and fetch successors.
676                        if let Some(slice_meta) = self.meta.remove_segment(segment.clone()).await {
677                            if slice_meta.read_offset != offset {
678                                info!("Error at an invalid offset {:?} observed. Expected offset {:?}. Ignoring this data", offset, slice_meta.start_offset);
679                                self.meta.add_slices(slice_meta);
680                                self.meta.slices_dished_out.remove(&segment);
681                            } else {
682                                info!("Segment slice {:?} has received error {:?}", slice_meta, e);
683                                self.fetch_successors(e).await.context(StateError {})?;
684                            }
685                        }
686                        debug!("Segment Slice meta {:?}", self.meta.slices);
687                        Ok(None)
688                    }
689                }
690            } else {
691                warn!("Error getting updates from segment slice for reader {}", self.id);
692                Ok(None)
693            }
694        } else {
695            info!(
696                "reader {} owns {} slices but none is ready to read",
697                self.id,
698                self.meta.slices.len()
699            );
700            Ok(None)
701        }
702    }
703
704    // Fetch successors of the segment where an error was observed.
705    // ensure we stop the read task and spawn read tasks for the successor segments.
706    async fn fetch_successors(&mut self, e: ReaderError) -> Result<(), ReaderGroupStateError> {
707        match e {
708            ReaderError::SegmentSealed {
709                segment,
710                can_retry: _,
711                operation: _,
712                error_msg: _,
713            }
714            | ReaderError::SegmentIsTruncated {
715                segment,
716                can_retry: _,
717                operation: _,
718                error_msg: _,
719            } => {
720                let completed_scoped_segment = ScopedSegment::from(segment.as_str());
721                self.meta.stop_reading(&completed_scoped_segment); // stop reading segment.
722
723                // Fetch next segments that can be read from.
724                let successors = self
725                    .factory
726                    .controller_client()
727                    .get_successors(&completed_scoped_segment)
728                    .await
729                    .expect("Failed to fetch successors of the segment")
730                    .segment_with_predecessors;
731                info!("Segment Completed {:?}", segment);
732                // Update rg_state with the completed segment and its successors.
733                self.rg_state
734                    .lock()
735                    .await
736                    .segment_completed(&self.id, &completed_scoped_segment, &successors)
737                    .await?;
738                // Assign newer segments to this reader if available.
739                let option = self.assign_segments_to_reader().await?;
740                if let Some(new_segments) = option {
741                    // fetch current segments.
742                    let current_segments = self
743                        .rg_state
744                        .lock()
745                        .await
746                        .get_segments_for_reader(&self.id)
747                        .await
748                        .map_err(|e| SyncError {
749                            error_msg: format!("Failed to fetch segments for reader {:?}", self.id),
750                            source: e,
751                        })?;
752                    let new_segments: HashSet<(ScopedSegment, Offset)> = current_segments
753                        .into_iter()
754                        .filter(|(seg, _off)| new_segments.contains(seg))
755                        .collect();
756                    debug!("Segments which can be read next are {:?}", new_segments);
757                    // Initiate segment reads to the newer segments.
758                    self.initiate_segment_reads(new_segments);
759                }
760            }
761            _ => error!("Error observed while reading from Pravega {:?}", e),
762        };
763        Ok(())
764    }
765
766    // This function tries to acquire newer segments for the reader.
767    async fn assign_segments_to_reader(&self) -> Result<Option<Vec<ScopedSegment>>, ReaderGroupStateError> {
768        let mut new_segments: Vec<ScopedSegment> = Vec::new();
769        let new_segments_to_acquire = self
770            .rg_state
771            .lock()
772            .await
773            .compute_segments_to_acquire_or_release(&self.id)
774            .await
775            .expect("should compute segments");
776        if new_segments_to_acquire <= 0 {
777            Ok(None)
778        } else {
779            for _ in 0..new_segments_to_acquire {
780                if let Some(seg) = self
781                    .rg_state
782                    .lock()
783                    .await
784                    .assign_segment_to_reader(&self.id)
785                    .await?
786                {
787                    debug!("Acquiring segment {:?} for reader {:?}", seg, self.id);
788                    new_segments.push(seg);
789                } else {
790                    // There are no new unassigned segments to be acquired.
791                    break;
792                }
793            }
794            debug!("Segments acquired by reader {:?} are {:?}", self.id, new_segments);
795            Ok(Some(new_segments))
796        }
797    }
798
799    /// Initiate a task to read data from the newly assigned segments.
800    fn initiate_segment_reads(&mut self, new_segments: HashSet<(ScopedSegment, Offset)>) {
801        for (seg, offset) in new_segments {
802            let meta = SliceMetadata {
803                scoped_segment: seg.to_string(),
804                start_offset: offset.read,
805                read_offset: offset.read, // read offset should be same as start_offset.
806                ..Default::default()
807            };
808            let (tx_drop_fetch, rx_drop_fetch) = oneshot::channel();
809            tokio::spawn(SegmentSlice::get_segment_data(
810                seg.clone(),
811                meta.start_offset,
812                self.tx.clone(),
813                rx_drop_fetch,
814                self.factory.clone(),
815            ));
816            self.meta.add_stop_reading_tx(seg, tx_drop_fetch);
817            // update map with newer segments.
818            self.meta.add_slices(meta);
819        }
820    }
821
822    // Helper method to append data to SliceMetadata.
823    fn add_data_to_segment_slice(data: SegmentDataBuffer, slice: &mut SliceMetadata) {
824        if slice.segment_data.value.is_empty() {
825            slice.segment_data = data;
826        } else {
827            slice.segment_data.value.put(data.value); // append to partial data from last read.
828            slice.partial_data_present = false;
829        }
830    }
831
832    // Fetch the successors for a given segment from the controller.
833    async fn get_successors(
834        &mut self,
835        completed_scoped_segment: &str,
836    ) -> ImHashMap<SegmentWithRange, Vec<Segment>> {
837        let completed_scoped_segment = ScopedSegment::from(completed_scoped_segment);
838        self.factory
839            .controller_client()
840            .get_successors(&completed_scoped_segment)
841            .await
842            .expect("Failed to fetch successors of the segment")
843            .segment_with_predecessors
844    }
845}
846
847// Reader meta data.
848struct ReaderState {
849    slices: HashMap<ScopedSegment, SliceMetadata>,
850    slices_dished_out: HashMap<ScopedSegment, SliceMetadata>,
851    slice_release_receiver: HashMap<ScopedSegment, oneshot::Receiver<Option<SliceMetadata>>>,
852    slice_stop_reading: HashMap<ScopedSegment, oneshot::Sender<()>>,
853    last_segment_release: Instant,
854    last_segment_acquire: Instant,
855    last_offset_update: Instant,
856    reader_offline: bool,
857}
858impl Default for ReaderState {
859    // Implements the Default trait for the ReaderState struct.
860    // This allows us to create a new instance of ReaderState with default values using the Default::default() method.
861    fn default() -> Self {
862        ReaderState {
863            slices: HashMap::new(),
864            slices_dished_out: HashMap::new(),
865            slice_release_receiver: HashMap::new(),
866            slice_stop_reading: HashMap::new(),
867            last_segment_release: Instant::now(),
868            last_segment_acquire: Instant::now(),
869            last_offset_update: Instant::now(),
870            reader_offline: false,
871        }
872    }
873}
874
875impl ReaderState {
876    // Add a release receiver which is used to inform a EventReader when the Segment slice is returned.
877    fn add_slice_release_receiver(
878        &mut self,
879        scoped_segment: ScopedSegment,
880        slice_return_rx: oneshot::Receiver<Option<SliceMetadata>>,
881    ) {
882        self.slice_release_receiver
883            .insert(scoped_segment, slice_return_rx);
884    }
885
886    // Wait until the user application returns the Segment Slice.
887    async fn wait_for_segment_slice_return(&mut self, segment: &ScopedSegment) -> Option<SliceMetadata> {
888        if let Some(receiver) = self.slice_release_receiver.remove(segment) {
889            match receiver.await {
890                Ok(returned_meta) => {
891                    debug!("SegmentSlice returned {:?}", returned_meta);
892                    returned_meta
893                }
894                Err(e) => {
895                    error!(
896                        "Error Segment slice was not returned for segment {:?}. Error {:?} ",
897                        segment, e
898                    );
899                    self.slices_dished_out.remove(segment)
900                }
901            }
902        } else {
903            warn!(
904                "Invalid segment {:?} provided for while waiting for segment slice return",
905                segment
906            );
907            None
908        }
909    }
910
911    fn close_all_slice_return_channel(&mut self) {
912        for (_, mut rx) in self.slice_release_receiver.drain() {
913            rx.close();
914        }
915    }
916
917    // Remove segment slice from reader meta and return it.
918    // If the reader does not have the segment slice it waits for the segment slice which is out
919    // for consumption.
920    async fn remove_segment(&mut self, segment: ScopedSegment) -> Option<SliceMetadata> {
921        match self.slices.remove(&segment) {
922            Some(meta) => {
923                debug!(
924                    "Segment slice {:?} has not been dished out for consumption {:?} meta",
925                    &segment, meta
926                );
927                Some(meta)
928            }
929            None => {
930                debug!(
931                    "Segment slice for {:?} has already been dished out for consumption",
932                    &segment
933                );
934                self.wait_for_segment_slice_return(&segment).await
935            }
936        }
937    }
938
939    // Add Segment Slices to Reader meta data.
940    fn add_slices(&mut self, meta: SliceMetadata) {
941        if self
942            .slices
943            .insert(ScopedSegment::from(meta.scoped_segment.as_str()), meta)
944            .is_some()
945        {
946            panic!("Pre-condition check failure. Segment slice already present");
947        }
948    }
949
950    // Store a Sender which is used to stop the read task for a given Segment.
951    fn add_stop_reading_tx(&mut self, segment: ScopedSegment, tx: oneshot::Sender<()>) {
952        assert!(
953            self.slice_stop_reading.insert(segment, tx).is_none(),
954            "Pre-condition check failure. Sender used to stop fetching data is already present"
955        );
956    }
957
958    // Use the stored oneshot::Sender to stop segment reading background task.
959    fn stop_reading(&mut self, segment: &ScopedSegment) {
960        if let Some(tx) = self.slice_stop_reading.remove(segment) {
961            if tx.send(()).is_err() {
962                debug!("Channel already closed, ignoring the error");
963            }
964        }
965    }
966
967    // Stop all the background tasks that are trying to read from owned segments.
968    fn stop_reading_all(&mut self) {
969        for (_, tx) in self.slice_stop_reading.drain() {
970            if tx.send(()).is_err() {
971                debug!("Channel already closed, ignoring the error");
972            }
973        }
974    }
975
976    fn get_segment_id_with_data(&self) -> Option<ScopedSegment> {
977        self.slices
978            .iter()
979            .find_map(|(k, v)| if v.has_events() { Some(k.clone()) } else { None })
980    }
981}
982
/// A single event read from a Pravega segment.
///
/// `offset_in_segment` is the segment offset at which the event's header starts, and `value`
/// holds the event payload without that header.
#[derive(Debug)]
pub struct Event {
    /// Offset of the event (its header) within the segment.
    pub offset_in_segment: i64,
    /// The event payload.
    pub value: Vec<u8>,
}
990
991/// This represents a segment slice which can be used to read events from a Pravega segment as an
992/// iterator.
993#[derive(Default)]
994pub struct SegmentSlice {
995    pub meta: SliceMetadata,
996    pub(crate) slice_return_tx: Option<oneshot::Sender<Option<SliceMetadata>>>,
997}
998
impl SegmentSlice {
    /// Creates a `SegmentSlice` for `segment` positioned at `start_offset`.
    ///
    /// The slice starts with an empty data buffer, `read_offset == start_offset`,
    /// `last_event_offset` 0 and `end_offset` `i64::MAX`. `slice_return_tx` is the channel on
    /// which the slice metadata is sent back when the slice is dropped.
    /// This only builds the value; it does not spawn any read task (see `get_segment_data`).
    fn new(
        segment: ScopedSegment,
        start_offset: i64,
        slice_return_tx: oneshot::Sender<Option<SliceMetadata>>,
    ) -> Self {
        SegmentSlice {
            meta: SliceMetadata {
                start_offset,
                scoped_segment: segment.to_string(),
                last_event_offset: 0,
                read_offset: start_offset,
                end_offset: i64::MAX,
                segment_data: SegmentDataBuffer::empty(),
                partial_data_present: false,
            },
            slice_return_tx: Some(slice_return_tx),
        }
    }

    /// Background read loop that streams the data of `segment` into `tx`, starting at
    /// `start_offset`.
    ///
    /// Each iteration first polls `drop_fetch`. A received stop signal, or a dropped sender,
    /// ends the loop. It then reads up to `READ_BUFFER_SIZE` bytes at the current offset:
    /// - an empty reply flagged `end_of_segment` is sent as a `SegmentSealed` error together
    ///   with the current offset, and the loop ends;
    /// - any other reply (including an empty, non-final one) is sent as a `SegmentDataBuffer`
    ///   tagged with the current offset, and the offset advances by the number of bytes read;
    /// - a non-retryable read error is sent with the current offset and the loop ends, while a
    ///   retryable one is retried at the same offset on the next iteration.
    /// The loop also ends if sending fails because the receiving side has gone away.
    ///
    /// NOTE(review): the stop signal is only polled between reads, so an in-flight read or a
    /// send waiting for channel capacity is not interrupted. Retryable errors are retried with
    /// no delay in this loop; presumably the segment reader backs off internally. Confirm.
    async fn get_segment_data(
        segment: ScopedSegment,
        start_offset: i64,
        tx: Sender<SegmentReadResult>,
        mut drop_fetch: oneshot::Receiver<()>,
        factory: ClientFactoryAsync,
    ) {
        let mut offset: i64 = start_offset;
        let segment_reader = factory.create_async_segment_reader(segment.clone()).await;
        loop {
            // Stop on an explicit signal or when the owner dropped the stop sender.
            if let Ok(_) | Err(TryRecvError::Closed) = drop_fetch.try_recv() {
                info!("Stop reading from the segment");
                break;
            }
            debug!(
                "Send read request to Segment store at offset {:?} with length {:?}",
                offset, READ_BUFFER_SIZE
            );
            let read = segment_reader.read(offset, READ_BUFFER_SIZE).await;
            match read {
                Ok(reply) => {
                    let len = reply.data.len();
                    debug!("read data length of {}", len);
                    if len == 0 && reply.end_of_segment {
                        info!("Reached end of segment {:?} during read ", segment.clone());
                        // End of segment is reported to the reader as a SegmentSealed error.
                        let data = SegmentSealed {
                            segment: segment.to_string(),
                            can_retry: false,
                            operation: "read segment".to_string(),
                            error_msg: "reached the end of stream".to_string(),
                        };
                        // send data: this waits until there is capacity in the channel.
                        if let Err(e) = tx.send(Err((data, offset))).await {
                            warn!("Error while sending segment data to event parser {:?} ", e);
                            break;
                        }
                        // Release this task's sender before exiting (it would also be dropped on return).
                        drop(tx);
                        break;
                    } else {
                        // Copy the reply bytes into a buffer tagged with the offset they were read at.
                        let segment_data = bytes::BytesMut::from(reply.data.as_slice());
                        let data = SegmentDataBuffer {
                            segment: segment.to_string(),
                            offset_in_segment: offset,
                            value: segment_data,
                        };
                        // send data: this waits until there is capacity in the channel.
                        if let Err(e) = tx.send(Ok(data)).await {
                            info!("Error while sending segment data to event parser {:?} ", e);
                            break;
                        }
                        offset += len as i64;
                    }
                }
                Err(e) => {
                    warn!("Error while reading from segment {:?}", e);
                    // Retryable errors fall through and the same offset is read again.
                    if !e.can_retry() {
                        let _s = tx.send(Err((e, offset))).await;
                        break;
                    }
                }
            }
        }
    }

    /// Returns the starting offset of the SegmentSlice.
    fn get_starting_offset(&self) -> i64 {
        self.meta.start_offset
    }

    /// Returns the name of the segment associated with the SegmentSlice.
    fn get_segment(&self) -> String {
        //Returns the name of the segment
        self.meta.scoped_segment.clone()
    }

    /// Extracts the next complete event from the buffered segment data.
    ///
    /// `parse_header` is expected to inspect the header at the front of the buffer without
    /// consuming it. It returns an empty buffer whose *capacity* is the event length and whose
    /// offset is the header's offset (see `read_header`).
    ///
    /// If the buffer holds the whole event (header + payload), both are consumed (advancing
    /// the buffer's `offset_in_segment`), and a copy of the payload is returned as an [`Event`]
    /// tagged with the header's offset. If the payload is incomplete, or no header could be
    /// parsed (with `read_header`, this includes an empty buffer), `partial_data_present` is set
    /// and `None` is returned with the buffer left intact.
    ///
    /// A header announcing a zero-length event is logged and `None` is returned without
    /// consuming anything or setting the partial flag.
    /// NOTE(review): a genuine zero-length event would therefore never be consumed. Confirm
    /// whether such events can occur.
    fn extract_event(
        &mut self,
        parse_header: fn(&mut SegmentDataBuffer) -> Option<SegmentDataBuffer>,
    ) -> Option<Event> {
        if let Some(mut event_data) = parse_header(&mut self.meta.segment_data) {
            // The event length is carried as the capacity of the buffer built by parse_header.
            let bytes_to_read = event_data.value.capacity();
            if bytes_to_read == 0 {
                warn!("Found a header with length as zero");
                return None;
            }
            if self.meta.segment_data.value.remaining() >= bytes_to_read + TYPE_PLUS_LENGTH_SIZE as usize {
                // Skip the header, then take exactly the payload bytes.
                self.meta.segment_data.advance(TYPE_PLUS_LENGTH_SIZE as usize);
                // all the data of the event is already present.
                let t = self.meta.segment_data.split_to(bytes_to_read);
                event_data.value.put(t.value);
                info!("extract event data with length {}", event_data.value.len());
                //Convert to Event and send it.
                let event = Event {
                    offset_in_segment: event_data.offset_in_segment,
                    value: event_data.value.freeze().to_vec(),
                };
                Some(event)
            } else {
                // complete data for a given event is not present in the buffer.
                debug!(
                    "partial event read: data read length {}, target read length {}",
                    event_data.value.len(),
                    event_data.value.capacity()
                );
                self.meta.partial_data_present = true;
                None
            }
        } else {
            // No full header available (this includes an empty buffer).
            self.meta.partial_data_present = true;
            None
        }
    }

    /// Peeks at the event header (type code + length) at the front of `data` without
    /// consuming it.
    ///
    /// Returns a new, empty `SegmentDataBuffer` for the same segment. It is tagged with the
    /// header's offset and allocated with capacity for the announced event length. Returns
    /// `None` if fewer than `TYPE_PLUS_LENGTH_SIZE` bytes are buffered.
    ///
    /// # Panics
    /// Panics if the type code is not `EventCommand::TYPE_CODE`.
    ///
    /// NOTE(review): the length is not validated; a negative value is cast to a huge `usize`
    /// capacity.
    fn read_header(data: &mut SegmentDataBuffer) -> Option<SegmentDataBuffer> {
        if data.value.len() >= TYPE_PLUS_LENGTH_SIZE as usize {
            let event_offset = data.offset_in_segment;
            //workaround since we cannot go back in the position using BytesMut
            // (reading through a borrowed slice view leaves `data` itself untouched).
            let mut bytes_temp = data.value.bytes();
            let type_code = bytes_temp.get_i32();
            let len = bytes_temp.get_i32();
            assert_eq!(type_code, EventCommand::TYPE_CODE, "Expected EventCommand here.");
            debug!("Event size is {}", len);
            Some(SegmentDataBuffer {
                segment: data.segment.clone(),
                offset_in_segment: event_offset,
                value: BytesMut::with_capacity(len as usize),
            })
        } else {
            None
        }
    }

    /// Returns `true` if the slice has no buffered bytes or is flagged as holding a partial
    /// event.
    pub fn is_empty(&self) -> bool {
        self.meta.segment_data.value.is_empty() || self.meta.partial_data_present
    }
}
1164
1165impl Iterator for SegmentSlice {
1166    type Item = Event;
1167
1168    fn next(&mut self) -> Option<Self::Item> {
1169        // extract event from already fetched data.
1170        let res = self.extract_event(SegmentSlice::read_header);
1171
1172        match res {
1173            Some(event) => {
1174                self.meta.last_event_offset = event.offset_in_segment;
1175                self.meta.read_offset =
1176                    event.offset_in_segment + event.value.len() as i64 + TYPE_PLUS_LENGTH_SIZE as i64;
1177                if !self.meta.is_empty() {
1178                    assert_eq!(
1179                        self.meta.read_offset, self.meta.segment_data.offset_in_segment,
1180                        "Error in offset computation"
1181                    );
1182                }
1183                Some(event)
1184            }
1185            None => {
1186                if self.meta.is_empty() {
1187                    info!(
1188                        "Finished reading events from the segment slice of {:?}",
1189                        self.meta.scoped_segment
1190                    );
1191                } else {
1192                    info!("Partial event present in the segment slice of {:?}, this will be returned post a new read request", self.meta.scoped_segment);
1193                }
1194                None
1195            }
1196        }
1197    }
1198}
1199
1200impl fmt::Debug for SegmentSlice {
1201    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1202        f.debug_struct("SegmentSlice").field("meta", &self.meta).finish()
1203    }
1204}
1205
1206// Ensure a Drop of Segment slice releases the segment back to the reader group.
1207impl Drop for SegmentSlice {
1208    fn drop(&mut self) {
1209        if let Some(sender) = self.slice_return_tx.take() {
1210            let _ = sender.send(Some(self.meta.clone()));
1211        }
1212    }
1213}
1214
1215// This holds the SegmentSlice metadata. This meta is persisted by the EventReader.
1216#[derive(Clone)]
1217pub struct SliceMetadata {
1218    pub start_offset: i64,
1219    pub scoped_segment: String,
1220    pub last_event_offset: i64,
1221    pub read_offset: i64,
1222    pub end_offset: i64,
1223    segment_data: SegmentDataBuffer,
1224    pub partial_data_present: bool,
1225}
1226
1227impl SliceMetadata {
1228    /// Method to check if the slice has partial data.
1229    fn is_empty(&self) -> bool {
1230        self.segment_data.value.is_empty()
1231    }
1232
1233    /// Method to verify if the Segment has pending events that can be read.
1234    pub fn has_events(&self) -> bool {
1235        !self.partial_data_present && self.segment_data.value.len() > TYPE_PLUS_LENGTH_SIZE as usize
1236    }
1237
1238    fn copy_meta(&self) -> SliceMetadata {
1239        SliceMetadata {
1240            start_offset: self.start_offset,
1241            scoped_segment: self.scoped_segment.clone(),
1242            last_event_offset: self.last_event_offset,
1243            read_offset: self.read_offset,
1244            end_offset: self.end_offset,
1245            segment_data: SegmentDataBuffer::empty(),
1246            partial_data_present: false,
1247        }
1248    }
1249}
1250
1251impl fmt::Debug for SliceMetadata {
1252    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1253        f.debug_struct("SliceMetadata")
1254            .field("start_offset", &self.start_offset)
1255            .field("scoped_segment", &self.scoped_segment)
1256            .field("last_event_offset", &self.last_event_offset)
1257            .field("read_offset", &self.read_offset)
1258            .field("end_offset", &self.end_offset)
1259            .field("partial_data_present", &self.partial_data_present)
1260            .finish()
1261    }
1262}
1263
1264impl Default for SliceMetadata {
1265    fn default() -> Self {
1266        SliceMetadata {
1267            start_offset: Default::default(),
1268            scoped_segment: Default::default(),
1269            last_event_offset: Default::default(),
1270            read_offset: Default::default(),
1271            end_offset: i64::MAX,
1272            segment_data: SegmentDataBuffer::empty(),
1273            partial_data_present: false,
1274        }
1275    }
1276}
1277
1278// Structure to track the offset and byte array.
1279#[derive(Clone)]
1280struct SegmentDataBuffer {
1281    pub(crate) segment: String,
1282    pub(crate) offset_in_segment: i64,
1283    pub(crate) value: BytesMut,
1284}
1285
1286impl fmt::Debug for SegmentDataBuffer {
1287    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1288        f.debug_struct("SegmentDataBuffer")
1289            .field("segment", &self.segment)
1290            .field("offset in segment", &self.offset_in_segment)
1291            .field("buffer length", &self.value.len())
1292            .finish()
1293    }
1294}
1295
1296impl SegmentDataBuffer {
1297    /// Removes the bytes from the current view, returning them in a new `SegmentDataBuffer` handle.
1298    /// Afterwards, `self` will be empty.
1299    /// This is identical to `self.split_to(length)`.
1300    pub fn split(&mut self) -> SegmentDataBuffer {
1301        let res = self.value.split();
1302        let old_offset = self.offset_in_segment;
1303        let new_offset = old_offset + res.len() as i64;
1304        self.offset_in_segment = new_offset;
1305        SegmentDataBuffer {
1306            segment: self.segment.clone(),
1307            offset_in_segment: old_offset,
1308            value: res,
1309        }
1310    }
1311
1312    /// Splits the buffer into two at the given index.
1313    ///
1314    /// Afterwards `self` contains elements `[at, len)`, and the returned `SegmentDataBuffer`
1315    /// contains elements `[0, at)`.
1316    ///
1317    /// `self.offset_in_segment` is updated accordingly.
1318    ///
1319    /// # Panics
1320    ///
1321    /// Panics if `at > len`.
1322    pub fn split_to(&mut self, at: usize) -> SegmentDataBuffer {
1323        let old_offset = self.offset_in_segment;
1324        let res = self.value.split_to(at);
1325        self.offset_in_segment = old_offset + at as i64;
1326
1327        SegmentDataBuffer {
1328            segment: self.segment.clone(),
1329            offset_in_segment: old_offset,
1330            value: res,
1331        }
1332    }
1333
1334    /// Gets a signed 32 bit integer from `self` in big-endian byte order.
1335    ///
1336    /// The offset in segment is advanced by 4.
1337    ///
1338    /// # Panics
1339    ///
1340    /// This function panics if there is not enough remaining data in `self`.
1341    pub fn get_i32(&mut self) -> i32 {
1342        let result = self.value.get_i32();
1343        self.offset_in_segment += 4;
1344        result
1345    }
1346
1347    /// Advance the internal cursor of the buffer.
1348    pub fn advance(&mut self, cnt: usize) {
1349        self.value.advance(cnt);
1350        self.offset_in_segment += cnt as i64;
1351    }
1352
1353    /// Returns an empty SegmentDataBuffer. The offset is set as 0.
1354    pub fn empty() -> SegmentDataBuffer {
1355        SegmentDataBuffer {
1356            segment: Default::default(),
1357            offset_in_segment: 0,
1358            value: BytesMut::with_capacity(0),
1359        }
1360    }
1361}
1362
1363#[cfg(test)]
1364mod tests {
1365    use super::*;
1366    use crate::client_factory::ClientFactory;
1367    use crate::event::reader_group_state::ReaderGroupStateError;
1368    use crate::sync::synchronizer::SynchronizerError;
1369
1370    use bytes::{Buf, BufMut, BytesMut};
1371    use mockall::predicate;
1372    use mockall::predicate::*;
1373    use pravega_client_config::{ClientConfigBuilder, MOCK_CONTROLLER_URI};
1374    use pravega_client_shared::{Reader, Scope, ScopedSegment, ScopedStream, Stream};
1375    use pravega_wire_protocol::commands::{Command, EventCommand};
1376    use std::collections::HashMap;
1377    use std::iter;
1378    use std::sync::Arc;
1379    use tokio::sync::mpsc::Sender;
1380    use tokio::sync::oneshot;
1381    use tokio::sync::oneshot::error::TryRecvError;
1382    use tokio::sync::{mpsc, Mutex};
1383    use tokio::time::{sleep, Duration};
1384    use tracing::Level;
1385
1386    // This test verifies EventReader reads from a stream where only one segment has data while the other segment is empty.
1387    #[test]
1388    fn test_read_events_single_segment() {
1389        const NUM_EVENTS: usize = 100;
1390        let (tx, rx) = mpsc::channel(1);
1391        tracing_subscriber::fmt().with_max_level(Level::TRACE).finish();
1392        let cf = ClientFactory::new(
1393            ClientConfigBuilder::default()
1394                .controller_uri(MOCK_CONTROLLER_URI)
1395                .build()
1396                .unwrap(),
1397        );
1398
1399        // simulate data being received from Segment store.
1400        let _guard = cf.runtime().enter();
1401        tokio::spawn(generate_variable_size_events(
1402            tx.clone(),
1403            10,
1404            NUM_EVENTS,
1405            0,
1406            false,
1407        ));
1408
1409        // simulate initialization of a Reader
1410        let init_segments = vec![create_segment_slice(0), create_segment_slice(1)];
1411        let mut rg_mock: ReaderGroupState = ReaderGroupState::default();
1412        rg_mock.expect_check_online().return_const(true);
1413        rg_mock
1414            .expect_compute_segments_to_acquire_or_release()
1415            .return_once(move |_| Ok(0 as isize));
1416        rg_mock.expect_remove_reader().return_once(move |_, _| Ok(()));
1417        // create a new Event Reader with the segment slice data.
1418        let mut reader = EventReader::init_event_reader(
1419            Arc::new(Mutex::new(rg_mock)),
1420            Reader::from("r1".to_string()),
1421            cf.to_async(),
1422            tx.clone(),
1423            rx,
1424            create_slice_map(init_segments),
1425            HashMap::new(),
1426        );
1427
1428        let mut event_count = 0;
1429        let mut event_size = 0;
1430
1431        // Attempt to acquire a segment.
1432        while let Some(mut slice) = cf.runtime().block_on(reader.acquire_segment()).unwrap() {
1433            loop {
1434                if let Some(event) = slice.next() {
1435                    println!("Read event {:?}", event);
1436                    assert_eq!(event.value.len(), event_size + 1, "Event has been missed");
1437                    assert!(is_all_same(event.value.as_slice()), "Event has been corrupted");
1438                    event_size += 1;
1439                    event_count += 1;
1440                } else {
1441                    println!(
1442                        "Finished reading from segment {:?}, segment is auto released",
1443                        slice.meta.scoped_segment
1444                    );
1445                    break; // try to acquire the next segment.
1446                }
1447            }
1448            if event_count == NUM_EVENTS {
1449                // all events have been read. Exit test.
1450                break;
1451            }
1452        }
1453    }
1454
1455    #[test]
1456    fn test_acquire_segments() {
1457        const NUM_EVENTS: usize = 10;
1458        let (tx, rx) = mpsc::channel(1);
1459        tracing_subscriber::fmt().with_max_level(Level::TRACE).finish();
1460        let cf = ClientFactory::new(
1461            ClientConfigBuilder::default()
1462                .controller_uri(MOCK_CONTROLLER_URI)
1463                .build()
1464                .unwrap(),
1465        );
1466
1467        // simulate data being received from Segment store.
1468        let _guard = cf.runtime().enter();
1469        tokio::spawn(generate_variable_size_events(
1470            tx.clone(),
1471            1024,
1472            NUM_EVENTS,
1473            0,
1474            false,
1475        ));
1476
1477        // simulate initialization of a Reader
1478        let init_segments = vec![create_segment_slice(0)];
1479        let mut rg_mock: ReaderGroupState = ReaderGroupState::default();
1480        rg_mock
1481            .expect_compute_segments_to_acquire_or_release()
1482            .with(predicate::eq(Reader::from("r1".to_string())))
1483            .return_once(move |_| Ok(1 as isize));
1484        rg_mock.expect_remove_reader().return_once(move |_, _| Ok(()));
1485        rg_mock.expect_check_online().return_const(true);
1486        // mock rg_state.assign_segment_to_reader
1487        let res: Result<Option<ScopedSegment>, ReaderGroupStateError> =
1488            Ok(Some(ScopedSegment::from("scope/test/1.#epoch.0")));
1489        rg_mock
1490            .expect_assign_segment_to_reader()
1491            .with(predicate::eq(Reader::from("r1".to_string())))
1492            .return_once(move |_| res);
1493        //mock rg_state get_segments for reader
1494        let mut new_current_segments: HashSet<(ScopedSegment, Offset)> = HashSet::new();
1495        new_current_segments.insert((ScopedSegment::from("scope/test/1.#epoch.0"), Offset::new(0)));
1496        new_current_segments.insert((ScopedSegment::from("scope/test/0.#epoch.0"), Offset::new(0)));
1497        let res: Result<HashSet<(ScopedSegment, Offset)>, SynchronizerError> = Ok(new_current_segments);
1498        rg_mock
1499            .expect_get_segments_for_reader()
1500            .with(predicate::eq(Reader::from("r1".to_string())))
1501            .return_once(move |_| res);
1502
1503        // simulate data being received from Segment store.
1504        tokio::spawn(generate_variable_size_events(
1505            tx.clone(),
1506            1024,
1507            NUM_EVENTS,
1508            1,
1509            false,
1510        ));
1511
1512        let before_time = Instant::now() - Duration::from_secs(15);
1513        // create a new Event Reader with the segment slice data.
1514        let mut reader = EventReader::init_event_reader(
1515            Arc::new(Mutex::new(rg_mock)),
1516            Reader::from("r1".to_string()),
1517            cf.to_async(),
1518            tx.clone(),
1519            rx,
1520            create_slice_map(init_segments),
1521            HashMap::new(),
1522        );
1523        reader.set_last_acquire_release_time(before_time);
1524
1525        let mut event_count = 0;
1526
1527        // Attempt to acquire a segment.
1528        while let Some(mut slice) = cf.runtime().block_on(reader.acquire_segment()).unwrap() {
1529            loop {
1530                if let Some(event) = slice.next() {
1531                    println!("Read event {:?}", event);
1532                    assert!(is_all_same(event.value.as_slice()), "Event has been corrupted");
1533                    event_count += 1;
1534                } else {
1535                    println!(
1536                        "Finished reading from segment {:?}, segment is auto released",
1537                        slice.meta.scoped_segment
1538                    );
1539                    break; // try to acquire the next segment.
1540                }
1541            }
1542            if event_count == NUM_EVENTS + NUM_EVENTS {
1543                // all events have been read. Exit test.
1544                break;
1545            }
1546        }
1547        assert_eq!(event_count, NUM_EVENTS + NUM_EVENTS);
1548    }
1549
1550    // This test verifies an EventReader reading from a stream where both the segments are sending data.
1551    #[test]
1552    fn test_read_events_multiple_segments() {
1553        const NUM_EVENTS: usize = 100;
1554        let (tx, rx) = mpsc::channel(1);
1555        tracing_subscriber::fmt().with_max_level(Level::TRACE).finish();
1556        let cf = ClientFactory::new(
1557            ClientConfigBuilder::default()
1558                .controller_uri(MOCK_CONTROLLER_URI)
1559                .build()
1560                .unwrap(),
1561        );
1562
1563        // simulate data being received from Segment store. 2 async tasks pumping in data.
1564        let _guard = cf.runtime().enter();
1565        tokio::spawn(generate_variable_size_events(
1566            tx.clone(),
1567            100,
1568            NUM_EVENTS,
1569            0,
1570            false,
1571        ));
1572        //simulate a delay with data received by this segment.
1573        tokio::spawn(generate_variable_size_events(
1574            tx.clone(),
1575            100,
1576            NUM_EVENTS,
1577            1,
1578            true,
1579        ));
1580
1581        // simulate initialization of a Reader
1582        let init_segments = vec![create_segment_slice(0), create_segment_slice(1)];
1583        let mut rg_mock: ReaderGroupState = ReaderGroupState::default();
1584        rg_mock
1585            .expect_compute_segments_to_acquire_or_release()
1586            .return_once(move |_| Ok(0 as isize));
1587        rg_mock.expect_check_online().return_const(true);
1588        rg_mock.expect_remove_reader().return_once(move |_, _| Ok(()));
1589        rg_mock
1590            .expect_update_reader_positions()
1591            .return_once(move |_, _| Ok(()));
1592        // create a new Event Reader with the segment slice data.
1593        let mut reader = EventReader::init_event_reader(
1594            Arc::new(Mutex::new(rg_mock)),
1595            Reader::from("r1".to_string()),
1596            cf.to_async(),
1597            tx.clone(),
1598            rx,
1599            create_slice_map(init_segments),
1600            HashMap::new(),
1601        );
1602
1603        let mut event_count_per_segment: HashMap<String, usize> = HashMap::new();
1604
1605        let mut total_events_read = 0;
1606        // Attempt to acquire a segment.
1607        while let Some(mut slice) = cf.runtime().block_on(reader.acquire_segment()).unwrap() {
1608            let segment = slice.meta.scoped_segment.clone();
1609            println!("Received Segment Slice {:?}", segment);
1610            let mut event_count = 0;
1611            loop {
1612                if let Some(event) = slice.next() {
1613                    println!("Read event {:?}", event);
1614                    assert!(is_all_same(event.value.as_slice()), "Event has been corrupted");
1615                    event_count += 1;
1616                } else {
1617                    println!(
1618                        "Finished reading from segment {:?}, segment is auto released",
1619                        slice.meta.scoped_segment
1620                    );
1621                    break; // try to acquire the next segment.
1622                }
1623            }
1624            total_events_read += event_count;
1625            *event_count_per_segment
1626                .entry(segment.clone())
1627                .or_insert(event_count) += event_count;
1628            if total_events_read == NUM_EVENTS * 2 {
1629                // all events have been read. Exit test.
1630                break;
1631            }
1632        }
1633    }
1634
1635    #[test]
1636    fn test_return_slice() {
1637        const NUM_EVENTS: usize = 2;
1638        let (tx, rx) = mpsc::channel(1);
1639        tracing_subscriber::fmt().with_max_level(Level::TRACE).finish();
1640        let cf = ClientFactory::new(
1641            ClientConfigBuilder::default()
1642                .controller_uri(MOCK_CONTROLLER_URI)
1643                .build()
1644                .unwrap(),
1645        );
1646
1647        // simulate data being received from Segment store.
1648        let _guard = cf.runtime().enter();
1649        tokio::spawn(generate_variable_size_events(
1650            tx.clone(),
1651            10,
1652            NUM_EVENTS,
1653            0,
1654            false,
1655        ));
1656
1657        // simulate initialization of a Reader
1658        let init_segments = vec![create_segment_slice(0), create_segment_slice(1)];
1659
1660        let mut rg_mock: ReaderGroupState = ReaderGroupState::default();
1661        rg_mock.expect_check_online().return_const(true);
1662        rg_mock
1663            .expect_compute_segments_to_acquire_or_release()
1664            .return_once(move |_| Ok(0 as isize));
1665        rg_mock.expect_remove_reader().return_once(move |_, _| Ok(()));
1666        // create a new Event Reader with the segment slice data.
1667        let mut reader = EventReader::init_event_reader(
1668            Arc::new(Mutex::new(rg_mock)),
1669            Reader::from("r1".to_string()),
1670            cf.to_async(),
1671            tx.clone(),
1672            rx,
1673            create_slice_map(init_segments),
1674            HashMap::new(),
1675        );
1676
1677        // acquire a segment
1678        let mut slice = cf
1679            .runtime()
1680            .block_on(reader.acquire_segment())
1681            .expect("Failed to acquire segment since the reader is offline")
1682            .unwrap();
1683
1684        // read an event.
1685        let event = slice.next().unwrap();
1686        assert_eq!(event.value.len(), 1);
1687        assert!(is_all_same(event.value.as_slice()), "Event has been corrupted");
1688        assert_eq!(event.offset_in_segment, 0); // first event.
1689
1690        // release the segment slice.
1691        let _ = cf.runtime().block_on(reader.release_segment(slice));
1692
1693        // acquire the next segment
1694        let slice = cf
1695            .runtime()
1696            .block_on(reader.acquire_segment())
1697            .expect("Failed to acquire segment since the reader is offline")
1698            .unwrap();
1699
1700        //Do not read, simply return it back.
1701        let _ = cf.runtime().block_on(reader.release_segment(slice));
1702
1703        // Try acquiring the segment again.
1704        let mut slice = cf
1705            .runtime()
1706            .block_on(reader.acquire_segment())
1707            .expect("Failed to acquire segment")
1708            .unwrap();
1709        // Verify a partial event being present. This implies
1710        let event = slice.next().unwrap();
1711        assert_eq!(event.value.len(), 2);
1712        assert!(is_all_same(event.value.as_slice()), "Event has been corrupted");
1713        assert_eq!(event.offset_in_segment, 8 + 1); // first event.
1714    }
1715
1716    #[test]
1717    fn test_return_slice_at_offset() {
1718        const NUM_EVENTS: usize = 2;
1719        let (tx, rx) = mpsc::channel(1);
1720        let (stop_tx, stop_rx) = oneshot::channel();
1721        tracing_subscriber::fmt().with_max_level(Level::TRACE).finish();
1722        let cf = ClientFactory::new(
1723            ClientConfigBuilder::default()
1724                .controller_uri(MOCK_CONTROLLER_URI)
1725                .build()
1726                .unwrap(),
1727        );
1728
1729        // simulate data being received from Segment store.
1730        let _guard = cf.runtime().enter();
1731        tokio::spawn(generate_constant_size_events(
1732            tx.clone(),
1733            20,
1734            NUM_EVENTS,
1735            0,
1736            false,
1737            stop_rx,
1738        ));
1739        let mut stop_reading_map: HashMap<ScopedSegment, oneshot::Sender<()>> = HashMap::new();
1740        stop_reading_map.insert(ScopedSegment::from("scope/test/0.#epoch.0"), stop_tx);
1741
1742        // simulate initialization of a Reader
1743        let init_segments = vec![create_segment_slice(0), create_segment_slice(1)];
1744        let mut rg_mock: ReaderGroupState = ReaderGroupState::default();
1745        rg_mock.expect_check_online().return_const(true);
1746        rg_mock
1747            .expect_compute_segments_to_acquire_or_release()
1748            .return_once(move |_| Ok(0 as isize));
1749        rg_mock.expect_remove_reader().return_once(move |_, _| Ok(()));
1750        // create a new Event Reader with the segment slice data.
1751        let mut reader = EventReader::init_event_reader(
1752            Arc::new(Mutex::new(rg_mock)),
1753            Reader::from("r1".to_string()),
1754            cf.to_async(),
1755            tx.clone(),
1756            rx,
1757            create_slice_map(init_segments),
1758            stop_reading_map,
1759        );
1760
1761        // acquire a segment
1762        let mut slice = cf
1763            .runtime()
1764            .block_on(reader.acquire_segment())
1765            .expect("Failed to acquire segment")
1766            .unwrap();
1767
1768        // read an event.
1769        let event = slice.next().unwrap();
1770        assert_eq!(event.value.len(), 1);
1771        assert!(is_all_same(event.value.as_slice()), "Event has been corrupted");
1772        assert_eq!(event.offset_in_segment, 0); // first event.
1773
1774        let result = slice.next();
1775        assert!(result.is_some());
1776        let event = result.unwrap();
1777        assert_eq!(event.value.len(), 1);
1778        assert!(is_all_same(event.value.as_slice()), "Event has been corrupted");
1779        assert_eq!(event.offset_in_segment, 9); // second event.
1780
1781        // release the segment slice.
1782        let _ = cf.runtime().block_on(reader.release_segment_at(slice, 0));
1783
1784        // simulate a segment read at offset 0.
1785        let (_stop_tx, stop_rx) = oneshot::channel();
1786        tokio::spawn(generate_constant_size_events(
1787            tx.clone(),
1788            20,
1789            NUM_EVENTS,
1790            0,
1791            false,
1792            stop_rx,
1793        ));
1794
1795        // acquire the next segment
1796        let mut slice = cf
1797            .runtime()
1798            .block_on(reader.acquire_segment())
1799            .expect("Failed to acquire segment")
1800            .unwrap();
1801        // Verify a partial event being present. This implies
1802        let event = slice.next().unwrap();
1803        assert_eq!(event.value.len(), 1);
1804        assert!(is_all_same(event.value.as_slice()), "Event has been corrupted");
1805        assert_eq!(event.offset_in_segment, 0); // first event.
1806    }
1807
1808    #[tokio::test]
1809    async fn test_read_partial_events_buffer_10() {
1810        let (tx, mut rx) = mpsc::channel(1);
1811        tokio::spawn(generate_variable_size_events(tx, 10, 20, 0, false));
1812        let mut segment_slice = create_segment_slice(0);
1813        let mut expected_offset: usize = 0;
1814        let mut expected_event_len = 0;
1815
1816        loop {
1817            if segment_slice.is_empty() {
1818                if let Some(response) = rx.recv().await {
1819                    segment_slice
1820                        .meta
1821                        .segment_data
1822                        .value
1823                        .put(response.expect("get response").value);
1824                } else {
1825                    break; // All events are sent.
1826                }
1827            }
1828
1829            while let Some(d) = segment_slice.next() {
1830                assert_eq!(expected_offset, d.offset_in_segment as usize);
1831                assert_eq!(expected_event_len + 1, d.value.len());
1832                assert!(is_all_same(d.value.as_slice()));
1833                expected_offset += 8 + expected_event_len + 1;
1834                expected_event_len += 1;
1835            }
1836        }
1837        assert_eq!(20, expected_event_len);
1838    }
1839
1840    #[tokio::test]
1841    async fn test_read_partial_events_buffer_100() {
1842        let (tx, mut rx) = mpsc::channel(1);
1843        tokio::spawn(generate_variable_size_events(tx, 100, 200, 0, false));
1844        let mut segment_slice = create_segment_slice(0);
1845        let mut expected_offset: usize = 0;
1846        let mut expected_event_len = 0;
1847
1848        loop {
1849            if segment_slice.is_empty() {
1850                if let Some(response) = rx.recv().await {
1851                    segment_slice
1852                        .meta
1853                        .segment_data
1854                        .value
1855                        .put(response.expect("get response").value);
1856                } else {
1857                    break; // All events are sent.
1858                }
1859            }
1860
1861            while let Some(d) = segment_slice.next() {
1862                assert_eq!(expected_offset, d.offset_in_segment as usize);
1863                assert_eq!(expected_event_len + 1, d.value.len());
1864                assert!(is_all_same(d.value.as_slice()));
1865                expected_offset += 8 + expected_event_len + 1;
1866                expected_event_len += 1;
1867            }
1868        }
1869        assert_eq!(200, expected_event_len);
1870    }
1871
1872    // Generate event data given the length of the event.
1873    // The data is 'a'
1874    fn generate_event_data(len: usize) -> BytesMut {
1875        let mut buf = BytesMut::with_capacity(len + 8);
1876        buf.put_i32(EventCommand::TYPE_CODE);
1877        buf.put_i32(len as i32); // header
1878
1879        let mut data = Vec::new();
1880        data.extend(iter::repeat(b'a').take(len));
1881        buf.put(data.as_slice());
1882        buf
1883    }
1884
1885    // Custom multiple size events.
1886    async fn generate_multiple_constant_size_events(tx: Sender<SegmentDataBuffer>) {
1887        let mut buf = BytesMut::with_capacity(10);
1888        let segment = ScopedSegment::from("test/test/123").to_string();
1889
1890        buf.put_i32(1);
1891        buf.put_u8(b'a');
1892        buf.put_i32(2);
1893        buf.put(&b"aa"[..]);
1894        tx.send(SegmentDataBuffer {
1895            segment: segment.clone(),
1896            offset_in_segment: 0,
1897            value: buf,
1898        })
1899        .await
1900        .unwrap();
1901
1902        buf = BytesMut::with_capacity(10);
1903        buf.put_i32(3);
1904        buf.put(&b"aaa"[..]);
1905        tx.send(SegmentDataBuffer {
1906            segment: segment.clone(),
1907            offset_in_segment: 0,
1908            value: buf,
1909        })
1910        .await
1911        .unwrap();
1912
1913        buf = BytesMut::with_capacity(10);
1914        buf.put_i32(4);
1915        buf.put(&b"aaaa"[..]);
1916        tx.send(SegmentDataBuffer {
1917            segment: segment.clone(),
1918            offset_in_segment: 0,
1919            value: buf,
1920        })
1921        .await
1922        .unwrap();
1923
1924        buf = BytesMut::with_capacity(10);
1925        buf.put_i32(5);
1926        buf.put(&b"aaaaa"[..]);
1927        tx.send(SegmentDataBuffer {
1928            segment: segment.clone(),
1929            offset_in_segment: 0,
1930            value: buf,
1931        })
1932        .await
1933        .unwrap();
1934
1935        buf = BytesMut::with_capacity(10);
1936        buf.put_i32(6);
1937        buf.put(&b"aaaaaa"[..]);
1938        tx.send(SegmentDataBuffer {
1939            segment: segment.clone(),
1940            offset_in_segment: 0,
1941            value: buf,
1942        })
1943        .await
1944        .unwrap();
1945
1946        buf = BytesMut::with_capacity(10);
1947        buf.put_i32(7);
1948        buf.put(&b"aaaaaa"[..]);
1949        tx.send(SegmentDataBuffer {
1950            segment: segment.clone(),
1951            offset_in_segment: 0,
1952            value: buf,
1953        })
1954        .await
1955        .unwrap();
1956
1957        buf = BytesMut::with_capacity(10);
1958        buf.put_u8(b'a');
1959        buf.put_i32(8);
1960        buf.put(&b"aaaaa"[..]);
1961        tx.send(SegmentDataBuffer {
1962            segment: segment.clone(),
1963            offset_in_segment: 0,
1964            value: buf,
1965        })
1966        .await
1967        .unwrap();
1968
1969        buf = BytesMut::with_capacity(10);
1970        buf.put(&b"aaa"[..]);
1971        tx.send(SegmentDataBuffer {
1972            segment: segment.clone(),
1973            offset_in_segment: 0,
1974            value: buf,
1975        })
1976        .await
1977        .unwrap();
1978    }
1979
1980    // This is a test function to generate single events of varying sizes
1981    async fn generate_multiple_variable_sized_events(tx: Sender<SegmentDataBuffer>) {
1982        for i in 1..11 {
1983            let mut buf = BytesMut::with_capacity(32);
1984            buf.put_i32(i); // length.
1985            for _ in 0..i {
1986                buf.put(&b"a"[..]);
1987            }
1988            if let Err(_) = tx
1989                .send(SegmentDataBuffer {
1990                    segment: ScopedSegment::from("test/test/123").to_string(),
1991                    offset_in_segment: 0,
1992                    value: buf,
1993                })
1994                .await
1995            {
1996                warn!("receiver dropped");
1997                return;
1998            }
1999        }
2000    }
2001
2002    // This method reads the header and returns a BytesMut whose size is as big as the event.
2003    fn custom_read_header(data: &mut SegmentDataBuffer) -> Option<SegmentDataBuffer> {
2004        if data.value.remaining() >= 4 {
2005            let mut temp = data.value.bytes();
2006            let len = temp.get_i32();
2007            Some(SegmentDataBuffer {
2008                segment: data.segment.clone(),
2009                offset_in_segment: 0,
2010                value: BytesMut::with_capacity(len as usize),
2011            })
2012        } else {
2013            None
2014        }
2015    }
2016
2017    fn read_n_events(slice: &mut SegmentSlice, events_to_read: usize) {
2018        let mut event_count = 0;
2019        loop {
2020            if event_count == events_to_read {
2021                break;
2022            }
2023            if let Some(event) = slice.next() {
2024                println!("Read event {:?}", event);
2025                assert!(is_all_same(event.value.as_slice()), "Event has been corrupted");
2026                event_count += 1;
2027            } else {
2028                println!(
2029                    "Finished reading from segment {:?}, segment is auto released",
2030                    slice.meta.scoped_segment
2031                );
2032                break;
2033            }
2034        }
2035    }
2036
2037    // Helper method to update slice meta
2038    fn create_slice_map(init_segments: Vec<SegmentSlice>) -> HashMap<ScopedSegment, SliceMetadata> {
2039        let mut map = HashMap::with_capacity(init_segments.len());
2040        for s in init_segments {
2041            map.insert(
2042                ScopedSegment::from(s.meta.scoped_segment.clone().as_str()),
2043                s.meta.clone(),
2044            );
2045        }
2046        map
2047    }
2048
2049    fn get_scoped_stream(scope: &str, stream: &str) -> ScopedStream {
2050        let stream: ScopedStream = ScopedStream {
2051            scope: Scope {
2052                name: scope.to_string(),
2053            },
2054            stream: Stream {
2055                name: stream.to_string(),
2056            },
2057        };
2058        stream
2059    }
2060
2061    // Generate events to simulate Pravega SegmentReadCommand.
2062    async fn generate_constant_size_events(
2063        tx: Sender<SegmentReadResult>,
2064        buf_size: usize,
2065        num_events: usize,
2066        segment_id: usize,
2067        should_delay: bool,
2068        mut stop_generation: oneshot::Receiver<()>,
2069    ) {
2070        let mut segment_name = "scope/test/".to_owned();
2071        segment_name.push_str(segment_id.to_string().as_ref());
2072        let mut buf = BytesMut::with_capacity(buf_size);
2073        let mut offset: i64 = 0;
2074        for _i in 1..num_events + 1 {
2075            if let Ok(_) | Err(TryRecvError::Closed) = stop_generation.try_recv() {
2076                break;
2077            }
2078            let mut data = generate_event_data(1); // constant event data.
2079            if data.len() < buf.capacity() - buf.len() {
2080                buf.put(data);
2081            } else {
2082                while data.len() > 0 {
2083                    let free_space = buf.capacity() - buf.len();
2084                    if free_space == 0 {
2085                        if should_delay {
2086                            sleep(Duration::from_millis(100)).await;
2087                        }
2088                        tx.send(Ok(SegmentDataBuffer {
2089                            segment: ScopedSegment::from(segment_name.as_str()).to_string(),
2090                            offset_in_segment: offset,
2091                            value: buf,
2092                        }))
2093                        .await
2094                        .unwrap();
2095                        offset += buf_size as i64;
2096                        buf = BytesMut::with_capacity(buf_size);
2097                    } else if free_space >= data.len() {
2098                        buf.put(data.split());
2099                    } else {
2100                        buf.put(data.split_to(free_space));
2101                    }
2102                }
2103            }
2104        }
2105        // send the last event.
2106        tx.send(Ok(SegmentDataBuffer {
2107            segment: ScopedSegment::from(segment_name.as_str()).to_string(),
2108            offset_in_segment: offset,
2109            value: buf,
2110        }))
2111        .await
2112        .unwrap();
2113    }
2114
2115    // Generate events to simulate Pravega SegmentReadCommand.
2116    async fn generate_variable_size_events(
2117        tx: Sender<SegmentReadResult>,
2118        buf_size: usize,
2119        num_events: usize,
2120        segment_id: usize,
2121        should_delay: bool,
2122    ) {
2123        let mut segment_name = "scope/test/".to_owned();
2124        segment_name.push_str(segment_id.to_string().as_ref());
2125        segment_name.push_str(".#epoch.0");
2126        let mut buf = BytesMut::with_capacity(buf_size);
2127        let mut offset: i64 = 0;
2128        for i in 1..num_events + 1 {
2129            let mut data = generate_event_data(i);
2130            if data.len() < buf.capacity() - buf.len() {
2131                buf.put(data);
2132            } else {
2133                while data.len() > 0 {
2134                    let free_space = buf.capacity() - buf.len();
2135                    if free_space == 0 {
2136                        if should_delay {
2137                            sleep(Duration::from_millis(100)).await;
2138                        }
2139                        tx.send(Ok(SegmentDataBuffer {
2140                            segment: ScopedSegment::from(segment_name.as_str()).to_string(),
2141                            offset_in_segment: offset,
2142                            value: buf,
2143                        }))
2144                        .await
2145                        .unwrap();
2146                        offset += buf_size as i64;
2147                        buf = BytesMut::with_capacity(buf_size);
2148                    } else if free_space >= data.len() {
2149                        buf.put(data.split());
2150                    } else {
2151                        buf.put(data.split_to(free_space));
2152                    }
2153                }
2154            }
2155        }
2156        // send the last event.
2157        tx.send(Ok(SegmentDataBuffer {
2158            segment: ScopedSegment::from(segment_name.as_str()).to_string(),
2159            offset_in_segment: offset,
2160            value: buf,
2161        }))
2162        .await
2163        .unwrap();
2164    }
2165
2166    // create a segment slice object without spawning a background task for testing
2167    fn create_segment_slice(segment_id: i64) -> SegmentSlice {
2168        let mut segment_name = "scope/test/".to_owned();
2169        segment_name.push_str(segment_id.to_string().as_ref());
2170        let segment = ScopedSegment::from(segment_name.as_str());
2171        let segment_slice = SegmentSlice {
2172            meta: SliceMetadata {
2173                start_offset: 0,
2174                scoped_segment: segment.to_string(),
2175                last_event_offset: 0,
2176                read_offset: 0,
2177                end_offset: i64::MAX,
2178                segment_data: SegmentDataBuffer::empty(),
2179                partial_data_present: false,
2180            },
2181            slice_return_tx: None,
2182        };
2183        segment_slice
2184    }
2185
2186    // Helper method to verify if the bytes read by Segment slice are the same.
2187    fn is_all_same<T: Eq>(slice: &[T]) -> bool {
2188        slice
2189            .get(0)
2190            .map(|first| slice.iter().all(|x| x == first))
2191            .unwrap_or(true)
2192    }
2193}