reool/
pool_connection.rs

1use std::io::{Error as IoError, ErrorKind as IoErrorKind};
2use std::{
3    sync::Arc,
4    time::{Duration, Instant},
5};
6
7use futures::prelude::*;
8use redis::{aio::ConnectionLike, Cmd, ErrorKind, Pipeline, RedisError, RedisFuture, Value};
9use tokio::time::timeout;
10
11use crate::pools::pool_internal::Managed;
12use crate::{config::DefaultCommandTimeout, Poolable};
13
14/// A connection that has been taken from the pool.
15///
16/// The connection returns when dropped unless there was an error.
17///
18/// Pooled connection implements `redis::async::ConnectionLike`
19/// to easily integrate with code that already uses `redis-rs`.
20pub struct PoolConnection<T: Poolable = ConnectionFlavour> {
21    /// Track whether the connection is still in a valid state.
22    ///
23    /// If a future gets cancelled it is likely that the connection
24    /// is not in a valid state anymore. For stateless connections this
25    /// field is useless.
26    pub(crate) connection_state_ok: bool,
27    pub(crate) managed: Option<Managed<T>>,
28    pub(crate) command_timeout: Option<Duration>,
29}
30
31impl<T: Poolable> PoolConnection<T> {
32    pub fn default_command_timeout<TO: Into<DefaultCommandTimeout>>(&mut self, timeout: TO) {
33        self.command_timeout = timeout.into().to_duration_opt();
34    }
35
36    fn get_connection(&mut self) -> Result<&mut T, IoError> {
37        let managed = if let Some(managed) = &mut self.managed {
38            managed
39        } else {
40            return Err(IoError::new(
41                IoErrorKind::ConnectionAborted,
42                "connection is broken due to a previous io error",
43            ));
44        };
45
46        if let Some(connection) = managed.connection_mut() {
47            Ok(connection)
48        } else {
49            Err(IoError::new(
50                IoErrorKind::ConnectionAborted,
51                "inner connection is invalid. THIS IS A BUG!",
52            ))
53        }
54    }
55
56    /// Invalidate the managed internal connection to prevent it from returning
57    /// to the pool and also immediately drop the invalidated managed connection to
58    /// trigger the creation of a new one
59    fn invalidate(&mut self) {
60        if let Some(mut managed) = self.managed.take() {
61            managed.invalidate()
62        }
63        self.managed = None;
64    }
65}
66
67impl<T: Poolable> ConnectionLike for PoolConnection<T>
68where
69    T: ConnectionLike,
70{
71    fn req_packed_command<'a>(&'a mut self, cmd: &'a Cmd) -> RedisFuture<'a, Value> {
72        async move {
73            self.connection_state_ok = false;
74            let command_timeout = self.command_timeout;
75
76            let conn = self.get_connection()?;
77
78            let f = conn.req_packed_command(cmd);
79            let r = if let Some(command_timeout) = command_timeout {
80                let started = Instant::now();
81                match timeout(command_timeout, f).await {
82                    Ok(r) => r,
83                    Err(_) => {
84                        let message = format!(
85                            "command timeout after {:?} on `req_packed_command`.",
86                            started.elapsed()
87                        );
88                        let err: RedisError =
89                            (ErrorKind::IoError, "command timeout", message).into();
90                        Err(err)
91                    }
92                }
93            } else {
94                f.await
95            };
96
97            match r {
98                Ok(value) => {
99                    self.connection_state_ok = true;
100
101                    Ok(value)
102                }
103                Err(err) => {
104                    match err.kind() {
105                        // ErrorKind::ResponseError is a hack because the
106                        // parsing files with 0 bytes and an unexpected EOF
107                        // This behaviour need clarification.
108                        // See https://github.com/mitsuhiko/redis-rs/issues/320
109                        ErrorKind::IoError | ErrorKind::ResponseError => {
110                            // TODO: Can we get a new connection?
111                            self.invalidate();
112                        }
113                        _ => {
114                            self.connection_state_ok = true;
115                        }
116                    }
117                    Err(err)
118                }
119            }
120        }
121        .boxed()
122    }
123
124    fn req_packed_commands<'a>(
125        &'a mut self,
126        pipeline: &'a Pipeline,
127        offset: usize,
128        count: usize,
129    ) -> RedisFuture<'a, Vec<Value>> {
130        async move {
131            self.connection_state_ok = false;
132            let command_timeout = self.command_timeout;
133
134            let conn = self.get_connection()?;
135
136            let f = conn.req_packed_commands(pipeline, offset, count);
137            let r = if let Some(command_timeout) = command_timeout {
138                let started = Instant::now();
139                match timeout(command_timeout, f).await {
140                    Ok(r) => r,
141                    Err(_) => {
142                        let message = format!(
143                            "command timeout after {:?} on `req_packed_commands`.",
144                            started.elapsed()
145                        );
146                        let err: RedisError =
147                            (ErrorKind::IoError, "command timeout", message).into();
148                        Err(err)
149                    }
150                }
151            } else {
152                f.await
153            };
154
155            match r {
156                Ok(values) => {
157                    self.connection_state_ok = true;
158
159                    Ok(values)
160                }
161                Err(err) => {
162                    match err.kind() {
163                        // ErrorKind::ResponseError is a hack because the
164                        // parsing files with 0 bytes and an unexpected EOF
165                        // This behaviour need clarification.
166                        // See https://github.com/mitsuhiko/redis-rs/issues/320
167                        ErrorKind::IoError | ErrorKind::ResponseError => {
168                            // TODO: Can we get a new connection?
169                            self.invalidate();
170                        }
171                        _ => {
172                            self.connection_state_ok = true;
173                        }
174                    }
175                    Err(err)
176                }
177            }
178        }
179        .boxed()
180    }
181
182    fn get_db(&self) -> i64 {
183        if let Some(conn) = self.managed.as_ref() {
184            conn.get_db()
185        } else {
186            -1
187        }
188    }
189}
190
191impl<T: Poolable> Drop for PoolConnection<T> {
192    fn drop(&mut self) {
193        if !self.connection_state_ok {
194            self.invalidate();
195        }
196    }
197}
198
199pub enum ConnectionFlavour {
200    RedisRs(redis::aio::Connection, Arc<String>),
201    // Tls(?)
202}
203
204impl Poolable for ConnectionFlavour {
205    fn connected_to(&self) -> &str {
206        match self {
207            ConnectionFlavour::RedisRs(_, c) => &c,
208        }
209    }
210}
211
212impl ConnectionLike for ConnectionFlavour {
213    fn req_packed_command<'a>(&'a mut self, cmd: &'a Cmd) -> RedisFuture<'a, Value> {
214        match self {
215            ConnectionFlavour::RedisRs(conn, _uri) => conn.req_packed_command(cmd),
216        }
217    }
218
219    fn req_packed_commands<'a>(
220        &'a mut self,
221        pipeline: &'a Pipeline,
222        offset: usize,
223        count: usize,
224    ) -> RedisFuture<'a, Vec<Value>> {
225        match self {
226            ConnectionFlavour::RedisRs(conn, _) => {
227                conn.req_packed_commands(pipeline, offset, count)
228            }
229        }
230    }
231
232    fn get_db(&self) -> i64 {
233        match self {
234            ConnectionFlavour::RedisRs(conn, _) => conn.get_db(),
235        }
236    }
237}
238
239impl<T: Poolable> ConnectionLike for Managed<T>
240where
241    T: ConnectionLike,
242{
243    fn req_packed_command<'a>(&'a mut self, cmd: &'a Cmd) -> RedisFuture<'a, Value> {
244        async move {
245            let conn = match self.connection_mut() {
246                Some(conn) => conn,
247                None => {
248                    return Err(
249                        (ErrorKind::IoError, "no connection - this is a bug of reool").into(),
250                    )
251                }
252            };
253
254            let value = conn.req_packed_command(cmd).await?;
255
256            Ok(value)
257        }
258        .boxed()
259    }
260
261    fn req_packed_commands<'a>(
262        &'a mut self,
263        pipeline: &'a Pipeline,
264        offset: usize,
265        count: usize,
266    ) -> RedisFuture<'a, Vec<Value>> {
267        async move {
268            let conn = match self.connection_mut() {
269                Some(conn) => conn,
270                None => {
271                    return Err(
272                        (ErrorKind::IoError, "no connection - this is a bug of reool").into(),
273                    )
274                }
275            };
276
277            let values = conn.req_packed_commands(pipeline, offset, count).await?;
278
279            Ok(values)
280        }
281        .boxed()
282    }
283
284    fn get_db(&self) -> i64 {
285        if let Some(conn) = self.connection() {
286            conn.get_db()
287        } else {
288            -1
289        }
290    }
291}