Skip to main content

a3s_lane/
alerts.rs

1//! Alert system for queue monitoring and notifications.
2//!
3//! This module provides configurable alerts for queue depth, latency,
4//! and other metrics. Alerts can trigger callbacks for notifications.
5
6use std::sync::Arc;
7use tokio::sync::RwLock;
8
/// Severity attached to an [`Alert`].
///
/// Variants are declared from least to most severe, so the derived ordering
/// yields `Info < Warning < Critical`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum AlertLevel {
    /// Informational notice.
    Info,
    /// Warning-level alert.
    Warning,
    /// Critical-level alert.
    Critical,
}
19
20/// Alert condition that was triggered
21#[derive(Debug, Clone, PartialEq)]
22pub struct Alert {
23    /// Alert level
24    pub level: AlertLevel,
25    /// Lane ID that triggered the alert
26    pub lane_id: String,
27    /// Alert message
28    pub message: String,
29    /// Current value that triggered the alert
30    pub current_value: f64,
31    /// Threshold that was exceeded
32    pub threshold: f64,
33}
34
35/// Callback function type for alert notifications
36pub type AlertCallback = Arc<dyn Fn(Alert) + Send + Sync>;
37
/// Thresholds and on/off switch for queue-depth alerting.
#[derive(Debug, Clone, PartialEq)]
pub struct QueueDepthAlertConfig {
    /// Queue depth at which a warning alert is raised.
    pub warning_threshold: usize,
    /// Queue depth at which a critical alert is raised.
    pub critical_threshold: usize,
    /// Whether queue-depth alerting is active.
    pub enabled: bool,
}
48
49impl QueueDepthAlertConfig {
50    /// Create a new queue depth alert configuration
51    pub fn new(warning_threshold: usize, critical_threshold: usize) -> Self {
52        Self {
53            warning_threshold,
54            critical_threshold,
55            enabled: true,
56        }
57    }
58
59    /// Disable alerts
60    pub fn disabled() -> Self {
61        Self {
62            warning_threshold: usize::MAX,
63            critical_threshold: usize::MAX,
64            enabled: false,
65        }
66    }
67}
68
69impl Default for QueueDepthAlertConfig {
70    fn default() -> Self {
71        Self::disabled()
72    }
73}
74
/// Thresholds and on/off switch for latency alerting.
#[derive(Debug, Clone, PartialEq)]
pub struct LatencyAlertConfig {
    /// Latency, in milliseconds, at which a warning alert is raised.
    pub warning_threshold_ms: f64,
    /// Latency, in milliseconds, at which a critical alert is raised.
    pub critical_threshold_ms: f64,
    /// Whether latency alerting is active.
    pub enabled: bool,
}
85
86impl LatencyAlertConfig {
87    /// Create a new latency alert configuration
88    pub fn new(warning_threshold_ms: f64, critical_threshold_ms: f64) -> Self {
89        Self {
90            warning_threshold_ms,
91            critical_threshold_ms,
92            enabled: true,
93        }
94    }
95
96    /// Disable alerts
97    pub fn disabled() -> Self {
98        Self {
99            warning_threshold_ms: f64::MAX,
100            critical_threshold_ms: f64::MAX,
101            enabled: false,
102        }
103    }
104}
105
106impl Default for LatencyAlertConfig {
107    fn default() -> Self {
108        Self::disabled()
109    }
110}
111
112/// Alert manager for monitoring queue metrics and triggering alerts
113pub struct AlertManager {
114    queue_depth_config: RwLock<QueueDepthAlertConfig>,
115    latency_config: RwLock<LatencyAlertConfig>,
116    callbacks: RwLock<Vec<AlertCallback>>,
117}
118
119impl AlertManager {
120    /// Create a new alert manager
121    pub fn new() -> Self {
122        Self {
123            queue_depth_config: RwLock::new(QueueDepthAlertConfig::default()),
124            latency_config: RwLock::new(LatencyAlertConfig::default()),
125            callbacks: RwLock::new(Vec::new()),
126        }
127    }
128
129    /// Create a new alert manager with queue depth alerts enabled
130    pub fn with_queue_depth_alerts(warning_threshold: usize, critical_threshold: usize) -> Self {
131        Self {
132            queue_depth_config: RwLock::new(QueueDepthAlertConfig::new(
133                warning_threshold,
134                critical_threshold,
135            )),
136            latency_config: RwLock::new(LatencyAlertConfig::default()),
137            callbacks: RwLock::new(Vec::new()),
138        }
139    }
140
141    /// Create a new alert manager with latency alerts enabled
142    pub fn with_latency_alerts(warning_threshold_ms: f64, critical_threshold_ms: f64) -> Self {
143        Self {
144            queue_depth_config: RwLock::new(QueueDepthAlertConfig::default()),
145            latency_config: RwLock::new(LatencyAlertConfig::new(
146                warning_threshold_ms,
147                critical_threshold_ms,
148            )),
149            callbacks: RwLock::new(Vec::new()),
150        }
151    }
152
153    /// Set queue depth alert configuration
154    pub async fn set_queue_depth_config(&self, config: QueueDepthAlertConfig) {
155        let mut queue_depth_config = self.queue_depth_config.write().await;
156        *queue_depth_config = config;
157    }
158
159    /// Set latency alert configuration
160    pub async fn set_latency_config(&self, config: LatencyAlertConfig) {
161        let mut latency_config = self.latency_config.write().await;
162        *latency_config = config;
163    }
164
165    /// Add an alert callback
166    pub async fn add_callback<F>(&self, callback: F)
167    where
168        F: Fn(Alert) + Send + Sync + 'static,
169    {
170        let mut callbacks = self.callbacks.write().await;
171        callbacks.push(Arc::new(callback));
172    }
173
174    /// Check queue depth and trigger alerts if thresholds are exceeded
175    pub async fn check_queue_depth(&self, lane_id: &str, depth: usize) {
176        let config = self.queue_depth_config.read().await;
177        if !config.enabled {
178            return;
179        }
180
181        let alert = if depth >= config.critical_threshold {
182            Some(Alert {
183                level: AlertLevel::Critical,
184                lane_id: lane_id.to_string(),
185                message: format!(
186                    "Queue depth critical: {} (threshold: {})",
187                    depth, config.critical_threshold
188                ),
189                current_value: depth as f64,
190                threshold: config.critical_threshold as f64,
191            })
192        } else if depth >= config.warning_threshold {
193            Some(Alert {
194                level: AlertLevel::Warning,
195                lane_id: lane_id.to_string(),
196                message: format!(
197                    "Queue depth warning: {} (threshold: {})",
198                    depth, config.warning_threshold
199                ),
200                current_value: depth as f64,
201                threshold: config.warning_threshold as f64,
202            })
203        } else {
204            None
205        };
206
207        if let Some(alert) = alert {
208            self.trigger_alert(alert).await;
209        }
210    }
211
212    /// Check latency and trigger alerts if thresholds are exceeded
213    pub async fn check_latency(&self, lane_id: &str, latency_ms: f64) {
214        let config = self.latency_config.read().await;
215        if !config.enabled {
216            return;
217        }
218
219        let alert = if latency_ms >= config.critical_threshold_ms {
220            Some(Alert {
221                level: AlertLevel::Critical,
222                lane_id: lane_id.to_string(),
223                message: format!(
224                    "Latency critical: {:.2}ms (threshold: {:.2}ms)",
225                    latency_ms, config.critical_threshold_ms
226                ),
227                current_value: latency_ms,
228                threshold: config.critical_threshold_ms,
229            })
230        } else if latency_ms >= config.warning_threshold_ms {
231            Some(Alert {
232                level: AlertLevel::Warning,
233                lane_id: lane_id.to_string(),
234                message: format!(
235                    "Latency warning: {:.2}ms (threshold: {:.2}ms)",
236                    latency_ms, config.warning_threshold_ms
237                ),
238                current_value: latency_ms,
239                threshold: config.warning_threshold_ms,
240            })
241        } else {
242            None
243        };
244
245        if let Some(alert) = alert {
246            self.trigger_alert(alert).await;
247        }
248    }
249
250    /// Trigger an alert by calling all registered callbacks
251    async fn trigger_alert(&self, alert: Alert) {
252        let callbacks = self.callbacks.read().await;
253        for callback in callbacks.iter() {
254            callback(alert.clone());
255        }
256    }
257
258    /// Get current queue depth alert configuration
259    pub async fn queue_depth_config(&self) -> QueueDepthAlertConfig {
260        self.queue_depth_config.read().await.clone()
261    }
262
263    /// Get current latency alert configuration
264    pub async fn latency_config(&self) -> LatencyAlertConfig {
265        self.latency_config.read().await.clone()
266    }
267}
268
269impl Default for AlertManager {
270    fn default() -> Self {
271        Self::new()
272    }
273}
274
275impl Clone for AlertManager {
276    fn clone(&self) -> Self {
277        // Create a new AlertManager with default configs
278        // Note: callbacks are not cloned as they can't be easily cloned
279        Self::new()
280    }
281}
282
#[cfg(test)]
mod tests {
    //! Unit tests for the alert configuration types and `AlertManager`.
    //!
    //! Alerts are captured synchronously inside the callbacks, so assertions
    //! never depend on spawned tasks or timing.

    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Mutex;

