mappy_core/storage/
memory.rs

1//! In-memory storage backend
2//! 
3//! Uses `DashMap` for high-performance concurrent access.
4
5use super::{Storage, StorageStats, StorageConfig};
6use crate::{MapletError, MapletResult};
7use dashmap::DashMap;
8use std::sync::Arc;
9use tokio::sync::RwLock;
10use std::time::Instant;
11use async_trait::async_trait;
12
13/// In-memory storage using `DashMap`
14pub struct MemoryStorage {
15    /// The actual storage
16    data: Arc<DashMap<String, Vec<u8>>>,
17    /// Configuration
18    config: StorageConfig,
19    /// Statistics
20    stats: Arc<RwLock<StorageStats>>,
21    /// Start time for latency calculation
22    #[allow(dead_code)]
23    start_time: Instant,
24}
25
26impl MemoryStorage {
27    /// Create a new memory storage
28    pub fn new(config: StorageConfig) -> MapletResult<Self> {
29        Ok(Self {
30            data: Arc::new(DashMap::new()),
31            config,
32            stats: Arc::new(RwLock::new(StorageStats::default())),
33            start_time: Instant::now(),
34        })
35    }
36    
37    /// Update statistics
38    async fn update_stats<F>(&self, f: F) 
39    where
40        F: FnOnce(&mut StorageStats),
41    {
42        let mut stats = self.stats.write().await;
43        f(&mut stats);
44    }
45    
46    /// Calculate memory usage
47    fn calculate_memory_usage(&self) -> u64 {
48        let mut total = 0;
49        for entry in self.data.iter() {
50            total += entry.key().len() + entry.value().len();
51        }
52        total as u64
53    }
54}
55
56#[async_trait]
57impl Storage for MemoryStorage {
58    async fn get(&self, key: &str) -> MapletResult<Option<Vec<u8>>> {
59        let start = Instant::now();
60        let result = self.data.get(key).map(|entry| entry.value().clone());
61        let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
62        
63        self.update_stats(|stats| {
64            stats.operations_count += 1;
65            stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
66        }).await;
67        
68        Ok(result)
69    }
70    
71    async fn set(&self, key: String, value: Vec<u8>) -> MapletResult<()> {
72        let start = Instant::now();
73        
74        // Check memory limits
75        if let Some(max_memory) = self.config.max_memory {
76            let current_usage = self.calculate_memory_usage();
77            if current_usage + key.len() as u64 + value.len() as u64 > max_memory {
78                return Err(MapletError::Internal("Memory limit exceeded".to_string()));
79            }
80        }
81        
82        self.data.insert(key.clone(), value);
83        let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
84        
85        self.update_stats(|stats| {
86            stats.operations_count += 1;
87            stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
88            stats.total_keys = self.data.len() as u64;
89            stats.memory_usage = self.calculate_memory_usage();
90        }).await;
91        
92        Ok(())
93    }
94    
95    async fn delete(&self, key: &str) -> MapletResult<bool> {
96        let start = Instant::now();
97        let existed = self.data.remove(key).is_some();
98        let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
99        
100        self.update_stats(|stats| {
101            stats.operations_count += 1;
102            stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
103            stats.total_keys = self.data.len() as u64;
104            stats.memory_usage = self.calculate_memory_usage();
105        }).await;
106        
107        Ok(existed)
108    }
109    
110    async fn exists(&self, key: &str) -> MapletResult<bool> {
111        let start = Instant::now();
112        let exists = self.data.contains_key(key);
113        let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
114        
115        self.update_stats(|stats| {
116            stats.operations_count += 1;
117            stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
118        }).await;
119        
120        Ok(exists)
121    }
122    
123    async fn keys(&self) -> MapletResult<Vec<String>> {
124        let start = Instant::now();
125        let keys: Vec<String> = self.data.iter().map(|entry| entry.key().clone()).collect();
126        let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
127        
128        self.update_stats(|stats| {
129            stats.operations_count += 1;
130            stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
131        }).await;
132        
133        Ok(keys)
134    }
135    
136    async fn clear_database(&self) -> MapletResult<()> {
137        let start = Instant::now();
138        self.data.clear();
139        let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
140        
141        self.update_stats(|stats| {
142            stats.operations_count += 1;
143            stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
144            stats.total_keys = 0;
145            stats.memory_usage = 0;
146        }).await;
147        
148        Ok(())
149    }
150    
151    async fn flush(&self) -> MapletResult<()> {
152        // Memory storage doesn't need flushing
153        Ok(())
154    }
155    
156    async fn close(&self) -> MapletResult<()> {
157        // Memory storage doesn't need explicit closing
158        Ok(())
159    }
160    
161    async fn stats(&self) -> MapletResult<StorageStats> {
162        let stats = self.stats.read().await;
163        Ok(stats.clone())
164    }
165}
166
167#[cfg(test)]
168mod tests {
169    use super::*;
170
171    #[tokio::test]
172    async fn test_memory_storage_basic_operations() {
173        let config = StorageConfig::default();
174        let storage = MemoryStorage::new(config).unwrap();
175        
176        // Test set and get
177        storage.set("key1".to_string(), b"value1".to_vec()).await.unwrap();
178        let value = storage.get("key1").await.unwrap();
179        assert_eq!(value, Some(b"value1".to_vec()));
180        
181        // Test exists
182        assert!(storage.exists("key1").await.unwrap());
183        assert!(!storage.exists("key2").await.unwrap());
184        
185        // Test delete
186        let deleted = storage.delete("key1").await.unwrap();
187        assert!(deleted);
188        assert!(!storage.exists("key1").await.unwrap());
189    }
190    
191    #[tokio::test]
192    async fn test_memory_storage_keys() {
193        let config = StorageConfig::default();
194        let storage = MemoryStorage::new(config).unwrap();
195        
196        storage.set("key1".to_string(), b"value1".to_vec()).await.unwrap();
197        storage.set("key2".to_string(), b"value2".to_vec()).await.unwrap();
198        
199        let keys = storage.keys().await.unwrap();
200        assert_eq!(keys.len(), 2);
201        assert!(keys.contains(&"key1".to_string()));
202        assert!(keys.contains(&"key2".to_string()));
203    }
204    
205    #[tokio::test]
206    async fn test_memory_storage_clear() {
207        let config = StorageConfig::default();
208        let storage = MemoryStorage::new(config).unwrap();
209        
210        storage.set("key1".to_string(), b"value1".to_vec()).await.unwrap();
211        storage.set("key2".to_string(), b"value2".to_vec()).await.unwrap();
212        
213        storage.clear_database().await.unwrap();
214        
215        let keys = storage.keys().await.unwrap();
216        assert_eq!(keys.len(), 0);
217    }
218    
219    #[tokio::test]
220    async fn test_memory_storage_stats() {
221        let config = StorageConfig::default();
222        let storage = MemoryStorage::new(config).unwrap();
223        
224        storage.set("key1".to_string(), b"value1".to_vec()).await.unwrap();
225        
226        let stats = storage.stats().await.unwrap();
227        assert_eq!(stats.total_keys, 1);
228        assert!(stats.memory_usage > 0);
229        assert!(stats.operations_count > 0);
230    }
231}