use chrono::{DateTime, Duration, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
/// Severity of an alert.
///
/// Declaration order is significant: the derived `PartialOrd`/`Ord` rank the
/// variants `Info < Warning < Critical`.
#[derive(
    Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, Hash,
)]
pub enum AlertLevel {
    /// Lowest severity; displayed as `INFO`.
    Info,
    /// Middle severity; displayed as `WARNING`.
    Warning,
    /// Highest severity; displayed as `CRITICAL`.
    Critical,
}
impl std::fmt::Display for AlertLevel {
    /// Writes the upper-case label of the level (`INFO`, `WARNING` or `CRITICAL`).
    ///
    /// Width/alignment flags of the outer formatter are not applied.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            AlertLevel::Info => "INFO",
            AlertLevel::Warning => "WARNING",
            AlertLevel::Critical => "CRITICAL",
        };
        f.write_str(label)
    }
}
/// The situation that caused an alert to be raised.
///
/// Serialized with an internal `"type"` tag (`#[serde(tag = "type")]`): the variant
/// name is stored in a `"type"` field next to the variant's own fields.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum AlertCondition {
    /// A scheduled run was noticed late (or not at all) at `detected_at`.
    MissedSchedule {
        expected_at: DateTime<Utc>,
        detected_at: DateTime<Utc>,
    },
    /// `count` failures in a row against a configured `threshold`.
    ConsecutiveFailures {
        count: u32,
        threshold: u32,
    },
    /// Failure rate above a threshold.
    ///
    /// Both values are strings, likely so the enum can derive `Eq`/`Hash`
    /// (which `f64` does not implement).
    HighFailureRate {
        rate: String,
        threshold: String,
    },
    /// A run took `duration_ms` against a limit of `threshold_ms` milliseconds.
    SlowExecution {
        duration_ms: u64,
        threshold_ms: u64,
    },
    /// No activity for `idle_duration_seconds` while runs were expected every
    /// `expected_interval_seconds`.
    TaskStuck {
        idle_duration_seconds: i64,
        expected_interval_seconds: u64,
    },
    /// Free-form list of health problems.
    TaskUnhealthy {
        issues: Vec<String>,
    },
}
impl std::fmt::Display for AlertCondition {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
AlertCondition::MissedSchedule {
expected_at,
detected_at,
} => {
let delay = detected_at
.signed_duration_since(*expected_at)
.num_seconds();
write!(
f,
"Missed schedule ({}s late, expected at {})",
delay,
expected_at.format("%Y-%m-%d %H:%M:%S UTC")
)
}
AlertCondition::ConsecutiveFailures { count, threshold } => {
write!(
f,
"Consecutive failures ({} failures, threshold: {})",
count, threshold
)
}
AlertCondition::HighFailureRate { rate, threshold } => {
write!(
f,
"High failure rate (rate: {}, threshold: {})",
rate, threshold
)
}
AlertCondition::SlowExecution {
duration_ms,
threshold_ms,
} => {
write!(
f,
"Slow execution ({}ms, threshold: {}ms)",
duration_ms, threshold_ms
)
}
AlertCondition::TaskStuck {
idle_duration_seconds,
expected_interval_seconds,
} => {
write!(
f,
"Task stuck (idle: {}s, expected interval: {}s)",
idle_duration_seconds, expected_interval_seconds
)
}
AlertCondition::TaskUnhealthy { issues } => {
write!(f, "Task unhealthy: {}", issues.join(", "))
}
}
}
}
/// A single alert raised for a task.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Alert {
    /// When the alert was created (set to `Utc::now()` by `Alert::new`).
    pub timestamp: DateTime<Utc>,
    /// Name of the task the alert refers to.
    pub task_name: String,
    /// Severity of the alert.
    pub level: AlertLevel,
    /// Structured description of what triggered the alert.
    pub condition: AlertCondition,
    /// Human-readable message.
    pub message: String,
    /// Free-form extra key/value data; empty when absent from serialized input.
    #[serde(default)]
    pub metadata: HashMap<String, String>,
}
impl Alert {
pub fn new(
task_name: String,
level: AlertLevel,
condition: AlertCondition,
message: String,
) -> Self {
Self {
timestamp: Utc::now(),
task_name,
level,
condition,
message,
metadata: HashMap::new(),
}
}
pub fn with_metadata(mut self, key: String, value: String) -> Self {
self.metadata.insert(key, value);
self
}
pub fn is_critical(&self) -> bool {
self.level == AlertLevel::Critical
}
pub fn is_warning(&self) -> bool {
self.level == AlertLevel::Warning
}
pub fn is_info(&self) -> bool {
self.level == AlertLevel::Info
}
fn dedup_key(&self) -> String {
format!("{}::{:?}", self.task_name, self.condition)
}
}
impl std::fmt::Display for Alert {
    /// Formats the alert as `[YYYY-MM-DD HH:MM:SS UTC] LEVEL - task - message`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let stamp = self.timestamp.format("%Y-%m-%d %H:%M:%S UTC");
        write!(
            f,
            "[{}] {} - {} - {}",
            stamp, self.level, self.task_name, self.message
        )
    }
}
/// Shared, thread-safe hook run for each alert the manager accepts.
pub type AlertCallback = Arc<dyn Fn(&Alert) + Send + Sync>;

