Skip to main content

rsigma_runtime/alert_pipeline/
config.rs

//! YAML schema and loader for the alert-pipeline config file.
//!
//! Loaded by the daemon at startup and again on hot-reload. Validation runs
//! when the parsed file is turned into a pipeline (see
//! [`build_alert_pipeline`]), not at compile time, and fails with an error
//! pointing at the offending field (a bad selector names the selector; a bad
//! scope reports the scope message), so the daemon refuses to start on a
//! malformed config rather than silently mismatching at runtime.
8
9use std::collections::BTreeSet;
10use std::path::{Path, PathBuf};
11use std::time::Duration;
12
13use serde::Deserialize;
14
15use crate::Scope;
16
17use super::AlertPipeline;
18use super::dedup::DedupConfig;
19use super::grouping::{Caps, GroupConfig, GroupMode, IncludeMode};
20use super::inhibit::{InhibitConfig, InhibitRule};
21use super::matcher::{MatcherError, MatcherSet, MatcherSpec};
22use super::silence::{Silence, SilenceError, SilenceOrigin, SilenceSpec};
23use crate::selector::{Selector, SelectorParseError};
24
/// Re-emit cadence applied when `repeat_interval` is omitted. Zero selects
/// pure suppression: nothing is re-emitted, only a resolved summary on expiry.
const DEFAULT_REPEAT_INTERVAL: Duration = Duration::ZERO;

/// Idle period (one hour) after which an active alert resolves when
/// `resolve_timeout` is omitted.
const DEFAULT_RESOLVE_TIMEOUT: Duration = Duration::from_secs(60 * 60);

/// Batching delay (30 seconds) before an incident's first emission when
/// `group_wait` is omitted.
const DEFAULT_GROUP_WAIT: Duration = Duration::from_secs(30);

/// Minimum delay (five minutes) before an updated incident is emitted when
/// `group_interval` is omitted.
const DEFAULT_GROUP_INTERVAL: Duration = Duration::from_secs(5 * 60);

/// Window (five minutes) during which a source alert stays active for
/// inhibition when a rule omits `duration`.
const DEFAULT_INHIBIT_DURATION: Duration = Duration::from_secs(5 * 60);

/// Cap on concurrently-active dedup alerts when `max_active_alerts` is
/// omitted. Once the store is full, further first-fires pass through
/// un-deduped instead of growing it, which bounds memory under a
/// high-cardinality fingerprint.
const DEFAULT_MAX_ACTIVE_ALERTS: usize = 100_000;

