1#![allow(clippy::cast_precision_loss)] use std::collections::HashMap;
9use std::time::{Duration, Instant};
10
11#[derive(Debug)]
38pub struct BudgetMonitor {
39 window: Duration,
41 threshold: f64,
43 violations: HashMap<String, ViolationWindow>,
45 last_check: Instant,
47}
48
49impl BudgetMonitor {
50 #[must_use]
57 pub fn new(window: Duration, threshold: f64) -> Self {
58 Self {
59 window,
60 threshold,
61 violations: HashMap::new(),
62 last_check: Instant::now(),
63 }
64 }
65
66 pub fn record_violation(&mut self, task: &str, ring: u8, exceeded_by_ns: u64) {
74 let entry = self
75 .violations
76 .entry(task.to_string())
77 .or_insert_with(|| ViolationWindow::new(self.window));
78
79 entry.record(ring, exceeded_by_ns);
80 }
81
82 pub fn check_alerts(&mut self) -> Vec<BudgetAlert> {
87 let now = Instant::now();
88 self.last_check = now;
89
90 let mut alerts = Vec::new();
91
92 for (task, window) in &mut self.violations {
93 window.prune(now);
95
96 let window_secs = self.window.as_secs_f64();
98 let rate = if window_secs > 0.0 {
99 window.count as f64 / window_secs
100 } else {
101 0.0
102 };
103
104 if rate > self.threshold {
105 alerts.push(BudgetAlert {
106 task: task.clone(),
107 ring: window.primary_ring,
108 violation_rate: rate,
109 avg_exceeded_by_ns: window.avg_exceeded_ns(),
110 total_violations: window.count,
111 window_duration: self.window,
112 });
113 }
114 }
115
116 alerts
117 }
118
119 #[must_use]
121 pub fn violation_count(&self, task: &str) -> u64 {
122 self.violations.get(task).map_or(0, |w| w.count)
123 }
124
125 #[must_use]
127 pub fn threshold(&self) -> f64 {
128 self.threshold
129 }
130
131 pub fn set_threshold(&mut self, threshold: f64) {
133 self.threshold = threshold;
134 }
135
136 #[must_use]
138 pub fn window(&self) -> Duration {
139 self.window
140 }
141
142 pub fn reset(&mut self) {
144 self.violations.clear();
145 self.last_check = Instant::now();
146 }
147
148 #[must_use]
150 pub fn violations(&self) -> &HashMap<String, ViolationWindow> {
151 &self.violations
152 }
153}
154
/// Sliding-window record of the budget violations of a single task.
///
/// `count`, `total_exceeded_ns`, `ring_counts` and `primary_ring` are
/// aggregates over `samples`; `prune` recomputes them whenever samples expire
/// so they always describe exactly the samples that are still held.
#[derive(Debug)]
pub struct ViolationWindow {
    /// One `(recorded_at, ring, exceeded_by_ns)` entry per held violation, in
    /// the order the violations were recorded.
    samples: Vec<(Instant, u8, u64)>,
    /// Number of held violations.
    count: u64,
    /// Sum of `exceeded_by_ns` over the held violations (saturating).
    total_exceeded_ns: u64,
    /// Age beyond which a violation is discarded by `prune`.
    window: Duration,
    /// Ring with the most held violations among rings 0..=2.
    primary_ring: u8,
    /// Held violations per ring, indexed by ring number; rings >= 3 are not tallied.
    ring_counts: [u64; 3],
}

impl ViolationWindow {
    /// Creates an empty window that keeps violations for `window`.
    fn new(window: Duration) -> Self {
        Self {
            samples: Vec::with_capacity(1024),
            count: 0,
            total_exceeded_ns: 0,
            window,
            primary_ring: 0,
            ring_counts: [0; 3],
        }
    }

