// rs-zero 0.2.7
//
// Rust-first microservice framework inspired by go-zero engineering practices.
// Documentation
use std::time::Duration;

use crate::cache_redis::RedisBreakerConfig;

/// Redis cache adapter configuration.
///
/// [`RedisCacheConfig::validate`] checks `url` and `namespace`; no other field is
/// inspected by that method.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RedisCacheConfig {
    /// Redis URL using the `redis://` or `rediss://` scheme.
    ///
    /// When `cluster.enabled` is `true` this accepts a comma-separated list of startup
    /// node URLs; otherwise a value containing a comma fails validation.
    pub url: String,
    /// Namespace prefix. Must contain at least one non-whitespace character.
    pub namespace: String,
    /// Default TTL for positive values (300 seconds by default).
    pub default_ttl: Duration,
    /// TTL for negative cache placeholders (60 seconds by default).
    pub not_found_ttl: Duration,
    /// Timeout for establishing a Redis connection (3 seconds by default).
    pub connect_timeout: Duration,
    /// Timeout for individual Redis commands (2 seconds by default).
    pub command_timeout: Duration,
    /// Optional bounded retry behavior for failed cache deletes.
    pub delete_retry: RedisDeleteRetryConfig,
    /// Explicit behavior when Redis is unavailable.
    pub unavailable_policy: RedisUnavailablePolicy,
    /// Optional circuit breaker protecting Redis operations.
    pub breaker: RedisBreakerConfig,
    /// Redis Cluster routing settings. `cluster.enabled` also decides whether `url`
    /// is validated as a single URL or as a comma-separated list.
    pub cluster: RedisClusterConfig,
}

impl Default for RedisCacheConfig {
    /// Returns a single-node configuration pointing at `redis://127.0.0.1:6379`
    /// under the `rs-zero` namespace.
    ///
    /// Positive entries live for 300 seconds and negative placeholders for 60;
    /// connecting may take up to 3 seconds and each command up to 2. Every nested
    /// setting starts from its own default.
    fn default() -> Self {
        let url = String::from("redis://127.0.0.1:6379");
        let namespace = String::from("rs-zero");
        let (default_ttl, not_found_ttl) = (Duration::from_secs(300), Duration::from_secs(60));
        let (connect_timeout, command_timeout) = (Duration::from_secs(3), Duration::from_secs(2));

        Self {
            url,
            namespace,
            default_ttl,
            not_found_ttl,
            connect_timeout,
            command_timeout,
            delete_retry: RedisDeleteRetryConfig::default(),
            unavailable_policy: RedisUnavailablePolicy::default(),
            breaker: RedisBreakerConfig::default(),
            cluster: RedisClusterConfig::default(),
        }
    }
}

impl RedisCacheConfig {
    /// Checks the parts of the configuration that need no Redis connection.
    ///
    /// # Errors
    ///
    /// Returns the error produced by the URL check when `url` is rejected (a
    /// comma-separated list is only accepted when `cluster.enabled` is set), or
    /// `InvalidConfig` when `namespace` is empty or whitespace-only. The URL is
    /// checked first, so its error wins when both are invalid.
    pub fn validate(&self) -> Result<(), crate::cache_redis::RedisCacheError> {
        // Cluster mode takes a list of startup nodes; single-node mode takes one URL.
        let url_check = if self.cluster.enabled {
            validate_redis_urls(&self.url)
        } else {
            validate_redis_url(&self.url)
        };
        url_check?;

        let namespace_is_blank = self.namespace.trim().is_empty();
        if namespace_is_blank {
            Err(crate::cache_redis::RedisCacheError::InvalidConfig(
                String::from("redis cache namespace is required"),
            ))
        } else {
            Ok(())
        }
    }
}

/// Validates a single-node Redis URL.
///
/// Surrounding whitespace is ignored. The value must be non-empty, must not
/// contain a comma (node lists are only valid in cluster mode) and must pass
/// the scheme check. On rejection the error carries the trimmed value.
fn validate_redis_url(url: &str) -> Result<(), crate::cache_redis::RedisCacheError> {
    let candidate = url.trim();
    let is_single_url = !candidate.is_empty() && !candidate.contains(',');
    if is_single_url {
        validate_redis_url_scheme(candidate)
    } else {
        Err(crate::cache_redis::RedisCacheError::InvalidUrl {
            url: candidate.to_owned(),
        })
    }
}