/// In-memory alert history with time-window de-duplication and notification callbacks.
///
/// History, settings and dedup timestamps are serializable. The callbacks are
/// `#[serde(skip)]`ped, so a deserialized manager starts with none registered.
/// Cloning shares the callbacks, since each is an `Arc`.
#[derive(Clone, Serialize, Deserialize)]
pub struct AlertManager {
    /// Accepted alerts, oldest first.
    alerts: Vec<Alert>,
    /// Maximum number of alerts kept in `alerts`.
    max_history: usize,
    /// Window, in seconds, within which repeats of the same dedup key are suppressed.
    dedup_window_seconds: i64,
    /// Time each dedup key was last accepted.
    last_alert_time: HashMap<String, DateTime<Utc>>,
    /// Registered hooks; closures cannot be serialized, hence `skip`.
    #[serde(skip)]
    callbacks: Vec<AlertCallback>,
}
impl AlertManager {
    /// Creates an empty manager.
    ///
    /// * `max_history` — maximum number of alerts kept; the oldest are dropped first.
    /// * `dedup_window_seconds` — an alert is suppressed if another alert with the same
    ///   dedup key was accepted fewer than this many seconds ago. Values `<= 0`
    ///   effectively disable suppression.
    pub fn new(max_history: usize, dedup_window_seconds: i64) -> Self {
        Self {
            alerts: Vec::new(),
            max_history,
            dedup_window_seconds,
            last_alert_time: HashMap::new(),
            callbacks: Vec::new(),
        }
    }

    /// Registers a callback invoked for every alert accepted by
    /// [`record_alert`](Self::record_alert).
    pub fn add_callback(&mut self, callback: AlertCallback) {
        self.callbacks.push(callback);
    }

    /// Records `alert` unless its dedup key was accepted within the dedup window.
    ///
    /// When the alert is accepted, these steps run in order:
    /// 1. The key's timestamp is set to now.
    /// 2. Every callback is invoked, before the alert is stored.
    /// 3. The alert is appended, and the oldest entries beyond `max_history` are dropped.
    /// 4. Dedup timestamps older than twice the window are pruned.
    ///
    /// Returns `true` if the alert was recorded and `false` if it was suppressed.
    pub fn record_alert(&mut self, alert: Alert) -> bool {
        let dedup_key = alert.dedup_key();
        let now = Utc::now();
        if let Some(last_time) = self.last_alert_time.get(&dedup_key) {
            let elapsed = now.signed_duration_since(*last_time).num_seconds();
            if elapsed < self.dedup_window_seconds {
                return false;
            }
        }
        self.last_alert_time.insert(dedup_key, now);
        for callback in &self.callbacks {
            callback(&alert);
        }
        self.alerts.push(alert);
        if self.alerts.len() > self.max_history {
            let excess = self.alerts.len() - self.max_history;
            self.alerts.drain(..excess);
        }
        // Saturate: a huge window (e.g. i64::MAX for "never repeat") must not overflow.
        // Overflow panics in debug builds. In release builds it wraps negative, which
        // prunes every entry and silently disables dedup.
        let retention_seconds = self.dedup_window_seconds.saturating_mul(2);
        self.last_alert_time.retain(|_, last_time| {
            now.signed_duration_since(*last_time).num_seconds() < retention_seconds
        });
        true
    }

    /// Returns the retained alert history, oldest first.
    pub fn get_alerts(&self) -> &[Alert] {
        &self.alerts
    }

    /// Returns the retained alerts whose level is `Critical`.
    pub fn get_critical_alerts(&self) -> Vec<&Alert> {
        self.alerts.iter().filter(|a| a.is_critical()).collect()
    }

