kaccy_db/
pool_monitor.rs

//! Connection Pool Monitor
//!
//! Real-time monitoring and alerting for database connection pools.
//! Provides detailed metrics, health checks, and automatic anomaly detection.
//!
//! # Features
//!
//! - Real-time pool utilization tracking
//! - Connection acquisition time monitoring (placeholder: snapshots currently
//!   report `0.0` until an acquisition-tracking layer is added)
//! - Pool saturation detection
//! - Automatic alert generation
//! - Historical metrics collection
//! - Trend analysis for capacity planning
//! - Configurable alert thresholds
//!
//! # Example
//!
//! ```rust
//! use kaccy_db::pool_monitor::{PoolMonitor, MonitorConfig};
//! use sqlx::PgPool;
//! use std::time::Duration;
//!
//! let config = MonitorConfig {
//!     high_utilization_threshold: 0.8,
//!     critical_utilization_threshold: 0.95,
//!     slow_acquisition_threshold_ms: 1000,
//!     collection_interval: Duration::from_secs(30),
//!     max_history_points: 1000,
//! };
//!
//! let monitor = PoolMonitor::new(config);
//! ```
33
34use chrono::{DateTime, Utc};
35use serde::{Deserialize, Serialize};
36use sqlx::PgPool;
37use std::collections::VecDeque;
38use std::sync::{Arc, Mutex};
39use std::time::Duration;
40use tracing::{debug, info};
41
42/// Configuration for the pool monitor
43#[derive(Debug, Clone, Serialize, Deserialize)]
44pub struct MonitorConfig {
45    /// Utilization threshold for high utilization warning (0.0-1.0)
46    pub high_utilization_threshold: f64,
47
48    /// Utilization threshold for critical alert (0.0-1.0)
49    pub critical_utilization_threshold: f64,
50
51    /// Connection acquisition time threshold (ms) for slow acquisition warning
52    pub slow_acquisition_threshold_ms: u64,
53
54    /// How often to collect metrics
55    pub collection_interval: Duration,
56
57    /// Maximum number of historical data points to keep
58    pub max_history_points: usize,
59}
60
61impl Default for MonitorConfig {
62    fn default() -> Self {
63        Self {
64            high_utilization_threshold: 0.8,
65            critical_utilization_threshold: 0.95,
66            slow_acquisition_threshold_ms: 1000,
67            collection_interval: Duration::from_secs(30),
68            max_history_points: 1000,
69        }
70    }
71}
72
73/// Alert severity level
74#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
75pub enum AlertLevel {
76    /// Informational alert
77    Info,
78
79    /// Warning that requires attention
80    Warning,
81
82    /// Critical issue requiring immediate action
83    Critical,
84}
85
86/// Pool monitoring alert
87#[derive(Debug, Clone, Serialize, Deserialize)]
88pub struct PoolAlert {
89    /// Alert severity level
90    pub level: AlertLevel,
91
92    /// Alert message
93    pub message: String,
94
95    /// Current metric value that triggered the alert
96    pub current_value: f64,
97
98    /// Threshold value
99    pub threshold: f64,
100
101    /// When the alert was triggered
102    pub triggered_at: DateTime<Utc>,
103
104    /// Recommended action
105    pub recommendation: String,
106}
107
108/// Pool metrics snapshot
109#[derive(Debug, Clone, Serialize, Deserialize)]
110pub struct PoolMetricsSnapshot {
111    /// When this snapshot was taken
112    pub timestamp: DateTime<Utc>,
113
114    /// Total connections in pool
115    pub total_connections: u32,
116
117    /// Active connections
118    pub active_connections: u32,
119
120    /// Idle connections
121    pub idle_connections: u32,
122
123    /// Pool utilization ratio (0.0-1.0)
124    pub utilization: f64,
125
126    /// Average connection acquisition time (ms) - estimated
127    pub avg_acquisition_time_ms: f64,
128
129    /// Number of connection timeouts
130    pub timeouts: u64,
131}
132
133impl PoolMetricsSnapshot {
134    /// Create a new metrics snapshot from pool
135    pub fn from_pool(pool: &PgPool) -> Self {
136        let size = pool.size();
137        let idle = pool.num_idle() as u32;
138        let active = size.saturating_sub(idle);
139        let max_size = pool.options().get_max_connections();
140
141        let utilization = if max_size > 0 {
142            size as f64 / max_size as f64
143        } else {
144            0.0
145        };
146
147        Self {
148            timestamp: Utc::now(),
149            total_connections: size,
150            active_connections: active,
151            idle_connections: idle,
152            utilization,
153            avg_acquisition_time_ms: 0.0, // Would need tracking layer to measure this
154            timeouts: 0,
155        }
156    }
157
158    /// Check if this snapshot indicates high load
159    pub fn is_high_load(&self, threshold: f64) -> bool {
160        self.utilization >= threshold
161    }
162
163    /// Check if pool is saturated (no idle connections)
164    pub fn is_saturated(&self) -> bool {
165        self.idle_connections == 0
166    }
167}
168
169/// Pool health status
170#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
171pub enum PoolHealth {
172    /// Pool is healthy
173    Healthy,
174
175    /// Pool is experiencing high load
176    HighLoad,
177
178    /// Pool is critically loaded
179    Critical,
180
181    /// Pool is saturated
182    Saturated,
183}
184
185impl PoolHealth {
186    /// Get a human-readable description
187    pub fn description(&self) -> &'static str {
188        match self {
189            Self::Healthy => "Pool is operating normally",
190            Self::HighLoad => "Pool utilization is high",
191            Self::Critical => "Pool utilization is critically high",
192            Self::Saturated => "Pool is fully saturated",
193        }
194    }
195}
196
197/// Pool monitoring report
198#[derive(Debug, Clone, Serialize, Deserialize)]
199pub struct MonitoringReport {
200    /// When this report was generated
201    pub generated_at: DateTime<Utc>,
202
203    /// Current pool health status
204    pub health: PoolHealth,
205
206    /// Current metrics snapshot
207    pub current_metrics: PoolMetricsSnapshot,
208
209    /// Active alerts
210    pub alerts: Vec<PoolAlert>,
211
212    /// Historical metrics (recent)
213    pub history: Vec<PoolMetricsSnapshot>,
214
215    /// Capacity statistics
216    pub capacity_stats: CapacityStats,
217}
218
219/// Capacity statistics
220#[derive(Debug, Clone, Serialize, Deserialize)]
221pub struct CapacityStats {
222    /// Average utilization over history
223    pub avg_utilization: f64,
224
225    /// Peak utilization
226    pub peak_utilization: f64,
227
228    /// Time of peak utilization
229    pub peak_at: Option<DateTime<Utc>>,
230
231    /// Utilization trend (positive = increasing, negative = decreasing)
232    pub trend: f64,
233
234    /// Estimated time until capacity exhaustion (if trend continues)
235    pub time_to_exhaustion: Option<Duration>,
236}
237
238/// Connection pool monitor
239pub struct PoolMonitor {
240    config: MonitorConfig,
241    history: Arc<Mutex<VecDeque<PoolMetricsSnapshot>>>,
242}
243
244impl PoolMonitor {
245    /// Create a new pool monitor
246    pub fn new(config: MonitorConfig) -> Self {
247        Self {
248            config,
249            history: Arc::new(Mutex::new(VecDeque::new())),
250        }
251    }
252
253    /// Create a monitor with default configuration
254    pub fn with_defaults() -> Self {
255        Self::new(MonitorConfig::default())
256    }
257
258    /// Collect current metrics and update history
259    pub fn collect_metrics(&self, pool: &PgPool) -> PoolMetricsSnapshot {
260        let snapshot = PoolMetricsSnapshot::from_pool(pool);
261
262        if let Ok(mut history) = self.history.lock() {
263            history.push_back(snapshot.clone());
264
265            // Keep history within limits
266            while history.len() > self.config.max_history_points {
267                history.pop_front();
268            }
269        }
270
271        debug!(
272            utilization = snapshot.utilization,
273            active = snapshot.active_connections,
274            idle = snapshot.idle_connections,
275            "Collected pool metrics"
276        );
277
278        snapshot
279    }
280
281    /// Generate monitoring report with alerts
282    pub fn generate_report(&self, pool: &PgPool) -> MonitoringReport {
283        let current_metrics = self.collect_metrics(pool);
284        let history = self.get_history();
285
286        let health = self.determine_health(&current_metrics);
287        let alerts = self.check_alerts(&current_metrics, &history);
288        let capacity_stats = self.calculate_capacity_stats(&history);
289
290        info!(
291            health = ?health,
292            alerts = alerts.len(),
293            utilization = current_metrics.utilization,
294            "Generated monitoring report"
295        );
296
297        MonitoringReport {
298            generated_at: Utc::now(),
299            health,
300            current_metrics,
301            alerts,
302            history,
303            capacity_stats,
304        }
305    }
306
307    /// Determine pool health status
308    fn determine_health(&self, metrics: &PoolMetricsSnapshot) -> PoolHealth {
309        if metrics.is_saturated() {
310            PoolHealth::Saturated
311        } else if metrics.utilization >= self.config.critical_utilization_threshold {
312            PoolHealth::Critical
313        } else if metrics.utilization >= self.config.high_utilization_threshold {
314            PoolHealth::HighLoad
315        } else {
316            PoolHealth::Healthy
317        }
318    }
319
320    /// Check for alert conditions
321    fn check_alerts(
322        &self,
323        current: &PoolMetricsSnapshot,
324        history: &[PoolMetricsSnapshot],
325    ) -> Vec<PoolAlert> {
326        let mut alerts = Vec::new();
327
328        // High utilization alert
329        if current.utilization >= self.config.high_utilization_threshold {
330            let level = if current.utilization >= self.config.critical_utilization_threshold {
331                AlertLevel::Critical
332            } else {
333                AlertLevel::Warning
334            };
335
336            alerts.push(PoolAlert {
337                level,
338                message: "High pool utilization detected".to_string(),
339                current_value: current.utilization,
340                threshold: self.config.high_utilization_threshold,
341                triggered_at: Utc::now(),
342                recommendation:
343                    "Consider increasing max_connections or optimizing query performance"
344                        .to_string(),
345            });
346        }
347
348        // Pool saturation alert
349        if current.is_saturated() {
350            alerts.push(PoolAlert {
351                level: AlertLevel::Critical,
352                message: "Pool is fully saturated - no idle connections available".to_string(),
353                current_value: 0.0,
354                threshold: 1.0,
355                triggered_at: Utc::now(),
356                recommendation:
357                    "Immediate action required: increase pool size or reduce concurrent connections"
358                        .to_string(),
359            });
360        }
361
362        // Rapid utilization increase alert
363        if history.len() >= 3 {
364            let recent_avg = history
365                .iter()
366                .rev()
367                .take(3)
368                .map(|s| s.utilization)
369                .sum::<f64>()
370                / 3.0;
371
372            if current.utilization > recent_avg + 0.2 {
373                alerts.push(PoolAlert {
374                    level: AlertLevel::Warning,
375                    message: "Rapid increase in pool utilization detected".to_string(),
376                    current_value: current.utilization,
377                    threshold: recent_avg,
378                    triggered_at: Utc::now(),
379                    recommendation: "Monitor for potential traffic spike or connection leak"
380                        .to_string(),
381                });
382            }
383        }
384
385        alerts
386    }
387
388    /// Calculate capacity statistics from history
389    fn calculate_capacity_stats(&self, history: &[PoolMetricsSnapshot]) -> CapacityStats {
390        if history.is_empty() {
391            return CapacityStats {
392                avg_utilization: 0.0,
393                peak_utilization: 0.0,
394                peak_at: None,
395                trend: 0.0,
396                time_to_exhaustion: None,
397            };
398        }
399
400        let avg_utilization =
401            history.iter().map(|s| s.utilization).sum::<f64>() / history.len() as f64;
402
403        let peak = history
404            .iter()
405            .max_by(|a, b| a.utilization.partial_cmp(&b.utilization).unwrap())
406            .unwrap();
407
408        let peak_utilization = peak.utilization;
409        let peak_at = Some(peak.timestamp);
410
411        // Calculate trend using simple linear regression
412        let trend = self.calculate_trend(history);
413
414        // Estimate time to exhaustion if trend is positive
415        let time_to_exhaustion = if trend > 0.0 {
416            let current_utilization = history.last().map(|s| s.utilization).unwrap_or(0.0);
417            let remaining = 1.0 - current_utilization;
418            if remaining > 0.0 {
419                let hours = remaining / (trend * 24.0); // trend is per day, convert to hours
420                Some(Duration::from_secs((hours * 3600.0) as u64))
421            } else {
422                None
423            }
424        } else {
425            None
426        };
427
428        CapacityStats {
429            avg_utilization,
430            peak_utilization,
431            peak_at,
432            trend,
433            time_to_exhaustion,
434        }
435    }
436
437    /// Calculate utilization trend (change per day)
438    fn calculate_trend(&self, history: &[PoolMetricsSnapshot]) -> f64 {
439        if history.len() < 2 {
440            return 0.0;
441        }
442
443        let first = &history[0];
444        let last = history.last().unwrap();
445
446        let time_diff = last.timestamp.signed_duration_since(first.timestamp);
447        let days = time_diff.num_seconds() as f64 / 86400.0;
448
449        if days > 0.0 {
450            (last.utilization - first.utilization) / days
451        } else {
452            0.0
453        }
454    }
455
456    /// Get historical metrics
457    pub fn get_history(&self) -> Vec<PoolMetricsSnapshot> {
458        self.history
459            .lock()
460            .ok()
461            .map(|h| h.iter().cloned().collect())
462            .unwrap_or_default()
463    }
464
465    /// Clear historical data
466    pub fn clear_history(&self) {
467        if let Ok(mut history) = self.history.lock() {
468            history.clear();
469        }
470    }
471
472    /// Get the number of historical data points
473    pub fn history_count(&self) -> usize {
474        self.history.lock().ok().map(|h| h.len()).unwrap_or(0)
475    }
476}
477
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a snapshot stamped with the current time from explicit counters.
    fn snapshot(
        total: u32,
        active: u32,
        idle: u32,
        utilization: f64,
        avg_ms: f64,
        timeouts: u64,
    ) -> PoolMetricsSnapshot {
        PoolMetricsSnapshot {
            timestamp: Utc::now(),
            total_connections: total,
            active_connections: active,
            idle_connections: idle,
            utilization,
            avg_acquisition_time_ms: avg_ms,
            timeouts,
        }
    }

    #[test]
    fn test_monitor_config_default() {
        let MonitorConfig {
            high_utilization_threshold,
            critical_utilization_threshold,
            slow_acquisition_threshold_ms,
            ..
        } = MonitorConfig::default();

        assert_eq!(high_utilization_threshold, 0.8);
        assert_eq!(critical_utilization_threshold, 0.95);
        assert_eq!(slow_acquisition_threshold_ms, 1000);
    }

    #[test]
    fn test_alert_level_ordering() {
        // Adjacent severity levels must be distinct values.
        let levels = [AlertLevel::Info, AlertLevel::Warning, AlertLevel::Critical];
        for pair in levels.windows(2) {
            assert_ne!(pair[0], pair[1]);
        }
    }

    #[test]
    fn test_pool_health_description() {
        let expectations = [
            (PoolHealth::Healthy, "Pool is operating normally"),
            (PoolHealth::Critical, "Pool utilization is critically high"),
        ];

        for (health, expected) in expectations.iter() {
            assert_eq!(health.description(), *expected);
        }
    }

    #[test]
    fn test_metrics_snapshot_high_load() {
        let loaded = snapshot(8, 7, 1, 0.85, 100.0, 0);

        assert!(loaded.is_high_load(0.8));
        assert!(!loaded.is_high_load(0.9));
    }

    #[test]
    fn test_metrics_snapshot_saturated() {
        let exhausted = snapshot(10, 10, 0, 1.0, 500.0, 5);

        assert!(exhausted.is_saturated());
    }

    #[test]
    fn test_pool_alert_serialization() {
        let alert = PoolAlert {
            level: AlertLevel::Critical,
            message: String::from("Pool exhausted"),
            current_value: 1.0,
            threshold: 0.95,
            triggered_at: Utc::now(),
            recommendation: String::from("Increase pool size"),
        };

        let json = serde_json::to_string(&alert).unwrap();
        for needle in ["Critical", "Pool exhausted"].iter() {
            assert!(json.contains(needle));
        }
    }

    #[test]
    fn test_monitor_with_defaults() {
        // A freshly built monitor starts with no recorded snapshots.
        assert_eq!(PoolMonitor::with_defaults().history_count(), 0);
    }

    #[test]
    fn test_monitor_clear_history() {
        let monitor = PoolMonitor::with_defaults();
        monitor.clear_history();
        assert_eq!(monitor.history_count(), 0);
    }

    #[test]
    fn test_capacity_stats_serialization() {
        let stats = CapacityStats {
            avg_utilization: 0.7,
            peak_utilization: 0.95,
            peak_at: Some(Utc::now()),
            trend: 0.05,
            time_to_exhaustion: Some(Duration::from_secs(3600)),
        };

        let json = serde_json::to_string(&stats).unwrap();
        for key in ["avg_utilization", "peak_utilization"].iter() {
            assert!(json.contains(key));
        }
    }

    #[test]
    fn test_monitoring_report_structure() {
        let capacity_stats = CapacityStats {
            avg_utilization: 0.5,
            peak_utilization: 0.7,
            peak_at: None,
            trend: 0.0,
            time_to_exhaustion: None,
        };

        let report = MonitoringReport {
            generated_at: Utc::now(),
            health: PoolHealth::Healthy,
            current_metrics: snapshot(5, 3, 2, 0.5, 50.0, 0),
            alerts: Vec::new(),
            history: Vec::new(),
            capacity_stats,
        };

        let json = serde_json::to_string(&report).unwrap();
        assert!(json.contains("Healthy"));
    }
}