use redis::aio::MultiplexedConnection;
use crate::{
cache_redis::{
RedisCacheError, RedisCacheResult, RedisCommandOutcome, classify_redis_error,
redis_breaker_rejection,
},
lock::LockResult,
resil::BreakerGuard,
};
use super::RedisDistributedLock;
/// Redis client handle, either a plain `redis::Client` or a
/// `redis::cluster::ClusterClient`.
///
/// NOTE(review): presumably the variants pair with
/// `RedisDistributedLock::single_connection` and
/// `RedisDistributedLock::cluster_connection` respectively; confirm at the
/// call site.
#[derive(Clone)]
pub(crate) enum RedisLockBackend {
/// Wraps a `redis::Client`.
Single(redis::Client),
/// Wraps a `redis::cluster::ClusterClient`.
Cluster(redis::cluster::ClusterClient),
}
impl std::fmt::Debug for RedisLockBackend {
    /// Writes only the variant name; the wrapped client is never formatted.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Pick the label first so there is a single write call below.
        let label = match self {
            Self::Single(_) => "RedisLockBackend::Single",
            Self::Cluster(_) => "RedisLockBackend::Cluster",
        };
        formatter.write_str(label)
    }
}
impl RedisDistributedLock {
pub(crate) async fn single_connection(
&self,
client: &redis::Client,
) -> LockResult<MultiplexedConnection> {
let guard = self.allow_breaker("LOCK_CONNECT").await?;
let result = tokio::time::timeout(
self.config.redis.connect_timeout,
client.get_multiplexed_async_connection(),
)
.await
.map_err(|_| RedisCacheError::Timeout("lock connect".to_string()))?
.map_err(|error| RedisCacheError::Connection(error.to_string()));
self.record_breaker_outcome(guard, &result).await;
result.map_err(Into::into)
}
pub(crate) async fn cluster_connection(
&self,
client: &redis::cluster::ClusterClient,
) -> LockResult<redis::cluster_async::ClusterConnection> {
let guard = self.allow_breaker("LOCK_CLUSTER_CONNECT").await?;
let result = tokio::time::timeout(
self.config.redis.connect_timeout,
client.get_async_connection(),
)
.await
.map_err(|_| RedisCacheError::Timeout("lock cluster connect".to_string()))?
.map_err(|error| RedisCacheError::Connection(error.to_string()));
self.record_breaker_outcome(guard, &result).await;
result.map_err(Into::into)
}
async fn allow_breaker(
&self,
operation: &'static str,
) -> RedisCacheResult<Option<BreakerGuard>> {
let Some(breaker) = &self.breaker else {
return Ok(None);
};
match breaker.allow().await {
Ok(guard) => Ok(Some(guard)),
Err(error) => {
self.recorder
.record_breaker(RedisCommandOutcome::BreakerRejected);
Err(redis_breaker_rejection(operation, error))
}
}
}
async fn record_breaker_outcome<T>(
&self,
guard: Option<BreakerGuard>,
result: &RedisCacheResult<T>,
) {
let Some(guard) = guard else {
return;
};
match result {
Ok(_) => {
guard.record_success().await;
self.recorder.record_breaker(RedisCommandOutcome::Success);
}
Err(error) if crate::cache_redis::store_breaker::is_breaker_acceptable_error(error) => {
guard.record_success().await;
self.recorder.record_breaker(classify_redis_error(error));
}
Err(error) => {
guard.record_failure().await;
self.recorder.record_breaker(classify_redis_error(error));
}
}
}
}