1use std::collections::HashMap;
16
17use chrono::{DateTime, TimeZone, Utc};
18use serde::Serialize;
19
20use rsigma_parser::{CorrelationRule, CorrelationType, Level, SigmaCollection, SigmaRule};
21
22use crate::correlation::{
23 CompiledCorrelation, EventBuffer, EventRef, EventRefBuffer, GroupKey, WindowState,
24 compile_correlation,
25};
26use crate::engine::Engine;
27use crate::error::Result;
28use crate::event::Event;
29use crate::pipeline::{Pipeline, apply_pipelines};
30use crate::result::MatchResult;
31
/// What happens to a correlation's accumulated state once it fires.
///
/// Serialized in `snake_case`; `Alert` is the default variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum CorrelationAction {
    /// Emit the correlation result and leave the window state untouched.
    #[default]
    Alert,
    /// Emit the correlation result, then clear the window state and any
    /// buffered events / event references for the group that fired.
    Reset,
}
51
52impl std::str::FromStr for CorrelationAction {
53 type Err = String;
54 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
55 match s {
56 "alert" => Ok(CorrelationAction::Alert),
57 "reset" => Ok(CorrelationAction::Reset),
58 _ => Err(format!(
59 "Unknown correlation action: {s} (expected 'alert' or 'reset')"
60 )),
61 }
62 }
63}
64
/// Controls whether, and in what form, contributing events are attached to
/// a correlation result.
///
/// Serialized in `snake_case`; `None` is the default variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum CorrelationEventMode {
    /// Attach nothing: results carry neither `events` nor `event_refs`.
    #[default]
    None,
    /// Buffer events per group and attach them as `events` when the
    /// correlation fires.
    Full,
    /// Buffer event references per group and attach them as `event_refs`
    /// when the correlation fires.
    Refs,
}
82
83impl std::str::FromStr for CorrelationEventMode {
84 type Err = String;
85 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
86 match s.to_lowercase().as_str() {
87 "none" | "off" | "false" => Ok(CorrelationEventMode::None),
88 "full" | "true" => Ok(CorrelationEventMode::Full),
89 "refs" | "references" => Ok(CorrelationEventMode::Refs),
90 _ => Err(format!(
91 "Unknown correlation event mode: {s} (expected 'none', 'full', or 'refs')"
92 )),
93 }
94 }
95}
96
97impl std::fmt::Display for CorrelationEventMode {
98 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
99 match self {
100 CorrelationEventMode::None => write!(f, "none"),
101 CorrelationEventMode::Full => write!(f, "full"),
102 CorrelationEventMode::Refs => write!(f, "refs"),
103 }
104 }
105}
106
/// What `process_event` does when none of the configured timestamp fields
/// yields a parseable timestamp.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum TimestampFallback {
    /// Use the current wall-clock time (`Utc::now()`) as the event time.
    #[default]
    WallClock,
    /// Evaluate detections only; the event does not update correlation
    /// state and no correlations are returned for it.
    Skip,
}
118
/// Engine-wide settings for the correlation engine.
///
/// Several values act as defaults that an individual correlation can
/// override (suppression, action, event mode, max events).
#[derive(Debug, Clone)]
pub struct CorrelationConfig {
    /// Event field names probed, in order, for the event timestamp; the
    /// first field holding a parseable value wins.
    pub timestamp_fields: Vec<String>,

    /// Behaviour when no timestamp can be extracted from an event.
    pub timestamp_fallback: TimestampFallback,

    /// Once the number of window-state entries reaches this value, an
    /// eviction pass runs before the next event is processed.
    pub max_state_entries: usize,

    /// Default suppression window in seconds: after a correlation fires for
    /// a group, further firings within this window are dropped. `None`
    /// disables suppression unless the correlation sets its own.
    pub suppress: Option<u64>,

    /// Default action taken after a correlation fires.
    pub action_on_match: CorrelationAction,

    /// When `false`, detections of rules referenced only by non-generating
    /// correlations are removed from the returned detections.
    pub emit_detections: bool,

    /// Default event-inclusion mode for correlation results.
    pub correlation_event_mode: CorrelationEventMode,

