use super::channels::{NotificationChannel, SlackChannel};
use super::types::{AlertRule, AlertStats, AlertStorage};
use crate::config::models::file_storage::AlertingConfig;
use crate::monitoring::types::{Alert, AlertSeverity};
use crate::utils::error::gateway_error::Result;
use parking_lot::{Mutex, RwLock};
use std::collections::VecDeque;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::sync::RwLock as TokioRwLock;
use tracing::{debug, info};
/// Central coordinator for gateway alerts.
///
/// Alerts submitted through [`AlertManager::send_alert`] are recorded in the shared
/// [`AlertStorage`] (statistics plus a bounded history) and queued for delivery;
/// [`AlertManager::process_pending`] drains that queue.
///
/// Cloning an `AlertManager` yields another handle to the *same* manager: the storage,
/// the pending queue, the notification channels and the running flag are all shared
/// through `Arc`s, so `start()`/`stop()` on any handle is observed by every clone.
#[derive(Debug, Clone)]
pub struct AlertManager {
    /// Alerting configuration this manager was built from.
    config: AlertingConfig,
    /// Rules, statistics and bounded alert history.
    pub(super) storage: Arc<RwLock<AlertStorage>>,
    /// Alerts queued by `send_alert` that `process_pending` has not drained yet.
    pub(super) pending_alerts: Arc<Mutex<VecDeque<Alert>>>,
    /// Registered notification channels; `new` adds a Slack channel when
    /// `slack_webhook` is configured.
    pub(super) notification_channels: Arc<TokioRwLock<Vec<Box<dyn NotificationChannel>>>>,
    /// Running flag, shared by every clone so that `stop()` is visible to all handles.
    pub(super) active: Arc<AtomicBool>,
}

impl AlertManager {
    /// Maximum number of alerts kept in `AlertStorage::history`; the oldest are evicted first.
    const MAX_HISTORY_LEN: usize = 1000;
    /// Number of alerts returned by [`AlertManager::get_history`] when no limit is given.
    const DEFAULT_HISTORY_LIMIT: usize = 100;

    /// Builds a stopped manager from `config`.
    ///
    /// A [`SlackChannel`] is registered when `config.slack_webhook` is set.
    ///
    /// # Errors
    /// Currently never fails; the `Result` is kept for API stability.
    pub async fn new(config: &AlertingConfig) -> Result<Self> {
        let mut notification_channels: Vec<Box<dyn NotificationChannel>> = Vec::new();
        if let Some(webhook_url) = &config.slack_webhook {
            notification_channels.push(Box::new(SlackChannel::new(
                webhook_url.clone(),
                None,
                Some("Gateway Alert".to_string()),
                AlertSeverity::Info,
            )));
        }
        Ok(Self {
            config: config.clone(),
            storage: Arc::new(RwLock::new(AlertStorage::default())),
            pending_alerts: Arc::new(Mutex::new(VecDeque::new())),
            notification_channels: Arc::new(TokioRwLock::new(notification_channels)),
            active: Arc::new(AtomicBool::new(false)),
        })
    }

    /// Marks the manager active and launches alert processing and rule evaluation.
    ///
    /// Calling `start` on a manager (or any clone of it) that is already active is a
    /// no-op, so the background routines are not launched a second time.
    ///
    /// # Errors
    /// Currently never fails.
    pub async fn start(&self) -> Result<()> {
        // `swap` sets the flag and reports its previous value atomically, so two
        // concurrent callers cannot both launch the background routines.
        if self.active.swap(true, Ordering::AcqRel) {
            debug!("Alert manager already running; ignoring start request");
            return Ok(());
        }
        info!("Starting alert manager");
        self.start_alert_processing().await;
        self.start_rule_evaluation().await;
        Ok(())
    }

    /// Clears the shared running flag.
    ///
    /// This does not join or abort anything launched by `start`.
    /// NOTE(review): presumably those routines poll `is_active()` and exit; confirm in
    /// `start_alert_processing` / `start_rule_evaluation`.
    ///
    /// # Errors
    /// Currently never fails.
    pub async fn stop(&self) -> Result<()> {
        info!("Stopping alert manager");
        self.active.store(false, Ordering::Release);
        Ok(())
    }

    /// Returns whether the manager (any clone of it) is currently started.
    #[inline]
    pub fn is_active(&self) -> bool {
        self.active.load(Ordering::Acquire)
    }

