1use crate::error::{AmateRSError, ErrorContext, Result};
7use crate::storage::secondary_index::{IndexConfig, IndexExtractor, IndexManager, IndexedField};
8use crate::traits::StorageEngine;
9use crate::types::{CipherBlob, Key};
10use async_trait::async_trait;
11use dashmap::DashMap;
12use std::sync::Arc;
13use tokio::sync::Mutex;
14
15#[derive(Clone)]
17pub struct MemoryStorage {
18 data: Arc<DashMap<Key, CipherBlob>>,
19 index_manager: Option<Arc<IndexManager>>,
21 index_extractor: Option<Arc<dyn IndexExtractor>>,
23 index_write_lock: Arc<Mutex<()>>,
25}
26
27impl std::fmt::Debug for MemoryStorage {
28 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
29 f.debug_struct("MemoryStorage")
30 .field("len", &self.data.len())
31 .field("has_index_manager", &self.index_manager.is_some())
32 .field("has_index_extractor", &self.index_extractor.is_some())
33 .finish()
34 }
35}
36
37impl MemoryStorage {
38 pub fn new() -> Self {
40 Self {
41 data: Arc::new(DashMap::new()),
42 index_manager: None,
43 index_extractor: None,
44 index_write_lock: Arc::new(Mutex::new(())),
45 }
46 }
47
48 pub fn with_index_manager(mut self, manager: Arc<IndexManager>) -> Self {
50 self.index_manager = Some(manager);
51 self
52 }
53
54 pub fn with_index_extractor(mut self, extractor: Arc<dyn IndexExtractor>) -> Self {
56 self.index_extractor = Some(extractor);
57 self
58 }
59
60 pub fn register_index(&self, config: IndexConfig) -> Result<()> {
64 self.index_manager
65 .as_ref()
66 .ok_or_else(|| {
67 AmateRSError::ValidationError(ErrorContext::new(
68 "No index manager attached; call with_index_manager() first",
69 ))
70 })
71 .and_then(|m| m.create_index(config))
72 }
73
74 pub fn index_manager(&self) -> Option<&Arc<IndexManager>> {
76 self.index_manager.as_ref()
77 }
78
79 fn validate_unique_constraints_mem(
83 mgr: &IndexManager,
84 key: &Key,
85 new_fields: &[IndexedField],
86 ) -> Result<()> {
87 mgr.check_unique_for_fields(key, new_fields)
88 }
89
90 pub fn len(&self) -> usize {
92 self.data.len()
93 }
94
95 pub fn is_empty(&self) -> bool {
97 self.data.is_empty()
98 }
99
100 pub fn clear(&self) {
102 self.data.clear();
103 }
104}
105
106impl Default for MemoryStorage {
107 fn default() -> Self {
108 Self::new()
109 }
110}
111
112#[async_trait]
113impl StorageEngine for MemoryStorage {
114 async fn put(&self, key: &Key, value: &CipherBlob) -> Result<()> {
115 value.verify_integrity()?;
117
118 if let (Some(mgr), Some(ext)) = (&self.index_manager, &self.index_extractor) {
119 let _guard = self.index_write_lock.lock().await;
121
122 let old_fields = match self.data.get(key) {
123 Some(old_blob) => ext.extract(key, old_blob.value()),
124 None => Vec::new(),
125 };
126 let new_fields = ext.extract(key, value);
127
128 Self::validate_unique_constraints_mem(mgr, key, &new_fields)?;
130
131 self.data.insert(key.clone(), value.clone());
133
134 mgr.apply_extracted(key, &old_fields, &new_fields)?;
136 } else {
137 self.data.insert(key.clone(), value.clone());
138 }
139
140 Ok(())
141 }
142
143 async fn get(&self, key: &Key) -> Result<Option<CipherBlob>> {
144 Ok(self.data.get(key).map(|v| v.clone()))
145 }
146
147 async fn atomic_update<F>(&self, key: &Key, f: F) -> Result<()>
148 where
149 F: Fn(&CipherBlob) -> Result<CipherBlob> + Send + Sync,
150 {
151 if let (Some(mgr), Some(ext)) = (&self.index_manager, &self.index_extractor) {
152 let _guard = self.index_write_lock.lock().await;
153
154 let old_value = self
155 .data
156 .get(key)
157 .map(|v| v.clone())
158 .unwrap_or_else(|| CipherBlob::new(Vec::new()));
159
160 let new_value = f(&old_value)?;
161 new_value.verify_integrity()?;
162
163 let old_fields = ext.extract(key, &old_value);
164 let new_fields = ext.extract(key, &new_value);
165 Self::validate_unique_constraints_mem(mgr, key, &new_fields)?;
166
167 self.data.insert(key.clone(), new_value);
168 mgr.apply_extracted(key, &old_fields, &new_fields)?;
169 } else {
170 let mut entry = self
172 .data
173 .entry(key.clone())
174 .or_insert_with(|| CipherBlob::new(Vec::new()));
175
176 let old_value = entry.value().clone();
177 let new_value = f(&old_value)?;
178 new_value.verify_integrity()?;
179 *entry = new_value;
180 }
181
182 Ok(())
183 }
184
185 async fn delete(&self, key: &Key) -> Result<()> {
186 if let (Some(mgr), Some(ext)) = (&self.index_manager, &self.index_extractor) {
187 let _guard = self.index_write_lock.lock().await;
188
189 let old_fields = match self.data.get(key) {
190 Some(old_blob) => ext.extract(key, old_blob.value()),
191 None => Vec::new(),
192 };
193
194 self.data.remove(key);
195 mgr.apply_extracted(key, &old_fields, &[])?;
196 } else {
197 self.data.remove(key);
198 }
199
200 Ok(())
201 }
202
203 async fn range(&self, start: &Key, end: &Key) -> Result<Vec<(Key, CipherBlob)>> {
204 let mut results: Vec<_> = self
205 .data
206 .iter()
207 .filter(|entry| entry.key() >= start && entry.key() < end)
208 .map(|entry| (entry.key().clone(), entry.value().clone()))
209 .collect();
210
211 results.sort_by(|a, b| a.0.cmp(&b.0));
212 Ok(results)
213 }
214
215 async fn keys(&self) -> Result<Vec<Key>> {
216 let mut keys: Vec<_> = self.data.iter().map(|entry| entry.key().clone()).collect();
217 keys.sort();
218 Ok(keys)
219 }
220
221 async fn flush(&self) -> Result<()> {
222 Ok(())
224 }
225
226 async fn close(&self) -> Result<()> {
227 Ok(())
229 }
230}
231
232#[cfg(test)]
233mod tests {
234 use super::*;
235
236 #[tokio::test]
237 async fn test_memory_storage_basic() -> Result<()> {
238 let storage = MemoryStorage::new();
239 let key = Key::from_str("test_key");
240 let value = CipherBlob::new(vec![1, 2, 3, 4, 5]);
241
242 storage.put(&key, &value).await?;
244
245 let retrieved = storage.get(&key).await?;
247 assert_eq!(retrieved, Some(value.clone()));
248
249 storage.delete(&key).await?;
251 let retrieved = storage.get(&key).await?;
252 assert_eq!(retrieved, None);
253
254 Ok(())
255 }
256
257 #[tokio::test]
258 async fn test_memory_storage_range() -> Result<()> {
259 let storage = MemoryStorage::new();
260
261 for i in 0..10 {
263 let key = Key::from_str(&format!("key_{:03}", i));
264 let value = CipherBlob::new(vec![i as u8]);
265 storage.put(&key, &value).await?;
266 }
267
268 let start = Key::from_str("key_003");
270 let end = Key::from_str("key_007");
271 let results = storage.range(&start, &end).await?;
272
273 assert_eq!(results.len(), 4); assert_eq!(results[0].0, Key::from_str("key_003"));
275 assert_eq!(results[3].0, Key::from_str("key_006"));
276
277 Ok(())
278 }
279
280 #[tokio::test]
281 async fn test_memory_storage_atomic_update() -> Result<()> {
282 let storage = MemoryStorage::new();
283 let key = Key::from_str("counter");
284 let initial = CipherBlob::new(vec![0]);
285
286 storage.put(&key, &initial).await?;
287
288 storage
290 .atomic_update(&key, |old| {
291 let mut data = old.to_vec();
292 if !data.is_empty() {
293 data[0] += 1;
294 }
295 Ok(CipherBlob::new(data))
296 })
297 .await?;
298
299 let result = storage.get(&key).await?;
300 assert_eq!(result.expect("Value should exist").as_bytes()[0], 1);
301
302 Ok(())
303 }
304
305 #[derive(Debug)]
310 struct MemTestExtractor;
311
312 impl IndexExtractor for MemTestExtractor {
313 fn extract(&self, _key: &Key, value: &CipherBlob) -> Vec<IndexedField> {
314 vec![IndexedField {
315 collection: "mem_col".to_string(),
316 field_name: "payload".to_string(),
317 value: value.as_bytes().to_vec(),
318 }]
319 }
320 }
321
322 fn make_indexed_memory_storage() -> Result<MemoryStorage> {
323 let mgr = Arc::new(IndexManager::new());
324 mgr.create_index(IndexConfig {
325 name: "idx_mem_col_payload".to_string(),
326 collection: "mem_col".to_string(),
327 field_name: "payload".to_string(),
328 index_type: crate::storage::secondary_index::IndexType::BTree,
329 unique: false,
330 })?;
331
332 let storage = MemoryStorage::new()
333 .with_index_manager(mgr)
334 .with_index_extractor(Arc::new(MemTestExtractor));
335
336 Ok(storage)
337 }
338
339 fn mem_lookup_count(storage: &MemoryStorage, value: &[u8]) -> usize {
340 storage
341 .index_manager()
342 .and_then(|m| m.with_index("idx_mem_col_payload", |idx| idx.lookup(value).len()))
343 .unwrap_or(0)
344 }
345
346 #[tokio::test]
347 async fn test_memory_auto_index_on_put() -> Result<()> {
348 let storage = make_indexed_memory_storage()?;
349
350 let key = Key::from_str("mem_rec_1");
351 storage
352 .put(&key, &CipherBlob::new(b"charlie".to_vec()))
353 .await?;
354
355 assert_eq!(
356 mem_lookup_count(&storage, b"charlie"),
357 1,
358 "index should contain one entry after put"
359 );
360 Ok(())
361 }
362
363 #[tokio::test]
364 async fn test_memory_auto_index_on_delete() -> Result<()> {
365 let storage = make_indexed_memory_storage()?;
366
367 let key = Key::from_str("mem_rec_2");
368 storage
369 .put(&key, &CipherBlob::new(b"dave".to_vec()))
370 .await?;
371 assert_eq!(mem_lookup_count(&storage, b"dave"), 1);
372
373 storage.delete(&key).await?;
374
375 assert_eq!(
376 mem_lookup_count(&storage, b"dave"),
377 0,
378 "index entry should be removed after delete"
379 );
380 Ok(())
381 }
382
383 #[tokio::test]
384 async fn test_memory_auto_index_on_overwrite() -> Result<()> {
385 let storage = make_indexed_memory_storage()?;
386
387 let key = Key::from_str("mem_rec_3");
388 storage.put(&key, &CipherBlob::new(b"eve".to_vec())).await?;
389 assert_eq!(mem_lookup_count(&storage, b"eve"), 1);
390
391 storage
392 .put(&key, &CipherBlob::new(b"frank".to_vec()))
393 .await?;
394
395 assert_eq!(
396 mem_lookup_count(&storage, b"eve"),
397 0,
398 "old value entry should be gone after overwrite"
399 );
400 assert_eq!(
401 mem_lookup_count(&storage, b"frank"),
402 1,
403 "new value entry should be present after overwrite"
404 );
405 Ok(())
406 }
407}