1#![forbid(unsafe_code)]
8#![warn(missing_docs)]
9#![allow(clippy::significant_drop_tightening)]
10
11use std::collections::HashMap;
12use std::sync::atomic::{AtomicBool, Ordering};
13use std::sync::{Arc, RwLock};
14
15use async_trait::async_trait;
16use eventcore::{
17 Checkpoint, EventProcessor, EventStore, EventStoreError, EventVersion, ExpectedVersion,
18 ReadOptions, StoredEvent, StreamData, StreamEvents, StreamId, Subscription, SubscriptionError,
19 SubscriptionName, SubscriptionOptions, SubscriptionPosition, SubscriptionResult, Timestamp,
20};
21
22type EventStoreResult<T> = Result<T, EventStoreError>;
23
24#[derive(Clone)]
26pub struct InMemoryEventStore<E>
27where
28 E: Send + Sync + Clone + 'static + PartialEq + Eq,
29{
30 streams: Arc<RwLock<HashMap<StreamId, Vec<StoredEvent<E>>>>>,
32 versions: Arc<RwLock<HashMap<StreamId, EventVersion>>>,
34}
35
36impl<E> InMemoryEventStore<E>
37where
38 E: Send + Sync + Clone + 'static + PartialEq + Eq,
39{
40 pub fn new() -> Self {
42 Self {
43 streams: Arc::new(RwLock::new(HashMap::new())),
44 versions: Arc::new(RwLock::new(HashMap::new())),
45 }
46 }
47}
48
49impl<E> Default for InMemoryEventStore<E>
50where
51 E: Send + Sync + Clone + 'static + PartialEq + Eq,
52{
53 fn default() -> Self {
54 Self::new()
55 }
56}
57
58#[async_trait]
59impl<E> EventStore for InMemoryEventStore<E>
60where
61 E: Send + Sync + Clone + 'static + PartialEq + Eq,
62{
63 type Event = E;
64
65 async fn read_streams(
66 &self,
67 stream_ids: &[StreamId],
68 options: &ReadOptions,
69 ) -> EventStoreResult<StreamData<Self::Event>> {
70 let streams = self.streams.read().map_err(|_| {
71 EventStoreError::Internal("Failed to acquire read lock on streams".to_string())
72 })?;
73
74 let versions = self.versions.read().map_err(|_| {
75 EventStoreError::Internal("Failed to acquire read lock on versions".to_string())
76 })?;
77
78 let mut all_events = Vec::new();
79 let mut stream_versions = HashMap::new();
80
81 for stream_id in stream_ids {
82 let version = versions
83 .get(stream_id)
84 .copied()
85 .unwrap_or_else(EventVersion::initial);
86 stream_versions.insert(stream_id.clone(), version);
87
88 if let Some(stream_events) = streams.get(stream_id) {
89 for event in stream_events {
90 if let Some(from_version) = options.from_version {
92 if event.event_version < from_version {
93 continue;
94 }
95 }
96
97 if let Some(to_version) = options.to_version {
99 if event.event_version > to_version {
100 continue;
101 }
102 }
103
104 all_events.push(event.clone());
105 }
106 }
107 }
108
109 all_events.sort_by_key(|e| e.event_id);
111
112 if let Some(max_events) = options.max_events {
114 all_events.truncate(max_events);
115 }
116
117 Ok(StreamData::new(all_events, stream_versions))
118 }
119
120 async fn write_events_multi(
121 &self,
122 stream_events: Vec<StreamEvents<Self::Event>>,
123 ) -> EventStoreResult<HashMap<StreamId, EventVersion>> {
124 let mut streams = self.streams.write().map_err(|_| {
125 EventStoreError::Internal("Failed to acquire write lock on streams".to_string())
126 })?;
127
128 let mut versions = self.versions.write().map_err(|_| {
129 EventStoreError::Internal("Failed to acquire write lock on versions".to_string())
130 })?;
131
132 for stream_event in &stream_events {
134 let current_version = versions
135 .get(&stream_event.stream_id)
136 .copied()
137 .unwrap_or_else(EventVersion::initial);
138
139 match stream_event.expected_version {
140 ExpectedVersion::New => {
141 if versions.contains_key(&stream_event.stream_id) {
142 return Err(EventStoreError::VersionConflict {
143 stream: stream_event.stream_id.clone(),
144 expected: EventVersion::initial(),
145 current: current_version,
146 });
147 }
148 }
149 ExpectedVersion::Exact(expected) => {
150 if current_version != expected {
151 return Err(EventStoreError::VersionConflict {
152 stream: stream_event.stream_id.clone(),
153 expected,
154 current: current_version,
155 });
156 }
157 }
158 ExpectedVersion::Any => {
159 }
161 }
162 }
163
164 let mut new_versions = HashMap::new();
166
167 for stream_event in stream_events {
168 let stream_events_list = streams.entry(stream_event.stream_id.clone()).or_default();
169
170 let mut current_version = versions
171 .get(&stream_event.stream_id)
172 .copied()
173 .unwrap_or_else(EventVersion::initial);
174
175 for event_to_write in stream_event.events {
176 current_version = current_version.next();
177
178 let stored_event = StoredEvent::new(
179 event_to_write.event_id,
180 stream_event.stream_id.clone(),
181 current_version,
182 Timestamp::now(),
183 event_to_write.payload,
184 event_to_write.metadata,
185 );
186
187 stream_events_list.push(stored_event);
188 }
189
190 versions.insert(stream_event.stream_id.clone(), current_version);
191 new_versions.insert(stream_event.stream_id.clone(), current_version);
192 }
193
194 Ok(new_versions)
195 }
196
197 async fn stream_exists(&self, stream_id: &StreamId) -> EventStoreResult<bool> {
198 let streams = self.streams.read().map_err(|_| {
199 EventStoreError::Internal("Failed to acquire read lock on streams".to_string())
200 })?;
201
202 Ok(streams.contains_key(stream_id))
203 }
204
205 async fn get_stream_version(
206 &self,
207 stream_id: &StreamId,
208 ) -> EventStoreResult<Option<EventVersion>> {
209 let versions = self.versions.read().map_err(|_| {
210 EventStoreError::Internal("Failed to acquire read lock on versions".to_string())
211 })?;
212
213 Ok(versions.get(stream_id).copied())
214 }
215
216 async fn subscribe(
217 &self,
218 options: SubscriptionOptions,
219 ) -> EventStoreResult<Box<dyn Subscription<Event = Self::Event>>> {
220 let subscription = InMemorySubscription::new(self.clone(), options);
221 Ok(Box::new(subscription))
222 }
223}
224
225pub struct InMemorySubscription<E>
227where
228 E: Send + Sync + Clone + 'static + PartialEq + Eq,
229{
230 event_store: InMemoryEventStore<E>,
231 options: SubscriptionOptions,
232 current_position: Arc<RwLock<Option<SubscriptionPosition>>>,
233 checkpoints: Arc<RwLock<HashMap<String, SubscriptionPosition>>>,
234 is_running: Arc<AtomicBool>,
235 is_paused: Arc<AtomicBool>,
236 stop_signal: Arc<AtomicBool>,
237}
238
239impl<E> InMemorySubscription<E>
240where
241 E: Send + Sync + Clone + 'static + PartialEq + Eq,
242{
243 pub fn new(event_store: InMemoryEventStore<E>, options: SubscriptionOptions) -> Self {
245 Self {
246 event_store,
247 options,
248 current_position: Arc::new(RwLock::new(None)),
249 checkpoints: Arc::new(RwLock::new(HashMap::new())),
250 is_running: Arc::new(AtomicBool::new(false)),
251 is_paused: Arc::new(AtomicBool::new(false)),
252 stop_signal: Arc::new(AtomicBool::new(false)),
253 }
254 }
255
256 async fn process_events(
258 &self,
259 name: SubscriptionName,
260 mut processor: Box<dyn EventProcessor<Event = E>>,
261 ) -> SubscriptionResult<()>
262 where
263 E: PartialEq + Eq,
264 {
265 let starting_position = self.load_checkpoint(&name).await?;
267
268 loop {
269 if self.stop_signal.load(Ordering::Acquire) {
271 break;
272 }
273
274 if self.is_paused.load(Ordering::Acquire) {
276 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
277 continue;
278 }
279
280 let events = self
282 .get_events_for_processing(starting_position.as_ref())
283 .await?;
284
285 let mut current_pos = starting_position.clone();
286 let mut has_new_events = false;
287
288 for event in events {
289 if let Some(ref pos) = current_pos {
291 if event.event_id <= pos.last_event_id {
292 continue;
293 }
294 }
295
296 processor.process_event(event.clone()).await?;
298 has_new_events = true;
299
300 let new_checkpoint = Checkpoint::new(event.event_id, event.event_version.into());
302
303 current_pos = Some(if let Some(mut pos) = current_pos {
304 pos.last_event_id = event.event_id;
305 pos.update_checkpoint(event.stream_id.clone(), new_checkpoint);
306 pos
307 } else {
308 let mut pos = SubscriptionPosition::new(event.event_id);
309 pos.update_checkpoint(event.stream_id.clone(), new_checkpoint);
310 pos
311 });
312
313 {
315 let mut guard = self.current_position.write().map_err(|_| {
316 SubscriptionError::CheckpointSaveFailed(
317 "Failed to acquire position lock".to_string(),
318 )
319 })?;
320 (*guard).clone_from(¤t_pos);
321 }
322
323 }
326
327 if !has_new_events && matches!(self.options, SubscriptionOptions::LiveOnly) {
329 processor.on_live().await?;
330 }
331
332 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
334 }
335
336 Ok(())
337 }
338
339 async fn get_events_for_processing(
341 &self,
342 starting_position: Option<&SubscriptionPosition>,
343 ) -> SubscriptionResult<Vec<StoredEvent<E>>> {
344 let (streams, from_position) = match &self.options {
345 SubscriptionOptions::CatchUpFromBeginning => (vec![], None),
346 SubscriptionOptions::CatchUpFromPosition(pos) => (vec![], Some(pos.last_event_id)),
347 SubscriptionOptions::LiveOnly => {
348 (vec![], starting_position.as_ref().map(|p| p.last_event_id))
350 }
351 SubscriptionOptions::SpecificStreamsFromBeginning(_mode) => {
352 (vec![], None)
354 }
355 SubscriptionOptions::SpecificStreamsFromPosition(_mode, pos) => {
356 (vec![], Some(pos.last_event_id))
357 }
358 SubscriptionOptions::AllStreams { from_position } => (vec![], *from_position),
359 SubscriptionOptions::SpecificStreams {
360 streams,
361 from_position,
362 } => (streams.clone(), *from_position),
363 };
364
365 let all_events = if streams.is_empty() {
367 self.read_all_events_sorted()?
368 } else {
369 self.read_streams_events(&streams).await?
370 };
371
372 let filtered_events = if let Some(from_id) =
374 from_position.or_else(|| starting_position.map(|p| p.last_event_id))
375 {
376 all_events
377 .into_iter()
378 .filter(|e| e.event_id > from_id)
379 .collect()
380 } else {
381 all_events
382 };
383
384 Ok(filtered_events)
385 }
386
387 fn read_all_events_sorted(&self) -> SubscriptionResult<Vec<StoredEvent<E>>> {
389 let streams = self.event_store.streams.read().map_err(|_| {
390 SubscriptionError::EventStore(EventStoreError::Internal(
391 "Failed to acquire read lock on streams".to_string(),
392 ))
393 })?;
394
395 let mut all_events = Vec::new();
396 for events in streams.values() {
397 all_events.extend(events.iter().cloned());
398 }
399
400 all_events.sort_by(|a, b| a.event_id.cmp(&b.event_id));
402
403 Ok(all_events)
404 }
405
406 async fn read_streams_events(
408 &self,
409 stream_ids: &[StreamId],
410 ) -> SubscriptionResult<Vec<StoredEvent<E>>> {
411 let read_options = ReadOptions::default();
412
413 let stream_data = self
414 .event_store
415 .read_streams(stream_ids, &read_options)
416 .await
417 .map_err(SubscriptionError::EventStore)?;
418
419 Ok(stream_data.events)
420 }
421}
422
423#[async_trait]
424impl<E> Subscription for InMemorySubscription<E>
425where
426 E: Send + Sync + Clone + 'static + PartialEq + Eq,
427{
428 type Event = E;
429
430 async fn start(
431 &mut self,
432 name: SubscriptionName,
433 options: SubscriptionOptions,
434 processor: Box<dyn EventProcessor<Event = Self::Event>>,
435 ) -> SubscriptionResult<()>
436 where
437 Self::Event: PartialEq + Eq,
438 {
439 self.options = options;
441
442 self.is_running.store(true, Ordering::Release);
444 self.stop_signal.store(false, Ordering::Release);
445 self.is_paused.store(false, Ordering::Release);
446
447 let subscription = self.clone(); let name_copy = name;
450
451 tokio::spawn(async move {
452 if let Err(e) = subscription.process_events(name_copy, processor).await {
453 eprintln!("Subscription processing failed: {e}");
454 }
455 });
456
457 Ok(())
458 }
459
460 async fn stop(&mut self) -> SubscriptionResult<()> {
461 self.stop_signal.store(true, Ordering::Release);
462 self.is_running.store(false, Ordering::Release);
463 Ok(())
464 }
465
466 async fn pause(&mut self) -> SubscriptionResult<()> {
467 self.is_paused.store(true, Ordering::Release);
468 Ok(())
469 }
470
471 async fn resume(&mut self) -> SubscriptionResult<()> {
472 self.is_paused.store(false, Ordering::Release);
473 Ok(())
474 }
475
476 async fn get_position(&self) -> SubscriptionResult<Option<SubscriptionPosition>> {
477 let guard = self.current_position.read().map_err(|_| {
478 SubscriptionError::CheckpointLoadFailed("Failed to acquire position lock".to_string())
479 })?;
480 Ok(guard.clone())
481 }
482
483 async fn save_checkpoint(&mut self, position: SubscriptionPosition) -> SubscriptionResult<()> {
484 {
487 let mut guard = self.current_position.write().map_err(|_| {
488 SubscriptionError::CheckpointSaveFailed(
489 "Failed to acquire position lock".to_string(),
490 )
491 })?;
492 *guard = Some(position);
493 }
494 Ok(())
495 }
496
497 async fn load_checkpoint(
498 &self,
499 name: &SubscriptionName,
500 ) -> SubscriptionResult<Option<SubscriptionPosition>> {
501 let checkpoints = self.checkpoints.read().map_err(|_| {
502 SubscriptionError::CheckpointLoadFailed(
503 "Failed to acquire checkpoints lock".to_string(),
504 )
505 })?;
506 Ok(checkpoints.get(name.as_ref()).cloned())
507 }
508}
509
510impl<E> Clone for InMemorySubscription<E>
512where
513 E: Send + Sync + Clone + 'static + PartialEq + Eq,
514{
515 fn clone(&self) -> Self {
516 Self {
517 event_store: self.event_store.clone(),
518 options: self.options.clone(),
519 current_position: Arc::clone(&self.current_position),
520 checkpoints: Arc::clone(&self.checkpoints),
521 is_running: Arc::clone(&self.is_running),
522 is_paused: Arc::clone(&self.is_paused),
523 stop_signal: Arc::clone(&self.stop_signal),
524 }
525 }
526}
527
528#[cfg(test)]
529mod tests {
530 use super::*;
531 use eventcore::{EventId, EventToWrite};
532
533 #[tokio::test]
534 async fn test_new_store_is_empty() {
535 let store: InMemoryEventStore<String> = InMemoryEventStore::new();
536 assert!(store.streams.read().unwrap().is_empty());
537 }
538
539 #[tokio::test]
540 async fn test_clone_shares_storage() {
541 let store1: InMemoryEventStore<String> = InMemoryEventStore::new();
542 #[allow(clippy::redundant_clone)]
543 let store2 = store1.clone();
544
545 assert!(Arc::ptr_eq(&store1.streams, &store2.streams));
547 assert!(Arc::ptr_eq(&store1.versions, &store2.versions));
548 }
549
550 #[tokio::test]
551 async fn test_stream_exists() {
552 let store: InMemoryEventStore<String> = InMemoryEventStore::new();
553 let stream_id = StreamId::try_new("test-stream").unwrap();
554
555 assert!(!store.stream_exists(&stream_id).await.unwrap());
557
558 let event = EventToWrite::new(EventId::new(), "test-event".to_string());
560
561 let stream_events = StreamEvents::new(stream_id.clone(), ExpectedVersion::New, vec![event]);
562
563 store.write_events_multi(vec![stream_events]).await.unwrap();
564
565 assert!(store.stream_exists(&stream_id).await.unwrap());
567 }
568
569 #[tokio::test]
570 async fn test_get_stream_version() {
571 let store: InMemoryEventStore<String> = InMemoryEventStore::new();
572 let stream_id = StreamId::try_new("test-stream").unwrap();
573
574 assert_eq!(store.get_stream_version(&stream_id).await.unwrap(), None);
576
577 let event = EventToWrite::new(EventId::new(), "test-event".to_string());
579 let stream_events = StreamEvents::new(stream_id.clone(), ExpectedVersion::New, vec![event]);
580
581 store.write_events_multi(vec![stream_events]).await.unwrap();
582
583 assert_eq!(
585 store.get_stream_version(&stream_id).await.unwrap(),
586 Some(EventVersion::try_new(1).unwrap())
587 );
588 }
589
590 #[tokio::test]
591 async fn test_read_streams() {
592 let store: InMemoryEventStore<String> = InMemoryEventStore::new();
593 let stream_id1 = StreamId::try_new("stream-1").unwrap();
594 let stream_id2 = StreamId::try_new("stream-2").unwrap();
595
596 let event1 = EventToWrite::new(EventId::new(), "event-1".to_string());
598 let event2 = EventToWrite::new(EventId::new(), "event-2".to_string());
599
600 let stream_events1 =
601 StreamEvents::new(stream_id1.clone(), ExpectedVersion::New, vec![event1]);
602 let stream_events2 =
603 StreamEvents::new(stream_id2.clone(), ExpectedVersion::New, vec![event2]);
604
605 store
606 .write_events_multi(vec![stream_events1, stream_events2])
607 .await
608 .unwrap();
609
610 let result = store
612 .read_streams(
613 &[stream_id1.clone(), stream_id2.clone()],
614 &ReadOptions::new(),
615 )
616 .await
617 .unwrap();
618
619 assert_eq!(result.events.len(), 2);
620 assert_eq!(
621 result.stream_version(&stream_id1),
622 Some(EventVersion::try_new(1).unwrap())
623 );
624 assert_eq!(
625 result.stream_version(&stream_id2),
626 Some(EventVersion::try_new(1).unwrap())
627 );
628
629 let stream1_events: Vec<_> = result.events_for_stream(&stream_id1).collect();
631 assert_eq!(stream1_events.len(), 1);
632 assert_eq!(stream1_events[0].payload, "event-1");
633
634 let stream2_events: Vec<_> = result.events_for_stream(&stream_id2).collect();
635 assert_eq!(stream2_events.len(), 1);
636 assert_eq!(stream2_events[0].payload, "event-2");
637 }
638
639 #[tokio::test]
640 async fn test_concurrency_control() {
641 let store: InMemoryEventStore<String> = InMemoryEventStore::new();
642 let stream_id = StreamId::try_new("test-stream").unwrap();
643
644 let event1 = EventToWrite::new(EventId::new(), "event-1".to_string());
646 let stream_events1 =
647 StreamEvents::new(stream_id.clone(), ExpectedVersion::New, vec![event1]);
648
649 store
650 .write_events_multi(vec![stream_events1])
651 .await
652 .unwrap();
653
654 let event2 = EventToWrite::new(EventId::new(), "event-2".to_string());
656 let stream_events2 = StreamEvents::new(
657 stream_id.clone(),
658 ExpectedVersion::Exact(EventVersion::initial()), vec![event2.clone()],
660 );
661
662 let result = store.write_events_multi(vec![stream_events2]).await;
663
664 assert!(matches!(
665 result,
666 Err(EventStoreError::VersionConflict { .. })
667 ));
668
669 let stream_events3 = StreamEvents::new(
671 stream_id.clone(),
672 ExpectedVersion::Exact(EventVersion::try_new(1).unwrap()),
673 vec![event2],
674 );
675
676 let result = store.write_events_multi(vec![stream_events3]).await;
677
678 assert!(result.is_ok());
679 assert_eq!(
680 store.get_stream_version(&stream_id).await.unwrap(),
681 Some(EventVersion::try_new(2).unwrap())
682 );
683 }
684
685 #[tokio::test]
686 async fn test_multiple_events_in_single_write() {
687 let store: InMemoryEventStore<String> = InMemoryEventStore::new();
688 let stream_id = StreamId::try_new("test-stream").unwrap();
689
690 let events: Vec<EventToWrite<String>> = (0..5)
692 .map(|i| EventToWrite::new(EventId::new(), format!("event-{i}")))
693 .collect();
694
695 let stream_events = StreamEvents::new(stream_id.clone(), ExpectedVersion::New, events);
696
697 store.write_events_multi(vec![stream_events]).await.unwrap();
698
699 assert_eq!(
701 store.get_stream_version(&stream_id).await.unwrap(),
702 Some(EventVersion::try_new(5).unwrap())
703 );
704
705 let result = store
707 .read_streams(&[stream_id.clone()], &ReadOptions::new())
708 .await
709 .unwrap();
710 assert_eq!(result.events.len(), 5);
711 for (i, event) in result.events.iter().enumerate() {
712 assert_eq!(event.payload, format!("event-{i}"));
713 }
714 }
715
716 #[tokio::test]
717 async fn test_expected_version_new() {
718 let store: InMemoryEventStore<String> = InMemoryEventStore::new();
719 let stream_id = StreamId::try_new("test-stream").unwrap();
720
721 let event1 = EventToWrite::new(EventId::new(), "event-1".to_string());
723 let stream_events1 =
724 StreamEvents::new(stream_id.clone(), ExpectedVersion::New, vec![event1]);
725
726 store
727 .write_events_multi(vec![stream_events1])
728 .await
729 .unwrap();
730
731 let event2 = EventToWrite::new(EventId::new(), "event-2".to_string());
733 let stream_events2 =
734 StreamEvents::new(stream_id.clone(), ExpectedVersion::New, vec![event2]);
735
736 let result = store.write_events_multi(vec![stream_events2]).await;
737 assert!(matches!(
738 result,
739 Err(EventStoreError::VersionConflict { .. })
740 ));
741 }
742
743 #[tokio::test]
744 async fn test_expected_version_any() {
745 let store: InMemoryEventStore<String> = InMemoryEventStore::new();
746 let stream_id = StreamId::try_new("test-stream").unwrap();
747
748 let event1 = EventToWrite::new(EventId::new(), "event-1".to_string());
750 let stream_events1 =
751 StreamEvents::new(stream_id.clone(), ExpectedVersion::Any, vec![event1]);
752
753 store
754 .write_events_multi(vec![stream_events1])
755 .await
756 .unwrap();
757
758 let event2 = EventToWrite::new(EventId::new(), "event-2".to_string());
760 let stream_events2 =
761 StreamEvents::new(stream_id.clone(), ExpectedVersion::Any, vec![event2]);
762
763 store
764 .write_events_multi(vec![stream_events2])
765 .await
766 .unwrap();
767
768 assert_eq!(
769 store.get_stream_version(&stream_id).await.unwrap(),
770 Some(EventVersion::try_new(2).unwrap())
771 );
772 }
773
774 #[tokio::test]
775 async fn test_read_options_filtering() {
776 let store: InMemoryEventStore<String> = InMemoryEventStore::new();
777 let stream_id = StreamId::try_new("test-stream").unwrap();
778
779 let events: Vec<EventToWrite<String>> = (0..10)
781 .map(|i| EventToWrite::new(EventId::new(), format!("event-{i}")))
782 .collect();
783
784 let stream_events = StreamEvents::new(stream_id.clone(), ExpectedVersion::New, events);
785
786 store.write_events_multi(vec![stream_events]).await.unwrap();
787
788 let options = ReadOptions::new().from_version(EventVersion::try_new(5).unwrap());
790 let result = store
791 .read_streams(&[stream_id.clone()], &options)
792 .await
793 .unwrap();
794 assert_eq!(result.events.len(), 6); let options = ReadOptions::new().to_version(EventVersion::try_new(3).unwrap());
798 let result = store
799 .read_streams(&[stream_id.clone()], &options)
800 .await
801 .unwrap();
802 assert_eq!(result.events.len(), 3); let options = ReadOptions::new()
806 .from_version(EventVersion::try_new(3).unwrap())
807 .to_version(EventVersion::try_new(7).unwrap());
808 let result = store
809 .read_streams(&[stream_id.clone()], &options)
810 .await
811 .unwrap();
812 assert_eq!(result.events.len(), 5); let options = ReadOptions::new().with_max_events(5);
816 let result = store
817 .read_streams(&[stream_id.clone()], &options)
818 .await
819 .unwrap();
820 assert_eq!(result.events.len(), 5); }
822}