1use crate::{executor::Executor, Database};
2use actix::SyncArbiter;
3use diesel::{
4 r2d2::{ConnectionManager, CustomizeConnection, Error as R2D2Error, Pool},
5 Connection,
6};
7use once_cell::sync::OnceCell;
8use std::{
9 fmt::{self, Debug},
10 marker::PhantomData,
11 sync::Arc,
12 time::Duration,
13};
14
15pub struct Builder<C: 'static>
16where
17 C: Connection,
18{
19 pub(crate) phantom: PhantomData<C>,
20 pub(crate) pool_max_size: Option<u32>,
21 pub(crate) pool_min_idle: Option<Option<u32>>,
22 pub(crate) pool_max_lifetime: Option<Option<Duration>>,
23 pub(crate) on_acquire: Option<Box<Fn(&mut C) -> Result<(), R2D2Error> + Send + Sync>>,
24 pub(crate) on_release: Option<Box<Fn(C) + Send + Sync>>,
25}
26
27impl<C> Builder<C>
28where
29 C: Connection,
30{
31 #[inline]
32 pub fn pool_max_size(&mut self, max_size: u32) -> &mut Self {
33 self.pool_max_size = Some(max_size);
34 self
35 }
36
37 #[inline]
38 pub fn pool_min_idle(&mut self, min_idle: Option<u32>) -> &mut Self {
39 self.pool_min_idle = Some(min_idle);
40 self
41 }
42
43 #[inline]
44 pub fn pool_max_lifetime(&mut self, max_lifetime: Option<Duration>) -> &mut Self {
45 self.pool_max_lifetime = Some(max_lifetime);
46 self
47 }
48
49 #[inline]
50 pub fn on_acquire(
51 &mut self,
52 on_acquire: impl Fn(&mut C) -> Result<(), R2D2Error> + 'static + Send + Sync,
53 ) -> &mut Self {
54 self.on_acquire = Some(Box::new(on_acquire));
55 self
56 }
57
58 #[inline]
59 pub fn on_release(&mut self, on_release: impl Fn(C) + 'static + Send + Sync) -> &mut Self {
60 self.on_release = Some(Box::new(on_release));
61 self
62 }
63
64 pub fn open(&mut self, url: impl Into<String>) -> Database<C> {
65 let manager = ConnectionManager::<C>::new(url);
66 let mut p = Pool::builder();
67
68 if let Some(max_size) = self.pool_max_size {
69 p = p.max_size(max_size);
70 }
71
72 if let Some(min_idle) = self.pool_min_idle {
73 p = p.min_idle(min_idle);
74 }
75
76 if let Some(max_lifetime) = self.pool_max_lifetime {
77 p = p.max_lifetime(max_lifetime);
78 }
79
80 if self.on_acquire.is_some() || self.on_release.is_some() {
81 p = p.connection_customizer(Box::new(FnConnectionCustomizer {
82 on_acquire: self.on_acquire.take(),
83 on_release: self.on_release.take(),
84 }));
85 }
86
87 let pool = p.build_unchecked(manager);
88
89 Database {
90 pool,
91 cell: Arc::new(OnceCell::new()),
92 init: |pool| SyncArbiter::start(num_cpus::get(), move || Executor(pool.clone())),
93 }
94 }
95}
96
97struct FnConnectionCustomizer<C: 'static> {
98 on_acquire: Option<Box<Fn(&mut C) -> Result<(), R2D2Error> + Send + Sync>>,
99 on_release: Option<Box<Fn(C) + Send + Sync>>,
100}
101
102impl<C> Debug for FnConnectionCustomizer<C> {
103 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
104 f.write_str("FnConnectionCustomizer")
105 }
106}
107
108impl<C> CustomizeConnection<C, R2D2Error> for FnConnectionCustomizer<C> {
109 #[inline]
110 fn on_acquire(&self, conn: &mut C) -> Result<(), R2D2Error> {
111 if let Some(on_acquire) = &self.on_acquire {
112 (on_acquire)(conn)
113 } else {
114 Ok(())
115 }
116 }
117
118 #[inline]
119 fn on_release(&self, conn: C) {
120 if let Some(on_release) = &self.on_release {
121 (on_release)(conn)
122 }
123 }
124}