1use std::collections::HashMap;
16use std::sync::Arc;
17
18use chrono::{DateTime, TimeZone, Utc};
19use serde::Serialize;
20
21use rsigma_parser::{CorrelationRule, CorrelationType, Level, SigmaCollection, SigmaRule};
22
23use crate::correlation::{
24 CompiledCorrelation, EventBuffer, EventRef, EventRefBuffer, GroupKey, WindowState,
25 compile_correlation,
26};
27use crate::engine::Engine;
28use crate::error::{EvalError, Result};
29use crate::event::{Event, EventValue};
30use crate::pipeline::{Pipeline, apply_pipelines, apply_pipelines_to_correlation};
31use crate::result::MatchResult;
32
33#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize)]
42#[serde(rename_all = "snake_case")]
43pub enum CorrelationAction {
44 #[default]
47 Alert,
48 Reset,
51}
52
53impl std::str::FromStr for CorrelationAction {
54 type Err = String;
55 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
56 match s {
57 "alert" => Ok(CorrelationAction::Alert),
58 "reset" => Ok(CorrelationAction::Reset),
59 _ => Err(format!(
60 "Unknown correlation action: {s} (expected 'alert' or 'reset')"
61 )),
62 }
63 }
64}
65
66#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize)]
71#[serde(rename_all = "snake_case")]
72pub enum CorrelationEventMode {
73 #[default]
75 None,
76 Full,
79 Refs,
82}
83
84impl std::str::FromStr for CorrelationEventMode {
85 type Err = String;
86 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
87 match s.to_lowercase().as_str() {
88 "none" | "off" | "false" => Ok(CorrelationEventMode::None),
89 "full" | "true" => Ok(CorrelationEventMode::Full),
90 "refs" | "references" => Ok(CorrelationEventMode::Refs),
91 _ => Err(format!(
92 "Unknown correlation event mode: {s} (expected 'none', 'full', or 'refs')"
93 )),
94 }
95 }
96}
97
98impl std::fmt::Display for CorrelationEventMode {
99 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100 match self {
101 CorrelationEventMode::None => write!(f, "none"),
102 CorrelationEventMode::Full => write!(f, "full"),
103 CorrelationEventMode::Refs => write!(f, "refs"),
104 }
105 }
106}
107
/// Policy for events in which none of the configured timestamp fields parse.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum TimestampFallback {
    /// Use the current wall-clock time (`Utc::now()`) as the event time (the default).
    #[default]
    WallClock,
    /// Skip correlation for the event; its (filtered) detections are still returned.
    Skip,
}
119
120#[derive(Debug, Clone)]
126pub struct CorrelationConfig {
127 pub timestamp_fields: Vec<String>,
132
133 pub timestamp_fallback: TimestampFallback,
137
138 pub max_state_entries: usize,
143
144 pub suppress: Option<u64>,
152
153 pub action_on_match: CorrelationAction,
157
158 pub emit_detections: bool,
164
165 pub correlation_event_mode: CorrelationEventMode,
173
174 pub max_correlation_events: usize,
180}
181
182impl Default for CorrelationConfig {
183 fn default() -> Self {
184 CorrelationConfig {
185 timestamp_fields: vec![
186 "@timestamp".to_string(),
187 "timestamp".to_string(),
188 "EventTime".to_string(),
189 "TimeCreated".to_string(),
190 "eventTime".to_string(),
191 ],
192 timestamp_fallback: TimestampFallback::default(),
193 max_state_entries: 100_000,
194 suppress: None,
195 action_on_match: CorrelationAction::default(),
196 emit_detections: true,
197 correlation_event_mode: CorrelationEventMode::default(),
198 max_correlation_events: 10,
199 }
200 }
201}
202
203#[derive(Debug, Clone, Serialize)]
209pub struct ProcessResult {
210 pub detections: Vec<MatchResult>,
212 pub correlations: Vec<CorrelationResult>,
214}
215
216#[derive(Debug, Clone, Serialize)]
218pub struct CorrelationResult {
219 pub rule_title: String,
221 pub rule_id: Option<String>,
223 pub level: Option<Level>,
225 pub tags: Vec<String>,
227 pub correlation_type: CorrelationType,
229 pub group_key: Vec<(String, String)>,
231 pub aggregated_value: f64,
233 pub timespan_secs: u64,
235 #[serde(skip_serializing_if = "Option::is_none")]
240 pub events: Option<Vec<serde_json::Value>>,
241 #[serde(skip_serializing_if = "Option::is_none")]
245 pub event_refs: Option<Vec<EventRef>>,
246 #[serde(skip_serializing_if = "HashMap::is_empty")]
250 pub custom_attributes: Arc<HashMap<String, serde_json::Value>>,
251}
252
253pub struct CorrelationEngine {
262 engine: Engine,
264 correlations: Vec<CompiledCorrelation>,
266 rule_index: HashMap<String, Vec<usize>>,
269 rule_ids: Vec<(Option<String>, Option<String>)>,
272 state: HashMap<(usize, GroupKey), WindowState>,
274 last_alert: HashMap<(usize, GroupKey), i64>,
276 event_buffers: HashMap<(usize, GroupKey), EventBuffer>,
278 event_ref_buffers: HashMap<(usize, GroupKey), EventRefBuffer>,
280 correlation_only_rules: std::collections::HashSet<String>,
284 config: CorrelationConfig,
286 pipelines: Vec<Pipeline>,
288}
289
290impl CorrelationEngine {
291 pub fn new(config: CorrelationConfig) -> Self {
293 CorrelationEngine {
294 engine: Engine::new(),
295 correlations: Vec::new(),
296 rule_index: HashMap::new(),
297 rule_ids: Vec::new(),
298 state: HashMap::new(),
299 last_alert: HashMap::new(),
300 event_buffers: HashMap::new(),
301 event_ref_buffers: HashMap::new(),
302 correlation_only_rules: std::collections::HashSet::new(),
303 config,
304 pipelines: Vec::new(),
305 }
306 }
307
308 pub fn add_pipeline(&mut self, pipeline: Pipeline) {
312 self.pipelines.push(pipeline);
313 self.pipelines.sort_by_key(|p| p.priority);
314 }
315
316 pub fn set_include_event(&mut self, include: bool) {
318 self.engine.set_include_event(include);
319 }
320
321 pub fn set_correlation_event_mode(&mut self, mode: CorrelationEventMode) {
327 self.config.correlation_event_mode = mode;
328 }
329
330 pub fn set_max_correlation_events(&mut self, max: usize) {
333 self.config.max_correlation_events = max;
334 }
335
336 pub fn add_rule(&mut self, rule: &SigmaRule) -> Result<()> {
342 if self.pipelines.is_empty() {
343 self.apply_custom_attributes(&rule.custom_attributes);
344 self.rule_ids.push((rule.id.clone(), rule.name.clone()));
345 self.engine.add_rule(rule)?;
346 } else {
347 let mut transformed = rule.clone();
348 apply_pipelines(&self.pipelines, &mut transformed)?;
349 self.apply_custom_attributes(&transformed.custom_attributes);
350 self.rule_ids
351 .push((transformed.id.clone(), transformed.name.clone()));
352 let compiled = crate::compiler::compile_rule(&transformed)?;
354 self.engine.add_compiled_rule(compiled);
355 }
356 Ok(())
357 }
358
359 fn apply_custom_attributes(
373 &mut self,
374 attrs: &std::collections::HashMap<String, serde_yaml::Value>,
375 ) {
376 if let Some(field) = attrs.get("rsigma.timestamp_field").and_then(|v| v.as_str())
378 && !self.config.timestamp_fields.iter().any(|f| f == field)
379 {
380 self.config.timestamp_fields.insert(0, field.to_string());
381 }
382
383 if let Some(val) = attrs.get("rsigma.suppress").and_then(|v| v.as_str())
385 && self.config.suppress.is_none()
386 && let Ok(ts) = rsigma_parser::Timespan::parse(val)
387 {
388 self.config.suppress = Some(ts.seconds);
389 }
390
391 if let Some(val) = attrs.get("rsigma.action").and_then(|v| v.as_str())
393 && self.config.action_on_match == CorrelationAction::Alert
394 && let Ok(a) = val.parse::<CorrelationAction>()
395 {
396 self.config.action_on_match = a;
397 }
398 }
399
400 pub fn add_correlation(&mut self, corr: &CorrelationRule) -> Result<()> {
402 let owned;
403 let effective = if self.pipelines.is_empty() {
404 corr
405 } else {
406 owned = {
407 let mut c = corr.clone();
408 apply_pipelines_to_correlation(&self.pipelines, &mut c)?;
409 c
410 };
411 &owned
412 };
413
414 self.apply_custom_attributes(&effective.custom_attributes);
417
418 let compiled = compile_correlation(effective)?;
419 let idx = self.correlations.len();
420
421 for rule_ref in &compiled.rule_refs {
423 self.rule_index
424 .entry(rule_ref.clone())
425 .or_default()
426 .push(idx);
427 }
428
429 if !compiled.generate {
431 for rule_ref in &compiled.rule_refs {
432 self.correlation_only_rules.insert(rule_ref.clone());
433 }
434 }
435
436 self.correlations.push(compiled);
437 Ok(())
438 }
439
440 pub fn add_collection(&mut self, collection: &SigmaCollection) -> Result<()> {
445 for rule in &collection.rules {
446 self.add_rule(rule)?;
447 }
448 for filter in &collection.filters {
450 self.engine.apply_filter(filter)?;
451 }
452 for corr in &collection.correlations {
453 self.add_correlation(corr)?;
454 }
455 self.validate_rule_refs()?;
456 self.detect_correlation_cycles()?;
457 Ok(())
458 }
459
460 fn validate_rule_refs(&self) -> Result<()> {
463 let mut known: std::collections::HashSet<&str> = std::collections::HashSet::new();
464
465 for (id, name) in &self.rule_ids {
466 if let Some(id) = id {
467 known.insert(id.as_str());
468 }
469 if let Some(name) = name {
470 known.insert(name.as_str());
471 }
472 }
473 for corr in &self.correlations {
474 if let Some(ref id) = corr.id {
475 known.insert(id.as_str());
476 }
477 if let Some(ref name) = corr.name {
478 known.insert(name.as_str());
479 }
480 }
481
482 for corr in &self.correlations {
483 for rule_ref in &corr.rule_refs {
484 if !known.contains(rule_ref.as_str()) {
485 return Err(EvalError::UnknownRuleRef(rule_ref.clone()));
486 }
487 }
488 }
489 Ok(())
490 }
491
492 fn detect_correlation_cycles(&self) -> Result<()> {
500 let mut corr_identifiers: HashMap<&str, usize> = HashMap::new();
502 for (idx, corr) in self.correlations.iter().enumerate() {
503 if let Some(ref id) = corr.id {
504 corr_identifiers.insert(id.as_str(), idx);
505 }
506 if let Some(ref name) = corr.name {
507 corr_identifiers.insert(name.as_str(), idx);
508 }
509 }
510
511 let mut adj: Vec<Vec<usize>> = vec![Vec::new(); self.correlations.len()];
513 for (idx, corr) in self.correlations.iter().enumerate() {
514 for rule_ref in &corr.rule_refs {
515 if let Some(&target_idx) = corr_identifiers.get(rule_ref.as_str()) {
516 adj[idx].push(target_idx);
517 }
518 }
519 }
520
521 let mut state = vec![0u8; self.correlations.len()]; let mut path: Vec<usize> = Vec::new();
524
525 for start in 0..self.correlations.len() {
526 if state[start] == 0
527 && let Some(cycle) = Self::dfs_find_cycle(start, &adj, &mut state, &mut path)
528 {
529 let names: Vec<String> = cycle
530 .iter()
531 .map(|&i| {
532 self.correlations[i]
533 .id
534 .as_deref()
535 .or(self.correlations[i].name.as_deref())
536 .unwrap_or(&self.correlations[i].title)
537 .to_string()
538 })
539 .collect();
540 return Err(crate::error::EvalError::CorrelationCycle(
541 names.join(" -> "),
542 ));
543 }
544 }
545 Ok(())
546 }
547
548 fn dfs_find_cycle(
550 node: usize,
551 adj: &[Vec<usize>],
552 state: &mut [u8],
553 path: &mut Vec<usize>,
554 ) -> Option<Vec<usize>> {
555 state[node] = 1; path.push(node);
557
558 for &next in &adj[node] {
559 if state[next] == 1 {
560 if let Some(pos) = path.iter().position(|&n| n == next) {
562 let mut cycle = path[pos..].to_vec();
563 cycle.push(next); return Some(cycle);
565 }
566 }
567 if state[next] == 0
568 && let Some(cycle) = Self::dfs_find_cycle(next, adj, state, path)
569 {
570 return Some(cycle);
571 }
572 }
573
574 path.pop();
575 state[node] = 2; None
577 }
578
579 pub fn process_event(&mut self, event: &impl Event) -> ProcessResult {
585 let all_detections = self.engine.evaluate(event);
586
587 let ts = match self.extract_event_timestamp(event) {
588 Some(ts) => ts,
589 None => match self.config.timestamp_fallback {
590 TimestampFallback::WallClock => Utc::now().timestamp(),
591 TimestampFallback::Skip => {
592 let detections = self.filter_detections(all_detections);
594 return ProcessResult {
595 detections,
596 correlations: Vec::new(),
597 };
598 }
599 },
600 };
601 self.process_with_detections(event, all_detections, ts)
602 }
603
604 pub fn process_event_at(&mut self, event: &impl Event, timestamp_secs: i64) -> ProcessResult {
609 let all_detections = self.engine.evaluate(event);
610 self.process_with_detections(event, all_detections, timestamp_secs)
611 }
612
613 pub fn process_with_detections(
619 &mut self,
620 event: &impl Event,
621 all_detections: Vec<MatchResult>,
622 timestamp_secs: i64,
623 ) -> ProcessResult {
624 let timestamp_secs = timestamp_secs.clamp(0, i64::MAX / 2);
625
626 if self.state.len() >= self.config.max_state_entries {
628 self.evict_all(timestamp_secs);
629 }
630
631 let mut correlations = Vec::new();
633 self.feed_detections(event, &all_detections, timestamp_secs, &mut correlations);
634
635 self.chain_correlations(&correlations, timestamp_secs);
637
638 let detections = self.filter_detections(all_detections);
640
641 ProcessResult {
642 detections,
643 correlations,
644 }
645 }
646
647 pub fn evaluate(&self, event: &impl Event) -> Vec<MatchResult> {
653 self.engine.evaluate(event)
654 }
655
656 pub fn process_batch<E: Event + Sync>(&mut self, events: &[&E]) -> Vec<ProcessResult> {
664 let engine = &self.engine;
667 let ts_fields = &self.config.timestamp_fields;
668
669 let batch_results: Vec<(Vec<MatchResult>, Option<i64>)> = {
670 #[cfg(feature = "parallel")]
671 {
672 use rayon::prelude::*;
673 events
674 .par_iter()
675 .map(|e| {
676 let detections = engine.evaluate(e);
677 let ts = extract_event_ts(e, ts_fields);
678 (detections, ts)
679 })
680 .collect()
681 }
682 #[cfg(not(feature = "parallel"))]
683 {
684 events
685 .iter()
686 .map(|e| {
687 let detections = engine.evaluate(e);
688 let ts = extract_event_ts(e, ts_fields);
689 (detections, ts)
690 })
691 .collect()
692 }
693 };
694
695 let mut results = Vec::with_capacity(events.len());
697 for ((detections, ts_opt), event) in batch_results.into_iter().zip(events) {
698 match ts_opt {
699 Some(ts) => {
700 results.push(self.process_with_detections(event, detections, ts));
701 }
702 None => match self.config.timestamp_fallback {
703 TimestampFallback::WallClock => {
704 let ts = Utc::now().timestamp();
705 results.push(self.process_with_detections(event, detections, ts));
706 }
707 TimestampFallback::Skip => {
708 let detections = self.filter_detections(detections);
710 results.push(ProcessResult {
711 detections,
712 correlations: Vec::new(),
713 });
714 }
715 },
716 }
717 }
718 results
719 }
720
721 fn filter_detections(&self, all_detections: Vec<MatchResult>) -> Vec<MatchResult> {
726 if !self.config.emit_detections && !self.correlation_only_rules.is_empty() {
727 all_detections
728 .into_iter()
729 .filter(|m| {
730 let id_match = m
731 .rule_id
732 .as_ref()
733 .is_some_and(|id| self.correlation_only_rules.contains(id));
734 !id_match
735 })
736 .collect()
737 } else {
738 all_detections
739 }
740 }
741
742 fn feed_detections(
744 &mut self,
745 event: &impl Event,
746 detections: &[MatchResult],
747 ts: i64,
748 out: &mut Vec<CorrelationResult>,
749 ) {
750 let mut work: Vec<(usize, Option<String>, Option<String>)> = Vec::new();
753
754 for det in detections {
755 let (rule_id, rule_name) = self.find_rule_identity(det);
758
759 let mut corr_indices = Vec::new();
761 if let Some(ref id) = rule_id
762 && let Some(indices) = self.rule_index.get(id)
763 {
764 corr_indices.extend(indices);
765 }
766 if let Some(ref name) = rule_name
767 && let Some(indices) = self.rule_index.get(name)
768 {
769 corr_indices.extend(indices);
770 }
771
772 corr_indices.sort_unstable();
773 corr_indices.dedup();
774
775 for &corr_idx in &corr_indices {
776 work.push((corr_idx, rule_id.clone(), rule_name.clone()));
777 }
778 }
779
780 for (corr_idx, rule_id, rule_name) in work {
781 self.update_correlation(corr_idx, event, ts, &rule_id, &rule_name, out);
782 }
783 }
784
785 fn find_rule_identity(&self, det: &MatchResult) -> (Option<String>, Option<String>) {
787 if let Some(ref match_id) = det.rule_id {
789 for (id, name) in &self.rule_ids {
790 if id.as_deref() == Some(match_id.as_str()) {
791 return (id.clone(), name.clone());
792 }
793 }
794 }
795 (det.rule_id.clone(), None)
797 }
798
799 fn resolve_event_mode(&self, corr_idx: usize) -> CorrelationEventMode {
801 let corr = &self.correlations[corr_idx];
802 corr.event_mode
803 .unwrap_or(self.config.correlation_event_mode)
804 }
805
806 fn resolve_max_events(&self, corr_idx: usize) -> usize {
808 let corr = &self.correlations[corr_idx];
809 corr.max_events
810 .unwrap_or(self.config.max_correlation_events)
811 }
812
813 fn update_correlation(
815 &mut self,
816 corr_idx: usize,
817 event: &impl Event,
818 ts: i64,
819 rule_id: &Option<String>,
820 rule_name: &Option<String>,
821 out: &mut Vec<CorrelationResult>,
822 ) {
823 let corr = &self.correlations[corr_idx];
827 let corr_type = corr.correlation_type;
828 let timespan = corr.timespan_secs;
829 let level = corr.level;
830 let suppress_secs = corr.suppress_secs.or(self.config.suppress);
831 let action = corr.action.unwrap_or(self.config.action_on_match);
832 let event_mode = self.resolve_event_mode(corr_idx);
833 let max_events = self.resolve_max_events(corr_idx);
834
835 let mut ref_strs: Vec<&str> = Vec::new();
837 if let Some(id) = rule_id.as_deref() {
838 ref_strs.push(id);
839 }
840 if let Some(name) = rule_name.as_deref() {
841 ref_strs.push(name);
842 }
843 let rule_ref = rule_id.as_deref().or(rule_name.as_deref()).unwrap_or("");
844
845 let group_key = GroupKey::extract(event, &corr.group_by, &ref_strs);
847
848 let state_key = (corr_idx, group_key.clone());
850 let state = self
851 .state
852 .entry(state_key.clone())
853 .or_insert_with(|| WindowState::new_for(corr_type));
854
855 let cutoff = ts - timespan as i64;
857 state.evict(cutoff);
858
859 match corr_type {
861 CorrelationType::EventCount => {
862 state.push_event_count(ts);
863 }
864 CorrelationType::ValueCount => {
865 if let Some(ref fields) = corr.condition.field
866 && let Some(field_name) = fields.first()
867 && let Some(val) = event.get_field(field_name)
868 && let Some(s) = value_to_string_for_count(&val)
869 {
870 state.push_value_count(ts, s);
871 }
872 }
873 CorrelationType::Temporal | CorrelationType::TemporalOrdered => {
874 state.push_temporal(ts, rule_ref);
875 }
876 CorrelationType::ValueSum
877 | CorrelationType::ValueAvg
878 | CorrelationType::ValuePercentile
879 | CorrelationType::ValueMedian => {
880 if let Some(ref fields) = corr.condition.field
881 && let Some(field_name) = fields.first()
882 && let Some(val) = event.get_field(field_name)
883 && let Some(n) = value_to_f64_ev(&val)
884 {
885 state.push_numeric(ts, n);
886 }
887 }
888 }
889
890 match event_mode {
892 CorrelationEventMode::Full => {
893 let buf = self
894 .event_buffers
895 .entry(state_key.clone())
896 .or_insert_with(|| EventBuffer::new(max_events));
897 buf.evict(cutoff);
898 let json = event.to_json();
899 buf.push(ts, &json);
900 }
901 CorrelationEventMode::Refs => {
902 let buf = self
903 .event_ref_buffers
904 .entry(state_key.clone())
905 .or_insert_with(|| EventRefBuffer::new(max_events));
906 buf.evict(cutoff);
907 let json = event.to_json();
908 buf.push(ts, &json);
909 }
910 CorrelationEventMode::None => {}
911 }
912
913 let fired = state.check_condition(
915 &corr.condition,
916 corr_type,
917 &corr.rule_refs,
918 corr.extended_expr.as_ref(),
919 );
920
921 if let Some(agg_value) = fired {
922 let alert_key = (corr_idx, group_key.clone());
923
924 let suppressed = if let Some(suppress) = suppress_secs {
926 if let Some(&last_ts) = self.last_alert.get(&alert_key) {
927 (ts - last_ts) < suppress as i64
928 } else {
929 false
930 }
931 } else {
932 false
933 };
934
935 if !suppressed {
936 let (events, event_refs) = match event_mode {
938 CorrelationEventMode::Full => {
939 let stored = self
940 .event_buffers
941 .get(&alert_key)
942 .map(|buf| buf.decompress_all())
943 .unwrap_or_default();
944 (Some(stored), None)
945 }
946 CorrelationEventMode::Refs => {
947 let stored = self
948 .event_ref_buffers
949 .get(&alert_key)
950 .map(|buf| buf.refs())
951 .unwrap_or_default();
952 (None, Some(stored))
953 }
954 CorrelationEventMode::None => (None, None),
955 };
956
957 let corr = &self.correlations[corr_idx];
959 let result = CorrelationResult {
960 rule_title: corr.title.clone(),
961 rule_id: corr.id.clone(),
962 level,
963 tags: corr.tags.clone(),
964 correlation_type: corr_type,
965 group_key: group_key.to_pairs(&corr.group_by),
966 aggregated_value: agg_value,
967 timespan_secs: timespan,
968 events,
969 event_refs,
970 custom_attributes: corr.custom_attributes.clone(),
971 };
972 out.push(result);
973
974 self.last_alert.insert(alert_key.clone(), ts);
976
977 if action == CorrelationAction::Reset {
979 if let Some(state) = self.state.get_mut(&alert_key) {
980 state.clear();
981 }
982 if let Some(buf) = self.event_buffers.get_mut(&alert_key) {
983 buf.clear();
984 }
985 if let Some(buf) = self.event_ref_buffers.get_mut(&alert_key) {
986 buf.clear();
987 }
988 }
989 }
990 }
991 }
992
993 fn chain_correlations(&mut self, fired: &[CorrelationResult], ts: i64) {
998 const MAX_CHAIN_DEPTH: usize = 10;
999 let mut pending: Vec<CorrelationResult> = fired.to_vec();
1000 let mut depth = 0;
1001
1002 while !pending.is_empty() && depth < MAX_CHAIN_DEPTH {
1003 depth += 1;
1004
1005 #[allow(clippy::type_complexity)]
1007 let mut work: Vec<(usize, Vec<(String, String)>, String)> = Vec::new();
1008 for result in &pending {
1009 if let Some(ref id) = result.rule_id
1010 && let Some(indices) = self.rule_index.get(id)
1011 {
1012 let fired_ref = result
1013 .rule_id
1014 .as_deref()
1015 .unwrap_or(&result.rule_title)
1016 .to_string();
1017 for &corr_idx in indices {
1018 work.push((corr_idx, result.group_key.clone(), fired_ref.clone()));
1019 }
1020 }
1021 }
1022
1023 let mut next_pending = Vec::new();
1024 for (corr_idx, group_key_pairs, fired_ref) in work {
1025 let corr = &self.correlations[corr_idx];
1026 let corr_type = corr.correlation_type;
1027 let timespan = corr.timespan_secs;
1028 let level = corr.level;
1029
1030 let group_key = GroupKey::from_pairs(&group_key_pairs, &corr.group_by);
1031 let state_key = (corr_idx, group_key.clone());
1032 let state = self
1033 .state
1034 .entry(state_key)
1035 .or_insert_with(|| WindowState::new_for(corr_type));
1036
1037 let cutoff = ts - timespan as i64;
1038 state.evict(cutoff);
1039
1040 match corr_type {
1041 CorrelationType::EventCount => {
1042 state.push_event_count(ts);
1043 }
1044 CorrelationType::Temporal | CorrelationType::TemporalOrdered => {
1045 state.push_temporal(ts, &fired_ref);
1046 }
1047 _ => {
1048 state.push_event_count(ts);
1049 }
1050 }
1051
1052 let fired = state.check_condition(
1053 &corr.condition,
1054 corr_type,
1055 &corr.rule_refs,
1056 corr.extended_expr.as_ref(),
1057 );
1058
1059 if let Some(agg_value) = fired {
1060 let corr = &self.correlations[corr_idx];
1061 next_pending.push(CorrelationResult {
1062 rule_title: corr.title.clone(),
1063 rule_id: corr.id.clone(),
1064 level,
1065 tags: corr.tags.clone(),
1066 correlation_type: corr_type,
1067 group_key: group_key.to_pairs(&corr.group_by),
1068 aggregated_value: agg_value,
1069 timespan_secs: timespan,
1070 events: None,
1073 event_refs: None,
1074 custom_attributes: corr.custom_attributes.clone(),
1075 });
1076 }
1077 }
1078
1079 pending = next_pending;
1080 }
1081
1082 if !pending.is_empty() {
1083 log::warn!(
1084 "Correlation chain depth limit reached ({MAX_CHAIN_DEPTH}); \
1085 {} pending result(s) were not propagated further. \
1086 This may indicate a cycle in correlation references.",
1087 pending.len()
1088 );
1089 }
1090 }
1091
1092 fn extract_event_timestamp(&self, event: &impl Event) -> Option<i64> {
1104 for field_name in &self.config.timestamp_fields {
1105 if let Some(val) = event.get_field(field_name)
1106 && let Some(ts) = parse_timestamp_value(&val)
1107 {
1108 return Some(ts);
1109 }
1110 }
1111 None
1112 }
1113
1114 pub fn evict_expired(&mut self, now_secs: i64) {
1120 self.evict_all(now_secs);
1121 }
1122
1123 fn evict_all(&mut self, now_secs: i64) {
1125 let timespans: Vec<u64> = self.correlations.iter().map(|c| c.timespan_secs).collect();
1127
1128 self.state.retain(|&(corr_idx, _), state| {
1129 if corr_idx < timespans.len() {
1130 let cutoff = now_secs - timespans[corr_idx] as i64;
1131 state.evict(cutoff);
1132 }
1133 !state.is_empty()
1134 });
1135
1136 self.event_buffers.retain(|&(corr_idx, _), buf| {
1138 if corr_idx < timespans.len() {
1139 let cutoff = now_secs - timespans[corr_idx] as i64;
1140 buf.evict(cutoff);
1141 }
1142 !buf.is_empty()
1143 });
1144 self.event_ref_buffers.retain(|&(corr_idx, _), buf| {
1145 if corr_idx < timespans.len() {
1146 let cutoff = now_secs - timespans[corr_idx] as i64;
1147 buf.evict(cutoff);
1148 }
1149 !buf.is_empty()
1150 });
1151
1152 if self.state.len() >= self.config.max_state_entries {
1156 let target = self.config.max_state_entries * 9 / 10;
1157 let excess = self.state.len() - target;
1158
1159 let mut by_staleness: Vec<_> = self
1161 .state
1162 .iter()
1163 .map(|(k, v)| (k.clone(), v.latest_timestamp().unwrap_or(i64::MIN)))
1164 .collect();
1165 by_staleness.sort_unstable_by_key(|&(_, ts)| ts);
1166
1167 for (key, _) in by_staleness.into_iter().take(excess) {
1169 self.state.remove(&key);
1170 self.last_alert.remove(&key);
1171 self.event_buffers.remove(&key);
1172 self.event_ref_buffers.remove(&key);
1173 }
1174 }
1175
1176 self.last_alert.retain(|key, &mut alert_ts| {
1179 let suppress = if key.0 < self.correlations.len() {
1180 self.correlations[key.0]
1181 .suppress_secs
1182 .or(self.config.suppress)
1183 .unwrap_or(0)
1184 } else {
1185 0
1186 };
1187 (now_secs - alert_ts) < suppress as i64
1188 });
1189 }
1190
1191 pub fn state_count(&self) -> usize {
1193 self.state.len()
1194 }
1195
1196 pub fn detection_rule_count(&self) -> usize {
1198 self.engine.rule_count()
1199 }
1200
1201 pub fn correlation_rule_count(&self) -> usize {
1203 self.correlations.len()
1204 }
1205
1206 pub fn event_buffer_count(&self) -> usize {
1208 self.event_buffers.len()
1209 }
1210
1211 pub fn event_buffer_bytes(&self) -> usize {
1213 self.event_buffers
1214 .values()
1215 .map(|b| b.compressed_bytes())
1216 .sum()
1217 }
1218
1219 pub fn event_ref_buffer_count(&self) -> usize {
1221 self.event_ref_buffers.len()
1222 }
1223
1224 pub fn engine(&self) -> &Engine {
1226 &self.engine
1227 }
1228
1229 pub fn export_state(&self) -> CorrelationSnapshot {
1235 let mut windows: HashMap<String, Vec<(GroupKey, WindowState)>> = HashMap::new();
1236 for ((idx, gk), ws) in &self.state {
1237 let corr_id = self.correlation_stable_id(*idx);
1238 windows
1239 .entry(corr_id)
1240 .or_default()
1241 .push((gk.clone(), ws.clone()));
1242 }
1243
1244 let mut last_alert: HashMap<String, Vec<(GroupKey, i64)>> = HashMap::new();
1245 for ((idx, gk), ts) in &self.last_alert {
1246 let corr_id = self.correlation_stable_id(*idx);
1247 last_alert
1248 .entry(corr_id)
1249 .or_default()
1250 .push((gk.clone(), *ts));
1251 }
1252
1253 let mut event_buffers: HashMap<String, Vec<(GroupKey, EventBuffer)>> = HashMap::new();
1254 for ((idx, gk), buf) in &self.event_buffers {
1255 let corr_id = self.correlation_stable_id(*idx);
1256 event_buffers
1257 .entry(corr_id)
1258 .or_default()
1259 .push((gk.clone(), buf.clone()));
1260 }
1261
1262 let mut event_ref_buffers: HashMap<String, Vec<(GroupKey, EventRefBuffer)>> =
1263 HashMap::new();
1264 for ((idx, gk), buf) in &self.event_ref_buffers {
1265 let corr_id = self.correlation_stable_id(*idx);
1266 event_ref_buffers
1267 .entry(corr_id)
1268 .or_default()
1269 .push((gk.clone(), buf.clone()));
1270 }
1271
1272 CorrelationSnapshot {
1273 version: SNAPSHOT_VERSION,
1274 windows,
1275 last_alert,
1276 event_buffers,
1277 event_ref_buffers,
1278 }
1279 }
1280
1281 pub fn import_state(&mut self, snapshot: CorrelationSnapshot) -> bool {
1288 if snapshot.version != SNAPSHOT_VERSION {
1289 return false;
1290 }
1291 let id_to_idx = self.build_id_to_index_map();
1292
1293 for (corr_id, groups) in snapshot.windows {
1294 if let Some(&idx) = id_to_idx.get(&corr_id) {
1295 for (gk, ws) in groups {
1296 self.state.insert((idx, gk), ws);
1297 }
1298 }
1299 }
1300
1301 for (corr_id, groups) in snapshot.last_alert {
1302 if let Some(&idx) = id_to_idx.get(&corr_id) {
1303 for (gk, ts) in groups {
1304 self.last_alert.insert((idx, gk), ts);
1305 }
1306 }
1307 }
1308
1309 for (corr_id, groups) in snapshot.event_buffers {
1310 if let Some(&idx) = id_to_idx.get(&corr_id) {
1311 for (gk, buf) in groups {
1312 self.event_buffers.insert((idx, gk), buf);
1313 }
1314 }
1315 }
1316
1317 for (corr_id, groups) in snapshot.event_ref_buffers {
1318 if let Some(&idx) = id_to_idx.get(&corr_id) {
1319 for (gk, buf) in groups {
1320 self.event_ref_buffers.insert((idx, gk), buf);
1321 }
1322 }
1323 }
1324
1325 true
1326 }
1327
1328 fn correlation_stable_id(&self, idx: usize) -> String {
1330 let corr = &self.correlations[idx];
1331 corr.id
1332 .clone()
1333 .or_else(|| corr.name.clone())
1334 .unwrap_or_else(|| corr.title.clone())
1335 }
1336
1337 fn build_id_to_index_map(&self) -> HashMap<String, usize> {
1339 self.correlations
1340 .iter()
1341 .enumerate()
1342 .map(|(idx, _)| (self.correlation_stable_id(idx), idx))
1343 .collect()
1344 }
1345}
1346
/// Format version written by `export_state`; `import_state` rejects
/// snapshots carrying any other version.
const SNAPSHOT_VERSION: u32 = 1;
1349
1350#[derive(Debug, Clone, Serialize, serde::Deserialize)]
1357pub struct CorrelationSnapshot {
1358 #[serde(default = "default_snapshot_version")]
1360 pub version: u32,
1361 pub windows: HashMap<String, Vec<(GroupKey, WindowState)>>,
1363 pub last_alert: HashMap<String, Vec<(GroupKey, i64)>>,
1365 pub event_buffers: HashMap<String, Vec<(GroupKey, EventBuffer)>>,
1367 pub event_ref_buffers: HashMap<String, Vec<(GroupKey, EventRefBuffer)>>,
1369}
1370
/// Serde default for `CorrelationSnapshot::version`.
///
/// A snapshot without a `version` field is read as format version 1. This is
/// kept as its own literal rather than `SNAPSHOT_VERSION`, so bumping the
/// current version would not make unversioned snapshots look current.
fn default_snapshot_version() -> u32 {
    const UNVERSIONED_SNAPSHOT_FORMAT: u32 = 1;
    UNVERSIONED_SNAPSHOT_FORMAT
}
1374
1375impl Default for CorrelationEngine {
1376 fn default() -> Self {
1377 Self::new(CorrelationConfig::default())
1378 }
1379}
1380
1381fn extract_event_ts(event: &impl Event, timestamp_fields: &[String]) -> Option<i64> {
1390 for field_name in timestamp_fields {
1391 if let Some(val) = event.get_field(field_name)
1392 && let Some(ts) = parse_timestamp_value(&val)
1393 {
1394 return Some(ts);
1395 }
1396 }
1397 None
1398}
1399
1400fn parse_timestamp_value(val: &EventValue) -> Option<i64> {
1402 match val {
1403 EventValue::Int(i) => Some(normalize_epoch(*i)),
1404 EventValue::Float(f) => Some(normalize_epoch(*f as i64)),
1405 EventValue::Str(s) => parse_timestamp_string(s),
1406 _ => None,
1407 }
1408}
1409
/// Normalize an epoch number to seconds, detecting the unit by magnitude.
///
/// Epoch seconds stay below 1e12 until roughly the year 33658. Dates after
/// September 2001 expressed in milliseconds, microseconds, or nanoseconds
/// exceed 1e12, 1e15, and 1e18 respectively, so larger values are scaled
/// down accordingly. Values at or below 1e12 (including negatives) are
/// returned unchanged.
fn normalize_epoch(v: i64) -> i64 {
    const MILLIS_THRESHOLD: i64 = 1_000_000_000_000;
    const MICROS_THRESHOLD: i64 = 1_000_000_000_000_000;
    const NANOS_THRESHOLD: i64 = 1_000_000_000_000_000_000;

    if v > NANOS_THRESHOLD {
        v / 1_000_000_000
    } else if v > MICROS_THRESHOLD {
        v / 1_000_000
    } else if v > MILLIS_THRESHOLD {
        v / 1_000
    } else {
        v
    }
}
1415
1416fn parse_timestamp_string(s: &str) -> Option<i64> {
1418 if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
1420 return Some(dt.timestamp());
1421 }
1422
1423 if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S") {
1426 return Some(Utc.from_utc_datetime(&naive).timestamp());
1427 }
1428 if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") {
1429 return Some(Utc.from_utc_datetime(&naive).timestamp());
1430 }
1431
1432 if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S%.f") {
1434 return Some(Utc.from_utc_datetime(&naive).timestamp());
1435 }
1436 if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f") {
1437 return Some(Utc.from_utc_datetime(&naive).timestamp());
1438 }
1439
1440 None
1441}
1442
1443fn value_to_string_for_count(v: &EventValue) -> Option<String> {
1445 match v {
1446 EventValue::Str(s) => Some(s.to_string()),
1447 EventValue::Int(n) => Some(n.to_string()),
1448 EventValue::Float(f) => Some(f.to_string()),
1449 EventValue::Bool(b) => Some(b.to_string()),
1450 EventValue::Null => Some("null".to_string()),
1451 _ => None,
1452 }
1453}
1454
1455fn value_to_f64_ev(v: &EventValue) -> Option<f64> {
1457 v.as_f64()
1458}
1459
1460#[cfg(test)]
1465mod tests {
1466 use super::*;
1467 use crate::event::JsonEvent;
1468 use rsigma_parser::parse_sigma_yaml;
1469 use serde_json::json;
1470
1471 #[test]
1476 fn test_parse_timestamp_epoch_secs() {
1477 let val = EventValue::Int(1720612200);
1478 assert_eq!(parse_timestamp_value(&val), Some(1720612200));
1479 }
1480
1481 #[test]
1482 fn test_parse_timestamp_epoch_millis() {
1483 let val = EventValue::Int(1720612200000);
1484 assert_eq!(parse_timestamp_value(&val), Some(1720612200));
1485 }
1486
1487 #[test]
1488 fn test_parse_timestamp_rfc3339() {
1489 let val = EventValue::Str(std::borrow::Cow::Borrowed("2024-07-10T12:30:00Z"));
1490 let ts = parse_timestamp_value(&val).unwrap();
1491 assert_eq!(ts, 1720614600);
1492 }
1493
1494 #[test]
1495 fn test_parse_timestamp_naive() {
1496 let val = EventValue::Str(std::borrow::Cow::Borrowed("2024-07-10T12:30:00"));
1497 let ts = parse_timestamp_value(&val).unwrap();
1498 assert_eq!(ts, 1720614600);
1499 }
1500
1501 #[test]
1502 fn test_parse_timestamp_with_space() {
1503 let val = EventValue::Str(std::borrow::Cow::Borrowed("2024-07-10 12:30:00"));
1504 let ts = parse_timestamp_value(&val).unwrap();
1505 assert_eq!(ts, 1720614600);
1506 }
1507
1508 #[test]
1509 fn test_parse_timestamp_fractional() {
1510 let val = EventValue::Str(std::borrow::Cow::Borrowed("2024-07-10T12:30:00.123Z"));
1511 let ts = parse_timestamp_value(&val).unwrap();
1512 assert_eq!(ts, 1720614600);
1513 }
1514
1515 #[test]
1516 fn test_extract_timestamp_from_event() {
1517 let config = CorrelationConfig {
1518 timestamp_fields: vec!["@timestamp".to_string()],
1519 max_state_entries: 100_000,
1520 ..Default::default()
1521 };
1522 let engine = CorrelationEngine::new(config);
1523
1524 let v = json!({"@timestamp": "2024-07-10T12:30:00Z", "data": "test"});
1525 let event = JsonEvent::borrow(&v);
1526 let ts = engine.extract_event_timestamp(&event);
1527 assert_eq!(ts, Some(1720614600));
1528 }
1529
1530 #[test]
1531 fn test_extract_timestamp_fallback_fields() {
1532 let config = CorrelationConfig {
1533 timestamp_fields: vec![
1534 "@timestamp".to_string(),
1535 "timestamp".to_string(),
1536 "EventTime".to_string(),
1537 ],
1538 max_state_entries: 100_000,
1539 ..Default::default()
1540 };
1541 let engine = CorrelationEngine::new(config);
1542
1543 let v = json!({"timestamp": 1720613400, "data": "test"});
1545 let event = JsonEvent::borrow(&v);
1546 let ts = engine.extract_event_timestamp(&event);
1547 assert_eq!(ts, Some(1720613400));
1548 }
1549
1550 #[test]
1551 fn test_extract_timestamp_returns_none_when_missing() {
1552 let config = CorrelationConfig {
1553 timestamp_fields: vec!["@timestamp".to_string()],
1554 ..Default::default()
1555 };
1556 let engine = CorrelationEngine::new(config);
1557
1558 let v = json!({"data": "no timestamp here"});
1559 let event = JsonEvent::borrow(&v);
1560 assert_eq!(engine.extract_event_timestamp(&event), None);
1561 }
1562
1563 #[test]
1564 fn test_timestamp_fallback_skip() {
1565 let yaml = r#"
1566title: test rule
1567id: ts-skip-rule
1568logsource:
1569 product: test
1570detection:
1571 selection:
1572 action: click
1573 condition: selection
1574level: low
1575---
1576title: test correlation
1577correlation:
1578 type: event_count
1579 rules:
1580 - ts-skip-rule
1581 group-by:
1582 - User
1583 timespan: 10s
1584 condition:
1585 gte: 2
1586level: high
1587"#;
1588 let collection = parse_sigma_yaml(yaml).unwrap();
1589 let mut engine = CorrelationEngine::new(CorrelationConfig {
1590 timestamp_fallback: TimestampFallback::Skip,
1591 ..Default::default()
1592 });
1593 engine.add_collection(&collection).unwrap();
1594 assert_eq!(engine.correlation_rule_count(), 1);
1595
1596 let v = json!({"action": "click", "User": "alice"});
1598 let event = JsonEvent::borrow(&v);
1599
1600 let r1 = engine.process_event(&event);
1601 assert!(!r1.detections.is_empty(), "detection should still fire");
1602
1603 let r2 = engine.process_event(&event);
1604 assert!(!r2.detections.is_empty(), "detection should still fire");
1605
1606 let r3 = engine.process_event(&event);
1607 assert!(!r3.detections.is_empty(), "detection should still fire");
1608
1609 assert!(r1.correlations.is_empty());
1611 assert!(r2.correlations.is_empty());
1612 assert!(r3.correlations.is_empty());
1613 }
1614
1615 #[test]
1616 fn test_timestamp_fallback_wallclock_default() {
1617 let yaml = r#"
1618title: test rule
1619id: ts-wc-rule
1620logsource:
1621 product: test
1622detection:
1623 selection:
1624 action: click
1625 condition: selection
1626level: low
1627---
1628title: test correlation
1629correlation:
1630 type: event_count
1631 rules:
1632 - ts-wc-rule
1633 group-by:
1634 - User
1635 timespan: 60s
1636 condition:
1637 gte: 2
1638level: high
1639"#;
1640 let collection = parse_sigma_yaml(yaml).unwrap();
1641 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1642 engine.add_collection(&collection).unwrap();
1643 assert_eq!(engine.correlation_rule_count(), 1);
1644
1645 let v = json!({"action": "click", "User": "alice"});
1648 let event = JsonEvent::borrow(&v);
1649
1650 let _r1 = engine.process_event(&event);
1651 let _r2 = engine.process_event(&event);
1652 let r3 = engine.process_event(&event);
1653
1654 assert!(
1656 !r3.correlations.is_empty(),
1657 "WallClock fallback should allow correlation"
1658 );
1659 }
1660
1661 #[test]
1666 fn test_event_count_basic() {
1667 let yaml = r#"
1668title: Base Rule
1669id: base-rule-001
1670name: base_rule
1671logsource:
1672 product: windows
1673 category: process_creation
1674detection:
1675 selection:
1676 CommandLine|contains: 'whoami'
1677 condition: selection
1678level: low
1679---
1680title: Multiple Whoami
1681id: corr-001
1682correlation:
1683 type: event_count
1684 rules:
1685 - base-rule-001
1686 group-by:
1687 - User
1688 timespan: 60s
1689 condition:
1690 gte: 3
1691level: high
1692"#;
1693 let collection = parse_sigma_yaml(yaml).unwrap();
1694 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1695 engine.add_collection(&collection).unwrap();
1696
1697 assert_eq!(engine.detection_rule_count(), 1);
1698 assert_eq!(engine.correlation_rule_count(), 1);
1699
1700 let base_ts = 1000i64;
1702 for i in 0..3 {
1703 let v = json!({"CommandLine": "whoami", "User": "admin"});
1704 let event = JsonEvent::borrow(&v);
1705 let result = engine.process_event_at(&event, base_ts + i * 10);
1706
1707 assert_eq!(result.detections.len(), 1);
1709
1710 if i < 2 {
1711 assert!(result.correlations.is_empty());
1713 } else {
1714 assert_eq!(result.correlations.len(), 1);
1716 assert_eq!(result.correlations[0].rule_title, "Multiple Whoami");
1717 assert_eq!(result.correlations[0].aggregated_value, 3.0);
1718 }
1719 }
1720 }
1721
1722 #[test]
1723 fn test_event_count_different_groups() {
1724 let yaml = r#"
1725title: Login
1726id: login-001
1727logsource:
1728 category: auth
1729detection:
1730 selection:
1731 EventType: login
1732 condition: selection
1733level: low
1734---
1735title: Many Logins
1736id: corr-login
1737correlation:
1738 type: event_count
1739 rules:
1740 - login-001
1741 group-by:
1742 - User
1743 timespan: 60s
1744 condition:
1745 gte: 3
1746level: high
1747"#;
1748 let collection = parse_sigma_yaml(yaml).unwrap();
1749 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1750 engine.add_collection(&collection).unwrap();
1751
1752 let ts = 1000i64;
1754 for i in 0..2 {
1755 let v = json!({"EventType": "login", "User": "alice"});
1756 let event = JsonEvent::borrow(&v);
1757 let r = engine.process_event_at(&event, ts + i);
1758 assert!(r.correlations.is_empty());
1759 }
1760 for i in 0..3 {
1761 let v = json!({"EventType": "login", "User": "bob"});
1762 let event = JsonEvent::borrow(&v);
1763 let r = engine.process_event_at(&event, ts + i);
1764 if i == 2 {
1765 assert_eq!(r.correlations.len(), 1);
1766 assert_eq!(
1767 r.correlations[0].group_key,
1768 vec![("User".to_string(), "bob".to_string())]
1769 );
1770 }
1771 }
1772 }
1773
1774 #[test]
1775 fn test_event_count_window_expiry() {
1776 let yaml = r#"
1777title: Base
1778id: base-002
1779logsource:
1780 category: test
1781detection:
1782 selection:
1783 action: click
1784 condition: selection
1785---
1786title: Rapid Clicks
1787id: corr-002
1788correlation:
1789 type: event_count
1790 rules:
1791 - base-002
1792 group-by:
1793 - User
1794 timespan: 10s
1795 condition:
1796 gte: 3
1797level: medium
1798"#;
1799 let collection = parse_sigma_yaml(yaml).unwrap();
1800 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1801 engine.add_collection(&collection).unwrap();
1802
1803 let v = json!({"action": "click", "User": "admin"});
1805 let event = JsonEvent::borrow(&v);
1806 engine.process_event_at(&event, 0);
1807 engine.process_event_at(&event, 1);
1808 let r = engine.process_event_at(&event, 15);
1809 assert!(r.correlations.is_empty());
1811 }
1812
1813 #[test]
1818 fn test_value_count() {
1819 let yaml = r#"
1820title: Failed Login
1821id: failed-login-001
1822logsource:
1823 category: auth
1824detection:
1825 selection:
1826 EventType: failed_login
1827 condition: selection
1828level: low
1829---
1830title: Failed Logins From Many Users
1831id: corr-vc-001
1832correlation:
1833 type: value_count
1834 rules:
1835 - failed-login-001
1836 group-by:
1837 - Host
1838 timespan: 60s
1839 condition:
1840 field: User
1841 gte: 3
1842level: high
1843"#;
1844 let collection = parse_sigma_yaml(yaml).unwrap();
1845 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1846 engine.add_collection(&collection).unwrap();
1847
1848 let ts = 1000i64;
1849 for (i, user) in ["alice", "bob", "charlie"].iter().enumerate() {
1851 let v = json!({"EventType": "failed_login", "Host": "srv01", "User": user});
1852 let event = JsonEvent::borrow(&v);
1853 let r = engine.process_event_at(&event, ts + i as i64);
1854 if i == 2 {
1855 assert_eq!(r.correlations.len(), 1);
1856 assert_eq!(r.correlations[0].aggregated_value, 3.0);
1857 }
1858 }
1859 }
1860
1861 #[test]
1866 fn test_temporal() {
1867 let yaml = r#"
1868title: Recon A
1869id: recon-a
1870name: recon_a
1871logsource:
1872 category: process
1873detection:
1874 selection:
1875 CommandLine|contains: 'whoami'
1876 condition: selection
1877---
1878title: Recon B
1879id: recon-b
1880name: recon_b
1881logsource:
1882 category: process
1883detection:
1884 selection:
1885 CommandLine|contains: 'ipconfig'
1886 condition: selection
1887---
1888title: Recon Combo
1889id: corr-temporal
1890correlation:
1891 type: temporal
1892 rules:
1893 - recon-a
1894 - recon-b
1895 group-by:
1896 - User
1897 timespan: 60s
1898 condition:
1899 gte: 2
1900level: high
1901"#;
1902 let collection = parse_sigma_yaml(yaml).unwrap();
1903 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1904 engine.add_collection(&collection).unwrap();
1905
1906 let ts = 1000i64;
1907 let v1 = json!({"CommandLine": "whoami", "User": "admin"});
1909 let ev1 = JsonEvent::borrow(&v1);
1910 let r1 = engine.process_event_at(&ev1, ts);
1911 assert!(r1.correlations.is_empty());
1912
1913 let v2 = json!({"CommandLine": "ipconfig /all", "User": "admin"});
1915 let ev2 = JsonEvent::borrow(&v2);
1916 let r2 = engine.process_event_at(&ev2, ts + 10);
1917 assert_eq!(r2.correlations.len(), 1);
1918 assert_eq!(r2.correlations[0].rule_title, "Recon Combo");
1919 }
1920
1921 #[test]
1926 fn test_temporal_ordered() {
1927 let yaml = r#"
1928title: Failed Login
1929id: failed-001
1930name: failed_login
1931logsource:
1932 category: auth
1933detection:
1934 selection:
1935 EventType: failed_login
1936 condition: selection
1937---
1938title: Success Login
1939id: success-001
1940name: successful_login
1941logsource:
1942 category: auth
1943detection:
1944 selection:
1945 EventType: success_login
1946 condition: selection
1947---
1948title: Brute Force Then Login
1949id: corr-bf
1950correlation:
1951 type: temporal_ordered
1952 rules:
1953 - failed-001
1954 - success-001
1955 group-by:
1956 - User
1957 timespan: 60s
1958 condition:
1959 gte: 2
1960level: critical
1961"#;
1962 let collection = parse_sigma_yaml(yaml).unwrap();
1963 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1964 engine.add_collection(&collection).unwrap();
1965
1966 let ts = 1000i64;
1967 let v1 = json!({"EventType": "failed_login", "User": "admin"});
1969 let ev1 = JsonEvent::borrow(&v1);
1970 let r1 = engine.process_event_at(&ev1, ts);
1971 assert!(r1.correlations.is_empty());
1972
1973 let v2 = json!({"EventType": "success_login", "User": "admin"});
1975 let ev2 = JsonEvent::borrow(&v2);
1976 let r2 = engine.process_event_at(&ev2, ts + 10);
1977 assert_eq!(r2.correlations.len(), 1);
1978 }
1979
1980 #[test]
1981 fn test_temporal_ordered_wrong_order() {
1982 let yaml = r#"
1983title: Rule A
1984id: rule-a
1985logsource:
1986 category: test
1987detection:
1988 selection:
1989 type: a
1990 condition: selection
1991---
1992title: Rule B
1993id: rule-b
1994logsource:
1995 category: test
1996detection:
1997 selection:
1998 type: b
1999 condition: selection
2000---
2001title: A then B
2002id: corr-ab
2003correlation:
2004 type: temporal_ordered
2005 rules:
2006 - rule-a
2007 - rule-b
2008 group-by:
2009 - User
2010 timespan: 60s
2011 condition:
2012 gte: 2
2013level: high
2014"#;
2015 let collection = parse_sigma_yaml(yaml).unwrap();
2016 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2017 engine.add_collection(&collection).unwrap();
2018
2019 let ts = 1000i64;
2020 let v1 = json!({"type": "b", "User": "admin"});
2022 let ev1 = JsonEvent::borrow(&v1);
2023 engine.process_event_at(&ev1, ts);
2024
2025 let v2 = json!({"type": "a", "User": "admin"});
2026 let ev2 = JsonEvent::borrow(&v2);
2027 let r2 = engine.process_event_at(&ev2, ts + 10);
2028 assert!(r2.correlations.is_empty());
2029 }
2030
2031 #[test]
2036 fn test_value_sum() {
2037 let yaml = r#"
2038title: Web Access
2039id: web-001
2040logsource:
2041 category: web
2042detection:
2043 selection:
2044 action: upload
2045 condition: selection
2046---
2047title: Large Upload
2048id: corr-sum
2049correlation:
2050 type: value_sum
2051 rules:
2052 - web-001
2053 group-by:
2054 - User
2055 timespan: 60s
2056 condition:
2057 field: bytes_sent
2058 gt: 1000
2059level: high
2060"#;
2061 let collection = parse_sigma_yaml(yaml).unwrap();
2062 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2063 engine.add_collection(&collection).unwrap();
2064
2065 let ts = 1000i64;
2066 let v1 = json!({"action": "upload", "User": "alice", "bytes_sent": 600});
2067 let ev1 = JsonEvent::borrow(&v1);
2068 let r1 = engine.process_event_at(&ev1, ts);
2069 assert!(r1.correlations.is_empty());
2070
2071 let v2 = json!({"action": "upload", "User": "alice", "bytes_sent": 500});
2072 let ev2 = JsonEvent::borrow(&v2);
2073 let r2 = engine.process_event_at(&ev2, ts + 5);
2074 assert_eq!(r2.correlations.len(), 1);
2075 assert!((r2.correlations[0].aggregated_value - 1100.0).abs() < f64::EPSILON);
2076 }
2077
2078 #[test]
2079 fn test_value_avg() {
2080 let yaml = r#"
2081title: Request
2082id: req-001
2083logsource:
2084 category: web
2085detection:
2086 selection:
2087 type: request
2088 condition: selection
2089---
2090title: High Avg Latency
2091id: corr-avg
2092correlation:
2093 type: value_avg
2094 rules:
2095 - req-001
2096 group-by:
2097 - Service
2098 timespan: 60s
2099 condition:
2100 field: latency_ms
2101 gt: 500
2102level: medium
2103"#;
2104 let collection = parse_sigma_yaml(yaml).unwrap();
2105 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2106 engine.add_collection(&collection).unwrap();
2107
2108 let ts = 1000i64;
2109 for (i, latency) in [400, 600, 800].iter().enumerate() {
2111 let v = json!({"type": "request", "Service": "api", "latency_ms": latency});
2112 let event = JsonEvent::borrow(&v);
2113 let r = engine.process_event_at(&event, ts + i as i64);
2114 if i == 2 {
2115 assert_eq!(r.correlations.len(), 1);
2116 assert!((r.correlations[0].aggregated_value - 600.0).abs() < f64::EPSILON);
2117 }
2118 }
2119 }
2120
2121 #[test]
2126 fn test_state_count() {
2127 let yaml = r#"
2128title: Base
2129id: base-sc
2130logsource:
2131 category: test
2132detection:
2133 selection:
2134 action: test
2135 condition: selection
2136---
2137title: Count
2138id: corr-sc
2139correlation:
2140 type: event_count
2141 rules:
2142 - base-sc
2143 group-by:
2144 - User
2145 timespan: 60s
2146 condition:
2147 gte: 100
2148level: low
2149"#;
2150 let collection = parse_sigma_yaml(yaml).unwrap();
2151 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2152 engine.add_collection(&collection).unwrap();
2153
2154 let v = json!({"action": "test", "User": "alice"});
2155 let event = JsonEvent::borrow(&v);
2156 engine.process_event_at(&event, 1000);
2157 assert_eq!(engine.state_count(), 1);
2158
2159 let v2 = json!({"action": "test", "User": "bob"});
2160 let event2 = JsonEvent::borrow(&v2);
2161 engine.process_event_at(&event2, 1001);
2162 assert_eq!(engine.state_count(), 2);
2163
2164 engine.evict_expired(2000);
2166 assert_eq!(engine.state_count(), 0);
2167 }
2168
2169 #[test]
2174 fn test_generate_flag_default_false() {
2175 let yaml = r#"
2176title: Base
2177id: gen-base
2178logsource:
2179 category: test
2180detection:
2181 selection:
2182 action: test
2183 condition: selection
2184---
2185title: Correlation
2186id: gen-corr
2187correlation:
2188 type: event_count
2189 rules:
2190 - gen-base
2191 group-by:
2192 - User
2193 timespan: 60s
2194 condition:
2195 gte: 1
2196level: high
2197"#;
2198 let collection = parse_sigma_yaml(yaml).unwrap();
2199 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2200 engine.add_collection(&collection).unwrap();
2201
2202 let v = json!({"action": "test", "User": "alice"});
2205 let event = JsonEvent::borrow(&v);
2206 let r = engine.process_event_at(&event, 1000);
2207 assert_eq!(r.detections.len(), 1);
2208 assert_eq!(r.correlations.len(), 1);
2209 }
2210
2211 #[test]
2216 fn test_aws_bucket_enumeration() {
2217 let yaml = r#"
2218title: Potential Bucket Enumeration on AWS
2219id: f305fd62-beca-47da-ad95-7690a0620084
2220logsource:
2221 product: aws
2222 service: cloudtrail
2223detection:
2224 selection:
2225 eventSource: "s3.amazonaws.com"
2226 eventName: "ListBuckets"
2227 condition: selection
2228level: low
2229---
2230title: Multiple AWS bucket enumerations
2231id: be246094-01d3-4bba-88de-69e582eba0cc
2232status: experimental
2233correlation:
2234 type: event_count
2235 rules:
2236 - f305fd62-beca-47da-ad95-7690a0620084
2237 group-by:
2238 - userIdentity.arn
2239 timespan: 1h
2240 condition:
2241 gte: 5
2242level: high
2243"#;
2244 let collection = parse_sigma_yaml(yaml).unwrap();
2245 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2246 engine.add_collection(&collection).unwrap();
2247
2248 let base_ts = 1_700_000_000i64;
2249 for i in 0..5 {
2250 let v = json!({
2251 "eventSource": "s3.amazonaws.com",
2252 "eventName": "ListBuckets",
2253 "userIdentity.arn": "arn:aws:iam::123456789:user/attacker"
2254 });
2255 let event = JsonEvent::borrow(&v);
2256 let r = engine.process_event_at(&event, base_ts + i * 60);
2257 if i == 4 {
2258 assert_eq!(r.correlations.len(), 1);
2259 assert_eq!(
2260 r.correlations[0].rule_title,
2261 "Multiple AWS bucket enumerations"
2262 );
2263 assert_eq!(r.correlations[0].aggregated_value, 5.0);
2264 }
2265 }
2266 }
2267
2268 #[test]
2273 fn test_chaining_event_count_to_temporal() {
2274 let yaml = r#"
2277title: Single failed login
2278id: failed-login-chain
2279name: failed_login
2280logsource:
2281 category: auth
2282detection:
2283 selection:
2284 EventType: failed_login
2285 condition: selection
2286---
2287title: Successful login
2288id: success-login-chain
2289name: successful_login
2290logsource:
2291 category: auth
2292detection:
2293 selection:
2294 EventType: success_login
2295 condition: selection
2296---
2297title: Multiple failed logins
2298id: many-failed-chain
2299name: multiple_failed_login
2300correlation:
2301 type: event_count
2302 rules:
2303 - failed-login-chain
2304 group-by:
2305 - User
2306 timespan: 60s
2307 condition:
2308 gte: 3
2309level: medium
2310---
2311title: Brute Force Followed by Login
2312id: brute-force-chain
2313correlation:
2314 type: temporal_ordered
2315 rules:
2316 - many-failed-chain
2317 - success-login-chain
2318 group-by:
2319 - User
2320 timespan: 120s
2321 condition:
2322 gte: 2
2323level: critical
2324"#;
2325 let collection = parse_sigma_yaml(yaml).unwrap();
2326 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2327 engine.add_collection(&collection).unwrap();
2328
2329 assert_eq!(engine.detection_rule_count(), 2);
2330 assert_eq!(engine.correlation_rule_count(), 2);
2331
2332 let ts = 1000i64;
2333
2334 for i in 0..3 {
2336 let v = json!({"EventType": "failed_login", "User": "victim"});
2337 let event = JsonEvent::borrow(&v);
2338 let r = engine.process_event_at(&event, ts + i);
2339 if i == 2 {
2340 assert!(
2342 r.correlations
2343 .iter()
2344 .any(|c| c.rule_title == "Multiple failed logins"),
2345 "Expected event_count correlation to fire"
2346 );
2347 }
2348 }
2349
2350 let v = json!({"EventType": "success_login", "User": "victim"});
2357 let event = JsonEvent::borrow(&v);
2358 let r = engine.process_event_at(&event, ts + 30);
2359
2360 assert_eq!(r.detections.len(), 1);
2362 assert_eq!(r.detections[0].rule_title, "Successful login");
2363 }
2364
2365 #[test]
2370 fn test_field_aliases() {
2371 let yaml = r#"
2372title: Internal Error
2373id: internal-error-001
2374name: internal_error
2375logsource:
2376 category: web
2377detection:
2378 selection:
2379 http.response.status_code: 500
2380 condition: selection
2381---
2382title: New Connection
2383id: new-conn-001
2384name: new_network_connection
2385logsource:
2386 category: network
2387detection:
2388 selection:
2389 event.type: connection
2390 condition: selection
2391---
2392title: Error Then Connection
2393id: corr-alias
2394correlation:
2395 type: temporal
2396 rules:
2397 - internal-error-001
2398 - new-conn-001
2399 group-by:
2400 - internal_ip
2401 timespan: 60s
2402 condition:
2403 gte: 2
2404 aliases:
2405 internal_ip:
2406 internal_error: destination.ip
2407 new_network_connection: source.ip
2408level: high
2409"#;
2410 let collection = parse_sigma_yaml(yaml).unwrap();
2411 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2412 engine.add_collection(&collection).unwrap();
2413
2414 let ts = 1000i64;
2415
2416 let v1 = json!({
2418 "http.response.status_code": 500,
2419 "destination.ip": "10.0.0.5"
2420 });
2421 let ev1 = JsonEvent::borrow(&v1);
2422 let r1 = engine.process_event_at(&ev1, ts);
2423 assert_eq!(r1.detections.len(), 1);
2424 assert!(r1.correlations.is_empty());
2425
2426 let v2 = json!({
2428 "event.type": "connection",
2429 "source.ip": "10.0.0.5"
2430 });
2431 let ev2 = JsonEvent::borrow(&v2);
2432 let r2 = engine.process_event_at(&ev2, ts + 5);
2433 assert_eq!(r2.detections.len(), 1);
2434 assert_eq!(r2.correlations.len(), 1);
2436 assert_eq!(r2.correlations[0].rule_title, "Error Then Connection");
2437 assert!(
2439 r2.correlations[0]
2440 .group_key
2441 .iter()
2442 .any(|(k, v)| k == "internal_ip" && v == "10.0.0.5")
2443 );
2444 }
2445
2446 #[test]
2451 fn test_value_percentile() {
2452 let yaml = r#"
2453title: Process Creation
2454id: proc-001
2455logsource:
2456 category: process
2457detection:
2458 selection:
2459 type: process_creation
2460 condition: selection
2461---
2462title: Rare Process
2463id: corr-percentile
2464correlation:
2465 type: value_percentile
2466 rules:
2467 - proc-001
2468 group-by:
2469 - ComputerName
2470 timespan: 60s
2471 condition:
2472 field: image
2473 lte: 50
2474level: medium
2475"#;
2476 let collection = parse_sigma_yaml(yaml).unwrap();
2477 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2478 engine.add_collection(&collection).unwrap();
2479
2480 let ts = 1000i64;
2481 for (i, val) in [10.0, 20.0, 30.0, 40.0, 50.0].iter().enumerate() {
2483 let v = json!({"type": "process_creation", "ComputerName": "srv01", "image": val});
2484 let event = JsonEvent::borrow(&v);
2485 let _ = engine.process_event_at(&event, ts + i as i64);
2486 }
2487 }
2490
2491 #[test]
2496 fn test_extended_temporal_and_condition() {
2497 let yaml = r#"
2499title: Login Attempt
2500id: login-attempt
2501logsource:
2502 category: auth
2503detection:
2504 selection:
2505 EventType: login_failure
2506 condition: selection
2507---
2508title: Password Change
2509id: password-change
2510logsource:
2511 category: auth
2512detection:
2513 selection:
2514 EventType: password_change
2515 condition: selection
2516---
2517title: Credential Attack
2518correlation:
2519 type: temporal
2520 rules:
2521 - login-attempt
2522 - password-change
2523 group-by:
2524 - User
2525 timespan: 300s
2526 condition: login-attempt and password-change
2527level: high
2528"#;
2529 let collection = parse_sigma_yaml(yaml).unwrap();
2530 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2531 engine.add_collection(&collection).unwrap();
2532
2533 let ts = 1000i64;
2534
2535 let ev1 = json!({"EventType": "login_failure", "User": "alice"});
2537 let r1 = engine.process_event_at(&JsonEvent::borrow(&ev1), ts);
2538 assert!(r1.correlations.is_empty(), "only one rule fired so far");
2539
2540 let ev2 = json!({"EventType": "password_change", "User": "alice"});
2542 let r2 = engine.process_event_at(&JsonEvent::borrow(&ev2), ts + 10);
2543 assert_eq!(
2544 r2.correlations.len(),
2545 1,
2546 "temporal correlation should fire: both rules matched"
2547 );
2548 assert_eq!(r2.correlations[0].rule_title, "Credential Attack");
2549 }
2550
2551 #[test]
2552 fn test_extended_temporal_or_condition() {
2553 let yaml = r#"
2555title: SSH Login
2556id: ssh-login
2557logsource:
2558 category: auth
2559detection:
2560 selection:
2561 EventType: ssh_login
2562 condition: selection
2563---
2564title: VPN Login
2565id: vpn-login
2566logsource:
2567 category: auth
2568detection:
2569 selection:
2570 EventType: vpn_login
2571 condition: selection
2572---
2573title: Any Remote Access
2574correlation:
2575 type: temporal
2576 rules:
2577 - ssh-login
2578 - vpn-login
2579 group-by:
2580 - User
2581 timespan: 60s
2582 condition: ssh-login or vpn-login
2583level: medium
2584"#;
2585 let collection = parse_sigma_yaml(yaml).unwrap();
2586 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2587 engine.add_collection(&collection).unwrap();
2588
2589 let ev = json!({"EventType": "ssh_login", "User": "bob"});
2591 let r = engine.process_event_at(&JsonEvent::borrow(&ev), 1000);
2592 assert_eq!(r.correlations.len(), 1);
2593 assert_eq!(r.correlations[0].rule_title, "Any Remote Access");
2594 }
2595
2596 #[test]
2597 fn test_extended_temporal_partial_and_no_fire() {
2598 let yaml = r#"
2600title: Recon Step 1
2601id: recon-1
2602logsource:
2603 category: process
2604detection:
2605 selection:
2606 CommandLine|contains: 'whoami'
2607 condition: selection
2608---
2609title: Recon Step 2
2610id: recon-2
2611logsource:
2612 category: process
2613detection:
2614 selection:
2615 CommandLine|contains: 'ipconfig'
2616 condition: selection
2617---
2618title: Full Recon
2619correlation:
2620 type: temporal
2621 rules:
2622 - recon-1
2623 - recon-2
2624 group-by:
2625 - Host
2626 timespan: 120s
2627 condition: recon-1 and recon-2
2628level: high
2629"#;
2630 let collection = parse_sigma_yaml(yaml).unwrap();
2631 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2632 engine.add_collection(&collection).unwrap();
2633
2634 let ev = json!({"CommandLine": "whoami", "Host": "srv01"});
2636 let r = engine.process_event_at(&JsonEvent::borrow(&ev), 1000);
2637 assert!(r.correlations.is_empty(), "only one of two AND rules fired");
2638
2639 let ev2 = json!({"CommandLine": "ipconfig /all", "Host": "srv01"});
2641 let r2 = engine.process_event_at(&JsonEvent::borrow(&ev2), 1010);
2642 assert_eq!(r2.correlations.len(), 1);
2643 assert_eq!(r2.correlations[0].rule_title, "Full Recon");
2644 }
2645
2646 #[test]
2651 fn test_filter_with_correlation() {
2652 let yaml = r#"
2654title: Failed Auth
2655id: failed-auth
2656logsource:
2657 category: auth
2658detection:
2659 selection:
2660 EventType: auth_failure
2661 condition: selection
2662---
2663title: Exclude Service Accounts
2664filter:
2665 rules:
2666 - failed-auth
2667 selection:
2668 User|startswith: 'svc_'
2669 condition: not selection
2670---
2671title: Brute Force
2672correlation:
2673 type: event_count
2674 rules:
2675 - failed-auth
2676 group-by:
2677 - User
2678 timespan: 300s
2679 condition:
2680 gte: 3
2681level: critical
2682"#;
2683 let collection = parse_sigma_yaml(yaml).unwrap();
2684 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2685 engine.add_collection(&collection).unwrap();
2686
2687 let ts = 1000i64;
2688
2689 for i in 0..5 {
2691 let ev = json!({"EventType": "auth_failure", "User": "svc_backup"});
2692 let r = engine.process_event_at(&JsonEvent::borrow(&ev), ts + i);
2693 assert!(
2694 r.correlations.is_empty(),
2695 "service account should be filtered, no correlation"
2696 );
2697 }
2698
2699 for i in 0..2 {
2701 let ev = json!({"EventType": "auth_failure", "User": "alice"});
2702 let r = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 10 + i);
2703 assert!(r.correlations.is_empty(), "not yet 3 events");
2704 }
2705
2706 let ev = json!({"EventType": "auth_failure", "User": "alice"});
2708 let r = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 12);
2709 assert_eq!(r.correlations.len(), 1);
2710 assert_eq!(r.correlations[0].rule_title, "Brute Force");
2711 }
2712
2713 #[test]
2718 fn test_repeat_rules_in_correlation() {
2719 let yaml = r#"
2721title: File Access A
2722id: file-a
2723logsource:
2724 category: file_access
2725detection:
2726 selection:
2727 FileName|endswith: '.docx'
2728 condition: selection
2729---
2730action: repeat
2731title: File Access B
2732id: file-b
2733detection:
2734 selection:
2735 FileName|endswith: '.xlsx'
2736 condition: selection
2737---
2738title: Mass File Access
2739correlation:
2740 type: event_count
2741 rules:
2742 - file-a
2743 - file-b
2744 group-by:
2745 - User
2746 timespan: 60s
2747 condition:
2748 gte: 3
2749level: high
2750"#;
2751 let collection = parse_sigma_yaml(yaml).unwrap();
2752 assert_eq!(collection.rules.len(), 2);
2753 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2754 engine.add_collection(&collection).unwrap();
2755 assert_eq!(engine.detection_rule_count(), 2);
2756
2757 let ts = 1000i64;
2758 let ev1 = json!({"FileName": "report.docx", "User": "bob"});
2760 engine.process_event_at(&JsonEvent::borrow(&ev1), ts);
2761 let ev2 = json!({"FileName": "data.xlsx", "User": "bob"});
2762 engine.process_event_at(&JsonEvent::borrow(&ev2), ts + 1);
2763 let ev3 = json!({"FileName": "notes.docx", "User": "bob"});
2764 let r = engine.process_event_at(&JsonEvent::borrow(&ev3), ts + 2);
2765
2766 assert_eq!(r.correlations.len(), 1);
2767 assert_eq!(r.correlations[0].rule_title, "Mass File Access");
2768 }
2769
2770 #[test]
2775 fn test_expand_modifier_with_correlation() {
2776 let yaml = r#"
2777title: User Temp File
2778id: user-temp
2779logsource:
2780 category: file_access
2781detection:
2782 selection:
2783 FilePath|expand: 'C:\Users\%User%\Temp'
2784 condition: selection
2785---
2786title: Excessive Temp Access
2787correlation:
2788 type: event_count
2789 rules:
2790 - user-temp
2791 group-by:
2792 - User
2793 timespan: 60s
2794 condition:
2795 gte: 2
2796level: medium
2797"#;
2798 let collection = parse_sigma_yaml(yaml).unwrap();
2799 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2800 engine.add_collection(&collection).unwrap();
2801
2802 let ts = 1000i64;
2803 let ev1 = json!({"FilePath": "C:\\Users\\alice\\Temp", "User": "alice"});
2805 let r1 = engine.process_event_at(&JsonEvent::borrow(&ev1), ts);
2806 assert!(r1.correlations.is_empty());
2807
2808 let ev2 = json!({"FilePath": "C:\\Users\\alice\\Temp", "User": "alice"});
2809 let r2 = engine.process_event_at(&JsonEvent::borrow(&ev2), ts + 1);
2810 assert_eq!(r2.correlations.len(), 1);
2811 assert_eq!(r2.correlations[0].rule_title, "Excessive Temp Access");
2812
2813 let ev3 = json!({"FilePath": "C:\\Users\\alice\\Temp", "User": "bob"});
2815 let r3 = engine.process_event_at(&JsonEvent::borrow(&ev3), ts + 2);
2816 assert_eq!(r3.detections.len(), 0);
2818 }
2819
2820 #[test]
2825 fn test_timestamp_modifier_with_correlation() {
2826 let yaml = r#"
2827title: Night Login
2828id: night-login
2829logsource:
2830 category: auth
2831detection:
2832 login:
2833 EventType: login
2834 night:
2835 Timestamp|hour: 3
2836 condition: login and night
2837---
2838title: Frequent Night Logins
2839correlation:
2840 type: event_count
2841 rules:
2842 - night-login
2843 group-by:
2844 - User
2845 timespan: 3600s
2846 condition:
2847 gte: 2
2848level: high
2849"#;
2850 let collection = parse_sigma_yaml(yaml).unwrap();
2851 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2852 engine.add_collection(&collection).unwrap();
2853
2854 let ts = 1000i64;
2855 let ev1 =
2857 json!({"EventType": "login", "User": "alice", "Timestamp": "2024-01-15T03:10:00Z"});
2858 let r1 = engine.process_event_at(&JsonEvent::borrow(&ev1), ts);
2859 assert_eq!(r1.detections.len(), 1);
2860 assert!(r1.correlations.is_empty());
2861
2862 let ev2 =
2863 json!({"EventType": "login", "User": "alice", "Timestamp": "2024-01-15T03:45:00Z"});
2864 let r2 = engine.process_event_at(&JsonEvent::borrow(&ev2), ts + 1);
2865 assert_eq!(r2.correlations.len(), 1);
2866 assert_eq!(r2.correlations[0].rule_title, "Frequent Night Logins");
2867
2868 let ev3 = json!({"EventType": "login", "User": "bob", "Timestamp": "2024-01-15T12:00:00Z"});
2870 let r3 = engine.process_event_at(&JsonEvent::borrow(&ev3), ts + 2);
2871 assert!(
2872 r3.detections.is_empty(),
2873 "noon login should not match night rule"
2874 );
2875 }
2876
2877 #[test]
2882 fn test_event_count_range_condition() {
2883 let yaml = r#"
2884title: Login Attempt
2885id: login-attempt-001
2886name: login_attempt
2887logsource:
2888 product: windows
2889detection:
2890 selection:
2891 EventType: login
2892 condition: selection
2893level: low
2894---
2895title: Login Count Range
2896id: corr-range-001
2897correlation:
2898 type: event_count
2899 rules:
2900 - login-attempt-001
2901 group-by:
2902 - User
2903 timespan: 3600s
2904 condition:
2905 gt: 2
2906 lte: 5
2907level: high
2908"#;
2909 let collection = parse_sigma_yaml(yaml).unwrap();
2910 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2911 engine.add_collection(&collection).unwrap();
2912
2913 let ts: i64 = 1_000_000;
2914
2915 for i in 0..2 {
2917 let ev = json!({"EventType": "login", "User": "alice"});
2918 let r = engine.process_event_at(&JsonEvent::borrow(&ev), ts + i);
2919 assert!(r.correlations.is_empty(), "2 events should not fire (gt:2)");
2920 }
2921
2922 let ev3 = json!({"EventType": "login", "User": "alice"});
2924 let r3 = engine.process_event_at(&JsonEvent::borrow(&ev3), ts + 3);
2925 assert_eq!(r3.correlations.len(), 1, "3 events: gt:2 AND lte:5");
2926
2927 for i in 4..=5 {
2929 let ev = json!({"EventType": "login", "User": "alice"});
2930 let r = engine.process_event_at(&JsonEvent::borrow(&ev), ts + i);
2931 assert_eq!(r.correlations.len(), 1, "{i} events still in range");
2932 }
2933
2934 let ev6 = json!({"EventType": "login", "User": "alice"});
2936 let r6 = engine.process_event_at(&JsonEvent::borrow(&ev6), ts + 6);
2937 assert!(
2938 r6.correlations.is_empty(),
2939 "6 events exceeds lte:5, should not fire"
2940 );
2941 }
2942
2943 fn suppression_yaml() -> &'static str {
2948 r#"
2949title: Login
2950id: login-base
2951logsource:
2952 category: auth
2953detection:
2954 selection:
2955 EventType: login
2956 condition: selection
2957---
2958title: Many Logins
2959correlation:
2960 type: event_count
2961 rules:
2962 - login-base
2963 group-by:
2964 - User
2965 timeframe: 60s
2966 condition:
2967 gte: 3
2968level: high
2969"#
2970 }
2971
2972 #[test]
2973 fn test_suppression_window() {
2974 let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
2975 let config = CorrelationConfig {
2976 suppress: Some(10), ..Default::default()
2978 };
2979 let mut engine = CorrelationEngine::new(config);
2980 engine.add_collection(&collection).unwrap();
2981
2982 let ev = json!({"EventType": "login", "User": "alice"});
2983 let ts = 1000;
2984
2985 engine.process_event_at(&JsonEvent::borrow(&ev), ts);
2987 engine.process_event_at(&JsonEvent::borrow(&ev), ts + 1);
2988 let r3 = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 2);
2989 assert_eq!(r3.correlations.len(), 1, "should fire on 3rd event");
2990
2991 let r4 = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 3);
2993 assert!(
2994 r4.correlations.is_empty(),
2995 "should be suppressed within 10s window"
2996 );
2997
2998 let r5 = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 9);
3000 assert!(
3001 r5.correlations.is_empty(),
3002 "should be suppressed at ts+9 (< ts+2+10)"
3003 );
3004
3005 let r6 = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 13);
3007 assert_eq!(
3008 r6.correlations.len(),
3009 1,
3010 "should fire again after suppress window expires"
3011 );
3012 }
3013
3014 #[test]
3015 fn test_suppression_per_group_key() {
3016 let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
3017 let config = CorrelationConfig {
3018 suppress: Some(60),
3019 ..Default::default()
3020 };
3021 let mut engine = CorrelationEngine::new(config);
3022 engine.add_collection(&collection).unwrap();
3023
3024 let ts = 1000;
3025
3026 let ev_a = json!({"EventType": "login", "User": "alice"});
3028 engine.process_event_at(&JsonEvent::borrow(&ev_a), ts);
3029 engine.process_event_at(&JsonEvent::borrow(&ev_a), ts + 1);
3030 let r = engine.process_event_at(&JsonEvent::borrow(&ev_a), ts + 2);
3031 assert_eq!(r.correlations.len(), 1, "alice should fire");
3032
3033 let ev_b = json!({"EventType": "login", "User": "bob"});
3035 engine.process_event_at(&JsonEvent::borrow(&ev_b), ts + 3);
3036 engine.process_event_at(&JsonEvent::borrow(&ev_b), ts + 4);
3037 let r = engine.process_event_at(&JsonEvent::borrow(&ev_b), ts + 5);
3038 assert_eq!(r.correlations.len(), 1, "bob should fire independently");
3039
3040 let r = engine.process_event_at(&JsonEvent::borrow(&ev_a), ts + 6);
3042 assert!(r.correlations.is_empty(), "alice still suppressed");
3043 }
3044
3045 #[test]
3050 fn test_action_reset() {
3051 let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
3052 let config = CorrelationConfig {
3053 action_on_match: CorrelationAction::Reset,
3054 ..Default::default()
3055 };
3056 let mut engine = CorrelationEngine::new(config);
3057 engine.add_collection(&collection).unwrap();
3058
3059 let ev = json!({"EventType": "login", "User": "alice"});
3060 let ts = 1000;
3061
3062 engine.process_event_at(&JsonEvent::borrow(&ev), ts);
3064 engine.process_event_at(&JsonEvent::borrow(&ev), ts + 1);
3065 let r3 = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 2);
3066 assert_eq!(r3.correlations.len(), 1, "should fire on 3rd event");
3067
3068 let r4 = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 3);
3070 assert!(r4.correlations.is_empty(), "reset: need 3 more events");
3071
3072 let r5 = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 4);
3073 assert!(r5.correlations.is_empty(), "reset: still only 2");
3074
3075 let r6 = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 5);
3077 assert_eq!(
3078 r6.correlations.len(),
3079 1,
3080 "should fire again after 3 events post-reset"
3081 );
3082 }
3083
3084 #[test]
3089 fn test_emit_detections_true_by_default() {
3090 let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
3091 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3092 engine.add_collection(&collection).unwrap();
3093
3094 let ev = json!({"EventType": "login", "User": "alice"});
3095 let r = engine.process_event_at(&JsonEvent::borrow(&ev), 1000);
3096 assert_eq!(r.detections.len(), 1, "by default detections are emitted");
3097 }
3098
3099 #[test]
3100 fn test_emit_detections_false_suppresses() {
3101 let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
3102 let config = CorrelationConfig {
3103 emit_detections: false,
3104 ..Default::default()
3105 };
3106 let mut engine = CorrelationEngine::new(config);
3107 engine.add_collection(&collection).unwrap();
3108
3109 let ev = json!({"EventType": "login", "User": "alice"});
3110 let r = engine.process_event_at(&JsonEvent::borrow(&ev), 1000);
3111 assert!(
3112 r.detections.is_empty(),
3113 "detection matches should be suppressed when emit_detections=false"
3114 );
3115 }
3116
3117 #[test]
3118 fn test_generate_true_keeps_detections() {
3119 let yaml = r#"
3121title: Login
3122id: login-gen
3123logsource:
3124 category: auth
3125detection:
3126 selection:
3127 EventType: login
3128 condition: selection
3129---
3130title: Many Logins
3131correlation:
3132 type: event_count
3133 rules:
3134 - login-gen
3135 group-by:
3136 - User
3137 timeframe: 60s
3138 condition:
3139 gte: 3
3140 generate: true
3141level: high
3142"#;
3143 let collection = parse_sigma_yaml(yaml).unwrap();
3144 let config = CorrelationConfig {
3145 emit_detections: false,
3146 ..Default::default()
3147 };
3148 let mut engine = CorrelationEngine::new(config);
3149 engine.add_collection(&collection).unwrap();
3150
3151 let ev = json!({"EventType": "login", "User": "alice"});
3152 let r = engine.process_event_at(&JsonEvent::borrow(&ev), 1000);
3153 assert_eq!(
3155 r.detections.len(),
3156 1,
3157 "generate:true keeps detection output"
3158 );
3159 }
3160
3161 #[test]
3166 fn test_suppress_and_reset_combined() {
3167 let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
3168 let config = CorrelationConfig {
3169 suppress: Some(5),
3170 action_on_match: CorrelationAction::Reset,
3171 ..Default::default()
3172 };
3173 let mut engine = CorrelationEngine::new(config);
3174 engine.add_collection(&collection).unwrap();
3175
3176 let ev = json!({"EventType": "login", "User": "alice"});
3177 let ts = 1000;
3178
3179 engine.process_event_at(&JsonEvent::borrow(&ev), ts);
3181 engine.process_event_at(&JsonEvent::borrow(&ev), ts + 1);
3182 let r3 = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 2);
3183 assert_eq!(r3.correlations.len(), 1, "fires on 3rd event");
3184
3185 engine.process_event_at(&JsonEvent::borrow(&ev), ts + 3);
3188 engine.process_event_at(&JsonEvent::borrow(&ev), ts + 4);
3189 let r = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 5);
3190 assert!(
3191 r.correlations.is_empty(),
3192 "threshold met again but still suppressed"
3193 );
3194
3195 let r = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 8);
3199 assert_eq!(
3200 r.correlations.len(),
3201 1,
3202 "fires after suppress expires (accumulated events + new one)"
3203 );
3204
3205 engine.process_event_at(&JsonEvent::borrow(&ev), ts + 9);
3208 engine.process_event_at(&JsonEvent::borrow(&ev), ts + 10);
3209 let r = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 11);
3210 assert!(
3211 r.correlations.is_empty(),
3212 "threshold met but suppress window hasn't expired (ts+11 - ts+8 = 3 < 5)"
3213 );
3214 }
3215
3216 #[test]
3221 fn test_no_suppression_fires_every_event() {
3222 let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
3223 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3224 engine.add_collection(&collection).unwrap();
3225
3226 let ev = json!({"EventType": "login", "User": "alice"});
3227 let ts = 1000;
3228
3229 engine.process_event_at(&JsonEvent::borrow(&ev), ts);
3230 engine.process_event_at(&JsonEvent::borrow(&ev), ts + 1);
3231 let r3 = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 2);
3232 assert_eq!(r3.correlations.len(), 1);
3233
3234 let r4 = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 3);
3236 assert_eq!(
3237 r4.correlations.len(),
3238 1,
3239 "no suppression: fires on every event after threshold"
3240 );
3241
3242 let r5 = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 4);
3243 assert_eq!(r5.correlations.len(), 1, "still fires");
3244 }
3245
3246 fn yaml_str_attrs<const N: usize>(
3251 pairs: [(&str, &str); N],
3252 ) -> std::collections::HashMap<String, serde_yaml::Value> {
3253 pairs
3254 .into_iter()
3255 .map(|(k, v)| (k.to_string(), serde_yaml::Value::String(v.to_string())))
3256 .collect()
3257 }
3258
3259 #[test]
3260 fn test_custom_attr_timestamp_field() {
3261 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3262 let attrs = yaml_str_attrs([("rsigma.timestamp_field", "time")]);
3263 engine.apply_custom_attributes(&attrs);
3264
3265 assert_eq!(
3266 engine.config.timestamp_fields[0], "time",
3267 "rsigma.timestamp_field should be prepended"
3268 );
3269 assert!(
3271 engine
3272 .config
3273 .timestamp_fields
3274 .contains(&"@timestamp".to_string())
3275 );
3276 }
3277
3278 #[test]
3279 fn test_custom_attr_timestamp_field_no_duplicates() {
3280 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3281 let attrs = yaml_str_attrs([("rsigma.timestamp_field", "time")]);
3282 engine.apply_custom_attributes(&attrs);
3284 engine.apply_custom_attributes(&attrs);
3285
3286 let count = engine
3287 .config
3288 .timestamp_fields
3289 .iter()
3290 .filter(|f| *f == "time")
3291 .count();
3292 assert_eq!(count, 1, "should not duplicate timestamp_field entries");
3293 }
3294
3295 #[test]
3296 fn test_custom_attr_suppress() {
3297 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3298 assert!(engine.config.suppress.is_none());
3299
3300 let attrs = yaml_str_attrs([("rsigma.suppress", "5m")]);
3301 engine.apply_custom_attributes(&attrs);
3302
3303 assert_eq!(engine.config.suppress, Some(300));
3304 }
3305
3306 #[test]
3307 fn test_custom_attr_suppress_does_not_override_cli() {
3308 let config = CorrelationConfig {
3309 suppress: Some(60), ..Default::default()
3311 };
3312 let mut engine = CorrelationEngine::new(config);
3313
3314 let attrs = yaml_str_attrs([("rsigma.suppress", "5m")]);
3315 engine.apply_custom_attributes(&attrs);
3316
3317 assert_eq!(
3318 engine.config.suppress,
3319 Some(60),
3320 "CLI suppress should not be overridden by custom attribute"
3321 );
3322 }
3323
3324 #[test]
3325 fn test_custom_attr_action() {
3326 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3327 assert_eq!(engine.config.action_on_match, CorrelationAction::Alert);
3328
3329 let attrs = yaml_str_attrs([("rsigma.action", "reset")]);
3330 engine.apply_custom_attributes(&attrs);
3331
3332 assert_eq!(engine.config.action_on_match, CorrelationAction::Reset);
3333 }
3334
3335 #[test]
3336 fn test_custom_attr_action_does_not_override_cli() {
3337 let config = CorrelationConfig {
3338 action_on_match: CorrelationAction::Reset, ..Default::default()
3340 };
3341 let mut engine = CorrelationEngine::new(config);
3342
3343 let attrs = yaml_str_attrs([("rsigma.action", "alert")]);
3344 engine.apply_custom_attributes(&attrs);
3345
3346 assert_eq!(
3347 engine.config.action_on_match,
3348 CorrelationAction::Reset,
3349 "CLI action should not be overridden by custom attribute"
3350 );
3351 }
3352
3353 #[test]
3354 fn test_custom_attr_timestamp_field_used_for_extraction() {
3355 let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
3357 let mut config = CorrelationConfig::default();
3358 config.timestamp_fields.insert(0, "event_time".to_string());
3360 let mut engine = CorrelationEngine::new(config);
3361 engine.add_collection(&collection).unwrap();
3362
3363 let ev = json!({
3365 "EventType": "login",
3366 "User": "alice",
3367 "event_time": "2026-02-11T12:00:00Z"
3368 });
3369 let result = engine.process_event(&JsonEvent::borrow(&ev));
3370
3371 assert!(!result.detections.is_empty() || result.correlations.is_empty());
3373 let ts = engine
3377 .extract_event_timestamp(&JsonEvent::borrow(&ev))
3378 .expect("should extract timestamp");
3379 assert!(
3380 ts > 1_700_000_000 && ts < 1_800_000_000,
3381 "timestamp should be ~2026 epoch, got {ts}"
3382 );
3383 }
3384
3385 #[test]
3390 fn test_correlation_cycle_direct() {
3391 let yaml = r#"
3393title: detection rule
3394id: det-rule
3395logsource:
3396 product: test
3397detection:
3398 selection:
3399 action: click
3400 condition: selection
3401level: low
3402---
3403title: correlation A
3404id: corr-a
3405correlation:
3406 type: event_count
3407 rules:
3408 - corr-b
3409 group-by:
3410 - User
3411 timespan: 5m
3412 condition:
3413 gte: 2
3414level: high
3415---
3416title: correlation B
3417id: corr-b
3418correlation:
3419 type: event_count
3420 rules:
3421 - corr-a
3422 group-by:
3423 - User
3424 timespan: 5m
3425 condition:
3426 gte: 2
3427level: high
3428"#;
3429 let collection = parse_sigma_yaml(yaml).unwrap();
3430 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3431 let result = engine.add_collection(&collection);
3432 assert!(result.is_err(), "should detect direct cycle");
3433 let err = result.unwrap_err().to_string();
3434 assert!(err.contains("cycle"), "error should mention cycle: {err}");
3435 assert!(
3436 err.contains("corr-a") && err.contains("corr-b"),
3437 "error should name both correlations: {err}"
3438 );
3439 }
3440
3441 #[test]
3442 fn test_correlation_cycle_self() {
3443 let yaml = r#"
3445title: detection rule
3446id: det-rule
3447logsource:
3448 product: test
3449detection:
3450 selection:
3451 action: click
3452 condition: selection
3453level: low
3454---
3455title: self-ref correlation
3456id: self-corr
3457correlation:
3458 type: event_count
3459 rules:
3460 - self-corr
3461 group-by:
3462 - User
3463 timespan: 5m
3464 condition:
3465 gte: 2
3466level: high
3467"#;
3468 let collection = parse_sigma_yaml(yaml).unwrap();
3469 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3470 let result = engine.add_collection(&collection);
3471 assert!(result.is_err(), "should detect self-referencing cycle");
3472 let err = result.unwrap_err().to_string();
3473 assert!(err.contains("cycle"), "error should mention cycle: {err}");
3474 assert!(
3475 err.contains("self-corr"),
3476 "error should name the correlation: {err}"
3477 );
3478 }
3479
3480 #[test]
3481 fn test_correlation_no_cycle_valid_chain() {
3482 let yaml = r#"
3484title: detection rule
3485id: det-rule
3486logsource:
3487 product: test
3488detection:
3489 selection:
3490 action: click
3491 condition: selection
3492level: low
3493---
3494title: correlation A
3495id: corr-a
3496correlation:
3497 type: event_count
3498 rules:
3499 - det-rule
3500 group-by:
3501 - User
3502 timespan: 5m
3503 condition:
3504 gte: 2
3505level: high
3506---
3507title: correlation B
3508id: corr-b
3509correlation:
3510 type: event_count
3511 rules:
3512 - corr-a
3513 group-by:
3514 - User
3515 timespan: 5m
3516 condition:
3517 gte: 2
3518level: high
3519"#;
3520 let collection = parse_sigma_yaml(yaml).unwrap();
3521 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3522 let result = engine.add_collection(&collection);
3523 assert!(
3524 result.is_ok(),
3525 "valid chain should not be rejected: {result:?}"
3526 );
3527 }
3528
3529 #[test]
3530 fn test_correlation_cycle_transitive() {
3531 let yaml = r#"
3533title: detection rule
3534id: det-rule
3535logsource:
3536 product: test
3537detection:
3538 selection:
3539 action: click
3540 condition: selection
3541level: low
3542---
3543title: correlation A
3544id: corr-a
3545correlation:
3546 type: event_count
3547 rules:
3548 - corr-c
3549 group-by:
3550 - User
3551 timespan: 5m
3552 condition:
3553 gte: 2
3554level: high
3555---
3556title: correlation B
3557id: corr-b
3558correlation:
3559 type: event_count
3560 rules:
3561 - corr-a
3562 group-by:
3563 - User
3564 timespan: 5m
3565 condition:
3566 gte: 2
3567level: high
3568---
3569title: correlation C
3570id: corr-c
3571correlation:
3572 type: event_count
3573 rules:
3574 - corr-b
3575 group-by:
3576 - User
3577 timespan: 5m
3578 condition:
3579 gte: 2
3580level: high
3581"#;
3582 let collection = parse_sigma_yaml(yaml).unwrap();
3583 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3584 let result = engine.add_collection(&collection);
3585 assert!(result.is_err(), "should detect transitive cycle");
3586 let err = result.unwrap_err().to_string();
3587 assert!(err.contains("cycle"), "error should mention cycle: {err}");
3588 }
3589
3590 #[test]
3595 fn test_correlation_events_disabled_by_default() {
3596 let yaml = r#"
3597title: Login
3598id: login-rule
3599logsource:
3600 category: auth
3601detection:
3602 selection:
3603 EventType: login
3604 condition: selection
3605---
3606title: Many Logins
3607correlation:
3608 type: event_count
3609 rules:
3610 - login-rule
3611 group-by:
3612 - User
3613 timespan: 60s
3614 condition:
3615 gte: 3
3616level: high
3617"#;
3618 let collection = parse_sigma_yaml(yaml).unwrap();
3619 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3620 engine.add_collection(&collection).unwrap();
3621
3622 for i in 0..3 {
3623 let v = json!({"EventType": "login", "User": "admin", "@timestamp": 1000 + i});
3624 let event = JsonEvent::borrow(&v);
3625 let result = engine.process_event_at(&event, 1000 + i);
3626 if i == 2 {
3627 assert_eq!(result.correlations.len(), 1);
3628 assert!(result.correlations[0].events.is_none());
3630 }
3631 }
3632 }
3633
3634 #[test]
3635 fn test_correlation_events_included_when_enabled() {
3636 let yaml = r#"
3637title: Login
3638id: login-rule
3639logsource:
3640 category: auth
3641detection:
3642 selection:
3643 EventType: login
3644 condition: selection
3645---
3646title: Many Logins
3647correlation:
3648 type: event_count
3649 rules:
3650 - login-rule
3651 group-by:
3652 - User
3653 timespan: 60s
3654 condition:
3655 gte: 3
3656level: high
3657"#;
3658 let collection = parse_sigma_yaml(yaml).unwrap();
3659 let config = CorrelationConfig {
3660 correlation_event_mode: CorrelationEventMode::Full,
3661 max_correlation_events: 10,
3662 ..Default::default()
3663 };
3664 let mut engine = CorrelationEngine::new(config);
3665 engine.add_collection(&collection).unwrap();
3666
3667 let events_sent: Vec<serde_json::Value> = (0..3)
3668 .map(|i| json!({"EventType": "login", "User": "admin", "@timestamp": 1000 + i}))
3669 .collect();
3670
3671 let mut corr_result = None;
3672 for (i, ev) in events_sent.iter().enumerate() {
3673 let event = JsonEvent::borrow(ev);
3674 let result = engine.process_event_at(&event, 1000 + i as i64);
3675 if !result.correlations.is_empty() {
3676 corr_result = Some(result);
3677 }
3678 }
3679
3680 let result = corr_result.expect("correlation should have fired");
3681 let corr = &result.correlations[0];
3682
3683 let events = corr.events.as_ref().expect("events should be present");
3685 assert_eq!(
3686 events.len(),
3687 3,
3688 "all 3 contributing events should be stored"
3689 );
3690
3691 for (i, event) in events.iter().enumerate() {
3693 assert_eq!(event["EventType"], "login");
3694 assert_eq!(event["User"], "admin");
3695 assert_eq!(event["@timestamp"], 1000 + i as i64);
3696 }
3697 }
3698
3699 #[test]
3700 fn test_correlation_events_max_cap() {
3701 let yaml = r#"
3702title: Login
3703id: login-rule
3704logsource:
3705 category: auth
3706detection:
3707 selection:
3708 EventType: login
3709 condition: selection
3710---
3711title: Many Logins
3712correlation:
3713 type: event_count
3714 rules:
3715 - login-rule
3716 group-by:
3717 - User
3718 timespan: 60s
3719 condition:
3720 gte: 5
3721level: high
3722"#;
3723 let collection = parse_sigma_yaml(yaml).unwrap();
3724 let config = CorrelationConfig {
3725 correlation_event_mode: CorrelationEventMode::Full,
3726 max_correlation_events: 3, ..Default::default()
3728 };
3729 let mut engine = CorrelationEngine::new(config);
3730 engine.add_collection(&collection).unwrap();
3731
3732 let mut corr_result = None;
3733 for i in 0..5 {
3734 let v = json!({"EventType": "login", "User": "admin", "idx": i});
3735 let event = JsonEvent::borrow(&v);
3736 let result = engine.process_event_at(&event, 1000 + i);
3737 if !result.correlations.is_empty() {
3738 corr_result = Some(result);
3739 }
3740 }
3741
3742 let result = corr_result.expect("correlation should have fired");
3743 let events = result.correlations[0]
3744 .events
3745 .as_ref()
3746 .expect("events should be present");
3747
3748 assert_eq!(events.len(), 3);
3750 assert_eq!(events[0]["idx"], 2);
3751 assert_eq!(events[1]["idx"], 3);
3752 assert_eq!(events[2]["idx"], 4);
3753 }
3754
3755 #[test]
3756 fn test_correlation_events_with_reset_action() {
3757 let yaml = r#"
3758title: Login
3759id: login-rule
3760logsource:
3761 category: auth
3762detection:
3763 selection:
3764 EventType: login
3765 condition: selection
3766---
3767title: Many Logins
3768correlation:
3769 type: event_count
3770 rules:
3771 - login-rule
3772 group-by:
3773 - User
3774 timespan: 60s
3775 condition:
3776 gte: 2
3777level: high
3778"#;
3779 let collection = parse_sigma_yaml(yaml).unwrap();
3780 let config = CorrelationConfig {
3781 correlation_event_mode: CorrelationEventMode::Full,
3782 action_on_match: CorrelationAction::Reset,
3783 ..Default::default()
3784 };
3785 let mut engine = CorrelationEngine::new(config);
3786 engine.add_collection(&collection).unwrap();
3787
3788 for i in 0..2 {
3790 let v = json!({"EventType": "login", "User": "admin", "round": 1, "idx": i});
3791 let event = JsonEvent::borrow(&v);
3792 let result = engine.process_event_at(&event, 1000 + i);
3793 if i == 1 {
3794 assert_eq!(result.correlations.len(), 1);
3795 let events = result.correlations[0].events.as_ref().unwrap();
3796 assert_eq!(events.len(), 2);
3797 }
3798 }
3799
3800 let v = json!({"EventType": "login", "User": "admin", "round": 2, "idx": 0});
3803 let event = JsonEvent::borrow(&v);
3804 let result = engine.process_event_at(&event, 1010);
3805 assert!(
3806 result.correlations.is_empty(),
3807 "should not fire with only 1 event after reset"
3808 );
3809
3810 let v = json!({"EventType": "login", "User": "admin", "round": 2, "idx": 1});
3811 let event = JsonEvent::borrow(&v);
3812 let result = engine.process_event_at(&event, 1011);
3813 assert_eq!(result.correlations.len(), 1);
3814 let events = result.correlations[0].events.as_ref().unwrap();
3815 assert_eq!(events.len(), 2);
3816 assert_eq!(events[0]["round"], 2);
3818 assert_eq!(events[1]["round"], 2);
3819 }
3820
3821 #[test]
3822 fn test_correlation_events_with_set_include() {
3823 let yaml = r#"
3824title: Login
3825id: login-rule
3826logsource:
3827 category: auth
3828detection:
3829 selection:
3830 EventType: login
3831 condition: selection
3832---
3833title: Many Logins
3834correlation:
3835 type: event_count
3836 rules:
3837 - login-rule
3838 group-by:
3839 - User
3840 timespan: 60s
3841 condition:
3842 gte: 2
3843level: high
3844"#;
3845 let collection = parse_sigma_yaml(yaml).unwrap();
3846 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3847 engine.add_collection(&collection).unwrap();
3848
3849 engine.set_correlation_event_mode(CorrelationEventMode::Full);
3851
3852 for i in 0..2 {
3853 let v = json!({"EventType": "login", "User": "admin"});
3854 let event = JsonEvent::borrow(&v);
3855 let result = engine.process_event_at(&event, 1000 + i);
3856 if i == 1 {
3857 assert_eq!(result.correlations.len(), 1);
3858 assert!(result.correlations[0].events.is_some());
3859 assert_eq!(result.correlations[0].events.as_ref().unwrap().len(), 2);
3860 }
3861 }
3862 }
3863
3864 #[test]
3865 fn test_correlation_events_eviction_syncs_with_window() {
3866 let yaml = r#"
3867title: Login
3868id: login-rule
3869logsource:
3870 category: auth
3871detection:
3872 selection:
3873 EventType: login
3874 condition: selection
3875---
3876title: Many Logins
3877correlation:
3878 type: event_count
3879 rules:
3880 - login-rule
3881 group-by:
3882 - User
3883 timespan: 10s
3884 condition:
3885 gte: 3
3886level: high
3887"#;
3888 let collection = parse_sigma_yaml(yaml).unwrap();
3889 let config = CorrelationConfig {
3890 correlation_event_mode: CorrelationEventMode::Full,
3891 max_correlation_events: 100,
3892 ..Default::default()
3893 };
3894 let mut engine = CorrelationEngine::new(config);
3895 engine.add_collection(&collection).unwrap();
3896
3897 for i in 0..2 {
3899 let v = json!({"EventType": "login", "User": "admin", "idx": i});
3900 let event = JsonEvent::borrow(&v);
3901 engine.process_event_at(&event, 1000 + i);
3902 }
3903
3904 let v = json!({"EventType": "login", "User": "admin", "idx": 2});
3907 let event = JsonEvent::borrow(&v);
3908 let result = engine.process_event_at(&event, 1015);
3909 assert!(
3911 result.correlations.is_empty(),
3912 "should not fire — old events evicted"
3913 );
3914
3915 for i in 3..5 {
3917 let v = json!({"EventType": "login", "User": "admin", "idx": i});
3918 let event = JsonEvent::borrow(&v);
3919 let result = engine.process_event_at(&event, 1016 + i - 3);
3920 if i == 4 {
3921 assert_eq!(result.correlations.len(), 1);
3922 let events = result.correlations[0].events.as_ref().unwrap();
3923 assert_eq!(events.len(), 3);
3925 for ev in events {
3926 assert!(ev["idx"].as_i64().unwrap() >= 2);
3927 }
3928 }
3929 }
3930 }
3931
3932 #[test]
3933 fn test_event_buffer_monitoring() {
3934 let yaml = r#"
3935title: Login
3936id: login-rule
3937logsource:
3938 category: auth
3939detection:
3940 selection:
3941 EventType: login
3942 condition: selection
3943---
3944title: Many Logins
3945correlation:
3946 type: event_count
3947 rules:
3948 - login-rule
3949 group-by:
3950 - User
3951 timespan: 60s
3952 condition:
3953 gte: 100
3954level: high
3955"#;
3956 let collection = parse_sigma_yaml(yaml).unwrap();
3957 let config = CorrelationConfig {
3958 correlation_event_mode: CorrelationEventMode::Full,
3959 ..Default::default()
3960 };
3961 let mut engine = CorrelationEngine::new(config);
3962 engine.add_collection(&collection).unwrap();
3963
3964 assert_eq!(engine.event_buffer_count(), 0);
3965 assert_eq!(engine.event_buffer_bytes(), 0);
3966
3967 for i in 0..5 {
3969 let v = json!({"EventType": "login", "User": "admin"});
3970 let event = JsonEvent::borrow(&v);
3971 engine.process_event_at(&event, 1000 + i);
3972 }
3973
3974 assert_eq!(engine.event_buffer_count(), 1); assert!(engine.event_buffer_bytes() > 0);
3976 }
3977
3978 #[test]
3979 fn test_correlation_refs_mode_basic() {
3980 let yaml = r#"
3981title: Login
3982id: login-rule
3983logsource:
3984 category: auth
3985detection:
3986 selection:
3987 EventType: login
3988 condition: selection
3989---
3990title: Many Logins
3991correlation:
3992 type: event_count
3993 rules:
3994 - login-rule
3995 group-by:
3996 - User
3997 timespan: 60s
3998 condition:
3999 gte: 3
4000level: high
4001"#;
4002 let collection = parse_sigma_yaml(yaml).unwrap();
4003 let config = CorrelationConfig {
4004 correlation_event_mode: CorrelationEventMode::Refs,
4005 max_correlation_events: 10,
4006 ..Default::default()
4007 };
4008 let mut engine = CorrelationEngine::new(config);
4009 engine.add_collection(&collection).unwrap();
4010
4011 let mut corr_result = None;
4012 for i in 0..3 {
4013 let v = json!({"EventType": "login", "User": "admin", "id": format!("evt-{i}"), "@timestamp": 1000 + i});
4014 let event = JsonEvent::borrow(&v);
4015 let result = engine.process_event_at(&event, 1000 + i);
4016 if !result.correlations.is_empty() {
4017 corr_result = Some(result.correlations[0].clone());
4018 }
4019 }
4020
4021 let result = corr_result.expect("correlation should have fired");
4022 assert!(
4024 result.events.is_none(),
4025 "Full events should not be included in refs mode"
4026 );
4027 let refs = result
4028 .event_refs
4029 .expect("event_refs should be present in refs mode");
4030 assert_eq!(refs.len(), 3);
4031 assert_eq!(refs[0].timestamp, 1000);
4032 assert_eq!(refs[0].id, Some("evt-0".to_string()));
4033 assert_eq!(refs[1].id, Some("evt-1".to_string()));
4034 assert_eq!(refs[2].id, Some("evt-2".to_string()));
4035 }
4036
4037 #[test]
4038 fn test_correlation_refs_mode_no_id_field() {
4039 let yaml = r#"
4040title: Login
4041id: login-rule
4042logsource:
4043 category: auth
4044detection:
4045 selection:
4046 EventType: login
4047 condition: selection
4048---
4049title: Many Logins
4050correlation:
4051 type: event_count
4052 rules:
4053 - login-rule
4054 group-by:
4055 - User
4056 timespan: 60s
4057 condition:
4058 gte: 2
4059level: high
4060"#;
4061 let collection = parse_sigma_yaml(yaml).unwrap();
4062 let config = CorrelationConfig {
4063 correlation_event_mode: CorrelationEventMode::Refs,
4064 ..Default::default()
4065 };
4066 let mut engine = CorrelationEngine::new(config);
4067 engine.add_collection(&collection).unwrap();
4068
4069 let mut corr_result = None;
4070 for i in 0..2 {
4071 let v = json!({"EventType": "login", "User": "admin"});
4072 let event = JsonEvent::borrow(&v);
4073 let result = engine.process_event_at(&event, 1000 + i);
4074 if !result.correlations.is_empty() {
4075 corr_result = Some(result.correlations[0].clone());
4076 }
4077 }
4078
4079 let result = corr_result.expect("correlation should have fired");
4080 let refs = result.event_refs.expect("event_refs should be present");
4081 for r in &refs {
4083 assert_eq!(r.id, None);
4084 }
4085 }
4086
4087 #[test]
4088 fn test_per_correlation_custom_attributes_from_yaml() {
4089 let yaml = r#"
4090title: Login
4091id: login-rule
4092logsource:
4093 category: auth
4094detection:
4095 selection:
4096 EventType: login
4097 condition: selection
4098---
4099title: Many Logins
4100custom_attributes:
4101 rsigma.correlation_event_mode: refs
4102 rsigma.max_correlation_events: "5"
4103correlation:
4104 type: event_count
4105 rules:
4106 - login-rule
4107 group-by:
4108 - User
4109 timespan: 60s
4110 condition:
4111 gte: 3
4112level: high
4113"#;
4114 let collection = parse_sigma_yaml(yaml).unwrap();
4115 let config = CorrelationConfig::default();
4117 let mut engine = CorrelationEngine::new(config);
4118 engine.add_collection(&collection).unwrap();
4119
4120 let mut corr_result = None;
4121 for i in 0..3 {
4122 let v = json!({"EventType": "login", "User": "admin", "id": format!("e{i}")});
4123 let event = JsonEvent::borrow(&v);
4124 let result = engine.process_event_at(&event, 1000 + i);
4125 if !result.correlations.is_empty() {
4126 corr_result = Some(result.correlations[0].clone());
4127 }
4128 }
4129
4130 let result = corr_result.expect("correlation should fire with per-correlation refs mode");
4131 assert!(result.events.is_none());
4133 let refs = result
4134 .event_refs
4135 .expect("event_refs via per-correlation override");
4136 assert_eq!(refs.len(), 3);
4137 assert_eq!(refs[0].id, Some("e0".to_string()));
4138 }
4139
4140 #[test]
4141 fn test_per_correlation_custom_attr_suppress_and_action() {
4142 let yaml = r#"
4143title: Login
4144id: login-rule
4145logsource:
4146 category: auth
4147detection:
4148 selection:
4149 EventType: login
4150 condition: selection
4151---
4152title: Many Logins
4153custom_attributes:
4154 rsigma.suppress: 10s
4155 rsigma.action: reset
4156correlation:
4157 type: event_count
4158 rules:
4159 - login-rule
4160 group-by:
4161 - User
4162 timespan: 60s
4163 condition:
4164 gte: 2
4165level: high
4166"#;
4167 let collection = parse_sigma_yaml(yaml).unwrap();
4168 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
4169 engine.add_collection(&collection).unwrap();
4170
4171 assert_eq!(engine.correlations[0].suppress_secs, Some(10));
4173 assert_eq!(
4174 engine.correlations[0].action,
4175 Some(CorrelationAction::Reset)
4176 );
4177 }
4178
4179 #[test]
4180 fn test_process_with_detections_matches_process_event_at() {
4181 let yaml = r#"
4182title: Login Failure
4183id: login-fail
4184logsource:
4185 category: auth
4186detection:
4187 selection:
4188 EventType: login_failure
4189 condition: selection
4190---
4191title: Brute Force
4192correlation:
4193 type: event_count
4194 rules:
4195 - login-fail
4196 group-by:
4197 - User
4198 timespan: 60s
4199 condition:
4200 gte: 3
4201level: high
4202"#;
4203 let collection = parse_sigma_yaml(yaml).unwrap();
4204
4205 let mut engine1 = CorrelationEngine::new(CorrelationConfig::default());
4207 engine1.add_collection(&collection).unwrap();
4208
4209 let events: Vec<serde_json::Value> = (0..5)
4210 .map(|i| json!({"EventType": "login_failure", "User": "admin", "@timestamp": format!("2025-01-01T00:00:0{}Z", i + 1)}))
4211 .collect();
4212
4213 let results1: Vec<ProcessResult> = events
4214 .iter()
4215 .enumerate()
4216 .map(|(i, v)| {
4217 let e = JsonEvent::borrow(v);
4218 engine1.process_event_at(&e, 1000 + i as i64)
4219 })
4220 .collect();
4221
4222 let mut engine2 = CorrelationEngine::new(CorrelationConfig::default());
4224 engine2.add_collection(&collection).unwrap();
4225
4226 let results2: Vec<ProcessResult> = events
4227 .iter()
4228 .enumerate()
4229 .map(|(i, v)| {
4230 let e = JsonEvent::borrow(v);
4231 let detections = engine2.evaluate(&e);
4232 engine2.process_with_detections(&e, detections, 1000 + i as i64)
4233 })
4234 .collect();
4235
4236 assert_eq!(results1.len(), results2.len());
4238 for (r1, r2) in results1.iter().zip(results2.iter()) {
4239 assert_eq!(r1.detections.len(), r2.detections.len());
4240 assert_eq!(r1.correlations.len(), r2.correlations.len());
4241 }
4242 }
4243
4244 #[test]
4245 fn test_process_batch_matches_sequential() {
4246 let yaml = r#"
4247title: Login Failure
4248id: login-fail-batch
4249logsource:
4250 category: auth
4251detection:
4252 selection:
4253 EventType: login_failure
4254 condition: selection
4255---
4256title: Brute Force Batch
4257correlation:
4258 type: event_count
4259 rules:
4260 - login-fail-batch
4261 group-by:
4262 - User
4263 timespan: 60s
4264 condition:
4265 gte: 3
4266level: high
4267"#;
4268 let collection = parse_sigma_yaml(yaml).unwrap();
4269
4270 let event_values: Vec<serde_json::Value> = (0..5)
4271 .map(|i| json!({"EventType": "login_failure", "User": "admin", "@timestamp": format!("2025-01-01T00:00:0{}Z", i + 1)}))
4272 .collect();
4273
4274 let mut engine1 = CorrelationEngine::new(CorrelationConfig::default());
4276 engine1.add_collection(&collection).unwrap();
4277 let sequential: Vec<ProcessResult> = event_values
4278 .iter()
4279 .enumerate()
4280 .map(|(i, v)| {
4281 let e = JsonEvent::borrow(v);
4282 engine1.process_event_at(&e, 1000 + i as i64)
4283 })
4284 .collect();
4285
4286 let mut engine2 = CorrelationEngine::new(CorrelationConfig::default());
4288 engine2.add_collection(&collection).unwrap();
4289 let events: Vec<JsonEvent> = event_values.iter().map(JsonEvent::borrow).collect();
4290 let refs: Vec<&JsonEvent> = events.iter().collect();
4291 let batch = engine2.process_batch(&refs);
4292
4293 assert_eq!(sequential.len(), batch.len());
4294 for (seq, bat) in sequential.iter().zip(batch.iter()) {
4295 assert_eq!(seq.detections.len(), bat.detections.len());
4296 assert_eq!(seq.correlations.len(), bat.correlations.len());
4297 }
4298 }
4299
4300 #[test]
4301 fn test_correlation_result_custom_attributes() {
4302 let yaml = r#"
4303title: Login
4304id: login-cra
4305logsource:
4306 category: auth
4307detection:
4308 selection:
4309 EventType: login
4310 condition: selection
4311level: low
4312---
4313title: Many Logins
4314my_custom_field: hello
4315priority: 9
4316nested:
4317 key: value
4318correlation:
4319 type: event_count
4320 rules:
4321 - login-cra
4322 group-by:
4323 - User
4324 timespan: 60s
4325 condition:
4326 gte: 2
4327level: high
4328"#;
4329 let collection = parse_sigma_yaml(yaml).unwrap();
4330 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
4331 engine.add_collection(&collection).unwrap();
4332
4333 let base_ts = 1000i64;
4334 for i in 0..2 {
4335 let v = json!({"EventType": "login", "User": "alice"});
4336 let event = JsonEvent::borrow(&v);
4337 let result = engine.process_event_at(&event, base_ts + i * 10);
4338
4339 if i == 1 {
4340 assert_eq!(result.correlations.len(), 1);
4341 let corr = &result.correlations[0];
4342 assert_eq!(corr.rule_title, "Many Logins");
4343 assert_eq!(
4344 corr.custom_attributes.get("my_custom_field"),
4345 Some(&serde_json::Value::String("hello".to_string()))
4346 );
4347 assert_eq!(
4348 corr.custom_attributes.get("priority"),
4349 Some(&serde_json::json!(9))
4350 );
4351 let nested = corr.custom_attributes.get("nested").unwrap();
4352 assert_eq!(nested.get("key"), Some(&serde_json::json!("value")));
4353
4354 assert!(!corr.custom_attributes.contains_key("title"));
4355 assert!(!corr.custom_attributes.contains_key("correlation"));
4356 assert!(!corr.custom_attributes.contains_key("level"));
4357 }
4358 }
4359 }
4360
4361 #[test]
4362 fn test_detection_result_custom_attributes() {
4363 let yaml = r#"
4364title: Login Detection
4365logsource:
4366 category: auth
4367detection:
4368 selection:
4369 EventType: login
4370 condition: selection
4371level: low
4372my_detection_tag: important
4373score: 42
4374"#;
4375 let collection = parse_sigma_yaml(yaml).unwrap();
4376 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
4377 engine.add_collection(&collection).unwrap();
4378
4379 let v = json!({"EventType": "login"});
4380 let event = JsonEvent::borrow(&v);
4381 let result = engine.process_event(&event);
4382
4383 assert_eq!(result.detections.len(), 1);
4384 let det = &result.detections[0];
4385 assert_eq!(
4386 det.custom_attributes.get("my_detection_tag"),
4387 Some(&serde_json::Value::String("important".to_string()))
4388 );
4389 assert_eq!(
4390 det.custom_attributes.get("score"),
4391 Some(&serde_json::json!(42))
4392 );
4393 assert!(!det.custom_attributes.contains_key("title"));
4394 }
4395}