1use crate::memory_cache_manager::MemoryCacheManager;
2use async_trait::async_trait;
3use sockudo_core::cache::CacheManager;
4use sockudo_core::error::Result;
5use sockudo_core::options::MemoryCacheOptions;
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7use std::time::{Duration, Instant};
8use tokio::sync::{Mutex, RwLock};
9use tracing::{debug, info, warn};
10
/// Minimum number of seconds between consecutive attempts to recover the
/// primary cache after a failure (see `should_attempt_recovery`).
const RECOVERY_CHECK_INTERVAL_SECS: u64 = 30;
12
/// Cache manager that wraps a primary backend (e.g. Redis) with an in-memory
/// fallback: operations switch to the fallback when the primary errors, and
/// recovery back to the primary is attempted periodically.
pub struct FallbackCacheManager {
    /// Primary cache backend, serialized behind a Mutex so recovery and
    /// normal operations never touch the boxed trait object concurrently.
    primary: Mutex<Box<dyn CacheManager + Send + Sync>>,
    /// In-memory cache that serves traffic while the primary is down.
    fallback: Mutex<MemoryCacheManager>,
    /// True while operations are being routed to `fallback`.
    using_fallback: AtomicBool,
    /// Seconds since `start_time` at which the last primary failure was
    /// recorded; used to rate-limit recovery attempts.
    last_failure_time: AtomicU64,
    /// Monotonic reference point that `last_failure_time` is measured from.
    start_time: Instant,
    /// Recovery holds the write side; every cache operation holds the read
    /// side, so a recovery cannot interleave with an in-flight operation.
    recovery_lock: RwLock<()>,
}
27
impl FallbackCacheManager {
    /// Builds a manager that optimistically assumes the primary is healthy;
    /// the first failing primary operation triggers the fallback switch.
    pub fn new(
        primary: Box<dyn CacheManager + Send + Sync>,
        fallback_options: MemoryCacheOptions,
    ) -> Self {
        let fallback = MemoryCacheManager::new("fallback_cache".to_string(), fallback_options);

        Self {
            primary: Mutex::new(primary),
            fallback: Mutex::new(fallback),
            using_fallback: AtomicBool::new(false),
            last_failure_time: AtomicU64::new(0),
            start_time: Instant::now(),
            recovery_lock: RwLock::new(()),
        }
    }

    /// Like [`Self::new`], but probes the primary's health once up front and
    /// starts in fallback mode when the probe fails.
    pub async fn new_with_health_check(
        primary: Box<dyn CacheManager + Send + Sync>,
        fallback_options: MemoryCacheOptions,
    ) -> Self {
        let fallback = MemoryCacheManager::new("fallback_cache".to_string(), fallback_options);
        let start_time = Instant::now();

        let using_fallback = match primary.check_health().await {
            Ok(()) => {
                debug!("Primary cache is healthy at startup");
                false
            }
            Err(e) => {
                warn!(
                    "Primary cache unavailable at startup, starting in fallback mode. Error: {}",
                    e
                );
                true
            }
        };

        // `elapsed()` is effectively 0 this soon after `start_time` was
        // captured, so both branches yield ~0 and the first recovery attempt
        // happens RECOVERY_CHECK_INTERVAL_SECS after construction.
        let last_failure_time = if using_fallback {
            start_time.elapsed().as_secs()
        } else {
            0
        };

        Self {
            primary: Mutex::new(primary),
            fallback: Mutex::new(fallback),
            using_fallback: AtomicBool::new(using_fallback),
            last_failure_time: AtomicU64::new(last_failure_time),
            start_time,
            recovery_lock: RwLock::new(()),
        }
    }

    /// True while operations are being served from the in-memory fallback.
    fn is_using_fallback(&self) -> bool {
        self.using_fallback.load(Ordering::SeqCst)
    }

