1use std::collections::HashMap;
7use std::sync::{Arc, RwLock};
8
9use eventcore_types::{
10 CheckpointStore, Event, EventFilter, EventPage, EventReader, EventStore, EventStoreError,
11 EventStreamReader, EventStreamSlice, Operation, ProjectorCoordinator, StreamId, StreamPosition,
12 StreamVersion, StreamWriteEntry, StreamWrites,
13};
14use uuid::Uuid;
15
16type StreamData = (Vec<Box<dyn std::any::Any + Send>>, StreamVersion);
17
18#[derive(Debug, Clone)]
26struct GlobalLogEntry {
27 event_id: Uuid,
29 stream_id: String,
31 event_type: String,
33 event_data: serde_json::Value,
35}
36
37struct StoreData {
39 streams: HashMap<StreamId, StreamData>,
40 global_log: Vec<GlobalLogEntry>,
42 checkpoints: HashMap<String, StreamPosition>,
44 locks: Arc<RwLock<HashMap<String, ()>>>,
46}
47
48pub struct InMemoryEventStore {
69 data: std::sync::Mutex<StoreData>,
70}
71
72impl InMemoryEventStore {
73 pub fn new() -> Self {
78 Self {
79 data: std::sync::Mutex::new(StoreData {
80 streams: HashMap::new(),
81 global_log: Vec::new(),
82 checkpoints: HashMap::new(),
83 locks: Arc::new(RwLock::new(HashMap::new())),
84 }),
85 }
86 }
87}
88
89impl Default for InMemoryEventStore {
90 fn default() -> Self {
91 Self::new()
92 }
93}
94
95impl EventStore for InMemoryEventStore {
96 async fn read_stream<E: Event>(
97 &self,
98 stream_id: StreamId,
99 ) -> Result<EventStreamReader<E>, EventStoreError> {
100 let data = self
101 .data
102 .lock()
103 .map_err(|_| EventStoreError::StoreFailure {
104 operation: Operation::ReadStream,
105 })?;
106 let events = match data.streams.get(&stream_id) {
107 None => Vec::new(),
108 Some((boxed_events, _version)) => {
109 let mut events = Vec::with_capacity(boxed_events.len());
110 for boxed in boxed_events {
111 match boxed.downcast_ref::<E>() {
112 Some(event) => events.push(event.clone()),
113 None => {
114 return Err(EventStoreError::DeserializationFailed {
115 stream_id,
116 detail: format!(
117 "event could not be downcast to {}",
118 std::any::type_name::<E>()
119 ),
120 });
121 }
122 }
123 }
124 events
125 }
126 };
127
128 Ok(EventStreamReader::new(events))
129 }
130
131 async fn append_events(
132 &self,
133 writes: StreamWrites,
134 ) -> Result<EventStreamSlice, EventStoreError> {
135 let mut data = self
136 .data
137 .lock()
138 .map_err(|_| EventStoreError::StoreFailure {
139 operation: Operation::AppendEvents,
140 })?;
141 let expected_versions = writes.expected_versions().clone();
142
143 for (stream_id, expected_version) in &expected_versions {
145 let current_version = data
146 .streams
147 .get(stream_id)
148 .map(|(_events, version)| *version)
149 .unwrap_or_else(|| StreamVersion::new(0));
150
151 if current_version != *expected_version {
152 return Err(EventStoreError::VersionConflict {
153 stream_id: stream_id.clone(),
154 expected: *expected_version,
155 actual: current_version,
156 });
157 }
158 }
159
160 for entry in writes.into_entries() {
162 let StreamWriteEntry {
163 stream_id,
164 event,
165 event_type,
166 event_data,
167 } = entry;
168
169 let event_id = Uuid::now_v7();
171
172 data.global_log.push(GlobalLogEntry {
174 event_id,
175 stream_id: stream_id.as_ref().to_string(),
176 event_type: event_type.to_string(),
177 event_data,
178 });
179
180 let (events, version) = data
181 .streams
182 .entry(stream_id)
183 .or_insert_with(|| (Vec::new(), StreamVersion::new(0)));
184 events.push(event);
185 *version = version.increment();
186 }
187
188 Ok(EventStreamSlice)
189 }
190}
191
192impl EventReader for InMemoryEventStore {
193 type Error = EventStoreError;
194
195 async fn read_events<E: Event>(
196 &self,
197 filter: EventFilter,
198 page: EventPage,
199 ) -> Result<Vec<(E, StreamPosition)>, Self::Error> {
200 let data = self
201 .data
202 .lock()
203 .map_err(|_| EventStoreError::StoreFailure {
204 operation: Operation::ReadStream,
205 })?;
206
207 let after_event_id = page.after_position().map(|p| p.into_inner());
208
209 let events: Vec<(E, StreamPosition)> = data
210 .global_log
211 .iter()
212 .filter(|entry| {
213 match after_event_id {
215 None => true,
216 Some(after_id) => entry.event_id > after_id,
217 }
218 })
219 .filter(|entry| {
220 match filter.stream_prefix() {
222 None => true,
223 Some(prefix) => entry.stream_id.starts_with(prefix.as_ref()),
224 }
225 })
226 .filter(|entry| {
227 let type_filter = filter.event_type().unwrap_or_else(|| E::event_type_name());
231 entry.event_type == type_filter
232 })
233 .take(page.limit().into_inner())
234 .filter_map(|entry| {
235 serde_json::from_value::<E>(entry.event_data.clone())
236 .ok()
237 .map(|e| (e, StreamPosition::new(entry.event_id)))
238 })
239 .collect();
240
241 Ok(events)
242 }
243}
244
245impl CheckpointStore for InMemoryEventStore {
246 type Error = InMemoryCheckpointError;
247
248 async fn load(&self, name: &str) -> Result<Option<StreamPosition>, Self::Error> {
249 let data = self
250 .data
251 .lock()
252 .map_err(|e| InMemoryCheckpointError::LockFailed(e.to_string()))?;
253 Ok(data.checkpoints.get(name).copied())
254 }
255
256 async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> {
257 let mut data = self
258 .data
259 .lock()
260 .map_err(|e| InMemoryCheckpointError::LockFailed(e.to_string()))?;
261 let _ = data.checkpoints.insert(name.to_string(), position);
262 Ok(())
263 }
264}
265
266impl ProjectorCoordinator for InMemoryEventStore {
267 type Error = InMemoryCoordinationError;
268 type Guard = InMemoryCoordinationGuard;
269
270 async fn try_acquire(&self, subscription_name: &str) -> Result<Self::Guard, Self::Error> {
271 let data = self
272 .data
273 .lock()
274 .map_err(|e| InMemoryCoordinationError::LockPoisoned {
275 message: e.to_string(),
276 })?;
277
278 let mut guard =
279 data.locks
280 .write()
281 .map_err(|e| InMemoryCoordinationError::LockPoisoned {
282 message: e.to_string(),
283 })?;
284
285 if guard.contains_key(subscription_name) {
286 return Err(InMemoryCoordinationError::LeadershipNotAcquired {
287 subscription_name: subscription_name.to_string(),
288 });
289 }
290
291 let _ = guard.insert(subscription_name.to_string(), ());
292
293 Ok(InMemoryCoordinationGuard {
294 subscription_name: subscription_name.to_string(),
295 locks: Arc::clone(&data.locks),
296 })
297 }
298}
299
300#[derive(Debug, Clone, Default)]
318pub struct InMemoryCheckpointStore {
319 checkpoints: Arc<RwLock<HashMap<String, StreamPosition>>>,
320}
321
322impl InMemoryCheckpointStore {
323 pub fn new() -> Self {
325 Self::default()
326 }
327}
328
329#[derive(Debug, Clone, thiserror::Error)]
334pub enum InMemoryCheckpointError {
335 #[error("failed to acquire lock: {0}")]
336 LockFailed(String),
337}
338
339#[derive(Debug, Clone, thiserror::Error)]
341pub enum InMemoryCoordinationError {
342 #[error(
344 "leadership not acquired for subscription '{subscription_name}': another instance holds the lock"
345 )]
346 LeadershipNotAcquired { subscription_name: String },
347 #[error("lock poisoned: {message}")]
349 LockPoisoned { message: String },
350}
351
/// Token representing held leadership for one subscription.
///
/// Handed out by the `try_acquire` implementations in this file.
#[derive(Debug)]
pub struct InMemoryCoordinationGuard {
    /// Subscription this guard holds leadership for.
    subscription_name: String,
    /// Handle to the lock table the subscription name was inserted into.
    locks: Arc<RwLock<HashMap<String, ()>>>,
}
358
359impl Drop for InMemoryCoordinationGuard {
360 fn drop(&mut self) {
361 if let Ok(mut guard) = self.locks.write() {
362 let _ = guard.remove(&self.subscription_name);
363 } else {
364 tracing::error!(
365 subscription_name = %self.subscription_name,
366 "failed to release coordination lock: RwLock poisoned"
367 );
368 }
369 }
370}
371
/// Standalone in-memory projector coordinator.
///
/// Clones share the same lock table through the `Arc`.
#[derive(Debug, Clone, Default)]
pub struct InMemoryProjectorCoordinator {
    /// Names of subscriptions whose leadership is currently held.
    locks: Arc<RwLock<HashMap<String, ()>>>,
}
384
385impl InMemoryProjectorCoordinator {
386 pub fn new() -> Self {
388 Self::default()
389 }
390}
391
392impl ProjectorCoordinator for InMemoryProjectorCoordinator {
393 type Error = InMemoryCoordinationError;
394 type Guard = InMemoryCoordinationGuard;
395
396 async fn try_acquire(&self, subscription_name: &str) -> Result<Self::Guard, Self::Error> {
397 let mut guard =
398 self.locks
399 .write()
400 .map_err(|e| InMemoryCoordinationError::LockPoisoned {
401 message: e.to_string(),
402 })?;
403
404 if guard.contains_key(subscription_name) {
405 return Err(InMemoryCoordinationError::LeadershipNotAcquired {
406 subscription_name: subscription_name.to_string(),
407 });
408 }
409
410 let _ = guard.insert(subscription_name.to_string(), ());
411
412 Ok(InMemoryCoordinationGuard {
413 subscription_name: subscription_name.to_string(),
414 locks: Arc::clone(&self.locks),
415 })
416 }
417}
418
419impl CheckpointStore for InMemoryCheckpointStore {
420 type Error = InMemoryCheckpointError;
421
422 async fn load(&self, name: &str) -> Result<Option<StreamPosition>, Self::Error> {
423 let guard = self
424 .checkpoints
425 .read()
426 .map_err(|e| InMemoryCheckpointError::LockFailed(e.to_string()))?;
427 Ok(guard.get(name).copied())
428 }
429
430 async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> {
431 let mut guard = self
432 .checkpoints
433 .write()
434 .map_err(|e| InMemoryCheckpointError::LockFailed(e.to_string()))?;
435 let _ = guard.insert(name.to_string(), position);
436 Ok(())
437 }
438}
439
#[cfg(test)]
mod tests {
    use super::*;
    use eventcore_types::{BatchSize, EventFilter, EventPage};
    use serde::{Deserialize, Serialize};

