mappy_core/storage/
hybrid.rs

//! Hybrid storage backend
//!
//! Combines an in-memory cache with append-only file (AOF) logging for optimal performance and durability.
4
5use super::{Storage, StorageStats, StorageConfig};
6use crate::{MapletError, MapletResult};
7use dashmap::DashMap;
8use std::sync::Arc;
9use tokio::sync::RwLock;
10use std::time::Instant;
11use std::path::Path;
12use std::fs::OpenOptions;
13use std::io::{BufReader, Write, BufRead};
14use serde_json;
15use async_trait::async_trait;
16use tokio::time::{interval, Duration};
17
18/// Hybrid storage backend combining memory and AOF
19pub struct HybridStorage {
20    /// In-memory cache
21    cache: Arc<DashMap<String, Vec<u8>>>,
22    /// AOF file path
23    aof_path: std::path::PathBuf,
24    /// Configuration
25    config: StorageConfig,
26    /// Statistics
27    stats: Arc<RwLock<StorageStats>>,
28    /// Start time for latency calculation
29    #[allow(dead_code)]
30    start_time: Instant,
31    /// Background sync task handle
32    sync_handle: Option<tokio::task::JoinHandle<()>>,
33    /// Write buffer for batching AOF writes
34    write_buffer: Arc<RwLock<Vec<AOFEntry>>>,
35}
36
37#[derive(serde::Serialize, serde::Deserialize, Clone)]
38enum AOFEntry {
39    Set { key: String, value: Vec<u8> },
40    Delete { key: String },
41}
42
43impl HybridStorage {
44    /// Create a new hybrid storage
45    pub fn new(config: StorageConfig) -> MapletResult<Self> {
46        // Ensure data directory exists
47        std::fs::create_dir_all(&config.data_dir)
48            .map_err(|e| MapletError::Internal(format!("Failed to create data directory: {}", e)))?;
49        
50        let aof_path = Path::new(&config.data_dir).join("mappy.aof");
51        
52        let mut storage = Self {
53            cache: Arc::new(DashMap::new()),
54            aof_path: aof_path.clone(),
55            config,
56            stats: Arc::new(RwLock::new(StorageStats::default())),
57            start_time: Instant::now(),
58            sync_handle: None,
59            write_buffer: Arc::new(RwLock::new(Vec::new())),
60        };
61        
62        // Load existing data from AOF file
63        storage.load_from_aof()?;
64        
65        // Start background sync task
66        storage.start_sync_task();
67        
68        Ok(storage)
69    }
70    
71    /// Load data from AOF file
72    fn load_from_aof(&self) -> MapletResult<()> {
73        if !self.aof_path.exists() {
74            return Ok(());
75        }
76        
77        let file = std::fs::File::open(&self.aof_path)
78            .map_err(|e| MapletError::Internal(format!("Failed to open AOF file: {}", e)))?;
79        
80        let reader = BufReader::new(file);
81        
82        for line in reader.lines() {
83            let line = line.map_err(|e| MapletError::Internal(format!("Failed to read AOF line: {}", e)))?;
84            if line.trim().is_empty() {
85                continue;
86            }
87            
88            let entry: AOFEntry = serde_json::from_str(&line)
89                .map_err(|e| MapletError::Internal(format!("Failed to parse AOF entry: {}", e)))?;
90            
91            match entry {
92                AOFEntry::Set { key, value } => {
93                    self.cache.insert(key, value);
94                }
95                AOFEntry::Delete { key } => {
96                    self.cache.remove(&key);
97                }
98            }
99        }
100        
101        Ok(())
102    }
103    
104    /// Add entry to write buffer
105    async fn add_to_buffer(&self, entry: AOFEntry) -> MapletResult<()> {
106        let mut buffer = self.write_buffer.write().await;
107        buffer.push(entry);
108        
109        // Flush buffer if it's full
110        if buffer.len() >= self.config.write_buffer_size {
111            self.flush_buffer_internal(&mut buffer)?;
112        }
113        
114        Ok(())
115    }
116    
117    /// Flush write buffer to AOF file
118    fn flush_buffer_internal(&self, buffer: &mut Vec<AOFEntry>) -> MapletResult<()> {
119        if buffer.is_empty() {
120            return Ok(());
121        }
122        
123        let mut file = OpenOptions::new()
124            .create(true)
125            .append(true)
126            .open(&self.aof_path)
127            .map_err(|e| MapletError::Internal(format!("Failed to open AOF file for append: {}", e)))?;
128        
129        for entry in buffer.iter() {
130            let line = serde_json::to_string(entry)
131                .map_err(|e| MapletError::Internal(format!("Failed to serialize AOF entry: {}", e)))?;
132            writeln!(file, "{}", line)
133                .map_err(|e| MapletError::Internal(format!("Failed to write to AOF file: {}", e)))?;
134        }
135        
136        buffer.clear();
137        Ok(())
138    }
139    
140    /// Start background sync task
141    fn start_sync_task(&mut self) {
142        let write_buffer = Arc::clone(&self.write_buffer);
143        let aof_path = self.aof_path.clone();
144        let sync_interval = Duration::from_secs(self.config.sync_interval);
145        
146        let handle = tokio::spawn(async move {
147            let mut interval = interval(sync_interval);
148            
149            loop {
150                interval.tick().await;
151                
152                // Flush any pending writes
153                {
154                    let mut buffer = write_buffer.write().await;
155                    if !buffer.is_empty() {
156                        if let Err(e) = Self::flush_buffer_internal_static(&mut buffer, &aof_path) {
157                            eprintln!("Failed to flush AOF buffer: {}", e);
158                        }
159                    }
160                }
161                
162                // Force sync to disk
163                if let Err(e) = std::fs::File::open(&aof_path)
164                    .and_then(|f| f.sync_all())
165                {
166                    eprintln!("Failed to sync AOF file: {}", e);
167                }
168            }
169        });
170        
171        self.sync_handle = Some(handle);
172    }
173    
174    /// Static version of flush_buffer_internal for use in background task
175    fn flush_buffer_internal_static(
176        buffer: &mut Vec<AOFEntry>,
177        aof_path: &std::path::Path,
178    ) -> MapletResult<()> {
179        if buffer.is_empty() {
180            return Ok(());
181        }
182        
183        let mut file = OpenOptions::new()
184            .create(true)
185            .append(true)
186            .open(aof_path)
187            .map_err(|e| MapletError::Internal(format!("Failed to open AOF file for append: {}", e)))?;
188        
189        for entry in buffer.iter() {
190            let line = serde_json::to_string(entry)
191                .map_err(|e| MapletError::Internal(format!("Failed to serialize AOF entry: {}", e)))?;
192            writeln!(file, "{}", line)
193                .map_err(|e| MapletError::Internal(format!("Failed to write to AOF file: {}", e)))?;
194        }
195        
196        buffer.clear();
197        Ok(())
198    }
199    
200    /// Update statistics
201    async fn update_stats<F>(&self, f: F) 
202    where
203        F: FnOnce(&mut StorageStats),
204    {
205        let mut stats = self.stats.write().await;
206        f(&mut stats);
207    }
208    
209    /// Calculate memory usage
210    fn calculate_memory_usage(&self) -> u64 {
211        let mut total = 0;
212        for entry in self.cache.iter() {
213            total += entry.key().len() + entry.value().len();
214        }
215        total as u64
216    }
217    
218    /// Calculate disk usage
219    fn calculate_disk_usage(&self) -> u64 {
220        if self.aof_path.exists() {
221            std::fs::metadata(&self.aof_path)
222                .map(|m| m.len())
223                .unwrap_or(0)
224        } else {
225            0
226        }
227    }
228}
229
230#[async_trait]
231impl Storage for HybridStorage {
232    async fn get(&self, key: &str) -> MapletResult<Option<Vec<u8>>> {
233        let start = Instant::now();
234        let result = self.cache.get(key).map(|entry| entry.value().clone());
235        let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
236        
237        self.update_stats(|stats| {
238            stats.operations_count += 1;
239            stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
240        }).await;
241        
242        Ok(result)
243    }
244    
245    async fn set(&self, key: String, value: Vec<u8>) -> MapletResult<()> {
246        let start = Instant::now();
247        
248        // Update cache immediately
249        self.cache.insert(key.clone(), value.clone());
250        
251        // Add to write buffer for AOF
252        let entry = AOFEntry::Set { key, value };
253        self.add_to_buffer(entry).await?;
254        
255        let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
256        
257        self.update_stats(|stats| {
258            stats.operations_count += 1;
259            stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
260            stats.total_keys = self.cache.len() as u64;
261            stats.memory_usage = self.calculate_memory_usage();
262            stats.disk_usage = self.calculate_disk_usage();
263        }).await;
264        
265        Ok(())
266    }
267    
268    async fn delete(&self, key: &str) -> MapletResult<bool> {
269        let start = Instant::now();
270        let existed = self.cache.remove(key).is_some();
271        
272        if existed {
273            // Add delete to write buffer
274            let entry = AOFEntry::Delete { key: key.to_string() };
275            self.add_to_buffer(entry).await?;
276        }
277        
278        let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
279        
280        self.update_stats(|stats| {
281            stats.operations_count += 1;
282            stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
283            stats.total_keys = self.cache.len() as u64;
284            stats.memory_usage = self.calculate_memory_usage();
285            stats.disk_usage = self.calculate_disk_usage();
286        }).await;
287        
288        Ok(existed)
289    }
290    
291    async fn exists(&self, key: &str) -> MapletResult<bool> {
292        let start = Instant::now();
293        let exists = self.cache.contains_key(key);
294        let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
295        
296        self.update_stats(|stats| {
297            stats.operations_count += 1;
298            stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
299        }).await;
300        
301        Ok(exists)
302    }
303    
304    async fn keys(&self) -> MapletResult<Vec<String>> {
305        let start = Instant::now();
306        let keys: Vec<String> = self.cache.iter().map(|entry| entry.key().clone()).collect();
307        let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
308        
309        self.update_stats(|stats| {
310            stats.operations_count += 1;
311            stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
312        }).await;
313        
314        Ok(keys)
315    }
316    
317    async fn clear_database(&self) -> MapletResult<()> {
318        let start = Instant::now();
319        self.cache.clear();
320        
321        // Clear write buffer
322        {
323            let mut buffer = self.write_buffer.write().await;
324            buffer.clear();
325        }
326        
327        // Clear AOF file
328        std::fs::write(&self.aof_path, "")
329            .map_err(|e| MapletError::Internal(format!("Failed to clear AOF file: {e}")))?;
330        
331        let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
332        
333        self.update_stats(|stats| {
334            stats.operations_count += 1;
335            stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
336            stats.total_keys = 0;
337            stats.memory_usage = 0;
338            stats.disk_usage = 0;
339        }).await;
340        
341        Ok(())
342    }
343    
344    async fn flush(&self) -> MapletResult<()> {
345        let start = Instant::now();
346        
347        // Flush write buffer
348        {
349            let mut buffer = self.write_buffer.write().await;
350            self.flush_buffer_internal(&mut buffer)?;
351        }
352        
353        // Force sync AOF file to disk
354        std::fs::File::open(&self.aof_path)
355            .and_then(|f| f.sync_all())
356            .map_err(|e| MapletError::Internal(format!("Failed to sync AOF file: {e}")))?;
357        
358        let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
359        
360        self.update_stats(|stats| {
361            stats.operations_count += 1;
362            stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
363        }).await;
364        
365        Ok(())
366    }
367    
368    async fn close(&self) -> MapletResult<()> {
369        // Cancel background sync task
370        if let Some(handle) = &self.sync_handle {
371            handle.abort();
372        }
373        
374        // Final flush
375        self.flush().await?;
376        
377        Ok(())
378    }
379    
380    async fn stats(&self) -> MapletResult<StorageStats> {
381        let mut stats = self.stats.read().await.clone();
382        stats.total_keys = self.cache.len() as u64;
383        stats.memory_usage = self.calculate_memory_usage();
384        stats.disk_usage = self.calculate_disk_usage();
385        Ok(stats)
386    }
387}
388
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Configuration shared by all tests: data stored under `dir`, a
    /// one-second sync interval and a ten-entry write buffer.
    fn config_in(dir: &TempDir) -> StorageConfig {
        StorageConfig {
            data_dir: dir.path().to_string_lossy().to_string(),
            sync_interval: 1,
            write_buffer_size: 10,
            ..Default::default()
        }
    }

    /// Set, get, exists and delete behave as expected on a fresh store.
    #[tokio::test]
    async fn test_hybrid_storage_basic_operations() {
        let temp_dir = TempDir::new().unwrap();
        let storage = HybridStorage::new(config_in(&temp_dir)).unwrap();

        // A stored value can be read back.
        storage.set("key1".to_string(), b"value1".to_vec()).await.unwrap();
        assert_eq!(storage.get("key1").await.unwrap(), Some(b"value1".to_vec()));

        // Presence checks distinguish known from unknown keys.
        assert!(storage.exists("key1").await.unwrap());
        assert!(!storage.exists("key2").await.unwrap());

        // Deleting reports that the key existed and removes it.
        assert!(storage.delete("key1").await.unwrap());
        assert!(!storage.exists("key1").await.unwrap());
    }

    /// Data flushed by one instance is visible to a later instance opened on
    /// the same directory.
    #[tokio::test]
    async fn test_hybrid_storage_persistence() {
        let temp_dir = TempDir::new().unwrap();

        // First instance: write and flush.
        {
            let writer = HybridStorage::new(config_in(&temp_dir)).unwrap();
            writer.set("key1".to_string(), b"value1".to_vec()).await.unwrap();
            writer.flush().await.unwrap();
        }

        // Second instance: the value must come back from the AOF file.
        {
            let reader = HybridStorage::new(config_in(&temp_dir)).unwrap();
            assert_eq!(reader.get("key1").await.unwrap(), Some(b"value1".to_vec()));
        }
    }

    /// Statistics reflect one stored key after a flush.
    #[tokio::test]
    async fn test_hybrid_storage_stats() {
        let temp_dir = TempDir::new().unwrap();
        let storage = HybridStorage::new(config_in(&temp_dir)).unwrap();

        storage.set("key1".to_string(), b"value1".to_vec()).await.unwrap();

        // Flush so the AOF file exists and has a non-zero size.
        storage.flush().await.unwrap();

        let stats = storage.stats().await.unwrap();
        assert_eq!(stats.total_keys, 1);
        assert!(stats.memory_usage > 0);
        assert!(stats.disk_usage > 0);
        assert!(stats.operations_count > 0);
    }
}