1use std::io::{Error as IoError, ErrorKind as IoErrorKind};
2use std::{
3 sync::Arc,
4 time::{Duration, Instant},
5};
6
7use futures::prelude::*;
8use redis::{aio::ConnectionLike, Cmd, ErrorKind, Pipeline, RedisError, RedisFuture, Value};
9use tokio::time::timeout;
10
11use crate::pools::pool_internal::Managed;
12use crate::{config::DefaultCommandTimeout, Poolable};
13
14pub struct PoolConnection<T: Poolable = ConnectionFlavour> {
21 pub(crate) connection_state_ok: bool,
27 pub(crate) managed: Option<Managed<T>>,
28 pub(crate) command_timeout: Option<Duration>,
29}
30
31impl<T: Poolable> PoolConnection<T> {
32 pub fn default_command_timeout<TO: Into<DefaultCommandTimeout>>(&mut self, timeout: TO) {
33 self.command_timeout = timeout.into().to_duration_opt();
34 }
35
36 fn get_connection(&mut self) -> Result<&mut T, IoError> {
37 let managed = if let Some(managed) = &mut self.managed {
38 managed
39 } else {
40 return Err(IoError::new(
41 IoErrorKind::ConnectionAborted,
42 "connection is broken due to a previous io error",
43 ));
44 };
45
46 if let Some(connection) = managed.connection_mut() {
47 Ok(connection)
48 } else {
49 Err(IoError::new(
50 IoErrorKind::ConnectionAborted,
51 "inner connection is invalid. THIS IS A BUG!",
52 ))
53 }
54 }
55
56 fn invalidate(&mut self) {
60 if let Some(mut managed) = self.managed.take() {
61 managed.invalidate()
62 }
63 self.managed = None;
64 }
65}
66
67impl<T: Poolable> ConnectionLike for PoolConnection<T>
68where
69 T: ConnectionLike,
70{
71 fn req_packed_command<'a>(&'a mut self, cmd: &'a Cmd) -> RedisFuture<'a, Value> {
72 async move {
73 self.connection_state_ok = false;
74 let command_timeout = self.command_timeout;
75
76 let conn = self.get_connection()?;
77
78 let f = conn.req_packed_command(cmd);
79 let r = if let Some(command_timeout) = command_timeout {
80 let started = Instant::now();
81 match timeout(command_timeout, f).await {
82 Ok(r) => r,
83 Err(_) => {
84 let message = format!(
85 "command timeout after {:?} on `req_packed_command`.",
86 started.elapsed()
87 );
88 let err: RedisError =
89 (ErrorKind::IoError, "command timeout", message).into();
90 Err(err)
91 }
92 }
93 } else {
94 f.await
95 };
96
97 match r {
98 Ok(value) => {
99 self.connection_state_ok = true;
100
101 Ok(value)
102 }
103 Err(err) => {
104 match err.kind() {
105 ErrorKind::IoError | ErrorKind::ResponseError => {
110 self.invalidate();
112 }
113 _ => {
114 self.connection_state_ok = true;
115 }
116 }
117 Err(err)
118 }
119 }
120 }
121 .boxed()
122 }
123
124 fn req_packed_commands<'a>(
125 &'a mut self,
126 pipeline: &'a Pipeline,
127 offset: usize,
128 count: usize,
129 ) -> RedisFuture<'a, Vec<Value>> {
130 async move {
131 self.connection_state_ok = false;
132 let command_timeout = self.command_timeout;
133
134 let conn = self.get_connection()?;
135
136 let f = conn.req_packed_commands(pipeline, offset, count);
137 let r = if let Some(command_timeout) = command_timeout {
138 let started = Instant::now();
139 match timeout(command_timeout, f).await {
140 Ok(r) => r,
141 Err(_) => {
142 let message = format!(
143 "command timeout after {:?} on `req_packed_commands`.",
144 started.elapsed()
145 );
146 let err: RedisError =
147 (ErrorKind::IoError, "command timeout", message).into();
148 Err(err)
149 }
150 }
151 } else {
152 f.await
153 };
154
155 match r {
156 Ok(values) => {
157 self.connection_state_ok = true;
158
159 Ok(values)
160 }
161 Err(err) => {
162 match err.kind() {
163 ErrorKind::IoError | ErrorKind::ResponseError => {
168 self.invalidate();
170 }
171 _ => {
172 self.connection_state_ok = true;
173 }
174 }
175 Err(err)
176 }
177 }
178 }
179 .boxed()
180 }
181
182 fn get_db(&self) -> i64 {
183 if let Some(conn) = self.managed.as_ref() {
184 conn.get_db()
185 } else {
186 -1
187 }
188 }
189}
190
191impl<T: Poolable> Drop for PoolConnection<T> {
192 fn drop(&mut self) {
193 if !self.connection_state_ok {
194 self.invalidate();
195 }
196 }
197}
198
199pub enum ConnectionFlavour {
200 RedisRs(redis::aio::Connection, Arc<String>),
201 }
203
204impl Poolable for ConnectionFlavour {
205 fn connected_to(&self) -> &str {
206 match self {
207 ConnectionFlavour::RedisRs(_, c) => &c,
208 }
209 }
210}
211
212impl ConnectionLike for ConnectionFlavour {
213 fn req_packed_command<'a>(&'a mut self, cmd: &'a Cmd) -> RedisFuture<'a, Value> {
214 match self {
215 ConnectionFlavour::RedisRs(conn, _uri) => conn.req_packed_command(cmd),
216 }
217 }
218
219 fn req_packed_commands<'a>(
220 &'a mut self,
221 pipeline: &'a Pipeline,
222 offset: usize,
223 count: usize,
224 ) -> RedisFuture<'a, Vec<Value>> {
225 match self {
226 ConnectionFlavour::RedisRs(conn, _) => {
227 conn.req_packed_commands(pipeline, offset, count)
228 }
229 }
230 }
231
232 fn get_db(&self) -> i64 {
233 match self {
234 ConnectionFlavour::RedisRs(conn, _) => conn.get_db(),
235 }
236 }
237}
238
239impl<T: Poolable> ConnectionLike for Managed<T>
240where
241 T: ConnectionLike,
242{
243 fn req_packed_command<'a>(&'a mut self, cmd: &'a Cmd) -> RedisFuture<'a, Value> {
244 async move {
245 let conn = match self.connection_mut() {
246 Some(conn) => conn,
247 None => {
248 return Err(
249 (ErrorKind::IoError, "no connection - this is a bug of reool").into(),
250 )
251 }
252 };
253
254 let value = conn.req_packed_command(cmd).await?;
255
256 Ok(value)
257 }
258 .boxed()
259 }
260
261 fn req_packed_commands<'a>(
262 &'a mut self,
263 pipeline: &'a Pipeline,
264 offset: usize,
265 count: usize,
266 ) -> RedisFuture<'a, Vec<Value>> {
267 async move {
268 let conn = match self.connection_mut() {
269 Some(conn) => conn,
270 None => {
271 return Err(
272 (ErrorKind::IoError, "no connection - this is a bug of reool").into(),
273 )
274 }
275 };
276
277 let values = conn.req_packed_commands(pipeline, offset, count).await?;
278
279 Ok(values)
280 }
281 .boxed()
282 }
283
284 fn get_db(&self) -> i64 {
285 if let Some(conn) = self.connection() {
286 conn.get_db()
287 } else {
288 -1
289 }
290 }
291}