1use std::sync::Arc;
7use tokio::sync::RwLock;
8
/// Severity attached to an [`Alert`].
///
/// The variants are declared from least to most severe, so the derived
/// ordering yields `Info < Warning < Critical`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum AlertLevel {
    /// Lowest severity.
    Info,
    /// A warning threshold was reached.
    Warning,
    /// A critical threshold was reached.
    Critical,
}
19
20#[derive(Debug, Clone, PartialEq)]
22pub struct Alert {
23 pub level: AlertLevel,
25 pub lane_id: String,
27 pub message: String,
29 pub current_value: f64,
31 pub threshold: f64,
33}
34
35pub type AlertCallback = Arc<dyn Fn(Alert) + Send + Sync>;
37
/// Thresholds controlling queue-depth alerts.
#[derive(Debug, Clone, PartialEq)]
pub struct QueueDepthAlertConfig {
    /// Queue depth at or above which a warning alert is raised.
    pub warning_threshold: usize,
    /// Queue depth at or above which a critical alert is raised.
    pub critical_threshold: usize,
    /// When `false`, queue-depth checks raise no alerts.
    pub enabled: bool,
}
48
49impl QueueDepthAlertConfig {
50 pub fn new(warning_threshold: usize, critical_threshold: usize) -> Self {
52 Self {
53 warning_threshold,
54 critical_threshold,
55 enabled: true,
56 }
57 }
58
59 pub fn disabled() -> Self {
61 Self {
62 warning_threshold: usize::MAX,
63 critical_threshold: usize::MAX,
64 enabled: false,
65 }
66 }
67}
68
69impl Default for QueueDepthAlertConfig {
70 fn default() -> Self {
71 Self::disabled()
72 }
73}
74
/// Thresholds controlling latency alerts, expressed in milliseconds.
#[derive(Debug, Clone, PartialEq)]
pub struct LatencyAlertConfig {
    /// Latency (ms) at or above which a warning alert is raised.
    pub warning_threshold_ms: f64,
    /// Latency (ms) at or above which a critical alert is raised.
    pub critical_threshold_ms: f64,
    /// When `false`, latency checks raise no alerts.
    pub enabled: bool,
}
85
86impl LatencyAlertConfig {
87 pub fn new(warning_threshold_ms: f64, critical_threshold_ms: f64) -> Self {
89 Self {
90 warning_threshold_ms,
91 critical_threshold_ms,
92 enabled: true,
93 }
94 }
95
96 pub fn disabled() -> Self {
98 Self {
99 warning_threshold_ms: f64::MAX,
100 critical_threshold_ms: f64::MAX,
101 enabled: false,
102 }
103 }
104}
105
106impl Default for LatencyAlertConfig {
107 fn default() -> Self {
108 Self::disabled()
109 }
110}
111
112pub struct AlertManager {
114 queue_depth_config: RwLock<QueueDepthAlertConfig>,
115 latency_config: RwLock<LatencyAlertConfig>,
116 callbacks: RwLock<Vec<AlertCallback>>,
117}
118
119impl AlertManager {
120 pub fn new() -> Self {
122 Self {
123 queue_depth_config: RwLock::new(QueueDepthAlertConfig::default()),
124 latency_config: RwLock::new(LatencyAlertConfig::default()),
125 callbacks: RwLock::new(Vec::new()),
126 }
127 }
128
129 pub fn with_queue_depth_alerts(warning_threshold: usize, critical_threshold: usize) -> Self {
131 Self {
132 queue_depth_config: RwLock::new(QueueDepthAlertConfig::new(
133 warning_threshold,
134 critical_threshold,
135 )),
136 latency_config: RwLock::new(LatencyAlertConfig::default()),
137 callbacks: RwLock::new(Vec::new()),
138 }
139 }
140
141 pub fn with_latency_alerts(warning_threshold_ms: f64, critical_threshold_ms: f64) -> Self {
143 Self {
144 queue_depth_config: RwLock::new(QueueDepthAlertConfig::default()),
145 latency_config: RwLock::new(LatencyAlertConfig::new(
146 warning_threshold_ms,
147 critical_threshold_ms,
148 )),
149 callbacks: RwLock::new(Vec::new()),
150 }
151 }
152
153 pub async fn set_queue_depth_config(&self, config: QueueDepthAlertConfig) {
155 let mut queue_depth_config = self.queue_depth_config.write().await;
156 *queue_depth_config = config;
157 }
158
159 pub async fn set_latency_config(&self, config: LatencyAlertConfig) {
161 let mut latency_config = self.latency_config.write().await;
162 *latency_config = config;
163 }
164
165 pub async fn add_callback<F>(&self, callback: F)
167 where
168 F: Fn(Alert) + Send + Sync + 'static,
169 {
170 let mut callbacks = self.callbacks.write().await;
171 callbacks.push(Arc::new(callback));
172 }
173
174 pub async fn check_queue_depth(&self, lane_id: &str, depth: usize) {
176 let config = self.queue_depth_config.read().await;
177 if !config.enabled {
178 return;
179 }
180
181 let alert = if depth >= config.critical_threshold {
182 Some(Alert {
183 level: AlertLevel::Critical,
184 lane_id: lane_id.to_string(),
185 message: format!(
186 "Queue depth critical: {} (threshold: {})",
187 depth, config.critical_threshold
188 ),
189 current_value: depth as f64,
190 threshold: config.critical_threshold as f64,
191 })
192 } else if depth >= config.warning_threshold {
193 Some(Alert {
194 level: AlertLevel::Warning,
195 lane_id: lane_id.to_string(),
196 message: format!(
197 "Queue depth warning: {} (threshold: {})",
198 depth, config.warning_threshold
199 ),
200 current_value: depth as f64,
201 threshold: config.warning_threshold as f64,
202 })
203 } else {
204 None
205 };
206
207 if let Some(alert) = alert {
208 self.trigger_alert(alert).await;
209 }
210 }
211
212 pub async fn check_latency(&self, lane_id: &str, latency_ms: f64) {
214 let config = self.latency_config.read().await;
215 if !config.enabled {
216 return;
217 }
218
219 let alert = if latency_ms >= config.critical_threshold_ms {
220 Some(Alert {
221 level: AlertLevel::Critical,
222 lane_id: lane_id.to_string(),
223 message: format!(
224 "Latency critical: {:.2}ms (threshold: {:.2}ms)",
225 latency_ms, config.critical_threshold_ms
226 ),
227 current_value: latency_ms,
228 threshold: config.critical_threshold_ms,
229 })
230 } else if latency_ms >= config.warning_threshold_ms {
231 Some(Alert {
232 level: AlertLevel::Warning,
233 lane_id: lane_id.to_string(),
234 message: format!(
235 "Latency warning: {:.2}ms (threshold: {:.2}ms)",
236 latency_ms, config.warning_threshold_ms
237 ),
238 current_value: latency_ms,
239 threshold: config.warning_threshold_ms,
240 })
241 } else {
242 None
243 };
244
245 if let Some(alert) = alert {
246 self.trigger_alert(alert).await;
247 }
248 }
249
250 async fn trigger_alert(&self, alert: Alert) {
252 let callbacks = self.callbacks.read().await;
253 for callback in callbacks.iter() {
254 callback(alert.clone());
255 }
256 }
257
258 pub async fn queue_depth_config(&self) -> QueueDepthAlertConfig {
260 self.queue_depth_config.read().await.clone()
261 }
262
263 pub async fn latency_config(&self) -> LatencyAlertConfig {
265 self.latency_config.read().await.clone()
266 }
267}
268
269impl Default for AlertManager {
270 fn default() -> Self {
271 Self::new()
272 }
273}
274
275impl Clone for AlertManager {
276 fn clone(&self) -> Self {
277 Self::new()
280 }
281}
282
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Mutex;

    // Callbacks are invoked synchronously inside `check_*`, so the helpers
    // below record straight into shared state. No spawned task or sleep is
    // needed to observe the result once the check has been awaited.

    /// Registers a callback that counts how many alerts were delivered.
    async fn count_alerts(manager: &AlertManager) -> Arc<AtomicUsize> {
        let count = Arc::new(AtomicUsize::new(0));
        let sink = Arc::clone(&count);
        manager
            .add_callback(move |_alert| {
                sink.fetch_add(1, Ordering::SeqCst);
            })
            .await;
        count
    }

    /// Registers a callback that stores every delivered alert, in order.
    async fn record_alerts(manager: &AlertManager) -> Arc<Mutex<Vec<Alert>>> {
        let seen = Arc::new(Mutex::new(Vec::new()));
        let sink = Arc::clone(&seen);
        manager
            .add_callback(move |alert| {
                sink.lock().expect("alert recorder poisoned").push(alert);
            })
            .await;
        seen
    }

    /// Returns the only recorded alert, failing the test if there is not
    /// exactly one.
    fn single_alert(seen: &Mutex<Vec<Alert>>) -> Alert {
        let seen = seen.lock().expect("alert recorder poisoned");
        assert_eq!(seen.len(), 1, "expected exactly one alert, got {:?}", *seen);
        seen[0].clone()
    }

    /// Returns the levels of all recorded alerts, in delivery order.
    fn recorded_levels(seen: &Mutex<Vec<Alert>>) -> Vec<AlertLevel> {
        seen.lock()
            .expect("alert recorder poisoned")
            .iter()
            .map(|alert| alert.level)
            .collect()
    }

    #[test]
    fn test_queue_depth_alert_config() {
        let config = QueueDepthAlertConfig::new(100, 200);
        assert_eq!(config.warning_threshold, 100);
        assert_eq!(config.critical_threshold, 200);
        assert!(config.enabled);

        let disabled = QueueDepthAlertConfig::disabled();
        assert!(!disabled.enabled);
    }

    #[test]
    fn test_latency_alert_config() {
        let config = LatencyAlertConfig::new(100.0, 500.0);
        assert_eq!(config.warning_threshold_ms, 100.0);
        assert_eq!(config.critical_threshold_ms, 500.0);
        assert!(config.enabled);

        let disabled = LatencyAlertConfig::disabled();
        assert!(!disabled.enabled);
    }

    #[tokio::test]
    async fn test_alert_manager_new() {
        let manager = AlertManager::new();
        let queue_config = manager.queue_depth_config().await;
        let latency_config = manager.latency_config().await;

        assert!(!queue_config.enabled);
        assert!(!latency_config.enabled);
    }

    #[tokio::test]
    async fn test_alert_manager_with_queue_depth_alerts() {
        let manager = AlertManager::with_queue_depth_alerts(50, 100);
        let config = manager.queue_depth_config().await;

        assert_eq!(config.warning_threshold, 50);
        assert_eq!(config.critical_threshold, 100);
        assert!(config.enabled);
    }

    #[tokio::test]
    async fn test_alert_manager_with_latency_alerts() {
        let manager = AlertManager::with_latency_alerts(100.0, 500.0);
        let config = manager.latency_config().await;

        assert_eq!(config.warning_threshold_ms, 100.0);
        assert_eq!(config.critical_threshold_ms, 500.0);
        assert!(config.enabled);
    }

    #[tokio::test]
    async fn test_alert_manager_set_configs() {
        let manager = AlertManager::new();

        manager
            .set_queue_depth_config(QueueDepthAlertConfig::new(10, 20))
            .await;
        let queue_config = manager.queue_depth_config().await;
        assert_eq!(queue_config.warning_threshold, 10);
        assert_eq!(queue_config.critical_threshold, 20);

        manager
            .set_latency_config(LatencyAlertConfig::new(50.0, 100.0))
            .await;
        let latency_config = manager.latency_config().await;
        assert_eq!(latency_config.warning_threshold_ms, 50.0);
        assert_eq!(latency_config.critical_threshold_ms, 100.0);
    }

    #[tokio::test]
    async fn test_check_queue_depth_no_alert() {
        let manager = AlertManager::with_queue_depth_alerts(100, 200);
        let alert_count = count_alerts(&manager).await;

        manager.check_queue_depth("query", 50).await;

        assert_eq!(alert_count.load(Ordering::SeqCst), 0);
    }

    #[tokio::test]
    async fn test_check_queue_depth_warning() {
        let manager = AlertManager::with_queue_depth_alerts(100, 200);
        let seen = record_alerts(&manager).await;

        manager.check_queue_depth("query", 150).await;

        assert_eq!(
            single_alert(&seen),
            Alert {
                level: AlertLevel::Warning,
                lane_id: "query".to_string(),
                message: "Queue depth warning: 150 (threshold: 100)".to_string(),
                current_value: 150.0,
                threshold: 100.0,
            }
        );
    }

    #[tokio::test]
    async fn test_check_queue_depth_critical() {
        let manager = AlertManager::with_queue_depth_alerts(100, 200);
        let seen = record_alerts(&manager).await;

        manager.check_queue_depth("query", 250).await;

        assert_eq!(
            single_alert(&seen),
            Alert {
                level: AlertLevel::Critical,
                lane_id: "query".to_string(),
                message: "Queue depth critical: 250 (threshold: 200)".to_string(),
                current_value: 250.0,
                threshold: 200.0,
            }
        );
    }

    #[tokio::test]
    async fn test_check_queue_depth_at_thresholds() {
        let manager = AlertManager::with_queue_depth_alerts(100, 200);
        let seen = record_alerts(&manager).await;

        // Thresholds are inclusive: reaching one is enough to raise the alert.
        manager.check_queue_depth("query", 99).await;
        manager.check_queue_depth("query", 100).await;
        manager.check_queue_depth("query", 199).await;
        manager.check_queue_depth("query", 200).await;

        assert_eq!(
            recorded_levels(&seen),
            vec![
                AlertLevel::Warning,
                AlertLevel::Warning,
                AlertLevel::Critical
            ]
        );
    }

    #[tokio::test]
    async fn test_check_queue_depth_disabled() {
        let manager = AlertManager::new();
        let alert_count = count_alerts(&manager).await;

        manager.check_queue_depth("query", 1000).await;

        assert_eq!(alert_count.load(Ordering::SeqCst), 0);
    }

    #[tokio::test]
    async fn test_check_latency_no_alert() {
        let manager = AlertManager::with_latency_alerts(100.0, 500.0);
        let alert_count = count_alerts(&manager).await;

        manager.check_latency("query", 50.0).await;

        assert_eq!(alert_count.load(Ordering::SeqCst), 0);
    }

    #[tokio::test]
    async fn test_check_latency_warning() {
        let manager = AlertManager::with_latency_alerts(100.0, 500.0);
        let seen = record_alerts(&manager).await;

        manager.check_latency("query", 250.0).await;

        assert_eq!(
            single_alert(&seen),
            Alert {
                level: AlertLevel::Warning,
                lane_id: "query".to_string(),
                message: "Latency warning: 250.00ms (threshold: 100.00ms)".to_string(),
                current_value: 250.0,
                threshold: 100.0,
            }
        );
    }

    #[tokio::test]
    async fn test_check_latency_critical() {
        let manager = AlertManager::with_latency_alerts(100.0, 500.0);
        let seen = record_alerts(&manager).await;

        manager.check_latency("query", 600.0).await;

        assert_eq!(
            single_alert(&seen),
            Alert {
                level: AlertLevel::Critical,
                lane_id: "query".to_string(),
                message: "Latency critical: 600.00ms (threshold: 500.00ms)".to_string(),
                current_value: 600.0,
                threshold: 500.0,
            }
        );
    }

    #[tokio::test]
    async fn test_check_latency_at_thresholds() {
        let manager = AlertManager::with_latency_alerts(100.0, 500.0);
        let seen = record_alerts(&manager).await;

        // Thresholds are inclusive: reaching one is enough to raise the alert.
        manager.check_latency("query", 99.9).await;
        manager.check_latency("query", 100.0).await;
        manager.check_latency("query", 500.0).await;

        assert_eq!(
            recorded_levels(&seen),
            vec![AlertLevel::Warning, AlertLevel::Critical]
        );
    }

    #[tokio::test]
    async fn test_check_latency_disabled() {
        let manager = AlertManager::new();
        let alert_count = count_alerts(&manager).await;

        manager.check_latency("query", 10000.0).await;

        assert_eq!(alert_count.load(Ordering::SeqCst), 0);
    }

    #[tokio::test]
    async fn test_multiple_callbacks() {
        let manager = AlertManager::with_queue_depth_alerts(100, 200);
        let count1 = count_alerts(&manager).await;
        let count2 = count_alerts(&manager).await;

        manager.check_queue_depth("query", 150).await;

        assert_eq!(count1.load(Ordering::SeqCst), 1);
        assert_eq!(count2.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn test_alert_level_ordering() {
        assert!(AlertLevel::Info < AlertLevel::Warning);
        assert!(AlertLevel::Warning < AlertLevel::Critical);
    }

    #[tokio::test]
    async fn test_alert_manager_default() {
        let manager = AlertManager::default();
        let queue_config = manager.queue_depth_config().await;
        assert!(!queue_config.enabled);
    }

    #[test]
    fn test_queue_depth_alert_config_default() {
        let config = QueueDepthAlertConfig::default();
        assert!(!config.enabled);
    }

    #[test]
    fn test_latency_alert_config_default() {
        let config = LatencyAlertConfig::default();
        assert!(!config.enabled);
    }
}