mappy_core/storage/
disk.rs1use super::{Storage, StorageConfig, StorageStats};
6use crate::{MapletError, MapletResult};
7use async_trait::async_trait;
8use sled::{Db, Tree};
9use std::path::Path;
10use std::sync::Arc;
11use std::time::Instant;
12use tokio::sync::RwLock;
13
14pub struct DiskStorage {
16 db: Arc<Db>,
18 tree: Tree,
20 #[allow(dead_code)]
22 config: StorageConfig,
23 stats: Arc<RwLock<StorageStats>>,
25 #[allow(dead_code)]
27 start_time: Instant,
28}
29
30impl DiskStorage {
31 pub fn new(config: StorageConfig) -> MapletResult<Self> {
33 std::fs::create_dir_all(&config.data_dir)
35 .map_err(|e| MapletError::Internal(format!("Failed to create data directory: {e}")))?;
36
37 let db_path = Path::new(&config.data_dir).join("mappy.db");
38 let db = sled::open(&db_path)
39 .map_err(|e| MapletError::Internal(format!("Failed to open database: {e}")))?;
40
41 let tree = db
42 .open_tree("data")
43 .map_err(|e| MapletError::Internal(format!("Failed to open tree: {e}")))?;
44
45 Ok(Self {
46 db: Arc::new(db),
47 tree,
48 config,
49 stats: Arc::new(RwLock::new(StorageStats::default())),
50 start_time: Instant::now(),
51 })
52 }
53
54 async fn update_stats<F>(&self, f: F)
56 where
57 F: FnOnce(&mut StorageStats),
58 {
59 let mut stats = self.stats.write().await;
60 f(&mut stats);
61 }
62
63 fn calculate_disk_usage(&self) -> u64 {
65 self.db
66 .size_on_disk()
67 .map_err(|_| MapletError::Internal("Failed to get disk usage".to_string()))
68 .unwrap_or(0)
69 }
70}
71
72#[async_trait]
73impl Storage for DiskStorage {
74 async fn get(&self, key: &str) -> MapletResult<Option<Vec<u8>>> {
75 let start = Instant::now();
76 let result = self
77 .tree
78 .get(key)
79 .map_err(|e| MapletError::Internal(format!("Failed to get key: {e}")))?
80 .map(|ivec| ivec.to_vec());
81 let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
82
83 self.update_stats(|stats| {
84 stats.operations_count += 1;
85 stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
86 })
87 .await;
88
89 Ok(result)
90 }
91
92 async fn set(&self, key: String, value: Vec<u8>) -> MapletResult<()> {
93 let start = Instant::now();
94
95 self.tree
96 .insert(&key, value)
97 .map_err(|e| MapletError::Internal(format!("Failed to set key: {e}")))?;
98
99 let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
100
101 self.update_stats(|stats| {
102 stats.operations_count += 1;
103 stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
104 stats.total_keys = self.tree.len() as u64;
105 stats.disk_usage = self.calculate_disk_usage();
106 })
107 .await;
108
109 Ok(())
110 }
111
112 async fn delete(&self, key: &str) -> MapletResult<bool> {
113 let start = Instant::now();
114 let existed = self
115 .tree
116 .remove(key)
117 .map_err(|e| MapletError::Internal(format!("Failed to delete key: {e}")))?
118 .is_some();
119 let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
120
121 self.update_stats(|stats| {
122 stats.operations_count += 1;
123 stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
124 stats.total_keys = self.tree.len() as u64;
125 stats.disk_usage = self.calculate_disk_usage();
126 })
127 .await;
128
129 Ok(existed)
130 }
131
132 async fn exists(&self, key: &str) -> MapletResult<bool> {
133 let start = Instant::now();
134 let exists = self
135 .tree
136 .contains_key(key)
137 .map_err(|e| MapletError::Internal(format!("Failed to check key existence: {e}")))?;
138 let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
139
140 self.update_stats(|stats| {
141 stats.operations_count += 1;
142 stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
143 })
144 .await;
145
146 Ok(exists)
147 }
148
149 async fn keys(&self) -> MapletResult<Vec<String>> {
150 let start = Instant::now();
151 let keys: Result<Vec<String>, _> = self
152 .tree
153 .iter()
154 .map(|result| {
155 let (key, _) =
156 result.map_err(|e| MapletError::Internal(format!("Failed to iterate: {e}")))?;
157 String::from_utf8(key.to_vec())
158 .map_err(|e| MapletError::Internal(format!("Invalid UTF-8 key: {e}")))
159 })
160 .collect();
161 let keys = keys?;
162 let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
163
164 self.update_stats(|stats| {
165 stats.operations_count += 1;
166 stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
167 })
168 .await;
169
170 Ok(keys)
171 }
172
173 async fn clear_database(&self) -> MapletResult<()> {
174 let start = Instant::now();
175 self.tree
176 .clear()
177 .map_err(|e| MapletError::Internal(format!("Failed to clear database: {e}")))?;
178 let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
179
180 self.update_stats(|stats| {
181 stats.operations_count += 1;
182 stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
183 stats.total_keys = 0;
184 stats.disk_usage = self.calculate_disk_usage();
185 })
186 .await;
187
188 Ok(())
189 }
190
191 async fn flush(&self) -> MapletResult<()> {
192 let start = Instant::now();
193 self.db
194 .flush()
195 .map_err(|e| MapletError::Internal(format!("Failed to flush: {e}")))?;
196 let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
197
198 self.update_stats(|stats| {
199 stats.operations_count += 1;
200 stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
201 })
202 .await;
203
204 Ok(())
205 }
206
207 async fn close(&self) -> MapletResult<()> {
208 self.db
209 .flush()
210 .map_err(|e| MapletError::Internal(format!("Failed to flush on close: {e}")))?;
211 Ok(())
212 }
213
214 async fn stats(&self) -> MapletResult<StorageStats> {
215 let mut stats = self.stats.read().await.clone();
216 stats.total_keys = self.tree.len() as u64;
217 stats.disk_usage = self.calculate_disk_usage();
218 Ok(stats)
219 }
220}
221
222#[cfg(test)]
223mod tests {
224 use super::*;
225 use tempfile::TempDir;
226
227 #[tokio::test]
228 async fn test_disk_storage_basic_operations() {
229 let temp_dir = TempDir::new().unwrap();
230 let config = StorageConfig {
231 data_dir: temp_dir.path().to_string_lossy().to_string(),
232 ..Default::default()
233 };
234 let storage = DiskStorage::new(config).unwrap();
235
236 storage
238 .set("key1".to_string(), b"value1".to_vec())
239 .await
240 .unwrap();
241 let value = storage.get("key1").await.unwrap();
242 assert_eq!(value, Some(b"value1".to_vec()));
243
244 assert!(storage.exists("key1").await.unwrap());
246 assert!(!storage.exists("key2").await.unwrap());
247
248 let deleted = storage.delete("key1").await.unwrap();
250 assert!(deleted);
251 assert!(!storage.exists("key1").await.unwrap());
252 }
253
254 #[tokio::test]
255 async fn test_disk_storage_persistence() {
256 let temp_dir = TempDir::new().unwrap();
257 let config = StorageConfig {
258 data_dir: temp_dir.path().to_string_lossy().to_string(),
259 ..Default::default()
260 };
261
262 {
264 let storage = DiskStorage::new(config.clone()).unwrap();
265 storage
266 .set("key1".to_string(), b"value1".to_vec())
267 .await
268 .unwrap();
269 storage.flush().await.unwrap();
270 storage.close().await.unwrap();
271 }
272
273 {
275 let storage = DiskStorage::new(config).unwrap();
276 let value = storage.get("key1").await.unwrap();
277 assert_eq!(value, Some(b"value1".to_vec()));
278 storage.close().await.unwrap();
279 }
280 }
281
282 #[tokio::test]
283 async fn test_disk_storage_stats() {
284 let temp_dir = TempDir::new().unwrap();
285 let config = StorageConfig {
286 data_dir: temp_dir.path().to_string_lossy().to_string(),
287 ..Default::default()
288 };
289 let storage = DiskStorage::new(config).unwrap();
290
291 storage
292 .set("key1".to_string(), b"value1".to_vec())
293 .await
294 .unwrap();
295
296 storage.flush().await.unwrap();
298
299 let stats = storage.stats().await.unwrap();
300 assert_eq!(stats.total_keys, 1);
301 assert!(stats.disk_usage > 0);
302 assert!(stats.operations_count > 0);
303 }
304}