    /// Queues `alert` for delivery and records it in statistics and history.
    ///
    /// The history keeps at most [`Self::MAX_HISTORY_LEN`] entries, evicting the oldest.
    /// Severity counters are keyed by the `Debug` rendering of the severity
    /// (e.g. `"Critical"`).
    ///
    /// # Errors
    /// Currently never fails.
    pub async fn send_alert(&self, alert: Alert) -> Result<()> {
        debug!("Queuing alert: {} - {}", alert.severity, alert.title);
        // Temporary guard: the queue lock is released at the end of this statement,
        // before the storage lock is taken.
        self.pending_alerts.lock().push_back(alert.clone());

        let mut storage = self.storage.write();
        let stats = &mut storage.stats;
        stats.total_alerts += 1;
        *stats
            .alerts_by_severity
            .entry(format!("{:?}", alert.severity))
            .or_insert(0) += 1;
        *stats
            .alerts_by_source
            .entry(alert.source.clone())
            .or_insert(0) += 1;
        stats.last_alert = Some(alert.timestamp);

        storage.history.push_back(alert);
        while storage.history.len() > Self::MAX_HISTORY_LEN {
            storage.history.pop_front();
        }
        Ok(())
    }

    /// Drains the pending queue and hands each alert to `process_alert`.
    ///
    /// A failure is logged and counted in `stats.failed_notifications`; it does not
    /// stop the remaining alerts from being processed.
    ///
    /// # Errors
    /// Currently never fails; per-alert errors are absorbed as described above.
    pub async fn process_pending(&self) -> Result<()> {
        // Take the whole queue in one short critical section so the synchronous
        // mutex is never held across the `.await` below.
        let alerts_to_process = std::mem::take(&mut *self.pending_alerts.lock());
        for alert in alerts_to_process {
            if let Err(e) = self.process_alert(&alert).await {
                tracing::error!("Failed to process alert {}: {}", alert.id, e);
                self.storage.write().stats.failed_notifications += 1;
            }
        }
        Ok(())
    }

    /// Registers `rule`, replacing any existing rule with the same id.
    ///
    /// # Errors
    /// Currently never fails.
    pub async fn add_rule(&self, rule: AlertRule) -> Result<()> {
        info!("Adding alert rule: {}", rule.name);
        self.storage.write().rules.insert(rule.id.clone(), rule);
        Ok(())
    }

    /// Removes the rule with id `rule_id`; removing an unknown id is not an error.
    ///
    /// # Errors
    /// Currently never fails.
    pub async fn remove_rule(&self, rule_id: &str) -> Result<()> {
        info!("Removing alert rule: {}", rule_id);
        self.storage.write().rules.remove(rule_id);
        Ok(())
    }

    /// Returns a snapshot copy of the current alert statistics.
    pub async fn get_stats(&self) -> AlertStats {
        self.storage.read().stats.clone()
    }

