use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tokio::sync::{mpsc, RwLock};
/// A rule that compares the latest value of one named metric against a threshold.
///
/// Rules are registered with `AlertManager::add_rule` and evaluated whenever
/// `AlertManager::update_metric` is called for the rule's `metric`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertRule {
/// Unique key; the manager stores the rule under this id and embeds it in alert ids.
pub id: String,
/// Human-readable name, copied into each alert's `rule_name` and message.
pub name: String,
/// Free-form description; not read by the evaluation code in this file.
pub description: String,
/// Name of the metric this rule watches (matched exactly against updated metric names).
pub metric: String,
/// Comparison applied between the metric value and `threshold`.
pub condition: AlertCondition,
/// Right-hand side of the comparison.
pub threshold: f64,
/// NOTE(review): not read by the evaluation code in this file — alerts fire on the first
/// breaching sample. Presumably meant as "condition must hold this long"; confirm.
pub duration: Duration,
/// Severity copied onto every alert raised by this rule.
pub severity: AlertSeverity,
/// Disabled rules are skipped during evaluation.
pub enabled: bool,
/// Configured delivery targets; not consulted by the code in this file.
pub notification_channels: Vec<NotificationChannel>,
/// Minimum time after this rule's most recent history entry was resolved before a new
/// alert may be raised.
pub cooldown_period: Duration,
/// Arbitrary key/value annotations; not read by the code in this file.
pub metadata: HashMap<String, String>,
}
/// How a metric value is compared against a rule's threshold.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum AlertCondition {
/// Fires when `value > threshold`.
GreaterThan,
/// Fires when `value < threshold`.
LessThan,
/// Fires when `|value - threshold| < f64::EPSILON`.
Equal,
/// Fires when `|value - threshold| >= f64::EPSILON`.
NotEqual,
/// Fires when `value >= threshold`.
GreaterThanOrEqual,
/// Fires when `value <= threshold`.
LessThanOrEqual,
/// Not implemented by the evaluator in this file: never fires.
RateOfChange,
/// Not implemented by the evaluator in this file: never fires.
Anomaly,
}
/// Severity level attached to rules and the alerts they raise.
///
/// Also used as the key of `AlertStatistics::alerts_by_severity`, hence `Eq + Hash`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub enum AlertSeverity {
/// Lowest level (priority 1).
Info,
/// Priority 2.
Warning,
/// Priority 3.
Error,
/// Highest level (priority 4).
Critical,
}
impl AlertSeverity {
#[must_use]
pub fn priority(&self) -> u8 {
match self {
AlertSeverity::Info => 1,
AlertSeverity::Warning => 2,
AlertSeverity::Error => 3,
AlertSeverity::Critical => 4,
}
}
}
/// A delivery target that can be attached to an `AlertRule`.
///
/// Only the configuration is modelled here; no delivery code for these channels is
/// visible in this file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum NotificationChannel {
/// Dashboard target; carries no extra configuration.
Dashboard,
/// Email target.
Email {
/// Recipient list (presumably email addresses — confirm against the sender).
recipients: Vec<String>,
},
/// Generic webhook target.
Webhook {
/// Endpoint URL.
url: String,
/// Request method as a string (presumably an HTTP verb such as "POST" — confirm).
method: String,
},
/// Slack target.
Slack {
/// Slack webhook URL.
webhook_url: String,
/// Slack channel name.
channel: String,
},
/// PagerDuty target.
PagerDuty {
/// PagerDuty integration key.
integration_key: String,
},
/// Log target.
Log {
/// Log level as a string; accepted values are not defined in this file.
level: String,
},
}
/// One alert instance raised by a rule, held either in the active set or in history.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Alert {
/// Identifier generated as `alert_<rule id>_<milliseconds since the Unix epoch>`;
/// also the key in the manager's active-alert map.
pub id: String,
/// Id of the rule that raised this alert.
pub rule_id: String,
/// Name of the rule that raised this alert.
pub rule_name: String,
/// Severity copied from the rule.
pub severity: AlertSeverity,
/// Current lifecycle state.
pub state: AlertState,
/// When the alert was created.
pub triggered_at: SystemTime,
/// When the alert was resolved; `None` while it is still in the active set.
pub resolved_at: Option<SystemTime>,
/// Metric value that breached the rule.
pub metric_value: f64,
/// Rule threshold at the time the alert was raised.
pub threshold_value: f64,
/// Human-readable summary built when the alert is raised.
pub message: String,
/// Extra key/value context; created empty and not populated by the code in this file.
pub context: HashMap<String, String>,
/// Initialised to `false`; never set to `true` by the code in this file.
pub notification_sent: bool,
/// Filled in by `AlertManager::acknowledge_alert`.
pub acknowledgement: Option<Acknowledgement>,
}
/// Lifecycle state of an `Alert`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum AlertState {
/// Initial state assigned when an alert is raised.
Triggered,
/// Never assigned by the code in this file, but treated like `Triggered` by duplicate
/// detection and auto-resolution.
Active,
/// Set by `AlertManager::acknowledge_alert`.
Acknowledged,
/// Set when an alert leaves the active set (auto-resolve or rule removal).
Resolved,
/// Set by `AlertManager::silence_alert`.
Silenced,
}
/// Record of who acknowledged an alert and when.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Acknowledgement {
/// Caller-supplied identity of the acknowledger.
pub acknowledged_by: String,
/// System time at which the acknowledgement was recorded.
pub acknowledged_at: SystemTime,
/// Optional free-form note supplied with the acknowledgement.
pub comment: Option<String>,
}
/// Evaluates alert rules against metric updates and tracks the resulting alerts.
///
/// Each piece of state sits behind its own tokio `RwLock`.
pub struct AlertManager {
// Registered rules, keyed by rule id.
rules: Arc<RwLock<HashMap<String, AlertRule>>>,
// Alerts that have not been resolved yet, keyed by alert id.
active_alerts: Arc<RwLock<HashMap<String, Alert>>>,
// Resolved alerts, oldest first; trimmed to `config.max_history_size`.
alert_history: Arc<RwLock<VecDeque<Alert>>>,
// Latest sample per metric name.
metric_values: Arc<RwLock<HashMap<String, MetricValue>>>,
// Every newly raised alert is pushed onto this unbounded channel.
notification_tx: mpsc::UnboundedSender<Alert>,
// NOTE(review): nothing in this file ever receives from this end, so queued alerts are
// never consumed and the unbounded channel only grows; confirm whether a consumer is
// planned.
#[allow(dead_code)]
notification_rx: Arc<RwLock<mpsc::UnboundedReceiver<Alert>>>,
// Running counters and means, see `AlertStatistics`.
statistics: Arc<RwLock<AlertStatistics>>,
// Immutable settings supplied at construction.
config: AlertManagerConfig,
}
/// Tunables for `AlertManager`.
///
/// The two `Duration` fields are (de)serialized as whole seconds via `serde_duration`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertManagerConfig {
/// NOTE(review): not enforced by the code in this file; presumably an intended cap on
/// the active-alert set — confirm.
pub max_active_alerts: usize,
/// Maximum number of history entries kept; the oldest are dropped first.
pub max_history_size: usize,
/// Not read by the code in this file (rules are evaluated on every metric update).
#[serde(with = "serde_duration")]
pub evaluation_interval: Duration,
/// Not read by the code in this file; each rule carries its own `cooldown_period`.
#[serde(with = "serde_duration")]
pub default_cooldown: Duration,
/// When true, a non-breaching sample resolves the rule's `Triggered`/`Active` alerts.
pub enable_auto_resolve: bool,
/// When true, a rule that already has a `Triggered`/`Active` alert does not raise another.
pub silence_duplicate_alerts: bool,
}
/// Serde adapter that stores a `Duration` as a plain count of whole seconds.
///
/// Intended for use with `#[serde(with = "serde_duration")]`. Sub-second precision is
/// not preserved: the value is written with `Duration::as_secs`.
mod serde_duration {
    use serde::{Deserialize, Deserializer, Serializer};
    use std::time::Duration;

