1use std::collections::HashMap;
16use std::sync::Arc;
17
18use chrono::{DateTime, TimeZone, Utc};
19use serde::Serialize;
20
21use rsigma_parser::{CorrelationRule, CorrelationType, Level, SigmaCollection, SigmaRule};
22
23use crate::correlation::{
24 CompiledCorrelation, EventBuffer, EventRef, EventRefBuffer, GroupKey, WindowState,
25 compile_correlation,
26};
27use crate::engine::Engine;
28use crate::error::{EvalError, Result};
29use crate::event::{Event, EventValue};
30use crate::pipeline::{Pipeline, apply_pipelines, apply_pipelines_to_correlation};
31use crate::result::MatchResult;
32
/// What the engine does with a correlation's window state after the
/// correlation fires (and the alert was not suppressed).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum CorrelationAction {
    /// Emit the alert and keep the accumulated window state.
    #[default]
    Alert,
    /// Emit the alert, then clear the window state and any stored events
    /// for the group that fired.
    Reset,
}
52
53impl std::str::FromStr for CorrelationAction {
54 type Err = String;
55 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
56 match s {
57 "alert" => Ok(CorrelationAction::Alert),
58 "reset" => Ok(CorrelationAction::Reset),
59 _ => Err(format!(
60 "Unknown correlation action: {s} (expected 'alert' or 'reset')"
61 )),
62 }
63 }
64}
65
/// Controls whether (and how) the events that contributed to a correlation
/// are attached to the emitted `CorrelationResult`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum CorrelationEventMode {
    /// Attach nothing.
    #[default]
    None,
    /// Store each contributing event's JSON in an `EventBuffer` and attach
    /// the decompressed events to the result (`events` field).
    Full,
    /// Store lightweight references in an `EventRefBuffer` and attach them
    /// to the result (`event_refs` field).
    Refs,
}
83
84impl std::str::FromStr for CorrelationEventMode {
85 type Err = String;
86 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
87 match s.to_lowercase().as_str() {
88 "none" | "off" | "false" => Ok(CorrelationEventMode::None),
89 "full" | "true" => Ok(CorrelationEventMode::Full),
90 "refs" | "references" => Ok(CorrelationEventMode::Refs),
91 _ => Err(format!(
92 "Unknown correlation event mode: {s} (expected 'none', 'full', or 'refs')"
93 )),
94 }
95 }
96}
97
98impl std::fmt::Display for CorrelationEventMode {
99 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100 match self {
101 CorrelationEventMode::None => write!(f, "none"),
102 CorrelationEventMode::Full => write!(f, "full"),
103 CorrelationEventMode::Refs => write!(f, "refs"),
104 }
105 }
106}
107
/// What to do with an event when none of the configured timestamp fields
/// yields a parsable timestamp.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum TimestampFallback {
    /// Use the current wall-clock time (`Utc::now()`) as the event time.
    #[default]
    WallClock,
    /// Do not feed the event into correlation state; its (filtered)
    /// detections are still returned.
    Skip,
}
119
/// Engine-wide settings for `CorrelationEngine`.
///
/// Several fields act as defaults that individual correlation rules may
/// override (suppression, action, event mode, max events). Some fields can
/// also be filled in from `rsigma.*` custom attributes on loaded rules.
#[derive(Debug, Clone)]
pub struct CorrelationConfig {
    /// Event fields checked, in order, for the event timestamp; the first
    /// field whose value parses as a timestamp wins. A rule's
    /// `rsigma.timestamp_field` attribute is inserted at the front.
    pub timestamp_fields: Vec<String>,

    /// Behaviour when no timestamp field could be parsed.
    pub timestamp_fallback: TimestampFallback,

    /// Soft cap on the number of `(correlation, group)` window states. Once
    /// reached, expired entries are evicted; if still at or above the cap,
    /// the stalest entries are dropped down to 90% of it.
    pub max_state_entries: usize,

    /// Default suppression window in seconds: after an alert, further alerts
    /// for the same correlation and group within this window are withheld.
    /// A correlation's own suppression setting takes precedence.
    pub suppress: Option<u64>,

    /// Default action after a correlation fires; a correlation's own action
    /// takes precedence.
    pub action_on_match: CorrelationAction,

    /// When `false`, detections of rules referenced by correlations compiled
    /// with `generate == false` are removed from `ProcessResult::detections`.
    pub emit_detections: bool,

    /// Default event-attachment mode; a correlation's own mode takes
    /// precedence.
    pub correlation_event_mode: CorrelationEventMode,

