Skip to main content

rsigma_eval/
correlation_engine.rs

1//! Stateful correlation engine with time-windowed aggregation.
2//!
3//! `CorrelationEngine` wraps the stateless `Engine` and adds support for
4//! Sigma correlation rules: `event_count`, `value_count`, `temporal`,
5//! `temporal_ordered`, `value_sum`, `value_avg`, `value_percentile`,
6//! and `value_median`.
7//!
8//! # Architecture
9//!
10//! 1. Events are first evaluated against detection rules (stateless)
11//! 2. Detection matches update correlation window state (stateful)
12//! 3. When a correlation condition is met, a `CorrelationResult` is emitted
13//! 4. Correlation results can chain into higher-level correlations
14
15use std::collections::HashMap;
16
17use chrono::{DateTime, TimeZone, Utc};
18use serde::Serialize;
19
20use rsigma_parser::{CorrelationRule, CorrelationType, Level, SigmaCollection, SigmaRule};
21
22use crate::correlation::{
23    CompiledCorrelation, EventBuffer, EventRef, EventRefBuffer, GroupKey, WindowState,
24    compile_correlation,
25};
26use crate::engine::Engine;
27use crate::error::{EvalError, Result};
28use crate::event::Event;
29use crate::pipeline::{Pipeline, apply_pipelines};
30use crate::result::MatchResult;
31
32// =============================================================================
33// Configuration
34// =============================================================================
35
36/// What to do with window state after a correlation fires.
37///
38/// This is an engine-level default that can be overridden per-correlation
39/// via the `rsigma.action` custom attribute set in processing pipelines.
40#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize)]
41#[serde(rename_all = "snake_case")]
42pub enum CorrelationAction {
43    /// Keep window state as-is after firing (current / default behavior).
44    /// Subsequent events that still satisfy the condition will re-fire.
45    #[default]
46    Alert,
47    /// Clear the window state for the firing group key after emitting the alert.
48    /// The threshold must be met again from scratch before the next alert.
49    Reset,
50}
51
52impl std::str::FromStr for CorrelationAction {
53    type Err = String;
54    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
55        match s {
56            "alert" => Ok(CorrelationAction::Alert),
57            "reset" => Ok(CorrelationAction::Reset),
58            _ => Err(format!(
59                "Unknown correlation action: {s} (expected 'alert' or 'reset')"
60            )),
61        }
62    }
63}
64
65/// How to include events in correlation results.
66///
67/// Can be overridden per-correlation via the `rsigma.correlation_event_mode`
68/// custom attribute.
69#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize)]
70#[serde(rename_all = "snake_case")]
71pub enum CorrelationEventMode {
72    /// Don't include events (default). Zero memory overhead.
73    #[default]
74    None,
75    /// Include full event bodies, individually compressed with deflate.
76    /// Typical cost: 100–1000 bytes per event.
77    Full,
78    /// Include only event references (timestamp + optional ID).
79    /// Minimal memory: ~40 bytes per event.
80    Refs,
81}
82
83impl std::str::FromStr for CorrelationEventMode {
84    type Err = String;
85    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
86        match s.to_lowercase().as_str() {
87            "none" | "off" | "false" => Ok(CorrelationEventMode::None),
88            "full" | "true" => Ok(CorrelationEventMode::Full),
89            "refs" | "references" => Ok(CorrelationEventMode::Refs),
90            _ => Err(format!(
91                "Unknown correlation event mode: {s} (expected 'none', 'full', or 'refs')"
92            )),
93        }
94    }
95}
96
97impl std::fmt::Display for CorrelationEventMode {
98    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
99        match self {
100            CorrelationEventMode::None => write!(f, "none"),
101            CorrelationEventMode::Full => write!(f, "full"),
102            CorrelationEventMode::Refs => write!(f, "refs"),
103        }
104    }
105}
106
/// Policy for events in which no timestamp field is present or parseable.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum TimestampFallback {
    /// Substitute wall-clock time (`Utc::now()`). Suits real-time streaming.
    #[default]
    WallClock,
    /// Leave the event out of correlation processing: detections still fire,
    /// but no correlation state is updated. Recommended when replaying or
    /// batch-processing historical logs, where wall-clock time is meaningless.
    Skip,
}
118
119/// Configuration for the correlation engine.
120///
121/// Provides engine-level defaults that mirror pySigma backend optional arguments.
122/// Per-correlation overrides can be set via `SetCustomAttribute` pipeline
123/// transformations using the `rsigma.*` attribute namespace.
124#[derive(Debug, Clone)]
125pub struct CorrelationConfig {
126    /// Field names to try for timestamp extraction, in order of priority.
127    ///
128    /// The engine will try each field until one yields a parseable timestamp.
129    /// If none succeed, the `timestamp_fallback` policy applies.
130    pub timestamp_fields: Vec<String>,
131
132    /// What to do when no timestamp can be extracted from an event.
133    ///
134    /// Default: `WallClock` (use `Utc::now()`).
135    pub timestamp_fallback: TimestampFallback,
136
137    /// Maximum number of state entries (across all correlations and groups)
138    /// before aggressive eviction is triggered. Prevents unbounded memory growth.
139    ///
140    /// Default: 100_000.
141    pub max_state_entries: usize,
142
143    /// Default suppression window in seconds.
144    ///
145    /// After a correlation fires for a `(correlation, group_key)`, suppress
146    /// re-alerts for this duration. `None` means no suppression (every
147    /// condition-satisfying event produces an alert).
148    ///
149    /// Can be overridden per-correlation via the `rsigma.suppress` custom attribute.
150    pub suppress: Option<u64>,
151
152    /// Default action to take after a correlation fires.
153    ///
154    /// Can be overridden per-correlation via the `rsigma.action` custom attribute.
155    pub action_on_match: CorrelationAction,
156
157    /// Whether to emit detection-level matches for rules that are only
158    /// referenced by correlations (where `generate: false`).
159    ///
160    /// Default: `true` (emit all detection matches).
161    /// Set to `false` to suppress detection output for correlation-only rules.
162    pub emit_detections: bool,
163
164    /// How to include contributing events in correlation results.
165    ///
166    /// - `None` (default): no event storage, zero overhead.
167    /// - `Full`: events are deflate-compressed and decompressed on output.
168    /// - `Refs`: only timestamps + event IDs are stored (minimal memory).
169    ///
170    /// Can be overridden per-correlation via `rsigma.correlation_event_mode`.
171    pub correlation_event_mode: CorrelationEventMode,
172
173    /// Maximum number of events to store per (correlation, group_key) window
174    /// when `correlation_event_mode` is not `None`.
175    ///
176    /// Bounds memory at: `max_correlation_events × cost_per_event × active_groups`.
177    /// Default: 10.
178    pub max_correlation_events: usize,
179}
180
181impl Default for CorrelationConfig {
182    fn default() -> Self {
183        CorrelationConfig {
184            timestamp_fields: vec![
185                "@timestamp".to_string(),
186                "timestamp".to_string(),
187                "EventTime".to_string(),
188                "TimeCreated".to_string(),
189                "eventTime".to_string(),
190            ],
191            timestamp_fallback: TimestampFallback::default(),
192            max_state_entries: 100_000,
193            suppress: None,
194            action_on_match: CorrelationAction::default(),
195            emit_detections: true,
196            correlation_event_mode: CorrelationEventMode::default(),
197            max_correlation_events: 10,
198        }
199    }
200}
201
202// =============================================================================
203// Result types
204// =============================================================================
205
206/// Combined result from processing a single event.
207#[derive(Debug, Clone, Serialize)]
208pub struct ProcessResult {
209    /// Detection rule matches (stateless, immediate).
210    pub detections: Vec<MatchResult>,
211    /// Correlation rule matches (stateful, accumulated).
212    pub correlations: Vec<CorrelationResult>,
213}
214
215/// The result of a correlation rule firing.
216#[derive(Debug, Clone, Serialize)]
217pub struct CorrelationResult {
218    /// Title of the correlation rule.
219    pub rule_title: String,
220    /// ID of the correlation rule (if present).
221    pub rule_id: Option<String>,
222    /// Severity level.
223    pub level: Option<Level>,
224    /// Tags from the correlation rule.
225    pub tags: Vec<String>,
226    /// Type of correlation.
227    pub correlation_type: CorrelationType,
228    /// Group-by field names and their values for this match.
229    pub group_key: Vec<(String, String)>,
230    /// The aggregated value that triggered the condition (count, sum, avg, etc.).
231    pub aggregated_value: f64,
232    /// The time window in seconds.
233    pub timespan_secs: u64,
234    /// Full event bodies, included when `correlation_event_mode` is `Full`.
235    ///
236    /// Contains up to `max_correlation_events` recently stored window events.
237    /// Events are decompressed from deflate storage on output.
238    #[serde(skip_serializing_if = "Option::is_none")]
239    pub events: Option<Vec<serde_json::Value>>,
240    /// Lightweight event references, included when `correlation_event_mode` is `Refs`.
241    ///
242    /// Contains up to `max_correlation_events` timestamp + optional ID pairs.
243    #[serde(skip_serializing_if = "Option::is_none")]
244    pub event_refs: Option<Vec<EventRef>>,
245}
246
247// =============================================================================
248// Correlation Engine
249// =============================================================================
250
251/// Stateful correlation engine.
252///
253/// Wraps the stateless `Engine` for detection rules and adds time-windowed
254/// correlation on top. Supports all 7 Sigma correlation types and chaining.
255pub struct CorrelationEngine {
256    /// Inner stateless detection engine.
257    engine: Engine,
258    /// Compiled correlation rules.
259    correlations: Vec<CompiledCorrelation>,
260    /// Maps rule ID/name -> indices into `correlations` that reference it.
261    /// This allows quick lookup: "which correlations care about rule X?"
262    rule_index: HashMap<String, Vec<usize>>,
263    /// Maps detection rule index -> (rule_id, rule_name) for reverse lookup.
264    /// Used to find which correlations a detection match triggers.
265    rule_ids: Vec<(Option<String>, Option<String>)>,
266    /// Per-(correlation_index, group_key) window state.
267    state: HashMap<(usize, GroupKey), WindowState>,
268    /// Last alert timestamp per (correlation_index, group_key) for suppression.
269    last_alert: HashMap<(usize, GroupKey), i64>,
270    /// Per-(correlation_index, group_key) compressed event buffer (`Full` mode).
271    event_buffers: HashMap<(usize, GroupKey), EventBuffer>,
272    /// Per-(correlation_index, group_key) event reference buffer (`Refs` mode).
273    event_ref_buffers: HashMap<(usize, GroupKey), EventRefBuffer>,
274    /// Set of detection rule IDs/names that are "correlation-only"
275    /// (referenced by correlations where `generate == false`).
276    /// Used to filter detection output when `config.emit_detections == false`.
277    correlation_only_rules: std::collections::HashSet<String>,
278    /// Configuration.
279    config: CorrelationConfig,
280    /// Processing pipelines applied to rules during add_rule.
281    pipelines: Vec<Pipeline>,
282}
283
284impl CorrelationEngine {
285    /// Create a new correlation engine with the given configuration.
286    pub fn new(config: CorrelationConfig) -> Self {
287        CorrelationEngine {
288            engine: Engine::new(),
289            correlations: Vec::new(),
290            rule_index: HashMap::new(),
291            rule_ids: Vec::new(),
292            state: HashMap::new(),
293            last_alert: HashMap::new(),
294            event_buffers: HashMap::new(),
295            event_ref_buffers: HashMap::new(),
296            correlation_only_rules: std::collections::HashSet::new(),
297            config,
298            pipelines: Vec::new(),
299        }
300    }
301
302    /// Add a pipeline to the engine.
303    ///
304    /// Pipelines are applied to rules during `add_rule` / `add_collection`.
305    pub fn add_pipeline(&mut self, pipeline: Pipeline) {
306        self.pipelines.push(pipeline);
307        self.pipelines.sort_by_key(|p| p.priority);
308    }
309
310    /// Set global `include_event` on the inner detection engine.
311    pub fn set_include_event(&mut self, include: bool) {
312        self.engine.set_include_event(include);
313    }
314
315    /// Set the global correlation event mode.
316    ///
317    /// - `None`: no event storage (default)
318    /// - `Full`: compressed event bodies
319    /// - `Refs`: lightweight timestamp + ID references
320    pub fn set_correlation_event_mode(&mut self, mode: CorrelationEventMode) {
321        self.config.correlation_event_mode = mode;
322    }
323
324    /// Set the maximum number of events to store per correlation window group.
325    /// Only meaningful when `correlation_event_mode` is not `None`.
326    pub fn set_max_correlation_events(&mut self, max: usize) {
327        self.config.max_correlation_events = max;
328    }
329
330    /// Add a single detection rule.
331    ///
332    /// If pipelines are set, the rule is cloned and transformed before compilation.
333    /// The inner engine receives the already-transformed rule directly (not through
334    /// its own pipeline, to avoid double transformation).
335    pub fn add_rule(&mut self, rule: &SigmaRule) -> Result<()> {
336        if self.pipelines.is_empty() {
337            self.apply_custom_attributes(&rule.custom_attributes);
338            self.rule_ids.push((rule.id.clone(), rule.name.clone()));
339            self.engine.add_rule(rule)?;
340        } else {
341            let mut transformed = rule.clone();
342            apply_pipelines(&self.pipelines, &mut transformed)?;
343            self.apply_custom_attributes(&transformed.custom_attributes);
344            self.rule_ids
345                .push((transformed.id.clone(), transformed.name.clone()));
346            // Use compile_rule + add_compiled_rule to bypass inner engine's pipelines
347            let compiled = crate::compiler::compile_rule(&transformed)?;
348            self.engine.add_compiled_rule(compiled);
349        }
350        Ok(())
351    }
352
353    /// Read `rsigma.*` custom attributes from a rule and apply them to the
354    /// engine configuration.  This allows pipelines to influence engine
355    /// behaviour via `SetCustomAttribute` transformations — the same pattern
356    /// used by pySigma backends (e.g. pySigma-backend-loki).
357    ///
358    /// Supported attributes:
359    /// - `rsigma.timestamp_field` — prepends a field name to the timestamp
360    ///   extraction priority list so the correlation engine can find the
361    ///   event timestamp in non-standard field names.
362    /// - `rsigma.suppress` — sets the default suppression window (e.g. `5m`).
363    ///   Only applied when the CLI did not already set `--suppress`.
364    /// - `rsigma.action` — sets the default post-fire action (`alert`/`reset`).
365    ///   Only applied when the CLI did not already set `--action`.
366    fn apply_custom_attributes(&mut self, attrs: &std::collections::HashMap<String, String>) {
367        // rsigma.timestamp_field — prepend to priority list, skip duplicates
368        if let Some(field) = attrs.get("rsigma.timestamp_field")
369            && !self.config.timestamp_fields.contains(field)
370        {
371            self.config.timestamp_fields.insert(0, field.clone());
372        }
373
374        // rsigma.suppress — only when CLI didn't already set one
375        if let Some(val) = attrs.get("rsigma.suppress")
376            && self.config.suppress.is_none()
377            && let Ok(ts) = rsigma_parser::Timespan::parse(val)
378        {
379            self.config.suppress = Some(ts.seconds);
380        }
381
382        // rsigma.action — only when CLI left it at the default (Alert)
383        if let Some(val) = attrs.get("rsigma.action")
384            && self.config.action_on_match == CorrelationAction::Alert
385            && let Ok(a) = val.parse::<CorrelationAction>()
386        {
387            self.config.action_on_match = a;
388        }
389    }
390
391    /// Add a single correlation rule.
392    pub fn add_correlation(&mut self, corr: &CorrelationRule) -> Result<()> {
393        // Apply engine-level custom attributes from the correlation rule
394        // (e.g. rsigma.timestamp_field).
395        self.apply_custom_attributes(&corr.custom_attributes);
396
397        let compiled = compile_correlation(corr)?;
398        let idx = self.correlations.len();
399
400        // Index by each referenced rule ID/name
401        for rule_ref in &compiled.rule_refs {
402            self.rule_index
403                .entry(rule_ref.clone())
404                .or_default()
405                .push(idx);
406        }
407
408        // Track correlation-only rules (generate == false is the default)
409        if !compiled.generate {
410            for rule_ref in &compiled.rule_refs {
411                self.correlation_only_rules.insert(rule_ref.clone());
412            }
413        }
414
415        self.correlations.push(compiled);
416        Ok(())
417    }
418
419    /// Add all rules and correlations from a parsed collection.
420    ///
421    /// Detection rules are added first (so they're available for correlation
422    /// references), then correlation rules.
423    pub fn add_collection(&mut self, collection: &SigmaCollection) -> Result<()> {
424        for rule in &collection.rules {
425            self.add_rule(rule)?;
426        }
427        // Apply filter rules to the inner engine's detection rules
428        for filter in &collection.filters {
429            self.engine.apply_filter(filter)?;
430        }
431        for corr in &collection.correlations {
432            self.add_correlation(corr)?;
433        }
434        self.validate_rule_refs()?;
435        self.detect_correlation_cycles()?;
436        Ok(())
437    }
438
439    /// Validate that every correlation's `rule_refs` resolve to at least one
440    /// known detection rule (by ID or name) or another correlation (by ID or name).
441    fn validate_rule_refs(&self) -> Result<()> {
442        let mut known: std::collections::HashSet<&str> = std::collections::HashSet::new();
443
444        for (id, name) in &self.rule_ids {
445            if let Some(id) = id {
446                known.insert(id.as_str());
447            }
448            if let Some(name) = name {
449                known.insert(name.as_str());
450            }
451        }
452        for corr in &self.correlations {
453            if let Some(ref id) = corr.id {
454                known.insert(id.as_str());
455            }
456            if let Some(ref name) = corr.name {
457                known.insert(name.as_str());
458            }
459        }
460
461        for corr in &self.correlations {
462            for rule_ref in &corr.rule_refs {
463                if !known.contains(rule_ref.as_str()) {
464                    return Err(EvalError::UnknownRuleRef(rule_ref.clone()));
465                }
466            }
467        }
468        Ok(())
469    }
470
471    /// Detect cycles in the correlation reference graph.
472    ///
473    /// Builds a directed graph where each correlation (identified by its id/name)
474    /// has edges to the correlations it references via `rule_refs`. Uses DFS with
475    /// a "gray/black" coloring scheme to detect back-edges (cycles).
476    ///
477    /// Returns `Err(EvalError::CorrelationCycle)` if a cycle is found.
478    fn detect_correlation_cycles(&self) -> Result<()> {
479        // Build a set of all correlation identifiers (id and/or name)
480        let mut corr_identifiers: HashMap<&str, usize> = HashMap::new();
481        for (idx, corr) in self.correlations.iter().enumerate() {
482            if let Some(ref id) = corr.id {
483                corr_identifiers.insert(id.as_str(), idx);
484            }
485            if let Some(ref name) = corr.name {
486                corr_identifiers.insert(name.as_str(), idx);
487            }
488        }
489
490        // Build adjacency list: corr index → set of corr indices it references
491        let mut adj: Vec<Vec<usize>> = vec![Vec::new(); self.correlations.len()];
492        for (idx, corr) in self.correlations.iter().enumerate() {
493            for rule_ref in &corr.rule_refs {
494                if let Some(&target_idx) = corr_identifiers.get(rule_ref.as_str()) {
495                    adj[idx].push(target_idx);
496                }
497            }
498        }
499
500        // DFS cycle detection with three states: white (unvisited), gray (in stack), black (done)
501        let mut state = vec![0u8; self.correlations.len()]; // 0=white, 1=gray, 2=black
502        let mut path: Vec<usize> = Vec::new();
503
504        for start in 0..self.correlations.len() {
505            if state[start] == 0
506                && let Some(cycle) = Self::dfs_find_cycle(start, &adj, &mut state, &mut path)
507            {
508                let names: Vec<String> = cycle
509                    .iter()
510                    .map(|&i| {
511                        self.correlations[i]
512                            .id
513                            .as_deref()
514                            .or(self.correlations[i].name.as_deref())
515                            .unwrap_or(&self.correlations[i].title)
516                            .to_string()
517                    })
518                    .collect();
519                return Err(crate::error::EvalError::CorrelationCycle(
520                    names.join(" -> "),
521                ));
522            }
523        }
524        Ok(())
525    }
526
527    /// DFS helper that returns the cycle path if a back-edge is found.
528    fn dfs_find_cycle(
529        node: usize,
530        adj: &[Vec<usize>],
531        state: &mut [u8],
532        path: &mut Vec<usize>,
533    ) -> Option<Vec<usize>> {
534        state[node] = 1; // gray
535        path.push(node);
536
537        for &next in &adj[node] {
538            if state[next] == 1 {
539                // Back-edge found — extract cycle from path
540                if let Some(pos) = path.iter().position(|&n| n == next) {
541                    let mut cycle = path[pos..].to_vec();
542                    cycle.push(next); // close the cycle
543                    return Some(cycle);
544                }
545            }
546            if state[next] == 0
547                && let Some(cycle) = Self::dfs_find_cycle(next, adj, state, path)
548            {
549                return Some(cycle);
550            }
551        }
552
553        path.pop();
554        state[node] = 2; // black
555        None
556    }
557
558    /// Process an event, extracting the timestamp from configured event fields.
559    ///
560    /// When no timestamp field is found, the `timestamp_fallback` policy applies:
561    /// - `WallClock`: use `Utc::now()` (good for real-time streaming)
562    /// - `Skip`: return detections only, skip correlation state updates
563    pub fn process_event(&mut self, event: &Event) -> ProcessResult {
564        let ts = match self.extract_event_timestamp(event) {
565            Some(ts) => ts,
566            None => match self.config.timestamp_fallback {
567                TimestampFallback::WallClock => Utc::now().timestamp(),
568                TimestampFallback::Skip => {
569                    // Still run detection (stateless), but skip correlation
570                    let all_detections = self.engine.evaluate(event);
571                    let detections = self.filter_detections(all_detections);
572                    return ProcessResult {
573                        detections,
574                        correlations: Vec::new(),
575                    };
576                }
577            },
578        };
579        self.process_event_at(event, ts)
580    }
581
582    /// Process an event with an explicit Unix epoch timestamp (seconds).
583    ///
584    /// The timestamp is clamped to `[0, i64::MAX / 2]` to prevent overflow
585    /// when adding timespan durations internally.
586    pub fn process_event_at(&mut self, event: &Event, timestamp_secs: i64) -> ProcessResult {
587        let timestamp_secs = timestamp_secs.clamp(0, i64::MAX / 2);
588
589        // Step 1: Memory management — evict before adding new state to enforce limit
590        if self.state.len() >= self.config.max_state_entries {
591            self.evict_all(timestamp_secs);
592        }
593
594        // Step 2: Run stateless detection
595        let all_detections = self.engine.evaluate(event);
596
597        // Step 3: Feed detection matches into correlations
598        let mut correlations = Vec::new();
599        self.feed_detections(event, &all_detections, timestamp_secs, &mut correlations);
600
601        // Step 4: Chain — correlation results may trigger higher-level correlations
602        self.chain_correlations(&correlations, timestamp_secs);
603
604        // Step 5: Filter detections by generate flag
605        let detections = self.filter_detections(all_detections);
606
607        ProcessResult {
608            detections,
609            correlations,
610        }
611    }
612
613    /// Filter detections by the `generate` flag / `emit_detections` config.
614    ///
615    /// If `emit_detections` is false and some rules are correlation-only,
616    /// their detection output is suppressed.
617    fn filter_detections(&self, all_detections: Vec<MatchResult>) -> Vec<MatchResult> {
618        if !self.config.emit_detections && !self.correlation_only_rules.is_empty() {
619            all_detections
620                .into_iter()
621                .filter(|m| {
622                    let id_match = m
623                        .rule_id
624                        .as_ref()
625                        .is_some_and(|id| self.correlation_only_rules.contains(id));
626                    !id_match
627                })
628                .collect()
629        } else {
630            all_detections
631        }
632    }
633
634    /// Feed detection matches into correlation window states.
635    fn feed_detections(
636        &mut self,
637        event: &Event,
638        detections: &[MatchResult],
639        ts: i64,
640        out: &mut Vec<CorrelationResult>,
641    ) {
642        // Collect all (corr_idx, rule_id, rule_name) tuples upfront to avoid
643        // borrow conflicts between self.rule_ids and self.update_correlation.
644        let mut work: Vec<(usize, Option<String>, Option<String>)> = Vec::new();
645
646        for det in detections {
647            // Use the MatchResult's rule_id to find the original rule's ID/name.
648            // We also look up by rule_id in our rule_ids table for the name.
649            let (rule_id, rule_name) = self.find_rule_identity(det);
650
651            // Collect correlation indices that reference this rule
652            let mut corr_indices = Vec::new();
653            if let Some(ref id) = rule_id
654                && let Some(indices) = self.rule_index.get(id)
655            {
656                corr_indices.extend(indices);
657            }
658            if let Some(ref name) = rule_name
659                && let Some(indices) = self.rule_index.get(name)
660            {
661                corr_indices.extend(indices);
662            }
663
664            corr_indices.sort_unstable();
665            corr_indices.dedup();
666
667            for &corr_idx in &corr_indices {
668                work.push((corr_idx, rule_id.clone(), rule_name.clone()));
669            }
670        }
671
672        for (corr_idx, rule_id, rule_name) in work {
673            self.update_correlation(corr_idx, event, ts, &rule_id, &rule_name, out);
674        }
675    }
676
677    /// Find the (id, name) for a detection match by searching our rule_ids table.
678    fn find_rule_identity(&self, det: &MatchResult) -> (Option<String>, Option<String>) {
679        // First, try to find by matching rule_id in our table
680        if let Some(ref match_id) = det.rule_id {
681            for (id, name) in &self.rule_ids {
682                if id.as_deref() == Some(match_id.as_str()) {
683                    return (id.clone(), name.clone());
684                }
685            }
686        }
687        // Fall back to using just the MatchResult's rule_id
688        (det.rule_id.clone(), None)
689    }
690
691    /// Resolve the event mode for a given correlation.
692    fn resolve_event_mode(&self, corr_idx: usize) -> CorrelationEventMode {
693        let corr = &self.correlations[corr_idx];
694        corr.event_mode
695            .unwrap_or(self.config.correlation_event_mode)
696    }
697
698    /// Resolve the max events cap for a given correlation.
699    fn resolve_max_events(&self, corr_idx: usize) -> usize {
700        let corr = &self.correlations[corr_idx];
701        corr.max_events
702            .unwrap_or(self.config.max_correlation_events)
703    }
704
705    /// Update a single correlation's state and check its condition.
706    fn update_correlation(
707        &mut self,
708        corr_idx: usize,
709        event: &Event,
710        ts: i64,
711        rule_id: &Option<String>,
712        rule_name: &Option<String>,
713        out: &mut Vec<CorrelationResult>,
714    ) {
715        // Borrow the correlation by reference — no cloning needed.  Rust allows
716        // simultaneous &self.correlations and &mut self.state / &mut self.last_alert
717        // because they are disjoint struct fields.
718        let corr = &self.correlations[corr_idx];
719        let corr_type = corr.correlation_type;
720        let timespan = corr.timespan_secs;
721        let level = corr.level;
722        let suppress_secs = corr.suppress_secs.or(self.config.suppress);
723        let action = corr.action.unwrap_or(self.config.action_on_match);
724        let event_mode = self.resolve_event_mode(corr_idx);
725        let max_events = self.resolve_max_events(corr_idx);
726
727        // Determine the rule_ref strings for alias resolution and temporal tracking.
728        let mut ref_strs: Vec<&str> = Vec::new();
729        if let Some(id) = rule_id.as_deref() {
730            ref_strs.push(id);
731        }
732        if let Some(name) = rule_name.as_deref() {
733            ref_strs.push(name);
734        }
735        let rule_ref = rule_id.as_deref().or(rule_name.as_deref()).unwrap_or("");
736
737        // Extract group key
738        let group_key = GroupKey::extract(event, &corr.group_by, &ref_strs);
739
740        // Get or create window state
741        let state_key = (corr_idx, group_key.clone());
742        let state = self
743            .state
744            .entry(state_key.clone())
745            .or_insert_with(|| WindowState::new_for(corr_type));
746
747        // Evict expired entries
748        let cutoff = ts - timespan as i64;
749        state.evict(cutoff);
750
751        // Push the new event into the state
752        match corr_type {
753            CorrelationType::EventCount => {
754                state.push_event_count(ts);
755            }
756            CorrelationType::ValueCount => {
757                if let Some(ref field_name) = corr.condition.field
758                    && let Some(val) = event.get_field(field_name)
759                    && let Some(s) = value_to_string_for_count(val)
760                {
761                    state.push_value_count(ts, s);
762                }
763            }
764            CorrelationType::Temporal | CorrelationType::TemporalOrdered => {
765                state.push_temporal(ts, rule_ref);
766            }
767            CorrelationType::ValueSum
768            | CorrelationType::ValueAvg
769            | CorrelationType::ValuePercentile
770            | CorrelationType::ValueMedian => {
771                if let Some(ref field_name) = corr.condition.field
772                    && let Some(val) = event.get_field(field_name)
773                    && let Some(n) = value_to_f64(val)
774                {
775                    state.push_numeric(ts, n);
776                }
777            }
778        }
779
780        // Push event into buffer based on event mode
781        match event_mode {
782            CorrelationEventMode::Full => {
783                let buf = self
784                    .event_buffers
785                    .entry(state_key.clone())
786                    .or_insert_with(|| EventBuffer::new(max_events));
787                buf.evict(cutoff);
788                buf.push(ts, event.as_value());
789            }
790            CorrelationEventMode::Refs => {
791                let buf = self
792                    .event_ref_buffers
793                    .entry(state_key.clone())
794                    .or_insert_with(|| EventRefBuffer::new(max_events));
795                buf.evict(cutoff);
796                buf.push(ts, event.as_value());
797            }
798            CorrelationEventMode::None => {}
799        }
800
801        // Check condition — after this, `state` is no longer used (NLL drops the borrow).
802        let fired = state.check_condition(
803            &corr.condition,
804            corr_type,
805            &corr.rule_refs,
806            corr.extended_expr.as_ref(),
807        );
808
809        if let Some(agg_value) = fired {
810            let alert_key = (corr_idx, group_key.clone());
811
812            // Suppression check: skip if we've already alerted within the suppress window
813            let suppressed = if let Some(suppress) = suppress_secs {
814                if let Some(&last_ts) = self.last_alert.get(&alert_key) {
815                    (ts - last_ts) < suppress as i64
816                } else {
817                    false
818                }
819            } else {
820                false
821            };
822
823            if !suppressed {
824                // Retrieve stored events / refs based on mode
825                let (events, event_refs) = match event_mode {
826                    CorrelationEventMode::Full => {
827                        let stored = self
828                            .event_buffers
829                            .get(&alert_key)
830                            .map(|buf| buf.decompress_all())
831                            .unwrap_or_default();
832                        (Some(stored), None)
833                    }
834                    CorrelationEventMode::Refs => {
835                        let stored = self
836                            .event_ref_buffers
837                            .get(&alert_key)
838                            .map(|buf| buf.refs())
839                            .unwrap_or_default();
840                        (None, Some(stored))
841                    }
842                    CorrelationEventMode::None => (None, None),
843                };
844
845                // Only clone title/id/tags when we actually produce output
846                let corr = &self.correlations[corr_idx];
847                let result = CorrelationResult {
848                    rule_title: corr.title.clone(),
849                    rule_id: corr.id.clone(),
850                    level,
851                    tags: corr.tags.clone(),
852                    correlation_type: corr_type,
853                    group_key: group_key.to_pairs(&corr.group_by),
854                    aggregated_value: agg_value,
855                    timespan_secs: timespan,
856                    events,
857                    event_refs,
858                };
859                out.push(result);
860
861                // Record alert time for suppression
862                self.last_alert.insert(alert_key.clone(), ts);
863
864                // Action on match
865                if action == CorrelationAction::Reset {
866                    if let Some(state) = self.state.get_mut(&alert_key) {
867                        state.clear();
868                    }
869                    if let Some(buf) = self.event_buffers.get_mut(&alert_key) {
870                        buf.clear();
871                    }
872                    if let Some(buf) = self.event_ref_buffers.get_mut(&alert_key) {
873                        buf.clear();
874                    }
875                }
876            }
877        }
878    }
879
880    /// Propagate correlation results to higher-level correlations (chaining).
881    ///
882    /// When a correlation fires, any correlation that references it (by ID or name)
883    /// is updated. Limits chain depth to 10 to prevent infinite loops.
884    fn chain_correlations(&mut self, fired: &[CorrelationResult], ts: i64) {
885        const MAX_CHAIN_DEPTH: usize = 10;
886        let mut pending: Vec<CorrelationResult> = fired.to_vec();
887        let mut depth = 0;
888
889        while !pending.is_empty() && depth < MAX_CHAIN_DEPTH {
890            depth += 1;
891
892            // Collect work items: (corr_idx, group_key_pairs, fired_ref)
893            #[allow(clippy::type_complexity)]
894            let mut work: Vec<(usize, Vec<(String, String)>, String)> = Vec::new();
895            for result in &pending {
896                if let Some(ref id) = result.rule_id
897                    && let Some(indices) = self.rule_index.get(id)
898                {
899                    let fired_ref = result
900                        .rule_id
901                        .as_deref()
902                        .unwrap_or(&result.rule_title)
903                        .to_string();
904                    for &corr_idx in indices {
905                        work.push((corr_idx, result.group_key.clone(), fired_ref.clone()));
906                    }
907                }
908            }
909
910            let mut next_pending = Vec::new();
911            for (corr_idx, group_key_pairs, fired_ref) in work {
912                let corr = &self.correlations[corr_idx];
913                let corr_type = corr.correlation_type;
914                let timespan = corr.timespan_secs;
915                let level = corr.level;
916
917                let group_key = GroupKey::from_pairs(&group_key_pairs, &corr.group_by);
918                let state_key = (corr_idx, group_key.clone());
919                let state = self
920                    .state
921                    .entry(state_key)
922                    .or_insert_with(|| WindowState::new_for(corr_type));
923
924                let cutoff = ts - timespan as i64;
925                state.evict(cutoff);
926
927                match corr_type {
928                    CorrelationType::EventCount => {
929                        state.push_event_count(ts);
930                    }
931                    CorrelationType::Temporal | CorrelationType::TemporalOrdered => {
932                        state.push_temporal(ts, &fired_ref);
933                    }
934                    _ => {
935                        state.push_event_count(ts);
936                    }
937                }
938
939                let fired = state.check_condition(
940                    &corr.condition,
941                    corr_type,
942                    &corr.rule_refs,
943                    corr.extended_expr.as_ref(),
944                );
945
946                if let Some(agg_value) = fired {
947                    let corr = &self.correlations[corr_idx];
948                    next_pending.push(CorrelationResult {
949                        rule_title: corr.title.clone(),
950                        rule_id: corr.id.clone(),
951                        level,
952                        tags: corr.tags.clone(),
953                        correlation_type: corr_type,
954                        group_key: group_key.to_pairs(&corr.group_by),
955                        aggregated_value: agg_value,
956                        timespan_secs: timespan,
957                        // Chained correlations don't include events (they aggregate
958                        // over correlation results, not raw events)
959                        events: None,
960                        event_refs: None,
961                    });
962                }
963            }
964
965            pending = next_pending;
966        }
967
968        if !pending.is_empty() {
969            log::warn!(
970                "Correlation chain depth limit reached ({MAX_CHAIN_DEPTH}); \
971                 {} pending result(s) were not propagated further. \
972                 This may indicate a cycle in correlation references.",
973                pending.len()
974            );
975        }
976    }
977
978    // =========================================================================
979    // Timestamp extraction
980    // =========================================================================
981
982    /// Extract a Unix epoch timestamp (seconds) from an event.
983    ///
984    /// Tries each configured timestamp field in order. Supports:
985    /// - Numeric values (epoch seconds, or epoch millis if > 1e12)
986    /// - ISO 8601 strings (e.g., "2024-07-10T12:30:00Z")
987    ///
988    /// Returns `None` if no field yields a valid timestamp.
989    fn extract_event_timestamp(&self, event: &Event) -> Option<i64> {
990        for field_name in &self.config.timestamp_fields {
991            if let Some(val) = event.get_field(field_name)
992                && let Some(ts) = parse_timestamp_value(val)
993            {
994                return Some(ts);
995            }
996        }
997        None
998    }
999
1000    // =========================================================================
1001    // State management
1002    // =========================================================================
1003
/// Manually evict all expired state entries.
///
/// Public entry point that forwards `now_secs` (epoch seconds) to the
/// internal `evict_all` pass.
pub fn evict_expired(&mut self, now_secs: i64) {
    self.evict_all(now_secs);
}
1008
1009    /// Evict expired entries and remove empty states.
1010    fn evict_all(&mut self, now_secs: i64) {
1011        // Phase 1: Time-based eviction — remove entries outside their correlation window
1012        let timespans: Vec<u64> = self.correlations.iter().map(|c| c.timespan_secs).collect();
1013
1014        self.state.retain(|&(corr_idx, _), state| {
1015            if corr_idx < timespans.len() {
1016                let cutoff = now_secs - timespans[corr_idx] as i64;
1017                state.evict(cutoff);
1018            }
1019            !state.is_empty()
1020        });
1021
1022        // Evict event buffers in sync with window state
1023        self.event_buffers.retain(|&(corr_idx, _), buf| {
1024            if corr_idx < timespans.len() {
1025                let cutoff = now_secs - timespans[corr_idx] as i64;
1026                buf.evict(cutoff);
1027            }
1028            !buf.is_empty()
1029        });
1030        self.event_ref_buffers.retain(|&(corr_idx, _), buf| {
1031            if corr_idx < timespans.len() {
1032                let cutoff = now_secs - timespans[corr_idx] as i64;
1033                buf.evict(cutoff);
1034            }
1035            !buf.is_empty()
1036        });
1037
1038        // Phase 2: Hard cap — if still over limit after time-based eviction (e.g.
1039        // high-cardinality traffic with long windows), drop the stalest entries
1040        // until we're at 90% capacity to avoid evicting on every single event.
1041        if self.state.len() >= self.config.max_state_entries {
1042            let target = self.config.max_state_entries * 9 / 10;
1043            let excess = self.state.len() - target;
1044
1045            // Collect keys with their latest timestamp, sort by oldest first
1046            let mut by_staleness: Vec<_> = self
1047                .state
1048                .iter()
1049                .map(|(k, v)| (k.clone(), v.latest_timestamp().unwrap_or(i64::MIN)))
1050                .collect();
1051            by_staleness.sort_unstable_by_key(|&(_, ts)| ts);
1052
1053            // Drop the oldest entries (and their associated event buffers)
1054            for (key, _) in by_staleness.into_iter().take(excess) {
1055                self.state.remove(&key);
1056                self.last_alert.remove(&key);
1057                self.event_buffers.remove(&key);
1058                self.event_ref_buffers.remove(&key);
1059            }
1060        }
1061
1062        // Phase 3: Evict stale last_alert entries — remove if the suppress window
1063        // has passed or if the corresponding window state no longer exists.
1064        self.last_alert.retain(|key, &mut alert_ts| {
1065            let suppress = if key.0 < self.correlations.len() {
1066                self.correlations[key.0]
1067                    .suppress_secs
1068                    .or(self.config.suppress)
1069                    .unwrap_or(0)
1070            } else {
1071                0
1072            };
1073            (now_secs - alert_ts) < suppress as i64
1074        });
1075    }
1076
/// Number of active state entries (for monitoring).
///
/// This is the size of the window-state map, which holds one entry per
/// `(correlation index, group key)` pair.
pub fn state_count(&self) -> usize {
    self.state.len()
}
1081
/// Number of detection rules loaded.
///
/// Delegates to the inner engine's `rule_count()`.
pub fn detection_rule_count(&self) -> usize {
    self.engine.rule_count()
}
1086
/// Number of correlation rules loaded.
///
/// This is the length of the compiled correlation list.
pub fn correlation_rule_count(&self) -> usize {
    self.correlations.len()
}
1091
/// Number of active event buffers (for monitoring).
///
/// Counts entries in the full-event buffer map only; reference buffers are
/// reported separately by `event_ref_buffer_count`.
pub fn event_buffer_count(&self) -> usize {
    self.event_buffers.len()
}
1096
1097    /// Total compressed bytes across all event buffers (for monitoring).
1098    pub fn event_buffer_bytes(&self) -> usize {
1099        self.event_buffers
1100            .values()
1101            .map(|b| b.compressed_bytes())
1102            .sum()
1103    }
1104
/// Number of active event ref buffers — `Refs` mode (for monitoring).
///
/// This is the size of the event-reference buffer map.
pub fn event_ref_buffer_count(&self) -> usize {
    self.event_ref_buffers.len()
}
1109
/// Access the inner stateless engine.
///
/// Returns a shared reference; the engine cannot be modified through it.
pub fn engine(&self) -> &Engine {
    &self.engine
}
1114
1115    /// Export all mutable correlation state as a serializable snapshot.
1116    ///
1117    /// The snapshot uses stable correlation identifiers (id > name > title)
1118    /// instead of internal indices, so it survives rule reloads as long as
1119    /// the correlation rules keep the same identifiers.
1120    pub fn export_state(&self) -> CorrelationSnapshot {
1121        let mut windows: HashMap<String, Vec<(GroupKey, WindowState)>> = HashMap::new();
1122        for ((idx, gk), ws) in &self.state {
1123            let corr_id = self.correlation_stable_id(*idx);
1124            windows
1125                .entry(corr_id)
1126                .or_default()
1127                .push((gk.clone(), ws.clone()));
1128        }
1129
1130        let mut last_alert: HashMap<String, Vec<(GroupKey, i64)>> = HashMap::new();
1131        for ((idx, gk), ts) in &self.last_alert {
1132            let corr_id = self.correlation_stable_id(*idx);
1133            last_alert
1134                .entry(corr_id)
1135                .or_default()
1136                .push((gk.clone(), *ts));
1137        }
1138
1139        let mut event_buffers: HashMap<String, Vec<(GroupKey, EventBuffer)>> = HashMap::new();
1140        for ((idx, gk), buf) in &self.event_buffers {
1141            let corr_id = self.correlation_stable_id(*idx);
1142            event_buffers
1143                .entry(corr_id)
1144                .or_default()
1145                .push((gk.clone(), buf.clone()));
1146        }
1147
1148        let mut event_ref_buffers: HashMap<String, Vec<(GroupKey, EventRefBuffer)>> =
1149            HashMap::new();
1150        for ((idx, gk), buf) in &self.event_ref_buffers {
1151            let corr_id = self.correlation_stable_id(*idx);
1152            event_ref_buffers
1153                .entry(corr_id)
1154                .or_default()
1155                .push((gk.clone(), buf.clone()));
1156        }
1157
1158        CorrelationSnapshot {
1159            version: SNAPSHOT_VERSION,
1160            windows,
1161            last_alert,
1162            event_buffers,
1163            event_ref_buffers,
1164        }
1165    }
1166
1167    /// Import previously exported state, mapping stable identifiers back to
1168    /// current correlation indices. Entries whose identifiers no longer match
1169    /// any loaded correlation are silently dropped.
1170    ///
1171    /// Returns `false` (and imports nothing) if the snapshot version is
1172    /// incompatible with the current schema.
1173    pub fn import_state(&mut self, snapshot: CorrelationSnapshot) -> bool {
1174        if snapshot.version != SNAPSHOT_VERSION {
1175            return false;
1176        }
1177        let id_to_idx = self.build_id_to_index_map();
1178
1179        for (corr_id, groups) in snapshot.windows {
1180            if let Some(&idx) = id_to_idx.get(&corr_id) {
1181                for (gk, ws) in groups {
1182                    self.state.insert((idx, gk), ws);
1183                }
1184            }
1185        }
1186
1187        for (corr_id, groups) in snapshot.last_alert {
1188            if let Some(&idx) = id_to_idx.get(&corr_id) {
1189                for (gk, ts) in groups {
1190                    self.last_alert.insert((idx, gk), ts);
1191                }
1192            }
1193        }
1194
1195        for (corr_id, groups) in snapshot.event_buffers {
1196            if let Some(&idx) = id_to_idx.get(&corr_id) {
1197                for (gk, buf) in groups {
1198                    self.event_buffers.insert((idx, gk), buf);
1199                }
1200            }
1201        }
1202
1203        for (corr_id, groups) in snapshot.event_ref_buffers {
1204            if let Some(&idx) = id_to_idx.get(&corr_id) {
1205                for (gk, buf) in groups {
1206                    self.event_ref_buffers.insert((idx, gk), buf);
1207                }
1208            }
1209        }
1210
1211        true
1212    }
1213
1214    /// Stable identifier for a correlation rule: prefers id, then name, then title.
1215    fn correlation_stable_id(&self, idx: usize) -> String {
1216        let corr = &self.correlations[idx];
1217        corr.id
1218            .clone()
1219            .or_else(|| corr.name.clone())
1220            .unwrap_or_else(|| corr.title.clone())
1221    }
1222
1223    /// Build a reverse map from stable id → current correlation index.
1224    fn build_id_to_index_map(&self) -> HashMap<String, usize> {
1225        self.correlations
1226            .iter()
1227            .enumerate()
1228            .map(|(idx, _)| (self.correlation_stable_id(idx), idx))
1229            .collect()
1230    }
1231}
1232
/// Current snapshot schema version. Bump when the serialized format changes.
///
/// Written into every exported snapshot; `import_state` refuses any snapshot
/// whose version differs from this value.
const SNAPSHOT_VERSION: u32 = 1;
1235
/// Serializable snapshot of all mutable correlation state.
///
/// Uses stable string identifiers (correlation id/name/title) as keys so the
/// snapshot can be restored after a rule reload, even if internal indices change.
/// Inner maps use `Vec<(GroupKey, T)>` instead of `HashMap<GroupKey, T>` because
/// `GroupKey` cannot be used as a JSON object key.
#[derive(Debug, Clone, Serialize, serde::Deserialize)]
pub struct CorrelationSnapshot {
    /// Schema version — used to detect incompatible snapshots on load.
    /// When the field is missing from the serialized input, it is filled in
    /// by `default_snapshot_version`.
    #[serde(default = "default_snapshot_version")]
    pub version: u32,
    /// Per-correlation, per-group window state.
    pub windows: HashMap<String, Vec<(GroupKey, WindowState)>>,
    /// Per-correlation, per-group last alert timestamp (for suppression).
    pub last_alert: HashMap<String, Vec<(GroupKey, i64)>>,
    /// Per-correlation, per-group compressed event buffers.
    pub event_buffers: HashMap<String, Vec<(GroupKey, EventBuffer)>>,
    /// Per-correlation, per-group event reference buffers.
    pub event_ref_buffers: HashMap<String, Vec<(GroupKey, EventRefBuffer)>>,
}
1256
/// Serde default for the snapshot `version` field.
///
/// A snapshot that carries no `version` field is treated as version 1. This
/// is a fixed literal rather than the current schema version.
fn default_snapshot_version() -> u32 {
    const UNVERSIONED_SNAPSHOT: u32 = 1;
    UNVERSIONED_SNAPSHOT
}
1260
/// A default engine is a new engine built from the default configuration.
impl Default for CorrelationEngine {
    fn default() -> Self {
        Self::new(CorrelationConfig::default())
    }
}
1266
1267// =============================================================================
1268// Timestamp parsing helpers
1269// =============================================================================
1270
1271/// Parse a JSON value as a Unix epoch timestamp in seconds.
1272fn parse_timestamp_value(val: &serde_json::Value) -> Option<i64> {
1273    match val {
1274        serde_json::Value::Number(n) => {
1275            if let Some(i) = n.as_i64() {
1276                Some(normalize_epoch(i))
1277            } else {
1278                n.as_f64().map(|f| normalize_epoch(f as i64))
1279            }
1280        }
1281        serde_json::Value::String(s) => parse_timestamp_string(s),
1282        _ => None,
1283    }
1284}
1285
/// Normalize an epoch value to seconds.
///
/// Values strictly greater than 1e12 are assumed to be milliseconds (as
/// seconds they would lie beyond year 33658) and are divided by 1000. All
/// other values, including negative ones, are returned unchanged.
fn normalize_epoch(v: i64) -> i64 {
    const MILLIS_THRESHOLD: i64 = 1_000_000_000_000;

    match v {
        millis if millis > MILLIS_THRESHOLD => millis / 1000,
        secs => secs,
    }
}
1291
1292/// Parse a timestamp string. Tries ISO 8601 with timezone, then without.
1293fn parse_timestamp_string(s: &str) -> Option<i64> {
1294    // Try RFC 3339 / ISO 8601 with timezone
1295    if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
1296        return Some(dt.timestamp());
1297    }
1298
1299    // Try ISO 8601 without timezone (assume UTC)
1300    // Common formats: "2024-07-10T12:30:00", "2024-07-10 12:30:00"
1301    if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S") {
1302        return Some(Utc.from_utc_datetime(&naive).timestamp());
1303    }
1304    if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") {
1305        return Some(Utc.from_utc_datetime(&naive).timestamp());
1306    }
1307
1308    // Try with fractional seconds
1309    if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S%.f") {
1310        return Some(Utc.from_utc_datetime(&naive).timestamp());
1311    }
1312    if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f") {
1313        return Some(Utc.from_utc_datetime(&naive).timestamp());
1314    }
1315
1316    None
1317}
1318
1319/// Convert a JSON value to a string for value_count purposes.
1320fn value_to_string_for_count(v: &serde_json::Value) -> Option<String> {
1321    match v {
1322        serde_json::Value::String(s) => Some(s.clone()),
1323        serde_json::Value::Number(n) => Some(n.to_string()),
1324        serde_json::Value::Bool(b) => Some(b.to_string()),
1325        serde_json::Value::Null => Some("null".to_string()),
1326        _ => None,
1327    }
1328}
1329
1330/// Convert a JSON value to f64 for numeric aggregation.
1331fn value_to_f64(v: &serde_json::Value) -> Option<f64> {
1332    match v {
1333        serde_json::Value::Number(n) => n.as_f64(),
1334        serde_json::Value::String(s) => s.parse().ok(),
1335        _ => None,
1336    }
1337}
1338
1339// =============================================================================
1340// Tests
1341// =============================================================================
1342
1343#[cfg(test)]
1344mod tests {
1345    use super::*;
1346    use rsigma_parser::parse_sigma_yaml;
1347    use serde_json::json;
1348
1349    // =========================================================================
1350    // Timestamp parsing
1351    // =========================================================================
1352
// An integer below the millisecond threshold is taken as epoch seconds as-is.
#[test]
fn test_parse_timestamp_epoch_secs() {
    let parsed = parse_timestamp_value(&json!(1720612200));
    assert_eq!(parsed, Some(1720612200));
}
1358
// An integer above 1e12 is treated as milliseconds and scaled down to seconds.
#[test]
fn test_parse_timestamp_epoch_millis() {
    let parsed = parse_timestamp_value(&json!(1720612200000i64));
    assert_eq!(parsed, Some(1720612200));
}
1364
// RFC 3339 string with an explicit UTC designator.
#[test]
fn test_parse_timestamp_rfc3339() {
    let parsed = parse_timestamp_value(&json!("2024-07-10T12:30:00Z"));
    assert_eq!(parsed.unwrap(), 1720614600);
}
1371
// Zone-less ISO 8601 string with a 'T' separator is interpreted as UTC.
#[test]
fn test_parse_timestamp_naive() {
    let parsed = parse_timestamp_value(&json!("2024-07-10T12:30:00"));
    assert_eq!(parsed.unwrap(), 1720614600);
}
1378
// Zone-less string with a space separator is interpreted as UTC.
#[test]
fn test_parse_timestamp_with_space() {
    let parsed = parse_timestamp_value(&json!("2024-07-10 12:30:00"));
    assert_eq!(parsed.unwrap(), 1720614600);
}
1385
// Fractional seconds are accepted and truncated to whole seconds.
//
// The RFC 3339 input alone only exercises the timezone-aware parser, so the
// zone-less fractional layouts ('T' and space separators) are checked too.
#[test]
fn test_parse_timestamp_fractional() {
    let val = json!("2024-07-10T12:30:00.123Z");
    let ts = parse_timestamp_value(&val).unwrap();
    assert_eq!(ts, 1720614600);

    for naive in ["2024-07-10T12:30:00.123", "2024-07-10 12:30:00.123"] {
        assert_eq!(
            parse_timestamp_value(&json!(naive)),
            Some(1720614600),
            "zone-less fractional timestamp {naive:?} should parse as UTC"
        );
    }
}
1392
// The single configured field is present and holds an RFC 3339 string.
#[test]
fn test_extract_timestamp_from_event() {
    let engine = CorrelationEngine::new(CorrelationConfig {
        timestamp_fields: vec!["@timestamp".to_string()],
        max_state_entries: 100_000,
        ..Default::default()
    });

    let raw = json!({"@timestamp": "2024-07-10T12:30:00Z", "data": "test"});
    let event = Event::from_value(&raw);
    assert_eq!(engine.extract_event_timestamp(&event), Some(1720614600));
}
1407
// Fields are tried in configured order; a missing first field falls through
// to the next one that is present.
#[test]
fn test_extract_timestamp_fallback_fields() {
    let timestamp_fields: Vec<String> = ["@timestamp", "timestamp", "EventTime"]
        .iter()
        .map(|name| name.to_string())
        .collect();
    let engine = CorrelationEngine::new(CorrelationConfig {
        timestamp_fields,
        max_state_entries: 100_000,
        ..Default::default()
    });

    // "@timestamp" is absent, "timestamp" is present.
    let raw = json!({"timestamp": 1720613400, "data": "test"});
    let event = Event::from_value(&raw);
    assert_eq!(engine.extract_event_timestamp(&event), Some(1720613400));
}
1427
// No configured field exists on the event, so extraction yields `None`.
#[test]
fn test_extract_timestamp_returns_none_when_missing() {
    let engine = CorrelationEngine::new(CorrelationConfig {
        timestamp_fields: vec!["@timestamp".to_string()],
        ..Default::default()
    });

    let raw = json!({"data": "no timestamp here"});
    let event = Event::from_value(&raw);
    let extracted = engine.extract_event_timestamp(&event);
    assert_eq!(extracted, None);
}
1440
// With `TimestampFallback::Skip`, events without a timestamp still produce
// detections but never reach the correlation state.
#[test]
fn test_timestamp_fallback_skip() {
    let yaml = r#"
title: test rule
id: ts-skip-rule
logsource:
    product: test
detection:
    selection:
        action: click
    condition: selection
level: low
---
title: test correlation
correlation:
    type: event_count
    rules:
        - ts-skip-rule
    group-by:
        - User
    timespan: 10s
    condition:
        gte: 2
level: high
"#;
    let collection = parse_sigma_yaml(yaml).unwrap();
    let config = CorrelationConfig {
        timestamp_fallback: TimestampFallback::Skip,
        ..Default::default()
    };
    let mut engine = CorrelationEngine::new(config);
    engine.add_collection(&collection).unwrap();
    assert_eq!(engine.correlation_rule_count(), 1);

    // The event carries no timestamp field at all.
    let raw = json!({"action": "click", "User": "alice"});
    let event = Event::from_value(&raw);

    let mut results = Vec::new();
    for _ in 0..3 {
        let result = engine.process_event(&event);
        assert!(!result.detections.is_empty(), "detection should still fire");
        results.push(result);
    }

    // The threshold (gte: 2) is never reached because every event was skipped.
    for result in &results {
        assert!(result.correlations.is_empty());
    }
}
1492
// With the default configuration, events without a timestamp are stamped
// with the current time and therefore correlate with each other.
#[test]
fn test_timestamp_fallback_wallclock_default() {
    let yaml = r#"
title: test rule
id: ts-wc-rule
logsource:
    product: test
detection:
    selection:
        action: click
    condition: selection
level: low
---
title: test correlation
correlation:
    type: event_count
    rules:
        - ts-wc-rule
    group-by:
        - User
    timespan: 60s
    condition:
        gte: 2
level: high
"#;
    let collection = parse_sigma_yaml(yaml).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&collection).unwrap();
    assert_eq!(engine.correlation_rule_count(), 1);

    // No timestamp on the event: the WallClock fallback assigns Utc::now(),
    // which is well within the generous 60s window across three calls.
    let raw = json!({"action": "click", "User": "alice"});
    let event = Event::from_value(&raw);

    let _ = engine.process_event(&event);
    let _ = engine.process_event(&event);
    let third = engine.process_event(&event);

    // All events got near-identical timestamps, so the threshold is met.
    assert!(
        !third.correlations.is_empty(),
        "WallClock fallback should allow correlation"
    );
}
1538
1539    // =========================================================================
1540    // Event count correlation
1541    // =========================================================================
1542
// Three matching events from the same user inside the 60s window trigger
// the `event_count` correlation on the third event.
#[test]
fn test_event_count_basic() {
    let yaml = r#"
title: Base Rule
id: base-rule-001
name: base_rule
logsource:
    product: windows
    category: process_creation
detection:
    selection:
        CommandLine|contains: 'whoami'
    condition: selection
level: low
---
title: Multiple Whoami
id: corr-001
correlation:
    type: event_count
    rules:
        - base-rule-001
    group-by:
        - User
    timespan: 60s
    condition:
        gte: 3
level: high
"#;
    let collection = parse_sigma_yaml(yaml).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&collection).unwrap();

    assert_eq!(engine.detection_rule_count(), 1);
    assert_eq!(engine.correlation_rule_count(), 1);

    // Events are spaced 10 seconds apart, starting at t = 1000.
    let base_ts = 1000i64;
    for step in 0..3 {
        let raw = json!({"CommandLine": "whoami", "User": "admin"});
        let event = Event::from_value(&raw);
        let result = engine.process_event_at(&event, base_ts + step * 10);

        // Every event matches the detection rule.
        assert_eq!(result.detections.len(), 1);

        if step == 2 {
            // The third event reaches the threshold.
            assert_eq!(result.correlations.len(), 1);
            let fired = &result.correlations[0];
            assert_eq!(fired.rule_title, "Multiple Whoami");
            assert_eq!(fired.aggregated_value, 3.0);
        } else {
            // Below the threshold: nothing fires yet.
            assert!(result.correlations.is_empty());
        }
    }
}
1599
1600    #[test]
1601    fn test_event_count_different_groups() {
1602        let yaml = r#"
1603title: Login
1604id: login-001
1605logsource:
1606    category: auth
1607detection:
1608    selection:
1609        EventType: login
1610    condition: selection
1611level: low
1612---
1613title: Many Logins
1614id: corr-login
1615correlation:
1616    type: event_count
1617    rules:
1618        - login-001
1619    group-by:
1620        - User
1621    timespan: 60s
1622    condition:
1623        gte: 3
1624level: high
1625"#;
1626        let collection = parse_sigma_yaml(yaml).unwrap();
1627        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1628        engine.add_collection(&collection).unwrap();
1629
1630        // User "alice" sends 2 events, "bob" sends 3
1631        let ts = 1000i64;
1632        for i in 0..2 {
1633            let v = json!({"EventType": "login", "User": "alice"});
1634            let event = Event::from_value(&v);
1635            let r = engine.process_event_at(&event, ts + i);
1636            assert!(r.correlations.is_empty());
1637        }
1638        for i in 0..3 {
1639            let v = json!({"EventType": "login", "User": "bob"});
1640            let event = Event::from_value(&v);
1641            let r = engine.process_event_at(&event, ts + i);
1642            if i == 2 {
1643                assert_eq!(r.correlations.len(), 1);
1644                assert_eq!(
1645                    r.correlations[0].group_key,
1646                    vec![("User".to_string(), "bob".to_string())]
1647                );
1648            }
1649        }
1650    }
1651
1652    #[test]
1653    fn test_event_count_window_expiry() {
1654        let yaml = r#"
1655title: Base
1656id: base-002
1657logsource:
1658    category: test
1659detection:
1660    selection:
1661        action: click
1662    condition: selection
1663---
1664title: Rapid Clicks
1665id: corr-002
1666correlation:
1667    type: event_count
1668    rules:
1669        - base-002
1670    group-by:
1671        - User
1672    timespan: 10s
1673    condition:
1674        gte: 3
1675level: medium
1676"#;
1677        let collection = parse_sigma_yaml(yaml).unwrap();
1678        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1679        engine.add_collection(&collection).unwrap();
1680
1681        // Send 2 events at t=0,1 then 1 event at t=15 (outside window)
1682        let v = json!({"action": "click", "User": "admin"});
1683        let event = Event::from_value(&v);
1684        engine.process_event_at(&event, 0);
1685        engine.process_event_at(&event, 1);
1686        let r = engine.process_event_at(&event, 15);
1687        // Only 1 event in window [5, 15], not enough
1688        assert!(r.correlations.is_empty());
1689    }
1690
1691    // =========================================================================
1692    // Value count correlation
1693    // =========================================================================
1694
1695    #[test]
1696    fn test_value_count() {
1697        let yaml = r#"
1698title: Failed Login
1699id: failed-login-001
1700logsource:
1701    category: auth
1702detection:
1703    selection:
1704        EventType: failed_login
1705    condition: selection
1706level: low
1707---
1708title: Failed Logins From Many Users
1709id: corr-vc-001
1710correlation:
1711    type: value_count
1712    rules:
1713        - failed-login-001
1714    group-by:
1715        - Host
1716    timespan: 60s
1717    condition:
1718        field: User
1719        gte: 3
1720level: high
1721"#;
1722        let collection = parse_sigma_yaml(yaml).unwrap();
1723        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1724        engine.add_collection(&collection).unwrap();
1725
1726        let ts = 1000i64;
1727        // 3 different users failing login on same host
1728        for (i, user) in ["alice", "bob", "charlie"].iter().enumerate() {
1729            let v = json!({"EventType": "failed_login", "Host": "srv01", "User": user});
1730            let event = Event::from_value(&v);
1731            let r = engine.process_event_at(&event, ts + i as i64);
1732            if i == 2 {
1733                assert_eq!(r.correlations.len(), 1);
1734                assert_eq!(r.correlations[0].aggregated_value, 3.0);
1735            }
1736        }
1737    }
1738
1739    // =========================================================================
1740    // Temporal correlation
1741    // =========================================================================
1742
1743    #[test]
1744    fn test_temporal() {
1745        let yaml = r#"
1746title: Recon A
1747id: recon-a
1748name: recon_a
1749logsource:
1750    category: process
1751detection:
1752    selection:
1753        CommandLine|contains: 'whoami'
1754    condition: selection
1755---
1756title: Recon B
1757id: recon-b
1758name: recon_b
1759logsource:
1760    category: process
1761detection:
1762    selection:
1763        CommandLine|contains: 'ipconfig'
1764    condition: selection
1765---
1766title: Recon Combo
1767id: corr-temporal
1768correlation:
1769    type: temporal
1770    rules:
1771        - recon-a
1772        - recon-b
1773    group-by:
1774        - User
1775    timespan: 60s
1776    condition:
1777        gte: 2
1778level: high
1779"#;
1780        let collection = parse_sigma_yaml(yaml).unwrap();
1781        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1782        engine.add_collection(&collection).unwrap();
1783
1784        let ts = 1000i64;
1785        // Only recon A fires
1786        let v1 = json!({"CommandLine": "whoami", "User": "admin"});
1787        let ev1 = Event::from_value(&v1);
1788        let r1 = engine.process_event_at(&ev1, ts);
1789        assert!(r1.correlations.is_empty());
1790
1791        // Now recon B fires — both rules have fired within window
1792        let v2 = json!({"CommandLine": "ipconfig /all", "User": "admin"});
1793        let ev2 = Event::from_value(&v2);
1794        let r2 = engine.process_event_at(&ev2, ts + 10);
1795        assert_eq!(r2.correlations.len(), 1);
1796        assert_eq!(r2.correlations[0].rule_title, "Recon Combo");
1797    }
1798
1799    // =========================================================================
1800    // Temporal ordered correlation
1801    // =========================================================================
1802
1803    #[test]
1804    fn test_temporal_ordered() {
1805        let yaml = r#"
1806title: Failed Login
1807id: failed-001
1808name: failed_login
1809logsource:
1810    category: auth
1811detection:
1812    selection:
1813        EventType: failed_login
1814    condition: selection
1815---
1816title: Success Login
1817id: success-001
1818name: successful_login
1819logsource:
1820    category: auth
1821detection:
1822    selection:
1823        EventType: success_login
1824    condition: selection
1825---
1826title: Brute Force Then Login
1827id: corr-bf
1828correlation:
1829    type: temporal_ordered
1830    rules:
1831        - failed-001
1832        - success-001
1833    group-by:
1834        - User
1835    timespan: 60s
1836    condition:
1837        gte: 2
1838level: critical
1839"#;
1840        let collection = parse_sigma_yaml(yaml).unwrap();
1841        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1842        engine.add_collection(&collection).unwrap();
1843
1844        let ts = 1000i64;
1845        // Failed login first
1846        let v1 = json!({"EventType": "failed_login", "User": "admin"});
1847        let ev1 = Event::from_value(&v1);
1848        let r1 = engine.process_event_at(&ev1, ts);
1849        assert!(r1.correlations.is_empty());
1850
1851        // Then successful login — correct order!
1852        let v2 = json!({"EventType": "success_login", "User": "admin"});
1853        let ev2 = Event::from_value(&v2);
1854        let r2 = engine.process_event_at(&ev2, ts + 10);
1855        assert_eq!(r2.correlations.len(), 1);
1856    }
1857
1858    #[test]
1859    fn test_temporal_ordered_wrong_order() {
1860        let yaml = r#"
1861title: Rule A
1862id: rule-a
1863logsource:
1864    category: test
1865detection:
1866    selection:
1867        type: a
1868    condition: selection
1869---
1870title: Rule B
1871id: rule-b
1872logsource:
1873    category: test
1874detection:
1875    selection:
1876        type: b
1877    condition: selection
1878---
1879title: A then B
1880id: corr-ab
1881correlation:
1882    type: temporal_ordered
1883    rules:
1884        - rule-a
1885        - rule-b
1886    group-by:
1887        - User
1888    timespan: 60s
1889    condition:
1890        gte: 2
1891level: high
1892"#;
1893        let collection = parse_sigma_yaml(yaml).unwrap();
1894        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1895        engine.add_collection(&collection).unwrap();
1896
1897        let ts = 1000i64;
1898        // B fires first, then A — wrong order
1899        let v1 = json!({"type": "b", "User": "admin"});
1900        let ev1 = Event::from_value(&v1);
1901        engine.process_event_at(&ev1, ts);
1902
1903        let v2 = json!({"type": "a", "User": "admin"});
1904        let ev2 = Event::from_value(&v2);
1905        let r2 = engine.process_event_at(&ev2, ts + 10);
1906        assert!(r2.correlations.is_empty());
1907    }
1908
1909    // =========================================================================
1910    // Numeric aggregation (value_sum, value_avg)
1911    // =========================================================================
1912
1913    #[test]
1914    fn test_value_sum() {
1915        let yaml = r#"
1916title: Web Access
1917id: web-001
1918logsource:
1919    category: web
1920detection:
1921    selection:
1922        action: upload
1923    condition: selection
1924---
1925title: Large Upload
1926id: corr-sum
1927correlation:
1928    type: value_sum
1929    rules:
1930        - web-001
1931    group-by:
1932        - User
1933    timespan: 60s
1934    condition:
1935        field: bytes_sent
1936        gt: 1000
1937level: high
1938"#;
1939        let collection = parse_sigma_yaml(yaml).unwrap();
1940        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1941        engine.add_collection(&collection).unwrap();
1942
1943        let ts = 1000i64;
1944        let v1 = json!({"action": "upload", "User": "alice", "bytes_sent": 600});
1945        let ev1 = Event::from_value(&v1);
1946        let r1 = engine.process_event_at(&ev1, ts);
1947        assert!(r1.correlations.is_empty());
1948
1949        let v2 = json!({"action": "upload", "User": "alice", "bytes_sent": 500});
1950        let ev2 = Event::from_value(&v2);
1951        let r2 = engine.process_event_at(&ev2, ts + 5);
1952        assert_eq!(r2.correlations.len(), 1);
1953        assert!((r2.correlations[0].aggregated_value - 1100.0).abs() < f64::EPSILON);
1954    }
1955
1956    #[test]
1957    fn test_value_avg() {
1958        let yaml = r#"
1959title: Request
1960id: req-001
1961logsource:
1962    category: web
1963detection:
1964    selection:
1965        type: request
1966    condition: selection
1967---
1968title: High Avg Latency
1969id: corr-avg
1970correlation:
1971    type: value_avg
1972    rules:
1973        - req-001
1974    group-by:
1975        - Service
1976    timespan: 60s
1977    condition:
1978        field: latency_ms
1979        gt: 500
1980level: medium
1981"#;
1982        let collection = parse_sigma_yaml(yaml).unwrap();
1983        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1984        engine.add_collection(&collection).unwrap();
1985
1986        let ts = 1000i64;
1987        // Avg of 400, 600, 800 = 600 > 500
1988        for (i, latency) in [400, 600, 800].iter().enumerate() {
1989            let v = json!({"type": "request", "Service": "api", "latency_ms": latency});
1990            let event = Event::from_value(&v);
1991            let r = engine.process_event_at(&event, ts + i as i64);
1992            if i == 2 {
1993                assert_eq!(r.correlations.len(), 1);
1994                assert!((r.correlations[0].aggregated_value - 600.0).abs() < f64::EPSILON);
1995            }
1996        }
1997    }
1998
1999    // =========================================================================
2000    // State management
2001    // =========================================================================
2002
2003    #[test]
2004    fn test_state_count() {
2005        let yaml = r#"
2006title: Base
2007id: base-sc
2008logsource:
2009    category: test
2010detection:
2011    selection:
2012        action: test
2013    condition: selection
2014---
2015title: Count
2016id: corr-sc
2017correlation:
2018    type: event_count
2019    rules:
2020        - base-sc
2021    group-by:
2022        - User
2023    timespan: 60s
2024    condition:
2025        gte: 100
2026level: low
2027"#;
2028        let collection = parse_sigma_yaml(yaml).unwrap();
2029        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2030        engine.add_collection(&collection).unwrap();
2031
2032        let v = json!({"action": "test", "User": "alice"});
2033        let event = Event::from_value(&v);
2034        engine.process_event_at(&event, 1000);
2035        assert_eq!(engine.state_count(), 1);
2036
2037        let v2 = json!({"action": "test", "User": "bob"});
2038        let event2 = Event::from_value(&v2);
2039        engine.process_event_at(&event2, 1001);
2040        assert_eq!(engine.state_count(), 2);
2041
2042        // Evict everything
2043        engine.evict_expired(2000);
2044        assert_eq!(engine.state_count(), 0);
2045    }
2046
2047    // =========================================================================
2048    // Generate flag
2049    // =========================================================================
2050
2051    #[test]
2052    fn test_generate_flag_default_false() {
2053        let yaml = r#"
2054title: Base
2055id: gen-base
2056logsource:
2057    category: test
2058detection:
2059    selection:
2060        action: test
2061    condition: selection
2062---
2063title: Correlation
2064id: gen-corr
2065correlation:
2066    type: event_count
2067    rules:
2068        - gen-base
2069    group-by:
2070        - User
2071    timespan: 60s
2072    condition:
2073        gte: 1
2074level: high
2075"#;
2076        let collection = parse_sigma_yaml(yaml).unwrap();
2077        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2078        engine.add_collection(&collection).unwrap();
2079
2080        // generate defaults to false — detection matches are still returned
2081        // (filtering by generate flag is a backend concern, not eval)
2082        let v = json!({"action": "test", "User": "alice"});
2083        let event = Event::from_value(&v);
2084        let r = engine.process_event_at(&event, 1000);
2085        assert_eq!(r.detections.len(), 1);
2086        assert_eq!(r.correlations.len(), 1);
2087    }
2088
2089    // =========================================================================
2090    // Real-world example: AWS bucket enumeration
2091    // =========================================================================
2092
2093    #[test]
2094    fn test_aws_bucket_enumeration() {
2095        let yaml = r#"
2096title: Potential Bucket Enumeration on AWS
2097id: f305fd62-beca-47da-ad95-7690a0620084
2098logsource:
2099    product: aws
2100    service: cloudtrail
2101detection:
2102    selection:
2103        eventSource: "s3.amazonaws.com"
2104        eventName: "ListBuckets"
2105    condition: selection
2106level: low
2107---
2108title: Multiple AWS bucket enumerations
2109id: be246094-01d3-4bba-88de-69e582eba0cc
2110status: experimental
2111correlation:
2112    type: event_count
2113    rules:
2114        - f305fd62-beca-47da-ad95-7690a0620084
2115    group-by:
2116        - userIdentity.arn
2117    timespan: 1h
2118    condition:
2119        gte: 5
2120level: high
2121"#;
2122        let collection = parse_sigma_yaml(yaml).unwrap();
2123        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2124        engine.add_collection(&collection).unwrap();
2125
2126        let base_ts = 1_700_000_000i64;
2127        for i in 0..5 {
2128            let v = json!({
2129                "eventSource": "s3.amazonaws.com",
2130                "eventName": "ListBuckets",
2131                "userIdentity.arn": "arn:aws:iam::123456789:user/attacker"
2132            });
2133            let event = Event::from_value(&v);
2134            let r = engine.process_event_at(&event, base_ts + i * 60);
2135            if i == 4 {
2136                assert_eq!(r.correlations.len(), 1);
2137                assert_eq!(
2138                    r.correlations[0].rule_title,
2139                    "Multiple AWS bucket enumerations"
2140                );
2141                assert_eq!(r.correlations[0].aggregated_value, 5.0);
2142            }
2143        }
2144    }
2145
2146    // =========================================================================
2147    // Chaining: event_count -> temporal_ordered
2148    // =========================================================================
2149
    /// Loads a two-stage chain (detection -> `event_count` -> `temporal_ordered`)
    /// and drives it with three failed logins followed by one successful login.
    ///
    /// What is actually asserted: both rule counts, that the `event_count`
    /// stage fires on the third failed login, and that the successful login
    /// matches its detection rule.
    ///
    /// NOTE(review): the final `temporal_ordered` correlation ("Brute Force
    /// Followed by Login") is never asserted, so this test does not prove that
    /// the chain fires end to end. Confirm whether that is intentional.
    #[test]
    fn test_chaining_event_count_to_temporal() {
        // Reproduces the spec's "failed logins followed by successful login" example.
        // Chain: failed_login (detection) -> many_failed (event_count) -> brute_then_login (temporal_ordered)
        let yaml = r#"
title: Single failed login
id: failed-login-chain
name: failed_login
logsource:
    category: auth
detection:
    selection:
        EventType: failed_login
    condition: selection
---
title: Successful login
id: success-login-chain
name: successful_login
logsource:
    category: auth
detection:
    selection:
        EventType: success_login
    condition: selection
---
title: Multiple failed logins
id: many-failed-chain
name: multiple_failed_login
correlation:
    type: event_count
    rules:
        - failed-login-chain
    group-by:
        - User
    timespan: 60s
    condition:
        gte: 3
level: medium
---
title: Brute Force Followed by Login
id: brute-force-chain
correlation:
    type: temporal_ordered
    rules:
        - many-failed-chain
        - success-login-chain
    group-by:
        - User
    timespan: 120s
    condition:
        gte: 2
level: critical
"#;
        let collection = parse_sigma_yaml(yaml).unwrap();
        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
        engine.add_collection(&collection).unwrap();

        // Two detection rules and two correlation rules were loaded.
        assert_eq!(engine.detection_rule_count(), 2);
        assert_eq!(engine.correlation_rule_count(), 2);

        let ts = 1000i64;

        // Send 3 failed logins → triggers "many_failed_chain"
        for i in 0..3 {
            let v = json!({"EventType": "failed_login", "User": "victim"});
            let event = Event::from_value(&v);
            let r = engine.process_event_at(&event, ts + i);
            if i == 2 {
                // The event_count correlation should fire
                // (`any` is used because other correlations may be reported alongside it).
                assert!(
                    r.correlations
                        .iter()
                        .any(|c| c.rule_title == "Multiple failed logins"),
                    "Expected event_count correlation to fire"
                );
            }
        }

        // Now send a successful login → should trigger the chained temporal_ordered
        // Note: chaining happens in chain_correlations when many-failed-chain fires
        // and then success-login-chain matches the detection.
        // The temporal_ordered correlation needs BOTH many-failed-chain AND success-login-chain
        // to have fired. success-login-chain is a detection rule, not a correlation,
        // so it gets matched via the regular detection path.
        let v = json!({"EventType": "success_login", "User": "victim"});
        let event = Event::from_value(&v);
        let r = engine.process_event_at(&event, ts + 30);

        // The detection should match
        // (only the detection is checked here; `r.correlations` is not inspected).
        assert_eq!(r.detections.len(), 1);
        assert_eq!(r.detections[0].rule_title, "Successful login");
    }
