mappy_core/storage/
memory.rs

1//! In-memory storage backend
2//!
3//! Uses `DashMap` for high-performance concurrent access.
4
5use super::{Storage, StorageConfig, StorageStats};
6use crate::{MapletError, MapletResult};
7use async_trait::async_trait;
8use dashmap::DashMap;
9use std::sync::Arc;
10use std::time::Instant;
11use tokio::sync::RwLock;
12
13/// In-memory storage using `DashMap`
14pub struct MemoryStorage {
15    /// The actual storage
16    data: Arc<DashMap<String, Vec<u8>>>,
17    /// Configuration
18    config: StorageConfig,
19    /// Statistics
20    stats: Arc<RwLock<StorageStats>>,
21    /// Start time for latency calculation
22    #[allow(dead_code)]
23    start_time: Instant,
24}
25
26impl MemoryStorage {
27    /// Create a new memory storage
28    pub fn new(config: StorageConfig) -> MapletResult<Self> {
29        Ok(Self {
30            data: Arc::new(DashMap::new()),
31            config,
32            stats: Arc::new(RwLock::new(StorageStats::default())),
33            start_time: Instant::now(),
34        })
35    }
36
37    /// Update statistics
38    async fn update_stats<F>(&self, f: F)
39    where
40        F: FnOnce(&mut StorageStats),
41    {
42        let mut stats = self.stats.write().await;
43        f(&mut stats);
44    }
45
46    /// Calculate memory usage
47    fn calculate_memory_usage(&self) -> u64 {
48        let mut total = 0;
49        for entry in self.data.iter() {
50            total += entry.key().len() + entry.value().len();
51        }
52        total as u64
53    }
54}
55
56#[async_trait]
57impl Storage for MemoryStorage {
58    async fn get(&self, key: &str) -> MapletResult<Option<Vec<u8>>> {
59        let start = Instant::now();
60        let result = self.data.get(key).map(|entry| entry.value().clone());
61        let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
62
63        self.update_stats(|stats| {
64            stats.operations_count += 1;
65            stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
66        })
67        .await;
68
69        Ok(result)
70    }
71
72    async fn set(&self, key: String, value: Vec<u8>) -> MapletResult<()> {
73        let start = Instant::now();
74
75        // Check memory limits
76        if let Some(max_memory) = self.config.max_memory {
77            let current_usage = self.calculate_memory_usage();
78            if current_usage + key.len() as u64 + value.len() as u64 > max_memory {
79                return Err(MapletError::Internal("Memory limit exceeded".to_string()));
80            }
81        }
82
83        self.data.insert(key.clone(), value);
84        let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
85
86        self.update_stats(|stats| {
87            stats.operations_count += 1;
88            stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
89            stats.total_keys = self.data.len() as u64;
90            stats.memory_usage = self.calculate_memory_usage();
91        })
92        .await;
93
94        Ok(())
95    }
96
97    async fn delete(&self, key: &str) -> MapletResult<bool> {
98        let start = Instant::now();
99        let existed = self.data.remove(key).is_some();
100        let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
101
102        self.update_stats(|stats| {
103            stats.operations_count += 1;
104            stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
105            stats.total_keys = self.data.len() as u64;
106            stats.memory_usage = self.calculate_memory_usage();
107        })
108        .await;
109
110        Ok(existed)
111    }
112
113    async fn exists(&self, key: &str) -> MapletResult<bool> {
114        let start = Instant::now();
115        let exists = self.data.contains_key(key);
116        let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
117
118        self.update_stats(|stats| {
119            stats.operations_count += 1;
120            stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
121        })
122        .await;
123
124        Ok(exists)
125    }
126
127    async fn keys(&self) -> MapletResult<Vec<String>> {
128        let start = Instant::now();
129        let keys: Vec<String> = self.data.iter().map(|entry| entry.key().clone()).collect();
130        let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
131
132        self.update_stats(|stats| {
133            stats.operations_count += 1;
134            stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
135        })
136        .await;
137
138        Ok(keys)
139    }
140
141    async fn clear_database(&self) -> MapletResult<()> {
142        let start = Instant::now();
143        self.data.clear();
144        let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
145
146        self.update_stats(|stats| {
147            stats.operations_count += 1;
148            stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
149            stats.total_keys = 0;
150            stats.memory_usage = 0;
151        })
152        .await;
153
154        Ok(())
155    }
156
157    async fn flush(&self) -> MapletResult<()> {
158        // Memory storage doesn't need flushing
159        Ok(())
160    }
161
162    async fn close(&self) -> MapletResult<()> {
163        // Memory storage doesn't need explicit closing
164        Ok(())
165    }
166
167    async fn stats(&self) -> MapletResult<StorageStats> {
168        let stats = self.stats.read().await;
169        Ok(stats.clone())
170    }
171}
172
173#[cfg(test)]
174mod tests {
175    use super::*;
176
177    #[tokio::test]
178    async fn test_memory_storage_basic_operations() {
179        let config = StorageConfig::default();
180        let storage = MemoryStorage::new(config).unwrap();
181
182        // Test set and get
183        storage
184            .set("key1".to_string(), b"value1".to_vec())
185            .await
186            .unwrap();
187        let value = storage.get("key1").await.unwrap();
188        assert_eq!(value, Some(b"value1".to_vec()));
189
190        // Test exists
191        assert!(storage.exists("key1").await.unwrap());
192        assert!(!storage.exists("key2").await.unwrap());
193
194        // Test delete
195        let deleted = storage.delete("key1").await.unwrap();
196        assert!(deleted);
197        assert!(!storage.exists("key1").await.unwrap());
198    }
199
200    #[tokio::test]
201    async fn test_memory_storage_keys() {
202        let config = StorageConfig::default();
203        let storage = MemoryStorage::new(config).unwrap();
204
205        storage
206            .set("key1".to_string(), b"value1".to_vec())
207            .await
208            .unwrap();
209        storage
210            .set("key2".to_string(), b"value2".to_vec())
211            .await
212            .unwrap();
213
214        let keys = storage.keys().await.unwrap();
215        assert_eq!(keys.len(), 2);
216        assert!(keys.contains(&"key1".to_string()));
217        assert!(keys.contains(&"key2".to_string()));
218    }
219
220    #[tokio::test]
221    async fn test_memory_storage_clear() {
222        let config = StorageConfig::default();
223        let storage = MemoryStorage::new(config).unwrap();
224
225        storage
226            .set("key1".to_string(), b"value1".to_vec())
227            .await
228            .unwrap();
229        storage
230            .set("key2".to_string(), b"value2".to_vec())
231            .await
232            .unwrap();
233
234        storage.clear_database().await.unwrap();
235
236        let keys = storage.keys().await.unwrap();
237        assert_eq!(keys.len(), 0);
238    }
239
240    #[tokio::test]
241    async fn test_memory_storage_stats() {
242        let config = StorageConfig::default();
243        let storage = MemoryStorage::new(config).unwrap();
244
245        storage
246            .set("key1".to_string(), b"value1".to_vec())
247            .await
248            .unwrap();
249
250        let stats = storage.stats().await.unwrap();
251        assert_eq!(stats.total_keys, 1);
252        assert!(stats.memory_usage > 0);
253        assert!(stats.operations_count > 0);
254    }
255}