1use std::collections::{BTreeMap, BTreeSet, HashMap};
21use std::time::Duration;
22
23use rsigma_eval::EvaluationResult;
24use rsigma_parser::Level;
25use serde::{Deserialize, Serialize};
26use serde_json::Value;
27
28use super::dedup::fnv1a64;
29use super::strip_event_payloads;
30use crate::selector::Selector;
31
/// Strategy used by `IncidentStore::assign` to place a result into an incident.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum GroupMode {
    /// Results whose `by` selectors resolve to the same values share one incident.
    GroupBy,
    /// Results are linked into one incident when they share an entity value.
    EntityGraph,
}
40
/// Which per-result detail an incident carries in its emitted payload.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum IncludeMode {
    /// Compact `(rule, level)` references.
    Refs,
    /// Full result samples serialized to JSON, with event payloads stripped.
    Results,
}
49
/// Upper bounds that keep incident state from growing without limit.
#[derive(Clone, Copy, Debug)]
pub struct Caps {
    /// Maximum number of incidents held at once; a result that would need a new incident
    /// beyond this is not assigned.
    pub max_open_incidents: usize,
    /// Maximum number of distinct values stored per entity selector within one incident.
    pub max_entities_per_incident: usize,
    /// Maximum number of refs / result samples retained per incident.
    pub max_results_per_incident: usize,
    /// How many times one `(selector, value)` may be observed before it stops linking results.
    pub max_value_cardinality: u64,
}

