Skip to main content

laminar_core/budget/
monitor.rs

1//! Budget violation monitoring and alerting.
2//!
3//! Metrics intentionally use f64 for rate calculations, accepting precision loss
4//! for very large counts (>2^52). This is acceptable for monitoring purposes.
5
6#![allow(clippy::cast_precision_loss)] // Metrics don't need full u64 precision
7
use std::collections::{HashMap, VecDeque};
use std::time::{Duration, Instant};
10
11/// Tracks budget violations over time windows for alerting.
12///
13/// The monitor aggregates violations per task and checks against
14/// configurable thresholds. When a task exceeds its threshold,
15/// an alert is generated.
16///
17/// # Example
18///
19/// ```rust,ignore
20/// use laminar_core::budget::BudgetMonitor;
21/// use std::time::Duration;
22///
23/// let mut monitor = BudgetMonitor::new(
24///     Duration::from_secs(60),  // 1 minute window
25///     10.0,                      // Alert if > 10 violations/sec
26/// );
27///
28/// // Record violations
29/// monitor.record_violation("ring0_event", 0, 100);
30///
31/// // Check for alerts
32/// let alerts = monitor.check_alerts();
33/// for alert in alerts {
34///     eprintln!("ALERT: {} has {} violations/sec", alert.task, alert.violation_rate);
35/// }
36/// ```
37#[derive(Debug)]
38pub struct BudgetMonitor {
39    /// Window size for rate calculation
40    window: Duration,
41    /// Alert threshold (violations per second)
42    threshold: f64,
43    /// Per-task violation windows
44    violations: HashMap<String, ViolationWindow>,
45    /// Last check time
46    last_check: Instant,
47}
48
49impl BudgetMonitor {
50    /// Create a new budget monitor.
51    ///
52    /// # Arguments
53    ///
54    /// * `window` - Time window for rate calculation (e.g., 60 seconds)
55    /// * `threshold` - Alert threshold in violations per second
56    #[must_use]
57    pub fn new(window: Duration, threshold: f64) -> Self {
58        Self {
59            window,
60            threshold,
61            violations: HashMap::new(),
62            last_check: Instant::now(),
63        }
64    }
65
66    /// Record a budget violation.
67    ///
68    /// # Arguments
69    ///
70    /// * `task` - Task name that violated budget
71    /// * `ring` - Ring number (0, 1, or 2)
72    /// * `exceeded_by_ns` - How much the budget was exceeded in nanoseconds
73    pub fn record_violation(&mut self, task: &str, ring: u8, exceeded_by_ns: u64) {
74        let entry = self
75            .violations
76            .entry(task.to_string())
77            .or_insert_with(|| ViolationWindow::new(self.window));
78
79        entry.record(ring, exceeded_by_ns);
80    }
81
82    /// Check for alerts based on configured threshold.
83    ///
84    /// Returns a list of alerts for tasks exceeding the threshold.
85    /// Call this periodically (e.g., every second).
86    pub fn check_alerts(&mut self) -> Vec<BudgetAlert> {
87        let now = Instant::now();
88        self.last_check = now;
89
90        let mut alerts = Vec::new();
91
92        for (task, window) in &mut self.violations {
93            // Prune old violations outside the window
94            window.prune(now);
95
96            // Calculate rate based on the window duration
97            let window_secs = self.window.as_secs_f64();
98            let rate = if window_secs > 0.0 {
99                window.count as f64 / window_secs
100            } else {
101                0.0
102            };
103
104            if rate > self.threshold {
105                alerts.push(BudgetAlert {
106                    task: task.clone(),
107                    ring: window.primary_ring,
108                    violation_rate: rate,
109                    avg_exceeded_by_ns: window.avg_exceeded_ns(),
110                    total_violations: window.count,
111                    window_duration: self.window,
112                });
113            }
114        }
115
116        alerts
117    }
118
119    /// Get current violation count for a task.
120    #[must_use]
121    pub fn violation_count(&self, task: &str) -> u64 {
122        self.violations.get(task).map_or(0, |w| w.count)
123    }
124
125    /// Get the alert threshold (violations per second).
126    #[must_use]
127    pub fn threshold(&self) -> f64 {
128        self.threshold
129    }
130
131    /// Update the alert threshold.
132    pub fn set_threshold(&mut self, threshold: f64) {
133        self.threshold = threshold;
134    }
135
136    /// Get the window duration.
137    #[must_use]
138    pub fn window(&self) -> Duration {
139        self.window
140    }
141
142    /// Reset all violation tracking.
143    pub fn reset(&mut self) {
144        self.violations.clear();
145        self.last_check = Instant::now();
146    }
147
148    /// Get all current violation windows.
149    #[must_use]
150    pub fn violations(&self) -> &HashMap<String, ViolationWindow> {
151        &self.violations
152    }
153}
154
/// One violation held inside a `ViolationWindow`.
#[derive(Debug, Clone, Copy)]
struct ViolationEntry {
    /// When the violation was recorded.
    at: Instant,
    /// Ring number passed when recording (may be out of range 0..=2).
    ring: u8,
    /// Amount by which the budget was exceeded, in nanoseconds.
    exceeded_ns: u64,
}

