1use super::ScopedPoolFuture;
5use super::churn::{
6 PoolStats, decrement_active_count_saturating, pool_churn_record_destroy,
7 pool_churn_remaining_open, record_pool_connection_destroy,
8};
9use super::config::PoolConfig;
10use super::connection::PooledConn;
11use super::connection::PooledConnection;
12use super::gss::*;
13use crate::driver::{
14 AstPipelineMode, AutoCountPath, AutoCountPlan, ConnectOptions, PgConnection, PgError, PgResult,
15 is_ignorable_session_message, unexpected_backend_message,
16};
17use std::sync::Arc;
18use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
19use std::time::{Duration, Instant};
20use tokio::sync::{Mutex, Semaphore};
21use tokio::task::JoinSet;
22
/// Limit associated with the pool-wide hot-statement set
/// (`PgPoolInner::hot_statements`).
///
/// NOTE(review): not referenced in this part of the file; presumably it caps how
/// many statements are tracked for pre-preparation on checkout — confirm at the
/// site that inserts into `hot_statements`.
pub(super) const MAX_HOT_STATEMENTS: usize = 32;
25
/// Shared state behind every [`PgPool`] handle (each handle holds it in an `Arc`).
pub(super) struct PgPoolInner {
    /// Pool configuration; `PgPool::connect` validates it with `validate_pool_config`
    /// before building this struct.
    pub(super) config: PoolConfig,
    /// Idle connections available for checkout. Checkout pops from the end of the
    /// vector; returned and backfilled connections are pushed onto the end.
    pub(super) connections: Mutex<Vec<PooledConn>>,
    /// Created with `max_connections` permits. A successful checkout forgets its
    /// permit and `return_connection` adds one back.
    pub(super) semaphore: Semaphore,
    /// Set to `true` by `close_graceful`; checked on acquire, on return, and by
    /// maintenance.
    pub(super) closed: AtomicBool,
    /// Connections currently checked out: incremented at the end of `acquire_raw`,
    /// decremented (saturating) in `return_connection`.
    pub(super) active_count: AtomicUsize,
    /// Running total of connections opened by this pool (initial, on checkout,
    /// replacements, and maintenance backfill).
    pub(super) total_created: AtomicUsize,
    /// NOTE(review): not read or written in this part of the file; presumably counts
    /// cleanup work for connections dropped without being released — confirm.
    pub(super) leaked_cleanup_inflight: AtomicUsize,
    /// Statements to pre-prepare on checkout, keyed by statement hash. The value is
    /// `(statement name, SQL text)`, as consumed by `acquire_raw`.
    pub(super) hot_statements: std::sync::RwLock<std::collections::HashMap<u64, (String, String)>>,
}
40
41pub(super) fn handle_hot_preprepare_message(
42 msg: &crate::protocol::BackendMessage,
43 parse_complete_count: &mut usize,
44 error: &mut Option<PgError>,
45) -> PgResult<bool> {
46 match msg {
47 crate::protocol::BackendMessage::ParseComplete => {
48 *parse_complete_count += 1;
49 Ok(false)
50 }
51 crate::protocol::BackendMessage::ErrorResponse(err) => {
52 if error.is_none() {
53 *error = Some(PgError::QueryServer(err.clone().into()));
54 }
55 Ok(false)
56 }
57 crate::protocol::BackendMessage::ReadyForQuery(_) => Ok(true),
58 msg if is_ignorable_session_message(msg) => Ok(false),
59 other => Err(unexpected_backend_message("pool hot pre-prepare", other)),
60 }
61}
62
63impl PgPoolInner {
64 pub(super) async fn return_connection(&self, conn: PgConnection, created_at: Instant) {
65 decrement_active_count_saturating(&self.active_count);
66
67 if conn.is_io_desynced() {
68 tracing::warn!(
69 host = %self.config.host,
70 port = self.config.port,
71 user = %self.config.user,
72 db = %self.config.database,
73 "pool_return_desynced: dropping connection due to prior I/O/protocol desync"
74 );
75 record_pool_connection_destroy("pool_desynced_drop");
76 self.semaphore.add_permits(1);
77 pool_churn_record_destroy(&self.config, "return_desynced");
78 return;
79 }
80
81 if self.closed.load(Ordering::Relaxed) {
82 record_pool_connection_destroy("pool_closed_drop");
83 self.semaphore.add_permits(1);
84 return;
85 }
86
87 let mut connections = self.connections.lock().await;
88 if connections.len() < self.config.max_connections {
89 connections.push(PooledConn {
90 conn,
91 created_at,
92 last_used: Instant::now(),
93 });
94 } else {
95 record_pool_connection_destroy("pool_overflow_drop");
96 }
97
98 self.semaphore.add_permits(1);
99 }
100
101 async fn get_healthy_connection(&self) -> Option<PooledConn> {
103 let mut connections = self.connections.lock().await;
104
105 while let Some(pooled) = connections.pop() {
106 if pooled.last_used.elapsed() > self.config.idle_timeout {
107 tracing::debug!(
108 idle_secs = pooled.last_used.elapsed().as_secs(),
109 timeout_secs = self.config.idle_timeout.as_secs(),
110 "pool_checkout_evict: connection exceeded idle timeout"
111 );
112 record_pool_connection_destroy("idle_timeout_evict");
113 continue;
114 }
115
116 if let Some(max_life) = self.config.max_lifetime
117 && pooled.created_at.elapsed() > max_life
118 {
119 tracing::debug!(
120 age_secs = pooled.created_at.elapsed().as_secs(),
121 max_lifetime_secs = max_life.as_secs(),
122 "pool_checkout_evict: connection exceeded max lifetime"
123 );
124 record_pool_connection_destroy("max_lifetime_evict");
125 continue;
126 }
127
128 return Some(pooled);
129 }
130
131 None
132 }
133}
134
/// Cloneable handle to a PostgreSQL connection pool.
///
/// Cloning only bumps the `Arc` around [`PgPoolInner`], so every clone shares the
/// same idle list, semaphore, counters, and closed flag.
#[derive(Clone)]
pub struct PgPool {
    pub(super) inner: Arc<PgPoolInner>,
}
149
150impl PgPool {
151 pub async fn from_config() -> PgResult<Self> {
158 let qail = qail_core::config::QailConfig::load()
159 .map_err(|e| PgError::Connection(format!("Config error: {}", e)))?;
160 let config = PoolConfig::from_qail_config(&qail)?;
161 Self::connect(config).await
162 }
163
164 pub async fn connect(config: PoolConfig) -> PgResult<Self> {
166 validate_pool_config(&config)?;
167
168 let semaphore = Semaphore::new(config.max_connections);
170
171 let mut initial_connections = Vec::new();
172 for _ in 0..config.min_connections {
173 let conn = Self::create_connection(&config).await?;
174 initial_connections.push(PooledConn {
175 conn,
176 created_at: Instant::now(),
177 last_used: Instant::now(),
178 });
179 }
180
181 let initial_count = initial_connections.len();
182
183 let inner = Arc::new(PgPoolInner {
184 config,
185 connections: Mutex::new(initial_connections),
186 semaphore,
187 closed: AtomicBool::new(false),
188 active_count: AtomicUsize::new(0),
189 total_created: AtomicUsize::new(initial_count),
190 leaked_cleanup_inflight: AtomicUsize::new(0),
191 hot_statements: std::sync::RwLock::new(std::collections::HashMap::new()),
192 });
193
194 Ok(Self { inner })
195 }
196
197 pub async fn acquire_raw(&self) -> PgResult<PooledConnection> {
212 if self.inner.closed.load(Ordering::Relaxed) {
213 return Err(PgError::PoolClosed);
214 }
215
216 if let Some(remaining) = pool_churn_remaining_open(&self.inner.config) {
217 metrics::counter!("qail_pg_pool_churn_circuit_reject_total").increment(1);
218 tracing::warn!(
219 host = %self.inner.config.host,
220 port = self.inner.config.port,
221 user = %self.inner.config.user,
222 db = %self.inner.config.database,
223 remaining_ms = remaining.as_millis() as u64,
224 "pool_connection_churn_circuit_open"
225 );
226 return Err(PgError::PoolExhausted {
227 max: self.inner.config.max_connections,
228 });
229 }
230
231 let acquire_timeout = self.inner.config.acquire_timeout;
233 let permit =
234 match tokio::time::timeout(acquire_timeout, self.inner.semaphore.acquire()).await {
235 Ok(permit) => permit.map_err(|_| PgError::PoolClosed)?,
236 Err(_) => {
237 metrics::counter!("qail_pg_pool_acquire_timeouts_total").increment(1);
238 return Err(PgError::Timeout(format!(
239 "pool acquire after {}s ({} max connections)",
240 acquire_timeout.as_secs(),
241 self.inner.config.max_connections
242 )));
243 }
244 };
245
246 if self.inner.closed.load(Ordering::Relaxed) {
247 return Err(PgError::PoolClosed);
248 }
249
250 let (mut conn, mut created_at) =
252 if let Some(pooled) = self.inner.get_healthy_connection().await {
253 (pooled.conn, pooled.created_at)
254 } else {
255 let conn = Self::create_connection(&self.inner.config).await?;
256 self.inner.total_created.fetch_add(1, Ordering::Relaxed);
257 (conn, Instant::now())
258 };
259
260 if self.inner.config.test_on_acquire
261 && let Err(e) = execute_simple_with_timeout(
262 &mut conn,
263 "SELECT 1",
264 self.inner.config.connect_timeout,
265 "pool checkout health check",
266 )
267 .await
268 {
269 tracing::warn!(
270 host = %self.inner.config.host,
271 port = self.inner.config.port,
272 user = %self.inner.config.user,
273 db = %self.inner.config.database,
274 error = %e,
275 "pool_health_check_failed: checkout probe failed, creating replacement connection"
276 );
277 pool_churn_record_destroy(&self.inner.config, "health_check_failed");
278 conn = Self::create_connection(&self.inner.config).await?;
279 self.inner.total_created.fetch_add(1, Ordering::Relaxed);
280 created_at = Instant::now();
281 }
282
283 let missing: Vec<(u64, String, String)> = {
286 if let Ok(hot) = self.inner.hot_statements.read() {
287 hot.iter()
288 .filter(|(hash, _)| !conn.stmt_cache.contains(hash))
289 .map(|(hash, (name, sql))| (*hash, name.clone(), sql.clone()))
290 .collect()
291 } else {
292 Vec::new()
293 }
294 }; if !missing.is_empty() {
297 use crate::protocol::PgEncoder;
298 let mut buf = bytes::BytesMut::new();
299 for (_, name, sql) in &missing {
300 let parse_msg = PgEncoder::try_encode_parse(name, sql, &[])?;
301 buf.extend_from_slice(&parse_msg);
302 }
303 PgEncoder::encode_sync_to(&mut buf);
304 let preprepare_timeout = self.inner.config.connect_timeout;
305 let preprepare_result: PgResult<()> = match tokio::time::timeout(
306 preprepare_timeout,
307 async {
308 conn.send_bytes(&buf).await?;
309 let mut parse_complete_count = 0usize;
311 let mut parse_error: Option<PgError> = None;
312 loop {
313 let msg = conn.recv().await?;
314 if handle_hot_preprepare_message(
315 &msg,
316 &mut parse_complete_count,
317 &mut parse_error,
318 )? {
319 if let Some(err) = parse_error {
320 return Err(err);
321 }
322 if parse_complete_count != missing.len() {
323 return Err(PgError::Protocol(format!(
324 "hot pre-prepare completed with {} ParseComplete messages (expected {})",
325 parse_complete_count,
326 missing.len()
327 )));
328 }
329 break;
330 }
331 }
332 Ok::<(), PgError>(())
333 },
334 )
335 .await
336 {
337 Ok(res) => res,
338 Err(_) => Err(PgError::Timeout(format!(
339 "hot statement pre-prepare timeout after {:?} (pool config connect_timeout)",
340 preprepare_timeout
341 ))),
342 };
343
344 if let Err(e) = preprepare_result {
345 tracing::warn!(
346 host = %self.inner.config.host,
347 port = self.inner.config.port,
348 user = %self.inner.config.user,
349 db = %self.inner.config.database,
350 timeout_ms = preprepare_timeout.as_millis() as u64,
351 error = %e,
352 "pool_hot_prepare_failed: replacing connection to avoid handing out uncertain protocol state"
353 );
354 pool_churn_record_destroy(&self.inner.config, "hot_prepare_failed");
355 conn = Self::create_connection(&self.inner.config).await?;
356 self.inner.total_created.fetch_add(1, Ordering::Relaxed);
357 created_at = Instant::now();
358 } else {
359 for (hash, name, sql) in &missing {
361 conn.stmt_cache.put(*hash, name.clone());
362 conn.prepared_statements.insert(name.clone(), sql.clone());
363 }
364 }
365 }
366
367 self.inner.active_count.fetch_add(1, Ordering::Relaxed);
368 permit.forget();
370
371 Ok(PooledConnection {
372 conn: Some(conn),
373 pool: std::sync::Arc::clone(&self.inner),
374 rls_dirty: false,
375 created_at,
376 })
377 }
378
379 pub async fn acquire_with_rls(
395 &self,
396 ctx: qail_core::rls::RlsContext,
397 ) -> PgResult<PooledConnection> {
398 let mut conn = self.acquire_raw().await?;
400
401 let sql = crate::driver::rls::context_to_sql(&ctx);
403 let pg_conn = conn.get_mut()?;
404 if let Err(e) = execute_simple_with_timeout(
405 pg_conn,
406 &sql,
407 self.inner.config.connect_timeout,
408 "pool acquire_with_rls setup",
409 )
410 .await
411 {
412 if let Ok(pg_conn) = conn.get_mut() {
415 let _ = pg_conn.execute_simple("ROLLBACK").await;
416 }
417 conn.release().await;
418 return Err(e);
419 }
420
421 conn.rls_dirty = true;
423
424 Ok(conn)
425 }
426
427 pub async fn with_rls<T, F>(&self, ctx: qail_core::rls::RlsContext, f: F) -> PgResult<T>
431 where
432 F: for<'a> FnOnce(&'a mut PooledConnection) -> ScopedPoolFuture<'a, T>,
433 {
434 let mut conn = self.acquire_with_rls(ctx).await?;
435 let out = f(&mut conn).await;
436 match out {
437 Ok(value) => {
438 conn.release_checked().await?;
439 Ok(value)
440 }
441 Err(err) => {
442 let _ = conn.rollback_and_release().await;
443 Err(err)
444 }
445 }
446 }
447
448 pub async fn with_system<T, F>(&self, f: F) -> PgResult<T>
450 where
451 F: for<'a> FnOnce(&'a mut PooledConnection) -> ScopedPoolFuture<'a, T>,
452 {
453 self.with_rls(qail_core::rls::RlsContext::empty(), f).await
454 }
455
456 pub async fn with_global<T, F>(&self, f: F) -> PgResult<T>
458 where
459 F: for<'a> FnOnce(&'a mut PooledConnection) -> ScopedPoolFuture<'a, T>,
460 {
461 self.with_rls(qail_core::rls::RlsContext::global(), f).await
462 }
463
464 pub async fn with_tenant<T, F>(&self, tenant_id: &str, f: F) -> PgResult<T>
466 where
467 F: for<'a> FnOnce(&'a mut PooledConnection) -> ScopedPoolFuture<'a, T>,
468 {
469 self.with_rls(qail_core::rls::RlsContext::tenant(tenant_id), f)
470 .await
471 }
472
473 pub async fn acquire_with_rls_timeout(
478 &self,
479 ctx: qail_core::rls::RlsContext,
480 timeout_ms: u32,
481 ) -> PgResult<PooledConnection> {
482 let mut conn = self.acquire_raw().await?;
484
485 let sql = crate::driver::rls::context_to_sql_with_timeout(&ctx, timeout_ms);
487 let pg_conn = conn.get_mut()?;
488 if let Err(e) = execute_simple_with_timeout(
489 pg_conn,
490 &sql,
491 self.inner.config.connect_timeout,
492 "pool acquire_with_rls_timeout setup",
493 )
494 .await
495 {
496 if let Ok(pg_conn) = conn.get_mut() {
497 let _ = pg_conn.execute_simple("ROLLBACK").await;
498 }
499 conn.release().await;
500 return Err(e);
501 }
502
503 conn.rls_dirty = true;
505
506 Ok(conn)
507 }
508
509 pub async fn with_rls_timeout<T, F>(
511 &self,
512 ctx: qail_core::rls::RlsContext,
513 timeout_ms: u32,
514 f: F,
515 ) -> PgResult<T>
516 where
517 F: for<'a> FnOnce(&'a mut PooledConnection) -> ScopedPoolFuture<'a, T>,
518 {
519 let mut conn = self.acquire_with_rls_timeout(ctx, timeout_ms).await?;
520 let out = f(&mut conn).await;
521 match out {
522 Ok(value) => {
523 conn.release_checked().await?;
524 Ok(value)
525 }
526 Err(err) => {
527 let _ = conn.rollback_and_release().await;
528 Err(err)
529 }
530 }
531 }
532
533 pub async fn acquire_with_rls_timeouts(
539 &self,
540 ctx: qail_core::rls::RlsContext,
541 statement_timeout_ms: u32,
542 lock_timeout_ms: u32,
543 ) -> PgResult<PooledConnection> {
544 let mut conn = self.acquire_raw().await?;
546
547 let sql = crate::driver::rls::context_to_sql_with_timeouts(
548 &ctx,
549 statement_timeout_ms,
550 lock_timeout_ms,
551 );
552 let pg_conn = conn.get_mut()?;
553 if let Err(e) = execute_simple_with_timeout(
554 pg_conn,
555 &sql,
556 self.inner.config.connect_timeout,
557 "pool acquire_with_rls_timeouts setup",
558 )
559 .await
560 {
561 if let Ok(pg_conn) = conn.get_mut() {
562 let _ = pg_conn.execute_simple("ROLLBACK").await;
563 }
564 conn.release().await;
565 return Err(e);
566 }
567
568 conn.rls_dirty = true;
569
570 Ok(conn)
571 }
572
573 pub async fn with_rls_timeouts<T, F>(
575 &self,
576 ctx: qail_core::rls::RlsContext,
577 statement_timeout_ms: u32,
578 lock_timeout_ms: u32,
579 f: F,
580 ) -> PgResult<T>
581 where
582 F: for<'a> FnOnce(&'a mut PooledConnection) -> ScopedPoolFuture<'a, T>,
583 {
584 let mut conn = self
585 .acquire_with_rls_timeouts(ctx, statement_timeout_ms, lock_timeout_ms)
586 .await?;
587 let out = f(&mut conn).await;
588 match out {
589 Ok(value) => {
590 conn.release_checked().await?;
591 Ok(value)
592 }
593 Err(err) => {
594 let _ = conn.rollback_and_release().await;
595 Err(err)
596 }
597 }
598 }
599
600 pub async fn acquire_system(&self) -> PgResult<PooledConnection> {
610 let ctx = qail_core::rls::RlsContext::empty();
611 self.acquire_with_rls(ctx).await
612 }
613
614 pub async fn acquire_global(&self) -> PgResult<PooledConnection> {
620 self.acquire_with_rls(qail_core::rls::RlsContext::global())
621 .await
622 }
623
624 pub async fn acquire_for_tenant(&self, tenant_id: &str) -> PgResult<PooledConnection> {
636 self.acquire_with_rls(qail_core::rls::RlsContext::tenant(tenant_id))
637 .await
638 }
639
640 pub async fn acquire_with_branch(
654 &self,
655 ctx: &qail_core::branch::BranchContext,
656 ) -> PgResult<PooledConnection> {
657 let mut conn = self.acquire_raw().await?;
659
660 if let Some(branch_name) = ctx.branch_name() {
661 let sql = crate::driver::branch_sql::branch_context_sql(branch_name);
662 let pg_conn = conn.get_mut()?;
663 if let Err(e) = execute_simple_with_timeout(
664 pg_conn,
665 &sql,
666 self.inner.config.connect_timeout,
667 "pool acquire_with_branch setup",
668 )
669 .await
670 {
671 if let Ok(pg_conn) = conn.get_mut() {
672 let _ = pg_conn.execute_simple("ROLLBACK").await;
673 }
674 conn.release().await;
675 return Err(e);
676 }
677 conn.rls_dirty = true; }
679
680 Ok(conn)
681 }
682
683 pub async fn idle_count(&self) -> usize {
685 self.inner.connections.lock().await.len()
686 }
687
688 pub fn active_count(&self) -> usize {
690 self.inner.active_count.load(Ordering::Relaxed)
691 }
692
693 pub fn max_connections(&self) -> usize {
695 self.inner.config.max_connections
696 }
697
698 pub fn plan_auto_count(&self, batch_len: usize) -> AutoCountPlan {
700 AutoCountPlan::for_pool(
701 batch_len,
702 self.inner.config.max_connections,
703 self.inner.semaphore.available_permits(),
704 )
705 }
706
707 pub async fn execute_count_auto_with_plan(
709 &self,
710 cmds: &[qail_core::ast::Qail],
711 ) -> PgResult<(usize, AutoCountPlan)> {
712 let plan = self.plan_auto_count(cmds.len());
713
714 let completed = match plan.path {
715 AutoCountPath::SingleCached => {
716 if cmds.is_empty() {
717 0
718 } else {
719 let mut conn = self.acquire_system().await?;
720 let run_result = conn.fetch_all_cached(&cmds[0]).await;
721 conn.release().await;
722 let _ = run_result?;
723 1
724 }
725 }
726 AutoCountPath::PipelineOneShot | AutoCountPath::PipelineCached => {
727 let mode = if matches!(plan.path, AutoCountPath::PipelineOneShot) {
728 AstPipelineMode::OneShot
729 } else {
730 AstPipelineMode::Cached
731 };
732
733 let mut pooled = self.acquire_system().await?;
734 let run_result = {
735 let conn = pooled.get_mut()?;
736 conn.pipeline_execute_count_ast_with_mode(cmds, mode).await
737 };
738 pooled.release().await;
739 run_result?
740 }
741 AutoCountPath::PoolParallel => {
742 if cmds.is_empty() {
743 0
744 } else {
745 let all_cmds = Arc::new(cmds.to_vec());
746 let mut tasks: JoinSet<PgResult<usize>> = JoinSet::new();
747
748 for worker in 0..plan.workers {
749 let start = worker * plan.chunk_size;
750 if start >= all_cmds.len() {
751 break;
752 }
753 let end = (start + plan.chunk_size).min(all_cmds.len());
754 let pool = self.clone();
755 let all_cmds = Arc::clone(&all_cmds);
756
757 tasks.spawn(async move {
758 let mut pooled = pool.acquire_system().await?;
759 let run_result = {
760 let conn = pooled.get_mut()?;
761 conn.pipeline_execute_count_ast_with_mode(
762 &all_cmds[start..end],
763 AstPipelineMode::Auto,
764 )
765 .await
766 };
767 pooled.release().await;
768 run_result
769 });
770 }
771
772 let mut total = 0usize;
773 while let Some(joined) = tasks.join_next().await {
774 match joined {
775 Ok(Ok(count)) => {
776 total += count;
777 }
778 Ok(Err(err)) => return Err(err),
779 Err(err) => {
780 return Err(PgError::Connection(format!(
781 "auto pool worker join failed: {err}"
782 )));
783 }
784 }
785 }
786 total
787 }
788 }
789 };
790
791 Ok((completed, plan))
792 }
793
794 #[inline]
796 pub async fn execute_count_auto(&self, cmds: &[qail_core::ast::Qail]) -> PgResult<usize> {
797 let (completed, _plan) = self.execute_count_auto_with_plan(cmds).await?;
798 Ok(completed)
799 }
800
801 pub async fn stats(&self) -> PoolStats {
803 let idle = self.inner.connections.lock().await.len();
804 let active = self.inner.active_count.load(Ordering::Relaxed);
805 let used_slots = self
806 .inner
807 .config
808 .max_connections
809 .saturating_sub(self.inner.semaphore.available_permits());
810 PoolStats {
811 active,
812 idle,
813 pending: used_slots.saturating_sub(active),
814 max_size: self.inner.config.max_connections,
815 total_created: self.inner.total_created.load(Ordering::Relaxed),
816 }
817 }
818
819 pub fn is_closed(&self) -> bool {
821 self.inner.closed.load(Ordering::Relaxed)
822 }
823
824 pub async fn close(&self) {
831 self.close_graceful(self.inner.config.acquire_timeout).await;
832 }
833
834 pub async fn close_graceful(&self, drain_timeout: Duration) {
836 self.inner.closed.store(true, Ordering::Relaxed);
837 self.inner.semaphore.close();
839
840 let deadline = Instant::now() + drain_timeout;
841 loop {
842 let active = self.inner.active_count.load(Ordering::Relaxed);
843 if active == 0 {
844 break;
845 }
846 if Instant::now() >= deadline {
847 tracing::warn!(
848 active_connections = active,
849 timeout_ms = drain_timeout.as_millis() as u64,
850 "pool_close_drain_timeout: forcing idle cleanup while active connections remain"
851 );
852 break;
853 }
854 tokio::time::sleep(Duration::from_millis(25)).await;
855 }
856
857 let mut connections = self.inner.connections.lock().await;
858 let dropped_idle = connections.len();
859 connections.clear();
860 tracing::info!(
861 dropped_idle_connections = dropped_idle,
862 active_connections = self.inner.active_count.load(Ordering::Relaxed),
863 "pool_closed"
864 );
865 }
866
867 async fn create_connection(config: &PoolConfig) -> PgResult<PgConnection> {
869 if !config.auth_settings.has_any_password_method()
870 && config.mtls.is_none()
871 && config.password.is_some()
872 {
873 return Err(PgError::Auth(
874 "Invalid PoolConfig: all password auth methods are disabled".to_string(),
875 ));
876 }
877
878 let options = ConnectOptions {
879 tls_mode: config.tls_mode,
880 gss_enc_mode: config.gss_enc_mode,
881 tls_ca_cert_pem: config.tls_ca_cert_pem.clone(),
882 mtls: config.mtls.clone(),
883 gss_token_provider: config.gss_token_provider,
884 gss_token_provider_ex: config.gss_token_provider_ex.clone(),
885 auth: config.auth_settings,
886 startup_params: Vec::new(),
887 };
888
889 if let Some(remaining) = gss_circuit_remaining_open(config) {
890 metrics::counter!("qail_pg_gss_circuit_open_total").increment(1);
891 tracing::warn!(
892 host = %config.host,
893 port = config.port,
894 user = %config.user,
895 db = %config.database,
896 remaining_ms = remaining.as_millis() as u64,
897 "gss_connect_circuit_open"
898 );
899 return Err(PgError::Connection(format!(
900 "GSS connection circuit is open; retry after {:?}",
901 remaining
902 )));
903 }
904
905 let mut attempt = 0usize;
906 loop {
907 let connect_result = tokio::time::timeout(
908 config.connect_timeout,
909 PgConnection::connect_with_options(
910 &config.host,
911 config.port,
912 &config.user,
913 &config.database,
914 config.password.as_deref(),
915 options.clone(),
916 ),
917 )
918 .await;
919
920 let connect_result = match connect_result {
921 Ok(result) => result,
922 Err(_) => Err(PgError::Timeout(format!(
923 "connect timeout after {:?} (pool config connect_timeout)",
924 config.connect_timeout
925 ))),
926 };
927
928 match connect_result {
929 Ok(conn) => {
930 metrics::counter!("qail_pg_pool_connect_success_total").increment(1);
931 gss_circuit_record_success(config);
932 return Ok(conn);
933 }
934 Err(err) if should_retry_gss_connect_error(config, attempt, &err) => {
935 metrics::counter!("qail_pg_gss_connect_retries_total").increment(1);
936 gss_circuit_record_failure(config);
937 let delay = gss_retry_delay(config.gss_retry_base_delay, attempt);
938 tracing::warn!(
939 host = %config.host,
940 port = config.port,
941 user = %config.user,
942 db = %config.database,
943 attempt = attempt + 1,
944 delay_ms = delay.as_millis() as u64,
945 error = %err,
946 "gss_connect_retry"
947 );
948 tokio::time::sleep(delay).await;
949 attempt += 1;
950 }
951 Err(err) => {
952 metrics::counter!("qail_pg_pool_connect_failures_total").increment(1);
953 if should_track_gss_circuit_error(config, &err) {
954 metrics::counter!("qail_pg_gss_connect_failures_total").increment(1);
955 gss_circuit_record_failure(config);
956 }
957 return Err(err);
958 }
959 }
960 }
961 }
962
963 pub async fn maintain(&self) {
966 if self.inner.closed.load(Ordering::Relaxed) {
967 return;
968 }
969
970 let evicted = {
972 let mut connections = self.inner.connections.lock().await;
973 let before = connections.len();
974 connections.retain(|pooled| {
975 if pooled.last_used.elapsed() > self.inner.config.idle_timeout {
976 record_pool_connection_destroy("idle_sweep_evict");
977 return false;
978 }
979 if let Some(max_life) = self.inner.config.max_lifetime
980 && pooled.created_at.elapsed() > max_life
981 {
982 record_pool_connection_destroy("lifetime_sweep_evict");
983 return false;
984 }
985 true
986 });
987 before - connections.len()
988 };
989
990 if evicted > 0 {
991 tracing::debug!(evicted, "pool_maintenance: evicted stale idle connections");
992 }
993
994 let min = self.inner.config.min_connections;
996 if min == 0 {
997 return;
998 }
999
1000 let idle_count = self.inner.connections.lock().await.len();
1001 let checked_out_slots = self
1002 .inner
1003 .config
1004 .max_connections
1005 .saturating_sub(self.inner.semaphore.available_permits());
1006 let deficit = maintenance_backfill_deficit(
1007 self.inner.config.max_connections,
1008 min,
1009 idle_count,
1010 checked_out_slots,
1011 );
1012 if deficit == 0 {
1013 return;
1014 }
1015 let mut created = 0usize;
1016 for _ in 0..deficit {
1017 match Self::create_connection(&self.inner.config).await {
1018 Ok(conn) => {
1019 self.inner.total_created.fetch_add(1, Ordering::Relaxed);
1020 let mut connections = self.inner.connections.lock().await;
1021 if connections.len() < self.inner.config.max_connections {
1022 connections.push(PooledConn {
1023 conn,
1024 created_at: Instant::now(),
1025 last_used: Instant::now(),
1026 });
1027 created += 1;
1028 } else {
1029 break;
1031 }
1032 }
1033 Err(e) => {
1034 tracing::warn!(error = %e, "pool_maintenance: backfill connection failed");
1035 break; }
1037 }
1038 }
1039
1040 if created > 0 {
1041 tracing::debug!(
1042 created,
1043 min_connections = min,
1044 "pool_maintenance: backfilled idle connections"
1045 );
1046 }
1047 }
1048}
1049
1050pub fn spawn_pool_maintenance(pool: PgPool) {
1055 let interval_secs = std::cmp::max(pool.inner.config.idle_timeout.as_secs() / 2, 5);
1056 tokio::spawn(async move {
1057 let mut interval = tokio::time::interval(Duration::from_secs(interval_secs));
1058 loop {
1059 interval.tick().await;
1060 if pool.is_closed() {
1061 break;
1062 }
1063 pool.maintain().await;
1064 }
1065 });
1066}
1067
1068pub(super) fn maintenance_backfill_deficit(
1069 max_connections: usize,
1070 min_connections: usize,
1071 idle_count: usize,
1072 checked_out_slots: usize,
1073) -> usize {
1074 let target_idle = min_connections.min(max_connections);
1075 if idle_count >= target_idle {
1076 return 0;
1077 }
1078
1079 let needed_idle = target_idle - idle_count;
1080 let available_slots =
1081 max_connections.saturating_sub(idle_count.saturating_add(checked_out_slots));
1082 needed_idle.min(available_slots)
1083}
1084
1085pub(super) fn validate_pool_config(config: &PoolConfig) -> PgResult<()> {
1086 if config.max_connections == 0 {
1087 return Err(PgError::Connection(
1088 "Invalid PoolConfig: max_connections must be >= 1".to_string(),
1089 ));
1090 }
1091 if config.min_connections > config.max_connections {
1092 return Err(PgError::Connection(format!(
1093 "Invalid PoolConfig: min_connections ({}) must be <= max_connections ({})",
1094 config.min_connections, config.max_connections
1095 )));
1096 }
1097 if config.acquire_timeout.is_zero() {
1098 return Err(PgError::Connection(
1099 "Invalid PoolConfig: acquire_timeout must be > 0".to_string(),
1100 ));
1101 }
1102 if config.connect_timeout.is_zero() {
1103 return Err(PgError::Connection(
1104 "Invalid PoolConfig: connect_timeout must be > 0".to_string(),
1105 ));
1106 }
1107 if config.leaked_cleanup_queue == 0 {
1108 return Err(PgError::Connection(
1109 "Invalid PoolConfig: leaked_cleanup_queue must be >= 1".to_string(),
1110 ));
1111 }
1112 Ok(())
1113}
1114
1115pub(super) async fn execute_simple_with_timeout(
1116 conn: &mut PgConnection,
1117 sql: &str,
1118 timeout: Duration,
1119 operation: &str,
1120) -> PgResult<()> {
1121 match tokio::time::timeout(timeout, conn.execute_simple(sql)).await {
1122 Ok(result) => result,
1123 Err(_) => {
1124 conn.mark_io_desynced();
1125 Err(PgError::Timeout(format!(
1126 "{} timeout after {:?} (pool config connect_timeout)",
1127 operation, timeout
1128 )))
1129 }
1130 }
1131}