//! sockudo_cache/fallback_cache_manager.rs
//!
//! Primary-cache wrapper with automatic in-memory fallback and recovery.
1use crate::memory_cache_manager::MemoryCacheManager;
2use async_trait::async_trait;
3use sockudo_core::cache::CacheManager;
4use sockudo_core::error::Result;
5use sockudo_core::options::MemoryCacheOptions;
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7use std::time::{Duration, Instant};
8use tokio::sync::{Mutex, RwLock};
9use tracing::{debug, info, warn};
10
11const RECOVERY_CHECK_INTERVAL_SECS: u64 = 30;
12
13/// Cache manager that wraps a primary (Redis/Redis Cluster) cache and automatically
14/// falls back to in-memory cache when the primary becomes unavailable.
15///
16/// When the primary cache recovers, data from the fallback cache is synchronized
17/// back to the primary before switching over to prevent data loss.
18pub struct FallbackCacheManager {
19    primary: Mutex<Box<dyn CacheManager + Send + Sync>>,
20    fallback: Mutex<MemoryCacheManager>,
21    using_fallback: AtomicBool,
22    last_failure_time: AtomicU64,
23    start_time: Instant,
24    /// Guards state transitions to prevent race conditions during recovery
25    recovery_lock: RwLock<()>,
26}
27
28impl FallbackCacheManager {
29    /// Creates a new FallbackCacheManager without performing an initial health check.
30    /// If Redis is unavailable at startup, the first cache operation will experience
31    /// higher latency due to the failed attempt and retry.
32    pub fn new(
33        primary: Box<dyn CacheManager + Send + Sync>,
34        fallback_options: MemoryCacheOptions,
35    ) -> Self {
36        let fallback = MemoryCacheManager::new("fallback_cache".to_string(), fallback_options);
37
38        Self {
39            primary: Mutex::new(primary),
40            fallback: Mutex::new(fallback),
41            using_fallback: AtomicBool::new(false),
42            last_failure_time: AtomicU64::new(0),
43            start_time: Instant::now(),
44            recovery_lock: RwLock::new(()),
45        }
46    }
47
48    /// Creates a new FallbackCacheManager and performs an initial health check on the
49    /// primary cache. If the primary cache is unavailable at startup, immediately switches
50    /// to fallback mode, avoiding initial latency on the first cache operation.
51    pub async fn new_with_health_check(
52        primary: Box<dyn CacheManager + Send + Sync>,
53        fallback_options: MemoryCacheOptions,
54    ) -> Self {
55        let fallback = MemoryCacheManager::new("fallback_cache".to_string(), fallback_options);
56        let start_time = Instant::now();
57
58        let using_fallback = match primary.check_health().await {
59            Ok(()) => {
60                debug!("Primary cache is healthy at startup");
61                false
62            }
63            Err(e) => {
64                warn!(
65                    "Primary cache unavailable at startup, starting in fallback mode. Error: {}",
66                    e
67                );
68                true
69            }
70        };
71
72        let last_failure_time = if using_fallback {
73            start_time.elapsed().as_secs()
74        } else {
75            0
76        };
77
78        Self {
79            primary: Mutex::new(primary),
80            fallback: Mutex::new(fallback),
81            using_fallback: AtomicBool::new(using_fallback),
82            last_failure_time: AtomicU64::new(last_failure_time),
83            start_time,
84            recovery_lock: RwLock::new(()),
85        }
86    }
87
88    fn is_using_fallback(&self) -> bool {
89        self.using_fallback.load(Ordering::SeqCst)
90    }
91
92    fn switch_to_fallback(&self, error: &str) {
93        if !self.using_fallback.swap(true, Ordering::SeqCst) {
94            warn!(
95                "Redis cache unavailable, switching to in-memory fallback. Error: {}",
96                error
97            );
98            self.last_failure_time
99                .store(self.start_time.elapsed().as_secs(), Ordering::SeqCst);
100        }
101    }
102
103    fn should_attempt_recovery(&self) -> bool {
104        if !self.is_using_fallback() {
105            return false;
106        }
107
108        let last_failure = self.last_failure_time.load(Ordering::SeqCst);
109        let current_time = self.start_time.elapsed().as_secs();
110
111        current_time.saturating_sub(last_failure) >= RECOVERY_CHECK_INTERVAL_SECS
112    }
113
114    async fn try_recover(&self) -> bool {
115        if !self.should_attempt_recovery() {
116            return false;
117        }
118
119        // Acquire write lock to prevent concurrent recovery attempts and operations
120        let _recovery_guard = self.recovery_lock.write().await;
121
122        // Double-check after acquiring lock - another thread may have already recovered
123        if !self.is_using_fallback() {
124            return false;
125        }
126
127        debug!("Attempting to recover Redis cache connection...");
128
129        let mut primary = self.primary.lock().await;
130        match primary.check_health().await {
131            Ok(()) => {
132                info!("Redis cache connection recovered, synchronizing fallback data...");
133
134                if let Err(e) = self.sync_fallback_to_primary(&mut primary).await {
135                    warn!(
136                        "Failed to sync fallback data to primary during recovery: {}",
137                        e
138                    );
139                }
140
141                self.using_fallback.store(false, Ordering::SeqCst);
142                info!("Successfully switched back to primary cache after recovery");
143                true
144            }
145            Err(e) => {
146                debug!("Redis cache still unavailable: {}", e);
147                self.last_failure_time
148                    .store(self.start_time.elapsed().as_secs(), Ordering::SeqCst);
149                false
150            }
151        }
152    }
153
154    /// Synchronizes data from fallback cache to primary cache during recovery.
155    /// This prevents data loss for entries written during the outage.
156    async fn sync_fallback_to_primary(
157        &self,
158        primary: &mut Box<dyn CacheManager + Send + Sync>,
159    ) -> Result<()> {
160        let fallback = self.fallback.lock().await;
161
162        let entries = fallback.get_all_entries().await;
163
164        if entries.is_empty() {
165            debug!("No entries in fallback cache to sync");
166            return Ok(());
167        }
168
169        debug!(
170            "Syncing {} entries from fallback to primary cache",
171            entries.len()
172        );
173
174        let mut synced = 0;
175        let mut failed = 0;
176
177        for (key, value, ttl) in entries {
178            let ttl_seconds = ttl.map(|d| d.as_secs()).unwrap_or(0);
179            match primary.set(&key, &value, ttl_seconds).await {
180                Ok(()) => synced += 1,
181                Err(e) => {
182                    warn!("Failed to sync key '{}' to primary cache: {}", key, e);
183                    failed += 1;
184                }
185            }
186        }
187
188        if failed > 0 {
189            warn!(
190                "Synced {}/{} entries from fallback to primary ({} failed)",
191                synced,
192                synced + failed,
193                failed
194            );
195        } else {
196            info!(
197                "Successfully synced {} entries from fallback to primary cache",
198                synced
199            );
200        }
201
202        Ok(())
203    }
204}
205
206#[async_trait]
207impl CacheManager for FallbackCacheManager {
208    async fn has(&self, key: &str) -> Result<bool> {
209        self.try_recover().await;
210
211        let _guard = self.recovery_lock.read().await;
212
213        if self.is_using_fallback() {
214            return self.fallback.lock().await.has(key).await;
215        }
216
217        let primary = self.primary.lock().await;
218        match primary.has(key).await {
219            Ok(result) => Ok(result),
220            Err(e) => {
221                self.switch_to_fallback(&e.to_string());
222                self.fallback.lock().await.has(key).await
223            }
224        }
225    }
226
227    async fn get(&self, key: &str) -> Result<Option<String>> {
228        self.try_recover().await;
229
230        let _guard = self.recovery_lock.read().await;
231
232        if self.is_using_fallback() {
233            return self.fallback.lock().await.get(key).await;
234        }
235
236        let primary = self.primary.lock().await;
237        match primary.get(key).await {
238            Ok(result) => Ok(result),
239            Err(e) => {
240                self.switch_to_fallback(&e.to_string());
241                self.fallback.lock().await.get(key).await
242            }
243        }
244    }
245
246    async fn set(&self, key: &str, value: &str, ttl_seconds: u64) -> Result<()> {
247        self.try_recover().await;
248
249        let _guard = self.recovery_lock.read().await;
250
251        if self.is_using_fallback() {
252            return self
253                .fallback
254                .lock()
255                .await
256                .set(key, value, ttl_seconds)
257                .await;
258        }
259
260        let primary = self.primary.lock().await;
261        match primary.set(key, value, ttl_seconds).await {
262            Ok(()) => Ok(()),
263            Err(e) => {
264                self.switch_to_fallback(&e.to_string());
265                self.fallback
266                    .lock()
267                    .await
268                    .set(key, value, ttl_seconds)
269                    .await
270            }
271        }
272    }
273
274    async fn remove(&self, key: &str) -> Result<()> {
275        self.try_recover().await;
276
277        let _guard = self.recovery_lock.read().await;
278
279        if self.is_using_fallback() {
280            return self.fallback.lock().await.remove(key).await;
281        }
282
283        let primary = self.primary.lock().await;
284        match primary.remove(key).await {
285            Ok(()) => Ok(()),
286            Err(e) => {
287                self.switch_to_fallback(&e.to_string());
288                self.fallback.lock().await.remove(key).await
289            }
290        }
291    }
292
293    async fn disconnect(&self) -> Result<()> {
294        let primary_result = self.primary.lock().await.disconnect().await;
295        if let Err(ref e) = primary_result {
296            warn!(error = ?e, "Failed to disconnect primary cache");
297        }
298
299        let fallback_result = self.fallback.lock().await.disconnect().await;
300        if let Err(ref e) = fallback_result {
301            warn!(error = ?e, "Failed to disconnect fallback cache");
302        }
303
304        match (primary_result, fallback_result) {
305            (Ok(_), Ok(_)) => Ok(()),
306            (Err(e), _) => Err(e),
307            (_, Err(e)) => Err(e),
308        }
309    }
310
311    async fn check_health(&self) -> Result<()> {
312        if self.is_using_fallback() {
313            let fallback = self.fallback.lock().await;
314            return fallback.check_health().await;
315        }
316
317        let primary = self.primary.lock().await;
318        primary.check_health().await
319    }
320
321    async fn ttl(&self, key: &str) -> Result<Option<Duration>> {
322        self.try_recover().await;
323
324        let _guard = self.recovery_lock.read().await;
325
326        if self.is_using_fallback() {
327            return self.fallback.lock().await.ttl(key).await;
328        }
329
330        let primary = self.primary.lock().await;
331        match primary.ttl(key).await {
332            Ok(result) => Ok(result),
333            Err(e) => {
334                self.switch_to_fallback(&e.to_string());
335                self.fallback.lock().await.ttl(key).await
336            }
337        }
338    }
339
340    async fn set_if_not_exists(&self, key: &str, value: &str, ttl_seconds: u64) -> Result<bool> {
341        self.try_recover().await;
342
343        let _guard = self.recovery_lock.read().await;
344
345        if self.is_using_fallback() {
346            return self
347                .fallback
348                .lock()
349                .await
350                .set_if_not_exists(key, value, ttl_seconds)
351                .await;
352        }
353
354        let primary = self.primary.lock().await;
355        match primary.set_if_not_exists(key, value, ttl_seconds).await {
356            Ok(result) => Ok(result),
357            Err(e) => {
358                self.switch_to_fallback(&e.to_string());
359                self.fallback
360                    .lock()
361                    .await
362                    .set_if_not_exists(key, value, ttl_seconds)
363                    .await
364            }
365        }
366    }
367}
368
#[cfg(test)]
mod tests {
    use super::*;
    use crate::memory_cache_manager::MemoryCacheManager;
    use sockudo_core::error::Error;
    use std::sync::Arc;
    use tokio::sync::Mutex as TokioMutex;

