1use crate::memory_cache_manager::MemoryCacheManager;
2use async_trait::async_trait;
3use sockudo_core::cache::{CacheManager, CacheScanPage};
4use sockudo_core::error::Result;
5use sockudo_core::options::MemoryCacheOptions;
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7use std::time::{Duration, Instant};
8use tokio::sync::{Mutex, RwLock};
9use tracing::{debug, info, warn};
10
11const RECOVERY_CHECK_INTERVAL_SECS: u64 = 30;
12
13pub struct FallbackCacheManager {
19 primary: Mutex<Box<dyn CacheManager + Send + Sync>>,
20 fallback: Mutex<MemoryCacheManager>,
21 using_fallback: AtomicBool,
22 last_failure_time: AtomicU64,
23 start_time: Instant,
24 recovery_lock: RwLock<()>,
26}
27
28impl FallbackCacheManager {
29 pub fn new(
33 primary: Box<dyn CacheManager + Send + Sync>,
34 fallback_options: MemoryCacheOptions,
35 ) -> Self {
36 let fallback = MemoryCacheManager::new("fallback_cache".to_string(), fallback_options);
37
38 Self {
39 primary: Mutex::new(primary),
40 fallback: Mutex::new(fallback),
41 using_fallback: AtomicBool::new(false),
42 last_failure_time: AtomicU64::new(0),
43 start_time: Instant::now(),
44 recovery_lock: RwLock::new(()),
45 }
46 }
47
48 pub async fn new_with_health_check(
52 primary: Box<dyn CacheManager + Send + Sync>,
53 fallback_options: MemoryCacheOptions,
54 ) -> Self {
55 let fallback = MemoryCacheManager::new("fallback_cache".to_string(), fallback_options);
56 let start_time = Instant::now();
57
58 let using_fallback = match primary.check_health().await {
59 Ok(()) => {
60 debug!("Primary cache is healthy at startup");
61 false
62 }
63 Err(e) => {
64 warn!(
65 "Primary cache unavailable at startup, starting in fallback mode. Error: {}",
66 e
67 );
68 true
69 }
70 };
71
72 let last_failure_time = if using_fallback {
73 start_time.elapsed().as_secs()
74 } else {
75 0
76 };
77
78 Self {
79 primary: Mutex::new(primary),
80 fallback: Mutex::new(fallback),
81 using_fallback: AtomicBool::new(using_fallback),
82 last_failure_time: AtomicU64::new(last_failure_time),
83 start_time,
84 recovery_lock: RwLock::new(()),
85 }
86 }
87
88 fn is_using_fallback(&self) -> bool {
89 self.using_fallback.load(Ordering::SeqCst)
90 }
91
92 fn switch_to_fallback(&self, error: &str) {
93 if !self.using_fallback.swap(true, Ordering::SeqCst) {
94 warn!(
95 "Redis cache unavailable, switching to in-memory fallback. Error: {}",
96 error
97 );
98 self.last_failure_time
99 .store(self.start_time.elapsed().as_secs(), Ordering::SeqCst);
100 }
101 }
102
103 fn should_attempt_recovery(&self) -> bool {
104 if !self.is_using_fallback() {
105 return false;
106 }
107
108 let last_failure = self.last_failure_time.load(Ordering::SeqCst);
109 let current_time = self.start_time.elapsed().as_secs();
110
111 current_time.saturating_sub(last_failure) >= RECOVERY_CHECK_INTERVAL_SECS
112 }
113
114 async fn try_recover(&self) -> bool {
115 if !self.should_attempt_recovery() {
116 return false;
117 }
118
119 let _recovery_guard = self.recovery_lock.write().await;
121
122 if !self.is_using_fallback() {
124 return false;
125 }
126
127 debug!("Attempting to recover Redis cache connection...");
128
129 let mut primary = self.primary.lock().await;
130 match primary.check_health().await {
131 Ok(()) => {
132 info!("Redis cache connection recovered, synchronizing fallback data...");
133
134 if let Err(e) = self.sync_fallback_to_primary(&mut primary).await {
135 warn!(
136 "Failed to sync fallback data to primary during recovery: {}",
137 e
138 );
139 }
140
141 self.using_fallback.store(false, Ordering::SeqCst);
142 info!("Successfully switched back to primary cache after recovery");
143 true
144 }
145 Err(e) => {
146 debug!("Redis cache still unavailable: {}", e);
147 self.last_failure_time
148 .store(self.start_time.elapsed().as_secs(), Ordering::SeqCst);
149 false
150 }
151 }
152 }
153
154 async fn sync_fallback_to_primary(
157 &self,
158 primary: &mut Box<dyn CacheManager + Send + Sync>,
159 ) -> Result<()> {
160 let fallback = self.fallback.lock().await;
161
162 let entries = fallback.get_all_entries().await;
163
164 if entries.is_empty() {
165 debug!("No entries in fallback cache to sync");
166 return Ok(());
167 }
168
169 debug!(
170 "Syncing {} entries from fallback to primary cache",
171 entries.len()
172 );
173
174 let mut synced = 0;
175 let mut failed = 0;
176
177 for (key, value, ttl) in entries {
178 let ttl_seconds = ttl.map(|d| d.as_secs()).unwrap_or(0);
179 match primary.set(&key, &value, ttl_seconds).await {
180 Ok(()) => synced += 1,
181 Err(e) => {
182 warn!("Failed to sync key '{}' to primary cache: {}", key, e);
183 failed += 1;
184 }
185 }
186 }
187
188 if failed > 0 {
189 warn!(
190 "Synced {}/{} entries from fallback to primary ({} failed)",
191 synced,
192 synced + failed,
193 failed
194 );
195 } else {
196 info!(
197 "Successfully synced {} entries from fallback to primary cache",
198 synced
199 );
200 }
201
202 Ok(())
203 }
204}
205
206#[async_trait]
207impl CacheManager for FallbackCacheManager {
208 async fn has(&self, key: &str) -> Result<bool> {
209 self.try_recover().await;
210
211 let _guard = self.recovery_lock.read().await;
212
213 if self.is_using_fallback() {
214 return self.fallback.lock().await.has(key).await;
215 }
216
217 let primary = self.primary.lock().await;
218 match primary.has(key).await {
219 Ok(result) => Ok(result),
220 Err(e) => {
221 self.switch_to_fallback(&e.to_string());
222 self.fallback.lock().await.has(key).await
223 }
224 }
225 }
226
227 async fn get(&self, key: &str) -> Result<Option<String>> {
228 self.try_recover().await;
229
230 let _guard = self.recovery_lock.read().await;
231
232 if self.is_using_fallback() {
233 return self.fallback.lock().await.get(key).await;
234 }
235
236 let primary = self.primary.lock().await;
237 match primary.get(key).await {
238 Ok(result) => Ok(result),
239 Err(e) => {
240 self.switch_to_fallback(&e.to_string());
241 self.fallback.lock().await.get(key).await
242 }
243 }
244 }
245
246 async fn set(&self, key: &str, value: &str, ttl_seconds: u64) -> Result<()> {
247 self.try_recover().await;
248
249 let _guard = self.recovery_lock.read().await;
250
251 if self.is_using_fallback() {
252 return self
253 .fallback
254 .lock()
255 .await
256 .set(key, value, ttl_seconds)
257 .await;
258 }
259
260 let primary = self.primary.lock().await;
261 match primary.set(key, value, ttl_seconds).await {
262 Ok(()) => Ok(()),
263 Err(e) => {
264 self.switch_to_fallback(&e.to_string());
265 self.fallback
266 .lock()
267 .await
268 .set(key, value, ttl_seconds)
269 .await
270 }
271 }
272 }
273
274 async fn remove(&self, key: &str) -> Result<()> {
275 self.try_recover().await;
276
277 let _guard = self.recovery_lock.read().await;
278
279 if self.is_using_fallback() {
280 return self.fallback.lock().await.remove(key).await;
281 }
282
283 let primary = self.primary.lock().await;
284 match primary.remove(key).await {
285 Ok(()) => Ok(()),
286 Err(e) => {
287 self.switch_to_fallback(&e.to_string());
288 self.fallback.lock().await.remove(key).await
289 }
290 }
291 }
292
293 async fn disconnect(&self) -> Result<()> {
294 let primary_result = self.primary.lock().await.disconnect().await;
295 if let Err(ref e) = primary_result {
296 warn!(error = ?e, "Failed to disconnect primary cache");
297 }
298
299 let fallback_result = self.fallback.lock().await.disconnect().await;
300 if let Err(ref e) = fallback_result {
301 warn!(error = ?e, "Failed to disconnect fallback cache");
302 }
303
304 match (primary_result, fallback_result) {
305 (Ok(_), Ok(_)) => Ok(()),
306 (Err(e), _) => Err(e),
307 (_, Err(e)) => Err(e),
308 }
309 }
310
311 async fn check_health(&self) -> Result<()> {
312 if self.is_using_fallback() {
313 let fallback = self.fallback.lock().await;
314 return fallback.check_health().await;
315 }
316
317 let primary = self.primary.lock().await;
318 primary.check_health().await
319 }
320
321 async fn scan_prefix(&self, prefix: &str, limit: usize) -> Result<Vec<(String, String)>> {
322 self.try_recover().await;
323
324 let _guard = self.recovery_lock.read().await;
325
326 if self.is_using_fallback() {
327 return self.fallback.lock().await.scan_prefix(prefix, limit).await;
328 }
329
330 let primary = self.primary.lock().await;
331 match primary.scan_prefix(prefix, limit).await {
332 Ok(result) => Ok(result),
333 Err(e) => {
334 self.switch_to_fallback(&e.to_string());
335 self.fallback.lock().await.scan_prefix(prefix, limit).await
336 }
337 }
338 }
339
340 async fn scan_prefix_page(
341 &self,
342 prefix: &str,
343 cursor: Option<String>,
344 limit: usize,
345 ) -> Result<CacheScanPage> {
346 self.try_recover().await;
347
348 let _guard = self.recovery_lock.read().await;
349
350 if self.is_using_fallback() {
351 return self
352 .fallback
353 .lock()
354 .await
355 .scan_prefix_page(prefix, cursor, limit)
356 .await;
357 }
358
359 let primary = self.primary.lock().await;
360 match primary
361 .scan_prefix_page(prefix, cursor.clone(), limit)
362 .await
363 {
364 Ok(result) => Ok(result),
365 Err(e) => {
366 self.switch_to_fallback(&e.to_string());
367 self.fallback
368 .lock()
369 .await
370 .scan_prefix_page(prefix, cursor, limit)
371 .await
372 }
373 }
374 }
375
376 async fn ttl(&self, key: &str) -> Result<Option<Duration>> {
377 self.try_recover().await;
378
379 let _guard = self.recovery_lock.read().await;
380
381 if self.is_using_fallback() {
382 return self.fallback.lock().await.ttl(key).await;
383 }
384
385 let primary = self.primary.lock().await;
386 match primary.ttl(key).await {
387 Ok(result) => Ok(result),
388 Err(e) => {
389 self.switch_to_fallback(&e.to_string());
390 self.fallback.lock().await.ttl(key).await
391 }
392 }
393 }
394
395 async fn set_if_not_exists(&self, key: &str, value: &str, ttl_seconds: u64) -> Result<bool> {
396 self.try_recover().await;
397
398 let _guard = self.recovery_lock.read().await;
399
400 if self.is_using_fallback() {
401 return self
402 .fallback
403 .lock()
404 .await
405 .set_if_not_exists(key, value, ttl_seconds)
406 .await;
407 }
408
409 let primary = self.primary.lock().await;
410 match primary.set_if_not_exists(key, value, ttl_seconds).await {
411 Ok(result) => Ok(result),
412 Err(e) => {
413 self.switch_to_fallback(&e.to_string());
414 self.fallback
415 .lock()
416 .await
417 .set_if_not_exists(key, value, ttl_seconds)
418 .await
419 }
420 }
421 }
422
423 async fn increment_by(&self, key: &str, delta: i64, ttl_seconds: u64) -> Result<i64> {
424 self.try_recover().await;
425
426 let _guard = self.recovery_lock.read().await;
427
428 if self.is_using_fallback() {
429 return self
430 .fallback
431 .lock()
432 .await
433 .increment_by(key, delta, ttl_seconds)
434 .await;
435 }
436
437 let primary = self.primary.lock().await;
438 match primary.increment_by(key, delta, ttl_seconds).await {
439 Ok(result) => Ok(result),
440 Err(e) => {
441 self.switch_to_fallback(&e.to_string());
442 self.fallback
443 .lock()
444 .await
445 .increment_by(key, delta, ttl_seconds)
446 .await
447 }
448 }
449 }
450}
451
#[cfg(test)]
mod tests {
    use super::*;
    use crate::memory_cache_manager::MemoryCacheManager;
    use sockudo_core::error::Error;
    use std::collections::HashMap;
    use std::sync::Arc;
    use tokio::sync::Mutex as TokioMutex;

