redis-sentinel-pool 0.1.0

An async Redis Sentinel-aware connection pool built on top of redis-rs and bb8, with transparent master failover.
Documentation
//! Sentinel 感知的高阶连接池。
//!
//! [`SentinelPool`] 是给业务方使用的入口类型,背后是一个 `bb8::Pool<SentinelConnectionManager>`,
//! 加上:
//! 1. 一个可选的后台 watcher(订阅 `+switch-master` 事件),实现"master 切换的瞬间作废所有连接";
//! 2. [`SentinelPool::execute`] —— 自动重试的执行包装;
//! 3. 透明的连接借用接口,让上层和裸 `MultiplexedConnection` 一样使用。

use std::sync::Arc;
use std::time::Duration;

use bb8::{Pool, PooledConnection as Bb8Pooled};
use redis::aio::MultiplexedConnection;
use tokio::sync::Notify;
use tokio::task::JoinHandle;
use tracing::{debug, warn};

use crate::config::SentinelPoolConfig;
use crate::error::{Result, SentinelPoolError};
use crate::manager::{PooledConnection, SentinelConnectionManager};
use crate::watcher::spawn_watcher;

/// Sentinel 感知的异步连接池。
///
/// 内部封装 `bb8::Pool`,并暴露常用的便捷方法。`SentinelPool` 是 `Clone` 的,
/// 克隆开销极低(只克隆几个 `Arc`),适合在不同 task 之间随手传递。
#[derive(Clone)]
pub struct SentinelPool {
    inner: Arc<Inner>,
}

struct Inner {
    pool: Pool<SentinelConnectionManager>,
    manager: SentinelConnectionManager,
    config: SentinelPoolConfig,
    watcher_handle: parking_lot::Mutex<Option<JoinHandle<()>>>,
    shutdown: Arc<Notify>,
}

impl SentinelPool {
    /// 基于配置构建连接池。
    ///
    /// 该方法是 `async` 的,因为 `bb8::Pool::build` 在 `min_idle > 0` 时
    /// 会立刻预热相应数量的连接。
    pub async fn new(config: SentinelPoolConfig) -> Result<Self> {
        let manager = SentinelConnectionManager::new(config.clone())?;

        let mut builder = Pool::builder()
            .max_size(config.max_size)
            .connection_timeout(config.connection_timeout);

        if let Some(min_idle) = config.min_idle {
            builder = builder.min_idle(Some(min_idle));
        }
        builder = builder.idle_timeout(config.idle_timeout);
        builder = builder.max_lifetime(config.max_lifetime);

        let pool = builder
            .build(manager.clone())
            .await
            .map_err(|e| SentinelPoolError::Pool(format!("failed to build pool: {e}")))?;

        let shutdown = Arc::new(Notify::new());
        let watcher_handle = if config.enable_watcher {
            Some(spawn_watcher(manager.clone(), Arc::clone(&shutdown)))
        } else {
            None
        };

        Ok(Self {
            inner: Arc::new(Inner {
                pool,
                manager,
                config,
                watcher_handle: parking_lot::Mutex::new(watcher_handle),
                shutdown,
            }),
        })
    }

    /// 当前配置的只读视图。
    pub fn config(&self) -> &SentinelPoolConfig {
        &self.inner.config
    }

    /// 暴露底层 `bb8::Pool`,用于细粒度操作(如 `state()` 监控)。
    pub fn inner_pool(&self) -> &Pool<SentinelConnectionManager> {
        &self.inner.pool
    }

    /// 当前的 master epoch;每次 failover 后会递增。
    pub fn epoch(&self) -> u64 {
        self.inner.manager.current_epoch()
    }

    /// 主动让池里所有连接立刻失效。
    ///
    /// 通常你不需要调用它 —— 后台 watcher 会自动处理。但例如在做演练、压测,
    /// 或者收到自己运维系统的信号时,可以手动调用以触发刷新。
    pub fn force_refresh(&self) -> u64 {
        self.inner.manager.bump_epoch()
    }

