1use crate::memory_cache_manager::MemoryCacheManager;
2use async_trait::async_trait;
3use sockudo_core::cache::CacheManager;
4use sockudo_core::error::Result;
5use sockudo_core::options::MemoryCacheOptions;
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7use std::time::{Duration, Instant};
8use tokio::sync::{Mutex, RwLock};
9use tracing::{debug, info, warn};
10
/// Minimum number of seconds to wait after a primary-cache failure before
/// the next recovery attempt (see `should_attempt_recovery`).
const RECOVERY_CHECK_INTERVAL_SECS: u64 = 30;
12
/// A cache manager that transparently falls back to an in-memory cache when
/// the primary cache becomes unavailable, and periodically attempts to
/// recover the primary connection (syncing fallback data back on success).
pub struct FallbackCacheManager {
    /// Preferred backend; behind a mutex so recovery can hold it while
    /// health-checking and syncing fallback entries into it.
    primary: Mutex<Box<dyn CacheManager + Send + Sync>>,
    /// In-memory cache that serves requests while the primary is down.
    fallback: Mutex<MemoryCacheManager>,
    /// True while operations are routed to `fallback`.
    using_fallback: AtomicBool,
    /// Seconds since `start_time` at which the primary last failed; used to
    /// rate-limit recovery attempts to RECOVERY_CHECK_INTERVAL_SECS.
    last_failure_time: AtomicU64,
    /// Monotonic reference point for `last_failure_time` (immune to
    /// wall-clock adjustments).
    start_time: Instant,
    /// Coordinates normal operations (readers) with the recovery/sync path
    /// (writer) so a switch-back cannot race an in-flight operation.
    recovery_lock: RwLock<()>,
}
27
impl FallbackCacheManager {
    /// Builds a manager that optimistically starts on the primary cache.
    ///
    /// No connectivity probe is performed here; the first failing primary
    /// operation triggers the switch to the in-memory fallback. Use
    /// [`Self::new_with_health_check`] to probe the primary up front.
    pub fn new(
        primary: Box<dyn CacheManager + Send + Sync>,
        fallback_options: MemoryCacheOptions,
    ) -> Self {
        let fallback = MemoryCacheManager::new("fallback_cache".to_string(), fallback_options);

        Self {
            primary: Mutex::new(primary),
            fallback: Mutex::new(fallback),
            using_fallback: AtomicBool::new(false),
            last_failure_time: AtomicU64::new(0),
            start_time: Instant::now(),
            recovery_lock: RwLock::new(()),
        }
    }

    /// Like [`Self::new`], but health-checks the primary once at
    /// construction time and starts directly in fallback mode when that
    /// check fails.
    pub async fn new_with_health_check(
        primary: Box<dyn CacheManager + Send + Sync>,
        fallback_options: MemoryCacheOptions,
    ) -> Self {
        let fallback = MemoryCacheManager::new("fallback_cache".to_string(), fallback_options);
        let start_time = Instant::now();

        let using_fallback = match primary.check_health().await {
            Ok(()) => {
                debug!("Primary cache is healthy at startup");
                false
            }
            Err(e) => {
                warn!(
                    "Primary cache unavailable at startup, starting in fallback mode. Error: {}",
                    e
                );
                true
            }
        };

        // Record "now" (seconds since `start_time`, effectively 0 at this
        // point) as the failure time so the first recovery attempt waits
        // the full RECOVERY_CHECK_INTERVAL_SECS window.
        let last_failure_time = if using_fallback {
            start_time.elapsed().as_secs()
        } else {
            0
        };

        Self {
            primary: Mutex::new(primary),
            fallback: Mutex::new(fallback),
            using_fallback: AtomicBool::new(using_fallback),
            last_failure_time: AtomicU64::new(last_failure_time),
            start_time,
            recovery_lock: RwLock::new(()),
        }
    }

    /// True while requests are being served by the in-memory fallback.
    fn is_using_fallback(&self) -> bool {
        self.using_fallback.load(Ordering::SeqCst)
    }

