1mod config;
18mod dedup;
19mod grouping;
20mod inhibit;
21mod matcher;
22mod silence;
23mod snapshot;
24mod state;
25
26pub use crate::selector::{Selector, SelectorParseError};
27pub use config::{
28 AlertPipelineConfigError, AlertPipelineFile, CapsFile, DEFAULT_MAX_DYNAMIC_SILENCES, DedupFile,
29 GroupFile, GroupModeLabel, IncludeLabel, ScopeConfig, build_alert_pipeline,
30 load_alert_pipeline_file, parse_alert_pipeline_config,
31};
32pub use dedup::DedupStore;
33pub use grouping::{GroupMode, IncidentRef, IncidentResult, IncidentStore, IncludeMode};
34pub use matcher::{MatchOp, Matcher, MatcherError, MatcherSet, MatcherSpec};
35pub use silence::{
36 Silence, SilenceError, SilenceOrigin, SilenceSpec, SilenceState, SilenceStore, SilenceView,
37};
38pub use snapshot::{AlertPipelineSnapshot, SNAPSHOT_VERSION};
39pub use state::AlertPipelineState;
40
41use rsigma_eval::{EvaluationResult, ProcessResult};
42use serde_json::Value;
43
44use crate::{MetricsHook, Scope};
45
46use dedup::DedupConfig;
47use grouping::{GroupConfig, OvermergeGuard};
48use inhibit::InhibitConfig;
49use silence::Silence as StaticSilence;
50
51#[derive(Debug, Default)]
54pub struct TickOutput {
55 pub dedup_lines: Vec<Value>,
58 pub incidents: Vec<IncidentResult>,
60}
61
62#[derive(Debug)]
68pub struct AlertPipeline {
69 scope: Scope,
70 strip_event: bool,
71 dedup: Option<DedupConfig>,
72 group: Option<GroupConfig>,
73 static_silences: Vec<StaticSilence>,
74 inhibit: Option<InhibitConfig>,
75 max_silences: usize,
76}
77
78impl AlertPipeline {
79 pub(crate) fn new(
81 scope: Scope,
82 strip_event: bool,
83 dedup: Option<DedupConfig>,
84 group: Option<GroupConfig>,
85 static_silences: Vec<StaticSilence>,
86 inhibit: Option<InhibitConfig>,
87 max_silences: usize,
88 ) -> Self {
89 AlertPipeline {
90 scope,
91 strip_event,
92 dedup,
93 group,
94 static_silences,
95 inhibit,
96 max_silences,
97 }
98 }
99
100 pub fn static_silences(&self) -> &[StaticSilence] {
102 &self.static_silences
103 }
104
105 pub fn max_dynamic_silences(&self) -> usize {
108 self.max_silences
109 }
110
111 pub fn incident_include(&self) -> Option<IncludeMode> {
113 self.group.as_ref().map(|g| g.include)
114 }
115
116 pub fn incident_nats_subject(&self) -> Option<&str> {
118 self.group.as_ref().and_then(|g| g.nats_subject.as_deref())
119 }
120
121 pub fn process(
126 &self,
127 results: ProcessResult,
128 state: &mut AlertPipelineState,
129 now: i64,
130 metrics: &dyn MetricsHook,
131 ) -> ProcessResult {
132 let start = std::time::Instant::now();
133 let mut kept = Vec::with_capacity(results.len());
134
135 for mut result in results {
136 if !self.scope.matches(&result) {
137 kept.push(result);
138 continue;
139 }
140
141 if let Some(icfg) = self.inhibit.as_ref()
146 && let Some(rule) = state.inhibit.evaluate(icfg, &result, now)
147 {
148 metrics.on_alert_pipeline_inhibited(&rule);
149 continue;
150 }
151
152 if state.silences.active_match(&result, now).is_some() {
155 metrics.on_alert_pipeline_silenced();
156 continue;
157 }
158
159 if let Some(cfg) = self.dedup.as_ref() {
164 let fingerprint = dedup::fingerprint(&cfg.fingerprint, &result);
165 if state.dedup.contains(&fingerprint) {
166 state.dedup.fold(&fingerprint, now);
167 metrics.on_alert_pipeline_result("folded");
168 continue;
169 }
170 if state.dedup.len() < cfg.max_active_alerts {
171 let fields = dedup::resolve_fields(&cfg.fingerprint, &result);
172 let sample = dedup::sample_of(&result);
173 state.dedup.insert(fingerprint, now, sample, fields);
174 }
175 metrics.on_alert_pipeline_result("emitted");
176 }
177
178 if let Some(gcfg) = self.group.as_ref()
182 && let Some(id) = state.incidents.assign(gcfg, &result, now, |guard| {
183 metrics.on_alert_pipeline_overmerge(guard_label(guard));
184 })
185 {
186 if self.strip_event {
187 strip_event_payloads(&mut result);
188 }
189 annotate_incident(&mut result, id);
190 kept.push(result);
191 continue;
192 }
193
194 if self.strip_event {
195 strip_event_payloads(&mut result);
196 }
197 kept.push(result);
198 }
199
200 if self.dedup.is_some() {
201 metrics.set_alert_pipeline_store_entries(state.dedup.len() as i64);
202 }
203 if self.group.is_some() {
204 metrics.set_incidents_open(state.incidents.len() as i64);
205 }
206 metrics.observe_alert_pipeline_duration(start.elapsed().as_secs_f64());
207 kept
208 }
209
210 pub fn tick(
213 &self,
214 state: &mut AlertPipelineState,
215 now: i64,
216 metrics: &dyn MetricsHook,
217 ) -> TickOutput {
218 let start = std::time::Instant::now();
219 let mut out = TickOutput::default();
220
221 if let Some(cfg) = self.dedup.as_ref() {
222 for record in state.dedup.tick(cfg, now) {
223 metrics.on_alert_pipeline_result(record.state);
224 metrics.on_alert_pipeline_summary_emitted();
225 if record.state == "resolved" {
226 metrics.on_alert_pipeline_eviction();
227 }
228 out.dedup_lines.push(record.json);
229 }
230 metrics.set_alert_pipeline_store_entries(state.dedup.len() as i64);
231 }
232
233 if let Some(gcfg) = self.group.as_ref() {
234 for emission in state.incidents.tick(gcfg, now) {
235 metrics.on_incident_emitted(emission.trigger);
236 out.incidents.push(emission.result);
237 }
238 metrics.set_incidents_open(state.incidents.len() as i64);
239 }
240
241 state.silences.gc(now);
243 metrics.set_silences_active(state.silences.active_count(now) as i64);
244
245 if let Some(icfg) = self.inhibit.as_ref() {
247 state.inhibit.gc(icfg, now);
248 metrics.set_inhibit_sources_active(state.inhibit.active_count(icfg, now) as i64);
249 }
250
251 if !out.dedup_lines.is_empty() || !out.incidents.is_empty() {
252 metrics.observe_alert_pipeline_duration(start.elapsed().as_secs_f64());
253 }
254 out
255 }
256
257 pub fn snapshot(&self, state: &AlertPipelineState) -> AlertPipelineSnapshot {
259 AlertPipelineSnapshot {
260 version: SNAPSHOT_VERSION,
261 dedup: state.dedup.snapshot(),
262 incidents: state.incidents.export(),
263 silences: state.silences.api_snapshot(),
264 inhibit_sources: state.inhibit.snapshot(),
265 }
266 }
267
268 pub fn restore(
271 &self,
272 state: &mut AlertPipelineState,
273 snap: AlertPipelineSnapshot,
274 now: i64,
275 ) -> bool {
276 if snap.version != SNAPSHOT_VERSION {
277 return false;
278 }
279 state.silences.restore_api(snap.silences, now);
281 if let Some(cfg) = self.dedup.as_ref() {
282 state
283 .dedup
284 .restore(snap.dedup, now, cfg.resolve_timeout.as_secs() as i64);
285 }
286 if let Some(g) = self.group.as_ref() {
287 state
288 .incidents
289 .restore(snap.incidents, now, g.resolve_timeout.as_secs() as i64);
290 }
291 if let Some(icfg) = self.inhibit.as_ref() {
292 state.inhibit.restore(snap.inhibit_sources, icfg, now);
293 }
294 true
295 }
296}
297
298fn annotate_incident(result: &mut EvaluationResult, id: String) {
301 let map = result
302 .header
303 .enrichments
304 .get_or_insert_with(serde_json::Map::new);
305 if map.contains_key("incident_id") {
306 tracing::debug!("alert pipeline: overwriting a user-set `incident_id` enrichment key");
309 }
310 map.insert("incident_id".to_string(), Value::String(id));
311}
312
313fn guard_label(guard: OvermergeGuard) -> &'static str {
315 match guard {
316 OvermergeGuard::StopValue => "stop_value",
317 OvermergeGuard::CardinalityCeiling => "cardinality_ceiling",
318 }
319}
320
321pub(crate) fn strip_event_payloads(result: &mut EvaluationResult) {
325 if let Some(detection) = result.as_detection_mut() {
326 detection.event = None;
327 }
328 if let Some(correlation) = result.as_correlation_mut() {
329 correlation.events = None;
330 correlation.event_refs = None;
331 }
332}
333
334#[cfg(test)]
335mod tests {
336 use super::*;
337 use crate::NoopMetrics;
338 use rsigma_eval::{DetectionBody, EvaluationResult, FieldMatch, ResultBody, RuleHeader};
339 use rsigma_parser::Level;
340 use std::collections::HashMap;
341 use std::sync::Arc;
342
343 fn pipeline(yaml: &str) -> AlertPipeline {
344 let file: AlertPipelineFile = yaml_serde::from_str(yaml).unwrap();
345 build_alert_pipeline(file).unwrap()
346 }
347
348 fn detection(ip: &str, level: Level) -> EvaluationResult {
349 EvaluationResult {
350 header: RuleHeader {
351 rule_title: "Brute force".to_string(),
352 rule_id: Some("rule-1".to_string()),
353 level: Some(level),
354 tags: vec![],
355 custom_attributes: Arc::new(HashMap::new()),
356 enrichments: None,
357 },
358 body: ResultBody::Detection(DetectionBody {
359 matched_selections: vec![],
360 matched_fields: vec![FieldMatch::new("SourceIp", serde_json::json!(ip))],
361 event: Some(serde_json::json!({"raw": "event"})),
362 }),
363 }
364 }
365
366 fn run(
367 p: &AlertPipeline,
368 ip: &str,
369 level: Level,
370 state: &mut AlertPipelineState,
371 now: i64,
372 ) -> ProcessResult {
373 p.process(vec![detection(ip, level)], state, now, &NoopMetrics)
374 }
375
376 #[test]
377 fn dedup_emits_first_fire_and_folds_duplicates() {
378 let p = pipeline("dedup:\n fingerprint: [match.SourceIp]\n resolve_timeout: 1h\n");
379 let mut st = AlertPipelineState::default();
380
381 let first = run(&p, "10.0.0.1", Level::High, &mut st, 0);
382 assert_eq!(first.len(), 1);
383 let dup = run(&p, "10.0.0.1", Level::High, &mut st, 5);
384 assert!(dup.is_empty());
385 }
386
387 #[test]
388 fn out_of_scope_results_bypass_the_layer() {
389 let p = pipeline("scope:\n levels: [critical]\ndedup:\n fingerprint: [match.SourceIp]\n");
390 let mut st = AlertPipelineState::default();
391 let a = run(&p, "10.0.0.1", Level::High, &mut st, 0);
392 let b = run(&p, "10.0.0.1", Level::High, &mut st, 1);
393 assert_eq!(a.len(), 1);
394 assert_eq!(b.len(), 1);
395 assert!(st.dedup.is_empty());
396 }
397
398 #[test]
399 fn grouping_annotates_incident_id_and_opens_on_group_wait() {
400 let p =
401 pipeline("group:\n by: [match.SourceIp]\n group_wait: 30s\n resolve_timeout: 1h\n");
402 let mut st = AlertPipelineState::default();
403 let kept = run(&p, "10.0.0.1", Level::High, &mut st, 0);
404 assert_eq!(kept.len(), 1);
405 let id = kept[0].header.enrichments.as_ref().unwrap()["incident_id"]
406 .as_str()
407 .unwrap()
408 .to_string();
409 assert!(!id.is_empty());
410
411 assert!(p.tick(&mut st, 10, &NoopMetrics).incidents.is_empty());
413 let out = p.tick(&mut st, 40, &NoopMetrics);
414 assert_eq!(out.incidents.len(), 1);
415 assert_eq!(out.incidents[0].incident_id, id);
416 assert_eq!(out.incidents[0].trigger, "group_wait");
417 }
418
419 #[test]
420 fn dedup_then_group_compose() {
421 let p = pipeline(
422 "dedup:\n fingerprint: [rule, match.SourceIp]\n resolve_timeout: 1h\ngroup:\n by: [match.SourceIp]\n group_wait: 0s\n",
423 );
424 let mut st = AlertPipelineState::default();
425 let a = run(&p, "10.0.0.1", Level::High, &mut st, 0);
427 assert_eq!(a.len(), 1);
428 assert!(
429 a[0].header
430 .enrichments
431 .as_ref()
432 .unwrap()
433 .contains_key("incident_id")
434 );
435 let b = run(&p, "10.0.0.1", Level::High, &mut st, 1);
437 assert!(b.is_empty());
438 assert_eq!(
439 st.incidents.len(),
440 1,
441 "the duplicate did not open a second incident"
442 );
443 }
444
445 #[test]
446 fn strip_event_drops_payload_after_grouping() {
447 let p = pipeline("strip_event: true\ngroup:\n by: [event.raw]\n group_wait: 0s\n");
448 let mut st = AlertPipelineState::default();
449 let kept = run(&p, "10.0.0.1", Level::High, &mut st, 0);
450 assert_eq!(kept.len(), 1);
451 assert!(kept[0].as_detection().unwrap().event.is_none());
454 assert_eq!(st.incidents.len(), 1);
455 }
456
457 #[test]
458 fn inhibition_mutes_target_while_source_active() {
459 let p = pipeline(
460 "inhibit_rules:\n - name: crit\n source_match:\n - selector: level\n op: \"=\"\n value: critical\n target_match:\n - selector: level\n op: \"=\"\n value: high\n equal: [match.SourceIp]\n duration: 5m\n",
461 );
462 let mut st = AlertPipelineState::default();
463 assert_eq!(run(&p, "10.0.0.1", Level::Critical, &mut st, 0).len(), 1);
465 assert!(run(&p, "10.0.0.1", Level::High, &mut st, 1).is_empty());
467 assert_eq!(run(&p, "10.0.0.2", Level::High, &mut st, 2).len(), 1);
469 }
470
471 #[test]
472 fn snapshot_round_trips_and_prunes() {
473 let p = pipeline(
474 "dedup:\n fingerprint: [match.SourceIp]\n resolve_timeout: 1h\ngroup:\n by: [match.SourceIp]\n group_wait: 1h\n resolve_timeout: 1h\n",
475 );
476 let mut st = AlertPipelineState::default();
477 let _ = run(&p, "10.0.0.1", Level::High, &mut st, 100);
478 st.silences.add(
479 Silence::build(
480 SilenceSpec {
481 matchers: vec![MatcherSpec {
482 selector: "rule".to_string(),
483 op: MatchOp::Eq,
484 value: "other".to_string(),
485 }],
486 ..Default::default()
487 },
488 SilenceOrigin::Api,
489 )
490 .unwrap(),
491 );
492 assert_eq!(st.dedup.len(), 1);
493 assert_eq!(st.incidents.len(), 1);
494
495 let json = serde_json::to_string(&p.snapshot(&st)).unwrap();
497 let snap: AlertPipelineSnapshot = serde_json::from_str(&json).unwrap();
498
499 let mut fresh = AlertPipelineState::default();
501 assert!(p.restore(&mut fresh, snap, 200));
502 assert_eq!(fresh.dedup.len(), 1, "dedup alert restored");
503 assert_eq!(fresh.incidents.len(), 1, "incident restored");
504 assert_eq!(
505 fresh.silences.api_snapshot().len(),
506 1,
507 "api silence restored"
508 );
509 assert!(run(&p, "10.0.0.1", Level::High, &mut fresh, 250).is_empty());
511
512 let snap2: AlertPipelineSnapshot =
514 serde_json::from_str(&serde_json::to_string(&p.snapshot(&st)).unwrap()).unwrap();
515 let mut aged = AlertPipelineState::default();
516 assert!(p.restore(&mut aged, snap2, 100 + 3600 + 5));
517 assert!(aged.dedup.is_empty(), "stale dedup alert pruned on restore");
518 assert!(
519 aged.incidents.is_empty(),
520 "stale incident pruned on restore"
521 );
522 }
523
524 #[test]
525 fn static_silence_mutes_matching_results() {
526 let p = pipeline(
527 "silences:\n - matchers:\n - selector: match.SourceIp\n op: \"=\"\n value: 10.0.0.1\ndedup:\n fingerprint: [match.SourceIp]\n",
528 );
529 let mut st = AlertPipelineState::default();
530 st.silences.set_static(p.static_silences().to_vec());
531
532 assert!(run(&p, "10.0.0.1", Level::High, &mut st, 0).is_empty());
534 assert_eq!(run(&p, "10.0.0.2", Level::High, &mut st, 0).len(), 1);
535 assert_eq!(st.dedup.len(), 1);
537 }
538}