    /// 从池中借一条到 **当前 master** 的连接。
    ///
    /// 返回的 [`ConnectionLease`] 在 drop 时自动归还连接。如果归还前底层
    /// 连接已经损坏,会被自动丢弃,下次借用时由 manager 重新建立。
    pub async fn get(&self) -> Result<ConnectionLease<'_>> {
        let inner = self.inner.pool.get().await.map_err(map_bb8_run_error)?;
        Ok(ConnectionLease { inner })
    }

    /// 在一条新借来的连接上执行闭包,并按配置自动重试。
    ///
    /// 适合"业务侧根本不想关心 failover"的场景。重试逻辑:
    ///
    /// 1. 调用 [`SentinelPool::get`] 拿连接;
    /// 2. 执行闭包,成功就返回;
    /// 3. 失败且错误**看起来像连接级错误**(IO / 已断开 / READONLY 等)时,
    ///    主动 [`force_refresh`](Self::force_refresh) 后重试;
    /// 4. 重试次数耗尽返回 [`SentinelPoolError::RetryExhausted`]。
    pub async fn execute<F, Fut, T>(&self, mut op: F) -> Result<T>
    where
        F: FnMut(MultiplexedConnection) -> Fut,
        Fut: std::future::Future<Output = std::result::Result<T, redis::RedisError>>,
    {
        let max_retries = self.inner.config.max_retries.max(1);
        let backoff = self.inner.config.retry_backoff;
        let mut last_err: Option<redis::RedisError> = None;

        for attempt in 1..=max_retries {
            let mut lease = match self.get().await {
                Ok(l) => l,
                Err(SentinelPoolError::Redis(e)) => {
                    last_err = Some(e);
                    sleep_for(backoff * attempt).await;
                    continue;
                }
                Err(other) => return Err(other),
            };

            let conn_clone = lease.connection().clone();
            match op(conn_clone).await {
                Ok(value) => return Ok(value),
                Err(err) => {
                    if is_retryable(&err) {
                        warn!(?err, attempt, "redis op failed, will retry after refresh");
                        self.force_refresh();
                        drop(lease);
                        last_err = Some(err);
                        sleep_for(backoff * attempt).await;
                        continue;
                    } else {
                        return Err(err.into());
                    }
                }
            }
        }

        Err(SentinelPoolError::RetryExhausted {
            attempts: max_retries,
            source: last_err.unwrap_or_else(|| {
                redis::RedisError::from((redis::ErrorKind::Io, "unknown failure"))
            }),
        })
    }

    /// 优雅关闭:停止后台 watcher 并等待退出。
    ///
    /// 池本身在最后一个 `SentinelPool` 实例被 drop 时由 `bb8` 内部释放。
    pub async fn shutdown(self) {
        self.inner.shutdown.notify_waiters();
        let handle = self.inner.watcher_handle.lock().take();
        if let Some(h) = handle {
            if let Err(e) = h.await {
                debug!(?e, "watcher join error");
            }
        }
    }
}

/// 一次"借用"。drop 时连接会归还给池子。
pub struct ConnectionLease<'a> {
    inner: Bb8Pooled<'a, SentinelConnectionManager>,
}

impl<'a> ConnectionLease<'a> {
    /// 直接拿到 `&mut MultiplexedConnection`,可以像普通 redis 连接一样使用。
    pub fn connection(&mut self) -> &mut MultiplexedConnection {
        self.inner.as_mut()
    }

    /// 借用对应的池化包装(包含 epoch 信息)。
    pub fn pooled(&mut self) -> &mut PooledConnection {
        &mut self.inner
    }
}

impl<'a> std::ops::Deref for ConnectionLease<'a> {
    type Target = MultiplexedConnection;
    fn deref(&self) -> &Self::Target {
        self.inner.as_ref()
    }
}

impl<'a> std::ops::DerefMut for ConnectionLease<'a> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.inner.as_mut()
    }
}

fn map_bb8_run_error(err: bb8::RunError<SentinelPoolError>) -> SentinelPoolError {
    match err {
        bb8::RunError::User(e) => e,
        bb8::RunError::TimedOut => SentinelPoolError::Pool("checkout timed out".into()),
    }
}

/// 判断一个 RedisError 是否值得"换一条新连接重试"。
///
/// 直接复用 redis-rs 内部的 `retry_method()`,并把 IO 类问题一并视作可重试:
///
/// * `Reconnect` / `ReconnectFromInitialConnections` —— 连接级错误,正是 failover 时的典型表现;
/// * `WaitAndRetry` / `RetryImmediately` —— Redis 主动告诉客户端"短暂错误,重试即可"
///   (例如 master 选举期间);
/// * `is_io_error` / `is_connection_dropped` —— 底层 socket 类错误;
/// * `AskRedirect` / `MovedRedirect` —— Cluster 重定向,sentinel 模式下基本不会出现,
///   保守地不视为可重试(让上层感知到拓扑配置不对)。
fn is_retryable(err: &redis::RedisError) -> bool {
    if err.is_io_error() || err.is_connection_dropped() || err.is_connection_refusal() {
        return true;
    }
    matches!(
        err.retry_method(),
        redis::RetryMethod::Reconnect
            | redis::RetryMethod::ReconnectFromInitialConnections
            | redis::RetryMethod::RetryImmediately
            | redis::RetryMethod::WaitAndRetry
    )
}

async fn sleep_for(d: Duration) {
    if d.is_zero() {
        tokio::task::yield_now().await;
    } else {
        tokio::time::sleep(d).await;
    }
}