// eventcore_postgres/monitoring.rs

1//! Connection pool monitoring and metrics for `PostgreSQL` event store
2//!
3//! This module provides comprehensive monitoring capabilities for database
4//! connection pools, including metrics collection and health tracking.
5
6#![allow(clippy::missing_const_for_fn)]
7#![allow(clippy::cast_possible_truncation)]
8#![allow(clippy::cast_lossless)]
9#![allow(clippy::cast_precision_loss)]
10
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14
15use chrono::{DateTime, Utc};
16use serde::{Deserialize, Serialize};
17use tracing::{debug, instrument, warn};
18
19use crate::{PoolStatus, PostgresError};
20
21/// Connection pool metrics
22#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct PoolMetrics {
24    /// Current number of connections in the pool
25    pub current_connections: u32,
26    /// Number of idle connections
27    pub idle_connections: u32,
28    /// Number of active connections
29    pub active_connections: u32,
30    /// Total connections created since startup
31    pub total_connections_created: u64,
32    /// Total connections closed since startup
33    pub total_connections_closed: u64,
34    /// Average connection acquisition time
35    pub avg_acquisition_time: Duration,
36    /// Peak number of connections
37    pub peak_connections: u32,
38    /// Number of connection timeouts
39    pub connection_timeouts: u64,
40    /// Number of connection errors
41    pub connection_errors: u64,
42    /// Pool utilization percentage (0-100)
43    pub utilization_percent: f64,
44    /// Whether the pool is healthy
45    pub is_healthy: bool,
46    /// Last update timestamp
47    pub last_updated: DateTime<Utc>,
48}
49
/// Tracks connection-pool activity counters and derives health metrics.
#[derive(Debug)]
pub struct PoolMonitor {
    // Monotonic lifetime counters, updated lock-free from any thread.
    connections_created: AtomicU64,
    connections_closed: AtomicU64,
    connection_timeouts: AtomicU64,
    connection_errors: AtomicU64,
    peak_connections: AtomicU64,

    // Aggregate acquisition timing: accumulated time plus sample count.
    total_acquisition_time: AtomicU64,
    acquisition_count: AtomicU64,

