mappy_core/storage/
aof.rs

1//! Append-Only File (AOF) storage backend
2//!
3//! Provides durability through append-only logging with periodic sync.
4
5use super::{Storage, StorageConfig, StorageStats};
6use crate::{MapletError, MapletResult};
7use async_trait::async_trait;
8use dashmap::DashMap;
9use serde_json;
10use std::fs::OpenOptions;
11use std::io::{BufRead, BufReader, Write};
12use std::path::Path;
13use std::sync::Arc;
14use std::time::Instant;
15use tokio::sync::RwLock;
16use tokio::time::{Duration, interval};
17
18/// AOF storage backend
19pub struct AOFStorage {
20    /// In-memory cache
21    cache: Arc<DashMap<String, Vec<u8>>>,
22    /// AOF file path
23    aof_path: std::path::PathBuf,
24    /// Configuration
25    config: StorageConfig,
26    /// Statistics
27    stats: Arc<RwLock<StorageStats>>,
28    /// Start time for latency calculation
29    #[allow(dead_code)]
30    start_time: Instant,
31    /// Background sync task handle
32    sync_handle: Option<tokio::task::JoinHandle<()>>,
33}
34
35#[derive(serde::Serialize, serde::Deserialize)]
36enum AOFEntry {
37    Set { key: String, value: Vec<u8> },
38    Delete { key: String },
39}
40
41impl AOFStorage {
42    /// Create a new AOF storage
43    pub fn new(config: StorageConfig) -> MapletResult<Self> {
44        // Ensure data directory exists
45        std::fs::create_dir_all(&config.data_dir)
46            .map_err(|e| MapletError::Internal(format!("Failed to create data directory: {e}")))?;
47
48        let aof_path = Path::new(&config.data_dir).join("mappy.aof");
49
50        let mut storage = Self {
51            cache: Arc::new(DashMap::new()),
52            aof_path,
53            config,
54            stats: Arc::new(RwLock::new(StorageStats::default())),
55            start_time: Instant::now(),
56            sync_handle: None,
57        };
58
59        // Load existing data from AOF file
60        storage.load_from_aof()?;
61
62        // Start background sync task
63        storage.start_sync_task();
64
65        Ok(storage)
66    }
67
68    /// Load data from AOF file
69    fn load_from_aof(&self) -> MapletResult<()> {
70        if !self.aof_path.exists() {
71            return Ok(());
72        }
73
74        let file = std::fs::File::open(&self.aof_path)
75            .map_err(|e| MapletError::Internal(format!("Failed to open AOF file: {e}")))?;
76
77        let reader = BufReader::new(file);
78
79        for line in reader.lines() {
80            let line =
81                line.map_err(|e| MapletError::Internal(format!("Failed to read AOF line: {e}")))?;
82            if line.trim().is_empty() {
83                continue;
84            }
85
86            let entry: AOFEntry = serde_json::from_str(&line)
87                .map_err(|e| MapletError::Internal(format!("Failed to parse AOF entry: {e}")))?;
88
89            match entry {
90                AOFEntry::Set { key, value } => {
91                    self.cache.insert(key, value);
92                }
93                AOFEntry::Delete { key } => {
94                    self.cache.remove(&key);
95                }
96            }
97        }
98
99        Ok(())
100    }
101
102    /// Append entry to AOF file
103    fn append_to_aof(&self, entry: AOFEntry) -> MapletResult<()> {
104        let line = serde_json::to_string(&entry)
105            .map_err(|e| MapletError::Internal(format!("Failed to serialize AOF entry: {e}")))?;
106
107        let mut file = OpenOptions::new()
108            .create(true)
109            .append(true)
110            .open(&self.aof_path)
111            .map_err(|e| {
112                MapletError::Internal(format!("Failed to open AOF file for append: {e}"))
113            })?;
114
115        writeln!(file, "{line}")
116            .map_err(|e| MapletError::Internal(format!("Failed to write to AOF file: {e}")))?;
117
118        Ok(())
119    }
120
121    /// Start background sync task
122    fn start_sync_task(&mut self) {
123        let _cache = Arc::clone(&self.cache);
124        let aof_path = self.aof_path.clone();
125        let sync_interval = Duration::from_secs(self.config.sync_interval);
126
127        let handle = tokio::spawn(async move {
128            let mut interval = interval(sync_interval);
129
130            loop {
131                interval.tick().await;
132
133                // Force sync to disk
134                if let Err(e) = std::fs::File::open(&aof_path).and_then(|f| f.sync_all()) {
135                    eprintln!("Failed to sync AOF file: {e}");
136                }
137            }
138        });
139
140        self.sync_handle = Some(handle);
141    }
142
143    /// Update statistics
144    async fn update_stats<F>(&self, f: F)
145    where
146        F: FnOnce(&mut StorageStats),
147    {
148        let mut stats = self.stats.write().await;
149        f(&mut stats);
150    }
151
152    /// Calculate memory usage
153    fn calculate_memory_usage(&self) -> u64 {
154        let mut total = 0;
155        for entry in self.cache.iter() {
156            total += entry.key().len() + entry.value().len();
157        }
158        total as u64
159    }
160
161    /// Calculate disk usage
162    fn calculate_disk_usage(&self) -> u64 {
163        if self.aof_path.exists() {
164            std::fs::metadata(&self.aof_path)
165                .map(|m| m.len())
166                .unwrap_or(0)
167        } else {
168            0
169        }
170    }
171}
172
173#[async_trait]
174impl Storage for AOFStorage {
175    async fn get(&self, key: &str) -> MapletResult<Option<Vec<u8>>> {
176        let start = Instant::now();
177        let result = self.cache.get(key).map(|entry| entry.value().clone());
178        let latency = start.elapsed().as_micros() as u64;
179
180        self.update_stats(|stats| {
181            stats.operations_count += 1;
182            stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
183        })
184        .await;
185
186        Ok(result)
187    }
188
189    async fn set(&self, key: String, value: Vec<u8>) -> MapletResult<()> {
190        let start = Instant::now();
191
192        // Update cache
193        self.cache.insert(key.clone(), value.clone());
194
195        // Append to AOF
196        let entry = AOFEntry::Set { key, value };
197        self.append_to_aof(entry)?;
198
199        let latency = start.elapsed().as_micros() as u64;
200
201        self.update_stats(|stats| {
202            stats.operations_count += 1;
203            stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
204            stats.total_keys = self.cache.len() as u64;
205            stats.memory_usage = self.calculate_memory_usage();
206            stats.disk_usage = self.calculate_disk_usage();
207        })
208        .await;
209
210        Ok(())
211    }
212
213    async fn delete(&self, key: &str) -> MapletResult<bool> {
214        let start = Instant::now();
215        let existed = self.cache.remove(key).is_some();
216
217        if existed {
218            // Append delete to AOF
219            let entry = AOFEntry::Delete {
220                key: key.to_string(),
221            };
222            self.append_to_aof(entry)?;
223        }
224
225        let latency = start.elapsed().as_micros() as u64;
226
227        self.update_stats(|stats| {
228            stats.operations_count += 1;
229            stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
230            stats.total_keys = self.cache.len() as u64;
231            stats.memory_usage = self.calculate_memory_usage();
232            stats.disk_usage = self.calculate_disk_usage();
233        })
234        .await;
235
236        Ok(existed)
237    }
238
239    async fn exists(&self, key: &str) -> MapletResult<bool> {
240        let start = Instant::now();
241        let exists = self.cache.contains_key(key);
242        let latency = start.elapsed().as_micros() as u64;
243
244        self.update_stats(|stats| {
245            stats.operations_count += 1;
246            stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
247        })
248        .await;
249
250        Ok(exists)
251    }
252
253    async fn keys(&self) -> MapletResult<Vec<String>> {
254        let start = Instant::now();
255        let keys: Vec<String> = self.cache.iter().map(|entry| entry.key().clone()).collect();
256        let latency = start.elapsed().as_micros() as u64;
257
258        self.update_stats(|stats| {
259            stats.operations_count += 1;
260            stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
261        })
262        .await;
263
264        Ok(keys)
265    }
266
267    async fn clear_database(&self) -> MapletResult<()> {
268        let start = Instant::now();
269        self.cache.clear();
270
271        // Clear AOF file
272        std::fs::write(&self.aof_path, "")
273            .map_err(|e| MapletError::Internal(format!("Failed to clear AOF file: {e}")))?;
274
275        let latency = start.elapsed().as_micros() as u64;
276
277        self.update_stats(|stats| {
278            stats.operations_count += 1;
279            stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
280            stats.total_keys = 0;
281            stats.memory_usage = 0;
282            stats.disk_usage = 0;
283        })
284        .await;
285
286        Ok(())
287    }
288
289    async fn flush(&self) -> MapletResult<()> {
290        let start = Instant::now();
291
292        // Force sync AOF file to disk
293        std::fs::File::open(&self.aof_path)
294            .and_then(|f| f.sync_all())
295            .map_err(|e| MapletError::Internal(format!("Failed to sync AOF file: {e}")))?;
296
297        let latency = start.elapsed().as_micros() as u64;
298
299        self.update_stats(|stats| {
300            stats.operations_count += 1;
301            stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
302        })
303        .await;
304
305        Ok(())
306    }
307
308    async fn close(&self) -> MapletResult<()> {
309        // Cancel background sync task
310        if let Some(handle) = &self.sync_handle {
311            handle.abort();
312        }
313
314        // Final sync
315        self.flush().await?;
316
317        Ok(())
318    }
319
320    async fn stats(&self) -> MapletResult<StorageStats> {
321        let mut stats = self.stats.read().await.clone();
322        stats.total_keys = self.cache.len() as u64;
323        stats.memory_usage = self.calculate_memory_usage();
324        stats.disk_usage = self.calculate_disk_usage();
325        Ok(stats)
326    }
327}
328
329#[cfg(test)]
330mod tests {
331    use super::*;
332    use tempfile::TempDir;
333
334    #[tokio::test]
335    async fn test_aof_storage_basic_operations() {
336        let temp_dir = TempDir::new().unwrap();
337        let config = StorageConfig {
338            data_dir: temp_dir.path().to_string_lossy().to_string(),
339            sync_interval: 1,
340            ..Default::default()
341        };
342        let storage = AOFStorage::new(config).unwrap();
343
344        // Test set and get
345        storage
346            .set("key1".to_string(), b"value1".to_vec())
347            .await
348            .unwrap();
349        let value = storage.get("key1").await.unwrap();
350        assert_eq!(value, Some(b"value1".to_vec()));
351
352        // Test exists
353        assert!(storage.exists("key1").await.unwrap());
354        assert!(!storage.exists("key2").await.unwrap());
355
356        // Test delete
357        let deleted = storage.delete("key1").await.unwrap();
358        assert!(deleted);
359        assert!(!storage.exists("key1").await.unwrap());
360    }
361
362    #[tokio::test]
363    async fn test_aof_storage_persistence() {
364        let temp_dir = TempDir::new().unwrap();
365        let config = StorageConfig {
366            data_dir: temp_dir.path().to_string_lossy().to_string(),
367            sync_interval: 1,
368            ..Default::default()
369        };
370
371        // Create storage and write data
372        {
373            let storage = AOFStorage::new(config.clone()).unwrap();
374            storage
375                .set("key1".to_string(), b"value1".to_vec())
376                .await
377                .unwrap();
378            storage.flush().await.unwrap();
379        }
380
381        // Reopen storage and verify data persists
382        {
383            let storage = AOFStorage::new(config).unwrap();
384            let value = storage.get("key1").await.unwrap();
385            assert_eq!(value, Some(b"value1".to_vec()));
386        }
387    }
388
389    #[tokio::test]
390    async fn test_aof_storage_stats() {
391        let temp_dir = TempDir::new().unwrap();
392        let config = StorageConfig {
393            data_dir: temp_dir.path().to_string_lossy().to_string(),
394            sync_interval: 1,
395            ..Default::default()
396        };
397        let storage = AOFStorage::new(config).unwrap();
398
399        storage
400            .set("key1".to_string(), b"value1".to_vec())
401            .await
402            .unwrap();
403
404        let stats = storage.stats().await.unwrap();
405        assert_eq!(stats.total_keys, 1);
406        assert!(stats.memory_usage > 0);
407        assert!(stats.disk_usage > 0);
408        assert!(stats.operations_count > 0);
409    }
410}