    /// Marks the primary as failed and flips into fallback mode.
    ///
    /// `swap` returns the previous flag value, so the warning is logged and
    /// the failure timestamp recorded only on the first transition, even
    /// when several operations fail concurrently.
    fn switch_to_fallback(&self, error: &str) {
        if !self.using_fallback.swap(true, Ordering::SeqCst) {
            warn!(
                "Redis cache unavailable, switching to in-memory fallback. Error: {}",
                error
            );
            self.last_failure_time
                .store(self.start_time.elapsed().as_secs(), Ordering::SeqCst);
        }
    }

    /// True when the manager is in fallback mode and at least
    /// RECOVERY_CHECK_INTERVAL_SECS have elapsed since the last recorded
    /// failure.
    ///
    /// Both timestamps are seconds elapsed since the monotonic
    /// `start_time`, so the comparison is immune to wall-clock changes;
    /// `saturating_sub` guards against out-of-order stores.
    fn should_attempt_recovery(&self) -> bool {
        if !self.is_using_fallback() {
            return false;
        }

        let last_failure = self.last_failure_time.load(Ordering::SeqCst);
        let current_time = self.start_time.elapsed().as_secs();

        current_time.saturating_sub(last_failure) >= RECOVERY_CHECK_INTERVAL_SECS
    }

    /// Attempts to switch back to the primary cache; returns true only when
    /// the manager actually switched.
    ///
    /// Takes the recovery write lock, which excludes all normal operations
    /// (they hold the read lock), then re-checks the fallback flag in case
    /// another task completed recovery while we waited. On a successful
    /// health check the fallback contents are pushed to the primary before
    /// the flag is cleared; a sync failure is logged but does not abort the
    /// switch-back.
    async fn try_recover(&self) -> bool {
        // Cheap lock-free pre-check: most calls bail out here.
        if !self.should_attempt_recovery() {
            return false;
        }

        let _recovery_guard = self.recovery_lock.write().await;

        // Double-check under the write lock: a concurrent task may have
        // finished recovery while we were waiting for it.
        if !self.is_using_fallback() {
            return false;
        }

        debug!("Attempting to recover Redis cache connection...");

        let mut primary = self.primary.lock().await;
        match primary.check_health().await {
            Ok(()) => {
                info!("Redis cache connection recovered, synchronizing fallback data...");

                if let Err(e) = self.sync_fallback_to_primary(&mut primary).await {
                    warn!(
                        "Failed to sync fallback data to primary during recovery: {}",
                        e
                    );
                }

                self.using_fallback.store(false, Ordering::SeqCst);
                info!("Successfully switched back to primary cache after recovery");
                true
            }
            Err(e) => {
                debug!("Redis cache still unavailable: {}", e);
                // Push the failure time forward so the next attempt waits a
                // full interval again.
                self.last_failure_time
                    .store(self.start_time.elapsed().as_secs(), Ordering::SeqCst);
                false
            }
        }
    }

    /// Copies every entry currently held by the fallback cache into the
    /// primary, preserving per-entry TTLs.
    ///
    /// Individual key failures are logged and counted but never abort the
    /// sync; the overall result is `Ok` even when some keys failed.
    async fn sync_fallback_to_primary(
        &self,
        primary: &mut Box<dyn CacheManager + Send + Sync>,
    ) -> Result<()> {
        let fallback = self.fallback.lock().await;

        let entries = fallback.get_all_entries().await;

        if entries.is_empty() {
            debug!("No entries in fallback cache to sync");
            return Ok(());
        }

        debug!(
            "Syncing {} entries from fallback to primary cache",
            entries.len()
        );

        let mut synced = 0;
        let mut failed = 0;

        for (key, value, ttl) in entries {
            // Entries without a TTL are written with 0 — presumably the
            // backend treats 0 as "no expiry"; TODO confirm against the
            // primary CacheManager implementation.
            let ttl_seconds = ttl.map(|d| d.as_secs()).unwrap_or(0);
            match primary.set(&key, &value, ttl_seconds).await {
                Ok(()) => synced += 1,
                Err(e) => {
                    warn!("Failed to sync key '{}' to primary cache: {}", key, e);
                    failed += 1;
                }
            }
        }

        if failed > 0 {
            warn!(
                "Synced {}/{} entries from fallback to primary ({} failed)",
                synced,
                synced + failed,
                failed
            );
        } else {
            info!(
                "Successfully synced {} entries from fallback to primary cache",
                synced
            );
        }

        Ok(())
    }
}
205
#[async_trait]
impl CacheManager for FallbackCacheManager {
    /// Key-existence check, routed to the primary or the fallback.
    ///
    /// Like every operation below: first give recovery a chance to run,
    /// then hold the recovery read lock for the whole operation so a
    /// concurrent recovery (which takes the write lock) cannot switch
    /// caches while this request is in flight.
    async fn has(&self, key: &str) -> Result<bool> {
        self.try_recover().await;

        let _guard = self.recovery_lock.read().await;

        if self.is_using_fallback() {
            return self.fallback.lock().await.has(key).await;
        }

        let primary = self.primary.lock().await;
        match primary.has(key).await {
            Ok(result) => Ok(result),
            Err(e) => {
                // Primary failed mid-operation: flip into fallback mode and
                // retry there. Note the primary mutex guard is still held,
                // so the fallback retry serializes behind it.
                self.switch_to_fallback(&e.to_string());
                self.fallback.lock().await.has(key).await
            }
        }
    }

