redis-sentinel-pool 0.1.0

An async Redis Sentinel-aware connection pool built on top of redis-rs and bb8, with transparent master failover.
Documentation
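//! Background watcher: subscribes to the sentinel `+switch-master` Pub/Sub channel.
//!
//! As soon as a master change is observed for the current service name, it calls
//! [`SentinelConnectionManager::bump_epoch`] so that stale connections in the pool
//! are invalidated immediately; the next connection checkout then triggers
//! [`SentinelClient::async_get_connection`] to resolve the master again.
//!
//! Note: the watcher is an "accelerator", not a "necessity". Even without it,
//! broken connections in the pool are recycled on their first failure, and new
//! connections automatically point at the new master.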

use std::sync::Arc;
use std::time::Duration;

use futures_util::StreamExt;
use redis::Msg;
use tokio::sync::Notify;
use tokio::task::JoinHandle;
use tracing::{debug, info, warn};

use crate::manager::SentinelConnectionManager;

/// Name of the sentinel Pub/Sub channel the watcher subscribes to in order to
/// learn about master changes.
const SWITCH_MASTER_CHANNEL: &str = "+switch-master";

/// Spawns a background task that listens for `+switch-master` events.
///
/// The task repeatedly runs a subscribe-and-listen session (`run_once`). When a
/// session fails, the error is logged and a new session is started after
/// `watcher_reconnect_backoff` from the manager's config. The task stops as soon
/// as `shutdown` is notified, whether that is observed here, inside the session,
/// or while waiting out the backoff.
///
/// The returned `JoinHandle` can usually be ignored; calling
/// [`crate::SentinelPool::shutdown`] terminates the task gracefully.
pub(crate) fn spawn_watcher(
    manager: SentinelConnectionManager,
    shutdown: Arc<Notify>,
) -> JoinHandle<()> {
    tokio::spawn(async move {
        let service_name = manager.config.service_name.clone();
        let backoff = manager.config.watcher_reconnect_backoff;

        info!(%service_name, "sentinel watcher started");
        loop {
            tokio::select! {
                _ = shutdown.notified() => {
                    info!("sentinel watcher received shutdown signal");
                    break;
                }
                res = run_once(&manager, &shutdown) => {
                    match res {
                        // `run_once` returns `Ok` only after it has itself observed the
                        // shutdown notification. That notification is already consumed,
                        // so waiting for it again below would miss it and the watcher
                        // would reconnect instead of stopping. Exit right here.
                        Ok(()) => {
                            info!("sentinel watcher received shutdown signal");
                            break;
                        }
                        Err(err) => {
                            warn!(?err, "watcher loop errored, will retry");
                        }
                    }
                    // Wait out the reconnect backoff, but stay responsive to shutdown.
                    if wait_or_shutdown(&shutdown, backoff).await {
                        info!("sentinel watcher exiting after error");
                        break;
                    }
                }
            }
        }
        info!("sentinel watcher stopped");
    })
}

/// Waits for `dur` to elapse or for `shutdown` to be notified, whichever comes first.
///
/// Returns `true` when the shutdown notification arrived during the wait and
/// `false` when the full duration elapsed. A zero duration returns `false`
/// immediately without checking `shutdown`.
async fn wait_or_shutdown(shutdown: &Arc<Notify>, dur: Duration) -> bool {
    if dur.is_zero() {
        // Nothing to wait for; the caller proceeds straight away.
        false
    } else {
        let pause = tokio::time::sleep(dur);
        tokio::select! {
            _ = shutdown.notified() => true,
            _ = pause => false,
        }
    }
}

async fn run_once(
    manager: &SentinelConnectionManager,
    shutdown: &Arc<Notify>,
) -> Result<(), redis::RedisError> {
    let target_service = manager.config.service_name.clone();

    // 拿一个到 sentinel 节点的 Client(SentinelClient 会选第一个能响应 ROLE=sentinel 的节点)
    let sentinel_client = {
        let mut client = manager.shared_client().lock_owned().await;
        client.async_get_sentinel_client().await?
    };

    let mut pubsub = sentinel_client.get_async_pubsub().await?;
    pubsub.subscribe(SWITCH_MASTER_CHANNEL).await?;
    debug!(channel = SWITCH_MASTER_CHANNEL, "subscribed");

    let mut stream = pubsub.on_message();
    loop {
        tokio::select! {
            _ = shutdown.notified() => {
                debug!("shutdown received inside pubsub loop");
                return Ok(());
            }
            msg = stream.next() => {
                match msg {
                    Some(m) => handle_message(m, &target_service, manager),
                    None => {
                        // 流结束(多半是底层连接被断开),返回让外层重连
                        return Err(redis::RedisError::from((
                            redis::ErrorKind::Io,
                            "sentinel pubsub stream closed",
                        )));
                    }
                }
            }
        }
    }
}

/// Processes one message received on the `+switch-master` channel.
///
/// Decodes the payload as a string and compares its first whitespace-separated
/// token with `target_service`. On a match the manager's epoch is bumped;
/// otherwise the message is only logged at debug level. A payload that cannot be
/// decoded is logged as a warning and dropped.
fn handle_message(msg: Msg, target_service: &str, manager: &SentinelConnectionManager) {
    // +switch-master message body: "<master-name> <old-ip> <old-port> <new-ip> <new-port>"
    let payload = match msg.get_payload::<String>() {
        Err(err) => {
            warn!(?err, "failed to decode +switch-master payload");
            return;
        }
        Ok(text) => text,
    };

    // Only the leading master name matters here; an empty payload yields "".
    let announced = payload.split_whitespace().next().unwrap_or_default();
    if announced != target_service {
        debug!(payload = %payload, "+switch-master for another service, ignored");
        return;
    }

    info!(payload = %payload, "+switch-master matched, bumping epoch");
    manager.bump_epoch();
}