mappy_core/storage/
aof.rs1use super::{Storage, StorageConfig, StorageStats};
6use crate::{MapletError, MapletResult};
7use async_trait::async_trait;
8use dashmap::DashMap;
9use serde_json;
10use std::fs::OpenOptions;
11use std::io::{BufRead, BufReader, Write};
12use std::path::Path;
13use std::sync::Arc;
14use std::time::Instant;
15use tokio::sync::RwLock;
16use tokio::time::{Duration, interval};
17
18pub struct AOFStorage {
20 cache: Arc<DashMap<String, Vec<u8>>>,
22 aof_path: std::path::PathBuf,
24 config: StorageConfig,
26 stats: Arc<RwLock<StorageStats>>,
28 #[allow(dead_code)]
30 start_time: Instant,
31 sync_handle: Option<tokio::task::JoinHandle<()>>,
33}
34
35#[derive(serde::Serialize, serde::Deserialize)]
36enum AOFEntry {
37 Set { key: String, value: Vec<u8> },
38 Delete { key: String },
39}
40
41impl AOFStorage {
42 pub fn new(config: StorageConfig) -> MapletResult<Self> {
44 std::fs::create_dir_all(&config.data_dir)
46 .map_err(|e| MapletError::Internal(format!("Failed to create data directory: {e}")))?;
47
48 let aof_path = Path::new(&config.data_dir).join("mappy.aof");
49
50 let mut storage = Self {
51 cache: Arc::new(DashMap::new()),
52 aof_path,
53 config,
54 stats: Arc::new(RwLock::new(StorageStats::default())),
55 start_time: Instant::now(),
56 sync_handle: None,
57 };
58
59 storage.load_from_aof()?;
61
62 storage.start_sync_task();
64
65 Ok(storage)
66 }
67
68 fn load_from_aof(&self) -> MapletResult<()> {
70 if !self.aof_path.exists() {
71 return Ok(());
72 }
73
74 let file = std::fs::File::open(&self.aof_path)
75 .map_err(|e| MapletError::Internal(format!("Failed to open AOF file: {e}")))?;
76
77 let reader = BufReader::new(file);
78
79 for line in reader.lines() {
80 let line =
81 line.map_err(|e| MapletError::Internal(format!("Failed to read AOF line: {e}")))?;
82 if line.trim().is_empty() {
83 continue;
84 }
85
86 let entry: AOFEntry = serde_json::from_str(&line)
87 .map_err(|e| MapletError::Internal(format!("Failed to parse AOF entry: {e}")))?;
88
89 match entry {
90 AOFEntry::Set { key, value } => {
91 self.cache.insert(key, value);
92 }
93 AOFEntry::Delete { key } => {
94 self.cache.remove(&key);
95 }
96 }
97 }
98
99 Ok(())
100 }
101
102 fn append_to_aof(&self, entry: AOFEntry) -> MapletResult<()> {
104 let line = serde_json::to_string(&entry)
105 .map_err(|e| MapletError::Internal(format!("Failed to serialize AOF entry: {e}")))?;
106
107 let mut file = OpenOptions::new()
108 .create(true)
109 .append(true)
110 .open(&self.aof_path)
111 .map_err(|e| {
112 MapletError::Internal(format!("Failed to open AOF file for append: {e}"))
113 })?;
114
115 writeln!(file, "{line}")
116 .map_err(|e| MapletError::Internal(format!("Failed to write to AOF file: {e}")))?;
117
118 Ok(())
119 }
120
121 fn start_sync_task(&mut self) {
123 let _cache = Arc::clone(&self.cache);
124 let aof_path = self.aof_path.clone();
125 let sync_interval = Duration::from_secs(self.config.sync_interval);
126
127 let handle = tokio::spawn(async move {
128 let mut interval = interval(sync_interval);
129
130 loop {
131 interval.tick().await;
132
133 if let Err(e) = std::fs::File::open(&aof_path).and_then(|f| f.sync_all()) {
135 eprintln!("Failed to sync AOF file: {e}");
136 }
137 }
138 });
139
140 self.sync_handle = Some(handle);
141 }
142
143 async fn update_stats<F>(&self, f: F)
145 where
146 F: FnOnce(&mut StorageStats),
147 {
148 let mut stats = self.stats.write().await;
149 f(&mut stats);
150 }
151
152 fn calculate_memory_usage(&self) -> u64 {
154 let mut total = 0;
155 for entry in self.cache.iter() {
156 total += entry.key().len() + entry.value().len();
157 }
158 total as u64
159 }
160
161 fn calculate_disk_usage(&self) -> u64 {
163 if self.aof_path.exists() {
164 std::fs::metadata(&self.aof_path)
165 .map(|m| m.len())
166 .unwrap_or(0)
167 } else {
168 0
169 }
170 }
171}
172
173#[async_trait]
174impl Storage for AOFStorage {
175 async fn get(&self, key: &str) -> MapletResult<Option<Vec<u8>>> {
176 let start = Instant::now();
177 let result = self.cache.get(key).map(|entry| entry.value().clone());
178 let latency = start.elapsed().as_micros() as u64;
179
180 self.update_stats(|stats| {
181 stats.operations_count += 1;
182 stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
183 })
184 .await;
185
186 Ok(result)
187 }
188
189 async fn set(&self, key: String, value: Vec<u8>) -> MapletResult<()> {
190 let start = Instant::now();
191
192 self.cache.insert(key.clone(), value.clone());
194
195 let entry = AOFEntry::Set { key, value };
197 self.append_to_aof(entry)?;
198
199 let latency = start.elapsed().as_micros() as u64;
200
201 self.update_stats(|stats| {
202 stats.operations_count += 1;
203 stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
204 stats.total_keys = self.cache.len() as u64;
205 stats.memory_usage = self.calculate_memory_usage();
206 stats.disk_usage = self.calculate_disk_usage();
207 })
208 .await;
209
210 Ok(())
211 }
212
213 async fn delete(&self, key: &str) -> MapletResult<bool> {
214 let start = Instant::now();
215 let existed = self.cache.remove(key).is_some();
216
217 if existed {
218 let entry = AOFEntry::Delete {
220 key: key.to_string(),
221 };
222 self.append_to_aof(entry)?;
223 }
224
225 let latency = start.elapsed().as_micros() as u64;
226
227 self.update_stats(|stats| {
228 stats.operations_count += 1;
229 stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
230 stats.total_keys = self.cache.len() as u64;
231 stats.memory_usage = self.calculate_memory_usage();
232 stats.disk_usage = self.calculate_disk_usage();
233 })
234 .await;
235
236 Ok(existed)
237 }
238
239 async fn exists(&self, key: &str) -> MapletResult<bool> {
240 let start = Instant::now();
241 let exists = self.cache.contains_key(key);
242 let latency = start.elapsed().as_micros() as u64;
243
244 self.update_stats(|stats| {
245 stats.operations_count += 1;
246 stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
247 })
248 .await;
249
250 Ok(exists)
251 }
252
253 async fn keys(&self) -> MapletResult<Vec<String>> {
254 let start = Instant::now();
255 let keys: Vec<String> = self.cache.iter().map(|entry| entry.key().clone()).collect();
256 let latency = start.elapsed().as_micros() as u64;
257
258 self.update_stats(|stats| {
259 stats.operations_count += 1;
260 stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
261 })
262 .await;
263
264 Ok(keys)
265 }
266
267 async fn clear_database(&self) -> MapletResult<()> {
268 let start = Instant::now();
269 self.cache.clear();
270
271 std::fs::write(&self.aof_path, "")
273 .map_err(|e| MapletError::Internal(format!("Failed to clear AOF file: {e}")))?;
274
275 let latency = start.elapsed().as_micros() as u64;
276
277 self.update_stats(|stats| {
278 stats.operations_count += 1;
279 stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
280 stats.total_keys = 0;
281 stats.memory_usage = 0;
282 stats.disk_usage = 0;
283 })
284 .await;
285
286 Ok(())
287 }
288
289 async fn flush(&self) -> MapletResult<()> {
290 let start = Instant::now();
291
292 std::fs::File::open(&self.aof_path)
294 .and_then(|f| f.sync_all())
295 .map_err(|e| MapletError::Internal(format!("Failed to sync AOF file: {e}")))?;
296
297 let latency = start.elapsed().as_micros() as u64;
298
299 self.update_stats(|stats| {
300 stats.operations_count += 1;
301 stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
302 })
303 .await;
304
305 Ok(())
306 }
307
308 async fn close(&self) -> MapletResult<()> {
309 if let Some(handle) = &self.sync_handle {
311 handle.abort();
312 }
313
314 self.flush().await?;
316
317 Ok(())
318 }
319
320 async fn stats(&self) -> MapletResult<StorageStats> {
321 let mut stats = self.stats.read().await.clone();
322 stats.total_keys = self.cache.len() as u64;
323 stats.memory_usage = self.calculate_memory_usage();
324 stats.disk_usage = self.calculate_disk_usage();
325 Ok(stats)
326 }
327}
328
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a configuration that keeps the AOF inside `dir` and uses a
    /// one-second sync interval.
    fn config_in(dir: &TempDir) -> StorageConfig {
        StorageConfig {
            data_dir: dir.path().to_string_lossy().to_string(),
            sync_interval: 1,
            ..Default::default()
        }
    }

    /// Set, get, exists and delete behave as expected on a fresh store.
    #[tokio::test]
    async fn test_aof_storage_basic_operations() {
        let dir = TempDir::new().unwrap();
        let store = AOFStorage::new(config_in(&dir)).unwrap();

        // Write a value and read it back.
        store
            .set("key1".to_string(), b"value1".to_vec())
            .await
            .unwrap();
        let fetched = store.get("key1").await.unwrap();
        assert_eq!(fetched, Some(b"value1".to_vec()));

        // Presence checks for a stored and an unknown key.
        assert!(store.exists("key1").await.unwrap());
        assert!(!store.exists("key2").await.unwrap());

        // Deleting reports success and the key is gone afterwards.
        let removed = store.delete("key1").await.unwrap();
        assert!(removed);
        assert!(!store.exists("key1").await.unwrap());
    }

    /// Data written by one store instance is visible to a later instance
    /// opened on the same directory.
    #[tokio::test]
    async fn test_aof_storage_persistence() {
        let dir = TempDir::new().unwrap();

        // First instance: write and flush, then drop.
        {
            let writer = AOFStorage::new(config_in(&dir)).unwrap();
            writer
                .set("key1".to_string(), b"value1".to_vec())
                .await
                .unwrap();
            writer.flush().await.unwrap();
        }

        // Second instance: the value must have been replayed from the AOF.
        {
            let reader = AOFStorage::new(config_in(&dir)).unwrap();
            let fetched = reader.get("key1").await.unwrap();
            assert_eq!(fetched, Some(b"value1".to_vec()));
        }
    }

    /// Statistics reflect a single write.
    #[tokio::test]
    async fn test_aof_storage_stats() {
        let dir = TempDir::new().unwrap();
        let store = AOFStorage::new(config_in(&dir)).unwrap();

        store
            .set("key1".to_string(), b"value1".to_vec())
            .await
            .unwrap();

        let snapshot = store.stats().await.unwrap();
        assert_eq!(snapshot.total_keys, 1);
        assert!(snapshot.memory_usage > 0);
        assert!(snapshot.disk_usage > 0);
        assert!(snapshot.operations_count > 0);
    }
}