Skip to main content

rsigma_eval/correlation_engine/
mod.rs

1//! Stateful correlation engine with time-windowed aggregation.
2//!
3//! `CorrelationEngine` wraps the stateless `Engine` and adds support for
4//! Sigma correlation rules: `event_count`, `value_count`, `temporal`,
5//! `temporal_ordered`, `value_sum`, `value_avg`, `value_percentile`,
6//! and `value_median`.
7//!
8//! # Architecture
9//!
10//! 1. Events are first evaluated against detection rules (stateless)
11//! 2. Detection matches update correlation window state (stateful)
12//! 3. When a correlation condition is met, a `CorrelationResult` is emitted
13//! 4. Correlation results can chain into higher-level correlations
14
15#[cfg(test)]
16mod tests;
17mod types;
18
19pub use types::*;
20
21use std::collections::HashMap;
22
23use chrono::{DateTime, TimeZone, Utc};
24
25use rsigma_parser::{CorrelationRule, CorrelationType, SigmaCollection, SigmaRule};
26
27use crate::correlation::{
28    CompiledCorrelation, EventBuffer, EventRefBuffer, GroupKey, WindowState, compile_correlation,
29};
30use crate::engine::Engine;
31use crate::error::{EvalError, Result};
32use crate::event::{Event, EventValue};
33use crate::pipeline::{Pipeline, apply_pipelines, apply_pipelines_to_correlation};
34use crate::result::MatchResult;
35
36// =============================================================================
37// Correlation Engine
38// =============================================================================
39
40/// Current snapshot schema version. Bump when the serialized format changes.
41const SNAPSHOT_VERSION: u32 = 1;
42
43/// Stateful correlation engine.
44///
45/// Wraps the stateless `Engine` for detection rules and adds time-windowed
46/// correlation on top. Supports all 7 Sigma correlation types and chaining.
47pub struct CorrelationEngine {
48    /// Inner stateless detection engine.
49    engine: Engine,
50    /// Compiled correlation rules.
51    correlations: Vec<CompiledCorrelation>,
52    /// Maps rule ID/name -> indices into `correlations` that reference it.
53    /// This allows quick lookup: "which correlations care about rule X?"
54    rule_index: HashMap<String, Vec<usize>>,
55    /// Maps detection rule index -> (rule_id, rule_name) for reverse lookup.
56    /// Used to find which correlations a detection match triggers.
57    rule_ids: Vec<(Option<String>, Option<String>)>,
58    /// Per-(correlation_index, group_key) window state.
59    state: HashMap<(usize, GroupKey), WindowState>,
60    /// Last alert timestamp per (correlation_index, group_key) for suppression.
61    last_alert: HashMap<(usize, GroupKey), i64>,
62    /// Per-(correlation_index, group_key) compressed event buffer (`Full` mode).
63    event_buffers: HashMap<(usize, GroupKey), EventBuffer>,
64    /// Per-(correlation_index, group_key) event reference buffer (`Refs` mode).
65    event_ref_buffers: HashMap<(usize, GroupKey), EventRefBuffer>,
66    /// Set of detection rule IDs/names that are "correlation-only"
67    /// (referenced by correlations where `generate == false`).
68    /// Used to filter detection output when `config.emit_detections == false`.
69    correlation_only_rules: std::collections::HashSet<String>,
70    /// Configuration.
71    config: CorrelationConfig,
72    /// Processing pipelines applied to rules during add_rule.
73    pipelines: Vec<Pipeline>,
74}
75
76impl CorrelationEngine {
77    /// Create a new correlation engine with the given configuration.
78    pub fn new(config: CorrelationConfig) -> Self {
79        CorrelationEngine {
80            engine: Engine::new(),
81            correlations: Vec::new(),
82            rule_index: HashMap::new(),
83            rule_ids: Vec::new(),
84            state: HashMap::new(),
85            last_alert: HashMap::new(),
86            event_buffers: HashMap::new(),
87            event_ref_buffers: HashMap::new(),
88            correlation_only_rules: std::collections::HashSet::new(),
89            config,
90            pipelines: Vec::new(),
91        }
92    }
93
94    /// Add a pipeline to the engine.
95    ///
96    /// Pipelines are applied to rules during `add_rule` / `add_collection`.
97    pub fn add_pipeline(&mut self, pipeline: Pipeline) {
98        self.pipelines.push(pipeline);
99        self.pipelines.sort_by_key(|p| p.priority);
100    }
101
102    /// Set global `include_event` on the inner detection engine.
103    pub fn set_include_event(&mut self, include: bool) {
104        self.engine.set_include_event(include);
105    }
106
107    /// Forward to [`crate::Engine::set_bloom_prefilter`] on the inner
108    /// detection engine. Off by default; the optimization helps only on
109    /// substring-heavy rule sets paired with mostly-non-matching events.
110    pub fn set_bloom_prefilter(&mut self, enabled: bool) {
111        self.engine.set_bloom_prefilter(enabled);
112    }
113
114    /// Forward to [`crate::Engine::set_bloom_max_bytes`] on the inner
115    /// detection engine.
116    pub fn set_bloom_max_bytes(&mut self, max_bytes: usize) {
117        self.engine.set_bloom_max_bytes(max_bytes);
118    }
119
120    /// Forward to [`crate::Engine::set_cross_rule_ac`] on the inner
121    /// detection engine. Off by default. Available behind the
122    /// `daachorse-index` Cargo feature.
123    #[cfg(feature = "daachorse-index")]
124    pub fn set_cross_rule_ac(&mut self, enabled: bool) {
125        self.engine.set_cross_rule_ac(enabled);
126    }
127
128    /// Set the global correlation event mode.
129    ///
130    /// - `None`: no event storage (default)
131    /// - `Full`: compressed event bodies
132    /// - `Refs`: lightweight timestamp + ID references
133    pub fn set_correlation_event_mode(&mut self, mode: CorrelationEventMode) {
134        self.config.correlation_event_mode = mode;
135    }
136
137    /// Set the maximum number of events to store per correlation window group.
138    /// Only meaningful when `correlation_event_mode` is not `None`.
139    pub fn set_max_correlation_events(&mut self, max: usize) {
140        self.config.max_correlation_events = max;
141    }
142
143    /// Add a single detection rule.
144    ///
145    /// If pipelines are set, the rule is cloned and transformed before compilation.
146    /// The inner engine receives the already-transformed rule directly (not through
147    /// its own pipeline, to avoid double transformation).
148    pub fn add_rule(&mut self, rule: &SigmaRule) -> Result<()> {
149        if self.pipelines.is_empty() {
150            self.apply_custom_attributes(&rule.custom_attributes);
151            self.rule_ids.push((rule.id.clone(), rule.name.clone()));
152            self.engine.add_rule(rule)?;
153        } else {
154            let mut transformed = rule.clone();
155            apply_pipelines(&self.pipelines, &mut transformed)?;
156            self.apply_custom_attributes(&transformed.custom_attributes);
157            self.rule_ids
158                .push((transformed.id.clone(), transformed.name.clone()));
159            // Use compile_rule + add_compiled_rule to bypass inner engine's pipelines
160            let compiled = crate::compiler::compile_rule(&transformed)?;
161            self.engine.add_compiled_rule(compiled);
162        }
163        Ok(())
164    }
165
166    /// Read `rsigma.*` custom attributes from a rule and apply them to the
167    /// engine configuration.  This allows pipelines to influence engine
168    /// behaviour via `SetCustomAttribute` transformations — the same pattern
169    /// used by pySigma backends (e.g. pySigma-backend-loki).
170    ///
171    /// Supported attributes:
172    /// - `rsigma.timestamp_field` — prepends a field name to the timestamp
173    ///   extraction priority list so the correlation engine can find the
174    ///   event timestamp in non-standard field names.
175    /// - `rsigma.suppress` — sets the default suppression window (e.g. `5m`).
176    ///   Only applied when the CLI did not already set `--suppress`.
177    /// - `rsigma.action` — sets the default post-fire action (`alert`/`reset`).
178    ///   Only applied when the CLI did not already set `--action`.
179    fn apply_custom_attributes(
180        &mut self,
181        attrs: &std::collections::HashMap<String, serde_yaml::Value>,
182    ) {
183        // rsigma.timestamp_field — prepend to priority list, skip duplicates
184        if let Some(field) = attrs.get("rsigma.timestamp_field").and_then(|v| v.as_str())
185            && !self.config.timestamp_fields.iter().any(|f| f == field)
186        {
187            self.config.timestamp_fields.insert(0, field.to_string());
188        }
189
190        // rsigma.suppress — only when CLI didn't already set one
191        if let Some(val) = attrs.get("rsigma.suppress").and_then(|v| v.as_str())
192            && self.config.suppress.is_none()
193            && let Ok(ts) = rsigma_parser::Timespan::parse(val)
194        {
195            self.config.suppress = Some(ts.seconds);
196        }
197
198        // rsigma.action — only when CLI left it at the default (Alert)
199        if let Some(val) = attrs.get("rsigma.action").and_then(|v| v.as_str())
200            && self.config.action_on_match == CorrelationAction::Alert
201            && let Ok(a) = val.parse::<CorrelationAction>()
202        {
203            self.config.action_on_match = a;
204        }
205    }
206
207    /// Add a single correlation rule.
208    pub fn add_correlation(&mut self, corr: &CorrelationRule) -> Result<()> {
209        let owned;
210        let effective = if self.pipelines.is_empty() {
211            corr
212        } else {
213            owned = {
214                let mut c = corr.clone();
215                apply_pipelines_to_correlation(&self.pipelines, &mut c)?;
216                c
217            };
218            &owned
219        };
220
221        // Apply engine-level custom attributes from the (possibly transformed)
222        // correlation rule (e.g. rsigma.timestamp_field).
223        self.apply_custom_attributes(&effective.custom_attributes);
224
225        let compiled = compile_correlation(effective)?;
226        let idx = self.correlations.len();
227
228        // Index by each referenced rule ID/name
229        for rule_ref in &compiled.rule_refs {
230            self.rule_index
231                .entry(rule_ref.clone())
232                .or_default()
233                .push(idx);
234        }
235
236        // Track correlation-only rules (generate == false is the default)
237        if !compiled.generate {
238            for rule_ref in &compiled.rule_refs {
239                self.correlation_only_rules.insert(rule_ref.clone());
240            }
241        }
242
243        self.correlations.push(compiled);
244        Ok(())
245    }
246
247    /// Add all rules and correlations from a parsed collection.
248    ///
249    /// Detection rules are added first (so they're available for correlation
250    /// references), then correlation rules.
251    pub fn add_collection(&mut self, collection: &SigmaCollection) -> Result<()> {
252        for rule in &collection.rules {
253            self.add_rule(rule)?;
254        }
255        // Apply filter rules to the inner engine's detection rules
256        for filter in &collection.filters {
257            self.engine.apply_filter(filter)?;
258        }
259        for corr in &collection.correlations {
260            self.add_correlation(corr)?;
261        }
262        self.validate_rule_refs()?;
263        self.detect_correlation_cycles()?;
264        Ok(())
265    }
266
267    /// Validate that every correlation's `rule_refs` resolve to at least one
268    /// known detection rule (by ID or name) or another correlation (by ID or name).
269    fn validate_rule_refs(&self) -> Result<()> {
270        let mut known: std::collections::HashSet<&str> = std::collections::HashSet::new();
271
272        for (id, name) in &self.rule_ids {
273            if let Some(id) = id {
274                known.insert(id.as_str());
275            }
276            if let Some(name) = name {
277                known.insert(name.as_str());
278            }
279        }
280        for corr in &self.correlations {
281            if let Some(ref id) = corr.id {
282                known.insert(id.as_str());
283            }
284            if let Some(ref name) = corr.name {
285                known.insert(name.as_str());
286            }
287        }
288
289        for corr in &self.correlations {
290            for rule_ref in &corr.rule_refs {
291                if !known.contains(rule_ref.as_str()) {
292                    return Err(EvalError::UnknownRuleRef(rule_ref.clone()));
293                }
294            }
295        }
296        Ok(())
297    }
298
299    /// Detect cycles in the correlation reference graph.
300    ///
301    /// Builds a directed graph where each correlation (identified by its id/name)
302    /// has edges to the correlations it references via `rule_refs`. Uses DFS with
303    /// a "gray/black" coloring scheme to detect back-edges (cycles).
304    ///
305    /// Returns `Err(EvalError::CorrelationCycle)` if a cycle is found.
306    fn detect_correlation_cycles(&self) -> Result<()> {
307        // Build a set of all correlation identifiers (id and/or name)
308        let mut corr_identifiers: HashMap<&str, usize> = HashMap::new();
309        for (idx, corr) in self.correlations.iter().enumerate() {
310            if let Some(ref id) = corr.id {
311                corr_identifiers.insert(id.as_str(), idx);
312            }
313            if let Some(ref name) = corr.name {
314                corr_identifiers.insert(name.as_str(), idx);
315            }
316        }
317
318        // Build adjacency list: corr index → set of corr indices it references
319        let mut adj: Vec<Vec<usize>> = vec![Vec::new(); self.correlations.len()];
320        for (idx, corr) in self.correlations.iter().enumerate() {
321            for rule_ref in &corr.rule_refs {
322                if let Some(&target_idx) = corr_identifiers.get(rule_ref.as_str()) {
323                    adj[idx].push(target_idx);
324                }
325            }
326        }
327
328        // DFS cycle detection with three states: white (unvisited), gray (in stack), black (done)
329        let mut state = vec![0u8; self.correlations.len()]; // 0=white, 1=gray, 2=black
330        let mut path: Vec<usize> = Vec::new();
331
332        for start in 0..self.correlations.len() {
333            if state[start] == 0
334                && let Some(cycle) = Self::dfs_find_cycle(start, &adj, &mut state, &mut path)
335            {
336                let names: Vec<String> = cycle
337                    .iter()
338                    .map(|&i| {
339                        self.correlations[i]
340                            .id
341                            .as_deref()
342                            .or(self.correlations[i].name.as_deref())
343                            .unwrap_or(&self.correlations[i].title)
344                            .to_string()
345                    })
346                    .collect();
347                return Err(crate::error::EvalError::CorrelationCycle(
348                    names.join(" -> "),
349                ));
350            }
351        }
352        Ok(())
353    }
354
355    /// DFS helper that returns the cycle path if a back-edge is found.
356    fn dfs_find_cycle(
357        node: usize,
358        adj: &[Vec<usize>],
359        state: &mut [u8],
360        path: &mut Vec<usize>,
361    ) -> Option<Vec<usize>> {
362        state[node] = 1; // gray
363        path.push(node);
364
365        for &next in &adj[node] {
366            if state[next] == 1 {
367                // Back-edge found — extract cycle from path
368                if let Some(pos) = path.iter().position(|&n| n == next) {
369                    let mut cycle = path[pos..].to_vec();
370                    cycle.push(next); // close the cycle
371                    return Some(cycle);
372                }
373            }
374            if state[next] == 0
375                && let Some(cycle) = Self::dfs_find_cycle(next, adj, state, path)
376            {
377                return Some(cycle);
378            }
379        }
380
381        path.pop();
382        state[node] = 2; // black
383        None
384    }
385
386    /// Process an event, extracting the timestamp from configured event fields.
387    ///
388    /// When no timestamp field is found, the `timestamp_fallback` policy applies:
389    /// - `WallClock`: use `Utc::now()` (good for real-time streaming)
390    /// - `Skip`: return detections only, skip correlation state updates
391    pub fn process_event(&mut self, event: &impl Event) -> ProcessResult {
392        let all_detections = self.engine.evaluate(event);
393
394        let ts = match self.extract_event_timestamp(event) {
395            Some(ts) => ts,
396            None => match self.config.timestamp_fallback {
397                TimestampFallback::WallClock => Utc::now().timestamp(),
398                TimestampFallback::Skip => {
399                    // Still run detection (stateless), but skip correlation
400                    let detections = self.filter_detections(all_detections);
401                    return ProcessResult {
402                        detections,
403                        correlations: Vec::new(),
404                    };
405                }
406            },
407        };
408        self.process_with_detections(event, all_detections, ts)
409    }
410
411    /// Process an event with an explicit Unix epoch timestamp (seconds).
412    ///
413    /// The timestamp is clamped to `[0, i64::MAX / 2]` to prevent overflow
414    /// when adding timespan durations internally.
415    pub fn process_event_at(&mut self, event: &impl Event, timestamp_secs: i64) -> ProcessResult {
416        let all_detections = self.engine.evaluate(event);
417        self.process_with_detections(event, all_detections, timestamp_secs)
418    }
419
420    /// Process an event with pre-computed detection results.
421    ///
422    /// Enables external parallelism: callers can run detection (via
423    /// [`evaluate`](Self::evaluate)) in parallel, then feed results here
424    /// sequentially for stateful correlation.
425    pub fn process_with_detections(
426        &mut self,
427        event: &impl Event,
428        all_detections: Vec<MatchResult>,
429        timestamp_secs: i64,
430    ) -> ProcessResult {
431        let timestamp_secs = timestamp_secs.clamp(0, i64::MAX / 2);
432
433        // Memory management — evict before adding new state to enforce limit
434        if self.state.len() >= self.config.max_state_entries {
435            self.evict_all(timestamp_secs);
436        }
437
438        // Feed detection matches into correlations
439        let mut correlations = Vec::new();
440        self.feed_detections(event, &all_detections, timestamp_secs, &mut correlations);
441
442        // Chain — correlation results may trigger higher-level correlations
443        self.chain_correlations(&correlations, timestamp_secs);
444
445        // Filter detections by generate flag
446        let detections = self.filter_detections(all_detections);
447
448        ProcessResult {
449            detections,
450            correlations,
451        }
452    }
453
454    /// Run stateless detection only (no correlation), delegating to the inner engine.
455    ///
456    /// Takes `&self` so it can be called concurrently from multiple threads
457    /// (e.g. via `rayon::par_iter`) while the mutable correlation phase runs
458    /// sequentially afterwards.
459    pub fn evaluate(&self, event: &impl Event) -> Vec<MatchResult> {
460        self.engine.evaluate(event)
461    }
462
463    /// Process a batch of events: parallel detection, then sequential correlation.
464    ///
465    /// When the `parallel` feature is enabled, the stateless detection phase runs
466    /// concurrently via rayon. Timestamp extraction also runs in the parallel
467    /// phase (it borrows `&self.config` immutably). After `collect()` releases the
468    /// immutable borrows, each event's pre-computed detections are fed into the
469    /// stateful correlation engine sequentially.
470    pub fn process_batch<E: Event + Sync>(&mut self, events: &[&E]) -> Vec<ProcessResult> {
471        // Borrow split: take immutable refs to fields needed for the parallel phase.
472        // These are released by collect() before the sequential &mut self phase.
473        let engine = &self.engine;
474        let ts_fields = &self.config.timestamp_fields;
475
476        let batch_results: Vec<(Vec<MatchResult>, Option<i64>)> = {
477            #[cfg(feature = "parallel")]
478            {
479                use rayon::prelude::*;
480                events
481                    .par_iter()
482                    .map(|e| {
483                        let detections = engine.evaluate(e);
484                        let ts = extract_event_ts(e, ts_fields);
485                        (detections, ts)
486                    })
487                    .collect()
488            }
489            #[cfg(not(feature = "parallel"))]
490            {
491                events
492                    .iter()
493                    .map(|e| {
494                        let detections = engine.evaluate(e);
495                        let ts = extract_event_ts(e, ts_fields);
496                        (detections, ts)
497                    })
498                    .collect()
499            }
500        };
501
502        // Sequential correlation phase
503        let mut results = Vec::with_capacity(events.len());
504        for ((detections, ts_opt), event) in batch_results.into_iter().zip(events) {
505            match ts_opt {
506                Some(ts) => {
507                    results.push(self.process_with_detections(event, detections, ts));
508                }
509                None => match self.config.timestamp_fallback {
510                    TimestampFallback::WallClock => {
511                        let ts = Utc::now().timestamp();
512                        results.push(self.process_with_detections(event, detections, ts));
513                    }
514                    TimestampFallback::Skip => {
515                        // Still return detection results, but skip correlation
516                        let detections = self.filter_detections(detections);
517                        results.push(ProcessResult {
518                            detections,
519                            correlations: Vec::new(),
520                        });
521                    }
522                },
523            }
524        }
525        results
526    }
527
528    /// Filter detections by the `generate` flag / `emit_detections` config.
529    ///
530    /// If `emit_detections` is false and some rules are correlation-only,
531    /// their detection output is suppressed.
532    fn filter_detections(&self, all_detections: Vec<MatchResult>) -> Vec<MatchResult> {
533        if !self.config.emit_detections && !self.correlation_only_rules.is_empty() {
534            all_detections
535                .into_iter()
536                .filter(|m| {
537                    let id_match = m
538                        .rule_id
539                        .as_ref()
540                        .is_some_and(|id| self.correlation_only_rules.contains(id));
541                    !id_match
542                })
543                .collect()
544        } else {
545            all_detections
546        }
547    }
548
549    /// Feed detection matches into correlation window states.
550    fn feed_detections(
551        &mut self,
552        event: &impl Event,
553        detections: &[MatchResult],
554        ts: i64,
555        out: &mut Vec<CorrelationResult>,
556    ) {
557        // Collect all (corr_idx, rule_id, rule_name) tuples upfront to avoid
558        // borrow conflicts between self.rule_ids and self.update_correlation.
559        let mut work: Vec<(usize, Option<String>, Option<String>)> = Vec::new();
560
561        for det in detections {
562            // Use the MatchResult's rule_id to find the original rule's ID/name.
563            // We also look up by rule_id in our rule_ids table for the name.
564            let (rule_id, rule_name) = self.find_rule_identity(det);
565
566            // Collect correlation indices that reference this rule
567            let mut corr_indices = Vec::new();
568            if let Some(ref id) = rule_id
569                && let Some(indices) = self.rule_index.get(id)
570            {
571                corr_indices.extend(indices);
572            }
573            if let Some(ref name) = rule_name
574                && let Some(indices) = self.rule_index.get(name)
575            {
576                corr_indices.extend(indices);
577            }
578
579            corr_indices.sort_unstable();
580            corr_indices.dedup();
581
582            for &corr_idx in &corr_indices {
583                work.push((corr_idx, rule_id.clone(), rule_name.clone()));
584            }
585        }
586
587        for (corr_idx, rule_id, rule_name) in work {
588            self.update_correlation(corr_idx, event, ts, &rule_id, &rule_name, out);
589        }
590    }
591
592    /// Find the (id, name) for a detection match by searching our rule_ids table.
593    fn find_rule_identity(&self, det: &MatchResult) -> (Option<String>, Option<String>) {
594        // First, try to find by matching rule_id in our table
595        if let Some(ref match_id) = det.rule_id {
596            for (id, name) in &self.rule_ids {
597                if id.as_deref() == Some(match_id.as_str()) {
598                    return (id.clone(), name.clone());
599                }
600            }
601        }
602        // Fall back to using just the MatchResult's rule_id
603        (det.rule_id.clone(), None)
604    }
605
606    /// Resolve the event mode for a given correlation.
607    fn resolve_event_mode(&self, corr_idx: usize) -> CorrelationEventMode {
608        let corr = &self.correlations[corr_idx];
609        corr.event_mode
610            .unwrap_or(self.config.correlation_event_mode)
611    }
612
613    /// Resolve the max events cap for a given correlation.
614    fn resolve_max_events(&self, corr_idx: usize) -> usize {
615        let corr = &self.correlations[corr_idx];
616        corr.max_events
617            .unwrap_or(self.config.max_correlation_events)
618    }
619
620    /// Update a single correlation's state and check its condition.
621    fn update_correlation(
622        &mut self,
623        corr_idx: usize,
624        event: &impl Event,
625        ts: i64,
626        rule_id: &Option<String>,
627        rule_name: &Option<String>,
628        out: &mut Vec<CorrelationResult>,
629    ) {
630        // Borrow the correlation by reference — no cloning needed.  Rust allows
631        // simultaneous &self.correlations and &mut self.state / &mut self.last_alert
632        // because they are disjoint struct fields.
633        let corr = &self.correlations[corr_idx];
634        let corr_type = corr.correlation_type;
635        let timespan = corr.timespan_secs;
636        let level = corr.level;
637        let suppress_secs = corr.suppress_secs.or(self.config.suppress);
638        let action = corr.action.unwrap_or(self.config.action_on_match);
639        let event_mode = self.resolve_event_mode(corr_idx);
640        let max_events = self.resolve_max_events(corr_idx);
641
642        // Determine the rule_ref strings for alias resolution and temporal tracking.
643        let mut ref_strs: Vec<&str> = Vec::new();
644        if let Some(id) = rule_id.as_deref() {
645            ref_strs.push(id);
646        }
647        if let Some(name) = rule_name.as_deref() {
648            ref_strs.push(name);
649        }
650        let rule_ref = rule_id.as_deref().or(rule_name.as_deref()).unwrap_or("");
651
652        // Extract group key
653        let group_key = GroupKey::extract(event, &corr.group_by, &ref_strs);
654
655        // Get or create window state
656        let state_key = (corr_idx, group_key.clone());
657        let state = self
658            .state
659            .entry(state_key.clone())
660            .or_insert_with(|| WindowState::new_for(corr_type));
661
662        // Evict expired entries
663        let cutoff = ts - timespan as i64;
664        state.evict(cutoff);
665
666        // Push the new event into the state
667        match corr_type {
668            CorrelationType::EventCount => {
669                state.push_event_count(ts);
670            }
671            CorrelationType::ValueCount => {
672                if let Some(ref fields) = corr.condition.field
673                    && let Some(field_name) = fields.first()
674                    && let Some(val) = event.get_field(field_name)
675                    && let Some(s) = value_to_string_for_count(&val)
676                {
677                    state.push_value_count(ts, s);
678                }
679            }
680            CorrelationType::Temporal | CorrelationType::TemporalOrdered => {
681                state.push_temporal(ts, rule_ref);
682            }
683            CorrelationType::ValueSum
684            | CorrelationType::ValueAvg
685            | CorrelationType::ValuePercentile
686            | CorrelationType::ValueMedian => {
687                if let Some(ref fields) = corr.condition.field
688                    && let Some(field_name) = fields.first()
689                    && let Some(val) = event.get_field(field_name)
690                    && let Some(n) = value_to_f64_ev(&val)
691                {
692                    state.push_numeric(ts, n);
693                }
694            }
695        }
696
697        // Push event into buffer based on event mode
698        match event_mode {
699            CorrelationEventMode::Full => {
700                let buf = self
701                    .event_buffers
702                    .entry(state_key.clone())
703                    .or_insert_with(|| EventBuffer::new(max_events));
704                buf.evict(cutoff);
705                let json = event.to_json();
706                buf.push(ts, &json);
707            }
708            CorrelationEventMode::Refs => {
709                let buf = self
710                    .event_ref_buffers
711                    .entry(state_key.clone())
712                    .or_insert_with(|| EventRefBuffer::new(max_events));
713                buf.evict(cutoff);
714                let json = event.to_json();
715                buf.push(ts, &json);
716            }
717            CorrelationEventMode::None => {}
718        }
719
720        // Check condition — after this, `state` is no longer used (NLL drops the borrow).
721        let fired = state.check_condition(
722            &corr.condition,
723            corr_type,
724            &corr.rule_refs,
725            corr.extended_expr.as_ref(),
726        );
727
728        if let Some(agg_value) = fired {
729            let alert_key = (corr_idx, group_key.clone());
730
731            // Suppression check: skip if we've already alerted within the suppress window
732            let suppressed = if let Some(suppress) = suppress_secs {
733                if let Some(&last_ts) = self.last_alert.get(&alert_key) {
734                    (ts - last_ts) < suppress as i64
735                } else {
736                    false
737                }
738            } else {
739                false
740            };
741
742            if !suppressed {
743                // Retrieve stored events / refs based on mode
744                let (events, event_refs) = match event_mode {
745                    CorrelationEventMode::Full => {
746                        let stored = self
747                            .event_buffers
748                            .get(&alert_key)
749                            .map(|buf| buf.decompress_all())
750                            .unwrap_or_default();
751                        (Some(stored), None)
752                    }
753                    CorrelationEventMode::Refs => {
754                        let stored = self
755                            .event_ref_buffers
756                            .get(&alert_key)
757                            .map(|buf| buf.refs())
758                            .unwrap_or_default();
759                        (None, Some(stored))
760                    }
761                    CorrelationEventMode::None => (None, None),
762                };
763
764                // Only clone title/id/tags when we actually produce output
765                let corr = &self.correlations[corr_idx];
766                let result = CorrelationResult {
767                    rule_title: corr.title.clone(),
768                    rule_id: corr.id.clone(),
769                    level,
770                    tags: corr.tags.clone(),
771                    correlation_type: corr_type,
772                    group_key: group_key.to_pairs(&corr.group_by),
773                    aggregated_value: agg_value,
774                    timespan_secs: timespan,
775                    events,
776                    event_refs,
777                    custom_attributes: corr.custom_attributes.clone(),
778                };
779                out.push(result);
780
781                // Record alert time for suppression
782                self.last_alert.insert(alert_key.clone(), ts);
783
784                // Action on match
785                if action == CorrelationAction::Reset {
786                    if let Some(state) = self.state.get_mut(&alert_key) {
787                        state.clear();
788                    }
789                    if let Some(buf) = self.event_buffers.get_mut(&alert_key) {
790                        buf.clear();
791                    }
792                    if let Some(buf) = self.event_ref_buffers.get_mut(&alert_key) {
793                        buf.clear();
794                    }
795                }
796            }
797        }
798    }
799
800    /// Propagate correlation results to higher-level correlations (chaining).
801    ///
802    /// When a correlation fires, any correlation that references it (by ID or name)
803    /// is updated. Limits chain depth to 10 to prevent infinite loops.
804    fn chain_correlations(&mut self, fired: &[CorrelationResult], ts: i64) {
805        const MAX_CHAIN_DEPTH: usize = 10;
806        let mut pending: Vec<CorrelationResult> = fired.to_vec();
807        let mut depth = 0;
808
809        while !pending.is_empty() && depth < MAX_CHAIN_DEPTH {
810            depth += 1;
811
812            // Collect work items: (corr_idx, group_key_pairs, fired_ref)
813            #[allow(clippy::type_complexity)]
814            let mut work: Vec<(usize, Vec<(String, String)>, String)> = Vec::new();
815            for result in &pending {
816                if let Some(ref id) = result.rule_id
817                    && let Some(indices) = self.rule_index.get(id)
818                {
819                    let fired_ref = result
820                        .rule_id
821                        .as_deref()
822                        .unwrap_or(&result.rule_title)
823                        .to_string();
824                    for &corr_idx in indices {
825                        work.push((corr_idx, result.group_key.clone(), fired_ref.clone()));
826                    }
827                }
828            }
829
830            let mut next_pending = Vec::new();
831            for (corr_idx, group_key_pairs, fired_ref) in work {
832                let corr = &self.correlations[corr_idx];
833                let corr_type = corr.correlation_type;
834                let timespan = corr.timespan_secs;
835                let level = corr.level;
836
837                let group_key = GroupKey::from_pairs(&group_key_pairs, &corr.group_by);
838                let state_key = (corr_idx, group_key.clone());
839                let state = self
840                    .state
841                    .entry(state_key)
842                    .or_insert_with(|| WindowState::new_for(corr_type));
843
844                let cutoff = ts - timespan as i64;
845                state.evict(cutoff);
846
847                match corr_type {
848                    CorrelationType::EventCount => {
849                        state.push_event_count(ts);
850                    }
851                    CorrelationType::Temporal | CorrelationType::TemporalOrdered => {
852                        state.push_temporal(ts, &fired_ref);
853                    }
854                    _ => {
855                        state.push_event_count(ts);
856                    }
857                }
858
859                let fired = state.check_condition(
860                    &corr.condition,
861                    corr_type,
862                    &corr.rule_refs,
863                    corr.extended_expr.as_ref(),
864                );
865
866                if let Some(agg_value) = fired {
867                    let corr = &self.correlations[corr_idx];
868                    next_pending.push(CorrelationResult {
869                        rule_title: corr.title.clone(),
870                        rule_id: corr.id.clone(),
871                        level,
872                        tags: corr.tags.clone(),
873                        correlation_type: corr_type,
874                        group_key: group_key.to_pairs(&corr.group_by),
875                        aggregated_value: agg_value,
876                        timespan_secs: timespan,
877                        // Chained correlations don't include events (they aggregate
878                        // over correlation results, not raw events)
879                        events: None,
880                        event_refs: None,
881                        custom_attributes: corr.custom_attributes.clone(),
882                    });
883                }
884            }
885
886            pending = next_pending;
887        }
888
889        if !pending.is_empty() {
890            log::warn!(
891                "Correlation chain depth limit reached ({MAX_CHAIN_DEPTH}); \
892                 {} pending result(s) were not propagated further. \
893                 This may indicate a cycle in correlation references.",
894                pending.len()
895            );
896        }
897    }
898
899    // =========================================================================
900    // Timestamp extraction
901    // =========================================================================
902
903    /// Extract a Unix epoch timestamp (seconds) from an event.
904    ///
905    /// Tries each configured timestamp field in order. Supports:
906    /// - Numeric values (epoch seconds, or epoch millis if > 1e12)
907    /// - ISO 8601 strings (e.g., "2024-07-10T12:30:00Z")
908    ///
909    /// Returns `None` if no field yields a valid timestamp.
910    fn extract_event_timestamp(&self, event: &impl Event) -> Option<i64> {
911        for field_name in &self.config.timestamp_fields {
912            if let Some(val) = event.get_field(field_name)
913                && let Some(ts) = parse_timestamp_value(&val)
914            {
915                return Some(ts);
916            }
917        }
918        None
919    }
920
921    // =========================================================================
922    // State management
923    // =========================================================================
924
925    /// Manually evict all expired state entries.
926    pub fn evict_expired(&mut self, now_secs: i64) {
927        self.evict_all(now_secs);
928    }
929
930    /// Evict expired entries and remove empty states.
931    fn evict_all(&mut self, now_secs: i64) {
932        // Phase 1: Time-based eviction — remove entries outside their correlation window
933        let timespans: Vec<u64> = self.correlations.iter().map(|c| c.timespan_secs).collect();
934
935        self.state.retain(|&(corr_idx, _), state| {
936            if corr_idx < timespans.len() {
937                let cutoff = now_secs - timespans[corr_idx] as i64;
938                state.evict(cutoff);
939            }
940            !state.is_empty()
941        });
942
943        // Evict event buffers in sync with window state
944        self.event_buffers.retain(|&(corr_idx, _), buf| {
945            if corr_idx < timespans.len() {
946                let cutoff = now_secs - timespans[corr_idx] as i64;
947                buf.evict(cutoff);
948            }
949            !buf.is_empty()
950        });
951        self.event_ref_buffers.retain(|&(corr_idx, _), buf| {
952            if corr_idx < timespans.len() {
953                let cutoff = now_secs - timespans[corr_idx] as i64;
954                buf.evict(cutoff);
955            }
956            !buf.is_empty()
957        });
958
959        // Phase 2: Hard cap — if still over limit after time-based eviction (e.g.
960        // high-cardinality traffic with long windows), drop the stalest entries
961        // until we're at 90% capacity to avoid evicting on every single event.
962        if self.state.len() >= self.config.max_state_entries {
963            let target = self.config.max_state_entries * 9 / 10;
964            let excess = self.state.len() - target;
965
966            // Collect keys with their latest timestamp, sort by oldest first
967            let mut by_staleness: Vec<_> = self
968                .state
969                .iter()
970                .map(|(k, v)| (k.clone(), v.latest_timestamp().unwrap_or(i64::MIN)))
971                .collect();
972            by_staleness.sort_unstable_by_key(|&(_, ts)| ts);
973
974            // Drop the oldest entries (and their associated event buffers)
975            for (key, _) in by_staleness.into_iter().take(excess) {
976                self.state.remove(&key);
977                self.last_alert.remove(&key);
978                self.event_buffers.remove(&key);
979                self.event_ref_buffers.remove(&key);
980            }
981        }
982
983        // Phase 3: Evict stale last_alert entries — remove if the suppress window
984        // has passed or if the corresponding window state no longer exists.
985        self.last_alert.retain(|key, &mut alert_ts| {
986            let suppress = if key.0 < self.correlations.len() {
987                self.correlations[key.0]
988                    .suppress_secs
989                    .or(self.config.suppress)
990                    .unwrap_or(0)
991            } else {
992                0
993            };
994            (now_secs - alert_ts) < suppress as i64
995        });
996    }
997
998    /// Number of active state entries (for monitoring).
999    pub fn state_count(&self) -> usize {
1000        self.state.len()
1001    }
1002
1003    /// Number of detection rules loaded.
1004    pub fn detection_rule_count(&self) -> usize {
1005        self.engine.rule_count()
1006    }
1007
1008    /// Number of correlation rules loaded.
1009    pub fn correlation_rule_count(&self) -> usize {
1010        self.correlations.len()
1011    }
1012
1013    /// Number of active event buffers (for monitoring).
1014    pub fn event_buffer_count(&self) -> usize {
1015        self.event_buffers.len()
1016    }
1017
1018    /// Total compressed bytes across all event buffers (for monitoring).
1019    pub fn event_buffer_bytes(&self) -> usize {
1020        self.event_buffers
1021            .values()
1022            .map(|b| b.compressed_bytes())
1023            .sum()
1024    }
1025
1026    /// Number of active event ref buffers — `Refs` mode (for monitoring).
1027    pub fn event_ref_buffer_count(&self) -> usize {
1028        self.event_ref_buffers.len()
1029    }
1030
1031    /// Access the inner stateless engine.
1032    pub fn engine(&self) -> &Engine {
1033        &self.engine
1034    }
1035
1036    /// Export all mutable correlation state as a serializable snapshot.
1037    ///
1038    /// The snapshot uses stable correlation identifiers (id > name > title)
1039    /// instead of internal indices, so it survives rule reloads as long as
1040    /// the correlation rules keep the same identifiers.
1041    pub fn export_state(&self) -> CorrelationSnapshot {
1042        let mut windows: HashMap<String, Vec<(GroupKey, WindowState)>> = HashMap::new();
1043        for ((idx, gk), ws) in &self.state {
1044            let corr_id = self.correlation_stable_id(*idx);
1045            windows
1046                .entry(corr_id)
1047                .or_default()
1048                .push((gk.clone(), ws.clone()));
1049        }
1050
1051        let mut last_alert: HashMap<String, Vec<(GroupKey, i64)>> = HashMap::new();
1052        for ((idx, gk), ts) in &self.last_alert {
1053            let corr_id = self.correlation_stable_id(*idx);
1054            last_alert
1055                .entry(corr_id)
1056                .or_default()
1057                .push((gk.clone(), *ts));
1058        }
1059
1060        let mut event_buffers: HashMap<String, Vec<(GroupKey, EventBuffer)>> = HashMap::new();
1061        for ((idx, gk), buf) in &self.event_buffers {
1062            let corr_id = self.correlation_stable_id(*idx);
1063            event_buffers
1064                .entry(corr_id)
1065                .or_default()
1066                .push((gk.clone(), buf.clone()));
1067        }
1068
1069        let mut event_ref_buffers: HashMap<String, Vec<(GroupKey, EventRefBuffer)>> =
1070            HashMap::new();
1071        for ((idx, gk), buf) in &self.event_ref_buffers {
1072            let corr_id = self.correlation_stable_id(*idx);
1073            event_ref_buffers
1074                .entry(corr_id)
1075                .or_default()
1076                .push((gk.clone(), buf.clone()));
1077        }
1078
1079        CorrelationSnapshot {
1080            version: SNAPSHOT_VERSION,
1081            windows,
1082            last_alert,
1083            event_buffers,
1084            event_ref_buffers,
1085        }
1086    }
1087
1088    /// Import previously exported state, mapping stable identifiers back to
1089    /// current correlation indices. Entries whose identifiers no longer match
1090    /// any loaded correlation are silently dropped.
1091    ///
1092    /// Returns `false` (and imports nothing) if the snapshot version is
1093    /// incompatible with the current schema.
1094    pub fn import_state(&mut self, snapshot: CorrelationSnapshot) -> bool {
1095        if snapshot.version != SNAPSHOT_VERSION {
1096            return false;
1097        }
1098        let id_to_idx = self.build_id_to_index_map();
1099
1100        for (corr_id, groups) in snapshot.windows {
1101            if let Some(&idx) = id_to_idx.get(&corr_id) {
1102                for (gk, ws) in groups {
1103                    self.state.insert((idx, gk), ws);
1104                }
1105            }
1106        }
1107
1108        for (corr_id, groups) in snapshot.last_alert {
1109            if let Some(&idx) = id_to_idx.get(&corr_id) {
1110                for (gk, ts) in groups {
1111                    self.last_alert.insert((idx, gk), ts);
1112                }
1113            }
1114        }
1115
1116        for (corr_id, groups) in snapshot.event_buffers {
1117            if let Some(&idx) = id_to_idx.get(&corr_id) {
1118                for (gk, buf) in groups {
1119                    self.event_buffers.insert((idx, gk), buf);
1120                }
1121            }
1122        }
1123
1124        for (corr_id, groups) in snapshot.event_ref_buffers {
1125            if let Some(&idx) = id_to_idx.get(&corr_id) {
1126                for (gk, buf) in groups {
1127                    self.event_ref_buffers.insert((idx, gk), buf);
1128                }
1129            }
1130        }
1131
1132        true
1133    }
1134
1135    /// Stable identifier for a correlation rule: prefers id, then name, then title.
1136    fn correlation_stable_id(&self, idx: usize) -> String {
1137        let corr = &self.correlations[idx];
1138        corr.id
1139            .clone()
1140            .or_else(|| corr.name.clone())
1141            .unwrap_or_else(|| corr.title.clone())
1142    }
1143
1144    /// Build a reverse map from stable id → current correlation index.
1145    fn build_id_to_index_map(&self) -> HashMap<String, usize> {
1146        self.correlations
1147            .iter()
1148            .enumerate()
1149            .map(|(idx, _)| (self.correlation_stable_id(idx), idx))
1150            .collect()
1151    }
1152}
1153
1154impl Default for CorrelationEngine {
1155    fn default() -> Self {
1156        Self::new(CorrelationConfig::default())
1157    }
1158}
1159
1160// =============================================================================
1161// Timestamp parsing helpers
1162// =============================================================================
1163
1164/// Extract a timestamp from an event using the given field names.
1165///
1166/// Standalone version of `CorrelationEngine::extract_event_timestamp` for use
1167/// in contexts where borrowing `&self` is not possible (e.g. rayon closures).
1168fn extract_event_ts(event: &impl Event, timestamp_fields: &[String]) -> Option<i64> {
1169    for field_name in timestamp_fields {
1170        if let Some(val) = event.get_field(field_name)
1171            && let Some(ts) = parse_timestamp_value(&val)
1172        {
1173            return Some(ts);
1174        }
1175    }
1176    None
1177}
1178
1179/// Parse an [`EventValue`] as a Unix epoch timestamp in seconds.
1180fn parse_timestamp_value(val: &EventValue) -> Option<i64> {
1181    match val {
1182        EventValue::Int(i) => Some(normalize_epoch(*i)),
1183        EventValue::Float(f) => Some(normalize_epoch(*f as i64)),
1184        EventValue::Str(s) => parse_timestamp_string(s),
1185        _ => None,
1186    }
1187}
1188
1189/// Normalize an epoch value: if it looks like milliseconds (> year 33658),
1190/// convert to seconds.
1191fn normalize_epoch(v: i64) -> i64 {
1192    if v > 1_000_000_000_000 { v / 1000 } else { v }
1193}
1194
1195/// Parse a timestamp string. Tries ISO 8601 with timezone, then without.
1196fn parse_timestamp_string(s: &str) -> Option<i64> {
1197    // Try RFC 3339 / ISO 8601 with timezone
1198    if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
1199        return Some(dt.timestamp());
1200    }
1201
1202    // Try ISO 8601 without timezone (assume UTC)
1203    // Common formats: "2024-07-10T12:30:00", "2024-07-10 12:30:00"
1204    if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S") {
1205        return Some(Utc.from_utc_datetime(&naive).timestamp());
1206    }
1207    if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") {
1208        return Some(Utc.from_utc_datetime(&naive).timestamp());
1209    }
1210
1211    // Try with fractional seconds
1212    if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S%.f") {
1213        return Some(Utc.from_utc_datetime(&naive).timestamp());
1214    }
1215    if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f") {
1216        return Some(Utc.from_utc_datetime(&naive).timestamp());
1217    }
1218
1219    None
1220}
1221
1222/// Convert an [`EventValue`] to a string for value_count purposes.
1223fn value_to_string_for_count(v: &EventValue) -> Option<String> {
1224    match v {
1225        EventValue::Str(s) => Some(s.to_string()),
1226        EventValue::Int(n) => Some(n.to_string()),
1227        EventValue::Float(f) => Some(f.to_string()),
1228        EventValue::Bool(b) => Some(b.to_string()),
1229        EventValue::Null => Some("null".to_string()),
1230        _ => None,
1231    }
1232}
1233
1234/// Convert an [`EventValue`] to f64 for numeric aggregation.
1235fn value_to_f64_ev(v: &EventValue) -> Option<f64> {
1236    v.as_f64()
1237}