1#[cfg(test)]
16mod tests;
17mod types;
18
19pub use types::*;
20
21use std::collections::HashMap;
22
23use chrono::{DateTime, TimeZone, Utc};
24
25use rsigma_parser::{CorrelationRule, CorrelationType, SigmaCollection, SigmaRule};
26
27use crate::correlation::{
28 CompiledCorrelation, EventBuffer, EventRefBuffer, GroupKey, WindowState, compile_correlation,
29};
30use crate::engine::Engine;
31use crate::error::{EvalError, Result};
32use crate::event::{Event, EventValue};
33use crate::pipeline::{Pipeline, apply_pipelines, apply_pipelines_to_correlation};
34use crate::result::{CorrelationBody, EvaluationResult, ResultBody, RuleHeader};
35
/// Format version written by `CorrelationEngine::export_state`; snapshots with
/// any other version are rejected by `CorrelationEngine::import_state`.
const SNAPSHOT_VERSION: u32 = 1;
43pub struct CorrelationEngine {
48 engine: Engine,
50 correlations: Vec<CompiledCorrelation>,
52 rule_index: HashMap<String, Vec<usize>>,
55 rule_ids: Vec<(Option<String>, Option<String>)>,
58 state: HashMap<(usize, GroupKey), WindowState>,
60 last_alert: HashMap<(usize, GroupKey), i64>,
62 event_buffers: HashMap<(usize, GroupKey), EventBuffer>,
64 event_ref_buffers: HashMap<(usize, GroupKey), EventRefBuffer>,
66 correlation_only_rules: std::collections::HashSet<String>,
70 config: CorrelationConfig,
72 pipelines: Vec<Pipeline>,
74}
75
76impl CorrelationEngine {
77 pub fn new(config: CorrelationConfig) -> Self {
79 CorrelationEngine {
80 engine: Engine::new(),
81 correlations: Vec::new(),
82 rule_index: HashMap::new(),
83 rule_ids: Vec::new(),
84 state: HashMap::new(),
85 last_alert: HashMap::new(),
86 event_buffers: HashMap::new(),
87 event_ref_buffers: HashMap::new(),
88 correlation_only_rules: std::collections::HashSet::new(),
89 config,
90 pipelines: Vec::new(),
91 }
92 }
93
94 pub fn add_pipeline(&mut self, pipeline: Pipeline) {
98 self.pipelines.push(pipeline);
99 self.pipelines.sort_by_key(|p| p.priority);
100 }
101
102 pub fn set_include_event(&mut self, include: bool) {
104 self.engine.set_include_event(include);
105 }
106
107 pub fn set_bloom_prefilter(&mut self, enabled: bool) {
111 self.engine.set_bloom_prefilter(enabled);
112 }
113
114 pub fn set_bloom_max_bytes(&mut self, max_bytes: usize) {
117 self.engine.set_bloom_max_bytes(max_bytes);
118 }
119
120 #[cfg(feature = "daachorse-index")]
124 pub fn set_cross_rule_ac(&mut self, enabled: bool) {
125 self.engine.set_cross_rule_ac(enabled);
126 }
127
128 pub fn set_correlation_event_mode(&mut self, mode: CorrelationEventMode) {
134 self.config.correlation_event_mode = mode;
135 }
136
137 pub fn set_max_correlation_events(&mut self, max: usize) {
140 self.config.max_correlation_events = max;
141 }
142
143 pub fn add_rule(&mut self, rule: &SigmaRule) -> Result<()> {
149 if self.pipelines.is_empty() {
150 self.apply_custom_attributes(&rule.custom_attributes);
151 self.rule_ids.push((rule.id.clone(), rule.name.clone()));
152 self.engine.add_rule(rule)?;
153 } else {
154 let mut transformed = rule.clone();
155 apply_pipelines(&self.pipelines, &mut transformed)?;
156 self.apply_custom_attributes(&transformed.custom_attributes);
157 self.rule_ids
158 .push((transformed.id.clone(), transformed.name.clone()));
159 let compiled = crate::compiler::compile_rule(&transformed)?;
161 self.engine.add_compiled_rule(compiled);
162 }
163 Ok(())
164 }
165
166 fn apply_custom_attributes(
180 &mut self,
181 attrs: &std::collections::HashMap<String, yaml_serde::Value>,
182 ) {
183 if let Some(field) = attrs.get("rsigma.timestamp_field").and_then(|v| v.as_str())
185 && !self.config.timestamp_fields.iter().any(|f| f == field)
186 {
187 self.config.timestamp_fields.insert(0, field.to_string());
188 }
189
190 if let Some(val) = attrs.get("rsigma.suppress").and_then(|v| v.as_str())
192 && self.config.suppress.is_none()
193 && let Ok(ts) = rsigma_parser::Timespan::parse(val)
194 {
195 self.config.suppress = Some(ts.seconds);
196 }
197
198 if let Some(val) = attrs.get("rsigma.action").and_then(|v| v.as_str())
200 && self.config.action_on_match == CorrelationAction::Alert
201 && let Ok(a) = val.parse::<CorrelationAction>()
202 {
203 self.config.action_on_match = a;
204 }
205 }
206
207 pub fn add_correlation(&mut self, corr: &CorrelationRule) -> Result<()> {
209 let owned;
210 let effective = if self.pipelines.is_empty() {
211 corr
212 } else {
213 owned = {
214 let mut c = corr.clone();
215 apply_pipelines_to_correlation(&self.pipelines, &mut c)?;
216 c
217 };
218 &owned
219 };
220
221 self.apply_custom_attributes(&effective.custom_attributes);
224
225 let compiled = compile_correlation(effective)?;
226 let idx = self.correlations.len();
227
228 for rule_ref in &compiled.rule_refs {
230 self.rule_index
231 .entry(rule_ref.clone())
232 .or_default()
233 .push(idx);
234 }
235
236 if !compiled.generate {
238 for rule_ref in &compiled.rule_refs {
239 self.correlation_only_rules.insert(rule_ref.clone());
240 }
241 }
242
243 self.correlations.push(compiled);
244 Ok(())
245 }
246
247 pub fn add_collection(&mut self, collection: &SigmaCollection) -> Result<()> {
256 let mut compiled_batch = Vec::with_capacity(collection.rules.len());
257 if self.pipelines.is_empty() {
258 for rule in &collection.rules {
259 self.apply_custom_attributes(&rule.custom_attributes);
260 self.rule_ids.push((rule.id.clone(), rule.name.clone()));
261 compiled_batch.push(crate::compiler::compile_rule(rule)?);
262 }
263 } else {
264 for rule in &collection.rules {
265 let mut transformed = rule.clone();
266 apply_pipelines(&self.pipelines, &mut transformed)?;
267 self.apply_custom_attributes(&transformed.custom_attributes);
268 self.rule_ids
269 .push((transformed.id.clone(), transformed.name.clone()));
270 compiled_batch.push(crate::compiler::compile_rule(&transformed)?);
272 }
273 }
274 self.engine.extend_compiled_rules(compiled_batch);
275 for filter in &collection.filters {
277 self.engine.apply_filter(filter)?;
278 }
279 for corr in &collection.correlations {
280 self.add_correlation(corr)?;
281 }
282 self.validate_rule_refs()?;
283 self.detect_correlation_cycles()?;
284 Ok(())
285 }
286
287 fn validate_rule_refs(&self) -> Result<()> {
290 let mut known: std::collections::HashSet<&str> = std::collections::HashSet::new();
291
292 for (id, name) in &self.rule_ids {
293 if let Some(id) = id {
294 known.insert(id.as_str());
295 }
296 if let Some(name) = name {
297 known.insert(name.as_str());
298 }
299 }
300 for corr in &self.correlations {
301 if let Some(ref id) = corr.id {
302 known.insert(id.as_str());
303 }
304 if let Some(ref name) = corr.name {
305 known.insert(name.as_str());
306 }
307 }
308
309 for corr in &self.correlations {
310 for rule_ref in &corr.rule_refs {
311 if !known.contains(rule_ref.as_str()) {
312 return Err(EvalError::UnknownRuleRef(rule_ref.clone()));
313 }
314 }
315 }
316 Ok(())
317 }
318
319 fn detect_correlation_cycles(&self) -> Result<()> {
327 let mut corr_identifiers: HashMap<&str, usize> = HashMap::new();
329 for (idx, corr) in self.correlations.iter().enumerate() {
330 if let Some(ref id) = corr.id {
331 corr_identifiers.insert(id.as_str(), idx);
332 }
333 if let Some(ref name) = corr.name {
334 corr_identifiers.insert(name.as_str(), idx);
335 }
336 }
337
338 let mut adj: Vec<Vec<usize>> = vec![Vec::new(); self.correlations.len()];
340 for (idx, corr) in self.correlations.iter().enumerate() {
341 for rule_ref in &corr.rule_refs {
342 if let Some(&target_idx) = corr_identifiers.get(rule_ref.as_str()) {
343 adj[idx].push(target_idx);
344 }
345 }
346 }
347
348 let mut state = vec![0u8; self.correlations.len()]; let mut path: Vec<usize> = Vec::new();
351
352 for start in 0..self.correlations.len() {
353 if state[start] == 0
354 && let Some(cycle) = Self::dfs_find_cycle(start, &adj, &mut state, &mut path)
355 {
356 let names: Vec<String> = cycle
357 .iter()
358 .map(|&i| {
359 self.correlations[i]
360 .id
361 .as_deref()
362 .or(self.correlations[i].name.as_deref())
363 .unwrap_or(&self.correlations[i].title)
364 .to_string()
365 })
366 .collect();
367 return Err(crate::error::EvalError::CorrelationCycle(
368 names.join(" -> "),
369 ));
370 }
371 }
372 Ok(())
373 }
374
375 fn dfs_find_cycle(
377 node: usize,
378 adj: &[Vec<usize>],
379 state: &mut [u8],
380 path: &mut Vec<usize>,
381 ) -> Option<Vec<usize>> {
382 state[node] = 1; path.push(node);
384
385 for &next in &adj[node] {
386 if state[next] == 1 {
387 if let Some(pos) = path.iter().position(|&n| n == next) {
389 let mut cycle = path[pos..].to_vec();
390 cycle.push(next); return Some(cycle);
392 }
393 }
394 if state[next] == 0
395 && let Some(cycle) = Self::dfs_find_cycle(next, adj, state, path)
396 {
397 return Some(cycle);
398 }
399 }
400
401 path.pop();
402 state[node] = 2; None
404 }
405
406 pub fn process_event(&mut self, event: &impl Event) -> ProcessResult {
412 let all_detections = self.engine.evaluate(event);
413
414 let ts = match self.extract_event_timestamp(event) {
415 Some(ts) => ts,
416 None => match self.config.timestamp_fallback {
417 TimestampFallback::WallClock => Utc::now().timestamp(),
418 TimestampFallback::Skip => {
419 return self.filter_detections(all_detections);
421 }
422 },
423 };
424 self.process_with_detections(event, all_detections, ts)
425 }
426
427 pub fn process_event_at(&mut self, event: &impl Event, timestamp_secs: i64) -> ProcessResult {
432 let all_detections = self.engine.evaluate(event);
433 self.process_with_detections(event, all_detections, timestamp_secs)
434 }
435
436 pub fn process_with_detections(
442 &mut self,
443 event: &impl Event,
444 all_detections: Vec<EvaluationResult>,
445 timestamp_secs: i64,
446 ) -> ProcessResult {
447 let timestamp_secs = timestamp_secs.clamp(0, i64::MAX / 2);
448
449 if self.state.len() >= self.config.max_state_entries {
451 self.evict_all(timestamp_secs);
452 }
453
454 let mut correlations: Vec<EvaluationResult> = Vec::new();
456 self.feed_detections(event, &all_detections, timestamp_secs, &mut correlations);
457
458 self.chain_correlations(&correlations, timestamp_secs);
460
461 let mut out = self.filter_detections(all_detections);
463 out.extend(correlations);
464 out
465 }
466
467 pub fn evaluate(&self, event: &impl Event) -> Vec<EvaluationResult> {
474 self.engine.evaluate(event)
475 }
476
477 pub fn process_batch<E: Event + Sync>(&mut self, events: &[&E]) -> Vec<ProcessResult> {
485 let engine = &self.engine;
488 let ts_fields = &self.config.timestamp_fields;
489
490 let batch_results: Vec<(Vec<EvaluationResult>, Option<i64>)> = {
491 #[cfg(feature = "parallel")]
492 {
493 use rayon::prelude::*;
494 events
495 .par_iter()
496 .map(|e| {
497 let detections = engine.evaluate(e);
498 let ts = extract_event_ts(e, ts_fields);
499 (detections, ts)
500 })
501 .collect()
502 }
503 #[cfg(not(feature = "parallel"))]
504 {
505 events
506 .iter()
507 .map(|e| {
508 let detections = engine.evaluate(e);
509 let ts = extract_event_ts(e, ts_fields);
510 (detections, ts)
511 })
512 .collect()
513 }
514 };
515
516 let mut results = Vec::with_capacity(events.len());
518 for ((detections, ts_opt), event) in batch_results.into_iter().zip(events) {
519 match ts_opt {
520 Some(ts) => {
521 results.push(self.process_with_detections(event, detections, ts));
522 }
523 None => match self.config.timestamp_fallback {
524 TimestampFallback::WallClock => {
525 let ts = Utc::now().timestamp();
526 results.push(self.process_with_detections(event, detections, ts));
527 }
528 TimestampFallback::Skip => {
529 results.push(self.filter_detections(detections));
531 }
532 },
533 }
534 }
535 results
536 }
537
538 fn filter_detections(&self, all_detections: Vec<EvaluationResult>) -> Vec<EvaluationResult> {
543 if !self.config.emit_detections && !self.correlation_only_rules.is_empty() {
544 all_detections
545 .into_iter()
546 .filter(|m| {
547 let id_match = m
548 .header
549 .rule_id
550 .as_ref()
551 .is_some_and(|id| self.correlation_only_rules.contains(id));
552 !id_match
553 })
554 .collect()
555 } else {
556 all_detections
557 }
558 }
559
560 fn feed_detections(
562 &mut self,
563 event: &impl Event,
564 detections: &[EvaluationResult],
565 ts: i64,
566 out: &mut Vec<EvaluationResult>,
567 ) {
568 let mut work: Vec<(usize, Option<String>, Option<String>)> = Vec::new();
571
572 for det in detections {
573 let (rule_id, rule_name) = self.find_rule_identity(det);
576
577 let mut corr_indices = Vec::new();
579 if let Some(ref id) = rule_id
580 && let Some(indices) = self.rule_index.get(id)
581 {
582 corr_indices.extend(indices);
583 }
584 if let Some(ref name) = rule_name
585 && let Some(indices) = self.rule_index.get(name)
586 {
587 corr_indices.extend(indices);
588 }
589
590 corr_indices.sort_unstable();
591 corr_indices.dedup();
592
593 for &corr_idx in &corr_indices {
594 work.push((corr_idx, rule_id.clone(), rule_name.clone()));
595 }
596 }
597
598 for (corr_idx, rule_id, rule_name) in work {
599 self.update_correlation(corr_idx, event, ts, &rule_id, &rule_name, out);
600 }
601 }
602
603 fn find_rule_identity(&self, det: &EvaluationResult) -> (Option<String>, Option<String>) {
605 if let Some(ref match_id) = det.header.rule_id {
607 for (id, name) in &self.rule_ids {
608 if id.as_deref() == Some(match_id.as_str()) {
609 return (id.clone(), name.clone());
610 }
611 }
612 }
613 (det.header.rule_id.clone(), None)
615 }
616
617 fn resolve_event_mode(&self, corr_idx: usize) -> CorrelationEventMode {
619 let corr = &self.correlations[corr_idx];
620 corr.event_mode
621 .unwrap_or(self.config.correlation_event_mode)
622 }
623
624 fn resolve_max_events(&self, corr_idx: usize) -> usize {
626 let corr = &self.correlations[corr_idx];
627 corr.max_events
628 .unwrap_or(self.config.max_correlation_events)
629 }
630
631 fn update_correlation(
633 &mut self,
634 corr_idx: usize,
635 event: &impl Event,
636 ts: i64,
637 rule_id: &Option<String>,
638 rule_name: &Option<String>,
639 out: &mut Vec<EvaluationResult>,
640 ) {
641 let corr = &self.correlations[corr_idx];
645 let corr_type = corr.correlation_type;
646 let timespan = corr.timespan_secs;
647 let level = corr.level;
648 let suppress_secs = corr.suppress_secs.or(self.config.suppress);
649 let action = corr.action.unwrap_or(self.config.action_on_match);
650 let event_mode = self.resolve_event_mode(corr_idx);
651 let max_events = self.resolve_max_events(corr_idx);
652
653 let mut ref_strs: Vec<&str> = Vec::new();
655 if let Some(id) = rule_id.as_deref() {
656 ref_strs.push(id);
657 }
658 if let Some(name) = rule_name.as_deref() {
659 ref_strs.push(name);
660 }
661 let rule_ref = rule_id.as_deref().or(rule_name.as_deref()).unwrap_or("");
662
663 let group_key = GroupKey::extract(event, &corr.group_by, &ref_strs);
665
666 let state_key = (corr_idx, group_key.clone());
668 let state = self
669 .state
670 .entry(state_key.clone())
671 .or_insert_with(|| WindowState::new_for(corr_type));
672
673 let cutoff = ts - timespan as i64;
675 state.evict(cutoff);
676
677 match corr_type {
679 CorrelationType::EventCount => {
680 state.push_event_count(ts);
681 }
682 CorrelationType::ValueCount => {
683 if let Some(ref fields) = corr.condition.field
684 && let Some(key) = composite_value_count_key(event, fields)
685 {
686 state.push_value_count(ts, key);
687 }
688 }
689 CorrelationType::Temporal | CorrelationType::TemporalOrdered => {
690 state.push_temporal(ts, rule_ref);
691 }
692 CorrelationType::ValueSum
693 | CorrelationType::ValueAvg
694 | CorrelationType::ValuePercentile
695 | CorrelationType::ValueMedian => {
696 if let Some(ref fields) = corr.condition.field
697 && let Some(field_name) = fields.first()
698 && let Some(val) = event.get_field(field_name)
699 && let Some(n) = value_to_f64_ev(&val)
700 {
701 state.push_numeric(ts, n);
702 }
703 }
704 }
705
706 match event_mode {
708 CorrelationEventMode::Full => {
709 let buf = self
710 .event_buffers
711 .entry(state_key.clone())
712 .or_insert_with(|| EventBuffer::new(max_events));
713 buf.evict(cutoff);
714 let json = event.to_json();
715 buf.push(ts, &json);
716 }
717 CorrelationEventMode::Refs => {
718 let buf = self
719 .event_ref_buffers
720 .entry(state_key.clone())
721 .or_insert_with(|| EventRefBuffer::new(max_events));
722 buf.evict(cutoff);
723 let json = event.to_json();
724 buf.push(ts, &json);
725 }
726 CorrelationEventMode::None => {}
727 }
728
729 let fired = state.check_condition(
731 &corr.condition,
732 corr_type,
733 &corr.rule_refs,
734 corr.extended_expr.as_ref(),
735 );
736
737 if let Some(agg_value) = fired {
738 let alert_key = (corr_idx, group_key.clone());
739
740 let suppressed = if let Some(suppress) = suppress_secs {
742 if let Some(&last_ts) = self.last_alert.get(&alert_key) {
743 (ts - last_ts) < suppress as i64
744 } else {
745 false
746 }
747 } else {
748 false
749 };
750
751 if !suppressed {
752 let (events, event_refs) = match event_mode {
754 CorrelationEventMode::Full => {
755 let stored = self
756 .event_buffers
757 .get(&alert_key)
758 .map(|buf| buf.decompress_all())
759 .unwrap_or_default();
760 (Some(stored), None)
761 }
762 CorrelationEventMode::Refs => {
763 let stored = self
764 .event_ref_buffers
765 .get(&alert_key)
766 .map(|buf| buf.refs())
767 .unwrap_or_default();
768 (None, Some(stored))
769 }
770 CorrelationEventMode::None => (None, None),
771 };
772
773 let corr = &self.correlations[corr_idx];
775 let result = EvaluationResult {
776 header: RuleHeader {
777 rule_title: corr.title.clone(),
778 rule_id: corr.id.clone(),
779 level,
780 tags: corr.tags.clone(),
781 custom_attributes: corr.custom_attributes.clone(),
782 enrichments: None,
783 },
784 body: ResultBody::Correlation(CorrelationBody {
785 correlation_type: corr_type,
786 group_key: group_key.to_pairs(&corr.group_by),
787 aggregated_value: agg_value,
788 timespan_secs: timespan,
789 events,
790 event_refs,
791 }),
792 };
793 out.push(result);
794
795 self.last_alert.insert(alert_key.clone(), ts);
797
798 if action == CorrelationAction::Reset {
800 if let Some(state) = self.state.get_mut(&alert_key) {
801 state.clear();
802 }
803 if let Some(buf) = self.event_buffers.get_mut(&alert_key) {
804 buf.clear();
805 }
806 if let Some(buf) = self.event_ref_buffers.get_mut(&alert_key) {
807 buf.clear();
808 }
809 }
810 }
811 }
812 }
813
814 fn chain_correlations(&mut self, fired: &[EvaluationResult], ts: i64) {
819 const MAX_CHAIN_DEPTH: usize = 10;
820 let mut pending: Vec<EvaluationResult> = fired.to_vec();
821 let mut depth = 0;
822
823 while !pending.is_empty() && depth < MAX_CHAIN_DEPTH {
824 depth += 1;
825
826 #[allow(clippy::type_complexity)]
828 let mut work: Vec<(usize, Vec<(String, String)>, String)> = Vec::new();
829 for result in &pending {
830 let Some(body) = result.as_correlation() else {
832 continue;
833 };
834 if let Some(ref id) = result.header.rule_id
835 && let Some(indices) = self.rule_index.get(id)
836 {
837 let fired_ref = result
838 .header
839 .rule_id
840 .as_deref()
841 .unwrap_or(&result.header.rule_title)
842 .to_string();
843 for &corr_idx in indices {
844 work.push((corr_idx, body.group_key.clone(), fired_ref.clone()));
845 }
846 }
847 }
848
849 let mut next_pending = Vec::new();
850 for (corr_idx, group_key_pairs, fired_ref) in work {
851 let corr = &self.correlations[corr_idx];
852 let corr_type = corr.correlation_type;
853 let timespan = corr.timespan_secs;
854 let level = corr.level;
855
856 let group_key = GroupKey::from_pairs(&group_key_pairs, &corr.group_by);
857 let state_key = (corr_idx, group_key.clone());
858 let state = self
859 .state
860 .entry(state_key)
861 .or_insert_with(|| WindowState::new_for(corr_type));
862
863 let cutoff = ts - timespan as i64;
864 state.evict(cutoff);
865
866 match corr_type {
867 CorrelationType::EventCount => {
868 state.push_event_count(ts);
869 }
870 CorrelationType::Temporal | CorrelationType::TemporalOrdered => {
871 state.push_temporal(ts, &fired_ref);
872 }
873 _ => {
874 state.push_event_count(ts);
875 }
876 }
877
878 let fired = state.check_condition(
879 &corr.condition,
880 corr_type,
881 &corr.rule_refs,
882 corr.extended_expr.as_ref(),
883 );
884
885 if let Some(agg_value) = fired {
886 let corr = &self.correlations[corr_idx];
887 next_pending.push(EvaluationResult {
888 header: RuleHeader {
889 rule_title: corr.title.clone(),
890 rule_id: corr.id.clone(),
891 level,
892 tags: corr.tags.clone(),
893 custom_attributes: corr.custom_attributes.clone(),
894 enrichments: None,
895 },
896 body: ResultBody::Correlation(CorrelationBody {
897 correlation_type: corr_type,
898 group_key: group_key.to_pairs(&corr.group_by),
899 aggregated_value: agg_value,
900 timespan_secs: timespan,
901 events: None,
905 event_refs: None,
906 }),
907 });
908 }
909 }
910
911 pending = next_pending;
912 }
913
914 if !pending.is_empty() {
915 log::warn!(
916 "Correlation chain depth limit reached ({MAX_CHAIN_DEPTH}); \
917 {} pending result(s) were not propagated further. \
918 This may indicate a cycle in correlation references.",
919 pending.len()
920 );
921 }
922 }
923
924 fn extract_event_timestamp(&self, event: &impl Event) -> Option<i64> {
936 for field_name in &self.config.timestamp_fields {
937 if let Some(val) = event.get_field(field_name)
938 && let Some(ts) = parse_timestamp_value(&val)
939 {
940 return Some(ts);
941 }
942 }
943 None
944 }
945
946 pub fn evict_expired(&mut self, now_secs: i64) {
952 self.evict_all(now_secs);
953 }
954
955 fn evict_all(&mut self, now_secs: i64) {
957 let timespans: Vec<u64> = self.correlations.iter().map(|c| c.timespan_secs).collect();
959
960 self.state.retain(|&(corr_idx, _), state| {
961 if corr_idx < timespans.len() {
962 let cutoff = now_secs - timespans[corr_idx] as i64;
963 state.evict(cutoff);
964 }
965 !state.is_empty()
966 });
967
968 self.event_buffers.retain(|&(corr_idx, _), buf| {
970 if corr_idx < timespans.len() {
971 let cutoff = now_secs - timespans[corr_idx] as i64;
972 buf.evict(cutoff);
973 }
974 !buf.is_empty()
975 });
976 self.event_ref_buffers.retain(|&(corr_idx, _), buf| {
977 if corr_idx < timespans.len() {
978 let cutoff = now_secs - timespans[corr_idx] as i64;
979 buf.evict(cutoff);
980 }
981 !buf.is_empty()
982 });
983
984 if self.state.len() >= self.config.max_state_entries {
988 let target = self.config.max_state_entries * 9 / 10;
989 let excess = self.state.len() - target;
990
991 log::warn!(
992 "Correlation state hard cap reached ({} entries, max {}); \
993 evicting {} stalest entries to {} (90% capacity). \
994 This indicates high-cardinality traffic; consider raising \
995 max_state_entries or shortening correlation windows.",
996 self.state.len(),
997 self.config.max_state_entries,
998 excess,
999 target,
1000 );
1001
1002 let mut by_staleness: Vec<_> = self
1004 .state
1005 .iter()
1006 .map(|(k, v)| (k.clone(), v.latest_timestamp().unwrap_or(i64::MIN)))
1007 .collect();
1008 by_staleness.sort_unstable_by_key(|&(_, ts)| ts);
1009
1010 for (key, _) in by_staleness.into_iter().take(excess) {
1012 self.state.remove(&key);
1013 self.last_alert.remove(&key);
1014 self.event_buffers.remove(&key);
1015 self.event_ref_buffers.remove(&key);
1016 }
1017 }
1018
1019 self.last_alert.retain(|key, &mut alert_ts| {
1022 let suppress = if key.0 < self.correlations.len() {
1023 self.correlations[key.0]
1024 .suppress_secs
1025 .or(self.config.suppress)
1026 .unwrap_or(0)
1027 } else {
1028 0
1029 };
1030 (now_secs - alert_ts) < suppress as i64
1031 });
1032 }
1033
1034 pub fn state_count(&self) -> usize {
1036 self.state.len()
1037 }
1038
1039 pub fn detection_rule_count(&self) -> usize {
1041 self.engine.rule_count()
1042 }
1043
1044 pub fn correlation_rule_count(&self) -> usize {
1046 self.correlations.len()
1047 }
1048
1049 pub fn event_buffer_count(&self) -> usize {
1051 self.event_buffers.len()
1052 }
1053
1054 pub fn event_buffer_bytes(&self) -> usize {
1056 self.event_buffers
1057 .values()
1058 .map(|b| b.compressed_bytes())
1059 .sum()
1060 }
1061
1062 pub fn event_ref_buffer_count(&self) -> usize {
1064 self.event_ref_buffers.len()
1065 }
1066
1067 pub fn engine(&self) -> &Engine {
1069 &self.engine
1070 }
1071
1072 pub fn export_state(&self) -> CorrelationSnapshot {
1078 let mut windows: HashMap<String, Vec<(GroupKey, WindowState)>> = HashMap::new();
1079 for ((idx, gk), ws) in &self.state {
1080 let corr_id = self.correlation_stable_id(*idx);
1081 windows
1082 .entry(corr_id)
1083 .or_default()
1084 .push((gk.clone(), ws.clone()));
1085 }
1086
1087 let mut last_alert: HashMap<String, Vec<(GroupKey, i64)>> = HashMap::new();
1088 for ((idx, gk), ts) in &self.last_alert {
1089 let corr_id = self.correlation_stable_id(*idx);
1090 last_alert
1091 .entry(corr_id)
1092 .or_default()
1093 .push((gk.clone(), *ts));
1094 }
1095
1096 let mut event_buffers: HashMap<String, Vec<(GroupKey, EventBuffer)>> = HashMap::new();
1097 for ((idx, gk), buf) in &self.event_buffers {
1098 let corr_id = self.correlation_stable_id(*idx);
1099 event_buffers
1100 .entry(corr_id)
1101 .or_default()
1102 .push((gk.clone(), buf.clone()));
1103 }
1104
1105 let mut event_ref_buffers: HashMap<String, Vec<(GroupKey, EventRefBuffer)>> =
1106 HashMap::new();
1107 for ((idx, gk), buf) in &self.event_ref_buffers {
1108 let corr_id = self.correlation_stable_id(*idx);
1109 event_ref_buffers
1110 .entry(corr_id)
1111 .or_default()
1112 .push((gk.clone(), buf.clone()));
1113 }
1114
1115 CorrelationSnapshot {
1116 version: SNAPSHOT_VERSION,
1117 windows,
1118 last_alert,
1119 event_buffers,
1120 event_ref_buffers,
1121 }
1122 }
1123
1124 pub fn import_state(&mut self, snapshot: CorrelationSnapshot) -> bool {
1131 if snapshot.version != SNAPSHOT_VERSION {
1132 return false;
1133 }
1134 let id_to_idx = self.build_id_to_index_map();
1135
1136 for (corr_id, groups) in snapshot.windows {
1137 if let Some(&idx) = id_to_idx.get(&corr_id) {
1138 for (gk, ws) in groups {
1139 self.state.insert((idx, gk), ws);
1140 }
1141 }
1142 }
1143
1144 for (corr_id, groups) in snapshot.last_alert {
1145 if let Some(&idx) = id_to_idx.get(&corr_id) {
1146 for (gk, ts) in groups {
1147 self.last_alert.insert((idx, gk), ts);
1148 }
1149 }
1150 }
1151
1152 for (corr_id, groups) in snapshot.event_buffers {
1153 if let Some(&idx) = id_to_idx.get(&corr_id) {
1154 for (gk, buf) in groups {
1155 self.event_buffers.insert((idx, gk), buf);
1156 }
1157 }
1158 }
1159
1160 for (corr_id, groups) in snapshot.event_ref_buffers {
1161 if let Some(&idx) = id_to_idx.get(&corr_id) {
1162 for (gk, buf) in groups {
1163 self.event_ref_buffers.insert((idx, gk), buf);
1164 }
1165 }
1166 }
1167
1168 true
1169 }
1170
1171 fn correlation_stable_id(&self, idx: usize) -> String {
1173 let corr = &self.correlations[idx];
1174 corr.id
1175 .clone()
1176 .or_else(|| corr.name.clone())
1177 .unwrap_or_else(|| corr.title.clone())
1178 }
1179
1180 fn build_id_to_index_map(&self) -> HashMap<String, usize> {
1182 self.correlations
1183 .iter()
1184 .enumerate()
1185 .map(|(idx, _)| (self.correlation_stable_id(idx), idx))
1186 .collect()
1187 }
1188}
1189
1190impl Default for CorrelationEngine {
1191 fn default() -> Self {
1192 Self::new(CorrelationConfig::default())
1193 }
1194}
1195
1196fn extract_event_ts(event: &impl Event, timestamp_fields: &[String]) -> Option<i64> {
1205 for field_name in timestamp_fields {
1206 if let Some(val) = event.get_field(field_name)
1207 && let Some(ts) = parse_timestamp_value(&val)
1208 {
1209 return Some(ts);
1210 }
1211 }
1212 None
1213}
1214
1215fn parse_timestamp_value(val: &EventValue) -> Option<i64> {
1217 match val {
1218 EventValue::Int(i) => Some(normalize_epoch(*i)),
1219 EventValue::Float(f) => Some(normalize_epoch(*f as i64)),
1220 EventValue::Str(s) => parse_timestamp_string(s),
1221 _ => None,
1222 }
1223}
1224
/// Converts a numeric Unix epoch of unknown unit to whole seconds.
///
/// Second counts above `1_000_000_000_000` (around year 33658) are not
/// plausible, so such values are taken to be a finer unit. They are divided by
/// 1000 until they fall back into the seconds range. Milliseconds,
/// microseconds, and nanoseconds since the epoch therefore all map to seconds.
/// Values already in range, including zero and negatives, are returned unchanged.
fn normalize_epoch(v: i64) -> i64 {
    const MAX_PLAUSIBLE_SECS: i64 = 1_000_000_000_000;
    let mut secs = v;
    while secs > MAX_PLAUSIBLE_SECS {
        secs /= 1000;
    }
    secs
}
1230
1231fn parse_timestamp_string(s: &str) -> Option<i64> {
1233 if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
1235 return Some(dt.timestamp());
1236 }
1237
1238 if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S") {
1241 return Some(Utc.from_utc_datetime(&naive).timestamp());
1242 }
1243 if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") {
1244 return Some(Utc.from_utc_datetime(&naive).timestamp());
1245 }
1246
1247 if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S%.f") {
1249 return Some(Utc.from_utc_datetime(&naive).timestamp());
1250 }
1251 if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f") {
1252 return Some(Utc.from_utc_datetime(&naive).timestamp());
1253 }
1254
1255 None
1256}
1257
1258fn value_to_string_for_count(v: &EventValue) -> Option<String> {
1260 match v {
1261 EventValue::Str(s) => Some(s.to_string()),
1262 EventValue::Int(n) => Some(n.to_string()),
1263 EventValue::Float(f) => Some(f.to_string()),
1264 EventValue::Bool(b) => Some(b.to_string()),
1265 EventValue::Null => Some("null".to_string()),
1266 _ => None,
1267 }
1268}
1269
1270fn composite_value_count_key(event: &impl Event, fields: &[String]) -> Option<String> {
1279 if let [field_name] = fields {
1281 let val = event.get_field(field_name)?;
1282 return value_to_string_for_count(&val);
1283 }
1284
1285 let mut parts = Vec::with_capacity(fields.len());
1286 for field_name in fields {
1287 let val = event.get_field(field_name)?;
1288 let rendered = value_to_string_for_count(&val)?;
1289 parts.push(rendered);
1290 }
1291 Some(parts.join("\u{1f}"))
1292}
1293
1294fn value_to_f64_ev(v: &EventValue) -> Option<f64> {
1296 v.as_f64()
1297}