use crate::{AsyncCache, AsyncLocalCache, AsyncRedisConnectionManager, CacheStats, RedissonError, RedissonResult};
use async_trait::async_trait;
use redis::AsyncCommands;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::Arc;
use std::time::Duration;
pub struct AsyncRedisIntegratedCache<K, V> {
connection_manager: Arc<AsyncRedisConnectionManager>,
local_cache: Arc<AsyncLocalCache<K, V>>,
cache_key_prefix: String,
read_through: bool,
write_through: bool,
use_local_cache: bool,
}
impl<K: Eq + Hash + Clone + Serialize + DeserializeOwned + std::fmt::Debug + Send + Sync + 'static,
V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static> AsyncRedisIntegratedCache<K, V> {
pub fn new(
connection_manager: Arc<AsyncRedisConnectionManager>,
cache_name: &str,
ttl: Duration,
max_size: usize,
) -> Self {
let stats = Arc::new(tokio::sync::RwLock::new(CacheStats::new()));
let local_cache = AsyncLocalCache::new(
cache_name.to_string(),
ttl,
max_size,
stats,
);
Self {
connection_manager,
local_cache: Arc::new(local_cache),
cache_key_prefix: format!("cache:{}", cache_name),
read_through: true,
write_through: true,
use_local_cache: true,
}
}
#[inline]
pub fn with_read_through(mut self, enabled: bool) -> Self {
self.read_through = enabled;
self
}
#[inline]
pub fn with_write_through(mut self, enabled: bool) -> Self {
self.write_through = enabled;
self
}
#[inline]
pub fn with_local_cache(mut self, enabled: bool) -> Self {
self.use_local_cache = enabled;
self
}
fn build_redis_key(&self, key: &K) -> RedissonResult<String> {
let key_json = serde_json::to_string(key)
.map_err(|e| crate::errors::RedissonError::SerializationError(e.to_string()))?;
Ok(format!("{}:{}", self.cache_key_prefix, key_json))
}
#[inline]
pub fn get_local_cache(&self) -> &Arc<AsyncLocalCache<K, V>> {
&self.local_cache
}
pub async fn get_map(&self, keys: &[K]) -> RedissonResult<HashMap<K, V>> {
let mut result = HashMap::with_capacity(keys.len());
let mut missing_keys = Vec::new();
if self.use_local_cache {
let local_results = self.local_cache.get_map(keys).await;
for (key, value) in local_results {
result.insert(key, value);
}
for key in keys {
if !result.contains_key(key) {
missing_keys.push(key.clone());
}
}
} else {
missing_keys.extend_from_slice(keys);
}
if self.read_through && !missing_keys.is_empty() {
let mut conn = self.connection_manager.get_connection().await?;
let mut pipe = redis::pipe();
for key in &missing_keys {
let redis_key = self.build_redis_key(key)?;
pipe.get(&redis_key);
}
let redis_results: Vec<Option<String>> = pipe.query_async(&mut conn).await?;
for (i, redis_result) in redis_results.into_iter().enumerate() {
if let Some(json) = redis_result {
if let Ok(value) = serde_json::from_str::<V>(&json) {
let key = missing_keys[i].clone();
if self.use_local_cache {
let _ = self.local_cache.set(key.clone(), value.clone());
}
result.insert(key, value);
}
}
}
}
Ok(result)
}
pub async fn set_multi(&self, items: impl IntoIterator<Item = (K, V)>) -> RedissonResult<()> {
let items: Vec<(K, V)> = items.into_iter().collect();
if self.use_local_cache {
self.local_cache.set_multi(items.iter().cloned()).await;
}
if self.write_through && !items.is_empty() {
let mut conn = self.connection_manager.get_connection().await?;
let mut pipe = redis::pipe();
for (key, value) in items {
let redis_key = self.build_redis_key(&key)?;
let value_json = serde_json::to_string(&value)
.map_err(|e| crate::errors::RedissonError::SerializationError(e.to_string()))?;
let ttl_secs = self.local_cache.ttl.as_secs();
pipe.set_ex(&redis_key, value_json, ttl_secs);
}
pipe.query_async::<()>(&mut conn).await?;
}
Ok(())
}
pub async fn check_redis_connection(&self) -> RedissonResult<bool> {
match self.connection_manager.get_connection().await {
Ok(mut conn) => {
let result: Result<String, _> = redis::cmd("PING").query_async(&mut conn).await;
Ok(result.is_ok())
}
Err(_) => Ok(false),
}
}
}
#[async_trait]
impl<K, V> AsyncCache<K, V> for AsyncRedisIntegratedCache<K, V>
where
K: Eq + Hash + Clone + Serialize + DeserializeOwned + std::fmt::Debug + Send + Sync + 'static,
V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
{
async fn get(&self, key: &K) -> RedissonResult<Option<V>> {
if self.use_local_cache {
if let Some(value) = self.local_cache.get(key).await? {
return Ok(Some(value));
}
}
if !self.read_through {
return Ok(None);
}
let redis_key = self.build_redis_key(key)?;
let mut conn = self.connection_manager.get_connection().await?;
let value_json: Option<String> = conn.get(&redis_key).await?;
if let Some(json) = value_json {
let value: V = serde_json::from_str(&json)
.map_err(|e| crate::errors::RedissonError::DeserializationError(e.to_string()))?;
if self.use_local_cache {
self.local_cache.set(key.clone(), value.clone()).await?;
}
Ok(Some(value))
} else {
Ok(None)
}
}
async fn set(&self, key: K, value: V) -> RedissonResult<()> {
if self.use_local_cache {
self.local_cache.set(key.clone(), value.clone()).await?;
}
if !self.write_through {
return Ok(());
}
let redis_key = self.build_redis_key(&key)?;
let value_json = serde_json::to_string(&value)
.map_err(|e| RedissonError::SerializationError(e.to_string()))?;
let mut conn = self.connection_manager.get_connection().await?;
let ttl_secs = self.local_cache.ttl.as_secs() as i64;
redis::pipe()
.atomic()
.set(&redis_key, &value_json)
.expire(&redis_key, ttl_secs)
.query_async::<()>(&mut conn)
.await
.map_err(|e| RedissonError::RedisError(e))?;
Ok(())
}
async fn remove(&self, key: &K) -> RedissonResult<bool> {
if self.use_local_cache {
self.local_cache.remove(key).await?;
}
if !self.write_through {
return Ok(true);
}
let redis_key = self.build_redis_key(key)?;
let mut conn = self.connection_manager.get_connection().await?;
let deleted: i32 = conn.del(&redis_key).await?;
Ok(deleted > 0)
}
async fn clear(&self) -> RedissonResult<()> {
if self.use_local_cache {
self.local_cache.clear().await?;
}
if !self.write_through {
return Ok(());
}
let pattern = format!("{}:*", self.cache_key_prefix);
let mut conn = self.connection_manager.get_connection().await?;
let keys: Vec<String> = conn.keys(&pattern).await?;
if !keys.is_empty() {
conn.del::<_, ()>(keys).await?;
}
Ok(())
}
async fn refresh(&self, key: &K) -> RedissonResult<bool> {
let redis_key = self.build_redis_key(key)?;
let mut conn = self.connection_manager.get_connection().await?;
let value_json: Option<String> = conn.get(&redis_key).await?;
if let Some(json) = value_json {
let value: V = serde_json::from_str(&json)
.map_err(|e| RedissonError::DeserializationError(e.to_string()))?;
if self.use_local_cache {
self.local_cache.set(key.clone(), value).await?;
}
Ok(true)
} else {
if self.use_local_cache {
self.local_cache.remove(key).await?;
}
Ok(false)
}
}
}