impl Default for Caps {
    fn default() -> Self {
        Self {
            max_open_incidents: 10_000,
            max_entities_per_incident: 1_000,
            max_results_per_incident: 1_000,
            max_value_cardinality: 10_000,
        }
    }
}
74
75#[derive(Debug, Clone)]
77pub struct GroupConfig {
78 pub mode: GroupMode,
79 pub by: Vec<Selector>,
81 pub entities: Vec<Selector>,
83 pub group_wait: Duration,
84 pub group_interval: Duration,
85 pub repeat_interval: Duration,
86 pub resolve_timeout: Duration,
87 pub include: IncludeMode,
88 pub caps: Caps,
89 pub stop_values: BTreeSet<String>,
91 pub nats_subject: Option<String>,
93}
94
95#[derive(Debug, Clone, Serialize, Deserialize)]
97pub struct IncidentRef {
98 pub rule: String,
100 #[serde(skip_serializing_if = "Option::is_none")]
102 pub level: Option<String>,
103}
104
105#[derive(Debug, Clone, Serialize)]
108pub struct IncidentResult {
109 pub incident_id: String,
112 pub state: &'static str,
114 pub trigger: &'static str,
117 pub first_seen: i64,
119 pub last_seen: i64,
120 #[serde(skip_serializing_if = "Option::is_none")]
122 pub max_level: Option<String>,
123 pub result_count: u64,
125 pub rule_counts: BTreeMap<String, u64>,
127 #[serde(skip_serializing_if = "serde_json::Map::is_empty")]
129 pub group_by: serde_json::Map<String, Value>,
130 #[serde(skip_serializing_if = "serde_json::Map::is_empty")]
132 pub entities: serde_json::Map<String, Value>,
133 #[serde(skip_serializing_if = "Option::is_none")]
135 pub refs: Option<Vec<IncidentRef>>,
136 #[serde(skip_serializing_if = "Option::is_none")]
139 pub results: Option<Vec<Value>>,
140}
141
142#[derive(Debug, Clone, Serialize, Deserialize)]
147pub(crate) struct Incident {
148 id: String,
149 first_seen: i64,
150 last_seen: i64,
151 last_emitted: i64,
152 emitted_count: u64,
153 opened: bool,
154 dirty: bool,
155 group_by: Vec<(String, Value)>,
156 entities: BTreeMap<String, BTreeSet<String>>,
157 max_level: Option<Level>,
158 rule_counts: BTreeMap<String, u64>,
159 result_count: u64,
160 refs: Vec<IncidentRef>,
161 results: Vec<Value>,
162}
163
164pub(crate) struct IncidentEmission {
166 pub trigger: &'static str,
167 pub result: IncidentResult,
168}
169
170#[derive(Debug, Default)]
173pub struct IncidentStore {
174 incidents: HashMap<String, Incident>,
175 entity_index: HashMap<(String, String), String>,
177 value_counts: HashMap<(String, String), u64>,
179}
180
/// Reason an entity value was refused for linking in EntityGraph mode.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum OvermergeGuard {
    /// The value is listed in `GroupConfig::stop_values`.
    StopValue,
    /// The value was observed more than `Caps::max_value_cardinality` times.
    CardinalityCeiling,
}
188
189impl IncidentStore {
190 pub fn len(&self) -> usize {
192 self.incidents.len()
193 }
194
195 pub fn is_empty(&self) -> bool {
197 self.incidents.is_empty()
198 }
199
200 pub fn snapshot(&self, include: IncludeMode) -> Vec<IncidentResult> {
202 self.incidents
203 .values()
204 .map(|inc| inc.to_result("open", "snapshot", include))
205 .collect()
206 }
207
208 pub(crate) fn assign(
211 &mut self,
212 cfg: &GroupConfig,
213 result: &EvaluationResult,
214 now: i64,
215 mut on_guard: impl FnMut(OvermergeGuard),
216 ) -> Option<String> {
217 match cfg.mode {
218 GroupMode::GroupBy => self.assign_group_by(cfg, result, now),
219 GroupMode::EntityGraph => self.assign_entity_graph(cfg, result, now, &mut on_guard),
220 }
221 }
222
223 fn assign_group_by(
224 &mut self,
225 cfg: &GroupConfig,
226 result: &EvaluationResult,
227 now: i64,
228 ) -> Option<String> {
229 let (id, key) = group_fingerprint(&cfg.by, result);
230 let exists = self.incidents.contains_key(&id);
231 if !exists && self.incidents.len() >= cfg.caps.max_open_incidents {
232 return None;
233 }
234 let incident = self
235 .incidents
236 .entry(id.clone())
237 .or_insert_with(|| Incident::new(id.clone(), now, key.clone()));
238 incident.absorb(result, now, cfg);
239 Some(id)
240 }
241
242 fn assign_entity_graph(
243 &mut self,
244 cfg: &GroupConfig,
245 result: &EvaluationResult,
246 now: i64,
247 on_guard: &mut impl FnMut(OvermergeGuard),
248 ) -> Option<String> {
249 let mut pairs: Vec<(String, String)> = Vec::new();
252 for sel in &cfg.entities {
253 let Some(value) = sel.resolve(result) else {
254 continue;
255 };
256 let value = match value {
257 Value::String(s) => s,
258 other => other.to_string(),
259 };
260 if cfg.stop_values.contains(&value) {
261 on_guard(OvermergeGuard::StopValue);
262 continue;
263 }
264 let key = (sel.as_str(), value);
265 let count = self.value_counts.entry(key.clone()).or_insert(0);
266 *count += 1;
267 if *count > cfg.caps.max_value_cardinality {
268 on_guard(OvermergeGuard::CardinalityCeiling);
269 continue;
270 }
271 pairs.push(key);
272 }
273
274 let mut touched: Vec<String> = Vec::new();
276 for pair in &pairs {
277 if let Some(id) = self.entity_index.get(pair)
278 && !touched.contains(id)
279 {
280 touched.push(id.clone());
281 }
282 }
283
284 let survivor_id = match touched.first().cloned() {
285 None => {
286 if self.incidents.len() >= cfg.caps.max_open_incidents {
287 return None;
288 }
289 let id = uuid::Uuid::new_v4().to_string();
290 self.incidents
291 .insert(id.clone(), Incident::new(id.clone(), now, Vec::new()));
292 id
293 }
294 Some(survivor) => {
295 for other in touched.iter().skip(1) {
297 self.merge(&survivor, other, cfg.caps.max_results_per_incident);
298 }
299 survivor
300 }
301 };
302
303 if let Some(incident) = self.incidents.get_mut(&survivor_id) {
305 for (sel, value) in &pairs {
306 let set = incident.entities.entry(sel.clone()).or_default();
307 if set.len() < cfg.caps.max_entities_per_incident {
308 set.insert(value.clone());
309 }
310 }
311 incident.absorb(result, now, cfg);
312 }
313 for pair in pairs {
314 self.entity_index.insert(pair, survivor_id.clone());
315 }
316 Some(survivor_id)
317 }
318
319 fn merge(&mut self, survivor: &str, other: &str, max_results: usize) {
321 if survivor == other {
322 return;
323 }
324 let Some(mut victim) = self.incidents.remove(other) else {
325 return;
326 };
327 for (sel, values) in &victim.entities {
329 for value in values {
330 self.entity_index
331 .insert((sel.clone(), value.clone()), survivor.to_string());
332 }
333 }
334 if let Some(inc) = self.incidents.get_mut(survivor) {
335 inc.first_seen = inc.first_seen.min(victim.first_seen);
336 inc.last_seen = inc.last_seen.max(victim.last_seen);
337 inc.result_count += victim.result_count;
338 inc.dirty = true;
339 inc.max_level = max_level(inc.max_level, victim.max_level);
340 for (rule, count) in victim.rule_counts {
341 *inc.rule_counts.entry(rule).or_insert(0) += count;
342 }
343 for (sel, values) in victim.entities {
344 inc.entities.entry(sel).or_default().extend(values);
345 }
346 inc.refs.append(&mut victim.refs);
347 inc.results.append(&mut victim.results);
348 inc.refs.truncate(max_results);
350 inc.results.truncate(max_results);
351 }
352 }
353
354 pub(crate) fn tick(&mut self, cfg: &GroupConfig, now: i64) -> Vec<IncidentEmission> {
357 let group_wait = cfg.group_wait.as_secs() as i64;
358 let group_interval = cfg.group_interval.as_secs() as i64;
359 let repeat = cfg.repeat_interval.as_secs() as i64;
360 let resolve = cfg.resolve_timeout.as_secs() as i64;
361
362 let mut out = Vec::new();
363 let mut resolved = Vec::new();
364 for (id, inc) in self.incidents.iter_mut() {
365 if !inc.opened {
366 if now - inc.first_seen >= group_wait {
367 out.push(IncidentEmission {
368 trigger: "group_wait",
369 result: inc.to_result("open", "group_wait", cfg.include),
370 });
371 inc.opened = true;
372 inc.dirty = false;
373 inc.last_emitted = now;
374 inc.emitted_count = inc.result_count;
375 }
376 continue;
377 }
378 if now - inc.last_seen >= resolve {
379 out.push(IncidentEmission {
380 trigger: "resolved",
381 result: inc.to_result("resolved", "resolved", cfg.include),
382 });
383 resolved.push(id.clone());
384 } else if inc.dirty && now - inc.last_emitted >= group_interval {
385 out.push(IncidentEmission {
386 trigger: "group_interval",
387 result: inc.to_result("open", "group_interval", cfg.include),
388 });
389 inc.dirty = false;
390 inc.last_emitted = now;
391 inc.emitted_count = inc.result_count;
392 } else if repeat > 0 && now - inc.last_emitted >= repeat {
393 out.push(IncidentEmission {
394 trigger: "repeat",
395 result: inc.to_result("open", "repeat", cfg.include),
396 });
397 inc.last_emitted = now;
398 }
399 }
400 for id in resolved {
401 if let Some(inc) = self.incidents.remove(&id) {
402 for (sel, values) in inc.entities {
406 for value in values {
407 let key = (sel.clone(), value);
408 self.entity_index.remove(&key);
409 self.value_counts.remove(&key);
410 }
411 }
412 }
413 }
414 out
415 }
416
417 pub(crate) fn export(&self) -> Vec<Incident> {
419 self.incidents.values().cloned().collect()
420 }
421
422 pub(crate) fn restore(&mut self, incidents: Vec<Incident>, now: i64, resolve_secs: i64) {
426 for inc in incidents {
427 if now - inc.last_seen >= resolve_secs {
428 continue;
429 }
430 for (sel, values) in &inc.entities {
431 for value in values {
432 self.entity_index
433 .insert((sel.clone(), value.clone()), inc.id.clone());
434 }
435 }
436 self.incidents.insert(inc.id.clone(), inc);
437 }
438 }
439}
440
441impl Incident {
442 fn new(id: String, now: i64, group_by: Vec<(String, Value)>) -> Self {
443 Incident {
444 id,
445 first_seen: now,
446 last_seen: now,
447 last_emitted: now,
448 emitted_count: 0,
449 opened: false,
450 dirty: true,
451 group_by,
452 entities: BTreeMap::new(),
453 max_level: None,
454 rule_counts: BTreeMap::new(),
455 result_count: 0,
456 refs: Vec::new(),
457 results: Vec::new(),
458 }
459 }
460
461 fn absorb(&mut self, result: &EvaluationResult, now: i64, cfg: &GroupConfig) {
462 self.last_seen = now;
463 self.dirty = true;
464 self.result_count += 1;
465 self.max_level = max_level(self.max_level, result.header.level);
466 let rule = result
467 .header
468 .rule_id
469 .clone()
470 .unwrap_or_else(|| result.header.rule_title.clone());
471 *self.rule_counts.entry(rule.clone()).or_insert(0) += 1;
472 match cfg.include {
473 IncludeMode::Refs => {
474 if self.refs.len() < cfg.caps.max_results_per_incident {
475 self.refs.push(IncidentRef {
476 rule,
477 level: result.header.level.map(|l| l.as_str().to_string()),
478 });
479 }
480 }
481 IncludeMode::Results => {
482 if self.results.len() < cfg.caps.max_results_per_incident {
483 let mut sample = result.clone();
484 strip_event_payloads(&mut sample);
485 self.results
486 .push(serde_json::to_value(&sample).unwrap_or(Value::Null));
487 }
488 }
489 }
490 }
491
492 fn to_result(
493 &self,
494 state: &'static str,
495 trigger: &'static str,
496 include: IncludeMode,
497 ) -> IncidentResult {
498 let group_by: serde_json::Map<String, Value> = self.group_by.iter().cloned().collect();
499 let entities: serde_json::Map<String, Value> = self
500 .entities
501 .iter()
502 .map(|(sel, values)| {
503 (
504 sel.clone(),
505 Value::Array(values.iter().cloned().map(Value::String).collect()),
506 )
507 })
508 .collect();
509 let (refs, results) = match include {
510 IncludeMode::Refs => (Some(self.refs.clone()), None),
511 IncludeMode::Results => (None, Some(self.results.clone())),
512 };
513 IncidentResult {
514 incident_id: self.id.clone(),
515 state,
516 trigger,
517 first_seen: self.first_seen,
518 last_seen: self.last_seen,
519 max_level: self.max_level.map(|l| l.as_str().to_string()),
520 result_count: self.result_count,
521 rule_counts: self.rule_counts.clone(),
522 group_by,
523 entities,
524 refs,
525 results,
526 }
527 }
528}
529
530pub(crate) fn group_fingerprint(
534 selectors: &[Selector],
535 result: &EvaluationResult,
536) -> (String, Vec<(String, Value)>) {
537 let mut buf = String::with_capacity(64);
538 let mut key = Vec::with_capacity(selectors.len());
539 for sel in selectors {
540 let value = sel.resolve(result).unwrap_or(Value::Null);
541 buf.push('\u{1f}');
542 buf.push_str(&sel.as_str());
543 buf.push('=');
544 match &value {
545 Value::String(s) => buf.push_str(s),
546 other => buf.push_str(&other.to_string()),
547 }
548 key.push((sel.as_str(), value));
549 }
550 (format!("{:016x}", fnv1a64(buf.as_bytes())), key)
551}
552
553fn level_rank(level: Level) -> u8 {
555 match level {
556 Level::Informational => 0,
557 Level::Low => 1,
558 Level::Medium => 2,
559 Level::High => 3,
560 Level::Critical => 4,
561 }
562}
563
564fn max_level(a: Option<Level>, b: Option<Level>) -> Option<Level> {
566 match (a, b) {
567 (Some(x), Some(y)) => Some(if level_rank(x) >= level_rank(y) { x } else { y }),
568 (Some(x), None) => Some(x),
569 (None, b) => b,
570 }
571}
572
#[cfg(test)]
mod tests {
    use super::*;
    use rsigma_eval::{DetectionBody, EvaluationResult, FieldMatch, ResultBody, RuleHeader};
    use std::collections::HashMap;
    use std::sync::Arc;

