Skip to main content

rsigma_runtime/alert_pipeline/
dedup.rs

//! Fingerprint deduplication with an Alertmanager-style active-alert lifecycle.
//!
//! Each in-scope result is reduced to a fingerprint (the rule identity plus the
//! configured selector values). The first fire for a fingerprint passes through
//! unchanged and opens an *active alert*; subsequent fires within the window
//! fold into that alert (incrementing its fire count and updating `last_seen`)
//! instead of being emitted again. A periodic tick re-emits a still-active alert
//! at most once per `repeat_interval`, and only when new fires have folded in
//! since the previous emission (the record carries the accumulated fire count).
//! The tick also emits a final `resolved` record once `resolve_timeout` elapses
//! with no further fires.
//!
//! Re-emit and resolved records are ordinary serialized [`EvaluationResult`]s
//! carrying a `dedup_state` key in `header.enrichments`, so they ride the
//! existing sink path and wire shape.
14
15use std::collections::HashMap;
16use std::time::Duration;
17
18use rsigma_eval::EvaluationResult;
19use serde::{Deserialize, Serialize};
20use serde_json::Value;
21
22use super::strip_event_payloads;
23use crate::selector::Selector;
24
25/// Validated dedup configuration.
26#[derive(Debug, Clone)]
27pub struct DedupConfig {
28    /// Selectors hashed (with the rule identity) into the fingerprint.
29    pub fingerprint: Vec<Selector>,
30    /// Re-emit cadence for a still-active alert. `0` disables re-emits
31    /// (pure suppression with a single resolved summary on expiry).
32    pub repeat_interval: Duration,
33    /// Idle timeout after which an active alert resolves and is evicted.
34    pub resolve_timeout: Duration,
35    /// Ceiling on concurrently-active alerts. Once reached, a first-fire for a
36    /// new fingerprint passes through un-deduped rather than opening another
37    /// alert, so a high-cardinality fingerprint cannot grow the store without
38    /// bound between resolve windows.
39    pub max_active_alerts: usize,
40}
41
42/// One fingerprint's active-alert state.
43///
44/// `sample` is the event-stripped first-fire result as a JSON [`Value`] (rather
45/// than an [`EvaluationResult`], which is serialize-only), so the active-alert
46/// store round-trips through the persistence snapshot.
47#[derive(Debug, Clone, Serialize, Deserialize)]
48pub(crate) struct ActiveAlert {
49    first_seen: i64,
50    last_seen: i64,
51    last_emitted: i64,
52    fire_count: u64,
53    /// `fire_count` as of the last emission, so a repeat tick is a no-op when
54    /// nothing new folded in since the previous re-emit.
55    emitted_count: u64,
56    /// Representative result (event payloads stripped) used to build the
57    /// `repeat` / `resolved` records.
58    sample: Value,
59    /// Resolved fingerprint selector values, surfaced on the summary records.
60    fields: Vec<(String, Value)>,
61}
62
63/// In-memory active-alert store, owned single-threaded by the sink task.
64#[derive(Debug, Default)]
65pub struct DedupStore {
66    alerts: HashMap<String, ActiveAlert>,
67}
68
69/// A summary record produced by the periodic tick, tagged with its state so
70/// the driver can record the matching metric.
71pub(crate) struct DedupRecord {
72    /// `repeat` or `resolved`.
73    pub state: &'static str,
74    /// The summary line to emit (a serialized result with `dedup_*` keys).
75    pub json: Value,
76}
77
78impl DedupStore {
79    /// Number of active alerts currently tracked.
80    pub fn len(&self) -> usize {
81        self.alerts.len()
82    }
83
84    /// True when no active alerts are tracked.
85    pub fn is_empty(&self) -> bool {
86        self.alerts.is_empty()
87    }
88
89    /// True when a fingerprint already has an active alert.
90    pub(crate) fn contains(&self, fingerprint: &str) -> bool {
91        self.alerts.contains_key(fingerprint)
92    }
93
94    /// Fold a duplicate into the existing active alert.
95    pub(crate) fn fold(&mut self, fingerprint: &str, now: i64) {
96        if let Some(alert) = self.alerts.get_mut(fingerprint) {
97            alert.fire_count += 1;
98            alert.last_seen = now;
99        }
100    }
101
102    /// Open a new active alert for a first-fire result.
103    pub(crate) fn insert(
104        &mut self,
105        fingerprint: String,
106        now: i64,
107        sample: Value,
108        fields: Vec<(String, Value)>,
109    ) {
110        self.alerts.insert(
111            fingerprint,
112            ActiveAlert {
113                first_seen: now,
114                last_seen: now,
115                last_emitted: now,
116                fire_count: 1,
117                emitted_count: 1,
118                sample,
119                fields,
120            },
121        );
122    }
123
124    /// Advance time: emit repeat records for due alerts and resolved records
125    /// for idle alerts, evicting the latter.
126    pub(crate) fn tick(&mut self, cfg: &DedupConfig, now: i64) -> Vec<DedupRecord> {
127        let resolve_secs = cfg.resolve_timeout.as_secs() as i64;
128        let repeat_secs = cfg.repeat_interval.as_secs() as i64;
129        let mut out = Vec::new();
130        let mut resolved = Vec::new();
131
132        for (fingerprint, alert) in self.alerts.iter_mut() {
133            if now - alert.last_seen >= resolve_secs {
134                out.push(DedupRecord {
135                    state: "resolved",
136                    json: build_record(alert, fingerprint, "resolved"),
137                });
138                resolved.push(fingerprint.clone());
139            } else if repeat_secs > 0
140                && now - alert.last_emitted >= repeat_secs
141                && alert.fire_count > alert.emitted_count
142            {
143                out.push(DedupRecord {
144                    state: "repeat",
145                    json: build_record(alert, fingerprint, "repeat"),
146                });
147                alert.last_emitted = now;
148                alert.emitted_count = alert.fire_count;
149            }
150        }
151
152        for key in resolved {
153            self.alerts.remove(&key);
154        }
155        out
156    }
157
158    /// Snapshot the active alerts (fingerprint -> alert) for persistence.
159    pub(crate) fn snapshot(&self) -> Vec<(String, ActiveAlert)> {
160        self.alerts
161            .iter()
162            .map(|(k, v)| (k.clone(), v.clone()))
163            .collect()
164    }
165
166    /// Restore active alerts, dropping any already past `resolve_timeout` at
167    /// `now`.
168    pub(crate) fn restore(
169        &mut self,
170        alerts: Vec<(String, ActiveAlert)>,
171        now: i64,
172        resolve_secs: i64,
173    ) {
174        for (fingerprint, alert) in alerts {
175            if now - alert.last_seen < resolve_secs {
176                self.alerts.insert(fingerprint, alert);
177            }
178        }
179    }
180}
181
182/// Compute the fingerprint for a result under `selectors`.
183///
184/// The rule identity is always part of the fingerprint; each selector
185/// contributes its resolved value or an explicit null marker. The canonical
186/// string is reduced to a compact, stable 64-bit FNV-1a hex digest so the same
187/// logical alert keeps one fingerprint across restarts.
188pub(crate) fn fingerprint(selectors: &[Selector], result: &EvaluationResult) -> String {
189    let rule = result
190        .header
191        .rule_id
192        .as_deref()
193        .unwrap_or(result.header.rule_title.as_str());
194
195    let mut buf = String::with_capacity(64);
196    buf.push_str("rule=");
197    buf.push_str(rule);
198    for sel in selectors {
199        buf.push('\u{1f}');
200        buf.push_str(&sel.as_str());
201        buf.push('=');
202        match sel.resolve(result) {
203            Some(value) => buf.push_str(&canonical(&value)),
204            None => buf.push_str("\u{0}null"),
205        }
206    }
207    format!("{:016x}", fnv1a64(buf.as_bytes()))
208}
209
210/// Resolve the selector values once, for storage on the active alert and
211/// surfacing on the summary records.
212pub(crate) fn resolve_fields(
213    selectors: &[Selector],
214    result: &EvaluationResult,
215) -> Vec<(String, Value)> {
216    selectors
217        .iter()
218        .map(|sel| (sel.as_str(), sel.resolve(result).unwrap_or(Value::Null)))
219        .collect()
220}
221
222/// Canonical string form of a resolved value for fingerprinting. Strings use
223/// their raw text; everything else uses compact JSON.
224fn canonical(value: &Value) -> String {
225    match value {
226        Value::String(s) => s.clone(),
227        other => other.to_string(),
228    }
229}
230
231/// Build a `repeat` / `resolved` summary line (a JSON [`Value`]) from an active
232/// alert by injecting the `dedup_*` keys into the sample's `enrichments` object.
233fn build_record(alert: &ActiveAlert, fingerprint: &str, state: &'static str) -> Value {
234    let mut result = alert.sample.clone();
235    if !result.is_object() {
236        result = Value::Object(serde_json::Map::new());
237    }
238    let obj = result.as_object_mut().expect("result is an object");
239    let enrichments = obj
240        .entry("enrichments")
241        .or_insert_with(|| Value::Object(serde_json::Map::new()));
242    if !enrichments.is_object() {
243        *enrichments = Value::Object(serde_json::Map::new());
244    }
245    let map = enrichments
246        .as_object_mut()
247        .expect("enrichments is an object");
248    map.insert("dedup_state".to_string(), Value::String(state.to_string()));
249    map.insert(
250        "dedup_fingerprint".to_string(),
251        Value::String(fingerprint.to_string()),
252    );
253    map.insert(
254        "dedup_fire_count".to_string(),
255        Value::from(alert.fire_count),
256    );
257    map.insert(
258        "dedup_first_seen".to_string(),
259        Value::from(alert.first_seen),
260    );
261    map.insert("dedup_last_seen".to_string(), Value::from(alert.last_seen));
262    let fields: serde_json::Map<String, Value> = alert.fields.iter().cloned().collect();
263    map.insert("dedup_fields".to_string(), Value::Object(fields));
264    result
265}
266
267/// The event-stripped, serialized form of a result, retained as a long-lived
268/// sample. A `Value` (not an [`EvaluationResult`]) so the store is persistable.
269pub(crate) fn sample_of(result: &EvaluationResult) -> Value {
270    let mut sample = result.clone();
271    strip_event_payloads(&mut sample);
272    serde_json::to_value(&sample).unwrap_or(Value::Null)
273}
274
/// 64-bit FNV-1a hash of `bytes`.
///
/// Implemented inline rather than via `DefaultHasher` so the digest cannot
/// change between toolchains or crate versions; persisted fingerprints rely
/// on that stability.
pub(crate) fn fnv1a64(bytes: &[u8]) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0000_0100_0000_01b3;

    // FNV-1a: xor the byte in first, then multiply by the prime.
    bytes.iter().fold(OFFSET_BASIS, |acc, &byte| {
        (acc ^ u64::from(byte)).wrapping_mul(PRIME)
    })
}
285
#[cfg(test)]
mod tests {
    use super::*;
    use rsigma_eval::{DetectionBody, EvaluationResult, FieldMatch, ResultBody, RuleHeader};
    use rsigma_parser::Level;
    use std::collections::HashMap;
    use std::sync::Arc;

