Skip to main content

rsigma_runtime/alert_pipeline/
grouping.rs

1//! Incident grouping, modeled on Alertmanager.
2//!
3//! Two modes:
4//!
5//! - `group_by` (default): group results by equality on an ordered selector
6//!   list. The incident identity is a deterministic fingerprint of the
7//!   `(selector -> value)` pairs, stable across restarts and re-emissions.
8//! - `entity_graph` (opt-in): union-find over `(selector, value)` entity pairs.
9//!   A result joins (and merges) any open incident sharing an entity value.
10//!   Guarded against the giant-component failure by a `stop_values` list and a
11//!   per-value `max_value_cardinality` ceiling above which a value stops acting
12//!   as a join key.
13//!
14//! Incidents follow the Alertmanager timers: `group_wait` (initial batching
15//! delay before the first emission), `group_interval` (minimum delay before
16//! emitting an updated incident), and `repeat_interval` (re-emit cadence for a
17//! still-open incident). An incident resolves and is evicted once
18//! `resolve_timeout` elapses with no new results.
19
20use std::collections::{BTreeMap, BTreeSet, HashMap};
21use std::time::Duration;
22
23use rsigma_eval::EvaluationResult;
24use rsigma_parser::Level;
25use serde::{Deserialize, Serialize};
26use serde_json::Value;
27
28use super::dedup::fnv1a64;
29use super::strip_event_payloads;
30use crate::selector::Selector;
31
/// Strategy the grouping layer uses to decide which incident a result joins.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum GroupMode {
    /// Equality on the configured `by` selectors; the incident id is a
    /// deterministic fingerprint of the resolved key.
    GroupBy,
    /// Union-find across the configured `entities` selector values; the
    /// incident id is a surrogate UUID.
    EntityGraph,
}
40
/// Level of contributing-result detail carried by an `IncidentResult`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum IncludeMode {
    /// Only lightweight references (rule + level) are embedded.
    Refs,
    /// The full contributing results are embedded, with event payloads
    /// stripped.
    Results,
}
49
/// Upper bounds that keep incident state from growing without limit.
#[derive(Clone, Copy, Debug)]
pub struct Caps {
    /// How many incidents may be open at the same time.
    pub max_open_incidents: usize,
    /// How many distinct entity values a single incident retains
    /// (entity_graph mode).
    pub max_entities_per_incident: usize,
    /// How many contributing results/refs a single incident retains.
    pub max_results_per_incident: usize,
    /// Occurrence ceiling per entity value; once exceeded, the value no
    /// longer joins incidents (entity_graph giant-component guard).
    pub max_value_cardinality: u64,
}

