kv_core/storage/aof.rs

//! Append-Only File (AOF) storage implementation

use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
use tokio::sync::RwLock;
use tracing::{debug, error, warn};

use crate::{
    KVError, KVResult, Key, Entry, DatabaseId, Storage, StorageStats,
};

/// AOF operation types
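///
/// Each operation is persisted as a single JSON line using serde's default
/// externally tagged enum encoding (roughly `{"Set":{"key":...,"entry":...}}`).
/// Operations do not record a database id, so replay in `load_from_aof`
/// currently targets database 0.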
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
enum AOFOperation {
    Set { key: Key, entry: Entry },
    Delete { key: Key },
    Clear,
}

/// AOF storage implementation
pub struct AOFStorage {
    /// In-memory cache of data
    cache: Arc<RwLock<HashMap<DatabaseId, HashMap<Key, Entry>>>>,
    /// AOF file path
    aof_path: String,
    /// AOF file writer
    writer: Arc<RwLock<Option<BufWriter<File>>>>,
    /// Sync interval in seconds
    sync_interval: u64,
    /// Background sync task handle
    sync_handle: Option<tokio::task::JoinHandle<()>>,
}

impl AOFStorage {
    /// Create a new AOF storage
    ///
    /// # Errors
    /// Returns an error if directory creation or file operations fail
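    ///
    /// # Example
    ///
    /// A minimal usage sketch (the module path is assumed from this file's
    /// location; the snippet is marked `ignore` so it is not compiled as a
    /// doctest):
    ///
    /// ```ignore
    /// use kv_core::storage::aof::AOFStorage;
    /// use kv_core::Storage;
    ///
    /// async fn demo() -> kv_core::KVResult<()> {
    ///     let storage = AOFStorage::new("data/example.aof").await?;
    ///     storage.flush().await?;
    ///     Ok(())
    /// }
    /// ```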
    pub async fn new<P: AsRef<Path>>(aof_path: P) -> KVResult<Self> {
        let aof_path = aof_path.as_ref().to_string_lossy().to_string();

        // Create directory if it doesn't exist
        if let Some(parent) = Path::new(&aof_path).parent() {
            tokio::fs::create_dir_all(parent).await
                .map_err(|e| KVError::Storage(format!("Failed to create AOF directory: {e}")))?;
        }

        let mut storage = Self {
            cache: Arc::new(RwLock::new(HashMap::new())),
            aof_path: aof_path.clone(),
            writer: Arc::new(RwLock::new(None)),
            sync_interval: 1, // Default 1 second
            sync_handle: None,
        };

        // Load existing data from AOF file
        storage.load_from_aof().await?;

        // Open writer
        storage.open_writer().await?;

        // Start background sync task
        storage.start_sync_task();

        Ok(storage)
    }

    /// Load data from AOF file
    async fn load_from_aof(&self) -> KVResult<()> {
        if !Path::new(&self.aof_path).exists() {
            debug!("AOF file does not exist, starting fresh");
            return Ok(());
        }

        debug!("Loading data from AOF file: {}", self.aof_path);

        let file = File::open(&self.aof_path).await
            .map_err(|e| KVError::Storage(format!("Failed to open AOF file: {e}")))?;

        let reader = BufReader::new(file);
        let mut lines = reader.lines();
        let mut cache = self.cache.write().await;

        let mut line_count = 0;
        while let Some(line) = lines.next_line().await
            .map_err(|e| KVError::Storage(format!("Failed to read AOF line: {e}")))?
        {
            line_count += 1;

            if line.trim().is_empty() {
                continue;
            }

            match serde_json::from_str::<AOFOperation>(&line) {
                Ok(op) => {
                    match op {
                        AOFOperation::Set { key, entry } => {
                            // The AOF format does not record a database id, so
                            // replay always targets database 0; a fuller
                            // implementation would track database context.
                            let db = cache.entry(0).or_insert_with(HashMap::new);
                            db.insert(key, entry);
                        }
                        AOFOperation::Delete { key } => {
                            if let Some(db) = cache.get_mut(&0) {
                                db.remove(&key);
                            }
                        }
                        AOFOperation::Clear => {
                            if let Some(db) = cache.get_mut(&0) {
                                db.clear();
                            }
                        }
                    }
                }
                Err(e) => {
                    warn!("Failed to parse AOF line {}: {}", line_count, e);
                    // Continue loading other lines
                }
            }
        }

        debug!("Loaded {} lines from AOF file", line_count);
        Ok(())
    }

    /// Open AOF file for writing
    #[allow(clippy::significant_drop_tightening)]
    async fn open_writer(&self) -> KVResult<()> {
        let file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(&self.aof_path)
            .await
            .map_err(|e| KVError::Storage(format!("Failed to open AOF file for writing: {e}")))?;

        let writer = BufWriter::new(file);
        let mut writer_guard = self.writer.write().await;
        *writer_guard = Some(writer);

        Ok(())
    }

    /// Write operation to AOF file
    #[allow(clippy::significant_drop_tightening)]
    async fn write_operation(&self, operation: AOFOperation) -> KVResult<()> {
        let operation_json = serde_json::to_string(&operation)
            .map_err(KVError::Serialization)?;

        let mut writer_guard = self.writer.write().await;
        if let Some(writer) = writer_guard.as_mut() {
            writer.write_all(operation_json.as_bytes()).await
                .map_err(|e| KVError::Storage(format!("Failed to write to AOF: {e}")))?;
            writer.write_all(b"\n").await
                .map_err(|e| KVError::Storage(format!("Failed to write newline to AOF: {e}")))?;
        } else {
            return Err(KVError::Storage("AOF writer not available".to_string()));
        }

        Ok(())
    }

    /// Start background sync task
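    ///
    /// Flushes the buffered writer every `sync_interval` seconds; writes made
    /// since the last tick sit only in the in-process buffer, so a crash can
    /// lose up to `sync_interval` seconds of acknowledged writes.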
    fn start_sync_task(&mut self) {
        let writer = Arc::clone(&self.writer);
        let sync_interval = self.sync_interval;

        let handle = tokio::spawn(async move {
            let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(sync_interval));

            loop {
                interval.tick().await;

                let mut writer_guard = writer.write().await;
                if let Some(writer) = writer_guard.as_mut() {
                    if let Err(e) = writer.flush().await {
                        error!("Failed to flush AOF: {}", e);
                    }
                }
            }
        });

        self.sync_handle = Some(handle);
    }

    /// Stop background sync task
    fn _stop_sync_task(&mut self) {
        if let Some(handle) = self.sync_handle.take() {
            handle.abort();
        }
    }

    /// Force sync AOF file
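    ///
    /// Note: this flushes the `BufWriter`'s in-process buffer to the OS but does
    /// not call `sync_all`/`sync_data`, so the data is not guaranteed to be
    /// durable across power loss.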
    #[allow(clippy::significant_drop_tightening)]
    async fn force_sync(&self) -> KVResult<()> {
        let mut writer_guard = self.writer.write().await;
        if let Some(writer) = writer_guard.as_mut() {
            writer.flush().await
                .map_err(|e| KVError::Storage(format!("Failed to flush AOF: {e}")))?;
        }
        Ok(())
    }
}

impl Drop for AOFStorage {
    fn drop(&mut self) {
        // We can't run async code in Drop, so flushing relies on the background
        // sync task and an explicit `close()` call. We do, however, stop the
        // sync task so it does not outlive the storage.
        if let Some(handle) = self.sync_handle.take() {
            handle.abort();
        }
    }
}

#[async_trait::async_trait]
impl Storage for AOFStorage {
    #[allow(clippy::significant_drop_tightening, clippy::option_if_let_else)]
    async fn get(&self, database_id: DatabaseId, key: &Key) -> KVResult<Option<Entry>> {
        let cache = self.cache.read().await;
        if let Some(db) = cache.get(&database_id) {
            Ok(db.get(key).cloned())
        } else {
            Ok(None)
        }
    }

    #[allow(clippy::significant_drop_tightening)]
    async fn set(&self, database_id: DatabaseId, key: Key, entry: Entry) -> KVResult<()> {
        // Update cache
        {
            let mut cache = self.cache.write().await;
            let db = cache.entry(database_id).or_insert_with(HashMap::new);
            db.insert(key.clone(), entry.clone());
        }

        // Write to AOF
        let operation = AOFOperation::Set { key, entry };
        self.write_operation(operation).await?;

        Ok(())
    }

    #[allow(clippy::significant_drop_tightening, clippy::option_if_let_else)]
    async fn delete(&self, database_id: DatabaseId, key: &Key) -> KVResult<bool> {
        // Update cache
        let existed = {
            let mut cache = self.cache.write().await;
            if let Some(db) = cache.get_mut(&database_id) {
                db.remove(key).is_some()
            } else {
                false
            }
        };

        if existed {
            // Write to AOF
            let operation = AOFOperation::Delete { key: key.clone() };
            self.write_operation(operation).await?;
        }

        Ok(existed)
    }

    #[allow(clippy::significant_drop_tightening, clippy::option_if_let_else)]
    async fn exists(&self, database_id: DatabaseId, key: &Key) -> KVResult<bool> {
        let cache = self.cache.read().await;
        if let Some(db) = cache.get(&database_id) {
            Ok(db.contains_key(key))
        } else {
            Ok(false)
        }
    }

    #[allow(clippy::significant_drop_tightening, clippy::option_if_let_else)]
    async fn keys(&self, database_id: DatabaseId) -> KVResult<Vec<Key>> {
        let cache = self.cache.read().await;
        if let Some(db) = cache.get(&database_id) {
            Ok(db.keys().cloned().collect())
        } else {
            Ok(Vec::new())
        }
    }

    #[allow(clippy::significant_drop_tightening, clippy::option_if_let_else)]
    async fn keys_pattern(&self, database_id: DatabaseId, pattern: &str) -> KVResult<Vec<Key>> {
        let cache = self.cache.read().await;
        if let Some(db) = cache.get(&database_id) {
            let keys: Vec<Key> = db.keys()
                .filter(|key| matches_pattern(key, pattern))
                .cloned()
                .collect();
            Ok(keys)
        } else {
            Ok(Vec::new())
        }
    }

    async fn clear_database(&self, database_id: DatabaseId) -> KVResult<()> {
        // Update cache
        {
            let mut cache = self.cache.write().await;
            if let Some(db) = cache.get_mut(&database_id) {
                db.clear();
            }
        }

        // Write to AOF
        let operation = AOFOperation::Clear;
        self.write_operation(operation).await?;

        Ok(())
    }

    async fn get_stats(&self, database_id: DatabaseId) -> KVResult<StorageStats> {
        let cache = self.cache.read().await;
        if let Some(db) = cache.get(&database_id) {
            let total_keys = db.len() as u64;
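            // size_of_val only measures the HashMap's own header, not the keys
            // and entries it owns, so memory_usage is a rough lower bound.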
            let memory_usage = std::mem::size_of_val(db) as u64;

            // Get AOF file size
            let disk_usage = tokio::fs::metadata(&self.aof_path).await
                .map(|m| m.len())
                .unwrap_or(0);

            Ok(StorageStats {
                total_keys,
                memory_usage,
                disk_usage: Some(disk_usage),
                last_flush: None, // We don't track flush times in this simple implementation
            })
        } else {
            Ok(StorageStats {
                total_keys: 0,
                memory_usage: 0,
                disk_usage: Some(0),
                last_flush: None,
            })
        }
    }

    async fn flush(&self) -> KVResult<()> {
        self.force_sync().await?;
        debug!("AOF storage flushed");
        Ok(())
    }

    async fn close(&self) -> KVResult<()> {
        self.force_sync().await?;
        debug!("AOF storage closed");
        Ok(())
    }
}

/// Simple pattern matching (supports * wildcard)
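///
/// `*` matches everything, `user:*` matches by prefix, `*:cache` by suffix,
/// and `user:*:cache` requires both. Patterns with more than one `*` fall back
/// to a simple substring check.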
fn matches_pattern(key: &str, pattern: &str) -> bool {
    if pattern == "*" {
        return true;
    }

    if !pattern.contains('*') {
        return key == pattern;
    }

    // Simple wildcard matching
    let pattern_parts: Vec<&str> = pattern.split('*').collect();
    if pattern_parts.len() == 2 {
        let prefix = pattern_parts[0];
        let suffix = pattern_parts[1];

        if prefix.is_empty() {
            key.ends_with(suffix)
        } else if suffix.is_empty() {
            key.starts_with(prefix)
        } else {
            // Require the key to be long enough that the prefix and suffix
            // do not overlap (e.g. "ab" should not match "ab*ab").
            key.len() >= prefix.len() + suffix.len()
                && key.starts_with(prefix)
                && key.ends_with(suffix)
        }
    } else {
        // Patterns with multiple wildcards are not fully supported; fall back
        // to a simple substring check.
        key.contains(pattern.trim_matches('*'))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Value, Entry};
    use tempfile::TempDir;

    #[tokio::test]
    async fn test_aof_storage_basic_operations() {
        let temp_dir = TempDir::new().unwrap();
        let aof_path = temp_dir.path().join("test.aof");
        let storage = AOFStorage::new(&aof_path).await.unwrap();
        let database_id = 0;

        // Test set and get
        let entry = Entry::new(Value::String("test_value".to_string()), None);
        storage.set(database_id, "test_key".to_string(), entry.clone()).await.unwrap();

        let retrieved = storage.get(database_id, &"test_key".to_string()).await.unwrap();
        assert!(retrieved.is_some());
        assert_eq!(retrieved.unwrap().value.as_string().unwrap(), "test_value");

        // Test exists
        let exists = storage.exists(database_id, &"test_key".to_string()).await.unwrap();
        assert!(exists);

        // Test delete
        let deleted = storage.delete(database_id, &"test_key".to_string()).await.unwrap();
        assert!(deleted);

        let exists_after = storage.exists(database_id, &"test_key".to_string()).await.unwrap();
        assert!(!exists_after);
    }

    #[tokio::test]
    async fn test_aof_storage_persistence() {
        let temp_dir = TempDir::new().unwrap();
        let aof_path = temp_dir.path().join("persistent.aof");

        // Create storage and add data
        {
            let storage = AOFStorage::new(&aof_path).await.unwrap();
            let entry = Entry::new(Value::String("persistent_value".to_string()), None);
            storage.set(0, "persistent_key".to_string(), entry).await.unwrap();
            storage.flush().await.unwrap();
        }

        // Reopen storage and check data persists
        {
            let storage = AOFStorage::new(&aof_path).await.unwrap();
            let retrieved = storage.get(0, &"persistent_key".to_string()).await.unwrap();
            assert!(retrieved.is_some());
            assert_eq!(retrieved.unwrap().value.as_string().unwrap(), "persistent_value");
        }
    }

    #[tokio::test]
    async fn test_aof_storage_stats() {
        let temp_dir = TempDir::new().unwrap();
        let aof_path = temp_dir.path().join("stats.aof");
        let storage = AOFStorage::new(&aof_path).await.unwrap();

        // Add some data
        let entry = Entry::new(Value::String("value".to_string()), None);
        storage.set(0, "key1".to_string(), entry.clone()).await.unwrap();
        storage.set(0, "key2".to_string(), entry).await.unwrap();

        // Force flush to ensure data is written to disk
        storage.flush().await.unwrap();

        let stats = storage.get_stats(0).await.unwrap();
        assert_eq!(stats.total_keys, 2);
        assert!(stats.memory_usage > 0);
        assert!(stats.disk_usage.is_some());
        assert!(stats.disk_usage.unwrap() > 0);
    }
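
    // An additional, illustrative check for the glob-style matcher above; the
    // keys and patterns used here are examples only.
    #[test]
    fn test_matches_pattern() {
        assert!(matches_pattern("anything", "*"));
        assert!(matches_pattern("user:1", "user:*"));
        assert!(matches_pattern("session:cache", "*:cache"));
        assert!(matches_pattern("user:42:cache", "user:*:cache"));
        assert!(!matches_pattern("user:1", "account:*"));
        assert!(!matches_pattern("ab", "ab*ab"));
    }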
}