deadpool_redis/cluster/
mod.rs1mod config;
3
4use std::{
5 ops::{Deref, DerefMut},
6 sync::atomic::{AtomicUsize, Ordering},
7};
8
9use deadpool::managed;
10use redis::{aio::ConnectionLike, IntoConnectionInfo, RedisError, RedisResult};
11
12use redis;
13pub use redis::cluster::{ClusterClient, ClusterClientBuilder};
14pub use redis::cluster_async::ClusterConnection;
15
16pub use self::config::{Config, ConfigError};
17
18pub use deadpool::managed::reexports::*;
19deadpool::managed_reexports!(
20 "redis_cluster",
21 Manager,
22 Connection,
23 RedisError,
24 ConfigError
25);
26
27type RecycleResult = managed::RecycleResult<RedisError>;
28
29#[allow(missing_debug_implementations)] pub struct Connection {
35 conn: Object,
36}
37
38impl Connection {
39 #[must_use]
43 pub fn take(this: Self) -> ClusterConnection {
44 Object::take(this.conn)
45 }
46}
47
48impl From<Object> for Connection {
49 fn from(conn: Object) -> Self {
50 Self { conn }
51 }
52}
53
54impl Deref for Connection {
55 type Target = ClusterConnection;
56
57 fn deref(&self) -> &ClusterConnection {
58 &self.conn
59 }
60}
61
62impl DerefMut for Connection {
63 fn deref_mut(&mut self) -> &mut ClusterConnection {
64 &mut self.conn
65 }
66}
67
68impl AsRef<ClusterConnection> for Connection {
69 fn as_ref(&self) -> &ClusterConnection {
70 &self.conn
71 }
72}
73
74impl AsMut<ClusterConnection> for Connection {
75 fn as_mut(&mut self) -> &mut ClusterConnection {
76 &mut self.conn
77 }
78}
79
80impl ConnectionLike for Connection {
81 fn req_packed_command<'a>(
82 &'a mut self,
83 cmd: &'a redis::Cmd,
84 ) -> redis::RedisFuture<'a, redis::Value> {
85 self.conn.req_packed_command(cmd)
86 }
87
88 fn req_packed_commands<'a>(
89 &'a mut self,
90 cmd: &'a redis::Pipeline,
91 offset: usize,
92 count: usize,
93 ) -> redis::RedisFuture<'a, Vec<redis::Value>> {
94 self.conn.req_packed_commands(cmd, offset, count)
95 }
96
97 fn get_db(&self) -> i64 {
98 self.conn.get_db()
99 }
100}
101
102pub struct Manager {
106 client: ClusterClient,
107 ping_number: AtomicUsize,
108}
109
110impl std::fmt::Debug for Manager {
112 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113 f.debug_struct("Manager")
114 .field("client", &format!("{:p}", &self.client))
115 .field("ping_number", &self.ping_number)
116 .finish()
117 }
118}
119
120impl Manager {
121 pub fn new<T: IntoConnectionInfo>(
127 params: Vec<T>,
128 read_from_replicas: bool,
129 ) -> RedisResult<Self> {
130 let mut client = ClusterClientBuilder::new(params);
131 if read_from_replicas {
132 client = client.read_from_replicas();
133 }
134 Ok(Self {
135 client: client.build()?,
136 ping_number: AtomicUsize::new(0),
137 })
138 }
139}
140
141impl managed::Manager for Manager {
142 type Type = ClusterConnection;
143 type Error = RedisError;
144
145 async fn create(&self) -> Result<ClusterConnection, RedisError> {
146 let conn = self.client.get_async_connection().await?;
147 Ok(conn)
148 }
149
150 async fn recycle(&self, conn: &mut ClusterConnection, _: &Metrics) -> RecycleResult {
151 let ping_number = self.ping_number.fetch_add(1, Ordering::Relaxed).to_string();
152 let n = redis::cmd("PING")
153 .arg(&ping_number)
154 .query_async::<String>(conn)
155 .await?;
156 if n == ping_number {
157 Ok(())
158 } else {
159 Err(managed::RecycleError::message("Invalid PING response"))
160 }
161 }
162}