impl Default for Caps {
    /// Default limits: 10 000 open incidents, 1 000 entities and 1 000
    /// results per incident, and a per-value cardinality ceiling of 10 000.
    fn default() -> Self {
        Self {
            max_open_incidents: 10_000,
            max_entities_per_incident: 1_000,
            max_results_per_incident: 1_000,
            max_value_cardinality: 10_000,
        }
    }
}
74
75/// Validated grouping configuration.
76#[derive(Debug, Clone)]
77pub struct GroupConfig {
78    pub mode: GroupMode,
79    /// `group_by` mode: selectors whose values form the group key.
80    pub by: Vec<Selector>,
81    /// `entity_graph` mode: selectors whose values form join edges.
82    pub entities: Vec<Selector>,
83    pub group_wait: Duration,
84    pub group_interval: Duration,
85    pub repeat_interval: Duration,
86    pub resolve_timeout: Duration,
87    pub include: IncludeMode,
88    pub caps: Caps,
89    /// Entity values that never form a join edge (entity_graph).
90    pub stop_values: BTreeSet<String>,
91    /// Optional NATS subject override for emitted incidents.
92    pub nats_subject: Option<String>,
93}
94
95/// A lightweight reference to a contributing result.
96#[derive(Debug, Clone, Serialize, Deserialize)]
97pub struct IncidentRef {
98    /// Rule id, falling back to the rule title.
99    pub rule: String,
100    /// Severity, lowercased.
101    #[serde(skip_serializing_if = "Option::is_none")]
102    pub level: Option<String>,
103}
104
105/// The wire shape emitted for an incident. One flat NDJSON object,
106/// disambiguated downstream by the presence of `incident_id`.
107#[derive(Debug, Clone, Serialize)]
108pub struct IncidentResult {
109    /// Deterministic group fingerprint (group_by) or surrogate UUIDv4
110    /// (entity_graph). Stable across restarts in group_by mode.
111    pub incident_id: String,
112    /// `open` or `resolved`.
113    pub state: &'static str,
114    /// What produced this emission: `group_wait` / `group_interval` /
115    /// `repeat` / `resolved`.
116    pub trigger: &'static str,
117    /// First and last contributing-result timestamps (unix seconds).
118    pub first_seen: i64,
119    pub last_seen: i64,
120    /// Highest severity seen across contributing results.
121    #[serde(skip_serializing_if = "Option::is_none")]
122    pub max_level: Option<String>,
123    /// Number of contributing results.
124    pub result_count: u64,
125    /// Per-rule contributing-result counts.
126    pub rule_counts: BTreeMap<String, u64>,
127    /// group_by mode: the group key field values.
128    #[serde(skip_serializing_if = "serde_json::Map::is_empty")]
129    pub group_by: serde_json::Map<String, Value>,
130    /// entity_graph mode: the entity values that bind the incident.
131    #[serde(skip_serializing_if = "serde_json::Map::is_empty")]
132    pub entities: serde_json::Map<String, Value>,
133    /// Contributing references (`include: refs`).
134    #[serde(skip_serializing_if = "Option::is_none")]
135    pub refs: Option<Vec<IncidentRef>>,
136    /// Contributing results (`include: results`), event payloads stripped and
137    /// stored as serialized JSON values.
138    #[serde(skip_serializing_if = "Option::is_none")]
139    pub results: Option<Vec<Value>>,
140}
141
142/// Internal per-incident state.
143///
144/// Serializable for persistence: contributing `results` are stored as
145/// serialized JSON values (since [`EvaluationResult`] is serialize-only).
146#[derive(Debug, Clone, Serialize, Deserialize)]
147pub(crate) struct Incident {
148    id: String,
149    first_seen: i64,
150    last_seen: i64,
151    last_emitted: i64,
152    emitted_count: u64,
153    opened: bool,
154    dirty: bool,
155    group_by: Vec<(String, Value)>,
156    entities: BTreeMap<String, BTreeSet<String>>,
157    max_level: Option<Level>,
158    rule_counts: BTreeMap<String, u64>,
159    result_count: u64,
160    refs: Vec<IncidentRef>,
161    results: Vec<Value>,
162}
163
164/// A grouping outcome the driver records a metric for.
165pub(crate) struct IncidentEmission {
166    pub trigger: &'static str,
167    pub result: IncidentResult,
168}
169
170/// In-memory incident store, owned single-threaded by the sink task (or shared
171/// behind an `RwLock` so the HTTP API can read open incidents).
172#[derive(Debug, Default)]
173pub struct IncidentStore {
174    incidents: HashMap<String, Incident>,
175    /// entity_graph: `(selector, value)` -> incident id.
176    entity_index: HashMap<(String, String), String>,
177    /// entity_graph: per-value occurrence count for the cardinality guard.
178    value_counts: HashMap<(String, String), u64>,
179}
180
/// Reason an entity value was kept from acting as a join key; reported
/// through a metric.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum OvermergeGuard {
    /// The value is on the configured stop-list.
    StopValue,
    /// The value occurred more often than the cardinality ceiling allows.
    CardinalityCeiling,
}
188
189impl IncidentStore {
190    /// Number of open incidents.
191    pub fn len(&self) -> usize {
192        self.incidents.len()
193    }
194
195    /// True when no incidents are open.
196    pub fn is_empty(&self) -> bool {
197        self.incidents.is_empty()
198    }
199
200    /// A snapshot of every open incident, for the admin API.
201    pub fn snapshot(&self, include: IncludeMode) -> Vec<IncidentResult> {
202        self.incidents
203            .values()
204            .map(|inc| inc.to_result("open", "snapshot", include))
205            .collect()
206    }
207
208    /// Assign a result to an incident, returning the incident id to annotate
209    /// the pass-through result with. Records guard hits via `on_guard`.
210    pub(crate) fn assign(
211        &mut self,
212        cfg: &GroupConfig,
213        result: &EvaluationResult,
214        now: i64,
215        mut on_guard: impl FnMut(OvermergeGuard),
216    ) -> Option<String> {
217        match cfg.mode {
218            GroupMode::GroupBy => self.assign_group_by(cfg, result, now),
219            GroupMode::EntityGraph => self.assign_entity_graph(cfg, result, now, &mut on_guard),
220        }
221    }
222
223    fn assign_group_by(
224        &mut self,
225        cfg: &GroupConfig,
226        result: &EvaluationResult,
227        now: i64,
228    ) -> Option<String> {
229        let (id, key) = group_fingerprint(&cfg.by, result);
230        let exists = self.incidents.contains_key(&id);
231        if !exists && self.incidents.len() >= cfg.caps.max_open_incidents {
232            return None;
233        }
234        let incident = self
235            .incidents
236            .entry(id.clone())
237            .or_insert_with(|| Incident::new(id.clone(), now, key.clone()));
238        incident.absorb(result, now, cfg);
239        Some(id)
240    }
241
242    fn assign_entity_graph(
243        &mut self,
244        cfg: &GroupConfig,
245        result: &EvaluationResult,
246        now: i64,
247        on_guard: &mut impl FnMut(OvermergeGuard),
248    ) -> Option<String> {
249        // Extract joinable entity pairs, applying the stop-list and
250        // per-value cardinality ceiling.
251        let mut pairs: Vec<(String, String)> = Vec::new();
252        for sel in &cfg.entities {
253            let Some(value) = sel.resolve(result) else {
254                continue;
255            };
256            let value = match value {
257                Value::String(s) => s,
258                other => other.to_string(),
259            };
260            if cfg.stop_values.contains(&value) {
261                on_guard(OvermergeGuard::StopValue);
262                continue;
263            }
264            let key = (sel.as_str(), value);
265            let count = self.value_counts.entry(key.clone()).or_insert(0);
266            *count += 1;
267            if *count > cfg.caps.max_value_cardinality {
268                on_guard(OvermergeGuard::CardinalityCeiling);
269                continue;
270            }
271            pairs.push(key);
272        }
273
274        // Incidents already touched by any of these entity values.
275        let mut touched: Vec<String> = Vec::new();
276        for pair in &pairs {
277            if let Some(id) = self.entity_index.get(pair)
278                && !touched.contains(id)
279            {
280                touched.push(id.clone());
281            }
282        }
283
284        let survivor_id = match touched.first().cloned() {
285            None => {
286                if self.incidents.len() >= cfg.caps.max_open_incidents {
287                    return None;
288                }
289                let id = uuid::Uuid::new_v4().to_string();
290                self.incidents
291                    .insert(id.clone(), Incident::new(id.clone(), now, Vec::new()));
292                id
293            }
294            Some(survivor) => {
295                // Merge any other touched incidents into the survivor.
296                for other in touched.iter().skip(1) {
297                    self.merge(&survivor, other, cfg.caps.max_results_per_incident);
298                }
299                survivor
300            }
301        };
302
303        // Register the entity values and absorb the result.
304        if let Some(incident) = self.incidents.get_mut(&survivor_id) {
305            for (sel, value) in &pairs {
306                let set = incident.entities.entry(sel.clone()).or_default();
307                if set.len() < cfg.caps.max_entities_per_incident {
308                    set.insert(value.clone());
309                }
310            }
311            incident.absorb(result, now, cfg);
312        }
313        for pair in pairs {
314            self.entity_index.insert(pair, survivor_id.clone());
315        }
316        Some(survivor_id)
317    }
318
319    /// Merge incident `other` into `survivor`, repointing its entity index.
320    fn merge(&mut self, survivor: &str, other: &str, max_results: usize) {
321        if survivor == other {
322            return;
323        }
324        let Some(mut victim) = self.incidents.remove(other) else {
325            return;
326        };
327        // Repoint entity-index entries from the victim to the survivor.
328        for (sel, values) in &victim.entities {
329            for value in values {
330                self.entity_index
331                    .insert((sel.clone(), value.clone()), survivor.to_string());
332            }
333        }
334        if let Some(inc) = self.incidents.get_mut(survivor) {
335            inc.first_seen = inc.first_seen.min(victim.first_seen);
336            inc.last_seen = inc.last_seen.max(victim.last_seen);
337            inc.result_count += victim.result_count;
338            inc.dirty = true;
339            inc.max_level = max_level(inc.max_level, victim.max_level);
340            for (rule, count) in victim.rule_counts {
341                *inc.rule_counts.entry(rule).or_insert(0) += count;
342            }
343            for (sel, values) in victim.entities {
344                inc.entities.entry(sel).or_default().extend(values);
345            }
346            inc.refs.append(&mut victim.refs);
347            inc.results.append(&mut victim.results);
348            // Keep the per-incident cap after merging two incidents' samples.
349            inc.refs.truncate(max_results);
350            inc.results.truncate(max_results);
351        }
352    }
353
354    /// Advance time: emit incidents due per the Alertmanager timers and evict
355    /// resolved ones.
356    pub(crate) fn tick(&mut self, cfg: &GroupConfig, now: i64) -> Vec<IncidentEmission> {
357        let group_wait = cfg.group_wait.as_secs() as i64;
358        let group_interval = cfg.group_interval.as_secs() as i64;
359        let repeat = cfg.repeat_interval.as_secs() as i64;
360        let resolve = cfg.resolve_timeout.as_secs() as i64;
361
362        let mut out = Vec::new();
363        let mut resolved = Vec::new();
364        for (id, inc) in self.incidents.iter_mut() {
365            if !inc.opened {
366                if now - inc.first_seen >= group_wait {
367                    out.push(IncidentEmission {
368                        trigger: "group_wait",
369                        result: inc.to_result("open", "group_wait", cfg.include),
370                    });
371                    inc.opened = true;
372                    inc.dirty = false;
373                    inc.last_emitted = now;
374                    inc.emitted_count = inc.result_count;
375                }
376                continue;
377            }
378            if now - inc.last_seen >= resolve {
379                out.push(IncidentEmission {
380                    trigger: "resolved",
381                    result: inc.to_result("resolved", "resolved", cfg.include),
382                });
383                resolved.push(id.clone());
384            } else if inc.dirty && now - inc.last_emitted >= group_interval {
385                out.push(IncidentEmission {
386                    trigger: "group_interval",
387                    result: inc.to_result("open", "group_interval", cfg.include),
388                });
389                inc.dirty = false;
390                inc.last_emitted = now;
391                inc.emitted_count = inc.result_count;
392            } else if repeat > 0 && now - inc.last_emitted >= repeat {
393                out.push(IncidentEmission {
394                    trigger: "repeat",
395                    result: inc.to_result("open", "repeat", cfg.include),
396                });
397                inc.last_emitted = now;
398            }
399        }
400        for id in resolved {
401            if let Some(inc) = self.incidents.remove(&id) {
402                // Drop the resolved incident's entity values from both the index
403                // and the cardinality counters so neither grows unbounded with
404                // distinct entity values over time.
405                for (sel, values) in inc.entities {
406                    for value in values {
407                        let key = (sel.clone(), value);
408                        self.entity_index.remove(&key);
409                        self.value_counts.remove(&key);
410                    }
411                }
412            }
413        }
414        out
415    }
416
417    /// Export open incidents for persistence.
418    pub(crate) fn export(&self) -> Vec<Incident> {
419        self.incidents.values().cloned().collect()
420    }
421
422    /// Restore incidents, dropping any already past `resolve_timeout` at `now`,
423    /// and rebuild the entity index from the restored incidents' entity values
424    /// (the index and cardinality counters are not themselves persisted).
425    pub(crate) fn restore(&mut self, incidents: Vec<Incident>, now: i64, resolve_secs: i64) {
426        for inc in incidents {
427            if now - inc.last_seen >= resolve_secs {
428                continue;
429            }
430            for (sel, values) in &inc.entities {
431                for value in values {
432                    self.entity_index
433                        .insert((sel.clone(), value.clone()), inc.id.clone());
434                }
435            }
436            self.incidents.insert(inc.id.clone(), inc);
437        }
438    }
439}
440
441impl Incident {
442    fn new(id: String, now: i64, group_by: Vec<(String, Value)>) -> Self {
443        Incident {
444            id,
445            first_seen: now,
446            last_seen: now,
447            last_emitted: now,
448            emitted_count: 0,
449            opened: false,
450            dirty: true,
451            group_by,
452            entities: BTreeMap::new(),
453            max_level: None,
454            rule_counts: BTreeMap::new(),
455            result_count: 0,
456            refs: Vec::new(),
457            results: Vec::new(),
458        }
459    }
460
461    fn absorb(&mut self, result: &EvaluationResult, now: i64, cfg: &GroupConfig) {
462        self.last_seen = now;
463        self.dirty = true;
464        self.result_count += 1;
465        self.max_level = max_level(self.max_level, result.header.level);
466        let rule = result
467            .header
468            .rule_id
469            .clone()
470            .unwrap_or_else(|| result.header.rule_title.clone());
471        *self.rule_counts.entry(rule.clone()).or_insert(0) += 1;
472        match cfg.include {
473            IncludeMode::Refs => {
474                if self.refs.len() < cfg.caps.max_results_per_incident {
475                    self.refs.push(IncidentRef {
476                        rule,
477                        level: result.header.level.map(|l| l.as_str().to_string()),
478                    });
479                }
480            }
481            IncludeMode::Results => {
482                if self.results.len() < cfg.caps.max_results_per_incident {
483                    let mut sample = result.clone();
484                    strip_event_payloads(&mut sample);
485                    self.results
486                        .push(serde_json::to_value(&sample).unwrap_or(Value::Null));
487                }
488            }
489        }
490    }
491
492    fn to_result(
493        &self,
494        state: &'static str,
495        trigger: &'static str,
496        include: IncludeMode,
497    ) -> IncidentResult {
498        let group_by: serde_json::Map<String, Value> = self.group_by.iter().cloned().collect();
499        let entities: serde_json::Map<String, Value> = self
500            .entities
501            .iter()
502            .map(|(sel, values)| {
503                (
504                    sel.clone(),
505                    Value::Array(values.iter().cloned().map(Value::String).collect()),
506                )
507            })
508            .collect();
509        let (refs, results) = match include {
510            IncludeMode::Refs => (Some(self.refs.clone()), None),
511            IncludeMode::Results => (None, Some(self.results.clone())),
512        };
513        IncidentResult {
514            incident_id: self.id.clone(),
515            state,
516            trigger,
517            first_seen: self.first_seen,
518            last_seen: self.last_seen,
519            max_level: self.max_level.map(|l| l.as_str().to_string()),
520            result_count: self.result_count,
521            rule_counts: self.rule_counts.clone(),
522            group_by,
523            entities,
524            refs,
525            results,
526        }
527    }
528}
529
530/// The deterministic group fingerprint and the resolved key values for a
531/// `group_by`-mode result. The rule identity is deliberately excluded so an
532/// incident can span rules that share the group key.
533pub(crate) fn group_fingerprint(
534    selectors: &[Selector],
535    result: &EvaluationResult,
536) -> (String, Vec<(String, Value)>) {
537    let mut buf = String::with_capacity(64);
538    let mut key = Vec::with_capacity(selectors.len());
539    for sel in selectors {
540        let value = sel.resolve(result).unwrap_or(Value::Null);
541        buf.push('\u{1f}');
542        buf.push_str(&sel.as_str());
543        buf.push('=');
544        match &value {
545            Value::String(s) => buf.push_str(s),
546            other => buf.push_str(&other.to_string()),
547        }
548        key.push((sel.as_str(), value));
549    }
550    (format!("{:016x}", fnv1a64(buf.as_bytes())), key)
551}
552
553/// Rank for severity comparison (`Level` is not `Ord`).
554fn level_rank(level: Level) -> u8 {
555    match level {
556        Level::Informational => 0,
557        Level::Low => 1,
558        Level::Medium => 2,
559        Level::High => 3,
560        Level::Critical => 4,
561    }
562}
563
564/// The higher-severity of two optional levels.
565fn max_level(a: Option<Level>, b: Option<Level>) -> Option<Level> {
566    match (a, b) {
567        (Some(x), Some(y)) => Some(if level_rank(x) >= level_rank(y) { x } else { y }),
568        (Some(x), None) => Some(x),
569        (None, b) => b,
570    }
571}
572
#[cfg(test)]
mod tests {
    use super::*;
    use rsigma_eval::{DetectionBody, EvaluationResult, FieldMatch, ResultBody, RuleHeader};
    use std::collections::HashMap;
    use std::sync::Arc;