    /// Default value passed to `EventBuffer::new` / `EventRefBuffer::new`
    /// for each group; a correlation's own `max_events` takes precedence.
    pub max_correlation_events: usize,
}
181
182impl Default for CorrelationConfig {
183 fn default() -> Self {
184 CorrelationConfig {
185 timestamp_fields: vec![
186 "@timestamp".to_string(),
187 "timestamp".to_string(),
188 "EventTime".to_string(),
189 "TimeCreated".to_string(),
190 "eventTime".to_string(),
191 ],
192 timestamp_fallback: TimestampFallback::default(),
193 max_state_entries: 100_000,
194 suppress: None,
195 action_on_match: CorrelationAction::default(),
196 emit_detections: true,
197 correlation_event_mode: CorrelationEventMode::default(),
198 max_correlation_events: 10,
199 }
200 }
201}
202
/// Everything produced by processing a single event.
#[derive(Debug, Clone, Serialize)]
pub struct ProcessResult {
    /// Detection-rule matches for the event, after optional filtering of
    /// correlation-only rules (see `CorrelationConfig::emit_detections`).
    pub detections: Vec<MatchResult>,
    /// Correlation results produced while processing this event.
    pub correlations: Vec<CorrelationResult>,
}
215
/// A fired correlation alert.
#[derive(Debug, Clone, Serialize)]
pub struct CorrelationResult {
    /// Title of the correlation rule.
    pub rule_title: String,
    /// Id of the correlation rule, if it has one.
    pub rule_id: Option<String>,
    /// Level copied from the correlation rule.
    pub level: Option<Level>,
    /// Tags copied from the correlation rule.
    pub tags: Vec<String>,
    /// Kind of correlation (event count, value count, temporal, ...).
    pub correlation_type: CorrelationType,
    /// `(field, value)` pairs identifying the group that fired, built from
    /// the rule's `group_by` fields.
    pub group_key: Vec<(String, String)>,
    /// Value returned by the window's condition check when it fired.
    pub aggregated_value: f64,
    /// Length of the correlation window in seconds.
    pub timespan_secs: u64,
    /// Contributing events (only in `CorrelationEventMode::Full`); omitted
    /// from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub events: Option<Vec<serde_json::Value>>,
    /// References to contributing events (only in
    /// `CorrelationEventMode::Refs`); omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub event_refs: Option<Vec<EventRef>>,
    /// Custom attributes of the correlation rule; omitted when empty.
    #[serde(skip_serializing_if = "HashMap::is_empty")]
    pub custom_attributes: Arc<HashMap<String, serde_json::Value>>,
}
252
/// Stateful evaluator that layers Sigma correlation rules on top of a
/// detection [`Engine`].
///
/// Per-group state is keyed by `(correlation index, group key)`, where the
/// index is the correlation's position in `correlations`.
pub struct CorrelationEngine {
    /// Detection engine evaluating the individual Sigma rules.
    engine: Engine,
    /// Compiled correlation rules, in the order they were added.
    correlations: Vec<CompiledCorrelation>,
    /// Rule reference (id or name) -> indices of correlations listing it.
    rule_index: HashMap<String, Vec<usize>>,
    /// `(id, name)` of each detection rule passed to `add_rule`, in order.
    rule_ids: Vec<(Option<String>, Option<String>)>,
    /// Sliding-window state per `(correlation index, group key)`.
    state: HashMap<(usize, GroupKey), WindowState>,
    /// Timestamp (seconds) of the last emitted alert per key; drives
    /// suppression.
    last_alert: HashMap<(usize, GroupKey), i64>,
    /// Stored full events per key (`CorrelationEventMode::Full`).
    event_buffers: HashMap<(usize, GroupKey), EventBuffer>,
    /// Stored event references per key (`CorrelationEventMode::Refs`).
    event_ref_buffers: HashMap<(usize, GroupKey), EventRefBuffer>,
    /// Rule references of correlations compiled with `generate == false`;
    /// their detections may be hidden when `emit_detections` is off.
    correlation_only_rules: std::collections::HashSet<String>,
    /// Engine-wide settings, possibly amended by `rsigma.*` custom attributes.
    config: CorrelationConfig,
    /// Pipelines applied to rules before compilation, kept sorted by priority.
    pipelines: Vec<Pipeline>,
}
289
290impl CorrelationEngine {
291 pub fn new(config: CorrelationConfig) -> Self {
293 CorrelationEngine {
294 engine: Engine::new(),
295 correlations: Vec::new(),
296 rule_index: HashMap::new(),
297 rule_ids: Vec::new(),
298 state: HashMap::new(),
299 last_alert: HashMap::new(),
300 event_buffers: HashMap::new(),
301 event_ref_buffers: HashMap::new(),
302 correlation_only_rules: std::collections::HashSet::new(),
303 config,
304 pipelines: Vec::new(),
305 }
306 }
307
308 pub fn add_pipeline(&mut self, pipeline: Pipeline) {
312 self.pipelines.push(pipeline);
313 self.pipelines.sort_by_key(|p| p.priority);
314 }
315
316 pub fn set_include_event(&mut self, include: bool) {
318 self.engine.set_include_event(include);
319 }
320
321 pub fn set_correlation_event_mode(&mut self, mode: CorrelationEventMode) {
327 self.config.correlation_event_mode = mode;
328 }
329
330 pub fn set_max_correlation_events(&mut self, max: usize) {
333 self.config.max_correlation_events = max;
334 }
335
336 pub fn add_rule(&mut self, rule: &SigmaRule) -> Result<()> {
342 if self.pipelines.is_empty() {
343 self.apply_custom_attributes(&rule.custom_attributes);
344 self.rule_ids.push((rule.id.clone(), rule.name.clone()));
345 self.engine.add_rule(rule)?;
346 } else {
347 let mut transformed = rule.clone();
348 apply_pipelines(&self.pipelines, &mut transformed)?;
349 self.apply_custom_attributes(&transformed.custom_attributes);
350 self.rule_ids
351 .push((transformed.id.clone(), transformed.name.clone()));
352 let compiled = crate::compiler::compile_rule(&transformed)?;
354 self.engine.add_compiled_rule(compiled);
355 }
356 Ok(())
357 }
358
359 fn apply_custom_attributes(
373 &mut self,
374 attrs: &std::collections::HashMap<String, serde_yaml::Value>,
375 ) {
376 if let Some(field) = attrs.get("rsigma.timestamp_field").and_then(|v| v.as_str())
378 && !self.config.timestamp_fields.iter().any(|f| f == field)
379 {
380 self.config.timestamp_fields.insert(0, field.to_string());
381 }
382
383 if let Some(val) = attrs.get("rsigma.suppress").and_then(|v| v.as_str())
385 && self.config.suppress.is_none()
386 && let Ok(ts) = rsigma_parser::Timespan::parse(val)
387 {
388 self.config.suppress = Some(ts.seconds);
389 }
390
391 if let Some(val) = attrs.get("rsigma.action").and_then(|v| v.as_str())
393 && self.config.action_on_match == CorrelationAction::Alert
394 && let Ok(a) = val.parse::<CorrelationAction>()
395 {
396 self.config.action_on_match = a;
397 }
398 }
399
400 pub fn add_correlation(&mut self, corr: &CorrelationRule) -> Result<()> {
402 let owned;
403 let effective = if self.pipelines.is_empty() {
404 corr
405 } else {
406 owned = {
407 let mut c = corr.clone();
408 apply_pipelines_to_correlation(&self.pipelines, &mut c)?;
409 c
410 };
411 &owned
412 };
413
414 self.apply_custom_attributes(&effective.custom_attributes);
417
418 let compiled = compile_correlation(effective)?;
419 let idx = self.correlations.len();
420
421 for rule_ref in &compiled.rule_refs {
423 self.rule_index
424 .entry(rule_ref.clone())
425 .or_default()
426 .push(idx);
427 }
428
429 if !compiled.generate {
431 for rule_ref in &compiled.rule_refs {
432 self.correlation_only_rules.insert(rule_ref.clone());
433 }
434 }
435
436 self.correlations.push(compiled);
437 Ok(())
438 }
439
440 pub fn add_collection(&mut self, collection: &SigmaCollection) -> Result<()> {
445 for rule in &collection.rules {
446 self.add_rule(rule)?;
447 }
448 for filter in &collection.filters {
450 self.engine.apply_filter(filter)?;
451 }
452 for corr in &collection.correlations {
453 self.add_correlation(corr)?;
454 }
455 self.validate_rule_refs()?;
456 self.detect_correlation_cycles()?;
457 Ok(())
458 }
459
460 fn validate_rule_refs(&self) -> Result<()> {
463 let mut known: std::collections::HashSet<&str> = std::collections::HashSet::new();
464
465 for (id, name) in &self.rule_ids {
466 if let Some(id) = id {
467 known.insert(id.as_str());
468 }
469 if let Some(name) = name {
470 known.insert(name.as_str());
471 }
472 }
473 for corr in &self.correlations {
474 if let Some(ref id) = corr.id {
475 known.insert(id.as_str());
476 }
477 if let Some(ref name) = corr.name {
478 known.insert(name.as_str());
479 }
480 }
481
482 for corr in &self.correlations {
483 for rule_ref in &corr.rule_refs {
484 if !known.contains(rule_ref.as_str()) {
485 return Err(EvalError::UnknownRuleRef(rule_ref.clone()));
486 }
487 }
488 }
489 Ok(())
490 }
491
492 fn detect_correlation_cycles(&self) -> Result<()> {
500 let mut corr_identifiers: HashMap<&str, usize> = HashMap::new();
502 for (idx, corr) in self.correlations.iter().enumerate() {
503 if let Some(ref id) = corr.id {
504 corr_identifiers.insert(id.as_str(), idx);
505 }
506 if let Some(ref name) = corr.name {
507 corr_identifiers.insert(name.as_str(), idx);
508 }
509 }
510
511 let mut adj: Vec<Vec<usize>> = vec![Vec::new(); self.correlations.len()];
513 for (idx, corr) in self.correlations.iter().enumerate() {
514 for rule_ref in &corr.rule_refs {
515 if let Some(&target_idx) = corr_identifiers.get(rule_ref.as_str()) {
516 adj[idx].push(target_idx);
517 }
518 }
519 }
520
521 let mut state = vec![0u8; self.correlations.len()]; let mut path: Vec<usize> = Vec::new();
524
525 for start in 0..self.correlations.len() {
526 if state[start] == 0
527 && let Some(cycle) = Self::dfs_find_cycle(start, &adj, &mut state, &mut path)
528 {
529 let names: Vec<String> = cycle
530 .iter()
531 .map(|&i| {
532 self.correlations[i]
533 .id
534 .as_deref()
535 .or(self.correlations[i].name.as_deref())
536 .unwrap_or(&self.correlations[i].title)
537 .to_string()
538 })
539 .collect();
540 return Err(crate::error::EvalError::CorrelationCycle(
541 names.join(" -> "),
542 ));
543 }
544 }
545 Ok(())
546 }
547
548 fn dfs_find_cycle(
550 node: usize,
551 adj: &[Vec<usize>],
552 state: &mut [u8],
553 path: &mut Vec<usize>,
554 ) -> Option<Vec<usize>> {
555 state[node] = 1; path.push(node);
557
558 for &next in &adj[node] {
559 if state[next] == 1 {
560 if let Some(pos) = path.iter().position(|&n| n == next) {
562 let mut cycle = path[pos..].to_vec();
563 cycle.push(next); return Some(cycle);
565 }
566 }
567 if state[next] == 0
568 && let Some(cycle) = Self::dfs_find_cycle(next, adj, state, path)
569 {
570 return Some(cycle);
571 }
572 }
573
574 path.pop();
575 state[node] = 2; None
577 }
578
579 pub fn process_event(&mut self, event: &impl Event) -> ProcessResult {
585 let all_detections = self.engine.evaluate(event);
586
587 let ts = match self.extract_event_timestamp(event) {
588 Some(ts) => ts,
589 None => match self.config.timestamp_fallback {
590 TimestampFallback::WallClock => Utc::now().timestamp(),
591 TimestampFallback::Skip => {
592 let detections = self.filter_detections(all_detections);
594 return ProcessResult {
595 detections,
596 correlations: Vec::new(),
597 };
598 }
599 },
600 };
601 self.process_with_detections(event, all_detections, ts)
602 }
603
604 pub fn process_event_at(&mut self, event: &impl Event, timestamp_secs: i64) -> ProcessResult {
609 let all_detections = self.engine.evaluate(event);
610 self.process_with_detections(event, all_detections, timestamp_secs)
611 }
612
613 pub fn process_with_detections(
619 &mut self,
620 event: &impl Event,
621 all_detections: Vec<MatchResult>,
622 timestamp_secs: i64,
623 ) -> ProcessResult {
624 let timestamp_secs = timestamp_secs.clamp(0, i64::MAX / 2);
625
626 if self.state.len() >= self.config.max_state_entries {
628 self.evict_all(timestamp_secs);
629 }
630
631 let mut correlations = Vec::new();
633 self.feed_detections(event, &all_detections, timestamp_secs, &mut correlations);
634
635 self.chain_correlations(&correlations, timestamp_secs);
637
638 let detections = self.filter_detections(all_detections);
640
641 ProcessResult {
642 detections,
643 correlations,
644 }
645 }
646
647 pub fn evaluate(&self, event: &impl Event) -> Vec<MatchResult> {
653 self.engine.evaluate(event)
654 }
655
656 pub fn process_batch<E: Event + Sync>(&mut self, events: &[&E]) -> Vec<ProcessResult> {
664 let engine = &self.engine;
667 let ts_fields = &self.config.timestamp_fields;
668
669 let batch_results: Vec<(Vec<MatchResult>, Option<i64>)> = {
670 #[cfg(feature = "parallel")]
671 {
672 use rayon::prelude::*;
673 events
674 .par_iter()
675 .map(|e| {
676 let detections = engine.evaluate(e);
677 let ts = extract_event_ts(e, ts_fields);
678 (detections, ts)
679 })
680 .collect()
681 }
682 #[cfg(not(feature = "parallel"))]
683 {
684 events
685 .iter()
686 .map(|e| {
687 let detections = engine.evaluate(e);
688 let ts = extract_event_ts(e, ts_fields);
689 (detections, ts)
690 })
691 .collect()
692 }
693 };
694
695 let mut results = Vec::with_capacity(events.len());
697 for ((detections, ts_opt), event) in batch_results.into_iter().zip(events) {
698 match ts_opt {
699 Some(ts) => {
700 results.push(self.process_with_detections(event, detections, ts));
701 }
702 None => match self.config.timestamp_fallback {
703 TimestampFallback::WallClock => {
704 let ts = Utc::now().timestamp();
705 results.push(self.process_with_detections(event, detections, ts));
706 }
707 TimestampFallback::Skip => {
708 let detections = self.filter_detections(detections);
710 results.push(ProcessResult {
711 detections,
712 correlations: Vec::new(),
713 });
714 }
715 },
716 }
717 }
718 results
719 }
720
721 fn filter_detections(&self, all_detections: Vec<MatchResult>) -> Vec<MatchResult> {
726 if !self.config.emit_detections && !self.correlation_only_rules.is_empty() {
727 all_detections
728 .into_iter()
729 .filter(|m| {
730 let id_match = m
731 .rule_id
732 .as_ref()
733 .is_some_and(|id| self.correlation_only_rules.contains(id));
734 !id_match
735 })
736 .collect()
737 } else {
738 all_detections
739 }
740 }
741
742 fn feed_detections(
744 &mut self,
745 event: &impl Event,
746 detections: &[MatchResult],
747 ts: i64,
748 out: &mut Vec<CorrelationResult>,
749 ) {
750 let mut work: Vec<(usize, Option<String>, Option<String>)> = Vec::new();
753
754 for det in detections {
755 let (rule_id, rule_name) = self.find_rule_identity(det);
758
759 let mut corr_indices = Vec::new();
761 if let Some(ref id) = rule_id
762 && let Some(indices) = self.rule_index.get(id)
763 {
764 corr_indices.extend(indices);
765 }
766 if let Some(ref name) = rule_name
767 && let Some(indices) = self.rule_index.get(name)
768 {
769 corr_indices.extend(indices);
770 }
771
772 corr_indices.sort_unstable();
773 corr_indices.dedup();
774
775 for &corr_idx in &corr_indices {
776 work.push((corr_idx, rule_id.clone(), rule_name.clone()));
777 }
778 }
779
780 for (corr_idx, rule_id, rule_name) in work {
781 self.update_correlation(corr_idx, event, ts, &rule_id, &rule_name, out);
782 }
783 }
784
785 fn find_rule_identity(&self, det: &MatchResult) -> (Option<String>, Option<String>) {
787 if let Some(ref match_id) = det.rule_id {
789 for (id, name) in &self.rule_ids {
790 if id.as_deref() == Some(match_id.as_str()) {
791 return (id.clone(), name.clone());
792 }
793 }
794 }
795 (det.rule_id.clone(), None)
797 }
798
799 fn resolve_event_mode(&self, corr_idx: usize) -> CorrelationEventMode {
801 let corr = &self.correlations[corr_idx];
802 corr.event_mode
803 .unwrap_or(self.config.correlation_event_mode)
804 }
805
806 fn resolve_max_events(&self, corr_idx: usize) -> usize {
808 let corr = &self.correlations[corr_idx];
809 corr.max_events
810 .unwrap_or(self.config.max_correlation_events)
811 }
812
813 fn update_correlation(
815 &mut self,
816 corr_idx: usize,
817 event: &impl Event,
818 ts: i64,
819 rule_id: &Option<String>,
820 rule_name: &Option<String>,
821 out: &mut Vec<CorrelationResult>,
822 ) {
823 let corr = &self.correlations[corr_idx];
827 let corr_type = corr.correlation_type;
828 let timespan = corr.timespan_secs;
829 let level = corr.level;
830 let suppress_secs = corr.suppress_secs.or(self.config.suppress);
831 let action = corr.action.unwrap_or(self.config.action_on_match);
832 let event_mode = self.resolve_event_mode(corr_idx);
833 let max_events = self.resolve_max_events(corr_idx);
834
835 let mut ref_strs: Vec<&str> = Vec::new();
837 if let Some(id) = rule_id.as_deref() {
838 ref_strs.push(id);
839 }
840 if let Some(name) = rule_name.as_deref() {
841 ref_strs.push(name);
842 }
843 let rule_ref = rule_id.as_deref().or(rule_name.as_deref()).unwrap_or("");
844
845 let group_key = GroupKey::extract(event, &corr.group_by, &ref_strs);
847
848 let state_key = (corr_idx, group_key.clone());
850 let state = self
851 .state
852 .entry(state_key.clone())
853 .or_insert_with(|| WindowState::new_for(corr_type));
854
855 let cutoff = ts - timespan as i64;
857 state.evict(cutoff);
858
859 match corr_type {
861 CorrelationType::EventCount => {
862 state.push_event_count(ts);
863 }
864 CorrelationType::ValueCount => {
865 if let Some(ref field_name) = corr.condition.field
866 && let Some(val) = event.get_field(field_name)
867 && let Some(s) = value_to_string_for_count(&val)
868 {
869 state.push_value_count(ts, s);
870 }
871 }
872 CorrelationType::Temporal | CorrelationType::TemporalOrdered => {
873 state.push_temporal(ts, rule_ref);
874 }
875 CorrelationType::ValueSum
876 | CorrelationType::ValueAvg
877 | CorrelationType::ValuePercentile
878 | CorrelationType::ValueMedian => {
879 if let Some(ref field_name) = corr.condition.field
880 && let Some(val) = event.get_field(field_name)
881 && let Some(n) = value_to_f64_ev(&val)
882 {
883 state.push_numeric(ts, n);
884 }
885 }
886 }
887
888 match event_mode {
890 CorrelationEventMode::Full => {
891 let buf = self
892 .event_buffers
893 .entry(state_key.clone())
894 .or_insert_with(|| EventBuffer::new(max_events));
895 buf.evict(cutoff);
896 let json = event.to_json();
897 buf.push(ts, &json);
898 }
899 CorrelationEventMode::Refs => {
900 let buf = self
901 .event_ref_buffers
902 .entry(state_key.clone())
903 .or_insert_with(|| EventRefBuffer::new(max_events));
904 buf.evict(cutoff);
905 let json = event.to_json();
906 buf.push(ts, &json);
907 }
908 CorrelationEventMode::None => {}
909 }
910
911 let fired = state.check_condition(
913 &corr.condition,
914 corr_type,
915 &corr.rule_refs,
916 corr.extended_expr.as_ref(),
917 );
918
919 if let Some(agg_value) = fired {
920 let alert_key = (corr_idx, group_key.clone());
921
922 let suppressed = if let Some(suppress) = suppress_secs {
924 if let Some(&last_ts) = self.last_alert.get(&alert_key) {
925 (ts - last_ts) < suppress as i64
926 } else {
927 false
928 }
929 } else {
930 false
931 };
932
933 if !suppressed {
934 let (events, event_refs) = match event_mode {
936 CorrelationEventMode::Full => {
937 let stored = self
938 .event_buffers
939 .get(&alert_key)
940 .map(|buf| buf.decompress_all())
941 .unwrap_or_default();
942 (Some(stored), None)
943 }
944 CorrelationEventMode::Refs => {
945 let stored = self
946 .event_ref_buffers
947 .get(&alert_key)
948 .map(|buf| buf.refs())
949 .unwrap_or_default();
950 (None, Some(stored))
951 }
952 CorrelationEventMode::None => (None, None),
953 };
954
955 let corr = &self.correlations[corr_idx];
957 let result = CorrelationResult {
958 rule_title: corr.title.clone(),
959 rule_id: corr.id.clone(),
960 level,
961 tags: corr.tags.clone(),
962 correlation_type: corr_type,
963 group_key: group_key.to_pairs(&corr.group_by),
964 aggregated_value: agg_value,
965 timespan_secs: timespan,
966 events,
967 event_refs,
968 custom_attributes: corr.custom_attributes.clone(),
969 };
970 out.push(result);
971
972 self.last_alert.insert(alert_key.clone(), ts);
974
975 if action == CorrelationAction::Reset {
977 if let Some(state) = self.state.get_mut(&alert_key) {
978 state.clear();
979 }
980 if let Some(buf) = self.event_buffers.get_mut(&alert_key) {
981 buf.clear();
982 }
983 if let Some(buf) = self.event_ref_buffers.get_mut(&alert_key) {
984 buf.clear();
985 }
986 }
987 }
988 }
989 }
990
991 fn chain_correlations(&mut self, fired: &[CorrelationResult], ts: i64) {
996 const MAX_CHAIN_DEPTH: usize = 10;
997 let mut pending: Vec<CorrelationResult> = fired.to_vec();
998 let mut depth = 0;
999
1000 while !pending.is_empty() && depth < MAX_CHAIN_DEPTH {
1001 depth += 1;
1002
1003 #[allow(clippy::type_complexity)]
1005 let mut work: Vec<(usize, Vec<(String, String)>, String)> = Vec::new();
1006 for result in &pending {
1007 if let Some(ref id) = result.rule_id
1008 && let Some(indices) = self.rule_index.get(id)
1009 {
1010 let fired_ref = result
1011 .rule_id
1012 .as_deref()
1013 .unwrap_or(&result.rule_title)
1014 .to_string();
1015 for &corr_idx in indices {
1016 work.push((corr_idx, result.group_key.clone(), fired_ref.clone()));
1017 }
1018 }
1019 }
1020
1021 let mut next_pending = Vec::new();
1022 for (corr_idx, group_key_pairs, fired_ref) in work {
1023 let corr = &self.correlations[corr_idx];
1024 let corr_type = corr.correlation_type;
1025 let timespan = corr.timespan_secs;
1026 let level = corr.level;
1027
1028 let group_key = GroupKey::from_pairs(&group_key_pairs, &corr.group_by);
1029 let state_key = (corr_idx, group_key.clone());
1030 let state = self
1031 .state
1032 .entry(state_key)
1033 .or_insert_with(|| WindowState::new_for(corr_type));
1034
1035 let cutoff = ts - timespan as i64;
1036 state.evict(cutoff);
1037
1038 match corr_type {
1039 CorrelationType::EventCount => {
1040 state.push_event_count(ts);
1041 }
1042 CorrelationType::Temporal | CorrelationType::TemporalOrdered => {
1043 state.push_temporal(ts, &fired_ref);
1044 }
1045 _ => {
1046 state.push_event_count(ts);
1047 }
1048 }
1049
1050 let fired = state.check_condition(
1051 &corr.condition,
1052 corr_type,
1053 &corr.rule_refs,
1054 corr.extended_expr.as_ref(),
1055 );
1056
1057 if let Some(agg_value) = fired {
1058 let corr = &self.correlations[corr_idx];
1059 next_pending.push(CorrelationResult {
1060 rule_title: corr.title.clone(),
1061 rule_id: corr.id.clone(),
1062 level,
1063 tags: corr.tags.clone(),
1064 correlation_type: corr_type,
1065 group_key: group_key.to_pairs(&corr.group_by),
1066 aggregated_value: agg_value,
1067 timespan_secs: timespan,
1068 events: None,
1071 event_refs: None,
1072 custom_attributes: corr.custom_attributes.clone(),
1073 });
1074 }
1075 }
1076
1077 pending = next_pending;
1078 }
1079
1080 if !pending.is_empty() {
1081 log::warn!(
1082 "Correlation chain depth limit reached ({MAX_CHAIN_DEPTH}); \
1083 {} pending result(s) were not propagated further. \
1084 This may indicate a cycle in correlation references.",
1085 pending.len()
1086 );
1087 }
1088 }
1089
1090 fn extract_event_timestamp(&self, event: &impl Event) -> Option<i64> {
1102 for field_name in &self.config.timestamp_fields {
1103 if let Some(val) = event.get_field(field_name)
1104 && let Some(ts) = parse_timestamp_value(&val)
1105 {
1106 return Some(ts);
1107 }
1108 }
1109 None
1110 }
1111
1112 pub fn evict_expired(&mut self, now_secs: i64) {
1118 self.evict_all(now_secs);
1119 }
1120
1121 fn evict_all(&mut self, now_secs: i64) {
1123 let timespans: Vec<u64> = self.correlations.iter().map(|c| c.timespan_secs).collect();
1125
1126 self.state.retain(|&(corr_idx, _), state| {
1127 if corr_idx < timespans.len() {
1128 let cutoff = now_secs - timespans[corr_idx] as i64;
1129 state.evict(cutoff);
1130 }
1131 !state.is_empty()
1132 });
1133
1134 self.event_buffers.retain(|&(corr_idx, _), buf| {
1136 if corr_idx < timespans.len() {
1137 let cutoff = now_secs - timespans[corr_idx] as i64;
1138 buf.evict(cutoff);
1139 }
1140 !buf.is_empty()
1141 });
1142 self.event_ref_buffers.retain(|&(corr_idx, _), buf| {
1143 if corr_idx < timespans.len() {
1144 let cutoff = now_secs - timespans[corr_idx] as i64;
1145 buf.evict(cutoff);
1146 }
1147 !buf.is_empty()
1148 });
1149
1150 if self.state.len() >= self.config.max_state_entries {
1154 let target = self.config.max_state_entries * 9 / 10;
1155 let excess = self.state.len() - target;
1156
1157 let mut by_staleness: Vec<_> = self
1159 .state
1160 .iter()
1161 .map(|(k, v)| (k.clone(), v.latest_timestamp().unwrap_or(i64::MIN)))
1162 .collect();
1163 by_staleness.sort_unstable_by_key(|&(_, ts)| ts);
1164
1165 for (key, _) in by_staleness.into_iter().take(excess) {
1167 self.state.remove(&key);
1168 self.last_alert.remove(&key);
1169 self.event_buffers.remove(&key);
1170 self.event_ref_buffers.remove(&key);
1171 }
1172 }
1173
1174 self.last_alert.retain(|key, &mut alert_ts| {
1177 let suppress = if key.0 < self.correlations.len() {
1178 self.correlations[key.0]
1179 .suppress_secs
1180 .or(self.config.suppress)
1181 .unwrap_or(0)
1182 } else {
1183 0
1184 };
1185 (now_secs - alert_ts) < suppress as i64
1186 });
1187 }
1188
1189 pub fn state_count(&self) -> usize {
1191 self.state.len()
1192 }
1193
1194 pub fn detection_rule_count(&self) -> usize {
1196 self.engine.rule_count()
1197 }
1198
1199 pub fn correlation_rule_count(&self) -> usize {
1201 self.correlations.len()
1202 }
1203
1204 pub fn event_buffer_count(&self) -> usize {
1206 self.event_buffers.len()
1207 }
1208
1209 pub fn event_buffer_bytes(&self) -> usize {
1211 self.event_buffers
1212 .values()
1213 .map(|b| b.compressed_bytes())
1214 .sum()
1215 }
1216
1217 pub fn event_ref_buffer_count(&self) -> usize {
1219 self.event_ref_buffers.len()
1220 }
1221
1222 pub fn engine(&self) -> &Engine {
1224 &self.engine
1225 }
1226
1227 pub fn export_state(&self) -> CorrelationSnapshot {
1233 let mut windows: HashMap<String, Vec<(GroupKey, WindowState)>> = HashMap::new();
1234 for ((idx, gk), ws) in &self.state {
1235 let corr_id = self.correlation_stable_id(*idx);
1236 windows
1237 .entry(corr_id)
1238 .or_default()
1239 .push((gk.clone(), ws.clone()));
1240 }
1241
1242 let mut last_alert: HashMap<String, Vec<(GroupKey, i64)>> = HashMap::new();
1243 for ((idx, gk), ts) in &self.last_alert {
1244 let corr_id = self.correlation_stable_id(*idx);
1245 last_alert
1246 .entry(corr_id)
1247 .or_default()
1248 .push((gk.clone(), *ts));
1249 }
1250
1251 let mut event_buffers: HashMap<String, Vec<(GroupKey, EventBuffer)>> = HashMap::new();
1252 for ((idx, gk), buf) in &self.event_buffers {
1253 let corr_id = self.correlation_stable_id(*idx);
1254 event_buffers
1255 .entry(corr_id)
1256 .or_default()
1257 .push((gk.clone(), buf.clone()));
1258 }
1259
1260 let mut event_ref_buffers: HashMap<String, Vec<(GroupKey, EventRefBuffer)>> =
1261 HashMap::new();
1262 for ((idx, gk), buf) in &self.event_ref_buffers {
1263 let corr_id = self.correlation_stable_id(*idx);
1264 event_ref_buffers
1265 .entry(corr_id)
1266 .or_default()
1267 .push((gk.clone(), buf.clone()));
1268 }
1269
1270 CorrelationSnapshot {
1271 version: SNAPSHOT_VERSION,
1272 windows,
1273 last_alert,
1274 event_buffers,
1275 event_ref_buffers,
1276 }
1277 }
1278
1279 pub fn import_state(&mut self, snapshot: CorrelationSnapshot) -> bool {
1286 if snapshot.version != SNAPSHOT_VERSION {
1287 return false;
1288 }
1289 let id_to_idx = self.build_id_to_index_map();
1290
1291 for (corr_id, groups) in snapshot.windows {
1292 if let Some(&idx) = id_to_idx.get(&corr_id) {
1293 for (gk, ws) in groups {
1294 self.state.insert((idx, gk), ws);
1295 }
1296 }
1297 }
1298
1299 for (corr_id, groups) in snapshot.last_alert {
1300 if let Some(&idx) = id_to_idx.get(&corr_id) {
1301 for (gk, ts) in groups {
1302 self.last_alert.insert((idx, gk), ts);
1303 }
1304 }
1305 }
1306
1307 for (corr_id, groups) in snapshot.event_buffers {
1308 if let Some(&idx) = id_to_idx.get(&corr_id) {
1309 for (gk, buf) in groups {
1310 self.event_buffers.insert((idx, gk), buf);
1311 }
1312 }
1313 }
1314
1315 for (corr_id, groups) in snapshot.event_ref_buffers {
1316 if let Some(&idx) = id_to_idx.get(&corr_id) {
1317 for (gk, buf) in groups {
1318 self.event_ref_buffers.insert((idx, gk), buf);
1319 }
1320 }
1321 }
1322
1323 true
1324 }
1325
1326 fn correlation_stable_id(&self, idx: usize) -> String {
1328 let corr = &self.correlations[idx];
1329 corr.id
1330 .clone()
1331 .or_else(|| corr.name.clone())
1332 .unwrap_or_else(|| corr.title.clone())
1333 }
1334
1335 fn build_id_to_index_map(&self) -> HashMap<String, usize> {
1337 self.correlations
1338 .iter()
1339 .enumerate()
1340 .map(|(idx, _)| (self.correlation_stable_id(idx), idx))
1341 .collect()
1342 }
1343}
1344
1345const SNAPSHOT_VERSION: u32 = 1;
1347
1348#[derive(Debug, Clone, Serialize, serde::Deserialize)]
1355pub struct CorrelationSnapshot {
1356 #[serde(default = "default_snapshot_version")]
1358 pub version: u32,
1359 pub windows: HashMap<String, Vec<(GroupKey, WindowState)>>,
1361 pub last_alert: HashMap<String, Vec<(GroupKey, i64)>>,
1363 pub event_buffers: HashMap<String, Vec<(GroupKey, EventBuffer)>>,
1365 pub event_ref_buffers: HashMap<String, Vec<(GroupKey, EventRefBuffer)>>,
1367}
1368
1369fn default_snapshot_version() -> u32 {
1370 1
1371}
1372
1373impl Default for CorrelationEngine {
1374 fn default() -> Self {
1375 Self::new(CorrelationConfig::default())
1376 }
1377}
1378
1379fn extract_event_ts(event: &impl Event, timestamp_fields: &[String]) -> Option<i64> {
1388 for field_name in timestamp_fields {
1389 if let Some(val) = event.get_field(field_name)
1390 && let Some(ts) = parse_timestamp_value(&val)
1391 {
1392 return Some(ts);
1393 }
1394 }
1395 None
1396}
1397
1398fn parse_timestamp_value(val: &EventValue) -> Option<i64> {
1400 match val {
1401 EventValue::Int(i) => Some(normalize_epoch(*i)),
1402 EventValue::Float(f) => Some(normalize_epoch(*f as i64)),
1403 EventValue::Str(s) => parse_timestamp_string(s),
1404 _ => None,
1405 }
1406}
1407
/// Normalize an integer Unix epoch to seconds.
///
/// Values above 10^12 cannot be plausible second counts (that is beyond
/// year 33000), so they are assumed to carry sub-second precision and are
/// divided by 1000 until they fall back into range. This handles
/// milliseconds (one division), microseconds (two) and nanoseconds (three).
/// Values at or below 10^12, including negatives, are returned unchanged.
fn normalize_epoch(v: i64) -> i64 {
    const MAX_PLAUSIBLE_SECONDS: i64 = 1_000_000_000_000;

    let mut secs = v;
    while secs > MAX_PLAUSIBLE_SECONDS {
        secs /= 1000;
    }
    secs
}
1413
1414fn parse_timestamp_string(s: &str) -> Option<i64> {
1416 if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
1418 return Some(dt.timestamp());
1419 }
1420
1421 if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S") {
1424 return Some(Utc.from_utc_datetime(&naive).timestamp());
1425 }
1426 if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") {
1427 return Some(Utc.from_utc_datetime(&naive).timestamp());
1428 }
1429
1430 if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S%.f") {
1432 return Some(Utc.from_utc_datetime(&naive).timestamp());
1433 }
1434 if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f") {
1435 return Some(Utc.from_utc_datetime(&naive).timestamp());
1436 }
1437
1438 None
1439}
1440
1441fn value_to_string_for_count(v: &EventValue) -> Option<String> {
1443 match v {
1444 EventValue::Str(s) => Some(s.to_string()),
1445 EventValue::Int(n) => Some(n.to_string()),
1446 EventValue::Float(f) => Some(f.to_string()),
1447 EventValue::Bool(b) => Some(b.to_string()),
1448 EventValue::Null => Some("null".to_string()),
1449 _ => None,
1450 }
1451}
1452
1453fn value_to_f64_ev(v: &EventValue) -> Option<f64> {
1455 v.as_f64()
1456}
1457
1458#[cfg(test)]
1463mod tests {
1464 use super::*;
1465 use crate::event::JsonEvent;
1466 use rsigma_parser::parse_sigma_yaml;
1467 use serde_json::json;
1468
1469 #[test]
1474 fn test_parse_timestamp_epoch_secs() {
1475 let val = EventValue::Int(1720612200);
1476 assert_eq!(parse_timestamp_value(&val), Some(1720612200));
1477 }
1478
1479 #[test]
1480 fn test_parse_timestamp_epoch_millis() {
1481 let val = EventValue::Int(1720612200000);
1482 assert_eq!(parse_timestamp_value(&val), Some(1720612200));
1483 }
1484
1485 #[test]
1486 fn test_parse_timestamp_rfc3339() {
1487 let val = EventValue::Str(std::borrow::Cow::Borrowed("2024-07-10T12:30:00Z"));
1488 let ts = parse_timestamp_value(&val).unwrap();
1489 assert_eq!(ts, 1720614600);
1490 }
1491
1492 #[test]
1493 fn test_parse_timestamp_naive() {
1494 let val = EventValue::Str(std::borrow::Cow::Borrowed("2024-07-10T12:30:00"));
1495 let ts = parse_timestamp_value(&val).unwrap();
1496 assert_eq!(ts, 1720614600);
1497 }
1498
1499 #[test]
1500 fn test_parse_timestamp_with_space() {
1501 let val = EventValue::Str(std::borrow::Cow::Borrowed("2024-07-10 12:30:00"));
1502 let ts = parse_timestamp_value(&val).unwrap();
1503 assert_eq!(ts, 1720614600);
1504 }
1505
1506 #[test]
1507 fn test_parse_timestamp_fractional() {
1508 let val = EventValue::Str(std::borrow::Cow::Borrowed("2024-07-10T12:30:00.123Z"));
1509 let ts = parse_timestamp_value(&val).unwrap();
1510 assert_eq!(ts, 1720614600);
1511 }
1512
1513 #[test]
1514 fn test_extract_timestamp_from_event() {
1515 let config = CorrelationConfig {
1516 timestamp_fields: vec!["@timestamp".to_string()],
1517 max_state_entries: 100_000,
1518 ..Default::default()
1519 };
1520 let engine = CorrelationEngine::new(config);
1521
1522 let v = json!({"@timestamp": "2024-07-10T12:30:00Z", "data": "test"});
1523 let event = JsonEvent::borrow(&v);
1524 let ts = engine.extract_event_timestamp(&event);
1525 assert_eq!(ts, Some(1720614600));
1526 }
1527
1528 #[test]
1529 fn test_extract_timestamp_fallback_fields() {
1530 let config = CorrelationConfig {
1531 timestamp_fields: vec![
1532 "@timestamp".to_string(),
1533 "timestamp".to_string(),
1534 "EventTime".to_string(),
1535 ],
1536 max_state_entries: 100_000,
1537 ..Default::default()
1538 };
1539 let engine = CorrelationEngine::new(config);
1540
1541 let v = json!({"timestamp": 1720613400, "data": "test"});
1543 let event = JsonEvent::borrow(&v);
1544 let ts = engine.extract_event_timestamp(&event);
1545 assert_eq!(ts, Some(1720613400));
1546 }
1547
1548 #[test]
1549 fn test_extract_timestamp_returns_none_when_missing() {
1550 let config = CorrelationConfig {
1551 timestamp_fields: vec!["@timestamp".to_string()],
1552 ..Default::default()
1553 };
1554 let engine = CorrelationEngine::new(config);
1555
1556 let v = json!({"data": "no timestamp here"});
1557 let event = JsonEvent::borrow(&v);
1558 assert_eq!(engine.extract_event_timestamp(&event), None);
1559 }
1560
    /// With `TimestampFallback::Skip`, an event that has no usable timestamp
    /// still yields detections but never contributes to correlation state.
    #[test]
    fn test_timestamp_fallback_skip() {
        // One base rule plus an `event_count >= 2` correlation over a 10s window.
        let yaml = r#"
title: test rule
id: ts-skip-rule
logsource:
    product: test
detection:
    selection:
        action: click
    condition: selection
level: low
---
title: test correlation
correlation:
    type: event_count
    rules:
        - ts-skip-rule
    group-by:
        - User
    timespan: 10s
    condition:
        gte: 2
level: high
"#;
        let collection = parse_sigma_yaml(yaml).unwrap();
        let mut engine = CorrelationEngine::new(CorrelationConfig {
            timestamp_fallback: TimestampFallback::Skip,
            ..Default::default()
        });
        engine.add_collection(&collection).unwrap();
        assert_eq!(engine.correlation_rule_count(), 1);

        // The event has no timestamp-looking field, and `process_event` (unlike
        // `process_event_at`) is not handed an explicit time.
        let v = json!({"action": "click", "User": "alice"});
        let event = JsonEvent::borrow(&v);

        let r1 = engine.process_event(&event);
        assert!(!r1.detections.is_empty(), "detection should still fire");

        let r2 = engine.process_event(&event);
        assert!(!r2.detections.is_empty(), "detection should still fire");

        let r3 = engine.process_event(&event);
        assert!(!r3.detections.is_empty(), "detection should still fire");

        // Three matches would satisfy `gte: 2`; empty results on every call show
        // the events were left out of correlation.
        assert!(r1.correlations.is_empty());
        assert!(r2.correlations.is_empty());
        assert!(r3.correlations.is_empty());
    }
1612
    /// With the default config (wall-clock timestamp fallback, per the assertion
    /// message), timestamp-less events still take part in correlation.
    #[test]
    fn test_timestamp_fallback_wallclock_default() {
        let yaml = r#"
title: test rule
id: ts-wc-rule
logsource:
    product: test
detection:
    selection:
        action: click
    condition: selection
level: low
---
title: test correlation
correlation:
    type: event_count
    rules:
        - ts-wc-rule
    group-by:
        - User
    timespan: 60s
    condition:
        gte: 2
level: high
"#;
        let collection = parse_sigma_yaml(yaml).unwrap();
        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
        engine.add_collection(&collection).unwrap();
        assert_eq!(engine.correlation_rule_count(), 1);

        // An event with no timestamp field, processed without an explicit time.
        let v = json!({"action": "click", "User": "alice"});
        let event = JsonEvent::borrow(&v);

        let _r1 = engine.process_event(&event);
        let _r2 = engine.process_event(&event);
        let r3 = engine.process_event(&event);

        // Assumes the three calls complete well inside the 60s window, so by the
        // third event the `gte: 2` threshold has been crossed.
        assert!(
            !r3.correlations.is_empty(),
            "WallClock fallback should allow correlation"
        );
    }
1658
    /// `event_count` fires once the threshold (`gte: 3`) is reached within the
    /// window, and reports the count as `aggregated_value`.
    #[test]
    fn test_event_count_basic() {
        let yaml = r#"
title: Base Rule
id: base-rule-001
name: base_rule
logsource:
    product: windows
    category: process_creation
detection:
    selection:
        CommandLine|contains: 'whoami'
    condition: selection
level: low
---
title: Multiple Whoami
id: corr-001
correlation:
    type: event_count
    rules:
        - base-rule-001
    group-by:
        - User
    timespan: 60s
    condition:
        gte: 3
level: high
"#;
        let collection = parse_sigma_yaml(yaml).unwrap();
        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
        engine.add_collection(&collection).unwrap();

        assert_eq!(engine.detection_rule_count(), 1);
        assert_eq!(engine.correlation_rule_count(), 1);

        let base_ts = 1000i64;
        // Three matching events for the same user, 10s apart (well inside 60s).
        for i in 0..3 {
            let v = json!({"CommandLine": "whoami", "User": "admin"});
            let event = JsonEvent::borrow(&v);
            let result = engine.process_event_at(&event, base_ts + i * 10);

            // The base rule matches every event independently of the correlation.
            assert_eq!(result.detections.len(), 1);

            if i < 2 {
                // Count is 1, then 2: below the threshold.
                assert!(result.correlations.is_empty());
            } else {
                assert_eq!(result.correlations.len(), 1);
                assert_eq!(result.correlations[0].rule_title, "Multiple Whoami");
                assert_eq!(result.correlations[0].aggregated_value, 3.0);
            }
        }
    }
1719
1720 #[test]
1721 fn test_event_count_different_groups() {
1722 let yaml = r#"
1723title: Login
1724id: login-001
1725logsource:
1726 category: auth
1727detection:
1728 selection:
1729 EventType: login
1730 condition: selection
1731level: low
1732---
1733title: Many Logins
1734id: corr-login
1735correlation:
1736 type: event_count
1737 rules:
1738 - login-001
1739 group-by:
1740 - User
1741 timespan: 60s
1742 condition:
1743 gte: 3
1744level: high
1745"#;
1746 let collection = parse_sigma_yaml(yaml).unwrap();
1747 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1748 engine.add_collection(&collection).unwrap();
1749
1750 let ts = 1000i64;
1752 for i in 0..2 {
1753 let v = json!({"EventType": "login", "User": "alice"});
1754 let event = JsonEvent::borrow(&v);
1755 let r = engine.process_event_at(&event, ts + i);
1756 assert!(r.correlations.is_empty());
1757 }
1758 for i in 0..3 {
1759 let v = json!({"EventType": "login", "User": "bob"});
1760 let event = JsonEvent::borrow(&v);
1761 let r = engine.process_event_at(&event, ts + i);
1762 if i == 2 {
1763 assert_eq!(r.correlations.len(), 1);
1764 assert_eq!(
1765 r.correlations[0].group_key,
1766 vec![("User".to_string(), "bob".to_string())]
1767 );
1768 }
1769 }
1770 }
1771
    /// Events older than `timespan` no longer count toward the threshold.
    #[test]
    fn test_event_count_window_expiry() {
        let yaml = r#"
title: Base
id: base-002
logsource:
    category: test
detection:
    selection:
        action: click
    condition: selection
---
title: Rapid Clicks
id: corr-002
correlation:
    type: event_count
    rules:
        - base-002
    group-by:
        - User
    timespan: 10s
    condition:
        gte: 3
level: medium
"#;
        let collection = parse_sigma_yaml(yaml).unwrap();
        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
        engine.add_collection(&collection).unwrap();

        // Three matches for one user, but the events at t=0 and t=1 are more than
        // 10s older than the one at t=15, so the window never holds three.
        let v = json!({"action": "click", "User": "admin"});
        let event = JsonEvent::borrow(&v);
        engine.process_event_at(&event, 0);
        engine.process_event_at(&event, 1);
        let r = engine.process_event_at(&event, 15);
        assert!(r.correlations.is_empty());
    }
1810
1811 #[test]
1816 fn test_value_count() {
1817 let yaml = r#"
1818title: Failed Login
1819id: failed-login-001
1820logsource:
1821 category: auth
1822detection:
1823 selection:
1824 EventType: failed_login
1825 condition: selection
1826level: low
1827---
1828title: Failed Logins From Many Users
1829id: corr-vc-001
1830correlation:
1831 type: value_count
1832 rules:
1833 - failed-login-001
1834 group-by:
1835 - Host
1836 timespan: 60s
1837 condition:
1838 field: User
1839 gte: 3
1840level: high
1841"#;
1842 let collection = parse_sigma_yaml(yaml).unwrap();
1843 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1844 engine.add_collection(&collection).unwrap();
1845
1846 let ts = 1000i64;
1847 for (i, user) in ["alice", "bob", "charlie"].iter().enumerate() {
1849 let v = json!({"EventType": "failed_login", "Host": "srv01", "User": user});
1850 let event = JsonEvent::borrow(&v);
1851 let r = engine.process_event_at(&event, ts + i as i64);
1852 if i == 2 {
1853 assert_eq!(r.correlations.len(), 1);
1854 assert_eq!(r.correlations[0].aggregated_value, 3.0);
1855 }
1856 }
1857 }
1858
    /// `temporal` fires once every referenced rule has matched for the same
    /// group within the window.
    #[test]
    fn test_temporal() {
        let yaml = r#"
title: Recon A
id: recon-a
name: recon_a
logsource:
    category: process
detection:
    selection:
        CommandLine|contains: 'whoami'
    condition: selection
---
title: Recon B
id: recon-b
name: recon_b
logsource:
    category: process
detection:
    selection:
        CommandLine|contains: 'ipconfig'
    condition: selection
---
title: Recon Combo
id: corr-temporal
correlation:
    type: temporal
    rules:
        - recon-a
        - recon-b
    group-by:
        - User
    timespan: 60s
    condition:
        gte: 2
level: high
"#;
        let collection = parse_sigma_yaml(yaml).unwrap();
        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
        engine.add_collection(&collection).unwrap();

        let ts = 1000i64;
        // First rule only: one of two required rules seen.
        let v1 = json!({"CommandLine": "whoami", "User": "admin"});
        let ev1 = JsonEvent::borrow(&v1);
        let r1 = engine.process_event_at(&ev1, ts);
        assert!(r1.correlations.is_empty());

        // Second rule, same user, 10s later: both rules now present.
        let v2 = json!({"CommandLine": "ipconfig /all", "User": "admin"});
        let ev2 = JsonEvent::borrow(&v2);
        let r2 = engine.process_event_at(&ev2, ts + 10);
        assert_eq!(r2.correlations.len(), 1);
        assert_eq!(r2.correlations[0].rule_title, "Recon Combo");
    }
1918
    /// `temporal_ordered` fires when the referenced rules match in the listed
    /// order (failed login, then successful login) for the same group.
    #[test]
    fn test_temporal_ordered() {
        let yaml = r#"
title: Failed Login
id: failed-001
name: failed_login
logsource:
    category: auth
detection:
    selection:
        EventType: failed_login
    condition: selection
---
title: Success Login
id: success-001
name: successful_login
logsource:
    category: auth
detection:
    selection:
        EventType: success_login
    condition: selection
---
title: Brute Force Then Login
id: corr-bf
correlation:
    type: temporal_ordered
    rules:
        - failed-001
        - success-001
    group-by:
        - User
    timespan: 60s
    condition:
        gte: 2
level: critical
"#;
        let collection = parse_sigma_yaml(yaml).unwrap();
        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
        engine.add_collection(&collection).unwrap();

        let ts = 1000i64;
        // Step 1 of the sequence.
        let v1 = json!({"EventType": "failed_login", "User": "admin"});
        let ev1 = JsonEvent::borrow(&v1);
        let r1 = engine.process_event_at(&ev1, ts);
        assert!(r1.correlations.is_empty());

        // Step 2, in the declared order and inside the window.
        let v2 = json!({"EventType": "success_login", "User": "admin"});
        let ev2 = JsonEvent::borrow(&v2);
        let r2 = engine.process_event_at(&ev2, ts + 10);
        assert_eq!(r2.correlations.len(), 1);
    }
1977
    /// `temporal_ordered` must not fire when the rules match in reverse order
    /// (B before A), even though both occur inside the window.
    #[test]
    fn test_temporal_ordered_wrong_order() {
        let yaml = r#"
title: Rule A
id: rule-a
logsource:
    category: test
detection:
    selection:
        type: a
    condition: selection
---
title: Rule B
id: rule-b
logsource:
    category: test
detection:
    selection:
        type: b
    condition: selection
---
title: A then B
id: corr-ab
correlation:
    type: temporal_ordered
    rules:
        - rule-a
        - rule-b
    group-by:
        - User
    timespan: 60s
    condition:
        gte: 2
level: high
"#;
        let collection = parse_sigma_yaml(yaml).unwrap();
        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
        engine.add_collection(&collection).unwrap();

        let ts = 1000i64;
        // B arrives first...
        let v1 = json!({"type": "b", "User": "admin"});
        let ev1 = JsonEvent::borrow(&v1);
        engine.process_event_at(&ev1, ts);

        // ...then A: the declared A -> B order never happened.
        let v2 = json!({"type": "a", "User": "admin"});
        let ev2 = JsonEvent::borrow(&v2);
        let r2 = engine.process_event_at(&ev2, ts + 10);
        assert!(r2.correlations.is_empty());
    }
2028
    /// `value_sum` adds up `field` over the window and fires once the total
    /// exceeds the threshold (`gt: 1000`).
    #[test]
    fn test_value_sum() {
        let yaml = r#"
title: Web Access
id: web-001
logsource:
    category: web
detection:
    selection:
        action: upload
    condition: selection
---
title: Large Upload
id: corr-sum
correlation:
    type: value_sum
    rules:
        - web-001
    group-by:
        - User
    timespan: 60s
    condition:
        field: bytes_sent
        gt: 1000
level: high
"#;
        let collection = parse_sigma_yaml(yaml).unwrap();
        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
        engine.add_collection(&collection).unwrap();

        let ts = 1000i64;
        // Running total 600: not above 1000.
        let v1 = json!({"action": "upload", "User": "alice", "bytes_sent": 600});
        let ev1 = JsonEvent::borrow(&v1);
        let r1 = engine.process_event_at(&ev1, ts);
        assert!(r1.correlations.is_empty());

        // Running total 1100: above 1000, reported as the aggregated value.
        let v2 = json!({"action": "upload", "User": "alice", "bytes_sent": 500});
        let ev2 = JsonEvent::borrow(&v2);
        let r2 = engine.process_event_at(&ev2, ts + 5);
        assert_eq!(r2.correlations.len(), 1);
        assert!((r2.correlations[0].aggregated_value - 1100.0).abs() < f64::EPSILON);
    }
2075
2076 #[test]
2077 fn test_value_avg() {
2078 let yaml = r#"
2079title: Request
2080id: req-001
2081logsource:
2082 category: web
2083detection:
2084 selection:
2085 type: request
2086 condition: selection
2087---
2088title: High Avg Latency
2089id: corr-avg
2090correlation:
2091 type: value_avg
2092 rules:
2093 - req-001
2094 group-by:
2095 - Service
2096 timespan: 60s
2097 condition:
2098 field: latency_ms
2099 gt: 500
2100level: medium
2101"#;
2102 let collection = parse_sigma_yaml(yaml).unwrap();
2103 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2104 engine.add_collection(&collection).unwrap();
2105
2106 let ts = 1000i64;
2107 for (i, latency) in [400, 600, 800].iter().enumerate() {
2109 let v = json!({"type": "request", "Service": "api", "latency_ms": latency});
2110 let event = JsonEvent::borrow(&v);
2111 let r = engine.process_event_at(&event, ts + i as i64);
2112 if i == 2 {
2113 assert_eq!(r.correlations.len(), 1);
2114 assert!((r.correlations[0].aggregated_value - 600.0).abs() < f64::EPSILON);
2115 }
2116 }
2117 }
2118
    /// `state_count` tracks one entry per active group, and `evict_expired`
    /// drops entries whose window has passed.
    #[test]
    fn test_state_count() {
        // A threshold of 100 keeps the correlation from firing during this test.
        let yaml = r#"
title: Base
id: base-sc
logsource:
    category: test
detection:
    selection:
        action: test
    condition: selection
---
title: Count
id: corr-sc
correlation:
    type: event_count
    rules:
        - base-sc
    group-by:
        - User
    timespan: 60s
    condition:
        gte: 100
level: low
"#;
        let collection = parse_sigma_yaml(yaml).unwrap();
        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
        engine.add_collection(&collection).unwrap();

        // First group (alice).
        let v = json!({"action": "test", "User": "alice"});
        let event = JsonEvent::borrow(&v);
        engine.process_event_at(&event, 1000);
        assert_eq!(engine.state_count(), 1);

        // Second group (bob).
        let v2 = json!({"action": "test", "User": "bob"});
        let event2 = JsonEvent::borrow(&v2);
        engine.process_event_at(&event2, 1001);
        assert_eq!(engine.state_count(), 2);

        // t=2000 is far past both events' 60s windows, so everything is evicted.
        engine.evict_expired(2000);
        assert_eq!(engine.state_count(), 0);
    }
2166
    /// With no `generate` key on the correlation, a single event is expected to
    /// produce both the base detection and the correlation in the same result.
    #[test]
    fn test_generate_flag_default_false() {
        let yaml = r#"
title: Base
id: gen-base
logsource:
    category: test
detection:
    selection:
        action: test
    condition: selection
---
title: Correlation
id: gen-corr
correlation:
    type: event_count
    rules:
        - gen-base
    group-by:
        - User
    timespan: 60s
    condition:
        gte: 1
level: high
"#;
        let collection = parse_sigma_yaml(yaml).unwrap();
        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
        engine.add_collection(&collection).unwrap();

        // `gte: 1` means the very first matching event also fires the correlation.
        let v = json!({"action": "test", "User": "alice"});
        let event = JsonEvent::borrow(&v);
        let r = engine.process_event_at(&event, 1000);
        assert_eq!(r.detections.len(), 1);
        assert_eq!(r.correlations.len(), 1);
    }
2208
    /// Realistic CloudTrail scenario: five `ListBuckets` calls by one identity
    /// within an hour trip an `event_count >= 5` correlation.
    #[test]
    fn test_aws_bucket_enumeration() {
        let yaml = r#"
title: Potential Bucket Enumeration on AWS
id: f305fd62-beca-47da-ad95-7690a0620084
logsource:
    product: aws
    service: cloudtrail
detection:
    selection:
        eventSource: "s3.amazonaws.com"
        eventName: "ListBuckets"
    condition: selection
level: low
---
title: Multiple AWS bucket enumerations
id: be246094-01d3-4bba-88de-69e582eba0cc
status: experimental
correlation:
    type: event_count
    rules:
        - f305fd62-beca-47da-ad95-7690a0620084
    group-by:
        - userIdentity.arn
    timespan: 1h
    condition:
        gte: 5
level: high
"#;
        let collection = parse_sigma_yaml(yaml).unwrap();
        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
        engine.add_collection(&collection).unwrap();

        // Five calls one minute apart (4 minutes total, inside the 1h window).
        // The group-by field is supplied as a flat, dotted JSON key here.
        let base_ts = 1_700_000_000i64;
        for i in 0..5 {
            let v = json!({
                "eventSource": "s3.amazonaws.com",
                "eventName": "ListBuckets",
                "userIdentity.arn": "arn:aws:iam::123456789:user/attacker"
            });
            let event = JsonEvent::borrow(&v);
            let r = engine.process_event_at(&event, base_ts + i * 60);
            if i == 4 {
                assert_eq!(r.correlations.len(), 1);
                assert_eq!(
                    r.correlations[0].rule_title,
                    "Multiple AWS bucket enumerations"
                );
                assert_eq!(r.correlations[0].aggregated_value, 5.0);
            }
        }
    }
2265
    /// Correlation chaining: a `temporal_ordered` correlation references another
    /// correlation (`many-failed-chain`) as well as a plain detection rule.
    #[test]
    fn test_chaining_event_count_to_temporal() {
        // Two detection rules, an event_count correlation over the first, and a
        // temporal_ordered correlation over (that correlation, the second rule).
        let yaml = r#"
title: Single failed login
id: failed-login-chain
name: failed_login
logsource:
    category: auth
detection:
    selection:
        EventType: failed_login
    condition: selection
---
title: Successful login
id: success-login-chain
name: successful_login
logsource:
    category: auth
detection:
    selection:
        EventType: success_login
    condition: selection
---
title: Multiple failed logins
id: many-failed-chain
name: multiple_failed_login
correlation:
    type: event_count
    rules:
        - failed-login-chain
    group-by:
        - User
    timespan: 60s
    condition:
        gte: 3
level: medium
---
title: Brute Force Followed by Login
id: brute-force-chain
correlation:
    type: temporal_ordered
    rules:
        - many-failed-chain
        - success-login-chain
    group-by:
        - User
    timespan: 120s
    condition:
        gte: 2
level: critical
"#;
        let collection = parse_sigma_yaml(yaml).unwrap();
        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
        engine.add_collection(&collection).unwrap();

        assert_eq!(engine.detection_rule_count(), 2);
        assert_eq!(engine.correlation_rule_count(), 2);

        let ts = 1000i64;

        // Three failed logins: the inner event_count correlation must fire on the third.
        for i in 0..3 {
            let v = json!({"EventType": "failed_login", "User": "victim"});
            let event = JsonEvent::borrow(&v);
            let r = engine.process_event_at(&event, ts + i);
            if i == 2 {
                assert!(
                    r.correlations
                        .iter()
                        .any(|c| c.rule_title == "Multiple failed logins"),
                    "Expected event_count correlation to fire"
                );
            }
        }

        // Then a successful login for the same user inside the outer 120s window.
        // NOTE(review): only the base detection is asserted below; whether the outer
        // "Brute Force Followed by Login" correlation fires is not checked here.
        let v = json!({"EventType": "success_login", "User": "victim"});
        let event = JsonEvent::borrow(&v);
        let r = engine.process_event_at(&event, ts + 30);

        assert_eq!(r.detections.len(), 1);
        assert_eq!(r.detections[0].rule_title, "Successful login");
    }
2362
    /// Correlation `aliases` map one logical group-by field (`internal_ip`) to a
    /// different concrete field per referenced rule, so events with differently
    /// named fields still land in the same group.
    #[test]
    fn test_field_aliases() {
        let yaml = r#"
title: Internal Error
id: internal-error-001
name: internal_error
logsource:
    category: web
detection:
    selection:
        http.response.status_code: 500
    condition: selection
---
title: New Connection
id: new-conn-001
name: new_network_connection
logsource:
    category: network
detection:
    selection:
        event.type: connection
    condition: selection
---
title: Error Then Connection
id: corr-alias
correlation:
    type: temporal
    rules:
        - internal-error-001
        - new-conn-001
    group-by:
        - internal_ip
    timespan: 60s
    condition:
        gte: 2
    aliases:
        internal_ip:
            internal_error: destination.ip
            new_network_connection: source.ip
level: high
"#;
        let collection = parse_sigma_yaml(yaml).unwrap();
        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
        engine.add_collection(&collection).unwrap();

        let ts = 1000i64;

        // `internal_error` event: its group value comes from `destination.ip`.
        let v1 = json!({
            "http.response.status_code": 500,
            "destination.ip": "10.0.0.5"
        });
        let ev1 = JsonEvent::borrow(&v1);
        let r1 = engine.process_event_at(&ev1, ts);
        assert_eq!(r1.detections.len(), 1);
        assert!(r1.correlations.is_empty());

        // `new_network_connection` event: its group value comes from `source.ip`.
        let v2 = json!({
            "event.type": "connection",
            "source.ip": "10.0.0.5"
        });
        let ev2 = JsonEvent::borrow(&v2);
        let r2 = engine.process_event_at(&ev2, ts + 5);
        assert_eq!(r2.detections.len(), 1);
        assert_eq!(r2.correlations.len(), 1);
        // The group key is reported under the alias name, not the concrete field.
        assert_eq!(r2.correlations[0].rule_title, "Error Then Connection");
        assert!(
            r2.correlations[0]
                .group_key
                .iter()
                .any(|(k, v)| k == "internal_ip" && v == "10.0.0.5")
        );
    }
2443
2444 #[test]
2449 fn test_value_percentile() {
2450 let yaml = r#"
2451title: Process Creation
2452id: proc-001
2453logsource:
2454 category: process
2455detection:
2456 selection:
2457 type: process_creation
2458 condition: selection
2459---
2460title: Rare Process
2461id: corr-percentile
2462correlation:
2463 type: value_percentile
2464 rules:
2465 - proc-001
2466 group-by:
2467 - ComputerName
2468 timespan: 60s
2469 condition:
2470 field: image
2471 lte: 50
2472level: medium
2473"#;
2474 let collection = parse_sigma_yaml(yaml).unwrap();
2475 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2476 engine.add_collection(&collection).unwrap();
2477
2478 let ts = 1000i64;
2479 for (i, val) in [10.0, 20.0, 30.0, 40.0, 50.0].iter().enumerate() {
2481 let v = json!({"type": "process_creation", "ComputerName": "srv01", "image": val});
2482 let event = JsonEvent::borrow(&v);
2483 let _ = engine.process_event_at(&event, ts + i as i64);
2484 }
2485 }
2488
    /// Extended temporal condition: `condition` is a boolean expression over
    /// rule ids (`a and b`), not a numeric threshold.
    #[test]
    fn test_extended_temporal_and_condition() {
        // The correlation has no `id`; it is matched by title in the assertions.
        let yaml = r#"
title: Login Attempt
id: login-attempt
logsource:
    category: auth
detection:
    selection:
        EventType: login_failure
    condition: selection
---
title: Password Change
id: password-change
logsource:
    category: auth
detection:
    selection:
        EventType: password_change
    condition: selection
---
title: Credential Attack
correlation:
    type: temporal
    rules:
        - login-attempt
        - password-change
    group-by:
        - User
    timespan: 300s
    condition: login-attempt and password-change
level: high
"#;
        let collection = parse_sigma_yaml(yaml).unwrap();
        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
        engine.add_collection(&collection).unwrap();

        let ts = 1000i64;

        // Left operand of the AND only.
        let ev1 = json!({"EventType": "login_failure", "User": "alice"});
        let r1 = engine.process_event_at(&JsonEvent::borrow(&ev1), ts);
        assert!(r1.correlations.is_empty(), "only one rule fired so far");

        // Right operand, same user, inside the 300s window: AND is satisfied.
        let ev2 = json!({"EventType": "password_change", "User": "alice"});
        let r2 = engine.process_event_at(&JsonEvent::borrow(&ev2), ts + 10);
        assert_eq!(
            r2.correlations.len(),
            1,
            "temporal correlation should fire: both rules matched"
        );
        assert_eq!(r2.correlations[0].rule_title, "Credential Attack");
    }
2548
    /// Extended temporal condition with `or`: a match for either rule alone is
    /// enough to fire.
    #[test]
    fn test_extended_temporal_or_condition() {
        let yaml = r#"
title: SSH Login
id: ssh-login
logsource:
    category: auth
detection:
    selection:
        EventType: ssh_login
    condition: selection
---
title: VPN Login
id: vpn-login
logsource:
    category: auth
detection:
    selection:
        EventType: vpn_login
    condition: selection
---
title: Any Remote Access
correlation:
    type: temporal
    rules:
        - ssh-login
        - vpn-login
    group-by:
        - User
    timespan: 60s
    condition: ssh-login or vpn-login
level: medium
"#;
        let collection = parse_sigma_yaml(yaml).unwrap();
        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
        engine.add_collection(&collection).unwrap();

        // A single SSH login satisfies the OR on its own.
        let ev = json!({"EventType": "ssh_login", "User": "bob"});
        let r = engine.process_event_at(&JsonEvent::borrow(&ev), 1000);
        assert_eq!(r.correlations.len(), 1);
        assert_eq!(r.correlations[0].rule_title, "Any Remote Access");
    }
2593
    /// An AND condition stays silent while only one operand has matched, then
    /// fires once the other arrives for the same group.
    #[test]
    fn test_extended_temporal_partial_and_no_fire() {
        let yaml = r#"
title: Recon Step 1
id: recon-1
logsource:
    category: process
detection:
    selection:
        CommandLine|contains: 'whoami'
    condition: selection
---
title: Recon Step 2
id: recon-2
logsource:
    category: process
detection:
    selection:
        CommandLine|contains: 'ipconfig'
    condition: selection
---
title: Full Recon
correlation:
    type: temporal
    rules:
        - recon-1
        - recon-2
    group-by:
        - Host
    timespan: 120s
    condition: recon-1 and recon-2
level: high
"#;
        let collection = parse_sigma_yaml(yaml).unwrap();
        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
        engine.add_collection(&collection).unwrap();

        // Partial match: only `recon-1`.
        let ev = json!({"CommandLine": "whoami", "Host": "srv01"});
        let r = engine.process_event_at(&JsonEvent::borrow(&ev), 1000);
        assert!(r.correlations.is_empty(), "only one of two AND rules fired");

        // `recon-2` on the same host 10s later completes the AND.
        let ev2 = json!({"CommandLine": "ipconfig /all", "Host": "srv01"});
        let r2 = engine.process_event_at(&JsonEvent::borrow(&ev2), 1010);
        assert_eq!(r2.correlations.len(), 1);
        assert_eq!(r2.correlations[0].rule_title, "Full Recon");
    }
2643
    /// A Sigma `filter` document attached to the base rule removes matching
    /// events before they reach correlation.
    #[test]
    fn test_filter_with_correlation() {
        // The filter excludes users whose name starts with `svc_` from `failed-auth`.
        let yaml = r#"
title: Failed Auth
id: failed-auth
logsource:
    category: auth
detection:
    selection:
        EventType: auth_failure
    condition: selection
---
title: Exclude Service Accounts
filter:
    rules:
        - failed-auth
    selection:
        User|startswith: 'svc_'
    condition: selection
---
title: Brute Force
correlation:
    type: event_count
    rules:
        - failed-auth
    group-by:
        - User
    timespan: 300s
    condition:
        gte: 3
level: critical
"#;
        let collection = parse_sigma_yaml(yaml).unwrap();
        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
        engine.add_collection(&collection).unwrap();

        let ts = 1000i64;

        // Five failures for a service account: above the threshold, yet filtered out.
        for i in 0..5 {
            let ev = json!({"EventType": "auth_failure", "User": "svc_backup"});
            let r = engine.process_event_at(&JsonEvent::borrow(&ev), ts + i);
            assert!(
                r.correlations.is_empty(),
                "service account should be filtered, no correlation"
            );
        }

        // A regular user: two failures are still below the threshold.
        for i in 0..2 {
            let ev = json!({"EventType": "auth_failure", "User": "alice"});
            let r = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 10 + i);
            assert!(r.correlations.is_empty(), "not yet 3 events");
        }

        // The third failure for alice reaches `gte: 3`.
        let ev = json!({"EventType": "auth_failure", "User": "alice"});
        let r = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 12);
        assert_eq!(r.correlations.len(), 1);
        assert_eq!(r.correlations[0].rule_title, "Brute Force");
    }