    /// A detection result for rule `rule-1` whose only matched field is
    /// `SourceIp = ip`, carrying a raw event payload.
    fn result(ip: &str) -> EvaluationResult {
        let header = RuleHeader {
            rule_title: "Brute force".to_string(),
            rule_id: Some("rule-1".to_string()),
            level: Some(Level::High),
            tags: vec![],
            custom_attributes: Arc::new(HashMap::new()),
            enrichments: None,
        };
        let body = ResultBody::Detection(DetectionBody {
            matched_selections: vec![],
            matched_fields: vec![FieldMatch::new("SourceIp", serde_json::json!(ip))],
            event: Some(serde_json::json!({"big": "payload"})),
        });
        EvaluationResult { header, body }
    }

    /// Config fingerprinting on `match.SourceIp`, with the given repeat and
    /// resolve durations in seconds.
    fn cfg(repeat: u64, resolve: u64) -> DedupConfig {
        DedupConfig {
            fingerprint: vec![Selector::parse("match.SourceIp").unwrap()],
            repeat_interval: Duration::from_secs(repeat),
            resolve_timeout: Duration::from_secs(resolve),
            max_active_alerts: 100_000,
        }
    }

    /// Open an active alert for `r` at time `now` under fingerprint `fp`.
    fn open_alert(
        store: &mut DedupStore,
        config: &DedupConfig,
        r: &EvaluationResult,
        fp: &str,
        now: i64,
    ) {
        store.insert(
            fp.to_string(),
            now,
            sample_of(r),
            resolve_fields(&config.fingerprint, r),
        );
    }

