palpo_data/
pool.rs

1use std::ops::Deref;
2use std::time::Duration;
3
4use diesel::prelude::*;
5use diesel::r2d2::{self, ConnectionManager, State};
6use thiserror::Error;
7
8use super::{DbConfig, connection_url};
9
10pub type PgPool = r2d2::Pool<ConnectionManager<PgConnection>>;
11pub type PgPooledConnection = r2d2::PooledConnection<ConnectionManager<PgConnection>>;
12
13#[derive(Clone, Debug)]
14pub struct DieselPool {
15    inner: PgPool,
16}
17
18impl DieselPool {
19    pub(crate) fn new(
20        url: &String,
21        config: &DbConfig,
22        r2d2_config: r2d2::Builder<ConnectionManager<PgConnection>>,
23    ) -> Result<DieselPool, PoolError> {
24        let manager = ConnectionManager::new(connection_url(config, url));
25
26        let pool = DieselPool {
27            inner: r2d2_config.build_unchecked(manager),
28        };
29        match pool.wait_until_healthy(Duration::from_secs(5)) {
30            Ok(()) => {}
31            Err(PoolError::UnhealthyPool) => {}
32            Err(err) => return Err(err),
33        }
34
35        Ok(pool)
36    }
37
38    pub fn new_background_worker(inner: r2d2::Pool<ConnectionManager<PgConnection>>) -> Self {
39        Self { inner }
40    }
41
42    pub fn get(&self) -> Result<PgPooledConnection, PoolError> {
43        Ok(self.inner.get()?)
44    }
45
46    pub fn state(&self) -> State {
47        self.inner.state()
48    }
49
50    pub fn wait_until_healthy(&self, timeout: Duration) -> Result<(), PoolError> {
51        match self.inner.get_timeout(timeout) {
52            Ok(_) => Ok(()),
53            Err(_) if !self.is_healthy() => Err(PoolError::UnhealthyPool),
54            Err(err) => Err(PoolError::R2D2(err)),
55        }
56    }
57
58    fn is_healthy(&self) -> bool {
59        self.state().connections > 0
60    }
61}
62
63impl Deref for DieselPool {
64    type Target = PgPool;
65
66    fn deref(&self) -> &Self::Target {
67        &self.inner
68    }
69}
70
71#[derive(Debug, Error)]
72pub enum PoolError {
73    #[error(transparent)]
74    R2D2(#[from] r2d2::PoolError),
75    #[error("unhealthy database pool")]
76    UnhealthyPool,
77    #[error("Failed to lock test database connection")]
78    TestConnectionUnavailable,
79}