    /// Build a detection for `rule` (used as both title and id) whose
    /// matched fields are `SourceIp = ip` and `User = user`.
    fn detection(rule: &str, ip: &str, user: &str, level: Level) -> EvaluationResult {
        let header = RuleHeader {
            rule_title: rule.to_string(),
            rule_id: Some(rule.to_string()),
            level: Some(level),
            tags: vec![],
            custom_attributes: Arc::new(HashMap::new()),
            enrichments: None,
        };
        let body = DetectionBody {
            matched_selections: vec![],
            matched_fields: vec![
                FieldMatch::new("SourceIp", serde_json::json!(ip)),
                FieldMatch::new("User", serde_json::json!(user)),
            ],
            event: None,
        };
        EvaluationResult {
            header,
            body: ResultBody::Detection(body),
        }
    }

    /// Assign `result` at time `now`, ignoring guard callbacks.
    fn assign_quiet(
        store: &mut IncidentStore,
        cfg: &GroupConfig,
        result: EvaluationResult,
        now: i64,
    ) -> Option<String> {
        store.assign(cfg, &result, now, |_| {})
    }

    /// Assign `result` at time `now`, adding one to `guard_hits` for every
    /// guard callback.
    fn assign_counted(
        store: &mut IncidentStore,
        cfg: &GroupConfig,
        result: EvaluationResult,
        now: i64,
        guard_hits: &mut usize,
    ) -> Option<String> {
        store.assign(cfg, &result, now, |_| *guard_hits += 1)
    }