    #[test]
    fn fingerprint_is_stable_and_value_sensitive() {
        let c = cfg(0, 60);
        let first = fingerprint(&c.fingerprint, &result("10.0.0.1"));
        let again = fingerprint(&c.fingerprint, &result("10.0.0.1"));
        let other = fingerprint(&c.fingerprint, &result("10.0.0.2"));
        assert_eq!(first, again, "same inputs must hash identically");
        assert_ne!(first, other, "different selector values must differ");
    }

    #[test]
    fn first_fire_opens_alert_then_folds() {
        let c = cfg(0, 60);
        let mut store = DedupStore::default();
        let r = result("10.0.0.1");
        let fp = fingerprint(&c.fingerprint, &r);

        assert!(!store.contains(&fp));
        open_alert(&mut store, &c, &r, &fp, 100);
        assert_eq!(store.len(), 1);

        // Two duplicates fold in; folding does not re-store the sample.
        store.fold(&fp, 105);
        store.fold(&fp, 110);

        // Jump past resolve_timeout to force resolution.
        let records = store.tick(&c, 200);
        assert_eq!(records.len(), 1);
        let resolved = &records[0];
        assert_eq!(resolved.state, "resolved");

        let enr = &resolved.json["enrichments"];
        assert_eq!(enr["dedup_state"], serde_json::json!("resolved"));
        // Three fires total: one insert + two folds.
        assert_eq!(enr["dedup_fire_count"], serde_json::json!(3));
        // The stored sample has no raw event payload.
        assert!(resolved.json.get("event").is_none());
        assert!(store.is_empty(), "resolved alert is evicted");
    }

