Skip to main content

amaters_core/storage/
memory.rs

1//! In-memory storage implementation for MVP
2//!
3//! This is a simple in-memory storage engine for testing and development.
4//! Not suitable for production use (no persistence).
5
6use crate::error::{AmateRSError, ErrorContext, Result};
7use crate::storage::secondary_index::{IndexConfig, IndexExtractor, IndexManager, IndexedField};
8use crate::traits::StorageEngine;
9use crate::types::{CipherBlob, Key};
10use async_trait::async_trait;
11use dashmap::DashMap;
12use std::sync::Arc;
13use tokio::sync::Mutex;
14
15/// In-memory storage engine backed by DashMap
16#[derive(Clone)]
17pub struct MemoryStorage {
18    data: Arc<DashMap<Key, CipherBlob>>,
19    /// Optional secondary index manager for automatic index maintenance.
20    index_manager: Option<Arc<IndexManager>>,
21    /// Optional extractor used to derive indexed fields from stored records.
22    index_extractor: Option<Arc<dyn IndexExtractor>>,
23    /// Serialises the read-modify-write portion of put/delete when indexing is enabled.
24    index_write_lock: Arc<Mutex<()>>,
25}
26
27impl std::fmt::Debug for MemoryStorage {
28    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
29        f.debug_struct("MemoryStorage")
30            .field("len", &self.data.len())
31            .field("has_index_manager", &self.index_manager.is_some())
32            .field("has_index_extractor", &self.index_extractor.is_some())
33            .finish()
34    }
35}
36
37impl MemoryStorage {
38    /// Create a new in-memory storage
39    pub fn new() -> Self {
40        Self {
41            data: Arc::new(DashMap::new()),
42            index_manager: None,
43            index_extractor: None,
44            index_write_lock: Arc::new(Mutex::new(())),
45        }
46    }
47
48    /// Attach a secondary index manager for automatic index maintenance.
49    pub fn with_index_manager(mut self, manager: Arc<IndexManager>) -> Self {
50        self.index_manager = Some(manager);
51        self
52    }
53
54    /// Attach an index extractor for automatic index maintenance.
55    pub fn with_index_extractor(mut self, extractor: Arc<dyn IndexExtractor>) -> Self {
56        self.index_extractor = Some(extractor);
57        self
58    }
59
60    /// Register a secondary index definition.
61    ///
62    /// Requires an attached index manager (see [`Self::with_index_manager`]).
63    pub fn register_index(&self, config: IndexConfig) -> Result<()> {
64        self.index_manager
65            .as_ref()
66            .ok_or_else(|| {
67                AmateRSError::ValidationError(ErrorContext::new(
68                    "No index manager attached; call with_index_manager() first",
69                ))
70            })
71            .and_then(|m| m.create_index(config))
72    }
73
74    /// Access the attached index manager for queries.
75    pub fn index_manager(&self) -> Option<&Arc<IndexManager>> {
76        self.index_manager.as_ref()
77    }
78
79    /// Validate unique constraints before writing `new_fields` for `key`.
80    ///
81    /// Delegates to [`IndexManager::check_unique_for_fields`].
82    fn validate_unique_constraints_mem(
83        mgr: &IndexManager,
84        key: &Key,
85        new_fields: &[IndexedField],
86    ) -> Result<()> {
87        mgr.check_unique_for_fields(key, new_fields)
88    }
89
90    /// Get the number of entries
91    pub fn len(&self) -> usize {
92        self.data.len()
93    }
94
95    /// Check if empty
96    pub fn is_empty(&self) -> bool {
97        self.data.is_empty()
98    }
99
100    /// Clear all data
101    pub fn clear(&self) {
102        self.data.clear();
103    }
104}
105
106impl Default for MemoryStorage {
107    fn default() -> Self {
108        Self::new()
109    }
110}
111
112#[async_trait]
113impl StorageEngine for MemoryStorage {
114    async fn put(&self, key: &Key, value: &CipherBlob) -> Result<()> {
115        // Integrity check always happens first.
116        value.verify_integrity()?;
117
118        if let (Some(mgr), Some(ext)) = (&self.index_manager, &self.index_extractor) {
119            // Serialise the read-old / write / update-index sequence.
120            let _guard = self.index_write_lock.lock().await;
121
122            let old_fields = match self.data.get(key) {
123                Some(old_blob) => ext.extract(key, old_blob.value()),
124                None => Vec::new(),
125            };
126            let new_fields = ext.extract(key, value);
127
128            // Unique constraint pre-flight check.
129            Self::validate_unique_constraints_mem(mgr, key, &new_fields)?;
130
131            // Write.
132            self.data.insert(key.clone(), value.clone());
133
134            // Update indexes.
135            mgr.apply_extracted(key, &old_fields, &new_fields)?;
136        } else {
137            self.data.insert(key.clone(), value.clone());
138        }
139
140        Ok(())
141    }
142
143    async fn get(&self, key: &Key) -> Result<Option<CipherBlob>> {
144        Ok(self.data.get(key).map(|v| v.clone()))
145    }
146
147    async fn atomic_update<F>(&self, key: &Key, f: F) -> Result<()>
148    where
149        F: Fn(&CipherBlob) -> Result<CipherBlob> + Send + Sync,
150    {
151        if let (Some(mgr), Some(ext)) = (&self.index_manager, &self.index_extractor) {
152            let _guard = self.index_write_lock.lock().await;
153
154            let old_value = self
155                .data
156                .get(key)
157                .map(|v| v.clone())
158                .unwrap_or_else(|| CipherBlob::new(Vec::new()));
159
160            let new_value = f(&old_value)?;
161            new_value.verify_integrity()?;
162
163            let old_fields = ext.extract(key, &old_value);
164            let new_fields = ext.extract(key, &new_value);
165            Self::validate_unique_constraints_mem(mgr, key, &new_fields)?;
166
167            self.data.insert(key.clone(), new_value);
168            mgr.apply_extracted(key, &old_fields, &new_fields)?;
169        } else {
170            // DashMap provides interior mutability for the simple case.
171            let mut entry = self
172                .data
173                .entry(key.clone())
174                .or_insert_with(|| CipherBlob::new(Vec::new()));
175
176            let old_value = entry.value().clone();
177            let new_value = f(&old_value)?;
178            new_value.verify_integrity()?;
179            *entry = new_value;
180        }
181
182        Ok(())
183    }
184
185    async fn delete(&self, key: &Key) -> Result<()> {
186        if let (Some(mgr), Some(ext)) = (&self.index_manager, &self.index_extractor) {
187            let _guard = self.index_write_lock.lock().await;
188
189            let old_fields = match self.data.get(key) {
190                Some(old_blob) => ext.extract(key, old_blob.value()),
191                None => Vec::new(),
192            };
193
194            self.data.remove(key);
195            mgr.apply_extracted(key, &old_fields, &[])?;
196        } else {
197            self.data.remove(key);
198        }
199
200        Ok(())
201    }
202
203    async fn range(&self, start: &Key, end: &Key) -> Result<Vec<(Key, CipherBlob)>> {
204        let mut results: Vec<_> = self
205            .data
206            .iter()
207            .filter(|entry| entry.key() >= start && entry.key() < end)
208            .map(|entry| (entry.key().clone(), entry.value().clone()))
209            .collect();
210
211        results.sort_by(|a, b| a.0.cmp(&b.0));
212        Ok(results)
213    }
214
215    async fn keys(&self) -> Result<Vec<Key>> {
216        let mut keys: Vec<_> = self.data.iter().map(|entry| entry.key().clone()).collect();
217        keys.sort();
218        Ok(keys)
219    }
220
221    async fn flush(&self) -> Result<()> {
222        // No-op for in-memory storage
223        Ok(())
224    }
225
226    async fn close(&self) -> Result<()> {
227        // No-op for in-memory storage
228        Ok(())
229    }
230}
231
232#[cfg(test)]
233mod tests {
234    use super::*;
235
236    #[tokio::test]
237    async fn test_memory_storage_basic() -> Result<()> {
238        let storage = MemoryStorage::new();
239        let key = Key::from_str("test_key");
240        let value = CipherBlob::new(vec![1, 2, 3, 4, 5]);
241
242        // Put
243        storage.put(&key, &value).await?;
244
245        // Get
246        let retrieved = storage.get(&key).await?;
247        assert_eq!(retrieved, Some(value.clone()));
248
249        // Delete
250        storage.delete(&key).await?;
251        let retrieved = storage.get(&key).await?;
252        assert_eq!(retrieved, None);
253
254        Ok(())
255    }
256
257    #[tokio::test]
258    async fn test_memory_storage_range() -> Result<()> {
259        let storage = MemoryStorage::new();
260
261        // Insert keys
262        for i in 0..10 {
263            let key = Key::from_str(&format!("key_{:03}", i));
264            let value = CipherBlob::new(vec![i as u8]);
265            storage.put(&key, &value).await?;
266        }
267
268        // Range scan
269        let start = Key::from_str("key_003");
270        let end = Key::from_str("key_007");
271        let results = storage.range(&start, &end).await?;
272
273        assert_eq!(results.len(), 4); // 3, 4, 5, 6
274        assert_eq!(results[0].0, Key::from_str("key_003"));
275        assert_eq!(results[3].0, Key::from_str("key_006"));
276
277        Ok(())
278    }
279
280    #[tokio::test]
281    async fn test_memory_storage_atomic_update() -> Result<()> {
282        let storage = MemoryStorage::new();
283        let key = Key::from_str("counter");
284        let initial = CipherBlob::new(vec![0]);
285
286        storage.put(&key, &initial).await?;
287
288        // Atomic increment
289        storage
290            .atomic_update(&key, |old| {
291                let mut data = old.to_vec();
292                if !data.is_empty() {
293                    data[0] += 1;
294                }
295                Ok(CipherBlob::new(data))
296            })
297            .await?;
298
299        let result = storage.get(&key).await?;
300        assert_eq!(result.expect("Value should exist").as_bytes()[0], 1);
301
302        Ok(())
303    }
304
305    // -------------------------------------------------------------------------
306    // Index integration tests
307    // -------------------------------------------------------------------------
308
309    #[derive(Debug)]
310    struct MemTestExtractor;
311
312    impl IndexExtractor for MemTestExtractor {
313        fn extract(&self, _key: &Key, value: &CipherBlob) -> Vec<IndexedField> {
314            vec![IndexedField {
315                collection: "mem_col".to_string(),
316                field_name: "payload".to_string(),
317                value: value.as_bytes().to_vec(),
318            }]
319        }
320    }
321
322    fn make_indexed_memory_storage() -> Result<MemoryStorage> {
323        let mgr = Arc::new(IndexManager::new());
324        mgr.create_index(IndexConfig {
325            name: "idx_mem_col_payload".to_string(),
326            collection: "mem_col".to_string(),
327            field_name: "payload".to_string(),
328            index_type: crate::storage::secondary_index::IndexType::BTree,
329            unique: false,
330        })?;
331
332        let storage = MemoryStorage::new()
333            .with_index_manager(mgr)
334            .with_index_extractor(Arc::new(MemTestExtractor));
335
336        Ok(storage)
337    }
338
339    fn mem_lookup_count(storage: &MemoryStorage, value: &[u8]) -> usize {
340        storage
341            .index_manager()
342            .and_then(|m| m.with_index("idx_mem_col_payload", |idx| idx.lookup(value).len()))
343            .unwrap_or(0)
344    }
345
346    #[tokio::test]
347    async fn test_memory_auto_index_on_put() -> Result<()> {
348        let storage = make_indexed_memory_storage()?;
349
350        let key = Key::from_str("mem_rec_1");
351        storage
352            .put(&key, &CipherBlob::new(b"charlie".to_vec()))
353            .await?;
354
355        assert_eq!(
356            mem_lookup_count(&storage, b"charlie"),
357            1,
358            "index should contain one entry after put"
359        );
360        Ok(())
361    }
362
363    #[tokio::test]
364    async fn test_memory_auto_index_on_delete() -> Result<()> {
365        let storage = make_indexed_memory_storage()?;
366
367        let key = Key::from_str("mem_rec_2");
368        storage
369            .put(&key, &CipherBlob::new(b"dave".to_vec()))
370            .await?;
371        assert_eq!(mem_lookup_count(&storage, b"dave"), 1);
372
373        storage.delete(&key).await?;
374
375        assert_eq!(
376            mem_lookup_count(&storage, b"dave"),
377            0,
378            "index entry should be removed after delete"
379        );
380        Ok(())
381    }
382
383    #[tokio::test]
384    async fn test_memory_auto_index_on_overwrite() -> Result<()> {
385        let storage = make_indexed_memory_storage()?;
386
387        let key = Key::from_str("mem_rec_3");
388        storage.put(&key, &CipherBlob::new(b"eve".to_vec())).await?;
389        assert_eq!(mem_lookup_count(&storage, b"eve"), 1);
390
391        storage
392            .put(&key, &CipherBlob::new(b"frank".to_vec()))
393            .await?;
394
395        assert_eq!(
396            mem_lookup_count(&storage, b"eve"),
397            0,
398            "old value entry should be gone after overwrite"
399        );
400        assert_eq!(
401            mem_lookup_count(&storage, b"frank"),
402            1,
403            "new value entry should be present after overwrite"
404        );
405        Ok(())
406    }
407}