    /// Fetches a value, retrying against the fallback on primary failure.
    ///
    /// Values written only to the primary before an outage are not present
    /// in the fallback, so such reads may return `None` while degraded.
    async fn get(&self, key: &str) -> Result<Option<String>> {
        self.try_recover().await;

        let _guard = self.recovery_lock.read().await;

        if self.is_using_fallback() {
            return self.fallback.lock().await.get(key).await;
        }

        let primary = self.primary.lock().await;
        match primary.get(key).await {
            Ok(result) => Ok(result),
            Err(e) => {
                self.switch_to_fallback(&e.to_string());
                self.fallback.lock().await.get(key).await
            }
        }
    }

    /// Stores a value with a TTL; on primary failure the write lands in the
    /// fallback instead (it is synced back on recovery).
    async fn set(&self, key: &str, value: &str, ttl_seconds: u64) -> Result<()> {
        self.try_recover().await;

        let _guard = self.recovery_lock.read().await;

        if self.is_using_fallback() {
            return self
                .fallback
                .lock()
                .await
                .set(key, value, ttl_seconds)
                .await;
        }

        let primary = self.primary.lock().await;
        match primary.set(key, value, ttl_seconds).await {
            Ok(()) => Ok(()),
            Err(e) => {
                self.switch_to_fallback(&e.to_string());
                self.fallback
                    .lock()
                    .await
                    .set(key, value, ttl_seconds)
                    .await
            }
        }
    }

    /// Removes a key, falling back to the in-memory cache on primary
    /// failure.
    async fn remove(&self, key: &str) -> Result<()> {
        self.try_recover().await;

        let _guard = self.recovery_lock.read().await;

        if self.is_using_fallback() {
            return self.fallback.lock().await.remove(key).await;
        }

        let primary = self.primary.lock().await;
        match primary.remove(key).await {
            Ok(()) => Ok(()),
            Err(e) => {
                self.switch_to_fallback(&e.to_string());
                self.fallback.lock().await.remove(key).await
            }
        }
    }

    /// Disconnects both caches, logging each failure individually.
    ///
    /// Returns `Ok` only when both succeed; otherwise the primary's error
    /// takes precedence over the fallback's. This path does not take the
    /// recovery lock or attempt recovery.
    async fn disconnect(&self) -> Result<()> {
        let primary_result = self.primary.lock().await.disconnect().await;
        if let Err(ref e) = primary_result {
            warn!(error = ?e, "Failed to disconnect primary cache");
        }

        let fallback_result = self.fallback.lock().await.disconnect().await;
        if let Err(ref e) = fallback_result {
            warn!(error = ?e, "Failed to disconnect fallback cache");
        }

        match (primary_result, fallback_result) {
            (Ok(_), Ok(_)) => Ok(()),
            (Err(e), _) => Err(e),
            (_, Err(e)) => Err(e),
        }
    }

    /// Reports the health of whichever cache is currently serving requests;
    /// does not itself attempt recovery.
    async fn check_health(&self) -> Result<()> {
        if self.is_using_fallback() {
            let fallback = self.fallback.lock().await;
            return fallback.check_health().await;
        }

        let primary = self.primary.lock().await;
        primary.check_health().await
    }