    #[test]
    fn repeat_emits_only_when_new_fires_accumulate() {
        let c = cfg(10, 600);
        let mut store = DedupStore::default();
        let r = result("10.0.0.1");
        let fp = fingerprint(&c.fingerprint, &r);
        open_alert(&mut store, &c, &r, &fp, 0);

        // Nothing new since the insert, so a due repeat tick emits nothing.
        assert!(store.tick(&c, 20).is_empty());

        // One fold followed by a due tick yields one repeat with the new count.
        store.fold(&fp, 25);
        let records = store.tick(&c, 40);
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].state, "repeat");
        assert_eq!(
            records[0].json["enrichments"]["dedup_fire_count"],
            serde_json::json!(2)
        );

        // Right after a repeat, with no new fires, the next tick is a no-op.
        assert!(store.tick(&c, 55).is_empty());
        assert_eq!(store.len(), 1, "still active, not resolved");
    }

    #[test]
    fn repeat_interval_zero_is_pure_suppression() {
        let c = cfg(0, 100);
        let mut store = DedupStore::default();
        let r = result("10.0.0.1");
        let fp = fingerprint(&c.fingerprint, &r);
        open_alert(&mut store, &c, &r, &fp, 0);
        store.fold(&fp, 10);

        // Repeats never fire, even though a new fire folded in.
        assert!(store.tick(&c, 50).is_empty());

        // Expiry produces just the resolved record.
        let records = store.tick(&c, 200);
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].state, "resolved");
    }
}