1use std::time::Duration;
2
3use futures::future::BoxFuture;
4use tokio::time::sleep;
5
6use crate::{CheckoutError, CheckoutErrorKind, Poolable};
7
8use pool_internal::Managed;
9
10mod checkout_constraint;
11pub(crate) mod pool_internal;
12mod pool_per_node;
13mod single_pool;
14
15pub(crate) use self::checkout_constraint::*;
16pub(crate) use self::pool_per_node::PoolPerNode;
17pub(crate) use self::single_pool::SinglePool;
18
19pub(crate) trait CanCheckout<T: Poolable> {
21 fn check_out<'a, M: Into<CheckoutConstraint> + Send + 'static>(
23 &'a self,
24 constraint: M,
25 ) -> BoxFuture<'a, Result<Managed<T>, CheckoutError>>;
26}
27
28pub(crate) async fn check_out_maybe_retry_on_queue_limit_reached<P, T, M>(
32 pool: &P,
33 constraint: M,
34 retry_enabled: bool,
35) -> Result<Managed<T>, CheckoutError>
36where
37 P: CanCheckout<T> + Clone + Send + 'static,
38 T: Poolable,
39 M: Into<CheckoutConstraint> + Send + 'static,
40{
41 if !retry_enabled {
42 return pool.check_out(constraint).await;
43 }
44
45 let constraint = constraint.into();
46
47 match pool.check_out(constraint).await {
48 Ok(conn) => Ok(conn),
49 Err(err) => {
50 if err.kind() != CheckoutErrorKind::CheckoutLimitReached {
51 Err(err)
52 } else {
53 retry_on_queue_limit_reached(pool, constraint, err.kind()).await
54 }
55 }
56 }
57}
58
59async fn retry_on_queue_limit_reached<P, T>(
60 pool: &P,
61 constraint: CheckoutConstraint,
62 last_err: CheckoutErrorKind,
63) -> Result<Managed<T>, CheckoutError>
64where
65 P: CanCheckout<T> + Send + 'static,
66 T: Poolable,
67{
68 loop {
69 if !constraint.can_wait_for_dispatch() {
70 return Err(CheckoutError::from(last_err));
71 }
72
73 match pool.check_out(constraint).await {
74 Ok(conn) => return Ok(conn),
75 Err(err) => {
76 if err.kind() != CheckoutErrorKind::CheckoutLimitReached {
77 return Err(err);
78 }
79
80 sleep(Duration::from_millis(1)).await;
81 }
82 }
83 }
84}