    /// Returns up to `limit` most recent alerts, newest first.
    ///
    /// `None` uses [`Self::DEFAULT_HISTORY_LIMIT`].
    pub async fn get_history(&self, limit: Option<usize>) -> Vec<Alert> {
        let limit = limit.unwrap_or(Self::DEFAULT_HISTORY_LIMIT);
        self.storage
            .read()
            .history
            .iter()
            .rev()
            .take(limit)
            .cloned()
            .collect()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Utc;
    use uuid::Uuid;

    /// Configuration with alerting enabled and no notification channels.
    fn default_alerting_config() -> AlertingConfig {
        AlertingConfig {
            enabled: true,
            slack_webhook: None,
            email: None,
        }
    }

    /// Builds a manager from [`default_alerting_config`].
    async fn new_manager() -> AlertManager {
        AlertManager::new(&default_alerting_config()).await.unwrap()
    }

    fn create_test_alert(severity: AlertSeverity, title: &str) -> Alert {
        Alert {
            id: Uuid::new_v4().to_string(),
            title: title.to_string(),
            description: "Test description".to_string(),
            severity,
            source: "test_source".to_string(),
            timestamp: Utc::now(),
            metadata: serde_json::json!({}),
            resolved: false,
        }
    }

    /// Builds an enabled `GreaterThan` rule with a 60-second interval.
    fn create_test_rule(id: &str, threshold: f64) -> AlertRule {
        AlertRule {
            id: id.to_string(),
            name: format!("Rule {}", id),
            description: "Test rule".to_string(),
            metric: "test.metric".to_string(),
            threshold,
            operator: super::super::types::ComparisonOperator::GreaterThan,
            severity: AlertSeverity::Info,
            interval: std::time::Duration::from_secs(60),
            enabled: true,
            channels: vec![],
        }
    }

    #[tokio::test]
    async fn test_alert_manager_new() {
        let manager = new_manager().await;
        assert!(!manager.is_active());
    }

    #[tokio::test]
    async fn test_alert_manager_with_slack_webhook() {
        let config = AlertingConfig {
            enabled: true,
            slack_webhook: Some("https://hooks.slack.com/test".to_string()),
            email: None,
        };
        let manager = AlertManager::new(&config).await.unwrap();
        let channels = manager.notification_channels.read().await;
        assert_eq!(channels.len(), 1);
    }

    #[tokio::test]
    async fn test_alert_manager_no_channels() {
        let manager = new_manager().await;
        let channels = manager.notification_channels.read().await;
        assert!(channels.is_empty());
    }

    #[tokio::test]
    async fn test_alert_manager_start() {
        let manager = new_manager().await;
        assert!(!manager.is_active());
        manager.start().await.unwrap();
        assert!(manager.is_active());
    }

    #[tokio::test]
    async fn test_alert_manager_stop() {
        let manager = new_manager().await;
        manager.start().await.unwrap();
        assert!(manager.is_active());
        manager.stop().await.unwrap();
        assert!(!manager.is_active());
    }

    #[tokio::test]
    async fn test_alert_manager_start_stop_cycle() {
        let manager = new_manager().await;
        for _ in 0..3 {
            manager.start().await.unwrap();
            assert!(manager.is_active());
            manager.stop().await.unwrap();
            assert!(!manager.is_active());
        }
    }

    #[tokio::test]
    async fn test_send_alert() {
        let manager = new_manager().await;
        let alert = create_test_alert(AlertSeverity::Warning, "Test Alert");
        manager.send_alert(alert.clone()).await.unwrap();
        let stats = manager.get_stats().await;
        assert_eq!(stats.total_alerts, 1);
        let history = manager.get_history(None).await;
        assert_eq!(history.len(), 1);
        assert_eq!(history[0].title, "Test Alert");
    }

    #[tokio::test]
    async fn test_send_alert_queues_pending() {
        let manager = new_manager().await;
        let alert = create_test_alert(AlertSeverity::Info, "Queued");
        manager.send_alert(alert).await.unwrap();
        let pending = manager.pending_alerts.lock();
        assert_eq!(pending.len(), 1);
        assert_eq!(pending[0].title, "Queued");
    }

    #[tokio::test]
    async fn test_send_multiple_alerts() {
        let manager = new_manager().await;
        for i in 0..5 {
            let alert = create_test_alert(AlertSeverity::Info, &format!("Alert {}", i));
            manager.send_alert(alert).await.unwrap();
        }
        let stats = manager.get_stats().await;
        assert_eq!(stats.total_alerts, 5);
        let history = manager.get_history(None).await;
        assert_eq!(history.len(), 5);
    }

    #[tokio::test]
    async fn test_send_alerts_different_severities() {
        let manager = new_manager().await;
        let severities = [
            AlertSeverity::Info,
            AlertSeverity::Warning,
            AlertSeverity::Critical,
            AlertSeverity::Emergency,
        ];
        for severity in severities {
            let alert = create_test_alert(severity, "Test");
            manager.send_alert(alert).await.unwrap();
        }
        let stats = manager.get_stats().await;
        assert_eq!(stats.total_alerts, 4);
        assert_eq!(stats.alerts_by_severity.len(), 4);
    }

    #[tokio::test]
    async fn test_send_alerts_different_sources() {
        let manager = new_manager().await;
        let sources = ["api", "worker", "scheduler", "database"];
        for source in sources {
            let mut alert = create_test_alert(AlertSeverity::Info, "Test");
            alert.source = source.to_string();
            manager.send_alert(alert).await.unwrap();
        }
        let stats = manager.get_stats().await;
        assert_eq!(stats.alerts_by_source.len(), 4);
    }

    #[tokio::test]
    async fn test_history_is_capped_at_1000() {
        let manager = new_manager().await;
        for i in 0..1005 {
            let alert = create_test_alert(AlertSeverity::Info, &format!("Alert {}", i));
            manager.send_alert(alert).await.unwrap();
        }
        // Statistics count every alert, but history keeps only the newest 1000.
        assert_eq!(manager.get_stats().await.total_alerts, 1005);
        {
            let storage = manager.storage.read();
            assert_eq!(storage.history.len(), 1000);
            assert_eq!(storage.history.front().unwrap().title, "Alert 5");
        }
        let history = manager.get_history(Some(usize::MAX)).await;
        assert_eq!(history.len(), 1000);
        assert_eq!(history[0].title, "Alert 1004");
    }

    #[tokio::test]
    async fn test_get_history_default_limit() {
        let manager = new_manager().await;
        for i in 0..150 {
            let alert = create_test_alert(AlertSeverity::Info, &format!("Alert {}", i));
            manager.send_alert(alert).await.unwrap();
        }
        let history = manager.get_history(None).await;
        assert_eq!(history.len(), 100);
    }

    #[tokio::test]
    async fn test_get_history_custom_limit() {
        let manager = new_manager().await;
        for i in 0..50 {
            let alert = create_test_alert(AlertSeverity::Info, &format!("Alert {}", i));
            manager.send_alert(alert).await.unwrap();
        }
        let history = manager.get_history(Some(10)).await;
        assert_eq!(history.len(), 10);
    }

    #[tokio::test]
    async fn test_get_history_returns_recent_first() {
        let manager = new_manager().await;
        for i in 0..5 {
            let alert = create_test_alert(AlertSeverity::Info, &format!("Alert {}", i));
            manager.send_alert(alert).await.unwrap();
        }
        let history = manager.get_history(Some(5)).await;
        assert_eq!(history[0].title, "Alert 4");
    }

    #[tokio::test]
    async fn test_add_rule() {
        let manager = new_manager().await;
        manager
            .add_rule(create_test_rule("rule-1", 90.0))
            .await
            .unwrap();
        let storage = manager.storage.read();
        assert_eq!(storage.rules.len(), 1);
        assert!(storage.rules.contains_key("rule-1"));
    }

    #[tokio::test]
    async fn test_add_multiple_rules() {
        let manager = new_manager().await;
        for i in 0..5 {
            let rule = create_test_rule(&format!("rule-{}", i), i as f64 * 10.0);
            manager.add_rule(rule).await.unwrap();
        }
        let storage = manager.storage.read();
        assert_eq!(storage.rules.len(), 5);
    }

    #[tokio::test]
    async fn test_remove_rule() {
        let manager = new_manager().await;
        manager
            .add_rule(create_test_rule("rule-to-remove", 50.0))
            .await
            .unwrap();
        assert_eq!(manager.storage.read().rules.len(), 1);
        manager.remove_rule("rule-to-remove").await.unwrap();
        assert_eq!(manager.storage.read().rules.len(), 0);
    }

    #[tokio::test]
    async fn test_remove_nonexistent_rule() {
        let manager = new_manager().await;
        let result = manager.remove_rule("nonexistent").await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_get_stats_empty() {
        let manager = new_manager().await;
        let stats = manager.get_stats().await;
        assert_eq!(stats.total_alerts, 0);
        assert_eq!(stats.failed_notifications, 0);
        assert!(stats.alerts_by_severity.is_empty());
        assert!(stats.alerts_by_source.is_empty());
        assert!(stats.last_alert.is_none());
    }

    #[tokio::test]
    async fn test_get_stats_after_alerts() {
        let manager = new_manager().await;
        let alert = create_test_alert(AlertSeverity::Critical, "Critical Alert");
        manager.send_alert(alert).await.unwrap();
        let stats = manager.get_stats().await;
        assert_eq!(stats.total_alerts, 1);
        assert!(stats.last_alert.is_some());
        assert_eq!(stats.alerts_by_severity.get("Critical"), Some(&1));
    }

    #[tokio::test]
    async fn test_alert_manager_clone() {
        let manager = new_manager().await;
        manager.start().await.unwrap();
        let cloned = manager.clone();
        assert!(cloned.is_active());
    }

    #[tokio::test]
    async fn test_alert_manager_clone_shares_storage() {
        let manager = new_manager().await;
        let cloned = manager.clone();
        let alert = create_test_alert(AlertSeverity::Info, "Shared Alert");
        manager.send_alert(alert).await.unwrap();
        let stats = cloned.get_stats().await;
        assert_eq!(stats.total_alerts, 1);
    }

    #[tokio::test]
    async fn test_process_pending_empty() {
        let manager = new_manager().await;
        let result = manager.process_pending().await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_process_pending_with_alerts() {
        let manager = new_manager().await;
        let alert = create_test_alert(AlertSeverity::Warning, "Pending Alert");
        manager.send_alert(alert).await.unwrap();
        let result = manager.process_pending().await;
        assert!(result.is_ok());
        let pending = manager.pending_alerts.lock();
        assert!(pending.is_empty());
    }

    #[tokio::test]
    async fn test_alert_manager_debug() {
        let manager = new_manager().await;
        let debug_str = format!("{:?}", manager);
        assert!(debug_str.contains("AlertManager"));
    }
}