    /// In-memory `CacheManager` whose operations can be made to fail on demand.
    ///
    /// Both fields are `Arc`s so a test can keep handles to them after the mock
    /// has been boxed and moved into a `FallbackCacheManager`.
    struct MockCache {
        should_fail: Arc<TokioMutex<bool>>,
        data: Arc<TokioMutex<HashMap<String, String>>>,
    }

    impl MockCache {
        fn new() -> Self {
            Self {
                should_fail: Arc::new(TokioMutex::new(false)),
                data: Arc::new(TokioMutex::new(HashMap::new())),
            }
        }

        /// Returns the canned error when the test has switched the mock into failure mode.
        async fn fail_if_requested(&self) -> Result<()> {
            if *self.should_fail.lock().await {
                return Err(Error::Cache("Mock failure".to_string()));
            }
            Ok(())
        }
    }

    #[async_trait]
    impl CacheManager for MockCache {
        async fn has(&self, key: &str) -> Result<bool> {
            self.fail_if_requested().await?;
            Ok(self.data.lock().await.contains_key(key))
        }

        async fn get(&self, key: &str) -> Result<Option<String>> {
            self.fail_if_requested().await?;
            Ok(self.data.lock().await.get(key).cloned())
        }

        async fn set(&self, key: &str, value: &str, _ttl_seconds: u64) -> Result<()> {
            self.fail_if_requested().await?;
            self.data
                .lock()
                .await
                .insert(key.to_string(), value.to_string());
            Ok(())
        }