    /// Builds a detection for `rule` whose matched fields carry `SourceIp` and `User`.
    fn detection(rule: &str, ip: &str, user: &str, level: Level) -> EvaluationResult {
        let header = RuleHeader {
            rule_title: rule.to_string(),
            rule_id: Some(rule.to_string()),
            level: Some(level),
            tags: vec![],
            custom_attributes: Arc::new(HashMap::new()),
            enrichments: None,
        };
        let body = ResultBody::Detection(DetectionBody {
            matched_selections: vec![],
            matched_fields: vec![
                FieldMatch::new("SourceIp", serde_json::json!(ip)),
                FieldMatch::new("User", serde_json::json!(user)),
            ],
            event: None,
        });
        EvaluationResult { header, body }
    }

    /// Assigns `result` at `now`, adding one to `guards` for every overmerge-guard callback.
    fn feed(
        store: &mut IncidentStore,
        cfg: &GroupConfig,
        result: &EvaluationResult,
        now: i64,
        guards: &mut usize,
    ) -> Option<String> {
        store.assign(cfg, result, now, |_| *guards += 1)
    }

    /// GroupBy on `match.SourceIp`: 30s group_wait, 300s group_interval, 1h resolve.
    fn group_by_cfg() -> GroupConfig {
        GroupConfig {
            mode: GroupMode::GroupBy,
            by: vec![Selector::parse("match.SourceIp").unwrap()],
            entities: vec![],
            group_wait: Duration::from_secs(30),
            group_interval: Duration::from_secs(300),
            repeat_interval: Duration::from_secs(0),
            resolve_timeout: Duration::from_secs(3600),
            include: IncludeMode::Refs,
            caps: Caps::default(),
            stop_values: BTreeSet::new(),
            nats_subject: None,
        }
    }

