1mod introspect;
16#[cfg(test)]
17mod tests;
18mod types;
19
20pub use introspect::{CorrelationInfo, CorrelationStateSnapshot, GroupKeyPart, GroupStateInfo};
21pub use types::*;
22
23use std::collections::HashMap;
24
25use chrono::{DateTime, TimeZone, Utc};
26
27use rsigma_parser::{CorrelationRule, CorrelationType, SigmaCollection, SigmaRule, WindowMode};
28
29use crate::correlation::{
30 CompiledCorrelation, EventBuffer, EventRefBuffer, GroupKey, WindowDecision, WindowState,
31 apply_window_open, compile_correlation,
32};
33use crate::engine::Engine;
34use crate::error::{EvalError, Result};
35use crate::event::{Event, EventValue};
36use crate::pipeline::{Pipeline, apply_pipelines, apply_pipelines_to_correlation};
37use crate::result::{CorrelationBody, EvaluationResult, ResultBody, RuleHeader};
38
/// Version tag written into every [`CorrelationSnapshot`] by
/// `CorrelationEngine::export_state`; `import_state` rejects any snapshot
/// carrying a different value.
const SNAPSHOT_VERSION: u32 = 1;
45
/// Stateful correlation engine layered on top of the detection [`Engine`].
///
/// Detection results are routed to compiled correlations. Windowed state,
/// alert history and buffered events are all keyed by
/// `(correlation index, group key)`.
pub struct CorrelationEngine {
    /// Detection engine that evaluates the individual rules.
    engine: Engine,
    /// Compiled correlations; a correlation's position is the index used in every state key.
    correlations: Vec<CompiledCorrelation>,
    /// Rule reference (id or name) -> indices of the correlations that list it.
    rule_index: HashMap<String, Vec<usize>>,
    /// `(id, name)` of every detection rule that was added, in insertion order.
    rule_ids: Vec<(Option<String>, Option<String>)>,
    /// Window state per `(correlation index, group key)`.
    state: HashMap<(usize, GroupKey), WindowState>,
    /// Timestamp (epoch seconds) of the last emitted alert per group; drives suppression.
    last_alert: HashMap<(usize, GroupKey), i64>,
    /// Buffered full events per group (`CorrelationEventMode::Full`).
    event_buffers: HashMap<(usize, GroupKey), EventBuffer>,
    /// Buffered event references per group (`CorrelationEventMode::Refs`).
    event_ref_buffers: HashMap<(usize, GroupKey), EventRefBuffer>,
    /// Rule references of correlations with `generate: false`; their detections are
    /// withheld from output unless `emit_detections` is enabled.
    correlation_only_rules: std::collections::HashSet<String>,
    /// Engine-wide correlation settings (also adjusted by `rsigma.*` rule attributes).
    config: CorrelationConfig,
    /// Processing pipelines, kept ordered by priority and applied when rules are added.
    pipelines: Vec<Pipeline>,
}
78
79impl CorrelationEngine {
80 pub fn new(config: CorrelationConfig) -> Self {
82 CorrelationEngine {
83 engine: Engine::new(),
84 correlations: Vec::new(),
85 rule_index: HashMap::new(),
86 rule_ids: Vec::new(),
87 state: HashMap::new(),
88 last_alert: HashMap::new(),
89 event_buffers: HashMap::new(),
90 event_ref_buffers: HashMap::new(),
91 correlation_only_rules: std::collections::HashSet::new(),
92 config,
93 pipelines: Vec::new(),
94 }
95 }
96
97 pub fn add_pipeline(&mut self, pipeline: Pipeline) {
101 self.pipelines.push(pipeline);
102 self.pipelines.sort_by_key(|p| p.priority);
103 }
104
105 pub fn set_include_event(&mut self, include: bool) {
107 self.engine.set_include_event(include);
108 }
109
110 pub fn set_match_detail(&mut self, level: crate::result::MatchDetailLevel) {
113 self.engine.set_match_detail(level);
114 }
115
116 pub fn set_bloom_prefilter(&mut self, enabled: bool) {
120 self.engine.set_bloom_prefilter(enabled);
121 }
122
123 pub fn set_logsource_extractor(
127 &mut self,
128 extractor: Option<crate::logsource::LogSourceExtractor>,
129 ) {
130 self.engine.set_logsource_extractor(extractor);
131 }
132
133 pub fn logsource_pruned_total(&self) -> u64 {
135 self.engine.logsource_pruned_total()
136 }
137
138 pub fn logsource_absent_total(&self) -> u64 {
140 self.engine.logsource_absent_total()
141 }
142
143 pub fn set_bloom_max_bytes(&mut self, max_bytes: usize) {
146 self.engine.set_bloom_max_bytes(max_bytes);
147 }
148
149 #[cfg(feature = "daachorse-index")]
153 pub fn set_cross_rule_ac(&mut self, enabled: bool) {
154 self.engine.set_cross_rule_ac(enabled);
155 }
156
157 pub fn set_correlation_event_mode(&mut self, mode: CorrelationEventMode) {
163 self.config.correlation_event_mode = mode;
164 }
165
166 pub fn set_max_correlation_events(&mut self, max: usize) {
169 self.config.max_correlation_events = max;
170 }
171
172 pub fn add_rule(&mut self, rule: &SigmaRule) -> Result<()> {
178 if self.pipelines.is_empty() {
179 self.apply_custom_attributes(&rule.custom_attributes);
180 self.rule_ids.push((rule.id.clone(), rule.name.clone()));
181 self.engine.add_rule(rule)?;
182 } else {
183 let mut transformed = rule.clone();
184 apply_pipelines(&self.pipelines, &mut transformed)?;
185 self.apply_custom_attributes(&transformed.custom_attributes);
186 self.rule_ids
187 .push((transformed.id.clone(), transformed.name.clone()));
188 let compiled = crate::compiler::compile_rule(&transformed)?;
190 self.engine.add_compiled_rule(compiled);
191 }
192 Ok(())
193 }
194
195 fn apply_custom_attributes(
209 &mut self,
210 attrs: &std::collections::HashMap<String, yaml_serde::Value>,
211 ) {
212 if let Some(field) = attrs.get("rsigma.timestamp_field").and_then(|v| v.as_str())
214 && !self.config.timestamp_fields.iter().any(|f| f == field)
215 {
216 self.config.timestamp_fields.insert(0, field.to_string());
217 }
218
219 if let Some(val) = attrs.get("rsigma.suppress").and_then(|v| v.as_str())
221 && self.config.suppress.is_none()
222 && let Ok(ts) = rsigma_parser::Timespan::parse(val)
223 {
224 self.config.suppress = Some(ts.seconds);
225 }
226
227 if let Some(val) = attrs.get("rsigma.action").and_then(|v| v.as_str())
229 && self.config.action_on_match == CorrelationAction::Alert
230 && let Ok(a) = val.parse::<CorrelationAction>()
231 {
232 self.config.action_on_match = a;
233 }
234 }
235
236 pub fn add_correlation(&mut self, corr: &CorrelationRule) -> Result<()> {
238 let owned;
239 let effective = if self.pipelines.is_empty() {
240 corr
241 } else {
242 owned = {
243 let mut c = corr.clone();
244 apply_pipelines_to_correlation(&self.pipelines, &mut c)?;
245 c
246 };
247 &owned
248 };
249
250 self.apply_custom_attributes(&effective.custom_attributes);
253
254 let compiled = compile_correlation(effective)?;
255 let idx = self.correlations.len();
256
257 for rule_ref in &compiled.rule_refs {
259 self.rule_index
260 .entry(rule_ref.clone())
261 .or_default()
262 .push(idx);
263 }
264
265 if !compiled.generate {
267 for rule_ref in &compiled.rule_refs {
268 self.correlation_only_rules.insert(rule_ref.clone());
269 }
270 }
271
272 self.correlations.push(compiled);
273 Ok(())
274 }
275
276 pub fn add_collection(&mut self, collection: &SigmaCollection) -> Result<()> {
285 let mut compiled_batch = Vec::with_capacity(collection.rules.len());
286 if self.pipelines.is_empty() {
287 for rule in &collection.rules {
288 self.apply_custom_attributes(&rule.custom_attributes);
289 self.rule_ids.push((rule.id.clone(), rule.name.clone()));
290 compiled_batch.push(crate::compiler::compile_rule(rule)?);
291 }
292 } else {
293 for rule in &collection.rules {
294 let mut transformed = rule.clone();
295 apply_pipelines(&self.pipelines, &mut transformed)?;
296 self.apply_custom_attributes(&transformed.custom_attributes);
297 self.rule_ids
298 .push((transformed.id.clone(), transformed.name.clone()));
299 compiled_batch.push(crate::compiler::compile_rule(&transformed)?);
301 }
302 }
303 self.engine.extend_compiled_rules(compiled_batch);
304 for filter in &collection.filters {
306 self.engine.apply_filter(filter)?;
307 }
308 for corr in &collection.correlations {
309 self.add_correlation(corr)?;
310 }
311 self.validate_rule_refs()?;
312 self.detect_correlation_cycles()?;
313 Ok(())
314 }
315
316 fn validate_rule_refs(&self) -> Result<()> {
319 let mut known: std::collections::HashSet<&str> = std::collections::HashSet::new();
320
321 for (id, name) in &self.rule_ids {
322 if let Some(id) = id {
323 known.insert(id.as_str());
324 }
325 if let Some(name) = name {
326 known.insert(name.as_str());
327 }
328 }
329 for corr in &self.correlations {
330 if let Some(ref id) = corr.id {
331 known.insert(id.as_str());
332 }
333 if let Some(ref name) = corr.name {
334 known.insert(name.as_str());
335 }
336 }
337
338 for corr in &self.correlations {
339 for rule_ref in &corr.rule_refs {
340 if !known.contains(rule_ref.as_str()) {
341 return Err(EvalError::UnknownRuleRef(rule_ref.clone()));
342 }
343 }
344 }
345 Ok(())
346 }
347
348 fn detect_correlation_cycles(&self) -> Result<()> {
356 let mut corr_identifiers: HashMap<&str, usize> = HashMap::new();
358 for (idx, corr) in self.correlations.iter().enumerate() {
359 if let Some(ref id) = corr.id {
360 corr_identifiers.insert(id.as_str(), idx);
361 }
362 if let Some(ref name) = corr.name {
363 corr_identifiers.insert(name.as_str(), idx);
364 }
365 }
366
367 let mut adj: Vec<Vec<usize>> = vec![Vec::new(); self.correlations.len()];
369 for (idx, corr) in self.correlations.iter().enumerate() {
370 for rule_ref in &corr.rule_refs {
371 if let Some(&target_idx) = corr_identifiers.get(rule_ref.as_str()) {
372 adj[idx].push(target_idx);
373 }
374 }
375 }
376
377 let mut state = vec![0u8; self.correlations.len()]; let mut path: Vec<usize> = Vec::new();
380
381 for start in 0..self.correlations.len() {
382 if state[start] == 0
383 && let Some(cycle) = Self::dfs_find_cycle(start, &adj, &mut state, &mut path)
384 {
385 let names: Vec<String> = cycle
386 .iter()
387 .map(|&i| {
388 self.correlations[i]
389 .id
390 .as_deref()
391 .or(self.correlations[i].name.as_deref())
392 .unwrap_or(&self.correlations[i].title)
393 .to_string()
394 })
395 .collect();
396 return Err(crate::error::EvalError::CorrelationCycle(
397 names.join(" -> "),
398 ));
399 }
400 }
401 Ok(())
402 }
403
404 fn dfs_find_cycle(
406 node: usize,
407 adj: &[Vec<usize>],
408 state: &mut [u8],
409 path: &mut Vec<usize>,
410 ) -> Option<Vec<usize>> {
411 state[node] = 1; path.push(node);
413
414 for &next in &adj[node] {
415 if state[next] == 1 {
416 if let Some(pos) = path.iter().position(|&n| n == next) {
418 let mut cycle = path[pos..].to_vec();
419 cycle.push(next); return Some(cycle);
421 }
422 }
423 if state[next] == 0
424 && let Some(cycle) = Self::dfs_find_cycle(next, adj, state, path)
425 {
426 return Some(cycle);
427 }
428 }
429
430 path.pop();
431 state[node] = 2; None
433 }
434
435 pub fn process_event(&mut self, event: &impl Event) -> ProcessResult {
441 let all_detections = self.engine.evaluate(event);
442 self.correlate_detections(event, all_detections)
443 }
444
445 pub fn correlate_detections(
454 &mut self,
455 event: &impl Event,
456 all_detections: Vec<EvaluationResult>,
457 ) -> ProcessResult {
458 let ts = match self.extract_event_timestamp(event) {
459 Some(ts) => ts,
460 None => match self.config.timestamp_fallback {
461 TimestampFallback::WallClock => Utc::now().timestamp(),
462 TimestampFallback::Skip => {
463 return self.filter_detections(all_detections);
465 }
466 },
467 };
468 self.process_with_detections(event, all_detections, ts)
469 }
470
471 pub fn process_event_at(&mut self, event: &impl Event, timestamp_secs: i64) -> ProcessResult {
476 let all_detections = self.engine.evaluate(event);
477 self.process_with_detections(event, all_detections, timestamp_secs)
478 }
479
480 pub fn process_with_detections(
486 &mut self,
487 event: &impl Event,
488 all_detections: Vec<EvaluationResult>,
489 timestamp_secs: i64,
490 ) -> ProcessResult {
491 let timestamp_secs = timestamp_secs.clamp(0, i64::MAX / 2);
492
493 if self.state.len() >= self.config.max_state_entries {
495 self.evict_all(timestamp_secs);
496 }
497
498 let mut correlations: Vec<EvaluationResult> = Vec::new();
500 self.feed_detections(event, &all_detections, timestamp_secs, &mut correlations);
501
502 self.chain_correlations(&correlations, timestamp_secs);
504
505 let mut out = self.filter_detections(all_detections);
507 out.extend(correlations);
508 out
509 }
510
511 pub fn evaluate(&self, event: &impl Event) -> Vec<EvaluationResult> {
518 self.engine.evaluate(event)
519 }
520
521 pub fn process_batch<E: Event + Sync>(&mut self, events: &[&E]) -> Vec<ProcessResult> {
529 let engine = &self.engine;
532 let ts_fields = &self.config.timestamp_fields;
533
534 let batch_results: Vec<(Vec<EvaluationResult>, Option<i64>)> = {
535 #[cfg(feature = "parallel")]
536 {
537 use rayon::prelude::*;
538 events
539 .par_iter()
540 .map(|e| {
541 let detections = engine.evaluate(e);
542 let ts = extract_event_ts(e, ts_fields);
543 (detections, ts)
544 })
545 .collect()
546 }
547 #[cfg(not(feature = "parallel"))]
548 {
549 events
550 .iter()
551 .map(|e| {
552 let detections = engine.evaluate(e);
553 let ts = extract_event_ts(e, ts_fields);
554 (detections, ts)
555 })
556 .collect()
557 }
558 };
559
560 let mut results = Vec::with_capacity(events.len());
562 for ((detections, ts_opt), event) in batch_results.into_iter().zip(events) {
563 match ts_opt {
564 Some(ts) => {
565 results.push(self.process_with_detections(event, detections, ts));
566 }
567 None => match self.config.timestamp_fallback {
568 TimestampFallback::WallClock => {
569 let ts = Utc::now().timestamp();
570 results.push(self.process_with_detections(event, detections, ts));
571 }
572 TimestampFallback::Skip => {
573 results.push(self.filter_detections(detections));
575 }
576 },
577 }
578 }
579 results
580 }
581
582 fn filter_detections(&self, all_detections: Vec<EvaluationResult>) -> Vec<EvaluationResult> {
587 if !self.config.emit_detections && !self.correlation_only_rules.is_empty() {
588 all_detections
589 .into_iter()
590 .filter(|m| {
591 let id_match = m
592 .header
593 .rule_id
594 .as_ref()
595 .is_some_and(|id| self.correlation_only_rules.contains(id));
596 !id_match
597 })
598 .collect()
599 } else {
600 all_detections
601 }
602 }
603
604 fn feed_detections(
606 &mut self,
607 event: &impl Event,
608 detections: &[EvaluationResult],
609 ts: i64,
610 out: &mut Vec<EvaluationResult>,
611 ) {
612 let mut work: Vec<(usize, Option<String>, Option<String>)> = Vec::new();
615
616 for det in detections {
617 let (rule_id, rule_name) = self.find_rule_identity(det);
620
621 let mut corr_indices = Vec::new();
623 if let Some(ref id) = rule_id
624 && let Some(indices) = self.rule_index.get(id)
625 {
626 corr_indices.extend(indices);
627 }
628 if let Some(ref name) = rule_name
629 && let Some(indices) = self.rule_index.get(name)
630 {
631 corr_indices.extend(indices);
632 }
633
634 corr_indices.sort_unstable();
635 corr_indices.dedup();
636
637 for &corr_idx in &corr_indices {
638 work.push((corr_idx, rule_id.clone(), rule_name.clone()));
639 }
640 }
641
642 for (corr_idx, rule_id, rule_name) in work {
643 self.update_correlation(corr_idx, event, ts, &rule_id, &rule_name, out);
644 }
645 }
646
647 fn find_rule_identity(&self, det: &EvaluationResult) -> (Option<String>, Option<String>) {
649 if let Some(ref match_id) = det.header.rule_id {
651 for (id, name) in &self.rule_ids {
652 if id.as_deref() == Some(match_id.as_str()) {
653 return (id.clone(), name.clone());
654 }
655 }
656 }
657 (det.header.rule_id.clone(), None)
659 }
660
661 fn resolve_event_mode(&self, corr_idx: usize) -> CorrelationEventMode {
663 let corr = &self.correlations[corr_idx];
664 corr.event_mode
665 .unwrap_or(self.config.correlation_event_mode)
666 }
667
668 fn resolve_max_events(&self, corr_idx: usize) -> usize {
670 let corr = &self.correlations[corr_idx];
671 corr.max_events
672 .unwrap_or(self.config.max_correlation_events)
673 }
674
675 fn resolve_max_group_entries(&self, corr_idx: usize) -> Option<usize> {
678 let corr = &self.correlations[corr_idx];
679 corr.max_group_entries.or(self.config.max_group_entries)
680 }
681
/// Feeds one detection match into correlation `corr_idx`. If the
/// correlation's condition fires, a correlation result is pushed to `out`.
///
/// Steps, in order:
/// 1. Resolve this correlation's settings. Suppression, action, event mode,
///    buffer size and group-entry cap fall back to the engine-wide
///    [`CorrelationConfig`].
/// 2. Derive the group key from the event and fetch (or lazily create) the
///    window state for `(corr_idx, group_key)`.
/// 3. Let [`apply_window_open`] decide whether the event is discarded,
///    resets the window, or proceeds.
/// 4. Record the type-specific datum (timestamp, distinct value, rule ref or
///    numeric value), then optionally cap the retained entries.
/// 5. Buffer the event (full JSON or reference) according to the event mode.
/// 6. Evaluate the condition. On a hit that is not suppressed:
///    - emit the result and remember the alert time;
///    - for `CorrelationAction::Reset`, clear the group's window and buffers.
///
/// `rule_id` / `rule_name` identify the detection rule that matched.
fn update_correlation(
    &mut self,
    corr_idx: usize,
    event: &impl Event,
    ts: i64,
    rule_id: &Option<String>,
    rule_name: &Option<String>,
    out: &mut Vec<EvaluationResult>,
) {
    let corr = &self.correlations[corr_idx];
    let corr_type = corr.correlation_type;
    let timespan = corr.timespan_secs;
    let window_mode = corr.window_mode;
    let gap_secs = corr.gap_secs;
    let level = corr.level;
    // Per-correlation settings win; engine-wide config is the fallback.
    let suppress_secs = corr.suppress_secs.or(self.config.suppress);
    let action = corr.action.unwrap_or(self.config.action_on_match);
    let event_mode = self.resolve_event_mode(corr_idx);
    let max_events = self.resolve_max_events(corr_idx);
    let max_group_entries = self.resolve_max_group_entries(corr_idx);

    // Both identifiers of the matched rule are handed to GroupKey::extract.
    // NOTE(review): how extract uses them is not visible here.
    let mut ref_strs: Vec<&str> = Vec::new();
    if let Some(id) = rule_id.as_deref() {
        ref_strs.push(id);
    }
    if let Some(name) = rule_name.as_deref() {
        ref_strs.push(name);
    }
    // Identifier recorded for temporal correlations: id, else name, else "".
    // NOTE(review): if the correlation lists this rule by name while the rule
    // also has an id, the id is pushed. Confirm check_condition resolves that.
    let rule_ref = rule_id.as_deref().or(rule_name.as_deref()).unwrap_or("");

    let group_key = GroupKey::extract(event, &corr.group_by, &ref_strs);

    let state_key = (corr_idx, group_key.clone());
    let state = self
        .state
        .entry(state_key.clone())
        .or_insert_with(|| WindowState::new_for(corr_type));

    // Oldest timestamp still inside a sliding window; trims buffers below.
    let cutoff = ts - timespan as i64;
    let decision = apply_window_open(state, ts, timespan, window_mode, gap_secs);
    if decision == WindowDecision::Discard {
        return;
    }
    let reset = decision == WindowDecision::Reset;

    match corr_type {
        CorrelationType::EventCount => {
            state.push_event_count(ts);
        }
        CorrelationType::ValueCount => {
            // Skipped when any condition field is missing or not renderable.
            if let Some(ref fields) = corr.condition.field
                && let Some(key) = composite_value_count_key(event, fields)
            {
                state.push_value_count(ts, key);
            }
        }
        CorrelationType::Temporal | CorrelationType::TemporalOrdered => {
            state.push_temporal(ts, rule_ref);
        }
        CorrelationType::ValueSum
        | CorrelationType::ValueAvg
        | CorrelationType::ValuePercentile
        | CorrelationType::ValueMedian => {
            // Only the first condition field is aggregated; values that do not
            // convert to f64 are ignored.
            if let Some(ref fields) = corr.condition.field
                && let Some(field_name) = fields.first()
                && let Some(val) = event.get_field(field_name)
                && let Some(n) = value_to_f64_ev(&val)
            {
                state.push_numeric(ts, n);
            }
        }
    }

    if let Some(cap) = max_group_entries {
        // Second argument flags session windows (see WindowState::truncate_oldest).
        state.truncate_oldest(cap, window_mode == WindowMode::Session);
    }

    // Sliding windows trim buffered events older than `cutoff`; other modes
    // clear the buffer when the window was just reset.
    match event_mode {
        CorrelationEventMode::Full => {
            let buf = self
                .event_buffers
                .entry(state_key.clone())
                .or_insert_with(|| EventBuffer::new(max_events));
            if window_mode == rsigma_parser::WindowMode::Sliding {
                buf.evict(cutoff);
            } else if reset {
                buf.clear();
            }
            let json = event.to_json();
            buf.push(ts, &json);
        }
        CorrelationEventMode::Refs => {
            let buf = self
                .event_ref_buffers
                .entry(state_key.clone())
                .or_insert_with(|| EventRefBuffer::new(max_events));
            if window_mode == rsigma_parser::WindowMode::Sliding {
                buf.evict(cutoff);
            } else if reset {
                buf.clear();
            }
            let json = event.to_json();
            buf.push(ts, &json);
        }
        CorrelationEventMode::None => {}
    }

    // Some(aggregated value) when the correlation condition is met.
    let fired = state.check_condition(
        &corr.condition,
        corr_type,
        &corr.rule_refs,
        corr.extended_expr.as_ref(),
    );

    if let Some(agg_value) = fired {
        let alert_key = (corr_idx, group_key.clone());

        // Suppressed when this group already alerted less than `suppress`
        // seconds ago.
        let suppressed = if let Some(suppress) = suppress_secs {
            if let Some(&last_ts) = self.last_alert.get(&alert_key) {
                (ts - last_ts) < suppress as i64
            } else {
                false
            }
        } else {
            false
        };

        if !suppressed {
            // Attach the buffered events (decompressed) or references, if any.
            let (events, event_refs) = match event_mode {
                CorrelationEventMode::Full => {
                    let stored = self
                        .event_buffers
                        .get(&alert_key)
                        .map(|buf| buf.decompress_all())
                        .unwrap_or_default();
                    (Some(stored), None)
                }
                CorrelationEventMode::Refs => {
                    let stored = self
                        .event_ref_buffers
                        .get(&alert_key)
                        .map(|buf| buf.refs())
                        .unwrap_or_default();
                    (None, Some(stored))
                }
                CorrelationEventMode::None => (None, None),
            };

            let corr = &self.correlations[corr_idx];
            let result = EvaluationResult {
                header: RuleHeader {
                    rule_title: corr.title.clone(),
                    rule_id: corr.id.clone(),
                    level,
                    tags: corr.tags.clone(),
                    custom_attributes: corr.custom_attributes.clone(),
                    enrichments: None,
                },
                body: ResultBody::Correlation(CorrelationBody {
                    correlation_type: corr_type,
                    group_key: group_key.to_pairs(&corr.group_by),
                    aggregated_value: agg_value,
                    timespan_secs: timespan,
                    events,
                    event_refs,
                }),
            };
            out.push(result);

            // Starts this group's suppression interval.
            self.last_alert.insert(alert_key.clone(), ts);

            if action == CorrelationAction::Reset {
                // Start over for this group after an alert.
                if let Some(state) = self.state.get_mut(&alert_key) {
                    state.clear();
                }
                if let Some(buf) = self.event_buffers.get_mut(&alert_key) {
                    buf.clear();
                }
                if let Some(buf) = self.event_ref_buffers.get_mut(&alert_key) {
                    buf.clear();
                }
            }
        }
    }
}
892
/// Propagates correlation results that just fired to the correlations that
/// reference them (correlation chaining), for at most `MAX_CHAIN_DEPTH`
/// levels.
///
/// Each level first collects `(target correlation, group key, fired ref)`
/// work items from the pending results. It then updates each target's window
/// state at `ts`. Targets whose condition fires become the next level's
/// pending results. Chained updates push an event-count entry for every
/// non-temporal type (value-based ones included) and never buffer events.
///
/// NOTE(review): results fired at chained levels only live in
/// `pending` / `next_pending`. Nothing here appends them to the caller's
/// output, records them in `last_alert`, applies suppression, or applies
/// `CorrelationAction::Reset`. If higher-level correlations are meant to
/// emit alerts, this looks incomplete — confirm intent.
///
/// NOTE(review): targets are looked up only via the fired result's
/// `header.rule_id`. A correlation referenced by its `name`, or one without
/// an `id`, is never chained, although `validate_rule_refs` accepts such
/// references.
fn chain_correlations(&mut self, fired: &[EvaluationResult], ts: i64) {
    const MAX_CHAIN_DEPTH: usize = 10;
    let mut pending: Vec<EvaluationResult> = fired.to_vec();
    let mut depth = 0;

    while !pending.is_empty() && depth < MAX_CHAIN_DEPTH {
        depth += 1;

        // Collect work first: updating state needs `&mut self.state` while
        // `self.rule_index` is borrowed here.
        #[allow(clippy::type_complexity)]
        let mut work: Vec<(usize, Vec<(String, String)>, String)> = Vec::new();
        for result in &pending {
            let Some(body) = result.as_correlation() else {
                continue;
            };
            if let Some(ref id) = result.header.rule_id
                && let Some(indices) = self.rule_index.get(id)
            {
                let fired_ref = result
                    .header
                    .rule_id
                    .as_deref()
                    .unwrap_or(&result.header.rule_title)
                    .to_string();
                for &corr_idx in indices {
                    work.push((corr_idx, body.group_key.clone(), fired_ref.clone()));
                }
            }
        }

        let mut next_pending = Vec::new();
        for (corr_idx, group_key_pairs, fired_ref) in work {
            let corr = &self.correlations[corr_idx];
            let corr_type = corr.correlation_type;
            let timespan = corr.timespan_secs;
            let window_mode = corr.window_mode;
            let gap_secs = corr.gap_secs;
            let level = corr.level;

            // The target groups by the fired result's group-key pairs.
            let group_key = GroupKey::from_pairs(&group_key_pairs, &corr.group_by);
            let state_key = (corr_idx, group_key.clone());
            let state = self
                .state
                .entry(state_key)
                .or_insert_with(|| WindowState::new_for(corr_type));

            if apply_window_open(state, ts, timespan, window_mode, gap_secs)
                == WindowDecision::Discard
            {
                continue;
            }

            match corr_type {
                CorrelationType::EventCount => {
                    state.push_event_count(ts);
                }
                CorrelationType::Temporal | CorrelationType::TemporalOrdered => {
                    state.push_temporal(ts, &fired_ref);
                }
                _ => {
                    // Value-based types have no field value to aggregate here;
                    // each chained firing is counted as one event.
                    state.push_event_count(ts);
                }
            }

            if let Some(cap) = corr.max_group_entries.or(self.config.max_group_entries) {
                state.truncate_oldest(cap, window_mode == WindowMode::Session);
            }

            let fired = state.check_condition(
                &corr.condition,
                corr_type,
                &corr.rule_refs,
                corr.extended_expr.as_ref(),
            );

            if let Some(agg_value) = fired {
                let corr = &self.correlations[corr_idx];
                next_pending.push(EvaluationResult {
                    header: RuleHeader {
                        rule_title: corr.title.clone(),
                        rule_id: corr.id.clone(),
                        level,
                        tags: corr.tags.clone(),
                        custom_attributes: corr.custom_attributes.clone(),
                        enrichments: None,
                    },
                    body: ResultBody::Correlation(CorrelationBody {
                        correlation_type: corr_type,
                        group_key: group_key.to_pairs(&corr.group_by),
                        aggregated_value: agg_value,
                        timespan_secs: timespan,
                        events: None,
                        event_refs: None,
                    }),
                });
            }
        }

        pending = next_pending;
    }

    // Reaching the depth limit with work left usually means a reference cycle.
    if !pending.is_empty() {
        log::warn!(
            "Correlation chain depth limit reached ({MAX_CHAIN_DEPTH}); \
             {} pending result(s) were not propagated further. \
             This may indicate a cycle in correlation references.",
            pending.len()
        );
    }
}
1016
1017 fn extract_event_timestamp(&self, event: &impl Event) -> Option<i64> {
1029 for field_name in &self.config.timestamp_fields {
1030 if let Some(val) = event.get_field(field_name)
1031 && let Some(ts) = parse_timestamp_value(&val)
1032 {
1033 return Some(ts);
1034 }
1035 }
1036 None
1037 }
1038
1039 pub fn evict_expired(&mut self, now_secs: i64) {
1045 self.evict_all(now_secs);
1046 }
1047
/// Removes stale correlation state as of `now_secs`, in four passes:
///
/// 1. Window states. Sliding windows drop entries older than
///    `now_secs - timespan`. Tumbling/session windows are cleared once their
///    latest entry is older than the staleness limit (`gap` for sessions,
///    falling back to `timespan`; `timespan` for tumbling). Empty states are
///    dropped.
/// 2. Event and reference buffers. Sliding buffers are trimmed by time.
///    Tumbling/session buffers are dropped when their group's state is gone.
///    Empty buffers are dropped.
/// 3. Hard cap. If `state` still holds at least `max_state_entries` entries,
///    the stalest ones are evicted down to 90% of the cap, together with
///    their alert records and buffers.
/// 4. Alert records are kept only while still inside their suppression
///    interval.
fn evict_all(&mut self, now_secs: i64) {
    // Per-correlation (timespan, window mode, gap), indexed by correlation index.
    let specs: Vec<(u64, WindowMode, Option<u64>)> = self
        .correlations
        .iter()
        .map(|c| (c.timespan_secs, c.window_mode, c.gap_secs))
        .collect();

    self.state.retain(|&(corr_idx, _), state| {
        if let Some(&(timespan, mode, gap)) = specs.get(corr_idx) {
            match mode {
                WindowMode::Sliding => {
                    state.evict(now_secs - timespan as i64);
                }
                WindowMode::Tumbling | WindowMode::Session => {
                    let staleness = if mode == WindowMode::Session {
                        gap.unwrap_or(timespan)
                    } else {
                        timespan
                    } as i64;
                    if state
                        .latest_timestamp()
                        .is_some_and(|last| now_secs - last > staleness)
                    {
                        state.clear();
                    }
                }
            }
        }
        !state.is_empty()
    });

    // Non-sliding buffers live only as long as their group's window state.
    let state = &self.state;
    self.event_buffers.retain(|key, buf| {
        if let Some(&(timespan, mode, _)) = specs.get(key.0) {
            match mode {
                WindowMode::Sliding => buf.evict(now_secs - timespan as i64),
                WindowMode::Tumbling | WindowMode::Session => {
                    if !state.contains_key(key) {
                        return false;
                    }
                }
            }
        }
        !buf.is_empty()
    });
    self.event_ref_buffers.retain(|key, buf| {
        if let Some(&(timespan, mode, _)) = specs.get(key.0) {
            match mode {
                WindowMode::Sliding => buf.evict(now_secs - timespan as i64),
                WindowMode::Tumbling | WindowMode::Session => {
                    if !state.contains_key(key) {
                        return false;
                    }
                }
            }
        }
        !buf.is_empty()
    });

    if self.state.len() >= self.config.max_state_entries {
        // Shrink to 90% of the cap so the cap is not hit again on the next event.
        let target = self.config.max_state_entries * 9 / 10;
        let excess = self.state.len() - target;

        log::warn!(
            "Correlation state hard cap reached ({} entries, max {}); \
             evicting {} stalest entries to {} (90% capacity). \
             This indicates high-cardinality traffic; consider raising \
             max_state_entries or shortening correlation windows.",
            self.state.len(),
            self.config.max_state_entries,
            excess,
            target,
        );

        // States without any timestamp sort first (i64::MIN) and go first.
        let mut by_staleness: Vec<_> = self
            .state
            .iter()
            .map(|(k, v)| (k.clone(), v.latest_timestamp().unwrap_or(i64::MIN)))
            .collect();
        by_staleness.sort_unstable_by_key(|&(_, ts)| ts);

        for (key, _) in by_staleness.into_iter().take(excess) {
            self.state.remove(&key);
            self.last_alert.remove(&key);
            self.event_buffers.remove(&key);
            self.event_ref_buffers.remove(&key);
        }
    }

    // With no suppression configured (0), a record survives only if its
    // timestamp lies after `now_secs`.
    self.last_alert.retain(|key, &mut alert_ts| {
        let suppress = if key.0 < self.correlations.len() {
            self.correlations[key.0]
                .suppress_secs
                .or(self.config.suppress)
                .unwrap_or(0)
        } else {
            0
        };
        (now_secs - alert_ts) < suppress as i64
    });
}
1170
1171 pub fn state_count(&self) -> usize {
1173 self.state.len()
1174 }
1175
1176 pub fn detection_rule_count(&self) -> usize {
1178 self.engine.rule_count()
1179 }
1180
1181 pub fn correlation_rule_count(&self) -> usize {
1183 self.correlations.len()
1184 }
1185
1186 pub fn event_buffer_count(&self) -> usize {
1188 self.event_buffers.len()
1189 }
1190
1191 pub fn event_buffer_bytes(&self) -> usize {
1193 self.event_buffers
1194 .values()
1195 .map(|b| b.compressed_bytes())
1196 .sum()
1197 }
1198
1199 pub fn event_ref_buffer_count(&self) -> usize {
1201 self.event_ref_buffers.len()
1202 }
1203
1204 pub fn engine(&self) -> &Engine {
1206 &self.engine
1207 }
1208
1209 pub fn export_state(&self) -> CorrelationSnapshot {
1215 let mut windows: HashMap<String, Vec<(GroupKey, WindowState)>> = HashMap::new();
1216 for ((idx, gk), ws) in &self.state {
1217 let corr_id = self.correlation_stable_id(*idx);
1218 windows
1219 .entry(corr_id)
1220 .or_default()
1221 .push((gk.clone(), ws.clone()));
1222 }
1223
1224 let mut last_alert: HashMap<String, Vec<(GroupKey, i64)>> = HashMap::new();
1225 for ((idx, gk), ts) in &self.last_alert {
1226 let corr_id = self.correlation_stable_id(*idx);
1227 last_alert
1228 .entry(corr_id)
1229 .or_default()
1230 .push((gk.clone(), *ts));
1231 }
1232
1233 let mut event_buffers: HashMap<String, Vec<(GroupKey, EventBuffer)>> = HashMap::new();
1234 for ((idx, gk), buf) in &self.event_buffers {
1235 let corr_id = self.correlation_stable_id(*idx);
1236 event_buffers
1237 .entry(corr_id)
1238 .or_default()
1239 .push((gk.clone(), buf.clone()));
1240 }
1241
1242 let mut event_ref_buffers: HashMap<String, Vec<(GroupKey, EventRefBuffer)>> =
1243 HashMap::new();
1244 for ((idx, gk), buf) in &self.event_ref_buffers {
1245 let corr_id = self.correlation_stable_id(*idx);
1246 event_ref_buffers
1247 .entry(corr_id)
1248 .or_default()
1249 .push((gk.clone(), buf.clone()));
1250 }
1251
1252 CorrelationSnapshot {
1253 version: SNAPSHOT_VERSION,
1254 windows,
1255 last_alert,
1256 event_buffers,
1257 event_ref_buffers,
1258 }
1259 }
1260
1261 pub fn import_state(&mut self, snapshot: CorrelationSnapshot) -> bool {
1268 if snapshot.version != SNAPSHOT_VERSION {
1269 return false;
1270 }
1271 let id_to_idx = self.build_id_to_index_map();
1272
1273 for (corr_id, groups) in snapshot.windows {
1274 if let Some(&idx) = id_to_idx.get(&corr_id) {
1275 for (gk, ws) in groups {
1276 self.state.insert((idx, gk), ws);
1277 }
1278 }
1279 }
1280
1281 for (corr_id, groups) in snapshot.last_alert {
1282 if let Some(&idx) = id_to_idx.get(&corr_id) {
1283 for (gk, ts) in groups {
1284 self.last_alert.insert((idx, gk), ts);
1285 }
1286 }
1287 }
1288
1289 for (corr_id, groups) in snapshot.event_buffers {
1290 if let Some(&idx) = id_to_idx.get(&corr_id) {
1291 for (gk, buf) in groups {
1292 self.event_buffers.insert((idx, gk), buf);
1293 }
1294 }
1295 }
1296
1297 for (corr_id, groups) in snapshot.event_ref_buffers {
1298 if let Some(&idx) = id_to_idx.get(&corr_id) {
1299 for (gk, buf) in groups {
1300 self.event_ref_buffers.insert((idx, gk), buf);
1301 }
1302 }
1303 }
1304
1305 true
1306 }
1307
1308 fn correlation_stable_id(&self, idx: usize) -> String {
1310 let corr = &self.correlations[idx];
1311 corr.id
1312 .clone()
1313 .or_else(|| corr.name.clone())
1314 .unwrap_or_else(|| corr.title.clone())
1315 }
1316
1317 fn build_id_to_index_map(&self) -> HashMap<String, usize> {
1319 self.correlations
1320 .iter()
1321 .enumerate()
1322 .map(|(idx, _)| (self.correlation_stable_id(idx), idx))
1323 .collect()
1324 }
1325}
1326
1327impl Default for CorrelationEngine {
1328 fn default() -> Self {
1329 Self::new(CorrelationConfig::default())
1330 }
1331}
1332
1333fn extract_event_ts(event: &impl Event, timestamp_fields: &[String]) -> Option<i64> {
1342 for field_name in timestamp_fields {
1343 if let Some(val) = event.get_field(field_name)
1344 && let Some(ts) = parse_timestamp_value(&val)
1345 {
1346 return Some(ts);
1347 }
1348 }
1349 None
1350}
1351
1352fn parse_timestamp_value(val: &EventValue) -> Option<i64> {
1354 match val {
1355 EventValue::Int(i) => Some(normalize_epoch(*i)),
1356 EventValue::Float(f) => Some(normalize_epoch(*f as i64)),
1357 EventValue::Str(s) => parse_timestamp_string(s),
1358 _ => None,
1359 }
1360}
1361
/// Normalizes an integer epoch timestamp to seconds.
///
/// Values above 10^12 cannot be plausible second timestamps (10^12 s is
/// tens of millennia ahead). They are scaled down by 1000 until they fit,
/// which covers milliseconds (one step), microseconds (two) and nanoseconds
/// (three). Values at or below the threshold, including negatives, are
/// returned unchanged.
fn normalize_epoch(v: i64) -> i64 {
    const MAX_PLAUSIBLE_SECONDS: i64 = 1_000_000_000_000;
    let mut secs = v;
    while secs > MAX_PLAUSIBLE_SECONDS {
        secs /= 1000;
    }
    secs
}
1367
1368fn parse_timestamp_string(s: &str) -> Option<i64> {
1370 if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
1372 return Some(dt.timestamp());
1373 }
1374
1375 if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S") {
1378 return Some(Utc.from_utc_datetime(&naive).timestamp());
1379 }
1380 if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") {
1381 return Some(Utc.from_utc_datetime(&naive).timestamp());
1382 }
1383
1384 if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S%.f") {
1386 return Some(Utc.from_utc_datetime(&naive).timestamp());
1387 }
1388 if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f") {
1389 return Some(Utc.from_utc_datetime(&naive).timestamp());
1390 }
1391
1392 None
1393}
1394
1395fn value_to_string_for_count(v: &EventValue) -> Option<String> {
1397 match v {
1398 EventValue::Str(s) => Some(s.to_string()),
1399 EventValue::Int(n) => Some(n.to_string()),
1400 EventValue::Float(f) => Some(f.to_string()),
1401 EventValue::Bool(b) => Some(b.to_string()),
1402 EventValue::Null => Some("null".to_string()),
1403 _ => None,
1404 }
1405}
1406
1407fn composite_value_count_key(event: &impl Event, fields: &[String]) -> Option<String> {
1416 if let [field_name] = fields {
1418 let val = event.get_field(field_name)?;
1419 return value_to_string_for_count(&val);
1420 }
1421
1422 let mut parts = Vec::with_capacity(fields.len());
1423 for field_name in fields {
1424 let val = event.get_field(field_name)?;
1425 let rendered = value_to_string_for_count(&val)?;
1426 parts.push(rendered);
1427 }
1428 Some(parts.join("\u{1f}"))
1429}
1430
1431fn value_to_f64_ev(v: &EventValue) -> Option<f64> {
1433 v.as_f64()
1434}