Skip to main content

rsigma_eval/correlation_engine/
mod.rs

1//! Stateful correlation engine with time-windowed aggregation.
2//!
3//! `CorrelationEngine` wraps the stateless `Engine` and adds support for
4//! Sigma correlation rules: `event_count`, `value_count`, `temporal`,
5//! `temporal_ordered`, `value_sum`, `value_avg`, `value_percentile`,
6//! and `value_median`.
7//!
8//! # Architecture
9//!
10//! 1. Events are first evaluated against detection rules (stateless)
11//! 2. Detection matches update correlation window state (stateful)
12//! 3. When a correlation condition is met, a `CorrelationResult` is emitted
13//! 4. Correlation results can chain into higher-level correlations
14
15#[cfg(test)]
16mod tests;
17mod types;
18
19pub use types::*;
20
21use std::collections::HashMap;
22
23use chrono::{DateTime, TimeZone, Utc};
24
25use rsigma_parser::{CorrelationRule, CorrelationType, SigmaCollection, SigmaRule};
26
27use crate::correlation::{
28    CompiledCorrelation, EventBuffer, EventRefBuffer, GroupKey, WindowState, compile_correlation,
29};
30use crate::engine::Engine;
31use crate::error::{EvalError, Result};
32use crate::event::{Event, EventValue};
33use crate::pipeline::{Pipeline, apply_pipelines, apply_pipelines_to_correlation};
34use crate::result::{CorrelationBody, EvaluationResult, ResultBody, RuleHeader};
35
36// =============================================================================
37// Correlation Engine
38// =============================================================================
39
40/// Current snapshot schema version. Bump when the serialized format changes.
41const SNAPSHOT_VERSION: u32 = 1;
42
43/// Stateful correlation engine.
44///
45/// Wraps the stateless `Engine` for detection rules and adds time-windowed
46/// correlation on top. Supports all 7 Sigma correlation types and chaining.
47pub struct CorrelationEngine {
48    /// Inner stateless detection engine.
49    engine: Engine,
50    /// Compiled correlation rules.
51    correlations: Vec<CompiledCorrelation>,
52    /// Maps rule ID/name -> indices into `correlations` that reference it.
53    /// This allows quick lookup: "which correlations care about rule X?"
54    rule_index: HashMap<String, Vec<usize>>,
55    /// Maps detection rule index -> (rule_id, rule_name) for reverse lookup.
56    /// Used to find which correlations a detection match triggers.
57    rule_ids: Vec<(Option<String>, Option<String>)>,
58    /// Per-(correlation_index, group_key) window state.
59    state: HashMap<(usize, GroupKey), WindowState>,
60    /// Last alert timestamp per (correlation_index, group_key) for suppression.
61    last_alert: HashMap<(usize, GroupKey), i64>,
62    /// Per-(correlation_index, group_key) compressed event buffer (`Full` mode).
63    event_buffers: HashMap<(usize, GroupKey), EventBuffer>,
64    /// Per-(correlation_index, group_key) event reference buffer (`Refs` mode).
65    event_ref_buffers: HashMap<(usize, GroupKey), EventRefBuffer>,
66    /// Set of detection rule IDs/names that are "correlation-only"
67    /// (referenced by correlations where `generate == false`).
68    /// Used to filter detection output when `config.emit_detections == false`.
69    correlation_only_rules: std::collections::HashSet<String>,
70    /// Configuration.
71    config: CorrelationConfig,
72    /// Processing pipelines applied to rules during add_rule.
73    pipelines: Vec<Pipeline>,
74}
75
76impl CorrelationEngine {
77    /// Create a new correlation engine with the given configuration.
78    pub fn new(config: CorrelationConfig) -> Self {
79        CorrelationEngine {
80            engine: Engine::new(),
81            correlations: Vec::new(),
82            rule_index: HashMap::new(),
83            rule_ids: Vec::new(),
84            state: HashMap::new(),
85            last_alert: HashMap::new(),
86            event_buffers: HashMap::new(),
87            event_ref_buffers: HashMap::new(),
88            correlation_only_rules: std::collections::HashSet::new(),
89            config,
90            pipelines: Vec::new(),
91        }
92    }
93
94    /// Add a pipeline to the engine.
95    ///
96    /// Pipelines are applied to rules during `add_rule` / `add_collection`.
97    pub fn add_pipeline(&mut self, pipeline: Pipeline) {
98        self.pipelines.push(pipeline);
99        self.pipelines.sort_by_key(|p| p.priority);
100    }
101
102    /// Set global `include_event` on the inner detection engine.
103    pub fn set_include_event(&mut self, include: bool) {
104        self.engine.set_include_event(include);
105    }
106
107    /// Forward to [`crate::Engine::set_bloom_prefilter`] on the inner
108    /// detection engine. Off by default; the optimization helps only on
109    /// substring-heavy rule sets paired with mostly-non-matching events.
110    pub fn set_bloom_prefilter(&mut self, enabled: bool) {
111        self.engine.set_bloom_prefilter(enabled);
112    }
113
114    /// Forward to [`crate::Engine::set_bloom_max_bytes`] on the inner
115    /// detection engine.
116    pub fn set_bloom_max_bytes(&mut self, max_bytes: usize) {
117        self.engine.set_bloom_max_bytes(max_bytes);
118    }
119
120    /// Forward to [`crate::Engine::set_cross_rule_ac`] on the inner
121    /// detection engine. Off by default. Available behind the
122    /// `daachorse-index` Cargo feature.
123    #[cfg(feature = "daachorse-index")]
124    pub fn set_cross_rule_ac(&mut self, enabled: bool) {
125        self.engine.set_cross_rule_ac(enabled);
126    }
127
128    /// Set the global correlation event mode.
129    ///
130    /// - `None`: no event storage (default)
131    /// - `Full`: compressed event bodies
132    /// - `Refs`: lightweight timestamp + ID references
133    pub fn set_correlation_event_mode(&mut self, mode: CorrelationEventMode) {
134        self.config.correlation_event_mode = mode;
135    }
136
137    /// Set the maximum number of events to store per correlation window group.
138    /// Only meaningful when `correlation_event_mode` is not `None`.
139    pub fn set_max_correlation_events(&mut self, max: usize) {
140        self.config.max_correlation_events = max;
141    }
142
143    /// Add a single detection rule.
144    ///
145    /// If pipelines are set, the rule is cloned and transformed before compilation.
146    /// The inner engine receives the already-transformed rule directly (not through
147    /// its own pipeline, to avoid double transformation).
148    pub fn add_rule(&mut self, rule: &SigmaRule) -> Result<()> {
149        if self.pipelines.is_empty() {
150            self.apply_custom_attributes(&rule.custom_attributes);
151            self.rule_ids.push((rule.id.clone(), rule.name.clone()));
152            self.engine.add_rule(rule)?;
153        } else {
154            let mut transformed = rule.clone();
155            apply_pipelines(&self.pipelines, &mut transformed)?;
156            self.apply_custom_attributes(&transformed.custom_attributes);
157            self.rule_ids
158                .push((transformed.id.clone(), transformed.name.clone()));
159            // Use compile_rule + add_compiled_rule to bypass inner engine's pipelines
160            let compiled = crate::compiler::compile_rule(&transformed)?;
161            self.engine.add_compiled_rule(compiled);
162        }
163        Ok(())
164    }
165
166    /// Read `rsigma.*` custom attributes from a rule and apply them to the
167    /// engine configuration.  This allows pipelines to influence engine
168    /// behaviour via `SetCustomAttribute` transformations — the same pattern
169    /// used by pySigma backends (e.g. pySigma-backend-loki).
170    ///
171    /// Supported attributes:
172    /// - `rsigma.timestamp_field` — prepends a field name to the timestamp
173    ///   extraction priority list so the correlation engine can find the
174    ///   event timestamp in non-standard field names.
175    /// - `rsigma.suppress` — sets the default suppression window (e.g. `5m`).
176    ///   Only applied when the CLI did not already set `--suppress`.
177    /// - `rsigma.action` — sets the default post-fire action (`alert`/`reset`).
178    ///   Only applied when the CLI did not already set `--action`.
179    fn apply_custom_attributes(
180        &mut self,
181        attrs: &std::collections::HashMap<String, yaml_serde::Value>,
182    ) {
183        // rsigma.timestamp_field — prepend to priority list, skip duplicates
184        if let Some(field) = attrs.get("rsigma.timestamp_field").and_then(|v| v.as_str())
185            && !self.config.timestamp_fields.iter().any(|f| f == field)
186        {
187            self.config.timestamp_fields.insert(0, field.to_string());
188        }
189
190        // rsigma.suppress — only when CLI didn't already set one
191        if let Some(val) = attrs.get("rsigma.suppress").and_then(|v| v.as_str())
192            && self.config.suppress.is_none()
193            && let Ok(ts) = rsigma_parser::Timespan::parse(val)
194        {
195            self.config.suppress = Some(ts.seconds);
196        }
197
198        // rsigma.action — only when CLI left it at the default (Alert)
199        if let Some(val) = attrs.get("rsigma.action").and_then(|v| v.as_str())
200            && self.config.action_on_match == CorrelationAction::Alert
201            && let Ok(a) = val.parse::<CorrelationAction>()
202        {
203            self.config.action_on_match = a;
204        }
205    }
206
207    /// Add a single correlation rule.
208    pub fn add_correlation(&mut self, corr: &CorrelationRule) -> Result<()> {
209        let owned;
210        let effective = if self.pipelines.is_empty() {
211            corr
212        } else {
213            owned = {
214                let mut c = corr.clone();
215                apply_pipelines_to_correlation(&self.pipelines, &mut c)?;
216                c
217            };
218            &owned
219        };
220
221        // Apply engine-level custom attributes from the (possibly transformed)
222        // correlation rule (e.g. rsigma.timestamp_field).
223        self.apply_custom_attributes(&effective.custom_attributes);
224
225        let compiled = compile_correlation(effective)?;
226        let idx = self.correlations.len();
227
228        // Index by each referenced rule ID/name
229        for rule_ref in &compiled.rule_refs {
230            self.rule_index
231                .entry(rule_ref.clone())
232                .or_default()
233                .push(idx);
234        }
235
236        // Track correlation-only rules (generate == false is the default)
237        if !compiled.generate {
238            for rule_ref in &compiled.rule_refs {
239                self.correlation_only_rules.insert(rule_ref.clone());
240            }
241        }
242
243        self.correlations.push(compiled);
244        Ok(())
245    }
246
247    /// Add all rules and correlations from a parsed collection.
248    ///
249    /// Detection rules are added first (so they're available for correlation
250    /// references), then correlation rules. Detection rules are compiled
251    /// sequentially and then pushed to the inner engine in a single batch,
252    /// so the inverted index and bloom filter are rebuilt exactly once for
253    /// the whole collection. Without this batching, large rule sets
254    /// (multi-thousand rules) hit an O(N²) rebuild cost on load.
255    pub fn add_collection(&mut self, collection: &SigmaCollection) -> Result<()> {
256        let mut compiled_batch = Vec::with_capacity(collection.rules.len());
257        if self.pipelines.is_empty() {
258            for rule in &collection.rules {
259                self.apply_custom_attributes(&rule.custom_attributes);
260                self.rule_ids.push((rule.id.clone(), rule.name.clone()));
261                compiled_batch.push(crate::compiler::compile_rule(rule)?);
262            }
263        } else {
264            for rule in &collection.rules {
265                let mut transformed = rule.clone();
266                apply_pipelines(&self.pipelines, &mut transformed)?;
267                self.apply_custom_attributes(&transformed.custom_attributes);
268                self.rule_ids
269                    .push((transformed.id.clone(), transformed.name.clone()));
270                // Bypass the inner engine's pipelines (would double-transform)
271                compiled_batch.push(crate::compiler::compile_rule(&transformed)?);
272            }
273        }
274        self.engine.extend_compiled_rules(compiled_batch);
275        // Apply filter rules to the inner engine's detection rules
276        for filter in &collection.filters {
277            self.engine.apply_filter(filter)?;
278        }
279        for corr in &collection.correlations {
280            self.add_correlation(corr)?;
281        }
282        self.validate_rule_refs()?;
283        self.detect_correlation_cycles()?;
284        Ok(())
285    }
286
287    /// Validate that every correlation's `rule_refs` resolve to at least one
288    /// known detection rule (by ID or name) or another correlation (by ID or name).
289    fn validate_rule_refs(&self) -> Result<()> {
290        let mut known: std::collections::HashSet<&str> = std::collections::HashSet::new();
291
292        for (id, name) in &self.rule_ids {
293            if let Some(id) = id {
294                known.insert(id.as_str());
295            }
296            if let Some(name) = name {
297                known.insert(name.as_str());
298            }
299        }
300        for corr in &self.correlations {
301            if let Some(ref id) = corr.id {
302                known.insert(id.as_str());
303            }
304            if let Some(ref name) = corr.name {
305                known.insert(name.as_str());
306            }
307        }
308
309        for corr in &self.correlations {
310            for rule_ref in &corr.rule_refs {
311                if !known.contains(rule_ref.as_str()) {
312                    return Err(EvalError::UnknownRuleRef(rule_ref.clone()));
313                }
314            }
315        }
316        Ok(())
317    }
318
319    /// Detect cycles in the correlation reference graph.
320    ///
321    /// Builds a directed graph where each correlation (identified by its id/name)
322    /// has edges to the correlations it references via `rule_refs`. Uses DFS with
323    /// a "gray/black" coloring scheme to detect back-edges (cycles).
324    ///
325    /// Returns `Err(EvalError::CorrelationCycle)` if a cycle is found.
326    fn detect_correlation_cycles(&self) -> Result<()> {
327        // Build a set of all correlation identifiers (id and/or name)
328        let mut corr_identifiers: HashMap<&str, usize> = HashMap::new();
329        for (idx, corr) in self.correlations.iter().enumerate() {
330            if let Some(ref id) = corr.id {
331                corr_identifiers.insert(id.as_str(), idx);
332            }
333            if let Some(ref name) = corr.name {
334                corr_identifiers.insert(name.as_str(), idx);
335            }
336        }
337
338        // Build adjacency list: corr index → set of corr indices it references
339        let mut adj: Vec<Vec<usize>> = vec![Vec::new(); self.correlations.len()];
340        for (idx, corr) in self.correlations.iter().enumerate() {
341            for rule_ref in &corr.rule_refs {
342                if let Some(&target_idx) = corr_identifiers.get(rule_ref.as_str()) {
343                    adj[idx].push(target_idx);
344                }
345            }
346        }
347
348        // DFS cycle detection with three states: white (unvisited), gray (in stack), black (done)
349        let mut state = vec![0u8; self.correlations.len()]; // 0=white, 1=gray, 2=black
350        let mut path: Vec<usize> = Vec::new();
351
352        for start in 0..self.correlations.len() {
353            if state[start] == 0
354                && let Some(cycle) = Self::dfs_find_cycle(start, &adj, &mut state, &mut path)
355            {
356                let names: Vec<String> = cycle
357                    .iter()
358                    .map(|&i| {
359                        self.correlations[i]
360                            .id
361                            .as_deref()
362                            .or(self.correlations[i].name.as_deref())
363                            .unwrap_or(&self.correlations[i].title)
364                            .to_string()
365                    })
366                    .collect();
367                return Err(crate::error::EvalError::CorrelationCycle(
368                    names.join(" -> "),
369                ));
370            }
371        }
372        Ok(())
373    }
374
375    /// DFS helper that returns the cycle path if a back-edge is found.
376    fn dfs_find_cycle(
377        node: usize,
378        adj: &[Vec<usize>],
379        state: &mut [u8],
380        path: &mut Vec<usize>,
381    ) -> Option<Vec<usize>> {
382        state[node] = 1; // gray
383        path.push(node);
384
385        for &next in &adj[node] {
386            if state[next] == 1 {
387                // Back-edge found — extract cycle from path
388                if let Some(pos) = path.iter().position(|&n| n == next) {
389                    let mut cycle = path[pos..].to_vec();
390                    cycle.push(next); // close the cycle
391                    return Some(cycle);
392                }
393            }
394            if state[next] == 0
395                && let Some(cycle) = Self::dfs_find_cycle(next, adj, state, path)
396            {
397                return Some(cycle);
398            }
399        }
400
401        path.pop();
402        state[node] = 2; // black
403        None
404    }
405
406    /// Process an event, extracting the timestamp from configured event fields.
407    ///
408    /// When no timestamp field is found, the `timestamp_fallback` policy applies:
409    /// - `WallClock`: use `Utc::now()` (good for real-time streaming)
410    /// - `Skip`: return detections only, skip correlation state updates
411    pub fn process_event(&mut self, event: &impl Event) -> ProcessResult {
412        let all_detections = self.engine.evaluate(event);
413
414        let ts = match self.extract_event_timestamp(event) {
415            Some(ts) => ts,
416            None => match self.config.timestamp_fallback {
417                TimestampFallback::WallClock => Utc::now().timestamp(),
418                TimestampFallback::Skip => {
419                    // Still run detection (stateless), but skip correlation
420                    return self.filter_detections(all_detections);
421                }
422            },
423        };
424        self.process_with_detections(event, all_detections, ts)
425    }
426
427    /// Process an event with an explicit Unix epoch timestamp (seconds).
428    ///
429    /// The timestamp is clamped to `[0, i64::MAX / 2]` to prevent overflow
430    /// when adding timespan durations internally.
431    pub fn process_event_at(&mut self, event: &impl Event, timestamp_secs: i64) -> ProcessResult {
432        let all_detections = self.engine.evaluate(event);
433        self.process_with_detections(event, all_detections, timestamp_secs)
434    }
435
436    /// Process an event with pre-computed detection results.
437    ///
438    /// Enables external parallelism: callers can run detection (via
439    /// [`evaluate`](Self::evaluate)) in parallel, then feed results here
440    /// sequentially for stateful correlation.
441    pub fn process_with_detections(
442        &mut self,
443        event: &impl Event,
444        all_detections: Vec<EvaluationResult>,
445        timestamp_secs: i64,
446    ) -> ProcessResult {
447        let timestamp_secs = timestamp_secs.clamp(0, i64::MAX / 2);
448
449        // Memory management — evict before adding new state to enforce limit
450        if self.state.len() >= self.config.max_state_entries {
451            self.evict_all(timestamp_secs);
452        }
453
454        // Feed detection matches into correlations
455        let mut correlations: Vec<EvaluationResult> = Vec::new();
456        self.feed_detections(event, &all_detections, timestamp_secs, &mut correlations);
457
458        // Chain — correlation results may trigger higher-level correlations
459        self.chain_correlations(&correlations, timestamp_secs);
460
461        // Filter detections by generate flag, then append the correlations.
462        let mut out = self.filter_detections(all_detections);
463        out.extend(correlations);
464        out
465    }
466
467    /// Run stateless detection only (no correlation), delegating to the inner engine.
468    ///
469    /// Returns one [`EvaluationResult`] per matched detection. Takes `&self`
470    /// so it can be called concurrently from multiple threads (e.g. via
471    /// `rayon::par_iter`) while the mutable correlation phase runs
472    /// sequentially afterwards.
473    pub fn evaluate(&self, event: &impl Event) -> Vec<EvaluationResult> {
474        self.engine.evaluate(event)
475    }
476
477    /// Process a batch of events: parallel detection, then sequential correlation.
478    ///
479    /// When the `parallel` feature is enabled, the stateless detection phase runs
480    /// concurrently via rayon. Timestamp extraction also runs in the parallel
481    /// phase (it borrows `&self.config` immutably). After `collect()` releases the
482    /// immutable borrows, each event's pre-computed detections are fed into the
483    /// stateful correlation engine sequentially.
484    pub fn process_batch<E: Event + Sync>(&mut self, events: &[&E]) -> Vec<ProcessResult> {
485        // Borrow split: take immutable refs to fields needed for the parallel phase.
486        // These are released by collect() before the sequential &mut self phase.
487        let engine = &self.engine;
488        let ts_fields = &self.config.timestamp_fields;
489
490        let batch_results: Vec<(Vec<EvaluationResult>, Option<i64>)> = {
491            #[cfg(feature = "parallel")]
492            {
493                use rayon::prelude::*;
494                events
495                    .par_iter()
496                    .map(|e| {
497                        let detections = engine.evaluate(e);
498                        let ts = extract_event_ts(e, ts_fields);
499                        (detections, ts)
500                    })
501                    .collect()
502            }
503            #[cfg(not(feature = "parallel"))]
504            {
505                events
506                    .iter()
507                    .map(|e| {
508                        let detections = engine.evaluate(e);
509                        let ts = extract_event_ts(e, ts_fields);
510                        (detections, ts)
511                    })
512                    .collect()
513            }
514        };
515
516        // Sequential correlation phase
517        let mut results = Vec::with_capacity(events.len());
518        for ((detections, ts_opt), event) in batch_results.into_iter().zip(events) {
519            match ts_opt {
520                Some(ts) => {
521                    results.push(self.process_with_detections(event, detections, ts));
522                }
523                None => match self.config.timestamp_fallback {
524                    TimestampFallback::WallClock => {
525                        let ts = Utc::now().timestamp();
526                        results.push(self.process_with_detections(event, detections, ts));
527                    }
528                    TimestampFallback::Skip => {
529                        // Still return detection results, but skip correlation
530                        results.push(self.filter_detections(detections));
531                    }
532                },
533            }
534        }
535        results
536    }
537
538    /// Filter detections by the `generate` flag / `emit_detections` config.
539    ///
540    /// If `emit_detections` is false and some rules are correlation-only,
541    /// their detection output is suppressed.
542    fn filter_detections(&self, all_detections: Vec<EvaluationResult>) -> Vec<EvaluationResult> {
543        if !self.config.emit_detections && !self.correlation_only_rules.is_empty() {
544            all_detections
545                .into_iter()
546                .filter(|m| {
547                    let id_match = m
548                        .header
549                        .rule_id
550                        .as_ref()
551                        .is_some_and(|id| self.correlation_only_rules.contains(id));
552                    !id_match
553                })
554                .collect()
555        } else {
556            all_detections
557        }
558    }
559
560    /// Feed detection matches into correlation window states.
561    fn feed_detections(
562        &mut self,
563        event: &impl Event,
564        detections: &[EvaluationResult],
565        ts: i64,
566        out: &mut Vec<EvaluationResult>,
567    ) {
568        // Collect all (corr_idx, rule_id, rule_name) tuples upfront to avoid
569        // borrow conflicts between self.rule_ids and self.update_correlation.
570        let mut work: Vec<(usize, Option<String>, Option<String>)> = Vec::new();
571
572        for det in detections {
573            // Use the MatchResult's rule_id to find the original rule's ID/name.
574            // We also look up by rule_id in our rule_ids table for the name.
575            let (rule_id, rule_name) = self.find_rule_identity(det);
576
577            // Collect correlation indices that reference this rule
578            let mut corr_indices = Vec::new();
579            if let Some(ref id) = rule_id
580                && let Some(indices) = self.rule_index.get(id)
581            {
582                corr_indices.extend(indices);
583            }
584            if let Some(ref name) = rule_name
585                && let Some(indices) = self.rule_index.get(name)
586            {
587                corr_indices.extend(indices);
588            }
589
590            corr_indices.sort_unstable();
591            corr_indices.dedup();
592
593            for &corr_idx in &corr_indices {
594                work.push((corr_idx, rule_id.clone(), rule_name.clone()));
595            }
596        }
597
598        for (corr_idx, rule_id, rule_name) in work {
599            self.update_correlation(corr_idx, event, ts, &rule_id, &rule_name, out);
600        }
601    }
602
603    /// Find the (id, name) for a detection match by searching our rule_ids table.
604    fn find_rule_identity(&self, det: &EvaluationResult) -> (Option<String>, Option<String>) {
605        // First, try to find by matching rule_id in our table
606        if let Some(ref match_id) = det.header.rule_id {
607            for (id, name) in &self.rule_ids {
608                if id.as_deref() == Some(match_id.as_str()) {
609                    return (id.clone(), name.clone());
610                }
611            }
612        }
613        // Fall back to using just the EvaluationResult's rule_id
614        (det.header.rule_id.clone(), None)
615    }
616
617    /// Resolve the event mode for a given correlation.
618    fn resolve_event_mode(&self, corr_idx: usize) -> CorrelationEventMode {
619        let corr = &self.correlations[corr_idx];
620        corr.event_mode
621            .unwrap_or(self.config.correlation_event_mode)
622    }
623
624    /// Resolve the max events cap for a given correlation.
625    fn resolve_max_events(&self, corr_idx: usize) -> usize {
626        let corr = &self.correlations[corr_idx];
627        corr.max_events
628            .unwrap_or(self.config.max_correlation_events)
629    }
630
631    /// Update a single correlation's state and check its condition.
632    fn update_correlation(
633        &mut self,
634        corr_idx: usize,
635        event: &impl Event,
636        ts: i64,
637        rule_id: &Option<String>,
638        rule_name: &Option<String>,
639        out: &mut Vec<EvaluationResult>,
640    ) {
641        // Borrow the correlation by reference — no cloning needed.  Rust allows
642        // simultaneous &self.correlations and &mut self.state / &mut self.last_alert
643        // because they are disjoint struct fields.
644        let corr = &self.correlations[corr_idx];
645        let corr_type = corr.correlation_type;
646        let timespan = corr.timespan_secs;
647        let level = corr.level;
648        let suppress_secs = corr.suppress_secs.or(self.config.suppress);
649        let action = corr.action.unwrap_or(self.config.action_on_match);
650        let event_mode = self.resolve_event_mode(corr_idx);
651        let max_events = self.resolve_max_events(corr_idx);
652
653        // Determine the rule_ref strings for alias resolution and temporal tracking.
654        let mut ref_strs: Vec<&str> = Vec::new();
655        if let Some(id) = rule_id.as_deref() {
656            ref_strs.push(id);
657        }
658        if let Some(name) = rule_name.as_deref() {
659            ref_strs.push(name);
660        }
661        let rule_ref = rule_id.as_deref().or(rule_name.as_deref()).unwrap_or("");
662
663        // Extract group key
664        let group_key = GroupKey::extract(event, &corr.group_by, &ref_strs);
665
666        // Get or create window state
667        let state_key = (corr_idx, group_key.clone());
668        let state = self
669            .state
670            .entry(state_key.clone())
671            .or_insert_with(|| WindowState::new_for(corr_type));
672
673        // Evict expired entries
674        let cutoff = ts - timespan as i64;
675        state.evict(cutoff);
676
677        // Push the new event into the state
678        match corr_type {
679            CorrelationType::EventCount => {
680                state.push_event_count(ts);
681            }
682            CorrelationType::ValueCount => {
683                if let Some(ref fields) = corr.condition.field
684                    && let Some(key) = composite_value_count_key(event, fields)
685                {
686                    state.push_value_count(ts, key);
687                }
688            }
689            CorrelationType::Temporal | CorrelationType::TemporalOrdered => {
690                state.push_temporal(ts, rule_ref);
691            }
692            CorrelationType::ValueSum
693            | CorrelationType::ValueAvg
694            | CorrelationType::ValuePercentile
695            | CorrelationType::ValueMedian => {
696                if let Some(ref fields) = corr.condition.field
697                    && let Some(field_name) = fields.first()
698                    && let Some(val) = event.get_field(field_name)
699                    && let Some(n) = value_to_f64_ev(&val)
700                {
701                    state.push_numeric(ts, n);
702                }
703            }
704        }
705
706        // Push event into buffer based on event mode
707        match event_mode {
708            CorrelationEventMode::Full => {
709                let buf = self
710                    .event_buffers
711                    .entry(state_key.clone())
712                    .or_insert_with(|| EventBuffer::new(max_events));
713                buf.evict(cutoff);
714                let json = event.to_json();
715                buf.push(ts, &json);
716            }
717            CorrelationEventMode::Refs => {
718                let buf = self
719                    .event_ref_buffers
720                    .entry(state_key.clone())
721                    .or_insert_with(|| EventRefBuffer::new(max_events));
722                buf.evict(cutoff);
723                let json = event.to_json();
724                buf.push(ts, &json);
725            }
726            CorrelationEventMode::None => {}
727        }
728
729        // Check condition — after this, `state` is no longer used (NLL drops the borrow).
730        let fired = state.check_condition(
731            &corr.condition,
732            corr_type,
733            &corr.rule_refs,
734            corr.extended_expr.as_ref(),
735        );
736
737        if let Some(agg_value) = fired {
738            let alert_key = (corr_idx, group_key.clone());
739
740            // Suppression check: skip if we've already alerted within the suppress window
741            let suppressed = if let Some(suppress) = suppress_secs {
742                if let Some(&last_ts) = self.last_alert.get(&alert_key) {
743                    (ts - last_ts) < suppress as i64
744                } else {
745                    false
746                }
747            } else {
748                false
749            };
750
751            if !suppressed {
752                // Retrieve stored events / refs based on mode
753                let (events, event_refs) = match event_mode {
754                    CorrelationEventMode::Full => {
755                        let stored = self
756                            .event_buffers
757                            .get(&alert_key)
758                            .map(|buf| buf.decompress_all())
759                            .unwrap_or_default();
760                        (Some(stored), None)
761                    }
762                    CorrelationEventMode::Refs => {
763                        let stored = self
764                            .event_ref_buffers
765                            .get(&alert_key)
766                            .map(|buf| buf.refs())
767                            .unwrap_or_default();
768                        (None, Some(stored))
769                    }
770                    CorrelationEventMode::None => (None, None),
771                };
772
773                // Only clone title/id/tags when we actually produce output
774                let corr = &self.correlations[corr_idx];
775                let result = EvaluationResult {
776                    header: RuleHeader {
777                        rule_title: corr.title.clone(),
778                        rule_id: corr.id.clone(),
779                        level,
780                        tags: corr.tags.clone(),
781                        custom_attributes: corr.custom_attributes.clone(),
782                        enrichments: None,
783                    },
784                    body: ResultBody::Correlation(CorrelationBody {
785                        correlation_type: corr_type,
786                        group_key: group_key.to_pairs(&corr.group_by),
787                        aggregated_value: agg_value,
788                        timespan_secs: timespan,
789                        events,
790                        event_refs,
791                    }),
792                };
793                out.push(result);
794
795                // Record alert time for suppression
796                self.last_alert.insert(alert_key.clone(), ts);
797
798                // Action on match
799                if action == CorrelationAction::Reset {
800                    if let Some(state) = self.state.get_mut(&alert_key) {
801                        state.clear();
802                    }
803                    if let Some(buf) = self.event_buffers.get_mut(&alert_key) {
804                        buf.clear();
805                    }
806                    if let Some(buf) = self.event_ref_buffers.get_mut(&alert_key) {
807                        buf.clear();
808                    }
809                }
810            }
811        }
812    }
813
814    /// Propagate correlation results to higher-level correlations (chaining).
815    ///
816    /// When a correlation fires, any correlation that references it (by ID or name)
817    /// is updated. Limits chain depth to 10 to prevent infinite loops.
818    fn chain_correlations(&mut self, fired: &[EvaluationResult], ts: i64) {
819        const MAX_CHAIN_DEPTH: usize = 10;
820        let mut pending: Vec<EvaluationResult> = fired.to_vec();
821        let mut depth = 0;
822
823        while !pending.is_empty() && depth < MAX_CHAIN_DEPTH {
824            depth += 1;
825
826            // Collect work items: (corr_idx, group_key_pairs, fired_ref)
827            #[allow(clippy::type_complexity)]
828            let mut work: Vec<(usize, Vec<(String, String)>, String)> = Vec::new();
829            for result in &pending {
830                // Only correlation results chain. Detections never reach here.
831                let Some(body) = result.as_correlation() else {
832                    continue;
833                };
834                if let Some(ref id) = result.header.rule_id
835                    && let Some(indices) = self.rule_index.get(id)
836                {
837                    let fired_ref = result
838                        .header
839                        .rule_id
840                        .as_deref()
841                        .unwrap_or(&result.header.rule_title)
842                        .to_string();
843                    for &corr_idx in indices {
844                        work.push((corr_idx, body.group_key.clone(), fired_ref.clone()));
845                    }
846                }
847            }
848
849            let mut next_pending = Vec::new();
850            for (corr_idx, group_key_pairs, fired_ref) in work {
851                let corr = &self.correlations[corr_idx];
852                let corr_type = corr.correlation_type;
853                let timespan = corr.timespan_secs;
854                let level = corr.level;
855
856                let group_key = GroupKey::from_pairs(&group_key_pairs, &corr.group_by);
857                let state_key = (corr_idx, group_key.clone());
858                let state = self
859                    .state
860                    .entry(state_key)
861                    .or_insert_with(|| WindowState::new_for(corr_type));
862
863                let cutoff = ts - timespan as i64;
864                state.evict(cutoff);
865
866                match corr_type {
867                    CorrelationType::EventCount => {
868                        state.push_event_count(ts);
869                    }
870                    CorrelationType::Temporal | CorrelationType::TemporalOrdered => {
871                        state.push_temporal(ts, &fired_ref);
872                    }
873                    _ => {
874                        state.push_event_count(ts);
875                    }
876                }
877
878                let fired = state.check_condition(
879                    &corr.condition,
880                    corr_type,
881                    &corr.rule_refs,
882                    corr.extended_expr.as_ref(),
883                );
884
885                if let Some(agg_value) = fired {
886                    let corr = &self.correlations[corr_idx];
887                    next_pending.push(EvaluationResult {
888                        header: RuleHeader {
889                            rule_title: corr.title.clone(),
890                            rule_id: corr.id.clone(),
891                            level,
892                            tags: corr.tags.clone(),
893                            custom_attributes: corr.custom_attributes.clone(),
894                            enrichments: None,
895                        },
896                        body: ResultBody::Correlation(CorrelationBody {
897                            correlation_type: corr_type,
898                            group_key: group_key.to_pairs(&corr.group_by),
899                            aggregated_value: agg_value,
900                            timespan_secs: timespan,
901                            // Chained correlations don't include events
902                            // (they aggregate over correlation results, not
903                            // raw events)
904                            events: None,
905                            event_refs: None,
906                        }),
907                    });
908                }
909            }
910
911            pending = next_pending;
912        }
913
914        if !pending.is_empty() {
915            log::warn!(
916                "Correlation chain depth limit reached ({MAX_CHAIN_DEPTH}); \
917                 {} pending result(s) were not propagated further. \
918                 This may indicate a cycle in correlation references.",
919                pending.len()
920            );
921        }
922    }
923
924    // =========================================================================
925    // Timestamp extraction
926    // =========================================================================
927
928    /// Extract a Unix epoch timestamp (seconds) from an event.
929    ///
930    /// Tries each configured timestamp field in order. Supports:
931    /// - Numeric values (epoch seconds, or epoch millis if > 1e12)
932    /// - ISO 8601 strings (e.g., "2024-07-10T12:30:00Z")
933    ///
934    /// Returns `None` if no field yields a valid timestamp.
935    fn extract_event_timestamp(&self, event: &impl Event) -> Option<i64> {
936        for field_name in &self.config.timestamp_fields {
937            if let Some(val) = event.get_field(field_name)
938                && let Some(ts) = parse_timestamp_value(&val)
939            {
940                return Some(ts);
941            }
942        }
943        None
944    }
945
946    // =========================================================================
947    // State management
948    // =========================================================================
949
950    /// Manually evict all expired state entries.
951    pub fn evict_expired(&mut self, now_secs: i64) {
952        self.evict_all(now_secs);
953    }
954
955    /// Evict expired entries and remove empty states.
956    fn evict_all(&mut self, now_secs: i64) {
957        // Phase 1: Time-based eviction — remove entries outside their correlation window
958        let timespans: Vec<u64> = self.correlations.iter().map(|c| c.timespan_secs).collect();
959
960        self.state.retain(|&(corr_idx, _), state| {
961            if corr_idx < timespans.len() {
962                let cutoff = now_secs - timespans[corr_idx] as i64;
963                state.evict(cutoff);
964            }
965            !state.is_empty()
966        });
967
968        // Evict event buffers in sync with window state
969        self.event_buffers.retain(|&(corr_idx, _), buf| {
970            if corr_idx < timespans.len() {
971                let cutoff = now_secs - timespans[corr_idx] as i64;
972                buf.evict(cutoff);
973            }
974            !buf.is_empty()
975        });
976        self.event_ref_buffers.retain(|&(corr_idx, _), buf| {
977            if corr_idx < timespans.len() {
978                let cutoff = now_secs - timespans[corr_idx] as i64;
979                buf.evict(cutoff);
980            }
981            !buf.is_empty()
982        });
983
984        // Phase 2: Hard cap — if still over limit after time-based eviction (e.g.
985        // high-cardinality traffic with long windows), drop the stalest entries
986        // until we're at 90% capacity to avoid evicting on every single event.
987        if self.state.len() >= self.config.max_state_entries {
988            let target = self.config.max_state_entries * 9 / 10;
989            let excess = self.state.len() - target;
990
991            log::warn!(
992                "Correlation state hard cap reached ({} entries, max {}); \
993                 evicting {} stalest entries to {} (90% capacity). \
994                 This indicates high-cardinality traffic; consider raising \
995                 max_state_entries or shortening correlation windows.",
996                self.state.len(),
997                self.config.max_state_entries,
998                excess,
999                target,
1000            );
1001
1002            // Collect keys with their latest timestamp, sort by oldest first
1003            let mut by_staleness: Vec<_> = self
1004                .state
1005                .iter()
1006                .map(|(k, v)| (k.clone(), v.latest_timestamp().unwrap_or(i64::MIN)))
1007                .collect();
1008            by_staleness.sort_unstable_by_key(|&(_, ts)| ts);
1009
1010            // Drop the oldest entries (and their associated event buffers)
1011            for (key, _) in by_staleness.into_iter().take(excess) {
1012                self.state.remove(&key);
1013                self.last_alert.remove(&key);
1014                self.event_buffers.remove(&key);
1015                self.event_ref_buffers.remove(&key);
1016            }
1017        }
1018
1019        // Phase 3: Evict stale last_alert entries — remove if the suppress window
1020        // has passed or if the corresponding window state no longer exists.
1021        self.last_alert.retain(|key, &mut alert_ts| {
1022            let suppress = if key.0 < self.correlations.len() {
1023                self.correlations[key.0]
1024                    .suppress_secs
1025                    .or(self.config.suppress)
1026                    .unwrap_or(0)
1027            } else {
1028                0
1029            };
1030            (now_secs - alert_ts) < suppress as i64
1031        });
1032    }
1033
1034    /// Number of active state entries (for monitoring).
1035    pub fn state_count(&self) -> usize {
1036        self.state.len()
1037    }
1038
1039    /// Number of detection rules loaded.
1040    pub fn detection_rule_count(&self) -> usize {
1041        self.engine.rule_count()
1042    }
1043
1044    /// Number of correlation rules loaded.
1045    pub fn correlation_rule_count(&self) -> usize {
1046        self.correlations.len()
1047    }
1048
1049    /// Number of active event buffers (for monitoring).
1050    pub fn event_buffer_count(&self) -> usize {
1051        self.event_buffers.len()
1052    }
1053
1054    /// Total compressed bytes across all event buffers (for monitoring).
1055    pub fn event_buffer_bytes(&self) -> usize {
1056        self.event_buffers
1057            .values()
1058            .map(|b| b.compressed_bytes())
1059            .sum()
1060    }
1061
1062    /// Number of active event ref buffers — `Refs` mode (for monitoring).
1063    pub fn event_ref_buffer_count(&self) -> usize {
1064        self.event_ref_buffers.len()
1065    }
1066
1067    /// Access the inner stateless engine.
1068    pub fn engine(&self) -> &Engine {
1069        &self.engine
1070    }
1071
1072    /// Export all mutable correlation state as a serializable snapshot.
1073    ///
1074    /// The snapshot uses stable correlation identifiers (id > name > title)
1075    /// instead of internal indices, so it survives rule reloads as long as
1076    /// the correlation rules keep the same identifiers.
1077    pub fn export_state(&self) -> CorrelationSnapshot {
1078        let mut windows: HashMap<String, Vec<(GroupKey, WindowState)>> = HashMap::new();
1079        for ((idx, gk), ws) in &self.state {
1080            let corr_id = self.correlation_stable_id(*idx);
1081            windows
1082                .entry(corr_id)
1083                .or_default()
1084                .push((gk.clone(), ws.clone()));
1085        }
1086
1087        let mut last_alert: HashMap<String, Vec<(GroupKey, i64)>> = HashMap::new();
1088        for ((idx, gk), ts) in &self.last_alert {
1089            let corr_id = self.correlation_stable_id(*idx);
1090            last_alert
1091                .entry(corr_id)
1092                .or_default()
1093                .push((gk.clone(), *ts));
1094        }
1095
1096        let mut event_buffers: HashMap<String, Vec<(GroupKey, EventBuffer)>> = HashMap::new();
1097        for ((idx, gk), buf) in &self.event_buffers {
1098            let corr_id = self.correlation_stable_id(*idx);
1099            event_buffers
1100                .entry(corr_id)
1101                .or_default()
1102                .push((gk.clone(), buf.clone()));
1103        }
1104
1105        let mut event_ref_buffers: HashMap<String, Vec<(GroupKey, EventRefBuffer)>> =
1106            HashMap::new();
1107        for ((idx, gk), buf) in &self.event_ref_buffers {
1108            let corr_id = self.correlation_stable_id(*idx);
1109            event_ref_buffers
1110                .entry(corr_id)
1111                .or_default()
1112                .push((gk.clone(), buf.clone()));
1113        }
1114
1115        CorrelationSnapshot {
1116            version: SNAPSHOT_VERSION,
1117            windows,
1118            last_alert,
1119            event_buffers,
1120            event_ref_buffers,
1121        }
1122    }
1123
1124    /// Import previously exported state, mapping stable identifiers back to
1125    /// current correlation indices. Entries whose identifiers no longer match
1126    /// any loaded correlation are silently dropped.
1127    ///
1128    /// Returns `false` (and imports nothing) if the snapshot version is
1129    /// incompatible with the current schema.
1130    pub fn import_state(&mut self, snapshot: CorrelationSnapshot) -> bool {
1131        if snapshot.version != SNAPSHOT_VERSION {
1132            return false;
1133        }
1134        let id_to_idx = self.build_id_to_index_map();
1135
1136        for (corr_id, groups) in snapshot.windows {
1137            if let Some(&idx) = id_to_idx.get(&corr_id) {
1138                for (gk, ws) in groups {
1139                    self.state.insert((idx, gk), ws);
1140                }
1141            }
1142        }
1143
1144        for (corr_id, groups) in snapshot.last_alert {
1145            if let Some(&idx) = id_to_idx.get(&corr_id) {
1146                for (gk, ts) in groups {
1147                    self.last_alert.insert((idx, gk), ts);
1148                }
1149            }
1150        }
1151
1152        for (corr_id, groups) in snapshot.event_buffers {
1153            if let Some(&idx) = id_to_idx.get(&corr_id) {
1154                for (gk, buf) in groups {
1155                    self.event_buffers.insert((idx, gk), buf);
1156                }
1157            }
1158        }
1159
1160        for (corr_id, groups) in snapshot.event_ref_buffers {
1161            if let Some(&idx) = id_to_idx.get(&corr_id) {
1162                for (gk, buf) in groups {
1163                    self.event_ref_buffers.insert((idx, gk), buf);
1164                }
1165            }
1166        }
1167
1168        true
1169    }
1170
1171    /// Stable identifier for a correlation rule: prefers id, then name, then title.
1172    fn correlation_stable_id(&self, idx: usize) -> String {
1173        let corr = &self.correlations[idx];
1174        corr.id
1175            .clone()
1176            .or_else(|| corr.name.clone())
1177            .unwrap_or_else(|| corr.title.clone())
1178    }
1179
1180    /// Build a reverse map from stable id → current correlation index.
1181    fn build_id_to_index_map(&self) -> HashMap<String, usize> {
1182        self.correlations
1183            .iter()
1184            .enumerate()
1185            .map(|(idx, _)| (self.correlation_stable_id(idx), idx))
1186            .collect()
1187    }
1188}
1189
1190impl Default for CorrelationEngine {
1191    fn default() -> Self {
1192        Self::new(CorrelationConfig::default())
1193    }
1194}
1195
1196// =============================================================================
1197// Timestamp parsing helpers
1198// =============================================================================
1199
1200/// Extract a timestamp from an event using the given field names.
1201///
1202/// Standalone version of `CorrelationEngine::extract_event_timestamp` for use
1203/// in contexts where borrowing `&self` is not possible (e.g. rayon closures).
1204fn extract_event_ts(event: &impl Event, timestamp_fields: &[String]) -> Option<i64> {
1205    for field_name in timestamp_fields {
1206        if let Some(val) = event.get_field(field_name)
1207            && let Some(ts) = parse_timestamp_value(&val)
1208        {
1209            return Some(ts);
1210        }
1211    }
1212    None
1213}
1214
1215/// Parse an [`EventValue`] as a Unix epoch timestamp in seconds.
1216fn parse_timestamp_value(val: &EventValue) -> Option<i64> {
1217    match val {
1218        EventValue::Int(i) => Some(normalize_epoch(*i)),
1219        EventValue::Float(f) => Some(normalize_epoch(*f as i64)),
1220        EventValue::Str(s) => parse_timestamp_string(s),
1221        _ => None,
1222    }
1223}
1224
/// Normalize an epoch value to seconds.
///
/// An epoch of more than 10^12 seconds would lie around the year 33658 or
/// later, so such values are taken to be in a finer unit. They are divided
/// by 1000 repeatedly until they fall at or below that bound. Milliseconds
/// need one division, microseconds two, and nanoseconds three. Values at or
/// below the bound, including negative values, are returned unchanged.
fn normalize_epoch(v: i64) -> i64 {
    const MAX_PLAUSIBLE_SECS: i64 = 1_000_000_000_000;
    let mut secs = v;
    while secs > MAX_PLAUSIBLE_SECS {
        secs /= 1000;
    }
    secs
}
1230
1231/// Parse a timestamp string. Tries ISO 8601 with timezone, then without.
1232fn parse_timestamp_string(s: &str) -> Option<i64> {
1233    // Try RFC 3339 / ISO 8601 with timezone
1234    if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
1235        return Some(dt.timestamp());
1236    }
1237
1238    // Try ISO 8601 without timezone (assume UTC)
1239    // Common formats: "2024-07-10T12:30:00", "2024-07-10 12:30:00"
1240    if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S") {
1241        return Some(Utc.from_utc_datetime(&naive).timestamp());
1242    }
1243    if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") {
1244        return Some(Utc.from_utc_datetime(&naive).timestamp());
1245    }
1246
1247    // Try with fractional seconds
1248    if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S%.f") {
1249        return Some(Utc.from_utc_datetime(&naive).timestamp());
1250    }
1251    if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f") {
1252        return Some(Utc.from_utc_datetime(&naive).timestamp());
1253    }
1254
1255    None
1256}
1257
1258/// Convert an [`EventValue`] to a string for value_count purposes.
1259fn value_to_string_for_count(v: &EventValue) -> Option<String> {
1260    match v {
1261        EventValue::Str(s) => Some(s.to_string()),
1262        EventValue::Int(n) => Some(n.to_string()),
1263        EventValue::Float(f) => Some(f.to_string()),
1264        EventValue::Bool(b) => Some(b.to_string()),
1265        EventValue::Null => Some("null".to_string()),
1266        _ => None,
1267    }
1268}
1269
1270/// Build a composite distinct-key for `value_count` over one or more fields.
1271///
1272/// Each field's value is rendered with [`value_to_string_for_count`] and the
1273/// rendered parts are joined with `\u{1f}` (the ASCII Unit Separator), which
1274/// is unlikely to occur in normal log data. If any field is missing or has a
1275/// type that does not produce a stable string representation, the event is
1276/// excluded from the distinct count (return `None`), matching the historical
1277/// single-field behavior.
1278fn composite_value_count_key(event: &impl Event, fields: &[String]) -> Option<String> {
1279    // Common case: exactly one field. Avoid the separator overhead.
1280    if let [field_name] = fields {
1281        let val = event.get_field(field_name)?;
1282        return value_to_string_for_count(&val);
1283    }
1284
1285    let mut parts = Vec::with_capacity(fields.len());
1286    for field_name in fields {
1287        let val = event.get_field(field_name)?;
1288        let rendered = value_to_string_for_count(&val)?;
1289        parts.push(rendered);
1290    }
1291    Some(parts.join("\u{1f}"))
1292}
1293
1294/// Convert an [`EventValue`] to f64 for numeric aggregation.
1295fn value_to_f64_ev(v: &EventValue) -> Option<f64> {
1296    v.as_f64()
1297}