use std::collections::HashMap;
use std::sync::{Arc, RwLock};

use eventcore_types::{
    CheckpointStore, Event, EventFilter, EventPage, EventReader, EventStore, EventStoreError,
    EventStreamReader, EventStreamSlice, Operation, StreamId, StreamPosition, StreamVersion,
    StreamWriteEntry, StreamWrites,
};
use uuid::Uuid;

type StreamData = (Vec<Box<dyn std::any::Any + Send>>, StreamVersion);

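/// A single entry in the global, append-ordered log used for cross-stream
/// reads; the UUIDv7 `event_id` doubles as the entry's [`StreamPosition`].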
#[derive(Debug, Clone)]
struct GlobalLogEntry {
    event_id: Uuid,
    stream_id: String,
    event_data: serde_json::Value,
}

struct StoreData {
    streams: HashMap<StreamId, StreamData>,
    global_log: Vec<GlobalLogEntry>,
}

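/// In-memory [`EventStore`] backed by a mutex-guarded map of per-stream events
/// plus a global append-ordered log. Intended for tests and local development;
/// nothing is persisted beyond the lifetime of the process.
///
/// A minimal usage sketch (marked `ignore` since it assumes an async runtime
/// and an application-defined `MyEvent` type implementing [`Event`]):
///
/// ```ignore
/// let store = InMemoryEventStore::new();
/// let stream_id = StreamId::try_new("account-123")?;
/// let writes = StreamWrites::new()
///     .register_stream(stream_id.clone(), StreamVersion::new(0))
///     .and_then(|w| w.append(my_event))?;
/// store.append_events(writes).await?;
/// let reader = store.read_stream::<MyEvent>(stream_id).await?;
/// ```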
pub struct InMemoryEventStore {
    data: std::sync::Mutex<StoreData>,
}

impl InMemoryEventStore {
    pub fn new() -> Self {
        Self {
            data: std::sync::Mutex::new(StoreData {
                streams: HashMap::new(),
                global_log: Vec::new(),
            }),
        }
    }
}

impl Default for InMemoryEventStore {
    fn default() -> Self {
        Self::new()
    }
}

impl EventStore for InMemoryEventStore {
    async fn read_stream<E: Event>(
        &self,
        stream_id: StreamId,
    ) -> Result<EventStreamReader<E>, EventStoreError> {
        let data = self
            .data
            .lock()
            .map_err(|_| EventStoreError::StoreFailure {
                operation: Operation::ReadStream,
            })?;
        let events = data
            .streams
            .get(&stream_id)
            .map(|(boxed_events, _version)| {
                boxed_events
                    .iter()
                    .filter_map(|boxed| boxed.downcast_ref::<E>())
                    .cloned()
                    .collect()
            })
            .unwrap_or_default();

        Ok(EventStreamReader::new(events))
    }

    async fn append_events(
        &self,
        writes: StreamWrites,
    ) -> Result<EventStreamSlice, EventStoreError> {
        let mut data = self
            .data
            .lock()
            .map_err(|_| EventStoreError::StoreFailure {
                operation: Operation::AppendEvents,
            })?;
        let expected_versions = writes.expected_versions().clone();

        for (stream_id, expected_version) in &expected_versions {
            let current_version = data
                .streams
                .get(stream_id)
                .map(|(_events, version)| *version)
                .unwrap_or_else(|| StreamVersion::new(0));

            if current_version != *expected_version {
                return Err(EventStoreError::VersionConflict);
            }
        }

        for entry in writes.into_entries() {
            let StreamWriteEntry {
                stream_id,
                event,
                event_type: _,
                event_data,
            } = entry;

            let event_id = Uuid::now_v7();

            data.global_log.push(GlobalLogEntry {
                event_id,
                stream_id: stream_id.as_ref().to_string(),
                event_data,
            });

            let (events, version) = data
                .streams
                .entry(stream_id)
                .or_insert_with(|| (Vec::new(), StreamVersion::new(0)));
            events.push(event);
            *version = version.increment();
        }

        Ok(EventStreamSlice)
    }
}

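// Global-feed semantics for `read_events`: `EventPage::after(position, ..)` is
// exclusive (the event at `position` itself is not returned), the optional
// stream prefix is matched against the raw stream id, and the page limit is
// applied before deserialization, so entries that fail to deserialize as `E`
// still consume page capacity.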
impl EventReader for InMemoryEventStore {
    type Error = EventStoreError;

    async fn read_events<E: Event>(
        &self,
        filter: EventFilter,
        page: EventPage,
    ) -> Result<Vec<(E, StreamPosition)>, Self::Error> {
        let data = self
            .data
            .lock()
            .map_err(|_| EventStoreError::StoreFailure {
                operation: Operation::ReadStream,
            })?;

        let after_event_id = page.after_position().map(|p| p.into_inner());

        let events: Vec<(E, StreamPosition)> = data
            .global_log
            .iter()
            .filter(|entry| match after_event_id {
                None => true,
                Some(after_id) => entry.event_id > after_id,
            })
            .filter(|entry| match filter.stream_prefix() {
                None => true,
                Some(prefix) => entry.stream_id.starts_with(prefix.as_ref()),
            })
            .take(page.limit().into_inner())
            .filter_map(|entry| {
                serde_json::from_value::<E>(entry.event_data.clone())
                    .ok()
                    .map(|e| (e, StreamPosition::new(entry.event_id)))
            })
            .collect();

        Ok(events)
    }
}

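/// In-memory [`CheckpointStore`] that keeps projection checkpoints in an
/// `Arc<RwLock<HashMap>>`, so cloned handles share the same underlying state.
///
/// A minimal usage sketch (marked `ignore` since it assumes an async runtime
/// and a `position` obtained from a previously read event):
///
/// ```ignore
/// let checkpoints = InMemoryCheckpointStore::new();
/// checkpoints.save("my-projection", position).await?;
/// let resumed = checkpoints.load("my-projection").await?;
/// ```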
#[derive(Debug, Clone, Default)]
pub struct InMemoryCheckpointStore {
    checkpoints: Arc<RwLock<HashMap<String, StreamPosition>>>,
}

impl InMemoryCheckpointStore {
    pub fn new() -> Self {
        Self::default()
    }
}

#[derive(Debug, Clone)]
pub struct InMemoryCheckpointError {
    message: String,
}

impl std::fmt::Display for InMemoryCheckpointError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.message)
    }
}

impl std::error::Error for InMemoryCheckpointError {}

impl CheckpointStore for InMemoryCheckpointStore {
    type Error = InMemoryCheckpointError;

    async fn load(&self, name: &str) -> Result<Option<StreamPosition>, Self::Error> {
        let guard = self
            .checkpoints
            .read()
            .map_err(|e| InMemoryCheckpointError {
                message: format!("failed to acquire read lock: {}", e),
            })?;
        Ok(guard.get(name).copied())
    }

    async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> {
        let mut guard = self
            .checkpoints
            .write()
            .map_err(|e| InMemoryCheckpointError {
                message: format!("failed to acquire write lock: {}", e),
            })?;
        let _ = guard.insert(name.to_string(), position);
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use eventcore_types::{BatchSize, EventFilter, EventPage};
    use serde::{Deserialize, Serialize};

    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
    struct TestEvent {
        stream_id: StreamId,
        data: String,
    }

    impl Event for TestEvent {
        fn stream_id(&self) -> &StreamId {
            &self.stream_id
        }
    }

    #[tokio::test]
    async fn test_append_and_read_single_event() {
        let store = InMemoryEventStore::new();

        let stream_id = StreamId::try_new("test-stream-123".to_string()).expect("valid stream id");

        let event = TestEvent {
            stream_id: stream_id.clone(),
            data: "test event data".to_string(),
        };

        let writes = StreamWrites::new()
            .register_stream(stream_id.clone(), StreamVersion::new(0))
            .and_then(|writes| writes.append(event.clone()))
            .expect("append should succeed");

        let _ = store
            .append_events(writes)
            .await
            .expect("append to succeed");

        let reader = store
            .read_stream::<TestEvent>(stream_id)
            .await
            .expect("read to succeed");

        let observed = (
            reader.is_empty(),
            reader.len(),
            reader.iter().next().is_none(),
        );

        assert_eq!(observed, (false, 1usize, false));
    }

    #[tokio::test]
    async fn event_stream_reader_is_empty_reflects_stream_population() {
        let store = InMemoryEventStore::new();
        let stream_id =
            StreamId::try_new("is-empty-observation".to_string()).expect("valid stream id");

        let initial_reader = store
            .read_stream::<TestEvent>(stream_id.clone())
            .await
            .expect("initial read to succeed");

        let event = TestEvent {
            stream_id: stream_id.clone(),
            data: "populated event".to_string(),
        };

        let writes = StreamWrites::new()
            .register_stream(stream_id.clone(), StreamVersion::new(0))
            .and_then(|writes| writes.append(event))
            .expect("append should succeed");

        let _ = store
            .append_events(writes)
            .await
            .expect("append to succeed");

        let populated_reader = store
            .read_stream::<TestEvent>(stream_id)
            .await
            .expect("populated read to succeed");

        let observed = (
            initial_reader.is_empty(),
            initial_reader.len(),
            populated_reader.is_empty(),
            populated_reader.len(),
        );

        assert_eq!(observed, (true, 0usize, false, 1usize));
    }

    #[tokio::test]
    async fn read_stream_iterates_through_events_in_order() {
        let store = InMemoryEventStore::new();
        let stream_id = StreamId::try_new("ordered-stream".to_string()).expect("valid stream id");

        let first_event = TestEvent {
            stream_id: stream_id.clone(),
            data: "first".to_string(),
        };

        let second_event = TestEvent {
            stream_id: stream_id.clone(),
            data: "second".to_string(),
        };

        let writes = StreamWrites::new()
            .register_stream(stream_id.clone(), StreamVersion::new(0))
            .and_then(|writes| writes.append(first_event))
            .and_then(|writes| writes.append(second_event))
            .expect("append chain should succeed");

        let _ = store
            .append_events(writes)
            .await
            .expect("append to succeed");

        let reader = store
            .read_stream::<TestEvent>(stream_id)
            .await
            .expect("read to succeed");

        let collected: Vec<String> = reader.iter().map(|event| event.data.clone()).collect();

        let observed = (reader.is_empty(), collected);

        assert_eq!(
            observed,
            (false, vec!["first".to_string(), "second".to_string()])
        );
    }

    #[test]
    fn stream_writes_accepts_duplicate_stream_with_same_expected_version() {
        let stream_id = StreamId::try_new("duplicate-stream-same-version".to_string())
            .expect("valid stream id");

        let first_event = TestEvent {
            stream_id: stream_id.clone(),
            data: "first-event".to_string(),
        };

        let second_event = TestEvent {
            stream_id: stream_id.clone(),
            data: "second-event".to_string(),
        };

        let writes_result = StreamWrites::new()
            .register_stream(stream_id.clone(), StreamVersion::new(0))
            .and_then(|writes| writes.append(first_event))
            .and_then(|writes| writes.append(second_event));

        assert!(writes_result.is_ok());
    }

    #[test]
    fn stream_writes_rejects_duplicate_stream_with_conflicting_expected_versions() {
        let stream_id =
            StreamId::try_new("duplicate-stream-conflict".to_string()).expect("valid stream id");

        let first_event = TestEvent {
            stream_id: stream_id.clone(),
            data: "first-event-conflict".to_string(),
        };

        let second_event = TestEvent {
            stream_id: stream_id.clone(),
            data: "second-event-conflict".to_string(),
        };

        let conflict = StreamWrites::new()
            .register_stream(stream_id.clone(), StreamVersion::new(0))
            .and_then(|writes| writes.append(first_event))
            .and_then(|writes| writes.register_stream(stream_id.clone(), StreamVersion::new(1)))
            .and_then(|writes| writes.append(second_event));

        let message = conflict.unwrap_err().to_string();

        assert_eq!(
            message,
            "conflicting expected versions for stream duplicate-stream-conflict: first=0, second=1"
        );
    }

    #[tokio::test]
    async fn stream_writes_registers_stream_before_appending_multiple_events() {
        let store = InMemoryEventStore::new();
        let stream_id =
            StreamId::try_new("registered-stream".to_string()).expect("valid stream id");

        let first_event = TestEvent {
            stream_id: stream_id.clone(),
            data: "first-registered-event".to_string(),
        };

        let second_event = TestEvent {
            stream_id: stream_id.clone(),
            data: "second-registered-event".to_string(),
        };

        let writes = StreamWrites::new()
            .register_stream(stream_id.clone(), StreamVersion::new(0))
            .and_then(|writes| writes.append(first_event))
            .and_then(|writes| writes.append(second_event))
            .expect("registered stream should accept events");

        let result = store.append_events(writes).await;

        assert!(
            result.is_ok(),
            "append should succeed when stream registered before events"
        );
    }

    #[test]
    fn stream_writes_rejects_appends_for_unregistered_streams() {
        let stream_id =
            StreamId::try_new("unregistered-stream".to_string()).expect("valid stream id");

        let event = TestEvent {
            stream_id: stream_id.clone(),
            data: "unregistered-event".to_string(),
        };

        let error = StreamWrites::new()
            .append(event)
            .expect_err("append without prior registration should fail");

        assert!(matches!(
            error,
            EventStoreError::UndeclaredStream { stream_id: ref actual } if *actual == stream_id
        ));
    }

    #[test]
    fn expected_versions_returns_registered_streams_and_versions() {
        let stream_a = StreamId::try_new("stream-a").expect("valid stream id");
        let stream_b = StreamId::try_new("stream-b").expect("valid stream id");

        let writes = StreamWrites::new()
            .register_stream(stream_a.clone(), StreamVersion::new(0))
            .and_then(|w| w.register_stream(stream_b.clone(), StreamVersion::new(5)))
            .expect("registration should succeed");

        let versions = writes.expected_versions();

        assert_eq!(versions.len(), 2);
        assert_eq!(versions.get(&stream_a), Some(&StreamVersion::new(0)));
        assert_eq!(versions.get(&stream_b), Some(&StreamVersion::new(5)));
    }

    #[test]
    fn stream_id_rejects_asterisk_metacharacter() {
        let result = StreamId::try_new("account-*");
        assert!(
            result.is_err(),
            "StreamId should reject asterisk glob metacharacter"
        );
    }

    #[test]
    fn stream_id_rejects_question_mark_metacharacter() {
        let result = StreamId::try_new("account-?");
        assert!(
            result.is_err(),
            "StreamId should reject question mark glob metacharacter"
        );
    }

    #[test]
    fn stream_id_rejects_open_bracket_metacharacter() {
        let result = StreamId::try_new("account-[");
        assert!(
            result.is_err(),
            "StreamId should reject open bracket glob metacharacter"
        );
    }

    #[test]
    fn stream_id_rejects_close_bracket_metacharacter() {
        let result = StreamId::try_new("account-]");
        assert!(
            result.is_err(),
            "StreamId should reject close bracket glob metacharacter"
        );
    }

    #[tokio::test]
    async fn event_reader_after_position_excludes_event_at_position() {
        let store = InMemoryEventStore::new();
        let stream_id = StreamId::try_new("reader-test").expect("valid stream id");

        let event1 = TestEvent {
            stream_id: stream_id.clone(),
            data: "first".to_string(),
        };
        let event2 = TestEvent {
            stream_id: stream_id.clone(),
            data: "second".to_string(),
        };
        let event3 = TestEvent {
            stream_id: stream_id.clone(),
            data: "third".to_string(),
        };

        let writes = StreamWrites::new()
            .register_stream(stream_id.clone(), StreamVersion::new(0))
            .and_then(|w| w.append(event1))
            .and_then(|w| w.append(event2))
            .and_then(|w| w.append(event3))
            .expect("append should succeed");

        store
            .append_events(writes)
            .await
            .expect("append to succeed");

        let all_events = store
            .read_events::<TestEvent>(EventFilter::all(), EventPage::first(BatchSize::new(100)))
            .await
            .expect("read all events to succeed");

        assert_eq!(all_events.len(), 3, "Should have 3 events total");
        let (first_event, first_position) = &all_events[0];

        let page = EventPage::after(*first_position, BatchSize::new(100));
        let filter = EventFilter::all();
        let events = store
            .read_events::<TestEvent>(filter, page)
            .await
            .expect("read to succeed");

        assert_eq!(events.len(), 2, "Should get 2 events after first position");
        assert_eq!(
            events[0].0.data, "second",
            "First returned event should be 'second'"
        );
        assert_eq!(
            events[1].0.data, "third",
            "Second returned event should be 'third'"
        );

        for (event, _pos) in &events {
            assert_ne!(
                event.data, first_event.data,
                "First event should be excluded"
            );
        }

        for (_event, pos) in &events {
            assert!(
                *pos > *first_position,
                "Returned position {} should be > first position {}",
                pos,
                first_position
            );
        }
    }
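    // Additional coverage sketches: they exercise only API surface already used
    // in the tests above (optimistic concurrency on append, checkpoint
    // round-tripping) and assume nothing beyond it.
    #[tokio::test]
    async fn append_events_rejects_stale_expected_version() {
        let store = InMemoryEventStore::new();
        let stream_id =
            StreamId::try_new("stale-version-stream".to_string()).expect("valid stream id");

        let first_event = TestEvent {
            stream_id: stream_id.clone(),
            data: "first".to_string(),
        };

        let writes = StreamWrites::new()
            .register_stream(stream_id.clone(), StreamVersion::new(0))
            .and_then(|writes| writes.append(first_event))
            .expect("append should succeed");

        let _ = store
            .append_events(writes)
            .await
            .expect("initial append to succeed");

        // The stream has advanced past version 0, so a second write that still
        // expects version 0 must be rejected with a version conflict.
        let stale_event = TestEvent {
            stream_id: stream_id.clone(),
            data: "stale".to_string(),
        };

        let stale_writes = StreamWrites::new()
            .register_stream(stream_id, StreamVersion::new(0))
            .and_then(|writes| writes.append(stale_event))
            .expect("building stale writes should succeed");

        let error = store
            .append_events(stale_writes)
            .await
            .expect_err("stale expected version should be rejected");

        assert!(matches!(error, EventStoreError::VersionConflict));
    }

    #[tokio::test]
    async fn checkpoint_store_round_trips_saved_position() {
        let checkpoints = InMemoryCheckpointStore::new();

        let before = checkpoints
            .load("example-projection")
            .await
            .expect("load to succeed");
        assert_eq!(before, None);

        let position = StreamPosition::new(Uuid::now_v7());
        checkpoints
            .save("example-projection", position)
            .await
            .expect("save to succeed");

        let after = checkpoints
            .load("example-projection")
            .await
            .expect("load to succeed");
        assert_eq!(after, Some(position));
    }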
}