    /// Minimal serializable event used as the fixture throughout this module.
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
    struct TestEvent {
        stream_id: StreamId,
        data: String,
    }

    impl Event for TestEvent {
        fn stream_id(&self) -> &StreamId {
            &self.stream_id
        }

        fn event_type_name() -> &'static str {
            "TestEvent"
        }
    }

    /// Builds a `StreamId` from a name the test knows to be valid.
    fn stream(name: &str) -> StreamId {
        StreamId::try_new(name.to_string()).expect("valid stream id")
    }

    /// Builds a `TestEvent` on `stream_id` carrying `data`.
    fn event_on(stream_id: &StreamId, data: &str) -> TestEvent {
        TestEvent {
            stream_id: stream_id.clone(),
            data: data.to_string(),
        }
    }

    /// A single appended event can be read back from its stream.
    #[tokio::test]
    async fn test_append_and_read_single_event() {
        let store = InMemoryEventStore::new();
        let stream_id = stream("test-stream-123");
        let event = event_on(&stream_id, "test event data");

        let writes = StreamWrites::new()
            .register_stream(stream_id.clone(), StreamVersion::new(0))
            .and_then(|writes| writes.append(event.clone()))
            .expect("append should succeed");

        let _ = store
            .append_events(writes)
            .await
            .expect("append to succeed");

        let reader = store
            .read_stream::<TestEvent>(stream_id)
            .await
            .expect("read to succeed");

        let first_is_missing = reader.iter().next().is_none();
        let observed = (reader.is_empty(), reader.len(), first_is_missing);

        assert_eq!(observed, (false, 1usize, false));
    }

    /// `is_empty`/`len` report an empty stream before, and one event after, an append.
    #[tokio::test]
    async fn event_stream_reader_is_empty_reflects_stream_population() {
        let store = InMemoryEventStore::new();
        let stream_id = stream("is-empty-observation");

        let before = store
            .read_stream::<TestEvent>(stream_id.clone())
            .await
            .expect("initial read to succeed");

        let event = event_on(&stream_id, "populated event");
        let writes = StreamWrites::new()
            .register_stream(stream_id.clone(), StreamVersion::new(0))
            .and_then(|writes| writes.append(event))
            .expect("append should succeed");

        let _ = store
            .append_events(writes)
            .await
            .expect("append to succeed");

        let after = store
            .read_stream::<TestEvent>(stream_id)
            .await
            .expect("populated read to succeed");

        let observed = (before.is_empty(), before.len(), after.is_empty(), after.len());

        assert_eq!(observed, (true, 0usize, false, 1usize));
    }

    /// Events come back from a stream in the order they were appended.
    #[tokio::test]
    async fn read_stream_iterates_through_events_in_order() {
        let store = InMemoryEventStore::new();
        let stream_id = stream("ordered-stream");

        let first = event_on(&stream_id, "first");
        let second = event_on(&stream_id, "second");

        let writes = StreamWrites::new()
            .register_stream(stream_id.clone(), StreamVersion::new(0))
            .and_then(|writes| writes.append(first))
            .and_then(|writes| writes.append(second))
            .expect("append chain should succeed");

        let _ = store
            .append_events(writes)
            .await
            .expect("append to succeed");

        let reader = store
            .read_stream::<TestEvent>(stream_id)
            .await
            .expect("read to succeed");

        let payloads: Vec<String> = reader.iter().map(|event| event.data.clone()).collect();
        let observed = (reader.is_empty(), payloads);

        assert_eq!(
            observed,
            (false, vec!["first".to_string(), "second".to_string()])
        );
    }

    /// Several events may target one stream registered once.
    #[test]
    fn stream_writes_accepts_duplicate_stream_with_same_expected_version() {
        let stream_id = stream("duplicate-stream-same-version");

        let first = event_on(&stream_id, "first-event");
        let second = event_on(&stream_id, "second-event");

        let outcome = StreamWrites::new()
            .register_stream(stream_id.clone(), StreamVersion::new(0))
            .and_then(|writes| writes.append(first))
            .and_then(|writes| writes.append(second));

        assert!(outcome.is_ok());
    }

    /// Re-registering a stream with a different expected version is an error.
    #[test]
    fn stream_writes_rejects_duplicate_stream_with_conflicting_expected_versions() {
        let stream_id = stream("duplicate-stream-conflict");

        let first = event_on(&stream_id, "first-event-conflict");
        let second = event_on(&stream_id, "second-event-conflict");

        let outcome = StreamWrites::new()
            .register_stream(stream_id.clone(), StreamVersion::new(0))
            .and_then(|writes| writes.append(first))
            .and_then(|writes| writes.register_stream(stream_id.clone(), StreamVersion::new(1)))
            .and_then(|writes| writes.append(second));

        let rendered = outcome.unwrap_err().to_string();

        assert_eq!(
            rendered,
            "conflicting expected versions for stream duplicate-stream-conflict: first=0, second=1"
        );
    }

    /// A registered stream accepts multiple events and the store appends them.
    #[tokio::test]
    async fn stream_writes_registers_stream_before_appending_multiple_events() {
        let store = InMemoryEventStore::new();
        let stream_id = stream("registered-stream");

        let first = event_on(&stream_id, "first-registered-event");
        let second = event_on(&stream_id, "second-registered-event");

        let writes = StreamWrites::new()
            .register_stream(stream_id.clone(), StreamVersion::new(0))
            .and_then(|writes| writes.append(first))
            .and_then(|writes| writes.append(second))
            .expect("registered stream should accept events");

        let outcome = store.append_events(writes).await;

        assert!(
            outcome.is_ok(),
            "append should succeed when stream registered before events"
        );
    }

    /// Appending to a stream that was never registered is rejected.
    #[test]
    fn stream_writes_rejects_appends_for_unregistered_streams() {
        let stream_id = stream("unregistered-stream");
        let event = event_on(&stream_id, "unregistered-event");

        let error = StreamWrites::new()
            .append(event)
            .expect_err("append without prior registration should fail");

        assert!(matches!(
            error,
            EventStoreError::UndeclaredStream { stream_id: ref actual } if *actual == stream_id
        ));
    }

    /// `expected_versions` exposes every registered stream with its version.
    #[test]
    fn expected_versions_returns_registered_streams_and_versions() {
        let stream_a = stream("stream-a");
        let stream_b = stream("stream-b");

        let writes = StreamWrites::new()
            .register_stream(stream_a.clone(), StreamVersion::new(0))
            .and_then(|w| w.register_stream(stream_b.clone(), StreamVersion::new(5)))
            .expect("registration should succeed");

        let registered = writes.expected_versions();

        assert_eq!(registered.len(), 2);
        assert_eq!(registered.get(&stream_a), Some(&StreamVersion::new(0)));
        assert_eq!(registered.get(&stream_b), Some(&StreamVersion::new(5)));
    }

    #[test]
    fn stream_id_rejects_asterisk_metacharacter() {
        assert!(
            StreamId::try_new("account-*").is_err(),
            "StreamId should reject asterisk glob metacharacter"
        );
    }

    #[test]
    fn stream_id_rejects_question_mark_metacharacter() {
        assert!(
            StreamId::try_new("account-?").is_err(),
            "StreamId should reject question mark glob metacharacter"
        );
    }

    #[test]
    fn stream_id_rejects_open_bracket_metacharacter() {
        assert!(
            StreamId::try_new("account-[").is_err(),
            "StreamId should reject open bracket glob metacharacter"
        );
    }

    #[test]
    fn stream_id_rejects_close_bracket_metacharacter() {
        assert!(
            StreamId::try_new("account-]").is_err(),
            "StreamId should reject close bracket glob metacharacter"
        );
    }

    /// Paging "after" a position returns only later events, never the one at it.
    #[tokio::test]
    async fn event_reader_after_position_excludes_event_at_position() {
        let store = InMemoryEventStore::new();
        let stream_id = stream("reader-test");

        let first = event_on(&stream_id, "first");
        let second = event_on(&stream_id, "second");
        let third = event_on(&stream_id, "third");

        let writes = StreamWrites::new()
            .register_stream(stream_id.clone(), StreamVersion::new(0))
            .and_then(|w| w.append(first))
            .and_then(|w| w.append(second))
            .and_then(|w| w.append(third))
            .expect("append should succeed");

        let _ = store
            .append_events(writes)
            .await
            .expect("append to succeed");

        // Read everything once to learn the first event's position.
        let all_events = store
            .read_events::<TestEvent>(EventFilter::all(), EventPage::first(BatchSize::new(100)))
            .await
            .expect("read all events to succeed");

        assert_eq!(all_events.len(), 3, "Should have 3 events total");
        let (first_event, first_position) = &all_events[0];

        // Now page from just after that position.
        let page = EventPage::after(*first_position, BatchSize::new(100));
        let events = store
            .read_events::<TestEvent>(EventFilter::all(), page)
            .await
            .expect("read to succeed");

        assert_eq!(events.len(), 2, "Should get 2 events after first position");
        assert_eq!(
            events[0].0.data, "second",
            "First returned event should be 'second'"
        );
        assert_eq!(
            events[1].0.data, "third",
            "Second returned event should be 'third'"
        );

        for (event, pos) in &events {
            assert_ne!(
                event.data, first_event.data,
                "First event should be excluded"
            );
            assert!(
                *pos > *first_position,
                "Returned position {} should be > first position {}",
                pos,
                first_position
            );
        }
    }

    /// The event store itself can save and reload a checkpoint.
    #[tokio::test]
    async fn in_memory_event_store_implements_checkpoint_store() {
        let store = InMemoryEventStore::new();
        let position = StreamPosition::new(Uuid::now_v7());

        CheckpointStore::save(&store, "test-projector", position)
            .await
            .expect("save should succeed");

        let reloaded = CheckpointStore::load(&store, "test-projector")
            .await
            .expect("load should succeed");

        assert_eq!(reloaded, Some(position));
    }

    /// The event store itself can hand out projector leadership.
    #[tokio::test]
    async fn in_memory_event_store_implements_projector_coordinator() {
        let store = InMemoryEventStore::new();

        let leadership = ProjectorCoordinator::try_acquire(&store, "test-projector").await;

        assert!(leadership.is_ok(), "should acquire leadership");
    }
}