2710
    /// An `action: repeat` document yields a second detection rule, and a
    /// correlation over both rules counts matches from either one.
    #[test]
    fn test_repeat_rules_in_correlation() {
        // The repeated document omits `logsource`, presumably inheriting it from
        // the preceding document.
        let yaml = r#"
title: File Access A
id: file-a
logsource:
    category: file_access
detection:
    selection:
        FileName|endswith: '.docx'
    condition: selection
---
action: repeat
title: File Access B
id: file-b
detection:
    selection:
        FileName|endswith: '.xlsx'
    condition: selection
---
title: Mass File Access
correlation:
    type: event_count
    rules:
        - file-a
        - file-b
    group-by:
        - User
    timespan: 60s
    condition:
        gte: 3
level: high
"#;
        let collection = parse_sigma_yaml(yaml).unwrap();
        assert_eq!(collection.rules.len(), 2);
        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
        engine.add_collection(&collection).unwrap();
        assert_eq!(engine.detection_rule_count(), 2);

        let ts = 1000i64;
        // Mix of `.docx` (rule A) and `.xlsx` (rule B) hits for one user.
        let ev1 = json!({"FileName": "report.docx", "User": "bob"});
        engine.process_event_at(&JsonEvent::borrow(&ev1), ts);
        let ev2 = json!({"FileName": "data.xlsx", "User": "bob"});
        engine.process_event_at(&JsonEvent::borrow(&ev2), ts + 1);
        let ev3 = json!({"FileName": "notes.docx", "User": "bob"});
        let r = engine.process_event_at(&JsonEvent::borrow(&ev3), ts + 2);

        // Three matches across both rules reach `gte: 3`.
        assert_eq!(r.correlations.len(), 1);
        assert_eq!(r.correlations[0].rule_title, "Mass File Access");
    }
