actix_diesel/
builder.rs

1use crate::{executor::Executor, Database};
2use actix::SyncArbiter;
3use diesel::{
4    r2d2::{ConnectionManager, CustomizeConnection, Error as R2D2Error, Pool},
5    Connection,
6};
7use once_cell::sync::OnceCell;
8use std::{
9    fmt::{self, Debug},
10    marker::PhantomData,
11    sync::Arc,
12    time::Duration,
13};
14
15pub struct Builder<C: 'static>
16where
17    C: Connection,
18{
19    pub(crate) phantom: PhantomData<C>,
20    pub(crate) pool_max_size: Option<u32>,
21    pub(crate) pool_min_idle: Option<Option<u32>>,
22    pub(crate) pool_max_lifetime: Option<Option<Duration>>,
23    pub(crate) on_acquire: Option<Box<Fn(&mut C) -> Result<(), R2D2Error> + Send + Sync>>,
24    pub(crate) on_release: Option<Box<Fn(C) + Send + Sync>>,
25}
26
27impl<C> Builder<C>
28where
29    C: Connection,
30{
31    #[inline]
32    pub fn pool_max_size(&mut self, max_size: u32) -> &mut Self {
33        self.pool_max_size = Some(max_size);
34        self
35    }
36
37    #[inline]
38    pub fn pool_min_idle(&mut self, min_idle: Option<u32>) -> &mut Self {
39        self.pool_min_idle = Some(min_idle);
40        self
41    }
42
43    #[inline]
44    pub fn pool_max_lifetime(&mut self, max_lifetime: Option<Duration>) -> &mut Self {
45        self.pool_max_lifetime = Some(max_lifetime);
46        self
47    }
48
49    #[inline]
50    pub fn on_acquire(
51        &mut self,
52        on_acquire: impl Fn(&mut C) -> Result<(), R2D2Error> + 'static + Send + Sync,
53    ) -> &mut Self {
54        self.on_acquire = Some(Box::new(on_acquire));
55        self
56    }
57
58    #[inline]
59    pub fn on_release(&mut self, on_release: impl Fn(C) + 'static + Send + Sync) -> &mut Self {
60        self.on_release = Some(Box::new(on_release));
61        self
62    }
63
64    pub fn open(&mut self, url: impl Into<String>) -> Database<C> {
65        let manager = ConnectionManager::<C>::new(url);
66        let mut p = Pool::builder();
67
68        if let Some(max_size) = self.pool_max_size {
69            p = p.max_size(max_size);
70        }
71
72        if let Some(min_idle) = self.pool_min_idle {
73            p = p.min_idle(min_idle);
74        }
75
76        if let Some(max_lifetime) = self.pool_max_lifetime {
77            p = p.max_lifetime(max_lifetime);
78        }
79
80        if self.on_acquire.is_some() || self.on_release.is_some() {
81            p = p.connection_customizer(Box::new(FnConnectionCustomizer {
82                on_acquire: self.on_acquire.take(),
83                on_release: self.on_release.take(),
84            }));
85        }
86
87        let pool = p.build_unchecked(manager);
88
89        Database {
90            pool,
91            cell: Arc::new(OnceCell::new()),
92            init: |pool| SyncArbiter::start(num_cpus::get(), move || Executor(pool.clone())),
93        }
94    }
95}
96
97struct FnConnectionCustomizer<C: 'static> {
98    on_acquire: Option<Box<Fn(&mut C) -> Result<(), R2D2Error> + Send + Sync>>,
99    on_release: Option<Box<Fn(C) + Send + Sync>>,
100}
101
102impl<C> Debug for FnConnectionCustomizer<C> {
103    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
104        f.write_str("FnConnectionCustomizer")
105    }
106}
107
108impl<C> CustomizeConnection<C, R2D2Error> for FnConnectionCustomizer<C> {
109    #[inline]
110    fn on_acquire(&self, conn: &mut C) -> Result<(), R2D2Error> {
111        if let Some(on_acquire) = &self.on_acquire {
112            (on_acquire)(conn)
113        } else {
114            Ok(())
115        }
116    }
117
118    #[inline]
119    fn on_release(&self, conn: C) {
120        if let Some(on_release) = &self.on_release {
121            (on_release)(conn)
122        }
123    }
124}