        async fn remove(&self, key: &str) -> Result<()> {
            self.fail_if_requested().await?;
            self.data.lock().await.remove(key);
            Ok(())
        }

        async fn disconnect(&self) -> Result<()> {
            Ok(())
        }

        async fn check_health(&self) -> Result<()> {
            self.fail_if_requested().await
        }

        async fn ttl(&self, _key: &str) -> Result<Option<Duration>> {
            self.fail_if_requested().await?;
            Ok(Some(Duration::from_secs(60)))
        }
    }

    /// Options shared by every test: 60s TTL, 60s cleanup, room for 100 entries.
    fn test_options() -> MemoryCacheOptions {
        MemoryCacheOptions {
            ttl: 60,
            cleanup_interval: 60,
            max_capacity: 100,
        }
    }

    #[tokio::test]
    async fn test_fallback_on_primary_failure() {
        let mock = MockCache::new();
        let should_fail = mock.should_fail.clone();

        let manager = FallbackCacheManager::new(Box::new(mock), test_options());

        assert!(!manager.is_using_fallback());

        *should_fail.lock().await = true;

        let result = manager.set("test_key", "test_value", 60).await;
        assert!(result.is_ok());

        assert!(manager.is_using_fallback());

        let value = manager.get("test_key").await.unwrap();
        assert_eq!(value, Some("test_value".to_string()));
    }