2767
    /// The `|expand` modifier substitutes `%User%` from the event, and the
    /// resulting detections feed an `event_count` correlation.
    #[test]
    fn test_expand_modifier_with_correlation() {
        let yaml = r#"
title: User Temp File
id: user-temp
logsource:
    category: file_access
detection:
    selection:
        FilePath|expand: 'C:\Users\%User%\Temp'
    condition: selection
---
title: Excessive Temp Access
correlation:
    type: event_count
    rules:
        - user-temp
    group-by:
        - User
    timespan: 60s
    condition:
        gte: 2
level: medium
"#;
        let collection = parse_sigma_yaml(yaml).unwrap();
        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
        engine.add_collection(&collection).unwrap();

        let ts = 1000i64;
        // alice accessing her own temp dir: the expanded pattern matches.
        let ev1 = json!({"FilePath": "C:\\Users\\alice\\Temp", "User": "alice"});
        let r1 = engine.process_event_at(&JsonEvent::borrow(&ev1), ts);
        assert!(r1.correlations.is_empty());

        // Second match for alice reaches `gte: 2`.
        let ev2 = json!({"FilePath": "C:\\Users\\alice\\Temp", "User": "alice"});
        let r2 = engine.process_event_at(&JsonEvent::borrow(&ev2), ts + 1);
        assert_eq!(r2.correlations.len(), 1);
        assert_eq!(r2.correlations[0].rule_title, "Excessive Temp Access");

        // bob touching alice's path: `%User%` expands to `bob`, so no detection.
        let ev3 = json!({"FilePath": "C:\\Users\\alice\\Temp", "User": "bob"});
        let r3 = engine.process_event_at(&JsonEvent::borrow(&ev3), ts + 2);
        assert_eq!(r3.detections.len(), 0);
    }
