1use std::collections::HashMap;
16
17use chrono::{DateTime, TimeZone, Utc};
18use serde::Serialize;
19
20use rsigma_parser::{CorrelationRule, CorrelationType, Level, SigmaCollection, SigmaRule};
21
22use crate::correlation::{
23 CompiledCorrelation, EventBuffer, EventRef, EventRefBuffer, GroupKey, WindowState,
24 compile_correlation,
25};
26use crate::engine::Engine;
27use crate::error::{EvalError, Result};
28use crate::event::Event;
29use crate::pipeline::{Pipeline, apply_pipelines, apply_pipelines_to_correlation};
30use crate::result::MatchResult;
31
/// What the engine does with a group's window state after its correlation
/// fires and the alert is not suppressed.
///
/// Serialized in snake_case (`"alert"`, `"reset"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum CorrelationAction {
    /// Report the firing and keep the group's window state, so subsequent
    /// events are evaluated against the same accumulated state. Default.
    #[default]
    Alert,
    /// Report the firing, then clear the group's window state and its event
    /// buffers so accumulation starts afresh.
    Reset,
}
51
52impl std::str::FromStr for CorrelationAction {
53 type Err = String;
54 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
55 match s {
56 "alert" => Ok(CorrelationAction::Alert),
57 "reset" => Ok(CorrelationAction::Reset),
58 _ => Err(format!(
59 "Unknown correlation action: {s} (expected 'alert' or 'reset')"
60 )),
61 }
62 }
63}
64
/// Whether, and how, the events behind a correlation are attached to its
/// results.
///
/// Serialized in snake_case (`"none"`, `"full"`, `"refs"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum CorrelationEventMode {
    /// Store nothing; results carry neither `events` nor `event_refs`. Default.
    #[default]
    None,
    /// Keep the group's events in an `EventBuffer` and attach them to results
    /// as `events`.
    Full,
    /// Keep event references in an `EventRefBuffer` and attach them to results
    /// as `event_refs`.
    Refs,
}
82
83impl std::str::FromStr for CorrelationEventMode {
84 type Err = String;
85 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
86 match s.to_lowercase().as_str() {
87 "none" | "off" | "false" => Ok(CorrelationEventMode::None),
88 "full" | "true" => Ok(CorrelationEventMode::Full),
89 "refs" | "references" => Ok(CorrelationEventMode::Refs),
90 _ => Err(format!(
91 "Unknown correlation event mode: {s} (expected 'none', 'full', or 'refs')"
92 )),
93 }
94 }
95}
96
97impl std::fmt::Display for CorrelationEventMode {
98 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
99 match self {
100 CorrelationEventMode::None => write!(f, "none"),
101 CorrelationEventMode::Full => write!(f, "full"),
102 CorrelationEventMode::Refs => write!(f, "refs"),
103 }
104 }
105}
106
/// Policy for events that carry no parseable timestamp in any of the
/// configured timestamp fields.
#[derive(Default, Debug, Clone, Copy)]
#[derive(PartialEq, Eq)]
pub enum TimestampFallback {
    /// Use the current wall-clock time as the event time. Default.
    #[default]
    WallClock,
    /// Return the event's detections but leave correlation state untouched.
    Skip,
}
118
/// Engine-wide correlation settings.
///
/// Several of these act as defaults that an individual correlation rule can
/// override (suppression, action, event mode, buffer size). Some can also be
/// set by `rsigma.*` custom attributes on loaded rules.
#[derive(Debug, Clone)]
pub struct CorrelationConfig {
    /// Event fields tried, in order, to obtain an event's timestamp; the first
    /// one present and parseable (numeric epoch or date-time string) wins.
    pub timestamp_fields: Vec<String>,

    /// What to do when none of `timestamp_fields` yields a timestamp.
    pub timestamp_fallback: TimestampFallback,

    /// Soft limit on tracked (correlation, group) window states. When reached,
    /// an eviction pass runs before the next event is processed. If the count
    /// is still at the limit afterwards, the least recently active groups are
    /// dropped until 90% of the limit remains.
    pub max_state_entries: usize,

    /// Default suppression window in seconds. After a correlation fires for a
    /// group, further firings for that group within this window are not
    /// reported. A correlation's own suppression setting overrides this;
    /// `None` means no suppression.
    pub suppress: Option<u64>,

    /// Default action after a correlation fires; a correlation's own action
    /// overrides it.
    pub action_on_match: CorrelationAction,

    /// When `false`, detections of rules referenced by a correlation with
    /// `generate: false` are removed from `ProcessResult::detections`.
    pub emit_detections: bool,

    /// Default event-inclusion mode for correlation results; a correlation's
    /// own event mode overrides it.
    pub correlation_event_mode: CorrelationEventMode,