    /// Returns the remaining TTL for a key, falling back to the in-memory
    /// cache on primary failure.
    async fn ttl(&self, key: &str) -> Result<Option<Duration>> {
        self.try_recover().await;

        let _guard = self.recovery_lock.read().await;

        if self.is_using_fallback() {
            return self.fallback.lock().await.ttl(key).await;
        }

        let primary = self.primary.lock().await;
        match primary.ttl(key).await {
            Ok(result) => Ok(result),
            Err(e) => {
                self.switch_to_fallback(&e.to_string());
                self.fallback.lock().await.ttl(key).await
            }
        }
    }
}
340
#[cfg(test)]
mod tests {
    use super::*;
    use crate::memory_cache_manager::MemoryCacheManager;
    use sockudo_core::error::Error;
    use std::sync::Arc;
    use tokio::sync::Mutex as TokioMutex;

    /// Standard options for the in-memory fallback cache used by these tests.
    fn test_options() -> MemoryCacheOptions {
        MemoryCacheOptions {
            ttl: 60,
            cleanup_interval: 60,
            max_capacity: 100,
        }
    }

    /// Minimal `CacheManager` whose failure behavior can be toggled at
    /// runtime, letting tests simulate a primary-cache outage and recovery.
    struct MockCache {
        // When true, every operation except `disconnect` returns an error.
        should_fail: Arc<TokioMutex<bool>>,
        // Backing store for successful operations.
        data: Arc<TokioMutex<std::collections::HashMap<String, String>>>,
    }

    impl MockCache {
        fn new() -> Self {
            Self {
                should_fail: Arc::new(TokioMutex::new(false)),
                data: Arc::new(TokioMutex::new(std::collections::HashMap::new())),
            }
        }
    }

    #[async_trait]
    impl CacheManager for MockCache {
        async fn has(&self, key: &str) -> Result<bool> {
            if *self.should_fail.lock().await {
                return Err(Error::Cache("Mock failure".to_string()));
            }
            Ok(self.data.lock().await.contains_key(key))
        }

        async fn get(&self, key: &str) -> Result<Option<String>> {
            if *self.should_fail.lock().await {
                return Err(Error::Cache("Mock failure".to_string()));
            }
            Ok(self.data.lock().await.get(key).cloned())
        }

        async fn set(&self, key: &str, value: &str, _ttl_seconds: u64) -> Result<()> {
            if *self.should_fail.lock().await {
                return Err(Error::Cache("Mock failure".to_string()));
            }
            self.data
                .lock()
                .await
                .insert(key.to_string(), value.to_string());
            Ok(())
        }

        async fn remove(&self, key: &str) -> Result<()> {
            if *self.should_fail.lock().await {
                return Err(Error::Cache("Mock failure".to_string()));
            }
            self.data.lock().await.remove(key);
            Ok(())
        }

        async fn disconnect(&self) -> Result<()> {
            Ok(())
        }

        async fn check_health(&self) -> Result<()> {
            if *self.should_fail.lock().await {
                return Err(Error::Cache("Mock failure".to_string()));
            }
            Ok(())
        }

        async fn ttl(&self, _key: &str) -> Result<Option<Duration>> {
            if *self.should_fail.lock().await {
                return Err(Error::Cache("Mock failure".to_string()));
            }
            Ok(Some(Duration::from_secs(60)))
        }
    }

    #[tokio::test]
    async fn test_fallback_on_primary_failure() {
        let mock = MockCache::new();
        let should_fail = mock.should_fail.clone();

        let manager = FallbackCacheManager::new(Box::new(mock), test_options());

        assert!(!manager.is_using_fallback());

        // Break the primary: the next write must transparently switch the
        // manager into fallback mode and still succeed.
        *should_fail.lock().await = true;

        let result = manager.set("test_key", "test_value", 60).await;
        assert!(result.is_ok());

        assert!(manager.is_using_fallback());

        let value = manager.get("test_key").await.unwrap();
        assert_eq!(value, Some("test_value".to_string()));
    }

