1use std::collections::HashMap;
7
8use eventcore_types::{
9 Event, EventFilter, EventPage, EventReader, EventStore, EventStoreError, EventStreamReader,
10 EventStreamSlice, Operation, StreamId, StreamPosition, StreamVersion, StreamWriteEntry,
11 StreamWrites,
12};
13use uuid::Uuid;
14
15type StreamData = (Vec<Box<dyn std::any::Any + Send>>, StreamVersion);
35
36#[derive(Debug, Clone)]
44struct GlobalLogEntry {
45 event_id: Uuid,
47 stream_id: String,
49 event_data: serde_json::Value,
51}
52
53struct StoreData {
55 streams: HashMap<StreamId, StreamData>,
56 global_log: Vec<GlobalLogEntry>,
58}
59
60pub struct InMemoryEventStore {
61 data: std::sync::Mutex<StoreData>,
62}
63
64impl InMemoryEventStore {
65 pub fn new() -> Self {
70 Self {
71 data: std::sync::Mutex::new(StoreData {
72 streams: HashMap::new(),
73 global_log: Vec::new(),
74 }),
75 }
76 }
77}
78
79impl Default for InMemoryEventStore {
80 fn default() -> Self {
81 Self::new()
82 }
83}
84
85impl EventStore for InMemoryEventStore {
86 async fn read_stream<E: Event>(
87 &self,
88 stream_id: StreamId,
89 ) -> Result<EventStreamReader<E>, EventStoreError> {
90 let data = self
91 .data
92 .lock()
93 .map_err(|_| EventStoreError::StoreFailure {
94 operation: Operation::ReadStream,
95 })?;
96 let events = data
97 .streams
98 .get(&stream_id)
99 .map(|(boxed_events, _version)| {
100 boxed_events
101 .iter()
102 .filter_map(|boxed| boxed.downcast_ref::<E>())
103 .cloned()
104 .collect()
105 })
106 .unwrap_or_default();
107
108 Ok(EventStreamReader::new(events))
109 }
110
111 async fn append_events(
112 &self,
113 writes: StreamWrites,
114 ) -> Result<EventStreamSlice, EventStoreError> {
115 let mut data = self
116 .data
117 .lock()
118 .map_err(|_| EventStoreError::StoreFailure {
119 operation: Operation::AppendEvents,
120 })?;
121 let expected_versions = writes.expected_versions().clone();
122
123 for (stream_id, expected_version) in &expected_versions {
125 let current_version = data
126 .streams
127 .get(stream_id)
128 .map(|(_events, version)| *version)
129 .unwrap_or_else(|| StreamVersion::new(0));
130
131 if current_version != *expected_version {
132 return Err(EventStoreError::VersionConflict);
133 }
134 }
135
136 for entry in writes.into_entries() {
138 let StreamWriteEntry {
139 stream_id,
140 event,
141 event_type: _,
142 event_data,
143 } = entry;
144
145 let event_id = Uuid::now_v7();
147
148 data.global_log.push(GlobalLogEntry {
150 event_id,
151 stream_id: stream_id.as_ref().to_string(),
152 event_data,
153 });
154
155 let (events, version) = data
156 .streams
157 .entry(stream_id)
158 .or_insert_with(|| (Vec::new(), StreamVersion::new(0)));
159 events.push(event);
160 *version = version.increment();
161 }
162
163 Ok(EventStreamSlice)
164 }
165}
166
167impl EventReader for InMemoryEventStore {
168 type Error = EventStoreError;
169
170 async fn read_events<E: Event>(
171 &self,
172 filter: EventFilter,
173 page: EventPage,
174 ) -> Result<Vec<(E, StreamPosition)>, Self::Error> {
175 let data = self
176 .data
177 .lock()
178 .map_err(|_| EventStoreError::StoreFailure {
179 operation: Operation::ReadStream,
180 })?;
181
182 let after_event_id = page.after_position().map(|p| p.into_inner());
183
184 let events: Vec<(E, StreamPosition)> = data
185 .global_log
186 .iter()
187 .filter(|entry| {
188 match after_event_id {
190 None => true,
191 Some(after_id) => entry.event_id > after_id,
192 }
193 })
194 .filter(|entry| {
195 match filter.stream_prefix() {
197 None => true,
198 Some(prefix) => entry.stream_id.starts_with(prefix.as_ref()),
199 }
200 })
201 .take(page.limit().into_inner())
202 .filter_map(|entry| {
203 serde_json::from_value::<E>(entry.event_data.clone())
204 .ok()
205 .map(|e| (e, StreamPosition::new(entry.event_id)))
206 })
207 .collect();
208
209 Ok(events)
210 }
211}
212
213#[cfg(test)]
214mod tests {
215 use super::*;
216 use eventcore_types::{BatchSize, EventFilter, EventPage};
217 use serde::{Deserialize, Serialize};
218
219 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
221 struct TestEvent {
222 stream_id: StreamId,
223 data: String,
224 }
225
226 impl Event for TestEvent {
227 fn stream_id(&self) -> &StreamId {
228 &self.stream_id
229 }
230 }
231
232 #[tokio::test]
243 async fn test_append_and_read_single_event() {
244 let store = InMemoryEventStore::new();
246
247 let stream_id = StreamId::try_new("test-stream-123".to_string()).expect("valid stream id");
249
250 let event = TestEvent {
252 stream_id: stream_id.clone(),
253 data: "test event data".to_string(),
254 };
255
256 let writes = StreamWrites::new()
258 .register_stream(stream_id.clone(), StreamVersion::new(0))
259 .and_then(|writes| writes.append(event.clone()))
260 .expect("append should succeed");
261
262 let _ = store
264 .append_events(writes)
265 .await
266 .expect("append to succeed");
267
268 let reader = store
269 .read_stream::<TestEvent>(stream_id)
270 .await
271 .expect("read to succeed");
272
273 let observed = (
274 reader.is_empty(),
275 reader.len(),
276 reader.iter().next().is_none(),
277 );
278
279 assert_eq!(observed, (false, 1usize, false));
280 }
281
282 #[tokio::test]
283 async fn event_stream_reader_is_empty_reflects_stream_population() {
284 let store = InMemoryEventStore::new();
285 let stream_id =
286 StreamId::try_new("is-empty-observation".to_string()).expect("valid stream id");
287
288 let initial_reader = store
289 .read_stream::<TestEvent>(stream_id.clone())
290 .await
291 .expect("initial read to succeed");
292
293 let event = TestEvent {
294 stream_id: stream_id.clone(),
295 data: "populated event".to_string(),
296 };
297
298 let writes = StreamWrites::new()
299 .register_stream(stream_id.clone(), StreamVersion::new(0))
300 .and_then(|writes| writes.append(event))
301 .expect("append should succeed");
302
303 let _ = store
304 .append_events(writes)
305 .await
306 .expect("append to succeed");
307
308 let populated_reader = store
309 .read_stream::<TestEvent>(stream_id)
310 .await
311 .expect("populated read to succeed");
312
313 let observed = (
314 initial_reader.is_empty(),
315 initial_reader.len(),
316 populated_reader.is_empty(),
317 populated_reader.len(),
318 );
319
320 assert_eq!(observed, (true, 0usize, false, 1usize));
321 }
322
323 #[tokio::test]
324 async fn read_stream_iterates_through_events_in_order() {
325 let store = InMemoryEventStore::new();
326 let stream_id = StreamId::try_new("ordered-stream".to_string()).expect("valid stream id");
327
328 let first_event = TestEvent {
329 stream_id: stream_id.clone(),
330 data: "first".to_string(),
331 };
332
333 let second_event = TestEvent {
334 stream_id: stream_id.clone(),
335 data: "second".to_string(),
336 };
337
338 let writes = StreamWrites::new()
339 .register_stream(stream_id.clone(), StreamVersion::new(0))
340 .and_then(|writes| writes.append(first_event))
341 .and_then(|writes| writes.append(second_event))
342 .expect("append chain should succeed");
343
344 let _ = store
345 .append_events(writes)
346 .await
347 .expect("append to succeed");
348
349 let reader = store
350 .read_stream::<TestEvent>(stream_id)
351 .await
352 .expect("read to succeed");
353
354 let collected: Vec<String> = reader.iter().map(|event| event.data.clone()).collect();
355
356 let observed = (reader.is_empty(), collected);
357
358 assert_eq!(
359 observed,
360 (false, vec!["first".to_string(), "second".to_string()])
361 );
362 }
363
364 #[test]
365 fn stream_writes_accepts_duplicate_stream_with_same_expected_version() {
366 let stream_id = StreamId::try_new("duplicate-stream-same-version".to_string())
367 .expect("valid stream id");
368
369 let first_event = TestEvent {
370 stream_id: stream_id.clone(),
371 data: "first-event".to_string(),
372 };
373
374 let second_event = TestEvent {
375 stream_id: stream_id.clone(),
376 data: "second-event".to_string(),
377 };
378
379 let writes_result = StreamWrites::new()
380 .register_stream(stream_id.clone(), StreamVersion::new(0))
381 .and_then(|writes| writes.append(first_event))
382 .and_then(|writes| writes.append(second_event));
383
384 assert!(writes_result.is_ok());
385 }
386
387 #[test]
388 fn stream_writes_rejects_duplicate_stream_with_conflicting_expected_versions() {
389 let stream_id =
390 StreamId::try_new("duplicate-stream-conflict".to_string()).expect("valid stream id");
391
392 let first_event = TestEvent {
393 stream_id: stream_id.clone(),
394 data: "first-event-conflict".to_string(),
395 };
396
397 let second_event = TestEvent {
398 stream_id: stream_id.clone(),
399 data: "second-event-conflict".to_string(),
400 };
401
402 let conflict = StreamWrites::new()
403 .register_stream(stream_id.clone(), StreamVersion::new(0))
404 .and_then(|writes| writes.append(first_event))
405 .and_then(|writes| writes.register_stream(stream_id.clone(), StreamVersion::new(1)))
406 .and_then(|writes| writes.append(second_event));
407
408 let message = conflict.unwrap_err().to_string();
409
410 assert_eq!(
411 message,
412 "conflicting expected versions for stream duplicate-stream-conflict: first=0, second=1"
413 );
414 }
415
416 #[tokio::test]
417 async fn stream_writes_registers_stream_before_appending_multiple_events() {
418 let store = InMemoryEventStore::new();
419 let stream_id =
420 StreamId::try_new("registered-stream".to_string()).expect("valid stream id");
421
422 let first_event = TestEvent {
423 stream_id: stream_id.clone(),
424 data: "first-registered-event".to_string(),
425 };
426
427 let second_event = TestEvent {
428 stream_id: stream_id.clone(),
429 data: "second-registered-event".to_string(),
430 };
431
432 let writes = StreamWrites::new()
433 .register_stream(stream_id.clone(), StreamVersion::new(0))
434 .and_then(|writes| writes.append(first_event))
435 .and_then(|writes| writes.append(second_event))
436 .expect("registered stream should accept events");
437
438 let result = store.append_events(writes).await;
439
440 assert!(
441 result.is_ok(),
442 "append should succeed when stream registered before events"
443 );
444 }
445
446 #[test]
447 fn stream_writes_rejects_appends_for_unregistered_streams() {
448 let stream_id =
449 StreamId::try_new("unregistered-stream".to_string()).expect("valid stream id");
450
451 let event = TestEvent {
452 stream_id: stream_id.clone(),
453 data: "unregistered-event".to_string(),
454 };
455
456 let error = StreamWrites::new()
457 .append(event)
458 .expect_err("append without prior registration should fail");
459
460 assert!(matches!(
461 error,
462 EventStoreError::UndeclaredStream { stream_id: ref actual } if *actual == stream_id
463 ));
464 }
465
466 #[test]
467 fn expected_versions_returns_registered_streams_and_versions() {
468 let stream_a = StreamId::try_new("stream-a").expect("valid stream id");
469 let stream_b = StreamId::try_new("stream-b").expect("valid stream id");
470
471 let writes = StreamWrites::new()
472 .register_stream(stream_a.clone(), StreamVersion::new(0))
473 .and_then(|w| w.register_stream(stream_b.clone(), StreamVersion::new(5)))
474 .expect("registration should succeed");
475
476 let versions = writes.expected_versions();
477
478 assert_eq!(versions.len(), 2);
479 assert_eq!(versions.get(&stream_a), Some(&StreamVersion::new(0)));
480 assert_eq!(versions.get(&stream_b), Some(&StreamVersion::new(5)));
481 }
482
483 #[test]
484 fn stream_id_rejects_asterisk_metacharacter() {
485 let result = StreamId::try_new("account-*");
486 assert!(
487 result.is_err(),
488 "StreamId should reject asterisk glob metacharacter"
489 );
490 }
491
492 #[test]
493 fn stream_id_rejects_question_mark_metacharacter() {
494 let result = StreamId::try_new("account-?");
495 assert!(
496 result.is_err(),
497 "StreamId should reject question mark glob metacharacter"
498 );
499 }
500
501 #[test]
502 fn stream_id_rejects_open_bracket_metacharacter() {
503 let result = StreamId::try_new("account-[");
504 assert!(
505 result.is_err(),
506 "StreamId should reject open bracket glob metacharacter"
507 );
508 }
509
510 #[test]
511 fn stream_id_rejects_close_bracket_metacharacter() {
512 let result = StreamId::try_new("account-]");
513 assert!(
514 result.is_err(),
515 "StreamId should reject close bracket glob metacharacter"
516 );
517 }
518
519 #[tokio::test]
520 async fn event_reader_after_position_excludes_event_at_position() {
521 let store = InMemoryEventStore::new();
523 let stream_id = StreamId::try_new("reader-test").expect("valid stream id");
524
525 let event1 = TestEvent {
526 stream_id: stream_id.clone(),
527 data: "first".to_string(),
528 };
529 let event2 = TestEvent {
530 stream_id: stream_id.clone(),
531 data: "second".to_string(),
532 };
533 let event3 = TestEvent {
534 stream_id: stream_id.clone(),
535 data: "third".to_string(),
536 };
537
538 let writes = StreamWrites::new()
539 .register_stream(stream_id.clone(), StreamVersion::new(0))
540 .and_then(|w| w.append(event1))
541 .and_then(|w| w.append(event2))
542 .and_then(|w| w.append(event3))
543 .expect("append should succeed");
544
545 store
546 .append_events(writes)
547 .await
548 .expect("append to succeed");
549
550 let all_events = store
552 .read_events::<TestEvent>(EventFilter::all(), EventPage::first(BatchSize::new(100)))
553 .await
554 .expect("read all events to succeed");
555
556 assert_eq!(all_events.len(), 3, "Should have 3 events total");
557 let (first_event, first_position) = &all_events[0];
558
559 let page = EventPage::after(*first_position, BatchSize::new(100));
561 let filter = EventFilter::all();
562 let events = store
563 .read_events::<TestEvent>(filter, page)
564 .await
565 .expect("read to succeed");
566
567 assert_eq!(events.len(), 2, "Should get 2 events after first position");
569 assert_eq!(
570 events[0].0.data, "second",
571 "First returned event should be 'second'"
572 );
573 assert_eq!(
574 events[1].0.data, "third",
575 "Second returned event should be 'third'"
576 );
577
578 for (event, _pos) in &events {
580 assert_ne!(
581 event.data, first_event.data,
582 "First event should be excluded"
583 );
584 }
585
586 for (_event, pos) in &events {
588 assert!(
589 *pos > *first_position,
590 "Returned position {} should be > first position {}",
591 pos,
592 first_position
593 );
594 }
595 }
596}