    /// Mock cache that can be configured to fail on demand
    struct MockCache {
        // When true, every operation returns Error::Cache.
        should_fail: Arc<TokioMutex<bool>>,
        // Backing store for set/get/has/remove.
        data: Arc<TokioMutex<std::collections::HashMap<String, String>>>,
    }

    impl MockCache {
        fn new() -> Self {
            Self {
                should_fail: Arc::new(TokioMutex::new(false)),
                data: Arc::new(TokioMutex::new(std::collections::HashMap::new())),
            }
        }

        #[allow(dead_code)]
        async fn set_should_fail(&self, should_fail: bool) {
            *self.should_fail.lock().await = should_fail;
        }
    }

    #[async_trait]
    impl CacheManager for MockCache {
        async fn has(&self, key: &str) -> Result<bool> {
            if *self.should_fail.lock().await {
                return Err(Error::Cache("Mock failure".to_string()));
            }
            Ok(self.data.lock().await.contains_key(key))
        }

        async fn get(&self, key: &str) -> Result<Option<String>> {
            if *self.should_fail.lock().await {
                return Err(Error::Cache("Mock failure".to_string()));
            }
            Ok(self.data.lock().await.get(key).cloned())
        }

        async fn set(&self, key: &str, value: &str, _ttl_seconds: u64) -> Result<()> {
            if *self.should_fail.lock().await {
                return Err(Error::Cache("Mock failure".to_string()));
            }
            self.data
                .lock()
                .await
                .insert(key.to_string(), value.to_string());
            Ok(())
        }