/// Cap on dynamic (API-created) silences when `max_silences` is omitted,
/// bounding the admin surface against unbounded silence creation.
pub const DEFAULT_MAX_DYNAMIC_SILENCES: usize = 1_000;
49
50/// Top-level alert-pipeline config file.
51///
52/// ```yaml
53/// strip_event: false
54/// scope:
55///   levels: [high, critical]
56/// dedup:
57///   fingerprint:
58///     - rule
59///     - match.SourceIp
60///   repeat_interval: 1h
61///   resolve_timeout: 30m
62/// ```
63#[derive(Debug, Clone, Default, Deserialize)]
64pub struct AlertPipelineFile {
65    /// Retain the event for selector resolution but drop raw event payloads
66    /// before sink delivery.
67    #[serde(default)]
68    pub strip_event: bool,
69    /// Restrict which results the layer acts on. Out-of-scope results pass
70    /// through untouched.
71    #[serde(default)]
72    pub scope: Option<ScopeConfig>,
73    /// Fingerprint deduplication. Omitted means no dedup.
74    #[serde(default)]
75    pub dedup: Option<DedupFile>,
76    /// Incident grouping. Omitted means no grouping.
77    #[serde(default)]
78    pub group: Option<GroupFile>,
79    /// Static silences seeded at load and re-seeded on hot-reload. Dynamic
80    /// silences created over the API are independent of this list.
81    #[serde(default)]
82    pub silences: Vec<SilenceSpec>,
83    /// Ceiling on concurrently-tracked dynamic (API) silences. Creation past
84    /// this many is rejected with `429`. Defaults to 1000.
85    #[serde(default)]
86    pub max_silences: Option<usize>,
87    /// Inhibition rules. An active source mutes matching targets.
88    #[serde(default)]
89    pub inhibit_rules: Vec<InhibitRuleFile>,
90}
91
92/// `inhibit_rules:` entry.
93#[derive(Debug, Clone, Default, Deserialize)]
94pub struct InhibitRuleFile {
95    /// Stable name (used as the metric label); defaults to `inhibit_rule_<i>`.
96    #[serde(default)]
97    pub name: Option<String>,
98    /// Matchers a source alert must satisfy.
99    #[serde(default)]
100    pub source_match: Vec<MatcherSpec>,
101    /// Matchers a target alert must satisfy.
102    #[serde(default)]
103    pub target_match: Vec<MatcherSpec>,
104    /// Selectors whose values must match between source and target.
105    #[serde(default)]
106    pub equal: Vec<String>,
107    /// How long a source remains active after last seen (humantime).
108    #[serde(default, with = "humantime_opt")]
109    pub duration: Option<Duration>,
110}
111
112/// `scope:` block, mirroring the enrichers config.
113#[derive(Debug, Clone, Default, Deserialize)]
114pub struct ScopeConfig {
115    /// Rule-id exact matches or rule-title globs.
116    #[serde(default)]
117    pub rules: Vec<String>,
118    /// Tag exact matches or `prefix.*` wildcards.
119    #[serde(default)]
120    pub tags: Vec<String>,
121    /// Severity levels.
122    #[serde(default)]
123    pub levels: Vec<String>,
124}
125
126/// `dedup:` block.
127#[derive(Debug, Clone, Default, Deserialize)]
128pub struct DedupFile {
129    /// Selectors hashed (with the rule identity) into the fingerprint.
130    #[serde(default)]
131    pub fingerprint: Vec<String>,
132    /// Re-emit cadence (humantime, e.g. `1h`). `0` / omitted means pure
133    /// suppression.
134    #[serde(default, with = "humantime_opt")]
135    pub repeat_interval: Option<Duration>,
136    /// Idle timeout after which an active alert resolves (humantime).
137    #[serde(default, with = "humantime_opt")]
138    pub resolve_timeout: Option<Duration>,
139    /// Ceiling on concurrently-active alerts. Past this, first-fires pass
140    /// through un-deduped (bounds memory under high-cardinality fingerprints).
141    /// Defaults to 100000.
142    #[serde(default)]
143    pub max_active_alerts: Option<usize>,
144}
145
146/// Grouping mode label.
147#[derive(Debug, Clone, Copy, Default, Deserialize)]
148#[serde(rename_all = "snake_case")]
149pub enum GroupModeLabel {
150    /// Group by equality on the `by` selectors (default).
151    #[default]
152    GroupBy,
153    /// Union-find over `entities` selector values.
154    EntityGraph,
155}
156
157/// Contributing-result include label.
158#[derive(Debug, Clone, Copy, Default, Deserialize)]
159#[serde(rename_all = "snake_case")]
160pub enum IncludeLabel {
161    /// Lightweight references only (default).
162    #[default]
163    Refs,
164    /// Full (event-stripped) contributing results.
165    Results,
166}
167
168/// `group.caps:` block.
169#[derive(Debug, Clone, Default, Deserialize)]
170pub struct CapsFile {
171    #[serde(default)]
172    pub max_open_incidents: Option<usize>,
173    #[serde(default)]
174    pub max_entities_per_incident: Option<usize>,
175    #[serde(default)]
176    pub max_results_per_incident: Option<usize>,
177    #[serde(default)]
178    pub max_value_cardinality: Option<u64>,
179}
180
181/// `group:` block.
182#[derive(Debug, Clone, Default, Deserialize)]
183pub struct GroupFile {
184    #[serde(default)]
185    pub mode: GroupModeLabel,
186    /// `group_by` mode: selectors forming the group key.
187    #[serde(default)]
188    pub by: Vec<String>,
189    /// `entity_graph` mode: selectors forming join edges.
190    #[serde(default)]
191    pub entities: Vec<String>,
192    #[serde(default, with = "humantime_opt")]
193    pub group_wait: Option<Duration>,
194    #[serde(default, with = "humantime_opt")]
195    pub group_interval: Option<Duration>,
196    #[serde(default, with = "humantime_opt")]
197    pub repeat_interval: Option<Duration>,
198    #[serde(default, with = "humantime_opt")]
199    pub resolve_timeout: Option<Duration>,
200    #[serde(default)]
201    pub include: IncludeLabel,
202    #[serde(default)]
203    pub caps: Option<CapsFile>,
204    /// `entity_graph` values that never form a join edge.
205    #[serde(default)]
206    pub stop_values: Vec<String>,
207    /// Optional NATS subject override for emitted incidents.
208    #[serde(default)]
209    pub nats_subject: Option<String>,
210}
211
212/// Errors produced while loading or validating an alert-pipeline config.
213#[derive(Debug)]
214pub enum AlertPipelineConfigError {
215    /// File could not be read.
216    Io(std::io::Error, PathBuf),
217    /// YAML failed to deserialize.
218    Yaml(yaml_serde::Error),
219    /// A fingerprint selector failed to parse.
220    Selector(SelectorParseError),
221    /// Scope construction failed.
222    Scope(String),
223    /// `dedup` was configured with an empty `fingerprint` list.
224    EmptyFingerprint,
225    /// A grouping selector failed to parse.
226    GroupSelector(SelectorParseError),
227    /// `group.mode: group_by` with an empty `by` list.
228    EmptyGroupBy,
229    /// `group.mode: entity_graph` with an empty `entities` list.
230    EmptyEntities,
231    /// A static silence failed to build.
232    Silence(SilenceError),
233    /// An inhibit-rule matcher failed to compile.
234    InhibitMatcher(MatcherError),
235    /// An inhibit-rule `equal` selector failed to parse.
236    InhibitSelector(SelectorParseError),
237    /// An inhibit rule has an empty `source_match` or `target_match`.
238    EmptyInhibitMatch,
239}
240
241impl std::fmt::Display for AlertPipelineConfigError {
242    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
243        match self {
244            AlertPipelineConfigError::Io(e, p) => {
245                write!(
246                    f,
247                    "failed to read alert-pipeline config '{}': {e}",
248                    p.display()
249                )
250            }
251            AlertPipelineConfigError::Yaml(e) => write!(f, "invalid alert-pipeline YAML: {e}"),
252            AlertPipelineConfigError::Selector(e) => write!(f, "dedup.fingerprint: {e}"),
253            AlertPipelineConfigError::Scope(message) => write!(f, "scope: {message}"),
254            AlertPipelineConfigError::EmptyFingerprint => write!(
255                f,
256                "dedup is configured but dedup.fingerprint is empty; list at least one selector"
257            ),
258            AlertPipelineConfigError::GroupSelector(e) => write!(f, "group: {e}"),
259            AlertPipelineConfigError::EmptyGroupBy => write!(
260                f,
261                "group.mode is group_by but group.by is empty; list at least one selector"
262            ),
263            AlertPipelineConfigError::EmptyEntities => write!(
264                f,
265                "group.mode is entity_graph but group.entities is empty; list at least one selector"
266            ),
267            AlertPipelineConfigError::Silence(e) => write!(f, "silences: {e}"),
268            AlertPipelineConfigError::InhibitMatcher(e) => write!(f, "inhibit_rules: {e}"),
269            AlertPipelineConfigError::InhibitSelector(e) => write!(f, "inhibit_rules.equal: {e}"),
270            AlertPipelineConfigError::EmptyInhibitMatch => write!(
271                f,
272                "an inhibit rule requires a non-empty source_match and target_match"
273            ),
274        }
275    }
276}
277
278impl std::error::Error for AlertPipelineConfigError {}
279
280/// Read and deserialize an alert-pipeline config file.
281pub fn load_alert_pipeline_file(
282    path: &Path,
283) -> Result<AlertPipelineFile, AlertPipelineConfigError> {
284    let text = std::fs::read_to_string(path)
285        .map_err(|e| AlertPipelineConfigError::Io(e, path.to_path_buf()))?;
286    yaml_serde::from_str(&text).map_err(AlertPipelineConfigError::Yaml)
287}
288
289/// Parse and validate an alert-pipeline config from a YAML string.
290///
291/// Convenience over [`load_alert_pipeline_file`] for in-memory inputs (tests
292/// and fuzzing): deserializes then runs the same validation [`build_alert_pipeline`]
293/// performs.
294pub fn parse_alert_pipeline_config(text: &str) -> Result<AlertPipeline, AlertPipelineConfigError> {
295    let file: AlertPipelineFile =
296        yaml_serde::from_str(text).map_err(AlertPipelineConfigError::Yaml)?;
297    build_alert_pipeline(file)
298}
299
300/// Validate a parsed file into a runnable [`AlertPipeline`].
301pub fn build_alert_pipeline(
302    file: AlertPipelineFile,
303) -> Result<AlertPipeline, AlertPipelineConfigError> {
304    let scope = match file.scope {
305        Some(s) => {
306            Scope::new(s.rules, s.tags, s.levels).map_err(AlertPipelineConfigError::Scope)?
307        }
308        None => Scope::default(),
309    };
310
311    let dedup = match file.dedup {
312        Some(d) => {
313            let mut fingerprint = Vec::with_capacity(d.fingerprint.len());
314            for raw in &d.fingerprint {
315                fingerprint.push(Selector::parse(raw).map_err(AlertPipelineConfigError::Selector)?);
316            }
317            if fingerprint.is_empty() {
318                return Err(AlertPipelineConfigError::EmptyFingerprint);
319            }
320            Some(DedupConfig {
321                fingerprint,
322                repeat_interval: d.repeat_interval.unwrap_or(DEFAULT_REPEAT_INTERVAL),
323                resolve_timeout: d.resolve_timeout.unwrap_or(DEFAULT_RESOLVE_TIMEOUT),
324                max_active_alerts: d.max_active_alerts.unwrap_or(DEFAULT_MAX_ACTIVE_ALERTS),
325            })
326        }
327        None => None,
328    };
329
330    let group = match file.group {
331        Some(g) => Some(build_group(g)?),
332        None => None,
333    };
334
335    let mut static_silences = Vec::with_capacity(file.silences.len());
336    for spec in file.silences {
337        static_silences.push(
338            Silence::build(spec, SilenceOrigin::Static)
339                .map_err(AlertPipelineConfigError::Silence)?,
340        );
341    }
342
343    let inhibit = if file.inhibit_rules.is_empty() {
344        None
345    } else {
346        Some(build_inhibit(file.inhibit_rules)?)
347    };
348
349    let max_silences = file.max_silences.unwrap_or(DEFAULT_MAX_DYNAMIC_SILENCES);
350
351    Ok(AlertPipeline::new(
352        scope,
353        file.strip_event,
354        dedup,
355        group,
356        static_silences,
357        inhibit,
358        max_silences,
359    ))
360}
361
362/// Validate the `inhibit_rules:` list into an [`InhibitConfig`].
363fn build_inhibit(rules: Vec<InhibitRuleFile>) -> Result<InhibitConfig, AlertPipelineConfigError> {
364    let mut out = Vec::with_capacity(rules.len());
365    for (i, rule) in rules.into_iter().enumerate() {
366        if rule.source_match.is_empty() || rule.target_match.is_empty() {
367            return Err(AlertPipelineConfigError::EmptyInhibitMatch);
368        }
369        let source_match = MatcherSet::compile(&rule.source_match)
370            .map_err(AlertPipelineConfigError::InhibitMatcher)?;
371        let target_match = MatcherSet::compile(&rule.target_match)
372            .map_err(AlertPipelineConfigError::InhibitMatcher)?;
373        let mut equal = Vec::with_capacity(rule.equal.len());
374        for raw in &rule.equal {
375            equal.push(Selector::parse(raw).map_err(AlertPipelineConfigError::InhibitSelector)?);
376        }
377        out.push(InhibitRule {
378            name: rule.name.unwrap_or_else(|| format!("inhibit_rule_{i}")),
379            source_match,
380            target_match,
381            equal,
382            duration: rule.duration.unwrap_or(DEFAULT_INHIBIT_DURATION),
383        });
384    }
385    Ok(InhibitConfig { rules: out })
386}
387
388/// Validate a `group:` block into a [`GroupConfig`].
389fn build_group(g: GroupFile) -> Result<GroupConfig, AlertPipelineConfigError> {
390    let mode = match g.mode {
391        GroupModeLabel::GroupBy => GroupMode::GroupBy,
392        GroupModeLabel::EntityGraph => GroupMode::EntityGraph,
393    };
394    let parse = |raw: &str| Selector::parse(raw).map_err(AlertPipelineConfigError::GroupSelector);
395    let by =
396        g.by.iter()
397            .map(|s| parse(s))
398            .collect::<Result<Vec<_>, _>>()?;
399    let entities = g
400        .entities
401        .iter()
402        .map(|s| parse(s))
403        .collect::<Result<Vec<_>, _>>()?;
404    match mode {
405        GroupMode::GroupBy if by.is_empty() => return Err(AlertPipelineConfigError::EmptyGroupBy),
406        GroupMode::EntityGraph if entities.is_empty() => {
407            return Err(AlertPipelineConfigError::EmptyEntities);
408        }
409        _ => {}
410    }
411    let include = match g.include {
412        IncludeLabel::Refs => IncludeMode::Refs,
413        IncludeLabel::Results => IncludeMode::Results,
414    };
415    let caps_file = g.caps.unwrap_or_default();
416    let defaults = Caps::default();
417    let caps = Caps {
418        max_open_incidents: caps_file
419            .max_open_incidents
420            .unwrap_or(defaults.max_open_incidents),
421        max_entities_per_incident: caps_file
422            .max_entities_per_incident
423            .unwrap_or(defaults.max_entities_per_incident),
424        max_results_per_incident: caps_file
425            .max_results_per_incident
426            .unwrap_or(defaults.max_results_per_incident),
427        max_value_cardinality: caps_file
428            .max_value_cardinality
429            .unwrap_or(defaults.max_value_cardinality),
430    };
431    Ok(GroupConfig {
432        mode,
433        by,
434        entities,
435        group_wait: g.group_wait.unwrap_or(DEFAULT_GROUP_WAIT),
436        group_interval: g.group_interval.unwrap_or(DEFAULT_GROUP_INTERVAL),
437        repeat_interval: g.repeat_interval.unwrap_or(DEFAULT_REPEAT_INTERVAL),
438        resolve_timeout: g.resolve_timeout.unwrap_or(DEFAULT_RESOLVE_TIMEOUT),
439        include,
440        caps,
441        stop_values: g.stop_values.into_iter().collect::<BTreeSet<_>>(),
442        nats_subject: g.nats_subject,
443    })
444}
445
446/// humantime serde adapter for `Option<Duration>`, accepting `null` / missing.
447mod humantime_opt {
448    use std::time::Duration;
449
450    use serde::{Deserialize, Deserializer};
451
452    pub fn deserialize<'de, D>(d: D) -> Result<Option<Duration>, D::Error>
453    where
454        D: Deserializer<'de>,
455    {
456        let raw: Option<String> = Option::deserialize(d)?;
457        match raw {
458            Some(s) => humantime::parse_duration(&s)
459                .map(Some)
460                .map_err(serde::de::Error::custom),
461            None => Ok(None),
462        }
463    }
464}
465
#[cfg(test)]
mod tests {
    use super::*;

