mappy_core/storage/
memory.rs1use super::{Storage, StorageStats, StorageConfig};
6use crate::{MapletError, MapletResult};
7use dashmap::DashMap;
8use std::sync::Arc;
9use tokio::sync::RwLock;
10use std::time::Instant;
11use async_trait::async_trait;
12
13pub struct MemoryStorage {
15 data: Arc<DashMap<String, Vec<u8>>>,
17 config: StorageConfig,
19 stats: Arc<RwLock<StorageStats>>,
21 #[allow(dead_code)]
23 start_time: Instant,
24}
25
26impl MemoryStorage {
27 pub fn new(config: StorageConfig) -> MapletResult<Self> {
29 Ok(Self {
30 data: Arc::new(DashMap::new()),
31 config,
32 stats: Arc::new(RwLock::new(StorageStats::default())),
33 start_time: Instant::now(),
34 })
35 }
36
37 async fn update_stats<F>(&self, f: F)
39 where
40 F: FnOnce(&mut StorageStats),
41 {
42 let mut stats = self.stats.write().await;
43 f(&mut stats);
44 }
45
46 fn calculate_memory_usage(&self) -> u64 {
48 let mut total = 0;
49 for entry in self.data.iter() {
50 total += entry.key().len() + entry.value().len();
51 }
52 total as u64
53 }
54}
55
56#[async_trait]
57impl Storage for MemoryStorage {
58 async fn get(&self, key: &str) -> MapletResult<Option<Vec<u8>>> {
59 let start = Instant::now();
60 let result = self.data.get(key).map(|entry| entry.value().clone());
61 let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
62
63 self.update_stats(|stats| {
64 stats.operations_count += 1;
65 stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
66 }).await;
67
68 Ok(result)
69 }
70
71 async fn set(&self, key: String, value: Vec<u8>) -> MapletResult<()> {
72 let start = Instant::now();
73
74 if let Some(max_memory) = self.config.max_memory {
76 let current_usage = self.calculate_memory_usage();
77 if current_usage + key.len() as u64 + value.len() as u64 > max_memory {
78 return Err(MapletError::Internal("Memory limit exceeded".to_string()));
79 }
80 }
81
82 self.data.insert(key.clone(), value);
83 let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
84
85 self.update_stats(|stats| {
86 stats.operations_count += 1;
87 stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
88 stats.total_keys = self.data.len() as u64;
89 stats.memory_usage = self.calculate_memory_usage();
90 }).await;
91
92 Ok(())
93 }
94
95 async fn delete(&self, key: &str) -> MapletResult<bool> {
96 let start = Instant::now();
97 let existed = self.data.remove(key).is_some();
98 let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
99
100 self.update_stats(|stats| {
101 stats.operations_count += 1;
102 stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
103 stats.total_keys = self.data.len() as u64;
104 stats.memory_usage = self.calculate_memory_usage();
105 }).await;
106
107 Ok(existed)
108 }
109
110 async fn exists(&self, key: &str) -> MapletResult<bool> {
111 let start = Instant::now();
112 let exists = self.data.contains_key(key);
113 let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
114
115 self.update_stats(|stats| {
116 stats.operations_count += 1;
117 stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
118 }).await;
119
120 Ok(exists)
121 }
122
123 async fn keys(&self) -> MapletResult<Vec<String>> {
124 let start = Instant::now();
125 let keys: Vec<String> = self.data.iter().map(|entry| entry.key().clone()).collect();
126 let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
127
128 self.update_stats(|stats| {
129 stats.operations_count += 1;
130 stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
131 }).await;
132
133 Ok(keys)
134 }
135
136 async fn clear_database(&self) -> MapletResult<()> {
137 let start = Instant::now();
138 self.data.clear();
139 let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
140
141 self.update_stats(|stats| {
142 stats.operations_count += 1;
143 stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
144 stats.total_keys = 0;
145 stats.memory_usage = 0;
146 }).await;
147
148 Ok(())
149 }
150
151 async fn flush(&self) -> MapletResult<()> {
152 Ok(())
154 }
155
156 async fn close(&self) -> MapletResult<()> {
157 Ok(())
159 }
160
161 async fn stats(&self) -> MapletResult<StorageStats> {
162 let stats = self.stats.read().await;
163 Ok(stats.clone())
164 }
165}
166
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a store from the default configuration.
    fn fresh_storage() -> MemoryStorage {
        MemoryStorage::new(StorageConfig::default()).unwrap()
    }

    /// Inserts `key` with the bytes of `value`, panicking if the write fails.
    async fn put(storage: &MemoryStorage, key: &str, value: &str) {
        storage
            .set(key.to_string(), value.as_bytes().to_vec())
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn test_memory_storage_basic_operations() {
        let storage = fresh_storage();

        // Write then read back.
        put(&storage, "key1", "value1").await;
        let fetched = storage.get("key1").await.unwrap();
        assert_eq!(fetched.as_deref(), Some(&b"value1"[..]));

        // Presence checks for a stored and a missing key.
        assert!(storage.exists("key1").await.unwrap());
        assert!(!storage.exists("key2").await.unwrap());

        // Deleting reports the key existed and removes it.
        assert!(storage.delete("key1").await.unwrap());
        assert!(!storage.exists("key1").await.unwrap());
    }

    #[tokio::test]
    async fn test_memory_storage_keys() {
        let storage = fresh_storage();
        put(&storage, "key1", "value1").await;
        put(&storage, "key2", "value2").await;

        let listed = storage.keys().await.unwrap();
        assert_eq!(listed.len(), 2);
        for expected in ["key1", "key2"] {
            assert!(listed.iter().any(|k| k == expected));
        }
    }

    #[tokio::test]
    async fn test_memory_storage_clear() {
        let storage = fresh_storage();
        put(&storage, "key1", "value1").await;
        put(&storage, "key2", "value2").await;

        storage.clear_database().await.unwrap();

        assert!(storage.keys().await.unwrap().is_empty());
    }

    #[tokio::test]
    async fn test_memory_storage_stats() {
        let storage = fresh_storage();
        put(&storage, "key1", "value1").await;

        let snapshot = storage.stats().await.unwrap();
        assert_eq!(snapshot.total_keys, 1);
        assert!(snapshot.memory_usage > 0);
        assert!(snapshot.operations_count > 0);
    }
}