Skip to main content

xitca_postgres/
pool.rs

1mod connect;
2mod connection;
3mod execute;
4
5pub use self::connect::Connect;
6pub use self::connection::{CachedStatement, PoolConnection};
7
8use core::num::NonZeroUsize;
9
10use std::{collections::VecDeque, sync::Mutex};
11
12use tokio::sync::Semaphore;
13
14use super::{config::Config, error::Error};
15
16use self::{
17    connect::{ConnectorDyn, DefaultConnector},
18    connection::PoolClient,
19};
20
21/// builder type for connection pool
22pub struct PoolBuilder {
23    config: Result<Config, Error>,
24    capacity: usize,
25    cache_size: usize,
26    connector: Box<dyn ConnectorDyn>,
27}
28
29impl PoolBuilder {
30    /// set capacity. pool would spawn up to amount of capacity concurrent connections to database.
31    ///
32    /// # Default
33    /// capacity default to 1
34    pub fn capacity(mut self, cap: usize) -> Self {
35        self.capacity = cap;
36        self
37    }
38
39    /// set statment cache size. every connection in pool would keep a set of prepared statement cache to
40    /// speed up repeated query
41    ///
42    /// # Default
43    /// cache_size default to 16
44    pub fn cache_size(mut self, size: usize) -> Self {
45        self.cache_size = size;
46        self
47    }
48
49    /// set connector type for establishing connection to database. C must impl [`Connect`] trait
50    pub fn connector<C>(mut self, connector: C) -> Self
51    where
52        C: Connect + 'static,
53    {
54        self.connector = Box::new(connector) as _;
55        self
56    }
57
58    /// try convert builder to a connection pool instance.
59    pub fn build(self) -> Result<Pool, Error> {
60        let cfg = self.config?;
61        let cache_size = NonZeroUsize::new(self.cache_size).ok_or_else(Error::todo)?;
62
63        Ok(Pool {
64            conn: Mutex::new(VecDeque::with_capacity(self.capacity)),
65            permits: Semaphore::new(self.capacity),
66            config: Box::new(PoolConfig {
67                connector: self.connector,
68                cfg,
69                cache_size,
70            }),
71        })
72    }
73}
74
75/// connection pool for a set of connections to database. Can be used as entry point of query
76///
77/// # Examples
78/// ```rust
79/// # use xitca_postgres::{Error, Execute};
80/// # async fn query() -> Result<(), Error> {
81/// let pool = xitca_postgres::pool::Pool::builder("db_url").build()?;
82/// xitca_postgres::Statement::named("SELECT 1", &[]).bind_none().execute(&pool).await?;
83/// # Ok(())
84/// # }
85/// ```
86///
87/// # Caching
88/// When connection pool is used as executor through [`Execute::query`] and [`Execute::execute`] methods
89/// it would prepare and cache statement for reuse. For selective caching consider use [`PoolConnection`]
90///
91/// [`Execute::query`]: crate::Execute::query
92/// [`Execute::execute`]: crate::Execute::execute
93pub struct Pool {
94    conn: Mutex<VecDeque<PoolClient>>,
95    permits: Semaphore,
96    config: Box<PoolConfig>,
97}
98
99struct PoolConfig {
100    connector: Box<dyn ConnectorDyn>,
101    cfg: Config,
102    cache_size: NonZeroUsize,
103}
104
105impl Pool {
106    /// start a builder of pool where it's behavior can be configured.
107    pub fn builder<C>(cfg: C) -> PoolBuilder
108    where
109        Config: TryFrom<C>,
110        Error: From<<Config as TryFrom<C>>::Error>,
111    {
112        PoolBuilder {
113            config: cfg.try_into().map_err(Into::into),
114            capacity: 1,
115            cache_size: 16,
116            connector: Box::new(DefaultConnector),
117        }
118    }
119
120    /// try to get a connection from pool.
121    /// when pool is empty it will try to spawn new connection to database and if the process failed the outcome will
122    /// return as [`Error`]
123    pub async fn get(&self) -> Result<PoolConnection<'_>, Error> {
124        let _permit = self.permits.acquire().await.expect("Semaphore must not be closed");
125
126        let conn = match self.try_get() {
127            Some(conn) => conn,
128            None => self.connect().await?,
129        };
130
131        Ok(PoolConnection {
132            pool: self,
133            conn: Some(conn),
134            _permit,
135        })
136    }
137
138    /// get configration of current connection pool.
139    pub fn config(&self) -> &Config {
140        &self.config.cfg
141    }
142
143    fn try_get(&self) -> Option<PoolClient> {
144        let mut inner = self.conn.lock().unwrap();
145
146        while let Some(conn) = inner.pop_front() {
147            if !conn.closed() {
148                return Some(conn);
149            }
150        }
151
152        None
153    }
154
155    #[cold]
156    #[inline(never)]
157    async fn connect(&self) -> Result<PoolClient, Error> {
158        self.config
159            .connector
160            .connect_dyn(self.config.cfg.clone())
161            .await
162            .map(|cli| PoolClient::new(cli, self.config.cache_size))
163    }
164}
165
166#[cfg(not(feature = "io-uring"))]
167#[cfg(test)]
168mod test {
169    use crate::{execute::Execute, iter::AsyncLendingIterator, statement::Statement};
170
171    use super::*;
172
173    #[tokio::test]
174    async fn pool() {
175        let pool = Pool::builder("postgres://postgres:postgres@localhost:5432")
176            .build()
177            .unwrap();
178
179        {
180            let mut conn = pool.get().await.unwrap();
181
182            let stmt = Statement::named("SELECT 1", &[]).execute(&mut conn).await.unwrap();
183            stmt.execute(&conn.consume()).await.unwrap();
184
185            let num = Statement::named("SELECT 1", &[])
186                .bind_none()
187                .query(&pool)
188                .await
189                .unwrap()
190                .try_next()
191                .await
192                .unwrap()
193                .unwrap()
194                .get::<i32>(0);
195
196            assert_eq!(num, 1);
197        }
198
199        let res = [
200            Statement::named("SELECT 1", &[]).bind_none(),
201            Statement::named("SELECT 1", &[]).bind_none(),
202        ]
203        .query(&pool)
204        .await
205        .unwrap();
206
207        for mut res in res {
208            let num = res.try_next().await.unwrap().unwrap().get::<i32>(0);
209            assert_eq!(num, 1);
210        }
211
212        let _ = vec![
213            Statement::named("SELECT 1", &[]).bind_dyn(&[&1]),
214            Statement::named("SELECT 1", &[]).bind_dyn(&[&"123"]),
215            Statement::named("SELECT 1", &[]).bind_dyn(&[&String::new()]),
216        ]
217        .query(&pool)
218        .await;
219    }
220}