    /// Switches to fallback mode. `swap` returns the previous value, so only
    /// the first caller to observe the failure logs it and stamps the failure
    /// time; concurrent callers become no-ops.
    fn switch_to_fallback(&self, error: &str) {
        if !self.using_fallback.swap(true, Ordering::SeqCst) {
            warn!(
                "Redis cache unavailable, switching to in-memory fallback. Error: {}",
                error
            );
            self.last_failure_time
                .store(self.start_time.elapsed().as_secs(), Ordering::SeqCst);
        }
    }

    /// Rate-limits recovery probes: only attempt while in fallback mode and
    /// only once at least RECOVERY_CHECK_INTERVAL_SECS have passed since the
    /// last recorded failure.
    fn should_attempt_recovery(&self) -> bool {
        if !self.is_using_fallback() {
            return false;
        }

        let last_failure = self.last_failure_time.load(Ordering::SeqCst);
        let current_time = self.start_time.elapsed().as_secs();

        // saturating_sub guards against a stored timestamp that is slightly
        // ahead of the freshly computed `current_time`.
        current_time.saturating_sub(last_failure) >= RECOVERY_CHECK_INTERVAL_SECS
    }

    /// Attempts to restore the primary cache. Returns `true` only when the
    /// primary's health check passed and we switched back to it.
    async fn try_recover(&self) -> bool {
        // Cheap lock-free pre-check so the common (no-recovery-due) path
        // never touches the write lock.
        if !self.should_attempt_recovery() {
            return false;
        }

        // Exclusive lock: stalls all normal operations (which hold the read
        // side) for the duration of the recovery attempt.
        let _recovery_guard = self.recovery_lock.write().await;

        // Re-check under the lock: a concurrent task may have already
        // completed the recovery while we waited for the write guard.
        if !self.is_using_fallback() {
            return false;
        }

        debug!("Attempting to recover Redis cache connection...");

        let mut primary = self.primary.lock().await;
        match primary.check_health().await {
            Ok(()) => {
                info!("Redis cache connection recovered, synchronizing fallback data...");

                // Sync failures are logged but deliberately do not abort the
                // switch back: staying degraded is worse than losing some
                // fallback-era entries.
                if let Err(e) = self.sync_fallback_to_primary(&mut primary).await {
                    warn!(
                        "Failed to sync fallback data to primary during recovery: {}",
                        e
                    );
                }

                self.using_fallback.store(false, Ordering::SeqCst);
                info!("Successfully switched back to primary cache after recovery");
                true
            }
            Err(e) => {
                debug!("Redis cache still unavailable: {}", e);
                // Push the next recovery attempt out by another full interval.
                self.last_failure_time
                    .store(self.start_time.elapsed().as_secs(), Ordering::SeqCst);
                false
            }
        }
    }

    /// Copies every fallback entry into the primary, best-effort: per-key
    /// failures are logged and counted but never abort the sync.
    ///
    /// NOTE(review): entries without a remaining TTL are written with
    /// ttl_seconds = 0 — confirm the primary treats 0 as "no expiry" rather
    /// than "expire immediately". Also, the fallback is not cleared after a
    /// successful sync, so stale entries could resurface on a later failover
    /// — verify this is intended.
    async fn sync_fallback_to_primary(
        &self,
        primary: &mut Box<dyn CacheManager + Send + Sync>,
    ) -> Result<()> {
        // Lock order is primary -> fallback, matching the trait methods
        // (caller already holds the primary lock via `primary`).
        let fallback = self.fallback.lock().await;

        let entries = fallback.get_all_entries().await;

        if entries.is_empty() {
            debug!("No entries in fallback cache to sync");
            return Ok(());
        }

        debug!(
            "Syncing {} entries from fallback to primary cache",
            entries.len()
        );

        let mut synced = 0;
        let mut failed = 0;

        for (key, value, ttl) in entries {
            let ttl_seconds = ttl.map(|d| d.as_secs()).unwrap_or(0);
            match primary.set(&key, &value, ttl_seconds).await {
                Ok(()) => synced += 1,
                Err(e) => {
                    warn!("Failed to sync key '{}' to primary cache: {}", key, e);
                    failed += 1;
                }
            }
        }

        if failed > 0 {
            warn!(
                "Synced {}/{} entries from fallback to primary ({} failed)",
                synced,
                synced + failed,
                failed
            );
        } else {
            info!(
                "Successfully synced {} entries from fallback to primary cache",
                synced
            );
        }

        Ok(())
    }
}
205
#[async_trait]
impl CacheManager for FallbackCacheManager {
    // Every read/write method below follows the same protocol:
    //   1. opportunistically try to recover the primary (a no-op on most
    //      calls thanks to the lock-free pre-check in try_recover),
    //   2. take the recovery read lock so a recovery cannot run mid-operation,
    //   3. serve from the fallback if it is active, otherwise try the primary
    //      and switch to the fallback on the first error.