2817
    /// The `|hour` timestamp modifier restricts the base rule to events whose
    /// `Timestamp` field falls in hour 3; matches then feed a correlation.
    #[test]
    fn test_timestamp_modifier_with_correlation() {
        let yaml = r#"
title: Night Login
id: night-login
logsource:
    category: auth
detection:
    login:
        EventType: login
    night:
        Timestamp|hour: 3
    condition: login and night
---
title: Frequent Night Logins
correlation:
    type: event_count
    rules:
        - night-login
    group-by:
        - User
    timespan: 3600s
    condition:
        gte: 2
level: high
"#;
        let collection = parse_sigma_yaml(yaml).unwrap();
        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
        engine.add_collection(&collection).unwrap();

        // Correlation time comes from the explicit `process_event_at` argument;
        // the `Timestamp` field is only used by the `|hour` detection modifier.
        let ts = 1000i64;
        let ev1 =
            json!({"EventType": "login", "User": "alice", "Timestamp": "2024-01-15T03:10:00Z"});
        let r1 = engine.process_event_at(&JsonEvent::borrow(&ev1), ts);
        assert_eq!(r1.detections.len(), 1);
        assert!(r1.correlations.is_empty());

        // Second 03:xx login for alice reaches `gte: 2`.
        let ev2 =
            json!({"EventType": "login", "User": "alice", "Timestamp": "2024-01-15T03:45:00Z"});
        let r2 = engine.process_event_at(&JsonEvent::borrow(&ev2), ts + 1);
        assert_eq!(r2.correlations.len(), 1);
        assert_eq!(r2.correlations[0].rule_title, "Frequent Night Logins");

        // A noon login fails the `night` selection, so there is no detection.
        let ev3 = json!({"EventType": "login", "User": "bob", "Timestamp": "2024-01-15T12:00:00Z"});
        let r3 = engine.process_event_at(&JsonEvent::borrow(&ev3), ts + 2);
        assert!(
            r3.detections.is_empty(),
            "noon login should not match night rule"
        );
    }
