pepe_pgpool/
lib.rs

1use std::time::Duration;
2
3use diesel::{
4    r2d2,
5    r2d2::{ConnectionManager, PooledConnection},
6    result::Error as DieselError,
7    Connection, PgConnection,
8};
9use diesel_migrations::EmbeddedMigrations;
10use pepe_pg::check_pending_migrations;
11
12pub type PgConnectionManager = ConnectionManager<PgConnection>;
13pub type PgPool = r2d2::Pool<PgConnectionManager>;
14pub type PgPooledConnection = PooledConnection<PgConnectionManager>;
15
16#[derive(thiserror::Error, Debug)]
17pub enum Error {
18    #[error("create pool: {0}")]
19    CreatePool(String),
20    #[error("get connection from pool: {0}")]
21    GetConnection(String),
22    #[error("pepe pg migrations error: {0}")]
23    PepePgMigrations(pepe_pg::MigrationError),
24    #[error("diesel failed: {0}")]
25    Diesel(DieselError),
26}
27
28#[derive(Clone)]
29pub struct Pool {
30    pool: PgPool,
31}
32
/// Optional tuning knobs applied to the r2d2 pool builder by `Pool::new_with_opts`.
///
/// A field left as `None` is not forwarded to the builder, so the builder's own
/// default for that setting stays in effect.
#[derive(Debug, Clone)]
pub struct PoolOpts {
    /// Forwarded to the builder's `connection_timeout` setting when set.
    pub connection_timeout: Option<Duration>,
    /// Forwarded to the builder's `idle_timeout` setting when set.
    ///
    /// Because `None` means "not forwarded", this struct cannot be used to
    /// explicitly clear the builder's idle timeout.
    pub idle_timeout: Option<Duration>,
    /// Forwarded to the builder's `max_size` setting when set.
    pub max_size: Option<u32>,
    /// Forwarded to the builder's `min_idle` setting when set.
    pub min_idle: Option<u32>,
    /// Forwarded to the builder's `test_on_check_out` setting when set.
    pub test_on_check_out: Option<bool>,
}

impl Default for PoolOpts {
    /// Ten-second connection timeout and check-out testing enabled; every other
    /// option is left unset.
    fn default() -> Self {
        const DEFAULT_CONNECTION_TIMEOUT: Duration = Duration::from_secs(10);

        Self {
            test_on_check_out: Some(true),
            connection_timeout: Some(DEFAULT_CONNECTION_TIMEOUT),
            max_size: None,
            min_idle: None,
            idle_timeout: None,
        }
    }
}
53
54impl Pool {
55    pub fn new(connection_url: String, migrations: EmbeddedMigrations) -> Result<Self, Error> {
56        Self::new_with_opts(connection_url, migrations, PoolOpts::default())
57    }
58
59    pub fn new_with_opts(
60        connection_url: String,
61        migrations: EmbeddedMigrations,
62        opts: PoolOpts,
63    ) -> Result<Self, Error> {
64        let mut builder = r2d2::Pool::builder();
65
66        if let Some(connection_timeout) = opts.connection_timeout {
67            builder = builder.connection_timeout(connection_timeout);
68        }
69
70        if let Some(idle_timeout) = opts.idle_timeout {
71            builder = builder.idle_timeout(Some(idle_timeout));
72        }
73
74        if let Some(min_idle) = opts.min_idle {
75            builder = builder.min_idle(Some(min_idle));
76        }
77
78        if let Some(max_size) = opts.max_size {
79            builder = builder.max_size(max_size);
80        }
81
82        if let Some(test_on_check_out) = opts.test_on_check_out {
83            builder = builder.test_on_check_out(test_on_check_out);
84        }
85
86        let pool = builder
87            .build(ConnectionManager::<PgConnection>::new(connection_url))
88            .map_err(|e| Error::CreatePool(e.to_string()))?;
89
90        let mut conn = pool
91            .get()
92            .map_err(|e| Error::GetConnection(e.to_string()))?;
93
94        check_pending_migrations(&mut conn, migrations).map_err(Error::PepePgMigrations)?;
95
96        Ok(Self { pool })
97    }
98
99    pub fn execute<T, Q, E>(&self, query: Q) -> Result<T, E>
100    where
101        T: Send + 'static,
102        Q: FnOnce(PgPooledConnection) -> Result<T, E> + Send + 'static,
103        E: From<Error>,
104    {
105        let conn = self.connection()?;
106        tokio::task::block_in_place(|| query(conn))
107    }
108
109    pub fn transaction<E>(
110        &self,
111        f: impl FnOnce(&mut PgPooledConnection) -> Result<(), E>,
112    ) -> Result<(), E>
113    where
114        E: From<Error> + From<DieselError>,
115    {
116        let mut conn = self.connection()?;
117        conn.transaction(|conn| {
118            f(conn)?;
119            Result::<(), E>::Ok(())
120        })
121        .map_err(E::from)
122    }
123
124    fn connection(&self) -> Result<PgPooledConnection, Error> {
125        self.pool
126            .get()
127            .map_err(|e| Error::GetConnection(e.to_string()))
128    }
129}