1use crate::failover::AlertEvent;
9use std::collections::HashMap;
10use std::sync::{Arc, Mutex};
11use std::time::{Duration, Instant};
12
/// Severity attached to a fired alert.
///
/// Variants are declared from least to most urgent, so the derived `Ord`
/// lets callers filter with comparisons such as
/// `severity >= AlertSeverity::Warning`. The enum is fieldless, so it is
/// `Copy` and can be used as a hash key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum AlertSeverity {
    /// Lowest severity.
    Info,
    /// Middle severity.
    Warning,
    /// Highest severity.
    Critical,
}
22
23pub type RulePredicate = Arc<dyn Fn(&AlertEvent) -> Option<AlertSeverity> + Send + Sync>;
29
30pub struct AlertRule {
32 pub name: String,
34 pub predicate: RulePredicate,
36}
37
38#[derive(Debug, Clone)]
42pub struct FiredAlert {
43 pub rule_name: String,
45 pub severity: AlertSeverity,
47 pub event: AlertEvent,
49 pub dedup_key: String,
51}
52
53pub trait AlertSink: Send + Sync {
57 fn on_alert(&self, alert: &FiredAlert);
58}
59
60pub struct LogSink;
62
63impl AlertSink for LogSink {
64 fn on_alert(&self, alert: &FiredAlert) {
65 match alert.severity {
66 AlertSeverity::Critical => {
67 tracing::error!(
68 rule = %alert.rule_name,
69 dedup_key = %alert.dedup_key,
70 "CRITICAL alert fired"
71 );
72 }
73 AlertSeverity::Warning => {
74 tracing::warn!(
75 rule = %alert.rule_name,
76 dedup_key = %alert.dedup_key,
77 "WARNING alert fired"
78 );
79 }
80 AlertSeverity::Info => {
81 tracing::info!(
82 rule = %alert.rule_name,
83 dedup_key = %alert.dedup_key,
84 "INFO alert fired"
85 );
86 }
87 }
88 }
89}
90
91pub struct CollectingSink {
94 alerts: Mutex<Vec<FiredAlert>>,
95}
96
97impl Default for CollectingSink {
98 fn default() -> Self {
99 Self::new()
100 }
101}
102
103impl CollectingSink {
104 pub fn new() -> Self {
106 Self {
107 alerts: Mutex::new(Vec::new()),
108 }
109 }
110
111 pub fn collected(&self) -> Vec<FiredAlert> {
113 self.alerts
114 .lock()
115 .unwrap_or_else(|e| e.into_inner())
116 .clone()
117 }
118
119 pub fn clear(&self) {
121 self.alerts
122 .lock()
123 .unwrap_or_else(|e| e.into_inner())
124 .clear();
125 }
126}
127
128impl AlertSink for CollectingSink {
129 fn on_alert(&self, alert: &FiredAlert) {
130 self.alerts
131 .lock()
132 .unwrap_or_else(|e| e.into_inner())
133 .push(alert.clone());
134 }
135}
136
137pub fn event_dedup_key(event: &AlertEvent) -> String {
142 match event {
143 AlertEvent::NodeFailed { node_id } => format!("node_failed:{node_id}"),
144 AlertEvent::NodeRecovered { node_id } => format!("node_recovered:{node_id}"),
145 AlertEvent::LeaderChanged { new_leader, .. } => format!("leader_changed:{new_leader}"),
146 AlertEvent::QuorumLost { .. } => "quorum_lost".to_owned(),
147 AlertEvent::SlowReplication { follower, .. } => format!("slow_replication:{follower}"),
148 }
149}
150
151pub struct RuleEngine {
156 rules: Vec<AlertRule>,
157 sinks: Vec<Arc<dyn AlertSink>>,
158 dedup_window: Duration,
159 dedup: Mutex<HashMap<(String, String), Instant>>,
161}
162
163impl RuleEngine {
164 pub fn new_with_window(dedup_window: Duration) -> Self {
169 Self {
170 rules: Vec::new(),
171 sinks: Vec::new(),
172 dedup_window,
173 dedup: Mutex::new(HashMap::new()),
174 }
175 }
176
177 pub fn add_rule(&mut self, rule: AlertRule) -> &mut Self {
179 self.rules.push(rule);
180 self
181 }
182
183 pub fn add_sink(&mut self, sink: Arc<dyn AlertSink>) -> &mut Self {
185 self.sinks.push(sink);
186 self
187 }
188
189 pub fn process_event(&self, event: &AlertEvent, now: Instant) {
194 let dedup_key = event_dedup_key(event);
195
196 for rule in &self.rules {
197 let Some(severity) = (rule.predicate)(event) else {
198 continue;
199 };
200
201 let dup_tuple = (rule.name.clone(), dedup_key.clone());
202
203 {
205 let mut guard = self.dedup.lock().unwrap_or_else(|e| e.into_inner());
206 if let Some(last) = guard.get(&dup_tuple) {
207 if now.duration_since(*last) < self.dedup_window {
208 continue; }
210 }
211 guard.insert(dup_tuple, now);
212 } let alert = FiredAlert {
215 rule_name: rule.name.clone(),
216 severity,
217 event: event.clone(),
218 dedup_key: dedup_key.clone(),
219 };
220
221 for sink in &self.sinks {
222 sink.on_alert(&alert);
223 }
224 }
225 }
226}
227
228pub fn default_rules(slow_repl_threshold: u64) -> Vec<AlertRule> {
235 vec![
236 AlertRule {
237 name: "node_failed".to_owned(),
238 predicate: Arc::new(|event| match event {
239 AlertEvent::NodeFailed { .. } => Some(AlertSeverity::Critical),
240 _ => None,
241 }),
242 },
243 AlertRule {
244 name: "quorum_lost".to_owned(),
245 predicate: Arc::new(|event| match event {
246 AlertEvent::QuorumLost { .. } => Some(AlertSeverity::Critical),
247 _ => None,
248 }),
249 },
250 AlertRule {
251 name: "leader_changed".to_owned(),
252 predicate: Arc::new(|event| match event {
253 AlertEvent::LeaderChanged { .. } => Some(AlertSeverity::Warning),
254 _ => None,
255 }),
256 },
257 AlertRule {
258 name: "slow_replication".to_owned(),
259 predicate: Arc::new(move |event| match event {
260 AlertEvent::SlowReplication { lag_entries, .. }
261 if *lag_entries >= slow_repl_threshold =>
262 {
263 Some(AlertSeverity::Warning)
264 }
265 _ => None,
266 }),
267 },
268 ]
269}
270
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::{Duration, Instant};

    /// Builds an engine with the default rules (slow-replication threshold
    /// 100), a 60 s dedup window, and `sink` registered.
    fn build_engine(sink: Arc<CollectingSink>) -> RuleEngine {
        let mut engine = RuleEngine::new_with_window(Duration::from_secs(60));
        for rule in default_rules(100) {
            engine.add_rule(rule);
        }
        engine.add_sink(sink);
        engine
    }

    #[test]
    fn test_node_failed_fires_critical() {
        let sink = Arc::new(CollectingSink::new());
        let engine = build_engine(Arc::clone(&sink));
        engine.process_event(&AlertEvent::NodeFailed { node_id: 1 }, Instant::now());
        let alerts = sink.collected();
        assert_eq!(alerts.len(), 1);
        assert_eq!(alerts[0].severity, AlertSeverity::Critical);
        assert_eq!(alerts[0].rule_name, "node_failed");
    }

    #[test]
    fn test_quorum_lost_fires_critical() {
        let sink = Arc::new(CollectingSink::new());
        let engine = build_engine(Arc::clone(&sink));
        engine.process_event(
            &AlertEvent::QuorumLost {
                cluster_size: 5,
                reachable: 2,
            },
            Instant::now(),
        );
        let alerts = sink.collected();
        assert_eq!(alerts.len(), 1);
        assert_eq!(alerts[0].severity, AlertSeverity::Critical);
        assert_eq!(alerts[0].rule_name, "quorum_lost");
    }

    #[test]
    fn test_leader_changed_fires_warning() {
        let sink = Arc::new(CollectingSink::new());
        let engine = build_engine(Arc::clone(&sink));
        engine.process_event(
            &AlertEvent::LeaderChanged {
                old_leader: Some(1),
                new_leader: 2,
            },
            Instant::now(),
        );
        let alerts = sink.collected();
        assert_eq!(alerts.len(), 1);
        assert_eq!(alerts[0].severity, AlertSeverity::Warning);
        assert_eq!(alerts[0].rule_name, "leader_changed");
    }

    #[test]
    fn test_slow_replication_fires_warning_above_threshold() {
        let sink = Arc::new(CollectingSink::new());
        let engine = build_engine(Arc::clone(&sink));
        engine.process_event(
            &AlertEvent::SlowReplication {
                follower: 3,
                lag_entries: 200,
            },
            Instant::now(),
        );
        let alerts = sink.collected();
        assert_eq!(alerts.len(), 1);
        assert_eq!(alerts[0].severity, AlertSeverity::Warning);
        assert_eq!(alerts[0].rule_name, "slow_replication");
    }

    #[test]
    fn test_slow_replication_silent_below_threshold() {
        let sink = Arc::new(CollectingSink::new());
        let engine = build_engine(Arc::clone(&sink));
        engine.process_event(
            &AlertEvent::SlowReplication {
                follower: 3,
                lag_entries: 50,
            },
            Instant::now(),
        );
        assert!(sink.collected().is_empty());
    }

    #[test]
    fn test_slow_replication_fires_at_exact_threshold() {
        let sink = Arc::new(CollectingSink::new());
        let engine = build_engine(Arc::clone(&sink));
        engine.process_event(
            &AlertEvent::SlowReplication {
                follower: 3,
                lag_entries: 100,
            },
            Instant::now(),
        );
        assert_eq!(sink.collected().len(), 1, "threshold is inclusive");
    }

    #[test]
    fn test_dedup_suppresses_repeat_within_window() {
        let sink = Arc::new(CollectingSink::new());
        let engine = build_engine(Arc::clone(&sink));
        let t0 = Instant::now();
        let event = AlertEvent::NodeFailed { node_id: 7 };
        engine.process_event(&event, t0);
        engine.process_event(&event, t0 + Duration::from_secs(1));
        assert_eq!(
            sink.collected().len(),
            1,
            "second fire should be suppressed"
        );
    }

    #[test]
    fn test_dedup_refires_after_window() {
        let sink = Arc::new(CollectingSink::new());
        let engine = build_engine(Arc::clone(&sink));
        let t0 = Instant::now();
        let event = AlertEvent::NodeFailed { node_id: 8 };
        engine.process_event(&event, t0);
        engine.process_event(&event, t0 + Duration::from_secs(61));
        assert_eq!(
            sink.collected().len(),
            2,
            "should refire after window expires"
        );
    }

    #[test]
    fn test_dedup_refires_exactly_at_window_edge() {
        let sink = Arc::new(CollectingSink::new());
        let engine = build_engine(Arc::clone(&sink));
        let t0 = Instant::now();
        let event = AlertEvent::NodeFailed { node_id: 9 };
        engine.process_event(&event, t0);
        engine.process_event(&event, t0 + Duration::from_secs(60));
        assert_eq!(
            sink.collected().len(),
            2,
            "window is half-open: a repeat exactly one window later fires"
        );
    }

    #[test]
    fn test_suppressed_repeat_does_not_extend_window() {
        let sink = Arc::new(CollectingSink::new());
        let engine = build_engine(Arc::clone(&sink));
        let t0 = Instant::now();
        let event = AlertEvent::NodeFailed { node_id: 12 };
        engine.process_event(&event, t0);
        engine.process_event(&event, t0 + Duration::from_secs(30));
        engine.process_event(&event, t0 + Duration::from_secs(61));
        assert_eq!(
            sink.collected().len(),
            2,
            "window is measured from the last delivered alert"
        );
    }

    #[test]
    fn test_dedup_is_per_node() {
        let sink = Arc::new(CollectingSink::new());
        let engine = build_engine(Arc::clone(&sink));
        let t0 = Instant::now();
        engine.process_event(&AlertEvent::NodeFailed { node_id: 1 }, t0);
        engine.process_event(&AlertEvent::NodeFailed { node_id: 2 }, t0);
        let keys: Vec<String> = sink.collected().into_iter().map(|a| a.dedup_key).collect();
        assert_eq!(keys, vec!["node_failed:1", "node_failed:2"]);
    }

    #[test]
    fn test_dedup_is_per_rule() {
        let sink = Arc::new(CollectingSink::new());
        let mut engine = build_engine(Arc::clone(&sink));
        engine.add_rule(AlertRule {
            name: "any_node_failure".to_owned(),
            predicate: Arc::new(|event| match event {
                AlertEvent::NodeFailed { .. } => Some(AlertSeverity::Info),
                _ => None,
            }),
        });
        let t0 = Instant::now();
        let event = AlertEvent::NodeFailed { node_id: 5 };
        engine.process_event(&event, t0);
        engine.process_event(&event, t0 + Duration::from_secs(1));
        let names: Vec<String> = sink.collected().into_iter().map(|a| a.rule_name).collect();
        assert_eq!(names, vec!["node_failed", "any_node_failure"]);
    }

    #[test]
    fn test_every_sink_receives_alert() {
        let first = Arc::new(CollectingSink::new());
        let second = Arc::new(CollectingSink::new());
        let mut engine = build_engine(Arc::clone(&first));
        engine.add_sink(Arc::clone(&second));
        engine.process_event(&AlertEvent::NodeFailed { node_id: 11 }, Instant::now());
        assert_eq!(first.collected().len(), 1);
        assert_eq!(second.collected().len(), 1);
    }

    #[test]
    fn test_event_dedup_key_format() {
        assert_eq!(
            event_dedup_key(&AlertEvent::NodeFailed { node_id: 4 }),
            "node_failed:4"
        );
        assert_eq!(
            event_dedup_key(&AlertEvent::LeaderChanged {
                old_leader: None,
                new_leader: 5,
            }),
            "leader_changed:5"
        );
        assert_eq!(
            event_dedup_key(&AlertEvent::QuorumLost {
                cluster_size: 5,
                reachable: 2,
            }),
            "quorum_lost"
        );
        assert_eq!(
            event_dedup_key(&AlertEvent::SlowReplication {
                follower: 2,
                lag_entries: 0,
            }),
            "slow_replication:2"
        );
    }

    #[test]
    fn test_collecting_sink_captures_alerts() {
        let sink = Arc::new(CollectingSink::new());
        let engine = build_engine(Arc::clone(&sink));
        let t0 = Instant::now();
        engine.process_event(&AlertEvent::NodeFailed { node_id: 10 }, t0);
        engine.process_event(
            &AlertEvent::QuorumLost {
                cluster_size: 3,
                reachable: 1,
            },
            t0,
        );
        let alerts = sink.collected();
        assert_eq!(alerts.len(), 2);
        sink.clear();
        assert!(sink.collected().is_empty());
    }
}