/// Tracks violations within a time window.
///
/// Each violation is stored with its timestamp, ring and overshoot. Pruning
/// therefore removes exactly the contribution of the violations that leave
/// the window. After a prune, `count`, `total_exceeded_ns` and
/// `primary_ring` describe only the violations still inside it.
#[derive(Debug)]
pub struct ViolationWindow {
    /// In-window violations, oldest first. Entries are appended with
    /// non-decreasing `Instant`s, so expired entries are always at the front.
    entries: VecDeque<ViolationEntry>,
    /// Number of violations in `entries` (kept equal to `entries.len()`).
    count: u64,
    /// Sum of `exceeded_ns` over `entries`. Held as `u128` so summing many
    /// large `u64` values cannot overflow.
    total_exceeded_ns: u128,
    /// Window duration
    window: Duration,
    /// Ring (0..=2) with the most in-window violations
    primary_ring: u8,
    /// In-window violation counts for rings 0, 1 and 2. Violations with an
    /// out-of-range ring are counted in `count` only.
    ring_counts: [u64; 3],
}

impl ViolationWindow {
    /// Create an empty violation window covering `window`.
    fn new(window: Duration) -> Self {
        Self {
            entries: VecDeque::with_capacity(1024),
            count: 0,
            total_exceeded_ns: 0,
            window,
            primary_ring: 0,
            ring_counts: [0; 3],
        }
    }

    /// Record a violation that happened now.
    ///
    /// Violations that have already left the window are pruned first. This
    /// keeps memory bounded even if `prune` is never called from outside.
    fn record(&mut self, ring: u8, exceeded_by_ns: u64) {
        self.record_at(Instant::now(), ring, exceeded_by_ns);
    }

    /// Record a violation observed at `at`.
    ///
    /// `at` must not be earlier than any previously recorded violation:
    /// front-pruning relies on `entries` being in time order.
    fn record_at(&mut self, at: Instant, ring: u8, exceeded_by_ns: u64) {
        self.prune(at);

        self.entries.push_back(ViolationEntry {
            at,
            ring,
            exceeded_ns: exceeded_by_ns,
        });
        self.count += 1;
        self.total_exceeded_ns += u128::from(exceeded_by_ns);

        // Track ring counts; out-of-range rings only contribute to `count`.
        let idx = usize::from(ring);
        if idx < self.ring_counts.len() {
            self.ring_counts[idx] += 1;
            // Update primary ring if this one now has strictly more violations.
            if self.ring_counts[idx] > self.ring_counts[usize::from(self.primary_ring)] {
                self.primary_ring = ring;
            }
        }
    }

