mappy_core/storage/
memory.rs1use super::{Storage, StorageConfig, StorageStats};
6use crate::{MapletError, MapletResult};
7use async_trait::async_trait;
8use dashmap::DashMap;
9use std::sync::Arc;
10use std::time::Instant;
11use tokio::sync::RwLock;
12
13pub struct MemoryStorage {
15 data: Arc<DashMap<String, Vec<u8>>>,
17 config: StorageConfig,
19 stats: Arc<RwLock<StorageStats>>,
21 #[allow(dead_code)]
23 start_time: Instant,
24}
25
26impl MemoryStorage {
27 pub fn new(config: StorageConfig) -> MapletResult<Self> {
29 Ok(Self {
30 data: Arc::new(DashMap::new()),
31 config,
32 stats: Arc::new(RwLock::new(StorageStats::default())),
33 start_time: Instant::now(),
34 })
35 }
36
37 async fn update_stats<F>(&self, f: F)
39 where
40 F: FnOnce(&mut StorageStats),
41 {
42 let mut stats = self.stats.write().await;
43 f(&mut stats);
44 }
45
46 fn calculate_memory_usage(&self) -> u64 {
48 let mut total = 0;
49 for entry in self.data.iter() {
50 total += entry.key().len() + entry.value().len();
51 }
52 total as u64
53 }
54}
55
56#[async_trait]
57impl Storage for MemoryStorage {
58 async fn get(&self, key: &str) -> MapletResult<Option<Vec<u8>>> {
59 let start = Instant::now();
60 let result = self.data.get(key).map(|entry| entry.value().clone());
61 let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
62
63 self.update_stats(|stats| {
64 stats.operations_count += 1;
65 stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
66 })
67 .await;
68
69 Ok(result)
70 }
71
72 async fn set(&self, key: String, value: Vec<u8>) -> MapletResult<()> {
73 let start = Instant::now();
74
75 if let Some(max_memory) = self.config.max_memory {
77 let current_usage = self.calculate_memory_usage();
78 if current_usage + key.len() as u64 + value.len() as u64 > max_memory {
79 return Err(MapletError::Internal("Memory limit exceeded".to_string()));
80 }
81 }
82
83 self.data.insert(key.clone(), value);
84 let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
85
86 self.update_stats(|stats| {
87 stats.operations_count += 1;
88 stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
89 stats.total_keys = self.data.len() as u64;
90 stats.memory_usage = self.calculate_memory_usage();
91 })
92 .await;
93
94 Ok(())
95 }
96
97 async fn delete(&self, key: &str) -> MapletResult<bool> {
98 let start = Instant::now();
99 let existed = self.data.remove(key).is_some();
100 let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
101
102 self.update_stats(|stats| {
103 stats.operations_count += 1;
104 stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
105 stats.total_keys = self.data.len() as u64;
106 stats.memory_usage = self.calculate_memory_usage();
107 })
108 .await;
109
110 Ok(existed)
111 }
112
113 async fn exists(&self, key: &str) -> MapletResult<bool> {
114 let start = Instant::now();
115 let exists = self.data.contains_key(key);
116 let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
117
118 self.update_stats(|stats| {
119 stats.operations_count += 1;
120 stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
121 })
122 .await;
123
124 Ok(exists)
125 }
126
127 async fn keys(&self) -> MapletResult<Vec<String>> {
128 let start = Instant::now();
129 let keys: Vec<String> = self.data.iter().map(|entry| entry.key().clone()).collect();
130 let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
131
132 self.update_stats(|stats| {
133 stats.operations_count += 1;
134 stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
135 })
136 .await;
137
138 Ok(keys)
139 }
140
141 async fn clear_database(&self) -> MapletResult<()> {
142 let start = Instant::now();
143 self.data.clear();
144 let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
145
146 self.update_stats(|stats| {
147 stats.operations_count += 1;
148 stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
149 stats.total_keys = 0;
150 stats.memory_usage = 0;
151 })
152 .await;
153
154 Ok(())
155 }
156
157 async fn flush(&self) -> MapletResult<()> {
158 Ok(())
160 }
161
162 async fn close(&self) -> MapletResult<()> {
163 Ok(())
165 }
166
167 async fn stats(&self) -> MapletResult<StorageStats> {
168 let stats = self.stats.read().await;
169 Ok(stats.clone())
170 }
171}
172
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a store from the default configuration.
    fn default_storage() -> MemoryStorage {
        MemoryStorage::new(StorageConfig::default()).unwrap()
    }

    /// Writes `value` under `key`, panicking if the store rejects it.
    async fn put(storage: &MemoryStorage, key: &str, value: &[u8]) {
        storage.set(key.to_string(), value.to_vec()).await.unwrap();
    }

    // Round trip: write, read back, probe existence, delete.
    #[tokio::test]
    async fn test_memory_storage_basic_operations() {
        let storage = default_storage();

        put(&storage, "key1", b"value1").await;
        let fetched = storage.get("key1").await.unwrap();
        assert_eq!(fetched, Some(b"value1".to_vec()));

        let present = storage.exists("key1").await.unwrap();
        let absent = storage.exists("key2").await.unwrap();
        assert!(present);
        assert!(!absent);

        let removed = storage.delete("key1").await.unwrap();
        assert!(removed);
        let still_there = storage.exists("key1").await.unwrap();
        assert!(!still_there);
    }

    // `keys` lists exactly the keys that were written.
    #[tokio::test]
    async fn test_memory_storage_keys() {
        let storage = default_storage();

        put(&storage, "key1", b"value1").await;
        put(&storage, "key2", b"value2").await;

        let listed = storage.keys().await.unwrap();
        assert_eq!(listed.len(), 2);
        for expected in ["key1", "key2"] {
            assert!(listed.iter().any(|key| key == expected));
        }
    }

    // `clear_database` leaves no keys behind.
    #[tokio::test]
    async fn test_memory_storage_clear() {
        let storage = default_storage();

        put(&storage, "key1", b"value1").await;
        put(&storage, "key2", b"value2").await;

        storage.clear_database().await.unwrap();

        let remaining = storage.keys().await.unwrap();
        assert_eq!(remaining.len(), 0);
    }

    // A single write is reflected in the key count, byte usage and operation counter.
    #[tokio::test]
    async fn test_memory_storage_stats() {
        let storage = default_storage();

        put(&storage, "key1", b"value1").await;

        let snapshot = storage.stats().await.unwrap();
        assert_eq!(snapshot.total_keys, 1);
        assert!(snapshot.memory_usage > 0);
        assert!(snapshot.operations_count > 0);
    }
}