fn validate_redis_urls(urls: &str) -> Result<(), crate::cache_redis::RedisCacheError> {
    let nodes = urls
        .split(',')
        .map(str::trim)
        .filter(|url| !url.is_empty())
        .collect::<Vec<_>>();
    if nodes.is_empty() {
        return Err(crate::cache_redis::RedisCacheError::InvalidUrl {
            url: urls.to_string(),
        });
    }
    for url in nodes {
        validate_redis_url_scheme(url)?;
    }
    Ok(())
}

/// Checks that `url` uses the `redis://` or `rediss://` scheme and names a target.
///
/// The scheme comparison is case-sensitive. A URL consisting of the scheme
/// alone (`redis://`, `rediss://`) is rejected because it identifies no server
/// and could never be connected to.
///
/// # Errors
///
/// Returns `InvalidUrl` carrying `url` unchanged when the check fails.
fn validate_redis_url_scheme(url: &str) -> Result<(), crate::cache_redis::RedisCacheError> {
    // `strip_prefix` both tests the scheme and yields what follows it.
    let target = url
        .strip_prefix("redis://")
        .or_else(|| url.strip_prefix("rediss://"));
    match target {
        Some(target) if !target.is_empty() => Ok(()),
        _ => Err(crate::cache_redis::RedisCacheError::InvalidUrl {
            url: url.to_string(),
        }),
    }
}

/// Redis cache operation category for degradation policy.
///
/// [`RedisUnavailablePolicy::action_for`] maps each variant to the action
/// configured for it. The derived `PartialOrd`/`Ord` follow declaration order
/// (`Get < Set < Delete`), so reordering variants changes comparisons.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum RedisOperation {
    /// Read one cache key.
    Get,
    /// Write one cache key.
    Set,
    /// Delete one or more cache keys.
    Delete,
}

impl RedisOperation {
    /// Returns a low-cardinality label value.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Get => "get",
            Self::Set => "set",
            Self::Delete => "delete",
        }
    }
}

/// Action to take when Redis is unavailable.
///
/// One action is configured per operation in [`RedisUnavailablePolicy`]. The
/// derived `PartialOrd`/`Ord` follow declaration order, so reordering variants
/// changes comparisons.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum RedisDegradedAction {
    /// Return the original Redis error.
    ReturnError,
    /// Treat a failed read as a cache miss.
    ReturnMiss,
    /// Skip a failed write or delete.
    SkipWrite,
}

impl RedisDegradedAction {
    /// Returns a low-cardinality label value.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::ReturnError => "return_error",
            Self::ReturnMiss => "return_miss",
            Self::SkipWrite => "skip_write",
        }
    }
}

/// Explicit Redis unavailable policy for cache operations.
///
/// Holds one [`RedisDegradedAction`] per [`RedisOperation`]. The fields are
/// public and any action may be assigned to any of them; the presets
/// `fail_closed` and `fail_open_for_cache` provide the intended combinations.
// NOTE(review): how a mismatched action (e.g. `SkipWrite` for `get`) is handled
// is decided by the code applying the policy, which is not visible here.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RedisUnavailablePolicy {
    /// Action for failed reads.
    pub get: RedisDegradedAction,
    /// Action for failed writes.
    pub set: RedisDegradedAction,
    /// Action for failed deletes.
    pub delete: RedisDegradedAction,
}

impl Default for RedisUnavailablePolicy {
    fn default() -> Self {
        Self::fail_closed()
    }
}

impl RedisUnavailablePolicy {
    /// Keeps existing behavior and returns Redis errors.
    pub fn fail_closed() -> Self {
        Self {
            get: RedisDegradedAction::ReturnError,
            set: RedisDegradedAction::ReturnError,
            delete: RedisDegradedAction::ReturnError,
        }
    }

    /// Degrades cache reads to misses and writes/deletes to no-ops.
    pub fn fail_open_for_cache() -> Self {
        Self {
            get: RedisDegradedAction::ReturnMiss,
            set: RedisDegradedAction::SkipWrite,
            delete: RedisDegradedAction::SkipWrite,
        }
    }