    /// Records a violation that happened now on `ring` and exceeded the
    /// budget by `exceeded_by_ns` nanoseconds.
    ///
    /// A `ring` outside 0..=2 still counts as a violation but does not take
    /// part in primary-ring tracking.
    fn record(&mut self, ring: u8, exceeded_by_ns: u64) {
        self.samples.push((Instant::now(), ring, exceeded_by_ns));
        self.count += 1;
        // Saturate instead of overflowing on pathological inputs.
        self.total_exceeded_ns = self.total_exceeded_ns.saturating_add(exceeded_by_ns);

        let slot = usize::from(ring);
        if slot < self.ring_counts.len() {
            self.ring_counts[slot] += 1;
            // Strict comparison: on a tie the current primary ring is kept.
            if self.ring_counts[slot] > self.ring_counts[usize::from(self.primary_ring)] {
                self.primary_ring = ring;
            }
        }
    }

    /// Discards every violation recorded at or before `now - window` and
    /// brings all aggregates back in line with the remaining samples.
    fn prune(&mut self, now: Instant) {
        // If `now - window` cannot be represented, the window reaches back
        // further than the clock does, so nothing can have expired yet.
        let cutoff = match now.checked_sub(self.window) {
            Some(cutoff) => cutoff,
            None => return,
        };

        let held_before = self.samples.len();
        self.samples.retain(|&(recorded_at, _, _)| recorded_at > cutoff);
        if self.samples.len() == held_before {
            return;
        }

        self.recompute_aggregates();
    }

    /// Rebuilds `count`, `total_exceeded_ns`, `ring_counts` and `primary_ring`
    /// from the samples that are currently held.
    fn recompute_aggregates(&mut self) {
        let mut total: u64 = 0;
        let mut per_ring = [0u64; 3];
        for &(_, ring, exceeded_by_ns) in &self.samples {
            total = total.saturating_add(exceeded_by_ns);
            if let Some(slot) = per_ring.get_mut(usize::from(ring)) {
                *slot += 1;
            }
        }

        // `usize` is never wider than 64 bits, so this cast is lossless.
        self.count = self.samples.len() as u64;
        self.total_exceeded_ns = total;
        self.ring_counts = per_ring;

        // Same tie-breaking as `record`: the current primary ring is only
        // replaced by a ring that now has strictly more violations.
        let mut leader = self.primary_ring;
        for ring in 0..3u8 {
            if self.ring_counts[usize::from(ring)] > self.ring_counts[usize::from(leader)] {
                leader = ring;
            }
        }
        self.primary_ring = leader;
    }

    /// Returns held violations divided by `window_secs`, or 0.0 when
    /// `window_secs` is not positive.
    #[must_use]
    pub fn violation_rate(&self, window_secs: f64) -> f64 {
        if window_secs <= 0.0 {
            return 0.0;
        }
        self.count as f64 / window_secs
    }

    /// Returns the mean overshoot in nanoseconds (integer division), or 0
    /// when no violations are held.
    fn avg_exceeded_ns(&self) -> u64 {
        self.total_exceeded_ns.checked_div(self.count).unwrap_or(0)
    }

    /// Returns the number of held violations.
    #[must_use]
    pub fn count(&self) -> u64 {
        self.count
    }

    /// Returns the summed overshoot, in nanoseconds, of the held violations.
    #[must_use]
    pub fn total_exceeded_ns(&self) -> u64 {
        self.total_exceeded_ns
    }

    /// Returns the ring with the most held violations among rings 0..=2.
    #[must_use]
    pub fn primary_ring(&self) -> u8 {
        self.primary_ring
    }
}
257
/// Snapshot describing one task whose violation rate crossed the alert threshold.
#[derive(Debug, Clone)]
pub struct BudgetAlert {
    /// Name of the offending task.
    pub task: String,
    /// Ring the alert is attributed to (see [`BudgetAlert::ring_name`]).
    pub ring: u8,
    /// Violations per second measured over `window_duration`.
    pub violation_rate: f64,
    /// Mean overshoot per violation, in nanoseconds.
    pub avg_exceeded_by_ns: u64,
    /// Number of violations counted in the window.
    pub total_violations: u64,
    /// Length of the window the figures above were measured over.
    pub window_duration: Duration,
}