2242
2243    // =========================================================================
2244    // Field aliases
2245    // =========================================================================
2246
2247    #[test]
2248    fn test_field_aliases() {
2249        let yaml = r#"
2250title: Internal Error
2251id: internal-error-001
2252name: internal_error
2253logsource:
2254    category: web
2255detection:
2256    selection:
2257        http.response.status_code: 500
2258    condition: selection
2259---
2260title: New Connection
2261id: new-conn-001
2262name: new_network_connection
2263logsource:
2264    category: network
2265detection:
2266    selection:
2267        event.type: connection
2268    condition: selection
2269---
2270title: Error Then Connection
2271id: corr-alias
2272correlation:
2273    type: temporal
2274    rules:
2275        - internal-error-001
2276        - new-conn-001
2277    group-by:
2278        - internal_ip
2279    timespan: 60s
2280    condition:
2281        gte: 2
2282    aliases:
2283        internal_ip:
2284            internal_error: destination.ip
2285            new_network_connection: source.ip
2286level: high
2287"#;
2288        let collection = parse_sigma_yaml(yaml).unwrap();
2289        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2290        engine.add_collection(&collection).unwrap();
2291
2292        let ts = 1000i64;
2293
2294        // Internal error with destination.ip = 10.0.0.5
2295        let v1 = json!({
2296            "http.response.status_code": 500,
2297            "destination.ip": "10.0.0.5"
2298        });
2299        let ev1 = Event::from_value(&v1);
2300        let r1 = engine.process_event_at(&ev1, ts);
2301        assert_eq!(r1.detections.len(), 1);
2302        assert!(r1.correlations.is_empty());
2303
2304        // New connection with source.ip = 10.0.0.5 (same IP, aliased)
2305        let v2 = json!({
2306            "event.type": "connection",
2307            "source.ip": "10.0.0.5"
2308        });
2309        let ev2 = Event::from_value(&v2);
2310        let r2 = engine.process_event_at(&ev2, ts + 5);
2311        assert_eq!(r2.detections.len(), 1);
2312        // Both rules fired for the same internal_ip group → temporal should fire
2313        assert_eq!(r2.correlations.len(), 1);
2314        assert_eq!(r2.correlations[0].rule_title, "Error Then Connection");
2315        // Check group key contains the aliased field
2316        assert!(
2317            r2.correlations[0]
2318                .group_key
2319                .iter()
2320                .any(|(k, v)| k == "internal_ip" && v == "10.0.0.5")
2321        );
2322    }
2323
2324    // =========================================================================
2325    // Value percentile (basic smoke test)
2326    // =========================================================================
2327
    /// Smoke test for `value_percentile`: loads a percentile correlation and
    /// feeds it five numeric `image` values for one `ComputerName`.
    ///
    /// NOTE(review): nothing is asserted. Every result is discarded with
    /// `let _ =`, so this only checks that rule loading and event processing
    /// do not panic. Consider asserting on the final result once the expected
    /// percentile semantics are confirmed.
    #[test]
    fn test_value_percentile() {
        let yaml = r#"
title: Process Creation
id: proc-001
logsource:
    category: process
detection:
    selection:
        type: process_creation
    condition: selection
---
title: Rare Process
id: corr-percentile
correlation:
    type: value_percentile
    rules:
        - proc-001
    group-by:
        - ComputerName
    timespan: 60s
    condition:
        field: image
        lte: 50
level: medium
"#;
        let collection = parse_sigma_yaml(yaml).unwrap();
        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
        engine.add_collection(&collection).unwrap();

        let ts = 1000i64;
        // Push some numeric-ish values for the image field
        for (i, val) in [10.0, 20.0, 30.0, 40.0, 50.0].iter().enumerate() {
            let v = json!({"type": "process_creation", "ComputerName": "srv01", "image": val});
            let event = Event::from_value(&v);
            // Result intentionally ignored; see the NOTE on this test.
            let _ = engine.process_event_at(&event, ts + i as i64);
        }
        // The median (30.0) should be <= 50, so condition fires
        // Note: percentile implementation is simplified for in-memory eval
    }
