eventcore_memory/
lib.rs

1//! In-memory adapter for `EventCore` event sourcing library
2//!
3//! This crate provides an in-memory implementation of the `EventStore` trait
4//! from the eventcore crate, useful for testing and development scenarios
5//! where persistence is not required.
6
7#![forbid(unsafe_code)]
8#![warn(missing_docs)]
9#![allow(clippy::significant_drop_tightening)]
10
11use std::collections::HashMap;
12use std::sync::atomic::{AtomicBool, Ordering};
13use std::sync::{Arc, RwLock};
14
15use async_trait::async_trait;
16use eventcore::{
17    Checkpoint, EventProcessor, EventStore, EventStoreError, EventVersion, ExpectedVersion,
18    ReadOptions, StoredEvent, StreamData, StreamEvents, StreamId, Subscription, SubscriptionError,
19    SubscriptionName, SubscriptionOptions, SubscriptionPosition, SubscriptionResult, Timestamp,
20};
21
22type EventStoreResult<T> = Result<T, EventStoreError>;
23
24/// Thread-safe in-memory event store for testing
25#[derive(Clone)]
26pub struct InMemoryEventStore<E>
27where
28    E: Send + Sync + Clone + 'static + PartialEq + Eq,
29{
30    // Maps stream IDs to their stored events
31    streams: Arc<RwLock<HashMap<StreamId, Vec<StoredEvent<E>>>>>,
32    // Maps stream IDs to their current version
33    versions: Arc<RwLock<HashMap<StreamId, EventVersion>>>,
34}
35
36impl<E> InMemoryEventStore<E>
37where
38    E: Send + Sync + Clone + 'static + PartialEq + Eq,
39{
40    /// Create a new empty in-memory event store
41    pub fn new() -> Self {
42        Self {
43            streams: Arc::new(RwLock::new(HashMap::new())),
44            versions: Arc::new(RwLock::new(HashMap::new())),
45        }
46    }
47}
48
49impl<E> Default for InMemoryEventStore<E>
50where
51    E: Send + Sync + Clone + 'static + PartialEq + Eq,
52{
53    fn default() -> Self {
54        Self::new()
55    }
56}
57
58#[async_trait]
59impl<E> EventStore for InMemoryEventStore<E>
60where
61    E: Send + Sync + Clone + 'static + PartialEq + Eq,
62{
63    type Event = E;
64
65    async fn read_streams(
66        &self,
67        stream_ids: &[StreamId],
68        options: &ReadOptions,
69    ) -> EventStoreResult<StreamData<Self::Event>> {
70        let streams = self.streams.read().map_err(|_| {
71            EventStoreError::Internal("Failed to acquire read lock on streams".to_string())
72        })?;
73
74        let versions = self.versions.read().map_err(|_| {
75            EventStoreError::Internal("Failed to acquire read lock on versions".to_string())
76        })?;
77
78        let mut all_events = Vec::new();
79        let mut stream_versions = HashMap::new();
80
81        for stream_id in stream_ids {
82            let version = versions
83                .get(stream_id)
84                .copied()
85                .unwrap_or_else(EventVersion::initial);
86            stream_versions.insert(stream_id.clone(), version);
87
88            if let Some(stream_events) = streams.get(stream_id) {
89                for event in stream_events {
90                    // Apply from_version filter
91                    if let Some(from_version) = options.from_version {
92                        if event.event_version < from_version {
93                            continue;
94                        }
95                    }
96
97                    // Apply to_version filter
98                    if let Some(to_version) = options.to_version {
99                        if event.event_version > to_version {
100                            continue;
101                        }
102                    }
103
104                    all_events.push(event.clone());
105                }
106            }
107        }
108
109        // Sort by event ID (which provides timestamp ordering)
110        all_events.sort_by_key(|e| e.event_id);
111
112        // Apply max_events limit
113        if let Some(max_events) = options.max_events {
114            all_events.truncate(max_events);
115        }
116
117        Ok(StreamData::new(all_events, stream_versions))
118    }
119
120    async fn write_events_multi(
121        &self,
122        stream_events: Vec<StreamEvents<Self::Event>>,
123    ) -> EventStoreResult<HashMap<StreamId, EventVersion>> {
124        let mut streams = self.streams.write().map_err(|_| {
125            EventStoreError::Internal("Failed to acquire write lock on streams".to_string())
126        })?;
127
128        let mut versions = self.versions.write().map_err(|_| {
129            EventStoreError::Internal("Failed to acquire write lock on versions".to_string())
130        })?;
131
132        // First, verify all expected versions match
133        for stream_event in &stream_events {
134            let current_version = versions
135                .get(&stream_event.stream_id)
136                .copied()
137                .unwrap_or_else(EventVersion::initial);
138
139            match stream_event.expected_version {
140                ExpectedVersion::New => {
141                    if versions.contains_key(&stream_event.stream_id) {
142                        return Err(EventStoreError::VersionConflict {
143                            stream: stream_event.stream_id.clone(),
144                            expected: EventVersion::initial(),
145                            current: current_version,
146                        });
147                    }
148                }
149                ExpectedVersion::Exact(expected) => {
150                    if current_version != expected {
151                        return Err(EventStoreError::VersionConflict {
152                            stream: stream_event.stream_id.clone(),
153                            expected,
154                            current: current_version,
155                        });
156                    }
157                }
158                ExpectedVersion::Any => {
159                    // No check needed
160                }
161            }
162        }
163
164        // All versions match, proceed with writes
165        let mut new_versions = HashMap::new();
166
167        for stream_event in stream_events {
168            let stream_events_list = streams.entry(stream_event.stream_id.clone()).or_default();
169
170            let mut current_version = versions
171                .get(&stream_event.stream_id)
172                .copied()
173                .unwrap_or_else(EventVersion::initial);
174
175            for event_to_write in stream_event.events {
176                current_version = current_version.next();
177
178                let stored_event = StoredEvent::new(
179                    event_to_write.event_id,
180                    stream_event.stream_id.clone(),
181                    current_version,
182                    Timestamp::now(),
183                    event_to_write.payload,
184                    event_to_write.metadata,
185                );
186
187                stream_events_list.push(stored_event);
188            }
189
190            versions.insert(stream_event.stream_id.clone(), current_version);
191            new_versions.insert(stream_event.stream_id.clone(), current_version);
192        }
193
194        Ok(new_versions)
195    }
196
197    async fn stream_exists(&self, stream_id: &StreamId) -> EventStoreResult<bool> {
198        let streams = self.streams.read().map_err(|_| {
199            EventStoreError::Internal("Failed to acquire read lock on streams".to_string())
200        })?;
201
202        Ok(streams.contains_key(stream_id))
203    }
204
205    async fn get_stream_version(
206        &self,
207        stream_id: &StreamId,
208    ) -> EventStoreResult<Option<EventVersion>> {
209        let versions = self.versions.read().map_err(|_| {
210            EventStoreError::Internal("Failed to acquire read lock on versions".to_string())
211        })?;
212
213        Ok(versions.get(stream_id).copied())
214    }
215
216    async fn subscribe(
217        &self,
218        options: SubscriptionOptions,
219    ) -> EventStoreResult<Box<dyn Subscription<Event = Self::Event>>> {
220        let subscription = InMemorySubscription::new(self.clone(), options);
221        Ok(Box::new(subscription))
222    }
223}
224
225/// In-memory subscription implementation with full replay and checkpointing support.
226pub struct InMemorySubscription<E>
227where
228    E: Send + Sync + Clone + 'static + PartialEq + Eq,
229{
230    event_store: InMemoryEventStore<E>,
231    options: SubscriptionOptions,
232    current_position: Arc<RwLock<Option<SubscriptionPosition>>>,
233    checkpoints: Arc<RwLock<HashMap<String, SubscriptionPosition>>>,
234    is_running: Arc<AtomicBool>,
235    is_paused: Arc<AtomicBool>,
236    stop_signal: Arc<AtomicBool>,
237}
238
239impl<E> InMemorySubscription<E>
240where
241    E: Send + Sync + Clone + 'static + PartialEq + Eq,
242{
243    /// Creates a new in-memory subscription.
244    pub fn new(event_store: InMemoryEventStore<E>, options: SubscriptionOptions) -> Self {
245        Self {
246            event_store,
247            options,
248            current_position: Arc::new(RwLock::new(None)),
249            checkpoints: Arc::new(RwLock::new(HashMap::new())),
250            is_running: Arc::new(AtomicBool::new(false)),
251            is_paused: Arc::new(AtomicBool::new(false)),
252            stop_signal: Arc::new(AtomicBool::new(false)),
253        }
254    }
255
256    /// Processes events from the event store according to subscription options.
257    async fn process_events(
258        &self,
259        name: SubscriptionName,
260        mut processor: Box<dyn EventProcessor<Event = E>>,
261    ) -> SubscriptionResult<()>
262    where
263        E: PartialEq + Eq,
264    {
265        // Load checkpoint to determine starting position
266        let starting_position = self.load_checkpoint(&name).await?;
267
268        loop {
269            // Check if we should stop
270            if self.stop_signal.load(Ordering::Acquire) {
271                break;
272            }
273
274            // Check if we're paused
275            if self.is_paused.load(Ordering::Acquire) {
276                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
277                continue;
278            }
279
280            // Get events according to subscription options
281            let events = self
282                .get_events_for_processing(starting_position.as_ref())
283                .await?;
284
285            let mut current_pos = starting_position.clone();
286            let mut has_new_events = false;
287
288            for event in events {
289                // Skip events we've already processed
290                if let Some(ref pos) = current_pos {
291                    if event.event_id <= pos.last_event_id {
292                        continue;
293                    }
294                }
295
296                // Process the event
297                processor.process_event(event.clone()).await?;
298                has_new_events = true;
299
300                // Update current position
301                let new_checkpoint = Checkpoint::new(event.event_id, event.event_version.into());
302
303                current_pos = Some(if let Some(mut pos) = current_pos {
304                    pos.last_event_id = event.event_id;
305                    pos.update_checkpoint(event.stream_id.clone(), new_checkpoint);
306                    pos
307                } else {
308                    let mut pos = SubscriptionPosition::new(event.event_id);
309                    pos.update_checkpoint(event.stream_id.clone(), new_checkpoint);
310                    pos
311                });
312
313                // Update our current position
314                {
315                    let mut guard = self.current_position.write().map_err(|_| {
316                        SubscriptionError::CheckpointSaveFailed(
317                            "Failed to acquire position lock".to_string(),
318                        )
319                    })?;
320                    (*guard).clone_from(&current_pos);
321                }
322
323                // Periodically save checkpoint - for in-memory we just update current_position
324                // which is handled above
325            }
326
327            // If we're caught up and this is a live subscription, notify the processor
328            if !has_new_events && matches!(self.options, SubscriptionOptions::LiveOnly) {
329                processor.on_live().await?;
330            }
331
332            // Sleep briefly to avoid busy-waiting
333            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
334        }
335
336        Ok(())
337    }
338
339    /// Gets events for processing based on subscription options.
340    async fn get_events_for_processing(
341        &self,
342        starting_position: Option<&SubscriptionPosition>,
343    ) -> SubscriptionResult<Vec<StoredEvent<E>>> {
344        let (streams, from_position) = match &self.options {
345            SubscriptionOptions::CatchUpFromBeginning => (vec![], None),
346            SubscriptionOptions::CatchUpFromPosition(pos) => (vec![], Some(pos.last_event_id)),
347            SubscriptionOptions::LiveOnly => {
348                // For live-only, start from the current position in the store
349                (vec![], starting_position.as_ref().map(|p| p.last_event_id))
350            }
351            SubscriptionOptions::SpecificStreamsFromBeginning(_mode) => {
352                // This would need stream selection logic based on mode
353                (vec![], None)
354            }
355            SubscriptionOptions::SpecificStreamsFromPosition(_mode, pos) => {
356                (vec![], Some(pos.last_event_id))
357            }
358            SubscriptionOptions::AllStreams { from_position } => (vec![], *from_position),
359            SubscriptionOptions::SpecificStreams {
360                streams,
361                from_position,
362            } => (streams.clone(), *from_position),
363        };
364
365        // Read all events from specified streams (or all streams if empty)
366        let all_events = if streams.is_empty() {
367            self.read_all_events_sorted()?
368        } else {
369            self.read_streams_events(&streams).await?
370        };
371
372        // Filter events based on starting position
373        let filtered_events = if let Some(from_id) =
374            from_position.or_else(|| starting_position.map(|p| p.last_event_id))
375        {
376            all_events
377                .into_iter()
378                .filter(|e| e.event_id > from_id)
379                .collect()
380        } else {
381            all_events
382        };
383
384        Ok(filtered_events)
385    }
386
387    /// Reads all events from the store in sorted order.
388    fn read_all_events_sorted(&self) -> SubscriptionResult<Vec<StoredEvent<E>>> {
389        let streams = self.event_store.streams.read().map_err(|_| {
390            SubscriptionError::EventStore(EventStoreError::Internal(
391                "Failed to acquire read lock on streams".to_string(),
392            ))
393        })?;
394
395        let mut all_events = Vec::new();
396        for events in streams.values() {
397            all_events.extend(events.iter().cloned());
398        }
399
400        // Sort by event ID (which is timestamp-based with UUIDv7)
401        all_events.sort_by(|a, b| a.event_id.cmp(&b.event_id));
402
403        Ok(all_events)
404    }
405
406    /// Reads events from specific streams.
407    async fn read_streams_events(
408        &self,
409        stream_ids: &[StreamId],
410    ) -> SubscriptionResult<Vec<StoredEvent<E>>> {
411        let read_options = ReadOptions::default();
412
413        let stream_data = self
414            .event_store
415            .read_streams(stream_ids, &read_options)
416            .await
417            .map_err(SubscriptionError::EventStore)?;
418
419        Ok(stream_data.events)
420    }
421}
422
423#[async_trait]
424impl<E> Subscription for InMemorySubscription<E>
425where
426    E: Send + Sync + Clone + 'static + PartialEq + Eq,
427{
428    type Event = E;
429
430    async fn start(
431        &mut self,
432        name: SubscriptionName,
433        options: SubscriptionOptions,
434        processor: Box<dyn EventProcessor<Event = Self::Event>>,
435    ) -> SubscriptionResult<()>
436    where
437        Self::Event: PartialEq + Eq,
438    {
439        // Update options if provided
440        self.options = options;
441
442        // Set running state
443        self.is_running.store(true, Ordering::Release);
444        self.stop_signal.store(false, Ordering::Release);
445        self.is_paused.store(false, Ordering::Release);
446
447        // Start processing events in a background task
448        let subscription = self.clone(); // We'll need to implement Clone
449        let name_copy = name;
450
451        tokio::spawn(async move {
452            if let Err(e) = subscription.process_events(name_copy, processor).await {
453                eprintln!("Subscription processing failed: {e}");
454            }
455        });
456
457        Ok(())
458    }
459
460    async fn stop(&mut self) -> SubscriptionResult<()> {
461        self.stop_signal.store(true, Ordering::Release);
462        self.is_running.store(false, Ordering::Release);
463        Ok(())
464    }
465
466    async fn pause(&mut self) -> SubscriptionResult<()> {
467        self.is_paused.store(true, Ordering::Release);
468        Ok(())
469    }
470
471    async fn resume(&mut self) -> SubscriptionResult<()> {
472        self.is_paused.store(false, Ordering::Release);
473        Ok(())
474    }
475
476    async fn get_position(&self) -> SubscriptionResult<Option<SubscriptionPosition>> {
477        let guard = self.current_position.read().map_err(|_| {
478            SubscriptionError::CheckpointLoadFailed("Failed to acquire position lock".to_string())
479        })?;
480        Ok(guard.clone())
481    }
482
483    async fn save_checkpoint(&mut self, position: SubscriptionPosition) -> SubscriptionResult<()> {
484        // For the in-memory implementation, we don't have a specific name context here,
485        // so we'll update the current position
486        {
487            let mut guard = self.current_position.write().map_err(|_| {
488                SubscriptionError::CheckpointSaveFailed(
489                    "Failed to acquire position lock".to_string(),
490                )
491            })?;
492            *guard = Some(position);
493        }
494        Ok(())
495    }
496
497    async fn load_checkpoint(
498        &self,
499        name: &SubscriptionName,
500    ) -> SubscriptionResult<Option<SubscriptionPosition>> {
501        let checkpoints = self.checkpoints.read().map_err(|_| {
502            SubscriptionError::CheckpointLoadFailed(
503                "Failed to acquire checkpoints lock".to_string(),
504            )
505        })?;
506        Ok(checkpoints.get(name.as_ref()).cloned())
507    }
508}
509
510// We need to implement Clone for the subscription
511impl<E> Clone for InMemorySubscription<E>
512where
513    E: Send + Sync + Clone + 'static + PartialEq + Eq,
514{
515    fn clone(&self) -> Self {
516        Self {
517            event_store: self.event_store.clone(),
518            options: self.options.clone(),
519            current_position: Arc::clone(&self.current_position),
520            checkpoints: Arc::clone(&self.checkpoints),
521            is_running: Arc::clone(&self.is_running),
522            is_paused: Arc::clone(&self.is_paused),
523            stop_signal: Arc::clone(&self.stop_signal),
524        }
525    }
526}
527
528#[cfg(test)]
529mod tests {
530    use super::*;
531    use eventcore::{EventId, EventToWrite};
532
533    #[tokio::test]
534    async fn test_new_store_is_empty() {
535        let store: InMemoryEventStore<String> = InMemoryEventStore::new();
536        assert!(store.streams.read().unwrap().is_empty());
537    }
538
539    #[tokio::test]
540    async fn test_clone_shares_storage() {
541        let store1: InMemoryEventStore<String> = InMemoryEventStore::new();
542        #[allow(clippy::redundant_clone)]
543        let store2 = store1.clone();
544
545        // Verify both stores point to the same storage
546        assert!(Arc::ptr_eq(&store1.streams, &store2.streams));
547        assert!(Arc::ptr_eq(&store1.versions, &store2.versions));
548    }
549
550    #[tokio::test]
551    async fn test_stream_exists() {
552        let store: InMemoryEventStore<String> = InMemoryEventStore::new();
553        let stream_id = StreamId::try_new("test-stream").unwrap();
554
555        // Stream should not exist initially
556        assert!(!store.stream_exists(&stream_id).await.unwrap());
557
558        // Write an event to create the stream
559        let event = EventToWrite::new(EventId::new(), "test-event".to_string());
560
561        let stream_events = StreamEvents::new(stream_id.clone(), ExpectedVersion::New, vec![event]);
562
563        store.write_events_multi(vec![stream_events]).await.unwrap();
564
565        // Stream should now exist
566        assert!(store.stream_exists(&stream_id).await.unwrap());
567    }
568
569    #[tokio::test]
570    async fn test_get_stream_version() {
571        let store: InMemoryEventStore<String> = InMemoryEventStore::new();
572        let stream_id = StreamId::try_new("test-stream").unwrap();
573
574        // New stream should have no version
575        assert_eq!(store.get_stream_version(&stream_id).await.unwrap(), None);
576
577        // Write an event
578        let event = EventToWrite::new(EventId::new(), "test-event".to_string());
579        let stream_events = StreamEvents::new(stream_id.clone(), ExpectedVersion::New, vec![event]);
580
581        store.write_events_multi(vec![stream_events]).await.unwrap();
582
583        // Version should be 1
584        assert_eq!(
585            store.get_stream_version(&stream_id).await.unwrap(),
586            Some(EventVersion::try_new(1).unwrap())
587        );
588    }
589
590    #[tokio::test]
591    async fn test_read_streams() {
592        let store: InMemoryEventStore<String> = InMemoryEventStore::new();
593        let stream_id1 = StreamId::try_new("stream-1").unwrap();
594        let stream_id2 = StreamId::try_new("stream-2").unwrap();
595
596        // Write events to both streams
597        let event1 = EventToWrite::new(EventId::new(), "event-1".to_string());
598        let event2 = EventToWrite::new(EventId::new(), "event-2".to_string());
599
600        let stream_events1 =
601            StreamEvents::new(stream_id1.clone(), ExpectedVersion::New, vec![event1]);
602        let stream_events2 =
603            StreamEvents::new(stream_id2.clone(), ExpectedVersion::New, vec![event2]);
604
605        store
606            .write_events_multi(vec![stream_events1, stream_events2])
607            .await
608            .unwrap();
609
610        // Read both streams
611        let result = store
612            .read_streams(
613                &[stream_id1.clone(), stream_id2.clone()],
614                &ReadOptions::new(),
615            )
616            .await
617            .unwrap();
618
619        assert_eq!(result.events.len(), 2);
620        assert_eq!(
621            result.stream_version(&stream_id1),
622            Some(EventVersion::try_new(1).unwrap())
623        );
624        assert_eq!(
625            result.stream_version(&stream_id2),
626            Some(EventVersion::try_new(1).unwrap())
627        );
628
629        // Verify we can find events by stream
630        let stream1_events: Vec<_> = result.events_for_stream(&stream_id1).collect();
631        assert_eq!(stream1_events.len(), 1);
632        assert_eq!(stream1_events[0].payload, "event-1");
633
634        let stream2_events: Vec<_> = result.events_for_stream(&stream_id2).collect();
635        assert_eq!(stream2_events.len(), 1);
636        assert_eq!(stream2_events[0].payload, "event-2");
637    }
638
639    #[tokio::test]
640    async fn test_concurrency_control() {
641        let store: InMemoryEventStore<String> = InMemoryEventStore::new();
642        let stream_id = StreamId::try_new("test-stream").unwrap();
643
644        // Write initial event
645        let event1 = EventToWrite::new(EventId::new(), "event-1".to_string());
646        let stream_events1 =
647            StreamEvents::new(stream_id.clone(), ExpectedVersion::New, vec![event1]);
648
649        store
650            .write_events_multi(vec![stream_events1])
651            .await
652            .unwrap();
653
654        // Try to write with wrong expected version
655        let event2 = EventToWrite::new(EventId::new(), "event-2".to_string());
656        let stream_events2 = StreamEvents::new(
657            stream_id.clone(),
658            ExpectedVersion::Exact(EventVersion::initial()), // Wrong version, should be 1
659            vec![event2.clone()],
660        );
661
662        let result = store.write_events_multi(vec![stream_events2]).await;
663
664        assert!(matches!(
665            result,
666            Err(EventStoreError::VersionConflict { .. })
667        ));
668
669        // Write with correct version should succeed
670        let stream_events3 = StreamEvents::new(
671            stream_id.clone(),
672            ExpectedVersion::Exact(EventVersion::try_new(1).unwrap()),
673            vec![event2],
674        );
675
676        let result = store.write_events_multi(vec![stream_events3]).await;
677
678        assert!(result.is_ok());
679        assert_eq!(
680            store.get_stream_version(&stream_id).await.unwrap(),
681            Some(EventVersion::try_new(2).unwrap())
682        );
683    }
684
685    #[tokio::test]
686    async fn test_multiple_events_in_single_write() {
687        let store: InMemoryEventStore<String> = InMemoryEventStore::new();
688        let stream_id = StreamId::try_new("test-stream").unwrap();
689
690        // Write multiple events at once
691        let events: Vec<EventToWrite<String>> = (0..5)
692            .map(|i| EventToWrite::new(EventId::new(), format!("event-{i}")))
693            .collect();
694
695        let stream_events = StreamEvents::new(stream_id.clone(), ExpectedVersion::New, events);
696
697        store.write_events_multi(vec![stream_events]).await.unwrap();
698
699        // Version should be 5
700        assert_eq!(
701            store.get_stream_version(&stream_id).await.unwrap(),
702            Some(EventVersion::try_new(5).unwrap())
703        );
704
705        // Read and verify all events
706        let result = store
707            .read_streams(&[stream_id.clone()], &ReadOptions::new())
708            .await
709            .unwrap();
710        assert_eq!(result.events.len(), 5);
711        for (i, event) in result.events.iter().enumerate() {
712            assert_eq!(event.payload, format!("event-{i}"));
713        }
714    }
715
716    #[tokio::test]
717    async fn test_expected_version_new() {
718        let store: InMemoryEventStore<String> = InMemoryEventStore::new();
719        let stream_id = StreamId::try_new("test-stream").unwrap();
720
721        // First write with ExpectedVersion::New should succeed
722        let event1 = EventToWrite::new(EventId::new(), "event-1".to_string());
723        let stream_events1 =
724            StreamEvents::new(stream_id.clone(), ExpectedVersion::New, vec![event1]);
725
726        store
727            .write_events_multi(vec![stream_events1])
728            .await
729            .unwrap();
730
731        // Second write with ExpectedVersion::New should fail
732        let event2 = EventToWrite::new(EventId::new(), "event-2".to_string());
733        let stream_events2 =
734            StreamEvents::new(stream_id.clone(), ExpectedVersion::New, vec![event2]);
735
736        let result = store.write_events_multi(vec![stream_events2]).await;
737        assert!(matches!(
738            result,
739            Err(EventStoreError::VersionConflict { .. })
740        ));
741    }
742
743    #[tokio::test]
744    async fn test_expected_version_any() {
745        let store: InMemoryEventStore<String> = InMemoryEventStore::new();
746        let stream_id = StreamId::try_new("test-stream").unwrap();
747
748        // Write with ExpectedVersion::Any on new stream should succeed
749        let event1 = EventToWrite::new(EventId::new(), "event-1".to_string());
750        let stream_events1 =
751            StreamEvents::new(stream_id.clone(), ExpectedVersion::Any, vec![event1]);
752
753        store
754            .write_events_multi(vec![stream_events1])
755            .await
756            .unwrap();
757
758        // Write with ExpectedVersion::Any on existing stream should succeed
759        let event2 = EventToWrite::new(EventId::new(), "event-2".to_string());
760        let stream_events2 =
761            StreamEvents::new(stream_id.clone(), ExpectedVersion::Any, vec![event2]);
762
763        store
764            .write_events_multi(vec![stream_events2])
765            .await
766            .unwrap();
767
768        assert_eq!(
769            store.get_stream_version(&stream_id).await.unwrap(),
770            Some(EventVersion::try_new(2).unwrap())
771        );
772    }
773
774    #[tokio::test]
775    async fn test_read_options_filtering() {
776        let store: InMemoryEventStore<String> = InMemoryEventStore::new();
777        let stream_id = StreamId::try_new("test-stream").unwrap();
778
779        // Write 10 events
780        let events: Vec<EventToWrite<String>> = (0..10)
781            .map(|i| EventToWrite::new(EventId::new(), format!("event-{i}")))
782            .collect();
783
784        let stream_events = StreamEvents::new(stream_id.clone(), ExpectedVersion::New, events);
785
786        store.write_events_multi(vec![stream_events]).await.unwrap();
787
788        // Test from_version
789        let options = ReadOptions::new().from_version(EventVersion::try_new(5).unwrap());
790        let result = store
791            .read_streams(&[stream_id.clone()], &options)
792            .await
793            .unwrap();
794        assert_eq!(result.events.len(), 6); // Events 5-10
795
796        // Test to_version
797        let options = ReadOptions::new().to_version(EventVersion::try_new(3).unwrap());
798        let result = store
799            .read_streams(&[stream_id.clone()], &options)
800            .await
801            .unwrap();
802        assert_eq!(result.events.len(), 3); // Events 1-3
803
804        // Test from_version and to_version
805        let options = ReadOptions::new()
806            .from_version(EventVersion::try_new(3).unwrap())
807            .to_version(EventVersion::try_new(7).unwrap());
808        let result = store
809            .read_streams(&[stream_id.clone()], &options)
810            .await
811            .unwrap();
812        assert_eq!(result.events.len(), 5); // Events 3-7
813
814        // Test max_events
815        let options = ReadOptions::new().with_max_events(5);
816        let result = store
817            .read_streams(&[stream_id.clone()], &options)
818            .await
819            .unwrap();
820        assert_eq!(result.events.len(), 5); // First 5 events
821    }
822}