redis_sentinel_pool/
manager.rs1use std::sync::Arc;
9use std::sync::atomic::{AtomicU64, Ordering};
10
11use redis::aio::MultiplexedConnection;
12use redis::sentinel::{SentinelClient, SentinelServerType};
13use redis::{AsyncCommands, Value, cmd};
14use tokio::sync::Mutex;
15use tracing::{debug, warn};
16
17use crate::config::{ServerRole, SentinelPoolConfig};
18use crate::error::{Result, SentinelPoolError};
19
20pub struct PooledConnection {
26 pub(crate) inner: MultiplexedConnection,
27 epoch: u64,
28}
29
30impl AsRef<MultiplexedConnection> for PooledConnection {
31 fn as_ref(&self) -> &MultiplexedConnection {
32 &self.inner
33 }
34}
35
36impl AsMut<MultiplexedConnection> for PooledConnection {
37 fn as_mut(&mut self) -> &mut MultiplexedConnection {
38 &mut self.inner
39 }
40}
41
42impl std::ops::Deref for PooledConnection {
43 type Target = MultiplexedConnection;
44 fn deref(&self) -> &Self::Target {
45 &self.inner
46 }
47}
48
49impl std::ops::DerefMut for PooledConnection {
50 fn deref_mut(&mut self) -> &mut Self::Target {
51 &mut self.inner
52 }
53}
54
55#[derive(Clone)]
60pub struct SentinelConnectionManager {
61 client: Arc<Mutex<SentinelClient>>,
62 pub(crate) config: SentinelPoolConfig,
63 pub(crate) epoch: Arc<AtomicU64>,
64}
65
66impl SentinelConnectionManager {
67 pub fn new(config: SentinelPoolConfig) -> Result<Self> {
69 if config.sentinels.is_empty() {
70 return Err(SentinelPoolError::Config(
71 "at least one sentinel address is required".into(),
72 ));
73 }
74 if config.service_name.is_empty() {
75 return Err(SentinelPoolError::Config(
76 "service_name must not be empty".into(),
77 ));
78 }
79
80 let server_type = match config.role {
81 ServerRole::Master => SentinelServerType::Master,
82 ServerRole::Replica => SentinelServerType::Replica,
83 };
84 let node_info = config.build_node_connection_info();
85
86 let client = SentinelClient::build(
87 config.sentinels.clone(),
88 config.service_name.clone(),
89 Some(node_info),
90 server_type,
91 )?;
92
93 Ok(Self {
94 client: Arc::new(Mutex::new(client)),
95 config,
96 epoch: Arc::new(AtomicU64::new(0)),
97 })
98 }
99
100 pub fn bump_epoch(&self) -> u64 {
105 let new_epoch = self.epoch.fetch_add(1, Ordering::AcqRel) + 1;
106 debug!(new_epoch, "sentinel manager epoch bumped");
107 new_epoch
108 }
109
110 pub fn current_epoch(&self) -> u64 {
112 self.epoch.load(Ordering::Acquire)
113 }
114
115 pub(crate) fn shared_client(&self) -> Arc<Mutex<SentinelClient>> {
117 Arc::clone(&self.client)
118 }
119}
120
121#[allow(clippy::manual_async_fn)]
125impl bb8::ManageConnection for SentinelConnectionManager {
126 type Connection = PooledConnection;
127 type Error = SentinelPoolError;
128
129 fn connect(
130 &self,
131 ) -> impl std::future::Future<Output = Result<Self::Connection>> + Send {
132 async move {
133 let conn = {
134 let mut client = self.client.lock().await;
135 client.get_async_connection().await?
136 };
137 let epoch = self.current_epoch();
138 debug!(epoch, "created new sentinel-aware connection");
139 Ok(PooledConnection { inner: conn, epoch })
140 }
141 }
142
143 fn is_valid(
144 &self,
145 conn: &mut Self::Connection,
146 ) -> impl std::future::Future<Output = Result<()>> + Send {
147 async move {
148 let current = self.current_epoch();
150 if conn.epoch < current {
151 return Err(SentinelPoolError::Pool(format!(
152 "connection epoch {} is older than current {} (master changed)",
153 conn.epoch, current
154 )));
155 }
156
157 let pong: String = cmd("PING").query_async(&mut conn.inner).await?;
159 if pong != "PONG" {
160 return Err(SentinelPoolError::Pool(format!(
161 "unexpected PING reply: {pong}"
162 )));
163 }
164
165 if self.config.verify_role_on_checkout && self.config.role == ServerRole::Master {
167 let role: Value = cmd("ROLE").query_async(&mut conn.inner).await?;
168 if !is_master_role(&role) {
169 warn!("connection no longer points to a master, dropping");
170 self.bump_epoch();
171 return Err(SentinelPoolError::Pool(
172 "connection is no longer master".into(),
173 ));
174 }
175 }
176
177 Ok(())
178 }
179 }
180
181 fn has_broken(&self, conn: &mut Self::Connection) -> bool {
182 conn.epoch < self.current_epoch()
183 }
184}
185
186#[allow(dead_code)]
188fn _assert_async_commands_impl<T: AsyncCommands>(_: &T) {}
189
190fn is_master_role(value: &Value) -> bool {
194 if let Value::Array(items) = value {
195 if let Some(first) = items.first() {
196 return matches!(
197 first,
198 Value::BulkString(s) if s.eq_ignore_ascii_case(b"master")
199 ) || matches!(
200 first,
201 Value::SimpleString(s) if s.eq_ignore_ascii_case("master")
202 );
203 }
204 }
205 false
206}
207
208#[cfg(test)]
209mod tests {
210 use super::*;
211
212 #[test]
213 fn detects_master_role() {
214 let value = Value::Array(vec![
215 Value::BulkString(b"master".to_vec()),
216 Value::Int(0),
217 Value::Array(vec![]),
218 ]);
219 assert!(is_master_role(&value));
220 }
221
222 #[test]
223 fn rejects_replica_role() {
224 let value = Value::Array(vec![
225 Value::BulkString(b"slave".to_vec()),
226 Value::BulkString(b"127.0.0.1".to_vec()),
227 Value::Int(6379),
228 ]);
229 assert!(!is_master_role(&value));
230 }
231}