    /// `group_by` configuration keyed on `match.SourceIp`.
    fn group_by_cfg() -> GroupConfig {
        GroupConfig {
            mode: GroupMode::GroupBy,
            by: vec![Selector::parse("match.SourceIp").unwrap()],
            entities: vec![],
            group_wait: Duration::from_secs(30),
            group_interval: Duration::from_secs(300),
            repeat_interval: Duration::from_secs(0),
            resolve_timeout: Duration::from_secs(3600),
            include: IncludeMode::Refs,
            caps: Caps::default(),
            stop_values: BTreeSet::new(),
            nats_subject: None,
        }
    }

    /// `entity_graph` configuration joining on `match.SourceIp` and
    /// `match.User`, with no `group_wait`; all other settings match
    /// `group_by_cfg`.
    fn entity_cfg() -> GroupConfig {
        GroupConfig {
            mode: GroupMode::EntityGraph,
            by: vec![],
            entities: vec![
                Selector::parse("match.SourceIp").unwrap(),
                Selector::parse("match.User").unwrap(),
            ],
            group_wait: Duration::from_secs(0),
            ..group_by_cfg()
        }
    }

    #[test]
    fn group_by_assigns_same_key_to_one_incident() {
        let cfg = group_by_cfg();
        let mut store = IncidentStore::default();

        let first = detection("r1", "10.0.0.1", "alice", Level::High);
        let same_ip = detection("r2", "10.0.0.1", "bob", Level::Low);
        let other_ip = detection("r1", "10.0.0.2", "carol", Level::High);

        let a = assign_quiet(&mut store, &cfg, first, 0);
        let b = assign_quiet(&mut store, &cfg, same_ip, 1);
        let c = assign_quiet(&mut store, &cfg, other_ip, 2);

        assert_eq!(a, b, "same SourceIp groups together across rules");
        assert_ne!(a, c, "different SourceIp is a separate incident");
        assert_eq!(store.len(), 2);
    }

