use chrono::{DateTime, Utc};
use lettre::{
message::header::ContentType, transport::smtp::authentication::Credentials, AsyncSmtpTransport,
AsyncTransport, Message, Tokio1Executor,
};
use serde::{Deserialize, Serialize};
use slack_hook2::{PayloadBuilder, Slack};
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::Duration;
use thiserror::Error;
use tokio::sync::RwLock;
/// Errors raised by the alerting subsystem.
///
/// Every variant wraps a free-form detail string that `thiserror` splices
/// into the `Display` text declared by the variant's `#[error(...)]` attribute.
#[derive(Debug, Error)]
pub enum AlertingError {
/// Problem with the alerting configuration.
#[error("Configuration error: {0}")]
ConfigError(String),
/// Failure that concerns an alert channel in general.
#[error("Alert channel error: {0}")]
ChannelError(String),
/// SMTP transport creation, address parsing, message building or sending failed.
#[error("Failed to send email: {0}")]
EmailError(String),
/// Slack client creation, payload building or sending failed.
#[error("Failed to send Slack message: {0}")]
SlackError(String),
/// Unsupported HTTP method or a failed webhook request.
#[error("Webhook error: {0}")]
WebhookError(String),
/// Failure while rendering an alert template.
#[error("Template error: {0}")]
TemplateError(String),
/// Any alerting failure that fits none of the variants above.
#[error("Alerting error: {0}")]
Other(String),
}
/// Result alias for this module: the error type is fixed to [`AlertingError`].
pub type Result<T> = std::result::Result<T, AlertingError>;
/// How urgent an alert is.
///
/// The variants are declared from least to most severe, so the derived `Ord`
/// yields `Info < Warning < Error < Critical`. `AlertingManager` relies on that
/// ordering when it compares an alert against `AlertingConfig::min_severity`,
/// so do not reorder the variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub enum AlertSeverity {
/// Informational notice; lowest severity.
Info,
/// Something looks wrong but is not yet an error.
Warning,
/// An error condition.
Error,
/// Highest severity.
Critical,
}
impl AlertSeverity {
    /// Returns the `(label, emoji shortcode, hex colour)` triple for this severity.
    ///
    /// All three presentation strings live in one exhaustive `match`, so adding
    /// a variant forces every string to be supplied at once.
    fn presentation(&self) -> (&'static str, &'static str, &'static str) {
        match self {
            Self::Info => ("INFO", ":information_source:", "#36a64f"),
            Self::Warning => ("WARNING", ":warning:", "#ff9900"),
            Self::Error => ("ERROR", ":x:", "#ff0000"),
            Self::Critical => ("CRITICAL", ":rotating_light:", "#8b0000"),
        }
    }

    /// Upper-case label, e.g. `"WARNING"`; used in email subjects and the
    /// plain-text rendering of an alert.
    pub fn as_str(&self) -> &'static str {
        let (label, _, _) = self.presentation();
        label
    }

    /// Emoji shortcode (`:name:` form) placed in front of Slack messages.
    pub fn emoji(&self) -> &'static str {
        let (_, shortcode, _) = self.presentation();
        shortcode
    }

    /// Colour associated with the severity as a `#rrggbb` hex string.
    pub fn color(&self) -> &'static str {
        let (_, _, hex) = self.presentation();
        hex
    }
}
/// Area of the system an alert is about.
///
/// Used as a grouping key in `AlertingStatistics::alerts_by_category`.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum AlertCategory {
/// Health of an individual node.
NodeHealth,
/// Consensus-related alerts.
Consensus,
/// Replication-related alerts.
Replication,
/// Performance-related alerts.
Performance,
/// Security-related alerts.
Security,
/// Storage-related alerts.
Storage,
/// Network-related alerts.
Network,
/// Free-form category label; `AlertingManager::send_alert` uses `Custom("general")`.
Custom(String),
}
/// A single alert: what happened, how severe it is, and its acknowledgement state.
///
/// This struct is serialized as the JSON body of POST/PUT webhook requests, so
/// its field names are part of the wire format.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Alert {
/// Unique identifier; `Alert::new` fills it with a random v4 UUID string.
pub id: String,
/// Urgency of the alert.
pub severity: AlertSeverity,
/// Area of the system the alert is about.
pub category: AlertCategory,
/// Short headline; appears in the email subject and the Slack message.
pub title: String,
/// Longer description of the alert.
pub message: String,
/// Free-form key/value details, appended to the plain-text rendering.
pub metadata: HashMap<String, String>,
/// Creation time, set by `Alert::new`.
pub timestamp: DateTime<Utc>,
/// Optional node id; rendered as `[Node N]` by `Alert::to_text`.
pub node_id: Option<u64>,
/// Whether `acknowledge` has been called.
pub acknowledged: bool,
/// Time of the most recent `acknowledge` call, if any.
pub acknowledged_at: Option<DateTime<Utc>>,
}
impl Alert {
    /// Creates an unacknowledged alert.
    ///
    /// The id is a freshly generated v4 UUID string, the timestamp is the
    /// current UTC time, the metadata map is empty and no node id is set.
    pub fn new(
        severity: AlertSeverity,
        category: AlertCategory,
        title: impl Into<String>,
        message: impl Into<String>,
    ) -> Self {
        Self {
            id: uuid::Uuid::new_v4().to_string(),
            severity,
            category,
            title: title.into(),
            message: message.into(),
            metadata: HashMap::new(),
            timestamp: Utc::now(),
            node_id: None,
            acknowledged: false,
            acknowledged_at: None,
        }
    }

