1use std::collections::{HashMap, HashSet, VecDeque};
7use std::io::{Read as IoRead, Write as IoWrite};
8use std::sync::Arc;
9
10use flate2::Compression;
11use flate2::read::DeflateDecoder;
12use flate2::write::DeflateEncoder;
13use serde::Serialize;
14
15use rsigma_parser::{
16 ConditionExpr, ConditionOperator, CorrelationCondition, CorrelationRule, CorrelationType,
17 FieldAlias, Level,
18};
19
20use crate::error::{EvalError, Result};
21use crate::event::{Event, EventValue};
22
/// A correlation rule in the form produced by [`compile_correlation`].
///
/// Most fields are copied straight from the parsed `CorrelationRule`; the
/// `rsigma.*` options are read from the rule's custom attributes.
#[derive(Debug, Clone)]
pub struct CompiledCorrelation {
    /// Rule id, copied from the parsed rule.
    pub id: Option<String>,
    /// Rule name, copied from the parsed rule.
    pub name: Option<String>,
    /// Rule title, copied from the parsed rule.
    pub title: String,
    /// Rule level, copied from the parsed rule.
    pub level: Option<Level>,
    /// Rule tags, copied from the parsed rule.
    pub tags: Vec<String>,
    /// Kind of correlation; selects the window state and how the condition is evaluated.
    pub correlation_type: CorrelationType,
    /// References to the rules this correlation is built on (the parsed rule's `rules` list).
    pub rule_refs: Vec<String>,
    /// Group-by fields; a name with a matching alias definition becomes `GroupByField::Aliased`.
    pub group_by: Vec<GroupByField>,
    /// Correlation timespan in seconds (the parsed timespan's `seconds`).
    pub timespan_secs: u64,
    /// Compiled threshold predicates.
    pub condition: CompiledCondition,
    /// The boolean expression of an extended condition; `None` for threshold conditions.
    pub extended_expr: Option<ConditionExpr>,
    /// Copied from the parsed rule's `generate` flag.
    // NOTE(review): presumably controls whether the referenced rules still emit
    // their own detections; confirm against the engine.
    pub generate: bool,
    /// Value of the `rsigma.suppress` custom attribute parsed as a timespan, in seconds.
    /// `None` when the attribute is absent, not a string, or fails to parse.
    pub suppress_secs: Option<u64>,
    /// Value of the `rsigma.action` custom attribute, when it is a string that parses.
    pub action: Option<crate::correlation_engine::CorrelationAction>,
    /// Value of the `rsigma.correlation_event_mode` custom attribute, when it is a string that parses.
    pub event_mode: Option<crate::correlation_engine::CorrelationEventMode>,
    /// Value of the `rsigma.max_correlation_events` custom attribute, when it is a
    /// string that parses as `usize`.
    pub max_events: Option<usize>,
    /// All custom attributes of the rule converted to JSON values, shared behind an `Arc`.
    pub custom_attributes: Arc<HashMap<String, serde_json::Value>>,
}
66
/// A field used to partition correlation state into groups.
#[derive(Debug, Clone)]
pub enum GroupByField {
    /// The same event field name is used regardless of which rule matched.
    Direct(String),
    /// The field is addressed through an alias whose concrete event field
    /// depends on the rule that matched.
    Aliased {
        /// Alias name as written in the group-by list.
        alias: String,
        /// Rule reference -> event field name to read for that rule.
        mapping: HashMap<String, String>,
    },
}

impl GroupByField {
    /// Returns the name the field goes by in the group-by list: the field
    /// name itself for `Direct`, the alias for `Aliased`.
    pub fn name(&self) -> &str {
        match self {
            GroupByField::Direct(label) | GroupByField::Aliased { alias: label, .. } => label,
        }
    }