        async fn remove(&self, key: &str) -> Result<()> {
            if *self.should_fail.lock().await {
                return Err(Error::Cache("Mock failure".to_string()));
            }
            self.data.lock().await.remove(key);
            Ok(())
        }

        async fn disconnect(&self) -> Result<()> {
            Ok(())
        }

        async fn check_health(&self) -> Result<()> {
            if *self.should_fail.lock().await {
                return Err(Error::Cache("Mock failure".to_string()));
            }
            Ok(())
        }

        async fn ttl(&self, _key: &str) -> Result<Option<Duration>> {
            if *self.should_fail.lock().await {
                return Err(Error::Cache("Mock failure".to_string()));
            }
            Ok(Some(Duration::from_secs(60)))
        }
    }

    /// A primary failure during `set` must flip to fallback and serve reads from it.
    #[tokio::test]
    async fn test_fallback_on_primary_failure() {
        let mock = MockCache::new();
        let should_fail = mock.should_fail.clone();

        let options = MemoryCacheOptions {
            ttl: 60,
            cleanup_interval: 60,
            max_capacity: 100,
        };

        let manager = FallbackCacheManager::new(Box::new(mock), options);

        assert!(!manager.is_using_fallback());

        *should_fail.lock().await = true;

        // The failed primary write must be transparently retried on the fallback.
        let result = manager.set("test_key", "test_value", 60).await;
        assert!(result.is_ok());

        assert!(manager.is_using_fallback());

        let value = manager.get("test_key").await.unwrap();
        assert_eq!(value, Some("test_value".to_string()));
    }

