1use crate::client_factory::ClientFactoryAsync;
12use crate::event::reader_group_state::ReaderGroupStateError::SyncError;
13use crate::event::reader_group_state::{Offset, ReaderGroupStateError};
14use crate::segment::reader::ReaderError::SegmentSealed;
15use crate::segment::reader::{AsyncSegmentReader, ReaderError};
16use snafu::{ResultExt, Snafu};
17
18use pravega_client_retry::retry_result::Retryable;
19use pravega_client_shared::{Reader, ScopedSegment, Segment, SegmentWithRange};
20use pravega_wire_protocol::commands::{Command, EventCommand, TYPE_PLUS_LENGTH_SIZE};
21
22use crate::sync::synchronizer::SynchronizerError;
23use bytes::{Buf, BufMut, BytesMut};
24use core::fmt;
25use im::HashMap as ImHashMap;
26use std::collections::{HashMap, HashSet};
27use std::mem;
28use std::sync::Arc;
29use std::time::{Duration, Instant};
30use tokio::runtime::Handle;
31use tokio::sync::mpsc::{Receiver, Sender};
32use tokio::sync::oneshot;
33use tokio::sync::oneshot::error::TryRecvError;
34use tokio::sync::{mpsc, Mutex};
35use tokio::time::timeout;
36use tracing::{debug, error, info, warn};
37
/// A reader error paired with the segment offset at which it was observed.
type ReaderErrorWithOffset = (ReaderError, i64);
/// What a background segment read task sends to the reader: either a buffer of
/// segment data or an error together with the offset of the failed read.
type SegmentReadResult = Result<SegmentDataBuffer, ReaderErrorWithOffset>;

/// Time that must elapse since the last segment release/acquire before the
/// reader asks the reader group state whether segments should be rebalanced.
const REBALANCE_INTERVAL: Duration = Duration::from_secs(10);

/// Time that must elapse since the last position update before the reader
/// writes its current read offsets to the reader group state again.
const UPDATE_OFFSET_INTERVAL: Duration = Duration::from_secs(3);

/// Length, in bytes, requested by every read issued against a segment.
const READ_BUFFER_SIZE: i32 = 8 * 1024 * 1024;

