deadpool_redis/
lib.rs

1#![doc = include_str!("../README.md")]
2#![cfg_attr(docsrs, feature(doc_cfg))]
3#![deny(
4    nonstandard_style,
5    rust_2018_idioms,
6    rustdoc::broken_intra_doc_links,
7    rustdoc::private_intra_doc_links
8)]
9#![forbid(non_ascii_idents, unsafe_code)]
10#![warn(
11    deprecated_in_future,
12    missing_copy_implementations,
13    missing_debug_implementations,
14    missing_docs,
15    unreachable_pub,
16    unused_import_braces,
17    unused_labels,
18    unused_lifetimes,
19    unused_qualifications,
20    unused_results
21)]
22#![allow(clippy::uninlined_format_args)]
23
24#[cfg(feature = "cluster")]
25pub mod cluster;
26mod config;
27
28#[cfg(feature = "sentinel")]
29pub mod sentinel;
30
31use std::{
32    ops::{Deref, DerefMut},
33    sync::atomic::{AtomicUsize, Ordering},
34};
35
36use deadpool::managed;
37use redis::{
38    aio::{ConnectionLike, MultiplexedConnection},
39    Client, IntoConnectionInfo, RedisError, RedisResult,
40};
41
42pub use redis;
43
44pub use self::config::{
45    Config, ConfigError, ConnectionAddr, ConnectionInfo, ProtocolVersion, RedisConnectionInfo,
46};
47
48pub use deadpool::managed::reexports::*;
49deadpool::managed_reexports!("redis", Manager, Connection, RedisError, ConfigError);
50
/// Type alias for using [`deadpool::managed::RecycleResult`] with [`redis`].
///
/// The error type is fixed to [`RedisError`].
type RecycleResult = managed::RecycleResult<RedisError>;
53
54/// Wrapper around [`redis::aio::MultiplexedConnection`].
55///
56/// This structure implements [`redis::aio::ConnectionLike`] and can therefore
57/// be used just like a regular [`redis::aio::MultiplexedConnection`].
58#[allow(missing_debug_implementations)] // `redis::aio::MultiplexedConnection: !Debug`
59pub struct Connection {
60    conn: Object,
61}
62
63impl Connection {
64    /// Takes this [`Connection`] from its [`Pool`] permanently.
65    ///
66    /// This reduces the size of the [`Pool`].
67    #[must_use]
68    pub fn take(this: Self) -> MultiplexedConnection {
69        Object::take(this.conn)
70    }
71}
72
73impl From<Object> for Connection {
74    fn from(conn: Object) -> Self {
75        Self { conn }
76    }
77}
78
79impl Deref for Connection {
80    type Target = MultiplexedConnection;
81
82    fn deref(&self) -> &MultiplexedConnection {
83        &self.conn
84    }
85}
86
87impl DerefMut for Connection {
88    fn deref_mut(&mut self) -> &mut MultiplexedConnection {
89        &mut self.conn
90    }
91}
92
93impl AsRef<MultiplexedConnection> for Connection {
94    fn as_ref(&self) -> &MultiplexedConnection {
95        &self.conn
96    }
97}
98
99impl AsMut<MultiplexedConnection> for Connection {
100    fn as_mut(&mut self) -> &mut MultiplexedConnection {
101        &mut self.conn
102    }
103}
104
105impl ConnectionLike for Connection {
106    fn req_packed_command<'a>(
107        &'a mut self,
108        cmd: &'a redis::Cmd,
109    ) -> redis::RedisFuture<'a, redis::Value> {
110        self.conn.req_packed_command(cmd)
111    }
112
113    fn req_packed_commands<'a>(
114        &'a mut self,
115        cmd: &'a redis::Pipeline,
116        offset: usize,
117        count: usize,
118    ) -> redis::RedisFuture<'a, Vec<redis::Value>> {
119        self.conn.req_packed_commands(cmd, offset, count)
120    }
121
122    fn get_db(&self) -> i64 {
123        self.conn.get_db()
124    }
125}
126
127/// [`Manager`] for creating and recycling [`redis`] connections.
128///
129/// [`Manager`]: managed::Manager
130#[derive(Debug)]
131pub struct Manager {
132    client: Client,
133    ping_number: AtomicUsize,
134}
135
136impl Manager {
137    /// Creates a new [`Manager`] from the given `params`.
138    ///
139    /// # Errors
140    ///
141    /// If establishing a new [`Client`] fails.
142    pub fn new<T: IntoConnectionInfo>(params: T) -> RedisResult<Self> {
143        Ok(Self {
144            client: Client::open(params)?,
145            ping_number: AtomicUsize::new(0),
146        })
147    }
148}
149
150impl managed::Manager for Manager {
151    type Type = MultiplexedConnection;
152    type Error = RedisError;
153
154    async fn create(&self) -> Result<MultiplexedConnection, RedisError> {
155        let conn = self.client.get_multiplexed_async_connection().await?;
156        Ok(conn)
157    }
158
159    async fn recycle(&self, conn: &mut MultiplexedConnection, _: &Metrics) -> RecycleResult {
160        let ping_number = self.ping_number.fetch_add(1, Ordering::Relaxed).to_string();
161        // Using pipeline to avoid roundtrip for UNWATCH
162        let (n,) = redis::Pipeline::with_capacity(2)
163            .cmd("UNWATCH")
164            .ignore()
165            .cmd("PING")
166            .arg(&ping_number)
167            .query_async::<(String,)>(conn)
168            .await?;
169        if n == ping_number {
170            Ok(())
171        } else {
172            Err(managed::RecycleError::message("Invalid PING response"))
173        }
174    }
175}