Skip to main content

rsigma_runtime/alert_pipeline/
silence.rs

1//! Operator silences, modeled on Alertmanager.
2//!
3//! A silence mutes results matching a [`MatcherSet`] for a time window. The
4//! first fire matching an active silence is acked and dropped before dedup, so
5//! a silenced result neither emits nor opens an incident.
6//!
7//! Silences come from two origins: `static` ones declared in the
8//! `--alert-pipeline` config (replaced on hot-reload) and `api` ones created at
9//! runtime over `POST /api/v1/silences` (independent of the config file).
10
11use serde::{Deserialize, Serialize};
12
13use rsigma_eval::EvaluationResult;
14
15use super::matcher::{MatcherError, MatcherSet, MatcherSpec};
16
17/// Where a silence came from.
18#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
19#[serde(rename_all = "snake_case")]
20pub enum SilenceOrigin {
21    /// Declared in the config `silences:` block.
22    Static,
23    /// Created at runtime over the API.
24    Api,
25}
26
27/// A silence's lifecycle state, derived from the time window.
28#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
29#[serde(rename_all = "snake_case")]
30pub enum SilenceState {
31    /// `now` is before `starts_at`.
32    Pending,
33    /// Within the window (or unbounded).
34    Active,
35    /// `now` is at or after `ends_at`.
36    Expired,
37}
38
39/// A silence input from config or the API.
40#[derive(Debug, Clone, Default, Deserialize)]
41pub struct SilenceSpec {
42    /// Optional client-supplied id; a UUID is assigned when absent.
43    #[serde(default)]
44    pub id: Option<String>,
45    /// Matchers (ANDed). At least one is required.
46    #[serde(default)]
47    pub matchers: Vec<MatcherSpec>,
48    /// RFC 3339 start; absent means active immediately.
49    #[serde(default)]
50    pub starts_at: Option<String>,
51    /// RFC 3339 end; absent means it never expires.
52    #[serde(default)]
53    pub ends_at: Option<String>,
54    /// Who created it (recorded, does not affect matching).
55    #[serde(default)]
56    pub created_by: Option<String>,
57    /// Free-text comment (recorded).
58    #[serde(default)]
59    pub comment: Option<String>,
60}
61
62/// The serialized view of a silence (the GET response shape).
63#[derive(Debug, Clone, Serialize)]
64pub struct SilenceView {
65    pub id: String,
66    pub matchers: Vec<MatcherSpec>,
67    #[serde(skip_serializing_if = "Option::is_none")]
68    pub starts_at: Option<String>,
69    #[serde(skip_serializing_if = "Option::is_none")]
70    pub ends_at: Option<String>,
71    #[serde(skip_serializing_if = "Option::is_none")]
72    pub created_by: Option<String>,
73    #[serde(skip_serializing_if = "Option::is_none")]
74    pub comment: Option<String>,
75    pub origin: SilenceOrigin,
76    pub state: SilenceState,
77}
78
79/// Persisted form of a dynamic (API) silence.
80#[derive(Debug, Clone, Serialize, Deserialize)]
81pub struct SilenceSnap {
82    pub id: String,
83    pub matchers: Vec<MatcherSpec>,
84    pub starts_at: Option<i64>,
85    pub ends_at: Option<i64>,
86    pub created_by: Option<String>,
87    pub comment: Option<String>,
88}
89
90/// Errors building a silence from a spec.
91#[derive(Debug, Clone)]
92pub enum SilenceError {
93    /// No matchers supplied.
94    EmptyMatchers,
95    /// A matcher failed to compile.
96    Matcher(MatcherError),
97    /// A time field failed to parse as RFC 3339.
98    Time { field: &'static str, value: String },
99    /// `ends_at` is not after `starts_at`.
100    Window,
101}
102
103impl std::fmt::Display for SilenceError {
104    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
105        match self {
106            SilenceError::EmptyMatchers => write!(f, "a silence requires at least one matcher"),
107            SilenceError::Matcher(e) => write!(f, "{e}"),
108            SilenceError::Time { field, value } => {
109                write!(
110                    f,
111                    "invalid {field} '{value}': expected an RFC 3339 timestamp"
112                )
113            }
114            SilenceError::Window => write!(f, "ends_at must be after starts_at"),
115        }
116    }
117}
118
119impl std::error::Error for SilenceError {}
120
121/// A compiled silence.
122#[derive(Debug, Clone)]
123pub struct Silence {
124    id: String,
125    matchers: MatcherSet,
126    starts_at: Option<i64>,
127    ends_at: Option<i64>,
128    created_by: Option<String>,
129    comment: Option<String>,
130    origin: SilenceOrigin,
131}
132
133impl Silence {
134    /// Build and validate a silence from a spec.
135    pub fn build(spec: SilenceSpec, origin: SilenceOrigin) -> Result<Self, SilenceError> {
136        if spec.matchers.is_empty() {
137            return Err(SilenceError::EmptyMatchers);
138        }
139        let matchers = MatcherSet::compile(&spec.matchers).map_err(SilenceError::Matcher)?;
140        let starts_at = parse_time(spec.starts_at.as_deref(), "starts_at")?;
141        let ends_at = parse_time(spec.ends_at.as_deref(), "ends_at")?;
142        if let (Some(s), Some(e)) = (starts_at, ends_at)
143            && e <= s
144        {
145            return Err(SilenceError::Window);
146        }
147        Ok(Silence {
148            id: spec.id.unwrap_or_else(|| uuid::Uuid::new_v4().to_string()),
149            matchers,
150            starts_at,
151            ends_at,
152            created_by: spec.created_by,
153            comment: spec.comment,
154            origin,
155        })
156    }
157
158    /// The silence id.
159    pub fn id(&self) -> &str {
160        &self.id
161    }
162
163    /// The lifecycle state at `now`.
164    pub fn state(&self, now: i64) -> SilenceState {
165        if self.starts_at.is_some_and(|s| now < s) {
166            SilenceState::Pending
167        } else if self.ends_at.is_some_and(|e| now >= e) {
168            SilenceState::Expired
169        } else {
170            SilenceState::Active
171        }
172    }
173
174    /// True when active and every matcher matches the result.
175    fn mutes(&self, result: &EvaluationResult, now: i64) -> bool {
176        self.state(now) == SilenceState::Active && self.matchers.matches(result)
177    }
178
179    /// Persisted form (for API-origin silences).
180    fn to_snap(&self) -> SilenceSnap {
181        SilenceSnap {
182            id: self.id.clone(),
183            matchers: self.matchers.to_specs(),
184            starts_at: self.starts_at,
185            ends_at: self.ends_at,
186            created_by: self.created_by.clone(),
187            comment: self.comment.clone(),
188        }
189    }
190
191    /// Rebuild an API-origin silence from its persisted form.
192    fn from_snap(snap: SilenceSnap) -> Result<Self, SilenceError> {
193        if snap.matchers.is_empty() {
194            return Err(SilenceError::EmptyMatchers);
195        }
196        let matchers = MatcherSet::compile(&snap.matchers).map_err(SilenceError::Matcher)?;
197        Ok(Silence {
198            id: snap.id,
199            matchers,
200            starts_at: snap.starts_at,
201            ends_at: snap.ends_at,
202            created_by: snap.created_by,
203            comment: snap.comment,
204            origin: SilenceOrigin::Api,
205        })
206    }
207
208    fn view(&self, now: i64) -> SilenceView {
209        SilenceView {
210            id: self.id.clone(),
211            matchers: self.matchers.to_specs(),
212            starts_at: self.starts_at.map(unix_to_rfc3339),
213            ends_at: self.ends_at.map(unix_to_rfc3339),
214            created_by: self.created_by.clone(),
215            comment: self.comment.clone(),
216            origin: self.origin,
217            state: self.state(now),
218        }
219    }
220}
221
222/// The runtime silence set, owned single-threaded by the sink task and shared
223/// behind an `RwLock` so the silence API can mutate it.
224#[derive(Debug, Default)]
225pub struct SilenceStore {
226    silences: Vec<Silence>,
227}
228
229impl SilenceStore {
230    /// True when no silences are tracked.
231    pub fn is_empty(&self) -> bool {
232        self.silences.is_empty()
233    }
234
235    /// Add a silence (used by restore and tests). Prefer [`try_add`] on the API
236    /// path so the dynamic-silence cap is enforced.
237    ///
238    /// [`try_add`]: SilenceStore::try_add
239    pub fn add(&mut self, silence: Silence) {
240        self.silences.push(silence);
241    }
242
243    /// Count of dynamic (API-origin) silences currently tracked.
244    pub fn dynamic_count(&self) -> usize {
245        self.silences
246            .iter()
247            .filter(|s| s.origin == SilenceOrigin::Api)
248            .count()
249    }
250
251    /// Add an API-origin silence unless the dynamic-silence cap is reached.
252    /// Returns `false` (and does not add) when at or over `max_dynamic`.
253    pub fn try_add(&mut self, silence: Silence, max_dynamic: usize) -> bool {
254        if self.dynamic_count() >= max_dynamic {
255            return false;
256        }
257        self.silences.push(silence);
258        true
259    }
260
261    /// Remove a silence by id. Returns whether one was removed.
262    pub fn remove(&mut self, id: &str) -> bool {
263        let before = self.silences.len();
264        self.silences.retain(|s| s.id != id);
265        self.silences.len() != before
266    }
267
268    /// Replace all `static`-origin silences with `statics`, keeping `api` ones.
269    /// Used on config load and hot-reload.
270    pub fn set_static(&mut self, statics: Vec<Silence>) {
271        self.silences.retain(|s| s.origin != SilenceOrigin::Static);
272        self.silences.extend(statics);
273    }
274
275    /// Drop expired silences.
276    pub fn gc(&mut self, now: i64) {
277        self.silences
278            .retain(|s| s.state(now) != SilenceState::Expired);
279    }
280
281    /// The id of the first active silence muting the result, if any.
282    pub fn active_match(&self, result: &EvaluationResult, now: i64) -> Option<&str> {
283        self.silences
284            .iter()
285            .find(|s| s.mutes(result, now))
286            .map(|s| s.id.as_str())
287    }
288
289    /// Count of currently-active silences.
290    pub fn active_count(&self, now: i64) -> usize {
291        self.silences
292            .iter()
293            .filter(|s| s.state(now) == SilenceState::Active)
294            .count()
295    }
296
297    /// A snapshot of every silence for the API.
298    pub fn snapshot(&self, now: i64) -> Vec<SilenceView> {
299        self.silences.iter().map(|s| s.view(now)).collect()
300    }
301
302    /// Persisted form of the dynamic (API) silences only; static ones come from
303    /// config and are re-seeded on boot.
304    pub(crate) fn api_snapshot(&self) -> Vec<SilenceSnap> {
305        self.silences
306            .iter()
307            .filter(|s| s.origin == SilenceOrigin::Api)
308            .map(|s| s.to_snap())
309            .collect()
310    }
311
312    /// Restore API silences from persisted form, skipping any already expired at
313    /// `now` or that fail to recompile.
314    pub(crate) fn restore_api(&mut self, snaps: Vec<SilenceSnap>, now: i64) {
315        for snap in snaps {
316            match Silence::from_snap(snap) {
317                Ok(silence) if silence.state(now) != SilenceState::Expired => {
318                    self.silences.push(silence);
319                }
320                Ok(_) => {}
321                Err(e) => tracing::warn!(error = %e, "Dropping unrestorable silence"),
322            }
323        }
324    }
325}
326
327/// Parse an optional RFC 3339 timestamp into unix seconds.
328fn parse_time(raw: Option<&str>, field: &'static str) -> Result<Option<i64>, SilenceError> {
329    match raw {
330        None => Ok(None),
331        Some(s) => chrono::DateTime::parse_from_rfc3339(s)
332            .map(|dt| Some(dt.timestamp()))
333            .map_err(|_| SilenceError::Time {
334                field,
335                value: s.to_string(),
336            }),
337    }
338}
339
340/// Format unix seconds as an RFC 3339 timestamp (UTC).
341fn unix_to_rfc3339(secs: i64) -> String {
342    chrono::DateTime::from_timestamp(secs, 0)
343        .unwrap_or_default()
344        .to_rfc3339()
345}
346
347#[cfg(test)]
348mod tests {
349    use super::*;
350    use rsigma_eval::{DetectionBody, EvaluationResult, FieldMatch, ResultBody, RuleHeader};
351    use rsigma_parser::Level;
352    use std::collections::HashMap;
353    use std::sync::Arc;
354
355    use super::super::matcher::MatchOp;
356
357    fn detection(ip: &str) -> EvaluationResult {
358        EvaluationResult {
359            header: RuleHeader {
360                rule_title: "t".to_string(),
361                rule_id: Some("rule-1".to_string()),
362                level: Some(Level::High),
363                tags: vec![],
364                custom_attributes: Arc::new(HashMap::new()),
365                enrichments: None,
366            },
367            body: ResultBody::Detection(DetectionBody {
368                matched_selections: vec![],
369                matched_fields: vec![FieldMatch::new("SourceIp", serde_json::json!(ip))],
370                event: None,
371            }),
372        }
373    }
374
375    fn spec(ip: &str) -> SilenceSpec {
376        SilenceSpec {
377            matchers: vec![MatcherSpec {
378                selector: "match.SourceIp".to_string(),
379                op: MatchOp::Eq,
380                value: ip.to_string(),
381            }],
382            ..Default::default()
383        }
384    }
385
386    #[test]
387    fn unbounded_silence_is_active_and_mutes() {
388        let mut store = SilenceStore::default();
389        store.add(Silence::build(spec("10.0.0.1"), SilenceOrigin::Api).unwrap());
390        assert!(store.active_match(&detection("10.0.0.1"), 100).is_some());
391        assert!(store.active_match(&detection("10.0.0.2"), 100).is_none());
392        assert_eq!(store.active_count(100), 1);
393    }
394
395    #[test]
396    fn time_window_pending_active_expired() {
397        let s = Silence::build(
398            SilenceSpec {
399                starts_at: Some("2026-01-01T00:00:00Z".to_string()),
400                ends_at: Some("2026-01-02T00:00:00Z".to_string()),
401                ..spec("10.0.0.1")
402            },
403            SilenceOrigin::Api,
404        )
405        .unwrap();
406        let before = 1_767_139_200; // 2025-12-31
407        let during = 1_767_283_200; // 2026-01-01T12:00
408        let after = 1_767_312_000; // 2026-01-02T...
409        assert_eq!(s.state(before), SilenceState::Pending);
410        assert_eq!(s.state(during), SilenceState::Active);
411        assert_eq!(s.state(after), SilenceState::Expired);
412    }
413
414    #[test]
415    fn gc_drops_expired() {
416        let mut store = SilenceStore::default();
417        store.add(
418            Silence::build(
419                SilenceSpec {
420                    ends_at: Some("2026-01-01T00:00:00Z".to_string()),
421                    ..spec("10.0.0.1")
422                },
423                SilenceOrigin::Api,
424            )
425            .unwrap(),
426        );
427        store.gc(1_700_000_000); // before end: kept
428        assert!(!store.is_empty());
429        store.gc(1_800_000_000); // after end: dropped
430        assert!(store.is_empty());
431    }
432
433    #[test]
434    fn set_static_replaces_static_keeps_api() {
435        let mut store = SilenceStore::default();
436        store.add(Silence::build(spec("10.0.0.9"), SilenceOrigin::Api).unwrap());
437        store.set_static(vec![
438            Silence::build(spec("10.0.0.1"), SilenceOrigin::Static).unwrap(),
439        ]);
440        assert_eq!(store.snapshot(0).len(), 2);
441        // Reseed with a different static set: the API one survives.
442        store.set_static(vec![
443            Silence::build(spec("10.0.0.2"), SilenceOrigin::Static).unwrap(),
444        ]);
445        let ips: Vec<String> = store
446            .snapshot(0)
447            .into_iter()
448            .flat_map(|v| v.matchers.into_iter().map(|m| m.value))
449            .collect();
450        assert!(ips.contains(&"10.0.0.9".to_string()), "api silence kept");
451        assert!(ips.contains(&"10.0.0.2".to_string()), "new static seeded");
452        assert!(
453            !ips.contains(&"10.0.0.1".to_string()),
454            "old static replaced"
455        );
456    }
457
458    #[test]
459    fn remove_by_id() {
460        let mut store = SilenceStore::default();
461        let s = Silence::build(spec("10.0.0.1"), SilenceOrigin::Api).unwrap();
462        let id = s.id().to_string();
463        store.add(s);
464        assert!(store.remove(&id));
465        assert!(!store.remove(&id));
466        assert!(store.is_empty());
467    }
468
469    #[test]
470    fn empty_matchers_rejected() {
471        let err = Silence::build(SilenceSpec::default(), SilenceOrigin::Api).unwrap_err();
472        assert!(matches!(err, SilenceError::EmptyMatchers));
473    }
474
475    #[test]
476    fn try_add_enforces_dynamic_cap() {
477        let mut store = SilenceStore::default();
478        // A static silence does not count against the dynamic cap.
479        store.add(Silence::build(spec("10.0.0.0"), SilenceOrigin::Static).unwrap());
480        assert!(store.try_add(
481            Silence::build(spec("10.0.0.1"), SilenceOrigin::Api).unwrap(),
482            2
483        ));
484        assert!(store.try_add(
485            Silence::build(spec("10.0.0.2"), SilenceOrigin::Api).unwrap(),
486            2
487        ));
488        // Third dynamic silence is rejected at the cap of 2.
489        assert!(!store.try_add(
490            Silence::build(spec("10.0.0.3"), SilenceOrigin::Api).unwrap(),
491            2
492        ));
493        assert_eq!(store.dynamic_count(), 2);
494    }
495}