    /// EntityGraph over `match.SourceIp` and `match.User` with no group_wait; every other
    /// knob matches `group_by_cfg`.
    fn entity_cfg() -> GroupConfig {
        GroupConfig {
            mode: GroupMode::EntityGraph,
            by: vec![],
            entities: vec![
                Selector::parse("match.SourceIp").unwrap(),
                Selector::parse("match.User").unwrap(),
            ],
            group_wait: Duration::from_secs(0),
            ..group_by_cfg()
        }
    }

    #[test]
    fn group_by_assigns_same_key_to_one_incident() {
        let cfg = group_by_cfg();
        let mut store = IncidentStore::default();
        let mut guards = 0;
        let first = detection("r1", "10.0.0.1", "alice", Level::High);
        let same_ip = detection("r2", "10.0.0.1", "bob", Level::Low);
        let other_ip = detection("r1", "10.0.0.2", "carol", Level::High);

        let a = feed(&mut store, &cfg, &first, 0, &mut guards);
        let b = feed(&mut store, &cfg, &same_ip, 1, &mut guards);
        let c = feed(&mut store, &cfg, &other_ip, 2, &mut guards);

        assert_eq!(a, b, "same SourceIp groups together across rules");
        assert_ne!(a, c, "different SourceIp is a separate incident");
        assert_eq!(store.len(), 2);
    }