    #[tokio::test]
    async fn test_recovery_with_data_sync() {
        let mock = MockCache::new();
        let should_fail = mock.should_fail.clone();
        let primary_data = mock.data.clone();

        let mut manager = FallbackCacheManager::new(Box::new(mock), test_options());

        *should_fail.lock().await = true;
        manager.set("key1", "value1", 60).await.unwrap();
        manager.set("key2", "value2", 60).await.unwrap();

        assert!(manager.is_using_fallback());
        assert!(primary_data.lock().await.is_empty());

        // The primary is healthy again, but the recovery interval has not elapsed,
        // so reads are still served by the fallback.
        *should_fail.lock().await = false;
        let value1_fallback = manager.get("key1").await.unwrap();
        assert_eq!(value1_fallback, Some("value1".to_string()));
        assert!(manager.is_using_fallback());

        // Move the clock origin back so the recorded failure lies beyond the
        // recovery interval. Skip if the monotonic clock cannot go back that far.
        let earlier = match Instant::now()
            .checked_sub(Duration::from_secs(RECOVERY_CHECK_INTERVAL_SECS + 1))
        {
            Some(earlier) => earlier,
            None => return,
        };
        manager.start_time = earlier;

        // The next operation recovers: the fallback data is synced to the primary
        // and the read is then served by the primary.
        let value2 = manager.get("key2").await.unwrap();
        assert_eq!(value2, Some("value2".to_string()));
        assert!(!manager.is_using_fallback());

        let synced = primary_data.lock().await;
        assert_eq!(synced.get("key1"), Some(&"value1".to_string()));
        assert_eq!(synced.get("key2"), Some(&"value2".to_string()));
    }

