bb8_redis_break_with_error/
lib.rs1pub use bb8;
2pub use redis;
3
4use core::{
5 fmt,
6 ops::{Deref, DerefMut},
7};
8
9use async_trait::async_trait;
10use redis::{
11 aio::{Connection, ConnectionLike},
12 cmd, Client, Cmd, ErrorKind, IntoConnectionInfo, Pipeline, RedisError, RedisFuture, Value,
13};
14
15#[derive(Clone, Debug)]
17pub struct RedisConnectionManager {
18 client: Client,
19}
20
21impl RedisConnectionManager {
22 pub fn new<T: IntoConnectionInfo>(info: T) -> Result<RedisConnectionManager, RedisError> {
25 Ok(RedisConnectionManager {
26 client: Client::open(info.into_connection_info()?)?,
27 })
28 }
29}
30
31#[async_trait]
32impl bb8::ManageConnection for RedisConnectionManager {
33 type Connection = RedisConnection;
34 type Error = RedisError;
35
36 async fn connect(&self) -> Result<Self::Connection, Self::Error> {
37 self.client
38 .get_async_connection()
39 .await
40 .map(RedisConnection::new)
41 }
42
43 async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
44 let pong: String = cmd("PING").query_async(conn).await?;
45 match pong.as_str() {
46 "PONG" => Ok(()),
47 _ => Err((ErrorKind::ResponseError, "ping request").into()),
48 }
49 }
50
51 fn has_broken(&self, conn: &mut Self::Connection) -> bool {
52 conn.is_close_required()
53 }
54}
55
56pub struct RedisConnection {
58 connection: Connection,
59 close_required: bool,
60}
61
62impl fmt::Debug for RedisConnection {
63 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
64 f.debug_struct("RedisConnection")
65 .field("connection", &"")
66 .field("close_required", &self.close_required)
67 .finish()
68 }
69}
70
71impl RedisConnection {
72 pub fn new(connection: Connection) -> Self {
73 Self {
74 connection,
75 close_required: false,
76 }
77 }
78
79 pub fn inner(&self) -> &Connection {
80 &self.connection
81 }
82
83 pub fn into_inner(self) -> Connection {
84 self.connection
85 }
86
87 pub fn set_close_required_with_error(&mut self, err: &RedisError) {
88 let val = match err.kind() {
89 ErrorKind::ResponseError => {
90 false
92 }
93 ErrorKind::AuthenticationFailed => true,
94 ErrorKind::TypeError => {
95 true
97 }
98 ErrorKind::ExecAbortError => {
99 false
101 }
102 ErrorKind::BusyLoadingError => {
103 false
105 }
106 ErrorKind::NoScriptError => {
107 false
109 }
110 ErrorKind::InvalidClientConfig => true,
111 ErrorKind::Moved => {
112 false
114 }
115 ErrorKind::Ask => {
116 false
118 }
119 ErrorKind::TryAgain => {
120 false
122 }
123 ErrorKind::ClusterDown => {
124 false
126 }
127 ErrorKind::CrossSlot => {
128 false
130 }
131 ErrorKind::MasterDown => {
132 false
134 }
135 ErrorKind::IoError => {
136 true
139 }
140 ErrorKind::ClientError => true,
141 ErrorKind::ExtensionError => {
142 match err.code() {
146 Some("NOAUTH") => true,
147 Some("WRONGPASS") => true,
148 _ => true,
149 }
150 }
151 ErrorKind::ReadOnly => {
152 false
154 }
155 _ => true,
156 };
157 self.set_close_required(val)
158 }
159
160 pub fn set_close_required(&mut self, val: bool) {
161 self.close_required = val
162 }
163
164 pub fn is_close_required(&self) -> bool {
165 self.close_required
166 }
167}
168
169impl Deref for RedisConnection {
170 type Target = Connection;
171
172 fn deref(&self) -> &Connection {
173 &self.connection
174 }
175}
176
177impl DerefMut for RedisConnection {
178 fn deref_mut(&mut self) -> &mut Connection {
179 &mut self.connection
180 }
181}
182
183impl ConnectionLike for RedisConnection {
185 fn req_packed_command<'a>(&'a mut self, cmd: &'a Cmd) -> RedisFuture<'a, Value> {
186 Box::pin(async move {
187 match self.connection.req_packed_command(cmd).await {
188 Ok(value) => Ok(value),
189 Err(err) => {
190 self.set_close_required_with_error(&err);
191 Err(err)
192 }
193 }
194 })
195 }
196
197 fn req_packed_commands<'a>(
198 &'a mut self,
199 cmd: &'a Pipeline,
200 offset: usize,
201 count: usize,
202 ) -> RedisFuture<'a, Vec<Value>> {
203 Box::pin(async move {
204 match self
205 .connection
206 .req_packed_commands(cmd, offset, count)
207 .await
208 {
209 Ok(value) => Ok(value),
210 Err(err) => {
211 self.set_close_required_with_error(&err);
212 Err(err)
213 }
214 }
215 })
216 }
217
218 fn get_db(&self) -> i64 {
219 self.connection.get_db()
220 }
221}