sync_engine/storage/
memory.rs1use async_trait::async_trait;
2use dashmap::DashMap;
3use crate::sync_item::SyncItem;
4use super::traits::{CacheStore, StorageError};
5
6pub struct InMemoryStore {
7 data: DashMap<String, SyncItem>,
8}
9
10impl InMemoryStore {
11 #[must_use]
12 pub fn new() -> Self {
13 Self {
14 data: DashMap::new(),
15 }
16 }
17
18 #[must_use]
20 pub fn len(&self) -> usize {
21 self.data.len()
22 }
23
24 #[must_use]
26 pub fn is_empty(&self) -> bool {
27 self.data.is_empty()
28 }
29
30 pub fn clear(&self) {
32 self.data.clear();
33 }
34}
35
36impl Default for InMemoryStore {
37 fn default() -> Self {
38 Self::new()
39 }
40}
41
42#[async_trait]
43impl CacheStore for InMemoryStore {
44 async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
45 Ok(self.data.get(id).map(|r| r.value().clone()))
46 }
47
48 async fn put(&self, item: &SyncItem) -> Result<(), StorageError> {
49 self.data.insert(item.object_id.clone(), item.clone());
50 Ok(())
51 }
52
53 async fn delete(&self, id: &str) -> Result<(), StorageError> {
54 self.data.remove(id);
55 Ok(())
56 }
57
58 async fn exists(&self, id: &str) -> Result<bool, StorageError> {
59 Ok(self.data.contains_key(id))
60 }
61}
62
63#[cfg(test)]
64mod tests {
65 use super::*;
66 use serde_json::json;
67
68 fn test_item(id: &str) -> SyncItem {
69 SyncItem::from_json(id.to_string(), json!({"test": "data", "id": id}))
70 }
71
72 #[tokio::test]
73 async fn test_new_store_is_empty() {
74 let store = InMemoryStore::new();
75 assert!(store.is_empty());
76 assert_eq!(store.len(), 0);
77 }
78
79 #[tokio::test]
80 async fn test_put_and_get() {
81 let store = InMemoryStore::new();
82 let item = test_item("item-1");
83
84 store.put(&item).await.unwrap();
85
86 let result = store.get("item-1").await.unwrap();
87 assert!(result.is_some());
88 assert_eq!(result.unwrap().object_id, "item-1");
89 }
90
91 #[tokio::test]
92 async fn test_get_nonexistent_returns_none() {
93 let store = InMemoryStore::new();
94
95 let result = store.get("nonexistent").await.unwrap();
96 assert!(result.is_none());
97 }
98
99 #[tokio::test]
100 async fn test_delete() {
101 let store = InMemoryStore::new();
102 let item = test_item("to-delete");
103
104 store.put(&item).await.unwrap();
105 assert_eq!(store.len(), 1);
106
107 store.delete("to-delete").await.unwrap();
108 assert_eq!(store.len(), 0);
109
110 let result = store.get("to-delete").await.unwrap();
111 assert!(result.is_none());
112 }
113
114 #[tokio::test]
115 async fn test_delete_nonexistent_is_ok() {
116 let store = InMemoryStore::new();
117
118 let result = store.delete("nonexistent").await;
120 assert!(result.is_ok());
121 }
122
123 #[tokio::test]
124 async fn test_put_overwrites() {
125 let store = InMemoryStore::new();
126
127 let item1 = SyncItem::from_json("same-id".to_string(), json!({"version": 1}));
128 let item2 = SyncItem::from_json("same-id".to_string(), json!({"version": 2}));
129
130 store.put(&item1).await.unwrap();
131 store.put(&item2).await.unwrap();
132
133 assert_eq!(store.len(), 1);
134
135 let result = store.get("same-id").await.unwrap().unwrap();
136 let content = result.content_as_json().unwrap();
137 assert_eq!(content["version"], 2);
138 }
139
140 #[tokio::test]
141 async fn test_clear() {
142 let store = InMemoryStore::new();
143
144 for i in 0..10 {
145 store.put(&test_item(&format!("item-{}", i))).await.unwrap();
146 }
147
148 assert_eq!(store.len(), 10);
149
150 store.clear();
151
152 assert!(store.is_empty());
153 }
154
155 #[tokio::test]
156 async fn test_default_trait() {
157 let store = InMemoryStore::default();
158 assert!(store.is_empty());
159 }
160
161 #[tokio::test]
162 async fn test_put_batch_via_trait() {
163 let store = InMemoryStore::new();
164
165 let items: Vec<SyncItem> = (0..5)
166 .map(|i| test_item(&format!("batch-{}", i)))
167 .collect();
168
169 let result = store.put_batch(&items).await.unwrap();
171
172 assert_eq!(result.written, 5);
173 assert!(result.verified);
174 assert_eq!(store.len(), 5);
175 }
176
177 #[tokio::test]
178 async fn test_concurrent_access() {
179 use std::sync::Arc;
180
181 let store = Arc::new(InMemoryStore::new());
182 let mut handles = vec![];
183
184 for batch in 0..10 {
186 let store_clone = store.clone();
187 let handle = tokio::spawn(async move {
188 for i in 0..10 {
189 let item = test_item(&format!("batch-{}-item-{}", batch, i));
190 store_clone.put(&item).await.unwrap();
191 }
192 });
193 handles.push(handle);
194 }
195
196 for handle in handles {
198 handle.await.unwrap();
199 }
200
201 assert_eq!(store.len(), 100);
203 }
204}