bb8_redis_break_with_error/
lib.rs

1pub use bb8;
2pub use redis;
3
4use core::{
5    fmt,
6    ops::{Deref, DerefMut},
7};
8
9use async_trait::async_trait;
10use redis::{
11    aio::{Connection, ConnectionLike},
12    cmd, Client, Cmd, ErrorKind, IntoConnectionInfo, Pipeline, RedisError, RedisFuture, Value,
13};
14
15/// A `bb8::ManageConnection` for `redis::Client::get_async_connection`.
16#[derive(Clone, Debug)]
17pub struct RedisConnectionManager {
18    client: Client,
19}
20
21impl RedisConnectionManager {
22    /// Create a new `RedisConnectionManager`.
23    /// See `redis::Client::open` for a description of the parameter types.
24    pub fn new<T: IntoConnectionInfo>(info: T) -> Result<RedisConnectionManager, RedisError> {
25        Ok(RedisConnectionManager {
26            client: Client::open(info.into_connection_info()?)?,
27        })
28    }
29}
30
31#[async_trait]
32impl bb8::ManageConnection for RedisConnectionManager {
33    type Connection = RedisConnection;
34    type Error = RedisError;
35
36    async fn connect(&self) -> Result<Self::Connection, Self::Error> {
37        self.client
38            .get_async_connection()
39            .await
40            .map(RedisConnection::new)
41    }
42
43    async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
44        let pong: String = cmd("PING").query_async(conn).await?;
45        match pong.as_str() {
46            "PONG" => Ok(()),
47            _ => Err((ErrorKind::ResponseError, "ping request").into()),
48        }
49    }
50
51    fn has_broken(&self, conn: &mut Self::Connection) -> bool {
52        conn.is_close_required()
53    }
54}
55
56//
57pub struct RedisConnection {
58    connection: Connection,
59    close_required: bool,
60}
61
62impl fmt::Debug for RedisConnection {
63    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
64        f.debug_struct("RedisConnection")
65            .field("connection", &"")
66            .field("close_required", &self.close_required)
67            .finish()
68    }
69}
70
71impl RedisConnection {
72    pub fn new(connection: Connection) -> Self {
73        Self {
74            connection,
75            close_required: false,
76        }
77    }
78
79    pub fn inner(&self) -> &Connection {
80        &self.connection
81    }
82
83    pub fn into_inner(self) -> Connection {
84        self.connection
85    }
86
87    pub fn set_close_required_with_error(&mut self, err: &RedisError) {
88        let val = match err.kind() {
89            ErrorKind::ResponseError => {
90                // https://github.com/redis-rs/redis-rs/blob/0.21.5/src/parser.rs#L122
91                false
92            }
93            ErrorKind::AuthenticationFailed => true,
94            ErrorKind::TypeError => {
95                // https://github.com/redis-rs/redis-rs/blob/0.21.5/src/types.rs#L21
96                true
97            }
98            ErrorKind::ExecAbortError => {
99                // https://github.com/redis-rs/redis-rs/blob/0.21.5/src/parser.rs#L123
100                false
101            }
102            ErrorKind::BusyLoadingError => {
103                // https://github.com/redis-rs/redis-rs/blob/0.21.5/src/parser.rs#L124
104                false
105            }
106            ErrorKind::NoScriptError => {
107                // https://github.com/redis-rs/redis-rs/blob/0.21.5/src/parser.rs#L125
108                false
109            }
110            ErrorKind::InvalidClientConfig => true,
111            ErrorKind::Moved => {
112                // https://github.com/redis-rs/redis-rs/blob/0.21.5/src/parser.rs#L126
113                false
114            }
115            ErrorKind::Ask => {
116                // https://github.com/redis-rs/redis-rs/blob/0.21.5/src/parser.rs#L127
117                false
118            }
119            ErrorKind::TryAgain => {
120                // https://github.com/redis-rs/redis-rs/blob/0.21.5/src/parser.rs#L128
121                false
122            }
123            ErrorKind::ClusterDown => {
124                // https://github.com/redis-rs/redis-rs/blob/0.21.5/src/parser.rs#L129
125                false
126            }
127            ErrorKind::CrossSlot => {
128                // https://github.com/redis-rs/redis-rs/blob/0.21.5/src/parser.rs#L130
129                false
130            }
131            ErrorKind::MasterDown => {
132                // https://github.com/redis-rs/redis-rs/blob/0.21.5/src/parser.rs#L131
133                false
134            }
135            ErrorKind::IoError => {
136                // TODO, is_connection_dropped is_connection_refusal
137                //
138                true
139            }
140            ErrorKind::ClientError => true,
141            ErrorKind::ExtensionError => {
142                // https://github.com/redis-rs/redis-rs/blob/0.21.5/src/types.rs#L315-L319
143                // https://github.com/redis-rs/redis-rs/blob/0.21.5/src/types.rs#L367
144                //
145                match err.code() {
146                    Some("NOAUTH") => true,
147                    Some("WRONGPASS") => true,
148                    _ => true,
149                }
150            }
151            ErrorKind::ReadOnly => {
152                // https://github.com/redis-rs/redis-rs/blob/0.21.5/src/parser.rs#L132
153                false
154            }
155            _ => true,
156        };
157        self.set_close_required(val)
158    }
159
160    pub fn set_close_required(&mut self, val: bool) {
161        self.close_required = val
162    }
163
164    pub fn is_close_required(&self) -> bool {
165        self.close_required
166    }
167}
168
169impl Deref for RedisConnection {
170    type Target = Connection;
171
172    fn deref(&self) -> &Connection {
173        &self.connection
174    }
175}
176
177impl DerefMut for RedisConnection {
178    fn deref_mut(&mut self) -> &mut Connection {
179        &mut self.connection
180    }
181}
182
183//
184impl ConnectionLike for RedisConnection {
185    fn req_packed_command<'a>(&'a mut self, cmd: &'a Cmd) -> RedisFuture<'a, Value> {
186        Box::pin(async move {
187            match self.connection.req_packed_command(cmd).await {
188                Ok(value) => Ok(value),
189                Err(err) => {
190                    self.set_close_required_with_error(&err);
191                    Err(err)
192                }
193            }
194        })
195    }
196
197    fn req_packed_commands<'a>(
198        &'a mut self,
199        cmd: &'a Pipeline,
200        offset: usize,
201        count: usize,
202    ) -> RedisFuture<'a, Vec<Value>> {
203        Box::pin(async move {
204            match self
205                .connection
206                .req_packed_commands(cmd, offset, count)
207                .await
208            {
209                Ok(value) => Ok(value),
210                Err(err) => {
211                    self.set_close_required_with_error(&err);
212                    Err(err)
213                }
214            }
215        })
216    }
217
218    fn get_db(&self) -> i64 {
219        self.connection.get_db()
220    }
221}