sync_engine/storage/
memory.rs

1// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
2// See LICENSE file in the project root for full license text.
3
4use async_trait::async_trait;
5use dashmap::DashMap;
6use crate::sync_item::SyncItem;
7use super::traits::{CacheStore, StorageError};
8
9pub struct InMemoryStore {
10    data: DashMap<String, SyncItem>,
11}
12
13impl InMemoryStore {
14    #[must_use]
15    pub fn new() -> Self {
16        Self {
17            data: DashMap::new(),
18        }
19    }
20
21    /// Get current item count
22    #[must_use]
23    pub fn len(&self) -> usize {
24        self.data.len()
25    }
26
27    /// Check if empty
28    #[must_use]
29    pub fn is_empty(&self) -> bool {
30        self.data.is_empty()
31    }
32
33    /// Clear all items
34    pub fn clear(&self) {
35        self.data.clear();
36    }
37}
38
39impl Default for InMemoryStore {
40    fn default() -> Self {
41        Self::new()
42    }
43}
44
45#[async_trait]
46impl CacheStore for InMemoryStore {
47    async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
48        Ok(self.data.get(id).map(|r| r.value().clone()))
49    }
50
51    async fn put(&self, item: &SyncItem) -> Result<(), StorageError> {
52        self.data.insert(item.object_id.clone(), item.clone());
53        Ok(())
54    }
55
56    async fn delete(&self, id: &str) -> Result<(), StorageError> {
57        self.data.remove(id);
58        Ok(())
59    }
60
61    async fn exists(&self, id: &str) -> Result<bool, StorageError> {
62        Ok(self.data.contains_key(id))
63    }
64}
65
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Builds a small JSON-backed item whose payload echoes its own id.
    fn test_item(id: &str) -> SyncItem {
        let payload = json!({"test": "data", "id": id});
        SyncItem::from_json(id.to_string(), payload)
    }

    /// A freshly constructed store reports itself as empty.
    #[tokio::test]
    async fn test_new_store_is_empty() {
        let store = InMemoryStore::new();
        assert!(store.is_empty());
        assert_eq!(store.len(), 0);
    }

    /// An item written with `put` can be read back with `get`.
    #[tokio::test]
    async fn test_put_and_get() {
        let store = InMemoryStore::new();
        let item = test_item("item-1");

        store.put(&item).await.unwrap();

        let fetched = store.get("item-1").await.unwrap();
        assert!(fetched.is_some());
        assert_eq!(fetched.unwrap().object_id, "item-1");
    }

    /// Looking up an id that was never stored yields `None`, not an error.
    #[tokio::test]
    async fn test_get_nonexistent_returns_none() {
        let store = InMemoryStore::new();

        let fetched = store.get("nonexistent").await.unwrap();
        assert!(fetched.is_none());
    }

    /// Deleting a stored item removes it from both the count and lookups.
    #[tokio::test]
    async fn test_delete() {
        let store = InMemoryStore::new();
        let item = test_item("to-delete");

        store.put(&item).await.unwrap();
        assert_eq!(store.len(), 1);

        store.delete("to-delete").await.unwrap();
        assert_eq!(store.len(), 0);

        let fetched = store.get("to-delete").await.unwrap();
        assert!(fetched.is_none());
    }

    /// Deleting an id that was never stored still succeeds.
    #[tokio::test]
    async fn test_delete_nonexistent_is_ok() {
        let store = InMemoryStore::new();

        let outcome = store.delete("nonexistent").await;
        assert!(outcome.is_ok());
    }

    /// A second `put` under the same id replaces the first item.
    #[tokio::test]
    async fn test_put_overwrites() {
        let store = InMemoryStore::new();

        let first = SyncItem::from_json("same-id".to_string(), json!({"version": 1}));
        let second = SyncItem::from_json("same-id".to_string(), json!({"version": 2}));

        store.put(&first).await.unwrap();
        store.put(&second).await.unwrap();

        assert_eq!(store.len(), 1);

        let stored = store.get("same-id").await.unwrap().unwrap();
        let content = stored.content_as_json().unwrap();
        assert_eq!(content["version"], 2);
    }

    /// `clear` empties a store that holds several items.
    #[tokio::test]
    async fn test_clear() {
        let store = InMemoryStore::new();

        for i in 0..10 {
            let item = test_item(&format!("item-{i}"));
            store.put(&item).await.unwrap();
        }

        assert_eq!(store.len(), 10);

        store.clear();

        assert!(store.is_empty());
    }

    /// `Default::default()` produces an empty store.
    #[tokio::test]
    async fn test_default_trait() {
        let store = InMemoryStore::default();
        assert!(store.is_empty());
    }

    /// `put_batch` is not overridden by `InMemoryStore`, so this exercises the
    /// implementation supplied by the `CacheStore` trait.
    #[tokio::test]
    async fn test_put_batch_via_trait() {
        let store = InMemoryStore::new();

        let items: Vec<SyncItem> = (0..5)
            .map(|i| test_item(&format!("batch-{i}")))
            .collect();

        let outcome = store.put_batch(&items).await.unwrap();

        assert_eq!(outcome.written, 5);
        assert!(outcome.verified);
        assert_eq!(store.len(), 5);
    }

    /// Concurrent writers sharing one store through an `Arc` all land their items.
    #[tokio::test]
    async fn test_concurrent_access() {
        use std::sync::Arc;

        let store = Arc::new(InMemoryStore::new());

        // Ten writer tasks, each inserting ten uniquely keyed items.
        let writers: Vec<_> = (0..10)
            .map(|batch| {
                let shared = Arc::clone(&store);
                tokio::spawn(async move {
                    for i in 0..10 {
                        let item = test_item(&format!("batch-{batch}-item-{i}"));
                        shared.put(&item).await.unwrap();
                    }
                })
            })
            .collect();

        // Every writer must finish without panicking.
        for writer in writers {
            writer.await.unwrap();
        }

        // 10 tasks x 10 items, all with distinct keys.
        assert_eq!(store.len(), 100);
    }
}
207}