    async fn has(&self, key: &str) -> Result<bool> {
        self.try_recover().await;

        let _guard = self.recovery_lock.read().await;

        if self.is_using_fallback() {
            return self.fallback.lock().await.has(key).await;
        }

        let primary = self.primary.lock().await;
        match primary.has(key).await {
            Ok(result) => Ok(result),
            Err(e) => {
                // Flip to fallback and retry the same operation there.
                self.switch_to_fallback(&e.to_string());
                self.fallback.lock().await.has(key).await
            }
        }
    }

    async fn get(&self, key: &str) -> Result<Option<String>> {
        self.try_recover().await;

        let _guard = self.recovery_lock.read().await;

        if self.is_using_fallback() {
            return self.fallback.lock().await.get(key).await;
        }

        let primary = self.primary.lock().await;
        match primary.get(key).await {
            Ok(result) => Ok(result),
            Err(e) => {
                self.switch_to_fallback(&e.to_string());
                self.fallback.lock().await.get(key).await
            }
        }
    }

    async fn set(&self, key: &str, value: &str, ttl_seconds: u64) -> Result<()> {
        self.try_recover().await;

        let _guard = self.recovery_lock.read().await;

        if self.is_using_fallback() {
            return self
                .fallback
                .lock()
                .await
                .set(key, value, ttl_seconds)
                .await;
        }

        let primary = self.primary.lock().await;
        match primary.set(key, value, ttl_seconds).await {
            Ok(()) => Ok(()),
            Err(e) => {
                // The write that failed on the primary is replayed against
                // the fallback so the caller's data is not lost.
                self.switch_to_fallback(&e.to_string());
                self.fallback
                    .lock()
                    .await
                    .set(key, value, ttl_seconds)
                    .await
            }
        }
    }

    async fn remove(&self, key: &str) -> Result<()> {
        self.try_recover().await;

        let _guard = self.recovery_lock.read().await;

        if self.is_using_fallback() {
            return self.fallback.lock().await.remove(key).await;
        }

        let primary = self.primary.lock().await;
        match primary.remove(key).await {
            Ok(()) => Ok(()),
            Err(e) => {
                self.switch_to_fallback(&e.to_string());
                self.fallback.lock().await.remove(key).await
            }
        }
    }

    /// Disconnects both backends unconditionally. Each failure is logged;
    /// if both fail, the primary's error is the one returned.
    async fn disconnect(&self) -> Result<()> {
        let primary_result = self.primary.lock().await.disconnect().await;
        if let Err(ref e) = primary_result {
            warn!(error = ?e, "Failed to disconnect primary cache");
        }

        let fallback_result = self.fallback.lock().await.disconnect().await;
        if let Err(ref e) = fallback_result {
            warn!(error = ?e, "Failed to disconnect fallback cache");
        }

        match (primary_result, fallback_result) {
            (Ok(_), Ok(_)) => Ok(()),
            (Err(e), _) => Err(e),
            (_, Err(e)) => Err(e),
        }
    }

