mappy_core/storage/
hybrid.rs

1//! Hybrid storage backend
2//!
3//! Combines in-memory cache with AOF logging for optimal performance and durability.
4
5use super::{Storage, StorageConfig, StorageStats};
6use crate::{MapletError, MapletResult};
7use async_trait::async_trait;
8use dashmap::DashMap;
9use serde_json;
10use std::fs::OpenOptions;
11use std::io::{BufRead, BufReader, Write};
12use std::path::Path;
13use std::sync::Arc;
14use std::time::Instant;
15use tokio::sync::RwLock;
16use tokio::time::{Duration, interval};
17
18/// Hybrid storage backend combining memory and AOF
19pub struct HybridStorage {
20    /// In-memory cache
21    cache: Arc<DashMap<String, Vec<u8>>>,
22    /// AOF file path
23    aof_path: std::path::PathBuf,
24    /// Configuration
25    config: StorageConfig,
26    /// Statistics
27    stats: Arc<RwLock<StorageStats>>,
28    /// Start time for latency calculation
29    #[allow(dead_code)]
30    start_time: Instant,
31    /// Background sync task handle
32    sync_handle: Option<tokio::task::JoinHandle<()>>,
33    /// Write buffer for batching AOF writes
34    write_buffer: Arc<RwLock<Vec<AOFEntry>>>,
35}
36
/// A single mutation recorded in the append-only file.
///
/// Entries are written one per line as JSON and replayed in file order on
/// startup to rebuild the in-memory cache.
#[derive(serde::Serialize, serde::Deserialize, Clone)]
enum AOFEntry {
    /// Store `value` under `key`, replacing any previous value.
    Set { key: String, value: Vec<u8> },
    /// Remove `key` from the cache.
    Delete { key: String },
}
42
43impl HybridStorage {
44    /// Create a new hybrid storage
45    pub fn new(config: StorageConfig) -> MapletResult<Self> {
46        // Ensure data directory exists
47        std::fs::create_dir_all(&config.data_dir)
48            .map_err(|e| MapletError::Internal(format!("Failed to create data directory: {e}")))?;
49
50        let aof_path = Path::new(&config.data_dir).join("mappy.aof");
51
52        let mut storage = Self {
53            cache: Arc::new(DashMap::new()),
54            aof_path,
55            config,
56            stats: Arc::new(RwLock::new(StorageStats::default())),
57            start_time: Instant::now(),
58            sync_handle: None,
59            write_buffer: Arc::new(RwLock::new(Vec::new())),
60        };
61
62        // Load existing data from AOF file
63        storage.load_from_aof()?;
64
65        // Start background sync task
66        storage.start_sync_task();
67
68        Ok(storage)
69    }
70
71    /// Load data from AOF file
72    fn load_from_aof(&self) -> MapletResult<()> {
73        if !self.aof_path.exists() {
74            return Ok(());
75        }
76
77        let file = std::fs::File::open(&self.aof_path)
78            .map_err(|e| MapletError::Internal(format!("Failed to open AOF file: {e}")))?;
79
80        let reader = BufReader::new(file);
81
82        for line in reader.lines() {
83            let line =
84                line.map_err(|e| MapletError::Internal(format!("Failed to read AOF line: {e}")))?;
85            if line.trim().is_empty() {
86                continue;
87            }
88
89            let entry: AOFEntry = serde_json::from_str(&line)
90                .map_err(|e| MapletError::Internal(format!("Failed to parse AOF entry: {e}")))?;
91
92            match entry {
93                AOFEntry::Set { key, value } => {
94                    self.cache.insert(key, value);
95                }
96                AOFEntry::Delete { key } => {
97                    self.cache.remove(&key);
98                }
99            }
100        }
101
102        Ok(())
103    }
104
105    /// Add entry to write buffer
106    async fn add_to_buffer(&self, entry: AOFEntry) -> MapletResult<()> {
107        {
108            let mut buffer = self.write_buffer.write().await;
109            buffer.push(entry);
110
111            // Flush buffer if it's full
112            if buffer.len() >= self.config.write_buffer_size {
113                self.flush_buffer_internal(&mut buffer)?;
114            }
115        }
116
117        Ok(())
118    }
119
120    /// Flush write buffer to AOF file
121    fn flush_buffer_internal(&self, buffer: &mut Vec<AOFEntry>) -> MapletResult<()> {
122        if buffer.is_empty() {
123            return Ok(());
124        }
125
126        let mut file = OpenOptions::new()
127            .create(true)
128            .append(true)
129            .open(&self.aof_path)
130            .map_err(|e| {
131                MapletError::Internal(format!("Failed to open AOF file for append: {e}"))
132            })?;
133
134        for entry in buffer.iter() {
135            let line = serde_json::to_string(entry).map_err(|e| {
136                MapletError::Internal(format!("Failed to serialize AOF entry: {e}"))
137            })?;
138            writeln!(file, "{line}")
139                .map_err(|e| MapletError::Internal(format!("Failed to write to AOF file: {e}")))?;
140        }
141
142        buffer.clear();
143        Ok(())
144    }
145
146    /// Start background sync task
147    fn start_sync_task(&mut self) {
148        let write_buffer = Arc::clone(&self.write_buffer);
149        let aof_path = self.aof_path.clone();
150        let sync_interval = Duration::from_secs(self.config.sync_interval);
151
152        let handle = tokio::spawn(async move {
153            let mut interval = interval(sync_interval);
154
155            loop {
156                interval.tick().await;
157
158                // Flush any pending writes
159                {
160                    let mut buffer = write_buffer.write().await;
161                    if !buffer.is_empty()
162                        && let Err(e) = Self::flush_buffer_internal_static(&mut buffer, &aof_path)
163                    {
164                        eprintln!("Failed to flush AOF buffer: {e}");
165                    }
166                }
167
168                // Force sync to disk
169                if let Err(e) = std::fs::File::open(&aof_path).and_then(|f| f.sync_all()) {
170                    eprintln!("Failed to sync AOF file: {e}");
171                }
172            }
173        });
174
175        self.sync_handle = Some(handle);
176    }
177
178    /// Static version of `flush_buffer_internal` for use in background task
179    fn flush_buffer_internal_static(
180        buffer: &mut Vec<AOFEntry>,
181        aof_path: &std::path::Path,
182    ) -> MapletResult<()> {
183        if buffer.is_empty() {
184            return Ok(());
185        }
186
187        let mut file = OpenOptions::new()
188            .create(true)
189            .append(true)
190            .open(aof_path)
191            .map_err(|e| {
192                MapletError::Internal(format!("Failed to open AOF file for append: {e}"))
193            })?;
194
195        for entry in buffer.iter() {
196            let line = serde_json::to_string(entry).map_err(|e| {
197                MapletError::Internal(format!("Failed to serialize AOF entry: {e}"))
198            })?;
199            writeln!(file, "{line}")
200                .map_err(|e| MapletError::Internal(format!("Failed to write to AOF file: {e}")))?;
201        }
202
203        buffer.clear();
204        Ok(())
205    }
206
207    /// Update statistics
208    async fn update_stats<F>(&self, f: F)
209    where
210        F: FnOnce(&mut StorageStats),
211    {
212        let mut stats = self.stats.write().await;
213        f(&mut stats);
214    }
215
216    /// Calculate memory usage
217    fn calculate_memory_usage(&self) -> u64 {
218        let mut total = 0;
219        for entry in self.cache.iter() {
220            total += entry.key().len() + entry.value().len();
221        }
222        total as u64
223    }
224
225    /// Calculate disk usage
226    fn calculate_disk_usage(&self) -> u64 {
227        if self.aof_path.exists() {
228            std::fs::metadata(&self.aof_path)
229                .map(|m| m.len())
230                .unwrap_or(0)
231        } else {
232            0
233        }
234    }
235}
236
237#[async_trait]
238impl Storage for HybridStorage {
239    async fn get(&self, key: &str) -> MapletResult<Option<Vec<u8>>> {
240        let start = Instant::now();
241        let result = self.cache.get(key).map(|entry| entry.value().clone());
242        let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
243
244        self.update_stats(|stats| {
245            stats.operations_count += 1;
246            stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
247        })
248        .await;
249
250        Ok(result)
251    }
252
253    async fn set(&self, key: String, value: Vec<u8>) -> MapletResult<()> {
254        let start = Instant::now();
255
256        // Update cache immediately
257        self.cache.insert(key.clone(), value.clone());
258
259        // Add to write buffer for AOF
260        let entry = AOFEntry::Set { key, value };
261        self.add_to_buffer(entry).await?;
262
263        let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
264
265        self.update_stats(|stats| {
266            stats.operations_count += 1;
267            stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
268            stats.total_keys = self.cache.len() as u64;
269            stats.memory_usage = self.calculate_memory_usage();
270            stats.disk_usage = self.calculate_disk_usage();
271        })
272        .await;
273
274        Ok(())
275    }
276
277    async fn delete(&self, key: &str) -> MapletResult<bool> {
278        let start = Instant::now();
279        let existed = self.cache.remove(key).is_some();
280
281        if existed {
282            // Add delete to write buffer
283            let entry = AOFEntry::Delete {
284                key: key.to_string(),
285            };
286            self.add_to_buffer(entry).await?;
287        }
288
289        let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
290
291        self.update_stats(|stats| {
292            stats.operations_count += 1;
293            stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
294            stats.total_keys = self.cache.len() as u64;
295            stats.memory_usage = self.calculate_memory_usage();
296            stats.disk_usage = self.calculate_disk_usage();
297        })
298        .await;
299
300        Ok(existed)
301    }
302
303    async fn exists(&self, key: &str) -> MapletResult<bool> {
304        let start = Instant::now();
305        let exists = self.cache.contains_key(key);
306        let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
307
308        self.update_stats(|stats| {
309            stats.operations_count += 1;
310            stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
311        })
312        .await;
313
314        Ok(exists)
315    }
316
317    async fn keys(&self) -> MapletResult<Vec<String>> {
318        let start = Instant::now();
319        let keys: Vec<String> = self.cache.iter().map(|entry| entry.key().clone()).collect();
320        let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
321
322        self.update_stats(|stats| {
323            stats.operations_count += 1;
324            stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
325        })
326        .await;
327
328        Ok(keys)
329    }
330
331    async fn clear_database(&self) -> MapletResult<()> {
332        let start = Instant::now();
333        self.cache.clear();
334
335        // Clear write buffer
336        {
337            let mut buffer = self.write_buffer.write().await;
338            buffer.clear();
339        }
340
341        // Clear AOF file
342        std::fs::write(&self.aof_path, "")
343            .map_err(|e| MapletError::Internal(format!("Failed to clear AOF file: {e}")))?;
344
345        let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
346
347        self.update_stats(|stats| {
348            stats.operations_count += 1;
349            stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
350            stats.total_keys = 0;
351            stats.memory_usage = 0;
352            stats.disk_usage = 0;
353        })
354        .await;
355
356        Ok(())
357    }
358
359    async fn flush(&self) -> MapletResult<()> {
360        let start = Instant::now();
361
362        // Flush write buffer
363        {
364            let mut buffer = self.write_buffer.write().await;
365            self.flush_buffer_internal(&mut buffer)?;
366        }
367
368        // Force sync AOF file to disk
369        std::fs::File::open(&self.aof_path)
370            .and_then(|f| f.sync_all())
371            .map_err(|e| MapletError::Internal(format!("Failed to sync AOF file: {e}")))?;
372
373        let latency = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
374
375        self.update_stats(|stats| {
376            stats.operations_count += 1;
377            stats.avg_latency_us = u64::midpoint(stats.avg_latency_us, latency);
378        })
379        .await;
380
381        Ok(())
382    }
383
384    async fn close(&self) -> MapletResult<()> {
385        // Cancel background sync task
386        if let Some(handle) = &self.sync_handle {
387            handle.abort();
388        }
389
390        // Final flush
391        self.flush().await?;
392
393        Ok(())
394    }
395
396    async fn stats(&self) -> MapletResult<StorageStats> {
397        let mut stats = self.stats.read().await.clone();
398        stats.total_keys = self.cache.len() as u64;
399        stats.memory_usage = self.calculate_memory_usage();
400        stats.disk_usage = self.calculate_disk_usage();
401        Ok(stats)
402    }
403}
404
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Build the configuration shared by every test: the AOF lives inside
    /// `dir`, with a one-second sync interval and a ten-entry write buffer.
    fn config_in(dir: &TempDir) -> StorageConfig {
        StorageConfig {
            data_dir: dir.path().to_string_lossy().to_string(),
            sync_interval: 1,
            write_buffer_size: 10,
            ..Default::default()
        }
    }

    /// Set, get, exists, and delete behave as expected on a fresh store.
    #[tokio::test]
    async fn test_hybrid_storage_basic_operations() {
        let dir = TempDir::new().unwrap();
        let store = HybridStorage::new(config_in(&dir)).unwrap();

        // A value that was set can be read back.
        store
            .set("key1".to_string(), b"value1".to_vec())
            .await
            .unwrap();
        assert_eq!(store.get("key1").await.unwrap(), Some(b"value1".to_vec()));

        // Existence is reported only for keys that were stored.
        assert!(store.exists("key1").await.unwrap());
        assert!(!store.exists("key2").await.unwrap());

        // Deleting reports the key was present and removes it.
        assert!(store.delete("key1").await.unwrap());
        assert!(!store.exists("key1").await.unwrap());
    }

    /// Data flushed by one instance is visible to a new instance opened on the
    /// same directory.
    #[tokio::test]
    async fn test_hybrid_storage_persistence() {
        let dir = TempDir::new().unwrap();

        // First instance: write and flush, then drop.
        {
            let writer = HybridStorage::new(config_in(&dir)).unwrap();
            writer
                .set("key1".to_string(), b"value1".to_vec())
                .await
                .unwrap();
            writer.flush().await.unwrap();
        }

        // Second instance: the value is restored from the AOF.
        {
            let reader = HybridStorage::new(config_in(&dir)).unwrap();
            assert_eq!(reader.get("key1").await.unwrap(), Some(b"value1".to_vec()));
        }
    }

    /// Statistics reflect a stored key once it has been flushed to disk.
    #[tokio::test]
    async fn test_hybrid_storage_stats() {
        let dir = TempDir::new().unwrap();
        let store = HybridStorage::new(config_in(&dir)).unwrap();

        store
            .set("key1".to_string(), b"value1".to_vec())
            .await
            .unwrap();

        // Flush so that the AOF file has non-zero size.
        store.flush().await.unwrap();

        let snapshot = store.stats().await.unwrap();
        assert_eq!(snapshot.total_keys, 1);
        assert!(snapshot.memory_usage > 0);
        assert!(snapshot.disk_usage > 0);
        assert!(snapshot.operations_count > 0);
    }
}