rs-zero 0.2.8

Rust-first microservice framework inspired by go-zero engineering practices
Documentation
use std::{
    future::Future,
    time::{Duration, Instant},
};

use crate::{
    cache_redis::{
        RedisCacheError, RedisCacheResult, RedisClusterRedirectKind, RedisCommandEvent,
        RedisCommandEventKind, RedisCommandOutcome, RedisDegradationEvent,
    },
    resil::BreakerError,
};

/// Records Redis command, pool, redirect and degradation metrics.
#[derive(Debug, Clone, Default)]
pub struct RedisCommandRecorder {
    #[cfg(feature = "observability")]
    metrics: Option<crate::observability::MetricsRegistry>,
    shard: String,
}

impl RedisCommandRecorder {
    /// Creates a recorder without attached metrics.
    pub fn new() -> Self {
        Self {
            #[cfg(feature = "observability")]
            metrics: None,
            shard: "default".to_string(),
        }
    }

    /// Sets the low-cardinality shard label.
    pub fn with_shard(mut self, shard: impl Into<String>) -> Self {
        self.shard = shard.into();
        self
    }

    /// Attaches metrics.
    #[cfg(feature = "observability")]
    pub fn with_metrics(mut self, metrics: crate::observability::MetricsRegistry) -> Self {
        self.metrics = Some(metrics);
        self
    }

    /// Records a command duration and outcome.
    pub fn record_command(
        &self,
        operation: &str,
        outcome: RedisCommandOutcome,
        duration: Duration,
    ) {
        #[cfg(feature = "observability")]
        crate::observability::record_redis_command(
            self.metrics.as_ref(),
            operation,
            &self.shard,
            outcome.as_str(),
            duration,
        );

        #[cfg(not(feature = "observability"))]
        let _ = (operation, outcome, duration);
    }

    /// Records a Redis low-cardinality event.
    pub fn record_event(&self, event: RedisCommandEvent) {
        #[cfg(feature = "observability")]
        crate::observability::record_redis_event(
            self.metrics.as_ref(),
            event.event.as_str(),
            &self.shard,
            event.outcome.as_str(),
        );

        #[cfg(not(feature = "observability"))]
        let _ = event;
    }

    /// Records a Redis Cluster redirect.
    pub fn record_redirect(&self, kind: RedisClusterRedirectKind) {
        self.record_event(RedisCommandEvent::redirect(kind));
    }

    /// Records a degradation event.
    pub fn record_degradation(&self, event: RedisDegradationEvent) {
        #[cfg(feature = "observability")]
        crate::observability::record_redis_degradation(
            self.metrics.as_ref(),
            event.operation.as_str(),
            event.action.as_str(),
            &self.shard,
        );
        #[cfg(not(feature = "observability"))]
        let _ = event;

        self.record_event(RedisCommandEvent::new(
            RedisCommandEventKind::Degradation,
            RedisCommandOutcome::Degraded,
        ));
    }

    /// Records a Redis breaker event.
    pub fn record_breaker(&self, outcome: RedisCommandOutcome) {
        self.record_event(RedisCommandEvent::new(
            RedisCommandEventKind::Breaker,
            outcome,
        ));
    }
}

/// Runs a Redis future with timeout and records the classified outcome.
pub async fn run_redis_command<T, F>(
    recorder: &RedisCommandRecorder,
    operation: &'static str,
    command_timeout: Duration,
    future: F,
) -> RedisCacheResult<T>
where
    F: Future<Output = redis::RedisResult<T>>,
{
    run_redis_cache_command(recorder, operation, command_timeout, async {
        future.await.map_err(redis_error)
    })
    .await
}

pub(crate) async fn run_redis_cache_command<T, F>(
    recorder: &RedisCommandRecorder,
    operation: &'static str,
    command_timeout: Duration,
    future: F,
) -> RedisCacheResult<T>
where
    F: Future<Output = RedisCacheResult<T>>,
{
    let started = Instant::now();
    let result = match tokio::time::timeout(command_timeout, future).await {
        Ok(result) => result,
        Err(_) => Err(RedisCacheError::Timeout(operation.to_string())),
    };
    let outcome = match &result {
        Ok(_) => RedisCommandOutcome::Success,
        Err(error) => crate::cache_redis::classify_redis_error(error),
    };
    recorder.record_command(operation, outcome, started.elapsed());
    if let RedisCommandOutcome::Redirect(kind) = outcome {
        recorder.record_redirect(kind);
    }
    result
}

pub(crate) fn redis_breaker_rejection(
    operation: &'static str,
    error: BreakerError,
) -> RedisCacheError {
    RedisCacheError::BreakerOpen(format!("{operation}: {error}"))
}

pub(crate) fn redis_error(error: redis::RedisError) -> RedisCacheError {
    RedisCacheError::Backend(error.to_string())
}