allframe_core/cqrs/
memory_backend.rs

1//! In-memory event store backend
2//!
3//! This is the default backend for AllFrame CQRS, providing a simple
4//! HashMap-based storage suitable for testing, development, and MVPs.
5
6use std::{collections::HashMap, sync::Arc};
7
8use async_trait::async_trait;
9use tokio::sync::RwLock;
10
11use super::{
12    backend::{BackendStats, EventStoreBackend},
13    Event,
14};
15
16/// Type alias for snapshot storage (snapshot data + version)
17type SnapshotMap = HashMap<String, (Vec<u8>, u64)>;
18
19/// In-memory event store backend
20#[derive(Clone)]
21pub struct InMemoryBackend<E: Event> {
22    events: Arc<RwLock<HashMap<String, Vec<E>>>>,
23    snapshots: Arc<RwLock<SnapshotMap>>,
24}
25
26impl<E: Event> InMemoryBackend<E> {
27    /// Create a new in-memory backend
28    pub fn new() -> Self {
29        Self {
30            events: Arc::new(RwLock::new(HashMap::new())),
31            snapshots: Arc::new(RwLock::new(HashMap::new())),
32        }
33    }
34}
35
36impl<E: Event> Default for InMemoryBackend<E> {
37    fn default() -> Self {
38        Self::new()
39    }
40}
41
42#[async_trait]
43impl<E: Event> EventStoreBackend<E> for InMemoryBackend<E> {
44    async fn append(&self, aggregate_id: &str, events: Vec<E>) -> Result<(), String> {
45        let mut store = self.events.write().await;
46        let stream = store
47            .entry(aggregate_id.to_string())
48            .or_insert_with(Vec::new);
49        stream.extend(events);
50        Ok(())
51    }
52
53    async fn get_events(&self, aggregate_id: &str) -> Result<Vec<E>, String> {
54        let store = self.events.read().await;
55        Ok(store.get(aggregate_id).cloned().unwrap_or_default())
56    }
57
58    async fn get_all_events(&self) -> Result<Vec<E>, String> {
59        let store = self.events.read().await;
60        let mut all_events = Vec::new();
61        for events in store.values() {
62            all_events.extend(events.clone());
63        }
64        Ok(all_events)
65    }
66
67    async fn get_events_after(&self, aggregate_id: &str, version: u64) -> Result<Vec<E>, String> {
68        let events = self.get_events(aggregate_id).await?;
69        Ok(events.into_iter().skip(version as usize).collect())
70    }
71
72    async fn save_snapshot(
73        &self,
74        aggregate_id: &str,
75        snapshot_data: Vec<u8>,
76        version: u64,
77    ) -> Result<(), String> {
78        let mut snapshots = self.snapshots.write().await;
79        snapshots.insert(aggregate_id.to_string(), (snapshot_data, version));
80        Ok(())
81    }
82
83    async fn get_latest_snapshot(&self, aggregate_id: &str) -> Result<(Vec<u8>, u64), String> {
84        let snapshots = self.snapshots.read().await;
85        snapshots
86            .get(aggregate_id)
87            .cloned()
88            .ok_or_else(|| "No snapshot found".to_string())
89    }
90
91    async fn stats(&self) -> BackendStats {
92        let store = self.events.read().await;
93        let snapshots = self.snapshots.read().await;
94
95        let total_events: u64 = store.values().map(|v| v.len() as u64).sum();
96        let total_aggregates = store.len() as u64;
97        let total_snapshots = snapshots.len() as u64;
98
99        let mut backend_specific = HashMap::new();
100        backend_specific.insert("backend_type".to_string(), "in-memory".to_string());
101
102        BackendStats {
103            total_events,
104            total_aggregates,
105            total_snapshots,
106            backend_specific,
107        }
108    }
109}