    /// Returns the event field name to read for an event matched by `rule_refs`.
    ///
    /// For an aliased field, the first entry of `rule_refs` that has a mapping
    /// decides the result; when none has one, the alias itself is returned.
    pub fn resolve(&self, rule_refs: &[&str]) -> &str {
        let (alias, mapping) = match self {
            GroupByField::Direct(field) => return field,
            GroupByField::Aliased { alias, mapping } => (alias, mapping),
        };
        rule_refs
            .iter()
            .find_map(|rule| mapping.get(*rule))
            .map_or(alias.as_str(), String::as_str)
    }
}
107
108#[derive(Debug, Clone)]
110pub struct CompiledCondition {
111 pub field: Option<String>,
113 pub predicates: Vec<(ConditionOperator, f64)>,
115}
116
117impl CompiledCondition {
118 pub fn check(&self, value: f64) -> bool {
120 self.predicates.iter().all(|(op, threshold)| match op {
121 ConditionOperator::Lt => value < *threshold,
122 ConditionOperator::Lte => value <= *threshold,
123 ConditionOperator::Gt => value > *threshold,
124 ConditionOperator::Gte => value >= *threshold,
125 ConditionOperator::Eq => (value - *threshold).abs() < f64::EPSILON,
126 ConditionOperator::Neq => (value - *threshold).abs() >= f64::EPSILON,
127 })
128 }
129}
130
131#[derive(Debug, Clone, Hash, Eq, PartialEq, Serialize, serde::Deserialize)]
140pub struct GroupKey(pub Vec<Option<String>>);
141
142impl GroupKey {
143 pub fn extract(event: &impl Event, group_by: &[GroupByField], rule_refs: &[&str]) -> Self {
146 let values = group_by
147 .iter()
148 .map(|field| {
149 let field_name = field.resolve(rule_refs);
150 event
151 .get_field(field_name)
152 .and_then(|v| value_to_string(&v))
153 })
154 .collect();
155 GroupKey(values)
156 }
157
158 pub fn from_pairs(pairs: &[(String, String)], group_by: &[GroupByField]) -> Self {
160 let values = group_by
161 .iter()
162 .map(|field| {
163 let name = field.name();
164 pairs
165 .iter()
166 .find(|(k, _)| k == name)
167 .map(|(_, v)| v.clone())
168 })
169 .collect();
170 GroupKey(values)
171 }
172
173 pub fn to_pairs(&self, group_by: &[GroupByField]) -> Vec<(String, String)> {
175 group_by
176 .iter()
177 .zip(self.0.iter())
178 .filter_map(|(field, value)| {
179 value
180 .as_ref()
181 .map(|v| (field.name().to_string(), v.clone()))
182 })
183 .collect()
184 }
185}
186
187fn value_to_string(v: &EventValue) -> Option<String> {
189 match v {
190 EventValue::Str(s) => Some(s.to_string()),
191 EventValue::Int(n) => Some(n.to_string()),
192 EventValue::Float(f) => Some(f.to_string()),
193 EventValue::Bool(b) => Some(b.to_string()),
194 _ => None,
195 }
196}
197
/// Deflate level used by `compress_event`: flate2's `fast` preset.
const COMPRESSION_LEVEL: Compression = Compression::fast();
205
/// Bounded FIFO of events stored as deflate-compressed JSON.
#[derive(Debug, Clone, Serialize, serde::Deserialize)]
pub struct EventBuffer {
    /// `(timestamp, compressed JSON bytes)` pairs, oldest first.
    /// Serialized through `event_buffer_serde`, which writes the bytes as base64 text.
    #[serde(with = "event_buffer_serde")]
    entries: VecDeque<(i64, Vec<u8>)>,
    /// Capacity limit consulted by `push` before a new entry is appended.
    max_events: usize,
}
229
230mod event_buffer_serde {
233 use serde::{Deserialize, Deserializer, Serialize, Serializer};
234 use std::collections::VecDeque;
235
236 #[derive(Serialize, Deserialize)]
237 struct Entry {
238 ts: i64,
239 #[serde(with = "base64_bytes")]
240 data: Vec<u8>,
241 }
242
243 mod base64_bytes {
244 use base64::Engine as _;
245 use base64::engine::general_purpose::STANDARD;
246 use serde::{Deserializer, Serializer};
247
248 pub fn serialize<S: Serializer>(bytes: &Vec<u8>, s: S) -> Result<S::Ok, S::Error> {
249 s.serialize_str(&STANDARD.encode(bytes))
250 }
251
252 pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<Vec<u8>, D::Error> {
253 let s: String = serde::Deserialize::deserialize(d)?;
254 STANDARD.decode(s).map_err(serde::de::Error::custom)
255 }
256 }
257
258 pub fn serialize<S: Serializer>(
259 entries: &VecDeque<(i64, Vec<u8>)>,
260 s: S,
261 ) -> Result<S::Ok, S::Error> {
262 let v: Vec<Entry> = entries
263 .iter()
264 .map(|(ts, data)| Entry {
265 ts: *ts,
266 data: data.clone(),
267 })
268 .collect();
269 v.serialize(s)
270 }
271
272 pub fn deserialize<'de, D: Deserializer<'de>>(
273 d: D,
274 ) -> Result<VecDeque<(i64, Vec<u8>)>, D::Error> {
275 let v: Vec<Entry> = Vec::deserialize(d)?;
276 Ok(v.into_iter().map(|e| (e.ts, e.data)).collect())
277 }
278}
279
280impl EventBuffer {
281 pub fn new(max_events: usize) -> Self {
283 EventBuffer {
284 entries: VecDeque::with_capacity(max_events.min(64)),
285 max_events,
286 }
287 }
288
289 pub fn push(&mut self, ts: i64, event: &serde_json::Value) {
291 if let Some(compressed) = compress_event(event) {
293 if self.entries.len() >= self.max_events {
294 self.entries.pop_front();
295 }
296 self.entries.push_back((ts, compressed));
297 }
298 }
299
300 pub fn evict(&mut self, cutoff: i64) {
302 while self.entries.front().is_some_and(|(t, _)| *t < cutoff) {
303 self.entries.pop_front();
304 }
305 }
306
307 pub fn decompress_all(&self) -> Vec<serde_json::Value> {
309 self.entries
310 .iter()
311 .filter_map(|(_, compressed)| decompress_event(compressed))
312 .collect()
313 }
314
315 pub fn is_empty(&self) -> bool {
317 self.entries.is_empty()
318 }
319
320 pub fn clear(&mut self) {
322 self.entries.clear();
323 }
324
325 pub fn compressed_bytes(&self) -> usize {
327 self.entries.iter().map(|(_, data)| data.len()).sum()
328 }
329
330 pub fn len(&self) -> usize {
332 self.entries.len()
333 }
334}
335
336fn compress_event(event: &serde_json::Value) -> Option<Vec<u8>> {
338 let json_bytes = serde_json::to_vec(event).ok()?;
339 let mut encoder = DeflateEncoder::new(Vec::new(), COMPRESSION_LEVEL);
340 encoder.write_all(&json_bytes).ok()?;
341 encoder.finish().ok()
342}
343
344fn decompress_event(compressed: &[u8]) -> Option<serde_json::Value> {
346 let mut decoder = DeflateDecoder::new(compressed);
347 let mut json_bytes = Vec::new();
348 decoder.read_to_end(&mut json_bytes).ok()?;
349 serde_json::from_slice(&json_bytes).ok()
350}
351
/// Lightweight reference to an event: its timestamp and, when known, its id.
#[derive(Debug, Clone, Serialize, serde::Deserialize)]
pub struct EventRef {
    /// Timestamp the event was recorded with.
    pub timestamp: i64,
    /// Event id, when one could be extracted; left out of serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub id: Option<String>,
}
370
371#[derive(Debug, Clone, Serialize, serde::Deserialize)]
376pub struct EventRefBuffer {
377 entries: VecDeque<EventRef>,
379 max_events: usize,
381}
382
383impl EventRefBuffer {
384 pub fn new(max_events: usize) -> Self {
386 EventRefBuffer {
387 entries: VecDeque::with_capacity(max_events.min(64)),
388 max_events,
389 }
390 }
391
392 pub fn push(&mut self, ts: i64, event: &serde_json::Value) {
394 if self.entries.len() >= self.max_events {
395 self.entries.pop_front();
396 }
397 let id = extract_event_id(event);
398 self.entries.push_back(EventRef { timestamp: ts, id });
399 }
400
401 pub fn evict(&mut self, cutoff: i64) {
403 while self.entries.front().is_some_and(|r| r.timestamp < cutoff) {
404 self.entries.pop_front();
405 }
406 }
407
408 pub fn refs(&self) -> Vec<EventRef> {
410 self.entries.iter().cloned().collect()
411 }
412
413 pub fn is_empty(&self) -> bool {
415 self.entries.is_empty()
416 }
417
418 pub fn clear(&mut self) {
420 self.entries.clear();
421 }
422
423 pub fn len(&self) -> usize {
425 self.entries.len()
426 }
427}
428
429fn extract_event_id(event: &serde_json::Value) -> Option<String> {
434 const ID_FIELDS: &[&str] = &["id", "_id", "event_id", "EventRecordID", "event.id"];
435 for field in ID_FIELDS {
436 if let Some(val) = event.get(field) {
437 return match val {
438 serde_json::Value::String(s) => Some(s.clone()),
439 serde_json::Value::Number(n) => Some(n.to_string()),
440 _ => None,
441 };
442 }
443 }
444 None
445}
446
447#[derive(Debug, Clone, Serialize, serde::Deserialize)]
455pub enum WindowState {
456 EventCount { timestamps: VecDeque<i64> },
458 ValueCount { entries: VecDeque<(i64, String)> },
460 Temporal {
462 rule_hits: HashMap<String, VecDeque<i64>>,
463 },
464 NumericAgg { entries: VecDeque<(i64, f64)> },
467}
468
469impl WindowState {
470 pub fn new_for(corr_type: CorrelationType) -> Self {
472 match corr_type {
473 CorrelationType::EventCount => WindowState::EventCount {
474 timestamps: VecDeque::new(),
475 },
476 CorrelationType::ValueCount => WindowState::ValueCount {
477 entries: VecDeque::new(),
478 },
479 CorrelationType::Temporal | CorrelationType::TemporalOrdered => WindowState::Temporal {
480 rule_hits: HashMap::new(),
481 },
482 CorrelationType::ValueSum
483 | CorrelationType::ValueAvg
484 | CorrelationType::ValuePercentile
485 | CorrelationType::ValueMedian => WindowState::NumericAgg {
486 entries: VecDeque::new(),
487 },
488 }
489 }
490
491 pub fn evict(&mut self, cutoff: i64) {
493 match self {
494 WindowState::EventCount { timestamps } => {
495 while timestamps.front().is_some_and(|&t| t < cutoff) {
496 timestamps.pop_front();
497 }
498 }
499 WindowState::ValueCount { entries } => {
500 while entries.front().is_some_and(|(t, _)| *t < cutoff) {
501 entries.pop_front();
502 }
503 }
504 WindowState::Temporal { rule_hits } => {
505 for timestamps in rule_hits.values_mut() {
506 while timestamps.front().is_some_and(|&t| t < cutoff) {
507 timestamps.pop_front();
508 }
509 }
510 rule_hits.retain(|_, ts| !ts.is_empty());
512 }
513 WindowState::NumericAgg { entries } => {
514 while entries.front().is_some_and(|(t, _)| *t < cutoff) {
515 entries.pop_front();
516 }
517 }
518 }
519 }
520
521 pub fn is_empty(&self) -> bool {
523 match self {
524 WindowState::EventCount { timestamps } => timestamps.is_empty(),
525 WindowState::ValueCount { entries } => entries.is_empty(),
526 WindowState::Temporal { rule_hits } => rule_hits.is_empty(),
527 WindowState::NumericAgg { entries } => entries.is_empty(),
528 }
529 }
530
531 pub fn latest_timestamp(&self) -> Option<i64> {
533 match self {
534 WindowState::EventCount { timestamps } => timestamps.back().copied(),
535 WindowState::ValueCount { entries } => entries.back().map(|(t, _)| *t),
536 WindowState::Temporal { rule_hits } => {
537 rule_hits.values().filter_map(|ts| ts.back().copied()).max()
538 }
539 WindowState::NumericAgg { entries } => entries.back().map(|(t, _)| *t),
540 }
541 }
542
543 pub fn clear(&mut self) {
545 match self {
546 WindowState::EventCount { timestamps } => timestamps.clear(),
547 WindowState::ValueCount { entries } => entries.clear(),
548 WindowState::Temporal { rule_hits } => rule_hits.clear(),
549 WindowState::NumericAgg { entries } => entries.clear(),
550 }
551 }
552
553 pub fn push_event_count(&mut self, ts: i64) {
555 if let WindowState::EventCount { timestamps } = self {
556 timestamps.push_back(ts);
557 }
558 }
559
560 pub fn push_value_count(&mut self, ts: i64, value: String) {
562 if let WindowState::ValueCount { entries } = self {
563 entries.push_back((ts, value));
564 }
565 }
566
567 pub fn push_temporal(&mut self, ts: i64, rule_ref: &str) {
569 if let WindowState::Temporal { rule_hits } = self {
570 rule_hits
571 .entry(rule_ref.to_string())
572 .or_default()
573 .push_back(ts);
574 }
575 }
576
577 pub fn push_numeric(&mut self, ts: i64, value: f64) {
579 if let WindowState::NumericAgg { entries } = self {
580 entries.push_back((ts, value));
581 }
582 }
583
584 pub fn check_condition(
592 &self,
593 condition: &CompiledCondition,
594 corr_type: CorrelationType,
595 rule_refs: &[String],
596 extended_expr: Option<&ConditionExpr>,
597 ) -> Option<f64> {
598 let value = match (self, corr_type) {
599 (WindowState::EventCount { timestamps }, CorrelationType::EventCount) => {
600 timestamps.len() as f64
601 }
602 (WindowState::ValueCount { entries }, CorrelationType::ValueCount) => {
603 let distinct: HashSet<&String> = entries.iter().map(|(_, v)| v).collect();
605 distinct.len() as f64
606 }
607 (WindowState::Temporal { rule_hits }, CorrelationType::Temporal) => {
608 if let Some(expr) = extended_expr {
610 if eval_temporal_expr(expr, rule_hits) {
611 let fired: usize = rule_refs
613 .iter()
614 .filter(|r| rule_hits.get(r.as_str()).is_some_and(|ts| !ts.is_empty()))
615 .count();
616 return Some(fired as f64);
617 } else {
618 return None;
619 }
620 }
621 let fired: usize = rule_refs
623 .iter()
624 .filter(|r| rule_hits.get(r.as_str()).is_some_and(|ts| !ts.is_empty()))
625 .count();
626 fired as f64
627 }
628 (WindowState::Temporal { rule_hits }, CorrelationType::TemporalOrdered) => {
629 if let Some(expr) = extended_expr
631 && !eval_temporal_expr(expr, rule_hits)
632 {
633 return None;
634 }
635 if check_temporal_ordered(rule_refs, rule_hits) {
637 rule_refs.len() as f64
638 } else {
639 0.0
640 }
641 }
642 (WindowState::NumericAgg { entries }, CorrelationType::ValueSum) => {
643 entries.iter().map(|(_, v)| v).sum()
644 }
645 (WindowState::NumericAgg { entries }, CorrelationType::ValueAvg) => {
646 if entries.is_empty() {
647 0.0
648 } else {
649 let sum: f64 = entries.iter().map(|(_, v)| v).sum();
650 sum / entries.len() as f64
651 }
652 }
653 (WindowState::NumericAgg { entries }, CorrelationType::ValuePercentile) => {
654 if entries.is_empty() {
658 return None;
659 }
660 let mut values: Vec<f64> = entries
661 .iter()
662 .map(|(_, v)| *v)
663 .filter(|v| v.is_finite())
664 .collect();
665 if values.is_empty() {
666 return None;
667 }
668 values.sort_by(|a, b| a.partial_cmp(b).expect("NaN filtered"));
669 let percentile_rank = condition
671 .predicates
672 .first()
673 .map(|(_, threshold)| *threshold)
674 .unwrap_or(50.0);
675 let pval = percentile_linear_interp(&values, percentile_rank);
676 return Some(pval);
677 }
678 (WindowState::NumericAgg { entries }, CorrelationType::ValueMedian) => {
679 if entries.is_empty() {
680 0.0
681 } else {
682 let mut values: Vec<f64> = entries
683 .iter()
684 .map(|(_, v)| *v)
685 .filter(|v| v.is_finite())
686 .collect();
687 if values.is_empty() {
688 return None;
689 }
690 values.sort_by(|a, b| a.partial_cmp(b).expect("NaN filtered"));
691 let mid = values.len() / 2;
692 if values.len().is_multiple_of(2) && values.len() >= 2 {
693 (values[mid - 1] + values[mid]) / 2.0
694 } else {
695 values[mid]
696 }
697 }
698 }
699 _ => return None, };
701
702 if condition.check(value) {
703 Some(value)
704 } else {
705 None
706 }
707 }
708}
709
/// Returns `true` when the rules in `rule_refs` each have a hit such that the
/// chosen timestamps are non-decreasing in `rule_refs` order.
///
/// An empty `rule_refs` is trivially ordered. A rule with no recorded hits
/// makes the result `false`. Equal timestamps count as ordered.
///
/// The scan is greedy: for every rule in turn it takes the earliest hit that
/// is not before the hit chosen for the previous rule. Taking the earliest
/// feasible hit never rules out a later choice, so this accepts exactly the
/// inputs an exhaustive search would, in time linear in the number of stored
/// timestamps. The hit queues are not assumed to be sorted.
fn check_temporal_ordered(
    rule_refs: &[String],
    rule_hits: &HashMap<String, VecDeque<i64>>,
) -> bool {
    let mut min_ts = i64::MIN;
    for rule in rule_refs {
        let earliest = rule_hits
            .get(rule.as_str())
            .and_then(|hits| hits.iter().copied().filter(|&ts| ts >= min_ts).min());
        match earliest {
            Some(ts) => min_ts = ts,
            None => return false,
        }
    }
    true
}
754
755fn eval_temporal_expr(expr: &ConditionExpr, rule_hits: &HashMap<String, VecDeque<i64>>) -> bool {
761 match expr {
762 ConditionExpr::Identifier(name) => rule_hits
763 .get(name.as_str())
764 .is_some_and(|ts| !ts.is_empty()),
765 ConditionExpr::And(children) => children.iter().all(|c| eval_temporal_expr(c, rule_hits)),
766 ConditionExpr::Or(children) => children.iter().any(|c| eval_temporal_expr(c, rule_hits)),
767 ConditionExpr::Not(child) => !eval_temporal_expr(child, rule_hits),
768 ConditionExpr::Selector { .. } => {
769 false
771 }
772 }
773}
774
/// Returns the `percentile`-th percentile of `values` using linear
/// interpolation between the two nearest ranks.
///
/// `values` must already be sorted in ascending order. `percentile` is
/// clamped to `0..=100`. An empty slice yields `0.0` and a single element is
/// returned as is.
fn percentile_linear_interp(values: &[f64], percentile: f64) -> f64 {
    match values {
        [] => return 0.0,
        [only] => return *only,
        _ => {}
    }
    let last = values.len() - 1;

    // Fractional position of the percentile within the index range 0..=last.
    let rank = percentile.clamp(0.0, 100.0) / 100.0 * last as f64;
    let below = rank.floor() as usize;
    let above = rank.ceil() as usize;

    if below == above || above > last {
        return values[below.min(last)];
    }
    let weight = rank - below as f64;
    values[below] + weight * (values[above] - values[below])
}
805
806pub fn compile_correlation(rule: &CorrelationRule) -> Result<CompiledCorrelation> {
812 let alias_map: HashMap<&str, &FieldAlias> =
814 rule.aliases.iter().map(|a| (a.alias.as_str(), a)).collect();
815
816 let group_by: Vec<GroupByField> = rule
817 .group_by
818 .iter()
819 .map(|field_name| {
820 if let Some(alias) = alias_map.get(field_name.as_str()) {
821 GroupByField::Aliased {
822 alias: field_name.clone(),
823 mapping: alias.mapping.clone(),
824 }
825 } else {
826 GroupByField::Direct(field_name.clone())
827 }
828 })
829 .collect();
830
831 let (condition, extended_expr) = compile_condition(&rule.condition, rule.correlation_type)?;
833
834 let suppress_secs = rule
838 .custom_attributes
839 .get("rsigma.suppress")
840 .and_then(|v| v.as_str())
841 .and_then(|s| rsigma_parser::Timespan::parse(s).ok())
842 .map(|ts| ts.seconds);
843
844 let action = rule
845 .custom_attributes
846 .get("rsigma.action")
847 .and_then(|v| v.as_str())
848 .and_then(|s| {
849 s.parse::<crate::correlation_engine::CorrelationAction>()
850 .ok()
851 });
852
853 let event_mode = rule
854 .custom_attributes
855 .get("rsigma.correlation_event_mode")
856 .and_then(|v| v.as_str())
857 .and_then(|s| {
858 s.parse::<crate::correlation_engine::CorrelationEventMode>()
859 .ok()
860 });
861
862 let max_events = rule
863 .custom_attributes
864 .get("rsigma.max_correlation_events")
865 .and_then(|v| v.as_str())
866 .and_then(|s| s.parse::<usize>().ok());
867
868 let custom_attributes = Arc::new(crate::compiler::yaml_to_json_map(&rule.custom_attributes));
869
870 Ok(CompiledCorrelation {
871 id: rule.id.clone(),
872 name: rule.name.clone(),
873 title: rule.title.clone(),
874 level: rule.level,
875 tags: rule.tags.clone(),
876 correlation_type: rule.correlation_type,
877 rule_refs: rule.rules.clone(),
878 group_by,
879 timespan_secs: rule.timespan.seconds,
880 condition,
881 extended_expr,
882 generate: rule.generate,
883 suppress_secs,
884 action,
885 event_mode,
886 max_events,
887 custom_attributes,
888 })
889}
890
891fn compile_condition(
893 cond: &CorrelationCondition,
894 corr_type: CorrelationType,
895) -> Result<(CompiledCondition, Option<ConditionExpr>)> {
896 match cond {
897 CorrelationCondition::Threshold { predicates, field } => Ok((
898 CompiledCondition {
899 field: field.clone(),
900 predicates: predicates
901 .iter()
902 .map(|(op, count)| (*op, *count as f64))
903 .collect(),
904 },
905 None,
906 )),
907 CorrelationCondition::Extended(expr) => {
908 match corr_type {
909 CorrelationType::Temporal | CorrelationType::TemporalOrdered => {
910 Ok((
913 CompiledCondition {
914 field: None,
915 predicates: vec![(ConditionOperator::Gte, 1.0)],
916 },
917 Some(expr.clone()),
918 ))
919 }
920 _ => Err(EvalError::CorrelationError(
921 "Extended conditions are only supported for temporal correlation types"
922 .to_string(),
923 )),
924 }
925 }
926 }
927}
928
929#[cfg(test)]
930mod tests {
931 use super::*;
932 use crate::event::JsonEvent;
933 use serde_json::json;
934
935 #[test]
936 fn test_group_key_extract() {
937 let v = json!({"User": "admin", "Host": "srv01"});
938 let event = JsonEvent::borrow(&v);
939 let group_by = vec![
940 GroupByField::Direct("User".to_string()),
941 GroupByField::Direct("Host".to_string()),
942 ];
943 let key = GroupKey::extract(&event, &group_by, &["rule1"]);
944 assert_eq!(
945 key.0,
946 vec![Some("admin".to_string()), Some("srv01".to_string())]
947 );
948 }
949
950 #[test]
951 fn test_group_key_missing_field() {
952 let v = json!({"User": "admin"});
953 let event = JsonEvent::borrow(&v);
954 let group_by = vec![
955 GroupByField::Direct("User".to_string()),
956 GroupByField::Direct("Host".to_string()),
957 ];
958 let key = GroupKey::extract(&event, &group_by, &["rule1"]);
959 assert_eq!(key.0, vec![Some("admin".to_string()), None]);
960 }
961
962 #[test]
963 fn test_group_key_aliased() {
964 let v = json!({"source.ip": "10.0.0.1"});
965 let event = JsonEvent::borrow(&v);
966 let group_by = vec![GroupByField::Aliased {
967 alias: "internal_ip".to_string(),
968 mapping: HashMap::from([
969 ("rule_a".to_string(), "source.ip".to_string()),
970 ("rule_b".to_string(), "destination.ip".to_string()),
971 ]),
972 }];
973 let key = GroupKey::extract(&event, &group_by, &["rule_a"]);
974 assert_eq!(key.0, vec![Some("10.0.0.1".to_string())]);
975 }
976
977 #[test]
978 fn test_condition_check() {
979 let cond = CompiledCondition {
980 field: None,
981 predicates: vec![(ConditionOperator::Gte, 100.0)],
982 };
983 assert!(!cond.check(99.0));
984 assert!(cond.check(100.0));
985 assert!(cond.check(101.0));
986 }
987
988 #[test]
989 fn test_condition_check_range() {
990 let cond = CompiledCondition {
991 field: None,
992 predicates: vec![
993 (ConditionOperator::Gt, 100.0),
994 (ConditionOperator::Lte, 200.0),
995 ],
996 };
997 assert!(!cond.check(100.0));
998 assert!(cond.check(101.0));
999 assert!(cond.check(200.0));
1000 assert!(!cond.check(201.0));
1001 }
1002
1003 #[test]
1004 fn test_window_event_count() {
1005 let mut state = WindowState::new_for(CorrelationType::EventCount);
1006 for i in 0..5 {
1007 state.push_event_count(1000 + i);
1008 }
1009 let cond = CompiledCondition {
1010 field: None,
1011 predicates: vec![(ConditionOperator::Gte, 5.0)],
1012 };
1013 assert_eq!(
1014 state.check_condition(&cond, CorrelationType::EventCount, &[], None),
1015 Some(5.0)
1016 );
1017 }
1018
1019 #[test]
1020 fn test_window_event_count_eviction() {
1021 let mut state = WindowState::new_for(CorrelationType::EventCount);
1022 for i in 0..10 {
1023 state.push_event_count(1000 + i);
1024 }
1025 state.evict(1005);
1027 let cond = CompiledCondition {
1028 field: None,
1029 predicates: vec![(ConditionOperator::Gte, 5.0)],
1030 };
1031 assert_eq!(
1032 state.check_condition(&cond, CorrelationType::EventCount, &[], None),
1033 Some(5.0)
1034 );
1035 }
1036
1037 #[test]
1038 fn test_window_value_count() {
1039 let mut state = WindowState::new_for(CorrelationType::ValueCount);
1040 state.push_value_count(1000, "user1".to_string());
1041 state.push_value_count(1001, "user2".to_string());
1042 state.push_value_count(1002, "user1".to_string()); state.push_value_count(1003, "user3".to_string());
1044
1045 let cond = CompiledCondition {
1046 field: Some("User".to_string()),
1047 predicates: vec![(ConditionOperator::Gte, 3.0)],
1048 };
1049 assert_eq!(
1050 state.check_condition(&cond, CorrelationType::ValueCount, &[], None),
1051 Some(3.0)
1052 );
1053 }
1054
1055 #[test]
1056 fn test_window_temporal() {
1057 let refs = vec!["rule_a".to_string(), "rule_b".to_string()];
1058 let mut state = WindowState::new_for(CorrelationType::Temporal);
1059 state.push_temporal(1000, "rule_a");
1060 let cond = CompiledCondition {
1062 field: None,
1063 predicates: vec![(ConditionOperator::Gte, 2.0)],
1064 };
1065 assert!(
1066 state
1067 .check_condition(&cond, CorrelationType::Temporal, &refs, None)
1068 .is_none()
1069 );
1070
1071 state.push_temporal(1001, "rule_b");
1073 assert_eq!(
1074 state.check_condition(&cond, CorrelationType::Temporal, &refs, None),
1075 Some(2.0)
1076 );
1077 }
1078
1079 #[test]
1080 fn test_window_temporal_ordered() {
1081 let refs = vec![
1082 "rule_a".to_string(),
1083 "rule_b".to_string(),
1084 "rule_c".to_string(),
1085 ];
1086 let mut state = WindowState::new_for(CorrelationType::TemporalOrdered);
1087 state.push_temporal(1000, "rule_a");
1089 state.push_temporal(1001, "rule_b");
1090 state.push_temporal(1002, "rule_c");
1091
1092 let cond = CompiledCondition {
1093 field: None,
1094 predicates: vec![(ConditionOperator::Gte, 3.0)],
1095 };
1096 assert!(
1097 state
1098 .check_condition(&cond, CorrelationType::TemporalOrdered, &refs, None)
1099 .is_some()
1100 );
1101 }
1102
1103 #[test]
1104 fn test_window_temporal_ordered_wrong_order() {
1105 let refs = vec!["rule_a".to_string(), "rule_b".to_string()];
1106 let mut state = WindowState::new_for(CorrelationType::TemporalOrdered);
1107 state.push_temporal(1000, "rule_b");
1109 state.push_temporal(1001, "rule_a");
1110
1111 let cond = CompiledCondition {
1112 field: None,
1113 predicates: vec![(ConditionOperator::Gte, 2.0)],
1114 };
1115 assert!(
1116 state
1117 .check_condition(&cond, CorrelationType::TemporalOrdered, &refs, None)
1118 .is_none()
1119 );
1120 }
1121
1122 #[test]
1123 fn test_window_value_sum() {
1124 let mut state = WindowState::new_for(CorrelationType::ValueSum);
1125 state.push_numeric(1000, 500.0);
1126 state.push_numeric(1001, 600.0);
1127
1128 let cond = CompiledCondition {
1129 field: Some("bytes_sent".to_string()),
1130 predicates: vec![(ConditionOperator::Gt, 1000.0)],
1131 };
1132 assert_eq!(
1133 state.check_condition(&cond, CorrelationType::ValueSum, &[], None),
1134 Some(1100.0)
1135 );
1136 }
1137
1138 #[test]
1139 fn test_window_value_avg() {
1140 let mut state = WindowState::new_for(CorrelationType::ValueAvg);
1141 state.push_numeric(1000, 100.0);
1142 state.push_numeric(1001, 200.0);
1143 state.push_numeric(1002, 300.0);
1144
1145 let cond = CompiledCondition {
1146 field: Some("bytes".to_string()),
1147 predicates: vec![(ConditionOperator::Gte, 200.0)],
1148 };
1149 assert_eq!(
1150 state.check_condition(&cond, CorrelationType::ValueAvg, &[], None),
1151 Some(200.0)
1152 );
1153 }
1154
1155 #[test]
1156 fn test_window_value_median() {
1157 let mut state = WindowState::new_for(CorrelationType::ValueMedian);
1158 state.push_numeric(1000, 10.0);
1159 state.push_numeric(1001, 20.0);
1160 state.push_numeric(1002, 30.0);
1161
1162 let cond = CompiledCondition {
1163 field: Some("latency".to_string()),
1164 predicates: vec![(ConditionOperator::Gte, 20.0)],
1165 };
1166 assert_eq!(
1167 state.check_condition(&cond, CorrelationType::ValueMedian, &[], None),
1168 Some(20.0)
1169 );
1170 }
1171
1172 #[test]
1173 fn test_compile_correlation_basic() {
1174 use rsigma_parser::parse_sigma_yaml;
1175
1176 let yaml = r#"
1177title: Base Rule
1178id: f305fd62-beca-47da-ad95-7690a0620084
1179logsource:
1180 product: aws
1181 service: cloudtrail
1182detection:
1183 selection:
1184 eventSource: "s3.amazonaws.com"
1185 condition: selection
1186level: low
1187---
1188title: Multiple AWS bucket enumerations
1189id: be246094-01d3-4bba-88de-69e582eba0cc
1190status: experimental
1191correlation:
1192 type: event_count
1193 rules:
1194 - f305fd62-beca-47da-ad95-7690a0620084
1195 group-by:
1196 - userIdentity.arn
1197 timespan: 1h
1198 condition:
1199 gte: 100
1200level: high
1201"#;
1202 let collection = parse_sigma_yaml(yaml).unwrap();
1203 assert_eq!(collection.correlations.len(), 1);
1204
1205 let compiled = compile_correlation(&collection.correlations[0]).unwrap();
1206 assert_eq!(compiled.correlation_type, CorrelationType::EventCount);
1207 assert_eq!(compiled.timespan_secs, 3600);
1208 assert_eq!(compiled.rule_refs.len(), 1);
1209 assert_eq!(compiled.group_by.len(), 1);
1210 assert!(compiled.condition.check(100.0));
1211 assert!(!compiled.condition.check(99.0));
1212 }
1213
1214 #[test]
1219 fn test_eval_temporal_expr_and() {
1220 let mut rule_hits = HashMap::new();
1221 rule_hits.insert("rule_a".to_string(), VecDeque::from([1000]));
1222 rule_hits.insert("rule_b".to_string(), VecDeque::from([1001]));
1223
1224 let expr = ConditionExpr::And(vec![
1225 ConditionExpr::Identifier("rule_a".to_string()),
1226 ConditionExpr::Identifier("rule_b".to_string()),
1227 ]);
1228 assert!(eval_temporal_expr(&expr, &rule_hits));
1229 }
1230
1231 #[test]
1232 fn test_eval_temporal_expr_and_incomplete() {
1233 let mut rule_hits = HashMap::new();
1234 rule_hits.insert("rule_a".to_string(), VecDeque::from([1000]));
1235 let expr = ConditionExpr::And(vec![
1238 ConditionExpr::Identifier("rule_a".to_string()),
1239 ConditionExpr::Identifier("rule_b".to_string()),
1240 ]);
1241 assert!(!eval_temporal_expr(&expr, &rule_hits));
1242 }
1243
1244 #[test]
1245 fn test_eval_temporal_expr_or() {
1246 let mut rule_hits = HashMap::new();
1247 rule_hits.insert("rule_a".to_string(), VecDeque::from([1000]));
1248
1249 let expr = ConditionExpr::Or(vec![
1250 ConditionExpr::Identifier("rule_a".to_string()),
1251 ConditionExpr::Identifier("rule_b".to_string()),
1252 ]);
1253 assert!(eval_temporal_expr(&expr, &rule_hits));
1254 }
1255
1256 #[test]
1257 fn test_eval_temporal_expr_not() {
1258 let rule_hits = HashMap::new();
1259
1260 let expr = ConditionExpr::Not(Box::new(ConditionExpr::Identifier("rule_a".to_string())));
1261 assert!(eval_temporal_expr(&expr, &rule_hits));
1262 }
1263
1264 #[test]
1265 fn test_eval_temporal_expr_complex() {
1266 let mut rule_hits = HashMap::new();
1267 rule_hits.insert("rule_a".to_string(), VecDeque::from([1000]));
1268 rule_hits.insert("rule_b".to_string(), VecDeque::from([1001]));
1269 let expr = ConditionExpr::And(vec![
1273 ConditionExpr::And(vec![
1274 ConditionExpr::Identifier("rule_a".to_string()),
1275 ConditionExpr::Identifier("rule_b".to_string()),
1276 ]),
1277 ConditionExpr::Not(Box::new(ConditionExpr::Identifier("rule_c".to_string()))),
1278 ]);
1279 assert!(eval_temporal_expr(&expr, &rule_hits));
1280 }
1281
1282 #[test]
1283 fn test_check_condition_with_extended_expr() {
1284 let refs = vec!["rule_a".to_string(), "rule_b".to_string()];
1285 let mut state = WindowState::new_for(CorrelationType::Temporal);
1286 state.push_temporal(1000, "rule_a");
1287 state.push_temporal(1001, "rule_b");
1288
1289 let cond = CompiledCondition {
1290 field: None,
1291 predicates: vec![(ConditionOperator::Gte, 1.0)],
1292 };
1293 let expr = ConditionExpr::And(vec![
1294 ConditionExpr::Identifier("rule_a".to_string()),
1295 ConditionExpr::Identifier("rule_b".to_string()),
1296 ]);
1297
1298 assert!(
1300 state
1301 .check_condition(&cond, CorrelationType::Temporal, &refs, Some(&expr))
1302 .is_some()
1303 );
1304
1305 let mut state2 = WindowState::new_for(CorrelationType::Temporal);
1307 state2.push_temporal(1000, "rule_a");
1308 assert!(
1309 state2
1310 .check_condition(&cond, CorrelationType::Temporal, &refs, Some(&expr))
1311 .is_none()
1312 );
1313 }
1314
1315 #[test]
1320 fn test_percentile_linear_interp_single() {
1321 assert!((percentile_linear_interp(&[42.0], 50.0) - 42.0).abs() < f64::EPSILON);
1322 }
1323
1324 #[test]
1325 fn test_percentile_linear_interp_basic() {
1326 let values = &[1.0, 2.0, 3.0, 4.0, 5.0];
1328 assert!((percentile_linear_interp(values, 0.0) - 1.0).abs() < f64::EPSILON);
1330 assert!((percentile_linear_interp(values, 25.0) - 2.0).abs() < f64::EPSILON);
1332 assert!((percentile_linear_interp(values, 50.0) - 3.0).abs() < f64::EPSILON);
1334 assert!((percentile_linear_interp(values, 75.0) - 4.0).abs() < f64::EPSILON);
1336 assert!((percentile_linear_interp(values, 100.0) - 5.0).abs() < f64::EPSILON);
1338 }
1339
1340 #[test]
1341 fn test_percentile_linear_interp_interpolation() {
1342 let values = &[10.0, 20.0, 30.0, 40.0];
1344 assert!((percentile_linear_interp(values, 50.0) - 25.0).abs() < f64::EPSILON);
1346 }
1347
/// The 1st percentile of the values 1..=100 must be within 0.01 of 1.99.
#[test]
fn test_percentile_linear_interp_1st_percentile() {
    let ascending: Vec<f64> = (1..=100_i32).map(f64::from).collect();
    let first_pct = percentile_linear_interp(&ascending, 1.0);
    let deviation = (first_pct - 1.99).abs();
    assert!(deviation < 0.01);
}
1356
/// Pushes the values 1..=100 into a `ValuePercentile` window and asserts
/// that `check_condition` yields `Some` value within 1.0 of 50.5.
#[test]
fn test_value_percentile_check_condition() {
    let latency_cond = CompiledCondition {
        field: Some(String::from("latency")),
        predicates: vec![(ConditionOperator::Lte, 50.0)],
    };

    let mut window = WindowState::new_for(CorrelationType::ValuePercentile);
    for sample in 1..=100 {
        // Timestamps are offset from 1000; the sample doubles as the value.
        window.push_numeric(1000 + sample, sample as f64);
    }

    let outcome =
        window.check_condition(&latency_cond, CorrelationType::ValuePercentile, &[], None);
    assert!(outcome.is_some());

    let val = outcome.unwrap();
    let deviation = (val - 50.5).abs();
    assert!(deviation < 1.0, "expected ~50.5, got {val}");
}
1376
/// The 0th percentile must equal the first element and the 100th the last.
#[test]
fn test_percentile_0th_and_100th() {
    let sorted: [f64; 4] = [5.0, 10.0, 15.0, 20.0];

    let lowest = percentile_linear_interp(&sorted, 0.0);
    assert!((lowest - 5.0).abs() < f64::EPSILON);

    let highest = percentile_linear_interp(&sorted, 100.0);
    assert!((highest - 20.0).abs() < f64::EPSILON);
}
1383
/// With only `[10, 20]`, the 50th percentile is expected to be 15 and the
/// 25th percentile 12.5.
#[test]
fn test_percentile_two_values() {
    let pair: [f64; 2] = [10.0, 20.0];
    // (requested percentile, expected value)
    let cases: [(f64, f64); 2] = [(50.0, 15.0), (25.0, 12.5)];

    for (pct, want) in cases.iter() {
        let got = percentile_linear_interp(&pair, *pct);
        assert!((got - *want).abs() < f64::EPSILON);
    }
}
1392
/// Percentiles below 0 or above 100 must produce the first and last element
/// of `[1, 2, 3]` respectively.
#[test]
fn test_percentile_clamps_out_of_range() {
    let sorted: [f64; 3] = [1.0, 2.0, 3.0];
    // (out-of-range percentile, expected value)
    let cases: [(f64, f64); 2] = [(-10.0, 1.0), (150.0, 3.0)];

    for (pct, want) in cases.iter() {
        let got = percentile_linear_interp(&sorted, *pct);
        assert!((got - *want).abs() < f64::EPSILON);
    }
}
1401
/// A `ValuePercentile` window with no samples must make `check_condition`
/// return `None`.
#[test]
fn test_value_percentile_empty_window() {
    let latency_cond = CompiledCondition {
        field: Some(String::from("latency")),
        predicates: vec![(ConditionOperator::Lte, 50.0)],
    };
    let empty_window = WindowState::new_for(CorrelationType::ValuePercentile);

    let outcome =
        empty_window.check_condition(&latency_cond, CorrelationType::ValuePercentile, &[], None);
    assert!(outcome.is_none());
}
1416
/// `rule_a OR rule_b` must evaluate to true when only `rule_a` has a hit.
#[test]
fn test_extended_temporal_or_single_rule() {
    let either_rule = ConditionExpr::Or(vec![
        ConditionExpr::Identifier(String::from("rule_a")),
        ConditionExpr::Identifier(String::from("rule_b")),
    ]);
    // Only `rule_a` is present in the hit map.
    let hits = HashMap::from([(String::from("rule_a"), VecDeque::from([1000]))]);

    let satisfied = eval_temporal_expr(&either_rule, &hits);
    assert!(satisfied);
}
1429
/// With an empty hit map, both `rule_a AND rule_b` and `rule_a OR rule_b`
/// must evaluate to false.
#[test]
fn test_extended_temporal_empty_hits() {
    let no_hits = HashMap::new();
    // Fresh operand list for each expression under test.
    let operands = || {
        vec![
            ConditionExpr::Identifier(String::from("rule_a")),
            ConditionExpr::Identifier(String::from("rule_b")),
        ]
    };

    let conjunction = ConditionExpr::And(operands());
    assert!(!eval_temporal_expr(&conjunction, &no_hits));

    let disjunction = ConditionExpr::Or(operands());
    assert!(!eval_temporal_expr(&disjunction, &no_hits));
}
1448
/// A rule whose entry exists but holds an empty deque must not satisfy
/// `rule_a AND rule_b`, even though `rule_b` has a hit.
#[test]
fn test_extended_temporal_with_empty_deque() {
    let both_rules = ConditionExpr::And(vec![
        ConditionExpr::Identifier(String::from("rule_a")),
        ConditionExpr::Identifier(String::from("rule_b")),
    ]);
    let hits = HashMap::from([
        // Key present, but no timestamps recorded.
        (String::from("rule_a"), VecDeque::new()),
        (String::from("rule_b"), VecDeque::from([1000])),
    ]);

    let satisfied = eval_temporal_expr(&both_rules, &hits);
    assert!(!satisfied);
}
1463
/// Without an extended expression, a temporal window that saw two of three
/// referenced rules must report `Some(2.0)` for a `>= 2` threshold and
/// `None` for a `>= 3` threshold.
#[test]
fn test_check_condition_temporal_no_extended_expr() {
    // Builds a field-less `>= n` condition.
    let at_least = |n: f64| CompiledCondition {
        field: None,
        predicates: vec![(ConditionOperator::Gte, n)],
    };
    let rule_refs: Vec<String> = ["rule_a", "rule_b", "rule_c"]
        .iter()
        .map(|name| name.to_string())
        .collect();

    let mut window = WindowState::new_for(CorrelationType::Temporal);
    window.push_temporal(1000, "rule_a");
    window.push_temporal(1001, "rule_b");

    let needs_two = at_least(2.0);
    let met = window.check_condition(&needs_two, CorrelationType::Temporal, &rule_refs, None);
    assert_eq!(met, Some(2.0));

    let needs_three = at_least(3.0);
    let unmet = window.check_condition(&needs_three, CorrelationType::Temporal, &rule_refs, None);
    assert!(unmet.is_none());
}
1498
/// Pushing one event into an `EventBuffer` must make it non-empty with
/// length 1, and `decompress_all` must give back an equal event.
#[test]
fn test_event_buffer_push_and_decompress() {
    let original = json!({"User": "admin", "action": "login", "src_ip": "10.0.0.1"});
    let mut buffer = EventBuffer::new(10);
    buffer.push(1000, &original);

    assert_eq!(buffer.len(), 1);
    assert!(!buffer.is_empty());

    let restored = buffer.decompress_all();
    assert_eq!(restored.len(), 1);
    assert_eq!(restored[0], original);
}
1516
/// For a moderately sized event, the buffer's reported compressed byte count
/// must be smaller than the event's serialized JSON length, and the event
/// must still decompress to an equal value.
#[test]
fn test_event_buffer_compression_saves_memory() {
    let event = json!({
        "User": "admin",
        "action": "login",
        "src_ip": "192.168.1.100",
        "dst_ip": "10.0.0.1",
        "EventTime": "2024-07-10T12:30:00Z",
        "process": "sshd",
        "host": "production-server-01.example.com",
        "message": "Accepted password for admin from 192.168.1.100 port 22 ssh2",
        "severity": "info",
        "tags": ["authentication", "network", "linux"]
    });

    // Baseline: plain JSON serialization of the same event.
    let raw_bytes = serde_json::to_vec(&event).unwrap();
    let raw_size = raw_bytes.len();

    let mut buffer = EventBuffer::new(100);
    buffer.push(1000, &event);
    let compressed_size = buffer.compressed_bytes();

    assert!(
        compressed_size < raw_size,
        "Compressed {compressed_size}B should be less than raw {raw_size}B"
    );

    let restored = buffer.decompress_all();
    assert_eq!(restored[0], event);
}
1548
/// A buffer created with capacity 3 must hold only the three most recently
/// pushed events after five pushes.
#[test]
fn test_event_buffer_max_events_cap() {
    let mut capped = EventBuffer::new(3);
    for seq in 0..5 {
        capped.push(1000 + seq, &json!({"idx": seq}));
    }
    assert_eq!(capped.len(), 3);

    let retained = capped.decompress_all();
    let expected = [json!({"idx": 2}), json!({"idx": 3}), json!({"idx": 4})];
    for (slot, want) in expected.iter().enumerate() {
        assert_eq!(&retained[slot], want);
    }
}
1564
/// After pushing events at timestamps 1000..=1004, `evict(1003)` must leave
/// exactly the events with index 3 and 4.
#[test]
fn test_event_buffer_eviction() {
    let mut buffer = EventBuffer::new(10);
    for seq in 0..5 {
        buffer.push(1000 + seq, &json!({"idx": seq}));
    }
    assert_eq!(buffer.len(), 5);

    buffer.evict(1003);
    assert_eq!(buffer.len(), 2);

    let survivors = buffer.decompress_all();
    let expected = [json!({"idx": 3}), json!({"idx": 4})];
    for (slot, want) in expected.iter().enumerate() {
        assert_eq!(&survivors[slot], want);
    }
}
1581
/// `clear` must leave the buffer empty with zero length and zero reported
/// compressed bytes.
#[test]
fn test_event_buffer_clear() {
    let mut buffer = EventBuffer::new(10);
    let first = json!({"a": 1});
    let second = json!({"b": 2});
    buffer.push(1000, &first);
    buffer.push(1001, &second);
    assert_eq!(buffer.len(), 2);

    buffer.clear();

    assert!(buffer.is_empty());
    assert_eq!(buffer.len(), 0);
    assert_eq!(buffer.compressed_bytes(), 0);
}
1594
/// Every sample JSON value must survive `compress_event` followed by
/// `decompress_event` unchanged.
#[test]
fn test_compress_decompress_roundtrip() {
    let samples = vec![
        json!(null),
        json!(42),
        json!("hello world"),
        json!({"nested": {"deep": [1, 2, 3]}}),
        json!([1, "two", null, true, {"five": 5}]),
    ];

    samples.into_iter().for_each(|val| {
        let packed = compress_event(&val).unwrap();
        let unpacked = decompress_event(&packed).unwrap();
        assert_eq!(unpacked, val, "Roundtrip failed for {val}");
    });
}
1611
/// Pushes three events (string `id`, numeric `_id`, and no id field) and
/// checks the timestamp and extracted id recorded for each reference.
#[test]
fn test_event_ref_buffer_push_and_refs() {
    let mut buffer = EventRefBuffer::new(10);
    buffer.push(1000, &json!({"id": "evt-1", "data": "hello"}));
    buffer.push(1001, &json!({"_id": 42, "data": "world"}));
    buffer.push(1002, &json!({"data": "no-id"}));
    assert_eq!(buffer.len(), 3);

    let refs = buffer.refs();
    // (expected timestamp, expected id) in insertion order.
    let expected = [
        (1000, Some("evt-1")),
        (1001, Some("42")),
        (1002, None),
    ];
    for (slot, (ts, id)) in expected.iter().enumerate() {
        assert_eq!(refs[slot].timestamp, *ts);
        assert_eq!(refs[slot].id.as_deref(), *id);
    }
}
1632
/// A ref buffer created with capacity 3 must hold only the ids of the last
/// three of five pushed events.
#[test]
fn test_event_ref_buffer_max_cap() {
    let mut capped = EventRefBuffer::new(3);
    for i in 0..5 {
        let event = json!({"id": format!("e-{i}")});
        capped.push(1000 + i, &event);
    }
    assert_eq!(capped.len(), 3);

    let refs = capped.refs();
    let kept_ids = ["e-2", "e-3", "e-4"];
    for (slot, want) in kept_ids.iter().enumerate() {
        assert_eq!(refs[slot].id.as_deref(), Some(*want));
    }
}
1645
/// After pushing refs at timestamps 1000..=1004, `evict(1003)` must leave
/// exactly the refs stamped 1003 and 1004.
#[test]
fn test_event_ref_buffer_eviction() {
    let mut buffer = EventRefBuffer::new(10);
    for i in 0..5 {
        let event = json!({"id": format!("e-{i}")});
        buffer.push(1000 + i, &event);
    }

    buffer.evict(1003);
    assert_eq!(buffer.len(), 2);

    let survivors = buffer.refs();
    let oldest = &survivors[0];
    let newest = &survivors[1];
    assert_eq!(oldest.timestamp, 1003);
    assert_eq!(newest.timestamp, 1004);
}
1658
/// `clear` must leave the ref buffer empty with zero length.
#[test]
fn test_event_ref_buffer_clear() {
    let mut buffer = EventRefBuffer::new(10);
    let first = json!({"id": "a"});
    let second = json!({"id": "b"});
    buffer.push(1000, &first);
    buffer.push(1001, &second);
    assert_eq!(buffer.len(), 2);

    buffer.clear();

    assert!(buffer.is_empty());
    assert_eq!(buffer.len(), 0);
}
1670
/// `extract_event_id` must pick up `id`, `_id`, `event_id` and
/// `EventRecordID` (numbers rendered as strings) and return `None` when no
/// such field is present.
#[test]
fn test_extract_event_id_common_fields() {
    // (input event, expected extracted id)
    let cases = [
        (json!({"id": "abc"}), Some("abc")),
        (json!({"_id": 123}), Some("123")),
        (json!({"event_id": "x-1"}), Some("x-1")),
        (json!({"EventRecordID": 999}), Some("999")),
        (json!({"no_id_field": true}), None),
    ];

    for (event, want) in cases.iter() {
        assert_eq!(extract_event_id(event), want.map(String::from));
    }
}
1691
/// Compiles a correlation rule carrying the `rsigma.*` custom attributes and
/// asserts that event mode, max events, suppression seconds and action are
/// all populated on the compiled result.
#[test]
fn test_compile_correlation_with_custom_attributes() {
    use rsigma_parser::*;

    // All attribute values are supplied as YAML strings.
    let custom_attributes: HashMap<String, serde_yaml::Value> = [
        ("rsigma.correlation_event_mode", "refs"),
        ("rsigma.max_correlation_events", "25"),
        ("rsigma.suppress", "5m"),
        ("rsigma.action", "reset"),
    ]
    .iter()
    .map(|(key, value)| {
        (
            key.to_string(),
            serde_yaml::Value::String(value.to_string()),
        )
    })
    .collect();

    let timespan = Timespan::parse("60s").unwrap();
    let condition = CorrelationCondition::Threshold {
        predicates: vec![(ConditionOperator::Gte, 5)],
        field: None,
    };

    let rule = CorrelationRule {
        title: String::from("Test Corr"),
        id: Some(String::from("corr-1")),
        name: None,
        status: None,
        description: None,
        author: None,
        date: None,
        modified: None,
        references: Vec::new(),
        taxonomy: None,
        tags: Vec::new(),
        falsepositives: Vec::new(),
        level: Some(Level::High),
        correlation_type: CorrelationType::EventCount,
        rules: vec![String::from("rule-1")],
        group_by: vec![String::from("User")],
        timespan,
        condition,
        aliases: Vec::new(),
        generate: false,
        custom_attributes,
    };

    let compiled = compile_correlation(&rule).unwrap();

    let expected_mode = Some(crate::correlation_engine::CorrelationEventMode::Refs);
    assert_eq!(compiled.event_mode, expected_mode);
    assert_eq!(compiled.max_events, Some(25));
    // "5m" is expected to come out as 300 seconds.
    assert_eq!(compiled.suppress_secs, Some(300));
    let expected_action = Some(crate::correlation_engine::CorrelationAction::Reset);
    assert_eq!(compiled.action, expected_action);
}
1756}