Skip to main content

qail_pg/driver/pool/
churn.rs

1//! Pool statistics and connection churn circuit breaker.
2
3use super::config::PoolConfig;
4use std::collections::HashMap;
5use std::sync::OnceLock;
6use std::sync::atomic::{AtomicUsize, Ordering};
7use std::time::{Duration, Instant};
8
9/// Pool statistics for monitoring.
/// Point-in-time snapshot of pool counters, intended for monitoring.
#[derive(Debug, Clone, Default)]
pub struct PoolStats {
    /// Number of connections currently checked out by callers.
    pub active: usize,
    /// Number of connections sitting idle in the pool, ready for reuse.
    pub idle: usize,
    /// Number of callers waiting for a connection.
    pub pending: usize,
    /// Configured maximum number of connections.
    pub max_size: usize,
    /// Cumulative number of connections created since pool startup.
    pub total_created: usize,
}
23
24pub(super) const POOL_CHURN_THRESHOLD: usize = 24;
25const POOL_CHURN_WINDOW: Duration = Duration::from_secs(15);
26const POOL_CHURN_COOLDOWN: Duration = Duration::from_secs(5);
27
28#[derive(Debug, Clone)]
29pub(super) struct PoolChurnState {
30    window_started_at: Instant,
31    failure_count: usize,
32    open_until: Option<Instant>,
33}
34
35pub(super) fn pool_churn_registry() -> &'static std::sync::Mutex<HashMap<String, PoolChurnState>> {
36    static REGISTRY: OnceLock<std::sync::Mutex<HashMap<String, PoolChurnState>>> = OnceLock::new();
37    REGISTRY.get_or_init(|| std::sync::Mutex::new(HashMap::new()))
38}
39
40pub(super) fn pool_churn_key(config: &PoolConfig) -> String {
41    format!(
42        "{}:{}:{}:{}",
43        config.host, config.port, config.user, config.database
44    )
45}
46
47pub(super) fn pool_churn_remaining_open(config: &PoolConfig) -> Option<Duration> {
48    let now = Instant::now();
49    let key = pool_churn_key(config);
50    let Ok(mut registry) = pool_churn_registry().lock() else {
51        return None;
52    };
53    let state = registry.get_mut(&key)?;
54    let until = state.open_until?;
55    if until > now {
56        return Some(until.duration_since(now));
57    }
58    state.open_until = None;
59    state.failure_count = 0;
60    state.window_started_at = now;
61    None
62}
63
64pub(super) fn record_pool_connection_destroy(reason: &'static str) {
65    metrics::counter!("qail_pg_pool_connection_destroyed_total", "reason" => reason).increment(1);
66}
67
68pub(super) fn pool_churn_record_destroy(config: &PoolConfig, reason: &'static str) {
69    record_pool_connection_destroy(reason);
70
71    let now = Instant::now();
72    let key = pool_churn_key(config);
73    let Ok(mut registry) = pool_churn_registry().lock() else {
74        return;
75    };
76    let state = registry.entry(key).or_insert_with(|| PoolChurnState {
77        window_started_at: now,
78        failure_count: 0,
79        open_until: None,
80    });
81
82    if let Some(until) = state.open_until {
83        if until > now {
84            return;
85        }
86        state.open_until = None;
87        state.failure_count = 0;
88        state.window_started_at = now;
89    }
90
91    if now.duration_since(state.window_started_at) > POOL_CHURN_WINDOW {
92        state.window_started_at = now;
93        state.failure_count = 0;
94    }
95
96    state.failure_count += 1;
97    if state.failure_count >= POOL_CHURN_THRESHOLD {
98        metrics::counter!("qail_pg_pool_churn_circuit_open_total").increment(1);
99        state.open_until = Some(now + POOL_CHURN_COOLDOWN);
100        state.failure_count = 0;
101        state.window_started_at = now;
102        tracing::warn!(
103            host = %config.host,
104            port = config.port,
105            user = %config.user,
106            db = %config.database,
107            threshold = POOL_CHURN_THRESHOLD as u64,
108            cooldown_ms = POOL_CHURN_COOLDOWN.as_millis() as u64,
109            "pool_connection_churn_circuit_opened"
110        );
111    }
112}
113
114pub(super) fn decrement_active_count_saturating(counter: &AtomicUsize) {
115    let mut current = counter.load(Ordering::Relaxed);
116    loop {
117        if current == 0 {
118            return;
119        }
120        match counter.compare_exchange_weak(
121            current,
122            current - 1,
123            Ordering::Relaxed,
124            Ordering::Relaxed,
125        ) {
126            Ok(_) => return,
127            Err(actual) => current = actual,
128        }
129    }
130}