// sockudo_cache/fallback_cache_manager.rs

1use crate::memory_cache_manager::MemoryCacheManager;
2use async_trait::async_trait;
3use sockudo_core::cache::{CacheManager, CacheScanPage};
4use sockudo_core::error::Result;
5use sockudo_core::options::MemoryCacheOptions;
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7use std::time::{Duration, Instant};
8use tokio::sync::{Mutex, RwLock};
9use tracing::{debug, info, warn};
10
/// Minimum number of seconds between attempts to reconnect to the primary cache while in
/// fallback mode, measured from the last primary failure or the last failed recovery probe.
const RECOVERY_CHECK_INTERVAL_SECS: u64 = 30;

/// Cache manager that wraps a primary (Redis/Redis Cluster) cache and automatically
/// falls back to in-memory cache when the primary becomes unavailable.
///
/// When the primary cache recovers, data from the fallback cache is synchronized
/// back to the primary before switching over to prevent data loss.
///
/// NOTE(review): only entries still present in the fallback at recovery time are written
/// back; keys removed while in fallback mode are removed from the fallback only, so they
/// are not deleted from the primary.
pub struct FallbackCacheManager {
    /// Preferred backend; routed operations try it first whenever not in fallback mode.
    primary: Mutex<Box<dyn CacheManager + Send + Sync>>,
    /// In-memory cache that serves operations while the primary is unavailable.
    fallback: Mutex<MemoryCacheManager>,
    /// `true` while operations are routed to `fallback` instead of `primary`.
    using_fallback: AtomicBool,
    /// Time of the most recent primary failure or failed recovery probe, expressed in whole
    /// seconds since `start_time`.
    last_failure_time: AtomicU64,
    /// Monotonic reference point that `last_failure_time` is measured from.
    start_time: Instant,
    /// Guards state transitions to prevent race conditions during recovery: taken for writing
    /// while recovering and for reading by the routed cache operations, so no operation runs
    /// while the fallback is synced back and the mode flag is flipped.
    recovery_lock: RwLock<()>,
}
27
28impl FallbackCacheManager {
29    /// Creates a new FallbackCacheManager without performing an initial health check.
30    /// If Redis is unavailable at startup, the first cache operation will experience
31    /// higher latency due to the failed attempt and retry.
32    pub fn new(
33        primary: Box<dyn CacheManager + Send + Sync>,
34        fallback_options: MemoryCacheOptions,
35    ) -> Self {
36        let fallback = MemoryCacheManager::new("fallback_cache".to_string(), fallback_options);
37
38        Self {
39            primary: Mutex::new(primary),
40            fallback: Mutex::new(fallback),
41            using_fallback: AtomicBool::new(false),
42            last_failure_time: AtomicU64::new(0),
43            start_time: Instant::now(),
44            recovery_lock: RwLock::new(()),
45        }
46    }
47
48    /// Creates a new FallbackCacheManager and performs an initial health check on the
49    /// primary cache. If the primary cache is unavailable at startup, immediately switches
50    /// to fallback mode, avoiding initial latency on the first cache operation.
51    pub async fn new_with_health_check(
52        primary: Box<dyn CacheManager + Send + Sync>,
53        fallback_options: MemoryCacheOptions,
54    ) -> Self {
55        let fallback = MemoryCacheManager::new("fallback_cache".to_string(), fallback_options);
56        let start_time = Instant::now();
57
58        let using_fallback = match primary.check_health().await {
59            Ok(()) => {
60                debug!("Primary cache is healthy at startup");
61                false
62            }
63            Err(e) => {
64                warn!(
65                    "Primary cache unavailable at startup, starting in fallback mode. Error: {}",
66                    e
67                );
68                true
69            }
70        };
71
72        let last_failure_time = if using_fallback {
73            start_time.elapsed().as_secs()
74        } else {
75            0
76        };
77
78        Self {
79            primary: Mutex::new(primary),
80            fallback: Mutex::new(fallback),
81            using_fallback: AtomicBool::new(using_fallback),
82            last_failure_time: AtomicU64::new(last_failure_time),
83            start_time,
84            recovery_lock: RwLock::new(()),
85        }
86    }
87
88    fn is_using_fallback(&self) -> bool {
89        self.using_fallback.load(Ordering::SeqCst)
90    }
91
92    fn switch_to_fallback(&self, error: &str) {
93        if !self.using_fallback.swap(true, Ordering::SeqCst) {
94            warn!(
95                "Redis cache unavailable, switching to in-memory fallback. Error: {}",
96                error
97            );
98            self.last_failure_time
99                .store(self.start_time.elapsed().as_secs(), Ordering::SeqCst);
100        }
101    }
102
103    fn should_attempt_recovery(&self) -> bool {
104        if !self.is_using_fallback() {
105            return false;
106        }
107
108        let last_failure = self.last_failure_time.load(Ordering::SeqCst);
109        let current_time = self.start_time.elapsed().as_secs();
110
111        current_time.saturating_sub(last_failure) >= RECOVERY_CHECK_INTERVAL_SECS
112    }
113
114    async fn try_recover(&self) -> bool {
115        if !self.should_attempt_recovery() {
116            return false;
117        }
118
119        // Acquire write lock to prevent concurrent recovery attempts and operations
120        let _recovery_guard = self.recovery_lock.write().await;
121
122        // Double-check after acquiring lock - another thread may have already recovered
123        if !self.is_using_fallback() {
124            return false;
125        }
126
127        debug!("Attempting to recover Redis cache connection...");
128
129        let mut primary = self.primary.lock().await;
130        match primary.check_health().await {
131            Ok(()) => {
132                info!("Redis cache connection recovered, synchronizing fallback data...");
133
134                if let Err(e) = self.sync_fallback_to_primary(&mut primary).await {
135                    warn!(
136                        "Failed to sync fallback data to primary during recovery: {}",
137                        e
138                    );
139                }
140
141                self.using_fallback.store(false, Ordering::SeqCst);
142                info!("Successfully switched back to primary cache after recovery");
143                true
144            }
145            Err(e) => {
146                debug!("Redis cache still unavailable: {}", e);
147                self.last_failure_time
148                    .store(self.start_time.elapsed().as_secs(), Ordering::SeqCst);
149                false
150            }
151        }
152    }
153
154    /// Synchronizes data from fallback cache to primary cache during recovery.
155    /// This prevents data loss for entries written during the outage.
156    async fn sync_fallback_to_primary(
157        &self,
158        primary: &mut Box<dyn CacheManager + Send + Sync>,
159    ) -> Result<()> {
160        let fallback = self.fallback.lock().await;
161
162        let entries = fallback.get_all_entries().await;
163
164        if entries.is_empty() {
165            debug!("No entries in fallback cache to sync");
166            return Ok(());
167        }
168
169        debug!(
170            "Syncing {} entries from fallback to primary cache",
171            entries.len()
172        );
173
174        let mut synced = 0;
175        let mut failed = 0;
176
177        for (key, value, ttl) in entries {
178            let ttl_seconds = ttl.map(|d| d.as_secs()).unwrap_or(0);
179            match primary.set(&key, &value, ttl_seconds).await {
180                Ok(()) => synced += 1,
181                Err(e) => {
182                    warn!("Failed to sync key '{}' to primary cache: {}", key, e);
183                    failed += 1;
184                }
185            }
186        }
187
188        if failed > 0 {
189            warn!(
190                "Synced {}/{} entries from fallback to primary ({} failed)",
191                synced,
192                synced + failed,
193                failed
194            );
195        } else {
196            info!(
197                "Successfully synced {} entries from fallback to primary cache",
198                synced
199            );
200        }
201
202        Ok(())
203    }
204}
205
206#[async_trait]
207impl CacheManager for FallbackCacheManager {
208    async fn has(&self, key: &str) -> Result<bool> {
209        self.try_recover().await;
210
211        let _guard = self.recovery_lock.read().await;
212
213        if self.is_using_fallback() {
214            return self.fallback.lock().await.has(key).await;
215        }
216
217        let primary = self.primary.lock().await;
218        match primary.has(key).await {
219            Ok(result) => Ok(result),
220            Err(e) => {
221                self.switch_to_fallback(&e.to_string());
222                self.fallback.lock().await.has(key).await
223            }
224        }
225    }
226
227    async fn get(&self, key: &str) -> Result<Option<String>> {
228        self.try_recover().await;
229
230        let _guard = self.recovery_lock.read().await;
231
232        if self.is_using_fallback() {
233            return self.fallback.lock().await.get(key).await;
234        }
235
236        let primary = self.primary.lock().await;
237        match primary.get(key).await {
238            Ok(result) => Ok(result),
239            Err(e) => {
240                self.switch_to_fallback(&e.to_string());
241                self.fallback.lock().await.get(key).await
242            }
243        }
244    }
245
246    async fn set(&self, key: &str, value: &str, ttl_seconds: u64) -> Result<()> {
247        self.try_recover().await;
248
249        let _guard = self.recovery_lock.read().await;
250
251        if self.is_using_fallback() {
252            return self
253                .fallback
254                .lock()
255                .await
256                .set(key, value, ttl_seconds)
257                .await;
258        }
259
260        let primary = self.primary.lock().await;
261        match primary.set(key, value, ttl_seconds).await {
262            Ok(()) => Ok(()),
263            Err(e) => {
264                self.switch_to_fallback(&e.to_string());
265                self.fallback
266                    .lock()
267                    .await
268                    .set(key, value, ttl_seconds)
269                    .await
270            }
271        }
272    }
273
274    async fn remove(&self, key: &str) -> Result<()> {
275        self.try_recover().await;
276
277        let _guard = self.recovery_lock.read().await;
278
279        if self.is_using_fallback() {
280            return self.fallback.lock().await.remove(key).await;
281        }
282
283        let primary = self.primary.lock().await;
284        match primary.remove(key).await {
285            Ok(()) => Ok(()),
286            Err(e) => {
287                self.switch_to_fallback(&e.to_string());
288                self.fallback.lock().await.remove(key).await
289            }
290        }
291    }
292
293    async fn disconnect(&self) -> Result<()> {
294        let primary_result = self.primary.lock().await.disconnect().await;
295        if let Err(ref e) = primary_result {
296            warn!(error = ?e, "Failed to disconnect primary cache");
297        }
298
299        let fallback_result = self.fallback.lock().await.disconnect().await;
300        if let Err(ref e) = fallback_result {
301            warn!(error = ?e, "Failed to disconnect fallback cache");
302        }
303
304        match (primary_result, fallback_result) {
305            (Ok(_), Ok(_)) => Ok(()),
306            (Err(e), _) => Err(e),
307            (_, Err(e)) => Err(e),
308        }
309    }
310
311    async fn check_health(&self) -> Result<()> {
312        if self.is_using_fallback() {
313            let fallback = self.fallback.lock().await;
314            return fallback.check_health().await;
315        }
316
317        let primary = self.primary.lock().await;
318        primary.check_health().await
319    }
320
321    async fn scan_prefix(&self, prefix: &str, limit: usize) -> Result<Vec<(String, String)>> {
322        self.try_recover().await;
323
324        let _guard = self.recovery_lock.read().await;
325
326        if self.is_using_fallback() {
327            return self.fallback.lock().await.scan_prefix(prefix, limit).await;
328        }
329
330        let primary = self.primary.lock().await;
331        match primary.scan_prefix(prefix, limit).await {
332            Ok(result) => Ok(result),
333            Err(e) => {
334                self.switch_to_fallback(&e.to_string());
335                self.fallback.lock().await.scan_prefix(prefix, limit).await
336            }
337        }
338    }
339
340    async fn scan_prefix_page(
341        &self,
342        prefix: &str,
343        cursor: Option<String>,
344        limit: usize,
345    ) -> Result<CacheScanPage> {
346        self.try_recover().await;
347
348        let _guard = self.recovery_lock.read().await;
349
350        if self.is_using_fallback() {
351            return self
352                .fallback
353                .lock()
354                .await
355                .scan_prefix_page(prefix, cursor, limit)
356                .await;
357        }
358
359        let primary = self.primary.lock().await;
360        match primary
361            .scan_prefix_page(prefix, cursor.clone(), limit)
362            .await
363        {
364            Ok(result) => Ok(result),
365            Err(e) => {
366                self.switch_to_fallback(&e.to_string());
367                self.fallback
368                    .lock()
369                    .await
370                    .scan_prefix_page(prefix, cursor, limit)
371                    .await
372            }
373        }
374    }
375
376    async fn ttl(&self, key: &str) -> Result<Option<Duration>> {
377        self.try_recover().await;
378
379        let _guard = self.recovery_lock.read().await;
380
381        if self.is_using_fallback() {
382            return self.fallback.lock().await.ttl(key).await;
383        }
384
385        let primary = self.primary.lock().await;
386        match primary.ttl(key).await {
387            Ok(result) => Ok(result),
388            Err(e) => {
389                self.switch_to_fallback(&e.to_string());
390                self.fallback.lock().await.ttl(key).await
391            }
392        }
393    }
394
395    async fn set_if_not_exists(&self, key: &str, value: &str, ttl_seconds: u64) -> Result<bool> {
396        self.try_recover().await;
397
398        let _guard = self.recovery_lock.read().await;
399
400        if self.is_using_fallback() {
401            return self
402                .fallback
403                .lock()
404                .await
405                .set_if_not_exists(key, value, ttl_seconds)
406                .await;
407        }
408
409        let primary = self.primary.lock().await;
410        match primary.set_if_not_exists(key, value, ttl_seconds).await {
411            Ok(result) => Ok(result),
412            Err(e) => {
413                self.switch_to_fallback(&e.to_string());
414                self.fallback
415                    .lock()
416                    .await
417                    .set_if_not_exists(key, value, ttl_seconds)
418                    .await
419            }
420        }
421    }
422
423    async fn increment_by(&self, key: &str, delta: i64, ttl_seconds: u64) -> Result<i64> {
424        self.try_recover().await;
425
426        let _guard = self.recovery_lock.read().await;
427
428        if self.is_using_fallback() {
429            return self
430                .fallback
431                .lock()
432                .await
433                .increment_by(key, delta, ttl_seconds)
434                .await;
435        }
436
437        let primary = self.primary.lock().await;
438        match primary.increment_by(key, delta, ttl_seconds).await {
439            Ok(result) => Ok(result),
440            Err(e) => {
441                self.switch_to_fallback(&e.to_string());
442                self.fallback
443                    .lock()
444                    .await
445                    .increment_by(key, delta, ttl_seconds)
446                    .await
447            }
448        }
449    }
450}
451
#[cfg(test)]
mod tests {
    use super::*;
    use crate::memory_cache_manager::MemoryCacheManager;
    use sockudo_core::error::Error;
    use std::collections::HashMap;
    use std::sync::Arc;
    use tokio::sync::Mutex as TokioMutex;

