1#[cfg(test)]
16mod tests;
17mod types;
18
19pub use types::*;
20
21use std::collections::HashMap;
22
23use chrono::{DateTime, TimeZone, Utc};
24
25use rsigma_parser::{CorrelationRule, CorrelationType, SigmaCollection, SigmaRule};
26
27use crate::correlation::{
28 CompiledCorrelation, EventBuffer, EventRefBuffer, GroupKey, WindowState, compile_correlation,
29};
30use crate::engine::Engine;
31use crate::error::{EvalError, Result};
32use crate::event::{Event, EventValue};
33use crate::pipeline::{Pipeline, apply_pipelines, apply_pipelines_to_correlation};
34use crate::result::MatchResult;
35
/// Format version written into every snapshot produced by `export_state`.
/// `import_state` rejects any snapshot whose `version` differs from this value.
const SNAPSHOT_VERSION: u32 = 1;
42
/// Detection engine plus the mutable state needed to evaluate correlation rules over the
/// detections it produces.
///
/// All per-window maps are keyed by `(correlation index, group key)`, where the index is the
/// position of the correlation in `correlations`.
pub struct CorrelationEngine {
    /// Wrapped detection engine; every event is evaluated against it first.
    engine: Engine,
    /// Compiled correlation rules, in the order they were added.
    correlations: Vec<CompiledCorrelation>,
    /// Rule reference (as written in a correlation's `rule_refs`) -> indices into
    /// `correlations` of the correlations that reference it.
    rule_index: HashMap<String, Vec<usize>>,
    /// `(id, name)` of every detection rule added through `add_rule`.
    rule_ids: Vec<(Option<String>, Option<String>)>,
    /// Window state per correlation and group.
    state: HashMap<(usize, GroupKey), WindowState>,
    /// Timestamp (seconds) of the last emitted alert per correlation and group; consulted for
    /// suppression.
    last_alert: HashMap<(usize, GroupKey), i64>,
    /// Event buffers used when the event mode is `CorrelationEventMode::Full`.
    event_buffers: HashMap<(usize, GroupKey), EventBuffer>,
    /// Event reference buffers used when the event mode is `CorrelationEventMode::Refs`.
    event_ref_buffers: HashMap<(usize, GroupKey), EventRefBuffer>,
    /// Rule references of correlations whose `generate` flag is false; consulted by
    /// `filter_detections` when `emit_detections` is off.
    correlation_only_rules: std::collections::HashSet<String>,
    /// Engine-wide correlation settings; may be adjusted by `rsigma.*` custom attributes.
    config: CorrelationConfig,
    /// Pipelines applied to rules and correlations as they are added, kept sorted by priority.
    pipelines: Vec<Pipeline>,
}
75
76impl CorrelationEngine {
77 pub fn new(config: CorrelationConfig) -> Self {
79 CorrelationEngine {
80 engine: Engine::new(),
81 correlations: Vec::new(),
82 rule_index: HashMap::new(),
83 rule_ids: Vec::new(),
84 state: HashMap::new(),
85 last_alert: HashMap::new(),
86 event_buffers: HashMap::new(),
87 event_ref_buffers: HashMap::new(),
88 correlation_only_rules: std::collections::HashSet::new(),
89 config,
90 pipelines: Vec::new(),
91 }
92 }
93
94 pub fn add_pipeline(&mut self, pipeline: Pipeline) {
98 self.pipelines.push(pipeline);
99 self.pipelines.sort_by_key(|p| p.priority);
100 }
101
102 pub fn set_include_event(&mut self, include: bool) {
104 self.engine.set_include_event(include);
105 }
106
107 pub fn set_bloom_prefilter(&mut self, enabled: bool) {
111 self.engine.set_bloom_prefilter(enabled);
112 }
113
114 pub fn set_bloom_max_bytes(&mut self, max_bytes: usize) {
117 self.engine.set_bloom_max_bytes(max_bytes);
118 }
119
/// Forwards `enabled` to the wrapped detection engine's `set_cross_rule_ac`.
///
/// Only compiled when the `daachorse-index` feature is enabled.
#[cfg(feature = "daachorse-index")]
pub fn set_cross_rule_ac(&mut self, enabled: bool) {
    let detection_engine = &mut self.engine;
    detection_engine.set_cross_rule_ac(enabled);
}
127
128 pub fn set_correlation_event_mode(&mut self, mode: CorrelationEventMode) {
134 self.config.correlation_event_mode = mode;
135 }
136
137 pub fn set_max_correlation_events(&mut self, max: usize) {
140 self.config.max_correlation_events = max;
141 }
142
143 pub fn add_rule(&mut self, rule: &SigmaRule) -> Result<()> {
149 if self.pipelines.is_empty() {
150 self.apply_custom_attributes(&rule.custom_attributes);
151 self.rule_ids.push((rule.id.clone(), rule.name.clone()));
152 self.engine.add_rule(rule)?;
153 } else {
154 let mut transformed = rule.clone();
155 apply_pipelines(&self.pipelines, &mut transformed)?;
156 self.apply_custom_attributes(&transformed.custom_attributes);
157 self.rule_ids
158 .push((transformed.id.clone(), transformed.name.clone()));
159 let compiled = crate::compiler::compile_rule(&transformed)?;
161 self.engine.add_compiled_rule(compiled);
162 }
163 Ok(())
164 }
165
166 fn apply_custom_attributes(
180 &mut self,
181 attrs: &std::collections::HashMap<String, serde_yaml::Value>,
182 ) {
183 if let Some(field) = attrs.get("rsigma.timestamp_field").and_then(|v| v.as_str())
185 && !self.config.timestamp_fields.iter().any(|f| f == field)
186 {
187 self.config.timestamp_fields.insert(0, field.to_string());
188 }
189
190 if let Some(val) = attrs.get("rsigma.suppress").and_then(|v| v.as_str())
192 && self.config.suppress.is_none()
193 && let Ok(ts) = rsigma_parser::Timespan::parse(val)
194 {
195 self.config.suppress = Some(ts.seconds);
196 }
197
198 if let Some(val) = attrs.get("rsigma.action").and_then(|v| v.as_str())
200 && self.config.action_on_match == CorrelationAction::Alert
201 && let Ok(a) = val.parse::<CorrelationAction>()
202 {
203 self.config.action_on_match = a;
204 }
205 }
206
207 pub fn add_correlation(&mut self, corr: &CorrelationRule) -> Result<()> {
209 let owned;
210 let effective = if self.pipelines.is_empty() {
211 corr
212 } else {
213 owned = {
214 let mut c = corr.clone();
215 apply_pipelines_to_correlation(&self.pipelines, &mut c)?;
216 c
217 };
218 &owned
219 };
220
221 self.apply_custom_attributes(&effective.custom_attributes);
224
225 let compiled = compile_correlation(effective)?;
226 let idx = self.correlations.len();
227
228 for rule_ref in &compiled.rule_refs {
230 self.rule_index
231 .entry(rule_ref.clone())
232 .or_default()
233 .push(idx);
234 }
235
236 if !compiled.generate {
238 for rule_ref in &compiled.rule_refs {
239 self.correlation_only_rules.insert(rule_ref.clone());
240 }
241 }
242
243 self.correlations.push(compiled);
244 Ok(())
245 }
246
247 pub fn add_collection(&mut self, collection: &SigmaCollection) -> Result<()> {
252 for rule in &collection.rules {
253 self.add_rule(rule)?;
254 }
255 for filter in &collection.filters {
257 self.engine.apply_filter(filter)?;
258 }
259 for corr in &collection.correlations {
260 self.add_correlation(corr)?;
261 }
262 self.validate_rule_refs()?;
263 self.detect_correlation_cycles()?;
264 Ok(())
265 }
266
267 fn validate_rule_refs(&self) -> Result<()> {
270 let mut known: std::collections::HashSet<&str> = std::collections::HashSet::new();
271
272 for (id, name) in &self.rule_ids {
273 if let Some(id) = id {
274 known.insert(id.as_str());
275 }
276 if let Some(name) = name {
277 known.insert(name.as_str());
278 }
279 }
280 for corr in &self.correlations {
281 if let Some(ref id) = corr.id {
282 known.insert(id.as_str());
283 }
284 if let Some(ref name) = corr.name {
285 known.insert(name.as_str());
286 }
287 }
288
289 for corr in &self.correlations {
290 for rule_ref in &corr.rule_refs {
291 if !known.contains(rule_ref.as_str()) {
292 return Err(EvalError::UnknownRuleRef(rule_ref.clone()));
293 }
294 }
295 }
296 Ok(())
297 }
298
299 fn detect_correlation_cycles(&self) -> Result<()> {
307 let mut corr_identifiers: HashMap<&str, usize> = HashMap::new();
309 for (idx, corr) in self.correlations.iter().enumerate() {
310 if let Some(ref id) = corr.id {
311 corr_identifiers.insert(id.as_str(), idx);
312 }
313 if let Some(ref name) = corr.name {
314 corr_identifiers.insert(name.as_str(), idx);
315 }
316 }
317
318 let mut adj: Vec<Vec<usize>> = vec![Vec::new(); self.correlations.len()];
320 for (idx, corr) in self.correlations.iter().enumerate() {
321 for rule_ref in &corr.rule_refs {
322 if let Some(&target_idx) = corr_identifiers.get(rule_ref.as_str()) {
323 adj[idx].push(target_idx);
324 }
325 }
326 }
327
328 let mut state = vec![0u8; self.correlations.len()]; let mut path: Vec<usize> = Vec::new();
331
332 for start in 0..self.correlations.len() {
333 if state[start] == 0
334 && let Some(cycle) = Self::dfs_find_cycle(start, &adj, &mut state, &mut path)
335 {
336 let names: Vec<String> = cycle
337 .iter()
338 .map(|&i| {
339 self.correlations[i]
340 .id
341 .as_deref()
342 .or(self.correlations[i].name.as_deref())
343 .unwrap_or(&self.correlations[i].title)
344 .to_string()
345 })
346 .collect();
347 return Err(crate::error::EvalError::CorrelationCycle(
348 names.join(" -> "),
349 ));
350 }
351 }
352 Ok(())
353 }
354
355 fn dfs_find_cycle(
357 node: usize,
358 adj: &[Vec<usize>],
359 state: &mut [u8],
360 path: &mut Vec<usize>,
361 ) -> Option<Vec<usize>> {
362 state[node] = 1; path.push(node);
364
365 for &next in &adj[node] {
366 if state[next] == 1 {
367 if let Some(pos) = path.iter().position(|&n| n == next) {
369 let mut cycle = path[pos..].to_vec();
370 cycle.push(next); return Some(cycle);
372 }
373 }
374 if state[next] == 0
375 && let Some(cycle) = Self::dfs_find_cycle(next, adj, state, path)
376 {
377 return Some(cycle);
378 }
379 }
380
381 path.pop();
382 state[node] = 2; None
384 }
385
386 pub fn process_event(&mut self, event: &impl Event) -> ProcessResult {
392 let all_detections = self.engine.evaluate(event);
393
394 let ts = match self.extract_event_timestamp(event) {
395 Some(ts) => ts,
396 None => match self.config.timestamp_fallback {
397 TimestampFallback::WallClock => Utc::now().timestamp(),
398 TimestampFallback::Skip => {
399 let detections = self.filter_detections(all_detections);
401 return ProcessResult {
402 detections,
403 correlations: Vec::new(),
404 };
405 }
406 },
407 };
408 self.process_with_detections(event, all_detections, ts)
409 }
410
411 pub fn process_event_at(&mut self, event: &impl Event, timestamp_secs: i64) -> ProcessResult {
416 let all_detections = self.engine.evaluate(event);
417 self.process_with_detections(event, all_detections, timestamp_secs)
418 }
419
420 pub fn process_with_detections(
426 &mut self,
427 event: &impl Event,
428 all_detections: Vec<MatchResult>,
429 timestamp_secs: i64,
430 ) -> ProcessResult {
431 let timestamp_secs = timestamp_secs.clamp(0, i64::MAX / 2);
432
433 if self.state.len() >= self.config.max_state_entries {
435 self.evict_all(timestamp_secs);
436 }
437
438 let mut correlations = Vec::new();
440 self.feed_detections(event, &all_detections, timestamp_secs, &mut correlations);
441
442 self.chain_correlations(&correlations, timestamp_secs);
444
445 let detections = self.filter_detections(all_detections);
447
448 ProcessResult {
449 detections,
450 correlations,
451 }
452 }
453
454 pub fn evaluate(&self, event: &impl Event) -> Vec<MatchResult> {
460 self.engine.evaluate(event)
461 }
462
463 pub fn process_batch<E: Event + Sync>(&mut self, events: &[&E]) -> Vec<ProcessResult> {
471 let engine = &self.engine;
474 let ts_fields = &self.config.timestamp_fields;
475
476 let batch_results: Vec<(Vec<MatchResult>, Option<i64>)> = {
477 #[cfg(feature = "parallel")]
478 {
479 use rayon::prelude::*;
480 events
481 .par_iter()
482 .map(|e| {
483 let detections = engine.evaluate(e);
484 let ts = extract_event_ts(e, ts_fields);
485 (detections, ts)
486 })
487 .collect()
488 }
489 #[cfg(not(feature = "parallel"))]
490 {
491 events
492 .iter()
493 .map(|e| {
494 let detections = engine.evaluate(e);
495 let ts = extract_event_ts(e, ts_fields);
496 (detections, ts)
497 })
498 .collect()
499 }
500 };
501
502 let mut results = Vec::with_capacity(events.len());
504 for ((detections, ts_opt), event) in batch_results.into_iter().zip(events) {
505 match ts_opt {
506 Some(ts) => {
507 results.push(self.process_with_detections(event, detections, ts));
508 }
509 None => match self.config.timestamp_fallback {
510 TimestampFallback::WallClock => {
511 let ts = Utc::now().timestamp();
512 results.push(self.process_with_detections(event, detections, ts));
513 }
514 TimestampFallback::Skip => {
515 let detections = self.filter_detections(detections);
517 results.push(ProcessResult {
518 detections,
519 correlations: Vec::new(),
520 });
521 }
522 },
523 }
524 }
525 results
526 }
527
528 fn filter_detections(&self, all_detections: Vec<MatchResult>) -> Vec<MatchResult> {
533 if !self.config.emit_detections && !self.correlation_only_rules.is_empty() {
534 all_detections
535 .into_iter()
536 .filter(|m| {
537 let id_match = m
538 .rule_id
539 .as_ref()
540 .is_some_and(|id| self.correlation_only_rules.contains(id));
541 !id_match
542 })
543 .collect()
544 } else {
545 all_detections
546 }
547 }
548
549 fn feed_detections(
551 &mut self,
552 event: &impl Event,
553 detections: &[MatchResult],
554 ts: i64,
555 out: &mut Vec<CorrelationResult>,
556 ) {
557 let mut work: Vec<(usize, Option<String>, Option<String>)> = Vec::new();
560
561 for det in detections {
562 let (rule_id, rule_name) = self.find_rule_identity(det);
565
566 let mut corr_indices = Vec::new();
568 if let Some(ref id) = rule_id
569 && let Some(indices) = self.rule_index.get(id)
570 {
571 corr_indices.extend(indices);
572 }
573 if let Some(ref name) = rule_name
574 && let Some(indices) = self.rule_index.get(name)
575 {
576 corr_indices.extend(indices);
577 }
578
579 corr_indices.sort_unstable();
580 corr_indices.dedup();
581
582 for &corr_idx in &corr_indices {
583 work.push((corr_idx, rule_id.clone(), rule_name.clone()));
584 }
585 }
586
587 for (corr_idx, rule_id, rule_name) in work {
588 self.update_correlation(corr_idx, event, ts, &rule_id, &rule_name, out);
589 }
590 }
591
592 fn find_rule_identity(&self, det: &MatchResult) -> (Option<String>, Option<String>) {
594 if let Some(ref match_id) = det.rule_id {
596 for (id, name) in &self.rule_ids {
597 if id.as_deref() == Some(match_id.as_str()) {
598 return (id.clone(), name.clone());
599 }
600 }
601 }
602 (det.rule_id.clone(), None)
604 }
605
606 fn resolve_event_mode(&self, corr_idx: usize) -> CorrelationEventMode {
608 let corr = &self.correlations[corr_idx];
609 corr.event_mode
610 .unwrap_or(self.config.correlation_event_mode)
611 }
612
613 fn resolve_max_events(&self, corr_idx: usize) -> usize {
615 let corr = &self.correlations[corr_idx];
616 corr.max_events
617 .unwrap_or(self.config.max_correlation_events)
618 }
619
620 fn update_correlation(
622 &mut self,
623 corr_idx: usize,
624 event: &impl Event,
625 ts: i64,
626 rule_id: &Option<String>,
627 rule_name: &Option<String>,
628 out: &mut Vec<CorrelationResult>,
629 ) {
630 let corr = &self.correlations[corr_idx];
634 let corr_type = corr.correlation_type;
635 let timespan = corr.timespan_secs;
636 let level = corr.level;
637 let suppress_secs = corr.suppress_secs.or(self.config.suppress);
638 let action = corr.action.unwrap_or(self.config.action_on_match);
639 let event_mode = self.resolve_event_mode(corr_idx);
640 let max_events = self.resolve_max_events(corr_idx);
641
642 let mut ref_strs: Vec<&str> = Vec::new();
644 if let Some(id) = rule_id.as_deref() {
645 ref_strs.push(id);
646 }
647 if let Some(name) = rule_name.as_deref() {
648 ref_strs.push(name);
649 }
650 let rule_ref = rule_id.as_deref().or(rule_name.as_deref()).unwrap_or("");
651
652 let group_key = GroupKey::extract(event, &corr.group_by, &ref_strs);
654
655 let state_key = (corr_idx, group_key.clone());
657 let state = self
658 .state
659 .entry(state_key.clone())
660 .or_insert_with(|| WindowState::new_for(corr_type));
661
662 let cutoff = ts - timespan as i64;
664 state.evict(cutoff);
665
666 match corr_type {
668 CorrelationType::EventCount => {
669 state.push_event_count(ts);
670 }
671 CorrelationType::ValueCount => {
672 if let Some(ref fields) = corr.condition.field
673 && let Some(field_name) = fields.first()
674 && let Some(val) = event.get_field(field_name)
675 && let Some(s) = value_to_string_for_count(&val)
676 {
677 state.push_value_count(ts, s);
678 }
679 }
680 CorrelationType::Temporal | CorrelationType::TemporalOrdered => {
681 state.push_temporal(ts, rule_ref);
682 }
683 CorrelationType::ValueSum
684 | CorrelationType::ValueAvg
685 | CorrelationType::ValuePercentile
686 | CorrelationType::ValueMedian => {
687 if let Some(ref fields) = corr.condition.field
688 && let Some(field_name) = fields.first()
689 && let Some(val) = event.get_field(field_name)
690 && let Some(n) = value_to_f64_ev(&val)
691 {
692 state.push_numeric(ts, n);
693 }
694 }
695 }
696
697 match event_mode {
699 CorrelationEventMode::Full => {
700 let buf = self
701 .event_buffers
702 .entry(state_key.clone())
703 .or_insert_with(|| EventBuffer::new(max_events));
704 buf.evict(cutoff);
705 let json = event.to_json();
706 buf.push(ts, &json);
707 }
708 CorrelationEventMode::Refs => {
709 let buf = self
710 .event_ref_buffers
711 .entry(state_key.clone())
712 .or_insert_with(|| EventRefBuffer::new(max_events));
713 buf.evict(cutoff);
714 let json = event.to_json();
715 buf.push(ts, &json);
716 }
717 CorrelationEventMode::None => {}
718 }
719
720 let fired = state.check_condition(
722 &corr.condition,
723 corr_type,
724 &corr.rule_refs,
725 corr.extended_expr.as_ref(),
726 );
727
728 if let Some(agg_value) = fired {
729 let alert_key = (corr_idx, group_key.clone());
730
731 let suppressed = if let Some(suppress) = suppress_secs {
733 if let Some(&last_ts) = self.last_alert.get(&alert_key) {
734 (ts - last_ts) < suppress as i64
735 } else {
736 false
737 }
738 } else {
739 false
740 };
741
742 if !suppressed {
743 let (events, event_refs) = match event_mode {
745 CorrelationEventMode::Full => {
746 let stored = self
747 .event_buffers
748 .get(&alert_key)
749 .map(|buf| buf.decompress_all())
750 .unwrap_or_default();
751 (Some(stored), None)
752 }
753 CorrelationEventMode::Refs => {
754 let stored = self
755 .event_ref_buffers
756 .get(&alert_key)
757 .map(|buf| buf.refs())
758 .unwrap_or_default();
759 (None, Some(stored))
760 }
761 CorrelationEventMode::None => (None, None),
762 };
763
764 let corr = &self.correlations[corr_idx];
766 let result = CorrelationResult {
767 rule_title: corr.title.clone(),
768 rule_id: corr.id.clone(),
769 level,
770 tags: corr.tags.clone(),
771 correlation_type: corr_type,
772 group_key: group_key.to_pairs(&corr.group_by),
773 aggregated_value: agg_value,
774 timespan_secs: timespan,
775 events,
776 event_refs,
777 custom_attributes: corr.custom_attributes.clone(),
778 };
779 out.push(result);
780
781 self.last_alert.insert(alert_key.clone(), ts);
783
784 if action == CorrelationAction::Reset {
786 if let Some(state) = self.state.get_mut(&alert_key) {
787 state.clear();
788 }
789 if let Some(buf) = self.event_buffers.get_mut(&alert_key) {
790 buf.clear();
791 }
792 if let Some(buf) = self.event_ref_buffers.get_mut(&alert_key) {
793 buf.clear();
794 }
795 }
796 }
797 }
798 }
799
800 fn chain_correlations(&mut self, fired: &[CorrelationResult], ts: i64) {
805 const MAX_CHAIN_DEPTH: usize = 10;
806 let mut pending: Vec<CorrelationResult> = fired.to_vec();
807 let mut depth = 0;
808
809 while !pending.is_empty() && depth < MAX_CHAIN_DEPTH {
810 depth += 1;
811
812 #[allow(clippy::type_complexity)]
814 let mut work: Vec<(usize, Vec<(String, String)>, String)> = Vec::new();
815 for result in &pending {
816 if let Some(ref id) = result.rule_id
817 && let Some(indices) = self.rule_index.get(id)
818 {
819 let fired_ref = result
820 .rule_id
821 .as_deref()
822 .unwrap_or(&result.rule_title)
823 .to_string();
824 for &corr_idx in indices {
825 work.push((corr_idx, result.group_key.clone(), fired_ref.clone()));
826 }
827 }
828 }
829
830 let mut next_pending = Vec::new();
831 for (corr_idx, group_key_pairs, fired_ref) in work {
832 let corr = &self.correlations[corr_idx];
833 let corr_type = corr.correlation_type;
834 let timespan = corr.timespan_secs;
835 let level = corr.level;
836
837 let group_key = GroupKey::from_pairs(&group_key_pairs, &corr.group_by);
838 let state_key = (corr_idx, group_key.clone());
839 let state = self
840 .state
841 .entry(state_key)
842 .or_insert_with(|| WindowState::new_for(corr_type));
843
844 let cutoff = ts - timespan as i64;
845 state.evict(cutoff);
846
847 match corr_type {
848 CorrelationType::EventCount => {
849 state.push_event_count(ts);
850 }
851 CorrelationType::Temporal | CorrelationType::TemporalOrdered => {
852 state.push_temporal(ts, &fired_ref);
853 }
854 _ => {
855 state.push_event_count(ts);
856 }
857 }
858
859 let fired = state.check_condition(
860 &corr.condition,
861 corr_type,
862 &corr.rule_refs,
863 corr.extended_expr.as_ref(),
864 );
865
866 if let Some(agg_value) = fired {
867 let corr = &self.correlations[corr_idx];
868 next_pending.push(CorrelationResult {
869 rule_title: corr.title.clone(),
870 rule_id: corr.id.clone(),
871 level,
872 tags: corr.tags.clone(),
873 correlation_type: corr_type,
874 group_key: group_key.to_pairs(&corr.group_by),
875 aggregated_value: agg_value,
876 timespan_secs: timespan,
877 events: None,
880 event_refs: None,
881 custom_attributes: corr.custom_attributes.clone(),
882 });
883 }
884 }
885
886 pending = next_pending;
887 }
888
889 if !pending.is_empty() {
890 log::warn!(
891 "Correlation chain depth limit reached ({MAX_CHAIN_DEPTH}); \
892 {} pending result(s) were not propagated further. \
893 This may indicate a cycle in correlation references.",
894 pending.len()
895 );
896 }
897 }
898
899 fn extract_event_timestamp(&self, event: &impl Event) -> Option<i64> {
911 for field_name in &self.config.timestamp_fields {
912 if let Some(val) = event.get_field(field_name)
913 && let Some(ts) = parse_timestamp_value(&val)
914 {
915 return Some(ts);
916 }
917 }
918 None
919 }
920
921 pub fn evict_expired(&mut self, now_secs: i64) {
927 self.evict_all(now_secs);
928 }
929
930 fn evict_all(&mut self, now_secs: i64) {
932 let timespans: Vec<u64> = self.correlations.iter().map(|c| c.timespan_secs).collect();
934
935 self.state.retain(|&(corr_idx, _), state| {
936 if corr_idx < timespans.len() {
937 let cutoff = now_secs - timespans[corr_idx] as i64;
938 state.evict(cutoff);
939 }
940 !state.is_empty()
941 });
942
943 self.event_buffers.retain(|&(corr_idx, _), buf| {
945 if corr_idx < timespans.len() {
946 let cutoff = now_secs - timespans[corr_idx] as i64;
947 buf.evict(cutoff);
948 }
949 !buf.is_empty()
950 });
951 self.event_ref_buffers.retain(|&(corr_idx, _), buf| {
952 if corr_idx < timespans.len() {
953 let cutoff = now_secs - timespans[corr_idx] as i64;
954 buf.evict(cutoff);
955 }
956 !buf.is_empty()
957 });
958
959 if self.state.len() >= self.config.max_state_entries {
963 let target = self.config.max_state_entries * 9 / 10;
964 let excess = self.state.len() - target;
965
966 let mut by_staleness: Vec<_> = self
968 .state
969 .iter()
970 .map(|(k, v)| (k.clone(), v.latest_timestamp().unwrap_or(i64::MIN)))
971 .collect();
972 by_staleness.sort_unstable_by_key(|&(_, ts)| ts);
973
974 for (key, _) in by_staleness.into_iter().take(excess) {
976 self.state.remove(&key);
977 self.last_alert.remove(&key);
978 self.event_buffers.remove(&key);
979 self.event_ref_buffers.remove(&key);
980 }
981 }
982
983 self.last_alert.retain(|key, &mut alert_ts| {
986 let suppress = if key.0 < self.correlations.len() {
987 self.correlations[key.0]
988 .suppress_secs
989 .or(self.config.suppress)
990 .unwrap_or(0)
991 } else {
992 0
993 };
994 (now_secs - alert_ts) < suppress as i64
995 });
996 }
997
998 pub fn state_count(&self) -> usize {
1000 self.state.len()
1001 }
1002
1003 pub fn detection_rule_count(&self) -> usize {
1005 self.engine.rule_count()
1006 }
1007
1008 pub fn correlation_rule_count(&self) -> usize {
1010 self.correlations.len()
1011 }
1012
1013 pub fn event_buffer_count(&self) -> usize {
1015 self.event_buffers.len()
1016 }
1017
1018 pub fn event_buffer_bytes(&self) -> usize {
1020 self.event_buffers
1021 .values()
1022 .map(|b| b.compressed_bytes())
1023 .sum()
1024 }
1025
1026 pub fn event_ref_buffer_count(&self) -> usize {
1028 self.event_ref_buffers.len()
1029 }
1030
1031 pub fn engine(&self) -> &Engine {
1033 &self.engine
1034 }
1035
1036 pub fn export_state(&self) -> CorrelationSnapshot {
1042 let mut windows: HashMap<String, Vec<(GroupKey, WindowState)>> = HashMap::new();
1043 for ((idx, gk), ws) in &self.state {
1044 let corr_id = self.correlation_stable_id(*idx);
1045 windows
1046 .entry(corr_id)
1047 .or_default()
1048 .push((gk.clone(), ws.clone()));
1049 }
1050
1051 let mut last_alert: HashMap<String, Vec<(GroupKey, i64)>> = HashMap::new();
1052 for ((idx, gk), ts) in &self.last_alert {
1053 let corr_id = self.correlation_stable_id(*idx);
1054 last_alert
1055 .entry(corr_id)
1056 .or_default()
1057 .push((gk.clone(), *ts));
1058 }
1059
1060 let mut event_buffers: HashMap<String, Vec<(GroupKey, EventBuffer)>> = HashMap::new();
1061 for ((idx, gk), buf) in &self.event_buffers {
1062 let corr_id = self.correlation_stable_id(*idx);
1063 event_buffers
1064 .entry(corr_id)
1065 .or_default()
1066 .push((gk.clone(), buf.clone()));
1067 }
1068
1069 let mut event_ref_buffers: HashMap<String, Vec<(GroupKey, EventRefBuffer)>> =
1070 HashMap::new();
1071 for ((idx, gk), buf) in &self.event_ref_buffers {
1072 let corr_id = self.correlation_stable_id(*idx);
1073 event_ref_buffers
1074 .entry(corr_id)
1075 .or_default()
1076 .push((gk.clone(), buf.clone()));
1077 }
1078
1079 CorrelationSnapshot {
1080 version: SNAPSHOT_VERSION,
1081 windows,
1082 last_alert,
1083 event_buffers,
1084 event_ref_buffers,
1085 }
1086 }
1087
1088 pub fn import_state(&mut self, snapshot: CorrelationSnapshot) -> bool {
1095 if snapshot.version != SNAPSHOT_VERSION {
1096 return false;
1097 }
1098 let id_to_idx = self.build_id_to_index_map();
1099
1100 for (corr_id, groups) in snapshot.windows {
1101 if let Some(&idx) = id_to_idx.get(&corr_id) {
1102 for (gk, ws) in groups {
1103 self.state.insert((idx, gk), ws);
1104 }
1105 }
1106 }
1107
1108 for (corr_id, groups) in snapshot.last_alert {
1109 if let Some(&idx) = id_to_idx.get(&corr_id) {
1110 for (gk, ts) in groups {
1111 self.last_alert.insert((idx, gk), ts);
1112 }
1113 }
1114 }
1115
1116 for (corr_id, groups) in snapshot.event_buffers {
1117 if let Some(&idx) = id_to_idx.get(&corr_id) {
1118 for (gk, buf) in groups {
1119 self.event_buffers.insert((idx, gk), buf);
1120 }
1121 }
1122 }
1123
1124 for (corr_id, groups) in snapshot.event_ref_buffers {
1125 if let Some(&idx) = id_to_idx.get(&corr_id) {
1126 for (gk, buf) in groups {
1127 self.event_ref_buffers.insert((idx, gk), buf);
1128 }
1129 }
1130 }
1131
1132 true
1133 }
1134
1135 fn correlation_stable_id(&self, idx: usize) -> String {
1137 let corr = &self.correlations[idx];
1138 corr.id
1139 .clone()
1140 .or_else(|| corr.name.clone())
1141 .unwrap_or_else(|| corr.title.clone())
1142 }
1143
1144 fn build_id_to_index_map(&self) -> HashMap<String, usize> {
1146 self.correlations
1147 .iter()
1148 .enumerate()
1149 .map(|(idx, _)| (self.correlation_stable_id(idx), idx))
1150 .collect()
1151 }
1152}
1153
1154impl Default for CorrelationEngine {
1155 fn default() -> Self {
1156 Self::new(CorrelationConfig::default())
1157 }
1158}
1159
1160fn extract_event_ts(event: &impl Event, timestamp_fields: &[String]) -> Option<i64> {
1169 for field_name in timestamp_fields {
1170 if let Some(val) = event.get_field(field_name)
1171 && let Some(ts) = parse_timestamp_value(&val)
1172 {
1173 return Some(ts);
1174 }
1175 }
1176 None
1177}
1178
1179fn parse_timestamp_value(val: &EventValue) -> Option<i64> {
1181 match val {
1182 EventValue::Int(i) => Some(normalize_epoch(*i)),
1183 EventValue::Float(f) => Some(normalize_epoch(*f as i64)),
1184 EventValue::Str(s) => parse_timestamp_string(s),
1185 _ => None,
1186 }
1187}
1188
/// Normalizes a numeric epoch value to seconds.
///
/// Values strictly greater than 1_000_000_000_000 are divided by 1000 (integer division);
/// everything else, including negative values, is returned unchanged.
fn normalize_epoch(v: i64) -> i64 {
    const SCALE_ABOVE: i64 = 1_000_000_000_000;
    match v {
        large if large > SCALE_ABOVE => large / 1000,
        unchanged => unchanged,
    }
}
1194
1195fn parse_timestamp_string(s: &str) -> Option<i64> {
1197 if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
1199 return Some(dt.timestamp());
1200 }
1201
1202 if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S") {
1205 return Some(Utc.from_utc_datetime(&naive).timestamp());
1206 }
1207 if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") {
1208 return Some(Utc.from_utc_datetime(&naive).timestamp());
1209 }
1210
1211 if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S%.f") {
1213 return Some(Utc.from_utc_datetime(&naive).timestamp());
1214 }
1215 if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f") {
1216 return Some(Utc.from_utc_datetime(&naive).timestamp());
1217 }
1218
1219 None
1220}
1221
1222fn value_to_string_for_count(v: &EventValue) -> Option<String> {
1224 match v {
1225 EventValue::Str(s) => Some(s.to_string()),
1226 EventValue::Int(n) => Some(n.to_string()),
1227 EventValue::Float(f) => Some(f.to_string()),
1228 EventValue::Bool(b) => Some(b.to_string()),
1229 EventValue::Null => Some("null".to_string()),
1230 _ => None,
1231 }
1232}
1233
1234fn value_to_f64_ev(v: &EventValue) -> Option<f64> {
1236 v.as_f64()
1237}