    /// Default capacity passed to each per-group event buffer when it is
    /// created (presumably the maximum number of events retained per group;
    /// NOTE(review): confirm against `EventBuffer::new`). A correlation's own
    /// setting overrides it.
    pub max_correlation_events: usize,
}
180
181impl Default for CorrelationConfig {
182 fn default() -> Self {
183 CorrelationConfig {
184 timestamp_fields: vec![
185 "@timestamp".to_string(),
186 "timestamp".to_string(),
187 "EventTime".to_string(),
188 "TimeCreated".to_string(),
189 "eventTime".to_string(),
190 ],
191 timestamp_fallback: TimestampFallback::default(),
192 max_state_entries: 100_000,
193 suppress: None,
194 action_on_match: CorrelationAction::default(),
195 emit_detections: true,
196 correlation_event_mode: CorrelationEventMode::default(),
197 max_correlation_events: 10,
198 }
199 }
200}
201
/// Outcome of processing one event.
#[derive(Debug, Clone, Serialize)]
pub struct ProcessResult {
    /// Detection matches for the event, after correlation-only rules have been
    /// filtered out (when detections are not emitted for them).
    pub detections: Vec<MatchResult>,
    /// Correlations that fired (and were not suppressed) because of the event.
    pub correlations: Vec<CorrelationResult>,
}
214
/// A correlation that fired for one group.
#[derive(Debug, Clone, Serialize)]
pub struct CorrelationResult {
    /// Title of the correlation rule.
    pub rule_title: String,
    /// Id of the correlation rule, if it has one.
    pub rule_id: Option<String>,
    /// Level of the correlation rule, if set.
    pub level: Option<Level>,
    /// Tags of the correlation rule.
    pub tags: Vec<String>,
    /// Kind of correlation that fired.
    pub correlation_type: CorrelationType,
    /// The group that fired, as `(field, value)` pairs for the correlation's
    /// group-by fields.
    pub group_key: Vec<(String, String)>,
    /// Value reported by the condition check when it fired (e.g. a count or an
    /// aggregate; NOTE(review): exact meaning is defined by `check_condition`).
    pub aggregated_value: f64,
    /// Length of the correlation window, in seconds.
    pub timespan_secs: u64,
    /// Buffered events of the group; only set in `CorrelationEventMode::Full`
    /// and omitted from serialized output when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub events: Option<Vec<serde_json::Value>>,
    /// Buffered event references of the group; only set in
    /// `CorrelationEventMode::Refs` and omitted from serialized output when
    /// absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub event_refs: Option<Vec<EventRef>>,
}
246
/// Stateful Sigma correlation engine layered on top of the detection
/// [`Engine`].
///
/// Detection rules are evaluated by the inner engine. Their matches feed
/// window state kept per (correlation, group), and fired correlations can in
/// turn feed correlations that reference them.
pub struct CorrelationEngine {
    /// Detection engine that evaluates the individual rules.
    engine: Engine,
    /// Compiled correlation rules; a correlation's position here is the
    /// `usize` in every `(usize, GroupKey)` key below.
    correlations: Vec<CompiledCorrelation>,
    /// Rule reference (id or name of a rule or correlation) -> indices of the
    /// correlations that reference it.
    rule_index: HashMap<String, Vec<usize>>,
    /// `(id, name)` of every added detection rule, in insertion order.
    rule_ids: Vec<(Option<String>, Option<String>)>,
    /// Window state per (correlation index, group key).
    state: HashMap<(usize, GroupKey), WindowState>,
    /// Epoch-seconds timestamp of the last reported firing per
    /// (correlation index, group key), used for suppression.
    last_alert: HashMap<(usize, GroupKey), i64>,
    /// Buffered full events per (correlation index, group key)
    /// (`CorrelationEventMode::Full`).
    event_buffers: HashMap<(usize, GroupKey), EventBuffer>,
    /// Buffered event references per (correlation index, group key)
    /// (`CorrelationEventMode::Refs`).
    event_ref_buffers: HashMap<(usize, GroupKey), EventRefBuffer>,
    /// Rule references of correlations with `generate: false`; matching
    /// detections are withheld when `emit_detections` is false.
    correlation_only_rules: std::collections::HashSet<String>,
    /// Engine configuration; may be adjusted by `rsigma.*` custom attributes.
    config: CorrelationConfig,
    /// Processing pipelines, sorted by priority, applied to rules and
    /// correlations as they are added.
    pipelines: Vec<Pipeline>,
}
283
284impl CorrelationEngine {
285 pub fn new(config: CorrelationConfig) -> Self {
287 CorrelationEngine {
288 engine: Engine::new(),
289 correlations: Vec::new(),
290 rule_index: HashMap::new(),
291 rule_ids: Vec::new(),
292 state: HashMap::new(),
293 last_alert: HashMap::new(),
294 event_buffers: HashMap::new(),
295 event_ref_buffers: HashMap::new(),
296 correlation_only_rules: std::collections::HashSet::new(),
297 config,
298 pipelines: Vec::new(),
299 }
300 }
301
302 pub fn add_pipeline(&mut self, pipeline: Pipeline) {
306 self.pipelines.push(pipeline);
307 self.pipelines.sort_by_key(|p| p.priority);
308 }
309
310 pub fn set_include_event(&mut self, include: bool) {
312 self.engine.set_include_event(include);
313 }
314
315 pub fn set_correlation_event_mode(&mut self, mode: CorrelationEventMode) {
321 self.config.correlation_event_mode = mode;
322 }
323
324 pub fn set_max_correlation_events(&mut self, max: usize) {
327 self.config.max_correlation_events = max;
328 }
329
330 pub fn add_rule(&mut self, rule: &SigmaRule) -> Result<()> {
336 if self.pipelines.is_empty() {
337 self.apply_custom_attributes(&rule.custom_attributes);
338 self.rule_ids.push((rule.id.clone(), rule.name.clone()));
339 self.engine.add_rule(rule)?;
340 } else {
341 let mut transformed = rule.clone();
342 apply_pipelines(&self.pipelines, &mut transformed)?;
343 self.apply_custom_attributes(&transformed.custom_attributes);
344 self.rule_ids
345 .push((transformed.id.clone(), transformed.name.clone()));
346 let compiled = crate::compiler::compile_rule(&transformed)?;
348 self.engine.add_compiled_rule(compiled);
349 }
350 Ok(())
351 }
352
353 fn apply_custom_attributes(&mut self, attrs: &std::collections::HashMap<String, String>) {
367 if let Some(field) = attrs.get("rsigma.timestamp_field")
369 && !self.config.timestamp_fields.contains(field)
370 {
371 self.config.timestamp_fields.insert(0, field.clone());
372 }
373
374 if let Some(val) = attrs.get("rsigma.suppress")
376 && self.config.suppress.is_none()
377 && let Ok(ts) = rsigma_parser::Timespan::parse(val)
378 {
379 self.config.suppress = Some(ts.seconds);
380 }
381
382 if let Some(val) = attrs.get("rsigma.action")
384 && self.config.action_on_match == CorrelationAction::Alert
385 && let Ok(a) = val.parse::<CorrelationAction>()
386 {
387 self.config.action_on_match = a;
388 }
389 }
390
391 pub fn add_correlation(&mut self, corr: &CorrelationRule) -> Result<()> {
393 let owned;
394 let effective = if self.pipelines.is_empty() {
395 corr
396 } else {
397 owned = {
398 let mut c = corr.clone();
399 apply_pipelines_to_correlation(&self.pipelines, &mut c)?;
400 c
401 };
402 &owned
403 };
404
405 self.apply_custom_attributes(&effective.custom_attributes);
408
409 let compiled = compile_correlation(effective)?;
410 let idx = self.correlations.len();
411
412 for rule_ref in &compiled.rule_refs {
414 self.rule_index
415 .entry(rule_ref.clone())
416 .or_default()
417 .push(idx);
418 }
419
420 if !compiled.generate {
422 for rule_ref in &compiled.rule_refs {
423 self.correlation_only_rules.insert(rule_ref.clone());
424 }
425 }
426
427 self.correlations.push(compiled);
428 Ok(())
429 }
430
431 pub fn add_collection(&mut self, collection: &SigmaCollection) -> Result<()> {
436 for rule in &collection.rules {
437 self.add_rule(rule)?;
438 }
439 for filter in &collection.filters {
441 self.engine.apply_filter(filter)?;
442 }
443 for corr in &collection.correlations {
444 self.add_correlation(corr)?;
445 }
446 self.validate_rule_refs()?;
447 self.detect_correlation_cycles()?;
448 Ok(())
449 }
450
451 fn validate_rule_refs(&self) -> Result<()> {
454 let mut known: std::collections::HashSet<&str> = std::collections::HashSet::new();
455
456 for (id, name) in &self.rule_ids {
457 if let Some(id) = id {
458 known.insert(id.as_str());
459 }
460 if let Some(name) = name {
461 known.insert(name.as_str());
462 }
463 }
464 for corr in &self.correlations {
465 if let Some(ref id) = corr.id {
466 known.insert(id.as_str());
467 }
468 if let Some(ref name) = corr.name {
469 known.insert(name.as_str());
470 }
471 }
472
473 for corr in &self.correlations {
474 for rule_ref in &corr.rule_refs {
475 if !known.contains(rule_ref.as_str()) {
476 return Err(EvalError::UnknownRuleRef(rule_ref.clone()));
477 }
478 }
479 }
480 Ok(())
481 }
482
483 fn detect_correlation_cycles(&self) -> Result<()> {
491 let mut corr_identifiers: HashMap<&str, usize> = HashMap::new();
493 for (idx, corr) in self.correlations.iter().enumerate() {
494 if let Some(ref id) = corr.id {
495 corr_identifiers.insert(id.as_str(), idx);
496 }
497 if let Some(ref name) = corr.name {
498 corr_identifiers.insert(name.as_str(), idx);
499 }
500 }
501
502 let mut adj: Vec<Vec<usize>> = vec![Vec::new(); self.correlations.len()];
504 for (idx, corr) in self.correlations.iter().enumerate() {
505 for rule_ref in &corr.rule_refs {
506 if let Some(&target_idx) = corr_identifiers.get(rule_ref.as_str()) {
507 adj[idx].push(target_idx);
508 }
509 }
510 }
511
512 let mut state = vec![0u8; self.correlations.len()]; let mut path: Vec<usize> = Vec::new();
515
516 for start in 0..self.correlations.len() {
517 if state[start] == 0
518 && let Some(cycle) = Self::dfs_find_cycle(start, &adj, &mut state, &mut path)
519 {
520 let names: Vec<String> = cycle
521 .iter()
522 .map(|&i| {
523 self.correlations[i]
524 .id
525 .as_deref()
526 .or(self.correlations[i].name.as_deref())
527 .unwrap_or(&self.correlations[i].title)
528 .to_string()
529 })
530 .collect();
531 return Err(crate::error::EvalError::CorrelationCycle(
532 names.join(" -> "),
533 ));
534 }
535 }
536 Ok(())
537 }
538
539 fn dfs_find_cycle(
541 node: usize,
542 adj: &[Vec<usize>],
543 state: &mut [u8],
544 path: &mut Vec<usize>,
545 ) -> Option<Vec<usize>> {
546 state[node] = 1; path.push(node);
548
549 for &next in &adj[node] {
550 if state[next] == 1 {
551 if let Some(pos) = path.iter().position(|&n| n == next) {
553 let mut cycle = path[pos..].to_vec();
554 cycle.push(next); return Some(cycle);
556 }
557 }
558 if state[next] == 0
559 && let Some(cycle) = Self::dfs_find_cycle(next, adj, state, path)
560 {
561 return Some(cycle);
562 }
563 }
564
565 path.pop();
566 state[node] = 2; None
568 }
569
570 pub fn process_event(&mut self, event: &Event) -> ProcessResult {
576 let all_detections = self.engine.evaluate(event);
577
578 let ts = match self.extract_event_timestamp(event) {
579 Some(ts) => ts,
580 None => match self.config.timestamp_fallback {
581 TimestampFallback::WallClock => Utc::now().timestamp(),
582 TimestampFallback::Skip => {
583 let detections = self.filter_detections(all_detections);
585 return ProcessResult {
586 detections,
587 correlations: Vec::new(),
588 };
589 }
590 },
591 };
592 self.process_with_detections(event, all_detections, ts)
593 }
594
595 pub fn process_event_at(&mut self, event: &Event, timestamp_secs: i64) -> ProcessResult {
600 let all_detections = self.engine.evaluate(event);
601 self.process_with_detections(event, all_detections, timestamp_secs)
602 }
603
604 pub fn process_with_detections(
610 &mut self,
611 event: &Event,
612 all_detections: Vec<MatchResult>,
613 timestamp_secs: i64,
614 ) -> ProcessResult {
615 let timestamp_secs = timestamp_secs.clamp(0, i64::MAX / 2);
616
617 if self.state.len() >= self.config.max_state_entries {
619 self.evict_all(timestamp_secs);
620 }
621
622 let mut correlations = Vec::new();
624 self.feed_detections(event, &all_detections, timestamp_secs, &mut correlations);
625
626 self.chain_correlations(&correlations, timestamp_secs);
628
629 let detections = self.filter_detections(all_detections);
631
632 ProcessResult {
633 detections,
634 correlations,
635 }
636 }
637
638 pub fn evaluate(&self, event: &Event) -> Vec<MatchResult> {
644 self.engine.evaluate(event)
645 }
646
647 pub fn process_batch<'a>(&mut self, events: &[&'a Event<'a>]) -> Vec<ProcessResult> {
655 let engine = &self.engine;
658 let ts_fields = &self.config.timestamp_fields;
659
660 let batch_results: Vec<(Vec<MatchResult>, Option<i64>)> = {
661 #[cfg(feature = "parallel")]
662 {
663 use rayon::prelude::*;
664 events
665 .par_iter()
666 .map(|e| {
667 let detections = engine.evaluate(e);
668 let ts = extract_event_ts(e, ts_fields);
669 (detections, ts)
670 })
671 .collect()
672 }
673 #[cfg(not(feature = "parallel"))]
674 {
675 events
676 .iter()
677 .map(|e| {
678 let detections = engine.evaluate(e);
679 let ts = extract_event_ts(e, ts_fields);
680 (detections, ts)
681 })
682 .collect()
683 }
684 };
685
686 let mut results = Vec::with_capacity(events.len());
688 for ((detections, ts_opt), event) in batch_results.into_iter().zip(events) {
689 match ts_opt {
690 Some(ts) => {
691 results.push(self.process_with_detections(event, detections, ts));
692 }
693 None => match self.config.timestamp_fallback {
694 TimestampFallback::WallClock => {
695 let ts = Utc::now().timestamp();
696 results.push(self.process_with_detections(event, detections, ts));
697 }
698 TimestampFallback::Skip => {
699 let detections = self.filter_detections(detections);
701 results.push(ProcessResult {
702 detections,
703 correlations: Vec::new(),
704 });
705 }
706 },
707 }
708 }
709 results
710 }
711
712 fn filter_detections(&self, all_detections: Vec<MatchResult>) -> Vec<MatchResult> {
717 if !self.config.emit_detections && !self.correlation_only_rules.is_empty() {
718 all_detections
719 .into_iter()
720 .filter(|m| {
721 let id_match = m
722 .rule_id
723 .as_ref()
724 .is_some_and(|id| self.correlation_only_rules.contains(id));
725 !id_match
726 })
727 .collect()
728 } else {
729 all_detections
730 }
731 }
732
733 fn feed_detections(
735 &mut self,
736 event: &Event,
737 detections: &[MatchResult],
738 ts: i64,
739 out: &mut Vec<CorrelationResult>,
740 ) {
741 let mut work: Vec<(usize, Option<String>, Option<String>)> = Vec::new();
744
745 for det in detections {
746 let (rule_id, rule_name) = self.find_rule_identity(det);
749
750 let mut corr_indices = Vec::new();
752 if let Some(ref id) = rule_id
753 && let Some(indices) = self.rule_index.get(id)
754 {
755 corr_indices.extend(indices);
756 }
757 if let Some(ref name) = rule_name
758 && let Some(indices) = self.rule_index.get(name)
759 {
760 corr_indices.extend(indices);
761 }
762
763 corr_indices.sort_unstable();
764 corr_indices.dedup();
765
766 for &corr_idx in &corr_indices {
767 work.push((corr_idx, rule_id.clone(), rule_name.clone()));
768 }
769 }
770
771 for (corr_idx, rule_id, rule_name) in work {
772 self.update_correlation(corr_idx, event, ts, &rule_id, &rule_name, out);
773 }
774 }
775
776 fn find_rule_identity(&self, det: &MatchResult) -> (Option<String>, Option<String>) {
778 if let Some(ref match_id) = det.rule_id {
780 for (id, name) in &self.rule_ids {
781 if id.as_deref() == Some(match_id.as_str()) {
782 return (id.clone(), name.clone());
783 }
784 }
785 }
786 (det.rule_id.clone(), None)
788 }
789
790 fn resolve_event_mode(&self, corr_idx: usize) -> CorrelationEventMode {
792 let corr = &self.correlations[corr_idx];
793 corr.event_mode
794 .unwrap_or(self.config.correlation_event_mode)
795 }
796
797 fn resolve_max_events(&self, corr_idx: usize) -> usize {
799 let corr = &self.correlations[corr_idx];
800 corr.max_events
801 .unwrap_or(self.config.max_correlation_events)
802 }
803
804 fn update_correlation(
806 &mut self,
807 corr_idx: usize,
808 event: &Event,
809 ts: i64,
810 rule_id: &Option<String>,
811 rule_name: &Option<String>,
812 out: &mut Vec<CorrelationResult>,
813 ) {
814 let corr = &self.correlations[corr_idx];
818 let corr_type = corr.correlation_type;
819 let timespan = corr.timespan_secs;
820 let level = corr.level;
821 let suppress_secs = corr.suppress_secs.or(self.config.suppress);
822 let action = corr.action.unwrap_or(self.config.action_on_match);
823 let event_mode = self.resolve_event_mode(corr_idx);
824 let max_events = self.resolve_max_events(corr_idx);
825
826 let mut ref_strs: Vec<&str> = Vec::new();
828 if let Some(id) = rule_id.as_deref() {
829 ref_strs.push(id);
830 }
831 if let Some(name) = rule_name.as_deref() {
832 ref_strs.push(name);
833 }
834 let rule_ref = rule_id.as_deref().or(rule_name.as_deref()).unwrap_or("");
835
836 let group_key = GroupKey::extract(event, &corr.group_by, &ref_strs);
838
839 let state_key = (corr_idx, group_key.clone());
841 let state = self
842 .state
843 .entry(state_key.clone())
844 .or_insert_with(|| WindowState::new_for(corr_type));
845
846 let cutoff = ts - timespan as i64;
848 state.evict(cutoff);
849
850 match corr_type {
852 CorrelationType::EventCount => {
853 state.push_event_count(ts);
854 }
855 CorrelationType::ValueCount => {
856 if let Some(ref field_name) = corr.condition.field
857 && let Some(val) = event.get_field(field_name)
858 && let Some(s) = value_to_string_for_count(val)
859 {
860 state.push_value_count(ts, s);
861 }
862 }
863 CorrelationType::Temporal | CorrelationType::TemporalOrdered => {
864 state.push_temporal(ts, rule_ref);
865 }
866 CorrelationType::ValueSum
867 | CorrelationType::ValueAvg
868 | CorrelationType::ValuePercentile
869 | CorrelationType::ValueMedian => {
870 if let Some(ref field_name) = corr.condition.field
871 && let Some(val) = event.get_field(field_name)
872 && let Some(n) = value_to_f64(val)
873 {
874 state.push_numeric(ts, n);
875 }
876 }
877 }
878
879 match event_mode {
881 CorrelationEventMode::Full => {
882 let buf = self
883 .event_buffers
884 .entry(state_key.clone())
885 .or_insert_with(|| EventBuffer::new(max_events));
886 buf.evict(cutoff);
887 buf.push(ts, event.as_value());
888 }
889 CorrelationEventMode::Refs => {
890 let buf = self
891 .event_ref_buffers
892 .entry(state_key.clone())
893 .or_insert_with(|| EventRefBuffer::new(max_events));
894 buf.evict(cutoff);
895 buf.push(ts, event.as_value());
896 }
897 CorrelationEventMode::None => {}
898 }
899
900 let fired = state.check_condition(
902 &corr.condition,
903 corr_type,
904 &corr.rule_refs,
905 corr.extended_expr.as_ref(),
906 );
907
908 if let Some(agg_value) = fired {
909 let alert_key = (corr_idx, group_key.clone());
910
911 let suppressed = if let Some(suppress) = suppress_secs {
913 if let Some(&last_ts) = self.last_alert.get(&alert_key) {
914 (ts - last_ts) < suppress as i64
915 } else {
916 false
917 }
918 } else {
919 false
920 };
921
922 if !suppressed {
923 let (events, event_refs) = match event_mode {
925 CorrelationEventMode::Full => {
926 let stored = self
927 .event_buffers
928 .get(&alert_key)
929 .map(|buf| buf.decompress_all())
930 .unwrap_or_default();
931 (Some(stored), None)
932 }
933 CorrelationEventMode::Refs => {
934 let stored = self
935 .event_ref_buffers
936 .get(&alert_key)
937 .map(|buf| buf.refs())
938 .unwrap_or_default();
939 (None, Some(stored))
940 }
941 CorrelationEventMode::None => (None, None),
942 };
943
944 let corr = &self.correlations[corr_idx];
946 let result = CorrelationResult {
947 rule_title: corr.title.clone(),
948 rule_id: corr.id.clone(),
949 level,
950 tags: corr.tags.clone(),
951 correlation_type: corr_type,
952 group_key: group_key.to_pairs(&corr.group_by),
953 aggregated_value: agg_value,
954 timespan_secs: timespan,
955 events,
956 event_refs,
957 };
958 out.push(result);
959
960 self.last_alert.insert(alert_key.clone(), ts);
962
963 if action == CorrelationAction::Reset {
965 if let Some(state) = self.state.get_mut(&alert_key) {
966 state.clear();
967 }
968 if let Some(buf) = self.event_buffers.get_mut(&alert_key) {
969 buf.clear();
970 }
971 if let Some(buf) = self.event_ref_buffers.get_mut(&alert_key) {
972 buf.clear();
973 }
974 }
975 }
976 }
977 }
978
979 fn chain_correlations(&mut self, fired: &[CorrelationResult], ts: i64) {
984 const MAX_CHAIN_DEPTH: usize = 10;
985 let mut pending: Vec<CorrelationResult> = fired.to_vec();
986 let mut depth = 0;
987
988 while !pending.is_empty() && depth < MAX_CHAIN_DEPTH {
989 depth += 1;
990
991 #[allow(clippy::type_complexity)]
993 let mut work: Vec<(usize, Vec<(String, String)>, String)> = Vec::new();
994 for result in &pending {
995 if let Some(ref id) = result.rule_id
996 && let Some(indices) = self.rule_index.get(id)
997 {
998 let fired_ref = result
999 .rule_id
1000 .as_deref()
1001 .unwrap_or(&result.rule_title)
1002 .to_string();
1003 for &corr_idx in indices {
1004 work.push((corr_idx, result.group_key.clone(), fired_ref.clone()));
1005 }
1006 }
1007 }
1008
1009 let mut next_pending = Vec::new();
1010 for (corr_idx, group_key_pairs, fired_ref) in work {
1011 let corr = &self.correlations[corr_idx];
1012 let corr_type = corr.correlation_type;
1013 let timespan = corr.timespan_secs;
1014 let level = corr.level;
1015
1016 let group_key = GroupKey::from_pairs(&group_key_pairs, &corr.group_by);
1017 let state_key = (corr_idx, group_key.clone());
1018 let state = self
1019 .state
1020 .entry(state_key)
1021 .or_insert_with(|| WindowState::new_for(corr_type));
1022
1023 let cutoff = ts - timespan as i64;
1024 state.evict(cutoff);
1025
1026 match corr_type {
1027 CorrelationType::EventCount => {
1028 state.push_event_count(ts);
1029 }
1030 CorrelationType::Temporal | CorrelationType::TemporalOrdered => {
1031 state.push_temporal(ts, &fired_ref);
1032 }
1033 _ => {
1034 state.push_event_count(ts);
1035 }
1036 }
1037
1038 let fired = state.check_condition(
1039 &corr.condition,
1040 corr_type,
1041 &corr.rule_refs,
1042 corr.extended_expr.as_ref(),
1043 );
1044
1045 if let Some(agg_value) = fired {
1046 let corr = &self.correlations[corr_idx];
1047 next_pending.push(CorrelationResult {
1048 rule_title: corr.title.clone(),
1049 rule_id: corr.id.clone(),
1050 level,
1051 tags: corr.tags.clone(),
1052 correlation_type: corr_type,
1053 group_key: group_key.to_pairs(&corr.group_by),
1054 aggregated_value: agg_value,
1055 timespan_secs: timespan,
1056 events: None,
1059 event_refs: None,
1060 });
1061 }
1062 }
1063
1064 pending = next_pending;
1065 }
1066
1067 if !pending.is_empty() {
1068 log::warn!(
1069 "Correlation chain depth limit reached ({MAX_CHAIN_DEPTH}); \
1070 {} pending result(s) were not propagated further. \
1071 This may indicate a cycle in correlation references.",
1072 pending.len()
1073 );
1074 }
1075 }
1076
1077 fn extract_event_timestamp(&self, event: &Event) -> Option<i64> {
1089 for field_name in &self.config.timestamp_fields {
1090 if let Some(val) = event.get_field(field_name)
1091 && let Some(ts) = parse_timestamp_value(val)
1092 {
1093 return Some(ts);
1094 }
1095 }
1096 None
1097 }
1098
1099 pub fn evict_expired(&mut self, now_secs: i64) {
1105 self.evict_all(now_secs);
1106 }
1107
1108 fn evict_all(&mut self, now_secs: i64) {
1110 let timespans: Vec<u64> = self.correlations.iter().map(|c| c.timespan_secs).collect();
1112
1113 self.state.retain(|&(corr_idx, _), state| {
1114 if corr_idx < timespans.len() {
1115 let cutoff = now_secs - timespans[corr_idx] as i64;
1116 state.evict(cutoff);
1117 }
1118 !state.is_empty()
1119 });
1120
1121 self.event_buffers.retain(|&(corr_idx, _), buf| {
1123 if corr_idx < timespans.len() {
1124 let cutoff = now_secs - timespans[corr_idx] as i64;
1125 buf.evict(cutoff);
1126 }
1127 !buf.is_empty()
1128 });
1129 self.event_ref_buffers.retain(|&(corr_idx, _), buf| {
1130 if corr_idx < timespans.len() {
1131 let cutoff = now_secs - timespans[corr_idx] as i64;
1132 buf.evict(cutoff);
1133 }
1134 !buf.is_empty()
1135 });
1136
1137 if self.state.len() >= self.config.max_state_entries {
1141 let target = self.config.max_state_entries * 9 / 10;
1142 let excess = self.state.len() - target;
1143
1144 let mut by_staleness: Vec<_> = self
1146 .state
1147 .iter()
1148 .map(|(k, v)| (k.clone(), v.latest_timestamp().unwrap_or(i64::MIN)))
1149 .collect();
1150 by_staleness.sort_unstable_by_key(|&(_, ts)| ts);
1151
1152 for (key, _) in by_staleness.into_iter().take(excess) {
1154 self.state.remove(&key);
1155 self.last_alert.remove(&key);
1156 self.event_buffers.remove(&key);
1157 self.event_ref_buffers.remove(&key);
1158 }
1159 }
1160
1161 self.last_alert.retain(|key, &mut alert_ts| {
1164 let suppress = if key.0 < self.correlations.len() {
1165 self.correlations[key.0]
1166 .suppress_secs
1167 .or(self.config.suppress)
1168 .unwrap_or(0)
1169 } else {
1170 0
1171 };
1172 (now_secs - alert_ts) < suppress as i64
1173 });
1174 }
1175
1176 pub fn state_count(&self) -> usize {
1178 self.state.len()
1179 }
1180
1181 pub fn detection_rule_count(&self) -> usize {
1183 self.engine.rule_count()
1184 }
1185
1186 pub fn correlation_rule_count(&self) -> usize {
1188 self.correlations.len()
1189 }
1190
1191 pub fn event_buffer_count(&self) -> usize {
1193 self.event_buffers.len()
1194 }
1195
1196 pub fn event_buffer_bytes(&self) -> usize {
1198 self.event_buffers
1199 .values()
1200 .map(|b| b.compressed_bytes())
1201 .sum()
1202 }
1203
1204 pub fn event_ref_buffer_count(&self) -> usize {
1206 self.event_ref_buffers.len()
1207 }
1208
1209 pub fn engine(&self) -> &Engine {
1211 &self.engine
1212 }
1213
1214 pub fn export_state(&self) -> CorrelationSnapshot {
1220 let mut windows: HashMap<String, Vec<(GroupKey, WindowState)>> = HashMap::new();
1221 for ((idx, gk), ws) in &self.state {
1222 let corr_id = self.correlation_stable_id(*idx);
1223 windows
1224 .entry(corr_id)
1225 .or_default()
1226 .push((gk.clone(), ws.clone()));
1227 }
1228
1229 let mut last_alert: HashMap<String, Vec<(GroupKey, i64)>> = HashMap::new();
1230 for ((idx, gk), ts) in &self.last_alert {
1231 let corr_id = self.correlation_stable_id(*idx);
1232 last_alert
1233 .entry(corr_id)
1234 .or_default()
1235 .push((gk.clone(), *ts));
1236 }
1237
1238 let mut event_buffers: HashMap<String, Vec<(GroupKey, EventBuffer)>> = HashMap::new();
1239 for ((idx, gk), buf) in &self.event_buffers {
1240 let corr_id = self.correlation_stable_id(*idx);
1241 event_buffers
1242 .entry(corr_id)
1243 .or_default()
1244 .push((gk.clone(), buf.clone()));
1245 }
1246
1247 let mut event_ref_buffers: HashMap<String, Vec<(GroupKey, EventRefBuffer)>> =
1248 HashMap::new();
1249 for ((idx, gk), buf) in &self.event_ref_buffers {
1250 let corr_id = self.correlation_stable_id(*idx);
1251 event_ref_buffers
1252 .entry(corr_id)
1253 .or_default()
1254 .push((gk.clone(), buf.clone()));
1255 }
1256
1257 CorrelationSnapshot {
1258 version: SNAPSHOT_VERSION,
1259 windows,
1260 last_alert,
1261 event_buffers,
1262 event_ref_buffers,
1263 }
1264 }
1265
1266 pub fn import_state(&mut self, snapshot: CorrelationSnapshot) -> bool {
1273 if snapshot.version != SNAPSHOT_VERSION {
1274 return false;
1275 }
1276 let id_to_idx = self.build_id_to_index_map();
1277
1278 for (corr_id, groups) in snapshot.windows {
1279 if let Some(&idx) = id_to_idx.get(&corr_id) {
1280 for (gk, ws) in groups {
1281 self.state.insert((idx, gk), ws);
1282 }
1283 }
1284 }
1285
1286 for (corr_id, groups) in snapshot.last_alert {
1287 if let Some(&idx) = id_to_idx.get(&corr_id) {
1288 for (gk, ts) in groups {
1289 self.last_alert.insert((idx, gk), ts);
1290 }
1291 }
1292 }
1293
1294 for (corr_id, groups) in snapshot.event_buffers {
1295 if let Some(&idx) = id_to_idx.get(&corr_id) {
1296 for (gk, buf) in groups {
1297 self.event_buffers.insert((idx, gk), buf);
1298 }
1299 }
1300 }
1301
1302 for (corr_id, groups) in snapshot.event_ref_buffers {
1303 if let Some(&idx) = id_to_idx.get(&corr_id) {
1304 for (gk, buf) in groups {
1305 self.event_ref_buffers.insert((idx, gk), buf);
1306 }
1307 }
1308 }
1309
1310 true
1311 }
1312
1313 fn correlation_stable_id(&self, idx: usize) -> String {
1315 let corr = &self.correlations[idx];
1316 corr.id
1317 .clone()
1318 .or_else(|| corr.name.clone())
1319 .unwrap_or_else(|| corr.title.clone())
1320 }
1321
1322 fn build_id_to_index_map(&self) -> HashMap<String, usize> {
1324 self.correlations
1325 .iter()
1326 .enumerate()
1327 .map(|(idx, _)| (self.correlation_stable_id(idx), idx))
1328 .collect()
1329 }
1330}
1331
/// Version of the snapshot format written by `export_state`; `import_state`
/// rejects snapshots carrying any other version.
const SNAPSHOT_VERSION: u32 = 1;

