Skip to main content

rsigma_eval/correlation_engine/
mod.rs

//! Stateful correlation engine with time-windowed aggregation.
//!
//! `CorrelationEngine` wraps the stateless `Engine` and adds support for
//! Sigma correlation rules: `event_count`, `value_count`, `temporal`,
//! `temporal_ordered`, `value_sum`, `value_avg`, `value_percentile`,
//! and `value_median`.
//!
//! # Architecture
//!
//! 1. Events are first evaluated against detection rules (stateless)
//! 2. Detection matches update correlation window state (stateful)
//! 3. When a correlation condition is met, a `CorrelationResult` is emitted
//! 4. Correlation results can chain into higher-level correlations
14
15mod introspect;
16#[cfg(test)]
17mod tests;
18mod types;
19
20pub use introspect::{CorrelationInfo, CorrelationStateSnapshot, GroupKeyPart, GroupStateInfo};
21pub use types::*;
22
23use std::collections::HashMap;
24
25use chrono::{DateTime, TimeZone, Utc};
26
27use rsigma_parser::{CorrelationRule, CorrelationType, SigmaCollection, SigmaRule, WindowMode};
28
29use crate::correlation::{
30    CompiledCorrelation, EventBuffer, EventRefBuffer, GroupKey, WindowDecision, WindowState,
31    apply_window_open, compile_correlation,
32};
33use crate::engine::Engine;
34use crate::error::{EvalError, Result};
35use crate::event::{Event, EventValue};
36use crate::pipeline::{Pipeline, apply_pipelines, apply_pipelines_to_correlation};
37use crate::result::{CorrelationBody, EvaluationResult, ResultBody, RuleHeader};
38
39// =============================================================================
40// Correlation Engine
41// =============================================================================
42
/// Current snapshot schema version. Bump when the serialized format changes.
// NOTE(review): the code that reads/writes this version is outside this view.
const SNAPSHOT_VERSION: u32 = 1;
45
/// Stateful correlation engine.
///
/// Wraps the stateless `Engine` for detection rules and adds time-windowed
/// correlation on top. Supports all 8 Sigma correlation types (`event_count`,
/// `value_count`, `temporal`, `temporal_ordered`, `value_sum`, `value_avg`,
/// `value_percentile`, `value_median`) and chaining.
pub struct CorrelationEngine {
    /// Inner stateless detection engine.
    engine: Engine,
    /// Compiled correlation rules.
    correlations: Vec<CompiledCorrelation>,
    /// Maps rule ID/name -> indices into `correlations` that reference it.
    /// This allows quick lookup: "which correlations care about rule X?"
    rule_index: HashMap<String, Vec<usize>>,
    /// `(rule_id, rule_name)` of every detection rule registered through this
    /// engine, in registration order. Used to resolve a detection match back
    /// to the identifiers that correlations may reference.
    rule_ids: Vec<(Option<String>, Option<String>)>,
    /// Per-(correlation_index, group_key) window state.
    state: HashMap<(usize, GroupKey), WindowState>,
    /// Last alert timestamp per (correlation_index, group_key) for suppression.
    last_alert: HashMap<(usize, GroupKey), i64>,
    /// Per-(correlation_index, group_key) compressed event buffer (`Full` mode).
    event_buffers: HashMap<(usize, GroupKey), EventBuffer>,
    /// Per-(correlation_index, group_key) event reference buffer (`Refs` mode).
    event_ref_buffers: HashMap<(usize, GroupKey), EventRefBuffer>,
    /// Set of detection rule IDs/names that are "correlation-only"
    /// (referenced by correlations where `generate == false`).
    /// Used to filter detection output when `config.emit_detections == false`.
    correlation_only_rules: std::collections::HashSet<String>,
    /// Configuration.
    config: CorrelationConfig,
    /// Processing pipelines applied to rules during `add_rule` /
    /// `add_collection` / `add_correlation`.
    pipelines: Vec<Pipeline>,
}
78
79impl CorrelationEngine {
80    /// Create a new correlation engine with the given configuration.
81    pub fn new(config: CorrelationConfig) -> Self {
82        CorrelationEngine {
83            engine: Engine::new(),
84            correlations: Vec::new(),
85            rule_index: HashMap::new(),
86            rule_ids: Vec::new(),
87            state: HashMap::new(),
88            last_alert: HashMap::new(),
89            event_buffers: HashMap::new(),
90            event_ref_buffers: HashMap::new(),
91            correlation_only_rules: std::collections::HashSet::new(),
92            config,
93            pipelines: Vec::new(),
94        }
95    }
96
97    /// Add a pipeline to the engine.
98    ///
99    /// Pipelines are applied to rules during `add_rule` / `add_collection`.
100    pub fn add_pipeline(&mut self, pipeline: Pipeline) {
101        self.pipelines.push(pipeline);
102        self.pipelines.sort_by_key(|p| p.priority);
103    }
104
105    /// Set global `include_event` on the inner detection engine.
106    pub fn set_include_event(&mut self, include: bool) {
107        self.engine.set_include_event(include);
108    }
109
110    /// Forward to [`crate::Engine::set_match_detail`] on the inner detection
111    /// engine. Correlation detections inherit the level set here.
112    pub fn set_match_detail(&mut self, level: crate::result::MatchDetailLevel) {
113        self.engine.set_match_detail(level);
114    }
115
116    /// Forward to [`crate::Engine::set_bloom_prefilter`] on the inner
117    /// detection engine. Off by default; the optimization helps only on
118    /// substring-heavy rule sets paired with mostly-non-matching events.
119    pub fn set_bloom_prefilter(&mut self, enabled: bool) {
120        self.engine.set_bloom_prefilter(enabled);
121    }
122
123    /// Forward to [`crate::Engine::set_logsource_extractor`] on the inner
124    /// detection engine. Correlation inherits logsource pruning, since
125    /// `process_event` evaluates through this engine.
126    pub fn set_logsource_extractor(
127        &mut self,
128        extractor: Option<crate::logsource::LogSourceExtractor>,
129    ) {
130        self.engine.set_logsource_extractor(extractor);
131    }
132
133    /// Total rule candidates pruned by logsource on the inner engine.
134    pub fn logsource_pruned_total(&self) -> u64 {
135        self.engine.logsource_pruned_total()
136    }
137
138    /// Total evaluate calls with no extractable event logsource (fail-open).
139    pub fn logsource_absent_total(&self) -> u64 {
140        self.engine.logsource_absent_total()
141    }
142
143    /// Forward to [`crate::Engine::set_bloom_max_bytes`] on the inner
144    /// detection engine.
145    pub fn set_bloom_max_bytes(&mut self, max_bytes: usize) {
146        self.engine.set_bloom_max_bytes(max_bytes);
147    }
148
149    /// Forward to [`crate::Engine::set_cross_rule_ac`] on the inner
150    /// detection engine. Off by default. Available behind the
151    /// `daachorse-index` Cargo feature.
152    #[cfg(feature = "daachorse-index")]
153    pub fn set_cross_rule_ac(&mut self, enabled: bool) {
154        self.engine.set_cross_rule_ac(enabled);
155    }
156
157    /// Set the global correlation event mode.
158    ///
159    /// - `None`: no event storage (default)
160    /// - `Full`: compressed event bodies
161    /// - `Refs`: lightweight timestamp + ID references
162    pub fn set_correlation_event_mode(&mut self, mode: CorrelationEventMode) {
163        self.config.correlation_event_mode = mode;
164    }
165
166    /// Set the maximum number of events to store per correlation window group.
167    /// Only meaningful when `correlation_event_mode` is not `None`.
168    pub fn set_max_correlation_events(&mut self, max: usize) {
169        self.config.max_correlation_events = max;
170    }
171
172    /// Add a single detection rule.
173    ///
174    /// If pipelines are set, the rule is cloned and transformed before compilation.
175    /// The inner engine receives the already-transformed rule directly (not through
176    /// its own pipeline, to avoid double transformation).
177    pub fn add_rule(&mut self, rule: &SigmaRule) -> Result<()> {
178        if self.pipelines.is_empty() {
179            self.apply_custom_attributes(&rule.custom_attributes);
180            self.rule_ids.push((rule.id.clone(), rule.name.clone()));
181            self.engine.add_rule(rule)?;
182        } else {
183            let mut transformed = rule.clone();
184            apply_pipelines(&self.pipelines, &mut transformed)?;
185            self.apply_custom_attributes(&transformed.custom_attributes);
186            self.rule_ids
187                .push((transformed.id.clone(), transformed.name.clone()));
188            // Use compile_rule + add_compiled_rule to bypass inner engine's pipelines
189            let compiled = crate::compiler::compile_rule(&transformed)?;
190            self.engine.add_compiled_rule(compiled);
191        }
192        Ok(())
193    }
194
195    /// Read `rsigma.*` custom attributes from a rule and apply them to the
196    /// engine configuration.  This allows pipelines to influence engine
197    /// behaviour via `SetCustomAttribute` transformations — the same pattern
198    /// used by pySigma backends (e.g. pySigma-backend-loki).
199    ///
200    /// Supported attributes:
201    /// - `rsigma.timestamp_field` — prepends a field name to the timestamp
202    ///   extraction priority list so the correlation engine can find the
203    ///   event timestamp in non-standard field names.
204    /// - `rsigma.suppress` — sets the default suppression window (e.g. `5m`).
205    ///   Only applied when the CLI did not already set `--suppress`.
206    /// - `rsigma.action` — sets the default post-fire action (`alert`/`reset`).
207    ///   Only applied when the CLI did not already set `--action`.
208    fn apply_custom_attributes(
209        &mut self,
210        attrs: &std::collections::HashMap<String, yaml_serde::Value>,
211    ) {
212        // rsigma.timestamp_field — prepend to priority list, skip duplicates
213        if let Some(field) = attrs.get("rsigma.timestamp_field").and_then(|v| v.as_str())
214            && !self.config.timestamp_fields.iter().any(|f| f == field)
215        {
216            self.config.timestamp_fields.insert(0, field.to_string());
217        }
218
219        // rsigma.suppress — only when CLI didn't already set one
220        if let Some(val) = attrs.get("rsigma.suppress").and_then(|v| v.as_str())
221            && self.config.suppress.is_none()
222            && let Ok(ts) = rsigma_parser::Timespan::parse(val)
223        {
224            self.config.suppress = Some(ts.seconds);
225        }
226
227        // rsigma.action — only when CLI left it at the default (Alert)
228        if let Some(val) = attrs.get("rsigma.action").and_then(|v| v.as_str())
229            && self.config.action_on_match == CorrelationAction::Alert
230            && let Ok(a) = val.parse::<CorrelationAction>()
231        {
232            self.config.action_on_match = a;
233        }
234    }
235
236    /// Add a single correlation rule.
237    pub fn add_correlation(&mut self, corr: &CorrelationRule) -> Result<()> {
238        let owned;
239        let effective = if self.pipelines.is_empty() {
240            corr
241        } else {
242            owned = {
243                let mut c = corr.clone();
244                apply_pipelines_to_correlation(&self.pipelines, &mut c)?;
245                c
246            };
247            &owned
248        };
249
250        // Apply engine-level custom attributes from the (possibly transformed)
251        // correlation rule (e.g. rsigma.timestamp_field).
252        self.apply_custom_attributes(&effective.custom_attributes);
253
254        let compiled = compile_correlation(effective)?;
255        let idx = self.correlations.len();
256
257        // Index by each referenced rule ID/name
258        for rule_ref in &compiled.rule_refs {
259            self.rule_index
260                .entry(rule_ref.clone())
261                .or_default()
262                .push(idx);
263        }
264
265        // Track correlation-only rules (generate == false is the default)
266        if !compiled.generate {
267            for rule_ref in &compiled.rule_refs {
268                self.correlation_only_rules.insert(rule_ref.clone());
269            }
270        }
271
272        self.correlations.push(compiled);
273        Ok(())
274    }
275
276    /// Add all rules and correlations from a parsed collection.
277    ///
278    /// Detection rules are added first (so they're available for correlation
279    /// references), then correlation rules. Detection rules are compiled
280    /// sequentially and then pushed to the inner engine in a single batch,
281    /// so the inverted index and bloom filter are rebuilt exactly once for
282    /// the whole collection. Without this batching, large rule sets
283    /// (multi-thousand rules) hit an O(N²) rebuild cost on load.
284    pub fn add_collection(&mut self, collection: &SigmaCollection) -> Result<()> {
285        let mut compiled_batch = Vec::with_capacity(collection.rules.len());
286        if self.pipelines.is_empty() {
287            for rule in &collection.rules {
288                self.apply_custom_attributes(&rule.custom_attributes);
289                self.rule_ids.push((rule.id.clone(), rule.name.clone()));
290                compiled_batch.push(crate::compiler::compile_rule(rule)?);
291            }
292        } else {
293            for rule in &collection.rules {
294                let mut transformed = rule.clone();
295                apply_pipelines(&self.pipelines, &mut transformed)?;
296                self.apply_custom_attributes(&transformed.custom_attributes);
297                self.rule_ids
298                    .push((transformed.id.clone(), transformed.name.clone()));
299                // Bypass the inner engine's pipelines (would double-transform)
300                compiled_batch.push(crate::compiler::compile_rule(&transformed)?);
301            }
302        }
303        self.engine.extend_compiled_rules(compiled_batch);
304        // Apply filter rules to the inner engine's detection rules
305        for filter in &collection.filters {
306            self.engine.apply_filter(filter)?;
307        }
308        for corr in &collection.correlations {
309            self.add_correlation(corr)?;
310        }
311        self.validate_rule_refs()?;
312        self.detect_correlation_cycles()?;
313        Ok(())
314    }
315
316    /// Validate that every correlation's `rule_refs` resolve to at least one
317    /// known detection rule (by ID or name) or another correlation (by ID or name).
318    fn validate_rule_refs(&self) -> Result<()> {
319        let mut known: std::collections::HashSet<&str> = std::collections::HashSet::new();
320
321        for (id, name) in &self.rule_ids {
322            if let Some(id) = id {
323                known.insert(id.as_str());
324            }
325            if let Some(name) = name {
326                known.insert(name.as_str());
327            }
328        }
329        for corr in &self.correlations {
330            if let Some(ref id) = corr.id {
331                known.insert(id.as_str());
332            }
333            if let Some(ref name) = corr.name {
334                known.insert(name.as_str());
335            }
336        }
337
338        for corr in &self.correlations {
339            for rule_ref in &corr.rule_refs {
340                if !known.contains(rule_ref.as_str()) {
341                    return Err(EvalError::UnknownRuleRef(rule_ref.clone()));
342                }
343            }
344        }
345        Ok(())
346    }
347
348    /// Detect cycles in the correlation reference graph.
349    ///
350    /// Builds a directed graph where each correlation (identified by its id/name)
351    /// has edges to the correlations it references via `rule_refs`. Uses DFS with
352    /// a "gray/black" coloring scheme to detect back-edges (cycles).
353    ///
354    /// Returns `Err(EvalError::CorrelationCycle)` if a cycle is found.
355    fn detect_correlation_cycles(&self) -> Result<()> {
356        // Build a set of all correlation identifiers (id and/or name)
357        let mut corr_identifiers: HashMap<&str, usize> = HashMap::new();
358        for (idx, corr) in self.correlations.iter().enumerate() {
359            if let Some(ref id) = corr.id {
360                corr_identifiers.insert(id.as_str(), idx);
361            }
362            if let Some(ref name) = corr.name {
363                corr_identifiers.insert(name.as_str(), idx);
364            }
365        }
366
367        // Build adjacency list: corr index → set of corr indices it references
368        let mut adj: Vec<Vec<usize>> = vec![Vec::new(); self.correlations.len()];
369        for (idx, corr) in self.correlations.iter().enumerate() {
370            for rule_ref in &corr.rule_refs {
371                if let Some(&target_idx) = corr_identifiers.get(rule_ref.as_str()) {
372                    adj[idx].push(target_idx);
373                }
374            }
375        }
376
377        // DFS cycle detection with three states: white (unvisited), gray (in stack), black (done)
378        let mut state = vec![0u8; self.correlations.len()]; // 0=white, 1=gray, 2=black
379        let mut path: Vec<usize> = Vec::new();
380
381        for start in 0..self.correlations.len() {
382            if state[start] == 0
383                && let Some(cycle) = Self::dfs_find_cycle(start, &adj, &mut state, &mut path)
384            {
385                let names: Vec<String> = cycle
386                    .iter()
387                    .map(|&i| {
388                        self.correlations[i]
389                            .id
390                            .as_deref()
391                            .or(self.correlations[i].name.as_deref())
392                            .unwrap_or(&self.correlations[i].title)
393                            .to_string()
394                    })
395                    .collect();
396                return Err(crate::error::EvalError::CorrelationCycle(
397                    names.join(" -> "),
398                ));
399            }
400        }
401        Ok(())
402    }
403
404    /// DFS helper that returns the cycle path if a back-edge is found.
405    fn dfs_find_cycle(
406        node: usize,
407        adj: &[Vec<usize>],
408        state: &mut [u8],
409        path: &mut Vec<usize>,
410    ) -> Option<Vec<usize>> {
411        state[node] = 1; // gray
412        path.push(node);
413
414        for &next in &adj[node] {
415            if state[next] == 1 {
416                // Back-edge found — extract cycle from path
417                if let Some(pos) = path.iter().position(|&n| n == next) {
418                    let mut cycle = path[pos..].to_vec();
419                    cycle.push(next); // close the cycle
420                    return Some(cycle);
421                }
422            }
423            if state[next] == 0
424                && let Some(cycle) = Self::dfs_find_cycle(next, adj, state, path)
425            {
426                return Some(cycle);
427            }
428        }
429
430        path.pop();
431        state[node] = 2; // black
432        None
433    }
434
435    /// Process an event, extracting the timestamp from configured event fields.
436    ///
437    /// When no timestamp field is found, the `timestamp_fallback` policy applies:
438    /// - `WallClock`: use `Utc::now()` (good for real-time streaming)
439    /// - `Skip`: return detections only, skip correlation state updates
440    pub fn process_event(&mut self, event: &impl Event) -> ProcessResult {
441        let all_detections = self.engine.evaluate(event);
442        self.correlate_detections(event, all_detections)
443    }
444
445    /// Run the correlation layer over externally-produced detections.
446    ///
447    /// Like [`process_event`](Self::process_event) but the detections are
448    /// supplied by the caller instead of computed by this engine's inner
449    /// detection engine. This lets a multi-engine router run detection in a
450    /// per-schema engine and still feed every detection into one shared
451    /// correlation store. Timestamp extraction and the `timestamp_fallback`
452    /// policy match `process_event`.
453    pub fn correlate_detections(
454        &mut self,
455        event: &impl Event,
456        all_detections: Vec<EvaluationResult>,
457    ) -> ProcessResult {
458        let ts = match self.extract_event_timestamp(event) {
459            Some(ts) => ts,
460            None => match self.config.timestamp_fallback {
461                TimestampFallback::WallClock => Utc::now().timestamp(),
462                TimestampFallback::Skip => {
463                    // Still surface detections, but skip correlation state.
464                    return self.filter_detections(all_detections);
465                }
466            },
467        };
468        self.process_with_detections(event, all_detections, ts)
469    }
470
471    /// Process an event with an explicit Unix epoch timestamp (seconds).
472    ///
473    /// The timestamp is clamped to `[0, i64::MAX / 2]` to prevent overflow
474    /// when adding timespan durations internally.
475    pub fn process_event_at(&mut self, event: &impl Event, timestamp_secs: i64) -> ProcessResult {
476        let all_detections = self.engine.evaluate(event);
477        self.process_with_detections(event, all_detections, timestamp_secs)
478    }
479
480    /// Process an event with pre-computed detection results.
481    ///
482    /// Enables external parallelism: callers can run detection (via
483    /// [`evaluate`](Self::evaluate)) in parallel, then feed results here
484    /// sequentially for stateful correlation.
485    pub fn process_with_detections(
486        &mut self,
487        event: &impl Event,
488        all_detections: Vec<EvaluationResult>,
489        timestamp_secs: i64,
490    ) -> ProcessResult {
491        let timestamp_secs = timestamp_secs.clamp(0, i64::MAX / 2);
492
493        // Memory management — evict before adding new state to enforce limit
494        if self.state.len() >= self.config.max_state_entries {
495            self.evict_all(timestamp_secs);
496        }
497
498        // Feed detection matches into correlations
499        let mut correlations: Vec<EvaluationResult> = Vec::new();
500        self.feed_detections(event, &all_detections, timestamp_secs, &mut correlations);
501
502        // Chain — correlation results may trigger higher-level correlations
503        self.chain_correlations(&correlations, timestamp_secs);
504
505        // Filter detections by generate flag, then append the correlations.
506        let mut out = self.filter_detections(all_detections);
507        out.extend(correlations);
508        out
509    }
510
511    /// Run stateless detection only (no correlation), delegating to the inner engine.
512    ///
513    /// Returns one [`EvaluationResult`] per matched detection. Takes `&self`
514    /// so it can be called concurrently from multiple threads (e.g. via
515    /// `rayon::par_iter`) while the mutable correlation phase runs
516    /// sequentially afterwards.
517    pub fn evaluate(&self, event: &impl Event) -> Vec<EvaluationResult> {
518        self.engine.evaluate(event)
519    }
520
521    /// Process a batch of events: parallel detection, then sequential correlation.
522    ///
523    /// When the `parallel` feature is enabled, the stateless detection phase runs
524    /// concurrently via rayon. Timestamp extraction also runs in the parallel
525    /// phase (it borrows `&self.config` immutably). After `collect()` releases the
526    /// immutable borrows, each event's pre-computed detections are fed into the
527    /// stateful correlation engine sequentially.
528    pub fn process_batch<E: Event + Sync>(&mut self, events: &[&E]) -> Vec<ProcessResult> {
529        // Borrow split: take immutable refs to fields needed for the parallel phase.
530        // These are released by collect() before the sequential &mut self phase.
531        let engine = &self.engine;
532        let ts_fields = &self.config.timestamp_fields;
533
534        let batch_results: Vec<(Vec<EvaluationResult>, Option<i64>)> = {
535            #[cfg(feature = "parallel")]
536            {
537                use rayon::prelude::*;
538                events
539                    .par_iter()
540                    .map(|e| {
541                        let detections = engine.evaluate(e);
542                        let ts = extract_event_ts(e, ts_fields);
543                        (detections, ts)
544                    })
545                    .collect()
546            }
547            #[cfg(not(feature = "parallel"))]
548            {
549                events
550                    .iter()
551                    .map(|e| {
552                        let detections = engine.evaluate(e);
553                        let ts = extract_event_ts(e, ts_fields);
554                        (detections, ts)
555                    })
556                    .collect()
557            }
558        };
559
560        // Sequential correlation phase
561        let mut results = Vec::with_capacity(events.len());
562        for ((detections, ts_opt), event) in batch_results.into_iter().zip(events) {
563            match ts_opt {
564                Some(ts) => {
565                    results.push(self.process_with_detections(event, detections, ts));
566                }
567                None => match self.config.timestamp_fallback {
568                    TimestampFallback::WallClock => {
569                        let ts = Utc::now().timestamp();
570                        results.push(self.process_with_detections(event, detections, ts));
571                    }
572                    TimestampFallback::Skip => {
573                        // Still return detection results, but skip correlation
574                        results.push(self.filter_detections(detections));
575                    }
576                },
577            }
578        }
579        results
580    }
581
582    /// Filter detections by the `generate` flag / `emit_detections` config.
583    ///
584    /// If `emit_detections` is false and some rules are correlation-only,
585    /// their detection output is suppressed.
586    fn filter_detections(&self, all_detections: Vec<EvaluationResult>) -> Vec<EvaluationResult> {
587        if !self.config.emit_detections && !self.correlation_only_rules.is_empty() {
588            all_detections
589                .into_iter()
590                .filter(|m| {
591                    let id_match = m
592                        .header
593                        .rule_id
594                        .as_ref()
595                        .is_some_and(|id| self.correlation_only_rules.contains(id));
596                    !id_match
597                })
598                .collect()
599        } else {
600            all_detections
601        }
602    }
603
604    /// Feed detection matches into correlation window states.
605    fn feed_detections(
606        &mut self,
607        event: &impl Event,
608        detections: &[EvaluationResult],
609        ts: i64,
610        out: &mut Vec<EvaluationResult>,
611    ) {
612        // Collect all (corr_idx, rule_id, rule_name) tuples upfront to avoid
613        // borrow conflicts between self.rule_ids and self.update_correlation.
614        let mut work: Vec<(usize, Option<String>, Option<String>)> = Vec::new();
615
616        for det in detections {
617            // Use the MatchResult's rule_id to find the original rule's ID/name.
618            // We also look up by rule_id in our rule_ids table for the name.
619            let (rule_id, rule_name) = self.find_rule_identity(det);
620
621            // Collect correlation indices that reference this rule
622            let mut corr_indices = Vec::new();
623            if let Some(ref id) = rule_id
624                && let Some(indices) = self.rule_index.get(id)
625            {
626                corr_indices.extend(indices);
627            }
628            if let Some(ref name) = rule_name
629                && let Some(indices) = self.rule_index.get(name)
630            {
631                corr_indices.extend(indices);
632            }
633
634            corr_indices.sort_unstable();
635            corr_indices.dedup();
636
637            for &corr_idx in &corr_indices {
638                work.push((corr_idx, rule_id.clone(), rule_name.clone()));
639            }
640        }
641
642        for (corr_idx, rule_id, rule_name) in work {
643            self.update_correlation(corr_idx, event, ts, &rule_id, &rule_name, out);
644        }
645    }
646
647    /// Find the (id, name) for a detection match by searching our rule_ids table.
648    fn find_rule_identity(&self, det: &EvaluationResult) -> (Option<String>, Option<String>) {
649        // First, try to find by matching rule_id in our table
650        if let Some(ref match_id) = det.header.rule_id {
651            for (id, name) in &self.rule_ids {
652                if id.as_deref() == Some(match_id.as_str()) {
653                    return (id.clone(), name.clone());
654                }
655            }
656        }
657        // Fall back to using just the EvaluationResult's rule_id
658        (det.header.rule_id.clone(), None)
659    }
660
661    /// Resolve the event mode for a given correlation.
662    fn resolve_event_mode(&self, corr_idx: usize) -> CorrelationEventMode {
663        let corr = &self.correlations[corr_idx];
664        corr.event_mode
665            .unwrap_or(self.config.correlation_event_mode)
666    }
667
668    /// Resolve the max events cap for a given correlation.
669    fn resolve_max_events(&self, corr_idx: usize) -> usize {
670        let corr = &self.correlations[corr_idx];
671        corr.max_events
672            .unwrap_or(self.config.max_correlation_events)
673    }
674
675    /// Resolve the per-group window-state entry cap for a given correlation.
676    /// `None` means unbounded.
677    fn resolve_max_group_entries(&self, corr_idx: usize) -> Option<usize> {
678        let corr = &self.correlations[corr_idx];
679        corr.max_group_entries.or(self.config.max_group_entries)
680    }
681
682    /// Update a single correlation's state and check its condition.
683    fn update_correlation(
684        &mut self,
685        corr_idx: usize,
686        event: &impl Event,
687        ts: i64,
688        rule_id: &Option<String>,
689        rule_name: &Option<String>,
690        out: &mut Vec<EvaluationResult>,
691    ) {
692        // Borrow the correlation by reference — no cloning needed.  Rust allows
693        // simultaneous &self.correlations and &mut self.state / &mut self.last_alert
694        // because they are disjoint struct fields.
695        let corr = &self.correlations[corr_idx];
696        let corr_type = corr.correlation_type;
697        let timespan = corr.timespan_secs;
698        let window_mode = corr.window_mode;
699        let gap_secs = corr.gap_secs;
700        let level = corr.level;
701        let suppress_secs = corr.suppress_secs.or(self.config.suppress);
702        let action = corr.action.unwrap_or(self.config.action_on_match);
703        let event_mode = self.resolve_event_mode(corr_idx);
704        let max_events = self.resolve_max_events(corr_idx);
705        let max_group_entries = self.resolve_max_group_entries(corr_idx);
706
707        // Determine the rule_ref strings for alias resolution and temporal tracking.
708        let mut ref_strs: Vec<&str> = Vec::new();
709        if let Some(id) = rule_id.as_deref() {
710            ref_strs.push(id);
711        }
712        if let Some(name) = rule_name.as_deref() {
713            ref_strs.push(name);
714        }
715        let rule_ref = rule_id.as_deref().or(rule_name.as_deref()).unwrap_or("");
716
717        // Extract group key
718        let group_key = GroupKey::extract(event, &corr.group_by, &ref_strs);
719
720        // Get or create window state
721        let state_key = (corr_idx, group_key.clone());
722        let state = self
723            .state
724            .entry(state_key.clone())
725            .or_insert_with(|| WindowState::new_for(corr_type));
726
727        // Apply the window's pre-insert maintenance (sliding evict, tumbling
728        // bucket reset/late-event discard, or session gap/cap reset). On
729        // `Reset` the event buffers below are cleared in sync; on `Discard`
730        // (a late arrival in an earlier tumbling bucket) the event is dropped
731        // without touching the state or buffers.
732        let cutoff = ts - timespan as i64;
733        let decision = apply_window_open(state, ts, timespan, window_mode, gap_secs);
734        if decision == WindowDecision::Discard {
735            return;
736        }
737        let reset = decision == WindowDecision::Reset;
738
739        // Push the new event into the state
740        match corr_type {
741            CorrelationType::EventCount => {
742                state.push_event_count(ts);
743            }
744            CorrelationType::ValueCount => {
745                if let Some(ref fields) = corr.condition.field
746                    && let Some(key) = composite_value_count_key(event, fields)
747                {
748                    state.push_value_count(ts, key);
749                }
750            }
751            CorrelationType::Temporal | CorrelationType::TemporalOrdered => {
752                state.push_temporal(ts, rule_ref);
753            }
754            CorrelationType::ValueSum
755            | CorrelationType::ValueAvg
756            | CorrelationType::ValuePercentile
757            | CorrelationType::ValueMedian => {
758                if let Some(ref fields) = corr.condition.field
759                    && let Some(field_name) = fields.first()
760                    && let Some(val) = event.get_field(field_name)
761                    && let Some(n) = value_to_f64_ev(&val)
762                {
763                    state.push_numeric(ts, n);
764                }
765            }
766        }
767
768        // Enforce the per-group entry cap. Session windows keep their oldest
769        // entry as the span anchor so truncation cannot silently extend the
770        // `timespan` cap.
771        if let Some(cap) = max_group_entries {
772            state.truncate_oldest(cap, window_mode == WindowMode::Session);
773        }
774
775        // Push event into buffer based on event mode. Keep the buffer's retained
776        // events aligned with the window state: sliding evicts by the trailing
777        // cutoff, while tumbling/session clear the buffer when the window reset.
778        match event_mode {
779            CorrelationEventMode::Full => {
780                let buf = self
781                    .event_buffers
782                    .entry(state_key.clone())
783                    .or_insert_with(|| EventBuffer::new(max_events));
784                if window_mode == rsigma_parser::WindowMode::Sliding {
785                    buf.evict(cutoff);
786                } else if reset {
787                    buf.clear();
788                }
789                let json = event.to_json();
790                buf.push(ts, &json);
791            }
792            CorrelationEventMode::Refs => {
793                let buf = self
794                    .event_ref_buffers
795                    .entry(state_key.clone())
796                    .or_insert_with(|| EventRefBuffer::new(max_events));
797                if window_mode == rsigma_parser::WindowMode::Sliding {
798                    buf.evict(cutoff);
799                } else if reset {
800                    buf.clear();
801                }
802                let json = event.to_json();
803                buf.push(ts, &json);
804            }
805            CorrelationEventMode::None => {}
806        }
807
808        // Check condition — after this, `state` is no longer used (NLL drops the borrow).
809        let fired = state.check_condition(
810            &corr.condition,
811            corr_type,
812            &corr.rule_refs,
813            corr.extended_expr.as_ref(),
814        );
815
816        if let Some(agg_value) = fired {
817            let alert_key = (corr_idx, group_key.clone());
818
819            // Suppression check: skip if we've already alerted within the suppress window
820            let suppressed = if let Some(suppress) = suppress_secs {
821                if let Some(&last_ts) = self.last_alert.get(&alert_key) {
822                    (ts - last_ts) < suppress as i64
823                } else {
824                    false
825                }
826            } else {
827                false
828            };
829
830            if !suppressed {
831                // Retrieve stored events / refs based on mode
832                let (events, event_refs) = match event_mode {
833                    CorrelationEventMode::Full => {
834                        let stored = self
835                            .event_buffers
836                            .get(&alert_key)
837                            .map(|buf| buf.decompress_all())
838                            .unwrap_or_default();
839                        (Some(stored), None)
840                    }
841                    CorrelationEventMode::Refs => {
842                        let stored = self
843                            .event_ref_buffers
844                            .get(&alert_key)
845                            .map(|buf| buf.refs())
846                            .unwrap_or_default();
847                        (None, Some(stored))
848                    }
849                    CorrelationEventMode::None => (None, None),
850                };
851
852                // Only clone title/id/tags when we actually produce output
853                let corr = &self.correlations[corr_idx];
854                let result = EvaluationResult {
855                    header: RuleHeader {
856                        rule_title: corr.title.clone(),
857                        rule_id: corr.id.clone(),
858                        level,
859                        tags: corr.tags.clone(),
860                        custom_attributes: corr.custom_attributes.clone(),
861                        enrichments: None,
862                    },
863                    body: ResultBody::Correlation(CorrelationBody {
864                        correlation_type: corr_type,
865                        group_key: group_key.to_pairs(&corr.group_by),
866                        aggregated_value: agg_value,
867                        timespan_secs: timespan,
868                        events,
869                        event_refs,
870                    }),
871                };
872                out.push(result);
873
874                // Record alert time for suppression
875                self.last_alert.insert(alert_key.clone(), ts);
876
877                // Action on match
878                if action == CorrelationAction::Reset {
879                    if let Some(state) = self.state.get_mut(&alert_key) {
880                        state.clear();
881                    }
882                    if let Some(buf) = self.event_buffers.get_mut(&alert_key) {
883                        buf.clear();
884                    }
885                    if let Some(buf) = self.event_ref_buffers.get_mut(&alert_key) {
886                        buf.clear();
887                    }
888                }
889            }
890        }
891    }
892
893    /// Propagate correlation results to higher-level correlations (chaining).
894    ///
895    /// When a correlation fires, any correlation that references it (by ID or name)
896    /// is updated. Limits chain depth to 10 to prevent infinite loops.
897    fn chain_correlations(&mut self, fired: &[EvaluationResult], ts: i64) {
898        const MAX_CHAIN_DEPTH: usize = 10;
899        let mut pending: Vec<EvaluationResult> = fired.to_vec();
900        let mut depth = 0;
901
902        while !pending.is_empty() && depth < MAX_CHAIN_DEPTH {
903            depth += 1;
904
905            // Collect work items: (corr_idx, group_key_pairs, fired_ref)
906            #[allow(clippy::type_complexity)]
907            let mut work: Vec<(usize, Vec<(String, String)>, String)> = Vec::new();
908            for result in &pending {
909                // Only correlation results chain. Detections never reach here.
910                let Some(body) = result.as_correlation() else {
911                    continue;
912                };
913                if let Some(ref id) = result.header.rule_id
914                    && let Some(indices) = self.rule_index.get(id)
915                {
916                    let fired_ref = result
917                        .header
918                        .rule_id
919                        .as_deref()
920                        .unwrap_or(&result.header.rule_title)
921                        .to_string();
922                    for &corr_idx in indices {
923                        work.push((corr_idx, body.group_key.clone(), fired_ref.clone()));
924                    }
925                }
926            }
927
928            let mut next_pending = Vec::new();
929            for (corr_idx, group_key_pairs, fired_ref) in work {
930                let corr = &self.correlations[corr_idx];
931                let corr_type = corr.correlation_type;
932                let timespan = corr.timespan_secs;
933                let window_mode = corr.window_mode;
934                let gap_secs = corr.gap_secs;
935                let level = corr.level;
936
937                let group_key = GroupKey::from_pairs(&group_key_pairs, &corr.group_by);
938                let state_key = (corr_idx, group_key.clone());
939                let state = self
940                    .state
941                    .entry(state_key)
942                    .or_insert_with(|| WindowState::new_for(corr_type));
943
944                // Late arrivals in an earlier tumbling bucket are discarded;
945                // chained correlations keep no event buffers, so `Reset` needs
946                // no extra bookkeeping here.
947                if apply_window_open(state, ts, timespan, window_mode, gap_secs)
948                    == WindowDecision::Discard
949                {
950                    continue;
951                }
952
953                match corr_type {
954                    CorrelationType::EventCount => {
955                        state.push_event_count(ts);
956                    }
957                    CorrelationType::Temporal | CorrelationType::TemporalOrdered => {
958                        state.push_temporal(ts, &fired_ref);
959                    }
960                    _ => {
961                        state.push_event_count(ts);
962                    }
963                }
964
965                // Same per-group cap as the direct path; session windows
966                // keep the span anchor.
967                if let Some(cap) = corr.max_group_entries.or(self.config.max_group_entries) {
968                    state.truncate_oldest(cap, window_mode == WindowMode::Session);
969                }
970
971                let fired = state.check_condition(
972                    &corr.condition,
973                    corr_type,
974                    &corr.rule_refs,
975                    corr.extended_expr.as_ref(),
976                );
977
978                if let Some(agg_value) = fired {
979                    let corr = &self.correlations[corr_idx];
980                    next_pending.push(EvaluationResult {
981                        header: RuleHeader {
982                            rule_title: corr.title.clone(),
983                            rule_id: corr.id.clone(),
984                            level,
985                            tags: corr.tags.clone(),
986                            custom_attributes: corr.custom_attributes.clone(),
987                            enrichments: None,
988                        },
989                        body: ResultBody::Correlation(CorrelationBody {
990                            correlation_type: corr_type,
991                            group_key: group_key.to_pairs(&corr.group_by),
992                            aggregated_value: agg_value,
993                            timespan_secs: timespan,
994                            // Chained correlations don't include events
995                            // (they aggregate over correlation results, not
996                            // raw events)
997                            events: None,
998                            event_refs: None,
999                        }),
1000                    });
1001                }
1002            }
1003
1004            pending = next_pending;
1005        }
1006
1007        if !pending.is_empty() {
1008            log::warn!(
1009                "Correlation chain depth limit reached ({MAX_CHAIN_DEPTH}); \
1010                 {} pending result(s) were not propagated further. \
1011                 This may indicate a cycle in correlation references.",
1012                pending.len()
1013            );
1014        }
1015    }
1016
1017    // =========================================================================
1018    // Timestamp extraction
1019    // =========================================================================
1020
1021    /// Extract a Unix epoch timestamp (seconds) from an event.
1022    ///
1023    /// Tries each configured timestamp field in order. Supports:
1024    /// - Numeric values (epoch seconds, or epoch millis if > 1e12)
1025    /// - ISO 8601 strings (e.g., "2024-07-10T12:30:00Z")
1026    ///
1027    /// Returns `None` if no field yields a valid timestamp.
1028    fn extract_event_timestamp(&self, event: &impl Event) -> Option<i64> {
1029        for field_name in &self.config.timestamp_fields {
1030            if let Some(val) = event.get_field(field_name)
1031                && let Some(ts) = parse_timestamp_value(&val)
1032            {
1033                return Some(ts);
1034            }
1035        }
1036        None
1037    }
1038
1039    // =========================================================================
1040    // State management
1041    // =========================================================================
1042
1043    /// Manually evict all expired state entries.
1044    pub fn evict_expired(&mut self, now_secs: i64) {
1045        self.evict_all(now_secs);
1046    }
1047
1048    /// Evict expired entries and remove empty states.
1049    fn evict_all(&mut self, now_secs: i64) {
1050        // Phase 1: Time-based eviction — remove entries outside their correlation
1051        // window. Eviction is window-mode aware:
1052        //
1053        // - Sliding: trim entries older than the trailing cutoff, as always.
1054        // - Tumbling/session: never trim from the front — that would forget the
1055        //   bucket/session start and silently weaken the `timespan` cap (the
1056        //   window would drift toward sliding semantics). Instead, drop the
1057        //   whole group once it is stale (no event within its bucket span /
1058        //   session gap), which is exactly the point at which the next arrival
1059        //   would reset it anyway.
1060        let specs: Vec<(u64, WindowMode, Option<u64>)> = self
1061            .correlations
1062            .iter()
1063            .map(|c| (c.timespan_secs, c.window_mode, c.gap_secs))
1064            .collect();
1065
1066        self.state.retain(|&(corr_idx, _), state| {
1067            if let Some(&(timespan, mode, gap)) = specs.get(corr_idx) {
1068                match mode {
1069                    WindowMode::Sliding => {
1070                        state.evict(now_secs - timespan as i64);
1071                    }
1072                    WindowMode::Tumbling | WindowMode::Session => {
1073                        let staleness = if mode == WindowMode::Session {
1074                            gap.unwrap_or(timespan)
1075                        } else {
1076                            timespan
1077                        } as i64;
1078                        if state
1079                            .latest_timestamp()
1080                            .is_some_and(|last| now_secs - last > staleness)
1081                        {
1082                            state.clear();
1083                        }
1084                    }
1085                }
1086            }
1087            !state.is_empty()
1088        });
1089
1090        // Evict event buffers in sync with window state: sliding buffers trim by
1091        // the trailing cutoff, tumbling/session buffers live and die with their
1092        // window state (dropped above when the group went stale).
1093        let state = &self.state;
1094        self.event_buffers.retain(|key, buf| {
1095            if let Some(&(timespan, mode, _)) = specs.get(key.0) {
1096                match mode {
1097                    WindowMode::Sliding => buf.evict(now_secs - timespan as i64),
1098                    WindowMode::Tumbling | WindowMode::Session => {
1099                        if !state.contains_key(key) {
1100                            return false;
1101                        }
1102                    }
1103                }
1104            }
1105            !buf.is_empty()
1106        });
1107        self.event_ref_buffers.retain(|key, buf| {
1108            if let Some(&(timespan, mode, _)) = specs.get(key.0) {
1109                match mode {
1110                    WindowMode::Sliding => buf.evict(now_secs - timespan as i64),
1111                    WindowMode::Tumbling | WindowMode::Session => {
1112                        if !state.contains_key(key) {
1113                            return false;
1114                        }
1115                    }
1116                }
1117            }
1118            !buf.is_empty()
1119        });
1120
1121        // Phase 2: Hard cap — if still over limit after time-based eviction (e.g.
1122        // high-cardinality traffic with long windows), drop the stalest entries
1123        // until we're at 90% capacity to avoid evicting on every single event.
1124        if self.state.len() >= self.config.max_state_entries {
1125            let target = self.config.max_state_entries * 9 / 10;
1126            let excess = self.state.len() - target;
1127
1128            log::warn!(
1129                "Correlation state hard cap reached ({} entries, max {}); \
1130                 evicting {} stalest entries to {} (90% capacity). \
1131                 This indicates high-cardinality traffic; consider raising \
1132                 max_state_entries or shortening correlation windows.",
1133                self.state.len(),
1134                self.config.max_state_entries,
1135                excess,
1136                target,
1137            );
1138
1139            // Collect keys with their latest timestamp, sort by oldest first
1140            let mut by_staleness: Vec<_> = self
1141                .state
1142                .iter()
1143                .map(|(k, v)| (k.clone(), v.latest_timestamp().unwrap_or(i64::MIN)))
1144                .collect();
1145            by_staleness.sort_unstable_by_key(|&(_, ts)| ts);
1146
1147            // Drop the oldest entries (and their associated event buffers)
1148            for (key, _) in by_staleness.into_iter().take(excess) {
1149                self.state.remove(&key);
1150                self.last_alert.remove(&key);
1151                self.event_buffers.remove(&key);
1152                self.event_ref_buffers.remove(&key);
1153            }
1154        }
1155
1156        // Phase 3: Evict stale last_alert entries — remove if the suppress window
1157        // has passed or if the corresponding window state no longer exists.
1158        self.last_alert.retain(|key, &mut alert_ts| {
1159            let suppress = if key.0 < self.correlations.len() {
1160                self.correlations[key.0]
1161                    .suppress_secs
1162                    .or(self.config.suppress)
1163                    .unwrap_or(0)
1164            } else {
1165                0
1166            };
1167            (now_secs - alert_ts) < suppress as i64
1168        });
1169    }
1170
1171    /// Number of active state entries (for monitoring).
1172    pub fn state_count(&self) -> usize {
1173        self.state.len()
1174    }
1175
1176    /// Number of detection rules loaded.
1177    pub fn detection_rule_count(&self) -> usize {
1178        self.engine.rule_count()
1179    }
1180
1181    /// Number of correlation rules loaded.
1182    pub fn correlation_rule_count(&self) -> usize {
1183        self.correlations.len()
1184    }
1185
1186    /// Number of active event buffers (for monitoring).
1187    pub fn event_buffer_count(&self) -> usize {
1188        self.event_buffers.len()
1189    }
1190
1191    /// Total compressed bytes across all event buffers (for monitoring).
1192    pub fn event_buffer_bytes(&self) -> usize {
1193        self.event_buffers
1194            .values()
1195            .map(|b| b.compressed_bytes())
1196            .sum()
1197    }
1198
1199    /// Number of active event ref buffers — `Refs` mode (for monitoring).
1200    pub fn event_ref_buffer_count(&self) -> usize {
1201        self.event_ref_buffers.len()
1202    }
1203
1204    /// Access the inner stateless engine.
1205    pub fn engine(&self) -> &Engine {
1206        &self.engine
1207    }
1208
1209    /// Export all mutable correlation state as a serializable snapshot.
1210    ///
1211    /// The snapshot uses stable correlation identifiers (id > name > title)
1212    /// instead of internal indices, so it survives rule reloads as long as
1213    /// the correlation rules keep the same identifiers.
1214    pub fn export_state(&self) -> CorrelationSnapshot {
1215        let mut windows: HashMap<String, Vec<(GroupKey, WindowState)>> = HashMap::new();
1216        for ((idx, gk), ws) in &self.state {
1217            let corr_id = self.correlation_stable_id(*idx);
1218            windows
1219                .entry(corr_id)
1220                .or_default()
1221                .push((gk.clone(), ws.clone()));
1222        }
1223
1224        let mut last_alert: HashMap<String, Vec<(GroupKey, i64)>> = HashMap::new();
1225        for ((idx, gk), ts) in &self.last_alert {
1226            let corr_id = self.correlation_stable_id(*idx);
1227            last_alert
1228                .entry(corr_id)
1229                .or_default()
1230                .push((gk.clone(), *ts));
1231        }
1232
1233        let mut event_buffers: HashMap<String, Vec<(GroupKey, EventBuffer)>> = HashMap::new();
1234        for ((idx, gk), buf) in &self.event_buffers {
1235            let corr_id = self.correlation_stable_id(*idx);
1236            event_buffers
1237                .entry(corr_id)
1238                .or_default()
1239                .push((gk.clone(), buf.clone()));
1240        }
1241
1242        let mut event_ref_buffers: HashMap<String, Vec<(GroupKey, EventRefBuffer)>> =
1243            HashMap::new();
1244        for ((idx, gk), buf) in &self.event_ref_buffers {
1245            let corr_id = self.correlation_stable_id(*idx);
1246            event_ref_buffers
1247                .entry(corr_id)
1248                .or_default()
1249                .push((gk.clone(), buf.clone()));
1250        }
1251
1252        CorrelationSnapshot {
1253            version: SNAPSHOT_VERSION,
1254            windows,
1255            last_alert,
1256            event_buffers,
1257            event_ref_buffers,
1258        }
1259    }
1260
1261    /// Import previously exported state, mapping stable identifiers back to
1262    /// current correlation indices. Entries whose identifiers no longer match
1263    /// any loaded correlation are silently dropped.
1264    ///
1265    /// Returns `false` (and imports nothing) if the snapshot version is
1266    /// incompatible with the current schema.
1267    pub fn import_state(&mut self, snapshot: CorrelationSnapshot) -> bool {
1268        if snapshot.version != SNAPSHOT_VERSION {
1269            return false;
1270        }
1271        let id_to_idx = self.build_id_to_index_map();
1272
1273        for (corr_id, groups) in snapshot.windows {
1274            if let Some(&idx) = id_to_idx.get(&corr_id) {
1275                for (gk, ws) in groups {
1276                    self.state.insert((idx, gk), ws);
1277                }
1278            }
1279        }
1280
1281        for (corr_id, groups) in snapshot.last_alert {
1282            if let Some(&idx) = id_to_idx.get(&corr_id) {
1283                for (gk, ts) in groups {
1284                    self.last_alert.insert((idx, gk), ts);
1285                }
1286            }
1287        }
1288
1289        for (corr_id, groups) in snapshot.event_buffers {
1290            if let Some(&idx) = id_to_idx.get(&corr_id) {
1291                for (gk, buf) in groups {
1292                    self.event_buffers.insert((idx, gk), buf);
1293                }
1294            }
1295        }
1296
1297        for (corr_id, groups) in snapshot.event_ref_buffers {
1298            if let Some(&idx) = id_to_idx.get(&corr_id) {
1299                for (gk, buf) in groups {
1300                    self.event_ref_buffers.insert((idx, gk), buf);
1301                }
1302            }
1303        }
1304
1305        true
1306    }
1307
1308    /// Stable identifier for a correlation rule: prefers id, then name, then title.
1309    fn correlation_stable_id(&self, idx: usize) -> String {
1310        let corr = &self.correlations[idx];
1311        corr.id
1312            .clone()
1313            .or_else(|| corr.name.clone())
1314            .unwrap_or_else(|| corr.title.clone())
1315    }
1316
1317    /// Build a reverse map from stable id → current correlation index.
1318    fn build_id_to_index_map(&self) -> HashMap<String, usize> {
1319        self.correlations
1320            .iter()
1321            .enumerate()
1322            .map(|(idx, _)| (self.correlation_stable_id(idx), idx))
1323            .collect()
1324    }
1325}
1326
1327impl Default for CorrelationEngine {
1328    fn default() -> Self {
1329        Self::new(CorrelationConfig::default())
1330    }
1331}
1332
1333// =============================================================================
1334// Timestamp parsing helpers
1335// =============================================================================
1336
1337/// Extract a timestamp from an event using the given field names.
1338///
1339/// Standalone version of `CorrelationEngine::extract_event_timestamp` for use
1340/// in contexts where borrowing `&self` is not possible (e.g. rayon closures).
1341fn extract_event_ts(event: &impl Event, timestamp_fields: &[String]) -> Option<i64> {
1342    for field_name in timestamp_fields {
1343        if let Some(val) = event.get_field(field_name)
1344            && let Some(ts) = parse_timestamp_value(&val)
1345        {
1346            return Some(ts);
1347        }
1348    }
1349    None
1350}
1351
1352/// Parse an [`EventValue`] as a Unix epoch timestamp in seconds.
1353fn parse_timestamp_value(val: &EventValue) -> Option<i64> {
1354    match val {
1355        EventValue::Int(i) => Some(normalize_epoch(*i)),
1356        EventValue::Float(f) => Some(normalize_epoch(*f as i64)),
1357        EventValue::Str(s) => parse_timestamp_string(s),
1358        _ => None,
1359    }
1360}
1361
/// Normalize an epoch value to seconds based on its magnitude.
///
/// A value is assumed to be in the coarsest unit for which it is still a
/// plausible timestamp; each threshold corresponds to roughly the year 33658
/// in the next-coarser unit:
///
/// | value `v`              | interpreted as | result            |
/// |------------------------|----------------|-------------------|
/// | `v <= 1e12`            | seconds        | `v`               |
/// | `1e12 < v <= 1e15`     | milliseconds   | `v / 1_000`       |
/// | `1e15 < v <= 1e18`     | microseconds   | `v / 1_000_000`   |
/// | `v > 1e18`             | nanoseconds    | `v / 1_000_000_000` |
///
/// Zero and negative values are returned unchanged. Division truncates, so
/// sub-second precision is discarded.
fn normalize_epoch(v: i64) -> i64 {
    const MAX_PLAUSIBLE_SECS: i64 = 1_000_000_000_000;
    const MAX_PLAUSIBLE_MILLIS: i64 = 1_000_000_000_000_000;
    const MAX_PLAUSIBLE_MICROS: i64 = 1_000_000_000_000_000_000;

    if v > MAX_PLAUSIBLE_MICROS {
        v / 1_000_000_000
    } else if v > MAX_PLAUSIBLE_MILLIS {
        v / 1_000_000
    } else if v > MAX_PLAUSIBLE_SECS {
        v / 1_000
    } else {
        v
    }
}
1367
1368/// Parse a timestamp string. Tries ISO 8601 with timezone, then without.
1369fn parse_timestamp_string(s: &str) -> Option<i64> {
1370    // Try RFC 3339 / ISO 8601 with timezone
1371    if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
1372        return Some(dt.timestamp());
1373    }
1374
1375    // Try ISO 8601 without timezone (assume UTC)
1376    // Common formats: "2024-07-10T12:30:00", "2024-07-10 12:30:00"
1377    if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S") {
1378        return Some(Utc.from_utc_datetime(&naive).timestamp());
1379    }
1380    if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") {
1381        return Some(Utc.from_utc_datetime(&naive).timestamp());
1382    }
1383
1384    // Try with fractional seconds
1385    if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S%.f") {
1386        return Some(Utc.from_utc_datetime(&naive).timestamp());
1387    }
1388    if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f") {
1389        return Some(Utc.from_utc_datetime(&naive).timestamp());
1390    }
1391
1392    None
1393}
1394
1395/// Convert an [`EventValue`] to a string for value_count purposes.
1396fn value_to_string_for_count(v: &EventValue) -> Option<String> {
1397    match v {
1398        EventValue::Str(s) => Some(s.to_string()),
1399        EventValue::Int(n) => Some(n.to_string()),
1400        EventValue::Float(f) => Some(f.to_string()),
1401        EventValue::Bool(b) => Some(b.to_string()),
1402        EventValue::Null => Some("null".to_string()),
1403        _ => None,
1404    }
1405}
1406
1407/// Build a composite distinct-key for `value_count` over one or more fields.
1408///
1409/// Each field's value is rendered with [`value_to_string_for_count`] and the
1410/// rendered parts are joined with `\u{1f}` (the ASCII Unit Separator), which
1411/// is unlikely to occur in normal log data. If any field is missing or has a
1412/// type that does not produce a stable string representation, the event is
1413/// excluded from the distinct count (return `None`), matching the historical
1414/// single-field behavior.
1415fn composite_value_count_key(event: &impl Event, fields: &[String]) -> Option<String> {
1416    // Common case: exactly one field. Avoid the separator overhead.
1417    if let [field_name] = fields {
1418        let val = event.get_field(field_name)?;
1419        return value_to_string_for_count(&val);
1420    }
1421
1422    let mut parts = Vec::with_capacity(fields.len());
1423    for field_name in fields {
1424        let val = event.get_field(field_name)?;
1425        let rendered = value_to_string_for_count(&val)?;
1426        parts.push(rendered);
1427    }
1428    Some(parts.join("\u{1f}"))
1429}
1430
1431/// Convert an [`EventValue`] to f64 for numeric aggregation.
1432fn value_to_f64_ev(v: &EventValue) -> Option<f64> {
1433    v.as_f64()
1434}