Skip to main content

rsigma_eval/correlation_engine/
mod.rs

//! Stateful correlation engine with time-windowed aggregation.
//!
//! `CorrelationEngine` wraps the stateless `Engine` and adds support for
//! Sigma correlation rules: `event_count`, `value_count`, `temporal`,
//! `temporal_ordered`, `value_sum`, `value_avg`, `value_percentile`,
//! and `value_median`.
//!
//! # Architecture
//!
//! 1. Events are first evaluated against detection rules (stateless)
//! 2. Detection matches update correlation window state (stateful)
//! 3. When a correlation condition is met, a `CorrelationResult` is emitted
//! 4. Correlation results can chain into higher-level correlations
14
15#[cfg(test)]
16mod tests;
17mod types;
18
19pub use types::*;
20
21use std::collections::HashMap;
22
23use chrono::{DateTime, TimeZone, Utc};
24
25use rsigma_parser::{CorrelationRule, CorrelationType, SigmaCollection, SigmaRule, WindowMode};
26
27use crate::correlation::{
28    CompiledCorrelation, EventBuffer, EventRefBuffer, GroupKey, WindowDecision, WindowState,
29    apply_window_open, compile_correlation,
30};
31use crate::engine::Engine;
32use crate::error::{EvalError, Result};
33use crate::event::{Event, EventValue};
34use crate::pipeline::{Pipeline, apply_pipelines, apply_pipelines_to_correlation};
35use crate::result::{CorrelationBody, EvaluationResult, ResultBody, RuleHeader};
36
37// =============================================================================
38// Correlation Engine
39// =============================================================================
40
/// Current snapshot schema version. Bump when the serialized format changes.
///
/// NOTE(review): nothing in the visible part of this file reads this
/// constant; presumably it is consumed by snapshot export/import code
/// further down — confirm before changing the value.
const SNAPSHOT_VERSION: u32 = 1;
43
/// Stateful correlation engine.
///
/// Wraps the stateless `Engine` for detection rules and adds time-windowed
/// correlation on top. Supports all eight Sigma correlation types listed in
/// the module documentation (`event_count`, `value_count`, `temporal`,
/// `temporal_ordered`, `value_sum`, `value_avg`, `value_percentile`,
/// `value_median`) as well as chaining of correlations.
///
/// Every per-group map below is keyed by `(correlation_index, group_key)`,
/// where `correlation_index` is a position in `correlations`.
pub struct CorrelationEngine {
    /// Inner stateless detection engine.
    engine: Engine,
    /// Compiled correlation rules, in the order they were added.
    correlations: Vec<CompiledCorrelation>,
    /// Maps rule ID/name -> indices into `correlations` that reference it.
    /// This allows quick lookup: "which correlations care about rule X?"
    /// Populated by `add_correlation` from each correlation's `rule_refs`.
    rule_index: HashMap<String, Vec<usize>>,
    /// Maps detection rule index -> (rule_id, rule_name) for reverse lookup.
    /// Used to find which correlations a detection match triggers.
    /// One entry is appended per detection rule by `add_rule` / `add_collection`.
    rule_ids: Vec<(Option<String>, Option<String>)>,
    /// Per-(correlation_index, group_key) window state.
    state: HashMap<(usize, GroupKey), WindowState>,
    /// Last alert timestamp per (correlation_index, group_key) for suppression.
    last_alert: HashMap<(usize, GroupKey), i64>,
    /// Per-(correlation_index, group_key) compressed event buffer (`Full` mode).
    event_buffers: HashMap<(usize, GroupKey), EventBuffer>,
    /// Per-(correlation_index, group_key) event reference buffer (`Refs` mode).
    event_ref_buffers: HashMap<(usize, GroupKey), EventRefBuffer>,
    /// Set of detection rule IDs/names that are "correlation-only"
    /// (referenced by correlations where `generate == false`).
    /// Used to filter detection output when `config.emit_detections == false`.
    correlation_only_rules: std::collections::HashSet<String>,
    /// Configuration.
    config: CorrelationConfig,
    /// Processing pipelines applied to rules during add_rule.
    /// `add_pipeline` re-sorts this list by `priority` after every insertion.
    pipelines: Vec<Pipeline>,
}
76
77impl CorrelationEngine {
78    /// Create a new correlation engine with the given configuration.
79    pub fn new(config: CorrelationConfig) -> Self {
80        CorrelationEngine {
81            engine: Engine::new(),
82            correlations: Vec::new(),
83            rule_index: HashMap::new(),
84            rule_ids: Vec::new(),
85            state: HashMap::new(),
86            last_alert: HashMap::new(),
87            event_buffers: HashMap::new(),
88            event_ref_buffers: HashMap::new(),
89            correlation_only_rules: std::collections::HashSet::new(),
90            config,
91            pipelines: Vec::new(),
92        }
93    }
94
95    /// Add a pipeline to the engine.
96    ///
97    /// Pipelines are applied to rules during `add_rule` / `add_collection`.
98    pub fn add_pipeline(&mut self, pipeline: Pipeline) {
99        self.pipelines.push(pipeline);
100        self.pipelines.sort_by_key(|p| p.priority);
101    }
102
    /// Set global `include_event` on the inner detection engine.
    ///
    /// Pure pass-through: `include` is handed unchanged to the inner engine's
    /// `set_include_event`; no correlation state or configuration is touched.
    pub fn set_include_event(&mut self, include: bool) {
        self.engine.set_include_event(include);
    }
107
    /// Forward to [`crate::Engine::set_match_detail`] on the inner detection
    /// engine. Correlation detections inherit the level set here.
    ///
    /// `level` is passed through unchanged; nothing on the correlation side
    /// is modified by this call.
    pub fn set_match_detail(&mut self, level: crate::result::MatchDetailLevel) {
        self.engine.set_match_detail(level);
    }
113
    /// Forward to [`crate::Engine::set_bloom_prefilter`] on the inner
    /// detection engine. Off by default; the optimization helps only on
    /// substring-heavy rule sets paired with mostly-non-matching events.
    ///
    /// `enabled` is passed through unchanged; the correlation engine keeps
    /// no copy of the flag.
    pub fn set_bloom_prefilter(&mut self, enabled: bool) {
        self.engine.set_bloom_prefilter(enabled);
    }
120
    /// Forward to [`crate::Engine::set_bloom_max_bytes`] on the inner
    /// detection engine.
    ///
    /// `max_bytes` is passed through unchanged; see the inner engine's
    /// documentation for how the limit is interpreted.
    pub fn set_bloom_max_bytes(&mut self, max_bytes: usize) {
        self.engine.set_bloom_max_bytes(max_bytes);
    }
126
    /// Forward to [`crate::Engine::set_cross_rule_ac`] on the inner
    /// detection engine. Off by default. Available behind the
    /// `daachorse-index` Cargo feature.
    ///
    /// This method is compiled only when that feature is enabled; `enabled`
    /// is passed through unchanged.
    #[cfg(feature = "daachorse-index")]
    pub fn set_cross_rule_ac(&mut self, enabled: bool) {
        self.engine.set_cross_rule_ac(enabled);
    }
134
    /// Set the global correlation event mode.
    ///
    /// - `None`: no event storage (default)
    /// - `Full`: compressed event bodies
    /// - `Refs`: lightweight timestamp + ID references
    ///
    /// This only overwrites `config.correlation_event_mode`. A correlation
    /// that carries its own `event_mode` takes precedence over the global
    /// value (see `resolve_event_mode`). Buffers that already exist are not
    /// cleared or converted by this call.
    pub fn set_correlation_event_mode(&mut self, mode: CorrelationEventMode) {
        self.config.correlation_event_mode = mode;
    }
143
    /// Set the maximum number of events to store per correlation window group.
    /// Only meaningful when `correlation_event_mode` is not `None`.
    ///
    /// This only overwrites `config.max_correlation_events`. A correlation
    /// with its own `max_events` overrides the global value (see
    /// `resolve_max_events`). The cap is handed to a buffer when that buffer
    /// is first created for a group.
    /// NOTE(review): whether buffers created earlier pick up a new cap is not
    /// visible here — confirm against `EventBuffer` / `EventRefBuffer`.
    pub fn set_max_correlation_events(&mut self, max: usize) {
        self.config.max_correlation_events = max;
    }
149
150    /// Add a single detection rule.
151    ///
152    /// If pipelines are set, the rule is cloned and transformed before compilation.
153    /// The inner engine receives the already-transformed rule directly (not through
154    /// its own pipeline, to avoid double transformation).
155    pub fn add_rule(&mut self, rule: &SigmaRule) -> Result<()> {
156        if self.pipelines.is_empty() {
157            self.apply_custom_attributes(&rule.custom_attributes);
158            self.rule_ids.push((rule.id.clone(), rule.name.clone()));
159            self.engine.add_rule(rule)?;
160        } else {
161            let mut transformed = rule.clone();
162            apply_pipelines(&self.pipelines, &mut transformed)?;
163            self.apply_custom_attributes(&transformed.custom_attributes);
164            self.rule_ids
165                .push((transformed.id.clone(), transformed.name.clone()));
166            // Use compile_rule + add_compiled_rule to bypass inner engine's pipelines
167            let compiled = crate::compiler::compile_rule(&transformed)?;
168            self.engine.add_compiled_rule(compiled);
169        }
170        Ok(())
171    }
172
173    /// Read `rsigma.*` custom attributes from a rule and apply them to the
174    /// engine configuration.  This allows pipelines to influence engine
175    /// behaviour via `SetCustomAttribute` transformations — the same pattern
176    /// used by pySigma backends (e.g. pySigma-backend-loki).
177    ///
178    /// Supported attributes:
179    /// - `rsigma.timestamp_field` — prepends a field name to the timestamp
180    ///   extraction priority list so the correlation engine can find the
181    ///   event timestamp in non-standard field names.
182    /// - `rsigma.suppress` — sets the default suppression window (e.g. `5m`).
183    ///   Only applied when the CLI did not already set `--suppress`.
184    /// - `rsigma.action` — sets the default post-fire action (`alert`/`reset`).
185    ///   Only applied when the CLI did not already set `--action`.
186    fn apply_custom_attributes(
187        &mut self,
188        attrs: &std::collections::HashMap<String, yaml_serde::Value>,
189    ) {
190        // rsigma.timestamp_field — prepend to priority list, skip duplicates
191        if let Some(field) = attrs.get("rsigma.timestamp_field").and_then(|v| v.as_str())
192            && !self.config.timestamp_fields.iter().any(|f| f == field)
193        {
194            self.config.timestamp_fields.insert(0, field.to_string());
195        }
196
197        // rsigma.suppress — only when CLI didn't already set one
198        if let Some(val) = attrs.get("rsigma.suppress").and_then(|v| v.as_str())
199            && self.config.suppress.is_none()
200            && let Ok(ts) = rsigma_parser::Timespan::parse(val)
201        {
202            self.config.suppress = Some(ts.seconds);
203        }
204
205        // rsigma.action — only when CLI left it at the default (Alert)
206        if let Some(val) = attrs.get("rsigma.action").and_then(|v| v.as_str())
207            && self.config.action_on_match == CorrelationAction::Alert
208            && let Ok(a) = val.parse::<CorrelationAction>()
209        {
210            self.config.action_on_match = a;
211        }
212    }
213
214    /// Add a single correlation rule.
215    pub fn add_correlation(&mut self, corr: &CorrelationRule) -> Result<()> {
216        let owned;
217        let effective = if self.pipelines.is_empty() {
218            corr
219        } else {
220            owned = {
221                let mut c = corr.clone();
222                apply_pipelines_to_correlation(&self.pipelines, &mut c)?;
223                c
224            };
225            &owned
226        };
227
228        // Apply engine-level custom attributes from the (possibly transformed)
229        // correlation rule (e.g. rsigma.timestamp_field).
230        self.apply_custom_attributes(&effective.custom_attributes);
231
232        let compiled = compile_correlation(effective)?;
233        let idx = self.correlations.len();
234
235        // Index by each referenced rule ID/name
236        for rule_ref in &compiled.rule_refs {
237            self.rule_index
238                .entry(rule_ref.clone())
239                .or_default()
240                .push(idx);
241        }
242
243        // Track correlation-only rules (generate == false is the default)
244        if !compiled.generate {
245            for rule_ref in &compiled.rule_refs {
246                self.correlation_only_rules.insert(rule_ref.clone());
247            }
248        }
249
250        self.correlations.push(compiled);
251        Ok(())
252    }
253
254    /// Add all rules and correlations from a parsed collection.
255    ///
256    /// Detection rules are added first (so they're available for correlation
257    /// references), then correlation rules. Detection rules are compiled
258    /// sequentially and then pushed to the inner engine in a single batch,
259    /// so the inverted index and bloom filter are rebuilt exactly once for
260    /// the whole collection. Without this batching, large rule sets
261    /// (multi-thousand rules) hit an O(N²) rebuild cost on load.
262    pub fn add_collection(&mut self, collection: &SigmaCollection) -> Result<()> {
263        let mut compiled_batch = Vec::with_capacity(collection.rules.len());
264        if self.pipelines.is_empty() {
265            for rule in &collection.rules {
266                self.apply_custom_attributes(&rule.custom_attributes);
267                self.rule_ids.push((rule.id.clone(), rule.name.clone()));
268                compiled_batch.push(crate::compiler::compile_rule(rule)?);
269            }
270        } else {
271            for rule in &collection.rules {
272                let mut transformed = rule.clone();
273                apply_pipelines(&self.pipelines, &mut transformed)?;
274                self.apply_custom_attributes(&transformed.custom_attributes);
275                self.rule_ids
276                    .push((transformed.id.clone(), transformed.name.clone()));
277                // Bypass the inner engine's pipelines (would double-transform)
278                compiled_batch.push(crate::compiler::compile_rule(&transformed)?);
279            }
280        }
281        self.engine.extend_compiled_rules(compiled_batch);
282        // Apply filter rules to the inner engine's detection rules
283        for filter in &collection.filters {
284            self.engine.apply_filter(filter)?;
285        }
286        for corr in &collection.correlations {
287            self.add_correlation(corr)?;
288        }
289        self.validate_rule_refs()?;
290        self.detect_correlation_cycles()?;
291        Ok(())
292    }
293
294    /// Validate that every correlation's `rule_refs` resolve to at least one
295    /// known detection rule (by ID or name) or another correlation (by ID or name).
296    fn validate_rule_refs(&self) -> Result<()> {
297        let mut known: std::collections::HashSet<&str> = std::collections::HashSet::new();
298
299        for (id, name) in &self.rule_ids {
300            if let Some(id) = id {
301                known.insert(id.as_str());
302            }
303            if let Some(name) = name {
304                known.insert(name.as_str());
305            }
306        }
307        for corr in &self.correlations {
308            if let Some(ref id) = corr.id {
309                known.insert(id.as_str());
310            }
311            if let Some(ref name) = corr.name {
312                known.insert(name.as_str());
313            }
314        }
315
316        for corr in &self.correlations {
317            for rule_ref in &corr.rule_refs {
318                if !known.contains(rule_ref.as_str()) {
319                    return Err(EvalError::UnknownRuleRef(rule_ref.clone()));
320                }
321            }
322        }
323        Ok(())
324    }
325
326    /// Detect cycles in the correlation reference graph.
327    ///
328    /// Builds a directed graph where each correlation (identified by its id/name)
329    /// has edges to the correlations it references via `rule_refs`. Uses DFS with
330    /// a "gray/black" coloring scheme to detect back-edges (cycles).
331    ///
332    /// Returns `Err(EvalError::CorrelationCycle)` if a cycle is found.
333    fn detect_correlation_cycles(&self) -> Result<()> {
334        // Build a set of all correlation identifiers (id and/or name)
335        let mut corr_identifiers: HashMap<&str, usize> = HashMap::new();
336        for (idx, corr) in self.correlations.iter().enumerate() {
337            if let Some(ref id) = corr.id {
338                corr_identifiers.insert(id.as_str(), idx);
339            }
340            if let Some(ref name) = corr.name {
341                corr_identifiers.insert(name.as_str(), idx);
342            }
343        }
344
345        // Build adjacency list: corr index → set of corr indices it references
346        let mut adj: Vec<Vec<usize>> = vec![Vec::new(); self.correlations.len()];
347        for (idx, corr) in self.correlations.iter().enumerate() {
348            for rule_ref in &corr.rule_refs {
349                if let Some(&target_idx) = corr_identifiers.get(rule_ref.as_str()) {
350                    adj[idx].push(target_idx);
351                }
352            }
353        }
354
355        // DFS cycle detection with three states: white (unvisited), gray (in stack), black (done)
356        let mut state = vec![0u8; self.correlations.len()]; // 0=white, 1=gray, 2=black
357        let mut path: Vec<usize> = Vec::new();
358
359        for start in 0..self.correlations.len() {
360            if state[start] == 0
361                && let Some(cycle) = Self::dfs_find_cycle(start, &adj, &mut state, &mut path)
362            {
363                let names: Vec<String> = cycle
364                    .iter()
365                    .map(|&i| {
366                        self.correlations[i]
367                            .id
368                            .as_deref()
369                            .or(self.correlations[i].name.as_deref())
370                            .unwrap_or(&self.correlations[i].title)
371                            .to_string()
372                    })
373                    .collect();
374                return Err(crate::error::EvalError::CorrelationCycle(
375                    names.join(" -> "),
376                ));
377            }
378        }
379        Ok(())
380    }
381
382    /// DFS helper that returns the cycle path if a back-edge is found.
383    fn dfs_find_cycle(
384        node: usize,
385        adj: &[Vec<usize>],
386        state: &mut [u8],
387        path: &mut Vec<usize>,
388    ) -> Option<Vec<usize>> {
389        state[node] = 1; // gray
390        path.push(node);
391
392        for &next in &adj[node] {
393            if state[next] == 1 {
394                // Back-edge found — extract cycle from path
395                if let Some(pos) = path.iter().position(|&n| n == next) {
396                    let mut cycle = path[pos..].to_vec();
397                    cycle.push(next); // close the cycle
398                    return Some(cycle);
399                }
400            }
401            if state[next] == 0
402                && let Some(cycle) = Self::dfs_find_cycle(next, adj, state, path)
403            {
404                return Some(cycle);
405            }
406        }
407
408        path.pop();
409        state[node] = 2; // black
410        None
411    }
412
413    /// Process an event, extracting the timestamp from configured event fields.
414    ///
415    /// When no timestamp field is found, the `timestamp_fallback` policy applies:
416    /// - `WallClock`: use `Utc::now()` (good for real-time streaming)
417    /// - `Skip`: return detections only, skip correlation state updates
418    pub fn process_event(&mut self, event: &impl Event) -> ProcessResult {
419        let all_detections = self.engine.evaluate(event);
420
421        let ts = match self.extract_event_timestamp(event) {
422            Some(ts) => ts,
423            None => match self.config.timestamp_fallback {
424                TimestampFallback::WallClock => Utc::now().timestamp(),
425                TimestampFallback::Skip => {
426                    // Still run detection (stateless), but skip correlation
427                    return self.filter_detections(all_detections);
428                }
429            },
430        };
431        self.process_with_detections(event, all_detections, ts)
432    }
433
    /// Process an event with an explicit Unix epoch timestamp (seconds).
    ///
    /// The timestamp is clamped to `[0, i64::MAX / 2]` to prevent overflow
    /// when adding timespan durations internally. The clamp itself is applied
    /// by `process_with_detections`, to which this method delegates after
    /// running detection on the inner engine.
    pub fn process_event_at(&mut self, event: &impl Event, timestamp_secs: i64) -> ProcessResult {
        // Stateless detection first, then the stateful correlation phase.
        let all_detections = self.engine.evaluate(event);
        self.process_with_detections(event, all_detections, timestamp_secs)
    }
442
443    /// Process an event with pre-computed detection results.
444    ///
445    /// Enables external parallelism: callers can run detection (via
446    /// [`evaluate`](Self::evaluate)) in parallel, then feed results here
447    /// sequentially for stateful correlation.
448    pub fn process_with_detections(
449        &mut self,
450        event: &impl Event,
451        all_detections: Vec<EvaluationResult>,
452        timestamp_secs: i64,
453    ) -> ProcessResult {
454        let timestamp_secs = timestamp_secs.clamp(0, i64::MAX / 2);
455
456        // Memory management — evict before adding new state to enforce limit
457        if self.state.len() >= self.config.max_state_entries {
458            self.evict_all(timestamp_secs);
459        }
460
461        // Feed detection matches into correlations
462        let mut correlations: Vec<EvaluationResult> = Vec::new();
463        self.feed_detections(event, &all_detections, timestamp_secs, &mut correlations);
464
465        // Chain — correlation results may trigger higher-level correlations
466        self.chain_correlations(&correlations, timestamp_secs);
467
468        // Filter detections by generate flag, then append the correlations.
469        let mut out = self.filter_detections(all_detections);
470        out.extend(correlations);
471        out
472    }
473
    /// Run stateless detection only (no correlation), delegating to the inner engine.
    ///
    /// Returns one [`EvaluationResult`] per matched detection. Takes `&self`
    /// so it can be called concurrently from multiple threads (e.g. via
    /// `rayon::par_iter`) while the mutable correlation phase runs
    /// sequentially afterwards.
    ///
    /// The results are returned unfiltered: the `emit_detections` /
    /// correlation-only filtering is applied later by the `process_*`
    /// methods, not here.
    pub fn evaluate(&self, event: &impl Event) -> Vec<EvaluationResult> {
        self.engine.evaluate(event)
    }
483
484    /// Process a batch of events: parallel detection, then sequential correlation.
485    ///
486    /// When the `parallel` feature is enabled, the stateless detection phase runs
487    /// concurrently via rayon. Timestamp extraction also runs in the parallel
488    /// phase (it borrows `&self.config` immutably). After `collect()` releases the
489    /// immutable borrows, each event's pre-computed detections are fed into the
490    /// stateful correlation engine sequentially.
491    pub fn process_batch<E: Event + Sync>(&mut self, events: &[&E]) -> Vec<ProcessResult> {
492        // Borrow split: take immutable refs to fields needed for the parallel phase.
493        // These are released by collect() before the sequential &mut self phase.
494        let engine = &self.engine;
495        let ts_fields = &self.config.timestamp_fields;
496
497        let batch_results: Vec<(Vec<EvaluationResult>, Option<i64>)> = {
498            #[cfg(feature = "parallel")]
499            {
500                use rayon::prelude::*;
501                events
502                    .par_iter()
503                    .map(|e| {
504                        let detections = engine.evaluate(e);
505                        let ts = extract_event_ts(e, ts_fields);
506                        (detections, ts)
507                    })
508                    .collect()
509            }
510            #[cfg(not(feature = "parallel"))]
511            {
512                events
513                    .iter()
514                    .map(|e| {
515                        let detections = engine.evaluate(e);
516                        let ts = extract_event_ts(e, ts_fields);
517                        (detections, ts)
518                    })
519                    .collect()
520            }
521        };
522
523        // Sequential correlation phase
524        let mut results = Vec::with_capacity(events.len());
525        for ((detections, ts_opt), event) in batch_results.into_iter().zip(events) {
526            match ts_opt {
527                Some(ts) => {
528                    results.push(self.process_with_detections(event, detections, ts));
529                }
530                None => match self.config.timestamp_fallback {
531                    TimestampFallback::WallClock => {
532                        let ts = Utc::now().timestamp();
533                        results.push(self.process_with_detections(event, detections, ts));
534                    }
535                    TimestampFallback::Skip => {
536                        // Still return detection results, but skip correlation
537                        results.push(self.filter_detections(detections));
538                    }
539                },
540            }
541        }
542        results
543    }
544
545    /// Filter detections by the `generate` flag / `emit_detections` config.
546    ///
547    /// If `emit_detections` is false and some rules are correlation-only,
548    /// their detection output is suppressed.
549    fn filter_detections(&self, all_detections: Vec<EvaluationResult>) -> Vec<EvaluationResult> {
550        if !self.config.emit_detections && !self.correlation_only_rules.is_empty() {
551            all_detections
552                .into_iter()
553                .filter(|m| {
554                    let id_match = m
555                        .header
556                        .rule_id
557                        .as_ref()
558                        .is_some_and(|id| self.correlation_only_rules.contains(id));
559                    !id_match
560                })
561                .collect()
562        } else {
563            all_detections
564        }
565    }
566
567    /// Feed detection matches into correlation window states.
568    fn feed_detections(
569        &mut self,
570        event: &impl Event,
571        detections: &[EvaluationResult],
572        ts: i64,
573        out: &mut Vec<EvaluationResult>,
574    ) {
575        // Collect all (corr_idx, rule_id, rule_name) tuples upfront to avoid
576        // borrow conflicts between self.rule_ids and self.update_correlation.
577        let mut work: Vec<(usize, Option<String>, Option<String>)> = Vec::new();
578
579        for det in detections {
580            // Use the MatchResult's rule_id to find the original rule's ID/name.
581            // We also look up by rule_id in our rule_ids table for the name.
582            let (rule_id, rule_name) = self.find_rule_identity(det);
583
584            // Collect correlation indices that reference this rule
585            let mut corr_indices = Vec::new();
586            if let Some(ref id) = rule_id
587                && let Some(indices) = self.rule_index.get(id)
588            {
589                corr_indices.extend(indices);
590            }
591            if let Some(ref name) = rule_name
592                && let Some(indices) = self.rule_index.get(name)
593            {
594                corr_indices.extend(indices);
595            }
596
597            corr_indices.sort_unstable();
598            corr_indices.dedup();
599
600            for &corr_idx in &corr_indices {
601                work.push((corr_idx, rule_id.clone(), rule_name.clone()));
602            }
603        }
604
605        for (corr_idx, rule_id, rule_name) in work {
606            self.update_correlation(corr_idx, event, ts, &rule_id, &rule_name, out);
607        }
608    }
609
610    /// Find the (id, name) for a detection match by searching our rule_ids table.
611    fn find_rule_identity(&self, det: &EvaluationResult) -> (Option<String>, Option<String>) {
612        // First, try to find by matching rule_id in our table
613        if let Some(ref match_id) = det.header.rule_id {
614            for (id, name) in &self.rule_ids {
615                if id.as_deref() == Some(match_id.as_str()) {
616                    return (id.clone(), name.clone());
617                }
618            }
619        }
620        // Fall back to using just the EvaluationResult's rule_id
621        (det.header.rule_id.clone(), None)
622    }
623
624    /// Resolve the event mode for a given correlation.
625    fn resolve_event_mode(&self, corr_idx: usize) -> CorrelationEventMode {
626        let corr = &self.correlations[corr_idx];
627        corr.event_mode
628            .unwrap_or(self.config.correlation_event_mode)
629    }
630
631    /// Resolve the max events cap for a given correlation.
632    fn resolve_max_events(&self, corr_idx: usize) -> usize {
633        let corr = &self.correlations[corr_idx];
634        corr.max_events
635            .unwrap_or(self.config.max_correlation_events)
636    }
637
638    /// Update a single correlation's state and check its condition.
639    fn update_correlation(
640        &mut self,
641        corr_idx: usize,
642        event: &impl Event,
643        ts: i64,
644        rule_id: &Option<String>,
645        rule_name: &Option<String>,
646        out: &mut Vec<EvaluationResult>,
647    ) {
648        // Borrow the correlation by reference — no cloning needed.  Rust allows
649        // simultaneous &self.correlations and &mut self.state / &mut self.last_alert
650        // because they are disjoint struct fields.
651        let corr = &self.correlations[corr_idx];
652        let corr_type = corr.correlation_type;
653        let timespan = corr.timespan_secs;
654        let window_mode = corr.window_mode;
655        let gap_secs = corr.gap_secs;
656        let level = corr.level;
657        let suppress_secs = corr.suppress_secs.or(self.config.suppress);
658        let action = corr.action.unwrap_or(self.config.action_on_match);
659        let event_mode = self.resolve_event_mode(corr_idx);
660        let max_events = self.resolve_max_events(corr_idx);
661
662        // Determine the rule_ref strings for alias resolution and temporal tracking.
663        let mut ref_strs: Vec<&str> = Vec::new();
664        if let Some(id) = rule_id.as_deref() {
665            ref_strs.push(id);
666        }
667        if let Some(name) = rule_name.as_deref() {
668            ref_strs.push(name);
669        }
670        let rule_ref = rule_id.as_deref().or(rule_name.as_deref()).unwrap_or("");
671
672        // Extract group key
673        let group_key = GroupKey::extract(event, &corr.group_by, &ref_strs);
674
675        // Get or create window state
676        let state_key = (corr_idx, group_key.clone());
677        let state = self
678            .state
679            .entry(state_key.clone())
680            .or_insert_with(|| WindowState::new_for(corr_type));
681
682        // Apply the window's pre-insert maintenance (sliding evict, tumbling
683        // bucket reset/late-event discard, or session gap/cap reset). On
684        // `Reset` the event buffers below are cleared in sync; on `Discard`
685        // (a late arrival in an earlier tumbling bucket) the event is dropped
686        // without touching the state or buffers.
687        let cutoff = ts - timespan as i64;
688        let decision = apply_window_open(state, ts, timespan, window_mode, gap_secs);
689        if decision == WindowDecision::Discard {
690            return;
691        }
692        let reset = decision == WindowDecision::Reset;
693
694        // Push the new event into the state
695        match corr_type {
696            CorrelationType::EventCount => {
697                state.push_event_count(ts);
698            }
699            CorrelationType::ValueCount => {
700                if let Some(ref fields) = corr.condition.field
701                    && let Some(key) = composite_value_count_key(event, fields)
702                {
703                    state.push_value_count(ts, key);
704                }
705            }
706            CorrelationType::Temporal | CorrelationType::TemporalOrdered => {
707                state.push_temporal(ts, rule_ref);
708            }
709            CorrelationType::ValueSum
710            | CorrelationType::ValueAvg
711            | CorrelationType::ValuePercentile
712            | CorrelationType::ValueMedian => {
713                if let Some(ref fields) = corr.condition.field
714                    && let Some(field_name) = fields.first()
715                    && let Some(val) = event.get_field(field_name)
716                    && let Some(n) = value_to_f64_ev(&val)
717                {
718                    state.push_numeric(ts, n);
719                }
720            }
721        }
722
723        // Push event into buffer based on event mode. Keep the buffer's retained
724        // events aligned with the window state: sliding evicts by the trailing
725        // cutoff, while tumbling/session clear the buffer when the window reset.
726        match event_mode {
727            CorrelationEventMode::Full => {
728                let buf = self
729                    .event_buffers
730                    .entry(state_key.clone())
731                    .or_insert_with(|| EventBuffer::new(max_events));
732                if window_mode == rsigma_parser::WindowMode::Sliding {
733                    buf.evict(cutoff);
734                } else if reset {
735                    buf.clear();
736                }
737                let json = event.to_json();
738                buf.push(ts, &json);
739            }
740            CorrelationEventMode::Refs => {
741                let buf = self
742                    .event_ref_buffers
743                    .entry(state_key.clone())
744                    .or_insert_with(|| EventRefBuffer::new(max_events));
745                if window_mode == rsigma_parser::WindowMode::Sliding {
746                    buf.evict(cutoff);
747                } else if reset {
748                    buf.clear();
749                }
750                let json = event.to_json();
751                buf.push(ts, &json);
752            }
753            CorrelationEventMode::None => {}
754        }
755
756        // Check condition — after this, `state` is no longer used (NLL drops the borrow).
757        let fired = state.check_condition(
758            &corr.condition,
759            corr_type,
760            &corr.rule_refs,
761            corr.extended_expr.as_ref(),
762        );
763
764        if let Some(agg_value) = fired {
765            let alert_key = (corr_idx, group_key.clone());
766
767            // Suppression check: skip if we've already alerted within the suppress window
768            let suppressed = if let Some(suppress) = suppress_secs {
769                if let Some(&last_ts) = self.last_alert.get(&alert_key) {
770                    (ts - last_ts) < suppress as i64
771                } else {
772                    false
773                }
774            } else {
775                false
776            };
777
778            if !suppressed {
779                // Retrieve stored events / refs based on mode
780                let (events, event_refs) = match event_mode {
781                    CorrelationEventMode::Full => {
782                        let stored = self
783                            .event_buffers
784                            .get(&alert_key)
785                            .map(|buf| buf.decompress_all())
786                            .unwrap_or_default();
787                        (Some(stored), None)
788                    }
789                    CorrelationEventMode::Refs => {
790                        let stored = self
791                            .event_ref_buffers
792                            .get(&alert_key)
793                            .map(|buf| buf.refs())
794                            .unwrap_or_default();
795                        (None, Some(stored))
796                    }
797                    CorrelationEventMode::None => (None, None),
798                };
799
800                // Only clone title/id/tags when we actually produce output
801                let corr = &self.correlations[corr_idx];
802                let result = EvaluationResult {
803                    header: RuleHeader {
804                        rule_title: corr.title.clone(),
805                        rule_id: corr.id.clone(),
806                        level,
807                        tags: corr.tags.clone(),
808                        custom_attributes: corr.custom_attributes.clone(),
809                        enrichments: None,
810                    },
811                    body: ResultBody::Correlation(CorrelationBody {
812                        correlation_type: corr_type,
813                        group_key: group_key.to_pairs(&corr.group_by),
814                        aggregated_value: agg_value,
815                        timespan_secs: timespan,
816                        events,
817                        event_refs,
818                    }),
819                };
820                out.push(result);
821
822                // Record alert time for suppression
823                self.last_alert.insert(alert_key.clone(), ts);
824
825                // Action on match
826                if action == CorrelationAction::Reset {
827                    if let Some(state) = self.state.get_mut(&alert_key) {
828                        state.clear();
829                    }
830                    if let Some(buf) = self.event_buffers.get_mut(&alert_key) {
831                        buf.clear();
832                    }
833                    if let Some(buf) = self.event_ref_buffers.get_mut(&alert_key) {
834                        buf.clear();
835                    }
836                }
837            }
838        }
839    }
840
841    /// Propagate correlation results to higher-level correlations (chaining).
842    ///
843    /// When a correlation fires, any correlation that references it (by ID or name)
844    /// is updated. Limits chain depth to 10 to prevent infinite loops.
845    fn chain_correlations(&mut self, fired: &[EvaluationResult], ts: i64) {
846        const MAX_CHAIN_DEPTH: usize = 10;
847        let mut pending: Vec<EvaluationResult> = fired.to_vec();
848        let mut depth = 0;
849
850        while !pending.is_empty() && depth < MAX_CHAIN_DEPTH {
851            depth += 1;
852
853            // Collect work items: (corr_idx, group_key_pairs, fired_ref)
854            #[allow(clippy::type_complexity)]
855            let mut work: Vec<(usize, Vec<(String, String)>, String)> = Vec::new();
856            for result in &pending {
857                // Only correlation results chain. Detections never reach here.
858                let Some(body) = result.as_correlation() else {
859                    continue;
860                };
861                if let Some(ref id) = result.header.rule_id
862                    && let Some(indices) = self.rule_index.get(id)
863                {
864                    let fired_ref = result
865                        .header
866                        .rule_id
867                        .as_deref()
868                        .unwrap_or(&result.header.rule_title)
869                        .to_string();
870                    for &corr_idx in indices {
871                        work.push((corr_idx, body.group_key.clone(), fired_ref.clone()));
872                    }
873                }
874            }
875
876            let mut next_pending = Vec::new();
877            for (corr_idx, group_key_pairs, fired_ref) in work {
878                let corr = &self.correlations[corr_idx];
879                let corr_type = corr.correlation_type;
880                let timespan = corr.timespan_secs;
881                let window_mode = corr.window_mode;
882                let gap_secs = corr.gap_secs;
883                let level = corr.level;
884
885                let group_key = GroupKey::from_pairs(&group_key_pairs, &corr.group_by);
886                let state_key = (corr_idx, group_key.clone());
887                let state = self
888                    .state
889                    .entry(state_key)
890                    .or_insert_with(|| WindowState::new_for(corr_type));
891
892                // Late arrivals in an earlier tumbling bucket are discarded;
893                // chained correlations keep no event buffers, so `Reset` needs
894                // no extra bookkeeping here.
895                if apply_window_open(state, ts, timespan, window_mode, gap_secs)
896                    == WindowDecision::Discard
897                {
898                    continue;
899                }
900
901                match corr_type {
902                    CorrelationType::EventCount => {
903                        state.push_event_count(ts);
904                    }
905                    CorrelationType::Temporal | CorrelationType::TemporalOrdered => {
906                        state.push_temporal(ts, &fired_ref);
907                    }
908                    _ => {
909                        state.push_event_count(ts);
910                    }
911                }
912
913                let fired = state.check_condition(
914                    &corr.condition,
915                    corr_type,
916                    &corr.rule_refs,
917                    corr.extended_expr.as_ref(),
918                );
919
920                if let Some(agg_value) = fired {
921                    let corr = &self.correlations[corr_idx];
922                    next_pending.push(EvaluationResult {
923                        header: RuleHeader {
924                            rule_title: corr.title.clone(),
925                            rule_id: corr.id.clone(),
926                            level,
927                            tags: corr.tags.clone(),
928                            custom_attributes: corr.custom_attributes.clone(),
929                            enrichments: None,
930                        },
931                        body: ResultBody::Correlation(CorrelationBody {
932                            correlation_type: corr_type,
933                            group_key: group_key.to_pairs(&corr.group_by),
934                            aggregated_value: agg_value,
935                            timespan_secs: timespan,
936                            // Chained correlations don't include events
937                            // (they aggregate over correlation results, not
938                            // raw events)
939                            events: None,
940                            event_refs: None,
941                        }),
942                    });
943                }
944            }
945
946            pending = next_pending;
947        }
948
949        if !pending.is_empty() {
950            log::warn!(
951                "Correlation chain depth limit reached ({MAX_CHAIN_DEPTH}); \
952                 {} pending result(s) were not propagated further. \
953                 This may indicate a cycle in correlation references.",
954                pending.len()
955            );
956        }
957    }
958
959    // =========================================================================
960    // Timestamp extraction
961    // =========================================================================
962
963    /// Extract a Unix epoch timestamp (seconds) from an event.
964    ///
965    /// Tries each configured timestamp field in order. Supports:
966    /// - Numeric values (epoch seconds, or epoch millis if > 1e12)
967    /// - ISO 8601 strings (e.g., "2024-07-10T12:30:00Z")
968    ///
969    /// Returns `None` if no field yields a valid timestamp.
970    fn extract_event_timestamp(&self, event: &impl Event) -> Option<i64> {
971        for field_name in &self.config.timestamp_fields {
972            if let Some(val) = event.get_field(field_name)
973                && let Some(ts) = parse_timestamp_value(&val)
974            {
975                return Some(ts);
976            }
977        }
978        None
979    }
980
981    // =========================================================================
982    // State management
983    // =========================================================================
984
    /// Manually evict all expired state entries.
    ///
    /// Public entry point that forwards to the internal `evict_all` pass.
    /// `now_secs` is the reference time, in seconds, that stored timestamps
    /// are compared against.
    pub fn evict_expired(&mut self, now_secs: i64) {
        self.evict_all(now_secs);
    }
989
    /// Evict expired entries and remove empty states.
    ///
    /// Runs three phases against the reference time `now_secs` (seconds):
    ///
    /// 1. Time-based eviction of window state and the two event buffer maps,
    ///    following each correlation's window mode.
    /// 2. A hard cap: when the number of state entries is still at or above
    ///    `config.max_state_entries`, the entries with the oldest latest
    ///    timestamp are dropped until 90% of the cap remains.
    /// 3. Removal of `last_alert` entries whose suppress window has elapsed.
    fn evict_all(&mut self, now_secs: i64) {
        // Phase 1: Time-based eviction — remove entries outside their correlation
        // window. Eviction is window-mode aware:
        //
        // - Sliding: trim entries older than the trailing cutoff, as always.
        // - Tumbling/session: never trim from the front — that would forget the
        //   bucket/session start and silently weaken the `timespan` cap (the
        //   window would drift toward sliding semantics). Instead, drop the
        //   whole group once it is stale (no event within its bucket span /
        //   session gap), which is exactly the point at which the next arrival
        //   would reset it anyway.
        //
        // `specs` is indexed by correlation index and copies out the three
        // values the closures below need, so they do not have to borrow
        // `self.correlations` while `self.state` is mutably borrowed.
        let specs: Vec<(u64, WindowMode, Option<u64>)> = self
            .correlations
            .iter()
            .map(|c| (c.timespan_secs, c.window_mode, c.gap_secs))
            .collect();

        self.state.retain(|&(corr_idx, _), state| {
            if let Some(&(timespan, mode, gap)) = specs.get(corr_idx) {
                match mode {
                    WindowMode::Sliding => {
                        state.evict(now_secs - timespan as i64);
                    }
                    WindowMode::Tumbling | WindowMode::Session => {
                        // Session windows go stale after `gap` seconds (or
                        // `timespan` when no gap is set); tumbling windows
                        // after `timespan` seconds.
                        let staleness = if mode == WindowMode::Session {
                            gap.unwrap_or(timespan)
                        } else {
                            timespan
                        } as i64;
                        if state
                            .latest_timestamp()
                            .is_some_and(|last| now_secs - last > staleness)
                        {
                            state.clear();
                        }
                    }
                }
            }
            // Entries emptied above (or already empty) are dropped from the map.
            !state.is_empty()
        });

        // Evict event buffers in sync with window state: sliding buffers trim by
        // the trailing cutoff, tumbling/session buffers live and die with their
        // window state (dropped above when the group went stale).
        let state = &self.state;
        self.event_buffers.retain(|key, buf| {
            if let Some(&(timespan, mode, _)) = specs.get(key.0) {
                match mode {
                    WindowMode::Sliding => buf.evict(now_secs - timespan as i64),
                    WindowMode::Tumbling | WindowMode::Session => {
                        if !state.contains_key(key) {
                            return false;
                        }
                    }
                }
            }
            !buf.is_empty()
        });
        self.event_ref_buffers.retain(|key, buf| {
            if let Some(&(timespan, mode, _)) = specs.get(key.0) {
                match mode {
                    WindowMode::Sliding => buf.evict(now_secs - timespan as i64),
                    WindowMode::Tumbling | WindowMode::Session => {
                        if !state.contains_key(key) {
                            return false;
                        }
                    }
                }
            }
            !buf.is_empty()
        });

        // Phase 2: Hard cap — if still over limit after time-based eviction (e.g.
        // high-cardinality traffic with long windows), drop the stalest entries
        // until we're at 90% capacity to avoid evicting on every single event.
        if self.state.len() >= self.config.max_state_entries {
            let target = self.config.max_state_entries * 9 / 10;
            let excess = self.state.len() - target;

            log::warn!(
                "Correlation state hard cap reached ({} entries, max {}); \
                 evicting {} stalest entries to {} (90% capacity). \
                 This indicates high-cardinality traffic; consider raising \
                 max_state_entries or shortening correlation windows.",
                self.state.len(),
                self.config.max_state_entries,
                excess,
                target,
            );

            // Collect keys with their latest timestamp, sort by oldest first.
            // Entries without any timestamp sort first (i64::MIN).
            let mut by_staleness: Vec<_> = self
                .state
                .iter()
                .map(|(k, v)| (k.clone(), v.latest_timestamp().unwrap_or(i64::MIN)))
                .collect();
            by_staleness.sort_unstable_by_key(|&(_, ts)| ts);

            // Drop the oldest entries (and their associated event buffers
            // and last-alert records)
            for (key, _) in by_staleness.into_iter().take(excess) {
                self.state.remove(&key);
                self.last_alert.remove(&key);
                self.event_buffers.remove(&key);
                self.event_ref_buffers.remove(&key);
            }
        }

        // Phase 3: Evict stale last_alert entries — keep an entry only while its
        // suppress window is still running. The window is the correlation's own
        // `suppress_secs`, else `config.suppress`, else 0 (so entries for
        // correlations without suppression, or with an out-of-range index, are
        // always removed). Only the suppress window is consulted here; whether
        // the matching window state still exists is not checked.
        self.last_alert.retain(|key, &mut alert_ts| {
            let suppress = if key.0 < self.correlations.len() {
                self.correlations[key.0]
                    .suppress_secs
                    .or(self.config.suppress)
                    .unwrap_or(0)
            } else {
                0
            };
            (now_secs - alert_ts) < suppress as i64
        });
    }
1112
    /// Number of active state entries (for monitoring).
    ///
    /// This is the size of the window-state map, which holds one entry per
    /// `(correlation index, group key)` pair.
    pub fn state_count(&self) -> usize {
        self.state.len()
    }
1117
    /// Number of detection rules loaded.
    ///
    /// Reported by the inner stateless `Engine` via its `rule_count()`.
    pub fn detection_rule_count(&self) -> usize {
        self.engine.rule_count()
    }
1122
    /// Number of correlation rules loaded.
    ///
    /// This is the length of the engine's `correlations` list.
    pub fn correlation_rule_count(&self) -> usize {
        self.correlations.len()
    }
1127
    /// Number of active event buffers (for monitoring).
    ///
    /// Counts entries in `event_buffers` only; the separate
    /// `event_ref_buffers` map is reported by `event_ref_buffer_count`.
    pub fn event_buffer_count(&self) -> usize {
        self.event_buffers.len()
    }
1132
1133    /// Total compressed bytes across all event buffers (for monitoring).
1134    pub fn event_buffer_bytes(&self) -> usize {
1135        self.event_buffers
1136            .values()
1137            .map(|b| b.compressed_bytes())
1138            .sum()
1139    }
1140
    /// Number of active event ref buffers — `Refs` mode (for monitoring).
    ///
    /// Counts entries in `event_ref_buffers`.
    pub fn event_ref_buffer_count(&self) -> usize {
        self.event_ref_buffers.len()
    }
1145
    /// Access the inner stateless engine.
    ///
    /// Returns a shared reference; the engine cannot be mutated through it.
    pub fn engine(&self) -> &Engine {
        &self.engine
    }
1150
1151    /// Export all mutable correlation state as a serializable snapshot.
1152    ///
1153    /// The snapshot uses stable correlation identifiers (id > name > title)
1154    /// instead of internal indices, so it survives rule reloads as long as
1155    /// the correlation rules keep the same identifiers.
1156    pub fn export_state(&self) -> CorrelationSnapshot {
1157        let mut windows: HashMap<String, Vec<(GroupKey, WindowState)>> = HashMap::new();
1158        for ((idx, gk), ws) in &self.state {
1159            let corr_id = self.correlation_stable_id(*idx);
1160            windows
1161                .entry(corr_id)
1162                .or_default()
1163                .push((gk.clone(), ws.clone()));
1164        }
1165
1166        let mut last_alert: HashMap<String, Vec<(GroupKey, i64)>> = HashMap::new();
1167        for ((idx, gk), ts) in &self.last_alert {
1168            let corr_id = self.correlation_stable_id(*idx);
1169            last_alert
1170                .entry(corr_id)
1171                .or_default()
1172                .push((gk.clone(), *ts));
1173        }
1174
1175        let mut event_buffers: HashMap<String, Vec<(GroupKey, EventBuffer)>> = HashMap::new();
1176        for ((idx, gk), buf) in &self.event_buffers {
1177            let corr_id = self.correlation_stable_id(*idx);
1178            event_buffers
1179                .entry(corr_id)
1180                .or_default()
1181                .push((gk.clone(), buf.clone()));
1182        }
1183
1184        let mut event_ref_buffers: HashMap<String, Vec<(GroupKey, EventRefBuffer)>> =
1185            HashMap::new();
1186        for ((idx, gk), buf) in &self.event_ref_buffers {
1187            let corr_id = self.correlation_stable_id(*idx);
1188            event_ref_buffers
1189                .entry(corr_id)
1190                .or_default()
1191                .push((gk.clone(), buf.clone()));
1192        }
1193
1194        CorrelationSnapshot {
1195            version: SNAPSHOT_VERSION,
1196            windows,
1197            last_alert,
1198            event_buffers,
1199            event_ref_buffers,
1200        }
1201    }
1202
1203    /// Import previously exported state, mapping stable identifiers back to
1204    /// current correlation indices. Entries whose identifiers no longer match
1205    /// any loaded correlation are silently dropped.
1206    ///
1207    /// Returns `false` (and imports nothing) if the snapshot version is
1208    /// incompatible with the current schema.
1209    pub fn import_state(&mut self, snapshot: CorrelationSnapshot) -> bool {
1210        if snapshot.version != SNAPSHOT_VERSION {
1211            return false;
1212        }
1213        let id_to_idx = self.build_id_to_index_map();
1214
1215        for (corr_id, groups) in snapshot.windows {
1216            if let Some(&idx) = id_to_idx.get(&corr_id) {
1217                for (gk, ws) in groups {
1218                    self.state.insert((idx, gk), ws);
1219                }
1220            }
1221        }
1222
1223        for (corr_id, groups) in snapshot.last_alert {
1224            if let Some(&idx) = id_to_idx.get(&corr_id) {
1225                for (gk, ts) in groups {
1226                    self.last_alert.insert((idx, gk), ts);
1227                }
1228            }
1229        }
1230
1231        for (corr_id, groups) in snapshot.event_buffers {
1232            if let Some(&idx) = id_to_idx.get(&corr_id) {
1233                for (gk, buf) in groups {
1234                    self.event_buffers.insert((idx, gk), buf);
1235                }
1236            }
1237        }
1238
1239        for (corr_id, groups) in snapshot.event_ref_buffers {
1240            if let Some(&idx) = id_to_idx.get(&corr_id) {
1241                for (gk, buf) in groups {
1242                    self.event_ref_buffers.insert((idx, gk), buf);
1243                }
1244            }
1245        }
1246
1247        true
1248    }
1249
1250    /// Stable identifier for a correlation rule: prefers id, then name, then title.
1251    fn correlation_stable_id(&self, idx: usize) -> String {
1252        let corr = &self.correlations[idx];
1253        corr.id
1254            .clone()
1255            .or_else(|| corr.name.clone())
1256            .unwrap_or_else(|| corr.title.clone())
1257    }
1258
1259    /// Build a reverse map from stable id → current correlation index.
1260    fn build_id_to_index_map(&self) -> HashMap<String, usize> {
1261        self.correlations
1262            .iter()
1263            .enumerate()
1264            .map(|(idx, _)| (self.correlation_stable_id(idx), idx))
1265            .collect()
1266    }
1267}
1268
/// Builds an engine through `CorrelationEngine::new` using
/// `CorrelationConfig::default()`.
impl Default for CorrelationEngine {
    fn default() -> Self {
        Self::new(CorrelationConfig::default())
    }
}
1274
1275// =============================================================================
1276// Timestamp parsing helpers
1277// =============================================================================
1278
1279/// Extract a timestamp from an event using the given field names.
1280///
1281/// Standalone version of `CorrelationEngine::extract_event_timestamp` for use
1282/// in contexts where borrowing `&self` is not possible (e.g. rayon closures).
1283fn extract_event_ts(event: &impl Event, timestamp_fields: &[String]) -> Option<i64> {
1284    for field_name in timestamp_fields {
1285        if let Some(val) = event.get_field(field_name)
1286            && let Some(ts) = parse_timestamp_value(&val)
1287        {
1288            return Some(ts);
1289        }
1290    }
1291    None
1292}
1293
1294/// Parse an [`EventValue`] as a Unix epoch timestamp in seconds.
1295fn parse_timestamp_value(val: &EventValue) -> Option<i64> {
1296    match val {
1297        EventValue::Int(i) => Some(normalize_epoch(*i)),
1298        EventValue::Float(f) => Some(normalize_epoch(*f as i64)),
1299        EventValue::Str(s) => parse_timestamp_string(s),
1300        _ => None,
1301    }
1302}
1303
/// Normalize an epoch value to seconds.
///
/// Values strictly greater than 1e12 are assumed to be milliseconds (read as
/// seconds they would lie beyond the year 33658) and are divided by 1000.
/// Everything else, including negative values, is returned unchanged.
fn normalize_epoch(v: i64) -> i64 {
    const MILLIS_THRESHOLD: i64 = 1_000_000_000_000;
    match v {
        millis if millis > MILLIS_THRESHOLD => millis / 1000,
        secs => secs,
    }
}
1309
1310/// Parse a timestamp string. Tries ISO 8601 with timezone, then without.
1311fn parse_timestamp_string(s: &str) -> Option<i64> {
1312    // Try RFC 3339 / ISO 8601 with timezone
1313    if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
1314        return Some(dt.timestamp());
1315    }
1316
1317    // Try ISO 8601 without timezone (assume UTC)
1318    // Common formats: "2024-07-10T12:30:00", "2024-07-10 12:30:00"
1319    if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S") {
1320        return Some(Utc.from_utc_datetime(&naive).timestamp());
1321    }
1322    if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") {
1323        return Some(Utc.from_utc_datetime(&naive).timestamp());
1324    }
1325
1326    // Try with fractional seconds
1327    if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S%.f") {
1328        return Some(Utc.from_utc_datetime(&naive).timestamp());
1329    }
1330    if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f") {
1331        return Some(Utc.from_utc_datetime(&naive).timestamp());
1332    }
1333
1334    None
1335}
1336
1337/// Convert an [`EventValue`] to a string for value_count purposes.
1338fn value_to_string_for_count(v: &EventValue) -> Option<String> {
1339    match v {
1340        EventValue::Str(s) => Some(s.to_string()),
1341        EventValue::Int(n) => Some(n.to_string()),
1342        EventValue::Float(f) => Some(f.to_string()),
1343        EventValue::Bool(b) => Some(b.to_string()),
1344        EventValue::Null => Some("null".to_string()),
1345        _ => None,
1346    }
1347}
1348
1349/// Build a composite distinct-key for `value_count` over one or more fields.
1350///
1351/// Each field's value is rendered with [`value_to_string_for_count`] and the
1352/// rendered parts are joined with `\u{1f}` (the ASCII Unit Separator), which
1353/// is unlikely to occur in normal log data. If any field is missing or has a
1354/// type that does not produce a stable string representation, the event is
1355/// excluded from the distinct count (return `None`), matching the historical
1356/// single-field behavior.
1357fn composite_value_count_key(event: &impl Event, fields: &[String]) -> Option<String> {
1358    // Common case: exactly one field. Avoid the separator overhead.
1359    if let [field_name] = fields {
1360        let val = event.get_field(field_name)?;
1361        return value_to_string_for_count(&val);
1362    }
1363
1364    let mut parts = Vec::with_capacity(fields.len());
1365    for field_name in fields {
1366        let val = event.get_field(field_name)?;
1367        let rendered = value_to_string_for_count(&val)?;
1368        parts.push(rendered);
1369    }
1370    Some(parts.join("\u{1f}"))
1371}
1372
/// Convert an [`EventValue`] to f64 for numeric aggregation.
///
/// Thin wrapper over `EventValue::as_f64`; returns `None` whenever that
/// method does.
fn value_to_f64_ev(v: &EventValue) -> Option<f64> {
    v.as_f64()
}