use crate::{
cache::{CacheError, CacheResult},
cache_redis::{
RedisCacheError, RedisCacheResult, RedisCacheStore, RedisDegradationEvent,
RedisDegradedAction, RedisOperation, classify_redis_error, redis_breaker_rejection,
},
resil::{BreakerCallError, BreakerGuard},
};
pub(crate) fn is_breaker_acceptable_error(error: &RedisCacheError) -> bool {
matches!(
classify_redis_error(error),
crate::cache_redis::RedisCommandOutcome::NoScript
| crate::cache_redis::RedisCommandOutcome::Redirect(_)
)
}
impl RedisCacheStore {
    /// Awaits a Redis command future, routing it through the circuit breaker when one is set.
    ///
    /// * `operation` - label used only when building the breaker-rejection error.
    /// * `future` - the pending Redis command; its `redis::RedisResult` error is converted
    ///   with `crate::cache_redis::redis_error`.
    ///
    /// Without a breaker the future is awaited directly and nothing is reported to the
    /// recorder. With a breaker, the call runs via `do_with_acceptable` using
    /// [`is_breaker_acceptable_error`] as the acceptability predicate, and every result is
    /// reported to `self.recorder` before being returned.
    ///
    /// # Errors
    ///
    /// Returns the converted command error, or the error produced by
    /// `redis_breaker_rejection` when the breaker refuses the call.
    pub(super) async fn protect_command<T, F>(
        &self,
        operation: &'static str,
        future: F,
    ) -> RedisCacheResult<T>
    where
        F: std::future::Future<Output = redis::RedisResult<T>>,
    {
        let circuit = match &self.breaker {
            Some(circuit) => circuit,
            // No breaker configured: run the command as-is, translating only the error type.
            None => return future.await.map_err(crate::cache_redis::redis_error),
        };

        let guarded = circuit
            .do_with_acceptable(
                || async { future.await.map_err(crate::cache_redis::redis_error) },
                is_breaker_acceptable_error,
            )
            .await;

        match guarded {
            // The command itself failed; report its classification and pass the error on.
            Err(BreakerCallError::Inner(failure)) => {
                self.recorder.record_breaker(classify_redis_error(&failure));
                Err(failure)
            }
            // The breaker refused to run the command at all.
            Err(BreakerCallError::Rejected(rejection)) => {
                self.recorder
                    .record_breaker(crate::cache_redis::RedisCommandOutcome::BreakerRejected);
                Err(redis_breaker_rejection(operation, rejection))
            }
            Ok(value) => {
                self.recorder
                    .record_breaker(crate::cache_redis::RedisCommandOutcome::Success);
                Ok(value)
            }
        }
    }

    /// Requests permission from the circuit breaker to start an operation.
    ///
    /// Returns `Ok(None)` when no breaker is configured and `Ok(Some(guard))` when the
    /// breaker's `allow` call succeeds.
    ///
    /// # Errors
    ///
    /// When `allow` fails, a `BreakerRejected` outcome is recorded and the error built by
    /// `redis_breaker_rejection` (labelled with `operation`) is returned.
    pub(super) async fn allow_breaker(
        &self,
        operation: &'static str,
    ) -> RedisCacheResult<Option<BreakerGuard>> {
        let Some(circuit) = &self.breaker else {
            return Ok(None);
        };

        circuit.allow().await.map(Some).map_err(|rejection| {
            self.recorder
                .record_breaker(crate::cache_redis::RedisCommandOutcome::BreakerRejected);
            redis_breaker_rejection(operation, rejection)
        })
    }

    /// Feeds the result of an operation back into a breaker guard and the recorder.
    ///
    /// Does nothing when `guard` is `None`. Otherwise:
    /// * `Ok(_)` records success on the guard and a `Success` outcome on the recorder;
    /// * an `Err` accepted by [`is_breaker_acceptable_error`] records success on the guard;
    /// * any other `Err` records failure on the guard.
    ///
    /// For both `Err` cases the recorder receives the error's `classify_redis_error` outcome.
    pub(super) async fn record_breaker_outcome<T>(
        &self,
        guard: Option<BreakerGuard>,
        result: &RedisCacheResult<T>,
    ) {
        let Some(permit) = guard else {
            return;
        };

        let failure = match result {
            Ok(_) => {
                permit.record_success().await;
                self.recorder
                    .record_breaker(crate::cache_redis::RedisCommandOutcome::Success);
                return;
            }
            Err(failure) => failure,
        };

        // Acceptable errors still count as a success from the breaker's point of view.
        if is_breaker_acceptable_error(failure) {
            permit.record_success().await;
        } else {
            permit.record_failure().await;
        }
        self.recorder.record_breaker(classify_redis_error(failure));
    }

    /// Inspects the breaker state and short-circuits when the breaker is open.
    ///
    /// Returns `Ok(())` when no breaker is configured or when its state is `Closed` or
    /// `HalfOpen`. When the state is `Open`, a `BreakerRejected` outcome is recorded and
    /// `Err` is returned carrying the `CacheResult<()>` chosen by the unavailable policy:
    /// either `Ok(())` for a degraded action or `Err(CacheError::Backend(..))`.
    pub(super) async fn ensure_breaker_allows(
        &self,
        operation: RedisOperation,
    ) -> Result<(), CacheResult<()>> {
        let Some(circuit) = &self.breaker else {
            return Ok(());
        };

        match circuit.state().await {
            crate::resil::BreakerState::Closed | crate::resil::BreakerState::HalfOpen => Ok(()),
            crate::resil::BreakerState::Open => {
                self.recorder
                    .record_breaker(crate::cache_redis::RedisCommandOutcome::BreakerRejected);
                let message = format!("{}: circuit breaker is open", operation.as_str());
                let open_error = RedisCacheError::BreakerOpen(message);
                Err(self.degrade_unit_error(operation, open_error))
            }
        }
    }

    /// Applies the configured unavailable policy to `error` for a unit-returning operation.
    ///
    /// If the policy's action is `RedisDegradedAction::ReturnError`, the error's display
    /// text is wrapped in `CacheError::Backend` and returned. For any other action a
    /// `RedisDegradationEvent` is recorded and `Ok(())` is returned.
    fn degrade_unit_error(
        &self,
        operation: RedisOperation,
        error: RedisCacheError,
    ) -> CacheResult<()> {
        let chosen = self.config.unavailable_policy.action_for(operation, &error);
        if chosen == RedisDegradedAction::ReturnError {
            Err(CacheError::Backend(error.to_string()))
        } else {
            self.recorder
                .record_degradation(RedisDegradationEvent::new(operation, chosen));
            Ok(())
        }
    }
}