Skip to main content

rsigma_eval/
correlation_engine.rs

1//! Stateful correlation engine with time-windowed aggregation.
2//!
3//! `CorrelationEngine` wraps the stateless `Engine` and adds support for
4//! Sigma correlation rules: `event_count`, `value_count`, `temporal`,
5//! `temporal_ordered`, `value_sum`, `value_avg`, `value_percentile`,
6//! and `value_median`.
7//!
8//! # Architecture
9//!
10//! 1. Events are first evaluated against detection rules (stateless)
11//! 2. Detection matches update correlation window state (stateful)
12//! 3. When a correlation condition is met, a `CorrelationResult` is emitted
13//! 4. Correlation results can chain into higher-level correlations
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use chrono::{DateTime, TimeZone, Utc};
19use serde::Serialize;
20
21use rsigma_parser::{CorrelationRule, CorrelationType, Level, SigmaCollection, SigmaRule};
22
23use crate::correlation::{
24    CompiledCorrelation, EventBuffer, EventRef, EventRefBuffer, GroupKey, WindowState,
25    compile_correlation,
26};
27use crate::engine::Engine;
28use crate::error::{EvalError, Result};
29use crate::event::{Event, EventValue};
30use crate::pipeline::{Pipeline, apply_pipelines, apply_pipelines_to_correlation};
31use crate::result::MatchResult;
32
33// =============================================================================
34// Configuration
35// =============================================================================
36
/// What to do with window state after a correlation fires.
///
/// This is an engine-level default that can be overridden per-correlation
/// via the `rsigma.action` custom attribute set in processing pipelines.
///
/// Serialized in `snake_case`, i.e. as `"alert"` / `"reset"`; the same
/// lowercase spellings are understood by the `FromStr` implementation.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum CorrelationAction {
    /// Keep window state as-is after firing (current / default behavior).
    /// Subsequent events that still satisfy the condition will re-fire.
    #[default]
    Alert,
    /// Clear the window state for the firing group key after emitting the alert.
    /// The threshold must be met again from scratch before the next alert.
    Reset,
}
52
53impl std::str::FromStr for CorrelationAction {
54    type Err = String;
55    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
56        match s {
57            "alert" => Ok(CorrelationAction::Alert),
58            "reset" => Ok(CorrelationAction::Reset),
59            _ => Err(format!(
60                "Unknown correlation action: {s} (expected 'alert' or 'reset')"
61            )),
62        }
63    }
64}
65
/// How to include events in correlation results.
///
/// Can be overridden per-correlation via the `rsigma.correlation_event_mode`
/// custom attribute.
///
/// Serialized in `snake_case` (`"none"`, `"full"`, `"refs"`), which are also
/// the strings produced by the `Display` implementation.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum CorrelationEventMode {
    /// Don't include events (default). Zero memory overhead.
    #[default]
    None,
    /// Include full event bodies, individually compressed with deflate.
    /// Typical cost: 100–1000 bytes per event.
    Full,
    /// Include only event references (timestamp + optional ID).
    /// Minimal memory: ~40 bytes per event.
    Refs,
}
83
84impl std::str::FromStr for CorrelationEventMode {
85    type Err = String;
86    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
87        match s.to_lowercase().as_str() {
88            "none" | "off" | "false" => Ok(CorrelationEventMode::None),
89            "full" | "true" => Ok(CorrelationEventMode::Full),
90            "refs" | "references" => Ok(CorrelationEventMode::Refs),
91            _ => Err(format!(
92                "Unknown correlation event mode: {s} (expected 'none', 'full', or 'refs')"
93            )),
94        }
95    }
96}
97
98impl std::fmt::Display for CorrelationEventMode {
99    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100        match self {
101            CorrelationEventMode::None => write!(f, "none"),
102            CorrelationEventMode::Full => write!(f, "full"),
103            CorrelationEventMode::Refs => write!(f, "refs"),
104        }
105    }
106}
107
/// Behavior when no timestamp field is found or parseable in an event.
///
/// Consulted by the event-processing entry points only after every name in
/// `CorrelationConfig::timestamp_fields` failed to yield a timestamp.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum TimestampFallback {
    /// Use wall-clock time (`Utc::now()`). Good for real-time streaming.
    #[default]
    WallClock,
    /// Skip the event from correlation processing. Detections still fire,
    /// but the event does not update any correlation state. Recommended for
    /// batch/replay of historical logs where wall-clock time is meaningless.
    Skip,
}
119
/// Configuration for the correlation engine.
///
/// Provides engine-level defaults that mirror pySigma backend optional arguments.
/// Per-correlation overrides can be set via `SetCustomAttribute` pipeline
/// transformations using the `rsigma.*` attribute namespace.
///
/// Start from `CorrelationConfig::default()` and override individual fields
/// as needed; the default values are listed on each field below.
#[derive(Debug, Clone)]
pub struct CorrelationConfig {
    /// Field names to try for timestamp extraction, in order of priority.
    ///
    /// The engine will try each field until one yields a parseable timestamp.
    /// If none succeed, the `timestamp_fallback` policy applies.
    ///
    /// Default: `@timestamp`, `timestamp`, `EventTime`, `TimeCreated`,
    /// `eventTime` (in that order).
    pub timestamp_fields: Vec<String>,

    /// What to do when no timestamp can be extracted from an event.
    ///
    /// Default: `WallClock` (use `Utc::now()`).
    pub timestamp_fallback: TimestampFallback,

    /// Maximum number of state entries (across all correlations and groups)
    /// before aggressive eviction is triggered. Prevents unbounded memory growth.
    ///
    /// Default: 100_000.
    pub max_state_entries: usize,

    /// Default suppression window in seconds.
    ///
    /// After a correlation fires for a `(correlation, group_key)`, suppress
    /// re-alerts for this duration. `None` means no suppression (every
    /// condition-satisfying event produces an alert).
    ///
    /// Default: `None`.
    ///
    /// Can be overridden per-correlation via the `rsigma.suppress` custom attribute.
    pub suppress: Option<u64>,

    /// Default action to take after a correlation fires.
    ///
    /// Default: `CorrelationAction::Alert`.
    ///
    /// Can be overridden per-correlation via the `rsigma.action` custom attribute.
    pub action_on_match: CorrelationAction,

    /// Whether to emit detection-level matches for rules that are only
    /// referenced by correlations (where `generate: false`).
    ///
    /// Default: `true` (emit all detection matches).
    /// Set to `false` to suppress detection output for correlation-only rules.
    pub emit_detections: bool,

    /// How to include contributing events in correlation results.
    ///
    /// - `None` (default): no event storage, zero overhead.
    /// - `Full`: events are deflate-compressed and decompressed on output.
    /// - `Refs`: only timestamps + event IDs are stored (minimal memory).
    ///
    /// Can be overridden per-correlation via `rsigma.correlation_event_mode`.
    pub correlation_event_mode: CorrelationEventMode,

