//! Fallback cache manager (`sockudo_cache/fallback_cache_manager.rs`):
//! wraps a primary cache backend and transparently falls back to an
//! in-memory cache when the primary becomes unavailable.
use crate::memory_cache_manager::MemoryCacheManager;

use async_trait::async_trait;
use sockudo_core::cache::CacheManager;
use sockudo_core::error::Result;
use sockudo_core::options::MemoryCacheOptions;
use tokio::sync::{Mutex, RwLock};
use tracing::{debug, info, warn};

use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::{Duration, Instant};

/// Minimum number of seconds between recovery probes while in fallback mode,
/// so a flapping primary is not hammered with health checks.
const RECOVERY_CHECK_INTERVAL_SECS: u64 = 30;

/// Cache manager that wraps a primary (Redis/Redis Cluster) cache and automatically
/// falls back to in-memory cache when the primary becomes unavailable.
///
/// When the primary cache recovers, data from the fallback cache is synchronized
/// back to the primary before switching over to prevent data loss.
pub struct FallbackCacheManager {
    /// The wrapped primary backend; serves all operations while healthy.
    primary: Mutex<Box<dyn CacheManager + Send + Sync>>,
    /// In-memory cache that serves operations while the primary is down.
    fallback: Mutex<MemoryCacheManager>,
    /// True while operations are routed to `fallback`.
    using_fallback: AtomicBool,
    /// Seconds since `start_time` at which the primary last failed;
    /// used with RECOVERY_CHECK_INTERVAL_SECS to rate-limit recovery probes.
    last_failure_time: AtomicU64,
    /// Monotonic reference point for `last_failure_time` values.
    start_time: Instant,
    /// Guards state transitions to prevent race conditions during recovery
    recovery_lock: RwLock<()>,
}

impl FallbackCacheManager {
    /// Creates a new FallbackCacheManager without performing an initial health check.
    /// If Redis is unavailable at startup, the first cache operation will experience
    /// higher latency due to the failed attempt and retry.
    pub fn new(
        primary: Box<dyn CacheManager + Send + Sync>,
        fallback_options: MemoryCacheOptions,
    ) -> Self {
        let fallback = MemoryCacheManager::new("fallback_cache".to_string(), fallback_options);

        Self {
            primary: Mutex::new(primary),
            fallback: Mutex::new(fallback),
            // Optimistically start on the primary; the first failed operation
            // flips this flag via switch_to_fallback().
            using_fallback: AtomicBool::new(false),
            last_failure_time: AtomicU64::new(0),
            start_time: Instant::now(),
            recovery_lock: RwLock::new(()),
        }
    }

    /// Creates a new FallbackCacheManager and performs an initial health check on the
    /// primary cache. If the primary cache is unavailable at startup, immediately switches
    /// to fallback mode, avoiding initial latency on the first cache operation.
    pub async fn new_with_health_check(
        primary: Box<dyn CacheManager + Send + Sync>,
        fallback_options: MemoryCacheOptions,
    ) -> Self {
        let fallback = MemoryCacheManager::new("fallback_cache".to_string(), fallback_options);
        let start_time = Instant::now();

        let using_fallback = match primary.check_health().await {
            Ok(()) => {
                debug!("Primary cache is healthy at startup");
                false
            }
            Err(e) => {
                warn!(
                    "Primary cache unavailable at startup, starting in fallback mode. Error: {}",
                    e
                );
                true
            }
        };

        // NOTE(review): `start_time.elapsed()` here is effectively 0 (only the
        // health check above has run), so both branches store ~0 and the first
        // recovery probe becomes eligible RECOVERY_CHECK_INTERVAL_SECS after
        // startup either way.
        let last_failure_time = if using_fallback {
            start_time.elapsed().as_secs()
        } else {
            0
        };

        Self {
            primary: Mutex::new(primary),
            fallback: Mutex::new(fallback),
            using_fallback: AtomicBool::new(using_fallback),
            last_failure_time: AtomicU64::new(last_failure_time),
            start_time,
            recovery_lock: RwLock::new(()),
        }
    }