    /// Default size handed to per-group event buffers (presumably the
    /// maximum number of buffered events — confirm against `EventBuffer`).
    pub max_correlation_events: usize,
}
180
181impl Default for CorrelationConfig {
182 fn default() -> Self {
183 CorrelationConfig {
184 timestamp_fields: vec![
185 "@timestamp".to_string(),
186 "timestamp".to_string(),
187 "EventTime".to_string(),
188 "TimeCreated".to_string(),
189 "eventTime".to_string(),
190 ],
191 timestamp_fallback: TimestampFallback::default(),
192 max_state_entries: 100_000,
193 suppress: None,
194 action_on_match: CorrelationAction::default(),
195 emit_detections: true,
196 correlation_event_mode: CorrelationEventMode::default(),
197 max_correlation_events: 10,
198 }
199 }
200}
201
/// Everything produced by processing a single event.
#[derive(Debug, Clone, Serialize)]
pub struct ProcessResult {
    /// Detection-rule matches for the event, after correlation-only
    /// filtering has been applied.
    pub detections: Vec<MatchResult>,
    /// Correlations that fired as a direct result of this event.
    pub correlations: Vec<CorrelationResult>,
}
214
/// A single firing of a correlation rule for one group.
#[derive(Debug, Clone, Serialize)]
pub struct CorrelationResult {
    /// Title of the correlation rule that fired.
    pub rule_title: String,
    /// Id of the correlation rule, if it has one.
    pub rule_id: Option<String>,
    /// Severity level of the correlation rule, if set.
    pub level: Option<Level>,
    /// Tags copied from the correlation rule.
    pub tags: Vec<String>,
    /// Kind of correlation that fired.
    pub correlation_type: CorrelationType,
    /// Group-by pairs identifying the group that fired (presumably
    /// `(field, value)` — confirm against `GroupKey::to_pairs`).
    pub group_key: Vec<(String, String)>,
    /// Aggregate value reported by the window-state condition check.
    pub aggregated_value: f64,
    /// The correlation's time window, in seconds.
    pub timespan_secs: u64,
    /// Buffered events; populated only in `CorrelationEventMode::Full`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub events: Option<Vec<serde_json::Value>>,
    /// Buffered event references; populated only in
    /// `CorrelationEventMode::Refs`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub event_refs: Option<Vec<EventRef>>,
}
246
/// Stateful engine that evaluates detection rules and feeds their matches
/// into time-windowed correlation rules.
pub struct CorrelationEngine {
    /// Underlying detection engine that evaluates individual rules.
    engine: Engine,
    /// Compiled correlation rules, addressed by index.
    correlations: Vec<CompiledCorrelation>,
    /// Rule reference (id or name) -> indices of correlations referencing it.
    rule_index: HashMap<String, Vec<usize>>,
    /// `(id, name)` of every detection rule added, in insertion order.
    rule_ids: Vec<(Option<String>, Option<String>)>,
    /// Window state per `(correlation index, group key)`.
    state: HashMap<(usize, GroupKey), WindowState>,
    /// Timestamp of the last emitted alert per `(correlation index, group
    /// key)`; used for suppression.
    last_alert: HashMap<(usize, GroupKey), i64>,
    /// Buffered events per group, used in `CorrelationEventMode::Full`.
    event_buffers: HashMap<(usize, GroupKey), EventBuffer>,
    /// Buffered event references per group, used in
    /// `CorrelationEventMode::Refs`.
    event_ref_buffers: HashMap<(usize, GroupKey), EventRefBuffer>,
    /// Rule references of correlations whose `generate` flag is `false`.
    correlation_only_rules: std::collections::HashSet<String>,
    /// Engine-wide configuration and per-correlation defaults.
    config: CorrelationConfig,
    /// Pipelines applied to rules before compilation, ordered by priority.
    pipelines: Vec<Pipeline>,
}
283
284impl CorrelationEngine {
285 pub fn new(config: CorrelationConfig) -> Self {
287 CorrelationEngine {
288 engine: Engine::new(),
289 correlations: Vec::new(),
290 rule_index: HashMap::new(),
291 rule_ids: Vec::new(),
292 state: HashMap::new(),
293 last_alert: HashMap::new(),
294 event_buffers: HashMap::new(),
295 event_ref_buffers: HashMap::new(),
296 correlation_only_rules: std::collections::HashSet::new(),
297 config,
298 pipelines: Vec::new(),
299 }
300 }
301
302 pub fn add_pipeline(&mut self, pipeline: Pipeline) {
306 self.pipelines.push(pipeline);
307 self.pipelines.sort_by_key(|p| p.priority);
308 }
309
    /// Forwards the `include` flag to the underlying detection engine's
    /// `set_include_event`.
    pub fn set_include_event(&mut self, include: bool) {
        self.engine.set_include_event(include);
    }
314
    /// Sets the engine-wide default event-inclusion mode; correlations that
    /// declare their own mode keep it.
    pub fn set_correlation_event_mode(&mut self, mode: CorrelationEventMode) {
        self.config.correlation_event_mode = mode;
    }
323
    /// Sets the engine-wide default event-buffer size; correlations that
    /// declare their own `max_events` keep it.
    pub fn set_max_correlation_events(&mut self, max: usize) {
        self.config.max_correlation_events = max;
    }
329
330 pub fn add_rule(&mut self, rule: &SigmaRule) -> Result<()> {
336 if self.pipelines.is_empty() {
337 self.apply_custom_attributes(&rule.custom_attributes);
338 self.rule_ids.push((rule.id.clone(), rule.name.clone()));
339 self.engine.add_rule(rule)?;
340 } else {
341 let mut transformed = rule.clone();
342 apply_pipelines(&self.pipelines, &mut transformed)?;
343 self.apply_custom_attributes(&transformed.custom_attributes);
344 self.rule_ids
345 .push((transformed.id.clone(), transformed.name.clone()));
346 let compiled = crate::compiler::compile_rule(&transformed)?;
348 self.engine.add_compiled_rule(compiled);
349 }
350 Ok(())
351 }
352
353 fn apply_custom_attributes(&mut self, attrs: &std::collections::HashMap<String, String>) {
367 if let Some(field) = attrs.get("rsigma.timestamp_field")
369 && !self.config.timestamp_fields.contains(field)
370 {
371 self.config.timestamp_fields.insert(0, field.clone());
372 }
373
374 if let Some(val) = attrs.get("rsigma.suppress")
376 && self.config.suppress.is_none()
377 && let Ok(ts) = rsigma_parser::Timespan::parse(val)
378 {
379 self.config.suppress = Some(ts.seconds);
380 }
381
382 if let Some(val) = attrs.get("rsigma.action")
384 && self.config.action_on_match == CorrelationAction::Alert
385 && let Ok(a) = val.parse::<CorrelationAction>()
386 {
387 self.config.action_on_match = a;
388 }
389 }
390
391 pub fn add_correlation(&mut self, corr: &CorrelationRule) -> Result<()> {
393 self.apply_custom_attributes(&corr.custom_attributes);
396
397 let compiled = compile_correlation(corr)?;
398 let idx = self.correlations.len();
399
400 for rule_ref in &compiled.rule_refs {
402 self.rule_index
403 .entry(rule_ref.clone())
404 .or_default()
405 .push(idx);
406 }
407
408 if !compiled.generate {
410 for rule_ref in &compiled.rule_refs {
411 self.correlation_only_rules.insert(rule_ref.clone());
412 }
413 }
414
415 self.correlations.push(compiled);
416 Ok(())
417 }
418
419 pub fn add_collection(&mut self, collection: &SigmaCollection) -> Result<()> {
424 for rule in &collection.rules {
425 self.add_rule(rule)?;
426 }
427 for filter in &collection.filters {
429 self.engine.apply_filter(filter)?;
430 }
431 for corr in &collection.correlations {
432 self.add_correlation(corr)?;
433 }
434 self.detect_correlation_cycles()?;
436 Ok(())
437 }
438
439 fn detect_correlation_cycles(&self) -> Result<()> {
447 let mut corr_identifiers: HashMap<&str, usize> = HashMap::new();
449 for (idx, corr) in self.correlations.iter().enumerate() {
450 if let Some(ref id) = corr.id {
451 corr_identifiers.insert(id.as_str(), idx);
452 }
453 if let Some(ref name) = corr.name {
454 corr_identifiers.insert(name.as_str(), idx);
455 }
456 }
457
458 let mut adj: Vec<Vec<usize>> = vec![Vec::new(); self.correlations.len()];
460 for (idx, corr) in self.correlations.iter().enumerate() {
461 for rule_ref in &corr.rule_refs {
462 if let Some(&target_idx) = corr_identifiers.get(rule_ref.as_str()) {
463 adj[idx].push(target_idx);
464 }
465 }
466 }
467
468 let mut state = vec![0u8; self.correlations.len()]; let mut path: Vec<usize> = Vec::new();
471
472 for start in 0..self.correlations.len() {
473 if state[start] == 0
474 && let Some(cycle) = Self::dfs_find_cycle(start, &adj, &mut state, &mut path)
475 {
476 let names: Vec<String> = cycle
477 .iter()
478 .map(|&i| {
479 self.correlations[i]
480 .id
481 .as_deref()
482 .or(self.correlations[i].name.as_deref())
483 .unwrap_or(&self.correlations[i].title)
484 .to_string()
485 })
486 .collect();
487 return Err(crate::error::EvalError::CorrelationCycle(
488 names.join(" -> "),
489 ));
490 }
491 }
492 Ok(())
493 }
494
495 fn dfs_find_cycle(
497 node: usize,
498 adj: &[Vec<usize>],
499 state: &mut [u8],
500 path: &mut Vec<usize>,
501 ) -> Option<Vec<usize>> {
502 state[node] = 1; path.push(node);
504
505 for &next in &adj[node] {
506 if state[next] == 1 {
507 if let Some(pos) = path.iter().position(|&n| n == next) {
509 let mut cycle = path[pos..].to_vec();
510 cycle.push(next); return Some(cycle);
512 }
513 }
514 if state[next] == 0
515 && let Some(cycle) = Self::dfs_find_cycle(next, adj, state, path)
516 {
517 return Some(cycle);
518 }
519 }
520
521 path.pop();
522 state[node] = 2; None
524 }
525
526 pub fn process_event(&mut self, event: &Event) -> ProcessResult {
532 let ts = match self.extract_event_timestamp(event) {
533 Some(ts) => ts,
534 None => match self.config.timestamp_fallback {
535 TimestampFallback::WallClock => Utc::now().timestamp(),
536 TimestampFallback::Skip => {
537 let all_detections = self.engine.evaluate(event);
539 let detections = self.filter_detections(all_detections);
540 return ProcessResult {
541 detections,
542 correlations: Vec::new(),
543 };
544 }
545 },
546 };
547 self.process_event_at(event, ts)
548 }
549
550 pub fn process_event_at(&mut self, event: &Event, timestamp_secs: i64) -> ProcessResult {
555 let timestamp_secs = timestamp_secs.clamp(0, i64::MAX / 2);
556
557 if self.state.len() >= self.config.max_state_entries {
559 self.evict_all(timestamp_secs);
560 }
561
562 let all_detections = self.engine.evaluate(event);
564
565 let mut correlations = Vec::new();
567 self.feed_detections(event, &all_detections, timestamp_secs, &mut correlations);
568
569 self.chain_correlations(&correlations, timestamp_secs);
571
572 let detections = self.filter_detections(all_detections);
574
575 ProcessResult {
576 detections,
577 correlations,
578 }
579 }
580
581 fn filter_detections(&self, all_detections: Vec<MatchResult>) -> Vec<MatchResult> {
586 if !self.config.emit_detections && !self.correlation_only_rules.is_empty() {
587 all_detections
588 .into_iter()
589 .filter(|m| {
590 let id_match = m
591 .rule_id
592 .as_ref()
593 .is_some_and(|id| self.correlation_only_rules.contains(id));
594 !id_match
595 })
596 .collect()
597 } else {
598 all_detections
599 }
600 }
601
602 fn feed_detections(
604 &mut self,
605 event: &Event,
606 detections: &[MatchResult],
607 ts: i64,
608 out: &mut Vec<CorrelationResult>,
609 ) {
610 let mut work: Vec<(usize, Option<String>, Option<String>)> = Vec::new();
613
614 for det in detections {
615 let (rule_id, rule_name) = self.find_rule_identity(det);
618
619 let mut corr_indices = Vec::new();
621 if let Some(ref id) = rule_id
622 && let Some(indices) = self.rule_index.get(id)
623 {
624 corr_indices.extend(indices);
625 }
626 if let Some(ref name) = rule_name
627 && let Some(indices) = self.rule_index.get(name)
628 {
629 corr_indices.extend(indices);
630 }
631
632 corr_indices.sort_unstable();
633 corr_indices.dedup();
634
635 for &corr_idx in &corr_indices {
636 work.push((corr_idx, rule_id.clone(), rule_name.clone()));
637 }
638 }
639
640 for (corr_idx, rule_id, rule_name) in work {
641 self.update_correlation(corr_idx, event, ts, &rule_id, &rule_name, out);
642 }
643 }
644
645 fn find_rule_identity(&self, det: &MatchResult) -> (Option<String>, Option<String>) {
647 if let Some(ref match_id) = det.rule_id {
649 for (id, name) in &self.rule_ids {
650 if id.as_deref() == Some(match_id.as_str()) {
651 return (id.clone(), name.clone());
652 }
653 }
654 }
655 (det.rule_id.clone(), None)
657 }
658
659 fn resolve_event_mode(&self, corr_idx: usize) -> CorrelationEventMode {
661 let corr = &self.correlations[corr_idx];
662 corr.event_mode
663 .unwrap_or(self.config.correlation_event_mode)
664 }
665
666 fn resolve_max_events(&self, corr_idx: usize) -> usize {
668 let corr = &self.correlations[corr_idx];
669 corr.max_events
670 .unwrap_or(self.config.max_correlation_events)
671 }
672
673 fn update_correlation(
675 &mut self,
676 corr_idx: usize,
677 event: &Event,
678 ts: i64,
679 rule_id: &Option<String>,
680 rule_name: &Option<String>,
681 out: &mut Vec<CorrelationResult>,
682 ) {
683 let corr = &self.correlations[corr_idx];
687 let corr_type = corr.correlation_type;
688 let timespan = corr.timespan_secs;
689 let level = corr.level;
690 let suppress_secs = corr.suppress_secs.or(self.config.suppress);
691 let action = corr.action.unwrap_or(self.config.action_on_match);
692 let event_mode = self.resolve_event_mode(corr_idx);
693 let max_events = self.resolve_max_events(corr_idx);
694
695 let mut ref_strs: Vec<&str> = Vec::new();
697 if let Some(id) = rule_id.as_deref() {
698 ref_strs.push(id);
699 }
700 if let Some(name) = rule_name.as_deref() {
701 ref_strs.push(name);
702 }
703 let rule_ref = rule_id.as_deref().or(rule_name.as_deref()).unwrap_or("");
704
705 let group_key = GroupKey::extract(event, &corr.group_by, &ref_strs);
707
708 let state_key = (corr_idx, group_key.clone());
710 let state = self
711 .state
712 .entry(state_key.clone())
713 .or_insert_with(|| WindowState::new_for(corr_type));
714
715 let cutoff = ts - timespan as i64;
717 state.evict(cutoff);
718
719 match corr_type {
721 CorrelationType::EventCount => {
722 state.push_event_count(ts);
723 }
724 CorrelationType::ValueCount => {
725 if let Some(ref field_name) = corr.condition.field
726 && let Some(val) = event.get_field(field_name)
727 && let Some(s) = value_to_string_for_count(val)
728 {
729 state.push_value_count(ts, s);
730 }
731 }
732 CorrelationType::Temporal | CorrelationType::TemporalOrdered => {
733 state.push_temporal(ts, rule_ref);
734 }
735 CorrelationType::ValueSum
736 | CorrelationType::ValueAvg
737 | CorrelationType::ValuePercentile
738 | CorrelationType::ValueMedian => {
739 if let Some(ref field_name) = corr.condition.field
740 && let Some(val) = event.get_field(field_name)
741 && let Some(n) = value_to_f64(val)
742 {
743 state.push_numeric(ts, n);
744 }
745 }
746 }
747
748 match event_mode {
750 CorrelationEventMode::Full => {
751 let buf = self
752 .event_buffers
753 .entry(state_key.clone())
754 .or_insert_with(|| EventBuffer::new(max_events));
755 buf.evict(cutoff);
756 buf.push(ts, event.as_value());
757 }
758 CorrelationEventMode::Refs => {
759 let buf = self
760 .event_ref_buffers
761 .entry(state_key.clone())
762 .or_insert_with(|| EventRefBuffer::new(max_events));
763 buf.evict(cutoff);
764 buf.push(ts, event.as_value());
765 }
766 CorrelationEventMode::None => {}
767 }
768
769 let fired = state.check_condition(
771 &corr.condition,
772 corr_type,
773 &corr.rule_refs,
774 corr.extended_expr.as_ref(),
775 );
776
777 if let Some(agg_value) = fired {
778 let alert_key = (corr_idx, group_key.clone());
779
780 let suppressed = if let Some(suppress) = suppress_secs {
782 if let Some(&last_ts) = self.last_alert.get(&alert_key) {
783 (ts - last_ts) < suppress as i64
784 } else {
785 false
786 }
787 } else {
788 false
789 };
790
791 if !suppressed {
792 let (events, event_refs) = match event_mode {
794 CorrelationEventMode::Full => {
795 let stored = self
796 .event_buffers
797 .get(&alert_key)
798 .map(|buf| buf.decompress_all())
799 .unwrap_or_default();
800 (Some(stored), None)
801 }
802 CorrelationEventMode::Refs => {
803 let stored = self
804 .event_ref_buffers
805 .get(&alert_key)
806 .map(|buf| buf.refs())
807 .unwrap_or_default();
808 (None, Some(stored))
809 }
810 CorrelationEventMode::None => (None, None),
811 };
812
813 let corr = &self.correlations[corr_idx];
815 let result = CorrelationResult {
816 rule_title: corr.title.clone(),
817 rule_id: corr.id.clone(),
818 level,
819 tags: corr.tags.clone(),
820 correlation_type: corr_type,
821 group_key: group_key.to_pairs(&corr.group_by),
822 aggregated_value: agg_value,
823 timespan_secs: timespan,
824 events,
825 event_refs,
826 };
827 out.push(result);
828
829 self.last_alert.insert(alert_key.clone(), ts);
831
832 if action == CorrelationAction::Reset {
834 if let Some(state) = self.state.get_mut(&alert_key) {
835 state.clear();
836 }
837 if let Some(buf) = self.event_buffers.get_mut(&alert_key) {
838 buf.clear();
839 }
840 if let Some(buf) = self.event_ref_buffers.get_mut(&alert_key) {
841 buf.clear();
842 }
843 }
844 }
845 }
846 }
847
848 fn chain_correlations(&mut self, fired: &[CorrelationResult], ts: i64) {
853 const MAX_CHAIN_DEPTH: usize = 10;
854 let mut pending: Vec<CorrelationResult> = fired.to_vec();
855 let mut depth = 0;
856
857 while !pending.is_empty() && depth < MAX_CHAIN_DEPTH {
858 depth += 1;
859
860 #[allow(clippy::type_complexity)]
862 let mut work: Vec<(usize, Vec<(String, String)>, String)> = Vec::new();
863 for result in &pending {
864 if let Some(ref id) = result.rule_id
865 && let Some(indices) = self.rule_index.get(id)
866 {
867 let fired_ref = result
868 .rule_id
869 .as_deref()
870 .unwrap_or(&result.rule_title)
871 .to_string();
872 for &corr_idx in indices {
873 work.push((corr_idx, result.group_key.clone(), fired_ref.clone()));
874 }
875 }
876 }
877
878 let mut next_pending = Vec::new();
879 for (corr_idx, group_key_pairs, fired_ref) in work {
880 let corr = &self.correlations[corr_idx];
881 let corr_type = corr.correlation_type;
882 let timespan = corr.timespan_secs;
883 let level = corr.level;
884
885 let group_key = GroupKey::from_pairs(&group_key_pairs, &corr.group_by);
886 let state_key = (corr_idx, group_key.clone());
887 let state = self
888 .state
889 .entry(state_key)
890 .or_insert_with(|| WindowState::new_for(corr_type));
891
892 let cutoff = ts - timespan as i64;
893 state.evict(cutoff);
894
895 match corr_type {
896 CorrelationType::EventCount => {
897 state.push_event_count(ts);
898 }
899 CorrelationType::Temporal | CorrelationType::TemporalOrdered => {
900 state.push_temporal(ts, &fired_ref);
901 }
902 _ => {
903 state.push_event_count(ts);
904 }
905 }
906
907 let fired = state.check_condition(
908 &corr.condition,
909 corr_type,
910 &corr.rule_refs,
911 corr.extended_expr.as_ref(),
912 );
913
914 if let Some(agg_value) = fired {
915 let corr = &self.correlations[corr_idx];
916 next_pending.push(CorrelationResult {
917 rule_title: corr.title.clone(),
918 rule_id: corr.id.clone(),
919 level,
920 tags: corr.tags.clone(),
921 correlation_type: corr_type,
922 group_key: group_key.to_pairs(&corr.group_by),
923 aggregated_value: agg_value,
924 timespan_secs: timespan,
925 events: None,
928 event_refs: None,
929 });
930 }
931 }
932
933 pending = next_pending;
934 }
935
936 if !pending.is_empty() {
937 log::warn!(
938 "Correlation chain depth limit reached ({MAX_CHAIN_DEPTH}); \
939 {} pending result(s) were not propagated further. \
940 This may indicate a cycle in correlation references.",
941 pending.len()
942 );
943 }
944 }
945
946 fn extract_event_timestamp(&self, event: &Event) -> Option<i64> {
958 for field_name in &self.config.timestamp_fields {
959 if let Some(val) = event.get_field(field_name)
960 && let Some(ts) = parse_timestamp_value(val)
961 {
962 return Some(ts);
963 }
964 }
965 None
966 }
967
    /// Runs a full eviction pass as of `now_secs` (epoch seconds); this is
    /// the public entry point to the internal `evict_all`.
    pub fn evict_expired(&mut self, now_secs: i64) {
        self.evict_all(now_secs);
    }
976
977 fn evict_all(&mut self, now_secs: i64) {
979 let timespans: Vec<u64> = self.correlations.iter().map(|c| c.timespan_secs).collect();
981
982 self.state.retain(|&(corr_idx, _), state| {
983 if corr_idx < timespans.len() {
984 let cutoff = now_secs - timespans[corr_idx] as i64;
985 state.evict(cutoff);
986 }
987 !state.is_empty()
988 });
989
990 self.event_buffers.retain(|&(corr_idx, _), buf| {
992 if corr_idx < timespans.len() {
993 let cutoff = now_secs - timespans[corr_idx] as i64;
994 buf.evict(cutoff);
995 }
996 !buf.is_empty()
997 });
998 self.event_ref_buffers.retain(|&(corr_idx, _), buf| {
999 if corr_idx < timespans.len() {
1000 let cutoff = now_secs - timespans[corr_idx] as i64;
1001 buf.evict(cutoff);
1002 }
1003 !buf.is_empty()
1004 });
1005
1006 if self.state.len() >= self.config.max_state_entries {
1010 let target = self.config.max_state_entries * 9 / 10;
1011 let excess = self.state.len() - target;
1012
1013 let mut by_staleness: Vec<_> = self
1015 .state
1016 .iter()
1017 .map(|(k, v)| (k.clone(), v.latest_timestamp().unwrap_or(i64::MIN)))
1018 .collect();
1019 by_staleness.sort_unstable_by_key(|&(_, ts)| ts);
1020
1021 for (key, _) in by_staleness.into_iter().take(excess) {
1023 self.state.remove(&key);
1024 self.last_alert.remove(&key);
1025 self.event_buffers.remove(&key);
1026 self.event_ref_buffers.remove(&key);
1027 }
1028 }
1029
1030 self.last_alert.retain(|key, &mut alert_ts| {
1033 let suppress = if key.0 < self.correlations.len() {
1034 self.correlations[key.0]
1035 .suppress_secs
1036 .or(self.config.suppress)
1037 .unwrap_or(0)
1038 } else {
1039 0
1040 };
1041 (now_secs - alert_ts) < suppress as i64
1042 });
1043 }
1044
    /// Number of `(correlation, group key)` window-state entries currently
    /// held.
    pub fn state_count(&self) -> usize {
        self.state.len()
    }
1049
    /// Number of rules reported by the underlying detection engine.
    pub fn detection_rule_count(&self) -> usize {
        self.engine.rule_count()
    }
1054
    /// Number of correlation rules registered with this engine.
    pub fn correlation_rule_count(&self) -> usize {
        self.correlations.len()
    }
1059
    /// Number of per-group full-event buffers currently held.
    pub fn event_buffer_count(&self) -> usize {
        self.event_buffers.len()
    }
1064
1065 pub fn event_buffer_bytes(&self) -> usize {
1067 self.event_buffers
1068 .values()
1069 .map(|b| b.compressed_bytes())
1070 .sum()
1071 }
1072
    /// Number of per-group event-reference buffers currently held.
    pub fn event_ref_buffer_count(&self) -> usize {
        self.event_ref_buffers.len()
    }
1077
    /// Shared reference to the underlying detection engine.
    pub fn engine(&self) -> &Engine {
        &self.engine
    }
1082}
1083
/// An empty engine built from `CorrelationConfig::default()`.
impl Default for CorrelationEngine {
    fn default() -> Self {
        Self::new(CorrelationConfig::default())
    }
}
1089
1090fn parse_timestamp_value(val: &serde_json::Value) -> Option<i64> {
1096 match val {
1097 serde_json::Value::Number(n) => {
1098 if let Some(i) = n.as_i64() {
1099 Some(normalize_epoch(i))
1100 } else {
1101 n.as_f64().map(|f| normalize_epoch(f as i64))
1102 }
1103 }
1104 serde_json::Value::String(s) => parse_timestamp_string(s),
1105 _ => None,
1106 }
1107}
1108
/// Normalizes a numeric epoch timestamp to seconds.
///
/// The unit is inferred from the magnitude: values above 10^18 are treated
/// as nanoseconds, above 10^15 as microseconds, above 10^12 as
/// milliseconds, and anything else as seconds already. Values at or below
/// 10^12 (including zero and negatives) are returned unchanged.
fn normalize_epoch(v: i64) -> i64 {
    const MILLIS_THRESHOLD: i64 = 1_000_000_000_000;
    const MICROS_THRESHOLD: i64 = 1_000_000_000_000_000;
    const NANOS_THRESHOLD: i64 = 1_000_000_000_000_000_000;

    if v > NANOS_THRESHOLD {
        v / 1_000_000_000
    } else if v > MICROS_THRESHOLD {
        v / 1_000_000
    } else if v > MILLIS_THRESHOLD {
        v / 1_000
    } else {
        v
    }
}
1114
1115fn parse_timestamp_string(s: &str) -> Option<i64> {
1117 if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
1119 return Some(dt.timestamp());
1120 }
1121
1122 if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S") {
1125 return Some(Utc.from_utc_datetime(&naive).timestamp());
1126 }
1127 if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") {
1128 return Some(Utc.from_utc_datetime(&naive).timestamp());
1129 }
1130
1131 if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S%.f") {
1133 return Some(Utc.from_utc_datetime(&naive).timestamp());
1134 }
1135 if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f") {
1136 return Some(Utc.from_utc_datetime(&naive).timestamp());
1137 }
1138
1139 None
1140}
1141
1142fn value_to_string_for_count(v: &serde_json::Value) -> Option<String> {
1144 match v {
1145 serde_json::Value::String(s) => Some(s.clone()),
1146 serde_json::Value::Number(n) => Some(n.to_string()),
1147 serde_json::Value::Bool(b) => Some(b.to_string()),
1148 serde_json::Value::Null => Some("null".to_string()),
1149 _ => None,
1150 }
1151}
1152
1153fn value_to_f64(v: &serde_json::Value) -> Option<f64> {
1155 match v {
1156 serde_json::Value::Number(n) => n.as_f64(),
1157 serde_json::Value::String(s) => s.parse().ok(),
1158 _ => None,
1159 }
1160}
1161
1162#[cfg(test)]
1167mod tests {
1168 use super::*;
1169 use rsigma_parser::parse_sigma_yaml;
1170 use serde_json::json;
1171
1172 #[test]
1177 fn test_parse_timestamp_epoch_secs() {
1178 let val = json!(1720612200);
1179 assert_eq!(parse_timestamp_value(&val), Some(1720612200));
1180 }
1181
1182 #[test]
1183 fn test_parse_timestamp_epoch_millis() {
1184 let val = json!(1720612200000i64);
1185 assert_eq!(parse_timestamp_value(&val), Some(1720612200));
1186 }
1187
1188 #[test]
1189 fn test_parse_timestamp_rfc3339() {
1190 let val = json!("2024-07-10T12:30:00Z");
1191 let ts = parse_timestamp_value(&val).unwrap();
1192 assert_eq!(ts, 1720614600);
1193 }
1194
1195 #[test]
1196 fn test_parse_timestamp_naive() {
1197 let val = json!("2024-07-10T12:30:00");
1198 let ts = parse_timestamp_value(&val).unwrap();
1199 assert_eq!(ts, 1720614600);
1200 }
1201
1202 #[test]
1203 fn test_parse_timestamp_with_space() {
1204 let val = json!("2024-07-10 12:30:00");
1205 let ts = parse_timestamp_value(&val).unwrap();
1206 assert_eq!(ts, 1720614600);
1207 }
1208
1209 #[test]
1210 fn test_parse_timestamp_fractional() {
1211 let val = json!("2024-07-10T12:30:00.123Z");
1212 let ts = parse_timestamp_value(&val).unwrap();
1213 assert_eq!(ts, 1720614600);
1214 }
1215
1216 #[test]
1217 fn test_extract_timestamp_from_event() {
1218 let config = CorrelationConfig {
1219 timestamp_fields: vec!["@timestamp".to_string()],
1220 max_state_entries: 100_000,
1221 ..Default::default()
1222 };
1223 let engine = CorrelationEngine::new(config);
1224
1225 let v = json!({"@timestamp": "2024-07-10T12:30:00Z", "data": "test"});
1226 let event = Event::from_value(&v);
1227 let ts = engine.extract_event_timestamp(&event);
1228 assert_eq!(ts, Some(1720614600));
1229 }
1230
1231 #[test]
1232 fn test_extract_timestamp_fallback_fields() {
1233 let config = CorrelationConfig {
1234 timestamp_fields: vec![
1235 "@timestamp".to_string(),
1236 "timestamp".to_string(),
1237 "EventTime".to_string(),
1238 ],
1239 max_state_entries: 100_000,
1240 ..Default::default()
1241 };
1242 let engine = CorrelationEngine::new(config);
1243
1244 let v = json!({"timestamp": 1720613400, "data": "test"});
1246 let event = Event::from_value(&v);
1247 let ts = engine.extract_event_timestamp(&event);
1248 assert_eq!(ts, Some(1720613400));
1249 }
1250
1251 #[test]
1252 fn test_extract_timestamp_returns_none_when_missing() {
1253 let config = CorrelationConfig {
1254 timestamp_fields: vec!["@timestamp".to_string()],
1255 ..Default::default()
1256 };
1257 let engine = CorrelationEngine::new(config);
1258
1259 let v = json!({"data": "no timestamp here"});
1260 let event = Event::from_value(&v);
1261 assert_eq!(engine.extract_event_timestamp(&event), None);
1262 }
1263
1264 #[test]
1265 fn test_timestamp_fallback_skip() {
1266 let yaml = r#"
1267title: test rule
1268id: ts-skip-rule
1269logsource:
1270 product: test
1271detection:
1272 selection:
1273 action: click
1274 condition: selection
1275level: low
1276---
1277title: test correlation
1278correlation:
1279 type: event_count
1280 rules:
1281 - ts-skip-rule
1282 group-by:
1283 - User
1284 timespan: 10s
1285 condition:
1286 gte: 2
1287level: high
1288"#;
1289 let collection = parse_sigma_yaml(yaml).unwrap();
1290 let mut engine = CorrelationEngine::new(CorrelationConfig {
1291 timestamp_fallback: TimestampFallback::Skip,
1292 ..Default::default()
1293 });
1294 engine.add_collection(&collection).unwrap();
1295 assert_eq!(engine.correlation_rule_count(), 1);
1296
1297 let v = json!({"action": "click", "User": "alice"});
1299 let event = Event::from_value(&v);
1300
1301 let r1 = engine.process_event(&event);
1302 assert!(!r1.detections.is_empty(), "detection should still fire");
1303
1304 let r2 = engine.process_event(&event);
1305 assert!(!r2.detections.is_empty(), "detection should still fire");
1306
1307 let r3 = engine.process_event(&event);
1308 assert!(!r3.detections.is_empty(), "detection should still fire");
1309
1310 assert!(r1.correlations.is_empty());
1312 assert!(r2.correlations.is_empty());
1313 assert!(r3.correlations.is_empty());
1314 }
1315
1316 #[test]
1317 fn test_timestamp_fallback_wallclock_default() {
1318 let yaml = r#"
1319title: test rule
1320id: ts-wc-rule
1321logsource:
1322 product: test
1323detection:
1324 selection:
1325 action: click
1326 condition: selection
1327level: low
1328---
1329title: test correlation
1330correlation:
1331 type: event_count
1332 rules:
1333 - ts-wc-rule
1334 group-by:
1335 - User
1336 timespan: 60s
1337 condition:
1338 gte: 2
1339level: high
1340"#;
1341 let collection = parse_sigma_yaml(yaml).unwrap();
1342 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1343 engine.add_collection(&collection).unwrap();
1344 assert_eq!(engine.correlation_rule_count(), 1);
1345
1346 let v = json!({"action": "click", "User": "alice"});
1349 let event = Event::from_value(&v);
1350
1351 let _r1 = engine.process_event(&event);
1352 let _r2 = engine.process_event(&event);
1353 let r3 = engine.process_event(&event);
1354
1355 assert!(
1357 !r3.correlations.is_empty(),
1358 "WallClock fallback should allow correlation"
1359 );
1360 }
1361
1362 #[test]
1367 fn test_event_count_basic() {
1368 let yaml = r#"
1369title: Base Rule
1370id: base-rule-001
1371name: base_rule
1372logsource:
1373 product: windows
1374 category: process_creation
1375detection:
1376 selection:
1377 CommandLine|contains: 'whoami'
1378 condition: selection
1379level: low
1380---
1381title: Multiple Whoami
1382id: corr-001
1383correlation:
1384 type: event_count
1385 rules:
1386 - base-rule-001
1387 group-by:
1388 - User
1389 timespan: 60s
1390 condition:
1391 gte: 3
1392level: high
1393"#;
1394 let collection = parse_sigma_yaml(yaml).unwrap();
1395 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1396 engine.add_collection(&collection).unwrap();
1397
1398 assert_eq!(engine.detection_rule_count(), 1);
1399 assert_eq!(engine.correlation_rule_count(), 1);
1400
1401 let base_ts = 1000i64;
1403 for i in 0..3 {
1404 let v = json!({"CommandLine": "whoami", "User": "admin"});
1405 let event = Event::from_value(&v);
1406 let result = engine.process_event_at(&event, base_ts + i * 10);
1407
1408 assert_eq!(result.detections.len(), 1);
1410
1411 if i < 2 {
1412 assert!(result.correlations.is_empty());
1414 } else {
1415 assert_eq!(result.correlations.len(), 1);
1417 assert_eq!(result.correlations[0].rule_title, "Multiple Whoami");
1418 assert_eq!(result.correlations[0].aggregated_value, 3.0);
1419 }
1420 }
1421 }
1422
1423 #[test]
1424 fn test_event_count_different_groups() {
1425 let yaml = r#"
1426title: Login
1427id: login-001
1428logsource:
1429 category: auth
1430detection:
1431 selection:
1432 EventType: login
1433 condition: selection
1434level: low
1435---
1436title: Many Logins
1437id: corr-login
1438correlation:
1439 type: event_count
1440 rules:
1441 - login-001
1442 group-by:
1443 - User
1444 timespan: 60s
1445 condition:
1446 gte: 3
1447level: high
1448"#;
1449 let collection = parse_sigma_yaml(yaml).unwrap();
1450 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1451 engine.add_collection(&collection).unwrap();
1452
1453 let ts = 1000i64;
1455 for i in 0..2 {
1456 let v = json!({"EventType": "login", "User": "alice"});
1457 let event = Event::from_value(&v);
1458 let r = engine.process_event_at(&event, ts + i);
1459 assert!(r.correlations.is_empty());
1460 }
1461 for i in 0..3 {
1462 let v = json!({"EventType": "login", "User": "bob"});
1463 let event = Event::from_value(&v);
1464 let r = engine.process_event_at(&event, ts + i);
1465 if i == 2 {
1466 assert_eq!(r.correlations.len(), 1);
1467 assert_eq!(
1468 r.correlations[0].group_key,
1469 vec![("User".to_string(), "bob".to_string())]
1470 );
1471 }
1472 }
1473 }
1474
1475 #[test]
1476 fn test_event_count_window_expiry() {
1477 let yaml = r#"
1478title: Base
1479id: base-002
1480logsource:
1481 category: test
1482detection:
1483 selection:
1484 action: click
1485 condition: selection
1486---
1487title: Rapid Clicks
1488id: corr-002
1489correlation:
1490 type: event_count
1491 rules:
1492 - base-002
1493 group-by:
1494 - User
1495 timespan: 10s
1496 condition:
1497 gte: 3
1498level: medium
1499"#;
1500 let collection = parse_sigma_yaml(yaml).unwrap();
1501 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1502 engine.add_collection(&collection).unwrap();
1503
1504 let v = json!({"action": "click", "User": "admin"});
1506 let event = Event::from_value(&v);
1507 engine.process_event_at(&event, 0);
1508 engine.process_event_at(&event, 1);
1509 let r = engine.process_event_at(&event, 15);
1510 assert!(r.correlations.is_empty());
1512 }
1513
1514 #[test]
1519 fn test_value_count() {
1520 let yaml = r#"
1521title: Failed Login
1522id: failed-login-001
1523logsource:
1524 category: auth
1525detection:
1526 selection:
1527 EventType: failed_login
1528 condition: selection
1529level: low
1530---
1531title: Failed Logins From Many Users
1532id: corr-vc-001
1533correlation:
1534 type: value_count
1535 rules:
1536 - failed-login-001
1537 group-by:
1538 - Host
1539 timespan: 60s
1540 condition:
1541 field: User
1542 gte: 3
1543level: high
1544"#;
1545 let collection = parse_sigma_yaml(yaml).unwrap();
1546 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1547 engine.add_collection(&collection).unwrap();
1548
1549 let ts = 1000i64;
1550 for (i, user) in ["alice", "bob", "charlie"].iter().enumerate() {
1552 let v = json!({"EventType": "failed_login", "Host": "srv01", "User": user});
1553 let event = Event::from_value(&v);
1554 let r = engine.process_event_at(&event, ts + i as i64);
1555 if i == 2 {
1556 assert_eq!(r.correlations.len(), 1);
1557 assert_eq!(r.correlations[0].aggregated_value, 3.0);
1558 }
1559 }
1560 }
1561
1562 #[test]
1567 fn test_temporal() {
1568 let yaml = r#"
1569title: Recon A
1570id: recon-a
1571name: recon_a
1572logsource:
1573 category: process
1574detection:
1575 selection:
1576 CommandLine|contains: 'whoami'
1577 condition: selection
1578---
1579title: Recon B
1580id: recon-b
1581name: recon_b
1582logsource:
1583 category: process
1584detection:
1585 selection:
1586 CommandLine|contains: 'ipconfig'
1587 condition: selection
1588---
1589title: Recon Combo
1590id: corr-temporal
1591correlation:
1592 type: temporal
1593 rules:
1594 - recon-a
1595 - recon-b
1596 group-by:
1597 - User
1598 timespan: 60s
1599 condition:
1600 gte: 2
1601level: high
1602"#;
1603 let collection = parse_sigma_yaml(yaml).unwrap();
1604 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1605 engine.add_collection(&collection).unwrap();
1606
1607 let ts = 1000i64;
1608 let v1 = json!({"CommandLine": "whoami", "User": "admin"});
1610 let ev1 = Event::from_value(&v1);
1611 let r1 = engine.process_event_at(&ev1, ts);
1612 assert!(r1.correlations.is_empty());
1613
1614 let v2 = json!({"CommandLine": "ipconfig /all", "User": "admin"});
1616 let ev2 = Event::from_value(&v2);
1617 let r2 = engine.process_event_at(&ev2, ts + 10);
1618 assert_eq!(r2.correlations.len(), 1);
1619 assert_eq!(r2.correlations[0].rule_title, "Recon Combo");
1620 }
1621
1622 #[test]
1627 fn test_temporal_ordered() {
1628 let yaml = r#"
1629title: Failed Login
1630id: failed-001
1631name: failed_login
1632logsource:
1633 category: auth
1634detection:
1635 selection:
1636 EventType: failed_login
1637 condition: selection
1638---
1639title: Success Login
1640id: success-001
1641name: successful_login
1642logsource:
1643 category: auth
1644detection:
1645 selection:
1646 EventType: success_login
1647 condition: selection
1648---
1649title: Brute Force Then Login
1650id: corr-bf
1651correlation:
1652 type: temporal_ordered
1653 rules:
1654 - failed-001
1655 - success-001
1656 group-by:
1657 - User
1658 timespan: 60s
1659 condition:
1660 gte: 2
1661level: critical
1662"#;
1663 let collection = parse_sigma_yaml(yaml).unwrap();
1664 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1665 engine.add_collection(&collection).unwrap();
1666
1667 let ts = 1000i64;
1668 let v1 = json!({"EventType": "failed_login", "User": "admin"});
1670 let ev1 = Event::from_value(&v1);
1671 let r1 = engine.process_event_at(&ev1, ts);
1672 assert!(r1.correlations.is_empty());
1673
1674 let v2 = json!({"EventType": "success_login", "User": "admin"});
1676 let ev2 = Event::from_value(&v2);
1677 let r2 = engine.process_event_at(&ev2, ts + 10);
1678 assert_eq!(r2.correlations.len(), 1);
1679 }
1680
1681 #[test]
1682 fn test_temporal_ordered_wrong_order() {
1683 let yaml = r#"
1684title: Rule A
1685id: rule-a
1686logsource:
1687 category: test
1688detection:
1689 selection:
1690 type: a
1691 condition: selection
1692---
1693title: Rule B
1694id: rule-b
1695logsource:
1696 category: test
1697detection:
1698 selection:
1699 type: b
1700 condition: selection
1701---
1702title: A then B
1703id: corr-ab
1704correlation:
1705 type: temporal_ordered
1706 rules:
1707 - rule-a
1708 - rule-b
1709 group-by:
1710 - User
1711 timespan: 60s
1712 condition:
1713 gte: 2
1714level: high
1715"#;
1716 let collection = parse_sigma_yaml(yaml).unwrap();
1717 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1718 engine.add_collection(&collection).unwrap();
1719
1720 let ts = 1000i64;
1721 let v1 = json!({"type": "b", "User": "admin"});
1723 let ev1 = Event::from_value(&v1);
1724 engine.process_event_at(&ev1, ts);
1725
1726 let v2 = json!({"type": "a", "User": "admin"});
1727 let ev2 = Event::from_value(&v2);
1728 let r2 = engine.process_event_at(&ev2, ts + 10);
1729 assert!(r2.correlations.is_empty());
1730 }
1731
1732 #[test]
1737 fn test_value_sum() {
1738 let yaml = r#"
1739title: Web Access
1740id: web-001
1741logsource:
1742 category: web
1743detection:
1744 selection:
1745 action: upload
1746 condition: selection
1747---
1748title: Large Upload
1749id: corr-sum
1750correlation:
1751 type: value_sum
1752 rules:
1753 - web-001
1754 group-by:
1755 - User
1756 timespan: 60s
1757 condition:
1758 field: bytes_sent
1759 gt: 1000
1760level: high
1761"#;
1762 let collection = parse_sigma_yaml(yaml).unwrap();
1763 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1764 engine.add_collection(&collection).unwrap();
1765
1766 let ts = 1000i64;
1767 let v1 = json!({"action": "upload", "User": "alice", "bytes_sent": 600});
1768 let ev1 = Event::from_value(&v1);
1769 let r1 = engine.process_event_at(&ev1, ts);
1770 assert!(r1.correlations.is_empty());
1771
1772 let v2 = json!({"action": "upload", "User": "alice", "bytes_sent": 500});
1773 let ev2 = Event::from_value(&v2);
1774 let r2 = engine.process_event_at(&ev2, ts + 5);
1775 assert_eq!(r2.correlations.len(), 1);
1776 assert!((r2.correlations[0].aggregated_value - 1100.0).abs() < f64::EPSILON);
1777 }
1778
1779 #[test]
1780 fn test_value_avg() {
1781 let yaml = r#"
1782title: Request
1783id: req-001
1784logsource:
1785 category: web
1786detection:
1787 selection:
1788 type: request
1789 condition: selection
1790---
1791title: High Avg Latency
1792id: corr-avg
1793correlation:
1794 type: value_avg
1795 rules:
1796 - req-001
1797 group-by:
1798 - Service
1799 timespan: 60s
1800 condition:
1801 field: latency_ms
1802 gt: 500
1803level: medium
1804"#;
1805 let collection = parse_sigma_yaml(yaml).unwrap();
1806 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1807 engine.add_collection(&collection).unwrap();
1808
1809 let ts = 1000i64;
1810 for (i, latency) in [400, 600, 800].iter().enumerate() {
1812 let v = json!({"type": "request", "Service": "api", "latency_ms": latency});
1813 let event = Event::from_value(&v);
1814 let r = engine.process_event_at(&event, ts + i as i64);
1815 if i == 2 {
1816 assert_eq!(r.correlations.len(), 1);
1817 assert!((r.correlations[0].aggregated_value - 600.0).abs() < f64::EPSILON);
1818 }
1819 }
1820 }
1821
1822 #[test]
1827 fn test_state_count() {
1828 let yaml = r#"
1829title: Base
1830id: base-sc
1831logsource:
1832 category: test
1833detection:
1834 selection:
1835 action: test
1836 condition: selection
1837---
1838title: Count
1839id: corr-sc
1840correlation:
1841 type: event_count
1842 rules:
1843 - base-sc
1844 group-by:
1845 - User
1846 timespan: 60s
1847 condition:
1848 gte: 100
1849level: low
1850"#;
1851 let collection = parse_sigma_yaml(yaml).unwrap();
1852 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1853 engine.add_collection(&collection).unwrap();
1854
1855 let v = json!({"action": "test", "User": "alice"});
1856 let event = Event::from_value(&v);
1857 engine.process_event_at(&event, 1000);
1858 assert_eq!(engine.state_count(), 1);
1859
1860 let v2 = json!({"action": "test", "User": "bob"});
1861 let event2 = Event::from_value(&v2);
1862 engine.process_event_at(&event2, 1001);
1863 assert_eq!(engine.state_count(), 2);
1864
1865 engine.evict_expired(2000);
1867 assert_eq!(engine.state_count(), 0);
1868 }
1869
1870 #[test]
1875 fn test_generate_flag_default_false() {
1876 let yaml = r#"
1877title: Base
1878id: gen-base
1879logsource:
1880 category: test
1881detection:
1882 selection:
1883 action: test
1884 condition: selection
1885---
1886title: Correlation
1887id: gen-corr
1888correlation:
1889 type: event_count
1890 rules:
1891 - gen-base
1892 group-by:
1893 - User
1894 timespan: 60s
1895 condition:
1896 gte: 1
1897level: high
1898"#;
1899 let collection = parse_sigma_yaml(yaml).unwrap();
1900 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1901 engine.add_collection(&collection).unwrap();
1902
1903 let v = json!({"action": "test", "User": "alice"});
1906 let event = Event::from_value(&v);
1907 let r = engine.process_event_at(&event, 1000);
1908 assert_eq!(r.detections.len(), 1);
1909 assert_eq!(r.correlations.len(), 1);
1910 }
1911
1912 #[test]
1917 fn test_aws_bucket_enumeration() {
1918 let yaml = r#"
1919title: Potential Bucket Enumeration on AWS
1920id: f305fd62-beca-47da-ad95-7690a0620084
1921logsource:
1922 product: aws
1923 service: cloudtrail
1924detection:
1925 selection:
1926 eventSource: "s3.amazonaws.com"
1927 eventName: "ListBuckets"
1928 condition: selection
1929level: low
1930---
1931title: Multiple AWS bucket enumerations
1932id: be246094-01d3-4bba-88de-69e582eba0cc
1933status: experimental
1934correlation:
1935 type: event_count
1936 rules:
1937 - f305fd62-beca-47da-ad95-7690a0620084
1938 group-by:
1939 - userIdentity.arn
1940 timespan: 1h
1941 condition:
1942 gte: 5
1943level: high
1944"#;
1945 let collection = parse_sigma_yaml(yaml).unwrap();
1946 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1947 engine.add_collection(&collection).unwrap();
1948
1949 let base_ts = 1_700_000_000i64;
1950 for i in 0..5 {
1951 let v = json!({
1952 "eventSource": "s3.amazonaws.com",
1953 "eventName": "ListBuckets",
1954 "userIdentity.arn": "arn:aws:iam::123456789:user/attacker"
1955 });
1956 let event = Event::from_value(&v);
1957 let r = engine.process_event_at(&event, base_ts + i * 60);
1958 if i == 4 {
1959 assert_eq!(r.correlations.len(), 1);
1960 assert_eq!(
1961 r.correlations[0].rule_title,
1962 "Multiple AWS bucket enumerations"
1963 );
1964 assert_eq!(r.correlations[0].aggregated_value, 5.0);
1965 }
1966 }
1967 }
1968
1969 #[test]
1974 fn test_chaining_event_count_to_temporal() {
1975 let yaml = r#"
1978title: Single failed login
1979id: failed-login-chain
1980name: failed_login
1981logsource:
1982 category: auth
1983detection:
1984 selection:
1985 EventType: failed_login
1986 condition: selection
1987---
1988title: Successful login
1989id: success-login-chain
1990name: successful_login
1991logsource:
1992 category: auth
1993detection:
1994 selection:
1995 EventType: success_login
1996 condition: selection
1997---
1998title: Multiple failed logins
1999id: many-failed-chain
2000name: multiple_failed_login
2001correlation:
2002 type: event_count
2003 rules:
2004 - failed-login-chain
2005 group-by:
2006 - User
2007 timespan: 60s
2008 condition:
2009 gte: 3
2010level: medium
2011---
2012title: Brute Force Followed by Login
2013id: brute-force-chain
2014correlation:
2015 type: temporal_ordered
2016 rules:
2017 - many-failed-chain
2018 - success-login-chain
2019 group-by:
2020 - User
2021 timespan: 120s
2022 condition:
2023 gte: 2
2024level: critical
2025"#;
2026 let collection = parse_sigma_yaml(yaml).unwrap();
2027 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2028 engine.add_collection(&collection).unwrap();
2029
2030 assert_eq!(engine.detection_rule_count(), 2);
2031 assert_eq!(engine.correlation_rule_count(), 2);
2032
2033 let ts = 1000i64;
2034
2035 for i in 0..3 {
2037 let v = json!({"EventType": "failed_login", "User": "victim"});
2038 let event = Event::from_value(&v);
2039 let r = engine.process_event_at(&event, ts + i);
2040 if i == 2 {
2041 assert!(
2043 r.correlations
2044 .iter()
2045 .any(|c| c.rule_title == "Multiple failed logins"),
2046 "Expected event_count correlation to fire"
2047 );
2048 }
2049 }
2050
2051 let v = json!({"EventType": "success_login", "User": "victim"});
2058 let event = Event::from_value(&v);
2059 let r = engine.process_event_at(&event, ts + 30);
2060
2061 assert_eq!(r.detections.len(), 1);
2063 assert_eq!(r.detections[0].rule_title, "Successful login");
2064 }
2065
2066 #[test]
2071 fn test_field_aliases() {
2072 let yaml = r#"
2073title: Internal Error
2074id: internal-error-001
2075name: internal_error
2076logsource:
2077 category: web
2078detection:
2079 selection:
2080 http.response.status_code: 500
2081 condition: selection
2082---
2083title: New Connection
2084id: new-conn-001
2085name: new_network_connection
2086logsource:
2087 category: network
2088detection:
2089 selection:
2090 event.type: connection
2091 condition: selection
2092---
2093title: Error Then Connection
2094id: corr-alias
2095correlation:
2096 type: temporal
2097 rules:
2098 - internal-error-001
2099 - new-conn-001
2100 group-by:
2101 - internal_ip
2102 timespan: 60s
2103 condition:
2104 gte: 2
2105 aliases:
2106 internal_ip:
2107 internal_error: destination.ip
2108 new_network_connection: source.ip
2109level: high
2110"#;
2111 let collection = parse_sigma_yaml(yaml).unwrap();
2112 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2113 engine.add_collection(&collection).unwrap();
2114
2115 let ts = 1000i64;
2116
2117 let v1 = json!({
2119 "http.response.status_code": 500,
2120 "destination.ip": "10.0.0.5"
2121 });
2122 let ev1 = Event::from_value(&v1);
2123 let r1 = engine.process_event_at(&ev1, ts);
2124 assert_eq!(r1.detections.len(), 1);
2125 assert!(r1.correlations.is_empty());
2126
2127 let v2 = json!({
2129 "event.type": "connection",
2130 "source.ip": "10.0.0.5"
2131 });
2132 let ev2 = Event::from_value(&v2);
2133 let r2 = engine.process_event_at(&ev2, ts + 5);
2134 assert_eq!(r2.detections.len(), 1);
2135 assert_eq!(r2.correlations.len(), 1);
2137 assert_eq!(r2.correlations[0].rule_title, "Error Then Connection");
2138 assert!(
2140 r2.correlations[0]
2141 .group_key
2142 .iter()
2143 .any(|(k, v)| k == "internal_ip" && v == "10.0.0.5")
2144 );
2145 }
2146
2147 #[test]
2152 fn test_value_percentile() {
2153 let yaml = r#"
2154title: Process Creation
2155id: proc-001
2156logsource:
2157 category: process
2158detection:
2159 selection:
2160 type: process_creation
2161 condition: selection
2162---
2163title: Rare Process
2164id: corr-percentile
2165correlation:
2166 type: value_percentile
2167 rules:
2168 - proc-001
2169 group-by:
2170 - ComputerName
2171 timespan: 60s
2172 condition:
2173 field: image
2174 lte: 50
2175level: medium
2176"#;
2177 let collection = parse_sigma_yaml(yaml).unwrap();
2178 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2179 engine.add_collection(&collection).unwrap();
2180
2181 let ts = 1000i64;
2182 for (i, val) in [10.0, 20.0, 30.0, 40.0, 50.0].iter().enumerate() {
2184 let v = json!({"type": "process_creation", "ComputerName": "srv01", "image": val});
2185 let event = Event::from_value(&v);
2186 let _ = engine.process_event_at(&event, ts + i as i64);
2187 }
2188 }
2191
2192 #[test]
2197 fn test_extended_temporal_and_condition() {
2198 let yaml = r#"
2200title: Login Attempt
2201id: login-attempt
2202logsource:
2203 category: auth
2204detection:
2205 selection:
2206 EventType: login_failure
2207 condition: selection
2208---
2209title: Password Change
2210id: password-change
2211logsource:
2212 category: auth
2213detection:
2214 selection:
2215 EventType: password_change
2216 condition: selection
2217---
2218title: Credential Attack
2219correlation:
2220 type: temporal
2221 rules:
2222 - login-attempt
2223 - password-change
2224 group-by:
2225 - User
2226 timespan: 300s
2227 condition: login-attempt and password-change
2228level: high
2229"#;
2230 let collection = parse_sigma_yaml(yaml).unwrap();
2231 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2232 engine.add_collection(&collection).unwrap();
2233
2234 let ts = 1000i64;
2235
2236 let ev1 = json!({"EventType": "login_failure", "User": "alice"});
2238 let r1 = engine.process_event_at(&Event::from_value(&ev1), ts);
2239 assert!(r1.correlations.is_empty(), "only one rule fired so far");
2240
2241 let ev2 = json!({"EventType": "password_change", "User": "alice"});
2243 let r2 = engine.process_event_at(&Event::from_value(&ev2), ts + 10);
2244 assert_eq!(
2245 r2.correlations.len(),
2246 1,
2247 "temporal correlation should fire: both rules matched"
2248 );
2249 assert_eq!(r2.correlations[0].rule_title, "Credential Attack");
2250 }
2251
2252 #[test]
2253 fn test_extended_temporal_or_condition() {
2254 let yaml = r#"
2256title: SSH Login
2257id: ssh-login
2258logsource:
2259 category: auth
2260detection:
2261 selection:
2262 EventType: ssh_login
2263 condition: selection
2264---
2265title: VPN Login
2266id: vpn-login
2267logsource:
2268 category: auth
2269detection:
2270 selection:
2271 EventType: vpn_login
2272 condition: selection
2273---
2274title: Any Remote Access
2275correlation:
2276 type: temporal
2277 rules:
2278 - ssh-login
2279 - vpn-login
2280 group-by:
2281 - User
2282 timespan: 60s
2283 condition: ssh-login or vpn-login
2284level: medium
2285"#;
2286 let collection = parse_sigma_yaml(yaml).unwrap();
2287 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2288 engine.add_collection(&collection).unwrap();
2289
2290 let ev = json!({"EventType": "ssh_login", "User": "bob"});
2292 let r = engine.process_event_at(&Event::from_value(&ev), 1000);
2293 assert_eq!(r.correlations.len(), 1);
2294 assert_eq!(r.correlations[0].rule_title, "Any Remote Access");
2295 }
2296
2297 #[test]
2298 fn test_extended_temporal_partial_and_no_fire() {
2299 let yaml = r#"
2301title: Recon Step 1
2302id: recon-1
2303logsource:
2304 category: process
2305detection:
2306 selection:
2307 CommandLine|contains: 'whoami'
2308 condition: selection
2309---
2310title: Recon Step 2
2311id: recon-2
2312logsource:
2313 category: process
2314detection:
2315 selection:
2316 CommandLine|contains: 'ipconfig'
2317 condition: selection
2318---
2319title: Full Recon
2320correlation:
2321 type: temporal
2322 rules:
2323 - recon-1
2324 - recon-2
2325 group-by:
2326 - Host
2327 timespan: 120s
2328 condition: recon-1 and recon-2
2329level: high
2330"#;
2331 let collection = parse_sigma_yaml(yaml).unwrap();
2332 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2333 engine.add_collection(&collection).unwrap();
2334
2335 let ev = json!({"CommandLine": "whoami", "Host": "srv01"});
2337 let r = engine.process_event_at(&Event::from_value(&ev), 1000);
2338 assert!(r.correlations.is_empty(), "only one of two AND rules fired");
2339
2340 let ev2 = json!({"CommandLine": "ipconfig /all", "Host": "srv01"});
2342 let r2 = engine.process_event_at(&Event::from_value(&ev2), 1010);
2343 assert_eq!(r2.correlations.len(), 1);
2344 assert_eq!(r2.correlations[0].rule_title, "Full Recon");
2345 }
2346
2347 #[test]
2352 fn test_filter_with_correlation() {
2353 let yaml = r#"
2355title: Failed Auth
2356id: failed-auth
2357logsource:
2358 category: auth
2359detection:
2360 selection:
2361 EventType: auth_failure
2362 condition: selection
2363---
2364title: Exclude Service Accounts
2365filter:
2366 rules:
2367 - failed-auth
2368 selection:
2369 User|startswith: 'svc_'
2370 condition: selection
2371---
2372title: Brute Force
2373correlation:
2374 type: event_count
2375 rules:
2376 - failed-auth
2377 group-by:
2378 - User
2379 timespan: 300s
2380 condition:
2381 gte: 3
2382level: critical
2383"#;
2384 let collection = parse_sigma_yaml(yaml).unwrap();
2385 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2386 engine.add_collection(&collection).unwrap();
2387
2388 let ts = 1000i64;
2389
2390 for i in 0..5 {
2392 let ev = json!({"EventType": "auth_failure", "User": "svc_backup"});
2393 let r = engine.process_event_at(&Event::from_value(&ev), ts + i);
2394 assert!(
2395 r.correlations.is_empty(),
2396 "service account should be filtered, no correlation"
2397 );
2398 }
2399
2400 for i in 0..2 {
2402 let ev = json!({"EventType": "auth_failure", "User": "alice"});
2403 let r = engine.process_event_at(&Event::from_value(&ev), ts + 10 + i);
2404 assert!(r.correlations.is_empty(), "not yet 3 events");
2405 }
2406
2407 let ev = json!({"EventType": "auth_failure", "User": "alice"});
2409 let r = engine.process_event_at(&Event::from_value(&ev), ts + 12);
2410 assert_eq!(r.correlations.len(), 1);
2411 assert_eq!(r.correlations[0].rule_title, "Brute Force");
2412 }
2413
2414 #[test]
2419 fn test_repeat_rules_in_correlation() {
2420 let yaml = r#"
2422title: File Access A
2423id: file-a
2424logsource:
2425 category: file_access
2426detection:
2427 selection:
2428 FileName|endswith: '.docx'
2429 condition: selection
2430---
2431action: repeat
2432title: File Access B
2433id: file-b
2434detection:
2435 selection:
2436 FileName|endswith: '.xlsx'
2437 condition: selection
2438---
2439title: Mass File Access
2440correlation:
2441 type: event_count
2442 rules:
2443 - file-a
2444 - file-b
2445 group-by:
2446 - User
2447 timespan: 60s
2448 condition:
2449 gte: 3
2450level: high
2451"#;
2452 let collection = parse_sigma_yaml(yaml).unwrap();
2453 assert_eq!(collection.rules.len(), 2);
2454 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2455 engine.add_collection(&collection).unwrap();
2456 assert_eq!(engine.detection_rule_count(), 2);
2457
2458 let ts = 1000i64;
2459 let ev1 = json!({"FileName": "report.docx", "User": "bob"});
2461 engine.process_event_at(&Event::from_value(&ev1), ts);
2462 let ev2 = json!({"FileName": "data.xlsx", "User": "bob"});
2463 engine.process_event_at(&Event::from_value(&ev2), ts + 1);
2464 let ev3 = json!({"FileName": "notes.docx", "User": "bob"});
2465 let r = engine.process_event_at(&Event::from_value(&ev3), ts + 2);
2466
2467 assert_eq!(r.correlations.len(), 1);
2468 assert_eq!(r.correlations[0].rule_title, "Mass File Access");
2469 }
2470
2471 #[test]
2476 fn test_expand_modifier_with_correlation() {
2477 let yaml = r#"
2478title: User Temp File
2479id: user-temp
2480logsource:
2481 category: file_access
2482detection:
2483 selection:
2484 FilePath|expand: 'C:\Users\%User%\Temp'
2485 condition: selection
2486---
2487title: Excessive Temp Access
2488correlation:
2489 type: event_count
2490 rules:
2491 - user-temp
2492 group-by:
2493 - User
2494 timespan: 60s
2495 condition:
2496 gte: 2
2497level: medium
2498"#;
2499 let collection = parse_sigma_yaml(yaml).unwrap();
2500 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2501 engine.add_collection(&collection).unwrap();
2502
2503 let ts = 1000i64;
2504 let ev1 = json!({"FilePath": "C:\\Users\\alice\\Temp", "User": "alice"});
2506 let r1 = engine.process_event_at(&Event::from_value(&ev1), ts);
2507 assert!(r1.correlations.is_empty());
2508
2509 let ev2 = json!({"FilePath": "C:\\Users\\alice\\Temp", "User": "alice"});
2510 let r2 = engine.process_event_at(&Event::from_value(&ev2), ts + 1);
2511 assert_eq!(r2.correlations.len(), 1);
2512 assert_eq!(r2.correlations[0].rule_title, "Excessive Temp Access");
2513
2514 let ev3 = json!({"FilePath": "C:\\Users\\alice\\Temp", "User": "bob"});
2516 let r3 = engine.process_event_at(&Event::from_value(&ev3), ts + 2);
2517 assert_eq!(r3.detections.len(), 0);
2519 }
2520
2521 #[test]
2526 fn test_timestamp_modifier_with_correlation() {
2527 let yaml = r#"
2528title: Night Login
2529id: night-login
2530logsource:
2531 category: auth
2532detection:
2533 login:
2534 EventType: login
2535 night:
2536 Timestamp|hour: 3
2537 condition: login and night
2538---
2539title: Frequent Night Logins
2540correlation:
2541 type: event_count
2542 rules:
2543 - night-login
2544 group-by:
2545 - User
2546 timespan: 3600s
2547 condition:
2548 gte: 2
2549level: high
2550"#;
2551 let collection = parse_sigma_yaml(yaml).unwrap();
2552 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2553 engine.add_collection(&collection).unwrap();
2554
2555 let ts = 1000i64;
2556 let ev1 =
2558 json!({"EventType": "login", "User": "alice", "Timestamp": "2024-01-15T03:10:00Z"});
2559 let r1 = engine.process_event_at(&Event::from_value(&ev1), ts);
2560 assert_eq!(r1.detections.len(), 1);
2561 assert!(r1.correlations.is_empty());
2562
2563 let ev2 =
2564 json!({"EventType": "login", "User": "alice", "Timestamp": "2024-01-15T03:45:00Z"});
2565 let r2 = engine.process_event_at(&Event::from_value(&ev2), ts + 1);
2566 assert_eq!(r2.correlations.len(), 1);
2567 assert_eq!(r2.correlations[0].rule_title, "Frequent Night Logins");
2568
2569 let ev3 = json!({"EventType": "login", "User": "bob", "Timestamp": "2024-01-15T12:00:00Z"});
2571 let r3 = engine.process_event_at(&Event::from_value(&ev3), ts + 2);
2572 assert!(
2573 r3.detections.is_empty(),
2574 "noon login should not match night rule"
2575 );
2576 }
2577
2578 #[test]
2583 fn test_event_count_range_condition() {
2584 let yaml = r#"
2585title: Login Attempt
2586id: login-attempt-001
2587name: login_attempt
2588logsource:
2589 product: windows
2590detection:
2591 selection:
2592 EventType: login
2593 condition: selection
2594level: low
2595---
2596title: Login Count Range
2597id: corr-range-001
2598correlation:
2599 type: event_count
2600 rules:
2601 - login-attempt-001
2602 group-by:
2603 - User
2604 timespan: 3600s
2605 condition:
2606 gt: 2
2607 lte: 5
2608level: high
2609"#;
2610 let collection = parse_sigma_yaml(yaml).unwrap();
2611 let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2612 engine.add_collection(&collection).unwrap();
2613
2614 let ts: i64 = 1_000_000;
2615
2616 for i in 0..2 {
2618 let ev = json!({"EventType": "login", "User": "alice"});
2619 let r = engine.process_event_at(&Event::from_value(&ev), ts + i);
2620 assert!(r.correlations.is_empty(), "2 events should not fire (gt:2)");
2621 }
2622
2623 let ev3 = json!({"EventType": "login", "User": "alice"});
2625 let r3 = engine.process_event_at(&Event::from_value(&ev3), ts + 3);
2626 assert_eq!(r3.correlations.len(), 1, "3 events: gt:2 AND lte:5");
2627
2628 for i in 4..=5 {
2630 let ev = json!({"EventType": "login", "User": "alice"});
2631 let r = engine.process_event_at(&Event::from_value(&ev), ts + i);
2632 assert_eq!(r.correlations.len(), 1, "{i} events still in range");
2633 }
2634
2635 let ev6 = json!({"EventType": "login", "User": "alice"});
2637 let r6 = engine.process_event_at(&Event::from_value(&ev6), ts + 6);
2638 assert!(
2639 r6.correlations.is_empty(),
2640 "6 events exceeds lte:5, should not fire"
2641 );
2642 }
2643
2644 fn suppression_yaml() -> &'static str {
2649 r#"
2650title: Login
2651id: login-base
2652logsource:
2653 category: auth
2654detection:
2655 selection:
2656 EventType: login
2657 condition: selection
2658---
2659title: Many Logins
2660correlation:
2661 type: event_count
2662 rules:
2663 - login-base
2664 group-by:
2665 - User
2666 timeframe: 60s
2667 condition:
2668 gte: 3
2669level: high
2670"#
2671 }
2672
2673 #[test]
2674 fn test_suppression_window() {
2675 let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
2676 let config = CorrelationConfig {
2677 suppress: Some(10), ..Default::default()
2679 };
2680 let mut engine = CorrelationEngine::new(config);
2681 engine.add_collection(&collection).unwrap();
2682
2683 let ev = json!({"EventType": "login", "User": "alice"});
2684 let ts = 1000;
2685
2686 engine.process_event_at(&Event::from_value(&ev), ts);
2688 engine.process_event_at(&Event::from_value(&ev), ts + 1);
2689 let r3 = engine.process_event_at(&Event::from_value(&ev), ts + 2);
2690 assert_eq!(r3.correlations.len(), 1, "should fire on 3rd event");
2691
2692 let r4 = engine.process_event_at(&Event::from_value(&ev), ts + 3);
2694 assert!(
2695 r4.correlations.is_empty(),
2696 "should be suppressed within 10s window"
2697 );
2698
2699 let r5 = engine.process_event_at(&Event::from_value(&ev), ts + 9);
2701 assert!(
2702 r5.correlations.is_empty(),
2703 "should be suppressed at ts+9 (< ts+2+10)"
2704 );
2705
2706 let r6 = engine.process_event_at(&Event::from_value(&ev), ts + 13);
2708 assert_eq!(
2709 r6.correlations.len(),
2710 1,
2711 "should fire again after suppress window expires"
2712 );
2713 }
2714
2715 #[test]
2716 fn test_suppression_per_group_key() {
2717 let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
2718 let config = CorrelationConfig {
2719 suppress: Some(60),
2720 ..Default::default()
2721 };
2722 let mut engine = CorrelationEngine::new(config);
2723 engine.add_collection(&collection).unwrap();
2724
2725 let ts = 1000;
2726
2727 let ev_a = json!({"EventType": "login", "User": "alice"});
2729 engine.process_event_at(&Event::from_value(&ev_a), ts);
2730 engine.process_event_at(&Event::from_value(&ev_a), ts + 1);
2731 let r = engine.process_event_at(&Event::from_value(&ev_a), ts + 2);
2732 assert_eq!(r.correlations.len(), 1, "alice should fire");
2733
2734 let ev_b = json!({"EventType": "login", "User": "bob"});
2736 engine.process_event_at(&Event::from_value(&ev_b), ts + 3);
2737 engine.process_event_at(&Event::from_value(&ev_b), ts + 4);
2738 let r = engine.process_event_at(&Event::from_value(&ev_b), ts + 5);
2739 assert_eq!(r.correlations.len(), 1, "bob should fire independently");
2740
2741 let r = engine.process_event_at(&Event::from_value(&ev_a), ts + 6);
2743 assert!(r.correlations.is_empty(), "alice still suppressed");
2744 }
2745
/// Exercises `CorrelationAction::Reset`: after the correlation fires on the
/// third event, the next two events stay silent and the third one after the
/// fire triggers it again.
#[test]
fn test_action_reset() {
    let rules = parse_sigma_yaml(suppression_yaml()).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig {
        action_on_match: CorrelationAction::Reset,
        ..Default::default()
    });
    engine.add_collection(&rules).unwrap();

    let login = json!({"EventType": "login", "User": "alice"});
    let base: i64 = 1000;
    // Replays the same login event `offset` seconds after `base`.
    let mut feed =
        |offset: i64| engine.process_event_at(&Event::from_value(&login), base + offset);

    feed(0);
    feed(1);
    let third = feed(2);
    assert_eq!(third.correlations.len(), 1, "should fire on 3rd event");

    let fourth = feed(3);
    assert!(fourth.correlations.is_empty(), "reset: need 3 more events");

    let fifth = feed(4);
    assert!(fifth.correlations.is_empty(), "reset: still only 2");

    let sixth = feed(5);
    assert_eq!(
        sixth.correlations.len(),
        1,
        "should fire again after 3 events post-reset"
    );
}
2784
/// With the default configuration a matching event is reported in
/// `detections`.
#[test]
fn test_emit_detections_true_by_default() {
    let rules = parse_sigma_yaml(suppression_yaml()).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&rules).unwrap();

    let login = json!({"EventType": "login", "User": "alice"});
    let event = Event::from_value(&login);
    let outcome = engine.process_event_at(&event, 1000);

    let emitted = outcome.detections.len();
    assert_eq!(emitted, 1, "by default detections are emitted");
}
2799
/// Setting `emit_detections: false` removes detection matches from the
/// result of processing a matching event.
#[test]
fn test_emit_detections_false_suppresses() {
    let rules = parse_sigma_yaml(suppression_yaml()).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig {
        emit_detections: false,
        ..Default::default()
    });
    engine.add_collection(&rules).unwrap();

    let login = json!({"EventType": "login", "User": "alice"});
    let event = Event::from_value(&login);
    let outcome = engine.process_event_at(&event, 1000);

    assert!(
        outcome.detections.is_empty(),
        "detection matches should be suppressed when emit_detections=false"
    );
}
2817
/// A correlation declaring `generate: true` keeps the detection output of
/// the rule it references even though `emit_detections` is switched off.
#[test]
fn test_generate_true_keeps_detections() {
    let rules_yaml = r#"
title: Login
id: login-gen
logsource:
    category: auth
detection:
    selection:
        EventType: login
    condition: selection
---
title: Many Logins
correlation:
    type: event_count
    rules:
        - login-gen
    group-by:
        - User
    timeframe: 60s
    condition:
        gte: 3
    generate: true
level: high
"#;
    let rules = parse_sigma_yaml(rules_yaml).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig {
        emit_detections: false,
        ..Default::default()
    });
    engine.add_collection(&rules).unwrap();

    let login = json!({"EventType": "login", "User": "alice"});
    let event = Event::from_value(&login);
    let outcome = engine.process_event_at(&event, 1000);

    assert_eq!(
        outcome.detections.len(),
        1,
        "generate:true keeps detection output"
    );
}
2861
/// Combines `suppress: Some(5)` with `CorrelationAction::Reset` and walks
/// through fire -> suppressed -> fire after expiry -> suppressed again.
#[test]
fn test_suppress_and_reset_combined() {
    let rules = parse_sigma_yaml(suppression_yaml()).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig {
        suppress: Some(5),
        action_on_match: CorrelationAction::Reset,
        ..Default::default()
    });
    engine.add_collection(&rules).unwrap();

    let login = json!({"EventType": "login", "User": "alice"});
    let base: i64 = 1000;
    // Replays the same login event `offset` seconds after `base`.
    let mut feed =
        |offset: i64| engine.process_event_at(&Event::from_value(&login), base + offset);

    // First burst: fires on the third event.
    feed(0);
    feed(1);
    let first_fire = feed(2);
    assert_eq!(first_fire.correlations.len(), 1, "fires on 3rd event");

    // Second burst lands inside the suppression window.
    feed(3);
    feed(4);
    let suppressed = feed(5);
    assert!(
        suppressed.correlations.is_empty(),
        "threshold met again but still suppressed"
    );

    // base + 8 is more than 5 seconds after the first fire at base + 2.
    let second_fire = feed(8);
    assert_eq!(
        second_fire.correlations.len(),
        1,
        "fires after suppress expires (accumulated events + new one)"
    );

    // Third burst: only 3 seconds after the second fire.
    feed(9);
    feed(10);
    let suppressed_again = feed(11);
    assert!(
        suppressed_again.correlations.is_empty(),
        "threshold met but suppress window hasn't expired (ts+11 - ts+8 = 3 < 5)"
    );
}
2916
/// With the default configuration the correlation keeps firing on every
/// event once it has fired on the third one.
#[test]
fn test_no_suppression_fires_every_event() {
    let rules = parse_sigma_yaml(suppression_yaml()).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&rules).unwrap();

    let login = json!({"EventType": "login", "User": "alice"});
    let base: i64 = 1000;
    // Replays the same login event `offset` seconds after `base`.
    let mut feed =
        |offset: i64| engine.process_event_at(&Event::from_value(&login), base + offset);

    feed(0);
    feed(1);
    let third = feed(2);
    assert_eq!(third.correlations.len(), 1);

    let fourth = feed(3);
    assert_eq!(
        fourth.correlations.len(),
        1,
        "no suppression: fires on every event after threshold"
    );

    let fifth = feed(4);
    assert_eq!(fifth.correlations.len(), 1, "still fires");
}
2946
/// `rsigma.timestamp_field` ends up at the front of
/// `config.timestamp_fields`, and `@timestamp` is still present afterwards.
#[test]
fn test_custom_attr_timestamp_field() {
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    let attrs = std::collections::HashMap::from([(
        "rsigma.timestamp_field".to_string(),
        "time".to_string(),
    )]);
    engine.apply_custom_attributes(&attrs);

    let fields = &engine.config.timestamp_fields;
    assert_eq!(
        fields[0], "time",
        "rsigma.timestamp_field should be prepended"
    );
    assert!(fields.iter().any(|field| field == "@timestamp"));
}
2970
/// Applying the same `rsigma.timestamp_field` attribute twice leaves a
/// single `time` entry in `config.timestamp_fields`.
#[test]
fn test_custom_attr_timestamp_field_no_duplicates() {
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    let attrs = std::collections::HashMap::from([(
        "rsigma.timestamp_field".to_string(),
        "time".to_string(),
    )]);

    // Apply twice on purpose: the second call must not add another entry.
    for _ in 0..2 {
        engine.apply_custom_attributes(&attrs);
    }

    let occurrences = engine
        .config
        .timestamp_fields
        .iter()
        .filter(|field| field.as_str() == "time")
        .count();
    assert_eq!(
        occurrences, 1,
        "should not duplicate timestamp_field entries"
    );
}
2988
/// `rsigma.suppress: 5m` sets `config.suppress` to 300 when it was unset
/// before.
#[test]
fn test_custom_attr_suppress() {
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    assert!(engine.config.suppress.is_none());

    let attrs =
        std::collections::HashMap::from([("rsigma.suppress".to_string(), "5m".to_string())]);
    engine.apply_custom_attributes(&attrs);

    assert_eq!(engine.config.suppress, Some(300));
}
3000
/// A `suppress` value already present in the config wins over the
/// `rsigma.suppress` custom attribute.
#[test]
fn test_custom_attr_suppress_does_not_override_cli() {
    let mut engine = CorrelationEngine::new(CorrelationConfig {
        suppress: Some(60),
        ..Default::default()
    });

    let attrs =
        std::collections::HashMap::from([("rsigma.suppress".to_string(), "5m".to_string())]);
    engine.apply_custom_attributes(&attrs);

    let effective = engine.config.suppress;
    assert_eq!(
        effective,
        Some(60),
        "CLI suppress should not be overridden by custom attribute"
    );
}
3019
/// `rsigma.action: reset` switches `config.action_on_match` from the
/// default `Alert` to `Reset`.
#[test]
fn test_custom_attr_action() {
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    assert_eq!(engine.config.action_on_match, CorrelationAction::Alert);

    let attrs =
        std::collections::HashMap::from([("rsigma.action".to_string(), "reset".to_string())]);
    engine.apply_custom_attributes(&attrs);

    assert_eq!(engine.config.action_on_match, CorrelationAction::Reset);
}
3031
/// An `action_on_match` of `Reset` set in the config is kept when the
/// `rsigma.action` custom attribute asks for `alert`.
#[test]
fn test_custom_attr_action_does_not_override_cli() {
    let mut engine = CorrelationEngine::new(CorrelationConfig {
        action_on_match: CorrelationAction::Reset,
        ..Default::default()
    });

    let attrs =
        std::collections::HashMap::from([("rsigma.action".to_string(), "alert".to_string())]);
    engine.apply_custom_attributes(&attrs);

    let effective = engine.config.action_on_match;
    assert_eq!(
        effective,
        CorrelationAction::Reset,
        "CLI action should not be overridden by custom attribute"
    );
}
3050
/// A field placed first in `timestamp_fields` is used when extracting the
/// event timestamp.
///
/// The event carries its time only in `event_time`. Processing it must
/// yield the detection match and no correlation, and
/// `extract_event_timestamp` must return a value in the 2026 epoch range.
#[test]
fn test_custom_attr_timestamp_field_used_for_extraction() {
    let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
    let mut config = CorrelationConfig::default();
    config.timestamp_fields.insert(0, "event_time".to_string());
    let mut engine = CorrelationEngine::new(config);
    engine.add_collection(&collection).unwrap();

    let ev = json!({
        "EventType": "login",
        "User": "alice",
        "event_time": "2026-02-11T12:00:00Z"
    });
    let result = engine.process_event(&Event::from_value(&ev));

    // The previous check `!detections.is_empty() || correlations.is_empty()`
    // could never fail for a single event, because one event never fires the
    // correlation. Assert both halves independently instead.
    assert_eq!(
        result.detections.len(),
        1,
        "the login detection should match the event"
    );
    assert!(
        result.correlations.is_empty(),
        "a single event must not fire the correlation"
    );

    let ts = engine
        .extract_event_timestamp(&Event::from_value(&ev))
        .expect("should extract timestamp");
    assert!(
        ts > 1_700_000_000 && ts < 1_800_000_000,
        "timestamp should be ~2026 epoch, got {ts}"
    );
}
3082
/// Two correlations that reference each other (`corr-a` <-> `corr-b`) must
/// be rejected by `add_collection` with an error naming both and
/// mentioning a cycle.
#[test]
fn test_correlation_cycle_direct() {
    let rules_yaml = r#"
title: detection rule
id: det-rule
logsource:
    product: test
detection:
    selection:
        action: click
    condition: selection
level: low
---
title: correlation A
id: corr-a
correlation:
    type: event_count
    rules:
        - corr-b
    group-by:
        - User
    timespan: 5m
    condition:
        gte: 2
level: high
---
title: correlation B
id: corr-b
correlation:
    type: event_count
    rules:
        - corr-a
    group-by:
        - User
    timespan: 5m
    condition:
        gte: 2
level: high
"#;
    let rules = parse_sigma_yaml(rules_yaml).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());

    let err = match engine.add_collection(&rules) {
        Err(e) => e.to_string(),
        Ok(_) => panic!("should detect direct cycle"),
    };
    assert!(err.contains("cycle"), "error should mention cycle: {err}");

    let names_both = ["corr-a", "corr-b"].iter().all(|id| err.contains(*id));
    assert!(names_both, "error should name both correlations: {err}");
}
3138
/// A correlation that lists its own id in `rules` must be rejected with an
/// error that mentions a cycle and names the correlation.
#[test]
fn test_correlation_cycle_self() {
    let rules_yaml = r#"
title: detection rule
id: det-rule
logsource:
    product: test
detection:
    selection:
        action: click
    condition: selection
level: low
---
title: self-ref correlation
id: self-corr
correlation:
    type: event_count
    rules:
        - self-corr
    group-by:
        - User
    timespan: 5m
    condition:
        gte: 2
level: high
"#;
    let rules = parse_sigma_yaml(rules_yaml).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());

    let err = match engine.add_collection(&rules) {
        Err(e) => e.to_string(),
        Ok(_) => panic!("should detect self-referencing cycle"),
    };
    assert!(err.contains("cycle"), "error should mention cycle: {err}");

    let names_rule = err.contains("self-corr");
    assert!(names_rule, "error should name the correlation: {err}");
}
3177
/// A linear chain (`det-rule` <- `corr-a` <- `corr-b`) contains no cycle
/// and must be accepted by `add_collection`.
#[test]
fn test_correlation_no_cycle_valid_chain() {
    let rules_yaml = r#"
title: detection rule
id: det-rule
logsource:
    product: test
detection:
    selection:
        action: click
    condition: selection
level: low
---
title: correlation A
id: corr-a
correlation:
    type: event_count
    rules:
        - det-rule
    group-by:
        - User
    timespan: 5m
    condition:
        gte: 2
level: high
---
title: correlation B
id: corr-b
correlation:
    type: event_count
    rules:
        - corr-a
    group-by:
        - User
    timespan: 5m
    condition:
        gte: 2
level: high
"#;
    let rules = parse_sigma_yaml(rules_yaml).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());

    let outcome = engine.add_collection(&rules);
    assert!(
        outcome.is_ok(),
        "valid chain should not be rejected: {outcome:?}"
    );
}
3226
/// A three-node loop (`corr-a` -> `corr-c` -> `corr-b` -> `corr-a`) must be
/// rejected with an error that mentions a cycle.
#[test]
fn test_correlation_cycle_transitive() {
    let rules_yaml = r#"
title: detection rule
id: det-rule
logsource:
    product: test
detection:
    selection:
        action: click
    condition: selection
level: low
---
title: correlation A
id: corr-a
correlation:
    type: event_count
    rules:
        - corr-c
    group-by:
        - User
    timespan: 5m
    condition:
        gte: 2
level: high
---
title: correlation B
id: corr-b
correlation:
    type: event_count
    rules:
        - corr-a
    group-by:
        - User
    timespan: 5m
    condition:
        gte: 2
level: high
---
title: correlation C
id: corr-c
correlation:
    type: event_count
    rules:
        - corr-b
    group-by:
        - User
    timespan: 5m
    condition:
        gte: 2
level: high
"#;
    let rules = parse_sigma_yaml(rules_yaml).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());

    let err = match engine.add_collection(&rules) {
        Err(e) => e.to_string(),
        Ok(_) => panic!("should detect transitive cycle"),
    };
    assert!(err.contains("cycle"), "error should mention cycle: {err}");
}
3287
/// With the default configuration a fired correlation carries no `events`
/// payload.
#[test]
fn test_correlation_events_disabled_by_default() {
    let rules_yaml = r#"
title: Login
id: login-rule
logsource:
    category: auth
detection:
    selection:
        EventType: login
    condition: selection
---
title: Many Logins
correlation:
    type: event_count
    rules:
        - login-rule
    group-by:
        - User
    timespan: 60s
    condition:
        gte: 3
level: high
"#;
    let rules = parse_sigma_yaml(rules_yaml).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&rules).unwrap();

    // Sends one admin login stamped (in payload and engine clock) with `ts`.
    let mut send = |ts: i64| {
        let payload = json!({"EventType": "login", "User": "admin", "@timestamp": ts});
        engine.process_event_at(&Event::from_value(&payload), ts)
    };

    send(1000);
    send(1001);
    let third = send(1002);

    assert_eq!(third.correlations.len(), 1);
    assert!(third.correlations[0].events.is_none());
}
3331
/// In `CorrelationEventMode::Full` the fired correlation carries the three
/// contributing events, in the order they were sent.
#[test]
fn test_correlation_events_included_when_enabled() {
    let rules_yaml = r#"
title: Login
id: login-rule
logsource:
    category: auth
detection:
    selection:
        EventType: login
    condition: selection
---
title: Many Logins
correlation:
    type: event_count
    rules:
        - login-rule
    group-by:
        - User
    timespan: 60s
    condition:
        gte: 3
level: high
"#;
    let rules = parse_sigma_yaml(rules_yaml).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig {
        correlation_event_mode: CorrelationEventMode::Full,
        max_correlation_events: 10,
        ..Default::default()
    });
    engine.add_collection(&rules).unwrap();

    // Keep the last result that contained a correlation.
    let mut fired = None;
    for ts in 1000_i64..1003 {
        let payload = json!({"EventType": "login", "User": "admin", "@timestamp": ts});
        let outcome = engine.process_event_at(&Event::from_value(&payload), ts);
        if !outcome.correlations.is_empty() {
            fired = Some(outcome);
        }
    }

    let outcome = fired.expect("correlation should have fired");
    let stored = outcome.correlations[0]
        .events
        .as_ref()
        .expect("events should be present");
    assert_eq!(
        stored.len(),
        3,
        "all 3 contributing events should be stored"
    );

    for (event, expected_ts) in stored.iter().zip(1000_i64..) {
        assert_eq!(event["EventType"], "login");
        assert_eq!(event["User"], "admin");
        assert_eq!(event["@timestamp"], expected_ts);
    }
}
3396
/// With `max_correlation_events: 3` and five contributing events, the
/// fired correlation reports only the events with `idx` 2, 3 and 4.
#[test]
fn test_correlation_events_max_cap() {
    let rules_yaml = r#"
title: Login
id: login-rule
logsource:
    category: auth
detection:
    selection:
        EventType: login
    condition: selection
---
title: Many Logins
correlation:
    type: event_count
    rules:
        - login-rule
    group-by:
        - User
    timespan: 60s
    condition:
        gte: 5
level: high
"#;
    let rules = parse_sigma_yaml(rules_yaml).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig {
        correlation_event_mode: CorrelationEventMode::Full,
        max_correlation_events: 3,
        ..Default::default()
    });
    engine.add_collection(&rules).unwrap();

    // Keep the last result that contained a correlation.
    let mut fired = None;
    for idx in 0_i64..5 {
        let payload = json!({"EventType": "login", "User": "admin", "idx": idx});
        let outcome = engine.process_event_at(&Event::from_value(&payload), 1000 + idx);
        if !outcome.correlations.is_empty() {
            fired = Some(outcome);
        }
    }

    let outcome = fired.expect("correlation should have fired");
    let stored = outcome.correlations[0]
        .events
        .as_ref()
        .expect("events should be present");

    assert_eq!(stored.len(), 3);
    // The retained events are expected to be idx 2, 3, 4 in that order.
    for (event, expected_idx) in stored.iter().zip(2..) {
        assert_eq!(event["idx"], expected_idx);
    }
}
3452
/// Full event mode combined with `CorrelationAction::Reset`: the second
/// fire must contain only events from the second round.
#[test]
fn test_correlation_events_with_reset_action() {
    let rules_yaml = r#"
title: Login
id: login-rule
logsource:
    category: auth
detection:
    selection:
        EventType: login
    condition: selection
---
title: Many Logins
correlation:
    type: event_count
    rules:
        - login-rule
    group-by:
        - User
    timespan: 60s
    condition:
        gte: 2
level: high
"#;
    let rules = parse_sigma_yaml(rules_yaml).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig {
        correlation_event_mode: CorrelationEventMode::Full,
        action_on_match: CorrelationAction::Reset,
        ..Default::default()
    });
    engine.add_collection(&rules).unwrap();

    // Sends an admin login tagged with its round and index at time `ts`.
    let mut send = |round: i64, idx: i64, ts: i64| {
        let payload =
            json!({"EventType": "login", "User": "admin", "round": round, "idx": idx});
        engine.process_event_at(&Event::from_value(&payload), ts)
    };

    // Round 1: the second event fires with both events attached.
    send(1, 0, 1000);
    let first_fire = send(1, 1, 1001);
    assert_eq!(first_fire.correlations.len(), 1);
    let round_one = first_fire.correlations[0].events.as_ref().unwrap();
    assert_eq!(round_one.len(), 2);

    // Round 2: one event alone is not enough after the reset.
    let quiet = send(2, 0, 1010);
    assert!(
        quiet.correlations.is_empty(),
        "should not fire with only 1 event after reset"
    );

    let second_fire = send(2, 1, 1011);
    assert_eq!(second_fire.correlations.len(), 1);
    let round_two = second_fire.correlations[0].events.as_ref().unwrap();
    assert_eq!(round_two.len(), 2);
    assert_eq!(round_two[0]["round"], 2);
    assert_eq!(round_two[1]["round"], 2);
}
3518
/// Enabling full event mode through `set_correlation_event_mode` after the
/// rules are loaded still yields events on the fired correlation.
#[test]
fn test_correlation_events_with_set_include() {
    let rules_yaml = r#"
title: Login
id: login-rule
logsource:
    category: auth
detection:
    selection:
        EventType: login
    condition: selection
---
title: Many Logins
correlation:
    type: event_count
    rules:
        - login-rule
    group-by:
        - User
    timespan: 60s
    condition:
        gte: 2
level: high
"#;
    let rules = parse_sigma_yaml(rules_yaml).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&rules).unwrap();

    engine.set_correlation_event_mode(CorrelationEventMode::Full);

    // Sends one admin login at engine time `ts`.
    let mut send = |ts: i64| {
        let payload = json!({"EventType": "login", "User": "admin"});
        engine.process_event_at(&Event::from_value(&payload), ts)
    };

    send(1000);
    let second = send(1001);

    assert_eq!(second.correlations.len(), 1);
    let attached = &second.correlations[0].events;
    assert!(attached.is_some());
    assert_eq!(attached.as_ref().unwrap().len(), 2);
}
3561
/// With a 10s timespan, events sent at 1000/1001 must not count towards a
/// correlation evaluated at 1015+, and must not appear in its `events`.
#[test]
fn test_correlation_events_eviction_syncs_with_window() {
    let rules_yaml = r#"
title: Login
id: login-rule
logsource:
    category: auth
detection:
    selection:
        EventType: login
    condition: selection
---
title: Many Logins
correlation:
    type: event_count
    rules:
        - login-rule
    group-by:
        - User
    timespan: 10s
    condition:
        gte: 3
level: high
"#;
    let rules = parse_sigma_yaml(rules_yaml).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig {
        correlation_event_mode: CorrelationEventMode::Full,
        max_correlation_events: 100,
        ..Default::default()
    });
    engine.add_collection(&rules).unwrap();

    // Sends an admin login carrying `idx` at engine time `ts`.
    let mut send = |idx: i64, ts: i64| {
        let payload = json!({"EventType": "login", "User": "admin", "idx": idx});
        engine.process_event_at(&Event::from_value(&payload), ts)
    };

    // Two early events.
    send(0, 1000);
    send(1, 1001);

    // Third event arrives more than 10s later than the first two.
    let late = send(2, 1015);
    assert!(
        late.correlations.is_empty(),
        "should not fire — old events evicted"
    );

    // Two more events close to the late one complete the threshold.
    send(3, 1016);
    let fired = send(4, 1017);
    assert_eq!(fired.correlations.len(), 1);

    let stored = fired.correlations[0].events.as_ref().unwrap();
    assert_eq!(stored.len(), 3);
    assert!(stored.iter().all(|ev| ev["idx"].as_i64().unwrap() >= 2));
}
3629
/// `event_buffer_count` and `event_buffer_bytes` start at zero and report
/// one buffer with a non-zero size after five events for the same user.
#[test]
fn test_event_buffer_monitoring() {
    let rules_yaml = r#"
title: Login
id: login-rule
logsource:
    category: auth
detection:
    selection:
        EventType: login
    condition: selection
---
title: Many Logins
correlation:
    type: event_count
    rules:
        - login-rule
    group-by:
        - User
    timespan: 60s
    condition:
        gte: 100
level: high
"#;
    let rules = parse_sigma_yaml(rules_yaml).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig {
        correlation_event_mode: CorrelationEventMode::Full,
        ..Default::default()
    });
    engine.add_collection(&rules).unwrap();

    assert_eq!(engine.event_buffer_count(), 0);
    assert_eq!(engine.event_buffer_bytes(), 0);

    for ts in 1000_i64..1005 {
        let payload = json!({"EventType": "login", "User": "admin"});
        engine.process_event_at(&Event::from_value(&payload), ts);
    }

    // All five events share the same `User`; presumably that maps to a
    // single buffer, which is what the count below asserts.
    assert_eq!(engine.event_buffer_count(), 1);
    assert!(engine.event_buffer_bytes() > 0);
}
3675
/// In `CorrelationEventMode::Refs` the fired correlation has no full
/// `events` but exposes `event_refs` carrying each event's timestamp and
/// `id`.
#[test]
fn test_correlation_refs_mode_basic() {
    let rules_yaml = r#"
title: Login
id: login-rule
logsource:
    category: auth
detection:
    selection:
        EventType: login
    condition: selection
---
title: Many Logins
correlation:
    type: event_count
    rules:
        - login-rule
    group-by:
        - User
    timespan: 60s
    condition:
        gte: 3
level: high
"#;
    let rules = parse_sigma_yaml(rules_yaml).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig {
        correlation_event_mode: CorrelationEventMode::Refs,
        max_correlation_events: 10,
        ..Default::default()
    });
    engine.add_collection(&rules).unwrap();

    // Keep a copy of the last correlation that fired.
    let mut fired = None;
    for (i, ts) in (1000_i64..1003).enumerate() {
        let payload = json!({"EventType": "login", "User": "admin", "id": format!("evt-{i}"), "@timestamp": ts});
        let outcome = engine.process_event_at(&Event::from_value(&payload), ts);
        if let Some(first) = outcome.correlations.first() {
            fired = Some(first.clone());
        }
    }

    let correlation = fired.expect("correlation should have fired");
    assert!(
        correlation.events.is_none(),
        "Full events should not be included in refs mode"
    );

    let refs = correlation
        .event_refs
        .expect("event_refs should be present in refs mode");
    assert_eq!(refs.len(), 3);
    assert_eq!(refs[0].timestamp, 1000);

    let ids: Vec<Option<&str>> = refs.iter().map(|r| r.id.as_deref()).collect();
    assert_eq!(ids, [Some("evt-0"), Some("evt-1"), Some("evt-2")]);
}
3734
/// In refs mode, events without an `id` field produce refs whose `id` is
/// `None`.
///
/// The number of refs is asserted first: without that check the per-ref
/// loop would pass vacuously if `event_refs` came back empty.
#[test]
fn test_correlation_refs_mode_no_id_field() {
    let yaml = r#"
title: Login
id: login-rule
logsource:
    category: auth
detection:
    selection:
        EventType: login
    condition: selection
---
title: Many Logins
correlation:
    type: event_count
    rules:
        - login-rule
    group-by:
        - User
    timespan: 60s
    condition:
        gte: 2
level: high
"#;
    let collection = parse_sigma_yaml(yaml).unwrap();
    let config = CorrelationConfig {
        correlation_event_mode: CorrelationEventMode::Refs,
        ..Default::default()
    };
    let mut engine = CorrelationEngine::new(config);
    engine.add_collection(&collection).unwrap();

    let mut corr_result = None;
    for i in 0..2 {
        let v = json!({"EventType": "login", "User": "admin"});
        let event = Event::from_value(&v);
        let result = engine.process_event_at(&event, 1000 + i);
        if !result.correlations.is_empty() {
            corr_result = Some(result.correlations[0].clone());
        }
    }

    let result = corr_result.expect("correlation should have fired");
    let refs = result.event_refs.expect("event_refs should be present");
    // Both contributing events must be referenced, otherwise the id check
    // below would not examine anything.
    assert_eq!(refs.len(), 2, "both contributing events should be referenced");
    for r in &refs {
        assert_eq!(r.id, None);
    }
}
3784
/// `custom_attributes` on the correlation rule itself
/// (`rsigma.correlation_event_mode: refs`) enable refs output even though
/// the engine is built with the default configuration.
#[test]
fn test_per_correlation_custom_attributes_from_yaml() {
    let rules_yaml = r#"
title: Login
id: login-rule
logsource:
    category: auth
detection:
    selection:
        EventType: login
    condition: selection
---
title: Many Logins
custom_attributes:
    rsigma.correlation_event_mode: refs
    rsigma.max_correlation_events: "5"
correlation:
    type: event_count
    rules:
        - login-rule
    group-by:
        - User
    timespan: 60s
    condition:
        gte: 3
level: high
"#;
    let rules = parse_sigma_yaml(rules_yaml).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&rules).unwrap();

    // Keep a copy of the last correlation that fired.
    let mut fired = None;
    for (i, ts) in (1000_i64..1003).enumerate() {
        let payload = json!({"EventType": "login", "User": "admin", "id": format!("e{i}")});
        let outcome = engine.process_event_at(&Event::from_value(&payload), ts);
        if let Some(first) = outcome.correlations.first() {
            fired = Some(first.clone());
        }
    }

    let correlation = fired.expect("correlation should fire with per-correlation refs mode");
    assert!(correlation.events.is_none());

    let refs = correlation
        .event_refs
        .expect("event_refs via per-correlation override");
    assert_eq!(refs.len(), 3);
    assert_eq!(refs[0].id, Some("e0".to_string()));
}
3837
/// `rsigma.suppress` and `rsigma.action` given as `custom_attributes` on a
/// correlation rule are stored on the compiled correlation as
/// `suppress_secs` and `action`.
#[test]
fn test_per_correlation_custom_attr_suppress_and_action() {
    let rules_yaml = r#"
title: Login
id: login-rule
logsource:
    category: auth
detection:
    selection:
        EventType: login
    condition: selection
---
title: Many Logins
custom_attributes:
    rsigma.suppress: 10s
    rsigma.action: reset
correlation:
    type: event_count
    rules:
        - login-rule
    group-by:
        - User
    timespan: 60s
    condition:
        gte: 2
level: high
"#;
    let rules = parse_sigma_yaml(rules_yaml).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&rules).unwrap();

    let compiled = &engine.correlations[0];
    assert_eq!(compiled.suppress_secs, Some(10));
    assert_eq!(compiled.action, Some(CorrelationAction::Reset));
}
3876}