1use std::collections::{HashMap, HashSet, VecDeque};
2
3use rsigma_parser::{ConditionExpr, CorrelationType, WindowMode};
4use serde::Serialize;
5
6use super::CompiledCondition;
7
8#[derive(Debug, Clone, Serialize, serde::Deserialize)]
16pub enum WindowState {
17 EventCount { timestamps: VecDeque<i64> },
19 ValueCount { entries: VecDeque<(i64, String)> },
21 Temporal {
23 rule_hits: HashMap<String, VecDeque<i64>>,
24 },
25 NumericAgg { entries: VecDeque<(i64, f64)> },
28}
29
30impl WindowState {
31 pub fn new_for(corr_type: CorrelationType) -> Self {
33 match corr_type {
34 CorrelationType::EventCount => WindowState::EventCount {
35 timestamps: VecDeque::new(),
36 },
37 CorrelationType::ValueCount => WindowState::ValueCount {
38 entries: VecDeque::new(),
39 },
40 CorrelationType::Temporal | CorrelationType::TemporalOrdered => WindowState::Temporal {
41 rule_hits: HashMap::new(),
42 },
43 CorrelationType::ValueSum
44 | CorrelationType::ValueAvg
45 | CorrelationType::ValuePercentile
46 | CorrelationType::ValueMedian => WindowState::NumericAgg {
47 entries: VecDeque::new(),
48 },
49 }
50 }
51
52 pub fn evict(&mut self, cutoff: i64) {
54 match self {
55 WindowState::EventCount { timestamps } => {
56 while timestamps.front().is_some_and(|&t| t < cutoff) {
57 timestamps.pop_front();
58 }
59 }
60 WindowState::ValueCount { entries } => {
61 while entries.front().is_some_and(|(t, _)| *t < cutoff) {
62 entries.pop_front();
63 }
64 }
65 WindowState::Temporal { rule_hits } => {
66 for timestamps in rule_hits.values_mut() {
67 while timestamps.front().is_some_and(|&t| t < cutoff) {
68 timestamps.pop_front();
69 }
70 }
71 rule_hits.retain(|_, ts| !ts.is_empty());
73 }
74 WindowState::NumericAgg { entries } => {
75 while entries.front().is_some_and(|(t, _)| *t < cutoff) {
76 entries.pop_front();
77 }
78 }
79 }
80 }
81
82 pub fn is_empty(&self) -> bool {
84 match self {
85 WindowState::EventCount { timestamps } => timestamps.is_empty(),
86 WindowState::ValueCount { entries } => entries.is_empty(),
87 WindowState::Temporal { rule_hits } => rule_hits.is_empty(),
88 WindowState::NumericAgg { entries } => entries.is_empty(),
89 }
90 }
91
92 pub fn latest_timestamp(&self) -> Option<i64> {
94 match self {
95 WindowState::EventCount { timestamps } => timestamps.back().copied(),
96 WindowState::ValueCount { entries } => entries.back().map(|(t, _)| *t),
97 WindowState::Temporal { rule_hits } => {
98 rule_hits.values().filter_map(|ts| ts.back().copied()).max()
99 }
100 WindowState::NumericAgg { entries } => entries.back().map(|(t, _)| *t),
101 }
102 }
103
104 pub fn earliest_timestamp(&self) -> Option<i64> {
110 match self {
111 WindowState::EventCount { timestamps } => timestamps.front().copied(),
112 WindowState::ValueCount { entries } => entries.front().map(|(t, _)| *t),
113 WindowState::Temporal { rule_hits } => rule_hits
114 .values()
115 .filter_map(|ts| ts.front().copied())
116 .min(),
117 WindowState::NumericAgg { entries } => entries.front().map(|(t, _)| *t),
118 }
119 }
120
121 pub fn truncate_oldest(&mut self, cap: usize, preserve_front: bool) {
133 let cap = cap.max(1);
134
135 fn truncate_deque<T>(deque: &mut VecDeque<T>, cap: usize, preserve_front: bool) {
136 if deque.len() <= cap {
137 return;
138 }
139 if preserve_front {
140 let anchor = deque.pop_front().expect("len > cap >= 1");
141 let excess = deque.len() - (cap - 1);
143 deque.drain(..excess);
144 deque.push_front(anchor);
145 } else {
146 let excess = deque.len() - cap;
147 deque.drain(..excess);
148 }
149 }
150
151 match self {
152 WindowState::EventCount { timestamps } => {
153 truncate_deque(timestamps, cap, preserve_front);
154 }
155 WindowState::ValueCount { entries } => {
156 truncate_deque(entries, cap, preserve_front);
157 }
158 WindowState::Temporal { rule_hits } => {
159 for timestamps in rule_hits.values_mut() {
160 truncate_deque(timestamps, cap, preserve_front);
161 }
162 }
163 WindowState::NumericAgg { entries } => {
164 truncate_deque(entries, cap, preserve_front);
165 }
166 }
167 }
168
169 pub fn clear(&mut self) {
171 match self {
172 WindowState::EventCount { timestamps } => timestamps.clear(),
173 WindowState::ValueCount { entries } => entries.clear(),
174 WindowState::Temporal { rule_hits } => rule_hits.clear(),
175 WindowState::NumericAgg { entries } => entries.clear(),
176 }
177 }
178
179 pub fn push_event_count(&mut self, ts: i64) {
181 if let WindowState::EventCount { timestamps } = self {
182 timestamps.push_back(ts);
183 }
184 }
185
186 pub fn push_value_count(&mut self, ts: i64, value: String) {
188 if let WindowState::ValueCount { entries } = self {
189 entries.push_back((ts, value));
190 }
191 }
192
193 pub fn push_temporal(&mut self, ts: i64, rule_ref: &str) {
195 if let WindowState::Temporal { rule_hits } = self {
196 rule_hits
197 .entry(rule_ref.to_string())
198 .or_default()
199 .push_back(ts);
200 }
201 }
202
203 pub fn push_numeric(&mut self, ts: i64, value: f64) {
205 if let WindowState::NumericAgg { entries } = self {
206 entries.push_back((ts, value));
207 }
208 }
209
210 pub fn check_condition(
218 &self,
219 condition: &CompiledCondition,
220 corr_type: CorrelationType,
221 rule_refs: &[String],
222 extended_expr: Option<&ConditionExpr>,
223 ) -> Option<f64> {
224 let value = match (self, corr_type) {
225 (WindowState::EventCount { timestamps }, CorrelationType::EventCount) => {
226 timestamps.len() as f64
227 }
228 (WindowState::ValueCount { entries }, CorrelationType::ValueCount) => {
229 let distinct: HashSet<&String> = entries.iter().map(|(_, v)| v).collect();
231 distinct.len() as f64
232 }
233 (WindowState::Temporal { rule_hits }, CorrelationType::Temporal) => {
234 if let Some(expr) = extended_expr {
236 if eval_temporal_expr(expr, rule_hits) {
237 let fired: usize = rule_refs
239 .iter()
240 .filter(|r| rule_hits.get(r.as_str()).is_some_and(|ts| !ts.is_empty()))
241 .count();
242 return Some(fired as f64);
243 } else {
244 return None;
245 }
246 }
247 let fired: usize = rule_refs
249 .iter()
250 .filter(|r| rule_hits.get(r.as_str()).is_some_and(|ts| !ts.is_empty()))
251 .count();
252 fired as f64
253 }
254 (WindowState::Temporal { rule_hits }, CorrelationType::TemporalOrdered) => {
255 if let Some(expr) = extended_expr
257 && !eval_temporal_expr(expr, rule_hits)
258 {
259 return None;
260 }
261 if check_temporal_ordered(rule_refs, rule_hits) {
263 rule_refs.len() as f64
264 } else {
265 0.0
266 }
267 }
268 (WindowState::NumericAgg { entries }, CorrelationType::ValueSum) => {
269 entries.iter().map(|(_, v)| v).sum()
270 }
271 (WindowState::NumericAgg { entries }, CorrelationType::ValueAvg) => {
272 if entries.is_empty() {
273 0.0
274 } else {
275 let sum: f64 = entries.iter().map(|(_, v)| v).sum();
276 sum / entries.len() as f64
277 }
278 }
279 (WindowState::NumericAgg { entries }, CorrelationType::ValuePercentile) => {
280 if entries.is_empty() {
284 return None;
285 }
286 let mut values: Vec<f64> = entries
287 .iter()
288 .map(|(_, v)| *v)
289 .filter(|v| v.is_finite())
290 .collect();
291 if values.is_empty() {
292 return None;
293 }
294 values.sort_by(|a, b| a.total_cmp(b));
295 let percentile_rank = condition.percentile.map(|p| p as f64).unwrap_or(50.0);
296 let pval = percentile_linear_interp(&values, percentile_rank);
297 return Some(pval);
298 }
299 (WindowState::NumericAgg { entries }, CorrelationType::ValueMedian) => {
300 if entries.is_empty() {
304 return None;
305 }
306 let mut values: Vec<f64> = entries
307 .iter()
308 .map(|(_, v)| *v)
309 .filter(|v| v.is_finite())
310 .collect();
311 if values.is_empty() {
312 return None;
313 }
314 values.sort_by(|a, b| a.total_cmp(b));
315 let mid = values.len() / 2;
316 if values.len().is_multiple_of(2) && values.len() >= 2 {
317 (values[mid - 1] + values[mid]) / 2.0
318 } else {
319 values[mid]
320 }
321 }
322 _ => return None, };
324
325 if condition.check(value) {
326 Some(value)
327 } else {
328 None
329 }
330 }
331}
332
333impl WindowState {
334 pub fn entry_count(&self) -> usize {
337 match self {
338 WindowState::EventCount { timestamps } => timestamps.len(),
339 WindowState::ValueCount { entries } => entries.len(),
340 WindowState::Temporal { rule_hits } => rule_hits.values().map(VecDeque::len).sum(),
341 WindowState::NumericAgg { entries } => entries.len(),
342 }
343 }
344
345 pub fn current_value(
354 &self,
355 corr_type: CorrelationType,
356 rule_refs: &[String],
357 percentile: Option<u64>,
358 ) -> Option<f64> {
359 match (self, corr_type) {
360 (WindowState::EventCount { timestamps }, CorrelationType::EventCount) => {
361 Some(timestamps.len() as f64)
362 }
363 (WindowState::ValueCount { entries }, CorrelationType::ValueCount) => {
364 let distinct: HashSet<&String> = entries.iter().map(|(_, v)| v).collect();
365 Some(distinct.len() as f64)
366 }
367 (
368 WindowState::Temporal { rule_hits },
369 CorrelationType::Temporal | CorrelationType::TemporalOrdered,
370 ) => {
371 let fired = rule_refs
372 .iter()
373 .filter(|r| rule_hits.get(r.as_str()).is_some_and(|ts| !ts.is_empty()))
374 .count();
375 Some(fired as f64)
376 }
377 (WindowState::NumericAgg { entries }, CorrelationType::ValueSum) => {
378 Some(entries.iter().map(|(_, v)| v).sum())
379 }
380 (WindowState::NumericAgg { entries }, CorrelationType::ValueAvg) => {
381 if entries.is_empty() {
382 Some(0.0)
383 } else {
384 let sum: f64 = entries.iter().map(|(_, v)| v).sum();
385 Some(sum / entries.len() as f64)
386 }
387 }
388 (WindowState::NumericAgg { entries }, CorrelationType::ValuePercentile) => {
389 let mut values: Vec<f64> = entries
390 .iter()
391 .map(|(_, v)| *v)
392 .filter(|v| v.is_finite())
393 .collect();
394 if values.is_empty() {
395 return None;
396 }
397 values.sort_by(|a, b| a.total_cmp(b));
398 let rank = percentile.map(|p| p as f64).unwrap_or(50.0);
399 Some(percentile_linear_interp(&values, rank))
400 }
401 (WindowState::NumericAgg { entries }, CorrelationType::ValueMedian) => {
402 let mut values: Vec<f64> = entries
403 .iter()
404 .map(|(_, v)| *v)
405 .filter(|v| v.is_finite())
406 .collect();
407 if values.is_empty() {
408 return None;
409 }
410 values.sort_by(|a, b| a.total_cmp(b));
411 let mid = values.len() / 2;
412 let v = if values.len().is_multiple_of(2) && values.len() >= 2 {
413 (values[mid - 1] + values[mid]) / 2.0
414 } else {
415 values[mid]
416 };
417 Some(v)
418 }
419 _ => None,
420 }
421 }
422}
423
/// Start of the tumbling bucket of width `timespan` that contains `ts`.
///
/// Buckets are aligned to multiples of `timespan`. Euclidean division makes negative
/// timestamps round down rather than toward zero. Panics if `timespan` is zero.
fn bucket_start(ts: i64, timespan: i64) -> i64 {
    let bucket_index = ts.div_euclid(timespan);
    bucket_index * timespan
}
431
/// What [`apply_window_open`] decided for an incoming event.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WindowDecision {
    /// The current window is kept. Sliding windows may have evicted expired entries.
    Extend,
    /// The window was cleared because the event opens a new tumbling bucket or session.
    Reset,
    /// The event falls into a tumbling bucket older than the current one.
    Discard,
}
445
446pub fn apply_window_open(
461 state: &mut WindowState,
462 ts: i64,
463 timespan_secs: u64,
464 window: WindowMode,
465 gap_secs: Option<u64>,
466) -> WindowDecision {
467 let timespan = timespan_secs as i64;
468 match window {
469 WindowMode::Sliding => {
470 state.evict(ts - timespan);
471 WindowDecision::Extend
472 }
473 WindowMode::Tumbling => {
474 if timespan <= 0 {
475 return WindowDecision::Extend;
476 }
477 match state.latest_timestamp() {
478 Some(last) if bucket_start(ts, timespan) > bucket_start(last, timespan) => {
479 state.clear();
480 WindowDecision::Reset
481 }
482 Some(last) if bucket_start(ts, timespan) < bucket_start(last, timespan) => {
483 WindowDecision::Discard
484 }
485 _ => WindowDecision::Extend,
486 }
487 }
488 WindowMode::Session => {
489 let gap = gap_secs.map(|g| g as i64).unwrap_or(timespan);
492 let reset = match (state.earliest_timestamp(), state.latest_timestamp()) {
493 (Some(start), Some(last)) => {
494 (ts - last) > gap || (timespan > 0 && (ts - start) > timespan)
495 }
496 _ => false,
497 };
498 if reset {
499 state.clear();
500 WindowDecision::Reset
501 } else {
502 WindowDecision::Extend
503 }
504 }
505 }
506}
507
/// Returns `true` if the rules in `rule_refs` fired in order.
///
/// Firing in order means there are hit timestamps `t0 <= t1 <= ... <= tk` such that `ti`
/// is a hit of `rule_refs[i]`. An empty `rule_refs` is trivially ordered. A rule that is
/// missing from `rule_hits`, or has no hits, makes the check fail. Hit queues do not need
/// to be sorted.
///
/// Uses a greedy chain: for each rule in turn, take its earliest hit that is not before
/// the hit chosen for the previous rule. A smaller chosen timestamp never removes options
/// for later rules, so this finds an ordered chain exactly when one exists. It runs in
/// O(total hits) time, whereas exhaustive backtracking is exponential in the number of
/// rules.
fn check_temporal_ordered(
    rule_refs: &[String],
    rule_hits: &HashMap<String, VecDeque<i64>>,
) -> bool {
    let mut floor = i64::MIN;
    for r in rule_refs {
        let next = rule_hits
            .get(r.as_str())
            .and_then(|ts| ts.iter().copied().filter(|&t| t >= floor).min());
        match next {
            Some(t) => floor = t,
            None => return false,
        }
    }
    true
}
552
553pub(super) fn eval_temporal_expr(
559 expr: &ConditionExpr,
560 rule_hits: &HashMap<String, VecDeque<i64>>,
561) -> bool {
562 match expr {
563 ConditionExpr::Identifier(name) => rule_hits
564 .get(name.as_str())
565 .is_some_and(|ts| !ts.is_empty()),
566 ConditionExpr::And(children) => children.iter().all(|c| eval_temporal_expr(c, rule_hits)),
567 ConditionExpr::Or(children) => children.iter().any(|c| eval_temporal_expr(c, rule_hits)),
568 ConditionExpr::Not(child) => !eval_temporal_expr(child, rule_hits),
569 ConditionExpr::Selector { .. } => {
570 false
572 }
573 }
574}
575
576pub(super) fn percentile_linear_interp(values: &[f64], percentile: f64) -> f64 {
582 if values.is_empty() {
583 return 0.0;
584 }
585 let n = values.len();
586 if n == 1 {
587 return values[0];
588 }
589
590 let p = percentile.clamp(0.0, 100.0) / 100.0;
592
593 let rank = p * (n - 1) as f64;
596 let lower = rank.floor() as usize;
597 let upper = rank.ceil() as usize;
598 let fraction = rank - lower as f64;
599
600 if lower == upper || upper >= n {
601 values[lower.min(n - 1)]
602 } else {
603 values[lower] + fraction * (values[upper] - values[lower])
604 }
605}