deadpool_redis/cluster/
mod.rs

1//! This module extends the library to support Redis Cluster.
2mod config;
3
4use std::{
5    ops::{Deref, DerefMut},
6    sync::atomic::{AtomicUsize, Ordering},
7};
8
9use deadpool::managed;
10use redis::{aio::ConnectionLike, IntoConnectionInfo, RedisError, RedisResult};
11
12use redis;
13pub use redis::cluster::{ClusterClient, ClusterClientBuilder};
14pub use redis::cluster_async::ClusterConnection;
15
16pub use self::config::{Config, ConfigError};
17
18pub use deadpool::managed::reexports::*;
19deadpool::managed_reexports!(
20    "redis_cluster",
21    Manager,
22    Connection,
23    RedisError,
24    ConfigError
25);
26
27type RecycleResult = managed::RecycleResult<RedisError>;
28
29/// Wrapper around [`redis::cluster_async::ClusterConnection`].
30///
31/// This structure implements [`redis::aio::ConnectionLike`] and can therefore
32/// be used just like a regular [`redis::cluster_async::ClusterConnection`].
33#[allow(missing_debug_implementations)] // `redis::cluster_async::ClusterConnection: !Debug`
34pub struct Connection {
35    conn: Object,
36}
37
38impl Connection {
39    /// Takes this [`Connection`] from its [`Pool`] permanently.
40    ///
41    /// This reduces the size of the [`Pool`].
42    #[must_use]
43    pub fn take(this: Self) -> ClusterConnection {
44        Object::take(this.conn)
45    }
46}
47
48impl From<Object> for Connection {
49    fn from(conn: Object) -> Self {
50        Self { conn }
51    }
52}
53
54impl Deref for Connection {
55    type Target = ClusterConnection;
56
57    fn deref(&self) -> &ClusterConnection {
58        &self.conn
59    }
60}
61
62impl DerefMut for Connection {
63    fn deref_mut(&mut self) -> &mut ClusterConnection {
64        &mut self.conn
65    }
66}
67
68impl AsRef<ClusterConnection> for Connection {
69    fn as_ref(&self) -> &ClusterConnection {
70        &self.conn
71    }
72}
73
74impl AsMut<ClusterConnection> for Connection {
75    fn as_mut(&mut self) -> &mut ClusterConnection {
76        &mut self.conn
77    }
78}
79
80impl ConnectionLike for Connection {
81    fn req_packed_command<'a>(
82        &'a mut self,
83        cmd: &'a redis::Cmd,
84    ) -> redis::RedisFuture<'a, redis::Value> {
85        self.conn.req_packed_command(cmd)
86    }
87
88    fn req_packed_commands<'a>(
89        &'a mut self,
90        cmd: &'a redis::Pipeline,
91        offset: usize,
92        count: usize,
93    ) -> redis::RedisFuture<'a, Vec<redis::Value>> {
94        self.conn.req_packed_commands(cmd, offset, count)
95    }
96
97    fn get_db(&self) -> i64 {
98        self.conn.get_db()
99    }
100}
101
102/// [`Manager`] for creating and recycling [`redis::cluster_async`] connections.
103///
104/// [`Manager`]: managed::Manager
105pub struct Manager {
106    client: ClusterClient,
107    ping_number: AtomicUsize,
108}
109
110// `redis::cluster_async::ClusterClient: !Debug`
111impl std::fmt::Debug for Manager {
112    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113        f.debug_struct("Manager")
114            .field("client", &format!("{:p}", &self.client))
115            .field("ping_number", &self.ping_number)
116            .finish()
117    }
118}
119
120impl Manager {
121    /// Creates a new [`Manager`] from the given `params`.
122    ///
123    /// # Errors
124    ///
125    /// If establishing a new [`ClusterClientBuilder`] fails.
126    pub fn new<T: IntoConnectionInfo>(
127        params: Vec<T>,
128        read_from_replicas: bool,
129    ) -> RedisResult<Self> {
130        let mut client = ClusterClientBuilder::new(params);
131        if read_from_replicas {
132            client = client.read_from_replicas();
133        }
134        Ok(Self {
135            client: client.build()?,
136            ping_number: AtomicUsize::new(0),
137        })
138    }
139}
140
141impl managed::Manager for Manager {
142    type Type = ClusterConnection;
143    type Error = RedisError;
144
145    async fn create(&self) -> Result<ClusterConnection, RedisError> {
146        let conn = self.client.get_async_connection().await?;
147        Ok(conn)
148    }
149
150    async fn recycle(&self, conn: &mut ClusterConnection, _: &Metrics) -> RecycleResult {
151        let ping_number = self.ping_number.fetch_add(1, Ordering::Relaxed).to_string();
152        let n = redis::cmd("PING")
153            .arg(&ping_number)
154            .query_async::<String>(conn)
155            .await?;
156        if n == ping_number {
157            Ok(())
158        } else {
159            Err(managed::RecycleError::message("Invalid PING response"))
160        }
161    }
162}