Skip to main content

eventcore_memory/
lib.rs

1//! In-memory event store implementation for testing.
2//!
3//! This module provides the `InMemoryEventStore` - a lightweight, zero-dependency
4//! storage backend for EventCore integration tests and development.
5
6use std::collections::HashMap;
7use std::sync::{Arc, RwLock};
8
9use eventcore_types::{
10    CheckpointStore, Event, EventFilter, EventPage, EventReader, EventStore, EventStoreError,
11    EventStreamReader, EventStreamSlice, Operation, ProjectorCoordinator, StreamId, StreamPosition,
12    StreamVersion, StreamWriteEntry, StreamWrites,
13};
14use uuid::Uuid;
15
/// Per-stream storage: the stream's type-erased events (pushed in append order) paired
/// with the stream's current version.
type StreamData = (Vec<Box<dyn std::any::Any + Send>>, StreamVersion);
17
/// Entry in the global event log with indexed stream_id for efficient filtering.
///
/// This structure mirrors the Postgres schema where stream_id is a separate
/// indexed column and event_id (UUID7) serves as the global position.
/// Because stream_id, event_type and event_id are stored as their own fields,
/// readers can filter by stream prefix, event type and position without
/// inspecting the JSON payload, matching the performance characteristics of
/// the database implementation.
#[derive(Debug, Clone)]
struct GlobalLogEntry {
    /// Event identifier (UUID7); doubles as the event's global position.
    event_id: Uuid,
    /// Stream identifier, captured as a plain string at write time for prefix filtering.
    stream_id: String,
    /// Event type name, captured at write time for type filtering.
    event_type: String,
    /// Event payload as a JSON value; deserialized only when an entry is read back.
    event_data: serde_json::Value,
}
36
/// Internal storage combining per-stream data with global event ordering.
///
/// The whole structure lives behind the single mutex owned by `InMemoryEventStore`.
struct StoreData {
    /// Per-stream events and current version, keyed by stream id.
    streams: HashMap<StreamId, StreamData>,
    /// Global log with indexed stream_id for efficient EventReader queries
    global_log: Vec<GlobalLogEntry>,
    /// Checkpoint storage for projection progress tracking, keyed by checkpoint name
    checkpoints: HashMap<String, StreamPosition>,
    /// Coordination locks for projector leadership; a key is present while a guard
    /// for that subscription name is alive. Shared via `Arc` so guards can release
    /// their entry on drop.
    locks: Arc<RwLock<HashMap<String, ()>>>,
}
47
/// In-memory event store implementation for testing.
///
/// `InMemoryEventStore` provides a lightweight, zero-dependency storage backend
/// for EventCore integration tests and development. It implements the `EventStore`,
/// `EventReader`, `CheckpointStore`, and `ProjectorCoordinator` traits using
/// standard library collections with optimistic concurrency control via version
/// checking.
///
/// # Example
///
/// ```ignore
/// use eventcore_memory::InMemoryEventStore;
///
/// let store = InMemoryEventStore::new();
/// // Use store with execute() function
/// ```
///
/// # Thread Safety
///
/// `InMemoryEventStore` uses interior mutability (`Mutex`) for concurrent access:
/// streams, the global log, and checkpoints are all guarded by one `std::sync::Mutex`.
pub struct InMemoryEventStore {
    /// All store state, guarded by a single mutex.
    data: std::sync::Mutex<StoreData>,
}
71
72impl InMemoryEventStore {
73    /// Create a new in-memory event store.
74    ///
75    /// Returns an empty event store ready for command execution.
76    /// All streams start at version 0 (no events).
77    pub fn new() -> Self {
78        Self {
79            data: std::sync::Mutex::new(StoreData {
80                streams: HashMap::new(),
81                global_log: Vec::new(),
82                checkpoints: HashMap::new(),
83                locks: Arc::new(RwLock::new(HashMap::new())),
84            }),
85        }
86    }
87}
88
89impl Default for InMemoryEventStore {
90    fn default() -> Self {
91        Self::new()
92    }
93}
94
95impl EventStore for InMemoryEventStore {
96    async fn read_stream<E: Event>(
97        &self,
98        stream_id: StreamId,
99    ) -> Result<EventStreamReader<E>, EventStoreError> {
100        let data = self
101            .data
102            .lock()
103            .map_err(|_| EventStoreError::StoreFailure {
104                operation: Operation::ReadStream,
105            })?;
106        let events = match data.streams.get(&stream_id) {
107            None => Vec::new(),
108            Some((boxed_events, _version)) => {
109                let mut events = Vec::with_capacity(boxed_events.len());
110                for boxed in boxed_events {
111                    match boxed.downcast_ref::<E>() {
112                        Some(event) => events.push(event.clone()),
113                        None => {
114                            return Err(EventStoreError::DeserializationFailed {
115                                stream_id,
116                                detail: format!(
117                                    "event could not be downcast to {}",
118                                    std::any::type_name::<E>()
119                                ),
120                            });
121                        }
122                    }
123                }
124                events
125            }
126        };
127
128        Ok(EventStreamReader::new(events))
129    }
130
131    async fn append_events(
132        &self,
133        writes: StreamWrites,
134    ) -> Result<EventStreamSlice, EventStoreError> {
135        let mut data = self
136            .data
137            .lock()
138            .map_err(|_| EventStoreError::StoreFailure {
139                operation: Operation::AppendEvents,
140            })?;
141        let expected_versions = writes.expected_versions().clone();
142
143        // Check all version constraints before writing any events
144        for (stream_id, expected_version) in &expected_versions {
145            let current_version = data
146                .streams
147                .get(stream_id)
148                .map(|(_events, version)| *version)
149                .unwrap_or_else(|| StreamVersion::new(0));
150
151            if current_version != *expected_version {
152                return Err(EventStoreError::VersionConflict {
153                    stream_id: stream_id.clone(),
154                    expected: *expected_version,
155                    actual: current_version,
156                });
157            }
158        }
159
160        // All versions match - proceed with writes
161        for entry in writes.into_entries() {
162            let StreamWriteEntry {
163                stream_id,
164                event,
165                event_type,
166                event_data,
167            } = entry;
168
169            // Generate UUID7 for this event (monotonic, timestamp-ordered)
170            let event_id = Uuid::now_v7();
171
172            // Store in global log for EventReader with indexed stream_id, event_type, and event_id
173            data.global_log.push(GlobalLogEntry {
174                event_id,
175                stream_id: stream_id.as_ref().to_string(),
176                event_type: event_type.to_string(),
177                event_data,
178            });
179
180            let (events, version) = data
181                .streams
182                .entry(stream_id)
183                .or_insert_with(|| (Vec::new(), StreamVersion::new(0)));
184            events.push(event);
185            *version = version.increment();
186        }
187
188        Ok(EventStreamSlice)
189    }
190}
191
192impl EventReader for InMemoryEventStore {
193    type Error = EventStoreError;
194
195    async fn read_events<E: Event>(
196        &self,
197        filter: EventFilter,
198        page: EventPage,
199    ) -> Result<Vec<(E, StreamPosition)>, Self::Error> {
200        let data = self
201            .data
202            .lock()
203            .map_err(|_| EventStoreError::StoreFailure {
204                operation: Operation::ReadStream,
205            })?;
206
207        let after_event_id = page.after_position().map(|p| p.into_inner());
208
209        let events: Vec<(E, StreamPosition)> = data
210            .global_log
211            .iter()
212            .filter(|entry| {
213                // Filter by event_id (UUID7 comparison)
214                match after_event_id {
215                    None => true,
216                    Some(after_id) => entry.event_id > after_id,
217                }
218            })
219            .filter(|entry| {
220                // Filter by indexed stream_id WITHOUT parsing JSON (matches Postgres behavior)
221                match filter.stream_prefix() {
222                    None => true,
223                    Some(prefix) => entry.stream_id.starts_with(prefix.as_ref()),
224                }
225            })
226            .filter(|entry| {
227                // Filter by event_type BEFORE take() so non-matching types
228                // don't consume batch slots (fixes issue #372).
229                // Use explicit filter if set, otherwise derive from E::event_type_name().
230                let type_filter = filter.event_type().unwrap_or_else(|| E::event_type_name());
231                entry.event_type == type_filter
232            })
233            .take(page.limit().into_inner())
234            .filter_map(|entry| {
235                serde_json::from_value::<E>(entry.event_data.clone())
236                    .ok()
237                    .map(|e| (e, StreamPosition::new(entry.event_id)))
238            })
239            .collect();
240
241        Ok(events)
242    }
243}
244
245impl CheckpointStore for InMemoryEventStore {
246    type Error = InMemoryCheckpointError;
247
248    async fn load(&self, name: &str) -> Result<Option<StreamPosition>, Self::Error> {
249        let data = self
250            .data
251            .lock()
252            .map_err(|e| InMemoryCheckpointError::LockFailed(e.to_string()))?;
253        Ok(data.checkpoints.get(name).copied())
254    }
255
256    async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> {
257        let mut data = self
258            .data
259            .lock()
260            .map_err(|e| InMemoryCheckpointError::LockFailed(e.to_string()))?;
261        let _ = data.checkpoints.insert(name.to_string(), position);
262        Ok(())
263    }
264}
265
266impl ProjectorCoordinator for InMemoryEventStore {
267    type Error = InMemoryCoordinationError;
268    type Guard = InMemoryCoordinationGuard;
269
270    async fn try_acquire(&self, subscription_name: &str) -> Result<Self::Guard, Self::Error> {
271        let data = self
272            .data
273            .lock()
274            .map_err(|e| InMemoryCoordinationError::LockPoisoned {
275                message: e.to_string(),
276            })?;
277
278        let mut guard =
279            data.locks
280                .write()
281                .map_err(|e| InMemoryCoordinationError::LockPoisoned {
282                    message: e.to_string(),
283                })?;
284
285        if guard.contains_key(subscription_name) {
286            return Err(InMemoryCoordinationError::LeadershipNotAcquired {
287                subscription_name: subscription_name.to_string(),
288            });
289        }
290
291        let _ = guard.insert(subscription_name.to_string(), ());
292
293        Ok(InMemoryCoordinationGuard {
294            subscription_name: subscription_name.to_string(),
295            locks: Arc::clone(&data.locks),
296        })
297    }
298}
299
/// In-memory checkpoint store for tracking projection progress.
///
/// `InMemoryCheckpointStore` stores checkpoint positions in memory using a
/// thread-safe `Arc<RwLock<HashMap>>`. It is primarily useful for testing
/// and single-process deployments where persistence across restarts is not required.
/// Clones share the same underlying map.
///
/// For production deployments requiring durability, use a persistent
/// checkpoint store implementation.
///
/// # Example
///
/// ```ignore
/// use eventcore_memory::InMemoryCheckpointStore;
///
/// let checkpoint_store = InMemoryCheckpointStore::new();
/// // Use with ProjectionRunner
/// ```
#[derive(Debug, Clone, Default)]
pub struct InMemoryCheckpointStore {
    /// Saved positions keyed by checkpoint name.
    checkpoints: Arc<RwLock<HashMap<String, StreamPosition>>>,
}
321
322impl InMemoryCheckpointStore {
323    /// Create a new in-memory checkpoint store.
324    pub fn new() -> Self {
325        Self::default()
326    }
327}
328
/// Error type for in-memory checkpoint store operations.
///
/// Both `InMemoryEventStore` (backed by a `Mutex`) and `InMemoryCheckpointStore`
/// (backed by an `RwLock`) report this error. In either case the only failure is
/// a lock that could not be acquired, i.e. one poisoned by a panic in another thread.
#[derive(Debug, Clone, thiserror::Error)]
pub enum InMemoryCheckpointError {
    /// The backing lock could not be acquired; carries the lock error's message.
    #[error("failed to acquire lock: {0}")]
    LockFailed(String),
}
338
/// Error type for in-memory coordinator operations.
///
/// Returned by `try_acquire` on both `InMemoryEventStore` and
/// `InMemoryProjectorCoordinator`.
#[derive(Debug, Clone, thiserror::Error)]
pub enum InMemoryCoordinationError {
    /// Leadership is already held by another instance: the subscription name was
    /// already present in the lock table.
    #[error(
        "leadership not acquired for subscription '{subscription_name}': another instance holds the lock"
    )]
    LeadershipNotAcquired { subscription_name: String },
    /// Lock was poisoned by a panic in another thread; `message` is the lock error's text.
    #[error("lock poisoned: {message}")]
    LockPoisoned { message: String },
}
351
/// Guard that releases leadership when dropped.
///
/// Dropping the guard removes `subscription_name` from the shared lock table, which
/// lets a later `try_acquire` for the same name succeed.
#[derive(Debug)]
pub struct InMemoryCoordinationGuard {
    /// Name whose entry this guard removes from the lock table on drop.
    subscription_name: String,
    /// Handle to the lock table the entry was inserted into.
    locks: Arc<RwLock<HashMap<String, ()>>>,
}
358
359impl Drop for InMemoryCoordinationGuard {
360    fn drop(&mut self) {
361        if let Ok(mut guard) = self.locks.write() {
362            let _ = guard.remove(&self.subscription_name);
363        } else {
364            tracing::error!(
365                subscription_name = %self.subscription_name,
366                "failed to release coordination lock: RwLock poisoned"
367            );
368        }
369    }
370}
371
/// In-memory projector coordinator for single-process deployments.
///
/// `InMemoryProjectorCoordinator` provides coordination for projectors within a single
/// process using an in-memory lock table. This is suitable for testing and single-process
/// deployments where distributed coordination is not required. Clones share the same
/// lock table.
///
/// For distributed deployments with multiple process instances, use a database-backed
/// coordinator implementation (e.g., PostgreSQL advisory locks).
#[derive(Debug, Clone, Default)]
pub struct InMemoryProjectorCoordinator {
    /// Lock table: a key is present while leadership for that subscription name is held.
    locks: Arc<RwLock<HashMap<String, ()>>>,
}
384
385impl InMemoryProjectorCoordinator {
386    /// Create a new in-memory projector coordinator.
387    pub fn new() -> Self {
388        Self::default()
389    }
390}
391
392impl ProjectorCoordinator for InMemoryProjectorCoordinator {
393    type Error = InMemoryCoordinationError;
394    type Guard = InMemoryCoordinationGuard;
395
396    async fn try_acquire(&self, subscription_name: &str) -> Result<Self::Guard, Self::Error> {
397        let mut guard =
398            self.locks
399                .write()
400                .map_err(|e| InMemoryCoordinationError::LockPoisoned {
401                    message: e.to_string(),
402                })?;
403
404        if guard.contains_key(subscription_name) {
405            return Err(InMemoryCoordinationError::LeadershipNotAcquired {
406                subscription_name: subscription_name.to_string(),
407            });
408        }
409
410        let _ = guard.insert(subscription_name.to_string(), ());
411
412        Ok(InMemoryCoordinationGuard {
413            subscription_name: subscription_name.to_string(),
414            locks: Arc::clone(&self.locks),
415        })
416    }
417}
418
419impl CheckpointStore for InMemoryCheckpointStore {
420    type Error = InMemoryCheckpointError;
421
422    async fn load(&self, name: &str) -> Result<Option<StreamPosition>, Self::Error> {
423        let guard = self
424            .checkpoints
425            .read()
426            .map_err(|e| InMemoryCheckpointError::LockFailed(e.to_string()))?;
427        Ok(guard.get(name).copied())
428    }
429
430    async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> {
431        let mut guard = self
432            .checkpoints
433            .write()
434            .map_err(|e| InMemoryCheckpointError::LockFailed(e.to_string()))?;
435        let _ = guard.insert(name.to_string(), position);
436        Ok(())
437    }
438}
439
440#[cfg(test)]
441mod tests {
442    use super::*;
443    use eventcore_types::{BatchSize, EventFilter, EventPage};
444    use serde::{Deserialize, Serialize};
445
    /// Test-specific domain event type for unit testing storage operations.
    ///
    /// Carries its own stream id plus a free-form payload that assertions use to tell
    /// events apart.
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
    struct TestEvent {
        /// Stream this event belongs to; returned by `Event::stream_id`.
        stream_id: StreamId,
        /// Arbitrary payload checked by the tests.
        data: String,
    }
452
    impl Event for TestEvent {
        /// Returns the stream id stored on the event itself.
        fn stream_id(&self) -> &StreamId {
            &self.stream_id
        }

        /// Type name for this event; `read_events` matches entries against it when the
        /// filter names no explicit event type.
        fn event_type_name() -> &'static str {
            "TestEvent"
        }
    }
