1use crate::error::{ClusterError, Result};
6use dashmap::DashMap;
7use parking_lot::RwLock;
8use serde::{Deserialize, Serialize};
9use std::collections::{HashMap, VecDeque};
10use std::sync::Arc;
11use std::time::{Duration, SystemTime};
12use tracing::{debug, info, warn};
13
/// Identifier for a metric series (its registered name).
pub type MetricId = String;

/// Unique identifier for an alert rule or a fired alert instance.
pub type AlertId = uuid::Uuid;
19
/// Central registry for metrics, alert rules, and fired alerts.
///
/// All state lives behind `Arc`s and concurrent maps/locks so the manager
/// can be cloned/shared and used from multiple threads.
pub struct MonitoringManager {
    /// Registered metric series, keyed by metric name.
    metrics: Arc<DashMap<MetricId, RwLock<MetricSeries>>>,
    /// Alert rules, keyed by rule id.
    alert_rules: Arc<DashMap<AlertId, AlertRule>>,
    /// Currently-firing alerts, keyed by the *rule* id that triggered them.
    active_alerts: Arc<DashMap<AlertId, Alert>>,
    /// Chronological record of triggered alerts (oldest first).
    alert_history: Arc<RwLock<VecDeque<Alert>>>,
    /// Z-score based outlier detector fed by `record_metric`.
    anomaly_detector: Arc<RwLock<AnomalyDetector>>,
    /// Aggregate counters exposed via `get_stats`.
    stats: Arc<RwLock<MonitoringStats>>,
}
35
/// A named time series of recorded datapoints.
#[derive(Debug, Clone)]
pub struct MetricSeries {
    /// Metric name (same string used as the `MetricId` key).
    pub name: String,
    /// Kind of metric this series represents.
    pub metric_type: MetricType,
    /// Recorded samples, oldest first.
    pub datapoints: VecDeque<DataPoint>,
    /// Retention cap: the oldest datapoints are evicted past this count.
    pub max_points: usize,
}
48
/// A single recorded sample of a metric.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataPoint {
    /// Wall-clock time at which the sample was recorded.
    pub timestamp: SystemTime,
    /// Sampled value.
    pub value: f64,
    /// Free-form key/value labels attached to this sample (e.g. host name).
    pub labels: HashMap<String, String>,
}
59
/// The kind of a metric series.
///
/// NOTE(review): the recording path visible in this file treats all kinds
/// identically; the distinction is currently informational only.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum MetricType {
    /// Monotonically increasing count.
    Counter,
    /// Point-in-time value that may go up or down.
    Gauge,
    /// Distribution of observed values.
    Histogram,
    /// Pre-aggregated summary of observations.
    Summary,
}
72
/// Configuration describing when a metric should raise an alert.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertRule {
    /// Stable identifier of this rule.
    pub id: AlertId,
    /// Human-readable rule name, used in alert messages.
    pub name: String,
    /// Metric this rule watches.
    pub metric: MetricId,
    /// Comparison applied to each recorded value.
    pub condition: AlertCondition,
    /// Threshold the condition compares against.
    pub threshold: f64,
    /// Intended hold time before firing.
    /// NOTE(review): not consulted by the evaluation logic in this file —
    /// rules fire on a single matching sample. Confirm intended semantics.
    pub duration: Duration,
    /// Severity assigned to alerts raised by this rule.
    pub severity: AlertSeverity,
    /// Disabled rules are skipped during evaluation.
    pub enabled: bool,
    /// Channels notified when the rule triggers.
    pub notify: Vec<NotificationChannel>,
}
95
/// Comparison used to decide whether a sample triggers a rule.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AlertCondition {
    /// Fires when the value exceeds the rule's threshold.
    GreaterThan,
    /// Fires when the value is below the rule's threshold.
    LessThan,
    /// Fires when the value is within 0.001 of the threshold
    /// (approximate float equality).
    Equal,
    /// Rate-of-change over a window.
    /// NOTE(review): evaluation of this variant is unimplemented in this
    /// file and never fires.
    RateOfChange {
        /// Window over which the rate would be measured.
        period: Duration,
    },
}
111
/// Alert severity, ordered least to most severe
/// (`Info < Warning < Error < Critical` via the derived `Ord`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum AlertSeverity {
    Info,
    Warning,
    Error,
    Critical,
}
124
/// Destination for alert notifications.
///
/// NOTE(review): `send_notification` in this file only logs at debug level;
/// no channel performs real delivery yet.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum NotificationChannel {
    /// Email notification.
    Email {
        /// Recipient address.
        address: String,
    },
    /// Generic HTTP webhook.
    Webhook {
        /// Target URL for the POST.
        url: String,
    },
    /// Slack incoming-webhook notification.
    Slack {
        /// Slack incoming-webhook URL.
        webhook_url: String,
    },
    /// PagerDuty Events integration.
    PagerDuty {
        /// PagerDuty integration (routing) key.
        integration_key: String,
    },
}
149
/// A fired (and possibly resolved) instance of an alert rule.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Alert {
    /// Unique id of this alert instance (distinct from the rule's id).
    pub id: AlertId,
    /// The rule that raised this alert.
    pub rule_id: AlertId,
    /// When the alert fired.
    pub triggered_at: SystemTime,
    /// When the alert was resolved; `None` while still active.
    pub resolved_at: Option<SystemTime>,
    /// Severity copied from the triggering rule.
    pub severity: AlertSeverity,
    /// Human-readable description including the rule name and value.
    pub message: String,
    /// Metric value that triggered the alert.
    pub value: f64,
}
168
169#[derive(Debug, Clone)]
171pub struct AnomalyDetector {
172 sensitivity: f64,
174 metric_history: HashMap<MetricId, VecDeque<f64>>,
176 window_size: usize,
178}
179
180impl AnomalyDetector {
181 fn new(sensitivity: f64, window_size: usize) -> Self {
182 Self {
183 sensitivity,
184 metric_history: HashMap::new(),
185 window_size,
186 }
187 }
188
189 fn record(&mut self, metric_id: MetricId, value: f64) {
190 let history = self.metric_history.entry(metric_id).or_default();
191 history.push_back(value);
192
193 if history.len() > self.window_size {
194 history.pop_front();
195 }
196 }
197
198 fn detect_anomaly(&self, metric_id: &MetricId, value: f64) -> bool {
199 if let Some(history) = self.metric_history.get(metric_id) {
200 if history.len() < 10 {
201 return false;
202 }
203
204 let mean = history.iter().sum::<f64>() / history.len() as f64;
205 let variance =
206 history.iter().map(|x| (x - mean).powi(2)).sum::<f64>() / history.len() as f64;
207 let std_dev = variance.sqrt();
208
209 let z_score = (value - mean).abs() / std_dev.max(0.001);
210 z_score > self.sensitivity
211 } else {
212 false
213 }
214 }
215}
216
/// Aggregate monitoring counters, snapshotted via `get_stats`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct MonitoringStats {
    /// Number of registered metric series.
    pub total_metrics: usize,
    /// Total datapoints recorded across all metrics.
    pub total_datapoints: u64,
    /// Total alerts ever triggered.
    pub total_alerts: u64,
    /// Alerts currently firing.
    pub active_alerts: usize,
    /// Samples flagged as anomalous by the z-score detector.
    pub anomalies_detected: u64,
}
231
232impl MonitoringManager {
233 pub fn new() -> Self {
235 Self {
236 metrics: Arc::new(DashMap::new()),
237 alert_rules: Arc::new(DashMap::new()),
238 active_alerts: Arc::new(DashMap::new()),
239 alert_history: Arc::new(RwLock::new(VecDeque::new())),
240 anomaly_detector: Arc::new(RwLock::new(AnomalyDetector::new(3.0, 100))),
241 stats: Arc::new(RwLock::new(MonitoringStats::default())),
242 }
243 }
244
245 pub fn register_metric(&self, name: MetricId, metric_type: MetricType) -> Result<()> {
247 let series = MetricSeries {
248 name: name.clone(),
249 metric_type,
250 datapoints: VecDeque::new(),
251 max_points: 10000,
252 };
253
254 self.metrics.insert(name, RwLock::new(series));
255
256 let mut stats = self.stats.write();
257 stats.total_metrics = self.metrics.len();
258
259 Ok(())
260 }
261
262 pub fn record_metric(
264 &self,
265 metric_id: MetricId,
266 value: f64,
267 labels: HashMap<String, String>,
268 ) -> Result<()> {
269 let entry = self
270 .metrics
271 .get(&metric_id)
272 .ok_or_else(|| ClusterError::MetricNotFound(metric_id.clone()))?;
273
274 let mut series = entry.write();
275
276 let datapoint = DataPoint {
277 timestamp: SystemTime::now(),
278 value,
279 labels,
280 };
281
282 series.datapoints.push_back(datapoint);
283
284 if series.datapoints.len() > series.max_points {
285 series.datapoints.pop_front();
286 }
287
288 let mut detector = self.anomaly_detector.write();
290 detector.record(metric_id.clone(), value);
291
292 if detector.detect_anomaly(&metric_id, value) {
294 warn!("Anomaly detected in metric {}: {}", metric_id, value);
295 let mut stats = self.stats.write();
296 stats.anomalies_detected += 1;
297 }
298
299 {
302 let mut stats = self.stats.write();
303 stats.total_datapoints += 1;
304 } self.evaluate_alerts(&metric_id, value)?;
308
309 Ok(())
310 }
311
312 pub fn create_alert_rule(&self, rule: AlertRule) -> Result<AlertId> {
314 let id = rule.id;
315 self.alert_rules.insert(id, rule);
316 Ok(id)
317 }
318
319 fn evaluate_alerts(&self, metric_id: &MetricId, value: f64) -> Result<()> {
321 for entry in self.alert_rules.iter() {
322 let rule = entry.value();
323
324 if !rule.enabled || rule.metric != *metric_id {
325 continue;
326 }
327
328 let triggered = match rule.condition {
329 AlertCondition::GreaterThan => value > rule.threshold,
330 AlertCondition::LessThan => value < rule.threshold,
331 AlertCondition::Equal => (value - rule.threshold).abs() < 0.001,
332 AlertCondition::RateOfChange { .. } => false, };
334
335 if triggered && !self.active_alerts.contains_key(&rule.id) {
336 self.trigger_alert(rule.id, value)?;
337 } else if !triggered && self.active_alerts.contains_key(&rule.id) {
338 self.resolve_alert(rule.id)?;
339 }
340 }
341
342 Ok(())
343 }
344
345 fn trigger_alert(&self, rule_id: AlertId, value: f64) -> Result<()> {
346 let rule = self
347 .alert_rules
348 .get(&rule_id)
349 .ok_or_else(|| ClusterError::AlertNotFound(rule_id.to_string()))?;
350
351 let alert = Alert {
352 id: uuid::Uuid::new_v4(),
353 rule_id,
354 triggered_at: SystemTime::now(),
355 resolved_at: None,
356 severity: rule.severity,
357 message: format!("Alert triggered: {} (value: {})", rule.name, value),
358 value,
359 };
360
361 info!("Alert triggered: {} - {}", rule.name, alert.message);
362
363 for channel in &rule.notify {
365 self.send_notification(channel, &alert)?;
366 }
367
368 self.active_alerts.insert(rule_id, alert.clone());
369 self.alert_history.write().push_back(alert);
370
371 let mut stats = self.stats.write();
372 stats.total_alerts += 1;
373 stats.active_alerts = self.active_alerts.len();
374
375 Ok(())
376 }
377
378 fn resolve_alert(&self, rule_id: AlertId) -> Result<()> {
379 if let Some((_, mut alert)) = self.active_alerts.remove(&rule_id) {
380 alert.resolved_at = Some(SystemTime::now());
381 info!("Alert resolved: {}", alert.message);
382
383 let mut stats = self.stats.write();
384 stats.active_alerts = self.active_alerts.len();
385 }
386
387 Ok(())
388 }
389
390 fn send_notification(&self, channel: &NotificationChannel, alert: &Alert) -> Result<()> {
391 match channel {
392 NotificationChannel::Email { address } => {
393 debug!("Would send email to {}: {}", address, alert.message);
394 }
395 NotificationChannel::Webhook { url } => {
396 debug!("Would send webhook to {}: {}", url, alert.message);
397 }
398 NotificationChannel::Slack { webhook_url } => {
399 debug!(
400 "Would send Slack notification to {}: {}",
401 webhook_url, alert.message
402 );
403 }
404 NotificationChannel::PagerDuty { integration_key } => {
405 debug!(
406 "Would trigger PagerDuty with key {}: {}",
407 integration_key, alert.message
408 );
409 }
410 }
411 Ok(())
412 }
413
414 pub fn get_metric(&self, metric_id: &MetricId) -> Option<Vec<DataPoint>> {
416 self.metrics
417 .get(metric_id)
418 .map(|s| s.read().datapoints.iter().cloned().collect())
419 }
420
421 pub fn get_active_alerts(&self) -> Vec<Alert> {
423 self.active_alerts
424 .iter()
425 .map(|e| e.value().clone())
426 .collect()
427 }
428
429 pub fn get_alert_history(&self, limit: usize) -> Vec<Alert> {
431 let history = self.alert_history.read();
432 history.iter().rev().take(limit).cloned().collect()
433 }
434
435 pub fn get_stats(&self) -> MonitoringStats {
437 self.stats.read().clone()
438 }
439}
440
impl Default for MonitoringManager {
    /// Equivalent to [`MonitoringManager::new`].
    fn default() -> Self {
        Self::new()
    }
}
446
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
    use super::*;

    /// Registering a metric succeeds and is reflected in the stats.
    #[test]
    fn test_metric_registration() {
        let mgr = MonitoringManager::new();
        assert!(mgr
            .register_metric("test_metric".to_string(), MetricType::Gauge)
            .is_ok());
        assert_eq!(mgr.get_stats().total_metrics, 1);
    }

    /// A recorded sample (with labels) is retrievable from the series.
    #[test]
    fn test_metric_recording() {
        let mgr = MonitoringManager::new();
        mgr.register_metric("cpu_usage".to_string(), MetricType::Gauge)
            .ok();

        let labels: HashMap<String, String> = [("host".to_string(), "worker1".to_string())]
            .into_iter()
            .collect();
        mgr.record_metric("cpu_usage".to_string(), 0.75, labels).ok();

        let points = mgr.get_metric(&"cpu_usage".to_string());
        assert!(points.is_some());
        let points = points.expect("datapoints should be present for recorded metric");
        assert_eq!(points.len(), 1);
    }

    /// A value above a GreaterThan rule's threshold fires an active alert.
    #[test]
    fn test_alert_rule() {
        let mgr = MonitoringManager::new();
        mgr.register_metric("cpu_usage".to_string(), MetricType::Gauge)
            .expect("Failed to register metric");

        let rule = AlertRule {
            id: uuid::Uuid::new_v4(),
            name: "High CPU".to_string(),
            metric: "cpu_usage".to_string(),
            condition: AlertCondition::GreaterThan,
            threshold: 0.8,
            duration: Duration::from_millis(100),
            severity: AlertSeverity::Warning,
            enabled: true,
            notify: vec![],
        };
        mgr.create_alert_rule(rule)
            .expect("Failed to create alert rule");

        mgr.record_metric("cpu_usage".to_string(), 0.9, HashMap::new())
            .expect("Failed to record metric");

        assert!(!mgr.get_active_alerts().is_empty());
    }

    /// A value well outside the recorded baseline is flagged as anomalous;
    /// one close to the baseline is not.
    #[test]
    fn test_anomaly_detection() {
        let mut detector = AnomalyDetector::new(3.0, 100);

        // Baseline: values oscillating between 100 and 109.
        for i in 0..50u32 {
            detector.record("metric1".to_string(), 100.0 + f64::from(i % 10));
        }

        assert!(!detector.detect_anomaly(&"metric1".to_string(), 105.0));
        assert!(detector.detect_anomaly(&"metric1".to_string(), 500.0));
    }
}