    /// Drop violations recorded at or before `now - window`.
    fn prune(&mut self, now: Instant) {
        // If `now - window` is not representable, the window reaches back past
        // the earliest representable instant, so nothing can have expired yet.
        let cutoff = match now.checked_sub(self.window) {
            Some(cutoff) => cutoff,
            None => return,
        };

        let mut removed_any = false;
        while let Some(oldest) = self.entries.front().copied() {
            if oldest.at > cutoff {
                break;
            }
            self.entries.pop_front();
            self.count -= 1;
            self.total_exceeded_ns -= u128::from(oldest.exceeded_ns);
            if let Some(ring_count) = self.ring_counts.get_mut(usize::from(oldest.ring)) {
                *ring_count -= 1;
            }
            removed_any = true;
        }

        if removed_any {
            self.recompute_primary_ring();
        }
    }

    /// Re-derive `primary_ring` from the in-window `ring_counts`.
    ///
    /// The current primary ring is kept on ties so the reported ring does not
    /// flap; otherwise the first ring with the largest count wins.
    fn recompute_primary_ring(&mut self) {
        let mut best = self.primary_ring;
        for (ring, &n) in (0u8..).zip(self.ring_counts.iter()) {
            if n > self.ring_counts[usize::from(best)] {
                best = ring;
            }
        }
        self.primary_ring = best;
    }

    /// Calculate violation rate (per second) for a given window duration.
    ///
    /// Returns 0.0 when `window_secs` is not positive.
    #[must_use]
    pub fn violation_rate(&self, window_secs: f64) -> f64 {
        if window_secs <= 0.0 {
            return 0.0;
        }
        self.count as f64 / window_secs
    }

    /// Calculate the average exceeded time in nanoseconds (0 when empty).
    fn avg_exceeded_ns(&self) -> u64 {
        if self.count == 0 {
            return 0;
        }
        // The mean of u64 values always fits in u64; the fallback is defensive.
        u64::try_from(self.total_exceeded_ns / u128::from(self.count)).unwrap_or(u64::MAX)
    }

    /// Get violation count.
    #[must_use]
    pub fn count(&self) -> u64 {
        self.count
    }

    /// Get total exceeded nanoseconds for in-window violations, saturating at
    /// `u64::MAX`.
    #[must_use]
    pub fn total_exceeded_ns(&self) -> u64 {
        u64::try_from(self.total_exceeded_ns).unwrap_or(u64::MAX)
    }

    /// Get primary ring (most in-window violations).
    #[must_use]
    pub fn primary_ring(&self) -> u8 {
        self.primary_ring
    }
}
257
/// Alert produced when a task's violation rate exceeds the monitor threshold.
#[derive(Debug, Clone)]
pub struct BudgetAlert {
    /// Name of the task whose violation rate crossed the threshold
    pub task: String,
    /// Ring that most of the task's violations were attributed to
    pub ring: u8,
    /// Violations per second over the monitoring window
    pub violation_rate: f64,
    /// Mean amount by which the budget was exceeded, in nanoseconds
    pub avg_exceeded_by_ns: u64,
    /// Number of violations counted in the window
    pub total_violations: u64,
    /// Length of the window the rate was computed over
    pub window_duration: Duration,
}

impl BudgetAlert {
    /// Render a one-line, human-readable description of the alert.
    ///
    /// The rate is printed with one decimal place.
    #[must_use]
    pub fn summary(&self) -> String {
        format!(
            "Task '{task}' (Ring {ring}) exceeds budget: {rate:.1} violations/sec, avg exceeded by {avg}ns",
            task = self.task,
            ring = self.ring,
            rate = self.violation_rate,
            avg = self.avg_exceeded_by_ns,
        )
    }

    /// Classify the alert by its violation rate.
    ///
    /// Thresholds are strict lower bounds:
    /// - above 100/sec: "critical"
    /// - above 50/sec: "high"
    /// - above 10/sec: "medium"
    /// - anything else (including NaN): "low"
    #[must_use]
    pub fn severity(&self) -> &'static str {
        match self.violation_rate {
            rate if rate > 100.0 => "critical",
            rate if rate > 50.0 => "high",
            rate if rate > 10.0 => "medium",
            _ => "low",
        }
    }

    /// Descriptive name of the alert's ring, or "Unknown" outside 0..=2.
    #[must_use]
    pub fn ring_name(&self) -> &'static str {
        const RING_NAMES: [&str; 3] = [
            "Ring 0 (Hot Path)",
            "Ring 1 (Background)",
            "Ring 2 (Control Plane)",
        ];
        RING_NAMES
            .get(usize::from(self.ring))
            .copied()
            .unwrap_or("Unknown")
    }
}
316
#[cfg(test)]
mod tests {
    use super::*;