    /// Returns the retained alerts whose level is `Warning`.
    pub fn get_warning_alerts(&self) -> Vec<&Alert> {
        self.alerts.iter().filter(|a| a.is_warning()).collect()
    }

    /// Returns the retained alerts for `task_name`.
    pub fn get_task_alerts(&self, task_name: &str) -> Vec<&Alert> {
        self.alerts
            .iter()
            .filter(|a| a.task_name == task_name)
            .collect()
    }

    /// Returns the retained alerts whose timestamp is strictly later than `seconds` ago.
    ///
    /// NOTE(review): `Duration::seconds` and the subtraction from `Utc::now()` can panic
    /// for extreme `seconds` values (chrono range limits). Confirm callers pass sane values.
    pub fn get_recent_alerts(&self, seconds: i64) -> Vec<&Alert> {
        let cutoff = Utc::now() - Duration::seconds(seconds);
        self.alerts
            .iter()
            .filter(|a| a.timestamp > cutoff)
            .collect()
    }

    /// Drops the whole history and all dedup timestamps, so no alert is suppressed next.
    pub fn clear(&mut self) {
        self.alerts.clear();
        self.last_alert_time.clear();
    }

    /// Drops the alerts and dedup timestamps belonging to `task_name`.
    ///
    /// NOTE(review): dedup entries are matched by the `"{task_name}::"` key prefix, so
    /// clearing task `a` also clears the dedup entries of a task named e.g. `a::b`.
    pub fn clear_task_alerts(&mut self, task_name: &str) {
        // Built once instead of once per map entry.
        let prefix = format!("{}::", task_name);
        self.alerts.retain(|a| a.task_name != task_name);
        self.last_alert_time.retain(|k, _| !k.starts_with(&prefix));
    }

    /// Number of retained alerts.
    pub fn alert_count(&self) -> usize {
        self.alerts.len()
    }

    /// Number of retained `Critical` alerts.
    pub fn critical_alert_count(&self) -> usize {
        self.alerts.iter().filter(|a| a.is_critical()).count()
    }

    /// Number of retained `Warning` alerts.
    pub fn warning_alert_count(&self) -> usize {
        self.alerts.iter().filter(|a| a.is_warning()).count()
    }
}
impl Default for AlertManager {
    /// Keeps up to 1000 alerts and suppresses repeats for 300 seconds.
    fn default() -> Self {
        let max_history = 1000;
        let dedup_window_seconds = 300;
        Self::new(max_history, dedup_window_seconds)
    }
}
impl std::fmt::Debug for AlertManager {
    /// Summarises the manager with counts instead of contents.
    ///
    /// Callbacks are opaque closures, so only their number is shown. The dedup
    /// timestamp map is omitted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut summary = f.debug_struct("AlertManager");
        summary.field("alerts_count", &self.alerts.len());
        summary.field("max_history", &self.max_history);
        summary.field("dedup_window_seconds", &self.dedup_window_seconds);
        summary.field("callbacks_count", &self.callbacks.len());
        summary.finish()
    }
}
/// Alerting thresholds and on/off switches.
///
/// Every field except `slow_execution_threshold_ms` has an explicit serde default.
/// That field is an `Option`, which serde's derive fills with `None` when it is omitted.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertConfig {
    /// Master switch (default `true`).
    #[serde(default = "default_true")]
    pub enabled: bool,
    /// Consecutive-failure threshold (default `3`).
    #[serde(default = "default_consecutive_failures_threshold")]
    pub consecutive_failures_threshold: u32,
    /// Failure-rate threshold (default `0.5`); presumably a fraction in `0.0..=1.0` — confirm.
    #[serde(default = "default_failure_rate_threshold")]
    pub failure_rate_threshold: f64,
    /// Slow-execution threshold in milliseconds; `None` presumably disables the check.
    pub slow_execution_threshold_ms: Option<u64>,
    /// Whether missed-schedule alerts are wanted (default `true`).
    #[serde(default = "default_true")]
    pub alert_on_missed_schedule: bool,
    /// Whether stuck-task alerts are wanted (default `true`).
    #[serde(default = "default_true")]
    pub alert_on_stuck: bool,
}
// Serde field defaults for `AlertConfig`.
// NOTE(review): these are referenced by `#[serde(default = "...")]`, so the
// `dead_code` allows look unnecessary — confirm before removing them.

/// Default for boolean switches that are on unless configured otherwise.
#[allow(dead_code)]
fn default_true() -> bool {
    true
}

