1#[cfg(test)]
16mod tests;
17mod types;
18
19pub use types::*;
20
21use std::collections::HashMap;
22
23use chrono::{DateTime, TimeZone, Utc};
24
25use rsigma_parser::{CorrelationRule, CorrelationType, SigmaCollection, SigmaRule};
26
27use crate::correlation::{
28 CompiledCorrelation, EventBuffer, EventRefBuffer, GroupKey, WindowState, compile_correlation,
29};
30use crate::engine::Engine;
31use crate::error::{EvalError, Result};
32use crate::event::{Event, EventValue};
33use crate::pipeline::{Pipeline, apply_pipelines, apply_pipelines_to_correlation};
34use crate::result::MatchResult;
35
/// Format version stamped into every [`CorrelationSnapshot`] by `export_state`.
/// `import_state` rejects (returns `false` for) snapshots carrying any other value.
const SNAPSHOT_VERSION: u32 = 1;
42
/// Stateful Sigma correlation engine.
///
/// Wraps a detection [`Engine`] and layers compiled correlation rules on top:
/// each processed event is evaluated against the detection rules, and the
/// resulting matches feed per-(correlation, group) window state from which
/// correlation alerts are produced.
pub struct CorrelationEngine {
    /// Detection engine that evaluates individual events.
    engine: Engine,
    /// Compiled correlation rules. A correlation's position in this vector is
    /// the `usize` half of the keys used by the state maps below.
    correlations: Vec<CompiledCorrelation>,
    /// Rule reference (as written in a correlation's `rule_refs`) -> indices of
    /// the correlations that list it.
    rule_index: HashMap<String, Vec<usize>>,
    /// `(id, name)` of every detection rule added through `add_rule`, in
    /// insertion order.
    rule_ids: Vec<(Option<String>, Option<String>)>,
    /// Window state per (correlation index, group key).
    state: HashMap<(usize, GroupKey), WindowState>,
    /// Timestamp (seconds) of the last alert emitted per (correlation index,
    /// group key); consulted for suppression.
    last_alert: HashMap<(usize, GroupKey), i64>,
    /// Buffered full events per (correlation index, group key), filled when the
    /// resolved event mode is `Full`.
    event_buffers: HashMap<(usize, GroupKey), EventBuffer>,
    /// Buffered event references per (correlation index, group key), filled when
    /// the resolved event mode is `Refs`.
    event_ref_buffers: HashMap<(usize, GroupKey), EventRefBuffer>,
    /// Rule references used by correlations whose `generate` flag is off; their
    /// detections are withheld from output unless `emit_detections` is set.
    correlation_only_rules: std::collections::HashSet<String>,
    /// Engine-wide settings; `rsigma.*` custom attributes on added rules may
    /// amend them.
    config: CorrelationConfig,
    /// Pipelines applied to rules before compilation, kept sorted by priority.
    pipelines: Vec<Pipeline>,
}
75
76impl CorrelationEngine {
77 pub fn new(config: CorrelationConfig) -> Self {
79 CorrelationEngine {
80 engine: Engine::new(),
81 correlations: Vec::new(),
82 rule_index: HashMap::new(),
83 rule_ids: Vec::new(),
84 state: HashMap::new(),
85 last_alert: HashMap::new(),
86 event_buffers: HashMap::new(),
87 event_ref_buffers: HashMap::new(),
88 correlation_only_rules: std::collections::HashSet::new(),
89 config,
90 pipelines: Vec::new(),
91 }
92 }
93
94 pub fn add_pipeline(&mut self, pipeline: Pipeline) {
98 self.pipelines.push(pipeline);
99 self.pipelines.sort_by_key(|p| p.priority);
100 }
101
102 pub fn set_include_event(&mut self, include: bool) {
104 self.engine.set_include_event(include);
105 }
106
107 pub fn set_correlation_event_mode(&mut self, mode: CorrelationEventMode) {
113 self.config.correlation_event_mode = mode;
114 }
115
116 pub fn set_max_correlation_events(&mut self, max: usize) {
119 self.config.max_correlation_events = max;
120 }
121
122 pub fn add_rule(&mut self, rule: &SigmaRule) -> Result<()> {
128 if self.pipelines.is_empty() {
129 self.apply_custom_attributes(&rule.custom_attributes);
130 self.rule_ids.push((rule.id.clone(), rule.name.clone()));
131 self.engine.add_rule(rule)?;
132 } else {
133 let mut transformed = rule.clone();
134 apply_pipelines(&self.pipelines, &mut transformed)?;
135 self.apply_custom_attributes(&transformed.custom_attributes);
136 self.rule_ids
137 .push((transformed.id.clone(), transformed.name.clone()));
138 let compiled = crate::compiler::compile_rule(&transformed)?;
140 self.engine.add_compiled_rule(compiled);
141 }
142 Ok(())
143 }
144
145 fn apply_custom_attributes(
159 &mut self,
160 attrs: &std::collections::HashMap<String, serde_yaml::Value>,
161 ) {
162 if let Some(field) = attrs.get("rsigma.timestamp_field").and_then(|v| v.as_str())
164 && !self.config.timestamp_fields.iter().any(|f| f == field)
165 {
166 self.config.timestamp_fields.insert(0, field.to_string());
167 }
168
169 if let Some(val) = attrs.get("rsigma.suppress").and_then(|v| v.as_str())
171 && self.config.suppress.is_none()
172 && let Ok(ts) = rsigma_parser::Timespan::parse(val)
173 {
174 self.config.suppress = Some(ts.seconds);
175 }
176
177 if let Some(val) = attrs.get("rsigma.action").and_then(|v| v.as_str())
179 && self.config.action_on_match == CorrelationAction::Alert
180 && let Ok(a) = val.parse::<CorrelationAction>()
181 {
182 self.config.action_on_match = a;
183 }
184 }
185
186 pub fn add_correlation(&mut self, corr: &CorrelationRule) -> Result<()> {
188 let owned;
189 let effective = if self.pipelines.is_empty() {
190 corr
191 } else {
192 owned = {
193 let mut c = corr.clone();
194 apply_pipelines_to_correlation(&self.pipelines, &mut c)?;
195 c
196 };
197 &owned
198 };
199
200 self.apply_custom_attributes(&effective.custom_attributes);
203
204 let compiled = compile_correlation(effective)?;
205 let idx = self.correlations.len();
206
207 for rule_ref in &compiled.rule_refs {
209 self.rule_index
210 .entry(rule_ref.clone())
211 .or_default()
212 .push(idx);
213 }
214
215 if !compiled.generate {
217 for rule_ref in &compiled.rule_refs {
218 self.correlation_only_rules.insert(rule_ref.clone());
219 }
220 }
221
222 self.correlations.push(compiled);
223 Ok(())
224 }
225
226 pub fn add_collection(&mut self, collection: &SigmaCollection) -> Result<()> {
231 for rule in &collection.rules {
232 self.add_rule(rule)?;
233 }
234 for filter in &collection.filters {
236 self.engine.apply_filter(filter)?;
237 }
238 for corr in &collection.correlations {
239 self.add_correlation(corr)?;
240 }
241 self.validate_rule_refs()?;
242 self.detect_correlation_cycles()?;
243 Ok(())
244 }
245
246 fn validate_rule_refs(&self) -> Result<()> {
249 let mut known: std::collections::HashSet<&str> = std::collections::HashSet::new();
250
251 for (id, name) in &self.rule_ids {
252 if let Some(id) = id {
253 known.insert(id.as_str());
254 }
255 if let Some(name) = name {
256 known.insert(name.as_str());
257 }
258 }
259 for corr in &self.correlations {
260 if let Some(ref id) = corr.id {
261 known.insert(id.as_str());
262 }
263 if let Some(ref name) = corr.name {
264 known.insert(name.as_str());
265 }
266 }
267
268 for corr in &self.correlations {
269 for rule_ref in &corr.rule_refs {
270 if !known.contains(rule_ref.as_str()) {
271 return Err(EvalError::UnknownRuleRef(rule_ref.clone()));
272 }
273 }
274 }
275 Ok(())
276 }
277
278 fn detect_correlation_cycles(&self) -> Result<()> {
286 let mut corr_identifiers: HashMap<&str, usize> = HashMap::new();
288 for (idx, corr) in self.correlations.iter().enumerate() {
289 if let Some(ref id) = corr.id {
290 corr_identifiers.insert(id.as_str(), idx);
291 }
292 if let Some(ref name) = corr.name {
293 corr_identifiers.insert(name.as_str(), idx);
294 }
295 }
296
297 let mut adj: Vec<Vec<usize>> = vec![Vec::new(); self.correlations.len()];
299 for (idx, corr) in self.correlations.iter().enumerate() {
300 for rule_ref in &corr.rule_refs {
301 if let Some(&target_idx) = corr_identifiers.get(rule_ref.as_str()) {
302 adj[idx].push(target_idx);
303 }
304 }
305 }
306
307 let mut state = vec![0u8; self.correlations.len()]; let mut path: Vec<usize> = Vec::new();
310
311 for start in 0..self.correlations.len() {
312 if state[start] == 0
313 && let Some(cycle) = Self::dfs_find_cycle(start, &adj, &mut state, &mut path)
314 {
315 let names: Vec<String> = cycle
316 .iter()
317 .map(|&i| {
318 self.correlations[i]
319 .id
320 .as_deref()
321 .or(self.correlations[i].name.as_deref())
322 .unwrap_or(&self.correlations[i].title)
323 .to_string()
324 })
325 .collect();
326 return Err(crate::error::EvalError::CorrelationCycle(
327 names.join(" -> "),
328 ));
329 }
330 }
331 Ok(())
332 }
333
334 fn dfs_find_cycle(
336 node: usize,
337 adj: &[Vec<usize>],
338 state: &mut [u8],
339 path: &mut Vec<usize>,
340 ) -> Option<Vec<usize>> {
341 state[node] = 1; path.push(node);
343
344 for &next in &adj[node] {
345 if state[next] == 1 {
346 if let Some(pos) = path.iter().position(|&n| n == next) {
348 let mut cycle = path[pos..].to_vec();
349 cycle.push(next); return Some(cycle);
351 }
352 }
353 if state[next] == 0
354 && let Some(cycle) = Self::dfs_find_cycle(next, adj, state, path)
355 {
356 return Some(cycle);
357 }
358 }
359
360 path.pop();
361 state[node] = 2; None
363 }
364
365 pub fn process_event(&mut self, event: &impl Event) -> ProcessResult {
371 let all_detections = self.engine.evaluate(event);
372
373 let ts = match self.extract_event_timestamp(event) {
374 Some(ts) => ts,
375 None => match self.config.timestamp_fallback {
376 TimestampFallback::WallClock => Utc::now().timestamp(),
377 TimestampFallback::Skip => {
378 let detections = self.filter_detections(all_detections);
380 return ProcessResult {
381 detections,
382 correlations: Vec::new(),
383 };
384 }
385 },
386 };
387 self.process_with_detections(event, all_detections, ts)
388 }
389
390 pub fn process_event_at(&mut self, event: &impl Event, timestamp_secs: i64) -> ProcessResult {
395 let all_detections = self.engine.evaluate(event);
396 self.process_with_detections(event, all_detections, timestamp_secs)
397 }
398
399 pub fn process_with_detections(
405 &mut self,
406 event: &impl Event,
407 all_detections: Vec<MatchResult>,
408 timestamp_secs: i64,
409 ) -> ProcessResult {
410 let timestamp_secs = timestamp_secs.clamp(0, i64::MAX / 2);
411
412 if self.state.len() >= self.config.max_state_entries {
414 self.evict_all(timestamp_secs);
415 }
416
417 let mut correlations = Vec::new();
419 self.feed_detections(event, &all_detections, timestamp_secs, &mut correlations);
420
421 self.chain_correlations(&correlations, timestamp_secs);
423
424 let detections = self.filter_detections(all_detections);
426
427 ProcessResult {
428 detections,
429 correlations,
430 }
431 }
432
433 pub fn evaluate(&self, event: &impl Event) -> Vec<MatchResult> {
439 self.engine.evaluate(event)
440 }
441
442 pub fn process_batch<E: Event + Sync>(&mut self, events: &[&E]) -> Vec<ProcessResult> {
450 let engine = &self.engine;
453 let ts_fields = &self.config.timestamp_fields;
454
455 let batch_results: Vec<(Vec<MatchResult>, Option<i64>)> = {
456 #[cfg(feature = "parallel")]
457 {
458 use rayon::prelude::*;
459 events
460 .par_iter()
461 .map(|e| {
462 let detections = engine.evaluate(e);
463 let ts = extract_event_ts(e, ts_fields);
464 (detections, ts)
465 })
466 .collect()
467 }
468 #[cfg(not(feature = "parallel"))]
469 {
470 events
471 .iter()
472 .map(|e| {
473 let detections = engine.evaluate(e);
474 let ts = extract_event_ts(e, ts_fields);
475 (detections, ts)
476 })
477 .collect()
478 }
479 };
480
481 let mut results = Vec::with_capacity(events.len());
483 for ((detections, ts_opt), event) in batch_results.into_iter().zip(events) {
484 match ts_opt {
485 Some(ts) => {
486 results.push(self.process_with_detections(event, detections, ts));
487 }
488 None => match self.config.timestamp_fallback {
489 TimestampFallback::WallClock => {
490 let ts = Utc::now().timestamp();
491 results.push(self.process_with_detections(event, detections, ts));
492 }
493 TimestampFallback::Skip => {
494 let detections = self.filter_detections(detections);
496 results.push(ProcessResult {
497 detections,
498 correlations: Vec::new(),
499 });
500 }
501 },
502 }
503 }
504 results
505 }
506
507 fn filter_detections(&self, all_detections: Vec<MatchResult>) -> Vec<MatchResult> {
512 if !self.config.emit_detections && !self.correlation_only_rules.is_empty() {
513 all_detections
514 .into_iter()
515 .filter(|m| {
516 let id_match = m
517 .rule_id
518 .as_ref()
519 .is_some_and(|id| self.correlation_only_rules.contains(id));
520 !id_match
521 })
522 .collect()
523 } else {
524 all_detections
525 }
526 }
527
528 fn feed_detections(
530 &mut self,
531 event: &impl Event,
532 detections: &[MatchResult],
533 ts: i64,
534 out: &mut Vec<CorrelationResult>,
535 ) {
536 let mut work: Vec<(usize, Option<String>, Option<String>)> = Vec::new();
539
540 for det in detections {
541 let (rule_id, rule_name) = self.find_rule_identity(det);
544
545 let mut corr_indices = Vec::new();
547 if let Some(ref id) = rule_id
548 && let Some(indices) = self.rule_index.get(id)
549 {
550 corr_indices.extend(indices);
551 }
552 if let Some(ref name) = rule_name
553 && let Some(indices) = self.rule_index.get(name)
554 {
555 corr_indices.extend(indices);
556 }
557
558 corr_indices.sort_unstable();
559 corr_indices.dedup();
560
561 for &corr_idx in &corr_indices {
562 work.push((corr_idx, rule_id.clone(), rule_name.clone()));
563 }
564 }
565
566 for (corr_idx, rule_id, rule_name) in work {
567 self.update_correlation(corr_idx, event, ts, &rule_id, &rule_name, out);
568 }
569 }
570
571 fn find_rule_identity(&self, det: &MatchResult) -> (Option<String>, Option<String>) {
573 if let Some(ref match_id) = det.rule_id {
575 for (id, name) in &self.rule_ids {
576 if id.as_deref() == Some(match_id.as_str()) {
577 return (id.clone(), name.clone());
578 }
579 }
580 }
581 (det.rule_id.clone(), None)
583 }
584
585 fn resolve_event_mode(&self, corr_idx: usize) -> CorrelationEventMode {
587 let corr = &self.correlations[corr_idx];
588 corr.event_mode
589 .unwrap_or(self.config.correlation_event_mode)
590 }
591
592 fn resolve_max_events(&self, corr_idx: usize) -> usize {
594 let corr = &self.correlations[corr_idx];
595 corr.max_events
596 .unwrap_or(self.config.max_correlation_events)
597 }
598
599 fn update_correlation(
601 &mut self,
602 corr_idx: usize,
603 event: &impl Event,
604 ts: i64,
605 rule_id: &Option<String>,
606 rule_name: &Option<String>,
607 out: &mut Vec<CorrelationResult>,
608 ) {
609 let corr = &self.correlations[corr_idx];
613 let corr_type = corr.correlation_type;
614 let timespan = corr.timespan_secs;
615 let level = corr.level;
616 let suppress_secs = corr.suppress_secs.or(self.config.suppress);
617 let action = corr.action.unwrap_or(self.config.action_on_match);
618 let event_mode = self.resolve_event_mode(corr_idx);
619 let max_events = self.resolve_max_events(corr_idx);
620
621 let mut ref_strs: Vec<&str> = Vec::new();
623 if let Some(id) = rule_id.as_deref() {
624 ref_strs.push(id);
625 }
626 if let Some(name) = rule_name.as_deref() {
627 ref_strs.push(name);
628 }
629 let rule_ref = rule_id.as_deref().or(rule_name.as_deref()).unwrap_or("");
630
631 let group_key = GroupKey::extract(event, &corr.group_by, &ref_strs);
633
634 let state_key = (corr_idx, group_key.clone());
636 let state = self
637 .state
638 .entry(state_key.clone())
639 .or_insert_with(|| WindowState::new_for(corr_type));
640
641 let cutoff = ts - timespan as i64;
643 state.evict(cutoff);
644
645 match corr_type {
647 CorrelationType::EventCount => {
648 state.push_event_count(ts);
649 }
650 CorrelationType::ValueCount => {
651 if let Some(ref fields) = corr.condition.field
652 && let Some(field_name) = fields.first()
653 && let Some(val) = event.get_field(field_name)
654 && let Some(s) = value_to_string_for_count(&val)
655 {
656 state.push_value_count(ts, s);
657 }
658 }
659 CorrelationType::Temporal | CorrelationType::TemporalOrdered => {
660 state.push_temporal(ts, rule_ref);
661 }
662 CorrelationType::ValueSum
663 | CorrelationType::ValueAvg
664 | CorrelationType::ValuePercentile
665 | CorrelationType::ValueMedian => {
666 if let Some(ref fields) = corr.condition.field
667 && let Some(field_name) = fields.first()
668 && let Some(val) = event.get_field(field_name)
669 && let Some(n) = value_to_f64_ev(&val)
670 {
671 state.push_numeric(ts, n);
672 }
673 }
674 }
675
676 match event_mode {
678 CorrelationEventMode::Full => {
679 let buf = self
680 .event_buffers
681 .entry(state_key.clone())
682 .or_insert_with(|| EventBuffer::new(max_events));
683 buf.evict(cutoff);
684 let json = event.to_json();
685 buf.push(ts, &json);
686 }
687 CorrelationEventMode::Refs => {
688 let buf = self
689 .event_ref_buffers
690 .entry(state_key.clone())
691 .or_insert_with(|| EventRefBuffer::new(max_events));
692 buf.evict(cutoff);
693 let json = event.to_json();
694 buf.push(ts, &json);
695 }
696 CorrelationEventMode::None => {}
697 }
698
699 let fired = state.check_condition(
701 &corr.condition,
702 corr_type,
703 &corr.rule_refs,
704 corr.extended_expr.as_ref(),
705 );
706
707 if let Some(agg_value) = fired {
708 let alert_key = (corr_idx, group_key.clone());
709
710 let suppressed = if let Some(suppress) = suppress_secs {
712 if let Some(&last_ts) = self.last_alert.get(&alert_key) {
713 (ts - last_ts) < suppress as i64
714 } else {
715 false
716 }
717 } else {
718 false
719 };
720
721 if !suppressed {
722 let (events, event_refs) = match event_mode {
724 CorrelationEventMode::Full => {
725 let stored = self
726 .event_buffers
727 .get(&alert_key)
728 .map(|buf| buf.decompress_all())
729 .unwrap_or_default();
730 (Some(stored), None)
731 }
732 CorrelationEventMode::Refs => {
733 let stored = self
734 .event_ref_buffers
735 .get(&alert_key)
736 .map(|buf| buf.refs())
737 .unwrap_or_default();
738 (None, Some(stored))
739 }
740 CorrelationEventMode::None => (None, None),
741 };
742
743 let corr = &self.correlations[corr_idx];
745 let result = CorrelationResult {
746 rule_title: corr.title.clone(),
747 rule_id: corr.id.clone(),
748 level,
749 tags: corr.tags.clone(),
750 correlation_type: corr_type,
751 group_key: group_key.to_pairs(&corr.group_by),
752 aggregated_value: agg_value,
753 timespan_secs: timespan,
754 events,
755 event_refs,
756 custom_attributes: corr.custom_attributes.clone(),
757 };
758 out.push(result);
759
760 self.last_alert.insert(alert_key.clone(), ts);
762
763 if action == CorrelationAction::Reset {
765 if let Some(state) = self.state.get_mut(&alert_key) {
766 state.clear();
767 }
768 if let Some(buf) = self.event_buffers.get_mut(&alert_key) {
769 buf.clear();
770 }
771 if let Some(buf) = self.event_ref_buffers.get_mut(&alert_key) {
772 buf.clear();
773 }
774 }
775 }
776 }
777 }
778
779 fn chain_correlations(&mut self, fired: &[CorrelationResult], ts: i64) {
784 const MAX_CHAIN_DEPTH: usize = 10;
785 let mut pending: Vec<CorrelationResult> = fired.to_vec();
786 let mut depth = 0;
787
788 while !pending.is_empty() && depth < MAX_CHAIN_DEPTH {
789 depth += 1;
790
791 #[allow(clippy::type_complexity)]
793 let mut work: Vec<(usize, Vec<(String, String)>, String)> = Vec::new();
794 for result in &pending {
795 if let Some(ref id) = result.rule_id
796 && let Some(indices) = self.rule_index.get(id)
797 {
798 let fired_ref = result
799 .rule_id
800 .as_deref()
801 .unwrap_or(&result.rule_title)
802 .to_string();
803 for &corr_idx in indices {
804 work.push((corr_idx, result.group_key.clone(), fired_ref.clone()));
805 }
806 }
807 }
808
809 let mut next_pending = Vec::new();
810 for (corr_idx, group_key_pairs, fired_ref) in work {
811 let corr = &self.correlations[corr_idx];
812 let corr_type = corr.correlation_type;
813 let timespan = corr.timespan_secs;
814 let level = corr.level;
815
816 let group_key = GroupKey::from_pairs(&group_key_pairs, &corr.group_by);
817 let state_key = (corr_idx, group_key.clone());
818 let state = self
819 .state
820 .entry(state_key)
821 .or_insert_with(|| WindowState::new_for(corr_type));
822
823 let cutoff = ts - timespan as i64;
824 state.evict(cutoff);
825
826 match corr_type {
827 CorrelationType::EventCount => {
828 state.push_event_count(ts);
829 }
830 CorrelationType::Temporal | CorrelationType::TemporalOrdered => {
831 state.push_temporal(ts, &fired_ref);
832 }
833 _ => {
834 state.push_event_count(ts);
835 }
836 }
837
838 let fired = state.check_condition(
839 &corr.condition,
840 corr_type,
841 &corr.rule_refs,
842 corr.extended_expr.as_ref(),
843 );
844
845 if let Some(agg_value) = fired {
846 let corr = &self.correlations[corr_idx];
847 next_pending.push(CorrelationResult {
848 rule_title: corr.title.clone(),
849 rule_id: corr.id.clone(),
850 level,
851 tags: corr.tags.clone(),
852 correlation_type: corr_type,
853 group_key: group_key.to_pairs(&corr.group_by),
854 aggregated_value: agg_value,
855 timespan_secs: timespan,
856 events: None,
859 event_refs: None,
860 custom_attributes: corr.custom_attributes.clone(),
861 });
862 }
863 }
864
865 pending = next_pending;
866 }
867
868 if !pending.is_empty() {
869 log::warn!(
870 "Correlation chain depth limit reached ({MAX_CHAIN_DEPTH}); \
871 {} pending result(s) were not propagated further. \
872 This may indicate a cycle in correlation references.",
873 pending.len()
874 );
875 }
876 }
877
878 fn extract_event_timestamp(&self, event: &impl Event) -> Option<i64> {
890 for field_name in &self.config.timestamp_fields {
891 if let Some(val) = event.get_field(field_name)
892 && let Some(ts) = parse_timestamp_value(&val)
893 {
894 return Some(ts);
895 }
896 }
897 None
898 }
899
/// Drops window state, event buffers and suppression records that have expired
/// as of `now_secs`. Exposed so callers can evict between events.
pub fn evict_expired(&mut self, now_secs: i64) {
    self.evict_all(now_secs);
}

/// Full eviction pass, run in three phases:
///
/// 1. Time-based: evict entries older than each correlation's timespan from
///    window state and both buffer maps, dropping groups that become empty.
/// 2. Capacity-based: if the state map is still at `max_state_entries`, remove
///    the stalest groups (by latest timestamp; groups with none go first) until
///    about 90% of the limit remains. Their alert records and buffers are
///    removed with them.
/// 3. Forget alert records older than the applicable suppression window
///    (per-correlation, else engine-wide, else 0).
fn evict_all(&mut self, now_secs: i64) {
    // Snapshot timespans so the retain closures don't need to borrow `self.correlations`.
    let timespans: Vec<u64> = self.correlations.iter().map(|c| c.timespan_secs).collect();

    self.state.retain(|&(corr_idx, _), state| {
        if corr_idx < timespans.len() {
            let cutoff = now_secs - timespans[corr_idx] as i64;
            state.evict(cutoff);
        }
        !state.is_empty()
    });

    self.event_buffers.retain(|&(corr_idx, _), buf| {
        if corr_idx < timespans.len() {
            let cutoff = now_secs - timespans[corr_idx] as i64;
            buf.evict(cutoff);
        }
        !buf.is_empty()
    });
    self.event_ref_buffers.retain(|&(corr_idx, _), buf| {
        if corr_idx < timespans.len() {
            let cutoff = now_secs - timespans[corr_idx] as i64;
            buf.evict(cutoff);
        }
        !buf.is_empty()
    });

    if self.state.len() >= self.config.max_state_entries {
        // Shrink to 90% of the limit so the next few new groups don't immediately re-trigger this.
        let target = self.config.max_state_entries * 9 / 10;
        let excess = self.state.len() - target;

        let mut by_staleness: Vec<_> = self
            .state
            .iter()
            .map(|(k, v)| (k.clone(), v.latest_timestamp().unwrap_or(i64::MIN)))
            .collect();
        by_staleness.sort_unstable_by_key(|&(_, ts)| ts);

        for (key, _) in by_staleness.into_iter().take(excess) {
            self.state.remove(&key);
            self.last_alert.remove(&key);
            self.event_buffers.remove(&key);
            self.event_ref_buffers.remove(&key);
        }
    }

    // An alert record is only needed while it can still suppress a new alert.
    self.last_alert.retain(|key, &mut alert_ts| {
        let suppress = if key.0 < self.correlations.len() {
            self.correlations[key.0]
                .suppress_secs
                .or(self.config.suppress)
                .unwrap_or(0)
        } else {
            0
        };
        (now_secs - alert_ts) < suppress as i64
    });
}
976
977 pub fn state_count(&self) -> usize {
979 self.state.len()
980 }
981
982 pub fn detection_rule_count(&self) -> usize {
984 self.engine.rule_count()
985 }
986
987 pub fn correlation_rule_count(&self) -> usize {
989 self.correlations.len()
990 }
991
992 pub fn event_buffer_count(&self) -> usize {
994 self.event_buffers.len()
995 }
996
997 pub fn event_buffer_bytes(&self) -> usize {
999 self.event_buffers
1000 .values()
1001 .map(|b| b.compressed_bytes())
1002 .sum()
1003 }
1004
1005 pub fn event_ref_buffer_count(&self) -> usize {
1007 self.event_ref_buffers.len()
1008 }
1009
1010 pub fn engine(&self) -> &Engine {
1012 &self.engine
1013 }
1014
1015 pub fn export_state(&self) -> CorrelationSnapshot {
1021 let mut windows: HashMap<String, Vec<(GroupKey, WindowState)>> = HashMap::new();
1022 for ((idx, gk), ws) in &self.state {
1023 let corr_id = self.correlation_stable_id(*idx);
1024 windows
1025 .entry(corr_id)
1026 .or_default()
1027 .push((gk.clone(), ws.clone()));
1028 }
1029
1030 let mut last_alert: HashMap<String, Vec<(GroupKey, i64)>> = HashMap::new();
1031 for ((idx, gk), ts) in &self.last_alert {
1032 let corr_id = self.correlation_stable_id(*idx);
1033 last_alert
1034 .entry(corr_id)
1035 .or_default()
1036 .push((gk.clone(), *ts));
1037 }
1038
1039 let mut event_buffers: HashMap<String, Vec<(GroupKey, EventBuffer)>> = HashMap::new();
1040 for ((idx, gk), buf) in &self.event_buffers {
1041 let corr_id = self.correlation_stable_id(*idx);
1042 event_buffers
1043 .entry(corr_id)
1044 .or_default()
1045 .push((gk.clone(), buf.clone()));
1046 }
1047
1048 let mut event_ref_buffers: HashMap<String, Vec<(GroupKey, EventRefBuffer)>> =
1049 HashMap::new();
1050 for ((idx, gk), buf) in &self.event_ref_buffers {
1051 let corr_id = self.correlation_stable_id(*idx);
1052 event_ref_buffers
1053 .entry(corr_id)
1054 .or_default()
1055 .push((gk.clone(), buf.clone()));
1056 }
1057
1058 CorrelationSnapshot {
1059 version: SNAPSHOT_VERSION,
1060 windows,
1061 last_alert,
1062 event_buffers,
1063 event_ref_buffers,
1064 }
1065 }
1066
1067 pub fn import_state(&mut self, snapshot: CorrelationSnapshot) -> bool {
1074 if snapshot.version != SNAPSHOT_VERSION {
1075 return false;
1076 }
1077 let id_to_idx = self.build_id_to_index_map();
1078
1079 for (corr_id, groups) in snapshot.windows {
1080 if let Some(&idx) = id_to_idx.get(&corr_id) {
1081 for (gk, ws) in groups {
1082 self.state.insert((idx, gk), ws);
1083 }
1084 }
1085 }
1086
1087 for (corr_id, groups) in snapshot.last_alert {
1088 if let Some(&idx) = id_to_idx.get(&corr_id) {
1089 for (gk, ts) in groups {
1090 self.last_alert.insert((idx, gk), ts);
1091 }
1092 }
1093 }
1094
1095 for (corr_id, groups) in snapshot.event_buffers {
1096 if let Some(&idx) = id_to_idx.get(&corr_id) {
1097 for (gk, buf) in groups {
1098 self.event_buffers.insert((idx, gk), buf);
1099 }
1100 }
1101 }
1102
1103 for (corr_id, groups) in snapshot.event_ref_buffers {
1104 if let Some(&idx) = id_to_idx.get(&corr_id) {
1105 for (gk, buf) in groups {
1106 self.event_ref_buffers.insert((idx, gk), buf);
1107 }
1108 }
1109 }
1110
1111 true
1112 }
1113
1114 fn correlation_stable_id(&self, idx: usize) -> String {
1116 let corr = &self.correlations[idx];
1117 corr.id
1118 .clone()
1119 .or_else(|| corr.name.clone())
1120 .unwrap_or_else(|| corr.title.clone())
1121 }
1122
1123 fn build_id_to_index_map(&self) -> HashMap<String, usize> {
1125 self.correlations
1126 .iter()
1127 .enumerate()
1128 .map(|(idx, _)| (self.correlation_stable_id(idx), idx))
1129 .collect()
1130 }
1131}
1132
1133impl Default for CorrelationEngine {
1134 fn default() -> Self {
1135 Self::new(CorrelationConfig::default())
1136 }
1137}
1138
1139fn extract_event_ts(event: &impl Event, timestamp_fields: &[String]) -> Option<i64> {
1148 for field_name in timestamp_fields {
1149 if let Some(val) = event.get_field(field_name)
1150 && let Some(ts) = parse_timestamp_value(&val)
1151 {
1152 return Some(ts);
1153 }
1154 }
1155 None
1156}
1157
1158fn parse_timestamp_value(val: &EventValue) -> Option<i64> {
1160 match val {
1161 EventValue::Int(i) => Some(normalize_epoch(*i)),
1162 EventValue::Float(f) => Some(normalize_epoch(*f as i64)),
1163 EventValue::Str(s) => parse_timestamp_string(s),
1164 _ => None,
1165 }
1166}
1167
/// Normalizes a numeric epoch timestamp to whole seconds.
///
/// Values above `1_000_000_000_000` (about year 33658 if read as seconds) are
/// assumed to be in a finer unit. They are divided by 1000 repeatedly until
/// they fall at or below that threshold, so epoch milliseconds, microseconds
/// and nanoseconds all map to seconds; a single division would leave µs/ns
/// values far in the future. Smaller values, including zero and negatives, are
/// returned unchanged.
fn normalize_epoch(v: i64) -> i64 {
    const MAX_PLAUSIBLE_SECONDS: i64 = 1_000_000_000_000;
    let mut secs = v;
    while secs > MAX_PLAUSIBLE_SECONDS {
        secs /= 1000;
    }
    secs
}
1173
1174fn parse_timestamp_string(s: &str) -> Option<i64> {
1176 if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
1178 return Some(dt.timestamp());
1179 }
1180
1181 if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S") {
1184 return Some(Utc.from_utc_datetime(&naive).timestamp());
1185 }
1186 if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") {
1187 return Some(Utc.from_utc_datetime(&naive).timestamp());
1188 }
1189
1190 if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S%.f") {
1192 return Some(Utc.from_utc_datetime(&naive).timestamp());
1193 }
1194 if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f") {
1195 return Some(Utc.from_utc_datetime(&naive).timestamp());
1196 }
1197
1198 None
1199}
1200
1201fn value_to_string_for_count(v: &EventValue) -> Option<String> {
1203 match v {
1204 EventValue::Str(s) => Some(s.to_string()),
1205 EventValue::Int(n) => Some(n.to_string()),
1206 EventValue::Float(f) => Some(f.to_string()),
1207 EventValue::Bool(b) => Some(b.to_string()),
1208 EventValue::Null => Some("null".to_string()),
1209 _ => None,
1210 }
1211}
1212
1213fn value_to_f64_ev(v: &EventValue) -> Option<f64> {
1215 v.as_f64()
1216}