Skip to main content

rsigma_eval/correlation_engine/
mod.rs

1//! Stateful correlation engine with time-windowed aggregation.
2//!
3//! `CorrelationEngine` wraps the stateless `Engine` and adds support for
4//! Sigma correlation rules: `event_count`, `value_count`, `temporal`,
5//! `temporal_ordered`, `value_sum`, `value_avg`, `value_percentile`,
6//! and `value_median`.
7//!
8//! # Architecture
9//!
10//! 1. Events are first evaluated against detection rules (stateless)
11//! 2. Detection matches update correlation window state (stateful)
12//! 3. When a correlation condition is met, a `CorrelationResult` is emitted
13//! 4. Correlation results can chain into higher-level correlations
14
15#[cfg(test)]
16mod tests;
17mod types;
18
19pub use types::*;
20
21use std::collections::HashMap;
22
23use chrono::{DateTime, TimeZone, Utc};
24
25use rsigma_parser::{CorrelationRule, CorrelationType, SigmaCollection, SigmaRule};
26
27use crate::correlation::{
28    CompiledCorrelation, EventBuffer, EventRefBuffer, GroupKey, WindowState, compile_correlation,
29};
30use crate::engine::Engine;
31use crate::error::{EvalError, Result};
32use crate::event::{Event, EventValue};
33use crate::pipeline::{Pipeline, apply_pipelines, apply_pipelines_to_correlation};
34use crate::result::MatchResult;
35
36// =============================================================================
37// Correlation Engine
38// =============================================================================
39
40/// Current snapshot schema version. Bump when the serialized format changes.
41const SNAPSHOT_VERSION: u32 = 1;
42
43/// Stateful correlation engine.
44///
45/// Wraps the stateless `Engine` for detection rules and adds time-windowed
46/// correlation on top. Supports all 7 Sigma correlation types and chaining.
47pub struct CorrelationEngine {
48    /// Inner stateless detection engine.
49    engine: Engine,
50    /// Compiled correlation rules.
51    correlations: Vec<CompiledCorrelation>,
52    /// Maps rule ID/name -> indices into `correlations` that reference it.
53    /// This allows quick lookup: "which correlations care about rule X?"
54    rule_index: HashMap<String, Vec<usize>>,
55    /// Maps detection rule index -> (rule_id, rule_name) for reverse lookup.
56    /// Used to find which correlations a detection match triggers.
57    rule_ids: Vec<(Option<String>, Option<String>)>,
58    /// Per-(correlation_index, group_key) window state.
59    state: HashMap<(usize, GroupKey), WindowState>,
60    /// Last alert timestamp per (correlation_index, group_key) for suppression.
61    last_alert: HashMap<(usize, GroupKey), i64>,
62    /// Per-(correlation_index, group_key) compressed event buffer (`Full` mode).
63    event_buffers: HashMap<(usize, GroupKey), EventBuffer>,
64    /// Per-(correlation_index, group_key) event reference buffer (`Refs` mode).
65    event_ref_buffers: HashMap<(usize, GroupKey), EventRefBuffer>,
66    /// Set of detection rule IDs/names that are "correlation-only"
67    /// (referenced by correlations where `generate == false`).
68    /// Used to filter detection output when `config.emit_detections == false`.
69    correlation_only_rules: std::collections::HashSet<String>,
70    /// Configuration.
71    config: CorrelationConfig,
72    /// Processing pipelines applied to rules during add_rule.
73    pipelines: Vec<Pipeline>,
74}
75
76impl CorrelationEngine {
77    /// Create a new correlation engine with the given configuration.
78    pub fn new(config: CorrelationConfig) -> Self {
79        CorrelationEngine {
80            engine: Engine::new(),
81            correlations: Vec::new(),
82            rule_index: HashMap::new(),
83            rule_ids: Vec::new(),
84            state: HashMap::new(),
85            last_alert: HashMap::new(),
86            event_buffers: HashMap::new(),
87            event_ref_buffers: HashMap::new(),
88            correlation_only_rules: std::collections::HashSet::new(),
89            config,
90            pipelines: Vec::new(),
91        }
92    }
93
94    /// Add a pipeline to the engine.
95    ///
96    /// Pipelines are applied to rules during `add_rule` / `add_collection`.
97    pub fn add_pipeline(&mut self, pipeline: Pipeline) {
98        self.pipelines.push(pipeline);
99        self.pipelines.sort_by_key(|p| p.priority);
100    }
101
102    /// Set global `include_event` on the inner detection engine.
103    pub fn set_include_event(&mut self, include: bool) {
104        self.engine.set_include_event(include);
105    }
106
107    /// Forward to [`crate::Engine::set_bloom_prefilter`] on the inner
108    /// detection engine. Off by default; the optimization helps only on
109    /// substring-heavy rule sets paired with mostly-non-matching events.
110    pub fn set_bloom_prefilter(&mut self, enabled: bool) {
111        self.engine.set_bloom_prefilter(enabled);
112    }
113
114    /// Forward to [`crate::Engine::set_bloom_max_bytes`] on the inner
115    /// detection engine.
116    pub fn set_bloom_max_bytes(&mut self, max_bytes: usize) {
117        self.engine.set_bloom_max_bytes(max_bytes);
118    }
119
120    /// Forward to [`crate::Engine::set_cross_rule_ac`] on the inner
121    /// detection engine. Off by default. Available behind the
122    /// `daachorse-index` Cargo feature.
123    #[cfg(feature = "daachorse-index")]
124    pub fn set_cross_rule_ac(&mut self, enabled: bool) {
125        self.engine.set_cross_rule_ac(enabled);
126    }
127
128    /// Set the global correlation event mode.
129    ///
130    /// - `None`: no event storage (default)
131    /// - `Full`: compressed event bodies
132    /// - `Refs`: lightweight timestamp + ID references
133    pub fn set_correlation_event_mode(&mut self, mode: CorrelationEventMode) {
134        self.config.correlation_event_mode = mode;
135    }
136
137    /// Set the maximum number of events to store per correlation window group.
138    /// Only meaningful when `correlation_event_mode` is not `None`.
139    pub fn set_max_correlation_events(&mut self, max: usize) {
140        self.config.max_correlation_events = max;
141    }
142
143    /// Add a single detection rule.
144    ///
145    /// If pipelines are set, the rule is cloned and transformed before compilation.
146    /// The inner engine receives the already-transformed rule directly (not through
147    /// its own pipeline, to avoid double transformation).
148    pub fn add_rule(&mut self, rule: &SigmaRule) -> Result<()> {
149        if self.pipelines.is_empty() {
150            self.apply_custom_attributes(&rule.custom_attributes);
151            self.rule_ids.push((rule.id.clone(), rule.name.clone()));
152            self.engine.add_rule(rule)?;
153        } else {
154            let mut transformed = rule.clone();
155            apply_pipelines(&self.pipelines, &mut transformed)?;
156            self.apply_custom_attributes(&transformed.custom_attributes);
157            self.rule_ids
158                .push((transformed.id.clone(), transformed.name.clone()));
159            // Use compile_rule + add_compiled_rule to bypass inner engine's pipelines
160            let compiled = crate::compiler::compile_rule(&transformed)?;
161            self.engine.add_compiled_rule(compiled);
162        }
163        Ok(())
164    }
165
166    /// Read `rsigma.*` custom attributes from a rule and apply them to the
167    /// engine configuration.  This allows pipelines to influence engine
168    /// behaviour via `SetCustomAttribute` transformations — the same pattern
169    /// used by pySigma backends (e.g. pySigma-backend-loki).
170    ///
171    /// Supported attributes:
172    /// - `rsigma.timestamp_field` — prepends a field name to the timestamp
173    ///   extraction priority list so the correlation engine can find the
174    ///   event timestamp in non-standard field names.
175    /// - `rsigma.suppress` — sets the default suppression window (e.g. `5m`).
176    ///   Only applied when the CLI did not already set `--suppress`.
177    /// - `rsigma.action` — sets the default post-fire action (`alert`/`reset`).
178    ///   Only applied when the CLI did not already set `--action`.
179    fn apply_custom_attributes(
180        &mut self,
181        attrs: &std::collections::HashMap<String, yaml_serde::Value>,
182    ) {
183        // rsigma.timestamp_field — prepend to priority list, skip duplicates
184        if let Some(field) = attrs.get("rsigma.timestamp_field").and_then(|v| v.as_str())
185            && !self.config.timestamp_fields.iter().any(|f| f == field)
186        {
187            self.config.timestamp_fields.insert(0, field.to_string());
188        }
189
190        // rsigma.suppress — only when CLI didn't already set one
191        if let Some(val) = attrs.get("rsigma.suppress").and_then(|v| v.as_str())
192            && self.config.suppress.is_none()
193            && let Ok(ts) = rsigma_parser::Timespan::parse(val)
194        {
195            self.config.suppress = Some(ts.seconds);
196        }
197
198        // rsigma.action — only when CLI left it at the default (Alert)
199        if let Some(val) = attrs.get("rsigma.action").and_then(|v| v.as_str())
200            && self.config.action_on_match == CorrelationAction::Alert
201            && let Ok(a) = val.parse::<CorrelationAction>()
202        {
203            self.config.action_on_match = a;
204        }
205    }
206
207    /// Add a single correlation rule.
208    pub fn add_correlation(&mut self, corr: &CorrelationRule) -> Result<()> {
209        let owned;
210        let effective = if self.pipelines.is_empty() {
211            corr
212        } else {
213            owned = {
214                let mut c = corr.clone();
215                apply_pipelines_to_correlation(&self.pipelines, &mut c)?;
216                c
217            };
218            &owned
219        };
220
221        // Apply engine-level custom attributes from the (possibly transformed)
222        // correlation rule (e.g. rsigma.timestamp_field).
223        self.apply_custom_attributes(&effective.custom_attributes);
224
225        let compiled = compile_correlation(effective)?;
226        let idx = self.correlations.len();
227
228        // Index by each referenced rule ID/name
229        for rule_ref in &compiled.rule_refs {
230            self.rule_index
231                .entry(rule_ref.clone())
232                .or_default()
233                .push(idx);
234        }
235
236        // Track correlation-only rules (generate == false is the default)
237        if !compiled.generate {
238            for rule_ref in &compiled.rule_refs {
239                self.correlation_only_rules.insert(rule_ref.clone());
240            }
241        }
242
243        self.correlations.push(compiled);
244        Ok(())
245    }
246
247    /// Add all rules and correlations from a parsed collection.
248    ///
249    /// Detection rules are added first (so they're available for correlation
250    /// references), then correlation rules. Detection rules are compiled
251    /// sequentially and then pushed to the inner engine in a single batch,
252    /// so the inverted index and bloom filter are rebuilt exactly once for
253    /// the whole collection. Without this batching, large rule sets
254    /// (multi-thousand rules) hit an O(N²) rebuild cost on load.
255    pub fn add_collection(&mut self, collection: &SigmaCollection) -> Result<()> {
256        let mut compiled_batch = Vec::with_capacity(collection.rules.len());
257        if self.pipelines.is_empty() {
258            for rule in &collection.rules {
259                self.apply_custom_attributes(&rule.custom_attributes);
260                self.rule_ids.push((rule.id.clone(), rule.name.clone()));
261                compiled_batch.push(crate::compiler::compile_rule(rule)?);
262            }
263        } else {
264            for rule in &collection.rules {
265                let mut transformed = rule.clone();
266                apply_pipelines(&self.pipelines, &mut transformed)?;
267                self.apply_custom_attributes(&transformed.custom_attributes);
268                self.rule_ids
269                    .push((transformed.id.clone(), transformed.name.clone()));
270                // Bypass the inner engine's pipelines (would double-transform)
271                compiled_batch.push(crate::compiler::compile_rule(&transformed)?);
272            }
273        }
274        self.engine.extend_compiled_rules(compiled_batch);
275        // Apply filter rules to the inner engine's detection rules
276        for filter in &collection.filters {
277            self.engine.apply_filter(filter)?;
278        }
279        for corr in &collection.correlations {
280            self.add_correlation(corr)?;
281        }
282        self.validate_rule_refs()?;
283        self.detect_correlation_cycles()?;
284        Ok(())
285    }
286
287    /// Validate that every correlation's `rule_refs` resolve to at least one
288    /// known detection rule (by ID or name) or another correlation (by ID or name).
289    fn validate_rule_refs(&self) -> Result<()> {
290        let mut known: std::collections::HashSet<&str> = std::collections::HashSet::new();
291
292        for (id, name) in &self.rule_ids {
293            if let Some(id) = id {
294                known.insert(id.as_str());
295            }
296            if let Some(name) = name {
297                known.insert(name.as_str());
298            }
299        }
300        for corr in &self.correlations {
301            if let Some(ref id) = corr.id {
302                known.insert(id.as_str());
303            }
304            if let Some(ref name) = corr.name {
305                known.insert(name.as_str());
306            }
307        }
308
309        for corr in &self.correlations {
310            for rule_ref in &corr.rule_refs {
311                if !known.contains(rule_ref.as_str()) {
312                    return Err(EvalError::UnknownRuleRef(rule_ref.clone()));
313                }
314            }
315        }
316        Ok(())
317    }
318
319    /// Detect cycles in the correlation reference graph.
320    ///
321    /// Builds a directed graph where each correlation (identified by its id/name)
322    /// has edges to the correlations it references via `rule_refs`. Uses DFS with
323    /// a "gray/black" coloring scheme to detect back-edges (cycles).
324    ///
325    /// Returns `Err(EvalError::CorrelationCycle)` if a cycle is found.
326    fn detect_correlation_cycles(&self) -> Result<()> {
327        // Build a set of all correlation identifiers (id and/or name)
328        let mut corr_identifiers: HashMap<&str, usize> = HashMap::new();
329        for (idx, corr) in self.correlations.iter().enumerate() {
330            if let Some(ref id) = corr.id {
331                corr_identifiers.insert(id.as_str(), idx);
332            }
333            if let Some(ref name) = corr.name {
334                corr_identifiers.insert(name.as_str(), idx);
335            }
336        }
337
338        // Build adjacency list: corr index → set of corr indices it references
339        let mut adj: Vec<Vec<usize>> = vec![Vec::new(); self.correlations.len()];
340        for (idx, corr) in self.correlations.iter().enumerate() {
341            for rule_ref in &corr.rule_refs {
342                if let Some(&target_idx) = corr_identifiers.get(rule_ref.as_str()) {
343                    adj[idx].push(target_idx);
344                }
345            }
346        }
347
348        // DFS cycle detection with three states: white (unvisited), gray (in stack), black (done)
349        let mut state = vec![0u8; self.correlations.len()]; // 0=white, 1=gray, 2=black
350        let mut path: Vec<usize> = Vec::new();
351
352        for start in 0..self.correlations.len() {
353            if state[start] == 0
354                && let Some(cycle) = Self::dfs_find_cycle(start, &adj, &mut state, &mut path)
355            {
356                let names: Vec<String> = cycle
357                    .iter()
358                    .map(|&i| {
359                        self.correlations[i]
360                            .id
361                            .as_deref()
362                            .or(self.correlations[i].name.as_deref())
363                            .unwrap_or(&self.correlations[i].title)
364                            .to_string()
365                    })
366                    .collect();
367                return Err(crate::error::EvalError::CorrelationCycle(
368                    names.join(" -> "),
369                ));
370            }
371        }
372        Ok(())
373    }
374
375    /// DFS helper that returns the cycle path if a back-edge is found.
376    fn dfs_find_cycle(
377        node: usize,
378        adj: &[Vec<usize>],
379        state: &mut [u8],
380        path: &mut Vec<usize>,
381    ) -> Option<Vec<usize>> {
382        state[node] = 1; // gray
383        path.push(node);
384
385        for &next in &adj[node] {
386            if state[next] == 1 {
387                // Back-edge found — extract cycle from path
388                if let Some(pos) = path.iter().position(|&n| n == next) {
389                    let mut cycle = path[pos..].to_vec();
390                    cycle.push(next); // close the cycle
391                    return Some(cycle);
392                }
393            }
394            if state[next] == 0
395                && let Some(cycle) = Self::dfs_find_cycle(next, adj, state, path)
396            {
397                return Some(cycle);
398            }
399        }
400
401        path.pop();
402        state[node] = 2; // black
403        None
404    }
405
406    /// Process an event, extracting the timestamp from configured event fields.
407    ///
408    /// When no timestamp field is found, the `timestamp_fallback` policy applies:
409    /// - `WallClock`: use `Utc::now()` (good for real-time streaming)
410    /// - `Skip`: return detections only, skip correlation state updates
411    pub fn process_event(&mut self, event: &impl Event) -> ProcessResult {
412        let all_detections = self.engine.evaluate(event);
413
414        let ts = match self.extract_event_timestamp(event) {
415            Some(ts) => ts,
416            None => match self.config.timestamp_fallback {
417                TimestampFallback::WallClock => Utc::now().timestamp(),
418                TimestampFallback::Skip => {
419                    // Still run detection (stateless), but skip correlation
420                    let detections = self.filter_detections(all_detections);
421                    return ProcessResult {
422                        detections,
423                        correlations: Vec::new(),
424                    };
425                }
426            },
427        };
428        self.process_with_detections(event, all_detections, ts)
429    }
430
431    /// Process an event with an explicit Unix epoch timestamp (seconds).
432    ///
433    /// The timestamp is clamped to `[0, i64::MAX / 2]` to prevent overflow
434    /// when adding timespan durations internally.
435    pub fn process_event_at(&mut self, event: &impl Event, timestamp_secs: i64) -> ProcessResult {
436        let all_detections = self.engine.evaluate(event);
437        self.process_with_detections(event, all_detections, timestamp_secs)
438    }
439
440    /// Process an event with pre-computed detection results.
441    ///
442    /// Enables external parallelism: callers can run detection (via
443    /// [`evaluate`](Self::evaluate)) in parallel, then feed results here
444    /// sequentially for stateful correlation.
445    pub fn process_with_detections(
446        &mut self,
447        event: &impl Event,
448        all_detections: Vec<MatchResult>,
449        timestamp_secs: i64,
450    ) -> ProcessResult {
451        let timestamp_secs = timestamp_secs.clamp(0, i64::MAX / 2);
452
453        // Memory management — evict before adding new state to enforce limit
454        if self.state.len() >= self.config.max_state_entries {
455            self.evict_all(timestamp_secs);
456        }
457
458        // Feed detection matches into correlations
459        let mut correlations = Vec::new();
460        self.feed_detections(event, &all_detections, timestamp_secs, &mut correlations);
461
462        // Chain — correlation results may trigger higher-level correlations
463        self.chain_correlations(&correlations, timestamp_secs);
464
465        // Filter detections by generate flag
466        let detections = self.filter_detections(all_detections);
467
468        ProcessResult {
469            detections,
470            correlations,
471        }
472    }
473
474    /// Run stateless detection only (no correlation), delegating to the inner engine.
475    ///
476    /// Takes `&self` so it can be called concurrently from multiple threads
477    /// (e.g. via `rayon::par_iter`) while the mutable correlation phase runs
478    /// sequentially afterwards.
479    pub fn evaluate(&self, event: &impl Event) -> Vec<MatchResult> {
480        self.engine.evaluate(event)
481    }
482
483    /// Process a batch of events: parallel detection, then sequential correlation.
484    ///
485    /// When the `parallel` feature is enabled, the stateless detection phase runs
486    /// concurrently via rayon. Timestamp extraction also runs in the parallel
487    /// phase (it borrows `&self.config` immutably). After `collect()` releases the
488    /// immutable borrows, each event's pre-computed detections are fed into the
489    /// stateful correlation engine sequentially.
490    pub fn process_batch<E: Event + Sync>(&mut self, events: &[&E]) -> Vec<ProcessResult> {
491        // Borrow split: take immutable refs to fields needed for the parallel phase.
492        // These are released by collect() before the sequential &mut self phase.
493        let engine = &self.engine;
494        let ts_fields = &self.config.timestamp_fields;
495
496        let batch_results: Vec<(Vec<MatchResult>, Option<i64>)> = {
497            #[cfg(feature = "parallel")]
498            {
499                use rayon::prelude::*;
500                events
501                    .par_iter()
502                    .map(|e| {
503                        let detections = engine.evaluate(e);
504                        let ts = extract_event_ts(e, ts_fields);
505                        (detections, ts)
506                    })
507                    .collect()
508            }
509            #[cfg(not(feature = "parallel"))]
510            {
511                events
512                    .iter()
513                    .map(|e| {
514                        let detections = engine.evaluate(e);
515                        let ts = extract_event_ts(e, ts_fields);
516                        (detections, ts)
517                    })
518                    .collect()
519            }
520        };
521
522        // Sequential correlation phase
523        let mut results = Vec::with_capacity(events.len());
524        for ((detections, ts_opt), event) in batch_results.into_iter().zip(events) {
525            match ts_opt {
526                Some(ts) => {
527                    results.push(self.process_with_detections(event, detections, ts));
528                }
529                None => match self.config.timestamp_fallback {
530                    TimestampFallback::WallClock => {
531                        let ts = Utc::now().timestamp();
532                        results.push(self.process_with_detections(event, detections, ts));
533                    }
534                    TimestampFallback::Skip => {
535                        // Still return detection results, but skip correlation
536                        let detections = self.filter_detections(detections);
537                        results.push(ProcessResult {
538                            detections,
539                            correlations: Vec::new(),
540                        });
541                    }
542                },
543            }
544        }
545        results
546    }
547
548    /// Filter detections by the `generate` flag / `emit_detections` config.
549    ///
550    /// If `emit_detections` is false and some rules are correlation-only,
551    /// their detection output is suppressed.
552    fn filter_detections(&self, all_detections: Vec<MatchResult>) -> Vec<MatchResult> {
553        if !self.config.emit_detections && !self.correlation_only_rules.is_empty() {
554            all_detections
555                .into_iter()
556                .filter(|m| {
557                    let id_match = m
558                        .rule_id
559                        .as_ref()
560                        .is_some_and(|id| self.correlation_only_rules.contains(id));
561                    !id_match
562                })
563                .collect()
564        } else {
565            all_detections
566        }
567    }
568
569    /// Feed detection matches into correlation window states.
570    fn feed_detections(
571        &mut self,
572        event: &impl Event,
573        detections: &[MatchResult],
574        ts: i64,
575        out: &mut Vec<CorrelationResult>,
576    ) {
577        // Collect all (corr_idx, rule_id, rule_name) tuples upfront to avoid
578        // borrow conflicts between self.rule_ids and self.update_correlation.
579        let mut work: Vec<(usize, Option<String>, Option<String>)> = Vec::new();
580
581        for det in detections {
582            // Use the MatchResult's rule_id to find the original rule's ID/name.
583            // We also look up by rule_id in our rule_ids table for the name.
584            let (rule_id, rule_name) = self.find_rule_identity(det);
585
586            // Collect correlation indices that reference this rule
587            let mut corr_indices = Vec::new();
588            if let Some(ref id) = rule_id
589                && let Some(indices) = self.rule_index.get(id)
590            {
591                corr_indices.extend(indices);
592            }
593            if let Some(ref name) = rule_name
594                && let Some(indices) = self.rule_index.get(name)
595            {
596                corr_indices.extend(indices);
597            }
598
599            corr_indices.sort_unstable();
600            corr_indices.dedup();
601
602            for &corr_idx in &corr_indices {
603                work.push((corr_idx, rule_id.clone(), rule_name.clone()));
604            }
605        }
606
607        for (corr_idx, rule_id, rule_name) in work {
608            self.update_correlation(corr_idx, event, ts, &rule_id, &rule_name, out);
609        }
610    }
611
612    /// Find the (id, name) for a detection match by searching our rule_ids table.
613    fn find_rule_identity(&self, det: &MatchResult) -> (Option<String>, Option<String>) {
614        // First, try to find by matching rule_id in our table
615        if let Some(ref match_id) = det.rule_id {
616            for (id, name) in &self.rule_ids {
617                if id.as_deref() == Some(match_id.as_str()) {
618                    return (id.clone(), name.clone());
619                }
620            }
621        }
622        // Fall back to using just the MatchResult's rule_id
623        (det.rule_id.clone(), None)
624    }
625
626    /// Resolve the event mode for a given correlation.
627    fn resolve_event_mode(&self, corr_idx: usize) -> CorrelationEventMode {
628        let corr = &self.correlations[corr_idx];
629        corr.event_mode
630            .unwrap_or(self.config.correlation_event_mode)
631    }
632
633    /// Resolve the max events cap for a given correlation.
634    fn resolve_max_events(&self, corr_idx: usize) -> usize {
635        let corr = &self.correlations[corr_idx];
636        corr.max_events
637            .unwrap_or(self.config.max_correlation_events)
638    }
639
640    /// Update a single correlation's state and check its condition.
641    fn update_correlation(
642        &mut self,
643        corr_idx: usize,
644        event: &impl Event,
645        ts: i64,
646        rule_id: &Option<String>,
647        rule_name: &Option<String>,
648        out: &mut Vec<CorrelationResult>,
649    ) {
650        // Borrow the correlation by reference — no cloning needed.  Rust allows
651        // simultaneous &self.correlations and &mut self.state / &mut self.last_alert
652        // because they are disjoint struct fields.
653        let corr = &self.correlations[corr_idx];
654        let corr_type = corr.correlation_type;
655        let timespan = corr.timespan_secs;
656        let level = corr.level;
657        let suppress_secs = corr.suppress_secs.or(self.config.suppress);
658        let action = corr.action.unwrap_or(self.config.action_on_match);
659        let event_mode = self.resolve_event_mode(corr_idx);
660        let max_events = self.resolve_max_events(corr_idx);
661
662        // Determine the rule_ref strings for alias resolution and temporal tracking.
663        let mut ref_strs: Vec<&str> = Vec::new();
664        if let Some(id) = rule_id.as_deref() {
665            ref_strs.push(id);
666        }
667        if let Some(name) = rule_name.as_deref() {
668            ref_strs.push(name);
669        }
670        let rule_ref = rule_id.as_deref().or(rule_name.as_deref()).unwrap_or("");
671
672        // Extract group key
673        let group_key = GroupKey::extract(event, &corr.group_by, &ref_strs);
674
675        // Get or create window state
676        let state_key = (corr_idx, group_key.clone());
677        let state = self
678            .state
679            .entry(state_key.clone())
680            .or_insert_with(|| WindowState::new_for(corr_type));
681
682        // Evict expired entries
683        let cutoff = ts - timespan as i64;
684        state.evict(cutoff);
685
686        // Push the new event into the state
687        match corr_type {
688            CorrelationType::EventCount => {
689                state.push_event_count(ts);
690            }
691            CorrelationType::ValueCount => {
692                if let Some(ref fields) = corr.condition.field
693                    && let Some(field_name) = fields.first()
694                    && let Some(val) = event.get_field(field_name)
695                    && let Some(s) = value_to_string_for_count(&val)
696                {
697                    state.push_value_count(ts, s);
698                }
699            }
700            CorrelationType::Temporal | CorrelationType::TemporalOrdered => {
701                state.push_temporal(ts, rule_ref);
702            }
703            CorrelationType::ValueSum
704            | CorrelationType::ValueAvg
705            | CorrelationType::ValuePercentile
706            | CorrelationType::ValueMedian => {
707                if let Some(ref fields) = corr.condition.field
708                    && let Some(field_name) = fields.first()
709                    && let Some(val) = event.get_field(field_name)
710                    && let Some(n) = value_to_f64_ev(&val)
711                {
712                    state.push_numeric(ts, n);
713                }
714            }
715        }
716
717        // Push event into buffer based on event mode
718        match event_mode {
719            CorrelationEventMode::Full => {
720                let buf = self
721                    .event_buffers
722                    .entry(state_key.clone())
723                    .or_insert_with(|| EventBuffer::new(max_events));
724                buf.evict(cutoff);
725                let json = event.to_json();
726                buf.push(ts, &json);
727            }
728            CorrelationEventMode::Refs => {
729                let buf = self
730                    .event_ref_buffers
731                    .entry(state_key.clone())
732                    .or_insert_with(|| EventRefBuffer::new(max_events));
733                buf.evict(cutoff);
734                let json = event.to_json();
735                buf.push(ts, &json);
736            }
737            CorrelationEventMode::None => {}
738        }
739
740        // Check condition — after this, `state` is no longer used (NLL drops the borrow).
741        let fired = state.check_condition(
742            &corr.condition,
743            corr_type,
744            &corr.rule_refs,
745            corr.extended_expr.as_ref(),
746        );
747
748        if let Some(agg_value) = fired {
749            let alert_key = (corr_idx, group_key.clone());
750
751            // Suppression check: skip if we've already alerted within the suppress window
752            let suppressed = if let Some(suppress) = suppress_secs {
753                if let Some(&last_ts) = self.last_alert.get(&alert_key) {
754                    (ts - last_ts) < suppress as i64
755                } else {
756                    false
757                }
758            } else {
759                false
760            };
761
762            if !suppressed {
763                // Retrieve stored events / refs based on mode
764                let (events, event_refs) = match event_mode {
765                    CorrelationEventMode::Full => {
766                        let stored = self
767                            .event_buffers
768                            .get(&alert_key)
769                            .map(|buf| buf.decompress_all())
770                            .unwrap_or_default();
771                        (Some(stored), None)
772                    }
773                    CorrelationEventMode::Refs => {
774                        let stored = self
775                            .event_ref_buffers
776                            .get(&alert_key)
777                            .map(|buf| buf.refs())
778                            .unwrap_or_default();
779                        (None, Some(stored))
780                    }
781                    CorrelationEventMode::None => (None, None),
782                };
783
784                // Only clone title/id/tags when we actually produce output
785                let corr = &self.correlations[corr_idx];
786                let result = CorrelationResult {
787                    rule_title: corr.title.clone(),
788                    rule_id: corr.id.clone(),
789                    level,
790                    tags: corr.tags.clone(),
791                    correlation_type: corr_type,
792                    group_key: group_key.to_pairs(&corr.group_by),
793                    aggregated_value: agg_value,
794                    timespan_secs: timespan,
795                    events,
796                    event_refs,
797                    custom_attributes: corr.custom_attributes.clone(),
798                };
799                out.push(result);
800
801                // Record alert time for suppression
802                self.last_alert.insert(alert_key.clone(), ts);
803
804                // Action on match
805                if action == CorrelationAction::Reset {
806                    if let Some(state) = self.state.get_mut(&alert_key) {
807                        state.clear();
808                    }
809                    if let Some(buf) = self.event_buffers.get_mut(&alert_key) {
810                        buf.clear();
811                    }
812                    if let Some(buf) = self.event_ref_buffers.get_mut(&alert_key) {
813                        buf.clear();
814                    }
815                }
816            }
817        }
818    }
819
820    /// Propagate correlation results to higher-level correlations (chaining).
821    ///
822    /// When a correlation fires, any correlation that references it (by ID or name)
823    /// is updated. Limits chain depth to 10 to prevent infinite loops.
824    fn chain_correlations(&mut self, fired: &[CorrelationResult], ts: i64) {
825        const MAX_CHAIN_DEPTH: usize = 10;
826        let mut pending: Vec<CorrelationResult> = fired.to_vec();
827        let mut depth = 0;
828
829        while !pending.is_empty() && depth < MAX_CHAIN_DEPTH {
830            depth += 1;
831
832            // Collect work items: (corr_idx, group_key_pairs, fired_ref)
833            #[allow(clippy::type_complexity)]
834            let mut work: Vec<(usize, Vec<(String, String)>, String)> = Vec::new();
835            for result in &pending {
836                if let Some(ref id) = result.rule_id
837                    && let Some(indices) = self.rule_index.get(id)
838                {
839                    let fired_ref = result
840                        .rule_id
841                        .as_deref()
842                        .unwrap_or(&result.rule_title)
843                        .to_string();
844                    for &corr_idx in indices {
845                        work.push((corr_idx, result.group_key.clone(), fired_ref.clone()));
846                    }
847                }
848            }
849
850            let mut next_pending = Vec::new();
851            for (corr_idx, group_key_pairs, fired_ref) in work {
852                let corr = &self.correlations[corr_idx];
853                let corr_type = corr.correlation_type;
854                let timespan = corr.timespan_secs;
855                let level = corr.level;
856
857                let group_key = GroupKey::from_pairs(&group_key_pairs, &corr.group_by);
858                let state_key = (corr_idx, group_key.clone());
859                let state = self
860                    .state
861                    .entry(state_key)
862                    .or_insert_with(|| WindowState::new_for(corr_type));
863
864                let cutoff = ts - timespan as i64;
865                state.evict(cutoff);
866
867                match corr_type {
868                    CorrelationType::EventCount => {
869                        state.push_event_count(ts);
870                    }
871                    CorrelationType::Temporal | CorrelationType::TemporalOrdered => {
872                        state.push_temporal(ts, &fired_ref);
873                    }
874                    _ => {
875                        state.push_event_count(ts);
876                    }
877                }
878
879                let fired = state.check_condition(
880                    &corr.condition,
881                    corr_type,
882                    &corr.rule_refs,
883                    corr.extended_expr.as_ref(),
884                );
885
886                if let Some(agg_value) = fired {
887                    let corr = &self.correlations[corr_idx];
888                    next_pending.push(CorrelationResult {
889                        rule_title: corr.title.clone(),
890                        rule_id: corr.id.clone(),
891                        level,
892                        tags: corr.tags.clone(),
893                        correlation_type: corr_type,
894                        group_key: group_key.to_pairs(&corr.group_by),
895                        aggregated_value: agg_value,
896                        timespan_secs: timespan,
897                        // Chained correlations don't include events (they aggregate
898                        // over correlation results, not raw events)
899                        events: None,
900                        event_refs: None,
901                        custom_attributes: corr.custom_attributes.clone(),
902                    });
903                }
904            }
905
906            pending = next_pending;
907        }
908
909        if !pending.is_empty() {
910            log::warn!(
911                "Correlation chain depth limit reached ({MAX_CHAIN_DEPTH}); \
912                 {} pending result(s) were not propagated further. \
913                 This may indicate a cycle in correlation references.",
914                pending.len()
915            );
916        }
917    }
918
919    // =========================================================================
920    // Timestamp extraction
921    // =========================================================================
922
923    /// Extract a Unix epoch timestamp (seconds) from an event.
924    ///
925    /// Tries each configured timestamp field in order. Supports:
926    /// - Numeric values (epoch seconds, or epoch millis if > 1e12)
927    /// - ISO 8601 strings (e.g., "2024-07-10T12:30:00Z")
928    ///
929    /// Returns `None` if no field yields a valid timestamp.
930    fn extract_event_timestamp(&self, event: &impl Event) -> Option<i64> {
931        for field_name in &self.config.timestamp_fields {
932            if let Some(val) = event.get_field(field_name)
933                && let Some(ts) = parse_timestamp_value(&val)
934            {
935                return Some(ts);
936            }
937        }
938        None
939    }
940
941    // =========================================================================
942    // State management
943    // =========================================================================
944
945    /// Manually evict all expired state entries.
946    pub fn evict_expired(&mut self, now_secs: i64) {
947        self.evict_all(now_secs);
948    }
949
950    /// Evict expired entries and remove empty states.
951    fn evict_all(&mut self, now_secs: i64) {
952        // Phase 1: Time-based eviction — remove entries outside their correlation window
953        let timespans: Vec<u64> = self.correlations.iter().map(|c| c.timespan_secs).collect();
954
955        self.state.retain(|&(corr_idx, _), state| {
956            if corr_idx < timespans.len() {
957                let cutoff = now_secs - timespans[corr_idx] as i64;
958                state.evict(cutoff);
959            }
960            !state.is_empty()
961        });
962
963        // Evict event buffers in sync with window state
964        self.event_buffers.retain(|&(corr_idx, _), buf| {
965            if corr_idx < timespans.len() {
966                let cutoff = now_secs - timespans[corr_idx] as i64;
967                buf.evict(cutoff);
968            }
969            !buf.is_empty()
970        });
971        self.event_ref_buffers.retain(|&(corr_idx, _), buf| {
972            if corr_idx < timespans.len() {
973                let cutoff = now_secs - timespans[corr_idx] as i64;
974                buf.evict(cutoff);
975            }
976            !buf.is_empty()
977        });
978
979        // Phase 2: Hard cap — if still over limit after time-based eviction (e.g.
980        // high-cardinality traffic with long windows), drop the stalest entries
981        // until we're at 90% capacity to avoid evicting on every single event.
982        if self.state.len() >= self.config.max_state_entries {
983            let target = self.config.max_state_entries * 9 / 10;
984            let excess = self.state.len() - target;
985
986            log::warn!(
987                "Correlation state hard cap reached ({} entries, max {}); \
988                 evicting {} stalest entries to {} (90% capacity). \
989                 This indicates high-cardinality traffic; consider raising \
990                 max_state_entries or shortening correlation windows.",
991                self.state.len(),
992                self.config.max_state_entries,
993                excess,
994                target,
995            );
996
997            // Collect keys with their latest timestamp, sort by oldest first
998            let mut by_staleness: Vec<_> = self
999                .state
1000                .iter()
1001                .map(|(k, v)| (k.clone(), v.latest_timestamp().unwrap_or(i64::MIN)))
1002                .collect();
1003            by_staleness.sort_unstable_by_key(|&(_, ts)| ts);
1004
1005            // Drop the oldest entries (and their associated event buffers)
1006            for (key, _) in by_staleness.into_iter().take(excess) {
1007                self.state.remove(&key);
1008                self.last_alert.remove(&key);
1009                self.event_buffers.remove(&key);
1010                self.event_ref_buffers.remove(&key);
1011            }
1012        }
1013
1014        // Phase 3: Evict stale last_alert entries — remove if the suppress window
1015        // has passed or if the corresponding window state no longer exists.
1016        self.last_alert.retain(|key, &mut alert_ts| {
1017            let suppress = if key.0 < self.correlations.len() {
1018                self.correlations[key.0]
1019                    .suppress_secs
1020                    .or(self.config.suppress)
1021                    .unwrap_or(0)
1022            } else {
1023                0
1024            };
1025            (now_secs - alert_ts) < suppress as i64
1026        });
1027    }
1028
1029    /// Number of active state entries (for monitoring).
1030    pub fn state_count(&self) -> usize {
1031        self.state.len()
1032    }
1033
1034    /// Number of detection rules loaded.
1035    pub fn detection_rule_count(&self) -> usize {
1036        self.engine.rule_count()
1037    }
1038
1039    /// Number of correlation rules loaded.
1040    pub fn correlation_rule_count(&self) -> usize {
1041        self.correlations.len()
1042    }
1043
1044    /// Number of active event buffers (for monitoring).
1045    pub fn event_buffer_count(&self) -> usize {
1046        self.event_buffers.len()
1047    }
1048
1049    /// Total compressed bytes across all event buffers (for monitoring).
1050    pub fn event_buffer_bytes(&self) -> usize {
1051        self.event_buffers
1052            .values()
1053            .map(|b| b.compressed_bytes())
1054            .sum()
1055    }
1056
1057    /// Number of active event ref buffers — `Refs` mode (for monitoring).
1058    pub fn event_ref_buffer_count(&self) -> usize {
1059        self.event_ref_buffers.len()
1060    }
1061
1062    /// Access the inner stateless engine.
1063    pub fn engine(&self) -> &Engine {
1064        &self.engine
1065    }
1066
1067    /// Export all mutable correlation state as a serializable snapshot.
1068    ///
1069    /// The snapshot uses stable correlation identifiers (id > name > title)
1070    /// instead of internal indices, so it survives rule reloads as long as
1071    /// the correlation rules keep the same identifiers.
1072    pub fn export_state(&self) -> CorrelationSnapshot {
1073        let mut windows: HashMap<String, Vec<(GroupKey, WindowState)>> = HashMap::new();
1074        for ((idx, gk), ws) in &self.state {
1075            let corr_id = self.correlation_stable_id(*idx);
1076            windows
1077                .entry(corr_id)
1078                .or_default()
1079                .push((gk.clone(), ws.clone()));
1080        }
1081
1082        let mut last_alert: HashMap<String, Vec<(GroupKey, i64)>> = HashMap::new();
1083        for ((idx, gk), ts) in &self.last_alert {
1084            let corr_id = self.correlation_stable_id(*idx);
1085            last_alert
1086                .entry(corr_id)
1087                .or_default()
1088                .push((gk.clone(), *ts));
1089        }
1090
1091        let mut event_buffers: HashMap<String, Vec<(GroupKey, EventBuffer)>> = HashMap::new();
1092        for ((idx, gk), buf) in &self.event_buffers {
1093            let corr_id = self.correlation_stable_id(*idx);
1094            event_buffers
1095                .entry(corr_id)
1096                .or_default()
1097                .push((gk.clone(), buf.clone()));
1098        }
1099
1100        let mut event_ref_buffers: HashMap<String, Vec<(GroupKey, EventRefBuffer)>> =
1101            HashMap::new();
1102        for ((idx, gk), buf) in &self.event_ref_buffers {
1103            let corr_id = self.correlation_stable_id(*idx);
1104            event_ref_buffers
1105                .entry(corr_id)
1106                .or_default()
1107                .push((gk.clone(), buf.clone()));
1108        }
1109
1110        CorrelationSnapshot {
1111            version: SNAPSHOT_VERSION,
1112            windows,
1113            last_alert,
1114            event_buffers,
1115            event_ref_buffers,
1116        }
1117    }
1118
1119    /// Import previously exported state, mapping stable identifiers back to
1120    /// current correlation indices. Entries whose identifiers no longer match
1121    /// any loaded correlation are silently dropped.
1122    ///
1123    /// Returns `false` (and imports nothing) if the snapshot version is
1124    /// incompatible with the current schema.
1125    pub fn import_state(&mut self, snapshot: CorrelationSnapshot) -> bool {
1126        if snapshot.version != SNAPSHOT_VERSION {
1127            return false;
1128        }
1129        let id_to_idx = self.build_id_to_index_map();
1130
1131        for (corr_id, groups) in snapshot.windows {
1132            if let Some(&idx) = id_to_idx.get(&corr_id) {
1133                for (gk, ws) in groups {
1134                    self.state.insert((idx, gk), ws);
1135                }
1136            }
1137        }
1138
1139        for (corr_id, groups) in snapshot.last_alert {
1140            if let Some(&idx) = id_to_idx.get(&corr_id) {
1141                for (gk, ts) in groups {
1142                    self.last_alert.insert((idx, gk), ts);
1143                }
1144            }
1145        }
1146
1147        for (corr_id, groups) in snapshot.event_buffers {
1148            if let Some(&idx) = id_to_idx.get(&corr_id) {
1149                for (gk, buf) in groups {
1150                    self.event_buffers.insert((idx, gk), buf);
1151                }
1152            }
1153        }
1154
1155        for (corr_id, groups) in snapshot.event_ref_buffers {
1156            if let Some(&idx) = id_to_idx.get(&corr_id) {
1157                for (gk, buf) in groups {
1158                    self.event_ref_buffers.insert((idx, gk), buf);
1159                }
1160            }
1161        }
1162
1163        true
1164    }
1165
1166    /// Stable identifier for a correlation rule: prefers id, then name, then title.
1167    fn correlation_stable_id(&self, idx: usize) -> String {
1168        let corr = &self.correlations[idx];
1169        corr.id
1170            .clone()
1171            .or_else(|| corr.name.clone())
1172            .unwrap_or_else(|| corr.title.clone())
1173    }
1174
1175    /// Build a reverse map from stable id → current correlation index.
1176    fn build_id_to_index_map(&self) -> HashMap<String, usize> {
1177        self.correlations
1178            .iter()
1179            .enumerate()
1180            .map(|(idx, _)| (self.correlation_stable_id(idx), idx))
1181            .collect()
1182    }
1183}
1184
1185impl Default for CorrelationEngine {
1186    fn default() -> Self {
1187        Self::new(CorrelationConfig::default())
1188    }
1189}
1190
1191// =============================================================================
1192// Timestamp parsing helpers
1193// =============================================================================
1194
1195/// Extract a timestamp from an event using the given field names.
1196///
1197/// Standalone version of `CorrelationEngine::extract_event_timestamp` for use
1198/// in contexts where borrowing `&self` is not possible (e.g. rayon closures).
1199fn extract_event_ts(event: &impl Event, timestamp_fields: &[String]) -> Option<i64> {
1200    for field_name in timestamp_fields {
1201        if let Some(val) = event.get_field(field_name)
1202            && let Some(ts) = parse_timestamp_value(&val)
1203        {
1204            return Some(ts);
1205        }
1206    }
1207    None
1208}
1209
1210/// Parse an [`EventValue`] as a Unix epoch timestamp in seconds.
1211fn parse_timestamp_value(val: &EventValue) -> Option<i64> {
1212    match val {
1213        EventValue::Int(i) => Some(normalize_epoch(*i)),
1214        EventValue::Float(f) => Some(normalize_epoch(*f as i64)),
1215        EventValue::Str(s) => parse_timestamp_string(s),
1216        _ => None,
1217    }
1218}
1219
1220/// Normalize an epoch value: if it looks like milliseconds (> year 33658),
1221/// convert to seconds.
1222fn normalize_epoch(v: i64) -> i64 {
1223    if v > 1_000_000_000_000 { v / 1000 } else { v }
1224}
1225
1226/// Parse a timestamp string. Tries ISO 8601 with timezone, then without.
1227fn parse_timestamp_string(s: &str) -> Option<i64> {
1228    // Try RFC 3339 / ISO 8601 with timezone
1229    if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
1230        return Some(dt.timestamp());
1231    }
1232
1233    // Try ISO 8601 without timezone (assume UTC)
1234    // Common formats: "2024-07-10T12:30:00", "2024-07-10 12:30:00"
1235    if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S") {
1236        return Some(Utc.from_utc_datetime(&naive).timestamp());
1237    }
1238    if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") {
1239        return Some(Utc.from_utc_datetime(&naive).timestamp());
1240    }
1241
1242    // Try with fractional seconds
1243    if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S%.f") {
1244        return Some(Utc.from_utc_datetime(&naive).timestamp());
1245    }
1246    if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f") {
1247        return Some(Utc.from_utc_datetime(&naive).timestamp());
1248    }
1249
1250    None
1251}
1252
1253/// Convert an [`EventValue`] to a string for value_count purposes.
1254fn value_to_string_for_count(v: &EventValue) -> Option<String> {
1255    match v {
1256        EventValue::Str(s) => Some(s.to_string()),
1257        EventValue::Int(n) => Some(n.to_string()),
1258        EventValue::Float(f) => Some(f.to_string()),
1259        EventValue::Bool(b) => Some(b.to_string()),
1260        EventValue::Null => Some("null".to_string()),
1261        _ => None,
1262    }
1263}
1264
1265/// Convert an [`EventValue`] to f64 for numeric aggregation.
1266fn value_to_f64_ev(v: &EventValue) -> Option<f64> {
1267    v.as_f64()
1268}