    /// Data written while in fallback mode must remain readable from the fallback.
    #[tokio::test]
    async fn test_recovery_with_data_sync() {
        let mock = MockCache::new();
        let should_fail = mock.should_fail.clone();

        let options = MemoryCacheOptions {
            ttl: 60,
            cleanup_interval: 60,
            max_capacity: 100,
        };

        let manager = FallbackCacheManager::new(Box::new(mock), options);

        *should_fail.lock().await = true;
        manager.set("key1", "value1", 60).await.unwrap();
        manager.set("key2", "value2", 60).await.unwrap();

        assert!(manager.is_using_fallback());

        let value1_fallback = manager.get("key1").await.unwrap();
        assert_eq!(value1_fallback, Some("value1".to_string()));

        // Heal the primary and make a recovery attempt eligible immediately
        // (last_failure_time = 0 puts the failure a full interval in the past).
        *should_fail.lock().await = false;

        manager.using_fallback.store(true, Ordering::SeqCst);
        manager.last_failure_time.store(0, Ordering::SeqCst);

        assert!(manager.is_using_fallback());
        let value2_fallback = manager.get("key2").await.unwrap();
        assert_eq!(value2_fallback, Some("value2".to_string()));
    }

    /// Concurrent operations against a shared manager must all succeed and
    /// observe a consistent fallback state.
    ///
    /// Fixed: the previous version wrapped the manager in a TokioMutex and
    /// locked it around every call, which serialized the spawned tasks and
    /// exercised no concurrency at all. The manager's methods take `&self`,
    /// so sharing it via Arc lets the tasks genuinely overlap.
    #[tokio::test]
    async fn test_no_race_condition_during_operations() {
        let mock = MockCache::new();
        let should_fail = mock.should_fail.clone();

        let options = MemoryCacheOptions {
            ttl: 60,
            cleanup_interval: 60,
            max_capacity: 100,
        };

        let manager = Arc::new(FallbackCacheManager::new(Box::new(mock), options));

        *should_fail.lock().await = true;
        manager.set("test", "value", 60).await.unwrap();

        assert!(manager.is_using_fallback());

        let handles: Vec<_> = (0..10)
            .map(|i| {
                let manager_clone = Arc::clone(&manager);
                tokio::spawn(async move {
                    manager_clone
                        .set(&format!("key{}", i), &format!("value{}", i), 60)
                        .await
                })
            })
            .collect();

        for handle in handles {
            assert!(handle.await.unwrap().is_ok());
        }

        assert!(manager.is_using_fallback());

        for i in 0..10 {
            let value = manager.get(&format!("key{}", i)).await.unwrap();
            assert_eq!(value, Some(format!("value{}", i)));
        }
    }