    #[test]
    fn group_by_fingerprint_is_deterministic() {
        let cfg = group_by_cfg();
        let mut guards = 0;
        let mut s1 = IncidentStore::default();
        let mut s2 = IncidentStore::default();
        let high = detection("r", "1.2.3.4", "x", Level::High);
        let low = detection("r", "1.2.3.4", "y", Level::Low);

        let id1 = feed(&mut s1, &cfg, &high, 0, &mut guards);
        let id2 = feed(&mut s2, &cfg, &low, 9, &mut guards);

        assert_eq!(id1, id2, "same key yields the same id across stores");
    }

    #[test]
    fn group_wait_then_resolve() {
        let cfg = group_by_cfg();
        let mut store = IncidentStore::default();
        let mut guards = 0;
        let hit = detection("r", "10.0.0.1", "a", Level::High);
        feed(&mut store, &cfg, &hit, 0, &mut guards);

        assert!(store.tick(&cfg, 10).is_empty(), "before group_wait");

        let opened = store.tick(&cfg, 40);
        assert_eq!(opened.len(), 1);
        assert_eq!(opened[0].trigger, "group_wait");
        assert_eq!(opened[0].result.state, "open");

        let resolved = store.tick(&cfg, 40 + 3600);
        assert_eq!(resolved.len(), 1);
        assert_eq!(resolved[0].trigger, "resolved");
        assert!(store.is_empty());
    }

