rs-zero 0.2.6

Rust-first microservice framework inspired by go-zero engineering practices
Documentation
use std::{collections::BTreeMap, fmt};

use crate::cache_redis::RedisCacheError;

/// Redis Cluster redirection kind.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum RedisClusterRedirectKind {
    /// Persistent slot owner changed.
    Moved,
    /// Temporary slot migration redirection.
    Ask,
}

impl RedisClusterRedirectKind {
    /// Returns a low-cardinality label value.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Moved => "moved",
            Self::Ask => "ask",
        }
    }
}

impl fmt::Display for RedisClusterRedirectKind {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.as_str())
    }
}

/// Parsed Redis Cluster redirection response.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RedisClusterRedirect {
    /// Redirection kind.
    pub kind: RedisClusterRedirectKind,
    /// Slot referenced by Redis.
    pub slot: u16,
    /// Target endpoint as returned by Redis, without credentials.
    pub endpoint: String,
}

impl RedisClusterRedirect {
    /// Parses `MOVED slot host:port` or `ASK slot host:port`.
    pub fn parse(message: &str) -> Option<Self> {
        let mut parts = message.split_whitespace();
        let kind = match parts.next()? {
            "MOVED" => RedisClusterRedirectKind::Moved,
            "ASK" => RedisClusterRedirectKind::Ask,
            _ => return None,
        };
        let slot = parts.next()?.parse::<u16>().ok()?;
        let endpoint = parts.next()?.to_string();
        if endpoint.is_empty() || parts.next().is_some() {
            return None;
        }
        Some(Self {
            kind,
            slot,
            endpoint,
        })
    }
}

/// Slot map update required after a redirection.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RedisClusterRouteAction {
    /// Refresh persistent slot ownership.
    RefreshSlot,
    /// Use one temporary ASK redirection.
    AskRedirect,
}

/// Minimal in-memory Redis Cluster slot map.
#[derive(Debug, Clone, Default)]
pub struct RedisClusterSlotMap {
    slots: BTreeMap<u16, String>,
}

impl RedisClusterSlotMap {
    /// Creates an empty slot map.
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns the endpoint currently assigned to a slot.
    pub fn endpoint_for_slot(&self, slot: u16) -> Option<&str> {
        self.slots.get(&slot).map(String::as_str)
    }

    /// Sets the endpoint for one slot.
    pub fn set_slot(&mut self, slot: u16, endpoint: impl Into<String>) {
        self.slots.insert(slot, endpoint.into());
    }

    /// Applies a redirect and returns the route action to perform.
    pub fn apply_redirect(&mut self, redirect: &RedisClusterRedirect) -> RedisClusterRouteAction {
        match redirect.kind {
            RedisClusterRedirectKind::Moved => {
                self.set_slot(redirect.slot, redirect.endpoint.clone());
                RedisClusterRouteAction::RefreshSlot
            }
            RedisClusterRedirectKind::Ask => RedisClusterRouteAction::AskRedirect,
        }
    }
}

/// Redis command result category for low-cardinality metrics and retries.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum RedisCommandOutcome {
    /// Command succeeded.
    Success,
    /// Command timed out.
    Timeout,
    /// Command returned a non-timeout error.
    Error,
    /// Command returned Redis `NOSCRIPT`.
    NoScript,
    /// Command returned a Redis Cluster redirect.
    Redirect(RedisClusterRedirectKind),
    /// Command was degraded by explicit policy.
    Degraded,
    /// Command was rejected by the local circuit breaker.
    BreakerRejected,
}

impl RedisCommandOutcome {
    /// Returns a low-cardinality label value.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Success => "success",
            Self::Timeout => "timeout",
            Self::Error => "error",
            Self::NoScript => "noscript",
            Self::Redirect(kind) => kind.as_str(),
            Self::Degraded => "degraded",
            Self::BreakerRejected => "breaker_rejected",
        }
    }

    /// Returns whether this outcome represents Redis `NOSCRIPT`.
    pub fn is_noscript(self) -> bool {
        matches!(self, Self::NoScript)
    }
}

/// Redis command event category for low-cardinality metrics.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum RedisCommandEventKind {
    /// Normal command execution.
    Command,
    /// Connection acquisition or pool state.
    Pool,
    /// Redis Cluster redirection.
    Redirect,
    /// Lua script cache behavior.
    Script,
    /// Explicit Redis degradation behavior.
    Degradation,
    /// Local Redis breaker allow, reject, success or failure event.
    Breaker,
}

impl RedisCommandEventKind {
    /// Returns a low-cardinality label value.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Command => "command",
            Self::Pool => "pool",
            Self::Redirect => "redirect",
            Self::Script => "script",
            Self::Degradation => "degradation",
            Self::Breaker => "breaker",
        }
    }
}

/// Low-cardinality Redis event.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RedisCommandEvent {
    /// Event kind.
    pub event: RedisCommandEventKind,
    /// Event outcome.
    pub outcome: RedisCommandOutcome,
}

impl RedisCommandEvent {
    /// Creates a Redis event.
    pub fn new(event: RedisCommandEventKind, outcome: RedisCommandOutcome) -> Self {
        Self { event, outcome }
    }

    /// Creates a redirect event.
    pub fn redirect(kind: RedisClusterRedirectKind) -> Self {
        Self::new(
            RedisCommandEventKind::Redirect,
            RedisCommandOutcome::Redirect(kind),
        )
    }
}

/// Classifies a Redis adapter error into a low-cardinality outcome.
pub fn classify_redis_error(error: &RedisCacheError) -> RedisCommandOutcome {
    match error {
        RedisCacheError::Timeout(_) => RedisCommandOutcome::Timeout,
        RedisCacheError::BreakerOpen(_) => RedisCommandOutcome::BreakerRejected,
        RedisCacheError::ClusterRedirect(redirect) => RedisCommandOutcome::Redirect(redirect.kind),
        RedisCacheError::Backend(message) if is_noscript_message(message) => {
            RedisCommandOutcome::NoScript
        }
        RedisCacheError::Backend(message) => RedisClusterRedirect::parse(message)
            .map(|redirect| RedisCommandOutcome::Redirect(redirect.kind))
            .unwrap_or(RedisCommandOutcome::Error),
        _ => RedisCommandOutcome::Error,
    }
}

pub(crate) fn is_noscript_message(message: &str) -> bool {
    let normalized = message.trim_start().to_ascii_uppercase();

    normalized.starts_with("NOSCRIPT")
        || normalized.contains(" NOSCRIPT")
        || normalized.contains("NOSCRIPTERROR")
}