    // Rates are computed as in-window count / full window length, not as
    // count / elapsed time, so none of these tests need to sleep.

    #[test]
    #[allow(clippy::float_cmp)]
    fn test_monitor_new() {
        let monitor = BudgetMonitor::new(Duration::from_secs(60), 10.0);
        assert_eq!(monitor.threshold(), 10.0);
        assert_eq!(monitor.window(), Duration::from_secs(60));
    }

    #[test]
    fn test_record_violation() {
        let mut monitor = BudgetMonitor::new(Duration::from_secs(60), 10.0);
        monitor.record_violation("test_task", 0, 100);
        monitor.record_violation("test_task", 0, 200);

        assert_eq!(monitor.violation_count("test_task"), 2);
        assert_eq!(monitor.violation_count("other_task"), 0);
    }

    #[test]
    fn test_alert_triggered() {
        let mut monitor = BudgetMonitor::new(Duration::from_secs(1), 5.0);

        // 20 violations in a 1s window => 20 violations/sec
        for _ in 0..20 {
            monitor.record_violation("test_task", 0, 100);
        }

        let alerts = monitor.check_alerts();
        assert!(!alerts.is_empty());
        assert_eq!(alerts[0].task, "test_task");
        assert!(alerts[0].violation_rate > 5.0);
    }

    #[test]
    fn test_no_alert_under_threshold() {
        let mut monitor = BudgetMonitor::new(Duration::from_secs(1), 100.0);

        // 5 violations in a 1s window => 5 violations/sec
        for _ in 0..5 {
            monitor.record_violation("test_task", 0, 100);
        }

        let alerts = monitor.check_alerts();
        assert!(alerts.is_empty());
    }

    #[test]
    #[allow(clippy::float_cmp)]
    fn test_alert_fields() {
        let mut monitor = BudgetMonitor::new(Duration::from_secs(2), 1.0);
        for ns in [100, 200, 300, 400, 500] {
            monitor.record_violation("hot", 1, ns);
        }

        let alerts = monitor.check_alerts();
        assert_eq!(alerts.len(), 1);
        let alert = &alerts[0];
        assert_eq!(alert.task, "hot");
        assert_eq!(alert.ring, 1);
        assert_eq!(alert.total_violations, 5);
        assert_eq!(alert.avg_exceeded_by_ns, 300);
        assert_eq!(alert.violation_rate, 2.5);
        assert_eq!(alert.window_duration, Duration::from_secs(2));
    }

    #[test]
    fn test_only_offending_task_alerts() {
        let mut monitor = BudgetMonitor::new(Duration::from_secs(1), 5.0);
        for _ in 0..10 {
            monitor.record_violation("noisy", 0, 10);
        }
        monitor.record_violation("quiet", 0, 10);

        let alerts = monitor.check_alerts();
        assert_eq!(alerts.len(), 1);
        assert_eq!(alerts[0].task, "noisy");
        assert_eq!(monitor.violation_count("quiet"), 1);
    }

    #[test]
    fn test_zero_window_never_alerts() {
        let mut monitor = BudgetMonitor::new(Duration::from_secs(0), 0.0);
        monitor.record_violation("task", 0, 1);
        assert!(monitor.check_alerts().is_empty());
    }

    #[test]
    fn test_alert_summary() {
        let alert = BudgetAlert {
            task: "ring0_event".to_string(),
            ring: 0,
            violation_rate: 25.5,
            avg_exceeded_by_ns: 500,
            total_violations: 100,
            window_duration: Duration::from_secs(60),
        };

        let summary = alert.summary();
        assert!(summary.contains("ring0_event"));
        assert!(summary.contains("Ring 0"));
        assert!(summary.contains("25.5"));
    }

