use crate::{Aggregate, CqrsContext, CqrsError, Dispatcher, EventEnvelope};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tracing::{debug, info};
/// In-memory [`Dispatcher`] that keeps every dispatched event, grouped by aggregate id.
///
/// The backing map is wrapped in `Arc<Mutex<..>>`. Each key is an aggregate id and
/// each value is the list of envelopes recorded for that id, in the order they
/// were appended.
pub struct InMemoryDispatcher<A>
where
    A: Aggregate,
{
    /// Recorded envelopes, keyed by aggregate id.
    events: Arc<Mutex<HashMap<String, Vec<EventEnvelope<A>>>>>,
}
impl<A: Aggregate> InMemoryDispatcher<A> {
    /// Creates a dispatcher whose event store is empty.
    #[must_use]
    pub fn new() -> Self {
        Self {
            events: Arc::new(Mutex::new(HashMap::new())),
        }
    }

    /// Returns a clone of the events recorded under `aggregate_id`.
    ///
    /// Yields an empty vector when no entry exists for that id.
    ///
    /// # Panics
    ///
    /// Panics if the store mutex is poisoned, i.e. a thread panicked while
    /// holding the lock.
    #[must_use]
    pub fn get_events(&self, aggregate_id: &str) -> Vec<EventEnvelope<A>> {
        let events = self.events.lock().unwrap();
        events.get(aggregate_id).cloned().unwrap_or_default()
    }

    /// Returns a clone of the whole store: every aggregate id with its recorded events.
    ///
    /// # Panics
    ///
    /// Panics if the store mutex is poisoned, i.e. a thread panicked while
    /// holding the lock.
    #[must_use]
    pub fn get_all_events(&self) -> HashMap<String, Vec<EventEnvelope<A>>> {
        let events = self.events.lock().unwrap();
        events.clone()
    }

    /// Removes every entry from the store.
    ///
    /// # Panics
    ///
    /// Panics if the store mutex is poisoned, i.e. a thread panicked while
    /// holding the lock.
    pub fn clear(&self) {
        let mut events = self.events.lock().unwrap();
        events.clear();
    }
}
impl<A> Default for InMemoryDispatcher<A>
where
    A: Aggregate,
{
    /// Equivalent to [`InMemoryDispatcher::new`].
    fn default() -> Self {
        Self::new()
    }
}
#[async_trait::async_trait]
impl<A> Dispatcher<A> for InMemoryDispatcher<A>
where
    A: Aggregate,
{
    /// Appends a clone of each envelope in `events` to the list stored under
    /// `aggregate_id`, creating that list on first use.
    ///
    /// `_context` is not used. This implementation always returns `Ok(())`.
    ///
    /// # Panics
    ///
    /// Panics if the store mutex is poisoned, i.e. a thread panicked while
    /// holding the lock.
    async fn dispatch(
        &self,
        aggregate_id: &str,
        events: &[EventEnvelope<A>],
        _context: &CqrsContext,
    ) -> Result<(), CqrsError> {
        debug!("Dispatching events to in-memory store");

        let mut guard = self.events.lock().unwrap();
        let recorded = guard.entry(aggregate_id.to_owned()).or_default();

        // `inspect` runs before `cloned`, so each envelope is logged, then cloned,
        // then appended — one envelope at a time, in slice order.
        recorded.extend(
            events
                .iter()
                .inspect(|envelope| {
                    debug!(
                        event_id = %envelope.event_id,
                        version = %envelope.version,
                        "Adding event to in-memory store"
                    );
                })
                .cloned(),
        );

        info!(
            event_count = events.len(),
            "Successfully dispatched events to in-memory store"
        );
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::testing::{TestAggregate, TestEvent};
    use crate::CqrsContext;
    use chrono::Utc;

    /// Dispatches two events for a single aggregate, then checks the per-aggregate
    /// lookup, the full snapshot, and that `clear` empties the store.
    #[tokio::test]
    async fn test_in_memory_dispatcher() {
        let dispatcher = InMemoryDispatcher::<TestAggregate>::new();
        let context = CqrsContext::default();

        let batch = vec![
            EventEnvelope {
                event_id: String::from("event1"),
                aggregate_id: String::from("agg1"),
                version: 1,
                payload: TestEvent::Created {
                    name: String::from("toto"),
                },
                metadata: HashMap::new(),
                at: Utc::now(),
            },
            EventEnvelope {
                event_id: String::from("event2"),
                aggregate_id: String::from("agg1"),
                version: 2,
                payload: TestEvent::Updated {
                    name: String::from("toto"),
                },
                metadata: HashMap::new(),
                at: Utc::now(),
            },
        ];

        dispatcher
            .dispatch("agg1", &batch, &context)
            .await
            .unwrap();

        // Both events are present for the aggregate, in dispatch order.
        let stored = dispatcher.get_events("agg1");
        let stored_ids: Vec<&str> = stored
            .iter()
            .map(|envelope| envelope.event_id.as_str())
            .collect();
        assert_eq!(stored_ids, ["event1", "event2"]);

        // The full snapshot holds exactly one aggregate entry.
        let snapshot = dispatcher.get_all_events();
        assert_eq!(snapshot.len(), 1);
        assert!(snapshot.contains_key("agg1"));

        // After clearing, nothing is left for the aggregate.
        dispatcher.clear();
        assert!(dispatcher.get_events("agg1").is_empty());
    }
}