    /// Builder-style helper: adds (or overwrites) one metadata entry.
    pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.metadata.insert(key.into(), value.into());
        self
    }

    /// Builder-style helper: attaches a node id to the alert.
    pub fn with_node_id(mut self, node_id: u64) -> Self {
        self.node_id = Some(node_id);
        self
    }

    /// Marks the alert as acknowledged and stamps the current UTC time.
    ///
    /// Calling this again overwrites `acknowledged_at` with the newer time.
    pub fn acknowledge(&mut self) {
        self.acknowledged = true;
        self.acknowledged_at = Some(Utc::now());
    }

    /// Renders the alert as plain text (used as the email body).
    ///
    /// Layout: `[SEVERITY] [Node N] title`, a blank line, the message, a blank
    /// line, the timestamp, and — when metadata is present — a `Metadata:`
    /// section with one `key: value` line per entry.
    ///
    /// Metadata lines are emitted in ascending key order. Iterating the
    /// `HashMap` directly would produce a different order from run to run,
    /// making otherwise identical alerts render differently.
    pub fn to_text(&self) -> String {
        let node_info = self
            .node_id
            .map(|id| format!(" [Node {}]", id))
            .unwrap_or_default();
        let mut text = format!(
            "[{}]{} {}\n\n{}\n\nTimestamp: {}",
            self.severity.as_str(),
            node_info,
            self.title,
            self.message,
            self.timestamp.format("%Y-%m-%d %H:%M:%S UTC")
        );
        if !self.metadata.is_empty() {
            text.push_str("\n\nMetadata:\n");
            // Keys are unique, so ordering by key alone is a total order.
            let mut entries: Vec<(&String, &String)> = self.metadata.iter().collect();
            entries.sort_unstable_by(|left, right| left.0.cmp(right.0));
            for (key, value) in entries {
                text.push_str(&format!(" {}: {}\n", key, value));
            }
        }
        text
    }
}
/// Settings for delivering alerts by email over SMTP.
///
/// NOTE(review): the derived `Debug` and `Serialize` impls include `password`
/// verbatim, so logging or dumping this struct exposes the credential —
/// confirm that is acceptable for every place this config is printed or stored.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EmailChannelConfig {
/// Host name of the SMTP server handed to the lettre relay builder.
pub smtp_server: String,
/// TCP port of the SMTP server.
pub smtp_port: u16,
/// SMTP login name.
pub username: String,
/// SMTP login password.
pub password: String,
/// Address placed in the `From` header; parsed when an alert is sent.
pub from_address: String,
/// Recipients; one separate message is sent to each address.
pub to_addresses: Vec<String>,
/// `true` selects lettre's `starttls_relay` builder, `false` its `relay` builder.
pub use_tls: bool,
}
/// Settings for delivering alerts to Slack through an incoming webhook.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SlackChannelConfig {
/// Incoming-webhook URL the Slack client is created from.
pub webhook_url: String,
/// Optional channel set on the payload; omitted from the payload when `None`.
pub channel: Option<String>,
/// Display name of the sender; `"OxiRS Cluster"` is used when `None`.
pub username: Option<String>,
/// Optional icon emoji set on the payload; omitted when `None`.
pub icon_emoji: Option<String>,
}
/// Settings for delivering alerts to a generic HTTP webhook.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WebhookChannelConfig {
/// Target URL of the request.
pub url: String,
/// HTTP method, compared case-insensitively; only GET, POST and PUT are
/// accepted. POST and PUT carry the alert as a JSON body.
pub method: String,
/// Extra request headers added verbatim.
pub headers: HashMap<String, String>,
/// Request timeout in milliseconds.
pub timeout_ms: u64,
}
/// Rate-limiting settings applied before an alert is dispatched.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ThrottlingConfig {
/// When `false`, no throttling is applied at all.
pub enabled: bool,
/// Length in seconds of the window over which alerts are counted.
pub window_seconds: u64,
/// Alerts allowed per window; the alert that would exceed it is dropped and
/// starts a cooldown.
pub max_alerts_per_window: usize,
/// Seconds during which every alert is dropped once the limit was hit.
pub cooldown_seconds: u64,
}
impl Default for ThrottlingConfig {
fn default() -> Self {
Self {
enabled: true,
window_seconds: 60,
max_alerts_per_window: 10,
cooldown_seconds: 300,
}
}
}
/// Top-level configuration of the alerting manager.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertingConfig {
/// Master switch; when `false` the `send_*` methods return `Ok(())` without doing anything.
pub enabled: bool,
/// Email delivery settings; `None` disables the email channel.
pub email_channel: Option<EmailChannelConfig>,
/// Slack delivery settings; `None` disables the Slack channel.
pub slack_channel: Option<SlackChannelConfig>,
/// Generic HTTP webhooks; every entry receives each alert.
pub webhook_channels: Vec<WebhookChannelConfig>,
/// Alerts with a severity strictly below this value are dropped.
pub min_severity: AlertSeverity,
/// Rate-limiting settings.
pub throttling: ThrottlingConfig,
/// Maximum number of alerts kept in the in-memory history; oldest entries are evicted first.
pub max_history_size: usize,
/// NOTE(review): not read by any code visible in this file — confirm where it is consumed.
pub enable_anomaly_detection: bool,
/// NOTE(review): not read by any code visible in this file — confirm where it is consumed.
pub aggregation_window_seconds: u64,
}
impl Default for AlertingConfig {
fn default() -> Self {
Self {
enabled: true,
email_channel: None,
slack_channel: None,
webhook_channels: Vec::new(),
min_severity: AlertSeverity::Warning,
throttling: ThrottlingConfig::default(),
max_history_size: 1000,
enable_anomaly_detection: true,
aggregation_window_seconds: 60,
}
}
}
impl AlertingConfig {
    /// Returns the config with the email channel set to the given SMTP
    /// server, credentials, sender and recipients. `use_tls` is always `true`.
    pub fn with_email_channel(
        self,
        smtp_server: impl Into<String>,
        smtp_port: u16,
        username: impl Into<String>,
        password: impl Into<String>,
        from: impl Into<String>,
        to: Vec<String>,
    ) -> Self {
        let channel = EmailChannelConfig {
            smtp_server: smtp_server.into(),
            smtp_port,
            username: username.into(),
            password: password.into(),
            from_address: from.into(),
            to_addresses: to,
            use_tls: true,
        };
        Self {
            email_channel: Some(channel),
            ..self
        }
    }