    /// Options shared by every test's in-memory cache.
    fn test_options() -> MemoryCacheOptions {
        MemoryCacheOptions {
            ttl: 60,
            cleanup_interval: 60,
            max_capacity: 100,
        }
    }

    /// Mock primary cache.
    ///
    /// `should_fail` and `data` are shared handles, so a test can toggle failures and inspect
    /// the stored contents after the mock has been moved into a `FallbackCacheManager`.
    struct MockCache {
        should_fail: Arc<TokioMutex<bool>>,
        data: Arc<TokioMutex<HashMap<String, String>>>,
    }

    impl MockCache {
        fn new() -> Self {
            Self {
                should_fail: Arc::new(TokioMutex::new(false)),
                data: Arc::new(TokioMutex::new(HashMap::new())),
            }
        }

        /// Returns the mock failure when the mock is configured to fail.
        async fn fail_if_configured(&self) -> Result<()> {
            if *self.should_fail.lock().await {
                return Err(Error::Cache("Mock failure".to_string()));
            }
            Ok(())
        }
    }

    #[async_trait]
    impl CacheManager for MockCache {
        async fn has(&self, key: &str) -> Result<bool> {
            self.fail_if_configured().await?;
            Ok(self.data.lock().await.contains_key(key))
        }

        async fn get(&self, key: &str) -> Result<Option<String>> {
            self.fail_if_configured().await?;
            Ok(self.data.lock().await.get(key).cloned())
        }