    #[test]
    fn group_by_fingerprint_is_deterministic() {
        let cfg = group_by_cfg();
        let mut s1 = IncidentStore::default();
        let mut s2 = IncidentStore::default();

        let id1 = assign_quiet(&mut s1, &cfg, detection("r", "1.2.3.4", "x", Level::High), 0);
        let id2 = assign_quiet(&mut s2, &cfg, detection("r", "1.2.3.4", "y", Level::Low), 9);

        assert_eq!(id1, id2, "same key yields the same id across stores");
    }

    #[test]
    fn group_wait_then_resolve() {
        let cfg = group_by_cfg();
        let mut store = IncidentStore::default();
        assign_quiet(&mut store, &cfg, detection("r", "10.0.0.1", "a", Level::High), 0);

        let early = store.tick(&cfg, 10);
        assert!(early.is_empty(), "before group_wait");

        let opened = store.tick(&cfg, 40);
        assert_eq!(opened.len(), 1);
        assert_eq!(opened[0].trigger, "group_wait");
        assert_eq!(opened[0].result.state, "open");

        let resolved = store.tick(&cfg, 40 + 3600);
        assert_eq!(resolved.len(), 1);
        assert_eq!(resolved[0].trigger, "resolved");
        assert!(store.is_empty());
    }