    /// Deserialize a YAML fixture (which must itself be well-formed) and run
    /// it through [`build_alert_pipeline`], returning the validation outcome.
    fn build(yaml: &str) -> Result<AlertPipeline, AlertPipelineConfigError> {
        let file: AlertPipelineFile = yaml_serde::from_str(yaml).unwrap();
        build_alert_pipeline(file)
    }

    #[test]
    fn empty_file_builds() {
        build("{}").unwrap();
    }

    #[test]
    fn full_dedup_config_parses() {
        let yaml = r#"
strip_event: true
scope:
  levels: [high, critical]
dedup:
  fingerprint:
    - rule
    - match.SourceIp
  repeat_interval: 1h
  resolve_timeout: 30m
"#;
        build(yaml).unwrap();
    }

    #[test]
    fn static_silence_parses() {
        let yaml = r#"
silences:
  - matchers:
      - selector: rule
        op: "="
        value: noisy-rule
    comment: maintenance
"#;
        let pipeline = build(yaml).unwrap();
        assert_eq!(pipeline.static_silences().len(), 1);
    }

    #[test]
    fn static_silence_without_matchers_is_rejected() {
        let yaml = r#"
silences:
  - comment: bad
"#;
        assert!(build(yaml).is_err());
    }

    #[test]
    fn inhibit_rule_parses() {
        let yaml = r#"
inhibit_rules:
  - name: crit-inhibits-high
    source_match:
      - selector: level
        op: "="
        value: critical
    target_match:
      - selector: level
        op: "="
        value: high
    equal: [match.SourceIp]
    duration: 5m
"#;
        build(yaml).unwrap();
    }

