use std::time::Duration;
use redis::AsyncCommands;
use crate::{
cache::{CacheError, CacheKey, CacheResult},
cache_redis::{
RedisCacheError, RedisCacheResult, RedisCacheStore, RedisDegradationEvent,
RedisDegradedAction, RedisOperation,
},
};
impl RedisCacheStore {
/// Reads the raw bytes stored under `key` through a standalone Redis client.
///
/// The key is rendered first, then a connection is obtained with
/// `single_connection` and a `GET` is issued through `run_command`.
///
/// # Errors
///
/// A failure to connect or to run the command is handed to
/// `degrade_get_error`, which either turns it into `Ok(None)` or returns a
/// `CacheError`, depending on the configured unavailable policy.
pub(super) async fn get_raw_single(
    &self,
    client: &redis::Client,
    key: &CacheKey,
) -> CacheResult<Option<Vec<u8>>> {
    let redis_key = key.render();
    let mut conn = match self.single_connection(client).await {
        Err(cause) => return self.degrade_get_error(cause),
        Ok(conn) => conn,
    };
    let outcome = self
        .run_command("GET", async { conn.get(redis_key).await })
        .await;
    // A successful reply is passed through untouched; only failures are degraded.
    outcome.or_else(|cause| self.degrade_get_error(cause))
}
/// Reads the raw bytes stored under `key` through a Redis cluster client.
///
/// The key is rendered first, then a connection is obtained with
/// `cluster_connection` and a `GET` is issued through `run_command`.
///
/// # Errors
///
/// A failure to connect or to run the command is handed to
/// `degrade_get_error`, which either turns it into `Ok(None)` or returns a
/// `CacheError`, depending on the configured unavailable policy.
pub(super) async fn get_raw_cluster(
    &self,
    client: &redis::cluster::ClusterClient,
    key: &CacheKey,
) -> CacheResult<Option<Vec<u8>>> {
    let redis_key = key.render();
    let mut conn = match self.cluster_connection(client).await {
        Err(cause) => return self.degrade_get_error(cause),
        Ok(conn) => conn,
    };
    let outcome = self
        .run_command("GET", async { conn.get(redis_key).await })
        .await;
    // A successful reply is passed through untouched; only failures are degraded.
    outcome.or_else(|cause| self.degrade_get_error(cause))
}
/// Stores `value` under `key` through a standalone Redis client.
///
/// The actual write, including the choice of expiry, is delegated to
/// `set_on_connection`.
///
/// # Errors
///
/// A failure to connect or to write is handed to `degrade_set_error`, which
/// either reports success (the write is skipped) or returns a `CacheError`,
/// depending on the configured unavailable policy.
pub(super) async fn set_raw_single(
    &self,
    client: &redis::Client,
    key: &CacheKey,
    value: Vec<u8>,
    ttl: Option<Duration>,
) -> CacheResult<()> {
    let redis_key = key.render();
    let mut conn = match self.single_connection(client).await {
        Err(cause) => return self.degrade_set_error(cause),
        Ok(conn) => conn,
    };
    self.set_on_connection(&mut conn, redis_key, value, ttl)
        .await
        .or_else(|cause| self.degrade_set_error(cause))
}
/// Stores `value` under `key` through a Redis cluster client.
///
/// The actual write, including the choice of expiry, is delegated to
/// `set_on_connection`.
///
/// # Errors
///
/// A failure to connect or to write is handed to `degrade_set_error`, which
/// either reports success (the write is skipped) or returns a `CacheError`,
/// depending on the configured unavailable policy.
pub(super) async fn set_raw_cluster(
    &self,
    client: &redis::cluster::ClusterClient,
    key: &CacheKey,
    value: Vec<u8>,
    ttl: Option<Duration>,
) -> CacheResult<()> {
    let redis_key = key.render();
    let mut conn = match self.cluster_connection(client).await {
        Err(cause) => return self.degrade_set_error(cause),
        Ok(conn) => conn,
    };
    self.set_on_connection(&mut conn, redis_key, value, ttl)
        .await
        .or_else(|cause| self.degrade_set_error(cause))
}
/// Removes `key` through a standalone Redis client.
///
/// When the `DEL` command itself fails, the rendered key is handed to
/// `enqueue_delete_retry` before the error is degraded. A failure to obtain
/// the connection is degraded without queueing a retry.
///
/// # Errors
///
/// Failures are passed to `degrade_delete_error`, which either reports
/// success or returns a `CacheError`, depending on the configured
/// unavailable policy.
pub(super) async fn delete_single(
    &self,
    client: &redis::Client,
    key: &CacheKey,
) -> CacheResult<()> {
    let redis_key = key.render();
    let mut conn = match self.single_connection(client).await {
        Err(cause) => return self.degrade_delete_error(cause),
        Ok(conn) => conn,
    };
    // The key is cloned for the command so the original stays available
    // for the retry queue if the command fails.
    let outcome = self
        .run_command("DEL", async {
            conn.del::<_, ()>(redis_key.clone()).await
        })
        .await;
    if let Err(cause) = outcome {
        self.enqueue_delete_retry(redis_key);
        return self.degrade_delete_error(cause);
    }
    Ok(())
}
/// Removes `key` through a Redis cluster client.
///
/// When the `DEL` command itself fails, the rendered key is handed to
/// `enqueue_delete_retry` before the error is degraded, matching
/// `delete_single`, so a failed invalidation is not silently dropped in
/// cluster mode. A failure to obtain the connection is degraded without
/// queueing a retry, also matching `delete_single`.
///
/// # Errors
///
/// Failures are passed to `degrade_delete_error`, which either reports
/// success or returns a `CacheError`, depending on the configured
/// unavailable policy.
pub(super) async fn delete_cluster(
    &self,
    client: &redis::cluster::ClusterClient,
    key: &CacheKey,
) -> CacheResult<()> {
    let rendered = key.render();
    let mut connection = match self.cluster_connection(client).await {
        Ok(connection) => connection,
        Err(error) => return self.degrade_delete_error(error),
    };
    // The key is cloned for the command so the original stays available
    // for the retry queue if the command fails.
    let outcome = self
        .run_command("DEL", async {
            connection.del::<_, ()>(rendered.clone()).await
        })
        .await;
    match outcome {
        Ok(()) => Ok(()),
        Err(error) => {
            // Queue the key for a later retry before applying the
            // degradation policy, exactly as the single-node path does.
            self.enqueue_delete_retry(rendered);
            self.degrade_delete_error(error)
        }
    }
}
/// Removes every key in `keys` with one `DEL` through a standalone Redis client.
///
/// An empty `keys` slice returns `Ok(())` without touching Redis. When the
/// `DEL` command itself fails, every rendered key is handed to
/// `enqueue_delete_retry` before the error is degraded. A failure to obtain
/// the connection is degraded without queueing retries.
///
/// # Errors
///
/// Failures are passed to `degrade_delete_error`, which either reports
/// success or returns a `CacheError`, depending on the configured
/// unavailable policy.
pub(super) async fn delete_many_single(
    &self,
    client: &redis::Client,
    keys: &[CacheKey],
) -> CacheResult<()> {
    if keys.is_empty() {
        return Ok(());
    }
    let redis_keys: Vec<_> = keys.iter().map(|key| key.render()).collect();
    let mut conn = match self.single_connection(client).await {
        Err(cause) => return self.degrade_delete_error(cause),
        Ok(conn) => conn,
    };
    // The list is cloned for the command so the originals stay available
    // for the retry queue if the command fails.
    let outcome = self
        .run_command("DEL", async {
            conn.del::<_, ()>(redis_keys.clone()).await
        })
        .await;
    if let Err(cause) = outcome {
        redis_keys
            .into_iter()
            .for_each(|redis_key| self.enqueue_delete_retry(redis_key));
        return self.degrade_delete_error(cause);
    }
    Ok(())
}
/// Removes every key in `keys` through a Redis cluster client, one key at a time.
///
/// Each key is deleted with its own `delete_cluster` call, and a failure on
/// one key does not stop the remaining keys from being attempted.
/// NOTE(review): presumably keys are deleted individually because a
/// multi-key `DEL` may span several hash slots in a cluster; confirm.
///
/// # Errors
///
/// Returns `CacheError::Backend` when at least one delete returned an
/// error; the message joins the individual error messages with `"; "`.
pub(super) async fn delete_many_cluster(
    &self,
    client: &redis::cluster::ClusterClient,
    keys: &[CacheKey],
) -> CacheResult<()> {
    let mut messages: Vec<String> = Vec::new();
    for key in keys {
        match self.delete_cluster(client, key).await {
            Ok(()) => {}
            Err(cause) => messages.push(cause.to_string()),
        }
    }
    if messages.is_empty() {
        return Ok(());
    }
    let joined = messages.join("; ");
    Err(CacheError::Backend(format!(
        "redis cluster delete_many failed: {}",
        joined
    )))
}
/// Writes `value` under `rendered` on an already-open connection using `SETEX`.
///
/// The expiry is `ttl` when provided and `self.config.default_ttl` otherwise,
/// so every write carries an expiry. The previous implementation also had a
/// plain `SET` arm for a missing TTL, but it could never run because the
/// fallback always produced `Some`; that dead arm has been removed.
///
/// The expiry is truncated to whole seconds and raised to at least one
/// second, so sub-second and zero durations become a one-second expiry.
///
/// # Errors
///
/// Returns whatever error `run_command` reports for the `SETEX` call.
async fn set_on_connection<C>(
    &self,
    connection: &mut C,
    rendered: String,
    value: Vec<u8>,
    ttl: Option<Duration>,
) -> RedisCacheResult<()>
where
    C: AsyncCommands + Send + Sync,
{
    let ttl = ttl.unwrap_or(self.config.default_ttl);
    // Never send 0 seconds: clamp the truncated value up to 1.
    let seconds = ttl.as_secs().max(1);
    self.run_command("SETEX", async {
        connection
            .set_ex::<_, _, ()>(rendered, value, seconds)
            .await
    })
    .await
}
fn degrade_error(
&self,
operation: RedisOperation,
error: RedisCacheError,
) -> Result<RedisDegradedAction, CacheError> {
let action = self.config.unavailable_policy.action_for(operation, &error);
if action == RedisDegradedAction::ReturnError {
return Err(to_cache_error(error));
}
self.recorder
.record_degradation(RedisDegradationEvent::new(operation, action));
Ok(action)
}
fn degrade_get_error<T>(&self, error: RedisCacheError) -> CacheResult<Option<T>> {
match self.degrade_error(RedisOperation::Get, error)? {
RedisDegradedAction::ReturnMiss => Ok(None),
RedisDegradedAction::SkipWrite | RedisDegradedAction::ReturnError => {
unreachable!("return_error handled by degrade_error")
}
}
}
fn degrade_set_error(&self, error: RedisCacheError) -> CacheResult<()> {
match self.degrade_error(RedisOperation::Set, error)? {
RedisDegradedAction::SkipWrite => Ok(()),
RedisDegradedAction::ReturnMiss | RedisDegradedAction::ReturnError => {
unreachable!("return_error handled by degrade_error")
}
}
}
fn degrade_delete_error(&self, error: RedisCacheError) -> CacheResult<()> {
match self.degrade_error(RedisOperation::Delete, error)? {
RedisDegradedAction::SkipWrite => Ok(()),
RedisDegradedAction::ReturnMiss | RedisDegradedAction::ReturnError => {
unreachable!("return_error handled by degrade_error")
}
}
}
/// Hands `key` to the delete-retry queue when one is configured.
///
/// Does nothing when `self.delete_retry` is `None`. The result of
/// `enqueue` is deliberately discarded, so queueing is best-effort.
fn enqueue_delete_retry(&self, key: String) {
    match &self.delete_retry {
        Some(retry_queue) => {
            let _ = retry_queue.enqueue(key);
        }
        None => {}
    }
}
}
/// Converts a Redis-layer error into `CacheError::Backend`, carrying the
/// error's display text as the message.
fn to_cache_error(error: RedisCacheError) -> CacheError {
    let message = error.to_string();
    CacheError::Backend(message)
}