Skip to main content

rsigma_eval/
correlation.rs

1//! Compiled correlation types, group key, window state, and compilation.
2//!
3//! Transforms the parser's `CorrelationRule` AST into an optimized
4//! `CompiledCorrelation` with associated `WindowState` for stateful evaluation.
5
6use std::collections::{HashMap, HashSet, VecDeque};
7use std::io::{Read as IoRead, Write as IoWrite};
8use std::sync::Arc;
9
10use flate2::Compression;
11use flate2::read::DeflateDecoder;
12use flate2::write::DeflateEncoder;
13use serde::Serialize;
14
15use rsigma_parser::{
16    ConditionExpr, ConditionOperator, CorrelationCondition, CorrelationRule, CorrelationType,
17    FieldAlias, Level,
18};
19
20use crate::error::{EvalError, Result};
21use crate::event::{Event, EventValue};
22
23// =============================================================================
24// Compiled types
25// =============================================================================
26
27/// Compiled form of a `CorrelationRule`, ready for stateful evaluation.
28#[derive(Debug, Clone)]
29pub struct CompiledCorrelation {
30    pub id: Option<String>,
31    pub name: Option<String>,
32    pub title: String,
33    pub level: Option<Level>,
34    pub tags: Vec<String>,
35    pub correlation_type: CorrelationType,
36    /// IDs or names of referenced rules (detection or other correlations).
37    pub rule_refs: Vec<String>,
38    /// Resolved group-by fields (may include aliases).
39    pub group_by: Vec<GroupByField>,
40    /// Time window in seconds.
41    pub timespan_secs: u64,
42    /// Compiled threshold condition.
43    pub condition: CompiledCondition,
44    /// Extended boolean condition expression for temporal correlations.
45    /// When set, evaluates this expression against fired rules instead of
46    /// a simple threshold count.
47    pub extended_expr: Option<ConditionExpr>,
48    /// Whether referenced detection rules should also generate standalone matches.
49    pub generate: bool,
50    /// Per-correlation suppression window in seconds, resolved from the
51    /// `rsigma.suppress` custom attribute. `None` means use engine default.
52    pub suppress_secs: Option<u64>,
53    /// Per-correlation action on match, resolved from the `rsigma.action`
54    /// custom attribute. `None` means use engine default.
55    pub action: Option<crate::correlation_engine::CorrelationAction>,
56    /// Event inclusion mode for this correlation.
57    /// `None` means use the engine default (`CorrelationConfig.correlation_event_mode`).
58    pub event_mode: Option<crate::correlation_engine::CorrelationEventMode>,
59    /// Maximum events to store per window group for event inclusion.
60    /// `None` means use the engine default (`CorrelationConfig.max_correlation_events`).
61    pub max_events: Option<usize>,
62    /// Custom attributes from the original Sigma correlation rule (merged).
63    /// Wrapped in `Arc` so that per-match cloning is a pointer bump.
64    pub custom_attributes: Arc<HashMap<String, serde_json::Value>>,
65}
66
/// One group-by field, which may be known under different names in different rules.
#[derive(Debug, Clone)]
pub enum GroupByField {
    /// Plain field name that is identical for every referenced rule.
    Direct(String),
    /// Aliased field: `mapping` goes from a rule reference to the field name
    /// used in that rule's events.
    Aliased {
        alias: String,
        mapping: HashMap<String, String>,
    },
}

impl GroupByField {
    /// Display name of the field: the field name itself for `Direct`, the
    /// alias for `Aliased`.
    pub fn name(&self) -> &str {
        let (GroupByField::Direct(label) | GroupByField::Aliased { alias: label, .. }) = self;
        label
    }

