Skip to main content

rsigma_runtime/alert_pipeline/
mod.rs

1//! Post-engine alert-processing layer.
2//!
3//! An optional stage in the daemon sink path, between post-evaluation
4//! enrichment and the sinks, modeled on the Alertmanager processing pipeline.
//! It currently runs four stages, in order: inhibition, silencing,
//! fingerprint deduplication (`active -> resolved` lifecycle) and incident
//! grouping (`group_by` equality or an opt-in `entity_graph` union-find).
9//!
10//! The layer is strictly post-engine: it consumes [`EvaluationResult`]s and
11//! emits [`EvaluationResult`]s plus [`IncidentResult`]s, so the evaluation hot
12//! path is untouched. The immutable, validated config ([`AlertPipeline`]) is
13//! built from a YAML file and swapped atomically on hot-reload; the mutable
14//! [`DedupStore`] and [`IncidentStore`] are owned by the sink task (the
15//! incident store behind an `RwLock` so the admin API can read open incidents).
16
17mod config;
18mod dedup;
19mod grouping;
20mod inhibit;
21mod matcher;
22mod silence;
23mod snapshot;
24mod state;
25
26pub use crate::selector::{Selector, SelectorParseError};
27pub use config::{
28    AlertPipelineConfigError, AlertPipelineFile, CapsFile, DEFAULT_MAX_DYNAMIC_SILENCES, DedupFile,
29    GroupFile, GroupModeLabel, IncludeLabel, ScopeConfig, build_alert_pipeline,
30    load_alert_pipeline_file, parse_alert_pipeline_config,
31};
32pub use dedup::DedupStore;
33pub use grouping::{GroupMode, IncidentRef, IncidentResult, IncidentStore, IncludeMode};
34pub use matcher::{MatchOp, Matcher, MatcherError, MatcherSet, MatcherSpec};
35pub use silence::{
36    Silence, SilenceError, SilenceOrigin, SilenceSpec, SilenceState, SilenceStore, SilenceView,
37};
38pub use snapshot::{AlertPipelineSnapshot, SNAPSHOT_VERSION};
39pub use state::AlertPipelineState;
40
41use rsigma_eval::{EvaluationResult, ProcessResult};
42use serde_json::Value;
43
44use crate::{MetricsHook, Scope};
45
46use dedup::DedupConfig;
47use grouping::{GroupConfig, OvermergeGuard};
48use inhibit::InhibitConfig;
49use silence::Silence as StaticSilence;
50
51/// Output of [`AlertPipeline::tick`]: dedup summary records (re-emit /
52/// resolved) and incident emissions.
53#[derive(Debug, Default)]
54pub struct TickOutput {
55    /// Dedup `repeat` / `resolved` summary lines (serialized results with
56    /// `dedup_*` keys), dispatched as raw NDJSON.
57    pub dedup_lines: Vec<Value>,
58    /// Incident emissions, dispatched via the incident path.
59    pub incidents: Vec<IncidentResult>,
60}
61
62/// A validated, runnable alert-processing pipeline.
63///
64/// Immutable after construction and cheap to clone behind an `Arc`, so it can
65/// be swapped atomically on hot-reload while the sink task keeps a live
66/// snapshot for the duration of a batch.
67#[derive(Debug)]
68pub struct AlertPipeline {
69    scope: Scope,
70    strip_event: bool,
71    dedup: Option<DedupConfig>,
72    group: Option<GroupConfig>,
73    static_silences: Vec<StaticSilence>,
74    inhibit: Option<InhibitConfig>,
75    max_silences: usize,
76}
77
78impl AlertPipeline {
79    /// Construct from validated parts. Prefer [`build_alert_pipeline`].
80    pub(crate) fn new(
81        scope: Scope,
82        strip_event: bool,
83        dedup: Option<DedupConfig>,
84        group: Option<GroupConfig>,
85        static_silences: Vec<StaticSilence>,
86        inhibit: Option<InhibitConfig>,
87        max_silences: usize,
88    ) -> Self {
89        AlertPipeline {
90            scope,
91            strip_event,
92            dedup,
93            group,
94            static_silences,
95            inhibit,
96            max_silences,
97        }
98    }
99
100    /// The static silences declared in the config, for (re-)seeding the store.
101    pub fn static_silences(&self) -> &[StaticSilence] {
102        &self.static_silences
103    }
104
105    /// Ceiling on concurrently-tracked dynamic (API) silences. The silence API
106    /// rejects creation past this many.
107    pub fn max_dynamic_silences(&self) -> usize {
108        self.max_silences
109    }
110
111    /// The configured incident include mode, if grouping is enabled.
112    pub fn incident_include(&self) -> Option<IncludeMode> {
113        self.group.as_ref().map(|g| g.include)
114    }
115
116    /// The configured incident NATS subject override, if any.
117    pub fn incident_nats_subject(&self) -> Option<&str> {
118        self.group.as_ref().and_then(|g| g.nats_subject.as_deref())
119    }
120
121    /// Process the results produced from one input event: dedup folds
122    /// duplicates into `dedup_store`, grouping assigns survivors to incidents
123    /// in `incident_store` and annotates them with `incident_id`. Out-of-scope
124    /// results pass through untouched.
125    pub fn process(
126        &self,
127        results: ProcessResult,
128        state: &mut AlertPipelineState,
129        now: i64,
130        metrics: &dyn MetricsHook,
131    ) -> ProcessResult {
132        let start = std::time::Instant::now();
133        let mut kept = Vec::with_capacity(results.len());
134
135        for mut result in results {
136            if !self.scope.matches(&result) {
137                kept.push(result);
138                continue;
139            }
140
141            // Inhibition: an inhibited target is dropped before it can become a
142            // source. `evaluate` also records non-inhibited results (including
143            // ones about to be silenced) as active sources, so a silenced
144            // source still inhibits its targets.
145            if let Some(icfg) = self.inhibit.as_ref()
146                && let Some(rule) = state.inhibit.evaluate(icfg, &result, now)
147            {
148                metrics.on_alert_pipeline_inhibited(&rule);
149                continue;
150            }
151
152            // Silencing: an active silence mutes the result before dedup, so a
153            // silenced result neither emits nor opens an incident.
154            if state.silences.active_match(&result, now).is_some() {
155                metrics.on_alert_pipeline_silenced();
156                continue;
157            }
158
159            // Dedup: fold duplicates into the active alert. When the store is at
160            // its cap, a first-fire for a new fingerprint passes through
161            // un-deduped rather than opening another alert, bounding memory; the
162            // store-entries gauge plateauing at the cap signals saturation.
163            if let Some(cfg) = self.dedup.as_ref() {
164                let fingerprint = dedup::fingerprint(&cfg.fingerprint, &result);
165                if state.dedup.contains(&fingerprint) {
166                    state.dedup.fold(&fingerprint, now);
167                    metrics.on_alert_pipeline_result("folded");
168                    continue;
169                }
170                if state.dedup.len() < cfg.max_active_alerts {
171                    let fields = dedup::resolve_fields(&cfg.fingerprint, &result);
172                    let sample = dedup::sample_of(&result);
173                    state.dedup.insert(fingerprint, now, sample, fields);
174                }
175                metrics.on_alert_pipeline_result("emitted");
176            }
177
178            // Grouping: assign the survivor to an incident, reading entity /
179            // group-by selectors off the result while the event is still
180            // present, then annotate it with the incident id.
181            if let Some(gcfg) = self.group.as_ref()
182                && let Some(id) = state.incidents.assign(gcfg, &result, now, |guard| {
183                    metrics.on_alert_pipeline_overmerge(guard_label(guard));
184                })
185            {
186                if self.strip_event {
187                    strip_event_payloads(&mut result);
188                }
189                annotate_incident(&mut result, id);
190                kept.push(result);
191                continue;
192            }
193
194            if self.strip_event {
195                strip_event_payloads(&mut result);
196            }
197            kept.push(result);
198        }
199
200        if self.dedup.is_some() {
201            metrics.set_alert_pipeline_store_entries(state.dedup.len() as i64);
202        }
203        if self.group.is_some() {
204            metrics.set_incidents_open(state.incidents.len() as i64);
205        }
206        metrics.observe_alert_pipeline_duration(start.elapsed().as_secs_f64());
207        kept
208    }
209
210    /// Advance time: emit due dedup `repeat` / `resolved` records and incident
211    /// emissions (`group_wait` / `group_interval` / `repeat` / `resolved`).
212    pub fn tick(
213        &self,
214        state: &mut AlertPipelineState,
215        now: i64,
216        metrics: &dyn MetricsHook,
217    ) -> TickOutput {
218        let start = std::time::Instant::now();
219        let mut out = TickOutput::default();
220
221        if let Some(cfg) = self.dedup.as_ref() {
222            for record in state.dedup.tick(cfg, now) {
223                metrics.on_alert_pipeline_result(record.state);
224                metrics.on_alert_pipeline_summary_emitted();
225                if record.state == "resolved" {
226                    metrics.on_alert_pipeline_eviction();
227                }
228                out.dedup_lines.push(record.json);
229            }
230            metrics.set_alert_pipeline_store_entries(state.dedup.len() as i64);
231        }
232
233        if let Some(gcfg) = self.group.as_ref() {
234            for emission in state.incidents.tick(gcfg, now) {
235                metrics.on_incident_emitted(emission.trigger);
236                out.incidents.push(emission.result);
237            }
238            metrics.set_incidents_open(state.incidents.len() as i64);
239        }
240
241        // Garbage-collect expired silences and refresh the active gauge.
242        state.silences.gc(now);
243        metrics.set_silences_active(state.silences.active_count(now) as i64);
244
245        // Garbage-collect stale inhibition sources and refresh the gauge.
246        if let Some(icfg) = self.inhibit.as_ref() {
247            state.inhibit.gc(icfg, now);
248            metrics.set_inhibit_sources_active(state.inhibit.active_count(icfg, now) as i64);
249        }
250
251        if !out.dedup_lines.is_empty() || !out.incidents.is_empty() {
252            metrics.observe_alert_pipeline_duration(start.elapsed().as_secs_f64());
253        }
254        out
255    }
256
257    /// Capture the mutable state into a versioned persistence snapshot.
258    pub fn snapshot(&self, state: &AlertPipelineState) -> AlertPipelineSnapshot {
259        AlertPipelineSnapshot {
260            version: SNAPSHOT_VERSION,
261            dedup: state.dedup.snapshot(),
262            incidents: state.incidents.export(),
263            silences: state.silences.api_snapshot(),
264            inhibit_sources: state.inhibit.snapshot(),
265        }
266    }
267
268    /// Restore a snapshot into `state`, pruning entries past their window at
269    /// `now`. Returns `false` on a version mismatch (caller starts fresh).
270    pub fn restore(
271        &self,
272        state: &mut AlertPipelineState,
273        snap: AlertPipelineSnapshot,
274        now: i64,
275    ) -> bool {
276        if snap.version != SNAPSHOT_VERSION {
277            return false;
278        }
279        // Silences are independent of the configured stages.
280        state.silences.restore_api(snap.silences, now);
281        if let Some(cfg) = self.dedup.as_ref() {
282            state
283                .dedup
284                .restore(snap.dedup, now, cfg.resolve_timeout.as_secs() as i64);
285        }
286        if let Some(g) = self.group.as_ref() {
287            state
288                .incidents
289                .restore(snap.incidents, now, g.resolve_timeout.as_secs() as i64);
290        }
291        if let Some(icfg) = self.inhibit.as_ref() {
292            state.inhibit.restore(snap.inhibit_sources, icfg, now);
293        }
294        true
295    }
296}
297
298/// Inject the reserved `incident_id` key into a result's enrichments. The layer
299/// wins on a collision with a user enricher.
300fn annotate_incident(result: &mut EvaluationResult, id: String) {
301    let map = result
302        .header
303        .enrichments
304        .get_or_insert_with(serde_json::Map::new);
305    if map.contains_key("incident_id") {
306        // Debug, not warn: an upstream enricher setting `incident_id` on every
307        // result would otherwise emit one log line per result.
308        tracing::debug!("alert pipeline: overwriting a user-set `incident_id` enrichment key");
309    }
310    map.insert("incident_id".to_string(), Value::String(id));
311}
312
313/// Metric label for an entity-graph guard hit.
314fn guard_label(guard: OvermergeGuard) -> &'static str {
315    match guard {
316        OvermergeGuard::StopValue => "stop_value",
317        OvermergeGuard::CardinalityCeiling => "cardinality_ceiling",
318    }
319}
320
321/// Remove raw event payloads from a result. Used for the long-lived dedup
322/// sample and, when `strip_event` is set, for pass-through results, so the
323/// layer can fingerprint and group on `event.*` without emitting full events.
324pub(crate) fn strip_event_payloads(result: &mut EvaluationResult) {
325    if let Some(detection) = result.as_detection_mut() {
326        detection.event = None;
327    }
328    if let Some(correlation) = result.as_correlation_mut() {
329        correlation.events = None;
330        correlation.event_refs = None;
331    }
332}
333
#[cfg(test)]
mod tests {
    use super::*;
    use crate::NoopMetrics;
    use rsigma_eval::{DetectionBody, EvaluationResult, FieldMatch, ResultBody, RuleHeader};
    use rsigma_parser::Level;
    use std::collections::HashMap;
    use std::sync::Arc;

