Skip to main content

crdt_store/
memory.rs

1use alloc::collections::BTreeMap;
2use alloc::string::{String, ToString};
3use alloc::vec::Vec;
4use core::fmt;
5
6use crate::traits::{EventStore, Snapshot, StateStore, StoredEvent};
7
/// In-memory storage backend.
///
/// State entries, event logs and snapshots are each held in their own
/// `BTreeMap` — nothing touches disk.
/// Ideal for testing and prototyping.
///
/// # Example
///
/// ```
/// use crdt_store::{MemoryStore, StateStore};
///
/// let mut store = MemoryStore::new();
/// store.put("sensors", "s1", b"temp=22.5").unwrap();
///
/// let data = store.get("sensors", "s1").unwrap().unwrap();
/// assert_eq!(data, b"temp=22.5");
/// ```
pub struct MemoryStore {
    /// State store: `(namespace, key)` -> raw value bytes.
    state: BTreeMap<(String, String), Vec<u8>>,
    /// Event log: `(namespace, entity_id)` -> events in append order
    /// (which is also ascending sequence order).
    events: BTreeMap<(String, String), Vec<StoredEvent>>,
    /// Snapshots: `(namespace, entity_id)` -> most recently saved snapshot.
    snapshots: BTreeMap<(String, String), Snapshot>,
    /// Sequence number the next appended event will receive; one counter
    /// is shared by every namespace and entity.
    next_sequence: u64,
}
34
/// Error type reported by the in-memory backend.
///
/// The storage traits require an associated error type; this thin wrapper
/// around a human-readable message fills that role for the in-memory store.
#[derive(Clone, Debug)]
pub struct MemoryError(String);

impl fmt::Display for MemoryError {
    /// Writes the wrapped message prefixed with `MemoryStore error: `.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("MemoryStore error: ")?;
        f.write_str(&self.0)
    }
}

