1use std::collections::HashMap;
16
17use chrono::{DateTime, TimeZone, Utc};
18use serde::Serialize;
19
20use rsigma_parser::{CorrelationRule, CorrelationType, Level, SigmaCollection, SigmaRule};
21
22use crate::correlation::{
23 CompiledCorrelation, EventBuffer, EventRef, EventRefBuffer, GroupKey, WindowState,
24 compile_correlation,
25};
26use crate::engine::Engine;
27use crate::error::{EvalError, Result};
28use crate::event::Event;
29use crate::pipeline::{Pipeline, apply_pipelines};
30use crate::result::MatchResult;
31
32#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize)]
41#[serde(rename_all = "snake_case")]
42pub enum CorrelationAction {
43 #[default]
46 Alert,
47 Reset,
50}
51
52impl std::str::FromStr for CorrelationAction {
53 type Err = String;
54 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
55 match s {
56 "alert" => Ok(CorrelationAction::Alert),
57 "reset" => Ok(CorrelationAction::Reset),
58 _ => Err(format!(
59 "Unknown correlation action: {s} (expected 'alert' or 'reset')"
60 )),
61 }
62 }
63}
64
65#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize)]
70#[serde(rename_all = "snake_case")]
71pub enum CorrelationEventMode {
72 #[default]
74 None,
75 Full,
78 Refs,
81}
82
83impl std::str::FromStr for CorrelationEventMode {
84 type Err = String;
85 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
86 match s.to_lowercase().as_str() {
87 "none" | "off" | "false" => Ok(CorrelationEventMode::None),
88 "full" | "true" => Ok(CorrelationEventMode::Full),
89 "refs" | "references" => Ok(CorrelationEventMode::Refs),
90 _ => Err(format!(
91 "Unknown correlation event mode: {s} (expected 'none', 'full', or 'refs')"
92 )),
93 }
94 }
95}
96
97impl std::fmt::Display for CorrelationEventMode {
98 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
99 match self {
100 CorrelationEventMode::None => write!(f, "none"),
101 CorrelationEventMode::Full => write!(f, "full"),
102 CorrelationEventMode::Refs => write!(f, "refs"),
103 }
104 }
105}
106
/// What to do when none of the configured timestamp fields yields a usable
/// timestamp for an event.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum TimestampFallback {
    /// Use the current wall-clock time (`Utc::now()`) as the event time
    /// (the default).
    #[default]
    WallClock,
    /// Still evaluate detection rules, but leave correlation state untouched
    /// and report no correlations for the event.
    Skip,
}
118
119#[derive(Debug, Clone)]
125pub struct CorrelationConfig {
126 pub timestamp_fields: Vec<String>,
131
132 pub timestamp_fallback: TimestampFallback,
136
137 pub max_state_entries: usize,
142
143 pub suppress: Option<u64>,
151
152 pub action_on_match: CorrelationAction,
156
157 pub emit_detections: bool,
163
164 pub correlation_event_mode: CorrelationEventMode,
172
173 pub max_correlation_events: usize,
179}
180
181impl Default for CorrelationConfig {
182 fn default() -> Self {
183 CorrelationConfig {
184 timestamp_fields: vec![
185 "@timestamp".to_string(),
186 "timestamp".to_string(),
187 "EventTime".to_string(),
188 "TimeCreated".to_string(),
189 "eventTime".to_string(),
190 ],
191 timestamp_fallback: TimestampFallback::default(),
192 max_state_entries: 100_000,
193 suppress: None,
194 action_on_match: CorrelationAction::default(),
195 emit_detections: true,
196 correlation_event_mode: CorrelationEventMode::default(),
197 max_correlation_events: 10,
198 }
199 }
200}
201
202#[derive(Debug, Clone, Serialize)]
208pub struct ProcessResult {
209 pub detections: Vec<MatchResult>,
211 pub correlations: Vec<CorrelationResult>,
213}
214
215#[derive(Debug, Clone, Serialize)]
217pub struct CorrelationResult {
218 pub rule_title: String,
220 pub rule_id: Option<String>,
222 pub level: Option<Level>,
224 pub tags: Vec<String>,
226 pub correlation_type: CorrelationType,
228 pub group_key: Vec<(String, String)>,
230 pub aggregated_value: f64,
232 pub timespan_secs: u64,
234 #[serde(skip_serializing_if = "Option::is_none")]
239 pub events: Option<Vec<serde_json::Value>>,
240 #[serde(skip_serializing_if = "Option::is_none")]
244 pub event_refs: Option<Vec<EventRef>>,
245}
246
247pub struct CorrelationEngine {
256 engine: Engine,
258 correlations: Vec<CompiledCorrelation>,
260 rule_index: HashMap<String, Vec<usize>>,
263 rule_ids: Vec<(Option<String>, Option<String>)>,
266 state: HashMap<(usize, GroupKey), WindowState>,
268 last_alert: HashMap<(usize, GroupKey), i64>,
270 event_buffers: HashMap<(usize, GroupKey), EventBuffer>,
272 event_ref_buffers: HashMap<(usize, GroupKey), EventRefBuffer>,
274 correlation_only_rules: std::collections::HashSet<String>,
278 config: CorrelationConfig,
280 pipelines: Vec<Pipeline>,
282}
283
284impl CorrelationEngine {
285 pub fn new(config: CorrelationConfig) -> Self {
287 CorrelationEngine {
288 engine: Engine::new(),
289 correlations: Vec::new(),
290 rule_index: HashMap::new(),
291 rule_ids: Vec::new(),
292 state: HashMap::new(),
293 last_alert: HashMap::new(),
294 event_buffers: HashMap::new(),
295 event_ref_buffers: HashMap::new(),
296 correlation_only_rules: std::collections::HashSet::new(),
297 config,
298 pipelines: Vec::new(),
299 }
300 }
301
302 pub fn add_pipeline(&mut self, pipeline: Pipeline) {
306 self.pipelines.push(pipeline);
307 self.pipelines.sort_by_key(|p| p.priority);
308 }
309
310 pub fn set_include_event(&mut self, include: bool) {
312 self.engine.set_include_event(include);
313 }
314
315 pub fn set_correlation_event_mode(&mut self, mode: CorrelationEventMode) {
321 self.config.correlation_event_mode = mode;
322 }
323
324 pub fn set_max_correlation_events(&mut self, max: usize) {
327 self.config.max_correlation_events = max;
328 }
329
330 pub fn add_rule(&mut self, rule: &SigmaRule) -> Result<()> {
336 if self.pipelines.is_empty() {
337 self.apply_custom_attributes(&rule.custom_attributes);
338 self.rule_ids.push((rule.id.clone(), rule.name.clone()));
339 self.engine.add_rule(rule)?;
340 } else {
341 let mut transformed = rule.clone();
342 apply_pipelines(&self.pipelines, &mut transformed)?;
343 self.apply_custom_attributes(&transformed.custom_attributes);
344 self.rule_ids
345 .push((transformed.id.clone(), transformed.name.clone()));
346 let compiled = crate::compiler::compile_rule(&transformed)?;
348 self.engine.add_compiled_rule(compiled);
349 }
350 Ok(())
351 }
352
353 fn apply_custom_attributes(&mut self, attrs: &std::collections::HashMap<String, String>) {
367 if let Some(field) = attrs.get("rsigma.timestamp_field")
369 && !self.config.timestamp_fields.contains(field)
370 {
371 self.config.timestamp_fields.insert(0, field.clone());
372 }
373
374 if let Some(val) = attrs.get("rsigma.suppress")
376 && self.config.suppress.is_none()
377 && let Ok(ts) = rsigma_parser::Timespan::parse(val)
378 {
379 self.config.suppress = Some(ts.seconds);
380 }
381
382 if let Some(val) = attrs.get("rsigma.action")
384 && self.config.action_on_match == CorrelationAction::Alert
385 && let Ok(a) = val.parse::<CorrelationAction>()
386 {
387 self.config.action_on_match = a;
388 }
389 }
390
391 pub fn add_correlation(&mut self, corr: &CorrelationRule) -> Result<()> {
393 self.apply_custom_attributes(&corr.custom_attributes);
396
397 let compiled = compile_correlation(corr)?;
398 let idx = self.correlations.len();
399
400 for rule_ref in &compiled.rule_refs {
402 self.rule_index
403 .entry(rule_ref.clone())
404 .or_default()
405 .push(idx);
406 }
407
408 if !compiled.generate {
410 for rule_ref in &compiled.rule_refs {
411 self.correlation_only_rules.insert(rule_ref.clone());
412 }
413 }
414
415 self.correlations.push(compiled);
416 Ok(())
417 }
418
419 pub fn add_collection(&mut self, collection: &SigmaCollection) -> Result<()> {
424 for rule in &collection.rules {
425 self.add_rule(rule)?;
426 }
427 for filter in &collection.filters {
429 self.engine.apply_filter(filter)?;
430 }
431 for corr in &collection.correlations {
432 self.add_correlation(corr)?;
433 }
434 self.validate_rule_refs()?;
435 self.detect_correlation_cycles()?;
436 Ok(())
437 }
438
439 fn validate_rule_refs(&self) -> Result<()> {
442 let mut known: std::collections::HashSet<&str> = std::collections::HashSet::new();
443
444 for (id, name) in &self.rule_ids {
445 if let Some(id) = id {
446 known.insert(id.as_str());
447 }
448 if let Some(name) = name {
449 known.insert(name.as_str());
450 }
451 }
452 for corr in &self.correlations {
453 if let Some(ref id) = corr.id {
454 known.insert(id.as_str());
455 }
456 if let Some(ref name) = corr.name {
457 known.insert(name.as_str());
458 }
459 }
460
461 for corr in &self.correlations {
462 for rule_ref in &corr.rule_refs {
463 if !known.contains(rule_ref.as_str()) {
464 return Err(EvalError::UnknownRuleRef(rule_ref.clone()));
465 }
466 }
467 }
468 Ok(())
469 }
470
471 fn detect_correlation_cycles(&self) -> Result<()> {
479 let mut corr_identifiers: HashMap<&str, usize> = HashMap::new();
481 for (idx, corr) in self.correlations.iter().enumerate() {
482 if let Some(ref id) = corr.id {
483 corr_identifiers.insert(id.as_str(), idx);
484 }
485 if let Some(ref name) = corr.name {
486 corr_identifiers.insert(name.as_str(), idx);
487 }
488 }
489
490 let mut adj: Vec<Vec<usize>> = vec![Vec::new(); self.correlations.len()];
492 for (idx, corr) in self.correlations.iter().enumerate() {
493 for rule_ref in &corr.rule_refs {
494 if let Some(&target_idx) = corr_identifiers.get(rule_ref.as_str()) {
495 adj[idx].push(target_idx);
496 }
497 }
498 }
499
500 let mut state = vec![0u8; self.correlations.len()]; let mut path: Vec<usize> = Vec::new();
503
504 for start in 0..self.correlations.len() {
505 if state[start] == 0
506 && let Some(cycle) = Self::dfs_find_cycle(start, &adj, &mut state, &mut path)
507 {
508 let names: Vec<String> = cycle
509 .iter()
510 .map(|&i| {
511 self.correlations[i]
512 .id
513 .as_deref()
514 .or(self.correlations[i].name.as_deref())
515 .unwrap_or(&self.correlations[i].title)
516 .to_string()
517 })
518 .collect();
519 return Err(crate::error::EvalError::CorrelationCycle(
520 names.join(" -> "),
521 ));
522 }
523 }
524 Ok(())
525 }
526
527 fn dfs_find_cycle(
529 node: usize,
530 adj: &[Vec<usize>],
531 state: &mut [u8],
532 path: &mut Vec<usize>,
533 ) -> Option<Vec<usize>> {
534 state[node] = 1; path.push(node);
536
537 for &next in &adj[node] {
538 if state[next] == 1 {
539 if let Some(pos) = path.iter().position(|&n| n == next) {
541 let mut cycle = path[pos..].to_vec();
542 cycle.push(next); return Some(cycle);
544 }
545 }
546 if state[next] == 0
547 && let Some(cycle) = Self::dfs_find_cycle(next, adj, state, path)
548 {
549 return Some(cycle);
550 }
551 }
552
553 path.pop();
554 state[node] = 2; None
556 }
557
558 pub fn process_event(&mut self, event: &Event) -> ProcessResult {
564 let ts = match self.extract_event_timestamp(event) {
565 Some(ts) => ts,
566 None => match self.config.timestamp_fallback {
567 TimestampFallback::WallClock => Utc::now().timestamp(),
568 TimestampFallback::Skip => {
569 let all_detections = self.engine.evaluate(event);
571 let detections = self.filter_detections(all_detections);
572 return ProcessResult {
573 detections,
574 correlations: Vec::new(),
575 };
576 }
577 },
578 };
579 self.process_event_at(event, ts)
580 }
581
582 pub fn process_event_at(&mut self, event: &Event, timestamp_secs: i64) -> ProcessResult {
587 let timestamp_secs = timestamp_secs.clamp(0, i64::MAX / 2);
588
589 if self.state.len() >= self.config.max_state_entries {
591 self.evict_all(timestamp_secs);
592 }
593
594 let all_detections = self.engine.evaluate(event);
596
597 let mut correlations = Vec::new();
599 self.feed_detections(event, &all_detections, timestamp_secs, &mut correlations);
600
601 self.chain_correlations(&correlations, timestamp_secs);
603
604 let detections = self.filter_detections(all_detections);
606
607 ProcessResult {
608 detections,
609 correlations,
610 }
611 }
612
613 fn filter_detections(&self, all_detections: Vec<MatchResult>) -> Vec<MatchResult> {
618 if !self.config.emit_detections && !self.correlation_only_rules.is_empty() {
619 all_detections
620 .into_iter()
621 .filter(|m| {
622 let id_match = m
623 .rule_id
624 .as_ref()
625 .is_some_and(|id| self.correlation_only_rules.contains(id));
626 !id_match
627 })
628 .collect()
629 } else {
630 all_detections
631 }
632 }
633
634 fn feed_detections(
636 &mut self,
637 event: &Event,
638 detections: &[MatchResult],
639 ts: i64,
640 out: &mut Vec<CorrelationResult>,
641 ) {
642 let mut work: Vec<(usize, Option<String>, Option<String>)> = Vec::new();
645
646 for det in detections {
647 let (rule_id, rule_name) = self.find_rule_identity(det);
650
651 let mut corr_indices = Vec::new();
653 if let Some(ref id) = rule_id
654 && let Some(indices) = self.rule_index.get(id)
655 {
656 corr_indices.extend(indices);
657 }
658 if let Some(ref name) = rule_name
659 && let Some(indices) = self.rule_index.get(name)
660 {
661 corr_indices.extend(indices);
662 }
663
664 corr_indices.sort_unstable();
665 corr_indices.dedup();
666
667 for &corr_idx in &corr_indices {
668 work.push((corr_idx, rule_id.clone(), rule_name.clone()));
669 }
670 }
671
672 for (corr_idx, rule_id, rule_name) in work {
673 self.update_correlation(corr_idx, event, ts, &rule_id, &rule_name, out);
674 }
675 }
676
677 fn find_rule_identity(&self, det: &MatchResult) -> (Option<String>, Option<String>) {
679 if let Some(ref match_id) = det.rule_id {
681 for (id, name) in &self.rule_ids {
682 if id.as_deref() == Some(match_id.as_str()) {
683 return (id.clone(), name.clone());
684 }
685 }
686 }
687 (det.rule_id.clone(), None)
689 }
690
691 fn resolve_event_mode(&self, corr_idx: usize) -> CorrelationEventMode {
693 let corr = &self.correlations[corr_idx];
694 corr.event_mode
695 .unwrap_or(self.config.correlation_event_mode)
696 }
697
698 fn resolve_max_events(&self, corr_idx: usize) -> usize {
700 let corr = &self.correlations[corr_idx];
701 corr.max_events
702 .unwrap_or(self.config.max_correlation_events)
703 }
704
705 fn update_correlation(
707 &mut self,
708 corr_idx: usize,
709 event: &Event,
710 ts: i64,
711 rule_id: &Option<String>,
712 rule_name: &Option<String>,
713 out: &mut Vec<CorrelationResult>,
714 ) {
715 let corr = &self.correlations[corr_idx];
719 let corr_type = corr.correlation_type;
720 let timespan = corr.timespan_secs;
721 let level = corr.level;
722 let suppress_secs = corr.suppress_secs.or(self.config.suppress);
723 let action = corr.action.unwrap_or(self.config.action_on_match);
724 let event_mode = self.resolve_event_mode(corr_idx);
725 let max_events = self.resolve_max_events(corr_idx);
726
727 let mut ref_strs: Vec<&str> = Vec::new();
729 if let Some(id) = rule_id.as_deref() {
730 ref_strs.push(id);
731 }
732 if let Some(name) = rule_name.as_deref() {
733 ref_strs.push(name);
734 }
735 let rule_ref = rule_id.as_deref().or(rule_name.as_deref()).unwrap_or("");
736
737 let group_key = GroupKey::extract(event, &corr.group_by, &ref_strs);
739
740 let state_key = (corr_idx, group_key.clone());
742 let state = self
743 .state
744 .entry(state_key.clone())
745 .or_insert_with(|| WindowState::new_for(corr_type));
746
747 let cutoff = ts - timespan as i64;
749 state.evict(cutoff);
750
751 match corr_type {
753 CorrelationType::EventCount => {
754 state.push_event_count(ts);
755 }
756 CorrelationType::ValueCount => {
757 if let Some(ref field_name) = corr.condition.field
758 && let Some(val) = event.get_field(field_name)
759 && let Some(s) = value_to_string_for_count(val)
760 {
761 state.push_value_count(ts, s);
762 }
763 }
764 CorrelationType::Temporal | CorrelationType::TemporalOrdered => {
765 state.push_temporal(ts, rule_ref);
766 }
767 CorrelationType::ValueSum
768 | CorrelationType::ValueAvg
769 | CorrelationType::ValuePercentile
770 | CorrelationType::ValueMedian => {
771 if let Some(ref field_name) = corr.condition.field
772 && let Some(val) = event.get_field(field_name)
773 && let Some(n) = value_to_f64(val)
774 {
775 state.push_numeric(ts, n);
776 }
777 }
778 }
779
780 match event_mode {
782 CorrelationEventMode::Full => {
783 let buf = self
784 .event_buffers
785 .entry(state_key.clone())
786 .or_insert_with(|| EventBuffer::new(max_events));
787 buf.evict(cutoff);
788 buf.push(ts, event.as_value());
789 }
790 CorrelationEventMode::Refs => {
791 let buf = self
792 .event_ref_buffers
793 .entry(state_key.clone())
794 .or_insert_with(|| EventRefBuffer::new(max_events));
795 buf.evict(cutoff);
796 buf.push(ts, event.as_value());
797 }
798 CorrelationEventMode::None => {}
799 }
800
801 let fired = state.check_condition(
803 &corr.condition,
804 corr_type,
805 &corr.rule_refs,
806 corr.extended_expr.as_ref(),
807 );
808
809 if let Some(agg_value) = fired {
810 let alert_key = (corr_idx, group_key.clone());
811
812 let suppressed = if let Some(suppress) = suppress_secs {
814 if let Some(&last_ts) = self.last_alert.get(&alert_key) {
815 (ts - last_ts) < suppress as i64
816 } else {
817 false
818 }
819 } else {
820 false
821 };
822
823 if !suppressed {
824 let (events, event_refs) = match event_mode {
826 CorrelationEventMode::Full => {
827 let stored = self
828 .event_buffers
829 .get(&alert_key)
830 .map(|buf| buf.decompress_all())
831 .unwrap_or_default();
832 (Some(stored), None)
833 }
834 CorrelationEventMode::Refs => {
835 let stored = self
836 .event_ref_buffers
837 .get(&alert_key)
838 .map(|buf| buf.refs())
839 .unwrap_or_default();
840 (None, Some(stored))
841 }
842 CorrelationEventMode::None => (None, None),
843 };
844
845 let corr = &self.correlations[corr_idx];
847 let result = CorrelationResult {
848 rule_title: corr.title.clone(),
849 rule_id: corr.id.clone(),
850 level,
851 tags: corr.tags.clone(),
852 correlation_type: corr_type,
853 group_key: group_key.to_pairs(&corr.group_by),
854 aggregated_value: agg_value,
855 timespan_secs: timespan,
856 events,
857 event_refs,
858 };
859 out.push(result);
860
861 self.last_alert.insert(alert_key.clone(), ts);
863
864 if action == CorrelationAction::Reset {
866 if let Some(state) = self.state.get_mut(&alert_key) {
867 state.clear();
868 }
869 if let Some(buf) = self.event_buffers.get_mut(&alert_key) {
870 buf.clear();
871 }
872 if let Some(buf) = self.event_ref_buffers.get_mut(&alert_key) {
873 buf.clear();
874 }
875 }
876 }
877 }
878 }
879
880 fn chain_correlations(&mut self, fired: &[CorrelationResult], ts: i64) {
885 const MAX_CHAIN_DEPTH: usize = 10;
886 let mut pending: Vec<CorrelationResult> = fired.to_vec();
887 let mut depth = 0;
888
889 while !pending.is_empty() && depth < MAX_CHAIN_DEPTH {
890 depth += 1;
891
892 #[allow(clippy::type_complexity)]
894 let mut work: Vec<(usize, Vec<(String, String)>, String)> = Vec::new();
895 for result in &pending {
896 if let Some(ref id) = result.rule_id
897 && let Some(indices) = self.rule_index.get(id)
898 {
899 let fired_ref = result
900 .rule_id
901 .as_deref()
902 .unwrap_or(&result.rule_title)
903 .to_string();
904 for &corr_idx in indices {
905 work.push((corr_idx, result.group_key.clone(), fired_ref.clone()));
906 }
907 }
908 }
909
910 let mut next_pending = Vec::new();
911 for (corr_idx, group_key_pairs, fired_ref) in work {
912 let corr = &self.correlations[corr_idx];
913 let corr_type = corr.correlation_type;
914 let timespan = corr.timespan_secs;
915 let level = corr.level;
916
917 let group_key = GroupKey::from_pairs(&group_key_pairs, &corr.group_by);
918 let state_key = (corr_idx, group_key.clone());
919 let state = self
920 .state
921 .entry(state_key)
922 .or_insert_with(|| WindowState::new_for(corr_type));
923
924 let cutoff = ts - timespan as i64;
925 state.evict(cutoff);
926
927 match corr_type {
928 CorrelationType::EventCount => {
929 state.push_event_count(ts);
930 }
931 CorrelationType::Temporal | CorrelationType::TemporalOrdered => {
932 state.push_temporal(ts, &fired_ref);
933 }
934 _ => {
935 state.push_event_count(ts);
936 }
937 }
938
939 let fired = state.check_condition(
940 &corr.condition,
941 corr_type,
942 &corr.rule_refs,
943 corr.extended_expr.as_ref(),
944 );
945
946 if let Some(agg_value) = fired {
947 let corr = &self.correlations[corr_idx];
948 next_pending.push(CorrelationResult {
949 rule_title: corr.title.clone(),
950 rule_id: corr.id.clone(),
951 level,
952 tags: corr.tags.clone(),
953 correlation_type: corr_type,
954 group_key: group_key.to_pairs(&corr.group_by),
955 aggregated_value: agg_value,
956 timespan_secs: timespan,
957 events: None,
960 event_refs: None,
961 });
962 }
963 }
964
965 pending = next_pending;
966 }
967
968 if !pending.is_empty() {
969 log::warn!(
970 "Correlation chain depth limit reached ({MAX_CHAIN_DEPTH}); \
971 {} pending result(s) were not propagated further. \
972 This may indicate a cycle in correlation references.",
973 pending.len()
974 );
975 }
976 }
977
978 fn extract_event_timestamp(&self, event: &Event) -> Option<i64> {
990 for field_name in &self.config.timestamp_fields {
991 if let Some(val) = event.get_field(field_name)
992 && let Some(ts) = parse_timestamp_value(val)
993 {
994 return Some(ts);
995 }
996 }
997 None
998 }
999
1000 pub fn evict_expired(&mut self, now_secs: i64) {
1006 self.evict_all(now_secs);
1007 }
1008
1009 fn evict_all(&mut self, now_secs: i64) {
1011 let timespans: Vec<u64> = self.correlations.iter().map(|c| c.timespan_secs).collect();
1013
1014 self.state.retain(|&(corr_idx, _), state| {
1015 if corr_idx < timespans.len() {
1016 let cutoff = now_secs - timespans[corr_idx] as i64;
1017 state.evict(cutoff);
1018 }
1019 !state.is_empty()
1020 });
1021
1022 self.event_buffers.retain(|&(corr_idx, _), buf| {
1024 if corr_idx < timespans.len() {
1025 let cutoff = now_secs - timespans[corr_idx] as i64;
1026 buf.evict(cutoff);
1027 }
1028 !buf.is_empty()
1029 });
1030 self.event_ref_buffers.retain(|&(corr_idx, _), buf| {
1031 if corr_idx < timespans.len() {
1032 let cutoff = now_secs - timespans[corr_idx] as i64;
1033 buf.evict(cutoff);
1034 }
1035 !buf.is_empty()
1036 });
1037
1038 if self.state.len() >= self.config.max_state_entries {
1042 let target = self.config.max_state_entries * 9 / 10;
1043 let excess = self.state.len() - target;
1044
1045 let mut by_staleness: Vec<_> = self
1047 .state
1048 .iter()
1049 .map(|(k, v)| (k.clone(), v.latest_timestamp().unwrap_or(i64::MIN)))
1050 .collect();
1051 by_staleness.sort_unstable_by_key(|&(_, ts)| ts);
1052
1053 for (key, _) in by_staleness.into_iter().take(excess) {
1055 self.state.remove(&key);
1056 self.last_alert.remove(&key);
1057 self.event_buffers.remove(&key);
1058 self.event_ref_buffers.remove(&key);
1059 }
1060 }
1061
1062 self.last_alert.retain(|key, &mut alert_ts| {
1065 let suppress = if key.0 < self.correlations.len() {
1066 self.correlations[key.0]
1067 .suppress_secs
1068 .or(self.config.suppress)
1069 .unwrap_or(0)
1070 } else {
1071 0
1072 };
1073 (now_secs - alert_ts) < suppress as i64
1074 });
1075 }
1076
1077 pub fn state_count(&self) -> usize {
1079 self.state.len()
1080 }
1081
1082 pub fn detection_rule_count(&self) -> usize {
1084 self.engine.rule_count()
1085 }
1086
1087 pub fn correlation_rule_count(&self) -> usize {
1089 self.correlations.len()
1090 }
1091
1092 pub fn event_buffer_count(&self) -> usize {
1094 self.event_buffers.len()
1095 }
1096
1097 pub fn event_buffer_bytes(&self) -> usize {
1099 self.event_buffers
1100 .values()
1101 .map(|b| b.compressed_bytes())
1102 .sum()
1103 }
1104
1105 pub fn event_ref_buffer_count(&self) -> usize {
1107 self.event_ref_buffers.len()
1108 }
1109
1110 pub fn engine(&self) -> &Engine {
1112 &self.engine
1113 }
1114
1115 pub fn export_state(&self) -> CorrelationSnapshot {
1121 let mut windows: HashMap<String, Vec<(GroupKey, WindowState)>> = HashMap::new();
1122 for ((idx, gk), ws) in &self.state {
1123 let corr_id = self.correlation_stable_id(*idx);
1124 windows
1125 .entry(corr_id)
1126 .or_default()
1127 .push((gk.clone(), ws.clone()));
1128 }
1129
1130 let mut last_alert: HashMap<String, Vec<(GroupKey, i64)>> = HashMap::new();
1131 for ((idx, gk), ts) in &self.last_alert {
1132 let corr_id = self.correlation_stable_id(*idx);
1133 last_alert
1134 .entry(corr_id)
1135 .or_default()
1136 .push((gk.clone(), *ts));
1137 }
1138
1139 let mut event_buffers: HashMap<String, Vec<(GroupKey, EventBuffer)>> = HashMap::new();
1140 for ((idx, gk), buf) in &self.event_buffers {
1141 let corr_id = self.correlation_stable_id(*idx);
1142 event_buffers
1143 .entry(corr_id)
1144 .or_default()
1145 .push((gk.clone(), buf.clone()));
1146 }
1147
1148 let mut event_ref_buffers: HashMap<String, Vec<(GroupKey, EventRefBuffer)>> =
1149 HashMap::new();
1150 for ((idx, gk), buf) in &self.event_ref_buffers {
1151 let corr_id = self.correlation_stable_id(*idx);
1152 event_ref_buffers
1153 .entry(corr_id)
1154 .or_default()
1155 .push((gk.clone(), buf.clone()));
1156 }
1157
1158 CorrelationSnapshot {
1159 version: SNAPSHOT_VERSION,
1160 windows,
1161 last_alert,
1162 event_buffers,
1163 event_ref_buffers,
1164 }
1165 }
1166
1167 pub fn import_state(&mut self, snapshot: CorrelationSnapshot) -> bool {
1174 if snapshot.version != SNAPSHOT_VERSION {
1175 return false;
1176 }
1177 let id_to_idx = self.build_id_to_index_map();
1178
1179 for (corr_id, groups) in snapshot.windows {
1180 if let Some(&idx) = id_to_idx.get(&corr_id) {
1181 for (gk, ws) in groups {
1182 self.state.insert((idx, gk), ws);
1183 }
1184 }
1185 }
1186
1187 for (corr_id, groups) in snapshot.last_alert {
1188 if let Some(&idx) = id_to_idx.get(&corr_id) {
1189 for (gk, ts) in groups {
1190 self.last_alert.insert((idx, gk), ts);
1191 }
1192 }
1193 }
1194
1195 for (corr_id, groups) in snapshot.event_buffers {
1196 if let Some(&idx) = id_to_idx.get(&corr_id) {
1197 for (gk, buf) in groups {
1198 self.event_buffers.insert((idx, gk), buf);
1199 }
1200 }
1201 }
1202
1203 for (corr_id, groups) in snapshot.event_ref_buffers {
1204 if let Some(&idx) = id_to_idx.get(&corr_id) {
1205 for (gk, buf) in groups {
1206 self.event_ref_buffers.insert((idx, gk), buf);
1207 }
1208 }
1209 }
1210
1211 true
1212 }
1213
1214 fn correlation_stable_id(&self, idx: usize) -> String {
1216 let corr = &self.correlations[idx];
1217 corr.id
1218 .clone()
1219 .or_else(|| corr.name.clone())
1220 .unwrap_or_else(|| corr.title.clone())
1221 }
1222
1223 fn build_id_to_index_map(&self) -> HashMap<String, usize> {
1225 self.correlations
1226 .iter()
1227 .enumerate()
1228 .map(|(idx, _)| (self.correlation_stable_id(idx), idx))
1229 .collect()
1230 }
1231}
1232
/// Version written into exported snapshots; `import_state` refuses
/// snapshots carrying any other version.
const SNAPSHOT_VERSION: u32 = 1;
1235
1236#[derive(Debug, Clone, Serialize, serde::Deserialize)]
1243pub struct CorrelationSnapshot {
1244 #[serde(default = "default_snapshot_version")]
1246 pub version: u32,
1247 pub windows: HashMap<String, Vec<(GroupKey, WindowState)>>,
1249 pub last_alert: HashMap<String, Vec<(GroupKey, i64)>>,
1251 pub event_buffers: HashMap<String, Vec<(GroupKey, EventBuffer)>>,
1253 pub event_ref_buffers: HashMap<String, Vec<(GroupKey, EventRefBuffer)>>,
1255}
1256
/// Version assumed for serialized snapshots that carry no `version` field.
fn default_snapshot_version() -> u32 {
    const ASSUMED_VERSION: u32 = 1;
    ASSUMED_VERSION
}
1260
1261impl Default for CorrelationEngine {
1262 fn default() -> Self {
1263 Self::new(CorrelationConfig::default())
1264 }
1265}
1266
1267fn parse_timestamp_value(val: &serde_json::Value) -> Option<i64> {
1273 match val {
1274 serde_json::Value::Number(n) => {
1275 if let Some(i) = n.as_i64() {
1276 Some(normalize_epoch(i))
1277 } else {
1278 n.as_f64().map(|f| normalize_epoch(f as i64))
1279 }
1280 }
1281 serde_json::Value::String(s) => parse_timestamp_string(s),
1282 _ => None,
1283 }
1284}
1285
/// Normalizes a numeric epoch timestamp to whole seconds.
///
/// The unit is guessed from the magnitude of `v`:
/// - greater than 10^18: nanoseconds, divided by 10^9;
/// - greater than 10^15: microseconds, divided by 10^6;
/// - greater than 10^12: milliseconds, divided by 10^3;
/// - anything else (including negative values): already seconds.
///
/// Values up to 10^15 are handled exactly as before; larger values used to
/// be divided by 1000 only, which turned microsecond and nanosecond
/// timestamps into meaningless "seconds".
fn normalize_epoch(v: i64) -> i64 {
    const MILLIS_THRESHOLD: i64 = 1_000_000_000_000;
    const MICROS_THRESHOLD: i64 = 1_000_000_000_000_000;
    const NANOS_THRESHOLD: i64 = 1_000_000_000_000_000_000;

    if v > NANOS_THRESHOLD {
        v / 1_000_000_000
    } else if v > MICROS_THRESHOLD {
        v / 1_000_000
    } else if v > MILLIS_THRESHOLD {
        v / 1_000
    } else {
        v
    }
}
1291
1292fn parse_timestamp_string(s: &str) -> Option<i64> {
1294 if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
1296 return Some(dt.timestamp());
1297 }
1298
1299 if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S") {
1302 return Some(Utc.from_utc_datetime(&naive).timestamp());
1303 }
1304 if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") {
1305 return Some(Utc.from_utc_datetime(&naive).timestamp());
1306 }
1307
1308 if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S%.f") {
1310 return Some(Utc.from_utc_datetime(&naive).timestamp());
1311 }
1312 if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f") {
1313 return Some(Utc.from_utc_datetime(&naive).timestamp());
1314 }
1315
1316 None
1317}
1318
1319fn value_to_string_for_count(v: &serde_json::Value) -> Option<String> {
1321 match v {
1322 serde_json::Value::String(s) => Some(s.clone()),
1323 serde_json::Value::Number(n) => Some(n.to_string()),
1324 serde_json::Value::Bool(b) => Some(b.to_string()),
1325 serde_json::Value::Null => Some("null".to_string()),
1326 _ => None,
1327 }
1328}
1329
1330fn value_to_f64(v: &serde_json::Value) -> Option<f64> {
1332 match v {
1333 serde_json::Value::Number(n) => n.as_f64(),
1334 serde_json::Value::String(s) => s.parse().ok(),
1335 _ => None,
1336 }
1337}
1338
1339#[cfg(test)]
1344mod tests {
1345 use super::*;
1346 use rsigma_parser::parse_sigma_yaml;
1347 use serde_json::json;
1348
#[test]
fn test_parse_timestamp_epoch_secs() {
    // A value in the plain-seconds range is returned unchanged.
    let parsed = parse_timestamp_value(&json!(1720612200));
    assert_eq!(parsed, Some(1720612200));
}
1358
#[test]
fn test_parse_timestamp_epoch_millis() {
    // Millisecond epochs are scaled down to seconds.
    let parsed = parse_timestamp_value(&json!(1720612200000i64));
    assert_eq!(parsed, Some(1720612200));
}
1364
#[test]
fn test_parse_timestamp_rfc3339() {
    // RFC 3339 with an explicit UTC designator.
    let parsed = parse_timestamp_value(&json!("2024-07-10T12:30:00Z"));
    assert_eq!(parsed, Some(1720614600));
}
1371
#[test]
fn test_parse_timestamp_naive() {
    // No zone information: interpreted as UTC.
    let parsed = parse_timestamp_value(&json!("2024-07-10T12:30:00"));
    assert_eq!(parsed, Some(1720614600));
}
1378
#[test]
fn test_parse_timestamp_with_space() {
    // Space instead of `T` between date and time.
    let parsed = parse_timestamp_value(&json!("2024-07-10 12:30:00"));
    assert_eq!(parsed, Some(1720614600));
}
1385
#[test]
fn test_parse_timestamp_fractional() {
    // Fractional seconds are dropped from the resulting epoch seconds.
    let parsed = parse_timestamp_value(&json!("2024-07-10T12:30:00.123Z"));
    assert_eq!(parsed, Some(1720614600));
}
1392
#[test]
fn test_extract_timestamp_from_event() {
    // The only configured field is present and holds an RFC 3339 string.
    let engine = CorrelationEngine::new(CorrelationConfig {
        timestamp_fields: vec!["@timestamp".to_string()],
        max_state_entries: 100_000,
        ..Default::default()
    });

    let raw = json!({"@timestamp": "2024-07-10T12:30:00Z", "data": "test"});
    let event = Event::from_value(&raw);

    assert_eq!(engine.extract_event_timestamp(&event), Some(1720614600));
}
1407
#[test]
fn test_extract_timestamp_fallback_fields() {
    // `@timestamp` is absent, so the second configured field is used.
    let candidate_fields = ["@timestamp", "timestamp", "EventTime"];
    let engine = CorrelationEngine::new(CorrelationConfig {
        timestamp_fields: candidate_fields.iter().map(|f| f.to_string()).collect(),
        max_state_entries: 100_000,
        ..Default::default()
    });

    let raw = json!({"timestamp": 1720613400, "data": "test"});
    let event = Event::from_value(&raw);

    assert_eq!(engine.extract_event_timestamp(&event), Some(1720613400));
}
1427
#[test]
fn test_extract_timestamp_returns_none_when_missing() {
    // None of the configured fields exists in the event.
    let engine = CorrelationEngine::new(CorrelationConfig {
        timestamp_fields: vec!["@timestamp".to_string()],
        ..Default::default()
    });

    let raw = json!({"data": "no timestamp here"});
    let event = Event::from_value(&raw);

    assert_eq!(engine.extract_event_timestamp(&event), None);
}
1440
#[test]
fn test_timestamp_fallback_skip() {
    // With `Skip`, events lacking a timestamp still produce detections but
    // must never advance correlation state.
    let yaml = r#"
title: test rule
id: ts-skip-rule
logsource:
    product: test
detection:
    selection:
        action: click
    condition: selection
level: low
---
title: test correlation
correlation:
    type: event_count
    rules:
        - ts-skip-rule
    group-by:
        - User
    timespan: 10s
    condition:
        gte: 2
level: high
"#;
    let collection = parse_sigma_yaml(yaml).unwrap();
    let config = CorrelationConfig {
        timestamp_fallback: TimestampFallback::Skip,
        ..Default::default()
    };
    let mut engine = CorrelationEngine::new(config);
    engine.add_collection(&collection).unwrap();
    assert_eq!(engine.correlation_rule_count(), 1);

    // No timestamp field in the event.
    let raw = json!({"action": "click", "User": "alice"});
    let event = Event::from_value(&raw);

    let results: Vec<ProcessResult> = (0..3).map(|_| engine.process_event(&event)).collect();

    for result in &results {
        assert!(!result.detections.is_empty(), "detection should still fire");
    }
    for result in &results {
        assert!(result.correlations.is_empty());
    }
}
1492
/// With the default configuration, three timestamp-less events processed
/// back to back must produce a correlation on the third call (threshold
/// `gte: 2` within a 60s timespan).
#[test]
fn test_timestamp_fallback_wallclock_default() {
    let rule_set = r#"
title: test rule
id: ts-wc-rule
logsource:
    product: test
detection:
    selection:
        action: click
    condition: selection
level: low
---
title: test correlation
correlation:
    type: event_count
    rules:
        - ts-wc-rule
    group-by:
        - User
    timespan: 60s
    condition:
        gte: 2
level: high
"#;
    let parsed = parse_sigma_yaml(rule_set).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&parsed).unwrap();
    assert_eq!(engine.correlation_rule_count(), 1);

    // No timestamp field on the event.
    let payload = json!({"action": "click", "User": "alice"});
    let event = Event::from_value(&payload);

    // Only the third result is inspected; the first two just build up state.
    engine.process_event(&event);
    engine.process_event(&event);
    let third = engine.process_event(&event);

    assert!(
        !third.correlations.is_empty(),
        "WallClock fallback should allow correlation"
    );
}
1538
/// Basic `event_count` correlation: three matching events for the same user,
/// ten seconds apart, must each produce one detection, and only the third
/// must produce the "Multiple Whoami" correlation with an aggregate of 3.
#[test]
fn test_event_count_basic() {
    let rule_set = r#"
title: Base Rule
id: base-rule-001
name: base_rule
logsource:
    product: windows
    category: process_creation
detection:
    selection:
        CommandLine|contains: 'whoami'
    condition: selection
level: low
---
title: Multiple Whoami
id: corr-001
correlation:
    type: event_count
    rules:
        - base-rule-001
    group-by:
        - User
    timespan: 60s
    condition:
        gte: 3
level: high
"#;
    let parsed = parse_sigma_yaml(rule_set).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&parsed).unwrap();

    assert_eq!(engine.detection_rule_count(), 1);
    assert_eq!(engine.correlation_rule_count(), 1);

    let start = 1000i64;
    let payload = json!({"CommandLine": "whoami", "User": "admin"});
    let event = Event::from_value(&payload);

    for step in 0..3i64 {
        let outcome = engine.process_event_at(&event, start + step * 10);

        assert_eq!(outcome.detections.len(), 1);

        match step {
            2 => {
                assert_eq!(outcome.correlations.len(), 1);
                assert_eq!(outcome.correlations[0].rule_title, "Multiple Whoami");
                assert_eq!(outcome.correlations[0].aggregated_value, 3.0);
            }
            _ => assert!(outcome.correlations.is_empty()),
        }
    }
}
1599
/// Counts are tracked per `group-by` value: two logins by alice never reach
/// the threshold, while bob's third login fires the correlation with a group
/// key of `User = bob`.
#[test]
fn test_event_count_different_groups() {
    let rule_set = r#"
title: Login
id: login-001
logsource:
    category: auth
detection:
    selection:
        EventType: login
    condition: selection
level: low
---
title: Many Logins
id: corr-login
correlation:
    type: event_count
    rules:
        - login-001
    group-by:
        - User
    timespan: 60s
    condition:
        gte: 3
level: high
"#;
    let parsed = parse_sigma_yaml(rule_set).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&parsed).unwrap();

    let start = 1000i64;

    let alice_payload = json!({"EventType": "login", "User": "alice"});
    let alice_login = Event::from_value(&alice_payload);
    for offset in 0..2 {
        let outcome = engine.process_event_at(&alice_login, start + offset);
        assert!(outcome.correlations.is_empty());
    }

    let bob_payload = json!({"EventType": "login", "User": "bob"});
    let bob_login = Event::from_value(&bob_payload);
    for offset in 0..3 {
        let outcome = engine.process_event_at(&bob_login, start + offset);
        if offset != 2 {
            continue;
        }
        assert_eq!(outcome.correlations.len(), 1);
        assert_eq!(
            outcome.correlations[0].group_key,
            vec![("User".to_string(), "bob".to_string())]
        );
    }
}
1651
/// Events at t=0 and t=1 followed by one at t=15 must not fire a correlation
/// that needs three events within a 10s timespan.
#[test]
fn test_event_count_window_expiry() {
    let rule_set = r#"
title: Base
id: base-002
logsource:
    category: test
detection:
    selection:
        action: click
    condition: selection
---
title: Rapid Clicks
id: corr-002
correlation:
    type: event_count
    rules:
        - base-002
    group-by:
        - User
    timespan: 10s
    condition:
        gte: 3
level: medium
"#;
    let parsed = parse_sigma_yaml(rule_set).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&parsed).unwrap();

    let payload = json!({"action": "click", "User": "admin"});
    let click = Event::from_value(&payload);

    for early in [0, 1] {
        engine.process_event_at(&click, early);
    }
    let late = engine.process_event_at(&click, 15);
    assert!(late.correlations.is_empty());
}
1690
/// `value_count` over the `User` field: failed logins from three distinct
/// users on the same host must fire on the third event with an aggregate
/// of 3.
#[test]
fn test_value_count() {
    let rule_set = r#"
title: Failed Login
id: failed-login-001
logsource:
    category: auth
detection:
    selection:
        EventType: failed_login
    condition: selection
level: low
---
title: Failed Logins From Many Users
id: corr-vc-001
correlation:
    type: value_count
    rules:
        - failed-login-001
    group-by:
        - Host
    timespan: 60s
    condition:
        field: User
        gte: 3
level: high
"#;
    let parsed = parse_sigma_yaml(rule_set).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&parsed).unwrap();

    let start = 1000i64;
    for (offset, user) in (0i64..).zip(["alice", "bob", "charlie"]) {
        let payload = json!({"EventType": "failed_login", "Host": "srv01", "User": user});
        let outcome = engine.process_event_at(&Event::from_value(&payload), start + offset);
        if offset == 2 {
            assert_eq!(outcome.correlations.len(), 1);
            assert_eq!(outcome.correlations[0].aggregated_value, 3.0);
        }
    }
}
1738
/// `temporal` correlation across two detection rules: the first matching
/// event alone produces nothing, and the second rule matching ten seconds
/// later for the same user fires "Recon Combo".
#[test]
fn test_temporal() {
    let rule_set = r#"
title: Recon A
id: recon-a
name: recon_a
logsource:
    category: process
detection:
    selection:
        CommandLine|contains: 'whoami'
    condition: selection
---
title: Recon B
id: recon-b
name: recon_b
logsource:
    category: process
detection:
    selection:
        CommandLine|contains: 'ipconfig'
    condition: selection
---
title: Recon Combo
id: corr-temporal
correlation:
    type: temporal
    rules:
        - recon-a
        - recon-b
    group-by:
        - User
    timespan: 60s
    condition:
        gte: 2
level: high
"#;
    let parsed = parse_sigma_yaml(rule_set).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&parsed).unwrap();

    let start = 1000i64;

    let whoami = json!({"CommandLine": "whoami", "User": "admin"});
    let after_first = engine.process_event_at(&Event::from_value(&whoami), start);
    assert!(after_first.correlations.is_empty());

    let ipconfig = json!({"CommandLine": "ipconfig /all", "User": "admin"});
    let after_second = engine.process_event_at(&Event::from_value(&ipconfig), start + 10);
    assert_eq!(after_second.correlations.len(), 1);
    assert_eq!(after_second.correlations[0].rule_title, "Recon Combo");
}
1798
/// `temporal_ordered` correlation with events arriving in the order the
/// rules are listed (failed login, then successful login): the second event
/// must fire exactly one correlation.
#[test]
fn test_temporal_ordered() {
    let rule_set = r#"
title: Failed Login
id: failed-001
name: failed_login
logsource:
    category: auth
detection:
    selection:
        EventType: failed_login
    condition: selection
---
title: Success Login
id: success-001
name: successful_login
logsource:
    category: auth
detection:
    selection:
        EventType: success_login
    condition: selection
---
title: Brute Force Then Login
id: corr-bf
correlation:
    type: temporal_ordered
    rules:
        - failed-001
        - success-001
    group-by:
        - User
    timespan: 60s
    condition:
        gte: 2
level: critical
"#;
    let parsed = parse_sigma_yaml(rule_set).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&parsed).unwrap();

    let start = 1000i64;

    let failure = json!({"EventType": "failed_login", "User": "admin"});
    let after_failure = engine.process_event_at(&Event::from_value(&failure), start);
    assert!(after_failure.correlations.is_empty());

    let success = json!({"EventType": "success_login", "User": "admin"});
    let after_success = engine.process_event_at(&Event::from_value(&success), start + 10);
    assert_eq!(after_success.correlations.len(), 1);
}
1857
/// `temporal_ordered` correlation with events arriving in the reverse of the
/// listed rule order (B first, then A): no correlation may fire.
#[test]
fn test_temporal_ordered_wrong_order() {
    let rule_set = r#"
title: Rule A
id: rule-a
logsource:
    category: test
detection:
    selection:
        type: a
    condition: selection
---
title: Rule B
id: rule-b
logsource:
    category: test
detection:
    selection:
        type: b
    condition: selection
---
title: A then B
id: corr-ab
correlation:
    type: temporal_ordered
    rules:
        - rule-a
        - rule-b
    group-by:
        - User
    timespan: 60s
    condition:
        gte: 2
level: high
"#;
    let parsed = parse_sigma_yaml(rule_set).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&parsed).unwrap();

    let start = 1000i64;

    let b_first = json!({"type": "b", "User": "admin"});
    engine.process_event_at(&Event::from_value(&b_first), start);

    let a_second = json!({"type": "a", "User": "admin"});
    let outcome = engine.process_event_at(&Event::from_value(&a_second), start + 10);
    assert!(outcome.correlations.is_empty());
}
1908
/// `value_sum` over `bytes_sent` with `gt: 1000`: 600 alone stays silent,
/// 600 + 500 fires with an aggregate of 1100.
#[test]
fn test_value_sum() {
    let rule_set = r#"
title: Web Access
id: web-001
logsource:
    category: web
detection:
    selection:
        action: upload
    condition: selection
---
title: Large Upload
id: corr-sum
correlation:
    type: value_sum
    rules:
        - web-001
    group-by:
        - User
    timespan: 60s
    condition:
        field: bytes_sent
        gt: 1000
level: high
"#;
    let parsed = parse_sigma_yaml(rule_set).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&parsed).unwrap();

    let start = 1000i64;

    let first_upload = json!({"action": "upload", "User": "alice", "bytes_sent": 600});
    let below = engine.process_event_at(&Event::from_value(&first_upload), start);
    assert!(below.correlations.is_empty());

    let second_upload = json!({"action": "upload", "User": "alice", "bytes_sent": 500});
    let above = engine.process_event_at(&Event::from_value(&second_upload), start + 5);
    assert_eq!(above.correlations.len(), 1);
    let total = above.correlations[0].aggregated_value;
    assert!((total - 1100.0).abs() < f64::EPSILON);
}
1955
/// `value_avg` over `latency_ms` with `gt: 500`: after samples 400, 600 and
/// 800 the third event must fire with an average of 600.
#[test]
fn test_value_avg() {
    let rule_set = r#"
title: Request
id: req-001
logsource:
    category: web
detection:
    selection:
        type: request
    condition: selection
---
title: High Avg Latency
id: corr-avg
correlation:
    type: value_avg
    rules:
        - req-001
    group-by:
        - Service
    timespan: 60s
    condition:
        field: latency_ms
        gt: 500
level: medium
"#;
    let parsed = parse_sigma_yaml(rule_set).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&parsed).unwrap();

    let start = 1000i64;
    for (offset, latency) in (0i64..).zip([400, 600, 800]) {
        let payload = json!({"type": "request", "Service": "api", "latency_ms": latency});
        let outcome = engine.process_event_at(&Event::from_value(&payload), start + offset);
        if offset == 2 {
            assert_eq!(outcome.correlations.len(), 1);
            let mean = outcome.correlations[0].aggregated_value;
            assert!((mean - 600.0).abs() < f64::EPSILON);
        }
    }
}
1998
/// `state_count` grows by one per distinct group (alice, then bob) and drops
/// to zero after `evict_expired` is called with a timestamp far beyond the
/// 60s timespan.
#[test]
fn test_state_count() {
    let rule_set = r#"
title: Base
id: base-sc
logsource:
    category: test
detection:
    selection:
        action: test
    condition: selection
---
title: Count
id: corr-sc
correlation:
    type: event_count
    rules:
        - base-sc
    group-by:
        - User
    timespan: 60s
    condition:
        gte: 100
level: low
"#;
    let parsed = parse_sigma_yaml(rule_set).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&parsed).unwrap();

    // (user, event time, expected number of state entries afterwards)
    for (user, at, expected_entries) in [("alice", 1000, 1), ("bob", 1001, 2)] {
        let payload = json!({"action": "test", "User": user});
        engine.process_event_at(&Event::from_value(&payload), at);
        assert_eq!(engine.state_count(), expected_entries);
    }

    engine.evict_expired(2000);
    assert_eq!(engine.state_count(), 0);
}
2046
/// With no `generate` key in the correlation rule, a single matching event
/// must surface both the base detection and the correlation (threshold
/// `gte: 1`).
#[test]
fn test_generate_flag_default_false() {
    let rule_set = r#"
title: Base
id: gen-base
logsource:
    category: test
detection:
    selection:
        action: test
    condition: selection
---
title: Correlation
id: gen-corr
correlation:
    type: event_count
    rules:
        - gen-base
    group-by:
        - User
    timespan: 60s
    condition:
        gte: 1
level: high
"#;
    let parsed = parse_sigma_yaml(rule_set).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&parsed).unwrap();

    let payload = json!({"action": "test", "User": "alice"});
    let outcome = engine.process_event_at(&Event::from_value(&payload), 1000);
    assert_eq!(outcome.detections.len(), 1);
    assert_eq!(outcome.correlations.len(), 1);
}
2088
/// AWS bucket-enumeration scenario: five `ListBuckets` events from the same
/// ARN, one minute apart, must fire the `event_count` correlation (1h
/// timespan, `gte: 5`) on the fifth event with an aggregate of 5.
#[test]
fn test_aws_bucket_enumeration() {
    let rule_set = r#"
title: Potential Bucket Enumeration on AWS
id: f305fd62-beca-47da-ad95-7690a0620084
logsource:
    product: aws
    service: cloudtrail
detection:
    selection:
        eventSource: "s3.amazonaws.com"
        eventName: "ListBuckets"
    condition: selection
level: low
---
title: Multiple AWS bucket enumerations
id: be246094-01d3-4bba-88de-69e582eba0cc
status: experimental
correlation:
    type: event_count
    rules:
        - f305fd62-beca-47da-ad95-7690a0620084
    group-by:
        - userIdentity.arn
    timespan: 1h
    condition:
        gte: 5
level: high
"#;
    let parsed = parse_sigma_yaml(rule_set).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&parsed).unwrap();

    let start = 1_700_000_000i64;
    let payload = json!({
        "eventSource": "s3.amazonaws.com",
        "eventName": "ListBuckets",
        "userIdentity.arn": "arn:aws:iam::123456789:user/attacker"
    });
    let list_buckets = Event::from_value(&payload);

    for minute in 0..5 {
        let outcome = engine.process_event_at(&list_buckets, start + minute * 60);
        if minute != 4 {
            continue;
        }
        assert_eq!(outcome.correlations.len(), 1);
        assert_eq!(
            outcome.correlations[0].rule_title,
            "Multiple AWS bucket enumerations"
        );
        assert_eq!(outcome.correlations[0].aggregated_value, 5.0);
    }
}
2145
/// Loads two detection rules plus two correlation rules, where the
/// `temporal_ordered` rule lists the `event_count` correlation as one of its
/// inputs.
///
/// Asserted here: the `event_count` correlation fires on the third failed
/// login, and the later successful login yields exactly the
/// "Successful login" detection. The chained `temporal_ordered` correlation
/// itself is not asserted by this test.
#[test]
fn test_chaining_event_count_to_temporal() {
    let rule_set = r#"
title: Single failed login
id: failed-login-chain
name: failed_login
logsource:
    category: auth
detection:
    selection:
        EventType: failed_login
    condition: selection
---
title: Successful login
id: success-login-chain
name: successful_login
logsource:
    category: auth
detection:
    selection:
        EventType: success_login
    condition: selection
---
title: Multiple failed logins
id: many-failed-chain
name: multiple_failed_login
correlation:
    type: event_count
    rules:
        - failed-login-chain
    group-by:
        - User
    timespan: 60s
    condition:
        gte: 3
level: medium
---
title: Brute Force Followed by Login
id: brute-force-chain
correlation:
    type: temporal_ordered
    rules:
        - many-failed-chain
        - success-login-chain
    group-by:
        - User
    timespan: 120s
    condition:
        gte: 2
level: critical
"#;
    let parsed = parse_sigma_yaml(rule_set).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&parsed).unwrap();

    assert_eq!(engine.detection_rule_count(), 2);
    assert_eq!(engine.correlation_rule_count(), 2);

    let start = 1000i64;

    let failure_payload = json!({"EventType": "failed_login", "User": "victim"});
    let failure = Event::from_value(&failure_payload);
    for attempt in 0..3 {
        let outcome = engine.process_event_at(&failure, start + attempt);
        if attempt == 2 {
            let fired = outcome
                .correlations
                .iter()
                .any(|c| c.rule_title == "Multiple failed logins");
            assert!(fired, "Expected event_count correlation to fire");
        }
    }

    let success_payload = json!({"EventType": "success_login", "User": "victim"});
    let after_success = engine.process_event_at(&Event::from_value(&success_payload), start + 30);

    assert_eq!(after_success.detections.len(), 1);
    assert_eq!(after_success.detections[0].rule_title, "Successful login");
}
2242
/// Field aliases: the correlation groups by `internal_ip`, which is mapped
/// to `destination.ip` for one rule and `source.ip` for the other. Two
/// events sharing the address 10.0.0.5 under those different field names
/// must correlate, and the reported group key must carry the alias name.
#[test]
fn test_field_aliases() {
    let rule_set = r#"
title: Internal Error
id: internal-error-001
name: internal_error
logsource:
    category: web
detection:
    selection:
        http.response.status_code: 500
    condition: selection
---
title: New Connection
id: new-conn-001
name: new_network_connection
logsource:
    category: network
detection:
    selection:
        event.type: connection
    condition: selection
---
title: Error Then Connection
id: corr-alias
correlation:
    type: temporal
    rules:
        - internal-error-001
        - new-conn-001
    group-by:
        - internal_ip
    timespan: 60s
    condition:
        gte: 2
    aliases:
        internal_ip:
            internal_error: destination.ip
            new_network_connection: source.ip
level: high
"#;
    let parsed = parse_sigma_yaml(rule_set).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&parsed).unwrap();

    let start = 1000i64;

    let error_payload = json!({
        "http.response.status_code": 500,
        "destination.ip": "10.0.0.5"
    });
    let after_error = engine.process_event_at(&Event::from_value(&error_payload), start);
    assert_eq!(after_error.detections.len(), 1);
    assert!(after_error.correlations.is_empty());

    let connection_payload = json!({
        "event.type": "connection",
        "source.ip": "10.0.0.5"
    });
    let after_connection =
        engine.process_event_at(&Event::from_value(&connection_payload), start + 5);
    assert_eq!(after_connection.detections.len(), 1);
    assert_eq!(after_connection.correlations.len(), 1);

    let fired = &after_connection.correlations[0];
    assert_eq!(fired.rule_title, "Error Then Connection");
    assert!(
        fired
            .group_key
            .iter()
            .any(|(k, v)| k == "internal_ip" && v == "10.0.0.5")
    );
}
2323
/// Smoke test for the `value_percentile` correlation type.
///
/// Loads a detection rule plus a `value_percentile` correlation over the
/// numeric `image` field and feeds five samples for one host. The exact
/// percentile outcome is not pinned here; the test checks that the rule set
/// is accepted, that the engine processes the samples without panicking, and
/// that every sample is still reported as a detection of the base rule.
#[test]
fn test_value_percentile() {
    let yaml = r#"
title: Process Creation
id: proc-001
logsource:
    category: process
detection:
    selection:
        type: process_creation
    condition: selection
---
title: Rare Process
id: corr-percentile
correlation:
    type: value_percentile
    rules:
        - proc-001
    group-by:
        - ComputerName
    timespan: 60s
    condition:
        field: image
        lte: 50
level: medium
"#;
    let collection = parse_sigma_yaml(yaml).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&collection).unwrap();

    // Both documents must have been registered, otherwise the loop below
    // would exercise nothing.
    assert_eq!(engine.detection_rule_count(), 1);
    assert_eq!(engine.correlation_rule_count(), 1);

    let ts = 1000i64;
    for (i, val) in [10.0, 20.0, 30.0, 40.0, 50.0].iter().enumerate() {
        let v = json!({"type": "process_creation", "ComputerName": "srv01", "image": val});
        let event = Event::from_value(&v);
        let r = engine.process_event_at(&event, ts + i as i64);
        // Every sample matches the base rule's selection.
        assert_eq!(
            r.detections.len(),
            1,
            "sample {i} should match the base detection rule"
        );
    }
}
2368
/// Extended (boolean-expression) temporal condition using `and`: the
/// correlation must stay silent after only one referenced rule has matched
/// and fire once both have matched for the same user.
#[test]
fn test_extended_temporal_and_condition() {
    let rule_set = r#"
title: Login Attempt
id: login-attempt
logsource:
    category: auth
detection:
    selection:
        EventType: login_failure
    condition: selection
---
title: Password Change
id: password-change
logsource:
    category: auth
detection:
    selection:
        EventType: password_change
    condition: selection
---
title: Credential Attack
correlation:
    type: temporal
    rules:
        - login-attempt
        - password-change
    group-by:
        - User
    timespan: 300s
    condition: login-attempt and password-change
level: high
"#;
    let parsed = parse_sigma_yaml(rule_set).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&parsed).unwrap();

    let start = 1000i64;

    let login_failure = json!({"EventType": "login_failure", "User": "alice"});
    let after_failure = engine.process_event_at(&Event::from_value(&login_failure), start);
    assert!(
        after_failure.correlations.is_empty(),
        "only one rule fired so far"
    );

    let password_change = json!({"EventType": "password_change", "User": "alice"});
    let after_change = engine.process_event_at(&Event::from_value(&password_change), start + 10);
    assert_eq!(
        after_change.correlations.len(),
        1,
        "temporal correlation should fire: both rules matched"
    );
    assert_eq!(after_change.correlations[0].rule_title, "Credential Attack");
}
2428
/// Extended temporal condition using `or`: a single event matching either
/// referenced rule is enough to fire "Any Remote Access".
#[test]
fn test_extended_temporal_or_condition() {
    let rule_set = r#"
title: SSH Login
id: ssh-login
logsource:
    category: auth
detection:
    selection:
        EventType: ssh_login
    condition: selection
---
title: VPN Login
id: vpn-login
logsource:
    category: auth
detection:
    selection:
        EventType: vpn_login
    condition: selection
---
title: Any Remote Access
correlation:
    type: temporal
    rules:
        - ssh-login
        - vpn-login
    group-by:
        - User
    timespan: 60s
    condition: ssh-login or vpn-login
level: medium
"#;
    let parsed = parse_sigma_yaml(rule_set).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&parsed).unwrap();

    let ssh_login = json!({"EventType": "ssh_login", "User": "bob"});
    let outcome = engine.process_event_at(&Event::from_value(&ssh_login), 1000);
    assert_eq!(outcome.correlations.len(), 1);
    assert_eq!(outcome.correlations[0].rule_title, "Any Remote Access");
}
2473
/// Extended temporal `and` condition grouped by host: one of the two rules
/// matching is not enough; the correlation fires once the second rule
/// matches on the same host ten seconds later.
#[test]
fn test_extended_temporal_partial_and_no_fire() {
    let rule_set = r#"
title: Recon Step 1
id: recon-1
logsource:
    category: process
detection:
    selection:
        CommandLine|contains: 'whoami'
    condition: selection
---
title: Recon Step 2
id: recon-2
logsource:
    category: process
detection:
    selection:
        CommandLine|contains: 'ipconfig'
    condition: selection
---
title: Full Recon
correlation:
    type: temporal
    rules:
        - recon-1
        - recon-2
    group-by:
        - Host
    timespan: 120s
    condition: recon-1 and recon-2
level: high
"#;
    let parsed = parse_sigma_yaml(rule_set).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&parsed).unwrap();

    let step_one = json!({"CommandLine": "whoami", "Host": "srv01"});
    let partial = engine.process_event_at(&Event::from_value(&step_one), 1000);
    assert!(
        partial.correlations.is_empty(),
        "only one of two AND rules fired"
    );

    let step_two = json!({"CommandLine": "ipconfig /all", "Host": "srv01"});
    let complete = engine.process_event_at(&Event::from_value(&step_two), 1010);
    assert_eq!(complete.correlations.len(), 1);
    assert_eq!(complete.correlations[0].rule_title, "Full Recon");
}
2523
/// A filter document excluding `svc_`-prefixed users is applied to the base
/// rule: five failures from a service account must never correlate, while a
/// regular user's third failure fires "Brute Force".
#[test]
fn test_filter_with_correlation() {
    let rule_set = r#"
title: Failed Auth
id: failed-auth
logsource:
    category: auth
detection:
    selection:
        EventType: auth_failure
    condition: selection
---
title: Exclude Service Accounts
filter:
    rules:
        - failed-auth
    selection:
        User|startswith: 'svc_'
    condition: selection
---
title: Brute Force
correlation:
    type: event_count
    rules:
        - failed-auth
    group-by:
        - User
    timespan: 300s
    condition:
        gte: 3
level: critical
"#;
    let parsed = parse_sigma_yaml(rule_set).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&parsed).unwrap();

    let start = 1000i64;

    let service_payload = json!({"EventType": "auth_failure", "User": "svc_backup"});
    let service_failure = Event::from_value(&service_payload);
    for offset in 0..5 {
        let outcome = engine.process_event_at(&service_failure, start + offset);
        assert!(
            outcome.correlations.is_empty(),
            "service account should be filtered, no correlation"
        );
    }

    let user_payload = json!({"EventType": "auth_failure", "User": "alice"});
    let user_failure = Event::from_value(&user_payload);
    for offset in 0..2 {
        let outcome = engine.process_event_at(&user_failure, start + 10 + offset);
        assert!(outcome.correlations.is_empty(), "not yet 3 events");
    }

    let third = engine.process_event_at(&user_failure, start + 12);
    assert_eq!(third.correlations.len(), 1);
    assert_eq!(third.correlations[0].rule_title, "Brute Force");
}
2590
/// A rule produced through `action: repeat` is a full detection rule that a
/// correlation can reference: the collection must hold two rules, and three
/// file accesses by the same user across both rules fire "Mass File Access".
#[test]
fn test_repeat_rules_in_correlation() {
    let rule_set = r#"
title: File Access A
id: file-a
logsource:
    category: file_access
detection:
    selection:
        FileName|endswith: '.docx'
    condition: selection
---
action: repeat
title: File Access B
id: file-b
detection:
    selection:
        FileName|endswith: '.xlsx'
    condition: selection
---
title: Mass File Access
correlation:
    type: event_count
    rules:
        - file-a
        - file-b
    group-by:
        - User
    timespan: 60s
    condition:
        gte: 3
level: high
"#;
    let parsed = parse_sigma_yaml(rule_set).unwrap();
    assert_eq!(parsed.rules.len(), 2);
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&parsed).unwrap();
    assert_eq!(engine.detection_rule_count(), 2);

    let start = 1000i64;

    let docx = json!({"FileName": "report.docx", "User": "bob"});
    engine.process_event_at(&Event::from_value(&docx), start);

    let xlsx = json!({"FileName": "data.xlsx", "User": "bob"});
    engine.process_event_at(&Event::from_value(&xlsx), start + 1);

    let second_docx = json!({"FileName": "notes.docx", "User": "bob"});
    let outcome = engine.process_event_at(&Event::from_value(&second_docx), start + 2);

    assert_eq!(outcome.correlations.len(), 1);
    assert_eq!(outcome.correlations[0].rule_title, "Mass File Access");
}
2647
/// The `expand` modifier combined with correlation: events whose `FilePath`
/// matches the `%User%` placeholder filled from their own `User` field
/// correlate on the second hit, while an event whose `User` differs from the
/// name in the path yields no detection at all.
#[test]
fn test_expand_modifier_with_correlation() {
    let rule_set = r#"
title: User Temp File
id: user-temp
logsource:
    category: file_access
detection:
    selection:
        FilePath|expand: 'C:\Users\%User%\Temp'
    condition: selection
---
title: Excessive Temp Access
correlation:
    type: event_count
    rules:
        - user-temp
    group-by:
        - User
    timespan: 60s
    condition:
        gte: 2
level: medium
"#;
    let parsed = parse_sigma_yaml(rule_set).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&parsed).unwrap();

    let start = 1000i64;

    let matching_payload = json!({"FilePath": "C:\\Users\\alice\\Temp", "User": "alice"});
    let matching = Event::from_value(&matching_payload);

    let first = engine.process_event_at(&matching, start);
    assert!(first.correlations.is_empty());

    let second = engine.process_event_at(&matching, start + 1);
    assert_eq!(second.correlations.len(), 1);
    assert_eq!(second.correlations[0].rule_title, "Excessive Temp Access");

    // Path names alice, but the event's User is bob.
    let mismatched_payload = json!({"FilePath": "C:\\Users\\alice\\Temp", "User": "bob"});
    let mismatched = engine.process_event_at(&Event::from_value(&mismatched_payload), start + 2);
    assert_eq!(mismatched.detections.len(), 0);
}
2697
/// The `hour` timestamp modifier combined with correlation: two logins whose
/// `Timestamp` falls in hour 3 correlate on the second event, and a login at
/// 12:00 produces no detection.
#[test]
fn test_timestamp_modifier_with_correlation() {
    let rule_set = r#"
title: Night Login
id: night-login
logsource:
    category: auth
detection:
    login:
        EventType: login
    night:
        Timestamp|hour: 3
    condition: login and night
---
title: Frequent Night Logins
correlation:
    type: event_count
    rules:
        - night-login
    group-by:
        - User
    timespan: 3600s
    condition:
        gte: 2
level: high
"#;
    let parsed = parse_sigma_yaml(rule_set).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&parsed).unwrap();

    let start = 1000i64;

    let early_night =
        json!({"EventType": "login", "User": "alice", "Timestamp": "2024-01-15T03:10:00Z"});
    let first = engine.process_event_at(&Event::from_value(&early_night), start);
    assert_eq!(first.detections.len(), 1);
    assert!(first.correlations.is_empty());

    let late_night =
        json!({"EventType": "login", "User": "alice", "Timestamp": "2024-01-15T03:45:00Z"});
    let second = engine.process_event_at(&Event::from_value(&late_night), start + 1);
    assert_eq!(second.correlations.len(), 1);
    assert_eq!(second.correlations[0].rule_title, "Frequent Night Logins");

    let noon = json!({"EventType": "login", "User": "bob", "Timestamp": "2024-01-15T12:00:00Z"});
    let third = engine.process_event_at(&Event::from_value(&noon), start + 2);
    assert!(
        third.detections.is_empty(),
        "noon login should not match night rule"
    );
}
2754
/// Range condition (`gt: 2` together with `lte: 5`) on an `event_count`
/// correlation: the first two events stay silent, events three through five
/// each fire, and the sixth event falls outside the range and stays silent.
#[test]
fn test_event_count_range_condition() {
    let rule_set = r#"
title: Login Attempt
id: login-attempt-001
name: login_attempt
logsource:
    product: windows
detection:
    selection:
        EventType: login
    condition: selection
level: low
---
title: Login Count Range
id: corr-range-001
correlation:
    type: event_count
    rules:
        - login-attempt-001
    group-by:
        - User
    timespan: 3600s
    condition:
        gt: 2
        lte: 5
level: high
"#;
    let parsed = parse_sigma_yaml(rule_set).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&parsed).unwrap();

    let start: i64 = 1_000_000;
    let payload = json!({"EventType": "login", "User": "alice"});
    let login = Event::from_value(&payload);

    // Events 1 and 2: below the lower bound.
    for offset in 0..2 {
        let outcome = engine.process_event_at(&login, start + offset);
        assert!(
            outcome.correlations.is_empty(),
            "2 events should not fire (gt:2)"
        );
    }

    // Event 3: first count inside the range.
    let third = engine.process_event_at(&login, start + 3);
    assert_eq!(third.correlations.len(), 1, "3 events: gt:2 AND lte:5");

    // Events 4 and 5: still inside the range.
    for i in 4..=5 {
        let outcome = engine.process_event_at(&login, start + i);
        assert_eq!(outcome.correlations.len(), 1, "{i} events still in range");
    }

    // Event 6: above the upper bound.
    let sixth = engine.process_event_at(&login, start + 6);
    assert!(
        sixth.correlations.is_empty(),
        "6 events exceeds lte:5, should not fire"
    );
}
2820
2821 fn suppression_yaml() -> &'static str {
2826 r#"
2827title: Login
2828id: login-base
2829logsource:
2830 category: auth
2831detection:
2832 selection:
2833 EventType: login
2834 condition: selection
2835---
2836title: Many Logins
2837correlation:
2838 type: event_count
2839 rules:
2840 - login-base
2841 group-by:
2842 - User
2843 timeframe: 60s
2844 condition:
2845 gte: 3
2846level: high
2847"#
2848 }
2849
2850 #[test]
2851 fn test_suppression_window() {
2852 let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
2853 let config = CorrelationConfig {
2854 suppress: Some(10), ..Default::default()
2856 };
2857 let mut engine = CorrelationEngine::new(config);
2858 engine.add_collection(&collection).unwrap();
2859
2860 let ev = json!({"EventType": "login", "User": "alice"});
2861 let ts = 1000;
2862
2863 engine.process_event_at(&Event::from_value(&ev), ts);
2865 engine.process_event_at(&Event::from_value(&ev), ts + 1);
2866 let r3 = engine.process_event_at(&Event::from_value(&ev), ts + 2);
2867 assert_eq!(r3.correlations.len(), 1, "should fire on 3rd event");
2868
2869 let r4 = engine.process_event_at(&Event::from_value(&ev), ts + 3);
2871 assert!(
2872 r4.correlations.is_empty(),
2873 "should be suppressed within 10s window"
2874 );
2875
2876 let r5 = engine.process_event_at(&Event::from_value(&ev), ts + 9);
2878 assert!(
2879 r5.correlations.is_empty(),
2880 "should be suppressed at ts+9 (< ts+2+10)"
2881 );
2882
2883 let r6 = engine.process_event_at(&Event::from_value(&ev), ts + 13);
2885 assert_eq!(
2886 r6.correlations.len(),
2887 1,
2888 "should fire again after suppress window expires"
2889 );
2890 }
2891
2892 #[test]
2893 fn test_suppression_per_group_key() {
2894 let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
2895 let config = CorrelationConfig {
2896 suppress: Some(60),
2897 ..Default::default()
2898 };
2899 let mut engine = CorrelationEngine::new(config);
2900 engine.add_collection(&collection).unwrap();
2901
2902 let ts = 1000;
2903
2904 let ev_a = json!({"EventType": "login", "User": "alice"});
2906 engine.process_event_at(&Event::from_value(&ev_a), ts);
2907 engine.process_event_at(&Event::from_value(&ev_a), ts + 1);
2908 let r = engine.process_event_at(&Event::from_value(&ev_a), ts + 2);
2909 assert_eq!(r.correlations.len(), 1, "alice should fire");
2910
2911 let ev_b = json!({"EventType": "login", "User": "bob"});
2913 engine.process_event_at(&Event::from_value(&ev_b), ts + 3);
2914 engine.process_event_at(&Event::from_value(&ev_b), ts + 4);
2915 let r = engine.process_event_at(&Event::from_value(&ev_b), ts + 5);
2916 assert_eq!(r.correlations.len(), 1, "bob should fire independently");
2917
2918 let r = engine.process_event_at(&Event::from_value(&ev_a), ts + 6);
2920 assert!(r.correlations.is_empty(), "alice still suppressed");
2921 }
2922
2923 #[test]
2928 fn test_action_reset() {
2929 let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
2930 let config = CorrelationConfig {
2931 action_on_match: CorrelationAction::Reset,
2932 ..Default::default()
2933 };
2934 let mut engine = CorrelationEngine::new(config);
2935 engine.add_collection(&collection).unwrap();
2936
2937 let ev = json!({"EventType": "login", "User": "alice"});
2938 let ts = 1000;
2939
2940 engine.process_event_at(&Event::from_value(&ev), ts);
2942 engine.process_event_at(&Event::from_value(&ev), ts + 1);
2943 let r3 = engine.process_event_at(&Event::from_value(&ev), ts + 2);
2944 assert_eq!(r3.correlations.len(), 1, "should fire on 3rd event");
2945
2946 let r4 = engine.process_event_at(&Event::from_value(&ev), ts + 3);
2948 assert!(r4.correlations.is_empty(), "reset: need 3 more events");
2949
2950 let r5 = engine.process_event_at(&Event::from_value(&ev), ts + 4);
2951 assert!(r5.correlations.is_empty(), "reset: still only 2");
2952
2953 let r6 = engine.process_event_at(&Event::from_value(&ev), ts + 5);
2955 assert_eq!(
2956 r6.correlations.len(),
2957 1,
2958 "should fire again after 3 events post-reset"
2959 );
2960 }
2961
2962 #[test]
2967 fn test_emit_detections_true_by_default() {
2968 let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
2969 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2970 engine.add_collection(&collection).unwrap();
2971
2972 let ev = json!({"EventType": "login", "User": "alice"});
2973 let r = engine.process_event_at(&Event::from_value(&ev), 1000);
2974 assert_eq!(r.detections.len(), 1, "by default detections are emitted");
2975 }
2976
2977 #[test]
2978 fn test_emit_detections_false_suppresses() {
2979 let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
2980 let config = CorrelationConfig {
2981 emit_detections: false,
2982 ..Default::default()
2983 };
2984 let mut engine = CorrelationEngine::new(config);
2985 engine.add_collection(&collection).unwrap();
2986
2987 let ev = json!({"EventType": "login", "User": "alice"});
2988 let r = engine.process_event_at(&Event::from_value(&ev), 1000);
2989 assert!(
2990 r.detections.is_empty(),
2991 "detection matches should be suppressed when emit_detections=false"
2992 );
2993 }
2994
2995 #[test]
2996 fn test_generate_true_keeps_detections() {
2997 let yaml = r#"
2999title: Login
3000id: login-gen
3001logsource:
3002 category: auth
3003detection:
3004 selection:
3005 EventType: login
3006 condition: selection
3007---
3008title: Many Logins
3009correlation:
3010 type: event_count
3011 rules:
3012 - login-gen
3013 group-by:
3014 - User
3015 timeframe: 60s
3016 condition:
3017 gte: 3
3018 generate: true
3019level: high
3020"#;
3021 let collection = parse_sigma_yaml(yaml).unwrap();
3022 let config = CorrelationConfig {
3023 emit_detections: false,
3024 ..Default::default()
3025 };
3026 let mut engine = CorrelationEngine::new(config);
3027 engine.add_collection(&collection).unwrap();
3028
3029 let ev = json!({"EventType": "login", "User": "alice"});
3030 let r = engine.process_event_at(&Event::from_value(&ev), 1000);
3031 assert_eq!(
3033 r.detections.len(),
3034 1,
3035 "generate:true keeps detection output"
3036 );
3037 }
3038
3039 #[test]
3044 fn test_suppress_and_reset_combined() {
3045 let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
3046 let config = CorrelationConfig {
3047 suppress: Some(5),
3048 action_on_match: CorrelationAction::Reset,
3049 ..Default::default()
3050 };
3051 let mut engine = CorrelationEngine::new(config);
3052 engine.add_collection(&collection).unwrap();
3053
3054 let ev = json!({"EventType": "login", "User": "alice"});
3055 let ts = 1000;
3056
3057 engine.process_event_at(&Event::from_value(&ev), ts);
3059 engine.process_event_at(&Event::from_value(&ev), ts + 1);
3060 let r3 = engine.process_event_at(&Event::from_value(&ev), ts + 2);
3061 assert_eq!(r3.correlations.len(), 1, "fires on 3rd event");
3062
3063 engine.process_event_at(&Event::from_value(&ev), ts + 3);
3066 engine.process_event_at(&Event::from_value(&ev), ts + 4);
3067 let r = engine.process_event_at(&Event::from_value(&ev), ts + 5);
3068 assert!(
3069 r.correlations.is_empty(),
3070 "threshold met again but still suppressed"
3071 );
3072
3073 let r = engine.process_event_at(&Event::from_value(&ev), ts + 8);
3077 assert_eq!(
3078 r.correlations.len(),
3079 1,
3080 "fires after suppress expires (accumulated events + new one)"
3081 );
3082
3083 engine.process_event_at(&Event::from_value(&ev), ts + 9);
3086 engine.process_event_at(&Event::from_value(&ev), ts + 10);
3087 let r = engine.process_event_at(&Event::from_value(&ev), ts + 11);
3088 assert!(
3089 r.correlations.is_empty(),
3090 "threshold met but suppress window hasn't expired (ts+11 - ts+8 = 3 < 5)"
3091 );
3092 }
3093
3094 #[test]
3099 fn test_no_suppression_fires_every_event() {
3100 let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
3101 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3102 engine.add_collection(&collection).unwrap();
3103
3104 let ev = json!({"EventType": "login", "User": "alice"});
3105 let ts = 1000;
3106
3107 engine.process_event_at(&Event::from_value(&ev), ts);
3108 engine.process_event_at(&Event::from_value(&ev), ts + 1);
3109 let r3 = engine.process_event_at(&Event::from_value(&ev), ts + 2);
3110 assert_eq!(r3.correlations.len(), 1);
3111
3112 let r4 = engine.process_event_at(&Event::from_value(&ev), ts + 3);
3114 assert_eq!(
3115 r4.correlations.len(),
3116 1,
3117 "no suppression: fires on every event after threshold"
3118 );
3119
3120 let r5 = engine.process_event_at(&Event::from_value(&ev), ts + 4);
3121 assert_eq!(r5.correlations.len(), 1, "still fires");
3122 }
3123
3124 #[test]
3129 fn test_custom_attr_timestamp_field() {
3130 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3131 let mut attrs = std::collections::HashMap::new();
3132 attrs.insert("rsigma.timestamp_field".to_string(), "time".to_string());
3133 engine.apply_custom_attributes(&attrs);
3134
3135 assert_eq!(
3136 engine.config.timestamp_fields[0], "time",
3137 "rsigma.timestamp_field should be prepended"
3138 );
3139 assert!(
3141 engine
3142 .config
3143 .timestamp_fields
3144 .contains(&"@timestamp".to_string())
3145 );
3146 }
3147
3148 #[test]
3149 fn test_custom_attr_timestamp_field_no_duplicates() {
3150 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3151 let mut attrs = std::collections::HashMap::new();
3152 attrs.insert("rsigma.timestamp_field".to_string(), "time".to_string());
3153 engine.apply_custom_attributes(&attrs);
3155 engine.apply_custom_attributes(&attrs);
3156
3157 let count = engine
3158 .config
3159 .timestamp_fields
3160 .iter()
3161 .filter(|f| *f == "time")
3162 .count();
3163 assert_eq!(count, 1, "should not duplicate timestamp_field entries");
3164 }
3165
3166 #[test]
3167 fn test_custom_attr_suppress() {
3168 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3169 assert!(engine.config.suppress.is_none());
3170
3171 let mut attrs = std::collections::HashMap::new();
3172 attrs.insert("rsigma.suppress".to_string(), "5m".to_string());
3173 engine.apply_custom_attributes(&attrs);
3174
3175 assert_eq!(engine.config.suppress, Some(300));
3176 }
3177
3178 #[test]
3179 fn test_custom_attr_suppress_does_not_override_cli() {
3180 let config = CorrelationConfig {
3181 suppress: Some(60), ..Default::default()
3183 };
3184 let mut engine = CorrelationEngine::new(config);
3185
3186 let mut attrs = std::collections::HashMap::new();
3187 attrs.insert("rsigma.suppress".to_string(), "5m".to_string());
3188 engine.apply_custom_attributes(&attrs);
3189
3190 assert_eq!(
3191 engine.config.suppress,
3192 Some(60),
3193 "CLI suppress should not be overridden by custom attribute"
3194 );
3195 }
3196
3197 #[test]
3198 fn test_custom_attr_action() {
3199 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3200 assert_eq!(engine.config.action_on_match, CorrelationAction::Alert);
3201
3202 let mut attrs = std::collections::HashMap::new();
3203 attrs.insert("rsigma.action".to_string(), "reset".to_string());
3204 engine.apply_custom_attributes(&attrs);
3205
3206 assert_eq!(engine.config.action_on_match, CorrelationAction::Reset);
3207 }
3208
3209 #[test]
3210 fn test_custom_attr_action_does_not_override_cli() {
3211 let config = CorrelationConfig {
3212 action_on_match: CorrelationAction::Reset, ..Default::default()
3214 };
3215 let mut engine = CorrelationEngine::new(config);
3216
3217 let mut attrs = std::collections::HashMap::new();
3218 attrs.insert("rsigma.action".to_string(), "alert".to_string());
3219 engine.apply_custom_attributes(&attrs);
3220
3221 assert_eq!(
3222 engine.config.action_on_match,
3223 CorrelationAction::Reset,
3224 "CLI action should not be overridden by custom attribute"
3225 );
3226 }
3227
3228 #[test]
3229 fn test_custom_attr_timestamp_field_used_for_extraction() {
3230 let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
3232 let mut config = CorrelationConfig::default();
3233 config.timestamp_fields.insert(0, "event_time".to_string());
3235 let mut engine = CorrelationEngine::new(config);
3236 engine.add_collection(&collection).unwrap();
3237
3238 let ev = json!({
3240 "EventType": "login",
3241 "User": "alice",
3242 "event_time": "2026-02-11T12:00:00Z"
3243 });
3244 let result = engine.process_event(&Event::from_value(&ev));
3245
3246 assert!(!result.detections.is_empty() || result.correlations.is_empty());
3248 let ts = engine
3252 .extract_event_timestamp(&Event::from_value(&ev))
3253 .expect("should extract timestamp");
3254 assert!(
3255 ts > 1_700_000_000 && ts < 1_800_000_000,
3256 "timestamp should be ~2026 epoch, got {ts}"
3257 );
3258 }
3259
3260 #[test]
3265 fn test_correlation_cycle_direct() {
3266 let yaml = r#"
3268title: detection rule
3269id: det-rule
3270logsource:
3271 product: test
3272detection:
3273 selection:
3274 action: click
3275 condition: selection
3276level: low
3277---
3278title: correlation A
3279id: corr-a
3280correlation:
3281 type: event_count
3282 rules:
3283 - corr-b
3284 group-by:
3285 - User
3286 timespan: 5m
3287 condition:
3288 gte: 2
3289level: high
3290---
3291title: correlation B
3292id: corr-b
3293correlation:
3294 type: event_count
3295 rules:
3296 - corr-a
3297 group-by:
3298 - User
3299 timespan: 5m
3300 condition:
3301 gte: 2
3302level: high
3303"#;
3304 let collection = parse_sigma_yaml(yaml).unwrap();
3305 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3306 let result = engine.add_collection(&collection);
3307 assert!(result.is_err(), "should detect direct cycle");
3308 let err = result.unwrap_err().to_string();
3309 assert!(err.contains("cycle"), "error should mention cycle: {err}");
3310 assert!(
3311 err.contains("corr-a") && err.contains("corr-b"),
3312 "error should name both correlations: {err}"
3313 );
3314 }
3315
3316 #[test]
3317 fn test_correlation_cycle_self() {
3318 let yaml = r#"
3320title: detection rule
3321id: det-rule
3322logsource:
3323 product: test
3324detection:
3325 selection:
3326 action: click
3327 condition: selection
3328level: low
3329---
3330title: self-ref correlation
3331id: self-corr
3332correlation:
3333 type: event_count
3334 rules:
3335 - self-corr
3336 group-by:
3337 - User
3338 timespan: 5m
3339 condition:
3340 gte: 2
3341level: high
3342"#;
3343 let collection = parse_sigma_yaml(yaml).unwrap();
3344 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3345 let result = engine.add_collection(&collection);
3346 assert!(result.is_err(), "should detect self-referencing cycle");
3347 let err = result.unwrap_err().to_string();
3348 assert!(err.contains("cycle"), "error should mention cycle: {err}");
3349 assert!(
3350 err.contains("self-corr"),
3351 "error should name the correlation: {err}"
3352 );
3353 }
3354
3355 #[test]
3356 fn test_correlation_no_cycle_valid_chain() {
3357 let yaml = r#"
3359title: detection rule
3360id: det-rule
3361logsource:
3362 product: test
3363detection:
3364 selection:
3365 action: click
3366 condition: selection
3367level: low
3368---
3369title: correlation A
3370id: corr-a
3371correlation:
3372 type: event_count
3373 rules:
3374 - det-rule
3375 group-by:
3376 - User
3377 timespan: 5m
3378 condition:
3379 gte: 2
3380level: high
3381---
3382title: correlation B
3383id: corr-b
3384correlation:
3385 type: event_count
3386 rules:
3387 - corr-a
3388 group-by:
3389 - User
3390 timespan: 5m
3391 condition:
3392 gte: 2
3393level: high
3394"#;
3395 let collection = parse_sigma_yaml(yaml).unwrap();
3396 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3397 let result = engine.add_collection(&collection);
3398 assert!(
3399 result.is_ok(),
3400 "valid chain should not be rejected: {result:?}"
3401 );
3402 }
3403
3404 #[test]
3405 fn test_correlation_cycle_transitive() {
3406 let yaml = r#"
3408title: detection rule
3409id: det-rule
3410logsource:
3411 product: test
3412detection:
3413 selection:
3414 action: click
3415 condition: selection
3416level: low
3417---
3418title: correlation A
3419id: corr-a
3420correlation:
3421 type: event_count
3422 rules:
3423 - corr-c
3424 group-by:
3425 - User
3426 timespan: 5m
3427 condition:
3428 gte: 2
3429level: high
3430---
3431title: correlation B
3432id: corr-b
3433correlation:
3434 type: event_count
3435 rules:
3436 - corr-a
3437 group-by:
3438 - User
3439 timespan: 5m
3440 condition:
3441 gte: 2
3442level: high
3443---
3444title: correlation C
3445id: corr-c
3446correlation:
3447 type: event_count
3448 rules:
3449 - corr-b
3450 group-by:
3451 - User
3452 timespan: 5m
3453 condition:
3454 gte: 2
3455level: high
3456"#;
3457 let collection = parse_sigma_yaml(yaml).unwrap();
3458 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3459 let result = engine.add_collection(&collection);
3460 assert!(result.is_err(), "should detect transitive cycle");
3461 let err = result.unwrap_err().to_string();
3462 assert!(err.contains("cycle"), "error should mention cycle: {err}");
3463 }
3464
3465 #[test]
3470 fn test_correlation_events_disabled_by_default() {
3471 let yaml = r#"
3472title: Login
3473id: login-rule
3474logsource:
3475 category: auth
3476detection:
3477 selection:
3478 EventType: login
3479 condition: selection
3480---
3481title: Many Logins
3482correlation:
3483 type: event_count
3484 rules:
3485 - login-rule
3486 group-by:
3487 - User
3488 timespan: 60s
3489 condition:
3490 gte: 3
3491level: high
3492"#;
3493 let collection = parse_sigma_yaml(yaml).unwrap();
3494 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3495 engine.add_collection(&collection).unwrap();
3496
3497 for i in 0..3 {
3498 let v = json!({"EventType": "login", "User": "admin", "@timestamp": 1000 + i});
3499 let event = Event::from_value(&v);
3500 let result = engine.process_event_at(&event, 1000 + i);
3501 if i == 2 {
3502 assert_eq!(result.correlations.len(), 1);
3503 assert!(result.correlations[0].events.is_none());
3505 }
3506 }
3507 }
3508
3509 #[test]
3510 fn test_correlation_events_included_when_enabled() {
3511 let yaml = r#"
3512title: Login
3513id: login-rule
3514logsource:
3515 category: auth
3516detection:
3517 selection:
3518 EventType: login
3519 condition: selection
3520---
3521title: Many Logins
3522correlation:
3523 type: event_count
3524 rules:
3525 - login-rule
3526 group-by:
3527 - User
3528 timespan: 60s
3529 condition:
3530 gte: 3
3531level: high
3532"#;
3533 let collection = parse_sigma_yaml(yaml).unwrap();
3534 let config = CorrelationConfig {
3535 correlation_event_mode: CorrelationEventMode::Full,
3536 max_correlation_events: 10,
3537 ..Default::default()
3538 };
3539 let mut engine = CorrelationEngine::new(config);
3540 engine.add_collection(&collection).unwrap();
3541
3542 let events_sent: Vec<serde_json::Value> = (0..3)
3543 .map(|i| json!({"EventType": "login", "User": "admin", "@timestamp": 1000 + i}))
3544 .collect();
3545
3546 let mut corr_result = None;
3547 for (i, ev) in events_sent.iter().enumerate() {
3548 let event = Event::from_value(ev);
3549 let result = engine.process_event_at(&event, 1000 + i as i64);
3550 if !result.correlations.is_empty() {
3551 corr_result = Some(result);
3552 }
3553 }
3554
3555 let result = corr_result.expect("correlation should have fired");
3556 let corr = &result.correlations[0];
3557
3558 let events = corr.events.as_ref().expect("events should be present");
3560 assert_eq!(
3561 events.len(),
3562 3,
3563 "all 3 contributing events should be stored"
3564 );
3565
3566 for (i, event) in events.iter().enumerate() {
3568 assert_eq!(event["EventType"], "login");
3569 assert_eq!(event["User"], "admin");
3570 assert_eq!(event["@timestamp"], 1000 + i as i64);
3571 }
3572 }
3573
3574 #[test]
3575 fn test_correlation_events_max_cap() {
3576 let yaml = r#"
3577title: Login
3578id: login-rule
3579logsource:
3580 category: auth
3581detection:
3582 selection:
3583 EventType: login
3584 condition: selection
3585---
3586title: Many Logins
3587correlation:
3588 type: event_count
3589 rules:
3590 - login-rule
3591 group-by:
3592 - User
3593 timespan: 60s
3594 condition:
3595 gte: 5
3596level: high
3597"#;
3598 let collection = parse_sigma_yaml(yaml).unwrap();
3599 let config = CorrelationConfig {
3600 correlation_event_mode: CorrelationEventMode::Full,
3601 max_correlation_events: 3, ..Default::default()
3603 };
3604 let mut engine = CorrelationEngine::new(config);
3605 engine.add_collection(&collection).unwrap();
3606
3607 let mut corr_result = None;
3608 for i in 0..5 {
3609 let v = json!({"EventType": "login", "User": "admin", "idx": i});
3610 let event = Event::from_value(&v);
3611 let result = engine.process_event_at(&event, 1000 + i);
3612 if !result.correlations.is_empty() {
3613 corr_result = Some(result);
3614 }
3615 }
3616
3617 let result = corr_result.expect("correlation should have fired");
3618 let events = result.correlations[0]
3619 .events
3620 .as_ref()
3621 .expect("events should be present");
3622
3623 assert_eq!(events.len(), 3);
3625 assert_eq!(events[0]["idx"], 2);
3626 assert_eq!(events[1]["idx"], 3);
3627 assert_eq!(events[2]["idx"], 4);
3628 }
3629
3630 #[test]
3631 fn test_correlation_events_with_reset_action() {
3632 let yaml = r#"
3633title: Login
3634id: login-rule
3635logsource:
3636 category: auth
3637detection:
3638 selection:
3639 EventType: login
3640 condition: selection
3641---
3642title: Many Logins
3643correlation:
3644 type: event_count
3645 rules:
3646 - login-rule
3647 group-by:
3648 - User
3649 timespan: 60s
3650 condition:
3651 gte: 2
3652level: high
3653"#;
3654 let collection = parse_sigma_yaml(yaml).unwrap();
3655 let config = CorrelationConfig {
3656 correlation_event_mode: CorrelationEventMode::Full,
3657 action_on_match: CorrelationAction::Reset,
3658 ..Default::default()
3659 };
3660 let mut engine = CorrelationEngine::new(config);
3661 engine.add_collection(&collection).unwrap();
3662
3663 for i in 0..2 {
3665 let v = json!({"EventType": "login", "User": "admin", "round": 1, "idx": i});
3666 let event = Event::from_value(&v);
3667 let result = engine.process_event_at(&event, 1000 + i);
3668 if i == 1 {
3669 assert_eq!(result.correlations.len(), 1);
3670 let events = result.correlations[0].events.as_ref().unwrap();
3671 assert_eq!(events.len(), 2);
3672 }
3673 }
3674
3675 let v = json!({"EventType": "login", "User": "admin", "round": 2, "idx": 0});
3678 let event = Event::from_value(&v);
3679 let result = engine.process_event_at(&event, 1010);
3680 assert!(
3681 result.correlations.is_empty(),
3682 "should not fire with only 1 event after reset"
3683 );
3684
3685 let v = json!({"EventType": "login", "User": "admin", "round": 2, "idx": 1});
3686 let event = Event::from_value(&v);
3687 let result = engine.process_event_at(&event, 1011);
3688 assert_eq!(result.correlations.len(), 1);
3689 let events = result.correlations[0].events.as_ref().unwrap();
3690 assert_eq!(events.len(), 2);
3691 assert_eq!(events[0]["round"], 2);
3693 assert_eq!(events[1]["round"], 2);
3694 }
3695
3696 #[test]
3697 fn test_correlation_events_with_set_include() {
3698 let yaml = r#"
3699title: Login
3700id: login-rule
3701logsource:
3702 category: auth
3703detection:
3704 selection:
3705 EventType: login
3706 condition: selection
3707---
3708title: Many Logins
3709correlation:
3710 type: event_count
3711 rules:
3712 - login-rule
3713 group-by:
3714 - User
3715 timespan: 60s
3716 condition:
3717 gte: 2
3718level: high
3719"#;
3720 let collection = parse_sigma_yaml(yaml).unwrap();
3721 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3722 engine.add_collection(&collection).unwrap();
3723
3724 engine.set_correlation_event_mode(CorrelationEventMode::Full);
3726
3727 for i in 0..2 {
3728 let v = json!({"EventType": "login", "User": "admin"});
3729 let event = Event::from_value(&v);
3730 let result = engine.process_event_at(&event, 1000 + i);
3731 if i == 1 {
3732 assert_eq!(result.correlations.len(), 1);
3733 assert!(result.correlations[0].events.is_some());
3734 assert_eq!(result.correlations[0].events.as_ref().unwrap().len(), 2);
3735 }
3736 }
3737 }
3738
3739 #[test]
3740 fn test_correlation_events_eviction_syncs_with_window() {
3741 let yaml = r#"
3742title: Login
3743id: login-rule
3744logsource:
3745 category: auth
3746detection:
3747 selection:
3748 EventType: login
3749 condition: selection
3750---
3751title: Many Logins
3752correlation:
3753 type: event_count
3754 rules:
3755 - login-rule
3756 group-by:
3757 - User
3758 timespan: 10s
3759 condition:
3760 gte: 3
3761level: high
3762"#;
3763 let collection = parse_sigma_yaml(yaml).unwrap();
3764 let config = CorrelationConfig {
3765 correlation_event_mode: CorrelationEventMode::Full,
3766 max_correlation_events: 100,
3767 ..Default::default()
3768 };
3769 let mut engine = CorrelationEngine::new(config);
3770 engine.add_collection(&collection).unwrap();
3771
3772 for i in 0..2 {
3774 let v = json!({"EventType": "login", "User": "admin", "idx": i});
3775 let event = Event::from_value(&v);
3776 engine.process_event_at(&event, 1000 + i);
3777 }
3778
3779 let v = json!({"EventType": "login", "User": "admin", "idx": 2});
3782 let event = Event::from_value(&v);
3783 let result = engine.process_event_at(&event, 1015);
3784 assert!(
3786 result.correlations.is_empty(),
3787 "should not fire — old events evicted"
3788 );
3789
3790 for i in 3..5 {
3792 let v = json!({"EventType": "login", "User": "admin", "idx": i});
3793 let event = Event::from_value(&v);
3794 let result = engine.process_event_at(&event, 1016 + i - 3);
3795 if i == 4 {
3796 assert_eq!(result.correlations.len(), 1);
3797 let events = result.correlations[0].events.as_ref().unwrap();
3798 assert_eq!(events.len(), 3);
3800 for ev in events {
3801 assert!(ev["idx"].as_i64().unwrap() >= 2);
3802 }
3803 }
3804 }
3805 }
3806
3807 #[test]
3808 fn test_event_buffer_monitoring() {
3809 let yaml = r#"
3810title: Login
3811id: login-rule
3812logsource:
3813 category: auth
3814detection:
3815 selection:
3816 EventType: login
3817 condition: selection
3818---
3819title: Many Logins
3820correlation:
3821 type: event_count
3822 rules:
3823 - login-rule
3824 group-by:
3825 - User
3826 timespan: 60s
3827 condition:
3828 gte: 100
3829level: high
3830"#;
3831 let collection = parse_sigma_yaml(yaml).unwrap();
3832 let config = CorrelationConfig {
3833 correlation_event_mode: CorrelationEventMode::Full,
3834 ..Default::default()
3835 };
3836 let mut engine = CorrelationEngine::new(config);
3837 engine.add_collection(&collection).unwrap();
3838
3839 assert_eq!(engine.event_buffer_count(), 0);
3840 assert_eq!(engine.event_buffer_bytes(), 0);
3841
3842 for i in 0..5 {
3844 let v = json!({"EventType": "login", "User": "admin"});
3845 let event = Event::from_value(&v);
3846 engine.process_event_at(&event, 1000 + i);
3847 }
3848
3849 assert_eq!(engine.event_buffer_count(), 1); assert!(engine.event_buffer_bytes() > 0);
3851 }
3852
3853 #[test]
3854 fn test_correlation_refs_mode_basic() {
3855 let yaml = r#"
3856title: Login
3857id: login-rule
3858logsource:
3859 category: auth
3860detection:
3861 selection:
3862 EventType: login
3863 condition: selection
3864---
3865title: Many Logins
3866correlation:
3867 type: event_count
3868 rules:
3869 - login-rule
3870 group-by:
3871 - User
3872 timespan: 60s
3873 condition:
3874 gte: 3
3875level: high
3876"#;
3877 let collection = parse_sigma_yaml(yaml).unwrap();
3878 let config = CorrelationConfig {
3879 correlation_event_mode: CorrelationEventMode::Refs,
3880 max_correlation_events: 10,
3881 ..Default::default()
3882 };
3883 let mut engine = CorrelationEngine::new(config);
3884 engine.add_collection(&collection).unwrap();
3885
3886 let mut corr_result = None;
3887 for i in 0..3 {
3888 let v = json!({"EventType": "login", "User": "admin", "id": format!("evt-{i}"), "@timestamp": 1000 + i});
3889 let event = Event::from_value(&v);
3890 let result = engine.process_event_at(&event, 1000 + i);
3891 if !result.correlations.is_empty() {
3892 corr_result = Some(result.correlations[0].clone());
3893 }
3894 }
3895
3896 let result = corr_result.expect("correlation should have fired");
3897 assert!(
3899 result.events.is_none(),
3900 "Full events should not be included in refs mode"
3901 );
3902 let refs = result
3903 .event_refs
3904 .expect("event_refs should be present in refs mode");
3905 assert_eq!(refs.len(), 3);
3906 assert_eq!(refs[0].timestamp, 1000);
3907 assert_eq!(refs[0].id, Some("evt-0".to_string()));
3908 assert_eq!(refs[1].id, Some("evt-1".to_string()));
3909 assert_eq!(refs[2].id, Some("evt-2".to_string()));
3910 }
3911
3912 #[test]
3913 fn test_correlation_refs_mode_no_id_field() {
3914 let yaml = r#"
3915title: Login
3916id: login-rule
3917logsource:
3918 category: auth
3919detection:
3920 selection:
3921 EventType: login
3922 condition: selection
3923---
3924title: Many Logins
3925correlation:
3926 type: event_count
3927 rules:
3928 - login-rule
3929 group-by:
3930 - User
3931 timespan: 60s
3932 condition:
3933 gte: 2
3934level: high
3935"#;
3936 let collection = parse_sigma_yaml(yaml).unwrap();
3937 let config = CorrelationConfig {
3938 correlation_event_mode: CorrelationEventMode::Refs,
3939 ..Default::default()
3940 };
3941 let mut engine = CorrelationEngine::new(config);
3942 engine.add_collection(&collection).unwrap();
3943
3944 let mut corr_result = None;
3945 for i in 0..2 {
3946 let v = json!({"EventType": "login", "User": "admin"});
3947 let event = Event::from_value(&v);
3948 let result = engine.process_event_at(&event, 1000 + i);
3949 if !result.correlations.is_empty() {
3950 corr_result = Some(result.correlations[0].clone());
3951 }
3952 }
3953
3954 let result = corr_result.expect("correlation should have fired");
3955 let refs = result.event_refs.expect("event_refs should be present");
3956 for r in &refs {
3958 assert_eq!(r.id, None);
3959 }
3960 }
3961
3962 #[test]
3963 fn test_per_correlation_custom_attributes_from_yaml() {
3964 let yaml = r#"
3965title: Login
3966id: login-rule
3967logsource:
3968 category: auth
3969detection:
3970 selection:
3971 EventType: login
3972 condition: selection
3973---
3974title: Many Logins
3975custom_attributes:
3976 rsigma.correlation_event_mode: refs
3977 rsigma.max_correlation_events: "5"
3978correlation:
3979 type: event_count
3980 rules:
3981 - login-rule
3982 group-by:
3983 - User
3984 timespan: 60s
3985 condition:
3986 gte: 3
3987level: high
3988"#;
3989 let collection = parse_sigma_yaml(yaml).unwrap();
3990 let config = CorrelationConfig::default();
3992 let mut engine = CorrelationEngine::new(config);
3993 engine.add_collection(&collection).unwrap();
3994
3995 let mut corr_result = None;
3996 for i in 0..3 {
3997 let v = json!({"EventType": "login", "User": "admin", "id": format!("e{i}")});
3998 let event = Event::from_value(&v);
3999 let result = engine.process_event_at(&event, 1000 + i);
4000 if !result.correlations.is_empty() {
4001 corr_result = Some(result.correlations[0].clone());
4002 }
4003 }
4004
4005 let result = corr_result.expect("correlation should fire with per-correlation refs mode");
4006 assert!(result.events.is_none());
4008 let refs = result
4009 .event_refs
4010 .expect("event_refs via per-correlation override");
4011 assert_eq!(refs.len(), 3);
4012 assert_eq!(refs[0].id, Some("e0".to_string()));
4013 }
4014
4015 #[test]
4016 fn test_per_correlation_custom_attr_suppress_and_action() {
4017 let yaml = r#"
4018title: Login
4019id: login-rule
4020logsource:
4021 category: auth
4022detection:
4023 selection:
4024 EventType: login
4025 condition: selection
4026---
4027title: Many Logins
4028custom_attributes:
4029 rsigma.suppress: 10s
4030 rsigma.action: reset
4031correlation:
4032 type: event_count
4033 rules:
4034 - login-rule
4035 group-by:
4036 - User
4037 timespan: 60s
4038 condition:
4039 gte: 2
4040level: high
4041"#;
4042 let collection = parse_sigma_yaml(yaml).unwrap();
4043 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
4044 engine.add_collection(&collection).unwrap();
4045
4046 assert_eq!(engine.correlations[0].suppress_secs, Some(10));
4048 assert_eq!(
4049 engine.correlations[0].action,
4050 Some(CorrelationAction::Reset)
4051 );
4052 }
4053}