    /// Parse `yaml` as an alert-pipeline file and build the pipeline,
    /// panicking if either step fails.
    fn pipeline(yaml: &str) -> AlertPipeline {
        let parsed: AlertPipelineFile = yaml_serde::from_str(yaml).unwrap();
        build_alert_pipeline(parsed).unwrap()
    }

    /// A detection for the fixed rule `rule-1` whose only matched field is
    /// `SourceIp = ip`, carrying a small raw event payload.
    fn detection(ip: &str, level: Level) -> EvaluationResult {
        let header = RuleHeader {
            rule_title: "Brute force".to_string(),
            rule_id: Some("rule-1".to_string()),
            level: Some(level),
            tags: vec![],
            custom_attributes: Arc::new(HashMap::new()),
            enrichments: None,
        };
        let body = ResultBody::Detection(DetectionBody {
            matched_selections: vec![],
            matched_fields: vec![FieldMatch::new("SourceIp", serde_json::json!(ip))],
            event: Some(serde_json::json!({"raw": "event"})),
        });
        EvaluationResult { header, body }
    }

    /// Feed one detection through `p.process` at time `now`.
    fn run(
        p: &AlertPipeline,
        ip: &str,
        level: Level,
        state: &mut AlertPipelineState,
        now: i64,
    ) -> ProcessResult {
        let batch = vec![detection(ip, level)];
        p.process(batch, state, now, &NoopMetrics)
    }