    /// Reports the health of whichever backend is currently active.
    ///
    /// NOTE(review): unlike the data methods, this neither calls
    /// `try_recover` nor `switch_to_fallback` — a primary failure here
    /// surfaces as Err without flipping to the fallback. Confirm that is the
    /// intended health-endpoint semantics.
    async fn check_health(&self) -> Result<()> {
        if self.is_using_fallback() {
            let fallback = self.fallback.lock().await;
            return fallback.check_health().await;
        }

        let primary = self.primary.lock().await;
        primary.check_health().await
    }

    async fn ttl(&self, key: &str) -> Result<Option<Duration>> {
        self.try_recover().await;

        let _guard = self.recovery_lock.read().await;

        if self.is_using_fallback() {
            return self.fallback.lock().await.ttl(key).await;
        }

        let primary = self.primary.lock().await;
        match primary.ttl(key).await {
            Ok(result) => Ok(result),
            Err(e) => {
                self.switch_to_fallback(&e.to_string());
                self.fallback.lock().await.ttl(key).await
            }
        }
    }

    /// NOTE(review): when this falls over mid-call, the atomicity of the
    /// "set if not exists" check only holds within the fallback — a key that
    /// existed on the unreachable primary is not visible here.
    async fn set_if_not_exists(&self, key: &str, value: &str, ttl_seconds: u64) -> Result<bool> {
        self.try_recover().await;

        let _guard = self.recovery_lock.read().await;

        if self.is_using_fallback() {
            return self
                .fallback
                .lock()
                .await
                .set_if_not_exists(key, value, ttl_seconds)
                .await;
        }

        let primary = self.primary.lock().await;
        match primary.set_if_not_exists(key, value, ttl_seconds).await {
            Ok(result) => Ok(result),
            Err(e) => {
                self.switch_to_fallback(&e.to_string());
                self.fallback
                    .lock()
                    .await
                    .set_if_not_exists(key, value, ttl_seconds)
                    .await
            }
        }
    }
}
368
#[cfg(test)]
mod tests {
    use super::*;
    use crate::memory_cache_manager::MemoryCacheManager;
    use sockudo_core::error::Error;
    use std::sync::Arc;
    use tokio::sync::Mutex as TokioMutex;

    /// Test double for the primary cache: every operation can be made to
    /// fail on demand via the shared `should_fail` flag.
    ///
    /// NOTE(review): `set_if_not_exists` is not implemented here, so the
    /// trait presumably supplies a default — confirm against the trait
    /// definition in sockudo_core.
    struct MockCache {
        should_fail: Arc<TokioMutex<bool>>,
        data: Arc<TokioMutex<std::collections::HashMap<String, String>>>,
    }

    impl MockCache {
        fn new() -> Self {
            Self {
                should_fail: Arc::new(TokioMutex::new(false)),
                data: Arc::new(TokioMutex::new(std::collections::HashMap::new())),
            }
        }

        #[allow(dead_code)]
        async fn set_should_fail(&self, should_fail: bool) {
            *self.should_fail.lock().await = should_fail;
        }
    }

    #[async_trait]
    impl CacheManager for MockCache {
        async fn has(&self, key: &str) -> Result<bool> {
            if *self.should_fail.lock().await {
                return Err(Error::Cache("Mock failure".to_string()));
            }
            Ok(self.data.lock().await.contains_key(key))
        }

        async fn get(&self, key: &str) -> Result<Option<String>> {
            if *self.should_fail.lock().await {
                return Err(Error::Cache("Mock failure".to_string()));
            }
            Ok(self.data.lock().await.get(key).cloned())
        }