    /// Returns true while operations are routed to the in-memory fallback.
    fn is_using_fallback(&self) -> bool {
        self.using_fallback.load(Ordering::SeqCst)
    }

    /// Flips the manager into fallback mode. The `swap` ensures the warning
    /// is logged and the failure timestamp recorded only once per outage,
    /// even when multiple operations fail concurrently.
    fn switch_to_fallback(&self, error: &str) {
        if !self.using_fallback.swap(true, Ordering::SeqCst) {
            warn!(
                "Redis cache unavailable, switching to in-memory fallback. Error: {}",
                error
            );
            self.last_failure_time
                .store(self.start_time.elapsed().as_secs(), Ordering::SeqCst);
        }
    }

    /// True when in fallback mode and at least RECOVERY_CHECK_INTERVAL_SECS
    /// have passed since the last recorded failure; rate-limits recovery probes.
    fn should_attempt_recovery(&self) -> bool {
        if !self.is_using_fallback() {
            return false;
        }

        let last_failure = self.last_failure_time.load(Ordering::SeqCst);
        let current_time = self.start_time.elapsed().as_secs();

        current_time.saturating_sub(last_failure) >= RECOVERY_CHECK_INTERVAL_SECS
    }

    /// Probes the primary and, if healthy, syncs fallback data and switches
    /// back to it. Returns true only when a switchover actually happened.
    ///
    /// Lock order is recovery_lock (write) -> primary mutex, which is
    /// consistent with operations taking recovery_lock (read) -> primary
    /// mutex, so no lock-order inversion is possible.
    async fn try_recover(&self) -> bool {
        if !self.should_attempt_recovery() {
            return false;
        }

        // Acquire write lock to prevent concurrent recovery attempts and operations
        let _recovery_guard = self.recovery_lock.write().await;

        // Double-check after acquiring lock - another thread may have already recovered
        if !self.is_using_fallback() {
            return false;
        }

        debug!("Attempting to recover Redis cache connection...");

        let mut primary = self.primary.lock().await;
        match primary.check_health().await {
            Ok(()) => {
                info!("Redis cache connection recovered, synchronizing fallback data...");

                // A sync failure is logged but does not abort the switchover:
                // entries that failed to sync may be lost to the primary.
                if let Err(e) = self.sync_fallback_to_primary(&mut primary).await {
                    warn!(
                        "Failed to sync fallback data to primary during recovery: {}",
                        e
                    );
                }

                self.using_fallback.store(false, Ordering::SeqCst);
                info!("Successfully switched back to primary cache after recovery");
                true
            }
            Err(e) => {
                debug!("Redis cache still unavailable: {}", e);
                // Push the failure timestamp forward so the next probe waits
                // a full RECOVERY_CHECK_INTERVAL_SECS again.
                self.last_failure_time
                    .store(self.start_time.elapsed().as_secs(), Ordering::SeqCst);
                false
            }
        }
    }

    /// Synchronizes data from fallback cache to primary cache during recovery.
    /// This prevents data loss for entries written during the outage.
    ///
    /// Best-effort: individual key failures are logged and counted but do not
    /// abort the sync; the function itself only errors before the loop starts.
    async fn sync_fallback_to_primary(
        &self,
        primary: &mut Box<dyn CacheManager + Send + Sync>,
    ) -> Result<()> {
        let fallback = self.fallback.lock().await;

        let entries = fallback.get_all_entries().await;

        if entries.is_empty() {
            debug!("No entries in fallback cache to sync");
            return Ok(());
        }

        debug!(
            "Syncing {} entries from fallback to primary cache",
            entries.len()
        );

        let mut synced = 0;
        let mut failed = 0;

        for (key, value, ttl) in entries {
            // NOTE(review): entries without a remaining TTL are written with
            // ttl_seconds = 0 — presumably the backend treats 0 as "no
            // expiry"; confirm against the CacheManager::set contract.
            let ttl_seconds = ttl.map(|d| d.as_secs()).unwrap_or(0);
            match primary.set(&key, &value, ttl_seconds).await {
                Ok(()) => synced += 1,
                Err(e) => {
                    warn!("Failed to sync key '{}' to primary cache: {}", key, e);
                    failed += 1;
                }
            }
        }

        if failed > 0 {
            warn!(
                "Synced {}/{} entries from fallback to primary ({} failed)",
                synced,
                synced + failed,
                failed
            );
        } else {
            info!(
                "Successfully synced {} entries from fallback to primary cache",
                synced
            );
        }

        Ok(())
    }
}