        async fn set(&self, key: &str, value: &str, _ttl_seconds: u64) -> Result<()> {
            self.fail_if_configured().await?;
            self.data
                .lock()
                .await
                .insert(key.to_string(), value.to_string());
            Ok(())
        }

        async fn remove(&self, key: &str) -> Result<()> {
            self.fail_if_configured().await?;
            self.data.lock().await.remove(key);
            Ok(())
        }

        async fn disconnect(&self) -> Result<()> {
            Ok(())
        }

        async fn check_health(&self) -> Result<()> {
            self.fail_if_configured().await
        }

        async fn ttl(&self, _key: &str) -> Result<Option<Duration>> {
            self.fail_if_configured().await?;
            Ok(Some(Duration::from_secs(60)))
        }
    }

    #[tokio::test]
    async fn test_fallback_on_primary_failure() {
        let mock = MockCache::new();
        let should_fail = mock.should_fail.clone();

        let manager = FallbackCacheManager::new(Box::new(mock), test_options());

        assert!(!manager.is_using_fallback());

        *should_fail.lock().await = true;

        let result = manager.set("test_key", "test_value", 60).await;
        assert!(result.is_ok());

        assert!(manager.is_using_fallback());

        let value = manager.get("test_key").await.unwrap();
        assert_eq!(value, Some("test_value".to_string()));
    }