    /// Returns the action for an operation.
    pub fn action_for(
        &self,
        operation: RedisOperation,
        _error: &crate::cache_redis::RedisCacheError,
    ) -> RedisDegradedAction {
        match operation {
            RedisOperation::Get => self.get,
            RedisOperation::Set => self.set,
            RedisOperation::Delete => self.delete,
        }
    }
}

/// Low-cardinality Redis degradation event.
///
/// Carries only the operation category and the action taken; both expose a
/// fixed string label through their `as_str` methods.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RedisDegradationEvent {
    /// Redis operation being degraded.
    pub operation: RedisOperation,
    /// Degraded action.
    pub action: RedisDegradedAction,
}

impl RedisDegradationEvent {
    /// Creates a degradation event.
    pub fn new(operation: RedisOperation, action: RedisDegradedAction) -> Self {
        Self { operation, action }
    }
}

/// Redis Cluster routing configuration.
///
/// Disabled by default. None of these fields is validated in this file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RedisClusterConfig {
    /// Whether native Redis Cluster routing is enabled (default `false`).
    ///
    /// When set, `RedisCacheConfig::validate` treats the cache URL as a
    /// comma-separated list of startup nodes.
    pub enabled: bool,
    /// Maximum Redis Cluster routing retries handled by the Redis client (default `5`).
    pub max_redirects: u32,
    /// Whether read-only commands may be routed to replicas (default `false`).
    pub read_from_replicas: bool,
}

impl Default for RedisClusterConfig {
    /// Cluster routing starts disabled, with up to 5 routing retries and no
    /// reads from replicas.
    fn default() -> Self {
        let max_redirects = 5;
        Self {
            max_redirects,
            enabled: false,
            read_from_replicas: false,
        }
    }
}

/// Bounded retry settings for failed Redis cache delete commands.
///
/// Disabled by default. None of these fields is validated in this file.
// NOTE(review): whether `capacity == 0` or `max_attempts == 0` is tolerated when
// `enabled` is set depends on the retry worker, which is not visible here.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RedisDeleteRetryConfig {
    /// Whether failed deletes should be retried in a background worker (default `false`).
    pub enabled: bool,
    /// Maximum queued delete tasks (default `1024`).
    pub capacity: usize,
    /// Maximum attempts per key, including the first retry attempt (default `3`).
    pub max_attempts: u32,
    /// Initial delay before retrying; later attempts use a linear backoff
    /// (default 50 milliseconds).
    pub initial_delay: Duration,
}

impl Default for RedisDeleteRetryConfig {
    /// Delete retries start disabled, with room for 1024 queued tasks, at most
    /// 3 attempts per key and a 50 millisecond initial delay.
    fn default() -> Self {
        let initial_delay = Duration::from_millis(50);
        Self {
            initial_delay,
            max_attempts: 3,
            capacity: 1024,
            enabled: false,
        }
    }
}

/// One Redis node in an application-level sharded cache topology.
///
/// Nodes are validated as part of `RedisShardedCacheConfig::validate`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RedisNodeConfig {
    /// Stable node name used by the sharding algorithm and diagnostics.
    /// Must contain at least one non-whitespace character.
    pub name: String,
    /// Redis URL for this node. Validated as a single (non-cluster) Redis URL.
    pub url: String,
    /// Relative node weight. Higher values receive more keys.
    /// Must be greater than zero.
    pub weight: u32,
}

impl RedisNodeConfig {
    /// Builds a node from its name and URL, starting with a weight of `1`.
    pub fn new(name: impl Into<String>, url: impl Into<String>) -> Self {
        let (name, url) = (name.into(), url.into());
        Self {
            name,
            url,
            weight: 1,
        }
    }

    /// Returns the node with its relative weight replaced by `weight`.
    ///
    /// The value is stored as given; a weight of zero is only rejected later,
    /// when the sharded configuration is validated.
    pub fn with_weight(self, weight: u32) -> Self {
        Self { weight, ..self }
    }
}

