1use chrono::{DateTime, Utc};
35use serde::{Deserialize, Serialize};
36use sqlx::PgPool;
37use std::collections::VecDeque;
38use std::sync::{Arc, Mutex};
39use std::time::Duration;
40use tracing::{debug, info};
41
42#[derive(Debug, Clone, Serialize, Deserialize)]
44pub struct MonitorConfig {
45 pub high_utilization_threshold: f64,
47
48 pub critical_utilization_threshold: f64,
50
51 pub slow_acquisition_threshold_ms: u64,
53
54 pub collection_interval: Duration,
56
57 pub max_history_points: usize,
59}
60
61impl Default for MonitorConfig {
62 fn default() -> Self {
63 Self {
64 high_utilization_threshold: 0.8,
65 critical_utilization_threshold: 0.95,
66 slow_acquisition_threshold_ms: 1000,
67 collection_interval: Duration::from_secs(30),
68 max_history_points: 1000,
69 }
70 }
71}
72
73#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
75pub enum AlertLevel {
76 Info,
78
79 Warning,
81
82 Critical,
84}
85
86#[derive(Debug, Clone, Serialize, Deserialize)]
88pub struct PoolAlert {
89 pub level: AlertLevel,
91
92 pub message: String,
94
95 pub current_value: f64,
97
98 pub threshold: f64,
100
101 pub triggered_at: DateTime<Utc>,
103
104 pub recommendation: String,
106}
107
108#[derive(Debug, Clone, Serialize, Deserialize)]
110pub struct PoolMetricsSnapshot {
111 pub timestamp: DateTime<Utc>,
113
114 pub total_connections: u32,
116
117 pub active_connections: u32,
119
120 pub idle_connections: u32,
122
123 pub utilization: f64,
125
126 pub avg_acquisition_time_ms: f64,
128
129 pub timeouts: u64,
131}
132
133impl PoolMetricsSnapshot {
134 pub fn from_pool(pool: &PgPool) -> Self {
136 let size = pool.size();
137 let idle = pool.num_idle() as u32;
138 let active = size.saturating_sub(idle);
139 let max_size = pool.options().get_max_connections();
140
141 let utilization = if max_size > 0 {
142 size as f64 / max_size as f64
143 } else {
144 0.0
145 };
146
147 Self {
148 timestamp: Utc::now(),
149 total_connections: size,
150 active_connections: active,
151 idle_connections: idle,
152 utilization,
153 avg_acquisition_time_ms: 0.0, timeouts: 0,
155 }
156 }
157
158 pub fn is_high_load(&self, threshold: f64) -> bool {
160 self.utilization >= threshold
161 }
162
163 pub fn is_saturated(&self) -> bool {
165 self.idle_connections == 0
166 }
167}
168
169#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
171pub enum PoolHealth {
172 Healthy,
174
175 HighLoad,
177
178 Critical,
180
181 Saturated,
183}
184
185impl PoolHealth {
186 pub fn description(&self) -> &'static str {
188 match self {
189 Self::Healthy => "Pool is operating normally",
190 Self::HighLoad => "Pool utilization is high",
191 Self::Critical => "Pool utilization is critically high",
192 Self::Saturated => "Pool is fully saturated",
193 }
194 }
195}
196
197#[derive(Debug, Clone, Serialize, Deserialize)]
199pub struct MonitoringReport {
200 pub generated_at: DateTime<Utc>,
202
203 pub health: PoolHealth,
205
206 pub current_metrics: PoolMetricsSnapshot,
208
209 pub alerts: Vec<PoolAlert>,
211
212 pub history: Vec<PoolMetricsSnapshot>,
214
215 pub capacity_stats: CapacityStats,
217}
218
219#[derive(Debug, Clone, Serialize, Deserialize)]
221pub struct CapacityStats {
222 pub avg_utilization: f64,
224
225 pub peak_utilization: f64,
227
228 pub peak_at: Option<DateTime<Utc>>,
230
231 pub trend: f64,
233
234 pub time_to_exhaustion: Option<Duration>,
236}
237
238pub struct PoolMonitor {
240 config: MonitorConfig,
241 history: Arc<Mutex<VecDeque<PoolMetricsSnapshot>>>,
242}
243
244impl PoolMonitor {
245 pub fn new(config: MonitorConfig) -> Self {
247 Self {
248 config,
249 history: Arc::new(Mutex::new(VecDeque::new())),
250 }
251 }
252
253 pub fn with_defaults() -> Self {
255 Self::new(MonitorConfig::default())
256 }
257
258 pub fn collect_metrics(&self, pool: &PgPool) -> PoolMetricsSnapshot {
260 let snapshot = PoolMetricsSnapshot::from_pool(pool);
261
262 if let Ok(mut history) = self.history.lock() {
263 history.push_back(snapshot.clone());
264
265 while history.len() > self.config.max_history_points {
267 history.pop_front();
268 }
269 }
270
271 debug!(
272 utilization = snapshot.utilization,
273 active = snapshot.active_connections,
274 idle = snapshot.idle_connections,
275 "Collected pool metrics"
276 );
277
278 snapshot
279 }
280
281 pub fn generate_report(&self, pool: &PgPool) -> MonitoringReport {
283 let current_metrics = self.collect_metrics(pool);
284 let history = self.get_history();
285
286 let health = self.determine_health(¤t_metrics);
287 let alerts = self.check_alerts(¤t_metrics, &history);
288 let capacity_stats = self.calculate_capacity_stats(&history);
289
290 info!(
291 health = ?health,
292 alerts = alerts.len(),
293 utilization = current_metrics.utilization,
294 "Generated monitoring report"
295 );
296
297 MonitoringReport {
298 generated_at: Utc::now(),
299 health,
300 current_metrics,
301 alerts,
302 history,
303 capacity_stats,
304 }
305 }
306
307 fn determine_health(&self, metrics: &PoolMetricsSnapshot) -> PoolHealth {
309 if metrics.is_saturated() {
310 PoolHealth::Saturated
311 } else if metrics.utilization >= self.config.critical_utilization_threshold {
312 PoolHealth::Critical
313 } else if metrics.utilization >= self.config.high_utilization_threshold {
314 PoolHealth::HighLoad
315 } else {
316 PoolHealth::Healthy
317 }
318 }
319
320 fn check_alerts(
322 &self,
323 current: &PoolMetricsSnapshot,
324 history: &[PoolMetricsSnapshot],
325 ) -> Vec<PoolAlert> {
326 let mut alerts = Vec::new();
327
328 if current.utilization >= self.config.high_utilization_threshold {
330 let level = if current.utilization >= self.config.critical_utilization_threshold {
331 AlertLevel::Critical
332 } else {
333 AlertLevel::Warning
334 };
335
336 alerts.push(PoolAlert {
337 level,
338 message: "High pool utilization detected".to_string(),
339 current_value: current.utilization,
340 threshold: self.config.high_utilization_threshold,
341 triggered_at: Utc::now(),
342 recommendation:
343 "Consider increasing max_connections or optimizing query performance"
344 .to_string(),
345 });
346 }
347
348 if current.is_saturated() {
350 alerts.push(PoolAlert {
351 level: AlertLevel::Critical,
352 message: "Pool is fully saturated - no idle connections available".to_string(),
353 current_value: 0.0,
354 threshold: 1.0,
355 triggered_at: Utc::now(),
356 recommendation:
357 "Immediate action required: increase pool size or reduce concurrent connections"
358 .to_string(),
359 });
360 }
361
362 if history.len() >= 3 {
364 let recent_avg = history
365 .iter()
366 .rev()
367 .take(3)
368 .map(|s| s.utilization)
369 .sum::<f64>()
370 / 3.0;
371
372 if current.utilization > recent_avg + 0.2 {
373 alerts.push(PoolAlert {
374 level: AlertLevel::Warning,
375 message: "Rapid increase in pool utilization detected".to_string(),
376 current_value: current.utilization,
377 threshold: recent_avg,
378 triggered_at: Utc::now(),
379 recommendation: "Monitor for potential traffic spike or connection leak"
380 .to_string(),
381 });
382 }
383 }
384
385 alerts
386 }
387
388 fn calculate_capacity_stats(&self, history: &[PoolMetricsSnapshot]) -> CapacityStats {
390 if history.is_empty() {
391 return CapacityStats {
392 avg_utilization: 0.0,
393 peak_utilization: 0.0,
394 peak_at: None,
395 trend: 0.0,
396 time_to_exhaustion: None,
397 };
398 }
399
400 let avg_utilization =
401 history.iter().map(|s| s.utilization).sum::<f64>() / history.len() as f64;
402
403 let peak = history
404 .iter()
405 .max_by(|a, b| a.utilization.partial_cmp(&b.utilization).unwrap())
406 .unwrap();
407
408 let peak_utilization = peak.utilization;
409 let peak_at = Some(peak.timestamp);
410
411 let trend = self.calculate_trend(history);
413
414 let time_to_exhaustion = if trend > 0.0 {
416 let current_utilization = history.last().map(|s| s.utilization).unwrap_or(0.0);
417 let remaining = 1.0 - current_utilization;
418 if remaining > 0.0 {
419 let hours = remaining / (trend * 24.0); Some(Duration::from_secs((hours * 3600.0) as u64))
421 } else {
422 None
423 }
424 } else {
425 None
426 };
427
428 CapacityStats {
429 avg_utilization,
430 peak_utilization,
431 peak_at,
432 trend,
433 time_to_exhaustion,
434 }
435 }
436
437 fn calculate_trend(&self, history: &[PoolMetricsSnapshot]) -> f64 {
439 if history.len() < 2 {
440 return 0.0;
441 }
442
443 let first = &history[0];
444 let last = history.last().unwrap();
445
446 let time_diff = last.timestamp.signed_duration_since(first.timestamp);
447 let days = time_diff.num_seconds() as f64 / 86400.0;
448
449 if days > 0.0 {
450 (last.utilization - first.utilization) / days
451 } else {
452 0.0
453 }
454 }
455
456 pub fn get_history(&self) -> Vec<PoolMetricsSnapshot> {
458 self.history
459 .lock()
460 .ok()
461 .map(|h| h.iter().cloned().collect())
462 .unwrap_or_default()
463 }
464
465 pub fn clear_history(&self) {
467 if let Ok(mut history) = self.history.lock() {
468 history.clear();
469 }
470 }
471
472 pub fn history_count(&self) -> usize {
474 self.history.lock().ok().map(|h| h.len()).unwrap_or(0)
475 }
476}
477
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a snapshot stamped with the current time from raw field values.
    fn snapshot(
        total: u32,
        active: u32,
        idle: u32,
        utilization: f64,
        avg_ms: f64,
        timeouts: u64,
    ) -> PoolMetricsSnapshot {
        PoolMetricsSnapshot {
            timestamp: Utc::now(),
            total_connections: total,
            active_connections: active,
            idle_connections: idle,
            utilization,
            avg_acquisition_time_ms: avg_ms,
            timeouts,
        }
    }

    // The default configuration exposes the documented thresholds.
    #[test]
    fn test_monitor_config_default() {
        let MonitorConfig {
            high_utilization_threshold,
            critical_utilization_threshold,
            slow_acquisition_threshold_ms,
            ..
        } = MonitorConfig::default();

        assert_eq!(high_utilization_threshold, 0.8);
        assert_eq!(critical_utilization_threshold, 0.95);
        assert_eq!(slow_acquisition_threshold_ms, 1000);
    }

    // Distinct severity levels never compare equal.
    #[test]
    fn test_alert_level_ordering() {
        let (info, warning, critical) =
            (AlertLevel::Info, AlertLevel::Warning, AlertLevel::Critical);

        assert_ne!(info, warning);
        assert_ne!(warning, critical);
    }

    // Health states map to their fixed description strings.
    #[test]
    fn test_pool_health_description() {
        let healthy = PoolHealth::Healthy.description();
        let critical = PoolHealth::Critical.description();

        assert_eq!(healthy, "Pool is operating normally");
        assert_eq!(critical, "Pool utilization is critically high");
    }

    // `is_high_load` is inclusive of the threshold and false above it.
    #[test]
    fn test_metrics_snapshot_high_load() {
        let sample = snapshot(8, 7, 1, 0.85, 100.0, 0);

        assert!(sample.is_high_load(0.8));
        assert!(!sample.is_high_load(0.9));
    }

    // A full pool with no idle connections is saturated.
    #[test]
    fn test_metrics_snapshot_saturated() {
        let sample = snapshot(10, 10, 0, 1.0, 500.0, 5);

        assert!(sample.is_saturated());
    }

    // Alerts serialize with their level name and message text.
    #[test]
    fn test_pool_alert_serialization() {
        let alert = PoolAlert {
            level: AlertLevel::Critical,
            message: "Pool exhausted".to_string(),
            current_value: 1.0,
            threshold: 0.95,
            triggered_at: Utc::now(),
            recommendation: "Increase pool size".to_string(),
        };

        let json = serde_json::to_string(&alert).unwrap();
        for needle in ["Critical", "Pool exhausted"] {
            assert!(json.contains(needle));
        }
    }

    // A freshly built monitor has no history.
    #[test]
    fn test_monitor_with_defaults() {
        assert_eq!(PoolMonitor::with_defaults().history_count(), 0);
    }

    // Clearing an already-empty history leaves it empty.
    #[test]
    fn test_monitor_clear_history() {
        let monitor = PoolMonitor::with_defaults();

        monitor.clear_history();

        assert_eq!(monitor.history_count(), 0);
    }

    // Capacity statistics serialize using their field names as keys.
    #[test]
    fn test_capacity_stats_serialization() {
        let stats = CapacityStats {
            avg_utilization: 0.7,
            peak_utilization: 0.95,
            peak_at: Some(Utc::now()),
            trend: 0.05,
            time_to_exhaustion: Some(Duration::from_secs(3600)),
        };

        let json = serde_json::to_string(&stats).unwrap();
        for key in ["avg_utilization", "peak_utilization"] {
            assert!(json.contains(key));
        }
    }

    // A complete report serializes and carries the health variant name.
    #[test]
    fn test_monitoring_report_structure() {
        let capacity_stats = CapacityStats {
            avg_utilization: 0.5,
            peak_utilization: 0.7,
            peak_at: None,
            trend: 0.0,
            time_to_exhaustion: None,
        };

        let report = MonitoringReport {
            generated_at: Utc::now(),
            health: PoolHealth::Healthy,
            current_metrics: snapshot(5, 3, 2, 0.5, 50.0, 0),
            alerts: Vec::new(),
            history: Vec::new(),
            capacity_stats,
        };

        let json = serde_json::to_string(&report).unwrap();
        assert!(json.contains("Healthy"));
    }
}