    #[test]
    fn entity_graph_merges_via_shared_value() {
        let cfg = entity_cfg();
        let mut store = IncidentStore::default();
        let mut guards = 0;
        let alice = detection("r", "10.0.0.1", "alice", Level::High);
        let bob = detection("r", "10.0.0.2", "bob", Level::Low);
        let bridge = detection("r", "10.0.0.2", "alice", Level::Medium);

        let a = feed(&mut store, &cfg, &alice, 0, &mut guards).unwrap();
        let b = feed(&mut store, &cfg, &bob, 1, &mut guards).unwrap();
        assert_ne!(a, b);
        assert_eq!(store.len(), 2);

        let c = feed(&mut store, &cfg, &bridge, 2, &mut guards).unwrap();
        assert_eq!(store.len(), 1, "the bridge merged the two incidents");
        assert!(c == a || c == b);
    }

    #[test]
    fn entity_graph_stop_value_does_not_join() {
        let mut cfg = entity_cfg();
        cfg.stop_values.insert("0.0.0.0".to_string());
        let mut store = IncidentStore::default();
        let mut guards = 0;
        let alice = detection("r", "0.0.0.0", "alice", Level::High);
        let bob = detection("r", "0.0.0.0", "bob", Level::High);

        feed(&mut store, &cfg, &alice, 0, &mut guards);
        feed(&mut store, &cfg, &bob, 1, &mut guards);

        assert_eq!(store.len(), 2, "stop value must not bridge incidents");
        assert!(guards >= 2, "stop-value guard fired");
    }

    #[test]
    fn cardinality_counter_freed_after_resolve() {
        let mut cfg = entity_cfg();
        cfg.caps.max_value_cardinality = 2;
        cfg.entities = vec![Selector::parse("match.SourceIp").unwrap()];
        let mut store = IncidentStore::default();
        let mut guards = 0;

        feed(&mut store, &cfg, &detection("r", "10.0.0.9", "a", Level::High), 0, &mut guards);
        feed(&mut store, &cfg, &detection("r", "10.0.0.9", "b", Level::High), 1, &mut guards);
        assert_eq!(guards, 0, "two occurrences are within the ceiling");

        store.tick(&cfg, 0);
        store.tick(&cfg, 5000);
        assert!(store.is_empty());

        feed(&mut store, &cfg, &detection("r", "10.0.0.9", "c", Level::High), 6000, &mut guards);
        assert_eq!(guards, 0, "counter reset after resolve");
        assert_eq!(store.len(), 1);
    }

    #[test]
    fn entity_graph_cardinality_ceiling_stops_joining() {
        let mut cfg = entity_cfg();
        cfg.caps.max_value_cardinality = 1;
        cfg.entities = vec![Selector::parse("match.SourceIp").unwrap()];
        let mut store = IncidentStore::default();
        let mut guards = 0;

        for (now, user) in [(0, "a"), (1, "b"), (2, "c")] {
            let hit = detection("r", "10.0.0.9", user, Level::High);
            feed(&mut store, &cfg, &hit, now, &mut guards);
        }

        assert!(guards >= 2, "cardinality guard fired after the ceiling");
        assert_eq!(store.len(), 3);
    }
}