1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
use crate::{executor::Executor, Database};
use actix::SyncArbiter;
use diesel::{
    r2d2::{ConnectionManager, CustomizeConnection, Error as R2D2Error, Pool},
    Connection,
};
use once_cell::sync::OnceCell;
use std::{
    fmt::{self, Debug},
    marker::PhantomData,
    sync::Arc,
    time::Duration,
};

/// Settings gathered before a [`Database`] is opened for connection type `C`.
///
/// Each option is wrapped in an outer `Option`: `None` means the setting was
/// never supplied, and `open` then leaves the corresponding pool-builder
/// value untouched.
pub struct Builder<C: 'static>
where
    C: Connection,
{
    /// Ties the builder to the connection type without storing a connection.
    pub(crate) phantom: PhantomData<C>,
    /// Value passed to the pool builder's `max_size`, if supplied.
    pub(crate) pool_max_size: Option<u32>,
    /// Value passed to the pool builder's `min_idle`, if supplied. The inner
    /// `Option` is the argument itself, so `Some(None)` is forwarded as `None`.
    pub(crate) pool_min_idle: Option<Option<u32>>,
    /// Value passed to the pool builder's `max_lifetime`, if supplied. The
    /// inner `Option` is the argument itself, as for `pool_min_idle`.
    pub(crate) pool_max_lifetime: Option<Option<Duration>>,
    /// Callback run from the connection customizer's `on_acquire` hook.
    pub(crate) on_acquire: Option<Box<dyn Fn(&mut C) -> Result<(), R2D2Error> + Send + Sync>>,
    /// Callback run from the connection customizer's `on_release` hook; it
    /// receives the connection by value.
    pub(crate) on_release: Option<Box<dyn Fn(C) + Send + Sync>>,
}

impl<C> Builder<C>
where
    C: Connection,
{
    /// Records the value that `open` will hand to the pool builder's
    /// `max_size`.
    #[inline]
    pub fn pool_max_size(&mut self, max_size: u32) -> &mut Self {
        self.pool_max_size = Some(max_size);
        self
    }

    /// Records the value that `open` will hand to the pool builder's
    /// `min_idle`. Passing `None` is still recorded and forwarded as `None`.
    #[inline]
    pub fn pool_min_idle(&mut self, min_idle: Option<u32>) -> &mut Self {
        self.pool_min_idle = Some(min_idle);
        self
    }

    /// Records the value that `open` will hand to the pool builder's
    /// `max_lifetime`. Passing `None` is still recorded and forwarded as `None`.
    #[inline]
    pub fn pool_max_lifetime(&mut self, max_lifetime: Option<Duration>) -> &mut Self {
        self.pool_max_lifetime = Some(max_lifetime);
        self
    }

    /// Stores a callback that the connection customizer installed by `open`
    /// invokes from its `on_acquire` hook. A previously stored callback is
    /// replaced.
    #[inline]
    pub fn on_acquire(
        &mut self,
        on_acquire: impl Fn(&mut C) -> Result<(), R2D2Error> + 'static + Send + Sync,
    ) -> &mut Self {
        self.on_acquire = Some(Box::new(on_acquire));
        self
    }

    /// Stores a callback that the connection customizer installed by `open`
    /// invokes from its `on_release` hook. A previously stored callback is
    /// replaced.
    #[inline]
    pub fn on_release(&mut self, on_release: impl Fn(C) + 'static + Send + Sync) -> &mut Self {
        self.on_release = Some(Box::new(on_release));
        self
    }

    /// Creates a [`Database`] whose pool manages connections to `url`.
    ///
    /// Only the settings that were explicitly supplied are applied to the
    /// pool builder. The pool itself is created with `build_unchecked`.
    ///
    /// The stored callbacks are moved out of this builder, so calling `open`
    /// again on the same builder will not install them a second time.
    pub fn open(&mut self, url: impl Into<String>) -> Database<C> {
        let manager = ConnectionManager::<C>::new(url);

        // Each stage either applies one recorded setting or passes the pool
        // builder through unchanged.
        let stage = Pool::builder();
        let stage = match self.pool_max_size {
            Some(limit) => stage.max_size(limit),
            None => stage,
        };
        let stage = match self.pool_min_idle {
            Some(idle) => stage.min_idle(idle),
            None => stage,
        };
        let stage = match self.pool_max_lifetime {
            Some(lifetime) => stage.max_lifetime(lifetime),
            None => stage,
        };

        // Move both callbacks out; a customizer is installed only when at
        // least one of them was provided.
        let on_acquire = self.on_acquire.take();
        let on_release = self.on_release.take();
        let stage = if on_acquire.is_none() && on_release.is_none() {
            stage
        } else {
            stage.connection_customizer(Box::new(FnConnectionCustomizer {
                on_acquire,
                on_release,
            }))
        };

        Database {
            pool: stage.build_unchecked(manager),
            cell: Arc::new(OnceCell::new()),
            // Starts a `SyncArbiter` with one thread per `num_cpus::get()`,
            // each running an `Executor` over a clone of the pool.
            init: |shared| SyncArbiter::start(num_cpus::get(), move || Executor(shared.clone())),
        }
    }
}

struct FnConnectionCustomizer<C: 'static> {
    on_acquire: Option<Box<Fn(&mut C) -> Result<(), R2D2Error> + Send + Sync>>,
    on_release: Option<Box<Fn(C) + Send + Sync>>,
}

/// Manual `Debug` implementation that prints only the type name; the boxed
/// closure fields are not shown.
impl<C> Debug for FnConnectionCustomizer<C> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "FnConnectionCustomizer")
    }
}

/// Forwards each pool hook to the matching stored closure, if one is present.
impl<C> CustomizeConnection<C, R2D2Error> for FnConnectionCustomizer<C> {
    /// Runs the stored acquire callback on `conn` and returns its result, or
    /// `Ok(())` when no callback is stored.
    #[inline]
    fn on_acquire(&self, conn: &mut C) -> Result<(), R2D2Error> {
        match self.on_acquire.as_ref() {
            Some(hook) => hook(conn),
            None => Ok(()),
        }
    }

    /// Hands `conn` to the stored release callback; without a callback the
    /// connection is simply dropped here.
    #[inline]
    fn on_release(&self, conn: C) {
        match self.on_release.as_ref() {
            Some(hook) => hook(conn),
            None => {}
        }
    }
}