    /// Register a callback that counts how many alerts `manager` delivers.
    async fn attach_counter(manager: &AlertManager) -> Arc<AtomicUsize> {
        let count = Arc::new(AtomicUsize::new(0));
        let sink = Arc::clone(&count);
        manager
            .add_callback(move |_alert| {
                sink.fetch_add(1, Ordering::SeqCst);
            })
            .await;
        count
    }

    /// Register a callback that stores every alert `manager` delivers.
    async fn attach_recorder(manager: &AlertManager) -> Arc<Mutex<Vec<Alert>>> {
        let recorded = Arc::new(Mutex::new(Vec::new()));
        let sink = Arc::clone(&recorded);
        manager
            .add_callback(move |alert| {
                sink.lock().expect("recorder mutex poisoned").push(alert);
            })
            .await;
        recorded
    }

    /// Levels of all recorded alerts, in delivery order.
    fn recorded_levels(recorded: &Mutex<Vec<Alert>>) -> Vec<AlertLevel> {
        recorded
            .lock()
            .expect("recorder mutex poisoned")
            .iter()
            .map(|alert| alert.level)
            .collect()
    }

    #[test]
    fn test_queue_depth_alert_config() {
        let config = QueueDepthAlertConfig::new(100, 200);
        assert_eq!(config.warning_threshold, 100);
        assert_eq!(config.critical_threshold, 200);
        assert!(config.enabled);

        let disabled = QueueDepthAlertConfig::disabled();
        assert!(!disabled.enabled);
    }

