Skip to main content

rsigma_eval/
correlation_engine.rs

1//! Stateful correlation engine with time-windowed aggregation.
2//!
3//! `CorrelationEngine` wraps the stateless `Engine` and adds support for
4//! Sigma correlation rules: `event_count`, `value_count`, `temporal`,
5//! `temporal_ordered`, `value_sum`, `value_avg`, `value_percentile`,
6//! and `value_median`.
7//!
8//! # Architecture
9//!
10//! 1. Events are first evaluated against detection rules (stateless)
11//! 2. Detection matches update correlation window state (stateful)
12//! 3. When a correlation condition is met, a `CorrelationResult` is emitted
13//! 4. Correlation results can chain into higher-level correlations
14
15use std::collections::HashMap;
16
17use chrono::{DateTime, TimeZone, Utc};
18use serde::Serialize;
19
20use rsigma_parser::{CorrelationRule, CorrelationType, Level, SigmaCollection, SigmaRule};
21
22use crate::correlation::{
23    CompiledCorrelation, EventBuffer, EventRef, EventRefBuffer, GroupKey, WindowState,
24    compile_correlation,
25};
26use crate::engine::Engine;
27use crate::error::{EvalError, Result};
28use crate::event::Event;
29use crate::pipeline::{Pipeline, apply_pipelines, apply_pipelines_to_correlation};
30use crate::result::MatchResult;
31
32// =============================================================================
33// Configuration
34// =============================================================================
35
/// What to do with window state after a correlation fires.
///
/// This is an engine-level default that can be overridden per-correlation
/// via the `rsigma.action` custom attribute set in processing pipelines.
/// Parsed from the strings `alert` / `reset` through its `FromStr` impl.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum CorrelationAction {
    /// Keep window state as-is after firing (the default behavior).
    /// Subsequent events that still satisfy the condition will re-fire.
    #[default]
    Alert,
    /// Clear the window state for the firing group key after emitting the alert.
    /// The threshold must be met again from scratch before the next alert.
    Reset,
}
51
52impl std::str::FromStr for CorrelationAction {
53    type Err = String;
54    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
55        match s {
56            "alert" => Ok(CorrelationAction::Alert),
57            "reset" => Ok(CorrelationAction::Reset),
58            _ => Err(format!(
59                "Unknown correlation action: {s} (expected 'alert' or 'reset')"
60            )),
61        }
62    }
63}
64
/// How to include events in correlation results.
///
/// Can be overridden per-correlation via the `rsigma.correlation_event_mode`
/// custom attribute. Parsed case-insensitively from `none`/`off`/`false`,
/// `full`/`true`, or `refs`/`references`; displayed as `none`, `full`, `refs`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum CorrelationEventMode {
    /// Don't include events (default). No per-event storage.
    #[default]
    None,
    /// Include full event bodies, individually compressed with deflate.
    /// Typical cost: 100–1000 bytes per event.
    Full,
    /// Include only event references (timestamp + optional ID).
    /// Minimal memory: ~40 bytes per event.
    Refs,
}
82
83impl std::str::FromStr for CorrelationEventMode {
84    type Err = String;
85    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
86        match s.to_lowercase().as_str() {
87            "none" | "off" | "false" => Ok(CorrelationEventMode::None),
88            "full" | "true" => Ok(CorrelationEventMode::Full),
89            "refs" | "references" => Ok(CorrelationEventMode::Refs),
90            _ => Err(format!(
91                "Unknown correlation event mode: {s} (expected 'none', 'full', or 'refs')"
92            )),
93        }
94    }
95}
96
97impl std::fmt::Display for CorrelationEventMode {
98    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
99        match self {
100            CorrelationEventMode::None => write!(f, "none"),
101            CorrelationEventMode::Full => write!(f, "full"),
102            CorrelationEventMode::Refs => write!(f, "refs"),
103        }
104    }
105}
106
/// Behavior when no timestamp field is found or parseable in an event.
///
/// Consulted by `process_event` and `process_batch` when none of the
/// configured `timestamp_fields` yields a timestamp.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum TimestampFallback {
    /// Use wall-clock time (`Utc::now()`). Good for real-time streaming.
    #[default]
    WallClock,
    /// Skip the event from correlation processing. Detections still fire,
    /// but the event does not update any correlation state. Recommended for
    /// batch/replay of historical logs where wall-clock time is meaningless.
    Skip,
}
118
/// Configuration for the correlation engine.
///
/// Provides engine-level defaults that mirror pySigma backend optional arguments.
/// Several of these settings can also be adjusted from rules via
/// `SetCustomAttribute` pipeline transformations using the `rsigma.*`
/// attribute namespace.
#[derive(Debug, Clone)]
pub struct CorrelationConfig {
    /// Field names to try for timestamp extraction, in order of priority.
    ///
    /// The engine will try each field until one yields a parseable timestamp.
    /// If none succeed, the `timestamp_fallback` policy applies.
    /// A rule's `rsigma.timestamp_field` attribute inserts its field at the
    /// front of this list, unless the field is already listed.
    pub timestamp_fields: Vec<String>,

    /// What to do when no timestamp can be extracted from an event.
    ///
    /// Default: `WallClock` (use `Utc::now()`).
    pub timestamp_fallback: TimestampFallback,

    /// Maximum number of per-(correlation, group key) window states before
    /// eviction is triggered. Prevents unbounded memory growth.
    ///
    /// Checked before each event's detections are fed into correlations:
    /// once the number of window states reaches this value, an eviction
    /// pass runs first.
    ///
    /// Default: 100_000.
    pub max_state_entries: usize,

    /// Default suppression window in seconds.
    ///
    /// After a correlation fires for a `(correlation, group_key)`, suppress
    /// re-alerts for this duration. `None` means no suppression (every
    /// condition-satisfying event produces an alert).
    ///
    /// Can be overridden per-correlation via the `rsigma.suppress` custom attribute.
    /// A rule's `rsigma.suppress` attribute also sets this engine-wide value
    /// while it is still `None`.
    pub suppress: Option<u64>,

    /// Default action to take after a correlation fires.
    ///
    /// Can be overridden per-correlation via the `rsigma.action` custom attribute.
    /// A rule's `rsigma.action` attribute also replaces this engine-wide value
    /// while it is still `Alert`.
    pub action_on_match: CorrelationAction,

    /// Whether to emit detection-level matches for rules that are only
    /// referenced by correlations (where `generate: false`).
    ///
    /// Default: `true` (emit all detection matches).
    /// Set to `false` to suppress detection output for correlation-only rules.
    pub emit_detections: bool,

    /// How to include contributing events in correlation results.
    ///
    /// - `None` (default): no event storage, zero overhead.
    /// - `Full`: events are deflate-compressed and decompressed on output.
    /// - `Refs`: only timestamps + event IDs are stored (minimal memory).
    ///
    /// Can be overridden per-correlation via `rsigma.correlation_event_mode`.
    pub correlation_event_mode: CorrelationEventMode,

