Skip to main content

rsigma_eval/correlation_engine/
mod.rs

1//! Stateful correlation engine with time-windowed aggregation.
2//!
3//! `CorrelationEngine` wraps the stateless `Engine` and adds support for
4//! Sigma correlation rules: `event_count`, `value_count`, `temporal`,
5//! `temporal_ordered`, `value_sum`, `value_avg`, `value_percentile`,
6//! and `value_median`.
7//!
8//! # Architecture
9//!
10//! 1. Events are first evaluated against detection rules (stateless)
11//! 2. Detection matches update correlation window state (stateful)
12//! 3. When a correlation condition is met, a `CorrelationResult` is emitted
13//! 4. Correlation results can chain into higher-level correlations
14
15#[cfg(test)]
16mod tests;
17mod types;
18
19pub use types::*;
20
21use std::collections::HashMap;
22
23use chrono::{DateTime, TimeZone, Utc};
24
25use rsigma_parser::{CorrelationRule, CorrelationType, SigmaCollection, SigmaRule, WindowMode};
26
27use crate::correlation::{
28    CompiledCorrelation, EventBuffer, EventRefBuffer, GroupKey, WindowDecision, WindowState,
29    apply_window_open, compile_correlation,
30};
31use crate::engine::Engine;
32use crate::error::{EvalError, Result};
33use crate::event::{Event, EventValue};
34use crate::pipeline::{Pipeline, apply_pipelines, apply_pipelines_to_correlation};
35use crate::result::{CorrelationBody, EvaluationResult, ResultBody, RuleHeader};
36
37// =============================================================================
38// Correlation Engine
39// =============================================================================
40
/// Version number of the serialized snapshot schema.
///
/// Increment it whenever the serialized snapshot format changes.
const SNAPSHOT_VERSION: u32 = 1;
43
44/// Stateful correlation engine.
45///
46/// Wraps the stateless `Engine` for detection rules and adds time-windowed
47/// correlation on top. Supports all 7 Sigma correlation types and chaining.
48pub struct CorrelationEngine {
49    /// Inner stateless detection engine.
50    engine: Engine,
51    /// Compiled correlation rules.
52    correlations: Vec<CompiledCorrelation>,
53    /// Maps rule ID/name -> indices into `correlations` that reference it.
54    /// This allows quick lookup: "which correlations care about rule X?"
55    rule_index: HashMap<String, Vec<usize>>,
56    /// Maps detection rule index -> (rule_id, rule_name) for reverse lookup.
57    /// Used to find which correlations a detection match triggers.
58    rule_ids: Vec<(Option<String>, Option<String>)>,
59    /// Per-(correlation_index, group_key) window state.
60    state: HashMap<(usize, GroupKey), WindowState>,
61    /// Last alert timestamp per (correlation_index, group_key) for suppression.
62    last_alert: HashMap<(usize, GroupKey), i64>,
63    /// Per-(correlation_index, group_key) compressed event buffer (`Full` mode).
64    event_buffers: HashMap<(usize, GroupKey), EventBuffer>,
65    /// Per-(correlation_index, group_key) event reference buffer (`Refs` mode).
66    event_ref_buffers: HashMap<(usize, GroupKey), EventRefBuffer>,
67    /// Set of detection rule IDs/names that are "correlation-only"
68    /// (referenced by correlations where `generate == false`).
69    /// Used to filter detection output when `config.emit_detections == false`.
70    correlation_only_rules: std::collections::HashSet<String>,
71    /// Configuration.
72    config: CorrelationConfig,
73    /// Processing pipelines applied to rules during add_rule.
74    pipelines: Vec<Pipeline>,
75}
76
77impl CorrelationEngine {
78    /// Create a new correlation engine with the given configuration.
79    pub fn new(config: CorrelationConfig) -> Self {
80        CorrelationEngine {
81            engine: Engine::new(),
82            correlations: Vec::new(),
83            rule_index: HashMap::new(),
84            rule_ids: Vec::new(),
85            state: HashMap::new(),
86            last_alert: HashMap::new(),
87            event_buffers: HashMap::new(),
88            event_ref_buffers: HashMap::new(),
89            correlation_only_rules: std::collections::HashSet::new(),
90            config,
91            pipelines: Vec::new(),
92        }
93    }
94
95    /// Add a pipeline to the engine.
96    ///
97    /// Pipelines are applied to rules during `add_rule` / `add_collection`.
98    pub fn add_pipeline(&mut self, pipeline: Pipeline) {
99        self.pipelines.push(pipeline);
100        self.pipelines.sort_by_key(|p| p.priority);
101    }
102
103    /// Set global `include_event` on the inner detection engine.
104    pub fn set_include_event(&mut self, include: bool) {
105        self.engine.set_include_event(include);
106    }
107
108    /// Forward to [`crate::Engine::set_match_detail`] on the inner detection
109    /// engine. Correlation detections inherit the level set here.
110    pub fn set_match_detail(&mut self, level: crate::result::MatchDetailLevel) {
111        self.engine.set_match_detail(level);
112    }
113
114    /// Forward to [`crate::Engine::set_bloom_prefilter`] on the inner
115    /// detection engine. Off by default; the optimization helps only on
116    /// substring-heavy rule sets paired with mostly-non-matching events.
117    pub fn set_bloom_prefilter(&mut self, enabled: bool) {
118        self.engine.set_bloom_prefilter(enabled);
119    }
120
121    /// Forward to [`crate::Engine::set_bloom_max_bytes`] on the inner
122    /// detection engine.
123    pub fn set_bloom_max_bytes(&mut self, max_bytes: usize) {
124        self.engine.set_bloom_max_bytes(max_bytes);
125    }
126
/// Enable or disable the cross-rule Aho-Corasick index of the inner
/// detection engine (see [`crate::Engine::set_cross_rule_ac`]).
///
/// Disabled by default. Only compiled with the `daachorse-index` Cargo
/// feature.
#[cfg(feature = "daachorse-index")]
pub fn set_cross_rule_ac(&mut self, enabled: bool) {
    let detector = &mut self.engine;
    detector.set_cross_rule_ac(enabled);
}
134
135    /// Set the global correlation event mode.
136    ///
137    /// - `None`: no event storage (default)
138    /// - `Full`: compressed event bodies
139    /// - `Refs`: lightweight timestamp + ID references
140    pub fn set_correlation_event_mode(&mut self, mode: CorrelationEventMode) {
141        self.config.correlation_event_mode = mode;
142    }
143
144    /// Set the maximum number of events to store per correlation window group.
145    /// Only meaningful when `correlation_event_mode` is not `None`.
146    pub fn set_max_correlation_events(&mut self, max: usize) {
147        self.config.max_correlation_events = max;
148    }
149
150    /// Add a single detection rule.
151    ///
152    /// If pipelines are set, the rule is cloned and transformed before compilation.
153    /// The inner engine receives the already-transformed rule directly (not through
154    /// its own pipeline, to avoid double transformation).
155    pub fn add_rule(&mut self, rule: &SigmaRule) -> Result<()> {
156        if self.pipelines.is_empty() {
157            self.apply_custom_attributes(&rule.custom_attributes);
158            self.rule_ids.push((rule.id.clone(), rule.name.clone()));
159            self.engine.add_rule(rule)?;
160        } else {
161            let mut transformed = rule.clone();
162            apply_pipelines(&self.pipelines, &mut transformed)?;
163            self.apply_custom_attributes(&transformed.custom_attributes);
164            self.rule_ids
165                .push((transformed.id.clone(), transformed.name.clone()));
166            // Use compile_rule + add_compiled_rule to bypass inner engine's pipelines
167            let compiled = crate::compiler::compile_rule(&transformed)?;
168            self.engine.add_compiled_rule(compiled);
169        }
170        Ok(())
171    }
172
173    /// Read `rsigma.*` custom attributes from a rule and apply them to the
174    /// engine configuration.  This allows pipelines to influence engine
175    /// behaviour via `SetCustomAttribute` transformations — the same pattern
176    /// used by pySigma backends (e.g. pySigma-backend-loki).
177    ///
178    /// Supported attributes:
179    /// - `rsigma.timestamp_field` — prepends a field name to the timestamp
180    ///   extraction priority list so the correlation engine can find the
181    ///   event timestamp in non-standard field names.
182    /// - `rsigma.suppress` — sets the default suppression window (e.g. `5m`).
183    ///   Only applied when the CLI did not already set `--suppress`.
184    /// - `rsigma.action` — sets the default post-fire action (`alert`/`reset`).
185    ///   Only applied when the CLI did not already set `--action`.
186    fn apply_custom_attributes(
187        &mut self,
188        attrs: &std::collections::HashMap<String, yaml_serde::Value>,
189    ) {
190        // rsigma.timestamp_field — prepend to priority list, skip duplicates
191        if let Some(field) = attrs.get("rsigma.timestamp_field").and_then(|v| v.as_str())
192            && !self.config.timestamp_fields.iter().any(|f| f == field)
193        {
194            self.config.timestamp_fields.insert(0, field.to_string());
195        }
196
197        // rsigma.suppress — only when CLI didn't already set one
198        if let Some(val) = attrs.get("rsigma.suppress").and_then(|v| v.as_str())
199            && self.config.suppress.is_none()
200            && let Ok(ts) = rsigma_parser::Timespan::parse(val)
201        {
202            self.config.suppress = Some(ts.seconds);
203        }
204
205        // rsigma.action — only when CLI left it at the default (Alert)
206        if let Some(val) = attrs.get("rsigma.action").and_then(|v| v.as_str())
207            && self.config.action_on_match == CorrelationAction::Alert
208            && let Ok(a) = val.parse::<CorrelationAction>()
209        {
210            self.config.action_on_match = a;
211        }
212    }
213
214    /// Add a single correlation rule.
215    pub fn add_correlation(&mut self, corr: &CorrelationRule) -> Result<()> {
216        let owned;
217        let effective = if self.pipelines.is_empty() {
218            corr
219        } else {
220            owned = {
221                let mut c = corr.clone();
222                apply_pipelines_to_correlation(&self.pipelines, &mut c)?;
223                c
224            };
225            &owned
226        };
227
228        // Apply engine-level custom attributes from the (possibly transformed)
229        // correlation rule (e.g. rsigma.timestamp_field).
230        self.apply_custom_attributes(&effective.custom_attributes);
231
232        let compiled = compile_correlation(effective)?;
233        let idx = self.correlations.len();
234
235        // Index by each referenced rule ID/name
236        for rule_ref in &compiled.rule_refs {
237            self.rule_index
238                .entry(rule_ref.clone())
239                .or_default()
240                .push(idx);
241        }
242
243        // Track correlation-only rules (generate == false is the default)
244        if !compiled.generate {
245            for rule_ref in &compiled.rule_refs {
246                self.correlation_only_rules.insert(rule_ref.clone());
247            }
248        }
249
250        self.correlations.push(compiled);
251        Ok(())
252    }
253
254    /// Add all rules and correlations from a parsed collection.
255    ///
256    /// Detection rules are added first (so they're available for correlation
257    /// references), then correlation rules. Detection rules are compiled
258    /// sequentially and then pushed to the inner engine in a single batch,
259    /// so the inverted index and bloom filter are rebuilt exactly once for
260    /// the whole collection. Without this batching, large rule sets
261    /// (multi-thousand rules) hit an O(N²) rebuild cost on load.
262    pub fn add_collection(&mut self, collection: &SigmaCollection) -> Result<()> {
263        let mut compiled_batch = Vec::with_capacity(collection.rules.len());
264        if self.pipelines.is_empty() {
265            for rule in &collection.rules {
266                self.apply_custom_attributes(&rule.custom_attributes);
267                self.rule_ids.push((rule.id.clone(), rule.name.clone()));
268                compiled_batch.push(crate::compiler::compile_rule(rule)?);
269            }
270        } else {
271            for rule in &collection.rules {
272                let mut transformed = rule.clone();
273                apply_pipelines(&self.pipelines, &mut transformed)?;
274                self.apply_custom_attributes(&transformed.custom_attributes);
275                self.rule_ids
276                    .push((transformed.id.clone(), transformed.name.clone()));
277                // Bypass the inner engine's pipelines (would double-transform)
278                compiled_batch.push(crate::compiler::compile_rule(&transformed)?);
279            }
280        }
281        self.engine.extend_compiled_rules(compiled_batch);
282        // Apply filter rules to the inner engine's detection rules
283        for filter in &collection.filters {
284            self.engine.apply_filter(filter)?;
285        }
286        for corr in &collection.correlations {
287            self.add_correlation(corr)?;
288        }
289        self.validate_rule_refs()?;
290        self.detect_correlation_cycles()?;
291        Ok(())
292    }
293
294    /// Validate that every correlation's `rule_refs` resolve to at least one
295    /// known detection rule (by ID or name) or another correlation (by ID or name).
296    fn validate_rule_refs(&self) -> Result<()> {
297        let mut known: std::collections::HashSet<&str> = std::collections::HashSet::new();
298
299        for (id, name) in &self.rule_ids {
300            if let Some(id) = id {
301                known.insert(id.as_str());
302            }
303            if let Some(name) = name {
304                known.insert(name.as_str());
305            }
306        }
307        for corr in &self.correlations {
308            if let Some(ref id) = corr.id {
309                known.insert(id.as_str());
310            }
311            if let Some(ref name) = corr.name {
312                known.insert(name.as_str());
313            }
314        }
315
316        for corr in &self.correlations {
317            for rule_ref in &corr.rule_refs {
318                if !known.contains(rule_ref.as_str()) {
319                    return Err(EvalError::UnknownRuleRef(rule_ref.clone()));
320                }
321            }
322        }
323        Ok(())
324    }
325
326    /// Detect cycles in the correlation reference graph.
327    ///
328    /// Builds a directed graph where each correlation (identified by its id/name)
329    /// has edges to the correlations it references via `rule_refs`. Uses DFS with
330    /// a "gray/black" coloring scheme to detect back-edges (cycles).
331    ///
332    /// Returns `Err(EvalError::CorrelationCycle)` if a cycle is found.
333    fn detect_correlation_cycles(&self) -> Result<()> {
334        // Build a set of all correlation identifiers (id and/or name)
335        let mut corr_identifiers: HashMap<&str, usize> = HashMap::new();
336        for (idx, corr) in self.correlations.iter().enumerate() {
337            if let Some(ref id) = corr.id {
338                corr_identifiers.insert(id.as_str(), idx);
339            }
340            if let Some(ref name) = corr.name {
341                corr_identifiers.insert(name.as_str(), idx);
342            }
343        }
344
345        // Build adjacency list: corr index → set of corr indices it references
346        let mut adj: Vec<Vec<usize>> = vec![Vec::new(); self.correlations.len()];
347        for (idx, corr) in self.correlations.iter().enumerate() {
348            for rule_ref in &corr.rule_refs {
349                if let Some(&target_idx) = corr_identifiers.get(rule_ref.as_str()) {
350                    adj[idx].push(target_idx);
351                }
352            }
353        }
354
355        // DFS cycle detection with three states: white (unvisited), gray (in stack), black (done)
356        let mut state = vec![0u8; self.correlations.len()]; // 0=white, 1=gray, 2=black
357        let mut path: Vec<usize> = Vec::new();
358
359        for start in 0..self.correlations.len() {
360            if state[start] == 0
361                && let Some(cycle) = Self::dfs_find_cycle(start, &adj, &mut state, &mut path)
362            {
363                let names: Vec<String> = cycle
364                    .iter()
365                    .map(|&i| {
366                        self.correlations[i]
367                            .id
368                            .as_deref()
369                            .or(self.correlations[i].name.as_deref())
370                            .unwrap_or(&self.correlations[i].title)
371                            .to_string()
372                    })
373                    .collect();
374                return Err(crate::error::EvalError::CorrelationCycle(
375                    names.join(" -> "),
376                ));
377            }
378        }
379        Ok(())
380    }
381
382    /// DFS helper that returns the cycle path if a back-edge is found.
383    fn dfs_find_cycle(
384        node: usize,
385        adj: &[Vec<usize>],
386        state: &mut [u8],
387        path: &mut Vec<usize>,
388    ) -> Option<Vec<usize>> {
389        state[node] = 1; // gray
390        path.push(node);
391
392        for &next in &adj[node] {
393            if state[next] == 1 {
394                // Back-edge found — extract cycle from path
395                if let Some(pos) = path.iter().position(|&n| n == next) {
396                    let mut cycle = path[pos..].to_vec();
397                    cycle.push(next); // close the cycle
398                    return Some(cycle);
399                }
400            }
401            if state[next] == 0
402                && let Some(cycle) = Self::dfs_find_cycle(next, adj, state, path)
403            {
404                return Some(cycle);
405            }
406        }
407
408        path.pop();
409        state[node] = 2; // black
410        None
411    }
412
413    /// Process an event, extracting the timestamp from configured event fields.
414    ///
415    /// When no timestamp field is found, the `timestamp_fallback` policy applies:
416    /// - `WallClock`: use `Utc::now()` (good for real-time streaming)
417    /// - `Skip`: return detections only, skip correlation state updates
418    pub fn process_event(&mut self, event: &impl Event) -> ProcessResult {
419        let all_detections = self.engine.evaluate(event);
420
421        let ts = match self.extract_event_timestamp(event) {
422            Some(ts) => ts,
423            None => match self.config.timestamp_fallback {
424                TimestampFallback::WallClock => Utc::now().timestamp(),
425                TimestampFallback::Skip => {
426                    // Still run detection (stateless), but skip correlation
427                    return self.filter_detections(all_detections);
428                }
429            },
430        };
431        self.process_with_detections(event, all_detections, ts)
432    }
433
434    /// Process an event with an explicit Unix epoch timestamp (seconds).
435    ///
436    /// The timestamp is clamped to `[0, i64::MAX / 2]` to prevent overflow
437    /// when adding timespan durations internally.
438    pub fn process_event_at(&mut self, event: &impl Event, timestamp_secs: i64) -> ProcessResult {
439        let all_detections = self.engine.evaluate(event);
440        self.process_with_detections(event, all_detections, timestamp_secs)
441    }
442
443    /// Process an event with pre-computed detection results.
444    ///
445    /// Enables external parallelism: callers can run detection (via
446    /// [`evaluate`](Self::evaluate)) in parallel, then feed results here
447    /// sequentially for stateful correlation.
448    pub fn process_with_detections(
449        &mut self,
450        event: &impl Event,
451        all_detections: Vec<EvaluationResult>,
452        timestamp_secs: i64,
453    ) -> ProcessResult {
454        let timestamp_secs = timestamp_secs.clamp(0, i64::MAX / 2);
455
456        // Memory management — evict before adding new state to enforce limit
457        if self.state.len() >= self.config.max_state_entries {
458            self.evict_all(timestamp_secs);
459        }
460
461        // Feed detection matches into correlations
462        let mut correlations: Vec<EvaluationResult> = Vec::new();
463        self.feed_detections(event, &all_detections, timestamp_secs, &mut correlations);
464
465        // Chain — correlation results may trigger higher-level correlations
466        self.chain_correlations(&correlations, timestamp_secs);
467
468        // Filter detections by generate flag, then append the correlations.
469        let mut out = self.filter_detections(all_detections);
470        out.extend(correlations);
471        out
472    }
473
474    /// Run stateless detection only (no correlation), delegating to the inner engine.
475    ///
476    /// Returns one [`EvaluationResult`] per matched detection. Takes `&self`
477    /// so it can be called concurrently from multiple threads (e.g. via
478    /// `rayon::par_iter`) while the mutable correlation phase runs
479    /// sequentially afterwards.
480    pub fn evaluate(&self, event: &impl Event) -> Vec<EvaluationResult> {
481        self.engine.evaluate(event)
482    }
483
484    /// Process a batch of events: parallel detection, then sequential correlation.
485    ///
486    /// When the `parallel` feature is enabled, the stateless detection phase runs
487    /// concurrently via rayon. Timestamp extraction also runs in the parallel
488    /// phase (it borrows `&self.config` immutably). After `collect()` releases the
489    /// immutable borrows, each event's pre-computed detections are fed into the
490    /// stateful correlation engine sequentially.
491    pub fn process_batch<E: Event + Sync>(&mut self, events: &[&E]) -> Vec<ProcessResult> {
492        // Borrow split: take immutable refs to fields needed for the parallel phase.
493        // These are released by collect() before the sequential &mut self phase.
494        let engine = &self.engine;
495        let ts_fields = &self.config.timestamp_fields;
496
497        let batch_results: Vec<(Vec<EvaluationResult>, Option<i64>)> = {
498            #[cfg(feature = "parallel")]
499            {
500                use rayon::prelude::*;
501                events
502                    .par_iter()
503                    .map(|e| {
504                        let detections = engine.evaluate(e);
505                        let ts = extract_event_ts(e, ts_fields);
506                        (detections, ts)
507                    })
508                    .collect()
509            }
510            #[cfg(not(feature = "parallel"))]
511            {
512                events
513                    .iter()
514                    .map(|e| {
515                        let detections = engine.evaluate(e);
516                        let ts = extract_event_ts(e, ts_fields);
517                        (detections, ts)
518                    })
519                    .collect()
520            }
521        };
522
523        // Sequential correlation phase
524        let mut results = Vec::with_capacity(events.len());
525        for ((detections, ts_opt), event) in batch_results.into_iter().zip(events) {
526            match ts_opt {
527                Some(ts) => {
528                    results.push(self.process_with_detections(event, detections, ts));
529                }
530                None => match self.config.timestamp_fallback {
531                    TimestampFallback::WallClock => {
532                        let ts = Utc::now().timestamp();
533                        results.push(self.process_with_detections(event, detections, ts));
534                    }
535                    TimestampFallback::Skip => {
536                        // Still return detection results, but skip correlation
537                        results.push(self.filter_detections(detections));
538                    }
539                },
540            }
541        }
542        results
543    }
544
545    /// Filter detections by the `generate` flag / `emit_detections` config.
546    ///
547    /// If `emit_detections` is false and some rules are correlation-only,
548    /// their detection output is suppressed.
549    fn filter_detections(&self, all_detections: Vec<EvaluationResult>) -> Vec<EvaluationResult> {
550        if !self.config.emit_detections && !self.correlation_only_rules.is_empty() {
551            all_detections
552                .into_iter()
553                .filter(|m| {
554                    let id_match = m
555                        .header
556                        .rule_id
557                        .as_ref()
558                        .is_some_and(|id| self.correlation_only_rules.contains(id));
559                    !id_match
560                })
561                .collect()
562        } else {
563            all_detections
564        }
565    }
566
567    /// Feed detection matches into correlation window states.
568    fn feed_detections(
569        &mut self,
570        event: &impl Event,
571        detections: &[EvaluationResult],
572        ts: i64,
573        out: &mut Vec<EvaluationResult>,
574    ) {
575        // Collect all (corr_idx, rule_id, rule_name) tuples upfront to avoid
576        // borrow conflicts between self.rule_ids and self.update_correlation.
577        let mut work: Vec<(usize, Option<String>, Option<String>)> = Vec::new();
578
579        for det in detections {
580            // Use the MatchResult's rule_id to find the original rule's ID/name.
581            // We also look up by rule_id in our rule_ids table for the name.
582            let (rule_id, rule_name) = self.find_rule_identity(det);
583
584            // Collect correlation indices that reference this rule
585            let mut corr_indices = Vec::new();
586            if let Some(ref id) = rule_id
587                && let Some(indices) = self.rule_index.get(id)
588            {
589                corr_indices.extend(indices);
590            }
591            if let Some(ref name) = rule_name
592                && let Some(indices) = self.rule_index.get(name)
593            {
594                corr_indices.extend(indices);
595            }
596
597            corr_indices.sort_unstable();
598            corr_indices.dedup();
599
600            for &corr_idx in &corr_indices {
601                work.push((corr_idx, rule_id.clone(), rule_name.clone()));
602            }
603        }
604
605        for (corr_idx, rule_id, rule_name) in work {
606            self.update_correlation(corr_idx, event, ts, &rule_id, &rule_name, out);
607        }
608    }
609
610    /// Find the (id, name) for a detection match by searching our rule_ids table.
611    fn find_rule_identity(&self, det: &EvaluationResult) -> (Option<String>, Option<String>) {
612        // First, try to find by matching rule_id in our table
613        if let Some(ref match_id) = det.header.rule_id {
614            for (id, name) in &self.rule_ids {
615                if id.as_deref() == Some(match_id.as_str()) {
616                    return (id.clone(), name.clone());
617                }
618            }
619        }
620        // Fall back to using just the EvaluationResult's rule_id
621        (det.header.rule_id.clone(), None)
622    }
623
624    /// Resolve the event mode for a given correlation.
625    fn resolve_event_mode(&self, corr_idx: usize) -> CorrelationEventMode {
626        let corr = &self.correlations[corr_idx];
627        corr.event_mode
628            .unwrap_or(self.config.correlation_event_mode)
629    }
630
631    /// Resolve the max events cap for a given correlation.
632    fn resolve_max_events(&self, corr_idx: usize) -> usize {
633        let corr = &self.correlations[corr_idx];
634        corr.max_events
635            .unwrap_or(self.config.max_correlation_events)
636    }
637
638    /// Resolve the per-group window-state entry cap for a given correlation.
639    /// `None` means unbounded.
640    fn resolve_max_group_entries(&self, corr_idx: usize) -> Option<usize> {
641        let corr = &self.correlations[corr_idx];
642        corr.max_group_entries.or(self.config.max_group_entries)
643    }
644
645    /// Update a single correlation's state and check its condition.
646    fn update_correlation(
647        &mut self,
648        corr_idx: usize,
649        event: &impl Event,
650        ts: i64,
651        rule_id: &Option<String>,
652        rule_name: &Option<String>,
653        out: &mut Vec<EvaluationResult>,
654    ) {
655        // Borrow the correlation by reference — no cloning needed.  Rust allows
656        // simultaneous &self.correlations and &mut self.state / &mut self.last_alert
657        // because they are disjoint struct fields.
658        let corr = &self.correlations[corr_idx];
659        let corr_type = corr.correlation_type;
660        let timespan = corr.timespan_secs;
661        let window_mode = corr.window_mode;
662        let gap_secs = corr.gap_secs;
663        let level = corr.level;
664        let suppress_secs = corr.suppress_secs.or(self.config.suppress);
665        let action = corr.action.unwrap_or(self.config.action_on_match);
666        let event_mode = self.resolve_event_mode(corr_idx);
667        let max_events = self.resolve_max_events(corr_idx);
668        let max_group_entries = self.resolve_max_group_entries(corr_idx);
669
670        // Determine the rule_ref strings for alias resolution and temporal tracking.
671        let mut ref_strs: Vec<&str> = Vec::new();
672        if let Some(id) = rule_id.as_deref() {
673            ref_strs.push(id);
674        }
675        if let Some(name) = rule_name.as_deref() {
676            ref_strs.push(name);
677        }
678        let rule_ref = rule_id.as_deref().or(rule_name.as_deref()).unwrap_or("");
679
680        // Extract group key
681        let group_key = GroupKey::extract(event, &corr.group_by, &ref_strs);
682
683        // Get or create window state
684        let state_key = (corr_idx, group_key.clone());
685        let state = self
686            .state
687            .entry(state_key.clone())
688            .or_insert_with(|| WindowState::new_for(corr_type));
689
690        // Apply the window's pre-insert maintenance (sliding evict, tumbling
691        // bucket reset/late-event discard, or session gap/cap reset). On
692        // `Reset` the event buffers below are cleared in sync; on `Discard`
693        // (a late arrival in an earlier tumbling bucket) the event is dropped
694        // without touching the state or buffers.
695        let cutoff = ts - timespan as i64;
696        let decision = apply_window_open(state, ts, timespan, window_mode, gap_secs);
697        if decision == WindowDecision::Discard {
698            return;
699        }
700        let reset = decision == WindowDecision::Reset;
701
702        // Push the new event into the state
703        match corr_type {
704            CorrelationType::EventCount => {
705                state.push_event_count(ts);
706            }
707            CorrelationType::ValueCount => {
708                if let Some(ref fields) = corr.condition.field
709                    && let Some(key) = composite_value_count_key(event, fields)
710                {
711                    state.push_value_count(ts, key);
712                }
713            }
714            CorrelationType::Temporal | CorrelationType::TemporalOrdered => {
715                state.push_temporal(ts, rule_ref);
716            }
717            CorrelationType::ValueSum
718            | CorrelationType::ValueAvg
719            | CorrelationType::ValuePercentile
720            | CorrelationType::ValueMedian => {
721                if let Some(ref fields) = corr.condition.field
722                    && let Some(field_name) = fields.first()
723                    && let Some(val) = event.get_field(field_name)
724                    && let Some(n) = value_to_f64_ev(&val)
725                {
726                    state.push_numeric(ts, n);
727                }
728            }
729        }
730
731        // Enforce the per-group entry cap. Session windows keep their oldest
732        // entry as the span anchor so truncation cannot silently extend the
733        // `timespan` cap.
734        if let Some(cap) = max_group_entries {
735            state.truncate_oldest(cap, window_mode == WindowMode::Session);
736        }
737
738        // Push event into buffer based on event mode. Keep the buffer's retained
739        // events aligned with the window state: sliding evicts by the trailing
740        // cutoff, while tumbling/session clear the buffer when the window reset.
741        match event_mode {
742            CorrelationEventMode::Full => {
743                let buf = self
744                    .event_buffers
745                    .entry(state_key.clone())
746                    .or_insert_with(|| EventBuffer::new(max_events));
747                if window_mode == rsigma_parser::WindowMode::Sliding {
748                    buf.evict(cutoff);
749                } else if reset {
750                    buf.clear();
751                }
752                let json = event.to_json();
753                buf.push(ts, &json);
754            }
755            CorrelationEventMode::Refs => {
756                let buf = self
757                    .event_ref_buffers
758                    .entry(state_key.clone())
759                    .or_insert_with(|| EventRefBuffer::new(max_events));
760                if window_mode == rsigma_parser::WindowMode::Sliding {
761                    buf.evict(cutoff);
762                } else if reset {
763                    buf.clear();
764                }
765                let json = event.to_json();
766                buf.push(ts, &json);
767            }
768            CorrelationEventMode::None => {}
769        }
770
771        // Check condition — after this, `state` is no longer used (NLL drops the borrow).
772        let fired = state.check_condition(
773            &corr.condition,
774            corr_type,
775            &corr.rule_refs,
776            corr.extended_expr.as_ref(),
777        );
778
779        if let Some(agg_value) = fired {
780            let alert_key = (corr_idx, group_key.clone());
781
782            // Suppression check: skip if we've already alerted within the suppress window
783            let suppressed = if let Some(suppress) = suppress_secs {
784                if let Some(&last_ts) = self.last_alert.get(&alert_key) {
785                    (ts - last_ts) < suppress as i64
786                } else {
787                    false
788                }
789            } else {
790                false
791            };
792
793            if !suppressed {
794                // Retrieve stored events / refs based on mode
795                let (events, event_refs) = match event_mode {
796                    CorrelationEventMode::Full => {
797                        let stored = self
798                            .event_buffers
799                            .get(&alert_key)
800                            .map(|buf| buf.decompress_all())
801                            .unwrap_or_default();
802                        (Some(stored), None)
803                    }
804                    CorrelationEventMode::Refs => {
805                        let stored = self
806                            .event_ref_buffers
807                            .get(&alert_key)
808                            .map(|buf| buf.refs())
809                            .unwrap_or_default();
810                        (None, Some(stored))
811                    }
812                    CorrelationEventMode::None => (None, None),
813                };
814
815                // Only clone title/id/tags when we actually produce output
816                let corr = &self.correlations[corr_idx];
817                let result = EvaluationResult {
818                    header: RuleHeader {
819                        rule_title: corr.title.clone(),
820                        rule_id: corr.id.clone(),
821                        level,
822                        tags: corr.tags.clone(),
823                        custom_attributes: corr.custom_attributes.clone(),
824                        enrichments: None,
825                    },
826                    body: ResultBody::Correlation(CorrelationBody {
827                        correlation_type: corr_type,
828                        group_key: group_key.to_pairs(&corr.group_by),
829                        aggregated_value: agg_value,
830                        timespan_secs: timespan,
831                        events,
832                        event_refs,
833                    }),
834                };
835                out.push(result);
836
837                // Record alert time for suppression
838                self.last_alert.insert(alert_key.clone(), ts);
839
840                // Action on match
841                if action == CorrelationAction::Reset {
842                    if let Some(state) = self.state.get_mut(&alert_key) {
843                        state.clear();
844                    }
845                    if let Some(buf) = self.event_buffers.get_mut(&alert_key) {
846                        buf.clear();
847                    }
848                    if let Some(buf) = self.event_ref_buffers.get_mut(&alert_key) {
849                        buf.clear();
850                    }
851                }
852            }
853        }
854    }
855
856    /// Propagate correlation results to higher-level correlations (chaining).
857    ///
858    /// When a correlation fires, any correlation that references it (by ID or name)
859    /// is updated. Limits chain depth to 10 to prevent infinite loops.
860    fn chain_correlations(&mut self, fired: &[EvaluationResult], ts: i64) {
861        const MAX_CHAIN_DEPTH: usize = 10;
862        let mut pending: Vec<EvaluationResult> = fired.to_vec();
863        let mut depth = 0;
864
865        while !pending.is_empty() && depth < MAX_CHAIN_DEPTH {
866            depth += 1;
867
868            // Collect work items: (corr_idx, group_key_pairs, fired_ref)
869            #[allow(clippy::type_complexity)]
870            let mut work: Vec<(usize, Vec<(String, String)>, String)> = Vec::new();
871            for result in &pending {
872                // Only correlation results chain. Detections never reach here.
873                let Some(body) = result.as_correlation() else {
874                    continue;
875                };
876                if let Some(ref id) = result.header.rule_id
877                    && let Some(indices) = self.rule_index.get(id)
878                {
879                    let fired_ref = result
880                        .header
881                        .rule_id
882                        .as_deref()
883                        .unwrap_or(&result.header.rule_title)
884                        .to_string();
885                    for &corr_idx in indices {
886                        work.push((corr_idx, body.group_key.clone(), fired_ref.clone()));
887                    }
888                }
889            }
890
891            let mut next_pending = Vec::new();
892            for (corr_idx, group_key_pairs, fired_ref) in work {
893                let corr = &self.correlations[corr_idx];
894                let corr_type = corr.correlation_type;
895                let timespan = corr.timespan_secs;
896                let window_mode = corr.window_mode;
897                let gap_secs = corr.gap_secs;
898                let level = corr.level;
899
900                let group_key = GroupKey::from_pairs(&group_key_pairs, &corr.group_by);
901                let state_key = (corr_idx, group_key.clone());
902                let state = self
903                    .state
904                    .entry(state_key)
905                    .or_insert_with(|| WindowState::new_for(corr_type));
906
907                // Late arrivals in an earlier tumbling bucket are discarded;
908                // chained correlations keep no event buffers, so `Reset` needs
909                // no extra bookkeeping here.
910                if apply_window_open(state, ts, timespan, window_mode, gap_secs)
911                    == WindowDecision::Discard
912                {
913                    continue;
914                }
915
916                match corr_type {
917                    CorrelationType::EventCount => {
918                        state.push_event_count(ts);
919                    }
920                    CorrelationType::Temporal | CorrelationType::TemporalOrdered => {
921                        state.push_temporal(ts, &fired_ref);
922                    }
923                    _ => {
924                        state.push_event_count(ts);
925                    }
926                }
927
928                // Same per-group cap as the direct path; session windows
929                // keep the span anchor.
930                if let Some(cap) = corr.max_group_entries.or(self.config.max_group_entries) {
931                    state.truncate_oldest(cap, window_mode == WindowMode::Session);
932                }
933
934                let fired = state.check_condition(
935                    &corr.condition,
936                    corr_type,
937                    &corr.rule_refs,
938                    corr.extended_expr.as_ref(),
939                );
940
941                if let Some(agg_value) = fired {
942                    let corr = &self.correlations[corr_idx];
943                    next_pending.push(EvaluationResult {
944                        header: RuleHeader {
945                            rule_title: corr.title.clone(),
946                            rule_id: corr.id.clone(),
947                            level,
948                            tags: corr.tags.clone(),
949                            custom_attributes: corr.custom_attributes.clone(),
950                            enrichments: None,
951                        },
952                        body: ResultBody::Correlation(CorrelationBody {
953                            correlation_type: corr_type,
954                            group_key: group_key.to_pairs(&corr.group_by),
955                            aggregated_value: agg_value,
956                            timespan_secs: timespan,
957                            // Chained correlations don't include events
958                            // (they aggregate over correlation results, not
959                            // raw events)
960                            events: None,
961                            event_refs: None,
962                        }),
963                    });
964                }
965            }
966
967            pending = next_pending;
968        }
969
970        if !pending.is_empty() {
971            log::warn!(
972                "Correlation chain depth limit reached ({MAX_CHAIN_DEPTH}); \
973                 {} pending result(s) were not propagated further. \
974                 This may indicate a cycle in correlation references.",
975                pending.len()
976            );
977        }
978    }
979
980    // =========================================================================
981    // Timestamp extraction
982    // =========================================================================
983
984    /// Extract a Unix epoch timestamp (seconds) from an event.
985    ///
986    /// Tries each configured timestamp field in order. Supports:
987    /// - Numeric values (epoch seconds, or epoch millis if > 1e12)
988    /// - ISO 8601 strings (e.g., "2024-07-10T12:30:00Z")
989    ///
990    /// Returns `None` if no field yields a valid timestamp.
991    fn extract_event_timestamp(&self, event: &impl Event) -> Option<i64> {
992        for field_name in &self.config.timestamp_fields {
993            if let Some(val) = event.get_field(field_name)
994                && let Some(ts) = parse_timestamp_value(&val)
995            {
996                return Some(ts);
997            }
998        }
999        None
1000    }
1001
1002    // =========================================================================
1003    // State management
1004    // =========================================================================
1005
1006    /// Manually evict all expired state entries.
1007    pub fn evict_expired(&mut self, now_secs: i64) {
1008        self.evict_all(now_secs);
1009    }
1010
    /// Evict expired entries and remove empty states.
    ///
    /// Runs three phases in order, all relative to `now_secs`:
    ///
    /// 1. Time-based eviction of window state and of the matching event /
    ///    event-ref buffers. This phase is window-mode aware (see inline
    ///    comments). Groups whose state ends up empty are removed.
    /// 2. Hard cap. If `state` still holds at least `config.max_state_entries`
    ///    entries, the groups with the oldest latest timestamp are dropped down
    ///    to 90% of the cap. Their `last_alert` entries and event buffers go too.
    /// 3. Expiry of `last_alert` entries whose suppress window has elapsed.
    fn evict_all(&mut self, now_secs: i64) {
        // Phase 1: Time-based eviction — remove entries outside their correlation
        // window. Eviction is window-mode aware:
        //
        // - Sliding: trim entries older than the trailing cutoff, as always.
        // - Tumbling/session: never trim from the front — that would forget the
        //   bucket/session start and silently weaken the `timespan` cap (the
        //   window would drift toward sliding semantics). Instead, drop the
        //   whole group once it is stale (no event within its bucket span /
        //   session gap), which is exactly the point at which the next arrival
        //   would reset it anyway.
        //
        // Snapshot (timespan, mode, gap) per correlation index up front so the
        // `retain` closures below don't need to borrow `self.correlations`.
        let specs: Vec<(u64, WindowMode, Option<u64>)> = self
            .correlations
            .iter()
            .map(|c| (c.timespan_secs, c.window_mode, c.gap_secs))
            .collect();

        // States whose correlation index has no spec are not time-evicted; they
        // are kept as long as they are non-empty.
        self.state.retain(|&(corr_idx, _), state| {
            if let Some(&(timespan, mode, gap)) = specs.get(corr_idx) {
                match mode {
                    WindowMode::Sliding => {
                        state.evict(now_secs - timespan as i64);
                    }
                    WindowMode::Tumbling | WindowMode::Session => {
                        // Session groups go stale after `gap` (falling back to
                        // `timespan`); tumbling groups after `timespan`.
                        let staleness = if mode == WindowMode::Session {
                            gap.unwrap_or(timespan)
                        } else {
                            timespan
                        } as i64;
                        if state
                            .latest_timestamp()
                            .is_some_and(|last| now_secs - last > staleness)
                        {
                            state.clear();
                        }
                    }
                }
            }
            !state.is_empty()
        });

        // Evict event buffers in sync with window state: sliding buffers trim by
        // the trailing cutoff, tumbling/session buffers live and die with their
        // window state (dropped above when the group went stale).
        // `state` here is the map already pruned by the pass above.
        let state = &self.state;
        self.event_buffers.retain(|key, buf| {
            if let Some(&(timespan, mode, _)) = specs.get(key.0) {
                match mode {
                    WindowMode::Sliding => buf.evict(now_secs - timespan as i64),
                    WindowMode::Tumbling | WindowMode::Session => {
                        if !state.contains_key(key) {
                            return false;
                        }
                    }
                }
            }
            !buf.is_empty()
        });
        self.event_ref_buffers.retain(|key, buf| {
            if let Some(&(timespan, mode, _)) = specs.get(key.0) {
                match mode {
                    WindowMode::Sliding => buf.evict(now_secs - timespan as i64),
                    WindowMode::Tumbling | WindowMode::Session => {
                        if !state.contains_key(key) {
                            return false;
                        }
                    }
                }
            }
            !buf.is_empty()
        });

        // Phase 2: Hard cap — if still over limit after time-based eviction (e.g.
        // high-cardinality traffic with long windows), drop the stalest entries
        // until we're at 90% capacity to avoid evicting on every single event.
        if self.state.len() >= self.config.max_state_entries {
            let target = self.config.max_state_entries * 9 / 10;
            let excess = self.state.len() - target;

            log::warn!(
                "Correlation state hard cap reached ({} entries, max {}); \
                 evicting {} stalest entries to {} (90% capacity). \
                 This indicates high-cardinality traffic; consider raising \
                 max_state_entries or shortening correlation windows.",
                self.state.len(),
                self.config.max_state_entries,
                excess,
                target,
            );

            // Collect keys with their latest timestamp, sort by oldest first.
            // States with no timestamp sort first (i64::MIN) and go first.
            let mut by_staleness: Vec<_> = self
                .state
                .iter()
                .map(|(k, v)| (k.clone(), v.latest_timestamp().unwrap_or(i64::MIN)))
                .collect();
            by_staleness.sort_unstable_by_key(|&(_, ts)| ts);

            // Drop the oldest entries (and their associated event buffers)
            for (key, _) in by_staleness.into_iter().take(excess) {
                self.state.remove(&key);
                self.last_alert.remove(&key);
                self.event_buffers.remove(&key);
                self.event_ref_buffers.remove(&key);
            }
        }

        // Phase 3: Evict stale last_alert entries. An entry is removed once its
        // suppress window has elapsed. The window is the correlation's
        // `suppress_secs`, else `config.suppress`, else 0; an unknown index also
        // means 0. Expiry is not tied to whether the window state still exists.
        // On a reset-on-match the state is cleared and Phase 1 then drops it,
        // so tying alert expiry to state existence would cut suppression short.
        self.last_alert.retain(|key, &mut alert_ts| {
            let suppress = if key.0 < self.correlations.len() {
                self.correlations[key.0]
                    .suppress_secs
                    .or(self.config.suppress)
                    .unwrap_or(0)
            } else {
                0
            };
            (now_secs - alert_ts) < suppress as i64
        });
    }
1133
1134    /// Number of active state entries (for monitoring).
1135    pub fn state_count(&self) -> usize {
1136        self.state.len()
1137    }
1138
1139    /// Number of detection rules loaded.
1140    pub fn detection_rule_count(&self) -> usize {
1141        self.engine.rule_count()
1142    }
1143
1144    /// Number of correlation rules loaded.
1145    pub fn correlation_rule_count(&self) -> usize {
1146        self.correlations.len()
1147    }
1148
1149    /// Number of active event buffers (for monitoring).
1150    pub fn event_buffer_count(&self) -> usize {
1151        self.event_buffers.len()
1152    }
1153
1154    /// Total compressed bytes across all event buffers (for monitoring).
1155    pub fn event_buffer_bytes(&self) -> usize {
1156        self.event_buffers
1157            .values()
1158            .map(|b| b.compressed_bytes())
1159            .sum()
1160    }
1161
1162    /// Number of active event ref buffers — `Refs` mode (for monitoring).
1163    pub fn event_ref_buffer_count(&self) -> usize {
1164        self.event_ref_buffers.len()
1165    }
1166
1167    /// Access the inner stateless engine.
1168    pub fn engine(&self) -> &Engine {
1169        &self.engine
1170    }
1171
1172    /// Export all mutable correlation state as a serializable snapshot.
1173    ///
1174    /// The snapshot uses stable correlation identifiers (id > name > title)
1175    /// instead of internal indices, so it survives rule reloads as long as
1176    /// the correlation rules keep the same identifiers.
1177    pub fn export_state(&self) -> CorrelationSnapshot {
1178        let mut windows: HashMap<String, Vec<(GroupKey, WindowState)>> = HashMap::new();
1179        for ((idx, gk), ws) in &self.state {
1180            let corr_id = self.correlation_stable_id(*idx);
1181            windows
1182                .entry(corr_id)
1183                .or_default()
1184                .push((gk.clone(), ws.clone()));
1185        }
1186
1187        let mut last_alert: HashMap<String, Vec<(GroupKey, i64)>> = HashMap::new();
1188        for ((idx, gk), ts) in &self.last_alert {
1189            let corr_id = self.correlation_stable_id(*idx);
1190            last_alert
1191                .entry(corr_id)
1192                .or_default()
1193                .push((gk.clone(), *ts));
1194        }
1195
1196        let mut event_buffers: HashMap<String, Vec<(GroupKey, EventBuffer)>> = HashMap::new();
1197        for ((idx, gk), buf) in &self.event_buffers {
1198            let corr_id = self.correlation_stable_id(*idx);
1199            event_buffers
1200                .entry(corr_id)
1201                .or_default()
1202                .push((gk.clone(), buf.clone()));
1203        }
1204
1205        let mut event_ref_buffers: HashMap<String, Vec<(GroupKey, EventRefBuffer)>> =
1206            HashMap::new();
1207        for ((idx, gk), buf) in &self.event_ref_buffers {
1208            let corr_id = self.correlation_stable_id(*idx);
1209            event_ref_buffers
1210                .entry(corr_id)
1211                .or_default()
1212                .push((gk.clone(), buf.clone()));
1213        }
1214
1215        CorrelationSnapshot {
1216            version: SNAPSHOT_VERSION,
1217            windows,
1218            last_alert,
1219            event_buffers,
1220            event_ref_buffers,
1221        }
1222    }
1223
1224    /// Import previously exported state, mapping stable identifiers back to
1225    /// current correlation indices. Entries whose identifiers no longer match
1226    /// any loaded correlation are silently dropped.
1227    ///
1228    /// Returns `false` (and imports nothing) if the snapshot version is
1229    /// incompatible with the current schema.
1230    pub fn import_state(&mut self, snapshot: CorrelationSnapshot) -> bool {
1231        if snapshot.version != SNAPSHOT_VERSION {
1232            return false;
1233        }
1234        let id_to_idx = self.build_id_to_index_map();
1235
1236        for (corr_id, groups) in snapshot.windows {
1237            if let Some(&idx) = id_to_idx.get(&corr_id) {
1238                for (gk, ws) in groups {
1239                    self.state.insert((idx, gk), ws);
1240                }
1241            }
1242        }
1243
1244        for (corr_id, groups) in snapshot.last_alert {
1245            if let Some(&idx) = id_to_idx.get(&corr_id) {
1246                for (gk, ts) in groups {
1247                    self.last_alert.insert((idx, gk), ts);
1248                }
1249            }
1250        }
1251
1252        for (corr_id, groups) in snapshot.event_buffers {
1253            if let Some(&idx) = id_to_idx.get(&corr_id) {
1254                for (gk, buf) in groups {
1255                    self.event_buffers.insert((idx, gk), buf);
1256                }
1257            }
1258        }
1259
1260        for (corr_id, groups) in snapshot.event_ref_buffers {
1261            if let Some(&idx) = id_to_idx.get(&corr_id) {
1262                for (gk, buf) in groups {
1263                    self.event_ref_buffers.insert((idx, gk), buf);
1264                }
1265            }
1266        }
1267
1268        true
1269    }
1270
1271    /// Stable identifier for a correlation rule: prefers id, then name, then title.
1272    fn correlation_stable_id(&self, idx: usize) -> String {
1273        let corr = &self.correlations[idx];
1274        corr.id
1275            .clone()
1276            .or_else(|| corr.name.clone())
1277            .unwrap_or_else(|| corr.title.clone())
1278    }
1279
1280    /// Build a reverse map from stable id → current correlation index.
1281    fn build_id_to_index_map(&self) -> HashMap<String, usize> {
1282        self.correlations
1283            .iter()
1284            .enumerate()
1285            .map(|(idx, _)| (self.correlation_stable_id(idx), idx))
1286            .collect()
1287    }
1288}
1289
1290impl Default for CorrelationEngine {
1291    fn default() -> Self {
1292        Self::new(CorrelationConfig::default())
1293    }
1294}
1295
1296// =============================================================================
1297// Timestamp parsing helpers
1298// =============================================================================
1299
1300/// Extract a timestamp from an event using the given field names.
1301///
1302/// Standalone version of `CorrelationEngine::extract_event_timestamp` for use
1303/// in contexts where borrowing `&self` is not possible (e.g. rayon closures).
1304fn extract_event_ts(event: &impl Event, timestamp_fields: &[String]) -> Option<i64> {
1305    for field_name in timestamp_fields {
1306        if let Some(val) = event.get_field(field_name)
1307            && let Some(ts) = parse_timestamp_value(&val)
1308        {
1309            return Some(ts);
1310        }
1311    }
1312    None
1313}
1314
1315/// Parse an [`EventValue`] as a Unix epoch timestamp in seconds.
1316fn parse_timestamp_value(val: &EventValue) -> Option<i64> {
1317    match val {
1318        EventValue::Int(i) => Some(normalize_epoch(*i)),
1319        EventValue::Float(f) => Some(normalize_epoch(*f as i64)),
1320        EventValue::Str(s) => parse_timestamp_string(s),
1321        _ => None,
1322    }
1323}
1324
/// Normalize a numeric epoch value to whole seconds.
///
/// No plausible epoch-seconds value exceeds `1e12`, which is roughly the year
/// 33658. Anything larger is assumed to be in a finer unit and is divided by
/// 1000 until it falls back into range:
/// - one division handles milliseconds,
/// - two handle microseconds,
/// - three handle nanoseconds.
///
/// Values `<= 1e12`, including negatives, are returned unchanged. Results
/// for millisecond inputs (up to `1e15`) are identical to a single `/ 1000`.
fn normalize_epoch(v: i64) -> i64 {
    const MAX_PLAUSIBLE_SECS: i64 = 1_000_000_000_000;
    let mut secs = v;
    while secs > MAX_PLAUSIBLE_SECS {
        secs /= 1000;
    }
    secs
}
1330
1331/// Parse a timestamp string. Tries ISO 8601 with timezone, then without.
1332fn parse_timestamp_string(s: &str) -> Option<i64> {
1333    // Try RFC 3339 / ISO 8601 with timezone
1334    if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
1335        return Some(dt.timestamp());
1336    }
1337
1338    // Try ISO 8601 without timezone (assume UTC)
1339    // Common formats: "2024-07-10T12:30:00", "2024-07-10 12:30:00"
1340    if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S") {
1341        return Some(Utc.from_utc_datetime(&naive).timestamp());
1342    }
1343    if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") {
1344        return Some(Utc.from_utc_datetime(&naive).timestamp());
1345    }
1346
1347    // Try with fractional seconds
1348    if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S%.f") {
1349        return Some(Utc.from_utc_datetime(&naive).timestamp());
1350    }
1351    if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f") {
1352        return Some(Utc.from_utc_datetime(&naive).timestamp());
1353    }
1354
1355    None
1356}
1357
1358/// Convert an [`EventValue`] to a string for value_count purposes.
1359fn value_to_string_for_count(v: &EventValue) -> Option<String> {
1360    match v {
1361        EventValue::Str(s) => Some(s.to_string()),
1362        EventValue::Int(n) => Some(n.to_string()),
1363        EventValue::Float(f) => Some(f.to_string()),
1364        EventValue::Bool(b) => Some(b.to_string()),
1365        EventValue::Null => Some("null".to_string()),
1366        _ => None,
1367    }
1368}
1369
1370/// Build a composite distinct-key for `value_count` over one or more fields.
1371///
1372/// Each field's value is rendered with [`value_to_string_for_count`] and the
1373/// rendered parts are joined with `\u{1f}` (the ASCII Unit Separator), which
1374/// is unlikely to occur in normal log data. If any field is missing or has a
1375/// type that does not produce a stable string representation, the event is
1376/// excluded from the distinct count (return `None`), matching the historical
1377/// single-field behavior.
1378fn composite_value_count_key(event: &impl Event, fields: &[String]) -> Option<String> {
1379    // Common case: exactly one field. Avoid the separator overhead.
1380    if let [field_name] = fields {
1381        let val = event.get_field(field_name)?;
1382        return value_to_string_for_count(&val);
1383    }
1384
1385    let mut parts = Vec::with_capacity(fields.len());
1386    for field_name in fields {
1387        let val = event.get_field(field_name)?;
1388        let rendered = value_to_string_for_count(&val)?;
1389        parts.push(rendered);
1390    }
1391    Some(parts.join("\u{1f}"))
1392}
1393
1394/// Convert an [`EventValue`] to f64 for numeric aggregation.
1395fn value_to_f64_ev(v: &EventValue) -> Option<f64> {
1396    v.as_f64()
1397}