1use std::collections::HashMap;
7use std::sync::{Arc, RwLock};
8
9use eventcore_types::{
10 CheckpointStore, Event, EventFilter, EventPage, EventReader, EventStore, EventStoreError,
11 EventStreamReader, EventStreamSlice, Operation, ProjectorCoordinator, StreamId, StreamPosition,
12 StreamVersion, StreamWriteEntry, StreamWrites,
13};
14use uuid::Uuid;
15
16type StreamData = (Vec<Box<dyn std::any::Any + Send>>, StreamVersion);
17
18#[derive(Debug, Clone)]
26struct GlobalLogEntry {
27 event_id: Uuid,
29 stream_id: String,
31 event_data: serde_json::Value,
33}
34
35struct StoreData {
37 streams: HashMap<StreamId, StreamData>,
38 global_log: Vec<GlobalLogEntry>,
40 checkpoints: HashMap<String, StreamPosition>,
42 locks: Arc<RwLock<HashMap<String, ()>>>,
44}
45
46pub struct InMemoryEventStore {
67 data: std::sync::Mutex<StoreData>,
68}
69
70impl InMemoryEventStore {
71 pub fn new() -> Self {
76 Self {
77 data: std::sync::Mutex::new(StoreData {
78 streams: HashMap::new(),
79 global_log: Vec::new(),
80 checkpoints: HashMap::new(),
81 locks: Arc::new(RwLock::new(HashMap::new())),
82 }),
83 }
84 }
85}
86
87impl Default for InMemoryEventStore {
88 fn default() -> Self {
89 Self::new()
90 }
91}
92
93impl EventStore for InMemoryEventStore {
94 async fn read_stream<E: Event>(
95 &self,
96 stream_id: StreamId,
97 ) -> Result<EventStreamReader<E>, EventStoreError> {
98 let data = self
99 .data
100 .lock()
101 .map_err(|_| EventStoreError::StoreFailure {
102 operation: Operation::ReadStream,
103 })?;
104 let events = match data.streams.get(&stream_id) {
105 None => Vec::new(),
106 Some((boxed_events, _version)) => {
107 let mut events = Vec::with_capacity(boxed_events.len());
108 for boxed in boxed_events {
109 match boxed.downcast_ref::<E>() {
110 Some(event) => events.push(event.clone()),
111 None => {
112 return Err(EventStoreError::DeserializationFailed {
113 stream_id,
114 detail: format!(
115 "event could not be downcast to {}",
116 std::any::type_name::<E>()
117 ),
118 });
119 }
120 }
121 }
122 events
123 }
124 };
125
126 Ok(EventStreamReader::new(events))
127 }
128
129 async fn append_events(
130 &self,
131 writes: StreamWrites,
132 ) -> Result<EventStreamSlice, EventStoreError> {
133 let mut data = self
134 .data
135 .lock()
136 .map_err(|_| EventStoreError::StoreFailure {
137 operation: Operation::AppendEvents,
138 })?;
139 let expected_versions = writes.expected_versions().clone();
140
141 for (stream_id, expected_version) in &expected_versions {
143 let current_version = data
144 .streams
145 .get(stream_id)
146 .map(|(_events, version)| *version)
147 .unwrap_or_else(|| StreamVersion::new(0));
148
149 if current_version != *expected_version {
150 return Err(EventStoreError::VersionConflict {
151 stream_id: stream_id.clone(),
152 expected: *expected_version,
153 actual: current_version,
154 });
155 }
156 }
157
158 for entry in writes.into_entries() {
160 let StreamWriteEntry {
161 stream_id,
162 event,
163 event_type: _,
164 event_data,
165 } = entry;
166
167 let event_id = Uuid::now_v7();
169
170 data.global_log.push(GlobalLogEntry {
172 event_id,
173 stream_id: stream_id.as_ref().to_string(),
174 event_data,
175 });
176
177 let (events, version) = data
178 .streams
179 .entry(stream_id)
180 .or_insert_with(|| (Vec::new(), StreamVersion::new(0)));
181 events.push(event);
182 *version = version.increment();
183 }
184
185 Ok(EventStreamSlice)
186 }
187}
188
189impl EventReader for InMemoryEventStore {
190 type Error = EventStoreError;
191
192 async fn read_events<E: Event>(
193 &self,
194 filter: EventFilter,
195 page: EventPage,
196 ) -> Result<Vec<(E, StreamPosition)>, Self::Error> {
197 let data = self
198 .data
199 .lock()
200 .map_err(|_| EventStoreError::StoreFailure {
201 operation: Operation::ReadStream,
202 })?;
203
204 let after_event_id = page.after_position().map(|p| p.into_inner());
205
206 let events: Vec<(E, StreamPosition)> = data
207 .global_log
208 .iter()
209 .filter(|entry| {
210 match after_event_id {
212 None => true,
213 Some(after_id) => entry.event_id > after_id,
214 }
215 })
216 .filter(|entry| {
217 match filter.stream_prefix() {
219 None => true,
220 Some(prefix) => entry.stream_id.starts_with(prefix.as_ref()),
221 }
222 })
223 .take(page.limit().into_inner())
224 .filter_map(|entry| {
225 serde_json::from_value::<E>(entry.event_data.clone())
226 .ok()
227 .map(|e| (e, StreamPosition::new(entry.event_id)))
228 })
229 .collect();
230
231 Ok(events)
232 }
233}
234
235impl CheckpointStore for InMemoryEventStore {
236 type Error = InMemoryCheckpointError;
237
238 async fn load(&self, name: &str) -> Result<Option<StreamPosition>, Self::Error> {
239 let data = self
240 .data
241 .lock()
242 .map_err(|e| InMemoryCheckpointError::LockFailed(e.to_string()))?;
243 Ok(data.checkpoints.get(name).copied())
244 }
245
246 async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> {
247 let mut data = self
248 .data
249 .lock()
250 .map_err(|e| InMemoryCheckpointError::LockFailed(e.to_string()))?;
251 let _ = data.checkpoints.insert(name.to_string(), position);
252 Ok(())
253 }
254}
255
256impl ProjectorCoordinator for InMemoryEventStore {
257 type Error = InMemoryCoordinationError;
258 type Guard = InMemoryCoordinationGuard;
259
260 async fn try_acquire(&self, subscription_name: &str) -> Result<Self::Guard, Self::Error> {
261 let data = self
262 .data
263 .lock()
264 .map_err(|e| InMemoryCoordinationError::LockPoisoned {
265 message: e.to_string(),
266 })?;
267
268 let mut guard =
269 data.locks
270 .write()
271 .map_err(|e| InMemoryCoordinationError::LockPoisoned {
272 message: e.to_string(),
273 })?;
274
275 if guard.contains_key(subscription_name) {
276 return Err(InMemoryCoordinationError::LeadershipNotAcquired {
277 subscription_name: subscription_name.to_string(),
278 });
279 }
280
281 let _ = guard.insert(subscription_name.to_string(), ());
282
283 Ok(InMemoryCoordinationGuard {
284 subscription_name: subscription_name.to_string(),
285 locks: Arc::clone(&data.locks),
286 })
287 }
288}
289
290#[derive(Debug, Clone, Default)]
308pub struct InMemoryCheckpointStore {
309 checkpoints: Arc<RwLock<HashMap<String, StreamPosition>>>,
310}
311
312impl InMemoryCheckpointStore {
313 pub fn new() -> Self {
315 Self::default()
316 }
317}
318
319#[derive(Debug, Clone, thiserror::Error)]
324pub enum InMemoryCheckpointError {
325 #[error("failed to acquire lock: {0}")]
326 LockFailed(String),
327}
328
329#[derive(Debug, Clone, thiserror::Error)]
331pub enum InMemoryCoordinationError {
332 #[error(
334 "leadership not acquired for subscription '{subscription_name}': another instance holds the lock"
335 )]
336 LeadershipNotAcquired { subscription_name: String },
337 #[error("lock poisoned: {message}")]
339 LockPoisoned { message: String },
340}
341
342#[derive(Debug)]
344pub struct InMemoryCoordinationGuard {
345 subscription_name: String,
346 locks: Arc<RwLock<HashMap<String, ()>>>,
347}
348
349impl Drop for InMemoryCoordinationGuard {
350 fn drop(&mut self) {
351 if let Ok(mut guard) = self.locks.write() {
352 let _ = guard.remove(&self.subscription_name);
353 } else {
354 tracing::error!(
355 subscription_name = %self.subscription_name,
356 "failed to release coordination lock: RwLock poisoned"
357 );
358 }
359 }
360}
361
362#[derive(Debug, Clone, Default)]
371pub struct InMemoryProjectorCoordinator {
372 locks: Arc<RwLock<HashMap<String, ()>>>,
373}
374
375impl InMemoryProjectorCoordinator {
376 pub fn new() -> Self {
378 Self::default()
379 }
380}
381
382impl ProjectorCoordinator for InMemoryProjectorCoordinator {
383 type Error = InMemoryCoordinationError;
384 type Guard = InMemoryCoordinationGuard;
385
386 async fn try_acquire(&self, subscription_name: &str) -> Result<Self::Guard, Self::Error> {
387 let mut guard =
388 self.locks
389 .write()
390 .map_err(|e| InMemoryCoordinationError::LockPoisoned {
391 message: e.to_string(),
392 })?;
393
394 if guard.contains_key(subscription_name) {
395 return Err(InMemoryCoordinationError::LeadershipNotAcquired {
396 subscription_name: subscription_name.to_string(),
397 });
398 }
399
400 let _ = guard.insert(subscription_name.to_string(), ());
401
402 Ok(InMemoryCoordinationGuard {
403 subscription_name: subscription_name.to_string(),
404 locks: Arc::clone(&self.locks),
405 })
406 }
407}
408
409impl CheckpointStore for InMemoryCheckpointStore {
410 type Error = InMemoryCheckpointError;
411
412 async fn load(&self, name: &str) -> Result<Option<StreamPosition>, Self::Error> {
413 let guard = self
414 .checkpoints
415 .read()
416 .map_err(|e| InMemoryCheckpointError::LockFailed(e.to_string()))?;
417 Ok(guard.get(name).copied())
418 }
419
420 async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> {
421 let mut guard = self
422 .checkpoints
423 .write()
424 .map_err(|e| InMemoryCheckpointError::LockFailed(e.to_string()))?;
425 let _ = guard.insert(name.to_string(), position);
426 Ok(())
427 }
428}
429
430#[cfg(test)]
431mod tests {
432 use super::*;
433 use eventcore_types::{BatchSize, EventFilter, EventPage};
434 use serde::{Deserialize, Serialize};
435
436 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
438 struct TestEvent {
439 stream_id: StreamId,
440 data: String,
441 }
442
443 impl Event for TestEvent {
444 fn stream_id(&self) -> &StreamId {
445 &self.stream_id
446 }
447
448 fn event_type_name() -> &'static str {
449 "TestEvent"
450 }
451 }
452
453 #[tokio::test]
464 async fn test_append_and_read_single_event() {
465 let store = InMemoryEventStore::new();
467
468 let stream_id = StreamId::try_new("test-stream-123".to_string()).expect("valid stream id");
470
471 let event = TestEvent {
473 stream_id: stream_id.clone(),
474 data: "test event data".to_string(),
475 };
476
477 let writes = StreamWrites::new()
479 .register_stream(stream_id.clone(), StreamVersion::new(0))
480 .and_then(|writes| writes.append(event.clone()))
481 .expect("append should succeed");
482
483 let _ = store
485 .append_events(writes)
486 .await
487 .expect("append to succeed");
488
489 let reader = store
490 .read_stream::<TestEvent>(stream_id)
491 .await
492 .expect("read to succeed");
493
494 let observed = (
495 reader.is_empty(),
496 reader.len(),
497 reader.iter().next().is_none(),
498 );
499
500 assert_eq!(observed, (false, 1usize, false));
501 }
502
503 #[tokio::test]
504 async fn event_stream_reader_is_empty_reflects_stream_population() {
505 let store = InMemoryEventStore::new();
506 let stream_id =
507 StreamId::try_new("is-empty-observation".to_string()).expect("valid stream id");
508
509 let initial_reader = store
510 .read_stream::<TestEvent>(stream_id.clone())
511 .await
512 .expect("initial read to succeed");
513
514 let event = TestEvent {
515 stream_id: stream_id.clone(),
516 data: "populated event".to_string(),
517 };
518
519 let writes = StreamWrites::new()
520 .register_stream(stream_id.clone(), StreamVersion::new(0))
521 .and_then(|writes| writes.append(event))
522 .expect("append should succeed");
523
524 let _ = store
525 .append_events(writes)
526 .await
527 .expect("append to succeed");
528
529 let populated_reader = store
530 .read_stream::<TestEvent>(stream_id)
531 .await
532 .expect("populated read to succeed");
533
534 let observed = (
535 initial_reader.is_empty(),
536 initial_reader.len(),
537 populated_reader.is_empty(),
538 populated_reader.len(),
539 );
540
541 assert_eq!(observed, (true, 0usize, false, 1usize));
542 }
543
544 #[tokio::test]
545 async fn read_stream_iterates_through_events_in_order() {
546 let store = InMemoryEventStore::new();
547 let stream_id = StreamId::try_new("ordered-stream".to_string()).expect("valid stream id");
548
549 let first_event = TestEvent {
550 stream_id: stream_id.clone(),
551 data: "first".to_string(),
552 };
553
554 let second_event = TestEvent {
555 stream_id: stream_id.clone(),
556 data: "second".to_string(),
557 };
558
559 let writes = StreamWrites::new()
560 .register_stream(stream_id.clone(), StreamVersion::new(0))
561 .and_then(|writes| writes.append(first_event))
562 .and_then(|writes| writes.append(second_event))
563 .expect("append chain should succeed");
564
565 let _ = store
566 .append_events(writes)
567 .await
568 .expect("append to succeed");
569
570 let reader = store
571 .read_stream::<TestEvent>(stream_id)
572 .await
573 .expect("read to succeed");
574
575 let collected: Vec<String> = reader.iter().map(|event| event.data.clone()).collect();
576
577 let observed = (reader.is_empty(), collected);
578
579 assert_eq!(
580 observed,
581 (false, vec!["first".to_string(), "second".to_string()])
582 );
583 }
584
585 #[test]
586 fn stream_writes_accepts_duplicate_stream_with_same_expected_version() {
587 let stream_id = StreamId::try_new("duplicate-stream-same-version".to_string())
588 .expect("valid stream id");
589
590 let first_event = TestEvent {
591 stream_id: stream_id.clone(),
592 data: "first-event".to_string(),
593 };
594
595 let second_event = TestEvent {
596 stream_id: stream_id.clone(),
597 data: "second-event".to_string(),
598 };
599
600 let writes_result = StreamWrites::new()
601 .register_stream(stream_id.clone(), StreamVersion::new(0))
602 .and_then(|writes| writes.append(first_event))
603 .and_then(|writes| writes.append(second_event));
604
605 assert!(writes_result.is_ok());
606 }
607
608 #[test]
609 fn stream_writes_rejects_duplicate_stream_with_conflicting_expected_versions() {
610 let stream_id =
611 StreamId::try_new("duplicate-stream-conflict".to_string()).expect("valid stream id");
612
613 let first_event = TestEvent {
614 stream_id: stream_id.clone(),
615 data: "first-event-conflict".to_string(),
616 };
617
618 let second_event = TestEvent {
619 stream_id: stream_id.clone(),
620 data: "second-event-conflict".to_string(),
621 };
622
623 let conflict = StreamWrites::new()
624 .register_stream(stream_id.clone(), StreamVersion::new(0))
625 .and_then(|writes| writes.append(first_event))
626 .and_then(|writes| writes.register_stream(stream_id.clone(), StreamVersion::new(1)))
627 .and_then(|writes| writes.append(second_event));
628
629 let message = conflict.unwrap_err().to_string();
630
631 assert_eq!(
632 message,
633 "conflicting expected versions for stream duplicate-stream-conflict: first=0, second=1"
634 );
635 }
636
637 #[tokio::test]
638 async fn stream_writes_registers_stream_before_appending_multiple_events() {
639 let store = InMemoryEventStore::new();
640 let stream_id =
641 StreamId::try_new("registered-stream".to_string()).expect("valid stream id");
642
643 let first_event = TestEvent {
644 stream_id: stream_id.clone(),
645 data: "first-registered-event".to_string(),
646 };
647
648 let second_event = TestEvent {
649 stream_id: stream_id.clone(),
650 data: "second-registered-event".to_string(),
651 };
652
653 let writes = StreamWrites::new()
654 .register_stream(stream_id.clone(), StreamVersion::new(0))
655 .and_then(|writes| writes.append(first_event))
656 .and_then(|writes| writes.append(second_event))
657 .expect("registered stream should accept events");
658
659 let result = store.append_events(writes).await;
660
661 assert!(
662 result.is_ok(),
663 "append should succeed when stream registered before events"
664 );
665 }
666
667 #[test]
668 fn stream_writes_rejects_appends_for_unregistered_streams() {
669 let stream_id =
670 StreamId::try_new("unregistered-stream".to_string()).expect("valid stream id");
671
672 let event = TestEvent {
673 stream_id: stream_id.clone(),
674 data: "unregistered-event".to_string(),
675 };
676
677 let error = StreamWrites::new()
678 .append(event)
679 .expect_err("append without prior registration should fail");
680
681 assert!(matches!(
682 error,
683 EventStoreError::UndeclaredStream { stream_id: ref actual } if *actual == stream_id
684 ));
685 }
686
687 #[test]
688 fn expected_versions_returns_registered_streams_and_versions() {
689 let stream_a = StreamId::try_new("stream-a").expect("valid stream id");
690 let stream_b = StreamId::try_new("stream-b").expect("valid stream id");
691
692 let writes = StreamWrites::new()
693 .register_stream(stream_a.clone(), StreamVersion::new(0))
694 .and_then(|w| w.register_stream(stream_b.clone(), StreamVersion::new(5)))
695 .expect("registration should succeed");
696
697 let versions = writes.expected_versions();
698
699 assert_eq!(versions.len(), 2);
700 assert_eq!(versions.get(&stream_a), Some(&StreamVersion::new(0)));
701 assert_eq!(versions.get(&stream_b), Some(&StreamVersion::new(5)));
702 }
703
704 #[test]
705 fn stream_id_rejects_asterisk_metacharacter() {
706 let result = StreamId::try_new("account-*");
707 assert!(
708 result.is_err(),
709 "StreamId should reject asterisk glob metacharacter"
710 );
711 }
712
713 #[test]
714 fn stream_id_rejects_question_mark_metacharacter() {
715 let result = StreamId::try_new("account-?");
716 assert!(
717 result.is_err(),
718 "StreamId should reject question mark glob metacharacter"
719 );
720 }
721
722 #[test]
723 fn stream_id_rejects_open_bracket_metacharacter() {
724 let result = StreamId::try_new("account-[");
725 assert!(
726 result.is_err(),
727 "StreamId should reject open bracket glob metacharacter"
728 );
729 }
730
731 #[test]
732 fn stream_id_rejects_close_bracket_metacharacter() {
733 let result = StreamId::try_new("account-]");
734 assert!(
735 result.is_err(),
736 "StreamId should reject close bracket glob metacharacter"
737 );
738 }
739
740 #[tokio::test]
741 async fn event_reader_after_position_excludes_event_at_position() {
742 let store = InMemoryEventStore::new();
744 let stream_id = StreamId::try_new("reader-test").expect("valid stream id");
745
746 let event1 = TestEvent {
747 stream_id: stream_id.clone(),
748 data: "first".to_string(),
749 };
750 let event2 = TestEvent {
751 stream_id: stream_id.clone(),
752 data: "second".to_string(),
753 };
754 let event3 = TestEvent {
755 stream_id: stream_id.clone(),
756 data: "third".to_string(),
757 };
758
759 let writes = StreamWrites::new()
760 .register_stream(stream_id.clone(), StreamVersion::new(0))
761 .and_then(|w| w.append(event1))
762 .and_then(|w| w.append(event2))
763 .and_then(|w| w.append(event3))
764 .expect("append should succeed");
765
766 let _ = store
767 .append_events(writes)
768 .await
769 .expect("append to succeed");
770
771 let all_events = store
773 .read_events::<TestEvent>(EventFilter::all(), EventPage::first(BatchSize::new(100)))
774 .await
775 .expect("read all events to succeed");
776
777 assert_eq!(all_events.len(), 3, "Should have 3 events total");
778 let (first_event, first_position) = &all_events[0];
779
780 let page = EventPage::after(*first_position, BatchSize::new(100));
782 let filter = EventFilter::all();
783 let events = store
784 .read_events::<TestEvent>(filter, page)
785 .await
786 .expect("read to succeed");
787
788 assert_eq!(events.len(), 2, "Should get 2 events after first position");
790 assert_eq!(
791 events[0].0.data, "second",
792 "First returned event should be 'second'"
793 );
794 assert_eq!(
795 events[1].0.data, "third",
796 "Second returned event should be 'third'"
797 );
798
799 for (event, _pos) in &events {
801 assert_ne!(
802 event.data, first_event.data,
803 "First event should be excluded"
804 );
805 }
806
807 for (_event, pos) in &events {
809 assert!(
810 *pos > *first_position,
811 "Returned position {} should be > first position {}",
812 pos,
813 first_position
814 );
815 }
816 }
817
818 #[tokio::test]
819 async fn in_memory_event_store_implements_checkpoint_store() {
820 let store = InMemoryEventStore::new();
822
823 let position = StreamPosition::new(Uuid::now_v7());
825 CheckpointStore::save(&store, "test-projector", position)
826 .await
827 .expect("save should succeed");
828
829 let loaded = CheckpointStore::load(&store, "test-projector")
831 .await
832 .expect("load should succeed");
833 assert_eq!(loaded, Some(position));
834 }
835
836 #[tokio::test]
837 async fn in_memory_event_store_implements_projector_coordinator() {
838 let store = InMemoryEventStore::new();
840
841 let guard = ProjectorCoordinator::try_acquire(&store, "test-projector").await;
843
844 assert!(guard.is_ok(), "should acquire leadership");
846 }
847}