deadpool_redis/sentinel/
mod.rs1use std::{
3 ops::{Deref, DerefMut},
4 sync::atomic::{AtomicUsize, Ordering},
5};
6
7use redis;
8use redis::aio::MultiplexedConnection;
9use redis::sentinel::SentinelClient;
10use redis::{aio::ConnectionLike, IntoConnectionInfo, RedisError, RedisResult};
11use tokio::sync::Mutex;
12
13use deadpool::managed;
14pub use deadpool::managed::reexports::*;
15
16pub use crate::sentinel::config::SentinelNodeConnectionInfo;
17pub use crate::sentinel::config::SentinelServerType;
18pub use crate::sentinel::config::TlsMode;
19
20pub use self::config::{Config, ConfigError};
21
22mod config;
23
24deadpool::managed_reexports!(
25 "redis_sentinel",
26 Manager,
27 Connection,
28 RedisError,
29 ConfigError
30);
31
32type RecycleResult = managed::RecycleResult<RedisError>;
33
34#[allow(missing_debug_implementations)] pub struct Connection {
40 conn: Object,
41}
42
43impl Connection {
44 #[must_use]
48 pub fn take(this: Self) -> MultiplexedConnection {
49 Object::take(this.conn)
50 }
51}
52
53impl From<Object> for Connection {
54 fn from(conn: Object) -> Self {
55 Self { conn }
56 }
57}
58
59impl Deref for Connection {
60 type Target = MultiplexedConnection;
61
62 fn deref(&self) -> &MultiplexedConnection {
63 &self.conn
64 }
65}
66
67impl DerefMut for Connection {
68 fn deref_mut(&mut self) -> &mut MultiplexedConnection {
69 &mut self.conn
70 }
71}
72
73impl AsRef<MultiplexedConnection> for Connection {
74 fn as_ref(&self) -> &MultiplexedConnection {
75 &self.conn
76 }
77}
78
79impl AsMut<MultiplexedConnection> for Connection {
80 fn as_mut(&mut self) -> &mut MultiplexedConnection {
81 &mut self.conn
82 }
83}
84
85impl ConnectionLike for Connection {
86 fn req_packed_command<'a>(
87 &'a mut self,
88 cmd: &'a redis::Cmd,
89 ) -> redis::RedisFuture<'a, redis::Value> {
90 self.conn.req_packed_command(cmd)
91 }
92
93 fn req_packed_commands<'a>(
94 &'a mut self,
95 cmd: &'a redis::Pipeline,
96 offset: usize,
97 count: usize,
98 ) -> redis::RedisFuture<'a, Vec<redis::Value>> {
99 self.conn.req_packed_commands(cmd, offset, count)
100 }
101
102 fn get_db(&self) -> i64 {
103 self.conn.get_db()
104 }
105}
106
107pub struct Manager {
111 client: Mutex<SentinelClient>,
112 ping_number: AtomicUsize,
113}
114
115impl std::fmt::Debug for Manager {
116 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
117 f.debug_struct("Manager")
118 .field("client", &format!("{:p}", &self.client))
119 .field("ping_number", &self.ping_number)
120 .finish()
121 }
122}
123
124impl Manager {
125 pub fn new<T: IntoConnectionInfo>(
131 param: Vec<T>,
132 service_name: String,
133 node_connection_info: Option<SentinelNodeConnectionInfo>,
134 server_type: SentinelServerType,
135 ) -> RedisResult<Self> {
136 Ok(Self {
137 client: Mutex::new(SentinelClient::build(
138 param,
139 service_name,
140 node_connection_info.map(|i| i.into()),
141 server_type.into(),
142 )?),
143 ping_number: AtomicUsize::new(0),
144 })
145 }
146}
147
148impl managed::Manager for Manager {
149 type Type = MultiplexedConnection;
150 type Error = RedisError;
151
152 async fn create(&self) -> Result<MultiplexedConnection, RedisError> {
153 let mut client = self.client.lock().await;
154 let conn = client.get_async_connection().await?;
155 Ok(conn)
156 }
157
158 async fn recycle(&self, conn: &mut MultiplexedConnection, _: &Metrics) -> RecycleResult {
159 let ping_number = self.ping_number.fetch_add(1, Ordering::Relaxed).to_string();
160 let n = redis::cmd("PING")
161 .arg(&ping_number)
162 .query_async::<String>(conn)
163 .await?;
164 if n == ping_number {
165 Ok(())
166 } else {
167 Err(managed::RecycleError::message("Invalid PING response"))
168 }
169 }
170}