    /// The first fire of a fingerprint is emitted; a repeat is folded away.
    #[test]
    fn dedup_emits_first_fire_and_folds_duplicates() {
        let p = pipeline("dedup:\n  fingerprint: [match.SourceIp]\n  resolve_timeout: 1h\n");
        let mut state = AlertPipelineState::default();

        let emitted = run(&p, "10.0.0.1", Level::High, &mut state, 0);
        assert_eq!(emitted.len(), 1);

        let folded = run(&p, "10.0.0.1", Level::High, &mut state, 5);
        assert!(folded.is_empty());
    }

    /// Results outside the configured scope are neither deduped nor stored.
    #[test]
    fn out_of_scope_results_bypass_the_layer() {
        let p = pipeline("scope:\n  levels: [critical]\ndedup:\n  fingerprint: [match.SourceIp]\n");
        let mut state = AlertPipelineState::default();

        let first = run(&p, "10.0.0.1", Level::High, &mut state, 0);
        let second = run(&p, "10.0.0.1", Level::High, &mut state, 1);

        assert_eq!(first.len(), 1);
        assert_eq!(second.len(), 1);
        assert!(state.dedup.is_empty());
    }

    /// A grouped result carries `incident_id`, and the incident itself is
    /// emitted only once `group_wait` has elapsed.
    #[test]
    fn grouping_annotates_incident_id_and_opens_on_group_wait() {
        let p =
            pipeline("group:\n  by: [match.SourceIp]\n  group_wait: 30s\n  resolve_timeout: 1h\n");
        let mut state = AlertPipelineState::default();

        let kept = run(&p, "10.0.0.1", Level::High, &mut state, 0);
        assert_eq!(kept.len(), 1);

        let enrichments = kept[0].header.enrichments.as_ref().unwrap();
        let id = enrichments["incident_id"].as_str().unwrap().to_string();
        assert!(!id.is_empty());

        // No incident emission before group_wait; one open emission after.
        let early = p.tick(&mut state, 10, &NoopMetrics);
        assert!(early.incidents.is_empty());

        let due = p.tick(&mut state, 40, &NoopMetrics);
        assert_eq!(due.incidents.len(), 1);
        assert_eq!(due.incidents[0].incident_id, id);
        assert_eq!(due.incidents[0].trigger, "group_wait");
    }

