redis-sentinel-pool 0.1.0

An async Redis Sentinel-aware connection pool built on top of redis-rs and bb8, with transparent master failover.
Documentation
//! 连接池配置。
//!
//! 通过 [`SentinelPoolConfig`] 构造一个 [`crate::SentinelPool`]。所有字段
//! 都提供了合理的默认值,可按需链式覆盖。

use std::time::Duration;

use redis::{ProtocolVersion, RedisConnectionInfo, TlsMode, sentinel::SentinelNodeConnectionInfo};

/// 节点角色:master 或 replica。
///
/// 当前连接池主要服务于 master,写入这个类型主要是为后续扩展(只读副本池)留口子。
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ServerRole {
    /// 主节点(可读写)。
    #[default]
    Master,
    /// 副本节点(只读)。
    Replica,
}

/// Sentinel 连接池配置。
///
/// # 示例
///
/// ```no_run
/// use redis_sentinel_pool::SentinelPoolConfig;
///
/// let cfg = SentinelPoolConfig::new(
///     vec![
///         "redis://127.0.0.1:26379".to_string(),
///         "redis://127.0.0.1:26380".to_string(),
///         "redis://127.0.0.1:26381".to_string(),
///     ],
///     "mymaster",
/// )
/// .max_size(32)
/// .redis_password("secret");
/// ```
#[derive(Clone)]
pub struct SentinelPoolConfig {
    pub(crate) sentinels: Vec<String>,
    pub(crate) service_name: String,
    pub(crate) role: ServerRole,

    pub(crate) max_size: u32,
    pub(crate) min_idle: Option<u32>,
    pub(crate) connection_timeout: Duration,
    pub(crate) idle_timeout: Option<Duration>,
    pub(crate) max_lifetime: Option<Duration>,

    pub(crate) verify_role_on_checkout: bool,
    pub(crate) max_retries: u32,
    pub(crate) retry_backoff: Duration,
    pub(crate) enable_watcher: bool,
    pub(crate) watcher_reconnect_backoff: Duration,

    pub(crate) redis_db: Option<i64>,
    pub(crate) redis_username: Option<String>,
    pub(crate) redis_password: Option<String>,
    pub(crate) redis_protocol: Option<ProtocolVersion>,
    pub(crate) redis_tls_mode: Option<TlsMode>,
}

