amaters_core/storage/
memory.rs1use crate::error::{AmateRSError, ErrorContext, Result};
7use crate::traits::StorageEngine;
8use crate::types::{CipherBlob, Key};
9use async_trait::async_trait;
10use dashmap::DashMap;
11use std::sync::Arc;
12
13#[derive(Debug, Clone)]
15pub struct MemoryStorage {
16 data: Arc<DashMap<Key, CipherBlob>>,
17}
18
19impl MemoryStorage {
20 pub fn new() -> Self {
22 Self {
23 data: Arc::new(DashMap::new()),
24 }
25 }
26
27 pub fn len(&self) -> usize {
29 self.data.len()
30 }
31
32 pub fn is_empty(&self) -> bool {
34 self.data.is_empty()
35 }
36
37 pub fn clear(&self) {
39 self.data.clear();
40 }
41}
42
43impl Default for MemoryStorage {
44 fn default() -> Self {
45 Self::new()
46 }
47}
48
49#[async_trait]
50impl StorageEngine for MemoryStorage {
51 async fn put(&self, key: &Key, value: &CipherBlob) -> Result<()> {
52 value.verify_integrity()?;
54 self.data.insert(key.clone(), value.clone());
55 Ok(())
56 }
57
58 async fn get(&self, key: &Key) -> Result<Option<CipherBlob>> {
59 Ok(self.data.get(key).map(|v| v.clone()))
60 }
61
62 async fn atomic_update<F>(&self, key: &Key, f: F) -> Result<()>
63 where
64 F: Fn(&CipherBlob) -> Result<CipherBlob> + Send + Sync,
65 {
66 let mut entry = self.data.entry(key.clone()).or_insert_with(|| {
68 CipherBlob::new(Vec::new())
70 });
71
72 let old_value = entry.value().clone();
73 let new_value = f(&old_value)?;
74 new_value.verify_integrity()?;
75 *entry = new_value;
76
77 Ok(())
78 }
79
80 async fn delete(&self, key: &Key) -> Result<()> {
81 self.data.remove(key);
82 Ok(())
83 }
84
85 async fn range(&self, start: &Key, end: &Key) -> Result<Vec<(Key, CipherBlob)>> {
86 let mut results: Vec<_> = self
87 .data
88 .iter()
89 .filter(|entry| entry.key() >= start && entry.key() < end)
90 .map(|entry| (entry.key().clone(), entry.value().clone()))
91 .collect();
92
93 results.sort_by(|a, b| a.0.cmp(&b.0));
94 Ok(results)
95 }
96
97 async fn keys(&self) -> Result<Vec<Key>> {
98 let mut keys: Vec<_> = self.data.iter().map(|entry| entry.key().clone()).collect();
99 keys.sort();
100 Ok(keys)
101 }
102
103 async fn flush(&self) -> Result<()> {
104 Ok(())
106 }
107
108 async fn close(&self) -> Result<()> {
109 Ok(())
111 }
112}
113
114#[cfg(test)]
115mod tests {
116 use super::*;
117
118 #[tokio::test]
119 async fn test_memory_storage_basic() -> Result<()> {
120 let storage = MemoryStorage::new();
121 let key = Key::from_str("test_key");
122 let value = CipherBlob::new(vec![1, 2, 3, 4, 5]);
123
124 storage.put(&key, &value).await?;
126
127 let retrieved = storage.get(&key).await?;
129 assert_eq!(retrieved, Some(value.clone()));
130
131 storage.delete(&key).await?;
133 let retrieved = storage.get(&key).await?;
134 assert_eq!(retrieved, None);
135
136 Ok(())
137 }
138
139 #[tokio::test]
140 async fn test_memory_storage_range() -> Result<()> {
141 let storage = MemoryStorage::new();
142
143 for i in 0..10 {
145 let key = Key::from_str(&format!("key_{:03}", i));
146 let value = CipherBlob::new(vec![i as u8]);
147 storage.put(&key, &value).await?;
148 }
149
150 let start = Key::from_str("key_003");
152 let end = Key::from_str("key_007");
153 let results = storage.range(&start, &end).await?;
154
155 assert_eq!(results.len(), 4); assert_eq!(results[0].0, Key::from_str("key_003"));
157 assert_eq!(results[3].0, Key::from_str("key_006"));
158
159 Ok(())
160 }
161
162 #[tokio::test]
163 async fn test_memory_storage_atomic_update() -> Result<()> {
164 let storage = MemoryStorage::new();
165 let key = Key::from_str("counter");
166 let initial = CipherBlob::new(vec![0]);
167
168 storage.put(&key, &initial).await?;
169
170 storage
172 .atomic_update(&key, |old| {
173 let mut data = old.to_vec();
174 if !data.is_empty() {
175 data[0] += 1;
176 }
177 Ok(CipherBlob::new(data))
178 })
179 .await?;
180
181 let result = storage.get(&key).await?;
182 assert_eq!(result.expect("Value should exist").as_bytes()[0], 1);
183
184 Ok(())
185 }
186}