Skip to main content

do_memory_storage_redb/persistence/
manager.rs

1//! Persistence manager for automatic cache persistence
2
3use std::sync::Arc;
4
5use parking_lot::RwLock;
6use tokio::task::JoinHandle;
7use tokio::time::interval;
8use tracing::{debug, error, info, warn};
9
10use super::{CachePersistence, CacheSnapshot, PersistenceConfig, PersistenceStats};
11
12/// Manager for automatic cache persistence
13///
14/// Handles periodic saves, shutdown persistence, and recovery.
15pub struct PersistenceManager {
16    config: PersistenceConfig,
17    persistence: CachePersistence,
18    last_snapshot: Arc<RwLock<Option<CacheSnapshot>>>,
19    background_task: Arc<RwLock<Option<JoinHandle<()>>>>,
20    shutdown_flag: Arc<RwLock<bool>>,
21}
22
23impl PersistenceManager {
24    /// Create a new persistence manager
25    pub fn new(config: PersistenceConfig) -> Self {
26        let persistence = CachePersistence::new(config.clone());
27
28        Self {
29            config,
30            persistence,
31            last_snapshot: Arc::new(RwLock::new(None)),
32            background_task: Arc::new(RwLock::new(None)),
33            shutdown_flag: Arc::new(RwLock::new(false)),
34        }
35    }
36
37    /// Create with default configuration
38    pub fn with_default_config() -> Self {
39        Self::new(PersistenceConfig::default())
40    }
41
42    /// Start background persistence task
43    ///
44    /// This starts a task that periodically saves the cache based on
45    /// the configured strategy and interval.
46    pub fn start_background_task(
47        &self,
48        snapshot_provider: Arc<dyn Fn() -> Option<CacheSnapshot> + Send + Sync>,
49    ) {
50        if !self.config.enabled {
51            debug!("Persistence disabled, not starting background task");
52            return;
53        }
54
55        if *self.shutdown_flag.read() {
56            warn!("Cannot start background task: shutdown in progress");
57            return;
58        }
59
60        let interval_duration = self.config.save_interval;
61        let persistence = CachePersistence::new(self.config.clone());
62        let shutdown_flag: Arc<parking_lot::RwLock<bool>> = Arc::clone(&self.shutdown_flag);
63        let last_snapshot: Arc<parking_lot::RwLock<Option<CacheSnapshot>>> =
64            Arc::clone(&self.last_snapshot);
65
66        let handle = tokio::spawn(async move {
67            let mut ticker = interval(interval_duration);
68
69            loop {
70                ticker.tick().await;
71
72                if *shutdown_flag.read() {
73                    debug!("Background persistence task shutting down");
74                    break;
75                }
76
77                // Get snapshot from provider
78                if let Some(snapshot) = snapshot_provider() {
79                    if persistence.should_save(snapshot.len()) {
80                        match persistence.save_snapshot(&snapshot, None) {
81                            Ok(count) => {
82                                debug!("Background save completed: {} entries", count);
83                                let mut last = last_snapshot.write();
84                                *last = Some(snapshot);
85                            }
86                            Err(e) => {
87                                error!("Background save failed: {}", e);
88                            }
89                        }
90                    }
91                }
92            }
93        });
94
95        let mut task = self.background_task.write();
96        *task = Some(handle);
97
98        info!(
99            "Started background persistence task with interval {:?}",
100            interval_duration
101        );
102    }
103
104    /// Stop the background persistence task
105    pub fn stop_background_task(&self) {
106        // Set shutdown flag
107        {
108            let mut flag = self.shutdown_flag.write();
109            *flag = true;
110        }
111
112        // Abort background task
113        let mut task = self.background_task.write();
114        if let Some(handle) = task.take() {
115            handle.abort();
116            info!("Background persistence task stopped");
117        }
118    }
119
120    /// Perform graceful shutdown with final cache save
121    ///
122    /// # Arguments
123    ///
124    /// * `final_snapshot` - The final cache snapshot to save
125    pub fn shutdown(&self, final_snapshot: Option<CacheSnapshot>) {
126        info!("Starting persistence manager shutdown");
127
128        // Stop background task
129        self.stop_background_task();
130
131        // Save final snapshot if provided and enabled
132        if let Some(snapshot) = final_snapshot {
133            if self.config.enabled {
134                info!("Saving final cache snapshot ({} entries)", snapshot.len());
135                match self.persistence.save_snapshot(&snapshot, None) {
136                    Ok(count) => {
137                        info!("Final cache snapshot saved: {} entries", count);
138                    }
139                    Err(e) => {
140                        error!("Failed to save final cache snapshot: {}", e);
141                    }
142                }
143            }
144        }
145
146        info!("Persistence manager shutdown complete");
147    }
148
149    /// Recover cache from persisted snapshot
150    ///
151    /// # Returns
152    ///
153    /// The recovered cache snapshot, or None if no snapshot exists
154    pub fn recover(&self) -> crate::Result<Option<CacheSnapshot>> {
155        if !self.config.enabled {
156            debug!("Persistence disabled, skipping recovery");
157            return Ok(None);
158        }
159
160        info!("Attempting to recover cache from persistence");
161
162        match self.persistence.load_snapshot(None) {
163            Ok(Some(snapshot)) => {
164                info!(
165                    "Cache recovered: {} entries from snapshot created at {}",
166                    snapshot.len(),
167                    snapshot.created_at
168                );
169
170                // Update last snapshot
171                {
172                    let mut last = self.last_snapshot.write();
173                    *last = Some(snapshot.clone());
174                }
175
176                Ok(Some(snapshot))
177            }
178            Ok(None) => {
179                info!("No cache snapshot found for recovery");
180                Ok(None)
181            }
182            Err(e) => {
183                error!("Failed to recover cache: {}", e);
184                Err(e)
185            }
186        }
187    }
188
189    /// Check if a recovery snapshot exists
190    pub fn has_recovery_snapshot(&self) -> bool {
191        self.config.enabled && self.config.persistence_path.exists()
192    }
193
194    /// Get the persistence configuration
195    pub fn config(&self) -> &PersistenceConfig {
196        &self.config
197    }
198
199    /// Get persistence statistics
200    pub fn stats(&self) -> PersistenceStats {
201        self.persistence.stats()
202    }
203
204    /// Get the last saved snapshot
205    pub fn last_snapshot(&self) -> Option<CacheSnapshot> {
206        self.last_snapshot.read().clone()
207    }
208
209    /// Force an immediate save
210    pub fn force_save(&self, snapshot: &CacheSnapshot) -> crate::Result<usize> {
211        let result = self.persistence.save_snapshot(snapshot, None);
212
213        if result.is_ok() {
214            let mut last = self.last_snapshot.write();
215            *last = Some(snapshot.clone());
216        }
217
218        result
219    }
220
221    /// Delete persisted snapshot
222    pub fn delete_persisted(&self) -> crate::Result<bool> {
223        self.persistence.delete_snapshot(None)
224    }
225}
226
227impl Default for PersistenceManager {
228    fn default() -> Self {
229        Self::new(PersistenceConfig::default())
230    }
231}
232
233impl Drop for PersistenceManager {
234    fn drop(&mut self) {
235        // Ensure background task is stopped
236        self.stop_background_task();
237    }
238}
239
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;
    use tempfile::TempDir;

    /// Builds an empty version-1 snapshot with a fixed creation timestamp.
    fn create_test_snapshot() -> CacheSnapshot {
        CacheSnapshot {
            version: 1,
            created_at: 1234567890,
            entries: Vec::new(),
            metadata: HashMap::new(),
        }
    }

    /// Builds an enabled manager that persists to `cache.snapshot` inside a
    /// fresh temporary directory. The `TempDir` is returned so the directory
    /// outlives the manager for the duration of the test.
    fn manager_in_temp_dir() -> (TempDir, PersistenceManager) {
        let temp_dir = TempDir::new().unwrap();
        let config = PersistenceConfig {
            enabled: true,
            persistence_path: temp_dir.path().join("cache.snapshot"),
            ..Default::default()
        };

        (temp_dir, PersistenceManager::new(config))
    }

    #[test]
    fn test_manager_creation() {
        let manager = PersistenceManager::default();

        assert!(manager.config().enabled);
        assert!(manager.last_snapshot().is_none());
    }

    #[test]
    fn test_force_save() {
        let (_temp_dir, manager) = manager_in_temp_dir();

        let saved = manager.force_save(&create_test_snapshot()).unwrap();

        // The test snapshot has no entries, so zero are reported as saved.
        assert_eq!(saved, 0);
        assert!(manager.last_snapshot().is_some());
    }

    #[test]
    fn test_delete_persisted() {
        let (_temp_dir, manager) = manager_in_temp_dir();

        // Save then delete
        manager.force_save(&create_test_snapshot()).unwrap();
        assert!(manager.has_recovery_snapshot());

        let deleted = manager.delete_persisted().unwrap();
        assert!(deleted);
        assert!(!manager.has_recovery_snapshot());
    }

    #[test]
    fn test_disabled_manager() {
        let manager = PersistenceManager::new(PersistenceConfig::disabled());

        assert!(!manager.config().enabled);
        assert!(manager.recover().unwrap().is_none());
    }
}