    #[test]
    fn entity_graph_merges_via_shared_value() {
        let cfg = entity_cfg();
        let mut store = IncidentStore::default();

        // A: ip1 + alice
        let a = assign_quiet(
            &mut store,
            &cfg,
            detection("r", "10.0.0.1", "alice", Level::High),
            0,
        )
        .unwrap();
        // B: ip2 + bob (separate)
        let b = assign_quiet(&mut store, &cfg, detection("r", "10.0.0.2", "bob", Level::Low), 1)
            .unwrap();
        assert_ne!(a, b);
        assert_eq!(store.len(), 2);

        // C: ip2 + alice -> bridges A and B into one incident.
        let c = assign_quiet(
            &mut store,
            &cfg,
            detection("r", "10.0.0.2", "alice", Level::Medium),
            2,
        )
        .unwrap();
        assert_eq!(store.len(), 1, "the bridge merged the two incidents");
        assert!(c == a || c == b);
    }

    #[test]
    fn entity_graph_stop_value_does_not_join() {
        let mut cfg = entity_cfg();
        cfg.stop_values.insert("0.0.0.0".to_string());
        let mut store = IncidentStore::default();
        let mut guards = 0;

        // Two results sharing only the stop value 0.0.0.0 must NOT merge.
        for (now, user) in [(0, "alice"), (1, "bob")] {
            let result = detection("r", "0.0.0.0", user, Level::High);
            assign_counted(&mut store, &cfg, result, now, &mut guards);
        }

        assert_eq!(store.len(), 2, "stop value must not bridge incidents");
        assert!(guards >= 2, "stop-value guard fired");
    }