// Unit tests are compiled against the mocked reader group state; every other
// build uses the real implementation under the same `ReaderGroupState` name.
cfg_if::cfg_if! {
    if #[cfg(test)] {
        use crate::event::reader_group_state::MockReaderGroupState as ReaderGroupState;
    } else {
        use crate::event::reader_group_state::ReaderGroupState;
    }
}
54
/// Reads events from the segments assigned to it by a reader group.
///
/// Data is fetched by background tasks (one per owned segment) that push their
/// results into a shared channel; `acquire_segment` hands the data out as
/// `SegmentSlice`s and `release_segment` / `release_segment_at` take them back.
pub struct EventReader {
    /// Identifier of this reader within the reader group.
    pub id: Reader,
    /// Factory used to spawn segment read tasks and to reach the controller.
    factory: ClientFactoryAsync,
    /// Receiving end of the channel fed by the segment read tasks.
    rx: Receiver<SegmentReadResult>,
    /// Sending end of the same channel; cloned into every spawned read task.
    tx: Sender<SegmentReadResult>,
    /// Local bookkeeping: owned slices, slices handed out, stop/return channels.
    meta: ReaderState,
    /// Reader group state shared with the other users of the group.
    rg_state: Arc<Mutex<ReaderGroupState>>,
}
116
/// Errors returned by the public `EventReader` operations.
#[derive(Debug, Snafu)]
pub enum EventReaderError {
    /// An operation on the reader group state failed, or the reader has
    /// already been marked offline.
    #[snafu(display("ReaderGroup State error: {}", source))]
    StateError { source: ReaderGroupStateError },
}
122
123impl Drop for EventReader {
124 fn drop(&mut self) {
126 info!("Reader {:?} is dropped", self.id);
127 let r = Handle::try_current();
129 let rg_state = self.rg_state.clone();
130 let id = self.id.clone();
131 let mut meta = mem::take(&mut self.meta);
132 match r {
133 Ok(handle) => {
134 let _ = handle.enter();
136 tokio::spawn(async move {
138 EventReader::reader_offline_internal(id, rg_state, &mut meta)
139 .await
140 .expect("Reader Offline");
141 });
142 info!("Reader {:?} is marked as offline.", self.id);
143 }
144 Err(_) => {
145 let rt = tokio::runtime::Runtime::new().expect("Create tokio runtime to drop reader");
147 rt.spawn(async move {
148 EventReader::reader_offline_internal(id, rg_state, &mut meta)
149 .await
150 .expect("Reader Offline");
151 });
152 info!("Reader {:?} is marked as offline.", self.id);
153 }
154 }
155 }
156}
157
158impl EventReader {
159 pub(crate) async fn init_reader(
162 id: String,
163 rg_state: Arc<Mutex<ReaderGroupState>>,
164 factory: ClientFactoryAsync,
165 ) -> Self {
166 let reader = Reader::from(id);
167 let new_segments_to_acquire = rg_state
168 .lock()
169 .await
170 .compute_segments_to_acquire_or_release(&reader)
171 .await
172 .expect("should compute segments");
173 if new_segments_to_acquire > 0 {
175 for _ in 0..new_segments_to_acquire {
176 if let Some(seg) = rg_state
177 .lock()
178 .await
179 .assign_segment_to_reader(&reader)
180 .await
181 .expect("Error while waiting for segments to be assigned")
182 {
183 debug!("Acquiring segment {:?} for reader {:?}", seg, reader);
184 } else {
185 debug!(
187 "No unassigned segments that can be acquired by the reader {:?}",
188 reader
189 );
190 break;
191 }
192 }
193 }
194 let mut assigned_segments = rg_state
196 .lock()
197 .await
198 .get_segments_for_reader(&reader)
199 .await
200 .expect("Error while fetching currently assigned segments");
201
202 let mut slice_meta_map: HashMap<ScopedSegment, SliceMetadata> = HashMap::new();
203 slice_meta_map.extend(assigned_segments.drain().map(|(seg, offset)| {
204 (
205 seg.clone(),
206 SliceMetadata {
207 scoped_segment: seg.to_string(),
208 start_offset: offset.read,
209 read_offset: offset.read,
210 ..Default::default()
211 },
212 )
213 }));
214
215 let (tx, rx) = mpsc::channel(1);
216 let mut stop_reading_map: HashMap<ScopedSegment, oneshot::Sender<()>> = HashMap::new();
217 slice_meta_map.iter().for_each(|(segment, meta)| {
219 let (tx_stop, rx_stop) = oneshot::channel();
220 stop_reading_map.insert(segment.clone(), tx_stop);
221 factory.runtime_handle().spawn(SegmentSlice::get_segment_data(
222 segment.clone(),
223 meta.start_offset,
224 tx.clone(),
225 rx_stop,
226 factory.clone(),
227 ));
228 });
229
230 EventReader::init_event_reader(
232 rg_state,
233 reader,
234 factory,
235 tx,
236 rx,
237 slice_meta_map,
238 stop_reading_map,
239 )
240 }
241
242 #[doc(hidden)]
243 fn init_event_reader(
244 rg_state: Arc<Mutex<ReaderGroupState>>,
245 id: Reader,
246 factory: ClientFactoryAsync,
247 tx: Sender<SegmentReadResult>,
248 rx: Receiver<SegmentReadResult>,
249 segment_slice_map: HashMap<ScopedSegment, SliceMetadata>,
250 slice_stop_reading: HashMap<ScopedSegment, oneshot::Sender<()>>,
251 ) -> Self {
252 EventReader {
253 id,
254 factory,
255 rx,
256 tx,
257 meta: ReaderState {
258 slices: segment_slice_map,
259 slices_dished_out: Default::default(),
260 slice_release_receiver: HashMap::new(),
261 slice_stop_reading,
262 last_segment_release: Instant::now(),
263 last_segment_acquire: Instant::now(),
264 last_offset_update: Instant::now(),
265 reader_offline: false,
266 },
267 rg_state,
268 }
269 }
270
/// Overrides the timestamps of the last segment acquire and release.
/// Only compiled with the `integration-test` feature.
#[doc(hidden)]
#[cfg(feature = "integration-test")]
pub fn set_last_acquire_release_time(&mut self, time: Instant) {
    let state = &mut self.meta;
    state.last_segment_release = time;
    state.last_segment_acquire = time;
}
278
279 pub async fn release_segment(&mut self, mut slice: SegmentSlice) -> Result<(), EventReaderError> {
285 info!(
286 "releasing segment slice {} from reader {:?}",
287 slice.meta.scoped_segment, self.id
288 );
289 if self.meta.reader_offline {
291 return Err(EventReaderError::StateError {
292 source: ReaderGroupStateError::ReaderAlreadyOfflineError {
293 error_msg: format!("Reader already marked offline {:?}", self.id),
294 source: SynchronizerError::SyncPreconditionError {
295 error_msg: String::from("Precondition failure"),
296 },
297 },
298 });
299 }
300 let scoped_segment = ScopedSegment::from(slice.meta.scoped_segment.clone().as_str());
302 self.meta.add_slices(slice.meta.clone());
303 self.meta.slices_dished_out.remove(&scoped_segment);
304 if self.meta.last_segment_release.elapsed() > REBALANCE_INTERVAL {
305 debug!("try to rebalance segments across readers");
306 let read_offset = slice.meta.read_offset;
307 self.release_segment_from_reader(slice, read_offset).await?;
309 self.meta.last_segment_release = Instant::now();
310 } else {
311 debug!(" slice return to rx success {:?} ", slice.meta);
313 if let Some(tx) = slice.slice_return_tx.take() {
314 if let Err(_e) = tx.send(Some(slice.meta.clone())) {
315 warn!(
316 "Failed to send segment slice release data for slice {:?}",
317 slice.meta
318 );
319 }
320 } else {
321 panic!("This is unexpected, No sender for SegmentSlice present.");
322 }
323 }
324
325 if self.meta.last_offset_update.elapsed() > UPDATE_OFFSET_INTERVAL {
327 let mut offset_map: HashMap<ScopedSegment, Offset> = HashMap::new();
328 for metadata in self.meta.slices.values() {
329 offset_map.insert(
330 ScopedSegment::from(metadata.scoped_segment.as_str()),
331 Offset::new(metadata.read_offset),
332 );
333 }
334 debug!(
335 " update reader position {:?} for reader {:?} ",
336 offset_map, self.id
337 );
338 self.rg_state
339 .lock()
340 .await
341 .update_reader_positions(&self.id, offset_map)
342 .await
343 .context(StateError {})?;
344
345 self.meta.last_offset_update = Instant::now();
346 }
347 Ok(())
348 }
349
350 pub async fn release_segment_at(
356 &mut self,
357 slice: SegmentSlice,
358 offset: i64,
359 ) -> Result<(), EventReaderError> {
360 info!(
361 "releasing segment slice {} at offset {}",
362 slice.meta.scoped_segment, offset
363 );
364 assert!(
365 offset >= 0,
366 "the offset where the segment slice is released should be a positive number"
367 );
368 assert!(
369 slice.meta.start_offset <= offset,
370 "the offset where the segment slice is released should be greater than the start offset"
371 );
372 assert!(
373 slice.meta.end_offset >= offset,
374 "the offset where the segment slice is released should be less than the end offset"
375 );
376 if self.meta.reader_offline {
377 return Err(EventReaderError::StateError {
378 source: ReaderGroupStateError::ReaderAlreadyOfflineError {
379 error_msg: format!("Reader already marked offline {:?}", self.id),
380 source: SynchronizerError::SyncPreconditionError {
381 error_msg: String::from("Precondition failure"),
382 },
383 },
384 });
385 }
386 let segment = ScopedSegment::from(slice.meta.scoped_segment.as_str());
387 if slice.meta.read_offset != offset {
388 self.meta.stop_reading(&segment);
389
390 let slice_meta = SliceMetadata {
391 start_offset: slice.meta.read_offset,
392 scoped_segment: slice.meta.scoped_segment.clone(),
393 last_event_offset: slice.meta.last_event_offset,
394 read_offset: offset,
395 end_offset: slice.meta.end_offset,
396 segment_data: SegmentDataBuffer::empty(),
397 partial_data_present: false,
398 };
399
400 let (tx_drop_fetch, rx_drop_fetch) = oneshot::channel();
402 tokio::spawn(SegmentSlice::get_segment_data(
403 segment.clone(),
404 slice_meta.read_offset, self.tx.clone(),
406 rx_drop_fetch,
407 self.factory.clone(),
408 ));
409 self.meta.add_stop_reading_tx(segment.clone(), tx_drop_fetch);
410 self.meta.add_slices(slice_meta);
411 self.meta.slices_dished_out.remove(&segment);
412 } else {
413 self.release_segment(slice).await?;
414 }
415 Ok(())
416 }
417
418 async fn reader_offline_internal(
425 reader_id: Reader,
426 rg_state: Arc<Mutex<ReaderGroupState>>,
427 meta: &mut ReaderState,
428 ) -> Result<(), EventReaderError> {
429 if !meta.reader_offline && rg_state.lock().await.check_online(&reader_id).await {
430 info!("static Putting reader {:?} offline", reader_id);
431 meta.stop_reading_all();
433 meta.close_all_slice_return_channel();
435 let mut offset_map: HashMap<ScopedSegment, Offset> = HashMap::new();
437 for (seg, slice_meta) in meta.slices_dished_out.drain() {
438 offset_map.insert(seg, Offset::new(slice_meta.read_offset));
439 }
440 for meta in meta.slices.values() {
441 offset_map.insert(
442 ScopedSegment::from(meta.scoped_segment.as_str()),
443 Offset::new(meta.read_offset),
444 );
445 }
446
447 match rg_state.lock().await.remove_reader(&reader_id, offset_map).await {
448 Ok(()) => {
449 meta.reader_offline = true;
450 Ok(())
451 }
452 Err(e) => match e {
453 ReaderGroupStateError::ReaderAlreadyOfflineError { .. } => {
454 meta.reader_offline = true;
455 info!("staticReader {:?} is already offline", reader_id);
456 Ok(())
457 }
458 state_err => Err(EventReaderError::StateError { source: state_err }),
459 },
460 }?
461 }
462 Ok(())
463 }
464
465 pub async fn reader_offline(&mut self) -> Result<(), EventReaderError> {
470 let rg_state = self.rg_state.clone();
471 let id = self.id.clone();
472 let mut meta = mem::take(&mut self.meta);
473 Self::reader_offline_internal(id, rg_state, &mut meta).await
474 }
475
476 async fn release_segment_from_reader(
479 &mut self,
480 mut slice: SegmentSlice,
481 read_offset: i64,
482 ) -> Result<(), EventReaderError> {
483 if self.meta.reader_offline {
484 return Err(EventReaderError::StateError {
485 source: ReaderGroupStateError::ReaderAlreadyOfflineError {
486 error_msg: format!("Reader already marked offline {:?}", self.id),
487 source: SynchronizerError::SyncPreconditionError {
488 error_msg: String::from("Precondition failure"),
489 },
490 },
491 });
492 }
493 let new_segments_to_release = self
494 .rg_state
495 .lock()
496 .await
497 .compute_segments_to_acquire_or_release(&self.id)
498 .await
499 .map_err(|err| EventReaderError::StateError { source: err })?;
500 let segment = ScopedSegment::from(slice.meta.scoped_segment.as_str());
501 if new_segments_to_release < 0 {
503 self.meta.stop_reading(&segment);
505 self.meta
506 .slices
507 .remove(&segment)
508 .expect("Segment missing in meta while releasing from reader");
509 if let Some(tx) = slice.slice_return_tx.take() {
511 if let Err(_e) = tx.send(None) {
512 warn!(
513 "Failed to send segment slice release data for slice {:?}",
514 slice.meta
515 );
516 }
517 } else {
518 panic!("This is unexpected, No sender for SegmentSlice present.");
519 }
520 self.rg_state
521 .lock()
522 .await
523 .release_segment(&self.id, &segment, &Offset::new(read_offset))
524 .await
525 .context(StateError {})?;
526 }
527 Ok(())
528 }
529
530 pub async fn acquire_segment(&mut self) -> Result<Option<SegmentSlice>, EventReaderError> {
542 if self.meta.reader_offline || !self.rg_state.lock().await.check_online(&self.id).await {
543 return Err(EventReaderError::StateError {
544 source: ReaderGroupStateError::ReaderAlreadyOfflineError {
545 error_msg: format!(
546 "Reader already marked offline {:?} or the ReaderGroup is deleted",
547 self.id
548 ),
549 source: SynchronizerError::SyncPreconditionError {
550 error_msg: String::from("Precondition failure"),
551 },
552 },
553 });
554 }
555 if self.meta.last_offset_update.elapsed() > UPDATE_OFFSET_INTERVAL {
557 let mut offset_map: HashMap<ScopedSegment, Offset> = HashMap::new();
558 for metadata in self.meta.slices.values() {
559 offset_map.insert(
560 ScopedSegment::from(metadata.scoped_segment.as_str()),
561 Offset::new(metadata.read_offset),
562 );
563 }
564 debug!(
565 " update reader position {:?} for reader {:?} ",
566 offset_map, self.id
567 );
568 self.rg_state
569 .lock()
570 .await
571 .update_reader_positions(&self.id, offset_map)
572 .await
573 .context(StateError {})?;
574
575 self.meta.last_offset_update = Instant::now();
576 }
577 info!("acquiring segment for reader {:?}", self.id);
578 if self.meta.last_segment_acquire.elapsed() > REBALANCE_INTERVAL {
580 info!("need to rebalance segments across readers");
581 let res = self.assign_segments_to_reader().await.context(StateError {})?;
584 if let Some(new_segments) = res {
585 let current_segments = self
588 .rg_state
589 .lock()
590 .await
591 .get_segments_for_reader(&self.id)
592 .await
593 .map_err(|e| SyncError {
594 error_msg: format!("failed to get segments for reader {:?}", self.id),
595 source: e,
596 })
597 .context(StateError {})?;
598 let new_segments: HashSet<(ScopedSegment, Offset)> = current_segments
599 .into_iter()
600 .filter(|(seg, _off)| new_segments.contains(seg))
601 .collect();
602 debug!("segments which can be read next are {:?}", new_segments);
603 self.initiate_segment_reads(new_segments);
605 self.meta.last_segment_acquire = Instant::now();
606 }
607 }
608 if let Some(segment_with_data) = self.meta.get_segment_id_with_data() {
610 info!("segment {} has data ready to read", segment_with_data);
611 let slice_meta = self.meta.slices.remove(&segment_with_data).unwrap();
612 let segment = ScopedSegment::from(slice_meta.scoped_segment.as_str());
613 let (slice_return_tx, slice_return_rx) = oneshot::channel();
615 self.meta.add_slice_release_receiver(segment, slice_return_rx);
616
617 info!(
618 "segment slice for {:?} is ready for consumption by reader {}",
619 slice_meta.scoped_segment, self.id,
620 );
621 self.meta
622 .slices_dished_out
623 .insert(segment_with_data, slice_meta.copy_meta());
624 Ok(Some(SegmentSlice {
625 meta: slice_meta,
626 slice_return_tx: Some(slice_return_tx),
627 }))
628 } else if let Ok(option) = timeout(Duration::from_millis(1000), self.rx.recv()).await {
629 if let Some(read_result) = option {
630 match read_result {
631 Ok(data) => {
633 let segment = ScopedSegment::from(data.segment.clone().as_str());
634 info!("new data fetched from server for segment {:?}", segment);
635 if let Some(mut slice_meta) = self.meta.remove_segment(segment.clone()).await {
636 if data.offset_in_segment
637 != slice_meta.read_offset + slice_meta.segment_data.value.len() as i64
638 {
639 info!("Data from an invalid offset {:?} observed. Expected offset {:?}. Ignoring this data", data.offset_in_segment, slice_meta.read_offset);
640 Ok(None)
641 } else {
642 EventReader::add_data_to_segment_slice(data, &mut slice_meta);
644
645 let (slice_return_tx, slice_return_rx) = oneshot::channel();
647 self.meta
648 .add_slice_release_receiver(segment.clone(), slice_return_rx);
649 self.meta
650 .slices_dished_out
651 .insert(segment.clone(), slice_meta.copy_meta());
652
653 info!(
654 "segment slice for {:?} is ready for consumption by reader {}",
655 slice_meta, self.id,
656 );
657
658 Ok(Some(SegmentSlice {
659 meta: slice_meta,
660 slice_return_tx: Some(slice_return_tx),
661 }))
662 }
663 } else {
664 debug!("ignore the received data since None was returned");
666 Ok(None)
667 }
668 }
669 Err((e, offset)) => {
670 let segment = ScopedSegment::from(e.get_segment().as_str());
671 debug!(
672 "Reader Error observed {:?} on segment {:?} at offset {:?} ",
673 e, segment, offset
674 );
675 if let Some(slice_meta) = self.meta.remove_segment(segment.clone()).await {
677 if slice_meta.read_offset != offset {
678 info!("Error at an invalid offset {:?} observed. Expected offset {:?}. Ignoring this data", offset, slice_meta.start_offset);
679 self.meta.add_slices(slice_meta);
680 self.meta.slices_dished_out.remove(&segment);
681 } else {
682 info!("Segment slice {:?} has received error {:?}", slice_meta, e);
683 self.fetch_successors(e).await.context(StateError {})?;
684 }
685 }
686 debug!("Segment Slice meta {:?}", self.meta.slices);
687 Ok(None)
688 }
689 }
690 } else {
691 warn!("Error getting updates from segment slice for reader {}", self.id);
692 Ok(None)
693 }
694 } else {
695 info!(
696 "reader {} owns {} slices but none is ready to read",
697 self.id,
698 self.meta.slices.len()
699 );
700 Ok(None)
701 }
702 }
703
704 async fn fetch_successors(&mut self, e: ReaderError) -> Result<(), ReaderGroupStateError> {
707 match e {
708 ReaderError::SegmentSealed {
709 segment,
710 can_retry: _,
711 operation: _,
712 error_msg: _,
713 }
714 | ReaderError::SegmentIsTruncated {
715 segment,
716 can_retry: _,
717 operation: _,
718 error_msg: _,
719 } => {
720 let completed_scoped_segment = ScopedSegment::from(segment.as_str());
721 self.meta.stop_reading(&completed_scoped_segment); let successors = self
725 .factory
726 .controller_client()
727 .get_successors(&completed_scoped_segment)
728 .await
729 .expect("Failed to fetch successors of the segment")
730 .segment_with_predecessors;
731 info!("Segment Completed {:?}", segment);
732 self.rg_state
734 .lock()
735 .await
736 .segment_completed(&self.id, &completed_scoped_segment, &successors)
737 .await?;
738 let option = self.assign_segments_to_reader().await?;
740 if let Some(new_segments) = option {
741 let current_segments = self
743 .rg_state
744 .lock()
745 .await
746 .get_segments_for_reader(&self.id)
747 .await
748 .map_err(|e| SyncError {
749 error_msg: format!("Failed to fetch segments for reader {:?}", self.id),
750 source: e,
751 })?;
752 let new_segments: HashSet<(ScopedSegment, Offset)> = current_segments
753 .into_iter()
754 .filter(|(seg, _off)| new_segments.contains(seg))
755 .collect();
756 debug!("Segments which can be read next are {:?}", new_segments);
757 self.initiate_segment_reads(new_segments);
759 }
760 }
761 _ => error!("Error observed while reading from Pravega {:?}", e),
762 };
763 Ok(())
764 }
765
766 async fn assign_segments_to_reader(&self) -> Result<Option<Vec<ScopedSegment>>, ReaderGroupStateError> {
768 let mut new_segments: Vec<ScopedSegment> = Vec::new();
769 let new_segments_to_acquire = self
770 .rg_state
771 .lock()
772 .await
773 .compute_segments_to_acquire_or_release(&self.id)
774 .await
775 .expect("should compute segments");
776 if new_segments_to_acquire <= 0 {
777 Ok(None)
778 } else {
779 for _ in 0..new_segments_to_acquire {
780 if let Some(seg) = self
781 .rg_state
782 .lock()
783 .await
784 .assign_segment_to_reader(&self.id)
785 .await?
786 {
787 debug!("Acquiring segment {:?} for reader {:?}", seg, self.id);
788 new_segments.push(seg);
789 } else {
790 break;
792 }
793 }
794 debug!("Segments acquired by reader {:?} are {:?}", self.id, new_segments);
795 Ok(Some(new_segments))
796 }
797 }
798
799 fn initiate_segment_reads(&mut self, new_segments: HashSet<(ScopedSegment, Offset)>) {
801 for (seg, offset) in new_segments {
802 let meta = SliceMetadata {
803 scoped_segment: seg.to_string(),
804 start_offset: offset.read,
805 read_offset: offset.read, ..Default::default()
807 };
808 let (tx_drop_fetch, rx_drop_fetch) = oneshot::channel();
809 tokio::spawn(SegmentSlice::get_segment_data(
810 seg.clone(),
811 meta.start_offset,
812 self.tx.clone(),
813 rx_drop_fetch,
814 self.factory.clone(),
815 ));
816 self.meta.add_stop_reading_tx(seg, tx_drop_fetch);
817 self.meta.add_slices(meta);
819 }
820 }
821
822 fn add_data_to_segment_slice(data: SegmentDataBuffer, slice: &mut SliceMetadata) {
824 if slice.segment_data.value.is_empty() {
825 slice.segment_data = data;
826 } else {
827 slice.segment_data.value.put(data.value); slice.partial_data_present = false;
829 }
830 }
831
832 async fn get_successors(
834 &mut self,
835 completed_scoped_segment: &str,
836 ) -> ImHashMap<SegmentWithRange, Vec<Segment>> {
837 let completed_scoped_segment = ScopedSegment::from(completed_scoped_segment);
838 self.factory
839 .controller_client()
840 .get_successors(&completed_scoped_segment)
841 .await
842 .expect("Failed to fetch successors of the segment")
843 .segment_with_predecessors
844 }
845}
846
/// Local bookkeeping of an `EventReader`.
struct ReaderState {
    /// Slices currently held by the reader, keyed by segment.
    slices: HashMap<ScopedSegment, SliceMetadata>,
    /// Data-less copies of the metadata of slices handed out to the caller.
    slices_dished_out: HashMap<ScopedSegment, SliceMetadata>,
    /// Receivers on which handed-out slices report their metadata when they
    /// are returned or dropped (`None` when the segment was released).
    slice_release_receiver: HashMap<ScopedSegment, oneshot::Receiver<Option<SliceMetadata>>>,
    /// Senders used to tell the background read task of a segment to stop.
    slice_stop_reading: HashMap<ScopedSegment, oneshot::Sender<()>>,
    /// When `release_segment` last attempted a rebalance.
    last_segment_release: Instant,
    /// When `acquire_segment` last acquired new segments.
    last_segment_acquire: Instant,
    /// When the read offsets were last written to the reader group state.
    last_offset_update: Instant,
    /// Set once the reader has been removed from the reader group state.
    reader_offline: bool,
}
858impl Default for ReaderState {
859 fn default() -> Self {
862 ReaderState {
863 slices: HashMap::new(),
864 slices_dished_out: HashMap::new(),
865 slice_release_receiver: HashMap::new(),
866 slice_stop_reading: HashMap::new(),
867 last_segment_release: Instant::now(),
868 last_segment_acquire: Instant::now(),
869 last_offset_update: Instant::now(),
870 reader_offline: false,
871 }
872 }
873}
874
875impl ReaderState {
876 fn add_slice_release_receiver(
878 &mut self,
879 scoped_segment: ScopedSegment,
880 slice_return_rx: oneshot::Receiver<Option<SliceMetadata>>,
881 ) {
882 self.slice_release_receiver
883 .insert(scoped_segment, slice_return_rx);
884 }
885
886 async fn wait_for_segment_slice_return(&mut self, segment: &ScopedSegment) -> Option<SliceMetadata> {
888 if let Some(receiver) = self.slice_release_receiver.remove(segment) {
889 match receiver.await {
890 Ok(returned_meta) => {
891 debug!("SegmentSlice returned {:?}", returned_meta);
892 returned_meta
893 }
894 Err(e) => {
895 error!(
896 "Error Segment slice was not returned for segment {:?}. Error {:?} ",
897 segment, e
898 );
899 self.slices_dished_out.remove(segment)
900 }
901 }
902 } else {
903 warn!(
904 "Invalid segment {:?} provided for while waiting for segment slice return",
905 segment
906 );
907 None
908 }
909 }
910
911 fn close_all_slice_return_channel(&mut self) {
912 for (_, mut rx) in self.slice_release_receiver.drain() {
913 rx.close();
914 }
915 }
916
917 async fn remove_segment(&mut self, segment: ScopedSegment) -> Option<SliceMetadata> {
921 match self.slices.remove(&segment) {
922 Some(meta) => {
923 debug!(
924 "Segment slice {:?} has not been dished out for consumption {:?} meta",
925 &segment, meta
926 );
927 Some(meta)
928 }
929 None => {
930 debug!(
931 "Segment slice for {:?} has already been dished out for consumption",
932 &segment
933 );
934 self.wait_for_segment_slice_return(&segment).await
935 }
936 }
937 }
938
939 fn add_slices(&mut self, meta: SliceMetadata) {
941 if self
942 .slices
943 .insert(ScopedSegment::from(meta.scoped_segment.as_str()), meta)
944 .is_some()
945 {
946 panic!("Pre-condition check failure. Segment slice already present");
947 }
948 }
949
950 fn add_stop_reading_tx(&mut self, segment: ScopedSegment, tx: oneshot::Sender<()>) {
952 assert!(
953 self.slice_stop_reading.insert(segment, tx).is_none(),
954 "Pre-condition check failure. Sender used to stop fetching data is already present"
955 );
956 }
957
958 fn stop_reading(&mut self, segment: &ScopedSegment) {
960 if let Some(tx) = self.slice_stop_reading.remove(segment) {
961 if tx.send(()).is_err() {
962 debug!("Channel already closed, ignoring the error");
963 }
964 }
965 }
966
967 fn stop_reading_all(&mut self) {
969 for (_, tx) in self.slice_stop_reading.drain() {
970 if tx.send(()).is_err() {
971 debug!("Channel already closed, ignoring the error");
972 }
973 }
974 }
975
976 fn get_segment_id_with_data(&self) -> Option<ScopedSegment> {
977 self.slices
978 .iter()
979 .find_map(|(k, v)| if v.has_events() { Some(k.clone()) } else { None })
980 }
981}
982
/// A single event read from a segment.
#[derive(Debug)]
pub struct Event {
    /// Offset in the segment at which the event (including its header) starts.
    pub offset_in_segment: i64,
    /// Event payload, without the type/length header.
    pub value: Vec<u8>,
}
990
/// A chunk of a segment handed out by `EventReader::acquire_segment`.
/// Iterating over it yields the events contained in its buffered data.
#[derive(Default)]
pub struct SegmentSlice {
    /// Offsets and buffered data of this slice.
    pub meta: SliceMetadata,
    /// Channel on which the slice reports its metadata back to the reader;
    /// `None` once the metadata has been sent.
    pub(crate) slice_return_tx: Option<oneshot::Sender<Option<SliceMetadata>>>,
}
998
999impl SegmentSlice {
1000 fn new(
1004 segment: ScopedSegment,
1005 start_offset: i64,
1006 slice_return_tx: oneshot::Sender<Option<SliceMetadata>>,
1007 ) -> Self {
1008 SegmentSlice {
1009 meta: SliceMetadata {
1010 start_offset,
1011 scoped_segment: segment.to_string(),
1012 last_event_offset: 0,
1013 read_offset: start_offset,
1014 end_offset: i64::MAX,
1015 segment_data: SegmentDataBuffer::empty(),
1016 partial_data_present: false,
1017 },
1018 slice_return_tx: Some(slice_return_tx),
1019 }
1020 }
1021
1022 async fn get_segment_data(
1024 segment: ScopedSegment,
1025 start_offset: i64,
1026 tx: Sender<SegmentReadResult>,
1027 mut drop_fetch: oneshot::Receiver<()>,
1028 factory: ClientFactoryAsync,
1029 ) {
1030 let mut offset: i64 = start_offset;
1031 let segment_reader = factory.create_async_segment_reader(segment.clone()).await;
1032 loop {
1033 if let Ok(_) | Err(TryRecvError::Closed) = drop_fetch.try_recv() {
1034 info!("Stop reading from the segment");
1035 break;
1036 }
1037 debug!(
1038 "Send read request to Segment store at offset {:?} with length {:?}",
1039 offset, READ_BUFFER_SIZE
1040 );
1041 let read = segment_reader.read(offset, READ_BUFFER_SIZE).await;
1042 match read {
1043 Ok(reply) => {
1044 let len = reply.data.len();
1045 debug!("read data length of {}", len);
1046 if len == 0 && reply.end_of_segment {
1047 info!("Reached end of segment {:?} during read ", segment.clone());
1048 let data = SegmentSealed {
1049 segment: segment.to_string(),
1050 can_retry: false,
1051 operation: "read segment".to_string(),
1052 error_msg: "reached the end of stream".to_string(),
1053 };
1054 if let Err(e) = tx.send(Err((data, offset))).await {
1056 warn!("Error while sending segment data to event parser {:?} ", e);
1057 break;
1058 }
1059 drop(tx);
1060 break;
1061 } else {
1062 let segment_data = bytes::BytesMut::from(reply.data.as_slice());
1063 let data = SegmentDataBuffer {
1064 segment: segment.to_string(),
1065 offset_in_segment: offset,
1066 value: segment_data,
1067 };
1068 if let Err(e) = tx.send(Ok(data)).await {
1070 info!("Error while sending segment data to event parser {:?} ", e);
1071 break;
1072 }
1073 offset += len as i64;
1074 }
1075 }
1076 Err(e) => {
1077 warn!("Error while reading from segment {:?}", e);
1078 if !e.can_retry() {
1079 let _s = tx.send(Err((e, offset))).await;
1080 break;
1081 }
1082 }
1083 }
1084 }
1085 }
1086
/// Returns the offset in the segment at which this slice starts.
fn get_starting_offset(&self) -> i64 {
    self.meta.start_offset
}
1091
/// Returns a copy of the scoped segment name this slice belongs to.
fn get_segment(&self) -> String {
    self.meta.scoped_segment.clone()
}
1097
1098 fn extract_event(
1102 &mut self,
1103 parse_header: fn(&mut SegmentDataBuffer) -> Option<SegmentDataBuffer>,
1104 ) -> Option<Event> {
1105 if let Some(mut event_data) = parse_header(&mut self.meta.segment_data) {
1106 let bytes_to_read = event_data.value.capacity();
1107 if bytes_to_read == 0 {
1108 warn!("Found a header with length as zero");
1109 return None;
1110 }
1111 if self.meta.segment_data.value.remaining() >= bytes_to_read + TYPE_PLUS_LENGTH_SIZE as usize {
1112 self.meta.segment_data.advance(TYPE_PLUS_LENGTH_SIZE as usize);
1113 let t = self.meta.segment_data.split_to(bytes_to_read);
1115 event_data.value.put(t.value);
1116 info!("extract event data with length {}", event_data.value.len());
1117 let event = Event {
1119 offset_in_segment: event_data.offset_in_segment,
1120 value: event_data.value.freeze().to_vec(),
1121 };
1122 Some(event)
1123 } else {
1124 debug!(
1126 "partial event read: data read length {}, target read length {}",
1127 event_data.value.len(),
1128 event_data.value.capacity()
1129 );
1130 self.meta.partial_data_present = true;
1131 None
1132 }
1133 } else {
1134 self.meta.partial_data_present = true;
1135 None
1136 }
1137 }
1138
1139 fn read_header(data: &mut SegmentDataBuffer) -> Option<SegmentDataBuffer> {
1142 if data.value.len() >= TYPE_PLUS_LENGTH_SIZE as usize {
1143 let event_offset = data.offset_in_segment;
1144 let mut bytes_temp = data.value.bytes();
1146 let type_code = bytes_temp.get_i32();
1147 let len = bytes_temp.get_i32();
1148 assert_eq!(type_code, EventCommand::TYPE_CODE, "Expected EventCommand here.");
1149 debug!("Event size is {}", len);
1150 Some(SegmentDataBuffer {
1151 segment: data.segment.clone(),
1152 offset_in_segment: event_offset,
1153 value: BytesMut::with_capacity(len as usize),
1154 })
1155 } else {
1156 None
1157 }
1158 }
1159
/// Returns `true` when the slice has no buffered data or when the buffered
/// data has been flagged as partial.
pub fn is_empty(&self) -> bool {
    self.meta.segment_data.value.is_empty() || self.meta.partial_data_present
}
1163}
1164
1165impl Iterator for SegmentSlice {
1166 type Item = Event;
1167
1168 fn next(&mut self) -> Option<Self::Item> {
1169 let res = self.extract_event(SegmentSlice::read_header);
1171
1172 match res {
1173 Some(event) => {
1174 self.meta.last_event_offset = event.offset_in_segment;
1175 self.meta.read_offset =
1176 event.offset_in_segment + event.value.len() as i64 + TYPE_PLUS_LENGTH_SIZE as i64;
1177 if !self.meta.is_empty() {
1178 assert_eq!(
1179 self.meta.read_offset, self.meta.segment_data.offset_in_segment,
1180 "Error in offset computation"
1181 );
1182 }
1183 Some(event)
1184 }
1185 None => {
1186 if self.meta.is_empty() {
1187 info!(
1188 "Finished reading events from the segment slice of {:?}",
1189 self.meta.scoped_segment
1190 );
1191 } else {
1192 info!("Partial event present in the segment slice of {:?}, this will be returned post a new read request", self.meta.scoped_segment);
1193 }
1194 None
1195 }
1196 }
1197 }
1198}
1199
/// Debug output shows only the slice metadata; the return sender is omitted.
impl fmt::Debug for SegmentSlice {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("SegmentSlice").field("meta", &self.meta).finish()
    }
}
1205
/// A slice that is dropped while it still holds its return sender reports a
/// copy of its metadata on that channel; a failed send is ignored.
impl Drop for SegmentSlice {
    fn drop(&mut self) {
        // `take` leaves `None` behind, so the metadata is sent at most once.
        if let Some(sender) = self.slice_return_tx.take() {
            let _ = sender.send(Some(self.meta.clone()));
        }
    }
}
1214
/// Offsets and buffered data describing a segment slice.
#[derive(Clone)]
pub struct SliceMetadata {
    /// Offset in the segment at which the slice starts.
    pub start_offset: i64,
    /// Scoped name of the segment, in string form.
    pub scoped_segment: String,
    /// Offset of the event most recently returned by the slice iterator.
    pub last_event_offset: i64,
    /// Offset of the next byte to be consumed from the segment.
    pub read_offset: i64,
    /// Upper bound for offsets in this slice (`i64::MAX` by default).
    pub end_offset: i64,
    /// Data read from the segment that has not been consumed yet.
    segment_data: SegmentDataBuffer,
    /// Set by `extract_event` when no complete event could be cut from the
    /// buffered data.
    pub partial_data_present: bool,
}
1226
1227impl SliceMetadata {
1228 fn is_empty(&self) -> bool {
1230 self.segment_data.value.is_empty()
1231 }
1232
1233 pub fn has_events(&self) -> bool {
1235 !self.partial_data_present && self.segment_data.value.len() > TYPE_PLUS_LENGTH_SIZE as usize
1236 }
1237
1238 fn copy_meta(&self) -> SliceMetadata {
1239 SliceMetadata {
1240 start_offset: self.start_offset,
1241 scoped_segment: self.scoped_segment.clone(),
1242 last_event_offset: self.last_event_offset,
1243 read_offset: self.read_offset,
1244 end_offset: self.end_offset,
1245 segment_data: SegmentDataBuffer::empty(),
1246 partial_data_present: false,
1247 }
1248 }
1249}
1250
1251impl fmt::Debug for SliceMetadata {
1252 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1253 f.debug_struct("SliceMetadata")
1254 .field("start_offset", &self.start_offset)
1255 .field("scoped_segment", &self.scoped_segment)
1256 .field("last_event_offset", &self.last_event_offset)
1257 .field("read_offset", &self.read_offset)
1258 .field("end_offset", &self.end_offset)
1259 .field("partial_data_present", &self.partial_data_present)
1260 .finish()
1261 }
1262}
1263
1264impl Default for SliceMetadata {
1265 fn default() -> Self {
1266 SliceMetadata {
1267 start_offset: Default::default(),
1268 scoped_segment: Default::default(),
1269 last_event_offset: Default::default(),
1270 read_offset: Default::default(),
1271 end_offset: i64::MAX,
1272 segment_data: SegmentDataBuffer::empty(),
1273 partial_data_present: false,
1274 }
1275 }
1276}
1277
/// A chunk of segment bytes tagged with the segment name and an offset.
#[derive(Clone)]
struct SegmentDataBuffer {
    /// Name of the segment the bytes belong to.
    pub(crate) segment: String,
    /// Offset associated with `value`. The methods of this type that remove
    /// bytes from the front of `value` advance this offset by the same count.
    pub(crate) offset_in_segment: i64,
    /// The buffered bytes.
    pub(crate) value: BytesMut,
}
1285
1286impl fmt::Debug for SegmentDataBuffer {
1287 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1288 f.debug_struct("SegmentDataBuffer")
1289 .field("segment", &self.segment)
1290 .field("offset in segment", &self.offset_in_segment)
1291 .field("buffer length", &self.value.len())
1292 .finish()
1293 }
1294}
1295
1296impl SegmentDataBuffer {
1297 pub fn split(&mut self) -> SegmentDataBuffer {
1301 let res = self.value.split();
1302 let old_offset = self.offset_in_segment;
1303 let new_offset = old_offset + res.len() as i64;
1304 self.offset_in_segment = new_offset;
1305 SegmentDataBuffer {
1306 segment: self.segment.clone(),
1307 offset_in_segment: old_offset,
1308 value: res,
1309 }
1310 }
1311
1312 pub fn split_to(&mut self, at: usize) -> SegmentDataBuffer {
1323 let old_offset = self.offset_in_segment;
1324 let res = self.value.split_to(at);
1325 self.offset_in_segment = old_offset + at as i64;
1326
1327 SegmentDataBuffer {
1328 segment: self.segment.clone(),
1329 offset_in_segment: old_offset,
1330 value: res,
1331 }
1332 }
1333
1334 pub fn get_i32(&mut self) -> i32 {
1342 let result = self.value.get_i32();
1343 self.offset_in_segment += 4;
1344 result
1345 }
1346
1347 pub fn advance(&mut self, cnt: usize) {
1349 self.value.advance(cnt);
1350 self.offset_in_segment += cnt as i64;
1351 }
1352
1353 pub fn empty() -> SegmentDataBuffer {
1355 SegmentDataBuffer {
1356 segment: Default::default(),
1357 offset_in_segment: 0,
1358 value: BytesMut::with_capacity(0),
1359 }
1360 }
1361}
1362
1363#[cfg(test)]
1364mod tests {
1365 use super::*;
1366 use crate::client_factory::ClientFactory;
1367 use crate::event::reader_group_state::ReaderGroupStateError;
1368 use crate::sync::synchronizer::SynchronizerError;
1369
1370 use bytes::{Buf, BufMut, BytesMut};
1371 use mockall::predicate;
1372 use mockall::predicate::*;
1373 use pravega_client_config::{ClientConfigBuilder, MOCK_CONTROLLER_URI};
1374 use pravega_client_shared::{Reader, Scope, ScopedSegment, ScopedStream, Stream};
1375 use pravega_wire_protocol::commands::{Command, EventCommand};
1376 use std::collections::HashMap;
1377 use std::iter;
1378 use std::sync::Arc;
1379 use tokio::sync::mpsc::Sender;
1380 use tokio::sync::oneshot;
1381 use tokio::sync::oneshot::error::TryRecvError;
1382 use tokio::sync::{mpsc, Mutex};
1383 use tokio::time::{sleep, Duration};
1384 use tracing::Level;
1385
1386 #[test]
1388 fn test_read_events_single_segment() {
1389 const NUM_EVENTS: usize = 100;
1390 let (tx, rx) = mpsc::channel(1);
1391 tracing_subscriber::fmt().with_max_level(Level::TRACE).finish();
1392 let cf = ClientFactory::new(
1393 ClientConfigBuilder::default()
1394 .controller_uri(MOCK_CONTROLLER_URI)
1395 .build()
1396 .unwrap(),
1397 );
1398
1399 let _guard = cf.runtime().enter();
1401 tokio::spawn(generate_variable_size_events(
1402 tx.clone(),
1403 10,
1404 NUM_EVENTS,
1405 0,
1406 false,
1407 ));
1408
1409 let init_segments = vec![create_segment_slice(0), create_segment_slice(1)];
1411 let mut rg_mock: ReaderGroupState = ReaderGroupState::default();
1412 rg_mock.expect_check_online().return_const(true);
1413 rg_mock
1414 .expect_compute_segments_to_acquire_or_release()
1415 .return_once(move |_| Ok(0 as isize));
1416 rg_mock.expect_remove_reader().return_once(move |_, _| Ok(()));
1417 let mut reader = EventReader::init_event_reader(
1419 Arc::new(Mutex::new(rg_mock)),
1420 Reader::from("r1".to_string()),
1421 cf.to_async(),
1422 tx.clone(),
1423 rx,
1424 create_slice_map(init_segments),
1425 HashMap::new(),
1426 );
1427
1428 let mut event_count = 0;
1429 let mut event_size = 0;
1430
1431 while let Some(mut slice) = cf.runtime().block_on(reader.acquire_segment()).unwrap() {
1433 loop {
1434 if let Some(event) = slice.next() {
1435 println!("Read event {:?}", event);
1436 assert_eq!(event.value.len(), event_size + 1, "Event has been missed");
1437 assert!(is_all_same(event.value.as_slice()), "Event has been corrupted");
1438 event_size += 1;
1439 event_count += 1;
1440 } else {
1441 println!(
1442 "Finished reading from segment {:?}, segment is auto released",
1443 slice.meta.scoped_segment
1444 );
1445 break; }
1447 }
1448 if event_count == NUM_EVENTS {
1449 break;
1451 }
1452 }
1453 }
1454
1455 #[test]
1456 fn test_acquire_segments() {
1457 const NUM_EVENTS: usize = 10;
1458 let (tx, rx) = mpsc::channel(1);
1459 tracing_subscriber::fmt().with_max_level(Level::TRACE).finish();
1460 let cf = ClientFactory::new(
1461 ClientConfigBuilder::default()
1462 .controller_uri(MOCK_CONTROLLER_URI)
1463 .build()
1464 .unwrap(),
1465 );
1466
1467 let _guard = cf.runtime().enter();
1469 tokio::spawn(generate_variable_size_events(
1470 tx.clone(),
1471 1024,
1472 NUM_EVENTS,
1473 0,
1474 false,
1475 ));
1476
1477 let init_segments = vec![create_segment_slice(0)];
1479 let mut rg_mock: ReaderGroupState = ReaderGroupState::default();
1480 rg_mock
1481 .expect_compute_segments_to_acquire_or_release()
1482 .with(predicate::eq(Reader::from("r1".to_string())))
1483 .return_once(move |_| Ok(1 as isize));
1484 rg_mock.expect_remove_reader().return_once(move |_, _| Ok(()));
1485 rg_mock.expect_check_online().return_const(true);
1486 let res: Result<Option<ScopedSegment>, ReaderGroupStateError> =
1488 Ok(Some(ScopedSegment::from("scope/test/1.#epoch.0")));
1489 rg_mock
1490 .expect_assign_segment_to_reader()
1491 .with(predicate::eq(Reader::from("r1".to_string())))
1492 .return_once(move |_| res);
1493 let mut new_current_segments: HashSet<(ScopedSegment, Offset)> = HashSet::new();
1495 new_current_segments.insert((ScopedSegment::from("scope/test/1.#epoch.0"), Offset::new(0)));
1496 new_current_segments.insert((ScopedSegment::from("scope/test/0.#epoch.0"), Offset::new(0)));
1497 let res: Result<HashSet<(ScopedSegment, Offset)>, SynchronizerError> = Ok(new_current_segments);
1498 rg_mock
1499 .expect_get_segments_for_reader()
1500 .with(predicate::eq(Reader::from("r1".to_string())))
1501 .return_once(move |_| res);
1502
1503 tokio::spawn(generate_variable_size_events(
1505 tx.clone(),
1506 1024,
1507 NUM_EVENTS,
1508 1,
1509 false,
1510 ));
1511
1512 let before_time = Instant::now() - Duration::from_secs(15);
1513 let mut reader = EventReader::init_event_reader(
1515 Arc::new(Mutex::new(rg_mock)),
1516 Reader::from("r1".to_string()),
1517 cf.to_async(),
1518 tx.clone(),
1519 rx,
1520 create_slice_map(init_segments),
1521 HashMap::new(),
1522 );
1523 reader.set_last_acquire_release_time(before_time);
1524
1525 let mut event_count = 0;
1526
1527 while let Some(mut slice) = cf.runtime().block_on(reader.acquire_segment()).unwrap() {
1529 loop {
1530 if let Some(event) = slice.next() {
1531 println!("Read event {:?}", event);
1532 assert!(is_all_same(event.value.as_slice()), "Event has been corrupted");
1533 event_count += 1;
1534 } else {
1535 println!(
1536 "Finished reading from segment {:?}, segment is auto released",
1537 slice.meta.scoped_segment
1538 );
1539 break; }
1541 }
1542 if event_count == NUM_EVENTS + NUM_EVENTS {
1543 break;
1545 }
1546 }
1547 assert_eq!(event_count, NUM_EVENTS + NUM_EVENTS);
1548 }
1549
1550 #[test]
1552 fn test_read_events_multiple_segments() {
1553 const NUM_EVENTS: usize = 100;
1554 let (tx, rx) = mpsc::channel(1);
1555 tracing_subscriber::fmt().with_max_level(Level::TRACE).finish();
1556 let cf = ClientFactory::new(
1557 ClientConfigBuilder::default()
1558 .controller_uri(MOCK_CONTROLLER_URI)
1559 .build()
1560 .unwrap(),
1561 );
1562
1563 let _guard = cf.runtime().enter();
1565 tokio::spawn(generate_variable_size_events(
1566 tx.clone(),
1567 100,
1568 NUM_EVENTS,
1569 0,
1570 false,
1571 ));
1572 tokio::spawn(generate_variable_size_events(
1574 tx.clone(),
1575 100,
1576 NUM_EVENTS,
1577 1,
1578 true,
1579 ));
1580
1581 let init_segments = vec![create_segment_slice(0), create_segment_slice(1)];
1583 let mut rg_mock: ReaderGroupState = ReaderGroupState::default();
1584 rg_mock
1585 .expect_compute_segments_to_acquire_or_release()
1586 .return_once(move |_| Ok(0 as isize));
1587 rg_mock.expect_check_online().return_const(true);
1588 rg_mock.expect_remove_reader().return_once(move |_, _| Ok(()));
1589 rg_mock
1590 .expect_update_reader_positions()
1591 .return_once(move |_, _| Ok(()));
1592 let mut reader = EventReader::init_event_reader(
1594 Arc::new(Mutex::new(rg_mock)),
1595 Reader::from("r1".to_string()),
1596 cf.to_async(),
1597 tx.clone(),
1598 rx,
1599 create_slice_map(init_segments),
1600 HashMap::new(),
1601 );
1602
1603 let mut event_count_per_segment: HashMap<String, usize> = HashMap::new();
1604
1605 let mut total_events_read = 0;
1606 while let Some(mut slice) = cf.runtime().block_on(reader.acquire_segment()).unwrap() {
1608 let segment = slice.meta.scoped_segment.clone();
1609 println!("Received Segment Slice {:?}", segment);
1610 let mut event_count = 0;
1611 loop {
1612 if let Some(event) = slice.next() {
1613 println!("Read event {:?}", event);
1614 assert!(is_all_same(event.value.as_slice()), "Event has been corrupted");
1615 event_count += 1;
1616 } else {
1617 println!(
1618 "Finished reading from segment {:?}, segment is auto released",
1619 slice.meta.scoped_segment
1620 );
1621 break; }
1623 }
1624 total_events_read += event_count;
1625 *event_count_per_segment
1626 .entry(segment.clone())
1627 .or_insert(event_count) += event_count;
1628 if total_events_read == NUM_EVENTS * 2 {
1629 break;
1631 }
1632 }
1633 }
1634
1635 #[test]
1636 fn test_return_slice() {
1637 const NUM_EVENTS: usize = 2;
1638 let (tx, rx) = mpsc::channel(1);
1639 tracing_subscriber::fmt().with_max_level(Level::TRACE).finish();
1640 let cf = ClientFactory::new(
1641 ClientConfigBuilder::default()
1642 .controller_uri(MOCK_CONTROLLER_URI)
1643 .build()
1644 .unwrap(),
1645 );
1646
1647 let _guard = cf.runtime().enter();
1649 tokio::spawn(generate_variable_size_events(
1650 tx.clone(),
1651 10,
1652 NUM_EVENTS,
1653 0,
1654 false,
1655 ));
1656
1657 let init_segments = vec![create_segment_slice(0), create_segment_slice(1)];
1659
1660 let mut rg_mock: ReaderGroupState = ReaderGroupState::default();
1661 rg_mock.expect_check_online().return_const(true);
1662 rg_mock
1663 .expect_compute_segments_to_acquire_or_release()
1664 .return_once(move |_| Ok(0 as isize));
1665 rg_mock.expect_remove_reader().return_once(move |_, _| Ok(()));
1666 let mut reader = EventReader::init_event_reader(
1668 Arc::new(Mutex::new(rg_mock)),
1669 Reader::from("r1".to_string()),
1670 cf.to_async(),
1671 tx.clone(),
1672 rx,
1673 create_slice_map(init_segments),
1674 HashMap::new(),
1675 );
1676
1677 let mut slice = cf
1679 .runtime()
1680 .block_on(reader.acquire_segment())
1681 .expect("Failed to acquire segment since the reader is offline")
1682 .unwrap();
1683
1684 let event = slice.next().unwrap();
1686 assert_eq!(event.value.len(), 1);
1687 assert!(is_all_same(event.value.as_slice()), "Event has been corrupted");
1688 assert_eq!(event.offset_in_segment, 0); let _ = cf.runtime().block_on(reader.release_segment(slice));
1692
1693 let slice = cf
1695 .runtime()
1696 .block_on(reader.acquire_segment())
1697 .expect("Failed to acquire segment since the reader is offline")
1698 .unwrap();
1699
1700 let _ = cf.runtime().block_on(reader.release_segment(slice));
1702
1703 let mut slice = cf
1705 .runtime()
1706 .block_on(reader.acquire_segment())
1707 .expect("Failed to acquire segment")
1708 .unwrap();
1709 let event = slice.next().unwrap();
1711 assert_eq!(event.value.len(), 2);
1712 assert!(is_all_same(event.value.as_slice()), "Event has been corrupted");
1713 assert_eq!(event.offset_in_segment, 8 + 1); }
1715
1716 #[test]
1717 fn test_return_slice_at_offset() {
1718 const NUM_EVENTS: usize = 2;
1719 let (tx, rx) = mpsc::channel(1);
1720 let (stop_tx, stop_rx) = oneshot::channel();
1721 tracing_subscriber::fmt().with_max_level(Level::TRACE).finish();
1722 let cf = ClientFactory::new(
1723 ClientConfigBuilder::default()
1724 .controller_uri(MOCK_CONTROLLER_URI)
1725 .build()
1726 .unwrap(),
1727 );
1728
1729 let _guard = cf.runtime().enter();
1731 tokio::spawn(generate_constant_size_events(
1732 tx.clone(),
1733 20,
1734 NUM_EVENTS,
1735 0,
1736 false,
1737 stop_rx,
1738 ));
1739 let mut stop_reading_map: HashMap<ScopedSegment, oneshot::Sender<()>> = HashMap::new();
1740 stop_reading_map.insert(ScopedSegment::from("scope/test/0.#epoch.0"), stop_tx);
1741
1742 let init_segments = vec![create_segment_slice(0), create_segment_slice(1)];
1744 let mut rg_mock: ReaderGroupState = ReaderGroupState::default();
1745 rg_mock.expect_check_online().return_const(true);
1746 rg_mock
1747 .expect_compute_segments_to_acquire_or_release()
1748 .return_once(move |_| Ok(0 as isize));
1749 rg_mock.expect_remove_reader().return_once(move |_, _| Ok(()));
1750 let mut reader = EventReader::init_event_reader(
1752 Arc::new(Mutex::new(rg_mock)),
1753 Reader::from("r1".to_string()),
1754 cf.to_async(),
1755 tx.clone(),
1756 rx,
1757 create_slice_map(init_segments),
1758 stop_reading_map,
1759 );
1760
1761 let mut slice = cf
1763 .runtime()
1764 .block_on(reader.acquire_segment())
1765 .expect("Failed to acquire segment")
1766 .unwrap();
1767
1768 let event = slice.next().unwrap();
1770 assert_eq!(event.value.len(), 1);
1771 assert!(is_all_same(event.value.as_slice()), "Event has been corrupted");
1772 assert_eq!(event.offset_in_segment, 0); let result = slice.next();
1775 assert!(result.is_some());
1776 let event = result.unwrap();
1777 assert_eq!(event.value.len(), 1);
1778 assert!(is_all_same(event.value.as_slice()), "Event has been corrupted");
1779 assert_eq!(event.offset_in_segment, 9); let _ = cf.runtime().block_on(reader.release_segment_at(slice, 0));
1783
1784 let (_stop_tx, stop_rx) = oneshot::channel();
1786 tokio::spawn(generate_constant_size_events(
1787 tx.clone(),
1788 20,
1789 NUM_EVENTS,
1790 0,
1791 false,
1792 stop_rx,
1793 ));
1794
1795 let mut slice = cf
1797 .runtime()
1798 .block_on(reader.acquire_segment())
1799 .expect("Failed to acquire segment")
1800 .unwrap();
1801 let event = slice.next().unwrap();
1803 assert_eq!(event.value.len(), 1);
1804 assert!(is_all_same(event.value.as_slice()), "Event has been corrupted");
1805 assert_eq!(event.offset_in_segment, 0); }
1807
1808 #[tokio::test]
1809 async fn test_read_partial_events_buffer_10() {
1810 let (tx, mut rx) = mpsc::channel(1);
1811 tokio::spawn(generate_variable_size_events(tx, 10, 20, 0, false));
1812 let mut segment_slice = create_segment_slice(0);
1813 let mut expected_offset: usize = 0;
1814 let mut expected_event_len = 0;
1815
1816 loop {
1817 if segment_slice.is_empty() {
1818 if let Some(response) = rx.recv().await {
1819 segment_slice
1820 .meta
1821 .segment_data
1822 .value
1823 .put(response.expect("get response").value);
1824 } else {
1825 break; }
1827 }
1828
1829 while let Some(d) = segment_slice.next() {
1830 assert_eq!(expected_offset, d.offset_in_segment as usize);
1831 assert_eq!(expected_event_len + 1, d.value.len());
1832 assert!(is_all_same(d.value.as_slice()));
1833 expected_offset += 8 + expected_event_len + 1;
1834 expected_event_len += 1;
1835 }
1836 }
1837 assert_eq!(20, expected_event_len);
1838 }
1839
1840 #[tokio::test]
1841 async fn test_read_partial_events_buffer_100() {
1842 let (tx, mut rx) = mpsc::channel(1);
1843 tokio::spawn(generate_variable_size_events(tx, 100, 200, 0, false));
1844 let mut segment_slice = create_segment_slice(0);
1845 let mut expected_offset: usize = 0;
1846 let mut expected_event_len = 0;
1847
1848 loop {
1849 if segment_slice.is_empty() {
1850 if let Some(response) = rx.recv().await {
1851 segment_slice
1852 .meta
1853 .segment_data
1854 .value
1855 .put(response.expect("get response").value);
1856 } else {
1857 break; }
1859 }
1860
1861 while let Some(d) = segment_slice.next() {
1862 assert_eq!(expected_offset, d.offset_in_segment as usize);
1863 assert_eq!(expected_event_len + 1, d.value.len());
1864 assert!(is_all_same(d.value.as_slice()));
1865 expected_offset += 8 + expected_event_len + 1;
1866 expected_event_len += 1;
1867 }
1868 }
1869 assert_eq!(200, expected_event_len);
1870 }
1871
1872 fn generate_event_data(len: usize) -> BytesMut {
1875 let mut buf = BytesMut::with_capacity(len + 8);
1876 buf.put_i32(EventCommand::TYPE_CODE);
1877 buf.put_i32(len as i32); let mut data = Vec::new();
1880 data.extend(iter::repeat(b'a').take(len));
1881 buf.put(data.as_slice());
1882 buf
1883 }
1884
1885 async fn generate_multiple_constant_size_events(tx: Sender<SegmentDataBuffer>) {
1887 let mut buf = BytesMut::with_capacity(10);
1888 let segment = ScopedSegment::from("test/test/123").to_string();
1889
1890 buf.put_i32(1);
1891 buf.put_u8(b'a');
1892 buf.put_i32(2);
1893 buf.put(&b"aa"[..]);
1894 tx.send(SegmentDataBuffer {
1895 segment: segment.clone(),
1896 offset_in_segment: 0,
1897 value: buf,
1898 })
1899 .await
1900 .unwrap();
1901
1902 buf = BytesMut::with_capacity(10);
1903 buf.put_i32(3);
1904 buf.put(&b"aaa"[..]);
1905 tx.send(SegmentDataBuffer {
1906 segment: segment.clone(),
1907 offset_in_segment: 0,
1908 value: buf,
1909 })
1910 .await
1911 .unwrap();
1912
1913 buf = BytesMut::with_capacity(10);
1914 buf.put_i32(4);
1915 buf.put(&b"aaaa"[..]);
1916 tx.send(SegmentDataBuffer {
1917 segment: segment.clone(),
1918 offset_in_segment: 0,
1919 value: buf,
1920 })
1921 .await
1922 .unwrap();
1923
1924 buf = BytesMut::with_capacity(10);
1925 buf.put_i32(5);
1926 buf.put(&b"aaaaa"[..]);
1927 tx.send(SegmentDataBuffer {
1928 segment: segment.clone(),
1929 offset_in_segment: 0,
1930 value: buf,
1931 })
1932 .await
1933 .unwrap();
1934
1935 buf = BytesMut::with_capacity(10);
1936 buf.put_i32(6);
1937 buf.put(&b"aaaaaa"[..]);
1938 tx.send(SegmentDataBuffer {
1939 segment: segment.clone(),
1940 offset_in_segment: 0,
1941 value: buf,
1942 })
1943 .await
1944 .unwrap();
1945
1946 buf = BytesMut::with_capacity(10);
1947 buf.put_i32(7);
1948 buf.put(&b"aaaaaa"[..]);
1949 tx.send(SegmentDataBuffer {
1950 segment: segment.clone(),
1951 offset_in_segment: 0,
1952 value: buf,
1953 })
1954 .await
1955 .unwrap();
1956
1957 buf = BytesMut::with_capacity(10);
1958 buf.put_u8(b'a');
1959 buf.put_i32(8);
1960 buf.put(&b"aaaaa"[..]);
1961 tx.send(SegmentDataBuffer {
1962 segment: segment.clone(),
1963 offset_in_segment: 0,
1964 value: buf,
1965 })
1966 .await
1967 .unwrap();
1968
1969 buf = BytesMut::with_capacity(10);
1970 buf.put(&b"aaa"[..]);
1971 tx.send(SegmentDataBuffer {
1972 segment: segment.clone(),
1973 offset_in_segment: 0,
1974 value: buf,
1975 })
1976 .await
1977 .unwrap();
1978 }
1979
1980 async fn generate_multiple_variable_sized_events(tx: Sender<SegmentDataBuffer>) {
1982 for i in 1..11 {
1983 let mut buf = BytesMut::with_capacity(32);
1984 buf.put_i32(i); for _ in 0..i {
1986 buf.put(&b"a"[..]);
1987 }
1988 if let Err(_) = tx
1989 .send(SegmentDataBuffer {
1990 segment: ScopedSegment::from("test/test/123").to_string(),
1991 offset_in_segment: 0,
1992 value: buf,
1993 })
1994 .await
1995 {
1996 warn!("receiver dropped");
1997 return;
1998 }
1999 }
2000 }
2001
2002 fn custom_read_header(data: &mut SegmentDataBuffer) -> Option<SegmentDataBuffer> {
2004 if data.value.remaining() >= 4 {
2005 let mut temp = data.value.bytes();
2006 let len = temp.get_i32();
2007 Some(SegmentDataBuffer {
2008 segment: data.segment.clone(),
2009 offset_in_segment: 0,
2010 value: BytesMut::with_capacity(len as usize),
2011 })
2012 } else {
2013 None
2014 }
2015 }
2016
2017 fn read_n_events(slice: &mut SegmentSlice, events_to_read: usize) {
2018 let mut event_count = 0;
2019 loop {
2020 if event_count == events_to_read {
2021 break;
2022 }
2023 if let Some(event) = slice.next() {
2024 println!("Read event {:?}", event);
2025 assert!(is_all_same(event.value.as_slice()), "Event has been corrupted");
2026 event_count += 1;
2027 } else {
2028 println!(
2029 "Finished reading from segment {:?}, segment is auto released",
2030 slice.meta.scoped_segment
2031 );
2032 break;
2033 }
2034 }
2035 }
2036
2037 fn create_slice_map(init_segments: Vec<SegmentSlice>) -> HashMap<ScopedSegment, SliceMetadata> {
2039 let mut map = HashMap::with_capacity(init_segments.len());
2040 for s in init_segments {
2041 map.insert(
2042 ScopedSegment::from(s.meta.scoped_segment.clone().as_str()),
2043 s.meta.clone(),
2044 );
2045 }
2046 map
2047 }
2048
2049 fn get_scoped_stream(scope: &str, stream: &str) -> ScopedStream {
2050 let stream: ScopedStream = ScopedStream {
2051 scope: Scope {
2052 name: scope.to_string(),
2053 },
2054 stream: Stream {
2055 name: stream.to_string(),
2056 },
2057 };
2058 stream
2059 }
2060
2061 async fn generate_constant_size_events(
2063 tx: Sender<SegmentReadResult>,
2064 buf_size: usize,
2065 num_events: usize,
2066 segment_id: usize,
2067 should_delay: bool,
2068 mut stop_generation: oneshot::Receiver<()>,
2069 ) {
2070 let mut segment_name = "scope/test/".to_owned();
2071 segment_name.push_str(segment_id.to_string().as_ref());
2072 let mut buf = BytesMut::with_capacity(buf_size);
2073 let mut offset: i64 = 0;
2074 for _i in 1..num_events + 1 {
2075 if let Ok(_) | Err(TryRecvError::Closed) = stop_generation.try_recv() {
2076 break;
2077 }
2078 let mut data = generate_event_data(1); if data.len() < buf.capacity() - buf.len() {
2080 buf.put(data);
2081 } else {
2082 while data.len() > 0 {
2083 let free_space = buf.capacity() - buf.len();
2084 if free_space == 0 {
2085 if should_delay {
2086 sleep(Duration::from_millis(100)).await;
2087 }
2088 tx.send(Ok(SegmentDataBuffer {
2089 segment: ScopedSegment::from(segment_name.as_str()).to_string(),
2090 offset_in_segment: offset,
2091 value: buf,
2092 }))
2093 .await
2094 .unwrap();
2095 offset += buf_size as i64;
2096 buf = BytesMut::with_capacity(buf_size);
2097 } else if free_space >= data.len() {
2098 buf.put(data.split());
2099 } else {
2100 buf.put(data.split_to(free_space));
2101 }
2102 }
2103 }
2104 }
2105 tx.send(Ok(SegmentDataBuffer {
2107 segment: ScopedSegment::from(segment_name.as_str()).to_string(),
2108 offset_in_segment: offset,
2109 value: buf,
2110 }))
2111 .await
2112 .unwrap();
2113 }
2114
2115 async fn generate_variable_size_events(
2117 tx: Sender<SegmentReadResult>,
2118 buf_size: usize,
2119 num_events: usize,
2120 segment_id: usize,
2121 should_delay: bool,
2122 ) {
2123 let mut segment_name = "scope/test/".to_owned();
2124 segment_name.push_str(segment_id.to_string().as_ref());
2125 segment_name.push_str(".#epoch.0");
2126 let mut buf = BytesMut::with_capacity(buf_size);
2127 let mut offset: i64 = 0;
2128 for i in 1..num_events + 1 {
2129 let mut data = generate_event_data(i);
2130 if data.len() < buf.capacity() - buf.len() {
2131 buf.put(data);
2132 } else {
2133 while data.len() > 0 {
2134 let free_space = buf.capacity() - buf.len();
2135 if free_space == 0 {
2136 if should_delay {
2137 sleep(Duration::from_millis(100)).await;
2138 }
2139 tx.send(Ok(SegmentDataBuffer {
2140 segment: ScopedSegment::from(segment_name.as_str()).to_string(),
2141 offset_in_segment: offset,
2142 value: buf,
2143 }))
2144 .await
2145 .unwrap();
2146 offset += buf_size as i64;
2147 buf = BytesMut::with_capacity(buf_size);
2148 } else if free_space >= data.len() {
2149 buf.put(data.split());
2150 } else {
2151 buf.put(data.split_to(free_space));
2152 }
2153 }
2154 }
2155 }
2156 tx.send(Ok(SegmentDataBuffer {
2158 segment: ScopedSegment::from(segment_name.as_str()).to_string(),
2159 offset_in_segment: offset,
2160 value: buf,
2161 }))
2162 .await
2163 .unwrap();
2164 }
2165
2166 fn create_segment_slice(segment_id: i64) -> SegmentSlice {
2168 let mut segment_name = "scope/test/".to_owned();
2169 segment_name.push_str(segment_id.to_string().as_ref());
2170 let segment = ScopedSegment::from(segment_name.as_str());
2171 let segment_slice = SegmentSlice {
2172 meta: SliceMetadata {
2173 start_offset: 0,
2174 scoped_segment: segment.to_string(),
2175 last_event_offset: 0,
2176 read_offset: 0,
2177 end_offset: i64::MAX,
2178 segment_data: SegmentDataBuffer::empty(),
2179 partial_data_present: false,
2180 },
2181 slice_return_tx: None,
2182 };
2183 segment_slice
2184 }
2185
2186 fn is_all_same<T: Eq>(slice: &[T]) -> bool {
2188 slice
2189 .get(0)
2190 .map(|first| slice.iter().all(|x| x == first))
2191 .unwrap_or(true)
2192 }
2193}