Skip to main content

heliosdb_proxy/pool/
metrics.rs

1//! Pool Mode Metrics
2//!
3//! Tracks connection pool statistics for monitoring and debugging.
4
5use super::mode::PoolingMode;
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::Arc;
10
11/// Connection pool metrics
12#[derive(Debug)]
13pub struct PoolModeMetrics {
14    /// Total connections acquired
15    pub acquires: AtomicU64,
16    /// Total connections released
17    pub releases: AtomicU64,
18    /// Connection acquire failures
19    pub acquire_failures: AtomicU64,
20    /// Acquire timeouts
21    pub acquire_timeouts: AtomicU64,
22    /// Connections created
23    pub connections_created: AtomicU64,
24    /// Connections closed
25    pub connections_closed: AtomicU64,
26    /// Connection resets performed
27    pub connection_resets: AtomicU64,
28    /// Reset failures
29    pub reset_failures: AtomicU64,
30    /// Total transactions completed
31    pub transactions_completed: AtomicU64,
32    /// Total statements executed
33    pub statements_executed: AtomicU64,
34    /// Current active leases
35    pub active_leases: AtomicU64,
36    /// Peak active leases
37    pub peak_active_leases: AtomicU64,
38    /// Queue wait count (when pool exhausted)
39    pub queue_waits: AtomicU64,
40    /// Per-mode statistics
41    mode_stats: Arc<parking_lot::RwLock<HashMap<PoolingMode, ModeStats>>>,
42}
43
44/// Per-mode statistics
45#[derive(Debug, Clone, Default, Serialize, Deserialize)]
46pub struct ModeStats {
47    /// Connections using this mode
48    pub active_connections: u64,
49    /// Total acquires in this mode
50    pub total_acquires: u64,
51    /// Total releases in this mode
52    pub total_releases: u64,
53    /// Average lease duration (ms)
54    pub avg_lease_duration_ms: f64,
55    /// Average statements per lease
56    pub avg_statements_per_lease: f64,
57}
58
59impl Default for PoolModeMetrics {
60    fn default() -> Self {
61        Self::new()
62    }
63}
64
65impl PoolModeMetrics {
66    /// Create new metrics instance
67    pub fn new() -> Self {
68        let mut mode_stats = HashMap::new();
69        mode_stats.insert(PoolingMode::Session, ModeStats::default());
70        mode_stats.insert(PoolingMode::Transaction, ModeStats::default());
71        mode_stats.insert(PoolingMode::Statement, ModeStats::default());
72
73        Self {
74            acquires: AtomicU64::new(0),
75            releases: AtomicU64::new(0),
76            acquire_failures: AtomicU64::new(0),
77            acquire_timeouts: AtomicU64::new(0),
78            connections_created: AtomicU64::new(0),
79            connections_closed: AtomicU64::new(0),
80            connection_resets: AtomicU64::new(0),
81            reset_failures: AtomicU64::new(0),
82            transactions_completed: AtomicU64::new(0),
83            statements_executed: AtomicU64::new(0),
84            active_leases: AtomicU64::new(0),
85            peak_active_leases: AtomicU64::new(0),
86            queue_waits: AtomicU64::new(0),
87            mode_stats: Arc::new(parking_lot::RwLock::new(mode_stats)),
88        }
89    }
90
91    /// Record a connection acquire
92    pub fn record_acquire(&self, mode: PoolingMode) {
93        self.acquires.fetch_add(1, Ordering::Relaxed);
94        let active = self.active_leases.fetch_add(1, Ordering::Relaxed) + 1;
95
96        // Update peak
97        loop {
98            let peak = self.peak_active_leases.load(Ordering::Relaxed);
99            if active <= peak {
100                break;
101            }
102            if self
103                .peak_active_leases
104                .compare_exchange(peak, active, Ordering::Relaxed, Ordering::Relaxed)
105                .is_ok()
106            {
107                break;
108            }
109        }
110
111        // Update mode stats
112        let mut stats = self.mode_stats.write();
113        if let Some(mode_stat) = stats.get_mut(&mode) {
114            mode_stat.active_connections += 1;
115            mode_stat.total_acquires += 1;
116        }
117    }
118
119    /// Record a connection release
120    pub fn record_release(&self, mode: PoolingMode, lease_duration_ms: u64, statements: u64) {
121        self.releases.fetch_add(1, Ordering::Relaxed);
122        self.active_leases.fetch_sub(1, Ordering::Relaxed);
123        self.statements_executed
124            .fetch_add(statements, Ordering::Relaxed);
125
126        // Update mode stats with running average
127        let mut stats = self.mode_stats.write();
128        if let Some(mode_stat) = stats.get_mut(&mode) {
129            mode_stat.active_connections = mode_stat.active_connections.saturating_sub(1);
130            mode_stat.total_releases += 1;
131
132            // Update running average for lease duration
133            let n = mode_stat.total_releases as f64;
134            mode_stat.avg_lease_duration_ms =
135                mode_stat.avg_lease_duration_ms * ((n - 1.0) / n) + (lease_duration_ms as f64 / n);
136
137            // Update running average for statements per lease
138            mode_stat.avg_statements_per_lease =
139                mode_stat.avg_statements_per_lease * ((n - 1.0) / n) + (statements as f64 / n);
140        }
141    }
142
143    /// Record an acquire failure
144    pub fn record_acquire_failure(&self) {
145        self.acquire_failures.fetch_add(1, Ordering::Relaxed);
146    }
147
148    /// Record an acquire timeout
149    pub fn record_acquire_timeout(&self) {
150        self.acquire_timeouts.fetch_add(1, Ordering::Relaxed);
151    }
152
153    /// Record connection creation
154    pub fn record_connection_created(&self) {
155        self.connections_created.fetch_add(1, Ordering::Relaxed);
156    }
157
158    /// Record connection close
159    pub fn record_connection_closed(&self) {
160        self.connections_closed.fetch_add(1, Ordering::Relaxed);
161    }
162
163    /// Record connection reset
164    pub fn record_reset(&self, success: bool) {
165        self.connection_resets.fetch_add(1, Ordering::Relaxed);
166        if !success {
167            self.reset_failures.fetch_add(1, Ordering::Relaxed);
168        }
169    }
170
171    /// Record transaction completion
172    pub fn record_transaction_complete(&self) {
173        self.transactions_completed.fetch_add(1, Ordering::Relaxed);
174    }
175
176    /// Record queue wait
177    pub fn record_queue_wait(&self) {
178        self.queue_waits.fetch_add(1, Ordering::Relaxed);
179    }
180
181    /// Get a snapshot of all metrics
182    pub fn snapshot(&self) -> MetricsSnapshot {
183        MetricsSnapshot {
184            acquires: self.acquires.load(Ordering::Relaxed),
185            releases: self.releases.load(Ordering::Relaxed),
186            acquire_failures: self.acquire_failures.load(Ordering::Relaxed),
187            acquire_timeouts: self.acquire_timeouts.load(Ordering::Relaxed),
188            connections_created: self.connections_created.load(Ordering::Relaxed),
189            connections_closed: self.connections_closed.load(Ordering::Relaxed),
190            connection_resets: self.connection_resets.load(Ordering::Relaxed),
191            reset_failures: self.reset_failures.load(Ordering::Relaxed),
192            transactions_completed: self.transactions_completed.load(Ordering::Relaxed),
193            statements_executed: self.statements_executed.load(Ordering::Relaxed),
194            active_leases: self.active_leases.load(Ordering::Relaxed),
195            peak_active_leases: self.peak_active_leases.load(Ordering::Relaxed),
196            queue_waits: self.queue_waits.load(Ordering::Relaxed),
197            mode_stats: self.mode_stats.read().clone(),
198        }
199    }
200
201    /// Reset all metrics
202    pub fn reset(&self) {
203        self.acquires.store(0, Ordering::Relaxed);
204        self.releases.store(0, Ordering::Relaxed);
205        self.acquire_failures.store(0, Ordering::Relaxed);
206        self.acquire_timeouts.store(0, Ordering::Relaxed);
207        self.connections_created.store(0, Ordering::Relaxed);
208        self.connections_closed.store(0, Ordering::Relaxed);
209        self.connection_resets.store(0, Ordering::Relaxed);
210        self.reset_failures.store(0, Ordering::Relaxed);
211        self.transactions_completed.store(0, Ordering::Relaxed);
212        self.statements_executed.store(0, Ordering::Relaxed);
213        // Note: active_leases and peak are not reset
214        self.queue_waits.store(0, Ordering::Relaxed);
215
216        let mut stats = self.mode_stats.write();
217        for stat in stats.values_mut() {
218            *stat = ModeStats::default();
219        }
220    }
221}
222
223/// Serializable metrics snapshot
224#[derive(Debug, Clone, Serialize, Deserialize)]
225pub struct MetricsSnapshot {
226    pub acquires: u64,
227    pub releases: u64,
228    pub acquire_failures: u64,
229    pub acquire_timeouts: u64,
230    pub connections_created: u64,
231    pub connections_closed: u64,
232    pub connection_resets: u64,
233    pub reset_failures: u64,
234    pub transactions_completed: u64,
235    pub statements_executed: u64,
236    pub active_leases: u64,
237    pub peak_active_leases: u64,
238    pub queue_waits: u64,
239    pub mode_stats: HashMap<PoolingMode, ModeStats>,
240}
241
242impl MetricsSnapshot {
243    /// Calculate connection efficiency (releases / acquires)
244    pub fn connection_efficiency(&self) -> f64 {
245        if self.acquires == 0 {
246            1.0
247        } else {
248            self.releases as f64 / self.acquires as f64
249        }
250    }
251
252    /// Calculate reset success rate
253    pub fn reset_success_rate(&self) -> f64 {
254        if self.connection_resets == 0 {
255            1.0
256        } else {
257            1.0 - (self.reset_failures as f64 / self.connection_resets as f64)
258        }
259    }
260
261    /// Calculate acquire success rate
262    pub fn acquire_success_rate(&self) -> f64 {
263        let total_attempts = self.acquires + self.acquire_failures + self.acquire_timeouts;
264        if total_attempts == 0 {
265            1.0
266        } else {
267            self.acquires as f64 / total_attempts as f64
268        }
269    }
270}
271
272#[cfg(test)]
273mod tests {
274    use super::*;
275
276    #[test]
277    fn test_metrics_new() {
278        let metrics = PoolModeMetrics::new();
279        let snapshot = metrics.snapshot();
280        assert_eq!(snapshot.acquires, 0);
281        assert_eq!(snapshot.releases, 0);
282        assert_eq!(snapshot.active_leases, 0);
283    }
284
285    #[test]
286    fn test_record_acquire_release() {
287        let metrics = PoolModeMetrics::new();
288
289        metrics.record_acquire(PoolingMode::Transaction);
290        assert_eq!(metrics.active_leases.load(Ordering::Relaxed), 1);
291        assert_eq!(metrics.acquires.load(Ordering::Relaxed), 1);
292
293        metrics.record_release(PoolingMode::Transaction, 100, 5);
294        assert_eq!(metrics.active_leases.load(Ordering::Relaxed), 0);
295        assert_eq!(metrics.releases.load(Ordering::Relaxed), 1);
296        assert_eq!(metrics.statements_executed.load(Ordering::Relaxed), 5);
297    }
298
299    #[test]
300    fn test_peak_active_leases() {
301        let metrics = PoolModeMetrics::new();
302
303        metrics.record_acquire(PoolingMode::Session);
304        metrics.record_acquire(PoolingMode::Session);
305        metrics.record_acquire(PoolingMode::Session);
306        assert_eq!(metrics.peak_active_leases.load(Ordering::Relaxed), 3);
307
308        metrics.record_release(PoolingMode::Session, 100, 1);
309        metrics.record_release(PoolingMode::Session, 100, 1);
310        assert_eq!(metrics.peak_active_leases.load(Ordering::Relaxed), 3);
311
312        metrics.record_acquire(PoolingMode::Session);
313        assert_eq!(metrics.peak_active_leases.load(Ordering::Relaxed), 3);
314
315        metrics.record_acquire(PoolingMode::Session);
316        metrics.record_acquire(PoolingMode::Session);
317        assert_eq!(metrics.peak_active_leases.load(Ordering::Relaxed), 4);
318    }
319
320    #[test]
321    fn test_mode_stats() {
322        let metrics = PoolModeMetrics::new();
323
324        metrics.record_acquire(PoolingMode::Transaction);
325        metrics.record_release(PoolingMode::Transaction, 200, 10);
326
327        metrics.record_acquire(PoolingMode::Transaction);
328        metrics.record_release(PoolingMode::Transaction, 100, 5);
329
330        let snapshot = metrics.snapshot();
331        let txn_stats = snapshot.mode_stats.get(&PoolingMode::Transaction).unwrap();
332
333        assert_eq!(txn_stats.total_acquires, 2);
334        assert_eq!(txn_stats.total_releases, 2);
335        assert_eq!(txn_stats.avg_statements_per_lease, 7.5); // (10 + 5) / 2
336    }
337
338    #[test]
339    fn test_reset() {
340        let metrics = PoolModeMetrics::new();
341
342        metrics.record_acquire(PoolingMode::Session);
343        metrics.record_connection_created();
344        metrics.record_transaction_complete();
345
346        metrics.reset();
347
348        let snapshot = metrics.snapshot();
349        assert_eq!(snapshot.acquires, 0);
350        assert_eq!(snapshot.connections_created, 0);
351        assert_eq!(snapshot.transactions_completed, 0);
352        // active_leases is NOT reset
353        assert_eq!(snapshot.active_leases, 1);
354    }
355
356    #[test]
357    fn test_snapshot_calculations() {
358        let metrics = PoolModeMetrics::new();
359
360        // Perfect efficiency
361        for _ in 0..10 {
362            metrics.record_acquire(PoolingMode::Transaction);
363            metrics.record_release(PoolingMode::Transaction, 100, 1);
364        }
365
366        let snapshot = metrics.snapshot();
367        assert_eq!(snapshot.connection_efficiency(), 1.0);
368        assert_eq!(snapshot.acquire_success_rate(), 1.0);
369    }
370}