    /// Returns the config with the Slack channel set to `webhook_url`, using
    /// the sender name `"OxiRS Cluster"`, the `:gear:` icon and no channel override.
    pub fn with_slack_channel(self, webhook_url: impl Into<String>) -> Self {
        let channel = SlackChannelConfig {
            webhook_url: webhook_url.into(),
            channel: None,
            username: Some(String::from("OxiRS Cluster")),
            icon_emoji: Some(String::from(":gear:")),
        };
        Self {
            slack_channel: Some(channel),
            ..self
        }
    }

    /// Appends a webhook that POSTs to `url` with no extra headers and a
    /// 5000 ms timeout. May be called repeatedly to register several webhooks.
    pub fn add_webhook(mut self, url: impl Into<String>) -> Self {
        let hook = WebhookChannelConfig {
            url: url.into(),
            method: String::from("POST"),
            headers: HashMap::new(),
            timeout_ms: 5000,
        };
        self.webhook_channels.push(hook);
        self
    }

    /// Returns the config with the minimum severity threshold replaced.
    pub fn with_min_severity(self, severity: AlertSeverity) -> Self {
        Self {
            min_severity: severity,
            ..self
        }
    }
}
/// One record in the in-memory alert history.
#[derive(Debug, Clone)]
struct AlertHistoryEntry {
// The alert that was dispatched.
alert: Alert,
// Time the entry was appended to the history; currently stored but never
// read, hence the `dead_code` allowance.
#[allow(dead_code)] sent_at: DateTime<Utc>,
}
/// Dispatches alerts to the configured channels (email, Slack, webhooks),
/// applies severity filtering and throttling, and keeps a bounded history.
pub struct AlertingManager {
// Immutable configuration captured at construction time.
config: AlertingConfig,
// SMTP transport; `Some` exactly when `config.email_channel` was set at construction.
email_transport: Arc<RwLock<Option<AsyncSmtpTransport<Tokio1Executor>>>>,
// Slack webhook client; `Some` exactly when `config.slack_channel` was set at construction.
slack_client: Arc<RwLock<Option<Slack>>>,
// Alerts dispatched without error, oldest first, capped at `config.max_history_size`.
alert_history: Arc<RwLock<VecDeque<AlertHistoryEntry>>>,
// Counters backing the rate limiter.
throttle_state: Arc<RwLock<ThrottleState>>,
// Set by `start`, cleared by `stop`; polled by the background trimming task.
running: Arc<RwLock<bool>>,
}
/// Mutable bookkeeping for the alert rate limiter.
#[derive(Debug)]
struct ThrottleState {
// Alerts admitted since `window_start`.
alert_count: usize,
// Start of the current counting window.
window_start: DateTime<Utc>,
// True from the moment the per-window limit is hit until the first
// throttle check made after the cooldown has elapsed.
in_cooldown: bool,
// When the current cooldown began; `None` while not cooling down.
cooldown_start: Option<DateTime<Utc>>,
}
impl ThrottleState {
fn new() -> Self {
Self {
alert_count: 0,
window_start: Utc::now(),
in_cooldown: false,
cooldown_start: None,
}
}
}
impl AlertingManager {
/// Builds a manager from `config`, creating the SMTP transport and the Slack
/// client up front for whichever channels are configured.
///
/// No connection is opened here; the transport and client are only constructed.
///
/// # Errors
///
/// * `AlertingError::EmailError` if the SMTP transport builder rejects the server name.
/// * `AlertingError::SlackError` if the Slack client cannot be created from the webhook URL.
///
/// NOTE(review): with `use_tls = false` the lettre `relay` builder is chosen,
/// which per the lettre docs still uses an (implicit) TLS connection. If
/// `false` is meant to mean plaintext SMTP, this needs a different builder —
/// confirm the intended meaning of the flag.
pub async fn new(config: AlertingConfig) -> Result<Self> {
    let smtp_transport = match &config.email_channel {
        None => None,
        Some(email) => {
            let builder = if email.use_tls {
                AsyncSmtpTransport::<Tokio1Executor>::starttls_relay(&email.smtp_server)
            } else {
                AsyncSmtpTransport::<Tokio1Executor>::relay(&email.smtp_server)
            };
            let builder = builder.map_err(|e| {
                AlertingError::EmailError(format!("Failed to create SMTP transport: {e}"))
            })?;
            let login = Credentials::new(email.username.clone(), email.password.clone());
            Some(builder.credentials(login).port(email.smtp_port).build())
        }
    };

    let slack = match &config.slack_channel {
        None => None,
        Some(slack_cfg) => {
            let client = Slack::new(&slack_cfg.webhook_url).map_err(|e| {
                AlertingError::SlackError(format!("Failed to create Slack client: {e}"))
            })?;
            Some(client)
        }
    };

    Ok(Self {
        config,
        email_transport: Arc::new(RwLock::new(smtp_transport)),
        slack_client: Arc::new(RwLock::new(slack)),
        alert_history: Arc::new(RwLock::new(VecDeque::new())),
        throttle_state: Arc::new(RwLock::new(ThrottleState::new())),
        running: Arc::new(RwLock::new(false)),
    })
}
/// Marks the manager as running and spawns the background history-trimming
/// task. Calling it while already running is a no-op.
///
/// The `running` write lock is held until this method returns, so the spawned
/// task cannot observe the flag before it has been set.
///
/// Always returns `Ok(())`.
pub async fn start(&mut self) -> Result<()> {
    let mut is_running = self.running.write().await;
    if !*is_running {
        tracing::info!("Starting alerting manager");
        *is_running = true;
        self.start_background_tasks().await;
    }
    Ok(())
}
/// Clears the running flag; calling it while already stopped is a no-op.
///
/// The background task only notices the cleared flag the next time it wakes
/// from its 60-second sleep, so it may linger for up to that long.
///
/// Always returns `Ok(())`.
pub async fn stop(&mut self) -> Result<()> {
    let mut is_running = self.running.write().await;
    if *is_running {
        tracing::info!("Stopping alerting manager");
        *is_running = false;
    }
    Ok(())
}
/// Sends an alert in the catch-all `Custom("general")` category.
///
/// This is a thin wrapper over `send_categorized_alert`, which applies the
/// same gates in the same order: the global `enabled` switch, the minimum
/// severity, then the throttle. An alert rejected by any gate is dropped and
/// `Ok(())` is returned.
///
/// # Errors
///
/// Returns the error of a delivery channel that failed.
pub async fn send_alert(
    &self,
    severity: AlertSeverity,
    title: impl Into<String>,
    message: impl Into<String>,
) -> Result<()> {
    let general = AlertCategory::Custom("general".to_string());
    self.send_categorized_alert(severity, general, title, message)
        .await
}
/// Sends an alert with an explicit category.
///
/// The alert is dropped (and `Ok(())` returned) when alerting is disabled,
/// when `severity` is below the configured minimum, or when the throttle
/// rejects it; a throttled alert is logged at `warn` level.
///
/// # Errors
///
/// Returns the error of a delivery channel that failed.
pub async fn send_categorized_alert(
    &self,
    severity: AlertSeverity,
    category: AlertCategory,
    title: impl Into<String>,
    message: impl Into<String>,
) -> Result<()> {
    let filtered_out = !self.config.enabled || severity < self.config.min_severity;
    if filtered_out {
        return Ok(());
    }

    if self.check_throttle().await {
        let alert = Alert::new(severity, category, title, message);
        self.send_alert_internal(alert).await
    } else {
        tracing::warn!("Alert throttled due to rate limiting");
        Ok(())
    }
}
/// Sends a caller-built [`Alert`] (for example one carrying metadata or a node id).
///
/// The same gates as the other `send_*` methods apply: the global `enabled`
/// switch, the minimum severity, and the throttle. A rejected alert is
/// dropped and `Ok(())` is returned.
///
/// # Errors
///
/// Returns the error of a delivery channel that failed.
pub async fn send_custom_alert(&self, alert: Alert) -> Result<()> {
    let settings = &self.config;
    if !settings.enabled || alert.severity < settings.min_severity {
        return Ok(());
    }

    let admitted = self.check_throttle().await;
    if !admitted {
        tracing::warn!("Alert throttled due to rate limiting");
        return Ok(());
    }

    self.send_alert_internal(alert).await
}
async fn send_alert_internal(&self, alert: Alert) -> Result<()> {
tracing::info!(
severity = ?alert.severity,
category = ?alert.category,
title = %alert.title,
"Sending alert"
);
if self.config.email_channel.is_some() {
self.send_email_alert(&alert).await?;
}
if self.config.slack_channel.is_some() {
self.send_slack_alert(&alert).await?;
}
for webhook_config in &self.config.webhook_channels {
self.send_webhook_alert(&alert, webhook_config).await?;
}
self.add_to_history(alert).await;
Ok(())
}
async fn send_email_alert(&self, alert: &Alert) -> Result<()> {
let transport = self.email_transport.read().await;
let Some(transport) = transport.as_ref() else {
return Ok(());
};
let email_config = self
.config
.email_channel
.as_ref()
.expect("email_channel config should be present when email transport exists");
for to_address in &email_config.to_addresses {
let email = Message::builder()
.from(
email_config.from_address.parse().map_err(|e| {
AlertingError::EmailError(format!("Invalid from address: {e}"))
})?,
)
.to(to_address
.parse()
.map_err(|e| AlertingError::EmailError(format!("Invalid to address: {e}")))?)
.subject(format!("[{}] {}", alert.severity.as_str(), alert.title))
.header(ContentType::TEXT_PLAIN)
.body(alert.to_text())
.map_err(|e| AlertingError::EmailError(format!("Failed to build email: {e}")))?;
transport
.send(email)
.await
.map_err(|e| AlertingError::EmailError(format!("Failed to send email: {e}")))?;
}
Ok(())
}
/// Posts `alert` to Slack through the configured webhook client.
///
/// Does nothing when no Slack client exists. The message text is the severity
/// emoji, the title in bold, and the message on the next line. The sender
/// name falls back to `"OxiRS Cluster"`; channel and icon are only set when configured.
///
/// # Errors
///
/// Returns `AlertingError::SlackError` if the payload cannot be built or the send fails.
///
/// # Panics
///
/// Panics if a client exists without a Slack config; `new` creates the two together.
async fn send_slack_alert(&self, alert: &Alert) -> Result<()> {
    let guard = self.slack_client.read().await;
    let client = match guard.as_ref() {
        Some(client) => client,
        None => return Ok(()),
    };
    let slack_config = self
        .config
        .slack_channel
        .as_ref()
        .expect("slack_channel config should be present when slack client exists");

    let text = format!(
        "{} *{}*\n{}",
        alert.severity.emoji(),
        alert.title,
        alert.message
    );
    let sender_name = slack_config.username.as_deref().unwrap_or("OxiRS Cluster");

    let mut builder = PayloadBuilder::new().text(text).username(sender_name);
    if let Some(channel) = &slack_config.channel {
        builder = builder.channel(channel);
    }
    if let Some(icon) = &slack_config.icon_emoji {
        builder = builder.icon_emoji(icon);
    }

    let payload = builder.build().map_err(|e| {
        AlertingError::SlackError(format!("Failed to build Slack payload: {e}"))
    })?;

    client
        .send(&payload)
        .await
        .map_err(|e| AlertingError::SlackError(format!("Failed to send Slack message: {e}")))?;
    Ok(())
}
async fn send_webhook_alert(&self, alert: &Alert, config: &WebhookChannelConfig) -> Result<()> {
let client = reqwest::Client::new();
let mut request = match config.method.to_uppercase().as_str() {
"GET" => client.get(&config.url),
"POST" => client.post(&config.url),
"PUT" => client.put(&config.url),
_ => {
return Err(AlertingError::WebhookError(format!(
"Unsupported HTTP method: {}",
config.method
)))
}
};
for (key, value) in &config.headers {
request = request.header(key, value);
}
if config.method.to_uppercase() == "POST" || config.method.to_uppercase() == "PUT" {
request = request.json(alert);
}
request
.timeout(Duration::from_millis(config.timeout_ms))
.send()
.await
.map_err(|e| AlertingError::WebhookError(format!("Failed to send webhook: {e}")))?;
Ok(())
}
async fn check_throttle(&self) -> bool {
if !self.config.throttling.enabled {
return true;
}
let mut state = self.throttle_state.write().await;
let now = Utc::now();
if state.in_cooldown {
if let Some(cooldown_start) = state.cooldown_start {
let cooldown_duration =
Duration::from_secs(self.config.throttling.cooldown_seconds);
if now
.signed_duration_since(cooldown_start)
.to_std()
.unwrap_or_default()
< cooldown_duration
{
return false;
}
state.in_cooldown = false;
state.cooldown_start = None;
state.alert_count = 0;
state.window_start = now;
}
}
let window_duration = Duration::from_secs(self.config.throttling.window_seconds);
if now
.signed_duration_since(state.window_start)
.to_std()
.unwrap_or_default()
>= window_duration
{
state.alert_count = 0;
state.window_start = now;
}
if state.alert_count >= self.config.throttling.max_alerts_per_window {
state.in_cooldown = true;
state.cooldown_start = Some(now);
return false;
}
state.alert_count += 1;
true
}
/// Appends `alert` to the history, stamped with the current UTC time, then
/// evicts the oldest entries until the history fits `max_history_size`.
async fn add_to_history(&self, alert: Alert) {
    let mut history = self.alert_history.write().await;
    let entry = AlertHistoryEntry {
        alert,
        sent_at: Utc::now(),
    };
    history.push_back(entry);

    let overflow = history.len().saturating_sub(self.config.max_history_size);
    for _ in 0..overflow {
        history.pop_front();
    }
}
pub async fn get_history(&self) -> Vec<Alert> {
let history = self.alert_history.read().await;
history.iter().map(|entry| entry.alert.clone()).collect()
}
pub async fn get_statistics(&self) -> AlertingStatistics {
let history = self.alert_history.read().await;
let total_alerts = history.len();
let mut by_severity = HashMap::new();
let mut by_category = HashMap::new();
for entry in history.iter() {
*by_severity.entry(entry.alert.severity).or_insert(0) += 1;
*by_category.entry(entry.alert.category.clone()).or_insert(0) += 1;
}
let throttle_state = self.throttle_state.read().await;
AlertingStatistics {
total_alerts,
alerts_by_severity: by_severity,
alerts_by_category: by_category,
is_throttled: throttle_state.in_cooldown,
current_window_count: throttle_state.alert_count,
}
}
/// Spawns a detached Tokio task that, for as long as the running flag is set,
/// trims the history down to `max_history_size` and then sleeps for 60 seconds.
///
/// The task checks the flag only at the top of each cycle, and neither lock
/// is held while it sleeps.
async fn start_background_tasks(&self) {
    let max_size = self.config.max_history_size;
    let running = Arc::clone(&self.running);
    let alert_history = Arc::clone(&self.alert_history);

    let trim_loop = async move {
        loop {
            if !*running.read().await {
                break;
            }
            {
                // Scoped so the write lock is released before sleeping.
                let mut history = alert_history.write().await;
                let excess = history.len().saturating_sub(max_size);
                for _ in 0..excess {
                    history.pop_front();
                }
            }
            tokio::time::sleep(Duration::from_secs(60)).await;
        }
    };
    tokio::spawn(trim_loop);
}
}
/// Snapshot returned by `AlertingManager::get_statistics`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertingStatistics {
/// Number of alerts currently held in the (bounded) history.
pub total_alerts: usize,
/// History entries counted per severity.
pub alerts_by_severity: HashMap<AlertSeverity, usize>,
/// History entries counted per category.
pub alerts_by_category: HashMap<AlertCategory, usize>,
/// Value of the throttle's cooldown flag at the time of the snapshot.
pub is_throttled: bool,
/// Alerts admitted so far in the current throttle window.
pub current_window_count: usize,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Severities must compare in ascending order of urgency.
    #[test]
    fn test_alert_severity_ordering() {
        let ascending = [
            AlertSeverity::Info,
            AlertSeverity::Warning,
            AlertSeverity::Error,
            AlertSeverity::Critical,
        ];
        for pair in ascending.windows(2) {
            assert!(pair[0] < pair[1]);
        }
    }

    /// `Alert::new` stores its arguments and starts unacknowledged.
    #[test]
    fn test_alert_creation() {
        let created = Alert::new(
            AlertSeverity::Warning,
            AlertCategory::NodeHealth,
            "Test Alert",
            "Test message",
        );

        assert_eq!(created.severity, AlertSeverity::Warning);
        assert_eq!(created.category, AlertCategory::NodeHealth);
        assert_eq!(created.title, "Test Alert");
        assert_eq!(created.message, "Test message");
        assert!(!created.acknowledged);
    }

    /// The builder helpers attach metadata and a node id.
    #[test]
    fn test_alert_metadata() {
        let base = Alert::new(
            AlertSeverity::Error,
            AlertCategory::Performance,
            "Performance Issue",
            "High latency detected",
        );
        let enriched = base.with_metadata("latency_ms", "500").with_node_id(42);

        assert_eq!(
            enriched.metadata.get("latency_ms"),
            Some(&"500".to_string())
        );
        assert_eq!(enriched.node_id, Some(42));
    }

    /// Config builder methods override the defaults they target.
    #[tokio::test]
    async fn test_alerting_config_builder() {
        let built = AlertingConfig::default()
            .with_min_severity(AlertSeverity::Error)
            .with_slack_channel("https://hooks.slack.com/test");

        assert_eq!(built.min_severity, AlertSeverity::Error);
        assert!(built.slack_channel.is_some());
    }

    /// A manager can be built from the default (channel-less) configuration.
    #[tokio::test]
    async fn test_alerting_manager_creation() {
        let outcome = AlertingManager::new(AlertingConfig::default()).await;
        assert!(outcome.is_ok());
    }
}