actix_diesel/
db.rs

1use crate::{
2    executor::{Execute, Executor},
3    AsyncError, Builder,
4};
5use actix::Addr;
6use diesel::{
7    r2d2::{ConnectionManager, Pool},
8    Connection,
9};
10use futures::Future;
11use once_cell::sync::OnceCell;
12use std::{fmt::Debug, marker::PhantomData, sync::Arc};
13
14pub struct Database<C: 'static>
15where
16    C: Connection,
17{
18    pub(crate) cell: Arc<OnceCell<Addr<Executor<C>>>>,
19    pub(crate) pool: Pool<ConnectionManager<C>>,
20    pub(crate) init: fn(Pool<ConnectionManager<C>>) -> Addr<Executor<C>>,
21}
22
23impl<C> Clone for Database<C>
24where
25    C: Connection,
26{
27    fn clone(&self) -> Self {
28        Database {
29            cell: self.cell.clone(),
30            init: self.init.clone(),
31            pool: self.pool.clone(),
32        }
33    }
34}
35
36impl<C> Database<C>
37where
38    C: Connection,
39{
40    #[inline]
41    pub fn open(url: impl Into<String>) -> Database<C> {
42        Self::builder().open(url)
43    }
44
45    #[inline]
46    pub fn builder() -> Builder<C> {
47        Builder {
48            phantom: PhantomData,
49            pool_max_size: None,
50            pool_min_idle: None,
51            pool_max_lifetime: None,
52            on_acquire: None,
53            on_release: None,
54        }
55    }
56
57    /// Executes the given function inside a database transaction.
58    #[inline]
59    pub fn transaction<F, R, E>(&self, f: F) -> impl Future<Item = R, Error = AsyncError<E>>
60    where
61        F: 'static + FnOnce(&C) -> Result<R, E> + Send,
62        R: 'static + Send,
63        E: 'static + From<diesel::result::Error> + Debug + Send + Sync,
64    {
65        self.get(move |conn| conn.transaction(move || f(conn)))
66    }
67
68    /// Executes the given function with a connection retrieved from the pool.
69    ///
70    /// This is non-blocking and uses a `SyncArbiter` to provide a thread pool.
71    pub fn get<F, R, E>(&self, f: F) -> impl Future<Item = R, Error = AsyncError<E>>
72    where
73        F: 'static + FnOnce(&C) -> Result<R, E> + Send,
74        R: 'static + Send,
75        E: 'static + Debug + Send + Sync,
76    {
77        self.cell
78            .get_or_init(|| (self.init)(self.pool.clone()))
79            .send(Execute(f, PhantomData))
80            .then(|res| -> Result<R, AsyncError<E>> {
81                match res {
82                    Ok(res) => match res {
83                        Ok(res) => match res {
84                            Ok(value) => Ok(value),
85                            Err(err) => Err(AsyncError::Execute(err)),
86                        },
87
88                        Err(err) => Err(AsyncError::Timeout(err)),
89                    },
90
91                    Err(err) => Err(AsyncError::Delivery(err)),
92                }
93            })
94    }
95}