    #[tokio::test]
    async fn test_no_race_condition_during_operations() {
        let mock = MockCache::new();
        let should_fail = mock.should_fail.clone();

        // Shared directly (no outer lock) so that the spawned writers genuinely
        // interleave against the manager's own synchronisation.
        let manager = Arc::new(FallbackCacheManager::new(Box::new(mock), test_options()));

        *should_fail.lock().await = true;
        manager.set("test", "value", 60).await.unwrap();

        assert!(manager.is_using_fallback());

        let handles: Vec<_> = (0..10)
            .map(|i| {
                let manager = Arc::clone(&manager);
                tokio::spawn(async move {
                    manager
                        .set(&format!("key{}", i), &format!("value{}", i), 60)
                        .await
                })
            })
            .collect();

        for handle in handles {
            assert!(handle.await.unwrap().is_ok());
        }

        assert!(manager.is_using_fallback());

        for i in 0..10 {
            let value = manager.get(&format!("key{}", i)).await.unwrap();
            assert_eq!(value, Some(format!("value{}", i)));
        }
    }

    #[tokio::test]
    async fn test_memory_cache_get_all_entries() {
        let cache = MemoryCacheManager::new("test_prefix".to_string(), test_options());

        cache.set("key1", "value1", 60).await.unwrap();
        cache.set("key2", "value2", 60).await.unwrap();
        cache.set("key3", "value3", 60).await.unwrap();

        let entries = cache.get_all_entries().await;

        assert_eq!(entries.len(), 3);

        let keys: Vec<String> = entries.iter().map(|(k, _, _)| k.clone()).collect();
        assert!(keys.contains(&"key1".to_string()));
        assert!(keys.contains(&"key2".to_string()));
        assert!(keys.contains(&"key3".to_string()));

        for (key, value, ttl) in entries {
            match key.as_str() {
                "key1" => assert_eq!(value, "value1"),
                "key2" => assert_eq!(value, "value2"),
                "key3" => assert_eq!(value, "value3"),
                _ => panic!("Unexpected key: {}", key),
            }
            assert_eq!(ttl, Some(Duration::from_secs(60)));
        }
    }

    #[tokio::test]
    async fn test_new_with_health_check_healthy_primary() {
        let mock = MockCache::new();

        let manager =
            FallbackCacheManager::new_with_health_check(Box::new(mock), test_options()).await;

        assert!(!manager.is_using_fallback());
    }

    #[tokio::test]
    async fn test_new_with_health_check_unhealthy_primary() {
        let mock = MockCache::new();
        *mock.should_fail.lock().await = true;

        let manager =
            FallbackCacheManager::new_with_health_check(Box::new(mock), test_options()).await;

        assert!(manager.is_using_fallback());
    }

    #[tokio::test]
    async fn test_new_without_health_check() {
        let mock = MockCache::new();
        *mock.should_fail.lock().await = true;

        // `new` performs no probe, so even a failing primary starts out active.
        let manager = FallbackCacheManager::new(Box::new(mock), test_options());

        assert!(!manager.is_using_fallback());
    }
}