// oxigdal_cluster/monitoring/mod.rs
1//! Advanced monitoring and alerting for cluster management.
2//!
3//! Provides real-time monitoring, custom metrics, alert rules, and anomaly detection.
4
5use crate::error::{ClusterError, Result};
6use dashmap::DashMap;
7use parking_lot::RwLock;
8use serde::{Deserialize, Serialize};
9use std::collections::{HashMap, VecDeque};
10use std::sync::Arc;
11use std::time::{Duration, SystemTime};
12use tracing::{debug, info, warn};
13
/// Metric identifier (the metric's name, used as the registry key).
pub type MetricId = String;

/// Alert identifier, used both for rules and for individual alert instances.
pub type AlertId = uuid::Uuid;
19
/// Monitoring manager for comprehensive cluster monitoring.
///
/// Central registry for custom metrics, alert rules, active and historical
/// alerts, and a statistical anomaly detector. All state lives behind
/// `Arc`-wrapped concurrent containers, so every method takes `&self`.
pub struct MonitoringManager {
    /// Custom metrics registry; each series carries its own `RwLock` so
    /// writes to different metrics do not contend.
    metrics: Arc<DashMap<MetricId, RwLock<MetricSeries>>>,
    /// Alert rules, keyed by rule id
    alert_rules: Arc<DashMap<AlertId, AlertRule>>,
    /// Currently-firing alerts, keyed by the *rule* id that raised them
    active_alerts: Arc<DashMap<AlertId, Alert>>,
    /// Chronological record of triggered alerts (newest at the back)
    alert_history: Arc<RwLock<VecDeque<Alert>>>,
    /// Statistical (z-score) anomaly detector fed by `record_metric`
    anomaly_detector: Arc<RwLock<AnomalyDetector>>,
    /// Aggregate counters exposed via `get_stats`
    stats: Arc<RwLock<MonitoringStats>>,
}
35
/// Time series data for a metric.
///
/// Behaves as a bounded buffer: once `max_points` is reached, recording a
/// new point evicts the oldest (enforcement lives in `record_metric`).
#[derive(Debug, Clone)]
pub struct MetricSeries {
    /// Metric name (matches the registry key)
    pub name: String,
    /// Type of metric (counter, gauge, etc.)
    pub metric_type: MetricType,
    /// Stored data points, oldest at the front
    pub datapoints: VecDeque<DataPoint>,
    /// Maximum number of data points to retain before evicting the oldest
    pub max_points: usize,
}
48
/// Data point in time series.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataPoint {
    /// Timestamp when the data point was recorded
    pub timestamp: SystemTime,
    /// Metric value at that instant
    pub value: f64,
    /// Key-value labels attached to this individual point (e.g. a host
    /// name), not to the series as a whole
    pub labels: HashMap<String, String>,
}
59
/// Metric type.
///
/// NOTE(review): the type is stored on the series, but the recording and
/// alerting logic in this module does not currently treat the variants
/// differently — confirm before relying on counter/histogram semantics.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum MetricType {
    /// Monotonically increasing counter
    Counter,
    /// Value that can go up or down
    Gauge,
    /// Distribution of values in buckets
    Histogram,
    /// Quantile-based summary of values
    Summary,
}
72
/// Alert rule definition.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertRule {
    /// Unique identifier for the alert rule
    pub id: AlertId,
    /// Human-readable name for the alert
    pub name: String,
    /// Metric ID this alert monitors
    pub metric: MetricId,
    /// Condition that triggers the alert (compared against `threshold`)
    pub condition: AlertCondition,
    /// Threshold value for the condition
    pub threshold: f64,
    /// Duration the condition must persist before alerting.
    /// NOTE(review): not enforced by the current evaluation logic — alerts
    /// fire on the first matching sample; confirm intent.
    pub duration: Duration,
    /// Severity level of the alert
    pub severity: AlertSeverity,
    /// Whether the alert rule is active; disabled rules are skipped
    pub enabled: bool,
    /// Notification channels to use when the alert triggers
    pub notify: Vec<NotificationChannel>,
}
95
/// Alert condition.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AlertCondition {
    /// Alert when the value exceeds the rule's threshold
    GreaterThan,
    /// Alert when the value falls below the rule's threshold
    LessThan,
    /// Alert when the value equals the threshold (within a 0.001 tolerance,
    /// since exact float equality is unreliable)
    Equal,
    /// Alert when the rate of change exceeds the threshold.
    /// NOTE(review): currently evaluated as never-triggered (simplified).
    RateOfChange {
        /// Time period over which to calculate rate
        period: Duration,
    },
}
111
/// Alert severity.
///
/// Variants are declared in ascending order of urgency, so the derived
/// `Ord` yields `Info < Warning < Error < Critical`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum AlertSeverity {
    /// Informational alert
    Info,
    /// Warning-level alert
    Warning,
    /// Error-level alert
    Error,
    /// Critical alert requiring immediate attention
    Critical,
}
124
/// Notification channel.
///
/// NOTE(review): delivery is currently stubbed — `send_notification` only
/// logs what would be sent on each channel.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum NotificationChannel {
    /// Email notification
    Email {
        /// Email address to notify
        address: String,
    },
    /// Webhook notification
    Webhook {
        /// URL to send webhook request
        url: String,
    },
    /// Slack notification
    Slack {
        /// Slack webhook URL
        webhook_url: String,
    },
    /// PagerDuty notification
    PagerDuty {
        /// PagerDuty integration key
        integration_key: String,
    },
}
149
/// Active alert.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Alert {
    /// Unique identifier for this alert instance (freshly generated,
    /// distinct from `rule_id`)
    pub id: AlertId,
    /// Id of the rule that triggered this alert
    pub rule_id: AlertId,
    /// When the alert was triggered
    pub triggered_at: SystemTime,
    /// When the alert was resolved; `None` while still firing
    pub resolved_at: Option<SystemTime>,
    /// Severity level copied from the triggering rule
    pub severity: AlertSeverity,
    /// Human-readable alert message
    pub message: String,
    /// Metric value that triggered the alert
    pub value: f64,
}
168
/// Anomaly detector using simple statistical methods.
///
/// Keeps a sliding window of recent values per metric and flags a value
/// whose z-score against that window exceeds the configured sensitivity.
#[derive(Debug, Clone)]
pub struct AnomalyDetector {
    /// Sensitivity: number of standard deviations a value may deviate from
    /// the window mean before being flagged as anomalous
    sensitivity: f64,
    /// Per-metric rolling history used to compute the baseline
    metric_history: HashMap<MetricId, VecDeque<f64>>,
    /// Maximum number of samples retained per metric
    window_size: usize,
}
179
180impl AnomalyDetector {
181    fn new(sensitivity: f64, window_size: usize) -> Self {
182        Self {
183            sensitivity,
184            metric_history: HashMap::new(),
185            window_size,
186        }
187    }
188
189    fn record(&mut self, metric_id: MetricId, value: f64) {
190        let history = self.metric_history.entry(metric_id).or_default();
191        history.push_back(value);
192
193        if history.len() > self.window_size {
194            history.pop_front();
195        }
196    }
197
198    fn detect_anomaly(&self, metric_id: &MetricId, value: f64) -> bool {
199        if let Some(history) = self.metric_history.get(metric_id) {
200            if history.len() < 10 {
201                return false;
202            }
203
204            let mean = history.iter().sum::<f64>() / history.len() as f64;
205            let variance =
206                history.iter().map(|x| (x - mean).powi(2)).sum::<f64>() / history.len() as f64;
207            let std_dev = variance.sqrt();
208
209            let z_score = (value - mean).abs() / std_dev.max(0.001);
210            z_score > self.sensitivity
211        } else {
212            false
213        }
214    }
215}
216
/// Monitoring statistics.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct MonitoringStats {
    /// Number of registered metrics (current registry size)
    pub total_metrics: usize,
    /// Total number of data points recorded since startup
    pub total_datapoints: u64,
    /// Total number of alerts triggered since startup
    pub total_alerts: u64,
    /// Number of currently active (unresolved) alerts
    pub active_alerts: usize,
    /// Number of anomalies detected since startup
    pub anomalies_detected: u64,
}
231
232impl MonitoringManager {
233    /// Create a new monitoring manager.
234    pub fn new() -> Self {
235        Self {
236            metrics: Arc::new(DashMap::new()),
237            alert_rules: Arc::new(DashMap::new()),
238            active_alerts: Arc::new(DashMap::new()),
239            alert_history: Arc::new(RwLock::new(VecDeque::new())),
240            anomaly_detector: Arc::new(RwLock::new(AnomalyDetector::new(3.0, 100))),
241            stats: Arc::new(RwLock::new(MonitoringStats::default())),
242        }
243    }
244
245    /// Register a custom metric.
246    pub fn register_metric(&self, name: MetricId, metric_type: MetricType) -> Result<()> {
247        let series = MetricSeries {
248            name: name.clone(),
249            metric_type,
250            datapoints: VecDeque::new(),
251            max_points: 10000,
252        };
253
254        self.metrics.insert(name, RwLock::new(series));
255
256        let mut stats = self.stats.write();
257        stats.total_metrics = self.metrics.len();
258
259        Ok(())
260    }
261
262    /// Record a metric value.
263    pub fn record_metric(
264        &self,
265        metric_id: MetricId,
266        value: f64,
267        labels: HashMap<String, String>,
268    ) -> Result<()> {
269        let entry = self
270            .metrics
271            .get(&metric_id)
272            .ok_or_else(|| ClusterError::MetricNotFound(metric_id.clone()))?;
273
274        let mut series = entry.write();
275
276        let datapoint = DataPoint {
277            timestamp: SystemTime::now(),
278            value,
279            labels,
280        };
281
282        series.datapoints.push_back(datapoint);
283
284        if series.datapoints.len() > series.max_points {
285            series.datapoints.pop_front();
286        }
287
288        // Record for anomaly detection
289        let mut detector = self.anomaly_detector.write();
290        detector.record(metric_id.clone(), value);
291
292        // Check for anomalies
293        if detector.detect_anomaly(&metric_id, value) {
294            warn!("Anomaly detected in metric {}: {}", metric_id, value);
295            let mut stats = self.stats.write();
296            stats.anomalies_detected += 1;
297        }
298
299        // Update stats - IMPORTANT: Must release this lock before evaluate_alerts
300        // to avoid deadlock (evaluate_alerts may call trigger_alert which also locks stats)
301        {
302            let mut stats = self.stats.write();
303            stats.total_datapoints += 1;
304        } // Lock is released here
305
306        // Evaluate alert rules
307        self.evaluate_alerts(&metric_id, value)?;
308
309        Ok(())
310    }
311
312    /// Create an alert rule.
313    pub fn create_alert_rule(&self, rule: AlertRule) -> Result<AlertId> {
314        let id = rule.id;
315        self.alert_rules.insert(id, rule);
316        Ok(id)
317    }
318
319    /// Evaluate alert rules for a metric.
320    fn evaluate_alerts(&self, metric_id: &MetricId, value: f64) -> Result<()> {
321        for entry in self.alert_rules.iter() {
322            let rule = entry.value();
323
324            if !rule.enabled || rule.metric != *metric_id {
325                continue;
326            }
327
328            let triggered = match rule.condition {
329                AlertCondition::GreaterThan => value > rule.threshold,
330                AlertCondition::LessThan => value < rule.threshold,
331                AlertCondition::Equal => (value - rule.threshold).abs() < 0.001,
332                AlertCondition::RateOfChange { .. } => false, // Simplified
333            };
334
335            if triggered && !self.active_alerts.contains_key(&rule.id) {
336                self.trigger_alert(rule.id, value)?;
337            } else if !triggered && self.active_alerts.contains_key(&rule.id) {
338                self.resolve_alert(rule.id)?;
339            }
340        }
341
342        Ok(())
343    }
344
345    fn trigger_alert(&self, rule_id: AlertId, value: f64) -> Result<()> {
346        let rule = self
347            .alert_rules
348            .get(&rule_id)
349            .ok_or_else(|| ClusterError::AlertNotFound(rule_id.to_string()))?;
350
351        let alert = Alert {
352            id: uuid::Uuid::new_v4(),
353            rule_id,
354            triggered_at: SystemTime::now(),
355            resolved_at: None,
356            severity: rule.severity,
357            message: format!("Alert triggered: {} (value: {})", rule.name, value),
358            value,
359        };
360
361        info!("Alert triggered: {} - {}", rule.name, alert.message);
362
363        // Send notifications
364        for channel in &rule.notify {
365            self.send_notification(channel, &alert)?;
366        }
367
368        self.active_alerts.insert(rule_id, alert.clone());
369        self.alert_history.write().push_back(alert);
370
371        let mut stats = self.stats.write();
372        stats.total_alerts += 1;
373        stats.active_alerts = self.active_alerts.len();
374
375        Ok(())
376    }
377
378    fn resolve_alert(&self, rule_id: AlertId) -> Result<()> {
379        if let Some((_, mut alert)) = self.active_alerts.remove(&rule_id) {
380            alert.resolved_at = Some(SystemTime::now());
381            info!("Alert resolved: {}", alert.message);
382
383            let mut stats = self.stats.write();
384            stats.active_alerts = self.active_alerts.len();
385        }
386
387        Ok(())
388    }
389
390    fn send_notification(&self, channel: &NotificationChannel, alert: &Alert) -> Result<()> {
391        match channel {
392            NotificationChannel::Email { address } => {
393                debug!("Would send email to {}: {}", address, alert.message);
394            }
395            NotificationChannel::Webhook { url } => {
396                debug!("Would send webhook to {}: {}", url, alert.message);
397            }
398            NotificationChannel::Slack { webhook_url } => {
399                debug!(
400                    "Would send Slack notification to {}: {}",
401                    webhook_url, alert.message
402                );
403            }
404            NotificationChannel::PagerDuty { integration_key } => {
405                debug!(
406                    "Would trigger PagerDuty with key {}: {}",
407                    integration_key, alert.message
408                );
409            }
410        }
411        Ok(())
412    }
413
414    /// Get metric series.
415    pub fn get_metric(&self, metric_id: &MetricId) -> Option<Vec<DataPoint>> {
416        self.metrics
417            .get(metric_id)
418            .map(|s| s.read().datapoints.iter().cloned().collect())
419    }
420
421    /// Get active alerts.
422    pub fn get_active_alerts(&self) -> Vec<Alert> {
423        self.active_alerts
424            .iter()
425            .map(|e| e.value().clone())
426            .collect()
427    }
428
429    /// Get alert history.
430    pub fn get_alert_history(&self, limit: usize) -> Vec<Alert> {
431        let history = self.alert_history.read();
432        history.iter().rev().take(limit).cloned().collect()
433    }
434
435    /// Get monitoring statistics.
436    pub fn get_stats(&self) -> MonitoringStats {
437        self.stats.read().clone()
438    }
439}
440
441impl Default for MonitoringManager {
442    fn default() -> Self {
443        Self::new()
444    }
445}
446
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
    use super::*;

    /// Registering a metric succeeds and is reflected in the stats.
    #[test]
    fn test_metric_registration() {
        let manager = MonitoringManager::new();
        let result = manager.register_metric("test_metric".to_string(), MetricType::Gauge);
        assert!(result.is_ok());

        let stats = manager.get_stats();
        assert_eq!(stats.total_metrics, 1);
    }

    /// A recorded value shows up as exactly one data point in the series.
    #[test]
    fn test_metric_recording() {
        let manager = MonitoringManager::new();
        // Previously `.ok()` swallowed failures here and let the test die
        // later on an unrelated assert; fail loudly at the point of error.
        manager
            .register_metric("cpu_usage".to_string(), MetricType::Gauge)
            .expect("Failed to register metric");

        let mut labels = HashMap::new();
        labels.insert("host".to_string(), "worker1".to_string());

        manager
            .record_metric("cpu_usage".to_string(), 0.75, labels)
            .expect("Failed to record metric");

        let datapoints = manager.get_metric(&"cpu_usage".to_string());
        assert!(datapoints.is_some());
        assert_eq!(
            datapoints
                .expect("datapoints should be present for recorded metric")
                .len(),
            1
        );
    }

    /// A GreaterThan rule fires when a recorded value crosses its threshold.
    #[test]
    fn test_alert_rule() {
        let manager = MonitoringManager::new();
        manager
            .register_metric("cpu_usage".to_string(), MetricType::Gauge)
            .expect("Failed to register metric");

        // Use a short duration for test - the duration field is metadata
        // and not currently enforced in the alert evaluation logic
        let rule = AlertRule {
            id: uuid::Uuid::new_v4(),
            name: "High CPU".to_string(),
            metric: "cpu_usage".to_string(),
            condition: AlertCondition::GreaterThan,
            threshold: 0.8,
            duration: Duration::from_millis(100), // Reduced from 60s to 100ms for fast testing
            severity: AlertSeverity::Warning,
            enabled: true,
            notify: vec![],
        };

        manager
            .create_alert_rule(rule.clone())
            .expect("Failed to create alert rule");

        // Trigger alert with high CPU
        manager
            .record_metric("cpu_usage".to_string(), 0.9, HashMap::new())
            .expect("Failed to record metric");

        let alerts = manager.get_active_alerts();
        assert!(!alerts.is_empty());
    }

    /// Values near the baseline pass; a gross outlier is flagged.
    #[test]
    fn test_anomaly_detection() {
        let mut detector = AnomalyDetector::new(3.0, 100);

        // Record normal values
        for i in 0..50 {
            detector.record("metric1".to_string(), 100.0 + (i as f64 % 10.0));
        }

        // Normal value should not be anomaly
        assert!(!detector.detect_anomaly(&"metric1".to_string(), 105.0));

        // Abnormal value should be anomaly
        assert!(detector.detect_anomaly(&"metric1".to_string(), 500.0));
    }
}