Skip to main content

rsigma_eval/
correlation_engine.rs

1//! Stateful correlation engine with time-windowed aggregation.
2//!
3//! `CorrelationEngine` wraps the stateless `Engine` and adds support for
4//! Sigma correlation rules: `event_count`, `value_count`, `temporal`,
5//! `temporal_ordered`, `value_sum`, `value_avg`, `value_percentile`,
6//! and `value_median`.
7//!
8//! # Architecture
9//!
10//! 1. Events are first evaluated against detection rules (stateless)
11//! 2. Detection matches update correlation window state (stateful)
12//! 3. When a correlation condition is met, a `CorrelationResult` is emitted
13//! 4. Correlation results can chain into higher-level correlations
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use chrono::{DateTime, TimeZone, Utc};
19use serde::Serialize;
20
21use rsigma_parser::{CorrelationRule, CorrelationType, Level, SigmaCollection, SigmaRule};
22
23use crate::correlation::{
24    CompiledCorrelation, EventBuffer, EventRef, EventRefBuffer, GroupKey, WindowState,
25    compile_correlation,
26};
27use crate::engine::Engine;
28use crate::error::{EvalError, Result};
29use crate::event::{Event, EventValue};
30use crate::pipeline::{Pipeline, apply_pipelines, apply_pipelines_to_correlation};
31use crate::result::MatchResult;
32
33// =============================================================================
34// Configuration
35// =============================================================================
36
37/// What to do with window state after a correlation fires.
38///
39/// This is an engine-level default that can be overridden per-correlation
40/// via the `rsigma.action` custom attribute set in processing pipelines.
41#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize)]
42#[serde(rename_all = "snake_case")]
43pub enum CorrelationAction {
44    /// Keep window state as-is after firing (current / default behavior).
45    /// Subsequent events that still satisfy the condition will re-fire.
46    #[default]
47    Alert,
48    /// Clear the window state for the firing group key after emitting the alert.
49    /// The threshold must be met again from scratch before the next alert.
50    Reset,
51}
52
53impl std::str::FromStr for CorrelationAction {
54    type Err = String;
55    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
56        match s {
57            "alert" => Ok(CorrelationAction::Alert),
58            "reset" => Ok(CorrelationAction::Reset),
59            _ => Err(format!(
60                "Unknown correlation action: {s} (expected 'alert' or 'reset')"
61            )),
62        }
63    }
64}
65
66/// How to include events in correlation results.
67///
68/// Can be overridden per-correlation via the `rsigma.correlation_event_mode`
69/// custom attribute.
70#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize)]
71#[serde(rename_all = "snake_case")]
72pub enum CorrelationEventMode {
73    /// Don't include events (default). Zero memory overhead.
74    #[default]
75    None,
76    /// Include full event bodies, individually compressed with deflate.
77    /// Typical cost: 100–1000 bytes per event.
78    Full,
79    /// Include only event references (timestamp + optional ID).
80    /// Minimal memory: ~40 bytes per event.
81    Refs,
82}
83
84impl std::str::FromStr for CorrelationEventMode {
85    type Err = String;
86    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
87        match s.to_lowercase().as_str() {
88            "none" | "off" | "false" => Ok(CorrelationEventMode::None),
89            "full" | "true" => Ok(CorrelationEventMode::Full),
90            "refs" | "references" => Ok(CorrelationEventMode::Refs),
91            _ => Err(format!(
92                "Unknown correlation event mode: {s} (expected 'none', 'full', or 'refs')"
93            )),
94        }
95    }
96}
97
98impl std::fmt::Display for CorrelationEventMode {
99    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100        match self {
101            CorrelationEventMode::None => write!(f, "none"),
102            CorrelationEventMode::Full => write!(f, "full"),
103            CorrelationEventMode::Refs => write!(f, "refs"),
104        }
105    }
106}
107
/// Policy applied when none of the configured timestamp fields yields a
/// parseable timestamp.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum TimestampFallback {
    /// Substitute the current wall-clock time (`Utc::now()`); the default,
    /// suited to live streaming input.
    #[default]
    WallClock,
    /// Leave the event out of correlation entirely: detections are still
    /// reported, but no correlation state is updated. Preferable when
    /// replaying historical logs, where "now" has no meaning.
    Skip,
}
119
120/// Configuration for the correlation engine.
121///
122/// Provides engine-level defaults that mirror pySigma backend optional arguments.
123/// Per-correlation overrides can be set via `SetCustomAttribute` pipeline
124/// transformations using the `rsigma.*` attribute namespace.
125#[derive(Debug, Clone)]
126pub struct CorrelationConfig {
127    /// Field names to try for timestamp extraction, in order of priority.
128    ///
129    /// The engine will try each field until one yields a parseable timestamp.
130    /// If none succeed, the `timestamp_fallback` policy applies.
131    pub timestamp_fields: Vec<String>,
132
133    /// What to do when no timestamp can be extracted from an event.
134    ///
135    /// Default: `WallClock` (use `Utc::now()`).
136    pub timestamp_fallback: TimestampFallback,
137
138    /// Maximum number of state entries (across all correlations and groups)
139    /// before aggressive eviction is triggered. Prevents unbounded memory growth.
140    ///
141    /// Default: 100_000.
142    pub max_state_entries: usize,
143
144    /// Default suppression window in seconds.
145    ///
146    /// After a correlation fires for a `(correlation, group_key)`, suppress
147    /// re-alerts for this duration. `None` means no suppression (every
148    /// condition-satisfying event produces an alert).
149    ///
150    /// Can be overridden per-correlation via the `rsigma.suppress` custom attribute.
151    pub suppress: Option<u64>,
152
153    /// Default action to take after a correlation fires.
154    ///
155    /// Can be overridden per-correlation via the `rsigma.action` custom attribute.
156    pub action_on_match: CorrelationAction,
157
158    /// Whether to emit detection-level matches for rules that are only
159    /// referenced by correlations (where `generate: false`).
160    ///
161    /// Default: `true` (emit all detection matches).
162    /// Set to `false` to suppress detection output for correlation-only rules.
163    pub emit_detections: bool,
164
165    /// How to include contributing events in correlation results.
166    ///
167    /// - `None` (default): no event storage, zero overhead.
168    /// - `Full`: events are deflate-compressed and decompressed on output.
169    /// - `Refs`: only timestamps + event IDs are stored (minimal memory).
170    ///
171    /// Can be overridden per-correlation via `rsigma.correlation_event_mode`.
172    pub correlation_event_mode: CorrelationEventMode,
173
174    /// Maximum number of events to store per (correlation, group_key) window
175    /// when `correlation_event_mode` is not `None`.
176    ///
177    /// Bounds memory at: `max_correlation_events × cost_per_event × active_groups`.
178    /// Default: 10.
179    pub max_correlation_events: usize,
180}
181
182impl Default for CorrelationConfig {
183    fn default() -> Self {
184        CorrelationConfig {
185            timestamp_fields: vec![
186                "@timestamp".to_string(),
187                "timestamp".to_string(),
188                "EventTime".to_string(),
189                "TimeCreated".to_string(),
190                "eventTime".to_string(),
191            ],
192            timestamp_fallback: TimestampFallback::default(),
193            max_state_entries: 100_000,
194            suppress: None,
195            action_on_match: CorrelationAction::default(),
196            emit_detections: true,
197            correlation_event_mode: CorrelationEventMode::default(),
198            max_correlation_events: 10,
199        }
200    }
201}
202
203// =============================================================================
204// Result types
205// =============================================================================
206
207/// Combined result from processing a single event.
208#[derive(Debug, Clone, Serialize)]
209pub struct ProcessResult {
210    /// Detection rule matches (stateless, immediate).
211    pub detections: Vec<MatchResult>,
212    /// Correlation rule matches (stateful, accumulated).
213    pub correlations: Vec<CorrelationResult>,
214}
215
216/// The result of a correlation rule firing.
217#[derive(Debug, Clone, Serialize)]
218pub struct CorrelationResult {
219    /// Title of the correlation rule.
220    pub rule_title: String,
221    /// ID of the correlation rule (if present).
222    pub rule_id: Option<String>,
223    /// Severity level.
224    pub level: Option<Level>,
225    /// Tags from the correlation rule.
226    pub tags: Vec<String>,
227    /// Type of correlation.
228    pub correlation_type: CorrelationType,
229    /// Group-by field names and their values for this match.
230    pub group_key: Vec<(String, String)>,
231    /// The aggregated value that triggered the condition (count, sum, avg, etc.).
232    pub aggregated_value: f64,
233    /// The time window in seconds.
234    pub timespan_secs: u64,
235    /// Full event bodies, included when `correlation_event_mode` is `Full`.
236    ///
237    /// Contains up to `max_correlation_events` recently stored window events.
238    /// Events are decompressed from deflate storage on output.
239    #[serde(skip_serializing_if = "Option::is_none")]
240    pub events: Option<Vec<serde_json::Value>>,
241    /// Lightweight event references, included when `correlation_event_mode` is `Refs`.
242    ///
243    /// Contains up to `max_correlation_events` timestamp + optional ID pairs.
244    #[serde(skip_serializing_if = "Option::is_none")]
245    pub event_refs: Option<Vec<EventRef>>,
246    /// Custom attributes from the original Sigma correlation rule (merged
247    /// view of arbitrary top-level keys, the `custom_attributes:` block, and
248    /// any pipeline-applied overrides).
249    #[serde(skip_serializing_if = "HashMap::is_empty")]
250    pub custom_attributes: Arc<HashMap<String, serde_json::Value>>,
251}
252
253// =============================================================================
254// Correlation Engine
255// =============================================================================
256
257/// Stateful correlation engine.
258///
259/// Wraps the stateless `Engine` for detection rules and adds time-windowed
260/// correlation on top. Supports all 7 Sigma correlation types and chaining.
261pub struct CorrelationEngine {
262    /// Inner stateless detection engine.
263    engine: Engine,
264    /// Compiled correlation rules.
265    correlations: Vec<CompiledCorrelation>,
266    /// Maps rule ID/name -> indices into `correlations` that reference it.
267    /// This allows quick lookup: "which correlations care about rule X?"
268    rule_index: HashMap<String, Vec<usize>>,
269    /// Maps detection rule index -> (rule_id, rule_name) for reverse lookup.
270    /// Used to find which correlations a detection match triggers.
271    rule_ids: Vec<(Option<String>, Option<String>)>,
272    /// Per-(correlation_index, group_key) window state.
273    state: HashMap<(usize, GroupKey), WindowState>,
274    /// Last alert timestamp per (correlation_index, group_key) for suppression.
275    last_alert: HashMap<(usize, GroupKey), i64>,
276    /// Per-(correlation_index, group_key) compressed event buffer (`Full` mode).
277    event_buffers: HashMap<(usize, GroupKey), EventBuffer>,
278    /// Per-(correlation_index, group_key) event reference buffer (`Refs` mode).
279    event_ref_buffers: HashMap<(usize, GroupKey), EventRefBuffer>,
280    /// Set of detection rule IDs/names that are "correlation-only"
281    /// (referenced by correlations where `generate == false`).
282    /// Used to filter detection output when `config.emit_detections == false`.
283    correlation_only_rules: std::collections::HashSet<String>,
284    /// Configuration.
285    config: CorrelationConfig,
286    /// Processing pipelines applied to rules during add_rule.
287    pipelines: Vec<Pipeline>,
288}
289
290impl CorrelationEngine {
291    /// Create a new correlation engine with the given configuration.
292    pub fn new(config: CorrelationConfig) -> Self {
293        CorrelationEngine {
294            engine: Engine::new(),
295            correlations: Vec::new(),
296            rule_index: HashMap::new(),
297            rule_ids: Vec::new(),
298            state: HashMap::new(),
299            last_alert: HashMap::new(),
300            event_buffers: HashMap::new(),
301            event_ref_buffers: HashMap::new(),
302            correlation_only_rules: std::collections::HashSet::new(),
303            config,
304            pipelines: Vec::new(),
305        }
306    }
307
308    /// Add a pipeline to the engine.
309    ///
310    /// Pipelines are applied to rules during `add_rule` / `add_collection`.
311    pub fn add_pipeline(&mut self, pipeline: Pipeline) {
312        self.pipelines.push(pipeline);
313        self.pipelines.sort_by_key(|p| p.priority);
314    }
315
316    /// Set global `include_event` on the inner detection engine.
317    pub fn set_include_event(&mut self, include: bool) {
318        self.engine.set_include_event(include);
319    }
320
321    /// Set the global correlation event mode.
322    ///
323    /// - `None`: no event storage (default)
324    /// - `Full`: compressed event bodies
325    /// - `Refs`: lightweight timestamp + ID references
326    pub fn set_correlation_event_mode(&mut self, mode: CorrelationEventMode) {
327        self.config.correlation_event_mode = mode;
328    }
329
330    /// Set the maximum number of events to store per correlation window group.
331    /// Only meaningful when `correlation_event_mode` is not `None`.
332    pub fn set_max_correlation_events(&mut self, max: usize) {
333        self.config.max_correlation_events = max;
334    }
335
336    /// Add a single detection rule.
337    ///
338    /// If pipelines are set, the rule is cloned and transformed before compilation.
339    /// The inner engine receives the already-transformed rule directly (not through
340    /// its own pipeline, to avoid double transformation).
341    pub fn add_rule(&mut self, rule: &SigmaRule) -> Result<()> {
342        if self.pipelines.is_empty() {
343            self.apply_custom_attributes(&rule.custom_attributes);
344            self.rule_ids.push((rule.id.clone(), rule.name.clone()));
345            self.engine.add_rule(rule)?;
346        } else {
347            let mut transformed = rule.clone();
348            apply_pipelines(&self.pipelines, &mut transformed)?;
349            self.apply_custom_attributes(&transformed.custom_attributes);
350            self.rule_ids
351                .push((transformed.id.clone(), transformed.name.clone()));
352            // Use compile_rule + add_compiled_rule to bypass inner engine's pipelines
353            let compiled = crate::compiler::compile_rule(&transformed)?;
354            self.engine.add_compiled_rule(compiled);
355        }
356        Ok(())
357    }
358
359    /// Read `rsigma.*` custom attributes from a rule and apply them to the
360    /// engine configuration.  This allows pipelines to influence engine
361    /// behaviour via `SetCustomAttribute` transformations — the same pattern
362    /// used by pySigma backends (e.g. pySigma-backend-loki).
363    ///
364    /// Supported attributes:
365    /// - `rsigma.timestamp_field` — prepends a field name to the timestamp
366    ///   extraction priority list so the correlation engine can find the
367    ///   event timestamp in non-standard field names.
368    /// - `rsigma.suppress` — sets the default suppression window (e.g. `5m`).
369    ///   Only applied when the CLI did not already set `--suppress`.
370    /// - `rsigma.action` — sets the default post-fire action (`alert`/`reset`).
371    ///   Only applied when the CLI did not already set `--action`.
372    fn apply_custom_attributes(
373        &mut self,
374        attrs: &std::collections::HashMap<String, serde_yaml::Value>,
375    ) {
376        // rsigma.timestamp_field — prepend to priority list, skip duplicates
377        if let Some(field) = attrs.get("rsigma.timestamp_field").and_then(|v| v.as_str())
378            && !self.config.timestamp_fields.iter().any(|f| f == field)
379        {
380            self.config.timestamp_fields.insert(0, field.to_string());
381        }
382
383        // rsigma.suppress — only when CLI didn't already set one
384        if let Some(val) = attrs.get("rsigma.suppress").and_then(|v| v.as_str())
385            && self.config.suppress.is_none()
386            && let Ok(ts) = rsigma_parser::Timespan::parse(val)
387        {
388            self.config.suppress = Some(ts.seconds);
389        }
390
391        // rsigma.action — only when CLI left it at the default (Alert)
392        if let Some(val) = attrs.get("rsigma.action").and_then(|v| v.as_str())
393            && self.config.action_on_match == CorrelationAction::Alert
394            && let Ok(a) = val.parse::<CorrelationAction>()
395        {
396            self.config.action_on_match = a;
397        }
398    }
399
400    /// Add a single correlation rule.
401    pub fn add_correlation(&mut self, corr: &CorrelationRule) -> Result<()> {
402        let owned;
403        let effective = if self.pipelines.is_empty() {
404            corr
405        } else {
406            owned = {
407                let mut c = corr.clone();
408                apply_pipelines_to_correlation(&self.pipelines, &mut c)?;
409                c
410            };
411            &owned
412        };
413
414        // Apply engine-level custom attributes from the (possibly transformed)
415        // correlation rule (e.g. rsigma.timestamp_field).
416        self.apply_custom_attributes(&effective.custom_attributes);
417
418        let compiled = compile_correlation(effective)?;
419        let idx = self.correlations.len();
420
421        // Index by each referenced rule ID/name
422        for rule_ref in &compiled.rule_refs {
423            self.rule_index
424                .entry(rule_ref.clone())
425                .or_default()
426                .push(idx);
427        }
428
429        // Track correlation-only rules (generate == false is the default)
430        if !compiled.generate {
431            for rule_ref in &compiled.rule_refs {
432                self.correlation_only_rules.insert(rule_ref.clone());
433            }
434        }
435
436        self.correlations.push(compiled);
437        Ok(())
438    }
439
440    /// Add all rules and correlations from a parsed collection.
441    ///
442    /// Detection rules are added first (so they're available for correlation
443    /// references), then correlation rules.
444    pub fn add_collection(&mut self, collection: &SigmaCollection) -> Result<()> {
445        for rule in &collection.rules {
446            self.add_rule(rule)?;
447        }
448        // Apply filter rules to the inner engine's detection rules
449        for filter in &collection.filters {
450            self.engine.apply_filter(filter)?;
451        }
452        for corr in &collection.correlations {
453            self.add_correlation(corr)?;
454        }
455        self.validate_rule_refs()?;
456        self.detect_correlation_cycles()?;
457        Ok(())
458    }
459
460    /// Validate that every correlation's `rule_refs` resolve to at least one
461    /// known detection rule (by ID or name) or another correlation (by ID or name).
462    fn validate_rule_refs(&self) -> Result<()> {
463        let mut known: std::collections::HashSet<&str> = std::collections::HashSet::new();
464
465        for (id, name) in &self.rule_ids {
466            if let Some(id) = id {
467                known.insert(id.as_str());
468            }
469            if let Some(name) = name {
470                known.insert(name.as_str());
471            }
472        }
473        for corr in &self.correlations {
474            if let Some(ref id) = corr.id {
475                known.insert(id.as_str());
476            }
477            if let Some(ref name) = corr.name {
478                known.insert(name.as_str());
479            }
480        }
481
482        for corr in &self.correlations {
483            for rule_ref in &corr.rule_refs {
484                if !known.contains(rule_ref.as_str()) {
485                    return Err(EvalError::UnknownRuleRef(rule_ref.clone()));
486                }
487            }
488        }
489        Ok(())
490    }
491
492    /// Detect cycles in the correlation reference graph.
493    ///
494    /// Builds a directed graph where each correlation (identified by its id/name)
495    /// has edges to the correlations it references via `rule_refs`. Uses DFS with
496    /// a "gray/black" coloring scheme to detect back-edges (cycles).
497    ///
498    /// Returns `Err(EvalError::CorrelationCycle)` if a cycle is found.
499    fn detect_correlation_cycles(&self) -> Result<()> {
500        // Build a set of all correlation identifiers (id and/or name)
501        let mut corr_identifiers: HashMap<&str, usize> = HashMap::new();
502        for (idx, corr) in self.correlations.iter().enumerate() {
503            if let Some(ref id) = corr.id {
504                corr_identifiers.insert(id.as_str(), idx);
505            }
506            if let Some(ref name) = corr.name {
507                corr_identifiers.insert(name.as_str(), idx);
508            }
509        }
510
511        // Build adjacency list: corr index → set of corr indices it references
512        let mut adj: Vec<Vec<usize>> = vec![Vec::new(); self.correlations.len()];
513        for (idx, corr) in self.correlations.iter().enumerate() {
514            for rule_ref in &corr.rule_refs {
515                if let Some(&target_idx) = corr_identifiers.get(rule_ref.as_str()) {
516                    adj[idx].push(target_idx);
517                }
518            }
519        }
520
521        // DFS cycle detection with three states: white (unvisited), gray (in stack), black (done)
522        let mut state = vec![0u8; self.correlations.len()]; // 0=white, 1=gray, 2=black
523        let mut path: Vec<usize> = Vec::new();
524
525        for start in 0..self.correlations.len() {
526            if state[start] == 0
527                && let Some(cycle) = Self::dfs_find_cycle(start, &adj, &mut state, &mut path)
528            {
529                let names: Vec<String> = cycle
530                    .iter()
531                    .map(|&i| {
532                        self.correlations[i]
533                            .id
534                            .as_deref()
535                            .or(self.correlations[i].name.as_deref())
536                            .unwrap_or(&self.correlations[i].title)
537                            .to_string()
538                    })
539                    .collect();
540                return Err(crate::error::EvalError::CorrelationCycle(
541                    names.join(" -> "),
542                ));
543            }
544        }
545        Ok(())
546    }
547
548    /// DFS helper that returns the cycle path if a back-edge is found.
549    fn dfs_find_cycle(
550        node: usize,
551        adj: &[Vec<usize>],
552        state: &mut [u8],
553        path: &mut Vec<usize>,
554    ) -> Option<Vec<usize>> {
555        state[node] = 1; // gray
556        path.push(node);
557
558        for &next in &adj[node] {
559            if state[next] == 1 {
560                // Back-edge found — extract cycle from path
561                if let Some(pos) = path.iter().position(|&n| n == next) {
562                    let mut cycle = path[pos..].to_vec();
563                    cycle.push(next); // close the cycle
564                    return Some(cycle);
565                }
566            }
567            if state[next] == 0
568                && let Some(cycle) = Self::dfs_find_cycle(next, adj, state, path)
569            {
570                return Some(cycle);
571            }
572        }
573
574        path.pop();
575        state[node] = 2; // black
576        None
577    }
578
579    /// Process an event, extracting the timestamp from configured event fields.
580    ///
581    /// When no timestamp field is found, the `timestamp_fallback` policy applies:
582    /// - `WallClock`: use `Utc::now()` (good for real-time streaming)
583    /// - `Skip`: return detections only, skip correlation state updates
584    pub fn process_event(&mut self, event: &impl Event) -> ProcessResult {
585        let all_detections = self.engine.evaluate(event);
586
587        let ts = match self.extract_event_timestamp(event) {
588            Some(ts) => ts,
589            None => match self.config.timestamp_fallback {
590                TimestampFallback::WallClock => Utc::now().timestamp(),
591                TimestampFallback::Skip => {
592                    // Still run detection (stateless), but skip correlation
593                    let detections = self.filter_detections(all_detections);
594                    return ProcessResult {
595                        detections,
596                        correlations: Vec::new(),
597                    };
598                }
599            },
600        };
601        self.process_with_detections(event, all_detections, ts)
602    }
603
604    /// Process an event with an explicit Unix epoch timestamp (seconds).
605    ///
606    /// The timestamp is clamped to `[0, i64::MAX / 2]` to prevent overflow
607    /// when adding timespan durations internally.
608    pub fn process_event_at(&mut self, event: &impl Event, timestamp_secs: i64) -> ProcessResult {
609        let all_detections = self.engine.evaluate(event);
610        self.process_with_detections(event, all_detections, timestamp_secs)
611    }
612
613    /// Process an event with pre-computed detection results.
614    ///
615    /// Enables external parallelism: callers can run detection (via
616    /// [`evaluate`](Self::evaluate)) in parallel, then feed results here
617    /// sequentially for stateful correlation.
618    pub fn process_with_detections(
619        &mut self,
620        event: &impl Event,
621        all_detections: Vec<MatchResult>,
622        timestamp_secs: i64,
623    ) -> ProcessResult {
624        let timestamp_secs = timestamp_secs.clamp(0, i64::MAX / 2);
625
626        // Memory management — evict before adding new state to enforce limit
627        if self.state.len() >= self.config.max_state_entries {
628            self.evict_all(timestamp_secs);
629        }
630
631        // Feed detection matches into correlations
632        let mut correlations = Vec::new();
633        self.feed_detections(event, &all_detections, timestamp_secs, &mut correlations);
634
635        // Chain — correlation results may trigger higher-level correlations
636        self.chain_correlations(&correlations, timestamp_secs);
637
638        // Filter detections by generate flag
639        let detections = self.filter_detections(all_detections);
640
641        ProcessResult {
642            detections,
643            correlations,
644        }
645    }
646
647    /// Run stateless detection only (no correlation), delegating to the inner engine.
648    ///
649    /// Takes `&self` so it can be called concurrently from multiple threads
650    /// (e.g. via `rayon::par_iter`) while the mutable correlation phase runs
651    /// sequentially afterwards.
652    pub fn evaluate(&self, event: &impl Event) -> Vec<MatchResult> {
653        self.engine.evaluate(event)
654    }
655
656    /// Process a batch of events: parallel detection, then sequential correlation.
657    ///
658    /// When the `parallel` feature is enabled, the stateless detection phase runs
659    /// concurrently via rayon. Timestamp extraction also runs in the parallel
660    /// phase (it borrows `&self.config` immutably). After `collect()` releases the
661    /// immutable borrows, each event's pre-computed detections are fed into the
662    /// stateful correlation engine sequentially.
663    pub fn process_batch<E: Event + Sync>(&mut self, events: &[&E]) -> Vec<ProcessResult> {
664        // Borrow split: take immutable refs to fields needed for the parallel phase.
665        // These are released by collect() before the sequential &mut self phase.
666        let engine = &self.engine;
667        let ts_fields = &self.config.timestamp_fields;
668
669        let batch_results: Vec<(Vec<MatchResult>, Option<i64>)> = {
670            #[cfg(feature = "parallel")]
671            {
672                use rayon::prelude::*;
673                events
674                    .par_iter()
675                    .map(|e| {
676                        let detections = engine.evaluate(e);
677                        let ts = extract_event_ts(e, ts_fields);
678                        (detections, ts)
679                    })
680                    .collect()
681            }
682            #[cfg(not(feature = "parallel"))]
683            {
684                events
685                    .iter()
686                    .map(|e| {
687                        let detections = engine.evaluate(e);
688                        let ts = extract_event_ts(e, ts_fields);
689                        (detections, ts)
690                    })
691                    .collect()
692            }
693        };
694
695        // Sequential correlation phase
696        let mut results = Vec::with_capacity(events.len());
697        for ((detections, ts_opt), event) in batch_results.into_iter().zip(events) {
698            match ts_opt {
699                Some(ts) => {
700                    results.push(self.process_with_detections(event, detections, ts));
701                }
702                None => match self.config.timestamp_fallback {
703                    TimestampFallback::WallClock => {
704                        let ts = Utc::now().timestamp();
705                        results.push(self.process_with_detections(event, detections, ts));
706                    }
707                    TimestampFallback::Skip => {
708                        // Still return detection results, but skip correlation
709                        let detections = self.filter_detections(detections);
710                        results.push(ProcessResult {
711                            detections,
712                            correlations: Vec::new(),
713                        });
714                    }
715                },
716            }
717        }
718        results
719    }
720
721    /// Filter detections by the `generate` flag / `emit_detections` config.
722    ///
723    /// If `emit_detections` is false and some rules are correlation-only,
724    /// their detection output is suppressed.
725    fn filter_detections(&self, all_detections: Vec<MatchResult>) -> Vec<MatchResult> {
726        if !self.config.emit_detections && !self.correlation_only_rules.is_empty() {
727            all_detections
728                .into_iter()
729                .filter(|m| {
730                    let id_match = m
731                        .rule_id
732                        .as_ref()
733                        .is_some_and(|id| self.correlation_only_rules.contains(id));
734                    !id_match
735                })
736                .collect()
737        } else {
738            all_detections
739        }
740    }
741
742    /// Feed detection matches into correlation window states.
743    fn feed_detections(
744        &mut self,
745        event: &impl Event,
746        detections: &[MatchResult],
747        ts: i64,
748        out: &mut Vec<CorrelationResult>,
749    ) {
750        // Collect all (corr_idx, rule_id, rule_name) tuples upfront to avoid
751        // borrow conflicts between self.rule_ids and self.update_correlation.
752        let mut work: Vec<(usize, Option<String>, Option<String>)> = Vec::new();
753
754        for det in detections {
755            // Use the MatchResult's rule_id to find the original rule's ID/name.
756            // We also look up by rule_id in our rule_ids table for the name.
757            let (rule_id, rule_name) = self.find_rule_identity(det);
758
759            // Collect correlation indices that reference this rule
760            let mut corr_indices = Vec::new();
761            if let Some(ref id) = rule_id
762                && let Some(indices) = self.rule_index.get(id)
763            {
764                corr_indices.extend(indices);
765            }
766            if let Some(ref name) = rule_name
767                && let Some(indices) = self.rule_index.get(name)
768            {
769                corr_indices.extend(indices);
770            }
771
772            corr_indices.sort_unstable();
773            corr_indices.dedup();
774
775            for &corr_idx in &corr_indices {
776                work.push((corr_idx, rule_id.clone(), rule_name.clone()));
777            }
778        }
779
780        for (corr_idx, rule_id, rule_name) in work {
781            self.update_correlation(corr_idx, event, ts, &rule_id, &rule_name, out);
782        }
783    }
784
785    /// Find the (id, name) for a detection match by searching our rule_ids table.
786    fn find_rule_identity(&self, det: &MatchResult) -> (Option<String>, Option<String>) {
787        // First, try to find by matching rule_id in our table
788        if let Some(ref match_id) = det.rule_id {
789            for (id, name) in &self.rule_ids {
790                if id.as_deref() == Some(match_id.as_str()) {
791                    return (id.clone(), name.clone());
792                }
793            }
794        }
795        // Fall back to using just the MatchResult's rule_id
796        (det.rule_id.clone(), None)
797    }
798
799    /// Resolve the event mode for a given correlation.
800    fn resolve_event_mode(&self, corr_idx: usize) -> CorrelationEventMode {
801        let corr = &self.correlations[corr_idx];
802        corr.event_mode
803            .unwrap_or(self.config.correlation_event_mode)
804    }
805
806    /// Resolve the max events cap for a given correlation.
807    fn resolve_max_events(&self, corr_idx: usize) -> usize {
808        let corr = &self.correlations[corr_idx];
809        corr.max_events
810            .unwrap_or(self.config.max_correlation_events)
811    }
812
    /// Update a single correlation's state and check its condition.
    ///
    /// Steps, in order:
    /// 1. Resolve per-correlation settings (suppression, action on match,
    ///    event mode, buffer cap), falling back to the engine config.
    /// 2. Derive the group key from `event` and the correlation's `group_by`.
    /// 3. Get or create the `(corr_idx, group_key)` window state, evict entries
    ///    older than `ts - timespan`, and record this event according to the
    ///    correlation type.
    /// 4. Store the event (full payload or reference) in the matching buffer
    ///    when the event mode asks for it.
    /// 5. Evaluate the condition. If it fires and is not suppressed, push a
    ///    `CorrelationResult` to `out` and record the alert time. When the
    ///    action is `Reset`, also clear the window state and buffers.
    ///
    /// # Parameters
    /// - `corr_idx`: index into `self.correlations`.
    /// - `event`: the event whose detection matched. It supplies the group key,
    ///   the aggregated field value, and the buffered payload.
    /// - `ts`: event timestamp, in the same unit as `timespan_secs` (seconds).
    /// - `rule_id` / `rule_name`: identity of the matching detection rule, used
    ///   for group-key alias resolution and temporal tracking.
    /// - `out`: receives at most one `CorrelationResult` from this call.
    fn update_correlation(
        &mut self,
        corr_idx: usize,
        event: &impl Event,
        ts: i64,
        rule_id: &Option<String>,
        rule_name: &Option<String>,
        out: &mut Vec<CorrelationResult>,
    ) {
        // Borrow the correlation by reference — no cloning needed.  Rust allows
        // simultaneous &self.correlations and &mut self.state / &mut self.last_alert
        // because they are disjoint struct fields.
        let corr = &self.correlations[corr_idx];
        let corr_type = corr.correlation_type;
        let timespan = corr.timespan_secs;
        let level = corr.level;
        // Per-correlation overrides win; otherwise fall back to the engine config.
        let suppress_secs = corr.suppress_secs.or(self.config.suppress);
        let action = corr.action.unwrap_or(self.config.action_on_match);
        let event_mode = self.resolve_event_mode(corr_idx);
        let max_events = self.resolve_max_events(corr_idx);

        // Determine the rule_ref strings for alias resolution and temporal tracking.
        let mut ref_strs: Vec<&str> = Vec::new();
        if let Some(id) = rule_id.as_deref() {
            ref_strs.push(id);
        }
        if let Some(name) = rule_name.as_deref() {
            ref_strs.push(name);
        }
        // Temporal tracking uses the id when present, else the name, else "".
        let rule_ref = rule_id.as_deref().or(rule_name.as_deref()).unwrap_or("");

        // Extract group key
        let group_key = GroupKey::extract(event, &corr.group_by, &ref_strs);

        // Get or create window state
        let state_key = (corr_idx, group_key.clone());
        let state = self
            .state
            .entry(state_key.clone())
            .or_insert_with(|| WindowState::new_for(corr_type));

        // Evict expired entries
        let cutoff = ts - timespan as i64;
        state.evict(cutoff);

        // Push the new event into the state
        match corr_type {
            CorrelationType::EventCount => {
                state.push_event_count(ts);
            }
            CorrelationType::ValueCount => {
                // Only the first configured field is counted. An event with no
                // countable value there leaves the state unchanged.
                if let Some(ref fields) = corr.condition.field
                    && let Some(field_name) = fields.first()
                    && let Some(val) = event.get_field(field_name)
                    && let Some(s) = value_to_string_for_count(&val)
                {
                    state.push_value_count(ts, s);
                }
            }
            CorrelationType::Temporal | CorrelationType::TemporalOrdered => {
                state.push_temporal(ts, rule_ref);
            }
            CorrelationType::ValueSum
            | CorrelationType::ValueAvg
            | CorrelationType::ValuePercentile
            | CorrelationType::ValueMedian => {
                // As with ValueCount, only the first field is used. Missing or
                // non-numeric values are ignored.
                if let Some(ref fields) = corr.condition.field
                    && let Some(field_name) = fields.first()
                    && let Some(val) = event.get_field(field_name)
                    && let Some(n) = value_to_f64_ev(&val)
                {
                    state.push_numeric(ts, n);
                }
            }
        }

        // Push event into buffer based on event mode.
        // NOTE(review): this runs even when a value-based branch above skipped
        // the event, so buffers may hold events that did not contribute to the
        // aggregate.
        match event_mode {
            CorrelationEventMode::Full => {
                let buf = self
                    .event_buffers
                    .entry(state_key.clone())
                    .or_insert_with(|| EventBuffer::new(max_events));
                buf.evict(cutoff);
                let json = event.to_json();
                buf.push(ts, &json);
            }
            CorrelationEventMode::Refs => {
                let buf = self
                    .event_ref_buffers
                    .entry(state_key.clone())
                    .or_insert_with(|| EventRefBuffer::new(max_events));
                buf.evict(cutoff);
                let json = event.to_json();
                buf.push(ts, &json);
            }
            CorrelationEventMode::None => {}
        }

        // Check condition — after this, `state` is no longer used (NLL drops the borrow).
        let fired = state.check_condition(
            &corr.condition,
            corr_type,
            &corr.rule_refs,
            corr.extended_expr.as_ref(),
        );

        if let Some(agg_value) = fired {
            let alert_key = (corr_idx, group_key.clone());

            // Suppression check: skip if we've already alerted within the suppress window.
            // An event timestamped before the last alert gives a negative
            // difference, so it is suppressed as well.
            let suppressed = if let Some(suppress) = suppress_secs {
                if let Some(&last_ts) = self.last_alert.get(&alert_key) {
                    (ts - last_ts) < suppress as i64
                } else {
                    false
                }
            } else {
                false
            };

            if !suppressed {
                // Retrieve stored events / refs based on mode
                let (events, event_refs) = match event_mode {
                    CorrelationEventMode::Full => {
                        let stored = self
                            .event_buffers
                            .get(&alert_key)
                            .map(|buf| buf.decompress_all())
                            .unwrap_or_default();
                        (Some(stored), None)
                    }
                    CorrelationEventMode::Refs => {
                        let stored = self
                            .event_ref_buffers
                            .get(&alert_key)
                            .map(|buf| buf.refs())
                            .unwrap_or_default();
                        (None, Some(stored))
                    }
                    CorrelationEventMode::None => (None, None),
                };

                // Only clone title/id/tags when we actually produce output
                let corr = &self.correlations[corr_idx];
                let result = CorrelationResult {
                    rule_title: corr.title.clone(),
                    rule_id: corr.id.clone(),
                    level,
                    tags: corr.tags.clone(),
                    correlation_type: corr_type,
                    group_key: group_key.to_pairs(&corr.group_by),
                    aggregated_value: agg_value,
                    timespan_secs: timespan,
                    events,
                    event_refs,
                    custom_attributes: corr.custom_attributes.clone(),
                };
                out.push(result);

                // Record alert time for suppression
                self.last_alert.insert(alert_key.clone(), ts);

                // Action on match.
                // `Reset` empties this group's window state and both buffers.
                // The map entries are kept, just cleared.
                if action == CorrelationAction::Reset {
                    if let Some(state) = self.state.get_mut(&alert_key) {
                        state.clear();
                    }
                    if let Some(buf) = self.event_buffers.get_mut(&alert_key) {
                        buf.clear();
                    }
                    if let Some(buf) = self.event_ref_buffers.get_mut(&alert_key) {
                        buf.clear();
                    }
                }
            }
        }
    }
992
    /// Propagate correlation results to higher-level correlations (chaining).
    ///
    /// Each fired result whose `rule_id` is `Some` is looked up in `rule_index`.
    /// Every correlation found there updates its window for the group key that
    /// `GroupKey::from_pairs` rebuilds from the result's group-key pairs, and
    /// re-checks its condition, at time `ts`. Results produced at one level feed
    /// the next, for at most 10 levels (`MAX_CHAIN_DEPTH`) so that reference
    /// cycles cannot loop forever. A warning is logged if results are still
    /// pending when the limit is hit.
    ///
    /// NOTE(review): results without a `rule_id` are never propagated. A parent
    /// correlation referenced only by name therefore does not chain unless
    /// `rule_index` also maps its id — confirm how the index is built.
    ///
    /// NOTE(review): this returns `()`. Firings produced here only drive the
    /// next chaining level and are not reported to the caller. They also skip
    /// the suppression, `last_alert` and `Reset` handling that
    /// `update_correlation` applies.
    fn chain_correlations(&mut self, fired: &[CorrelationResult], ts: i64) {
        const MAX_CHAIN_DEPTH: usize = 10;
        let mut pending: Vec<CorrelationResult> = fired.to_vec();
        let mut depth = 0;

        while !pending.is_empty() && depth < MAX_CHAIN_DEPTH {
            depth += 1;

            // Collect work items: (corr_idx, group_key_pairs, fired_ref)
            #[allow(clippy::type_complexity)]
            let mut work: Vec<(usize, Vec<(String, String)>, String)> = Vec::new();
            for result in &pending {
                if let Some(ref id) = result.rule_id
                    && let Some(indices) = self.rule_index.get(id)
                {
                    // Always the id: the `if let` above guarantees `rule_id`
                    // is `Some`, so the `rule_title` fallback is never taken.
                    let fired_ref = result
                        .rule_id
                        .as_deref()
                        .unwrap_or(&result.rule_title)
                        .to_string();
                    for &corr_idx in indices {
                        work.push((corr_idx, result.group_key.clone(), fired_ref.clone()));
                    }
                }
            }

            let mut next_pending = Vec::new();
            for (corr_idx, group_key_pairs, fired_ref) in work {
                let corr = &self.correlations[corr_idx];
                let corr_type = corr.correlation_type;
                let timespan = corr.timespan_secs;
                let level = corr.level;

                let group_key = GroupKey::from_pairs(&group_key_pairs, &corr.group_by);
                let state_key = (corr_idx, group_key.clone());
                let state = self
                    .state
                    .entry(state_key)
                    .or_insert_with(|| WindowState::new_for(corr_type));

                let cutoff = ts - timespan as i64;
                state.evict(cutoff);

                match corr_type {
                    CorrelationType::EventCount => {
                        state.push_event_count(ts);
                    }
                    CorrelationType::Temporal | CorrelationType::TemporalOrdered => {
                        state.push_temporal(ts, &fired_ref);
                    }
                    // A chained result has no event field to aggregate, so the
                    // remaining types are fed a plain count.
                    // NOTE(review): confirm `push_event_count` is meaningful for
                    // the window-state variants of the value-based types.
                    _ => {
                        state.push_event_count(ts);
                    }
                }

                let fired = state.check_condition(
                    &corr.condition,
                    corr_type,
                    &corr.rule_refs,
                    corr.extended_expr.as_ref(),
                );

                if let Some(agg_value) = fired {
                    let corr = &self.correlations[corr_idx];
                    next_pending.push(CorrelationResult {
                        rule_title: corr.title.clone(),
                        rule_id: corr.id.clone(),
                        level,
                        tags: corr.tags.clone(),
                        correlation_type: corr_type,
                        group_key: group_key.to_pairs(&corr.group_by),
                        aggregated_value: agg_value,
                        timespan_secs: timespan,
                        // Chained correlations don't include events (they aggregate
                        // over correlation results, not raw events)
                        events: None,
                        event_refs: None,
                        custom_attributes: corr.custom_attributes.clone(),
                    });
                }
            }

            // The results of this level become the input of the next level.
            pending = next_pending;
        }

        if !pending.is_empty() {
            log::warn!(
                "Correlation chain depth limit reached ({MAX_CHAIN_DEPTH}); \
                 {} pending result(s) were not propagated further. \
                 This may indicate a cycle in correlation references.",
                pending.len()
            );
        }
    }
1091
1092    // =========================================================================
1093    // Timestamp extraction
1094    // =========================================================================
1095
1096    /// Extract a Unix epoch timestamp (seconds) from an event.
1097    ///
1098    /// Tries each configured timestamp field in order. Supports:
1099    /// - Numeric values (epoch seconds, or epoch millis if > 1e12)
1100    /// - ISO 8601 strings (e.g., "2024-07-10T12:30:00Z")
1101    ///
1102    /// Returns `None` if no field yields a valid timestamp.
1103    fn extract_event_timestamp(&self, event: &impl Event) -> Option<i64> {
1104        for field_name in &self.config.timestamp_fields {
1105            if let Some(val) = event.get_field(field_name)
1106                && let Some(ts) = parse_timestamp_value(&val)
1107            {
1108                return Some(ts);
1109            }
1110        }
1111        None
1112    }
1113
1114    // =========================================================================
1115    // State management
1116    // =========================================================================
1117
1118    /// Manually evict all expired state entries.
1119    pub fn evict_expired(&mut self, now_secs: i64) {
1120        self.evict_all(now_secs);
1121    }
1122
1123    /// Evict expired entries and remove empty states.
1124    fn evict_all(&mut self, now_secs: i64) {
1125        // Phase 1: Time-based eviction — remove entries outside their correlation window
1126        let timespans: Vec<u64> = self.correlations.iter().map(|c| c.timespan_secs).collect();
1127
1128        self.state.retain(|&(corr_idx, _), state| {
1129            if corr_idx < timespans.len() {
1130                let cutoff = now_secs - timespans[corr_idx] as i64;
1131                state.evict(cutoff);
1132            }
1133            !state.is_empty()
1134        });
1135
1136        // Evict event buffers in sync with window state
1137        self.event_buffers.retain(|&(corr_idx, _), buf| {
1138            if corr_idx < timespans.len() {
1139                let cutoff = now_secs - timespans[corr_idx] as i64;
1140                buf.evict(cutoff);
1141            }
1142            !buf.is_empty()
1143        });
1144        self.event_ref_buffers.retain(|&(corr_idx, _), buf| {
1145            if corr_idx < timespans.len() {
1146                let cutoff = now_secs - timespans[corr_idx] as i64;
1147                buf.evict(cutoff);
1148            }
1149            !buf.is_empty()
1150        });
1151
1152        // Phase 2: Hard cap — if still over limit after time-based eviction (e.g.
1153        // high-cardinality traffic with long windows), drop the stalest entries
1154        // until we're at 90% capacity to avoid evicting on every single event.
1155        if self.state.len() >= self.config.max_state_entries {
1156            let target = self.config.max_state_entries * 9 / 10;
1157            let excess = self.state.len() - target;
1158
1159            // Collect keys with their latest timestamp, sort by oldest first
1160            let mut by_staleness: Vec<_> = self
1161                .state
1162                .iter()
1163                .map(|(k, v)| (k.clone(), v.latest_timestamp().unwrap_or(i64::MIN)))
1164                .collect();
1165            by_staleness.sort_unstable_by_key(|&(_, ts)| ts);
1166
1167            // Drop the oldest entries (and their associated event buffers)
1168            for (key, _) in by_staleness.into_iter().take(excess) {
1169                self.state.remove(&key);
1170                self.last_alert.remove(&key);
1171                self.event_buffers.remove(&key);
1172                self.event_ref_buffers.remove(&key);
1173            }
1174        }
1175
1176        // Phase 3: Evict stale last_alert entries — remove if the suppress window
1177        // has passed or if the corresponding window state no longer exists.
1178        self.last_alert.retain(|key, &mut alert_ts| {
1179            let suppress = if key.0 < self.correlations.len() {
1180                self.correlations[key.0]
1181                    .suppress_secs
1182                    .or(self.config.suppress)
1183                    .unwrap_or(0)
1184            } else {
1185                0
1186            };
1187            (now_secs - alert_ts) < suppress as i64
1188        });
1189    }
1190
1191    /// Number of active state entries (for monitoring).
1192    pub fn state_count(&self) -> usize {
1193        self.state.len()
1194    }
1195
1196    /// Number of detection rules loaded.
1197    pub fn detection_rule_count(&self) -> usize {
1198        self.engine.rule_count()
1199    }
1200
1201    /// Number of correlation rules loaded.
1202    pub fn correlation_rule_count(&self) -> usize {
1203        self.correlations.len()
1204    }
1205
1206    /// Number of active event buffers (for monitoring).
1207    pub fn event_buffer_count(&self) -> usize {
1208        self.event_buffers.len()
1209    }
1210
1211    /// Total compressed bytes across all event buffers (for monitoring).
1212    pub fn event_buffer_bytes(&self) -> usize {
1213        self.event_buffers
1214            .values()
1215            .map(|b| b.compressed_bytes())
1216            .sum()
1217    }
1218
1219    /// Number of active event ref buffers — `Refs` mode (for monitoring).
1220    pub fn event_ref_buffer_count(&self) -> usize {
1221        self.event_ref_buffers.len()
1222    }
1223
1224    /// Access the inner stateless engine.
1225    pub fn engine(&self) -> &Engine {
1226        &self.engine
1227    }
1228
1229    /// Export all mutable correlation state as a serializable snapshot.
1230    ///
1231    /// The snapshot uses stable correlation identifiers (id > name > title)
1232    /// instead of internal indices, so it survives rule reloads as long as
1233    /// the correlation rules keep the same identifiers.
1234    pub fn export_state(&self) -> CorrelationSnapshot {
1235        let mut windows: HashMap<String, Vec<(GroupKey, WindowState)>> = HashMap::new();
1236        for ((idx, gk), ws) in &self.state {
1237            let corr_id = self.correlation_stable_id(*idx);
1238            windows
1239                .entry(corr_id)
1240                .or_default()
1241                .push((gk.clone(), ws.clone()));
1242        }
1243
1244        let mut last_alert: HashMap<String, Vec<(GroupKey, i64)>> = HashMap::new();
1245        for ((idx, gk), ts) in &self.last_alert {
1246            let corr_id = self.correlation_stable_id(*idx);
1247            last_alert
1248                .entry(corr_id)
1249                .or_default()
1250                .push((gk.clone(), *ts));
1251        }
1252
1253        let mut event_buffers: HashMap<String, Vec<(GroupKey, EventBuffer)>> = HashMap::new();
1254        for ((idx, gk), buf) in &self.event_buffers {
1255            let corr_id = self.correlation_stable_id(*idx);
1256            event_buffers
1257                .entry(corr_id)
1258                .or_default()
1259                .push((gk.clone(), buf.clone()));
1260        }
1261
1262        let mut event_ref_buffers: HashMap<String, Vec<(GroupKey, EventRefBuffer)>> =
1263            HashMap::new();
1264        for ((idx, gk), buf) in &self.event_ref_buffers {
1265            let corr_id = self.correlation_stable_id(*idx);
1266            event_ref_buffers
1267                .entry(corr_id)
1268                .or_default()
1269                .push((gk.clone(), buf.clone()));
1270        }
1271
1272        CorrelationSnapshot {
1273            version: SNAPSHOT_VERSION,
1274            windows,
1275            last_alert,
1276            event_buffers,
1277            event_ref_buffers,
1278        }
1279    }
1280
1281    /// Import previously exported state, mapping stable identifiers back to
1282    /// current correlation indices. Entries whose identifiers no longer match
1283    /// any loaded correlation are silently dropped.
1284    ///
1285    /// Returns `false` (and imports nothing) if the snapshot version is
1286    /// incompatible with the current schema.
1287    pub fn import_state(&mut self, snapshot: CorrelationSnapshot) -> bool {
1288        if snapshot.version != SNAPSHOT_VERSION {
1289            return false;
1290        }
1291        let id_to_idx = self.build_id_to_index_map();
1292
1293        for (corr_id, groups) in snapshot.windows {
1294            if let Some(&idx) = id_to_idx.get(&corr_id) {
1295                for (gk, ws) in groups {
1296                    self.state.insert((idx, gk), ws);
1297                }
1298            }
1299        }
1300
1301        for (corr_id, groups) in snapshot.last_alert {
1302            if let Some(&idx) = id_to_idx.get(&corr_id) {
1303                for (gk, ts) in groups {
1304                    self.last_alert.insert((idx, gk), ts);
1305                }
1306            }
1307        }
1308
1309        for (corr_id, groups) in snapshot.event_buffers {
1310            if let Some(&idx) = id_to_idx.get(&corr_id) {
1311                for (gk, buf) in groups {
1312                    self.event_buffers.insert((idx, gk), buf);
1313                }
1314            }
1315        }
1316
1317        for (corr_id, groups) in snapshot.event_ref_buffers {
1318            if let Some(&idx) = id_to_idx.get(&corr_id) {
1319                for (gk, buf) in groups {
1320                    self.event_ref_buffers.insert((idx, gk), buf);
1321                }
1322            }
1323        }
1324
1325        true
1326    }
1327
1328    /// Stable identifier for a correlation rule: prefers id, then name, then title.
1329    fn correlation_stable_id(&self, idx: usize) -> String {
1330        let corr = &self.correlations[idx];
1331        corr.id
1332            .clone()
1333            .or_else(|| corr.name.clone())
1334            .unwrap_or_else(|| corr.title.clone())
1335    }
1336
1337    /// Build a reverse map from stable id → current correlation index.
1338    fn build_id_to_index_map(&self) -> HashMap<String, usize> {
1339        self.correlations
1340            .iter()
1341            .enumerate()
1342            .map(|(idx, _)| (self.correlation_stable_id(idx), idx))
1343            .collect()
1344    }
1345}
1346
/// Current snapshot schema version. Bump when the serialized format changes.
///
/// `CorrelationEngine::import_state` refuses any snapshot whose `version`
/// differs from this value.
const SNAPSHOT_VERSION: u32 = 1;
1349
/// Serializable snapshot of all mutable correlation state.
///
/// Uses stable string identifiers (correlation id/name/title) as keys so the
/// snapshot can be restored after a rule reload, even if internal indices change.
/// Inner maps use `Vec<(GroupKey, T)>` instead of `HashMap<GroupKey, T>` because
/// `GroupKey` cannot be used as a JSON object key.
#[derive(Debug, Clone, Serialize, serde::Deserialize)]
pub struct CorrelationSnapshot {
    /// Schema version — used to detect incompatible snapshots on load.
    /// When the field is missing from the serialized data, it is filled in by
    /// `default_snapshot_version`.
    #[serde(default = "default_snapshot_version")]
    pub version: u32,
    /// Per-correlation, per-group window state.
    pub windows: HashMap<String, Vec<(GroupKey, WindowState)>>,
    /// Per-correlation, per-group last alert timestamp (for suppression).
    pub last_alert: HashMap<String, Vec<(GroupKey, i64)>>,
    /// Per-correlation, per-group compressed event buffers.
    pub event_buffers: HashMap<String, Vec<(GroupKey, EventBuffer)>>,
    /// Per-correlation, per-group event reference buffers.
    pub event_ref_buffers: HashMap<String, Vec<(GroupKey, EventRefBuffer)>>,
}
1370
/// Serde default for `CorrelationSnapshot::version`.
///
/// A snapshot without a `version` field is read as schema version 1. This is a
/// fixed literal on purpose, not `SNAPSHOT_VERSION`. If it tracked the current
/// version, bumping that constant would make unversioned snapshots pass the
/// version check in `import_state` as if they used the new schema.
fn default_snapshot_version() -> u32 {
    const UNVERSIONED_SCHEMA: u32 = 1;
    UNVERSIONED_SCHEMA
}
1374
1375impl Default for CorrelationEngine {
1376    fn default() -> Self {
1377        Self::new(CorrelationConfig::default())
1378    }
1379}
1380
1381// =============================================================================
1382// Timestamp parsing helpers
1383// =============================================================================
1384
1385/// Extract a timestamp from an event using the given field names.
1386///
1387/// Standalone version of `CorrelationEngine::extract_event_timestamp` for use
1388/// in contexts where borrowing `&self` is not possible (e.g. rayon closures).
1389fn extract_event_ts(event: &impl Event, timestamp_fields: &[String]) -> Option<i64> {
1390    for field_name in timestamp_fields {
1391        if let Some(val) = event.get_field(field_name)
1392            && let Some(ts) = parse_timestamp_value(&val)
1393        {
1394            return Some(ts);
1395        }
1396    }
1397    None
1398}
1399
1400/// Parse an [`EventValue`] as a Unix epoch timestamp in seconds.
1401fn parse_timestamp_value(val: &EventValue) -> Option<i64> {
1402    match val {
1403        EventValue::Int(i) => Some(normalize_epoch(*i)),
1404        EventValue::Float(f) => Some(normalize_epoch(*f as i64)),
1405        EventValue::Str(s) => parse_timestamp_string(s),
1406        _ => None,
1407    }
1408}
1409
/// Normalize an epoch value to seconds.
///
/// Read as seconds, a value above `1_000_000_000_000` (1e12) would be a date
/// after the year 33658. Such a value is therefore assumed to be in a finer
/// unit and is divided by 1000 until it is at or below that threshold. This
/// maps epoch milliseconds, microseconds and nanoseconds back to seconds; the
/// earlier single division only handled milliseconds. Values at or below the
/// threshold, including negative (pre-1970) values, are returned unchanged.
fn normalize_epoch(mut v: i64) -> i64 {
    const SECONDS_CEILING: i64 = 1_000_000_000_000;
    // At most three iterations, even for i64::MAX.
    while v > SECONDS_CEILING {
        v /= 1000;
    }
    v
}
1415
1416/// Parse a timestamp string. Tries ISO 8601 with timezone, then without.
1417fn parse_timestamp_string(s: &str) -> Option<i64> {
1418    // Try RFC 3339 / ISO 8601 with timezone
1419    if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
1420        return Some(dt.timestamp());
1421    }
1422
1423    // Try ISO 8601 without timezone (assume UTC)
1424    // Common formats: "2024-07-10T12:30:00", "2024-07-10 12:30:00"
1425    if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S") {
1426        return Some(Utc.from_utc_datetime(&naive).timestamp());
1427    }
1428    if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") {
1429        return Some(Utc.from_utc_datetime(&naive).timestamp());
1430    }
1431
1432    // Try with fractional seconds
1433    if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S%.f") {
1434        return Some(Utc.from_utc_datetime(&naive).timestamp());
1435    }
1436    if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f") {
1437        return Some(Utc.from_utc_datetime(&naive).timestamp());
1438    }
1439
1440    None
1441}
1442
1443/// Convert an [`EventValue`] to a string for value_count purposes.
1444fn value_to_string_for_count(v: &EventValue) -> Option<String> {
1445    match v {
1446        EventValue::Str(s) => Some(s.to_string()),
1447        EventValue::Int(n) => Some(n.to_string()),
1448        EventValue::Float(f) => Some(f.to_string()),
1449        EventValue::Bool(b) => Some(b.to_string()),
1450        EventValue::Null => Some("null".to_string()),
1451        _ => None,
1452    }
1453}
1454
1455/// Convert an [`EventValue`] to f64 for numeric aggregation.
1456fn value_to_f64_ev(v: &EventValue) -> Option<f64> {
1457    v.as_f64()
1458}
1459
1460// =============================================================================
1461// Tests
1462// =============================================================================
1463
1464#[cfg(test)]
1465mod tests {
1466    use super::*;
1467    use crate::event::JsonEvent;
1468    use rsigma_parser::parse_sigma_yaml;
1469    use serde_json::json;
1470
1471    // =========================================================================
1472    // Timestamp parsing
1473    // =========================================================================
1474
    /// An integer at or below the 1e12 threshold is returned unchanged as epoch seconds.
    #[test]
    fn test_parse_timestamp_epoch_secs() {
        let val = EventValue::Int(1720612200);
        assert_eq!(parse_timestamp_value(&val), Some(1720612200));
    }
1480
    /// An integer above 1e12 is treated as epoch milliseconds and converted to seconds.
    #[test]
    fn test_parse_timestamp_epoch_millis() {
        let val = EventValue::Int(1720612200000);
        assert_eq!(parse_timestamp_value(&val), Some(1720612200));
    }
1486
    /// An RFC 3339 string with a `Z` offset parses to the matching epoch seconds.
    #[test]
    fn test_parse_timestamp_rfc3339() {
        let val = EventValue::Str(std::borrow::Cow::Borrowed("2024-07-10T12:30:00Z"));
        let ts = parse_timestamp_value(&val).unwrap();
        assert_eq!(ts, 1720614600);
    }
1493
    /// A zone-less `T`-separated string is read as UTC (same result as the `Z` form).
    #[test]
    fn test_parse_timestamp_naive() {
        let val = EventValue::Str(std::borrow::Cow::Borrowed("2024-07-10T12:30:00"));
        let ts = parse_timestamp_value(&val).unwrap();
        assert_eq!(ts, 1720614600);
    }
1500
    /// The space-separated date/time form is accepted and read as UTC.
    #[test]
    fn test_parse_timestamp_with_space() {
        let val = EventValue::Str(std::borrow::Cow::Borrowed("2024-07-10 12:30:00"));
        let ts = parse_timestamp_value(&val).unwrap();
        assert_eq!(ts, 1720614600);
    }
1507
    /// Fractional seconds are accepted; the expected value shows they are dropped
    /// from the whole-second result.
    #[test]
    fn test_parse_timestamp_fractional() {
        let val = EventValue::Str(std::borrow::Cow::Borrowed("2024-07-10T12:30:00.123Z"));
        let ts = parse_timestamp_value(&val).unwrap();
        assert_eq!(ts, 1720614600);
    }
1514
1515    #[test]
1516    fn test_extract_timestamp_from_event() {
1517        let config = CorrelationConfig {
1518            timestamp_fields: vec!["@timestamp".to_string()],
1519            max_state_entries: 100_000,
1520            ..Default::default()
1521        };
1522        let engine = CorrelationEngine::new(config);
1523
1524        let v = json!({"@timestamp": "2024-07-10T12:30:00Z", "data": "test"});
1525        let event = JsonEvent::borrow(&v);
1526        let ts = engine.extract_event_timestamp(&event);
1527        assert_eq!(ts, Some(1720614600));
1528    }
1529
1530    #[test]
1531    fn test_extract_timestamp_fallback_fields() {
1532        let config = CorrelationConfig {
1533            timestamp_fields: vec![
1534                "@timestamp".to_string(),
1535                "timestamp".to_string(),
1536                "EventTime".to_string(),
1537            ],
1538            max_state_entries: 100_000,
1539            ..Default::default()
1540        };
1541        let engine = CorrelationEngine::new(config);
1542
1543        // First field missing, second field present
1544        let v = json!({"timestamp": 1720613400, "data": "test"});
1545        let event = JsonEvent::borrow(&v);
1546        let ts = engine.extract_event_timestamp(&event);
1547        assert_eq!(ts, Some(1720613400));
1548    }
1549
1550    #[test]
1551    fn test_extract_timestamp_returns_none_when_missing() {
1552        let config = CorrelationConfig {
1553            timestamp_fields: vec!["@timestamp".to_string()],
1554            ..Default::default()
1555        };
1556        let engine = CorrelationEngine::new(config);
1557
1558        let v = json!({"data": "no timestamp here"});
1559        let event = JsonEvent::borrow(&v);
1560        assert_eq!(engine.extract_event_timestamp(&event), None);
1561    }
1562
1563    #[test]
1564    fn test_timestamp_fallback_skip() {
1565        let yaml = r#"
1566title: test rule
1567id: ts-skip-rule
1568logsource:
1569    product: test
1570detection:
1571    selection:
1572        action: click
1573    condition: selection
1574level: low
1575---
1576title: test correlation
1577correlation:
1578    type: event_count
1579    rules:
1580        - ts-skip-rule
1581    group-by:
1582        - User
1583    timespan: 10s
1584    condition:
1585        gte: 2
1586level: high
1587"#;
1588        let collection = parse_sigma_yaml(yaml).unwrap();
1589        let mut engine = CorrelationEngine::new(CorrelationConfig {
1590            timestamp_fallback: TimestampFallback::Skip,
1591            ..Default::default()
1592        });
1593        engine.add_collection(&collection).unwrap();
1594        assert_eq!(engine.correlation_rule_count(), 1);
1595
1596        // Events with no timestamp field — should NOT update correlation state
1597        let v = json!({"action": "click", "User": "alice"});
1598        let event = JsonEvent::borrow(&v);
1599
1600        let r1 = engine.process_event(&event);
1601        assert!(!r1.detections.is_empty(), "detection should still fire");
1602
1603        let r2 = engine.process_event(&event);
1604        assert!(!r2.detections.is_empty(), "detection should still fire");
1605
1606        let r3 = engine.process_event(&event);
1607        assert!(!r3.detections.is_empty(), "detection should still fire");
1608
1609        // No correlations should fire because events were skipped
1610        assert!(r1.correlations.is_empty());
1611        assert!(r2.correlations.is_empty());
1612        assert!(r3.correlations.is_empty());
1613    }
1614
1615    #[test]
1616    fn test_timestamp_fallback_wallclock_default() {
1617        let yaml = r#"
1618title: test rule
1619id: ts-wc-rule
1620logsource:
1621    product: test
1622detection:
1623    selection:
1624        action: click
1625    condition: selection
1626level: low
1627---
1628title: test correlation
1629correlation:
1630    type: event_count
1631    rules:
1632        - ts-wc-rule
1633    group-by:
1634        - User
1635    timespan: 60s
1636    condition:
1637        gte: 2
1638level: high
1639"#;
1640        let collection = parse_sigma_yaml(yaml).unwrap();
1641        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1642        engine.add_collection(&collection).unwrap();
1643        assert_eq!(engine.correlation_rule_count(), 1);
1644
1645        // Events with no timestamp — WallClock fallback means they get Utc::now()
1646        // and should be close enough to correlate (generous 60s window)
1647        let v = json!({"action": "click", "User": "alice"});
1648        let event = JsonEvent::borrow(&v);
1649
1650        let _r1 = engine.process_event(&event);
1651        let _r2 = engine.process_event(&event);
1652        let r3 = engine.process_event(&event);
1653
1654        // With WallClock, all events get near-identical timestamps and should correlate
1655        assert!(
1656            !r3.correlations.is_empty(),
1657            "WallClock fallback should allow correlation"
1658        );
1659    }
1660
1661    // =========================================================================
1662    // Event count correlation
1663    // =========================================================================
1664
1665    #[test]
1666    fn test_event_count_basic() {
1667        let yaml = r#"
1668title: Base Rule
1669id: base-rule-001
1670name: base_rule
1671logsource:
1672    product: windows
1673    category: process_creation
1674detection:
1675    selection:
1676        CommandLine|contains: 'whoami'
1677    condition: selection
1678level: low
1679---
1680title: Multiple Whoami
1681id: corr-001
1682correlation:
1683    type: event_count
1684    rules:
1685        - base-rule-001
1686    group-by:
1687        - User
1688    timespan: 60s
1689    condition:
1690        gte: 3
1691level: high
1692"#;
1693        let collection = parse_sigma_yaml(yaml).unwrap();
1694        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1695        engine.add_collection(&collection).unwrap();
1696
1697        assert_eq!(engine.detection_rule_count(), 1);
1698        assert_eq!(engine.correlation_rule_count(), 1);
1699
1700        // Send 3 events from same user within the window
1701        let base_ts = 1000i64;
1702        for i in 0..3 {
1703            let v = json!({"CommandLine": "whoami", "User": "admin"});
1704            let event = JsonEvent::borrow(&v);
1705            let result = engine.process_event_at(&event, base_ts + i * 10);
1706
1707            // Each event should match the detection rule
1708            assert_eq!(result.detections.len(), 1);
1709
1710            if i < 2 {
1711                // Not enough events yet
1712                assert!(result.correlations.is_empty());
1713            } else {
1714                // 3rd event triggers the correlation
1715                assert_eq!(result.correlations.len(), 1);
1716                assert_eq!(result.correlations[0].rule_title, "Multiple Whoami");
1717                assert_eq!(result.correlations[0].aggregated_value, 3.0);
1718            }
1719        }
1720    }
1721
1722    #[test]
1723    fn test_event_count_different_groups() {
1724        let yaml = r#"
1725title: Login
1726id: login-001
1727logsource:
1728    category: auth
1729detection:
1730    selection:
1731        EventType: login
1732    condition: selection
1733level: low
1734---
1735title: Many Logins
1736id: corr-login
1737correlation:
1738    type: event_count
1739    rules:
1740        - login-001
1741    group-by:
1742        - User
1743    timespan: 60s
1744    condition:
1745        gte: 3
1746level: high
1747"#;
1748        let collection = parse_sigma_yaml(yaml).unwrap();
1749        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1750        engine.add_collection(&collection).unwrap();
1751
1752        // User "alice" sends 2 events, "bob" sends 3
1753        let ts = 1000i64;
1754        for i in 0..2 {
1755            let v = json!({"EventType": "login", "User": "alice"});
1756            let event = JsonEvent::borrow(&v);
1757            let r = engine.process_event_at(&event, ts + i);
1758            assert!(r.correlations.is_empty());
1759        }
1760        for i in 0..3 {
1761            let v = json!({"EventType": "login", "User": "bob"});
1762            let event = JsonEvent::borrow(&v);
1763            let r = engine.process_event_at(&event, ts + i);
1764            if i == 2 {
1765                assert_eq!(r.correlations.len(), 1);
1766                assert_eq!(
1767                    r.correlations[0].group_key,
1768                    vec![("User".to_string(), "bob".to_string())]
1769                );
1770            }
1771        }
1772    }
1773
1774    #[test]
1775    fn test_event_count_window_expiry() {
1776        let yaml = r#"
1777title: Base
1778id: base-002
1779logsource:
1780    category: test
1781detection:
1782    selection:
1783        action: click
1784    condition: selection
1785---
1786title: Rapid Clicks
1787id: corr-002
1788correlation:
1789    type: event_count
1790    rules:
1791        - base-002
1792    group-by:
1793        - User
1794    timespan: 10s
1795    condition:
1796        gte: 3
1797level: medium
1798"#;
1799        let collection = parse_sigma_yaml(yaml).unwrap();
1800        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1801        engine.add_collection(&collection).unwrap();
1802
1803        // Send 2 events at t=0,1 then 1 event at t=15 (outside window)
1804        let v = json!({"action": "click", "User": "admin"});
1805        let event = JsonEvent::borrow(&v);
1806        engine.process_event_at(&event, 0);
1807        engine.process_event_at(&event, 1);
1808        let r = engine.process_event_at(&event, 15);
1809        // Only 1 event in window [5, 15], not enough
1810        assert!(r.correlations.is_empty());
1811    }
1812
1813    // =========================================================================
1814    // Value count correlation
1815    // =========================================================================
1816
1817    #[test]
1818    fn test_value_count() {
1819        let yaml = r#"
1820title: Failed Login
1821id: failed-login-001
1822logsource:
1823    category: auth
1824detection:
1825    selection:
1826        EventType: failed_login
1827    condition: selection
1828level: low
1829---
1830title: Failed Logins From Many Users
1831id: corr-vc-001
1832correlation:
1833    type: value_count
1834    rules:
1835        - failed-login-001
1836    group-by:
1837        - Host
1838    timespan: 60s
1839    condition:
1840        field: User
1841        gte: 3
1842level: high
1843"#;
1844        let collection = parse_sigma_yaml(yaml).unwrap();
1845        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1846        engine.add_collection(&collection).unwrap();
1847
1848        let ts = 1000i64;
1849        // 3 different users failing login on same host
1850        for (i, user) in ["alice", "bob", "charlie"].iter().enumerate() {
1851            let v = json!({"EventType": "failed_login", "Host": "srv01", "User": user});
1852            let event = JsonEvent::borrow(&v);
1853            let r = engine.process_event_at(&event, ts + i as i64);
1854            if i == 2 {
1855                assert_eq!(r.correlations.len(), 1);
1856                assert_eq!(r.correlations[0].aggregated_value, 3.0);
1857            }
1858        }
1859    }
1860
1861    // =========================================================================
1862    // Temporal correlation
1863    // =========================================================================
1864
1865    #[test]
1866    fn test_temporal() {
1867        let yaml = r#"
1868title: Recon A
1869id: recon-a
1870name: recon_a
1871logsource:
1872    category: process
1873detection:
1874    selection:
1875        CommandLine|contains: 'whoami'
1876    condition: selection
1877---
1878title: Recon B
1879id: recon-b
1880name: recon_b
1881logsource:
1882    category: process
1883detection:
1884    selection:
1885        CommandLine|contains: 'ipconfig'
1886    condition: selection
1887---
1888title: Recon Combo
1889id: corr-temporal
1890correlation:
1891    type: temporal
1892    rules:
1893        - recon-a
1894        - recon-b
1895    group-by:
1896        - User
1897    timespan: 60s
1898    condition:
1899        gte: 2
1900level: high
1901"#;
1902        let collection = parse_sigma_yaml(yaml).unwrap();
1903        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1904        engine.add_collection(&collection).unwrap();
1905
1906        let ts = 1000i64;
1907        // Only recon A fires
1908        let v1 = json!({"CommandLine": "whoami", "User": "admin"});
1909        let ev1 = JsonEvent::borrow(&v1);
1910        let r1 = engine.process_event_at(&ev1, ts);
1911        assert!(r1.correlations.is_empty());
1912
1913        // Now recon B fires — both rules have fired within window
1914        let v2 = json!({"CommandLine": "ipconfig /all", "User": "admin"});
1915        let ev2 = JsonEvent::borrow(&v2);
1916        let r2 = engine.process_event_at(&ev2, ts + 10);
1917        assert_eq!(r2.correlations.len(), 1);
1918        assert_eq!(r2.correlations[0].rule_title, "Recon Combo");
1919    }
1920
1921    // =========================================================================
1922    // Temporal ordered correlation
1923    // =========================================================================
1924
1925    #[test]
1926    fn test_temporal_ordered() {
1927        let yaml = r#"
1928title: Failed Login
1929id: failed-001
1930name: failed_login
1931logsource:
1932    category: auth
1933detection:
1934    selection:
1935        EventType: failed_login
1936    condition: selection
1937---
1938title: Success Login
1939id: success-001
1940name: successful_login
1941logsource:
1942    category: auth
1943detection:
1944    selection:
1945        EventType: success_login
1946    condition: selection
1947---
1948title: Brute Force Then Login
1949id: corr-bf
1950correlation:
1951    type: temporal_ordered
1952    rules:
1953        - failed-001
1954        - success-001
1955    group-by:
1956        - User
1957    timespan: 60s
1958    condition:
1959        gte: 2
1960level: critical
1961"#;
1962        let collection = parse_sigma_yaml(yaml).unwrap();
1963        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1964        engine.add_collection(&collection).unwrap();
1965
1966        let ts = 1000i64;
1967        // Failed login first
1968        let v1 = json!({"EventType": "failed_login", "User": "admin"});
1969        let ev1 = JsonEvent::borrow(&v1);
1970        let r1 = engine.process_event_at(&ev1, ts);
1971        assert!(r1.correlations.is_empty());
1972
1973        // Then successful login — correct order!
1974        let v2 = json!({"EventType": "success_login", "User": "admin"});
1975        let ev2 = JsonEvent::borrow(&v2);
1976        let r2 = engine.process_event_at(&ev2, ts + 10);
1977        assert_eq!(r2.correlations.len(), 1);
1978    }
1979
1980    #[test]
1981    fn test_temporal_ordered_wrong_order() {
1982        let yaml = r#"
1983title: Rule A
1984id: rule-a
1985logsource:
1986    category: test
1987detection:
1988    selection:
1989        type: a
1990    condition: selection
1991---
1992title: Rule B
1993id: rule-b
1994logsource:
1995    category: test
1996detection:
1997    selection:
1998        type: b
1999    condition: selection
2000---
2001title: A then B
2002id: corr-ab
2003correlation:
2004    type: temporal_ordered
2005    rules:
2006        - rule-a
2007        - rule-b
2008    group-by:
2009        - User
2010    timespan: 60s
2011    condition:
2012        gte: 2
2013level: high
2014"#;
2015        let collection = parse_sigma_yaml(yaml).unwrap();
2016        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2017        engine.add_collection(&collection).unwrap();
2018
2019        let ts = 1000i64;
2020        // B fires first, then A — wrong order
2021        let v1 = json!({"type": "b", "User": "admin"});
2022        let ev1 = JsonEvent::borrow(&v1);
2023        engine.process_event_at(&ev1, ts);
2024
2025        let v2 = json!({"type": "a", "User": "admin"});
2026        let ev2 = JsonEvent::borrow(&v2);
2027        let r2 = engine.process_event_at(&ev2, ts + 10);
2028        assert!(r2.correlations.is_empty());
2029    }
2030
2031    // =========================================================================
2032    // Numeric aggregation (value_sum, value_avg)
2033    // =========================================================================
2034
2035    #[test]
2036    fn test_value_sum() {
2037        let yaml = r#"
2038title: Web Access
2039id: web-001
2040logsource:
2041    category: web
2042detection:
2043    selection:
2044        action: upload
2045    condition: selection
2046---
2047title: Large Upload
2048id: corr-sum
2049correlation:
2050    type: value_sum
2051    rules:
2052        - web-001
2053    group-by:
2054        - User
2055    timespan: 60s
2056    condition:
2057        field: bytes_sent
2058        gt: 1000
2059level: high
2060"#;
2061        let collection = parse_sigma_yaml(yaml).unwrap();
2062        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2063        engine.add_collection(&collection).unwrap();
2064
2065        let ts = 1000i64;
2066        let v1 = json!({"action": "upload", "User": "alice", "bytes_sent": 600});
2067        let ev1 = JsonEvent::borrow(&v1);
2068        let r1 = engine.process_event_at(&ev1, ts);
2069        assert!(r1.correlations.is_empty());
2070
2071        let v2 = json!({"action": "upload", "User": "alice", "bytes_sent": 500});
2072        let ev2 = JsonEvent::borrow(&v2);
2073        let r2 = engine.process_event_at(&ev2, ts + 5);
2074        assert_eq!(r2.correlations.len(), 1);
2075        assert!((r2.correlations[0].aggregated_value - 1100.0).abs() < f64::EPSILON);
2076    }
2077
2078    #[test]
2079    fn test_value_avg() {
2080        let yaml = r#"
2081title: Request
2082id: req-001
2083logsource:
2084    category: web
2085detection:
2086    selection:
2087        type: request
2088    condition: selection
2089---
2090title: High Avg Latency
2091id: corr-avg
2092correlation:
2093    type: value_avg
2094    rules:
2095        - req-001
2096    group-by:
2097        - Service
2098    timespan: 60s
2099    condition:
2100        field: latency_ms
2101        gt: 500
2102level: medium
2103"#;
2104        let collection = parse_sigma_yaml(yaml).unwrap();
2105        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2106        engine.add_collection(&collection).unwrap();
2107
2108        let ts = 1000i64;
2109        // Avg of 400, 600, 800 = 600 > 500
2110        for (i, latency) in [400, 600, 800].iter().enumerate() {
2111            let v = json!({"type": "request", "Service": "api", "latency_ms": latency});
2112            let event = JsonEvent::borrow(&v);
2113            let r = engine.process_event_at(&event, ts + i as i64);
2114            if i == 2 {
2115                assert_eq!(r.correlations.len(), 1);
2116                assert!((r.correlations[0].aggregated_value - 600.0).abs() < f64::EPSILON);
2117            }
2118        }
2119    }
2120
2121    // =========================================================================
2122    // State management
2123    // =========================================================================
2124
2125    #[test]
2126    fn test_state_count() {
2127        let yaml = r#"
2128title: Base
2129id: base-sc
2130logsource:
2131    category: test
2132detection:
2133    selection:
2134        action: test
2135    condition: selection
2136---
2137title: Count
2138id: corr-sc
2139correlation:
2140    type: event_count
2141    rules:
2142        - base-sc
2143    group-by:
2144        - User
2145    timespan: 60s
2146    condition:
2147        gte: 100
2148level: low
2149"#;
2150        let collection = parse_sigma_yaml(yaml).unwrap();
2151        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2152        engine.add_collection(&collection).unwrap();
2153
2154        let v = json!({"action": "test", "User": "alice"});
2155        let event = JsonEvent::borrow(&v);
2156        engine.process_event_at(&event, 1000);
2157        assert_eq!(engine.state_count(), 1);
2158
2159        let v2 = json!({"action": "test", "User": "bob"});
2160        let event2 = JsonEvent::borrow(&v2);
2161        engine.process_event_at(&event2, 1001);
2162        assert_eq!(engine.state_count(), 2);
2163
2164        // Evict everything
2165        engine.evict_expired(2000);
2166        assert_eq!(engine.state_count(), 0);
2167    }
2168
2169    // =========================================================================
2170    // Generate flag
2171    // =========================================================================
2172
2173    #[test]
2174    fn test_generate_flag_default_false() {
2175        let yaml = r#"
2176title: Base
2177id: gen-base
2178logsource:
2179    category: test
2180detection:
2181    selection:
2182        action: test
2183    condition: selection
2184---
2185title: Correlation
2186id: gen-corr
2187correlation:
2188    type: event_count
2189    rules:
2190        - gen-base
2191    group-by:
2192        - User
2193    timespan: 60s
2194    condition:
2195        gte: 1
2196level: high
2197"#;
2198        let collection = parse_sigma_yaml(yaml).unwrap();
2199        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2200        engine.add_collection(&collection).unwrap();
2201
2202        // generate defaults to false — detection matches are still returned
2203        // (filtering by generate flag is a backend concern, not eval)
2204        let v = json!({"action": "test", "User": "alice"});
2205        let event = JsonEvent::borrow(&v);
2206        let r = engine.process_event_at(&event, 1000);
2207        assert_eq!(r.detections.len(), 1);
2208        assert_eq!(r.correlations.len(), 1);
2209    }
2210
2211    // =========================================================================
2212    // Real-world example: AWS bucket enumeration
2213    // =========================================================================
2214
2215    #[test]
2216    fn test_aws_bucket_enumeration() {
2217        let yaml = r#"
2218title: Potential Bucket Enumeration on AWS
2219id: f305fd62-beca-47da-ad95-7690a0620084
2220logsource:
2221    product: aws
2222    service: cloudtrail
2223detection:
2224    selection:
2225        eventSource: "s3.amazonaws.com"
2226        eventName: "ListBuckets"
2227    condition: selection
2228level: low
2229---
2230title: Multiple AWS bucket enumerations
2231id: be246094-01d3-4bba-88de-69e582eba0cc
2232status: experimental
2233correlation:
2234    type: event_count
2235    rules:
2236        - f305fd62-beca-47da-ad95-7690a0620084
2237    group-by:
2238        - userIdentity.arn
2239    timespan: 1h
2240    condition:
2241        gte: 5
2242level: high
2243"#;
2244        let collection = parse_sigma_yaml(yaml).unwrap();
2245        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2246        engine.add_collection(&collection).unwrap();
2247
2248        let base_ts = 1_700_000_000i64;
2249        for i in 0..5 {
2250            let v = json!({
2251                "eventSource": "s3.amazonaws.com",
2252                "eventName": "ListBuckets",
2253                "userIdentity.arn": "arn:aws:iam::123456789:user/attacker"
2254            });
2255            let event = JsonEvent::borrow(&v);
2256            let r = engine.process_event_at(&event, base_ts + i * 60);
2257            if i == 4 {
2258                assert_eq!(r.correlations.len(), 1);
2259                assert_eq!(
2260                    r.correlations[0].rule_title,
2261                    "Multiple AWS bucket enumerations"
2262                );
2263                assert_eq!(r.correlations[0].aggregated_value, 5.0);
2264            }
2265        }
2266    }
2267
2268    // =========================================================================
2269    // Chaining: event_count -> temporal_ordered
2270    // =========================================================================
2271
2272    #[test]
2273    fn test_chaining_event_count_to_temporal() {
2274        // Reproduces the spec's "failed logins followed by successful login" example.
2275        // Chain: failed_login (detection) -> many_failed (event_count) -> brute_then_login (temporal_ordered)
2276        let yaml = r#"
2277title: Single failed login
2278id: failed-login-chain
2279name: failed_login
2280logsource:
2281    category: auth
2282detection:
2283    selection:
2284        EventType: failed_login
2285    condition: selection
2286---
2287title: Successful login
2288id: success-login-chain
2289name: successful_login
2290logsource:
2291    category: auth
2292detection:
2293    selection:
2294        EventType: success_login
2295    condition: selection
2296---
2297title: Multiple failed logins
2298id: many-failed-chain
2299name: multiple_failed_login
2300correlation:
2301    type: event_count
2302    rules:
2303        - failed-login-chain
2304    group-by:
2305        - User
2306    timespan: 60s
2307    condition:
2308        gte: 3
2309level: medium
2310---
2311title: Brute Force Followed by Login
2312id: brute-force-chain
2313correlation:
2314    type: temporal_ordered
2315    rules:
2316        - many-failed-chain
2317        - success-login-chain
2318    group-by:
2319        - User
2320    timespan: 120s
2321    condition:
2322        gte: 2
2323level: critical
2324"#;
2325        let collection = parse_sigma_yaml(yaml).unwrap();
2326        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2327        engine.add_collection(&collection).unwrap();
2328
2329        assert_eq!(engine.detection_rule_count(), 2);
2330        assert_eq!(engine.correlation_rule_count(), 2);
2331
2332        let ts = 1000i64;
2333
2334        // Send 3 failed logins → triggers "many_failed_chain"
2335        for i in 0..3 {
2336            let v = json!({"EventType": "failed_login", "User": "victim"});
2337            let event = JsonEvent::borrow(&v);
2338            let r = engine.process_event_at(&event, ts + i);
2339            if i == 2 {
2340                // The event_count correlation should fire
2341                assert!(
2342                    r.correlations
2343                        .iter()
2344                        .any(|c| c.rule_title == "Multiple failed logins"),
2345                    "Expected event_count correlation to fire"
2346                );
2347            }
2348        }
2349
2350        // Now send a successful login → should trigger the chained temporal_ordered
2351        // Note: chaining happens in chain_correlations when many-failed-chain fires
2352        // and then success-login-chain matches the detection.
2353        // The temporal_ordered correlation needs BOTH many-failed-chain AND success-login-chain
2354        // to have fired. success-login-chain is a detection rule, not a correlation,
2355        // so it gets matched via the regular detection path.
2356        let v = json!({"EventType": "success_login", "User": "victim"});
2357        let event = JsonEvent::borrow(&v);
2358        let r = engine.process_event_at(&event, ts + 30);
2359
2360        // The detection should match
2361        assert_eq!(r.detections.len(), 1);
2362        assert_eq!(r.detections[0].rule_title, "Successful login");
2363    }
2364
2365    // =========================================================================
2366    // Field aliases
2367    // =========================================================================
2368
2369    #[test]
2370    fn test_field_aliases() {
2371        let yaml = r#"
2372title: Internal Error
2373id: internal-error-001
2374name: internal_error
2375logsource:
2376    category: web
2377detection:
2378    selection:
2379        http.response.status_code: 500
2380    condition: selection
2381---
2382title: New Connection
2383id: new-conn-001
2384name: new_network_connection
2385logsource:
2386    category: network
2387detection:
2388    selection:
2389        event.type: connection
2390    condition: selection
2391---
2392title: Error Then Connection
2393id: corr-alias
2394correlation:
2395    type: temporal
2396    rules:
2397        - internal-error-001
2398        - new-conn-001
2399    group-by:
2400        - internal_ip
2401    timespan: 60s
2402    condition:
2403        gte: 2
2404    aliases:
2405        internal_ip:
2406            internal_error: destination.ip
2407            new_network_connection: source.ip
2408level: high
2409"#;
2410        let collection = parse_sigma_yaml(yaml).unwrap();
2411        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2412        engine.add_collection(&collection).unwrap();
2413
2414        let ts = 1000i64;
2415
2416        // Internal error with destination.ip = 10.0.0.5
2417        let v1 = json!({
2418            "http.response.status_code": 500,
2419            "destination.ip": "10.0.0.5"
2420        });
2421        let ev1 = JsonEvent::borrow(&v1);
2422        let r1 = engine.process_event_at(&ev1, ts);
2423        assert_eq!(r1.detections.len(), 1);
2424        assert!(r1.correlations.is_empty());
2425
2426        // New connection with source.ip = 10.0.0.5 (same IP, aliased)
2427        let v2 = json!({
2428            "event.type": "connection",
2429            "source.ip": "10.0.0.5"
2430        });
2431        let ev2 = JsonEvent::borrow(&v2);
2432        let r2 = engine.process_event_at(&ev2, ts + 5);
2433        assert_eq!(r2.detections.len(), 1);
2434        // Both rules fired for the same internal_ip group → temporal should fire
2435        assert_eq!(r2.correlations.len(), 1);
2436        assert_eq!(r2.correlations[0].rule_title, "Error Then Connection");
2437        // Check group key contains the aliased field
2438        assert!(
2439            r2.correlations[0]
2440                .group_key
2441                .iter()
2442                .any(|(k, v)| k == "internal_ip" && v == "10.0.0.5")
2443        );
2444    }
2445
2446    // =========================================================================
2447    // Value percentile (basic smoke test)
2448    // =========================================================================
2449
2450    #[test]
2451    fn test_value_percentile() {
2452        let yaml = r#"
2453title: Process Creation
2454id: proc-001
2455logsource:
2456    category: process
2457detection:
2458    selection:
2459        type: process_creation
2460    condition: selection
2461---
2462title: Rare Process
2463id: corr-percentile
2464correlation:
2465    type: value_percentile
2466    rules:
2467        - proc-001
2468    group-by:
2469        - ComputerName
2470    timespan: 60s
2471    condition:
2472        field: image
2473        lte: 50
2474level: medium
2475"#;
2476        let collection = parse_sigma_yaml(yaml).unwrap();
2477        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2478        engine.add_collection(&collection).unwrap();
2479
2480        let ts = 1000i64;
2481        // Push some numeric-ish values for the image field
2482        for (i, val) in [10.0, 20.0, 30.0, 40.0, 50.0].iter().enumerate() {
2483            let v = json!({"type": "process_creation", "ComputerName": "srv01", "image": val});
2484            let event = JsonEvent::borrow(&v);
2485            let _ = engine.process_event_at(&event, ts + i as i64);
2486        }
2487        // The median (30.0) should be <= 50, so condition fires
2488        // Note: percentile implementation is simplified for in-memory eval
2489    }
2490
2491    // =========================================================================
2492    // Extended temporal conditions (end-to-end)
2493    // =========================================================================
2494
2495    #[test]
2496    fn test_extended_temporal_and_condition() {
2497        // Temporal correlation with "rule_a and rule_b" extended condition
2498        let yaml = r#"
2499title: Login Attempt
2500id: login-attempt
2501logsource:
2502    category: auth
2503detection:
2504    selection:
2505        EventType: login_failure
2506    condition: selection
2507---
2508title: Password Change
2509id: password-change
2510logsource:
2511    category: auth
2512detection:
2513    selection:
2514        EventType: password_change
2515    condition: selection
2516---
2517title: Credential Attack
2518correlation:
2519    type: temporal
2520    rules:
2521        - login-attempt
2522        - password-change
2523    group-by:
2524        - User
2525    timespan: 300s
2526    condition: login-attempt and password-change
2527level: high
2528"#;
2529        let collection = parse_sigma_yaml(yaml).unwrap();
2530        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2531        engine.add_collection(&collection).unwrap();
2532
2533        let ts = 1000i64;
2534
2535        // Login failure by alice
2536        let ev1 = json!({"EventType": "login_failure", "User": "alice"});
2537        let r1 = engine.process_event_at(&JsonEvent::borrow(&ev1), ts);
2538        assert!(r1.correlations.is_empty(), "only one rule fired so far");
2539
2540        // Password change by alice — both rules have now fired
2541        let ev2 = json!({"EventType": "password_change", "User": "alice"});
2542        let r2 = engine.process_event_at(&JsonEvent::borrow(&ev2), ts + 10);
2543        assert_eq!(
2544            r2.correlations.len(),
2545            1,
2546            "temporal correlation should fire: both rules matched"
2547        );
2548        assert_eq!(r2.correlations[0].rule_title, "Credential Attack");
2549    }
2550
2551    #[test]
2552    fn test_extended_temporal_or_condition() {
2553        // Temporal with "rule_a or rule_b" — should fire when either fires
2554        let yaml = r#"
2555title: SSH Login
2556id: ssh-login
2557logsource:
2558    category: auth
2559detection:
2560    selection:
2561        EventType: ssh_login
2562    condition: selection
2563---
2564title: VPN Login
2565id: vpn-login
2566logsource:
2567    category: auth
2568detection:
2569    selection:
2570        EventType: vpn_login
2571    condition: selection
2572---
2573title: Any Remote Access
2574correlation:
2575    type: temporal
2576    rules:
2577        - ssh-login
2578        - vpn-login
2579    group-by:
2580        - User
2581    timespan: 60s
2582    condition: ssh-login or vpn-login
2583level: medium
2584"#;
2585        let collection = parse_sigma_yaml(yaml).unwrap();
2586        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2587        engine.add_collection(&collection).unwrap();
2588
2589        // Only SSH login by bob — "or" means this suffices
2590        let ev = json!({"EventType": "ssh_login", "User": "bob"});
2591        let r = engine.process_event_at(&JsonEvent::borrow(&ev), 1000);
2592        assert_eq!(r.correlations.len(), 1);
2593        assert_eq!(r.correlations[0].rule_title, "Any Remote Access");
2594    }
2595
2596    #[test]
2597    fn test_extended_temporal_partial_and_no_fire() {
2598        // Temporal "and" with only one rule firing should not trigger
2599        let yaml = r#"
2600title: Recon Step 1
2601id: recon-1
2602logsource:
2603    category: process
2604detection:
2605    selection:
2606        CommandLine|contains: 'whoami'
2607    condition: selection
2608---
2609title: Recon Step 2
2610id: recon-2
2611logsource:
2612    category: process
2613detection:
2614    selection:
2615        CommandLine|contains: 'ipconfig'
2616    condition: selection
2617---
2618title: Full Recon
2619correlation:
2620    type: temporal
2621    rules:
2622        - recon-1
2623        - recon-2
2624    group-by:
2625        - Host
2626    timespan: 120s
2627    condition: recon-1 and recon-2
2628level: high
2629"#;
2630        let collection = parse_sigma_yaml(yaml).unwrap();
2631        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2632        engine.add_collection(&collection).unwrap();
2633
2634        // Only whoami (recon-1) — should not fire
2635        let ev = json!({"CommandLine": "whoami", "Host": "srv01"});
2636        let r = engine.process_event_at(&JsonEvent::borrow(&ev), 1000);
2637        assert!(r.correlations.is_empty(), "only one of two AND rules fired");
2638
2639        // Now ipconfig (recon-2) — should fire
2640        let ev2 = json!({"CommandLine": "ipconfig /all", "Host": "srv01"});
2641        let r2 = engine.process_event_at(&JsonEvent::borrow(&ev2), 1010);
2642        assert_eq!(r2.correlations.len(), 1);
2643        assert_eq!(r2.correlations[0].rule_title, "Full Recon");
2644    }
2645
2646    // =========================================================================
2647    // Filter rules with correlation engine
2648    // =========================================================================
2649
2650    #[test]
2651    fn test_filter_with_correlation() {
2652        // Detection rule + filter + event_count correlation
2653        let yaml = r#"
2654title: Failed Auth
2655id: failed-auth
2656logsource:
2657    category: auth
2658detection:
2659    selection:
2660        EventType: auth_failure
2661    condition: selection
2662---
2663title: Exclude Service Accounts
2664filter:
2665    rules:
2666        - failed-auth
2667    selection:
2668        User|startswith: 'svc_'
2669    condition: not selection
2670---
2671title: Brute Force
2672correlation:
2673    type: event_count
2674    rules:
2675        - failed-auth
2676    group-by:
2677        - User
2678    timespan: 300s
2679    condition:
2680        gte: 3
2681level: critical
2682"#;
2683        let collection = parse_sigma_yaml(yaml).unwrap();
2684        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2685        engine.add_collection(&collection).unwrap();
2686
2687        let ts = 1000i64;
2688
2689        // Service account failures should be filtered — don't count
2690        for i in 0..5 {
2691            let ev = json!({"EventType": "auth_failure", "User": "svc_backup"});
2692            let r = engine.process_event_at(&JsonEvent::borrow(&ev), ts + i);
2693            assert!(
2694                r.correlations.is_empty(),
2695                "service account should be filtered, no correlation"
2696            );
2697        }
2698
2699        // Normal user failures should count
2700        for i in 0..2 {
2701            let ev = json!({"EventType": "auth_failure", "User": "alice"});
2702            let r = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 10 + i);
2703            assert!(r.correlations.is_empty(), "not yet 3 events");
2704        }
2705
2706        // Third failure triggers correlation
2707        let ev = json!({"EventType": "auth_failure", "User": "alice"});
2708        let r = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 12);
2709        assert_eq!(r.correlations.len(), 1);
2710        assert_eq!(r.correlations[0].rule_title, "Brute Force");
2711    }
2712
2713    // =========================================================================
2714    // action: repeat with correlation engine
2715    // =========================================================================
2716
2717    #[test]
2718    fn test_repeat_rules_in_correlation() {
2719        // Two detection rules via repeat, both feed into event_count
2720        let yaml = r#"
2721title: File Access A
2722id: file-a
2723logsource:
2724    category: file_access
2725detection:
2726    selection:
2727        FileName|endswith: '.docx'
2728    condition: selection
2729---
2730action: repeat
2731title: File Access B
2732id: file-b
2733detection:
2734    selection:
2735        FileName|endswith: '.xlsx'
2736    condition: selection
2737---
2738title: Mass File Access
2739correlation:
2740    type: event_count
2741    rules:
2742        - file-a
2743        - file-b
2744    group-by:
2745        - User
2746    timespan: 60s
2747    condition:
2748        gte: 3
2749level: high
2750"#;
2751        let collection = parse_sigma_yaml(yaml).unwrap();
2752        assert_eq!(collection.rules.len(), 2);
2753        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2754        engine.add_collection(&collection).unwrap();
2755        assert_eq!(engine.detection_rule_count(), 2);
2756
2757        let ts = 1000i64;
2758        // Mix of docx and xlsx accesses by same user
2759        let ev1 = json!({"FileName": "report.docx", "User": "bob"});
2760        engine.process_event_at(&JsonEvent::borrow(&ev1), ts);
2761        let ev2 = json!({"FileName": "data.xlsx", "User": "bob"});
2762        engine.process_event_at(&JsonEvent::borrow(&ev2), ts + 1);
2763        let ev3 = json!({"FileName": "notes.docx", "User": "bob"});
2764        let r = engine.process_event_at(&JsonEvent::borrow(&ev3), ts + 2);
2765
2766        assert_eq!(r.correlations.len(), 1);
2767        assert_eq!(r.correlations[0].rule_title, "Mass File Access");
2768    }
2769
2770    // =========================================================================
2771    // Expand modifier with correlation engine
2772    // =========================================================================
2773
2774    #[test]
2775    fn test_expand_modifier_with_correlation() {
2776        let yaml = r#"
2777title: User Temp File
2778id: user-temp
2779logsource:
2780    category: file_access
2781detection:
2782    selection:
2783        FilePath|expand: 'C:\Users\%User%\Temp'
2784    condition: selection
2785---
2786title: Excessive Temp Access
2787correlation:
2788    type: event_count
2789    rules:
2790        - user-temp
2791    group-by:
2792        - User
2793    timespan: 60s
2794    condition:
2795        gte: 2
2796level: medium
2797"#;
2798        let collection = parse_sigma_yaml(yaml).unwrap();
2799        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2800        engine.add_collection(&collection).unwrap();
2801
2802        let ts = 1000i64;
2803        // Event where User field matches the placeholder
2804        let ev1 = json!({"FilePath": "C:\\Users\\alice\\Temp", "User": "alice"});
2805        let r1 = engine.process_event_at(&JsonEvent::borrow(&ev1), ts);
2806        assert!(r1.correlations.is_empty());
2807
2808        let ev2 = json!({"FilePath": "C:\\Users\\alice\\Temp", "User": "alice"});
2809        let r2 = engine.process_event_at(&JsonEvent::borrow(&ev2), ts + 1);
2810        assert_eq!(r2.correlations.len(), 1);
2811        assert_eq!(r2.correlations[0].rule_title, "Excessive Temp Access");
2812
2813        // Different user — should NOT match (path says alice, user is bob)
2814        let ev3 = json!({"FilePath": "C:\\Users\\alice\\Temp", "User": "bob"});
2815        let r3 = engine.process_event_at(&JsonEvent::borrow(&ev3), ts + 2);
2816        // Detection doesn't fire for this event since expand resolves to C:\Users\bob\Temp
2817        assert_eq!(r3.detections.len(), 0);
2818    }
2819
2820    // =========================================================================
2821    // Timestamp modifier with correlation engine
2822    // =========================================================================
2823
2824    #[test]
2825    fn test_timestamp_modifier_with_correlation() {
2826        let yaml = r#"
2827title: Night Login
2828id: night-login
2829logsource:
2830    category: auth
2831detection:
2832    login:
2833        EventType: login
2834    night:
2835        Timestamp|hour: 3
2836    condition: login and night
2837---
2838title: Frequent Night Logins
2839correlation:
2840    type: event_count
2841    rules:
2842        - night-login
2843    group-by:
2844        - User
2845    timespan: 3600s
2846    condition:
2847        gte: 2
2848level: high
2849"#;
2850        let collection = parse_sigma_yaml(yaml).unwrap();
2851        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2852        engine.add_collection(&collection).unwrap();
2853
2854        let ts = 1000i64;
2855        // Login at 3AM
2856        let ev1 =
2857            json!({"EventType": "login", "User": "alice", "Timestamp": "2024-01-15T03:10:00Z"});
2858        let r1 = engine.process_event_at(&JsonEvent::borrow(&ev1), ts);
2859        assert_eq!(r1.detections.len(), 1);
2860        assert!(r1.correlations.is_empty());
2861
2862        let ev2 =
2863            json!({"EventType": "login", "User": "alice", "Timestamp": "2024-01-15T03:45:00Z"});
2864        let r2 = engine.process_event_at(&JsonEvent::borrow(&ev2), ts + 1);
2865        assert_eq!(r2.correlations.len(), 1);
2866        assert_eq!(r2.correlations[0].rule_title, "Frequent Night Logins");
2867
2868        // Login at noon — should NOT count
2869        let ev3 = json!({"EventType": "login", "User": "bob", "Timestamp": "2024-01-15T12:00:00Z"});
2870        let r3 = engine.process_event_at(&JsonEvent::borrow(&ev3), ts + 2);
2871        assert!(
2872            r3.detections.is_empty(),
2873            "noon login should not match night rule"
2874        );
2875    }
2876
2877    // =========================================================================
2878    // Correlation condition range (multiple predicates)
2879    // =========================================================================
2880
2881    #[test]
2882    fn test_event_count_range_condition() {
2883        let yaml = r#"
2884title: Login Attempt
2885id: login-attempt-001
2886name: login_attempt
2887logsource:
2888    product: windows
2889detection:
2890    selection:
2891        EventType: login
2892    condition: selection
2893level: low
2894---
2895title: Login Count Range
2896id: corr-range-001
2897correlation:
2898    type: event_count
2899    rules:
2900        - login-attempt-001
2901    group-by:
2902        - User
2903    timespan: 3600s
2904    condition:
2905        gt: 2
2906        lte: 5
2907level: high
2908"#;
2909        let collection = parse_sigma_yaml(yaml).unwrap();
2910        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2911        engine.add_collection(&collection).unwrap();
2912
2913        let ts: i64 = 1_000_000;
2914
2915        // Send 2 events — gt:2 is false
2916        for i in 0..2 {
2917            let ev = json!({"EventType": "login", "User": "alice"});
2918            let r = engine.process_event_at(&JsonEvent::borrow(&ev), ts + i);
2919            assert!(r.correlations.is_empty(), "2 events should not fire (gt:2)");
2920        }
2921
2922        // 3rd event — gt:2 is true, lte:5 is true → fires
2923        let ev3 = json!({"EventType": "login", "User": "alice"});
2924        let r3 = engine.process_event_at(&JsonEvent::borrow(&ev3), ts + 3);
2925        assert_eq!(r3.correlations.len(), 1, "3 events: gt:2 AND lte:5");
2926
2927        // Send events 4, 5 — still in range
2928        for i in 4..=5 {
2929            let ev = json!({"EventType": "login", "User": "alice"});
2930            let r = engine.process_event_at(&JsonEvent::borrow(&ev), ts + i);
2931            assert_eq!(r.correlations.len(), 1, "{i} events still in range");
2932        }
2933
2934        // 6th event — lte:5 is false → no fire
2935        let ev6 = json!({"EventType": "login", "User": "alice"});
2936        let r6 = engine.process_event_at(&JsonEvent::borrow(&ev6), ts + 6);
2937        assert!(
2938            r6.correlations.is_empty(),
2939            "6 events exceeds lte:5, should not fire"
2940        );
2941    }
2942
2943    // =========================================================================
2944    // Suppression
2945    // =========================================================================
2946
2947    fn suppression_yaml() -> &'static str {
2948        r#"
2949title: Login
2950id: login-base
2951logsource:
2952    category: auth
2953detection:
2954    selection:
2955        EventType: login
2956    condition: selection
2957---
2958title: Many Logins
2959correlation:
2960    type: event_count
2961    rules:
2962        - login-base
2963    group-by:
2964        - User
2965    timeframe: 60s
2966    condition:
2967        gte: 3
2968level: high
2969"#
2970    }
2971
2972    #[test]
2973    fn test_suppression_window() {
2974        let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
2975        let config = CorrelationConfig {
2976            suppress: Some(10), // suppress for 10 seconds
2977            ..Default::default()
2978        };
2979        let mut engine = CorrelationEngine::new(config);
2980        engine.add_collection(&collection).unwrap();
2981
2982        let ev = json!({"EventType": "login", "User": "alice"});
2983        let ts = 1000;
2984
2985        // Fire 3 events to hit threshold
2986        engine.process_event_at(&JsonEvent::borrow(&ev), ts);
2987        engine.process_event_at(&JsonEvent::borrow(&ev), ts + 1);
2988        let r3 = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 2);
2989        assert_eq!(r3.correlations.len(), 1, "should fire on 3rd event");
2990
2991        // 4th event within suppress window → suppressed
2992        let r4 = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 3);
2993        assert!(
2994            r4.correlations.is_empty(),
2995            "should be suppressed within 10s window"
2996        );
2997
2998        // 5th event still within suppress window → suppressed
2999        let r5 = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 9);
3000        assert!(
3001            r5.correlations.is_empty(),
3002            "should be suppressed at ts+9 (< ts+2+10)"
3003        );
3004
3005        // Event after suppress window expires → fires again
3006        let r6 = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 13);
3007        assert_eq!(
3008            r6.correlations.len(),
3009            1,
3010            "should fire again after suppress window expires"
3011        );
3012    }
3013
3014    #[test]
3015    fn test_suppression_per_group_key() {
3016        let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
3017        let config = CorrelationConfig {
3018            suppress: Some(60),
3019            ..Default::default()
3020        };
3021        let mut engine = CorrelationEngine::new(config);
3022        engine.add_collection(&collection).unwrap();
3023
3024        let ts = 1000;
3025
3026        // Alice hits threshold
3027        let ev_a = json!({"EventType": "login", "User": "alice"});
3028        engine.process_event_at(&JsonEvent::borrow(&ev_a), ts);
3029        engine.process_event_at(&JsonEvent::borrow(&ev_a), ts + 1);
3030        let r = engine.process_event_at(&JsonEvent::borrow(&ev_a), ts + 2);
3031        assert_eq!(r.correlations.len(), 1, "alice should fire");
3032
3033        // Bob hits threshold — different group key, not suppressed
3034        let ev_b = json!({"EventType": "login", "User": "bob"});
3035        engine.process_event_at(&JsonEvent::borrow(&ev_b), ts + 3);
3036        engine.process_event_at(&JsonEvent::borrow(&ev_b), ts + 4);
3037        let r = engine.process_event_at(&JsonEvent::borrow(&ev_b), ts + 5);
3038        assert_eq!(r.correlations.len(), 1, "bob should fire independently");
3039
3040        // Alice is still suppressed
3041        let r = engine.process_event_at(&JsonEvent::borrow(&ev_a), ts + 6);
3042        assert!(r.correlations.is_empty(), "alice still suppressed");
3043    }
3044
3045    // =========================================================================
3046    // Action on match: Reset
3047    // =========================================================================
3048
3049    #[test]
3050    fn test_action_reset() {
3051        let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
3052        let config = CorrelationConfig {
3053            action_on_match: CorrelationAction::Reset,
3054            ..Default::default()
3055        };
3056        let mut engine = CorrelationEngine::new(config);
3057        engine.add_collection(&collection).unwrap();
3058
3059        let ev = json!({"EventType": "login", "User": "alice"});
3060        let ts = 1000;
3061
3062        // Hit threshold: 3 events
3063        engine.process_event_at(&JsonEvent::borrow(&ev), ts);
3064        engine.process_event_at(&JsonEvent::borrow(&ev), ts + 1);
3065        let r3 = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 2);
3066        assert_eq!(r3.correlations.len(), 1, "should fire on 3rd event");
3067
3068        // State was reset, so 4th and 5th events should NOT fire
3069        let r4 = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 3);
3070        assert!(r4.correlations.is_empty(), "reset: need 3 more events");
3071
3072        let r5 = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 4);
3073        assert!(r5.correlations.is_empty(), "reset: still only 2");
3074
3075        // 6th event (3rd after reset) should fire again
3076        let r6 = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 5);
3077        assert_eq!(
3078            r6.correlations.len(),
3079            1,
3080            "should fire again after 3 events post-reset"
3081        );
3082    }
3083
3084    // =========================================================================
3085    // Generate flag / emit_detections
3086    // =========================================================================
3087
3088    #[test]
3089    fn test_emit_detections_true_by_default() {
3090        let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
3091        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3092        engine.add_collection(&collection).unwrap();
3093
3094        let ev = json!({"EventType": "login", "User": "alice"});
3095        let r = engine.process_event_at(&JsonEvent::borrow(&ev), 1000);
3096        assert_eq!(r.detections.len(), 1, "by default detections are emitted");
3097    }
3098
3099    #[test]
3100    fn test_emit_detections_false_suppresses() {
3101        let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
3102        let config = CorrelationConfig {
3103            emit_detections: false,
3104            ..Default::default()
3105        };
3106        let mut engine = CorrelationEngine::new(config);
3107        engine.add_collection(&collection).unwrap();
3108
3109        let ev = json!({"EventType": "login", "User": "alice"});
3110        let r = engine.process_event_at(&JsonEvent::borrow(&ev), 1000);
3111        assert!(
3112            r.detections.is_empty(),
3113            "detection matches should be suppressed when emit_detections=false"
3114        );
3115    }
3116
3117    #[test]
3118    fn test_generate_true_keeps_detections() {
3119        // When generate: true, detections should be emitted even with emit_detections=false
3120        let yaml = r#"
3121title: Login
3122id: login-gen
3123logsource:
3124    category: auth
3125detection:
3126    selection:
3127        EventType: login
3128    condition: selection
3129---
3130title: Many Logins
3131correlation:
3132    type: event_count
3133    rules:
3134        - login-gen
3135    group-by:
3136        - User
3137    timeframe: 60s
3138    condition:
3139        gte: 3
3140    generate: true
3141level: high
3142"#;
3143        let collection = parse_sigma_yaml(yaml).unwrap();
3144        let config = CorrelationConfig {
3145            emit_detections: false,
3146            ..Default::default()
3147        };
3148        let mut engine = CorrelationEngine::new(config);
3149        engine.add_collection(&collection).unwrap();
3150
3151        let ev = json!({"EventType": "login", "User": "alice"});
3152        let r = engine.process_event_at(&JsonEvent::borrow(&ev), 1000);
3153        // generate: true means this rule is NOT correlation-only
3154        assert_eq!(
3155            r.detections.len(),
3156            1,
3157            "generate:true keeps detection output"
3158        );
3159    }
3160
3161    // =========================================================================
3162    // Suppression + Reset combined
3163    // =========================================================================
3164
3165    #[test]
3166    fn test_suppress_and_reset_combined() {
3167        let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
3168        let config = CorrelationConfig {
3169            suppress: Some(5),
3170            action_on_match: CorrelationAction::Reset,
3171            ..Default::default()
3172        };
3173        let mut engine = CorrelationEngine::new(config);
3174        engine.add_collection(&collection).unwrap();
3175
3176        let ev = json!({"EventType": "login", "User": "alice"});
3177        let ts = 1000;
3178
3179        // Hit threshold: fires and resets
3180        engine.process_event_at(&JsonEvent::borrow(&ev), ts);
3181        engine.process_event_at(&JsonEvent::borrow(&ev), ts + 1);
3182        let r3 = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 2);
3183        assert_eq!(r3.correlations.len(), 1, "fires on 3rd event");
3184
3185        // Push 3 more events quickly (state was reset, so new count → 3)
3186        // but suppress window hasn't expired (ts+2 + 5 = ts+7)
3187        engine.process_event_at(&JsonEvent::borrow(&ev), ts + 3);
3188        engine.process_event_at(&JsonEvent::borrow(&ev), ts + 4);
3189        let r = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 5);
3190        assert!(
3191            r.correlations.is_empty(),
3192            "threshold met again but still suppressed"
3193        );
3194
3195        // After suppress expires (at ts+8, which is ts+2+6 > suppress=5),
3196        // the accumulated events from step 2 (ts+3,4,5) still satisfy gte:3,
3197        // so the first event after expiry fires immediately and resets.
3198        let r = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 8);
3199        assert_eq!(
3200            r.correlations.len(),
3201            1,
3202            "fires after suppress expires (accumulated events + new one)"
3203        );
3204
3205        // State was reset again at ts+8, suppress window now ts+8..ts+13.
3206        // Need 3 new events to fire, and suppress must expire.
3207        engine.process_event_at(&JsonEvent::borrow(&ev), ts + 9);
3208        engine.process_event_at(&JsonEvent::borrow(&ev), ts + 10);
3209        let r = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 11);
3210        assert!(
3211            r.correlations.is_empty(),
3212            "threshold met but suppress window hasn't expired (ts+11 - ts+8 = 3 < 5)"
3213        );
3214    }
3215
3216    // =========================================================================
3217    // No suppression (default behavior preserved)
3218    // =========================================================================
3219
3220    #[test]
3221    fn test_no_suppression_fires_every_event() {
3222        let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
3223        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3224        engine.add_collection(&collection).unwrap();
3225
3226        let ev = json!({"EventType": "login", "User": "alice"});
3227        let ts = 1000;
3228
3229        engine.process_event_at(&JsonEvent::borrow(&ev), ts);
3230        engine.process_event_at(&JsonEvent::borrow(&ev), ts + 1);
3231        let r3 = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 2);
3232        assert_eq!(r3.correlations.len(), 1);
3233
3234        // Without suppression, 4th event should also fire
3235        let r4 = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 3);
3236        assert_eq!(
3237            r4.correlations.len(),
3238            1,
3239            "no suppression: fires on every event after threshold"
3240        );
3241
3242        let r5 = engine.process_event_at(&JsonEvent::borrow(&ev), ts + 4);
3243        assert_eq!(r5.correlations.len(), 1, "still fires");
3244    }
3245
3246    // =========================================================================
3247    // Custom attribute → engine config tests
3248    // =========================================================================
3249
3250    fn yaml_str_attrs<const N: usize>(
3251        pairs: [(&str, &str); N],
3252    ) -> std::collections::HashMap<String, serde_yaml::Value> {
3253        pairs
3254            .into_iter()
3255            .map(|(k, v)| (k.to_string(), serde_yaml::Value::String(v.to_string())))
3256            .collect()
3257    }
3258
3259    #[test]
3260    fn test_custom_attr_timestamp_field() {
3261        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3262        let attrs = yaml_str_attrs([("rsigma.timestamp_field", "time")]);
3263        engine.apply_custom_attributes(&attrs);
3264
3265        assert_eq!(
3266            engine.config.timestamp_fields[0], "time",
3267            "rsigma.timestamp_field should be prepended"
3268        );
3269        // Defaults should still be there after the custom one
3270        assert!(
3271            engine
3272                .config
3273                .timestamp_fields
3274                .contains(&"@timestamp".to_string())
3275        );
3276    }
3277
3278    #[test]
3279    fn test_custom_attr_timestamp_field_no_duplicates() {
3280        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3281        let attrs = yaml_str_attrs([("rsigma.timestamp_field", "time")]);
3282        // Apply twice — should not duplicate
3283        engine.apply_custom_attributes(&attrs);
3284        engine.apply_custom_attributes(&attrs);
3285
3286        let count = engine
3287            .config
3288            .timestamp_fields
3289            .iter()
3290            .filter(|f| *f == "time")
3291            .count();
3292        assert_eq!(count, 1, "should not duplicate timestamp_field entries");
3293    }
3294
3295    #[test]
3296    fn test_custom_attr_suppress() {
3297        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3298        assert!(engine.config.suppress.is_none());
3299
3300        let attrs = yaml_str_attrs([("rsigma.suppress", "5m")]);
3301        engine.apply_custom_attributes(&attrs);
3302
3303        assert_eq!(engine.config.suppress, Some(300));
3304    }
3305
3306    #[test]
3307    fn test_custom_attr_suppress_does_not_override_cli() {
3308        let config = CorrelationConfig {
3309            suppress: Some(60), // CLI set to 60s
3310            ..Default::default()
3311        };
3312        let mut engine = CorrelationEngine::new(config);
3313
3314        let attrs = yaml_str_attrs([("rsigma.suppress", "5m")]);
3315        engine.apply_custom_attributes(&attrs);
3316
3317        assert_eq!(
3318            engine.config.suppress,
3319            Some(60),
3320            "CLI suppress should not be overridden by custom attribute"
3321        );
3322    }
3323
3324    #[test]
3325    fn test_custom_attr_action() {
3326        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3327        assert_eq!(engine.config.action_on_match, CorrelationAction::Alert);
3328
3329        let attrs = yaml_str_attrs([("rsigma.action", "reset")]);
3330        engine.apply_custom_attributes(&attrs);
3331
3332        assert_eq!(engine.config.action_on_match, CorrelationAction::Reset);
3333    }
3334
3335    #[test]
3336    fn test_custom_attr_action_does_not_override_cli() {
3337        let config = CorrelationConfig {
3338            action_on_match: CorrelationAction::Reset, // CLI set to reset
3339            ..Default::default()
3340        };
3341        let mut engine = CorrelationEngine::new(config);
3342
3343        let attrs = yaml_str_attrs([("rsigma.action", "alert")]);
3344        engine.apply_custom_attributes(&attrs);
3345
3346        assert_eq!(
3347            engine.config.action_on_match,
3348            CorrelationAction::Reset,
3349            "CLI action should not be overridden by custom attribute"
3350        );
3351    }
3352
3353    #[test]
3354    fn test_custom_attr_timestamp_field_used_for_extraction() {
3355        // The event has "time" but not "@timestamp" or "timestamp"
3356        let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
3357        let mut config = CorrelationConfig::default();
3358        // Prepend "event_time" to simulate --timestamp-field
3359        config.timestamp_fields.insert(0, "event_time".to_string());
3360        let mut engine = CorrelationEngine::new(config);
3361        engine.add_collection(&collection).unwrap();
3362
3363        // Event with "event_time" field
3364        let ev = json!({
3365            "EventType": "login",
3366            "User": "alice",
3367            "event_time": "2026-02-11T12:00:00Z"
3368        });
3369        let result = engine.process_event(&JsonEvent::borrow(&ev));
3370
3371        // The detection should match, and timestamp should be ~1739275200 (2026-02-11)
3372        assert!(!result.detections.is_empty() || result.correlations.is_empty());
3373        // The key test: ensure the engine extracted the event timestamp, not Utc::now.
3374        // If it used Utc::now, the test would still pass but the timestamp would be
3375        // wildly different. We verify by checking the extracted value directly.
3376        let ts = engine
3377            .extract_event_timestamp(&JsonEvent::borrow(&ev))
3378            .expect("should extract timestamp");
3379        assert!(
3380            ts > 1_700_000_000 && ts < 1_800_000_000,
3381            "timestamp should be ~2026 epoch, got {ts}"
3382        );
3383    }
3384
3385    // =========================================================================
3386    // Cycle detection
3387    // =========================================================================
3388
3389    #[test]
3390    fn test_correlation_cycle_direct() {
3391        // Two correlations that reference each other: A -> B -> A
3392        let yaml = r#"
3393title: detection rule
3394id: det-rule
3395logsource:
3396    product: test
3397detection:
3398    selection:
3399        action: click
3400    condition: selection
3401level: low
3402---
3403title: correlation A
3404id: corr-a
3405correlation:
3406    type: event_count
3407    rules:
3408        - corr-b
3409    group-by:
3410        - User
3411    timespan: 5m
3412    condition:
3413        gte: 2
3414level: high
3415---
3416title: correlation B
3417id: corr-b
3418correlation:
3419    type: event_count
3420    rules:
3421        - corr-a
3422    group-by:
3423        - User
3424    timespan: 5m
3425    condition:
3426        gte: 2
3427level: high
3428"#;
3429        let collection = parse_sigma_yaml(yaml).unwrap();
3430        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3431        let result = engine.add_collection(&collection);
3432        assert!(result.is_err(), "should detect direct cycle");
3433        let err = result.unwrap_err().to_string();
3434        assert!(err.contains("cycle"), "error should mention cycle: {err}");
3435        assert!(
3436            err.contains("corr-a") && err.contains("corr-b"),
3437            "error should name both correlations: {err}"
3438        );
3439    }
3440
3441    #[test]
3442    fn test_correlation_cycle_self() {
3443        // A correlation that references itself
3444        let yaml = r#"
3445title: detection rule
3446id: det-rule
3447logsource:
3448    product: test
3449detection:
3450    selection:
3451        action: click
3452    condition: selection
3453level: low
3454---
3455title: self-ref correlation
3456id: self-corr
3457correlation:
3458    type: event_count
3459    rules:
3460        - self-corr
3461    group-by:
3462        - User
3463    timespan: 5m
3464    condition:
3465        gte: 2
3466level: high
3467"#;
3468        let collection = parse_sigma_yaml(yaml).unwrap();
3469        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3470        let result = engine.add_collection(&collection);
3471        assert!(result.is_err(), "should detect self-referencing cycle");
3472        let err = result.unwrap_err().to_string();
3473        assert!(err.contains("cycle"), "error should mention cycle: {err}");
3474        assert!(
3475            err.contains("self-corr"),
3476            "error should name the correlation: {err}"
3477        );
3478    }
3479
3480    #[test]
3481    fn test_correlation_no_cycle_valid_chain() {
3482        // Valid chain: detection -> corr-A -> corr-B (no cycle)
3483        let yaml = r#"
3484title: detection rule
3485id: det-rule
3486logsource:
3487    product: test
3488detection:
3489    selection:
3490        action: click
3491    condition: selection
3492level: low
3493---
3494title: correlation A
3495id: corr-a
3496correlation:
3497    type: event_count
3498    rules:
3499        - det-rule
3500    group-by:
3501        - User
3502    timespan: 5m
3503    condition:
3504        gte: 2
3505level: high
3506---
3507title: correlation B
3508id: corr-b
3509correlation:
3510    type: event_count
3511    rules:
3512        - corr-a
3513    group-by:
3514        - User
3515    timespan: 5m
3516    condition:
3517        gte: 2
3518level: high
3519"#;
3520        let collection = parse_sigma_yaml(yaml).unwrap();
3521        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3522        let result = engine.add_collection(&collection);
3523        assert!(
3524            result.is_ok(),
3525            "valid chain should not be rejected: {result:?}"
3526        );
3527    }
3528
3529    #[test]
3530    fn test_correlation_cycle_transitive() {
3531        // Transitive cycle: A -> B -> C -> A
3532        let yaml = r#"
3533title: detection rule
3534id: det-rule
3535logsource:
3536    product: test
3537detection:
3538    selection:
3539        action: click
3540    condition: selection
3541level: low
3542---
3543title: correlation A
3544id: corr-a
3545correlation:
3546    type: event_count
3547    rules:
3548        - corr-c
3549    group-by:
3550        - User
3551    timespan: 5m
3552    condition:
3553        gte: 2
3554level: high
3555---
3556title: correlation B
3557id: corr-b
3558correlation:
3559    type: event_count
3560    rules:
3561        - corr-a
3562    group-by:
3563        - User
3564    timespan: 5m
3565    condition:
3566        gte: 2
3567level: high
3568---
3569title: correlation C
3570id: corr-c
3571correlation:
3572    type: event_count
3573    rules:
3574        - corr-b
3575    group-by:
3576        - User
3577    timespan: 5m
3578    condition:
3579        gte: 2
3580level: high
3581"#;
3582        let collection = parse_sigma_yaml(yaml).unwrap();
3583        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3584        let result = engine.add_collection(&collection);
3585        assert!(result.is_err(), "should detect transitive cycle");
3586        let err = result.unwrap_err().to_string();
3587        assert!(err.contains("cycle"), "error should mention cycle: {err}");
3588    }
3589
3590    // =========================================================================
3591    // Correlation event inclusion tests
3592    // =========================================================================
3593
3594    #[test]
3595    fn test_correlation_events_disabled_by_default() {
3596        let yaml = r#"
3597title: Login
3598id: login-rule
3599logsource:
3600    category: auth
3601detection:
3602    selection:
3603        EventType: login
3604    condition: selection
3605---
3606title: Many Logins
3607correlation:
3608    type: event_count
3609    rules:
3610        - login-rule
3611    group-by:
3612        - User
3613    timespan: 60s
3614    condition:
3615        gte: 3
3616level: high
3617"#;
3618        let collection = parse_sigma_yaml(yaml).unwrap();
3619        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3620        engine.add_collection(&collection).unwrap();
3621
3622        for i in 0..3 {
3623            let v = json!({"EventType": "login", "User": "admin", "@timestamp": 1000 + i});
3624            let event = JsonEvent::borrow(&v);
3625            let result = engine.process_event_at(&event, 1000 + i);
3626            if i == 2 {
3627                assert_eq!(result.correlations.len(), 1);
3628                // Events should NOT be included by default
3629                assert!(result.correlations[0].events.is_none());
3630            }
3631        }
3632    }
3633
3634    #[test]
3635    fn test_correlation_events_included_when_enabled() {
3636        let yaml = r#"
3637title: Login
3638id: login-rule
3639logsource:
3640    category: auth
3641detection:
3642    selection:
3643        EventType: login
3644    condition: selection
3645---
3646title: Many Logins
3647correlation:
3648    type: event_count
3649    rules:
3650        - login-rule
3651    group-by:
3652        - User
3653    timespan: 60s
3654    condition:
3655        gte: 3
3656level: high
3657"#;
3658        let collection = parse_sigma_yaml(yaml).unwrap();
3659        let config = CorrelationConfig {
3660            correlation_event_mode: CorrelationEventMode::Full,
3661            max_correlation_events: 10,
3662            ..Default::default()
3663        };
3664        let mut engine = CorrelationEngine::new(config);
3665        engine.add_collection(&collection).unwrap();
3666
3667        let events_sent: Vec<serde_json::Value> = (0..3)
3668            .map(|i| json!({"EventType": "login", "User": "admin", "@timestamp": 1000 + i}))
3669            .collect();
3670
3671        let mut corr_result = None;
3672        for (i, ev) in events_sent.iter().enumerate() {
3673            let event = JsonEvent::borrow(ev);
3674            let result = engine.process_event_at(&event, 1000 + i as i64);
3675            if !result.correlations.is_empty() {
3676                corr_result = Some(result);
3677            }
3678        }
3679
3680        let result = corr_result.expect("correlation should have fired");
3681        let corr = &result.correlations[0];
3682
3683        // Events should be included
3684        let events = corr.events.as_ref().expect("events should be present");
3685        assert_eq!(
3686            events.len(),
3687            3,
3688            "all 3 contributing events should be stored"
3689        );
3690
3691        // Verify all sent events are present
3692        for (i, event) in events.iter().enumerate() {
3693            assert_eq!(event["EventType"], "login");
3694            assert_eq!(event["User"], "admin");
3695            assert_eq!(event["@timestamp"], 1000 + i as i64);
3696        }
3697    }
3698
3699    #[test]
3700    fn test_correlation_events_max_cap() {
3701        let yaml = r#"
3702title: Login
3703id: login-rule
3704logsource:
3705    category: auth
3706detection:
3707    selection:
3708        EventType: login
3709    condition: selection
3710---
3711title: Many Logins
3712correlation:
3713    type: event_count
3714    rules:
3715        - login-rule
3716    group-by:
3717        - User
3718    timespan: 60s
3719    condition:
3720        gte: 5
3721level: high
3722"#;
3723        let collection = parse_sigma_yaml(yaml).unwrap();
3724        let config = CorrelationConfig {
3725            correlation_event_mode: CorrelationEventMode::Full,
3726            max_correlation_events: 3, // only keep last 3
3727            ..Default::default()
3728        };
3729        let mut engine = CorrelationEngine::new(config);
3730        engine.add_collection(&collection).unwrap();
3731
3732        let mut corr_result = None;
3733        for i in 0..5 {
3734            let v = json!({"EventType": "login", "User": "admin", "idx": i});
3735            let event = JsonEvent::borrow(&v);
3736            let result = engine.process_event_at(&event, 1000 + i);
3737            if !result.correlations.is_empty() {
3738                corr_result = Some(result);
3739            }
3740        }
3741
3742        let result = corr_result.expect("correlation should have fired");
3743        let events = result.correlations[0]
3744            .events
3745            .as_ref()
3746            .expect("events should be present");
3747
3748        // Only the last 3 events should be retained (cap = 3)
3749        assert_eq!(events.len(), 3);
3750        assert_eq!(events[0]["idx"], 2);
3751        assert_eq!(events[1]["idx"], 3);
3752        assert_eq!(events[2]["idx"], 4);
3753    }
3754
3755    #[test]
3756    fn test_correlation_events_with_reset_action() {
3757        let yaml = r#"
3758title: Login
3759id: login-rule
3760logsource:
3761    category: auth
3762detection:
3763    selection:
3764        EventType: login
3765    condition: selection
3766---
3767title: Many Logins
3768correlation:
3769    type: event_count
3770    rules:
3771        - login-rule
3772    group-by:
3773        - User
3774    timespan: 60s
3775    condition:
3776        gte: 2
3777level: high
3778"#;
3779        let collection = parse_sigma_yaml(yaml).unwrap();
3780        let config = CorrelationConfig {
3781            correlation_event_mode: CorrelationEventMode::Full,
3782            action_on_match: CorrelationAction::Reset,
3783            ..Default::default()
3784        };
3785        let mut engine = CorrelationEngine::new(config);
3786        engine.add_collection(&collection).unwrap();
3787
3788        // First round: 2 events -> fires
3789        for i in 0..2 {
3790            let v = json!({"EventType": "login", "User": "admin", "round": 1, "idx": i});
3791            let event = JsonEvent::borrow(&v);
3792            let result = engine.process_event_at(&event, 1000 + i);
3793            if i == 1 {
3794                assert_eq!(result.correlations.len(), 1);
3795                let events = result.correlations[0].events.as_ref().unwrap();
3796                assert_eq!(events.len(), 2);
3797            }
3798        }
3799
3800        // After reset, event buffer should be cleared.
3801        // Second round: need 2 more events to fire again
3802        let v = json!({"EventType": "login", "User": "admin", "round": 2, "idx": 0});
3803        let event = JsonEvent::borrow(&v);
3804        let result = engine.process_event_at(&event, 1010);
3805        assert!(
3806            result.correlations.is_empty(),
3807            "should not fire with only 1 event after reset"
3808        );
3809
3810        let v = json!({"EventType": "login", "User": "admin", "round": 2, "idx": 1});
3811        let event = JsonEvent::borrow(&v);
3812        let result = engine.process_event_at(&event, 1011);
3813        assert_eq!(result.correlations.len(), 1);
3814        let events = result.correlations[0].events.as_ref().unwrap();
3815        assert_eq!(events.len(), 2);
3816        // Should only have round 2 events
3817        assert_eq!(events[0]["round"], 2);
3818        assert_eq!(events[1]["round"], 2);
3819    }
3820
3821    #[test]
3822    fn test_correlation_events_with_set_include() {
3823        let yaml = r#"
3824title: Login
3825id: login-rule
3826logsource:
3827    category: auth
3828detection:
3829    selection:
3830        EventType: login
3831    condition: selection
3832---
3833title: Many Logins
3834correlation:
3835    type: event_count
3836    rules:
3837        - login-rule
3838    group-by:
3839        - User
3840    timespan: 60s
3841    condition:
3842        gte: 2
3843level: high
3844"#;
3845        let collection = parse_sigma_yaml(yaml).unwrap();
3846        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3847        engine.add_collection(&collection).unwrap();
3848
3849        // Enable via setter
3850        engine.set_correlation_event_mode(CorrelationEventMode::Full);
3851
3852        for i in 0..2 {
3853            let v = json!({"EventType": "login", "User": "admin"});
3854            let event = JsonEvent::borrow(&v);
3855            let result = engine.process_event_at(&event, 1000 + i);
3856            if i == 1 {
3857                assert_eq!(result.correlations.len(), 1);
3858                assert!(result.correlations[0].events.is_some());
3859                assert_eq!(result.correlations[0].events.as_ref().unwrap().len(), 2);
3860            }
3861        }
3862    }
3863
3864    #[test]
3865    fn test_correlation_events_eviction_syncs_with_window() {
3866        let yaml = r#"
3867title: Login
3868id: login-rule
3869logsource:
3870    category: auth
3871detection:
3872    selection:
3873        EventType: login
3874    condition: selection
3875---
3876title: Many Logins
3877correlation:
3878    type: event_count
3879    rules:
3880        - login-rule
3881    group-by:
3882        - User
3883    timespan: 10s
3884    condition:
3885        gte: 3
3886level: high
3887"#;
3888        let collection = parse_sigma_yaml(yaml).unwrap();
3889        let config = CorrelationConfig {
3890            correlation_event_mode: CorrelationEventMode::Full,
3891            max_correlation_events: 100,
3892            ..Default::default()
3893        };
3894        let mut engine = CorrelationEngine::new(config);
3895        engine.add_collection(&collection).unwrap();
3896
3897        // Push 2 events at ts=1000,1001 — within the 10s window
3898        for i in 0..2 {
3899            let v = json!({"EventType": "login", "User": "admin", "idx": i});
3900            let event = JsonEvent::borrow(&v);
3901            engine.process_event_at(&event, 1000 + i);
3902        }
3903
3904        // Push 1 more event at ts=1015 — the first 2 events are now outside the
3905        // 10s window (cutoff = 1015 - 10 = 1005)
3906        let v = json!({"EventType": "login", "User": "admin", "idx": 2});
3907        let event = JsonEvent::borrow(&v);
3908        let result = engine.process_event_at(&event, 1015);
3909        // Should NOT fire: only 1 event in window (the one at ts=1015)
3910        assert!(
3911            result.correlations.is_empty(),
3912            "should not fire — old events evicted"
3913        );
3914
3915        // Push 2 more to reach threshold
3916        for i in 3..5 {
3917            let v = json!({"EventType": "login", "User": "admin", "idx": i});
3918            let event = JsonEvent::borrow(&v);
3919            let result = engine.process_event_at(&event, 1016 + i - 3);
3920            if i == 4 {
3921                assert_eq!(result.correlations.len(), 1);
3922                let events = result.correlations[0].events.as_ref().unwrap();
3923                // Should have events from ts=1015,1016,1017 — not the old ones
3924                assert_eq!(events.len(), 3);
3925                for ev in events {
3926                    assert!(ev["idx"].as_i64().unwrap() >= 2);
3927                }
3928            }
3929        }
3930    }
3931
3932    #[test]
3933    fn test_event_buffer_monitoring() {
3934        let yaml = r#"
3935title: Login
3936id: login-rule
3937logsource:
3938    category: auth
3939detection:
3940    selection:
3941        EventType: login
3942    condition: selection
3943---
3944title: Many Logins
3945correlation:
3946    type: event_count
3947    rules:
3948        - login-rule
3949    group-by:
3950        - User
3951    timespan: 60s
3952    condition:
3953        gte: 100
3954level: high
3955"#;
3956        let collection = parse_sigma_yaml(yaml).unwrap();
3957        let config = CorrelationConfig {
3958            correlation_event_mode: CorrelationEventMode::Full,
3959            ..Default::default()
3960        };
3961        let mut engine = CorrelationEngine::new(config);
3962        engine.add_collection(&collection).unwrap();
3963
3964        assert_eq!(engine.event_buffer_count(), 0);
3965        assert_eq!(engine.event_buffer_bytes(), 0);
3966
3967        // Push some events
3968        for i in 0..5 {
3969            let v = json!({"EventType": "login", "User": "admin"});
3970            let event = JsonEvent::borrow(&v);
3971            engine.process_event_at(&event, 1000 + i);
3972        }
3973
3974        assert_eq!(engine.event_buffer_count(), 1); // one group key
3975        assert!(engine.event_buffer_bytes() > 0);
3976    }
3977
3978    #[test]
3979    fn test_correlation_refs_mode_basic() {
3980        let yaml = r#"
3981title: Login
3982id: login-rule
3983logsource:
3984    category: auth
3985detection:
3986    selection:
3987        EventType: login
3988    condition: selection
3989---
3990title: Many Logins
3991correlation:
3992    type: event_count
3993    rules:
3994        - login-rule
3995    group-by:
3996        - User
3997    timespan: 60s
3998    condition:
3999        gte: 3
4000level: high
4001"#;
4002        let collection = parse_sigma_yaml(yaml).unwrap();
4003        let config = CorrelationConfig {
4004            correlation_event_mode: CorrelationEventMode::Refs,
4005            max_correlation_events: 10,
4006            ..Default::default()
4007        };
4008        let mut engine = CorrelationEngine::new(config);
4009        engine.add_collection(&collection).unwrap();
4010
4011        let mut corr_result = None;
4012        for i in 0..3 {
4013            let v = json!({"EventType": "login", "User": "admin", "id": format!("evt-{i}"), "@timestamp": 1000 + i});
4014            let event = JsonEvent::borrow(&v);
4015            let result = engine.process_event_at(&event, 1000 + i);
4016            if !result.correlations.is_empty() {
4017                corr_result = Some(result.correlations[0].clone());
4018            }
4019        }
4020
4021        let result = corr_result.expect("correlation should have fired");
4022        // In refs mode: events should be None, event_refs should be Some
4023        assert!(
4024            result.events.is_none(),
4025            "Full events should not be included in refs mode"
4026        );
4027        let refs = result
4028            .event_refs
4029            .expect("event_refs should be present in refs mode");
4030        assert_eq!(refs.len(), 3);
4031        assert_eq!(refs[0].timestamp, 1000);
4032        assert_eq!(refs[0].id, Some("evt-0".to_string()));
4033        assert_eq!(refs[1].id, Some("evt-1".to_string()));
4034        assert_eq!(refs[2].id, Some("evt-2".to_string()));
4035    }
4036
4037    #[test]
4038    fn test_correlation_refs_mode_no_id_field() {
4039        let yaml = r#"
4040title: Login
4041id: login-rule
4042logsource:
4043    category: auth
4044detection:
4045    selection:
4046        EventType: login
4047    condition: selection
4048---
4049title: Many Logins
4050correlation:
4051    type: event_count
4052    rules:
4053        - login-rule
4054    group-by:
4055        - User
4056    timespan: 60s
4057    condition:
4058        gte: 2
4059level: high
4060"#;
4061        let collection = parse_sigma_yaml(yaml).unwrap();
4062        let config = CorrelationConfig {
4063            correlation_event_mode: CorrelationEventMode::Refs,
4064            ..Default::default()
4065        };
4066        let mut engine = CorrelationEngine::new(config);
4067        engine.add_collection(&collection).unwrap();
4068
4069        let mut corr_result = None;
4070        for i in 0..2 {
4071            let v = json!({"EventType": "login", "User": "admin"});
4072            let event = JsonEvent::borrow(&v);
4073            let result = engine.process_event_at(&event, 1000 + i);
4074            if !result.correlations.is_empty() {
4075                corr_result = Some(result.correlations[0].clone());
4076            }
4077        }
4078
4079        let result = corr_result.expect("correlation should have fired");
4080        let refs = result.event_refs.expect("event_refs should be present");
4081        // No ID field in events → id should be None
4082        for r in &refs {
4083            assert_eq!(r.id, None);
4084        }
4085    }
4086
4087    #[test]
4088    fn test_per_correlation_custom_attributes_from_yaml() {
4089        let yaml = r#"
4090title: Login
4091id: login-rule
4092logsource:
4093    category: auth
4094detection:
4095    selection:
4096        EventType: login
4097    condition: selection
4098---
4099title: Many Logins
4100custom_attributes:
4101    rsigma.correlation_event_mode: refs
4102    rsigma.max_correlation_events: "5"
4103correlation:
4104    type: event_count
4105    rules:
4106        - login-rule
4107    group-by:
4108        - User
4109    timespan: 60s
4110    condition:
4111        gte: 3
4112level: high
4113"#;
4114        let collection = parse_sigma_yaml(yaml).unwrap();
4115        // Engine mode is None (default), but per-correlation should override to Refs
4116        let config = CorrelationConfig::default();
4117        let mut engine = CorrelationEngine::new(config);
4118        engine.add_collection(&collection).unwrap();
4119
4120        let mut corr_result = None;
4121        for i in 0..3 {
4122            let v = json!({"EventType": "login", "User": "admin", "id": format!("e{i}")});
4123            let event = JsonEvent::borrow(&v);
4124            let result = engine.process_event_at(&event, 1000 + i);
4125            if !result.correlations.is_empty() {
4126                corr_result = Some(result.correlations[0].clone());
4127            }
4128        }
4129
4130        let result = corr_result.expect("correlation should fire with per-correlation refs mode");
4131        // Per-correlation override should enable refs mode even though engine default is None
4132        assert!(result.events.is_none());
4133        let refs = result
4134            .event_refs
4135            .expect("event_refs via per-correlation override");
4136        assert_eq!(refs.len(), 3);
4137        assert_eq!(refs[0].id, Some("e0".to_string()));
4138    }
4139
4140    #[test]
4141    fn test_per_correlation_custom_attr_suppress_and_action() {
4142        let yaml = r#"
4143title: Login
4144id: login-rule
4145logsource:
4146    category: auth
4147detection:
4148    selection:
4149        EventType: login
4150    condition: selection
4151---
4152title: Many Logins
4153custom_attributes:
4154    rsigma.suppress: 10s
4155    rsigma.action: reset
4156correlation:
4157    type: event_count
4158    rules:
4159        - login-rule
4160    group-by:
4161        - User
4162    timespan: 60s
4163    condition:
4164        gte: 2
4165level: high
4166"#;
4167        let collection = parse_sigma_yaml(yaml).unwrap();
4168        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
4169        engine.add_collection(&collection).unwrap();
4170
4171        // Verify the compiled correlation has per-rule overrides
4172        assert_eq!(engine.correlations[0].suppress_secs, Some(10));
4173        assert_eq!(
4174            engine.correlations[0].action,
4175            Some(CorrelationAction::Reset)
4176        );
4177    }
4178
4179    #[test]
4180    fn test_process_with_detections_matches_process_event_at() {
4181        let yaml = r#"
4182title: Login Failure
4183id: login-fail
4184logsource:
4185    category: auth
4186detection:
4187    selection:
4188        EventType: login_failure
4189    condition: selection
4190---
4191title: Brute Force
4192correlation:
4193    type: event_count
4194    rules:
4195        - login-fail
4196    group-by:
4197        - User
4198    timespan: 60s
4199    condition:
4200        gte: 3
4201level: high
4202"#;
4203        let collection = parse_sigma_yaml(yaml).unwrap();
4204
4205        // Run with process_event_at
4206        let mut engine1 = CorrelationEngine::new(CorrelationConfig::default());
4207        engine1.add_collection(&collection).unwrap();
4208
4209        let events: Vec<serde_json::Value> = (0..5)
4210            .map(|i| json!({"EventType": "login_failure", "User": "admin", "@timestamp": format!("2025-01-01T00:00:0{}Z", i + 1)}))
4211            .collect();
4212
4213        let results1: Vec<ProcessResult> = events
4214            .iter()
4215            .enumerate()
4216            .map(|(i, v)| {
4217                let e = JsonEvent::borrow(v);
4218                engine1.process_event_at(&e, 1000 + i as i64)
4219            })
4220            .collect();
4221
4222        // Run with evaluate + process_with_detections
4223        let mut engine2 = CorrelationEngine::new(CorrelationConfig::default());
4224        engine2.add_collection(&collection).unwrap();
4225
4226        let results2: Vec<ProcessResult> = events
4227            .iter()
4228            .enumerate()
4229            .map(|(i, v)| {
4230                let e = JsonEvent::borrow(v);
4231                let detections = engine2.evaluate(&e);
4232                engine2.process_with_detections(&e, detections, 1000 + i as i64)
4233            })
4234            .collect();
4235
4236        // Same number of results
4237        assert_eq!(results1.len(), results2.len());
4238        for (r1, r2) in results1.iter().zip(results2.iter()) {
4239            assert_eq!(r1.detections.len(), r2.detections.len());
4240            assert_eq!(r1.correlations.len(), r2.correlations.len());
4241        }
4242    }
4243
4244    #[test]
4245    fn test_process_batch_matches_sequential() {
4246        let yaml = r#"
4247title: Login Failure
4248id: login-fail-batch
4249logsource:
4250    category: auth
4251detection:
4252    selection:
4253        EventType: login_failure
4254    condition: selection
4255---
4256title: Brute Force Batch
4257correlation:
4258    type: event_count
4259    rules:
4260        - login-fail-batch
4261    group-by:
4262        - User
4263    timespan: 60s
4264    condition:
4265        gte: 3
4266level: high
4267"#;
4268        let collection = parse_sigma_yaml(yaml).unwrap();
4269
4270        let event_values: Vec<serde_json::Value> = (0..5)
4271            .map(|i| json!({"EventType": "login_failure", "User": "admin", "@timestamp": format!("2025-01-01T00:00:0{}Z", i + 1)}))
4272            .collect();
4273
4274        // Sequential
4275        let mut engine1 = CorrelationEngine::new(CorrelationConfig::default());
4276        engine1.add_collection(&collection).unwrap();
4277        let sequential: Vec<ProcessResult> = event_values
4278            .iter()
4279            .enumerate()
4280            .map(|(i, v)| {
4281                let e = JsonEvent::borrow(v);
4282                engine1.process_event_at(&e, 1000 + i as i64)
4283            })
4284            .collect();
4285
4286        // Batch
4287        let mut engine2 = CorrelationEngine::new(CorrelationConfig::default());
4288        engine2.add_collection(&collection).unwrap();
4289        let events: Vec<JsonEvent> = event_values.iter().map(JsonEvent::borrow).collect();
4290        let refs: Vec<&JsonEvent> = events.iter().collect();
4291        let batch = engine2.process_batch(&refs);
4292
4293        assert_eq!(sequential.len(), batch.len());
4294        for (seq, bat) in sequential.iter().zip(batch.iter()) {
4295            assert_eq!(seq.detections.len(), bat.detections.len());
4296            assert_eq!(seq.correlations.len(), bat.correlations.len());
4297        }
4298    }
4299
4300    #[test]
4301    fn test_correlation_result_custom_attributes() {
4302        let yaml = r#"
4303title: Login
4304id: login-cra
4305logsource:
4306    category: auth
4307detection:
4308    selection:
4309        EventType: login
4310    condition: selection
4311level: low
4312---
4313title: Many Logins
4314my_custom_field: hello
4315priority: 9
4316nested:
4317    key: value
4318correlation:
4319    type: event_count
4320    rules:
4321        - login-cra
4322    group-by:
4323        - User
4324    timespan: 60s
4325    condition:
4326        gte: 2
4327level: high
4328"#;
4329        let collection = parse_sigma_yaml(yaml).unwrap();
4330        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
4331        engine.add_collection(&collection).unwrap();
4332
4333        let base_ts = 1000i64;
4334        for i in 0..2 {
4335            let v = json!({"EventType": "login", "User": "alice"});
4336            let event = JsonEvent::borrow(&v);
4337            let result = engine.process_event_at(&event, base_ts + i * 10);
4338
4339            if i == 1 {
4340                assert_eq!(result.correlations.len(), 1);
4341                let corr = &result.correlations[0];
4342                assert_eq!(corr.rule_title, "Many Logins");
4343                assert_eq!(
4344                    corr.custom_attributes.get("my_custom_field"),
4345                    Some(&serde_json::Value::String("hello".to_string()))
4346                );
4347                assert_eq!(
4348                    corr.custom_attributes.get("priority"),
4349                    Some(&serde_json::json!(9))
4350                );
4351                let nested = corr.custom_attributes.get("nested").unwrap();
4352                assert_eq!(nested.get("key"), Some(&serde_json::json!("value")));
4353
4354                assert!(!corr.custom_attributes.contains_key("title"));
4355                assert!(!corr.custom_attributes.contains_key("correlation"));
4356                assert!(!corr.custom_attributes.contains_key("level"));
4357            }
4358        }
4359    }
4360
4361    #[test]
4362    fn test_detection_result_custom_attributes() {
4363        let yaml = r#"
4364title: Login Detection
4365logsource:
4366    category: auth
4367detection:
4368    selection:
4369        EventType: login
4370    condition: selection
4371level: low
4372my_detection_tag: important
4373score: 42
4374"#;
4375        let collection = parse_sigma_yaml(yaml).unwrap();
4376        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
4377        engine.add_collection(&collection).unwrap();
4378
4379        let v = json!({"EventType": "login"});
4380        let event = JsonEvent::borrow(&v);
4381        let result = engine.process_event(&event);
4382
4383        assert_eq!(result.detections.len(), 1);
4384        let det = &result.detections[0];
4385        assert_eq!(
4386            det.custom_attributes.get("my_detection_tag"),
4387            Some(&serde_json::Value::String("important".to_string()))
4388        );
4389        assert_eq!(
4390            det.custom_attributes.get("score"),
4391            Some(&serde_json::json!(42))
4392        );
4393        assert!(!det.custom_attributes.contains_key("title"));
4394    }
4395}