// Only compiled when the `std` feature is enabled, because it names
// `std::error::Error`.
#[cfg(feature = "std")]
impl std::error::Error for MemoryError {}
49
50impl MemoryStore {
51    /// Create a new empty in-memory store.
52    pub fn new() -> Self {
53        Self {
54            state: BTreeMap::new(),
55            events: BTreeMap::new(),
56            snapshots: BTreeMap::new(),
57            next_sequence: 1,
58        }
59    }
60
61    /// Returns the total number of state entries across all namespaces.
62    pub fn state_count(&self) -> usize {
63        self.state.len()
64    }
65
66    /// Returns the total number of events across all entities.
67    pub fn total_event_count(&self) -> usize {
68        self.events.values().map(|v| v.len()).sum()
69    }
70
71    fn ns_key(namespace: &str, key: &str) -> (String, String) {
72        (namespace.to_string(), key.to_string())
73    }
74}
75
76impl Default for MemoryStore {
77    fn default() -> Self {
78        Self::new()
79    }
80}
81
82impl StateStore for MemoryStore {
83    type Error = MemoryError;
84
85    fn put(&mut self, namespace: &str, key: &str, value: &[u8]) -> Result<(), Self::Error> {
86        self.state
87            .insert(Self::ns_key(namespace, key), value.to_vec());
88        Ok(())
89    }
90
91    fn get(&self, namespace: &str, key: &str) -> Result<Option<Vec<u8>>, Self::Error> {
92        Ok(self.state.get(&Self::ns_key(namespace, key)).cloned())
93    }
94
95    fn delete(&mut self, namespace: &str, key: &str) -> Result<(), Self::Error> {
96        self.state.remove(&Self::ns_key(namespace, key));
97        Ok(())
98    }
99
100    fn list_keys(&self, namespace: &str) -> Result<Vec<String>, Self::Error> {
101        let keys = self
102            .state
103            .keys()
104            .filter(|(ns, _)| ns == namespace)
105            .map(|(_, k)| k.clone())
106            .collect();
107        Ok(keys)
108    }
109
110    fn exists(&self, namespace: &str, key: &str) -> Result<bool, Self::Error> {
111        Ok(self.state.contains_key(&Self::ns_key(namespace, key)))
112    }
113}
114
115impl EventStore for MemoryStore {
116    fn append_event(
117        &mut self,
118        namespace: &str,
119        entity_id: &str,
120        data: &[u8],
121        timestamp: u64,
122        node_id: &str,
123    ) -> Result<u64, Self::Error> {
124        let seq = self.next_sequence;
125        self.next_sequence += 1;
126
127        let event = StoredEvent {
128            sequence: seq,
129            entity_id: entity_id.to_string(),
130            namespace: namespace.to_string(),
131            data: data.to_vec(),
132            timestamp,
133            node_id: node_id.to_string(),
134        };
135
136        self.events
137            .entry(Self::ns_key(namespace, entity_id))
138            .or_default()
139            .push(event);
140
141        Ok(seq)
142    }
143
144    fn events_since(
145        &self,
146        namespace: &str,
147        entity_id: &str,
148        since_sequence: u64,
149    ) -> Result<Vec<StoredEvent>, Self::Error> {
150        let events = self
151            .events
152            .get(&Self::ns_key(namespace, entity_id))
153            .map(|evts| {
154                evts.iter()
155                    .filter(|e| e.sequence > since_sequence)
156                    .cloned()
157                    .collect()
158            })
159            .unwrap_or_default();
160        Ok(events)
161    }
162
163    fn event_count(&self, namespace: &str, entity_id: &str) -> Result<u64, Self::Error> {
164        let count = self
165            .events
166            .get(&Self::ns_key(namespace, entity_id))
167            .map(|evts| evts.len() as u64)
168            .unwrap_or(0);
169        Ok(count)
170    }
171
172    fn save_snapshot(
173        &mut self,
174        namespace: &str,
175        entity_id: &str,
176        state: &[u8],
177        at_sequence: u64,
178        version: u8,
179    ) -> Result<(), Self::Error> {
180        self.snapshots.insert(
181            Self::ns_key(namespace, entity_id),
182            Snapshot {
183                state: state.to_vec(),
184                at_sequence,
185                version,
186            },
187        );
188        Ok(())
189    }
190
191    fn load_snapshot(
192        &self,
193        namespace: &str,
194        entity_id: &str,
195    ) -> Result<Option<Snapshot>, Self::Error> {
196        Ok(self
197            .snapshots
198            .get(&Self::ns_key(namespace, entity_id))
199            .cloned())
200    }
201
202    fn truncate_events_before(
203        &mut self,
204        namespace: &str,
205        entity_id: &str,
206        before_sequence: u64,
207    ) -> Result<u64, Self::Error> {
208        let key = Self::ns_key(namespace, entity_id);
209        if let Some(events) = self.events.get_mut(&key) {
210            let before_len = events.len() as u64;
211            events.retain(|e| e.sequence >= before_sequence);
212            let after_len = events.len() as u64;
213            Ok(before_len - after_len)
214        } else {
215            Ok(0)
216        }
217    }
218}
219
#[cfg(test)]
mod tests {
    use super::*;

    /// A value can be written, overwritten and deleted again.
    #[test]
    fn state_put_get_delete() {
        let mut mem = MemoryStore::new();

        mem.put("ns", "k1", b"hello").unwrap();
        let first_read = mem.get("ns", "k1").unwrap();
        assert_eq!(first_read, Some(b"hello".to_vec()));

        mem.put("ns", "k1", b"world").unwrap();
        let second_read = mem.get("ns", "k1").unwrap();
        assert_eq!(second_read, Some(b"world".to_vec()));

        mem.delete("ns", "k1").unwrap();
        let after_delete = mem.get("ns", "k1").unwrap();
        assert_eq!(after_delete, None);
    }

    /// The same key in two namespaces holds two independent values.
    #[test]
    fn state_namespace_isolation() {
        let mut mem = MemoryStore::new();
        mem.put("a", "k1", b"alpha").unwrap();
        mem.put("b", "k1", b"beta").unwrap();

        let from_a = mem.get("a", "k1").unwrap();
        let from_b = mem.get("b", "k1").unwrap();
        assert_eq!(from_a, Some(b"alpha".to_vec()));
        assert_eq!(from_b, Some(b"beta".to_vec()));
    }

