1#[cfg(test)]
16mod tests;
17mod types;
18
19pub use types::*;
20
21use std::collections::HashMap;
22
23use chrono::{DateTime, TimeZone, Utc};
24
25use rsigma_parser::{CorrelationRule, CorrelationType, SigmaCollection, SigmaRule, WindowMode};
26
27use crate::correlation::{
28 CompiledCorrelation, EventBuffer, EventRefBuffer, GroupKey, WindowDecision, WindowState,
29 apply_window_open, compile_correlation,
30};
31use crate::engine::Engine;
32use crate::error::{EvalError, Result};
33use crate::event::{Event, EventValue};
34use crate::pipeline::{Pipeline, apply_pipelines, apply_pipelines_to_correlation};
35use crate::result::{CorrelationBody, EvaluationResult, ResultBody, RuleHeader};
36
/// Version tag written into every `CorrelationSnapshot` by `export_state`.
/// `import_state` rejects snapshots that carry any other version.
const SNAPSHOT_VERSION: u32 = 1;
43
/// Stateful wrapper around [`Engine`] that adds Sigma correlation evaluation.
///
/// Detection rules are evaluated by the inner engine; their matches are then fed into
/// compiled correlations whose per-group window state is kept in the maps below, keyed
/// by `(correlation index, group key)`.
pub struct CorrelationEngine {
    /// Detection engine that evaluates the individual (non-correlation) rules.
    engine: Engine,
    /// Compiled correlation rules; a correlation's position in this vector is its index.
    correlations: Vec<CompiledCorrelation>,
    /// Maps a referenced rule id or name to the indices of the correlations referencing it.
    rule_index: HashMap<String, Vec<usize>>,
    /// `(id, name)` of every detection rule added, in insertion order.
    rule_ids: Vec<(Option<String>, Option<String>)>,
    /// Window state per (correlation index, group key).
    state: HashMap<(usize, GroupKey), WindowState>,
    /// Timestamp (seconds) of the last emitted alert per (correlation index, group key);
    /// consulted for alert suppression.
    last_alert: HashMap<(usize, GroupKey), i64>,
    /// Stored event payloads per group, filled when the event mode is `Full`.
    event_buffers: HashMap<(usize, GroupKey), EventBuffer>,
    /// Stored event references per group, filled when the event mode is `Refs`.
    event_ref_buffers: HashMap<(usize, GroupKey), EventRefBuffer>,
    /// Rule references from correlations whose `generate` flag is false; their standalone
    /// detections are hidden unless `emit_detections` is enabled.
    correlation_only_rules: std::collections::HashSet<String>,
    /// Engine-wide configuration; rule custom attributes may modify it while loading.
    config: CorrelationConfig,
    /// Processing pipelines, kept sorted by priority, applied to newly added rules.
    pipelines: Vec<Pipeline>,
}
76
77impl CorrelationEngine {
78 pub fn new(config: CorrelationConfig) -> Self {
80 CorrelationEngine {
81 engine: Engine::new(),
82 correlations: Vec::new(),
83 rule_index: HashMap::new(),
84 rule_ids: Vec::new(),
85 state: HashMap::new(),
86 last_alert: HashMap::new(),
87 event_buffers: HashMap::new(),
88 event_ref_buffers: HashMap::new(),
89 correlation_only_rules: std::collections::HashSet::new(),
90 config,
91 pipelines: Vec::new(),
92 }
93 }
94
95 pub fn add_pipeline(&mut self, pipeline: Pipeline) {
99 self.pipelines.push(pipeline);
100 self.pipelines.sort_by_key(|p| p.priority);
101 }
102
103 pub fn set_include_event(&mut self, include: bool) {
105 self.engine.set_include_event(include);
106 }
107
108 pub fn set_match_detail(&mut self, level: crate::result::MatchDetailLevel) {
111 self.engine.set_match_detail(level);
112 }
113
114 pub fn set_bloom_prefilter(&mut self, enabled: bool) {
118 self.engine.set_bloom_prefilter(enabled);
119 }
120
121 pub fn set_bloom_max_bytes(&mut self, max_bytes: usize) {
124 self.engine.set_bloom_max_bytes(max_bytes);
125 }
126
127 #[cfg(feature = "daachorse-index")]
131 pub fn set_cross_rule_ac(&mut self, enabled: bool) {
132 self.engine.set_cross_rule_ac(enabled);
133 }
134
135 pub fn set_correlation_event_mode(&mut self, mode: CorrelationEventMode) {
141 self.config.correlation_event_mode = mode;
142 }
143
144 pub fn set_max_correlation_events(&mut self, max: usize) {
147 self.config.max_correlation_events = max;
148 }
149
150 pub fn add_rule(&mut self, rule: &SigmaRule) -> Result<()> {
156 if self.pipelines.is_empty() {
157 self.apply_custom_attributes(&rule.custom_attributes);
158 self.rule_ids.push((rule.id.clone(), rule.name.clone()));
159 self.engine.add_rule(rule)?;
160 } else {
161 let mut transformed = rule.clone();
162 apply_pipelines(&self.pipelines, &mut transformed)?;
163 self.apply_custom_attributes(&transformed.custom_attributes);
164 self.rule_ids
165 .push((transformed.id.clone(), transformed.name.clone()));
166 let compiled = crate::compiler::compile_rule(&transformed)?;
168 self.engine.add_compiled_rule(compiled);
169 }
170 Ok(())
171 }
172
173 fn apply_custom_attributes(
187 &mut self,
188 attrs: &std::collections::HashMap<String, yaml_serde::Value>,
189 ) {
190 if let Some(field) = attrs.get("rsigma.timestamp_field").and_then(|v| v.as_str())
192 && !self.config.timestamp_fields.iter().any(|f| f == field)
193 {
194 self.config.timestamp_fields.insert(0, field.to_string());
195 }
196
197 if let Some(val) = attrs.get("rsigma.suppress").and_then(|v| v.as_str())
199 && self.config.suppress.is_none()
200 && let Ok(ts) = rsigma_parser::Timespan::parse(val)
201 {
202 self.config.suppress = Some(ts.seconds);
203 }
204
205 if let Some(val) = attrs.get("rsigma.action").and_then(|v| v.as_str())
207 && self.config.action_on_match == CorrelationAction::Alert
208 && let Ok(a) = val.parse::<CorrelationAction>()
209 {
210 self.config.action_on_match = a;
211 }
212 }
213
214 pub fn add_correlation(&mut self, corr: &CorrelationRule) -> Result<()> {
216 let owned;
217 let effective = if self.pipelines.is_empty() {
218 corr
219 } else {
220 owned = {
221 let mut c = corr.clone();
222 apply_pipelines_to_correlation(&self.pipelines, &mut c)?;
223 c
224 };
225 &owned
226 };
227
228 self.apply_custom_attributes(&effective.custom_attributes);
231
232 let compiled = compile_correlation(effective)?;
233 let idx = self.correlations.len();
234
235 for rule_ref in &compiled.rule_refs {
237 self.rule_index
238 .entry(rule_ref.clone())
239 .or_default()
240 .push(idx);
241 }
242
243 if !compiled.generate {
245 for rule_ref in &compiled.rule_refs {
246 self.correlation_only_rules.insert(rule_ref.clone());
247 }
248 }
249
250 self.correlations.push(compiled);
251 Ok(())
252 }
253
254 pub fn add_collection(&mut self, collection: &SigmaCollection) -> Result<()> {
263 let mut compiled_batch = Vec::with_capacity(collection.rules.len());
264 if self.pipelines.is_empty() {
265 for rule in &collection.rules {
266 self.apply_custom_attributes(&rule.custom_attributes);
267 self.rule_ids.push((rule.id.clone(), rule.name.clone()));
268 compiled_batch.push(crate::compiler::compile_rule(rule)?);
269 }
270 } else {
271 for rule in &collection.rules {
272 let mut transformed = rule.clone();
273 apply_pipelines(&self.pipelines, &mut transformed)?;
274 self.apply_custom_attributes(&transformed.custom_attributes);
275 self.rule_ids
276 .push((transformed.id.clone(), transformed.name.clone()));
277 compiled_batch.push(crate::compiler::compile_rule(&transformed)?);
279 }
280 }
281 self.engine.extend_compiled_rules(compiled_batch);
282 for filter in &collection.filters {
284 self.engine.apply_filter(filter)?;
285 }
286 for corr in &collection.correlations {
287 self.add_correlation(corr)?;
288 }
289 self.validate_rule_refs()?;
290 self.detect_correlation_cycles()?;
291 Ok(())
292 }
293
294 fn validate_rule_refs(&self) -> Result<()> {
297 let mut known: std::collections::HashSet<&str> = std::collections::HashSet::new();
298
299 for (id, name) in &self.rule_ids {
300 if let Some(id) = id {
301 known.insert(id.as_str());
302 }
303 if let Some(name) = name {
304 known.insert(name.as_str());
305 }
306 }
307 for corr in &self.correlations {
308 if let Some(ref id) = corr.id {
309 known.insert(id.as_str());
310 }
311 if let Some(ref name) = corr.name {
312 known.insert(name.as_str());
313 }
314 }
315
316 for corr in &self.correlations {
317 for rule_ref in &corr.rule_refs {
318 if !known.contains(rule_ref.as_str()) {
319 return Err(EvalError::UnknownRuleRef(rule_ref.clone()));
320 }
321 }
322 }
323 Ok(())
324 }
325
326 fn detect_correlation_cycles(&self) -> Result<()> {
334 let mut corr_identifiers: HashMap<&str, usize> = HashMap::new();
336 for (idx, corr) in self.correlations.iter().enumerate() {
337 if let Some(ref id) = corr.id {
338 corr_identifiers.insert(id.as_str(), idx);
339 }
340 if let Some(ref name) = corr.name {
341 corr_identifiers.insert(name.as_str(), idx);
342 }
343 }
344
345 let mut adj: Vec<Vec<usize>> = vec![Vec::new(); self.correlations.len()];
347 for (idx, corr) in self.correlations.iter().enumerate() {
348 for rule_ref in &corr.rule_refs {
349 if let Some(&target_idx) = corr_identifiers.get(rule_ref.as_str()) {
350 adj[idx].push(target_idx);
351 }
352 }
353 }
354
355 let mut state = vec![0u8; self.correlations.len()]; let mut path: Vec<usize> = Vec::new();
358
359 for start in 0..self.correlations.len() {
360 if state[start] == 0
361 && let Some(cycle) = Self::dfs_find_cycle(start, &adj, &mut state, &mut path)
362 {
363 let names: Vec<String> = cycle
364 .iter()
365 .map(|&i| {
366 self.correlations[i]
367 .id
368 .as_deref()
369 .or(self.correlations[i].name.as_deref())
370 .unwrap_or(&self.correlations[i].title)
371 .to_string()
372 })
373 .collect();
374 return Err(crate::error::EvalError::CorrelationCycle(
375 names.join(" -> "),
376 ));
377 }
378 }
379 Ok(())
380 }
381
382 fn dfs_find_cycle(
384 node: usize,
385 adj: &[Vec<usize>],
386 state: &mut [u8],
387 path: &mut Vec<usize>,
388 ) -> Option<Vec<usize>> {
389 state[node] = 1; path.push(node);
391
392 for &next in &adj[node] {
393 if state[next] == 1 {
394 if let Some(pos) = path.iter().position(|&n| n == next) {
396 let mut cycle = path[pos..].to_vec();
397 cycle.push(next); return Some(cycle);
399 }
400 }
401 if state[next] == 0
402 && let Some(cycle) = Self::dfs_find_cycle(next, adj, state, path)
403 {
404 return Some(cycle);
405 }
406 }
407
408 path.pop();
409 state[node] = 2; None
411 }
412
413 pub fn process_event(&mut self, event: &impl Event) -> ProcessResult {
419 let all_detections = self.engine.evaluate(event);
420
421 let ts = match self.extract_event_timestamp(event) {
422 Some(ts) => ts,
423 None => match self.config.timestamp_fallback {
424 TimestampFallback::WallClock => Utc::now().timestamp(),
425 TimestampFallback::Skip => {
426 return self.filter_detections(all_detections);
428 }
429 },
430 };
431 self.process_with_detections(event, all_detections, ts)
432 }
433
434 pub fn process_event_at(&mut self, event: &impl Event, timestamp_secs: i64) -> ProcessResult {
439 let all_detections = self.engine.evaluate(event);
440 self.process_with_detections(event, all_detections, timestamp_secs)
441 }
442
443 pub fn process_with_detections(
449 &mut self,
450 event: &impl Event,
451 all_detections: Vec<EvaluationResult>,
452 timestamp_secs: i64,
453 ) -> ProcessResult {
454 let timestamp_secs = timestamp_secs.clamp(0, i64::MAX / 2);
455
456 if self.state.len() >= self.config.max_state_entries {
458 self.evict_all(timestamp_secs);
459 }
460
461 let mut correlations: Vec<EvaluationResult> = Vec::new();
463 self.feed_detections(event, &all_detections, timestamp_secs, &mut correlations);
464
465 self.chain_correlations(&correlations, timestamp_secs);
467
468 let mut out = self.filter_detections(all_detections);
470 out.extend(correlations);
471 out
472 }
473
474 pub fn evaluate(&self, event: &impl Event) -> Vec<EvaluationResult> {
481 self.engine.evaluate(event)
482 }
483
484 pub fn process_batch<E: Event + Sync>(&mut self, events: &[&E]) -> Vec<ProcessResult> {
492 let engine = &self.engine;
495 let ts_fields = &self.config.timestamp_fields;
496
497 let batch_results: Vec<(Vec<EvaluationResult>, Option<i64>)> = {
498 #[cfg(feature = "parallel")]
499 {
500 use rayon::prelude::*;
501 events
502 .par_iter()
503 .map(|e| {
504 let detections = engine.evaluate(e);
505 let ts = extract_event_ts(e, ts_fields);
506 (detections, ts)
507 })
508 .collect()
509 }
510 #[cfg(not(feature = "parallel"))]
511 {
512 events
513 .iter()
514 .map(|e| {
515 let detections = engine.evaluate(e);
516 let ts = extract_event_ts(e, ts_fields);
517 (detections, ts)
518 })
519 .collect()
520 }
521 };
522
523 let mut results = Vec::with_capacity(events.len());
525 for ((detections, ts_opt), event) in batch_results.into_iter().zip(events) {
526 match ts_opt {
527 Some(ts) => {
528 results.push(self.process_with_detections(event, detections, ts));
529 }
530 None => match self.config.timestamp_fallback {
531 TimestampFallback::WallClock => {
532 let ts = Utc::now().timestamp();
533 results.push(self.process_with_detections(event, detections, ts));
534 }
535 TimestampFallback::Skip => {
536 results.push(self.filter_detections(detections));
538 }
539 },
540 }
541 }
542 results
543 }
544
545 fn filter_detections(&self, all_detections: Vec<EvaluationResult>) -> Vec<EvaluationResult> {
550 if !self.config.emit_detections && !self.correlation_only_rules.is_empty() {
551 all_detections
552 .into_iter()
553 .filter(|m| {
554 let id_match = m
555 .header
556 .rule_id
557 .as_ref()
558 .is_some_and(|id| self.correlation_only_rules.contains(id));
559 !id_match
560 })
561 .collect()
562 } else {
563 all_detections
564 }
565 }
566
567 fn feed_detections(
569 &mut self,
570 event: &impl Event,
571 detections: &[EvaluationResult],
572 ts: i64,
573 out: &mut Vec<EvaluationResult>,
574 ) {
575 let mut work: Vec<(usize, Option<String>, Option<String>)> = Vec::new();
578
579 for det in detections {
580 let (rule_id, rule_name) = self.find_rule_identity(det);
583
584 let mut corr_indices = Vec::new();
586 if let Some(ref id) = rule_id
587 && let Some(indices) = self.rule_index.get(id)
588 {
589 corr_indices.extend(indices);
590 }
591 if let Some(ref name) = rule_name
592 && let Some(indices) = self.rule_index.get(name)
593 {
594 corr_indices.extend(indices);
595 }
596
597 corr_indices.sort_unstable();
598 corr_indices.dedup();
599
600 for &corr_idx in &corr_indices {
601 work.push((corr_idx, rule_id.clone(), rule_name.clone()));
602 }
603 }
604
605 for (corr_idx, rule_id, rule_name) in work {
606 self.update_correlation(corr_idx, event, ts, &rule_id, &rule_name, out);
607 }
608 }
609
610 fn find_rule_identity(&self, det: &EvaluationResult) -> (Option<String>, Option<String>) {
612 if let Some(ref match_id) = det.header.rule_id {
614 for (id, name) in &self.rule_ids {
615 if id.as_deref() == Some(match_id.as_str()) {
616 return (id.clone(), name.clone());
617 }
618 }
619 }
620 (det.header.rule_id.clone(), None)
622 }
623
624 fn resolve_event_mode(&self, corr_idx: usize) -> CorrelationEventMode {
626 let corr = &self.correlations[corr_idx];
627 corr.event_mode
628 .unwrap_or(self.config.correlation_event_mode)
629 }
630
631 fn resolve_max_events(&self, corr_idx: usize) -> usize {
633 let corr = &self.correlations[corr_idx];
634 corr.max_events
635 .unwrap_or(self.config.max_correlation_events)
636 }
637
638 fn resolve_max_group_entries(&self, corr_idx: usize) -> Option<usize> {
641 let corr = &self.correlations[corr_idx];
642 corr.max_group_entries.or(self.config.max_group_entries)
643 }
644
645 fn update_correlation(
647 &mut self,
648 corr_idx: usize,
649 event: &impl Event,
650 ts: i64,
651 rule_id: &Option<String>,
652 rule_name: &Option<String>,
653 out: &mut Vec<EvaluationResult>,
654 ) {
655 let corr = &self.correlations[corr_idx];
659 let corr_type = corr.correlation_type;
660 let timespan = corr.timespan_secs;
661 let window_mode = corr.window_mode;
662 let gap_secs = corr.gap_secs;
663 let level = corr.level;
664 let suppress_secs = corr.suppress_secs.or(self.config.suppress);
665 let action = corr.action.unwrap_or(self.config.action_on_match);
666 let event_mode = self.resolve_event_mode(corr_idx);
667 let max_events = self.resolve_max_events(corr_idx);
668 let max_group_entries = self.resolve_max_group_entries(corr_idx);
669
670 let mut ref_strs: Vec<&str> = Vec::new();
672 if let Some(id) = rule_id.as_deref() {
673 ref_strs.push(id);
674 }
675 if let Some(name) = rule_name.as_deref() {
676 ref_strs.push(name);
677 }
678 let rule_ref = rule_id.as_deref().or(rule_name.as_deref()).unwrap_or("");
679
680 let group_key = GroupKey::extract(event, &corr.group_by, &ref_strs);
682
683 let state_key = (corr_idx, group_key.clone());
685 let state = self
686 .state
687 .entry(state_key.clone())
688 .or_insert_with(|| WindowState::new_for(corr_type));
689
690 let cutoff = ts - timespan as i64;
696 let decision = apply_window_open(state, ts, timespan, window_mode, gap_secs);
697 if decision == WindowDecision::Discard {
698 return;
699 }
700 let reset = decision == WindowDecision::Reset;
701
702 match corr_type {
704 CorrelationType::EventCount => {
705 state.push_event_count(ts);
706 }
707 CorrelationType::ValueCount => {
708 if let Some(ref fields) = corr.condition.field
709 && let Some(key) = composite_value_count_key(event, fields)
710 {
711 state.push_value_count(ts, key);
712 }
713 }
714 CorrelationType::Temporal | CorrelationType::TemporalOrdered => {
715 state.push_temporal(ts, rule_ref);
716 }
717 CorrelationType::ValueSum
718 | CorrelationType::ValueAvg
719 | CorrelationType::ValuePercentile
720 | CorrelationType::ValueMedian => {
721 if let Some(ref fields) = corr.condition.field
722 && let Some(field_name) = fields.first()
723 && let Some(val) = event.get_field(field_name)
724 && let Some(n) = value_to_f64_ev(&val)
725 {
726 state.push_numeric(ts, n);
727 }
728 }
729 }
730
731 if let Some(cap) = max_group_entries {
735 state.truncate_oldest(cap, window_mode == WindowMode::Session);
736 }
737
738 match event_mode {
742 CorrelationEventMode::Full => {
743 let buf = self
744 .event_buffers
745 .entry(state_key.clone())
746 .or_insert_with(|| EventBuffer::new(max_events));
747 if window_mode == rsigma_parser::WindowMode::Sliding {
748 buf.evict(cutoff);
749 } else if reset {
750 buf.clear();
751 }
752 let json = event.to_json();
753 buf.push(ts, &json);
754 }
755 CorrelationEventMode::Refs => {
756 let buf = self
757 .event_ref_buffers
758 .entry(state_key.clone())
759 .or_insert_with(|| EventRefBuffer::new(max_events));
760 if window_mode == rsigma_parser::WindowMode::Sliding {
761 buf.evict(cutoff);
762 } else if reset {
763 buf.clear();
764 }
765 let json = event.to_json();
766 buf.push(ts, &json);
767 }
768 CorrelationEventMode::None => {}
769 }
770
771 let fired = state.check_condition(
773 &corr.condition,
774 corr_type,
775 &corr.rule_refs,
776 corr.extended_expr.as_ref(),
777 );
778
779 if let Some(agg_value) = fired {
780 let alert_key = (corr_idx, group_key.clone());
781
782 let suppressed = if let Some(suppress) = suppress_secs {
784 if let Some(&last_ts) = self.last_alert.get(&alert_key) {
785 (ts - last_ts) < suppress as i64
786 } else {
787 false
788 }
789 } else {
790 false
791 };
792
793 if !suppressed {
794 let (events, event_refs) = match event_mode {
796 CorrelationEventMode::Full => {
797 let stored = self
798 .event_buffers
799 .get(&alert_key)
800 .map(|buf| buf.decompress_all())
801 .unwrap_or_default();
802 (Some(stored), None)
803 }
804 CorrelationEventMode::Refs => {
805 let stored = self
806 .event_ref_buffers
807 .get(&alert_key)
808 .map(|buf| buf.refs())
809 .unwrap_or_default();
810 (None, Some(stored))
811 }
812 CorrelationEventMode::None => (None, None),
813 };
814
815 let corr = &self.correlations[corr_idx];
817 let result = EvaluationResult {
818 header: RuleHeader {
819 rule_title: corr.title.clone(),
820 rule_id: corr.id.clone(),
821 level,
822 tags: corr.tags.clone(),
823 custom_attributes: corr.custom_attributes.clone(),
824 enrichments: None,
825 },
826 body: ResultBody::Correlation(CorrelationBody {
827 correlation_type: corr_type,
828 group_key: group_key.to_pairs(&corr.group_by),
829 aggregated_value: agg_value,
830 timespan_secs: timespan,
831 events,
832 event_refs,
833 }),
834 };
835 out.push(result);
836
837 self.last_alert.insert(alert_key.clone(), ts);
839
840 if action == CorrelationAction::Reset {
842 if let Some(state) = self.state.get_mut(&alert_key) {
843 state.clear();
844 }
845 if let Some(buf) = self.event_buffers.get_mut(&alert_key) {
846 buf.clear();
847 }
848 if let Some(buf) = self.event_ref_buffers.get_mut(&alert_key) {
849 buf.clear();
850 }
851 }
852 }
853 }
854 }
855
856 fn chain_correlations(&mut self, fired: &[EvaluationResult], ts: i64) {
861 const MAX_CHAIN_DEPTH: usize = 10;
862 let mut pending: Vec<EvaluationResult> = fired.to_vec();
863 let mut depth = 0;
864
865 while !pending.is_empty() && depth < MAX_CHAIN_DEPTH {
866 depth += 1;
867
868 #[allow(clippy::type_complexity)]
870 let mut work: Vec<(usize, Vec<(String, String)>, String)> = Vec::new();
871 for result in &pending {
872 let Some(body) = result.as_correlation() else {
874 continue;
875 };
876 if let Some(ref id) = result.header.rule_id
877 && let Some(indices) = self.rule_index.get(id)
878 {
879 let fired_ref = result
880 .header
881 .rule_id
882 .as_deref()
883 .unwrap_or(&result.header.rule_title)
884 .to_string();
885 for &corr_idx in indices {
886 work.push((corr_idx, body.group_key.clone(), fired_ref.clone()));
887 }
888 }
889 }
890
891 let mut next_pending = Vec::new();
892 for (corr_idx, group_key_pairs, fired_ref) in work {
893 let corr = &self.correlations[corr_idx];
894 let corr_type = corr.correlation_type;
895 let timespan = corr.timespan_secs;
896 let window_mode = corr.window_mode;
897 let gap_secs = corr.gap_secs;
898 let level = corr.level;
899
900 let group_key = GroupKey::from_pairs(&group_key_pairs, &corr.group_by);
901 let state_key = (corr_idx, group_key.clone());
902 let state = self
903 .state
904 .entry(state_key)
905 .or_insert_with(|| WindowState::new_for(corr_type));
906
907 if apply_window_open(state, ts, timespan, window_mode, gap_secs)
911 == WindowDecision::Discard
912 {
913 continue;
914 }
915
916 match corr_type {
917 CorrelationType::EventCount => {
918 state.push_event_count(ts);
919 }
920 CorrelationType::Temporal | CorrelationType::TemporalOrdered => {
921 state.push_temporal(ts, &fired_ref);
922 }
923 _ => {
924 state.push_event_count(ts);
925 }
926 }
927
928 if let Some(cap) = corr.max_group_entries.or(self.config.max_group_entries) {
931 state.truncate_oldest(cap, window_mode == WindowMode::Session);
932 }
933
934 let fired = state.check_condition(
935 &corr.condition,
936 corr_type,
937 &corr.rule_refs,
938 corr.extended_expr.as_ref(),
939 );
940
941 if let Some(agg_value) = fired {
942 let corr = &self.correlations[corr_idx];
943 next_pending.push(EvaluationResult {
944 header: RuleHeader {
945 rule_title: corr.title.clone(),
946 rule_id: corr.id.clone(),
947 level,
948 tags: corr.tags.clone(),
949 custom_attributes: corr.custom_attributes.clone(),
950 enrichments: None,
951 },
952 body: ResultBody::Correlation(CorrelationBody {
953 correlation_type: corr_type,
954 group_key: group_key.to_pairs(&corr.group_by),
955 aggregated_value: agg_value,
956 timespan_secs: timespan,
957 events: None,
961 event_refs: None,
962 }),
963 });
964 }
965 }
966
967 pending = next_pending;
968 }
969
970 if !pending.is_empty() {
971 log::warn!(
972 "Correlation chain depth limit reached ({MAX_CHAIN_DEPTH}); \
973 {} pending result(s) were not propagated further. \
974 This may indicate a cycle in correlation references.",
975 pending.len()
976 );
977 }
978 }
979
980 fn extract_event_timestamp(&self, event: &impl Event) -> Option<i64> {
992 for field_name in &self.config.timestamp_fields {
993 if let Some(val) = event.get_field(field_name)
994 && let Some(ts) = parse_timestamp_value(&val)
995 {
996 return Some(ts);
997 }
998 }
999 None
1000 }
1001
1002 pub fn evict_expired(&mut self, now_secs: i64) {
1008 self.evict_all(now_secs);
1009 }
1010
1011 fn evict_all(&mut self, now_secs: i64) {
1013 let specs: Vec<(u64, WindowMode, Option<u64>)> = self
1024 .correlations
1025 .iter()
1026 .map(|c| (c.timespan_secs, c.window_mode, c.gap_secs))
1027 .collect();
1028
1029 self.state.retain(|&(corr_idx, _), state| {
1030 if let Some(&(timespan, mode, gap)) = specs.get(corr_idx) {
1031 match mode {
1032 WindowMode::Sliding => {
1033 state.evict(now_secs - timespan as i64);
1034 }
1035 WindowMode::Tumbling | WindowMode::Session => {
1036 let staleness = if mode == WindowMode::Session {
1037 gap.unwrap_or(timespan)
1038 } else {
1039 timespan
1040 } as i64;
1041 if state
1042 .latest_timestamp()
1043 .is_some_and(|last| now_secs - last > staleness)
1044 {
1045 state.clear();
1046 }
1047 }
1048 }
1049 }
1050 !state.is_empty()
1051 });
1052
1053 let state = &self.state;
1057 self.event_buffers.retain(|key, buf| {
1058 if let Some(&(timespan, mode, _)) = specs.get(key.0) {
1059 match mode {
1060 WindowMode::Sliding => buf.evict(now_secs - timespan as i64),
1061 WindowMode::Tumbling | WindowMode::Session => {
1062 if !state.contains_key(key) {
1063 return false;
1064 }
1065 }
1066 }
1067 }
1068 !buf.is_empty()
1069 });
1070 self.event_ref_buffers.retain(|key, buf| {
1071 if let Some(&(timespan, mode, _)) = specs.get(key.0) {
1072 match mode {
1073 WindowMode::Sliding => buf.evict(now_secs - timespan as i64),
1074 WindowMode::Tumbling | WindowMode::Session => {
1075 if !state.contains_key(key) {
1076 return false;
1077 }
1078 }
1079 }
1080 }
1081 !buf.is_empty()
1082 });
1083
1084 if self.state.len() >= self.config.max_state_entries {
1088 let target = self.config.max_state_entries * 9 / 10;
1089 let excess = self.state.len() - target;
1090
1091 log::warn!(
1092 "Correlation state hard cap reached ({} entries, max {}); \
1093 evicting {} stalest entries to {} (90% capacity). \
1094 This indicates high-cardinality traffic; consider raising \
1095 max_state_entries or shortening correlation windows.",
1096 self.state.len(),
1097 self.config.max_state_entries,
1098 excess,
1099 target,
1100 );
1101
1102 let mut by_staleness: Vec<_> = self
1104 .state
1105 .iter()
1106 .map(|(k, v)| (k.clone(), v.latest_timestamp().unwrap_or(i64::MIN)))
1107 .collect();
1108 by_staleness.sort_unstable_by_key(|&(_, ts)| ts);
1109
1110 for (key, _) in by_staleness.into_iter().take(excess) {
1112 self.state.remove(&key);
1113 self.last_alert.remove(&key);
1114 self.event_buffers.remove(&key);
1115 self.event_ref_buffers.remove(&key);
1116 }
1117 }
1118
1119 self.last_alert.retain(|key, &mut alert_ts| {
1122 let suppress = if key.0 < self.correlations.len() {
1123 self.correlations[key.0]
1124 .suppress_secs
1125 .or(self.config.suppress)
1126 .unwrap_or(0)
1127 } else {
1128 0
1129 };
1130 (now_secs - alert_ts) < suppress as i64
1131 });
1132 }
1133
1134 pub fn state_count(&self) -> usize {
1136 self.state.len()
1137 }
1138
1139 pub fn detection_rule_count(&self) -> usize {
1141 self.engine.rule_count()
1142 }
1143
1144 pub fn correlation_rule_count(&self) -> usize {
1146 self.correlations.len()
1147 }
1148
1149 pub fn event_buffer_count(&self) -> usize {
1151 self.event_buffers.len()
1152 }
1153
1154 pub fn event_buffer_bytes(&self) -> usize {
1156 self.event_buffers
1157 .values()
1158 .map(|b| b.compressed_bytes())
1159 .sum()
1160 }
1161
1162 pub fn event_ref_buffer_count(&self) -> usize {
1164 self.event_ref_buffers.len()
1165 }
1166
1167 pub fn engine(&self) -> &Engine {
1169 &self.engine
1170 }
1171
1172 pub fn export_state(&self) -> CorrelationSnapshot {
1178 let mut windows: HashMap<String, Vec<(GroupKey, WindowState)>> = HashMap::new();
1179 for ((idx, gk), ws) in &self.state {
1180 let corr_id = self.correlation_stable_id(*idx);
1181 windows
1182 .entry(corr_id)
1183 .or_default()
1184 .push((gk.clone(), ws.clone()));
1185 }
1186
1187 let mut last_alert: HashMap<String, Vec<(GroupKey, i64)>> = HashMap::new();
1188 for ((idx, gk), ts) in &self.last_alert {
1189 let corr_id = self.correlation_stable_id(*idx);
1190 last_alert
1191 .entry(corr_id)
1192 .or_default()
1193 .push((gk.clone(), *ts));
1194 }
1195
1196 let mut event_buffers: HashMap<String, Vec<(GroupKey, EventBuffer)>> = HashMap::new();
1197 for ((idx, gk), buf) in &self.event_buffers {
1198 let corr_id = self.correlation_stable_id(*idx);
1199 event_buffers
1200 .entry(corr_id)
1201 .or_default()
1202 .push((gk.clone(), buf.clone()));
1203 }
1204
1205 let mut event_ref_buffers: HashMap<String, Vec<(GroupKey, EventRefBuffer)>> =
1206 HashMap::new();
1207 for ((idx, gk), buf) in &self.event_ref_buffers {
1208 let corr_id = self.correlation_stable_id(*idx);
1209 event_ref_buffers
1210 .entry(corr_id)
1211 .or_default()
1212 .push((gk.clone(), buf.clone()));
1213 }
1214
1215 CorrelationSnapshot {
1216 version: SNAPSHOT_VERSION,
1217 windows,
1218 last_alert,
1219 event_buffers,
1220 event_ref_buffers,
1221 }
1222 }
1223
1224 pub fn import_state(&mut self, snapshot: CorrelationSnapshot) -> bool {
1231 if snapshot.version != SNAPSHOT_VERSION {
1232 return false;
1233 }
1234 let id_to_idx = self.build_id_to_index_map();
1235
1236 for (corr_id, groups) in snapshot.windows {
1237 if let Some(&idx) = id_to_idx.get(&corr_id) {
1238 for (gk, ws) in groups {
1239 self.state.insert((idx, gk), ws);
1240 }
1241 }
1242 }
1243
1244 for (corr_id, groups) in snapshot.last_alert {
1245 if let Some(&idx) = id_to_idx.get(&corr_id) {
1246 for (gk, ts) in groups {
1247 self.last_alert.insert((idx, gk), ts);
1248 }
1249 }
1250 }
1251
1252 for (corr_id, groups) in snapshot.event_buffers {
1253 if let Some(&idx) = id_to_idx.get(&corr_id) {
1254 for (gk, buf) in groups {
1255 self.event_buffers.insert((idx, gk), buf);
1256 }
1257 }
1258 }
1259
1260 for (corr_id, groups) in snapshot.event_ref_buffers {
1261 if let Some(&idx) = id_to_idx.get(&corr_id) {
1262 for (gk, buf) in groups {
1263 self.event_ref_buffers.insert((idx, gk), buf);
1264 }
1265 }
1266 }
1267
1268 true
1269 }
1270
1271 fn correlation_stable_id(&self, idx: usize) -> String {
1273 let corr = &self.correlations[idx];
1274 corr.id
1275 .clone()
1276 .or_else(|| corr.name.clone())
1277 .unwrap_or_else(|| corr.title.clone())
1278 }
1279
1280 fn build_id_to_index_map(&self) -> HashMap<String, usize> {
1282 self.correlations
1283 .iter()
1284 .enumerate()
1285 .map(|(idx, _)| (self.correlation_stable_id(idx), idx))
1286 .collect()
1287 }
1288}
1289
1290impl Default for CorrelationEngine {
1291 fn default() -> Self {
1292 Self::new(CorrelationConfig::default())
1293 }
1294}
1295
1296fn extract_event_ts(event: &impl Event, timestamp_fields: &[String]) -> Option<i64> {
1305 for field_name in timestamp_fields {
1306 if let Some(val) = event.get_field(field_name)
1307 && let Some(ts) = parse_timestamp_value(&val)
1308 {
1309 return Some(ts);
1310 }
1311 }
1312 None
1313}
1314
1315fn parse_timestamp_value(val: &EventValue) -> Option<i64> {
1317 match val {
1318 EventValue::Int(i) => Some(normalize_epoch(*i)),
1319 EventValue::Float(f) => Some(normalize_epoch(*f as i64)),
1320 EventValue::Str(s) => parse_timestamp_string(s),
1321 _ => None,
1322 }
1323}
1324
/// Converts a numeric epoch timestamp of unknown unit to epoch seconds.
///
/// The unit is guessed from the magnitude, using the same threshold (10^12) at each scale:
///
/// * `> 10^18` — nanoseconds (divided by 10^9)
/// * `> 10^15` — microseconds (divided by 10^6)
/// * `> 10^12` — milliseconds (divided by 10^3)
/// * otherwise — already seconds (returned unchanged, including negative values)
///
/// Previously only milliseconds were recognised. Microsecond and nanosecond timestamps
/// were divided by 1000 and landed tens of thousands of years in the future. Realistic
/// millisecond timestamps (below 10^15, i.e. before year ~33658) are unaffected.
fn normalize_epoch(v: i64) -> i64 {
    const SECONDS_CEILING: i64 = 1_000_000_000_000;
    if v > SECONDS_CEILING * 1_000_000 {
        v / 1_000_000_000
    } else if v > SECONDS_CEILING * 1_000 {
        v / 1_000_000
    } else if v > SECONDS_CEILING {
        v / 1_000
    } else {
        v
    }
}
1330
1331fn parse_timestamp_string(s: &str) -> Option<i64> {
1333 if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
1335 return Some(dt.timestamp());
1336 }
1337
1338 if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S") {
1341 return Some(Utc.from_utc_datetime(&naive).timestamp());
1342 }
1343 if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") {
1344 return Some(Utc.from_utc_datetime(&naive).timestamp());
1345 }
1346
1347 if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S%.f") {
1349 return Some(Utc.from_utc_datetime(&naive).timestamp());
1350 }
1351 if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f") {
1352 return Some(Utc.from_utc_datetime(&naive).timestamp());
1353 }
1354
1355 None
1356}
1357
1358fn value_to_string_for_count(v: &EventValue) -> Option<String> {
1360 match v {
1361 EventValue::Str(s) => Some(s.to_string()),
1362 EventValue::Int(n) => Some(n.to_string()),
1363 EventValue::Float(f) => Some(f.to_string()),
1364 EventValue::Bool(b) => Some(b.to_string()),
1365 EventValue::Null => Some("null".to_string()),
1366 _ => None,
1367 }
1368}
1369
1370fn composite_value_count_key(event: &impl Event, fields: &[String]) -> Option<String> {
1379 if let [field_name] = fields {
1381 let val = event.get_field(field_name)?;
1382 return value_to_string_for_count(&val);
1383 }
1384
1385 let mut parts = Vec::with_capacity(fields.len());
1386 for field_name in fields {
1387 let val = event.get_field(field_name)?;
1388 let rendered = value_to_string_for_count(&val)?;
1389 parts.push(rendered);
1390 }
1391 Some(parts.join("\u{1f}"))
1392}
1393
/// Numeric view of an event value, used by the sum/avg/percentile/median aggregations.
///
/// Thin delegation to `EventValue::as_f64`; returns `None` whenever that conversion does.
fn value_to_f64_ev(v: &EventValue) -> Option<f64> {
    v.as_f64()
}