// do_memory_storage_redb/persistence/mod.rs

1//! # Redb Cache Persistence Module
2//!
3//! This module provides persistence for the redb cache layer, enabling:
4//! - Cache state save/load functionality
5//! - Graceful shutdown with cache flush
6//! - Recovery on startup
7//! - Incremental cache persistence
8//!
9//! ## Features
10//!
11//! - **Full Cache Persistence**: Save and load complete cache state
12//! - **Incremental Updates**: Persist only changed entries
13//! - **Graceful Shutdown**: Automatic cache flush on shutdown
14//! - **Recovery**: Restore cache state on startup
15//! - **Compression**: Optional compression for persisted data
16//!
17//! ## Usage
18//!
19//! ```rust
20//! use do_memory_storage_redb::{CachePersistence, PersistenceConfig};
21//!
22//! let config = PersistenceConfig::default();
23//! let persistence = CachePersistence::new(config);
24//! ```
25
26use std::path::{Path, PathBuf};
27use std::sync::Arc;
28#[allow(unused_imports)]
29use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
30
31use parking_lot::RwLock;
32use tracing::{debug, info};
33
34#[allow(unused_imports)] // False positive - imports are used in conditional code
35mod config;
36mod manager;
37mod types;
38
39pub use config::{PersistenceConfig, PersistenceMode, PersistenceStrategy};
40pub use manager::PersistenceManager;
41pub use types::{CacheSnapshot, IncrementalUpdate, PersistedCacheEntry, PersistenceStats};
42
/// Cache persistence handler
///
/// Manages saving and loading cache state to/from disk.
/// Supports full snapshots and incremental updates.
///
/// The statistics and the last-save timestamp sit behind `Arc<RwLock<..>>`, so they
/// are updated through `&self`.
#[derive(Debug)]
pub struct CachePersistence {
    /// Settings consulted by every operation (enabled flag, snapshot path,
    /// compression flag, save thresholds).
    config: PersistenceConfig,
    /// Running counters updated after each successful snapshot save or load.
    stats: Arc<RwLock<PersistenceStats>>,
    /// When the most recent snapshot save completed; `None` until the first save.
    last_save: Arc<RwLock<Option<Instant>>>,
}
53
54impl CachePersistence {
55    /// Create a new cache persistence handler
56    pub fn new(config: PersistenceConfig) -> Self {
57        info!(
58            "Creating cache persistence with mode={:?}, strategy={:?}",
59            config.mode, config.strategy
60        );
61
62        Self {
63            config,
64            stats: Arc::new(RwLock::new(PersistenceStats::default())),
65            last_save: Arc::new(RwLock::new(None)),
66        }
67    }
68
69    /// Create with default configuration
70    pub fn with_default_config() -> Self {
71        Self::new(PersistenceConfig::default())
72    }
73
74    /// Get persistence configuration
75    pub fn config(&self) -> &PersistenceConfig {
76        &self.config
77    }
78
79    /// Get persistence statistics
80    pub fn stats(&self) -> PersistenceStats {
81        self.stats.read().clone()
82    }
83
84    /// Check if persistence is enabled
85    pub fn is_enabled(&self) -> bool {
86        self.config.enabled
87    }
88
89    /// Save cache snapshot to disk
90    ///
91    /// # Arguments
92    ///
93    /// * `snapshot` - The cache snapshot to save
94    /// * `path` - Optional path override (uses config path if None)
95    ///
96    /// # Returns
97    ///
98    /// Number of entries saved, or error if save failed
99    pub fn save_snapshot(
100        &self,
101        snapshot: &CacheSnapshot,
102        path: Option<&Path>,
103    ) -> crate::Result<usize> {
104        if !self.config.enabled {
105            debug!("Cache persistence disabled, skipping save");
106            return Ok(0);
107        }
108
109        let save_path = path
110            .map(PathBuf::from)
111            .unwrap_or_else(|| self.config.persistence_path.clone());
112
113        info!(
114            "Saving cache snapshot with {} entries to {:?}",
115            snapshot.entries.len(),
116            save_path
117        );
118
119        let start = Instant::now();
120
121        // Serialize snapshot
122        let serialized = postcard::to_allocvec(snapshot).map_err(|e| {
123            crate::Error::Storage(format!("Failed to serialize cache snapshot: {}", e))
124        })?;
125
126        // Compress if enabled
127        let data = if self.config.compression_enabled {
128            debug!("Compressing cache snapshot ({} bytes)", serialized.len());
129            compress_data(&serialized).map_err(|e| {
130                crate::Error::Storage(format!("Failed to compress cache snapshot: {}", e))
131            })?
132        } else {
133            serialized
134        };
135
136        // Write to file
137        std::fs::write(&save_path, &data)
138            .map_err(|e| crate::Error::Storage(format!("Failed to write cache snapshot: {}", e)))?;
139
140        let elapsed = start.elapsed();
141        let bytes_written = data.len();
142
143        // Update statistics
144        {
145            let mut stats = self.stats.write();
146            stats.snapshots_saved += 1;
147            stats.total_entries_saved += snapshot.entries.len();
148            stats.total_bytes_written += bytes_written as u64;
149            stats.last_save_duration = elapsed;
150        }
151
152        // Update last save time
153        {
154            let mut last = self.last_save.write();
155            *last = Some(Instant::now());
156        }
157
158        info!(
159            "Cache snapshot saved: {} entries, {} bytes in {:?}",
160            snapshot.entries.len(),
161            bytes_written,
162            elapsed
163        );
164
165        Ok(snapshot.entries.len())
166    }
167
168    /// Load cache snapshot from disk
169    ///
170    /// # Arguments
171    ///
172    /// * `path` - Optional path override (uses config path if None)
173    ///
174    /// # Returns
175    ///
176    /// Loaded cache snapshot, or error if load failed
177    pub fn load_snapshot(&self, path: Option<&Path>) -> crate::Result<Option<CacheSnapshot>> {
178        if !self.config.enabled {
179            debug!("Cache persistence disabled, skipping load");
180            return Ok(None);
181        }
182
183        let load_path = path
184            .map(PathBuf::from)
185            .unwrap_or_else(|| self.config.persistence_path.clone());
186
187        if !load_path.exists() {
188            debug!("No cache snapshot found at {:?}", load_path);
189            return Ok(None);
190        }
191
192        info!("Loading cache snapshot from {:?}", load_path);
193
194        let start = Instant::now();
195
196        // Read from file
197        let data = std::fs::read(&load_path)
198            .map_err(|e| crate::Error::Storage(format!("Failed to read cache snapshot: {}", e)))?;
199
200        // Decompress if needed
201        let serialized = if self.config.compression_enabled {
202            debug!("Decompressing cache snapshot ({} bytes)", data.len());
203            decompress_data(&data).map_err(|e| {
204                crate::Error::Storage(format!("Failed to decompress cache snapshot: {}", e))
205            })?
206        } else {
207            data
208        };
209
210        // Deserialize snapshot
211        let snapshot: CacheSnapshot = postcard::from_bytes(&serialized).map_err(|e| {
212            crate::Error::Storage(format!("Failed to deserialize cache snapshot: {}", e))
213        })?;
214
215        let elapsed = start.elapsed();
216
217        // Update statistics
218        {
219            let mut stats = self.stats.write();
220            stats.snapshots_loaded += 1;
221            stats.total_entries_loaded += snapshot.entries.len();
222            stats.total_bytes_read += serialized.len() as u64;
223            stats.last_load_duration = elapsed;
224        }
225
226        info!(
227            "Cache snapshot loaded: {} entries, {} bytes in {:?}",
228            snapshot.entries.len(),
229            serialized.len(),
230            elapsed
231        );
232
233        Ok(Some(snapshot))
234    }
235
236    /// Check if a save is needed based on configuration
237    pub fn should_save(&self, entries_count: usize) -> bool {
238        if !self.config.enabled {
239            return false;
240        }
241
242        // Check minimum entries threshold
243        if entries_count < self.config.min_entries_threshold {
244            return false;
245        }
246
247        // Check save interval
248        if let Some(last) = *self.last_save.read() {
249            if last.elapsed() < self.config.save_interval {
250                return false;
251            }
252        }
253
254        true
255    }
256
257    /// Delete persisted cache snapshot
258    pub fn delete_snapshot(&self, path: Option<&Path>) -> crate::Result<bool> {
259        let delete_path = path
260            .map(PathBuf::from)
261            .unwrap_or_else(|| self.config.persistence_path.clone());
262
263        if delete_path.exists() {
264            std::fs::remove_file(&delete_path).map_err(|e| {
265                crate::Error::Storage(format!("Failed to delete cache snapshot: {}", e))
266            })?;
267
268            info!("Cache snapshot deleted: {:?}", delete_path);
269            Ok(true)
270        } else {
271            Ok(false)
272        }
273    }
274
275    /// Get the age of the last save
276    pub fn last_save_age(&self) -> Option<Duration> {
277        self.last_save.read().map(|instant| instant.elapsed())
278    }
279
280    /// Reset persistence statistics
281    pub fn reset_stats(&self) {
282        let mut stats = self.stats.write();
283        *stats = PersistenceStats::default();
284        info!("Cache persistence statistics reset");
285    }
286}
287
288impl Default for CachePersistence {
289    fn default() -> Self {
290        Self::new(PersistenceConfig::default())
291    }
292}
293
294/// Compress data using LZ4
295fn compress_data(data: &[u8]) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
296    // Simple compression: store length prefix + compressed data
297    let compressed = lz4_flex::compress_prepend_size(data);
298    Ok(compressed)
299}
300
301/// Decompress data using LZ4
302fn decompress_data(data: &[u8]) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
303    let decompressed = lz4_flex::decompress_size_prepended(data)?;
304    Ok(decompressed)
305}
306
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;
    use tempfile::TempDir;

    /// Current wall-clock time as whole seconds since the Unix epoch.
    fn epoch_secs() -> u64 {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_secs()
    }

    /// Build a two-entry snapshot (`entry1`, `entry2`) with empty metadata.
    fn create_test_snapshot() -> CacheSnapshot {
        let first = PersistedCacheEntry {
            key: String::from("entry1"),
            value: vec![1, 2, 3],
            created_at: epoch_secs(),
            access_count: 5,
            last_accessed: epoch_secs(),
            ttl_secs: None,
        };
        let second = PersistedCacheEntry {
            key: String::from("entry2"),
            value: vec![4, 5, 6],
            created_at: epoch_secs(),
            access_count: 3,
            last_accessed: epoch_secs(),
            ttl_secs: None,
        };
        let entries = vec![first, second];

        CacheSnapshot {
            version: 1,
            created_at: epoch_secs(),
            entries,
            metadata: HashMap::new(),
        }
    }

    /// A handler built from the default config is enabled and has saved nothing.
    #[test]
    fn test_persistence_creation() {
        let handler = CachePersistence::new(PersistenceConfig::default());

        assert!(handler.is_enabled());
        assert_eq!(handler.stats().snapshots_saved, 0);
    }

    /// An uncompressed snapshot round-trips through disk with entries in order.
    #[test]
    fn test_save_and_load_snapshot() {
        let dir = TempDir::new().unwrap();
        let file = dir.path().join("cache.snapshot");

        let handler = CachePersistence::new(PersistenceConfig {
            enabled: true,
            persistence_path: file,
            compression_enabled: false,
            ..Default::default()
        });
        let original = create_test_snapshot();

        // Save snapshot
        let written = handler.save_snapshot(&original, None).unwrap();
        assert_eq!(written, 2);
        assert_eq!(handler.stats().snapshots_saved, 1);

        // Load snapshot
        let restored = handler.load_snapshot(None).unwrap();
        assert!(restored.is_some());

        let restored = restored.unwrap();
        assert_eq!(restored.entries.len(), 2);
        assert_eq!(restored.entries[0].key, "entry1");
        assert_eq!(restored.entries[1].key, "entry2");
    }

    /// A compressed snapshot round-trips through disk as well.
    #[test]
    fn test_save_with_compression() {
        let dir = TempDir::new().unwrap();
        let file = dir.path().join("cache.snapshot");

        let handler = CachePersistence::new(PersistenceConfig {
            enabled: true,
            persistence_path: file,
            compression_enabled: true,
            ..Default::default()
        });
        let original = create_test_snapshot();

        // Save with compression
        let written = handler.save_snapshot(&original, None).unwrap();
        assert_eq!(written, 2);

        // Load and verify
        let restored = handler.load_snapshot(None).unwrap();
        assert!(restored.is_some());
        assert_eq!(restored.unwrap().entries.len(), 2);
    }

    /// With persistence disabled, saving reports zero entries and loading yields nothing.
    #[test]
    fn test_disabled_persistence() {
        let handler = CachePersistence::new(PersistenceConfig {
            enabled: false,
            ..Default::default()
        });
        let original = create_test_snapshot();

        assert!(!handler.is_enabled());
        assert_eq!(handler.save_snapshot(&original, None).unwrap(), 0);
        assert!(handler.load_snapshot(None).unwrap().is_none());
    }

    /// `should_save` honours the minimum-entries threshold before any save has happened.
    #[test]
    fn test_should_save() {
        let handler = CachePersistence::new(PersistenceConfig {
            enabled: true,
            min_entries_threshold: 10,
            save_interval: Duration::from_secs(60),
            ..Default::default()
        });

        // Below threshold
        assert!(!handler.should_save(5));

        // Above threshold
        assert!(handler.should_save(15));
    }

    /// Deleting removes an existing snapshot once and reports `false` afterwards.
    #[test]
    fn test_delete_snapshot() {
        let dir = TempDir::new().unwrap();
        let file = dir.path().join("cache.snapshot");

        let handler = CachePersistence::new(PersistenceConfig {
            enabled: true,
            persistence_path: file.clone(),
            ..Default::default()
        });
        let original = create_test_snapshot();

        // Save and verify exists
        handler.save_snapshot(&original, None).unwrap();
        assert!(file.exists());

        // Delete and verify gone
        let removed = handler.delete_snapshot(None).unwrap();
        assert!(removed);
        assert!(!file.exists());

        // Delete non-existent
        let removed_again = handler.delete_snapshot(None).unwrap();
        assert!(!removed_again);
    }
}