    // Static configuration.
    max_connections: u32,
    warning_threshold: f64, // Utilization percentage that triggers warnings.
}
68
69impl PoolMonitor {
70    /// Create a new pool monitor
71    pub fn new(max_connections: u32) -> Self {
72        Self {
73            connections_created: AtomicU64::new(0),
74            connections_closed: AtomicU64::new(0),
75            connection_timeouts: AtomicU64::new(0),
76            connection_errors: AtomicU64::new(0),
77            peak_connections: AtomicU64::new(0),
78            total_acquisition_time: AtomicU64::new(0),
79            acquisition_count: AtomicU64::new(0),
80            max_connections,
81            warning_threshold: 80.0, // Warn when 80% utilized
82        }
83    }
84
85    /// Record a connection creation
86    pub fn record_connection_created(&self) {
87        self.connections_created.fetch_add(1, Ordering::Relaxed);
88    }
89
90    /// Record a connection closure
91    pub fn record_connection_closed(&self) {
92        self.connections_closed.fetch_add(1, Ordering::Relaxed);
93    }
94
95    /// Record a connection timeout
96    pub fn record_connection_timeout(&self) {
97        self.connection_timeouts.fetch_add(1, Ordering::Relaxed);
98        debug!("Connection timeout recorded");
99    }
100
101    /// Record a connection error
102    pub fn record_connection_error(&self) {
103        self.connection_errors.fetch_add(1, Ordering::Relaxed);
104        debug!("Connection error recorded");
105    }
106
107    /// Record connection acquisition time
108    pub fn record_acquisition_time(&self, duration: Duration) {
109        self.total_acquisition_time
110            .fetch_add(duration.as_micros() as u64, Ordering::Relaxed);
111        self.acquisition_count.fetch_add(1, Ordering::Relaxed);
112    }
113
114    /// Update peak connections if current value is higher
115    pub fn update_peak_connections(&self, current: u32) {
116        let current_peak = self.peak_connections.load(Ordering::Relaxed) as u32;
117        if current > current_peak {
118            self.peak_connections
119                .store(current as u64, Ordering::Relaxed);
120        }
121    }
122
123    /// Get current metrics snapshot
124    #[instrument(skip(self))]
125    pub fn get_metrics(&self, pool_status: &PoolStatus) -> PoolMetrics {
126        let current_connections = pool_status.size;
127        let idle_connections = pool_status.idle;
128        let active_connections = current_connections.saturating_sub(idle_connections);
129
130        // Update peak if necessary
131        self.update_peak_connections(current_connections);
132
133        // Calculate average acquisition time
134        let total_time = self.total_acquisition_time.load(Ordering::Relaxed);
135        let count = self.acquisition_count.load(Ordering::Relaxed);
136        let avg_acquisition_time = if count > 0 {
137            Duration::from_micros(total_time / count)
138        } else {
139            Duration::ZERO
140        };
141
142        // Calculate utilization
143        let utilization_percent = if self.max_connections > 0 {
144            (current_connections as f64 / self.max_connections as f64) * 100.0
145        } else {
146            0.0
147        };
148
149        // Determine health status
150        let is_healthy = self.is_pool_healthy(pool_status, utilization_percent);
151
152        // Log warnings if necessary
153        if utilization_percent > self.warning_threshold {
154            warn!(
155                "Pool utilization high: {:.1}% ({}/{})",
156                utilization_percent, current_connections, self.max_connections
157            );
158        }
159
160        PoolMetrics {
161            current_connections,
162            idle_connections,
163            active_connections,
164            total_connections_created: self.connections_created.load(Ordering::Relaxed),
165            total_connections_closed: self.connections_closed.load(Ordering::Relaxed),
166            avg_acquisition_time,
167            peak_connections: self.peak_connections.load(Ordering::Relaxed) as u32,
168            connection_timeouts: self.connection_timeouts.load(Ordering::Relaxed),
169            connection_errors: self.connection_errors.load(Ordering::Relaxed),
170            utilization_percent,
171            is_healthy,
172            last_updated: Utc::now(),
173        }
174    }
175
176    /// Determine if the pool is healthy based on current metrics
177    fn is_pool_healthy(&self, pool_status: &PoolStatus, utilization_percent: f64) -> bool {
178        // Pool is unhealthy if:
179        // 1. It's closed
180        // 2. Utilization is too high (>95%)
181        // 3. No idle connections and high utilization
182        // 4. Recent errors exceed threshold
183
184        if pool_status.is_closed {
185            return false;
186        }
187
188        if utilization_percent > 95.0 {
189            return false;
190        }
191
192        if pool_status.idle == 0 && utilization_percent > 80.0 {
193            return false;
194        }
195
196        // Check recent error rate (simple heuristic)
197        let recent_errors = self.connection_errors.load(Ordering::Relaxed);
198        let recent_acquisitions = self.acquisition_count.load(Ordering::Relaxed);
199
200        if recent_acquisitions > 0 {
201            let error_rate = recent_errors as f64 / recent_acquisitions as f64;
202            if error_rate > 0.1 {
203                // More than 10% error rate
204                return false;
205            }
206        }
207
208        true
209    }
210
211    /// Reset all metrics (useful for testing or periodic resets)
212    pub fn reset_metrics(&self) {
213        self.connections_created.store(0, Ordering::Relaxed);
214        self.connections_closed.store(0, Ordering::Relaxed);
215        self.connection_timeouts.store(0, Ordering::Relaxed);
216        self.connection_errors.store(0, Ordering::Relaxed);
217        self.peak_connections.store(0, Ordering::Relaxed);
218        self.total_acquisition_time.store(0, Ordering::Relaxed);
219        self.acquisition_count.store(0, Ordering::Relaxed);
220        debug!("Pool metrics reset");
221    }
222
223    /// Get metrics as JSON string for external monitoring systems
224    pub fn get_metrics_json(&self, pool_status: &PoolStatus) -> Result<String, PostgresError> {
225        let metrics = self.get_metrics(pool_status);
226        serde_json::to_string(&metrics).map_err(PostgresError::Serialization)
227    }
228}
229
230/// Utility for measuring connection acquisition time
231pub struct AcquisitionTimer {
232    start: Instant,
233    monitor: Arc<PoolMonitor>,
234}
235
236impl AcquisitionTimer {
237    /// Start a new acquisition timer
238    pub fn new(monitor: Arc<PoolMonitor>) -> Self {
239        Self {
240            start: Instant::now(),
241            monitor,
242        }
243    }
244
245    /// Complete the timer and record the duration
246    pub fn complete(self) {
247        let duration = self.start.elapsed();
248        self.monitor.record_acquisition_time(duration);
249    }
250}
251
252/// Background task for periodic pool monitoring
253pub struct PoolMonitoringTask {
254    monitor: Arc<PoolMonitor>,
255    interval: Duration,
256    stop_signal: tokio::sync::watch::Receiver<bool>,
257}
258
259impl PoolMonitoringTask {
260    /// Create a new monitoring task
261    pub fn new(
262        monitor: Arc<PoolMonitor>,
263        interval: Duration,
264        stop_signal: tokio::sync::watch::Receiver<bool>,
265    ) -> Self {
266        Self {
267            monitor,
268            interval,
269            stop_signal,
270        }
271    }
272
273    /// Run the monitoring task
274    pub async fn run<F>(mut self, mut get_pool_status: F)
275    where
276        F: FnMut() -> PoolStatus + Send + 'static,
277    {
278        let mut interval_timer = tokio::time::interval(self.interval);
279
280        loop {
281            tokio::select! {
282                _ = interval_timer.tick() => {
283                    let pool_status = get_pool_status();
284                    let metrics = self.monitor.get_metrics(&pool_status);
285
286                    debug!("Pool metrics: {:?}", metrics);
287
288                    // Log warnings for concerning metrics
289                    if !metrics.is_healthy {
290                        warn!("Pool health check failed: {:?}", metrics);
291                    }
292
293                    if metrics.connection_errors > 0 {
294                        warn!("Connection errors detected: {}", metrics.connection_errors);
295                    }
296
297                    if metrics.connection_timeouts > 0 {
298                        warn!("Connection timeouts detected: {}", metrics.connection_timeouts);
299                    }
300                }
301                _ = self.stop_signal.changed() => {
302                    if *self.stop_signal.borrow() {
303                        debug!("Pool monitoring task stopped");
304                        break;
305                    }
306                }
307            }
308        }
309    }
310}
311
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_pool_monitor_creation() {
        let monitor = PoolMonitor::new(10);

        // Configuration is stored and every counter starts from zero.
        assert_eq!(monitor.max_connections, 10);
        assert_eq!(monitor.connections_created.load(Ordering::Relaxed), 0);
        assert_eq!(monitor.connections_closed.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn test_pool_monitor_record_operations() {
        let monitor = PoolMonitor::new(10);

        // Exercise each recording method.
        for _ in 0..2 {
            monitor.record_connection_created();
        }
        monitor.record_connection_closed();
        monitor.record_connection_timeout();
        monitor.record_connection_error();

        // Each counter reflects exactly the calls made above.
        assert_eq!(monitor.connections_created.load(Ordering::Relaxed), 2);
        assert_eq!(monitor.connections_closed.load(Ordering::Relaxed), 1);
        assert_eq!(monitor.connection_timeouts.load(Ordering::Relaxed), 1);
        assert_eq!(monitor.connection_errors.load(Ordering::Relaxed), 1);
    }

    #[test]
    fn test_pool_monitor_metrics() {
        let monitor = PoolMonitor::new(10);

        monitor.record_connection_created();
        monitor.record_connection_created();
        monitor.record_acquisition_time(Duration::from_millis(50));
        monitor.record_acquisition_time(Duration::from_millis(100));

        let status = PoolStatus {
            size: 5,
            idle: 2,
            is_closed: false,
        };

        let snapshot = monitor.get_metrics(&status);

        assert_eq!(snapshot.current_connections, 5);
        assert_eq!(snapshot.idle_connections, 2);
        assert_eq!(snapshot.active_connections, 3);
        assert_eq!(snapshot.total_connections_created, 2);
        // Mean of 50ms and 100ms.
        assert_eq!(snapshot.avg_acquisition_time, Duration::from_millis(75));
        // 5 of 10 connections in use.
        assert!((snapshot.utilization_percent - 50.0).abs() < f64::EPSILON);
        assert!(snapshot.is_healthy);
    }

    #[test]
    fn test_pool_health_assessment() {
        let monitor = PoolMonitor::new(10);

        // An open pool at moderate utilization is healthy.
        let open = PoolStatus {
            size: 5,
            idle: 2,
            is_closed: false,
        };
        assert!(monitor.is_pool_healthy(&open, 50.0));

        // A closed pool is never healthy.
        let closed = PoolStatus {
            size: 5,
            idle: 2,
            is_closed: true,
        };
        assert!(!monitor.is_pool_healthy(&closed, 50.0));

        // Utilization above 95% is unhealthy.
        let saturated = PoolStatus {
            size: 10,
            idle: 0,
            is_closed: false,
        };
        assert!(!monitor.is_pool_healthy(&saturated, 96.0));
    }

    #[test]
    fn test_acquisition_timer() {
        let monitor = Arc::new(PoolMonitor::new(10));
        let timer = AcquisitionTimer::new(Arc::clone(&monitor));

        // Sleep briefly so the elapsed time is strictly positive.
        std::thread::sleep(std::time::Duration::from_millis(1));
        timer.complete();

        // Exactly one acquisition with non-zero duration was recorded.
        assert_eq!(monitor.acquisition_count.load(Ordering::Relaxed), 1);
        assert!(monitor.total_acquisition_time.load(Ordering::Relaxed) > 0);
    }

    #[test]
    fn test_metrics_reset() {
        let monitor = PoolMonitor::new(10);

        monitor.record_connection_created();
        monitor.record_connection_error();
        monitor.record_acquisition_time(Duration::from_millis(100));

        // Sanity check: the counters are non-zero before the reset.
        assert!(monitor.connections_created.load(Ordering::Relaxed) > 0);
        assert!(monitor.connection_errors.load(Ordering::Relaxed) > 0);
        assert!(monitor.acquisition_count.load(Ordering::Relaxed) > 0);

        // Reset clears everything back to zero.
        monitor.reset_metrics();
        assert_eq!(monitor.connections_created.load(Ordering::Relaxed), 0);
        assert_eq!(monitor.connection_errors.load(Ordering::Relaxed), 0);
        assert_eq!(monitor.acquisition_count.load(Ordering::Relaxed), 0);
    }
}