deadpool_redis/sentinel/
mod.rs

1//! This module extends the library to support Redis Cluster.
2use std::{
3    ops::{Deref, DerefMut},
4    sync::atomic::{AtomicUsize, Ordering},
5};
6
7use redis;
8use redis::aio::MultiplexedConnection;
9use redis::sentinel::SentinelClient;
10use redis::{aio::ConnectionLike, IntoConnectionInfo, RedisError, RedisResult};
11use tokio::sync::Mutex;
12
13use deadpool::managed;
14pub use deadpool::managed::reexports::*;
15
16pub use crate::sentinel::config::SentinelNodeConnectionInfo;
17pub use crate::sentinel::config::SentinelServerType;
18pub use crate::sentinel::config::TlsMode;
19
20pub use self::config::{Config, ConfigError};
21
22mod config;
23
24deadpool::managed_reexports!(
25    "redis_sentinel",
26    Manager,
27    Connection,
28    RedisError,
29    ConfigError
30);
31
32type RecycleResult = managed::RecycleResult<RedisError>;
33
34/// Wrapper around [`redis::aio::MultiplexedConnection`].
35///
36/// This structure implements [`redis::aio::ConnectionLike`] and can therefore
37/// be used just like a regular [`redis::aio::MultiplexedConnection`].
38#[allow(missing_debug_implementations)] // `redis::cluster_async::ClusterConnection: !Debug`
39pub struct Connection {
40    conn: Object,
41}
42
43impl Connection {
44    /// Takes this [`Connection`] from its [`Pool`] permanently.
45    ///
46    /// This reduces the size of the [`Pool`].
47    #[must_use]
48    pub fn take(this: Self) -> MultiplexedConnection {
49        Object::take(this.conn)
50    }
51}
52
53impl From<Object> for Connection {
54    fn from(conn: Object) -> Self {
55        Self { conn }
56    }
57}
58
59impl Deref for Connection {
60    type Target = MultiplexedConnection;
61
62    fn deref(&self) -> &MultiplexedConnection {
63        &self.conn
64    }
65}
66
67impl DerefMut for Connection {
68    fn deref_mut(&mut self) -> &mut MultiplexedConnection {
69        &mut self.conn
70    }
71}
72
73impl AsRef<MultiplexedConnection> for Connection {
74    fn as_ref(&self) -> &MultiplexedConnection {
75        &self.conn
76    }
77}
78
79impl AsMut<MultiplexedConnection> for Connection {
80    fn as_mut(&mut self) -> &mut MultiplexedConnection {
81        &mut self.conn
82    }
83}
84
85impl ConnectionLike for Connection {
86    fn req_packed_command<'a>(
87        &'a mut self,
88        cmd: &'a redis::Cmd,
89    ) -> redis::RedisFuture<'a, redis::Value> {
90        self.conn.req_packed_command(cmd)
91    }
92
93    fn req_packed_commands<'a>(
94        &'a mut self,
95        cmd: &'a redis::Pipeline,
96        offset: usize,
97        count: usize,
98    ) -> redis::RedisFuture<'a, Vec<redis::Value>> {
99        self.conn.req_packed_commands(cmd, offset, count)
100    }
101
102    fn get_db(&self) -> i64 {
103        self.conn.get_db()
104    }
105}
106
107/// [`Manager`] for creating and recycling [`redis::aio::MultiplexedConnection`] connections.
108///
109/// [`Manager`]: managed::Manager
110pub struct Manager {
111    client: Mutex<SentinelClient>,
112    ping_number: AtomicUsize,
113}
114
115impl std::fmt::Debug for Manager {
116    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
117        f.debug_struct("Manager")
118            .field("client", &format!("{:p}", &self.client))
119            .field("ping_number", &self.ping_number)
120            .finish()
121    }
122}
123
124impl Manager {
125    /// Creates a new [`Manager`] from the given `params`.
126    ///
127    /// # Errors
128    ///
129    /// If establishing a new [`SentinelClient`] fails.
130    pub fn new<T: IntoConnectionInfo>(
131        param: Vec<T>,
132        service_name: String,
133        node_connection_info: Option<SentinelNodeConnectionInfo>,
134        server_type: SentinelServerType,
135    ) -> RedisResult<Self> {
136        Ok(Self {
137            client: Mutex::new(SentinelClient::build(
138                param,
139                service_name,
140                node_connection_info.map(|i| i.into()),
141                server_type.into(),
142            )?),
143            ping_number: AtomicUsize::new(0),
144        })
145    }
146}
147
148impl managed::Manager for Manager {
149    type Type = MultiplexedConnection;
150    type Error = RedisError;
151
152    async fn create(&self) -> Result<MultiplexedConnection, RedisError> {
153        let mut client = self.client.lock().await;
154        let conn = client.get_async_connection().await?;
155        Ok(conn)
156    }
157
158    async fn recycle(&self, conn: &mut MultiplexedConnection, _: &Metrics) -> RecycleResult {
159        let ping_number = self.ping_number.fetch_add(1, Ordering::Relaxed).to_string();
160        let n = redis::cmd("PING")
161            .arg(&ping_number)
162            .query_async::<String>(conn)
163            .await?;
164        if n == ping_number {
165            Ok(())
166        } else {
167            Err(managed::RecycleError::message("Invalid PING response"))
168        }
169    }
170}