    /// Maximum number of events to store per (correlation, group_key) window
    /// when `correlation_event_mode` is not `None`.
    ///
    /// Bounds memory at: `max_correlation_events × cost_per_event × active_groups`.
    /// Default: 10.
    pub max_correlation_events: usize,
}
180
181impl Default for CorrelationConfig {
182    fn default() -> Self {
183        CorrelationConfig {
184            timestamp_fields: vec![
185                "@timestamp".to_string(),
186                "timestamp".to_string(),
187                "EventTime".to_string(),
188                "TimeCreated".to_string(),
189                "eventTime".to_string(),
190            ],
191            timestamp_fallback: TimestampFallback::default(),
192            max_state_entries: 100_000,
193            suppress: None,
194            action_on_match: CorrelationAction::default(),
195            emit_detections: true,
196            correlation_event_mode: CorrelationEventMode::default(),
197            max_correlation_events: 10,
198        }
199    }
200}
201
202// =============================================================================
203// Result types
204// =============================================================================
205
/// Combined result from processing a single event.
#[derive(Debug, Clone, Serialize)]
pub struct ProcessResult {
    /// Detection rule matches (stateless, immediate), after dropping
    /// correlation-only rules when `emit_detections` is disabled.
    pub detections: Vec<MatchResult>,
    /// Correlation rule matches (stateful, accumulated). Empty when the event
    /// had no usable timestamp under `TimestampFallback::Skip`.
    pub correlations: Vec<CorrelationResult>,
}
214
/// The result of a correlation rule firing.
///
/// Returned in `ProcessResult::correlations`. The optional `events` /
/// `event_refs` fields are omitted from serialized output when absent.
#[derive(Debug, Clone, Serialize)]
pub struct CorrelationResult {
    /// Title of the correlation rule.
    pub rule_title: String,
    /// ID of the correlation rule (if present).
    pub rule_id: Option<String>,
    /// Severity level.
    pub level: Option<Level>,
    /// Tags from the correlation rule.
    pub tags: Vec<String>,
    /// Type of correlation.
    pub correlation_type: CorrelationType,
    /// Group-by field names and their values for this match.
    pub group_key: Vec<(String, String)>,
    /// The aggregated value that triggered the condition (count, sum, avg, etc.).
    pub aggregated_value: f64,
    /// The time window in seconds.
    pub timespan_secs: u64,
    /// Full event bodies, included when `correlation_event_mode` is `Full`.
    ///
    /// Contains up to `max_correlation_events` recently stored window events.
    /// Events are decompressed from deflate storage on output.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub events: Option<Vec<serde_json::Value>>,
    /// Lightweight event references, included when `correlation_event_mode` is `Refs`.
    ///
    /// Contains up to `max_correlation_events` timestamp + optional ID pairs.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub event_refs: Option<Vec<EventRef>>,
}
246
247// =============================================================================
248// Correlation Engine
249// =============================================================================
250
/// Stateful correlation engine.
///
/// Wraps the stateless `Engine` for detection rules and adds time-windowed
/// correlation on top. Supports the eight correlation types listed in the
/// module documentation (`event_count`, `value_count`, `temporal`,
/// `temporal_ordered`, `value_sum`, `value_avg`, `value_percentile`,
/// `value_median`), plus chaining correlations onto other correlations.
pub struct CorrelationEngine {
    /// Inner stateless detection engine.
    engine: Engine,
    /// Compiled correlation rules, in the order they were added.
    correlations: Vec<CompiledCorrelation>,
    /// Maps a referenced rule ID/name -> indices into `correlations` that
    /// reference it. This allows quick lookup: "which correlations care about rule X?"
    rule_index: HashMap<String, Vec<usize>>,
    /// `(rule_id, rule_name)` of each registered detection rule, in
    /// registration order. Used to resolve a match's rule name from its ID
    /// and to validate correlation references.
    rule_ids: Vec<(Option<String>, Option<String>)>,
    /// Per-(correlation_index, group_key) window state.
    state: HashMap<(usize, GroupKey), WindowState>,
    /// Last alert timestamp per (correlation_index, group_key) for suppression.
    last_alert: HashMap<(usize, GroupKey), i64>,
    /// Per-(correlation_index, group_key) compressed event buffer (`Full` mode).
    event_buffers: HashMap<(usize, GroupKey), EventBuffer>,
    /// Per-(correlation_index, group_key) event reference buffer (`Refs` mode).
    event_ref_buffers: HashMap<(usize, GroupKey), EventRefBuffer>,
    /// Set of detection rule IDs/names that are "correlation-only"
    /// (referenced by correlations where `generate == false`).
    /// Used to filter detection output when `config.emit_detections == false`.
    correlation_only_rules: std::collections::HashSet<String>,
    /// Configuration.
    config: CorrelationConfig,
    /// Processing pipelines applied to rules during add_rule, kept sorted by priority.
    pipelines: Vec<Pipeline>,
}
283
284impl CorrelationEngine {
285    /// Create a new correlation engine with the given configuration.
286    pub fn new(config: CorrelationConfig) -> Self {
287        CorrelationEngine {
288            engine: Engine::new(),
289            correlations: Vec::new(),
290            rule_index: HashMap::new(),
291            rule_ids: Vec::new(),
292            state: HashMap::new(),
293            last_alert: HashMap::new(),
294            event_buffers: HashMap::new(),
295            event_ref_buffers: HashMap::new(),
296            correlation_only_rules: std::collections::HashSet::new(),
297            config,
298            pipelines: Vec::new(),
299        }
300    }
301
302    /// Add a pipeline to the engine.
303    ///
304    /// Pipelines are applied to rules during `add_rule` / `add_collection`.
305    pub fn add_pipeline(&mut self, pipeline: Pipeline) {
306        self.pipelines.push(pipeline);
307        self.pipelines.sort_by_key(|p| p.priority);
308    }
309
310    /// Set global `include_event` on the inner detection engine.
311    pub fn set_include_event(&mut self, include: bool) {
312        self.engine.set_include_event(include);
313    }
314
315    /// Set the global correlation event mode.
316    ///
317    /// - `None`: no event storage (default)
318    /// - `Full`: compressed event bodies
319    /// - `Refs`: lightweight timestamp + ID references
320    pub fn set_correlation_event_mode(&mut self, mode: CorrelationEventMode) {
321        self.config.correlation_event_mode = mode;
322    }
323
324    /// Set the maximum number of events to store per correlation window group.
325    /// Only meaningful when `correlation_event_mode` is not `None`.
326    pub fn set_max_correlation_events(&mut self, max: usize) {
327        self.config.max_correlation_events = max;
328    }
329
330    /// Add a single detection rule.
331    ///
332    /// If pipelines are set, the rule is cloned and transformed before compilation.
333    /// The inner engine receives the already-transformed rule directly (not through
334    /// its own pipeline, to avoid double transformation).
335    pub fn add_rule(&mut self, rule: &SigmaRule) -> Result<()> {
336        if self.pipelines.is_empty() {
337            self.apply_custom_attributes(&rule.custom_attributes);
338            self.rule_ids.push((rule.id.clone(), rule.name.clone()));
339            self.engine.add_rule(rule)?;
340        } else {
341            let mut transformed = rule.clone();
342            apply_pipelines(&self.pipelines, &mut transformed)?;
343            self.apply_custom_attributes(&transformed.custom_attributes);
344            self.rule_ids
345                .push((transformed.id.clone(), transformed.name.clone()));
346            // Use compile_rule + add_compiled_rule to bypass inner engine's pipelines
347            let compiled = crate::compiler::compile_rule(&transformed)?;
348            self.engine.add_compiled_rule(compiled);
349        }
350        Ok(())
351    }
352
353    /// Read `rsigma.*` custom attributes from a rule and apply them to the
354    /// engine configuration.  This allows pipelines to influence engine
355    /// behaviour via `SetCustomAttribute` transformations — the same pattern
356    /// used by pySigma backends (e.g. pySigma-backend-loki).
357    ///
358    /// Supported attributes:
359    /// - `rsigma.timestamp_field` — prepends a field name to the timestamp
360    ///   extraction priority list so the correlation engine can find the
361    ///   event timestamp in non-standard field names.
362    /// - `rsigma.suppress` — sets the default suppression window (e.g. `5m`).
363    ///   Only applied when the CLI did not already set `--suppress`.
364    /// - `rsigma.action` — sets the default post-fire action (`alert`/`reset`).
365    ///   Only applied when the CLI did not already set `--action`.
366    fn apply_custom_attributes(&mut self, attrs: &std::collections::HashMap<String, String>) {
367        // rsigma.timestamp_field — prepend to priority list, skip duplicates
368        if let Some(field) = attrs.get("rsigma.timestamp_field")
369            && !self.config.timestamp_fields.contains(field)
370        {
371            self.config.timestamp_fields.insert(0, field.clone());
372        }
373
374        // rsigma.suppress — only when CLI didn't already set one
375        if let Some(val) = attrs.get("rsigma.suppress")
376            && self.config.suppress.is_none()
377            && let Ok(ts) = rsigma_parser::Timespan::parse(val)
378        {
379            self.config.suppress = Some(ts.seconds);
380        }
381
382        // rsigma.action — only when CLI left it at the default (Alert)
383        if let Some(val) = attrs.get("rsigma.action")
384            && self.config.action_on_match == CorrelationAction::Alert
385            && let Ok(a) = val.parse::<CorrelationAction>()
386        {
387            self.config.action_on_match = a;
388        }
389    }
390
391    /// Add a single correlation rule.
392    pub fn add_correlation(&mut self, corr: &CorrelationRule) -> Result<()> {
393        let owned;
394        let effective = if self.pipelines.is_empty() {
395            corr
396        } else {
397            owned = {
398                let mut c = corr.clone();
399                apply_pipelines_to_correlation(&self.pipelines, &mut c)?;
400                c
401            };
402            &owned
403        };
404
405        // Apply engine-level custom attributes from the (possibly transformed)
406        // correlation rule (e.g. rsigma.timestamp_field).
407        self.apply_custom_attributes(&effective.custom_attributes);
408
409        let compiled = compile_correlation(effective)?;
410        let idx = self.correlations.len();
411
412        // Index by each referenced rule ID/name
413        for rule_ref in &compiled.rule_refs {
414            self.rule_index
415                .entry(rule_ref.clone())
416                .or_default()
417                .push(idx);
418        }
419
420        // Track correlation-only rules (generate == false is the default)
421        if !compiled.generate {
422            for rule_ref in &compiled.rule_refs {
423                self.correlation_only_rules.insert(rule_ref.clone());
424            }
425        }
426
427        self.correlations.push(compiled);
428        Ok(())
429    }
430
431    /// Add all rules and correlations from a parsed collection.
432    ///
433    /// Detection rules are added first (so they're available for correlation
434    /// references), then correlation rules.
435    pub fn add_collection(&mut self, collection: &SigmaCollection) -> Result<()> {
436        for rule in &collection.rules {
437            self.add_rule(rule)?;
438        }
439        // Apply filter rules to the inner engine's detection rules
440        for filter in &collection.filters {
441            self.engine.apply_filter(filter)?;
442        }
443        for corr in &collection.correlations {
444            self.add_correlation(corr)?;
445        }
446        self.validate_rule_refs()?;
447        self.detect_correlation_cycles()?;
448        Ok(())
449    }
450
451    /// Validate that every correlation's `rule_refs` resolve to at least one
452    /// known detection rule (by ID or name) or another correlation (by ID or name).
453    fn validate_rule_refs(&self) -> Result<()> {
454        let mut known: std::collections::HashSet<&str> = std::collections::HashSet::new();
455
456        for (id, name) in &self.rule_ids {
457            if let Some(id) = id {
458                known.insert(id.as_str());
459            }
460            if let Some(name) = name {
461                known.insert(name.as_str());
462            }
463        }
464        for corr in &self.correlations {
465            if let Some(ref id) = corr.id {
466                known.insert(id.as_str());
467            }
468            if let Some(ref name) = corr.name {
469                known.insert(name.as_str());
470            }
471        }
472
473        for corr in &self.correlations {
474            for rule_ref in &corr.rule_refs {
475                if !known.contains(rule_ref.as_str()) {
476                    return Err(EvalError::UnknownRuleRef(rule_ref.clone()));
477                }
478            }
479        }
480        Ok(())
481    }
482
483    /// Detect cycles in the correlation reference graph.
484    ///
485    /// Builds a directed graph where each correlation (identified by its id/name)
486    /// has edges to the correlations it references via `rule_refs`. Uses DFS with
487    /// a "gray/black" coloring scheme to detect back-edges (cycles).
488    ///
489    /// Returns `Err(EvalError::CorrelationCycle)` if a cycle is found.
490    fn detect_correlation_cycles(&self) -> Result<()> {
491        // Build a set of all correlation identifiers (id and/or name)
492        let mut corr_identifiers: HashMap<&str, usize> = HashMap::new();
493        for (idx, corr) in self.correlations.iter().enumerate() {
494            if let Some(ref id) = corr.id {
495                corr_identifiers.insert(id.as_str(), idx);
496            }
497            if let Some(ref name) = corr.name {
498                corr_identifiers.insert(name.as_str(), idx);
499            }
500        }
501
502        // Build adjacency list: corr index → set of corr indices it references
503        let mut adj: Vec<Vec<usize>> = vec![Vec::new(); self.correlations.len()];
504        for (idx, corr) in self.correlations.iter().enumerate() {
505            for rule_ref in &corr.rule_refs {
506                if let Some(&target_idx) = corr_identifiers.get(rule_ref.as_str()) {
507                    adj[idx].push(target_idx);
508                }
509            }
510        }
511
512        // DFS cycle detection with three states: white (unvisited), gray (in stack), black (done)
513        let mut state = vec![0u8; self.correlations.len()]; // 0=white, 1=gray, 2=black
514        let mut path: Vec<usize> = Vec::new();
515
516        for start in 0..self.correlations.len() {
517            if state[start] == 0
518                && let Some(cycle) = Self::dfs_find_cycle(start, &adj, &mut state, &mut path)
519            {
520                let names: Vec<String> = cycle
521                    .iter()
522                    .map(|&i| {
523                        self.correlations[i]
524                            .id
525                            .as_deref()
526                            .or(self.correlations[i].name.as_deref())
527                            .unwrap_or(&self.correlations[i].title)
528                            .to_string()
529                    })
530                    .collect();
531                return Err(crate::error::EvalError::CorrelationCycle(
532                    names.join(" -> "),
533                ));
534            }
535        }
536        Ok(())
537    }
538
539    /// DFS helper that returns the cycle path if a back-edge is found.
540    fn dfs_find_cycle(
541        node: usize,
542        adj: &[Vec<usize>],
543        state: &mut [u8],
544        path: &mut Vec<usize>,
545    ) -> Option<Vec<usize>> {
546        state[node] = 1; // gray
547        path.push(node);
548
549        for &next in &adj[node] {
550            if state[next] == 1 {
551                // Back-edge found — extract cycle from path
552                if let Some(pos) = path.iter().position(|&n| n == next) {
553                    let mut cycle = path[pos..].to_vec();
554                    cycle.push(next); // close the cycle
555                    return Some(cycle);
556                }
557            }
558            if state[next] == 0
559                && let Some(cycle) = Self::dfs_find_cycle(next, adj, state, path)
560            {
561                return Some(cycle);
562            }
563        }
564
565        path.pop();
566        state[node] = 2; // black
567        None
568    }
569
570    /// Process an event, extracting the timestamp from configured event fields.
571    ///
572    /// When no timestamp field is found, the `timestamp_fallback` policy applies:
573    /// - `WallClock`: use `Utc::now()` (good for real-time streaming)
574    /// - `Skip`: return detections only, skip correlation state updates
575    pub fn process_event(&mut self, event: &Event) -> ProcessResult {
576        let all_detections = self.engine.evaluate(event);
577
578        let ts = match self.extract_event_timestamp(event) {
579            Some(ts) => ts,
580            None => match self.config.timestamp_fallback {
581                TimestampFallback::WallClock => Utc::now().timestamp(),
582                TimestampFallback::Skip => {
583                    // Still run detection (stateless), but skip correlation
584                    let detections = self.filter_detections(all_detections);
585                    return ProcessResult {
586                        detections,
587                        correlations: Vec::new(),
588                    };
589                }
590            },
591        };
592        self.process_with_detections(event, all_detections, ts)
593    }
594
595    /// Process an event with an explicit Unix epoch timestamp (seconds).
596    ///
597    /// The timestamp is clamped to `[0, i64::MAX / 2]` to prevent overflow
598    /// when adding timespan durations internally.
599    pub fn process_event_at(&mut self, event: &Event, timestamp_secs: i64) -> ProcessResult {
600        let all_detections = self.engine.evaluate(event);
601        self.process_with_detections(event, all_detections, timestamp_secs)
602    }
603
604    /// Process an event with pre-computed detection results.
605    ///
606    /// Enables external parallelism: callers can run detection (via
607    /// [`evaluate`](Self::evaluate)) in parallel, then feed results here
608    /// sequentially for stateful correlation.
609    pub fn process_with_detections(
610        &mut self,
611        event: &Event,
612        all_detections: Vec<MatchResult>,
613        timestamp_secs: i64,
614    ) -> ProcessResult {
615        let timestamp_secs = timestamp_secs.clamp(0, i64::MAX / 2);
616
617        // Memory management — evict before adding new state to enforce limit
618        if self.state.len() >= self.config.max_state_entries {
619            self.evict_all(timestamp_secs);
620        }
621
622        // Feed detection matches into correlations
623        let mut correlations = Vec::new();
624        self.feed_detections(event, &all_detections, timestamp_secs, &mut correlations);
625
626        // Chain — correlation results may trigger higher-level correlations
627        self.chain_correlations(&correlations, timestamp_secs);
628
629        // Filter detections by generate flag
630        let detections = self.filter_detections(all_detections);
631
632        ProcessResult {
633            detections,
634            correlations,
635        }
636    }
637
638    /// Run stateless detection only (no correlation), delegating to the inner engine.
639    ///
640    /// Takes `&self` so it can be called concurrently from multiple threads
641    /// (e.g. via `rayon::par_iter`) while the mutable correlation phase runs
642    /// sequentially afterwards.
643    pub fn evaluate(&self, event: &Event) -> Vec<MatchResult> {
644        self.engine.evaluate(event)
645    }
646
647    /// Process a batch of events: parallel detection, then sequential correlation.
648    ///
649    /// When the `parallel` feature is enabled, the stateless detection phase runs
650    /// concurrently via rayon. Timestamp extraction also runs in the parallel
651    /// phase (it borrows `&self.config` immutably). After `collect()` releases the
652    /// immutable borrows, each event's pre-computed detections are fed into the
653    /// stateful correlation engine sequentially.
654    pub fn process_batch<'a>(&mut self, events: &[&'a Event<'a>]) -> Vec<ProcessResult> {
655        // Borrow split: take immutable refs to fields needed for the parallel phase.
656        // These are released by collect() before the sequential &mut self phase.
657        let engine = &self.engine;
658        let ts_fields = &self.config.timestamp_fields;
659
660        let batch_results: Vec<(Vec<MatchResult>, Option<i64>)> = {
661            #[cfg(feature = "parallel")]
662            {
663                use rayon::prelude::*;
664                events
665                    .par_iter()
666                    .map(|e| {
667                        let detections = engine.evaluate(e);
668                        let ts = extract_event_ts(e, ts_fields);
669                        (detections, ts)
670                    })
671                    .collect()
672            }
673            #[cfg(not(feature = "parallel"))]
674            {
675                events
676                    .iter()
677                    .map(|e| {
678                        let detections = engine.evaluate(e);
679                        let ts = extract_event_ts(e, ts_fields);
680                        (detections, ts)
681                    })
682                    .collect()
683            }
684        };
685
686        // Sequential correlation phase
687        let mut results = Vec::with_capacity(events.len());
688        for ((detections, ts_opt), event) in batch_results.into_iter().zip(events) {
689            match ts_opt {
690                Some(ts) => {
691                    results.push(self.process_with_detections(event, detections, ts));
692                }
693                None => match self.config.timestamp_fallback {
694                    TimestampFallback::WallClock => {
695                        let ts = Utc::now().timestamp();
696                        results.push(self.process_with_detections(event, detections, ts));
697                    }
698                    TimestampFallback::Skip => {
699                        // Still return detection results, but skip correlation
700                        let detections = self.filter_detections(detections);
701                        results.push(ProcessResult {
702                            detections,
703                            correlations: Vec::new(),
704                        });
705                    }
706                },
707            }
708        }
709        results
710    }
711
712    /// Filter detections by the `generate` flag / `emit_detections` config.
713    ///
714    /// If `emit_detections` is false and some rules are correlation-only,
715    /// their detection output is suppressed.
716    fn filter_detections(&self, all_detections: Vec<MatchResult>) -> Vec<MatchResult> {
717        if !self.config.emit_detections && !self.correlation_only_rules.is_empty() {
718            all_detections
719                .into_iter()
720                .filter(|m| {
721                    let id_match = m
722                        .rule_id
723                        .as_ref()
724                        .is_some_and(|id| self.correlation_only_rules.contains(id));
725                    !id_match
726                })
727                .collect()
728        } else {
729            all_detections
730        }
731    }
732
733    /// Feed detection matches into correlation window states.
734    fn feed_detections(
735        &mut self,
736        event: &Event,
737        detections: &[MatchResult],
738        ts: i64,
739        out: &mut Vec<CorrelationResult>,
740    ) {
741        // Collect all (corr_idx, rule_id, rule_name) tuples upfront to avoid
742        // borrow conflicts between self.rule_ids and self.update_correlation.
743        let mut work: Vec<(usize, Option<String>, Option<String>)> = Vec::new();
744
745        for det in detections {
746            // Use the MatchResult's rule_id to find the original rule's ID/name.
747            // We also look up by rule_id in our rule_ids table for the name.
748            let (rule_id, rule_name) = self.find_rule_identity(det);
749
750            // Collect correlation indices that reference this rule
751            let mut corr_indices = Vec::new();
752            if let Some(ref id) = rule_id
753                && let Some(indices) = self.rule_index.get(id)
754            {
755                corr_indices.extend(indices);
756            }
757            if let Some(ref name) = rule_name
758                && let Some(indices) = self.rule_index.get(name)
759            {
760                corr_indices.extend(indices);
761            }
762
763            corr_indices.sort_unstable();
764            corr_indices.dedup();
765
766            for &corr_idx in &corr_indices {
767                work.push((corr_idx, rule_id.clone(), rule_name.clone()));
768            }
769        }
770
771        for (corr_idx, rule_id, rule_name) in work {
772            self.update_correlation(corr_idx, event, ts, &rule_id, &rule_name, out);
773        }
774    }
775
776    /// Find the (id, name) for a detection match by searching our rule_ids table.
777    fn find_rule_identity(&self, det: &MatchResult) -> (Option<String>, Option<String>) {
778        // First, try to find by matching rule_id in our table
779        if let Some(ref match_id) = det.rule_id {
780            for (id, name) in &self.rule_ids {
781                if id.as_deref() == Some(match_id.as_str()) {
782                    return (id.clone(), name.clone());
783                }
784            }
785        }
786        // Fall back to using just the MatchResult's rule_id
787        (det.rule_id.clone(), None)
788    }
789
790    /// Resolve the event mode for a given correlation.
791    fn resolve_event_mode(&self, corr_idx: usize) -> CorrelationEventMode {
792        let corr = &self.correlations[corr_idx];
793        corr.event_mode
794            .unwrap_or(self.config.correlation_event_mode)
795    }
796
797    /// Resolve the max events cap for a given correlation.
798    fn resolve_max_events(&self, corr_idx: usize) -> usize {
799        let corr = &self.correlations[corr_idx];
800        corr.max_events
801            .unwrap_or(self.config.max_correlation_events)
802    }
803
    /// Update a single correlation's state and check its condition.
    ///
    /// Applies one detection (from the rule identified by `rule_id` /
    /// `rule_name`) to correlation `corr_idx`, in this order:
    /// 1. Resolve per-correlation settings (suppression, action, event mode,
    ///    max events), falling back to the engine config where unset.
    /// 2. Extract the group key and get or create the window state for
    ///    `(corr_idx, group_key)`.
    /// 3. Evict entries older than `ts - timespan`, then record this event in
    ///    the state; what is recorded depends on the correlation type.
    /// 4. Buffer the event (full, as a reference, or not at all) per event mode.
    /// 5. Check the condition. If it fires and is not suppressed, push a
    ///    `CorrelationResult` to `out`, record the alert time, and clear the
    ///    group's state and buffers when the action is `Reset`.
    fn update_correlation(
        &mut self,
        corr_idx: usize,
        event: &Event,
        ts: i64,
        rule_id: &Option<String>,
        rule_name: &Option<String>,
        out: &mut Vec<CorrelationResult>,
    ) {
        // Borrow the correlation by reference — no cloning needed.  Rust allows
        // simultaneous &self.correlations and &mut self.state / &mut self.last_alert
        // because they are disjoint struct fields.
        let corr = &self.correlations[corr_idx];
        let corr_type = corr.correlation_type;
        let timespan = corr.timespan_secs;
        let level = corr.level;
        // Per-correlation overrides win; otherwise fall back to engine-wide config.
        let suppress_secs = corr.suppress_secs.or(self.config.suppress);
        let action = corr.action.unwrap_or(self.config.action_on_match);
        let event_mode = self.resolve_event_mode(corr_idx);
        let max_events = self.resolve_max_events(corr_idx);

        // Determine the rule_ref strings for alias resolution and temporal tracking.
        let mut ref_strs: Vec<&str> = Vec::new();
        if let Some(id) = rule_id.as_deref() {
            ref_strs.push(id);
        }
        if let Some(name) = rule_name.as_deref() {
            ref_strs.push(name);
        }
        // Temporal tracking records a single reference: the rule ID when
        // present, else its name, else the empty string.
        let rule_ref = rule_id.as_deref().or(rule_name.as_deref()).unwrap_or("");

        // Extract group key
        let group_key = GroupKey::extract(event, &corr.group_by, &ref_strs);

        // Get or create window state
        let state_key = (corr_idx, group_key.clone());
        let state = self
            .state
            .entry(state_key.clone())
            .or_insert_with(|| WindowState::new_for(corr_type));

        // Evict expired entries (older than `timespan` seconds before this event).
        let cutoff = ts - timespan as i64;
        state.evict(cutoff);

        // Push the new event into the state
        match corr_type {
            CorrelationType::EventCount => {
                state.push_event_count(ts);
            }
            CorrelationType::ValueCount => {
                // Events missing the field, or whose value is not a scalar, are
                // not counted here (they are still buffered below).
                if let Some(ref field_name) = corr.condition.field
                    && let Some(val) = event.get_field(field_name)
                    && let Some(s) = value_to_string_for_count(val)
                {
                    state.push_value_count(ts, s);
                }
            }
            CorrelationType::Temporal | CorrelationType::TemporalOrdered => {
                state.push_temporal(ts, rule_ref);
            }
            CorrelationType::ValueSum
            | CorrelationType::ValueAvg
            | CorrelationType::ValuePercentile
            | CorrelationType::ValueMedian => {
                // Events whose field is missing or not numeric contribute no sample.
                if let Some(ref field_name) = corr.condition.field
                    && let Some(val) = event.get_field(field_name)
                    && let Some(n) = value_to_f64(val)
                {
                    state.push_numeric(ts, n);
                }
            }
        }

        // Push event into buffer based on event mode
        match event_mode {
            CorrelationEventMode::Full => {
                let buf = self
                    .event_buffers
                    .entry(state_key.clone())
                    .or_insert_with(|| EventBuffer::new(max_events));
                buf.evict(cutoff);
                buf.push(ts, event.as_value());
            }
            CorrelationEventMode::Refs => {
                let buf = self
                    .event_ref_buffers
                    .entry(state_key.clone())
                    .or_insert_with(|| EventRefBuffer::new(max_events));
                buf.evict(cutoff);
                buf.push(ts, event.as_value());
            }
            CorrelationEventMode::None => {}
        }

        // Check condition — after this, `state` is no longer used (NLL drops the borrow).
        let fired = state.check_condition(
            &corr.condition,
            corr_type,
            &corr.rule_refs,
            corr.extended_expr.as_ref(),
        );

        if let Some(agg_value) = fired {
            let alert_key = (corr_idx, group_key.clone());

            // Suppression check: skip if we've already alerted within the suppress window
            // (tracked per correlation and group key via `last_alert`).
            let suppressed = if let Some(suppress) = suppress_secs {
                if let Some(&last_ts) = self.last_alert.get(&alert_key) {
                    (ts - last_ts) < suppress as i64
                } else {
                    false
                }
            } else {
                false
            };

            if !suppressed {
                // Retrieve stored events / refs based on mode
                let (events, event_refs) = match event_mode {
                    CorrelationEventMode::Full => {
                        let stored = self
                            .event_buffers
                            .get(&alert_key)
                            .map(|buf| buf.decompress_all())
                            .unwrap_or_default();
                        (Some(stored), None)
                    }
                    CorrelationEventMode::Refs => {
                        let stored = self
                            .event_ref_buffers
                            .get(&alert_key)
                            .map(|buf| buf.refs())
                            .unwrap_or_default();
                        (None, Some(stored))
                    }
                    CorrelationEventMode::None => (None, None),
                };

                // Only clone title/id/tags when we actually produce output
                let corr = &self.correlations[corr_idx];
                let result = CorrelationResult {
                    rule_title: corr.title.clone(),
                    rule_id: corr.id.clone(),
                    level,
                    tags: corr.tags.clone(),
                    correlation_type: corr_type,
                    group_key: group_key.to_pairs(&corr.group_by),
                    aggregated_value: agg_value,
                    timespan_secs: timespan,
                    events,
                    event_refs,
                };
                out.push(result);

                // Record alert time for suppression
                self.last_alert.insert(alert_key.clone(), ts);

                // Action on match. `Reset` clears only this group's window state
                // and buffers; `last_alert` is kept so suppression still applies.
                if action == CorrelationAction::Reset {
                    if let Some(state) = self.state.get_mut(&alert_key) {
                        state.clear();
                    }
                    if let Some(buf) = self.event_buffers.get_mut(&alert_key) {
                        buf.clear();
                    }
                    if let Some(buf) = self.event_ref_buffers.get_mut(&alert_key) {
                        buf.clear();
                    }
                }
            }
        }
    }
