qail_pg/driver/pool/
churn.rs1use super::config::PoolConfig;
4use std::collections::HashMap;
5use std::sync::OnceLock;
6use std::sync::atomic::{AtomicUsize, Ordering};
7use std::time::{Duration, Instant};
8
/// Plain-data counters describing a connection pool.
///
/// NOTE(review): nothing in this file fills the struct in, so the per-field
/// descriptions below are inferred from the field names only — confirm them
/// against the code that produces `PoolStats`.
#[derive(Debug, Clone, Default)]
pub struct PoolStats {
    /// Presumably the number of connections currently handed out to callers.
    pub active: usize,
    /// Presumably the number of connections parked in the pool and ready for reuse.
    pub idle: usize,
    /// Presumably the number of callers waiting for a connection.
    pub pending: usize,
    /// Presumably the configured upper bound on pool size.
    pub max_size: usize,
    /// Presumably the running total of connections ever created by the pool.
    pub total_created: usize,
}
23
24pub(super) const POOL_CHURN_THRESHOLD: usize = 24;
25const POOL_CHURN_WINDOW: Duration = Duration::from_secs(15);
26const POOL_CHURN_COOLDOWN: Duration = Duration::from_secs(5);
27
28#[derive(Debug, Clone)]
29pub(super) struct PoolChurnState {
30 window_started_at: Instant,
31 failure_count: usize,
32 open_until: Option<Instant>,
33}
34
35pub(super) fn pool_churn_registry() -> &'static std::sync::Mutex<HashMap<String, PoolChurnState>> {
36 static REGISTRY: OnceLock<std::sync::Mutex<HashMap<String, PoolChurnState>>> = OnceLock::new();
37 REGISTRY.get_or_init(|| std::sync::Mutex::new(HashMap::new()))
38}
39
40pub(super) fn pool_churn_key(config: &PoolConfig) -> String {
41 format!(
42 "{}:{}:{}:{}",
43 config.host, config.port, config.user, config.database
44 )
45}
46
47pub(super) fn pool_churn_remaining_open(config: &PoolConfig) -> Option<Duration> {
48 let now = Instant::now();
49 let key = pool_churn_key(config);
50 let Ok(mut registry) = pool_churn_registry().lock() else {
51 return None;
52 };
53 let state = registry.get_mut(&key)?;
54 let until = state.open_until?;
55 if until > now {
56 return Some(until.duration_since(now));
57 }
58 state.open_until = None;
59 state.failure_count = 0;
60 state.window_started_at = now;
61 None
62}
63
64pub(super) fn record_pool_connection_destroy(reason: &'static str) {
65 metrics::counter!("qail_pg_pool_connection_destroyed_total", "reason" => reason).increment(1);
66}
67
68pub(super) fn pool_churn_record_destroy(config: &PoolConfig, reason: &'static str) {
69 record_pool_connection_destroy(reason);
70
71 let now = Instant::now();
72 let key = pool_churn_key(config);
73 let Ok(mut registry) = pool_churn_registry().lock() else {
74 return;
75 };
76 let state = registry
77 .entry(key.clone())
78 .or_insert_with(|| PoolChurnState {
79 window_started_at: now,
80 failure_count: 0,
81 open_until: None,
82 });
83
84 if let Some(until) = state.open_until {
85 if until > now {
86 return;
87 }
88 state.open_until = None;
89 state.failure_count = 0;
90 state.window_started_at = now;
91 }
92
93 if now.duration_since(state.window_started_at) > POOL_CHURN_WINDOW {
94 state.window_started_at = now;
95 state.failure_count = 0;
96 }
97
98 state.failure_count += 1;
99 if state.failure_count >= POOL_CHURN_THRESHOLD {
100 metrics::counter!("qail_pg_pool_churn_circuit_open_total").increment(1);
101 state.open_until = Some(now + POOL_CHURN_COOLDOWN);
102 state.failure_count = 0;
103 state.window_started_at = now;
104 tracing::warn!(
105 host = %config.host,
106 port = config.port,
107 user = %config.user,
108 db = %config.database,
109 threshold = POOL_CHURN_THRESHOLD as u64,
110 cooldown_ms = POOL_CHURN_COOLDOWN.as_millis() as u64,
111 "pool_connection_churn_circuit_opened"
112 );
113 }
114}
115
116pub(super) fn decrement_active_count_saturating(counter: &AtomicUsize) {
117 let mut current = counter.load(Ordering::Relaxed);
118 loop {
119 if current == 0 {
120 return;
121 }
122 match counter.compare_exchange_weak(
123 current,
124 current - 1,
125 Ordering::Relaxed,
126 Ordering::Relaxed,
127 ) {
128 Ok(_) => return,
129 Err(actual) => current = actual,
130 }
131 }
132}