    /// Listing a namespace returns only that namespace's keys.
    #[test]
    fn state_list_keys() {
        let mut mem = MemoryStore::new();
        mem.put("ns", "a", b"1").unwrap();
        mem.put("ns", "b", b"2").unwrap();
        mem.put("other", "c", b"3").unwrap();

        let mut listed = mem.list_keys("ns").unwrap();
        listed.sort();
        assert_eq!(listed, vec!["a", "b"]);
    }

    /// `exists` flips from false to true once the key is written.
    #[test]
    fn state_exists() {
        let mut mem = MemoryStore::new();
        let before = mem.exists("ns", "k").unwrap();
        assert!(!before);

        mem.put("ns", "k", b"v").unwrap();
        let after = mem.exists("ns", "k").unwrap();
        assert!(after);
    }

    /// Appended events get sequence numbers 1, 2, 3 and can be read back
    /// in full or from a given sequence onward.
    #[test]
    fn event_append_and_read() {
        let mut mem = MemoryStore::new();

        let first = mem.append_event("ns", "e1", b"op1", 100, "node-a").unwrap();
        let second = mem.append_event("ns", "e1", b"op2", 101, "node-a").unwrap();
        let third = mem.append_event("ns", "e1", b"op3", 102, "node-b").unwrap();

        assert_eq!(first, 1);
        assert_eq!(second, 2);
        assert_eq!(third, 3);

        // Everything after sequence 0 is the whole log.
        let all = mem.events_since("ns", "e1", 0).unwrap();
        assert_eq!(all.len(), 3);
        assert_eq!(all[0].data, b"op1");
        assert_eq!(all[2].node_id, "node-b");

        // Everything after sequence 1 skips the first event.
        let tail = mem.events_since("ns", "e1", 1).unwrap();
        assert_eq!(tail.len(), 2);
        assert_eq!(tail[0].sequence, 2);
    }

    /// The per-entity count starts at zero and tracks appends to that entity only.
    #[test]
    fn event_count() {
        let mut mem = MemoryStore::new();
        assert_eq!(mem.event_count("ns", "e1").unwrap(), 0);

        mem.append_event("ns", "e1", b"op1", 100, "n").unwrap();
        mem.append_event("ns", "e1", b"op2", 101, "n").unwrap();
        assert_eq!(mem.event_count("ns", "e1").unwrap(), 2);

        // An entity that never received events still reports zero.
        assert_eq!(mem.event_count("ns", "e2").unwrap(), 0);
    }

    /// A saved snapshot is returned with its state, sequence and version intact.
    #[test]
    fn snapshot_save_load() {
        let mut mem = MemoryStore::new();

        let missing = mem.load_snapshot("ns", "e1").unwrap();
        assert!(missing.is_none());

        mem.save_snapshot("ns", "e1", b"state-data", 42, 2).unwrap();

        let loaded = mem.load_snapshot("ns", "e1").unwrap().unwrap();
        assert_eq!(loaded.state, b"state-data");
        assert_eq!(loaded.at_sequence, 42);
        assert_eq!(loaded.version, 2);
    }

    /// Truncating before sequence 3 removes sequences 1 and 2 and keeps the rest.
    #[test]
    fn truncate_events() {
        let mut mem = MemoryStore::new();
        mem.append_event("ns", "e1", b"op1", 100, "n").unwrap();
        mem.append_event("ns", "e1", b"op2", 101, "n").unwrap();
        mem.append_event("ns", "e1", b"op3", 102, "n").unwrap();
        mem.append_event("ns", "e1", b"op4", 103, "n").unwrap();

        let dropped = mem.truncate_events_before("ns", "e1", 3).unwrap();
        assert_eq!(dropped, 2); // seq 1 and 2 removed

        let remaining = mem.events_since("ns", "e1", 0).unwrap();
        assert_eq!(remaining.len(), 2);
        assert_eq!(remaining[0].sequence, 3);
    }

    /// Events appended to one entity do not show up in another entity's log.
    #[test]
    fn entity_isolation() {
        let mut mem = MemoryStore::new();
        mem.append_event("ns", "e1", b"op1", 100, "n").unwrap();
        mem.append_event("ns", "e2", b"op2", 101, "n").unwrap();

        let e1_count = mem.event_count("ns", "e1").unwrap();
        let e2_count = mem.event_count("ns", "e2").unwrap();
        assert_eq!(e1_count, 1);
        assert_eq!(e2_count, 1);
    }
}