    #[test]
    fn cardinality_counter_freed_after_resolve() {
        // A value's occurrence counter must not leak: once its incident
        // resolves, the counter is dropped so the value can join fresh.
        let mut cfg = entity_cfg();
        cfg.caps.max_value_cardinality = 2;
        cfg.entities = vec![Selector::parse("match.SourceIp").unwrap()];
        let mut store = IncidentStore::default();
        let mut guards = 0;

        for (now, user) in [(0, "a"), (1, "b")] {
            let result = detection("r", "10.0.0.9", user, Level::High);
            assign_counted(&mut store, &cfg, result, now, &mut guards);
        }
        assert_eq!(guards, 0, "two occurrences are within the ceiling");

        store.tick(&cfg, 0); // group_wait 0 opens the incident
        store.tick(&cfg, 5000); // past resolve_timeout
        assert!(store.is_empty());

        // The counter for 10.0.0.9 was freed on resolve, so a new occurrence
        // joins fresh rather than tripping the ceiling.
        let late = detection("r", "10.0.0.9", "c", Level::High);
        assign_counted(&mut store, &cfg, late, 6000, &mut guards);
        assert_eq!(guards, 0, "counter reset after resolve");
        assert_eq!(store.len(), 1);
    }

    #[test]
    fn entity_graph_cardinality_ceiling_stops_joining() {
        let mut cfg = entity_cfg();
        cfg.caps.max_value_cardinality = 1;
        cfg.entities = vec![Selector::parse("match.SourceIp").unwrap()];
        let mut store = IncidentStore::default();
        let mut guards = 0;

        // First occurrence joins; subsequent ones exceed the ceiling and stop.
        for (now, user) in [(0, "a"), (1, "b"), (2, "c")] {
            let result = detection("r", "10.0.0.9", user, Level::High);
            assign_counted(&mut store, &cfg, result, now, &mut guards);
        }

        assert!(guards >= 2, "cardinality guard fired after the ceiling");
        // The 2nd and 3rd did not join the 1st, so they each opened their own.
        assert_eq!(store.len(), 3);
    }
}