sync_engine/storage/
memory.rs1use async_trait::async_trait;
5use dashmap::DashMap;
6use crate::sync_item::SyncItem;
7use super::traits::{CacheStore, StorageError};
8
/// In-memory store that keeps [`SyncItem`]s in a [`DashMap`].
///
/// The store implements [`CacheStore`]; every operation works directly on the
/// map and nothing is persisted outside of process memory.
pub struct InMemoryStore {
    /// Stored items, keyed by the `object_id` of the item passed to `put`.
    data: DashMap<String, SyncItem>,
}
12
13impl InMemoryStore {
14 #[must_use]
15 pub fn new() -> Self {
16 Self {
17 data: DashMap::new(),
18 }
19 }
20
21 #[must_use]
23 pub fn len(&self) -> usize {
24 self.data.len()
25 }
26
27 #[must_use]
29 pub fn is_empty(&self) -> bool {
30 self.data.is_empty()
31 }
32
33 pub fn clear(&self) {
35 self.data.clear();
36 }
37}
38
39impl Default for InMemoryStore {
40 fn default() -> Self {
41 Self::new()
42 }
43}
44
45#[async_trait]
46impl CacheStore for InMemoryStore {
47 async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
48 Ok(self.data.get(id).map(|r| r.value().clone()))
49 }
50
51 async fn put(&self, item: &SyncItem) -> Result<(), StorageError> {
52 self.data.insert(item.object_id.clone(), item.clone());
53 Ok(())
54 }
55
56 async fn delete(&self, id: &str) -> Result<(), StorageError> {
57 self.data.remove(id);
58 Ok(())
59 }
60
61 async fn exists(&self, id: &str) -> Result<bool, StorageError> {
62 Ok(self.data.contains_key(id))
63 }
64}
65
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Builds an item whose JSON payload echoes the given id.
    fn test_item(id: &str) -> SyncItem {
        let payload = json!({"test": "data", "id": id});
        SyncItem::from_json(id.to_string(), payload)
    }

    /// A freshly constructed store reports zero items.
    #[tokio::test]
    async fn test_new_store_is_empty() {
        let fresh = InMemoryStore::new();
        assert!(fresh.is_empty());
        assert_eq!(fresh.len(), 0);
    }

    /// An item written with `put` can be read back with `get`.
    #[tokio::test]
    async fn test_put_and_get() {
        let store = InMemoryStore::new();
        let original = test_item("item-1");
        store.put(&original).await.unwrap();

        let fetched = store.get("item-1").await.unwrap();
        assert!(fetched.is_some());

        let fetched_item = fetched.unwrap();
        assert_eq!(fetched_item.object_id, "item-1");
    }

    /// Looking up an unknown id yields `None` rather than an error.
    #[tokio::test]
    async fn test_get_nonexistent_returns_none() {
        let store = InMemoryStore::new();
        let lookup = store.get("nonexistent").await.unwrap();
        assert!(lookup.is_none());
    }

    /// Deleting a stored item removes it from the store.
    #[tokio::test]
    async fn test_delete() {
        let store = InMemoryStore::new();
        let doomed = test_item("to-delete");

        store.put(&doomed).await.unwrap();
        assert_eq!(store.len(), 1);

        store.delete("to-delete").await.unwrap();
        assert_eq!(store.len(), 0);

        let after_delete = store.get("to-delete").await.unwrap();
        assert!(after_delete.is_none());
    }

    /// Deleting an id that was never stored still succeeds.
    #[tokio::test]
    async fn test_delete_nonexistent_is_ok() {
        let store = InMemoryStore::new();
        let outcome = store.delete("nonexistent").await;
        assert!(outcome.is_ok());
    }

    /// A second `put` with the same id replaces the first item.
    #[tokio::test]
    async fn test_put_overwrites() {
        let store = InMemoryStore::new();

        let first = SyncItem::from_json("same-id".to_string(), json!({"version": 1}));
        let second = SyncItem::from_json("same-id".to_string(), json!({"version": 2}));

        // Write both versions in order; only the later one should survive.
        for item in [&first, &second] {
            store.put(item).await.unwrap();
        }

        assert_eq!(store.len(), 1);

        let stored = store.get("same-id").await.unwrap().unwrap();
        let content = stored.content_as_json().unwrap();
        assert_eq!(content["version"], 2);
    }

    /// `clear` empties a populated store.
    #[tokio::test]
    async fn test_clear() {
        let store = InMemoryStore::new();

        for n in 0..10 {
            let id = format!("item-{}", n);
            let item = test_item(&id);
            store.put(&item).await.unwrap();
        }

        assert_eq!(store.len(), 10);

        store.clear();

        assert!(store.is_empty());
    }

    /// `Default::default` produces an empty store.
    #[tokio::test]
    async fn test_default_trait() {
        let store = InMemoryStore::default();
        assert!(store.is_empty());
    }

    /// `put_batch`, as provided through the `CacheStore` trait, writes every item.
    #[tokio::test]
    async fn test_put_batch_via_trait() {
        let store = InMemoryStore::new();

        let mut items: Vec<SyncItem> = Vec::with_capacity(5);
        for n in 0..5 {
            items.push(test_item(&format!("batch-{}", n)));
        }

        let outcome = store.put_batch(&items).await.unwrap();

        assert_eq!(outcome.written, 5);
        assert!(outcome.verified);
        assert_eq!(store.len(), 5);
    }

    /// Ten tasks each writing ten distinct items leave one hundred items stored.
    #[tokio::test]
    async fn test_concurrent_access() {
        use std::sync::Arc;

        let store = Arc::new(InMemoryStore::new());

        // Spawn every writer up front, then wait for all of them.
        let workers: Vec<_> = (0..10)
            .map(|batch| {
                let shared = Arc::clone(&store);
                tokio::spawn(async move {
                    for i in 0..10 {
                        let item = test_item(&format!("batch-{}-item-{}", batch, i));
                        shared.put(&item).await.unwrap();
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.await.unwrap();
        }

        assert_eq!(store.len(), 100);
    }
}