1use super::ScopedPoolFuture;
5use super::churn::{
6 PoolStats, decrement_active_count_saturating, pool_churn_record_destroy,
7 pool_churn_remaining_open, record_pool_connection_destroy,
8};
9use super::config::PoolConfig;
10use super::connection::PooledConn;
11use super::connection::PooledConnection;
12use super::gss::*;
13use crate::driver::{
14 AstPipelineMode, AutoCountPath, AutoCountPlan, ConnectOptions, PgConnection, PgError, PgResult,
15 is_ignorable_session_message, unexpected_backend_message,
16};
17use std::sync::Arc;
18use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
19use std::time::{Duration, Instant};
20use tokio::sync::{Mutex, Semaphore};
21use tokio::task::JoinSet;
22
/// NOTE(review): presumably the cap on the number of entries in
/// `PgPoolInner::hot_statements`; it is not referenced in this part of the
/// file, so confirm where it is enforced.
pub(super) const MAX_HOT_STATEMENTS: usize = 32;

/// Shared state behind every `PgPool` handle.
pub(super) struct PgPoolInner {
    /// Settings the pool was opened with (checked by `validate_pool_config`).
    pub(super) config: PoolConfig,
    /// Idle connections available for checkout; checkout pops from the end.
    pub(super) connections: Mutex<Vec<PooledConn>>,
    /// One permit per connection slot. Checkout forgets its permit and
    /// `return_connection` adds one back.
    pub(super) semaphore: Semaphore,
    /// Set by `close_graceful`; consulted on acquire, return and maintenance.
    pub(super) closed: AtomicBool,
    /// Number of connections currently checked out.
    pub(super) active_count: AtomicUsize,
    /// Connections opened over the pool's lifetime (initial, replacement and
    /// backfill connections).
    pub(super) total_created: AtomicUsize,
    /// NOTE(review): not used in this part of the file; presumably tracks
    /// in-flight cleanups of connections dropped without an explicit release
    /// (see `PoolConfig::leaked_cleanup_queue`) — confirm.
    pub(super) leaked_cleanup_inflight: AtomicUsize,
    /// Statements to pre-prepare on checkout, keyed by the same hash used for
    /// the connection's statement cache; values are `(statement name, SQL)`.
    pub(super) hot_statements: std::sync::RwLock<std::collections::HashMap<u64, (String, String)>>,
}
40
41pub(super) fn handle_hot_preprepare_message(
42 msg: &crate::protocol::BackendMessage,
43 parse_complete_count: &mut usize,
44 error: &mut Option<PgError>,
45) -> PgResult<bool> {
46 match msg {
47 crate::protocol::BackendMessage::ParseComplete => {
48 *parse_complete_count += 1;
49 Ok(false)
50 }
51 crate::protocol::BackendMessage::ErrorResponse(err) => {
52 if error.is_none() {
53 *error = Some(PgError::QueryServer(err.clone().into()));
54 }
55 Ok(false)
56 }
57 crate::protocol::BackendMessage::ReadyForQuery(_) => Ok(true),
58 msg if is_ignorable_session_message(msg) => Ok(false),
59 other => Err(unexpected_backend_message("pool hot pre-prepare", other)),
60 }
61}
62
63impl PgPoolInner {
64 pub(super) async fn return_connection(&self, conn: PgConnection, created_at: Instant) {
65 decrement_active_count_saturating(&self.active_count);
66
67 if conn.is_io_desynced() {
68 tracing::warn!(
69 host = %self.config.host,
70 port = self.config.port,
71 user = %self.config.user,
72 db = %self.config.database,
73 "pool_return_desynced: dropping connection due to prior I/O/protocol desync"
74 );
75 record_pool_connection_destroy("pool_desynced_drop");
76 self.semaphore.add_permits(1);
77 pool_churn_record_destroy(&self.config, "return_desynced");
78 return;
79 }
80
81 if self.closed.load(Ordering::Relaxed) {
82 record_pool_connection_destroy("pool_closed_drop");
83 self.semaphore.add_permits(1);
84 return;
85 }
86
87 let mut connections = self.connections.lock().await;
88 if connections.len() < self.config.max_connections {
89 connections.push(PooledConn {
90 conn,
91 created_at,
92 last_used: Instant::now(),
93 });
94 } else {
95 record_pool_connection_destroy("pool_overflow_drop");
96 }
97
98 self.semaphore.add_permits(1);
99 }
100
101 async fn get_healthy_connection(&self) -> Option<PooledConn> {
103 let mut connections = self.connections.lock().await;
104
105 while let Some(pooled) = connections.pop() {
106 if pooled.last_used.elapsed() > self.config.idle_timeout {
107 tracing::debug!(
108 idle_secs = pooled.last_used.elapsed().as_secs(),
109 timeout_secs = self.config.idle_timeout.as_secs(),
110 "pool_checkout_evict: connection exceeded idle timeout"
111 );
112 record_pool_connection_destroy("idle_timeout_evict");
113 continue;
114 }
115
116 if let Some(max_life) = self.config.max_lifetime
117 && pooled.created_at.elapsed() > max_life
118 {
119 tracing::debug!(
120 age_secs = pooled.created_at.elapsed().as_secs(),
121 max_lifetime_secs = max_life.as_secs(),
122 "pool_checkout_evict: connection exceeded max lifetime"
123 );
124 record_pool_connection_destroy("max_lifetime_evict");
125 continue;
126 }
127
128 return Some(pooled);
129 }
130
131 None
132 }
133}
134
/// Cloneable handle to a PostgreSQL connection pool.
///
/// Every clone shares the same `PgPoolInner` through an `Arc`, so all clones
/// see the same idle connections, slot permits and counters.
#[derive(Clone)]
pub struct PgPool {
    pub(super) inner: Arc<PgPoolInner>,
}
149
150impl PgPool {
151 pub async fn from_config() -> PgResult<Self> {
158 let qail = qail_core::config::QailConfig::load()
159 .map_err(|e| PgError::Connection(format!("Config error: {}", e)))?;
160 let config = PoolConfig::from_qail_config(&qail)?;
161 Self::connect(config).await
162 }
163
164 pub async fn connect(config: PoolConfig) -> PgResult<Self> {
166 validate_pool_config(&config)?;
167
168 let semaphore = Semaphore::new(config.max_connections);
170
171 let mut initial_connections = Vec::new();
172 for _ in 0..config.min_connections {
173 let conn = Self::create_connection(&config).await?;
174 initial_connections.push(PooledConn {
175 conn,
176 created_at: Instant::now(),
177 last_used: Instant::now(),
178 });
179 }
180
181 let initial_count = initial_connections.len();
182
183 let inner = Arc::new(PgPoolInner {
184 config,
185 connections: Mutex::new(initial_connections),
186 semaphore,
187 closed: AtomicBool::new(false),
188 active_count: AtomicUsize::new(0),
189 total_created: AtomicUsize::new(initial_count),
190 leaked_cleanup_inflight: AtomicUsize::new(0),
191 hot_statements: std::sync::RwLock::new(std::collections::HashMap::new()),
192 });
193
194 Ok(Self { inner })
195 }
196
197 pub async fn acquire_raw(&self) -> PgResult<PooledConnection> {
212 if self.inner.closed.load(Ordering::Relaxed) {
213 return Err(PgError::PoolClosed);
214 }
215
216 if let Some(remaining) = pool_churn_remaining_open(&self.inner.config) {
217 metrics::counter!("qail_pg_pool_churn_circuit_reject_total").increment(1);
218 tracing::warn!(
219 host = %self.inner.config.host,
220 port = self.inner.config.port,
221 user = %self.inner.config.user,
222 db = %self.inner.config.database,
223 remaining_ms = remaining.as_millis() as u64,
224 "pool_connection_churn_circuit_open"
225 );
226 return Err(PgError::PoolExhausted {
227 max: self.inner.config.max_connections,
228 });
229 }
230
231 let acquire_timeout = self.inner.config.acquire_timeout;
233 let permit =
234 match tokio::time::timeout(acquire_timeout, self.inner.semaphore.acquire()).await {
235 Ok(permit) => permit.map_err(|_| PgError::PoolClosed)?,
236 Err(_) => {
237 metrics::counter!("qail_pg_pool_acquire_timeouts_total").increment(1);
238 return Err(PgError::Timeout(format!(
239 "pool acquire after {}s ({} max connections)",
240 acquire_timeout.as_secs(),
241 self.inner.config.max_connections
242 )));
243 }
244 };
245
246 if self.inner.closed.load(Ordering::Relaxed) {
247 return Err(PgError::PoolClosed);
248 }
249
250 let (mut conn, mut created_at) =
252 if let Some(pooled) = self.inner.get_healthy_connection().await {
253 (pooled.conn, pooled.created_at)
254 } else {
255 let conn = Self::create_connection(&self.inner.config).await?;
256 self.inner.total_created.fetch_add(1, Ordering::Relaxed);
257 (conn, Instant::now())
258 };
259
260 if self.inner.config.test_on_acquire
261 && let Err(e) = execute_simple_with_timeout(
262 &mut conn,
263 "SELECT 1",
264 self.inner.config.connect_timeout,
265 "pool checkout health check",
266 )
267 .await
268 {
269 tracing::warn!(
270 host = %self.inner.config.host,
271 port = self.inner.config.port,
272 user = %self.inner.config.user,
273 db = %self.inner.config.database,
274 error = %e,
275 "pool_health_check_failed: checkout probe failed, creating replacement connection"
276 );
277 pool_churn_record_destroy(&self.inner.config, "health_check_failed");
278 conn = Self::create_connection(&self.inner.config).await?;
279 self.inner.total_created.fetch_add(1, Ordering::Relaxed);
280 created_at = Instant::now();
281 }
282
283 let missing: Vec<(u64, String, String)> = {
286 if let Ok(hot) = self.inner.hot_statements.read() {
287 hot.iter()
288 .filter(|(hash, _)| !conn.stmt_cache.contains(hash))
289 .map(|(hash, (name, sql))| (*hash, name.clone(), sql.clone()))
290 .collect()
291 } else {
292 Vec::new()
293 }
294 }; if !missing.is_empty() {
297 use crate::protocol::PgEncoder;
298 let mut buf = bytes::BytesMut::new();
299 for (_, name, sql) in &missing {
300 let parse_msg = PgEncoder::try_encode_parse(name, sql, &[])?;
301 buf.extend_from_slice(&parse_msg);
302 }
303 PgEncoder::encode_sync_to(&mut buf);
304 let preprepare_timeout = self.inner.config.connect_timeout;
305 let preprepare_result: PgResult<()> = match tokio::time::timeout(
306 preprepare_timeout,
307 async {
308 conn.send_bytes(&buf).await?;
309 let mut parse_complete_count = 0usize;
311 let mut parse_error: Option<PgError> = None;
312 loop {
313 let msg = conn.recv().await?;
314 if handle_hot_preprepare_message(
315 &msg,
316 &mut parse_complete_count,
317 &mut parse_error,
318 )? {
319 if let Some(err) = parse_error {
320 return Err(err);
321 }
322 if parse_complete_count != missing.len() {
323 return Err(PgError::Protocol(format!(
324 "hot pre-prepare completed with {} ParseComplete messages (expected {})",
325 parse_complete_count,
326 missing.len()
327 )));
328 }
329 break;
330 }
331 }
332 Ok::<(), PgError>(())
333 },
334 )
335 .await
336 {
337 Ok(res) => res,
338 Err(_) => Err(PgError::Timeout(format!(
339 "hot statement pre-prepare timeout after {:?} (pool config connect_timeout)",
340 preprepare_timeout
341 ))),
342 };
343
344 if let Err(e) = preprepare_result {
345 tracing::warn!(
346 host = %self.inner.config.host,
347 port = self.inner.config.port,
348 user = %self.inner.config.user,
349 db = %self.inner.config.database,
350 timeout_ms = preprepare_timeout.as_millis() as u64,
351 error = %e,
352 "pool_hot_prepare_failed: replacing connection to avoid handing out uncertain protocol state"
353 );
354 pool_churn_record_destroy(&self.inner.config, "hot_prepare_failed");
355 conn = Self::create_connection(&self.inner.config).await?;
356 self.inner.total_created.fetch_add(1, Ordering::Relaxed);
357 created_at = Instant::now();
358 } else {
359 for (hash, name, sql) in &missing {
361 conn.stmt_cache.put(*hash, name.clone());
362 conn.prepared_statements.insert(name.clone(), sql.clone());
363 }
364 }
365 }
366
367 self.inner.active_count.fetch_add(1, Ordering::Relaxed);
368 permit.forget();
370
371 Ok(PooledConnection {
372 conn: Some(conn),
373 pool: self.inner.clone(),
374 rls_dirty: false,
375 created_at,
376 })
377 }
378
379 pub async fn acquire_with_rls(
395 &self,
396 ctx: qail_core::rls::RlsContext,
397 ) -> PgResult<PooledConnection> {
398 let mut conn = self.acquire_raw().await?;
400
401 let sql = crate::driver::rls::context_to_sql(&ctx);
403 let pg_conn = conn.get_mut()?;
404 if let Err(e) = execute_simple_with_timeout(
405 pg_conn,
406 &sql,
407 self.inner.config.connect_timeout,
408 "pool acquire_with_rls setup",
409 )
410 .await
411 {
412 if let Ok(pg_conn) = conn.get_mut() {
415 let _ = pg_conn.execute_simple("ROLLBACK").await;
416 }
417 conn.release().await;
418 return Err(e);
419 }
420
421 conn.rls_dirty = true;
423
424 Ok(conn)
425 }
426
427 pub async fn with_rls<T, F>(&self, ctx: qail_core::rls::RlsContext, f: F) -> PgResult<T>
431 where
432 F: for<'a> FnOnce(&'a mut PooledConnection) -> ScopedPoolFuture<'a, T>,
433 {
434 let mut conn = self.acquire_with_rls(ctx).await?;
435 let out = f(&mut conn).await;
436 conn.release().await;
437 out
438 }
439
440 pub async fn with_system<T, F>(&self, f: F) -> PgResult<T>
442 where
443 F: for<'a> FnOnce(&'a mut PooledConnection) -> ScopedPoolFuture<'a, T>,
444 {
445 self.with_rls(qail_core::rls::RlsContext::empty(), f).await
446 }
447
448 pub async fn with_global<T, F>(&self, f: F) -> PgResult<T>
450 where
451 F: for<'a> FnOnce(&'a mut PooledConnection) -> ScopedPoolFuture<'a, T>,
452 {
453 self.with_rls(qail_core::rls::RlsContext::global(), f).await
454 }
455
456 pub async fn with_tenant<T, F>(&self, tenant_id: &str, f: F) -> PgResult<T>
458 where
459 F: for<'a> FnOnce(&'a mut PooledConnection) -> ScopedPoolFuture<'a, T>,
460 {
461 self.with_rls(qail_core::rls::RlsContext::tenant(tenant_id), f)
462 .await
463 }
464
465 pub async fn acquire_with_rls_timeout(
470 &self,
471 ctx: qail_core::rls::RlsContext,
472 timeout_ms: u32,
473 ) -> PgResult<PooledConnection> {
474 let mut conn = self.acquire_raw().await?;
476
477 let sql = crate::driver::rls::context_to_sql_with_timeout(&ctx, timeout_ms);
479 let pg_conn = conn.get_mut()?;
480 if let Err(e) = execute_simple_with_timeout(
481 pg_conn,
482 &sql,
483 self.inner.config.connect_timeout,
484 "pool acquire_with_rls_timeout setup",
485 )
486 .await
487 {
488 if let Ok(pg_conn) = conn.get_mut() {
489 let _ = pg_conn.execute_simple("ROLLBACK").await;
490 }
491 conn.release().await;
492 return Err(e);
493 }
494
495 conn.rls_dirty = true;
497
498 Ok(conn)
499 }
500
501 pub async fn with_rls_timeout<T, F>(
503 &self,
504 ctx: qail_core::rls::RlsContext,
505 timeout_ms: u32,
506 f: F,
507 ) -> PgResult<T>
508 where
509 F: for<'a> FnOnce(&'a mut PooledConnection) -> ScopedPoolFuture<'a, T>,
510 {
511 let mut conn = self.acquire_with_rls_timeout(ctx, timeout_ms).await?;
512 let out = f(&mut conn).await;
513 conn.release().await;
514 out
515 }
516
517 pub async fn acquire_with_rls_timeouts(
523 &self,
524 ctx: qail_core::rls::RlsContext,
525 statement_timeout_ms: u32,
526 lock_timeout_ms: u32,
527 ) -> PgResult<PooledConnection> {
528 let mut conn = self.acquire_raw().await?;
530
531 let sql = crate::driver::rls::context_to_sql_with_timeouts(
532 &ctx,
533 statement_timeout_ms,
534 lock_timeout_ms,
535 );
536 let pg_conn = conn.get_mut()?;
537 if let Err(e) = execute_simple_with_timeout(
538 pg_conn,
539 &sql,
540 self.inner.config.connect_timeout,
541 "pool acquire_with_rls_timeouts setup",
542 )
543 .await
544 {
545 if let Ok(pg_conn) = conn.get_mut() {
546 let _ = pg_conn.execute_simple("ROLLBACK").await;
547 }
548 conn.release().await;
549 return Err(e);
550 }
551
552 conn.rls_dirty = true;
553
554 Ok(conn)
555 }
556
557 pub async fn with_rls_timeouts<T, F>(
559 &self,
560 ctx: qail_core::rls::RlsContext,
561 statement_timeout_ms: u32,
562 lock_timeout_ms: u32,
563 f: F,
564 ) -> PgResult<T>
565 where
566 F: for<'a> FnOnce(&'a mut PooledConnection) -> ScopedPoolFuture<'a, T>,
567 {
568 let mut conn = self
569 .acquire_with_rls_timeouts(ctx, statement_timeout_ms, lock_timeout_ms)
570 .await?;
571 let out = f(&mut conn).await;
572 conn.release().await;
573 out
574 }
575
576 pub async fn acquire_system(&self) -> PgResult<PooledConnection> {
586 let ctx = qail_core::rls::RlsContext::empty();
587 self.acquire_with_rls(ctx).await
588 }
589
590 pub async fn acquire_global(&self) -> PgResult<PooledConnection> {
596 self.acquire_with_rls(qail_core::rls::RlsContext::global())
597 .await
598 }
599
600 pub async fn acquire_for_tenant(&self, tenant_id: &str) -> PgResult<PooledConnection> {
612 self.acquire_with_rls(qail_core::rls::RlsContext::tenant(tenant_id))
613 .await
614 }
615
616 pub async fn acquire_with_branch(
630 &self,
631 ctx: &qail_core::branch::BranchContext,
632 ) -> PgResult<PooledConnection> {
633 let mut conn = self.acquire_raw().await?;
635
636 if let Some(branch_name) = ctx.branch_name() {
637 let sql = crate::driver::branch_sql::branch_context_sql(branch_name);
638 let pg_conn = conn.get_mut()?;
639 if let Err(e) = execute_simple_with_timeout(
640 pg_conn,
641 &sql,
642 self.inner.config.connect_timeout,
643 "pool acquire_with_branch setup",
644 )
645 .await
646 {
647 if let Ok(pg_conn) = conn.get_mut() {
648 let _ = pg_conn.execute_simple("ROLLBACK").await;
649 }
650 conn.release().await;
651 return Err(e);
652 }
653 conn.rls_dirty = true; }
655
656 Ok(conn)
657 }
658
659 pub async fn idle_count(&self) -> usize {
661 self.inner.connections.lock().await.len()
662 }
663
664 pub fn active_count(&self) -> usize {
666 self.inner.active_count.load(Ordering::Relaxed)
667 }
668
669 pub fn max_connections(&self) -> usize {
671 self.inner.config.max_connections
672 }
673
674 pub fn plan_auto_count(&self, batch_len: usize) -> AutoCountPlan {
676 AutoCountPlan::for_pool(
677 batch_len,
678 self.inner.config.max_connections,
679 self.inner.semaphore.available_permits(),
680 )
681 }
682
683 pub async fn execute_count_auto_with_plan(
685 &self,
686 cmds: &[qail_core::ast::Qail],
687 ) -> PgResult<(usize, AutoCountPlan)> {
688 let plan = self.plan_auto_count(cmds.len());
689
690 let completed = match plan.path {
691 AutoCountPath::SingleCached => {
692 if cmds.is_empty() {
693 0
694 } else {
695 let mut conn = self.acquire_system().await?;
696 let run_result = conn.fetch_all_cached(&cmds[0]).await;
697 conn.release().await;
698 let _ = run_result?;
699 1
700 }
701 }
702 AutoCountPath::PipelineOneShot | AutoCountPath::PipelineCached => {
703 let mode = if matches!(plan.path, AutoCountPath::PipelineOneShot) {
704 AstPipelineMode::OneShot
705 } else {
706 AstPipelineMode::Cached
707 };
708
709 let mut pooled = self.acquire_system().await?;
710 let run_result = {
711 let conn = pooled.get_mut()?;
712 conn.pipeline_execute_count_ast_with_mode(cmds, mode).await
713 };
714 pooled.release().await;
715 run_result?
716 }
717 AutoCountPath::PoolParallel => {
718 if cmds.is_empty() {
719 0
720 } else {
721 let all_cmds = Arc::new(cmds.to_vec());
722 let mut tasks: JoinSet<PgResult<usize>> = JoinSet::new();
723
724 for worker in 0..plan.workers {
725 let start = worker * plan.chunk_size;
726 if start >= all_cmds.len() {
727 break;
728 }
729 let end = (start + plan.chunk_size).min(all_cmds.len());
730 let pool = self.clone();
731 let all_cmds = Arc::clone(&all_cmds);
732
733 tasks.spawn(async move {
734 let mut pooled = pool.acquire_system().await?;
735 let run_result = {
736 let conn = pooled.get_mut()?;
737 conn.pipeline_execute_count_ast_with_mode(
738 &all_cmds[start..end],
739 AstPipelineMode::Auto,
740 )
741 .await
742 };
743 pooled.release().await;
744 run_result
745 });
746 }
747
748 let mut total = 0usize;
749 while let Some(joined) = tasks.join_next().await {
750 match joined {
751 Ok(Ok(count)) => {
752 total += count;
753 }
754 Ok(Err(err)) => return Err(err),
755 Err(err) => {
756 return Err(PgError::Connection(format!(
757 "auto pool worker join failed: {err}"
758 )));
759 }
760 }
761 }
762 total
763 }
764 }
765 };
766
767 Ok((completed, plan))
768 }
769
770 #[inline]
772 pub async fn execute_count_auto(&self, cmds: &[qail_core::ast::Qail]) -> PgResult<usize> {
773 let (completed, _plan) = self.execute_count_auto_with_plan(cmds).await?;
774 Ok(completed)
775 }
776
777 pub async fn stats(&self) -> PoolStats {
779 let idle = self.inner.connections.lock().await.len();
780 let active = self.inner.active_count.load(Ordering::Relaxed);
781 let used_slots = self
782 .inner
783 .config
784 .max_connections
785 .saturating_sub(self.inner.semaphore.available_permits());
786 PoolStats {
787 active,
788 idle,
789 pending: used_slots.saturating_sub(active),
790 max_size: self.inner.config.max_connections,
791 total_created: self.inner.total_created.load(Ordering::Relaxed),
792 }
793 }
794
795 pub fn is_closed(&self) -> bool {
797 self.inner.closed.load(Ordering::Relaxed)
798 }
799
800 pub async fn close(&self) {
807 self.close_graceful(self.inner.config.acquire_timeout).await;
808 }
809
810 pub async fn close_graceful(&self, drain_timeout: Duration) {
812 self.inner.closed.store(true, Ordering::Relaxed);
813 self.inner.semaphore.close();
815
816 let deadline = Instant::now() + drain_timeout;
817 loop {
818 let active = self.inner.active_count.load(Ordering::Relaxed);
819 if active == 0 {
820 break;
821 }
822 if Instant::now() >= deadline {
823 tracing::warn!(
824 active_connections = active,
825 timeout_ms = drain_timeout.as_millis() as u64,
826 "pool_close_drain_timeout: forcing idle cleanup while active connections remain"
827 );
828 break;
829 }
830 tokio::time::sleep(Duration::from_millis(25)).await;
831 }
832
833 let mut connections = self.inner.connections.lock().await;
834 let dropped_idle = connections.len();
835 connections.clear();
836 tracing::info!(
837 dropped_idle_connections = dropped_idle,
838 active_connections = self.inner.active_count.load(Ordering::Relaxed),
839 "pool_closed"
840 );
841 }
842
843 async fn create_connection(config: &PoolConfig) -> PgResult<PgConnection> {
845 if !config.auth_settings.has_any_password_method()
846 && config.mtls.is_none()
847 && config.password.is_some()
848 {
849 return Err(PgError::Auth(
850 "Invalid PoolConfig: all password auth methods are disabled".to_string(),
851 ));
852 }
853
854 let options = ConnectOptions {
855 tls_mode: config.tls_mode,
856 gss_enc_mode: config.gss_enc_mode,
857 tls_ca_cert_pem: config.tls_ca_cert_pem.clone(),
858 mtls: config.mtls.clone(),
859 gss_token_provider: config.gss_token_provider,
860 gss_token_provider_ex: config.gss_token_provider_ex.clone(),
861 auth: config.auth_settings,
862 startup_params: Vec::new(),
863 };
864
865 if let Some(remaining) = gss_circuit_remaining_open(config) {
866 metrics::counter!("qail_pg_gss_circuit_open_total").increment(1);
867 tracing::warn!(
868 host = %config.host,
869 port = config.port,
870 user = %config.user,
871 db = %config.database,
872 remaining_ms = remaining.as_millis() as u64,
873 "gss_connect_circuit_open"
874 );
875 return Err(PgError::Connection(format!(
876 "GSS connection circuit is open; retry after {:?}",
877 remaining
878 )));
879 }
880
881 let mut attempt = 0usize;
882 loop {
883 let connect_result = tokio::time::timeout(
884 config.connect_timeout,
885 PgConnection::connect_with_options(
886 &config.host,
887 config.port,
888 &config.user,
889 &config.database,
890 config.password.as_deref(),
891 options.clone(),
892 ),
893 )
894 .await;
895
896 let connect_result = match connect_result {
897 Ok(result) => result,
898 Err(_) => Err(PgError::Timeout(format!(
899 "connect timeout after {:?} (pool config connect_timeout)",
900 config.connect_timeout
901 ))),
902 };
903
904 match connect_result {
905 Ok(conn) => {
906 metrics::counter!("qail_pg_pool_connect_success_total").increment(1);
907 gss_circuit_record_success(config);
908 return Ok(conn);
909 }
910 Err(err) if should_retry_gss_connect_error(config, attempt, &err) => {
911 metrics::counter!("qail_pg_gss_connect_retries_total").increment(1);
912 gss_circuit_record_failure(config);
913 let delay = gss_retry_delay(config.gss_retry_base_delay, attempt);
914 tracing::warn!(
915 host = %config.host,
916 port = config.port,
917 user = %config.user,
918 db = %config.database,
919 attempt = attempt + 1,
920 delay_ms = delay.as_millis() as u64,
921 error = %err,
922 "gss_connect_retry"
923 );
924 tokio::time::sleep(delay).await;
925 attempt += 1;
926 }
927 Err(err) => {
928 metrics::counter!("qail_pg_pool_connect_failures_total").increment(1);
929 if should_track_gss_circuit_error(config, &err) {
930 metrics::counter!("qail_pg_gss_connect_failures_total").increment(1);
931 gss_circuit_record_failure(config);
932 }
933 return Err(err);
934 }
935 }
936 }
937 }
938
939 pub async fn maintain(&self) {
942 if self.inner.closed.load(Ordering::Relaxed) {
943 return;
944 }
945
946 let evicted = {
948 let mut connections = self.inner.connections.lock().await;
949 let before = connections.len();
950 connections.retain(|pooled| {
951 if pooled.last_used.elapsed() > self.inner.config.idle_timeout {
952 record_pool_connection_destroy("idle_sweep_evict");
953 return false;
954 }
955 if let Some(max_life) = self.inner.config.max_lifetime
956 && pooled.created_at.elapsed() > max_life
957 {
958 record_pool_connection_destroy("lifetime_sweep_evict");
959 return false;
960 }
961 true
962 });
963 before - connections.len()
964 };
965
966 if evicted > 0 {
967 tracing::debug!(evicted, "pool_maintenance: evicted stale idle connections");
968 }
969
970 let min = self.inner.config.min_connections;
972 if min == 0 {
973 return;
974 }
975
976 let idle_count = self.inner.connections.lock().await.len();
977 let checked_out_slots = self
978 .inner
979 .config
980 .max_connections
981 .saturating_sub(self.inner.semaphore.available_permits());
982 let deficit = maintenance_backfill_deficit(
983 self.inner.config.max_connections,
984 min,
985 idle_count,
986 checked_out_slots,
987 );
988 if deficit == 0 {
989 return;
990 }
991 let mut created = 0usize;
992 for _ in 0..deficit {
993 match Self::create_connection(&self.inner.config).await {
994 Ok(conn) => {
995 self.inner.total_created.fetch_add(1, Ordering::Relaxed);
996 let mut connections = self.inner.connections.lock().await;
997 if connections.len() < self.inner.config.max_connections {
998 connections.push(PooledConn {
999 conn,
1000 created_at: Instant::now(),
1001 last_used: Instant::now(),
1002 });
1003 created += 1;
1004 } else {
1005 break;
1007 }
1008 }
1009 Err(e) => {
1010 tracing::warn!(error = %e, "pool_maintenance: backfill connection failed");
1011 break; }
1013 }
1014 }
1015
1016 if created > 0 {
1017 tracing::debug!(
1018 created,
1019 min_connections = min,
1020 "pool_maintenance: backfilled idle connections"
1021 );
1022 }
1023 }
1024}
1025
1026pub fn spawn_pool_maintenance(pool: PgPool) {
1031 let interval_secs = std::cmp::max(pool.inner.config.idle_timeout.as_secs() / 2, 5);
1032 tokio::spawn(async move {
1033 let mut interval = tokio::time::interval(Duration::from_secs(interval_secs));
1034 loop {
1035 interval.tick().await;
1036 if pool.is_closed() {
1037 break;
1038 }
1039 pool.maintain().await;
1040 }
1041 });
1042}
1043
1044pub(super) fn maintenance_backfill_deficit(
1045 max_connections: usize,
1046 min_connections: usize,
1047 idle_count: usize,
1048 checked_out_slots: usize,
1049) -> usize {
1050 let target_idle = min_connections.min(max_connections);
1051 if idle_count >= target_idle {
1052 return 0;
1053 }
1054
1055 let needed_idle = target_idle - idle_count;
1056 let available_slots =
1057 max_connections.saturating_sub(idle_count.saturating_add(checked_out_slots));
1058 needed_idle.min(available_slots)
1059}
1060
1061pub(super) fn validate_pool_config(config: &PoolConfig) -> PgResult<()> {
1062 if config.max_connections == 0 {
1063 return Err(PgError::Connection(
1064 "Invalid PoolConfig: max_connections must be >= 1".to_string(),
1065 ));
1066 }
1067 if config.min_connections > config.max_connections {
1068 return Err(PgError::Connection(format!(
1069 "Invalid PoolConfig: min_connections ({}) must be <= max_connections ({})",
1070 config.min_connections, config.max_connections
1071 )));
1072 }
1073 if config.acquire_timeout.is_zero() {
1074 return Err(PgError::Connection(
1075 "Invalid PoolConfig: acquire_timeout must be > 0".to_string(),
1076 ));
1077 }
1078 if config.connect_timeout.is_zero() {
1079 return Err(PgError::Connection(
1080 "Invalid PoolConfig: connect_timeout must be > 0".to_string(),
1081 ));
1082 }
1083 if config.leaked_cleanup_queue == 0 {
1084 return Err(PgError::Connection(
1085 "Invalid PoolConfig: leaked_cleanup_queue must be >= 1".to_string(),
1086 ));
1087 }
1088 Ok(())
1089}
1090
1091pub(super) async fn execute_simple_with_timeout(
1092 conn: &mut PgConnection,
1093 sql: &str,
1094 timeout: Duration,
1095 operation: &str,
1096) -> PgResult<()> {
1097 match tokio::time::timeout(timeout, conn.execute_simple(sql)).await {
1098 Ok(result) => result,
1099 Err(_) => {
1100 conn.mark_io_desynced();
1101 Err(PgError::Timeout(format!(
1102 "{} timeout after {:?} (pool config connect_timeout)",
1103 operation, timeout
1104 )))
1105 }
1106 }
1107}