mappy_core/storage/
disk.rs1use super::{Storage, StorageStats, StorageConfig};
6use crate::{MapletError, MapletResult};
7use sled::{Db, Tree};
8use std::sync::Arc;
9use tokio::sync::RwLock;
10use std::time::Instant;
11use std::path::Path;
12use async_trait::async_trait;
13
14pub struct DiskStorage {
16 db: Arc<Db>,
18 tree: Tree,
20 #[allow(dead_code)]
22 config: StorageConfig,
23 stats: Arc<RwLock<StorageStats>>,
25 #[allow(dead_code)]
27 start_time: Instant,
28}
29
30impl DiskStorage {
31 pub fn new(config: StorageConfig) -> MapletResult<Self> {
33 std::fs::create_dir_all(&config.data_dir)
35 .map_err(|e| MapletError::Internal(format!("Failed to create data directory: {e}")))?;
36
37 let db_path = Path::new(&config.data_dir).join("mappy.db");
38 let db = sled::open(&db_path)
39 .map_err(|e| MapletError::Internal(format!("Failed to open database: {e}")))?;
40
41 let tree = db.open_tree("data")
42 .map_err(|e| MapletError::Internal(format!("Failed to open tree: {e}")))?;
43
44 Ok(Self {
45 db: Arc::new(db),
46 tree,
47 config,
48 stats: Arc::new(RwLock::new(StorageStats::default())),
49 start_time: Instant::now(),
50 })
51 }
52
53 async fn update_stats<F>(&self, f: F)
55 where
56 F: FnOnce(&mut StorageStats),
57 {
58 let mut stats = self.stats.write().await;
59 f(&mut stats);
60 }
61
62 fn calculate_disk_usage(&self) -> u64 {
64 self.db.size_on_disk()
65 .map_err(|_| MapletError::Internal("Failed to get disk usage".to_string()))
66 .unwrap_or(0)
67 }
68}
69
70#[async_trait]
71impl Storage for DiskStorage {
72 async fn get(&self, key: &str) -> MapletResult<Option<Vec<u8>>> {
73 let start = Instant::now();
74 let result = self.tree.get(key)
75 .map_err(|e| MapletError::Internal(format!("Failed to get key: {e}")))?
76 .map(|ivec| ivec.to_vec());
77 let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
78
79 self.update_stats(|stats| {
80 stats.operations_count += 1;
81 stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
82 }).await;
83
84 Ok(result)
85 }
86
87 async fn set(&self, key: String, value: Vec<u8>) -> MapletResult<()> {
88 let start = Instant::now();
89
90 self.tree.insert(&key, value)
91 .map_err(|e| MapletError::Internal(format!("Failed to set key: {e}")))?;
92
93 let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
94
95 self.update_stats(|stats| {
96 stats.operations_count += 1;
97 stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
98 stats.total_keys = self.tree.len() as u64;
99 stats.disk_usage = self.calculate_disk_usage();
100 }).await;
101
102 Ok(())
103 }
104
105 async fn delete(&self, key: &str) -> MapletResult<bool> {
106 let start = Instant::now();
107 let existed = self.tree.remove(key)
108 .map_err(|e| MapletError::Internal(format!("Failed to delete key: {e}")))?
109 .is_some();
110 let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
111
112 self.update_stats(|stats| {
113 stats.operations_count += 1;
114 stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
115 stats.total_keys = self.tree.len() as u64;
116 stats.disk_usage = self.calculate_disk_usage();
117 }).await;
118
119 Ok(existed)
120 }
121
122 async fn exists(&self, key: &str) -> MapletResult<bool> {
123 let start = Instant::now();
124 let exists = self.tree.contains_key(key)
125 .map_err(|e| MapletError::Internal(format!("Failed to check key existence: {e}")))?;
126 let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
127
128 self.update_stats(|stats| {
129 stats.operations_count += 1;
130 stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
131 }).await;
132
133 Ok(exists)
134 }
135
136 async fn keys(&self) -> MapletResult<Vec<String>> {
137 let start = Instant::now();
138 let keys: Result<Vec<String>, _> = self.tree.iter()
139 .map(|result| {
140 let (key, _) = result.map_err(|e| MapletError::Internal(format!("Failed to iterate: {e}")))?;
141 String::from_utf8(key.to_vec())
142 .map_err(|e| MapletError::Internal(format!("Invalid UTF-8 key: {e}")))
143 })
144 .collect();
145 let keys = keys?;
146 let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
147
148 self.update_stats(|stats| {
149 stats.operations_count += 1;
150 stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
151 }).await;
152
153 Ok(keys)
154 }
155
156 async fn clear_database(&self) -> MapletResult<()> {
157 let start = Instant::now();
158 self.tree.clear()
159 .map_err(|e| MapletError::Internal(format!("Failed to clear database: {}", e)))?;
160 let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
161
162 self.update_stats(|stats| {
163 stats.operations_count += 1;
164 stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
165 stats.total_keys = 0;
166 stats.disk_usage = self.calculate_disk_usage();
167 }).await;
168
169 Ok(())
170 }
171
172 async fn flush(&self) -> MapletResult<()> {
173 let start = Instant::now();
174 self.db.flush()
175 .map_err(|e| MapletError::Internal(format!("Failed to flush: {}", e)))?;
176 let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
177
178 self.update_stats(|stats| {
179 stats.operations_count += 1;
180 stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
181 }).await;
182
183 Ok(())
184 }
185
186 async fn close(&self) -> MapletResult<()> {
187 self.db.flush()
188 .map_err(|e| MapletError::Internal(format!("Failed to flush on close: {}", e)))?;
189 Ok(())
190 }
191
192 async fn stats(&self) -> MapletResult<StorageStats> {
193 let mut stats = self.stats.read().await.clone();
194 stats.total_keys = self.tree.len() as u64;
195 stats.disk_usage = self.calculate_disk_usage();
196 Ok(stats)
197 }
198}
199
200#[cfg(test)]
201mod tests {
202 use super::*;
203 use tempfile::TempDir;
204
205 #[tokio::test]
206 async fn test_disk_storage_basic_operations() {
207 let temp_dir = TempDir::new().unwrap();
208 let config = StorageConfig {
209 data_dir: temp_dir.path().to_string_lossy().to_string(),
210 ..Default::default()
211 };
212 let storage = DiskStorage::new(config).unwrap();
213
214 storage.set("key1".to_string(), b"value1".to_vec()).await.unwrap();
216 let value = storage.get("key1").await.unwrap();
217 assert_eq!(value, Some(b"value1".to_vec()));
218
219 assert!(storage.exists("key1").await.unwrap());
221 assert!(!storage.exists("key2").await.unwrap());
222
223 let deleted = storage.delete("key1").await.unwrap();
225 assert!(deleted);
226 assert!(!storage.exists("key1").await.unwrap());
227 }
228
229 #[tokio::test]
230 async fn test_disk_storage_persistence() {
231 let temp_dir = TempDir::new().unwrap();
232 let config = StorageConfig {
233 data_dir: temp_dir.path().to_string_lossy().to_string(),
234 ..Default::default()
235 };
236
237 {
239 let storage = DiskStorage::new(config.clone()).unwrap();
240 storage.set("key1".to_string(), b"value1".to_vec()).await.unwrap();
241 storage.flush().await.unwrap();
242 storage.close().await.unwrap();
243 }
244
245 {
247 let storage = DiskStorage::new(config).unwrap();
248 let value = storage.get("key1").await.unwrap();
249 assert_eq!(value, Some(b"value1".to_vec()));
250 storage.close().await.unwrap();
251 }
252 }
253
254 #[tokio::test]
255 async fn test_disk_storage_stats() {
256 let temp_dir = TempDir::new().unwrap();
257 let config = StorageConfig {
258 data_dir: temp_dir.path().to_string_lossy().to_string(),
259 ..Default::default()
260 };
261 let storage = DiskStorage::new(config).unwrap();
262
263 storage.set("key1".to_string(), b"value1".to_vec()).await.unwrap();
264
265 storage.flush().await.unwrap();
267
268 let stats = storage.stats().await.unwrap();
269 assert_eq!(stats.total_keys, 1);
270 assert!(stats.disk_usage > 0);
271 assert!(stats.operations_count > 0);
272 }
273}