Skip to main content

redis_sentinel_pool/
manager.rs

1//! bb8 用 [`ManageConnection`](bb8::ManageConnection) 实现。
2//!
3//! 通过共享的 [`SentinelClient`] 创建到 master 的 `MultiplexedConnection`,
4//! 并在借出时通过 epoch + PING(可选 ROLE)做健康检查。当后台 watcher
5//! 收到 `+switch-master` 事件后会调用 [`SentinelConnectionManager::bump_epoch`]
6//! 让池里所有旧连接立即作废,下一次借连接时就会重新向 Sentinel 解析新的 master。
7
8use std::sync::Arc;
9use std::sync::atomic::{AtomicU64, Ordering};
10
11use redis::aio::MultiplexedConnection;
12use redis::sentinel::{SentinelClient, SentinelServerType};
13use redis::{AsyncCommands, Value, cmd};
14use tokio::sync::Mutex;
15use tracing::{debug, warn};
16
17use crate::config::{ServerRole, SentinelPoolConfig};
18use crate::error::{Result, SentinelPoolError};
19
20/// 池中实际持有的连接。
21///
22/// 用 epoch 标记它"出生时"的 master 版本号;一旦后台 watcher 检测到 master
23/// 变更并 [`bump_epoch`](SentinelConnectionManager::bump_epoch),所有旧连接的
24/// epoch 都会小于当前 epoch,从而在 `has_broken` / `is_valid` 中被识别为失效。
25pub struct PooledConnection {
26    pub(crate) inner: MultiplexedConnection,
27    epoch: u64,
28}
29
30impl AsRef<MultiplexedConnection> for PooledConnection {
31    fn as_ref(&self) -> &MultiplexedConnection {
32        &self.inner
33    }
34}
35
36impl AsMut<MultiplexedConnection> for PooledConnection {
37    fn as_mut(&mut self) -> &mut MultiplexedConnection {
38        &mut self.inner
39    }
40}
41
42impl std::ops::Deref for PooledConnection {
43    type Target = MultiplexedConnection;
44    fn deref(&self) -> &Self::Target {
45        &self.inner
46    }
47}
48
49impl std::ops::DerefMut for PooledConnection {
50    fn deref_mut(&mut self) -> &mut Self::Target {
51        &mut self.inner
52    }
53}
54
55/// bb8 的 [`bb8::ManageConnection`] 实现。
56///
57/// 内部持有 `Arc<Mutex<SentinelClient>>`,因为 `SentinelClient::async_get_connection`
58/// 需要 `&mut self`。建立新连接的时间通常是毫秒级,所以串行化并不会成为热路径瓶颈。
59#[derive(Clone)]
60pub struct SentinelConnectionManager {
61    client: Arc<Mutex<SentinelClient>>,
62    pub(crate) config: SentinelPoolConfig,
63    pub(crate) epoch: Arc<AtomicU64>,
64}
65
66impl SentinelConnectionManager {
67    /// 基于 [`SentinelPoolConfig`] 构造一个新的 manager。
68    pub fn new(config: SentinelPoolConfig) -> Result<Self> {
69        if config.sentinels.is_empty() {
70            return Err(SentinelPoolError::Config(
71                "at least one sentinel address is required".into(),
72            ));
73        }
74        if config.service_name.is_empty() {
75            return Err(SentinelPoolError::Config(
76                "service_name must not be empty".into(),
77            ));
78        }
79
80        let server_type = match config.role {
81            ServerRole::Master => SentinelServerType::Master,
82            ServerRole::Replica => SentinelServerType::Replica,
83        };
84        let node_info = config.build_node_connection_info();
85
86        let client = SentinelClient::build(
87            config.sentinels.clone(),
88            config.service_name.clone(),
89            Some(node_info),
90            server_type,
91        )?;
92
93        Ok(Self {
94            client: Arc::new(Mutex::new(client)),
95            config,
96            epoch: Arc::new(AtomicU64::new(0)),
97        })
98    }
99
100    /// 让所有当前在池里的连接立刻被识别为"已坏"。
101    ///
102    /// 后台 watcher 收到 `+switch-master` 事件时会调用本方法。也可以
103    /// 在业务代码里手动调用,例如收到自定义运维信号时。
104    pub fn bump_epoch(&self) -> u64 {
105        let new_epoch = self.epoch.fetch_add(1, Ordering::AcqRel) + 1;
106        debug!(new_epoch, "sentinel manager epoch bumped");
107        new_epoch
108    }
109
110    /// 当前 epoch。
111    pub fn current_epoch(&self) -> u64 {
112        self.epoch.load(Ordering::Acquire)
113    }
114
115    /// 共享的 [`SentinelClient`],主要用于 watcher 拿一条到 sentinel 自身的连接。
116    pub(crate) fn shared_client(&self) -> Arc<Mutex<SentinelClient>> {
117        Arc::clone(&self.client)
118    }
119}
120
121// bb8 0.9 的 trait 用 `-> impl Future + Send`,直接写 `async fn` 没办法附加
122// `+ Send` 约束(RTN 还未稳定),所以这里手动 desugar;clippy 的
123// manual_async_fn 在此场景必须放行。
124#[allow(clippy::manual_async_fn)]
125impl bb8::ManageConnection for SentinelConnectionManager {
126    type Connection = PooledConnection;
127    type Error = SentinelPoolError;
128
129    fn connect(
130        &self,
131    ) -> impl std::future::Future<Output = Result<Self::Connection>> + Send {
132        async move {
133            let conn = {
134                let mut client = self.client.lock().await;
135                client.get_async_connection().await?
136            };
137            let epoch = self.current_epoch();
138            debug!(epoch, "created new sentinel-aware connection");
139            Ok(PooledConnection { inner: conn, epoch })
140        }
141    }
142
143    fn is_valid(
144        &self,
145        conn: &mut Self::Connection,
146    ) -> impl std::future::Future<Output = Result<()>> + Send {
147        async move {
148            // 1. epoch 已落后,直接淘汰
149            let current = self.current_epoch();
150            if conn.epoch < current {
151                return Err(SentinelPoolError::Pool(format!(
152                    "connection epoch {} is older than current {} (master changed)",
153                    conn.epoch, current
154                )));
155            }
156
157            // 2. PING 心跳
158            let pong: String = cmd("PING").query_async(&mut conn.inner).await?;
159            if pong != "PONG" {
160                return Err(SentinelPoolError::Pool(format!(
161                    "unexpected PING reply: {pong}"
162                )));
163            }
164
165            // 3. 角色校验(可选):识别"TCP 还活着但 master 已经被 demote 成 replica"
166            if self.config.verify_role_on_checkout && self.config.role == ServerRole::Master {
167                let role: Value = cmd("ROLE").query_async(&mut conn.inner).await?;
168                if !is_master_role(&role) {
169                    warn!("connection no longer points to a master, dropping");
170                    self.bump_epoch();
171                    return Err(SentinelPoolError::Pool(
172                        "connection is no longer master".into(),
173                    ));
174                }
175            }
176
177            Ok(())
178        }
179    }
180
181    fn has_broken(&self, conn: &mut Self::Connection) -> bool {
182        conn.epoch < self.current_epoch()
183    }
184}
185
186/// 让 [`AsyncCommands`] 在 [`PooledConnection`] 上可用,方便 doc 测试不报 unused import。
187#[allow(dead_code)]
188fn _assert_async_commands_impl<T: AsyncCommands>(_: &T) {}
189
190/// 解析 `ROLE` 命令的返回,判断当前节点是否为 master。
191///
192/// RESP 形式:`["master", <repl-offset>, [...replicas...]]`
193fn is_master_role(value: &Value) -> bool {
194    if let Value::Array(items) = value {
195        if let Some(first) = items.first() {
196            return matches!(
197                first,
198                Value::BulkString(s) if s.eq_ignore_ascii_case(b"master")
199            ) || matches!(
200                first,
201                Value::SimpleString(s) if s.eq_ignore_ascii_case("master")
202            );
203        }
204    }
205    false
206}
207
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn detects_master_role() {
        let value = Value::Array(vec![
            Value::BulkString(b"master".to_vec()),
            Value::Int(0),
            Value::Array(vec![]),
        ]);
        assert!(is_master_role(&value));
    }

    /// RESP3 / some proxies may send the role as a simple string, and the
    /// comparison is meant to be case-insensitive.
    #[test]
    fn detects_master_role_as_simple_string_case_insensitively() {
        let value = Value::Array(vec![
            Value::SimpleString("MASTER".to_owned()),
            Value::Int(0),
            Value::Array(vec![]),
        ]);
        assert!(is_master_role(&value));
    }

    #[test]
    fn rejects_replica_role() {
        let value = Value::Array(vec![
            Value::BulkString(b"slave".to_vec()),
            Value::BulkString(b"127.0.0.1".to_vec()),
            Value::Int(6379),
        ]);
        assert!(!is_master_role(&value));
    }

    #[test]
    fn rejects_sentinel_role() {
        let value = Value::Array(vec![
            Value::BulkString(b"sentinel".to_vec()),
            Value::Array(vec![]),
        ]);
        assert!(!is_master_role(&value));
    }

    #[test]
    fn rejects_empty_and_non_array_replies() {
        assert!(!is_master_role(&Value::Array(vec![])));
        assert!(!is_master_role(&Value::Nil));
        // A bare "master" string is not a valid ROLE reply shape.
        assert!(!is_master_role(&Value::BulkString(b"master".to_vec())));
    }
}