Skip to main content

rsigma_eval/
correlation_engine.rs

//! Stateful correlation engine with time-windowed aggregation.
//!
//! `CorrelationEngine` wraps the stateless `Engine` and adds support for
//! Sigma correlation rules: `event_count`, `value_count`, `temporal`,
//! `temporal_ordered`, `value_sum`, `value_avg`, `value_percentile`,
//! and `value_median`.
//!
//! # Architecture
//!
//! 1. Events are first evaluated against detection rules (stateless)
//! 2. Detection matches update correlation window state (stateful)
//! 3. When a correlation condition is met, a `CorrelationResult` is emitted
//! 4. Correlation results can chain into higher-level correlations
14
15use std::collections::HashMap;
16
17use chrono::{DateTime, TimeZone, Utc};
18use serde::Serialize;
19
20use rsigma_parser::{CorrelationRule, CorrelationType, Level, SigmaCollection, SigmaRule};
21
22use crate::correlation::{
23    CompiledCorrelation, EventBuffer, EventRef, EventRefBuffer, GroupKey, WindowState,
24    compile_correlation,
25};
26use crate::engine::Engine;
27use crate::error::Result;
28use crate::event::Event;
29use crate::pipeline::{Pipeline, apply_pipelines};
30use crate::result::MatchResult;
31
// =============================================================================
// Configuration
// =============================================================================
35
36/// What to do with window state after a correlation fires.
37///
38/// This is an engine-level default that can be overridden per-correlation
39/// via the `rsigma.action` custom attribute set in processing pipelines.
40#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize)]
41#[serde(rename_all = "snake_case")]
42pub enum CorrelationAction {
43    /// Keep window state as-is after firing (current / default behavior).
44    /// Subsequent events that still satisfy the condition will re-fire.
45    #[default]
46    Alert,
47    /// Clear the window state for the firing group key after emitting the alert.
48    /// The threshold must be met again from scratch before the next alert.
49    Reset,
50}
51
52impl std::str::FromStr for CorrelationAction {
53    type Err = String;
54    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
55        match s {
56            "alert" => Ok(CorrelationAction::Alert),
57            "reset" => Ok(CorrelationAction::Reset),
58            _ => Err(format!(
59                "Unknown correlation action: {s} (expected 'alert' or 'reset')"
60            )),
61        }
62    }
63}
64
65/// How to include events in correlation results.
66///
67/// Can be overridden per-correlation via the `rsigma.correlation_event_mode`
68/// custom attribute.
69#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize)]
70#[serde(rename_all = "snake_case")]
71pub enum CorrelationEventMode {
72    /// Don't include events (default). Zero memory overhead.
73    #[default]
74    None,
75    /// Include full event bodies, individually compressed with deflate.
76    /// Typical cost: 100–1000 bytes per event.
77    Full,
78    /// Include only event references (timestamp + optional ID).
79    /// Minimal memory: ~40 bytes per event.
80    Refs,
81}
82
83impl std::str::FromStr for CorrelationEventMode {
84    type Err = String;
85    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
86        match s.to_lowercase().as_str() {
87            "none" | "off" | "false" => Ok(CorrelationEventMode::None),
88            "full" | "true" => Ok(CorrelationEventMode::Full),
89            "refs" | "references" => Ok(CorrelationEventMode::Refs),
90            _ => Err(format!(
91                "Unknown correlation event mode: {s} (expected 'none', 'full', or 'refs')"
92            )),
93        }
94    }
95}
96
97impl std::fmt::Display for CorrelationEventMode {
98    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
99        match self {
100            CorrelationEventMode::None => write!(f, "none"),
101            CorrelationEventMode::Full => write!(f, "full"),
102            CorrelationEventMode::Refs => write!(f, "refs"),
103        }
104    }
105}
106
/// What the engine does with an event whose timestamp is missing or cannot
/// be parsed from any configured field.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum TimestampFallback {
    /// Substitute the current wall-clock time (`Utc::now()`). The default;
    /// suited to live streams.
    #[default]
    WallClock,
    /// Leave correlation state alone for this event. Detection rules still
    /// match and are reported. Preferable when replaying historical logs,
    /// where "now" has no relation to the event.
    Skip,
}
118
119/// Configuration for the correlation engine.
120///
121/// Provides engine-level defaults that mirror pySigma backend optional arguments.
122/// Per-correlation overrides can be set via `SetCustomAttribute` pipeline
123/// transformations using the `rsigma.*` attribute namespace.
124#[derive(Debug, Clone)]
125pub struct CorrelationConfig {
126    /// Field names to try for timestamp extraction, in order of priority.
127    ///
128    /// The engine will try each field until one yields a parseable timestamp.
129    /// If none succeed, the `timestamp_fallback` policy applies.
130    pub timestamp_fields: Vec<String>,
131
132    /// What to do when no timestamp can be extracted from an event.
133    ///
134    /// Default: `WallClock` (use `Utc::now()`).
135    pub timestamp_fallback: TimestampFallback,
136
137    /// Maximum number of state entries (across all correlations and groups)
138    /// before aggressive eviction is triggered. Prevents unbounded memory growth.
139    ///
140    /// Default: 100_000.
141    pub max_state_entries: usize,
142
143    /// Default suppression window in seconds.
144    ///
145    /// After a correlation fires for a `(correlation, group_key)`, suppress
146    /// re-alerts for this duration. `None` means no suppression (every
147    /// condition-satisfying event produces an alert).
148    ///
149    /// Can be overridden per-correlation via the `rsigma.suppress` custom attribute.
150    pub suppress: Option<u64>,
151
152    /// Default action to take after a correlation fires.
153    ///
154    /// Can be overridden per-correlation via the `rsigma.action` custom attribute.
155    pub action_on_match: CorrelationAction,
156
157    /// Whether to emit detection-level matches for rules that are only
158    /// referenced by correlations (where `generate: false`).
159    ///
160    /// Default: `true` (emit all detection matches).
161    /// Set to `false` to suppress detection output for correlation-only rules.
162    pub emit_detections: bool,
163
164    /// How to include contributing events in correlation results.
165    ///
166    /// - `None` (default): no event storage, zero overhead.
167    /// - `Full`: events are deflate-compressed and decompressed on output.
168    /// - `Refs`: only timestamps + event IDs are stored (minimal memory).
169    ///
170    /// Can be overridden per-correlation via `rsigma.correlation_event_mode`.
171    pub correlation_event_mode: CorrelationEventMode,
172
173    /// Maximum number of events to store per (correlation, group_key) window
174    /// when `correlation_event_mode` is not `None`.
175    ///
176    /// Bounds memory at: `max_correlation_events × cost_per_event × active_groups`.
177    /// Default: 10.
178    pub max_correlation_events: usize,
179}
180
181impl Default for CorrelationConfig {
182    fn default() -> Self {
183        CorrelationConfig {
184            timestamp_fields: vec![
185                "@timestamp".to_string(),
186                "timestamp".to_string(),
187                "EventTime".to_string(),
188                "TimeCreated".to_string(),
189                "eventTime".to_string(),
190            ],
191            timestamp_fallback: TimestampFallback::default(),
192            max_state_entries: 100_000,
193            suppress: None,
194            action_on_match: CorrelationAction::default(),
195            emit_detections: true,
196            correlation_event_mode: CorrelationEventMode::default(),
197            max_correlation_events: 10,
198        }
199    }
200}
201
// =============================================================================
// Result types
// =============================================================================
205
206/// Combined result from processing a single event.
207#[derive(Debug, Clone, Serialize)]
208pub struct ProcessResult {
209    /// Detection rule matches (stateless, immediate).
210    pub detections: Vec<MatchResult>,
211    /// Correlation rule matches (stateful, accumulated).
212    pub correlations: Vec<CorrelationResult>,
213}
214
215/// The result of a correlation rule firing.
216#[derive(Debug, Clone, Serialize)]
217pub struct CorrelationResult {
218    /// Title of the correlation rule.
219    pub rule_title: String,
220    /// ID of the correlation rule (if present).
221    pub rule_id: Option<String>,
222    /// Severity level.
223    pub level: Option<Level>,
224    /// Tags from the correlation rule.
225    pub tags: Vec<String>,
226    /// Type of correlation.
227    pub correlation_type: CorrelationType,
228    /// Group-by field names and their values for this match.
229    pub group_key: Vec<(String, String)>,
230    /// The aggregated value that triggered the condition (count, sum, avg, etc.).
231    pub aggregated_value: f64,
232    /// The time window in seconds.
233    pub timespan_secs: u64,
234    /// Full event bodies, included when `correlation_event_mode` is `Full`.
235    ///
236    /// Contains up to `max_correlation_events` recently stored window events.
237    /// Events are decompressed from deflate storage on output.
238    #[serde(skip_serializing_if = "Option::is_none")]
239    pub events: Option<Vec<serde_json::Value>>,
240    /// Lightweight event references, included when `correlation_event_mode` is `Refs`.
241    ///
242    /// Contains up to `max_correlation_events` timestamp + optional ID pairs.
243    #[serde(skip_serializing_if = "Option::is_none")]
244    pub event_refs: Option<Vec<EventRef>>,
245}
246
// =============================================================================
// Correlation Engine
// =============================================================================
250
251/// Stateful correlation engine.
252///
253/// Wraps the stateless `Engine` for detection rules and adds time-windowed
254/// correlation on top. Supports all 7 Sigma correlation types and chaining.
255pub struct CorrelationEngine {
256    /// Inner stateless detection engine.
257    engine: Engine,
258    /// Compiled correlation rules.
259    correlations: Vec<CompiledCorrelation>,
260    /// Maps rule ID/name -> indices into `correlations` that reference it.
261    /// This allows quick lookup: "which correlations care about rule X?"
262    rule_index: HashMap<String, Vec<usize>>,
263    /// Maps detection rule index -> (rule_id, rule_name) for reverse lookup.
264    /// Used to find which correlations a detection match triggers.
265    rule_ids: Vec<(Option<String>, Option<String>)>,
266    /// Per-(correlation_index, group_key) window state.
267    state: HashMap<(usize, GroupKey), WindowState>,
268    /// Last alert timestamp per (correlation_index, group_key) for suppression.
269    last_alert: HashMap<(usize, GroupKey), i64>,
270    /// Per-(correlation_index, group_key) compressed event buffer (`Full` mode).
271    event_buffers: HashMap<(usize, GroupKey), EventBuffer>,
272    /// Per-(correlation_index, group_key) event reference buffer (`Refs` mode).
273    event_ref_buffers: HashMap<(usize, GroupKey), EventRefBuffer>,
274    /// Set of detection rule IDs/names that are "correlation-only"
275    /// (referenced by correlations where `generate == false`).
276    /// Used to filter detection output when `config.emit_detections == false`.
277    correlation_only_rules: std::collections::HashSet<String>,
278    /// Configuration.
279    config: CorrelationConfig,
280    /// Processing pipelines applied to rules during add_rule.
281    pipelines: Vec<Pipeline>,
282}
283
284impl CorrelationEngine {
285    /// Create a new correlation engine with the given configuration.
286    pub fn new(config: CorrelationConfig) -> Self {
287        CorrelationEngine {
288            engine: Engine::new(),
289            correlations: Vec::new(),
290            rule_index: HashMap::new(),
291            rule_ids: Vec::new(),
292            state: HashMap::new(),
293            last_alert: HashMap::new(),
294            event_buffers: HashMap::new(),
295            event_ref_buffers: HashMap::new(),
296            correlation_only_rules: std::collections::HashSet::new(),
297            config,
298            pipelines: Vec::new(),
299        }
300    }
301
302    /// Add a pipeline to the engine.
303    ///
304    /// Pipelines are applied to rules during `add_rule` / `add_collection`.
305    pub fn add_pipeline(&mut self, pipeline: Pipeline) {
306        self.pipelines.push(pipeline);
307        self.pipelines.sort_by_key(|p| p.priority);
308    }
309
310    /// Set global `include_event` on the inner detection engine.
311    pub fn set_include_event(&mut self, include: bool) {
312        self.engine.set_include_event(include);
313    }
314
315    /// Set the global correlation event mode.
316    ///
317    /// - `None`: no event storage (default)
318    /// - `Full`: compressed event bodies
319    /// - `Refs`: lightweight timestamp + ID references
320    pub fn set_correlation_event_mode(&mut self, mode: CorrelationEventMode) {
321        self.config.correlation_event_mode = mode;
322    }
323
324    /// Set the maximum number of events to store per correlation window group.
325    /// Only meaningful when `correlation_event_mode` is not `None`.
326    pub fn set_max_correlation_events(&mut self, max: usize) {
327        self.config.max_correlation_events = max;
328    }
329
330    /// Add a single detection rule.
331    ///
332    /// If pipelines are set, the rule is cloned and transformed before compilation.
333    /// The inner engine receives the already-transformed rule directly (not through
334    /// its own pipeline, to avoid double transformation).
335    pub fn add_rule(&mut self, rule: &SigmaRule) -> Result<()> {
336        if self.pipelines.is_empty() {
337            self.apply_custom_attributes(&rule.custom_attributes);
338            self.rule_ids.push((rule.id.clone(), rule.name.clone()));
339            self.engine.add_rule(rule)?;
340        } else {
341            let mut transformed = rule.clone();
342            apply_pipelines(&self.pipelines, &mut transformed)?;
343            self.apply_custom_attributes(&transformed.custom_attributes);
344            self.rule_ids
345                .push((transformed.id.clone(), transformed.name.clone()));
346            // Use compile_rule + add_compiled_rule to bypass inner engine's pipelines
347            let compiled = crate::compiler::compile_rule(&transformed)?;
348            self.engine.add_compiled_rule(compiled);
349        }
350        Ok(())
351    }
352
353    /// Read `rsigma.*` custom attributes from a rule and apply them to the
354    /// engine configuration.  This allows pipelines to influence engine
355    /// behaviour via `SetCustomAttribute` transformations — the same pattern
356    /// used by pySigma backends (e.g. pySigma-backend-loki).
357    ///
358    /// Supported attributes:
359    /// - `rsigma.timestamp_field` — prepends a field name to the timestamp
360    ///   extraction priority list so the correlation engine can find the
361    ///   event timestamp in non-standard field names.
362    /// - `rsigma.suppress` — sets the default suppression window (e.g. `5m`).
363    ///   Only applied when the CLI did not already set `--suppress`.
364    /// - `rsigma.action` — sets the default post-fire action (`alert`/`reset`).
365    ///   Only applied when the CLI did not already set `--action`.
366    fn apply_custom_attributes(&mut self, attrs: &std::collections::HashMap<String, String>) {
367        // rsigma.timestamp_field — prepend to priority list, skip duplicates
368        if let Some(field) = attrs.get("rsigma.timestamp_field")
369            && !self.config.timestamp_fields.contains(field)
370        {
371            self.config.timestamp_fields.insert(0, field.clone());
372        }
373
374        // rsigma.suppress — only when CLI didn't already set one
375        if let Some(val) = attrs.get("rsigma.suppress")
376            && self.config.suppress.is_none()
377            && let Ok(ts) = rsigma_parser::Timespan::parse(val)
378        {
379            self.config.suppress = Some(ts.seconds);
380        }
381
382        // rsigma.action — only when CLI left it at the default (Alert)
383        if let Some(val) = attrs.get("rsigma.action")
384            && self.config.action_on_match == CorrelationAction::Alert
385            && let Ok(a) = val.parse::<CorrelationAction>()
386        {
387            self.config.action_on_match = a;
388        }
389    }
390
391    /// Add a single correlation rule.
392    pub fn add_correlation(&mut self, corr: &CorrelationRule) -> Result<()> {
393        // Apply engine-level custom attributes from the correlation rule
394        // (e.g. rsigma.timestamp_field).
395        self.apply_custom_attributes(&corr.custom_attributes);
396
397        let compiled = compile_correlation(corr)?;
398        let idx = self.correlations.len();
399
400        // Index by each referenced rule ID/name
401        for rule_ref in &compiled.rule_refs {
402            self.rule_index
403                .entry(rule_ref.clone())
404                .or_default()
405                .push(idx);
406        }
407
408        // Track correlation-only rules (generate == false is the default)
409        if !compiled.generate {
410            for rule_ref in &compiled.rule_refs {
411                self.correlation_only_rules.insert(rule_ref.clone());
412            }
413        }
414
415        self.correlations.push(compiled);
416        Ok(())
417    }
418
419    /// Add all rules and correlations from a parsed collection.
420    ///
421    /// Detection rules are added first (so they're available for correlation
422    /// references), then correlation rules.
423    pub fn add_collection(&mut self, collection: &SigmaCollection) -> Result<()> {
424        for rule in &collection.rules {
425            self.add_rule(rule)?;
426        }
427        // Apply filter rules to the inner engine's detection rules
428        for filter in &collection.filters {
429            self.engine.apply_filter(filter)?;
430        }
431        for corr in &collection.correlations {
432            self.add_correlation(corr)?;
433        }
434        // Validate no cycles exist in the correlation reference graph
435        self.detect_correlation_cycles()?;
436        Ok(())
437    }
438
439    /// Detect cycles in the correlation reference graph.
440    ///
441    /// Builds a directed graph where each correlation (identified by its id/name)
442    /// has edges to the correlations it references via `rule_refs`. Uses DFS with
443    /// a "gray/black" coloring scheme to detect back-edges (cycles).
444    ///
445    /// Returns `Err(EvalError::CorrelationCycle)` if a cycle is found.
446    fn detect_correlation_cycles(&self) -> Result<()> {
447        // Build a set of all correlation identifiers (id and/or name)
448        let mut corr_identifiers: HashMap<&str, usize> = HashMap::new();
449        for (idx, corr) in self.correlations.iter().enumerate() {
450            if let Some(ref id) = corr.id {
451                corr_identifiers.insert(id.as_str(), idx);
452            }
453            if let Some(ref name) = corr.name {
454                corr_identifiers.insert(name.as_str(), idx);
455            }
456        }
457
458        // Build adjacency list: corr index → set of corr indices it references
459        let mut adj: Vec<Vec<usize>> = vec![Vec::new(); self.correlations.len()];
460        for (idx, corr) in self.correlations.iter().enumerate() {
461            for rule_ref in &corr.rule_refs {
462                if let Some(&target_idx) = corr_identifiers.get(rule_ref.as_str()) {
463                    adj[idx].push(target_idx);
464                }
465            }
466        }
467
468        // DFS cycle detection with three states: white (unvisited), gray (in stack), black (done)
469        let mut state = vec![0u8; self.correlations.len()]; // 0=white, 1=gray, 2=black
470        let mut path: Vec<usize> = Vec::new();
471
472        for start in 0..self.correlations.len() {
473            if state[start] == 0
474                && let Some(cycle) = Self::dfs_find_cycle(start, &adj, &mut state, &mut path)
475            {
476                let names: Vec<String> = cycle
477                    .iter()
478                    .map(|&i| {
479                        self.correlations[i]
480                            .id
481                            .as_deref()
482                            .or(self.correlations[i].name.as_deref())
483                            .unwrap_or(&self.correlations[i].title)
484                            .to_string()
485                    })
486                    .collect();
487                return Err(crate::error::EvalError::CorrelationCycle(
488                    names.join(" -> "),
489                ));
490            }
491        }
492        Ok(())
493    }
494
495    /// DFS helper that returns the cycle path if a back-edge is found.
496    fn dfs_find_cycle(
497        node: usize,
498        adj: &[Vec<usize>],
499        state: &mut [u8],
500        path: &mut Vec<usize>,
501    ) -> Option<Vec<usize>> {
502        state[node] = 1; // gray
503        path.push(node);
504
505        for &next in &adj[node] {
506            if state[next] == 1 {
507                // Back-edge found — extract cycle from path
508                if let Some(pos) = path.iter().position(|&n| n == next) {
509                    let mut cycle = path[pos..].to_vec();
510                    cycle.push(next); // close the cycle
511                    return Some(cycle);
512                }
513            }
514            if state[next] == 0
515                && let Some(cycle) = Self::dfs_find_cycle(next, adj, state, path)
516            {
517                return Some(cycle);
518            }
519        }
520
521        path.pop();
522        state[node] = 2; // black
523        None
524    }
525
526    /// Process an event, extracting the timestamp from configured event fields.
527    ///
528    /// When no timestamp field is found, the `timestamp_fallback` policy applies:
529    /// - `WallClock`: use `Utc::now()` (good for real-time streaming)
530    /// - `Skip`: return detections only, skip correlation state updates
531    pub fn process_event(&mut self, event: &Event) -> ProcessResult {
532        let ts = match self.extract_event_timestamp(event) {
533            Some(ts) => ts,
534            None => match self.config.timestamp_fallback {
535                TimestampFallback::WallClock => Utc::now().timestamp(),
536                TimestampFallback::Skip => {
537                    // Still run detection (stateless), but skip correlation
538                    let all_detections = self.engine.evaluate(event);
539                    let detections = self.filter_detections(all_detections);
540                    return ProcessResult {
541                        detections,
542                        correlations: Vec::new(),
543                    };
544                }
545            },
546        };
547        self.process_event_at(event, ts)
548    }
549
550    /// Process an event with an explicit Unix epoch timestamp (seconds).
551    ///
552    /// The timestamp is clamped to `[0, i64::MAX / 2]` to prevent overflow
553    /// when adding timespan durations internally.
554    pub fn process_event_at(&mut self, event: &Event, timestamp_secs: i64) -> ProcessResult {
555        let timestamp_secs = timestamp_secs.clamp(0, i64::MAX / 2);
556
557        // Step 1: Memory management — evict before adding new state to enforce limit
558        if self.state.len() >= self.config.max_state_entries {
559            self.evict_all(timestamp_secs);
560        }
561
562        // Step 2: Run stateless detection
563        let all_detections = self.engine.evaluate(event);
564
565        // Step 3: Feed detection matches into correlations
566        let mut correlations = Vec::new();
567        self.feed_detections(event, &all_detections, timestamp_secs, &mut correlations);
568
569        // Step 4: Chain — correlation results may trigger higher-level correlations
570        self.chain_correlations(&correlations, timestamp_secs);
571
572        // Step 5: Filter detections by generate flag
573        let detections = self.filter_detections(all_detections);
574
575        ProcessResult {
576            detections,
577            correlations,
578        }
579    }
580
581    /// Filter detections by the `generate` flag / `emit_detections` config.
582    ///
583    /// If `emit_detections` is false and some rules are correlation-only,
584    /// their detection output is suppressed.
585    fn filter_detections(&self, all_detections: Vec<MatchResult>) -> Vec<MatchResult> {
586        if !self.config.emit_detections && !self.correlation_only_rules.is_empty() {
587            all_detections
588                .into_iter()
589                .filter(|m| {
590                    let id_match = m
591                        .rule_id
592                        .as_ref()
593                        .is_some_and(|id| self.correlation_only_rules.contains(id));
594                    !id_match
595                })
596                .collect()
597        } else {
598            all_detections
599        }
600    }
601
602    /// Feed detection matches into correlation window states.
603    fn feed_detections(
604        &mut self,
605        event: &Event,
606        detections: &[MatchResult],
607        ts: i64,
608        out: &mut Vec<CorrelationResult>,
609    ) {
610        // Collect all (corr_idx, rule_id, rule_name) tuples upfront to avoid
611        // borrow conflicts between self.rule_ids and self.update_correlation.
612        let mut work: Vec<(usize, Option<String>, Option<String>)> = Vec::new();
613
614        for det in detections {
615            // Use the MatchResult's rule_id to find the original rule's ID/name.
616            // We also look up by rule_id in our rule_ids table for the name.
617            let (rule_id, rule_name) = self.find_rule_identity(det);
618
619            // Collect correlation indices that reference this rule
620            let mut corr_indices = Vec::new();
621            if let Some(ref id) = rule_id
622                && let Some(indices) = self.rule_index.get(id)
623            {
624                corr_indices.extend(indices);
625            }
626            if let Some(ref name) = rule_name
627                && let Some(indices) = self.rule_index.get(name)
628            {
629                corr_indices.extend(indices);
630            }
631
632            corr_indices.sort_unstable();
633            corr_indices.dedup();
634
635            for &corr_idx in &corr_indices {
636                work.push((corr_idx, rule_id.clone(), rule_name.clone()));
637            }
638        }
639
640        for (corr_idx, rule_id, rule_name) in work {
641            self.update_correlation(corr_idx, event, ts, &rule_id, &rule_name, out);
642        }
643    }
644
645    /// Find the (id, name) for a detection match by searching our rule_ids table.
646    fn find_rule_identity(&self, det: &MatchResult) -> (Option<String>, Option<String>) {
647        // First, try to find by matching rule_id in our table
648        if let Some(ref match_id) = det.rule_id {
649            for (id, name) in &self.rule_ids {
650                if id.as_deref() == Some(match_id.as_str()) {
651                    return (id.clone(), name.clone());
652                }
653            }
654        }
655        // Fall back to using just the MatchResult's rule_id
656        (det.rule_id.clone(), None)
657    }
658
659    /// Resolve the event mode for a given correlation.
660    fn resolve_event_mode(&self, corr_idx: usize) -> CorrelationEventMode {
661        let corr = &self.correlations[corr_idx];
662        corr.event_mode
663            .unwrap_or(self.config.correlation_event_mode)
664    }
665
666    /// Resolve the max events cap for a given correlation.
667    fn resolve_max_events(&self, corr_idx: usize) -> usize {
668        let corr = &self.correlations[corr_idx];
669        corr.max_events
670            .unwrap_or(self.config.max_correlation_events)
671    }
672
673    /// Update a single correlation's state and check its condition.
674    fn update_correlation(
675        &mut self,
676        corr_idx: usize,
677        event: &Event,
678        ts: i64,
679        rule_id: &Option<String>,
680        rule_name: &Option<String>,
681        out: &mut Vec<CorrelationResult>,
682    ) {
683        // Borrow the correlation by reference — no cloning needed.  Rust allows
684        // simultaneous &self.correlations and &mut self.state / &mut self.last_alert
685        // because they are disjoint struct fields.
686        let corr = &self.correlations[corr_idx];
687        let corr_type = corr.correlation_type;
688        let timespan = corr.timespan_secs;
689        let level = corr.level;
690        let suppress_secs = corr.suppress_secs.or(self.config.suppress);
691        let action = corr.action.unwrap_or(self.config.action_on_match);
692        let event_mode = self.resolve_event_mode(corr_idx);
693        let max_events = self.resolve_max_events(corr_idx);
694
695        // Determine the rule_ref strings for alias resolution and temporal tracking.
696        let mut ref_strs: Vec<&str> = Vec::new();
697        if let Some(id) = rule_id.as_deref() {
698            ref_strs.push(id);
699        }
700        if let Some(name) = rule_name.as_deref() {
701            ref_strs.push(name);
702        }
703        let rule_ref = rule_id.as_deref().or(rule_name.as_deref()).unwrap_or("");
704
705        // Extract group key
706        let group_key = GroupKey::extract(event, &corr.group_by, &ref_strs);
707
708        // Get or create window state
709        let state_key = (corr_idx, group_key.clone());
710        let state = self
711            .state
712            .entry(state_key.clone())
713            .or_insert_with(|| WindowState::new_for(corr_type));
714
715        // Evict expired entries
716        let cutoff = ts - timespan as i64;
717        state.evict(cutoff);
718
719        // Push the new event into the state
720        match corr_type {
721            CorrelationType::EventCount => {
722                state.push_event_count(ts);
723            }
724            CorrelationType::ValueCount => {
725                if let Some(ref field_name) = corr.condition.field
726                    && let Some(val) = event.get_field(field_name)
727                    && let Some(s) = value_to_string_for_count(val)
728                {
729                    state.push_value_count(ts, s);
730                }
731            }
732            CorrelationType::Temporal | CorrelationType::TemporalOrdered => {
733                state.push_temporal(ts, rule_ref);
734            }
735            CorrelationType::ValueSum
736            | CorrelationType::ValueAvg
737            | CorrelationType::ValuePercentile
738            | CorrelationType::ValueMedian => {
739                if let Some(ref field_name) = corr.condition.field
740                    && let Some(val) = event.get_field(field_name)
741                    && let Some(n) = value_to_f64(val)
742                {
743                    state.push_numeric(ts, n);
744                }
745            }
746        }
747
748        // Push event into buffer based on event mode
749        match event_mode {
750            CorrelationEventMode::Full => {
751                let buf = self
752                    .event_buffers
753                    .entry(state_key.clone())
754                    .or_insert_with(|| EventBuffer::new(max_events));
755                buf.evict(cutoff);
756                buf.push(ts, event.as_value());
757            }
758            CorrelationEventMode::Refs => {
759                let buf = self
760                    .event_ref_buffers
761                    .entry(state_key.clone())
762                    .or_insert_with(|| EventRefBuffer::new(max_events));
763                buf.evict(cutoff);
764                buf.push(ts, event.as_value());
765            }
766            CorrelationEventMode::None => {}
767        }
768
769        // Check condition — after this, `state` is no longer used (NLL drops the borrow).
770        let fired = state.check_condition(
771            &corr.condition,
772            corr_type,
773            &corr.rule_refs,
774            corr.extended_expr.as_ref(),
775        );
776
777        if let Some(agg_value) = fired {
778            let alert_key = (corr_idx, group_key.clone());
779
780            // Suppression check: skip if we've already alerted within the suppress window
781            let suppressed = if let Some(suppress) = suppress_secs {
782                if let Some(&last_ts) = self.last_alert.get(&alert_key) {
783                    (ts - last_ts) < suppress as i64
784                } else {
785                    false
786                }
787            } else {
788                false
789            };
790
791            if !suppressed {
792                // Retrieve stored events / refs based on mode
793                let (events, event_refs) = match event_mode {
794                    CorrelationEventMode::Full => {
795                        let stored = self
796                            .event_buffers
797                            .get(&alert_key)
798                            .map(|buf| buf.decompress_all())
799                            .unwrap_or_default();
800                        (Some(stored), None)
801                    }
802                    CorrelationEventMode::Refs => {
803                        let stored = self
804                            .event_ref_buffers
805                            .get(&alert_key)
806                            .map(|buf| buf.refs())
807                            .unwrap_or_default();
808                        (None, Some(stored))
809                    }
810                    CorrelationEventMode::None => (None, None),
811                };
812
813                // Only clone title/id/tags when we actually produce output
814                let corr = &self.correlations[corr_idx];
815                let result = CorrelationResult {
816                    rule_title: corr.title.clone(),
817                    rule_id: corr.id.clone(),
818                    level,
819                    tags: corr.tags.clone(),
820                    correlation_type: corr_type,
821                    group_key: group_key.to_pairs(&corr.group_by),
822                    aggregated_value: agg_value,
823                    timespan_secs: timespan,
824                    events,
825                    event_refs,
826                };
827                out.push(result);
828
829                // Record alert time for suppression
830                self.last_alert.insert(alert_key.clone(), ts);
831
832                // Action on match
833                if action == CorrelationAction::Reset {
834                    if let Some(state) = self.state.get_mut(&alert_key) {
835                        state.clear();
836                    }
837                    if let Some(buf) = self.event_buffers.get_mut(&alert_key) {
838                        buf.clear();
839                    }
840                    if let Some(buf) = self.event_ref_buffers.get_mut(&alert_key) {
841                        buf.clear();
842                    }
843                }
844            }
845        }
846    }
847
848    /// Propagate correlation results to higher-level correlations (chaining).
849    ///
850    /// When a correlation fires, any correlation that references it (by ID or name)
851    /// is updated. Limits chain depth to 10 to prevent infinite loops.
852    fn chain_correlations(&mut self, fired: &[CorrelationResult], ts: i64) {
853        const MAX_CHAIN_DEPTH: usize = 10;
854        let mut pending: Vec<CorrelationResult> = fired.to_vec();
855        let mut depth = 0;
856
857        while !pending.is_empty() && depth < MAX_CHAIN_DEPTH {
858            depth += 1;
859
860            // Collect work items: (corr_idx, group_key_pairs, fired_ref)
861            #[allow(clippy::type_complexity)]
862            let mut work: Vec<(usize, Vec<(String, String)>, String)> = Vec::new();
863            for result in &pending {
864                if let Some(ref id) = result.rule_id
865                    && let Some(indices) = self.rule_index.get(id)
866                {
867                    let fired_ref = result
868                        .rule_id
869                        .as_deref()
870                        .unwrap_or(&result.rule_title)
871                        .to_string();
872                    for &corr_idx in indices {
873                        work.push((corr_idx, result.group_key.clone(), fired_ref.clone()));
874                    }
875                }
876            }
877
878            let mut next_pending = Vec::new();
879            for (corr_idx, group_key_pairs, fired_ref) in work {
880                let corr = &self.correlations[corr_idx];
881                let corr_type = corr.correlation_type;
882                let timespan = corr.timespan_secs;
883                let level = corr.level;
884
885                let group_key = GroupKey::from_pairs(&group_key_pairs, &corr.group_by);
886                let state_key = (corr_idx, group_key.clone());
887                let state = self
888                    .state
889                    .entry(state_key)
890                    .or_insert_with(|| WindowState::new_for(corr_type));
891
892                let cutoff = ts - timespan as i64;
893                state.evict(cutoff);
894
895                match corr_type {
896                    CorrelationType::EventCount => {
897                        state.push_event_count(ts);
898                    }
899                    CorrelationType::Temporal | CorrelationType::TemporalOrdered => {
900                        state.push_temporal(ts, &fired_ref);
901                    }
902                    _ => {
903                        state.push_event_count(ts);
904                    }
905                }
906
907                let fired = state.check_condition(
908                    &corr.condition,
909                    corr_type,
910                    &corr.rule_refs,
911                    corr.extended_expr.as_ref(),
912                );
913
914                if let Some(agg_value) = fired {
915                    let corr = &self.correlations[corr_idx];
916                    next_pending.push(CorrelationResult {
917                        rule_title: corr.title.clone(),
918                        rule_id: corr.id.clone(),
919                        level,
920                        tags: corr.tags.clone(),
921                        correlation_type: corr_type,
922                        group_key: group_key.to_pairs(&corr.group_by),
923                        aggregated_value: agg_value,
924                        timespan_secs: timespan,
925                        // Chained correlations don't include events (they aggregate
926                        // over correlation results, not raw events)
927                        events: None,
928                        event_refs: None,
929                    });
930                }
931            }
932
933            pending = next_pending;
934        }
935
936        if !pending.is_empty() {
937            log::warn!(
938                "Correlation chain depth limit reached ({MAX_CHAIN_DEPTH}); \
939                 {} pending result(s) were not propagated further. \
940                 This may indicate a cycle in correlation references.",
941                pending.len()
942            );
943        }
944    }
945
946    // =========================================================================
947    // Timestamp extraction
948    // =========================================================================
949
950    /// Extract a Unix epoch timestamp (seconds) from an event.
951    ///
952    /// Tries each configured timestamp field in order. Supports:
953    /// - Numeric values (epoch seconds, or epoch millis if > 1e12)
954    /// - ISO 8601 strings (e.g., "2024-07-10T12:30:00Z")
955    ///
956    /// Returns `None` if no field yields a valid timestamp.
957    fn extract_event_timestamp(&self, event: &Event) -> Option<i64> {
958        for field_name in &self.config.timestamp_fields {
959            if let Some(val) = event.get_field(field_name)
960                && let Some(ts) = parse_timestamp_value(val)
961            {
962                return Some(ts);
963            }
964        }
965        None
966    }
967
968    // =========================================================================
969    // State management
970    // =========================================================================
971
972    /// Manually evict all expired state entries.
973    pub fn evict_expired(&mut self, now_secs: i64) {
974        self.evict_all(now_secs);
975    }
976
977    /// Evict expired entries and remove empty states.
978    fn evict_all(&mut self, now_secs: i64) {
979        // Phase 1: Time-based eviction — remove entries outside their correlation window
980        let timespans: Vec<u64> = self.correlations.iter().map(|c| c.timespan_secs).collect();
981
982        self.state.retain(|&(corr_idx, _), state| {
983            if corr_idx < timespans.len() {
984                let cutoff = now_secs - timespans[corr_idx] as i64;
985                state.evict(cutoff);
986            }
987            !state.is_empty()
988        });
989
990        // Evict event buffers in sync with window state
991        self.event_buffers.retain(|&(corr_idx, _), buf| {
992            if corr_idx < timespans.len() {
993                let cutoff = now_secs - timespans[corr_idx] as i64;
994                buf.evict(cutoff);
995            }
996            !buf.is_empty()
997        });
998        self.event_ref_buffers.retain(|&(corr_idx, _), buf| {
999            if corr_idx < timespans.len() {
1000                let cutoff = now_secs - timespans[corr_idx] as i64;
1001                buf.evict(cutoff);
1002            }
1003            !buf.is_empty()
1004        });
1005
1006        // Phase 2: Hard cap — if still over limit after time-based eviction (e.g.
1007        // high-cardinality traffic with long windows), drop the stalest entries
1008        // until we're at 90% capacity to avoid evicting on every single event.
1009        if self.state.len() >= self.config.max_state_entries {
1010            let target = self.config.max_state_entries * 9 / 10;
1011            let excess = self.state.len() - target;
1012
1013            // Collect keys with their latest timestamp, sort by oldest first
1014            let mut by_staleness: Vec<_> = self
1015                .state
1016                .iter()
1017                .map(|(k, v)| (k.clone(), v.latest_timestamp().unwrap_or(i64::MIN)))
1018                .collect();
1019            by_staleness.sort_unstable_by_key(|&(_, ts)| ts);
1020
1021            // Drop the oldest entries (and their associated event buffers)
1022            for (key, _) in by_staleness.into_iter().take(excess) {
1023                self.state.remove(&key);
1024                self.last_alert.remove(&key);
1025                self.event_buffers.remove(&key);
1026                self.event_ref_buffers.remove(&key);
1027            }
1028        }
1029
1030        // Phase 3: Evict stale last_alert entries — remove if the suppress window
1031        // has passed or if the corresponding window state no longer exists.
1032        self.last_alert.retain(|key, &mut alert_ts| {
1033            let suppress = if key.0 < self.correlations.len() {
1034                self.correlations[key.0]
1035                    .suppress_secs
1036                    .or(self.config.suppress)
1037                    .unwrap_or(0)
1038            } else {
1039                0
1040            };
1041            (now_secs - alert_ts) < suppress as i64
1042        });
1043    }
1044
1045    /// Number of active state entries (for monitoring).
1046    pub fn state_count(&self) -> usize {
1047        self.state.len()
1048    }
1049
1050    /// Number of detection rules loaded.
1051    pub fn detection_rule_count(&self) -> usize {
1052        self.engine.rule_count()
1053    }
1054
1055    /// Number of correlation rules loaded.
1056    pub fn correlation_rule_count(&self) -> usize {
1057        self.correlations.len()
1058    }
1059
1060    /// Number of active event buffers (for monitoring).
1061    pub fn event_buffer_count(&self) -> usize {
1062        self.event_buffers.len()
1063    }
1064
1065    /// Total compressed bytes across all event buffers (for monitoring).
1066    pub fn event_buffer_bytes(&self) -> usize {
1067        self.event_buffers
1068            .values()
1069            .map(|b| b.compressed_bytes())
1070            .sum()
1071    }
1072
1073    /// Number of active event ref buffers — `Refs` mode (for monitoring).
1074    pub fn event_ref_buffer_count(&self) -> usize {
1075        self.event_ref_buffers.len()
1076    }
1077
1078    /// Access the inner stateless engine.
1079    pub fn engine(&self) -> &Engine {
1080        &self.engine
1081    }
1082}
1083
1084impl Default for CorrelationEngine {
1085    fn default() -> Self {
1086        Self::new(CorrelationConfig::default())
1087    }
1088}
1089
1090// =============================================================================
1091// Timestamp parsing helpers
1092// =============================================================================
1093
1094/// Parse a JSON value as a Unix epoch timestamp in seconds.
1095fn parse_timestamp_value(val: &serde_json::Value) -> Option<i64> {
1096    match val {
1097        serde_json::Value::Number(n) => {
1098            if let Some(i) = n.as_i64() {
1099                Some(normalize_epoch(i))
1100            } else {
1101                n.as_f64().map(|f| normalize_epoch(f as i64))
1102            }
1103        }
1104        serde_json::Value::String(s) => parse_timestamp_string(s),
1105        _ => None,
1106    }
1107}
1108
/// Normalize a numeric epoch value to seconds.
///
/// 1e12 seconds after the epoch is roughly the year 33658, so any value above
/// `1_000_000_000_000` cannot be a realistic epoch-seconds value. Such values are
/// assumed to be in a finer unit and divided by 1000 until they drop to or below
/// that bound. This converts epoch milliseconds, microseconds and nanoseconds
/// alike:
///
/// - `1_720_612_200_000` (ms) → `1_720_612_200`
/// - `1_720_612_200_000_000` (µs) → `1_720_612_200`
/// - `1_720_612_200_000_000_000` (ns) → `1_720_612_200`
///
/// Values at or below the bound (including zero and negatives) are returned
/// unchanged. Integer division truncates any sub-second remainder.
fn normalize_epoch(v: i64) -> i64 {
    const MAX_PLAUSIBLE_EPOCH_SECS: i64 = 1_000_000_000_000;

    let mut secs = v;
    // Terminates after at most three divisions: i64::MAX is about 9.2e18.
    while secs > MAX_PLAUSIBLE_EPOCH_SECS {
        secs /= 1000;
    }
    secs
}
1114
1115/// Parse a timestamp string. Tries ISO 8601 with timezone, then without.
1116fn parse_timestamp_string(s: &str) -> Option<i64> {
1117    // Try RFC 3339 / ISO 8601 with timezone
1118    if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
1119        return Some(dt.timestamp());
1120    }
1121
1122    // Try ISO 8601 without timezone (assume UTC)
1123    // Common formats: "2024-07-10T12:30:00", "2024-07-10 12:30:00"
1124    if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S") {
1125        return Some(Utc.from_utc_datetime(&naive).timestamp());
1126    }
1127    if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") {
1128        return Some(Utc.from_utc_datetime(&naive).timestamp());
1129    }
1130
1131    // Try with fractional seconds
1132    if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S%.f") {
1133        return Some(Utc.from_utc_datetime(&naive).timestamp());
1134    }
1135    if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f") {
1136        return Some(Utc.from_utc_datetime(&naive).timestamp());
1137    }
1138
1139    None
1140}
1141
1142/// Convert a JSON value to a string for value_count purposes.
1143fn value_to_string_for_count(v: &serde_json::Value) -> Option<String> {
1144    match v {
1145        serde_json::Value::String(s) => Some(s.clone()),
1146        serde_json::Value::Number(n) => Some(n.to_string()),
1147        serde_json::Value::Bool(b) => Some(b.to_string()),
1148        serde_json::Value::Null => Some("null".to_string()),
1149        _ => None,
1150    }
1151}
1152
1153/// Convert a JSON value to f64 for numeric aggregation.
1154fn value_to_f64(v: &serde_json::Value) -> Option<f64> {
1155    match v {
1156        serde_json::Value::Number(n) => n.as_f64(),
1157        serde_json::Value::String(s) => s.parse().ok(),
1158        _ => None,
1159    }
1160}
1161
1162// =============================================================================
1163// Tests
1164// =============================================================================
1165
1166#[cfg(test)]
1167mod tests {
1168    use super::*;
1169    use rsigma_parser::parse_sigma_yaml;
1170    use serde_json::json;
1171
1172    // =========================================================================
1173    // Timestamp parsing
1174    // =========================================================================
1175
1176    #[test]
1177    fn test_parse_timestamp_epoch_secs() {
1178        let val = json!(1720612200);
1179        assert_eq!(parse_timestamp_value(&val), Some(1720612200));
1180    }
1181
1182    #[test]
1183    fn test_parse_timestamp_epoch_millis() {
1184        let val = json!(1720612200000i64);
1185        assert_eq!(parse_timestamp_value(&val), Some(1720612200));
1186    }
1187
1188    #[test]
1189    fn test_parse_timestamp_rfc3339() {
1190        let val = json!("2024-07-10T12:30:00Z");
1191        let ts = parse_timestamp_value(&val).unwrap();
1192        assert_eq!(ts, 1720614600);
1193    }
1194
1195    #[test]
1196    fn test_parse_timestamp_naive() {
1197        let val = json!("2024-07-10T12:30:00");
1198        let ts = parse_timestamp_value(&val).unwrap();
1199        assert_eq!(ts, 1720614600);
1200    }
1201
1202    #[test]
1203    fn test_parse_timestamp_with_space() {
1204        let val = json!("2024-07-10 12:30:00");
1205        let ts = parse_timestamp_value(&val).unwrap();
1206        assert_eq!(ts, 1720614600);
1207    }
1208
1209    #[test]
1210    fn test_parse_timestamp_fractional() {
1211        let val = json!("2024-07-10T12:30:00.123Z");
1212        let ts = parse_timestamp_value(&val).unwrap();
1213        assert_eq!(ts, 1720614600);
1214    }
1215
1216    #[test]
1217    fn test_extract_timestamp_from_event() {
1218        let config = CorrelationConfig {
1219            timestamp_fields: vec!["@timestamp".to_string()],
1220            max_state_entries: 100_000,
1221            ..Default::default()
1222        };
1223        let engine = CorrelationEngine::new(config);
1224
1225        let v = json!({"@timestamp": "2024-07-10T12:30:00Z", "data": "test"});
1226        let event = Event::from_value(&v);
1227        let ts = engine.extract_event_timestamp(&event);
1228        assert_eq!(ts, Some(1720614600));
1229    }
1230
1231    #[test]
1232    fn test_extract_timestamp_fallback_fields() {
1233        let config = CorrelationConfig {
1234            timestamp_fields: vec![
1235                "@timestamp".to_string(),
1236                "timestamp".to_string(),
1237                "EventTime".to_string(),
1238            ],
1239            max_state_entries: 100_000,
1240            ..Default::default()
1241        };
1242        let engine = CorrelationEngine::new(config);
1243
1244        // First field missing, second field present
1245        let v = json!({"timestamp": 1720613400, "data": "test"});
1246        let event = Event::from_value(&v);
1247        let ts = engine.extract_event_timestamp(&event);
1248        assert_eq!(ts, Some(1720613400));
1249    }
1250
1251    #[test]
1252    fn test_extract_timestamp_returns_none_when_missing() {
1253        let config = CorrelationConfig {
1254            timestamp_fields: vec!["@timestamp".to_string()],
1255            ..Default::default()
1256        };
1257        let engine = CorrelationEngine::new(config);
1258
1259        let v = json!({"data": "no timestamp here"});
1260        let event = Event::from_value(&v);
1261        assert_eq!(engine.extract_event_timestamp(&event), None);
1262    }
1263
1264    #[test]
1265    fn test_timestamp_fallback_skip() {
1266        let yaml = r#"
1267title: test rule
1268id: ts-skip-rule
1269logsource:
1270    product: test
1271detection:
1272    selection:
1273        action: click
1274    condition: selection
1275level: low
1276---
1277title: test correlation
1278correlation:
1279    type: event_count
1280    rules:
1281        - ts-skip-rule
1282    group-by:
1283        - User
1284    timespan: 10s
1285    condition:
1286        gte: 2
1287level: high
1288"#;
1289        let collection = parse_sigma_yaml(yaml).unwrap();
1290        let mut engine = CorrelationEngine::new(CorrelationConfig {
1291            timestamp_fallback: TimestampFallback::Skip,
1292            ..Default::default()
1293        });
1294        engine.add_collection(&collection).unwrap();
1295        assert_eq!(engine.correlation_rule_count(), 1);
1296
1297        // Events with no timestamp field — should NOT update correlation state
1298        let v = json!({"action": "click", "User": "alice"});
1299        let event = Event::from_value(&v);
1300
1301        let r1 = engine.process_event(&event);
1302        assert!(!r1.detections.is_empty(), "detection should still fire");
1303
1304        let r2 = engine.process_event(&event);
1305        assert!(!r2.detections.is_empty(), "detection should still fire");
1306
1307        let r3 = engine.process_event(&event);
1308        assert!(!r3.detections.is_empty(), "detection should still fire");
1309
1310        // No correlations should fire because events were skipped
1311        assert!(r1.correlations.is_empty());
1312        assert!(r2.correlations.is_empty());
1313        assert!(r3.correlations.is_empty());
1314    }
1315
1316    #[test]
1317    fn test_timestamp_fallback_wallclock_default() {
1318        let yaml = r#"
1319title: test rule
1320id: ts-wc-rule
1321logsource:
1322    product: test
1323detection:
1324    selection:
1325        action: click
1326    condition: selection
1327level: low
1328---
1329title: test correlation
1330correlation:
1331    type: event_count
1332    rules:
1333        - ts-wc-rule
1334    group-by:
1335        - User
1336    timespan: 60s
1337    condition:
1338        gte: 2
1339level: high
1340"#;
1341        let collection = parse_sigma_yaml(yaml).unwrap();
1342        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1343        engine.add_collection(&collection).unwrap();
1344        assert_eq!(engine.correlation_rule_count(), 1);
1345
1346        // Events with no timestamp — WallClock fallback means they get Utc::now()
1347        // and should be close enough to correlate (generous 60s window)
1348        let v = json!({"action": "click", "User": "alice"});
1349        let event = Event::from_value(&v);
1350
1351        let _r1 = engine.process_event(&event);
1352        let _r2 = engine.process_event(&event);
1353        let r3 = engine.process_event(&event);
1354
1355        // With WallClock, all events get near-identical timestamps and should correlate
1356        assert!(
1357            !r3.correlations.is_empty(),
1358            "WallClock fallback should allow correlation"
1359        );
1360    }
1361
1362    // =========================================================================
1363    // Event count correlation
1364    // =========================================================================
1365
1366    #[test]
1367    fn test_event_count_basic() {
1368        let yaml = r#"
1369title: Base Rule
1370id: base-rule-001
1371name: base_rule
1372logsource:
1373    product: windows
1374    category: process_creation
1375detection:
1376    selection:
1377        CommandLine|contains: 'whoami'
1378    condition: selection
1379level: low
1380---
1381title: Multiple Whoami
1382id: corr-001
1383correlation:
1384    type: event_count
1385    rules:
1386        - base-rule-001
1387    group-by:
1388        - User
1389    timespan: 60s
1390    condition:
1391        gte: 3
1392level: high
1393"#;
1394        let collection = parse_sigma_yaml(yaml).unwrap();
1395        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1396        engine.add_collection(&collection).unwrap();
1397
1398        assert_eq!(engine.detection_rule_count(), 1);
1399        assert_eq!(engine.correlation_rule_count(), 1);
1400
1401        // Send 3 events from same user within the window
1402        let base_ts = 1000i64;
1403        for i in 0..3 {
1404            let v = json!({"CommandLine": "whoami", "User": "admin"});
1405            let event = Event::from_value(&v);
1406            let result = engine.process_event_at(&event, base_ts + i * 10);
1407
1408            // Each event should match the detection rule
1409            assert_eq!(result.detections.len(), 1);
1410
1411            if i < 2 {
1412                // Not enough events yet
1413                assert!(result.correlations.is_empty());
1414            } else {
1415                // 3rd event triggers the correlation
1416                assert_eq!(result.correlations.len(), 1);
1417                assert_eq!(result.correlations[0].rule_title, "Multiple Whoami");
1418                assert_eq!(result.correlations[0].aggregated_value, 3.0);
1419            }
1420        }
1421    }
1422
1423    #[test]
1424    fn test_event_count_different_groups() {
1425        let yaml = r#"
1426title: Login
1427id: login-001
1428logsource:
1429    category: auth
1430detection:
1431    selection:
1432        EventType: login
1433    condition: selection
1434level: low
1435---
1436title: Many Logins
1437id: corr-login
1438correlation:
1439    type: event_count
1440    rules:
1441        - login-001
1442    group-by:
1443        - User
1444    timespan: 60s
1445    condition:
1446        gte: 3
1447level: high
1448"#;
1449        let collection = parse_sigma_yaml(yaml).unwrap();
1450        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1451        engine.add_collection(&collection).unwrap();
1452
1453        // User "alice" sends 2 events, "bob" sends 3
1454        let ts = 1000i64;
1455        for i in 0..2 {
1456            let v = json!({"EventType": "login", "User": "alice"});
1457            let event = Event::from_value(&v);
1458            let r = engine.process_event_at(&event, ts + i);
1459            assert!(r.correlations.is_empty());
1460        }
1461        for i in 0..3 {
1462            let v = json!({"EventType": "login", "User": "bob"});
1463            let event = Event::from_value(&v);
1464            let r = engine.process_event_at(&event, ts + i);
1465            if i == 2 {
1466                assert_eq!(r.correlations.len(), 1);
1467                assert_eq!(
1468                    r.correlations[0].group_key,
1469                    vec![("User".to_string(), "bob".to_string())]
1470                );
1471            }
1472        }
1473    }
1474
1475    #[test]
1476    fn test_event_count_window_expiry() {
1477        let yaml = r#"
1478title: Base
1479id: base-002
1480logsource:
1481    category: test
1482detection:
1483    selection:
1484        action: click
1485    condition: selection
1486---
1487title: Rapid Clicks
1488id: corr-002
1489correlation:
1490    type: event_count
1491    rules:
1492        - base-002
1493    group-by:
1494        - User
1495    timespan: 10s
1496    condition:
1497        gte: 3
1498level: medium
1499"#;
1500        let collection = parse_sigma_yaml(yaml).unwrap();
1501        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1502        engine.add_collection(&collection).unwrap();
1503
1504        // Send 2 events at t=0,1 then 1 event at t=15 (outside window)
1505        let v = json!({"action": "click", "User": "admin"});
1506        let event = Event::from_value(&v);
1507        engine.process_event_at(&event, 0);
1508        engine.process_event_at(&event, 1);
1509        let r = engine.process_event_at(&event, 15);
1510        // Only 1 event in window [5, 15], not enough
1511        assert!(r.correlations.is_empty());
1512    }
1513
1514    // =========================================================================
1515    // Value count correlation
1516    // =========================================================================
1517
1518    #[test]
1519    fn test_value_count() {
1520        let yaml = r#"
1521title: Failed Login
1522id: failed-login-001
1523logsource:
1524    category: auth
1525detection:
1526    selection:
1527        EventType: failed_login
1528    condition: selection
1529level: low
1530---
1531title: Failed Logins From Many Users
1532id: corr-vc-001
1533correlation:
1534    type: value_count
1535    rules:
1536        - failed-login-001
1537    group-by:
1538        - Host
1539    timespan: 60s
1540    condition:
1541        field: User
1542        gte: 3
1543level: high
1544"#;
1545        let collection = parse_sigma_yaml(yaml).unwrap();
1546        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1547        engine.add_collection(&collection).unwrap();
1548
1549        let ts = 1000i64;
1550        // 3 different users failing login on same host
1551        for (i, user) in ["alice", "bob", "charlie"].iter().enumerate() {
1552            let v = json!({"EventType": "failed_login", "Host": "srv01", "User": user});
1553            let event = Event::from_value(&v);
1554            let r = engine.process_event_at(&event, ts + i as i64);
1555            if i == 2 {
1556                assert_eq!(r.correlations.len(), 1);
1557                assert_eq!(r.correlations[0].aggregated_value, 3.0);
1558            }
1559        }
1560    }
1561
1562    // =========================================================================
1563    // Temporal correlation
1564    // =========================================================================
1565
1566    #[test]
1567    fn test_temporal() {
1568        let yaml = r#"
1569title: Recon A
1570id: recon-a
1571name: recon_a
1572logsource:
1573    category: process
1574detection:
1575    selection:
1576        CommandLine|contains: 'whoami'
1577    condition: selection
1578---
1579title: Recon B
1580id: recon-b
1581name: recon_b
1582logsource:
1583    category: process
1584detection:
1585    selection:
1586        CommandLine|contains: 'ipconfig'
1587    condition: selection
1588---
1589title: Recon Combo
1590id: corr-temporal
1591correlation:
1592    type: temporal
1593    rules:
1594        - recon-a
1595        - recon-b
1596    group-by:
1597        - User
1598    timespan: 60s
1599    condition:
1600        gte: 2
1601level: high
1602"#;
1603        let collection = parse_sigma_yaml(yaml).unwrap();
1604        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1605        engine.add_collection(&collection).unwrap();
1606
1607        let ts = 1000i64;
1608        // Only recon A fires
1609        let v1 = json!({"CommandLine": "whoami", "User": "admin"});
1610        let ev1 = Event::from_value(&v1);
1611        let r1 = engine.process_event_at(&ev1, ts);
1612        assert!(r1.correlations.is_empty());
1613
1614        // Now recon B fires — both rules have fired within window
1615        let v2 = json!({"CommandLine": "ipconfig /all", "User": "admin"});
1616        let ev2 = Event::from_value(&v2);
1617        let r2 = engine.process_event_at(&ev2, ts + 10);
1618        assert_eq!(r2.correlations.len(), 1);
1619        assert_eq!(r2.correlations[0].rule_title, "Recon Combo");
1620    }
1621
1622    // =========================================================================
1623    // Temporal ordered correlation
1624    // =========================================================================
1625
1626    #[test]
1627    fn test_temporal_ordered() {
1628        let yaml = r#"
1629title: Failed Login
1630id: failed-001
1631name: failed_login
1632logsource:
1633    category: auth
1634detection:
1635    selection:
1636        EventType: failed_login
1637    condition: selection
1638---
1639title: Success Login
1640id: success-001
1641name: successful_login
1642logsource:
1643    category: auth
1644detection:
1645    selection:
1646        EventType: success_login
1647    condition: selection
1648---
1649title: Brute Force Then Login
1650id: corr-bf
1651correlation:
1652    type: temporal_ordered
1653    rules:
1654        - failed-001
1655        - success-001
1656    group-by:
1657        - User
1658    timespan: 60s
1659    condition:
1660        gte: 2
1661level: critical
1662"#;
1663        let collection = parse_sigma_yaml(yaml).unwrap();
1664        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1665        engine.add_collection(&collection).unwrap();
1666
1667        let ts = 1000i64;
1668        // Failed login first
1669        let v1 = json!({"EventType": "failed_login", "User": "admin"});
1670        let ev1 = Event::from_value(&v1);
1671        let r1 = engine.process_event_at(&ev1, ts);
1672        assert!(r1.correlations.is_empty());
1673
1674        // Then successful login — correct order!
1675        let v2 = json!({"EventType": "success_login", "User": "admin"});
1676        let ev2 = Event::from_value(&v2);
1677        let r2 = engine.process_event_at(&ev2, ts + 10);
1678        assert_eq!(r2.correlations.len(), 1);
1679    }
1680
1681    #[test]
1682    fn test_temporal_ordered_wrong_order() {
1683        let yaml = r#"
1684title: Rule A
1685id: rule-a
1686logsource:
1687    category: test
1688detection:
1689    selection:
1690        type: a
1691    condition: selection
1692---
1693title: Rule B
1694id: rule-b
1695logsource:
1696    category: test
1697detection:
1698    selection:
1699        type: b
1700    condition: selection
1701---
1702title: A then B
1703id: corr-ab
1704correlation:
1705    type: temporal_ordered
1706    rules:
1707        - rule-a
1708        - rule-b
1709    group-by:
1710        - User
1711    timespan: 60s
1712    condition:
1713        gte: 2
1714level: high
1715"#;
1716        let collection = parse_sigma_yaml(yaml).unwrap();
1717        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1718        engine.add_collection(&collection).unwrap();
1719
1720        let ts = 1000i64;
1721        // B fires first, then A — wrong order
1722        let v1 = json!({"type": "b", "User": "admin"});
1723        let ev1 = Event::from_value(&v1);
1724        engine.process_event_at(&ev1, ts);
1725
1726        let v2 = json!({"type": "a", "User": "admin"});
1727        let ev2 = Event::from_value(&v2);
1728        let r2 = engine.process_event_at(&ev2, ts + 10);
1729        assert!(r2.correlations.is_empty());
1730    }
1731
1732    // =========================================================================
1733    // Numeric aggregation (value_sum, value_avg)
1734    // =========================================================================
1735
1736    #[test]
1737    fn test_value_sum() {
1738        let yaml = r#"
1739title: Web Access
1740id: web-001
1741logsource:
1742    category: web
1743detection:
1744    selection:
1745        action: upload
1746    condition: selection
1747---
1748title: Large Upload
1749id: corr-sum
1750correlation:
1751    type: value_sum
1752    rules:
1753        - web-001
1754    group-by:
1755        - User
1756    timespan: 60s
1757    condition:
1758        field: bytes_sent
1759        gt: 1000
1760level: high
1761"#;
1762        let collection = parse_sigma_yaml(yaml).unwrap();
1763        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1764        engine.add_collection(&collection).unwrap();
1765
1766        let ts = 1000i64;
1767        let v1 = json!({"action": "upload", "User": "alice", "bytes_sent": 600});
1768        let ev1 = Event::from_value(&v1);
1769        let r1 = engine.process_event_at(&ev1, ts);
1770        assert!(r1.correlations.is_empty());
1771
1772        let v2 = json!({"action": "upload", "User": "alice", "bytes_sent": 500});
1773        let ev2 = Event::from_value(&v2);
1774        let r2 = engine.process_event_at(&ev2, ts + 5);
1775        assert_eq!(r2.correlations.len(), 1);
1776        assert!((r2.correlations[0].aggregated_value - 1100.0).abs() < f64::EPSILON);
1777    }
1778
1779    #[test]
1780    fn test_value_avg() {
1781        let yaml = r#"
1782title: Request
1783id: req-001
1784logsource:
1785    category: web
1786detection:
1787    selection:
1788        type: request
1789    condition: selection
1790---
1791title: High Avg Latency
1792id: corr-avg
1793correlation:
1794    type: value_avg
1795    rules:
1796        - req-001
1797    group-by:
1798        - Service
1799    timespan: 60s
1800    condition:
1801        field: latency_ms
1802        gt: 500
1803level: medium
1804"#;
1805        let collection = parse_sigma_yaml(yaml).unwrap();
1806        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1807        engine.add_collection(&collection).unwrap();
1808
1809        let ts = 1000i64;
1810        // Avg of 400, 600, 800 = 600 > 500
1811        for (i, latency) in [400, 600, 800].iter().enumerate() {
1812            let v = json!({"type": "request", "Service": "api", "latency_ms": latency});
1813            let event = Event::from_value(&v);
1814            let r = engine.process_event_at(&event, ts + i as i64);
1815            if i == 2 {
1816                assert_eq!(r.correlations.len(), 1);
1817                assert!((r.correlations[0].aggregated_value - 600.0).abs() < f64::EPSILON);
1818            }
1819        }
1820    }
1821
1822    // =========================================================================
1823    // State management
1824    // =========================================================================
1825
1826    #[test]
1827    fn test_state_count() {
1828        let yaml = r#"
1829title: Base
1830id: base-sc
1831logsource:
1832    category: test
1833detection:
1834    selection:
1835        action: test
1836    condition: selection
1837---
1838title: Count
1839id: corr-sc
1840correlation:
1841    type: event_count
1842    rules:
1843        - base-sc
1844    group-by:
1845        - User
1846    timespan: 60s
1847    condition:
1848        gte: 100
1849level: low
1850"#;
1851        let collection = parse_sigma_yaml(yaml).unwrap();
1852        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1853        engine.add_collection(&collection).unwrap();
1854
1855        let v = json!({"action": "test", "User": "alice"});
1856        let event = Event::from_value(&v);
1857        engine.process_event_at(&event, 1000);
1858        assert_eq!(engine.state_count(), 1);
1859
1860        let v2 = json!({"action": "test", "User": "bob"});
1861        let event2 = Event::from_value(&v2);
1862        engine.process_event_at(&event2, 1001);
1863        assert_eq!(engine.state_count(), 2);
1864
1865        // Evict everything
1866        engine.evict_expired(2000);
1867        assert_eq!(engine.state_count(), 0);
1868    }
1869
1870    // =========================================================================
1871    // Generate flag
1872    // =========================================================================
1873
1874    #[test]
1875    fn test_generate_flag_default_false() {
1876        let yaml = r#"
1877title: Base
1878id: gen-base
1879logsource:
1880    category: test
1881detection:
1882    selection:
1883        action: test
1884    condition: selection
1885---
1886title: Correlation
1887id: gen-corr
1888correlation:
1889    type: event_count
1890    rules:
1891        - gen-base
1892    group-by:
1893        - User
1894    timespan: 60s
1895    condition:
1896        gte: 1
1897level: high
1898"#;
1899        let collection = parse_sigma_yaml(yaml).unwrap();
1900        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1901        engine.add_collection(&collection).unwrap();
1902
1903        // generate defaults to false — detection matches are still returned
1904        // (filtering by generate flag is a backend concern, not eval)
1905        let v = json!({"action": "test", "User": "alice"});
1906        let event = Event::from_value(&v);
1907        let r = engine.process_event_at(&event, 1000);
1908        assert_eq!(r.detections.len(), 1);
1909        assert_eq!(r.correlations.len(), 1);
1910    }
1911
1912    // =========================================================================
1913    // Real-world example: AWS bucket enumeration
1914    // =========================================================================
1915
1916    #[test]
1917    fn test_aws_bucket_enumeration() {
1918        let yaml = r#"
1919title: Potential Bucket Enumeration on AWS
1920id: f305fd62-beca-47da-ad95-7690a0620084
1921logsource:
1922    product: aws
1923    service: cloudtrail
1924detection:
1925    selection:
1926        eventSource: "s3.amazonaws.com"
1927        eventName: "ListBuckets"
1928    condition: selection
1929level: low
1930---
1931title: Multiple AWS bucket enumerations
1932id: be246094-01d3-4bba-88de-69e582eba0cc
1933status: experimental
1934correlation:
1935    type: event_count
1936    rules:
1937        - f305fd62-beca-47da-ad95-7690a0620084
1938    group-by:
1939        - userIdentity.arn
1940    timespan: 1h
1941    condition:
1942        gte: 5
1943level: high
1944"#;
1945        let collection = parse_sigma_yaml(yaml).unwrap();
1946        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
1947        engine.add_collection(&collection).unwrap();
1948
1949        let base_ts = 1_700_000_000i64;
1950        for i in 0..5 {
1951            let v = json!({
1952                "eventSource": "s3.amazonaws.com",
1953                "eventName": "ListBuckets",
1954                "userIdentity.arn": "arn:aws:iam::123456789:user/attacker"
1955            });
1956            let event = Event::from_value(&v);
1957            let r = engine.process_event_at(&event, base_ts + i * 60);
1958            if i == 4 {
1959                assert_eq!(r.correlations.len(), 1);
1960                assert_eq!(
1961                    r.correlations[0].rule_title,
1962                    "Multiple AWS bucket enumerations"
1963                );
1964                assert_eq!(r.correlations[0].aggregated_value, 5.0);
1965            }
1966        }
1967    }
1968
1969    // =========================================================================
1970    // Chaining: event_count -> temporal_ordered
1971    // =========================================================================
1972
1973    #[test]
1974    fn test_chaining_event_count_to_temporal() {
1975        // Reproduces the spec's "failed logins followed by successful login" example.
1976        // Chain: failed_login (detection) -> many_failed (event_count) -> brute_then_login (temporal_ordered)
1977        let yaml = r#"
1978title: Single failed login
1979id: failed-login-chain
1980name: failed_login
1981logsource:
1982    category: auth
1983detection:
1984    selection:
1985        EventType: failed_login
1986    condition: selection
1987---
1988title: Successful login
1989id: success-login-chain
1990name: successful_login
1991logsource:
1992    category: auth
1993detection:
1994    selection:
1995        EventType: success_login
1996    condition: selection
1997---
1998title: Multiple failed logins
1999id: many-failed-chain
2000name: multiple_failed_login
2001correlation:
2002    type: event_count
2003    rules:
2004        - failed-login-chain
2005    group-by:
2006        - User
2007    timespan: 60s
2008    condition:
2009        gte: 3
2010level: medium
2011---
2012title: Brute Force Followed by Login
2013id: brute-force-chain
2014correlation:
2015    type: temporal_ordered
2016    rules:
2017        - many-failed-chain
2018        - success-login-chain
2019    group-by:
2020        - User
2021    timespan: 120s
2022    condition:
2023        gte: 2
2024level: critical
2025"#;
2026        let collection = parse_sigma_yaml(yaml).unwrap();
2027        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2028        engine.add_collection(&collection).unwrap();
2029
2030        assert_eq!(engine.detection_rule_count(), 2);
2031        assert_eq!(engine.correlation_rule_count(), 2);
2032
2033        let ts = 1000i64;
2034
2035        // Send 3 failed logins → triggers "many_failed_chain"
2036        for i in 0..3 {
2037            let v = json!({"EventType": "failed_login", "User": "victim"});
2038            let event = Event::from_value(&v);
2039            let r = engine.process_event_at(&event, ts + i);
2040            if i == 2 {
2041                // The event_count correlation should fire
2042                assert!(
2043                    r.correlations
2044                        .iter()
2045                        .any(|c| c.rule_title == "Multiple failed logins"),
2046                    "Expected event_count correlation to fire"
2047                );
2048            }
2049        }
2050
2051        // Now send a successful login → should trigger the chained temporal_ordered
2052        // Note: chaining happens in chain_correlations when many-failed-chain fires
2053        // and then success-login-chain matches the detection.
2054        // The temporal_ordered correlation needs BOTH many-failed-chain AND success-login-chain
2055        // to have fired. success-login-chain is a detection rule, not a correlation,
2056        // so it gets matched via the regular detection path.
2057        let v = json!({"EventType": "success_login", "User": "victim"});
2058        let event = Event::from_value(&v);
2059        let r = engine.process_event_at(&event, ts + 30);
2060
2061        // The detection should match
2062        assert_eq!(r.detections.len(), 1);
2063        assert_eq!(r.detections[0].rule_title, "Successful login");
2064    }
2065
2066    // =========================================================================
2067    // Field aliases
2068    // =========================================================================
2069
2070    #[test]
2071    fn test_field_aliases() {
2072        let yaml = r#"
2073title: Internal Error
2074id: internal-error-001
2075name: internal_error
2076logsource:
2077    category: web
2078detection:
2079    selection:
2080        http.response.status_code: 500
2081    condition: selection
2082---
2083title: New Connection
2084id: new-conn-001
2085name: new_network_connection
2086logsource:
2087    category: network
2088detection:
2089    selection:
2090        event.type: connection
2091    condition: selection
2092---
2093title: Error Then Connection
2094id: corr-alias
2095correlation:
2096    type: temporal
2097    rules:
2098        - internal-error-001
2099        - new-conn-001
2100    group-by:
2101        - internal_ip
2102    timespan: 60s
2103    condition:
2104        gte: 2
2105    aliases:
2106        internal_ip:
2107            internal_error: destination.ip
2108            new_network_connection: source.ip
2109level: high
2110"#;
2111        let collection = parse_sigma_yaml(yaml).unwrap();
2112        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2113        engine.add_collection(&collection).unwrap();
2114
2115        let ts = 1000i64;
2116
2117        // Internal error with destination.ip = 10.0.0.5
2118        let v1 = json!({
2119            "http.response.status_code": 500,
2120            "destination.ip": "10.0.0.5"
2121        });
2122        let ev1 = Event::from_value(&v1);
2123        let r1 = engine.process_event_at(&ev1, ts);
2124        assert_eq!(r1.detections.len(), 1);
2125        assert!(r1.correlations.is_empty());
2126
2127        // New connection with source.ip = 10.0.0.5 (same IP, aliased)
2128        let v2 = json!({
2129            "event.type": "connection",
2130            "source.ip": "10.0.0.5"
2131        });
2132        let ev2 = Event::from_value(&v2);
2133        let r2 = engine.process_event_at(&ev2, ts + 5);
2134        assert_eq!(r2.detections.len(), 1);
2135        // Both rules fired for the same internal_ip group → temporal should fire
2136        assert_eq!(r2.correlations.len(), 1);
2137        assert_eq!(r2.correlations[0].rule_title, "Error Then Connection");
2138        // Check group key contains the aliased field
2139        assert!(
2140            r2.correlations[0]
2141                .group_key
2142                .iter()
2143                .any(|(k, v)| k == "internal_ip" && v == "10.0.0.5")
2144        );
2145    }
2146
2147    // =========================================================================
2148    // Value percentile (basic smoke test)
2149    // =========================================================================
2150
2151    #[test]
2152    fn test_value_percentile() {
2153        let yaml = r#"
2154title: Process Creation
2155id: proc-001
2156logsource:
2157    category: process
2158detection:
2159    selection:
2160        type: process_creation
2161    condition: selection
2162---
2163title: Rare Process
2164id: corr-percentile
2165correlation:
2166    type: value_percentile
2167    rules:
2168        - proc-001
2169    group-by:
2170        - ComputerName
2171    timespan: 60s
2172    condition:
2173        field: image
2174        lte: 50
2175level: medium
2176"#;
2177        let collection = parse_sigma_yaml(yaml).unwrap();
2178        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2179        engine.add_collection(&collection).unwrap();
2180
2181        let ts = 1000i64;
2182        // Push some numeric-ish values for the image field
2183        for (i, val) in [10.0, 20.0, 30.0, 40.0, 50.0].iter().enumerate() {
2184            let v = json!({"type": "process_creation", "ComputerName": "srv01", "image": val});
2185            let event = Event::from_value(&v);
2186            let _ = engine.process_event_at(&event, ts + i as i64);
2187        }
2188        // The median (30.0) should be <= 50, so condition fires
2189        // Note: percentile implementation is simplified for in-memory eval
2190    }
2191
2192    // =========================================================================
2193    // Extended temporal conditions (end-to-end)
2194    // =========================================================================
2195
2196    #[test]
2197    fn test_extended_temporal_and_condition() {
2198        // Temporal correlation with "rule_a and rule_b" extended condition
2199        let yaml = r#"
2200title: Login Attempt
2201id: login-attempt
2202logsource:
2203    category: auth
2204detection:
2205    selection:
2206        EventType: login_failure
2207    condition: selection
2208---
2209title: Password Change
2210id: password-change
2211logsource:
2212    category: auth
2213detection:
2214    selection:
2215        EventType: password_change
2216    condition: selection
2217---
2218title: Credential Attack
2219correlation:
2220    type: temporal
2221    rules:
2222        - login-attempt
2223        - password-change
2224    group-by:
2225        - User
2226    timespan: 300s
2227    condition: login-attempt and password-change
2228level: high
2229"#;
2230        let collection = parse_sigma_yaml(yaml).unwrap();
2231        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2232        engine.add_collection(&collection).unwrap();
2233
2234        let ts = 1000i64;
2235
2236        // Login failure by alice
2237        let ev1 = json!({"EventType": "login_failure", "User": "alice"});
2238        let r1 = engine.process_event_at(&Event::from_value(&ev1), ts);
2239        assert!(r1.correlations.is_empty(), "only one rule fired so far");
2240
2241        // Password change by alice — both rules have now fired
2242        let ev2 = json!({"EventType": "password_change", "User": "alice"});
2243        let r2 = engine.process_event_at(&Event::from_value(&ev2), ts + 10);
2244        assert_eq!(
2245            r2.correlations.len(),
2246            1,
2247            "temporal correlation should fire: both rules matched"
2248        );
2249        assert_eq!(r2.correlations[0].rule_title, "Credential Attack");
2250    }
2251
2252    #[test]
2253    fn test_extended_temporal_or_condition() {
2254        // Temporal with "rule_a or rule_b" — should fire when either fires
2255        let yaml = r#"
2256title: SSH Login
2257id: ssh-login
2258logsource:
2259    category: auth
2260detection:
2261    selection:
2262        EventType: ssh_login
2263    condition: selection
2264---
2265title: VPN Login
2266id: vpn-login
2267logsource:
2268    category: auth
2269detection:
2270    selection:
2271        EventType: vpn_login
2272    condition: selection
2273---
2274title: Any Remote Access
2275correlation:
2276    type: temporal
2277    rules:
2278        - ssh-login
2279        - vpn-login
2280    group-by:
2281        - User
2282    timespan: 60s
2283    condition: ssh-login or vpn-login
2284level: medium
2285"#;
2286        let collection = parse_sigma_yaml(yaml).unwrap();
2287        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2288        engine.add_collection(&collection).unwrap();
2289
2290        // Only SSH login by bob — "or" means this suffices
2291        let ev = json!({"EventType": "ssh_login", "User": "bob"});
2292        let r = engine.process_event_at(&Event::from_value(&ev), 1000);
2293        assert_eq!(r.correlations.len(), 1);
2294        assert_eq!(r.correlations[0].rule_title, "Any Remote Access");
2295    }
2296
2297    #[test]
2298    fn test_extended_temporal_partial_and_no_fire() {
2299        // Temporal "and" with only one rule firing should not trigger
2300        let yaml = r#"
2301title: Recon Step 1
2302id: recon-1
2303logsource:
2304    category: process
2305detection:
2306    selection:
2307        CommandLine|contains: 'whoami'
2308    condition: selection
2309---
2310title: Recon Step 2
2311id: recon-2
2312logsource:
2313    category: process
2314detection:
2315    selection:
2316        CommandLine|contains: 'ipconfig'
2317    condition: selection
2318---
2319title: Full Recon
2320correlation:
2321    type: temporal
2322    rules:
2323        - recon-1
2324        - recon-2
2325    group-by:
2326        - Host
2327    timespan: 120s
2328    condition: recon-1 and recon-2
2329level: high
2330"#;
2331        let collection = parse_sigma_yaml(yaml).unwrap();
2332        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2333        engine.add_collection(&collection).unwrap();
2334
2335        // Only whoami (recon-1) — should not fire
2336        let ev = json!({"CommandLine": "whoami", "Host": "srv01"});
2337        let r = engine.process_event_at(&Event::from_value(&ev), 1000);
2338        assert!(r.correlations.is_empty(), "only one of two AND rules fired");
2339
2340        // Now ipconfig (recon-2) — should fire
2341        let ev2 = json!({"CommandLine": "ipconfig /all", "Host": "srv01"});
2342        let r2 = engine.process_event_at(&Event::from_value(&ev2), 1010);
2343        assert_eq!(r2.correlations.len(), 1);
2344        assert_eq!(r2.correlations[0].rule_title, "Full Recon");
2345    }
2346
2347    // =========================================================================
2348    // Filter rules with correlation engine
2349    // =========================================================================
2350
2351    #[test]
2352    fn test_filter_with_correlation() {
2353        // Detection rule + filter + event_count correlation
2354        let yaml = r#"
2355title: Failed Auth
2356id: failed-auth
2357logsource:
2358    category: auth
2359detection:
2360    selection:
2361        EventType: auth_failure
2362    condition: selection
2363---
2364title: Exclude Service Accounts
2365filter:
2366    rules:
2367        - failed-auth
2368    selection:
2369        User|startswith: 'svc_'
2370    condition: selection
2371---
2372title: Brute Force
2373correlation:
2374    type: event_count
2375    rules:
2376        - failed-auth
2377    group-by:
2378        - User
2379    timespan: 300s
2380    condition:
2381        gte: 3
2382level: critical
2383"#;
2384        let collection = parse_sigma_yaml(yaml).unwrap();
2385        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2386        engine.add_collection(&collection).unwrap();
2387
2388        let ts = 1000i64;
2389
2390        // Service account failures should be filtered — don't count
2391        for i in 0..5 {
2392            let ev = json!({"EventType": "auth_failure", "User": "svc_backup"});
2393            let r = engine.process_event_at(&Event::from_value(&ev), ts + i);
2394            assert!(
2395                r.correlations.is_empty(),
2396                "service account should be filtered, no correlation"
2397            );
2398        }
2399
2400        // Normal user failures should count
2401        for i in 0..2 {
2402            let ev = json!({"EventType": "auth_failure", "User": "alice"});
2403            let r = engine.process_event_at(&Event::from_value(&ev), ts + 10 + i);
2404            assert!(r.correlations.is_empty(), "not yet 3 events");
2405        }
2406
2407        // Third failure triggers correlation
2408        let ev = json!({"EventType": "auth_failure", "User": "alice"});
2409        let r = engine.process_event_at(&Event::from_value(&ev), ts + 12);
2410        assert_eq!(r.correlations.len(), 1);
2411        assert_eq!(r.correlations[0].rule_title, "Brute Force");
2412    }
2413
2414    // =========================================================================
2415    // action: repeat with correlation engine
2416    // =========================================================================
2417
2418    #[test]
2419    fn test_repeat_rules_in_correlation() {
2420        // Two detection rules via repeat, both feed into event_count
2421        let yaml = r#"
2422title: File Access A
2423id: file-a
2424logsource:
2425    category: file_access
2426detection:
2427    selection:
2428        FileName|endswith: '.docx'
2429    condition: selection
2430---
2431action: repeat
2432title: File Access B
2433id: file-b
2434detection:
2435    selection:
2436        FileName|endswith: '.xlsx'
2437    condition: selection
2438---
2439title: Mass File Access
2440correlation:
2441    type: event_count
2442    rules:
2443        - file-a
2444        - file-b
2445    group-by:
2446        - User
2447    timespan: 60s
2448    condition:
2449        gte: 3
2450level: high
2451"#;
2452        let collection = parse_sigma_yaml(yaml).unwrap();
2453        assert_eq!(collection.rules.len(), 2);
2454        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2455        engine.add_collection(&collection).unwrap();
2456        assert_eq!(engine.detection_rule_count(), 2);
2457
2458        let ts = 1000i64;
2459        // Mix of docx and xlsx accesses by same user
2460        let ev1 = json!({"FileName": "report.docx", "User": "bob"});
2461        engine.process_event_at(&Event::from_value(&ev1), ts);
2462        let ev2 = json!({"FileName": "data.xlsx", "User": "bob"});
2463        engine.process_event_at(&Event::from_value(&ev2), ts + 1);
2464        let ev3 = json!({"FileName": "notes.docx", "User": "bob"});
2465        let r = engine.process_event_at(&Event::from_value(&ev3), ts + 2);
2466
2467        assert_eq!(r.correlations.len(), 1);
2468        assert_eq!(r.correlations[0].rule_title, "Mass File Access");
2469    }
2470
2471    // =========================================================================
2472    // Expand modifier with correlation engine
2473    // =========================================================================
2474
2475    #[test]
2476    fn test_expand_modifier_with_correlation() {
2477        let yaml = r#"
2478title: User Temp File
2479id: user-temp
2480logsource:
2481    category: file_access
2482detection:
2483    selection:
2484        FilePath|expand: 'C:\Users\%User%\Temp'
2485    condition: selection
2486---
2487title: Excessive Temp Access
2488correlation:
2489    type: event_count
2490    rules:
2491        - user-temp
2492    group-by:
2493        - User
2494    timespan: 60s
2495    condition:
2496        gte: 2
2497level: medium
2498"#;
2499        let collection = parse_sigma_yaml(yaml).unwrap();
2500        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2501        engine.add_collection(&collection).unwrap();
2502
2503        let ts = 1000i64;
2504        // Event where User field matches the placeholder
2505        let ev1 = json!({"FilePath": "C:\\Users\\alice\\Temp", "User": "alice"});
2506        let r1 = engine.process_event_at(&Event::from_value(&ev1), ts);
2507        assert!(r1.correlations.is_empty());
2508
2509        let ev2 = json!({"FilePath": "C:\\Users\\alice\\Temp", "User": "alice"});
2510        let r2 = engine.process_event_at(&Event::from_value(&ev2), ts + 1);
2511        assert_eq!(r2.correlations.len(), 1);
2512        assert_eq!(r2.correlations[0].rule_title, "Excessive Temp Access");
2513
2514        // Different user — should NOT match (path says alice, user is bob)
2515        let ev3 = json!({"FilePath": "C:\\Users\\alice\\Temp", "User": "bob"});
2516        let r3 = engine.process_event_at(&Event::from_value(&ev3), ts + 2);
2517        // Detection doesn't fire for this event since expand resolves to C:\Users\bob\Temp
2518        assert_eq!(r3.detections.len(), 0);
2519    }
2520
2521    // =========================================================================
2522    // Timestamp modifier with correlation engine
2523    // =========================================================================
2524
2525    #[test]
2526    fn test_timestamp_modifier_with_correlation() {
2527        let yaml = r#"
2528title: Night Login
2529id: night-login
2530logsource:
2531    category: auth
2532detection:
2533    login:
2534        EventType: login
2535    night:
2536        Timestamp|hour: 3
2537    condition: login and night
2538---
2539title: Frequent Night Logins
2540correlation:
2541    type: event_count
2542    rules:
2543        - night-login
2544    group-by:
2545        - User
2546    timespan: 3600s
2547    condition:
2548        gte: 2
2549level: high
2550"#;
2551        let collection = parse_sigma_yaml(yaml).unwrap();
2552        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2553        engine.add_collection(&collection).unwrap();
2554
2555        let ts = 1000i64;
2556        // Login at 3AM
2557        let ev1 =
2558            json!({"EventType": "login", "User": "alice", "Timestamp": "2024-01-15T03:10:00Z"});
2559        let r1 = engine.process_event_at(&Event::from_value(&ev1), ts);
2560        assert_eq!(r1.detections.len(), 1);
2561        assert!(r1.correlations.is_empty());
2562
2563        let ev2 =
2564            json!({"EventType": "login", "User": "alice", "Timestamp": "2024-01-15T03:45:00Z"});
2565        let r2 = engine.process_event_at(&Event::from_value(&ev2), ts + 1);
2566        assert_eq!(r2.correlations.len(), 1);
2567        assert_eq!(r2.correlations[0].rule_title, "Frequent Night Logins");
2568
2569        // Login at noon — should NOT count
2570        let ev3 = json!({"EventType": "login", "User": "bob", "Timestamp": "2024-01-15T12:00:00Z"});
2571        let r3 = engine.process_event_at(&Event::from_value(&ev3), ts + 2);
2572        assert!(
2573            r3.detections.is_empty(),
2574            "noon login should not match night rule"
2575        );
2576    }
2577
2578    // =========================================================================
2579    // Correlation condition range (multiple predicates)
2580    // =========================================================================
2581
2582    #[test]
2583    fn test_event_count_range_condition() {
2584        let yaml = r#"
2585title: Login Attempt
2586id: login-attempt-001
2587name: login_attempt
2588logsource:
2589    product: windows
2590detection:
2591    selection:
2592        EventType: login
2593    condition: selection
2594level: low
2595---
2596title: Login Count Range
2597id: corr-range-001
2598correlation:
2599    type: event_count
2600    rules:
2601        - login-attempt-001
2602    group-by:
2603        - User
2604    timespan: 3600s
2605    condition:
2606        gt: 2
2607        lte: 5
2608level: high
2609"#;
2610        let collection = parse_sigma_yaml(yaml).unwrap();
2611        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2612        engine.add_collection(&collection).unwrap();
2613
2614        let ts: i64 = 1_000_000;
2615
2616        // Send 2 events — gt:2 is false
2617        for i in 0..2 {
2618            let ev = json!({"EventType": "login", "User": "alice"});
2619            let r = engine.process_event_at(&Event::from_value(&ev), ts + i);
2620            assert!(r.correlations.is_empty(), "2 events should not fire (gt:2)");
2621        }
2622
2623        // 3rd event — gt:2 is true, lte:5 is true → fires
2624        let ev3 = json!({"EventType": "login", "User": "alice"});
2625        let r3 = engine.process_event_at(&Event::from_value(&ev3), ts + 3);
2626        assert_eq!(r3.correlations.len(), 1, "3 events: gt:2 AND lte:5");
2627
2628        // Send events 4, 5 — still in range
2629        for i in 4..=5 {
2630            let ev = json!({"EventType": "login", "User": "alice"});
2631            let r = engine.process_event_at(&Event::from_value(&ev), ts + i);
2632            assert_eq!(r.correlations.len(), 1, "{i} events still in range");
2633        }
2634
2635        // 6th event — lte:5 is false → no fire
2636        let ev6 = json!({"EventType": "login", "User": "alice"});
2637        let r6 = engine.process_event_at(&Event::from_value(&ev6), ts + 6);
2638        assert!(
2639            r6.correlations.is_empty(),
2640            "6 events exceeds lte:5, should not fire"
2641        );
2642    }
2643
2644    // =========================================================================
2645    // Suppression
2646    // =========================================================================
2647
2648    fn suppression_yaml() -> &'static str {
2649        r#"
2650title: Login
2651id: login-base
2652logsource:
2653    category: auth
2654detection:
2655    selection:
2656        EventType: login
2657    condition: selection
2658---
2659title: Many Logins
2660correlation:
2661    type: event_count
2662    rules:
2663        - login-base
2664    group-by:
2665        - User
2666    timeframe: 60s
2667    condition:
2668        gte: 3
2669level: high
2670"#
2671    }
2672
2673    #[test]
2674    fn test_suppression_window() {
2675        let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
2676        let config = CorrelationConfig {
2677            suppress: Some(10), // suppress for 10 seconds
2678            ..Default::default()
2679        };
2680        let mut engine = CorrelationEngine::new(config);
2681        engine.add_collection(&collection).unwrap();
2682
2683        let ev = json!({"EventType": "login", "User": "alice"});
2684        let ts = 1000;
2685
2686        // Fire 3 events to hit threshold
2687        engine.process_event_at(&Event::from_value(&ev), ts);
2688        engine.process_event_at(&Event::from_value(&ev), ts + 1);
2689        let r3 = engine.process_event_at(&Event::from_value(&ev), ts + 2);
2690        assert_eq!(r3.correlations.len(), 1, "should fire on 3rd event");
2691
2692        // 4th event within suppress window → suppressed
2693        let r4 = engine.process_event_at(&Event::from_value(&ev), ts + 3);
2694        assert!(
2695            r4.correlations.is_empty(),
2696            "should be suppressed within 10s window"
2697        );
2698
2699        // 5th event still within suppress window → suppressed
2700        let r5 = engine.process_event_at(&Event::from_value(&ev), ts + 9);
2701        assert!(
2702            r5.correlations.is_empty(),
2703            "should be suppressed at ts+9 (< ts+2+10)"
2704        );
2705
2706        // Event after suppress window expires → fires again
2707        let r6 = engine.process_event_at(&Event::from_value(&ev), ts + 13);
2708        assert_eq!(
2709            r6.correlations.len(),
2710            1,
2711            "should fire again after suppress window expires"
2712        );
2713    }
2714
2715    #[test]
2716    fn test_suppression_per_group_key() {
2717        let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
2718        let config = CorrelationConfig {
2719            suppress: Some(60),
2720            ..Default::default()
2721        };
2722        let mut engine = CorrelationEngine::new(config);
2723        engine.add_collection(&collection).unwrap();
2724
2725        let ts = 1000;
2726
2727        // Alice hits threshold
2728        let ev_a = json!({"EventType": "login", "User": "alice"});
2729        engine.process_event_at(&Event::from_value(&ev_a), ts);
2730        engine.process_event_at(&Event::from_value(&ev_a), ts + 1);
2731        let r = engine.process_event_at(&Event::from_value(&ev_a), ts + 2);
2732        assert_eq!(r.correlations.len(), 1, "alice should fire");
2733
2734        // Bob hits threshold — different group key, not suppressed
2735        let ev_b = json!({"EventType": "login", "User": "bob"});
2736        engine.process_event_at(&Event::from_value(&ev_b), ts + 3);
2737        engine.process_event_at(&Event::from_value(&ev_b), ts + 4);
2738        let r = engine.process_event_at(&Event::from_value(&ev_b), ts + 5);
2739        assert_eq!(r.correlations.len(), 1, "bob should fire independently");
2740
2741        // Alice is still suppressed
2742        let r = engine.process_event_at(&Event::from_value(&ev_a), ts + 6);
2743        assert!(r.correlations.is_empty(), "alice still suppressed");
2744    }
2745
2746    // =========================================================================
2747    // Action on match: Reset
2748    // =========================================================================
2749
2750    #[test]
2751    fn test_action_reset() {
2752        let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
2753        let config = CorrelationConfig {
2754            action_on_match: CorrelationAction::Reset,
2755            ..Default::default()
2756        };
2757        let mut engine = CorrelationEngine::new(config);
2758        engine.add_collection(&collection).unwrap();
2759
2760        let ev = json!({"EventType": "login", "User": "alice"});
2761        let ts = 1000;
2762
2763        // Hit threshold: 3 events
2764        engine.process_event_at(&Event::from_value(&ev), ts);
2765        engine.process_event_at(&Event::from_value(&ev), ts + 1);
2766        let r3 = engine.process_event_at(&Event::from_value(&ev), ts + 2);
2767        assert_eq!(r3.correlations.len(), 1, "should fire on 3rd event");
2768
2769        // State was reset, so 4th and 5th events should NOT fire
2770        let r4 = engine.process_event_at(&Event::from_value(&ev), ts + 3);
2771        assert!(r4.correlations.is_empty(), "reset: need 3 more events");
2772
2773        let r5 = engine.process_event_at(&Event::from_value(&ev), ts + 4);
2774        assert!(r5.correlations.is_empty(), "reset: still only 2");
2775
2776        // 6th event (3rd after reset) should fire again
2777        let r6 = engine.process_event_at(&Event::from_value(&ev), ts + 5);
2778        assert_eq!(
2779            r6.correlations.len(),
2780            1,
2781            "should fire again after 3 events post-reset"
2782        );
2783    }
2784
2785    // =========================================================================
2786    // Generate flag / emit_detections
2787    // =========================================================================
2788
2789    #[test]
2790    fn test_emit_detections_true_by_default() {
2791        let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
2792        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2793        engine.add_collection(&collection).unwrap();
2794
2795        let ev = json!({"EventType": "login", "User": "alice"});
2796        let r = engine.process_event_at(&Event::from_value(&ev), 1000);
2797        assert_eq!(r.detections.len(), 1, "by default detections are emitted");
2798    }
2799
2800    #[test]
2801    fn test_emit_detections_false_suppresses() {
2802        let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
2803        let config = CorrelationConfig {
2804            emit_detections: false,
2805            ..Default::default()
2806        };
2807        let mut engine = CorrelationEngine::new(config);
2808        engine.add_collection(&collection).unwrap();
2809
2810        let ev = json!({"EventType": "login", "User": "alice"});
2811        let r = engine.process_event_at(&Event::from_value(&ev), 1000);
2812        assert!(
2813            r.detections.is_empty(),
2814            "detection matches should be suppressed when emit_detections=false"
2815        );
2816    }
2817
2818    #[test]
2819    fn test_generate_true_keeps_detections() {
2820        // When generate: true, detections should be emitted even with emit_detections=false
2821        let yaml = r#"
2822title: Login
2823id: login-gen
2824logsource:
2825    category: auth
2826detection:
2827    selection:
2828        EventType: login
2829    condition: selection
2830---
2831title: Many Logins
2832correlation:
2833    type: event_count
2834    rules:
2835        - login-gen
2836    group-by:
2837        - User
2838    timeframe: 60s
2839    condition:
2840        gte: 3
2841    generate: true
2842level: high
2843"#;
2844        let collection = parse_sigma_yaml(yaml).unwrap();
2845        let config = CorrelationConfig {
2846            emit_detections: false,
2847            ..Default::default()
2848        };
2849        let mut engine = CorrelationEngine::new(config);
2850        engine.add_collection(&collection).unwrap();
2851
2852        let ev = json!({"EventType": "login", "User": "alice"});
2853        let r = engine.process_event_at(&Event::from_value(&ev), 1000);
2854        // generate: true means this rule is NOT correlation-only
2855        assert_eq!(
2856            r.detections.len(),
2857            1,
2858            "generate:true keeps detection output"
2859        );
2860    }
2861
2862    // =========================================================================
2863    // Suppression + Reset combined
2864    // =========================================================================
2865
2866    #[test]
2867    fn test_suppress_and_reset_combined() {
2868        let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
2869        let config = CorrelationConfig {
2870            suppress: Some(5),
2871            action_on_match: CorrelationAction::Reset,
2872            ..Default::default()
2873        };
2874        let mut engine = CorrelationEngine::new(config);
2875        engine.add_collection(&collection).unwrap();
2876
2877        let ev = json!({"EventType": "login", "User": "alice"});
2878        let ts = 1000;
2879
2880        // Hit threshold: fires and resets
2881        engine.process_event_at(&Event::from_value(&ev), ts);
2882        engine.process_event_at(&Event::from_value(&ev), ts + 1);
2883        let r3 = engine.process_event_at(&Event::from_value(&ev), ts + 2);
2884        assert_eq!(r3.correlations.len(), 1, "fires on 3rd event");
2885
2886        // Push 3 more events quickly (state was reset, so new count → 3)
2887        // but suppress window hasn't expired (ts+2 + 5 = ts+7)
2888        engine.process_event_at(&Event::from_value(&ev), ts + 3);
2889        engine.process_event_at(&Event::from_value(&ev), ts + 4);
2890        let r = engine.process_event_at(&Event::from_value(&ev), ts + 5);
2891        assert!(
2892            r.correlations.is_empty(),
2893            "threshold met again but still suppressed"
2894        );
2895
2896        // After suppress expires (at ts+8, which is ts+2+6 > suppress=5),
2897        // the accumulated events from step 2 (ts+3,4,5) still satisfy gte:3,
2898        // so the first event after expiry fires immediately and resets.
2899        let r = engine.process_event_at(&Event::from_value(&ev), ts + 8);
2900        assert_eq!(
2901            r.correlations.len(),
2902            1,
2903            "fires after suppress expires (accumulated events + new one)"
2904        );
2905
2906        // State was reset again at ts+8, suppress window now ts+8..ts+13.
2907        // Need 3 new events to fire, and suppress must expire.
2908        engine.process_event_at(&Event::from_value(&ev), ts + 9);
2909        engine.process_event_at(&Event::from_value(&ev), ts + 10);
2910        let r = engine.process_event_at(&Event::from_value(&ev), ts + 11);
2911        assert!(
2912            r.correlations.is_empty(),
2913            "threshold met but suppress window hasn't expired (ts+11 - ts+8 = 3 < 5)"
2914        );
2915    }
2916
2917    // =========================================================================
2918    // No suppression (default behavior preserved)
2919    // =========================================================================
2920
2921    #[test]
2922    fn test_no_suppression_fires_every_event() {
2923        let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
2924        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2925        engine.add_collection(&collection).unwrap();
2926
2927        let ev = json!({"EventType": "login", "User": "alice"});
2928        let ts = 1000;
2929
2930        engine.process_event_at(&Event::from_value(&ev), ts);
2931        engine.process_event_at(&Event::from_value(&ev), ts + 1);
2932        let r3 = engine.process_event_at(&Event::from_value(&ev), ts + 2);
2933        assert_eq!(r3.correlations.len(), 1);
2934
2935        // Without suppression, 4th event should also fire
2936        let r4 = engine.process_event_at(&Event::from_value(&ev), ts + 3);
2937        assert_eq!(
2938            r4.correlations.len(),
2939            1,
2940            "no suppression: fires on every event after threshold"
2941        );
2942
2943        let r5 = engine.process_event_at(&Event::from_value(&ev), ts + 4);
2944        assert_eq!(r5.correlations.len(), 1, "still fires");
2945    }
2946
2947    // =========================================================================
2948    // Custom attribute → engine config tests
2949    // =========================================================================
2950
2951    #[test]
2952    fn test_custom_attr_timestamp_field() {
2953        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2954        let mut attrs = std::collections::HashMap::new();
2955        attrs.insert("rsigma.timestamp_field".to_string(), "time".to_string());
2956        engine.apply_custom_attributes(&attrs);
2957
2958        assert_eq!(
2959            engine.config.timestamp_fields[0], "time",
2960            "rsigma.timestamp_field should be prepended"
2961        );
2962        // Defaults should still be there after the custom one
2963        assert!(
2964            engine
2965                .config
2966                .timestamp_fields
2967                .contains(&"@timestamp".to_string())
2968        );
2969    }
2970
2971    #[test]
2972    fn test_custom_attr_timestamp_field_no_duplicates() {
2973        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2974        let mut attrs = std::collections::HashMap::new();
2975        attrs.insert("rsigma.timestamp_field".to_string(), "time".to_string());
2976        // Apply twice — should not duplicate
2977        engine.apply_custom_attributes(&attrs);
2978        engine.apply_custom_attributes(&attrs);
2979
2980        let count = engine
2981            .config
2982            .timestamp_fields
2983            .iter()
2984            .filter(|f| *f == "time")
2985            .count();
2986        assert_eq!(count, 1, "should not duplicate timestamp_field entries");
2987    }
2988
2989    #[test]
2990    fn test_custom_attr_suppress() {
2991        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
2992        assert!(engine.config.suppress.is_none());
2993
2994        let mut attrs = std::collections::HashMap::new();
2995        attrs.insert("rsigma.suppress".to_string(), "5m".to_string());
2996        engine.apply_custom_attributes(&attrs);
2997
2998        assert_eq!(engine.config.suppress, Some(300));
2999    }
3000
3001    #[test]
3002    fn test_custom_attr_suppress_does_not_override_cli() {
3003        let config = CorrelationConfig {
3004            suppress: Some(60), // CLI set to 60s
3005            ..Default::default()
3006        };
3007        let mut engine = CorrelationEngine::new(config);
3008
3009        let mut attrs = std::collections::HashMap::new();
3010        attrs.insert("rsigma.suppress".to_string(), "5m".to_string());
3011        engine.apply_custom_attributes(&attrs);
3012
3013        assert_eq!(
3014            engine.config.suppress,
3015            Some(60),
3016            "CLI suppress should not be overridden by custom attribute"
3017        );
3018    }
3019
3020    #[test]
3021    fn test_custom_attr_action() {
3022        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3023        assert_eq!(engine.config.action_on_match, CorrelationAction::Alert);
3024
3025        let mut attrs = std::collections::HashMap::new();
3026        attrs.insert("rsigma.action".to_string(), "reset".to_string());
3027        engine.apply_custom_attributes(&attrs);
3028
3029        assert_eq!(engine.config.action_on_match, CorrelationAction::Reset);
3030    }
3031
3032    #[test]
3033    fn test_custom_attr_action_does_not_override_cli() {
3034        let config = CorrelationConfig {
3035            action_on_match: CorrelationAction::Reset, // CLI set to reset
3036            ..Default::default()
3037        };
3038        let mut engine = CorrelationEngine::new(config);
3039
3040        let mut attrs = std::collections::HashMap::new();
3041        attrs.insert("rsigma.action".to_string(), "alert".to_string());
3042        engine.apply_custom_attributes(&attrs);
3043
3044        assert_eq!(
3045            engine.config.action_on_match,
3046            CorrelationAction::Reset,
3047            "CLI action should not be overridden by custom attribute"
3048        );
3049    }
3050
3051    #[test]
3052    fn test_custom_attr_timestamp_field_used_for_extraction() {
3053        // The event has "time" but not "@timestamp" or "timestamp"
3054        let collection = parse_sigma_yaml(suppression_yaml()).unwrap();
3055        let mut config = CorrelationConfig::default();
3056        // Prepend "event_time" to simulate --timestamp-field
3057        config.timestamp_fields.insert(0, "event_time".to_string());
3058        let mut engine = CorrelationEngine::new(config);
3059        engine.add_collection(&collection).unwrap();
3060
3061        // Event with "event_time" field
3062        let ev = json!({
3063            "EventType": "login",
3064            "User": "alice",
3065            "event_time": "2026-02-11T12:00:00Z"
3066        });
3067        let result = engine.process_event(&Event::from_value(&ev));
3068
3069        // The detection should match, and timestamp should be ~1739275200 (2026-02-11)
3070        assert!(!result.detections.is_empty() || result.correlations.is_empty());
3071        // The key test: ensure the engine extracted the event timestamp, not Utc::now.
3072        // If it used Utc::now, the test would still pass but the timestamp would be
3073        // wildly different. We verify by checking the extracted value directly.
3074        let ts = engine
3075            .extract_event_timestamp(&Event::from_value(&ev))
3076            .expect("should extract timestamp");
3077        assert!(
3078            ts > 1_700_000_000 && ts < 1_800_000_000,
3079            "timestamp should be ~2026 epoch, got {ts}"
3080        );
3081    }
3082
3083    // =========================================================================
3084    // Cycle detection
3085    // =========================================================================
3086
3087    #[test]
3088    fn test_correlation_cycle_direct() {
3089        // Two correlations that reference each other: A -> B -> A
3090        let yaml = r#"
3091title: detection rule
3092id: det-rule
3093logsource:
3094    product: test
3095detection:
3096    selection:
3097        action: click
3098    condition: selection
3099level: low
3100---
3101title: correlation A
3102id: corr-a
3103correlation:
3104    type: event_count
3105    rules:
3106        - corr-b
3107    group-by:
3108        - User
3109    timespan: 5m
3110    condition:
3111        gte: 2
3112level: high
3113---
3114title: correlation B
3115id: corr-b
3116correlation:
3117    type: event_count
3118    rules:
3119        - corr-a
3120    group-by:
3121        - User
3122    timespan: 5m
3123    condition:
3124        gte: 2
3125level: high
3126"#;
3127        let collection = parse_sigma_yaml(yaml).unwrap();
3128        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3129        let result = engine.add_collection(&collection);
3130        assert!(result.is_err(), "should detect direct cycle");
3131        let err = result.unwrap_err().to_string();
3132        assert!(err.contains("cycle"), "error should mention cycle: {err}");
3133        assert!(
3134            err.contains("corr-a") && err.contains("corr-b"),
3135            "error should name both correlations: {err}"
3136        );
3137    }
3138
3139    #[test]
3140    fn test_correlation_cycle_self() {
3141        // A correlation that references itself
3142        let yaml = r#"
3143title: detection rule
3144id: det-rule
3145logsource:
3146    product: test
3147detection:
3148    selection:
3149        action: click
3150    condition: selection
3151level: low
3152---
3153title: self-ref correlation
3154id: self-corr
3155correlation:
3156    type: event_count
3157    rules:
3158        - self-corr
3159    group-by:
3160        - User
3161    timespan: 5m
3162    condition:
3163        gte: 2
3164level: high
3165"#;
3166        let collection = parse_sigma_yaml(yaml).unwrap();
3167        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3168        let result = engine.add_collection(&collection);
3169        assert!(result.is_err(), "should detect self-referencing cycle");
3170        let err = result.unwrap_err().to_string();
3171        assert!(err.contains("cycle"), "error should mention cycle: {err}");
3172        assert!(
3173            err.contains("self-corr"),
3174            "error should name the correlation: {err}"
3175        );
3176    }
3177
3178    #[test]
3179    fn test_correlation_no_cycle_valid_chain() {
3180        // Valid chain: detection -> corr-A -> corr-B (no cycle)
3181        let yaml = r#"
3182title: detection rule
3183id: det-rule
3184logsource:
3185    product: test
3186detection:
3187    selection:
3188        action: click
3189    condition: selection
3190level: low
3191---
3192title: correlation A
3193id: corr-a
3194correlation:
3195    type: event_count
3196    rules:
3197        - det-rule
3198    group-by:
3199        - User
3200    timespan: 5m
3201    condition:
3202        gte: 2
3203level: high
3204---
3205title: correlation B
3206id: corr-b
3207correlation:
3208    type: event_count
3209    rules:
3210        - corr-a
3211    group-by:
3212        - User
3213    timespan: 5m
3214    condition:
3215        gte: 2
3216level: high
3217"#;
3218        let collection = parse_sigma_yaml(yaml).unwrap();
3219        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3220        let result = engine.add_collection(&collection);
3221        assert!(
3222            result.is_ok(),
3223            "valid chain should not be rejected: {result:?}"
3224        );
3225    }
3226
3227    #[test]
3228    fn test_correlation_cycle_transitive() {
3229        // Transitive cycle: A -> B -> C -> A
3230        let yaml = r#"
3231title: detection rule
3232id: det-rule
3233logsource:
3234    product: test
3235detection:
3236    selection:
3237        action: click
3238    condition: selection
3239level: low
3240---
3241title: correlation A
3242id: corr-a
3243correlation:
3244    type: event_count
3245    rules:
3246        - corr-c
3247    group-by:
3248        - User
3249    timespan: 5m
3250    condition:
3251        gte: 2
3252level: high
3253---
3254title: correlation B
3255id: corr-b
3256correlation:
3257    type: event_count
3258    rules:
3259        - corr-a
3260    group-by:
3261        - User
3262    timespan: 5m
3263    condition:
3264        gte: 2
3265level: high
3266---
3267title: correlation C
3268id: corr-c
3269correlation:
3270    type: event_count
3271    rules:
3272        - corr-b
3273    group-by:
3274        - User
3275    timespan: 5m
3276    condition:
3277        gte: 2
3278level: high
3279"#;
3280        let collection = parse_sigma_yaml(yaml).unwrap();
3281        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3282        let result = engine.add_collection(&collection);
3283        assert!(result.is_err(), "should detect transitive cycle");
3284        let err = result.unwrap_err().to_string();
3285        assert!(err.contains("cycle"), "error should mention cycle: {err}");
3286    }
3287
3288    // =========================================================================
3289    // Correlation event inclusion tests
3290    // =========================================================================
3291
3292    #[test]
3293    fn test_correlation_events_disabled_by_default() {
3294        let yaml = r#"
3295title: Login
3296id: login-rule
3297logsource:
3298    category: auth
3299detection:
3300    selection:
3301        EventType: login
3302    condition: selection
3303---
3304title: Many Logins
3305correlation:
3306    type: event_count
3307    rules:
3308        - login-rule
3309    group-by:
3310        - User
3311    timespan: 60s
3312    condition:
3313        gte: 3
3314level: high
3315"#;
3316        let collection = parse_sigma_yaml(yaml).unwrap();
3317        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3318        engine.add_collection(&collection).unwrap();
3319
3320        for i in 0..3 {
3321            let v = json!({"EventType": "login", "User": "admin", "@timestamp": 1000 + i});
3322            let event = Event::from_value(&v);
3323            let result = engine.process_event_at(&event, 1000 + i);
3324            if i == 2 {
3325                assert_eq!(result.correlations.len(), 1);
3326                // Events should NOT be included by default
3327                assert!(result.correlations[0].events.is_none());
3328            }
3329        }
3330    }
3331
3332    #[test]
3333    fn test_correlation_events_included_when_enabled() {
3334        let yaml = r#"
3335title: Login
3336id: login-rule
3337logsource:
3338    category: auth
3339detection:
3340    selection:
3341        EventType: login
3342    condition: selection
3343---
3344title: Many Logins
3345correlation:
3346    type: event_count
3347    rules:
3348        - login-rule
3349    group-by:
3350        - User
3351    timespan: 60s
3352    condition:
3353        gte: 3
3354level: high
3355"#;
3356        let collection = parse_sigma_yaml(yaml).unwrap();
3357        let config = CorrelationConfig {
3358            correlation_event_mode: CorrelationEventMode::Full,
3359            max_correlation_events: 10,
3360            ..Default::default()
3361        };
3362        let mut engine = CorrelationEngine::new(config);
3363        engine.add_collection(&collection).unwrap();
3364
3365        let events_sent: Vec<serde_json::Value> = (0..3)
3366            .map(|i| json!({"EventType": "login", "User": "admin", "@timestamp": 1000 + i}))
3367            .collect();
3368
3369        let mut corr_result = None;
3370        for (i, ev) in events_sent.iter().enumerate() {
3371            let event = Event::from_value(ev);
3372            let result = engine.process_event_at(&event, 1000 + i as i64);
3373            if !result.correlations.is_empty() {
3374                corr_result = Some(result);
3375            }
3376        }
3377
3378        let result = corr_result.expect("correlation should have fired");
3379        let corr = &result.correlations[0];
3380
3381        // Events should be included
3382        let events = corr.events.as_ref().expect("events should be present");
3383        assert_eq!(
3384            events.len(),
3385            3,
3386            "all 3 contributing events should be stored"
3387        );
3388
3389        // Verify all sent events are present
3390        for (i, event) in events.iter().enumerate() {
3391            assert_eq!(event["EventType"], "login");
3392            assert_eq!(event["User"], "admin");
3393            assert_eq!(event["@timestamp"], 1000 + i as i64);
3394        }
3395    }
3396
3397    #[test]
3398    fn test_correlation_events_max_cap() {
3399        let yaml = r#"
3400title: Login
3401id: login-rule
3402logsource:
3403    category: auth
3404detection:
3405    selection:
3406        EventType: login
3407    condition: selection
3408---
3409title: Many Logins
3410correlation:
3411    type: event_count
3412    rules:
3413        - login-rule
3414    group-by:
3415        - User
3416    timespan: 60s
3417    condition:
3418        gte: 5
3419level: high
3420"#;
3421        let collection = parse_sigma_yaml(yaml).unwrap();
3422        let config = CorrelationConfig {
3423            correlation_event_mode: CorrelationEventMode::Full,
3424            max_correlation_events: 3, // only keep last 3
3425            ..Default::default()
3426        };
3427        let mut engine = CorrelationEngine::new(config);
3428        engine.add_collection(&collection).unwrap();
3429
3430        let mut corr_result = None;
3431        for i in 0..5 {
3432            let v = json!({"EventType": "login", "User": "admin", "idx": i});
3433            let event = Event::from_value(&v);
3434            let result = engine.process_event_at(&event, 1000 + i);
3435            if !result.correlations.is_empty() {
3436                corr_result = Some(result);
3437            }
3438        }
3439
3440        let result = corr_result.expect("correlation should have fired");
3441        let events = result.correlations[0]
3442            .events
3443            .as_ref()
3444            .expect("events should be present");
3445
3446        // Only the last 3 events should be retained (cap = 3)
3447        assert_eq!(events.len(), 3);
3448        assert_eq!(events[0]["idx"], 2);
3449        assert_eq!(events[1]["idx"], 3);
3450        assert_eq!(events[2]["idx"], 4);
3451    }
3452
3453    #[test]
3454    fn test_correlation_events_with_reset_action() {
3455        let yaml = r#"
3456title: Login
3457id: login-rule
3458logsource:
3459    category: auth
3460detection:
3461    selection:
3462        EventType: login
3463    condition: selection
3464---
3465title: Many Logins
3466correlation:
3467    type: event_count
3468    rules:
3469        - login-rule
3470    group-by:
3471        - User
3472    timespan: 60s
3473    condition:
3474        gte: 2
3475level: high
3476"#;
3477        let collection = parse_sigma_yaml(yaml).unwrap();
3478        let config = CorrelationConfig {
3479            correlation_event_mode: CorrelationEventMode::Full,
3480            action_on_match: CorrelationAction::Reset,
3481            ..Default::default()
3482        };
3483        let mut engine = CorrelationEngine::new(config);
3484        engine.add_collection(&collection).unwrap();
3485
3486        // First round: 2 events -> fires
3487        for i in 0..2 {
3488            let v = json!({"EventType": "login", "User": "admin", "round": 1, "idx": i});
3489            let event = Event::from_value(&v);
3490            let result = engine.process_event_at(&event, 1000 + i);
3491            if i == 1 {
3492                assert_eq!(result.correlations.len(), 1);
3493                let events = result.correlations[0].events.as_ref().unwrap();
3494                assert_eq!(events.len(), 2);
3495            }
3496        }
3497
3498        // After reset, event buffer should be cleared.
3499        // Second round: need 2 more events to fire again
3500        let v = json!({"EventType": "login", "User": "admin", "round": 2, "idx": 0});
3501        let event = Event::from_value(&v);
3502        let result = engine.process_event_at(&event, 1010);
3503        assert!(
3504            result.correlations.is_empty(),
3505            "should not fire with only 1 event after reset"
3506        );
3507
3508        let v = json!({"EventType": "login", "User": "admin", "round": 2, "idx": 1});
3509        let event = Event::from_value(&v);
3510        let result = engine.process_event_at(&event, 1011);
3511        assert_eq!(result.correlations.len(), 1);
3512        let events = result.correlations[0].events.as_ref().unwrap();
3513        assert_eq!(events.len(), 2);
3514        // Should only have round 2 events
3515        assert_eq!(events[0]["round"], 2);
3516        assert_eq!(events[1]["round"], 2);
3517    }
3518
3519    #[test]
3520    fn test_correlation_events_with_set_include() {
3521        let yaml = r#"
3522title: Login
3523id: login-rule
3524logsource:
3525    category: auth
3526detection:
3527    selection:
3528        EventType: login
3529    condition: selection
3530---
3531title: Many Logins
3532correlation:
3533    type: event_count
3534    rules:
3535        - login-rule
3536    group-by:
3537        - User
3538    timespan: 60s
3539    condition:
3540        gte: 2
3541level: high
3542"#;
3543        let collection = parse_sigma_yaml(yaml).unwrap();
3544        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3545        engine.add_collection(&collection).unwrap();
3546
3547        // Enable via setter
3548        engine.set_correlation_event_mode(CorrelationEventMode::Full);
3549
3550        for i in 0..2 {
3551            let v = json!({"EventType": "login", "User": "admin"});
3552            let event = Event::from_value(&v);
3553            let result = engine.process_event_at(&event, 1000 + i);
3554            if i == 1 {
3555                assert_eq!(result.correlations.len(), 1);
3556                assert!(result.correlations[0].events.is_some());
3557                assert_eq!(result.correlations[0].events.as_ref().unwrap().len(), 2);
3558            }
3559        }
3560    }
3561
3562    #[test]
3563    fn test_correlation_events_eviction_syncs_with_window() {
3564        let yaml = r#"
3565title: Login
3566id: login-rule
3567logsource:
3568    category: auth
3569detection:
3570    selection:
3571        EventType: login
3572    condition: selection
3573---
3574title: Many Logins
3575correlation:
3576    type: event_count
3577    rules:
3578        - login-rule
3579    group-by:
3580        - User
3581    timespan: 10s
3582    condition:
3583        gte: 3
3584level: high
3585"#;
3586        let collection = parse_sigma_yaml(yaml).unwrap();
3587        let config = CorrelationConfig {
3588            correlation_event_mode: CorrelationEventMode::Full,
3589            max_correlation_events: 100,
3590            ..Default::default()
3591        };
3592        let mut engine = CorrelationEngine::new(config);
3593        engine.add_collection(&collection).unwrap();
3594
3595        // Push 2 events at ts=1000,1001 — within the 10s window
3596        for i in 0..2 {
3597            let v = json!({"EventType": "login", "User": "admin", "idx": i});
3598            let event = Event::from_value(&v);
3599            engine.process_event_at(&event, 1000 + i);
3600        }
3601
3602        // Push 1 more event at ts=1015 — the first 2 events are now outside the
3603        // 10s window (cutoff = 1015 - 10 = 1005)
3604        let v = json!({"EventType": "login", "User": "admin", "idx": 2});
3605        let event = Event::from_value(&v);
3606        let result = engine.process_event_at(&event, 1015);
3607        // Should NOT fire: only 1 event in window (the one at ts=1015)
3608        assert!(
3609            result.correlations.is_empty(),
3610            "should not fire — old events evicted"
3611        );
3612
3613        // Push 2 more to reach threshold
3614        for i in 3..5 {
3615            let v = json!({"EventType": "login", "User": "admin", "idx": i});
3616            let event = Event::from_value(&v);
3617            let result = engine.process_event_at(&event, 1016 + i - 3);
3618            if i == 4 {
3619                assert_eq!(result.correlations.len(), 1);
3620                let events = result.correlations[0].events.as_ref().unwrap();
3621                // Should have events from ts=1015,1016,1017 — not the old ones
3622                assert_eq!(events.len(), 3);
3623                for ev in events {
3624                    assert!(ev["idx"].as_i64().unwrap() >= 2);
3625                }
3626            }
3627        }
3628    }
3629
3630    #[test]
3631    fn test_event_buffer_monitoring() {
3632        let yaml = r#"
3633title: Login
3634id: login-rule
3635logsource:
3636    category: auth
3637detection:
3638    selection:
3639        EventType: login
3640    condition: selection
3641---
3642title: Many Logins
3643correlation:
3644    type: event_count
3645    rules:
3646        - login-rule
3647    group-by:
3648        - User
3649    timespan: 60s
3650    condition:
3651        gte: 100
3652level: high
3653"#;
3654        let collection = parse_sigma_yaml(yaml).unwrap();
3655        let config = CorrelationConfig {
3656            correlation_event_mode: CorrelationEventMode::Full,
3657            ..Default::default()
3658        };
3659        let mut engine = CorrelationEngine::new(config);
3660        engine.add_collection(&collection).unwrap();
3661
3662        assert_eq!(engine.event_buffer_count(), 0);
3663        assert_eq!(engine.event_buffer_bytes(), 0);
3664
3665        // Push some events
3666        for i in 0..5 {
3667            let v = json!({"EventType": "login", "User": "admin"});
3668            let event = Event::from_value(&v);
3669            engine.process_event_at(&event, 1000 + i);
3670        }
3671
3672        assert_eq!(engine.event_buffer_count(), 1); // one group key
3673        assert!(engine.event_buffer_bytes() > 0);
3674    }
3675
3676    #[test]
3677    fn test_correlation_refs_mode_basic() {
3678        let yaml = r#"
3679title: Login
3680id: login-rule
3681logsource:
3682    category: auth
3683detection:
3684    selection:
3685        EventType: login
3686    condition: selection
3687---
3688title: Many Logins
3689correlation:
3690    type: event_count
3691    rules:
3692        - login-rule
3693    group-by:
3694        - User
3695    timespan: 60s
3696    condition:
3697        gte: 3
3698level: high
3699"#;
3700        let collection = parse_sigma_yaml(yaml).unwrap();
3701        let config = CorrelationConfig {
3702            correlation_event_mode: CorrelationEventMode::Refs,
3703            max_correlation_events: 10,
3704            ..Default::default()
3705        };
3706        let mut engine = CorrelationEngine::new(config);
3707        engine.add_collection(&collection).unwrap();
3708
3709        let mut corr_result = None;
3710        for i in 0..3 {
3711            let v = json!({"EventType": "login", "User": "admin", "id": format!("evt-{i}"), "@timestamp": 1000 + i});
3712            let event = Event::from_value(&v);
3713            let result = engine.process_event_at(&event, 1000 + i);
3714            if !result.correlations.is_empty() {
3715                corr_result = Some(result.correlations[0].clone());
3716            }
3717        }
3718
3719        let result = corr_result.expect("correlation should have fired");
3720        // In refs mode: events should be None, event_refs should be Some
3721        assert!(
3722            result.events.is_none(),
3723            "Full events should not be included in refs mode"
3724        );
3725        let refs = result
3726            .event_refs
3727            .expect("event_refs should be present in refs mode");
3728        assert_eq!(refs.len(), 3);
3729        assert_eq!(refs[0].timestamp, 1000);
3730        assert_eq!(refs[0].id, Some("evt-0".to_string()));
3731        assert_eq!(refs[1].id, Some("evt-1".to_string()));
3732        assert_eq!(refs[2].id, Some("evt-2".to_string()));
3733    }
3734
3735    #[test]
3736    fn test_correlation_refs_mode_no_id_field() {
3737        let yaml = r#"
3738title: Login
3739id: login-rule
3740logsource:
3741    category: auth
3742detection:
3743    selection:
3744        EventType: login
3745    condition: selection
3746---
3747title: Many Logins
3748correlation:
3749    type: event_count
3750    rules:
3751        - login-rule
3752    group-by:
3753        - User
3754    timespan: 60s
3755    condition:
3756        gte: 2
3757level: high
3758"#;
3759        let collection = parse_sigma_yaml(yaml).unwrap();
3760        let config = CorrelationConfig {
3761            correlation_event_mode: CorrelationEventMode::Refs,
3762            ..Default::default()
3763        };
3764        let mut engine = CorrelationEngine::new(config);
3765        engine.add_collection(&collection).unwrap();
3766
3767        let mut corr_result = None;
3768        for i in 0..2 {
3769            let v = json!({"EventType": "login", "User": "admin"});
3770            let event = Event::from_value(&v);
3771            let result = engine.process_event_at(&event, 1000 + i);
3772            if !result.correlations.is_empty() {
3773                corr_result = Some(result.correlations[0].clone());
3774            }
3775        }
3776
3777        let result = corr_result.expect("correlation should have fired");
3778        let refs = result.event_refs.expect("event_refs should be present");
3779        // No ID field in events → id should be None
3780        for r in &refs {
3781            assert_eq!(r.id, None);
3782        }
3783    }
3784
3785    #[test]
3786    fn test_per_correlation_custom_attributes_from_yaml() {
3787        let yaml = r#"
3788title: Login
3789id: login-rule
3790logsource:
3791    category: auth
3792detection:
3793    selection:
3794        EventType: login
3795    condition: selection
3796---
3797title: Many Logins
3798custom_attributes:
3799    rsigma.correlation_event_mode: refs
3800    rsigma.max_correlation_events: "5"
3801correlation:
3802    type: event_count
3803    rules:
3804        - login-rule
3805    group-by:
3806        - User
3807    timespan: 60s
3808    condition:
3809        gte: 3
3810level: high
3811"#;
3812        let collection = parse_sigma_yaml(yaml).unwrap();
3813        // Engine mode is None (default), but per-correlation should override to Refs
3814        let config = CorrelationConfig::default();
3815        let mut engine = CorrelationEngine::new(config);
3816        engine.add_collection(&collection).unwrap();
3817
3818        let mut corr_result = None;
3819        for i in 0..3 {
3820            let v = json!({"EventType": "login", "User": "admin", "id": format!("e{i}")});
3821            let event = Event::from_value(&v);
3822            let result = engine.process_event_at(&event, 1000 + i);
3823            if !result.correlations.is_empty() {
3824                corr_result = Some(result.correlations[0].clone());
3825            }
3826        }
3827
3828        let result = corr_result.expect("correlation should fire with per-correlation refs mode");
3829        // Per-correlation override should enable refs mode even though engine default is None
3830        assert!(result.events.is_none());
3831        let refs = result
3832            .event_refs
3833            .expect("event_refs via per-correlation override");
3834        assert_eq!(refs.len(), 3);
3835        assert_eq!(refs[0].id, Some("e0".to_string()));
3836    }
3837
3838    #[test]
3839    fn test_per_correlation_custom_attr_suppress_and_action() {
3840        let yaml = r#"
3841title: Login
3842id: login-rule
3843logsource:
3844    category: auth
3845detection:
3846    selection:
3847        EventType: login
3848    condition: selection
3849---
3850title: Many Logins
3851custom_attributes:
3852    rsigma.suppress: 10s
3853    rsigma.action: reset
3854correlation:
3855    type: event_count
3856    rules:
3857        - login-rule
3858    group-by:
3859        - User
3860    timespan: 60s
3861    condition:
3862        gte: 2
3863level: high
3864"#;
3865        let collection = parse_sigma_yaml(yaml).unwrap();
3866        let mut engine = CorrelationEngine::new(CorrelationConfig::default());
3867        engine.add_collection(&collection).unwrap();
3868
3869        // Verify the compiled correlation has per-rule overrides
3870        assert_eq!(engine.correlations[0].suppress_secs, Some(10));
3871        assert_eq!(
3872            engine.correlations[0].action,
3873            Some(CorrelationAction::Reset)
3874        );
3875    }
3876}