tang_rs/
manager.rs

1use core::fmt::Debug;
2use core::future::Future;
3use core::pin::Pin;
4use core::time::Duration;
5#[cfg(feature = "no-send")]
6use std::rc::Rc as WrapPointer;
7#[cfg(not(feature = "no-send"))]
8use std::sync::Arc as WrapPointer;
9
10use crate::pool::SharedManagedPool;
11use crate::util::timeout::ManagerTimeout;
12
/// Boxed and pinned future type returned by the manager traits in this module.
///
/// This is the default flavour: the boxed future is additionally required to be `Send`.
#[cfg(not(feature = "no-send"))]
pub type ManagerFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

/// Boxed and pinned future type returned by the manager traits in this module.
///
/// This flavour is selected by the `no-send` feature and drops the `Send` requirement.
#[cfg(feature = "no-send")]
pub type ManagerFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;
18
19/// # Types for different runtimes:
20///
21/// Trait type                  runtime                 Type                        constructor
22/// <Manager::Timeout>          tokio                   tokio::time::Delay          tokio::time::delay_for
23///                             async-std               smol::Timer                 smol::Timer::after
24///
25/// <Manager::TimeoutError>     tokio                   ()
26///                             async-std               std::time::Instant
27pub trait Manager: Sized + Send + Sync + 'static {
28    type Connection: Unpin + 'static;
29    type Error: Debug + From<Self::TimeoutError> + 'static;
30    type Timeout: Future<Output = Self::TimeoutError> + Send;
31    type TimeoutError: Send + Debug + 'static;
32
33    /// generate a new connection and put it into pool.
34    fn connect(&self) -> ManagerFuture<Result<Self::Connection, Self::Error>>;
35
36    /// check if a connection is valid.
37    ///
38    /// *. Only called when `Builder.always_check == true`
39    fn is_valid<'a>(
40        &'a self,
41        conn: &'a mut Self::Connection,
42    ) -> ManagerFuture<'a, Result<(), Self::Error>>;
43
44    /// check if a connection is closed.
45    ///
46    /// This happens before a connection put back to pool.
47    fn is_closed(&self, conn: &mut Self::Connection) -> bool;
48
49    /// spawn futures on your executor
50    ///
51    /// The future have to be `Send + 'static` and the return type(e.g. JoinHandler) of your executor will be ignored.
52    #[cfg(not(feature = "no-send"))]
53    fn spawn<Fut>(&self, fut: Fut)
54    where
55        Fut: Future<Output = ()> + Send + 'static;
56
57    #[cfg(feature = "no-send")]
58    fn spawn<Fut>(&self, fut: Fut)
59    where
60        Fut: Future<Output = ()> + 'static;
61
62    /// Used to cancel futures and return `Manager::TimeoutError`.
63    ///
64    /// The duration is determined by `Builder.wait_timeout` and `Builder.connection_timeout`
65    fn timeout<Fut: Future>(&self, fut: Fut, _dur: Duration) -> ManagerTimeout<Fut, Self::Timeout>;
66
67    /// This method will be called when `Pool<Manager>::init()` executes.
68    fn on_start(&self, _shared_pool: &SharedManagedPool<Self>) {}
69
70    /// This method will be called when `Pool<Manager>` is dropping
71    fn on_stop(&self) {}
72}
73
74/// helper trait to spawn default garbage collect process to `Pool<Manager>`.
75pub trait GarbageCollect: Manager + ManagerInterval {
76    fn garbage_collect(&self, shared_pool: &SharedManagedPool<Self>) {
77        let builder = shared_pool.get_builder();
78        if builder.use_gc {
79            let rate = builder.get_reaper_rate();
80            let shared_pool = WrapPointer::downgrade(shared_pool);
81
82            let mut interval = Self::interval(rate * 6);
83            self.spawn(async move {
84                loop {
85                    let _i = Self::tick(&mut interval).await;
86                    match shared_pool.upgrade() {
87                        Some(shared_pool) => {
88                            if shared_pool.is_running() {
89                                shared_pool.garbage_collect();
90                            }
91                        }
92                        None => break,
93                    }
94                }
95            });
96        }
97    }
98}
99
100/// helper trait to spawn default schedule reaping process to `Pool<Manager>`.
101pub trait ScheduleReaping: Manager + ManagerInterval {
102    // schedule reaping runs in a spawned future.
103    fn schedule_reaping(&self, shared_pool: &SharedManagedPool<Self>) {
104        let builder = shared_pool.get_builder();
105        if builder.max_lifetime.is_some() || builder.idle_timeout.is_some() {
106            let rate = builder.get_reaper_rate();
107
108            let shared_pool = WrapPointer::downgrade(shared_pool);
109
110            let mut interval = Self::interval(rate);
111            self.spawn(async move {
112                loop {
113                    let _i = Self::tick(&mut interval).await;
114                    match shared_pool.upgrade() {
115                        Some(shared_pool) => {
116                            if shared_pool.is_running() {
117                                let _ = shared_pool.reap_idle_conn().await;
118                            }
119                        }
120                        None => break,
121                    }
122                }
123            });
124        }
125    }
126}
127
128/// helper trait as we have different interval tick api in different runtime
129pub trait ManagerInterval {
130    type Interval: Send;
131    type Tick: Send;
132
133    fn interval(dur: Duration) -> Self::Interval;
134
135    fn tick(tick: &mut Self::Interval) -> ManagerFuture<'_, Self::Tick>;
136}