do_memory_storage_redb/persistence/
manager.rs1use std::sync::Arc;
4
5use parking_lot::RwLock;
6use tokio::task::JoinHandle;
7use tokio::time::interval;
8use tracing::{debug, error, info, warn};
9
10use super::{CachePersistence, CacheSnapshot, PersistenceConfig, PersistenceStats};
11
12pub struct PersistenceManager {
16 config: PersistenceConfig,
17 persistence: CachePersistence,
18 last_snapshot: Arc<RwLock<Option<CacheSnapshot>>>,
19 background_task: Arc<RwLock<Option<JoinHandle<()>>>>,
20 shutdown_flag: Arc<RwLock<bool>>,
21}
22
23impl PersistenceManager {
24 pub fn new(config: PersistenceConfig) -> Self {
26 let persistence = CachePersistence::new(config.clone());
27
28 Self {
29 config,
30 persistence,
31 last_snapshot: Arc::new(RwLock::new(None)),
32 background_task: Arc::new(RwLock::new(None)),
33 shutdown_flag: Arc::new(RwLock::new(false)),
34 }
35 }
36
37 pub fn with_default_config() -> Self {
39 Self::new(PersistenceConfig::default())
40 }
41
42 pub fn start_background_task(
47 &self,
48 snapshot_provider: Arc<dyn Fn() -> Option<CacheSnapshot> + Send + Sync>,
49 ) {
50 if !self.config.enabled {
51 debug!("Persistence disabled, not starting background task");
52 return;
53 }
54
55 if *self.shutdown_flag.read() {
56 warn!("Cannot start background task: shutdown in progress");
57 return;
58 }
59
60 let interval_duration = self.config.save_interval;
61 let persistence = CachePersistence::new(self.config.clone());
62 let shutdown_flag: Arc<parking_lot::RwLock<bool>> = Arc::clone(&self.shutdown_flag);
63 let last_snapshot: Arc<parking_lot::RwLock<Option<CacheSnapshot>>> =
64 Arc::clone(&self.last_snapshot);
65
66 let handle = tokio::spawn(async move {
67 let mut ticker = interval(interval_duration);
68
69 loop {
70 ticker.tick().await;
71
72 if *shutdown_flag.read() {
73 debug!("Background persistence task shutting down");
74 break;
75 }
76
77 if let Some(snapshot) = snapshot_provider() {
79 if persistence.should_save(snapshot.len()) {
80 match persistence.save_snapshot(&snapshot, None) {
81 Ok(count) => {
82 debug!("Background save completed: {} entries", count);
83 let mut last = last_snapshot.write();
84 *last = Some(snapshot);
85 }
86 Err(e) => {
87 error!("Background save failed: {}", e);
88 }
89 }
90 }
91 }
92 }
93 });
94
95 let mut task = self.background_task.write();
96 *task = Some(handle);
97
98 info!(
99 "Started background persistence task with interval {:?}",
100 interval_duration
101 );
102 }
103
104 pub fn stop_background_task(&self) {
106 {
108 let mut flag = self.shutdown_flag.write();
109 *flag = true;
110 }
111
112 let mut task = self.background_task.write();
114 if let Some(handle) = task.take() {
115 handle.abort();
116 info!("Background persistence task stopped");
117 }
118 }
119
120 pub fn shutdown(&self, final_snapshot: Option<CacheSnapshot>) {
126 info!("Starting persistence manager shutdown");
127
128 self.stop_background_task();
130
131 if let Some(snapshot) = final_snapshot {
133 if self.config.enabled {
134 info!("Saving final cache snapshot ({} entries)", snapshot.len());
135 match self.persistence.save_snapshot(&snapshot, None) {
136 Ok(count) => {
137 info!("Final cache snapshot saved: {} entries", count);
138 }
139 Err(e) => {
140 error!("Failed to save final cache snapshot: {}", e);
141 }
142 }
143 }
144 }
145
146 info!("Persistence manager shutdown complete");
147 }
148
149 pub fn recover(&self) -> crate::Result<Option<CacheSnapshot>> {
155 if !self.config.enabled {
156 debug!("Persistence disabled, skipping recovery");
157 return Ok(None);
158 }
159
160 info!("Attempting to recover cache from persistence");
161
162 match self.persistence.load_snapshot(None) {
163 Ok(Some(snapshot)) => {
164 info!(
165 "Cache recovered: {} entries from snapshot created at {}",
166 snapshot.len(),
167 snapshot.created_at
168 );
169
170 {
172 let mut last = self.last_snapshot.write();
173 *last = Some(snapshot.clone());
174 }
175
176 Ok(Some(snapshot))
177 }
178 Ok(None) => {
179 info!("No cache snapshot found for recovery");
180 Ok(None)
181 }
182 Err(e) => {
183 error!("Failed to recover cache: {}", e);
184 Err(e)
185 }
186 }
187 }
188
189 pub fn has_recovery_snapshot(&self) -> bool {
191 self.config.enabled && self.config.persistence_path.exists()
192 }
193
194 pub fn config(&self) -> &PersistenceConfig {
196 &self.config
197 }
198
199 pub fn stats(&self) -> PersistenceStats {
201 self.persistence.stats()
202 }
203
204 pub fn last_snapshot(&self) -> Option<CacheSnapshot> {
206 self.last_snapshot.read().clone()
207 }
208
209 pub fn force_save(&self, snapshot: &CacheSnapshot) -> crate::Result<usize> {
211 let result = self.persistence.save_snapshot(snapshot, None);
212
213 if result.is_ok() {
214 let mut last = self.last_snapshot.write();
215 *last = Some(snapshot.clone());
216 }
217
218 result
219 }
220
221 pub fn delete_persisted(&self) -> crate::Result<bool> {
223 self.persistence.delete_snapshot(None)
224 }
225}
226
227impl Default for PersistenceManager {
228 fn default() -> Self {
229 Self::new(PersistenceConfig::default())
230 }
231}
232
233impl Drop for PersistenceManager {
234 fn drop(&mut self) {
235 self.stop_background_task();
237 }
238}
239
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;
    use tempfile::TempDir;

    /// Builds an empty version-1 snapshot with a fixed creation timestamp.
    fn create_test_snapshot() -> CacheSnapshot {
        CacheSnapshot {
            version: 1,
            created_at: 1234567890,
            entries: Vec::new(),
            metadata: HashMap::new(),
        }
    }

    /// Builds an enabled manager whose snapshot file lives inside `dir`.
    ///
    /// The caller keeps ownership of `dir` so the directory outlives the
    /// manager for the duration of the test.
    fn manager_in(dir: &TempDir) -> PersistenceManager {
        PersistenceManager::new(PersistenceConfig {
            enabled: true,
            persistence_path: dir.path().join("cache.snapshot"),
            ..Default::default()
        })
    }

    /// A default manager is enabled and starts without a remembered snapshot.
    #[test]
    fn test_manager_creation() {
        let manager = PersistenceManager::default();

        assert!(manager.config().enabled);
        assert!(manager.last_snapshot().is_none());
    }

    /// A forced save of an empty snapshot reports zero entries and records it.
    #[test]
    fn test_force_save() {
        let temp_dir = TempDir::new().unwrap();
        let manager = manager_in(&temp_dir);

        let saved = manager.force_save(&create_test_snapshot()).unwrap();

        assert_eq!(saved, 0);
        assert!(manager.last_snapshot().is_some());
    }

    /// Deleting removes the file that a previous save created.
    #[test]
    fn test_delete_persisted() {
        let temp_dir = TempDir::new().unwrap();
        let manager = manager_in(&temp_dir);

        manager.force_save(&create_test_snapshot()).unwrap();
        assert!(manager.has_recovery_snapshot());

        let deleted = manager.delete_persisted().unwrap();
        assert!(deleted);
        assert!(!manager.has_recovery_snapshot());
    }

    /// A disabled manager skips recovery and yields no snapshot.
    #[test]
    fn test_disabled_manager() {
        let manager = PersistenceManager::new(PersistenceConfig::disabled());

        assert!(!manager.config().enabled);
        assert!(manager.recover().unwrap().is_none());
    }
}