        // TTL is ignored by the mock; values never expire.
        async fn set(&self, key: &str, value: &str, _ttl_seconds: u64) -> Result<()> {
            if *self.should_fail.lock().await {
                return Err(Error::Cache("Mock failure".to_string()));
            }
            self.data
                .lock()
                .await
                .insert(key.to_string(), value.to_string());
            Ok(())
        }

        async fn remove(&self, key: &str) -> Result<()> {
            if *self.should_fail.lock().await {
                return Err(Error::Cache("Mock failure".to_string()));
            }
            self.data.lock().await.remove(key);
            Ok(())
        }

        async fn disconnect(&self) -> Result<()> {
            Ok(())
        }

        async fn check_health(&self) -> Result<()> {
            if *self.should_fail.lock().await {
                return Err(Error::Cache("Mock failure".to_string()));
            }
            Ok(())
        }

        // Always reports a fixed 60s TTL regardless of key.
        async fn ttl(&self, _key: &str) -> Result<Option<Duration>> {
            if *self.should_fail.lock().await {
                return Err(Error::Cache("Mock failure".to_string()));
            }
            Ok(Some(Duration::from_secs(60)))
        }
    }

    /// A failing primary `set` must transparently switch to the fallback
    /// and the written value must be readable afterwards.
    #[tokio::test]
    async fn test_fallback_on_primary_failure() {
        let mock = MockCache::new();
        // Keep a handle to the flag: the mock itself is moved into the manager.
        let should_fail = mock.should_fail.clone();

        let options = MemoryCacheOptions {
            ttl: 60,
            cleanup_interval: 60,
            max_capacity: 100,
        };

        let manager = FallbackCacheManager::new(Box::new(mock), options);

        assert!(!manager.is_using_fallback());

        *should_fail.lock().await = true;

        // The failed primary write is replayed against the fallback, so the
        // overall operation still succeeds.
        let result = manager.set("test_key", "test_value", 60).await;
        assert!(result.is_ok());

        assert!(manager.is_using_fallback());

        let value = manager.get("test_key").await.unwrap();
        assert_eq!(value, Some("test_value".to_string()));
    }

    /// NOTE(review): despite the name, this test never drives `try_recover`
    /// through the recovery interval — it manually resets the atomic flags
    /// and only verifies that fallback reads still work. The
    /// sync-back-to-primary path is not exercised here.
    #[tokio::test]
    async fn test_recovery_with_data_sync() {
        let mock = MockCache::new();
        let should_fail = mock.should_fail.clone();

        let options = MemoryCacheOptions {
            ttl: 60,
            cleanup_interval: 60,
            max_capacity: 100,
        };

        let manager = FallbackCacheManager::new(Box::new(mock), options);

        *should_fail.lock().await = true;
        manager.set("key1", "value1", 60).await.unwrap();
        manager.set("key2", "value2", 60).await.unwrap();

        assert!(manager.is_using_fallback());

        let value1_fallback = manager.get("key1").await.unwrap();
        assert_eq!(value1_fallback, Some("value1".to_string()));

        *should_fail.lock().await = false;

        // Force the "in fallback, failure long ago" state by hand so a
        // recovery would be considered due.
        manager.using_fallback.store(true, Ordering::SeqCst);
        manager.last_failure_time.store(0, Ordering::SeqCst);

        assert!(manager.is_using_fallback());
        let value2_fallback = manager.get("key2").await.unwrap();
        assert_eq!(value2_fallback, Some("value2".to_string()));
    }

