1use anyhow::Result;
2use serde::{Deserialize, Serialize};
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5use tokio::sync::RwLock;
6use tokio::time::interval;
7use tracing::{error, info, warn};
8
9use crate::memory::connection::ConnectionPool;
10
11#[derive(Debug, Clone)]
14pub struct ConnectionMonitor {
15 pool: Arc<ConnectionPool>,
16 config: MonitoringConfig,
17 metrics: Arc<RwLock<ConnectionMetrics>>,
18 alert_history: Arc<RwLock<Vec<AlertEvent>>>,
19}
20
/// Tuning options for [`ConnectionMonitor`].
#[derive(Debug, Clone)]
pub struct MonitoringConfig {
    /// Seconds between two health checks.
    pub check_interval_seconds: u64,
    /// Utilization, expressed as a percentage (e.g. `70.0`), at or above which
    /// the pool is classified as `Warning`.
    pub warning_threshold: f32,
    /// Utilization percentage at or above which the pool is classified as
    /// `Critical`.
    pub critical_threshold: f32,
    /// Maximum number of alerts retained in the in-memory alert history.
    pub max_alert_history: usize,
    /// Minimum number of seconds between two alerts of the same level.
    pub alert_cooldown_seconds: u64,
    /// When `true`, the pool status is logged after every health check.
    pub enable_detailed_logging: bool,
}
36
37impl Default for MonitoringConfig {
38 fn default() -> Self {
39 Self {
40 check_interval_seconds: 30, warning_threshold: 70.0, critical_threshold: 90.0, max_alert_history: 1000,
44 alert_cooldown_seconds: 300, enable_detailed_logging: true,
46 }
47 }
48}
49
50#[derive(Debug, Clone, Serialize, Deserialize)]
51pub struct ConnectionMetrics {
52 pub timestamp: chrono::DateTime<chrono::Utc>,
53 pub pool_stats: PoolStatsSnapshot,
54 pub health_status: String,
55 pub alert_level: AlertLevel,
56 pub uptime_seconds: u64,
57 pub total_checks: u64,
58 pub alert_count: u64,
59}
60
61#[derive(Debug, Clone, Serialize, Deserialize)]
62pub struct PoolStatsSnapshot {
63 pub size: u32,
64 pub idle: u32,
65 pub active_connections: u32,
66 pub max_size: u32,
67 pub utilization_percentage: f32,
68 pub waiting_for_connection: u32,
69}
70
71#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
72pub enum AlertLevel {
73 Healthy,
74 Warning,
75 Critical,
76}
77
78#[derive(Debug, Clone, Serialize, Deserialize)]
79pub struct AlertEvent {
80 pub timestamp: chrono::DateTime<chrono::Utc>,
81 pub level: AlertLevel,
82 pub message: String,
83 pub pool_stats: PoolStatsSnapshot,
84 pub resolved: bool,
85}
86
87impl ConnectionMonitor {
88 pub fn new(pool: Arc<ConnectionPool>, config: MonitoringConfig) -> Self {
89 Self {
90 pool,
91 config,
92 metrics: Arc::new(RwLock::new(ConnectionMetrics {
93 timestamp: chrono::Utc::now(),
94 pool_stats: PoolStatsSnapshot::default(),
95 health_status: "Starting".to_string(),
96 alert_level: AlertLevel::Healthy,
97 uptime_seconds: 0,
98 total_checks: 0,
99 alert_count: 0,
100 })),
101 alert_history: Arc::new(RwLock::new(Vec::new())),
102 }
103 }
104
105 pub async fn start_monitoring(&self) -> Result<()> {
107 info!("🔍 Starting connection pool monitoring service");
108 info!(" Warning threshold: {}%", self.config.warning_threshold);
109 info!(" Critical threshold: {}%", self.config.critical_threshold);
110 info!(" Check interval: {}s", self.config.check_interval_seconds);
111
112 let start_time = Instant::now();
113 let mut interval = interval(Duration::from_secs(self.config.check_interval_seconds));
114
115 loop {
116 interval.tick().await;
117
118 match self.perform_health_check(start_time.elapsed()).await {
119 Ok(_) => {
120 }
122 Err(e) => {
123 error!("Health check failed: {}", e);
124 }
126 }
127 }
128 }
129
130 async fn perform_health_check(&self, uptime: Duration) -> Result<()> {
131 let pool_stats = self.pool.get_pool_stats().await;
132 let timestamp = chrono::Utc::now();
133
134 let stats_snapshot = PoolStatsSnapshot {
136 size: pool_stats.size,
137 idle: pool_stats.idle,
138 active_connections: pool_stats.active_connections,
139 max_size: pool_stats.max_size,
140 utilization_percentage: pool_stats.utilization_percentage(),
141 waiting_for_connection: pool_stats.waiting_for_connection,
142 };
143
144 let alert_level = self.determine_alert_level(&stats_snapshot);
146 let health_status = self.generate_health_status(&stats_snapshot, &alert_level);
147
148 {
150 let mut metrics = self.metrics.write().await;
151 metrics.timestamp = timestamp;
152 metrics.pool_stats = stats_snapshot.clone();
153 metrics.health_status = health_status.clone();
154 metrics.alert_level = alert_level.clone();
155 metrics.uptime_seconds = uptime.as_secs();
156 metrics.total_checks += 1;
157 }
158
159 if let Some(alert) = self.should_alert(&alert_level, &stats_snapshot).await? {
161 self.send_alert(alert).await?;
162 }
163
164 if self.config.enable_detailed_logging {
166 self.log_status(&stats_snapshot, &alert_level).await;
167 }
168
169 Ok(())
170 }
171
172 fn determine_alert_level(&self, stats: &PoolStatsSnapshot) -> AlertLevel {
173 let utilization = stats.utilization_percentage;
174
175 if utilization >= self.config.critical_threshold {
176 AlertLevel::Critical
177 } else if utilization >= self.config.warning_threshold {
178 AlertLevel::Warning
179 } else {
180 AlertLevel::Healthy
181 }
182 }
183
184 fn generate_health_status(
185 &self,
186 stats: &PoolStatsSnapshot,
187 alert_level: &AlertLevel,
188 ) -> String {
189 match alert_level {
190 AlertLevel::Healthy => format!(
191 "HEALTHY: Pool at {:.1}% utilization ({}/{} connections active)",
192 stats.utilization_percentage, stats.active_connections, stats.max_size
193 ),
194 AlertLevel::Warning => format!(
195 "WARNING: Pool at {:.1}% utilization ({}/{} connections active) - Approaching capacity",
196 stats.utilization_percentage, stats.active_connections, stats.max_size
197 ),
198 AlertLevel::Critical => format!(
199 "CRITICAL: Pool at {:.1}% utilization ({}/{} connections active) - Pool saturated!",
200 stats.utilization_percentage, stats.active_connections, stats.max_size
201 ),
202 }
203 }
204
205 async fn should_alert(
206 &self,
207 level: &AlertLevel,
208 stats: &PoolStatsSnapshot,
209 ) -> Result<Option<AlertEvent>> {
210 if matches!(level, AlertLevel::Healthy) {
211 return Ok(None);
212 }
213
214 let alert_history = self.alert_history.read().await;
215 let now = chrono::Utc::now();
216
217 if let Some(last_alert) = alert_history.iter().rev().find(|a| a.level == *level) {
219 let time_since_last = now.signed_duration_since(last_alert.timestamp);
220 if time_since_last.num_seconds() < self.config.alert_cooldown_seconds as i64 {
221 return Ok(None); }
223 }
224
225 let message = match level {
227 AlertLevel::Warning => format!(
228 "Connection pool utilization at {:.1}% (threshold: {}%) - {} active of {} max connections",
229 stats.utilization_percentage,
230 self.config.warning_threshold,
231 stats.active_connections,
232 stats.max_size
233 ),
234 AlertLevel::Critical => format!(
235 "Connection pool critically saturated at {:.1}% (threshold: {}%) - {} active of {} max connections. Immediate attention required!",
236 stats.utilization_percentage,
237 self.config.critical_threshold,
238 stats.active_connections,
239 stats.max_size
240 ),
241 AlertLevel::Healthy => unreachable!(),
242 };
243
244 Ok(Some(AlertEvent {
245 timestamp: now,
246 level: level.clone(),
247 message,
248 pool_stats: stats.clone(),
249 resolved: false,
250 }))
251 }
252
253 async fn send_alert(&self, alert: AlertEvent) -> Result<()> {
254 match alert.level {
256 AlertLevel::Warning => warn!("🚨 POOL WARNING: {}", alert.message),
257 AlertLevel::Critical => error!("🚨🚨 POOL CRITICAL: {}", alert.message),
258 AlertLevel::Healthy => unreachable!(),
259 }
260
261 {
263 let mut metrics = self.metrics.write().await;
264 metrics.alert_count += 1;
265 }
266
267 {
269 let mut history = self.alert_history.write().await;
270 history.push(alert.clone());
271
272 if history.len() > self.config.max_alert_history {
274 history.remove(0);
275 }
276 }
277
278 info!("Alert sent: {} - {}", alert.level as u8, alert.message);
285
286 Ok(())
287 }
288
289 async fn log_status(&self, stats: &PoolStatsSnapshot, level: &AlertLevel) {
290 match level {
291 AlertLevel::Healthy => {
292 if self.config.enable_detailed_logging {
293 info!(
294 "Pool Status: {:.1}% utilization - {}/{} active connections",
295 stats.utilization_percentage, stats.active_connections, stats.max_size
296 );
297 }
298 }
299 AlertLevel::Warning => {
300 warn!(
301 "Pool Status: {:.1}% utilization - {}/{} active connections (WARNING LEVEL)",
302 stats.utilization_percentage, stats.active_connections, stats.max_size
303 );
304 }
305 AlertLevel::Critical => {
306 error!(
307 "Pool Status: {:.1}% utilization - {}/{} active connections (CRITICAL LEVEL)",
308 stats.utilization_percentage, stats.active_connections, stats.max_size
309 );
310 }
311 }
312 }
313
314 pub async fn get_metrics(&self) -> ConnectionMetrics {
316 self.metrics.read().await.clone()
317 }
318
319 pub async fn get_alert_history(&self, limit: Option<usize>) -> Vec<AlertEvent> {
321 let history = self.alert_history.read().await;
322 let limit = limit.unwrap_or(history.len());
323 history.iter().rev().take(limit).cloned().collect()
324 }
325
326 pub async fn get_health_summary(&self) -> PoolHealthSummary {
328 let metrics = self.get_metrics().await;
329 let recent_alerts = self.get_alert_history(Some(10)).await;
330
331 let critical_alerts = recent_alerts
332 .iter()
333 .filter(|a| matches!(a.level, AlertLevel::Critical))
334 .count();
335 let warning_alerts = recent_alerts
336 .iter()
337 .filter(|a| matches!(a.level, AlertLevel::Warning))
338 .count();
339
340 PoolHealthSummary {
341 current_status: metrics.health_status,
342 current_utilization: metrics.pool_stats.utilization_percentage,
343 alert_level: metrics.alert_level,
344 uptime_hours: metrics.uptime_seconds / 3600,
345 total_alerts: metrics.alert_count,
346 recent_critical_alerts: critical_alerts as u64,
347 recent_warning_alerts: warning_alerts as u64,
348 last_check: metrics.timestamp,
349 }
350 }
351}
352
353impl Default for PoolStatsSnapshot {
354 fn default() -> Self {
355 Self {
356 size: 0,
357 idle: 0,
358 active_connections: 0,
359 max_size: 0,
360 utilization_percentage: 0.0,
361 waiting_for_connection: 0,
362 }
363 }
364}
365
366#[derive(Debug, Clone, Serialize, Deserialize)]
367pub struct PoolHealthSummary {
368 pub current_status: String,
369 pub current_utilization: f32,
370 pub alert_level: AlertLevel,
371 pub uptime_hours: u64,
372 pub total_alerts: u64,
373 pub recent_critical_alerts: u64,
374 pub recent_warning_alerts: u64,
375 pub last_check: chrono::DateTime<chrono::Utc>,
376}
377
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a snapshot where only the utilization is relevant.
    fn at_utilization(pct: f32) -> PoolStatsSnapshot {
        PoolStatsSnapshot {
            utilization_percentage: pct,
            ..Default::default()
        }
    }

    #[test]
    fn test_alert_level_determination() {
        let config = MonitoringConfig::default();

        assert_eq!(
            determine_alert_level(&config, &at_utilization(50.0)),
            AlertLevel::Healthy
        );
        assert_eq!(
            determine_alert_level(&config, &at_utilization(75.0)),
            AlertLevel::Warning
        );
        assert_eq!(
            determine_alert_level(&config, &at_utilization(95.0)),
            AlertLevel::Critical
        );
    }

    #[test]
    fn test_alert_level_thresholds_are_inclusive() {
        // Default thresholds: warning 70.0, critical 90.0. Each threshold value
        // itself belongs to the higher level (`>=` comparison).
        let config = MonitoringConfig::default();

        assert_eq!(
            determine_alert_level(&config, &at_utilization(0.0)),
            AlertLevel::Healthy
        );
        assert_eq!(
            determine_alert_level(&config, &at_utilization(69.9)),
            AlertLevel::Healthy
        );
        assert_eq!(
            determine_alert_level(&config, &at_utilization(70.0)),
            AlertLevel::Warning
        );
        assert_eq!(
            determine_alert_level(&config, &at_utilization(89.9)),
            AlertLevel::Warning
        );
        assert_eq!(
            determine_alert_level(&config, &at_utilization(90.0)),
            AlertLevel::Critical
        );
        assert_eq!(
            determine_alert_level(&config, &at_utilization(100.0)),
            AlertLevel::Critical
        );
    }

    #[test]
    fn test_alert_level_respects_custom_thresholds() {
        let config = MonitoringConfig {
            warning_threshold: 40.0,
            critical_threshold: 60.0,
            ..Default::default()
        };

        assert_eq!(
            determine_alert_level(&config, &at_utilization(39.0)),
            AlertLevel::Healthy
        );
        assert_eq!(
            determine_alert_level(&config, &at_utilization(50.0)),
            AlertLevel::Warning
        );
        assert_eq!(
            determine_alert_level(&config, &at_utilization(60.0)),
            AlertLevel::Critical
        );
    }

    #[test]
    fn test_health_status_generation() {
        let stats = PoolStatsSnapshot {
            utilization_percentage: 75.0,
            active_connections: 75,
            max_size: 100,
            ..Default::default()
        };

        let status = generate_health_status(&stats, &AlertLevel::Warning);
        assert!(status.contains("WARNING"));
        assert!(status.contains("75.0%"));
        assert!(status.contains("75/100"));
    }

    #[test]
    fn test_health_status_for_healthy_and_critical() {
        let stats = PoolStatsSnapshot {
            utilization_percentage: 50.0,
            active_connections: 50,
            max_size: 100,
            ..Default::default()
        };

        assert_eq!(
            generate_health_status(&stats, &AlertLevel::Healthy),
            "HEALTHY: Pool at 50.0% utilization (50/100 connections active)"
        );

        let critical = generate_health_status(&stats, &AlertLevel::Critical);
        assert!(critical.starts_with("CRITICAL: Pool at 50.0% utilization"));
        assert!(critical.ends_with(" - Pool saturated!"));
    }
}
432
433fn determine_alert_level(config: &MonitoringConfig, stats: &PoolStatsSnapshot) -> AlertLevel {
435 let utilization = stats.utilization_percentage;
436
437 if utilization >= config.critical_threshold {
438 AlertLevel::Critical
439 } else if utilization >= config.warning_threshold {
440 AlertLevel::Warning
441 } else {
442 AlertLevel::Healthy
443 }
444}
445
446fn generate_health_status(stats: &PoolStatsSnapshot, alert_level: &AlertLevel) -> String {
447 match alert_level {
448 AlertLevel::Healthy => format!(
449 "HEALTHY: Pool at {:.1}% utilization ({}/{} connections active)",
450 stats.utilization_percentage, stats.active_connections, stats.max_size
451 ),
452 AlertLevel::Warning => format!(
453 "WARNING: Pool at {:.1}% utilization ({}/{} connections active) - Approaching capacity",
454 stats.utilization_percentage, stats.active_connections, stats.max_size
455 ),
456 AlertLevel::Critical => format!(
457 "CRITICAL: Pool at {:.1}% utilization ({}/{} connections active) - Pool saturated!",
458 stats.utilization_percentage, stats.active_connections, stats.max_size
459 ),
460 }
461}