Skip to main content

rsigma_eval/correlation_engine/
mod.rs

1//! Stateful correlation engine with time-windowed aggregation.
2//!
3//! `CorrelationEngine` wraps the stateless `Engine` and adds support for
4//! Sigma correlation rules: `event_count`, `value_count`, `temporal`,
5//! `temporal_ordered`, `value_sum`, `value_avg`, `value_percentile`,
6//! and `value_median`.
7//!
8//! # Architecture
9//!
10//! 1. Events are first evaluated against detection rules (stateless)
11//! 2. Detection matches update correlation window state (stateful)
12//! 3. When a correlation condition is met, a `CorrelationResult` is emitted
13//! 4. Correlation results can chain into higher-level correlations
14
15#[cfg(test)]
16mod tests;
17mod types;
18
19pub use types::*;
20
21use std::collections::HashMap;
22
23use chrono::{DateTime, TimeZone, Utc};
24
25use rsigma_parser::{CorrelationRule, CorrelationType, SigmaCollection, SigmaRule};
26
27use crate::correlation::{
28    CompiledCorrelation, EventBuffer, EventRefBuffer, GroupKey, WindowState, compile_correlation,
29};
30use crate::engine::Engine;
31use crate::error::{EvalError, Result};
32use crate::event::{Event, EventValue};
33use crate::pipeline::{Pipeline, apply_pipelines, apply_pipelines_to_correlation};
34use crate::result::MatchResult;
35
36// =============================================================================
37// Correlation Engine
38// =============================================================================
39
/// Current snapshot schema version. Bump when the serialized format changes.
///
/// NOTE(review): no use of this constant is visible in this part of the file;
/// presumably the snapshot export/import code elsewhere reads it — confirm.
const SNAPSHOT_VERSION: u32 = 1;
42
/// Stateful correlation engine.
///
/// Wraps the stateless `Engine` for detection rules and adds time-windowed
/// correlation on top. Supports all eight Sigma correlation types
/// (`event_count`, `value_count`, `temporal`, `temporal_ordered`, `value_sum`,
/// `value_avg`, `value_percentile`, `value_median`) and chaining.
pub struct CorrelationEngine {
    /// Inner stateless detection engine.
    engine: Engine,
    /// Compiled correlation rules.
    correlations: Vec<CompiledCorrelation>,
    /// Maps rule ID/name -> indices into `correlations` that reference it.
    /// This allows quick lookup: "which correlations care about rule X?"
    rule_index: HashMap<String, Vec<usize>>,
    /// Maps detection rule index -> (rule_id, rule_name) for reverse lookup.
    /// Used to find which correlations a detection match triggers.
    rule_ids: Vec<(Option<String>, Option<String>)>,
    /// Per-(correlation_index, group_key) window state.
    state: HashMap<(usize, GroupKey), WindowState>,
    /// Last alert timestamp per (correlation_index, group_key) for suppression.
    last_alert: HashMap<(usize, GroupKey), i64>,
    /// Per-(correlation_index, group_key) compressed event buffer (`Full` mode).
    event_buffers: HashMap<(usize, GroupKey), EventBuffer>,
    /// Per-(correlation_index, group_key) event reference buffer (`Refs` mode).
    event_ref_buffers: HashMap<(usize, GroupKey), EventRefBuffer>,
    /// Set of detection rule IDs/names that are "correlation-only"
    /// (referenced by correlations where `generate == false`).
    /// Used to filter detection output when `config.emit_detections == false`.
    correlation_only_rules: std::collections::HashSet<String>,
    /// Configuration.
    config: CorrelationConfig,
    /// Processing pipelines applied to rules during add_rule.
    /// `add_pipeline` re-sorts this list by `priority` after every insertion.
    pipelines: Vec<Pipeline>,
}
75
76impl CorrelationEngine {
77    /// Create a new correlation engine with the given configuration.
78    pub fn new(config: CorrelationConfig) -> Self {
79        CorrelationEngine {
80            engine: Engine::new(),
81            correlations: Vec::new(),
82            rule_index: HashMap::new(),
83            rule_ids: Vec::new(),
84            state: HashMap::new(),
85            last_alert: HashMap::new(),
86            event_buffers: HashMap::new(),
87            event_ref_buffers: HashMap::new(),
88            correlation_only_rules: std::collections::HashSet::new(),
89            config,
90            pipelines: Vec::new(),
91        }
92    }
93
94    /// Add a pipeline to the engine.
95    ///
96    /// Pipelines are applied to rules during `add_rule` / `add_collection`.
97    pub fn add_pipeline(&mut self, pipeline: Pipeline) {
98        self.pipelines.push(pipeline);
99        self.pipelines.sort_by_key(|p| p.priority);
100    }
101
102    /// Set global `include_event` on the inner detection engine.
103    pub fn set_include_event(&mut self, include: bool) {
104        self.engine.set_include_event(include);
105    }
106
107    /// Set the global correlation event mode.
108    ///
109    /// - `None`: no event storage (default)
110    /// - `Full`: compressed event bodies
111    /// - `Refs`: lightweight timestamp + ID references
112    pub fn set_correlation_event_mode(&mut self, mode: CorrelationEventMode) {
113        self.config.correlation_event_mode = mode;
114    }
115
116    /// Set the maximum number of events to store per correlation window group.
117    /// Only meaningful when `correlation_event_mode` is not `None`.
118    pub fn set_max_correlation_events(&mut self, max: usize) {
119        self.config.max_correlation_events = max;
120    }
121
122    /// Add a single detection rule.
123    ///
124    /// If pipelines are set, the rule is cloned and transformed before compilation.
125    /// The inner engine receives the already-transformed rule directly (not through
126    /// its own pipeline, to avoid double transformation).
127    pub fn add_rule(&mut self, rule: &SigmaRule) -> Result<()> {
128        if self.pipelines.is_empty() {
129            self.apply_custom_attributes(&rule.custom_attributes);
130            self.rule_ids.push((rule.id.clone(), rule.name.clone()));
131            self.engine.add_rule(rule)?;
132        } else {
133            let mut transformed = rule.clone();
134            apply_pipelines(&self.pipelines, &mut transformed)?;
135            self.apply_custom_attributes(&transformed.custom_attributes);
136            self.rule_ids
137                .push((transformed.id.clone(), transformed.name.clone()));
138            // Use compile_rule + add_compiled_rule to bypass inner engine's pipelines
139            let compiled = crate::compiler::compile_rule(&transformed)?;
140            self.engine.add_compiled_rule(compiled);
141        }
142        Ok(())
143    }
144
145    /// Read `rsigma.*` custom attributes from a rule and apply them to the
146    /// engine configuration.  This allows pipelines to influence engine
147    /// behaviour via `SetCustomAttribute` transformations — the same pattern
148    /// used by pySigma backends (e.g. pySigma-backend-loki).
149    ///
150    /// Supported attributes:
151    /// - `rsigma.timestamp_field` — prepends a field name to the timestamp
152    ///   extraction priority list so the correlation engine can find the
153    ///   event timestamp in non-standard field names.
154    /// - `rsigma.suppress` — sets the default suppression window (e.g. `5m`).
155    ///   Only applied when the CLI did not already set `--suppress`.
156    /// - `rsigma.action` — sets the default post-fire action (`alert`/`reset`).
157    ///   Only applied when the CLI did not already set `--action`.
158    fn apply_custom_attributes(
159        &mut self,
160        attrs: &std::collections::HashMap<String, serde_yaml::Value>,
161    ) {
162        // rsigma.timestamp_field — prepend to priority list, skip duplicates
163        if let Some(field) = attrs.get("rsigma.timestamp_field").and_then(|v| v.as_str())
164            && !self.config.timestamp_fields.iter().any(|f| f == field)
165        {
166            self.config.timestamp_fields.insert(0, field.to_string());
167        }
168
169        // rsigma.suppress — only when CLI didn't already set one
170        if let Some(val) = attrs.get("rsigma.suppress").and_then(|v| v.as_str())
171            && self.config.suppress.is_none()
172            && let Ok(ts) = rsigma_parser::Timespan::parse(val)
173        {
174            self.config.suppress = Some(ts.seconds);
175        }
176
177        // rsigma.action — only when CLI left it at the default (Alert)
178        if let Some(val) = attrs.get("rsigma.action").and_then(|v| v.as_str())
179            && self.config.action_on_match == CorrelationAction::Alert
180            && let Ok(a) = val.parse::<CorrelationAction>()
181        {
182            self.config.action_on_match = a;
183        }
184    }
185
186    /// Add a single correlation rule.
187    pub fn add_correlation(&mut self, corr: &CorrelationRule) -> Result<()> {
188        let owned;
189        let effective = if self.pipelines.is_empty() {
190            corr
191        } else {
192            owned = {
193                let mut c = corr.clone();
194                apply_pipelines_to_correlation(&self.pipelines, &mut c)?;
195                c
196            };
197            &owned
198        };
199
200        // Apply engine-level custom attributes from the (possibly transformed)
201        // correlation rule (e.g. rsigma.timestamp_field).
202        self.apply_custom_attributes(&effective.custom_attributes);
203
204        let compiled = compile_correlation(effective)?;
205        let idx = self.correlations.len();
206
207        // Index by each referenced rule ID/name
208        for rule_ref in &compiled.rule_refs {
209            self.rule_index
210                .entry(rule_ref.clone())
211                .or_default()
212                .push(idx);
213        }
214
215        // Track correlation-only rules (generate == false is the default)
216        if !compiled.generate {
217            for rule_ref in &compiled.rule_refs {
218                self.correlation_only_rules.insert(rule_ref.clone());
219            }
220        }
221
222        self.correlations.push(compiled);
223        Ok(())
224    }
225
226    /// Add all rules and correlations from a parsed collection.
227    ///
228    /// Detection rules are added first (so they're available for correlation
229    /// references), then correlation rules.
230    pub fn add_collection(&mut self, collection: &SigmaCollection) -> Result<()> {
231        for rule in &collection.rules {
232            self.add_rule(rule)?;
233        }
234        // Apply filter rules to the inner engine's detection rules
235        for filter in &collection.filters {
236            self.engine.apply_filter(filter)?;
237        }
238        for corr in &collection.correlations {
239            self.add_correlation(corr)?;
240        }
241        self.validate_rule_refs()?;
242        self.detect_correlation_cycles()?;
243        Ok(())
244    }
245
246    /// Validate that every correlation's `rule_refs` resolve to at least one
247    /// known detection rule (by ID or name) or another correlation (by ID or name).
248    fn validate_rule_refs(&self) -> Result<()> {
249        let mut known: std::collections::HashSet<&str> = std::collections::HashSet::new();
250
251        for (id, name) in &self.rule_ids {
252            if let Some(id) = id {
253                known.insert(id.as_str());
254            }
255            if let Some(name) = name {
256                known.insert(name.as_str());
257            }
258        }
259        for corr in &self.correlations {
260            if let Some(ref id) = corr.id {
261                known.insert(id.as_str());
262            }
263            if let Some(ref name) = corr.name {
264                known.insert(name.as_str());
265            }
266        }
267
268        for corr in &self.correlations {
269            for rule_ref in &corr.rule_refs {
270                if !known.contains(rule_ref.as_str()) {
271                    return Err(EvalError::UnknownRuleRef(rule_ref.clone()));
272                }
273            }
274        }
275        Ok(())
276    }
277
278    /// Detect cycles in the correlation reference graph.
279    ///
280    /// Builds a directed graph where each correlation (identified by its id/name)
281    /// has edges to the correlations it references via `rule_refs`. Uses DFS with
282    /// a "gray/black" coloring scheme to detect back-edges (cycles).
283    ///
284    /// Returns `Err(EvalError::CorrelationCycle)` if a cycle is found.
285    fn detect_correlation_cycles(&self) -> Result<()> {
286        // Build a set of all correlation identifiers (id and/or name)
287        let mut corr_identifiers: HashMap<&str, usize> = HashMap::new();
288        for (idx, corr) in self.correlations.iter().enumerate() {
289            if let Some(ref id) = corr.id {
290                corr_identifiers.insert(id.as_str(), idx);
291            }
292            if let Some(ref name) = corr.name {
293                corr_identifiers.insert(name.as_str(), idx);
294            }
295        }
296
297        // Build adjacency list: corr index → set of corr indices it references
298        let mut adj: Vec<Vec<usize>> = vec![Vec::new(); self.correlations.len()];
299        for (idx, corr) in self.correlations.iter().enumerate() {
300            for rule_ref in &corr.rule_refs {
301                if let Some(&target_idx) = corr_identifiers.get(rule_ref.as_str()) {
302                    adj[idx].push(target_idx);
303                }
304            }
305        }
306
307        // DFS cycle detection with three states: white (unvisited), gray (in stack), black (done)
308        let mut state = vec![0u8; self.correlations.len()]; // 0=white, 1=gray, 2=black
309        let mut path: Vec<usize> = Vec::new();
310
311        for start in 0..self.correlations.len() {
312            if state[start] == 0
313                && let Some(cycle) = Self::dfs_find_cycle(start, &adj, &mut state, &mut path)
314            {
315                let names: Vec<String> = cycle
316                    .iter()
317                    .map(|&i| {
318                        self.correlations[i]
319                            .id
320                            .as_deref()
321                            .or(self.correlations[i].name.as_deref())
322                            .unwrap_or(&self.correlations[i].title)
323                            .to_string()
324                    })
325                    .collect();
326                return Err(crate::error::EvalError::CorrelationCycle(
327                    names.join(" -> "),
328                ));
329            }
330        }
331        Ok(())
332    }
333
334    /// DFS helper that returns the cycle path if a back-edge is found.
335    fn dfs_find_cycle(
336        node: usize,
337        adj: &[Vec<usize>],
338        state: &mut [u8],
339        path: &mut Vec<usize>,
340    ) -> Option<Vec<usize>> {
341        state[node] = 1; // gray
342        path.push(node);
343
344        for &next in &adj[node] {
345            if state[next] == 1 {
346                // Back-edge found — extract cycle from path
347                if let Some(pos) = path.iter().position(|&n| n == next) {
348                    let mut cycle = path[pos..].to_vec();
349                    cycle.push(next); // close the cycle
350                    return Some(cycle);
351                }
352            }
353            if state[next] == 0
354                && let Some(cycle) = Self::dfs_find_cycle(next, adj, state, path)
355            {
356                return Some(cycle);
357            }
358        }
359
360        path.pop();
361        state[node] = 2; // black
362        None
363    }
364
365    /// Process an event, extracting the timestamp from configured event fields.
366    ///
367    /// When no timestamp field is found, the `timestamp_fallback` policy applies:
368    /// - `WallClock`: use `Utc::now()` (good for real-time streaming)
369    /// - `Skip`: return detections only, skip correlation state updates
370    pub fn process_event(&mut self, event: &impl Event) -> ProcessResult {
371        let all_detections = self.engine.evaluate(event);
372
373        let ts = match self.extract_event_timestamp(event) {
374            Some(ts) => ts,
375            None => match self.config.timestamp_fallback {
376                TimestampFallback::WallClock => Utc::now().timestamp(),
377                TimestampFallback::Skip => {
378                    // Still run detection (stateless), but skip correlation
379                    let detections = self.filter_detections(all_detections);
380                    return ProcessResult {
381                        detections,
382                        correlations: Vec::new(),
383                    };
384                }
385            },
386        };
387        self.process_with_detections(event, all_detections, ts)
388    }
389
390    /// Process an event with an explicit Unix epoch timestamp (seconds).
391    ///
392    /// The timestamp is clamped to `[0, i64::MAX / 2]` to prevent overflow
393    /// when adding timespan durations internally.
394    pub fn process_event_at(&mut self, event: &impl Event, timestamp_secs: i64) -> ProcessResult {
395        let all_detections = self.engine.evaluate(event);
396        self.process_with_detections(event, all_detections, timestamp_secs)
397    }
398
399    /// Process an event with pre-computed detection results.
400    ///
401    /// Enables external parallelism: callers can run detection (via
402    /// [`evaluate`](Self::evaluate)) in parallel, then feed results here
403    /// sequentially for stateful correlation.
404    pub fn process_with_detections(
405        &mut self,
406        event: &impl Event,
407        all_detections: Vec<MatchResult>,
408        timestamp_secs: i64,
409    ) -> ProcessResult {
410        let timestamp_secs = timestamp_secs.clamp(0, i64::MAX / 2);
411
412        // Memory management — evict before adding new state to enforce limit
413        if self.state.len() >= self.config.max_state_entries {
414            self.evict_all(timestamp_secs);
415        }
416
417        // Feed detection matches into correlations
418        let mut correlations = Vec::new();
419        self.feed_detections(event, &all_detections, timestamp_secs, &mut correlations);
420
421        // Chain — correlation results may trigger higher-level correlations
422        self.chain_correlations(&correlations, timestamp_secs);
423
424        // Filter detections by generate flag
425        let detections = self.filter_detections(all_detections);
426
427        ProcessResult {
428            detections,
429            correlations,
430        }
431    }
432
433    /// Run stateless detection only (no correlation), delegating to the inner engine.
434    ///
435    /// Takes `&self` so it can be called concurrently from multiple threads
436    /// (e.g. via `rayon::par_iter`) while the mutable correlation phase runs
437    /// sequentially afterwards.
438    pub fn evaluate(&self, event: &impl Event) -> Vec<MatchResult> {
439        self.engine.evaluate(event)
440    }
441
442    /// Process a batch of events: parallel detection, then sequential correlation.
443    ///
444    /// When the `parallel` feature is enabled, the stateless detection phase runs
445    /// concurrently via rayon. Timestamp extraction also runs in the parallel
446    /// phase (it borrows `&self.config` immutably). After `collect()` releases the
447    /// immutable borrows, each event's pre-computed detections are fed into the
448    /// stateful correlation engine sequentially.
449    pub fn process_batch<E: Event + Sync>(&mut self, events: &[&E]) -> Vec<ProcessResult> {
450        // Borrow split: take immutable refs to fields needed for the parallel phase.
451        // These are released by collect() before the sequential &mut self phase.
452        let engine = &self.engine;
453        let ts_fields = &self.config.timestamp_fields;
454
455        let batch_results: Vec<(Vec<MatchResult>, Option<i64>)> = {
456            #[cfg(feature = "parallel")]
457            {
458                use rayon::prelude::*;
459                events
460                    .par_iter()
461                    .map(|e| {
462                        let detections = engine.evaluate(e);
463                        let ts = extract_event_ts(e, ts_fields);
464                        (detections, ts)
465                    })
466                    .collect()
467            }
468            #[cfg(not(feature = "parallel"))]
469            {
470                events
471                    .iter()
472                    .map(|e| {
473                        let detections = engine.evaluate(e);
474                        let ts = extract_event_ts(e, ts_fields);
475                        (detections, ts)
476                    })
477                    .collect()
478            }
479        };
480
481        // Sequential correlation phase
482        let mut results = Vec::with_capacity(events.len());
483        for ((detections, ts_opt), event) in batch_results.into_iter().zip(events) {
484            match ts_opt {
485                Some(ts) => {
486                    results.push(self.process_with_detections(event, detections, ts));
487                }
488                None => match self.config.timestamp_fallback {
489                    TimestampFallback::WallClock => {
490                        let ts = Utc::now().timestamp();
491                        results.push(self.process_with_detections(event, detections, ts));
492                    }
493                    TimestampFallback::Skip => {
494                        // Still return detection results, but skip correlation
495                        let detections = self.filter_detections(detections);
496                        results.push(ProcessResult {
497                            detections,
498                            correlations: Vec::new(),
499                        });
500                    }
501                },
502            }
503        }
504        results
505    }
506
507    /// Filter detections by the `generate` flag / `emit_detections` config.
508    ///
509    /// If `emit_detections` is false and some rules are correlation-only,
510    /// their detection output is suppressed.
511    fn filter_detections(&self, all_detections: Vec<MatchResult>) -> Vec<MatchResult> {
512        if !self.config.emit_detections && !self.correlation_only_rules.is_empty() {
513            all_detections
514                .into_iter()
515                .filter(|m| {
516                    let id_match = m
517                        .rule_id
518                        .as_ref()
519                        .is_some_and(|id| self.correlation_only_rules.contains(id));
520                    !id_match
521                })
522                .collect()
523        } else {
524            all_detections
525        }
526    }
527
528    /// Feed detection matches into correlation window states.
529    fn feed_detections(
530        &mut self,
531        event: &impl Event,
532        detections: &[MatchResult],
533        ts: i64,
534        out: &mut Vec<CorrelationResult>,
535    ) {
536        // Collect all (corr_idx, rule_id, rule_name) tuples upfront to avoid
537        // borrow conflicts between self.rule_ids and self.update_correlation.
538        let mut work: Vec<(usize, Option<String>, Option<String>)> = Vec::new();
539
540        for det in detections {
541            // Use the MatchResult's rule_id to find the original rule's ID/name.
542            // We also look up by rule_id in our rule_ids table for the name.
543            let (rule_id, rule_name) = self.find_rule_identity(det);
544
545            // Collect correlation indices that reference this rule
546            let mut corr_indices = Vec::new();
547            if let Some(ref id) = rule_id
548                && let Some(indices) = self.rule_index.get(id)
549            {
550                corr_indices.extend(indices);
551            }
552            if let Some(ref name) = rule_name
553                && let Some(indices) = self.rule_index.get(name)
554            {
555                corr_indices.extend(indices);
556            }
557
558            corr_indices.sort_unstable();
559            corr_indices.dedup();
560
561            for &corr_idx in &corr_indices {
562                work.push((corr_idx, rule_id.clone(), rule_name.clone()));
563            }
564        }
565
566        for (corr_idx, rule_id, rule_name) in work {
567            self.update_correlation(corr_idx, event, ts, &rule_id, &rule_name, out);
568        }
569    }
570
571    /// Find the (id, name) for a detection match by searching our rule_ids table.
572    fn find_rule_identity(&self, det: &MatchResult) -> (Option<String>, Option<String>) {
573        // First, try to find by matching rule_id in our table
574        if let Some(ref match_id) = det.rule_id {
575            for (id, name) in &self.rule_ids {
576                if id.as_deref() == Some(match_id.as_str()) {
577                    return (id.clone(), name.clone());
578                }
579            }
580        }
581        // Fall back to using just the MatchResult's rule_id
582        (det.rule_id.clone(), None)
583    }
584
585    /// Resolve the event mode for a given correlation.
586    fn resolve_event_mode(&self, corr_idx: usize) -> CorrelationEventMode {
587        let corr = &self.correlations[corr_idx];
588        corr.event_mode
589            .unwrap_or(self.config.correlation_event_mode)
590    }
591
592    /// Resolve the max events cap for a given correlation.
593    fn resolve_max_events(&self, corr_idx: usize) -> usize {
594        let corr = &self.correlations[corr_idx];
595        corr.max_events
596            .unwrap_or(self.config.max_correlation_events)
597    }
598
599    /// Update a single correlation's state and check its condition.
600    fn update_correlation(
601        &mut self,
602        corr_idx: usize,
603        event: &impl Event,
604        ts: i64,
605        rule_id: &Option<String>,
606        rule_name: &Option<String>,
607        out: &mut Vec<CorrelationResult>,
608    ) {
609        // Borrow the correlation by reference — no cloning needed.  Rust allows
610        // simultaneous &self.correlations and &mut self.state / &mut self.last_alert
611        // because they are disjoint struct fields.
612        let corr = &self.correlations[corr_idx];
613        let corr_type = corr.correlation_type;
614        let timespan = corr.timespan_secs;
615        let level = corr.level;
616        let suppress_secs = corr.suppress_secs.or(self.config.suppress);
617        let action = corr.action.unwrap_or(self.config.action_on_match);
618        let event_mode = self.resolve_event_mode(corr_idx);
619        let max_events = self.resolve_max_events(corr_idx);
620
621        // Determine the rule_ref strings for alias resolution and temporal tracking.
622        let mut ref_strs: Vec<&str> = Vec::new();
623        if let Some(id) = rule_id.as_deref() {
624            ref_strs.push(id);
625        }
626        if let Some(name) = rule_name.as_deref() {
627            ref_strs.push(name);
628        }
629        let rule_ref = rule_id.as_deref().or(rule_name.as_deref()).unwrap_or("");
630
631        // Extract group key
632        let group_key = GroupKey::extract(event, &corr.group_by, &ref_strs);
633
634        // Get or create window state
635        let state_key = (corr_idx, group_key.clone());
636        let state = self
637            .state
638            .entry(state_key.clone())
639            .or_insert_with(|| WindowState::new_for(corr_type));
640
641        // Evict expired entries
642        let cutoff = ts - timespan as i64;
643        state.evict(cutoff);
644
645        // Push the new event into the state
646        match corr_type {
647            CorrelationType::EventCount => {
648                state.push_event_count(ts);
649            }
650            CorrelationType::ValueCount => {
651                if let Some(ref fields) = corr.condition.field
652                    && let Some(field_name) = fields.first()
653                    && let Some(val) = event.get_field(field_name)
654                    && let Some(s) = value_to_string_for_count(&val)
655                {
656                    state.push_value_count(ts, s);
657                }
658            }
659            CorrelationType::Temporal | CorrelationType::TemporalOrdered => {
660                state.push_temporal(ts, rule_ref);
661            }
662            CorrelationType::ValueSum
663            | CorrelationType::ValueAvg
664            | CorrelationType::ValuePercentile
665            | CorrelationType::ValueMedian => {
666                if let Some(ref fields) = corr.condition.field
667                    && let Some(field_name) = fields.first()
668                    && let Some(val) = event.get_field(field_name)
669                    && let Some(n) = value_to_f64_ev(&val)
670                {
671                    state.push_numeric(ts, n);
672                }
673            }
674        }
675
676        // Push event into buffer based on event mode
677        match event_mode {
678            CorrelationEventMode::Full => {
679                let buf = self
680                    .event_buffers
681                    .entry(state_key.clone())
682                    .or_insert_with(|| EventBuffer::new(max_events));
683                buf.evict(cutoff);
684                let json = event.to_json();
685                buf.push(ts, &json);
686            }
687            CorrelationEventMode::Refs => {
688                let buf = self
689                    .event_ref_buffers
690                    .entry(state_key.clone())
691                    .or_insert_with(|| EventRefBuffer::new(max_events));
692                buf.evict(cutoff);
693                let json = event.to_json();
694                buf.push(ts, &json);
695            }
696            CorrelationEventMode::None => {}
697        }
698
699        // Check condition — after this, `state` is no longer used (NLL drops the borrow).
700        let fired = state.check_condition(
701            &corr.condition,
702            corr_type,
703            &corr.rule_refs,
704            corr.extended_expr.as_ref(),
705        );
706
707        if let Some(agg_value) = fired {
708            let alert_key = (corr_idx, group_key.clone());
709
710            // Suppression check: skip if we've already alerted within the suppress window
711            let suppressed = if let Some(suppress) = suppress_secs {
712                if let Some(&last_ts) = self.last_alert.get(&alert_key) {
713                    (ts - last_ts) < suppress as i64
714                } else {
715                    false
716                }
717            } else {
718                false
719            };
720
721            if !suppressed {
722                // Retrieve stored events / refs based on mode
723                let (events, event_refs) = match event_mode {
724                    CorrelationEventMode::Full => {
725                        let stored = self
726                            .event_buffers
727                            .get(&alert_key)
728                            .map(|buf| buf.decompress_all())
729                            .unwrap_or_default();
730                        (Some(stored), None)
731                    }
732                    CorrelationEventMode::Refs => {
733                        let stored = self
734                            .event_ref_buffers
735                            .get(&alert_key)
736                            .map(|buf| buf.refs())
737                            .unwrap_or_default();
738                        (None, Some(stored))
739                    }
740                    CorrelationEventMode::None => (None, None),
741                };
742
743                // Only clone title/id/tags when we actually produce output
744                let corr = &self.correlations[corr_idx];
745                let result = CorrelationResult {
746                    rule_title: corr.title.clone(),
747                    rule_id: corr.id.clone(),
748                    level,
749                    tags: corr.tags.clone(),
750                    correlation_type: corr_type,
751                    group_key: group_key.to_pairs(&corr.group_by),
752                    aggregated_value: agg_value,
753                    timespan_secs: timespan,
754                    events,
755                    event_refs,
756                    custom_attributes: corr.custom_attributes.clone(),
757                };
758                out.push(result);
759
760                // Record alert time for suppression
761                self.last_alert.insert(alert_key.clone(), ts);
762
763                // Action on match
764                if action == CorrelationAction::Reset {
765                    if let Some(state) = self.state.get_mut(&alert_key) {
766                        state.clear();
767                    }
768                    if let Some(buf) = self.event_buffers.get_mut(&alert_key) {
769                        buf.clear();
770                    }
771                    if let Some(buf) = self.event_ref_buffers.get_mut(&alert_key) {
772                        buf.clear();
773                    }
774                }
775            }
776        }
777    }
778
779    /// Propagate correlation results to higher-level correlations (chaining).
780    ///
781    /// When a correlation fires, any correlation that references it (by ID or name)
782    /// is updated. Limits chain depth to 10 to prevent infinite loops.
783    fn chain_correlations(&mut self, fired: &[CorrelationResult], ts: i64) {
784        const MAX_CHAIN_DEPTH: usize = 10;
785        let mut pending: Vec<CorrelationResult> = fired.to_vec();
786        let mut depth = 0;
787
788        while !pending.is_empty() && depth < MAX_CHAIN_DEPTH {
789            depth += 1;
790
791            // Collect work items: (corr_idx, group_key_pairs, fired_ref)
792            #[allow(clippy::type_complexity)]
793            let mut work: Vec<(usize, Vec<(String, String)>, String)> = Vec::new();
794            for result in &pending {
795                if let Some(ref id) = result.rule_id
796                    && let Some(indices) = self.rule_index.get(id)
797                {
798                    let fired_ref = result
799                        .rule_id
800                        .as_deref()
801                        .unwrap_or(&result.rule_title)
802                        .to_string();
803                    for &corr_idx in indices {
804                        work.push((corr_idx, result.group_key.clone(), fired_ref.clone()));
805                    }
806                }
807            }
808
809            let mut next_pending = Vec::new();
810            for (corr_idx, group_key_pairs, fired_ref) in work {
811                let corr = &self.correlations[corr_idx];
812                let corr_type = corr.correlation_type;
813                let timespan = corr.timespan_secs;
814                let level = corr.level;
815
816                let group_key = GroupKey::from_pairs(&group_key_pairs, &corr.group_by);
817                let state_key = (corr_idx, group_key.clone());
818                let state = self
819                    .state
820                    .entry(state_key)
821                    .or_insert_with(|| WindowState::new_for(corr_type));
822
823                let cutoff = ts - timespan as i64;
824                state.evict(cutoff);
825
826                match corr_type {
827                    CorrelationType::EventCount => {
828                        state.push_event_count(ts);
829                    }
830                    CorrelationType::Temporal | CorrelationType::TemporalOrdered => {
831                        state.push_temporal(ts, &fired_ref);
832                    }
833                    _ => {
834                        state.push_event_count(ts);
835                    }
836                }
837
838                let fired = state.check_condition(
839                    &corr.condition,
840                    corr_type,
841                    &corr.rule_refs,
842                    corr.extended_expr.as_ref(),
843                );
844
845                if let Some(agg_value) = fired {
846                    let corr = &self.correlations[corr_idx];
847                    next_pending.push(CorrelationResult {
848                        rule_title: corr.title.clone(),
849                        rule_id: corr.id.clone(),
850                        level,
851                        tags: corr.tags.clone(),
852                        correlation_type: corr_type,
853                        group_key: group_key.to_pairs(&corr.group_by),
854                        aggregated_value: agg_value,
855                        timespan_secs: timespan,
856                        // Chained correlations don't include events (they aggregate
857                        // over correlation results, not raw events)
858                        events: None,
859                        event_refs: None,
860                        custom_attributes: corr.custom_attributes.clone(),
861                    });
862                }
863            }
864
865            pending = next_pending;
866        }
867
868        if !pending.is_empty() {
869            log::warn!(
870                "Correlation chain depth limit reached ({MAX_CHAIN_DEPTH}); \
871                 {} pending result(s) were not propagated further. \
872                 This may indicate a cycle in correlation references.",
873                pending.len()
874            );
875        }
876    }
877
878    // =========================================================================
879    // Timestamp extraction
880    // =========================================================================
881
882    /// Extract a Unix epoch timestamp (seconds) from an event.
883    ///
884    /// Tries each configured timestamp field in order. Supports:
885    /// - Numeric values (epoch seconds, or epoch millis if > 1e12)
886    /// - ISO 8601 strings (e.g., "2024-07-10T12:30:00Z")
887    ///
888    /// Returns `None` if no field yields a valid timestamp.
889    fn extract_event_timestamp(&self, event: &impl Event) -> Option<i64> {
890        for field_name in &self.config.timestamp_fields {
891            if let Some(val) = event.get_field(field_name)
892                && let Some(ts) = parse_timestamp_value(&val)
893            {
894                return Some(ts);
895            }
896        }
897        None
898    }
899
900    // =========================================================================
901    // State management
902    // =========================================================================
903
904    /// Manually evict all expired state entries.
905    pub fn evict_expired(&mut self, now_secs: i64) {
906        self.evict_all(now_secs);
907    }
908
909    /// Evict expired entries and remove empty states.
910    fn evict_all(&mut self, now_secs: i64) {
911        // Phase 1: Time-based eviction — remove entries outside their correlation window
912        let timespans: Vec<u64> = self.correlations.iter().map(|c| c.timespan_secs).collect();
913
914        self.state.retain(|&(corr_idx, _), state| {
915            if corr_idx < timespans.len() {
916                let cutoff = now_secs - timespans[corr_idx] as i64;
917                state.evict(cutoff);
918            }
919            !state.is_empty()
920        });
921
922        // Evict event buffers in sync with window state
923        self.event_buffers.retain(|&(corr_idx, _), buf| {
924            if corr_idx < timespans.len() {
925                let cutoff = now_secs - timespans[corr_idx] as i64;
926                buf.evict(cutoff);
927            }
928            !buf.is_empty()
929        });
930        self.event_ref_buffers.retain(|&(corr_idx, _), buf| {
931            if corr_idx < timespans.len() {
932                let cutoff = now_secs - timespans[corr_idx] as i64;
933                buf.evict(cutoff);
934            }
935            !buf.is_empty()
936        });
937
938        // Phase 2: Hard cap — if still over limit after time-based eviction (e.g.
939        // high-cardinality traffic with long windows), drop the stalest entries
940        // until we're at 90% capacity to avoid evicting on every single event.
941        if self.state.len() >= self.config.max_state_entries {
942            let target = self.config.max_state_entries * 9 / 10;
943            let excess = self.state.len() - target;
944
945            // Collect keys with their latest timestamp, sort by oldest first
946            let mut by_staleness: Vec<_> = self
947                .state
948                .iter()
949                .map(|(k, v)| (k.clone(), v.latest_timestamp().unwrap_or(i64::MIN)))
950                .collect();
951            by_staleness.sort_unstable_by_key(|&(_, ts)| ts);
952
953            // Drop the oldest entries (and their associated event buffers)
954            for (key, _) in by_staleness.into_iter().take(excess) {
955                self.state.remove(&key);
956                self.last_alert.remove(&key);
957                self.event_buffers.remove(&key);
958                self.event_ref_buffers.remove(&key);
959            }
960        }
961
962        // Phase 3: Evict stale last_alert entries — remove if the suppress window
963        // has passed or if the corresponding window state no longer exists.
964        self.last_alert.retain(|key, &mut alert_ts| {
965            let suppress = if key.0 < self.correlations.len() {
966                self.correlations[key.0]
967                    .suppress_secs
968                    .or(self.config.suppress)
969                    .unwrap_or(0)
970            } else {
971                0
972            };
973            (now_secs - alert_ts) < suppress as i64
974        });
975    }
976
977    /// Number of active state entries (for monitoring).
978    pub fn state_count(&self) -> usize {
979        self.state.len()
980    }
981
982    /// Number of detection rules loaded.
983    pub fn detection_rule_count(&self) -> usize {
984        self.engine.rule_count()
985    }
986
987    /// Number of correlation rules loaded.
988    pub fn correlation_rule_count(&self) -> usize {
989        self.correlations.len()
990    }
991
992    /// Number of active event buffers (for monitoring).
993    pub fn event_buffer_count(&self) -> usize {
994        self.event_buffers.len()
995    }
996
997    /// Total compressed bytes across all event buffers (for monitoring).
998    pub fn event_buffer_bytes(&self) -> usize {
999        self.event_buffers
1000            .values()
1001            .map(|b| b.compressed_bytes())
1002            .sum()
1003    }
1004
1005    /// Number of active event ref buffers — `Refs` mode (for monitoring).
1006    pub fn event_ref_buffer_count(&self) -> usize {
1007        self.event_ref_buffers.len()
1008    }
1009
1010    /// Access the inner stateless engine.
1011    pub fn engine(&self) -> &Engine {
1012        &self.engine
1013    }
1014
1015    /// Export all mutable correlation state as a serializable snapshot.
1016    ///
1017    /// The snapshot uses stable correlation identifiers (id > name > title)
1018    /// instead of internal indices, so it survives rule reloads as long as
1019    /// the correlation rules keep the same identifiers.
1020    pub fn export_state(&self) -> CorrelationSnapshot {
1021        let mut windows: HashMap<String, Vec<(GroupKey, WindowState)>> = HashMap::new();
1022        for ((idx, gk), ws) in &self.state {
1023            let corr_id = self.correlation_stable_id(*idx);
1024            windows
1025                .entry(corr_id)
1026                .or_default()
1027                .push((gk.clone(), ws.clone()));
1028        }
1029
1030        let mut last_alert: HashMap<String, Vec<(GroupKey, i64)>> = HashMap::new();
1031        for ((idx, gk), ts) in &self.last_alert {
1032            let corr_id = self.correlation_stable_id(*idx);
1033            last_alert
1034                .entry(corr_id)
1035                .or_default()
1036                .push((gk.clone(), *ts));
1037        }
1038
1039        let mut event_buffers: HashMap<String, Vec<(GroupKey, EventBuffer)>> = HashMap::new();
1040        for ((idx, gk), buf) in &self.event_buffers {
1041            let corr_id = self.correlation_stable_id(*idx);
1042            event_buffers
1043                .entry(corr_id)
1044                .or_default()
1045                .push((gk.clone(), buf.clone()));
1046        }
1047
1048        let mut event_ref_buffers: HashMap<String, Vec<(GroupKey, EventRefBuffer)>> =
1049            HashMap::new();
1050        for ((idx, gk), buf) in &self.event_ref_buffers {
1051            let corr_id = self.correlation_stable_id(*idx);
1052            event_ref_buffers
1053                .entry(corr_id)
1054                .or_default()
1055                .push((gk.clone(), buf.clone()));
1056        }
1057
1058        CorrelationSnapshot {
1059            version: SNAPSHOT_VERSION,
1060            windows,
1061            last_alert,
1062            event_buffers,
1063            event_ref_buffers,
1064        }
1065    }
1066
1067    /// Import previously exported state, mapping stable identifiers back to
1068    /// current correlation indices. Entries whose identifiers no longer match
1069    /// any loaded correlation are silently dropped.
1070    ///
1071    /// Returns `false` (and imports nothing) if the snapshot version is
1072    /// incompatible with the current schema.
1073    pub fn import_state(&mut self, snapshot: CorrelationSnapshot) -> bool {
1074        if snapshot.version != SNAPSHOT_VERSION {
1075            return false;
1076        }
1077        let id_to_idx = self.build_id_to_index_map();
1078
1079        for (corr_id, groups) in snapshot.windows {
1080            if let Some(&idx) = id_to_idx.get(&corr_id) {
1081                for (gk, ws) in groups {
1082                    self.state.insert((idx, gk), ws);
1083                }
1084            }
1085        }
1086
1087        for (corr_id, groups) in snapshot.last_alert {
1088            if let Some(&idx) = id_to_idx.get(&corr_id) {
1089                for (gk, ts) in groups {
1090                    self.last_alert.insert((idx, gk), ts);
1091                }
1092            }
1093        }
1094
1095        for (corr_id, groups) in snapshot.event_buffers {
1096            if let Some(&idx) = id_to_idx.get(&corr_id) {
1097                for (gk, buf) in groups {
1098                    self.event_buffers.insert((idx, gk), buf);
1099                }
1100            }
1101        }
1102
1103        for (corr_id, groups) in snapshot.event_ref_buffers {
1104            if let Some(&idx) = id_to_idx.get(&corr_id) {
1105                for (gk, buf) in groups {
1106                    self.event_ref_buffers.insert((idx, gk), buf);
1107                }
1108            }
1109        }
1110
1111        true
1112    }
1113
1114    /// Stable identifier for a correlation rule: prefers id, then name, then title.
1115    fn correlation_stable_id(&self, idx: usize) -> String {
1116        let corr = &self.correlations[idx];
1117        corr.id
1118            .clone()
1119            .or_else(|| corr.name.clone())
1120            .unwrap_or_else(|| corr.title.clone())
1121    }
1122
1123    /// Build a reverse map from stable id → current correlation index.
1124    fn build_id_to_index_map(&self) -> HashMap<String, usize> {
1125        self.correlations
1126            .iter()
1127            .enumerate()
1128            .map(|(idx, _)| (self.correlation_stable_id(idx), idx))
1129            .collect()
1130    }
1131}
1132
1133impl Default for CorrelationEngine {
1134    fn default() -> Self {
1135        Self::new(CorrelationConfig::default())
1136    }
1137}
1138
1139// =============================================================================
1140// Timestamp parsing helpers
1141// =============================================================================
1142
1143/// Extract a timestamp from an event using the given field names.
1144///
1145/// Standalone version of `CorrelationEngine::extract_event_timestamp` for use
1146/// in contexts where borrowing `&self` is not possible (e.g. rayon closures).
1147fn extract_event_ts(event: &impl Event, timestamp_fields: &[String]) -> Option<i64> {
1148    for field_name in timestamp_fields {
1149        if let Some(val) = event.get_field(field_name)
1150            && let Some(ts) = parse_timestamp_value(&val)
1151        {
1152            return Some(ts);
1153        }
1154    }
1155    None
1156}
1157
1158/// Parse an [`EventValue`] as a Unix epoch timestamp in seconds.
1159fn parse_timestamp_value(val: &EventValue) -> Option<i64> {
1160    match val {
1161        EventValue::Int(i) => Some(normalize_epoch(*i)),
1162        EventValue::Float(f) => Some(normalize_epoch(*f as i64)),
1163        EventValue::Str(s) => parse_timestamp_string(s),
1164        _ => None,
1165    }
1166}
1167
/// Normalize an epoch value to seconds.
///
/// Values strictly greater than 10^12 are treated as milliseconds and divided
/// by 1000 (as seconds they would lie beyond the year 33658). Everything
/// else, including zero and negative values, is returned unchanged.
fn normalize_epoch(v: i64) -> i64 {
    const MILLIS_THRESHOLD: i64 = 1_000_000_000_000;
    match v {
        millis if millis > MILLIS_THRESHOLD => millis / 1000,
        secs => secs,
    }
}
1173
1174/// Parse a timestamp string. Tries ISO 8601 with timezone, then without.
1175fn parse_timestamp_string(s: &str) -> Option<i64> {
1176    // Try RFC 3339 / ISO 8601 with timezone
1177    if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
1178        return Some(dt.timestamp());
1179    }
1180
1181    // Try ISO 8601 without timezone (assume UTC)
1182    // Common formats: "2024-07-10T12:30:00", "2024-07-10 12:30:00"
1183    if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S") {
1184        return Some(Utc.from_utc_datetime(&naive).timestamp());
1185    }
1186    if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") {
1187        return Some(Utc.from_utc_datetime(&naive).timestamp());
1188    }
1189
1190    // Try with fractional seconds
1191    if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S%.f") {
1192        return Some(Utc.from_utc_datetime(&naive).timestamp());
1193    }
1194    if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f") {
1195        return Some(Utc.from_utc_datetime(&naive).timestamp());
1196    }
1197
1198    None
1199}
1200
1201/// Convert an [`EventValue`] to a string for value_count purposes.
1202fn value_to_string_for_count(v: &EventValue) -> Option<String> {
1203    match v {
1204        EventValue::Str(s) => Some(s.to_string()),
1205        EventValue::Int(n) => Some(n.to_string()),
1206        EventValue::Float(f) => Some(f.to_string()),
1207        EventValue::Bool(b) => Some(b.to_string()),
1208        EventValue::Null => Some("null".to_string()),
1209        _ => None,
1210    }
1211}
1212
1213/// Convert an [`EventValue`] to f64 for numeric aggregation.
1214fn value_to_f64_ev(v: &EventValue) -> Option<f64> {
1215    v.as_f64()
1216}