    /// NOTE(review): the outer TokioMutex serializes every spawned task, so
    /// this exercises sequential access rather than a true race. Since the
    /// trait methods take `&self`, sharing `Arc<FallbackCacheManager>`
    /// directly would be a stronger concurrency test.
    #[tokio::test]
    async fn test_no_race_condition_during_operations() {
        let mock = MockCache::new();
        let should_fail = mock.should_fail.clone();

        let options = MemoryCacheOptions {
            ttl: 60,
            cleanup_interval: 60,
            max_capacity: 100,
        };

        let manager = Arc::new(TokioMutex::new(FallbackCacheManager::new(
            Box::new(mock),
            options,
        )));

        *should_fail.lock().await = true;
        manager.lock().await.set("test", "value", 60).await.unwrap();

        assert!(manager.lock().await.is_using_fallback());

        // Ten tasks each write a distinct key through the shared manager.
        let handles: Vec<_> = (0..10)
            .map(|i| {
                let manager_clone = manager.clone();
                tokio::spawn(async move {
                    let mgr = manager_clone.lock().await;
                    mgr.set(&format!("key{}", i), &format!("value{}", i), 60)
                        .await
                })
            })
            .collect();

        for handle in handles {
            assert!(handle.await.unwrap().is_ok());
        }

        assert!(manager.lock().await.is_using_fallback());

        for i in 0..10 {
            let mgr = manager.lock().await;
            let value = mgr.get(&format!("key{}", i)).await.unwrap();
            assert_eq!(value, Some(format!("value{}", i)));
        }
    }

    /// Verifies `MemoryCacheManager::get_all_entries` returns every stored
    /// (key, value, ttl) triple — the primitive the recovery sync relies on.
    #[tokio::test]
    async fn test_memory_cache_get_all_entries() {
        let options = MemoryCacheOptions {
            ttl: 60,
            cleanup_interval: 60,
            max_capacity: 100,
        };

        let cache = MemoryCacheManager::new("test_prefix".to_string(), options);

        cache.set("key1", "value1", 60).await.unwrap();
        cache.set("key2", "value2", 60).await.unwrap();
        cache.set("key3", "value3", 60).await.unwrap();

        let entries = cache.get_all_entries().await;

        assert_eq!(entries.len(), 3);

        let keys: Vec<String> = entries.iter().map(|(k, _, _)| k.clone()).collect();
        assert!(keys.contains(&"key1".to_string()));
        assert!(keys.contains(&"key2".to_string()));
        assert!(keys.contains(&"key3".to_string()));

        for (key, value, ttl) in entries {
            match key.as_str() {
                "key1" => assert_eq!(value, "value1"),
                "key2" => assert_eq!(value, "value2"),
                "key3" => assert_eq!(value, "value3"),
                _ => panic!("Unexpected key: {}", key),
            }
            assert_eq!(ttl, Some(Duration::from_secs(60)));
        }
    }

    /// A healthy startup probe must leave the manager on the primary.
    #[tokio::test]
    async fn test_new_with_health_check_healthy_primary() {
        let mock = MockCache::new();
        let options = MemoryCacheOptions {
            ttl: 60,
            cleanup_interval: 60,
            max_capacity: 100,
        };

        let manager = FallbackCacheManager::new_with_health_check(Box::new(mock), options).await;

        assert!(!manager.is_using_fallback());
    }

    /// A failing startup probe must start the manager in fallback mode.
    #[tokio::test]
    async fn test_new_with_health_check_unhealthy_primary() {
        let mock = MockCache::new();
        let should_fail = mock.should_fail.clone();
        *should_fail.lock().await = true;

        let options = MemoryCacheOptions {
            ttl: 60,
            cleanup_interval: 60,
            max_capacity: 100,
        };

        let manager = FallbackCacheManager::new_with_health_check(Box::new(mock), options).await;

        assert!(manager.is_using_fallback());
    }

    /// `new` performs no probe, so even a broken primary leaves the manager
    /// optimistically on the primary until the first failing operation.
    #[tokio::test]
    async fn test_new_without_health_check() {
        let mock = MockCache::new();
        let should_fail = mock.should_fail.clone();
        *should_fail.lock().await = true;

        let options = MemoryCacheOptions {
            ttl: 60,
            cleanup_interval: 60,
            max_capacity: 100,
        };

        let manager = FallbackCacheManager::new(Box::new(mock), options);

        assert!(!manager.is_using_fallback());
    }
}