use crate::memory_cache_manager::MemoryCacheManager;
use async_trait::async_trait;
use sockudo_core::cache::CacheManager;
use sockudo_core::error::Result;
use sockudo_core::options::MemoryCacheOptions;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::{Duration, Instant};
use tokio::sync::{Mutex, RwLock};
use tracing::{debug, info, warn};
const RECOVERY_CHECK_INTERVAL_SECS: u64 = 30;
pub struct FallbackCacheManager {
primary: Mutex<Box<dyn CacheManager + Send + Sync>>,
fallback: Mutex<MemoryCacheManager>,
using_fallback: AtomicBool,
last_failure_time: AtomicU64,
start_time: Instant,
recovery_lock: RwLock<()>,
}
impl FallbackCacheManager {
pub fn new(
primary: Box<dyn CacheManager + Send + Sync>,
fallback_options: MemoryCacheOptions,
) -> Self {
let fallback = MemoryCacheManager::new("fallback_cache".to_string(), fallback_options);
Self {
primary: Mutex::new(primary),
fallback: Mutex::new(fallback),
using_fallback: AtomicBool::new(false),
last_failure_time: AtomicU64::new(0),
start_time: Instant::now(),
recovery_lock: RwLock::new(()),
}
}
pub async fn new_with_health_check(
primary: Box<dyn CacheManager + Send + Sync>,
fallback_options: MemoryCacheOptions,
) -> Self {
let fallback = MemoryCacheManager::new("fallback_cache".to_string(), fallback_options);
let start_time = Instant::now();
let using_fallback = match primary.check_health().await {
Ok(()) => {
debug!("Primary cache is healthy at startup");
false
}
Err(e) => {
warn!(
"Primary cache unavailable at startup, starting in fallback mode. Error: {}",
e
);
true
}
};
let last_failure_time = if using_fallback {
start_time.elapsed().as_secs()
} else {
0
};
Self {
primary: Mutex::new(primary),
fallback: Mutex::new(fallback),
using_fallback: AtomicBool::new(using_fallback),
last_failure_time: AtomicU64::new(last_failure_time),
start_time,
recovery_lock: RwLock::new(()),
}
}
fn is_using_fallback(&self) -> bool {
self.using_fallback.load(Ordering::SeqCst)
}
fn switch_to_fallback(&self, error: &str) {
if !self.using_fallback.swap(true, Ordering::SeqCst) {
warn!(
"Redis cache unavailable, switching to in-memory fallback. Error: {}",
error
);
self.last_failure_time
.store(self.start_time.elapsed().as_secs(), Ordering::SeqCst);
}
}
fn should_attempt_recovery(&self) -> bool {
if !self.is_using_fallback() {
return false;
}
let last_failure = self.last_failure_time.load(Ordering::SeqCst);
let current_time = self.start_time.elapsed().as_secs();
current_time.saturating_sub(last_failure) >= RECOVERY_CHECK_INTERVAL_SECS
}
async fn try_recover(&self) -> bool {
if !self.should_attempt_recovery() {
return false;
}
let _recovery_guard = self.recovery_lock.write().await;
if !self.is_using_fallback() {
return false;
}
debug!("Attempting to recover Redis cache connection...");
let mut primary = self.primary.lock().await;
match primary.check_health().await {
Ok(()) => {
info!("Redis cache connection recovered, synchronizing fallback data...");
if let Err(e) = self.sync_fallback_to_primary(&mut primary).await {
warn!(
"Failed to sync fallback data to primary during recovery: {}",
e
);
}
self.using_fallback.store(false, Ordering::SeqCst);
info!("Successfully switched back to primary cache after recovery");
true
}
Err(e) => {
debug!("Redis cache still unavailable: {}", e);
self.last_failure_time
.store(self.start_time.elapsed().as_secs(), Ordering::SeqCst);
false
}
}
}
async fn sync_fallback_to_primary(
&self,
primary: &mut Box<dyn CacheManager + Send + Sync>,
) -> Result<()> {
let fallback = self.fallback.lock().await;
let entries = fallback.get_all_entries().await;
if entries.is_empty() {
debug!("No entries in fallback cache to sync");
return Ok(());
}
debug!(
"Syncing {} entries from fallback to primary cache",
entries.len()
);
let mut synced = 0;
let mut failed = 0;
for (key, value, ttl) in entries {
let ttl_seconds = ttl.map(|d| d.as_secs()).unwrap_or(0);
match primary.set(&key, &value, ttl_seconds).await {
Ok(()) => synced += 1,
Err(e) => {
warn!("Failed to sync key '{}' to primary cache: {}", key, e);
failed += 1;
}
}
}
if failed > 0 {
warn!(
"Synced {}/{} entries from fallback to primary ({} failed)",
synced,
synced + failed,
failed
);
} else {
info!(
"Successfully synced {} entries from fallback to primary cache",
synced
);
}
Ok(())
}
}
#[async_trait]
impl CacheManager for FallbackCacheManager {
async fn has(&self, key: &str) -> Result<bool> {
self.try_recover().await;
let _guard = self.recovery_lock.read().await;
if self.is_using_fallback() {
return self.fallback.lock().await.has(key).await;
}
let primary = self.primary.lock().await;
match primary.has(key).await {
Ok(result) => Ok(result),
Err(e) => {
self.switch_to_fallback(&e.to_string());
self.fallback.lock().await.has(key).await
}
}
}
async fn get(&self, key: &str) -> Result<Option<String>> {
self.try_recover().await;
let _guard = self.recovery_lock.read().await;
if self.is_using_fallback() {
return self.fallback.lock().await.get(key).await;
}
let primary = self.primary.lock().await;
match primary.get(key).await {
Ok(result) => Ok(result),
Err(e) => {
self.switch_to_fallback(&e.to_string());
self.fallback.lock().await.get(key).await
}
}
}
async fn set(&self, key: &str, value: &str, ttl_seconds: u64) -> Result<()> {
self.try_recover().await;
let _guard = self.recovery_lock.read().await;
if self.is_using_fallback() {
return self
.fallback
.lock()
.await
.set(key, value, ttl_seconds)
.await;
}
let primary = self.primary.lock().await;
match primary.set(key, value, ttl_seconds).await {
Ok(()) => Ok(()),
Err(e) => {
self.switch_to_fallback(&e.to_string());
self.fallback
.lock()
.await
.set(key, value, ttl_seconds)
.await
}
}
}
async fn remove(&self, key: &str) -> Result<()> {
self.try_recover().await;
let _guard = self.recovery_lock.read().await;
if self.is_using_fallback() {
return self.fallback.lock().await.remove(key).await;
}
let primary = self.primary.lock().await;
match primary.remove(key).await {
Ok(()) => Ok(()),
Err(e) => {
self.switch_to_fallback(&e.to_string());
self.fallback.lock().await.remove(key).await
}
}
}
async fn disconnect(&self) -> Result<()> {
let primary_result = self.primary.lock().await.disconnect().await;
if let Err(ref e) = primary_result {
warn!(error = ?e, "Failed to disconnect primary cache");
}
let fallback_result = self.fallback.lock().await.disconnect().await;
if let Err(ref e) = fallback_result {
warn!(error = ?e, "Failed to disconnect fallback cache");
}
match (primary_result, fallback_result) {
(Ok(_), Ok(_)) => Ok(()),
(Err(e), _) => Err(e),
(_, Err(e)) => Err(e),
}
}
async fn check_health(&self) -> Result<()> {
if self.is_using_fallback() {
let fallback = self.fallback.lock().await;
return fallback.check_health().await;
}
let primary = self.primary.lock().await;
primary.check_health().await
}
async fn ttl(&self, key: &str) -> Result<Option<Duration>> {
self.try_recover().await;
let _guard = self.recovery_lock.read().await;
if self.is_using_fallback() {
return self.fallback.lock().await.ttl(key).await;
}
let primary = self.primary.lock().await;
match primary.ttl(key).await {
Ok(result) => Ok(result),
Err(e) => {
self.switch_to_fallback(&e.to_string());
self.fallback.lock().await.ttl(key).await
}
}
}
async fn set_if_not_exists(&self, key: &str, value: &str, ttl_seconds: u64) -> Result<bool> {
self.try_recover().await;
let _guard = self.recovery_lock.read().await;
if self.is_using_fallback() {
return self
.fallback
.lock()
.await
.set_if_not_exists(key, value, ttl_seconds)
.await;
}
let primary = self.primary.lock().await;
match primary.set_if_not_exists(key, value, ttl_seconds).await {
Ok(result) => Ok(result),
Err(e) => {
self.switch_to_fallback(&e.to_string());
self.fallback
.lock()
.await
.set_if_not_exists(key, value, ttl_seconds)
.await
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::memory_cache_manager::MemoryCacheManager;
use sockudo_core::error::Error;
use std::sync::Arc;
use tokio::sync::Mutex as TokioMutex;
struct MockCache {
should_fail: Arc<TokioMutex<bool>>,
data: Arc<TokioMutex<std::collections::HashMap<String, String>>>,
}
impl MockCache {
fn new() -> Self {
Self {
should_fail: Arc::new(TokioMutex::new(false)),
data: Arc::new(TokioMutex::new(std::collections::HashMap::new())),
}
}
#[allow(dead_code)]
async fn set_should_fail(&self, should_fail: bool) {
*self.should_fail.lock().await = should_fail;
}
}
#[async_trait]
impl CacheManager for MockCache {
async fn has(&self, key: &str) -> Result<bool> {
if *self.should_fail.lock().await {
return Err(Error::Cache("Mock failure".to_string()));
}
Ok(self.data.lock().await.contains_key(key))
}
async fn get(&self, key: &str) -> Result<Option<String>> {
if *self.should_fail.lock().await {
return Err(Error::Cache("Mock failure".to_string()));
}
Ok(self.data.lock().await.get(key).cloned())
}
async fn set(&self, key: &str, value: &str, _ttl_seconds: u64) -> Result<()> {
if *self.should_fail.lock().await {
return Err(Error::Cache("Mock failure".to_string()));
}
self.data
.lock()
.await
.insert(key.to_string(), value.to_string());
Ok(())
}
async fn remove(&self, key: &str) -> Result<()> {
if *self.should_fail.lock().await {
return Err(Error::Cache("Mock failure".to_string()));
}
self.data.lock().await.remove(key);
Ok(())
}
async fn disconnect(&self) -> Result<()> {
Ok(())
}
async fn check_health(&self) -> Result<()> {
if *self.should_fail.lock().await {
return Err(Error::Cache("Mock failure".to_string()));
}
Ok(())
}
async fn ttl(&self, _key: &str) -> Result<Option<Duration>> {
if *self.should_fail.lock().await {
return Err(Error::Cache("Mock failure".to_string()));
}
Ok(Some(Duration::from_secs(60)))
}
}
#[tokio::test]
async fn test_fallback_on_primary_failure() {
let mock = MockCache::new();
let should_fail = mock.should_fail.clone();
let options = MemoryCacheOptions {
ttl: 60,
cleanup_interval: 60,
max_capacity: 100,
};
let manager = FallbackCacheManager::new(Box::new(mock), options);
assert!(!manager.is_using_fallback());
*should_fail.lock().await = true;
let result = manager.set("test_key", "test_value", 60).await;
assert!(result.is_ok());
assert!(manager.is_using_fallback());
let value = manager.get("test_key").await.unwrap();
assert_eq!(value, Some("test_value".to_string()));
}
#[tokio::test]
async fn test_recovery_with_data_sync() {
let mock = MockCache::new();
let should_fail = mock.should_fail.clone();
let options = MemoryCacheOptions {
ttl: 60,
cleanup_interval: 60,
max_capacity: 100,
};
let manager = FallbackCacheManager::new(Box::new(mock), options);
*should_fail.lock().await = true;
manager.set("key1", "value1", 60).await.unwrap();
manager.set("key2", "value2", 60).await.unwrap();
assert!(manager.is_using_fallback());
let value1_fallback = manager.get("key1").await.unwrap();
assert_eq!(value1_fallback, Some("value1".to_string()));
*should_fail.lock().await = false;
manager.using_fallback.store(true, Ordering::SeqCst);
manager.last_failure_time.store(0, Ordering::SeqCst);
assert!(manager.is_using_fallback());
let value2_fallback = manager.get("key2").await.unwrap();
assert_eq!(value2_fallback, Some("value2".to_string()));
}
#[tokio::test]
async fn test_no_race_condition_during_operations() {
let mock = MockCache::new();
let should_fail = mock.should_fail.clone();
let options = MemoryCacheOptions {
ttl: 60,
cleanup_interval: 60,
max_capacity: 100,
};
let manager = Arc::new(TokioMutex::new(FallbackCacheManager::new(
Box::new(mock),
options,
)));
*should_fail.lock().await = true;
manager.lock().await.set("test", "value", 60).await.unwrap();
assert!(manager.lock().await.is_using_fallback());
let handles: Vec<_> = (0..10)
.map(|i| {
let manager_clone = manager.clone();
tokio::spawn(async move {
let mgr = manager_clone.lock().await;
mgr.set(&format!("key{}", i), &format!("value{}", i), 60)
.await
})
})
.collect();
for handle in handles {
assert!(handle.await.unwrap().is_ok());
}
assert!(manager.lock().await.is_using_fallback());
for i in 0..10 {
let mgr = manager.lock().await;
let value = mgr.get(&format!("key{}", i)).await.unwrap();
assert_eq!(value, Some(format!("value{}", i)));
}
}
#[tokio::test]
async fn test_memory_cache_get_all_entries() {
let options = MemoryCacheOptions {
ttl: 60,
cleanup_interval: 60,
max_capacity: 100,
};
let cache = MemoryCacheManager::new("test_prefix".to_string(), options);
cache.set("key1", "value1", 60).await.unwrap();
cache.set("key2", "value2", 60).await.unwrap();
cache.set("key3", "value3", 60).await.unwrap();
let entries = cache.get_all_entries().await;
assert_eq!(entries.len(), 3);
let keys: Vec<String> = entries.iter().map(|(k, _, _)| k.clone()).collect();
assert!(keys.contains(&"key1".to_string()));
assert!(keys.contains(&"key2".to_string()));
assert!(keys.contains(&"key3".to_string()));
for (key, value, ttl) in entries {
match key.as_str() {
"key1" => assert_eq!(value, "value1"),
"key2" => assert_eq!(value, "value2"),
"key3" => assert_eq!(value, "value3"),
_ => panic!("Unexpected key: {}", key),
}
assert_eq!(ttl, Some(Duration::from_secs(60)));
}
}
#[tokio::test]
async fn test_new_with_health_check_healthy_primary() {
let mock = MockCache::new();
let options = MemoryCacheOptions {
ttl: 60,
cleanup_interval: 60,
max_capacity: 100,
};
let manager = FallbackCacheManager::new_with_health_check(Box::new(mock), options).await;
assert!(!manager.is_using_fallback());
}
#[tokio::test]
async fn test_new_with_health_check_unhealthy_primary() {
let mock = MockCache::new();
let should_fail = mock.should_fail.clone();
*should_fail.lock().await = true;
let options = MemoryCacheOptions {
ttl: 60,
cleanup_interval: 60,
max_capacity: 100,
};
let manager = FallbackCacheManager::new_with_health_check(Box::new(mock), options).await;
assert!(manager.is_using_fallback());
}
#[tokio::test]
async fn test_new_without_health_check() {
let mock = MockCache::new();
let should_fail = mock.should_fail.clone();
*should_fail.lock().await = true;
let options = MemoryCacheOptions {
ttl: 60,
cleanup_interval: 60,
max_capacity: 100,
};
let manager = FallbackCacheManager::new(Box::new(mock), options);
assert!(!manager.is_using_fallback());
}
}