sync_engine/storage/
memory.rs

1use async_trait::async_trait;
2use dashmap::DashMap;
3use crate::sync_item::SyncItem;
4use super::traits::{CacheStore, StorageError};
5
6pub struct InMemoryStore {
7    data: DashMap<String, SyncItem>,
8}
9
10impl InMemoryStore {
11    #[must_use]
12    pub fn new() -> Self {
13        Self {
14            data: DashMap::new(),
15        }
16    }
17
18    /// Get current item count
19    #[must_use]
20    pub fn len(&self) -> usize {
21        self.data.len()
22    }
23
24    /// Check if empty
25    #[must_use]
26    pub fn is_empty(&self) -> bool {
27        self.data.is_empty()
28    }
29
30    /// Clear all items
31    pub fn clear(&self) {
32        self.data.clear();
33    }
34}
35
36impl Default for InMemoryStore {
37    fn default() -> Self {
38        Self::new()
39    }
40}
41
42#[async_trait]
43impl CacheStore for InMemoryStore {
44    async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
45        Ok(self.data.get(id).map(|r| r.value().clone()))
46    }
47
48    async fn put(&self, item: &SyncItem) -> Result<(), StorageError> {
49        self.data.insert(item.object_id.clone(), item.clone());
50        Ok(())
51    }
52
53    async fn delete(&self, id: &str) -> Result<(), StorageError> {
54        self.data.remove(id);
55        Ok(())
56    }
57
58    async fn exists(&self, id: &str) -> Result<bool, StorageError> {
59        Ok(self.data.contains_key(id))
60    }
61}
62
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Builds a `SyncItem` whose id is `id` and whose JSON payload echoes it.
    fn test_item(id: &str) -> SyncItem {
        let payload = json!({"test": "data", "id": id});
        SyncItem::from_json(id.to_string(), payload)
    }

    /// A freshly constructed store reports itself as empty.
    #[tokio::test]
    async fn test_new_store_is_empty() {
        let store = InMemoryStore::new();

        assert!(store.is_empty());
        assert_eq!(store.len(), 0);
    }

    /// An item written with `put` can be read back with `get`.
    #[tokio::test]
    async fn test_put_and_get() {
        let store = InMemoryStore::new();
        let item = test_item("item-1");

        store.put(&item).await.unwrap();

        let fetched = store.get("item-1").await.unwrap();
        assert!(fetched.is_some());
        assert_eq!(fetched.unwrap().object_id, "item-1");
    }

    /// Reading an unknown key yields `None` rather than an error.
    #[tokio::test]
    async fn test_get_nonexistent_returns_none() {
        let store = InMemoryStore::new();

        let fetched = store.get("nonexistent").await.unwrap();
        assert!(fetched.is_none());
    }

    /// Deleting a stored item removes it from the store.
    #[tokio::test]
    async fn test_delete() {
        let store = InMemoryStore::new();
        let item = test_item("to-delete");

        store.put(&item).await.unwrap();
        assert_eq!(store.len(), 1);

        store.delete("to-delete").await.unwrap();
        assert_eq!(store.len(), 0);

        let fetched = store.get("to-delete").await.unwrap();
        assert!(fetched.is_none());
    }

    /// Deleting an unknown key succeeds.
    #[tokio::test]
    async fn test_delete_nonexistent_is_ok() {
        let store = InMemoryStore::new();

        // Should not error
        let outcome = store.delete("nonexistent").await;
        assert!(outcome.is_ok());
    }

    /// A second `put` under the same id replaces the first item.
    #[tokio::test]
    async fn test_put_overwrites() {
        let store = InMemoryStore::new();

        let first = SyncItem::from_json("same-id".to_string(), json!({"version": 1}));
        let second = SyncItem::from_json("same-id".to_string(), json!({"version": 2}));

        store.put(&first).await.unwrap();
        store.put(&second).await.unwrap();

        assert_eq!(store.len(), 1);

        let stored = store.get("same-id").await.unwrap().unwrap();
        let content = stored.content_as_json().unwrap();
        assert_eq!(content["version"], 2);
    }

    /// `clear` empties a populated store.
    #[tokio::test]
    async fn test_clear() {
        let store = InMemoryStore::new();

        for n in 0..10 {
            let item = test_item(&format!("item-{}", n));
            store.put(&item).await.unwrap();
        }

        assert_eq!(store.len(), 10);

        store.clear();

        assert!(store.is_empty());
    }

    /// `Default::default` produces an empty store.
    #[tokio::test]
    async fn test_default_trait() {
        let store = InMemoryStore::default();

        assert!(store.is_empty());
    }

    /// The trait's `put_batch` writes every item of the batch into the store.
    #[tokio::test]
    async fn test_put_batch_via_trait() {
        let store = InMemoryStore::new();

        let mut items: Vec<SyncItem> = Vec::with_capacity(5);
        for n in 0..5 {
            items.push(test_item(&format!("batch-{}", n)));
        }

        // Use trait default implementation
        let outcome = store.put_batch(&items).await.unwrap();

        assert_eq!(outcome.written, 5);
        assert!(outcome.verified);
        assert_eq!(store.len(), 5);
    }

    /// Ten spawned tasks each insert ten distinct items; all must be present.
    ///
    /// NOTE(review): `#[tokio::test]` defaults to a current-thread runtime, so
    /// these tasks likely do not run in parallel — confirm whether
    /// `flavor = "multi_thread"` is wanted here.
    #[tokio::test]
    async fn test_concurrent_access() {
        use std::sync::Arc;

        let store = Arc::new(InMemoryStore::new());

        // Spawn 10 tasks that each insert 10 items
        let handles: Vec<_> = (0..10)
            .map(|batch| {
                let shared = Arc::clone(&store);
                tokio::spawn(async move {
                    for i in 0..10 {
                        let item = test_item(&format!("batch-{}-item-{}", batch, i));
                        shared.put(&item).await.unwrap();
                    }
                })
            })
            .collect();

        // Wait for all tasks
        for handle in handles {
            handle.await.unwrap();
        }

        // Should have all 100 items
        assert_eq!(store.len(), 100);
    }
}