Skip to main content

amaters_cluster/
alert_rules.rs

//! Alert rule engine for the AmateRS cluster layer.
//!
//! Provides a [`RuleEngine`] that evaluates [`AlertRule`]s against incoming
//! [`AlertEvent`]s, assigns an [`AlertSeverity`], deduplicates firings within
//! a configurable time window, and fans out [`FiredAlert`]s to registered
//! [`AlertSink`]s.
7
8use crate::failover::AlertEvent;
9use std::collections::HashMap;
10use std::sync::{Arc, Mutex};
11use std::time::{Duration, Instant};
12
13// ── Severity ────────────────────────────────────────────────────────────────
14
/// Severity level assigned to a fired alert.
///
/// Variants are declared from least to most severe, so the derived ordering
/// satisfies `Info < Warning < Critical`.  This allows severities to be
/// compared directly (for example `severity >= AlertSeverity::Warning`) and
/// used as hash-map or set keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum AlertSeverity {
    /// Lowest severity.
    Info,
    /// Intermediate severity.
    Warning,
    /// Highest severity.
    Critical,
}
22
23// ── Rule ────────────────────────────────────────────────────────────────────
24
25/// A predicate function that inspects an [`AlertEvent`] and optionally returns
26/// the severity at which the rule should fire.  Returning `None` means the
27/// rule does not match this event.
28pub type RulePredicate = Arc<dyn Fn(&AlertEvent) -> Option<AlertSeverity> + Send + Sync>;
29
30/// A named rule with its matching predicate.
31pub struct AlertRule {
32    /// Human-readable rule identifier (used as part of the dedup key).
33    pub name: String,
34    /// Predicate that maps an event to an optional severity.
35    pub predicate: RulePredicate,
36}
37
38// ── FiredAlert ───────────────────────────────────────────────────────────────
39
40/// A fully-resolved alert that has passed rule evaluation and dedup checks.
41#[derive(Debug, Clone)]
42pub struct FiredAlert {
43    /// Name of the rule that produced this alert.
44    pub rule_name: String,
45    /// Severity assigned by the rule.
46    pub severity: AlertSeverity,
47    /// The original event that triggered the alert.
48    pub event: AlertEvent,
49    /// Stable dedup key derived from the event (see [`event_dedup_key`]).
50    pub dedup_key: String,
51}
52
53// ── Sinks ────────────────────────────────────────────────────────────────────
54
55/// Trait implemented by all alert receivers.
56pub trait AlertSink: Send + Sync {
57    fn on_alert(&self, alert: &FiredAlert);
58}
59
/// A sink that emits alerts via the `tracing` crate.
///
/// The type carries no state, so it is trivially `Copy` and `Default`.
#[derive(Debug, Clone, Copy, Default)]
pub struct LogSink;
62
63impl AlertSink for LogSink {
64    fn on_alert(&self, alert: &FiredAlert) {
65        match alert.severity {
66            AlertSeverity::Critical => {
67                tracing::error!(
68                    rule = %alert.rule_name,
69                    dedup_key = %alert.dedup_key,
70                    "CRITICAL alert fired"
71                );
72            }
73            AlertSeverity::Warning => {
74                tracing::warn!(
75                    rule = %alert.rule_name,
76                    dedup_key = %alert.dedup_key,
77                    "WARNING alert fired"
78                );
79            }
80            AlertSeverity::Info => {
81                tracing::info!(
82                    rule = %alert.rule_name,
83                    dedup_key = %alert.dedup_key,
84                    "INFO alert fired"
85                );
86            }
87        }
88    }
89}
90
91/// A sink that collects fired alerts into an in-memory vector; intended for
92/// unit tests.
93pub struct CollectingSink {
94    alerts: Mutex<Vec<FiredAlert>>,
95}
96
97impl Default for CollectingSink {
98    fn default() -> Self {
99        Self::new()
100    }
101}
102
103impl CollectingSink {
104    /// Create a new, empty collecting sink.
105    pub fn new() -> Self {
106        Self {
107            alerts: Mutex::new(Vec::new()),
108        }
109    }
110
111    /// Return a clone of all collected alerts.
112    pub fn collected(&self) -> Vec<FiredAlert> {
113        self.alerts
114            .lock()
115            .unwrap_or_else(|e| e.into_inner())
116            .clone()
117    }
118
119    /// Clear all collected alerts.
120    pub fn clear(&self) {
121        self.alerts
122            .lock()
123            .unwrap_or_else(|e| e.into_inner())
124            .clear();
125    }
126}
127
128impl AlertSink for CollectingSink {
129    fn on_alert(&self, alert: &FiredAlert) {
130        self.alerts
131            .lock()
132            .unwrap_or_else(|e| e.into_inner())
133            .push(alert.clone());
134    }
135}
136
137// ── Dedup key ────────────────────────────────────────────────────────────────
138
139/// Return a stable string that uniquely identifies the *entity* (node, cluster,
140/// …) that an event concerns.  Used as the second component of the dedup key.
141pub fn event_dedup_key(event: &AlertEvent) -> String {
142    match event {
143        AlertEvent::NodeFailed { node_id } => format!("node_failed:{node_id}"),
144        AlertEvent::NodeRecovered { node_id } => format!("node_recovered:{node_id}"),
145        AlertEvent::LeaderChanged { new_leader, .. } => format!("leader_changed:{new_leader}"),
146        AlertEvent::QuorumLost { .. } => "quorum_lost".to_owned(),
147        AlertEvent::SlowReplication { follower, .. } => format!("slow_replication:{follower}"),
148    }
149}
150
151// ── RuleEngine ───────────────────────────────────────────────────────────────
152
153/// Evaluates alert rules, deduplicates firings within a time window, and fans
154/// out [`FiredAlert`]s to registered [`AlertSink`]s.
155pub struct RuleEngine {
156    rules: Vec<AlertRule>,
157    sinks: Vec<Arc<dyn AlertSink>>,
158    dedup_window: Duration,
159    /// Maps `(rule_name, dedup_key)` → time of last firing.
160    dedup: Mutex<HashMap<(String, String), Instant>>,
161}
162
163impl RuleEngine {
164    /// Create a rule engine with the given dedup window.
165    ///
166    /// An event fired within `dedup_window` of a previous identical event
167    /// (same rule + same entity) is suppressed.
168    pub fn new_with_window(dedup_window: Duration) -> Self {
169        Self {
170            rules: Vec::new(),
171            sinks: Vec::new(),
172            dedup_window,
173            dedup: Mutex::new(HashMap::new()),
174        }
175    }
176
177    /// Register an alert rule.  Returns `&mut Self` for chaining.
178    pub fn add_rule(&mut self, rule: AlertRule) -> &mut Self {
179        self.rules.push(rule);
180        self
181    }
182
183    /// Register an alert sink.  Returns `&mut Self` for chaining.
184    pub fn add_sink(&mut self, sink: Arc<dyn AlertSink>) -> &mut Self {
185        self.sinks.push(sink);
186        self
187    }
188
189    /// Process one event against all registered rules.
190    ///
191    /// `now` is injected so that unit tests can advance time deterministically
192    /// without sleeping.
193    pub fn process_event(&self, event: &AlertEvent, now: Instant) {
194        let dedup_key = event_dedup_key(event);
195
196        for rule in &self.rules {
197            let Some(severity) = (rule.predicate)(event) else {
198                continue;
199            };
200
201            let dup_tuple = (rule.name.clone(), dedup_key.clone());
202
203            // --- dedup check ---
204            {
205                let mut guard = self.dedup.lock().unwrap_or_else(|e| e.into_inner());
206                if let Some(last) = guard.get(&dup_tuple) {
207                    if now.duration_since(*last) < self.dedup_window {
208                        continue; // within window — suppress
209                    }
210                }
211                guard.insert(dup_tuple, now);
212            } // release lock before calling sinks
213
214            let alert = FiredAlert {
215                rule_name: rule.name.clone(),
216                severity,
217                event: event.clone(),
218                dedup_key: dedup_key.clone(),
219            };
220
221            for sink in &self.sinks {
222                sink.on_alert(&alert);
223            }
224        }
225    }
226}
227
228// ── Default rule set ─────────────────────────────────────────────────────────
229
230/// Build the standard set of alert rules used by the cluster layer.
231///
232/// `slow_repl_threshold` — minimum `lag_entries` value for a
233/// [`AlertEvent::SlowReplication`] event to trigger a warning.
234pub fn default_rules(slow_repl_threshold: u64) -> Vec<AlertRule> {
235    vec![
236        AlertRule {
237            name: "node_failed".to_owned(),
238            predicate: Arc::new(|event| match event {
239                AlertEvent::NodeFailed { .. } => Some(AlertSeverity::Critical),
240                _ => None,
241            }),
242        },
243        AlertRule {
244            name: "quorum_lost".to_owned(),
245            predicate: Arc::new(|event| match event {
246                AlertEvent::QuorumLost { .. } => Some(AlertSeverity::Critical),
247                _ => None,
248            }),
249        },
250        AlertRule {
251            name: "leader_changed".to_owned(),
252            predicate: Arc::new(|event| match event {
253                AlertEvent::LeaderChanged { .. } => Some(AlertSeverity::Warning),
254                _ => None,
255            }),
256        },
257        AlertRule {
258            name: "slow_replication".to_owned(),
259            predicate: Arc::new(move |event| match event {
260                AlertEvent::SlowReplication { lag_entries, .. }
261                    if *lag_entries >= slow_repl_threshold =>
262                {
263                    Some(AlertSeverity::Warning)
264                }
265                _ => None,
266            }),
267        },
268    ]
269}
270
271// ── Tests ─────────────────────────────────────────────────────────────────────
272
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::{Duration, Instant};

    /// Dedup window shared by every engine built in these tests.
    const WINDOW: Duration = Duration::from_secs(60);

    /// Engine loaded with the default rules (slow-replication threshold 100)
    /// and wired to `sink`.
    fn build_engine(sink: Arc<CollectingSink>) -> RuleEngine {
        let mut engine = RuleEngine::new_with_window(WINDOW);
        for rule in default_rules(100) {
            engine.add_rule(rule);
        }
        engine.add_sink(sink);
        engine
    }

    /// Feed a single event to a fresh engine and return what the sink saw.
    fn fire_once(event: AlertEvent) -> Vec<FiredAlert> {
        let sink = Arc::new(CollectingSink::new());
        let engine = build_engine(Arc::clone(&sink));
        engine.process_event(&event, Instant::now());
        sink.collected()
    }

    /// Feed the same event twice, `gap` apart, and return how many alerts
    /// reached the sink.
    fn fire_twice(event: AlertEvent, gap: Duration) -> usize {
        let sink = Arc::new(CollectingSink::new());
        let engine = build_engine(Arc::clone(&sink));
        let t0 = Instant::now();
        engine.process_event(&event, t0);
        engine.process_event(&event, t0 + gap);
        sink.collected().len()
    }

    /// Assert that exactly one alert fired with the given severity and rule.
    fn assert_single_alert(alerts: &[FiredAlert], severity: AlertSeverity, rule_name: &str) {
        assert_eq!(alerts.len(), 1);
        assert_eq!(alerts[0].severity, severity);
        assert_eq!(alerts[0].rule_name, rule_name);
    }

    #[test]
    fn test_node_failed_fires_critical() {
        let alerts = fire_once(AlertEvent::NodeFailed { node_id: 1 });
        assert_single_alert(&alerts, AlertSeverity::Critical, "node_failed");
    }

    #[test]
    fn test_quorum_lost_fires_critical() {
        let alerts = fire_once(AlertEvent::QuorumLost {
            cluster_size: 5,
            reachable: 2,
        });
        assert_single_alert(&alerts, AlertSeverity::Critical, "quorum_lost");
    }

    #[test]
    fn test_leader_changed_fires_warning() {
        let alerts = fire_once(AlertEvent::LeaderChanged {
            old_leader: Some(1),
            new_leader: 2,
        });
        assert_single_alert(&alerts, AlertSeverity::Warning, "leader_changed");
    }

    #[test]
    fn test_slow_replication_fires_warning_above_threshold() {
        let alerts = fire_once(AlertEvent::SlowReplication {
            follower: 3,
            lag_entries: 200,
        });
        assert_single_alert(&alerts, AlertSeverity::Warning, "slow_replication");
    }

    #[test]
    fn test_slow_replication_silent_below_threshold() {
        // 50 entries of lag is below the 100-entry threshold.
        let alerts = fire_once(AlertEvent::SlowReplication {
            follower: 3,
            lag_entries: 50,
        });
        assert!(alerts.is_empty());
    }

    #[test]
    fn test_dedup_suppresses_repeat_within_window() {
        // One second apart is still inside the 60-second window.
        let fired = fire_twice(AlertEvent::NodeFailed { node_id: 7 }, Duration::from_secs(1));
        assert_eq!(fired, 1, "second fire should be suppressed");
    }

    #[test]
    fn test_dedup_refires_after_window() {
        // Sixty-one seconds apart is past the 60-second window.
        let fired = fire_twice(AlertEvent::NodeFailed { node_id: 8 }, Duration::from_secs(61));
        assert_eq!(fired, 2, "should refire after window expires");
    }

    #[test]
    fn test_collecting_sink_captures_alerts() {
        let sink = Arc::new(CollectingSink::new());
        let engine = build_engine(Arc::clone(&sink));
        let t0 = Instant::now();

        engine.process_event(&AlertEvent::NodeFailed { node_id: 10 }, t0);
        engine.process_event(
            &AlertEvent::QuorumLost {
                cluster_size: 3,
                reachable: 1,
            },
            t0,
        );
        assert_eq!(sink.collected().len(), 2);

        sink.clear();
        assert!(sink.collected().is_empty());
    }
}
413}