1#![doc = include_str!("../README.md")]
2#![cfg_attr(docsrs, feature(doc_cfg))]
3#![deny(
4 nonstandard_style,
5 rust_2018_idioms,
6 rustdoc::broken_intra_doc_links,
7 rustdoc::private_intra_doc_links
8)]
9#![forbid(non_ascii_idents, unsafe_code)]
10#![warn(
11 deprecated_in_future,
12 missing_copy_implementations,
13 missing_debug_implementations,
14 missing_docs,
15 unreachable_pub,
16 unused_import_braces,
17 unused_labels,
18 unused_lifetimes,
19 unused_qualifications,
20 unused_results
21)]
22#![allow(clippy::uninlined_format_args)]
23
24#[cfg(feature = "cluster")]
25pub mod cluster;
26mod config;
27
28#[cfg(feature = "sentinel")]
29pub mod sentinel;
30
31use std::{
32 ops::{Deref, DerefMut},
33 sync::atomic::{AtomicUsize, Ordering},
34};
35
36use deadpool::managed;
37use redis::{
38 aio::{ConnectionLike, MultiplexedConnection},
39 Client, IntoConnectionInfo, RedisError, RedisResult,
40};
41
42pub use redis;
43
44pub use self::config::{
45 Config, ConfigError, ConnectionAddr, ConnectionInfo, ProtocolVersion, RedisConnectionInfo,
46};
47
48pub use deadpool::managed::reexports::*;
49deadpool::managed_reexports!("redis", Manager, Connection, RedisError, ConfigError);
50
51type RecycleResult = managed::RecycleResult<RedisError>;
53
54#[allow(missing_debug_implementations)] pub struct Connection {
60 conn: Object,
61}
62
63impl Connection {
64 #[must_use]
68 pub fn take(this: Self) -> MultiplexedConnection {
69 Object::take(this.conn)
70 }
71}
72
73impl From<Object> for Connection {
74 fn from(conn: Object) -> Self {
75 Self { conn }
76 }
77}
78
79impl Deref for Connection {
80 type Target = MultiplexedConnection;
81
82 fn deref(&self) -> &MultiplexedConnection {
83 &self.conn
84 }
85}
86
87impl DerefMut for Connection {
88 fn deref_mut(&mut self) -> &mut MultiplexedConnection {
89 &mut self.conn
90 }
91}
92
93impl AsRef<MultiplexedConnection> for Connection {
94 fn as_ref(&self) -> &MultiplexedConnection {
95 &self.conn
96 }
97}
98
99impl AsMut<MultiplexedConnection> for Connection {
100 fn as_mut(&mut self) -> &mut MultiplexedConnection {
101 &mut self.conn
102 }
103}
104
105impl ConnectionLike for Connection {
106 fn req_packed_command<'a>(
107 &'a mut self,
108 cmd: &'a redis::Cmd,
109 ) -> redis::RedisFuture<'a, redis::Value> {
110 self.conn.req_packed_command(cmd)
111 }
112
113 fn req_packed_commands<'a>(
114 &'a mut self,
115 cmd: &'a redis::Pipeline,
116 offset: usize,
117 count: usize,
118 ) -> redis::RedisFuture<'a, Vec<redis::Value>> {
119 self.conn.req_packed_commands(cmd, offset, count)
120 }
121
122 fn get_db(&self) -> i64 {
123 self.conn.get_db()
124 }
125}
126
127#[derive(Debug)]
131pub struct Manager {
132 client: Client,
133 ping_number: AtomicUsize,
134}
135
136impl Manager {
137 pub fn new<T: IntoConnectionInfo>(params: T) -> RedisResult<Self> {
143 Ok(Self {
144 client: Client::open(params)?,
145 ping_number: AtomicUsize::new(0),
146 })
147 }
148}
149
150impl managed::Manager for Manager {
151 type Type = MultiplexedConnection;
152 type Error = RedisError;
153
154 async fn create(&self) -> Result<MultiplexedConnection, RedisError> {
155 let conn = self.client.get_multiplexed_async_connection().await?;
156 Ok(conn)
157 }
158
159 async fn recycle(&self, conn: &mut MultiplexedConnection, _: &Metrics) -> RecycleResult {
160 let ping_number = self.ping_number.fetch_add(1, Ordering::Relaxed).to_string();
161 let (n,) = redis::Pipeline::with_capacity(2)
163 .cmd("UNWATCH")
164 .ignore()
165 .cmd("PING")
166 .arg(&ping_number)
167 .query_async::<(String,)>(conn)
168 .await?;
169 if n == ping_number {
170 Ok(())
171 } else {
172 Err(managed::RecycleError::message("Invalid PING response"))
173 }
174 }
175}