Skip to main content

eventcore_memory/
lib.rs

1//! In-memory event store implementation for testing.
2//!
3//! This module provides the `InMemoryEventStore` - a lightweight, zero-dependency
4//! storage backend for EventCore integration tests and development.
5
6use std::collections::HashMap;
7use std::sync::{Arc, RwLock};
8
9use eventcore_types::{
10    CheckpointStore, Event, EventFilter, EventPage, EventReader, EventStore, EventStoreError,
11    EventStream, EventStreamSlice, Operation, ProjectorCoordinator, StreamId, StreamPosition,
12    StreamVersion, StreamWriteEntry, StreamWrites,
13};
14use uuid::Uuid;
15
16type StreamData = (Vec<Box<dyn std::any::Any + Send>>, StreamVersion);
17
18/// Entry in the global event log with indexed stream_id for efficient filtering.
19///
20/// This structure mirrors the Postgres schema where stream_id is a separate
21/// indexed column and event_id (UUID7) serves as the global position.
22/// By storing stream_id and event_id separately, we can filter by stream
23/// prefix and position without parsing JSON, matching the performance
24/// characteristics of the database implementation.
25#[derive(Debug, Clone)]
26struct GlobalLogEntry {
27    /// Event identifier (UUID7), used as global position
28    event_id: Uuid,
29    /// Stream identifier, extracted at write time for efficient filtering
30    stream_id: String,
31    /// Event type name, stored at write time for efficient type filtering
32    event_type: String,
33    /// Event data as serialized JSON (serialized once at append time)
34    event_data: String,
35}
36
37/// Internal storage combining per-stream data with global event ordering.
38struct StoreData {
39    streams: HashMap<StreamId, StreamData>,
40    /// Global log with indexed stream_id for efficient EventReader queries
41    global_log: Vec<GlobalLogEntry>,
42    /// Checkpoint storage for projection progress tracking
43    checkpoints: HashMap<String, StreamPosition>,
44    /// Coordination locks for projector leadership
45    locks: Arc<RwLock<HashMap<String, ()>>>,
46}
47
48/// In-memory event store implementation for testing.
49///
50/// `InMemoryEventStore` provides a lightweight, zero-dependency storage backend
51/// for EventCore integration tests and development. It implements the `EventStore`,
52/// `EventReader`, `CheckpointStore`, and `ProjectorCoordinator` traits using
53/// standard library collections with optimistic concurrency control via version
54/// checking.
55///
56/// # Example
57///
58/// ```no_run
59/// use eventcore_memory::InMemoryEventStore;
60///
61/// let store = InMemoryEventStore::new();
62/// // Use store with execute() function
63/// ```
64///
65/// # Thread Safety
66///
67/// `InMemoryEventStore` uses interior mutability (`Mutex`) for concurrent access.
68pub struct InMemoryEventStore {
69    data: std::sync::Mutex<StoreData>,
70}
71
72impl InMemoryEventStore {
73    /// Create a new in-memory event store.
74    ///
75    /// Returns an empty event store ready for command execution.
76    /// All streams start at version 0 (no events).
77    pub fn new() -> Self {
78        Self {
79            data: std::sync::Mutex::new(StoreData {
80                streams: HashMap::new(),
81                global_log: Vec::new(),
82                checkpoints: HashMap::new(),
83                locks: Arc::new(RwLock::new(HashMap::new())),
84            }),
85        }
86    }
87}
88
89impl Default for InMemoryEventStore {
90    fn default() -> Self {
91        Self::new()
92    }
93}
94
95impl EventStore for InMemoryEventStore {
96    async fn read_stream<E: Event>(
97        &self,
98        stream_id: StreamId,
99    ) -> Result<EventStream<E>, EventStoreError> {
100        // The in-memory store keeps events behind a lock as type-erased
101        // `Box<dyn Any>`. Producing an owned `E` per item requires a downcast
102        // and clone (expected — see #363), so we materialize the per-event
103        // results while holding the lock, then release it and yield the items
104        // one at a time. The stream is still consumed incrementally by the
105        // executor; only this local, in-process backend buffers the owned
106        // clones (which already existed in memory).
107        let items: Vec<Result<E, EventStoreError>> = {
108            let data = self
109                .data
110                .lock()
111                .map_err(|_| EventStoreError::StoreFailure {
112                    operation: Operation::ReadStream,
113                })?;
114            match data.streams.get(&stream_id) {
115                None => Vec::new(),
116                Some((boxed_events, _version)) => boxed_events
117                    .iter()
118                    .map(|boxed| match boxed.downcast_ref::<E>() {
119                        Some(event) => Ok(event.clone()),
120                        None => Err(EventStoreError::DeserializationFailed {
121                            stream_id: stream_id.clone(),
122                            detail: format!(
123                                "event could not be downcast to {}",
124                                std::any::type_name::<E>()
125                            ),
126                        }),
127                    })
128                    .collect(),
129            }
130        };
131
132        Ok(EventStream::new(futures::stream::iter(items)))
133    }
134
135    async fn append_events(
136        &self,
137        writes: StreamWrites,
138    ) -> Result<EventStreamSlice, EventStoreError> {
139        let mut data = self
140            .data
141            .lock()
142            .map_err(|_| EventStoreError::StoreFailure {
143                operation: Operation::AppendEvents,
144            })?;
145        let expected_versions = writes.expected_versions().clone();
146
147        // Check all version constraints before writing any events
148        for (stream_id, expected_version) in &expected_versions {
149            let current_version = data
150                .streams
151                .get(stream_id)
152                .map(|(_events, version)| *version)
153                .unwrap_or_else(|| StreamVersion::new(0));
154
155            if current_version != *expected_version {
156                return Err(EventStoreError::VersionConflict {
157                    stream_id: stream_id.clone(),
158                    expected: *expected_version,
159                    actual: current_version,
160                });
161            }
162        }
163
164        // All versions match - proceed with writes
165        for entry in writes.into_entries() {
166            let StreamWriteEntry {
167                stream_id,
168                event,
169                event_type,
170                event_data,
171            } = entry;
172
173            // Generate UUID7 for this event (monotonic, timestamp-ordered)
174            let event_id = Uuid::now_v7();
175
176            // Store in global log for EventReader with indexed stream_id, event_type, and event_id.
177            // event_data is already serialized JSON; keep the raw string.
178            data.global_log.push(GlobalLogEntry {
179                event_id,
180                stream_id: stream_id.as_ref().to_string(),
181                event_type: event_type.to_string(),
182                event_data: event_data.get().to_owned(),
183            });
184
185            let (events, version) = data
186                .streams
187                .entry(stream_id)
188                .or_insert_with(|| (Vec::new(), StreamVersion::new(0)));
189            events.push(event);
190            *version = version.increment();
191        }
192
193        Ok(EventStreamSlice)
194    }
195}
196
197impl EventReader for InMemoryEventStore {
198    type Error = EventStoreError;
199
200    async fn read_events<E: Event>(
201        &self,
202        filter: EventFilter,
203        page: EventPage,
204    ) -> Result<Vec<(E, StreamPosition)>, Self::Error> {
205        let data = self
206            .data
207            .lock()
208            .map_err(|_| EventStoreError::StoreFailure {
209                operation: Operation::ReadStream,
210            })?;
211
212        let after_event_id = page.after_position().map(|p| p.into_inner());
213
214        let events: Vec<(E, StreamPosition)> = data
215            .global_log
216            .iter()
217            .filter(|entry| {
218                // Filter by event_id (UUID7 comparison)
219                match after_event_id {
220                    None => true,
221                    Some(after_id) => entry.event_id > after_id,
222                }
223            })
224            .filter(|entry| {
225                // Filter by indexed stream_id WITHOUT parsing JSON (matches Postgres behavior)
226                match filter.stream_prefix() {
227                    None => true,
228                    Some(prefix) => entry.stream_id.starts_with(prefix.as_ref()),
229                }
230            })
231            .filter(|entry| {
232                // Filter by glob pattern (ADR-0047) at the query level, before
233                // take(), so non-matching streams don't consume batch slots.
234                match filter.stream_pattern() {
235                    None => true,
236                    Some(pattern) => pattern.matches(&entry.stream_id),
237                }
238            })
239            .filter(|entry| {
240                // Filter by event_type BEFORE take() so non-matching types
241                // don't consume batch slots (fixes issue #372).
242                // Use explicit filter if set, otherwise derive from E::event_type_name().
243                let type_filter = filter.event_type().unwrap_or_else(|| E::event_type_name());
244                entry.event_type == type_filter
245            })
246            .take(page.limit().into_inner())
247            .filter_map(|entry| {
248                serde_json::from_str::<E>(&entry.event_data)
249                    .ok()
250                    .map(|e| (e, StreamPosition::new(entry.event_id)))
251            })
252            .collect();
253
254        Ok(events)
255    }
256}
257
258impl CheckpointStore for InMemoryEventStore {
259    type Error = InMemoryCheckpointError;
260
261    async fn load(&self, name: &str) -> Result<Option<StreamPosition>, Self::Error> {
262        let data = self
263            .data
264            .lock()
265            .map_err(|e| InMemoryCheckpointError::LockFailed(e.to_string()))?;
266        Ok(data.checkpoints.get(name).copied())
267    }
268
269    async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> {
270        let mut data = self
271            .data
272            .lock()
273            .map_err(|e| InMemoryCheckpointError::LockFailed(e.to_string()))?;
274        let _ = data.checkpoints.insert(name.to_string(), position);
275        Ok(())
276    }
277}
278
279impl ProjectorCoordinator for InMemoryEventStore {
280    type Error = InMemoryCoordinationError;
281    type Guard = InMemoryCoordinationGuard;
282
283    async fn try_acquire(&self, subscription_name: &str) -> Result<Self::Guard, Self::Error> {
284        let data = self
285            .data
286            .lock()
287            .map_err(|e| InMemoryCoordinationError::LockPoisoned {
288                message: e.to_string(),
289            })?;
290
291        let mut guard =
292            data.locks
293                .write()
294                .map_err(|e| InMemoryCoordinationError::LockPoisoned {
295                    message: e.to_string(),
296                })?;
297
298        if guard.contains_key(subscription_name) {
299            return Err(InMemoryCoordinationError::LeadershipNotAcquired {
300                subscription_name: subscription_name.to_string(),
301            });
302        }
303
304        let _ = guard.insert(subscription_name.to_string(), ());
305
306        Ok(InMemoryCoordinationGuard {
307            subscription_name: subscription_name.to_string(),
308            locks: Arc::clone(&data.locks),
309        })
310    }
311}
312
313/// In-memory checkpoint store for tracking projection progress.
314///
315/// `InMemoryCheckpointStore` stores checkpoint positions in memory using a
316/// thread-safe `Arc<RwLock<HashMap>>`. It is primarily useful for testing
317/// and single-process deployments where persistence across restarts is not required.
318///
319/// For production deployments requiring durability, use a persistent
320/// checkpoint store implementation.
321///
322/// # Example
323///
324/// ```no_run
325/// use eventcore_memory::InMemoryCheckpointStore;
326///
327/// let checkpoint_store = InMemoryCheckpointStore::new();
328/// // Implements CheckpointStore for tracking projection progress.
329/// ```
330#[derive(Debug, Clone, Default)]
331pub struct InMemoryCheckpointStore {
332    checkpoints: Arc<RwLock<HashMap<String, StreamPosition>>>,
333}
334
335impl InMemoryCheckpointStore {
336    /// Create a new in-memory checkpoint store.
337    pub fn new() -> Self {
338        Self::default()
339    }
340}
341
342/// Error type for in-memory checkpoint store operations.
343///
344/// Since the in-memory store uses an `RwLock`, the only possible error
345/// is a poisoned lock from a panic in another thread.
346#[derive(Debug, Clone, thiserror::Error)]
347pub enum InMemoryCheckpointError {
348    #[error("failed to acquire lock: {0}")]
349    LockFailed(String),
350}
351
352/// Error type for in-memory coordinator operations.
353#[derive(Debug, Clone, thiserror::Error)]
354pub enum InMemoryCoordinationError {
355    /// Leadership is already held by another instance.
356    #[error(
357        "leadership not acquired for subscription '{subscription_name}': another instance holds the lock"
358    )]
359    LeadershipNotAcquired { subscription_name: String },
360    /// Lock was poisoned by a panic in another thread.
361    #[error("lock poisoned: {message}")]
362    LockPoisoned { message: String },
363}
364
365/// Guard that releases leadership when dropped.
366#[derive(Debug)]
367pub struct InMemoryCoordinationGuard {
368    subscription_name: String,
369    locks: Arc<RwLock<HashMap<String, ()>>>,
370}
371
372impl Drop for InMemoryCoordinationGuard {
373    fn drop(&mut self) {
374        if let Ok(mut guard) = self.locks.write() {
375            let _ = guard.remove(&self.subscription_name);
376        } else {
377            tracing::error!(
378                subscription_name = %self.subscription_name,
379                "failed to release coordination lock: RwLock poisoned"
380            );
381        }
382    }
383}
384
385/// In-memory projector coordinator for single-process deployments.
386///
387/// `InMemoryProjectorCoordinator` provides coordination for projectors within a single
388/// process using an in-memory lock table. This is suitable for testing and single-process
389/// deployments where distributed coordination is not required.
390///
391/// For distributed deployments with multiple process instances, use a database-backed
392/// coordinator implementation (e.g., PostgreSQL advisory locks).
393#[derive(Debug, Clone, Default)]
394pub struct InMemoryProjectorCoordinator {
395    locks: Arc<RwLock<HashMap<String, ()>>>,
396}
397
398impl InMemoryProjectorCoordinator {
399    /// Create a new in-memory projector coordinator.
400    pub fn new() -> Self {
401        Self::default()
402    }
403}
404
405impl ProjectorCoordinator for InMemoryProjectorCoordinator {
406    type Error = InMemoryCoordinationError;
407    type Guard = InMemoryCoordinationGuard;
408
409    async fn try_acquire(&self, subscription_name: &str) -> Result<Self::Guard, Self::Error> {
410        let mut guard =
411            self.locks
412                .write()
413                .map_err(|e| InMemoryCoordinationError::LockPoisoned {
414                    message: e.to_string(),
415                })?;
416
417        if guard.contains_key(subscription_name) {
418            return Err(InMemoryCoordinationError::LeadershipNotAcquired {
419                subscription_name: subscription_name.to_string(),
420            });
421        }
422
423        let _ = guard.insert(subscription_name.to_string(), ());
424
425        Ok(InMemoryCoordinationGuard {
426            subscription_name: subscription_name.to_string(),
427            locks: Arc::clone(&self.locks),
428        })
429    }
430}
431
432impl CheckpointStore for InMemoryCheckpointStore {
433    type Error = InMemoryCheckpointError;
434
435    async fn load(&self, name: &str) -> Result<Option<StreamPosition>, Self::Error> {
436        let guard = self
437            .checkpoints
438            .read()
439            .map_err(|e| InMemoryCheckpointError::LockFailed(e.to_string()))?;
440        Ok(guard.get(name).copied())
441    }
442
443    async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> {
444        let mut guard = self
445            .checkpoints
446            .write()
447            .map_err(|e| InMemoryCheckpointError::LockFailed(e.to_string()))?;
448        let _ = guard.insert(name.to_string(), position);
449        Ok(())
450    }
451}
452
453#[cfg(test)]
454mod tests {
455    use super::*;
456    use eventcore_types::collect_events;
457    use eventcore_types::{BatchSize, EventFilter, EventPage};
458    use serde::{Deserialize, Serialize};
459
460    /// Test-specific domain event type for unit testing storage operations.
461    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
462    struct TestEvent {
463        stream_id: StreamId,
464        data: String,
465    }
466
467    impl Event for TestEvent {
468        fn stream_id(&self) -> &StreamId {
469            &self.stream_id
470        }
471
472        fn event_type_name() -> &'static str {
473            "TestEvent"
474        }
475    }
476
477    /// Unit test: Verify InMemoryEventStore can append and retrieve a single event
478    ///
479    /// This test verifies the fundamental event storage capability:
480    /// - Append an event to a stream
481    /// - Read the stream back
482    /// - Verify the event is retrievable with correct data
483    ///
484    /// This is a unit test drilling down from the failing integration test
485    /// test_deposit_command_event_data_is_retrievable. We're testing the
486    /// storage layer in isolation before testing the full command execution flow.
487    #[tokio::test]
488    async fn test_append_and_read_single_event() {
489        // Given: An in-memory event store
490        let store = InMemoryEventStore::new();
491
492        // And: A stream ID
493        let stream_id = StreamId::try_new("test-stream-123".to_string()).expect("valid stream id");
494
495        // And: A domain event to store
496        let event = TestEvent {
497            stream_id: stream_id.clone(),
498            data: "test event data".to_string(),
499        };
500
501        // And: A collection of writes containing the event (expected version 0 for empty stream)
502        let writes = StreamWrites::new()
503            .register_stream(stream_id.clone(), StreamVersion::new(0))
504            .and_then(|writes| writes.append(event.clone()))
505            .expect("append should succeed");
506
507        // When: We append the event to the store
508        let _ = store
509            .append_events(writes)
510            .await
511            .expect("append to succeed");
512
513        let stream = store
514            .read_stream::<TestEvent>(stream_id)
515            .await
516            .expect("read to succeed");
517        let events = collect_events(stream).await.expect("collect to succeed");
518
519        let observed = (events.is_empty(), events.len());
520
521        assert_eq!(observed, (false, 1usize));
522    }
523
524    #[tokio::test]
525    async fn event_stream_reader_is_empty_reflects_stream_population() {
526        let store = InMemoryEventStore::new();
527        let stream_id =
528            StreamId::try_new("is-empty-observation".to_string()).expect("valid stream id");
529
530        let initial_stream = store
531            .read_stream::<TestEvent>(stream_id.clone())
532            .await
533            .expect("initial read to succeed");
534        let initial_events = collect_events(initial_stream)
535            .await
536            .expect("collect to succeed");
537
538        let event = TestEvent {
539            stream_id: stream_id.clone(),
540            data: "populated event".to_string(),
541        };
542
543        let writes = StreamWrites::new()
544            .register_stream(stream_id.clone(), StreamVersion::new(0))
545            .and_then(|writes| writes.append(event))
546            .expect("append should succeed");
547
548        let _ = store
549            .append_events(writes)
550            .await
551            .expect("append to succeed");
552
553        let populated_stream = store
554            .read_stream::<TestEvent>(stream_id)
555            .await
556            .expect("populated read to succeed");
557        let populated_events = collect_events(populated_stream)
558            .await
559            .expect("collect to succeed");
560
561        let observed = (
562            initial_events.is_empty(),
563            initial_events.len(),
564            populated_events.is_empty(),
565            populated_events.len(),
566        );
567
568        assert_eq!(observed, (true, 0usize, false, 1usize));
569    }
570
571    #[tokio::test]
572    async fn read_stream_iterates_through_events_in_order() {
573        let store = InMemoryEventStore::new();
574        let stream_id = StreamId::try_new("ordered-stream".to_string()).expect("valid stream id");
575
576        let first_event = TestEvent {
577            stream_id: stream_id.clone(),
578            data: "first".to_string(),
579        };
580
581        let second_event = TestEvent {
582            stream_id: stream_id.clone(),
583            data: "second".to_string(),
584        };
585
586        let writes = StreamWrites::new()
587            .register_stream(stream_id.clone(), StreamVersion::new(0))
588            .and_then(|writes| writes.append(first_event))
589            .and_then(|writes| writes.append(second_event))
590            .expect("append chain should succeed");
591
592        let _ = store
593            .append_events(writes)
594            .await
595            .expect("append to succeed");
596
597        let stream = store
598            .read_stream::<TestEvent>(stream_id)
599            .await
600            .expect("read to succeed");
601        let events = collect_events(stream).await.expect("collect to succeed");
602
603        let collected: Vec<String> = events.iter().map(|event| event.data.clone()).collect();
604
605        let observed = (events.is_empty(), collected);
606
607        assert_eq!(
608            observed,
609            (false, vec!["first".to_string(), "second".to_string()])
610        );
611    }
612
613    #[test]
614    fn stream_writes_accepts_duplicate_stream_with_same_expected_version() {
615        let stream_id = StreamId::try_new("duplicate-stream-same-version".to_string())
616            .expect("valid stream id");
617
618        let first_event = TestEvent {
619            stream_id: stream_id.clone(),
620            data: "first-event".to_string(),
621        };
622
623        let second_event = TestEvent {
624            stream_id: stream_id.clone(),
625            data: "second-event".to_string(),
626        };
627
628        let writes_result = StreamWrites::new()
629            .register_stream(stream_id.clone(), StreamVersion::new(0))
630            .and_then(|writes| writes.append(first_event))
631            .and_then(|writes| writes.append(second_event));
632
633        assert!(writes_result.is_ok());
634    }
635
636    #[test]
637    fn stream_writes_rejects_duplicate_stream_with_conflicting_expected_versions() {
638        let stream_id =
639            StreamId::try_new("duplicate-stream-conflict".to_string()).expect("valid stream id");
640
641        let first_event = TestEvent {
642            stream_id: stream_id.clone(),
643            data: "first-event-conflict".to_string(),
644        };
645
646        let second_event = TestEvent {
647            stream_id: stream_id.clone(),
648            data: "second-event-conflict".to_string(),
649        };
650
651        let conflict = StreamWrites::new()
652            .register_stream(stream_id.clone(), StreamVersion::new(0))
653            .and_then(|writes| writes.append(first_event))
654            .and_then(|writes| writes.register_stream(stream_id.clone(), StreamVersion::new(1)))
655            .and_then(|writes| writes.append(second_event));
656
657        let message = conflict.unwrap_err().to_string();
658
659        assert_eq!(
660            message,
661            "conflicting expected versions for stream duplicate-stream-conflict: first=0, second=1"
662        );
663    }
664
665    #[tokio::test]
666    async fn stream_writes_registers_stream_before_appending_multiple_events() {
667        let store = InMemoryEventStore::new();
668        let stream_id =
669            StreamId::try_new("registered-stream".to_string()).expect("valid stream id");
670
671        let first_event = TestEvent {
672            stream_id: stream_id.clone(),
673            data: "first-registered-event".to_string(),
674        };
675
676        let second_event = TestEvent {
677            stream_id: stream_id.clone(),
678            data: "second-registered-event".to_string(),
679        };
680
681        let writes = StreamWrites::new()
682            .register_stream(stream_id.clone(), StreamVersion::new(0))
683            .and_then(|writes| writes.append(first_event))
684            .and_then(|writes| writes.append(second_event))
685            .expect("registered stream should accept events");
686
687        let result = store.append_events(writes).await;
688
689        assert!(
690            result.is_ok(),
691            "append should succeed when stream registered before events"
692        );
693    }
694
695    #[test]
696    fn stream_writes_rejects_appends_for_unregistered_streams() {
697        let stream_id =
698            StreamId::try_new("unregistered-stream".to_string()).expect("valid stream id");
699
700        let event = TestEvent {
701            stream_id: stream_id.clone(),
702            data: "unregistered-event".to_string(),
703        };
704
705        let error = StreamWrites::new()
706            .append(event)
707            .expect_err("append without prior registration should fail");
708
709        assert!(matches!(
710            error,
711            EventStoreError::UndeclaredStream { stream_id: ref actual } if *actual == stream_id
712        ));
713    }
714
715    #[test]
716    fn expected_versions_returns_registered_streams_and_versions() {
717        let stream_a = StreamId::try_new("stream-a").expect("valid stream id");
718        let stream_b = StreamId::try_new("stream-b").expect("valid stream id");
719
720        let writes = StreamWrites::new()
721            .register_stream(stream_a.clone(), StreamVersion::new(0))
722            .and_then(|w| w.register_stream(stream_b.clone(), StreamVersion::new(5)))
723            .expect("registration should succeed");
724
725        let versions = writes.expected_versions();
726
727        assert_eq!(versions.len(), 2);
728        assert_eq!(versions.get(&stream_a), Some(&StreamVersion::new(0)));
729        assert_eq!(versions.get(&stream_b), Some(&StreamVersion::new(5)));
730    }
731
732    #[test]
733    fn stream_id_rejects_asterisk_metacharacter() {
734        let result = StreamId::try_new("account-*");
735        assert!(
736            result.is_err(),
737            "StreamId should reject asterisk glob metacharacter"
738        );
739    }
740
741    #[test]
742    fn stream_id_rejects_question_mark_metacharacter() {
743        let result = StreamId::try_new("account-?");
744        assert!(
745            result.is_err(),
746            "StreamId should reject question mark glob metacharacter"
747        );
748    }
749
750    #[test]
751    fn stream_id_rejects_open_bracket_metacharacter() {
752        let result = StreamId::try_new("account-[");
753        assert!(
754            result.is_err(),
755            "StreamId should reject open bracket glob metacharacter"
756        );
757    }
758
759    #[test]
760    fn stream_id_rejects_close_bracket_metacharacter() {
761        let result = StreamId::try_new("account-]");
762        assert!(
763            result.is_err(),
764            "StreamId should reject close bracket glob metacharacter"
765        );
766    }
767
768    #[tokio::test]
769    async fn event_reader_after_position_excludes_event_at_position() {
770        // Given: An event store with 3 events
771        let store = InMemoryEventStore::new();
772        let stream_id = StreamId::try_new("reader-test").expect("valid stream id");
773
774        let event1 = TestEvent {
775            stream_id: stream_id.clone(),
776            data: "first".to_string(),
777        };
778        let event2 = TestEvent {
779            stream_id: stream_id.clone(),
780            data: "second".to_string(),
781        };
782        let event3 = TestEvent {
783            stream_id: stream_id.clone(),
784            data: "third".to_string(),
785        };
786
787        let writes = StreamWrites::new()
788            .register_stream(stream_id.clone(), StreamVersion::new(0))
789            .and_then(|w| w.append(event1))
790            .and_then(|w| w.append(event2))
791            .and_then(|w| w.append(event3))
792            .expect("append should succeed");
793
794        let _ = store
795            .append_events(writes)
796            .await
797            .expect("append to succeed");
798
799        // First, read all events to get their positions
800        let all_events = store
801            .read_events::<TestEvent>(EventFilter::all(), EventPage::first(BatchSize::new(100)))
802            .await
803            .expect("read all events to succeed");
804
805        assert_eq!(all_events.len(), 3, "Should have 3 events total");
806        let (first_event, first_position) = &all_events[0];
807
808        // When: We read events after the first event's position
809        let page = EventPage::after(*first_position, BatchSize::new(100));
810        let filter = EventFilter::all();
811        let events = store
812            .read_events::<TestEvent>(filter, page)
813            .await
814            .expect("read to succeed");
815
816        // Then: We should get 2 events (event2 and event3), not including event1
817        assert_eq!(events.len(), 2, "Should get 2 events after first position");
818        assert_eq!(
819            events[0].0.data, "second",
820            "First returned event should be 'second'"
821        );
822        assert_eq!(
823            events[1].0.data, "third",
824            "Second returned event should be 'third'"
825        );
826
827        // And: The first event should NOT be in the results
828        for (event, _pos) in &events {
829            assert_ne!(
830                event.data, first_event.data,
831                "First event should be excluded"
832            );
833        }
834
835        // And: All returned positions should be greater than first_position
836        for (_event, pos) in &events {
837            assert!(
838                *pos > *first_position,
839                "Returned position {} should be > first position {}",
840                pos,
841                first_position
842            );
843        }
844    }
845
846    #[tokio::test]
847    async fn in_memory_event_store_implements_checkpoint_store() {
848        // Given: An InMemoryEventStore
849        let store = InMemoryEventStore::new();
850
851        // When: We save a checkpoint
852        let position = StreamPosition::new(Uuid::now_v7());
853        CheckpointStore::save(&store, "test-projector", position)
854            .await
855            .expect("save should succeed");
856
857        // Then: We can load it back
858        let loaded = CheckpointStore::load(&store, "test-projector")
859            .await
860            .expect("load should succeed");
861        assert_eq!(loaded, Some(position));
862    }
863
864    #[tokio::test]
865    async fn in_memory_event_store_implements_projector_coordinator() {
866        // Given: An InMemoryEventStore
867        let store = InMemoryEventStore::new();
868
869        // When: We try to acquire leadership
870        let guard = ProjectorCoordinator::try_acquire(&store, "test-projector").await;
871
872        // Then: It should succeed
873        assert!(guard.is_ok(), "should acquire leadership");
874    }
875}