    #[tokio::test]
    async fn test_recovery_with_data_sync() {
        let mock = MockCache::new();
        let should_fail = mock.should_fail.clone();
        let primary_data = mock.data.clone();

        let mut manager = FallbackCacheManager::new(Box::new(mock), test_options());

        // Outage: writes land in the in-memory fallback, not in the primary.
        *should_fail.lock().await = true;
        manager.set("key1", "value1", 60).await.unwrap();
        manager.set("key2", "value2", 60).await.unwrap();

        assert!(manager.is_using_fallback());
        assert_eq!(
            manager.get("key1").await.unwrap(),
            Some("value1".to_string())
        );
        assert!(primary_data.lock().await.is_empty());

        // The primary comes back. Move the clock origin back so that, relative to the recorded
        // failure time, the recovery interval has already elapsed.
        *should_fail.lock().await = false;
        manager.start_time = Instant::now()
            .checked_sub(Duration::from_secs(RECOVERY_CHECK_INTERVAL_SECS + 1))
            .expect("monotonic clock should be later than the recovery interval");
        manager.last_failure_time.store(0, Ordering::SeqCst);

        // The next operation recovers, syncs the fallback entries, and is served by the primary.
        assert_eq!(
            manager.get("key2").await.unwrap(),
            Some("value2".to_string())
        );
        assert!(!manager.is_using_fallback());

        let data = primary_data.lock().await;
        assert_eq!(data.get("key1"), Some(&"value1".to_string()));
        assert_eq!(data.get("key2"), Some(&"value2".to_string()));
    }

