mappy_core/storage/disk.rs

//! Disk-based storage backend
//!
//! Uses Sled for persistent storage with ACID guarantees.
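//!
//! # Example
//!
//! A minimal usage sketch, marked `ignore` so it is not compiled as a
//! doc-test. It assumes these types are re-exported from
//! `mappy_core::storage` and that `StorageConfig` implements `Default` with a
//! `data_dir: String` field, as the tests at the bottom of this file do.
//!
//! ```ignore
//! use mappy_core::storage::{DiskStorage, Storage, StorageConfig};
//!
//! async fn demo() -> mappy_core::MapletResult<()> {
//!     let config = StorageConfig {
//!         data_dir: "/tmp/mappy-example".to_string(),
//!         ..Default::default()
//!     };
//!     let storage = DiskStorage::new(config)?;
//!     storage.set("greeting".to_string(), b"hello".to_vec()).await?;
//!     assert_eq!(storage.get("greeting").await?, Some(b"hello".to_vec()));
//!     storage.close().await?;
//!     Ok(())
//! }
//! ```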

use super::{Storage, StorageConfig, StorageStats};
use crate::{MapletError, MapletResult};
use async_trait::async_trait;
use sled::{Db, Tree};
use std::path::Path;
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::RwLock;

/// Disk-based storage using Sled
pub struct DiskStorage {
    /// The Sled database
    db: Arc<Db>,
    /// Main data tree
    tree: Tree,
    /// Configuration
    #[allow(dead_code)]
    config: StorageConfig,
    /// Statistics
    stats: Arc<RwLock<StorageStats>>,
    /// Start time for latency calculation
    #[allow(dead_code)]
    start_time: Instant,
}

impl DiskStorage {
    /// Create a new disk storage backend, creating the data directory if it
    /// does not already exist.
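    ///
    /// # Example
    ///
    /// An illustrative sketch only (marked `ignore` so it is not compiled as a
    /// doc-test); the path is arbitrary, and `StorageConfig` is assumed to
    /// implement `Default` with a `data_dir: String` field, as the tests at
    /// the bottom of this file do.
    ///
    /// ```ignore
    /// let config = StorageConfig {
    ///     data_dir: "/tmp/mappy-example".to_string(),
    ///     ..Default::default()
    /// };
    /// let storage = DiskStorage::new(config)?;
    /// ```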
    pub fn new(config: StorageConfig) -> MapletResult<Self> {
        // Ensure data directory exists
        std::fs::create_dir_all(&config.data_dir)
            .map_err(|e| MapletError::Internal(format!("Failed to create data directory: {e}")))?;

        let db_path = Path::new(&config.data_dir).join("mappy.db");
        let db = sled::open(&db_path)
            .map_err(|e| MapletError::Internal(format!("Failed to open database: {e}")))?;

        let tree = db
            .open_tree("data")
            .map_err(|e| MapletError::Internal(format!("Failed to open tree: {e}")))?;

        Ok(Self {
            db: Arc::new(db),
            tree,
            config,
            stats: Arc::new(RwLock::new(StorageStats::default())),
            start_time: Instant::now(),
        })
    }

    /// Apply `f` to the storage statistics while holding the write lock
    async fn update_stats<F>(&self, f: F)
    where
        F: FnOnce(&mut StorageStats),
    {
        let mut stats = self.stats.write().await;
        f(&mut stats);
    }

    /// Current size of the database on disk, or 0 if it cannot be determined
    fn calculate_disk_usage(&self) -> u64 {
        // `size_on_disk` can fail on I/O errors; since this value only feeds
        // statistics, report 0 instead of propagating the error.
        self.db.size_on_disk().unwrap_or(0)
    }
}

#[async_trait]
impl Storage for DiskStorage {
    async fn get(&self, key: &str) -> MapletResult<Option<Vec<u8>>> {
        let start = Instant::now();
        let result = self
            .tree
            .get(key)
            .map_err(|e| MapletError::Internal(format!("Failed to get key: {e}")))?
            .map(|ivec| ivec.to_vec());
        let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);

        self.update_stats(|stats| {
            stats.operations_count += 1;
            // Cheap running estimate: midpoint of the previous value and the
            // latest sample (not a true mean over all operations).
            stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
        })
        .await;

        Ok(result)
    }

    async fn set(&self, key: String, value: Vec<u8>) -> MapletResult<()> {
        let start = Instant::now();

        self.tree
            .insert(&key, value)
            .map_err(|e| MapletError::Internal(format!("Failed to set key: {e}")))?;

        let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);

        self.update_stats(|stats| {
            stats.operations_count += 1;
            stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
            stats.total_keys = self.tree.len() as u64;
            stats.disk_usage = self.calculate_disk_usage();
        })
        .await;

        Ok(())
    }

    async fn delete(&self, key: &str) -> MapletResult<bool> {
        let start = Instant::now();
        let existed = self
            .tree
            .remove(key)
            .map_err(|e| MapletError::Internal(format!("Failed to delete key: {e}")))?
            .is_some();
        let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);

        self.update_stats(|stats| {
            stats.operations_count += 1;
            stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
            stats.total_keys = self.tree.len() as u64;
            stats.disk_usage = self.calculate_disk_usage();
        })
        .await;

        Ok(existed)
    }

    async fn exists(&self, key: &str) -> MapletResult<bool> {
        let start = Instant::now();
        let exists = self
            .tree
            .contains_key(key)
            .map_err(|e| MapletError::Internal(format!("Failed to check key existence: {e}")))?;
        let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);

        self.update_stats(|stats| {
            stats.operations_count += 1;
            stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
        })
        .await;

        Ok(exists)
    }

    async fn keys(&self) -> MapletResult<Vec<String>> {
        let start = Instant::now();
        let keys: Result<Vec<String>, _> = self
            .tree
            .iter()
            .map(|result| {
                let (key, _) =
                    result.map_err(|e| MapletError::Internal(format!("Failed to iterate: {e}")))?;
                String::from_utf8(key.to_vec())
                    .map_err(|e| MapletError::Internal(format!("Invalid UTF-8 key: {e}")))
            })
            .collect();
        let keys = keys?;
        let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);

        self.update_stats(|stats| {
            stats.operations_count += 1;
            stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
        })
        .await;

        Ok(keys)
    }

    async fn clear_database(&self) -> MapletResult<()> {
        let start = Instant::now();
        self.tree
            .clear()
            .map_err(|e| MapletError::Internal(format!("Failed to clear database: {e}")))?;
        let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);

        self.update_stats(|stats| {
            stats.operations_count += 1;
            stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
            stats.total_keys = 0;
            stats.disk_usage = self.calculate_disk_usage();
        })
        .await;

        Ok(())
    }

    async fn flush(&self) -> MapletResult<()> {
        let start = Instant::now();
        self.db
            .flush()
            .map_err(|e| MapletError::Internal(format!("Failed to flush: {e}")))?;
        let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);

        self.update_stats(|stats| {
            stats.operations_count += 1;
            stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
        })
        .await;

        Ok(())
    }

    async fn close(&self) -> MapletResult<()> {
        self.db
            .flush()
            .map_err(|e| MapletError::Internal(format!("Failed to flush on close: {e}")))?;
        Ok(())
    }

    async fn stats(&self) -> MapletResult<StorageStats> {
        let mut stats = self.stats.read().await.clone();
        stats.total_keys = self.tree.len() as u64;
        stats.disk_usage = self.calculate_disk_usage();
        Ok(stats)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    #[tokio::test]
    async fn test_disk_storage_basic_operations() {
        let temp_dir = TempDir::new().unwrap();
        let config = StorageConfig {
            data_dir: temp_dir.path().to_string_lossy().to_string(),
            ..Default::default()
        };
        let storage = DiskStorage::new(config).unwrap();

        // Test set and get
        storage
            .set("key1".to_string(), b"value1".to_vec())
            .await
            .unwrap();
        let value = storage.get("key1").await.unwrap();
        assert_eq!(value, Some(b"value1".to_vec()));

        // Test exists
        assert!(storage.exists("key1").await.unwrap());
        assert!(!storage.exists("key2").await.unwrap());

        // Test delete
        let deleted = storage.delete("key1").await.unwrap();
        assert!(deleted);
        assert!(!storage.exists("key1").await.unwrap());
    }

    #[tokio::test]
    async fn test_disk_storage_persistence() {
        let temp_dir = TempDir::new().unwrap();
        let config = StorageConfig {
            data_dir: temp_dir.path().to_string_lossy().to_string(),
            ..Default::default()
        };

        // Create storage and write data
        {
            let storage = DiskStorage::new(config.clone()).unwrap();
            storage
                .set("key1".to_string(), b"value1".to_vec())
                .await
                .unwrap();
            storage.flush().await.unwrap();
            storage.close().await.unwrap();
        }

        // Reopen storage and verify data persists
        {
            let storage = DiskStorage::new(config).unwrap();
            let value = storage.get("key1").await.unwrap();
            assert_eq!(value, Some(b"value1".to_vec()));
            storage.close().await.unwrap();
        }
    }

    #[tokio::test]
    async fn test_disk_storage_stats() {
        let temp_dir = TempDir::new().unwrap();
        let config = StorageConfig {
            data_dir: temp_dir.path().to_string_lossy().to_string(),
            ..Default::default()
        };
        let storage = DiskStorage::new(config).unwrap();

        storage
            .set("key1".to_string(), b"value1".to_vec())
            .await
            .unwrap();

        // Flush to ensure data is written to disk
        storage.flush().await.unwrap();

        let stats = storage.stats().await.unwrap();
        assert_eq!(stats.total_keys, 1);
        assert!(stats.disk_usage > 0);
        assert!(stats.operations_count > 0);
    }
}