1use std::collections::{HashMap, HashSet, VecDeque};
2
3use rsigma_parser::{ConditionExpr, CorrelationType, WindowMode};
4use serde::Serialize;
5
6use super::CompiledCondition;
7
8#[derive(Debug, Clone, Serialize, serde::Deserialize)]
16pub enum WindowState {
17 EventCount { timestamps: VecDeque<i64> },
19 ValueCount { entries: VecDeque<(i64, String)> },
21 Temporal {
23 rule_hits: HashMap<String, VecDeque<i64>>,
24 },
25 NumericAgg { entries: VecDeque<(i64, f64)> },
28}
29
30impl WindowState {
31 pub fn new_for(corr_type: CorrelationType) -> Self {
33 match corr_type {
34 CorrelationType::EventCount => WindowState::EventCount {
35 timestamps: VecDeque::new(),
36 },
37 CorrelationType::ValueCount => WindowState::ValueCount {
38 entries: VecDeque::new(),
39 },
40 CorrelationType::Temporal | CorrelationType::TemporalOrdered => WindowState::Temporal {
41 rule_hits: HashMap::new(),
42 },
43 CorrelationType::ValueSum
44 | CorrelationType::ValueAvg
45 | CorrelationType::ValuePercentile
46 | CorrelationType::ValueMedian => WindowState::NumericAgg {
47 entries: VecDeque::new(),
48 },
49 }
50 }
51
52 pub fn evict(&mut self, cutoff: i64) {
54 match self {
55 WindowState::EventCount { timestamps } => {
56 while timestamps.front().is_some_and(|&t| t < cutoff) {
57 timestamps.pop_front();
58 }
59 }
60 WindowState::ValueCount { entries } => {
61 while entries.front().is_some_and(|(t, _)| *t < cutoff) {
62 entries.pop_front();
63 }
64 }
65 WindowState::Temporal { rule_hits } => {
66 for timestamps in rule_hits.values_mut() {
67 while timestamps.front().is_some_and(|&t| t < cutoff) {
68 timestamps.pop_front();
69 }
70 }
71 rule_hits.retain(|_, ts| !ts.is_empty());
73 }
74 WindowState::NumericAgg { entries } => {
75 while entries.front().is_some_and(|(t, _)| *t < cutoff) {
76 entries.pop_front();
77 }
78 }
79 }
80 }
81
82 pub fn is_empty(&self) -> bool {
84 match self {
85 WindowState::EventCount { timestamps } => timestamps.is_empty(),
86 WindowState::ValueCount { entries } => entries.is_empty(),
87 WindowState::Temporal { rule_hits } => rule_hits.is_empty(),
88 WindowState::NumericAgg { entries } => entries.is_empty(),
89 }
90 }
91
92 pub fn latest_timestamp(&self) -> Option<i64> {
94 match self {
95 WindowState::EventCount { timestamps } => timestamps.back().copied(),
96 WindowState::ValueCount { entries } => entries.back().map(|(t, _)| *t),
97 WindowState::Temporal { rule_hits } => {
98 rule_hits.values().filter_map(|ts| ts.back().copied()).max()
99 }
100 WindowState::NumericAgg { entries } => entries.back().map(|(t, _)| *t),
101 }
102 }
103
104 pub fn earliest_timestamp(&self) -> Option<i64> {
110 match self {
111 WindowState::EventCount { timestamps } => timestamps.front().copied(),
112 WindowState::ValueCount { entries } => entries.front().map(|(t, _)| *t),
113 WindowState::Temporal { rule_hits } => rule_hits
114 .values()
115 .filter_map(|ts| ts.front().copied())
116 .min(),
117 WindowState::NumericAgg { entries } => entries.front().map(|(t, _)| *t),
118 }
119 }
120
121 pub fn truncate_oldest(&mut self, cap: usize, preserve_front: bool) {
133 let cap = cap.max(1);
134
135 fn truncate_deque<T>(deque: &mut VecDeque<T>, cap: usize, preserve_front: bool) {
136 if deque.len() <= cap {
137 return;
138 }
139 if preserve_front {
140 let anchor = deque.pop_front().expect("len > cap >= 1");
141 let excess = deque.len() - (cap - 1);
143 deque.drain(..excess);
144 deque.push_front(anchor);
145 } else {
146 let excess = deque.len() - cap;
147 deque.drain(..excess);
148 }
149 }
150
151 match self {
152 WindowState::EventCount { timestamps } => {
153 truncate_deque(timestamps, cap, preserve_front);
154 }
155 WindowState::ValueCount { entries } => {
156 truncate_deque(entries, cap, preserve_front);
157 }
158 WindowState::Temporal { rule_hits } => {
159 for timestamps in rule_hits.values_mut() {
160 truncate_deque(timestamps, cap, preserve_front);
161 }
162 }
163 WindowState::NumericAgg { entries } => {
164 truncate_deque(entries, cap, preserve_front);
165 }
166 }
167 }
168
169 pub fn clear(&mut self) {
171 match self {
172 WindowState::EventCount { timestamps } => timestamps.clear(),
173 WindowState::ValueCount { entries } => entries.clear(),
174 WindowState::Temporal { rule_hits } => rule_hits.clear(),
175 WindowState::NumericAgg { entries } => entries.clear(),
176 }
177 }
178
179 pub fn push_event_count(&mut self, ts: i64) {
181 if let WindowState::EventCount { timestamps } = self {
182 timestamps.push_back(ts);
183 }
184 }
185
186 pub fn push_value_count(&mut self, ts: i64, value: String) {
188 if let WindowState::ValueCount { entries } = self {
189 entries.push_back((ts, value));
190 }
191 }
192
193 pub fn push_temporal(&mut self, ts: i64, rule_ref: &str) {
195 if let WindowState::Temporal { rule_hits } = self {
196 rule_hits
197 .entry(rule_ref.to_string())
198 .or_default()
199 .push_back(ts);
200 }
201 }
202
203 pub fn push_numeric(&mut self, ts: i64, value: f64) {
205 if let WindowState::NumericAgg { entries } = self {
206 entries.push_back((ts, value));
207 }
208 }
209
210 pub fn check_condition(
218 &self,
219 condition: &CompiledCondition,
220 corr_type: CorrelationType,
221 rule_refs: &[String],
222 extended_expr: Option<&ConditionExpr>,
223 ) -> Option<f64> {
224 let value = match (self, corr_type) {
225 (WindowState::EventCount { timestamps }, CorrelationType::EventCount) => {
226 timestamps.len() as f64
227 }
228 (WindowState::ValueCount { entries }, CorrelationType::ValueCount) => {
229 let distinct: HashSet<&String> = entries.iter().map(|(_, v)| v).collect();
231 distinct.len() as f64
232 }
233 (WindowState::Temporal { rule_hits }, CorrelationType::Temporal) => {
234 if let Some(expr) = extended_expr {
236 if eval_temporal_expr(expr, rule_hits) {
237 let fired: usize = rule_refs
239 .iter()
240 .filter(|r| rule_hits.get(r.as_str()).is_some_and(|ts| !ts.is_empty()))
241 .count();
242 return Some(fired as f64);
243 } else {
244 return None;
245 }
246 }
247 let fired: usize = rule_refs
249 .iter()
250 .filter(|r| rule_hits.get(r.as_str()).is_some_and(|ts| !ts.is_empty()))
251 .count();
252 fired as f64
253 }
254 (WindowState::Temporal { rule_hits }, CorrelationType::TemporalOrdered) => {
255 if let Some(expr) = extended_expr
257 && !eval_temporal_expr(expr, rule_hits)
258 {
259 return None;
260 }
261 if check_temporal_ordered(rule_refs, rule_hits) {
263 rule_refs.len() as f64
264 } else {
265 0.0
266 }
267 }
268 (WindowState::NumericAgg { entries }, CorrelationType::ValueSum) => {
269 entries.iter().map(|(_, v)| v).sum()
270 }
271 (WindowState::NumericAgg { entries }, CorrelationType::ValueAvg) => {
272 if entries.is_empty() {
273 0.0
274 } else {
275 let sum: f64 = entries.iter().map(|(_, v)| v).sum();
276 sum / entries.len() as f64
277 }
278 }
279 (WindowState::NumericAgg { entries }, CorrelationType::ValuePercentile) => {
280 if entries.is_empty() {
284 return None;
285 }
286 let mut values: Vec<f64> = entries
287 .iter()
288 .map(|(_, v)| *v)
289 .filter(|v| v.is_finite())
290 .collect();
291 if values.is_empty() {
292 return None;
293 }
294 values.sort_by(|a, b| a.total_cmp(b));
295 let percentile_rank = condition.percentile.map(|p| p as f64).unwrap_or(50.0);
296 let pval = percentile_linear_interp(&values, percentile_rank);
297 return Some(pval);
298 }
299 (WindowState::NumericAgg { entries }, CorrelationType::ValueMedian) => {
300 if entries.is_empty() {
304 return None;
305 }
306 let mut values: Vec<f64> = entries
307 .iter()
308 .map(|(_, v)| *v)
309 .filter(|v| v.is_finite())
310 .collect();
311 if values.is_empty() {
312 return None;
313 }
314 values.sort_by(|a, b| a.total_cmp(b));
315 let mid = values.len() / 2;
316 if values.len().is_multiple_of(2) && values.len() >= 2 {
317 (values[mid - 1] + values[mid]) / 2.0
318 } else {
319 values[mid]
320 }
321 }
322 _ => return None, };
324
325 if condition.check(value) {
326 Some(value)
327 } else {
328 None
329 }
330 }
331}
332
/// Returns the start of the `timespan`-aligned bucket containing `ts`.
///
/// Uses Euclidean division, so negative timestamps round toward negative infinity
/// (e.g. `-1` with a 60s span belongs to the bucket starting at `-60`).
/// Panics if `timespan` is 0.
fn bucket_start(ts: i64, timespan: i64) -> i64 {
    let bucket_index = ts.div_euclid(timespan);
    bucket_index * timespan
}
340
/// What [`apply_window_open`] did to a window before an incoming event.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum WindowDecision {
    /// The existing contents were kept (sliding mode may have evicted expired entries first).
    Extend,
    /// The window was cleared: a later tumbling bucket or a new session began.
    Reset,
    /// The event belongs to an earlier tumbling bucket than the newest stored entry; the
    /// window was left untouched.
    Discard,
}
354
355pub fn apply_window_open(
370 state: &mut WindowState,
371 ts: i64,
372 timespan_secs: u64,
373 window: WindowMode,
374 gap_secs: Option<u64>,
375) -> WindowDecision {
376 let timespan = timespan_secs as i64;
377 match window {
378 WindowMode::Sliding => {
379 state.evict(ts - timespan);
380 WindowDecision::Extend
381 }
382 WindowMode::Tumbling => {
383 if timespan <= 0 {
384 return WindowDecision::Extend;
385 }
386 match state.latest_timestamp() {
387 Some(last) if bucket_start(ts, timespan) > bucket_start(last, timespan) => {
388 state.clear();
389 WindowDecision::Reset
390 }
391 Some(last) if bucket_start(ts, timespan) < bucket_start(last, timespan) => {
392 WindowDecision::Discard
393 }
394 _ => WindowDecision::Extend,
395 }
396 }
397 WindowMode::Session => {
398 let gap = gap_secs.map(|g| g as i64).unwrap_or(timespan);
401 let reset = match (state.earliest_timestamp(), state.latest_timestamp()) {
402 (Some(start), Some(last)) => {
403 (ts - last) > gap || (timespan > 0 && (ts - start) > timespan)
404 }
405 _ => false,
406 };
407 if reset {
408 state.clear();
409 WindowDecision::Reset
410 } else {
411 WindowDecision::Extend
412 }
413 }
414 }
415}
416
/// Returns `true` when the rules in `rule_refs` fired in the listed order within the window.
///
/// The check succeeds if some hit `t_i` can be chosen for each rule such that
/// `t_0 <= t_1 <= ... <= t_{k-1}` (equal timestamps are allowed). An empty `rule_refs` is
/// trivially ordered. A rule that is missing or has no hits makes the check fail.
///
/// Greedy scan: for each rule in turn, pick its earliest hit that is not before the hit chosen
/// for the previous rule. Picking the earliest eligible hit never excludes a valid ordering,
/// because any later pick only tightens the constraint on the following rules. So this gives
/// the same answer as an exhaustive backtracking search, in time linear in the total number of
/// hits. Hits do not need to be stored in sorted order.
fn check_temporal_ordered(
    rule_refs: &[String],
    rule_hits: &HashMap<String, VecDeque<i64>>,
) -> bool {
    rule_refs
        .iter()
        .try_fold(i64::MIN, |prev, rule| {
            rule_hits
                .get(rule.as_str())?
                .iter()
                .copied()
                .filter(|&t| t >= prev)
                .min()
        })
        .is_some()
}
461
462pub(super) fn eval_temporal_expr(
468 expr: &ConditionExpr,
469 rule_hits: &HashMap<String, VecDeque<i64>>,
470) -> bool {
471 match expr {
472 ConditionExpr::Identifier(name) => rule_hits
473 .get(name.as_str())
474 .is_some_and(|ts| !ts.is_empty()),
475 ConditionExpr::And(children) => children.iter().all(|c| eval_temporal_expr(c, rule_hits)),
476 ConditionExpr::Or(children) => children.iter().any(|c| eval_temporal_expr(c, rule_hits)),
477 ConditionExpr::Not(child) => !eval_temporal_expr(child, rule_hits),
478 ConditionExpr::Selector { .. } => {
479 false
481 }
482 }
483}
484
485pub(super) fn percentile_linear_interp(values: &[f64], percentile: f64) -> f64 {
491 if values.is_empty() {
492 return 0.0;
493 }
494 let n = values.len();
495 if n == 1 {
496 return values[0];
497 }
498
499 let p = percentile.clamp(0.0, 100.0) / 100.0;
501
502 let rank = p * (n - 1) as f64;
505 let lower = rank.floor() as usize;
506 let upper = rank.ceil() as usize;
507 let fraction = rank - lower as f64;
508
509 if lower == upper || upper >= n {
510 values[lower.min(n - 1)]
511 } else {
512 values[lower] + fraction * (values[upper] - values[lower])
513 }
514}