// rs-zero 0.2.6
//
// Rust-first microservice framework inspired by go-zero engineering practices.
// Documentation
use redis::aio::MultiplexedConnection;

use crate::{
    cache_redis::{
        RedisCacheError, RedisCacheResult, RedisCommandOutcome, classify_redis_error,
        redis_breaker_rejection,
    },
    lock::LockResult,
    resil::BreakerGuard,
};

use super::RedisDistributedLock;

/// Redis client handle held by the lock implementation: either a single-node
/// client or a cluster client.
#[derive(Clone)]
pub(crate) enum RedisLockBackend {
    /// Wraps a `redis::Client`.
    Single(redis::Client),
    /// Wraps a `redis::cluster::ClusterClient`.
    Cluster(redis::cluster::ClusterClient),
}

impl std::fmt::Debug for RedisLockBackend {
    /// Writes only the variant name; the wrapped client is not formatted.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Single(_) => "RedisLockBackend::Single",
            Self::Cluster(_) => "RedisLockBackend::Cluster",
        };
        formatter.write_str(label)
    }
}

impl RedisDistributedLock {
    pub(crate) async fn single_connection(
        &self,
        client: &redis::Client,
    ) -> LockResult<MultiplexedConnection> {
        let guard = self.allow_breaker("LOCK_CONNECT").await?;
        let result = tokio::time::timeout(
            self.config.redis.connect_timeout,
            client.get_multiplexed_async_connection(),
        )
        .await
        .map_err(|_| RedisCacheError::Timeout("lock connect".to_string()))?
        .map_err(|error| RedisCacheError::Connection(error.to_string()));
        self.record_breaker_outcome(guard, &result).await;
        result.map_err(Into::into)
    }

    pub(crate) async fn cluster_connection(
        &self,
        client: &redis::cluster::ClusterClient,
    ) -> LockResult<redis::cluster_async::ClusterConnection> {
        let guard = self.allow_breaker("LOCK_CLUSTER_CONNECT").await?;
        let result = tokio::time::timeout(
            self.config.redis.connect_timeout,
            client.get_async_connection(),
        )
        .await
        .map_err(|_| RedisCacheError::Timeout("lock cluster connect".to_string()))?
        .map_err(|error| RedisCacheError::Connection(error.to_string()));
        self.record_breaker_outcome(guard, &result).await;
        result.map_err(Into::into)
    }

    async fn allow_breaker(
        &self,
        operation: &'static str,
    ) -> RedisCacheResult<Option<BreakerGuard>> {
        let Some(breaker) = &self.breaker else {
            return Ok(None);
        };

        match breaker.allow().await {
            Ok(guard) => Ok(Some(guard)),
            Err(error) => {
                self.recorder
                    .record_breaker(RedisCommandOutcome::BreakerRejected);
                Err(redis_breaker_rejection(operation, error))
            }
        }
    }

    async fn record_breaker_outcome<T>(
        &self,
        guard: Option<BreakerGuard>,
        result: &RedisCacheResult<T>,
    ) {
        let Some(guard) = guard else {
            return;
        };

        match result {
            Ok(_) => {
                guard.record_success().await;
                self.recorder.record_breaker(RedisCommandOutcome::Success);
            }
            Err(error) if crate::cache_redis::store_breaker::is_breaker_acceptable_error(error) => {
                guard.record_success().await;
                self.recorder.record_breaker(classify_redis_error(error));
            }
            Err(error) => {
                guard.record_failure().await;
                self.recorder.record_breaker(classify_redis_error(error));
            }
        }
    }
}