Skip to main content

rsigma_eval/correlation/
window.rs

1use std::collections::{HashMap, HashSet, VecDeque};
2
3use rsigma_parser::{ConditionExpr, CorrelationType, WindowMode};
4use serde::Serialize;
5
6use super::CompiledCondition;
7
8// =============================================================================
9// Window State
10// =============================================================================
11
/// Per-group mutable state within a time window.
///
/// Each variant matches the type of aggregation being performed. The
/// `push_*` methods append at the back of a deque and `evict` /
/// `truncate_oldest` remove from the front, so entries are held in arrival
/// order rather than sorted by timestamp.
#[derive(Debug, Clone, Serialize, serde::Deserialize)]
pub enum WindowState {
    /// For `event_count`: timestamps of matching events.
    EventCount { timestamps: VecDeque<i64> },
    /// For `value_count`: (timestamp, field_value) pairs. Every observation is
    /// stored; distinct values are only counted when the condition is checked.
    ValueCount { entries: VecDeque<(i64, String)> },
    /// For `temporal` / `temporal_ordered`: rule_ref -> list of hit timestamps.
    Temporal {
        rule_hits: HashMap<String, VecDeque<i64>>,
    },
    /// For `value_sum`, `value_avg`, `value_percentile`, `value_median`:
    /// (timestamp, numeric_value) pairs.
    NumericAgg { entries: VecDeque<(i64, f64)> },
}
29
30impl WindowState {
31    /// Create a new empty window state for the given correlation type.
32    pub fn new_for(corr_type: CorrelationType) -> Self {
33        match corr_type {
34            CorrelationType::EventCount => WindowState::EventCount {
35                timestamps: VecDeque::new(),
36            },
37            CorrelationType::ValueCount => WindowState::ValueCount {
38                entries: VecDeque::new(),
39            },
40            CorrelationType::Temporal | CorrelationType::TemporalOrdered => WindowState::Temporal {
41                rule_hits: HashMap::new(),
42            },
43            CorrelationType::ValueSum
44            | CorrelationType::ValueAvg
45            | CorrelationType::ValuePercentile
46            | CorrelationType::ValueMedian => WindowState::NumericAgg {
47                entries: VecDeque::new(),
48            },
49        }
50    }
51
52    /// Remove all entries older than the cutoff timestamp.
53    pub fn evict(&mut self, cutoff: i64) {
54        match self {
55            WindowState::EventCount { timestamps } => {
56                while timestamps.front().is_some_and(|&t| t < cutoff) {
57                    timestamps.pop_front();
58                }
59            }
60            WindowState::ValueCount { entries } => {
61                while entries.front().is_some_and(|(t, _)| *t < cutoff) {
62                    entries.pop_front();
63                }
64            }
65            WindowState::Temporal { rule_hits } => {
66                for timestamps in rule_hits.values_mut() {
67                    while timestamps.front().is_some_and(|&t| t < cutoff) {
68                        timestamps.pop_front();
69                    }
70                }
71                // Remove empty rule entries
72                rule_hits.retain(|_, ts| !ts.is_empty());
73            }
74            WindowState::NumericAgg { entries } => {
75                while entries.front().is_some_and(|(t, _)| *t < cutoff) {
76                    entries.pop_front();
77                }
78            }
79        }
80    }
81
82    /// Returns true if this state has no entries.
83    pub fn is_empty(&self) -> bool {
84        match self {
85            WindowState::EventCount { timestamps } => timestamps.is_empty(),
86            WindowState::ValueCount { entries } => entries.is_empty(),
87            WindowState::Temporal { rule_hits } => rule_hits.is_empty(),
88            WindowState::NumericAgg { entries } => entries.is_empty(),
89        }
90    }
91
92    /// Returns the most recent timestamp in this window, or `None` if empty.
93    pub fn latest_timestamp(&self) -> Option<i64> {
94        match self {
95            WindowState::EventCount { timestamps } => timestamps.back().copied(),
96            WindowState::ValueCount { entries } => entries.back().map(|(t, _)| *t),
97            WindowState::Temporal { rule_hits } => {
98                rule_hits.values().filter_map(|ts| ts.back().copied()).max()
99            }
100            WindowState::NumericAgg { entries } => entries.back().map(|(t, _)| *t),
101        }
102    }
103
104    /// Returns the oldest timestamp in this window, or `None` if empty.
105    ///
106    /// Entries are appended in arrival order, so the front of each deque holds
107    /// the earliest timestamp. For temporal state the minimum is taken across
108    /// all per-rule deques.
109    pub fn earliest_timestamp(&self) -> Option<i64> {
110        match self {
111            WindowState::EventCount { timestamps } => timestamps.front().copied(),
112            WindowState::ValueCount { entries } => entries.front().map(|(t, _)| *t),
113            WindowState::Temporal { rule_hits } => rule_hits
114                .values()
115                .filter_map(|ts| ts.front().copied())
116                .min(),
117            WindowState::NumericAgg { entries } => entries.front().map(|(t, _)| *t),
118        }
119    }
120
121    /// Enforce a per-group entry cap by dropping the oldest entries.
122    ///
123    /// `cap` is the maximum number of retained entries (clamped to at least
124    /// 1). For temporal state the cap applies to each referenced rule's hit
125    /// deque independently, since the deques are what grow with event rate.
126    ///
127    /// When `preserve_front` is set (session windows), the oldest entry is
128    /// kept and truncation happens behind it: the front entry anchors the
129    /// session's total-span cap (`timespan`), so dropping it would silently
130    /// let the session extend past the cap. Truncation can only under-count;
131    /// it never extends a window.
132    pub fn truncate_oldest(&mut self, cap: usize, preserve_front: bool) {
133        let cap = cap.max(1);
134
135        fn truncate_deque<T>(deque: &mut VecDeque<T>, cap: usize, preserve_front: bool) {
136            if deque.len() <= cap {
137                return;
138            }
139            if preserve_front {
140                let anchor = deque.pop_front().expect("len > cap >= 1");
141                // Keep the anchor plus the newest cap - 1 entries.
142                let excess = deque.len() - (cap - 1);
143                deque.drain(..excess);
144                deque.push_front(anchor);
145            } else {
146                let excess = deque.len() - cap;
147                deque.drain(..excess);
148            }
149        }
150
151        match self {
152            WindowState::EventCount { timestamps } => {
153                truncate_deque(timestamps, cap, preserve_front);
154            }
155            WindowState::ValueCount { entries } => {
156                truncate_deque(entries, cap, preserve_front);
157            }
158            WindowState::Temporal { rule_hits } => {
159                for timestamps in rule_hits.values_mut() {
160                    truncate_deque(timestamps, cap, preserve_front);
161                }
162            }
163            WindowState::NumericAgg { entries } => {
164                truncate_deque(entries, cap, preserve_front);
165            }
166        }
167    }
168
169    /// Clear all entries from the window state (used by `CorrelationAction::Reset`).
170    pub fn clear(&mut self) {
171        match self {
172            WindowState::EventCount { timestamps } => timestamps.clear(),
173            WindowState::ValueCount { entries } => entries.clear(),
174            WindowState::Temporal { rule_hits } => rule_hits.clear(),
175            WindowState::NumericAgg { entries } => entries.clear(),
176        }
177    }
178
179    /// Record an event_count hit.
180    pub fn push_event_count(&mut self, ts: i64) {
181        if let WindowState::EventCount { timestamps } = self {
182            timestamps.push_back(ts);
183        }
184    }
185
186    /// Record a value_count hit with the field value.
187    pub fn push_value_count(&mut self, ts: i64, value: String) {
188        if let WindowState::ValueCount { entries } = self {
189            entries.push_back((ts, value));
190        }
191    }
192
193    /// Record a temporal hit for a specific rule reference.
194    pub fn push_temporal(&mut self, ts: i64, rule_ref: &str) {
195        if let WindowState::Temporal { rule_hits } = self {
196            rule_hits
197                .entry(rule_ref.to_string())
198                .or_default()
199                .push_back(ts);
200        }
201    }
202
203    /// Record a numeric aggregation value.
204    pub fn push_numeric(&mut self, ts: i64, value: f64) {
205        if let WindowState::NumericAgg { entries } = self {
206            entries.push_back((ts, value));
207        }
208    }
209
210    /// Evaluate the window state against the correlation condition.
211    ///
212    /// Returns `Some(aggregated_value)` if the condition is satisfied,
213    /// `None` otherwise.
214    ///
215    /// For temporal correlations with an extended expression, the expression
216    /// is evaluated against the set of rules that have fired in the window.
217    pub fn check_condition(
218        &self,
219        condition: &CompiledCondition,
220        corr_type: CorrelationType,
221        rule_refs: &[String],
222        extended_expr: Option<&ConditionExpr>,
223    ) -> Option<f64> {
224        let value = match (self, corr_type) {
225            (WindowState::EventCount { timestamps }, CorrelationType::EventCount) => {
226                timestamps.len() as f64
227            }
228            (WindowState::ValueCount { entries }, CorrelationType::ValueCount) => {
229                // Count distinct values
230                let distinct: HashSet<&String> = entries.iter().map(|(_, v)| v).collect();
231                distinct.len() as f64
232            }
233            (WindowState::Temporal { rule_hits }, CorrelationType::Temporal) => {
234                // If an extended expression is provided, evaluate it
235                if let Some(expr) = extended_expr {
236                    if eval_temporal_expr(expr, rule_hits) {
237                        // Return the count of fired rules as the value
238                        let fired: usize = rule_refs
239                            .iter()
240                            .filter(|r| rule_hits.get(r.as_str()).is_some_and(|ts| !ts.is_empty()))
241                            .count();
242                        return Some(fired as f64);
243                    } else {
244                        return None;
245                    }
246                }
247                // Default: count how many distinct referenced rules have fired
248                let fired: usize = rule_refs
249                    .iter()
250                    .filter(|r| rule_hits.get(r.as_str()).is_some_and(|ts| !ts.is_empty()))
251                    .count();
252                fired as f64
253            }
254            (WindowState::Temporal { rule_hits }, CorrelationType::TemporalOrdered) => {
255                // If an extended expression is provided, evaluate it first
256                if let Some(expr) = extended_expr
257                    && !eval_temporal_expr(expr, rule_hits)
258                {
259                    return None;
260                }
261                // Check if all referenced rules fired in order
262                if check_temporal_ordered(rule_refs, rule_hits) {
263                    rule_refs.len() as f64
264                } else {
265                    0.0
266                }
267            }
268            (WindowState::NumericAgg { entries }, CorrelationType::ValueSum) => {
269                entries.iter().map(|(_, v)| v).sum()
270            }
271            (WindowState::NumericAgg { entries }, CorrelationType::ValueAvg) => {
272                if entries.is_empty() {
273                    0.0
274                } else {
275                    let sum: f64 = entries.iter().map(|(_, v)| v).sum();
276                    sum / entries.len() as f64
277                }
278            }
279            (WindowState::NumericAgg { entries }, CorrelationType::ValuePercentile) => {
280                // Proper percentile calculation using linear interpolation.
281                // The condition threshold represents a percentile rank (0-100).
282                // We compute the value at that percentile from the window data.
283                if entries.is_empty() {
284                    return None;
285                }
286                let mut values: Vec<f64> = entries
287                    .iter()
288                    .map(|(_, v)| *v)
289                    .filter(|v| v.is_finite())
290                    .collect();
291                if values.is_empty() {
292                    return None;
293                }
294                values.sort_by(|a, b| a.total_cmp(b));
295                let percentile_rank = condition.percentile.map(|p| p as f64).unwrap_or(50.0);
296                let pval = percentile_linear_interp(&values, percentile_rank);
297                return Some(pval);
298            }
299            (WindowState::NumericAgg { entries }, CorrelationType::ValueMedian) => {
300                // An empty window has no median. Returning `0.0` here would
301                // spuriously satisfy predicates like `lte: 0` or `eq: 0`, so
302                // match the percentile branch and skip evaluation.
303                if entries.is_empty() {
304                    return None;
305                }
306                let mut values: Vec<f64> = entries
307                    .iter()
308                    .map(|(_, v)| *v)
309                    .filter(|v| v.is_finite())
310                    .collect();
311                if values.is_empty() {
312                    return None;
313                }
314                values.sort_by(|a, b| a.total_cmp(b));
315                let mid = values.len() / 2;
316                if values.len().is_multiple_of(2) && values.len() >= 2 {
317                    (values[mid - 1] + values[mid]) / 2.0
318                } else {
319                    values[mid]
320                }
321            }
322            _ => return None, // mismatched state/type
323        };
324
325        if condition.check(value) {
326            Some(value)
327        } else {
328            None
329        }
330    }
331}
332
/// Start of the tumbling bucket that contains `ts`, aligned to epoch.
///
/// Uses Euclidean remainder so negative timestamps align to the bucket below
/// them rather than toward zero.
///
/// Degenerate inputs never panic:
/// - If the remainder cannot be computed (`timespan == 0`, or the
///   `i64::MIN` / `-1` overflow case), `ts` is returned unchanged, i.e. each
///   timestamp is its own bucket.
/// - If the aligned start would fall below `i64::MIN`, the result saturates
///   at `i64::MIN`, which keeps bucket starts monotonic in `ts`.
fn bucket_start(ts: i64, timespan: i64) -> i64 {
    match ts.checked_rem_euclid(timespan) {
        Some(offset) => ts.saturating_sub(offset),
        None => ts,
    }
}
340
/// Outcome of a window's pre-insert maintenance for a new event.
///
/// Returned by `apply_window_open` to tell the caller whether, and into what
/// kind of window, the incoming event should be pushed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WindowDecision {
    /// The current window continues; push the event into it.
    Extend,
    /// The window rolled over and the state was cleared; push the event into
    /// the fresh window. Callers must also clear any associated event buffers
    /// so they stay in sync with the state.
    Reset,
    /// The event predates the current window (a late arrival in an earlier
    /// tumbling bucket); do not push it and leave the state untouched.
    Discard,
}
354
355/// Apply a correlation window's pre-insert maintenance for a new event at `ts`,
356/// before the event is pushed into `state`.
357///
358/// - `Sliding` evicts entries older than `ts - timespan` (the existing default)
359///   and always extends.
360/// - `Tumbling` resets the window when `ts` falls in a *later*
361///   boundary-aligned bucket than the latest retained entry, and discards
362///   events that fall in an *earlier* bucket (a late arrival must not wipe the
363///   active bucket's accumulation).
364/// - `Session` resets when `ts` is more than `gap` after the latest entry, or
365///   when extending would push the total span past `timespan` (the hard cap).
366///
367/// Out-of-order arrivals follow the engine's arrival-order contract: decisions
368/// are made relative to the entries currently retained, not a global watermark.
369pub fn apply_window_open(
370    state: &mut WindowState,
371    ts: i64,
372    timespan_secs: u64,
373    window: WindowMode,
374    gap_secs: Option<u64>,
375) -> WindowDecision {
376    let timespan = timespan_secs as i64;
377    match window {
378        WindowMode::Sliding => {
379            state.evict(ts - timespan);
380            WindowDecision::Extend
381        }
382        WindowMode::Tumbling => {
383            if timespan <= 0 {
384                return WindowDecision::Extend;
385            }
386            match state.latest_timestamp() {
387                Some(last) if bucket_start(ts, timespan) > bucket_start(last, timespan) => {
388                    state.clear();
389                    WindowDecision::Reset
390                }
391                Some(last) if bucket_start(ts, timespan) < bucket_start(last, timespan) => {
392                    WindowDecision::Discard
393                }
394                _ => WindowDecision::Extend,
395            }
396        }
397        WindowMode::Session => {
398            // `gap` is required for session windows; fall back to `timespan` if
399            // it is somehow absent so the window still has a bound.
400            let gap = gap_secs.map(|g| g as i64).unwrap_or(timespan);
401            let reset = match (state.earliest_timestamp(), state.latest_timestamp()) {
402                (Some(start), Some(last)) => {
403                    (ts - last) > gap || (timespan > 0 && (ts - start) > timespan)
404                }
405                _ => false,
406            };
407            if reset {
408                state.clear();
409                WindowDecision::Reset
410            } else {
411                WindowDecision::Extend
412            }
413        }
414    }
415}
416
/// Check if all referenced rules fired in the correct order within the window.
///
/// For `temporal_ordered`, each rule must have at least one hit, and there
/// must exist a sequence of timestamps (one per rule) that is non-decreasing
/// and follows the rule ordering. An empty `rule_refs` is trivially ordered.
///
/// The search is greedy: for each rule in order it picks the smallest hit that
/// is not earlier than the previous pick. Choosing the smallest feasible
/// timestamp never rules out a later choice, so this finds a valid sequence
/// whenever one exists, in time linear in the number of stored hits and
/// without recursion. The per-rule deques are not assumed to be sorted.
fn check_temporal_ordered(
    rule_refs: &[String],
    rule_hits: &HashMap<String, VecDeque<i64>>,
) -> bool {
    // Lowest timestamp the next rule's hit is allowed to have.
    let mut floor = i64::MIN;
    for rule in rule_refs {
        let Some(hits) = rule_hits.get(rule.as_str()) else {
            return false;
        };
        let earliest_feasible = hits.iter().copied().filter(|&ts| ts >= floor).min();
        let Some(chosen) = earliest_feasible else {
            // Either the rule never fired or all of its hits precede the
            // previous rule's earliest usable hit.
            return false;
        };
        floor = chosen;
    }
    true
}
461
462/// Evaluate a boolean condition expression against the set of rules that have
463/// fired within the temporal window.
464///
465/// Each `Identifier` in the expression is treated as a rule reference — it's
466/// `true` if that rule has at least one hit in `rule_hits`.
467pub(super) fn eval_temporal_expr(
468    expr: &ConditionExpr,
469    rule_hits: &HashMap<String, VecDeque<i64>>,
470) -> bool {
471    match expr {
472        ConditionExpr::Identifier(name) => rule_hits
473            .get(name.as_str())
474            .is_some_and(|ts| !ts.is_empty()),
475        ConditionExpr::And(children) => children.iter().all(|c| eval_temporal_expr(c, rule_hits)),
476        ConditionExpr::Or(children) => children.iter().any(|c| eval_temporal_expr(c, rule_hits)),
477        ConditionExpr::Not(child) => !eval_temporal_expr(child, rule_hits),
478        ConditionExpr::Selector { .. } => {
479            // Selectors are not meaningful for temporal condition evaluation
480            false
481        }
482    }
483}
484
485/// Compute the value at a given percentile rank using linear interpolation.
486///
487/// Returns 0.0 if `values` is empty.
488/// `values` must be sorted in ascending order.
489/// `percentile` is from 0.0 to 100.0.
490pub(super) fn percentile_linear_interp(values: &[f64], percentile: f64) -> f64 {
491    if values.is_empty() {
492        return 0.0;
493    }
494    let n = values.len();
495    if n == 1 {
496        return values[0];
497    }
498
499    // Clamp percentile to [0, 100]
500    let p = percentile.clamp(0.0, 100.0) / 100.0;
501
502    // Use the "C = 1" interpolation method (most common in statistics)
503    // rank = p * (n - 1)
504    let rank = p * (n - 1) as f64;
505    let lower = rank.floor() as usize;
506    let upper = rank.ceil() as usize;
507    let fraction = rank - lower as f64;
508
509    if lower == upper || upper >= n {
510        values[lower.min(n - 1)]
511    } else {
512        values[lower] + fraction * (values[upper] - values[lower])
513    }
514}