    /// Writes `duration` as a `u64` number of whole seconds.
    pub fn serialize<S: Serializer>(duration: &Duration, serializer: S) -> Result<S::Ok, S::Error> {
        let whole_seconds = duration.as_secs();
        serializer.serialize_u64(whole_seconds)
    }

    /// Reads a `u64` number of seconds and converts it into a `Duration`.
    pub fn deserialize<'de, D: Deserializer<'de>>(deserializer: D) -> Result<Duration, D::Error> {
        u64::deserialize(deserializer).map(Duration::from_secs)
    }
}
impl Default for AlertManagerConfig {
    /// Default settings: 100 active alerts, 1000 history entries, a 10-second evaluation
    /// interval, a 5-minute cooldown, with auto-resolve and duplicate silencing enabled.
    fn default() -> Self {
        let evaluation_interval = Duration::from_secs(10);
        let default_cooldown = Duration::from_secs(5 * 60);
        Self {
            enable_auto_resolve: true,
            silence_duplicate_alerts: true,
            max_active_alerts: 100,
            max_history_size: 1_000,
            evaluation_interval,
            default_cooldown,
        }
    }
}
/// The most recent sample recorded for a metric.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricValue {
/// Sampled value.
pub value: f64,
/// System time at which the sample was recorded.
pub timestamp: SystemTime,
/// Key/value tags; `AlertManager::update_metric` always stores an empty map.
pub tags: HashMap<String, String>,
}
/// Running counters and means maintained by `AlertManager`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct AlertStatistics {
/// Number of alerts raised.
pub total_triggered: u64,
/// Number of alerts closed by auto-resolution (rule removal does not count here).
pub total_resolved: u64,
/// Number of successful `acknowledge_alert` calls.
pub total_acknowledged: u64,
/// Number of alerts raised, per severity.
pub alerts_by_severity: HashMap<AlertSeverity, u64>,
/// Running mean of the time from trigger to acknowledgement, in milliseconds.
pub mean_time_to_acknowledge_ms: f64,
/// Running mean of the time from trigger to auto-resolution, in milliseconds.
pub mean_time_to_resolve_ms: f64,
/// Never updated by the code in this file; stays at its default of 0.0.
pub false_positive_rate: f64,
}
impl AlertManager {
#[must_use]
pub fn new(config: AlertManagerConfig) -> Self {
let (tx, rx) = mpsc::unbounded_channel();
Self {
rules: Arc::new(RwLock::new(HashMap::new())),
active_alerts: Arc::new(RwLock::new(HashMap::new())),
alert_history: Arc::new(RwLock::new(VecDeque::new())),
metric_values: Arc::new(RwLock::new(HashMap::new())),
notification_tx: tx,
notification_rx: Arc::new(RwLock::new(rx)),
statistics: Arc::new(RwLock::new(AlertStatistics::default())),
config,
}
}
/// Registers `rule` under its `id`, replacing any rule already stored with that id.
///
/// # Errors
///
/// This implementation always returns `Ok(())`.
pub async fn add_rule(&self, rule: AlertRule) -> Result<()> {
    let key = rule.id.clone();
    self.rules.write().await.insert(key, rule);
    Ok(())
}
/// Removes the rule `rule_id` and closes every active alert that belongs to it.
///
/// Each affected alert is taken out of the active set, marked `Resolved` with the
/// current time and appended to history. Statistics are not updated. Removing an
/// unknown rule id is not an error.
///
/// # Errors
///
/// This implementation always returns `Ok(())`.
pub async fn remove_rule(&self, rule_id: &str) -> Result<()> {
    let mut rule_table = self.rules.write().await;
    rule_table.remove(rule_id);

    let mut active = self.active_alerts.write().await;

    // Collect the ids first: the map cannot be mutated while it is being iterated.
    let mut orphaned_ids = Vec::new();
    for (id, alert) in active.iter() {
        if alert.rule_id == rule_id {
            orphaned_ids.push(id.clone());
        }
    }

    for id in orphaned_ids {
        let mut alert = match active.remove(&id) {
            Some(alert) => alert,
            None => continue,
        };
        alert.state = AlertState::Resolved;
        alert.resolved_at = Some(SystemTime::now());
        self.add_to_history(alert).await;
    }
    Ok(())
}
/// Records the latest sample for `metric_name` and evaluates every enabled rule bound
/// to that metric.
///
/// The sample is stamped with the current system time and stored with an empty tag
/// set, replacing any previous sample for the same metric.
///
/// # Errors
///
/// Propagates any error returned while evaluating the rules for this metric.
pub async fn update_metric(&self, metric_name: String, value: f64) -> Result<()> {
    let sample = MetricValue {
        value,
        timestamp: SystemTime::now(),
        tags: HashMap::new(),
    };
    // Hold the write lock only for the insert. Rule evaluation re-acquires
    // `metric_values` for reading, and tokio's `RwLock` is not reentrant, so keeping the
    // write guard alive across that call would make this task wait on itself forever.
    self.metric_values
        .write()
        .await
        .insert(metric_name.clone(), sample);
    self.evaluate_rules_for_metric(&metric_name).await
}
/// Evaluates every enabled rule whose `metric` equals `metric_name` against the stored
/// sample for that metric.
///
/// Does nothing when no sample has been recorded for the metric. Read locks on the
/// rule table and the metric table are held for the whole evaluation. Evaluation stops
/// at the first rule that returns an error.
async fn evaluate_rules_for_metric(&self, metric_name: &str) -> Result<()> {
    let rule_table = self.rules.read().await;
    let samples = self.metric_values.read().await;

    let sample = match samples.get(metric_name) {
        Some(sample) => sample,
        None => return Ok(()),
    };

    for candidate in rule_table.values() {
        if !candidate.enabled || candidate.metric != metric_name {
            continue;
        }
        self.evaluate_rule(candidate, sample).await?;
    }
    Ok(())
}
/// Applies `rule` to a single metric sample.
///
/// A breaching sample raises an alert. A non-breaching sample resolves the rule's
/// open alerts when `enable_auto_resolve` is set. `Equal`/`NotEqual` use an absolute
/// tolerance of `f64::EPSILON`. `RateOfChange` and `Anomaly` are not implemented and
/// are always treated as non-breaching.
async fn evaluate_rule(&self, rule: &AlertRule, metric: &MetricValue) -> Result<()> {
    let observed = metric.value;
    let limit = rule.threshold;
    let gap = (observed - limit).abs();

    let breached = match rule.condition {
        AlertCondition::GreaterThan => observed > limit,
        AlertCondition::GreaterThanOrEqual => observed >= limit,
        AlertCondition::LessThan => observed < limit,
        AlertCondition::LessThanOrEqual => observed <= limit,
        AlertCondition::Equal => gap < f64::EPSILON,
        AlertCondition::NotEqual => gap >= f64::EPSILON,
        AlertCondition::RateOfChange | AlertCondition::Anomaly => false,
    };

    if breached {
        return self.trigger_alert(rule, observed).await;
    }
    if self.config.enable_auto_resolve {
        return self.auto_resolve_alert(&rule.id).await;
    }
    Ok(())
}
/// Raises a new alert for `rule` unless it is suppressed.
///
/// The alert is suppressed when:
/// * the rule already has a `Triggered`/`Active` alert and `silence_duplicate_alerts`
///   is set, or
/// * the rule's most recent history entry was resolved less than `cooldown_period` ago.
///
/// Otherwise the alert is inserted into the active set, pushed onto the notification
/// channel and counted in the statistics.
///
/// # Errors
///
/// This implementation always returns `Ok(())`.
async fn trigger_alert(&self, rule: &AlertRule, metric_value: f64) -> Result<()> {
    let mut active = self.active_alerts.write().await;

    let already_firing = active.values().any(|a| {
        a.rule_id == rule.id && matches!(a.state, AlertState::Triggered | AlertState::Active)
    });
    if already_firing && self.config.silence_duplicate_alerts {
        return Ok(());
    }

    let last_resolved_at = self
        .get_last_alert_for_rule(&rule.id)
        .await
        .and_then(|previous| previous.resolved_at);

    // One clock reading is used for the cooldown check, the id and `triggered_at`, so
    // the three can never disagree.
    let now = SystemTime::now();

    if let Some(resolved_at) = last_resolved_at {
        let elapsed = now.duration_since(resolved_at).unwrap_or(Duration::ZERO);
        if elapsed < rule.cooldown_period {
            return Ok(());
        }
    }

    // A system clock set before the Unix epoch must not panic; fall back to zero, as the
    // other `duration_since` calls in this file do.
    let epoch_millis = now
        .duration_since(SystemTime::UNIX_EPOCH)
        .unwrap_or(Duration::ZERO)
        .as_millis();
    let base_id = format!("alert_{}_{}", rule.id, epoch_millis);

    // Two alerts for the same rule within one millisecond would share an id and the
    // second would silently overwrite the first; add a numeric suffix only in that case.
    let mut id = base_id.clone();
    let mut suffix = 1u32;
    while active.contains_key(&id) {
        id = format!("{}_{}", base_id, suffix);
        suffix += 1;
    }

    let condition_text = format!("{:?}", rule.condition).to_lowercase();
    let message = format!(
        "Alert: {} - {} {} {} (value: {:.2}, threshold: {:.2})",
        rule.name, rule.metric, condition_text, rule.threshold, metric_value, rule.threshold
    );

    let alert = Alert {
        id,
        rule_id: rule.id.clone(),
        rule_name: rule.name.clone(),
        severity: rule.severity.clone(),
        state: AlertState::Triggered,
        triggered_at: now,
        resolved_at: None,
        metric_value,
        threshold_value: rule.threshold,
        message,
        context: HashMap::new(),
        notification_sent: false,
        acknowledgement: None,
    };

    active.insert(alert.id.clone(), alert.clone());

    // Best effort: `send` only fails once the receiving end is gone, and a lost
    // notification must not undo the alert that was just recorded.
    let _ = self.notification_tx.send(alert);

    let mut stats = self.statistics.write().await;
    stats.total_triggered += 1;
    *stats
        .alerts_by_severity
        .entry(rule.severity.clone())
        .or_insert(0) += 1;
    Ok(())
}
/// Resolves every `Triggered` or `Active` alert that belongs to `rule_id`.
///
/// Each such alert is removed from the active set, marked `Resolved` with the current
/// time, folded into the resolve counter and the running mean time-to-resolve, and
/// appended to history. Alerts in any other state (for example `Acknowledged` or
/// `Silenced`) are left in the active set.
///
/// # Errors
///
/// This implementation always returns `Ok(())`.
async fn auto_resolve_alert(&self, rule_id: &str) -> Result<()> {
    let mut active = self.active_alerts.write().await;

    // Collect the ids first: the map cannot be mutated while it is being iterated.
    let mut firing_ids = Vec::new();
    for (id, alert) in active.iter() {
        let is_firing = matches!(alert.state, AlertState::Triggered | AlertState::Active);
        if is_firing && alert.rule_id == rule_id {
            firing_ids.push(id.clone());
        }
    }

    for id in firing_ids {
        let mut alert = match active.remove(&id) {
            Some(alert) => alert,
            None => continue,
        };
        let resolved_at = SystemTime::now();
        alert.state = AlertState::Resolved;
        alert.resolved_at = Some(resolved_at);

        let mut stats = self.statistics.write().await;
        stats.total_resolved += 1;

        // Incremental mean: restore the previous sum, add this sample, divide by the
        // new count. A clock that went backwards contributes zero.
        let elapsed_ms = resolved_at
            .duration_since(alert.triggered_at)
            .unwrap_or(Duration::ZERO)
            .as_millis() as f64;
        let previous_sum = stats.mean_time_to_resolve_ms * (stats.total_resolved - 1) as f64;
        let resolved_count = stats.total_resolved as f64;
        stats.mean_time_to_resolve_ms = (previous_sum + elapsed_ms) / resolved_count;

        self.add_to_history(alert).await;
    }
    Ok(())
}
/// Marks the active alert `alert_id` as acknowledged.
///
/// Stores who acknowledged it, when, and the optional comment, then updates the
/// acknowledgement counter and the running mean time-to-acknowledge. An unknown
/// `alert_id` is ignored. Every successful call is counted, including repeated
/// acknowledgements of the same alert.
///
/// # Errors
///
/// This implementation always returns `Ok(())`.
pub async fn acknowledge_alert(
    &self,
    alert_id: &str,
    acknowledged_by: String,
    comment: Option<String>,
) -> Result<()> {
    let mut active = self.active_alerts.write().await;
    let alert = match active.get_mut(alert_id) {
        Some(alert) => alert,
        None => return Ok(()),
    };

    let acknowledged_at = SystemTime::now();
    alert.state = AlertState::Acknowledged;
    alert.acknowledgement = Some(Acknowledgement {
        acknowledged_by,
        acknowledged_at,
        comment,
    });

    // A clock that went backwards contributes zero rather than an error.
    let waited_ms = acknowledged_at
        .duration_since(alert.triggered_at)
        .unwrap_or(Duration::ZERO)
        .as_millis() as f64;

    let mut stats = self.statistics.write().await;
    stats.total_acknowledged += 1;
    let previous_sum = stats.mean_time_to_acknowledge_ms * (stats.total_acknowledged - 1) as f64;
    let acknowledged_count = stats.total_acknowledged as f64;
    stats.mean_time_to_acknowledge_ms = (previous_sum + waited_ms) / acknowledged_count;
    Ok(())
}
/// Sets the state of the active alert `alert_id` to `Silenced`.
///
/// An unknown `alert_id` is ignored. `_duration` is accepted but not used: the
/// silence has no expiry in this implementation.
///
/// # Errors
///
/// This implementation always returns `Ok(())`.
pub async fn silence_alert(&self, alert_id: &str, _duration: Duration) -> Result<()> {
    if let Some(alert) = self.active_alerts.write().await.get_mut(alert_id) {
        alert.state = AlertState::Silenced;
    }
    Ok(())
}
/// Returns a copy of the most recently archived alert for `rule_id`, if any.
///
/// History is searched from the newest entry backwards.
async fn get_last_alert_for_rule(&self, rule_id: &str) -> Option<Alert> {
    let history = self.alert_history.read().await;
    let newest_match = history.iter().rfind(|entry| entry.rule_id == rule_id);
    newest_match.cloned()
}
/// Appends `alert` to history, then drops the oldest entries until at most
/// `max_history_size` remain.
async fn add_to_history(&self, alert: Alert) {
    let limit = self.config.max_history_size;
    let mut history = self.alert_history.write().await;
    history.push_back(alert);

    let overflow = history.len().saturating_sub(limit);
    for _ in 0..overflow {
        history.pop_front();
    }
}
/// Returns a snapshot (clones) of every alert currently in the active set.
///
/// The order follows the underlying hash map and is therefore unspecified.
pub async fn get_active_alerts(&self) -> Vec<Alert> {
    let guard = self.active_alerts.read().await;
    let mut snapshot = Vec::with_capacity(guard.len());
    snapshot.extend(guard.values().cloned());
    snapshot
}
/// Returns clones of the active alerts whose severity equals `severity`.
///
/// The order follows the underlying hash map and is therefore unspecified.
pub async fn get_alerts_by_severity(&self, severity: AlertSeverity) -> Vec<Alert> {
    let guard = self.active_alerts.read().await;
    let mut matching = Vec::new();
    for alert in guard.values() {
        if alert.severity == severity {
            matching.push(alert.clone());
        }
    }
    matching
}
pub async fn get_statistics(&self) -> AlertStatistics {
self.statistics.read().await.clone()
}
pub async fn export_config(&self) -> AlertConfiguration {
let rules = self.rules.read().await;
AlertConfiguration {
rules: rules.values().cloned().collect(),
config: self.config.clone(),
}
}
}
/// Serializable snapshot of a manager's rules and settings, as produced by
/// `AlertManager::export_config`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertConfiguration {
/// All registered rules at the time of export.
pub rules: Vec<AlertRule>,
/// The manager configuration at the time of export.
pub config: AlertManagerConfig,
}
/// Returns the built-in rule set: high CPU, high memory, slow analysis and low cache
/// hit ratio.
///
/// Every rule is enabled, notifies only the dashboard channel and carries no metadata.
#[must_use]
pub fn default_tdg_alert_rules() -> Vec<AlertRule> {
    // Columns: id, name, description, metric, condition, threshold,
    //          duration (s), severity, cooldown (s).
    let table = vec![
        (
            "high_cpu",
            "High CPU Usage",
            "CPU usage exceeds critical threshold",
            "cpu_usage_percent",
            AlertCondition::GreaterThan,
            90.0,
            60,
            AlertSeverity::Critical,
            300,
        ),
        (
            "high_memory",
            "High Memory Usage",
            "Memory usage exceeds warning threshold",
            "memory_usage_mb",
            AlertCondition::GreaterThan,
            8192.0,
            120,
            AlertSeverity::Warning,
            600,
        ),
        (
            "slow_analysis",
            "Slow Analysis Time",
            "Analysis taking longer than expected",
            "avg_analysis_time_ms",
            AlertCondition::GreaterThan,
            5000.0,
            30,
            AlertSeverity::Warning,
            180,
        ),
        (
            "low_cache_hit",
            "Low Cache Hit Ratio",
            "Cache hit ratio below optimal level",
            "cache_hit_ratio",
            AlertCondition::LessThan,
            0.7,
            300,
            AlertSeverity::Info,
            900,
        ),
    ];

    table
        .into_iter()
        .map(
            |(
                id,
                name,
                description,
                metric,
                condition,
                threshold,
                duration_secs,
                severity,
                cooldown_secs,
            )| AlertRule {
                id: id.to_string(),
                name: name.to_string(),
                description: description.to_string(),
                metric: metric.to_string(),
                condition,
                threshold,
                duration: Duration::from_secs(duration_secs),
                severity,
                enabled: true,
                notification_channels: vec![NotificationChannel::Dashboard],
                cooldown_period: Duration::from_secs(cooldown_secs),
                metadata: HashMap::new(),
            },
        )
        .collect()
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Metric name shared by every test rule.
    const METRIC: &str = "test_metric";

    /// Builds an enabled `GreaterThan` rule on `METRIC` with no notification channels
    /// and no metadata.
    fn greater_than_rule(
        id: &str,
        name: &str,
        description: &str,
        threshold: f64,
        duration_secs: u64,
        severity: AlertSeverity,
        cooldown_secs: u64,
    ) -> AlertRule {
        AlertRule {
            id: id.to_string(),
            name: name.to_string(),
            description: description.to_string(),
            metric: METRIC.to_string(),
            condition: AlertCondition::GreaterThan,
            threshold,
            duration: Duration::from_secs(duration_secs),
            severity,
            enabled: true,
            notification_channels: vec![],
            cooldown_period: Duration::from_secs(cooldown_secs),
            metadata: HashMap::new(),
        }
    }

    /// A sample above the threshold raises exactly one alert with the rule's severity.
    #[tokio::test]
    async fn test_alert_triggering() {
        let manager = AlertManager::new(AlertManagerConfig::default());
        let rule = greater_than_rule(
            "test_rule",
            "Test Alert",
            "Test alert rule",
            50.0,
            10,
            AlertSeverity::Warning,
            60,
        );
        manager.add_rule(rule).await.unwrap();

        manager.update_metric(METRIC.to_string(), 75.0).await.unwrap();

        let active = manager.get_active_alerts().await;
        assert_eq!(active.len(), 1);
        assert_eq!(active[0].severity, AlertSeverity::Warning);
    }

    /// Acknowledging an active alert changes its state and stores the acknowledgement.
    #[tokio::test]
    async fn test_alert_acknowledgement() {
        let manager = AlertManager::new(AlertManagerConfig::default());
        let rule = greater_than_rule(
            "ack_test",
            "Ack Test",
            "Test acknowledgement",
            10.0,
            5,
            AlertSeverity::Error,
            30,
        );
        manager.add_rule(rule).await.unwrap();

        manager.update_metric(METRIC.to_string(), 20.0).await.unwrap();

        let active = manager.get_active_alerts().await;
        assert!(!active.is_empty());
        let alert_id = &active[0].id;

        manager
            .acknowledge_alert(
                alert_id,
                "test_user".to_string(),
                Some("Acknowledged for testing".to_string()),
            )
            .await
            .unwrap();

        let updated = manager.get_active_alerts().await;
        assert_eq!(updated[0].state, AlertState::Acknowledged);
        assert!(updated[0].acknowledgement.is_some());
    }

    /// With auto-resolve enabled, a sample back under the threshold clears the alert
    /// and counts one resolution.
    #[tokio::test]
    async fn test_auto_resolve() {
        let config = AlertManagerConfig {
            enable_auto_resolve: true,
            ..AlertManagerConfig::default()
        };
        let manager = AlertManager::new(config);
        let rule = greater_than_rule(
            "resolve_test",
            "Resolve Test",
            "Test auto-resolve",
            30.0,
            1,
            AlertSeverity::Warning,
            10,
        );
        manager.add_rule(rule).await.unwrap();

        manager.update_metric(METRIC.to_string(), 50.0).await.unwrap();
        assert_eq!(manager.get_active_alerts().await.len(), 1);

        manager.update_metric(METRIC.to_string(), 20.0).await.unwrap();
        assert_eq!(manager.get_active_alerts().await.len(), 0);

        let stats = manager.get_statistics().await;
        assert_eq!(stats.total_resolved, 1);
    }
}
// Placeholder property tests: neither case exercises any item from this module.
#[cfg(test)]
mod property_tests {
use proptest::prelude::*;
proptest! {
// Always passes; the generated string is never inspected.
#[test]
fn basic_property_stability(_input in ".*") {
prop_assert!(true);
}
// Only re-checks the bound already guaranteed by the `0u32..1000` strategy.
#[test]
fn module_consistency_check(_x in 0u32..1000) {
prop_assert!(_x < 1001);
}
}
}