    /// Maximum number of events to store per (correlation, group_key) window
    /// when `correlation_event_mode` is not `None`.
    ///
    /// Bounds memory at: `max_correlation_events × cost_per_event × active_groups`.
    /// Default: 10.
    pub max_correlation_events: usize,
}
181
182impl Default for CorrelationConfig {
183    fn default() -> Self {
184        CorrelationConfig {
185            timestamp_fields: vec![
186                "@timestamp".to_string(),
187                "timestamp".to_string(),
188                "EventTime".to_string(),
189                "TimeCreated".to_string(),
190                "eventTime".to_string(),
191            ],
192            timestamp_fallback: TimestampFallback::default(),
193            max_state_entries: 100_000,
194            suppress: None,
195            action_on_match: CorrelationAction::default(),
196            emit_detections: true,
197            correlation_event_mode: CorrelationEventMode::default(),
198            max_correlation_events: 10,
199        }
200    }
201}
202
203// =============================================================================
204// Result types
205// =============================================================================
206
/// Combined result from processing a single event.
///
/// Either list may be empty: an event can match no detection rule, and a
/// detection match does not necessarily cause a correlation to fire.
#[derive(Debug, Clone, Serialize)]
pub struct ProcessResult {
    /// Detection rule matches (stateless, immediate).
    pub detections: Vec<MatchResult>,
    /// Correlation rule matches (stateful, accumulated).
    pub correlations: Vec<CorrelationResult>,
}
215
/// The result of a correlation rule firing.
///
/// The optional `events` / `event_refs` fields and an empty
/// `custom_attributes` map are left out of the serialized form entirely.
#[derive(Debug, Clone, Serialize)]
pub struct CorrelationResult {
    /// Title of the correlation rule.
    pub rule_title: String,
    /// ID of the correlation rule (if present).
    pub rule_id: Option<String>,
    /// Severity level.
    pub level: Option<Level>,
    /// Tags from the correlation rule.
    pub tags: Vec<String>,
    /// Type of correlation.
    pub correlation_type: CorrelationType,
    /// Group-by field names and their values for this match.
    pub group_key: Vec<(String, String)>,
    /// The aggregated value that triggered the condition (count, sum, avg, etc.).
    pub aggregated_value: f64,
    /// The time window in seconds.
    pub timespan_secs: u64,
    /// Full event bodies, included when `correlation_event_mode` is `Full`.
    ///
    /// Contains up to `max_correlation_events` recently stored window events.
    /// Events are decompressed from deflate storage on output.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub events: Option<Vec<serde_json::Value>>,
    /// Lightweight event references, included when `correlation_event_mode` is `Refs`.
    ///
    /// Contains up to `max_correlation_events` timestamp + optional ID pairs.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub event_refs: Option<Vec<EventRef>>,
    /// Custom attributes from the original Sigma correlation rule (merged
    /// view of arbitrary top-level keys, the `custom_attributes:` block, and
    /// any pipeline-applied overrides).
    ///
    /// Held behind an `Arc`, so cloning a result only bumps a reference count
    /// instead of copying the map.
    #[serde(skip_serializing_if = "HashMap::is_empty")]
    pub custom_attributes: Arc<HashMap<String, serde_json::Value>>,
}
252
253// =============================================================================
254// Correlation Engine
255// =============================================================================
256
/// Stateful correlation engine.
///
/// Wraps the stateless `Engine` for detection rules and adds time-windowed
/// correlation on top. Supports the eight Sigma correlation types listed in
/// the module documentation (`event_count`, `value_count`, `temporal`,
/// `temporal_ordered`, `value_sum`, `value_avg`, `value_percentile`,
/// `value_median`) as well as chaining.
pub struct CorrelationEngine {
    /// Inner stateless detection engine.
    engine: Engine,
    /// Compiled correlation rules.
    correlations: Vec<CompiledCorrelation>,
    /// Maps rule ID/name -> indices into `correlations` that reference it.
    /// This allows quick lookup: "which correlations care about rule X?"
    rule_index: HashMap<String, Vec<usize>>,
    /// Maps detection rule index -> (rule_id, rule_name) for reverse lookup.
    /// Used to find which correlations a detection match triggers.
    rule_ids: Vec<(Option<String>, Option<String>)>,
    /// Per-(correlation_index, group_key) window state.
    state: HashMap<(usize, GroupKey), WindowState>,
    /// Last alert timestamp per (correlation_index, group_key) for suppression.
    last_alert: HashMap<(usize, GroupKey), i64>,
    /// Per-(correlation_index, group_key) compressed event buffer (`Full` mode).
    event_buffers: HashMap<(usize, GroupKey), EventBuffer>,
    /// Per-(correlation_index, group_key) event reference buffer (`Refs` mode).
    event_ref_buffers: HashMap<(usize, GroupKey), EventRefBuffer>,
    /// Set of detection rule IDs/names that are "correlation-only"
    /// (referenced by correlations where `generate == false`).
    /// Used to filter detection output when `config.emit_detections == false`.
    correlation_only_rules: std::collections::HashSet<String>,
    /// Configuration.
    config: CorrelationConfig,
    /// Processing pipelines applied to rules during add_rule.
    /// Kept sorted by ascending `priority` (see `add_pipeline`).
    pipelines: Vec<Pipeline>,
}
289
290impl CorrelationEngine {
    /// Create a new correlation engine with the given configuration.
    ///
    /// The engine starts empty: a fresh inner detection `Engine`, no
    /// correlation rules, no pipelines and no accumulated window state.
    pub fn new(config: CorrelationConfig) -> Self {
        CorrelationEngine {
            engine: Engine::new(),
            correlations: Vec::new(),
            rule_index: HashMap::new(),
            rule_ids: Vec::new(),
            state: HashMap::new(),
            last_alert: HashMap::new(),
            event_buffers: HashMap::new(),
            event_ref_buffers: HashMap::new(),
            correlation_only_rules: std::collections::HashSet::new(),
            config,
            pipelines: Vec::new(),
        }
    }
307
308    /// Add a pipeline to the engine.
309    ///
310    /// Pipelines are applied to rules during `add_rule` / `add_collection`.
311    pub fn add_pipeline(&mut self, pipeline: Pipeline) {
312        self.pipelines.push(pipeline);
313        self.pipelines.sort_by_key(|p| p.priority);
314    }
315
    /// Set global `include_event` on the inner detection engine.
    ///
    /// This only forwards the flag to the wrapped `Engine`; the correlation
    /// side keeps its own setting (see `set_correlation_event_mode`).
    pub fn set_include_event(&mut self, include: bool) {
        self.engine.set_include_event(include);
    }
320
    /// Set the global correlation event mode.
    ///
    /// - `None`: no event storage (default)
    /// - `Full`: compressed event bodies
    /// - `Refs`: lightweight timestamp + ID references
    ///
    /// Overwrites `config.correlation_event_mode`; nothing else is touched
    /// by this call.
    pub fn set_correlation_event_mode(&mut self, mode: CorrelationEventMode) {
        self.config.correlation_event_mode = mode;
    }
329
    /// Set the maximum number of events to store per correlation window group.
    /// Only meaningful when `correlation_event_mode` is not `None`.
    ///
    /// Overwrites `config.max_correlation_events`.
    pub fn set_max_correlation_events(&mut self, max: usize) {
        self.config.max_correlation_events = max;
    }
335
336    /// Add a single detection rule.
337    ///
338    /// If pipelines are set, the rule is cloned and transformed before compilation.
339    /// The inner engine receives the already-transformed rule directly (not through
340    /// its own pipeline, to avoid double transformation).
341    pub fn add_rule(&mut self, rule: &SigmaRule) -> Result<()> {
342        if self.pipelines.is_empty() {
343            self.apply_custom_attributes(&rule.custom_attributes);
344            self.rule_ids.push((rule.id.clone(), rule.name.clone()));
345            self.engine.add_rule(rule)?;
346        } else {
347            let mut transformed = rule.clone();
348            apply_pipelines(&self.pipelines, &mut transformed)?;
349            self.apply_custom_attributes(&transformed.custom_attributes);
350            self.rule_ids
351                .push((transformed.id.clone(), transformed.name.clone()));
352            // Use compile_rule + add_compiled_rule to bypass inner engine's pipelines
353            let compiled = crate::compiler::compile_rule(&transformed)?;
354            self.engine.add_compiled_rule(compiled);
355        }
356        Ok(())
357    }
358
359    /// Read `rsigma.*` custom attributes from a rule and apply them to the
360    /// engine configuration.  This allows pipelines to influence engine
361    /// behaviour via `SetCustomAttribute` transformations — the same pattern
362    /// used by pySigma backends (e.g. pySigma-backend-loki).
363    ///
364    /// Supported attributes:
365    /// - `rsigma.timestamp_field` — prepends a field name to the timestamp
366    ///   extraction priority list so the correlation engine can find the
367    ///   event timestamp in non-standard field names.
368    /// - `rsigma.suppress` — sets the default suppression window (e.g. `5m`).
369    ///   Only applied when the CLI did not already set `--suppress`.
370    /// - `rsigma.action` — sets the default post-fire action (`alert`/`reset`).
371    ///   Only applied when the CLI did not already set `--action`.
372    fn apply_custom_attributes(
373        &mut self,
374        attrs: &std::collections::HashMap<String, serde_yaml::Value>,
375    ) {
376        // rsigma.timestamp_field — prepend to priority list, skip duplicates
377        if let Some(field) = attrs.get("rsigma.timestamp_field").and_then(|v| v.as_str())
378            && !self.config.timestamp_fields.iter().any(|f| f == field)
379        {
380            self.config.timestamp_fields.insert(0, field.to_string());
381        }
382
383        // rsigma.suppress — only when CLI didn't already set one
384        if let Some(val) = attrs.get("rsigma.suppress").and_then(|v| v.as_str())
385            && self.config.suppress.is_none()
386            && let Ok(ts) = rsigma_parser::Timespan::parse(val)
387        {
388            self.config.suppress = Some(ts.seconds);
389        }
390
391        // rsigma.action — only when CLI left it at the default (Alert)
392        if let Some(val) = attrs.get("rsigma.action").and_then(|v| v.as_str())
393            && self.config.action_on_match == CorrelationAction::Alert
394            && let Ok(a) = val.parse::<CorrelationAction>()
395        {
396            self.config.action_on_match = a;
397        }
398    }
399
400    /// Add a single correlation rule.
401    pub fn add_correlation(&mut self, corr: &CorrelationRule) -> Result<()> {
402        let owned;
403        let effective = if self.pipelines.is_empty() {
404            corr
405        } else {
406            owned = {
407                let mut c = corr.clone();
408                apply_pipelines_to_correlation(&self.pipelines, &mut c)?;
409                c
410            };
411            &owned
412        };
413
414        // Apply engine-level custom attributes from the (possibly transformed)
415        // correlation rule (e.g. rsigma.timestamp_field).
416        self.apply_custom_attributes(&effective.custom_attributes);
417
418        let compiled = compile_correlation(effective)?;
419        let idx = self.correlations.len();
420
421        // Index by each referenced rule ID/name
422        for rule_ref in &compiled.rule_refs {
423            self.rule_index
424                .entry(rule_ref.clone())
425                .or_default()
426                .push(idx);
427        }
428
429        // Track correlation-only rules (generate == false is the default)
430        if !compiled.generate {
431            for rule_ref in &compiled.rule_refs {
432                self.correlation_only_rules.insert(rule_ref.clone());
433            }
434        }
435
436        self.correlations.push(compiled);
437        Ok(())
438    }
439
440    /// Add all rules and correlations from a parsed collection.
441    ///
442    /// Detection rules are added first (so they're available for correlation
443    /// references), then correlation rules.
444    pub fn add_collection(&mut self, collection: &SigmaCollection) -> Result<()> {
445        for rule in &collection.rules {
446            self.add_rule(rule)?;
447        }
448        // Apply filter rules to the inner engine's detection rules
449        for filter in &collection.filters {
450            self.engine.apply_filter(filter)?;
451        }
452        for corr in &collection.correlations {
453            self.add_correlation(corr)?;
454        }
455        self.validate_rule_refs()?;
456        self.detect_correlation_cycles()?;
457        Ok(())
458    }
459
460    /// Validate that every correlation's `rule_refs` resolve to at least one
461    /// known detection rule (by ID or name) or another correlation (by ID or name).
462    fn validate_rule_refs(&self) -> Result<()> {
463        let mut known: std::collections::HashSet<&str> = std::collections::HashSet::new();
464
465        for (id, name) in &self.rule_ids {
466            if let Some(id) = id {
467                known.insert(id.as_str());
468            }
469            if let Some(name) = name {
470                known.insert(name.as_str());
471            }
472        }
473        for corr in &self.correlations {
474            if let Some(ref id) = corr.id {
475                known.insert(id.as_str());
476            }
477            if let Some(ref name) = corr.name {
478                known.insert(name.as_str());
479            }
480        }
481
482        for corr in &self.correlations {
483            for rule_ref in &corr.rule_refs {
484                if !known.contains(rule_ref.as_str()) {
485                    return Err(EvalError::UnknownRuleRef(rule_ref.clone()));
486                }
487            }
488        }
489        Ok(())
490    }
491
492    /// Detect cycles in the correlation reference graph.
493    ///
494    /// Builds a directed graph where each correlation (identified by its id/name)
495    /// has edges to the correlations it references via `rule_refs`. Uses DFS with
496    /// a "gray/black" coloring scheme to detect back-edges (cycles).
497    ///
498    /// Returns `Err(EvalError::CorrelationCycle)` if a cycle is found.
499    fn detect_correlation_cycles(&self) -> Result<()> {
500        // Build a set of all correlation identifiers (id and/or name)
501        let mut corr_identifiers: HashMap<&str, usize> = HashMap::new();
502        for (idx, corr) in self.correlations.iter().enumerate() {
503            if let Some(ref id) = corr.id {
504                corr_identifiers.insert(id.as_str(), idx);
505            }
506            if let Some(ref name) = corr.name {
507                corr_identifiers.insert(name.as_str(), idx);
508            }
509        }
510
511        // Build adjacency list: corr index → set of corr indices it references
512        let mut adj: Vec<Vec<usize>> = vec![Vec::new(); self.correlations.len()];
513        for (idx, corr) in self.correlations.iter().enumerate() {
514            for rule_ref in &corr.rule_refs {
515                if let Some(&target_idx) = corr_identifiers.get(rule_ref.as_str()) {
516                    adj[idx].push(target_idx);
517                }
518            }
519        }
520
521        // DFS cycle detection with three states: white (unvisited), gray (in stack), black (done)
522        let mut state = vec![0u8; self.correlations.len()]; // 0=white, 1=gray, 2=black
523        let mut path: Vec<usize> = Vec::new();
524
525        for start in 0..self.correlations.len() {
526            if state[start] == 0
527                && let Some(cycle) = Self::dfs_find_cycle(start, &adj, &mut state, &mut path)
528            {
529                let names: Vec<String> = cycle
530                    .iter()
531                    .map(|&i| {
532                        self.correlations[i]
533                            .id
534                            .as_deref()
535                            .or(self.correlations[i].name.as_deref())
536                            .unwrap_or(&self.correlations[i].title)
537                            .to_string()
538                    })
539                    .collect();
540                return Err(crate::error::EvalError::CorrelationCycle(
541                    names.join(" -> "),
542                ));
543            }
544        }
545        Ok(())
546    }
547
548    /// DFS helper that returns the cycle path if a back-edge is found.
549    fn dfs_find_cycle(
550        node: usize,
551        adj: &[Vec<usize>],
552        state: &mut [u8],
553        path: &mut Vec<usize>,
554    ) -> Option<Vec<usize>> {
555        state[node] = 1; // gray
556        path.push(node);
557
558        for &next in &adj[node] {
559            if state[next] == 1 {
560                // Back-edge found — extract cycle from path
561                if let Some(pos) = path.iter().position(|&n| n == next) {
562                    let mut cycle = path[pos..].to_vec();
563                    cycle.push(next); // close the cycle
564                    return Some(cycle);
565                }
566            }
567            if state[next] == 0
568                && let Some(cycle) = Self::dfs_find_cycle(next, adj, state, path)
569            {
570                return Some(cycle);
571            }
572        }
573
574        path.pop();
575        state[node] = 2; // black
576        None
577    }
578
579    /// Process an event, extracting the timestamp from configured event fields.
580    ///
581    /// When no timestamp field is found, the `timestamp_fallback` policy applies:
582    /// - `WallClock`: use `Utc::now()` (good for real-time streaming)
583    /// - `Skip`: return detections only, skip correlation state updates
584    pub fn process_event(&mut self, event: &impl Event) -> ProcessResult {
585        let all_detections = self.engine.evaluate(event);
586
587        let ts = match self.extract_event_timestamp(event) {
588            Some(ts) => ts,
589            None => match self.config.timestamp_fallback {
590                TimestampFallback::WallClock => Utc::now().timestamp(),
591                TimestampFallback::Skip => {
592                    // Still run detection (stateless), but skip correlation
593                    let detections = self.filter_detections(all_detections);
594                    return ProcessResult {
595                        detections,
596                        correlations: Vec::new(),
597                    };
598                }
599            },
600        };
601        self.process_with_detections(event, all_detections, ts)
602    }
603
    /// Process an event with an explicit Unix epoch timestamp (seconds).
    ///
    /// The timestamp is clamped to `[0, i64::MAX / 2]` to prevent overflow
    /// when adding timespan durations internally. The clamping itself is
    /// performed by `process_with_detections`, to which this method delegates
    /// after running detection on the inner engine.
    pub fn process_event_at(&mut self, event: &impl Event, timestamp_secs: i64) -> ProcessResult {
        let all_detections = self.engine.evaluate(event);
        self.process_with_detections(event, all_detections, timestamp_secs)
    }
612
613    /// Process an event with pre-computed detection results.
614    ///
615    /// Enables external parallelism: callers can run detection (via
616    /// [`evaluate`](Self::evaluate)) in parallel, then feed results here
617    /// sequentially for stateful correlation.
618    pub fn process_with_detections(
619        &mut self,
620        event: &impl Event,
621        all_detections: Vec<MatchResult>,
622        timestamp_secs: i64,
623    ) -> ProcessResult {
624        let timestamp_secs = timestamp_secs.clamp(0, i64::MAX / 2);
625
626        // Memory management — evict before adding new state to enforce limit
627        if self.state.len() >= self.config.max_state_entries {
628            self.evict_all(timestamp_secs);
629        }
630
631        // Feed detection matches into correlations
632        let mut correlations = Vec::new();
633        self.feed_detections(event, &all_detections, timestamp_secs, &mut correlations);
634
635        // Chain — correlation results may trigger higher-level correlations
636        self.chain_correlations(&correlations, timestamp_secs);
637
638        // Filter detections by generate flag
639        let detections = self.filter_detections(all_detections);
640
641        ProcessResult {
642            detections,
643            correlations,
644        }
645    }
646
    /// Run stateless detection only (no correlation), delegating to the inner engine.
    ///
    /// Takes `&self` so it can be called concurrently from multiple threads
    /// (e.g. via `rayon::par_iter`) while the mutable correlation phase runs
    /// sequentially afterwards.
    ///
    /// The result is returned unfiltered: the `emit_detections` setting is
    /// not applied here.
    pub fn evaluate(&self, event: &impl Event) -> Vec<MatchResult> {
        self.engine.evaluate(event)
    }
655
656    /// Process a batch of events: parallel detection, then sequential correlation.
657    ///
658    /// When the `parallel` feature is enabled, the stateless detection phase runs
659    /// concurrently via rayon. Timestamp extraction also runs in the parallel
660    /// phase (it borrows `&self.config` immutably). After `collect()` releases the
661    /// immutable borrows, each event's pre-computed detections are fed into the
662    /// stateful correlation engine sequentially.
663    pub fn process_batch<E: Event + Sync>(&mut self, events: &[&E]) -> Vec<ProcessResult> {
664        // Borrow split: take immutable refs to fields needed for the parallel phase.
665        // These are released by collect() before the sequential &mut self phase.
666        let engine = &self.engine;
667        let ts_fields = &self.config.timestamp_fields;
668
669        let batch_results: Vec<(Vec<MatchResult>, Option<i64>)> = {
670            #[cfg(feature = "parallel")]
671            {
672                use rayon::prelude::*;
673                events
674                    .par_iter()
675                    .map(|e| {
676                        let detections = engine.evaluate(e);
677                        let ts = extract_event_ts(e, ts_fields);
678                        (detections, ts)
679                    })
680                    .collect()
681            }
682            #[cfg(not(feature = "parallel"))]
683            {
684                events
685                    .iter()
686                    .map(|e| {
687                        let detections = engine.evaluate(e);
688                        let ts = extract_event_ts(e, ts_fields);
689                        (detections, ts)
690                    })
691                    .collect()
692            }
693        };
694
695        // Sequential correlation phase
696        let mut results = Vec::with_capacity(events.len());
697        for ((detections, ts_opt), event) in batch_results.into_iter().zip(events) {
698            match ts_opt {
699                Some(ts) => {
700                    results.push(self.process_with_detections(event, detections, ts));
701                }
702                None => match self.config.timestamp_fallback {
703                    TimestampFallback::WallClock => {
704                        let ts = Utc::now().timestamp();
705                        results.push(self.process_with_detections(event, detections, ts));
706                    }
707                    TimestampFallback::Skip => {
708                        // Still return detection results, but skip correlation
709                        let detections = self.filter_detections(detections);
710                        results.push(ProcessResult {
711                            detections,
712                            correlations: Vec::new(),
713                        });
714                    }
715                },
716            }
717        }
718        results
719    }
720
721    /// Filter detections by the `generate` flag / `emit_detections` config.
722    ///
723    /// If `emit_detections` is false and some rules are correlation-only,
724    /// their detection output is suppressed.
725    fn filter_detections(&self, all_detections: Vec<MatchResult>) -> Vec<MatchResult> {
726        if !self.config.emit_detections && !self.correlation_only_rules.is_empty() {
727            all_detections
728                .into_iter()
729                .filter(|m| {
730                    let id_match = m
731                        .rule_id
732                        .as_ref()
733                        .is_some_and(|id| self.correlation_only_rules.contains(id));
734                    !id_match
735                })
736                .collect()
737        } else {
738            all_detections
739        }
740    }
741
742    /// Feed detection matches into correlation window states.
743    fn feed_detections(
744        &mut self,
745        event: &impl Event,
746        detections: &[MatchResult],
747        ts: i64,
748        out: &mut Vec<CorrelationResult>,
749    ) {
750        // Collect all (corr_idx, rule_id, rule_name) tuples upfront to avoid
751        // borrow conflicts between self.rule_ids and self.update_correlation.
752        let mut work: Vec<(usize, Option<String>, Option<String>)> = Vec::new();
753
754        for det in detections {
755            // Use the MatchResult's rule_id to find the original rule's ID/name.
756            // We also look up by rule_id in our rule_ids table for the name.
757            let (rule_id, rule_name) = self.find_rule_identity(det);
758
759            // Collect correlation indices that reference this rule
760            let mut corr_indices = Vec::new();
761            if let Some(ref id) = rule_id
762                && let Some(indices) = self.rule_index.get(id)
763            {
764                corr_indices.extend(indices);
765            }
766            if let Some(ref name) = rule_name
767                && let Some(indices) = self.rule_index.get(name)
768            {
769                corr_indices.extend(indices);
770            }
771
772            corr_indices.sort_unstable();
773            corr_indices.dedup();
774
775            for &corr_idx in &corr_indices {
776                work.push((corr_idx, rule_id.clone(), rule_name.clone()));
777            }
778        }
779
780        for (corr_idx, rule_id, rule_name) in work {
781            self.update_correlation(corr_idx, event, ts, &rule_id, &rule_name, out);
782        }
783    }
784
785    /// Find the (id, name) for a detection match by searching our rule_ids table.
786    fn find_rule_identity(&self, det: &MatchResult) -> (Option<String>, Option<String>) {
787        // First, try to find by matching rule_id in our table
788        if let Some(ref match_id) = det.rule_id {
789            for (id, name) in &self.rule_ids {
790                if id.as_deref() == Some(match_id.as_str()) {
791                    return (id.clone(), name.clone());
792                }
793            }
794        }
795        // Fall back to using just the MatchResult's rule_id
796        (det.rule_id.clone(), None)
797    }
798
799    /// Resolve the event mode for a given correlation.
800    fn resolve_event_mode(&self, corr_idx: usize) -> CorrelationEventMode {
801        let corr = &self.correlations[corr_idx];
802        corr.event_mode
803            .unwrap_or(self.config.correlation_event_mode)
804    }
805
806    /// Resolve the max events cap for a given correlation.
807    fn resolve_max_events(&self, corr_idx: usize) -> usize {
808        let corr = &self.correlations[corr_idx];
809        corr.max_events
810            .unwrap_or(self.config.max_correlation_events)
811    }
812
813    /// Update a single correlation's state and check its condition.
814    fn update_correlation(
815        &mut self,
816        corr_idx: usize,
817        event: &impl Event,
818        ts: i64,
819        rule_id: &Option<String>,
820        rule_name: &Option<String>,
821        out: &mut Vec<CorrelationResult>,
822    ) {
823        // Borrow the correlation by reference — no cloning needed.  Rust allows
824        // simultaneous &self.correlations and &mut self.state / &mut self.last_alert
825        // because they are disjoint struct fields.
826        let corr = &self.correlations[corr_idx];
827        let corr_type = corr.correlation_type;
828        let timespan = corr.timespan_secs;
829        let level = corr.level;
830        let suppress_secs = corr.suppress_secs.or(self.config.suppress);
831        let action = corr.action.unwrap_or(self.config.action_on_match);
832        let event_mode = self.resolve_event_mode(corr_idx);
833        let max_events = self.resolve_max_events(corr_idx);
834
835        // Determine the rule_ref strings for alias resolution and temporal tracking.
836        let mut ref_strs: Vec<&str> = Vec::new();
837        if let Some(id) = rule_id.as_deref() {
838            ref_strs.push(id);
839        }
840        if let Some(name) = rule_name.as_deref() {
841            ref_strs.push(name);
842        }
843        let rule_ref = rule_id.as_deref().or(rule_name.as_deref()).unwrap_or("");
844
845        // Extract group key
846        let group_key = GroupKey::extract(event, &corr.group_by, &ref_strs);
847
848        // Get or create window state
849        let state_key = (corr_idx, group_key.clone());
850        let state = self
851            .state
852            .entry(state_key.clone())
853            .or_insert_with(|| WindowState::new_for(corr_type));
854
855        // Evict expired entries
856        let cutoff = ts - timespan as i64;
857        state.evict(cutoff);
858
859        // Push the new event into the state
860        match corr_type {
861            CorrelationType::EventCount => {
862                state.push_event_count(ts);
863            }
864            CorrelationType::ValueCount => {
865                if let Some(ref field_name) = corr.condition.field
866                    && let Some(val) = event.get_field(field_name)
867                    && let Some(s) = value_to_string_for_count(&val)
868                {
869                    state.push_value_count(ts, s);
870                }
871            }
872            CorrelationType::Temporal | CorrelationType::TemporalOrdered => {
873                state.push_temporal(ts, rule_ref);
874            }
875            CorrelationType::ValueSum
876            | CorrelationType::ValueAvg
877            | CorrelationType::ValuePercentile
878            | CorrelationType::ValueMedian => {
879                if let Some(ref field_name) = corr.condition.field
880                    && let Some(val) = event.get_field(field_name)
881                    && let Some(n) = value_to_f64_ev(&val)
882                {
883                    state.push_numeric(ts, n);
884                }
885            }
886        }
887
888        // Push event into buffer based on event mode
889        match event_mode {
890            CorrelationEventMode::Full => {
891                let buf = self
892                    .event_buffers
893                    .entry(state_key.clone())
894                    .or_insert_with(|| EventBuffer::new(max_events));
895                buf.evict(cutoff);
896                let json = event.to_json();
897                buf.push(ts, &json);
898            }
899            CorrelationEventMode::Refs => {
900                let buf = self
901                    .event_ref_buffers
902                    .entry(state_key.clone())
903                    .or_insert_with(|| EventRefBuffer::new(max_events));
904                buf.evict(cutoff);
905                let json = event.to_json();
906                buf.push(ts, &json);
907            }
908            CorrelationEventMode::None => {}
909        }
910
911        // Check condition — after this, `state` is no longer used (NLL drops the borrow).
912        let fired = state.check_condition(
913            &corr.condition,
914            corr_type,
915            &corr.rule_refs,
916            corr.extended_expr.as_ref(),
917        );
918
919        if let Some(agg_value) = fired {
920            let alert_key = (corr_idx, group_key.clone());
921
922            // Suppression check: skip if we've already alerted within the suppress window
923            let suppressed = if let Some(suppress) = suppress_secs {
924                if let Some(&last_ts) = self.last_alert.get(&alert_key) {
925                    (ts - last_ts) < suppress as i64
926                } else {
927                    false
928                }
929            } else {
930                false
931            };
932
933            if !suppressed {
934                // Retrieve stored events / refs based on mode
935                let (events, event_refs) = match event_mode {
936                    CorrelationEventMode::Full => {
937                        let stored = self
938                            .event_buffers
939                            .get(&alert_key)
940                            .map(|buf| buf.decompress_all())
941                            .unwrap_or_default();
942                        (Some(stored), None)
943                    }
944                    CorrelationEventMode::Refs => {
945                        let stored = self
946                            .event_ref_buffers
947                            .get(&alert_key)
948                            .map(|buf| buf.refs())
949                            .unwrap_or_default();
950                        (None, Some(stored))
951                    }
952                    CorrelationEventMode::None => (None, None),
953                };
954
955                // Only clone title/id/tags when we actually produce output
956                let corr = &self.correlations[corr_idx];
957                let result = CorrelationResult {
958                    rule_title: corr.title.clone(),
959                    rule_id: corr.id.clone(),
960                    level,
961                    tags: corr.tags.clone(),
962                    correlation_type: corr_type,
963                    group_key: group_key.to_pairs(&corr.group_by),
964                    aggregated_value: agg_value,
965                    timespan_secs: timespan,
966                    events,
967                    event_refs,
968                    custom_attributes: corr.custom_attributes.clone(),
969                };
970                out.push(result);
971
972                // Record alert time for suppression
973                self.last_alert.insert(alert_key.clone(), ts);
974
975                // Action on match
976                if action == CorrelationAction::Reset {
977                    if let Some(state) = self.state.get_mut(&alert_key) {
978                        state.clear();
979                    }
980                    if let Some(buf) = self.event_buffers.get_mut(&alert_key) {
981                        buf.clear();
982                    }
983                    if let Some(buf) = self.event_ref_buffers.get_mut(&alert_key) {
984                        buf.clear();
985                    }
986                }
987            }
988        }
989    }
990
991    /// Propagate correlation results to higher-level correlations (chaining).
992    ///
993    /// When a correlation fires, any correlation that references it (by ID or name)
994    /// is updated. Limits chain depth to 10 to prevent infinite loops.
995    fn chain_correlations(&mut self, fired: &[CorrelationResult], ts: i64) {
996        const MAX_CHAIN_DEPTH: usize = 10;
997        let mut pending: Vec<CorrelationResult> = fired.to_vec();
998        let mut depth = 0;
999
1000        while !pending.is_empty() && depth < MAX_CHAIN_DEPTH {
1001            depth += 1;
1002
1003            // Collect work items: (corr_idx, group_key_pairs, fired_ref)
1004            #[allow(clippy::type_complexity)]
1005            let mut work: Vec<(usize, Vec<(String, String)>, String)> = Vec::new();
1006            for result in &pending {
1007                if let Some(ref id) = result.rule_id
1008                    && let Some(indices) = self.rule_index.get(id)
1009                {
1010                    let fired_ref = result
1011                        .rule_id
1012                        .as_deref()
1013                        .unwrap_or(&result.rule_title)
1014                        .to_string();
1015                    for &corr_idx in indices {
1016                        work.push((corr_idx, result.group_key.clone(), fired_ref.clone()));
1017                    }
1018                }
1019            }
1020
1021            let mut next_pending = Vec::new();
1022            for (corr_idx, group_key_pairs, fired_ref) in work {
1023                let corr = &self.correlations[corr_idx];
1024                let corr_type = corr.correlation_type;
1025                let timespan = corr.timespan_secs;
1026                let level = corr.level;
1027
1028                let group_key = GroupKey::from_pairs(&group_key_pairs, &corr.group_by);
1029                let state_key = (corr_idx, group_key.clone());
1030                let state = self
1031                    .state
1032                    .entry(state_key)
1033                    .or_insert_with(|| WindowState::new_for(corr_type));
1034
1035                let cutoff = ts - timespan as i64;
1036                state.evict(cutoff);
1037
1038                match corr_type {
1039                    CorrelationType::EventCount => {
1040                        state.push_event_count(ts);
1041                    }
1042                    CorrelationType::Temporal | CorrelationType::TemporalOrdered => {
1043                        state.push_temporal(ts, &fired_ref);
1044                    }
1045                    _ => {
1046                        state.push_event_count(ts);
1047                    }
1048                }
1049
1050                let fired = state.check_condition(
1051                    &corr.condition,
1052                    corr_type,
1053                    &corr.rule_refs,
1054                    corr.extended_expr.as_ref(),
1055                );
1056
1057                if let Some(agg_value) = fired {
1058                    let corr = &self.correlations[corr_idx];
1059                    next_pending.push(CorrelationResult {
1060                        rule_title: corr.title.clone(),
1061                        rule_id: corr.id.clone(),
1062                        level,
1063                        tags: corr.tags.clone(),
1064                        correlation_type: corr_type,
1065                        group_key: group_key.to_pairs(&corr.group_by),
1066                        aggregated_value: agg_value,
1067                        timespan_secs: timespan,
1068                        // Chained correlations don't include events (they aggregate
1069                        // over correlation results, not raw events)
1070                        events: None,
1071                        event_refs: None,
1072                        custom_attributes: corr.custom_attributes.clone(),
1073                    });
1074                }
1075            }
1076
1077            pending = next_pending;
1078        }
1079
1080        if !pending.is_empty() {
1081            log::warn!(
1082                "Correlation chain depth limit reached ({MAX_CHAIN_DEPTH}); \
1083                 {} pending result(s) were not propagated further. \
1084                 This may indicate a cycle in correlation references.",
1085                pending.len()
1086            );
1087        }
1088    }
1089
1090    // =========================================================================
1091    // Timestamp extraction
1092    // =========================================================================
1093
1094    /// Extract a Unix epoch timestamp (seconds) from an event.
1095    ///
1096    /// Tries each configured timestamp field in order. Supports:
1097    /// - Numeric values (epoch seconds, or epoch millis if > 1e12)
1098    /// - ISO 8601 strings (e.g., "2024-07-10T12:30:00Z")
1099    ///
1100    /// Returns `None` if no field yields a valid timestamp.
1101    fn extract_event_timestamp(&self, event: &impl Event) -> Option<i64> {
1102        for field_name in &self.config.timestamp_fields {
1103            if let Some(val) = event.get_field(field_name)
1104                && let Some(ts) = parse_timestamp_value(&val)
1105            {
1106                return Some(ts);
1107            }
1108        }
1109        None
1110    }
1111
1112    // =========================================================================
1113    // State management
1114    // =========================================================================
1115
1116    /// Manually evict all expired state entries.
1117    pub fn evict_expired(&mut self, now_secs: i64) {
1118        self.evict_all(now_secs);
1119    }
1120
1121    /// Evict expired entries and remove empty states.
1122    fn evict_all(&mut self, now_secs: i64) {
1123        // Phase 1: Time-based eviction — remove entries outside their correlation window
1124        let timespans: Vec<u64> = self.correlations.iter().map(|c| c.timespan_secs).collect();
1125
1126        self.state.retain(|&(corr_idx, _), state| {
1127            if corr_idx < timespans.len() {
1128                let cutoff = now_secs - timespans[corr_idx] as i64;
1129                state.evict(cutoff);
1130            }
1131            !state.is_empty()
1132        });
1133
1134        // Evict event buffers in sync with window state
1135        self.event_buffers.retain(|&(corr_idx, _), buf| {
1136            if corr_idx < timespans.len() {
1137                let cutoff = now_secs - timespans[corr_idx] as i64;
1138                buf.evict(cutoff);
1139            }
1140            !buf.is_empty()
1141        });
1142        self.event_ref_buffers.retain(|&(corr_idx, _), buf| {
1143            if corr_idx < timespans.len() {
1144                let cutoff = now_secs - timespans[corr_idx] as i64;
1145                buf.evict(cutoff);
1146            }
1147            !buf.is_empty()
1148        });
1149
1150        // Phase 2: Hard cap — if still over limit after time-based eviction (e.g.
1151        // high-cardinality traffic with long windows), drop the stalest entries
1152        // until we're at 90% capacity to avoid evicting on every single event.
1153        if self.state.len() >= self.config.max_state_entries {
1154            let target = self.config.max_state_entries * 9 / 10;
1155            let excess = self.state.len() - target;
1156
1157            // Collect keys with their latest timestamp, sort by oldest first
1158            let mut by_staleness: Vec<_> = self
1159                .state
1160                .iter()
1161                .map(|(k, v)| (k.clone(), v.latest_timestamp().unwrap_or(i64::MIN)))
1162                .collect();
1163            by_staleness.sort_unstable_by_key(|&(_, ts)| ts);
1164
1165            // Drop the oldest entries (and their associated event buffers)
1166            for (key, _) in by_staleness.into_iter().take(excess) {
1167                self.state.remove(&key);
1168                self.last_alert.remove(&key);
1169                self.event_buffers.remove(&key);
1170                self.event_ref_buffers.remove(&key);
1171            }
1172        }
1173
1174        // Phase 3: Evict stale last_alert entries — remove if the suppress window
1175        // has passed or if the corresponding window state no longer exists.
1176        self.last_alert.retain(|key, &mut alert_ts| {
1177            let suppress = if key.0 < self.correlations.len() {
1178                self.correlations[key.0]
1179                    .suppress_secs
1180                    .or(self.config.suppress)
1181                    .unwrap_or(0)
1182            } else {
1183                0
1184            };
1185            (now_secs - alert_ts) < suppress as i64
1186        });
1187    }
1188
1189    /// Number of active state entries (for monitoring).
1190    pub fn state_count(&self) -> usize {
1191        self.state.len()
1192    }
1193
1194    /// Number of detection rules loaded.
1195    pub fn detection_rule_count(&self) -> usize {
1196        self.engine.rule_count()
1197    }
1198
1199    /// Number of correlation rules loaded.
1200    pub fn correlation_rule_count(&self) -> usize {
1201        self.correlations.len()
1202    }
1203
1204    /// Number of active event buffers (for monitoring).
1205    pub fn event_buffer_count(&self) -> usize {
1206        self.event_buffers.len()
1207    }
1208
1209    /// Total compressed bytes across all event buffers (for monitoring).
1210    pub fn event_buffer_bytes(&self) -> usize {
1211        self.event_buffers
1212            .values()
1213            .map(|b| b.compressed_bytes())
1214            .sum()
1215    }
1216
1217    /// Number of active event ref buffers — `Refs` mode (for monitoring).
1218    pub fn event_ref_buffer_count(&self) -> usize {
1219        self.event_ref_buffers.len()
1220    }
1221
1222    /// Access the inner stateless engine.
1223    pub fn engine(&self) -> &Engine {
1224        &self.engine
1225    }
1226
1227    /// Export all mutable correlation state as a serializable snapshot.
1228    ///
1229    /// The snapshot uses stable correlation identifiers (id > name > title)
1230    /// instead of internal indices, so it survives rule reloads as long as
1231    /// the correlation rules keep the same identifiers.
1232    pub fn export_state(&self) -> CorrelationSnapshot {
1233        let mut windows: HashMap<String, Vec<(GroupKey, WindowState)>> = HashMap::new();
1234        for ((idx, gk), ws) in &self.state {
1235            let corr_id = self.correlation_stable_id(*idx);
1236            windows
1237                .entry(corr_id)
1238                .or_default()
1239                .push((gk.clone(), ws.clone()));
1240        }
1241
1242        let mut last_alert: HashMap<String, Vec<(GroupKey, i64)>> = HashMap::new();
1243        for ((idx, gk), ts) in &self.last_alert {
1244            let corr_id = self.correlation_stable_id(*idx);
1245            last_alert
1246                .entry(corr_id)
1247                .or_default()
1248                .push((gk.clone(), *ts));
1249        }
1250
1251        let mut event_buffers: HashMap<String, Vec<(GroupKey, EventBuffer)>> = HashMap::new();
1252        for ((idx, gk), buf) in &self.event_buffers {
1253            let corr_id = self.correlation_stable_id(*idx);
1254            event_buffers
1255                .entry(corr_id)
1256                .or_default()
1257                .push((gk.clone(), buf.clone()));
1258        }
1259
1260        let mut event_ref_buffers: HashMap<String, Vec<(GroupKey, EventRefBuffer)>> =
1261            HashMap::new();
1262        for ((idx, gk), buf) in &self.event_ref_buffers {
1263            let corr_id = self.correlation_stable_id(*idx);
1264            event_ref_buffers
1265                .entry(corr_id)
1266                .or_default()
1267                .push((gk.clone(), buf.clone()));
1268        }
1269
1270        CorrelationSnapshot {
1271            version: SNAPSHOT_VERSION,
1272            windows,
1273            last_alert,
1274            event_buffers,
1275            event_ref_buffers,
1276        }
1277    }
1278
1279    /// Import previously exported state, mapping stable identifiers back to
1280    /// current correlation indices. Entries whose identifiers no longer match
1281    /// any loaded correlation are silently dropped.
1282    ///
1283    /// Returns `false` (and imports nothing) if the snapshot version is
1284    /// incompatible with the current schema.
1285    pub fn import_state(&mut self, snapshot: CorrelationSnapshot) -> bool {
1286        if snapshot.version != SNAPSHOT_VERSION {
1287            return false;
1288        }
1289        let id_to_idx = self.build_id_to_index_map();
1290
1291        for (corr_id, groups) in snapshot.windows {
1292            if let Some(&idx) = id_to_idx.get(&corr_id) {
1293                for (gk, ws) in groups {
1294                    self.state.insert((idx, gk), ws);
1295                }
1296            }
1297        }
1298
1299        for (corr_id, groups) in snapshot.last_alert {
1300            if let Some(&idx) = id_to_idx.get(&corr_id) {
1301                for (gk, ts) in groups {
1302                    self.last_alert.insert((idx, gk), ts);
1303                }
1304            }
1305        }
1306
1307        for (corr_id, groups) in snapshot.event_buffers {
1308            if let Some(&idx) = id_to_idx.get(&corr_id) {
1309                for (gk, buf) in groups {
1310                    self.event_buffers.insert((idx, gk), buf);
1311                }
1312            }
1313        }
1314
1315        for (corr_id, groups) in snapshot.event_ref_buffers {
1316            if let Some(&idx) = id_to_idx.get(&corr_id) {
1317                for (gk, buf) in groups {
1318                    self.event_ref_buffers.insert((idx, gk), buf);
1319                }
1320            }
1321        }
1322
1323        true
1324    }
1325
1326    /// Stable identifier for a correlation rule: prefers id, then name, then title.
1327    fn correlation_stable_id(&self, idx: usize) -> String {
1328        let corr = &self.correlations[idx];
1329        corr.id
1330            .clone()
1331            .or_else(|| corr.name.clone())
1332            .unwrap_or_else(|| corr.title.clone())
1333    }
1334
1335    /// Build a reverse map from stable id → current correlation index.
1336    fn build_id_to_index_map(&self) -> HashMap<String, usize> {
1337        self.correlations
1338            .iter()
1339            .enumerate()
1340            .map(|(idx, _)| (self.correlation_stable_id(idx), idx))
1341            .collect()
1342    }
1343}
1344
/// Schema version written into every exported snapshot and required on
/// import. Bump it whenever the serialized format changes.
const SNAPSHOT_VERSION: u32 = 1;
1347
1348/// Serializable snapshot of all mutable correlation state.
1349///
1350/// Uses stable string identifiers (correlation id/name/title) as keys so the
1351/// snapshot can be restored after a rule reload, even if internal indices change.
1352/// Inner maps use `Vec<(GroupKey, T)>` instead of `HashMap<GroupKey, T>` because
1353/// `GroupKey` cannot be used as a JSON object key.
1354#[derive(Debug, Clone, Serialize, serde::Deserialize)]
1355pub struct CorrelationSnapshot {
1356    /// Schema version — used to detect incompatible snapshots on load.
1357    #[serde(default = "default_snapshot_version")]
1358    pub version: u32,
1359    /// Per-correlation, per-group window state.
1360    pub windows: HashMap<String, Vec<(GroupKey, WindowState)>>,
1361    /// Per-correlation, per-group last alert timestamp (for suppression).
1362    pub last_alert: HashMap<String, Vec<(GroupKey, i64)>>,
1363    /// Per-correlation, per-group compressed event buffers.
1364    pub event_buffers: HashMap<String, Vec<(GroupKey, EventBuffer)>>,
1365    /// Per-correlation, per-group event reference buffers.
1366    pub event_ref_buffers: HashMap<String, Vec<(GroupKey, EventRefBuffer)>>,
1367}
1368
/// Version assumed for a serialized snapshot that has no `version` field.
fn default_snapshot_version() -> u32 {
    1
}
1372
1373impl Default for CorrelationEngine {
1374    fn default() -> Self {
1375        Self::new(CorrelationConfig::default())
1376    }
1377}
1378
1379// =============================================================================
1380// Timestamp parsing helpers
1381// =============================================================================
1382
1383/// Extract a timestamp from an event using the given field names.
1384///
1385/// Standalone version of `CorrelationEngine::extract_event_timestamp` for use
1386/// in contexts where borrowing `&self` is not possible (e.g. rayon closures).
1387fn extract_event_ts(event: &impl Event, timestamp_fields: &[String]) -> Option<i64> {
1388    for field_name in timestamp_fields {
1389        if let Some(val) = event.get_field(field_name)
1390            && let Some(ts) = parse_timestamp_value(&val)
1391        {
1392            return Some(ts);
1393        }
1394    }
1395    None
1396}
1397
1398/// Parse an [`EventValue`] as a Unix epoch timestamp in seconds.
1399fn parse_timestamp_value(val: &EventValue) -> Option<i64> {
1400    match val {
1401        EventValue::Int(i) => Some(normalize_epoch(*i)),
1402        EventValue::Float(f) => Some(normalize_epoch(*f as i64)),
1403        EventValue::Str(s) => parse_timestamp_string(s),
1404        _ => None,
1405    }
1406}
1407
/// Normalize an epoch value to seconds.
///
/// Values above 1e12 are taken to be milliseconds (read as seconds they would
/// lie beyond year 33658) and are divided by 1000; everything else is
/// returned unchanged.
fn normalize_epoch(v: i64) -> i64 {
    const MILLIS_THRESHOLD: i64 = 1_000_000_000_000;
    match v {
        millis if millis > MILLIS_THRESHOLD => millis / 1000,
        secs => secs,
    }
}
1413
1414/// Parse a timestamp string. Tries ISO 8601 with timezone, then without.
1415fn parse_timestamp_string(s: &str) -> Option<i64> {
1416    // Try RFC 3339 / ISO 8601 with timezone
1417    if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
1418        return Some(dt.timestamp());
1419    }
1420
1421    // Try ISO 8601 without timezone (assume UTC)
1422    // Common formats: "2024-07-10T12:30:00", "2024-07-10 12:30:00"
1423    if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S") {
1424        return Some(Utc.from_utc_datetime(&naive).timestamp());
1425    }
1426    if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") {
1427        return Some(Utc.from_utc_datetime(&naive).timestamp());
1428    }
1429
1430    // Try with fractional seconds
1431    if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S%.f") {
1432        return Some(Utc.from_utc_datetime(&naive).timestamp());
1433    }
1434    if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f") {
1435        return Some(Utc.from_utc_datetime(&naive).timestamp());
1436    }
1437
1438    None
1439}
1440
1441/// Convert an [`EventValue`] to a string for value_count purposes.
1442fn value_to_string_for_count(v: &EventValue) -> Option<String> {
1443    match v {
1444        EventValue::Str(s) => Some(s.to_string()),
1445        EventValue::Int(n) => Some(n.to_string()),
1446        EventValue::Float(f) => Some(f.to_string()),
1447        EventValue::Bool(b) => Some(b.to_string()),
1448        EventValue::Null => Some("null".to_string()),
1449        _ => None,
1450    }
1451}
1452
1453/// Convert an [`EventValue`] to f64 for numeric aggregation.
1454fn value_to_f64_ev(v: &EventValue) -> Option<f64> {
1455    v.as_f64()
1456}
1457
1458// =============================================================================
1459// Tests
1460// =============================================================================
1461
1462#[cfg(test)]
1463mod tests {
1464    use super::*;
1465    use crate::event::JsonEvent;
1466    use rsigma_parser::parse_sigma_yaml;
1467    use serde_json::json;
1468
1469    // =========================================================================
1470    // Timestamp parsing
1471    // =========================================================================
1472
1473    #[test]
1474    fn test_parse_timestamp_epoch_secs() {
1475        let val = EventValue::Int(1720612200);
1476        assert_eq!(parse_timestamp_value(&val), Some(1720612200));
1477    }
1478
1479    #[test]
1480    fn test_parse_timestamp_epoch_millis() {
1481        let val = EventValue::Int(1720612200000);
1482        assert_eq!(parse_timestamp_value(&val), Some(1720612200));
1483    }
1484
1485    #[test]
1486    fn test_parse_timestamp_rfc3339() {
1487        let val = EventValue::Str(std::borrow::Cow::Borrowed("2024-07-10T12:30:00Z"));
1488        let ts = parse_timestamp_value(&val).unwrap();
1489        assert_eq!(ts, 1720614600);
1490    }
1491
1492    #[test]
1493    fn test_parse_timestamp_naive() {
1494        let val = EventValue::Str(std::borrow::Cow::Borrowed("2024-07-10T12:30:00"));
1495        let ts = parse_timestamp_value(&val).unwrap();
1496        assert_eq!(ts, 1720614600);
1497    }
1498
1499    #[test]
1500    fn test_parse_timestamp_with_space() {
1501        let val = EventValue::Str(std::borrow::Cow::Borrowed("2024-07-10 12:30:00"));
1502        let ts = parse_timestamp_value(&val).unwrap();
1503        assert_eq!(ts, 1720614600);
1504    }
1505
1506    #[test]
1507    fn test_parse_timestamp_fractional() {
1508        let val = EventValue::Str(std::borrow::Cow::Borrowed("2024-07-10T12:30:00.123Z"));
1509        let ts = parse_timestamp_value(&val).unwrap();
1510        assert_eq!(ts, 1720614600);
1511    }
1512
1513    #[test]
1514    fn test_extract_timestamp_from_event() {
1515        let config = CorrelationConfig {
1516            timestamp_fields: vec!["@timestamp".to_string()],
1517            max_state_entries: 100_000,
1518            ..Default::default()
1519        };
1520        let engine = CorrelationEngine::new(config);
1521
1522        let v = json!({"@timestamp": "2024-07-10T12:30:00Z", "data": "test"});
1523        let event = JsonEvent::borrow(&v);
1524        let ts = engine.extract_event_timestamp(&event);
1525        assert_eq!(ts, Some(1720614600));
1526    }
1527
1528    #[test]
1529    fn test_extract_timestamp_fallback_fields() {
1530        let config = CorrelationConfig {
1531            timestamp_fields: vec![
1532                "@timestamp".to_string(),
1533                "timestamp".to_string(),
1534                "EventTime".to_string(),
1535            ],
1536            max_state_entries: 100_000,
1537            ..Default::default()
1538        };
1539        let engine = CorrelationEngine::new(config);
1540
1541        // First field missing, second field present
1542        let v = json!({"timestamp": 1720613400, "data": "test"});
1543        let event = JsonEvent::borrow(&v);
1544        let ts = engine.extract_event_timestamp(&event);
1545        assert_eq!(ts, Some(1720613400));
1546    }
1547
1548    #[test]
1549    fn test_extract_timestamp_returns_none_when_missing() {
1550        let config = CorrelationConfig {
1551            timestamp_fields: vec!["@timestamp".to_string()],
1552            ..Default::default()
1553        };
1554        let engine = CorrelationEngine::new(config);
1555
1556        let v = json!({"data": "no timestamp here"});
1557        let event = JsonEvent::borrow(&v);
1558        assert_eq!(engine.extract_event_timestamp(&event), None);
1559    }
1560
1561    #[test]
1562    fn test_timestamp_fallback_skip() {
1563        let yaml = r#"
1564title: test rule
1565id: ts-skip-rule
1566logsource:
1567    product: test
1568detection:
1569    selection:
1570        action: click
1571    condition: selection
1572level: low
1573---
1574title: test correlation
1575correlation:
1576    type: event_count
1577    rules:
1578        - ts-skip-rule
1579    group-by:
1580        - User
1581    timespan: 10s
1582    condition:
1583        gte: 2
1584level: high
1585"#;
1586        let collection = parse_sigma_yaml(yaml).unwrap();
1587        let mut engine = CorrelationEngine::new(CorrelationConfig {
1588            timestamp_fallback: TimestampFallback::Skip,
1589            ..Default::default()
1590        });
1591        engine.add_collection(&collection).unwrap();
1592        assert_eq!(engine.correlation_rule_count(), 1);
1593
1594        // Events with no timestamp field — should NOT update correlation state
1595        let v = json!({"action": "click", "User": "alice"});
1596        let event = JsonEvent::borrow(&v);
1597
1598        let r1 = engine.process_event(&event);
1599        assert!(!r1.detections.is_empty(), "detection should still fire");
1600
1601        let r2 = engine.process_event(&event);
1602        assert!(!r2.detections.is_empty(), "detection should still fire");
1603
1604        let r3 = engine.process_event(&event);
1605        assert!(!r3.detections.is_empty(), "detection should still fire");
1606
1607        // No correlations should fire because events were skipped
1608        assert!(r1.correlations.is_empty());
1609        assert!(r2.correlations.is_empty());
1610        assert!(r3.correlations.is_empty());
1611    }
1612
1613    #[test]
1614    fn test_timestamp_fallback_wallclock_default() {
1615        let yaml = r#"
1616title: test rule
1617id: ts-wc-rule
1618logsource:
1619    product: test
1620detection:
1621    selection:
1622        action: click
1623    condition: selection
1624level: low
1625---
1626title: test correlation
1627correlation:
1628    type: event_count
1629    rules:
1630        - ts-wc-rule
1631    group-by:
1632        - User
1633    timespan: 60s
1634    condition:
1635        gte: 2
1636level: high
1637"#;
1638        let collection = parse_sigma_yaml(yaml).unwrap();
1639        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1640        engine.add_collection(&collection).unwrap();
1641        assert_eq!(engine.correlation_rule_count(), 1);
1642
1643        // Events with no timestamp — WallClock fallback means they get Utc::now()
1644        // and should be close enough to correlate (generous 60s window)
1645        let v = json!({"action": "click", "User": "alice"});
1646        let event = JsonEvent::borrow(&v);
1647
1648        let _r1 = engine.process_event(&event);
1649        let _r2 = engine.process_event(&event);
1650        let r3 = engine.process_event(&event);
1651
1652        // With WallClock, all events get near-identical timestamps and should correlate
1653        assert!(
1654            !r3.correlations.is_empty(),
1655            "WallClock fallback should allow correlation"
1656        );
1657    }
1658
1659    // =========================================================================
1660    // Event count correlation
1661    // =========================================================================
1662
1663    #[test]
1664    fn test_event_count_basic() {
1665        let yaml = r#"
1666title: Base Rule
1667id: base-rule-001
1668name: base_rule
1669logsource:
1670    product: windows
1671    category: process_creation
1672detection:
1673    selection:
1674        CommandLine|contains: 'whoami'
1675    condition: selection
1676level: low
1677---
1678title: Multiple Whoami
1679id: corr-001
1680correlation:
1681    type: event_count
1682    rules:
1683        - base-rule-001
1684    group-by:
1685        - User
1686    timespan: 60s
1687    condition:
1688        gte: 3
1689level: high
1690"#;
1691        let collection = parse_sigma_yaml(yaml).unwrap();
1692        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1693        engine.add_collection(&collection).unwrap();
1694
1695        assert_eq!(engine.detection_rule_count(), 1);
1696        assert_eq!(engine.correlation_rule_count(), 1);
1697
1698        // Send 3 events from same user within the window
1699        let base_ts = 1000i64;
1700        for i in 0..3 {
1701            let v = json!({"CommandLine": "whoami", "User": "admin"});
1702            let event = JsonEvent::borrow(&v);
1703            let result = engine.process_event_at(&event, base_ts + i * 10);
1704
1705            // Each event should match the detection rule
1706            assert_eq!(result.detections.len(), 1);
1707
1708            if i < 2 {
1709                // Not enough events yet
1710                assert!(result.correlations.is_empty());
1711            } else {
1712                // 3rd event triggers the correlation
1713                assert_eq!(result.correlations.len(), 1);
1714                assert_eq!(result.correlations[0].rule_title, "Multiple Whoami");
1715                assert_eq!(result.correlations[0].aggregated_value, 3.0);
1716            }
1717        }
1718    }
1719
1720    #[test]
1721    fn test_event_count_different_groups() {
1722        let yaml = r#"
1723title: Login
1724id: login-001
1725logsource:
1726    category: auth
1727detection:
1728    selection:
1729        EventType: login
1730    condition: selection
1731level: low
1732---
1733title: Many Logins
1734id: corr-login
1735correlation:
1736    type: event_count
1737    rules:
1738        - login-001
1739    group-by:
1740        - User
1741    timespan: 60s
1742    condition:
1743        gte: 3
1744level: high
1745"#;
1746        let collection = parse_sigma_yaml(yaml).unwrap();
1747        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1748        engine.add_collection(&collection).unwrap();
1749
1750        // User "alice" sends 2 events, "bob" sends 3
1751        let ts = 1000i64;
1752        for i in 0..2 {
1753            let v = json!({"EventType": "login", "User": "alice"});
1754            let event = JsonEvent::borrow(&v);
1755            let r = engine.process_event_at(&event, ts + i);
1756            assert!(r.correlations.is_empty());
1757        }
1758        for i in 0..3 {
1759            let v = json!({"EventType": "login", "User": "bob"});
1760            let event = JsonEvent::borrow(&v);
1761            let r = engine.process_event_at(&event, ts + i);
1762            if i == 2 {
1763                assert_eq!(r.correlations.len(), 1);
1764                assert_eq!(
1765                    r.correlations[0].group_key,
1766                    vec![("User".to_string(), "bob".to_string())]
1767                );
1768            }
1769        }
1770    }
1771
1772    #[test]
1773    fn test_event_count_window_expiry() {
1774        let yaml = r#"
1775title: Base
1776id: base-002
1777logsource:
1778    category: test
1779detection:
1780    selection:
1781        action: click
1782    condition: selection
1783---
1784title: Rapid Clicks
1785id: corr-002
1786correlation:
1787    type: event_count
1788    rules:
1789        - base-002
1790    group-by:
1791        - User
1792    timespan: 10s
1793    condition:
1794        gte: 3
1795level: medium
1796"#;
1797        let collection = parse_sigma_yaml(yaml).unwrap();
1798        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1799        engine.add_collection(&collection).unwrap();
1800
1801        // Send 2 events at t=0,1 then 1 event at t=15 (outside window)
1802        let v = json!({"action": "click", "User": "admin"});
1803        let event = JsonEvent::borrow(&v);
1804        engine.process_event_at(&event, 0);
1805        engine.process_event_at(&event, 1);
1806        let r = engine.process_event_at(&event, 15);
1807        // Only 1 event in window [5, 15], not enough
1808        assert!(r.correlations.is_empty());
1809    }
1810
1811    // =========================================================================
1812    // Value count correlation
1813    // =========================================================================
1814
1815    #[test]
1816    fn test_value_count() {
1817        let yaml = r#"
1818title: Failed Login
1819id: failed-login-001
1820logsource:
1821    category: auth
1822detection:
1823    selection:
1824        EventType: failed_login
1825    condition: selection
1826level: low
1827---
1828title: Failed Logins From Many Users
1829id: corr-vc-001
1830correlation:
1831    type: value_count
1832    rules:
1833        - failed-login-001
1834    group-by:
1835        - Host
1836    timespan: 60s
1837    condition:
1838        field: User
1839        gte: 3
1840level: high
1841"#;
1842        let collection = parse_sigma_yaml(yaml).unwrap();
1843        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1844        engine.add_collection(&collection).unwrap();
1845
1846        let ts = 1000i64;
1847        // 3 different users failing login on same host
1848        for (i, user) in ["alice", "bob", "charlie"].iter().enumerate() {
1849            let v = json!({"EventType": "failed_login", "Host": "srv01", "User": user});
1850            let event = JsonEvent::borrow(&v);
1851            let r = engine.process_event_at(&event, ts + i as i64);
1852            if i == 2 {
1853                assert_eq!(r.correlations.len(), 1);
1854                assert_eq!(r.correlations[0].aggregated_value, 3.0);
1855            }
1856        }
1857    }
1858
1859    // =========================================================================
1860    // Temporal correlation
1861    // =========================================================================
1862
1863    #[test]
1864    fn test_temporal() {
1865        let yaml = r#"
1866title: Recon A
1867id: recon-a
1868name: recon_a
1869logsource:
1870    category: process
1871detection:
1872    selection:
1873        CommandLine|contains: 'whoami'
1874    condition: selection
1875---
1876title: Recon B
1877id: recon-b
1878name: recon_b
1879logsource:
1880    category: process
1881detection:
1882    selection:
1883        CommandLine|contains: 'ipconfig'
1884    condition: selection
1885---
1886title: Recon Combo
1887id: corr-temporal
1888correlation:
1889    type: temporal
1890    rules:
1891        - recon-a
1892        - recon-b
1893    group-by:
1894        - User
1895    timespan: 60s
1896    condition:
1897        gte: 2
1898level: high
1899"#;
1900        let collection = parse_sigma_yaml(yaml).unwrap();
1901        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1902        engine.add_collection(&collection).unwrap();
1903
1904        let ts = 1000i64;
1905        // Only recon A fires
1906        let v1 = json!({"CommandLine": "whoami", "User": "admin"});
1907        let ev1 = JsonEvent::borrow(&v1);
1908        let r1 = engine.process_event_at(&ev1, ts);
1909        assert!(r1.correlations.is_empty());
1910
1911        // Now recon B fires — both rules have fired within window
1912        let v2 = json!({"CommandLine": "ipconfig /all", "User": "admin"});
1913        let ev2 = JsonEvent::borrow(&v2);
1914        let r2 = engine.process_event_at(&ev2, ts + 10);
1915        assert_eq!(r2.correlations.len(), 1);
1916        assert_eq!(r2.correlations[0].rule_title, "Recon Combo");
1917    }
1918
1919    // =========================================================================
1920    // Temporal ordered correlation
1921    // =========================================================================
1922
1923    #[test]
1924    fn test_temporal_ordered() {
1925        let yaml = r#"
1926title: Failed Login
1927id: failed-001
1928name: failed_login
1929logsource:
1930    category: auth
1931detection:
1932    selection:
1933        EventType: failed_login
1934    condition: selection
1935---
1936title: Success Login
1937id: success-001
1938name: successful_login
1939logsource:
1940    category: auth
1941detection:
1942    selection:
1943        EventType: success_login
1944    condition: selection
1945---
1946title: Brute Force Then Login
1947id: corr-bf
1948correlation:
1949    type: temporal_ordered
1950    rules:
1951        - failed-001
1952        - success-001
1953    group-by:
1954        - User
1955    timespan: 60s
1956    condition:
1957        gte: 2
1958level: critical
1959"#;
1960        let collection = parse_sigma_yaml(yaml).unwrap();
1961        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1962        engine.add_collection(&collection).unwrap();
1963
1964        let ts = 1000i64;
1965        // Failed login first
1966        let v1 = json!({"EventType": "failed_login", "User": "admin"});
1967        let ev1 = JsonEvent::borrow(&v1);
1968        let r1 = engine.process_event_at(&ev1, ts);
1969        assert!(r1.correlations.is_empty());
1970
1971        // Then successful login — correct order!
1972        let v2 = json!({"EventType": "success_login", "User": "admin"});
1973        let ev2 = JsonEvent::borrow(&v2);
1974        let r2 = engine.process_event_at(&ev2, ts + 10);
1975        assert_eq!(r2.correlations.len(), 1);
1976    }
1977
1978    #[test]
1979    fn test_temporal_ordered_wrong_order() {
1980        let yaml = r#"
1981title: Rule A
1982id: rule-a
1983logsource:
1984    category: test
1985detection:
1986    selection:
1987        type: a
1988    condition: selection
1989---
1990title: Rule B
1991id: rule-b
1992logsource:
1993    category: test
1994detection:
1995    selection:
1996        type: b
1997    condition: selection
1998---
1999title: A then B
2000id: corr-ab
2001correlation:
2002    type: temporal_ordered
2003    rules:
2004        - rule-a
2005        - rule-b
2006    group-by:
2007        - User
2008    timespan: 60s
2009    condition:
2010        gte: 2
2011level: high
2012"#;
2013        let collection = parse_sigma_yaml(yaml).unwrap();
2014        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2015        engine.add_collection(&collection).unwrap();
2016
2017        let ts = 1000i64;
2018        // B fires first, then A — wrong order
2019        let v1 = json!({"type": "b", "User": "admin"});
2020        let ev1 = JsonEvent::borrow(&v1);
2021        engine.process_event_at(&ev1, ts);
2022
2023        let v2 = json!({"type": "a", "User": "admin"});
2024        let ev2 = JsonEvent::borrow(&v2);
2025        let r2 = engine.process_event_at(&ev2, ts + 10);
2026        assert!(r2.correlations.is_empty());
2027    }
2028
2029    // =========================================================================
2030    // Numeric aggregation (value_sum, value_avg)
2031    // =========================================================================
2032
2033    #[test]
2034    fn test_value_sum() {
2035        let yaml = r#"
2036title: Web Access
2037id: web-001
2038logsource:
2039    category: web
2040detection:
2041    selection:
2042        action: upload
2043    condition: selection
2044---
2045title: Large Upload
2046id: corr-sum
2047correlation:
2048    type: value_sum
2049    rules:
2050        - web-001
2051    group-by:
2052        - User
2053    timespan: 60s
2054    condition:
2055        field: bytes_sent
2056        gt: 1000
2057level: high
2058"#;
2059        let collection = parse_sigma_yaml(yaml).unwrap();
2060        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2061        engine.add_collection(&collection).unwrap();
2062
2063        let ts = 1000i64;
2064        let v1 = json!({"action": "upload", "User": "alice", "bytes_sent": 600});
2065        let ev1 = JsonEvent::borrow(&v1);
2066        let r1 = engine.process_event_at(&ev1, ts);
2067        assert!(r1.correlations.is_empty());
2068
2069        let v2 = json!({"action": "upload", "User": "alice", "bytes_sent": 500});
2070        let ev2 = JsonEvent::borrow(&v2);
2071        let r2 = engine.process_event_at(&ev2, ts + 5);
2072        assert_eq!(r2.correlations.len(), 1);
2073        assert!((r2.correlations[0].aggregated_value - 1100.0).abs() < f64::EPSILON);
2074    }
2075
2076    #[test]
2077    fn test_value_avg() {
2078        let yaml = r#"
2079title: Request
2080id: req-001
2081logsource:
2082    category: web
2083detection:
2084    selection:
2085        type: request
2086    condition: selection
2087---
2088title: High Avg Latency
2089id: corr-avg
2090correlation:
2091    type: value_avg
2092    rules:
2093        - req-001
2094    group-by:
2095        - Service
2096    timespan: 60s
2097    condition:
2098        field: latency_ms
2099        gt: 500
2100level: medium
2101"#;
2102        let collection = parse_sigma_yaml(yaml).unwrap();
2103        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2104        engine.add_collection(&collection).unwrap();
2105
2106        let ts = 1000i64;
2107        // Avg of 400, 600, 800 = 600 > 500
2108        for (i, latency) in [400, 600, 800].iter().enumerate() {
2109            let v = json!({"type": "request", "Service": "api", "latency_ms": latency});
2110            let event = JsonEvent::borrow(&v);
2111            let r = engine.process_event_at(&event, ts + i as i64);
2112            if i == 2 {
2113                assert_eq!(r.correlations.len(), 1);
2114                assert!((r.correlations[0].aggregated_value - 600.0).abs() < f64::EPSILON);
2115            }
2116        }
2117    }
2118
2119    // =========================================================================
2120    // State management
2121    // =========================================================================
2122
2123    #[test]
2124    fn test_state_count() {
2125        let yaml = r#"
2126title: Base
2127id: base-sc
2128logsource:
2129    category: test
2130detection:
2131    selection:
2132        action: test
2133    condition: selection
2134---
2135title: Count
2136id: corr-sc
2137correlation:
2138    type: event_count
2139    rules:
2140        - base-sc
2141    group-by:
2142        - User
2143    timespan: 60s
2144    condition:
2145        gte: 100
2146level: low
2147"#;
2148        let collection = parse_sigma_yaml(yaml).unwrap();
2149        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2150        engine.add_collection(&collection).unwrap();
2151
2152        let v = json!({"action": "test", "User": "alice"});
2153        let event = JsonEvent::borrow(&v);
2154        engine.process_event_at(&event, 1000);
2155        assert_eq!(engine.state_count(), 1);
2156
2157        let v2 = json!({"action": "test", "User": "bob"});
2158        let event2 = JsonEvent::borrow(&v2);
2159        engine.process_event_at(&event2, 1001);
2160        assert_eq!(engine.state_count(), 2);
2161
2162        // Evict everything
2163        engine.evict_expired(2000);
2164        assert_eq!(engine.state_count(), 0);
2165    }
2166
2167    // =========================================================================
2168    // Generate flag
2169    // =========================================================================
2170
2171    #[test]
2172    fn test_generate_flag_default_false() {
2173        let yaml = r#"
2174title: Base
2175id: gen-base
2176logsource:
2177    category: test
2178detection:
2179    selection:
2180        action: test
2181    condition: selection
2182---
2183title: Correlation
2184id: gen-corr
2185correlation:
2186    type: event_count
2187    rules:
2188        - gen-base
2189    group-by:
2190        - User
2191    timespan: 60s
2192    condition:
2193        gte: 1
2194level: high
2195"#;
2196        let collection = parse_sigma_yaml(yaml).unwrap();
2197        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2198        engine.add_collection(&collection).unwrap();
2199
2200        // generate defaults to false — detection matches are still returned
2201        // (filtering by generate flag is a backend concern, not eval)
2202        let v = json!({"action": "test", "User": "alice"});
2203        let event = JsonEvent::borrow(&v);
2204        let r = engine.process_event_at(&event, 1000);
2205        assert_eq!(r.detections.len(), 1);
2206        assert_eq!(r.correlations.len(), 1);
2207    }
2208
2209    // =========================================================================
2210    // Real-world example: AWS bucket enumeration
2211    // =========================================================================
2212
2213    #[test]
2214    fn test_aws_bucket_enumeration() {
2215        let yaml = r#"
2216title: Potential Bucket Enumeration on AWS
2217id: f305fd62-beca-47da-ad95-7690a0620084
2218logsource:
2219    product: aws
2220    service: cloudtrail
2221detection:
2222    selection:
2223        eventSource: "s3.amazonaws.com"
2224        eventName: "ListBuckets"
2225    condition: selection
2226level: low
2227---
2228title: Multiple AWS bucket enumerations
2229id: be246094-01d3-4bba-88de-69e582eba0cc
2230status: experimental
2231correlation:
2232    type: event_count
2233    rules:
2234        - f305fd62-beca-47da-ad95-7690a0620084
2235    group-by:
2236        - userIdentity.arn
2237    timespan: 1h
2238    condition:
2239        gte: 5
2240level: high
2241"#;
2242        let collection = parse_sigma_yaml(yaml).unwrap();
2243        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2244        engine.add_collection(&collection).unwrap();
2245
2246        let base_ts = 1_700_000_000i64;
2247        for i in 0..5 {
2248            let v = json!({
2249                "eventSource": "s3.amazonaws.com",
2250                "eventName": "ListBuckets",
2251                "userIdentity.arn": "arn:aws:iam::123456789:user/attacker"
2252            });
2253            let event = JsonEvent::borrow(&v);
2254            let r = engine.process_event_at(&event, base_ts + i * 60);
2255            if i == 4 {
2256                assert_eq!(r.correlations.len(), 1);
2257                assert_eq!(
2258                    r.correlations[0].rule_title,
2259                    "Multiple AWS bucket enumerations"
2260                );
2261                assert_eq!(r.correlations[0].aggregated_value, 5.0);
2262            }
2263        }
2264    }
2265
2266    // =========================================================================
2267    // Chaining: event_count -> temporal_ordered
2268    // =========================================================================
2269
2270    #[test]
2271    fn test_chaining_event_count_to_temporal() {
2272        // Reproduces the spec's "failed logins followed by successful login" example.
2273        // Chain: failed_login (detection) -> many_failed (event_count) -> brute_then_login (temporal_ordered)
2274        let yaml = r#"
2275title: Single failed login
2276id: failed-login-chain
2277name: failed_login
2278logsource:
2279    category: auth
2280detection:
2281    selection:
2282        EventType: failed_login
2283    condition: selection
2284---
2285title: Successful login
2286id: success-login-chain
2287name: successful_login
2288logsource:
2289    category: auth
2290detection:
2291    selection:
2292        EventType: success_login
2293    condition: selection
2294---
2295title: Multiple failed logins
2296id: many-failed-chain
2297name: multiple_failed_login
2298correlation:
2299    type: event_count
2300    rules:
2301        - failed-login-chain
2302    group-by:
2303        - User
2304    timespan: 60s
2305    condition:
2306        gte: 3
2307level: medium
2308---
2309title: Brute Force Followed by Login
2310id: brute-force-chain
2311correlation:
2312    type: temporal_ordered
2313    rules:
2314        - many-failed-chain
2315        - success-login-chain
2316    group-by:
2317        - User
2318    timespan: 120s
2319    condition:
2320        gte: 2
2321level: critical
2322"#;
2323        let collection = parse_sigma_yaml(yaml).unwrap();
2324        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2325        engine.add_collection(&collection).unwrap();
2326
2327        assert_eq!(engine.detection_rule_count(), 2);
2328        assert_eq!(engine.correlation_rule_count(), 2);
2329
2330        let ts = 1000i64;
2331
2332        // Send 3 failed logins → triggers "many_failed_chain"
2333        for i in 0..3 {
2334            let v = json!({"EventType": "failed_login", "User": "victim"});
2335            let event = JsonEvent::borrow(&v);
2336            let r = engine.process_event_at(&event, ts + i);
2337            if i == 2 {
2338                // The event_count correlation should fire
2339                assert!(
2340                    r.correlations
2341                        .iter()
2342                        .any(|c| c.rule_title == "Multiple failed logins"),
2343                    "Expected event_count correlation to fire"
2344                );
2345            }
2346        }
2347
2348        // Now send a successful login → should trigger the chained temporal_ordered
2349        // Note: chaining happens in chain_correlations when many-failed-chain fires
2350        // and then success-login-chain matches the detection.
2351        // The temporal_ordered correlation needs BOTH many-failed-chain AND success-login-chain
2352        // to have fired. success-login-chain is a detection rule, not a correlation,
2353        // so it gets matched via the regular detection path.
2354        let v = json!({"EventType": "success_login", "User": "victim"});
2355        let event = JsonEvent::borrow(&v);
2356        let r = engine.process_event_at(&event, ts + 30);
2357
2358        // The detection should match
2359        assert_eq!(r.detections.len(), 1);
2360        assert_eq!(r.detections[0].rule_title, "Successful login");
2361    }
2362
2363    // =========================================================================
2364    // Field aliases
2365    // =========================================================================
2366
2367    #[test]
2368    fn test_field_aliases() {
2369        let yaml = r#"
2370title: Internal Error
2371id: internal-error-001
2372name: internal_error
2373logsource:
2374    category: web
2375detection:
2376    selection:
2377        http.response.status_code: 500
2378    condition: selection
2379---
2380title: New Connection
2381id: new-conn-001
2382name: new_network_connection
2383logsource:
2384    category: network
2385detection:
2386    selection:
2387        event.type: connection
2388    condition: selection
2389---
2390title: Error Then Connection
2391id: corr-alias
2392correlation:
2393    type: temporal
2394    rules:
2395        - internal-error-001
2396        - new-conn-001
2397    group-by:
2398        - internal_ip
2399    timespan: 60s
2400    condition:
2401        gte: 2
2402    aliases:
2403        internal_ip:
2404            internal_error: destination.ip
2405            new_network_connection: source.ip
2406level: high
2407"#;
2408        let collection = parse_sigma_yaml(yaml).unwrap();
2409        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2410        engine.add_collection(&collection).unwrap();
2411
2412        let ts = 1000i64;
2413
2414        // Internal error with destination.ip = 10.0.0.5
2415        let v1 = json!({
2416            "http.response.status_code": 500,
2417            "destination.ip": "10.0.0.5"
2418        });
2419        let ev1 = JsonEvent::borrow(&v1);
2420        let r1 = engine.process_event_at(&ev1, ts);
2421        assert_eq!(r1.detections.len(), 1);
2422        assert!(r1.correlations.is_empty());
2423
2424        // New connection with source.ip = 10.0.0.5 (same IP, aliased)
2425        let v2 = json!({
2426            "event.type": "connection",
2427            "source.ip": "10.0.0.5"
2428        });
2429        let ev2 = JsonEvent::borrow(&v2);
2430        let r2 = engine.process_event_at(&ev2, ts + 5);
2431        assert_eq!(r2.detections.len(), 1);
2432        // Both rules fired for the same internal_ip group → temporal should fire
2433        assert_eq!(r2.correlations.len(), 1);
2434        assert_eq!(r2.correlations[0].rule_title, "Error Then Connection");
2435        // Check group key contains the aliased field
2436        assert!(
2437            r2.correlations[0]
2438                .group_key
2439                .iter()
2440                .any(|(k, v)| k == "internal_ip" && v == "10.0.0.5")
2441        );
2442    }
2443
2444    // =========================================================================
2445    // Value percentile (basic smoke test)
2446    // =========================================================================
2447
2448    #[test]
2449    fn test_value_percentile() {
2450        let yaml = r#"
2451title: Process Creation
2452id: proc-001
2453logsource:
2454    category: process
2455detection:
2456    selection:
2457        type: process_creation
2458    condition: selection
2459---
2460title: Rare Process
2461id: corr-percentile
2462correlation:
2463    type: value_percentile
2464    rules:
2465        - proc-001
2466    group-by:
2467        - ComputerName
2468    timespan: 60s
2469    condition:
2470        field: image
2471        lte: 50
2472level: medium
2473"#;
2474        let collection = parse_sigma_yaml(yaml).unwrap();
2475        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2476        engine.add_collection(&collection).unwrap();
2477
2478        let ts = 1000i64;
2479        // Push some numeric-ish values for the image field
2480        for (i, val) in [10.0, 20.0, 30.0, 40.0, 50.0].iter().enumerate() {
2481            let v = json!({"type": "process_creation", "ComputerName": "srv01", "image": val});
2482            let event = JsonEvent::borrow(&v);
2483            let _ = engine.process_event_at(&event, ts + i as i64);
2484        }
2485        // The median (30.0) should be <= 50, so condition fires
2486        // Note: percentile implementation is simplified for in-memory eval
2487    }
2488
2489    // =========================================================================
2490    // Extended temporal conditions (end-to-end)
2491    // =========================================================================
2492
2493    #[test]
2494    fn test_extended_temporal_and_condition() {
2495        // Temporal correlation with "rule_a and rule_b" extended condition
2496        let yaml = r#"
2497title: Login Attempt
2498id: login-attempt
2499logsource:
2500    category: auth
2501detection:
2502    selection:
2503        EventType: login_failure
2504    condition: selection
2505---
2506title: Password Change
2507id: password-change
2508logsource:
2509    category: auth
2510detection:
2511    selection:
2512        EventType: password_change
2513    condition: selection
2514---
2515title: Credential Attack
2516correlation:
2517    type: temporal
2518    rules:
2519        - login-attempt
2520        - password-change
2521    group-by:
2522        - User
2523    timespan: 300s
2524    condition: login-attempt and password-change
2525level: high
2526"#;
2527        let collection = parse_sigma_yaml(yaml).unwrap();
2528        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2529        engine.add_collection(&collection).unwrap();
2530
2531        let ts = 1000i64;
2532
2533        // Login failure by alice
2534        let ev1 = json!({"EventType": "login_failure", "User": "alice"});
2535        let r1 = engine.process_event_at(&JsonEvent::borrow(&ev1), ts);
2536        assert!(r1.correlations.is_empty(), "only one rule fired so far");
2537
2538        // Password change by alice — both rules have now fired
2539        let ev2 = json!({"EventType": "password_change", "User": "alice"});
2540        let r2 = engine.process_event_at(&JsonEvent::borrow(&ev2), ts + 10);
2541        assert_eq!(
2542            r2.correlations.len(),
2543            1,
2544            "temporal correlation should fire: both rules matched"
2545        );
2546        assert_eq!(r2.correlations[0].rule_title, "Credential Attack");
2547    }
2548
2549    #[test]
2550    fn test_extended_temporal_or_condition() {
2551        // Temporal with "rule_a or rule_b" — should fire when either fires
2552        let yaml = r#"
2553title: SSH Login
2554id: ssh-login
2555logsource:
2556    category: auth
2557detection:
2558    selection:
2559        EventType: ssh_login
2560    condition: selection
2561---
2562title: VPN Login
2563id: vpn-login
2564logsource:
2565    category: auth
2566detection:
2567    selection:
2568        EventType: vpn_login
2569    condition: selection
2570---
2571title: Any Remote Access
2572correlation:
2573    type: temporal
2574    rules:
2575        - ssh-login
2576        - vpn-login
2577    group-by:
2578        - User
2579    timespan: 60s
2580    condition: ssh-login or vpn-login
2581level: medium
2582"#;
2583        let collection = parse_sigma_yaml(yaml).unwrap();
2584        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2585        engine.add_collection(&collection).unwrap();
2586
2587        // Only SSH login by bob — "or" means this suffices
2588        let ev = json!({"EventType": "ssh_login", "User": "bob"});
2589        let r = engine.process_event_at(&JsonEvent::borrow(&ev), 1000);
2590        assert_eq!(r.correlations.len(), 1);
2591        assert_eq!(r.correlations[0].rule_title, "Any Remote Access");
2592    }
2593
2594    #[test]
2595    fn test_extended_temporal_partial_and_no_fire() {
2596        // Temporal "and" with only one rule firing should not trigger
2597        let yaml = r#"
2598title: Recon Step 1
2599id: recon-1
2600logsource:
2601    category: process
2602detection:
2603    selection:
2604        CommandLine|contains: 'whoami'
2605    condition: selection
2606---
2607title: Recon Step 2
2608id: recon-2
2609logsource:
2610    category: process
2611detection:
2612    selection:
2613        CommandLine|contains: 'ipconfig'
2614    condition: selection
2615---
2616title: Full Recon
2617correlation:
2618    type: temporal
2619    rules:
2620        - recon-1
2621        - recon-2
2622    group-by:
2623        - Host
2624    timespan: 120s
2625    condition: recon-1 and recon-2
2626level: high
2627"#;
2628        let collection = parse_sigma_yaml(yaml).unwrap();
2629        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2630        engine.add_collection(&collection).unwrap();
2631
2632        // Only whoami (recon-1) — should not fire
2633        let ev = json!({"CommandLine": "whoami", "Host": "srv01"});
2634        let r = engine.process_event_at(&JsonEvent::borrow(&ev), 1000);
2635        assert!(r.correlations.is_empty(), "only one of two AND rules fired");
2636
2637        // Now ipconfig (recon-2) — should fire
2638        let ev2 = json!({"CommandLine": "ipconfig /all", "Host": "srv01"});
2639        let r2 = engine.process_event_at(&JsonEvent::borrow(&ev2), 1010);
2640        assert_eq!(r2.correlations.len(), 1);
2641        assert_eq!(r2.correlations[0].rule_title, "Full Recon");
2642    }
2643
2644    // =========================================================================
2645    // Filter rules with correlation engine
2646    // =========================================================================
2647
2648    #[test]
2649    fn test_filter_with_correlation() {
2650        // Detection rule + filter + event_count correlation
2651        let yaml = r#"
2652title: Failed Auth
2653id: failed-auth
2654logsource:
2655    category: auth
2656detection:
2657    selection:
2658        EventType: auth_failure
2659    condition: selection
2660---
2661title: Exclude Service Accounts
2662filter:
2663    rules:
2664        - failed-auth
2665    selection:
2666        User|startswith: 'svc_'
2667    condition: selection
2668---
2669title: Brute Force
2670correlation:
2671    type: event_count
2672    rules:
2673        - failed-auth
2674    group-by:
2675        - User
2676    timespan: 300s
2677    condition:
2678        gte: 3
2679level: critical
2680"#;
2681        let collection = parse_sigma_yaml(yaml).unwrap();
2682        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2683        engine.add_collection(&collection).unwrap();
2684
2685        let ts = 1000i64;
2686
2687        // Service account failures should be filtered — don't count
2688        for i in 0..5 {
2689            let ev = json!({"EventType": "auth_failure", "User": "svc_backup"});
2690            let r = engine.process_event_at(&JsonEvent::borrow(&ev), ts + i);
2691            assert!(
2692                r.correlations.is_empty(),
2693                "service account should be filtered, no correlation"
2694            );
2695        }
2696
2697        // Normal user failures should count
2698        for i in 0..2 {
2699            let ev = json!({"EventType": "auth_failure", "User": "alice"});
2700            let r = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 10 + i);
2701            assert!(r.correlations.is_empty(), "not yet 3 events");
2702        }
2703
2704        // Third failure triggers correlation
2705        let ev = json!({"EventType": "auth_failure", "User": "alice"});
2706        let r = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 12);
2707        assert_eq!(r.correlations.len(), 1);
2708        assert_eq!(r.correlations[0].rule_title, "Brute Force");
2709    }
2710
2711    // =========================================================================
2712    // action: repeat with correlation engine
2713    // =========================================================================
2714
2715    #[test]
2716    fn test_repeat_rules_in_correlation() {
2717        // Two detection rules via repeat, both feed into event_count
2718        let yaml = r#"
2719title: File Access A
2720id: file-a
2721logsource:
2722    category: file_access
2723detection:
2724    selection:
2725        FileName|endswith: '.docx'
2726    condition: selection
2727---
2728action: repeat
2729title: File Access B
2730id: file-b
2731detection:
2732    selection:
2733        FileName|endswith: '.xlsx'
2734    condition: selection
2735---
2736title: Mass File Access
2737correlation:
2738    type: event_count
2739    rules:
2740        - file-a
2741        - file-b
2742    group-by:
2743        - User
2744    timespan: 60s
2745    condition:
2746        gte: 3
2747level: high
2748"#;
2749        let collection = parse_sigma_yaml(yaml).unwrap();
2750        assert_eq!(collection.rules.len(), 2);
2751        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2752        engine.add_collection(&collection).unwrap();
2753        assert_eq!(engine.detection_rule_count(), 2);
2754
2755        let ts = 1000i64;
2756        // Mix of docx and xlsx accesses by same user
2757        let ev1 = json!({"FileName": "report.docx", "User": "bob"});
2758        engine.process_event_at(&JsonEvent::borrow(&ev1), ts);
2759        let ev2 = json!({"FileName": "data.xlsx", "User": "bob"});
2760        engine.process_event_at(&JsonEvent::borrow(&ev2), ts + 1);
2761        let ev3 = json!({"FileName": "notes.docx", "User": "bob"});
2762        let r = engine.process_event_at(&JsonEvent::borrow(&ev3), ts + 2);
2763
2764        assert_eq!(r.correlations.len(), 1);
2765        assert_eq!(r.correlations[0].rule_title, "Mass File Access");
2766    }
2767
2768    // =========================================================================
2769    // Expand modifier with correlation engine
2770    // =========================================================================
2771
2772    #[test]
2773    fn test_expand_modifier_with_correlation() {
2774        let yaml = r#"
2775title: User Temp File
2776id: user-temp
2777logsource:
2778    category: file_access
2779detection:
2780    selection:
2781        FilePath|expand: 'C:\Users\%User%\Temp'
2782    condition: selection
2783---
2784title: Excessive Temp Access
2785correlation:
2786    type: event_count
2787    rules:
2788        - user-temp
2789    group-by:
2790        - User
2791    timespan: 60s
2792    condition:
2793        gte: 2
2794level: medium
2795"#;
2796        let collection = parse_sigma_yaml(yaml).unwrap();
2797        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2798        engine.add_collection(&collection).unwrap();
2799
2800        let ts = 1000i64;
2801        // Event where User field matches the placeholder
2802        let ev1 = json!({"FilePath": "C:\\Users\\alice\\Temp", "User": "alice"});
2803        let r1 = engine.process_event_at(&JsonEvent::borrow(&ev1), ts);
2804        assert!(r1.correlations.is_empty());
2805
2806        let ev2 = json!({"FilePath": "C:\\Users\\alice\\Temp", "User": "alice"});
2807        let r2 = engine.process_event_at(&JsonEvent::borrow(&ev2), ts + 1);
2808        assert_eq!(r2.correlations.len(), 1);
2809        assert_eq!(r2.correlations[0].rule_title, "Excessive Temp Access");
2810
2811        // Different user — should NOT match (path says alice, user is bob)
2812        let ev3 = json!({"FilePath": "C:\\Users\\alice\\Temp", "User": "bob"});
2813        let r3 = engine.process_event_at(&JsonEvent::borrow(&ev3), ts + 2);
2814        // Detection doesn't fire for this event since expand resolves to C:\Users\bob\Temp
2815        assert_eq!(r3.detections.len(), 0);
2816    }
2817
2818    // =========================================================================
2819    // Timestamp modifier with correlation engine
2820    // =========================================================================
2821
2822    #[test]
2823    fn test_timestamp_modifier_with_correlation() {
2824        let yaml = r#"
2825title: Night Login
2826id: night-login
2827logsource:
2828    category: auth
2829detection:
2830    login:
2831        EventType: login
2832    night:
2833        Timestamp|hour: 3
2834    condition: login and night
2835---
2836title: Frequent Night Logins
2837correlation:
2838    type: event_count
2839    rules:
2840        - night-login
2841    group-by:
2842        - User
2843    timespan: 3600s
2844    condition:
2845        gte: 2
2846level: high
2847"#;
2848        let collection = parse_sigma_yaml(yaml).unwrap();
2849        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2850        engine.add_collection(&collection).unwrap();
2851
2852        let ts = 1000i64;
2853        // Login at 3AM
2854        let ev1 =
2855            json!({"EventType": "login", "User": "alice", "Timestamp": "2024-01-15T03:10:00Z"});
2856        let r1 = engine.process_event_at(&JsonEvent::borrow(&ev1), ts);
2857        assert_eq!(r1.detections.len(), 1);
2858        assert!(r1.correlations.is_empty());
2859
2860        let ev2 =
2861            json!({"EventType": "login", "User": "alice", "Timestamp": "2024-01-15T03:45:00Z"});
2862        let r2 = engine.process_event_at(&JsonEvent::borrow(&ev2), ts + 1);
2863        assert_eq!(r2.correlations.len(), 1);
2864        assert_eq!(r2.correlations[0].rule_title, "Frequent Night Logins");
2865
2866        // Login at noon — should NOT count
2867        let ev3 = json!({"EventType": "login", "User": "bob", "Timestamp": "2024-01-15T12:00:00Z"});
2868        let r3 = engine.process_event_at(&JsonEvent::borrow(&ev3), ts + 2);
2869        assert!(
2870            r3.detections.is_empty(),
2871            "noon login should not match night rule"
2872        );
2873    }
2874
2875    // =========================================================================
2876    // Correlation condition range (multiple predicates)
2877    // =========================================================================
2878
2879    #[test]
2880    fn test_event_count_range_condition() {
2881        let yaml = r#"
2882title: Login Attempt
2883id: login-attempt-001
2884name: login_attempt
2885logsource:
2886    product: windows
2887detection:
2888    selection:
2889        EventType: login
2890    condition: selection
2891level: low
2892---
2893title: Login Count Range
2894id: corr-range-001
2895correlation:
2896    type: event_count
2897    rules:
2898        - login-attempt-001
2899    group-by:
2900        - User
2901    timespan: 3600s
2902    condition:
2903        gt: 2
2904        lte: 5
2905level: high
2906"#;
2907        let collection = parse_sigma_yaml(yaml).unwrap();
2908        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2909        engine.add_collection(&collection).unwrap();
2910
2911        let ts: i64 = 1_000_000;
2912
2913        // Send 2 events — gt:2 is false
2914        for i in 0..2 {
2915            let ev = json!({"EventType": "login", "User": "alice"});
2916            let r = engine.process_event_at(&JsonEvent::borrow(&ev), ts + i);
2917            assert!(r.correlations.is_empty(), "2 events should not fire (gt:2)");
2918        }
2919
2920        // 3rd event — gt:2 is true, lte:5 is true → fires
2921        let ev3 = json!({"EventType": "login", "User": "alice"});
2922        let r3 = engine.process_event_at(&JsonEvent::borrow(&ev3), ts + 3);
2923        assert_eq!(r3.correlations.len(), 1, "3 events: gt:2 AND lte:5");
2924
2925        // Send events 4, 5 — still in range
2926        for i in 4..=5 {
2927            let ev = json!({"EventType": "login", "User": "alice"});
2928            let r = engine.process_event_at(&JsonEvent::borrow(&ev), ts + i);
2929            assert_eq!(r.correlations.len(), 1, "{i} events still in range");
2930        }
2931
2932        // 6th event — lte:5 is false → no fire
2933        let ev6 = json!({"EventType": "login", "User": "alice"});
2934        let r6 = engine.process_event_at(&JsonEvent::borrow(&ev6), ts + 6);
2935        assert!(
2936            r6.correlations.is_empty(),
2937            "6 events exceeds lte:5, should not fire"
2938        );
2939    }
2940
2941    // =========================================================================
2942    // Suppression
2943    // =========================================================================
2944
2945    fn suppression_yaml() -> &'static str {
2946        r#"
2947title: Login
2948id: login-base
2949logsource:
2950    category: auth
2951detection:
2952    selection:
2953        EventType: login
2954    condition: selection
2955---
2956title: Many Logins
2957correlation:
2958    type: event_count
2959    rules:
2960        - login-base
2961    group-by:
2962        - User
2963    timeframe: 60s
2964    condition:
2965        gte: 3
2966level: high
2967"#
2968    }
2969
2970    #[test]
2971    fn test_suppression_window() {
2972        let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
2973        let config = CorrelationConfig {
2974            suppress: Some(10), // suppress for 10 seconds
2975            ..Default::default()
2976        };
2977        let mut engine = CorrelationEngine::new(config);
2978        engine.add_collection(&collection).unwrap();
2979
2980        let ev = json!({"EventType": "login", "User": "alice"});
2981        let ts = 1000;
2982
2983        // Fire 3 events to hit threshold
2984        engine.process_event_at(&JsonEvent::borrow(&ev), ts);
2985        engine.process_event_at(&JsonEvent::borrow(&ev), ts + 1);
2986        let r3 = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 2);
2987        assert_eq!(r3.correlations.len(), 1, "should fire on 3rd event");
2988
2989        // 4th event within suppress window → suppressed
2990        let r4 = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 3);
2991        assert!(
2992            r4.correlations.is_empty(),
2993            "should be suppressed within 10s window"
2994        );
2995
2996        // 5th event still within suppress window → suppressed
2997        let r5 = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 9);
2998        assert!(
2999            r5.correlations.is_empty(),
3000            "should be suppressed at ts+9 (< ts+2+10)"
3001        );
3002
3003        // Event after suppress window expires → fires again
3004        let r6 = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 13);
3005        assert_eq!(
3006            r6.correlations.len(),
3007            1,
3008            "should fire again after suppress window expires"
3009        );
3010    }
3011
3012    #[test]
3013    fn test_suppression_per_group_key() {
3014        let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
3015        let config = CorrelationConfig {
3016            suppress: Some(60),
3017            ..Default::default()
3018        };
3019        let mut engine = CorrelationEngine::new(config);
3020        engine.add_collection(&collection).unwrap();
3021
3022        let ts = 1000;
3023
3024        // Alice hits threshold
3025        let ev_a = json!({"EventType": "login", "User": "alice"});
3026        engine.process_event_at(&JsonEvent::borrow(&ev_a), ts);
3027        engine.process_event_at(&JsonEvent::borrow(&ev_a), ts + 1);
3028        let r = engine.process_event_at(&JsonEvent::borrow(&ev_a), ts + 2);
3029        assert_eq!(r.correlations.len(), 1, "alice should fire");
3030
3031        // Bob hits threshold — different group key, not suppressed
3032        let ev_b = json!({"EventType": "login", "User": "bob"});
3033        engine.process_event_at(&JsonEvent::borrow(&ev_b), ts + 3);
3034        engine.process_event_at(&JsonEvent::borrow(&ev_b), ts + 4);
3035        let r = engine.process_event_at(&JsonEvent::borrow(&ev_b), ts + 5);
3036        assert_eq!(r.correlations.len(), 1, "bob should fire independently");
3037
3038        // Alice is still suppressed
3039        let r = engine.process_event_at(&JsonEvent::borrow(&ev_a), ts + 6);
3040        assert!(r.correlations.is_empty(), "alice still suppressed");
3041    }
3042
3043    // =========================================================================
3044    // Action on match: Reset
3045    // =========================================================================
3046
3047    #[test]
3048    fn test_action_reset() {
3049        let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
3050        let config = CorrelationConfig {
3051            action_on_match: CorrelationAction::Reset,
3052            ..Default::default()
3053        };
3054        let mut engine = CorrelationEngine::new(config);
3055        engine.add_collection(&collection).unwrap();
3056
3057        let ev = json!({"EventType": "login", "User": "alice"});
3058        let ts = 1000;
3059
3060        // Hit threshold: 3 events
3061        engine.process_event_at(&JsonEvent::borrow(&ev), ts);
3062        engine.process_event_at(&JsonEvent::borrow(&ev), ts + 1);
3063        let r3 = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 2);
3064        assert_eq!(r3.correlations.len(), 1, "should fire on 3rd event");
3065
3066        // State was reset, so 4th and 5th events should NOT fire
3067        let r4 = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 3);
3068        assert!(r4.correlations.is_empty(), "reset: need 3 more events");
3069
3070        let r5 = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 4);
3071        assert!(r5.correlations.is_empty(), "reset: still only 2");
3072
3073        // 6th event (3rd after reset) should fire again
3074        let r6 = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 5);
3075        assert_eq!(
3076            r6.correlations.len(),
3077            1,
3078            "should fire again after 3 events post-reset"
3079        );
3080    }
3081
3082    // =========================================================================
3083    // Generate flag / emit_detections
3084    // =========================================================================
3085
3086    #[test]
3087    fn test_emit_detections_true_by_default() {
3088        let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
3089        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3090        engine.add_collection(&collection).unwrap();
3091
3092        let ev = json!({"EventType": "login", "User": "alice"});
3093        let r = engine.process_event_at(&JsonEvent::borrow(&ev), 1000);
3094        assert_eq!(r.detections.len(), 1, "by default detections are emitted");
3095    }
3096
3097    #[test]
3098    fn test_emit_detections_false_suppresses() {
3099        let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
3100        let config = CorrelationConfig {
3101            emit_detections: false,
3102            ..Default::default()
3103        };
3104        let mut engine = CorrelationEngine::new(config);
3105        engine.add_collection(&collection).unwrap();
3106
3107        let ev = json!({"EventType": "login", "User": "alice"});
3108        let r = engine.process_event_at(&JsonEvent::borrow(&ev), 1000);
3109        assert!(
3110            r.detections.is_empty(),
3111            "detection matches should be suppressed when emit_detections=false"
3112        );
3113    }
3114
3115    #[test]
3116    fn test_generate_true_keeps_detections() {
3117        // When generate: true, detections should be emitted even with emit_detections=false
3118        let yaml = r#"
3119title: Login
3120id: login-gen
3121logsource:
3122    category: auth
3123detection:
3124    selection:
3125        EventType: login
3126    condition: selection
3127---
3128title: Many Logins
3129correlation:
3130    type: event_count
3131    rules:
3132        - login-gen
3133    group-by:
3134        - User
3135    timeframe: 60s
3136    condition:
3137        gte: 3
3138    generate: true
3139level: high
3140"#;
3141        let collection = parse_sigma_yaml(yaml).unwrap();
3142        let config = CorrelationConfig {
3143            emit_detections: false,
3144            ..Default::default()
3145        };
3146        let mut engine = CorrelationEngine::new(config);
3147        engine.add_collection(&collection).unwrap();
3148
3149        let ev = json!({"EventType": "login", "User": "alice"});
3150        let r = engine.process_event_at(&JsonEvent::borrow(&ev), 1000);
3151        // generate: true means this rule is NOT correlation-only
3152        assert_eq!(
3153            r.detections.len(),
3154            1,
3155            "generate:true keeps detection output"
3156        );
3157    }
3158
3159    // =========================================================================
3160    // Suppression + Reset combined
3161    // =========================================================================
3162
3163    #[test]
3164    fn test_suppress_and_reset_combined() {
3165        let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
3166        let config = CorrelationConfig {
3167            suppress: Some(5),
3168            action_on_match: CorrelationAction::Reset,
3169            ..Default::default()
3170        };
3171        let mut engine = CorrelationEngine::new(config);
3172        engine.add_collection(&collection).unwrap();
3173
3174        let ev = json!({"EventType": "login", "User": "alice"});
3175        let ts = 1000;
3176
3177        // Hit threshold: fires and resets
3178        engine.process_event_at(&JsonEvent::borrow(&ev), ts);
3179        engine.process_event_at(&JsonEvent::borrow(&ev), ts + 1);
3180        let r3 = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 2);
3181        assert_eq!(r3.correlations.len(), 1, "fires on 3rd event");
3182
3183        // Push 3 more events quickly (state was reset, so new count → 3)
3184        // but suppress window hasn't expired (ts+2 + 5 = ts+7)
3185        engine.process_event_at(&JsonEvent::borrow(&ev), ts + 3);
3186        engine.process_event_at(&JsonEvent::borrow(&ev), ts + 4);
3187        let r = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 5);
3188        assert!(
3189            r.correlations.is_empty(),
3190            "threshold met again but still suppressed"
3191        );
3192
3193        // After suppress expires (at ts+8, which is ts+2+6 > suppress=5),
3194        // the accumulated events from step 2 (ts+3,4,5) still satisfy gte:3,
3195        // so the first event after expiry fires immediately and resets.
3196        let r = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 8);
3197        assert_eq!(
3198            r.correlations.len(),
3199            1,
3200            "fires after suppress expires (accumulated events + new one)"
3201        );
3202
3203        // State was reset again at ts+8, suppress window now ts+8..ts+13.
3204        // Need 3 new events to fire, and suppress must expire.
3205        engine.process_event_at(&JsonEvent::borrow(&ev), ts + 9);
3206        engine.process_event_at(&JsonEvent::borrow(&ev), ts + 10);
3207        let r = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 11);
3208        assert!(
3209            r.correlations.is_empty(),
3210            "threshold met but suppress window hasn't expired (ts+11 - ts+8 = 3 < 5)"
3211        );
3212    }
3213
3214    // =========================================================================
3215    // No suppression (default behavior preserved)
3216    // =========================================================================
3217
3218    #[test]
3219    fn test_no_suppression_fires_every_event() {
3220        let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
3221        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3222        engine.add_collection(&collection).unwrap();
3223
3224        let ev = json!({"EventType": "login", "User": "alice"});
3225        let ts = 1000;
3226
3227        engine.process_event_at(&JsonEvent::borrow(&ev), ts);
3228        engine.process_event_at(&JsonEvent::borrow(&ev), ts + 1);
3229        let r3 = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 2);
3230        assert_eq!(r3.correlations.len(), 1);
3231
3232        // Without suppression, 4th event should also fire
3233        let r4 = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 3);
3234        assert_eq!(
3235            r4.correlations.len(),
3236            1,
3237            "no suppression: fires on every event after threshold"
3238        );
3239
3240        let r5 = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 4);
3241        assert_eq!(r5.correlations.len(), 1, "still fires");
3242    }
3243
3244    // =========================================================================
3245    // Custom attribute → engine config tests
3246    // =========================================================================
3247
3248    fn yaml_str_attrs<const N: usize>(
3249        pairs: [(&str, &str); N],
3250    ) -> std::collections::HashMap<String, serde_yaml::Value> {
3251        pairs
3252            .into_iter()
3253            .map(|(k, v)| (k.to_string(), serde_yaml::Value::String(v.to_string())))
3254            .collect()
3255    }
3256
3257    #[test]
3258    fn test_custom_attr_timestamp_field() {
3259        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3260        let attrs = yaml_str_attrs([("rsigma.timestamp_field", "time")]);
3261        engine.apply_custom_attributes(&attrs);
3262
3263        assert_eq!(
3264            engine.config.timestamp_fields[0], "time",
3265            "rsigma.timestamp_field should be prepended"
3266        );
3267        // Defaults should still be there after the custom one
3268        assert!(
3269            engine
3270                .config
3271                .timestamp_fields
3272                .contains(&"@timestamp".to_string())
3273        );
3274    }
3275
3276    #[test]
3277    fn test_custom_attr_timestamp_field_no_duplicates() {
3278        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3279        let attrs = yaml_str_attrs([("rsigma.timestamp_field", "time")]);
3280        // Apply twice — should not duplicate
3281        engine.apply_custom_attributes(&attrs);
3282        engine.apply_custom_attributes(&attrs);
3283
3284        let count = engine
3285            .config
3286            .timestamp_fields
3287            .iter()
3288            .filter(|f| *f == "time")
3289            .count();
3290        assert_eq!(count, 1, "should not duplicate timestamp_field entries");
3291    }
3292
3293    #[test]
3294    fn test_custom_attr_suppress() {
3295        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3296        assert!(engine.config.suppress.is_none());
3297
3298        let attrs = yaml_str_attrs([("rsigma.suppress", "5m")]);
3299        engine.apply_custom_attributes(&attrs);
3300
3301        assert_eq!(engine.config.suppress, Some(300));
3302    }
3303
3304    #[test]
3305    fn test_custom_attr_suppress_does_not_override_cli() {
3306        let config = CorrelationConfig {
3307            suppress: Some(60), // CLI set to 60s
3308            ..Default::default()
3309        };
3310        let mut engine = CorrelationEngine::new(config);
3311
3312        let attrs = yaml_str_attrs([("rsigma.suppress", "5m")]);
3313        engine.apply_custom_attributes(&attrs);
3314
3315        assert_eq!(
3316            engine.config.suppress,
3317            Some(60),
3318            "CLI suppress should not be overridden by custom attribute"
3319        );
3320    }
3321
3322    #[test]
3323    fn test_custom_attr_action() {
3324        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3325        assert_eq!(engine.config.action_on_match, CorrelationAction::Alert);
3326
3327        let attrs = yaml_str_attrs([("rsigma.action", "reset")]);
3328        engine.apply_custom_attributes(&attrs);
3329
3330        assert_eq!(engine.config.action_on_match, CorrelationAction::Reset);
3331    }
3332
3333    #[test]
3334    fn test_custom_attr_action_does_not_override_cli() {
3335        let config = CorrelationConfig {
3336            action_on_match: CorrelationAction::Reset, // CLI set to reset
3337            ..Default::default()
3338        };
3339        let mut engine = CorrelationEngine::new(config);
3340
3341        let attrs = yaml_str_attrs([("rsigma.action", "alert")]);
3342        engine.apply_custom_attributes(&attrs);
3343
3344        assert_eq!(
3345            engine.config.action_on_match,
3346            CorrelationAction::Reset,
3347            "CLI action should not be overridden by custom attribute"
3348        );
3349    }
3350
3351    #[test]
3352    fn test_custom_attr_timestamp_field_used_for_extraction() {
3353        // The event has "time" but not "@timestamp" or "timestamp"
3354        let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
3355        let mut config = CorrelationConfig::default();
3356        // Prepend "event_time" to simulate --timestamp-field
3357        config.timestamp_fields.insert(0, "event_time".to_string());
3358        let mut engine = CorrelationEngine::new(config);
3359        engine.add_collection(&collection).unwrap();
3360
3361        // Event with "event_time" field
3362        let ev = json!({
3363            "EventType": "login",
3364            "User": "alice",
3365            "event_time": "2026-02-11T12:00:00Z"
3366        });
3367        let result = engine.process_event(&JsonEvent::borrow(&ev));
3368
3369        // The detection should match, and timestamp should be ~1739275200 (2026-02-11)
3370        assert!(!result.detections.is_empty() || result.correlations.is_empty());
3371        // The key test: ensure the engine extracted the event timestamp, not Utc::now.
3372        // If it used Utc::now, the test would still pass but the timestamp would be
3373        // wildly different. We verify by checking the extracted value directly.
3374        let ts = engine
3375            .extract_event_timestamp(&JsonEvent::borrow(&ev))
3376            .expect("should extract timestamp");
3377        assert!(
3378            ts > 1_700_000_000 && ts < 1_800_000_000,
3379            "timestamp should be ~2026 epoch, got {ts}"
3380        );
3381    }
3382
3383    // =========================================================================
3384    // Cycle detection
3385    // =========================================================================
3386
3387    #[test]
3388    fn test_correlation_cycle_direct() {
3389        // Two correlations that reference each other: A -> B -> A
3390        let yaml = r#"
3391title: detection rule
3392id: det-rule
3393logsource:
3394    product: test
3395detection:
3396    selection:
3397        action: click
3398    condition: selection
3399level: low
3400---
3401title: correlation A
3402id: corr-a
3403correlation:
3404    type: event_count
3405    rules:
3406        - corr-b
3407    group-by:
3408        - User
3409    timespan: 5m
3410    condition:
3411        gte: 2
3412level: high
3413---
3414title: correlation B
3415id: corr-b
3416correlation:
3417    type: event_count
3418    rules:
3419        - corr-a
3420    group-by:
3421        - User
3422    timespan: 5m
3423    condition:
3424        gte: 2
3425level: high
3426"#;
3427        let collection = parse_sigma_yaml(yaml).unwrap();
3428        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3429        let result = engine.add_collection(&collection);
3430        assert!(result.is_err(), "should detect direct cycle");
3431        let err = result.unwrap_err().to_string();
3432        assert!(err.contains("cycle"), "error should mention cycle: {err}");
3433        assert!(
3434            err.contains("corr-a") && err.contains("corr-b"),
3435            "error should name both correlations: {err}"
3436        );
3437    }
3438
3439    #[test]
3440    fn test_correlation_cycle_self() {
3441        // A correlation that references itself
3442        let yaml = r#"
3443title: detection rule
3444id: det-rule
3445logsource:
3446    product: test
3447detection:
3448    selection:
3449        action: click
3450    condition: selection
3451level: low
3452---
3453title: self-ref correlation
3454id: self-corr
3455correlation:
3456    type: event_count
3457    rules:
3458        - self-corr
3459    group-by:
3460        - User
3461    timespan: 5m
3462    condition:
3463        gte: 2
3464level: high
3465"#;
3466        let collection = parse_sigma_yaml(yaml).unwrap();
3467        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3468        let result = engine.add_collection(&collection);
3469        assert!(result.is_err(), "should detect self-referencing cycle");
3470        let err = result.unwrap_err().to_string();
3471        assert!(err.contains("cycle"), "error should mention cycle: {err}");
3472        assert!(
3473            err.contains("self-corr"),
3474            "error should name the correlation: {err}"
3475        );
3476    }
3477
3478    #[test]
3479    fn test_correlation_no_cycle_valid_chain() {
3480        // Valid chain: detection -> corr-A -> corr-B (no cycle)
3481        let yaml = r#"
3482title: detection rule
3483id: det-rule
3484logsource:
3485    product: test
3486detection:
3487    selection:
3488        action: click
3489    condition: selection
3490level: low
3491---
3492title: correlation A
3493id: corr-a
3494correlation:
3495    type: event_count
3496    rules:
3497        - det-rule
3498    group-by:
3499        - User
3500    timespan: 5m
3501    condition:
3502        gte: 2
3503level: high
3504---
3505title: correlation B
3506id: corr-b
3507correlation:
3508    type: event_count
3509    rules:
3510        - corr-a
3511    group-by:
3512        - User
3513    timespan: 5m
3514    condition:
3515        gte: 2
3516level: high
3517"#;
3518        let collection = parse_sigma_yaml(yaml).unwrap();
3519        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3520        let result = engine.add_collection(&collection);
3521        assert!(
3522            result.is_ok(),
3523            "valid chain should not be rejected: {result:?}"
3524        );
3525    }
3526
3527    #[test]
3528    fn test_correlation_cycle_transitive() {
3529        // Transitive cycle: A -> B -> C -> A
3530        let yaml = r#"
3531title: detection rule
3532id: det-rule
3533logsource:
3534    product: test
3535detection:
3536    selection:
3537        action: click
3538    condition: selection
3539level: low
3540---
3541title: correlation A
3542id: corr-a
3543correlation:
3544    type: event_count
3545    rules:
3546        - corr-c
3547    group-by:
3548        - User
3549    timespan: 5m
3550    condition:
3551        gte: 2
3552level: high
3553---
3554title: correlation B
3555id: corr-b
3556correlation:
3557    type: event_count
3558    rules:
3559        - corr-a
3560    group-by:
3561        - User
3562    timespan: 5m
3563    condition:
3564        gte: 2
3565level: high
3566---
3567title: correlation C
3568id: corr-c
3569correlation:
3570    type: event_count
3571    rules:
3572        - corr-b
3573    group-by:
3574        - User
3575    timespan: 5m
3576    condition:
3577        gte: 2
3578level: high
3579"#;
3580        let collection = parse_sigma_yaml(yaml).unwrap();
3581        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3582        let result = engine.add_collection(&collection);
3583        assert!(result.is_err(), "should detect transitive cycle");
3584        let err = result.unwrap_err().to_string();
3585        assert!(err.contains("cycle"), "error should mention cycle: {err}");
3586    }
3587
3588    // =========================================================================
3589    // Correlation event inclusion tests
3590    // =========================================================================
3591
3592    #[test]
3593    fn test_correlation_events_disabled_by_default() {
3594        let yaml = r#"
3595title: Login
3596id: login-rule
3597logsource:
3598    category: auth
3599detection:
3600    selection:
3601        EventType: login
3602    condition: selection
3603---
3604title: Many Logins
3605correlation:
3606    type: event_count
3607    rules:
3608        - login-rule
3609    group-by:
3610        - User
3611    timespan: 60s
3612    condition:
3613        gte: 3
3614level: high
3615"#;
3616        let collection = parse_sigma_yaml(yaml).unwrap();
3617        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3618        engine.add_collection(&collection).unwrap();
3619
3620        for i in 0..3 {
3621            let v = json!({"EventType": "login", "User": "admin", "@timestamp": 1000 + i});
3622            let event = JsonEvent::borrow(&v);
3623            let result = engine.process_event_at(&event, 1000 + i);
3624            if i == 2 {
3625                assert_eq!(result.correlations.len(), 1);
3626                // Events should NOT be included by default
3627                assert!(result.correlations[0].events.is_none());
3628            }
3629        }
3630    }
3631
3632    #[test]
3633    fn test_correlation_events_included_when_enabled() {
3634        let yaml = r#"
3635title: Login
3636id: login-rule
3637logsource:
3638    category: auth
3639detection:
3640    selection:
3641        EventType: login
3642    condition: selection
3643---
3644title: Many Logins
3645correlation:
3646    type: event_count
3647    rules:
3648        - login-rule
3649    group-by:
3650        - User
3651    timespan: 60s
3652    condition:
3653        gte: 3
3654level: high
3655"#;
3656        let collection = parse_sigma_yaml(yaml).unwrap();
3657        let config = CorrelationConfig {
3658            correlation_event_mode: CorrelationEventMode::Full,
3659            max_correlation_events: 10,
3660            ..Default::default()
3661        };
3662        let mut engine = CorrelationEngine::new(config);
3663        engine.add_collection(&collection).unwrap();
3664
3665        let events_sent: Vec<serde_json::Value> = (0..3)
3666            .map(|i| json!({"EventType": "login", "User": "admin", "@timestamp": 1000 + i}))
3667            .collect();
3668
3669        let mut corr_result = None;
3670        for (i, ev) in events_sent.iter().enumerate() {
3671            let event = JsonEvent::borrow(ev);
3672            let result = engine.process_event_at(&event, 1000 + i as i64);
3673            if !result.correlations.is_empty() {
3674                corr_result = Some(result);
3675            }
3676        }
3677
3678        let result = corr_result.expect("correlation should have fired");
3679        let corr = &result.correlations[0];
3680
3681        // Events should be included
3682        let events = corr.events.as_ref().expect("events should be present");
3683        assert_eq!(
3684            events.len(),
3685            3,
3686            "all 3 contributing events should be stored"
3687        );
3688
3689        // Verify all sent events are present
3690        for (i, event) in events.iter().enumerate() {
3691            assert_eq!(event["EventType"], "login");
3692            assert_eq!(event["User"], "admin");
3693            assert_eq!(event["@timestamp"], 1000 + i as i64);
3694        }
3695    }
3696
3697    #[test]
3698    fn test_correlation_events_max_cap() {
3699        let yaml = r#"
3700title: Login
3701id: login-rule
3702logsource:
3703    category: auth
3704detection:
3705    selection:
3706        EventType: login
3707    condition: selection
3708---
3709title: Many Logins
3710correlation:
3711    type: event_count
3712    rules:
3713        - login-rule
3714    group-by:
3715        - User
3716    timespan: 60s
3717    condition:
3718        gte: 5
3719level: high
3720"#;
3721        let collection = parse_sigma_yaml(yaml).unwrap();
3722        let config = CorrelationConfig {
3723            correlation_event_mode: CorrelationEventMode::Full,
3724            max_correlation_events: 3, // only keep last 3
3725            ..Default::default()
3726        };
3727        let mut engine = CorrelationEngine::new(config);
3728        engine.add_collection(&collection).unwrap();
3729
3730        let mut corr_result = None;
3731        for i in 0..5 {
3732            let v = json!({"EventType": "login", "User": "admin", "idx": i});
3733            let event = JsonEvent::borrow(&v);
3734            let result = engine.process_event_at(&event, 1000 + i);
3735            if !result.correlations.is_empty() {
3736                corr_result = Some(result);
3737            }
3738        }
3739
3740        let result = corr_result.expect("correlation should have fired");
3741        let events = result.correlations[0]
3742            .events
3743            .as_ref()
3744            .expect("events should be present");
3745
3746        // Only the last 3 events should be retained (cap = 3)
3747        assert_eq!(events.len(), 3);
3748        assert_eq!(events[0]["idx"], 2);
3749        assert_eq!(events[1]["idx"], 3);
3750        assert_eq!(events[2]["idx"], 4);
3751    }
3752
3753    #[test]
3754    fn test_correlation_events_with_reset_action() {
3755        let yaml = r#"
3756title: Login
3757id: login-rule
3758logsource:
3759    category: auth
3760detection:
3761    selection:
3762        EventType: login
3763    condition: selection
3764---
3765title: Many Logins
3766correlation:
3767    type: event_count
3768    rules:
3769        - login-rule
3770    group-by:
3771        - User
3772    timespan: 60s
3773    condition:
3774        gte: 2
3775level: high
3776"#;
3777        let collection = parse_sigma_yaml(yaml).unwrap();
3778        let config = CorrelationConfig {
3779            correlation_event_mode: CorrelationEventMode::Full,
3780            action_on_match: CorrelationAction::Reset,
3781            ..Default::default()
3782        };
3783        let mut engine = CorrelationEngine::new(config);
3784        engine.add_collection(&collection).unwrap();
3785
3786        // First round: 2 events -> fires
3787        for i in 0..2 {
3788            let v = json!({"EventType": "login", "User": "admin", "round": 1, "idx": i});
3789            let event = JsonEvent::borrow(&v);
3790            let result = engine.process_event_at(&event, 1000 + i);
3791            if i == 1 {
3792                assert_eq!(result.correlations.len(), 1);
3793                let events = result.correlations[0].events.as_ref().unwrap();
3794                assert_eq!(events.len(), 2);
3795            }
3796        }
3797
3798        // After reset, event buffer should be cleared.
3799        // Second round: need 2 more events to fire again
3800        let v = json!({"EventType": "login", "User": "admin", "round": 2, "idx": 0});
3801        let event = JsonEvent::borrow(&v);
3802        let result = engine.process_event_at(&event, 1010);
3803        assert!(
3804            result.correlations.is_empty(),
3805            "should not fire with only 1 event after reset"
3806        );
3807
3808        let v = json!({"EventType": "login", "User": "admin", "round": 2, "idx": 1});
3809        let event = JsonEvent::borrow(&v);
3810        let result = engine.process_event_at(&event, 1011);
3811        assert_eq!(result.correlations.len(), 1);
3812        let events = result.correlations[0].events.as_ref().unwrap();
3813        assert_eq!(events.len(), 2);
3814        // Should only have round 2 events
3815        assert_eq!(events[0]["round"], 2);
3816        assert_eq!(events[1]["round"], 2);
3817    }
3818
3819    #[test]
3820    fn test_correlation_events_with_set_include() {
3821        let yaml = r#"
3822title: Login
3823id: login-rule
3824logsource:
3825    category: auth
3826detection:
3827    selection:
3828        EventType: login
3829    condition: selection
3830---
3831title: Many Logins
3832correlation:
3833    type: event_count
3834    rules:
3835        - login-rule
3836    group-by:
3837        - User
3838    timespan: 60s
3839    condition:
3840        gte: 2
3841level: high
3842"#;
3843        let collection = parse_sigma_yaml(yaml).unwrap();
3844        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3845        engine.add_collection(&collection).unwrap();
3846
3847        // Enable via setter
3848        engine.set_correlation_event_mode(CorrelationEventMode::Full);
3849
3850        for i in 0..2 {
3851            let v = json!({"EventType": "login", "User": "admin"});
3852            let event = JsonEvent::borrow(&v);
3853            let result = engine.process_event_at(&event, 1000 + i);
3854            if i == 1 {
3855                assert_eq!(result.correlations.len(), 1);
3856                assert!(result.correlations[0].events.is_some());
3857                assert_eq!(result.correlations[0].events.as_ref().unwrap().len(), 2);
3858            }
3859        }
3860    }
3861
3862    #[test]
3863    fn test_correlation_events_eviction_syncs_with_window() {
3864        let yaml = r#"
3865title: Login
3866id: login-rule
3867logsource:
3868    category: auth
3869detection:
3870    selection:
3871        EventType: login
3872    condition: selection
3873---
3874title: Many Logins
3875correlation:
3876    type: event_count
3877    rules:
3878        - login-rule
3879    group-by:
3880        - User
3881    timespan: 10s
3882    condition:
3883        gte: 3
3884level: high
3885"#;
3886        let collection = parse_sigma_yaml(yaml).unwrap();
3887        let config = CorrelationConfig {
3888            correlation_event_mode: CorrelationEventMode::Full,
3889            max_correlation_events: 100,
3890            ..Default::default()
3891        };
3892        let mut engine = CorrelationEngine::new(config);
3893        engine.add_collection(&collection).unwrap();
3894
3895        // Push 2 events at ts=1000,1001 — within the 10s window
3896        for i in 0..2 {
3897            let v = json!({"EventType": "login", "User": "admin", "idx": i});
3898            let event = JsonEvent::borrow(&v);
3899            engine.process_event_at(&event, 1000 + i);
3900        }
3901
3902        // Push 1 more event at ts=1015 — the first 2 events are now outside the
3903        // 10s window (cutoff = 1015 - 10 = 1005)
3904        let v = json!({"EventType": "login", "User": "admin", "idx": 2});
3905        let event = JsonEvent::borrow(&v);
3906        let result = engine.process_event_at(&event, 1015);
3907        // Should NOT fire: only 1 event in window (the one at ts=1015)
3908        assert!(
3909            result.correlations.is_empty(),
3910            "should not fire — old events evicted"
3911        );
3912
3913        // Push 2 more to reach threshold
3914        for i in 3..5 {
3915            let v = json!({"EventType": "login", "User": "admin", "idx": i});
3916            let event = JsonEvent::borrow(&v);
3917            let result = engine.process_event_at(&event, 1016 + i - 3);
3918            if i == 4 {
3919                assert_eq!(result.correlations.len(), 1);
3920                let events = result.correlations[0].events.as_ref().unwrap();
3921                // Should have events from ts=1015,1016,1017 — not the old ones
3922                assert_eq!(events.len(), 3);
3923                for ev in events {
3924                    assert!(ev["idx"].as_i64().unwrap() >= 2);
3925                }
3926            }
3927        }
3928    }
3929
3930    #[test]
3931    fn test_event_buffer_monitoring() {
3932        let yaml = r#"
3933title: Login
3934id: login-rule
3935logsource:
3936    category: auth
3937detection:
3938    selection:
3939        EventType: login
3940    condition: selection
3941---
3942title: Many Logins
3943correlation:
3944    type: event_count
3945    rules:
3946        - login-rule
3947    group-by:
3948        - User
3949    timespan: 60s
3950    condition:
3951        gte: 100
3952level: high
3953"#;
3954        let collection = parse_sigma_yaml(yaml).unwrap();
3955        let config = CorrelationConfig {
3956            correlation_event_mode: CorrelationEventMode::Full,
3957            ..Default::default()
3958        };
3959        let mut engine = CorrelationEngine::new(config);
3960        engine.add_collection(&collection).unwrap();
3961
3962        assert_eq!(engine.event_buffer_count(), 0);
3963        assert_eq!(engine.event_buffer_bytes(), 0);
3964
3965        // Push some events
3966        for i in 0..5 {
3967            let v = json!({"EventType": "login", "User": "admin"});
3968            let event = JsonEvent::borrow(&v);
3969            engine.process_event_at(&event, 1000 + i);
3970        }
3971
3972        assert_eq!(engine.event_buffer_count(), 1); // one group key
3973        assert!(engine.event_buffer_bytes() > 0);
3974    }
3975
3976    #[test]
3977    fn test_correlation_refs_mode_basic() {
3978        let yaml = r#"
3979title: Login
3980id: login-rule
3981logsource:
3982    category: auth
3983detection:
3984    selection:
3985        EventType: login
3986    condition: selection
3987---
3988title: Many Logins
3989correlation:
3990    type: event_count
3991    rules:
3992        - login-rule
3993    group-by:
3994        - User
3995    timespan: 60s
3996    condition:
3997        gte: 3
3998level: high
3999"#;
4000        let collection = parse_sigma_yaml(yaml).unwrap();
4001        let config = CorrelationConfig {
4002            correlation_event_mode: CorrelationEventMode::Refs,
4003            max_correlation_events: 10,
4004            ..Default::default()
4005        };
4006        let mut engine = CorrelationEngine::new(config);
4007        engine.add_collection(&collection).unwrap();
4008
4009        let mut corr_result = None;
4010        for i in 0..3 {
4011            let v = json!({"EventType": "login", "User": "admin", "id": format!("evt-{i}"), "@timestamp": 1000 + i});
4012            let event = JsonEvent::borrow(&v);
4013            let result = engine.process_event_at(&event, 1000 + i);
4014            if !result.correlations.is_empty() {
4015                corr_result = Some(result.correlations[0].clone());
4016            }
4017        }
4018
4019        let result = corr_result.expect("correlation should have fired");
4020        // In refs mode: events should be None, event_refs should be Some
4021        assert!(
4022            result.events.is_none(),
4023            "Full events should not be included in refs mode"
4024        );
4025        let refs = result
4026            .event_refs
4027            .expect("event_refs should be present in refs mode");
4028        assert_eq!(refs.len(), 3);
4029        assert_eq!(refs[0].timestamp, 1000);
4030        assert_eq!(refs[0].id, Some("evt-0".to_string()));
4031        assert_eq!(refs[1].id, Some("evt-1".to_string()));
4032        assert_eq!(refs[2].id, Some("evt-2".to_string()));
4033    }
4034
4035    #[test]
4036    fn test_correlation_refs_mode_no_id_field() {
4037        let yaml = r#"
4038title: Login
4039id: login-rule
4040logsource:
4041    category: auth
4042detection:
4043    selection:
4044        EventType: login
4045    condition: selection
4046---
4047title: Many Logins
4048correlation:
4049    type: event_count
4050    rules:
4051        - login-rule
4052    group-by:
4053        - User
4054    timespan: 60s
4055    condition:
4056        gte: 2
4057level: high
4058"#;
4059        let collection = parse_sigma_yaml(yaml).unwrap();
4060        let config = CorrelationConfig {
4061            correlation_event_mode: CorrelationEventMode::Refs,
4062            ..Default::default()
4063        };
4064        let mut engine = CorrelationEngine::new(config);
4065        engine.add_collection(&collection).unwrap();
4066
4067        let mut corr_result = None;
4068        for i in 0..2 {
4069            let v = json!({"EventType": "login", "User": "admin"});
4070            let event = JsonEvent::borrow(&v);
4071            let result = engine.process_event_at(&event, 1000 + i);
4072            if !result.correlations.is_empty() {
4073                corr_result = Some(result.correlations[0].clone());
4074            }
4075        }
4076
4077        let result = corr_result.expect("correlation should have fired");
4078        let refs = result.event_refs.expect("event_refs should be present");
4079        // No ID field in events → id should be None
4080        for r in &refs {
4081            assert_eq!(r.id, None);
4082        }
4083    }
4084
4085    #[test]
4086    fn test_per_correlation_custom_attributes_from_yaml() {
4087        let yaml = r#"
4088title: Login
4089id: login-rule
4090logsource:
4091    category: auth
4092detection:
4093    selection:
4094        EventType: login
4095    condition: selection
4096---
4097title: Many Logins
4098custom_attributes:
4099    rsigma.correlation_event_mode: refs
4100    rsigma.max_correlation_events: "5"
4101correlation:
4102    type: event_count
4103    rules:
4104        - login-rule
4105    group-by:
4106        - User
4107    timespan: 60s
4108    condition:
4109        gte: 3
4110level: high
4111"#;
4112        let collection = parse_sigma_yaml(yaml).unwrap();
4113        // Engine mode is None (default), but per-correlation should override to Refs
4114        let config = CorrelationConfig::default();
4115        let mut engine = CorrelationEngine::new(config);
4116        engine.add_collection(&collection).unwrap();
4117
4118        let mut corr_result = None;
4119        for i in 0..3 {
4120            let v = json!({"EventType": "login", "User": "admin", "id": format!("e{i}")});
4121            let event = JsonEvent::borrow(&v);
4122            let result = engine.process_event_at(&event, 1000 + i);
4123            if !result.correlations.is_empty() {
4124                corr_result = Some(result.correlations[0].clone());
4125            }
4126        }
4127
4128        let result = corr_result.expect("correlation should fire with per-correlation refs mode");
4129        // Per-correlation override should enable refs mode even though engine default is None
4130        assert!(result.events.is_none());
4131        let refs = result
4132            .event_refs
4133            .expect("event_refs via per-correlation override");
4134        assert_eq!(refs.len(), 3);
4135        assert_eq!(refs[0].id, Some("e0".to_string()));
4136    }
4137
4138    #[test]
4139    fn test_per_correlation_custom_attr_suppress_and_action() {
4140        let yaml = r#"
4141title: Login
4142id: login-rule
4143logsource:
4144    category: auth
4145detection:
4146    selection:
4147        EventType: login
4148    condition: selection
4149---
4150title: Many Logins
4151custom_attributes:
4152    rsigma.suppress: 10s
4153    rsigma.action: reset
4154correlation:
4155    type: event_count
4156    rules:
4157        - login-rule
4158    group-by:
4159        - User
4160    timespan: 60s
4161    condition:
4162        gte: 2
4163level: high
4164"#;
4165        let collection = parse_sigma_yaml(yaml).unwrap();
4166        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
4167        engine.add_collection(&collection).unwrap();
4168
4169        // Verify the compiled correlation has per-rule overrides
4170        assert_eq!(engine.correlations[0].suppress_secs, Some(10));
4171        assert_eq!(
4172            engine.correlations[0].action,
4173            Some(CorrelationAction::Reset)
4174        );
4175    }
4176
4177    #[test]
4178    fn test_process_with_detections_matches_process_event_at() {
4179        let yaml = r#"
4180title: Login Failure
4181id: login-fail
4182logsource:
4183    category: auth
4184detection:
4185    selection:
4186        EventType: login_failure
4187    condition: selection
4188---
4189title: Brute Force
4190correlation:
4191    type: event_count
4192    rules:
4193        - login-fail
4194    group-by:
4195        - User
4196    timespan: 60s
4197    condition:
4198        gte: 3
4199level: high
4200"#;
4201        let collection = parse_sigma_yaml(yaml).unwrap();
4202
4203        // Run with process_event_at
4204        let mut engine1 = CorrelationEngine::new(CorrelationConfig::default());
4205        engine1.add_collection(&collection).unwrap();
4206
4207        let events: Vec<serde_json::Value> = (0..5)
4208            .map(|i| json!({"EventType": "login_failure", "User": "admin", "@timestamp": format!("2025-01-01T00:00:0{}Z", i + 1)}))
4209            .collect();
4210
4211        let results1: Vec<ProcessResult> = events
4212            .iter()
4213            .enumerate()
4214            .map(|(i, v)| {
4215                let e = JsonEvent::borrow(v);
4216                engine1.process_event_at(&e, 1000 + i as i64)
4217            })
4218            .collect();
4219
4220        // Run with evaluate + process_with_detections
4221        let mut engine2 = CorrelationEngine::new(CorrelationConfig::default());
4222        engine2.add_collection(&collection).unwrap();
4223
4224        let results2: Vec<ProcessResult> = events
4225            .iter()
4226            .enumerate()
4227            .map(|(i, v)| {
4228                let e = JsonEvent::borrow(v);
4229                let detections = engine2.evaluate(&e);
4230                engine2.process_with_detections(&e, detections, 1000 + i as i64)
4231            })
4232            .collect();
4233
4234        // Same number of results
4235        assert_eq!(results1.len(), results2.len());
4236        for (r1, r2) in results1.iter().zip(results2.iter()) {
4237            assert_eq!(r1.detections.len(), r2.detections.len());
4238            assert_eq!(r1.correlations.len(), r2.correlations.len());
4239        }
4240    }
4241
4242    #[test]
4243    fn test_process_batch_matches_sequential() {
4244        let yaml = r#"
4245title: Login Failure
4246id: login-fail-batch
4247logsource:
4248    category: auth
4249detection:
4250    selection:
4251        EventType: login_failure
4252    condition: selection
4253---
4254title: Brute Force Batch
4255correlation:
4256    type: event_count
4257    rules:
4258        - login-fail-batch
4259    group-by:
4260        - User
4261    timespan: 60s
4262    condition:
4263        gte: 3
4264level: high
4265"#;
4266        let collection = parse_sigma_yaml(yaml).unwrap();
4267
4268        let event_values: Vec<serde_json::Value> = (0..5)
4269            .map(|i| json!({"EventType": "login_failure", "User": "admin", "@timestamp": format!("2025-01-01T00:00:0{}Z", i + 1)}))
4270            .collect();
4271
4272        // Sequential
4273        let mut engine1 = CorrelationEngine::new(CorrelationConfig::default());
4274        engine1.add_collection(&collection).unwrap();
4275        let sequential: Vec<ProcessResult> = event_values
4276            .iter()
4277            .enumerate()
4278            .map(|(i, v)| {
4279                let e = JsonEvent::borrow(v);
4280                engine1.process_event_at(&e, 1000 + i as i64)
4281            })
4282            .collect();
4283
4284        // Batch
4285        let mut engine2 = CorrelationEngine::new(CorrelationConfig::default());
4286        engine2.add_collection(&collection).unwrap();
4287        let events: Vec<JsonEvent> = event_values.iter().map(JsonEvent::borrow).collect();
4288        let refs: Vec<&JsonEvent> = events.iter().collect();
4289        let batch = engine2.process_batch(&refs);
4290
4291        assert_eq!(sequential.len(), batch.len());
4292        for (seq, bat) in sequential.iter().zip(batch.iter()) {
4293            assert_eq!(seq.detections.len(), bat.detections.len());
4294            assert_eq!(seq.correlations.len(), bat.correlations.len());
4295        }
4296    }
4297
4298    #[test]
4299    fn test_correlation_result_custom_attributes() {
4300        let yaml = r#"
4301title: Login
4302id: login-cra
4303logsource:
4304    category: auth
4305detection:
4306    selection:
4307        EventType: login
4308    condition: selection
4309level: low
4310---
4311title: Many Logins
4312my_custom_field: hello
4313priority: 9
4314nested:
4315    key: value
4316correlation:
4317    type: event_count
4318    rules:
4319        - login-cra
4320    group-by:
4321        - User
4322    timespan: 60s
4323    condition:
4324        gte: 2
4325level: high
4326"#;
4327        let collection = parse_sigma_yaml(yaml).unwrap();
4328        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
4329        engine.add_collection(&collection).unwrap();
4330
4331        let base_ts = 1000i64;
4332        for i in 0..2 {
4333            let v = json!({"EventType": "login", "User": "alice"});
4334            let event = JsonEvent::borrow(&v);
4335            let result = engine.process_event_at(&event, base_ts + i * 10);
4336
4337            if i == 1 {
4338                assert_eq!(result.correlations.len(), 1);
4339                let corr = &result.correlations[0];
4340                assert_eq!(corr.rule_title, "Many Logins");
4341                assert_eq!(
4342                    corr.custom_attributes.get("my_custom_field"),
4343                    Some(&serde_json::Value::String("hello".to_string()))
4344                );
4345                assert_eq!(
4346                    corr.custom_attributes.get("priority"),
4347                    Some(&serde_json::json!(9))
4348                );
4349                let nested = corr.custom_attributes.get("nested").unwrap();
4350                assert_eq!(nested.get("key"), Some(&serde_json::json!("value")));
4351
4352                assert!(!corr.custom_attributes.contains_key("title"));
4353                assert!(!corr.custom_attributes.contains_key("correlation"));
4354                assert!(!corr.custom_attributes.contains_key("level"));
4355            }
4356        }
4357    }
4358
4359    #[test]
4360    fn test_detection_result_custom_attributes() {
4361        let yaml = r#"
4362title: Login Detection
4363logsource:
4364    category: auth
4365detection:
4366    selection:
4367        EventType: login
4368    condition: selection
4369level: low
4370my_detection_tag: important
4371score: 42
4372"#;
4373        let collection = parse_sigma_yaml(yaml).unwrap();
4374        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
4375        engine.add_collection(&collection).unwrap();
4376
4377        let v = json!({"EventType": "login"});
4378        let event = JsonEvent::borrow(&v);
4379        let result = engine.process_event(&event);
4380
4381        assert_eq!(result.detections.len(), 1);
4382        let det = &result.detections[0];
4383        assert_eq!(
4384            det.custom_attributes.get("my_detection_tag"),
4385            Some(&serde_json::Value::String("important".to_string()))
4386        );
4387        assert_eq!(
4388            det.custom_attributes.get("score"),
4389            Some(&serde_json::json!(42))
4390        );
4391        assert!(!det.custom_attributes.contains_key("title"));
4392    }
4393}