    #[test]
    fn inhibit_rule_without_matchers_is_rejected() {
        let yaml = r#"
inhibit_rules:
  - equal: [match.SourceIp]
"#;
        assert!(build(yaml).is_err());
    }

    #[test]
    fn bad_selector_points_at_the_field() {
        let yaml = r#"
dedup:
  fingerprint:
    - bogus.field
"#;
        let msg = build(yaml).unwrap_err().to_string();
        assert!(msg.contains("dedup.fingerprint"), "got: {msg}");
        assert!(msg.contains("bogus.field"), "got: {msg}");
    }

    #[test]
    fn empty_fingerprint_is_rejected() {
        let yaml = r#"
dedup:
  fingerprint: []
"#;
        let err = build(yaml).unwrap_err();
        assert!(matches!(err, AlertPipelineConfigError::EmptyFingerprint));
    }

    #[test]
    fn bad_scope_glob_is_rejected() {
        let yaml = r#"
scope:
  rules:
    - "[unclosed"
dedup:
  fingerprint: [rule]
"#;
        let err = build(yaml).unwrap_err();
        assert!(matches!(err, AlertPipelineConfigError::Scope(_)));
    }

    // `mode` defaults to `group_by`, which requires a non-empty `by` list.
    #[test]
    fn group_by_without_selectors_is_rejected() {
        let yaml = r#"
group:
  by: []
"#;
        let err = build(yaml).unwrap_err();
        assert!(matches!(err, AlertPipelineConfigError::EmptyGroupBy));
    }

    #[test]
    fn entity_graph_without_entities_is_rejected() {
        let yaml = r#"
group:
  mode: entity_graph
"#;
        let err = build(yaml).unwrap_err();
        assert!(matches!(err, AlertPipelineConfigError::EmptyEntities));
    }

    // A duration that humantime cannot parse fails during deserialization,
    // so it surfaces as a YAML error rather than a validation error.
    #[test]
    fn malformed_duration_is_a_yaml_error() {
        let yaml = r#"
dedup:
  fingerprint: [rule]
  repeat_interval: not-a-duration
"#;
        let err = parse_alert_pipeline_config(yaml).unwrap_err();
        assert!(matches!(err, AlertPipelineConfigError::Yaml(_)));
    }
}
586}