allframe_core/cqrs/
memory_backend.rs1use std::{collections::HashMap, sync::Arc};
7
8use async_trait::async_trait;
9use tokio::sync::RwLock;
10
11use super::{
12 backend::{BackendStats, EventStoreBackend},
13 Event,
14};
15
/// One stored snapshot: the opaque snapshot bytes paired with the version they were saved at.
type SnapshotEntry = (Vec<u8>, u64);

/// Snapshot storage, keyed by aggregate id; each aggregate holds at most one entry.
type SnapshotMap = HashMap<String, SnapshotEntry>;
18
19#[derive(Clone)]
21pub struct InMemoryBackend<E: Event> {
22 events: Arc<RwLock<HashMap<String, Vec<E>>>>,
23 snapshots: Arc<RwLock<SnapshotMap>>,
24}
25
26impl<E: Event> InMemoryBackend<E> {
27 pub fn new() -> Self {
29 Self {
30 events: Arc::new(RwLock::new(HashMap::new())),
31 snapshots: Arc::new(RwLock::new(HashMap::new())),
32 }
33 }
34}
35
36impl<E: Event> Default for InMemoryBackend<E> {
37 fn default() -> Self {
38 Self::new()
39 }
40}
41
42#[async_trait]
43impl<E: Event> EventStoreBackend<E> for InMemoryBackend<E> {
44 async fn append(&self, aggregate_id: &str, events: Vec<E>) -> Result<(), String> {
45 let mut store = self.events.write().await;
46 let stream = store
47 .entry(aggregate_id.to_string())
48 .or_insert_with(Vec::new);
49 stream.extend(events);
50 Ok(())
51 }
52
53 async fn get_events(&self, aggregate_id: &str) -> Result<Vec<E>, String> {
54 let store = self.events.read().await;
55 Ok(store.get(aggregate_id).cloned().unwrap_or_default())
56 }
57
58 async fn get_all_events(&self) -> Result<Vec<E>, String> {
59 let store = self.events.read().await;
60 let mut all_events = Vec::new();
61 for events in store.values() {
62 all_events.extend(events.clone());
63 }
64 Ok(all_events)
65 }
66
67 async fn get_events_after(&self, aggregate_id: &str, version: u64) -> Result<Vec<E>, String> {
68 let events = self.get_events(aggregate_id).await?;
69 Ok(events.into_iter().skip(version as usize).collect())
70 }
71
72 async fn save_snapshot(
73 &self,
74 aggregate_id: &str,
75 snapshot_data: Vec<u8>,
76 version: u64,
77 ) -> Result<(), String> {
78 let mut snapshots = self.snapshots.write().await;
79 snapshots.insert(aggregate_id.to_string(), (snapshot_data, version));
80 Ok(())
81 }
82
83 async fn get_latest_snapshot(&self, aggregate_id: &str) -> Result<(Vec<u8>, u64), String> {
84 let snapshots = self.snapshots.read().await;
85 snapshots
86 .get(aggregate_id)
87 .cloned()
88 .ok_or_else(|| "No snapshot found".to_string())
89 }
90
91 async fn stats(&self) -> BackendStats {
92 let store = self.events.read().await;
93 let snapshots = self.snapshots.read().await;
94
95 let total_events: u64 = store.values().map(|v| v.len() as u64).sum();
96 let total_aggregates = store.len() as u64;
97 let total_snapshots = snapshots.len() as u64;
98
99 let mut backend_specific = HashMap::new();
100 backend_specific.insert("backend_type".to_string(), "in-memory".to_string());
101
102 BackendStats {
103 total_events,
104 total_aggregates,
105 total_snapshots,
106 backend_specific,
107 }
108 }
109}