/// Redis application-level sharding configuration.
///
/// Every shard shares the namespace, TTLs, timeouts, delete-retry and breaker
/// settings below; `node_cache_config` expands them into a per-node
/// [`RedisCacheConfig`]. There is no unavailable-policy or cluster field here:
/// per-node configurations receive the defaults for both.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RedisShardedCacheConfig {
    /// Shared namespace prefix. Must contain at least one non-whitespace character.
    pub namespace: String,
    /// Default entry time-to-live.
    pub default_ttl: Duration,
    /// TTL for negative cache placeholders.
    pub not_found_ttl: Duration,
    /// Timeout for establishing Redis connections.
    pub connect_timeout: Duration,
    /// Timeout for individual Redis commands.
    pub command_timeout: Duration,
    /// Redis nodes participating in application-level sharding.
    /// Empty by default; validation requires at least one node.
    pub nodes: Vec<RedisNodeConfig>,
    /// Optional bounded retry behavior for failed cache deletes on each shard.
    pub delete_retry: RedisDeleteRetryConfig,
    /// Optional breaker configuration inherited by every shard.
    pub breaker: RedisBreakerConfig,
}

impl Default for RedisShardedCacheConfig {
    fn default() -> Self {
        let base = RedisCacheConfig::default();
        Self {
            namespace: base.namespace,
            default_ttl: base.default_ttl,
            not_found_ttl: base.not_found_ttl,
            connect_timeout: base.connect_timeout,
            command_timeout: base.command_timeout,
            nodes: Vec::new(),
            delete_retry: RedisDeleteRetryConfig::default(),
            breaker: RedisBreakerConfig::default(),
        }
    }
}

impl RedisShardedCacheConfig {
    /// Validates local sharded Redis configuration.
    ///
    /// # Errors
    ///
    /// Returns `InvalidConfig` when the namespace is blank, when no shard is
    /// configured, when a shard has a blank name or a zero weight, or when two
    /// shards share the same name. Returns whatever the per-node
    /// [`RedisCacheConfig::validate`] reports when a shard URL is rejected.
    pub fn validate(&self) -> Result<(), crate::cache_redis::RedisCacheError> {
        if self.namespace.trim().is_empty() {
            return Err(crate::cache_redis::RedisCacheError::InvalidConfig(
                "redis cache namespace is required".to_string(),
            ));
        }
        if self.nodes.is_empty() {
            return Err(crate::cache_redis::RedisCacheError::InvalidConfig(
                "at least one redis shard is required".to_string(),
            ));
        }
        // The name is the node's stable identity for the sharding algorithm, so
        // two nodes must never share one. Names are compared exactly as given.
        let mut seen_names = std::collections::HashSet::with_capacity(self.nodes.len());
        for node in &self.nodes {
            self.validate_node(node)?;
            if !seen_names.insert(node.name.as_str()) {
                return Err(crate::cache_redis::RedisCacheError::InvalidConfig(format!(
                    "redis shard name `{}` is used by more than one node",
                    node.name
                )));
            }
        }
        Ok(())
    }

    /// Converts one shard to the single-node store configuration.
    ///
    /// The node contributes only its URL; namespace, TTLs, timeouts, delete-retry
    /// and breaker settings are copied from `self`. The unavailable policy and
    /// cluster settings are set to their defaults because the sharded
    /// configuration carries neither.
    pub fn node_cache_config(&self, node: &RedisNodeConfig) -> RedisCacheConfig {
        RedisCacheConfig {
            url: node.url.clone(),
            namespace: self.namespace.clone(),
            default_ttl: self.default_ttl,
            not_found_ttl: self.not_found_ttl,
            connect_timeout: self.connect_timeout,
            command_timeout: self.command_timeout,
            delete_retry: self.delete_retry,
            unavailable_policy: RedisUnavailablePolicy::default(),
            breaker: self.breaker.clone(),
            cluster: RedisClusterConfig::default(),
        }
    }

    /// Validates one shard: non-blank name, non-zero weight, and a per-node
    /// configuration that passes [`RedisCacheConfig::validate`].
    fn validate_node(
        &self,
        node: &RedisNodeConfig,
    ) -> Result<(), crate::cache_redis::RedisCacheError> {
        if node.name.trim().is_empty() {
            return Err(crate::cache_redis::RedisCacheError::InvalidConfig(
                "redis shard name is required".to_string(),
            ));
        }
        if node.weight == 0 {
            return Err(crate::cache_redis::RedisCacheError::InvalidConfig(format!(
                "redis shard `{}` weight must be greater than zero",
                node.name
            )));
        }
        // Reuse the single-node validation so shard URLs obey the same rules.
        self.node_cache_config(node).validate()
    }
}