    #[test]
    fn test_latency_alert_config() {
        let config = LatencyAlertConfig::new(100.0, 500.0);
        assert_eq!(config.warning_threshold_ms, 100.0);
        assert_eq!(config.critical_threshold_ms, 500.0);
        assert!(config.enabled);

        let disabled = LatencyAlertConfig::disabled();
        assert!(!disabled.enabled);
    }

    #[tokio::test]
    async fn test_alert_manager_new() {
        let manager = AlertManager::new();
        let queue_config = manager.queue_depth_config().await;
        let latency_config = manager.latency_config().await;

        assert!(!queue_config.enabled);
        assert!(!latency_config.enabled);
    }

    #[tokio::test]
    async fn test_alert_manager_with_queue_depth_alerts() {
        let manager = AlertManager::with_queue_depth_alerts(50, 100);
        let config = manager.queue_depth_config().await;

        assert_eq!(config.warning_threshold, 50);
        assert_eq!(config.critical_threshold, 100);
        assert!(config.enabled);
    }

    #[tokio::test]
    async fn test_alert_manager_with_latency_alerts() {
        let manager = AlertManager::with_latency_alerts(100.0, 500.0);
        let config = manager.latency_config().await;

        assert_eq!(config.warning_threshold_ms, 100.0);
        assert_eq!(config.critical_threshold_ms, 500.0);
        assert!(config.enabled);
    }

    #[tokio::test]
    async fn test_alert_manager_set_configs() {
        let manager = AlertManager::new();

        manager
            .set_queue_depth_config(QueueDepthAlertConfig::new(10, 20))
            .await;
        let queue_config = manager.queue_depth_config().await;
        assert_eq!(queue_config.warning_threshold, 10);
        assert_eq!(queue_config.critical_threshold, 20);

        manager
            .set_latency_config(LatencyAlertConfig::new(50.0, 100.0))
            .await;
        let latency_config = manager.latency_config().await;
        assert_eq!(latency_config.warning_threshold_ms, 50.0);
        assert_eq!(latency_config.critical_threshold_ms, 100.0);
    }

    #[tokio::test]
    async fn test_check_queue_depth_no_alert() {
        let manager = AlertManager::with_queue_depth_alerts(100, 200);
        let alert_count = attach_counter(&manager).await;

        manager.check_queue_depth("query", 50).await;

        // No alert should be triggered
        assert_eq!(alert_count.load(Ordering::SeqCst), 0);
    }

    #[tokio::test]
    async fn test_check_queue_depth_warning() {
        let manager = AlertManager::with_queue_depth_alerts(100, 200);
        let recorded = attach_recorder(&manager).await;

        manager.check_queue_depth("query", 150).await;

        assert_eq!(recorded_levels(&recorded), vec![AlertLevel::Warning]);
    }

