reool/pools/
mod.rs

1use std::time::Duration;
2
3use futures::future::BoxFuture;
4use tokio::time::sleep;
5
6use crate::{CheckoutError, CheckoutErrorKind, Poolable};
7
8use pool_internal::Managed;
9
10mod checkout_constraint;
11pub(crate) mod pool_internal;
12mod pool_per_node;
13mod single_pool;
14
15pub(crate) use self::checkout_constraint::*;
16pub(crate) use self::pool_per_node::PoolPerNode;
17pub(crate) use self::single_pool::SinglePool;
18
19/// Something that can checkout
20pub(crate) trait CanCheckout<T: Poolable> {
21    /// Directly do a checkout
22    fn check_out<'a, M: Into<CheckoutConstraint> + Send + 'static>(
23        &'a self,
24        constraint: M,
25    ) -> BoxFuture<'a, Result<Managed<T>, CheckoutError>>;
26}
27
28/// Retry the checkout if the checkout failed with a
29/// `CheckoutLimitReached` as long as a retry is allowed
30/// by the constraint
31pub(crate) async fn check_out_maybe_retry_on_queue_limit_reached<P, T, M>(
32    pool: &P,
33    constraint: M,
34    retry_enabled: bool,
35) -> Result<Managed<T>, CheckoutError>
36where
37    P: CanCheckout<T> + Clone + Send + 'static,
38    T: Poolable,
39    M: Into<CheckoutConstraint> + Send + 'static,
40{
41    if !retry_enabled {
42        return pool.check_out(constraint).await;
43    }
44
45    let constraint = constraint.into();
46
47    match pool.check_out(constraint).await {
48        Ok(conn) => Ok(conn),
49        Err(err) => {
50            if err.kind() != CheckoutErrorKind::CheckoutLimitReached {
51                Err(err)
52            } else {
53                retry_on_queue_limit_reached(pool, constraint, err.kind()).await
54            }
55        }
56    }
57}
58
59async fn retry_on_queue_limit_reached<P, T>(
60    pool: &P,
61    constraint: CheckoutConstraint,
62    last_err: CheckoutErrorKind,
63) -> Result<Managed<T>, CheckoutError>
64where
65    P: CanCheckout<T> + Send + 'static,
66    T: Poolable,
67{
68    loop {
69        if !constraint.can_wait_for_dispatch() {
70            return Err(CheckoutError::from(last_err));
71        }
72
73        match pool.check_out(constraint).await {
74            Ok(conn) => return Ok(conn),
75            Err(err) => {
76                if err.kind() != CheckoutErrorKind::CheckoutLimitReached {
77                    return Err(err);
78                }
79
80                sleep(Duration::from_millis(1)).await;
81            }
82        }
83    }
84}