    #[test]
    fn test_alert_severity() {
        let mut alert = BudgetAlert {
            task: "test".to_string(),
            ring: 0,
            violation_rate: 5.0,
            avg_exceeded_by_ns: 0,
            total_violations: 0,
            window_duration: Duration::from_secs(1),
        };

        assert_eq!(alert.severity(), "low");

        alert.violation_rate = 25.0;
        assert_eq!(alert.severity(), "medium");

        alert.violation_rate = 75.0;
        assert_eq!(alert.severity(), "high");

        alert.violation_rate = 150.0;
        assert_eq!(alert.severity(), "critical");
    }

    #[test]
    fn test_severity_boundaries_are_exclusive() {
        let mut alert = BudgetAlert {
            task: "test".to_string(),
            ring: 0,
            violation_rate: 10.0,
            avg_exceeded_by_ns: 0,
            total_violations: 0,
            window_duration: Duration::from_secs(1),
        };
        assert_eq!(alert.severity(), "low");

        alert.violation_rate = 50.0;
        assert_eq!(alert.severity(), "medium");

        alert.violation_rate = 100.0;
        assert_eq!(alert.severity(), "high");
    }

    #[test]
    fn test_ring_name() {
        let alert = BudgetAlert {
            task: "test".to_string(),
            ring: 0,
            violation_rate: 0.0,
            avg_exceeded_by_ns: 0,
            total_violations: 0,
            window_duration: Duration::from_secs(1),
        };
        assert_eq!(alert.ring_name(), "Ring 0 (Hot Path)");

        let alert1 = BudgetAlert {
            ring: 1,
            ..alert.clone()
        };
        assert_eq!(alert1.ring_name(), "Ring 1 (Background)");

        let alert3 = BudgetAlert {
            ring: 3,
            ..alert.clone()
        };
        assert_eq!(alert3.ring_name(), "Unknown");

        let alert2 = BudgetAlert { ring: 2, ..alert };
        assert_eq!(alert2.ring_name(), "Ring 2 (Control Plane)");
    }

    #[test]
    fn test_monitor_reset() {
        let mut monitor = BudgetMonitor::new(Duration::from_secs(60), 10.0);
        monitor.record_violation("test_task", 0, 100);
        assert_eq!(monitor.violation_count("test_task"), 1);

        monitor.reset();
        assert_eq!(monitor.violation_count("test_task"), 0);
        assert!(monitor.violations().is_empty());
    }

    #[test]
    #[allow(clippy::float_cmp)]
    fn test_set_threshold() {
        let mut monitor = BudgetMonitor::new(Duration::from_secs(60), 10.0);
        assert_eq!(monitor.threshold(), 10.0);

        monitor.set_threshold(50.0);
        assert_eq!(monitor.threshold(), 50.0);
    }

    #[test]
    fn test_primary_ring_tracking() {
        let mut monitor = BudgetMonitor::new(Duration::from_secs(60), 10.0);

        // Record more Ring 1 violations
        monitor.record_violation("test", 0, 100);
        monitor.record_violation("test", 1, 100);
        monitor.record_violation("test", 1, 100);
        monitor.record_violation("test", 1, 100);

        let window = monitor.violations().get("test").unwrap();
        assert_eq!(window.primary_ring(), 1);
    }

    #[test]
    fn test_violation_window_accessors() {
        let mut monitor = BudgetMonitor::new(Duration::from_secs(60), 10.0);
        monitor.record_violation("test", 0, 500);
        monitor.record_violation("test", 0, 300);

        let window = monitor.violations().get("test").unwrap();
        assert_eq!(window.count(), 2);
        assert_eq!(window.total_exceeded_ns(), 800);
        assert_eq!(window.primary_ring(), 0);
    }

    #[test]
    #[allow(clippy::float_cmp)]
    fn test_violation_rate_non_positive_window() {
        let mut monitor = BudgetMonitor::new(Duration::from_secs(60), 10.0);
        monitor.record_violation("test", 0, 1);

        let window = monitor.violations().get("test").unwrap();
        assert_eq!(window.violation_rate(0.0), 0.0);
        assert_eq!(window.violation_rate(-1.0), 0.0);
        assert_eq!(window.violation_rate(2.0), 0.5);
    }
}
480}