1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
use std::time::{Duration, Instant};
use futures::future::{self, Future, Loop};
use log::error;
use tokio::timer::Delay;
use crate::{CheckoutError, CheckoutErrorKind, Poolable};
use pool_internal::CheckoutManaged;
mod checkout_constraint;
pub(crate) mod pool_internal;
mod pool_per_node;
mod single_pool;
pub(crate) use self::checkout_constraint::*;
pub(crate) use self::pool_per_node::PoolPerNode;
pub(crate) use self::single_pool::SinglePool;
/// Abstraction over anything a `T` can be checked out from.
///
/// The retry helpers in this module are generic over this trait instead of
/// being tied to one concrete pool type.
pub(crate) trait CanCheckout<T: Poolable> {
/// Requests a `T`, subject to `constraint`.
///
/// `constraint` may be any value convertible into a `CheckoutConstraint`.
/// The result is handed back as a `CheckoutManaged<T>`, which the callers in
/// this file consume as a future (they chain `or_else` / `then` on it).
fn check_out<M: Into<CheckoutConstraint>>(&self, constraint: M) -> CheckoutManaged<T>;
}
pub(crate) fn check_out_maybe_retry_on_queue_limit_reached<P, T, M>(
pool: &P,
constraint: M,
retry_enabled: bool,
) -> CheckoutManaged<T>
where
P: CanCheckout<T> + Clone + Send + 'static,
T: Poolable,
M: Into<CheckoutConstraint>,
{
if !retry_enabled {
pool.check_out(constraint)
} else {
let pool = pool.clone();
let constraint = constraint.into();
CheckoutManaged::new(pool.check_out(constraint).or_else(move |err| {
if err.kind() != CheckoutErrorKind::CheckoutLimitReached {
CheckoutManaged::error(err.kind())
} else {
retry_on_queue_limit_reached(pool, constraint, err.kind())
}
}))
}
}
fn retry_on_queue_limit_reached<P, T>(
pool: P,
constraint: CheckoutConstraint,
last_err: CheckoutErrorKind,
) -> CheckoutManaged<T>
where
P: CanCheckout<T> + Send + 'static,
T: Poolable,
{
CheckoutManaged::new(future::loop_fn(
(pool, last_err),
move |(pool, last_err)| {
if !constraint.can_wait_for_dispatch() {
Box::new(future::err(CheckoutError::from(last_err)))
} else {
let f = pool.check_out(constraint).then(|r| match r {
Ok(conn) => Box::new(future::ok(Loop::Break(conn))),
Err(err) => {
if err.kind() != CheckoutErrorKind::CheckoutLimitReached {
Box::new(future::err(err))
} else {
let delayed = Delay::new(Instant::now() + Duration::from_millis(1))
.map_err(|err| {
error!("A timer error occurred: {}", err);
CheckoutError::new(CheckoutErrorKind::TaskExecution)
})
.map(move |()| Loop::Continue((pool, err.kind())));
Box::new(delayed) as Box<dyn Future<Item = _, Error = _> + Send>
}
}
});
Box::new(f) as Box<dyn Future<Item = _, Error = _> + Send>
}
},
))
}