1#[cfg(test)]
16mod tests;
17mod types;
18
19pub use types::*;
20
21use std::collections::HashMap;
22
23use chrono::{DateTime, TimeZone, Utc};
24
25use rsigma_parser::{CorrelationRule, CorrelationType, SigmaCollection, SigmaRule};
26
27use crate::correlation::{
28 CompiledCorrelation, EventBuffer, EventRefBuffer, GroupKey, WindowState, compile_correlation,
29};
30use crate::engine::Engine;
31use crate::error::{EvalError, Result};
32use crate::event::{Event, EventValue};
33use crate::pipeline::{Pipeline, apply_pipelines, apply_pipelines_to_correlation};
34use crate::result::MatchResult;
35
/// Format version stamped into every exported [`CorrelationSnapshot`].
///
/// `import_state` refuses any snapshot whose `version` differs from this value.
const SNAPSHOT_VERSION: u32 = 1;
42
/// Stateful engine that evaluates detection rules and feeds their matches
/// into windowed correlation rules.
pub struct CorrelationEngine {
    /// Underlying detection engine used to evaluate events against rules.
    engine: Engine,
    /// Compiled correlation rules; the position in this vector is the
    /// correlation index used as the first half of every state key.
    correlations: Vec<CompiledCorrelation>,
    /// Rule reference (as listed in a correlation's `rule_refs`) -> indices of
    /// the correlations that reference it.
    rule_index: HashMap<String, Vec<usize>>,
    /// `(id, name)` of every detection rule added, in insertion order.
    rule_ids: Vec<(Option<String>, Option<String>)>,
    /// Window state per `(correlation index, group key)`.
    state: HashMap<(usize, GroupKey), WindowState>,
    /// Timestamp (seconds) of the last emitted alert per
    /// `(correlation index, group key)`; consulted for suppression.
    last_alert: HashMap<(usize, GroupKey), i64>,
    /// Buffered events per window, used in `CorrelationEventMode::Full`.
    event_buffers: HashMap<(usize, GroupKey), EventBuffer>,
    /// Buffered event references per window, used in `CorrelationEventMode::Refs`.
    event_ref_buffers: HashMap<(usize, GroupKey), EventRefBuffer>,
    /// Rule references of correlations compiled with `generate == false`;
    /// matching detections are dropped when `config.emit_detections` is off.
    correlation_only_rules: std::collections::HashSet<String>,
    /// Engine-wide configuration and defaults for per-correlation settings.
    config: CorrelationConfig,
    /// Processing pipelines applied to rules and correlations as they are
    /// added; kept sorted by `priority`.
    pipelines: Vec<Pipeline>,
}
75
76impl CorrelationEngine {
77 pub fn new(config: CorrelationConfig) -> Self {
79 CorrelationEngine {
80 engine: Engine::new(),
81 correlations: Vec::new(),
82 rule_index: HashMap::new(),
83 rule_ids: Vec::new(),
84 state: HashMap::new(),
85 last_alert: HashMap::new(),
86 event_buffers: HashMap::new(),
87 event_ref_buffers: HashMap::new(),
88 correlation_only_rules: std::collections::HashSet::new(),
89 config,
90 pipelines: Vec::new(),
91 }
92 }
93
94 pub fn add_pipeline(&mut self, pipeline: Pipeline) {
98 self.pipelines.push(pipeline);
99 self.pipelines.sort_by_key(|p| p.priority);
100 }
101
/// Forwards the `include` flag to the underlying detection engine's
/// `set_include_event`.
pub fn set_include_event(&mut self, include: bool) {
    self.engine.set_include_event(include);
}
106
/// Forwards the `enabled` flag to the underlying detection engine's
/// `set_bloom_prefilter`.
pub fn set_bloom_prefilter(&mut self, enabled: bool) {
    self.engine.set_bloom_prefilter(enabled);
}
113
/// Forwards `max_bytes` to the underlying detection engine's
/// `set_bloom_max_bytes`.
pub fn set_bloom_max_bytes(&mut self, max_bytes: usize) {
    self.engine.set_bloom_max_bytes(max_bytes);
}
119
/// Forwards the `enabled` flag to the underlying detection engine's
/// `set_cross_rule_ac`.
///
/// Only compiled when the `daachorse-index` feature is enabled.
#[cfg(feature = "daachorse-index")]
pub fn set_cross_rule_ac(&mut self, enabled: bool) {
    self.engine.set_cross_rule_ac(enabled);
}
127
/// Sets the engine-wide default event mode.
///
/// A correlation that specifies its own `event_mode` overrides this
/// default (see `resolve_event_mode`).
pub fn set_correlation_event_mode(&mut self, mode: CorrelationEventMode) {
    self.config.correlation_event_mode = mode;
}
136
/// Sets the engine-wide default capacity passed to newly created event
/// buffers.
///
/// A correlation that specifies its own `max_events` overrides this
/// default (see `resolve_max_events`).
pub fn set_max_correlation_events(&mut self, max: usize) {
    self.config.max_correlation_events = max;
}
142
143 pub fn add_rule(&mut self, rule: &SigmaRule) -> Result<()> {
149 if self.pipelines.is_empty() {
150 self.apply_custom_attributes(&rule.custom_attributes);
151 self.rule_ids.push((rule.id.clone(), rule.name.clone()));
152 self.engine.add_rule(rule)?;
153 } else {
154 let mut transformed = rule.clone();
155 apply_pipelines(&self.pipelines, &mut transformed)?;
156 self.apply_custom_attributes(&transformed.custom_attributes);
157 self.rule_ids
158 .push((transformed.id.clone(), transformed.name.clone()));
159 let compiled = crate::compiler::compile_rule(&transformed)?;
161 self.engine.add_compiled_rule(compiled);
162 }
163 Ok(())
164 }
165
166 fn apply_custom_attributes(
180 &mut self,
181 attrs: &std::collections::HashMap<String, yaml_serde::Value>,
182 ) {
183 if let Some(field) = attrs.get("rsigma.timestamp_field").and_then(|v| v.as_str())
185 && !self.config.timestamp_fields.iter().any(|f| f == field)
186 {
187 self.config.timestamp_fields.insert(0, field.to_string());
188 }
189
190 if let Some(val) = attrs.get("rsigma.suppress").and_then(|v| v.as_str())
192 && self.config.suppress.is_none()
193 && let Ok(ts) = rsigma_parser::Timespan::parse(val)
194 {
195 self.config.suppress = Some(ts.seconds);
196 }
197
198 if let Some(val) = attrs.get("rsigma.action").and_then(|v| v.as_str())
200 && self.config.action_on_match == CorrelationAction::Alert
201 && let Ok(a) = val.parse::<CorrelationAction>()
202 {
203 self.config.action_on_match = a;
204 }
205 }
206
207 pub fn add_correlation(&mut self, corr: &CorrelationRule) -> Result<()> {
209 let owned;
210 let effective = if self.pipelines.is_empty() {
211 corr
212 } else {
213 owned = {
214 let mut c = corr.clone();
215 apply_pipelines_to_correlation(&self.pipelines, &mut c)?;
216 c
217 };
218 &owned
219 };
220
221 self.apply_custom_attributes(&effective.custom_attributes);
224
225 let compiled = compile_correlation(effective)?;
226 let idx = self.correlations.len();
227
228 for rule_ref in &compiled.rule_refs {
230 self.rule_index
231 .entry(rule_ref.clone())
232 .or_default()
233 .push(idx);
234 }
235
236 if !compiled.generate {
238 for rule_ref in &compiled.rule_refs {
239 self.correlation_only_rules.insert(rule_ref.clone());
240 }
241 }
242
243 self.correlations.push(compiled);
244 Ok(())
245 }
246
247 pub fn add_collection(&mut self, collection: &SigmaCollection) -> Result<()> {
256 let mut compiled_batch = Vec::with_capacity(collection.rules.len());
257 if self.pipelines.is_empty() {
258 for rule in &collection.rules {
259 self.apply_custom_attributes(&rule.custom_attributes);
260 self.rule_ids.push((rule.id.clone(), rule.name.clone()));
261 compiled_batch.push(crate::compiler::compile_rule(rule)?);
262 }
263 } else {
264 for rule in &collection.rules {
265 let mut transformed = rule.clone();
266 apply_pipelines(&self.pipelines, &mut transformed)?;
267 self.apply_custom_attributes(&transformed.custom_attributes);
268 self.rule_ids
269 .push((transformed.id.clone(), transformed.name.clone()));
270 compiled_batch.push(crate::compiler::compile_rule(&transformed)?);
272 }
273 }
274 self.engine.extend_compiled_rules(compiled_batch);
275 for filter in &collection.filters {
277 self.engine.apply_filter(filter)?;
278 }
279 for corr in &collection.correlations {
280 self.add_correlation(corr)?;
281 }
282 self.validate_rule_refs()?;
283 self.detect_correlation_cycles()?;
284 Ok(())
285 }
286
287 fn validate_rule_refs(&self) -> Result<()> {
290 let mut known: std::collections::HashSet<&str> = std::collections::HashSet::new();
291
292 for (id, name) in &self.rule_ids {
293 if let Some(id) = id {
294 known.insert(id.as_str());
295 }
296 if let Some(name) = name {
297 known.insert(name.as_str());
298 }
299 }
300 for corr in &self.correlations {
301 if let Some(ref id) = corr.id {
302 known.insert(id.as_str());
303 }
304 if let Some(ref name) = corr.name {
305 known.insert(name.as_str());
306 }
307 }
308
309 for corr in &self.correlations {
310 for rule_ref in &corr.rule_refs {
311 if !known.contains(rule_ref.as_str()) {
312 return Err(EvalError::UnknownRuleRef(rule_ref.clone()));
313 }
314 }
315 }
316 Ok(())
317 }
318
319 fn detect_correlation_cycles(&self) -> Result<()> {
327 let mut corr_identifiers: HashMap<&str, usize> = HashMap::new();
329 for (idx, corr) in self.correlations.iter().enumerate() {
330 if let Some(ref id) = corr.id {
331 corr_identifiers.insert(id.as_str(), idx);
332 }
333 if let Some(ref name) = corr.name {
334 corr_identifiers.insert(name.as_str(), idx);
335 }
336 }
337
338 let mut adj: Vec<Vec<usize>> = vec![Vec::new(); self.correlations.len()];
340 for (idx, corr) in self.correlations.iter().enumerate() {
341 for rule_ref in &corr.rule_refs {
342 if let Some(&target_idx) = corr_identifiers.get(rule_ref.as_str()) {
343 adj[idx].push(target_idx);
344 }
345 }
346 }
347
348 let mut state = vec![0u8; self.correlations.len()]; let mut path: Vec<usize> = Vec::new();
351
352 for start in 0..self.correlations.len() {
353 if state[start] == 0
354 && let Some(cycle) = Self::dfs_find_cycle(start, &adj, &mut state, &mut path)
355 {
356 let names: Vec<String> = cycle
357 .iter()
358 .map(|&i| {
359 self.correlations[i]
360 .id
361 .as_deref()
362 .or(self.correlations[i].name.as_deref())
363 .unwrap_or(&self.correlations[i].title)
364 .to_string()
365 })
366 .collect();
367 return Err(crate::error::EvalError::CorrelationCycle(
368 names.join(" -> "),
369 ));
370 }
371 }
372 Ok(())
373 }
374
/// Depth-first search for a cycle reachable from `node`.
///
/// Uses an explicit stack instead of recursion, so the depth of the
/// reference chain cannot overflow the call stack.
///
/// # Parameters
///
/// - `node`: start node; must be a valid index into `adj` and `state`.
/// - `adj`: adjacency list, `adj[i]` holding the successors of node `i`.
/// - `state`: per-node marker, `0` = unvisited, `1` = on the current path,
///   `2` = fully explored. Updated in place and shared between calls.
/// - `path`: nodes on the current DFS path. Nodes pushed by this call are
///   popped again unless a cycle is found.
///
/// # Returns
///
/// `Some(cycle)` where `cycle` starts and ends with the same node (for
/// example `[1, 2, 1]`), or `None` when no cycle is reachable from `node`.
fn dfs_find_cycle(
    node: usize,
    adj: &[Vec<usize>],
    state: &mut [u8],
    path: &mut Vec<usize>,
) -> Option<Vec<usize>> {
    const UNVISITED: u8 = 0;
    const ON_PATH: u8 = 1;
    const DONE: u8 = 2;

    // Each frame is (node, index of the next outgoing edge to examine).
    let mut frames: Vec<(usize, usize)> = Vec::new();

    state[node] = ON_PATH;
    path.push(node);
    frames.push((node, 0));

    while let Some(&(current, edge)) = frames.last() {
        match adj[current].get(edge) {
            None => {
                // All successors explored: retire the node.
                path.pop();
                state[current] = DONE;
                frames.pop();
            }
            Some(&next) => {
                // Advance the edge cursor before possibly descending.
                if let Some(top) = frames.last_mut() {
                    top.1 += 1;
                }

                if state[next] == ON_PATH {
                    // Back edge: the cycle is the path suffix starting at `next`.
                    if let Some(pos) = path.iter().position(|&n| n == next) {
                        let mut cycle = path[pos..].to_vec();
                        cycle.push(next);
                        return Some(cycle);
                    }
                } else if state[next] == UNVISITED {
                    state[next] = ON_PATH;
                    path.push(next);
                    frames.push((next, 0));
                }
            }
        }
    }

    None
}
405
406 pub fn process_event(&mut self, event: &impl Event) -> ProcessResult {
412 let all_detections = self.engine.evaluate(event);
413
414 let ts = match self.extract_event_timestamp(event) {
415 Some(ts) => ts,
416 None => match self.config.timestamp_fallback {
417 TimestampFallback::WallClock => Utc::now().timestamp(),
418 TimestampFallback::Skip => {
419 let detections = self.filter_detections(all_detections);
421 return ProcessResult {
422 detections,
423 correlations: Vec::new(),
424 };
425 }
426 },
427 };
428 self.process_with_detections(event, all_detections, ts)
429 }
430
/// Evaluates `event` and updates correlations using the caller-supplied
/// `timestamp_secs` instead of extracting a timestamp from the event.
pub fn process_event_at(&mut self, event: &impl Event, timestamp_secs: i64) -> ProcessResult {
    let all_detections = self.engine.evaluate(event);
    self.process_with_detections(event, all_detections, timestamp_secs)
}
439
440 pub fn process_with_detections(
446 &mut self,
447 event: &impl Event,
448 all_detections: Vec<MatchResult>,
449 timestamp_secs: i64,
450 ) -> ProcessResult {
451 let timestamp_secs = timestamp_secs.clamp(0, i64::MAX / 2);
452
453 if self.state.len() >= self.config.max_state_entries {
455 self.evict_all(timestamp_secs);
456 }
457
458 let mut correlations = Vec::new();
460 self.feed_detections(event, &all_detections, timestamp_secs, &mut correlations);
461
462 self.chain_correlations(&correlations, timestamp_secs);
464
465 let detections = self.filter_detections(all_detections);
467
468 ProcessResult {
469 detections,
470 correlations,
471 }
472 }
473
/// Evaluates `event` against the detection rules only.
///
/// Takes `&self`: no correlation state is read or modified.
pub fn evaluate(&self, event: &impl Event) -> Vec<MatchResult> {
    self.engine.evaluate(event)
}
482
483 pub fn process_batch<E: Event + Sync>(&mut self, events: &[&E]) -> Vec<ProcessResult> {
491 let engine = &self.engine;
494 let ts_fields = &self.config.timestamp_fields;
495
496 let batch_results: Vec<(Vec<MatchResult>, Option<i64>)> = {
497 #[cfg(feature = "parallel")]
498 {
499 use rayon::prelude::*;
500 events
501 .par_iter()
502 .map(|e| {
503 let detections = engine.evaluate(e);
504 let ts = extract_event_ts(e, ts_fields);
505 (detections, ts)
506 })
507 .collect()
508 }
509 #[cfg(not(feature = "parallel"))]
510 {
511 events
512 .iter()
513 .map(|e| {
514 let detections = engine.evaluate(e);
515 let ts = extract_event_ts(e, ts_fields);
516 (detections, ts)
517 })
518 .collect()
519 }
520 };
521
522 let mut results = Vec::with_capacity(events.len());
524 for ((detections, ts_opt), event) in batch_results.into_iter().zip(events) {
525 match ts_opt {
526 Some(ts) => {
527 results.push(self.process_with_detections(event, detections, ts));
528 }
529 None => match self.config.timestamp_fallback {
530 TimestampFallback::WallClock => {
531 let ts = Utc::now().timestamp();
532 results.push(self.process_with_detections(event, detections, ts));
533 }
534 TimestampFallback::Skip => {
535 let detections = self.filter_detections(detections);
537 results.push(ProcessResult {
538 detections,
539 correlations: Vec::new(),
540 });
541 }
542 },
543 }
544 }
545 results
546 }
547
548 fn filter_detections(&self, all_detections: Vec<MatchResult>) -> Vec<MatchResult> {
553 if !self.config.emit_detections && !self.correlation_only_rules.is_empty() {
554 all_detections
555 .into_iter()
556 .filter(|m| {
557 let id_match = m
558 .rule_id
559 .as_ref()
560 .is_some_and(|id| self.correlation_only_rules.contains(id));
561 !id_match
562 })
563 .collect()
564 } else {
565 all_detections
566 }
567 }
568
569 fn feed_detections(
571 &mut self,
572 event: &impl Event,
573 detections: &[MatchResult],
574 ts: i64,
575 out: &mut Vec<CorrelationResult>,
576 ) {
577 let mut work: Vec<(usize, Option<String>, Option<String>)> = Vec::new();
580
581 for det in detections {
582 let (rule_id, rule_name) = self.find_rule_identity(det);
585
586 let mut corr_indices = Vec::new();
588 if let Some(ref id) = rule_id
589 && let Some(indices) = self.rule_index.get(id)
590 {
591 corr_indices.extend(indices);
592 }
593 if let Some(ref name) = rule_name
594 && let Some(indices) = self.rule_index.get(name)
595 {
596 corr_indices.extend(indices);
597 }
598
599 corr_indices.sort_unstable();
600 corr_indices.dedup();
601
602 for &corr_idx in &corr_indices {
603 work.push((corr_idx, rule_id.clone(), rule_name.clone()));
604 }
605 }
606
607 for (corr_idx, rule_id, rule_name) in work {
608 self.update_correlation(corr_idx, event, ts, &rule_id, &rule_name, out);
609 }
610 }
611
612 fn find_rule_identity(&self, det: &MatchResult) -> (Option<String>, Option<String>) {
614 if let Some(ref match_id) = det.rule_id {
616 for (id, name) in &self.rule_ids {
617 if id.as_deref() == Some(match_id.as_str()) {
618 return (id.clone(), name.clone());
619 }
620 }
621 }
622 (det.rule_id.clone(), None)
624 }
625
/// Returns the event mode for correlation `corr_idx`: the correlation's own
/// `event_mode` if set, otherwise `config.correlation_event_mode`.
///
/// Indexes `self.correlations` directly, so `corr_idx` must be in range.
fn resolve_event_mode(&self, corr_idx: usize) -> CorrelationEventMode {
    let corr = &self.correlations[corr_idx];
    corr.event_mode
        .unwrap_or(self.config.correlation_event_mode)
}
632
/// Returns the event-buffer capacity for correlation `corr_idx`: the
/// correlation's own `max_events` if set, otherwise
/// `config.max_correlation_events`.
///
/// Indexes `self.correlations` directly, so `corr_idx` must be in range.
fn resolve_max_events(&self, corr_idx: usize) -> usize {
    let corr = &self.correlations[corr_idx];
    corr.max_events
        .unwrap_or(self.config.max_correlation_events)
}
639
/// Feeds one detection into correlation `corr_idx` and emits a result if
/// the correlation's condition is met.
///
/// # Parameters
///
/// - `corr_idx`: index into `self.correlations` (must be in range).
/// - `event`: the event that produced the detection.
/// - `ts`: event timestamp in seconds.
/// - `rule_id` / `rule_name`: identity of the detection rule that matched.
/// - `out`: receives a [`CorrelationResult`] when the correlation fires and
///   is not suppressed.
///
/// # Steps
///
/// 1. Resolve per-correlation settings, falling back to engine defaults.
/// 2. Derive the group key and fetch (or create) the window state.
/// 3. Evict window entries older than `ts - timespan`, then record the
///    event according to the correlation type.
/// 4. Record the event in the Full/Refs buffer when the event mode asks
///    for it.
/// 5. Check the condition; on fire, apply suppression, build the result,
///    remember the alert time and optionally reset the window.
fn update_correlation(
    &mut self,
    corr_idx: usize,
    event: &impl Event,
    ts: i64,
    rule_id: &Option<String>,
    rule_name: &Option<String>,
    out: &mut Vec<CorrelationResult>,
) {
    let corr = &self.correlations[corr_idx];
    let corr_type = corr.correlation_type;
    let timespan = corr.timespan_secs;
    let level = corr.level;
    // Per-correlation settings take precedence over engine-wide defaults.
    let suppress_secs = corr.suppress_secs.or(self.config.suppress);
    let action = corr.action.unwrap_or(self.config.action_on_match);
    let event_mode = self.resolve_event_mode(corr_idx);
    let max_events = self.resolve_max_events(corr_idx);

    // Both identifiers of the matching rule are handed to the group-key
    // extraction.
    let mut ref_strs: Vec<&str> = Vec::new();
    if let Some(id) = rule_id.as_deref() {
        ref_strs.push(id);
    }
    if let Some(name) = rule_name.as_deref() {
        ref_strs.push(name);
    }
    // Single reference recorded for temporal correlations: id preferred,
    // then name, then the empty string.
    let rule_ref = rule_id.as_deref().or(rule_name.as_deref()).unwrap_or("");

    let group_key = GroupKey::extract(event, &corr.group_by, &ref_strs);

    let state_key = (corr_idx, group_key.clone());
    let state = self
        .state
        .entry(state_key.clone())
        .or_insert_with(|| WindowState::new_for(corr_type));

    // Drop entries that fell out of the window before recording the new one.
    let cutoff = ts - timespan as i64;
    state.evict(cutoff);

    match corr_type {
        CorrelationType::EventCount => {
            state.push_event_count(ts);
        }
        CorrelationType::ValueCount => {
            // Only the first configured field is used; the event is skipped
            // when the field is absent or its value cannot be rendered.
            if let Some(ref fields) = corr.condition.field
                && let Some(field_name) = fields.first()
                && let Some(val) = event.get_field(field_name)
                && let Some(s) = value_to_string_for_count(&val)
            {
                state.push_value_count(ts, s);
            }
        }
        CorrelationType::Temporal | CorrelationType::TemporalOrdered => {
            state.push_temporal(ts, rule_ref);
        }
        CorrelationType::ValueSum
        | CorrelationType::ValueAvg
        | CorrelationType::ValuePercentile
        | CorrelationType::ValueMedian => {
            // Only the first configured field is used; the event is skipped
            // when the field is absent or not numeric.
            if let Some(ref fields) = corr.condition.field
                && let Some(field_name) = fields.first()
                && let Some(val) = event.get_field(field_name)
                && let Some(n) = value_to_f64_ev(&val)
            {
                state.push_numeric(ts, n);
            }
        }
    }

    // Record the event itself when the event mode asks for it. Both modes
    // hand the event's JSON to their buffer; what each buffer retains is
    // decided by the buffer type.
    match event_mode {
        CorrelationEventMode::Full => {
            let buf = self
                .event_buffers
                .entry(state_key.clone())
                .or_insert_with(|| EventBuffer::new(max_events));
            buf.evict(cutoff);
            let json = event.to_json();
            buf.push(ts, &json);
        }
        CorrelationEventMode::Refs => {
            let buf = self
                .event_ref_buffers
                .entry(state_key.clone())
                .or_insert_with(|| EventRefBuffer::new(max_events));
            buf.evict(cutoff);
            let json = event.to_json();
            buf.push(ts, &json);
        }
        CorrelationEventMode::None => {}
    }

    let fired = state.check_condition(
        &corr.condition,
        corr_type,
        &corr.rule_refs,
        corr.extended_expr.as_ref(),
    );

    if let Some(agg_value) = fired {
        let alert_key = (corr_idx, group_key.clone());

        // Suppressed when a previous alert for this key is less than
        // `suppress` seconds old. No suppression applies when none is
        // configured or when no alert was recorded yet.
        let suppressed = if let Some(suppress) = suppress_secs {
            if let Some(&last_ts) = self.last_alert.get(&alert_key) {
                (ts - last_ts) < suppress as i64
            } else {
                false
            }
        } else {
            false
        };

        if !suppressed {
            // Attach the buffered events or references, per the event mode.
            let (events, event_refs) = match event_mode {
                CorrelationEventMode::Full => {
                    let stored = self
                        .event_buffers
                        .get(&alert_key)
                        .map(|buf| buf.decompress_all())
                        .unwrap_or_default();
                    (Some(stored), None)
                }
                CorrelationEventMode::Refs => {
                    let stored = self
                        .event_ref_buffers
                        .get(&alert_key)
                        .map(|buf| buf.refs())
                        .unwrap_or_default();
                    (None, Some(stored))
                }
                CorrelationEventMode::None => (None, None),
            };

            let corr = &self.correlations[corr_idx];
            let result = CorrelationResult {
                rule_title: corr.title.clone(),
                rule_id: corr.id.clone(),
                level,
                tags: corr.tags.clone(),
                correlation_type: corr_type,
                group_key: group_key.to_pairs(&corr.group_by),
                aggregated_value: agg_value,
                timespan_secs: timespan,
                events,
                event_refs,
                custom_attributes: corr.custom_attributes.clone(),
            };
            out.push(result);

            // Remember when this key last alerted, for later suppression.
            self.last_alert.insert(alert_key.clone(), ts);

            // `Reset` clears the window and both buffers after an alert is
            // emitted; a suppressed fire never reaches this point.
            if action == CorrelationAction::Reset {
                if let Some(state) = self.state.get_mut(&alert_key) {
                    state.clear();
                }
                if let Some(buf) = self.event_buffers.get_mut(&alert_key) {
                    buf.clear();
                }
                if let Some(buf) = self.event_ref_buffers.get_mut(&alert_key) {
                    buf.clear();
                }
            }
        }
    }
}
819
/// Propagates fired correlations into correlations that reference them.
///
/// Starting from `fired`, every result carrying a rule id is looked up in
/// `rule_index`; each referencing correlation has its window updated at
/// `ts` and its condition checked. Correlations that fire in one round
/// become the input of the next, for at most `MAX_CHAIN_DEPTH` rounds. A
/// warning is logged if results are still pending when the limit is hit.
///
/// NOTE(review): results produced by chained correlations are only carried
/// in the local `pending` list. This function returns nothing and takes
/// `fired` by shared reference, so they do not appear to reach the caller.
/// Confirm whether that is intended.
///
/// NOTE(review): lookups use the result's `rule_id` only (a correlation
/// referenced by name is not chained), indices are not de-duplicated, and
/// no suppression or event buffering is applied here. Confirm against the
/// intended semantics.
fn chain_correlations(&mut self, fired: &[CorrelationResult], ts: i64) {
    const MAX_CHAIN_DEPTH: usize = 10;
    let mut pending: Vec<CorrelationResult> = fired.to_vec();
    let mut depth = 0;

    while !pending.is_empty() && depth < MAX_CHAIN_DEPTH {
        depth += 1;

        // Work list of (target correlation index, group-key pairs of the
        // fired result, identifier of the fired correlation). Collected
        // before any state is mutated.
        #[allow(clippy::type_complexity)]
        let mut work: Vec<(usize, Vec<(String, String)>, String)> = Vec::new();
        for result in &pending {
            if let Some(ref id) = result.rule_id
                && let Some(indices) = self.rule_index.get(id)
            {
                let fired_ref = result
                    .rule_id
                    .as_deref()
                    .unwrap_or(&result.rule_title)
                    .to_string();
                for &corr_idx in indices {
                    work.push((corr_idx, result.group_key.clone(), fired_ref.clone()));
                }
            }
        }

        let mut next_pending = Vec::new();
        for (corr_idx, group_key_pairs, fired_ref) in work {
            let corr = &self.correlations[corr_idx];
            let corr_type = corr.correlation_type;
            let timespan = corr.timespan_secs;
            let level = corr.level;

            // Rebuild the group key from the fired result's pairs, using
            // the target correlation's own `group_by`.
            let group_key = GroupKey::from_pairs(&group_key_pairs, &corr.group_by);
            let state_key = (corr_idx, group_key.clone());
            let state = self
                .state
                .entry(state_key)
                .or_insert_with(|| WindowState::new_for(corr_type));

            let cutoff = ts - timespan as i64;
            state.evict(cutoff);

            match corr_type {
                CorrelationType::EventCount => {
                    state.push_event_count(ts);
                }
                CorrelationType::Temporal | CorrelationType::TemporalOrdered => {
                    state.push_temporal(ts, &fired_ref);
                }
                // No source event is available at this point, so every
                // other correlation type is recorded as a plain count.
                _ => {
                    state.push_event_count(ts);
                }
            }

            let fired = state.check_condition(
                &corr.condition,
                corr_type,
                &corr.rule_refs,
                corr.extended_expr.as_ref(),
            );

            if let Some(agg_value) = fired {
                let corr = &self.correlations[corr_idx];
                next_pending.push(CorrelationResult {
                    rule_title: corr.title.clone(),
                    rule_id: corr.id.clone(),
                    level,
                    tags: corr.tags.clone(),
                    correlation_type: corr_type,
                    group_key: group_key.to_pairs(&corr.group_by),
                    aggregated_value: agg_value,
                    timespan_secs: timespan,
                    // Chained results carry neither events nor references.
                    events: None,
                    event_refs: None,
                    custom_attributes: corr.custom_attributes.clone(),
                });
            }
        }

        pending = next_pending;
    }

    // Only reachable with results pending when the depth limit stopped the loop.
    if !pending.is_empty() {
        log::warn!(
            "Correlation chain depth limit reached ({MAX_CHAIN_DEPTH}); \
             {} pending result(s) were not propagated further. \
             This may indicate a cycle in correlation references.",
            pending.len()
        );
    }
}
918
919 fn extract_event_timestamp(&self, event: &impl Event) -> Option<i64> {
931 for field_name in &self.config.timestamp_fields {
932 if let Some(val) = event.get_field(field_name)
933 && let Some(ts) = parse_timestamp_value(&val)
934 {
935 return Some(ts);
936 }
937 }
938 None
939 }
940
/// Runs an eviction pass relative to `now_secs` (seconds).
///
/// Public entry point for the internal `evict_all`.
pub fn evict_expired(&mut self, now_secs: i64) {
    self.evict_all(now_secs);
}
949
950 fn evict_all(&mut self, now_secs: i64) {
952 let timespans: Vec<u64> = self.correlations.iter().map(|c| c.timespan_secs).collect();
954
955 self.state.retain(|&(corr_idx, _), state| {
956 if corr_idx < timespans.len() {
957 let cutoff = now_secs - timespans[corr_idx] as i64;
958 state.evict(cutoff);
959 }
960 !state.is_empty()
961 });
962
963 self.event_buffers.retain(|&(corr_idx, _), buf| {
965 if corr_idx < timespans.len() {
966 let cutoff = now_secs - timespans[corr_idx] as i64;
967 buf.evict(cutoff);
968 }
969 !buf.is_empty()
970 });
971 self.event_ref_buffers.retain(|&(corr_idx, _), buf| {
972 if corr_idx < timespans.len() {
973 let cutoff = now_secs - timespans[corr_idx] as i64;
974 buf.evict(cutoff);
975 }
976 !buf.is_empty()
977 });
978
979 if self.state.len() >= self.config.max_state_entries {
983 let target = self.config.max_state_entries * 9 / 10;
984 let excess = self.state.len() - target;
985
986 log::warn!(
987 "Correlation state hard cap reached ({} entries, max {}); \
988 evicting {} stalest entries to {} (90% capacity). \
989 This indicates high-cardinality traffic; consider raising \
990 max_state_entries or shortening correlation windows.",
991 self.state.len(),
992 self.config.max_state_entries,
993 excess,
994 target,
995 );
996
997 let mut by_staleness: Vec<_> = self
999 .state
1000 .iter()
1001 .map(|(k, v)| (k.clone(), v.latest_timestamp().unwrap_or(i64::MIN)))
1002 .collect();
1003 by_staleness.sort_unstable_by_key(|&(_, ts)| ts);
1004
1005 for (key, _) in by_staleness.into_iter().take(excess) {
1007 self.state.remove(&key);
1008 self.last_alert.remove(&key);
1009 self.event_buffers.remove(&key);
1010 self.event_ref_buffers.remove(&key);
1011 }
1012 }
1013
1014 self.last_alert.retain(|key, &mut alert_ts| {
1017 let suppress = if key.0 < self.correlations.len() {
1018 self.correlations[key.0]
1019 .suppress_secs
1020 .or(self.config.suppress)
1021 .unwrap_or(0)
1022 } else {
1023 0
1024 };
1025 (now_secs - alert_ts) < suppress as i64
1026 });
1027 }
1028
/// Number of `(correlation, group key)` window states currently held.
pub fn state_count(&self) -> usize {
    self.state.len()
}
1033
/// Number of rules reported by the underlying detection engine's
/// `rule_count`.
pub fn detection_rule_count(&self) -> usize {
    self.engine.rule_count()
}
1038
/// Number of registered correlation rules.
pub fn correlation_rule_count(&self) -> usize {
    self.correlations.len()
}
1043
/// Number of event buffers currently held (Full event mode).
pub fn event_buffer_count(&self) -> usize {
    self.event_buffers.len()
}
1048
/// Sum of `compressed_bytes()` over all event buffers.
pub fn event_buffer_bytes(&self) -> usize {
    self.event_buffers
        .values()
        .map(|b| b.compressed_bytes())
        .sum()
}
1056
/// Number of event-reference buffers currently held (Refs event mode).
pub fn event_ref_buffer_count(&self) -> usize {
    self.event_ref_buffers.len()
}
1061
/// Borrows the underlying detection engine.
pub fn engine(&self) -> &Engine {
    &self.engine
}
1066
1067 pub fn export_state(&self) -> CorrelationSnapshot {
1073 let mut windows: HashMap<String, Vec<(GroupKey, WindowState)>> = HashMap::new();
1074 for ((idx, gk), ws) in &self.state {
1075 let corr_id = self.correlation_stable_id(*idx);
1076 windows
1077 .entry(corr_id)
1078 .or_default()
1079 .push((gk.clone(), ws.clone()));
1080 }
1081
1082 let mut last_alert: HashMap<String, Vec<(GroupKey, i64)>> = HashMap::new();
1083 for ((idx, gk), ts) in &self.last_alert {
1084 let corr_id = self.correlation_stable_id(*idx);
1085 last_alert
1086 .entry(corr_id)
1087 .or_default()
1088 .push((gk.clone(), *ts));
1089 }
1090
1091 let mut event_buffers: HashMap<String, Vec<(GroupKey, EventBuffer)>> = HashMap::new();
1092 for ((idx, gk), buf) in &self.event_buffers {
1093 let corr_id = self.correlation_stable_id(*idx);
1094 event_buffers
1095 .entry(corr_id)
1096 .or_default()
1097 .push((gk.clone(), buf.clone()));
1098 }
1099
1100 let mut event_ref_buffers: HashMap<String, Vec<(GroupKey, EventRefBuffer)>> =
1101 HashMap::new();
1102 for ((idx, gk), buf) in &self.event_ref_buffers {
1103 let corr_id = self.correlation_stable_id(*idx);
1104 event_ref_buffers
1105 .entry(corr_id)
1106 .or_default()
1107 .push((gk.clone(), buf.clone()));
1108 }
1109
1110 CorrelationSnapshot {
1111 version: SNAPSHOT_VERSION,
1112 windows,
1113 last_alert,
1114 event_buffers,
1115 event_ref_buffers,
1116 }
1117 }
1118
1119 pub fn import_state(&mut self, snapshot: CorrelationSnapshot) -> bool {
1126 if snapshot.version != SNAPSHOT_VERSION {
1127 return false;
1128 }
1129 let id_to_idx = self.build_id_to_index_map();
1130
1131 for (corr_id, groups) in snapshot.windows {
1132 if let Some(&idx) = id_to_idx.get(&corr_id) {
1133 for (gk, ws) in groups {
1134 self.state.insert((idx, gk), ws);
1135 }
1136 }
1137 }
1138
1139 for (corr_id, groups) in snapshot.last_alert {
1140 if let Some(&idx) = id_to_idx.get(&corr_id) {
1141 for (gk, ts) in groups {
1142 self.last_alert.insert((idx, gk), ts);
1143 }
1144 }
1145 }
1146
1147 for (corr_id, groups) in snapshot.event_buffers {
1148 if let Some(&idx) = id_to_idx.get(&corr_id) {
1149 for (gk, buf) in groups {
1150 self.event_buffers.insert((idx, gk), buf);
1151 }
1152 }
1153 }
1154
1155 for (corr_id, groups) in snapshot.event_ref_buffers {
1156 if let Some(&idx) = id_to_idx.get(&corr_id) {
1157 for (gk, buf) in groups {
1158 self.event_ref_buffers.insert((idx, gk), buf);
1159 }
1160 }
1161 }
1162
1163 true
1164 }
1165
1166 fn correlation_stable_id(&self, idx: usize) -> String {
1168 let corr = &self.correlations[idx];
1169 corr.id
1170 .clone()
1171 .or_else(|| corr.name.clone())
1172 .unwrap_or_else(|| corr.title.clone())
1173 }
1174
1175 fn build_id_to_index_map(&self) -> HashMap<String, usize> {
1177 self.correlations
1178 .iter()
1179 .enumerate()
1180 .map(|(idx, _)| (self.correlation_stable_id(idx), idx))
1181 .collect()
1182 }
1183}
1184
impl Default for CorrelationEngine {
    /// Creates an engine with `CorrelationConfig::default()`.
    fn default() -> Self {
        Self::new(CorrelationConfig::default())
    }
}
1190
1191fn extract_event_ts(event: &impl Event, timestamp_fields: &[String]) -> Option<i64> {
1200 for field_name in timestamp_fields {
1201 if let Some(val) = event.get_field(field_name)
1202 && let Some(ts) = parse_timestamp_value(&val)
1203 {
1204 return Some(ts);
1205 }
1206 }
1207 None
1208}
1209
1210fn parse_timestamp_value(val: &EventValue) -> Option<i64> {
1212 match val {
1213 EventValue::Int(i) => Some(normalize_epoch(*i)),
1214 EventValue::Float(f) => Some(normalize_epoch(*f as i64)),
1215 EventValue::Str(s) => parse_timestamp_string(s),
1216 _ => None,
1217 }
1218}
1219
/// Normalises a numeric epoch timestamp to whole seconds.
///
/// The unit is inferred from the magnitude. Each threshold corresponds to
/// the same instant (2001-09-09) expressed in the next finer unit:
///
/// - greater than `10^18`: nanoseconds, divided by `10^9`
/// - greater than `10^15`: microseconds, divided by `10^6`
/// - greater than `10^12`: milliseconds, divided by `10^3`
/// - anything else (including zero and negatives): already seconds
///
/// Values up to `10^15` behave exactly as before; larger values were
/// previously divided by 1000 only, which turned microsecond and
/// nanosecond epochs into far-future second counts.
fn normalize_epoch(v: i64) -> i64 {
    const MILLIS_THRESHOLD: i64 = 1_000_000_000_000;
    const MICROS_THRESHOLD: i64 = 1_000_000_000_000_000;
    const NANOS_THRESHOLD: i64 = 1_000_000_000_000_000_000;

    if v > NANOS_THRESHOLD {
        v / 1_000_000_000
    } else if v > MICROS_THRESHOLD {
        v / 1_000_000
    } else if v > MILLIS_THRESHOLD {
        v / 1_000
    } else {
        v
    }
}
1225
1226fn parse_timestamp_string(s: &str) -> Option<i64> {
1228 if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
1230 return Some(dt.timestamp());
1231 }
1232
1233 if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S") {
1236 return Some(Utc.from_utc_datetime(&naive).timestamp());
1237 }
1238 if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") {
1239 return Some(Utc.from_utc_datetime(&naive).timestamp());
1240 }
1241
1242 if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S%.f") {
1244 return Some(Utc.from_utc_datetime(&naive).timestamp());
1245 }
1246 if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f") {
1247 return Some(Utc.from_utc_datetime(&naive).timestamp());
1248 }
1249
1250 None
1251}
1252
1253fn value_to_string_for_count(v: &EventValue) -> Option<String> {
1255 match v {
1256 EventValue::Str(s) => Some(s.to_string()),
1257 EventValue::Int(n) => Some(n.to_string()),
1258 EventValue::Float(f) => Some(f.to_string()),
1259 EventValue::Bool(b) => Some(b.to_string()),
1260 EventValue::Null => Some("null".to_string()),
1261 _ => None,
1262 }
1263}
1264
/// Returns the value as an `f64` via `EventValue::as_f64`, or `None` when
/// that conversion yields nothing.
fn value_to_f64_ev(v: &EventValue) -> Option<f64> {
    v.as_f64()
}