978
979    /// Propagate correlation results to higher-level correlations (chaining).
980    ///
981    /// When a correlation fires, any correlation that references it (by ID or name)
982    /// is updated. Limits chain depth to 10 to prevent infinite loops.
983    fn chain_correlations(&mut self, fired: &[CorrelationResult], ts: i64) {
984        const MAX_CHAIN_DEPTH: usize = 10;
985        let mut pending: Vec<CorrelationResult> = fired.to_vec();
986        let mut depth = 0;
987
988        while !pending.is_empty() && depth < MAX_CHAIN_DEPTH {
989            depth += 1;
990
991            // Collect work items: (corr_idx, group_key_pairs, fired_ref)
992            #[allow(clippy::type_complexity)]
993            let mut work: Vec<(usize, Vec<(String, String)>, String)> = Vec::new();
994            for result in &pending {
995                if let Some(ref id) = result.rule_id
996                    && let Some(indices) = self.rule_index.get(id)
997                {
998                    let fired_ref = result
999                        .rule_id
1000                        .as_deref()
1001                        .unwrap_or(&result.rule_title)
1002                        .to_string();
1003                    for &corr_idx in indices {
1004                        work.push((corr_idx, result.group_key.clone(), fired_ref.clone()));
1005                    }
1006                }
1007            }
1008
1009            let mut next_pending = Vec::new();
1010            for (corr_idx, group_key_pairs, fired_ref) in work {
1011                let corr = &self.correlations[corr_idx];
1012                let corr_type = corr.correlation_type;
1013                let timespan = corr.timespan_secs;
1014                let level = corr.level;
1015
1016                let group_key = GroupKey::from_pairs(&group_key_pairs, &corr.group_by);
1017                let state_key = (corr_idx, group_key.clone());
1018                let state = self
1019                    .state
1020                    .entry(state_key)
1021                    .or_insert_with(|| WindowState::new_for(corr_type));
1022
1023                let cutoff = ts - timespan as i64;
1024                state.evict(cutoff);
1025
1026                match corr_type {
1027                    CorrelationType::EventCount => {
1028                        state.push_event_count(ts);
1029                    }
1030                    CorrelationType::Temporal | CorrelationType::TemporalOrdered => {
1031                        state.push_temporal(ts, &fired_ref);
1032                    }
1033                    _ => {
1034                        state.push_event_count(ts);
1035                    }
1036                }
1037
1038                let fired = state.check_condition(
1039                    &corr.condition,
1040                    corr_type,
1041                    &corr.rule_refs,
1042                    corr.extended_expr.as_ref(),
1043                );
1044
1045                if let Some(agg_value) = fired {
1046                    let corr = &self.correlations[corr_idx];
1047                    next_pending.push(CorrelationResult {
1048                        rule_title: corr.title.clone(),
1049                        rule_id: corr.id.clone(),
1050                        level,
1051                        tags: corr.tags.clone(),
1052                        correlation_type: corr_type,
1053                        group_key: group_key.to_pairs(&corr.group_by),
1054                        aggregated_value: agg_value,
1055                        timespan_secs: timespan,
1056                        // Chained correlations don't include events (they aggregate
1057                        // over correlation results, not raw events)
1058                        events: None,
1059                        event_refs: None,
1060                    });
1061                }
1062            }
1063
1064            pending = next_pending;
1065        }
1066
1067        if !pending.is_empty() {
1068            log::warn!(
1069                "Correlation chain depth limit reached ({MAX_CHAIN_DEPTH}); \
1070                 {} pending result(s) were not propagated further. \
1071                 This may indicate a cycle in correlation references.",
1072                pending.len()
1073            );
1074        }
1075    }
1076
1077    // =========================================================================
1078    // Timestamp extraction
1079    // =========================================================================
1080
1081    /// Extract a Unix epoch timestamp (seconds) from an event.
1082    ///
1083    /// Tries each configured timestamp field in order. Supports:
1084    /// - Numeric values (epoch seconds, or epoch millis if > 1e12)
1085    /// - ISO 8601 strings (e.g., "2024-07-10T12:30:00Z")
1086    ///
1087    /// Returns `None` if no field yields a valid timestamp.
1088    fn extract_event_timestamp(&self, event: &Event) -> Option<i64> {
1089        for field_name in &self.config.timestamp_fields {
1090            if let Some(val) = event.get_field(field_name)
1091                && let Some(ts) = parse_timestamp_value(val)
1092            {
1093                return Some(ts);
1094            }
1095        }
1096        None
1097    }
1098
1099    // =========================================================================
1100    // State management
1101    // =========================================================================
1102
1103    /// Manually evict all expired state entries.
1104    pub fn evict_expired(&mut self, now_secs: i64) {
1105        self.evict_all(now_secs);
1106    }
1107
    /// Evict expired entries and remove empty states.
    ///
    /// Runs three phases in order:
    /// 1. Time-based eviction of window state and both event-buffer maps, using
    ///    each correlation's own timespan relative to `now_secs`. Entries left
    ///    empty are removed.
    /// 2. A hard cap: if `state` still holds `max_state_entries` or more entries,
    ///    the stalest are dropped until it is at 90% of the cap.
    /// 3. Expiry of `last_alert` records whose suppress window has elapsed.
    fn evict_all(&mut self, now_secs: i64) {
        // Phase 1: Time-based eviction — remove entries outside their correlation window
        // Timespans are collected once and shared by the three `retain` passes.
        // Keys whose correlation index is out of range are not time-evicted;
        // they are dropped only if already empty.
        let timespans: Vec<u64> = self.correlations.iter().map(|c| c.timespan_secs).collect();

        self.state.retain(|&(corr_idx, _), state| {
            if corr_idx < timespans.len() {
                let cutoff = now_secs - timespans[corr_idx] as i64;
                state.evict(cutoff);
            }
            !state.is_empty()
        });

        // Evict event buffers in sync with window state
        self.event_buffers.retain(|&(corr_idx, _), buf| {
            if corr_idx < timespans.len() {
                let cutoff = now_secs - timespans[corr_idx] as i64;
                buf.evict(cutoff);
            }
            !buf.is_empty()
        });
        self.event_ref_buffers.retain(|&(corr_idx, _), buf| {
            if corr_idx < timespans.len() {
                let cutoff = now_secs - timespans[corr_idx] as i64;
                buf.evict(cutoff);
            }
            !buf.is_empty()
        });

        // Phase 2: Hard cap — if still over limit after time-based eviction (e.g.
        // high-cardinality traffic with long windows), drop the stalest entries
        // until we're at 90% capacity to avoid evicting on every single event.
        if self.state.len() >= self.config.max_state_entries {
            let target = self.config.max_state_entries * 9 / 10;
            let excess = self.state.len() - target;

            // Collect keys with their latest timestamp, sort by oldest first.
            // Entries with no timestamp map to i64::MIN and are evicted first.
            let mut by_staleness: Vec<_> = self
                .state
                .iter()
                .map(|(k, v)| (k.clone(), v.latest_timestamp().unwrap_or(i64::MIN)))
                .collect();
            by_staleness.sort_unstable_by_key(|&(_, ts)| ts);

            // Drop the oldest entries (and their associated event buffers).
            // Their `last_alert` records go too, so suppression no longer
            // applies to those group keys.
            for (key, _) in by_staleness.into_iter().take(excess) {
                self.state.remove(&key);
                self.last_alert.remove(&key);
                self.event_buffers.remove(&key);
                self.event_ref_buffers.remove(&key);
            }
        }

        // Phase 3: Evict stale last_alert entries — keep a record only while its
        // suppress window (the correlation's `suppress_secs`, else the engine-wide
        // `suppress`) is still open. With no suppression configured, or for an
        // index that no longer maps to a loaded correlation, the window is 0 and
        // the record is dropped. This does NOT check whether window state still
        // exists, so suppression survives state eviction and `Reset`.
        self.last_alert.retain(|key, &mut alert_ts| {
            let suppress = if key.0 < self.correlations.len() {
                self.correlations[key.0]
                    .suppress_secs
                    .or(self.config.suppress)
                    .unwrap_or(0)
            } else {
                0
            };
            (now_secs - alert_ts) < suppress as i64
        });
    }