    /// Dedup runs ahead of grouping, so a folded duplicate cannot open a
    /// second incident.
    #[test]
    fn dedup_then_group_compose() {
        let p = pipeline(
            "dedup:\n  fingerprint: [rule, match.SourceIp]\n  resolve_timeout: 1h\ngroup:\n  by: [match.SourceIp]\n  group_wait: 0s\n",
        );
        let mut state = AlertPipelineState::default();

        // First fire: deduped (passes) and grouped.
        let first = run(&p, "10.0.0.1", Level::High, &mut state, 0);
        assert_eq!(first.len(), 1);
        let enrichments = first[0].header.enrichments.as_ref().unwrap();
        assert!(enrichments.contains_key("incident_id"));

        // Duplicate: folded by dedup, never reaches grouping.
        let duplicate = run(&p, "10.0.0.1", Level::High, &mut state, 1);
        assert!(duplicate.is_empty());
        assert_eq!(
            state.incidents.len(),
            1,
            "the duplicate did not open a second incident"
        );
    }

    /// With `strip_event`, grouping still keys on `event.*`, but the delivered
    /// result no longer carries the event.
    #[test]
    fn strip_event_drops_payload_after_grouping() {
        let p = pipeline("strip_event: true\ngroup:\n  by: [event.raw]\n  group_wait: 0s\n");
        let mut state = AlertPipelineState::default();

        let kept = run(&p, "10.0.0.1", Level::High, &mut state, 0);
        assert_eq!(kept.len(), 1);

        // Event stripped from the delivered result, but grouping still keyed
        // on event.raw (one incident opened).
        let delivered = kept[0].as_detection().unwrap();
        assert!(delivered.event.is_none());
        assert_eq!(state.incidents.len(), 1);
    }

