// mappy_core/storage/disk.rs

1//! Disk-based storage backend
2//! 
3//! Uses Sled for persistent storage with ACID guarantees.
4
5use super::{Storage, StorageStats, StorageConfig};
6use crate::{MapletError, MapletResult};
7use sled::{Db, Tree};
8use std::sync::Arc;
9use tokio::sync::RwLock;
10use std::time::Instant;
11use std::path::Path;
12use async_trait::async_trait;
13
/// Disk-based storage using Sled.
///
/// All key/value pairs live in a single named tree ("data") inside one Sled
/// database file (`mappy.db` under the configured data directory). Operation
/// statistics are kept behind an async `RwLock` so the `Storage` trait
/// methods can update them from concurrent tasks.
pub struct DiskStorage {
    /// The Sled database handle; wrapped in `Arc` for cheap sharing.
    db: Arc<Db>,
    /// Main data tree (named "data") holding every key/value pair.
    tree: Tree,
    /// Storage configuration; only read during construction, retained for
    /// future use (hence the dead_code allowance).
    #[allow(dead_code)]
    config: StorageConfig,
    /// Aggregated operation statistics, refreshed after each operation.
    stats: Arc<RwLock<StorageStats>>,
    /// Start time for latency calculation — currently unused; per-operation
    /// latency is measured locally in each method instead.
    #[allow(dead_code)]
    start_time: Instant,
}
29
30impl DiskStorage {
31    /// Create a new disk storage
32    pub fn new(config: StorageConfig) -> MapletResult<Self> {
33        // Ensure data directory exists
34        std::fs::create_dir_all(&config.data_dir)
35            .map_err(|e| MapletError::Internal(format!("Failed to create data directory: {e}")))?;
36        
37        let db_path = Path::new(&config.data_dir).join("mappy.db");
38        let db = sled::open(&db_path)
39            .map_err(|e| MapletError::Internal(format!("Failed to open database: {e}")))?;
40        
41        let tree = db.open_tree("data")
42            .map_err(|e| MapletError::Internal(format!("Failed to open tree: {e}")))?;
43        
44        Ok(Self {
45            db: Arc::new(db),
46            tree,
47            config,
48            stats: Arc::new(RwLock::new(StorageStats::default())),
49            start_time: Instant::now(),
50        })
51    }
52    
53    /// Update statistics
54    async fn update_stats<F>(&self, f: F) 
55    where
56        F: FnOnce(&mut StorageStats),
57    {
58        let mut stats = self.stats.write().await;
59        f(&mut stats);
60    }
61    
62    /// Calculate disk usage
63    fn calculate_disk_usage(&self) -> u64 {
64        self.db.size_on_disk()
65            .map_err(|_| MapletError::Internal("Failed to get disk usage".to_string()))
66            .unwrap_or(0)
67    }
68}
69
70#[async_trait]
71impl Storage for DiskStorage {
72    async fn get(&self, key: &str) -> MapletResult<Option<Vec<u8>>> {
73        let start = Instant::now();
74        let result = self.tree.get(key)
75            .map_err(|e| MapletError::Internal(format!("Failed to get key: {e}")))?
76            .map(|ivec| ivec.to_vec());
77        let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
78        
79        self.update_stats(|stats| {
80            stats.operations_count += 1;
81            stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
82        }).await;
83        
84        Ok(result)
85    }
86    
87    async fn set(&self, key: String, value: Vec<u8>) -> MapletResult<()> {
88        let start = Instant::now();
89        
90        self.tree.insert(&key, value)
91            .map_err(|e| MapletError::Internal(format!("Failed to set key: {e}")))?;
92        
93        let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
94        
95        self.update_stats(|stats| {
96            stats.operations_count += 1;
97            stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
98            stats.total_keys = self.tree.len() as u64;
99            stats.disk_usage = self.calculate_disk_usage();
100        }).await;
101        
102        Ok(())
103    }
104    
105    async fn delete(&self, key: &str) -> MapletResult<bool> {
106        let start = Instant::now();
107        let existed = self.tree.remove(key)
108            .map_err(|e| MapletError::Internal(format!("Failed to delete key: {e}")))?
109            .is_some();
110        let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
111        
112        self.update_stats(|stats| {
113            stats.operations_count += 1;
114            stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
115            stats.total_keys = self.tree.len() as u64;
116            stats.disk_usage = self.calculate_disk_usage();
117        }).await;
118        
119        Ok(existed)
120    }
121    
122    async fn exists(&self, key: &str) -> MapletResult<bool> {
123        let start = Instant::now();
124        let exists = self.tree.contains_key(key)
125            .map_err(|e| MapletError::Internal(format!("Failed to check key existence: {e}")))?;
126        let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
127        
128        self.update_stats(|stats| {
129            stats.operations_count += 1;
130            stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
131        }).await;
132        
133        Ok(exists)
134    }
135    
136    async fn keys(&self) -> MapletResult<Vec<String>> {
137        let start = Instant::now();
138        let keys: Result<Vec<String>, _> = self.tree.iter()
139            .map(|result| {
140                let (key, _) = result.map_err(|e| MapletError::Internal(format!("Failed to iterate: {e}")))?;
141                String::from_utf8(key.to_vec())
142                    .map_err(|e| MapletError::Internal(format!("Invalid UTF-8 key: {e}")))
143            })
144            .collect();
145        let keys = keys?;
146        let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
147        
148        self.update_stats(|stats| {
149            stats.operations_count += 1;
150            stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
151        }).await;
152        
153        Ok(keys)
154    }
155    
156    async fn clear_database(&self) -> MapletResult<()> {
157        let start = Instant::now();
158        self.tree.clear()
159            .map_err(|e| MapletError::Internal(format!("Failed to clear database: {}", e)))?;
160        let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
161        
162        self.update_stats(|stats| {
163            stats.operations_count += 1;
164            stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
165            stats.total_keys = 0;
166            stats.disk_usage = self.calculate_disk_usage();
167        }).await;
168        
169        Ok(())
170    }
171    
172    async fn flush(&self) -> MapletResult<()> {
173        let start = Instant::now();
174        self.db.flush()
175            .map_err(|e| MapletError::Internal(format!("Failed to flush: {}", e)))?;
176        let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
177        
178        self.update_stats(|stats| {
179            stats.operations_count += 1;
180            stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
181        }).await;
182        
183        Ok(())
184    }
185    
186    async fn close(&self) -> MapletResult<()> {
187        self.db.flush()
188            .map_err(|e| MapletError::Internal(format!("Failed to flush on close: {}", e)))?;
189        Ok(())
190    }
191    
192    async fn stats(&self) -> MapletResult<StorageStats> {
193        let mut stats = self.stats.read().await.clone();
194        stats.total_keys = self.tree.len() as u64;
195        stats.disk_usage = self.calculate_disk_usage();
196        Ok(stats)
197    }
198}
199
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Build a `StorageConfig` rooted in a fresh temporary directory.
    /// The `TempDir` guard is returned so the directory lives for the test.
    fn temp_config() -> (TempDir, StorageConfig) {
        let dir = TempDir::new().unwrap();
        let config = StorageConfig {
            data_dir: dir.path().to_string_lossy().to_string(),
            ..Default::default()
        };
        (dir, config)
    }

    #[tokio::test]
    async fn test_disk_storage_basic_operations() {
        let (_dir, config) = temp_config();
        let store = DiskStorage::new(config).unwrap();

        // Round-trip a single key.
        store.set("key1".to_string(), b"value1".to_vec()).await.unwrap();
        assert_eq!(store.get("key1").await.unwrap(), Some(b"value1".to_vec()));

        // Existence checks for a present and an absent key.
        assert!(store.exists("key1").await.unwrap());
        assert!(!store.exists("key2").await.unwrap());

        // Delete reports whether the key was present, and removes it.
        assert!(store.delete("key1").await.unwrap());
        assert!(!store.exists("key1").await.unwrap());
    }

    #[tokio::test]
    async fn test_disk_storage_persistence() {
        let (_dir, config) = temp_config();

        // First session: write a value and shut down cleanly.
        {
            let store = DiskStorage::new(config.clone()).unwrap();
            store.set("key1".to_string(), b"value1".to_vec()).await.unwrap();
            store.flush().await.unwrap();
            store.close().await.unwrap();
        }

        // Second session: the value must survive the reopen.
        {
            let store = DiskStorage::new(config).unwrap();
            assert_eq!(store.get("key1").await.unwrap(), Some(b"value1".to_vec()));
            store.close().await.unwrap();
        }
    }

    #[tokio::test]
    async fn test_disk_storage_stats() {
        let (_dir, config) = temp_config();
        let store = DiskStorage::new(config).unwrap();

        store.set("key1".to_string(), b"value1".to_vec()).await.unwrap();

        // Flush so on-disk size reflects the write before sampling stats.
        store.flush().await.unwrap();

        let stats = store.stats().await.unwrap();
        assert_eq!(stats.total_keys, 1);
        assert!(stats.disk_usage > 0);
        assert!(stats.operations_count > 0);
    }
}