206#[async_trait]
207impl CacheManager for FallbackCacheManager {
208    async fn has(&self, key: &str) -> Result<bool> {
209        self.try_recover().await;
210
211        let _guard = self.recovery_lock.read().await;
212
213        if self.is_using_fallback() {
214            return self.fallback.lock().await.has(key).await;
215        }
216
217        let primary = self.primary.lock().await;
218        match primary.has(key).await {
219            Ok(result) => Ok(result),
220            Err(e) => {
221                self.switch_to_fallback(&e.to_string());
222                self.fallback.lock().await.has(key).await
223            }
224        }
225    }
226
227    async fn get(&self, key: &str) -> Result<Option<String>> {
228        self.try_recover().await;
229
230        let _guard = self.recovery_lock.read().await;
231
232        if self.is_using_fallback() {
233            return self.fallback.lock().await.get(key).await;
234        }
235
236        let primary = self.primary.lock().await;
237        match primary.get(key).await {
238            Ok(result) => Ok(result),
239            Err(e) => {
240                self.switch_to_fallback(&e.to_string());
241                self.fallback.lock().await.get(key).await
242            }
243        }
244    }
245
246    async fn set(&self, key: &str, value: &str, ttl_seconds: u64) -> Result<()> {
247        self.try_recover().await;
248
249        let _guard = self.recovery_lock.read().await;
250
251        if self.is_using_fallback() {
252            return self
253                .fallback
254                .lock()
255                .await
256                .set(key, value, ttl_seconds)
257                .await;
258        }
259
260        let primary = self.primary.lock().await;
261        match primary.set(key, value, ttl_seconds).await {
262            Ok(()) => Ok(()),
263            Err(e) => {
264                self.switch_to_fallback(&e.to_string());
265                self.fallback
266                    .lock()
267                    .await
268                    .set(key, value, ttl_seconds)
269                    .await
270            }
271        }
272    }
273
274    async fn remove(&self, key: &str) -> Result<()> {
275        self.try_recover().await;
276
277        let _guard = self.recovery_lock.read().await;
278
279        if self.is_using_fallback() {
280            return self.fallback.lock().await.remove(key).await;
281        }
282
283        let primary = self.primary.lock().await;
284        match primary.remove(key).await {
285            Ok(()) => Ok(()),
286            Err(e) => {
287                self.switch_to_fallback(&e.to_string());
288                self.fallback.lock().await.remove(key).await
289            }
290        }
291    }
292
293    async fn disconnect(&self) -> Result<()> {
294        let primary_result = self.primary.lock().await.disconnect().await;
295        if let Err(ref e) = primary_result {
296            warn!(error = ?e, "Failed to disconnect primary cache");
297        }
298
299        let fallback_result = self.fallback.lock().await.disconnect().await;
300        if let Err(ref e) = fallback_result {
301            warn!(error = ?e, "Failed to disconnect fallback cache");
302        }
303
304        match (primary_result, fallback_result) {
305            (Ok(_), Ok(_)) => Ok(()),
306            (Err(e), _) => Err(e),
307            (_, Err(e)) => Err(e),
308        }
309    }
310
311    async fn check_health(&self) -> Result<()> {
312        if self.is_using_fallback() {
313            let fallback = self.fallback.lock().await;
314            return fallback.check_health().await;
315        }
316
317        let primary = self.primary.lock().await;
318        primary.check_health().await
319    }
320
321    async fn ttl(&self, key: &str) -> Result<Option<Duration>> {
322        self.try_recover().await;
323
324        let _guard = self.recovery_lock.read().await;
325
326        if self.is_using_fallback() {
327            return self.fallback.lock().await.ttl(key).await;
328        }
329
330        let primary = self.primary.lock().await;
331        match primary.ttl(key).await {
332            Ok(result) => Ok(result),
333            Err(e) => {
334                self.switch_to_fallback(&e.to_string());
335                self.fallback.lock().await.ttl(key).await
336            }
337        }
338    }
339}
340
#[cfg(test)]
mod tests {
    use super::*;
    use crate::memory_cache_manager::MemoryCacheManager;
    use sockudo_core::error::Error;
    use std::sync::Arc;
    use tokio::sync::Mutex as TokioMutex;

