1mod connect;
2mod connection;
3mod execute;
4
5pub use self::connect::Connect;
6pub use self::connection::{CachedStatement, PoolConnection};
7
8use core::num::NonZeroUsize;
9
10use std::{collections::VecDeque, sync::Mutex};
11
12use tokio::sync::Semaphore;
13
14use super::{config::Config, error::Error};
15
16use self::{
17 connect::{ConnectorDyn, DefaultConnector},
18 connection::PoolClient,
19};
20
21pub struct PoolBuilder {
23 config: Result<Config, Error>,
24 capacity: usize,
25 cache_size: usize,
26 connector: Box<dyn ConnectorDyn>,
27}
28
29impl PoolBuilder {
30 pub fn capacity(mut self, cap: usize) -> Self {
35 self.capacity = cap;
36 self
37 }
38
39 pub fn cache_size(mut self, size: usize) -> Self {
45 self.cache_size = size;
46 self
47 }
48
49 pub fn connector<C>(mut self, connector: C) -> Self
51 where
52 C: Connect + 'static,
53 {
54 self.connector = Box::new(connector) as _;
55 self
56 }
57
58 pub fn build(self) -> Result<Pool, Error> {
60 let cfg = self.config?;
61 let cache_size = NonZeroUsize::new(self.cache_size).ok_or_else(Error::todo)?;
62
63 Ok(Pool {
64 conn: Mutex::new(VecDeque::with_capacity(self.capacity)),
65 permits: Semaphore::new(self.capacity),
66 config: Box::new(PoolConfig {
67 connector: self.connector,
68 cfg,
69 cache_size,
70 }),
71 })
72 }
73}
74
75pub struct Pool {
94 conn: Mutex<VecDeque<PoolClient>>,
95 permits: Semaphore,
96 config: Box<PoolConfig>,
97}
98
99struct PoolConfig {
100 connector: Box<dyn ConnectorDyn>,
101 cfg: Config,
102 cache_size: NonZeroUsize,
103}
104
105impl Pool {
106 pub fn builder<C>(cfg: C) -> PoolBuilder
108 where
109 Config: TryFrom<C>,
110 Error: From<<Config as TryFrom<C>>::Error>,
111 {
112 PoolBuilder {
113 config: cfg.try_into().map_err(Into::into),
114 capacity: 1,
115 cache_size: 16,
116 connector: Box::new(DefaultConnector),
117 }
118 }
119
120 pub async fn get(&self) -> Result<PoolConnection<'_>, Error> {
124 let _permit = self.permits.acquire().await.expect("Semaphore must not be closed");
125
126 let conn = match self.try_get() {
127 Some(conn) => conn,
128 None => self.connect().await?,
129 };
130
131 Ok(PoolConnection {
132 pool: self,
133 conn: Some(conn),
134 _permit,
135 })
136 }
137
138 pub fn config(&self) -> &Config {
140 &self.config.cfg
141 }
142
143 fn try_get(&self) -> Option<PoolClient> {
144 let mut inner = self.conn.lock().unwrap();
145
146 while let Some(conn) = inner.pop_front() {
147 if !conn.closed() {
148 return Some(conn);
149 }
150 }
151
152 None
153 }
154
155 #[cold]
156 #[inline(never)]
157 async fn connect(&self) -> Result<PoolClient, Error> {
158 self.config
159 .connector
160 .connect_dyn(self.config.cfg.clone())
161 .await
162 .map(|cli| PoolClient::new(cli, self.config.cache_size))
163 }
164}
165
166#[cfg(not(feature = "io-uring"))]
167#[cfg(test)]
168mod test {
169 use crate::{execute::Execute, iter::AsyncLendingIterator, statement::Statement};
170
171 use super::*;
172
173 #[tokio::test]
174 async fn pool() {
175 let pool = Pool::builder("postgres://postgres:postgres@localhost:5432")
176 .build()
177 .unwrap();
178
179 {
180 let mut conn = pool.get().await.unwrap();
181
182 let stmt = Statement::named("SELECT 1", &[]).execute(&mut conn).await.unwrap();
183 stmt.execute(&conn.consume()).await.unwrap();
184
185 let num = Statement::named("SELECT 1", &[])
186 .bind_none()
187 .query(&pool)
188 .await
189 .unwrap()
190 .try_next()
191 .await
192 .unwrap()
193 .unwrap()
194 .get::<i32>(0);
195
196 assert_eq!(num, 1);
197 }
198
199 let res = [
200 Statement::named("SELECT 1", &[]).bind_none(),
201 Statement::named("SELECT 1", &[]).bind_none(),
202 ]
203 .query(&pool)
204 .await
205 .unwrap();
206
207 for mut res in res {
208 let num = res.try_next().await.unwrap().unwrap().get::<i32>(0);
209 assert_eq!(num, 1);
210 }
211
212 let _ = vec![
213 Statement::named("SELECT 1", &[]).bind_dyn(&[&1]),
214 Statement::named("SELECT 1", &[]).bind_dyn(&[&"123"]),
215 Statement::named("SELECT 1", &[]).bind_dyn(&[&String::new()]),
216 ]
217 .query(&pool)
218 .await;
219 }
220}