do_memory_storage_redb/persistence/
mod.rs1use std::path::{Path, PathBuf};
27use std::sync::Arc;
28#[allow(unused_imports)]
29use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
30
31use parking_lot::RwLock;
32use tracing::{debug, info};
33
34#[allow(unused_imports)] mod config;
36mod manager;
37mod types;
38
39pub use config::{PersistenceConfig, PersistenceMode, PersistenceStrategy};
40pub use manager::PersistenceManager;
41pub use types::{CacheSnapshot, IncrementalUpdate, PersistedCacheEntry, PersistenceStats};
42
43#[derive(Debug)]
48pub struct CachePersistence {
49 config: PersistenceConfig,
50 stats: Arc<RwLock<PersistenceStats>>,
51 last_save: Arc<RwLock<Option<Instant>>>,
52}
53
54impl CachePersistence {
55 pub fn new(config: PersistenceConfig) -> Self {
57 info!(
58 "Creating cache persistence with mode={:?}, strategy={:?}",
59 config.mode, config.strategy
60 );
61
62 Self {
63 config,
64 stats: Arc::new(RwLock::new(PersistenceStats::default())),
65 last_save: Arc::new(RwLock::new(None)),
66 }
67 }
68
69 pub fn with_default_config() -> Self {
71 Self::new(PersistenceConfig::default())
72 }
73
74 pub fn config(&self) -> &PersistenceConfig {
76 &self.config
77 }
78
79 pub fn stats(&self) -> PersistenceStats {
81 self.stats.read().clone()
82 }
83
84 pub fn is_enabled(&self) -> bool {
86 self.config.enabled
87 }
88
89 pub fn save_snapshot(
100 &self,
101 snapshot: &CacheSnapshot,
102 path: Option<&Path>,
103 ) -> crate::Result<usize> {
104 if !self.config.enabled {
105 debug!("Cache persistence disabled, skipping save");
106 return Ok(0);
107 }
108
109 let save_path = path
110 .map(PathBuf::from)
111 .unwrap_or_else(|| self.config.persistence_path.clone());
112
113 info!(
114 "Saving cache snapshot with {} entries to {:?}",
115 snapshot.entries.len(),
116 save_path
117 );
118
119 let start = Instant::now();
120
121 let serialized = postcard::to_allocvec(snapshot).map_err(|e| {
123 crate::Error::Storage(format!("Failed to serialize cache snapshot: {}", e))
124 })?;
125
126 let data = if self.config.compression_enabled {
128 debug!("Compressing cache snapshot ({} bytes)", serialized.len());
129 compress_data(&serialized).map_err(|e| {
130 crate::Error::Storage(format!("Failed to compress cache snapshot: {}", e))
131 })?
132 } else {
133 serialized
134 };
135
136 std::fs::write(&save_path, &data)
138 .map_err(|e| crate::Error::Storage(format!("Failed to write cache snapshot: {}", e)))?;
139
140 let elapsed = start.elapsed();
141 let bytes_written = data.len();
142
143 {
145 let mut stats = self.stats.write();
146 stats.snapshots_saved += 1;
147 stats.total_entries_saved += snapshot.entries.len();
148 stats.total_bytes_written += bytes_written as u64;
149 stats.last_save_duration = elapsed;
150 }
151
152 {
154 let mut last = self.last_save.write();
155 *last = Some(Instant::now());
156 }
157
158 info!(
159 "Cache snapshot saved: {} entries, {} bytes in {:?}",
160 snapshot.entries.len(),
161 bytes_written,
162 elapsed
163 );
164
165 Ok(snapshot.entries.len())
166 }
167
168 pub fn load_snapshot(&self, path: Option<&Path>) -> crate::Result<Option<CacheSnapshot>> {
178 if !self.config.enabled {
179 debug!("Cache persistence disabled, skipping load");
180 return Ok(None);
181 }
182
183 let load_path = path
184 .map(PathBuf::from)
185 .unwrap_or_else(|| self.config.persistence_path.clone());
186
187 if !load_path.exists() {
188 debug!("No cache snapshot found at {:?}", load_path);
189 return Ok(None);
190 }
191
192 info!("Loading cache snapshot from {:?}", load_path);
193
194 let start = Instant::now();
195
196 let data = std::fs::read(&load_path)
198 .map_err(|e| crate::Error::Storage(format!("Failed to read cache snapshot: {}", e)))?;
199
200 let serialized = if self.config.compression_enabled {
202 debug!("Decompressing cache snapshot ({} bytes)", data.len());
203 decompress_data(&data).map_err(|e| {
204 crate::Error::Storage(format!("Failed to decompress cache snapshot: {}", e))
205 })?
206 } else {
207 data
208 };
209
210 let snapshot: CacheSnapshot = postcard::from_bytes(&serialized).map_err(|e| {
212 crate::Error::Storage(format!("Failed to deserialize cache snapshot: {}", e))
213 })?;
214
215 let elapsed = start.elapsed();
216
217 {
219 let mut stats = self.stats.write();
220 stats.snapshots_loaded += 1;
221 stats.total_entries_loaded += snapshot.entries.len();
222 stats.total_bytes_read += serialized.len() as u64;
223 stats.last_load_duration = elapsed;
224 }
225
226 info!(
227 "Cache snapshot loaded: {} entries, {} bytes in {:?}",
228 snapshot.entries.len(),
229 serialized.len(),
230 elapsed
231 );
232
233 Ok(Some(snapshot))
234 }
235
236 pub fn should_save(&self, entries_count: usize) -> bool {
238 if !self.config.enabled {
239 return false;
240 }
241
242 if entries_count < self.config.min_entries_threshold {
244 return false;
245 }
246
247 if let Some(last) = *self.last_save.read() {
249 if last.elapsed() < self.config.save_interval {
250 return false;
251 }
252 }
253
254 true
255 }
256
257 pub fn delete_snapshot(&self, path: Option<&Path>) -> crate::Result<bool> {
259 let delete_path = path
260 .map(PathBuf::from)
261 .unwrap_or_else(|| self.config.persistence_path.clone());
262
263 if delete_path.exists() {
264 std::fs::remove_file(&delete_path).map_err(|e| {
265 crate::Error::Storage(format!("Failed to delete cache snapshot: {}", e))
266 })?;
267
268 info!("Cache snapshot deleted: {:?}", delete_path);
269 Ok(true)
270 } else {
271 Ok(false)
272 }
273 }
274
275 pub fn last_save_age(&self) -> Option<Duration> {
277 self.last_save.read().map(|instant| instant.elapsed())
278 }
279
280 pub fn reset_stats(&self) {
282 let mut stats = self.stats.write();
283 *stats = PersistenceStats::default();
284 info!("Cache persistence statistics reset");
285 }
286}
287
288impl Default for CachePersistence {
289 fn default() -> Self {
290 Self::new(PersistenceConfig::default())
291 }
292}
293
294fn compress_data(data: &[u8]) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
296 let compressed = lz4_flex::compress_prepend_size(data);
298 Ok(compressed)
299}
300
301fn decompress_data(data: &[u8]) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
303 let decompressed = lz4_flex::decompress_size_prepended(data)?;
304 Ok(decompressed)
305}
306
307#[cfg(test)]
308mod tests {
309 use super::*;
310 use std::collections::HashMap;
311 use tempfile::TempDir;
312
313 fn create_test_snapshot() -> CacheSnapshot {
314 let entries = vec![
315 PersistedCacheEntry {
316 key: "entry1".to_string(),
317 value: vec![1, 2, 3],
318 created_at: SystemTime::now()
319 .duration_since(UNIX_EPOCH)
320 .unwrap()
321 .as_secs(),
322 access_count: 5,
323 last_accessed: SystemTime::now()
324 .duration_since(UNIX_EPOCH)
325 .unwrap()
326 .as_secs(),
327 ttl_secs: None,
328 },
329 PersistedCacheEntry {
330 key: "entry2".to_string(),
331 value: vec![4, 5, 6],
332 created_at: SystemTime::now()
333 .duration_since(UNIX_EPOCH)
334 .unwrap()
335 .as_secs(),
336 access_count: 3,
337 last_accessed: SystemTime::now()
338 .duration_since(UNIX_EPOCH)
339 .unwrap()
340 .as_secs(),
341 ttl_secs: None,
342 },
343 ];
344
345 CacheSnapshot {
346 version: 1,
347 created_at: SystemTime::now()
348 .duration_since(UNIX_EPOCH)
349 .unwrap()
350 .as_secs(),
351 entries,
352 metadata: HashMap::new(),
353 }
354 }
355
356 #[test]
357 fn test_persistence_creation() {
358 let config = PersistenceConfig::default();
359 let persistence = CachePersistence::new(config);
360
361 assert!(persistence.is_enabled());
362 assert_eq!(persistence.stats().snapshots_saved, 0);
363 }
364
365 #[test]
366 fn test_save_and_load_snapshot() {
367 let temp_dir = TempDir::new().unwrap();
368 let snapshot_path = temp_dir.path().join("cache.snapshot");
369
370 let config = PersistenceConfig {
371 enabled: true,
372 persistence_path: snapshot_path.clone(),
373 compression_enabled: false,
374 ..Default::default()
375 };
376
377 let persistence = CachePersistence::new(config);
378 let snapshot = create_test_snapshot();
379
380 let saved = persistence.save_snapshot(&snapshot, None).unwrap();
382 assert_eq!(saved, 2);
383 assert_eq!(persistence.stats().snapshots_saved, 1);
384
385 let loaded = persistence.load_snapshot(None).unwrap();
387 assert!(loaded.is_some());
388
389 let loaded_snapshot = loaded.unwrap();
390 assert_eq!(loaded_snapshot.entries.len(), 2);
391 assert_eq!(loaded_snapshot.entries[0].key, "entry1");
392 assert_eq!(loaded_snapshot.entries[1].key, "entry2");
393 }
394
395 #[test]
396 fn test_save_with_compression() {
397 let temp_dir = TempDir::new().unwrap();
398 let snapshot_path = temp_dir.path().join("cache.snapshot");
399
400 let config = PersistenceConfig {
401 enabled: true,
402 persistence_path: snapshot_path.clone(),
403 compression_enabled: true,
404 ..Default::default()
405 };
406
407 let persistence = CachePersistence::new(config);
408 let snapshot = create_test_snapshot();
409
410 let saved = persistence.save_snapshot(&snapshot, None).unwrap();
412 assert_eq!(saved, 2);
413
414 let loaded = persistence.load_snapshot(None).unwrap();
416 assert!(loaded.is_some());
417 assert_eq!(loaded.unwrap().entries.len(), 2);
418 }
419
420 #[test]
421 fn test_disabled_persistence() {
422 let config = PersistenceConfig {
423 enabled: false,
424 ..Default::default()
425 };
426
427 let persistence = CachePersistence::new(config);
428 let snapshot = create_test_snapshot();
429
430 assert!(!persistence.is_enabled());
431 assert_eq!(persistence.save_snapshot(&snapshot, None).unwrap(), 0);
432 assert!(persistence.load_snapshot(None).unwrap().is_none());
433 }
434
435 #[test]
436 fn test_should_save() {
437 let config = PersistenceConfig {
438 enabled: true,
439 min_entries_threshold: 10,
440 save_interval: Duration::from_secs(60),
441 ..Default::default()
442 };
443
444 let persistence = CachePersistence::new(config);
445
446 assert!(!persistence.should_save(5));
448
449 assert!(persistence.should_save(15));
451 }
452
453 #[test]
454 fn test_delete_snapshot() {
455 let temp_dir = TempDir::new().unwrap();
456 let snapshot_path = temp_dir.path().join("cache.snapshot");
457
458 let config = PersistenceConfig {
459 enabled: true,
460 persistence_path: snapshot_path.clone(),
461 ..Default::default()
462 };
463
464 let persistence = CachePersistence::new(config);
465 let snapshot = create_test_snapshot();
466
467 persistence.save_snapshot(&snapshot, None).unwrap();
469 assert!(snapshot_path.exists());
470
471 let deleted = persistence.delete_snapshot(None).unwrap();
473 assert!(deleted);
474 assert!(!snapshot_path.exists());
475
476 let deleted = persistence.delete_snapshot(None).unwrap();
478 assert!(!deleted);
479 }
480}