    #[tokio::test]
    async fn test_check_queue_depth_critical() {
        let manager = AlertManager::with_queue_depth_alerts(100, 200);
        let recorded = attach_recorder(&manager).await;

        manager.check_queue_depth("query", 250).await;

        assert_eq!(recorded_levels(&recorded), vec![AlertLevel::Critical]);
    }

    #[tokio::test]
    async fn test_check_queue_depth_thresholds_are_inclusive() {
        let manager = AlertManager::with_queue_depth_alerts(100, 200);
        let recorded = attach_recorder(&manager).await;

        manager.check_queue_depth("query", 100).await;
        manager.check_queue_depth("query", 200).await;

        assert_eq!(
            recorded_levels(&recorded),
            vec![AlertLevel::Warning, AlertLevel::Critical]
        );
    }

    #[tokio::test]
    async fn test_check_queue_depth_alert_contents() {
        let manager = AlertManager::with_queue_depth_alerts(100, 200);
        let recorded = attach_recorder(&manager).await;

        manager.check_queue_depth("query", 150).await;

        let alerts = recorded.lock().expect("recorder mutex poisoned");
        assert_eq!(alerts.len(), 1);
        assert_eq!(alerts[0].lane_id, "query");
        assert_eq!(
            alerts[0].message,
            "Queue depth warning: 150 (threshold: 100)"
        );
        assert_eq!(alerts[0].current_value, 150.0);
        assert_eq!(alerts[0].threshold, 100.0);
    }

    #[tokio::test]
    async fn test_check_queue_depth_disabled() {
        let manager = AlertManager::new();
        let alert_count = attach_counter(&manager).await;

        manager.check_queue_depth("query", 1000).await;

        // No alert should be triggered when disabled
        assert_eq!(alert_count.load(Ordering::SeqCst), 0);
    }

    #[tokio::test]
    async fn test_check_latency_no_alert() {
        let manager = AlertManager::with_latency_alerts(100.0, 500.0);
        let alert_count = attach_counter(&manager).await;

        manager.check_latency("query", 50.0).await;

        // No alert should be triggered
        assert_eq!(alert_count.load(Ordering::SeqCst), 0);
    }

    #[tokio::test]
    async fn test_check_latency_warning() {
        let manager = AlertManager::with_latency_alerts(100.0, 500.0);
        let recorded = attach_recorder(&manager).await;

        manager.check_latency("query", 250.0).await;

        assert_eq!(recorded_levels(&recorded), vec![AlertLevel::Warning]);
    }

    #[tokio::test]
    async fn test_check_latency_critical() {
        let manager = AlertManager::with_latency_alerts(100.0, 500.0);
        let recorded = attach_recorder(&manager).await;

        manager.check_latency("query", 600.0).await;

        assert_eq!(recorded_levels(&recorded), vec![AlertLevel::Critical]);
    }

    #[tokio::test]
    async fn test_check_latency_thresholds_are_inclusive() {
        let manager = AlertManager::with_latency_alerts(100.0, 500.0);
        let recorded = attach_recorder(&manager).await;

        manager.check_latency("query", 100.0).await;
        manager.check_latency("query", 500.0).await;

        assert_eq!(
            recorded_levels(&recorded),
            vec![AlertLevel::Warning, AlertLevel::Critical]
        );
    }

    #[tokio::test]
    async fn test_check_latency_disabled() {
        let manager = AlertManager::new();
        let alert_count = attach_counter(&manager).await;

        manager.check_latency("query", 10000.0).await;

        // No alert should be triggered when disabled
        assert_eq!(alert_count.load(Ordering::SeqCst), 0);
    }

    #[tokio::test]
    async fn test_multiple_callbacks() {
        let manager = AlertManager::with_queue_depth_alerts(100, 200);
        let count1 = attach_counter(&manager).await;
        let count2 = attach_counter(&manager).await;

        manager.check_queue_depth("query", 150).await;

        // Both callbacks should be triggered
        assert_eq!(count1.load(Ordering::SeqCst), 1);
        assert_eq!(count2.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn test_alert_level_ordering() {
        assert!(AlertLevel::Info < AlertLevel::Warning);
        assert!(AlertLevel::Warning < AlertLevel::Critical);
    }

    #[tokio::test]
    async fn test_alert_manager_default() {
        let manager = AlertManager::default();
        let queue_config = manager.queue_depth_config().await;
        assert!(!queue_config.enabled);
    }

    #[test]
    fn test_queue_depth_alert_config_default() {
        let config = QueueDepthAlertConfig::default();
        assert!(!config.enabled);
    }

    #[test]
    fn test_latency_alert_config_default() {
        let config = LatencyAlertConfig::default();
        assert!(!config.enabled);
    }
}