rs-zero 0.2.6

Rust-first microservice framework inspired by go-zero engineering practices
Documentation
use crate::{
    cache::{CacheError, CacheResult},
    cache_redis::{
        RedisCacheError, RedisCacheResult, RedisCacheStore, RedisDegradationEvent,
        RedisDegradedAction, RedisOperation, classify_redis_error, redis_breaker_rejection,
    },
    resil::{BreakerCallError, BreakerGuard},
};

pub(crate) fn is_breaker_acceptable_error(error: &RedisCacheError) -> bool {
    matches!(
        classify_redis_error(error),
        crate::cache_redis::RedisCommandOutcome::NoScript
            | crate::cache_redis::RedisCommandOutcome::Redirect(_)
    )
}

impl RedisCacheStore {
    pub(super) async fn protect_command<T, F>(
        &self,
        operation: &'static str,
        future: F,
    ) -> RedisCacheResult<T>
    where
        F: std::future::Future<Output = redis::RedisResult<T>>,
    {
        let Some(breaker) = &self.breaker else {
            return future.await.map_err(crate::cache_redis::redis_error);
        };

        match breaker
            .do_with_acceptable(
                || async { future.await.map_err(crate::cache_redis::redis_error) },
                is_breaker_acceptable_error,
            )
            .await
        {
            Ok(value) => {
                self.recorder
                    .record_breaker(crate::cache_redis::RedisCommandOutcome::Success);
                Ok(value)
            }
            Err(BreakerCallError::Rejected(error)) => {
                self.recorder
                    .record_breaker(crate::cache_redis::RedisCommandOutcome::BreakerRejected);
                Err(redis_breaker_rejection(operation, error))
            }
            Err(BreakerCallError::Inner(error)) => {
                self.recorder.record_breaker(classify_redis_error(&error));
                Err(error)
            }
        }
    }

    pub(super) async fn allow_breaker(
        &self,
        operation: &'static str,
    ) -> RedisCacheResult<Option<BreakerGuard>> {
        let Some(breaker) = &self.breaker else {
            return Ok(None);
        };

        match breaker.allow().await {
            Ok(guard) => Ok(Some(guard)),
            Err(error) => {
                self.recorder
                    .record_breaker(crate::cache_redis::RedisCommandOutcome::BreakerRejected);
                Err(redis_breaker_rejection(operation, error))
            }
        }
    }

    pub(super) async fn record_breaker_outcome<T>(
        &self,
        guard: Option<BreakerGuard>,
        result: &RedisCacheResult<T>,
    ) {
        let Some(guard) = guard else {
            return;
        };

        match result {
            Ok(_) => {
                guard.record_success().await;
                self.recorder
                    .record_breaker(crate::cache_redis::RedisCommandOutcome::Success);
            }
            Err(error) if is_breaker_acceptable_error(error) => {
                guard.record_success().await;
                self.recorder.record_breaker(classify_redis_error(error));
            }
            Err(error) => {
                guard.record_failure().await;
                self.recorder.record_breaker(classify_redis_error(error));
            }
        }
    }

    pub(super) async fn ensure_breaker_allows(
        &self,
        operation: RedisOperation,
    ) -> Result<(), CacheResult<()>> {
        let Some(breaker) = &self.breaker else {
            return Ok(());
        };

        match breaker.state().await {
            crate::resil::BreakerState::Open => {
                self.recorder
                    .record_breaker(crate::cache_redis::RedisCommandOutcome::BreakerRejected);
                let error = RedisCacheError::BreakerOpen(format!(
                    "{}: circuit breaker is open",
                    operation.as_str()
                ));
                Err(self.degrade_unit_error(operation, error))
            }
            crate::resil::BreakerState::Closed | crate::resil::BreakerState::HalfOpen => Ok(()),
        }
    }

    fn degrade_unit_error(
        &self,
        operation: RedisOperation,
        error: RedisCacheError,
    ) -> CacheResult<()> {
        let action = self.config.unavailable_policy.action_for(operation, &error);
        if action == RedisDegradedAction::ReturnError {
            return Err(CacheError::Backend(error.to_string()));
        }
        self.recorder
            .record_degradation(RedisDegradationEvent::new(operation, action));
        Ok(())
    }
}