1175
1176    /// Number of active state entries (for monitoring).
1177    pub fn state_count(&self) -> usize {
1178        self.state.len()
1179    }
1180
1181    /// Number of detection rules loaded.
1182    pub fn detection_rule_count(&self) -> usize {
1183        self.engine.rule_count()
1184    }
1185
1186    /// Number of correlation rules loaded.
1187    pub fn correlation_rule_count(&self) -> usize {
1188        self.correlations.len()
1189    }
1190
1191    /// Number of active event buffers (for monitoring).
1192    pub fn event_buffer_count(&self) -> usize {
1193        self.event_buffers.len()
1194    }
1195
1196    /// Total compressed bytes across all event buffers (for monitoring).
1197    pub fn event_buffer_bytes(&self) -> usize {
1198        self.event_buffers
1199            .values()
1200            .map(|b| b.compressed_bytes())
1201            .sum()
1202    }
1203
1204    /// Number of active event ref buffers — `Refs` mode (for monitoring).
1205    pub fn event_ref_buffer_count(&self) -> usize {
1206        self.event_ref_buffers.len()
1207    }
1208
1209    /// Access the inner stateless engine.
1210    pub fn engine(&self) -> &Engine {
1211        &self.engine
1212    }
1213
1214    /// Export all mutable correlation state as a serializable snapshot.
1215    ///
1216    /// The snapshot uses stable correlation identifiers (id > name > title)
1217    /// instead of internal indices, so it survives rule reloads as long as
1218    /// the correlation rules keep the same identifiers.
1219    pub fn export_state(&self) -> CorrelationSnapshot {
1220        let mut windows: HashMap<String, Vec<(GroupKey, WindowState)>> = HashMap::new();
1221        for ((idx, gk), ws) in &self.state {
1222            let corr_id = self.correlation_stable_id(*idx);
1223            windows
1224                .entry(corr_id)
1225                .or_default()
1226                .push((gk.clone(), ws.clone()));
1227        }
1228
1229        let mut last_alert: HashMap<String, Vec<(GroupKey, i64)>> = HashMap::new();
1230        for ((idx, gk), ts) in &self.last_alert {
1231            let corr_id = self.correlation_stable_id(*idx);
1232            last_alert
1233                .entry(corr_id)
1234                .or_default()
1235                .push((gk.clone(), *ts));
1236        }
1237
1238        let mut event_buffers: HashMap<String, Vec<(GroupKey, EventBuffer)>> = HashMap::new();
1239        for ((idx, gk), buf) in &self.event_buffers {
1240            let corr_id = self.correlation_stable_id(*idx);
1241            event_buffers
1242                .entry(corr_id)
1243                .or_default()
1244                .push((gk.clone(), buf.clone()));
1245        }
1246
1247        let mut event_ref_buffers: HashMap<String, Vec<(GroupKey, EventRefBuffer)>> =
1248            HashMap::new();
1249        for ((idx, gk), buf) in &self.event_ref_buffers {
1250            let corr_id = self.correlation_stable_id(*idx);
1251            event_ref_buffers
1252                .entry(corr_id)
1253                .or_default()
1254                .push((gk.clone(), buf.clone()));
1255        }
1256
1257        CorrelationSnapshot {
1258            version: SNAPSHOT_VERSION,
1259            windows,
1260            last_alert,
1261            event_buffers,
1262            event_ref_buffers,
1263        }
1264    }
1265
1266    /// Import previously exported state, mapping stable identifiers back to
1267    /// current correlation indices. Entries whose identifiers no longer match
1268    /// any loaded correlation are silently dropped.
1269    ///
1270    /// Returns `false` (and imports nothing) if the snapshot version is
1271    /// incompatible with the current schema.
1272    pub fn import_state(&mut self, snapshot: CorrelationSnapshot) -> bool {
1273        if snapshot.version != SNAPSHOT_VERSION {
1274            return false;
1275        }
1276        let id_to_idx = self.build_id_to_index_map();
1277
1278        for (corr_id, groups) in snapshot.windows {
1279            if let Some(&idx) = id_to_idx.get(&corr_id) {
1280                for (gk, ws) in groups {
1281                    self.state.insert((idx, gk), ws);
1282                }
1283            }
1284        }
1285
1286        for (corr_id, groups) in snapshot.last_alert {
1287            if let Some(&idx) = id_to_idx.get(&corr_id) {
1288                for (gk, ts) in groups {
1289                    self.last_alert.insert((idx, gk), ts);
1290                }
1291            }
1292        }
1293
1294        for (corr_id, groups) in snapshot.event_buffers {
1295            if let Some(&idx) = id_to_idx.get(&corr_id) {
1296                for (gk, buf) in groups {
1297                    self.event_buffers.insert((idx, gk), buf);
1298                }
1299            }
1300        }
1301
1302        for (corr_id, groups) in snapshot.event_ref_buffers {
1303            if let Some(&idx) = id_to_idx.get(&corr_id) {
1304                for (gk, buf) in groups {
1305                    self.event_ref_buffers.insert((idx, gk), buf);
1306                }
1307            }
1308        }
1309
1310        true
1311    }
1312
1313    /// Stable identifier for a correlation rule: prefers id, then name, then title.
1314    fn correlation_stable_id(&self, idx: usize) -> String {
1315        let corr = &self.correlations[idx];
1316        corr.id
1317            .clone()
1318            .or_else(|| corr.name.clone())
1319            .unwrap_or_else(|| corr.title.clone())
1320    }
1321
1322    /// Build a reverse map from stable id → current correlation index.
1323    fn build_id_to_index_map(&self) -> HashMap<String, usize> {
1324        self.correlations
1325            .iter()
1326            .enumerate()
1327            .map(|(idx, _)| (self.correlation_stable_id(idx), idx))
1328            .collect()
1329    }
1330}
1331
/// Schema version stamped on every exported `CorrelationSnapshot`.
///
/// Increment it whenever the serialized snapshot layout changes;
/// `import_state` refuses snapshots carrying any other version.
const SNAPSHOT_VERSION: u32 = 1;
1334
/// Serializable snapshot of all mutable correlation state.
///
/// Produced by `CorrelationEngine::export_state` and consumed by
/// `CorrelationEngine::import_state`. Outer maps are keyed by each
/// correlation's stable identifier (id, else name, else title) rather than by
/// internal index, so the snapshot can be restored after a rule reload even if
/// indices change. Inner maps use `Vec<(GroupKey, T)>` instead of
/// `HashMap<GroupKey, T>` because `GroupKey` cannot be used as a JSON object key.
#[derive(Debug, Clone, Serialize, serde::Deserialize)]
pub struct CorrelationSnapshot {
    /// Schema version, compared against the current version on import to
    /// reject incompatible snapshots. Filled in by `default_snapshot_version`
    /// when the field is absent from serialized input.
    #[serde(default = "default_snapshot_version")]
    pub version: u32,
    /// Per-correlation, per-group window state.
    pub windows: HashMap<String, Vec<(GroupKey, WindowState)>>,
    /// Per-correlation, per-group timestamp (epoch seconds) of the last emitted
    /// alert, used for suppression.
    pub last_alert: HashMap<String, Vec<(GroupKey, i64)>>,
    /// Per-correlation, per-group compressed event buffers (`Full` event mode).
    pub event_buffers: HashMap<String, Vec<(GroupKey, EventBuffer)>>,
    /// Per-correlation, per-group event reference buffers (`Refs` event mode).
    pub event_ref_buffers: HashMap<String, Vec<(GroupKey, EventRefBuffer)>>,
}
1355
/// Version assumed for a serialized snapshot that has no `version` field.
///
/// This is a literal `1` rather than `SNAPSHOT_VERSION`. Presumably such
/// snapshots were written before the field existed and so use the version-1
/// layout (NOTE(review): confirm when bumping the schema).
fn default_snapshot_version() -> u32 {
    1
}
1359
1360impl Default for CorrelationEngine {
1361    fn default() -> Self {
1362        Self::new(CorrelationConfig::default())
1363    }
1364}
1365
1366// =============================================================================
1367// Timestamp parsing helpers
1368// =============================================================================
1369
1370/// Extract a timestamp from an event using the given field names.
1371///
1372/// Standalone version of `CorrelationEngine::extract_event_timestamp` for use
1373/// in contexts where borrowing `&self` is not possible (e.g. rayon closures).
1374fn extract_event_ts(event: &Event, timestamp_fields: &[String]) -> Option<i64> {
1375    for field_name in timestamp_fields {
1376        if let Some(val) = event.get_field(field_name)
1377            && let Some(ts) = parse_timestamp_value(val)
1378        {
1379            return Some(ts);
1380        }
1381    }
1382    None
1383}
1384
1385/// Parse a JSON value as a Unix epoch timestamp in seconds.
1386fn parse_timestamp_value(val: &serde_json::Value) -> Option<i64> {
1387    match val {
1388        serde_json::Value::Number(n) => {
1389            if let Some(i) = n.as_i64() {
1390                Some(normalize_epoch(i))
1391            } else {
1392                n.as_f64().map(|f| normalize_epoch(f as i64))
1393            }
1394        }
1395        serde_json::Value::String(s) => parse_timestamp_string(s),
1396        _ => None,
1397    }
1398}
1399
/// Normalize a raw integer epoch to Unix seconds, inferring its unit from its
/// magnitude.
///
/// Magnitudes up to `1e12` are taken as seconds. `1e12` seconds is roughly the
/// year 33658, so no realistic seconds value is larger. Above `1e12` the value
/// is treated as milliseconds, above `1e15` as microseconds, and above `1e18`
/// as nanoseconds.
///
/// Each threshold is about 2001-09-09 in the next-finer unit. Sub-second
/// epochs from before that date are therefore misread as a coarser unit, the
/// same limitation the millisecond heuristic has always had.
///
/// The magnitude (absolute value) is compared, so pre-1970 negative values are
/// scaled as well. Division floors toward negative infinity, giving the whole
/// second at or before the instant.
fn normalize_epoch(v: i64) -> i64 {
    const MILLIS_THRESHOLD: u64 = 1_000_000_000_000;
    const MICROS_THRESHOLD: u64 = 1_000_000_000_000_000;
    const NANOS_THRESHOLD: u64 = 1_000_000_000_000_000_000;

    // `unsigned_abs` cannot overflow, even for i64::MIN.
    let magnitude = v.unsigned_abs();
    let divisor = if magnitude > NANOS_THRESHOLD {
        1_000_000_000
    } else if magnitude > MICROS_THRESHOLD {
        1_000_000
    } else if magnitude > MILLIS_THRESHOLD {
        1_000
    } else {
        return v;
    };
    v.div_euclid(divisor)
}
1405
1406/// Parse a timestamp string. Tries ISO 8601 with timezone, then without.
1407fn parse_timestamp_string(s: &str) -> Option<i64> {
1408    // Try RFC 3339 / ISO 8601 with timezone
1409    if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
1410        return Some(dt.timestamp());
1411    }
1412
1413    // Try ISO 8601 without timezone (assume UTC)
1414    // Common formats: "2024-07-10T12:30:00", "2024-07-10 12:30:00"
1415    if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S") {
1416        return Some(Utc.from_utc_datetime(&naive).timestamp());
1417    }
1418    if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") {
1419        return Some(Utc.from_utc_datetime(&naive).timestamp());
1420    }
1421
1422    // Try with fractional seconds
1423    if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S%.f") {
1424        return Some(Utc.from_utc_datetime(&naive).timestamp());
1425    }
1426    if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f") {
1427        return Some(Utc.from_utc_datetime(&naive).timestamp());
1428    }
1429
1430    None
1431}
1432
1433/// Convert a JSON value to a string for value_count purposes.
1434fn value_to_string_for_count(v: &serde_json::Value) -> Option<String> {
1435    match v {
1436        serde_json::Value::String(s) => Some(s.clone()),
1437        serde_json::Value::Number(n) => Some(n.to_string()),
1438        serde_json::Value::Bool(b) => Some(b.to_string()),
1439        serde_json::Value::Null => Some("null".to_string()),
1440        _ => None,
1441    }
1442}
1443
1444/// Convert a JSON value to f64 for numeric aggregation.
1445fn value_to_f64(v: &serde_json::Value) -> Option<f64> {
1446    match v {
1447        serde_json::Value::Number(n) => n.as_f64(),
1448        serde_json::Value::String(s) => s.parse().ok(),
1449        _ => None,
1450    }
1451}
1452
1453// =============================================================================
1454// Tests
1455// =============================================================================
1456
1457#[cfg(test)]
1458mod tests {
1459    use super::*;
1460    use rsigma_parser::parse_sigma_yaml;
1461    use serde_json::json;
1462
1463    // =========================================================================
1464    // Timestamp parsing
1465    // =========================================================================
1466
1467    #[test]
1468    fn test_parse_timestamp_epoch_secs() {
1469        let val = json!(1720612200);
1470        assert_eq!(parse_timestamp_value(&val), Some(1720612200));
1471    }
1472
1473    #[test]
1474    fn test_parse_timestamp_epoch_millis() {
1475        let val = json!(1720612200000i64);
1476        assert_eq!(parse_timestamp_value(&val), Some(1720612200));
1477    }
1478
1479    #[test]
1480    fn test_parse_timestamp_rfc3339() {
1481        let val = json!("2024-07-10T12:30:00Z");
1482        let ts = parse_timestamp_value(&val).unwrap();
1483        assert_eq!(ts, 1720614600);
1484    }
1485
1486    #[test]
1487    fn test_parse_timestamp_naive() {
1488        let val = json!("2024-07-10T12:30:00");
1489        let ts = parse_timestamp_value(&val).unwrap();
1490        assert_eq!(ts, 1720614600);
1491    }
1492
1493    #[test]
1494    fn test_parse_timestamp_with_space() {
1495        let val = json!("2024-07-10 12:30:00");
1496        let ts = parse_timestamp_value(&val).unwrap();
1497        assert_eq!(ts, 1720614600);
1498    }
1499
1500    #[test]
1501    fn test_parse_timestamp_fractional() {
1502        let val = json!("2024-07-10T12:30:00.123Z");
1503        let ts = parse_timestamp_value(&val).unwrap();
1504        assert_eq!(ts, 1720614600);
1505    }
1506
1507    #[test]
1508    fn test_extract_timestamp_from_event() {
1509        let config = CorrelationConfig {
1510            timestamp_fields: vec!["@timestamp".to_string()],
1511            max_state_entries: 100_000,
1512            ..Default::default()
1513        };
1514        let engine = CorrelationEngine::new(config);
1515
1516        let v = json!({"@timestamp": "2024-07-10T12:30:00Z", "data": "test"});
1517        let event = Event::from_value(&v);
1518        let ts = engine.extract_event_timestamp(&event);
1519        assert_eq!(ts, Some(1720614600));
1520    }
1521
1522    #[test]
1523    fn test_extract_timestamp_fallback_fields() {
1524        let config = CorrelationConfig {
1525            timestamp_fields: vec![
1526                "@timestamp".to_string(),
1527                "timestamp".to_string(),
1528                "EventTime".to_string(),
1529            ],
1530            max_state_entries: 100_000,
1531            ..Default::default()
1532        };
1533        let engine = CorrelationEngine::new(config);
1534
1535        // First field missing, second field present
1536        let v = json!({"timestamp": 1720613400, "data": "test"});
1537        let event = Event::from_value(&v);
1538        let ts = engine.extract_event_timestamp(&event);
1539        assert_eq!(ts, Some(1720613400));
1540    }
1541
1542    #[test]
1543    fn test_extract_timestamp_returns_none_when_missing() {
1544        let config = CorrelationConfig {
1545            timestamp_fields: vec!["@timestamp".to_string()],
1546            ..Default::default()
1547        };
1548        let engine = CorrelationEngine::new(config);
1549
1550        let v = json!({"data": "no timestamp here"});
1551        let event = Event::from_value(&v);
1552        assert_eq!(engine.extract_event_timestamp(&event), None);
1553    }
1554
1555    #[test]
1556    fn test_timestamp_fallback_skip() {
1557        let yaml = r#"
1558title: test rule
1559id: ts-skip-rule
1560logsource:
1561    product: test
1562detection:
1563    selection:
1564        action: click
1565    condition: selection
1566level: low
1567---
1568title: test correlation
1569correlation:
1570    type: event_count
1571    rules:
1572        - ts-skip-rule
1573    group-by:
1574        - User
1575    timespan: 10s
1576    condition:
1577        gte: 2
1578level: high
1579"#;
1580        let collection = parse_sigma_yaml(yaml).unwrap();
1581        let mut engine = CorrelationEngine::new(CorrelationConfig {
1582            timestamp_fallback: TimestampFallback::Skip,
1583            ..Default::default()
1584        });
1585        engine.add_collection(&collection).unwrap();
1586        assert_eq!(engine.correlation_rule_count(), 1);
1587
1588        // Events with no timestamp field — should NOT update correlation state
1589        let v = json!({"action": "click", "User": "alice"});
1590        let event = Event::from_value(&v);
1591
1592        let r1 = engine.process_event(&event);
1593        assert!(!r1.detections.is_empty(), "detection should still fire");
1594
1595        let r2 = engine.process_event(&event);
1596        assert!(!r2.detections.is_empty(), "detection should still fire");
1597
1598        let r3 = engine.process_event(&event);
1599        assert!(!r3.detections.is_empty(), "detection should still fire");
1600
1601        // No correlations should fire because events were skipped
1602        assert!(r1.correlations.is_empty());
1603        assert!(r2.correlations.is_empty());
1604        assert!(r3.correlations.is_empty());
1605    }
1606
1607    #[test]
1608    fn test_timestamp_fallback_wallclock_default() {
1609        let yaml = r#"
1610title: test rule
1611id: ts-wc-rule
1612logsource:
1613    product: test
1614detection:
1615    selection:
1616        action: click
1617    condition: selection
1618level: low
1619---
1620title: test correlation
1621correlation:
1622    type: event_count
1623    rules:
1624        - ts-wc-rule
1625    group-by:
1626        - User
1627    timespan: 60s
1628    condition:
1629        gte: 2
1630level: high
1631"#;
1632        let collection = parse_sigma_yaml(yaml).unwrap();
1633        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1634        engine.add_collection(&collection).unwrap();
1635        assert_eq!(engine.correlation_rule_count(), 1);
1636
1637        // Events with no timestamp — WallClock fallback means they get Utc::now()
1638        // and should be close enough to correlate (generous 60s window)
1639        let v = json!({"action": "click", "User": "alice"});
1640        let event = Event::from_value(&v);
1641
1642        let _r1 = engine.process_event(&event);
1643        let _r2 = engine.process_event(&event);
1644        let r3 = engine.process_event(&event);
1645
1646        // With WallClock, all events get near-identical timestamps and should correlate
1647        assert!(
1648            !r3.correlations.is_empty(),
1649            "WallClock fallback should allow correlation"
1650        );
1651    }
1652
1653    // =========================================================================
1654    // Event count correlation
1655    // =========================================================================
1656
1657    #[test]
1658    fn test_event_count_basic() {
1659        let yaml = r#"
1660title: Base Rule
1661id: base-rule-001
1662name: base_rule
1663logsource:
1664    product: windows
1665    category: process_creation
1666detection:
1667    selection:
1668        CommandLine|contains: 'whoami'
1669    condition: selection
1670level: low
1671---
1672title: Multiple Whoami
1673id: corr-001
1674correlation:
1675    type: event_count
1676    rules:
1677        - base-rule-001
1678    group-by:
1679        - User
1680    timespan: 60s
1681    condition:
1682        gte: 3
1683level: high
1684"#;
1685        let collection = parse_sigma_yaml(yaml).unwrap();
1686        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1687        engine.add_collection(&collection).unwrap();
1688
1689        assert_eq!(engine.detection_rule_count(), 1);
1690        assert_eq!(engine.correlation_rule_count(), 1);
1691
1692        // Send 3 events from same user within the window
1693        let base_ts = 1000i64;
1694        for i in 0..3 {
1695            let v = json!({"CommandLine": "whoami", "User": "admin"});
1696            let event = Event::from_value(&v);
1697            let result = engine.process_event_at(&event, base_ts + i * 10);
1698
1699            // Each event should match the detection rule
1700            assert_eq!(result.detections.len(), 1);
1701
1702            if i < 2 {
1703                // Not enough events yet
1704                assert!(result.correlations.is_empty());
1705            } else {
1706                // 3rd event triggers the correlation
1707                assert_eq!(result.correlations.len(), 1);
1708                assert_eq!(result.correlations[0].rule_title, "Multiple Whoami");
1709                assert_eq!(result.correlations[0].aggregated_value, 3.0);
1710            }
1711        }
1712    }
1713
1714    #[test]
1715    fn test_event_count_different_groups() {
1716        let yaml = r#"
1717title: Login
1718id: login-001
1719logsource:
1720    category: auth
1721detection:
1722    selection:
1723        EventType: login
1724    condition: selection
1725level: low
1726---
1727title: Many Logins
1728id: corr-login
1729correlation:
1730    type: event_count
1731    rules:
1732        - login-001
1733    group-by:
1734        - User
1735    timespan: 60s
1736    condition:
1737        gte: 3
1738level: high
1739"#;
1740        let collection = parse_sigma_yaml(yaml).unwrap();
1741        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1742        engine.add_collection(&collection).unwrap();
1743
1744        // User "alice" sends 2 events, "bob" sends 3
1745        let ts = 1000i64;
1746        for i in 0..2 {
1747            let v = json!({"EventType": "login", "User": "alice"});
1748            let event = Event::from_value(&v);
1749            let r = engine.process_event_at(&event, ts + i);
1750            assert!(r.correlations.is_empty());
1751        }
1752        for i in 0..3 {
1753            let v = json!({"EventType": "login", "User": "bob"});
1754            let event = Event::from_value(&v);
1755            let r = engine.process_event_at(&event, ts + i);
1756            if i == 2 {
1757                assert_eq!(r.correlations.len(), 1);
1758                assert_eq!(
1759                    r.correlations[0].group_key,
1760                    vec![("User".to_string(), "bob".to_string())]
1761                );
1762            }
1763        }
1764    }
1765
1766    #[test]
1767    fn test_event_count_window_expiry() {
1768        let yaml = r#"
1769title: Base
1770id: base-002
1771logsource:
1772    category: test
1773detection:
1774    selection:
1775        action: click
1776    condition: selection
1777---
1778title: Rapid Clicks
1779id: corr-002
1780correlation:
1781    type: event_count
1782    rules:
1783        - base-002
1784    group-by:
1785        - User
1786    timespan: 10s
1787    condition:
1788        gte: 3
1789level: medium
1790"#;
1791        let collection = parse_sigma_yaml(yaml).unwrap();
1792        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1793        engine.add_collection(&collection).unwrap();
1794
1795        // Send 2 events at t=0,1 then 1 event at t=15 (outside window)
1796        let v = json!({"action": "click", "User": "admin"});
1797        let event = Event::from_value(&v);
1798        engine.process_event_at(&event, 0);
1799        engine.process_event_at(&event, 1);
1800        let r = engine.process_event_at(&event, 15);
1801        // Only 1 event in window [5, 15], not enough
1802        assert!(r.correlations.is_empty());
1803    }
1804
1805    // =========================================================================
1806    // Value count correlation
1807    // =========================================================================
1808
1809    #[test]
1810    fn test_value_count() {
1811        let yaml = r#"
1812title: Failed Login
1813id: failed-login-001
1814logsource:
1815    category: auth
1816detection:
1817    selection:
1818        EventType: failed_login
1819    condition: selection
1820level: low
1821---
1822title: Failed Logins From Many Users
1823id: corr-vc-001
1824correlation:
1825    type: value_count
1826    rules:
1827        - failed-login-001
1828    group-by:
1829        - Host
1830    timespan: 60s
1831    condition:
1832        field: User
1833        gte: 3
1834level: high
1835"#;
1836        let collection = parse_sigma_yaml(yaml).unwrap();
1837        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1838        engine.add_collection(&collection).unwrap();
1839
1840        let ts = 1000i64;
1841        // 3 different users failing login on same host
1842        for (i, user) in ["alice", "bob", "charlie"].iter().enumerate() {
1843            let v = json!({"EventType": "failed_login", "Host": "srv01", "User": user});
1844            let event = Event::from_value(&v);
1845            let r = engine.process_event_at(&event, ts + i as i64);
1846            if i == 2 {
1847                assert_eq!(r.correlations.len(), 1);
1848                assert_eq!(r.correlations[0].aggregated_value, 3.0);
1849            }
1850        }
1851    }
1852
1853    // =========================================================================
1854    // Temporal correlation
1855    // =========================================================================
1856
1857    #[test]
1858    fn test_temporal() {
1859        let yaml = r#"
1860title: Recon A
1861id: recon-a
1862name: recon_a
1863logsource:
1864    category: process
1865detection:
1866    selection:
1867        CommandLine|contains: 'whoami'
1868    condition: selection
1869---
1870title: Recon B
1871id: recon-b
1872name: recon_b
1873logsource:
1874    category: process
1875detection:
1876    selection:
1877        CommandLine|contains: 'ipconfig'
1878    condition: selection
1879---
1880title: Recon Combo
1881id: corr-temporal
1882correlation:
1883    type: temporal
1884    rules:
1885        - recon-a
1886        - recon-b
1887    group-by:
1888        - User
1889    timespan: 60s
1890    condition:
1891        gte: 2
1892level: high
1893"#;
1894        let collection = parse_sigma_yaml(yaml).unwrap();
1895        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1896        engine.add_collection(&collection).unwrap();
1897
1898        let ts = 1000i64;
1899        // Only recon A fires
1900        let v1 = json!({"CommandLine": "whoami", "User": "admin"});
1901        let ev1 = Event::from_value(&v1);
1902        let r1 = engine.process_event_at(&ev1, ts);
1903        assert!(r1.correlations.is_empty());
1904
1905        // Now recon B fires — both rules have fired within window
1906        let v2 = json!({"CommandLine": "ipconfig /all", "User": "admin"});
1907        let ev2 = Event::from_value(&v2);
1908        let r2 = engine.process_event_at(&ev2, ts + 10);
1909        assert_eq!(r2.correlations.len(), 1);
1910        assert_eq!(r2.correlations[0].rule_title, "Recon Combo");
1911    }
1912
1913    // =========================================================================
1914    // Temporal ordered correlation
1915    // =========================================================================
1916
1917    #[test]
1918    fn test_temporal_ordered() {
1919        let yaml = r#"
1920title: Failed Login
1921id: failed-001
1922name: failed_login
1923logsource:
1924    category: auth
1925detection:
1926    selection:
1927        EventType: failed_login
1928    condition: selection
1929---
1930title: Success Login
1931id: success-001
1932name: successful_login
1933logsource:
1934    category: auth
1935detection:
1936    selection:
1937        EventType: success_login
1938    condition: selection
1939---
1940title: Brute Force Then Login
1941id: corr-bf
1942correlation:
1943    type: temporal_ordered
1944    rules:
1945        - failed-001
1946        - success-001
1947    group-by:
1948        - User
1949    timespan: 60s
1950    condition:
1951        gte: 2
1952level: critical
1953"#;
1954        let collection = parse_sigma_yaml(yaml).unwrap();
1955        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1956        engine.add_collection(&collection).unwrap();
1957
1958        let ts = 1000i64;
1959        // Failed login first
1960        let v1 = json!({"EventType": "failed_login", "User": "admin"});
1961        let ev1 = Event::from_value(&v1);
1962        let r1 = engine.process_event_at(&ev1, ts);
1963        assert!(r1.correlations.is_empty());
1964
1965        // Then successful login — correct order!
1966        let v2 = json!({"EventType": "success_login", "User": "admin"});
1967        let ev2 = Event::from_value(&v2);
1968        let r2 = engine.process_event_at(&ev2, ts + 10);
1969        assert_eq!(r2.correlations.len(), 1);
1970    }
1971
1972    #[test]
1973    fn test_temporal_ordered_wrong_order() {
1974        let yaml = r#"
1975title: Rule A
1976id: rule-a
1977logsource:
1978    category: test
1979detection:
1980    selection:
1981        type: a
1982    condition: selection
1983---
1984title: Rule B
1985id: rule-b
1986logsource:
1987    category: test
1988detection:
1989    selection:
1990        type: b
1991    condition: selection
1992---
1993title: A then B
1994id: corr-ab
1995correlation:
1996    type: temporal_ordered
1997    rules:
1998        - rule-a
1999        - rule-b
2000    group-by:
2001        - User
2002    timespan: 60s
2003    condition:
2004        gte: 2
2005level: high
2006"#;
2007        let collection = parse_sigma_yaml(yaml).unwrap();
2008        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2009        engine.add_collection(&collection).unwrap();
2010
2011        let ts = 1000i64;
2012        // B fires first, then A — wrong order
2013        let v1 = json!({"type": "b", "User": "admin"});
2014        let ev1 = Event::from_value(&v1);
2015        engine.process_event_at(&ev1, ts);
2016
2017        let v2 = json!({"type": "a", "User": "admin"});
2018        let ev2 = Event::from_value(&v2);
2019        let r2 = engine.process_event_at(&ev2, ts + 10);
2020        assert!(r2.correlations.is_empty());
2021    }
2022
2023    // =========================================================================
2024    // Numeric aggregation (value_sum, value_avg)
2025    // =========================================================================
2026
2027    #[test]
2028    fn test_value_sum() {
2029        let yaml = r#"
2030title: Web Access
2031id: web-001
2032logsource:
2033    category: web
2034detection:
2035    selection:
2036        action: upload
2037    condition: selection
2038---
2039title: Large Upload
2040id: corr-sum
2041correlation:
2042    type: value_sum
2043    rules:
2044        - web-001
2045    group-by:
2046        - User
2047    timespan: 60s
2048    condition:
2049        field: bytes_sent
2050        gt: 1000
2051level: high
2052"#;
2053        let collection = parse_sigma_yaml(yaml).unwrap();
2054        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2055        engine.add_collection(&collection).unwrap();
2056
2057        let ts = 1000i64;
2058        let v1 = json!({"action": "upload", "User": "alice", "bytes_sent": 600});
2059        let ev1 = Event::from_value(&v1);
2060        let r1 = engine.process_event_at(&ev1, ts);
2061        assert!(r1.correlations.is_empty());
2062
2063        let v2 = json!({"action": "upload", "User": "alice", "bytes_sent": 500});
2064        let ev2 = Event::from_value(&v2);
2065        let r2 = engine.process_event_at(&ev2, ts + 5);
2066        assert_eq!(r2.correlations.len(), 1);
2067        assert!((r2.correlations[0].aggregated_value - 1100.0).abs() < f64::EPSILON);
2068    }
2069
2070    #[test]
2071    fn test_value_avg() {
2072        let yaml = r#"
2073title: Request
2074id: req-001
2075logsource:
2076    category: web
2077detection:
2078    selection:
2079        type: request
2080    condition: selection
2081---
2082title: High Avg Latency
2083id: corr-avg
2084correlation:
2085    type: value_avg
2086    rules:
2087        - req-001
2088    group-by:
2089        - Service
2090    timespan: 60s
2091    condition:
2092        field: latency_ms
2093        gt: 500
2094level: medium
2095"#;
2096        let collection = parse_sigma_yaml(yaml).unwrap();
2097        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2098        engine.add_collection(&collection).unwrap();
2099
2100        let ts = 1000i64;
2101        // Avg of 400, 600, 800 = 600 > 500
2102        for (i, latency) in [400, 600, 800].iter().enumerate() {
2103            let v = json!({"type": "request", "Service": "api", "latency_ms": latency});
2104            let event = Event::from_value(&v);
2105            let r = engine.process_event_at(&event, ts + i as i64);
2106            if i == 2 {
2107                assert_eq!(r.correlations.len(), 1);
2108                assert!((r.correlations[0].aggregated_value - 600.0).abs() < f64::EPSILON);
2109            }
2110        }
2111    }
2112
2113    // =========================================================================
2114    // State management
2115    // =========================================================================
2116
2117    #[test]
2118    fn test_state_count() {
2119        let yaml = r#"
2120title: Base
2121id: base-sc
2122logsource:
2123    category: test
2124detection:
2125    selection:
2126        action: test
2127    condition: selection
2128---
2129title: Count
2130id: corr-sc
2131correlation:
2132    type: event_count
2133    rules:
2134        - base-sc
2135    group-by:
2136        - User
2137    timespan: 60s
2138    condition:
2139        gte: 100
2140level: low
2141"#;
2142        let collection = parse_sigma_yaml(yaml).unwrap();
2143        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2144        engine.add_collection(&collection).unwrap();
2145
2146        let v = json!({"action": "test", "User": "alice"});
2147        let event = Event::from_value(&v);
2148        engine.process_event_at(&event, 1000);
2149        assert_eq!(engine.state_count(), 1);
2150
2151        let v2 = json!({"action": "test", "User": "bob"});
2152        let event2 = Event::from_value(&v2);
2153        engine.process_event_at(&event2, 1001);
2154        assert_eq!(engine.state_count(), 2);
2155
2156        // Evict everything
2157        engine.evict_expired(2000);
2158        assert_eq!(engine.state_count(), 0);
2159    }
2160
2161    // =========================================================================
2162    // Generate flag
2163    // =========================================================================
2164
2165    #[test]
2166    fn test_generate_flag_default_false() {
2167        let yaml = r#"
2168title: Base
2169id: gen-base
2170logsource:
2171    category: test
2172detection:
2173    selection:
2174        action: test
2175    condition: selection
2176---
2177title: Correlation
2178id: gen-corr
2179correlation:
2180    type: event_count
2181    rules:
2182        - gen-base
2183    group-by:
2184        - User
2185    timespan: 60s
2186    condition:
2187        gte: 1
2188level: high
2189"#;
2190        let collection = parse_sigma_yaml(yaml).unwrap();
2191        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2192        engine.add_collection(&collection).unwrap();
2193
2194        // generate defaults to false — detection matches are still returned
2195        // (filtering by generate flag is a backend concern, not eval)
2196        let v = json!({"action": "test", "User": "alice"});
2197        let event = Event::from_value(&v);
2198        let r = engine.process_event_at(&event, 1000);
2199        assert_eq!(r.detections.len(), 1);
2200        assert_eq!(r.correlations.len(), 1);
2201    }
2202
2203    // =========================================================================
2204    // Real-world example: AWS bucket enumeration
2205    // =========================================================================
2206
2207    #[test]
2208    fn test_aws_bucket_enumeration() {
2209        let yaml = r#"
2210title: Potential Bucket Enumeration on AWS
2211id: f305fd62-beca-47da-ad95-7690a0620084
2212logsource:
2213    product: aws
2214    service: cloudtrail
2215detection:
2216    selection:
2217        eventSource: "s3.amazonaws.com"
2218        eventName: "ListBuckets"
2219    condition: selection
2220level: low
2221---
2222title: Multiple AWS bucket enumerations
2223id: be246094-01d3-4bba-88de-69e582eba0cc
2224status: experimental
2225correlation:
2226    type: event_count
2227    rules:
2228        - f305fd62-beca-47da-ad95-7690a0620084
2229    group-by:
2230        - userIdentity.arn
2231    timespan: 1h
2232    condition:
2233        gte: 5
2234level: high
2235"#;
2236        let collection = parse_sigma_yaml(yaml).unwrap();
2237        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2238        engine.add_collection(&collection).unwrap();
2239
2240        let base_ts = 1_700_000_000i64;
2241        for i in 0..5 {
2242            let v = json!({
2243                "eventSource": "s3.amazonaws.com",
2244                "eventName": "ListBuckets",
2245                "userIdentity.arn": "arn:aws:iam::123456789:user/attacker"
2246            });
2247            let event = Event::from_value(&v);
2248            let r = engine.process_event_at(&event, base_ts + i * 60);
2249            if i == 4 {
2250                assert_eq!(r.correlations.len(), 1);
2251                assert_eq!(
2252                    r.correlations[0].rule_title,
2253                    "Multiple AWS bucket enumerations"
2254                );
2255                assert_eq!(r.correlations[0].aggregated_value, 5.0);
2256            }
2257        }
2258    }
2259
2260    // =========================================================================
2261    // Chaining: event_count -> temporal_ordered
2262    // =========================================================================
2263
2264    #[test]
2265    fn test_chaining_event_count_to_temporal() {
2266        // Reproduces the spec's "failed logins followed by successful login" example.
2267        // Chain: failed_login (detection) -> many_failed (event_count) -> brute_then_login (temporal_ordered)
2268        let yaml = r#"
2269title: Single failed login
2270id: failed-login-chain
2271name: failed_login
2272logsource:
2273    category: auth
2274detection:
2275    selection:
2276        EventType: failed_login
2277    condition: selection
2278---
2279title: Successful login
2280id: success-login-chain
2281name: successful_login
2282logsource:
2283    category: auth
2284detection:
2285    selection:
2286        EventType: success_login
2287    condition: selection
2288---
2289title: Multiple failed logins
2290id: many-failed-chain
2291name: multiple_failed_login
2292correlation:
2293    type: event_count
2294    rules:
2295        - failed-login-chain
2296    group-by:
2297        - User
2298    timespan: 60s
2299    condition:
2300        gte: 3
2301level: medium
2302---
2303title: Brute Force Followed by Login
2304id: brute-force-chain
2305correlation:
2306    type: temporal_ordered
2307    rules:
2308        - many-failed-chain
2309        - success-login-chain
2310    group-by:
2311        - User
2312    timespan: 120s
2313    condition:
2314        gte: 2
2315level: critical
2316"#;
2317        let collection = parse_sigma_yaml(yaml).unwrap();
2318        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2319        engine.add_collection(&collection).unwrap();
2320
2321        assert_eq!(engine.detection_rule_count(), 2);
2322        assert_eq!(engine.correlation_rule_count(), 2);
2323
2324        let ts = 1000i64;
2325
2326        // Send 3 failed logins → triggers "many_failed_chain"
2327        for i in 0..3 {
2328            let v = json!({"EventType": "failed_login", "User": "victim"});
2329            let event = Event::from_value(&v);
2330            let r = engine.process_event_at(&event, ts + i);
2331            if i == 2 {
2332                // The event_count correlation should fire
2333                assert!(
2334                    r.correlations
2335                        .iter()
2336                        .any(|c| c.rule_title == "Multiple failed logins"),
2337                    "Expected event_count correlation to fire"
2338                );
2339            }
2340        }
2341
2342        // Now send a successful login → should trigger the chained temporal_ordered
2343        // Note: chaining happens in chain_correlations when many-failed-chain fires
2344        // and then success-login-chain matches the detection.
2345        // The temporal_ordered correlation needs BOTH many-failed-chain AND success-login-chain
2346        // to have fired. success-login-chain is a detection rule, not a correlation,
2347        // so it gets matched via the regular detection path.
2348        let v = json!({"EventType": "success_login", "User": "victim"});
2349        let event = Event::from_value(&v);
2350        let r = engine.process_event_at(&event, ts + 30);
2351
2352        // The detection should match
2353        assert_eq!(r.detections.len(), 1);
2354        assert_eq!(r.detections[0].rule_title, "Successful login");
2355    }
2356
2357    // =========================================================================
2358    // Field aliases
2359    // =========================================================================
2360
2361    #[test]
2362    fn test_field_aliases() {
2363        let yaml = r#"
2364title: Internal Error
2365id: internal-error-001
2366name: internal_error
2367logsource:
2368    category: web
2369detection:
2370    selection:
2371        http.response.status_code: 500
2372    condition: selection
2373---
2374title: New Connection
2375id: new-conn-001
2376name: new_network_connection
2377logsource:
2378    category: network
2379detection:
2380    selection:
2381        event.type: connection
2382    condition: selection
2383---
2384title: Error Then Connection
2385id: corr-alias
2386correlation:
2387    type: temporal
2388    rules:
2389        - internal-error-001
2390        - new-conn-001
2391    group-by:
2392        - internal_ip
2393    timespan: 60s
2394    condition:
2395        gte: 2
2396    aliases:
2397        internal_ip:
2398            internal_error: destination.ip
2399            new_network_connection: source.ip
2400level: high
2401"#;
2402        let collection = parse_sigma_yaml(yaml).unwrap();
2403        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2404        engine.add_collection(&collection).unwrap();
2405
2406        let ts = 1000i64;
2407
2408        // Internal error with destination.ip = 10.0.0.5
2409        let v1 = json!({
2410            "http.response.status_code": 500,
2411            "destination.ip": "10.0.0.5"
2412        });
2413        let ev1 = Event::from_value(&v1);
2414        let r1 = engine.process_event_at(&ev1, ts);
2415        assert_eq!(r1.detections.len(), 1);
2416        assert!(r1.correlations.is_empty());
2417
2418        // New connection with source.ip = 10.0.0.5 (same IP, aliased)
2419        let v2 = json!({
2420            "event.type": "connection",
2421            "source.ip": "10.0.0.5"
2422        });
2423        let ev2 = Event::from_value(&v2);
2424        let r2 = engine.process_event_at(&ev2, ts + 5);
2425        assert_eq!(r2.detections.len(), 1);
2426        // Both rules fired for the same internal_ip group → temporal should fire
2427        assert_eq!(r2.correlations.len(), 1);
2428        assert_eq!(r2.correlations[0].rule_title, "Error Then Connection");
2429        // Check group key contains the aliased field
2430        assert!(
2431            r2.correlations[0]
2432                .group_key
2433                .iter()
2434                .any(|(k, v)| k == "internal_ip" && v == "10.0.0.5")
2435        );
2436    }
2437
2438    // =========================================================================
2439    // Value percentile (basic smoke test)
2440    // =========================================================================
2441
2442    #[test]
2443    fn test_value_percentile() {
2444        let yaml = r#"
2445title: Process Creation
2446id: proc-001
2447logsource:
2448    category: process
2449detection:
2450    selection:
2451        type: process_creation
2452    condition: selection
2453---
2454title: Rare Process
2455id: corr-percentile
2456correlation:
2457    type: value_percentile
2458    rules:
2459        - proc-001
2460    group-by:
2461        - ComputerName
2462    timespan: 60s
2463    condition:
2464        field: image
2465        lte: 50
2466level: medium
2467"#;
2468        let collection = parse_sigma_yaml(yaml).unwrap();
2469        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2470        engine.add_collection(&collection).unwrap();
2471
2472        let ts = 1000i64;
2473        // Push some numeric-ish values for the image field
2474        for (i, val) in [10.0, 20.0, 30.0, 40.0, 50.0].iter().enumerate() {
2475            let v = json!({"type": "process_creation", "ComputerName": "srv01", "image": val});
2476            let event = Event::from_value(&v);
2477            let _ = engine.process_event_at(&event, ts + i as i64);
2478        }
2479        // The median (30.0) should be <= 50, so condition fires
2480        // Note: percentile implementation is simplified for in-memory eval
2481    }
2482
2483    // =========================================================================
2484    // Extended temporal conditions (end-to-end)
2485    // =========================================================================
2486
2487    #[test]
2488    fn test_extended_temporal_and_condition() {
2489        // Temporal correlation with "rule_a and rule_b" extended condition
2490        let yaml = r#"
2491title: Login Attempt
2492id: login-attempt
2493logsource:
2494    category: auth
2495detection:
2496    selection:
2497        EventType: login_failure
2498    condition: selection
2499---
2500title: Password Change
2501id: password-change
2502logsource:
2503    category: auth
2504detection:
2505    selection:
2506        EventType: password_change
2507    condition: selection
2508---
2509title: Credential Attack
2510correlation:
2511    type: temporal
2512    rules:
2513        - login-attempt
2514        - password-change
2515    group-by:
2516        - User
2517    timespan: 300s
2518    condition: login-attempt and password-change
2519level: high
2520"#;
2521        let collection = parse_sigma_yaml(yaml).unwrap();
2522        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2523        engine.add_collection(&collection).unwrap();
2524
2525        let ts = 1000i64;
2526
2527        // Login failure by alice
2528        let ev1 = json!({"EventType": "login_failure", "User": "alice"});
2529        let r1 = engine.process_event_at(&Event::from_value(&ev1), ts);
2530        assert!(r1.correlations.is_empty(), "only one rule fired so far");
2531
2532        // Password change by alice — both rules have now fired
2533        let ev2 = json!({"EventType": "password_change", "User": "alice"});
2534        let r2 = engine.process_event_at(&Event::from_value(&ev2), ts + 10);
2535        assert_eq!(
2536            r2.correlations.len(),
2537            1,
2538            "temporal correlation should fire: both rules matched"
2539        );
2540        assert_eq!(r2.correlations[0].rule_title, "Credential Attack");
2541    }
2542
2543    #[test]
2544    fn test_extended_temporal_or_condition() {
2545        // Temporal with "rule_a or rule_b" — should fire when either fires
2546        let yaml = r#"
2547title: SSH Login
2548id: ssh-login
2549logsource:
2550    category: auth
2551detection:
2552    selection:
2553        EventType: ssh_login
2554    condition: selection
2555---
2556title: VPN Login
2557id: vpn-login
2558logsource:
2559    category: auth
2560detection:
2561    selection:
2562        EventType: vpn_login
2563    condition: selection
2564---
2565title: Any Remote Access
2566correlation:
2567    type: temporal
2568    rules:
2569        - ssh-login
2570        - vpn-login
2571    group-by:
2572        - User
2573    timespan: 60s
2574    condition: ssh-login or vpn-login
2575level: medium
2576"#;
2577        let collection = parse_sigma_yaml(yaml).unwrap();
2578        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2579        engine.add_collection(&collection).unwrap();
2580
2581        // Only SSH login by bob — "or" means this suffices
2582        let ev = json!({"EventType": "ssh_login", "User": "bob"});
2583        let r = engine.process_event_at(&Event::from_value(&ev), 1000);
2584        assert_eq!(r.correlations.len(), 1);
2585        assert_eq!(r.correlations[0].rule_title, "Any Remote Access");
2586    }
2587
2588    #[test]
2589    fn test_extended_temporal_partial_and_no_fire() {
2590        // Temporal "and" with only one rule firing should not trigger
2591        let yaml = r#"
2592title: Recon Step 1
2593id: recon-1
2594logsource:
2595    category: process
2596detection:
2597    selection:
2598        CommandLine|contains: 'whoami'
2599    condition: selection
2600---
2601title: Recon Step 2
2602id: recon-2
2603logsource:
2604    category: process
2605detection:
2606    selection:
2607        CommandLine|contains: 'ipconfig'
2608    condition: selection
2609---
2610title: Full Recon
2611correlation:
2612    type: temporal
2613    rules:
2614        - recon-1
2615        - recon-2
2616    group-by:
2617        - Host
2618    timespan: 120s
2619    condition: recon-1 and recon-2
2620level: high
2621"#;
2622        let collection = parse_sigma_yaml(yaml).unwrap();
2623        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2624        engine.add_collection(&collection).unwrap();
2625
2626        // Only whoami (recon-1) — should not fire
2627        let ev = json!({"CommandLine": "whoami", "Host": "srv01"});
2628        let r = engine.process_event_at(&Event::from_value(&ev), 1000);
2629        assert!(r.correlations.is_empty(), "only one of two AND rules fired");
2630
2631        // Now ipconfig (recon-2) — should fire
2632        let ev2 = json!({"CommandLine": "ipconfig /all", "Host": "srv01"});
2633        let r2 = engine.process_event_at(&Event::from_value(&ev2), 1010);
2634        assert_eq!(r2.correlations.len(), 1);
2635        assert_eq!(r2.correlations[0].rule_title, "Full Recon");
2636    }
2637
2638    // =========================================================================
2639    // Filter rules with correlation engine
2640    // =========================================================================
2641
2642    #[test]
2643    fn test_filter_with_correlation() {
2644        // Detection rule + filter + event_count correlation
2645        let yaml = r#"
2646title: Failed Auth
2647id: failed-auth
2648logsource:
2649    category: auth
2650detection:
2651    selection:
2652        EventType: auth_failure
2653    condition: selection
2654---
2655title: Exclude Service Accounts
2656filter:
2657    rules:
2658        - failed-auth
2659    selection:
2660        User|startswith: 'svc_'
2661    condition: selection
2662---
2663title: Brute Force
2664correlation:
2665    type: event_count
2666    rules:
2667        - failed-auth
2668    group-by:
2669        - User
2670    timespan: 300s
2671    condition:
2672        gte: 3
2673level: critical
2674"#;
2675        let collection = parse_sigma_yaml(yaml).unwrap();
2676        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2677        engine.add_collection(&collection).unwrap();
2678
2679        let ts = 1000i64;
2680
2681        // Service account failures should be filtered — don't count
2682        for i in 0..5 {
2683            let ev = json!({"EventType": "auth_failure", "User": "svc_backup"});
2684            let r = engine.process_event_at(&Event::from_value(&ev), ts + i);
2685            assert!(
2686                r.correlations.is_empty(),
2687                "service account should be filtered, no correlation"
2688            );
2689        }
2690
2691        // Normal user failures should count
2692        for i in 0..2 {
2693            let ev = json!({"EventType": "auth_failure", "User": "alice"});
2694            let r = engine.process_event_at(&Event::from_value(&ev), ts + 10 + i);
2695            assert!(r.correlations.is_empty(), "not yet 3 events");
2696        }
2697
2698        // Third failure triggers correlation
2699        let ev = json!({"EventType": "auth_failure", "User": "alice"});
2700        let r = engine.process_event_at(&Event::from_value(&ev), ts + 12);
2701        assert_eq!(r.correlations.len(), 1);
2702        assert_eq!(r.correlations[0].rule_title, "Brute Force");
2703    }
2704
2705    // =========================================================================
2706    // action: repeat with correlation engine
2707    // =========================================================================
2708
2709    #[test]
2710    fn test_repeat_rules_in_correlation() {
2711        // Two detection rules via repeat, both feed into event_count
2712        let yaml = r#"
2713title: File Access A
2714id: file-a
2715logsource:
2716    category: file_access
2717detection:
2718    selection:
2719        FileName|endswith: '.docx'
2720    condition: selection
2721---
2722action: repeat
2723title: File Access B
2724id: file-b
2725detection:
2726    selection:
2727        FileName|endswith: '.xlsx'
2728    condition: selection
2729---
2730title: Mass File Access
2731correlation:
2732    type: event_count
2733    rules:
2734        - file-a
2735        - file-b
2736    group-by:
2737        - User
2738    timespan: 60s
2739    condition:
2740        gte: 3
2741level: high
2742"#;
2743        let collection = parse_sigma_yaml(yaml).unwrap();
2744        assert_eq!(collection.rules.len(), 2);
2745        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2746        engine.add_collection(&collection).unwrap();
2747        assert_eq!(engine.detection_rule_count(), 2);
2748
2749        let ts = 1000i64;
2750        // Mix of docx and xlsx accesses by same user
2751        let ev1 = json!({"FileName": "report.docx", "User": "bob"});
2752        engine.process_event_at(&Event::from_value(&ev1), ts);
2753        let ev2 = json!({"FileName": "data.xlsx", "User": "bob"});
2754        engine.process_event_at(&Event::from_value(&ev2), ts + 1);
2755        let ev3 = json!({"FileName": "notes.docx", "User": "bob"});
2756        let r = engine.process_event_at(&Event::from_value(&ev3), ts + 2);
2757
2758        assert_eq!(r.correlations.len(), 1);
2759        assert_eq!(r.correlations[0].rule_title, "Mass File Access");
2760    }
2761
2762    // =========================================================================
2763    // Expand modifier with correlation engine
2764    // =========================================================================
2765
2766    #[test]
2767    fn test_expand_modifier_with_correlation() {
2768        let yaml = r#"
2769title: User Temp File
2770id: user-temp
2771logsource:
2772    category: file_access
2773detection:
2774    selection:
2775        FilePath|expand: 'C:\Users\%User%\Temp'
2776    condition: selection
2777---
2778title: Excessive Temp Access
2779correlation:
2780    type: event_count
2781    rules:
2782        - user-temp
2783    group-by:
2784        - User
2785    timespan: 60s
2786    condition:
2787        gte: 2
2788level: medium
2789"#;
2790        let collection = parse_sigma_yaml(yaml).unwrap();
2791        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2792        engine.add_collection(&collection).unwrap();
2793
2794        let ts = 1000i64;
2795        // Event where User field matches the placeholder
2796        let ev1 = json!({"FilePath": "C:\\Users\\alice\\Temp", "User": "alice"});
2797        let r1 = engine.process_event_at(&Event::from_value(&ev1), ts);
2798        assert!(r1.correlations.is_empty());
2799
2800        let ev2 = json!({"FilePath": "C:\\Users\\alice\\Temp", "User": "alice"});
2801        let r2 = engine.process_event_at(&Event::from_value(&ev2), ts + 1);
2802        assert_eq!(r2.correlations.len(), 1);
2803        assert_eq!(r2.correlations[0].rule_title, "Excessive Temp Access");
2804
2805        // Different user — should NOT match (path says alice, user is bob)
2806        let ev3 = json!({"FilePath": "C:\\Users\\alice\\Temp", "User": "bob"});
2807        let r3 = engine.process_event_at(&Event::from_value(&ev3), ts + 2);
2808        // Detection doesn't fire for this event since expand resolves to C:\Users\bob\Temp
2809        assert_eq!(r3.detections.len(), 0);
2810    }
2811
2812    // =========================================================================
2813    // Timestamp modifier with correlation engine
2814    // =========================================================================
2815
2816    #[test]
2817    fn test_timestamp_modifier_with_correlation() {
2818        let yaml = r#"
2819title: Night Login
2820id: night-login
2821logsource:
2822    category: auth
2823detection:
2824    login:
2825        EventType: login
2826    night:
2827        Timestamp|hour: 3
2828    condition: login and night
2829---
2830title: Frequent Night Logins
2831correlation:
2832    type: event_count
2833    rules:
2834        - night-login
2835    group-by:
2836        - User
2837    timespan: 3600s
2838    condition:
2839        gte: 2
2840level: high
2841"#;
2842        let collection = parse_sigma_yaml(yaml).unwrap();
2843        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2844        engine.add_collection(&collection).unwrap();
2845
2846        let ts = 1000i64;
2847        // Login at 3AM
2848        let ev1 =
2849            json!({"EventType": "login", "User": "alice", "Timestamp": "2024-01-15T03:10:00Z"});
2850        let r1 = engine.process_event_at(&Event::from_value(&ev1), ts);
2851        assert_eq!(r1.detections.len(), 1);
2852        assert!(r1.correlations.is_empty());
2853
2854        let ev2 =
2855            json!({"EventType": "login", "User": "alice", "Timestamp": "2024-01-15T03:45:00Z"});
2856        let r2 = engine.process_event_at(&Event::from_value(&ev2), ts + 1);
2857        assert_eq!(r2.correlations.len(), 1);
2858        assert_eq!(r2.correlations[0].rule_title, "Frequent Night Logins");
2859
2860        // Login at noon — should NOT count
2861        let ev3 = json!({"EventType": "login", "User": "bob", "Timestamp": "2024-01-15T12:00:00Z"});
2862        let r3 = engine.process_event_at(&Event::from_value(&ev3), ts + 2);
2863        assert!(
2864            r3.detections.is_empty(),
2865            "noon login should not match night rule"
2866        );
2867    }
2868
2869    // =========================================================================
2870    // Correlation condition range (multiple predicates)
2871    // =========================================================================
2872
2873    #[test]
2874    fn test_event_count_range_condition() {
2875        let yaml = r#"
2876title: Login Attempt
2877id: login-attempt-001
2878name: login_attempt
2879logsource:
2880    product: windows
2881detection:
2882    selection:
2883        EventType: login
2884    condition: selection
2885level: low
2886---
2887title: Login Count Range
2888id: corr-range-001
2889correlation:
2890    type: event_count
2891    rules:
2892        - login-attempt-001
2893    group-by:
2894        - User
2895    timespan: 3600s
2896    condition:
2897        gt: 2
2898        lte: 5
2899level: high
2900"#;
2901        let collection = parse_sigma_yaml(yaml).unwrap();
2902        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2903        engine.add_collection(&collection).unwrap();
2904
2905        let ts: i64 = 1_000_000;
2906
2907        // Send 2 events — gt:2 is false
2908        for i in 0..2 {
2909            let ev = json!({"EventType": "login", "User": "alice"});
2910            let r = engine.process_event_at(&Event::from_value(&ev), ts + i);
2911            assert!(r.correlations.is_empty(), "2 events should not fire (gt:2)");
2912        }
2913
2914        // 3rd event — gt:2 is true, lte:5 is true → fires
2915        let ev3 = json!({"EventType": "login", "User": "alice"});
2916        let r3 = engine.process_event_at(&Event::from_value(&ev3), ts + 3);
2917        assert_eq!(r3.correlations.len(), 1, "3 events: gt:2 AND lte:5");
2918
2919        // Send events 4, 5 — still in range
2920        for i in 4..=5 {
2921            let ev = json!({"EventType": "login", "User": "alice"});
2922            let r = engine.process_event_at(&Event::from_value(&ev), ts + i);
2923            assert_eq!(r.correlations.len(), 1, "{i} events still in range");
2924        }
2925
2926        // 6th event — lte:5 is false → no fire
2927        let ev6 = json!({"EventType": "login", "User": "alice"});
2928        let r6 = engine.process_event_at(&Event::from_value(&ev6), ts + 6);
2929        assert!(
2930            r6.correlations.is_empty(),
2931            "6 events exceeds lte:5, should not fire"
2932        );
2933    }
2934
2935    // =========================================================================
2936    // Suppression
2937    // =========================================================================
2938
2939    fn suppression_yaml() -> &'static str {
2940        r#"
2941title: Login
2942id: login-base
2943logsource:
2944    category: auth
2945detection:
2946    selection:
2947        EventType: login
2948    condition: selection
2949---
2950title: Many Logins
2951correlation:
2952    type: event_count
2953    rules:
2954        - login-base
2955    group-by:
2956        - User
2957    timeframe: 60s
2958    condition:
2959        gte: 3
2960level: high
2961"#
2962    }
2963
2964    #[test]
2965    fn test_suppression_window() {
2966        let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
2967        let config = CorrelationConfig {
2968            suppress: Some(10), // suppress for 10 seconds
2969            ..Default::default()
2970        };
2971        let mut engine = CorrelationEngine::new(config);
2972        engine.add_collection(&collection).unwrap();
2973
2974        let ev = json!({"EventType": "login", "User": "alice"});
2975        let ts = 1000;
2976
2977        // Fire 3 events to hit threshold
2978        engine.process_event_at(&Event::from_value(&ev), ts);
2979        engine.process_event_at(&Event::from_value(&ev), ts + 1);
2980        let r3 = engine.process_event_at(&Event::from_value(&ev), ts + 2);
2981        assert_eq!(r3.correlations.len(), 1, "should fire on 3rd event");
2982
2983        // 4th event within suppress window → suppressed
2984        let r4 = engine.process_event_at(&Event::from_value(&ev), ts + 3);
2985        assert!(
2986            r4.correlations.is_empty(),
2987            "should be suppressed within 10s window"
2988        );
2989
2990        // 5th event still within suppress window → suppressed
2991        let r5 = engine.process_event_at(&Event::from_value(&ev), ts + 9);
2992        assert!(
2993            r5.correlations.is_empty(),
2994            "should be suppressed at ts+9 (< ts+2+10)"
2995        );
2996
2997        // Event after suppress window expires → fires again
2998        let r6 = engine.process_event_at(&Event::from_value(&ev), ts + 13);
2999        assert_eq!(
3000            r6.correlations.len(),
3001            1,
3002            "should fire again after suppress window expires"
3003        );
3004    }
3005
3006    #[test]
3007    fn test_suppression_per_group_key() {
3008        let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
3009        let config = CorrelationConfig {
3010            suppress: Some(60),
3011            ..Default::default()
3012        };
3013        let mut engine = CorrelationEngine::new(config);
3014        engine.add_collection(&collection).unwrap();
3015
3016        let ts = 1000;
3017
3018        // Alice hits threshold
3019        let ev_a = json!({"EventType": "login", "User": "alice"});
3020        engine.process_event_at(&Event::from_value(&ev_a), ts);
3021        engine.process_event_at(&Event::from_value(&ev_a), ts + 1);
3022        let r = engine.process_event_at(&Event::from_value(&ev_a), ts + 2);
3023        assert_eq!(r.correlations.len(), 1, "alice should fire");
3024
3025        // Bob hits threshold — different group key, not suppressed
3026        let ev_b = json!({"EventType": "login", "User": "bob"});
3027        engine.process_event_at(&Event::from_value(&ev_b), ts + 3);
3028        engine.process_event_at(&Event::from_value(&ev_b), ts + 4);
3029        let r = engine.process_event_at(&Event::from_value(&ev_b), ts + 5);
3030        assert_eq!(r.correlations.len(), 1, "bob should fire independently");
3031
3032        // Alice is still suppressed
3033        let r = engine.process_event_at(&Event::from_value(&ev_a), ts + 6);
3034        assert!(r.correlations.is_empty(), "alice still suppressed");
3035    }
3036
3037    // =========================================================================
3038    // Action on match: Reset
3039    // =========================================================================
3040
3041    #[test]
3042    fn test_action_reset() {
3043        let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
3044        let config = CorrelationConfig {
3045            action_on_match: CorrelationAction::Reset,
3046            ..Default::default()
3047        };
3048        let mut engine = CorrelationEngine::new(config);
3049        engine.add_collection(&collection).unwrap();
3050
3051        let ev = json!({"EventType": "login", "User": "alice"});
3052        let ts = 1000;
3053
3054        // Hit threshold: 3 events
3055        engine.process_event_at(&Event::from_value(&ev), ts);
3056        engine.process_event_at(&Event::from_value(&ev), ts + 1);
3057        let r3 = engine.process_event_at(&Event::from_value(&ev), ts + 2);
3058        assert_eq!(r3.correlations.len(), 1, "should fire on 3rd event");
3059
3060        // State was reset, so 4th and 5th events should NOT fire
3061        let r4 = engine.process_event_at(&Event::from_value(&ev), ts + 3);
3062        assert!(r4.correlations.is_empty(), "reset: need 3 more events");
3063
3064        let r5 = engine.process_event_at(&Event::from_value(&ev), ts + 4);
3065        assert!(r5.correlations.is_empty(), "reset: still only 2");
3066
3067        // 6th event (3rd after reset) should fire again
3068        let r6 = engine.process_event_at(&Event::from_value(&ev), ts + 5);
3069        assert_eq!(
3070            r6.correlations.len(),
3071            1,
3072            "should fire again after 3 events post-reset"
3073        );
3074    }
3075
3076    // =========================================================================
3077    // Generate flag / emit_detections
3078    // =========================================================================
3079
3080    #[test]
3081    fn test_emit_detections_true_by_default() {
3082        let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
3083        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3084        engine.add_collection(&collection).unwrap();
3085
3086        let ev = json!({"EventType": "login", "User": "alice"});
3087        let r = engine.process_event_at(&Event::from_value(&ev), 1000);
3088        assert_eq!(r.detections.len(), 1, "by default detections are emitted");
3089    }
3090
3091    #[test]
3092    fn test_emit_detections_false_suppresses() {
3093        let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
3094        let config = CorrelationConfig {
3095            emit_detections: false,
3096            ..Default::default()
3097        };
3098        let mut engine = CorrelationEngine::new(config);
3099        engine.add_collection(&collection).unwrap();
3100
3101        let ev = json!({"EventType": "login", "User": "alice"});
3102        let r = engine.process_event_at(&Event::from_value(&ev), 1000);
3103        assert!(
3104            r.detections.is_empty(),
3105            "detection matches should be suppressed when emit_detections=false"
3106        );
3107    }
3108
3109    #[test]
3110    fn test_generate_true_keeps_detections() {
3111        // When generate: true, detections should be emitted even with emit_detections=false
3112        let yaml = r#"
3113title: Login
3114id: login-gen
3115logsource:
3116    category: auth
3117detection:
3118    selection:
3119        EventType: login
3120    condition: selection
3121---
3122title: Many Logins
3123correlation:
3124    type: event_count
3125    rules:
3126        - login-gen
3127    group-by:
3128        - User
3129    timeframe: 60s
3130    condition:
3131        gte: 3
3132    generate: true
3133level: high
3134"#;
3135        let collection = parse_sigma_yaml(yaml).unwrap();
3136        let config = CorrelationConfig {
3137            emit_detections: false,
3138            ..Default::default()
3139        };
3140        let mut engine = CorrelationEngine::new(config);
3141        engine.add_collection(&collection).unwrap();
3142
3143        let ev = json!({"EventType": "login", "User": "alice"});
3144        let r = engine.process_event_at(&Event::from_value(&ev), 1000);
3145        // generate: true means this rule is NOT correlation-only
3146        assert_eq!(
3147            r.detections.len(),
3148            1,
3149            "generate:true keeps detection output"
3150        );
3151    }
3152
3153    // =========================================================================
3154    // Suppression + Reset combined
3155    // =========================================================================
3156
3157    #[test]
3158    fn test_suppress_and_reset_combined() {
3159        let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
3160        let config = CorrelationConfig {
3161            suppress: Some(5),
3162            action_on_match: CorrelationAction::Reset,
3163            ..Default::default()
3164        };
3165        let mut engine = CorrelationEngine::new(config);
3166        engine.add_collection(&collection).unwrap();
3167
3168        let ev = json!({"EventType": "login", "User": "alice"});
3169        let ts = 1000;
3170
3171        // Hit threshold: fires and resets
3172        engine.process_event_at(&Event::from_value(&ev), ts);
3173        engine.process_event_at(&Event::from_value(&ev), ts + 1);
3174        let r3 = engine.process_event_at(&Event::from_value(&ev), ts + 2);
3175        assert_eq!(r3.correlations.len(), 1, "fires on 3rd event");
3176
3177        // Push 3 more events quickly (state was reset, so new count → 3)
3178        // but suppress window hasn't expired (ts+2 + 5 = ts+7)
3179        engine.process_event_at(&Event::from_value(&ev), ts + 3);
3180        engine.process_event_at(&Event::from_value(&ev), ts + 4);
3181        let r = engine.process_event_at(&Event::from_value(&ev), ts + 5);
3182        assert!(
3183            r.correlations.is_empty(),
3184            "threshold met again but still suppressed"
3185        );
3186
3187        // After suppress expires (at ts+8, which is ts+2+6 > suppress=5),
3188        // the accumulated events from step 2 (ts+3,4,5) still satisfy gte:3,
3189        // so the first event after expiry fires immediately and resets.
3190        let r = engine.process_event_at(&Event::from_value(&ev), ts + 8);
3191        assert_eq!(
3192            r.correlations.len(),
3193            1,
3194            "fires after suppress expires (accumulated events + new one)"
3195        );
3196
3197        // State was reset again at ts+8, suppress window now ts+8..ts+13.
3198        // Need 3 new events to fire, and suppress must expire.
3199        engine.process_event_at(&Event::from_value(&ev), ts + 9);
3200        engine.process_event_at(&Event::from_value(&ev), ts + 10);
3201        let r = engine.process_event_at(&Event::from_value(&ev), ts + 11);
3202        assert!(
3203            r.correlations.is_empty(),
3204            "threshold met but suppress window hasn't expired (ts+11 - ts+8 = 3 < 5)"
3205        );
3206    }
3207
3208    // =========================================================================
3209    // No suppression (default behavior preserved)
3210    // =========================================================================
3211
3212    #[test]
3213    fn test_no_suppression_fires_every_event() {
3214        let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
3215        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3216        engine.add_collection(&collection).unwrap();
3217
3218        let ev = json!({"EventType": "login", "User": "alice"});
3219        let ts = 1000;
3220
3221        engine.process_event_at(&Event::from_value(&ev), ts);
3222        engine.process_event_at(&Event::from_value(&ev), ts + 1);
3223        let r3 = engine.process_event_at(&Event::from_value(&ev), ts + 2);
3224        assert_eq!(r3.correlations.len(), 1);
3225
3226        // Without suppression, 4th event should also fire
3227        let r4 = engine.process_event_at(&Event::from_value(&ev), ts + 3);
3228        assert_eq!(
3229            r4.correlations.len(),
3230            1,
3231            "no suppression: fires on every event after threshold"
3232        );
3233
3234        let r5 = engine.process_event_at(&Event::from_value(&ev), ts + 4);
3235        assert_eq!(r5.correlations.len(), 1, "still fires");
3236    }
3237
3238    // =========================================================================
3239    // Custom attribute → engine config tests
3240    // =========================================================================
3241
3242    #[test]
3243    fn test_custom_attr_timestamp_field() {
3244        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3245        let mut attrs = std::collections::HashMap::new();
3246        attrs.insert("rsigma.timestamp_field".to_string(), "time".to_string());
3247        engine.apply_custom_attributes(&attrs);
3248
3249        assert_eq!(
3250            engine.config.timestamp_fields[0], "time",
3251            "rsigma.timestamp_field should be prepended"
3252        );
3253        // Defaults should still be there after the custom one
3254        assert!(
3255            engine
3256                .config
3257                .timestamp_fields
3258                .contains(&"@timestamp".to_string())
3259        );
3260    }
3261
3262    #[test]
3263    fn test_custom_attr_timestamp_field_no_duplicates() {
3264        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3265        let mut attrs = std::collections::HashMap::new();
3266        attrs.insert("rsigma.timestamp_field".to_string(), "time".to_string());
3267        // Apply twice — should not duplicate
3268        engine.apply_custom_attributes(&attrs);
3269        engine.apply_custom_attributes(&attrs);
3270
3271        let count = engine
3272            .config
3273            .timestamp_fields
3274            .iter()
3275            .filter(|f| *f == "time")
3276            .count();
3277        assert_eq!(count, 1, "should not duplicate timestamp_field entries");
3278    }
3279
3280    #[test]
3281    fn test_custom_attr_suppress() {
3282        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3283        assert!(engine.config.suppress.is_none());
3284
3285        let mut attrs = std::collections::HashMap::new();
3286        attrs.insert("rsigma.suppress".to_string(), "5m".to_string());
3287        engine.apply_custom_attributes(&attrs);
3288
3289        assert_eq!(engine.config.suppress, Some(300));
3290    }
3291
3292    #[test]
3293    fn test_custom_attr_suppress_does_not_override_cli() {
3294        let config = CorrelationConfig {
3295            suppress: Some(60), // CLI set to 60s
3296            ..Default::default()
3297        };
3298        let mut engine = CorrelationEngine::new(config);
3299
3300        let mut attrs = std::collections::HashMap::new();
3301        attrs.insert("rsigma.suppress".to_string(), "5m".to_string());
3302        engine.apply_custom_attributes(&attrs);
3303
3304        assert_eq!(
3305            engine.config.suppress,
3306            Some(60),
3307            "CLI suppress should not be overridden by custom attribute"
3308        );
3309    }
3310
3311    #[test]
3312    fn test_custom_attr_action() {
3313        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3314        assert_eq!(engine.config.action_on_match, CorrelationAction::Alert);
3315
3316        let mut attrs = std::collections::HashMap::new();
3317        attrs.insert("rsigma.action".to_string(), "reset".to_string());
3318        engine.apply_custom_attributes(&attrs);
3319
3320        assert_eq!(engine.config.action_on_match, CorrelationAction::Reset);
3321    }
3322
3323    #[test]
3324    fn test_custom_attr_action_does_not_override_cli() {
3325        let config = CorrelationConfig {
3326            action_on_match: CorrelationAction::Reset, // CLI set to reset
3327            ..Default::default()
3328        };
3329        let mut engine = CorrelationEngine::new(config);
3330
3331        let mut attrs = std::collections::HashMap::new();
3332        attrs.insert("rsigma.action".to_string(), "alert".to_string());
3333        engine.apply_custom_attributes(&attrs);
3334
3335        assert_eq!(
3336            engine.config.action_on_match,
3337            CorrelationAction::Reset,
3338            "CLI action should not be overridden by custom attribute"
3339        );
3340    }
3341
3342    #[test]
3343    fn test_custom_attr_timestamp_field_used_for_extraction() {
3344        // The event has "time" but not "@timestamp" or "timestamp"
3345        let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
3346        let mut config = CorrelationConfig::default();
3347        // Prepend "event_time" to simulate --timestamp-field
3348        config.timestamp_fields.insert(0, "event_time".to_string());
3349        let mut engine = CorrelationEngine::new(config);
3350        engine.add_collection(&collection).unwrap();
3351
3352        // Event with "event_time" field
3353        let ev = json!({
3354            "EventType": "login",
3355            "User": "alice",
3356            "event_time": "2026-02-11T12:00:00Z"
3357        });
3358        let result = engine.process_event(&Event::from_value(&ev));
3359
3360        // The detection should match, and timestamp should be ~1739275200 (2026-02-11)
3361        assert!(!result.detections.is_empty() || result.correlations.is_empty());
3362        // The key test: ensure the engine extracted the event timestamp, not Utc::now.
3363        // If it used Utc::now, the test would still pass but the timestamp would be
3364        // wildly different. We verify by checking the extracted value directly.
3365        let ts = engine
3366            .extract_event_timestamp(&Event::from_value(&ev))
3367            .expect("should extract timestamp");
3368        assert!(
3369            ts > 1_700_000_000 && ts < 1_800_000_000,
3370            "timestamp should be ~2026 epoch, got {ts}"
3371        );
3372    }
3373
3374    // =========================================================================
3375    // Cycle detection
3376    // =========================================================================
3377
3378    #[test]
3379    fn test_correlation_cycle_direct() {
3380        // Two correlations that reference each other: A -> B -> A
3381        let yaml = r#"
3382title: detection rule
3383id: det-rule
3384logsource:
3385    product: test
3386detection:
3387    selection:
3388        action: click
3389    condition: selection
3390level: low
3391---
3392title: correlation A
3393id: corr-a
3394correlation:
3395    type: event_count
3396    rules:
3397        - corr-b
3398    group-by:
3399        - User
3400    timespan: 5m
3401    condition:
3402        gte: 2
3403level: high
3404---
3405title: correlation B
3406id: corr-b
3407correlation:
3408    type: event_count
3409    rules:
3410        - corr-a
3411    group-by:
3412        - User
3413    timespan: 5m
3414    condition:
3415        gte: 2
3416level: high
3417"#;
3418        let collection = parse_sigma_yaml(yaml).unwrap();
3419        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3420        let result = engine.add_collection(&collection);
3421        assert!(result.is_err(), "should detect direct cycle");
3422        let err = result.unwrap_err().to_string();
3423        assert!(err.contains("cycle"), "error should mention cycle: {err}");
3424        assert!(
3425            err.contains("corr-a") && err.contains("corr-b"),
3426            "error should name both correlations: {err}"
3427        );
3428    }
3429
3430    #[test]
3431    fn test_correlation_cycle_self() {
3432        // A correlation that references itself
3433        let yaml = r#"
3434title: detection rule
3435id: det-rule
3436logsource:
3437    product: test
3438detection:
3439    selection:
3440        action: click
3441    condition: selection
3442level: low
3443---
3444title: self-ref correlation
3445id: self-corr
3446correlation:
3447    type: event_count
3448    rules:
3449        - self-corr
3450    group-by:
3451        - User
3452    timespan: 5m
3453    condition:
3454        gte: 2
3455level: high
3456"#;
3457        let collection = parse_sigma_yaml(yaml).unwrap();
3458        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3459        let result = engine.add_collection(&collection);
3460        assert!(result.is_err(), "should detect self-referencing cycle");
3461        let err = result.unwrap_err().to_string();
3462        assert!(err.contains("cycle"), "error should mention cycle: {err}");
3463        assert!(
3464            err.contains("self-corr"),
3465            "error should name the correlation: {err}"
3466        );
3467    }
3468
3469    #[test]
3470    fn test_correlation_no_cycle_valid_chain() {
3471        // Valid chain: detection -> corr-A -> corr-B (no cycle)
3472        let yaml = r#"
3473title: detection rule
3474id: det-rule
3475logsource:
3476    product: test
3477detection:
3478    selection:
3479        action: click
3480    condition: selection
3481level: low
3482---
3483title: correlation A
3484id: corr-a
3485correlation:
3486    type: event_count
3487    rules:
3488        - det-rule
3489    group-by:
3490        - User
3491    timespan: 5m
3492    condition:
3493        gte: 2
3494level: high
3495---
3496title: correlation B
3497id: corr-b
3498correlation:
3499    type: event_count
3500    rules:
3501        - corr-a
3502    group-by:
3503        - User
3504    timespan: 5m
3505    condition:
3506        gte: 2
3507level: high
3508"#;
3509        let collection = parse_sigma_yaml(yaml).unwrap();
3510        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3511        let result = engine.add_collection(&collection);
3512        assert!(
3513            result.is_ok(),
3514            "valid chain should not be rejected: {result:?}"
3515        );
3516    }
3517
3518    #[test]
3519    fn test_correlation_cycle_transitive() {
3520        // Transitive cycle: A -> B -> C -> A
3521        let yaml = r#"
3522title: detection rule
3523id: det-rule
3524logsource:
3525    product: test
3526detection:
3527    selection:
3528        action: click
3529    condition: selection
3530level: low
3531---
3532title: correlation A
3533id: corr-a
3534correlation:
3535    type: event_count
3536    rules:
3537        - corr-c
3538    group-by:
3539        - User
3540    timespan: 5m
3541    condition:
3542        gte: 2
3543level: high
3544---
3545title: correlation B
3546id: corr-b
3547correlation:
3548    type: event_count
3549    rules:
3550        - corr-a
3551    group-by:
3552        - User
3553    timespan: 5m
3554    condition:
3555        gte: 2
3556level: high
3557---
3558title: correlation C
3559id: corr-c
3560correlation:
3561    type: event_count
3562    rules:
3563        - corr-b
3564    group-by:
3565        - User
3566    timespan: 5m
3567    condition:
3568        gte: 2
3569level: high
3570"#;
3571        let collection = parse_sigma_yaml(yaml).unwrap();
3572        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3573        let result = engine.add_collection(&collection);
3574        assert!(result.is_err(), "should detect transitive cycle");
3575        let err = result.unwrap_err().to_string();
3576        assert!(err.contains("cycle"), "error should mention cycle: {err}");
3577    }
3578
3579    // =========================================================================
3580    // Correlation event inclusion tests
3581    // =========================================================================
3582
3583    #[test]
3584    fn test_correlation_events_disabled_by_default() {
3585        let yaml = r#"
3586title: Login
3587id: login-rule
3588logsource:
3589    category: auth
3590detection:
3591    selection:
3592        EventType: login
3593    condition: selection
3594---
3595title: Many Logins
3596correlation:
3597    type: event_count
3598    rules:
3599        - login-rule
3600    group-by:
3601        - User
3602    timespan: 60s
3603    condition:
3604        gte: 3
3605level: high
3606"#;
3607        let collection = parse_sigma_yaml(yaml).unwrap();
3608        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3609        engine.add_collection(&collection).unwrap();
3610
3611        for i in 0..3 {
3612            let v = json!({"EventType": "login", "User": "admin", "@timestamp": 1000 + i});
3613            let event = Event::from_value(&v);
3614            let result = engine.process_event_at(&event, 1000 + i);
3615            if i == 2 {
3616                assert_eq!(result.correlations.len(), 1);
3617                // Events should NOT be included by default
3618                assert!(result.correlations[0].events.is_none());
3619            }
3620        }
3621    }
3622
3623    #[test]
3624    fn test_correlation_events_included_when_enabled() {
3625        let yaml = r#"
3626title: Login
3627id: login-rule
3628logsource:
3629    category: auth
3630detection:
3631    selection:
3632        EventType: login
3633    condition: selection
3634---
3635title: Many Logins
3636correlation:
3637    type: event_count
3638    rules:
3639        - login-rule
3640    group-by:
3641        - User
3642    timespan: 60s
3643    condition:
3644        gte: 3
3645level: high
3646"#;
3647        let collection = parse_sigma_yaml(yaml).unwrap();
3648        let config = CorrelationConfig {
3649            correlation_event_mode: CorrelationEventMode::Full,
3650            max_correlation_events: 10,
3651            ..Default::default()
3652        };
3653        let mut engine = CorrelationEngine::new(config);
3654        engine.add_collection(&collection).unwrap();
3655
3656        let events_sent: Vec<serde_json::Value> = (0..3)
3657            .map(|i| json!({"EventType": "login", "User": "admin", "@timestamp": 1000 + i}))
3658            .collect();
3659
3660        let mut corr_result = None;
3661        for (i, ev) in events_sent.iter().enumerate() {
3662            let event = Event::from_value(ev);
3663            let result = engine.process_event_at(&event, 1000 + i as i64);
3664            if !result.correlations.is_empty() {
3665                corr_result = Some(result);
3666            }
3667        }
3668
3669        let result = corr_result.expect("correlation should have fired");
3670        let corr = &result.correlations[0];
3671
3672        // Events should be included
3673        let events = corr.events.as_ref().expect("events should be present");
3674        assert_eq!(
3675            events.len(),
3676            3,
3677            "all 3 contributing events should be stored"
3678        );
3679
3680        // Verify all sent events are present
3681        for (i, event) in events.iter().enumerate() {
3682            assert_eq!(event["EventType"], "login");
3683            assert_eq!(event["User"], "admin");
3684            assert_eq!(event["@timestamp"], 1000 + i as i64);
3685        }
3686    }
3687
3688    #[test]
3689    fn test_correlation_events_max_cap() {
3690        let yaml = r#"
3691title: Login
3692id: login-rule
3693logsource:
3694    category: auth
3695detection:
3696    selection:
3697        EventType: login
3698    condition: selection
3699---
3700title: Many Logins
3701correlation:
3702    type: event_count
3703    rules:
3704        - login-rule
3705    group-by:
3706        - User
3707    timespan: 60s
3708    condition:
3709        gte: 5
3710level: high
3711"#;
3712        let collection = parse_sigma_yaml(yaml).unwrap();
3713        let config = CorrelationConfig {
3714            correlation_event_mode: CorrelationEventMode::Full,
3715            max_correlation_events: 3, // only keep last 3
3716            ..Default::default()
3717        };
3718        let mut engine = CorrelationEngine::new(config);
3719        engine.add_collection(&collection).unwrap();
3720
3721        let mut corr_result = None;
3722        for i in 0..5 {
3723            let v = json!({"EventType": "login", "User": "admin", "idx": i});
3724            let event = Event::from_value(&v);
3725            let result = engine.process_event_at(&event, 1000 + i);
3726            if !result.correlations.is_empty() {
3727                corr_result = Some(result);
3728            }
3729        }
3730
3731        let result = corr_result.expect("correlation should have fired");
3732        let events = result.correlations[0]
3733            .events
3734            .as_ref()
3735            .expect("events should be present");
3736
3737        // Only the last 3 events should be retained (cap = 3)
3738        assert_eq!(events.len(), 3);
3739        assert_eq!(events[0]["idx"], 2);
3740        assert_eq!(events[1]["idx"], 3);
3741        assert_eq!(events[2]["idx"], 4);
3742    }
3743
3744    #[test]
3745    fn test_correlation_events_with_reset_action() {
3746        let yaml = r#"
3747title: Login
3748id: login-rule
3749logsource:
3750    category: auth
3751detection:
3752    selection:
3753        EventType: login
3754    condition: selection
3755---
3756title: Many Logins
3757correlation:
3758    type: event_count
3759    rules:
3760        - login-rule
3761    group-by:
3762        - User
3763    timespan: 60s
3764    condition:
3765        gte: 2
3766level: high
3767"#;
3768        let collection = parse_sigma_yaml(yaml).unwrap();
3769        let config = CorrelationConfig {
3770            correlation_event_mode: CorrelationEventMode::Full,
3771            action_on_match: CorrelationAction::Reset,
3772            ..Default::default()
3773        };
3774        let mut engine = CorrelationEngine::new(config);
3775        engine.add_collection(&collection).unwrap();
3776
3777        // First round: 2 events -> fires
3778        for i in 0..2 {
3779            let v = json!({"EventType": "login", "User": "admin", "round": 1, "idx": i});
3780            let event = Event::from_value(&v);
3781            let result = engine.process_event_at(&event, 1000 + i);
3782            if i == 1 {
3783                assert_eq!(result.correlations.len(), 1);
3784                let events = result.correlations[0].events.as_ref().unwrap();
3785                assert_eq!(events.len(), 2);
3786            }
3787        }
3788
3789        // After reset, event buffer should be cleared.
3790        // Second round: need 2 more events to fire again
3791        let v = json!({"EventType": "login", "User": "admin", "round": 2, "idx": 0});
3792        let event = Event::from_value(&v);
3793        let result = engine.process_event_at(&event, 1010);
3794        assert!(
3795            result.correlations.is_empty(),
3796            "should not fire with only 1 event after reset"
3797        );
3798
3799        let v = json!({"EventType": "login", "User": "admin", "round": 2, "idx": 1});
3800        let event = Event::from_value(&v);
3801        let result = engine.process_event_at(&event, 1011);
3802        assert_eq!(result.correlations.len(), 1);
3803        let events = result.correlations[0].events.as_ref().unwrap();
3804        assert_eq!(events.len(), 2);
3805        // Should only have round 2 events
3806        assert_eq!(events[0]["round"], 2);
3807        assert_eq!(events[1]["round"], 2);
3808    }
3809
3810    #[test]
3811    fn test_correlation_events_with_set_include() {
3812        let yaml = r#"
3813title: Login
3814id: login-rule
3815logsource:
3816    category: auth
3817detection:
3818    selection:
3819        EventType: login
3820    condition: selection
3821---
3822title: Many Logins
3823correlation:
3824    type: event_count
3825    rules:
3826        - login-rule
3827    group-by:
3828        - User
3829    timespan: 60s
3830    condition:
3831        gte: 2
3832level: high
3833"#;
3834        let collection = parse_sigma_yaml(yaml).unwrap();
3835        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3836        engine.add_collection(&collection).unwrap();
3837
3838        // Enable via setter
3839        engine.set_correlation_event_mode(CorrelationEventMode::Full);
3840
3841        for i in 0..2 {
3842            let v = json!({"EventType": "login", "User": "admin"});
3843            let event = Event::from_value(&v);
3844            let result = engine.process_event_at(&event, 1000 + i);
3845            if i == 1 {
3846                assert_eq!(result.correlations.len(), 1);
3847                assert!(result.correlations[0].events.is_some());
3848                assert_eq!(result.correlations[0].events.as_ref().unwrap().len(), 2);
3849            }
3850        }
3851    }
3852
3853    #[test]
3854    fn test_correlation_events_eviction_syncs_with_window() {
3855        let yaml = r#"
3856title: Login
3857id: login-rule
3858logsource:
3859    category: auth
3860detection:
3861    selection:
3862        EventType: login
3863    condition: selection
3864---
3865title: Many Logins
3866correlation:
3867    type: event_count
3868    rules:
3869        - login-rule
3870    group-by:
3871        - User
3872    timespan: 10s
3873    condition:
3874        gte: 3
3875level: high
3876"#;
3877        let collection = parse_sigma_yaml(yaml).unwrap();
3878        let config = CorrelationConfig {
3879            correlation_event_mode: CorrelationEventMode::Full,
3880            max_correlation_events: 100,
3881            ..Default::default()
3882        };
3883        let mut engine = CorrelationEngine::new(config);
3884        engine.add_collection(&collection).unwrap();
3885
3886        // Push 2 events at ts=1000,1001 — within the 10s window
3887        for i in 0..2 {
3888            let v = json!({"EventType": "login", "User": "admin", "idx": i});
3889            let event = Event::from_value(&v);
3890            engine.process_event_at(&event, 1000 + i);
3891        }
3892
3893        // Push 1 more event at ts=1015 — the first 2 events are now outside the
3894        // 10s window (cutoff = 1015 - 10 = 1005)
3895        let v = json!({"EventType": "login", "User": "admin", "idx": 2});
3896        let event = Event::from_value(&v);
3897        let result = engine.process_event_at(&event, 1015);
3898        // Should NOT fire: only 1 event in window (the one at ts=1015)
3899        assert!(
3900            result.correlations.is_empty(),
3901            "should not fire — old events evicted"
3902        );
3903
3904        // Push 2 more to reach threshold
3905        for i in 3..5 {
3906            let v = json!({"EventType": "login", "User": "admin", "idx": i});
3907            let event = Event::from_value(&v);
3908            let result = engine.process_event_at(&event, 1016 + i - 3);
3909            if i == 4 {
3910                assert_eq!(result.correlations.len(), 1);
3911                let events = result.correlations[0].events.as_ref().unwrap();
3912                // Should have events from ts=1015,1016,1017 — not the old ones
3913                assert_eq!(events.len(), 3);
3914                for ev in events {
3915                    assert!(ev["idx"].as_i64().unwrap() >= 2);
3916                }
3917            }
3918        }
3919    }
3920
3921    #[test]
3922    fn test_event_buffer_monitoring() {
3923        let yaml = r#"
3924title: Login
3925id: login-rule
3926logsource:
3927    category: auth
3928detection:
3929    selection:
3930        EventType: login
3931    condition: selection
3932---
3933title: Many Logins
3934correlation:
3935    type: event_count
3936    rules:
3937        - login-rule
3938    group-by:
3939        - User
3940    timespan: 60s
3941    condition:
3942        gte: 100
3943level: high
3944"#;
3945        let collection = parse_sigma_yaml(yaml).unwrap();
3946        let config = CorrelationConfig {
3947            correlation_event_mode: CorrelationEventMode::Full,
3948            ..Default::default()
3949        };
3950        let mut engine = CorrelationEngine::new(config);
3951        engine.add_collection(&collection).unwrap();
3952
3953        assert_eq!(engine.event_buffer_count(), 0);
3954        assert_eq!(engine.event_buffer_bytes(), 0);
3955
3956        // Push some events
3957        for i in 0..5 {
3958            let v = json!({"EventType": "login", "User": "admin"});
3959            let event = Event::from_value(&v);
3960            engine.process_event_at(&event, 1000 + i);
3961        }
3962
3963        assert_eq!(engine.event_buffer_count(), 1); // one group key
3964        assert!(engine.event_buffer_bytes() > 0);
3965    }
3966
3967    #[test]
3968    fn test_correlation_refs_mode_basic() {
3969        let yaml = r#"
3970title: Login
3971id: login-rule
3972logsource:
3973    category: auth
3974detection:
3975    selection:
3976        EventType: login
3977    condition: selection
3978---
3979title: Many Logins
3980correlation:
3981    type: event_count
3982    rules:
3983        - login-rule
3984    group-by:
3985        - User
3986    timespan: 60s
3987    condition:
3988        gte: 3
3989level: high
3990"#;
3991        let collection = parse_sigma_yaml(yaml).unwrap();
3992        let config = CorrelationConfig {
3993            correlation_event_mode: CorrelationEventMode::Refs,
3994            max_correlation_events: 10,
3995            ..Default::default()
3996        };
3997        let mut engine = CorrelationEngine::new(config);
3998        engine.add_collection(&collection).unwrap();
3999
4000        let mut corr_result = None;
4001        for i in 0..3 {
4002            let v = json!({"EventType": "login", "User": "admin", "id": format!("evt-{i}"), "@timestamp": 1000 + i});
4003            let event = Event::from_value(&v);
4004            let result = engine.process_event_at(&event, 1000 + i);
4005            if !result.correlations.is_empty() {
4006                corr_result = Some(result.correlations[0].clone());
4007            }
4008        }
4009
4010        let result = corr_result.expect("correlation should have fired");
4011        // In refs mode: events should be None, event_refs should be Some
4012        assert!(
4013            result.events.is_none(),
4014            "Full events should not be included in refs mode"
4015        );
4016        let refs = result
4017            .event_refs
4018            .expect("event_refs should be present in refs mode");
4019        assert_eq!(refs.len(), 3);
4020        assert_eq!(refs[0].timestamp, 1000);
4021        assert_eq!(refs[0].id, Some("evt-0".to_string()));
4022        assert_eq!(refs[1].id, Some("evt-1".to_string()));
4023        assert_eq!(refs[2].id, Some("evt-2".to_string()));
4024    }
4025
4026    #[test]
4027    fn test_correlation_refs_mode_no_id_field() {
4028        let yaml = r#"
4029title: Login
4030id: login-rule
4031logsource:
4032    category: auth
4033detection:
4034    selection:
4035        EventType: login
4036    condition: selection
4037---
4038title: Many Logins
4039correlation:
4040    type: event_count
4041    rules:
4042        - login-rule
4043    group-by:
4044        - User
4045    timespan: 60s
4046    condition:
4047        gte: 2
4048level: high
4049"#;
4050        let collection = parse_sigma_yaml(yaml).unwrap();
4051        let config = CorrelationConfig {
4052            correlation_event_mode: CorrelationEventMode::Refs,
4053            ..Default::default()
4054        };
4055        let mut engine = CorrelationEngine::new(config);
4056        engine.add_collection(&collection).unwrap();
4057
4058        let mut corr_result = None;
4059        for i in 0..2 {
4060            let v = json!({"EventType": "login", "User": "admin"});
4061            let event = Event::from_value(&v);
4062            let result = engine.process_event_at(&event, 1000 + i);
4063            if !result.correlations.is_empty() {
4064                corr_result = Some(result.correlations[0].clone());
4065            }
4066        }
4067
4068        let result = corr_result.expect("correlation should have fired");
4069        let refs = result.event_refs.expect("event_refs should be present");
4070        // No ID field in events → id should be None
4071        for r in &refs {
4072            assert_eq!(r.id, None);
4073        }
4074    }
4075
4076    #[test]
4077    fn test_per_correlation_custom_attributes_from_yaml() {
4078        let yaml = r#"
4079title: Login
4080id: login-rule
4081logsource:
4082    category: auth
4083detection:
4084    selection:
4085        EventType: login
4086    condition: selection
4087---
4088title: Many Logins
4089custom_attributes:
4090    rsigma.correlation_event_mode: refs
4091    rsigma.max_correlation_events: "5"
4092correlation:
4093    type: event_count
4094    rules:
4095        - login-rule
4096    group-by:
4097        - User
4098    timespan: 60s
4099    condition:
4100        gte: 3
4101level: high
4102"#;
4103        let collection = parse_sigma_yaml(yaml).unwrap();
4104        // Engine mode is None (default), but per-correlation should override to Refs
4105        let config = CorrelationConfig::default();
4106        let mut engine = CorrelationEngine::new(config);
4107        engine.add_collection(&collection).unwrap();
4108
4109        let mut corr_result = None;
4110        for i in 0..3 {
4111            let v = json!({"EventType": "login", "User": "admin", "id": format!("e{i}")});
4112            let event = Event::from_value(&v);
4113            let result = engine.process_event_at(&event, 1000 + i);
4114            if !result.correlations.is_empty() {
4115                corr_result = Some(result.correlations[0].clone());
4116            }
4117        }
4118
4119        let result = corr_result.expect("correlation should fire with per-correlation refs mode");
4120        // Per-correlation override should enable refs mode even though engine default is None
4121        assert!(result.events.is_none());
4122        let refs = result
4123            .event_refs
4124            .expect("event_refs via per-correlation override");
4125        assert_eq!(refs.len(), 3);
4126        assert_eq!(refs[0].id, Some("e0".to_string()));
4127    }
4128
4129    #[test]
4130    fn test_per_correlation_custom_attr_suppress_and_action() {
4131        let yaml = r#"
4132title: Login
4133id: login-rule
4134logsource:
4135    category: auth
4136detection:
4137    selection:
4138        EventType: login
4139    condition: selection
4140---
4141title: Many Logins
4142custom_attributes:
4143    rsigma.suppress: 10s
4144    rsigma.action: reset
4145correlation:
4146    type: event_count
4147    rules:
4148        - login-rule
4149    group-by:
4150        - User
4151    timespan: 60s
4152    condition:
4153        gte: 2
4154level: high
4155"#;
4156        let collection = parse_sigma_yaml(yaml).unwrap();
4157        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
4158        engine.add_collection(&collection).unwrap();
4159
4160        // Verify the compiled correlation has per-rule overrides
4161        assert_eq!(engine.correlations[0].suppress_secs, Some(10));
4162        assert_eq!(
4163            engine.correlations[0].action,
4164            Some(CorrelationAction::Reset)
4165        );
4166    }
4167
4168    #[test]
4169    fn test_process_with_detections_matches_process_event_at() {
4170        let yaml = r#"
4171title: Login Failure
4172id: login-fail
4173logsource:
4174    category: auth
4175detection:
4176    selection:
4177        EventType: login_failure
4178    condition: selection
4179---
4180title: Brute Force
4181correlation:
4182    type: event_count
4183    rules:
4184        - login-fail
4185    group-by:
4186        - User
4187    timespan: 60s
4188    condition:
4189        gte: 3
4190level: high
4191"#;
4192        let collection = parse_sigma_yaml(yaml).unwrap();
4193
4194        // Run with process_event_at
4195        let mut engine1 = CorrelationEngine::new(CorrelationConfig::default());
4196        engine1.add_collection(&collection).unwrap();
4197
4198        let events: Vec<serde_json::Value> = (0..5)
4199            .map(|i| json!({"EventType": "login_failure", "User": "admin", "@timestamp": format!("2025-01-01T00:00:0{}Z", i + 1)}))
4200            .collect();
4201
4202        let results1: Vec<ProcessResult> = events
4203            .iter()
4204            .enumerate()
4205            .map(|(i, v)| {
4206                let e = Event::from_value(v);
4207                engine1.process_event_at(&e, 1000 + i as i64)
4208            })
4209            .collect();
4210
4211        // Run with evaluate + process_with_detections
4212        let mut engine2 = CorrelationEngine::new(CorrelationConfig::default());
4213        engine2.add_collection(&collection).unwrap();
4214
4215        let results2: Vec<ProcessResult> = events
4216            .iter()
4217            .enumerate()
4218            .map(|(i, v)| {
4219                let e = Event::from_value(v);
4220                let detections = engine2.evaluate(&e);
4221                engine2.process_with_detections(&e, detections, 1000 + i as i64)
4222            })
4223            .collect();
4224
4225        // Same number of results
4226        assert_eq!(results1.len(), results2.len());
4227        for (r1, r2) in results1.iter().zip(results2.iter()) {
4228            assert_eq!(r1.detections.len(), r2.detections.len());
4229            assert_eq!(r1.correlations.len(), r2.correlations.len());
4230        }
4231    }
4232
4233    #[test]
4234    fn test_process_batch_matches_sequential() {
4235        let yaml = r#"
4236title: Login Failure
4237id: login-fail-batch
4238logsource:
4239    category: auth
4240detection:
4241    selection:
4242        EventType: login_failure
4243    condition: selection
4244---
4245title: Brute Force Batch
4246correlation:
4247    type: event_count
4248    rules:
4249        - login-fail-batch
4250    group-by:
4251        - User
4252    timespan: 60s
4253    condition:
4254        gte: 3
4255level: high
4256"#;
4257        let collection = parse_sigma_yaml(yaml).unwrap();
4258
4259        let event_values: Vec<serde_json::Value> = (0..5)
4260            .map(|i| json!({"EventType": "login_failure", "User": "admin", "@timestamp": format!("2025-01-01T00:00:0{}Z", i + 1)}))
4261            .collect();
4262
4263        // Sequential
4264        let mut engine1 = CorrelationEngine::new(CorrelationConfig::default());
4265        engine1.add_collection(&collection).unwrap();
4266        let sequential: Vec<ProcessResult> = event_values
4267            .iter()
4268            .enumerate()
4269            .map(|(i, v)| {
4270                let e = Event::from_value(v);
4271                engine1.process_event_at(&e, 1000 + i as i64)
4272            })
4273            .collect();
4274
4275        // Batch
4276        let mut engine2 = CorrelationEngine::new(CorrelationConfig::default());
4277        engine2.add_collection(&collection).unwrap();
4278        let events: Vec<Event> = event_values.iter().map(Event::from_value).collect();
4279        let refs: Vec<&Event> = events.iter().collect();
4280        let batch = engine2.process_batch(&refs);
4281
4282        assert_eq!(sequential.len(), batch.len());
4283        for (seq, bat) in sequential.iter().zip(batch.iter()) {
4284            assert_eq!(seq.detections.len(), bat.detections.len());
4285            assert_eq!(seq.correlations.len(), bat.correlations.len());
4286        }
4287    }
4288}