deadpool_redis_cluster/
lib.rs

1#![deprecated(since = "0.1.1", note="The functionality of `redis_cluster_async` has been merged into the 
2`redis` crate rendering this crate obsolete. Please use `deadpool-redis` instead.")]
3#![doc = include_str!("../README.md")]
4#![cfg_attr(docsrs, feature(doc_cfg))]
5#![deny(
6    nonstandard_style,
7    rust_2018_idioms,
8    rustdoc::broken_intra_doc_links,
9    rustdoc::private_intra_doc_links
10)]
11#![forbid(non_ascii_idents, unsafe_code)]
12#![warn(
13    deprecated_in_future,
14    missing_copy_implementations,
15    missing_debug_implementations,
16    missing_docs,
17    unreachable_pub,
18    unused_import_braces,
19    unused_labels,
20    unused_lifetimes,
21    unused_qualifications,
22    unused_results
23)]
24#![allow(clippy::uninlined_format_args)]
25
26mod config;
27
28use std::{
29    ops::{Deref, DerefMut},
30    sync::atomic::{AtomicUsize, Ordering},
31};
32
33use deadpool::{async_trait, managed};
34use redis::{aio::ConnectionLike, IntoConnectionInfo, RedisError, RedisResult};
35
36pub use redis;
37pub use redis_cluster_async::{Client, Connection as RedisConnection};
38
39pub use self::config::{Config, ConfigError};
40
41pub use deadpool::managed::reexports::*;
42deadpool::managed_reexports!(
43    "redis_cluster",
44    Manager,
45    Connection,
46    RedisError,
47    ConfigError
48);
49
50type RecycleResult = managed::RecycleResult<RedisError>;
51
52/// Wrapper around [`redis_cluster_async::Connection`].
53///
54/// This structure implements [`redis::aio::ConnectionLike`] and can therefore
55/// be used just like a regular [`redis_cluster_async::Connection`].
56#[allow(missing_debug_implementations)] // `redis_cluster_async::Connection: !Debug`
57pub struct Connection {
58    conn: Object,
59}
60
61impl Connection {
62    /// Takes this [`Connection`] from its [`Pool`] permanently.
63    ///
64    /// This reduces the size of the [`Pool`].
65    #[must_use]
66    pub fn take(this: Self) -> RedisConnection {
67        Object::take(this.conn)
68    }
69}
70
71impl From<Object> for Connection {
72    fn from(conn: Object) -> Self {
73        Self { conn }
74    }
75}
76
77impl Deref for Connection {
78    type Target = RedisConnection;
79
80    fn deref(&self) -> &RedisConnection {
81        &self.conn
82    }
83}
84
85impl DerefMut for Connection {
86    fn deref_mut(&mut self) -> &mut RedisConnection {
87        &mut self.conn
88    }
89}
90
91impl AsRef<RedisConnection> for Connection {
92    fn as_ref(&self) -> &RedisConnection {
93        &self.conn
94    }
95}
96
97impl AsMut<RedisConnection> for Connection {
98    fn as_mut(&mut self) -> &mut RedisConnection {
99        &mut self.conn
100    }
101}
102
103impl ConnectionLike for Connection {
104    fn req_packed_command<'a>(
105        &'a mut self,
106        cmd: &'a redis::Cmd,
107    ) -> redis::RedisFuture<'a, redis::Value> {
108        self.conn.req_packed_command(cmd)
109    }
110
111    fn req_packed_commands<'a>(
112        &'a mut self,
113        cmd: &'a redis::Pipeline,
114        offset: usize,
115        count: usize,
116    ) -> redis::RedisFuture<'a, Vec<redis::Value>> {
117        self.conn.req_packed_commands(cmd, offset, count)
118    }
119
120    fn get_db(&self) -> i64 {
121        self.conn.get_db()
122    }
123}
124
125/// [`Manager`] for creating and recycling [`redis_cluster_async`] connections.
126///
127/// [`Manager`]: managed::Manager
128pub struct Manager {
129    client: Client,
130    ping_number: AtomicUsize,
131}
132
133// `redis_cluster_async::Client: !Debug`
134impl std::fmt::Debug for Manager {
135    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
136        f.debug_struct("Manager")
137            .field("client", &format!("{:p}", &self.client))
138            .field("ping_number", &self.ping_number)
139            .finish()
140    }
141}
142
143impl Manager {
144    /// Creates a new [`Manager`] from the given `params`.
145    ///
146    /// # Errors
147    ///
148    /// If establishing a new [`Client`] fails.
149    pub fn new<T: IntoConnectionInfo>(params: Vec<T>) -> RedisResult<Self> {
150        Ok(Self {
151            client: Client::open(params)?,
152            ping_number: AtomicUsize::new(0),
153        })
154    }
155}
156
157#[async_trait]
158impl managed::Manager for Manager {
159    type Type = RedisConnection;
160    type Error = RedisError;
161
162    async fn create(&self) -> Result<RedisConnection, RedisError> {
163        let conn = self.client.get_connection().await?;
164        Ok(conn)
165    }
166
167    async fn recycle(&self, conn: &mut RedisConnection, _: &Metrics) -> RecycleResult {
168        let ping_number = self.ping_number.fetch_add(1, Ordering::Relaxed).to_string();
169        let n = redis::cmd("PING")
170            .arg(&ping_number)
171            .query_async::<_, String>(conn)
172            .await?;
173        if n == ping_number {
174            Ok(())
175        } else {
176            Err(managed::RecycleError::StaticMessage(
177                "Invalid PING response",
178            ))
179        }
180    }
181}