    /// get_all_entries (used by recovery sync) must expose every live entry
    /// with its value and TTL.
    #[tokio::test]
    async fn test_memory_cache_get_all_entries() {
        let options = MemoryCacheOptions {
            ttl: 60,
            cleanup_interval: 60,
            max_capacity: 100,
        };

        let cache = MemoryCacheManager::new("test_prefix".to_string(), options);

        cache.set("key1", "value1", 60).await.unwrap();
        cache.set("key2", "value2", 60).await.unwrap();
        cache.set("key3", "value3", 60).await.unwrap();

        let entries = cache.get_all_entries().await;

        assert_eq!(entries.len(), 3);

        let keys: Vec<String> = entries.iter().map(|(k, _, _)| k.clone()).collect();
        assert!(keys.contains(&"key1".to_string()));
        assert!(keys.contains(&"key2".to_string()));
        assert!(keys.contains(&"key3".to_string()));

        for (key, value, ttl) in entries {
            match key.as_str() {
                "key1" => assert_eq!(value, "value1"),
                "key2" => assert_eq!(value, "value2"),
                "key3" => assert_eq!(value, "value3"),
                _ => panic!("Unexpected key: {}", key),
            }
            assert_eq!(ttl, Some(Duration::from_secs(60)));
        }
    }

    /// A healthy primary at construction keeps the manager on the primary.
    #[tokio::test]
    async fn test_new_with_health_check_healthy_primary() {
        let mock = MockCache::new();
        let options = MemoryCacheOptions {
            ttl: 60,
            cleanup_interval: 60,
            max_capacity: 100,
        };

        let manager = FallbackCacheManager::new_with_health_check(Box::new(mock), options).await;

        assert!(!manager.is_using_fallback());
    }

    /// An unhealthy primary at construction starts the manager in fallback mode.
    #[tokio::test]
    async fn test_new_with_health_check_unhealthy_primary() {
        let mock = MockCache::new();
        let should_fail = mock.should_fail.clone();
        *should_fail.lock().await = true;

        let options = MemoryCacheOptions {
            ttl: 60,
            cleanup_interval: 60,
            max_capacity: 100,
        };

        let manager = FallbackCacheManager::new_with_health_check(Box::new(mock), options).await;

        assert!(manager.is_using_fallback());
    }

    /// `new` performs no health check, so even a broken primary starts
    /// optimistically on the primary path.
    #[tokio::test]
    async fn test_new_without_health_check() {
        let mock = MockCache::new();
        let should_fail = mock.should_fail.clone();
        *should_fail.lock().await = true;

        let options = MemoryCacheOptions {
            ttl: 60,
            cleanup_interval: 60,
            max_capacity: 100,
        };

        let manager = FallbackCacheManager::new(Box::new(mock), options);

        assert!(!manager.is_using_fallback());
    }
}