    /// Work out which event field to read for a detection produced by a rule
    /// known under any of the identifiers in `rule_refs` (ID, name, ...).
    ///
    /// `Direct` fields always resolve to their own name. For `Aliased` fields
    /// the identifiers are tried in order and the first one present in the
    /// mapping wins; when none is mapped the alias itself is returned.
    pub fn resolve(&self, rule_refs: &[&str]) -> &str {
        let (alias, mapping) = match self {
            GroupByField::Direct(field) => return field,
            GroupByField::Aliased { alias, mapping } => (alias, mapping),
        };
        rule_refs
            .iter()
            .find_map(|rule| mapping.get(*rule))
            .map(String::as_str)
            .unwrap_or(alias.as_str())
    }
}
107
108/// Compiled threshold condition with one or more predicates (supports ranges).
109#[derive(Debug, Clone)]
110pub struct CompiledCondition {
111    /// Optional field name for value_count, value_sum, value_avg, value_percentile.
112    pub field: Option<String>,
113    /// One or more predicates to satisfy (all must be true for the condition to match).
114    pub predicates: Vec<(ConditionOperator, f64)>,
115}
116
117impl CompiledCondition {
118    /// Check if the given value satisfies all predicates.
119    pub fn check(&self, value: f64) -> bool {
120        self.predicates.iter().all(|(op, threshold)| match op {
121            ConditionOperator::Lt => value < *threshold,
122            ConditionOperator::Lte => value <= *threshold,
123            ConditionOperator::Gt => value > *threshold,
124            ConditionOperator::Gte => value >= *threshold,
125            ConditionOperator::Eq => (value - *threshold).abs() < f64::EPSILON,
126            ConditionOperator::Neq => (value - *threshold).abs() >= f64::EPSILON,
127        })
128    }
129}
130
131// =============================================================================
132// Group Key
133// =============================================================================
134
135/// Composite key for group-by partitioning.
136///
137/// Each element corresponds to a `GroupByField` value extracted from an event.
138/// `None` means the field was absent from the event.
139#[derive(Debug, Clone, Hash, Eq, PartialEq, Serialize, serde::Deserialize)]
140pub struct GroupKey(pub Vec<Option<String>>);
141
142impl GroupKey {
143    /// Extract a group key from an event given the group-by fields and the
144    /// rule reference identifiers (ID, name, etc.) that produced the detection match.
145    pub fn extract(event: &impl Event, group_by: &[GroupByField], rule_refs: &[&str]) -> Self {
146        let values = group_by
147            .iter()
148            .map(|field| {
149                let field_name = field.resolve(rule_refs);
150                event
151                    .get_field(field_name)
152                    .and_then(|v| value_to_string(&v))
153            })
154            .collect();
155        GroupKey(values)
156    }
157
158    /// Build a group key from explicit field-value pairs (for chaining).
159    pub fn from_pairs(pairs: &[(String, String)], group_by: &[GroupByField]) -> Self {
160        let values = group_by
161            .iter()
162            .map(|field| {
163                let name = field.name();
164                pairs
165                    .iter()
166                    .find(|(k, _)| k == name)
167                    .map(|(_, v)| v.clone())
168            })
169            .collect();
170        GroupKey(values)
171    }
172
173    /// Convert to field-name/value pairs for output.
174    pub fn to_pairs(&self, group_by: &[GroupByField]) -> Vec<(String, String)> {
175        group_by
176            .iter()
177            .zip(self.0.iter())
178            .filter_map(|(field, value)| {
179                value
180                    .as_ref()
181                    .map(|v| (field.name().to_string(), v.clone()))
182            })
183            .collect()
184    }
185}
186
187/// Convert an [`EventValue`] to a string for group-key purposes.
188fn value_to_string(v: &EventValue) -> Option<String> {
189    match v {
190        EventValue::Str(s) => Some(s.to_string()),
191        EventValue::Int(n) => Some(n.to_string()),
192        EventValue::Float(f) => Some(f.to_string()),
193        EventValue::Bool(b) => Some(b.to_string()),
194        _ => None,
195    }
196}
197
198// =============================================================================
199// Compressed Event Buffer
200// =============================================================================
201
202/// Default compression level — fast compression (level 1) for minimal latency.
203/// Deflate level 1 still achieves ~2-3x compression on JSON while being very fast.
204const COMPRESSION_LEVEL: Compression = Compression::fast();
205
206/// Compressed event storage for correlation event inclusion.
207///
208/// Stores event JSON payloads as individually deflate-compressed blobs alongside
209/// their timestamps. This enables per-event eviction (matching `WindowState`
210/// eviction) while keeping memory usage low.
211///
212/// # Memory Model
213///
214/// Each stored event costs approximately `compressed_size + 24` bytes
215/// (8 for timestamp, 16 for Vec overhead). Typical JSON events (500B–5KB)
216/// compress to 100B–1KB with deflate, giving 3–5x memory savings.
217///
218/// The buffer enforces a hard cap (`max_events`) so memory is bounded at:
219///   `max_events × (avg_compressed_size + 24)` bytes per group key.
220#[derive(Debug, Clone, Serialize, serde::Deserialize)]
221pub struct EventBuffer {
222    /// (timestamp, deflate-compressed event JSON) pairs, ordered by timestamp.
223    #[serde(with = "event_buffer_serde")]
224    entries: VecDeque<(i64, Vec<u8>)>,
225    /// Maximum number of events to retain. When exceeded, the oldest event is
226    /// evicted regardless of the time window.
227    max_events: usize,
228}
229
230/// Custom serde for EventBuffer entries: encodes compressed bytes as base64
231/// instead of JSON number arrays, cutting snapshot size ~3x.
232mod event_buffer_serde {
233    use serde::{Deserialize, Deserializer, Serialize, Serializer};
234    use std::collections::VecDeque;
235
236    #[derive(Serialize, Deserialize)]
237    struct Entry {
238        ts: i64,
239        #[serde(with = "base64_bytes")]
240        data: Vec<u8>,
241    }
242
243    mod base64_bytes {
244        use base64::Engine as _;
245        use base64::engine::general_purpose::STANDARD;
246        use serde::{Deserializer, Serializer};
247
248        pub fn serialize<S: Serializer>(bytes: &Vec<u8>, s: S) -> Result<S::Ok, S::Error> {
249            s.serialize_str(&STANDARD.encode(bytes))
250        }
251
252        pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<Vec<u8>, D::Error> {
253            let s: String = serde::Deserialize::deserialize(d)?;
254            STANDARD.decode(s).map_err(serde::de::Error::custom)
255        }
256    }
257
258    pub fn serialize<S: Serializer>(
259        entries: &VecDeque<(i64, Vec<u8>)>,
260        s: S,
261    ) -> Result<S::Ok, S::Error> {
262        let v: Vec<Entry> = entries
263            .iter()
264            .map(|(ts, data)| Entry {
265                ts: *ts,
266                data: data.clone(),
267            })
268            .collect();
269        v.serialize(s)
270    }
271
272    pub fn deserialize<'de, D: Deserializer<'de>>(
273        d: D,
274    ) -> Result<VecDeque<(i64, Vec<u8>)>, D::Error> {
275        let v: Vec<Entry> = Vec::deserialize(d)?;
276        Ok(v.into_iter().map(|e| (e.ts, e.data)).collect())
277    }
278}
279
280impl EventBuffer {
281    /// Create a new event buffer with the given capacity cap.
282    pub fn new(max_events: usize) -> Self {
283        EventBuffer {
284            entries: VecDeque::with_capacity(max_events.min(64)),
285            max_events,
286        }
287    }
288
289    /// Compress and store an event. Evicts the oldest entry if at capacity.
290    pub fn push(&mut self, ts: i64, event: &serde_json::Value) {
291        // Compress the event JSON with deflate
292        if let Some(compressed) = compress_event(event) {
293            if self.entries.len() >= self.max_events {
294                self.entries.pop_front();
295            }
296            self.entries.push_back((ts, compressed));
297        }
298    }
299
300    /// Remove all entries older than the cutoff timestamp.
301    pub fn evict(&mut self, cutoff: i64) {
302        while self.entries.front().is_some_and(|(t, _)| *t < cutoff) {
303            self.entries.pop_front();
304        }
305    }
306
307    /// Decompress and return all stored events.
308    pub fn decompress_all(&self) -> Vec<serde_json::Value> {
309        self.entries
310            .iter()
311            .filter_map(|(_, compressed)| decompress_event(compressed))
312            .collect()
313    }
314
315    /// Returns true if there are no stored events.
316    pub fn is_empty(&self) -> bool {
317        self.entries.is_empty()
318    }
319
320    /// Clear all stored events.
321    pub fn clear(&mut self) {
322        self.entries.clear();
323    }
324
325    /// Total compressed bytes stored (for monitoring/diagnostics).
326    pub fn compressed_bytes(&self) -> usize {
327        self.entries.iter().map(|(_, data)| data.len()).sum()
328    }
329
330    /// Number of stored events.
331    pub fn len(&self) -> usize {
332        self.entries.len()
333    }
334}
335
336/// Compress an event JSON value using deflate.
337fn compress_event(event: &serde_json::Value) -> Option<Vec<u8>> {
338    let json_bytes = serde_json::to_vec(event).ok()?;
339    let mut encoder = DeflateEncoder::new(Vec::new(), COMPRESSION_LEVEL);
340    encoder.write_all(&json_bytes).ok()?;
341    encoder.finish().ok()
342}
343
344/// Decompress a deflate-compressed event back to a JSON value.
345fn decompress_event(compressed: &[u8]) -> Option<serde_json::Value> {
346    let mut decoder = DeflateDecoder::new(compressed);
347    let mut json_bytes = Vec::new();
348    decoder.read_to_end(&mut json_bytes).ok()?;
349    serde_json::from_slice(&json_bytes).ok()
350}
351
352// =============================================================================
353// Event Reference (lightweight mode)
354// =============================================================================
355
356/// A lightweight event reference: timestamp plus optional event ID.
357///
358/// Used in `Refs` mode for memory-efficient correlation event tracking.
359/// Each ref costs ~40 bytes (vs. 100–1000+ bytes for compressed events),
360/// making this mode suitable for high-volume correlations where only
361/// traceability is needed.
362#[derive(Debug, Clone, Serialize, serde::Deserialize)]
363pub struct EventRef {
364    /// Event timestamp (epoch seconds).
365    pub timestamp: i64,
366    /// Event ID extracted from common fields (`id`, `_id`, `event_id`, etc.).
367    #[serde(skip_serializing_if = "Option::is_none")]
368    pub id: Option<String>,
369}
370
371/// Lightweight event reference buffer for `Refs` mode.
372///
373/// Stores only timestamps and optional event IDs — no event payload,
374/// no compression. This is the minimal-memory alternative to `EventBuffer`.
375#[derive(Debug, Clone, Serialize, serde::Deserialize)]
376pub struct EventRefBuffer {
377    /// Event references, ordered by timestamp.
378    entries: VecDeque<EventRef>,
379    /// Maximum number of refs to retain.
380    max_events: usize,
381}
382
383impl EventRefBuffer {
384    /// Create a new ref buffer with the given capacity cap.
385    pub fn new(max_events: usize) -> Self {
386        EventRefBuffer {
387            entries: VecDeque::with_capacity(max_events.min(64)),
388            max_events,
389        }
390    }
391
392    /// Store a reference to an event. Evicts the oldest ref if at capacity.
393    pub fn push(&mut self, ts: i64, event: &serde_json::Value) {
394        if self.entries.len() >= self.max_events {
395            self.entries.pop_front();
396        }
397        let id = extract_event_id(event);
398        self.entries.push_back(EventRef { timestamp: ts, id });
399    }
400
401    /// Remove all refs older than the cutoff timestamp.
402    pub fn evict(&mut self, cutoff: i64) {
403        while self.entries.front().is_some_and(|r| r.timestamp < cutoff) {
404            self.entries.pop_front();
405        }
406    }
407
408    /// Return cloned refs.
409    pub fn refs(&self) -> Vec<EventRef> {
410        self.entries.iter().cloned().collect()
411    }
412
413    /// Returns true if there are no stored refs.
414    pub fn is_empty(&self) -> bool {
415        self.entries.is_empty()
416    }
417
418    /// Clear all stored refs.
419    pub fn clear(&mut self) {
420        self.entries.clear();
421    }
422
423    /// Number of stored refs.
424    pub fn len(&self) -> usize {
425        self.entries.len()
426    }
427}
428
429/// Try to extract an event ID from common fields.
430///
431/// Checks (in order): `id`, `_id`, `event_id`, `EventRecordID`, `event.id`.
432/// Returns the first found value as a string.
433fn extract_event_id(event: &serde_json::Value) -> Option<String> {
434    const ID_FIELDS: &[&str] = &["id", "_id", "event_id", "EventRecordID", "event.id"];
435    for field in ID_FIELDS {
436        if let Some(val) = event.get(field) {
437            return match val {
438                serde_json::Value::String(s) => Some(s.clone()),
439                serde_json::Value::Number(n) => Some(n.to_string()),
440                _ => None,
441            };
442        }
443    }
444    None
445}
446
447// =============================================================================
448// Window State
449// =============================================================================
450
451/// Per-group mutable state within a time window.
452///
453/// Each variant matches the type of aggregation being performed.
454#[derive(Debug, Clone, Serialize, serde::Deserialize)]
455pub enum WindowState {
456    /// For `event_count`: timestamps of matching events.
457    EventCount { timestamps: VecDeque<i64> },
458    /// For `value_count`: (timestamp, field_value) pairs.
459    ValueCount { entries: VecDeque<(i64, String)> },
460    /// For `temporal` / `temporal_ordered`: rule_ref -> list of hit timestamps.
461    Temporal {
462        rule_hits: HashMap<String, VecDeque<i64>>,
463    },
464    /// For `value_sum`, `value_avg`, `value_percentile`, `value_median`:
465    /// (timestamp, numeric_value) pairs.
466    NumericAgg { entries: VecDeque<(i64, f64)> },
467}
468
469impl WindowState {
470    /// Create a new empty window state for the given correlation type.
471    pub fn new_for(corr_type: CorrelationType) -> Self {
472        match corr_type {
473            CorrelationType::EventCount => WindowState::EventCount {
474                timestamps: VecDeque::new(),
475            },
476            CorrelationType::ValueCount => WindowState::ValueCount {
477                entries: VecDeque::new(),
478            },
479            CorrelationType::Temporal | CorrelationType::TemporalOrdered => WindowState::Temporal {
480                rule_hits: HashMap::new(),
481            },
482            CorrelationType::ValueSum
483            | CorrelationType::ValueAvg
484            | CorrelationType::ValuePercentile
485            | CorrelationType::ValueMedian => WindowState::NumericAgg {
486                entries: VecDeque::new(),
487            },
488        }
489    }
490
491    /// Remove all entries older than the cutoff timestamp.
492    pub fn evict(&mut self, cutoff: i64) {
493        match self {
494            WindowState::EventCount { timestamps } => {
495                while timestamps.front().is_some_and(|&t| t < cutoff) {
496                    timestamps.pop_front();
497                }
498            }
499            WindowState::ValueCount { entries } => {
500                while entries.front().is_some_and(|(t, _)| *t < cutoff) {
501                    entries.pop_front();
502                }
503            }
504            WindowState::Temporal { rule_hits } => {
505                for timestamps in rule_hits.values_mut() {
506                    while timestamps.front().is_some_and(|&t| t < cutoff) {
507                        timestamps.pop_front();
508                    }
509                }
510                // Remove empty rule entries
511                rule_hits.retain(|_, ts| !ts.is_empty());
512            }
513            WindowState::NumericAgg { entries } => {
514                while entries.front().is_some_and(|(t, _)| *t < cutoff) {
515                    entries.pop_front();
516                }
517            }
518        }
519    }
520
521    /// Returns true if this state has no entries.
522    pub fn is_empty(&self) -> bool {
523        match self {
524            WindowState::EventCount { timestamps } => timestamps.is_empty(),
525            WindowState::ValueCount { entries } => entries.is_empty(),
526            WindowState::Temporal { rule_hits } => rule_hits.is_empty(),
527            WindowState::NumericAgg { entries } => entries.is_empty(),
528        }
529    }
530
531    /// Returns the most recent timestamp in this window, or `None` if empty.
532    pub fn latest_timestamp(&self) -> Option<i64> {
533        match self {
534            WindowState::EventCount { timestamps } => timestamps.back().copied(),
535            WindowState::ValueCount { entries } => entries.back().map(|(t, _)| *t),
536            WindowState::Temporal { rule_hits } => {
537                rule_hits.values().filter_map(|ts| ts.back().copied()).max()
538            }
539            WindowState::NumericAgg { entries } => entries.back().map(|(t, _)| *t),
540        }
541    }
542
543    /// Clear all entries from the window state (used by `CorrelationAction::Reset`).
544    pub fn clear(&mut self) {
545        match self {
546            WindowState::EventCount { timestamps } => timestamps.clear(),
547            WindowState::ValueCount { entries } => entries.clear(),
548            WindowState::Temporal { rule_hits } => rule_hits.clear(),
549            WindowState::NumericAgg { entries } => entries.clear(),
550        }
551    }
552
553    /// Record an event_count hit.
554    pub fn push_event_count(&mut self, ts: i64) {
555        if let WindowState::EventCount { timestamps } = self {
556            timestamps.push_back(ts);
557        }
558    }
559
560    /// Record a value_count hit with the field value.
561    pub fn push_value_count(&mut self, ts: i64, value: String) {
562        if let WindowState::ValueCount { entries } = self {
563            entries.push_back((ts, value));
564        }
565    }
566
567    /// Record a temporal hit for a specific rule reference.
568    pub fn push_temporal(&mut self, ts: i64, rule_ref: &str) {
569        if let WindowState::Temporal { rule_hits } = self {
570            rule_hits
571                .entry(rule_ref.to_string())
572                .or_default()
573                .push_back(ts);
574        }
575    }
576
577    /// Record a numeric aggregation value.
578    pub fn push_numeric(&mut self, ts: i64, value: f64) {
579        if let WindowState::NumericAgg { entries } = self {
580            entries.push_back((ts, value));
581        }
582    }
583
584    /// Evaluate the window state against the correlation condition.
585    ///
586    /// Returns `Some(aggregated_value)` if the condition is satisfied,
587    /// `None` otherwise.
588    ///
589    /// For temporal correlations with an extended expression, the expression
590    /// is evaluated against the set of rules that have fired in the window.
591    pub fn check_condition(
592        &self,
593        condition: &CompiledCondition,
594        corr_type: CorrelationType,
595        rule_refs: &[String],
596        extended_expr: Option<&ConditionExpr>,
597    ) -> Option<f64> {
598        let value = match (self, corr_type) {
599            (WindowState::EventCount { timestamps }, CorrelationType::EventCount) => {
600                timestamps.len() as f64
601            }
602            (WindowState::ValueCount { entries }, CorrelationType::ValueCount) => {
603                // Count distinct values
604                let distinct: HashSet<&String> = entries.iter().map(|(_, v)| v).collect();
605                distinct.len() as f64
606            }
607            (WindowState::Temporal { rule_hits }, CorrelationType::Temporal) => {
608                // If an extended expression is provided, evaluate it
609                if let Some(expr) = extended_expr {
610                    if eval_temporal_expr(expr, rule_hits) {
611                        // Return the count of fired rules as the value
612                        let fired: usize = rule_refs
613                            .iter()
614                            .filter(|r| rule_hits.get(r.as_str()).is_some_and(|ts| !ts.is_empty()))
615                            .count();
616                        return Some(fired as f64);
617                    } else {
618                        return None;
619                    }
620                }
621                // Default: count how many distinct referenced rules have fired
622                let fired: usize = rule_refs
623                    .iter()
624                    .filter(|r| rule_hits.get(r.as_str()).is_some_and(|ts| !ts.is_empty()))
625                    .count();
626                fired as f64
627            }
628            (WindowState::Temporal { rule_hits }, CorrelationType::TemporalOrdered) => {
629                // If an extended expression is provided, evaluate it first
630                if let Some(expr) = extended_expr
631                    && !eval_temporal_expr(expr, rule_hits)
632                {
633                    return None;
634                }
635                // Check if all referenced rules fired in order
636                if check_temporal_ordered(rule_refs, rule_hits) {
637                    rule_refs.len() as f64
638                } else {
639                    0.0
640                }
641            }
642            (WindowState::NumericAgg { entries }, CorrelationType::ValueSum) => {
643                entries.iter().map(|(_, v)| v).sum()
644            }
645            (WindowState::NumericAgg { entries }, CorrelationType::ValueAvg) => {
646                if entries.is_empty() {
647                    0.0
648                } else {
649                    let sum: f64 = entries.iter().map(|(_, v)| v).sum();
650                    sum / entries.len() as f64
651                }
652            }
653            (WindowState::NumericAgg { entries }, CorrelationType::ValuePercentile) => {
654                // Proper percentile calculation using linear interpolation.
655                // The condition threshold represents a percentile rank (0-100).
656                // We compute the value at that percentile from the window data.
657                if entries.is_empty() {
658                    return None;
659                }
660                let mut values: Vec<f64> = entries
661                    .iter()
662                    .map(|(_, v)| *v)
663                    .filter(|v| v.is_finite())
664                    .collect();
665                if values.is_empty() {
666                    return None;
667                }
668                values.sort_by(|a, b| a.partial_cmp(b).expect("NaN filtered"));
669                // Extract the percentile rank from the condition's first predicate
670                let percentile_rank = condition
671                    .predicates
672                    .first()
673                    .map(|(_, threshold)| *threshold)
674                    .unwrap_or(50.0);
675                let pval = percentile_linear_interp(&values, percentile_rank);
676                return Some(pval);
677            }
678            (WindowState::NumericAgg { entries }, CorrelationType::ValueMedian) => {
679                if entries.is_empty() {
680                    0.0
681                } else {
682                    let mut values: Vec<f64> = entries
683                        .iter()
684                        .map(|(_, v)| *v)
685                        .filter(|v| v.is_finite())
686                        .collect();
687                    if values.is_empty() {
688                        return None;
689                    }
690                    values.sort_by(|a, b| a.partial_cmp(b).expect("NaN filtered"));
691                    let mid = values.len() / 2;
692                    if values.len().is_multiple_of(2) && values.len() >= 2 {
693                        (values[mid - 1] + values[mid]) / 2.0
694                    } else {
695                        values[mid]
696                    }
697                }
698            }
699            _ => return None, // mismatched state/type
700        };
701
702        if condition.check(value) {
703            Some(value)
704        } else {
705            None
706        }
707    }
708}
709
/// Check if all referenced rules fired in the correct order within the window.
///
/// Returns `true` when one timestamp can be picked per rule, following the
/// order of `rule_refs`, such that the picked timestamps are non-decreasing.
/// A rule that is missing from `rule_hits` or has no hits makes the result
/// `false`; an empty `rule_refs` is trivially ordered.
///
/// The search is greedy: for each rule the earliest hit that is not before
/// the previously picked timestamp is chosen. Picking the earliest feasible
/// hit leaves the most room for the rules that follow, so this accepts
/// exactly the inputs an exhaustive search would, while only scanning each
/// rule's hits once instead of backtracking over every combination. Hits do
/// not need to be stored in timestamp order.
fn check_temporal_ordered(
    rule_refs: &[String],
    rule_hits: &HashMap<String, VecDeque<i64>>,
) -> bool {
    let mut floor = i64::MIN;
    for rule in rule_refs {
        let Some(hits) = rule_hits.get(rule.as_str()) else {
            return false;
        };
        let earliest_feasible = hits.iter().copied().filter(|&ts| ts >= floor).min();
        let Some(picked) = earliest_feasible else {
            return false;
        };
        floor = picked;
    }
    true
}
754
755/// Evaluate a boolean condition expression against the set of rules that have
756/// fired within the temporal window.
757///
758/// Each `Identifier` in the expression is treated as a rule reference — it's
759/// `true` if that rule has at least one hit in `rule_hits`.
760fn eval_temporal_expr(expr: &ConditionExpr, rule_hits: &HashMap<String, VecDeque<i64>>) -> bool {
761    match expr {
762        ConditionExpr::Identifier(name) => rule_hits
763            .get(name.as_str())
764            .is_some_and(|ts| !ts.is_empty()),
765        ConditionExpr::And(children) => children.iter().all(|c| eval_temporal_expr(c, rule_hits)),
766        ConditionExpr::Or(children) => children.iter().any(|c| eval_temporal_expr(c, rule_hits)),
767        ConditionExpr::Not(child) => !eval_temporal_expr(child, rule_hits),
768        ConditionExpr::Selector { .. } => {
769            // Selectors are not meaningful for temporal condition evaluation
770            false
771        }
772    }
773}
774
/// Value found at `percentile` in `values`, using linear interpolation
/// between the two closest ranks.
///
/// `values` must already be sorted in ascending order. `percentile` is
/// expressed on a 0.0–100.0 scale and is clamped to that range. An empty
/// slice yields 0.0 and a single-element slice yields that element.
fn percentile_linear_interp(values: &[f64], percentile: f64) -> f64 {
    let last = match values.len() {
        0 => return 0.0,
        1 => return values[0],
        n => n - 1,
    };

    // Fractional rank on the 0..=last index scale: rank = p * (n - 1).
    let rank = (percentile.clamp(0.0, 100.0) / 100.0) * last as f64;
    let lo = rank.floor() as usize;
    let hi = rank.ceil() as usize;

    // The rank falls exactly on an index (or past the end): no interpolation.
    if lo == hi || hi > last {
        return values[lo.min(last)];
    }

    let weight = rank - lo as f64;
    values[lo] + weight * (values[hi] - values[lo])
}
805
806// =============================================================================
807// Compilation
808// =============================================================================
809
810/// Compile a parsed `CorrelationRule` into a `CompiledCorrelation`.
811pub fn compile_correlation(rule: &CorrelationRule) -> Result<CompiledCorrelation> {
812    // Build group-by fields, resolving aliases
813    let alias_map: HashMap<&str, &FieldAlias> =
814        rule.aliases.iter().map(|a| (a.alias.as_str(), a)).collect();
815
816    let group_by: Vec<GroupByField> = rule
817        .group_by
818        .iter()
819        .map(|field_name| {
820            if let Some(alias) = alias_map.get(field_name.as_str()) {
821                GroupByField::Aliased {
822                    alias: field_name.clone(),
823                    mapping: alias.mapping.clone(),
824                }
825            } else {
826                GroupByField::Direct(field_name.clone())
827            }
828        })
829        .collect();
830
831    // Compile condition
832    let (condition, extended_expr) = compile_condition(&rule.condition, rule.correlation_type)?;
833
834    // Resolve per-correlation overrides from custom attributes.
835    // These mirror the engine-level `rsigma.*` attributes but apply only
836    // to this correlation rule, taking precedence over engine defaults.
837    let suppress_secs = rule
838        .custom_attributes
839        .get("rsigma.suppress")
840        .and_then(|v| v.as_str())
841        .and_then(|s| rsigma_parser::Timespan::parse(s).ok())
842        .map(|ts| ts.seconds);
843
844    let action = rule
845        .custom_attributes
846        .get("rsigma.action")
847        .and_then(|v| v.as_str())
848        .and_then(|s| {
849            s.parse::<crate::correlation_engine::CorrelationAction>()
850                .ok()
851        });
852
853    let event_mode = rule
854        .custom_attributes
855        .get("rsigma.correlation_event_mode")
856        .and_then(|v| v.as_str())
857        .and_then(|s| {
858            s.parse::<crate::correlation_engine::CorrelationEventMode>()
859                .ok()
860        });
861
862    let max_events = rule
863        .custom_attributes
864        .get("rsigma.max_correlation_events")
865        .and_then(|v| v.as_str())
866        .and_then(|s| s.parse::<usize>().ok());
867
868    let custom_attributes = Arc::new(crate::compiler::yaml_to_json_map(&rule.custom_attributes));
869
870    Ok(CompiledCorrelation {
871        id: rule.id.clone(),
872        name: rule.name.clone(),
873        title: rule.title.clone(),
874        level: rule.level,
875        tags: rule.tags.clone(),
876        correlation_type: rule.correlation_type,
877        rule_refs: rule.rules.clone(),
878        group_by,
879        timespan_secs: rule.timespan.seconds,
880        condition,
881        extended_expr,
882        generate: rule.generate,
883        suppress_secs,
884        action,
885        event_mode,
886        max_events,
887        custom_attributes,
888    })
889}
890
891/// Compile a `CorrelationCondition` into a `CompiledCondition` and optional expression.
892fn compile_condition(
893    cond: &CorrelationCondition,
894    corr_type: CorrelationType,
895) -> Result<(CompiledCondition, Option<ConditionExpr>)> {
896    match cond {
897        CorrelationCondition::Threshold { predicates, field } => Ok((
898            CompiledCondition {
899                field: field.clone(),
900                predicates: predicates
901                    .iter()
902                    .map(|(op, count)| (*op, *count as f64))
903                    .collect(),
904            },
905            None,
906        )),
907        CorrelationCondition::Extended(expr) => {
908            match corr_type {
909                CorrelationType::Temporal | CorrelationType::TemporalOrdered => {
910                    // For extended conditions, the threshold is a dummy (gte: 1)
911                    // since the actual evaluation is done via the expression tree.
912                    Ok((
913                        CompiledCondition {
914                            field: None,
915                            predicates: vec![(ConditionOperator::Gte, 1.0)],
916                        },
917                        Some(expr.clone()),
918                    ))
919                }
920                _ => Err(EvalError::CorrelationError(
921                    "Extended conditions are only supported for temporal correlation types"
922                        .to_string(),
923                )),
924            }
925        }
926    }
927}
928
929#[cfg(test)]
930mod tests {
931    use super::*;
932    use crate::event::JsonEvent;
933    use serde_json::json;
934
935    #[test]
936    fn test_group_key_extract() {
937        let v = json!({"User": "admin", "Host": "srv01"});
938        let event = JsonEvent::borrow(&v);
939        let group_by = vec![
940            GroupByField::Direct("User".to_string()),
941            GroupByField::Direct("Host".to_string()),
942        ];
943        let key = GroupKey::extract(&event, &group_by, &["rule1"]);
944        assert_eq!(
945            key.0,
946            vec![Some("admin".to_string()), Some("srv01".to_string())]
947        );
948    }
949
950    #[test]
951    fn test_group_key_missing_field() {
952        let v = json!({"User": "admin"});
953        let event = JsonEvent::borrow(&v);
954        let group_by = vec![
955            GroupByField::Direct("User".to_string()),
956            GroupByField::Direct("Host".to_string()),
957        ];
958        let key = GroupKey::extract(&event, &group_by, &["rule1"]);
959        assert_eq!(key.0, vec![Some("admin".to_string()), None]);
960    }
961
962    #[test]
963    fn test_group_key_aliased() {
964        let v = json!({"source.ip": "10.0.0.1"});
965        let event = JsonEvent::borrow(&v);
966        let group_by = vec![GroupByField::Aliased {
967            alias: "internal_ip".to_string(),
968            mapping: HashMap::from([
969                ("rule_a".to_string(), "source.ip".to_string()),
970                ("rule_b".to_string(), "destination.ip".to_string()),
971            ]),
972        }];
973        let key = GroupKey::extract(&event, &group_by, &["rule_a"]);
974        assert_eq!(key.0, vec![Some("10.0.0.1".to_string())]);
975    }
976
977    #[test]
978    fn test_condition_check() {
979        let cond = CompiledCondition {
980            field: None,
981            predicates: vec![(ConditionOperator::Gte, 100.0)],
982        };
983        assert!(!cond.check(99.0));
984        assert!(cond.check(100.0));
985        assert!(cond.check(101.0));
986    }
987
988    #[test]
989    fn test_condition_check_range() {
990        let cond = CompiledCondition {
991            field: None,
992            predicates: vec![
993                (ConditionOperator::Gt, 100.0),
994                (ConditionOperator::Lte, 200.0),
995            ],
996        };
997        assert!(!cond.check(100.0));
998        assert!(cond.check(101.0));
999        assert!(cond.check(200.0));
1000        assert!(!cond.check(201.0));
1001    }
1002
1003    #[test]
1004    fn test_window_event_count() {
1005        let mut state = WindowState::new_for(CorrelationType::EventCount);
1006        for i in 0..5 {
1007            state.push_event_count(1000 + i);
1008        }
1009        let cond = CompiledCondition {
1010            field: None,
1011            predicates: vec![(ConditionOperator::Gte, 5.0)],
1012        };
1013        assert_eq!(
1014            state.check_condition(&cond, CorrelationType::EventCount, &[], None),
1015            Some(5.0)
1016        );
1017    }
1018
1019    #[test]
1020    fn test_window_event_count_eviction() {
1021        let mut state = WindowState::new_for(CorrelationType::EventCount);
1022        for i in 0..10 {
1023            state.push_event_count(1000 + i);
1024        }
1025        // Evict events before ts=1005
1026        state.evict(1005);
1027        let cond = CompiledCondition {
1028            field: None,
1029            predicates: vec![(ConditionOperator::Gte, 5.0)],
1030        };
1031        assert_eq!(
1032            state.check_condition(&cond, CorrelationType::EventCount, &[], None),
1033            Some(5.0)
1034        );
1035    }
1036
1037    #[test]
1038    fn test_window_value_count() {
1039        let mut state = WindowState::new_for(CorrelationType::ValueCount);
1040        state.push_value_count(1000, "user1".to_string());
1041        state.push_value_count(1001, "user2".to_string());
1042        state.push_value_count(1002, "user1".to_string()); // duplicate
1043        state.push_value_count(1003, "user3".to_string());
1044
1045        let cond = CompiledCondition {
1046            field: Some("User".to_string()),
1047            predicates: vec![(ConditionOperator::Gte, 3.0)],
1048        };
1049        assert_eq!(
1050            state.check_condition(&cond, CorrelationType::ValueCount, &[], None),
1051            Some(3.0)
1052        );
1053    }
1054
1055    #[test]
1056    fn test_window_temporal() {
1057        let refs = vec!["rule_a".to_string(), "rule_b".to_string()];
1058        let mut state = WindowState::new_for(CorrelationType::Temporal);
1059        state.push_temporal(1000, "rule_a");
1060        // Only rule_a fired — condition: all refs must fire
1061        let cond = CompiledCondition {
1062            field: None,
1063            predicates: vec![(ConditionOperator::Gte, 2.0)],
1064        };
1065        assert!(
1066            state
1067                .check_condition(&cond, CorrelationType::Temporal, &refs, None)
1068                .is_none()
1069        );
1070
1071        // Now rule_b fires too
1072        state.push_temporal(1001, "rule_b");
1073        assert_eq!(
1074            state.check_condition(&cond, CorrelationType::Temporal, &refs, None),
1075            Some(2.0)
1076        );
1077    }
1078
1079    #[test]
1080    fn test_window_temporal_ordered() {
1081        let refs = vec![
1082            "rule_a".to_string(),
1083            "rule_b".to_string(),
1084            "rule_c".to_string(),
1085        ];
1086        let mut state = WindowState::new_for(CorrelationType::TemporalOrdered);
1087        // Fire in order: a, b, c
1088        state.push_temporal(1000, "rule_a");
1089        state.push_temporal(1001, "rule_b");
1090        state.push_temporal(1002, "rule_c");
1091
1092        let cond = CompiledCondition {
1093            field: None,
1094            predicates: vec![(ConditionOperator::Gte, 3.0)],
1095        };
1096        assert!(
1097            state
1098                .check_condition(&cond, CorrelationType::TemporalOrdered, &refs, None)
1099                .is_some()
1100        );
1101    }
1102
1103    #[test]
1104    fn test_window_temporal_ordered_wrong_order() {
1105        let refs = vec!["rule_a".to_string(), "rule_b".to_string()];
1106        let mut state = WindowState::new_for(CorrelationType::TemporalOrdered);
1107        // Fire in wrong order: b before a
1108        state.push_temporal(1000, "rule_b");
1109        state.push_temporal(1001, "rule_a");
1110
1111        let cond = CompiledCondition {
1112            field: None,
1113            predicates: vec![(ConditionOperator::Gte, 2.0)],
1114        };
1115        assert!(
1116            state
1117                .check_condition(&cond, CorrelationType::TemporalOrdered, &refs, None)
1118                .is_none()
1119        );
1120    }
1121
1122    #[test]
1123    fn test_window_value_sum() {
1124        let mut state = WindowState::new_for(CorrelationType::ValueSum);
1125        state.push_numeric(1000, 500.0);
1126        state.push_numeric(1001, 600.0);
1127
1128        let cond = CompiledCondition {
1129            field: Some("bytes_sent".to_string()),
1130            predicates: vec![(ConditionOperator::Gt, 1000.0)],
1131        };
1132        assert_eq!(
1133            state.check_condition(&cond, CorrelationType::ValueSum, &[], None),
1134            Some(1100.0)
1135        );
1136    }
1137
1138    #[test]
1139    fn test_window_value_avg() {
1140        let mut state = WindowState::new_for(CorrelationType::ValueAvg);
1141        state.push_numeric(1000, 100.0);
1142        state.push_numeric(1001, 200.0);
1143        state.push_numeric(1002, 300.0);
1144
1145        let cond = CompiledCondition {
1146            field: Some("bytes".to_string()),
1147            predicates: vec![(ConditionOperator::Gte, 200.0)],
1148        };
1149        assert_eq!(
1150            state.check_condition(&cond, CorrelationType::ValueAvg, &[], None),
1151            Some(200.0)
1152        );
1153    }
1154
1155    #[test]
1156    fn test_window_value_median() {
1157        let mut state = WindowState::new_for(CorrelationType::ValueMedian);
1158        state.push_numeric(1000, 10.0);
1159        state.push_numeric(1001, 20.0);
1160        state.push_numeric(1002, 30.0);
1161
1162        let cond = CompiledCondition {
1163            field: Some("latency".to_string()),
1164            predicates: vec![(ConditionOperator::Gte, 20.0)],
1165        };
1166        assert_eq!(
1167            state.check_condition(&cond, CorrelationType::ValueMedian, &[], None),
1168            Some(20.0)
1169        );
1170    }
1171
1172    #[test]
1173    fn test_compile_correlation_basic() {
1174        use rsigma_parser::parse_sigma_yaml;
1175
1176        let yaml = r#"
1177title: Base Rule
1178id: f305fd62-beca-47da-ad95-7690a0620084
1179logsource:
1180    product: aws
1181    service: cloudtrail
1182detection:
1183    selection:
1184        eventSource: "s3.amazonaws.com"
1185    condition: selection
1186level: low
1187---
1188title: Multiple AWS bucket enumerations
1189id: be246094-01d3-4bba-88de-69e582eba0cc
1190status: experimental
1191correlation:
1192    type: event_count
1193    rules:
1194        - f305fd62-beca-47da-ad95-7690a0620084
1195    group-by:
1196        - userIdentity.arn
1197    timespan: 1h
1198    condition:
1199        gte: 100
1200level: high
1201"#;
1202        let collection = parse_sigma_yaml(yaml).unwrap();
1203        assert_eq!(collection.correlations.len(), 1);
1204
1205        let compiled = compile_correlation(&collection.correlations[0]).unwrap();
1206        assert_eq!(compiled.correlation_type, CorrelationType::EventCount);
1207        assert_eq!(compiled.timespan_secs, 3600);
1208        assert_eq!(compiled.rule_refs.len(), 1);
1209        assert_eq!(compiled.group_by.len(), 1);
1210        assert!(compiled.condition.check(100.0));
1211        assert!(!compiled.condition.check(99.0));
1212    }
1213
1214    // =========================================================================
1215    // Extended temporal condition tests
1216    // =========================================================================
1217
1218    #[test]
1219    fn test_eval_temporal_expr_and() {
1220        let mut rule_hits = HashMap::new();
1221        rule_hits.insert("rule_a".to_string(), VecDeque::from([1000]));
1222        rule_hits.insert("rule_b".to_string(), VecDeque::from([1001]));
1223
1224        let expr = ConditionExpr::And(vec![
1225            ConditionExpr::Identifier("rule_a".to_string()),
1226            ConditionExpr::Identifier("rule_b".to_string()),
1227        ]);
1228        assert!(eval_temporal_expr(&expr, &rule_hits));
1229    }
1230
1231    #[test]
1232    fn test_eval_temporal_expr_and_incomplete() {
1233        let mut rule_hits = HashMap::new();
1234        rule_hits.insert("rule_a".to_string(), VecDeque::from([1000]));
1235        // rule_b not fired
1236
1237        let expr = ConditionExpr::And(vec![
1238            ConditionExpr::Identifier("rule_a".to_string()),
1239            ConditionExpr::Identifier("rule_b".to_string()),
1240        ]);
1241        assert!(!eval_temporal_expr(&expr, &rule_hits));
1242    }
1243
1244    #[test]
1245    fn test_eval_temporal_expr_or() {
1246        let mut rule_hits = HashMap::new();
1247        rule_hits.insert("rule_a".to_string(), VecDeque::from([1000]));
1248
1249        let expr = ConditionExpr::Or(vec![
1250            ConditionExpr::Identifier("rule_a".to_string()),
1251            ConditionExpr::Identifier("rule_b".to_string()),
1252        ]);
1253        assert!(eval_temporal_expr(&expr, &rule_hits));
1254    }
1255
1256    #[test]
1257    fn test_eval_temporal_expr_not() {
1258        let rule_hits = HashMap::new();
1259
1260        let expr = ConditionExpr::Not(Box::new(ConditionExpr::Identifier("rule_a".to_string())));
1261        assert!(eval_temporal_expr(&expr, &rule_hits));
1262    }
1263
1264    #[test]
1265    fn test_eval_temporal_expr_complex() {
1266        let mut rule_hits = HashMap::new();
1267        rule_hits.insert("rule_a".to_string(), VecDeque::from([1000]));
1268        rule_hits.insert("rule_b".to_string(), VecDeque::from([1001]));
1269        // rule_c NOT fired
1270
1271        // (rule_a and rule_b) and not rule_c
1272        let expr = ConditionExpr::And(vec![
1273            ConditionExpr::And(vec![
1274                ConditionExpr::Identifier("rule_a".to_string()),
1275                ConditionExpr::Identifier("rule_b".to_string()),
1276            ]),
1277            ConditionExpr::Not(Box::new(ConditionExpr::Identifier("rule_c".to_string()))),
1278        ]);
1279        assert!(eval_temporal_expr(&expr, &rule_hits));
1280    }
1281
1282    #[test]
1283    fn test_check_condition_with_extended_expr() {
1284        let refs = vec!["rule_a".to_string(), "rule_b".to_string()];
1285        let mut state = WindowState::new_for(CorrelationType::Temporal);
1286        state.push_temporal(1000, "rule_a");
1287        state.push_temporal(1001, "rule_b");
1288
1289        let cond = CompiledCondition {
1290            field: None,
1291            predicates: vec![(ConditionOperator::Gte, 1.0)],
1292        };
1293        let expr = ConditionExpr::And(vec![
1294            ConditionExpr::Identifier("rule_a".to_string()),
1295            ConditionExpr::Identifier("rule_b".to_string()),
1296        ]);
1297
1298        // With expression: should match (both rules fired)
1299        assert!(
1300            state
1301                .check_condition(&cond, CorrelationType::Temporal, &refs, Some(&expr))
1302                .is_some()
1303        );
1304
1305        // Now test with only rule_a: expression should fail
1306        let mut state2 = WindowState::new_for(CorrelationType::Temporal);
1307        state2.push_temporal(1000, "rule_a");
1308        assert!(
1309            state2
1310                .check_condition(&cond, CorrelationType::Temporal, &refs, Some(&expr))
1311                .is_none()
1312        );
1313    }
1314
1315    // =========================================================================
1316    // Percentile linear interpolation tests
1317    // =========================================================================
1318
1319    #[test]
1320    fn test_percentile_linear_interp_single() {
1321        assert!((percentile_linear_interp(&[42.0], 50.0) - 42.0).abs() < f64::EPSILON);
1322    }
1323
1324    #[test]
1325    fn test_percentile_linear_interp_basic() {
1326        // Values: [1, 2, 3, 4, 5]
1327        let values = &[1.0, 2.0, 3.0, 4.0, 5.0];
1328        // 0th percentile = 1.0
1329        assert!((percentile_linear_interp(values, 0.0) - 1.0).abs() < f64::EPSILON);
1330        // 25th percentile = 2.0
1331        assert!((percentile_linear_interp(values, 25.0) - 2.0).abs() < f64::EPSILON);
1332        // 50th percentile = 3.0
1333        assert!((percentile_linear_interp(values, 50.0) - 3.0).abs() < f64::EPSILON);
1334        // 75th percentile = 4.0
1335        assert!((percentile_linear_interp(values, 75.0) - 4.0).abs() < f64::EPSILON);
1336        // 100th percentile = 5.0
1337        assert!((percentile_linear_interp(values, 100.0) - 5.0).abs() < f64::EPSILON);
1338    }
1339
1340    #[test]
1341    fn test_percentile_linear_interp_interpolation() {
1342        // Values: [10, 20, 30, 40]
1343        let values = &[10.0, 20.0, 30.0, 40.0];
1344        // 50th percentile: rank = 0.5 * 3 = 1.5, interp between 20 and 30 = 25
1345        assert!((percentile_linear_interp(values, 50.0) - 25.0).abs() < f64::EPSILON);
1346    }
1347
1348    #[test]
1349    fn test_percentile_linear_interp_1st_percentile() {
1350        // Values: [1, 2, 3, ..., 100]
1351        let values: Vec<f64> = (1..=100).map(|x| x as f64).collect();
1352        // 1st percentile = 1.0 + 0.01 * 99 * (2.0 - 1.0) ~ 1.99
1353        let p1 = percentile_linear_interp(&values, 1.0);
1354        assert!((p1 - 1.99).abs() < 0.01);
1355    }
1356
1357    #[test]
1358    fn test_value_percentile_check_condition() {
1359        let mut state = WindowState::new_for(CorrelationType::ValuePercentile);
1360        // Push 100 values: 1.0, 2.0, ..., 100.0
1361        for i in 1..=100 {
1362            state.push_numeric(1000 + i, i as f64);
1363        }
1364
1365        let cond = CompiledCondition {
1366            field: Some("latency".to_string()),
1367            // The condition threshold is used as the percentile rank
1368            predicates: vec![(ConditionOperator::Lte, 50.0)],
1369        };
1370        // 50th percentile of 1..100 should be ~50.5
1371        let result = state.check_condition(&cond, CorrelationType::ValuePercentile, &[], None);
1372        assert!(result.is_some());
1373        let val = result.unwrap();
1374        assert!((val - 50.5).abs() < 1.0, "expected ~50.5, got {val}");
1375    }
1376
1377    #[test]
1378    fn test_percentile_0th_and_100th() {
1379        let values = &[5.0, 10.0, 15.0, 20.0];
1380        assert!((percentile_linear_interp(values, 0.0) - 5.0).abs() < f64::EPSILON);
1381        assert!((percentile_linear_interp(values, 100.0) - 20.0).abs() < f64::EPSILON);
1382    }
1383
1384    #[test]
1385    fn test_percentile_two_values() {
1386        let values = &[10.0, 20.0];
1387        // 50th percentile between 10 and 20 = 15
1388        assert!((percentile_linear_interp(values, 50.0) - 15.0).abs() < f64::EPSILON);
1389        // 25th percentile = 12.5
1390        assert!((percentile_linear_interp(values, 25.0) - 12.5).abs() < f64::EPSILON);
1391    }
1392
1393    #[test]
1394    fn test_percentile_clamps_out_of_range() {
1395        let values = &[1.0, 2.0, 3.0];
1396        // Negative percentile clamps to 0
1397        assert!((percentile_linear_interp(values, -10.0) - 1.0).abs() < f64::EPSILON);
1398        // > 100 clamps to 100
1399        assert!((percentile_linear_interp(values, 150.0) - 3.0).abs() < f64::EPSILON);
1400    }
1401
1402    #[test]
1403    fn test_value_percentile_empty_window() {
1404        let state = WindowState::new_for(CorrelationType::ValuePercentile);
1405        let cond = CompiledCondition {
1406            field: Some("latency".to_string()),
1407            predicates: vec![(ConditionOperator::Lte, 50.0)],
1408        };
1409        // Empty window should return None
1410        assert!(
1411            state
1412                .check_condition(&cond, CorrelationType::ValuePercentile, &[], None)
1413                .is_none()
1414        );
1415    }
1416
1417    #[test]
1418    fn test_extended_temporal_or_single_rule() {
1419        // "rule_a or rule_b" — only rule_a fired
1420        let mut rule_hits = HashMap::new();
1421        rule_hits.insert("rule_a".to_string(), VecDeque::from([1000]));
1422
1423        let expr = ConditionExpr::Or(vec![
1424            ConditionExpr::Identifier("rule_a".to_string()),
1425            ConditionExpr::Identifier("rule_b".to_string()),
1426        ]);
1427        assert!(eval_temporal_expr(&expr, &rule_hits));
1428    }
1429
1430    #[test]
1431    fn test_extended_temporal_empty_hits() {
1432        let rule_hits = HashMap::new();
1433
1434        // "rule_a and rule_b" — nothing fired
1435        let expr = ConditionExpr::And(vec![
1436            ConditionExpr::Identifier("rule_a".to_string()),
1437            ConditionExpr::Identifier("rule_b".to_string()),
1438        ]);
1439        assert!(!eval_temporal_expr(&expr, &rule_hits));
1440
1441        // "rule_a or rule_b" — nothing fired
1442        let expr_or = ConditionExpr::Or(vec![
1443            ConditionExpr::Identifier("rule_a".to_string()),
1444            ConditionExpr::Identifier("rule_b".to_string()),
1445        ]);
1446        assert!(!eval_temporal_expr(&expr_or, &rule_hits));
1447    }
1448
1449    #[test]
1450    fn test_extended_temporal_with_empty_deque() {
1451        // Rule exists in map but with empty deque (all evicted)
1452        let mut rule_hits = HashMap::new();
1453        rule_hits.insert("rule_a".to_string(), VecDeque::new());
1454        rule_hits.insert("rule_b".to_string(), VecDeque::from([1000]));
1455
1456        let expr = ConditionExpr::And(vec![
1457            ConditionExpr::Identifier("rule_a".to_string()),
1458            ConditionExpr::Identifier("rule_b".to_string()),
1459        ]);
1460        // rule_a has empty deque — should be treated as not fired
1461        assert!(!eval_temporal_expr(&expr, &rule_hits));
1462    }
1463
1464    #[test]
1465    fn test_check_condition_temporal_no_extended_expr() {
1466        // Standard temporal without extended expr: uses threshold count
1467        let refs = vec![
1468            "rule_a".to_string(),
1469            "rule_b".to_string(),
1470            "rule_c".to_string(),
1471        ];
1472        let mut state = WindowState::new_for(CorrelationType::Temporal);
1473        state.push_temporal(1000, "rule_a");
1474        state.push_temporal(1001, "rule_b");
1475
1476        // Threshold: at least 2 rules must fire
1477        let cond = CompiledCondition {
1478            field: None,
1479            predicates: vec![(ConditionOperator::Gte, 2.0)],
1480        };
1481        // Without extended expr: 2 of 3 rules fired, meets gte 2
1482        assert_eq!(
1483            state.check_condition(&cond, CorrelationType::Temporal, &refs, None),
1484            Some(2.0)
1485        );
1486
1487        // With threshold 3: not enough
1488        let cond3 = CompiledCondition {
1489            field: None,
1490            predicates: vec![(ConditionOperator::Gte, 3.0)],
1491        };
1492        assert!(
1493            state
1494                .check_condition(&cond3, CorrelationType::Temporal, &refs, None)
1495                .is_none()
1496        );
1497    }
1498
1499    // =========================================================================
1500    // EventBuffer tests
1501    // =========================================================================
1502
1503    #[test]
1504    fn test_event_buffer_push_and_decompress() {
1505        let mut buf = EventBuffer::new(10);
1506        let event = json!({"User": "admin", "action": "login", "src_ip": "10.0.0.1"});
1507        buf.push(1000, &event);
1508
1509        assert_eq!(buf.len(), 1);
1510        assert!(!buf.is_empty());
1511
1512        let events = buf.decompress_all();
1513        assert_eq!(events.len(), 1);
1514        assert_eq!(events[0], event);
1515    }
1516
1517    #[test]
1518    fn test_event_buffer_compression_saves_memory() {
1519        let mut buf = EventBuffer::new(100);
1520        // Push a realistic-sized event (~500 bytes JSON)
1521        let event = json!({
1522            "User": "admin",
1523            "action": "login",
1524            "src_ip": "192.168.1.100",
1525            "dst_ip": "10.0.0.1",
1526            "EventTime": "2024-07-10T12:30:00Z",
1527            "process": "sshd",
1528            "host": "production-server-01.example.com",
1529            "message": "Accepted password for admin from 192.168.1.100 port 22 ssh2",
1530            "severity": "info",
1531            "tags": ["authentication", "network", "linux"]
1532        });
1533
1534        let raw_size = serde_json::to_vec(&event).unwrap().len();
1535        buf.push(1000, &event);
1536        let compressed_size = buf.compressed_bytes();
1537
1538        // Compressed should be notably smaller than raw
1539        assert!(
1540            compressed_size < raw_size,
1541            "Compressed {compressed_size}B should be less than raw {raw_size}B"
1542        );
1543
1544        // Verify roundtrip
1545        let events = buf.decompress_all();
1546        assert_eq!(events[0], event);
1547    }
1548
1549    #[test]
1550    fn test_event_buffer_max_events_cap() {
1551        let mut buf = EventBuffer::new(3);
1552
1553        for i in 0..5 {
1554            buf.push(1000 + i, &json!({"idx": i}));
1555        }
1556
1557        // Only the last 3 should remain
1558        assert_eq!(buf.len(), 3);
1559        let events = buf.decompress_all();
1560        assert_eq!(events[0], json!({"idx": 2}));
1561        assert_eq!(events[1], json!({"idx": 3}));
1562        assert_eq!(events[2], json!({"idx": 4}));
1563    }
1564
1565    #[test]
1566    fn test_event_buffer_eviction() {
1567        let mut buf = EventBuffer::new(10);
1568        for i in 0..5 {
1569            buf.push(1000 + i, &json!({"idx": i}));
1570        }
1571        assert_eq!(buf.len(), 5);
1572
1573        // Evict everything before ts 1003
1574        buf.evict(1003);
1575        assert_eq!(buf.len(), 2);
1576
1577        let events = buf.decompress_all();
1578        assert_eq!(events[0], json!({"idx": 3}));
1579        assert_eq!(events[1], json!({"idx": 4}));
1580    }
1581
1582    #[test]
1583    fn test_event_buffer_clear() {
1584        let mut buf = EventBuffer::new(10);
1585        buf.push(1000, &json!({"a": 1}));
1586        buf.push(1001, &json!({"b": 2}));
1587        assert_eq!(buf.len(), 2);
1588
1589        buf.clear();
1590        assert!(buf.is_empty());
1591        assert_eq!(buf.len(), 0);
1592        assert_eq!(buf.compressed_bytes(), 0);
1593    }
1594
1595    #[test]
1596    fn test_compress_decompress_roundtrip() {
1597        // Test various JSON shapes
1598        let values = vec![
1599            json!(null),
1600            json!(42),
1601            json!("hello world"),
1602            json!({"nested": {"deep": [1, 2, 3]}}),
1603            json!([1, "two", null, true, {"five": 5}]),
1604        ];
1605        for val in values {
1606            let compressed = compress_event(&val).unwrap();
1607            let decompressed = decompress_event(&compressed).unwrap();
1608            assert_eq!(decompressed, val, "Roundtrip failed for {val}");
1609        }
1610    }
1611
1612    // =========================================================================
1613    // EventRefBuffer tests
1614    // =========================================================================
1615
1616    #[test]
1617    fn test_event_ref_buffer_push_and_refs() {
1618        let mut buf = EventRefBuffer::new(10);
1619        buf.push(1000, &json!({"id": "evt-1", "data": "hello"}));
1620        buf.push(1001, &json!({"_id": 42, "data": "world"}));
1621        buf.push(1002, &json!({"data": "no-id"}));
1622
1623        assert_eq!(buf.len(), 3);
1624        let refs = buf.refs();
1625        assert_eq!(refs[0].timestamp, 1000);
1626        assert_eq!(refs[0].id, Some("evt-1".to_string()));
1627        assert_eq!(refs[1].timestamp, 1001);
1628        assert_eq!(refs[1].id, Some("42".to_string()));
1629        assert_eq!(refs[2].timestamp, 1002);
1630        assert_eq!(refs[2].id, None);
1631    }
1632
1633    #[test]
1634    fn test_event_ref_buffer_max_cap() {
1635        let mut buf = EventRefBuffer::new(3);
1636        for i in 0..5 {
1637            buf.push(1000 + i, &json!({"id": format!("e-{i}")}));
1638        }
1639        assert_eq!(buf.len(), 3);
1640        let refs = buf.refs();
1641        assert_eq!(refs[0].id, Some("e-2".to_string()));
1642        assert_eq!(refs[1].id, Some("e-3".to_string()));
1643        assert_eq!(refs[2].id, Some("e-4".to_string()));
1644    }
1645
1646    #[test]
1647    fn test_event_ref_buffer_eviction() {
1648        let mut buf = EventRefBuffer::new(10);
1649        for i in 0..5 {
1650            buf.push(1000 + i, &json!({"id": format!("e-{i}")}));
1651        }
1652        buf.evict(1003);
1653        assert_eq!(buf.len(), 2);
1654        let refs = buf.refs();
1655        assert_eq!(refs[0].timestamp, 1003);
1656        assert_eq!(refs[1].timestamp, 1004);
1657    }
1658
1659    #[test]
1660    fn test_event_ref_buffer_clear() {
1661        let mut buf = EventRefBuffer::new(10);
1662        buf.push(1000, &json!({"id": "a"}));
1663        buf.push(1001, &json!({"id": "b"}));
1664        assert_eq!(buf.len(), 2);
1665
1666        buf.clear();
1667        assert!(buf.is_empty());
1668        assert_eq!(buf.len(), 0);
1669    }
1670
1671    #[test]
1672    fn test_extract_event_id_common_fields() {
1673        assert_eq!(
1674            extract_event_id(&json!({"id": "abc"})),
1675            Some("abc".to_string())
1676        );
1677        assert_eq!(
1678            extract_event_id(&json!({"_id": 123})),
1679            Some("123".to_string())
1680        );
1681        assert_eq!(
1682            extract_event_id(&json!({"event_id": "x-1"})),
1683            Some("x-1".to_string())
1684        );
1685        assert_eq!(
1686            extract_event_id(&json!({"EventRecordID": 999})),
1687            Some("999".to_string())
1688        );
1689        assert_eq!(extract_event_id(&json!({"no_id_field": true})), None);
1690    }
1691
1692    #[test]
1693    fn test_compile_correlation_with_custom_attributes() {
1694        use rsigma_parser::*;
1695
1696        let mut custom_attributes: HashMap<String, serde_yaml::Value> =
1697            std::collections::HashMap::new();
1698        custom_attributes.insert(
1699            "rsigma.correlation_event_mode".to_string(),
1700            serde_yaml::Value::String("refs".to_string()),
1701        );
1702        custom_attributes.insert(
1703            "rsigma.max_correlation_events".to_string(),
1704            serde_yaml::Value::String("25".to_string()),
1705        );
1706        custom_attributes.insert(
1707            "rsigma.suppress".to_string(),
1708            serde_yaml::Value::String("5m".to_string()),
1709        );
1710        custom_attributes.insert(
1711            "rsigma.action".to_string(),
1712            serde_yaml::Value::String("reset".to_string()),
1713        );
1714
1715        let rule = CorrelationRule {
1716            title: "Test Corr".to_string(),
1717            id: Some("corr-1".to_string()),
1718            name: None,
1719            status: None,
1720            description: None,
1721            author: None,
1722            date: None,
1723            modified: None,
1724            references: vec![],
1725            taxonomy: None,
1726            tags: vec![],
1727            falsepositives: vec![],
1728            level: Some(Level::High),
1729            correlation_type: CorrelationType::EventCount,
1730            rules: vec!["rule-1".to_string()],
1731            group_by: vec!["User".to_string()],
1732            timespan: Timespan::parse("60s").unwrap(),
1733            condition: CorrelationCondition::Threshold {
1734                predicates: vec![(ConditionOperator::Gte, 5)],
1735                field: None,
1736            },
1737            aliases: vec![],
1738            generate: false,
1739            custom_attributes,
1740        };
1741
1742        let compiled = compile_correlation(&rule).unwrap();
1743
1744        // Per-correlation overrides should be resolved from custom_attributes
1745        assert_eq!(
1746            compiled.event_mode,
1747            Some(crate::correlation_engine::CorrelationEventMode::Refs)
1748        );
1749        assert_eq!(compiled.max_events, Some(25));
1750        assert_eq!(compiled.suppress_secs, Some(300)); // 5m = 300s
1751        assert_eq!(
1752            compiled.action,
1753            Some(crate::correlation_engine::CorrelationAction::Reset)
1754        );
1755    }
1756}