1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
use core::fmt::Debug;
use core::future::Future;
use core::pin::Pin;
use core::time::Duration;
#[cfg(feature = "no-send")]
use std::rc::Rc as WrapPointer;
#[cfg(not(feature = "no-send"))]
use std::sync::Arc as WrapPointer;

use crate::pool::SharedManagedPool;
use crate::util::timeout::ManagerTimeout;

#[cfg(not(feature = "no-send"))]
pub type ManagerFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

#[cfg(feature = "no-send")]
pub type ManagerFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;

/// # Types for different runtimes:
///
/// Trait type                  runtime                 Type                        constructor
/// <Manager::Timeout>          tokio                   tokio::time::Delay          tokio::time::delay_for
///                             async-std               smol::Timer                 smol::Timer::after
///
/// <Manager::TimeoutError>     tokio                   ()
///                             async-std               std::time::Instant
pub trait Manager: Sized + Send + Sync + 'static {
    type Connection: Unpin + 'static;
    type Error: Debug + From<Self::TimeoutError> + 'static;
    type Timeout: Future<Output = Self::TimeoutError> + Send;
    type TimeoutError: Send + Debug + 'static;

    /// generate a new connection and put it into pool.
    fn connect(&self) -> ManagerFuture<Result<Self::Connection, Self::Error>>;

    /// check if a connection is valid.
    ///
    /// *. Only called when `Builder.always_check == true`
    fn is_valid<'a>(
        &'a self,
        conn: &'a mut Self::Connection,
    ) -> ManagerFuture<'a, Result<(), Self::Error>>;

    /// check if a connection is closed.
    ///
    /// This happens before a connection put back to pool.
    fn is_closed(&self, conn: &mut Self::Connection) -> bool;

    /// spawn futures on your executor
    ///
    /// The future have to be `Send + 'static` and the return type(e.g. JoinHandler) of your executor will be ignored.
    #[cfg(not(feature = "no-send"))]
    fn spawn<Fut>(&self, fut: Fut)
    where
        Fut: Future<Output = ()> + Send + 'static;

    #[cfg(feature = "no-send")]
    fn spawn<Fut>(&self, fut: Fut)
    where
        Fut: Future<Output = ()> + 'static;

    /// Used to cancel futures and return `Manager::TimeoutError`.
    ///
    /// The duration is determined by `Builder.wait_timeout` and `Builder.connection_timeout`
    fn timeout<Fut: Future>(&self, fut: Fut, _dur: Duration) -> ManagerTimeout<Fut, Self::Timeout>;

    /// This method will be called when `Pool<Manager>::init()` executes.
    fn on_start(&self, _shared_pool: &SharedManagedPool<Self>) {}

    /// This method will be called when `Pool<Manager>` is dropping
    fn on_stop(&self) {}
}

/// helper trait to spawn default garbage collect process to `Pool<Manager>`.
pub trait GarbageCollect: Manager + ManagerInterval {
    fn garbage_collect(&self, shared_pool: &SharedManagedPool<Self>) {
        let builder = shared_pool.get_builder();
        if builder.use_gc {
            let rate = builder.get_reaper_rate();
            let shared_pool = WrapPointer::downgrade(shared_pool);

            let mut interval = Self::interval(rate * 6);
            self.spawn(async move {
                loop {
                    let _i = Self::tick(&mut interval).await;
                    match shared_pool.upgrade() {
                        Some(shared_pool) => {
                            if shared_pool.is_running() {
                                shared_pool.garbage_collect();
                            }
                        }
                        None => break,
                    }
                }
            });
        }
    }
}

/// helper trait to spawn default schedule reaping process to `Pool<Manager>`.
pub trait ScheduleReaping: Manager + ManagerInterval {
    // schedule reaping runs in a spawned future.
    fn schedule_reaping(&self, shared_pool: &SharedManagedPool<Self>) {
        let builder = shared_pool.get_builder();
        if builder.max_lifetime.is_some() || builder.idle_timeout.is_some() {
            let rate = builder.get_reaper_rate();

            let shared_pool = WrapPointer::downgrade(shared_pool);

            let mut interval = Self::interval(rate);
            self.spawn(async move {
                loop {
                    let _i = Self::tick(&mut interval).await;
                    match shared_pool.upgrade() {
                        Some(shared_pool) => {
                            if shared_pool.is_running() {
                                let _ = shared_pool.reap_idle_conn().await;
                            }
                        }
                        None => break,
                    }
                }
            });
        }
    }
}

/// helper trait as we have different interval tick api in different runtime
pub trait ManagerInterval {
    type Interval: Send;
    type Tick: Send;

    fn interval(dur: Duration) -> Self::Interval;

    fn tick(tick: &mut Self::Interval) -> ManagerFuture<'_, Self::Tick>;
}