/// Serializable copy of a `CorrelationEngine`'s correlation state.
///
/// Every map is keyed by the correlation's stable identifier (its id, else its
/// name, else its title), not by its position in the engine.
#[derive(Debug, Clone, Serialize, serde::Deserialize)]
pub struct CorrelationSnapshot {
    /// Snapshot format version; when absent, serde fills it from
    /// `default_snapshot_version`.
    #[serde(default = "default_snapshot_version")]
    pub version: u32,
    /// Window state per correlation and group.
    pub windows: HashMap<String, Vec<(GroupKey, WindowState)>>,
    /// Epoch-seconds time of the last reported firing per correlation and group.
    pub last_alert: HashMap<String, Vec<(GroupKey, i64)>>,
    /// Full-event buffers per correlation and group.
    pub event_buffers: HashMap<String, Vec<(GroupKey, EventBuffer)>>,
    /// Event-reference buffers per correlation and group.
    pub event_ref_buffers: HashMap<String, Vec<(GroupKey, EventRefBuffer)>>,
}
1355
/// Serde default for `CorrelationSnapshot::version`. Snapshots written without
/// a `version` field are treated as format version 1.
fn default_snapshot_version() -> u32 {
    const UNVERSIONED_SNAPSHOT: u32 = 1;
    UNVERSIONED_SNAPSHOT
}
1359
1360impl Default for CorrelationEngine {
1361 fn default() -> Self {
1362 Self::new(CorrelationConfig::default())
1363 }
1364}
1365
1366fn extract_event_ts(event: &Event, timestamp_fields: &[String]) -> Option<i64> {
1375 for field_name in timestamp_fields {
1376 if let Some(val) = event.get_field(field_name)
1377 && let Some(ts) = parse_timestamp_value(val)
1378 {
1379 return Some(ts);
1380 }
1381 }
1382 None
1383}
1384
1385fn parse_timestamp_value(val: &serde_json::Value) -> Option<i64> {
1387 match val {
1388 serde_json::Value::Number(n) => {
1389 if let Some(i) = n.as_i64() {
1390 Some(normalize_epoch(i))
1391 } else {
1392 n.as_f64().map(|f| normalize_epoch(f as i64))
1393 }
1394 }
1395 serde_json::Value::String(s) => parse_timestamp_string(s),
1396 _ => None,
1397 }
1398}
1399
/// Normalizes a numeric epoch timestamp to whole seconds.
///
/// A value above 10^12 cannot be a plausible seconds timestamp (10^12 s is
/// tens of thousands of years from now), so it is taken to be in a finer unit,
/// chosen by magnitude:
/// - above 10^18: nanoseconds
/// - above 10^15: microseconds
/// - above 10^12: milliseconds
///
/// Smaller (including negative) values are returned unchanged as seconds.
/// Sub-second precision is truncated.
fn normalize_epoch(v: i64) -> i64 {
    const MILLIS_THRESHOLD: i64 = 1_000_000_000_000;
    const MICROS_THRESHOLD: i64 = 1_000_000_000_000_000;
    const NANOS_THRESHOLD: i64 = 1_000_000_000_000_000_000;

    if v > NANOS_THRESHOLD {
        v / 1_000_000_000
    } else if v > MICROS_THRESHOLD {
        v / 1_000_000
    } else if v > MILLIS_THRESHOLD {
        v / 1_000
    } else {
        v
    }
}
1405
1406fn parse_timestamp_string(s: &str) -> Option<i64> {
1408 if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
1410 return Some(dt.timestamp());
1411 }
1412
1413 if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S") {
1416 return Some(Utc.from_utc_datetime(&naive).timestamp());
1417 }
1418 if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") {
1419 return Some(Utc.from_utc_datetime(&naive).timestamp());
1420 }
1421
1422 if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S%.f") {
1424 return Some(Utc.from_utc_datetime(&naive).timestamp());
1425 }
1426 if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f") {
1427 return Some(Utc.from_utc_datetime(&naive).timestamp());
1428 }
1429
1430 None
1431}
1432
1433fn value_to_string_for_count(v: &serde_json::Value) -> Option<String> {
1435 match v {
1436 serde_json::Value::String(s) => Some(s.clone()),
1437 serde_json::Value::Number(n) => Some(n.to_string()),
1438 serde_json::Value::Bool(b) => Some(b.to_string()),
1439 serde_json::Value::Null => Some("null".to_string()),
1440 _ => None,
1441 }
1442}
1443
1444fn value_to_f64(v: &serde_json::Value) -> Option<f64> {
1446 match v {
1447 serde_json::Value::Number(n) => n.as_f64(),
1448 serde_json::Value::String(s) => s.parse().ok(),
1449 _ => None,
1450 }
1451}
1452
1453#[cfg(test)]
1458mod tests {
1459 use super::*;
1460 use rsigma_parser::parse_sigma_yaml;
1461 use serde_json::json;
1462
1463 #[test]
1468 fn test_parse_timestamp_epoch_secs() {
1469 let val = json!(1720612200);
1470 assert_eq!(parse_timestamp_value(&val), Some(1720612200));
1471 }
1472
1473 #[test]
1474 fn test_parse_timestamp_epoch_millis() {
1475 let val = json!(1720612200000i64);
1476 assert_eq!(parse_timestamp_value(&val), Some(1720612200));
1477 }
1478
1479 #[test]
1480 fn test_parse_timestamp_rfc3339() {
1481 let val = json!("2024-07-10T12:30:00Z");
1482 let ts = parse_timestamp_value(&val).unwrap();
1483 assert_eq!(ts, 1720614600);
1484 }
1485
1486 #[test]
1487 fn test_parse_timestamp_naive() {
1488 let val = json!("2024-07-10T12:30:00");
1489 let ts = parse_timestamp_value(&val).unwrap();
1490 assert_eq!(ts, 1720614600);
1491 }
1492
1493 #[test]
1494 fn test_parse_timestamp_with_space() {
1495 let val = json!("2024-07-10 12:30:00");
1496 let ts = parse_timestamp_value(&val).unwrap();
1497 assert_eq!(ts, 1720614600);
1498 }
1499
1500 #[test]
1501 fn test_parse_timestamp_fractional() {
1502 let val = json!("2024-07-10T12:30:00.123Z");
1503 let ts = parse_timestamp_value(&val).unwrap();
1504 assert_eq!(ts, 1720614600);
1505 }
1506
1507 #[test]
1508 fn test_extract_timestamp_from_event() {
1509 let config = CorrelationConfig {
1510 timestamp_fields: vec!["@timestamp".to_string()],
1511 max_state_entries: 100_000,
1512 ..Default::default()
1513 };
1514 let engine = CorrelationEngine::new(config);
1515
1516 let v = json!({"@timestamp": "2024-07-10T12:30:00Z", "data": "test"});
1517 let event = Event::from_value(&v);
1518 let ts = engine.extract_event_timestamp(&event);
1519 assert_eq!(ts, Some(1720614600));
1520 }
1521
1522 #[test]
1523 fn test_extract_timestamp_fallback_fields() {
1524 let config = CorrelationConfig {
1525 timestamp_fields: vec![
1526 "@timestamp".to_string(),
1527 "timestamp".to_string(),
1528 "EventTime".to_string(),
1529 ],
1530 max_state_entries: 100_000,
1531 ..Default::default()
1532 };
1533 let engine = CorrelationEngine::new(config);
1534
1535 let v = json!({"timestamp": 1720613400, "data": "test"});
1537 let event = Event::from_value(&v);
1538 let ts = engine.extract_event_timestamp(&event);
1539 assert_eq!(ts, Some(1720613400));
1540 }
1541
1542 #[test]
1543 fn test_extract_timestamp_returns_none_when_missing() {
1544 let config = CorrelationConfig {
1545 timestamp_fields: vec!["@timestamp".to_string()],
1546 ..Default::default()
1547 };
1548 let engine = CorrelationEngine::new(config);
1549
1550 let v = json!({"data": "no timestamp here"});
1551 let event = Event::from_value(&v);
1552 assert_eq!(engine.extract_event_timestamp(&event), None);
1553 }
1554
    /// With `TimestampFallback::Skip`, events lacking a timestamp still produce
    /// detections but never contribute to correlations. Even three matching
    /// events (threshold `gte: 2`) must not fire.
    #[test]
    fn test_timestamp_fallback_skip() {
        let yaml = r#"
title: test rule
id: ts-skip-rule
logsource:
  product: test
detection:
  selection:
    action: click
  condition: selection
level: low
---
title: test correlation
correlation:
  type: event_count
  rules:
    - ts-skip-rule
  group-by:
    - User
  timespan: 10s
  condition:
    gte: 2
level: high
"#;
        let collection = parse_sigma_yaml(yaml).unwrap();
        let mut engine = CorrelationEngine::new(CorrelationConfig {
            timestamp_fallback: TimestampFallback::Skip,
            ..Default::default()
        });
        engine.add_collection(&collection).unwrap();
        assert_eq!(engine.correlation_rule_count(), 1);

        // The event deliberately has no timestamp field.
        let v = json!({"action": "click", "User": "alice"});
        let event = Event::from_value(&v);

        let r1 = engine.process_event(&event);
        assert!(!r1.detections.is_empty(), "detection should still fire");

        let r2 = engine.process_event(&event);
        assert!(!r2.detections.is_empty(), "detection should still fire");

        let r3 = engine.process_event(&event);
        assert!(!r3.detections.is_empty(), "detection should still fire");

        // Skipped timestamps mean correlation state is never fed.
        assert!(r1.correlations.is_empty());
        assert!(r2.correlations.is_empty());
        assert!(r3.correlations.is_empty());
    }
1606
    /// With the default config, timestamp-less events still feed correlations.
    /// The test's assertion message attributes this to a wall-clock fallback.
    /// By the third event, the `gte: 2` threshold must have fired.
    #[test]
    fn test_timestamp_fallback_wallclock_default() {
        let yaml = r#"
title: test rule
id: ts-wc-rule
logsource:
  product: test
detection:
  selection:
    action: click
  condition: selection
level: low
---
title: test correlation
correlation:
  type: event_count
  rules:
    - ts-wc-rule
  group-by:
    - User
  timespan: 60s
  condition:
    gte: 2
level: high
"#;
        let collection = parse_sigma_yaml(yaml).unwrap();
        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
        engine.add_collection(&collection).unwrap();
        assert_eq!(engine.correlation_rule_count(), 1);

        // No timestamp field: the engine must fall back rather than skip.
        let v = json!({"action": "click", "User": "alice"});
        let event = Event::from_value(&v);

        let _r1 = engine.process_event(&event);
        let _r2 = engine.process_event(&event);
        let r3 = engine.process_event(&event);

        assert!(
            !r3.correlations.is_empty(),
            "WallClock fallback should allow correlation"
        );
    }
1652
    /// Basic `event_count`: three matching events 10s apart within a 60s
    /// window. The correlation fires on the third event (`gte: 3`) with an
    /// aggregated value of 3.
    #[test]
    fn test_event_count_basic() {
        let yaml = r#"
title: Base Rule
id: base-rule-001
name: base_rule
logsource:
  product: windows
  category: process_creation
detection:
  selection:
    CommandLine|contains: 'whoami'
  condition: selection
level: low
---
title: Multiple Whoami
id: corr-001
correlation:
  type: event_count
  rules:
    - base-rule-001
  group-by:
    - User
  timespan: 60s
  condition:
    gte: 3
level: high
"#;
        let collection = parse_sigma_yaml(yaml).unwrap();
        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
        engine.add_collection(&collection).unwrap();

        assert_eq!(engine.detection_rule_count(), 1);
        assert_eq!(engine.correlation_rule_count(), 1);

        let base_ts = 1000i64;
        // Events at t=1000, 1010, 1020, all for the same User.
        for i in 0..3 {
            let v = json!({"CommandLine": "whoami", "User": "admin"});
            let event = Event::from_value(&v);
            let result = engine.process_event_at(&event, base_ts + i * 10);

            assert_eq!(result.detections.len(), 1);

            if i < 2 {
                assert!(result.correlations.is_empty());
            } else {
                assert_eq!(result.correlations.len(), 1);
                assert_eq!(result.correlations[0].rule_title, "Multiple Whoami");
                assert_eq!(result.correlations[0].aggregated_value, 3.0);
            }
        }
    }
1713
    /// Counts are kept per `group-by` key. Two events for `alice` never fire.
    /// The third event for `bob` fires, and the result carries bob's group key.
    #[test]
    fn test_event_count_different_groups() {
        let yaml = r#"
title: Login
id: login-001
logsource:
  category: auth
detection:
  selection:
    EventType: login
  condition: selection
level: low
---
title: Many Logins
id: corr-login
correlation:
  type: event_count
  rules:
    - login-001
  group-by:
    - User
  timespan: 60s
  condition:
    gte: 3
level: high
"#;
        let collection = parse_sigma_yaml(yaml).unwrap();
        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
        engine.add_collection(&collection).unwrap();

        let ts = 1000i64;
        // alice stays below the threshold.
        for i in 0..2 {
            let v = json!({"EventType": "login", "User": "alice"});
            let event = Event::from_value(&v);
            let r = engine.process_event_at(&event, ts + i);
            assert!(r.correlations.is_empty());
        }
        // bob reaches it; alice's events must not be counted towards bob.
        for i in 0..3 {
            let v = json!({"EventType": "login", "User": "bob"});
            let event = Event::from_value(&v);
            let r = engine.process_event_at(&event, ts + i);
            if i == 2 {
                assert_eq!(r.correlations.len(), 1);
                assert_eq!(
                    r.correlations[0].group_key,
                    vec![("User".to_string(), "bob".to_string())]
                );
            }
        }
    }
1765
    /// Events older than the 10s timespan drop out of the window. The events
    /// at t=0 and t=1 have expired by t=15, so the third event does not
    /// reach `gte: 3`.
    #[test]
    fn test_event_count_window_expiry() {
        let yaml = r#"
title: Base
id: base-002
logsource:
  category: test
detection:
  selection:
    action: click
  condition: selection
---
title: Rapid Clicks
id: corr-002
correlation:
  type: event_count
  rules:
    - base-002
  group-by:
    - User
  timespan: 10s
  condition:
    gte: 3
level: medium
"#;
        let collection = parse_sigma_yaml(yaml).unwrap();
        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
        engine.add_collection(&collection).unwrap();

        // Two events close together, then one well outside the 10s window.
        let v = json!({"action": "click", "User": "admin"});
        let event = Event::from_value(&v);
        engine.process_event_at(&event, 0);
        engine.process_event_at(&event, 1);
        let r = engine.process_event_at(&event, 15);
        assert!(r.correlations.is_empty());
    }
1804
    /// `value_count` counts distinct values of `User` per `Host`. Three
    /// different users on `srv01` fire the correlation with an aggregated
    /// value of 3.
    #[test]
    fn test_value_count() {
        let yaml = r#"
title: Failed Login
id: failed-login-001
logsource:
  category: auth
detection:
  selection:
    EventType: failed_login
  condition: selection
level: low
---
title: Failed Logins From Many Users
id: corr-vc-001
correlation:
  type: value_count
  rules:
    - failed-login-001
  group-by:
    - Host
  timespan: 60s
  condition:
    field: User
    gte: 3
level: high
"#;
        let collection = parse_sigma_yaml(yaml).unwrap();
        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
        engine.add_collection(&collection).unwrap();

        let ts = 1000i64;
        for (i, user) in ["alice", "bob", "charlie"].iter().enumerate() {
            // Same host, a different user each time.
            let v = json!({"EventType": "failed_login", "Host": "srv01", "User": user});
            let event = Event::from_value(&v);
            let r = engine.process_event_at(&event, ts + i as i64);
            if i == 2 {
                assert_eq!(r.correlations.len(), 1);
                assert_eq!(r.correlations[0].aggregated_value, 3.0);
            }
        }
    }
1852
    /// `temporal` correlation: once both referenced rules (`recon-a`,
    /// `recon-b`) have matched for the same User within 60s, the correlation
    /// fires on the second event.
    #[test]
    fn test_temporal() {
        let yaml = r#"
title: Recon A
id: recon-a
name: recon_a
logsource:
  category: process
detection:
  selection:
    CommandLine|contains: 'whoami'
  condition: selection
---
title: Recon B
id: recon-b
name: recon_b
logsource:
  category: process
detection:
  selection:
    CommandLine|contains: 'ipconfig'
  condition: selection
---
title: Recon Combo
id: corr-temporal
correlation:
  type: temporal
  rules:
    - recon-a
    - recon-b
  group-by:
    - User
  timespan: 60s
  condition:
    gte: 2
level: high
"#;
        let collection = parse_sigma_yaml(yaml).unwrap();
        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
        engine.add_collection(&collection).unwrap();

        let ts = 1000i64;
        // Only recon-a has matched so far.
        let v1 = json!({"CommandLine": "whoami", "User": "admin"});
        let ev1 = Event::from_value(&v1);
        let r1 = engine.process_event_at(&ev1, ts);
        assert!(r1.correlations.is_empty());

        // recon-b matches 10s later: both rules are now present.
        let v2 = json!({"CommandLine": "ipconfig /all", "User": "admin"});
        let ev2 = Event::from_value(&v2);
        let r2 = engine.process_event_at(&ev2, ts + 10);
        assert_eq!(r2.correlations.len(), 1);
        assert_eq!(r2.correlations[0].rule_title, "Recon Combo");
    }
1912
    /// `temporal_ordered` fires when the referenced rules match in the listed
    /// order: a failed login, then a successful login for the same User.
    #[test]
    fn test_temporal_ordered() {
        let yaml = r#"
title: Failed Login
id: failed-001
name: failed_login
logsource:
  category: auth
detection:
  selection:
    EventType: failed_login
  condition: selection
---
title: Success Login
id: success-001
name: successful_login
logsource:
  category: auth
detection:
  selection:
    EventType: success_login
  condition: selection
---
title: Brute Force Then Login
id: corr-bf
correlation:
  type: temporal_ordered
  rules:
    - failed-001
    - success-001
  group-by:
    - User
  timespan: 60s
  condition:
    gte: 2
level: critical
"#;
        let collection = parse_sigma_yaml(yaml).unwrap();
        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
        engine.add_collection(&collection).unwrap();

        let ts = 1000i64;
        // First rule in the sequence.
        let v1 = json!({"EventType": "failed_login", "User": "admin"});
        let ev1 = Event::from_value(&v1);
        let r1 = engine.process_event_at(&ev1, ts);
        assert!(r1.correlations.is_empty());

        // Second rule, in order, within the window.
        let v2 = json!({"EventType": "success_login", "User": "admin"});
        let ev2 = Event::from_value(&v2);
        let r2 = engine.process_event_at(&ev2, ts + 10);
        assert_eq!(r2.correlations.len(), 1);
    }
1971
    /// `temporal_ordered` must NOT fire when the rules match in reverse order
    /// (`rule-b` before `rule-a`).
    #[test]
    fn test_temporal_ordered_wrong_order() {
        let yaml = r#"
title: Rule A
id: rule-a
logsource:
  category: test
detection:
  selection:
    type: a
  condition: selection
---
title: Rule B
id: rule-b
logsource:
  category: test
detection:
  selection:
    type: b
  condition: selection
---
title: A then B
id: corr-ab
correlation:
  type: temporal_ordered
  rules:
    - rule-a
    - rule-b
  group-by:
    - User
  timespan: 60s
  condition:
    gte: 2
level: high
"#;
        let collection = parse_sigma_yaml(yaml).unwrap();
        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
        engine.add_collection(&collection).unwrap();

        let ts = 1000i64;
        // B arrives first ...
        let v1 = json!({"type": "b", "User": "admin"});
        let ev1 = Event::from_value(&v1);
        engine.process_event_at(&ev1, ts);

        // ... then A: the required A-then-B order was never observed.
        let v2 = json!({"type": "a", "User": "admin"});
        let ev2 = Event::from_value(&v2);
        let r2 = engine.process_event_at(&ev2, ts + 10);
        assert!(r2.correlations.is_empty());
    }
2022
    /// `value_sum` over `bytes_sent`: 600 alone stays under `gt: 1000`.
    /// Adding 500 brings the sum to 1100, which fires.
    #[test]
    fn test_value_sum() {
        let yaml = r#"
title: Web Access
id: web-001
logsource:
  category: web
detection:
  selection:
    action: upload
  condition: selection
---
title: Large Upload
id: corr-sum
correlation:
  type: value_sum
  rules:
    - web-001
  group-by:
    - User
  timespan: 60s
  condition:
    field: bytes_sent
    gt: 1000
level: high
"#;
        let collection = parse_sigma_yaml(yaml).unwrap();
        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
        engine.add_collection(&collection).unwrap();

        let ts = 1000i64;
        let v1 = json!({"action": "upload", "User": "alice", "bytes_sent": 600});
        let ev1 = Event::from_value(&v1);
        let r1 = engine.process_event_at(&ev1, ts);
        assert!(r1.correlations.is_empty());

        let v2 = json!({"action": "upload", "User": "alice", "bytes_sent": 500});
        let ev2 = Event::from_value(&v2);
        let r2 = engine.process_event_at(&ev2, ts + 5);
        assert_eq!(r2.correlations.len(), 1);
        // Float comparison with a tolerance rather than exact equality.
        assert!((r2.correlations[0].aggregated_value - 1100.0).abs() < f64::EPSILON);
    }
2069
    /// `value_avg` over `latency_ms` per Service. After 400, 600 and 800 the
    /// average is 600 (> 500), so the correlation fires on the third event.
    #[test]
    fn test_value_avg() {
        let yaml = r#"
title: Request
id: req-001
logsource:
  category: web
detection:
  selection:
    type: request
  condition: selection
---
title: High Avg Latency
id: corr-avg
correlation:
  type: value_avg
  rules:
    - req-001
  group-by:
    - Service
  timespan: 60s
  condition:
    field: latency_ms
    gt: 500
level: medium
"#;
        let collection = parse_sigma_yaml(yaml).unwrap();
        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
        engine.add_collection(&collection).unwrap();

        let ts = 1000i64;
        for (i, latency) in [400, 600, 800].iter().enumerate() {
            // Only the final average is asserted.
            let v = json!({"type": "request", "Service": "api", "latency_ms": latency});
            let event = Event::from_value(&v);
            let r = engine.process_event_at(&event, ts + i as i64);
            if i == 2 {
                assert_eq!(r.correlations.len(), 1);
                assert!((r.correlations[0].aggregated_value - 600.0).abs() < f64::EPSILON);
            }
        }
    }
2112
    /// `state_count` tracks one entry per group key, and `evict_expired` clears
    /// entries whose 60s window has elapsed.
    #[test]
    fn test_state_count() {
        let yaml = r#"
title: Base
id: base-sc
logsource:
  category: test
detection:
  selection:
    action: test
  condition: selection
---
title: Count
id: corr-sc
correlation:
  type: event_count
  rules:
    - base-sc
  group-by:
    - User
  timespan: 60s
  condition:
    gte: 100
level: low
"#;
        let collection = parse_sigma_yaml(yaml).unwrap();
        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
        engine.add_collection(&collection).unwrap();

        let v = json!({"action": "test", "User": "alice"});
        let event = Event::from_value(&v);
        engine.process_event_at(&event, 1000);
        assert_eq!(engine.state_count(), 1);

        // A second user creates a second group entry.
        let v2 = json!({"action": "test", "User": "bob"});
        let event2 = Event::from_value(&v2);
        engine.process_event_at(&event2, 1001);
        assert_eq!(engine.state_count(), 2);

        // t=2000 is far past both 60s windows.
        engine.evict_expired(2000);
        assert_eq!(engine.state_count(), 0);
    }
2160
    /// With no `generate` key in the correlation rule, a single matching event
    /// yields both the base detection and the `gte: 1` correlation.
    /// NOTE(review): the name refers to a `generate` flag default, but the
    /// test only checks that both results are emitted. Confirm the intent.
    #[test]
    fn test_generate_flag_default_false() {
        let yaml = r#"
title: Base
id: gen-base
logsource:
  category: test
detection:
  selection:
    action: test
  condition: selection
---
title: Correlation
id: gen-corr
correlation:
  type: event_count
  rules:
    - gen-base
  group-by:
    - User
  timespan: 60s
  condition:
    gte: 1
level: high
"#;
        let collection = parse_sigma_yaml(yaml).unwrap();
        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
        engine.add_collection(&collection).unwrap();

        let v = json!({"action": "test", "User": "alice"});
        let event = Event::from_value(&v);
        let r = engine.process_event_at(&event, 1000);
        assert_eq!(r.detections.len(), 1);
        assert_eq!(r.correlations.len(), 1);
    }
2202
    /// Realistic CloudTrail scenario: five `ListBuckets` calls by the same ARN,
    /// one minute apart. They fall inside a 1h window, and the fifth call
    /// fires the `gte: 5` correlation. The group-by key is a dotted field name.
    #[test]
    fn test_aws_bucket_enumeration() {
        let yaml = r#"
title: Potential Bucket Enumeration on AWS
id: f305fd62-beca-47da-ad95-7690a0620084
logsource:
  product: aws
  service: cloudtrail
detection:
  selection:
    eventSource: "s3.amazonaws.com"
    eventName: "ListBuckets"
  condition: selection
level: low
---
title: Multiple AWS bucket enumerations
id: be246094-01d3-4bba-88de-69e582eba0cc
status: experimental
correlation:
  type: event_count
  rules:
    - f305fd62-beca-47da-ad95-7690a0620084
  group-by:
    - userIdentity.arn
  timespan: 1h
  condition:
    gte: 5
level: high
"#;
        let collection = parse_sigma_yaml(yaml).unwrap();
        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
        engine.add_collection(&collection).unwrap();

        let base_ts = 1_700_000_000i64;
        for i in 0..5 {
            let v = json!({
                "eventSource": "s3.amazonaws.com",
                "eventName": "ListBuckets",
                "userIdentity.arn": "arn:aws:iam::123456789:user/attacker"
            });
            let event = Event::from_value(&v);
            let r = engine.process_event_at(&event, base_ts + i * 60);
            if i == 4 {
                assert_eq!(r.correlations.len(), 1);
                assert_eq!(
                    r.correlations[0].rule_title,
                    "Multiple AWS bucket enumerations"
                );
                assert_eq!(r.correlations[0].aggregated_value, 5.0);
            }
        }
    }
2259
    /// Correlation chaining: an `event_count` correlation
    /// (`many-failed-chain`) is itself referenced by a `temporal_ordered`
    /// correlation. The test checks that both correlations load, that the
    /// `event_count` fires on the third failed login, and that the later
    /// success event still yields its base detection.
    /// NOTE(review): the firing of "Brute Force Followed by Login" itself is
    /// not asserted here.
    #[test]
    fn test_chaining_event_count_to_temporal() {
        let yaml = r#"
title: Single failed login
id: failed-login-chain
name: failed_login
logsource:
  category: auth
detection:
  selection:
    EventType: failed_login
  condition: selection
---
title: Successful login
id: success-login-chain
name: successful_login
logsource:
  category: auth
detection:
  selection:
    EventType: success_login
  condition: selection
---
title: Multiple failed logins
id: many-failed-chain
name: multiple_failed_login
correlation:
  type: event_count
  rules:
    - failed-login-chain
  group-by:
    - User
  timespan: 60s
  condition:
    gte: 3
level: medium
---
title: Brute Force Followed by Login
id: brute-force-chain
correlation:
  type: temporal_ordered
  rules:
    - many-failed-chain
    - success-login-chain
  group-by:
    - User
  timespan: 120s
  condition:
    gte: 2
level: critical
"#;
        let collection = parse_sigma_yaml(yaml).unwrap();
        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
        engine.add_collection(&collection).unwrap();

        assert_eq!(engine.detection_rule_count(), 2);
        assert_eq!(engine.correlation_rule_count(), 2);

        let ts = 1000i64;

        for i in 0..3 {
            // Three failed logins for the same user within the 60s window.
            let v = json!({"EventType": "failed_login", "User": "victim"});
            let event = Event::from_value(&v);
            let r = engine.process_event_at(&event, ts + i);
            if i == 2 {
                assert!(
                    r.correlations
                        .iter()
                        .any(|c| c.rule_title == "Multiple failed logins"),
                    "Expected event_count correlation to fire"
                );
            }
        }

        // Successful login 30s later, inside the 120s temporal_ordered window.
        let v = json!({"EventType": "success_login", "User": "victim"});
        let event = Event::from_value(&v);
        let r = engine.process_event_at(&event, ts + 30);

        assert_eq!(r.detections.len(), 1);
        assert_eq!(r.detections[0].rule_title, "Successful login");
    }
2356
    /// Correlation `aliases` map one logical group-by field (`internal_ip`)
    /// onto different concrete fields per referenced rule. The two events are
    /// grouped together because `destination.ip` and `source.ip` carry the
    /// same value, and the emitted group key uses the alias name.
    #[test]
    fn test_field_aliases() {
        let yaml = r#"
title: Internal Error
id: internal-error-001
name: internal_error
logsource:
  category: web
detection:
  selection:
    http.response.status_code: 500
  condition: selection
---
title: New Connection
id: new-conn-001
name: new_network_connection
logsource:
  category: network
detection:
  selection:
    event.type: connection
  condition: selection
---
title: Error Then Connection
id: corr-alias
correlation:
  type: temporal
  rules:
    - internal-error-001
    - new-conn-001
  group-by:
    - internal_ip
  timespan: 60s
  condition:
    gte: 2
  aliases:
    internal_ip:
      internal_error: destination.ip
      new_network_connection: source.ip
level: high
"#;
        let collection = parse_sigma_yaml(yaml).unwrap();
        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
        engine.add_collection(&collection).unwrap();

        let ts = 1000i64;

        // internal_error: the alias resolves to destination.ip.
        let v1 = json!({
            "http.response.status_code": 500,
            "destination.ip": "10.0.0.5"
        });
        let ev1 = Event::from_value(&v1);
        let r1 = engine.process_event_at(&ev1, ts);
        assert_eq!(r1.detections.len(), 1);
        assert!(r1.correlations.is_empty());

        // new_network_connection: the alias resolves to source.ip (same value).
        let v2 = json!({
            "event.type": "connection",
            "source.ip": "10.0.0.5"
        });
        let ev2 = Event::from_value(&v2);
        let r2 = engine.process_event_at(&ev2, ts + 5);
        assert_eq!(r2.detections.len(), 1);
        assert_eq!(r2.correlations.len(), 1);
        assert_eq!(r2.correlations[0].rule_title, "Error Then Connection");
        assert!(
            r2.correlations[0]
                .group_key
                .iter()
                .any(|(k, v)| k == "internal_ip" && v == "10.0.0.5")
        );
    }
2437
2438 #[test]
2443 fn test_value_percentile() {
2444 let yaml = r#"
2445title: Process Creation
2446id: proc-001
2447logsource:
2448 category: process
2449detection:
2450 selection:
2451 type: process_creation
2452 condition: selection
2453---
2454title: Rare Process
2455id: corr-percentile
2456correlation:
2457 type: value_percentile
2458 rules:
2459 - proc-001
2460 group-by:
2461 - ComputerName
2462 timespan: 60s
2463 condition:
2464 field: image
2465 lte: 50
2466level: medium
2467"#;
2468 let collection = parse_sigma_yaml(yaml).unwrap();
2469 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2470 engine.add_collection(&collection).unwrap();
2471
2472 let ts = 1000i64;
2473 for (i, val) in [10.0, 20.0, 30.0, 40.0, 50.0].iter().enumerate() {
2475 let v = json!({"type": "process_creation", "ComputerName": "srv01", "image": val});
2476 let event = Event::from_value(&v);
2477 let _ = engine.process_event_at(&event, ts + i as i64);
2478 }
2479 }
2482
    /// Extended temporal condition written as a boolean expression
    /// (`login-attempt and password-change`). It fires only once both
    /// referenced rules have matched for the same User within 300s.
    #[test]
    fn test_extended_temporal_and_condition() {
        let yaml = r#"
title: Login Attempt
id: login-attempt
logsource:
  category: auth
detection:
  selection:
    EventType: login_failure
  condition: selection
---
title: Password Change
id: password-change
logsource:
  category: auth
detection:
  selection:
    EventType: password_change
  condition: selection
---
title: Credential Attack
correlation:
  type: temporal
  rules:
    - login-attempt
    - password-change
  group-by:
    - User
  timespan: 300s
  condition: login-attempt and password-change
level: high
"#;
        let collection = parse_sigma_yaml(yaml).unwrap();
        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
        engine.add_collection(&collection).unwrap();

        let ts = 1000i64;

        let ev1 = json!({"EventType": "login_failure", "User": "alice"});
        let r1 = engine.process_event_at(&Event::from_value(&ev1), ts);
        assert!(r1.correlations.is_empty(), "only one rule fired so far");

        let ev2 = json!({"EventType": "password_change", "User": "alice"});
        let r2 = engine.process_event_at(&Event::from_value(&ev2), ts + 10);
        assert_eq!(
            r2.correlations.len(),
            1,
            "temporal correlation should fire: both rules matched"
        );
        assert_eq!(r2.correlations[0].rule_title, "Credential Attack");
    }
2542
    /// Extended temporal condition with `or`: a single `ssh-login` match is
    /// enough to fire the correlation.
    #[test]
    fn test_extended_temporal_or_condition() {
        let yaml = r#"
title: SSH Login
id: ssh-login
logsource:
  category: auth
detection:
  selection:
    EventType: ssh_login
  condition: selection
---
title: VPN Login
id: vpn-login
logsource:
  category: auth
detection:
  selection:
    EventType: vpn_login
  condition: selection
---
title: Any Remote Access
correlation:
  type: temporal
  rules:
    - ssh-login
    - vpn-login
  group-by:
    - User
  timespan: 60s
  condition: ssh-login or vpn-login
level: medium
"#;
        let collection = parse_sigma_yaml(yaml).unwrap();
        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
        engine.add_collection(&collection).unwrap();

        let ev = json!({"EventType": "ssh_login", "User": "bob"});
        let r = engine.process_event_at(&Event::from_value(&ev), 1000);
        assert_eq!(r.correlations.len(), 1);
        assert_eq!(r.correlations[0].rule_title, "Any Remote Access");
    }
2587
    /// An `and` condition with only one side satisfied must not fire. Once the
    /// second rule matches for the same Host within 120s, it fires.
    #[test]
    fn test_extended_temporal_partial_and_no_fire() {
        let yaml = r#"
title: Recon Step 1
id: recon-1
logsource:
  category: process
detection:
  selection:
    CommandLine|contains: 'whoami'
  condition: selection
---
title: Recon Step 2
id: recon-2
logsource:
  category: process
detection:
  selection:
    CommandLine|contains: 'ipconfig'
  condition: selection
---
title: Full Recon
correlation:
  type: temporal
  rules:
    - recon-1
    - recon-2
  group-by:
    - Host
  timespan: 120s
  condition: recon-1 and recon-2
level: high
"#;
        let collection = parse_sigma_yaml(yaml).unwrap();
        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
        engine.add_collection(&collection).unwrap();

        let ev = json!({"CommandLine": "whoami", "Host": "srv01"});
        let r = engine.process_event_at(&Event::from_value(&ev), 1000);
        assert!(r.correlations.is_empty(), "only one of two AND rules fired");

        let ev2 = json!({"CommandLine": "ipconfig /all", "Host": "srv01"});
        let r2 = engine.process_event_at(&Event::from_value(&ev2), 1010);
        assert_eq!(r2.correlations.len(), 1);
        assert_eq!(r2.correlations[0].rule_title, "Full Recon");
    }
2637
    /// A Sigma `filter` document excludes `svc_*` users from `failed-auth`.
    /// Filtered events must never feed the `Brute Force` correlation. Only
    /// alice's three events reach `gte: 3`.
    #[test]
    fn test_filter_with_correlation() {
        let yaml = r#"
title: Failed Auth
id: failed-auth
logsource:
  category: auth
detection:
  selection:
    EventType: auth_failure
  condition: selection
---
title: Exclude Service Accounts
filter:
  rules:
    - failed-auth
  selection:
    User|startswith: 'svc_'
  condition: selection
---
title: Brute Force
correlation:
  type: event_count
  rules:
    - failed-auth
  group-by:
    - User
  timespan: 300s
  condition:
    gte: 3
level: critical
"#;
        let collection = parse_sigma_yaml(yaml).unwrap();
        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
        engine.add_collection(&collection).unwrap();

        let ts = 1000i64;

        for i in 0..5 {
            // Five events for a service account: above threshold, but filtered.
            let ev = json!({"EventType": "auth_failure", "User": "svc_backup"});
            let r = engine.process_event_at(&Event::from_value(&ev), ts + i);
            assert!(
                r.correlations.is_empty(),
                "service account should be filtered, no correlation"
            );
        }

        for i in 0..2 {
            // Two unfiltered events: still below threshold.
            let ev = json!({"EventType": "auth_failure", "User": "alice"});
            let r = engine.process_event_at(&Event::from_value(&ev), ts + 10 + i);
            assert!(r.correlations.is_empty(), "not yet 3 events");
        }

        // Third unfiltered event reaches gte: 3.
        let ev = json!({"EventType": "auth_failure", "User": "alice"});
        let r = engine.process_event_at(&Event::from_value(&ev), ts + 12);
        assert_eq!(r.correlations.len(), 1);
        assert_eq!(r.correlations[0].rule_title, "Brute Force");
    }
2704
    /// An `action: repeat` document produces a second detection rule. It has
    /// no `logsource` of its own and presumably inherits the previous
    /// document's (TODO confirm). Matches from both rules count towards the
    /// same `event_count` correlation: two `.docx` plus one `.xlsx` reach
    /// `gte: 3`.
    #[test]
    fn test_repeat_rules_in_correlation() {
        let yaml = r#"
title: File Access A
id: file-a
logsource:
  category: file_access
detection:
  selection:
    FileName|endswith: '.docx'
  condition: selection
---
action: repeat
title: File Access B
id: file-b
detection:
  selection:
    FileName|endswith: '.xlsx'
  condition: selection
---
title: Mass File Access
correlation:
  type: event_count
  rules:
    - file-a
    - file-b
  group-by:
    - User
  timespan: 60s
  condition:
    gte: 3
level: high
"#;
        let collection = parse_sigma_yaml(yaml).unwrap();
        assert_eq!(collection.rules.len(), 2);
        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
        engine.add_collection(&collection).unwrap();
        assert_eq!(engine.detection_rule_count(), 2);

        let ts = 1000i64;
        // Interleave matches from both rules for the same user.
        let ev1 = json!({"FileName": "report.docx", "User": "bob"});
        engine.process_event_at(&Event::from_value(&ev1), ts);
        let ev2 = json!({"FileName": "data.xlsx", "User": "bob"});
        engine.process_event_at(&Event::from_value(&ev2), ts + 1);
        let ev3 = json!({"FileName": "notes.docx", "User": "bob"});
        let r = engine.process_event_at(&Event::from_value(&ev3), ts + 2);

        assert_eq!(r.correlations.len(), 1);
        assert_eq!(r.correlations[0].rule_title, "Mass File Access");
    }
2761
    /// The `|expand` modifier substitutes `%User%` from the event's own `User`
    /// field. alice's path matches and two such events fire the correlation.
    /// bob's event with alice's path does not match at all.
    #[test]
    fn test_expand_modifier_with_correlation() {
        let yaml = r#"
title: User Temp File
id: user-temp
logsource:
  category: file_access
detection:
  selection:
    FilePath|expand: 'C:\Users\%User%\Temp'
  condition: selection
---
title: Excessive Temp Access
correlation:
  type: event_count
  rules:
    - user-temp
  group-by:
    - User
  timespan: 60s
  condition:
    gte: 2
level: medium
"#;
        let collection = parse_sigma_yaml(yaml).unwrap();
        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
        engine.add_collection(&collection).unwrap();

        let ts = 1000i64;
        let ev1 = json!({"FilePath": "C:\\Users\\alice\\Temp", "User": "alice"});
        let r1 = engine.process_event_at(&Event::from_value(&ev1), ts);
        assert!(r1.correlations.is_empty());

        let ev2 = json!({"FilePath": "C:\\Users\\alice\\Temp", "User": "alice"});
        let r2 = engine.process_event_at(&Event::from_value(&ev2), ts + 1);
        assert_eq!(r2.correlations.len(), 1);
        assert_eq!(r2.correlations[0].rule_title, "Excessive Temp Access");

        // Path belongs to alice but User is bob: the expanded pattern differs.
        let ev3 = json!({"FilePath": "C:\\Users\\alice\\Temp", "User": "bob"});
        let r3 = engine.process_event_at(&Event::from_value(&ev3), ts + 2);
        assert_eq!(r3.detections.len(), 0);
    }
2811
    /// The `|hour` modifier matches on the hour component of the event's
    /// `Timestamp` field. Two 03:xx logins for alice fire the correlation,
    /// and a 12:00 login does not match the base rule at all.
    ///
    /// The engine time passed to `process_event_at` (`ts`) is independent of
    /// the `Timestamp` field used by the modifier.
    #[test]
    fn test_timestamp_modifier_with_correlation() {
        let yaml = r#"
title: Night Login
id: night-login
logsource:
  category: auth
detection:
  login:
    EventType: login
  night:
    Timestamp|hour: 3
  condition: login and night
---
title: Frequent Night Logins
correlation:
  type: event_count
  rules:
    - night-login
  group-by:
    - User
  timespan: 3600s
  condition:
    gte: 2
level: high
"#;
        let collection = parse_sigma_yaml(yaml).unwrap();
        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
        engine.add_collection(&collection).unwrap();

        let ts = 1000i64;
        let ev1 =
            json!({"EventType": "login", "User": "alice", "Timestamp": "2024-01-15T03:10:00Z"});
        let r1 = engine.process_event_at(&Event::from_value(&ev1), ts);
        assert_eq!(r1.detections.len(), 1);
        assert!(r1.correlations.is_empty());

        let ev2 =
            json!({"EventType": "login", "User": "alice", "Timestamp": "2024-01-15T03:45:00Z"});
        let r2 = engine.process_event_at(&Event::from_value(&ev2), ts + 1);
        assert_eq!(r2.correlations.len(), 1);
        assert_eq!(r2.correlations[0].rule_title, "Frequent Night Logins");

        let ev3 = json!({"EventType": "login", "User": "bob", "Timestamp": "2024-01-15T12:00:00Z"});
        let r3 = engine.process_event_at(&Event::from_value(&ev3), ts + 2);
        assert!(
            r3.detections.is_empty(),
            "noon login should not match night rule"
        );
    }
2868
2869 #[test]
2874 fn test_event_count_range_condition() {
2875 let yaml = r#"
2876title: Login Attempt
2877id: login-attempt-001
2878name: login_attempt
2879logsource:
2880 product: windows
2881detection:
2882 selection:
2883 EventType: login
2884 condition: selection
2885level: low
2886---
2887title: Login Count Range
2888id: corr-range-001
2889correlation:
2890 type: event_count
2891 rules:
2892 - login-attempt-001
2893 group-by:
2894 - User
2895 timespan: 3600s
2896 condition:
2897 gt: 2
2898 lte: 5
2899level: high
2900"#;
2901 let collection = parse_sigma_yaml(yaml).unwrap();
2902 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2903 engine.add_collection(&collection).unwrap();
2904
2905 let ts: i64 = 1_000_000;
2906
2907 for i in 0..2 {
2909 let ev = json!({"EventType": "login", "User": "alice"});
2910 let r = engine.process_event_at(&Event::from_value(&ev), ts + i);
2911 assert!(r.correlations.is_empty(), "2 events should not fire (gt:2)");
2912 }
2913
2914 let ev3 = json!({"EventType": "login", "User": "alice"});
2916 let r3 = engine.process_event_at(&Event::from_value(&ev3), ts + 3);
2917 assert_eq!(r3.correlations.len(), 1, "3 events: gt:2 AND lte:5");
2918
2919 for i in 4..=5 {
2921 let ev = json!({"EventType": "login", "User": "alice"});
2922 let r = engine.process_event_at(&Event::from_value(&ev), ts + i);
2923 assert_eq!(r.correlations.len(), 1, "{i} events still in range");
2924 }
2925
2926 let ev6 = json!({"EventType": "login", "User": "alice"});
2928 let r6 = engine.process_event_at(&Event::from_value(&ev6), ts + 6);
2929 assert!(
2930 r6.correlations.is_empty(),
2931 "6 events exceeds lte:5, should not fire"
2932 );
2933 }
2934
2935 fn suppression_yaml() -> &'static str {
2940 r#"
2941title: Login
2942id: login-base
2943logsource:
2944 category: auth
2945detection:
2946 selection:
2947 EventType: login
2948 condition: selection
2949---
2950title: Many Logins
2951correlation:
2952 type: event_count
2953 rules:
2954 - login-base
2955 group-by:
2956 - User
2957 timeframe: 60s
2958 condition:
2959 gte: 3
2960level: high
2961"#
2962 }
2963
2964 #[test]
2965 fn test_suppression_window() {
2966 let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
2967 let config = CorrelationConfig {
2968 suppress: Some(10), ..Default::default()
2970 };
2971 let mut engine = CorrelationEngine::new(config);
2972 engine.add_collection(&collection).unwrap();
2973
2974 let ev = json!({"EventType": "login", "User": "alice"});
2975 let ts = 1000;
2976
2977 engine.process_event_at(&Event::from_value(&ev), ts);
2979 engine.process_event_at(&Event::from_value(&ev), ts + 1);
2980 let r3 = engine.process_event_at(&Event::from_value(&ev), ts + 2);
2981 assert_eq!(r3.correlations.len(), 1, "should fire on 3rd event");
2982
2983 let r4 = engine.process_event_at(&Event::from_value(&ev), ts + 3);
2985 assert!(
2986 r4.correlations.is_empty(),
2987 "should be suppressed within 10s window"
2988 );
2989
2990 let r5 = engine.process_event_at(&Event::from_value(&ev), ts + 9);
2992 assert!(
2993 r5.correlations.is_empty(),
2994 "should be suppressed at ts+9 (< ts+2+10)"
2995 );
2996
2997 let r6 = engine.process_event_at(&Event::from_value(&ev), ts + 13);
2999 assert_eq!(
3000 r6.correlations.len(),
3001 1,
3002 "should fire again after suppress window expires"
3003 );
3004 }
3005
3006 #[test]
3007 fn test_suppression_per_group_key() {
3008 let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
3009 let config = CorrelationConfig {
3010 suppress: Some(60),
3011 ..Default::default()
3012 };
3013 let mut engine = CorrelationEngine::new(config);
3014 engine.add_collection(&collection).unwrap();
3015
3016 let ts = 1000;
3017
3018 let ev_a = json!({"EventType": "login", "User": "alice"});
3020 engine.process_event_at(&Event::from_value(&ev_a), ts);
3021 engine.process_event_at(&Event::from_value(&ev_a), ts + 1);
3022 let r = engine.process_event_at(&Event::from_value(&ev_a), ts + 2);
3023 assert_eq!(r.correlations.len(), 1, "alice should fire");
3024
3025 let ev_b = json!({"EventType": "login", "User": "bob"});
3027 engine.process_event_at(&Event::from_value(&ev_b), ts + 3);
3028 engine.process_event_at(&Event::from_value(&ev_b), ts + 4);
3029 let r = engine.process_event_at(&Event::from_value(&ev_b), ts + 5);
3030 assert_eq!(r.correlations.len(), 1, "bob should fire independently");
3031
3032 let r = engine.process_event_at(&Event::from_value(&ev_a), ts + 6);
3034 assert!(r.correlations.is_empty(), "alice still suppressed");
3035 }
3036
3037 #[test]
3042 fn test_action_reset() {
3043 let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
3044 let config = CorrelationConfig {
3045 action_on_match: CorrelationAction::Reset,
3046 ..Default::default()
3047 };
3048 let mut engine = CorrelationEngine::new(config);
3049 engine.add_collection(&collection).unwrap();
3050
3051 let ev = json!({"EventType": "login", "User": "alice"});
3052 let ts = 1000;
3053
3054 engine.process_event_at(&Event::from_value(&ev), ts);
3056 engine.process_event_at(&Event::from_value(&ev), ts + 1);
3057 let r3 = engine.process_event_at(&Event::from_value(&ev), ts + 2);
3058 assert_eq!(r3.correlations.len(), 1, "should fire on 3rd event");
3059
3060 let r4 = engine.process_event_at(&Event::from_value(&ev), ts + 3);
3062 assert!(r4.correlations.is_empty(), "reset: need 3 more events");
3063
3064 let r5 = engine.process_event_at(&Event::from_value(&ev), ts + 4);
3065 assert!(r5.correlations.is_empty(), "reset: still only 2");
3066
3067 let r6 = engine.process_event_at(&Event::from_value(&ev), ts + 5);
3069 assert_eq!(
3070 r6.correlations.len(),
3071 1,
3072 "should fire again after 3 events post-reset"
3073 );
3074 }
3075
3076 #[test]
3081 fn test_emit_detections_true_by_default() {
3082 let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
3083 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3084 engine.add_collection(&collection).unwrap();
3085
3086 let ev = json!({"EventType": "login", "User": "alice"});
3087 let r = engine.process_event_at(&Event::from_value(&ev), 1000);
3088 assert_eq!(r.detections.len(), 1, "by default detections are emitted");
3089 }
3090
3091 #[test]
3092 fn test_emit_detections_false_suppresses() {
3093 let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
3094 let config = CorrelationConfig {
3095 emit_detections: false,
3096 ..Default::default()
3097 };
3098 let mut engine = CorrelationEngine::new(config);
3099 engine.add_collection(&collection).unwrap();
3100
3101 let ev = json!({"EventType": "login", "User": "alice"});
3102 let r = engine.process_event_at(&Event::from_value(&ev), 1000);
3103 assert!(
3104 r.detections.is_empty(),
3105 "detection matches should be suppressed when emit_detections=false"
3106 );
3107 }
3108
3109 #[test]
3110 fn test_generate_true_keeps_detections() {
3111 let yaml = r#"
3113title: Login
3114id: login-gen
3115logsource:
3116 category: auth
3117detection:
3118 selection:
3119 EventType: login
3120 condition: selection
3121---
3122title: Many Logins
3123correlation:
3124 type: event_count
3125 rules:
3126 - login-gen
3127 group-by:
3128 - User
3129 timeframe: 60s
3130 condition:
3131 gte: 3
3132 generate: true
3133level: high
3134"#;
3135 let collection = parse_sigma_yaml(yaml).unwrap();
3136 let config = CorrelationConfig {
3137 emit_detections: false,
3138 ..Default::default()
3139 };
3140 let mut engine = CorrelationEngine::new(config);
3141 engine.add_collection(&collection).unwrap();
3142
3143 let ev = json!({"EventType": "login", "User": "alice"});
3144 let r = engine.process_event_at(&Event::from_value(&ev), 1000);
3145 assert_eq!(
3147 r.detections.len(),
3148 1,
3149 "generate:true keeps detection output"
3150 );
3151 }
3152
3153 #[test]
3158 fn test_suppress_and_reset_combined() {
3159 let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
3160 let config = CorrelationConfig {
3161 suppress: Some(5),
3162 action_on_match: CorrelationAction::Reset,
3163 ..Default::default()
3164 };
3165 let mut engine = CorrelationEngine::new(config);
3166 engine.add_collection(&collection).unwrap();
3167
3168 let ev = json!({"EventType": "login", "User": "alice"});
3169 let ts = 1000;
3170
3171 engine.process_event_at(&Event::from_value(&ev), ts);
3173 engine.process_event_at(&Event::from_value(&ev), ts + 1);
3174 let r3 = engine.process_event_at(&Event::from_value(&ev), ts + 2);
3175 assert_eq!(r3.correlations.len(), 1, "fires on 3rd event");
3176
3177 engine.process_event_at(&Event::from_value(&ev), ts + 3);
3180 engine.process_event_at(&Event::from_value(&ev), ts + 4);
3181 let r = engine.process_event_at(&Event::from_value(&ev), ts + 5);
3182 assert!(
3183 r.correlations.is_empty(),
3184 "threshold met again but still suppressed"
3185 );
3186
3187 let r = engine.process_event_at(&Event::from_value(&ev), ts + 8);
3191 assert_eq!(
3192 r.correlations.len(),
3193 1,
3194 "fires after suppress expires (accumulated events + new one)"
3195 );
3196
3197 engine.process_event_at(&Event::from_value(&ev), ts + 9);
3200 engine.process_event_at(&Event::from_value(&ev), ts + 10);
3201 let r = engine.process_event_at(&Event::from_value(&ev), ts + 11);
3202 assert!(
3203 r.correlations.is_empty(),
3204 "threshold met but suppress window hasn't expired (ts+11 - ts+8 = 3 < 5)"
3205 );
3206 }
3207
3208 #[test]
3213 fn test_no_suppression_fires_every_event() {
3214 let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
3215 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3216 engine.add_collection(&collection).unwrap();
3217
3218 let ev = json!({"EventType": "login", "User": "alice"});
3219 let ts = 1000;
3220
3221 engine.process_event_at(&Event::from_value(&ev), ts);
3222 engine.process_event_at(&Event::from_value(&ev), ts + 1);
3223 let r3 = engine.process_event_at(&Event::from_value(&ev), ts + 2);
3224 assert_eq!(r3.correlations.len(), 1);
3225
3226 let r4 = engine.process_event_at(&Event::from_value(&ev), ts + 3);
3228 assert_eq!(
3229 r4.correlations.len(),
3230 1,
3231 "no suppression: fires on every event after threshold"
3232 );
3233
3234 let r5 = engine.process_event_at(&Event::from_value(&ev), ts + 4);
3235 assert_eq!(r5.correlations.len(), 1, "still fires");
3236 }
3237
3238 #[test]
3243 fn test_custom_attr_timestamp_field() {
3244 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3245 let mut attrs = std::collections::HashMap::new();
3246 attrs.insert("rsigma.timestamp_field".to_string(), "time".to_string());
3247 engine.apply_custom_attributes(&attrs);
3248
3249 assert_eq!(
3250 engine.config.timestamp_fields[0], "time",
3251 "rsigma.timestamp_field should be prepended"
3252 );
3253 assert!(
3255 engine
3256 .config
3257 .timestamp_fields
3258 .contains(&"@timestamp".to_string())
3259 );
3260 }
3261
3262 #[test]
3263 fn test_custom_attr_timestamp_field_no_duplicates() {
3264 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3265 let mut attrs = std::collections::HashMap::new();
3266 attrs.insert("rsigma.timestamp_field".to_string(), "time".to_string());
3267 engine.apply_custom_attributes(&attrs);
3269 engine.apply_custom_attributes(&attrs);
3270
3271 let count = engine
3272 .config
3273 .timestamp_fields
3274 .iter()
3275 .filter(|f| *f == "time")
3276 .count();
3277 assert_eq!(count, 1, "should not duplicate timestamp_field entries");
3278 }
3279
3280 #[test]
3281 fn test_custom_attr_suppress() {
3282 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3283 assert!(engine.config.suppress.is_none());
3284
3285 let mut attrs = std::collections::HashMap::new();
3286 attrs.insert("rsigma.suppress".to_string(), "5m".to_string());
3287 engine.apply_custom_attributes(&attrs);
3288
3289 assert_eq!(engine.config.suppress, Some(300));
3290 }
3291
3292 #[test]
3293 fn test_custom_attr_suppress_does_not_override_cli() {
3294 let config = CorrelationConfig {
3295 suppress: Some(60), ..Default::default()
3297 };
3298 let mut engine = CorrelationEngine::new(config);
3299
3300 let mut attrs = std::collections::HashMap::new();
3301 attrs.insert("rsigma.suppress".to_string(), "5m".to_string());
3302 engine.apply_custom_attributes(&attrs);
3303
3304 assert_eq!(
3305 engine.config.suppress,
3306 Some(60),
3307 "CLI suppress should not be overridden by custom attribute"
3308 );
3309 }
3310
3311 #[test]
3312 fn test_custom_attr_action() {
3313 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3314 assert_eq!(engine.config.action_on_match, CorrelationAction::Alert);
3315
3316 let mut attrs = std::collections::HashMap::new();
3317 attrs.insert("rsigma.action".to_string(), "reset".to_string());
3318 engine.apply_custom_attributes(&attrs);
3319
3320 assert_eq!(engine.config.action_on_match, CorrelationAction::Reset);
3321 }
3322
3323 #[test]
3324 fn test_custom_attr_action_does_not_override_cli() {
3325 let config = CorrelationConfig {
3326 action_on_match: CorrelationAction::Reset, ..Default::default()
3328 };
3329 let mut engine = CorrelationEngine::new(config);
3330
3331 let mut attrs = std::collections::HashMap::new();
3332 attrs.insert("rsigma.action".to_string(), "alert".to_string());
3333 engine.apply_custom_attributes(&attrs);
3334
3335 assert_eq!(
3336 engine.config.action_on_match,
3337 CorrelationAction::Reset,
3338 "CLI action should not be overridden by custom attribute"
3339 );
3340 }
3341
3342 #[test]
3343 fn test_custom_attr_timestamp_field_used_for_extraction() {
3344 let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
3346 let mut config = CorrelationConfig::default();
3347 config.timestamp_fields.insert(0, "event_time".to_string());
3349 let mut engine = CorrelationEngine::new(config);
3350 engine.add_collection(&collection).unwrap();
3351
3352 let ev = json!({
3354 "EventType": "login",
3355 "User": "alice",
3356 "event_time": "2026-02-11T12:00:00Z"
3357 });
3358 let result = engine.process_event(&Event::from_value(&ev));
3359
3360 assert!(!result.detections.is_empty() || result.correlations.is_empty());
3362 let ts = engine
3366 .extract_event_timestamp(&Event::from_value(&ev))
3367 .expect("should extract timestamp");
3368 assert!(
3369 ts > 1_700_000_000 && ts < 1_800_000_000,
3370 "timestamp should be ~2026 epoch, got {ts}"
3371 );
3372 }
3373
3374 #[test]
3379 fn test_correlation_cycle_direct() {
3380 let yaml = r#"
3382title: detection rule
3383id: det-rule
3384logsource:
3385 product: test
3386detection:
3387 selection:
3388 action: click
3389 condition: selection
3390level: low
3391---
3392title: correlation A
3393id: corr-a
3394correlation:
3395 type: event_count
3396 rules:
3397 - corr-b
3398 group-by:
3399 - User
3400 timespan: 5m
3401 condition:
3402 gte: 2
3403level: high
3404---
3405title: correlation B
3406id: corr-b
3407correlation:
3408 type: event_count
3409 rules:
3410 - corr-a
3411 group-by:
3412 - User
3413 timespan: 5m
3414 condition:
3415 gte: 2
3416level: high
3417"#;
3418 let collection = parse_sigma_yaml(yaml).unwrap();
3419 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3420 let result = engine.add_collection(&collection);
3421 assert!(result.is_err(), "should detect direct cycle");
3422 let err = result.unwrap_err().to_string();
3423 assert!(err.contains("cycle"), "error should mention cycle: {err}");
3424 assert!(
3425 err.contains("corr-a") && err.contains("corr-b"),
3426 "error should name both correlations: {err}"
3427 );
3428 }
3429
3430 #[test]
3431 fn test_correlation_cycle_self() {
3432 let yaml = r#"
3434title: detection rule
3435id: det-rule
3436logsource:
3437 product: test
3438detection:
3439 selection:
3440 action: click
3441 condition: selection
3442level: low
3443---
3444title: self-ref correlation
3445id: self-corr
3446correlation:
3447 type: event_count
3448 rules:
3449 - self-corr
3450 group-by:
3451 - User
3452 timespan: 5m
3453 condition:
3454 gte: 2
3455level: high
3456"#;
3457 let collection = parse_sigma_yaml(yaml).unwrap();
3458 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3459 let result = engine.add_collection(&collection);
3460 assert!(result.is_err(), "should detect self-referencing cycle");
3461 let err = result.unwrap_err().to_string();
3462 assert!(err.contains("cycle"), "error should mention cycle: {err}");
3463 assert!(
3464 err.contains("self-corr"),
3465 "error should name the correlation: {err}"
3466 );
3467 }
3468
3469 #[test]
3470 fn test_correlation_no_cycle_valid_chain() {
3471 let yaml = r#"
3473title: detection rule
3474id: det-rule
3475logsource:
3476 product: test
3477detection:
3478 selection:
3479 action: click
3480 condition: selection
3481level: low
3482---
3483title: correlation A
3484id: corr-a
3485correlation:
3486 type: event_count
3487 rules:
3488 - det-rule
3489 group-by:
3490 - User
3491 timespan: 5m
3492 condition:
3493 gte: 2
3494level: high
3495---
3496title: correlation B
3497id: corr-b
3498correlation:
3499 type: event_count
3500 rules:
3501 - corr-a
3502 group-by:
3503 - User
3504 timespan: 5m
3505 condition:
3506 gte: 2
3507level: high
3508"#;
3509 let collection = parse_sigma_yaml(yaml).unwrap();
3510 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3511 let result = engine.add_collection(&collection);
3512 assert!(
3513 result.is_ok(),
3514 "valid chain should not be rejected: {result:?}"
3515 );
3516 }
3517
3518 #[test]
3519 fn test_correlation_cycle_transitive() {
3520 let yaml = r#"
3522title: detection rule
3523id: det-rule
3524logsource:
3525 product: test
3526detection:
3527 selection:
3528 action: click
3529 condition: selection
3530level: low
3531---
3532title: correlation A
3533id: corr-a
3534correlation:
3535 type: event_count
3536 rules:
3537 - corr-c
3538 group-by:
3539 - User
3540 timespan: 5m
3541 condition:
3542 gte: 2
3543level: high
3544---
3545title: correlation B
3546id: corr-b
3547correlation:
3548 type: event_count
3549 rules:
3550 - corr-a
3551 group-by:
3552 - User
3553 timespan: 5m
3554 condition:
3555 gte: 2
3556level: high
3557---
3558title: correlation C
3559id: corr-c
3560correlation:
3561 type: event_count
3562 rules:
3563 - corr-b
3564 group-by:
3565 - User
3566 timespan: 5m
3567 condition:
3568 gte: 2
3569level: high
3570"#;
3571 let collection = parse_sigma_yaml(yaml).unwrap();
3572 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3573 let result = engine.add_collection(&collection);
3574 assert!(result.is_err(), "should detect transitive cycle");
3575 let err = result.unwrap_err().to_string();
3576 assert!(err.contains("cycle"), "error should mention cycle: {err}");
3577 }
3578
3579 #[test]
3584 fn test_correlation_events_disabled_by_default() {
3585 let yaml = r#"
3586title: Login
3587id: login-rule
3588logsource:
3589 category: auth
3590detection:
3591 selection:
3592 EventType: login
3593 condition: selection
3594---
3595title: Many Logins
3596correlation:
3597 type: event_count
3598 rules:
3599 - login-rule
3600 group-by:
3601 - User
3602 timespan: 60s
3603 condition:
3604 gte: 3
3605level: high
3606"#;
3607 let collection = parse_sigma_yaml(yaml).unwrap();
3608 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3609 engine.add_collection(&collection).unwrap();
3610
3611 for i in 0..3 {
3612 let v = json!({"EventType": "login", "User": "admin", "@timestamp": 1000 + i});
3613 let event = Event::from_value(&v);
3614 let result = engine.process_event_at(&event, 1000 + i);
3615 if i == 2 {
3616 assert_eq!(result.correlations.len(), 1);
3617 assert!(result.correlations[0].events.is_none());
3619 }
3620 }
3621 }
3622
3623 #[test]
3624 fn test_correlation_events_included_when_enabled() {
3625 let yaml = r#"
3626title: Login
3627id: login-rule
3628logsource:
3629 category: auth
3630detection:
3631 selection:
3632 EventType: login
3633 condition: selection
3634---
3635title: Many Logins
3636correlation:
3637 type: event_count
3638 rules:
3639 - login-rule
3640 group-by:
3641 - User
3642 timespan: 60s
3643 condition:
3644 gte: 3
3645level: high
3646"#;
3647 let collection = parse_sigma_yaml(yaml).unwrap();
3648 let config = CorrelationConfig {
3649 correlation_event_mode: CorrelationEventMode::Full,
3650 max_correlation_events: 10,
3651 ..Default::default()
3652 };
3653 let mut engine = CorrelationEngine::new(config);
3654 engine.add_collection(&collection).unwrap();
3655
3656 let events_sent: Vec<serde_json::Value> = (0..3)
3657 .map(|i| json!({"EventType": "login", "User": "admin", "@timestamp": 1000 + i}))
3658 .collect();
3659
3660 let mut corr_result = None;
3661 for (i, ev) in events_sent.iter().enumerate() {
3662 let event = Event::from_value(ev);
3663 let result = engine.process_event_at(&event, 1000 + i as i64);
3664 if !result.correlations.is_empty() {
3665 corr_result = Some(result);
3666 }
3667 }
3668
3669 let result = corr_result.expect("correlation should have fired");
3670 let corr = &result.correlations[0];
3671
3672 let events = corr.events.as_ref().expect("events should be present");
3674 assert_eq!(
3675 events.len(),
3676 3,
3677 "all 3 contributing events should be stored"
3678 );
3679
3680 for (i, event) in events.iter().enumerate() {
3682 assert_eq!(event["EventType"], "login");
3683 assert_eq!(event["User"], "admin");
3684 assert_eq!(event["@timestamp"], 1000 + i as i64);
3685 }
3686 }
3687
3688 #[test]
3689 fn test_correlation_events_max_cap() {
3690 let yaml = r#"
3691title: Login
3692id: login-rule
3693logsource:
3694 category: auth
3695detection:
3696 selection:
3697 EventType: login
3698 condition: selection
3699---
3700title: Many Logins
3701correlation:
3702 type: event_count
3703 rules:
3704 - login-rule
3705 group-by:
3706 - User
3707 timespan: 60s
3708 condition:
3709 gte: 5
3710level: high
3711"#;
3712 let collection = parse_sigma_yaml(yaml).unwrap();
3713 let config = CorrelationConfig {
3714 correlation_event_mode: CorrelationEventMode::Full,
3715 max_correlation_events: 3, ..Default::default()
3717 };
3718 let mut engine = CorrelationEngine::new(config);
3719 engine.add_collection(&collection).unwrap();
3720
3721 let mut corr_result = None;
3722 for i in 0..5 {
3723 let v = json!({"EventType": "login", "User": "admin", "idx": i});
3724 let event = Event::from_value(&v);
3725 let result = engine.process_event_at(&event, 1000 + i);
3726 if !result.correlations.is_empty() {
3727 corr_result = Some(result);
3728 }
3729 }
3730
3731 let result = corr_result.expect("correlation should have fired");
3732 let events = result.correlations[0]
3733 .events
3734 .as_ref()
3735 .expect("events should be present");
3736
3737 assert_eq!(events.len(), 3);
3739 assert_eq!(events[0]["idx"], 2);
3740 assert_eq!(events[1]["idx"], 3);
3741 assert_eq!(events[2]["idx"], 4);
3742 }
3743
3744 #[test]
3745 fn test_correlation_events_with_reset_action() {
3746 let yaml = r#"
3747title: Login
3748id: login-rule
3749logsource:
3750 category: auth
3751detection:
3752 selection:
3753 EventType: login
3754 condition: selection
3755---
3756title: Many Logins
3757correlation:
3758 type: event_count
3759 rules:
3760 - login-rule
3761 group-by:
3762 - User
3763 timespan: 60s
3764 condition:
3765 gte: 2
3766level: high
3767"#;
3768 let collection = parse_sigma_yaml(yaml).unwrap();
3769 let config = CorrelationConfig {
3770 correlation_event_mode: CorrelationEventMode::Full,
3771 action_on_match: CorrelationAction::Reset,
3772 ..Default::default()
3773 };
3774 let mut engine = CorrelationEngine::new(config);
3775 engine.add_collection(&collection).unwrap();
3776
3777 for i in 0..2 {
3779 let v = json!({"EventType": "login", "User": "admin", "round": 1, "idx": i});
3780 let event = Event::from_value(&v);
3781 let result = engine.process_event_at(&event, 1000 + i);
3782 if i == 1 {
3783 assert_eq!(result.correlations.len(), 1);
3784 let events = result.correlations[0].events.as_ref().unwrap();
3785 assert_eq!(events.len(), 2);
3786 }
3787 }
3788
3789 let v = json!({"EventType": "login", "User": "admin", "round": 2, "idx": 0});
3792 let event = Event::from_value(&v);
3793 let result = engine.process_event_at(&event, 1010);
3794 assert!(
3795 result.correlations.is_empty(),
3796 "should not fire with only 1 event after reset"
3797 );
3798
3799 let v = json!({"EventType": "login", "User": "admin", "round": 2, "idx": 1});
3800 let event = Event::from_value(&v);
3801 let result = engine.process_event_at(&event, 1011);
3802 assert_eq!(result.correlations.len(), 1);
3803 let events = result.correlations[0].events.as_ref().unwrap();
3804 assert_eq!(events.len(), 2);
3805 assert_eq!(events[0]["round"], 2);
3807 assert_eq!(events[1]["round"], 2);
3808 }
3809
3810 #[test]
3811 fn test_correlation_events_with_set_include() {
3812 let yaml = r#"
3813title: Login
3814id: login-rule
3815logsource:
3816 category: auth
3817detection:
3818 selection:
3819 EventType: login
3820 condition: selection
3821---
3822title: Many Logins
3823correlation:
3824 type: event_count
3825 rules:
3826 - login-rule
3827 group-by:
3828 - User
3829 timespan: 60s
3830 condition:
3831 gte: 2
3832level: high
3833"#;
3834 let collection = parse_sigma_yaml(yaml).unwrap();
3835 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3836 engine.add_collection(&collection).unwrap();
3837
3838 engine.set_correlation_event_mode(CorrelationEventMode::Full);
3840
3841 for i in 0..2 {
3842 let v = json!({"EventType": "login", "User": "admin"});
3843 let event = Event::from_value(&v);
3844 let result = engine.process_event_at(&event, 1000 + i);
3845 if i == 1 {
3846 assert_eq!(result.correlations.len(), 1);
3847 assert!(result.correlations[0].events.is_some());
3848 assert_eq!(result.correlations[0].events.as_ref().unwrap().len(), 2);
3849 }
3850 }
3851 }
3852
3853 #[test]
3854 fn test_correlation_events_eviction_syncs_with_window() {
3855 let yaml = r#"
3856title: Login
3857id: login-rule
3858logsource:
3859 category: auth
3860detection:
3861 selection:
3862 EventType: login
3863 condition: selection
3864---
3865title: Many Logins
3866correlation:
3867 type: event_count
3868 rules:
3869 - login-rule
3870 group-by:
3871 - User
3872 timespan: 10s
3873 condition:
3874 gte: 3
3875level: high
3876"#;
3877 let collection = parse_sigma_yaml(yaml).unwrap();
3878 let config = CorrelationConfig {
3879 correlation_event_mode: CorrelationEventMode::Full,
3880 max_correlation_events: 100,
3881 ..Default::default()
3882 };
3883 let mut engine = CorrelationEngine::new(config);
3884 engine.add_collection(&collection).unwrap();
3885
3886 for i in 0..2 {
3888 let v = json!({"EventType": "login", "User": "admin", "idx": i});
3889 let event = Event::from_value(&v);
3890 engine.process_event_at(&event, 1000 + i);
3891 }
3892
3893 let v = json!({"EventType": "login", "User": "admin", "idx": 2});
3896 let event = Event::from_value(&v);
3897 let result = engine.process_event_at(&event, 1015);
3898 assert!(
3900 result.correlations.is_empty(),
3901 "should not fire — old events evicted"
3902 );
3903
3904 for i in 3..5 {
3906 let v = json!({"EventType": "login", "User": "admin", "idx": i});
3907 let event = Event::from_value(&v);
3908 let result = engine.process_event_at(&event, 1016 + i - 3);
3909 if i == 4 {
3910 assert_eq!(result.correlations.len(), 1);
3911 let events = result.correlations[0].events.as_ref().unwrap();
3912 assert_eq!(events.len(), 3);
3914 for ev in events {
3915 assert!(ev["idx"].as_i64().unwrap() >= 2);
3916 }
3917 }
3918 }
3919 }
3920
3921 #[test]
3922 fn test_event_buffer_monitoring() {
3923 let yaml = r#"
3924title: Login
3925id: login-rule
3926logsource:
3927 category: auth
3928detection:
3929 selection:
3930 EventType: login
3931 condition: selection
3932---
3933title: Many Logins
3934correlation:
3935 type: event_count
3936 rules:
3937 - login-rule
3938 group-by:
3939 - User
3940 timespan: 60s
3941 condition:
3942 gte: 100
3943level: high
3944"#;
3945 let collection = parse_sigma_yaml(yaml).unwrap();
3946 let config = CorrelationConfig {
3947 correlation_event_mode: CorrelationEventMode::Full,
3948 ..Default::default()
3949 };
3950 let mut engine = CorrelationEngine::new(config);
3951 engine.add_collection(&collection).unwrap();
3952
3953 assert_eq!(engine.event_buffer_count(), 0);
3954 assert_eq!(engine.event_buffer_bytes(), 0);
3955
3956 for i in 0..5 {
3958 let v = json!({"EventType": "login", "User": "admin"});
3959 let event = Event::from_value(&v);
3960 engine.process_event_at(&event, 1000 + i);
3961 }
3962
3963 assert_eq!(engine.event_buffer_count(), 1); assert!(engine.event_buffer_bytes() > 0);
3965 }
3966
3967 #[test]
3968 fn test_correlation_refs_mode_basic() {
3969 let yaml = r#"
3970title: Login
3971id: login-rule
3972logsource:
3973 category: auth
3974detection:
3975 selection:
3976 EventType: login
3977 condition: selection
3978---
3979title: Many Logins
3980correlation:
3981 type: event_count
3982 rules:
3983 - login-rule
3984 group-by:
3985 - User
3986 timespan: 60s
3987 condition:
3988 gte: 3
3989level: high
3990"#;
3991 let collection = parse_sigma_yaml(yaml).unwrap();
3992 let config = CorrelationConfig {
3993 correlation_event_mode: CorrelationEventMode::Refs,
3994 max_correlation_events: 10,
3995 ..Default::default()
3996 };
3997 let mut engine = CorrelationEngine::new(config);
3998 engine.add_collection(&collection).unwrap();
3999
4000 let mut corr_result = None;
4001 for i in 0..3 {
4002 let v = json!({"EventType": "login", "User": "admin", "id": format!("evt-{i}"), "@timestamp": 1000 + i});
4003 let event = Event::from_value(&v);
4004 let result = engine.process_event_at(&event, 1000 + i);
4005 if !result.correlations.is_empty() {
4006 corr_result = Some(result.correlations[0].clone());
4007 }
4008 }
4009
4010 let result = corr_result.expect("correlation should have fired");
4011 assert!(
4013 result.events.is_none(),
4014 "Full events should not be included in refs mode"
4015 );
4016 let refs = result
4017 .event_refs
4018 .expect("event_refs should be present in refs mode");
4019 assert_eq!(refs.len(), 3);
4020 assert_eq!(refs[0].timestamp, 1000);
4021 assert_eq!(refs[0].id, Some("evt-0".to_string()));
4022 assert_eq!(refs[1].id, Some("evt-1".to_string()));
4023 assert_eq!(refs[2].id, Some("evt-2".to_string()));
4024 }
4025
4026 #[test]
4027 fn test_correlation_refs_mode_no_id_field() {
4028 let yaml = r#"
4029title: Login
4030id: login-rule
4031logsource:
4032 category: auth
4033detection:
4034 selection:
4035 EventType: login
4036 condition: selection
4037---
4038title: Many Logins
4039correlation:
4040 type: event_count
4041 rules:
4042 - login-rule
4043 group-by:
4044 - User
4045 timespan: 60s
4046 condition:
4047 gte: 2
4048level: high
4049"#;
4050 let collection = parse_sigma_yaml(yaml).unwrap();
4051 let config = CorrelationConfig {
4052 correlation_event_mode: CorrelationEventMode::Refs,
4053 ..Default::default()
4054 };
4055 let mut engine = CorrelationEngine::new(config);
4056 engine.add_collection(&collection).unwrap();
4057
4058 let mut corr_result = None;
4059 for i in 0..2 {
4060 let v = json!({"EventType": "login", "User": "admin"});
4061 let event = Event::from_value(&v);
4062 let result = engine.process_event_at(&event, 1000 + i);
4063 if !result.correlations.is_empty() {
4064 corr_result = Some(result.correlations[0].clone());
4065 }
4066 }
4067
4068 let result = corr_result.expect("correlation should have fired");
4069 let refs = result.event_refs.expect("event_refs should be present");
4070 for r in &refs {
4072 assert_eq!(r.id, None);
4073 }
4074 }
4075
4076 #[test]
4077 fn test_per_correlation_custom_attributes_from_yaml() {
4078 let yaml = r#"
4079title: Login
4080id: login-rule
4081logsource:
4082 category: auth
4083detection:
4084 selection:
4085 EventType: login
4086 condition: selection
4087---
4088title: Many Logins
4089custom_attributes:
4090 rsigma.correlation_event_mode: refs
4091 rsigma.max_correlation_events: "5"
4092correlation:
4093 type: event_count
4094 rules:
4095 - login-rule
4096 group-by:
4097 - User
4098 timespan: 60s
4099 condition:
4100 gte: 3
4101level: high
4102"#;
4103 let collection = parse_sigma_yaml(yaml).unwrap();
4104 let config = CorrelationConfig::default();
4106 let mut engine = CorrelationEngine::new(config);
4107 engine.add_collection(&collection).unwrap();
4108
4109 let mut corr_result = None;
4110 for i in 0..3 {
4111 let v = json!({"EventType": "login", "User": "admin", "id": format!("e{i}")});
4112 let event = Event::from_value(&v);
4113 let result = engine.process_event_at(&event, 1000 + i);
4114 if !result.correlations.is_empty() {
4115 corr_result = Some(result.correlations[0].clone());
4116 }
4117 }
4118
4119 let result = corr_result.expect("correlation should fire with per-correlation refs mode");
4120 assert!(result.events.is_none());
4122 let refs = result
4123 .event_refs
4124 .expect("event_refs via per-correlation override");
4125 assert_eq!(refs.len(), 3);
4126 assert_eq!(refs[0].id, Some("e0".to_string()));
4127 }
4128
4129 #[test]
4130 fn test_per_correlation_custom_attr_suppress_and_action() {
4131 let yaml = r#"
4132title: Login
4133id: login-rule
4134logsource:
4135 category: auth
4136detection:
4137 selection:
4138 EventType: login
4139 condition: selection
4140---
4141title: Many Logins
4142custom_attributes:
4143 rsigma.suppress: 10s
4144 rsigma.action: reset
4145correlation:
4146 type: event_count
4147 rules:
4148 - login-rule
4149 group-by:
4150 - User
4151 timespan: 60s
4152 condition:
4153 gte: 2
4154level: high
4155"#;
4156 let collection = parse_sigma_yaml(yaml).unwrap();
4157 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
4158 engine.add_collection(&collection).unwrap();
4159
4160 assert_eq!(engine.correlations[0].suppress_secs, Some(10));
4162 assert_eq!(
4163 engine.correlations[0].action,
4164 Some(CorrelationAction::Reset)
4165 );
4166 }
4167
4168 #[test]
4169 fn test_process_with_detections_matches_process_event_at() {
4170 let yaml = r#"
4171title: Login Failure
4172id: login-fail
4173logsource:
4174 category: auth
4175detection:
4176 selection:
4177 EventType: login_failure
4178 condition: selection
4179---
4180title: Brute Force
4181correlation:
4182 type: event_count
4183 rules:
4184 - login-fail
4185 group-by:
4186 - User
4187 timespan: 60s
4188 condition:
4189 gte: 3
4190level: high
4191"#;
4192 let collection = parse_sigma_yaml(yaml).unwrap();
4193
4194 let mut engine1 = CorrelationEngine::new(CorrelationConfig::default());
4196 engine1.add_collection(&collection).unwrap();
4197
4198 let events: Vec<serde_json::Value> = (0..5)
4199 .map(|i| json!({"EventType": "login_failure", "User": "admin", "@timestamp": format!("2025-01-01T00:00:0{}Z", i + 1)}))
4200 .collect();
4201
4202 let results1: Vec<ProcessResult> = events
4203 .iter()
4204 .enumerate()
4205 .map(|(i, v)| {
4206 let e = Event::from_value(v);
4207 engine1.process_event_at(&e, 1000 + i as i64)
4208 })
4209 .collect();
4210
4211 let mut engine2 = CorrelationEngine::new(CorrelationConfig::default());
4213 engine2.add_collection(&collection).unwrap();
4214
4215 let results2: Vec<ProcessResult> = events
4216 .iter()
4217 .enumerate()
4218 .map(|(i, v)| {
4219 let e = Event::from_value(v);
4220 let detections = engine2.evaluate(&e);
4221 engine2.process_with_detections(&e, detections, 1000 + i as i64)
4222 })
4223 .collect();
4224
4225 assert_eq!(results1.len(), results2.len());
4227 for (r1, r2) in results1.iter().zip(results2.iter()) {
4228 assert_eq!(r1.detections.len(), r2.detections.len());
4229 assert_eq!(r1.correlations.len(), r2.correlations.len());
4230 }
4231 }
4232
4233 #[test]
4234 fn test_process_batch_matches_sequential() {
4235 let yaml = r#"
4236title: Login Failure
4237id: login-fail-batch
4238logsource:
4239 category: auth
4240detection:
4241 selection:
4242 EventType: login_failure
4243 condition: selection
4244---
4245title: Brute Force Batch
4246correlation:
4247 type: event_count
4248 rules:
4249 - login-fail-batch
4250 group-by:
4251 - User
4252 timespan: 60s
4253 condition:
4254 gte: 3
4255level: high
4256"#;
4257 let collection = parse_sigma_yaml(yaml).unwrap();
4258
4259 let event_values: Vec<serde_json::Value> = (0..5)
4260 .map(|i| json!({"EventType": "login_failure", "User": "admin", "@timestamp": format!("2025-01-01T00:00:0{}Z", i + 1)}))
4261 .collect();
4262
4263 let mut engine1 = CorrelationEngine::new(CorrelationConfig::default());
4265 engine1.add_collection(&collection).unwrap();
4266 let sequential: Vec<ProcessResult> = event_values
4267 .iter()
4268 .enumerate()
4269 .map(|(i, v)| {
4270 let e = Event::from_value(v);
4271 engine1.process_event_at(&e, 1000 + i as i64)
4272 })
4273 .collect();
4274
4275 let mut engine2 = CorrelationEngine::new(CorrelationConfig::default());
4277 engine2.add_collection(&collection).unwrap();
4278 let events: Vec<Event> = event_values.iter().map(Event::from_value).collect();
4279 let refs: Vec<&Event> = events.iter().collect();
4280 let batch = engine2.process_batch(&refs);
4281
4282 assert_eq!(sequential.len(), batch.len());
4283 for (seq, bat) in sequential.iter().zip(batch.iter()) {
4284 assert_eq!(seq.detections.len(), bat.detections.len());
4285 assert_eq!(seq.correlations.len(), bat.correlations.len());
4286 }
4287 }
4288}