use crate::error::{Result, TidewayError};
use crate::traits::cache::Cache;
use async_trait::async_trait;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
/// Cache backend that stores entries in a Redis server via the `redis` crate.
///
/// The type is `Clone`; because `health_status` is held in an `Arc`, every
/// clone observes and updates the same health flag.
#[derive(Clone)]
pub struct RedisCache {
/// Client handle from which a connection is requested for each operation.
client: redis::Client,
/// Expiry used by `set_bytes` when the caller passes `None` as the TTL.
default_ttl: Duration,
/// Outcome of the most recent `ping` call; initialised to `true` by `new`.
health_status: Arc<AtomicBool>,
}
impl RedisCache {
    /// Builds a cache that talks to the Redis server described by `url`.
    ///
    /// `default_ttl` is the expiry applied to entries written without an
    /// explicit TTL. The health flag starts out as `true`; it only changes
    /// when [`RedisCache::ping`] is called.
    ///
    /// # Errors
    ///
    /// Returns an internal [`TidewayError`] when `redis::Client::open`
    /// rejects `url`.
    pub fn new(url: &str, default_ttl: Duration) -> Result<Self> {
        let client = redis::Client::open(url)
            .map_err(|e| TidewayError::internal(format!("Failed to create Redis client: {}", e)))?;

        Ok(Self {
            client,
            default_ttl,
            health_status: Arc::new(AtomicBool::new(true)),
        })
    }

    /// Requests a multiplexed async connection from the client.
    ///
    /// Called once per cache operation; the connection is not stored on
    /// `self`.
    ///
    /// # Errors
    ///
    /// Returns an internal [`TidewayError`] when the connection cannot be
    /// obtained.
    async fn get_connection(&self) -> Result<redis::aio::MultiplexedConnection> {
        self.client
            .get_multiplexed_async_connection()
            .await
            .map_err(|e| TidewayError::internal(format!("Failed to get Redis connection: {}", e)))
    }

    /// Sends `PING` to the server, records the outcome in the shared health
    /// flag, and returns it.
    ///
    /// Returns `true` only when a connection was obtained and the command
    /// succeeded. Both failure paths are logged at `warn` level so that a
    /// failing `PING` is as visible as a failing connection attempt.
    pub async fn ping(&self) -> bool {
        let healthy = match self.get_connection().await {
            Ok(mut conn) => {
                let reply: redis::RedisResult<String> =
                    redis::cmd("PING").query_async(&mut conn).await;
                match reply {
                    Ok(_) => true,
                    Err(e) => {
                        tracing::warn!("Redis ping failed: {}", e);
                        false
                    }
                }
            }
            Err(e) => {
                tracing::warn!("Redis ping failed: {}", e);
                false
            }
        };

        // Single store point: whatever happened above becomes the new status.
        self.health_status.store(healthy, Ordering::Release);
        healthy
    }
}
#[async_trait]
impl Cache for RedisCache {
    /// Fetches the raw bytes stored under `key` with `GET`.
    ///
    /// Returns `Ok(None)` when the reply carries no value.
    ///
    /// # Errors
    ///
    /// Returns an internal [`TidewayError`] when no connection can be
    /// obtained or the command fails.
    async fn get_bytes(&self, key: &str) -> Result<Option<Vec<u8>>> {
        let mut conn = self.get_connection().await?;
        redis::cmd("GET")
            .arg(key)
            .query_async::<Option<Vec<u8>>>(&mut conn)
            .await
            .map_err(|e| TidewayError::internal(format!("Redis GET failed: {}", e)))
    }

    /// Stores `value` under `key` with an expiry.
    ///
    /// The expiry is `ttl` when given, otherwise the cache's `default_ttl`,
    /// so every entry written here expires. It is sent with millisecond
    /// precision (`SET key value PX <millis>`), which keeps sub-second TTLs
    /// from being truncated to zero seconds.
    ///
    /// # Errors
    ///
    /// Returns an internal [`TidewayError`] when no connection can be
    /// obtained or the command fails.
    async fn set_bytes(&self, key: &str, value: Vec<u8>, ttl: Option<Duration>) -> Result<()> {
        let mut conn = self.get_connection().await?;

        let effective_ttl = ttl.unwrap_or(self.default_ttl);
        // Saturating arithmetic instead of an `as` cast: an absurdly large
        // duration becomes u64::MAX (which the server can reject) rather than
        // silently wrapping to a short TTL.
        // Redis refuses a zero expiry, so a zero or sub-millisecond TTL is
        // rounded up to the smallest accepted value of 1 ms.
        let ttl_millis = effective_ttl
            .as_secs()
            .saturating_mul(1000)
            .saturating_add(u64::from(effective_ttl.subsec_millis()))
            .max(1);

        redis::cmd("SET")
            .arg(key)
            .arg(value)
            .arg("PX")
            .arg(ttl_millis)
            .query_async::<()>(&mut conn)
            .await
            .map_err(|e| TidewayError::internal(format!("Redis SET failed: {}", e)))
    }

    /// Removes `key` with `DEL`.
    ///
    /// The command's reply is discarded, so a missing key is not reported
    /// as an error by this method.
    ///
    /// # Errors
    ///
    /// Returns an internal [`TidewayError`] when no connection can be
    /// obtained or the command fails.
    async fn delete(&self, key: &str) -> Result<()> {
        let mut conn = self.get_connection().await?;
        redis::cmd("DEL")
            .arg(key)
            .query_async::<()>(&mut conn)
            .await
            .map_err(|e| TidewayError::internal(format!("Redis DEL failed: {}", e)))
    }

    /// Clears the cache by issuing `FLUSHDB`.
    ///
    /// NOTE(review): `FLUSHDB` is not scoped to keys written through this
    /// cache; it empties the whole selected database. Confirm the database
    /// is dedicated to this cache before calling.
    ///
    /// # Errors
    ///
    /// Returns an internal [`TidewayError`] when no connection can be
    /// obtained or the command fails.
    async fn clear(&self) -> Result<()> {
        let mut conn = self.get_connection().await?;
        redis::cmd("FLUSHDB")
            .query_async::<()>(&mut conn)
            .await
            .map_err(|e| TidewayError::internal(format!("Redis FLUSHDB failed: {}", e)))
    }

    /// Reports the health flag without contacting the server.
    ///
    /// The flag reflects the most recent `ping` call (or `true` if `ping`
    /// has never run); the operations in this impl do not update it.
    fn is_healthy(&self) -> bool {
        self.health_status.load(Ordering::Acquire)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::traits::cache::CacheExt;

    /// Set / get / delete round trip through the typed `CacheExt` helpers.
    ///
    /// Marked `#[ignore]`: it connects to `redis://127.0.0.1/`, so it only
    /// passes when a Redis server is reachable there. Run it explicitly with
    /// `cargo test -- --ignored`.
    #[tokio::test]
    #[ignore]
    async fn test_redis_cache() {
        let redis = RedisCache::new("redis://127.0.0.1/", Duration::from_secs(3600)).unwrap();

        // Write, then read the same key back.
        redis.set("test_key", &"test_value", None).await.unwrap();
        let stored: Option<String> = redis.get("test_key").await.unwrap();
        assert_eq!(stored, Some(String::from("test_value")));

        // After deletion the key must no longer resolve.
        redis.delete("test_key").await.unwrap();
        let after_delete: Option<String> = redis.get("test_key").await.unwrap();
        assert_eq!(after_delete, None);
    }
}