impl BudgetAlert {
    /// Renders a one-line, human-readable description of the alert, with the
    /// rate shown to one decimal place.
    #[must_use]
    pub fn summary(&self) -> String {
        format!(
            "Task '{}' (Ring {}) exceeds budget: {:.1} violations/sec, avg exceeded by {}ns",
            self.task,
            self.ring,
            self.violation_rate,
            self.avg_exceeded_by_ns,
        )
    }

    /// Classifies the alert by violation rate.
    ///
    /// Rates above 100 are `"critical"`, above 50 `"high"`, above 10
    /// `"medium"`; everything else (including NaN) is `"low"`. All bounds are
    /// exclusive.
    #[must_use]
    pub fn severity(&self) -> &'static str {
        match self.violation_rate {
            rate if rate > 100.0 => "critical",
            rate if rate > 50.0 => "high",
            rate if rate > 10.0 => "medium",
            _ => "low",
        }
    }

    /// Returns the display name of the alert's ring, or `"Unknown"` for any
    /// ring other than 0, 1 or 2.
    #[must_use]
    pub fn ring_name(&self) -> &'static str {
        // Indexed by ring number.
        const RING_NAMES: [&str; 3] = [
            "Ring 0 (Hot Path)",
            "Ring 1 (Background)",
            "Ring 2 (Control Plane)",
        ];

        RING_NAMES
            .get(usize::from(self.ring))
            .copied()
            .unwrap_or("Unknown")
    }
}
316
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an alert for `ring` / `violation_rate` with neutral filler
    /// values for the fields the test at hand does not care about.
    fn alert_with(ring: u8, violation_rate: f64) -> BudgetAlert {
        BudgetAlert {
            task: "test".to_string(),
            ring,
            violation_rate,
            avg_exceeded_by_ns: 0,
            total_violations: 0,
            window_duration: Duration::from_secs(1),
        }
    }

    #[test]
    #[allow(clippy::float_cmp)]
    fn test_monitor_new() {
        let monitor = BudgetMonitor::new(Duration::from_secs(60), 10.0);
        assert_eq!(monitor.threshold(), 10.0);
        assert_eq!(monitor.window(), Duration::from_secs(60));
        assert!(monitor.violations().is_empty());
    }

    #[test]
    fn test_record_violation() {
        let mut monitor = BudgetMonitor::new(Duration::from_secs(60), 10.0);
        monitor.record_violation("test_task", 0, 100);
        monitor.record_violation("test_task", 0, 200);

        assert_eq!(monitor.violation_count("test_task"), 2);
        assert_eq!(monitor.violation_count("other_task"), 0);
    }

    #[test]
    fn test_alert_triggered() {
        let mut monitor = BudgetMonitor::new(Duration::from_secs(1), 5.0);

        // 20 violations inside a 1 s window is 20/s, well above the 5/s
        // threshold. No sleep is needed: the rate depends only on the count
        // and the configured window length, not on elapsed wall time.
        for _ in 0..20 {
            monitor.record_violation("test_task", 0, 100);
        }

        let alerts = monitor.check_alerts();
        assert_eq!(alerts.len(), 1);

        let alert = &alerts[0];
        assert_eq!(alert.task, "test_task");
        assert!(alert.violation_rate > 5.0);
        assert_eq!(alert.ring, 0);
        assert_eq!(alert.total_violations, 20);
        assert_eq!(alert.avg_exceeded_by_ns, 100);
        assert_eq!(alert.window_duration, Duration::from_secs(1));
    }

    #[test]
    fn test_no_alert_under_threshold() {
        let mut monitor = BudgetMonitor::new(Duration::from_secs(1), 100.0);

        // 5/s stays below the 100/s threshold.
        for _ in 0..5 {
            monitor.record_violation("test_task", 0, 100);
        }

        assert!(monitor.check_alerts().is_empty());
    }

    #[test]
    fn test_alert_summary() {
        let alert = BudgetAlert {
            task: "ring0_event".to_string(),
            ring: 0,
            violation_rate: 25.5,
            avg_exceeded_by_ns: 500,
            total_violations: 100,
            window_duration: Duration::from_secs(60),
        };

        let summary = alert.summary();
        assert!(summary.contains("ring0_event"));
        assert!(summary.contains("Ring 0"));
        assert!(summary.contains("25.5"));
        assert!(summary.contains("500ns"));
    }

    #[test]
    fn test_alert_severity() {
        let mut alert = alert_with(0, 5.0);
        assert_eq!(alert.severity(), "low");

        alert.violation_rate = 25.0;
        assert_eq!(alert.severity(), "medium");

        alert.violation_rate = 75.0;
        assert_eq!(alert.severity(), "high");

        alert.violation_rate = 150.0;
        assert_eq!(alert.severity(), "critical");

        // Every bound is exclusive: a rate exactly on a bound stays in the
        // lower class.
        alert.violation_rate = 10.0;
        assert_eq!(alert.severity(), "low");
        alert.violation_rate = 50.0;
        assert_eq!(alert.severity(), "medium");
        alert.violation_rate = 100.0;
        assert_eq!(alert.severity(), "high");
    }

    #[test]
    fn test_ring_name() {
        assert_eq!(alert_with(0, 0.0).ring_name(), "Ring 0 (Hot Path)");
        assert_eq!(alert_with(1, 0.0).ring_name(), "Ring 1 (Background)");
        assert_eq!(alert_with(2, 0.0).ring_name(), "Ring 2 (Control Plane)");
        assert_eq!(alert_with(3, 0.0).ring_name(), "Unknown");
    }

    #[test]
    fn test_monitor_reset() {
        let mut monitor = BudgetMonitor::new(Duration::from_secs(60), 10.0);
        monitor.record_violation("test_task", 0, 100);
        assert_eq!(monitor.violation_count("test_task"), 1);

        monitor.reset();
        assert_eq!(monitor.violation_count("test_task"), 0);
        assert!(monitor.violations().is_empty());
    }

    #[test]
    #[allow(clippy::float_cmp)]
    fn test_set_threshold() {
        let mut monitor = BudgetMonitor::new(Duration::from_secs(60), 10.0);
        assert_eq!(monitor.threshold(), 10.0);

        monitor.set_threshold(50.0);
        assert_eq!(monitor.threshold(), 50.0);
    }

    #[test]
    fn test_primary_ring_tracking() {
        let mut monitor = BudgetMonitor::new(Duration::from_secs(60), 10.0);

        // One violation on ring 0, three on ring 1: ring 1 must win.
        monitor.record_violation("test", 0, 100);
        monitor.record_violation("test", 1, 100);
        monitor.record_violation("test", 1, 100);
        monitor.record_violation("test", 1, 100);

        let window = monitor.violations().get("test").unwrap();
        assert_eq!(window.primary_ring(), 1);
    }

    #[test]
    fn test_unknown_ring_is_counted_but_not_primary() {
        let mut monitor = BudgetMonitor::new(Duration::from_secs(60), 10.0);
        monitor.record_violation("test", 7, 50);

        let window = monitor.violations().get("test").unwrap();
        assert_eq!(window.count(), 1);
        assert_eq!(window.total_exceeded_ns(), 50);
        assert_eq!(window.primary_ring(), 0);
    }

    #[test]
    fn test_violation_window_accessors() {
        let mut monitor = BudgetMonitor::new(Duration::from_secs(60), 10.0);
        monitor.record_violation("test", 0, 500);
        monitor.record_violation("test", 0, 300);

        let window = monitor.violations().get("test").unwrap();
        assert_eq!(window.count(), 2);
        assert_eq!(window.total_exceeded_ns(), 800);
        assert_eq!(window.primary_ring(), 0);
    }
}