    /// Mock cache that can be configured to fail on demand.
    struct MockCache {
        /// When true, every operation (including health checks) fails.
        should_fail: Arc<TokioMutex<bool>>,
        /// Backing store used while the mock is healthy.
        data: Arc<TokioMutex<std::collections::HashMap<String, String>>>,
    }

    impl MockCache {
        fn new() -> Self {
            Self {
                should_fail: Arc::new(TokioMutex::new(false)),
                data: Arc::new(TokioMutex::new(std::collections::HashMap::new())),
            }
        }

        /// Convenience toggle; tests normally flip the cloned `should_fail`
        /// handle directly because the mock is moved into the manager.
        #[allow(dead_code)]
        async fn set_should_fail(&self, should_fail: bool) {
            *self.should_fail.lock().await = should_fail;
        }
    }

    #[async_trait]
    impl CacheManager for MockCache {
        async fn has(&self, key: &str) -> Result<bool> {
            if *self.should_fail.lock().await {
                return Err(Error::Cache("Mock failure".to_string()));
            }
            Ok(self.data.lock().await.contains_key(key))
        }

        async fn get(&self, key: &str) -> Result<Option<String>> {
            if *self.should_fail.lock().await {
                return Err(Error::Cache("Mock failure".to_string()));
            }
            Ok(self.data.lock().await.get(key).cloned())
        }

        async fn set(&self, key: &str, value: &str, _ttl_seconds: u64) -> Result<()> {
            if *self.should_fail.lock().await {
                return Err(Error::Cache("Mock failure".to_string()));
            }
            self.data
                .lock()
                .await
                .insert(key.to_string(), value.to_string());
            Ok(())
        }

        async fn remove(&self, key: &str) -> Result<()> {
            if *self.should_fail.lock().await {
                return Err(Error::Cache("Mock failure".to_string()));
            }
            self.data.lock().await.remove(key);
            Ok(())
        }

        async fn disconnect(&self) -> Result<()> {
            Ok(())
        }

        async fn check_health(&self) -> Result<()> {
            if *self.should_fail.lock().await {
                return Err(Error::Cache("Mock failure".to_string()));
            }
            Ok(())
        }

        async fn ttl(&self, _key: &str) -> Result<Option<Duration>> {
            if *self.should_fail.lock().await {
                return Err(Error::Cache("Mock failure".to_string()));
            }
            Ok(Some(Duration::from_secs(60)))
        }
    }

    /// Shared fallback-cache options used by every test.
    fn test_options() -> MemoryCacheOptions {
        MemoryCacheOptions {
            ttl: 60,
            cleanup_interval: 60,
            max_capacity: 100,
        }
    }

    #[tokio::test]
    async fn test_fallback_on_primary_failure() {
        let mock = MockCache::new();
        let should_fail = mock.should_fail.clone();

        let manager = FallbackCacheManager::new(Box::new(mock), test_options());

        assert!(!manager.is_using_fallback());

        // Break the primary: the next write must transparently land in the
        // in-memory fallback instead of surfacing an error.
        *should_fail.lock().await = true;

        let result = manager.set("test_key", "test_value", 60).await;
        assert!(result.is_ok());

        assert!(manager.is_using_fallback());

        let value = manager.get("test_key").await.unwrap();
        assert_eq!(value, Some("test_value".to_string()));
    }