2368
2369    // =========================================================================
2370    // Extended temporal conditions (end-to-end)
2371    // =========================================================================
2372
2373    #[test]
2374    fn test_extended_temporal_and_condition() {
2375        // Temporal correlation with "rule_a and rule_b" extended condition
2376        let yaml = r#"
2377title: Login Attempt
2378id: login-attempt
2379logsource:
2380    category: auth
2381detection:
2382    selection:
2383        EventType: login_failure
2384    condition: selection
2385---
2386title: Password Change
2387id: password-change
2388logsource:
2389    category: auth
2390detection:
2391    selection:
2392        EventType: password_change
2393    condition: selection
2394---
2395title: Credential Attack
2396correlation:
2397    type: temporal
2398    rules:
2399        - login-attempt
2400        - password-change
2401    group-by:
2402        - User
2403    timespan: 300s
2404    condition: login-attempt and password-change
2405level: high
2406"#;
2407        let collection = parse_sigma_yaml(yaml).unwrap();
2408        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2409        engine.add_collection(&collection).unwrap();
2410
2411        let ts = 1000i64;
2412
2413        // Login failure by alice
2414        let ev1 = json!({"EventType": "login_failure", "User": "alice"});
2415        let r1 = engine.process_event_at(&Event::from_value(&ev1), ts);
2416        assert!(r1.correlations.is_empty(), "only one rule fired so far");
2417
2418        // Password change by alice — both rules have now fired
2419        let ev2 = json!({"EventType": "password_change", "User": "alice"});
2420        let r2 = engine.process_event_at(&Event::from_value(&ev2), ts + 10);
2421        assert_eq!(
2422            r2.correlations.len(),
2423            1,
2424            "temporal correlation should fire: both rules matched"
2425        );
2426        assert_eq!(r2.correlations[0].rule_title, "Credential Attack");
2427    }
2428
2429    #[test]
2430    fn test_extended_temporal_or_condition() {
2431        // Temporal with "rule_a or rule_b" — should fire when either fires
2432        let yaml = r#"
2433title: SSH Login
2434id: ssh-login
2435logsource:
2436    category: auth
2437detection:
2438    selection:
2439        EventType: ssh_login
2440    condition: selection
2441---
2442title: VPN Login
2443id: vpn-login
2444logsource:
2445    category: auth
2446detection:
2447    selection:
2448        EventType: vpn_login
2449    condition: selection
2450---
2451title: Any Remote Access
2452correlation:
2453    type: temporal
2454    rules:
2455        - ssh-login
2456        - vpn-login
2457    group-by:
2458        - User
2459    timespan: 60s
2460    condition: ssh-login or vpn-login
2461level: medium
2462"#;
2463        let collection = parse_sigma_yaml(yaml).unwrap();
2464        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2465        engine.add_collection(&collection).unwrap();
2466
2467        // Only SSH login by bob — "or" means this suffices
2468        let ev = json!({"EventType": "ssh_login", "User": "bob"});
2469        let r = engine.process_event_at(&Event::from_value(&ev), 1000);
2470        assert_eq!(r.correlations.len(), 1);
2471        assert_eq!(r.correlations[0].rule_title, "Any Remote Access");
2472    }
2473
2474    #[test]
2475    fn test_extended_temporal_partial_and_no_fire() {
2476        // Temporal "and" with only one rule firing should not trigger
2477        let yaml = r#"
2478title: Recon Step 1
2479id: recon-1
2480logsource:
2481    category: process
2482detection:
2483    selection:
2484        CommandLine|contains: 'whoami'
2485    condition: selection
2486---
2487title: Recon Step 2
2488id: recon-2
2489logsource:
2490    category: process
2491detection:
2492    selection:
2493        CommandLine|contains: 'ipconfig'
2494    condition: selection
2495---
2496title: Full Recon
2497correlation:
2498    type: temporal
2499    rules:
2500        - recon-1
2501        - recon-2
2502    group-by:
2503        - Host
2504    timespan: 120s
2505    condition: recon-1 and recon-2
2506level: high
2507"#;
2508        let collection = parse_sigma_yaml(yaml).unwrap();
2509        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2510        engine.add_collection(&collection).unwrap();
2511
2512        // Only whoami (recon-1) — should not fire
2513        let ev = json!({"CommandLine": "whoami", "Host": "srv01"});
2514        let r = engine.process_event_at(&Event::from_value(&ev), 1000);
2515        assert!(r.correlations.is_empty(), "only one of two AND rules fired");
2516
2517        // Now ipconfig (recon-2) — should fire
2518        let ev2 = json!({"CommandLine": "ipconfig /all", "Host": "srv01"});
2519        let r2 = engine.process_event_at(&Event::from_value(&ev2), 1010);
2520        assert_eq!(r2.correlations.len(), 1);
2521        assert_eq!(r2.correlations[0].rule_title, "Full Recon");
2522    }
2523
2524    // =========================================================================
2525    // Filter rules with correlation engine
2526    // =========================================================================
2527
2528    #[test]
2529    fn test_filter_with_correlation() {
2530        // Detection rule + filter + event_count correlation
2531        let yaml = r#"
2532title: Failed Auth
2533id: failed-auth
2534logsource:
2535    category: auth
2536detection:
2537    selection:
2538        EventType: auth_failure
2539    condition: selection
2540---
2541title: Exclude Service Accounts
2542filter:
2543    rules:
2544        - failed-auth
2545    selection:
2546        User|startswith: 'svc_'
2547    condition: selection
2548---
2549title: Brute Force
2550correlation:
2551    type: event_count
2552    rules:
2553        - failed-auth
2554    group-by:
2555        - User
2556    timespan: 300s
2557    condition:
2558        gte: 3
2559level: critical
2560"#;
2561        let collection = parse_sigma_yaml(yaml).unwrap();
2562        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2563        engine.add_collection(&collection).unwrap();
2564
2565        let ts = 1000i64;
2566
2567        // Service account failures should be filtered — don't count
2568        for i in 0..5 {
2569            let ev = json!({"EventType": "auth_failure", "User": "svc_backup"});
2570            let r = engine.process_event_at(&Event::from_value(&ev), ts + i);
2571            assert!(
2572                r.correlations.is_empty(),
2573                "service account should be filtered, no correlation"
2574            );
2575        }
2576
2577        // Normal user failures should count
2578        for i in 0..2 {
2579            let ev = json!({"EventType": "auth_failure", "User": "alice"});
2580            let r = engine.process_event_at(&Event::from_value(&ev), ts + 10 + i);
2581            assert!(r.correlations.is_empty(), "not yet 3 events");
2582        }
2583
2584        // Third failure triggers correlation
2585        let ev = json!({"EventType": "auth_failure", "User": "alice"});
2586        let r = engine.process_event_at(&Event::from_value(&ev), ts + 12);
2587        assert_eq!(r.correlations.len(), 1);
2588        assert_eq!(r.correlations[0].rule_title, "Brute Force");
2589    }
2590
2591    // =========================================================================
2592    // action: repeat with correlation engine
2593    // =========================================================================
2594
2595    #[test]
2596    fn test_repeat_rules_in_correlation() {
2597        // Two detection rules via repeat, both feed into event_count
2598        let yaml = r#"
2599title: File Access A
2600id: file-a
2601logsource:
2602    category: file_access
2603detection:
2604    selection:
2605        FileName|endswith: '.docx'
2606    condition: selection
2607---
2608action: repeat
2609title: File Access B
2610id: file-b
2611detection:
2612    selection:
2613        FileName|endswith: '.xlsx'
2614    condition: selection
2615---
2616title: Mass File Access
2617correlation:
2618    type: event_count
2619    rules:
2620        - file-a
2621        - file-b
2622    group-by:
2623        - User
2624    timespan: 60s
2625    condition:
2626        gte: 3
2627level: high
2628"#;
2629        let collection = parse_sigma_yaml(yaml).unwrap();
2630        assert_eq!(collection.rules.len(), 2);
2631        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2632        engine.add_collection(&collection).unwrap();
2633        assert_eq!(engine.detection_rule_count(), 2);
2634
2635        let ts = 1000i64;
2636        // Mix of docx and xlsx accesses by same user
2637        let ev1 = json!({"FileName": "report.docx", "User": "bob"});
2638        engine.process_event_at(&Event::from_value(&ev1), ts);
2639        let ev2 = json!({"FileName": "data.xlsx", "User": "bob"});
2640        engine.process_event_at(&Event::from_value(&ev2), ts + 1);
2641        let ev3 = json!({"FileName": "notes.docx", "User": "bob"});
2642        let r = engine.process_event_at(&Event::from_value(&ev3), ts + 2);
2643
2644        assert_eq!(r.correlations.len(), 1);
2645        assert_eq!(r.correlations[0].rule_title, "Mass File Access");
2646    }
2647
2648    // =========================================================================
2649    // Expand modifier with correlation engine
2650    // =========================================================================
2651
2652    #[test]
2653    fn test_expand_modifier_with_correlation() {
2654        let yaml = r#"
2655title: User Temp File
2656id: user-temp
2657logsource:
2658    category: file_access
2659detection:
2660    selection:
2661        FilePath|expand: 'C:\Users\%User%\Temp'
2662    condition: selection
2663---
2664title: Excessive Temp Access
2665correlation:
2666    type: event_count
2667    rules:
2668        - user-temp
2669    group-by:
2670        - User
2671    timespan: 60s
2672    condition:
2673        gte: 2
2674level: medium
2675"#;
2676        let collection = parse_sigma_yaml(yaml).unwrap();
2677        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2678        engine.add_collection(&collection).unwrap();
2679
2680        let ts = 1000i64;
2681        // Event where User field matches the placeholder
2682        let ev1 = json!({"FilePath": "C:\\Users\\alice\\Temp", "User": "alice"});
2683        let r1 = engine.process_event_at(&Event::from_value(&ev1), ts);
2684        assert!(r1.correlations.is_empty());
2685
2686        let ev2 = json!({"FilePath": "C:\\Users\\alice\\Temp", "User": "alice"});
2687        let r2 = engine.process_event_at(&Event::from_value(&ev2), ts + 1);
2688        assert_eq!(r2.correlations.len(), 1);
2689        assert_eq!(r2.correlations[0].rule_title, "Excessive Temp Access");
2690
2691        // Different user — should NOT match (path says alice, user is bob)
2692        let ev3 = json!({"FilePath": "C:\\Users\\alice\\Temp", "User": "bob"});
2693        let r3 = engine.process_event_at(&Event::from_value(&ev3), ts + 2);
2694        // Detection doesn't fire for this event since expand resolves to C:\Users\bob\Temp
2695        assert_eq!(r3.detections.len(), 0);
2696    }
2697
2698    // =========================================================================
2699    // Timestamp modifier with correlation engine
2700    // =========================================================================
2701
2702    #[test]
2703    fn test_timestamp_modifier_with_correlation() {
2704        let yaml = r#"
2705title: Night Login
2706id: night-login
2707logsource:
2708    category: auth
2709detection:
2710    login:
2711        EventType: login
2712    night:
2713        Timestamp|hour: 3
2714    condition: login and night
2715---
2716title: Frequent Night Logins
2717correlation:
2718    type: event_count
2719    rules:
2720        - night-login
2721    group-by:
2722        - User
2723    timespan: 3600s
2724    condition:
2725        gte: 2
2726level: high
2727"#;
2728        let collection = parse_sigma_yaml(yaml).unwrap();
2729        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2730        engine.add_collection(&collection).unwrap();
2731
2732        let ts = 1000i64;
2733        // Login at 3AM
2734        let ev1 =
2735            json!({"EventType": "login", "User": "alice", "Timestamp": "2024-01-15T03:10:00Z"});
2736        let r1 = engine.process_event_at(&Event::from_value(&ev1), ts);
2737        assert_eq!(r1.detections.len(), 1);
2738        assert!(r1.correlations.is_empty());
2739
2740        let ev2 =
2741            json!({"EventType": "login", "User": "alice", "Timestamp": "2024-01-15T03:45:00Z"});
2742        let r2 = engine.process_event_at(&Event::from_value(&ev2), ts + 1);
2743        assert_eq!(r2.correlations.len(), 1);
2744        assert_eq!(r2.correlations[0].rule_title, "Frequent Night Logins");
2745
2746        // Login at noon — should NOT count
2747        let ev3 = json!({"EventType": "login", "User": "bob", "Timestamp": "2024-01-15T12:00:00Z"});
2748        let r3 = engine.process_event_at(&Event::from_value(&ev3), ts + 2);
2749        assert!(
2750            r3.detections.is_empty(),
2751            "noon login should not match night rule"
2752        );
2753    }
2754
2755    // =========================================================================
2756    // Correlation condition range (multiple predicates)
2757    // =========================================================================
2758
2759    #[test]
2760    fn test_event_count_range_condition() {
2761        let yaml = r#"
2762title: Login Attempt
2763id: login-attempt-001
2764name: login_attempt
2765logsource:
2766    product: windows
2767detection:
2768    selection:
2769        EventType: login
2770    condition: selection
2771level: low
2772---
2773title: Login Count Range
2774id: corr-range-001
2775correlation:
2776    type: event_count
2777    rules:
2778        - login-attempt-001
2779    group-by:
2780        - User
2781    timespan: 3600s
2782    condition:
2783        gt: 2
2784        lte: 5
2785level: high
2786"#;
2787        let collection = parse_sigma_yaml(yaml).unwrap();
2788        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2789        engine.add_collection(&collection).unwrap();
2790
2791        let ts: i64 = 1_000_000;
2792
2793        // Send 2 events — gt:2 is false
2794        for i in 0..2 {
2795            let ev = json!({"EventType": "login", "User": "alice"});
2796            let r = engine.process_event_at(&Event::from_value(&ev), ts + i);
2797            assert!(r.correlations.is_empty(), "2 events should not fire (gt:2)");
2798        }
2799
2800        // 3rd event — gt:2 is true, lte:5 is true → fires
2801        let ev3 = json!({"EventType": "login", "User": "alice"});
2802        let r3 = engine.process_event_at(&Event::from_value(&ev3), ts + 3);
2803        assert_eq!(r3.correlations.len(), 1, "3 events: gt:2 AND lte:5");
2804
2805        // Send events 4, 5 — still in range
2806        for i in 4..=5 {
2807            let ev = json!({"EventType": "login", "User": "alice"});
2808            let r = engine.process_event_at(&Event::from_value(&ev), ts + i);
2809            assert_eq!(r.correlations.len(), 1, "{i} events still in range");
2810        }
2811
2812        // 6th event — lte:5 is false → no fire
2813        let ev6 = json!({"EventType": "login", "User": "alice"});
2814        let r6 = engine.process_event_at(&Event::from_value(&ev6), ts + 6);
2815        assert!(
2816            r6.correlations.is_empty(),
2817            "6 events exceeds lte:5, should not fire"
2818        );
2819    }
2820
2821    // =========================================================================
2822    // Suppression
2823    // =========================================================================
2824
2825    fn suppression_yaml() -> &'static str {
2826        r#"
2827title: Login
2828id: login-base
2829logsource:
2830    category: auth
2831detection:
2832    selection:
2833        EventType: login
2834    condition: selection
2835---
2836title: Many Logins
2837correlation:
2838    type: event_count
2839    rules:
2840        - login-base
2841    group-by:
2842        - User
2843    timeframe: 60s
2844    condition:
2845        gte: 3
2846level: high
2847"#
2848    }
2849
2850    #[test]
2851    fn test_suppression_window() {
2852        let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
2853        let config = CorrelationConfig {
2854            suppress: Some(10), // suppress for 10 seconds
2855            ..Default::default()
2856        };
2857        let mut engine = CorrelationEngine::new(config);
2858        engine.add_collection(&collection).unwrap();
2859
2860        let ev = json!({"EventType": "login", "User": "alice"});
2861        let ts = 1000;
2862
2863        // Fire 3 events to hit threshold
2864        engine.process_event_at(&Event::from_value(&ev), ts);
2865        engine.process_event_at(&Event::from_value(&ev), ts + 1);
2866        let r3 = engine.process_event_at(&Event::from_value(&ev), ts + 2);
2867        assert_eq!(r3.correlations.len(), 1, "should fire on 3rd event");
2868
2869        // 4th event within suppress window → suppressed
2870        let r4 = engine.process_event_at(&Event::from_value(&ev), ts + 3);
2871        assert!(
2872            r4.correlations.is_empty(),
2873            "should be suppressed within 10s window"
2874        );
2875
2876        // 5th event still within suppress window → suppressed
2877        let r5 = engine.process_event_at(&Event::from_value(&ev), ts + 9);
2878        assert!(
2879            r5.correlations.is_empty(),
2880            "should be suppressed at ts+9 (< ts+2+10)"
2881        );
2882
2883        // Event after suppress window expires → fires again
2884        let r6 = engine.process_event_at(&Event::from_value(&ev), ts + 13);
2885        assert_eq!(
2886            r6.correlations.len(),
2887            1,
2888            "should fire again after suppress window expires"
2889        );
2890    }
2891
2892    #[test]
2893    fn test_suppression_per_group_key() {
2894        let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
2895        let config = CorrelationConfig {
2896            suppress: Some(60),
2897            ..Default::default()
2898        };
2899        let mut engine = CorrelationEngine::new(config);
2900        engine.add_collection(&collection).unwrap();
2901
2902        let ts = 1000;
2903
2904        // Alice hits threshold
2905        let ev_a = json!({"EventType": "login", "User": "alice"});
2906        engine.process_event_at(&Event::from_value(&ev_a), ts);
2907        engine.process_event_at(&Event::from_value(&ev_a), ts + 1);
2908        let r = engine.process_event_at(&Event::from_value(&ev_a), ts + 2);
2909        assert_eq!(r.correlations.len(), 1, "alice should fire");
2910
2911        // Bob hits threshold — different group key, not suppressed
2912        let ev_b = json!({"EventType": "login", "User": "bob"});
2913        engine.process_event_at(&Event::from_value(&ev_b), ts + 3);
2914        engine.process_event_at(&Event::from_value(&ev_b), ts + 4);
2915        let r = engine.process_event_at(&Event::from_value(&ev_b), ts + 5);
2916        assert_eq!(r.correlations.len(), 1, "bob should fire independently");
2917
2918        // Alice is still suppressed
2919        let r = engine.process_event_at(&Event::from_value(&ev_a), ts + 6);
2920        assert!(r.correlations.is_empty(), "alice still suppressed");
2921    }
2922
2923    // =========================================================================
2924    // Action on match: Reset
2925    // =========================================================================
2926
2927    #[test]
2928    fn test_action_reset() {
2929        let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
2930        let config = CorrelationConfig {
2931            action_on_match: CorrelationAction::Reset,
2932            ..Default::default()
2933        };
2934        let mut engine = CorrelationEngine::new(config);
2935        engine.add_collection(&collection).unwrap();
2936
2937        let ev = json!({"EventType": "login", "User": "alice"});
2938        let ts = 1000;
2939
2940        // Hit threshold: 3 events
2941        engine.process_event_at(&Event::from_value(&ev), ts);
2942        engine.process_event_at(&Event::from_value(&ev), ts + 1);
2943        let r3 = engine.process_event_at(&Event::from_value(&ev), ts + 2);
2944        assert_eq!(r3.correlations.len(), 1, "should fire on 3rd event");
2945
2946        // State was reset, so 4th and 5th events should NOT fire
2947        let r4 = engine.process_event_at(&Event::from_value(&ev), ts + 3);
2948        assert!(r4.correlations.is_empty(), "reset: need 3 more events");
2949
2950        let r5 = engine.process_event_at(&Event::from_value(&ev), ts + 4);
2951        assert!(r5.correlations.is_empty(), "reset: still only 2");
2952
2953        // 6th event (3rd after reset) should fire again
2954        let r6 = engine.process_event_at(&Event::from_value(&ev), ts + 5);
2955        assert_eq!(
2956            r6.correlations.len(),
2957            1,
2958            "should fire again after 3 events post-reset"
2959        );
2960    }
2961
2962    // =========================================================================
2963    // Generate flag / emit_detections
2964    // =========================================================================
2965
2966    #[test]
2967    fn test_emit_detections_true_by_default() {
2968        let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
2969        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2970        engine.add_collection(&collection).unwrap();
2971
2972        let ev = json!({"EventType": "login", "User": "alice"});
2973        let r = engine.process_event_at(&Event::from_value(&ev), 1000);
2974        assert_eq!(r.detections.len(), 1, "by default detections are emitted");
2975    }
2976
2977    #[test]
2978    fn test_emit_detections_false_suppresses() {
2979        let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
2980        let config = CorrelationConfig {
2981            emit_detections: false,
2982            ..Default::default()
2983        };
2984        let mut engine = CorrelationEngine::new(config);
2985        engine.add_collection(&collection).unwrap();
2986
2987        let ev = json!({"EventType": "login", "User": "alice"});
2988        let r = engine.process_event_at(&Event::from_value(&ev), 1000);
2989        assert!(
2990            r.detections.is_empty(),
2991            "detection matches should be suppressed when emit_detections=false"
2992        );
2993    }
2994
2995    #[test]
2996    fn test_generate_true_keeps_detections() {
2997        // When generate: true, detections should be emitted even with emit_detections=false
2998        let yaml = r#"
2999title: Login
3000id: login-gen
3001logsource:
3002    category: auth
3003detection:
3004    selection:
3005        EventType: login
3006    condition: selection
3007---
3008title: Many Logins
3009correlation:
3010    type: event_count
3011    rules:
3012        - login-gen
3013    group-by:
3014        - User
3015    timeframe: 60s
3016    condition:
3017        gte: 3
3018    generate: true
3019level: high
3020"#;
3021        let collection = parse_sigma_yaml(yaml).unwrap();
3022        let config = CorrelationConfig {
3023            emit_detections: false,
3024            ..Default::default()
3025        };
3026        let mut engine = CorrelationEngine::new(config);
3027        engine.add_collection(&collection).unwrap();
3028
3029        let ev = json!({"EventType": "login", "User": "alice"});
3030        let r = engine.process_event_at(&Event::from_value(&ev), 1000);
3031        // generate: true means this rule is NOT correlation-only
3032        assert_eq!(
3033            r.detections.len(),
3034            1,
3035            "generate:true keeps detection output"
3036        );
3037    }
3038
3039    // =========================================================================
3040    // Suppression + Reset combined
3041    // =========================================================================
3042
3043    #[test]
3044    fn test_suppress_and_reset_combined() {
3045        let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
3046        let config = CorrelationConfig {
3047            suppress: Some(5),
3048            action_on_match: CorrelationAction::Reset,
3049            ..Default::default()
3050        };
3051        let mut engine = CorrelationEngine::new(config);
3052        engine.add_collection(&collection).unwrap();
3053
3054        let ev = json!({"EventType": "login", "User": "alice"});
3055        let ts = 1000;
3056
3057        // Hit threshold: fires and resets
3058        engine.process_event_at(&Event::from_value(&ev), ts);
3059        engine.process_event_at(&Event::from_value(&ev), ts + 1);
3060        let r3 = engine.process_event_at(&Event::from_value(&ev), ts + 2);
3061        assert_eq!(r3.correlations.len(), 1, "fires on 3rd event");
3062
3063        // Push 3 more events quickly (state was reset, so new count → 3)
3064        // but suppress window hasn't expired (ts+2 + 5 = ts+7)
3065        engine.process_event_at(&Event::from_value(&ev), ts + 3);
3066        engine.process_event_at(&Event::from_value(&ev), ts + 4);
3067        let r = engine.process_event_at(&Event::from_value(&ev), ts + 5);
3068        assert!(
3069            r.correlations.is_empty(),
3070            "threshold met again but still suppressed"
3071        );
3072
3073        // After suppress expires (at ts+8, which is ts+2+6 > suppress=5),
3074        // the accumulated events from step 2 (ts+3,4,5) still satisfy gte:3,
3075        // so the first event after expiry fires immediately and resets.
3076        let r = engine.process_event_at(&Event::from_value(&ev), ts + 8);
3077        assert_eq!(
3078            r.correlations.len(),
3079            1,
3080            "fires after suppress expires (accumulated events + new one)"
3081        );
3082
3083        // State was reset again at ts+8, suppress window now ts+8..ts+13.
3084        // Need 3 new events to fire, and suppress must expire.
3085        engine.process_event_at(&Event::from_value(&ev), ts + 9);
3086        engine.process_event_at(&Event::from_value(&ev), ts + 10);
3087        let r = engine.process_event_at(&Event::from_value(&ev), ts + 11);
3088        assert!(
3089            r.correlations.is_empty(),
3090            "threshold met but suppress window hasn't expired (ts+11 - ts+8 = 3 < 5)"
3091        );
3092    }
3093
3094    // =========================================================================
3095    // No suppression (default behavior preserved)
3096    // =========================================================================
3097
3098    #[test]
3099    fn test_no_suppression_fires_every_event() {
3100        let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
3101        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3102        engine.add_collection(&collection).unwrap();
3103
3104        let ev = json!({"EventType": "login", "User": "alice"});
3105        let ts = 1000;
3106
3107        engine.process_event_at(&Event::from_value(&ev), ts);
3108        engine.process_event_at(&Event::from_value(&ev), ts + 1);
3109        let r3 = engine.process_event_at(&Event::from_value(&ev), ts + 2);
3110        assert_eq!(r3.correlations.len(), 1);
3111
3112        // Without suppression, 4th event should also fire
3113        let r4 = engine.process_event_at(&Event::from_value(&ev), ts + 3);
3114        assert_eq!(
3115            r4.correlations.len(),
3116            1,
3117            "no suppression: fires on every event after threshold"
3118        );
3119
3120        let r5 = engine.process_event_at(&Event::from_value(&ev), ts + 4);
3121        assert_eq!(r5.correlations.len(), 1, "still fires");
3122    }
3123
3124    // =========================================================================
3125    // Custom attribute → engine config tests
3126    // =========================================================================
3127
3128    #[test]
3129    fn test_custom_attr_timestamp_field() {
3130        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3131        let mut attrs = std::collections::HashMap::new();
3132        attrs.insert("rsigma.timestamp_field".to_string(), "time".to_string());
3133        engine.apply_custom_attributes(&attrs);
3134
3135        assert_eq!(
3136            engine.config.timestamp_fields[0], "time",
3137            "rsigma.timestamp_field should be prepended"
3138        );
3139        // Defaults should still be there after the custom one
3140        assert!(
3141            engine
3142                .config
3143                .timestamp_fields
3144                .contains(&"@timestamp".to_string())
3145        );
3146    }
3147
3148    #[test]
3149    fn test_custom_attr_timestamp_field_no_duplicates() {
3150        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3151        let mut attrs = std::collections::HashMap::new();
3152        attrs.insert("rsigma.timestamp_field".to_string(), "time".to_string());
3153        // Apply twice — should not duplicate
3154        engine.apply_custom_attributes(&attrs);
3155        engine.apply_custom_attributes(&attrs);
3156
3157        let count = engine
3158            .config
3159            .timestamp_fields
3160            .iter()
3161            .filter(|f| *f == "time")
3162            .count();
3163        assert_eq!(count, 1, "should not duplicate timestamp_field entries");
3164    }
3165
3166    #[test]
3167    fn test_custom_attr_suppress() {
3168        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3169        assert!(engine.config.suppress.is_none());
3170
3171        let mut attrs = std::collections::HashMap::new();
3172        attrs.insert("rsigma.suppress".to_string(), "5m".to_string());
3173        engine.apply_custom_attributes(&attrs);
3174
3175        assert_eq!(engine.config.suppress, Some(300));
3176    }
3177
3178    #[test]
3179    fn test_custom_attr_suppress_does_not_override_cli() {
3180        let config = CorrelationConfig {
3181            suppress: Some(60), // CLI set to 60s
3182            ..Default::default()
3183        };
3184        let mut engine = CorrelationEngine::new(config);
3185
3186        let mut attrs = std::collections::HashMap::new();
3187        attrs.insert("rsigma.suppress".to_string(), "5m".to_string());
3188        engine.apply_custom_attributes(&attrs);
3189
3190        assert_eq!(
3191            engine.config.suppress,
3192            Some(60),
3193            "CLI suppress should not be overridden by custom attribute"
3194        );
3195    }
3196
3197    #[test]
3198    fn test_custom_attr_action() {
3199        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3200        assert_eq!(engine.config.action_on_match, CorrelationAction::Alert);
3201
3202        let mut attrs = std::collections::HashMap::new();
3203        attrs.insert("rsigma.action".to_string(), "reset".to_string());
3204        engine.apply_custom_attributes(&attrs);
3205
3206        assert_eq!(engine.config.action_on_match, CorrelationAction::Reset);
3207    }
3208
3209    #[test]
3210    fn test_custom_attr_action_does_not_override_cli() {
3211        let config = CorrelationConfig {
3212            action_on_match: CorrelationAction::Reset, // CLI set to reset
3213            ..Default::default()
3214        };
3215        let mut engine = CorrelationEngine::new(config);
3216
3217        let mut attrs = std::collections::HashMap::new();
3218        attrs.insert("rsigma.action".to_string(), "alert".to_string());
3219        engine.apply_custom_attributes(&attrs);
3220
3221        assert_eq!(
3222            engine.config.action_on_match,
3223            CorrelationAction::Reset,
3224            "CLI action should not be overridden by custom attribute"
3225        );
3226    }
3227
3228    #[test]
3229    fn test_custom_attr_timestamp_field_used_for_extraction() {
3230        // The event has "time" but not "@timestamp" or "timestamp"
3231        let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
3232        let mut config = CorrelationConfig::default();
3233        // Prepend "event_time" to simulate --timestamp-field
3234        config.timestamp_fields.insert(0, "event_time".to_string());
3235        let mut engine = CorrelationEngine::new(config);
3236        engine.add_collection(&collection).unwrap();
3237
3238        // Event with "event_time" field
3239        let ev = json!({
3240            "EventType": "login",
3241            "User": "alice",
3242            "event_time": "2026-02-11T12:00:00Z"
3243        });
3244        let result = engine.process_event(&Event::from_value(&ev));
3245
3246        // The detection should match, and timestamp should be ~1739275200 (2026-02-11)
3247        assert!(!result.detections.is_empty() || result.correlations.is_empty());
3248        // The key test: ensure the engine extracted the event timestamp, not Utc::now.
3249        // If it used Utc::now, the test would still pass but the timestamp would be
3250        // wildly different. We verify by checking the extracted value directly.
3251        let ts = engine
3252            .extract_event_timestamp(&Event::from_value(&ev))
3253            .expect("should extract timestamp");
3254        assert!(
3255            ts > 1_700_000_000 && ts < 1_800_000_000,
3256            "timestamp should be ~2026 epoch, got {ts}"
3257        );
3258    }
3259
3260    // =========================================================================
3261    // Cycle detection
3262    // =========================================================================
3263
3264    #[test]
3265    fn test_correlation_cycle_direct() {
3266        // Two correlations that reference each other: A -> B -> A
3267        let yaml = r#"
3268title: detection rule
3269id: det-rule
3270logsource:
3271    product: test
3272detection:
3273    selection:
3274        action: click
3275    condition: selection
3276level: low
3277---
3278title: correlation A
3279id: corr-a
3280correlation:
3281    type: event_count
3282    rules:
3283        - corr-b
3284    group-by:
3285        - User
3286    timespan: 5m
3287    condition:
3288        gte: 2
3289level: high
3290---
3291title: correlation B
3292id: corr-b
3293correlation:
3294    type: event_count
3295    rules:
3296        - corr-a
3297    group-by:
3298        - User
3299    timespan: 5m
3300    condition:
3301        gte: 2
3302level: high
3303"#;
3304        let collection = parse_sigma_yaml(yaml).unwrap();
3305        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3306        let result = engine.add_collection(&collection);
3307        assert!(result.is_err(), "should detect direct cycle");
3308        let err = result.unwrap_err().to_string();
3309        assert!(err.contains("cycle"), "error should mention cycle: {err}");
3310        assert!(
3311            err.contains("corr-a") && err.contains("corr-b"),
3312            "error should name both correlations: {err}"
3313        );
3314    }
3315
3316    #[test]
3317    fn test_correlation_cycle_self() {
3318        // A correlation that references itself
3319        let yaml = r#"
3320title: detection rule
3321id: det-rule
3322logsource:
3323    product: test
3324detection:
3325    selection:
3326        action: click
3327    condition: selection
3328level: low
3329---
3330title: self-ref correlation
3331id: self-corr
3332correlation:
3333    type: event_count
3334    rules:
3335        - self-corr
3336    group-by:
3337        - User
3338    timespan: 5m
3339    condition:
3340        gte: 2
3341level: high
3342"#;
3343        let collection = parse_sigma_yaml(yaml).unwrap();
3344        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3345        let result = engine.add_collection(&collection);
3346        assert!(result.is_err(), "should detect self-referencing cycle");
3347        let err = result.unwrap_err().to_string();
3348        assert!(err.contains("cycle"), "error should mention cycle: {err}");
3349        assert!(
3350            err.contains("self-corr"),
3351            "error should name the correlation: {err}"
3352        );
3353    }
3354
3355    #[test]
3356    fn test_correlation_no_cycle_valid_chain() {
3357        // Valid chain: detection -> corr-A -> corr-B (no cycle)
3358        let yaml = r#"
3359title: detection rule
3360id: det-rule
3361logsource:
3362    product: test
3363detection:
3364    selection:
3365        action: click
3366    condition: selection
3367level: low
3368---
3369title: correlation A
3370id: corr-a
3371correlation:
3372    type: event_count
3373    rules:
3374        - det-rule
3375    group-by:
3376        - User
3377    timespan: 5m
3378    condition:
3379        gte: 2
3380level: high
3381---
3382title: correlation B
3383id: corr-b
3384correlation:
3385    type: event_count
3386    rules:
3387        - corr-a
3388    group-by:
3389        - User
3390    timespan: 5m
3391    condition:
3392        gte: 2
3393level: high
3394"#;
3395        let collection = parse_sigma_yaml(yaml).unwrap();
3396        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3397        let result = engine.add_collection(&collection);
3398        assert!(
3399            result.is_ok(),
3400            "valid chain should not be rejected: {result:?}"
3401        );
3402    }
3403
3404    #[test]
3405    fn test_correlation_cycle_transitive() {
3406        // Transitive cycle: A -> B -> C -> A
3407        let yaml = r#"
3408title: detection rule
3409id: det-rule
3410logsource:
3411    product: test
3412detection:
3413    selection:
3414        action: click
3415    condition: selection
3416level: low
3417---
3418title: correlation A
3419id: corr-a
3420correlation:
3421    type: event_count
3422    rules:
3423        - corr-c
3424    group-by:
3425        - User
3426    timespan: 5m
3427    condition:
3428        gte: 2
3429level: high
3430---
3431title: correlation B
3432id: corr-b
3433correlation:
3434    type: event_count
3435    rules:
3436        - corr-a
3437    group-by:
3438        - User
3439    timespan: 5m
3440    condition:
3441        gte: 2
3442level: high
3443---
3444title: correlation C
3445id: corr-c
3446correlation:
3447    type: event_count
3448    rules:
3449        - corr-b
3450    group-by:
3451        - User
3452    timespan: 5m
3453    condition:
3454        gte: 2
3455level: high
3456"#;
3457        let collection = parse_sigma_yaml(yaml).unwrap();
3458        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3459        let result = engine.add_collection(&collection);
3460        assert!(result.is_err(), "should detect transitive cycle");
3461        let err = result.unwrap_err().to_string();
3462        assert!(err.contains("cycle"), "error should mention cycle: {err}");
3463    }
3464
3465    // =========================================================================
3466    // Correlation event inclusion tests
3467    // =========================================================================
3468
3469    #[test]
3470    fn test_correlation_events_disabled_by_default() {
3471        let yaml = r#"
3472title: Login
3473id: login-rule
3474logsource:
3475    category: auth
3476detection:
3477    selection:
3478        EventType: login
3479    condition: selection
3480---
3481title: Many Logins
3482correlation:
3483    type: event_count
3484    rules:
3485        - login-rule
3486    group-by:
3487        - User
3488    timespan: 60s
3489    condition:
3490        gte: 3
3491level: high
3492"#;
3493        let collection = parse_sigma_yaml(yaml).unwrap();
3494        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3495        engine.add_collection(&collection).unwrap();
3496
3497        for i in 0..3 {
3498            let v = json!({"EventType": "login", "User": "admin", "@timestamp": 1000 + i});
3499            let event = Event::from_value(&v);
3500            let result = engine.process_event_at(&event, 1000 + i);
3501            if i == 2 {
3502                assert_eq!(result.correlations.len(), 1);
3503                // Events should NOT be included by default
3504                assert!(result.correlations[0].events.is_none());
3505            }
3506        }
3507    }
3508
3509    #[test]
3510    fn test_correlation_events_included_when_enabled() {
3511        let yaml = r#"
3512title: Login
3513id: login-rule
3514logsource:
3515    category: auth
3516detection:
3517    selection:
3518        EventType: login
3519    condition: selection
3520---
3521title: Many Logins
3522correlation:
3523    type: event_count
3524    rules:
3525        - login-rule
3526    group-by:
3527        - User
3528    timespan: 60s
3529    condition:
3530        gte: 3
3531level: high
3532"#;
3533        let collection = parse_sigma_yaml(yaml).unwrap();
3534        let config = CorrelationConfig {
3535            correlation_event_mode: CorrelationEventMode::Full,
3536            max_correlation_events: 10,
3537            ..Default::default()
3538        };
3539        let mut engine = CorrelationEngine::new(config);
3540        engine.add_collection(&collection).unwrap();
3541
3542        let events_sent: Vec<serde_json::Value> = (0..3)
3543            .map(|i| json!({"EventType": "login", "User": "admin", "@timestamp": 1000 + i}))
3544            .collect();
3545
3546        let mut corr_result = None;
3547        for (i, ev) in events_sent.iter().enumerate() {
3548            let event = Event::from_value(ev);
3549            let result = engine.process_event_at(&event, 1000 + i as i64);
3550            if !result.correlations.is_empty() {
3551                corr_result = Some(result);
3552            }
3553        }
3554
3555        let result = corr_result.expect("correlation should have fired");
3556        let corr = &result.correlations[0];
3557
3558        // Events should be included
3559        let events = corr.events.as_ref().expect("events should be present");
3560        assert_eq!(
3561            events.len(),
3562            3,
3563            "all 3 contributing events should be stored"
3564        );
3565
3566        // Verify all sent events are present
3567        for (i, event) in events.iter().enumerate() {
3568            assert_eq!(event["EventType"], "login");
3569            assert_eq!(event["User"], "admin");
3570            assert_eq!(event["@timestamp"], 1000 + i as i64);
3571        }
3572    }
3573
3574    #[test]
3575    fn test_correlation_events_max_cap() {
3576        let yaml = r#"
3577title: Login
3578id: login-rule
3579logsource:
3580    category: auth
3581detection:
3582    selection:
3583        EventType: login
3584    condition: selection
3585---
3586title: Many Logins
3587correlation:
3588    type: event_count
3589    rules:
3590        - login-rule
3591    group-by:
3592        - User
3593    timespan: 60s
3594    condition:
3595        gte: 5
3596level: high
3597"#;
3598        let collection = parse_sigma_yaml(yaml).unwrap();
3599        let config = CorrelationConfig {
3600            correlation_event_mode: CorrelationEventMode::Full,
3601            max_correlation_events: 3, // only keep last 3
3602            ..Default::default()
3603        };
3604        let mut engine = CorrelationEngine::new(config);
3605        engine.add_collection(&collection).unwrap();
3606
3607        let mut corr_result = None;
3608        for i in 0..5 {
3609            let v = json!({"EventType": "login", "User": "admin", "idx": i});
3610            let event = Event::from_value(&v);
3611            let result = engine.process_event_at(&event, 1000 + i);
3612            if !result.correlations.is_empty() {
3613                corr_result = Some(result);
3614            }
3615        }
3616
3617        let result = corr_result.expect("correlation should have fired");
3618        let events = result.correlations[0]
3619            .events
3620            .as_ref()
3621            .expect("events should be present");
3622
3623        // Only the last 3 events should be retained (cap = 3)
3624        assert_eq!(events.len(), 3);
3625        assert_eq!(events[0]["idx"], 2);
3626        assert_eq!(events[1]["idx"], 3);
3627        assert_eq!(events[2]["idx"], 4);
3628    }
3629
3630    #[test]
3631    fn test_correlation_events_with_reset_action() {
3632        let yaml = r#"
3633title: Login
3634id: login-rule
3635logsource:
3636    category: auth
3637detection:
3638    selection:
3639        EventType: login
3640    condition: selection
3641---
3642title: Many Logins
3643correlation:
3644    type: event_count
3645    rules:
3646        - login-rule
3647    group-by:
3648        - User
3649    timespan: 60s
3650    condition:
3651        gte: 2
3652level: high
3653"#;
3654        let collection = parse_sigma_yaml(yaml).unwrap();
3655        let config = CorrelationConfig {
3656            correlation_event_mode: CorrelationEventMode::Full,
3657            action_on_match: CorrelationAction::Reset,
3658            ..Default::default()
3659        };
3660        let mut engine = CorrelationEngine::new(config);
3661        engine.add_collection(&collection).unwrap();
3662
3663        // First round: 2 events -> fires
3664        for i in 0..2 {
3665            let v = json!({"EventType": "login", "User": "admin", "round": 1, "idx": i});
3666            let event = Event::from_value(&v);
3667            let result = engine.process_event_at(&event, 1000 + i);
3668            if i == 1 {
3669                assert_eq!(result.correlations.len(), 1);
3670                let events = result.correlations[0].events.as_ref().unwrap();
3671                assert_eq!(events.len(), 2);
3672            }
3673        }
3674
3675        // After reset, event buffer should be cleared.
3676        // Second round: need 2 more events to fire again
3677        let v = json!({"EventType": "login", "User": "admin", "round": 2, "idx": 0});
3678        let event = Event::from_value(&v);
3679        let result = engine.process_event_at(&event, 1010);
3680        assert!(
3681            result.correlations.is_empty(),
3682            "should not fire with only 1 event after reset"
3683        );
3684
3685        let v = json!({"EventType": "login", "User": "admin", "round": 2, "idx": 1});
3686        let event = Event::from_value(&v);
3687        let result = engine.process_event_at(&event, 1011);
3688        assert_eq!(result.correlations.len(), 1);
3689        let events = result.correlations[0].events.as_ref().unwrap();
3690        assert_eq!(events.len(), 2);
3691        // Should only have round 2 events
3692        assert_eq!(events[0]["round"], 2);
3693        assert_eq!(events[1]["round"], 2);
3694    }
3695
3696    #[test]
3697    fn test_correlation_events_with_set_include() {
3698        let yaml = r#"
3699title: Login
3700id: login-rule
3701logsource:
3702    category: auth
3703detection:
3704    selection:
3705        EventType: login
3706    condition: selection
3707---
3708title: Many Logins
3709correlation:
3710    type: event_count
3711    rules:
3712        - login-rule
3713    group-by:
3714        - User
3715    timespan: 60s
3716    condition:
3717        gte: 2
3718level: high
3719"#;
3720        let collection = parse_sigma_yaml(yaml).unwrap();
3721        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3722        engine.add_collection(&collection).unwrap();
3723
3724        // Enable via setter
3725        engine.set_correlation_event_mode(CorrelationEventMode::Full);
3726
3727        for i in 0..2 {
3728            let v = json!({"EventType": "login", "User": "admin"});
3729            let event = Event::from_value(&v);
3730            let result = engine.process_event_at(&event, 1000 + i);
3731            if i == 1 {
3732                assert_eq!(result.correlations.len(), 1);
3733                assert!(result.correlations[0].events.is_some());
3734                assert_eq!(result.correlations[0].events.as_ref().unwrap().len(), 2);
3735            }
3736        }
3737    }
3738
3739    #[test]
3740    fn test_correlation_events_eviction_syncs_with_window() {
3741        let yaml = r#"
3742title: Login
3743id: login-rule
3744logsource:
3745    category: auth
3746detection:
3747    selection:
3748        EventType: login
3749    condition: selection
3750---
3751title: Many Logins
3752correlation:
3753    type: event_count
3754    rules:
3755        - login-rule
3756    group-by:
3757        - User
3758    timespan: 10s
3759    condition:
3760        gte: 3
3761level: high
3762"#;
3763        let collection = parse_sigma_yaml(yaml).unwrap();
3764        let config = CorrelationConfig {
3765            correlation_event_mode: CorrelationEventMode::Full,
3766            max_correlation_events: 100,
3767            ..Default::default()
3768        };
3769        let mut engine = CorrelationEngine::new(config);
3770        engine.add_collection(&collection).unwrap();
3771
3772        // Push 2 events at ts=1000,1001 — within the 10s window
3773        for i in 0..2 {
3774            let v = json!({"EventType": "login", "User": "admin", "idx": i});
3775            let event = Event::from_value(&v);
3776            engine.process_event_at(&event, 1000 + i);
3777        }
3778
3779        // Push 1 more event at ts=1015 — the first 2 events are now outside the
3780        // 10s window (cutoff = 1015 - 10 = 1005)
3781        let v = json!({"EventType": "login", "User": "admin", "idx": 2});
3782        let event = Event::from_value(&v);
3783        let result = engine.process_event_at(&event, 1015);
3784        // Should NOT fire: only 1 event in window (the one at ts=1015)
3785        assert!(
3786            result.correlations.is_empty(),
3787            "should not fire — old events evicted"
3788        );
3789
3790        // Push 2 more to reach threshold
3791        for i in 3..5 {
3792            let v = json!({"EventType": "login", "User": "admin", "idx": i});
3793            let event = Event::from_value(&v);
3794            let result = engine.process_event_at(&event, 1016 + i - 3);
3795            if i == 4 {
3796                assert_eq!(result.correlations.len(), 1);
3797                let events = result.correlations[0].events.as_ref().unwrap();
3798                // Should have events from ts=1015,1016,1017 — not the old ones
3799                assert_eq!(events.len(), 3);
3800                for ev in events {
3801                    assert!(ev["idx"].as_i64().unwrap() >= 2);
3802                }
3803            }
3804        }
3805    }
3806
3807    #[test]
3808    fn test_event_buffer_monitoring() {
3809        let yaml = r#"
3810title: Login
3811id: login-rule
3812logsource:
3813    category: auth
3814detection:
3815    selection:
3816        EventType: login
3817    condition: selection
3818---
3819title: Many Logins
3820correlation:
3821    type: event_count
3822    rules:
3823        - login-rule
3824    group-by:
3825        - User
3826    timespan: 60s
3827    condition:
3828        gte: 100
3829level: high
3830"#;
3831        let collection = parse_sigma_yaml(yaml).unwrap();
3832        let config = CorrelationConfig {
3833            correlation_event_mode: CorrelationEventMode::Full,
3834            ..Default::default()
3835        };
3836        let mut engine = CorrelationEngine::new(config);
3837        engine.add_collection(&collection).unwrap();
3838
3839        assert_eq!(engine.event_buffer_count(), 0);
3840        assert_eq!(engine.event_buffer_bytes(), 0);
3841
3842        // Push some events
3843        for i in 0..5 {
3844            let v = json!({"EventType": "login", "User": "admin"});
3845            let event = Event::from_value(&v);
3846            engine.process_event_at(&event, 1000 + i);
3847        }
3848
3849        assert_eq!(engine.event_buffer_count(), 1); // one group key
3850        assert!(engine.event_buffer_bytes() > 0);
3851    }
3852
3853    #[test]
3854    fn test_correlation_refs_mode_basic() {
3855        let yaml = r#"
3856title: Login
3857id: login-rule
3858logsource:
3859    category: auth
3860detection:
3861    selection:
3862        EventType: login
3863    condition: selection
3864---
3865title: Many Logins
3866correlation:
3867    type: event_count
3868    rules:
3869        - login-rule
3870    group-by:
3871        - User
3872    timespan: 60s
3873    condition:
3874        gte: 3
3875level: high
3876"#;
3877        let collection = parse_sigma_yaml(yaml).unwrap();
3878        let config = CorrelationConfig {
3879            correlation_event_mode: CorrelationEventMode::Refs,
3880            max_correlation_events: 10,
3881            ..Default::default()
3882        };
3883        let mut engine = CorrelationEngine::new(config);
3884        engine.add_collection(&collection).unwrap();
3885
3886        let mut corr_result = None;
3887        for i in 0..3 {
3888            let v = json!({"EventType": "login", "User": "admin", "id": format!("evt-{i}"), "@timestamp": 1000 + i});
3889            let event = Event::from_value(&v);
3890            let result = engine.process_event_at(&event, 1000 + i);
3891            if !result.correlations.is_empty() {
3892                corr_result = Some(result.correlations[0].clone());
3893            }
3894        }
3895
3896        let result = corr_result.expect("correlation should have fired");
3897        // In refs mode: events should be None, event_refs should be Some
3898        assert!(
3899            result.events.is_none(),
3900            "Full events should not be included in refs mode"
3901        );
3902        let refs = result
3903            .event_refs
3904            .expect("event_refs should be present in refs mode");
3905        assert_eq!(refs.len(), 3);
3906        assert_eq!(refs[0].timestamp, 1000);
3907        assert_eq!(refs[0].id, Some("evt-0".to_string()));
3908        assert_eq!(refs[1].id, Some("evt-1".to_string()));
3909        assert_eq!(refs[2].id, Some("evt-2".to_string()));
3910    }
3911
3912    #[test]
3913    fn test_correlation_refs_mode_no_id_field() {
3914        let yaml = r#"
3915title: Login
3916id: login-rule
3917logsource:
3918    category: auth
3919detection:
3920    selection:
3921        EventType: login
3922    condition: selection
3923---
3924title: Many Logins
3925correlation:
3926    type: event_count
3927    rules:
3928        - login-rule
3929    group-by:
3930        - User
3931    timespan: 60s
3932    condition:
3933        gte: 2
3934level: high
3935"#;
3936        let collection = parse_sigma_yaml(yaml).unwrap();
3937        let config = CorrelationConfig {
3938            correlation_event_mode: CorrelationEventMode::Refs,
3939            ..Default::default()
3940        };
3941        let mut engine = CorrelationEngine::new(config);
3942        engine.add_collection(&collection).unwrap();
3943
3944        let mut corr_result = None;
3945        for i in 0..2 {
3946            let v = json!({"EventType": "login", "User": "admin"});
3947            let event = Event::from_value(&v);
3948            let result = engine.process_event_at(&event, 1000 + i);
3949            if !result.correlations.is_empty() {
3950                corr_result = Some(result.correlations[0].clone());
3951            }
3952        }
3953
3954        let result = corr_result.expect("correlation should have fired");
3955        let refs = result.event_refs.expect("event_refs should be present");
3956        // No ID field in events → id should be None
3957        for r in &refs {
3958            assert_eq!(r.id, None);
3959        }
3960    }
3961
3962    #[test]
3963    fn test_per_correlation_custom_attributes_from_yaml() {
3964        let yaml = r#"
3965title: Login
3966id: login-rule
3967logsource:
3968    category: auth
3969detection:
3970    selection:
3971        EventType: login
3972    condition: selection
3973---
3974title: Many Logins
3975custom_attributes:
3976    rsigma.correlation_event_mode: refs
3977    rsigma.max_correlation_events: "5"
3978correlation:
3979    type: event_count
3980    rules:
3981        - login-rule
3982    group-by:
3983        - User
3984    timespan: 60s
3985    condition:
3986        gte: 3
3987level: high
3988"#;
3989        let collection = parse_sigma_yaml(yaml).unwrap();
3990        // Engine mode is None (default), but per-correlation should override to Refs
3991        let config = CorrelationConfig::default();
3992        let mut engine = CorrelationEngine::new(config);
3993        engine.add_collection(&collection).unwrap();
3994
3995        let mut corr_result = None;
3996        for i in 0..3 {
3997            let v = json!({"EventType": "login", "User": "admin", "id": format!("e{i}")});
3998            let event = Event::from_value(&v);
3999            let result = engine.process_event_at(&event, 1000 + i);
4000            if !result.correlations.is_empty() {
4001                corr_result = Some(result.correlations[0].clone());
4002            }
4003        }
4004
4005        let result = corr_result.expect("correlation should fire with per-correlation refs mode");
4006        // Per-correlation override should enable refs mode even though engine default is None
4007        assert!(result.events.is_none());
4008        let refs = result
4009            .event_refs
4010            .expect("event_refs via per-correlation override");
4011        assert_eq!(refs.len(), 3);
4012        assert_eq!(refs[0].id, Some("e0".to_string()));
4013    }
4014
4015    #[test]
4016    fn test_per_correlation_custom_attr_suppress_and_action() {
4017        let yaml = r#"
4018title: Login
4019id: login-rule
4020logsource:
4021    category: auth
4022detection:
4023    selection:
4024        EventType: login
4025    condition: selection
4026---
4027title: Many Logins
4028custom_attributes:
4029    rsigma.suppress: 10s
4030    rsigma.action: reset
4031correlation:
4032    type: event_count
4033    rules:
4034        - login-rule
4035    group-by:
4036        - User
4037    timespan: 60s
4038    condition:
4039        gte: 2
4040level: high
4041"#;
4042        let collection = parse_sigma_yaml(yaml).unwrap();
4043        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
4044        engine.add_collection(&collection).unwrap();
4045
4046        // Verify the compiled correlation has per-rule overrides
4047        assert_eq!(engine.correlations[0].suppress_secs, Some(10));
4048        assert_eq!(
4049            engine.correlations[0].action,
4050            Some(CorrelationAction::Reset)
4051        );
4052    }
4053}