    #[tokio::test]
    async fn test_recovery_with_data_sync() {
        let mock = MockCache::new();
        let should_fail = mock.should_fail.clone();

        let manager = FallbackCacheManager::new(Box::new(mock), test_options());

        // Force fallback mode and populate the fallback cache.
        *should_fail.lock().await = true;
        manager.set("key1", "value1", 60).await.unwrap();
        manager.set("key2", "value2", 60).await.unwrap();
        assert!(manager.is_using_fallback());
        assert_eq!(
            manager.get("key1").await.unwrap(),
            Some("value1".to_string())
        );

        // Heal the primary and run the sync directly: the time-based
        // `try_recover` path cannot fire in a fast test because
        // RECOVERY_CHECK_INTERVAL_SECS of real (monotonic) time must
        // elapse first.
        *should_fail.lock().await = false;
        {
            let mut primary = manager.primary.lock().await;
            manager
                .sync_fallback_to_primary(&mut primary)
                .await
                .unwrap();
        }
        manager.using_fallback.store(false, Ordering::SeqCst);

        // The primary now serves the data written while it was down.
        assert!(!manager.is_using_fallback());
        assert_eq!(
            manager.get("key1").await.unwrap(),
            Some("value1".to_string())
        );
        assert_eq!(
            manager.get("key2").await.unwrap(),
            Some("value2".to_string())
        );
    }

    #[tokio::test]
    async fn test_no_race_condition_during_operations() {
        let mock = MockCache::new();
        let should_fail = mock.should_fail.clone();

        // Share the manager through `Arc` alone: every `CacheManager`
        // method takes `&self`, so no outer mutex is needed. (The previous
        // version wrapped the manager in a `tokio::sync::Mutex`, which
        // serialized the spawned tasks and so never exercised concurrent
        // access at all.)
        let manager = Arc::new(FallbackCacheManager::new(Box::new(mock), test_options()));

        *should_fail.lock().await = true;
        manager.set("test", "value", 60).await.unwrap();

        assert!(manager.is_using_fallback());

        let handles: Vec<_> = (0..10)
            .map(|i| {
                let manager_clone = Arc::clone(&manager);
                tokio::spawn(async move {
                    manager_clone
                        .set(&format!("key{}", i), &format!("value{}", i), 60)
                        .await
                })
            })
            .collect();

        for handle in handles {
            assert!(handle.await.unwrap().is_ok());
        }

        assert!(manager.is_using_fallback());

        for i in 0..10 {
            let value = manager.get(&format!("key{}", i)).await.unwrap();
            assert_eq!(value, Some(format!("value{}", i)));
        }
    }

    #[tokio::test]
    async fn test_memory_cache_get_all_entries() {
        let cache = MemoryCacheManager::new("test_prefix".to_string(), test_options());

        cache.set("key1", "value1", 60).await.unwrap();
        cache.set("key2", "value2", 60).await.unwrap();
        cache.set("key3", "value3", 60).await.unwrap();

        let entries = cache.get_all_entries().await;

        assert_eq!(entries.len(), 3);

        let keys: Vec<String> = entries.iter().map(|(k, _, _)| k.clone()).collect();
        assert!(keys.contains(&"key1".to_string()));
        assert!(keys.contains(&"key2".to_string()));
        assert!(keys.contains(&"key3".to_string()));

        for (key, value, ttl) in entries {
            match key.as_str() {
                "key1" => assert_eq!(value, "value1"),
                "key2" => assert_eq!(value, "value2"),
                "key3" => assert_eq!(value, "value3"),
                _ => panic!("Unexpected key: {}", key),
            }
            assert_eq!(ttl, Some(Duration::from_secs(60)));
        }
    }

    #[tokio::test]
    async fn test_new_with_health_check_healthy_primary() {
        let mock = MockCache::new();

        let manager =
            FallbackCacheManager::new_with_health_check(Box::new(mock), test_options()).await;

        assert!(!manager.is_using_fallback());
    }

    #[tokio::test]
    async fn test_new_with_health_check_unhealthy_primary() {
        let mock = MockCache::new();
        let should_fail = mock.should_fail.clone();
        *should_fail.lock().await = true;

        let manager =
            FallbackCacheManager::new_with_health_check(Box::new(mock), test_options()).await;

        assert!(manager.is_using_fallback());
    }

    #[tokio::test]
    async fn test_new_without_health_check() {
        let mock = MockCache::new();
        let should_fail = mock.should_fail.clone();
        *should_fail.lock().await = true;

        // `new` performs no probe, so the manager optimistically starts on
        // the (broken) primary; the first operation would flip it over.
        let manager = FallbackCacheManager::new(Box::new(mock), test_options());

        assert!(!manager.is_using_fallback());
    }
}