    /// A critical source inhibits high-level targets that share its
    /// `SourceIp`, and only those.
    #[test]
    fn inhibition_mutes_target_while_source_active() {
        let p = pipeline(
            "inhibit_rules:\n  - name: crit\n    source_match:\n      - selector: level\n        op: \"=\"\n        value: critical\n    target_match:\n      - selector: level\n        op: \"=\"\n        value: high\n    equal: [match.SourceIp]\n    duration: 5m\n",
        );
        let mut state = AlertPipelineState::default();

        // Critical source on 10.0.0.1 passes and registers as a source.
        let source = run(&p, "10.0.0.1", Level::Critical, &mut state, 0);
        assert_eq!(source.len(), 1);

        // High target on the same IP is inhibited (dropped).
        let muted = run(&p, "10.0.0.1", Level::High, &mut state, 1);
        assert!(muted.is_empty());

        // High target on a different IP passes.
        let unrelated = run(&p, "10.0.0.2", Level::High, &mut state, 2);
        assert_eq!(unrelated.len(), 1);
    }

    /// A snapshot survives a JSON round trip; restoring inside the resolve
    /// window brings state back, restoring past it prunes the stale entries.
    #[test]
    fn snapshot_round_trips_and_prunes() {
        let p = pipeline(
            "dedup:\n  fingerprint: [match.SourceIp]\n  resolve_timeout: 1h\ngroup:\n  by: [match.SourceIp]\n  group_wait: 1h\n  resolve_timeout: 1h\n",
        );
        let mut state = AlertPipelineState::default();
        let _ = run(&p, "10.0.0.1", Level::High, &mut state, 100);

        let spec = SilenceSpec {
            matchers: vec![MatcherSpec {
                selector: "rule".to_string(),
                op: MatchOp::Eq,
                value: "other".to_string(),
            }],
            ..Default::default()
        };
        state
            .silences
            .add(Silence::build(spec, SilenceOrigin::Api).unwrap());
        assert_eq!(state.dedup.len(), 1);
        assert_eq!(state.incidents.len(), 1);

        // Round-trip the snapshot through JSON.
        let round_trip = |source: &AlertPipelineState| -> AlertPipelineSnapshot {
            let json = serde_json::to_string(&p.snapshot(source)).unwrap();
            serde_json::from_str(&json).unwrap()
        };

        // Restore within the window: state comes back.
        let mut fresh = AlertPipelineState::default();
        assert!(p.restore(&mut fresh, round_trip(&state), 200));
        assert_eq!(fresh.dedup.len(), 1, "dedup alert restored");
        assert_eq!(fresh.incidents.len(), 1, "incident restored");
        assert_eq!(
            fresh.silences.api_snapshot().len(),
            1,
            "api silence restored"
        );
        // The restored dedup alert folds a duplicate.
        assert!(run(&p, "10.0.0.1", Level::High, &mut fresh, 250).is_empty());

        // Restore far past the windows: dedup + incident are pruned.
        let mut aged = AlertPipelineState::default();
        assert!(p.restore(&mut aged, round_trip(&state), 100 + 3600 + 5));
        assert!(aged.dedup.is_empty(), "stale dedup alert pruned on restore");
        assert!(
            aged.incidents.is_empty(),
            "stale incident pruned on restore"
        );
    }

    /// A static silence drops matching results before they reach dedup.
    #[test]
    fn static_silence_mutes_matching_results() {
        let p = pipeline(
            "silences:\n  - matchers:\n      - selector: match.SourceIp\n        op: \"=\"\n        value: 10.0.0.1\ndedup:\n  fingerprint: [match.SourceIp]\n",
        );
        let mut state = AlertPipelineState::default();
        state.silences.set_static(p.static_silences().to_vec());

        // 10.0.0.1 is silenced (dropped); 10.0.0.2 passes through.
        let silenced = run(&p, "10.0.0.1", Level::High, &mut state, 0);
        assert!(silenced.is_empty());
        let passed = run(&p, "10.0.0.2", Level::High, &mut state, 0);
        assert_eq!(passed.len(), 1);

        // The silenced result never entered the dedup store.
        assert_eq!(state.dedup.len(), 1);
    }
}