2874
2875 #[test]
2880 fn test_event_count_range_condition() {
2881 let yaml = r#"
2882title: Login Attempt
2883id: login-attempt-001
2884name: login_attempt
2885logsource:
2886 product: windows
2887detection:
2888 selection:
2889 EventType: login
2890 condition: selection
2891level: low
2892---
2893title: Login Count Range
2894id: corr-range-001
2895correlation:
2896 type: event_count
2897 rules:
2898 - login-attempt-001
2899 group-by:
2900 - User
2901 timespan: 3600s
2902 condition:
2903 gt: 2
2904 lte: 5
2905level: high
2906"#;
2907 let collection = parse_sigma_yaml(yaml).unwrap();
2908 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2909 engine.add_collection(&collection).unwrap();
2910
2911 let ts: i64 = 1_000_000;
2912
2913 for i in 0..2 {
2915 let ev = json!({"EventType": "login", "User": "alice"});
2916 let r = engine.process_event_at(&JsonEvent::borrow(&ev), ts + i);
2917 assert!(r.correlations.is_empty(), "2 events should not fire (gt:2)");
2918 }
2919
2920 let ev3 = json!({"EventType": "login", "User": "alice"});
2922 let r3 = engine.process_event_at(&JsonEvent::borrow(&ev3), ts + 3);
2923 assert_eq!(r3.correlations.len(), 1, "3 events: gt:2 AND lte:5");
2924
2925 for i in 4..=5 {
2927 let ev = json!({"EventType": "login", "User": "alice"});
2928 let r = engine.process_event_at(&JsonEvent::borrow(&ev), ts + i);
2929 assert_eq!(r.correlations.len(), 1, "{i} events still in range");
2930 }
2931
2932 let ev6 = json!({"EventType": "login", "User": "alice"});
2934 let r6 = engine.process_event_at(&JsonEvent::borrow(&ev6), ts + 6);
2935 assert!(
2936 r6.correlations.is_empty(),
2937 "6 events exceeds lte:5, should not fire"
2938 );
2939 }
2940
2941 fn suppression_yaml() -> &'static str {
2946 r#"
2947title: Login
2948id: login-base
2949logsource:
2950 category: auth
2951detection:
2952 selection:
2953 EventType: login
2954 condition: selection
2955---
2956title: Many Logins
2957correlation:
2958 type: event_count
2959 rules:
2960 - login-base
2961 group-by:
2962 - User
2963 timeframe: 60s
2964 condition:
2965 gte: 3
2966level: high
2967"#
2968 }
2969
2970 #[test]
2971 fn test_suppression_window() {
2972 let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
2973 let config = CorrelationConfig {
2974 suppress: Some(10), ..Default::default()
2976 };
2977 let mut engine = CorrelationEngine::new(config);
2978 engine.add_collection(&collection).unwrap();
2979
2980 let ev = json!({"EventType": "login", "User": "alice"});
2981 let ts = 1000;
2982
2983 engine.process_event_at(&JsonEvent::borrow(&ev), ts);
2985 engine.process_event_at(&JsonEvent::borrow(&ev), ts + 1);
2986 let r3 = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 2);
2987 assert_eq!(r3.correlations.len(), 1, "should fire on 3rd event");
2988
2989 let r4 = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 3);
2991 assert!(
2992 r4.correlations.is_empty(),
2993 "should be suppressed within 10s window"
2994 );
2995
2996 let r5 = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 9);
2998 assert!(
2999 r5.correlations.is_empty(),
3000 "should be suppressed at ts+9 (< ts+2+10)"
3001 );
3002
3003 let r6 = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 13);
3005 assert_eq!(
3006 r6.correlations.len(),
3007 1,
3008 "should fire again after suppress window expires"
3009 );
3010 }
3011
3012 #[test]
3013 fn test_suppression_per_group_key() {
3014 let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
3015 let config = CorrelationConfig {
3016 suppress: Some(60),
3017 ..Default::default()
3018 };
3019 let mut engine = CorrelationEngine::new(config);
3020 engine.add_collection(&collection).unwrap();
3021
3022 let ts = 1000;
3023
3024 let ev_a = json!({"EventType": "login", "User": "alice"});
3026 engine.process_event_at(&JsonEvent::borrow(&ev_a), ts);
3027 engine.process_event_at(&JsonEvent::borrow(&ev_a), ts + 1);
3028 let r = engine.process_event_at(&JsonEvent::borrow(&ev_a), ts + 2);
3029 assert_eq!(r.correlations.len(), 1, "alice should fire");
3030
3031 let ev_b = json!({"EventType": "login", "User": "bob"});
3033 engine.process_event_at(&JsonEvent::borrow(&ev_b), ts + 3);
3034 engine.process_event_at(&JsonEvent::borrow(&ev_b), ts + 4);
3035 let r = engine.process_event_at(&JsonEvent::borrow(&ev_b), ts + 5);
3036 assert_eq!(r.correlations.len(), 1, "bob should fire independently");
3037
3038 let r = engine.process_event_at(&JsonEvent::borrow(&ev_a), ts + 6);
3040 assert!(r.correlations.is_empty(), "alice still suppressed");
3041 }
3042
3043 #[test]
3048 fn test_action_reset() {
3049 let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
3050 let config = CorrelationConfig {
3051 action_on_match: CorrelationAction::Reset,
3052 ..Default::default()
3053 };
3054 let mut engine = CorrelationEngine::new(config);
3055 engine.add_collection(&collection).unwrap();
3056
3057 let ev = json!({"EventType": "login", "User": "alice"});
3058 let ts = 1000;
3059
3060 engine.process_event_at(&JsonEvent::borrow(&ev), ts);
3062 engine.process_event_at(&JsonEvent::borrow(&ev), ts + 1);
3063 let r3 = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 2);
3064 assert_eq!(r3.correlations.len(), 1, "should fire on 3rd event");
3065
3066 let r4 = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 3);
3068 assert!(r4.correlations.is_empty(), "reset: need 3 more events");
3069
3070 let r5 = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 4);
3071 assert!(r5.correlations.is_empty(), "reset: still only 2");
3072
3073 let r6 = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 5);
3075 assert_eq!(
3076 r6.correlations.len(),
3077 1,
3078 "should fire again after 3 events post-reset"
3079 );
3080 }
3081
3082 #[test]
3087 fn test_emit_detections_true_by_default() {
3088 let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
3089 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3090 engine.add_collection(&collection).unwrap();
3091
3092 let ev = json!({"EventType": "login", "User": "alice"});
3093 let r = engine.process_event_at(&JsonEvent::borrow(&ev), 1000);
3094 assert_eq!(r.detections.len(), 1, "by default detections are emitted");
3095 }
3096
3097 #[test]
3098 fn test_emit_detections_false_suppresses() {
3099 let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
3100 let config = CorrelationConfig {
3101 emit_detections: false,
3102 ..Default::default()
3103 };
3104 let mut engine = CorrelationEngine::new(config);
3105 engine.add_collection(&collection).unwrap();
3106
3107 let ev = json!({"EventType": "login", "User": "alice"});
3108 let r = engine.process_event_at(&JsonEvent::borrow(&ev), 1000);
3109 assert!(
3110 r.detections.is_empty(),
3111 "detection matches should be suppressed when emit_detections=false"
3112 );
3113 }
3114
3115 #[test]
3116 fn test_generate_true_keeps_detections() {
3117 let yaml = r#"
3119title: Login
3120id: login-gen
3121logsource:
3122 category: auth
3123detection:
3124 selection:
3125 EventType: login
3126 condition: selection
3127---
3128title: Many Logins
3129correlation:
3130 type: event_count
3131 rules:
3132 - login-gen
3133 group-by:
3134 - User
3135 timeframe: 60s
3136 condition:
3137 gte: 3
3138 generate: true
3139level: high
3140"#;
3141 let collection = parse_sigma_yaml(yaml).unwrap();
3142 let config = CorrelationConfig {
3143 emit_detections: false,
3144 ..Default::default()
3145 };
3146 let mut engine = CorrelationEngine::new(config);
3147 engine.add_collection(&collection).unwrap();
3148
3149 let ev = json!({"EventType": "login", "User": "alice"});
3150 let r = engine.process_event_at(&JsonEvent::borrow(&ev), 1000);
3151 assert_eq!(
3153 r.detections.len(),
3154 1,
3155 "generate:true keeps detection output"
3156 );
3157 }
3158
3159 #[test]
3164 fn test_suppress_and_reset_combined() {
3165 let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
3166 let config = CorrelationConfig {
3167 suppress: Some(5),
3168 action_on_match: CorrelationAction::Reset,
3169 ..Default::default()
3170 };
3171 let mut engine = CorrelationEngine::new(config);
3172 engine.add_collection(&collection).unwrap();
3173
3174 let ev = json!({"EventType": "login", "User": "alice"});
3175 let ts = 1000;
3176
3177 engine.process_event_at(&JsonEvent::borrow(&ev), ts);
3179 engine.process_event_at(&JsonEvent::borrow(&ev), ts + 1);
3180 let r3 = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 2);
3181 assert_eq!(r3.correlations.len(), 1, "fires on 3rd event");
3182
3183 engine.process_event_at(&JsonEvent::borrow(&ev), ts + 3);
3186 engine.process_event_at(&JsonEvent::borrow(&ev), ts + 4);
3187 let r = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 5);
3188 assert!(
3189 r.correlations.is_empty(),
3190 "threshold met again but still suppressed"
3191 );
3192
3193 let r = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 8);
3197 assert_eq!(
3198 r.correlations.len(),
3199 1,
3200 "fires after suppress expires (accumulated events + new one)"
3201 );
3202
3203 engine.process_event_at(&JsonEvent::borrow(&ev), ts + 9);
3206 engine.process_event_at(&JsonEvent::borrow(&ev), ts + 10);
3207 let r = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 11);
3208 assert!(
3209 r.correlations.is_empty(),
3210 "threshold met but suppress window hasn't expired (ts+11 - ts+8 = 3 < 5)"
3211 );
3212 }
3213
3214 #[test]
3219 fn test_no_suppression_fires_every_event() {
3220 let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
3221 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3222 engine.add_collection(&collection).unwrap();
3223
3224 let ev = json!({"EventType": "login", "User": "alice"});
3225 let ts = 1000;
3226
3227 engine.process_event_at(&JsonEvent::borrow(&ev), ts);
3228 engine.process_event_at(&JsonEvent::borrow(&ev), ts + 1);
3229 let r3 = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 2);
3230 assert_eq!(r3.correlations.len(), 1);
3231
3232 let r4 = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 3);
3234 assert_eq!(
3235 r4.correlations.len(),
3236 1,
3237 "no suppression: fires on every event after threshold"
3238 );
3239
3240 let r5 = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 4);
3241 assert_eq!(r5.correlations.len(), 1, "still fires");
3242 }
3243
3244 fn yaml_str_attrs<const N: usize>(
3249 pairs: [(&str, &str); N],
3250 ) -> std::collections::HashMap<String, serde_yaml::Value> {
3251 pairs
3252 .into_iter()
3253 .map(|(k, v)| (k.to_string(), serde_yaml::Value::String(v.to_string())))
3254 .collect()
3255 }
3256
3257 #[test]
3258 fn test_custom_attr_timestamp_field() {
3259 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3260 let attrs = yaml_str_attrs([("rsigma.timestamp_field", "time")]);
3261 engine.apply_custom_attributes(&attrs);
3262
3263 assert_eq!(
3264 engine.config.timestamp_fields[0], "time",
3265 "rsigma.timestamp_field should be prepended"
3266 );
3267 assert!(
3269 engine
3270 .config
3271 .timestamp_fields
3272 .contains(&"@timestamp".to_string())
3273 );
3274 }
3275
3276 #[test]
3277 fn test_custom_attr_timestamp_field_no_duplicates() {
3278 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3279 let attrs = yaml_str_attrs([("rsigma.timestamp_field", "time")]);
3280 engine.apply_custom_attributes(&attrs);
3282 engine.apply_custom_attributes(&attrs);
3283
3284 let count = engine
3285 .config
3286 .timestamp_fields
3287 .iter()
3288 .filter(|f| *f == "time")
3289 .count();
3290 assert_eq!(count, 1, "should not duplicate timestamp_field entries");
3291 }
3292
3293 #[test]
3294 fn test_custom_attr_suppress() {
3295 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3296 assert!(engine.config.suppress.is_none());
3297
3298 let attrs = yaml_str_attrs([("rsigma.suppress", "5m")]);
3299 engine.apply_custom_attributes(&attrs);
3300
3301 assert_eq!(engine.config.suppress, Some(300));
3302 }
3303
3304 #[test]
3305 fn test_custom_attr_suppress_does_not_override_cli() {
3306 let config = CorrelationConfig {
3307 suppress: Some(60), ..Default::default()
3309 };
3310 let mut engine = CorrelationEngine::new(config);
3311
3312 let attrs = yaml_str_attrs([("rsigma.suppress", "5m")]);
3313 engine.apply_custom_attributes(&attrs);
3314
3315 assert_eq!(
3316 engine.config.suppress,
3317 Some(60),
3318 "CLI suppress should not be overridden by custom attribute"
3319 );
3320 }
3321
3322 #[test]
3323 fn test_custom_attr_action() {
3324 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3325 assert_eq!(engine.config.action_on_match, CorrelationAction::Alert);
3326
3327 let attrs = yaml_str_attrs([("rsigma.action", "reset")]);
3328 engine.apply_custom_attributes(&attrs);
3329
3330 assert_eq!(engine.config.action_on_match, CorrelationAction::Reset);
3331 }
3332
3333 #[test]
3334 fn test_custom_attr_action_does_not_override_cli() {
3335 let config = CorrelationConfig {
3336 action_on_match: CorrelationAction::Reset, ..Default::default()
3338 };
3339 let mut engine = CorrelationEngine::new(config);
3340
3341 let attrs = yaml_str_attrs([("rsigma.action", "alert")]);
3342 engine.apply_custom_attributes(&attrs);
3343
3344 assert_eq!(
3345 engine.config.action_on_match,
3346 CorrelationAction::Reset,
3347 "CLI action should not be overridden by custom attribute"
3348 );
3349 }
3350
3351 #[test]
3352 fn test_custom_attr_timestamp_field_used_for_extraction() {
3353 let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
3355 let mut config = CorrelationConfig::default();
3356 config.timestamp_fields.insert(0, "event_time".to_string());
3358 let mut engine = CorrelationEngine::new(config);
3359 engine.add_collection(&collection).unwrap();
3360
3361 let ev = json!({
3363 "EventType": "login",
3364 "User": "alice",
3365 "event_time": "2026-02-11T12:00:00Z"
3366 });
3367 let result = engine.process_event(&JsonEvent::borrow(&ev));
3368
3369 assert!(!result.detections.is_empty() || result.correlations.is_empty());
3371 let ts = engine
3375 .extract_event_timestamp(&JsonEvent::borrow(&ev))
3376 .expect("should extract timestamp");
3377 assert!(
3378 ts > 1_700_000_000 && ts < 1_800_000_000,
3379 "timestamp should be ~2026 epoch, got {ts}"
3380 );
3381 }
3382
3383 #[test]
3388 fn test_correlation_cycle_direct() {
3389 let yaml = r#"
3391title: detection rule
3392id: det-rule
3393logsource:
3394 product: test
3395detection:
3396 selection:
3397 action: click
3398 condition: selection
3399level: low
3400---
3401title: correlation A
3402id: corr-a
3403correlation:
3404 type: event_count
3405 rules:
3406 - corr-b
3407 group-by:
3408 - User
3409 timespan: 5m
3410 condition:
3411 gte: 2
3412level: high
3413---
3414title: correlation B
3415id: corr-b
3416correlation:
3417 type: event_count
3418 rules:
3419 - corr-a
3420 group-by:
3421 - User
3422 timespan: 5m
3423 condition:
3424 gte: 2
3425level: high
3426"#;
3427 let collection = parse_sigma_yaml(yaml).unwrap();
3428 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3429 let result = engine.add_collection(&collection);
3430 assert!(result.is_err(), "should detect direct cycle");
3431 let err = result.unwrap_err().to_string();
3432 assert!(err.contains("cycle"), "error should mention cycle: {err}");
3433 assert!(
3434 err.contains("corr-a") && err.contains("corr-b"),
3435 "error should name both correlations: {err}"
3436 );
3437 }
3438
3439 #[test]
3440 fn test_correlation_cycle_self() {
3441 let yaml = r#"
3443title: detection rule
3444id: det-rule
3445logsource:
3446 product: test
3447detection:
3448 selection:
3449 action: click
3450 condition: selection
3451level: low
3452---
3453title: self-ref correlation
3454id: self-corr
3455correlation:
3456 type: event_count
3457 rules:
3458 - self-corr
3459 group-by:
3460 - User
3461 timespan: 5m
3462 condition:
3463 gte: 2
3464level: high
3465"#;
3466 let collection = parse_sigma_yaml(yaml).unwrap();
3467 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3468 let result = engine.add_collection(&collection);
3469 assert!(result.is_err(), "should detect self-referencing cycle");
3470 let err = result.unwrap_err().to_string();
3471 assert!(err.contains("cycle"), "error should mention cycle: {err}");
3472 assert!(
3473 err.contains("self-corr"),
3474 "error should name the correlation: {err}"
3475 );
3476 }
3477
3478 #[test]
3479 fn test_correlation_no_cycle_valid_chain() {
3480 let yaml = r#"
3482title: detection rule
3483id: det-rule
3484logsource:
3485 product: test
3486detection:
3487 selection:
3488 action: click
3489 condition: selection
3490level: low
3491---
3492title: correlation A
3493id: corr-a
3494correlation:
3495 type: event_count
3496 rules:
3497 - det-rule
3498 group-by:
3499 - User
3500 timespan: 5m
3501 condition:
3502 gte: 2
3503level: high
3504---
3505title: correlation B
3506id: corr-b
3507correlation:
3508 type: event_count
3509 rules:
3510 - corr-a
3511 group-by:
3512 - User
3513 timespan: 5m
3514 condition:
3515 gte: 2
3516level: high
3517"#;
3518 let collection = parse_sigma_yaml(yaml).unwrap();
3519 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3520 let result = engine.add_collection(&collection);
3521 assert!(
3522 result.is_ok(),
3523 "valid chain should not be rejected: {result:?}"
3524 );
3525 }
3526
3527 #[test]
3528 fn test_correlation_cycle_transitive() {
3529 let yaml = r#"
3531title: detection rule
3532id: det-rule
3533logsource:
3534 product: test
3535detection:
3536 selection:
3537 action: click
3538 condition: selection
3539level: low
3540---
3541title: correlation A
3542id: corr-a
3543correlation:
3544 type: event_count
3545 rules:
3546 - corr-c
3547 group-by:
3548 - User
3549 timespan: 5m
3550 condition:
3551 gte: 2
3552level: high
3553---
3554title: correlation B
3555id: corr-b
3556correlation:
3557 type: event_count
3558 rules:
3559 - corr-a
3560 group-by:
3561 - User
3562 timespan: 5m
3563 condition:
3564 gte: 2
3565level: high
3566---
3567title: correlation C
3568id: corr-c
3569correlation:
3570 type: event_count
3571 rules:
3572 - corr-b
3573 group-by:
3574 - User
3575 timespan: 5m
3576 condition:
3577 gte: 2
3578level: high
3579"#;
3580 let collection = parse_sigma_yaml(yaml).unwrap();
3581 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3582 let result = engine.add_collection(&collection);
3583 assert!(result.is_err(), "should detect transitive cycle");
3584 let err = result.unwrap_err().to_string();
3585 assert!(err.contains("cycle"), "error should mention cycle: {err}");
3586 }
3587
3588 #[test]
3593 fn test_correlation_events_disabled_by_default() {
3594 let yaml = r#"
3595title: Login
3596id: login-rule
3597logsource:
3598 category: auth
3599detection:
3600 selection:
3601 EventType: login
3602 condition: selection
3603---
3604title: Many Logins
3605correlation:
3606 type: event_count
3607 rules:
3608 - login-rule
3609 group-by:
3610 - User
3611 timespan: 60s
3612 condition:
3613 gte: 3
3614level: high
3615"#;
3616 let collection = parse_sigma_yaml(yaml).unwrap();
3617 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3618 engine.add_collection(&collection).unwrap();
3619
3620 for i in 0..3 {
3621 let v = json!({"EventType": "login", "User": "admin", "@timestamp": 1000 + i});
3622 let event = JsonEvent::borrow(&v);
3623 let result = engine.process_event_at(&event, 1000 + i);
3624 if i == 2 {
3625 assert_eq!(result.correlations.len(), 1);
3626 assert!(result.correlations[0].events.is_none());
3628 }
3629 }
3630 }
3631
3632 #[test]
3633 fn test_correlation_events_included_when_enabled() {
3634 let yaml = r#"
3635title: Login
3636id: login-rule
3637logsource:
3638 category: auth
3639detection:
3640 selection:
3641 EventType: login
3642 condition: selection
3643---
3644title: Many Logins
3645correlation:
3646 type: event_count
3647 rules:
3648 - login-rule
3649 group-by:
3650 - User
3651 timespan: 60s
3652 condition:
3653 gte: 3
3654level: high
3655"#;
3656 let collection = parse_sigma_yaml(yaml).unwrap();
3657 let config = CorrelationConfig {
3658 correlation_event_mode: CorrelationEventMode::Full,
3659 max_correlation_events: 10,
3660 ..Default::default()
3661 };
3662 let mut engine = CorrelationEngine::new(config);
3663 engine.add_collection(&collection).unwrap();
3664
3665 let events_sent: Vec<serde_json::Value> = (0..3)
3666 .map(|i| json!({"EventType": "login", "User": "admin", "@timestamp": 1000 + i}))
3667 .collect();
3668
3669 let mut corr_result = None;
3670 for (i, ev) in events_sent.iter().enumerate() {
3671 let event = JsonEvent::borrow(ev);
3672 let result = engine.process_event_at(&event, 1000 + i as i64);
3673 if !result.correlations.is_empty() {
3674 corr_result = Some(result);
3675 }
3676 }
3677
3678 let result = corr_result.expect("correlation should have fired");
3679 let corr = &result.correlations[0];
3680
3681 let events = corr.events.as_ref().expect("events should be present");
3683 assert_eq!(
3684 events.len(),
3685 3,
3686 "all 3 contributing events should be stored"
3687 );
3688
3689 for (i, event) in events.iter().enumerate() {
3691 assert_eq!(event["EventType"], "login");
3692 assert_eq!(event["User"], "admin");
3693 assert_eq!(event["@timestamp"], 1000 + i as i64);
3694 }
3695 }
3696
3697 #[test]
3698 fn test_correlation_events_max_cap() {
3699 let yaml = r#"
3700title: Login
3701id: login-rule
3702logsource:
3703 category: auth
3704detection:
3705 selection:
3706 EventType: login
3707 condition: selection
3708---
3709title: Many Logins
3710correlation:
3711 type: event_count
3712 rules:
3713 - login-rule
3714 group-by:
3715 - User
3716 timespan: 60s
3717 condition:
3718 gte: 5
3719level: high
3720"#;
3721 let collection = parse_sigma_yaml(yaml).unwrap();
3722 let config = CorrelationConfig {
3723 correlation_event_mode: CorrelationEventMode::Full,
3724 max_correlation_events: 3, ..Default::default()
3726 };
3727 let mut engine = CorrelationEngine::new(config);
3728 engine.add_collection(&collection).unwrap();
3729
3730 let mut corr_result = None;
3731 for i in 0..5 {
3732 let v = json!({"EventType": "login", "User": "admin", "idx": i});
3733 let event = JsonEvent::borrow(&v);
3734 let result = engine.process_event_at(&event, 1000 + i);
3735 if !result.correlations.is_empty() {
3736 corr_result = Some(result);
3737 }
3738 }
3739
3740 let result = corr_result.expect("correlation should have fired");
3741 let events = result.correlations[0]
3742 .events
3743 .as_ref()
3744 .expect("events should be present");
3745
3746 assert_eq!(events.len(), 3);
3748 assert_eq!(events[0]["idx"], 2);
3749 assert_eq!(events[1]["idx"], 3);
3750 assert_eq!(events[2]["idx"], 4);
3751 }
3752
3753 #[test]
3754 fn test_correlation_events_with_reset_action() {
3755 let yaml = r#"
3756title: Login
3757id: login-rule
3758logsource:
3759 category: auth
3760detection:
3761 selection:
3762 EventType: login
3763 condition: selection
3764---
3765title: Many Logins
3766correlation:
3767 type: event_count
3768 rules:
3769 - login-rule
3770 group-by:
3771 - User
3772 timespan: 60s
3773 condition:
3774 gte: 2
3775level: high
3776"#;
3777 let collection = parse_sigma_yaml(yaml).unwrap();
3778 let config = CorrelationConfig {
3779 correlation_event_mode: CorrelationEventMode::Full,
3780 action_on_match: CorrelationAction::Reset,
3781 ..Default::default()
3782 };
3783 let mut engine = CorrelationEngine::new(config);
3784 engine.add_collection(&collection).unwrap();
3785
3786 for i in 0..2 {
3788 let v = json!({"EventType": "login", "User": "admin", "round": 1, "idx": i});
3789 let event = JsonEvent::borrow(&v);
3790 let result = engine.process_event_at(&event, 1000 + i);
3791 if i == 1 {
3792 assert_eq!(result.correlations.len(), 1);
3793 let events = result.correlations[0].events.as_ref().unwrap();
3794 assert_eq!(events.len(), 2);
3795 }
3796 }
3797
3798 let v = json!({"EventType": "login", "User": "admin", "round": 2, "idx": 0});
3801 let event = JsonEvent::borrow(&v);
3802 let result = engine.process_event_at(&event, 1010);
3803 assert!(
3804 result.correlations.is_empty(),
3805 "should not fire with only 1 event after reset"
3806 );
3807
3808 let v = json!({"EventType": "login", "User": "admin", "round": 2, "idx": 1});
3809 let event = JsonEvent::borrow(&v);
3810 let result = engine.process_event_at(&event, 1011);
3811 assert_eq!(result.correlations.len(), 1);
3812 let events = result.correlations[0].events.as_ref().unwrap();
3813 assert_eq!(events.len(), 2);
3814 assert_eq!(events[0]["round"], 2);
3816 assert_eq!(events[1]["round"], 2);
3817 }
3818
3819 #[test]
3820 fn test_correlation_events_with_set_include() {
3821 let yaml = r#"
3822title: Login
3823id: login-rule
3824logsource:
3825 category: auth
3826detection:
3827 selection:
3828 EventType: login
3829 condition: selection
3830---
3831title: Many Logins
3832correlation:
3833 type: event_count
3834 rules:
3835 - login-rule
3836 group-by:
3837 - User
3838 timespan: 60s
3839 condition:
3840 gte: 2
3841level: high
3842"#;
3843 let collection = parse_sigma_yaml(yaml).unwrap();
3844 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3845 engine.add_collection(&collection).unwrap();
3846
3847 engine.set_correlation_event_mode(CorrelationEventMode::Full);
3849
3850 for i in 0..2 {
3851 let v = json!({"EventType": "login", "User": "admin"});
3852 let event = JsonEvent::borrow(&v);
3853 let result = engine.process_event_at(&event, 1000 + i);
3854 if i == 1 {
3855 assert_eq!(result.correlations.len(), 1);
3856 assert!(result.correlations[0].events.is_some());
3857 assert_eq!(result.correlations[0].events.as_ref().unwrap().len(), 2);
3858 }
3859 }
3860 }
3861
3862 #[test]
3863 fn test_correlation_events_eviction_syncs_with_window() {
3864 let yaml = r#"
3865title: Login
3866id: login-rule
3867logsource:
3868 category: auth
3869detection:
3870 selection:
3871 EventType: login
3872 condition: selection
3873---
3874title: Many Logins
3875correlation:
3876 type: event_count
3877 rules:
3878 - login-rule
3879 group-by:
3880 - User
3881 timespan: 10s
3882 condition:
3883 gte: 3
3884level: high
3885"#;
3886 let collection = parse_sigma_yaml(yaml).unwrap();
3887 let config = CorrelationConfig {
3888 correlation_event_mode: CorrelationEventMode::Full,
3889 max_correlation_events: 100,
3890 ..Default::default()
3891 };
3892 let mut engine = CorrelationEngine::new(config);
3893 engine.add_collection(&collection).unwrap();
3894
3895 for i in 0..2 {
3897 let v = json!({"EventType": "login", "User": "admin", "idx": i});
3898 let event = JsonEvent::borrow(&v);
3899 engine.process_event_at(&event, 1000 + i);
3900 }
3901
3902 let v = json!({"EventType": "login", "User": "admin", "idx": 2});
3905 let event = JsonEvent::borrow(&v);
3906 let result = engine.process_event_at(&event, 1015);
3907 assert!(
3909 result.correlations.is_empty(),
3910 "should not fire — old events evicted"
3911 );
3912
3913 for i in 3..5 {
3915 let v = json!({"EventType": "login", "User": "admin", "idx": i});
3916 let event = JsonEvent::borrow(&v);
3917 let result = engine.process_event_at(&event, 1016 + i - 3);
3918 if i == 4 {
3919 assert_eq!(result.correlations.len(), 1);
3920 let events = result.correlations[0].events.as_ref().unwrap();
3921 assert_eq!(events.len(), 3);
3923 for ev in events {
3924 assert!(ev["idx"].as_i64().unwrap() >= 2);
3925 }
3926 }
3927 }
3928 }
3929
3930 #[test]
3931 fn test_event_buffer_monitoring() {
3932 let yaml = r#"
3933title: Login
3934id: login-rule
3935logsource:
3936 category: auth
3937detection:
3938 selection:
3939 EventType: login
3940 condition: selection
3941---
3942title: Many Logins
3943correlation:
3944 type: event_count
3945 rules:
3946 - login-rule
3947 group-by:
3948 - User
3949 timespan: 60s
3950 condition:
3951 gte: 100
3952level: high
3953"#;
3954 let collection = parse_sigma_yaml(yaml).unwrap();
3955 let config = CorrelationConfig {
3956 correlation_event_mode: CorrelationEventMode::Full,
3957 ..Default::default()
3958 };
3959 let mut engine = CorrelationEngine::new(config);
3960 engine.add_collection(&collection).unwrap();
3961
3962 assert_eq!(engine.event_buffer_count(), 0);
3963 assert_eq!(engine.event_buffer_bytes(), 0);
3964
3965 for i in 0..5 {
3967 let v = json!({"EventType": "login", "User": "admin"});
3968 let event = JsonEvent::borrow(&v);
3969 engine.process_event_at(&event, 1000 + i);
3970 }
3971
3972 assert_eq!(engine.event_buffer_count(), 1); assert!(engine.event_buffer_bytes() > 0);
3974 }
3975
3976 #[test]
3977 fn test_correlation_refs_mode_basic() {
3978 let yaml = r#"
3979title: Login
3980id: login-rule
3981logsource:
3982 category: auth
3983detection:
3984 selection:
3985 EventType: login
3986 condition: selection
3987---
3988title: Many Logins
3989correlation:
3990 type: event_count
3991 rules:
3992 - login-rule
3993 group-by:
3994 - User
3995 timespan: 60s
3996 condition:
3997 gte: 3
3998level: high
3999"#;
4000 let collection = parse_sigma_yaml(yaml).unwrap();
4001 let config = CorrelationConfig {
4002 correlation_event_mode: CorrelationEventMode::Refs,
4003 max_correlation_events: 10,
4004 ..Default::default()
4005 };
4006 let mut engine = CorrelationEngine::new(config);
4007 engine.add_collection(&collection).unwrap();
4008
4009 let mut corr_result = None;
4010 for i in 0..3 {
4011 let v = json!({"EventType": "login", "User": "admin", "id": format!("evt-{i}"), "@timestamp": 1000 + i});
4012 let event = JsonEvent::borrow(&v);
4013 let result = engine.process_event_at(&event, 1000 + i);
4014 if !result.correlations.is_empty() {
4015 corr_result = Some(result.correlations[0].clone());
4016 }
4017 }
4018
4019 let result = corr_result.expect("correlation should have fired");
4020 assert!(
4022 result.events.is_none(),
4023 "Full events should not be included in refs mode"
4024 );
4025 let refs = result
4026 .event_refs
4027 .expect("event_refs should be present in refs mode");
4028 assert_eq!(refs.len(), 3);
4029 assert_eq!(refs[0].timestamp, 1000);
4030 assert_eq!(refs[0].id, Some("evt-0".to_string()));
4031 assert_eq!(refs[1].id, Some("evt-1".to_string()));
4032 assert_eq!(refs[2].id, Some("evt-2".to_string()));
4033 }
4034
4035 #[test]
4036 fn test_correlation_refs_mode_no_id_field() {
4037 let yaml = r#"
4038title: Login
4039id: login-rule
4040logsource:
4041 category: auth
4042detection:
4043 selection:
4044 EventType: login
4045 condition: selection
4046---
4047title: Many Logins
4048correlation:
4049 type: event_count
4050 rules:
4051 - login-rule
4052 group-by:
4053 - User
4054 timespan: 60s
4055 condition:
4056 gte: 2
4057level: high
4058"#;
4059 let collection = parse_sigma_yaml(yaml).unwrap();
4060 let config = CorrelationConfig {
4061 correlation_event_mode: CorrelationEventMode::Refs,
4062 ..Default::default()
4063 };
4064 let mut engine = CorrelationEngine::new(config);
4065 engine.add_collection(&collection).unwrap();
4066
4067 let mut corr_result = None;
4068 for i in 0..2 {
4069 let v = json!({"EventType": "login", "User": "admin"});
4070 let event = JsonEvent::borrow(&v);
4071 let result = engine.process_event_at(&event, 1000 + i);
4072 if !result.correlations.is_empty() {
4073 corr_result = Some(result.correlations[0].clone());
4074 }
4075 }
4076
4077 let result = corr_result.expect("correlation should have fired");
4078 let refs = result.event_refs.expect("event_refs should be present");
4079 for r in &refs {
4081 assert_eq!(r.id, None);
4082 }
4083 }
4084
4085 #[test]
4086 fn test_per_correlation_custom_attributes_from_yaml() {
4087 let yaml = r#"
4088title: Login
4089id: login-rule
4090logsource:
4091 category: auth
4092detection:
4093 selection:
4094 EventType: login
4095 condition: selection
4096---
4097title: Many Logins
4098custom_attributes:
4099 rsigma.correlation_event_mode: refs
4100 rsigma.max_correlation_events: "5"
4101correlation:
4102 type: event_count
4103 rules:
4104 - login-rule
4105 group-by:
4106 - User
4107 timespan: 60s
4108 condition:
4109 gte: 3
4110level: high
4111"#;
4112 let collection = parse_sigma_yaml(yaml).unwrap();
4113 let config = CorrelationConfig::default();
4115 let mut engine = CorrelationEngine::new(config);
4116 engine.add_collection(&collection).unwrap();
4117
4118 let mut corr_result = None;
4119 for i in 0..3 {
4120 let v = json!({"EventType": "login", "User": "admin", "id": format!("e{i}")});
4121 let event = JsonEvent::borrow(&v);
4122 let result = engine.process_event_at(&event, 1000 + i);
4123 if !result.correlations.is_empty() {
4124 corr_result = Some(result.correlations[0].clone());
4125 }
4126 }
4127
4128 let result = corr_result.expect("correlation should fire with per-correlation refs mode");
4129 assert!(result.events.is_none());
4131 let refs = result
4132 .event_refs
4133 .expect("event_refs via per-correlation override");
4134 assert_eq!(refs.len(), 3);
4135 assert_eq!(refs[0].id, Some("e0".to_string()));
4136 }
4137
4138 #[test]
4139 fn test_per_correlation_custom_attr_suppress_and_action() {
4140 let yaml = r#"
4141title: Login
4142id: login-rule
4143logsource:
4144 category: auth
4145detection:
4146 selection:
4147 EventType: login
4148 condition: selection
4149---
4150title: Many Logins
4151custom_attributes:
4152 rsigma.suppress: 10s
4153 rsigma.action: reset
4154correlation:
4155 type: event_count
4156 rules:
4157 - login-rule
4158 group-by:
4159 - User
4160 timespan: 60s
4161 condition:
4162 gte: 2
4163level: high
4164"#;
4165 let collection = parse_sigma_yaml(yaml).unwrap();
4166 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
4167 engine.add_collection(&collection).unwrap();
4168
4169 assert_eq!(engine.correlations[0].suppress_secs, Some(10));
4171 assert_eq!(
4172 engine.correlations[0].action,
4173 Some(CorrelationAction::Reset)
4174 );
4175 }
4176
4177 #[test]
4178 fn test_process_with_detections_matches_process_event_at() {
4179 let yaml = r#"
4180title: Login Failure
4181id: login-fail
4182logsource:
4183 category: auth
4184detection:
4185 selection:
4186 EventType: login_failure
4187 condition: selection
4188---
4189title: Brute Force
4190correlation:
4191 type: event_count
4192 rules:
4193 - login-fail
4194 group-by:
4195 - User
4196 timespan: 60s
4197 condition:
4198 gte: 3
4199level: high
4200"#;
4201 let collection = parse_sigma_yaml(yaml).unwrap();
4202
4203 let mut engine1 = CorrelationEngine::new(CorrelationConfig::default());
4205 engine1.add_collection(&collection).unwrap();
4206
4207 let events: Vec<serde_json::Value> = (0..5)
4208 .map(|i| json!({"EventType": "login_failure", "User": "admin", "@timestamp": format!("2025-01-01T00:00:0{}Z", i + 1)}))
4209 .collect();
4210
4211 let results1: Vec<ProcessResult> = events
4212 .iter()
4213 .enumerate()
4214 .map(|(i, v)| {
4215 let e = JsonEvent::borrow(v);
4216 engine1.process_event_at(&e, 1000 + i as i64)
4217 })
4218 .collect();
4219
4220 let mut engine2 = CorrelationEngine::new(CorrelationConfig::default());
4222 engine2.add_collection(&collection).unwrap();
4223
4224 let results2: Vec<ProcessResult> = events
4225 .iter()
4226 .enumerate()
4227 .map(|(i, v)| {
4228 let e = JsonEvent::borrow(v);
4229 let detections = engine2.evaluate(&e);
4230 engine2.process_with_detections(&e, detections, 1000 + i as i64)
4231 })
4232 .collect();
4233
4234 assert_eq!(results1.len(), results2.len());
4236 for (r1, r2) in results1.iter().zip(results2.iter()) {
4237 assert_eq!(r1.detections.len(), r2.detections.len());
4238 assert_eq!(r1.correlations.len(), r2.correlations.len());
4239 }
4240 }
4241
4242 #[test]
4243 fn test_process_batch_matches_sequential() {
4244 let yaml = r#"
4245title: Login Failure
4246id: login-fail-batch
4247logsource:
4248 category: auth
4249detection:
4250 selection:
4251 EventType: login_failure
4252 condition: selection
4253---
4254title: Brute Force Batch
4255correlation:
4256 type: event_count
4257 rules:
4258 - login-fail-batch
4259 group-by:
4260 - User
4261 timespan: 60s
4262 condition:
4263 gte: 3
4264level: high
4265"#;
4266 let collection = parse_sigma_yaml(yaml).unwrap();
4267
4268 let event_values: Vec<serde_json::Value> = (0..5)
4269 .map(|i| json!({"EventType": "login_failure", "User": "admin", "@timestamp": format!("2025-01-01T00:00:0{}Z", i + 1)}))
4270 .collect();
4271
4272 let mut engine1 = CorrelationEngine::new(CorrelationConfig::default());
4274 engine1.add_collection(&collection).unwrap();
4275 let sequential: Vec<ProcessResult> = event_values
4276 .iter()
4277 .enumerate()
4278 .map(|(i, v)| {
4279 let e = JsonEvent::borrow(v);
4280 engine1.process_event_at(&e, 1000 + i as i64)
4281 })
4282 .collect();
4283
4284 let mut engine2 = CorrelationEngine::new(CorrelationConfig::default());
4286 engine2.add_collection(&collection).unwrap();
4287 let events: Vec<JsonEvent> = event_values.iter().map(JsonEvent::borrow).collect();
4288 let refs: Vec<&JsonEvent> = events.iter().collect();
4289 let batch = engine2.process_batch(&refs);
4290
4291 assert_eq!(sequential.len(), batch.len());
4292 for (seq, bat) in sequential.iter().zip(batch.iter()) {
4293 assert_eq!(seq.detections.len(), bat.detections.len());
4294 assert_eq!(seq.correlations.len(), bat.correlations.len());
4295 }
4296 }
4297
4298 #[test]
4299 fn test_correlation_result_custom_attributes() {
4300 let yaml = r#"
4301title: Login
4302id: login-cra
4303logsource:
4304 category: auth
4305detection:
4306 selection:
4307 EventType: login
4308 condition: selection
4309level: low
4310---
4311title: Many Logins
4312my_custom_field: hello
4313priority: 9
4314nested:
4315 key: value
4316correlation:
4317 type: event_count
4318 rules:
4319 - login-cra
4320 group-by:
4321 - User
4322 timespan: 60s
4323 condition:
4324 gte: 2
4325level: high
4326"#;
4327 let collection = parse_sigma_yaml(yaml).unwrap();
4328 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
4329 engine.add_collection(&collection).unwrap();
4330
4331 let base_ts = 1000i64;
4332 for i in 0..2 {
4333 let v = json!({"EventType": "login", "User": "alice"});
4334 let event = JsonEvent::borrow(&v);
4335 let result = engine.process_event_at(&event, base_ts + i * 10);
4336
4337 if i == 1 {
4338 assert_eq!(result.correlations.len(), 1);
4339 let corr = &result.correlations[0];
4340 assert_eq!(corr.rule_title, "Many Logins");
4341 assert_eq!(
4342 corr.custom_attributes.get("my_custom_field"),
4343 Some(&serde_json::Value::String("hello".to_string()))
4344 );
4345 assert_eq!(
4346 corr.custom_attributes.get("priority"),
4347 Some(&serde_json::json!(9))
4348 );
4349 let nested = corr.custom_attributes.get("nested").unwrap();
4350 assert_eq!(nested.get("key"), Some(&serde_json::json!("value")));
4351
4352 assert!(!corr.custom_attributes.contains_key("title"));
4353 assert!(!corr.custom_attributes.contains_key("correlation"));
4354 assert!(!corr.custom_attributes.contains_key("level"));
4355 }
4356 }
4357 }
4358
4359 #[test]
4360 fn test_detection_result_custom_attributes() {
4361 let yaml = r#"
4362title: Login Detection
4363logsource:
4364 category: auth
4365detection:
4366 selection:
4367 EventType: login
4368 condition: selection
4369level: low
4370my_detection_tag: important
4371score: 42
4372"#;
4373 let collection = parse_sigma_yaml(yaml).unwrap();
4374 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
4375 engine.add_collection(&collection).unwrap();
4376
4377 let v = json!({"EventType": "login"});
4378 let event = JsonEvent::borrow(&v);
4379 let result = engine.process_event(&event);
4380
4381 assert_eq!(result.detections.len(), 1);
4382 let det = &result.detections[0];
4383 assert_eq!(
4384 det.custom_attributes.get("my_detection_tag"),
4385 Some(&serde_json::Value::String("important".to_string()))
4386 );
4387 assert_eq!(
4388 det.custom_attributes.get("score"),
4389 Some(&serde_json::json!(42))
4390 );
4391 assert!(!det.custom_attributes.contains_key("title"));
4392 }
4393}