impl SentinelPoolConfig {
    /// 创建一个新的配置。
    ///
    /// * `sentinels` —— Sentinel 节点列表,形如 `redis://host:port`。
    /// * `service_name` —— Sentinel 中配置的 master 名称,例如 `mymaster`。
    pub fn new<I, S>(sentinels: I, service_name: impl Into<String>) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        Self {
            sentinels: sentinels.into_iter().map(Into::into).collect(),
            service_name: service_name.into(),
            role: ServerRole::Master,
            max_size: 16,
            min_idle: None,
            connection_timeout: Duration::from_secs(5),
            idle_timeout: Some(Duration::from_secs(600)),
            max_lifetime: Some(Duration::from_secs(1800)),
            verify_role_on_checkout: true,
            max_retries: 3,
            retry_backoff: Duration::from_millis(100),
            enable_watcher: true,
            watcher_reconnect_backoff: Duration::from_secs(2),
            redis_db: None,
            redis_username: None,
            redis_password: None,
            redis_protocol: None,
            redis_tls_mode: None,
        }
    }

    /// 选择拉取的节点角色,默认 [`ServerRole::Master`]。
    pub fn role(mut self, role: ServerRole) -> Self {
        self.role = role;
        self
    }

    /// 连接池最大连接数,默认 `16`。
    pub fn max_size(mut self, max_size: u32) -> Self {
        self.max_size = max_size;
        self
    }

    /// 最小空闲连接数(提前预热),默认不设置。
    pub fn min_idle(mut self, min_idle: u32) -> Self {
        self.min_idle = Some(min_idle);
        self
    }

    /// 借连接的超时时间,默认 5 秒。
    pub fn connection_timeout(mut self, timeout: Duration) -> Self {
        self.connection_timeout = timeout;
        self
    }

    /// 空闲连接最长存活时间,默认 600 秒。
    pub fn idle_timeout(mut self, timeout: Option<Duration>) -> Self {
        self.idle_timeout = timeout;
        self
    }

    /// 连接最大生命周期,默认 1800 秒。
    pub fn max_lifetime(mut self, lifetime: Option<Duration>) -> Self {
        self.max_lifetime = lifetime;
        self
    }

    /// 借连接时是否额外发送 `ROLE` 校验当前节点仍是 master,默认 `true`。
    ///
    /// 关闭后只会执行 `PING`,可以省一次 RTT,但 failover 检测延迟会变高。
    pub fn verify_role_on_checkout(mut self, verify: bool) -> Self {
        self.verify_role_on_checkout = verify;
        self
    }

    /// `execute_with_retry` 系列方法的最大重试次数,默认 `3`。
    pub fn max_retries(mut self, retries: u32) -> Self {
        self.max_retries = retries;
        self
    }

    /// 重试之间的退避,每次重试乘以重试次数,默认 100ms(即 100ms、200ms、300ms ...)。
    pub fn retry_backoff(mut self, backoff: Duration) -> Self {
        self.retry_backoff = backoff;
        self
    }

    /// 是否启用后台订阅 `+switch-master` 事件,默认开启。
    pub fn enable_watcher(mut self, enable: bool) -> Self {
        self.enable_watcher = enable;
        self
    }

    /// watcher 断开后重连的退避,默认 2 秒。
    pub fn watcher_reconnect_backoff(mut self, backoff: Duration) -> Self {
        self.watcher_reconnect_backoff = backoff;
        self
    }

    /// 选择 Redis 数据库索引(注意:Sentinel 自身的连接不能使用非 0 库)。
    pub fn redis_db(mut self, db: i64) -> Self {
        self.redis_db = Some(db);
        self
    }

    /// Redis ACL 用户名。
    pub fn redis_username(mut self, username: impl Into<String>) -> Self {
        self.redis_username = Some(username.into());
        self
    }

    /// Redis 密码。
    pub fn redis_password(mut self, password: impl Into<String>) -> Self {
        self.redis_password = Some(password.into());
        self
    }

    /// Redis 协议版本(默认 RESP2)。
    pub fn redis_protocol(mut self, protocol: ProtocolVersion) -> Self {
        self.redis_protocol = Some(protocol);
        self
    }

    /// Redis 节点的 TLS 模式。
    pub fn redis_tls_mode(mut self, tls_mode: TlsMode) -> Self {
        self.redis_tls_mode = Some(tls_mode);
        self
    }

    /// 内部使用:构造一个用于 SentinelClient 的 [`SentinelNodeConnectionInfo`]。
    pub(crate) fn build_node_connection_info(&self) -> SentinelNodeConnectionInfo {
        let mut info = SentinelNodeConnectionInfo::default();

        if let Some(tls) = self.redis_tls_mode {
            info = info.set_tls_mode(tls);
        }

        let mut rci = RedisConnectionInfo::default();
        let mut touched = false;

        if let Some(db) = self.redis_db {
            rci = rci.set_db(db);
            touched = true;
        }
        if let Some(u) = &self.redis_username {
            rci = rci.set_username(u);
            touched = true;
        }
        if let Some(p) = &self.redis_password {
            rci = rci.set_password(p);
            touched = true;
        }
        if let Some(proto) = self.redis_protocol {
            rci = rci.set_protocol(proto);
            touched = true;
        }

        if touched {
            info = info.set_redis_connection_info(rci);
        }
        info
    }
}

impl std::fmt::Debug for SentinelPoolConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SentinelPoolConfig")
            .field("sentinels", &self.sentinels)
            .field("service_name", &self.service_name)
            .field("role", &self.role)
            .field("max_size", &self.max_size)
            .field("min_idle", &self.min_idle)
            .field("connection_timeout", &self.connection_timeout)
            .field("idle_timeout", &self.idle_timeout)
            .field("max_lifetime", &self.max_lifetime)
            .field("verify_role_on_checkout", &self.verify_role_on_checkout)
            .field("max_retries", &self.max_retries)
            .field("retry_backoff", &self.retry_backoff)
            .field("enable_watcher", &self.enable_watcher)
            .field("watcher_reconnect_backoff", &self.watcher_reconnect_backoff)
            .field("redis_db", &self.redis_db)
            .field("redis_username", &self.redis_username.as_ref().map(|_| "***"))
            .field("redis_password", &self.redis_password.as_ref().map(|_| "***"))
            .field("redis_protocol", &self.redis_protocol)
            .field(
                "redis_tls_mode",
                &self.redis_tls_mode.map(|m| match m {
                    TlsMode::Secure => "Secure",
                    TlsMode::Insecure => "Insecure",
                    _ => "Other",
                }),
            )
            .finish()
    }
}