1pub mod config;
2pub mod health;
3
4use std::collections::VecDeque;
5use std::ops::{Deref, DerefMut};
6use std::sync::Arc;
7
8use tokio::sync::{Mutex, Semaphore};
9use tracing::debug;
10
11use crate::config::Config;
12use crate::error::{Error, Result};
13use crate::pool::config::PoolConfig;
14use crate::pool::health::{ConnectionMeta, HealthCheckStrategy};
15use crate::Connection;
16
/// A connection that is parked in the pool's idle queue, together with the
/// bookkeeping data used to decide whether it may be handed out again.
struct IdleConnection {
    /// The parked connection itself.
    conn: Connection,
    /// Metadata consulted by the freshness check (broken flag, idle and
    /// lifetime expiry) before the connection is reused.
    meta: ConnectionMeta,
}
22
/// Mutable pool bookkeeping, kept behind the async mutex in `PoolShared`.
struct PoolState {
    /// Idle connections; released connections are pushed to the back and
    /// acquisitions pop from the front.
    idle: VecDeque<IdleConnection>,
    /// Number of connections currently accounted to the pool: incremented
    /// when a connection is created, decremented when one is discarded.
    total_count: usize,
}
28
/// State shared (via `Arc`) between every `Pool` handle and every
/// `PooledConnection` checked out from it.
struct PoolShared {
    /// Connection configuration; cloned each time a new connection is opened.
    config: Config,
    /// Pool tuning: size limit, timeouts, health-check strategy and callbacks.
    pool_config: PoolConfig,
    /// Semaphore created with `pool_config.max_connections` permits.
    semaphore: Semaphore,
    /// Idle queue and connection counter.
    state: Mutex<PoolState>,
}
36
/// Point-in-time snapshot of the pool's counters, as returned by
/// `Pool::metrics`.
#[derive(Debug, Clone, Copy)]
pub struct PoolMetrics {
    /// Connections counted by the pool that are not idle
    /// (`total - idle`, saturating at zero).
    pub active: usize,
    /// Connections currently sitting in the idle queue.
    pub idle: usize,
    /// Value of the pool's connection counter.
    pub total: usize,
    /// Configured `max_connections`.
    pub max: usize,
}
51
/// Handle to a connection pool.
///
/// Cloning is cheap: both fields are `Arc`s, so every clone refers to the
/// same shared pool state.
#[derive(Clone)]
pub struct Pool {
    /// Configuration, semaphore and idle queue shared by all handles.
    shared: Arc<PoolShared>,
    /// Sink for pool events; also installed on every connection handed out.
    pool_instrumentation: Arc<dyn crate::Instrumentation>,
}
86
87impl Pool {
88 pub fn new(config: Config, pool_config: PoolConfig) -> Self {
90 let pool_instrumentation = config
91 .instrumentation
92 .clone()
93 .unwrap_or_else(crate::instrumentation::noop);
94 let shared = Arc::new(PoolShared {
95 semaphore: Semaphore::new(pool_config.max_connections),
96 config,
97 pool_config,
98 state: Mutex::new(PoolState {
99 idle: VecDeque::new(),
100 total_count: 0,
101 }),
102 });
103
104 Self {
105 shared,
106 pool_instrumentation,
107 }
108 }
109
110 pub fn connect_lazy(config: Config, pool_config: PoolConfig) -> Self {
126 Self::new(config, pool_config)
127 }
128
129 pub fn with_instrumentation(mut self, instr: Arc<dyn crate::Instrumentation>) -> Self {
134 self.pool_instrumentation = instr;
135 self
136 }
137
138 pub async fn acquire(&self) -> Result<PooledConnection> {
144 let pending = {
145 let state = self.shared.state.lock().await;
146 state.total_count.saturating_sub(state.idle.len())
147 };
148 self.pool_instrumentation
149 .on_event(&crate::Event::PoolAcquireStart { pending });
150 let started = std::time::Instant::now();
151 let res = self.acquire_inner().await;
152 let wait = started.elapsed();
153 let outcome = match &res {
154 Ok(_) => crate::AcquireOutcome::Ok,
155 Err(crate::Error::Pool(msg)) if msg.contains("timeout") => {
156 crate::AcquireOutcome::Timeout
157 }
158 Err(crate::Error::Pool(msg)) if msg.contains("closed") => {
159 crate::AcquireOutcome::PoolClosed
160 }
161 Err(_) => crate::AcquireOutcome::Error,
162 };
163 self.pool_instrumentation
164 .on_event(&crate::Event::PoolAcquireFinish { wait, outcome });
165 res
166 }
167
168 async fn acquire_inner(&self) -> Result<PooledConnection> {
169 let permit = tokio::time::timeout(
170 self.shared.pool_config.acquire_timeout,
171 self.shared.semaphore.acquire(),
172 )
173 .await
174 .map_err(|_| Error::Pool("acquire timeout: pool exhausted".into()))?
175 .map_err(|_| Error::Pool("pool closed".into()))?;
176
177 drop(permit);
180
181 let idle_conn = {
183 let mut state = self.shared.state.lock().await;
184 state.idle.pop_front()
185 };
186
187 if let Some(idle) = idle_conn {
188 if self.is_fresh(&idle.meta) {
189 let mut conn = idle.conn;
190 if self.shared.pool_config.health_check == HealthCheckStrategy::Query
192 && !health::check_alive(conn.pg_connection_mut()).await
193 {
194 debug!("idle connection failed health check, creating new one");
195 self.decrement_count().await;
196 let (conn, meta) = self.create_connection().await?;
197 return Ok(self.wrap(conn, meta));
198 }
199
200 if let Some(ref cb) = self.shared.pool_config.before_acquire {
202 match cb(&mut conn).await {
203 Ok(true) => { }
204 Ok(false) => {
205 debug!("before_acquire rejected connection");
206 self.decrement_count().await;
207 let (conn, meta) = self.create_connection().await?;
208 return Ok(self.wrap(conn, meta));
209 }
210 Err(_) => {
211 debug!("before_acquire callback error, discarding connection");
212 self.decrement_count().await;
213 let (conn, meta) = self.create_connection().await?;
214 return Ok(self.wrap(conn, meta));
215 }
216 }
217 }
218
219 debug!("reusing idle connection");
220 Ok(self.wrap(conn, idle.meta))
221 } else {
222 debug!("idle connection expired, creating new one");
223 self.decrement_count().await;
224 let (conn, meta) = self.create_connection().await?;
225 Ok(self.wrap(conn, meta))
226 }
227 } else {
228 let (conn, meta) = self.create_connection().await?;
229 Ok(self.wrap(conn, meta))
230 }
231 }
232
233 pub async fn idle_count(&self) -> usize {
235 self.shared.state.lock().await.idle.len()
236 }
237
238 pub async fn total_count(&self) -> usize {
240 self.shared.state.lock().await.total_count
241 }
242
243 pub fn max_connections(&self) -> usize {
245 self.shared.pool_config.max_connections
246 }
247
248 pub async fn metrics(&self) -> PoolMetrics {
250 let state = self.shared.state.lock().await;
251 let idle = state.idle.len();
252 let total = state.total_count;
253 PoolMetrics {
254 active: total.saturating_sub(idle),
255 idle,
256 total,
257 max: self.shared.pool_config.max_connections,
258 }
259 }
260
261 fn wrap(&self, mut conn: Connection, meta: ConnectionMeta) -> PooledConnection {
267 conn.set_instrumentation(self.pool_instrumentation.clone());
268 PooledConnection {
269 conn: Some(conn),
270 meta,
271 shared: Arc::clone(&self.shared),
272 pool_instrumentation: self.pool_instrumentation.clone(),
273 }
274 }
275
276 async fn create_connection(&self) -> Result<(Connection, ConnectionMeta)> {
277 let mut conn = Connection::connect(self.shared.config.clone()).await?;
278
279 if let Some(ref cb) = self.shared.pool_config.after_connect {
281 if let Err(e) = cb(&mut conn).await {
282 debug!(?e, "after_connect callback failed, discarding connection");
283 return Err(e);
284 }
285 }
286
287 let meta = ConnectionMeta::new();
288
289 let mut state = self.shared.state.lock().await;
290 state.total_count += 1;
291 debug!(total = state.total_count, "created new connection");
292
293 Ok((conn, meta))
294 }
295
296 async fn decrement_count(&self) {
297 let mut state = self.shared.state.lock().await;
298 state.total_count = state.total_count.saturating_sub(1);
299 }
300
301 fn is_fresh(&self, meta: &ConnectionMeta) -> bool {
302 if meta.is_broken {
303 return false;
304 }
305
306 if let Some(timeout) = self.shared.pool_config.idle_timeout {
307 if meta.is_idle_expired(timeout) {
308 return false;
309 }
310 }
311
312 if let Some(lifetime) = self.shared.pool_config.max_lifetime {
313 if meta.is_lifetime_expired(lifetime) {
314 return false;
315 }
316 }
317
318 true
319 }
320}
321
322pub struct PooledConnection {
328 conn: Option<Connection>,
329 meta: ConnectionMeta,
330 shared: Arc<PoolShared>,
331 pool_instrumentation: Arc<dyn crate::Instrumentation>,
332}
333
334impl PooledConnection {
335 pub fn mark_broken(&mut self) {
338 self.meta.is_broken = true;
339 }
340}
341
342impl Deref for PooledConnection {
343 type Target = Connection;
344
345 #[allow(clippy::expect_used)]
346 fn deref(&self) -> &Self::Target {
347 self.conn
348 .as_ref()
349 .expect("PooledConnection used after drop")
350 }
351}
352
353impl DerefMut for PooledConnection {
354 #[allow(clippy::expect_used)]
355 fn deref_mut(&mut self) -> &mut Self::Target {
356 self.conn
357 .as_mut()
358 .expect("PooledConnection used after drop")
359 }
360}
361
362impl Drop for PooledConnection {
363 fn drop(&mut self) {
364 if let Some(conn) = self.conn.take() {
365 self.pool_instrumentation
367 .on_event(&crate::Event::PoolRelease);
368
369 let shared = Arc::clone(&self.shared);
370
371 if self.meta.is_broken {
372 tokio::spawn(async move {
373 drop(conn);
374 let mut state = shared.state.lock().await;
375 state.total_count = state.total_count.saturating_sub(1);
376 debug!("discarded broken connection");
377 });
378 } else {
379 let created_at = self.meta.created_at;
380 let after_release = self.shared.pool_config.after_release.clone();
381
382 tokio::spawn(async move {
383 let mut conn = conn;
384
385 if let Some(cb) = after_release {
387 match cb(&mut conn).await {
388 Ok(true) => { }
389 Ok(false) => {
390 debug!("after_release rejected connection, discarding");
391 let mut state = shared.state.lock().await;
392 state.total_count = state.total_count.saturating_sub(1);
393 return;
394 }
395 Err(_) => {
396 debug!("after_release callback error, discarding connection");
397 let mut state = shared.state.lock().await;
398 state.total_count = state.total_count.saturating_sub(1);
399 return;
400 }
401 }
402 }
403
404 let mut meta = ConnectionMeta::new();
405 meta.created_at = created_at;
406 meta.touch();
407
408 let mut state = shared.state.lock().await;
409 state.idle.push_back(IdleConnection { conn, meta });
410 });
411 }
412 }
413 }
414}
415
416impl crate::generic_client::GenericClient for &Pool {
426 async fn query(
427 &mut self,
428 sql: &str,
429 params: &[&(dyn crate::types::ToSql + Sync)],
430 ) -> crate::Result<Vec<crate::row::Row>> {
431 let mut pooled = self.acquire().await?;
432 crate::Connection::query(&mut pooled, sql, params).await
433 }
434
435 async fn query_one(
436 &mut self,
437 sql: &str,
438 params: &[&(dyn crate::types::ToSql + Sync)],
439 ) -> crate::Result<crate::row::Row> {
440 let mut pooled = self.acquire().await?;
441 crate::Connection::query_one(&mut pooled, sql, params).await
442 }
443
444 async fn query_opt(
445 &mut self,
446 sql: &str,
447 params: &[&(dyn crate::types::ToSql + Sync)],
448 ) -> crate::Result<Option<crate::row::Row>> {
449 let mut pooled = self.acquire().await?;
450 crate::Connection::query_opt(&mut pooled, sql, params).await
451 }
452
453 async fn execute(
454 &mut self,
455 sql: &str,
456 params: &[&(dyn crate::types::ToSql + Sync)],
457 ) -> crate::Result<u64> {
458 let mut pooled = self.acquire().await?;
459 crate::Connection::execute(&mut pooled, sql, params).await
460 }
461
462 async fn simple_query(
463 &mut self,
464 sql: &str,
465 ) -> crate::Result<Vec<crate::row::SimpleQueryMessage>> {
466 let mut pooled = self.acquire().await?;
467 crate::Connection::simple_query(&mut pooled, sql).await
468 }
469
470 async fn query_typed(
471 &mut self,
472 sql: &str,
473 params: &[(&(dyn crate::types::ToSql + Sync), crate::Oid)],
474 ) -> crate::Result<Vec<crate::row::Row>> {
475 let mut pooled = self.acquire().await?;
476 crate::Connection::query_typed(&mut pooled, sql, params).await
477 }
478
479 async fn query_typed_one(
480 &mut self,
481 sql: &str,
482 params: &[(&(dyn crate::types::ToSql + Sync), crate::Oid)],
483 ) -> crate::Result<crate::row::Row> {
484 let mut pooled = self.acquire().await?;
485 crate::Connection::query_typed_one(&mut pooled, sql, params).await
486 }
487
488 async fn query_typed_opt(
489 &mut self,
490 sql: &str,
491 params: &[(&(dyn crate::types::ToSql + Sync), crate::Oid)],
492 ) -> crate::Result<Option<crate::row::Row>> {
493 let mut pooled = self.acquire().await?;
494 crate::Connection::query_typed_opt(&mut pooled, sql, params).await
495 }
496
497 async fn execute_typed(
498 &mut self,
499 sql: &str,
500 params: &[(&(dyn crate::types::ToSql + Sync), crate::Oid)],
501 ) -> crate::Result<u64> {
502 let mut pooled = self.acquire().await?;
503 crate::Connection::execute_typed(&mut pooled, sql, params).await
504 }
505
506 async fn execute_pipeline(
507 &mut self,
508 batch: crate::pipeline::batch::PipelineBatch,
509 ) -> crate::Result<Vec<crate::pipeline::QueryResult>> {
510 let mut pooled = self.acquire().await?;
511 crate::Connection::execute_pipeline(&mut pooled, batch).await
512 }
513}