mappy_core/storage/
aof.rs

1//! Append-Only File (AOF) storage backend
2//! 
3//! Provides durability through append-only logging with periodic sync.
4
5use super::{Storage, StorageStats, StorageConfig};
6use crate::{MapletError, MapletResult};
7use dashmap::DashMap;
8use std::sync::Arc;
9use tokio::sync::RwLock;
10use std::time::Instant;
11use std::path::Path;
12use std::fs::OpenOptions;
13use std::io::{BufReader, Write, BufRead};
14use serde_json;
15use async_trait::async_trait;
16use tokio::time::{interval, Duration};
17
18/// AOF storage backend
19pub struct AOFStorage {
20    /// In-memory cache
21    cache: Arc<DashMap<String, Vec<u8>>>,
22    /// AOF file path
23    aof_path: std::path::PathBuf,
24    /// Configuration
25    config: StorageConfig,
26    /// Statistics
27    stats: Arc<RwLock<StorageStats>>,
28    /// Start time for latency calculation
29    #[allow(dead_code)]
30    start_time: Instant,
31    /// Background sync task handle
32    sync_handle: Option<tokio::task::JoinHandle<()>>,
33}
34
35#[derive(serde::Serialize, serde::Deserialize)]
36enum AOFEntry {
37    Set { key: String, value: Vec<u8> },
38    Delete { key: String },
39}
40
41impl AOFStorage {
42    /// Create a new AOF storage
43    pub fn new(config: StorageConfig) -> MapletResult<Self> {
44        // Ensure data directory exists
45        std::fs::create_dir_all(&config.data_dir)
46            .map_err(|e| MapletError::Internal(format!("Failed to create data directory: {}", e)))?;
47        
48        let aof_path = Path::new(&config.data_dir).join("mappy.aof");
49        
50        let mut storage = Self {
51            cache: Arc::new(DashMap::new()),
52            aof_path: aof_path.clone(),
53            config,
54            stats: Arc::new(RwLock::new(StorageStats::default())),
55            start_time: Instant::now(),
56            sync_handle: None,
57        };
58        
59        // Load existing data from AOF file
60        storage.load_from_aof()?;
61        
62        // Start background sync task
63        storage.start_sync_task();
64        
65        Ok(storage)
66    }
67    
68    /// Load data from AOF file
69    fn load_from_aof(&self) -> MapletResult<()> {
70        if !self.aof_path.exists() {
71            return Ok(());
72        }
73        
74        let file = std::fs::File::open(&self.aof_path)
75            .map_err(|e| MapletError::Internal(format!("Failed to open AOF file: {}", e)))?;
76        
77        let reader = BufReader::new(file);
78        
79        for line in reader.lines() {
80            let line = line.map_err(|e| MapletError::Internal(format!("Failed to read AOF line: {}", e)))?;
81            if line.trim().is_empty() {
82                continue;
83            }
84            
85            let entry: AOFEntry = serde_json::from_str(&line)
86                .map_err(|e| MapletError::Internal(format!("Failed to parse AOF entry: {}", e)))?;
87            
88            match entry {
89                AOFEntry::Set { key, value } => {
90                    self.cache.insert(key, value);
91                }
92                AOFEntry::Delete { key } => {
93                    self.cache.remove(&key);
94                }
95            }
96        }
97        
98        Ok(())
99    }
100    
101    /// Append entry to AOF file
102    fn append_to_aof(&self, entry: AOFEntry) -> MapletResult<()> {
103        let line = serde_json::to_string(&entry)
104            .map_err(|e| MapletError::Internal(format!("Failed to serialize AOF entry: {}", e)))?;
105        
106        let mut file = OpenOptions::new()
107            .create(true)
108            .append(true)
109            .open(&self.aof_path)
110            .map_err(|e| MapletError::Internal(format!("Failed to open AOF file for append: {}", e)))?;
111        
112        writeln!(file, "{}", line)
113            .map_err(|e| MapletError::Internal(format!("Failed to write to AOF file: {}", e)))?;
114        
115        Ok(())
116    }
117    
118    /// Start background sync task
119    fn start_sync_task(&mut self) {
120        let _cache = Arc::clone(&self.cache);
121        let aof_path = self.aof_path.clone();
122        let sync_interval = Duration::from_secs(self.config.sync_interval);
123        
124        let handle = tokio::spawn(async move {
125            let mut interval = interval(sync_interval);
126            
127            loop {
128                interval.tick().await;
129                
130                // Force sync to disk
131                if let Err(e) = std::fs::File::open(&aof_path)
132                    .and_then(|f| f.sync_all())
133                {
134                    eprintln!("Failed to sync AOF file: {}", e);
135                }
136            }
137        });
138        
139        self.sync_handle = Some(handle);
140    }
141    
142    /// Update statistics
143    async fn update_stats<F>(&self, f: F) 
144    where
145        F: FnOnce(&mut StorageStats),
146    {
147        let mut stats = self.stats.write().await;
148        f(&mut stats);
149    }
150    
151    /// Calculate memory usage
152    fn calculate_memory_usage(&self) -> u64 {
153        let mut total = 0;
154        for entry in self.cache.iter() {
155            total += entry.key().len() + entry.value().len();
156        }
157        total as u64
158    }
159    
160    /// Calculate disk usage
161    fn calculate_disk_usage(&self) -> u64 {
162        if self.aof_path.exists() {
163            std::fs::metadata(&self.aof_path)
164                .map(|m| m.len())
165                .unwrap_or(0)
166        } else {
167            0
168        }
169    }
170}
171
172#[async_trait]
173impl Storage for AOFStorage {
174    async fn get(&self, key: &str) -> MapletResult<Option<Vec<u8>>> {
175        let start = Instant::now();
176        let result = self.cache.get(key).map(|entry| entry.value().clone());
177        let latency = start.elapsed().as_micros() as u64;
178        
179        self.update_stats(|stats| {
180            stats.operations_count += 1;
181            stats.avg_latency_us = (stats.avg_latency_us + latency) / 2;
182        }).await;
183        
184        Ok(result)
185    }
186    
187    async fn set(&self, key: String, value: Vec<u8>) -> MapletResult<()> {
188        let start = Instant::now();
189        
190        // Update cache
191        self.cache.insert(key.clone(), value.clone());
192        
193        // Append to AOF
194        let entry = AOFEntry::Set { key, value };
195        self.append_to_aof(entry)?;
196        
197        let latency = start.elapsed().as_micros() as u64;
198        
199        self.update_stats(|stats| {
200            stats.operations_count += 1;
201            stats.avg_latency_us = (stats.avg_latency_us + latency) / 2;
202            stats.total_keys = self.cache.len() as u64;
203            stats.memory_usage = self.calculate_memory_usage();
204            stats.disk_usage = self.calculate_disk_usage();
205        }).await;
206        
207        Ok(())
208    }
209    
210    async fn delete(&self, key: &str) -> MapletResult<bool> {
211        let start = Instant::now();
212        let existed = self.cache.remove(key).is_some();
213        
214        if existed {
215            // Append delete to AOF
216            let entry = AOFEntry::Delete { key: key.to_string() };
217            self.append_to_aof(entry)?;
218        }
219        
220        let latency = start.elapsed().as_micros() as u64;
221        
222        self.update_stats(|stats| {
223            stats.operations_count += 1;
224            stats.avg_latency_us = (stats.avg_latency_us + latency) / 2;
225            stats.total_keys = self.cache.len() as u64;
226            stats.memory_usage = self.calculate_memory_usage();
227            stats.disk_usage = self.calculate_disk_usage();
228        }).await;
229        
230        Ok(existed)
231    }
232    
233    async fn exists(&self, key: &str) -> MapletResult<bool> {
234        let start = Instant::now();
235        let exists = self.cache.contains_key(key);
236        let latency = start.elapsed().as_micros() as u64;
237        
238        self.update_stats(|stats| {
239            stats.operations_count += 1;
240            stats.avg_latency_us = (stats.avg_latency_us + latency) / 2;
241        }).await;
242        
243        Ok(exists)
244    }
245    
246    async fn keys(&self) -> MapletResult<Vec<String>> {
247        let start = Instant::now();
248        let keys: Vec<String> = self.cache.iter().map(|entry| entry.key().clone()).collect();
249        let latency = start.elapsed().as_micros() as u64;
250        
251        self.update_stats(|stats| {
252            stats.operations_count += 1;
253            stats.avg_latency_us = (stats.avg_latency_us + latency) / 2;
254        }).await;
255        
256        Ok(keys)
257    }
258    
259    async fn clear_database(&self) -> MapletResult<()> {
260        let start = Instant::now();
261        self.cache.clear();
262        
263        // Clear AOF file
264        std::fs::write(&self.aof_path, "")
265            .map_err(|e| MapletError::Internal(format!("Failed to clear AOF file: {}", e)))?;
266        
267        let latency = start.elapsed().as_micros() as u64;
268        
269        self.update_stats(|stats| {
270            stats.operations_count += 1;
271            stats.avg_latency_us = (stats.avg_latency_us + latency) / 2;
272            stats.total_keys = 0;
273            stats.memory_usage = 0;
274            stats.disk_usage = 0;
275        }).await;
276        
277        Ok(())
278    }
279    
280    async fn flush(&self) -> MapletResult<()> {
281        let start = Instant::now();
282        
283        // Force sync AOF file to disk
284        std::fs::File::open(&self.aof_path)
285            .and_then(|f| f.sync_all())
286            .map_err(|e| MapletError::Internal(format!("Failed to sync AOF file: {}", e)))?;
287        
288        let latency = start.elapsed().as_micros() as u64;
289        
290        self.update_stats(|stats| {
291            stats.operations_count += 1;
292            stats.avg_latency_us = (stats.avg_latency_us + latency) / 2;
293        }).await;
294        
295        Ok(())
296    }
297    
298    async fn close(&self) -> MapletResult<()> {
299        // Cancel background sync task
300        if let Some(handle) = &self.sync_handle {
301            handle.abort();
302        }
303        
304        // Final sync
305        self.flush().await?;
306        
307        Ok(())
308    }
309    
310    async fn stats(&self) -> MapletResult<StorageStats> {
311        let mut stats = self.stats.read().await.clone();
312        stats.total_keys = self.cache.len() as u64;
313        stats.memory_usage = self.calculate_memory_usage();
314        stats.disk_usage = self.calculate_disk_usage();
315        Ok(stats)
316    }
317}
318
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a configuration that stores its data inside `dir` and syncs
    /// once per second; every other setting keeps its default.
    fn config_in(dir: &TempDir) -> StorageConfig {
        StorageConfig {
            data_dir: dir.path().to_string_lossy().to_string(),
            sync_interval: 1,
            ..Default::default()
        }
    }

    /// Set, get, exists and delete behave consistently on a fresh store.
    #[tokio::test]
    async fn test_aof_storage_basic_operations() {
        let temp_dir = TempDir::new().unwrap();
        let storage = AOFStorage::new(config_in(&temp_dir)).unwrap();

        // A stored value can be read back.
        storage.set("key1".to_string(), b"value1".to_vec()).await.unwrap();
        assert_eq!(storage.get("key1").await.unwrap(), Some(b"value1".to_vec()));

        // Presence is reported only for keys that were stored.
        assert!(storage.exists("key1").await.unwrap());
        assert!(!storage.exists("key2").await.unwrap());

        // Deleting reports success and removes the key.
        assert!(storage.delete("key1").await.unwrap());
        assert!(!storage.exists("key1").await.unwrap());
    }

    /// Data written by one instance is visible to a later instance opened on
    /// the same directory.
    #[tokio::test]
    async fn test_aof_storage_persistence() {
        let temp_dir = TempDir::new().unwrap();

        // First instance: write and flush, then drop.
        {
            let writer = AOFStorage::new(config_in(&temp_dir)).unwrap();
            writer.set("key1".to_string(), b"value1".to_vec()).await.unwrap();
            writer.flush().await.unwrap();
        }

        // Second instance: the value must have been replayed from the log.
        {
            let reader = AOFStorage::new(config_in(&temp_dir)).unwrap();
            assert_eq!(reader.get("key1").await.unwrap(), Some(b"value1".to_vec()));
        }
    }

    /// Statistics reflect a single stored entry.
    #[tokio::test]
    async fn test_aof_storage_stats() {
        let temp_dir = TempDir::new().unwrap();
        let storage = AOFStorage::new(config_in(&temp_dir)).unwrap();

        storage.set("key1".to_string(), b"value1".to_vec()).await.unwrap();

        let snapshot = storage.stats().await.unwrap();
        assert_eq!(snapshot.total_keys, 1);
        assert!(snapshot.memory_usage > 0);
        assert!(snapshot.disk_usage > 0);
        assert!(snapshot.operations_count > 0);
    }
}