1#[cfg(test)]
16mod tests;
17mod types;
18
19pub use types::*;
20
21use std::collections::HashMap;
22
23use chrono::{DateTime, TimeZone, Utc};
24
25use rsigma_parser::{CorrelationRule, CorrelationType, SigmaCollection, SigmaRule, WindowMode};
26
27use crate::correlation::{
28 CompiledCorrelation, EventBuffer, EventRefBuffer, GroupKey, WindowDecision, WindowState,
29 apply_window_open, compile_correlation,
30};
31use crate::engine::Engine;
32use crate::error::{EvalError, Result};
33use crate::event::{Event, EventValue};
34use crate::pipeline::{Pipeline, apply_pipelines, apply_pipelines_to_correlation};
35use crate::result::{CorrelationBody, EvaluationResult, ResultBody, RuleHeader};
36
/// Format version written into every exported `CorrelationSnapshot`.
///
/// `import_state` rejects any snapshot whose `version` differs from this value.
const SNAPSHOT_VERSION: u32 = 1;
43
/// Stateful engine that evaluates detection rules against events and feeds
/// the resulting matches into correlation rules.
pub struct CorrelationEngine {
    /// Detection engine used to evaluate individual events.
    engine: Engine,
    /// Compiled correlation rules; a correlation's position in this vector is
    /// the index used as the first half of every state-map key below.
    correlations: Vec<CompiledCorrelation>,
    /// Rule reference (as listed in a correlation's `rule_refs`) mapped to the
    /// indices of the correlations that reference it.
    rule_index: HashMap<String, Vec<usize>>,
    /// `(id, name)` of every detection rule that was added, in insertion order.
    rule_ids: Vec<(Option<String>, Option<String>)>,
    /// Window state per `(correlation index, group key)`.
    state: HashMap<(usize, GroupKey), WindowState>,
    /// Timestamp (seconds) of the last alert emitted per
    /// `(correlation index, group key)`; consulted for suppression.
    last_alert: HashMap<(usize, GroupKey), i64>,
    /// Buffered events per key, used when the event mode is `Full`.
    event_buffers: HashMap<(usize, GroupKey), EventBuffer>,
    /// Buffered event references per key, used when the event mode is `Refs`.
    event_ref_buffers: HashMap<(usize, GroupKey), EventRefBuffer>,
    /// Rule references named by correlations whose `generate` flag is false.
    correlation_only_rules: std::collections::HashSet<String>,
    /// Engine-wide correlation settings.
    config: CorrelationConfig,
    /// Pipelines applied to rules and correlations as they are added; kept
    /// sorted by `priority` by `add_pipeline`.
    pipelines: Vec<Pipeline>,
}
76
77impl CorrelationEngine {
    /// Creates an engine with the given configuration and no rules,
    /// correlations, pipelines, or accumulated state.
    pub fn new(config: CorrelationConfig) -> Self {
        CorrelationEngine {
            engine: Engine::new(),
            correlations: Vec::new(),
            rule_index: HashMap::new(),
            rule_ids: Vec::new(),
            state: HashMap::new(),
            last_alert: HashMap::new(),
            event_buffers: HashMap::new(),
            event_ref_buffers: HashMap::new(),
            correlation_only_rules: std::collections::HashSet::new(),
            config,
            pipelines: Vec::new(),
        }
    }
94
95 pub fn add_pipeline(&mut self, pipeline: Pipeline) {
99 self.pipelines.push(pipeline);
100 self.pipelines.sort_by_key(|p| p.priority);
101 }
102
    /// Forwards `include` to the inner detection engine's `set_include_event`.
    pub fn set_include_event(&mut self, include: bool) {
        self.engine.set_include_event(include);
    }
107
    /// Forwards `level` to the inner detection engine's `set_match_detail`.
    pub fn set_match_detail(&mut self, level: crate::result::MatchDetailLevel) {
        self.engine.set_match_detail(level);
    }
113
    /// Forwards `enabled` to the inner detection engine's
    /// `set_bloom_prefilter`.
    pub fn set_bloom_prefilter(&mut self, enabled: bool) {
        self.engine.set_bloom_prefilter(enabled);
    }
120
    /// Forwards `max_bytes` to the inner detection engine's
    /// `set_bloom_max_bytes`.
    pub fn set_bloom_max_bytes(&mut self, max_bytes: usize) {
        self.engine.set_bloom_max_bytes(max_bytes);
    }
126
    /// Forwards `enabled` to the inner detection engine's `set_cross_rule_ac`.
    ///
    /// Only compiled when the `daachorse-index` feature is enabled.
    #[cfg(feature = "daachorse-index")]
    pub fn set_cross_rule_ac(&mut self, enabled: bool) {
        self.engine.set_cross_rule_ac(enabled);
    }
134
    /// Sets the engine-wide correlation event mode.
    ///
    /// A correlation that carries its own `event_mode` keeps using that value;
    /// this setting is only the fallback (see `resolve_event_mode`).
    pub fn set_correlation_event_mode(&mut self, mode: CorrelationEventMode) {
        self.config.correlation_event_mode = mode;
    }
143
    /// Sets the engine-wide capacity passed to newly created event buffers.
    ///
    /// A correlation that carries its own `max_events` keeps using that value;
    /// this setting is only the fallback (see `resolve_max_events`).
    pub fn set_max_correlation_events(&mut self, max: usize) {
        self.config.max_correlation_events = max;
    }
149
150 pub fn add_rule(&mut self, rule: &SigmaRule) -> Result<()> {
156 if self.pipelines.is_empty() {
157 self.apply_custom_attributes(&rule.custom_attributes);
158 self.rule_ids.push((rule.id.clone(), rule.name.clone()));
159 self.engine.add_rule(rule)?;
160 } else {
161 let mut transformed = rule.clone();
162 apply_pipelines(&self.pipelines, &mut transformed)?;
163 self.apply_custom_attributes(&transformed.custom_attributes);
164 self.rule_ids
165 .push((transformed.id.clone(), transformed.name.clone()));
166 let compiled = crate::compiler::compile_rule(&transformed)?;
168 self.engine.add_compiled_rule(compiled);
169 }
170 Ok(())
171 }
172
173 fn apply_custom_attributes(
187 &mut self,
188 attrs: &std::collections::HashMap<String, yaml_serde::Value>,
189 ) {
190 if let Some(field) = attrs.get("rsigma.timestamp_field").and_then(|v| v.as_str())
192 && !self.config.timestamp_fields.iter().any(|f| f == field)
193 {
194 self.config.timestamp_fields.insert(0, field.to_string());
195 }
196
197 if let Some(val) = attrs.get("rsigma.suppress").and_then(|v| v.as_str())
199 && self.config.suppress.is_none()
200 && let Ok(ts) = rsigma_parser::Timespan::parse(val)
201 {
202 self.config.suppress = Some(ts.seconds);
203 }
204
205 if let Some(val) = attrs.get("rsigma.action").and_then(|v| v.as_str())
207 && self.config.action_on_match == CorrelationAction::Alert
208 && let Ok(a) = val.parse::<CorrelationAction>()
209 {
210 self.config.action_on_match = a;
211 }
212 }
213
214 pub fn add_correlation(&mut self, corr: &CorrelationRule) -> Result<()> {
216 let owned;
217 let effective = if self.pipelines.is_empty() {
218 corr
219 } else {
220 owned = {
221 let mut c = corr.clone();
222 apply_pipelines_to_correlation(&self.pipelines, &mut c)?;
223 c
224 };
225 &owned
226 };
227
228 self.apply_custom_attributes(&effective.custom_attributes);
231
232 let compiled = compile_correlation(effective)?;
233 let idx = self.correlations.len();
234
235 for rule_ref in &compiled.rule_refs {
237 self.rule_index
238 .entry(rule_ref.clone())
239 .or_default()
240 .push(idx);
241 }
242
243 if !compiled.generate {
245 for rule_ref in &compiled.rule_refs {
246 self.correlation_only_rules.insert(rule_ref.clone());
247 }
248 }
249
250 self.correlations.push(compiled);
251 Ok(())
252 }
253
254 pub fn add_collection(&mut self, collection: &SigmaCollection) -> Result<()> {
263 let mut compiled_batch = Vec::with_capacity(collection.rules.len());
264 if self.pipelines.is_empty() {
265 for rule in &collection.rules {
266 self.apply_custom_attributes(&rule.custom_attributes);
267 self.rule_ids.push((rule.id.clone(), rule.name.clone()));
268 compiled_batch.push(crate::compiler::compile_rule(rule)?);
269 }
270 } else {
271 for rule in &collection.rules {
272 let mut transformed = rule.clone();
273 apply_pipelines(&self.pipelines, &mut transformed)?;
274 self.apply_custom_attributes(&transformed.custom_attributes);
275 self.rule_ids
276 .push((transformed.id.clone(), transformed.name.clone()));
277 compiled_batch.push(crate::compiler::compile_rule(&transformed)?);
279 }
280 }
281 self.engine.extend_compiled_rules(compiled_batch);
282 for filter in &collection.filters {
284 self.engine.apply_filter(filter)?;
285 }
286 for corr in &collection.correlations {
287 self.add_correlation(corr)?;
288 }
289 self.validate_rule_refs()?;
290 self.detect_correlation_cycles()?;
291 Ok(())
292 }
293
294 fn validate_rule_refs(&self) -> Result<()> {
297 let mut known: std::collections::HashSet<&str> = std::collections::HashSet::new();
298
299 for (id, name) in &self.rule_ids {
300 if let Some(id) = id {
301 known.insert(id.as_str());
302 }
303 if let Some(name) = name {
304 known.insert(name.as_str());
305 }
306 }
307 for corr in &self.correlations {
308 if let Some(ref id) = corr.id {
309 known.insert(id.as_str());
310 }
311 if let Some(ref name) = corr.name {
312 known.insert(name.as_str());
313 }
314 }
315
316 for corr in &self.correlations {
317 for rule_ref in &corr.rule_refs {
318 if !known.contains(rule_ref.as_str()) {
319 return Err(EvalError::UnknownRuleRef(rule_ref.clone()));
320 }
321 }
322 }
323 Ok(())
324 }
325
326 fn detect_correlation_cycles(&self) -> Result<()> {
334 let mut corr_identifiers: HashMap<&str, usize> = HashMap::new();
336 for (idx, corr) in self.correlations.iter().enumerate() {
337 if let Some(ref id) = corr.id {
338 corr_identifiers.insert(id.as_str(), idx);
339 }
340 if let Some(ref name) = corr.name {
341 corr_identifiers.insert(name.as_str(), idx);
342 }
343 }
344
345 let mut adj: Vec<Vec<usize>> = vec![Vec::new(); self.correlations.len()];
347 for (idx, corr) in self.correlations.iter().enumerate() {
348 for rule_ref in &corr.rule_refs {
349 if let Some(&target_idx) = corr_identifiers.get(rule_ref.as_str()) {
350 adj[idx].push(target_idx);
351 }
352 }
353 }
354
355 let mut state = vec![0u8; self.correlations.len()]; let mut path: Vec<usize> = Vec::new();
358
359 for start in 0..self.correlations.len() {
360 if state[start] == 0
361 && let Some(cycle) = Self::dfs_find_cycle(start, &adj, &mut state, &mut path)
362 {
363 let names: Vec<String> = cycle
364 .iter()
365 .map(|&i| {
366 self.correlations[i]
367 .id
368 .as_deref()
369 .or(self.correlations[i].name.as_deref())
370 .unwrap_or(&self.correlations[i].title)
371 .to_string()
372 })
373 .collect();
374 return Err(crate::error::EvalError::CorrelationCycle(
375 names.join(" -> "),
376 ));
377 }
378 }
379 Ok(())
380 }
381
382 fn dfs_find_cycle(
384 node: usize,
385 adj: &[Vec<usize>],
386 state: &mut [u8],
387 path: &mut Vec<usize>,
388 ) -> Option<Vec<usize>> {
389 state[node] = 1; path.push(node);
391
392 for &next in &adj[node] {
393 if state[next] == 1 {
394 if let Some(pos) = path.iter().position(|&n| n == next) {
396 let mut cycle = path[pos..].to_vec();
397 cycle.push(next); return Some(cycle);
399 }
400 }
401 if state[next] == 0
402 && let Some(cycle) = Self::dfs_find_cycle(next, adj, state, path)
403 {
404 return Some(cycle);
405 }
406 }
407
408 path.pop();
409 state[node] = 2; None
411 }
412
413 pub fn process_event(&mut self, event: &impl Event) -> ProcessResult {
419 let all_detections = self.engine.evaluate(event);
420
421 let ts = match self.extract_event_timestamp(event) {
422 Some(ts) => ts,
423 None => match self.config.timestamp_fallback {
424 TimestampFallback::WallClock => Utc::now().timestamp(),
425 TimestampFallback::Skip => {
426 return self.filter_detections(all_detections);
428 }
429 },
430 };
431 self.process_with_detections(event, all_detections, ts)
432 }
433
    /// Evaluates one event and feeds the resulting detections into the
    /// correlations using the caller-supplied `timestamp_secs` instead of a
    /// timestamp read from the event.
    pub fn process_event_at(&mut self, event: &impl Event, timestamp_secs: i64) -> ProcessResult {
        let all_detections = self.engine.evaluate(event);
        self.process_with_detections(event, all_detections, timestamp_secs)
    }
442
443 pub fn process_with_detections(
449 &mut self,
450 event: &impl Event,
451 all_detections: Vec<EvaluationResult>,
452 timestamp_secs: i64,
453 ) -> ProcessResult {
454 let timestamp_secs = timestamp_secs.clamp(0, i64::MAX / 2);
455
456 if self.state.len() >= self.config.max_state_entries {
458 self.evict_all(timestamp_secs);
459 }
460
461 let mut correlations: Vec<EvaluationResult> = Vec::new();
463 self.feed_detections(event, &all_detections, timestamp_secs, &mut correlations);
464
465 self.chain_correlations(&correlations, timestamp_secs);
467
468 let mut out = self.filter_detections(all_detections);
470 out.extend(correlations);
471 out
472 }
473
    /// Evaluates `event` against the detection rules only.
    ///
    /// Takes `&self`: no correlation state is read or updated.
    pub fn evaluate(&self, event: &impl Event) -> Vec<EvaluationResult> {
        self.engine.evaluate(event)
    }
483
484 pub fn process_batch<E: Event + Sync>(&mut self, events: &[&E]) -> Vec<ProcessResult> {
492 let engine = &self.engine;
495 let ts_fields = &self.config.timestamp_fields;
496
497 let batch_results: Vec<(Vec<EvaluationResult>, Option<i64>)> = {
498 #[cfg(feature = "parallel")]
499 {
500 use rayon::prelude::*;
501 events
502 .par_iter()
503 .map(|e| {
504 let detections = engine.evaluate(e);
505 let ts = extract_event_ts(e, ts_fields);
506 (detections, ts)
507 })
508 .collect()
509 }
510 #[cfg(not(feature = "parallel"))]
511 {
512 events
513 .iter()
514 .map(|e| {
515 let detections = engine.evaluate(e);
516 let ts = extract_event_ts(e, ts_fields);
517 (detections, ts)
518 })
519 .collect()
520 }
521 };
522
523 let mut results = Vec::with_capacity(events.len());
525 for ((detections, ts_opt), event) in batch_results.into_iter().zip(events) {
526 match ts_opt {
527 Some(ts) => {
528 results.push(self.process_with_detections(event, detections, ts));
529 }
530 None => match self.config.timestamp_fallback {
531 TimestampFallback::WallClock => {
532 let ts = Utc::now().timestamp();
533 results.push(self.process_with_detections(event, detections, ts));
534 }
535 TimestampFallback::Skip => {
536 results.push(self.filter_detections(detections));
538 }
539 },
540 }
541 }
542 results
543 }
544
545 fn filter_detections(&self, all_detections: Vec<EvaluationResult>) -> Vec<EvaluationResult> {
550 if !self.config.emit_detections && !self.correlation_only_rules.is_empty() {
551 all_detections
552 .into_iter()
553 .filter(|m| {
554 let id_match = m
555 .header
556 .rule_id
557 .as_ref()
558 .is_some_and(|id| self.correlation_only_rules.contains(id));
559 !id_match
560 })
561 .collect()
562 } else {
563 all_detections
564 }
565 }
566
567 fn feed_detections(
569 &mut self,
570 event: &impl Event,
571 detections: &[EvaluationResult],
572 ts: i64,
573 out: &mut Vec<EvaluationResult>,
574 ) {
575 let mut work: Vec<(usize, Option<String>, Option<String>)> = Vec::new();
578
579 for det in detections {
580 let (rule_id, rule_name) = self.find_rule_identity(det);
583
584 let mut corr_indices = Vec::new();
586 if let Some(ref id) = rule_id
587 && let Some(indices) = self.rule_index.get(id)
588 {
589 corr_indices.extend(indices);
590 }
591 if let Some(ref name) = rule_name
592 && let Some(indices) = self.rule_index.get(name)
593 {
594 corr_indices.extend(indices);
595 }
596
597 corr_indices.sort_unstable();
598 corr_indices.dedup();
599
600 for &corr_idx in &corr_indices {
601 work.push((corr_idx, rule_id.clone(), rule_name.clone()));
602 }
603 }
604
605 for (corr_idx, rule_id, rule_name) in work {
606 self.update_correlation(corr_idx, event, ts, &rule_id, &rule_name, out);
607 }
608 }
609
610 fn find_rule_identity(&self, det: &EvaluationResult) -> (Option<String>, Option<String>) {
612 if let Some(ref match_id) = det.header.rule_id {
614 for (id, name) in &self.rule_ids {
615 if id.as_deref() == Some(match_id.as_str()) {
616 return (id.clone(), name.clone());
617 }
618 }
619 }
620 (det.header.rule_id.clone(), None)
622 }
623
624 fn resolve_event_mode(&self, corr_idx: usize) -> CorrelationEventMode {
626 let corr = &self.correlations[corr_idx];
627 corr.event_mode
628 .unwrap_or(self.config.correlation_event_mode)
629 }
630
631 fn resolve_max_events(&self, corr_idx: usize) -> usize {
633 let corr = &self.correlations[corr_idx];
634 corr.max_events
635 .unwrap_or(self.config.max_correlation_events)
636 }
637
638 fn update_correlation(
640 &mut self,
641 corr_idx: usize,
642 event: &impl Event,
643 ts: i64,
644 rule_id: &Option<String>,
645 rule_name: &Option<String>,
646 out: &mut Vec<EvaluationResult>,
647 ) {
648 let corr = &self.correlations[corr_idx];
652 let corr_type = corr.correlation_type;
653 let timespan = corr.timespan_secs;
654 let window_mode = corr.window_mode;
655 let gap_secs = corr.gap_secs;
656 let level = corr.level;
657 let suppress_secs = corr.suppress_secs.or(self.config.suppress);
658 let action = corr.action.unwrap_or(self.config.action_on_match);
659 let event_mode = self.resolve_event_mode(corr_idx);
660 let max_events = self.resolve_max_events(corr_idx);
661
662 let mut ref_strs: Vec<&str> = Vec::new();
664 if let Some(id) = rule_id.as_deref() {
665 ref_strs.push(id);
666 }
667 if let Some(name) = rule_name.as_deref() {
668 ref_strs.push(name);
669 }
670 let rule_ref = rule_id.as_deref().or(rule_name.as_deref()).unwrap_or("");
671
672 let group_key = GroupKey::extract(event, &corr.group_by, &ref_strs);
674
675 let state_key = (corr_idx, group_key.clone());
677 let state = self
678 .state
679 .entry(state_key.clone())
680 .or_insert_with(|| WindowState::new_for(corr_type));
681
682 let cutoff = ts - timespan as i64;
688 let decision = apply_window_open(state, ts, timespan, window_mode, gap_secs);
689 if decision == WindowDecision::Discard {
690 return;
691 }
692 let reset = decision == WindowDecision::Reset;
693
694 match corr_type {
696 CorrelationType::EventCount => {
697 state.push_event_count(ts);
698 }
699 CorrelationType::ValueCount => {
700 if let Some(ref fields) = corr.condition.field
701 && let Some(key) = composite_value_count_key(event, fields)
702 {
703 state.push_value_count(ts, key);
704 }
705 }
706 CorrelationType::Temporal | CorrelationType::TemporalOrdered => {
707 state.push_temporal(ts, rule_ref);
708 }
709 CorrelationType::ValueSum
710 | CorrelationType::ValueAvg
711 | CorrelationType::ValuePercentile
712 | CorrelationType::ValueMedian => {
713 if let Some(ref fields) = corr.condition.field
714 && let Some(field_name) = fields.first()
715 && let Some(val) = event.get_field(field_name)
716 && let Some(n) = value_to_f64_ev(&val)
717 {
718 state.push_numeric(ts, n);
719 }
720 }
721 }
722
723 match event_mode {
727 CorrelationEventMode::Full => {
728 let buf = self
729 .event_buffers
730 .entry(state_key.clone())
731 .or_insert_with(|| EventBuffer::new(max_events));
732 if window_mode == rsigma_parser::WindowMode::Sliding {
733 buf.evict(cutoff);
734 } else if reset {
735 buf.clear();
736 }
737 let json = event.to_json();
738 buf.push(ts, &json);
739 }
740 CorrelationEventMode::Refs => {
741 let buf = self
742 .event_ref_buffers
743 .entry(state_key.clone())
744 .or_insert_with(|| EventRefBuffer::new(max_events));
745 if window_mode == rsigma_parser::WindowMode::Sliding {
746 buf.evict(cutoff);
747 } else if reset {
748 buf.clear();
749 }
750 let json = event.to_json();
751 buf.push(ts, &json);
752 }
753 CorrelationEventMode::None => {}
754 }
755
756 let fired = state.check_condition(
758 &corr.condition,
759 corr_type,
760 &corr.rule_refs,
761 corr.extended_expr.as_ref(),
762 );
763
764 if let Some(agg_value) = fired {
765 let alert_key = (corr_idx, group_key.clone());
766
767 let suppressed = if let Some(suppress) = suppress_secs {
769 if let Some(&last_ts) = self.last_alert.get(&alert_key) {
770 (ts - last_ts) < suppress as i64
771 } else {
772 false
773 }
774 } else {
775 false
776 };
777
778 if !suppressed {
779 let (events, event_refs) = match event_mode {
781 CorrelationEventMode::Full => {
782 let stored = self
783 .event_buffers
784 .get(&alert_key)
785 .map(|buf| buf.decompress_all())
786 .unwrap_or_default();
787 (Some(stored), None)
788 }
789 CorrelationEventMode::Refs => {
790 let stored = self
791 .event_ref_buffers
792 .get(&alert_key)
793 .map(|buf| buf.refs())
794 .unwrap_or_default();
795 (None, Some(stored))
796 }
797 CorrelationEventMode::None => (None, None),
798 };
799
800 let corr = &self.correlations[corr_idx];
802 let result = EvaluationResult {
803 header: RuleHeader {
804 rule_title: corr.title.clone(),
805 rule_id: corr.id.clone(),
806 level,
807 tags: corr.tags.clone(),
808 custom_attributes: corr.custom_attributes.clone(),
809 enrichments: None,
810 },
811 body: ResultBody::Correlation(CorrelationBody {
812 correlation_type: corr_type,
813 group_key: group_key.to_pairs(&corr.group_by),
814 aggregated_value: agg_value,
815 timespan_secs: timespan,
816 events,
817 event_refs,
818 }),
819 };
820 out.push(result);
821
822 self.last_alert.insert(alert_key.clone(), ts);
824
825 if action == CorrelationAction::Reset {
827 if let Some(state) = self.state.get_mut(&alert_key) {
828 state.clear();
829 }
830 if let Some(buf) = self.event_buffers.get_mut(&alert_key) {
831 buf.clear();
832 }
833 if let Some(buf) = self.event_ref_buffers.get_mut(&alert_key) {
834 buf.clear();
835 }
836 }
837 }
838 }
839 }
840
841 fn chain_correlations(&mut self, fired: &[EvaluationResult], ts: i64) {
846 const MAX_CHAIN_DEPTH: usize = 10;
847 let mut pending: Vec<EvaluationResult> = fired.to_vec();
848 let mut depth = 0;
849
850 while !pending.is_empty() && depth < MAX_CHAIN_DEPTH {
851 depth += 1;
852
853 #[allow(clippy::type_complexity)]
855 let mut work: Vec<(usize, Vec<(String, String)>, String)> = Vec::new();
856 for result in &pending {
857 let Some(body) = result.as_correlation() else {
859 continue;
860 };
861 if let Some(ref id) = result.header.rule_id
862 && let Some(indices) = self.rule_index.get(id)
863 {
864 let fired_ref = result
865 .header
866 .rule_id
867 .as_deref()
868 .unwrap_or(&result.header.rule_title)
869 .to_string();
870 for &corr_idx in indices {
871 work.push((corr_idx, body.group_key.clone(), fired_ref.clone()));
872 }
873 }
874 }
875
876 let mut next_pending = Vec::new();
877 for (corr_idx, group_key_pairs, fired_ref) in work {
878 let corr = &self.correlations[corr_idx];
879 let corr_type = corr.correlation_type;
880 let timespan = corr.timespan_secs;
881 let window_mode = corr.window_mode;
882 let gap_secs = corr.gap_secs;
883 let level = corr.level;
884
885 let group_key = GroupKey::from_pairs(&group_key_pairs, &corr.group_by);
886 let state_key = (corr_idx, group_key.clone());
887 let state = self
888 .state
889 .entry(state_key)
890 .or_insert_with(|| WindowState::new_for(corr_type));
891
892 if apply_window_open(state, ts, timespan, window_mode, gap_secs)
896 == WindowDecision::Discard
897 {
898 continue;
899 }
900
901 match corr_type {
902 CorrelationType::EventCount => {
903 state.push_event_count(ts);
904 }
905 CorrelationType::Temporal | CorrelationType::TemporalOrdered => {
906 state.push_temporal(ts, &fired_ref);
907 }
908 _ => {
909 state.push_event_count(ts);
910 }
911 }
912
913 let fired = state.check_condition(
914 &corr.condition,
915 corr_type,
916 &corr.rule_refs,
917 corr.extended_expr.as_ref(),
918 );
919
920 if let Some(agg_value) = fired {
921 let corr = &self.correlations[corr_idx];
922 next_pending.push(EvaluationResult {
923 header: RuleHeader {
924 rule_title: corr.title.clone(),
925 rule_id: corr.id.clone(),
926 level,
927 tags: corr.tags.clone(),
928 custom_attributes: corr.custom_attributes.clone(),
929 enrichments: None,
930 },
931 body: ResultBody::Correlation(CorrelationBody {
932 correlation_type: corr_type,
933 group_key: group_key.to_pairs(&corr.group_by),
934 aggregated_value: agg_value,
935 timespan_secs: timespan,
936 events: None,
940 event_refs: None,
941 }),
942 });
943 }
944 }
945
946 pending = next_pending;
947 }
948
949 if !pending.is_empty() {
950 log::warn!(
951 "Correlation chain depth limit reached ({MAX_CHAIN_DEPTH}); \
952 {} pending result(s) were not propagated further. \
953 This may indicate a cycle in correlation references.",
954 pending.len()
955 );
956 }
957 }
958
959 fn extract_event_timestamp(&self, event: &impl Event) -> Option<i64> {
971 for field_name in &self.config.timestamp_fields {
972 if let Some(val) = event.get_field(field_name)
973 && let Some(ts) = parse_timestamp_value(&val)
974 {
975 return Some(ts);
976 }
977 }
978 None
979 }
980
    /// Public entry point for state eviction; runs `evict_all` with
    /// `now_secs` as the current time.
    pub fn evict_expired(&mut self, now_secs: i64) {
        self.evict_all(now_secs);
    }
989
990 fn evict_all(&mut self, now_secs: i64) {
992 let specs: Vec<(u64, WindowMode, Option<u64>)> = self
1003 .correlations
1004 .iter()
1005 .map(|c| (c.timespan_secs, c.window_mode, c.gap_secs))
1006 .collect();
1007
1008 self.state.retain(|&(corr_idx, _), state| {
1009 if let Some(&(timespan, mode, gap)) = specs.get(corr_idx) {
1010 match mode {
1011 WindowMode::Sliding => {
1012 state.evict(now_secs - timespan as i64);
1013 }
1014 WindowMode::Tumbling | WindowMode::Session => {
1015 let staleness = if mode == WindowMode::Session {
1016 gap.unwrap_or(timespan)
1017 } else {
1018 timespan
1019 } as i64;
1020 if state
1021 .latest_timestamp()
1022 .is_some_and(|last| now_secs - last > staleness)
1023 {
1024 state.clear();
1025 }
1026 }
1027 }
1028 }
1029 !state.is_empty()
1030 });
1031
1032 let state = &self.state;
1036 self.event_buffers.retain(|key, buf| {
1037 if let Some(&(timespan, mode, _)) = specs.get(key.0) {
1038 match mode {
1039 WindowMode::Sliding => buf.evict(now_secs - timespan as i64),
1040 WindowMode::Tumbling | WindowMode::Session => {
1041 if !state.contains_key(key) {
1042 return false;
1043 }
1044 }
1045 }
1046 }
1047 !buf.is_empty()
1048 });
1049 self.event_ref_buffers.retain(|key, buf| {
1050 if let Some(&(timespan, mode, _)) = specs.get(key.0) {
1051 match mode {
1052 WindowMode::Sliding => buf.evict(now_secs - timespan as i64),
1053 WindowMode::Tumbling | WindowMode::Session => {
1054 if !state.contains_key(key) {
1055 return false;
1056 }
1057 }
1058 }
1059 }
1060 !buf.is_empty()
1061 });
1062
1063 if self.state.len() >= self.config.max_state_entries {
1067 let target = self.config.max_state_entries * 9 / 10;
1068 let excess = self.state.len() - target;
1069
1070 log::warn!(
1071 "Correlation state hard cap reached ({} entries, max {}); \
1072 evicting {} stalest entries to {} (90% capacity). \
1073 This indicates high-cardinality traffic; consider raising \
1074 max_state_entries or shortening correlation windows.",
1075 self.state.len(),
1076 self.config.max_state_entries,
1077 excess,
1078 target,
1079 );
1080
1081 let mut by_staleness: Vec<_> = self
1083 .state
1084 .iter()
1085 .map(|(k, v)| (k.clone(), v.latest_timestamp().unwrap_or(i64::MIN)))
1086 .collect();
1087 by_staleness.sort_unstable_by_key(|&(_, ts)| ts);
1088
1089 for (key, _) in by_staleness.into_iter().take(excess) {
1091 self.state.remove(&key);
1092 self.last_alert.remove(&key);
1093 self.event_buffers.remove(&key);
1094 self.event_ref_buffers.remove(&key);
1095 }
1096 }
1097
1098 self.last_alert.retain(|key, &mut alert_ts| {
1101 let suppress = if key.0 < self.correlations.len() {
1102 self.correlations[key.0]
1103 .suppress_secs
1104 .or(self.config.suppress)
1105 .unwrap_or(0)
1106 } else {
1107 0
1108 };
1109 (now_secs - alert_ts) < suppress as i64
1110 });
1111 }
1112
    /// Number of `(correlation, group key)` window-state entries currently
    /// held.
    pub fn state_count(&self) -> usize {
        self.state.len()
    }
1117
    /// Number of rules reported by the inner detection engine's `rule_count`.
    pub fn detection_rule_count(&self) -> usize {
        self.engine.rule_count()
    }
1122
    /// Number of registered correlation rules.
    pub fn correlation_rule_count(&self) -> usize {
        self.correlations.len()
    }
1127
    /// Number of event buffers currently held (one per
    /// `(correlation, group key)` in `Full` event mode).
    pub fn event_buffer_count(&self) -> usize {
        self.event_buffers.len()
    }
1132
1133 pub fn event_buffer_bytes(&self) -> usize {
1135 self.event_buffers
1136 .values()
1137 .map(|b| b.compressed_bytes())
1138 .sum()
1139 }
1140
    /// Number of event-reference buffers currently held (one per
    /// `(correlation, group key)` in `Refs` event mode).
    pub fn event_ref_buffer_count(&self) -> usize {
        self.event_ref_buffers.len()
    }
1145
    /// Shared access to the inner detection engine.
    pub fn engine(&self) -> &Engine {
        &self.engine
    }
1150
1151 pub fn export_state(&self) -> CorrelationSnapshot {
1157 let mut windows: HashMap<String, Vec<(GroupKey, WindowState)>> = HashMap::new();
1158 for ((idx, gk), ws) in &self.state {
1159 let corr_id = self.correlation_stable_id(*idx);
1160 windows
1161 .entry(corr_id)
1162 .or_default()
1163 .push((gk.clone(), ws.clone()));
1164 }
1165
1166 let mut last_alert: HashMap<String, Vec<(GroupKey, i64)>> = HashMap::new();
1167 for ((idx, gk), ts) in &self.last_alert {
1168 let corr_id = self.correlation_stable_id(*idx);
1169 last_alert
1170 .entry(corr_id)
1171 .or_default()
1172 .push((gk.clone(), *ts));
1173 }
1174
1175 let mut event_buffers: HashMap<String, Vec<(GroupKey, EventBuffer)>> = HashMap::new();
1176 for ((idx, gk), buf) in &self.event_buffers {
1177 let corr_id = self.correlation_stable_id(*idx);
1178 event_buffers
1179 .entry(corr_id)
1180 .or_default()
1181 .push((gk.clone(), buf.clone()));
1182 }
1183
1184 let mut event_ref_buffers: HashMap<String, Vec<(GroupKey, EventRefBuffer)>> =
1185 HashMap::new();
1186 for ((idx, gk), buf) in &self.event_ref_buffers {
1187 let corr_id = self.correlation_stable_id(*idx);
1188 event_ref_buffers
1189 .entry(corr_id)
1190 .or_default()
1191 .push((gk.clone(), buf.clone()));
1192 }
1193
1194 CorrelationSnapshot {
1195 version: SNAPSHOT_VERSION,
1196 windows,
1197 last_alert,
1198 event_buffers,
1199 event_ref_buffers,
1200 }
1201 }
1202
1203 pub fn import_state(&mut self, snapshot: CorrelationSnapshot) -> bool {
1210 if snapshot.version != SNAPSHOT_VERSION {
1211 return false;
1212 }
1213 let id_to_idx = self.build_id_to_index_map();
1214
1215 for (corr_id, groups) in snapshot.windows {
1216 if let Some(&idx) = id_to_idx.get(&corr_id) {
1217 for (gk, ws) in groups {
1218 self.state.insert((idx, gk), ws);
1219 }
1220 }
1221 }
1222
1223 for (corr_id, groups) in snapshot.last_alert {
1224 if let Some(&idx) = id_to_idx.get(&corr_id) {
1225 for (gk, ts) in groups {
1226 self.last_alert.insert((idx, gk), ts);
1227 }
1228 }
1229 }
1230
1231 for (corr_id, groups) in snapshot.event_buffers {
1232 if let Some(&idx) = id_to_idx.get(&corr_id) {
1233 for (gk, buf) in groups {
1234 self.event_buffers.insert((idx, gk), buf);
1235 }
1236 }
1237 }
1238
1239 for (corr_id, groups) in snapshot.event_ref_buffers {
1240 if let Some(&idx) = id_to_idx.get(&corr_id) {
1241 for (gk, buf) in groups {
1242 self.event_ref_buffers.insert((idx, gk), buf);
1243 }
1244 }
1245 }
1246
1247 true
1248 }
1249
1250 fn correlation_stable_id(&self, idx: usize) -> String {
1252 let corr = &self.correlations[idx];
1253 corr.id
1254 .clone()
1255 .or_else(|| corr.name.clone())
1256 .unwrap_or_else(|| corr.title.clone())
1257 }
1258
1259 fn build_id_to_index_map(&self) -> HashMap<String, usize> {
1261 self.correlations
1262 .iter()
1263 .enumerate()
1264 .map(|(idx, _)| (self.correlation_stable_id(idx), idx))
1265 .collect()
1266 }
1267}
1268
impl Default for CorrelationEngine {
    /// Builds an empty engine from `CorrelationConfig::default()`.
    fn default() -> Self {
        Self::new(CorrelationConfig::default())
    }
}
1274
1275fn extract_event_ts(event: &impl Event, timestamp_fields: &[String]) -> Option<i64> {
1284 for field_name in timestamp_fields {
1285 if let Some(val) = event.get_field(field_name)
1286 && let Some(ts) = parse_timestamp_value(&val)
1287 {
1288 return Some(ts);
1289 }
1290 }
1291 None
1292}
1293
1294fn parse_timestamp_value(val: &EventValue) -> Option<i64> {
1296 match val {
1297 EventValue::Int(i) => Some(normalize_epoch(*i)),
1298 EventValue::Float(f) => Some(normalize_epoch(*f as i64)),
1299 EventValue::Str(s) => parse_timestamp_string(s),
1300 _ => None,
1301 }
1302}
1303
/// Converts an epoch timestamp of unknown unit to whole seconds.
///
/// The unit is guessed from the magnitude of `v`:
///
/// * above 10^18: nanoseconds, divided by 10^9
/// * above 10^15: microseconds, divided by 10^6
/// * above 10^12: milliseconds, divided by 10^3
/// * anything else (including zero and negative values): already seconds
///
/// Each threshold corresponds to 2001-09-09 in its unit, so present-day
/// timestamps in any of the four units land in the right bucket. Dividing
/// every large value by 1000 would leave microsecond and nanosecond inputs
/// thousands of years in the future.
fn normalize_epoch(v: i64) -> i64 {
    const MILLIS_THRESHOLD: i64 = 1_000_000_000_000;
    const MICROS_THRESHOLD: i64 = 1_000_000_000_000_000;
    const NANOS_THRESHOLD: i64 = 1_000_000_000_000_000_000;

    if v > NANOS_THRESHOLD {
        v / 1_000_000_000
    } else if v > MICROS_THRESHOLD {
        v / 1_000_000
    } else if v > MILLIS_THRESHOLD {
        v / 1_000
    } else {
        v
    }
}
1309
1310fn parse_timestamp_string(s: &str) -> Option<i64> {
1312 if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
1314 return Some(dt.timestamp());
1315 }
1316
1317 if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S") {
1320 return Some(Utc.from_utc_datetime(&naive).timestamp());
1321 }
1322 if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") {
1323 return Some(Utc.from_utc_datetime(&naive).timestamp());
1324 }
1325
1326 if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S%.f") {
1328 return Some(Utc.from_utc_datetime(&naive).timestamp());
1329 }
1330 if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f") {
1331 return Some(Utc.from_utc_datetime(&naive).timestamp());
1332 }
1333
1334 None
1335}
1336
1337fn value_to_string_for_count(v: &EventValue) -> Option<String> {
1339 match v {
1340 EventValue::Str(s) => Some(s.to_string()),
1341 EventValue::Int(n) => Some(n.to_string()),
1342 EventValue::Float(f) => Some(f.to_string()),
1343 EventValue::Bool(b) => Some(b.to_string()),
1344 EventValue::Null => Some("null".to_string()),
1345 _ => None,
1346 }
1347}
1348
1349fn composite_value_count_key(event: &impl Event, fields: &[String]) -> Option<String> {
1358 if let [field_name] = fields {
1360 let val = event.get_field(field_name)?;
1361 return value_to_string_for_count(&val);
1362 }
1363
1364 let mut parts = Vec::with_capacity(fields.len());
1365 for field_name in fields {
1366 let val = event.get_field(field_name)?;
1367 let rendered = value_to_string_for_count(&val)?;
1368 parts.push(rendered);
1369 }
1370 Some(parts.join("\u{1f}"))
1371}
1372
/// Numeric view of an event value, as returned by `EventValue::as_f64`.
fn value_to_f64_ev(v: &EventValue) -> Option<f64> {
    v.as_f64()
}