deadpool_redis_cluster/
lib.rs1#![deprecated(since = "0.1.1", note="The functionality of `redis_cluster_async` has been merged into the
2`redis` crate rendering this crate obsolete. Please use `deadpool-redis` instead.")]
3#![doc = include_str!("../README.md")]
4#![cfg_attr(docsrs, feature(doc_cfg))]
5#![deny(
6 nonstandard_style,
7 rust_2018_idioms,
8 rustdoc::broken_intra_doc_links,
9 rustdoc::private_intra_doc_links
10)]
11#![forbid(non_ascii_idents, unsafe_code)]
12#![warn(
13 deprecated_in_future,
14 missing_copy_implementations,
15 missing_debug_implementations,
16 missing_docs,
17 unreachable_pub,
18 unused_import_braces,
19 unused_labels,
20 unused_lifetimes,
21 unused_qualifications,
22 unused_results
23)]
24#![allow(clippy::uninlined_format_args)]
25
26mod config;
27
28use std::{
29 ops::{Deref, DerefMut},
30 sync::atomic::{AtomicUsize, Ordering},
31};
32
33use deadpool::{async_trait, managed};
34use redis::{aio::ConnectionLike, IntoConnectionInfo, RedisError, RedisResult};
35
36pub use redis;
37pub use redis_cluster_async::{Client, Connection as RedisConnection};
38
39pub use self::config::{Config, ConfigError};
40
41pub use deadpool::managed::reexports::*;
42deadpool::managed_reexports!(
43 "redis_cluster",
44 Manager,
45 Connection,
46 RedisError,
47 ConfigError
48);
49
/// Shorthand for deadpool's managed `RecycleResult` with [`RedisError`] as
/// the error type; used as the return type of the manager's `recycle` method.
type RecycleResult = managed::RecycleResult<RedisError>;
51
52#[allow(missing_debug_implementations)] pub struct Connection {
58 conn: Object,
59}
60
61impl Connection {
62 #[must_use]
66 pub fn take(this: Self) -> RedisConnection {
67 Object::take(this.conn)
68 }
69}
70
71impl From<Object> for Connection {
72 fn from(conn: Object) -> Self {
73 Self { conn }
74 }
75}
76
77impl Deref for Connection {
78 type Target = RedisConnection;
79
80 fn deref(&self) -> &RedisConnection {
81 &self.conn
82 }
83}
84
85impl DerefMut for Connection {
86 fn deref_mut(&mut self) -> &mut RedisConnection {
87 &mut self.conn
88 }
89}
90
91impl AsRef<RedisConnection> for Connection {
92 fn as_ref(&self) -> &RedisConnection {
93 &self.conn
94 }
95}
96
97impl AsMut<RedisConnection> for Connection {
98 fn as_mut(&mut self) -> &mut RedisConnection {
99 &mut self.conn
100 }
101}
102
103impl ConnectionLike for Connection {
104 fn req_packed_command<'a>(
105 &'a mut self,
106 cmd: &'a redis::Cmd,
107 ) -> redis::RedisFuture<'a, redis::Value> {
108 self.conn.req_packed_command(cmd)
109 }
110
111 fn req_packed_commands<'a>(
112 &'a mut self,
113 cmd: &'a redis::Pipeline,
114 offset: usize,
115 count: usize,
116 ) -> redis::RedisFuture<'a, Vec<redis::Value>> {
117 self.conn.req_packed_commands(cmd, offset, count)
118 }
119
120 fn get_db(&self) -> i64 {
121 self.conn.get_db()
122 }
123}
124
125pub struct Manager {
129 client: Client,
130 ping_number: AtomicUsize,
131}
132
133impl std::fmt::Debug for Manager {
135 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
136 f.debug_struct("Manager")
137 .field("client", &format!("{:p}", &self.client))
138 .field("ping_number", &self.ping_number)
139 .finish()
140 }
141}
142
143impl Manager {
144 pub fn new<T: IntoConnectionInfo>(params: Vec<T>) -> RedisResult<Self> {
150 Ok(Self {
151 client: Client::open(params)?,
152 ping_number: AtomicUsize::new(0),
153 })
154 }
155}
156
157#[async_trait]
158impl managed::Manager for Manager {
159 type Type = RedisConnection;
160 type Error = RedisError;
161
162 async fn create(&self) -> Result<RedisConnection, RedisError> {
163 let conn = self.client.get_connection().await?;
164 Ok(conn)
165 }
166
167 async fn recycle(&self, conn: &mut RedisConnection, _: &Metrics) -> RecycleResult {
168 let ping_number = self.ping_number.fetch_add(1, Ordering::Relaxed).to_string();
169 let n = redis::cmd("PING")
170 .arg(&ping_number)
171 .query_async::<_, String>(conn)
172 .await?;
173 if n == ping_number {
174 Ok(())
175 } else {
176 Err(managed::RecycleError::StaticMessage(
177 "Invalid PING response",
178 ))
179 }
180 }
181}