/// Default consecutive-failure threshold.
#[allow(dead_code)]
fn default_consecutive_failures_threshold() -> u32 {
    3
}

/// Default failure-rate threshold.
#[allow(dead_code)]
fn default_failure_rate_threshold() -> f64 {
    0.5
}
impl Default for AlertConfig {
fn default() -> Self {
Self {
enabled: true,
consecutive_failures_threshold: 3,
failure_rate_threshold: 0.5,
slow_execution_threshold_ms: None,
alert_on_missed_schedule: true,
alert_on_stuck: true,
}
}
}
impl AlertConfig {
    /// Same as [`AlertConfig::default`].
    pub fn new() -> Self {
        Default::default()
    }

    /// Default configuration with the master switch turned off.
    pub fn disabled() -> Self {
        let mut config = Self::default();
        config.enabled = false;
        config
    }

    /// Returns a copy with `consecutive_failures_threshold` replaced.
    pub fn with_consecutive_failures_threshold(self, threshold: u32) -> Self {
        Self {
            consecutive_failures_threshold: threshold,
            ..self
        }
    }

    /// Returns a copy with `failure_rate_threshold` replaced.
    pub fn with_failure_rate_threshold(self, threshold: f64) -> Self {
        Self {
            failure_rate_threshold: threshold,
            ..self
        }
    }

    /// Returns a copy with a slow-execution threshold of `threshold_ms` milliseconds.
    pub fn with_slow_execution_threshold_ms(self, threshold_ms: u64) -> Self {
        Self {
            slow_execution_threshold_ms: Some(threshold_ms),
            ..self
        }
    }

    /// Returns a copy with missed-schedule alerts switched off.
    pub fn without_missed_schedule_alerts(self) -> Self {
        Self {
            alert_on_missed_schedule: false,
            ..self
        }
    }

    /// Returns a copy with stuck-task alerts switched off.
    pub fn without_stuck_alerts(self) -> Self {
        Self {
            alert_on_stuck: false,
            ..self
        }
    }
}
/// Destination and filtering options for webhook alert delivery.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WebhookConfig {
    /// Target URL.
    pub url: String,
    /// Extra headers to send; empty when absent from serialized input.
    #[serde(default)]
    pub headers: HashMap<String, String>,
    /// Timeout in seconds (default `30`).
    #[serde(default = "default_webhook_timeout")]
    pub timeout_seconds: u64,
    /// Levels to forward; an empty list means every level is sent (see `should_send`).
    #[serde(default)]
    pub alert_levels: Vec<AlertLevel>,
}
/// Default webhook timeout, in seconds.
///
/// Used both as a serde field default and by `WebhookConfig::new`.
#[allow(dead_code)]
fn default_webhook_timeout() -> u64 {
    30
}
impl WebhookConfig {
    /// Webhook for `url` with no headers, the default timeout and no level filter.
    pub fn new(url: impl Into<String>) -> Self {
        let url = url.into();
        Self {
            url,
            headers: HashMap::new(),
            timeout_seconds: default_webhook_timeout(),
            alert_levels: Vec::new(),
        }
    }

    /// Adds (or replaces) a header.
    pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        let (key, value) = (key.into(), value.into());
        self.headers.insert(key, value);
        self
    }

    /// Sets the timeout in seconds.
    pub fn with_timeout(self, timeout_seconds: u64) -> Self {
        Self {
            timeout_seconds,
            ..self
        }
    }

    /// Restricts delivery to the given levels; an empty list forwards every level.
    pub fn with_alert_levels(self, levels: Vec<AlertLevel>) -> Self {
        Self {
            alert_levels: levels,
            ..self
        }
    }

    /// Whether `alert` passes this webhook's level filter.
    pub fn should_send(&self, alert: &Alert) -> bool {
        self.alert_levels.is_empty() || self.alert_levels.contains(&alert.level)
    }

    /// Builds the JSON body for `alert`.
    ///
    /// NOTE(review): `level` and `condition` use their `Debug` forms (e.g. `"Critical"`,
    /// not the `Display` form `"CRITICAL"`). Receivers see Rust debug syntax for the
    /// condition.
    pub fn create_payload(&self, alert: &Alert) -> serde_json::Value {
        let timestamp = alert.timestamp.to_rfc3339();
        let level = format!("{:?}", alert.level);
        let condition = format!("{:?}", alert.condition);
        serde_json::json!({
            "timestamp": timestamp,
            "task_name": alert.task_name,
            "level": level,
            "condition": condition,
            "message": alert.message,
            "metadata": alert.metadata,
        })
    }
}