462
463    /// Unit test: Verify InMemoryEventStore can append and retrieve a single event
464    ///
465    /// This test verifies the fundamental event storage capability:
466    /// - Append an event to a stream
467    /// - Read the stream back
468    /// - Verify the event is retrievable with correct data
469    ///
470    /// This is a unit test drilling down from the failing integration test
471    /// test_deposit_command_event_data_is_retrievable. We're testing the
472    /// storage layer in isolation before testing the full command execution flow.
473    #[tokio::test]
474    async fn test_append_and_read_single_event() {
475        // Given: An in-memory event store
476        let store = InMemoryEventStore::new();
477
478        // And: A stream ID
479        let stream_id = StreamId::try_new("test-stream-123".to_string()).expect("valid stream id");
480
481        // And: A domain event to store
482        let event = TestEvent {
483            stream_id: stream_id.clone(),
484            data: "test event data".to_string(),
485        };
486
487        // And: A collection of writes containing the event (expected version 0 for empty stream)
488        let writes = StreamWrites::new()
489            .register_stream(stream_id.clone(), StreamVersion::new(0))
490            .and_then(|writes| writes.append(event.clone()))
491            .expect("append should succeed");
492
493        // When: We append the event to the store
494        let _ = store
495            .append_events(writes)
496            .await
497            .expect("append to succeed");
498
499        let reader = store
500            .read_stream::<TestEvent>(stream_id)
501            .await
502            .expect("read to succeed");
503
504        let observed = (
505            reader.is_empty(),
506            reader.len(),
507            reader.iter().next().is_none(),
508        );
509
510        assert_eq!(observed, (false, 1usize, false));
511    }
512
513    #[tokio::test]
514    async fn event_stream_reader_is_empty_reflects_stream_population() {
515        let store = InMemoryEventStore::new();
516        let stream_id =
517            StreamId::try_new("is-empty-observation".to_string()).expect("valid stream id");
518
519        let initial_reader = store
520            .read_stream::<TestEvent>(stream_id.clone())
521            .await
522            .expect("initial read to succeed");
523
524        let event = TestEvent {
525            stream_id: stream_id.clone(),
526            data: "populated event".to_string(),
527        };
528
529        let writes = StreamWrites::new()
530            .register_stream(stream_id.clone(), StreamVersion::new(0))
531            .and_then(|writes| writes.append(event))
532            .expect("append should succeed");
533
534        let _ = store
535            .append_events(writes)
536            .await
537            .expect("append to succeed");
538
539        let populated_reader = store
540            .read_stream::<TestEvent>(stream_id)
541            .await
542            .expect("populated read to succeed");
543
544        let observed = (
545            initial_reader.is_empty(),
546            initial_reader.len(),
547            populated_reader.is_empty(),
548            populated_reader.len(),
549        );
550
551        assert_eq!(observed, (true, 0usize, false, 1usize));
552    }
553
554    #[tokio::test]
555    async fn read_stream_iterates_through_events_in_order() {
556        let store = InMemoryEventStore::new();
557        let stream_id = StreamId::try_new("ordered-stream".to_string()).expect("valid stream id");
558
559        let first_event = TestEvent {
560            stream_id: stream_id.clone(),
561            data: "first".to_string(),
562        };
563
564        let second_event = TestEvent {
565            stream_id: stream_id.clone(),
566            data: "second".to_string(),
567        };
568
569        let writes = StreamWrites::new()
570            .register_stream(stream_id.clone(), StreamVersion::new(0))
571            .and_then(|writes| writes.append(first_event))
572            .and_then(|writes| writes.append(second_event))
573            .expect("append chain should succeed");
574
575        let _ = store
576            .append_events(writes)
577            .await
578            .expect("append to succeed");
579
580        let reader = store
581            .read_stream::<TestEvent>(stream_id)
582            .await
583            .expect("read to succeed");
584
585        let collected: Vec<String> = reader.iter().map(|event| event.data.clone()).collect();
586
587        let observed = (reader.is_empty(), collected);
588
589        assert_eq!(
590            observed,
591            (false, vec!["first".to_string(), "second".to_string()])
592        );
593    }
594
595    #[test]
596    fn stream_writes_accepts_duplicate_stream_with_same_expected_version() {
597        let stream_id = StreamId::try_new("duplicate-stream-same-version".to_string())
598            .expect("valid stream id");
599
600        let first_event = TestEvent {
601            stream_id: stream_id.clone(),
602            data: "first-event".to_string(),
603        };
604
605        let second_event = TestEvent {
606            stream_id: stream_id.clone(),
607            data: "second-event".to_string(),
608        };
609
610        let writes_result = StreamWrites::new()
611            .register_stream(stream_id.clone(), StreamVersion::new(0))
612            .and_then(|writes| writes.append(first_event))
613            .and_then(|writes| writes.append(second_event));
614
615        assert!(writes_result.is_ok());
616    }
617
618    #[test]
619    fn stream_writes_rejects_duplicate_stream_with_conflicting_expected_versions() {
620        let stream_id =
621            StreamId::try_new("duplicate-stream-conflict".to_string()).expect("valid stream id");
622
623        let first_event = TestEvent {
624            stream_id: stream_id.clone(),
625            data: "first-event-conflict".to_string(),
626        };
627
628        let second_event = TestEvent {
629            stream_id: stream_id.clone(),
630            data: "second-event-conflict".to_string(),
631        };
632
633        let conflict = StreamWrites::new()
634            .register_stream(stream_id.clone(), StreamVersion::new(0))
635            .and_then(|writes| writes.append(first_event))
636            .and_then(|writes| writes.register_stream(stream_id.clone(), StreamVersion::new(1)))
637            .and_then(|writes| writes.append(second_event));
638
639        let message = conflict.unwrap_err().to_string();
640
641        assert_eq!(
642            message,
643            "conflicting expected versions for stream duplicate-stream-conflict: first=0, second=1"
644        );
645    }
646
647    #[tokio::test]
648    async fn stream_writes_registers_stream_before_appending_multiple_events() {
649        let store = InMemoryEventStore::new();
650        let stream_id =
651            StreamId::try_new("registered-stream".to_string()).expect("valid stream id");
652
653        let first_event = TestEvent {
654            stream_id: stream_id.clone(),
655            data: "first-registered-event".to_string(),
656        };
657
658        let second_event = TestEvent {
659            stream_id: stream_id.clone(),
660            data: "second-registered-event".to_string(),
661        };
662
663        let writes = StreamWrites::new()
664            .register_stream(stream_id.clone(), StreamVersion::new(0))
665            .and_then(|writes| writes.append(first_event))
666            .and_then(|writes| writes.append(second_event))
667            .expect("registered stream should accept events");
668
669        let result = store.append_events(writes).await;
670
671        assert!(
672            result.is_ok(),
673            "append should succeed when stream registered before events"
674        );
675    }
676
677    #[test]
678    fn stream_writes_rejects_appends_for_unregistered_streams() {
679        let stream_id =
680            StreamId::try_new("unregistered-stream".to_string()).expect("valid stream id");
681
682        let event = TestEvent {
683            stream_id: stream_id.clone(),
684            data: "unregistered-event".to_string(),
685        };
686
687        let error = StreamWrites::new()
688            .append(event)
689            .expect_err("append without prior registration should fail");
690
691        assert!(matches!(
692            error,
693            EventStoreError::UndeclaredStream { stream_id: ref actual } if *actual == stream_id
694        ));
695    }
696
697    #[test]
698    fn expected_versions_returns_registered_streams_and_versions() {
699        let stream_a = StreamId::try_new("stream-a").expect("valid stream id");
700        let stream_b = StreamId::try_new("stream-b").expect("valid stream id");
701
702        let writes = StreamWrites::new()
703            .register_stream(stream_a.clone(), StreamVersion::new(0))
704            .and_then(|w| w.register_stream(stream_b.clone(), StreamVersion::new(5)))
705            .expect("registration should succeed");
706
707        let versions = writes.expected_versions();
708
709        assert_eq!(versions.len(), 2);
710        assert_eq!(versions.get(&stream_a), Some(&StreamVersion::new(0)));
711        assert_eq!(versions.get(&stream_b), Some(&StreamVersion::new(5)));
712    }
713
714    #[test]
715    fn stream_id_rejects_asterisk_metacharacter() {
716        let result = StreamId::try_new("account-*");
717        assert!(
718            result.is_err(),
719            "StreamId should reject asterisk glob metacharacter"
720        );
721    }
722
723    #[test]
724    fn stream_id_rejects_question_mark_metacharacter() {
725        let result = StreamId::try_new("account-?");
726        assert!(
727            result.is_err(),
728            "StreamId should reject question mark glob metacharacter"
729        );
730    }
731
732    #[test]
733    fn stream_id_rejects_open_bracket_metacharacter() {
734        let result = StreamId::try_new("account-[");
735        assert!(
736            result.is_err(),
737            "StreamId should reject open bracket glob metacharacter"
738        );
739    }
740
741    #[test]
742    fn stream_id_rejects_close_bracket_metacharacter() {
743        let result = StreamId::try_new("account-]");
744        assert!(
745            result.is_err(),
746            "StreamId should reject close bracket glob metacharacter"
747        );
748    }
749
750    #[tokio::test]
751    async fn event_reader_after_position_excludes_event_at_position() {
752        // Given: An event store with 3 events
753        let store = InMemoryEventStore::new();
754        let stream_id = StreamId::try_new("reader-test").expect("valid stream id");
755
756        let event1 = TestEvent {
757            stream_id: stream_id.clone(),
758            data: "first".to_string(),
759        };
760        let event2 = TestEvent {
761            stream_id: stream_id.clone(),
762            data: "second".to_string(),
763        };
764        let event3 = TestEvent {
765            stream_id: stream_id.clone(),
766            data: "third".to_string(),
767        };
768
769        let writes = StreamWrites::new()
770            .register_stream(stream_id.clone(), StreamVersion::new(0))
771            .and_then(|w| w.append(event1))
772            .and_then(|w| w.append(event2))
773            .and_then(|w| w.append(event3))
774            .expect("append should succeed");
775
776        let _ = store
777            .append_events(writes)
778            .await
779            .expect("append to succeed");
780
781        // First, read all events to get their positions
782        let all_events = store
783            .read_events::<TestEvent>(EventFilter::all(), EventPage::first(BatchSize::new(100)))
784            .await
785            .expect("read all events to succeed");
786
787        assert_eq!(all_events.len(), 3, "Should have 3 events total");
788        let (first_event, first_position) = &all_events[0];
789
790        // When: We read events after the first event's position
791        let page = EventPage::after(*first_position, BatchSize::new(100));
792        let filter = EventFilter::all();
793        let events = store
794            .read_events::<TestEvent>(filter, page)
795            .await
796            .expect("read to succeed");
797
798        // Then: We should get 2 events (event2 and event3), not including event1
799        assert_eq!(events.len(), 2, "Should get 2 events after first position");
800        assert_eq!(
801            events[0].0.data, "second",
802            "First returned event should be 'second'"
803        );
804        assert_eq!(
805            events[1].0.data, "third",
806            "Second returned event should be 'third'"
807        );
808
809        // And: The first event should NOT be in the results
810        for (event, _pos) in &events {
811            assert_ne!(
812                event.data, first_event.data,
813                "First event should be excluded"
814            );
815        }
816
817        // And: All returned positions should be greater than first_position
818        for (_event, pos) in &events {
819            assert!(
820                *pos > *first_position,
821                "Returned position {} should be > first position {}",
822                pos,
823                first_position
824            );
825        }
826    }
827
828    #[tokio::test]
829    async fn in_memory_event_store_implements_checkpoint_store() {
830        // Given: An InMemoryEventStore
831        let store = InMemoryEventStore::new();
832
833        // When: We save a checkpoint
834        let position = StreamPosition::new(Uuid::now_v7());
835        CheckpointStore::save(&store, "test-projector", position)
836            .await
837            .expect("save should succeed");
838
839        // Then: We can load it back
840        let loaded = CheckpointStore::load(&store, "test-projector")
841            .await
842            .expect("load should succeed");
843        assert_eq!(loaded, Some(position));
844    }
845
846    #[tokio::test]
847    async fn in_memory_event_store_implements_projector_coordinator() {
848        // Given: An InMemoryEventStore
849        let store = InMemoryEventStore::new();
850
851        // When: We try to acquire leadership
852        let guard = ProjectorCoordinator::try_acquire(&store, "test-projector").await;
853
854        // Then: It should succeed
855        assert!(guard.is_ok(), "should acquire leadership");
856    }
857}