    #[tokio::test]
    async fn test_recovery_with_data_sync() {
        let mock = MockCache::new();
        let should_fail = mock.should_fail.clone();

        let manager = FallbackCacheManager::new(Box::new(mock), test_options());

        *should_fail.lock().await = true;
        manager.set("key1", "value1", 60).await.unwrap();
        manager.set("key2", "value2", 60).await.unwrap();

        assert!(manager.is_using_fallback());

        let value1_fallback = manager.get("key1").await.unwrap();
        assert_eq!(value1_fallback, Some("value1".to_string()));

        // Heal the primary. NOTE: an actual recovery cannot be observed in
        // this test because `should_attempt_recovery` requires
        // RECOVERY_CHECK_INTERVAL_SECS (30s) to elapse since the last failure
        // and the clock is not injectable. This test therefore only verifies
        // that data written during the outage stays readable from the
        // fallback and that the manager remains in fallback mode.
        *should_fail.lock().await = false;

        manager.using_fallback.store(true, Ordering::SeqCst);
        manager.last_failure_time.store(0, Ordering::SeqCst);

        assert!(manager.is_using_fallback());
        let value2_fallback = manager.get("key2").await.unwrap();
        assert_eq!(value2_fallback, Some("value2".to_string()));
    }

    #[tokio::test]
    async fn test_no_race_condition_during_operations() {
        let mock = MockCache::new();
        let should_fail = mock.should_fail.clone();

        // Share the manager via Arc and call it concurrently. The previous
        // version wrapped the manager in a TokioMutex, which serialized every
        // task and meant no concurrent access was actually exercised; the
        // manager's API takes &self precisely so it can be shared directly.
        let manager = Arc::new(FallbackCacheManager::new(Box::new(mock), test_options()));

        *should_fail.lock().await = true;
        manager.set("test", "value", 60).await.unwrap();

        assert!(manager.is_using_fallback());

        let handles: Vec<_> = (0..10)
            .map(|i| {
                let manager_clone = Arc::clone(&manager);
                tokio::spawn(async move {
                    manager_clone
                        .set(&format!("key{}", i), &format!("value{}", i), 60)
                        .await
                })
            })
            .collect();

        for handle in handles {
            assert!(handle.await.unwrap().is_ok());
        }

        assert!(manager.is_using_fallback());

        for i in 0..10 {
            let value = manager.get(&format!("key{}", i)).await.unwrap();
            assert_eq!(value, Some(format!("value{}", i)));
        }
    }

    #[tokio::test]
    async fn test_memory_cache_get_all_entries() {
        let cache = MemoryCacheManager::new("test_prefix".to_string(), test_options());

        cache.set("key1", "value1", 60).await.unwrap();
        cache.set("key2", "value2", 60).await.unwrap();
        cache.set("key3", "value3", 60).await.unwrap();

        let entries = cache.get_all_entries().await;

        assert_eq!(entries.len(), 3);

        let keys: Vec<String> = entries.iter().map(|(k, _, _)| k.clone()).collect();
        assert!(keys.contains(&"key1".to_string()));
        assert!(keys.contains(&"key2".to_string()));
        assert!(keys.contains(&"key3".to_string()));

        for (key, value, ttl) in entries {
            match key.as_str() {
                "key1" => assert_eq!(value, "value1"),
                "key2" => assert_eq!(value, "value2"),
                "key3" => assert_eq!(value, "value3"),
                _ => panic!("Unexpected key: {}", key),
            }
            assert_eq!(ttl, Some(Duration::from_secs(60)));
        }
    }

    #[tokio::test]
    async fn test_new_with_health_check_healthy_primary() {
        let mock = MockCache::new();

        let manager =
            FallbackCacheManager::new_with_health_check(Box::new(mock), test_options()).await;

        assert!(!manager.is_using_fallback());
    }

    #[tokio::test]
    async fn test_new_with_health_check_unhealthy_primary() {
        let mock = MockCache::new();
        let should_fail = mock.should_fail.clone();
        *should_fail.lock().await = true;

        let manager =
            FallbackCacheManager::new_with_health_check(Box::new(mock), test_options()).await;

        assert!(manager.is_using_fallback());
    }

    #[tokio::test]
    async fn test_new_without_health_check() {
        let mock = MockCache::new();
        let should_fail = mock.should_fail.clone();
        *should_fail.lock().await = true;

        // new() performs no probe, so the manager optimistically starts on
        // the (broken) primary; the first operation would trigger fallback.
        let manager = FallbackCacheManager::new(Box::new(mock), test_options());

        assert!(!manager.is_using_fallback());
    }
}