redis-sentinel-pool 0.1.0

An async Redis Sentinel-aware connection pool built on top of redis-rs and bb8, with transparent master failover.
Documentation
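//! [`ManageConnection`](bb8::ManageConnection) implementation for bb8.
//!
//! Creates `MultiplexedConnection`s to the master through a shared [`SentinelClient`]
//! and health-checks them on checkout via epoch + PING (optionally ROLE). When the
//! background watcher receives a `+switch-master` event it calls
//! [`SentinelConnectionManager::bump_epoch`], which immediately invalidates every old
//! connection in the pool, so the next checkout re-resolves the new master via Sentinel.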

use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

use redis::aio::MultiplexedConnection;
use redis::sentinel::{SentinelClient, SentinelServerType};
use redis::{AsyncCommands, Value, cmd};
use tokio::sync::Mutex;
use tracing::{debug, warn};

use crate::config::{ServerRole, SentinelPoolConfig};
use crate::error::{Result, SentinelPoolError};

/// The connection actually held by the pool.
///
/// It is tagged with the epoch (the master "version number") it was born under. Once
/// the background watcher detects a master change and calls
/// [`bump_epoch`](SentinelConnectionManager::bump_epoch), every older connection's
/// epoch is lower than the current one, so `has_broken` / `is_valid` recognise it as
/// stale.
pub struct PooledConnection {
    /// The underlying multiplexed redis connection.
    pub(crate) inner: MultiplexedConnection,
    /// Manager epoch stamped on this connection when it was created.
    epoch: u64,
}

/// Shared access to the wrapped [`MultiplexedConnection`].
impl AsRef<MultiplexedConnection> for PooledConnection {
    fn as_ref(&self) -> &MultiplexedConnection {
        // Destructure instead of field access; the epoch tag is irrelevant here.
        let Self { inner, .. } = self;
        inner
    }
}

/// Exclusive access to the wrapped [`MultiplexedConnection`].
impl AsMut<MultiplexedConnection> for PooledConnection {
    fn as_mut(&mut self) -> &mut MultiplexedConnection {
        // Destructure instead of field access; the epoch tag is irrelevant here.
        let Self { inner, .. } = self;
        inner
    }
}

/// Lets a [`PooledConnection`] be used wherever a `&MultiplexedConnection` is expected.
impl std::ops::Deref for PooledConnection {
    type Target = MultiplexedConnection;

    fn deref(&self) -> &Self::Target {
        let Self { inner, .. } = self;
        inner
    }
}

/// Lets a [`PooledConnection`] be used wherever a `&mut MultiplexedConnection` is expected.
impl std::ops::DerefMut for PooledConnection {
    fn deref_mut(&mut self) -> &mut Self::Target {
        let Self { inner, .. } = self;
        inner
    }
}

/// The [`bb8::ManageConnection`] implementation for bb8.
///
/// Holds an `Arc<Mutex<SentinelClient>>` because `SentinelClient::get_async_connection`
/// takes `&mut self`. Establishing a new connection usually takes milliseconds, so
/// serialising it behind the mutex is not expected to become a hot-path bottleneck.
#[derive(Clone)]
pub struct SentinelConnectionManager {
    /// Shared Sentinel client; locked while a new connection is being established.
    client: Arc<Mutex<SentinelClient>>,
    /// The pool configuration this manager was built from.
    pub(crate) config: SentinelPoolConfig,
    /// Current epoch counter. Behind an `Arc`, so every clone of the manager observes
    /// the same value.
    pub(crate) epoch: Arc<AtomicU64>,
}

impl SentinelConnectionManager {
    /// Builds a new manager from a [`SentinelPoolConfig`].
    ///
    /// # Errors
    ///
    /// Returns [`SentinelPoolError::Config`] when `config.sentinels` or
    /// `config.service_name` is empty, and propagates any error produced by
    /// `SentinelClient::build`.
    pub fn new(config: SentinelPoolConfig) -> Result<Self> {
        // Validation order matters for which message is reported: the sentinel list is
        // checked first, then the service name.
        let rejection = if config.sentinels.is_empty() {
            Some("at least one sentinel address is required")
        } else if config.service_name.is_empty() {
            Some("service_name must not be empty")
        } else {
            None
        };
        if let Some(reason) = rejection {
            return Err(SentinelPoolError::Config(reason.into()));
        }

        // Translate the crate-level role into the one redis-rs understands.
        let server_type = match config.role {
            ServerRole::Master => SentinelServerType::Master,
            ServerRole::Replica => SentinelServerType::Replica,
        };
        let node_info = config.build_node_connection_info();

        let sentinel_client = SentinelClient::build(
            config.sentinels.clone(),
            config.service_name.clone(),
            Some(node_info),
            server_type,
        )?;

        let client = Arc::new(Mutex::new(sentinel_client));
        let epoch = Arc::new(AtomicU64::new(0));
        Ok(Self {
            client,
            config,
            epoch,
        })
    }

    /// Makes every connection currently in the pool be recognised as broken.
    ///
    /// The background watcher calls this when it receives a `+switch-master` event.
    /// Application code may also call it manually, e.g. on a custom operational signal.
    ///
    /// Returns the epoch value after the increment.
    pub fn bump_epoch(&self) -> u64 {
        let previous = self.epoch.fetch_add(1, Ordering::AcqRel);
        let new_epoch = previous + 1;
        debug!(new_epoch, "sentinel manager epoch bumped");
        new_epoch
    }

    /// Returns the current epoch.
    pub fn current_epoch(&self) -> u64 {
        let counter: &AtomicU64 = &self.epoch;
        counter.load(Ordering::Acquire)
    }

    /// Hands out another handle to the shared [`SentinelClient`]; mainly used by the
    /// watcher to obtain a connection to a sentinel itself.
    pub(crate) fn shared_client(&self) -> Arc<Mutex<SentinelClient>> {
        self.client.clone()
    }
}

// bb8 0.9 的 trait 用 `-> impl Future + Send`,直接写 `async fn` 没办法附加
// `+ Send` 约束(RTN 还未稳定),所以这里手动 desugar;clippy 的
// manual_async_fn 在此场景必须放行。
#[allow(clippy::manual_async_fn)]
impl bb8::ManageConnection for SentinelConnectionManager {
    type Connection = PooledConnection;
    type Error = SentinelPoolError;

    fn connect(
        &self,
    ) -> impl std::future::Future<Output = Result<Self::Connection>> + Send {
        async move {
            let conn = {
                let mut client = self.client.lock().await;
                client.get_async_connection().await?
            };
            let epoch = self.current_epoch();
            debug!(epoch, "created new sentinel-aware connection");
            Ok(PooledConnection { inner: conn, epoch })
        }
    }

    fn is_valid(
        &self,
        conn: &mut Self::Connection,
    ) -> impl std::future::Future<Output = Result<()>> + Send {
        async move {
            // 1. epoch 已落后,直接淘汰
            let current = self.current_epoch();
            if conn.epoch < current {
                return Err(SentinelPoolError::Pool(format!(
                    "connection epoch {} is older than current {} (master changed)",
                    conn.epoch, current
                )));
            }

            // 2. PING 心跳
            let pong: String = cmd("PING").query_async(&mut conn.inner).await?;
            if pong != "PONG" {
                return Err(SentinelPoolError::Pool(format!(
                    "unexpected PING reply: {pong}"
                )));
            }

            // 3. 角色校验(可选):识别"TCP 还活着但 master 已经被 demote 成 replica"
            if self.config.verify_role_on_checkout && self.config.role == ServerRole::Master {
                let role: Value = cmd("ROLE").query_async(&mut conn.inner).await?;
                if !is_master_role(&role) {
                    warn!("connection no longer points to a master, dropping");
                    self.bump_epoch();
                    return Err(SentinelPoolError::Pool(
                        "connection is no longer master".into(),
                    ));
                }
            }

            Ok(())
        }
    }

    fn has_broken(&self, conn: &mut Self::Connection) -> bool {
        conn.epoch < self.current_epoch()
    }
}

/// No-op helper bounded on [`AsyncCommands`]; it references the trait so the import is
/// not reported as unused (e.g. when building doc tests).
#[allow(dead_code)]
fn _assert_async_commands_impl<T>(_: &T)
where
    T: AsyncCommands,
{
}

/// Inspects the reply of the `ROLE` command and reports whether the node is a master.
///
/// RESP shape: `["master", <repl-offset>, [...replicas...]]`. Only the first array
/// element is examined; it may arrive as a bulk string or a simple string and is
/// compared ASCII case-insensitively. Anything else (non-array, empty array, other
/// element type) yields `false`.
fn is_master_role(value: &Value) -> bool {
    let Value::Array(items) = value else {
        return false;
    };
    match items.first() {
        Some(Value::BulkString(name)) => name.eq_ignore_ascii_case(b"master"),
        Some(Value::SimpleString(name)) => name.eq_ignore_ascii_case("master"),
        _ => false,
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a RESP bulk string from text.
    fn bulk(text: &str) -> Value {
        Value::BulkString(text.as_bytes().to_vec())
    }

    /// Wraps the elements in a RESP array, the shape of a `ROLE` reply.
    fn role_reply(items: Vec<Value>) -> Value {
        Value::Array(items)
    }

    #[test]
    fn detects_master_role() {
        // Master reply: role name, replication offset, (empty) replica list.
        let reply = role_reply(vec![bulk("master"), Value::Int(0), Value::Array(Vec::new())]);
        assert!(is_master_role(&reply));
    }

    #[test]
    fn rejects_replica_role() {
        // Replica reply starts with "slave" followed by the master's address.
        let reply = role_reply(vec![bulk("slave"), bulk("127.0.0.1"), Value::Int(6379)]);
        assert!(!is_master_role(&reply));
    }
}