Skip to main content

rsigma_eval/correlation/
window.rs

1use std::collections::{HashMap, HashSet, VecDeque};
2
3use rsigma_parser::{ConditionExpr, CorrelationType, WindowMode};
4use serde::Serialize;
5
6use super::CompiledCondition;
7
8// =============================================================================
9// Window State
10// =============================================================================
11
12/// Per-group mutable state within a time window.
13///
14/// Each variant matches the type of aggregation being performed.
15#[derive(Debug, Clone, Serialize, serde::Deserialize)]
16pub enum WindowState {
17    /// For `event_count`: timestamps of matching events.
18    EventCount { timestamps: VecDeque<i64> },
19    /// For `value_count`: (timestamp, field_value) pairs.
20    ValueCount { entries: VecDeque<(i64, String)> },
21    /// For `temporal` / `temporal_ordered`: rule_ref -> list of hit timestamps.
22    Temporal {
23        rule_hits: HashMap<String, VecDeque<i64>>,
24    },
25    /// For `value_sum`, `value_avg`, `value_percentile`, `value_median`:
26    /// (timestamp, numeric_value) pairs.
27    NumericAgg { entries: VecDeque<(i64, f64)> },
28}
29
30impl WindowState {
31    /// Create a new empty window state for the given correlation type.
32    pub fn new_for(corr_type: CorrelationType) -> Self {
33        match corr_type {
34            CorrelationType::EventCount => WindowState::EventCount {
35                timestamps: VecDeque::new(),
36            },
37            CorrelationType::ValueCount => WindowState::ValueCount {
38                entries: VecDeque::new(),
39            },
40            CorrelationType::Temporal | CorrelationType::TemporalOrdered => WindowState::Temporal {
41                rule_hits: HashMap::new(),
42            },
43            CorrelationType::ValueSum
44            | CorrelationType::ValueAvg
45            | CorrelationType::ValuePercentile
46            | CorrelationType::ValueMedian => WindowState::NumericAgg {
47                entries: VecDeque::new(),
48            },
49        }
50    }
51
52    /// Remove all entries older than the cutoff timestamp.
53    pub fn evict(&mut self, cutoff: i64) {
54        match self {
55            WindowState::EventCount { timestamps } => {
56                while timestamps.front().is_some_and(|&t| t < cutoff) {
57                    timestamps.pop_front();
58                }
59            }
60            WindowState::ValueCount { entries } => {
61                while entries.front().is_some_and(|(t, _)| *t < cutoff) {
62                    entries.pop_front();
63                }
64            }
65            WindowState::Temporal { rule_hits } => {
66                for timestamps in rule_hits.values_mut() {
67                    while timestamps.front().is_some_and(|&t| t < cutoff) {
68                        timestamps.pop_front();
69                    }
70                }
71                // Remove empty rule entries
72                rule_hits.retain(|_, ts| !ts.is_empty());
73            }
74            WindowState::NumericAgg { entries } => {
75                while entries.front().is_some_and(|(t, _)| *t < cutoff) {
76                    entries.pop_front();
77                }
78            }
79        }
80    }
81
82    /// Returns true if this state has no entries.
83    pub fn is_empty(&self) -> bool {
84        match self {
85            WindowState::EventCount { timestamps } => timestamps.is_empty(),
86            WindowState::ValueCount { entries } => entries.is_empty(),
87            WindowState::Temporal { rule_hits } => rule_hits.is_empty(),
88            WindowState::NumericAgg { entries } => entries.is_empty(),
89        }
90    }
91
92    /// Returns the most recent timestamp in this window, or `None` if empty.
93    pub fn latest_timestamp(&self) -> Option<i64> {
94        match self {
95            WindowState::EventCount { timestamps } => timestamps.back().copied(),
96            WindowState::ValueCount { entries } => entries.back().map(|(t, _)| *t),
97            WindowState::Temporal { rule_hits } => {
98                rule_hits.values().filter_map(|ts| ts.back().copied()).max()
99            }
100            WindowState::NumericAgg { entries } => entries.back().map(|(t, _)| *t),
101        }
102    }
103
104    /// Returns the oldest timestamp in this window, or `None` if empty.
105    ///
106    /// Entries are appended in arrival order, so the front of each deque holds
107    /// the earliest timestamp. For temporal state the minimum is taken across
108    /// all per-rule deques.
109    pub fn earliest_timestamp(&self) -> Option<i64> {
110        match self {
111            WindowState::EventCount { timestamps } => timestamps.front().copied(),
112            WindowState::ValueCount { entries } => entries.front().map(|(t, _)| *t),
113            WindowState::Temporal { rule_hits } => rule_hits
114                .values()
115                .filter_map(|ts| ts.front().copied())
116                .min(),
117            WindowState::NumericAgg { entries } => entries.front().map(|(t, _)| *t),
118        }
119    }
120
121    /// Clear all entries from the window state (used by `CorrelationAction::Reset`).
122    pub fn clear(&mut self) {
123        match self {
124            WindowState::EventCount { timestamps } => timestamps.clear(),
125            WindowState::ValueCount { entries } => entries.clear(),
126            WindowState::Temporal { rule_hits } => rule_hits.clear(),
127            WindowState::NumericAgg { entries } => entries.clear(),
128        }
129    }
130
131    /// Record an event_count hit.
132    pub fn push_event_count(&mut self, ts: i64) {
133        if let WindowState::EventCount { timestamps } = self {
134            timestamps.push_back(ts);
135        }
136    }
137
138    /// Record a value_count hit with the field value.
139    pub fn push_value_count(&mut self, ts: i64, value: String) {
140        if let WindowState::ValueCount { entries } = self {
141            entries.push_back((ts, value));
142        }
143    }
144
145    /// Record a temporal hit for a specific rule reference.
146    pub fn push_temporal(&mut self, ts: i64, rule_ref: &str) {
147        if let WindowState::Temporal { rule_hits } = self {
148            rule_hits
149                .entry(rule_ref.to_string())
150                .or_default()
151                .push_back(ts);
152        }
153    }
154
155    /// Record a numeric aggregation value.
156    pub fn push_numeric(&mut self, ts: i64, value: f64) {
157        if let WindowState::NumericAgg { entries } = self {
158            entries.push_back((ts, value));
159        }
160    }
161
162    /// Evaluate the window state against the correlation condition.
163    ///
164    /// Returns `Some(aggregated_value)` if the condition is satisfied,
165    /// `None` otherwise.
166    ///
167    /// For temporal correlations with an extended expression, the expression
168    /// is evaluated against the set of rules that have fired in the window.
169    pub fn check_condition(
170        &self,
171        condition: &CompiledCondition,
172        corr_type: CorrelationType,
173        rule_refs: &[String],
174        extended_expr: Option<&ConditionExpr>,
175    ) -> Option<f64> {
176        let value = match (self, corr_type) {
177            (WindowState::EventCount { timestamps }, CorrelationType::EventCount) => {
178                timestamps.len() as f64
179            }
180            (WindowState::ValueCount { entries }, CorrelationType::ValueCount) => {
181                // Count distinct values
182                let distinct: HashSet<&String> = entries.iter().map(|(_, v)| v).collect();
183                distinct.len() as f64
184            }
185            (WindowState::Temporal { rule_hits }, CorrelationType::Temporal) => {
186                // If an extended expression is provided, evaluate it
187                if let Some(expr) = extended_expr {
188                    if eval_temporal_expr(expr, rule_hits) {
189                        // Return the count of fired rules as the value
190                        let fired: usize = rule_refs
191                            .iter()
192                            .filter(|r| rule_hits.get(r.as_str()).is_some_and(|ts| !ts.is_empty()))
193                            .count();
194                        return Some(fired as f64);
195                    } else {
196                        return None;
197                    }
198                }
199                // Default: count how many distinct referenced rules have fired
200                let fired: usize = rule_refs
201                    .iter()
202                    .filter(|r| rule_hits.get(r.as_str()).is_some_and(|ts| !ts.is_empty()))
203                    .count();
204                fired as f64
205            }
206            (WindowState::Temporal { rule_hits }, CorrelationType::TemporalOrdered) => {
207                // If an extended expression is provided, evaluate it first
208                if let Some(expr) = extended_expr
209                    && !eval_temporal_expr(expr, rule_hits)
210                {
211                    return None;
212                }
213                // Check if all referenced rules fired in order
214                if check_temporal_ordered(rule_refs, rule_hits) {
215                    rule_refs.len() as f64
216                } else {
217                    0.0
218                }
219            }
220            (WindowState::NumericAgg { entries }, CorrelationType::ValueSum) => {
221                entries.iter().map(|(_, v)| v).sum()
222            }
223            (WindowState::NumericAgg { entries }, CorrelationType::ValueAvg) => {
224                if entries.is_empty() {
225                    0.0
226                } else {
227                    let sum: f64 = entries.iter().map(|(_, v)| v).sum();
228                    sum / entries.len() as f64
229                }
230            }
231            (WindowState::NumericAgg { entries }, CorrelationType::ValuePercentile) => {
232                // Proper percentile calculation using linear interpolation.
233                // The condition threshold represents a percentile rank (0-100).
234                // We compute the value at that percentile from the window data.
235                if entries.is_empty() {
236                    return None;
237                }
238                let mut values: Vec<f64> = entries
239                    .iter()
240                    .map(|(_, v)| *v)
241                    .filter(|v| v.is_finite())
242                    .collect();
243                if values.is_empty() {
244                    return None;
245                }
246                values.sort_by(|a, b| a.total_cmp(b));
247                let percentile_rank = condition.percentile.map(|p| p as f64).unwrap_or(50.0);
248                let pval = percentile_linear_interp(&values, percentile_rank);
249                return Some(pval);
250            }
251            (WindowState::NumericAgg { entries }, CorrelationType::ValueMedian) => {
252                // An empty window has no median. Returning `0.0` here would
253                // spuriously satisfy predicates like `lte: 0` or `eq: 0`, so
254                // match the percentile branch and skip evaluation.
255                if entries.is_empty() {
256                    return None;
257                }
258                let mut values: Vec<f64> = entries
259                    .iter()
260                    .map(|(_, v)| *v)
261                    .filter(|v| v.is_finite())
262                    .collect();
263                if values.is_empty() {
264                    return None;
265                }
266                values.sort_by(|a, b| a.total_cmp(b));
267                let mid = values.len() / 2;
268                if values.len().is_multiple_of(2) && values.len() >= 2 {
269                    (values[mid - 1] + values[mid]) / 2.0
270                } else {
271                    values[mid]
272                }
273            }
274            _ => return None, // mismatched state/type
275        };
276
277        if condition.check(value) {
278            Some(value)
279        } else {
280            None
281        }
282    }
283}
284
/// Epoch-aligned start of the tumbling bucket of width `timespan` that
/// contains `ts`.
///
/// The offset into the bucket is computed with `rem_euclid`, which is never
/// negative, so a negative timestamp is aligned down to the bucket boundary
/// below it instead of being rounded toward zero.
fn bucket_start(ts: i64, timespan: i64) -> i64 {
    let offset_into_bucket = ts.rem_euclid(timespan);
    ts - offset_into_bucket
}
292
/// What a window's pre-insert maintenance decided for an incoming event.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WindowDecision {
    /// Keep the current window and push the event into it.
    Extend,
    /// The window rolled over and its state was cleared; push the event into
    /// the fresh window. Callers must also clear any associated event buffers
    /// so they stay in sync with the state.
    Reset,
    /// The event belongs to an earlier tumbling bucket than the current
    /// window (a late arrival); do not push it, and leave the state as it is.
    Discard,
}
306
307/// Apply a correlation window's pre-insert maintenance for a new event at `ts`,
308/// before the event is pushed into `state`.
309///
310/// - `Sliding` evicts entries older than `ts - timespan` (the existing default)
311///   and always extends.
312/// - `Tumbling` resets the window when `ts` falls in a *later*
313///   boundary-aligned bucket than the latest retained entry, and discards
314///   events that fall in an *earlier* bucket (a late arrival must not wipe the
315///   active bucket's accumulation).
316/// - `Session` resets when `ts` is more than `gap` after the latest entry, or
317///   when extending would push the total span past `timespan` (the hard cap).
318///
319/// Out-of-order arrivals follow the engine's arrival-order contract: decisions
320/// are made relative to the entries currently retained, not a global watermark.
321pub fn apply_window_open(
322    state: &mut WindowState,
323    ts: i64,
324    timespan_secs: u64,
325    window: WindowMode,
326    gap_secs: Option<u64>,
327) -> WindowDecision {
328    let timespan = timespan_secs as i64;
329    match window {
330        WindowMode::Sliding => {
331            state.evict(ts - timespan);
332            WindowDecision::Extend
333        }
334        WindowMode::Tumbling => {
335            if timespan <= 0 {
336                return WindowDecision::Extend;
337            }
338            match state.latest_timestamp() {
339                Some(last) if bucket_start(ts, timespan) > bucket_start(last, timespan) => {
340                    state.clear();
341                    WindowDecision::Reset
342                }
343                Some(last) if bucket_start(ts, timespan) < bucket_start(last, timespan) => {
344                    WindowDecision::Discard
345                }
346                _ => WindowDecision::Extend,
347            }
348        }
349        WindowMode::Session => {
350            // `gap` is required for session windows; fall back to `timespan` if
351            // it is somehow absent so the window still has a bound.
352            let gap = gap_secs.map(|g| g as i64).unwrap_or(timespan);
353            let reset = match (state.earliest_timestamp(), state.latest_timestamp()) {
354                (Some(start), Some(last)) => {
355                    (ts - last) > gap || (timespan > 0 && (ts - start) > timespan)
356                }
357                _ => false,
358            };
359            if reset {
360                state.clear();
361                WindowDecision::Reset
362            } else {
363                WindowDecision::Extend
364            }
365        }
366    }
367}
368
/// Check if all referenced rules fired in the correct order within the window.
///
/// For `temporal_ordered`, each rule must have at least one hit, and there
/// must exist a sequence of timestamps (one per rule) that is non-decreasing
/// and follows the rule ordering. An empty `rule_refs` is trivially ordered.
///
/// The search is greedy: for each rule in turn it picks the smallest hit that
/// is not earlier than the hit chosen for the previous rule. Choosing the
/// smallest admissible timestamp never rules out a later choice, so this finds
/// a valid sequence whenever one exists, in time linear in the number of
/// recorded hits. The per-rule deques are scanned in full, so they do not need
/// to be sorted.
fn check_temporal_ordered(
    rule_refs: &[String],
    rule_hits: &HashMap<String, VecDeque<i64>>,
) -> bool {
    let mut floor = i64::MIN;
    for rule in rule_refs {
        let earliest_admissible = rule_hits
            .get(rule.as_str())
            .and_then(|hits| hits.iter().copied().filter(|&t| t >= floor).min());
        match earliest_admissible {
            Some(chosen) => floor = chosen,
            // The rule has no hits at all, or none at/after the previous one.
            None => return false,
        }
    }
    true
}
413
414/// Evaluate a boolean condition expression against the set of rules that have
415/// fired within the temporal window.
416///
417/// Each `Identifier` in the expression is treated as a rule reference — it's
418/// `true` if that rule has at least one hit in `rule_hits`.
419pub(super) fn eval_temporal_expr(
420    expr: &ConditionExpr,
421    rule_hits: &HashMap<String, VecDeque<i64>>,
422) -> bool {
423    match expr {
424        ConditionExpr::Identifier(name) => rule_hits
425            .get(name.as_str())
426            .is_some_and(|ts| !ts.is_empty()),
427        ConditionExpr::And(children) => children.iter().all(|c| eval_temporal_expr(c, rule_hits)),
428        ConditionExpr::Or(children) => children.iter().any(|c| eval_temporal_expr(c, rule_hits)),
429        ConditionExpr::Not(child) => !eval_temporal_expr(child, rule_hits),
430        ConditionExpr::Selector { .. } => {
431            // Selectors are not meaningful for temporal condition evaluation
432            false
433        }
434    }
435}
436
437/// Compute the value at a given percentile rank using linear interpolation.
438///
439/// Returns 0.0 if `values` is empty.
440/// `values` must be sorted in ascending order.
441/// `percentile` is from 0.0 to 100.0.
442pub(super) fn percentile_linear_interp(values: &[f64], percentile: f64) -> f64 {
443    if values.is_empty() {
444        return 0.0;
445    }
446    let n = values.len();
447    if n == 1 {
448        return values[0];
449    }
450
451    // Clamp percentile to [0, 100]
452    let p = percentile.clamp(0.0, 100.0) / 100.0;
453
454    // Use the "C = 1" interpolation method (most common in statistics)
455    // rank = p * (n - 1)
456    let rank = p * (n - 1) as f64;
457    let lower = rank.floor() as usize;
458    let upper = rank.ceil() as usize;
459    let fraction = rank - lower as f64;
460
461    if lower == upper || upper >= n {
462        values[lower.min(n - 1)]
463    } else {
464        values[lower] + fraction * (values[upper] - values[lower])
465    }
466}