1use std::collections::{HashMap, HashSet, VecDeque};
2
3use rsigma_parser::{ConditionExpr, CorrelationType, WindowMode};
4use serde::Serialize;
5
6use super::CompiledCondition;
7
/// Per-group accumulator for one correlation window.
///
/// Each variant stores the raw observations that the matching correlation
/// types need in order to compute their aggregate in `check_condition`.
/// Timestamps are plain `i64` values; `apply_window_open` compares them
/// against spans expressed in seconds, so they are presumably epoch seconds
/// (NOTE(review): confirm against the caller).
#[derive(Debug, Clone, Serialize, serde::Deserialize)]
pub enum WindowState {
    /// One timestamp per matching event; the aggregate is the number of entries.
    EventCount { timestamps: VecDeque<i64> },
    /// `(timestamp, field value)` pairs; the aggregate is the number of
    /// distinct values currently held.
    ValueCount { entries: VecDeque<(i64, String)> },
    /// Hit timestamps keyed by rule reference. Backs both the `Temporal` and
    /// `TemporalOrdered` correlation types.
    Temporal {
        rule_hits: HashMap<String, VecDeque<i64>>,
    },
    /// `(timestamp, numeric value)` pairs used by the sum, average,
    /// percentile and median correlation types.
    NumericAgg { entries: VecDeque<(i64, f64)> },
}
29
30impl WindowState {
31 pub fn new_for(corr_type: CorrelationType) -> Self {
33 match corr_type {
34 CorrelationType::EventCount => WindowState::EventCount {
35 timestamps: VecDeque::new(),
36 },
37 CorrelationType::ValueCount => WindowState::ValueCount {
38 entries: VecDeque::new(),
39 },
40 CorrelationType::Temporal | CorrelationType::TemporalOrdered => WindowState::Temporal {
41 rule_hits: HashMap::new(),
42 },
43 CorrelationType::ValueSum
44 | CorrelationType::ValueAvg
45 | CorrelationType::ValuePercentile
46 | CorrelationType::ValueMedian => WindowState::NumericAgg {
47 entries: VecDeque::new(),
48 },
49 }
50 }
51
52 pub fn evict(&mut self, cutoff: i64) {
54 match self {
55 WindowState::EventCount { timestamps } => {
56 while timestamps.front().is_some_and(|&t| t < cutoff) {
57 timestamps.pop_front();
58 }
59 }
60 WindowState::ValueCount { entries } => {
61 while entries.front().is_some_and(|(t, _)| *t < cutoff) {
62 entries.pop_front();
63 }
64 }
65 WindowState::Temporal { rule_hits } => {
66 for timestamps in rule_hits.values_mut() {
67 while timestamps.front().is_some_and(|&t| t < cutoff) {
68 timestamps.pop_front();
69 }
70 }
71 rule_hits.retain(|_, ts| !ts.is_empty());
73 }
74 WindowState::NumericAgg { entries } => {
75 while entries.front().is_some_and(|(t, _)| *t < cutoff) {
76 entries.pop_front();
77 }
78 }
79 }
80 }
81
82 pub fn is_empty(&self) -> bool {
84 match self {
85 WindowState::EventCount { timestamps } => timestamps.is_empty(),
86 WindowState::ValueCount { entries } => entries.is_empty(),
87 WindowState::Temporal { rule_hits } => rule_hits.is_empty(),
88 WindowState::NumericAgg { entries } => entries.is_empty(),
89 }
90 }
91
92 pub fn latest_timestamp(&self) -> Option<i64> {
94 match self {
95 WindowState::EventCount { timestamps } => timestamps.back().copied(),
96 WindowState::ValueCount { entries } => entries.back().map(|(t, _)| *t),
97 WindowState::Temporal { rule_hits } => {
98 rule_hits.values().filter_map(|ts| ts.back().copied()).max()
99 }
100 WindowState::NumericAgg { entries } => entries.back().map(|(t, _)| *t),
101 }
102 }
103
104 pub fn earliest_timestamp(&self) -> Option<i64> {
110 match self {
111 WindowState::EventCount { timestamps } => timestamps.front().copied(),
112 WindowState::ValueCount { entries } => entries.front().map(|(t, _)| *t),
113 WindowState::Temporal { rule_hits } => rule_hits
114 .values()
115 .filter_map(|ts| ts.front().copied())
116 .min(),
117 WindowState::NumericAgg { entries } => entries.front().map(|(t, _)| *t),
118 }
119 }
120
121 pub fn clear(&mut self) {
123 match self {
124 WindowState::EventCount { timestamps } => timestamps.clear(),
125 WindowState::ValueCount { entries } => entries.clear(),
126 WindowState::Temporal { rule_hits } => rule_hits.clear(),
127 WindowState::NumericAgg { entries } => entries.clear(),
128 }
129 }
130
131 pub fn push_event_count(&mut self, ts: i64) {
133 if let WindowState::EventCount { timestamps } = self {
134 timestamps.push_back(ts);
135 }
136 }
137
138 pub fn push_value_count(&mut self, ts: i64, value: String) {
140 if let WindowState::ValueCount { entries } = self {
141 entries.push_back((ts, value));
142 }
143 }
144
145 pub fn push_temporal(&mut self, ts: i64, rule_ref: &str) {
147 if let WindowState::Temporal { rule_hits } = self {
148 rule_hits
149 .entry(rule_ref.to_string())
150 .or_default()
151 .push_back(ts);
152 }
153 }
154
155 pub fn push_numeric(&mut self, ts: i64, value: f64) {
157 if let WindowState::NumericAgg { entries } = self {
158 entries.push_back((ts, value));
159 }
160 }
161
162 pub fn check_condition(
170 &self,
171 condition: &CompiledCondition,
172 corr_type: CorrelationType,
173 rule_refs: &[String],
174 extended_expr: Option<&ConditionExpr>,
175 ) -> Option<f64> {
176 let value = match (self, corr_type) {
177 (WindowState::EventCount { timestamps }, CorrelationType::EventCount) => {
178 timestamps.len() as f64
179 }
180 (WindowState::ValueCount { entries }, CorrelationType::ValueCount) => {
181 let distinct: HashSet<&String> = entries.iter().map(|(_, v)| v).collect();
183 distinct.len() as f64
184 }
185 (WindowState::Temporal { rule_hits }, CorrelationType::Temporal) => {
186 if let Some(expr) = extended_expr {
188 if eval_temporal_expr(expr, rule_hits) {
189 let fired: usize = rule_refs
191 .iter()
192 .filter(|r| rule_hits.get(r.as_str()).is_some_and(|ts| !ts.is_empty()))
193 .count();
194 return Some(fired as f64);
195 } else {
196 return None;
197 }
198 }
199 let fired: usize = rule_refs
201 .iter()
202 .filter(|r| rule_hits.get(r.as_str()).is_some_and(|ts| !ts.is_empty()))
203 .count();
204 fired as f64
205 }
206 (WindowState::Temporal { rule_hits }, CorrelationType::TemporalOrdered) => {
207 if let Some(expr) = extended_expr
209 && !eval_temporal_expr(expr, rule_hits)
210 {
211 return None;
212 }
213 if check_temporal_ordered(rule_refs, rule_hits) {
215 rule_refs.len() as f64
216 } else {
217 0.0
218 }
219 }
220 (WindowState::NumericAgg { entries }, CorrelationType::ValueSum) => {
221 entries.iter().map(|(_, v)| v).sum()
222 }
223 (WindowState::NumericAgg { entries }, CorrelationType::ValueAvg) => {
224 if entries.is_empty() {
225 0.0
226 } else {
227 let sum: f64 = entries.iter().map(|(_, v)| v).sum();
228 sum / entries.len() as f64
229 }
230 }
231 (WindowState::NumericAgg { entries }, CorrelationType::ValuePercentile) => {
232 if entries.is_empty() {
236 return None;
237 }
238 let mut values: Vec<f64> = entries
239 .iter()
240 .map(|(_, v)| *v)
241 .filter(|v| v.is_finite())
242 .collect();
243 if values.is_empty() {
244 return None;
245 }
246 values.sort_by(|a, b| a.total_cmp(b));
247 let percentile_rank = condition.percentile.map(|p| p as f64).unwrap_or(50.0);
248 let pval = percentile_linear_interp(&values, percentile_rank);
249 return Some(pval);
250 }
251 (WindowState::NumericAgg { entries }, CorrelationType::ValueMedian) => {
252 if entries.is_empty() {
256 return None;
257 }
258 let mut values: Vec<f64> = entries
259 .iter()
260 .map(|(_, v)| *v)
261 .filter(|v| v.is_finite())
262 .collect();
263 if values.is_empty() {
264 return None;
265 }
266 values.sort_by(|a, b| a.total_cmp(b));
267 let mid = values.len() / 2;
268 if values.len().is_multiple_of(2) && values.len() >= 2 {
269 (values[mid - 1] + values[mid]) / 2.0
270 } else {
271 values[mid]
272 }
273 }
274 _ => return None, };
276
277 if condition.check(value) {
278 Some(value)
279 } else {
280 None
281 }
282 }
283}
284
/// Returns the start of the `timespan`-wide bucket that contains `ts`.
///
/// Uses the Euclidean remainder so that negative timestamps are floored
/// towards the earlier bucket boundary rather than towards zero.
/// Panics if `timespan` is zero (division by zero in `rem_euclid`).
fn bucket_start(ts: i64, timespan: i64) -> i64 {
    let offset_into_bucket = ts.rem_euclid(timespan);
    ts - offset_into_bucket
}
292
/// Outcome of `apply_window_open` for an incoming event timestamp.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WindowDecision {
    /// The event belongs to the current window; existing state was kept
    /// (sliding mode may have evicted entries older than the span).
    Extend,
    /// The event starts a new tumbling bucket or session; the previous state
    /// was cleared before returning.
    Reset,
    /// Tumbling mode only: the event falls into a bucket earlier than the one
    /// holding the latest stored entry. The state was left untouched;
    /// presumably the caller drops the event (NOTE(review): confirm).
    Discard,
}
306
307pub fn apply_window_open(
322 state: &mut WindowState,
323 ts: i64,
324 timespan_secs: u64,
325 window: WindowMode,
326 gap_secs: Option<u64>,
327) -> WindowDecision {
328 let timespan = timespan_secs as i64;
329 match window {
330 WindowMode::Sliding => {
331 state.evict(ts - timespan);
332 WindowDecision::Extend
333 }
334 WindowMode::Tumbling => {
335 if timespan <= 0 {
336 return WindowDecision::Extend;
337 }
338 match state.latest_timestamp() {
339 Some(last) if bucket_start(ts, timespan) > bucket_start(last, timespan) => {
340 state.clear();
341 WindowDecision::Reset
342 }
343 Some(last) if bucket_start(ts, timespan) < bucket_start(last, timespan) => {
344 WindowDecision::Discard
345 }
346 _ => WindowDecision::Extend,
347 }
348 }
349 WindowMode::Session => {
350 let gap = gap_secs.map(|g| g as i64).unwrap_or(timespan);
353 let reset = match (state.earliest_timestamp(), state.latest_timestamp()) {
354 (Some(start), Some(last)) => {
355 (ts - last) > gap || (timespan > 0 && (ts - start) > timespan)
356 }
357 _ => false,
358 };
359 if reset {
360 state.clear();
361 WindowDecision::Reset
362 } else {
363 WindowDecision::Extend
364 }
365 }
366 }
367}
368
/// Returns `true` when one hit can be picked per entry of `rule_refs`, in
/// order, such that the picked timestamps are non-decreasing.
///
/// An empty `rule_refs` is trivially ordered. A rule that is missing from
/// `rule_hits`, or whose queue is empty, makes the result `false`. Equal
/// timestamps count as ordered, and the per-rule queues do not need to be
/// sorted.
///
/// Implementation: a greedy scan that, for each rule in turn, picks the
/// earliest hit not before the previously picked one. Choosing the earliest
/// admissible hit never rules out a later choice, so this finds a chain
/// whenever one exists while staying linear in the total number of hits
/// (a backtracking search is exponential in the number of rules when no
/// chain exists).
fn check_temporal_ordered(
    rule_refs: &[String],
    rule_hits: &HashMap<String, VecDeque<i64>>,
) -> bool {
    let mut not_before = i64::MIN;
    for rule in rule_refs {
        let Some(hits) = rule_hits.get(rule.as_str()) else {
            return false;
        };
        let earliest_admissible = hits.iter().copied().filter(|&ts| ts >= not_before).min();
        let Some(picked) = earliest_admissible else {
            return false;
        };
        not_before = picked;
    }
    true
}
413
414pub(super) fn eval_temporal_expr(
420 expr: &ConditionExpr,
421 rule_hits: &HashMap<String, VecDeque<i64>>,
422) -> bool {
423 match expr {
424 ConditionExpr::Identifier(name) => rule_hits
425 .get(name.as_str())
426 .is_some_and(|ts| !ts.is_empty()),
427 ConditionExpr::And(children) => children.iter().all(|c| eval_temporal_expr(c, rule_hits)),
428 ConditionExpr::Or(children) => children.iter().any(|c| eval_temporal_expr(c, rule_hits)),
429 ConditionExpr::Not(child) => !eval_temporal_expr(child, rule_hits),
430 ConditionExpr::Selector { .. } => {
431 false
433 }
434 }
435}
436
437pub(super) fn percentile_linear_interp(values: &[f64], percentile: f64) -> f64 {
443 if values.is_empty() {
444 return 0.0;
445 }
446 let n = values.len();
447 if n == 1 {
448 return values[0];
449 }
450
451 let p = percentile.clamp(0.0, 100.0) / 100.0;
453
454 let rank = p * (n - 1) as f64;
457 let lower = rank.floor() as usize;
458 let upper = rank.ceil() as usize;
459 let fraction = rank - lower as f64;
460
461 if lower == upper || upper >= n {
462 values[lower.min(n - 1)]
463 } else {
464 values[lower] + fraction * (values[upper] - values[lower])
465 }
466}