    #[tokio::test]
    async fn test_no_race_condition_during_operations() {
        let mock = MockCache::new();
        let should_fail = mock.should_fail.clone();

        // Share the manager itself (it is Sync), so the spawned tasks genuinely run
        // concurrently instead of being serialized by an outer mutex.
        let manager = Arc::new(FallbackCacheManager::new(Box::new(mock), test_options()));

        *should_fail.lock().await = true;
        manager.set("test", "value", 60).await.unwrap();

        assert!(manager.is_using_fallback());

        let handles: Vec<_> = (0..10)
            .map(|i| {
                let manager = Arc::clone(&manager);
                tokio::spawn(async move {
                    manager
                        .set(&format!("key{}", i), &format!("value{}", i), 60)
                        .await
                })
            })
            .collect();

        for handle in handles {
            assert!(handle.await.unwrap().is_ok());
        }

        assert!(manager.is_using_fallback());

        for i in 0..10 {
            let value = manager.get(&format!("key{}", i)).await.unwrap();
            assert_eq!(value, Some(format!("value{}", i)));
        }
    }

    #[tokio::test]
    async fn test_memory_cache_get_all_entries() {
        let cache = MemoryCacheManager::new("test_prefix".to_string(), test_options());

        cache.set("key1", "value1", 60).await.unwrap();
        cache.set("key2", "value2", 60).await.unwrap();
        cache.set("key3", "value3", 60).await.unwrap();

        let entries = cache.get_all_entries().await;

        assert_eq!(entries.len(), 3);

        let keys: Vec<String> = entries.iter().map(|(k, _, _)| k.clone()).collect();
        assert!(keys.contains(&"key1".to_string()));
        assert!(keys.contains(&"key2".to_string()));
        assert!(keys.contains(&"key3".to_string()));

        for (key, value, ttl) in entries {
            match key.as_str() {
                "key1" => assert_eq!(value, "value1"),
                "key2" => assert_eq!(value, "value2"),
                "key3" => assert_eq!(value, "value3"),
                _ => panic!("Unexpected key: {}", key),
            }
            assert_eq!(ttl, Some(Duration::from_secs(60)));
        }
    }

    #[tokio::test]
    async fn test_new_with_health_check_healthy_primary() {
        let mock = MockCache::new();

        let manager =
            FallbackCacheManager::new_with_health_check(Box::new(mock), test_options()).await;

        assert!(!manager.is_using_fallback());
    }

    #[tokio::test]
    async fn test_new_with_health_check_unhealthy_primary() {
        let mock = MockCache::new();
        *mock.should_fail.lock().await = true;

        let manager =
            FallbackCacheManager::new_with_health_check(Box::new(mock), test_options()).await;

        assert!(manager.is_using_fallback());
    }

    #[tokio::test]
    async fn test_new_without_health_check() {
        let mock = MockCache::new();
        *mock.should_fail.lock().await = true;

        let manager = FallbackCacheManager::new(Box::new(mock), test_options());

        assert!(!manager.is_using_fallback());
    }
}