use std::collections::HashMap;
use chrono::{DateTime, TimeZone, Utc};
use serde::Serialize;
use rsigma_parser::{CorrelationRule, CorrelationType, Level, SigmaCollection, SigmaRule};
use crate::correlation::{
CompiledCorrelation, EventBuffer, EventRef, EventRefBuffer, GroupKey, WindowState,
compile_correlation,
};
use crate::engine::Engine;
use crate::error::{EvalError, Result};
use crate::event::Event;
use crate::pipeline::{Pipeline, apply_pipelines, apply_pipelines_to_correlation};
use crate::result::MatchResult;
/// What happens to a correlation's accumulated state after it fires.
///
/// Serialized in `snake_case`; `Alert` is the default variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum CorrelationAction {
/// Emit the result and leave the window state untouched.
#[default]
Alert,
/// Emit the result, then clear the window state and event buffers of the group.
Reset,
}
impl std::str::FromStr for CorrelationAction {
    type Err = String;

    /// Parses a correlation action from its textual name.
    ///
    /// Matching is case-insensitive, mirroring how `CorrelationEventMode`
    /// parses its own values, so `"Reset"` and `"RESET"` are accepted as well
    /// as the canonical lowercase spelling.
    ///
    /// # Errors
    ///
    /// Returns a human-readable message quoting the original input when it is
    /// neither `alert` nor `reset`.
    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        match s.to_lowercase().as_str() {
            "alert" => Ok(CorrelationAction::Alert),
            "reset" => Ok(CorrelationAction::Reset),
            _ => Err(format!(
                "Unknown correlation action: {s} (expected 'alert' or 'reset')"
            )),
        }
    }
}
/// Controls what event data is attached to an emitted `CorrelationResult`.
///
/// Serialized in `snake_case`; `None` is the default variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum CorrelationEventMode {
/// Attach nothing to the result.
#[default]
None,
/// Buffer contributing events and attach them via `CorrelationResult::events`.
Full,
/// Buffer event references and attach them via `CorrelationResult::event_refs`.
Refs,
}
impl std::str::FromStr for CorrelationEventMode {
type Err = String;
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
match s.to_lowercase().as_str() {
"none" | "off" | "false" => Ok(CorrelationEventMode::None),
"full" | "true" => Ok(CorrelationEventMode::Full),
"refs" | "references" => Ok(CorrelationEventMode::Refs),
_ => Err(format!(
"Unknown correlation event mode: {s} (expected 'none', 'full', or 'refs')"
)),
}
}
}
impl std::fmt::Display for CorrelationEventMode {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
CorrelationEventMode::None => write!(f, "none"),
CorrelationEventMode::Full => write!(f, "full"),
CorrelationEventMode::Refs => write!(f, "refs"),
}
}
}
/// What to do when none of the configured timestamp fields yields a parseable value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum TimestampFallback {
/// Use the current wall-clock time (`Utc::now()`) as the event timestamp.
#[default]
WallClock,
/// Skip correlation for the event; its detections are still returned.
Skip,
}
/// Engine-wide settings for the correlation engine.
///
/// Several values act as defaults that an individual compiled correlation
/// may override (suppression, action, event mode, max events).
#[derive(Debug, Clone)]
pub struct CorrelationConfig {
/// Event field names tried, in order, when looking for an event timestamp.
pub timestamp_fields: Vec<String>,
/// Behavior when no timestamp field can be parsed.
pub timestamp_fallback: TimestampFallback,
/// Once the number of window-state entries reaches this value, eviction runs
/// before the next event is correlated.
pub max_state_entries: usize,
/// Default suppression window in seconds; `None` disables suppression unless
/// a correlation sets its own value.
pub suppress: Option<u64>,
/// Default action applied after a correlation fires.
pub action_on_match: CorrelationAction,
/// When `false`, detections of rules referenced by non-generating
/// correlations are dropped from the returned detections.
pub emit_detections: bool,
/// Default event attachment mode for correlation results.
pub correlation_event_mode: CorrelationEventMode,
/// Default capacity handed to per-group event buffers.
pub max_correlation_events: usize,
}
impl Default for CorrelationConfig {
fn default() -> Self {
CorrelationConfig {
timestamp_fields: vec![
"@timestamp".to_string(),
"timestamp".to_string(),
"EventTime".to_string(),
"TimeCreated".to_string(),
"eventTime".to_string(),
],
timestamp_fallback: TimestampFallback::default(),
max_state_entries: 100_000,
suppress: None,
action_on_match: CorrelationAction::default(),
emit_detections: true,
correlation_event_mode: CorrelationEventMode::default(),
max_correlation_events: 10,
}
}
}
/// Everything produced by processing a single event.
#[derive(Debug, Clone, Serialize)]
pub struct ProcessResult {
/// Detection matches returned for the event (after optional filtering).
pub detections: Vec<MatchResult>,
/// Correlations that fired as a direct consequence of the event's detections.
pub correlations: Vec<CorrelationResult>,
}
/// A fired correlation, ready to be serialized.
#[derive(Debug, Clone, Serialize)]
pub struct CorrelationResult {
/// Title of the correlation rule that fired.
pub rule_title: String,
/// Identifier of the correlation rule, if it has one.
pub rule_id: Option<String>,
/// Severity level copied from the correlation rule.
pub level: Option<Level>,
/// Tags copied from the correlation rule.
pub tags: Vec<String>,
/// Kind of correlation that fired.
pub correlation_type: CorrelationType,
/// Group-by pairs identifying the group the result belongs to.
pub group_key: Vec<(String, String)>,
/// Value returned by the window's condition check when it fired.
pub aggregated_value: f64,
/// Window length of the correlation, in seconds.
pub timespan_secs: u64,
/// Buffered events; populated only in `CorrelationEventMode::Full`, omitted
/// from serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub events: Option<Vec<serde_json::Value>>,
/// Buffered event references; populated only in `CorrelationEventMode::Refs`,
/// omitted from serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub event_refs: Option<Vec<EventRef>>,
}
/// Stateful engine that evaluates detection rules and feeds their matches
/// into windowed correlation rules.
pub struct CorrelationEngine {
// Detection engine used to evaluate events against the loaded rules.
engine: Engine,
// Compiled correlation rules; a correlation's position here is its index
// in every `(usize, GroupKey)` state key below.
correlations: Vec<CompiledCorrelation>,
// Referenced rule id/name -> indices of the correlations that reference it.
rule_index: HashMap<String, Vec<usize>>,
// `(id, name)` of every detection rule added through `add_rule`.
rule_ids: Vec<(Option<String>, Option<String>)>,
// Sliding-window state per (correlation index, group key).
state: HashMap<(usize, GroupKey), WindowState>,
// Timestamp of the last emitted alert per key, consulted for suppression.
last_alert: HashMap<(usize, GroupKey), i64>,
// Buffered events per key (used in `CorrelationEventMode::Full`).
event_buffers: HashMap<(usize, GroupKey), EventBuffer>,
// Buffered event references per key (used in `CorrelationEventMode::Refs`).
event_ref_buffers: HashMap<(usize, GroupKey), EventRefBuffer>,
// Rule refs of correlations whose `generate` flag is false.
correlation_only_rules: std::collections::HashSet<String>,
// Engine-wide configuration and defaults.
config: CorrelationConfig,
// Pipelines applied to rules and correlations as they are added, kept
// sorted by priority.
pipelines: Vec<Pipeline>,
}
impl CorrelationEngine {
pub fn new(config: CorrelationConfig) -> Self {
CorrelationEngine {
engine: Engine::new(),
correlations: Vec::new(),
rule_index: HashMap::new(),
rule_ids: Vec::new(),
state: HashMap::new(),
last_alert: HashMap::new(),
event_buffers: HashMap::new(),
event_ref_buffers: HashMap::new(),
correlation_only_rules: std::collections::HashSet::new(),
config,
pipelines: Vec::new(),
}
}
/// Registers a pipeline and keeps the pipeline list ordered by ascending
/// priority (stable, so equal priorities keep insertion order).
pub fn add_pipeline(&mut self, pipeline: Pipeline) {
    self.pipelines.push(pipeline);
    self.pipelines
        .sort_by(|left, right| left.priority.cmp(&right.priority));
}
/// Forwards the `include` flag to the underlying detection engine's
/// `set_include_event`.
pub fn set_include_event(&mut self, include: bool) {
self.engine.set_include_event(include);
}
/// Overrides the engine-wide default event attachment mode. Correlations
/// that specify their own mode are unaffected.
pub fn set_correlation_event_mode(&mut self, mode: CorrelationEventMode) {
self.config.correlation_event_mode = mode;
}
/// Overrides the engine-wide default capacity used when a new event buffer
/// is created. Correlations that specify their own maximum are unaffected.
pub fn set_max_correlation_events(&mut self, max: usize) {
self.config.max_correlation_events = max;
}
/// Adds a detection rule to the engine.
///
/// When pipelines are registered, a clone of the rule is transformed by them
/// and compiled directly; otherwise the rule is handed to the detection
/// engine as-is. In both cases the rule's custom attributes are applied to
/// the configuration and its `(id, name)` pair is recorded.
///
/// # Errors
///
/// Propagates pipeline, compilation and engine errors.
pub fn add_rule(&mut self, rule: &SigmaRule) -> Result<()> {
    if !self.pipelines.is_empty() {
        let mut transformed = rule.clone();
        apply_pipelines(&self.pipelines, &mut transformed)?;
        self.apply_custom_attributes(&transformed.custom_attributes);
        let identity = (transformed.id.clone(), transformed.name.clone());
        self.rule_ids.push(identity);
        let compiled = crate::compiler::compile_rule(&transformed)?;
        self.engine.add_compiled_rule(compiled);
        return Ok(());
    }

    self.apply_custom_attributes(&rule.custom_attributes);
    let identity = (rule.id.clone(), rule.name.clone());
    self.rule_ids.push(identity);
    self.engine.add_rule(rule)?;
    Ok(())
}
/// Folds `rsigma.*` custom attributes of a rule into the engine configuration.
///
/// * `rsigma.timestamp_field` is put at the front of the timestamp field
///   list unless it is already present.
/// * `rsigma.suppress` sets the default suppression window, but only while
///   none is configured and only if the value parses as a timespan.
/// * `rsigma.action` sets the default action, but only while the current
///   default is `Alert` and only if the value parses.
fn apply_custom_attributes(&mut self, attrs: &std::collections::HashMap<String, String>) {
    let new_ts_field = attrs
        .get("rsigma.timestamp_field")
        .filter(|field| !self.config.timestamp_fields.contains(*field));
    if let Some(field) = new_ts_field {
        self.config.timestamp_fields.insert(0, field.clone());
    }

    if self.config.suppress.is_none() {
        let parsed = attrs
            .get("rsigma.suppress")
            .and_then(|raw| rsigma_parser::Timespan::parse(raw).ok());
        if let Some(ts) = parsed {
            self.config.suppress = Some(ts.seconds);
        }
    }

    if self.config.action_on_match == CorrelationAction::Alert {
        let parsed = attrs
            .get("rsigma.action")
            .and_then(|raw| raw.parse::<CorrelationAction>().ok());
        if let Some(action) = parsed {
            self.config.action_on_match = action;
        }
    }
}
/// Compiles a correlation rule and registers it with the engine.
///
/// If pipelines are registered they are applied to a clone first. The
/// correlation's custom attributes are applied to the configuration, every
/// referenced rule is indexed to the new correlation, and, when the
/// correlation does not generate detections, its refs are recorded as
/// correlation-only.
///
/// # Errors
///
/// Propagates pipeline and compilation errors.
pub fn add_correlation(&mut self, corr: &CorrelationRule) -> Result<()> {
    // Declared outside the branch so a borrow of the transformed copy can
    // outlive it.
    let transformed;
    let effective: &CorrelationRule = if self.pipelines.is_empty() {
        corr
    } else {
        let mut copy = corr.clone();
        apply_pipelines_to_correlation(&self.pipelines, &mut copy)?;
        transformed = copy;
        &transformed
    };

    self.apply_custom_attributes(&effective.custom_attributes);
    let compiled = compile_correlation(effective)?;

    let corr_idx = self.correlations.len();
    for rule_ref in &compiled.rule_refs {
        self.rule_index
            .entry(rule_ref.clone())
            .or_default()
            .push(corr_idx);
    }
    if !compiled.generate {
        self.correlation_only_rules
            .extend(compiled.rule_refs.iter().cloned());
    }

    self.correlations.push(compiled);
    Ok(())
}
/// Loads a whole collection: rules first, then filters, then correlations,
/// and finally validates rule references and checks for reference cycles.
///
/// # Errors
///
/// Stops at and returns the first error from any of those steps.
pub fn add_collection(&mut self, collection: &SigmaCollection) -> Result<()> {
    collection
        .rules
        .iter()
        .try_for_each(|rule| self.add_rule(rule))?;

    for filter in &collection.filters {
        self.engine.apply_filter(filter)?;
    }

    collection
        .correlations
        .iter()
        .try_for_each(|corr| self.add_correlation(corr))?;

    self.validate_rule_refs()?;
    self.detect_correlation_cycles()
}
/// Checks that every rule reference of every correlation names something
/// that exists: the id or name of a detection rule or of a correlation.
///
/// # Errors
///
/// Returns `EvalError::UnknownRuleRef` carrying the first unresolved reference.
fn validate_rule_refs(&self) -> Result<()> {
    let detection_idents = self
        .rule_ids
        .iter()
        .flat_map(|(id, name)| [id.as_deref(), name.as_deref()]);
    let correlation_idents = self
        .correlations
        .iter()
        .flat_map(|corr| [corr.id.as_deref(), corr.name.as_deref()]);
    let known: std::collections::HashSet<&str> = detection_idents
        .chain(correlation_idents)
        .flatten()
        .collect();

    let unresolved = self
        .correlations
        .iter()
        .flat_map(|corr| corr.rule_refs.iter())
        .find(|rule_ref| !known.contains(rule_ref.as_str()));

    match unresolved {
        Some(rule_ref) => Err(EvalError::UnknownRuleRef(rule_ref.clone())),
        None => Ok(()),
    }
}
fn detect_correlation_cycles(&self) -> Result<()> {
let mut corr_identifiers: HashMap<&str, usize> = HashMap::new();
for (idx, corr) in self.correlations.iter().enumerate() {
if let Some(ref id) = corr.id {
corr_identifiers.insert(id.as_str(), idx);
}
if let Some(ref name) = corr.name {
corr_identifiers.insert(name.as_str(), idx);
}
}
let mut adj: Vec<Vec<usize>> = vec![Vec::new(); self.correlations.len()];
for (idx, corr) in self.correlations.iter().enumerate() {
for rule_ref in &corr.rule_refs {
if let Some(&target_idx) = corr_identifiers.get(rule_ref.as_str()) {
adj[idx].push(target_idx);
}
}
}
let mut state = vec![0u8; self.correlations.len()]; let mut path: Vec<usize> = Vec::new();
for start in 0..self.correlations.len() {
if state[start] == 0
&& let Some(cycle) = Self::dfs_find_cycle(start, &adj, &mut state, &mut path)
{
let names: Vec<String> = cycle
.iter()
.map(|&i| {
self.correlations[i]
.id
.as_deref()
.or(self.correlations[i].name.as_deref())
.unwrap_or(&self.correlations[i].title)
.to_string()
})
.collect();
return Err(crate::error::EvalError::CorrelationCycle(
names.join(" -> "),
));
}
}
Ok(())
}
/// Depth-first search for a cycle reachable from `node`.
///
/// `state` holds one marker per node: `0` unvisited, `1` on the current
/// search path, `2` fully explored. `path` is the current search path and
/// is expected to be consistent with `state` on entry.
///
/// Returns the cycle as a node list whose first and last element are the
/// same node, or `None` if no cycle is reachable. On `None`, every node
/// reached is marked `2` and `path` is restored to its length at entry.
///
/// The traversal uses an explicit stack rather than recursion so that a very
/// long chain of references cannot exhaust the call stack; edges are visited
/// in the same order a recursive pre-order DFS would visit them.
fn dfs_find_cycle(
    node: usize,
    adj: &[Vec<usize>],
    state: &mut [u8],
    path: &mut Vec<usize>,
) -> Option<Vec<usize>> {
    const UNVISITED: u8 = 0;
    const ON_PATH: u8 = 1;
    const DONE: u8 = 2;

    // One frame per node on the path below `node`: (node, next edge to try).
    let mut frames: Vec<(usize, usize)> = vec![(node, 0)];
    state[node] = ON_PATH;
    path.push(node);

    while let Some(&(current, edge)) = frames.last() {
        match adj[current].get(edge).copied() {
            None => {
                // All outgoing edges explored: retire the node.
                state[current] = DONE;
                path.pop();
                frames.pop();
            }
            Some(next) => {
                // Advance this frame's cursor before descending.
                if let Some(top) = frames.last_mut() {
                    top.1 += 1;
                }
                match state[next] {
                    ON_PATH => {
                        // Back edge: the cycle is the path suffix from `next`.
                        if let Some(pos) = path.iter().position(|&n| n == next) {
                            let mut cycle = path[pos..].to_vec();
                            cycle.push(next);
                            return Some(cycle);
                        }
                    }
                    UNVISITED => {
                        state[next] = ON_PATH;
                        path.push(next);
                        frames.push((next, 0));
                    }
                    _ => {}
                }
            }
        }
    }
    None
}
/// Evaluates an event and feeds its detections into the correlations, taking
/// the timestamp from the event itself.
///
/// If no timestamp field can be parsed, the configured fallback decides:
/// `WallClock` uses the current time, `Skip` returns the (filtered)
/// detections without touching any correlation state.
pub fn process_event(&mut self, event: &Event) -> ProcessResult {
    let all_detections = self.engine.evaluate(event);
    let extracted = self.extract_event_timestamp(event);

    let ts = match (extracted, self.config.timestamp_fallback) {
        (Some(ts), _) => ts,
        (None, TimestampFallback::WallClock) => Utc::now().timestamp(),
        (None, TimestampFallback::Skip) => {
            return ProcessResult {
                detections: self.filter_detections(all_detections),
                correlations: Vec::new(),
            };
        }
    };

    self.process_with_detections(event, all_detections, ts)
}
/// Same as `process_event`, but uses the caller-supplied timestamp (epoch
/// seconds) instead of reading one from the event.
pub fn process_event_at(&mut self, event: &Event, timestamp_secs: i64) -> ProcessResult {
    let matches = self.engine.evaluate(event);
    self.process_with_detections(event, matches, timestamp_secs)
}
/// Runs the correlation stage for an event whose detections were already
/// computed.
///
/// The timestamp is clamped to `0..=i64::MAX / 2`. If the state map has
/// reached `max_state_entries`, eviction runs first. Detections are then fed
/// into their correlations, fired correlations are propagated to correlations
/// referencing them, and the detections are filtered for output.
pub fn process_with_detections(
    &mut self,
    event: &Event,
    all_detections: Vec<MatchResult>,
    timestamp_secs: i64,
) -> ProcessResult {
    let ts = timestamp_secs.clamp(0, i64::MAX / 2);

    let at_capacity = self.state.len() >= self.config.max_state_entries;
    if at_capacity {
        self.evict_all(ts);
    }

    let mut fired = Vec::new();
    self.feed_detections(event, &all_detections, ts, &mut fired);
    self.chain_correlations(&fired, ts);

    ProcessResult {
        detections: self.filter_detections(all_detections),
        correlations: fired,
    }
}
/// Runs only the detection engine on the event; no correlation state is
/// read or modified.
pub fn evaluate(&self, event: &Event) -> Vec<MatchResult> {
self.engine.evaluate(event)
}
/// Processes a batch of events and returns one `ProcessResult` per event, in
/// input order.
///
/// Phase 1 evaluates detections and extracts timestamps for every event; with
/// the `parallel` feature this phase runs on rayon's `par_iter`, otherwise
/// sequentially. Phase 2 applies the correlation stage one event at a time,
/// using the same timestamp-fallback rules as `process_event`.
pub fn process_batch<'a>(&mut self, events: &[&'a Event<'a>]) -> Vec<ProcessResult> {
// Shared borrows used only during phase 1.
let engine = &self.engine;
let ts_fields = &self.config.timestamp_fields;
// Phase 1: per-event detections and (optional) extracted timestamp.
let batch_results: Vec<(Vec<MatchResult>, Option<i64>)> = {
#[cfg(feature = "parallel")]
{
use rayon::prelude::*;
events
.par_iter()
.map(|e| {
let detections = engine.evaluate(e);
let ts = extract_event_ts(e, ts_fields);
(detections, ts)
})
.collect()
}
#[cfg(not(feature = "parallel"))]
{
events
.iter()
.map(|e| {
let detections = engine.evaluate(e);
let ts = extract_event_ts(e, ts_fields);
(detections, ts)
})
.collect()
}
};
// Phase 2: sequential correlation, since it mutates engine state.
let mut results = Vec::with_capacity(events.len());
for ((detections, ts_opt), event) in batch_results.into_iter().zip(events) {
match ts_opt {
Some(ts) => {
results.push(self.process_with_detections(event, detections, ts));
}
None => match self.config.timestamp_fallback {
TimestampFallback::WallClock => {
let ts = Utc::now().timestamp();
results.push(self.process_with_detections(event, detections, ts));
}
// No usable timestamp: return detections only, skip correlation.
TimestampFallback::Skip => {
let detections = self.filter_detections(detections);
results.push(ProcessResult {
detections,
correlations: Vec::new(),
});
}
},
}
}
results
}
/// Drops detections that exist only to feed correlations.
///
/// Filtering happens only when `emit_detections` is `false` and at least one
/// correlation-only rule ref is recorded; otherwise the input is returned
/// unchanged.
///
/// A detection is dropped when its rule is listed in
/// `correlation_only_rules` either by id or by name. Correlations may
/// reference rules by name, and those refs are stored as written, so the
/// rule's name is resolved through `rule_ids` (first rule whose id matches,
/// as `find_rule_identity` does) and checked too.
fn filter_detections(&self, all_detections: Vec<MatchResult>) -> Vec<MatchResult> {
    if self.config.emit_detections || self.correlation_only_rules.is_empty() {
        return all_detections;
    }

    all_detections
        .into_iter()
        .filter(|m| {
            // Detections without an id cannot be tied to a rule ref.
            let Some(id) = m.rule_id.as_deref() else {
                return true;
            };
            if self.correlation_only_rules.contains(id) {
                return false;
            }
            let referenced_by_name = self
                .rule_ids
                .iter()
                .find(|(rule_id, _)| rule_id.as_deref() == Some(id))
                .and_then(|(_, name)| name.as_deref())
                .is_some_and(|name| self.correlation_only_rules.contains(name));
            !referenced_by_name
        })
        .collect()
}
fn feed_detections(
&mut self,
event: &Event,
detections: &[MatchResult],
ts: i64,
out: &mut Vec<CorrelationResult>,
) {
let mut work: Vec<(usize, Option<String>, Option<String>)> = Vec::new();
for det in detections {
let (rule_id, rule_name) = self.find_rule_identity(det);
let mut corr_indices = Vec::new();
if let Some(ref id) = rule_id
&& let Some(indices) = self.rule_index.get(id)
{
corr_indices.extend(indices);
}
if let Some(ref name) = rule_name
&& let Some(indices) = self.rule_index.get(name)
{
corr_indices.extend(indices);
}
corr_indices.sort_unstable();
corr_indices.dedup();
for &corr_idx in &corr_indices {
work.push((corr_idx, rule_id.clone(), rule_name.clone()));
}
}
for (corr_idx, rule_id, rule_name) in work {
self.update_correlation(corr_idx, event, ts, &rule_id, &rule_name, out);
}
}
fn find_rule_identity(&self, det: &MatchResult) -> (Option<String>, Option<String>) {
if let Some(ref match_id) = det.rule_id {
for (id, name) in &self.rule_ids {
if id.as_deref() == Some(match_id.as_str()) {
return (id.clone(), name.clone());
}
}
}
(det.rule_id.clone(), None)
}
/// Event mode for a correlation: its own setting if present, otherwise the
/// engine-wide default.
fn resolve_event_mode(&self, corr_idx: usize) -> CorrelationEventMode {
    match self.correlations[corr_idx].event_mode {
        Some(mode) => mode,
        None => self.config.correlation_event_mode,
    }
}
/// Event buffer capacity for a correlation: its own setting if present,
/// otherwise the engine-wide default.
fn resolve_max_events(&self, corr_idx: usize) -> usize {
    match self.correlations[corr_idx].max_events {
        Some(limit) => limit,
        None => self.config.max_correlation_events,
    }
}
/// Feeds one detection into one correlation and emits a result if it fires.
///
/// Steps, in order: resolve per-correlation settings (falling back to the
/// engine configuration), derive the group key, evict window entries older
/// than `ts - timespan`, record the event in the window according to the
/// correlation type, buffer the event if an event mode is active, then check
/// the condition. A fired correlation is pushed to `out` unless it is inside
/// the suppression window; with `CorrelationAction::Reset` the window and
/// buffers of the group are cleared afterwards.
///
/// `rule_id` / `rule_name` identify the detection rule that matched.
fn update_correlation(
&mut self,
corr_idx: usize,
event: &Event,
ts: i64,
rule_id: &Option<String>,
rule_name: &Option<String>,
out: &mut Vec<CorrelationResult>,
) {
let corr = &self.correlations[corr_idx];
let corr_type = corr.correlation_type;
let timespan = corr.timespan_secs;
let level = corr.level;
// Per-correlation settings win over the engine-wide defaults.
let suppress_secs = corr.suppress_secs.or(self.config.suppress);
let action = corr.action.unwrap_or(self.config.action_on_match);
let event_mode = self.resolve_event_mode(corr_idx);
let max_events = self.resolve_max_events(corr_idx);
// Identifiers of the matching rule, handed to group-key extraction.
let mut ref_strs: Vec<&str> = Vec::new();
if let Some(id) = rule_id.as_deref() {
ref_strs.push(id);
}
if let Some(name) = rule_name.as_deref() {
ref_strs.push(name);
}
// Single identifier recorded for temporal correlations: id, else name, else "".
let rule_ref = rule_id.as_deref().or(rule_name.as_deref()).unwrap_or("");
let group_key = GroupKey::extract(event, &corr.group_by, &ref_strs);
let state_key = (corr_idx, group_key.clone());
let state = self
.state
.entry(state_key.clone())
.or_insert_with(|| WindowState::new_for(corr_type));
// Drop window entries that fell out of the timespan before adding the new one.
let cutoff = ts - timespan as i64;
state.evict(cutoff);
match corr_type {
CorrelationType::EventCount => {
state.push_event_count(ts);
}
CorrelationType::ValueCount => {
// Only events that carry a countable value in the condition field contribute.
if let Some(ref field_name) = corr.condition.field
&& let Some(val) = event.get_field(field_name)
&& let Some(s) = value_to_string_for_count(val)
{
state.push_value_count(ts, s);
}
}
CorrelationType::Temporal | CorrelationType::TemporalOrdered => {
state.push_temporal(ts, rule_ref);
}
CorrelationType::ValueSum
| CorrelationType::ValueAvg
| CorrelationType::ValuePercentile
| CorrelationType::ValueMedian => {
// Only events whose condition field converts to a number contribute.
if let Some(ref field_name) = corr.condition.field
&& let Some(val) = event.get_field(field_name)
&& let Some(n) = value_to_f64(val)
{
state.push_numeric(ts, n);
}
}
}
// Buffer the event alongside the window, using the same cutoff.
match event_mode {
CorrelationEventMode::Full => {
let buf = self
.event_buffers
.entry(state_key.clone())
.or_insert_with(|| EventBuffer::new(max_events));
buf.evict(cutoff);
buf.push(ts, event.as_value());
}
CorrelationEventMode::Refs => {
let buf = self
.event_ref_buffers
.entry(state_key.clone())
.or_insert_with(|| EventRefBuffer::new(max_events));
buf.evict(cutoff);
buf.push(ts, event.as_value());
}
CorrelationEventMode::None => {}
}
let fired = state.check_condition(
&corr.condition,
corr_type,
&corr.rule_refs,
corr.extended_expr.as_ref(),
);
if let Some(agg_value) = fired {
let alert_key = (corr_idx, group_key.clone());
// Suppressed when a previous alert for this group is younger than the
// suppression window.
let suppressed = if let Some(suppress) = suppress_secs {
if let Some(&last_ts) = self.last_alert.get(&alert_key) {
(ts - last_ts) < suppress as i64
} else {
false
}
} else {
false
};
if !suppressed {
// Attach buffered events or references according to the event mode.
let (events, event_refs) = match event_mode {
CorrelationEventMode::Full => {
let stored = self
.event_buffers
.get(&alert_key)
.map(|buf| buf.decompress_all())
.unwrap_or_default();
(Some(stored), None)
}
CorrelationEventMode::Refs => {
let stored = self
.event_ref_buffers
.get(&alert_key)
.map(|buf| buf.refs())
.unwrap_or_default();
(None, Some(stored))
}
CorrelationEventMode::None => (None, None),
};
let corr = &self.correlations[corr_idx];
let result = CorrelationResult {
rule_title: corr.title.clone(),
rule_id: corr.id.clone(),
level,
tags: corr.tags.clone(),
correlation_type: corr_type,
group_key: group_key.to_pairs(&corr.group_by),
aggregated_value: agg_value,
timespan_secs: timespan,
events,
event_refs,
};
out.push(result);
// Remember when this group last alerted, for suppression.
self.last_alert.insert(alert_key.clone(), ts);
if action == CorrelationAction::Reset {
// Reset: start the group from scratch after alerting.
if let Some(state) = self.state.get_mut(&alert_key) {
state.clear();
}
if let Some(buf) = self.event_buffers.get_mut(&alert_key) {
buf.clear();
}
if let Some(buf) = self.event_ref_buffers.get_mut(&alert_key) {
buf.clear();
}
}
}
}
}
/// Propagates fired correlations into the correlations that reference them.
///
/// Works in rounds: every result in the current round that has a `rule_id`
/// is looked up in `rule_index` by that id, and each referencing correlation
/// gets its window updated and its condition checked. Correlations that fire
/// form the next round. At most `MAX_CHAIN_DEPTH` rounds run; if results are
/// still pending afterwards a warning is logged.
///
/// Chained updates do not apply suppression, the reset action, or event
/// buffering; none of that logic appears in this method.
///
/// NOTE(review): results produced by chained correlations only ever live in
/// the local `pending` / `next_pending` vectors; nothing here hands them back
/// to the caller. Confirm that higher-level correlation results are meant to
/// be dropped rather than reported.
/// NOTE(review): lookup uses `rule_id` only, so a correlation referenced by
/// name does not appear to be chained. Confirm this is intended.
fn chain_correlations(&mut self, fired: &[CorrelationResult], ts: i64) {
const MAX_CHAIN_DEPTH: usize = 10;
let mut pending: Vec<CorrelationResult> = fired.to_vec();
let mut depth = 0;
while !pending.is_empty() && depth < MAX_CHAIN_DEPTH {
depth += 1;
// (target correlation index, group-key pairs of the source, source ref)
#[allow(clippy::type_complexity)]
let mut work: Vec<(usize, Vec<(String, String)>, String)> = Vec::new();
for result in &pending {
if let Some(ref id) = result.rule_id
&& let Some(indices) = self.rule_index.get(id)
{
let fired_ref = result
.rule_id
.as_deref()
.unwrap_or(&result.rule_title)
.to_string();
for &corr_idx in indices {
work.push((corr_idx, result.group_key.clone(), fired_ref.clone()));
}
}
}
let mut next_pending = Vec::new();
for (corr_idx, group_key_pairs, fired_ref) in work {
let corr = &self.correlations[corr_idx];
let corr_type = corr.correlation_type;
let timespan = corr.timespan_secs;
let level = corr.level;
// Rebuild the group key from the source result's pairs using the target's group-by.
let group_key = GroupKey::from_pairs(&group_key_pairs, &corr.group_by);
let state_key = (corr_idx, group_key.clone());
let state = self
.state
.entry(state_key)
.or_insert_with(|| WindowState::new_for(corr_type));
let cutoff = ts - timespan as i64;
state.evict(cutoff);
match corr_type {
CorrelationType::EventCount => {
state.push_event_count(ts);
}
CorrelationType::Temporal | CorrelationType::TemporalOrdered => {
state.push_temporal(ts, &fired_ref);
}
// Value-based types have no source event here; recorded as a plain count.
_ => {
state.push_event_count(ts);
}
}
let fired = state.check_condition(
&corr.condition,
corr_type,
&corr.rule_refs,
corr.extended_expr.as_ref(),
);
if let Some(agg_value) = fired {
let corr = &self.correlations[corr_idx];
next_pending.push(CorrelationResult {
rule_title: corr.title.clone(),
rule_id: corr.id.clone(),
level,
tags: corr.tags.clone(),
correlation_type: corr_type,
group_key: group_key.to_pairs(&corr.group_by),
aggregated_value: agg_value,
timespan_secs: timespan,
events: None,
event_refs: None,
});
}
}
pending = next_pending;
}
// Still pending after the loop means the depth limit cut propagation short.
if !pending.is_empty() {
log::warn!(
"Correlation chain depth limit reached ({MAX_CHAIN_DEPTH}); \
{} pending result(s) were not propagated further. \
This may indicate a cycle in correlation references.",
pending.len()
);
}
}
fn extract_event_timestamp(&self, event: &Event) -> Option<i64> {
for field_name in &self.config.timestamp_fields {
if let Some(val) = event.get_field(field_name)
&& let Some(ts) = parse_timestamp_value(val)
{
return Some(ts);
}
}
None
}
/// Public entry point for eviction: runs the full eviction pass with
/// `now_secs` (epoch seconds) as the current time.
pub fn evict_expired(&mut self, now_secs: i64) {
self.evict_all(now_secs);
}
/// Evicts expired data from all per-group maps.
///
/// 1. Window states and both kinds of event buffers drop entries older than
///    `now_secs - timespan` of their correlation; entries left empty are
///    removed.
/// 2. If the state map is still at or above `max_state_entries`, the entries
///    with the oldest latest-timestamp are removed (together with their
///    alert and buffer entries) until 90% of the limit remains.
/// 3. `last_alert` entries whose suppression window has elapsed are removed;
///    without a suppression window the entry is always removed.
fn evict_all(&mut self, now_secs: i64) {
// Copied out so the `retain` closures below do not borrow `self.correlations`.
let timespans: Vec<u64> = self.correlations.iter().map(|c| c.timespan_secs).collect();
self.state.retain(|&(corr_idx, _), state| {
if corr_idx < timespans.len() {
let cutoff = now_secs - timespans[corr_idx] as i64;
state.evict(cutoff);
}
!state.is_empty()
});
self.event_buffers.retain(|&(corr_idx, _), buf| {
if corr_idx < timespans.len() {
let cutoff = now_secs - timespans[corr_idx] as i64;
buf.evict(cutoff);
}
!buf.is_empty()
});
self.event_ref_buffers.retain(|&(corr_idx, _), buf| {
if corr_idx < timespans.len() {
let cutoff = now_secs - timespans[corr_idx] as i64;
buf.evict(cutoff);
}
!buf.is_empty()
});
// Hard cap: time-based eviction was not enough, so drop the stalest groups.
if self.state.len() >= self.config.max_state_entries {
let target = self.config.max_state_entries * 9 / 10;
let excess = self.state.len() - target;
let mut by_staleness: Vec<_> = self
.state
.iter()
.map(|(k, v)| (k.clone(), v.latest_timestamp().unwrap_or(i64::MIN)))
.collect();
// Oldest latest-timestamp first.
by_staleness.sort_unstable_by_key(|&(_, ts)| ts);
for (key, _) in by_staleness.into_iter().take(excess) {
self.state.remove(&key);
self.last_alert.remove(&key);
self.event_buffers.remove(&key);
self.event_ref_buffers.remove(&key);
}
}
// Keep an alert record only while it can still suppress something.
self.last_alert.retain(|key, &mut alert_ts| {
let suppress = if key.0 < self.correlations.len() {
self.correlations[key.0]
.suppress_secs
.or(self.config.suppress)
.unwrap_or(0)
} else {
0
};
(now_secs - alert_ts) < suppress as i64
});
}
/// Number of `(correlation, group)` window-state entries currently held.
pub fn state_count(&self) -> usize {
self.state.len()
}
/// Rule count reported by the underlying detection engine.
pub fn detection_rule_count(&self) -> usize {
self.engine.rule_count()
}
/// Number of compiled correlation rules registered with the engine.
pub fn correlation_rule_count(&self) -> usize {
self.correlations.len()
}
/// Number of per-group event buffers currently held.
pub fn event_buffer_count(&self) -> usize {
self.event_buffers.len()
}
/// Sum of `compressed_bytes()` over all event buffers.
pub fn event_buffer_bytes(&self) -> usize {
self.event_buffers
.values()
.map(|b| b.compressed_bytes())
.sum()
}
/// Number of per-group event-reference buffers currently held.
pub fn event_ref_buffer_count(&self) -> usize {
self.event_ref_buffers.len()
}
/// Shared access to the underlying detection engine.
pub fn engine(&self) -> &Engine {
&self.engine
}
/// Captures all per-group state in a snapshot keyed by each correlation's
/// stable identifier (id, else name, else title) rather than by its index.
pub fn export_state(&self) -> CorrelationSnapshot {
    let mut windows = HashMap::<String, Vec<(GroupKey, WindowState)>>::new();
    for (key, window) in self.state.iter() {
        windows
            .entry(self.correlation_stable_id(key.0))
            .or_default()
            .push((key.1.clone(), window.clone()));
    }

    let mut last_alert = HashMap::<String, Vec<(GroupKey, i64)>>::new();
    for (key, alert_ts) in self.last_alert.iter() {
        last_alert
            .entry(self.correlation_stable_id(key.0))
            .or_default()
            .push((key.1.clone(), *alert_ts));
    }

    let mut event_buffers = HashMap::<String, Vec<(GroupKey, EventBuffer)>>::new();
    for (key, buffer) in self.event_buffers.iter() {
        event_buffers
            .entry(self.correlation_stable_id(key.0))
            .or_default()
            .push((key.1.clone(), buffer.clone()));
    }

    let mut event_ref_buffers = HashMap::<String, Vec<(GroupKey, EventRefBuffer)>>::new();
    for (key, buffer) in self.event_ref_buffers.iter() {
        event_ref_buffers
            .entry(self.correlation_stable_id(key.0))
            .or_default()
            .push((key.1.clone(), buffer.clone()));
    }

    CorrelationSnapshot {
        version: SNAPSHOT_VERSION,
        windows,
        last_alert,
        event_buffers,
        event_ref_buffers,
    }
}
pub fn import_state(&mut self, snapshot: CorrelationSnapshot) -> bool {
if snapshot.version != SNAPSHOT_VERSION {
return false;
}
let id_to_idx = self.build_id_to_index_map();
for (corr_id, groups) in snapshot.windows {
if let Some(&idx) = id_to_idx.get(&corr_id) {
for (gk, ws) in groups {
self.state.insert((idx, gk), ws);
}
}
}
for (corr_id, groups) in snapshot.last_alert {
if let Some(&idx) = id_to_idx.get(&corr_id) {
for (gk, ts) in groups {
self.last_alert.insert((idx, gk), ts);
}
}
}
for (corr_id, groups) in snapshot.event_buffers {
if let Some(&idx) = id_to_idx.get(&corr_id) {
for (gk, buf) in groups {
self.event_buffers.insert((idx, gk), buf);
}
}
}
for (corr_id, groups) in snapshot.event_ref_buffers {
if let Some(&idx) = id_to_idx.get(&corr_id) {
for (gk, buf) in groups {
self.event_ref_buffers.insert((idx, gk), buf);
}
}
}
true
}
/// Identifier used for a correlation in snapshots: its id if set, otherwise
/// its name, otherwise its title.
///
/// Panics if `idx` is out of range.
fn correlation_stable_id(&self, idx: usize) -> String {
    let corr = &self.correlations[idx];
    match (&corr.id, &corr.name) {
        (Some(id), _) => id.clone(),
        (None, Some(name)) => name.clone(),
        (None, None) => corr.title.clone(),
    }
}
/// Maps each correlation's stable identifier to its index. If two
/// correlations share an identifier, the later index wins.
fn build_id_to_index_map(&self) -> HashMap<String, usize> {
    let mut index_of = HashMap::new();
    for idx in 0..self.correlations.len() {
        index_of.insert(self.correlation_stable_id(idx), idx);
    }
    index_of
}
}
// Snapshot format version written by `export_state` and required by `import_state`.
const SNAPSHOT_VERSION: u32 = 1;
/// Serializable copy of the engine's per-group state.
///
/// Every map is keyed by a correlation's stable identifier (id, else name,
/// else title) so that a snapshot does not depend on rule load order.
#[derive(Debug, Clone, Serialize, serde::Deserialize)]
pub struct CorrelationSnapshot {
/// Format version; defaults to 1 when absent from the serialized data.
#[serde(default = "default_snapshot_version")]
pub version: u32,
/// Window state per correlation and group.
pub windows: HashMap<String, Vec<(GroupKey, WindowState)>>,
/// Timestamp of the last alert per correlation and group.
pub last_alert: HashMap<String, Vec<(GroupKey, i64)>>,
/// Buffered events per correlation and group.
pub event_buffers: HashMap<String, Vec<(GroupKey, EventBuffer)>>,
/// Buffered event references per correlation and group.
pub event_ref_buffers: HashMap<String, Vec<(GroupKey, EventRefBuffer)>>,
}
// Serde default for `CorrelationSnapshot::version` when the field is missing.
// NOTE(review): this is a literal 1 rather than `SNAPSHOT_VERSION`, presumably so
// that version-less snapshots keep reading as version 1 if the constant is
// ever bumped. Confirm before changing.
fn default_snapshot_version() -> u32 {
1
}
impl Default for CorrelationEngine {
fn default() -> Self {
Self::new(CorrelationConfig::default())
}
}
/// Returns the first timestamp (epoch seconds) that can be parsed from
/// `timestamp_fields`, tried in order, or `None` if no field is present and
/// parseable.
fn extract_event_ts(event: &Event, timestamp_fields: &[String]) -> Option<i64> {
    timestamp_fields.iter().find_map(|field_name| {
        event
            .get_field(field_name)
            .and_then(|val| parse_timestamp_value(val))
    })
}
fn parse_timestamp_value(val: &serde_json::Value) -> Option<i64> {
match val {
serde_json::Value::Number(n) => {
if let Some(i) = n.as_i64() {
Some(normalize_epoch(i))
} else {
n.as_f64().map(|f| normalize_epoch(f as i64))
}
}
serde_json::Value::String(s) => parse_timestamp_string(s),
_ => None,
}
}
/// Normalizes a numeric epoch timestamp to whole seconds.
///
/// The unit is inferred from the magnitude. Each threshold corresponds to
/// September 2001 in the finer unit and to a date tens of thousands of years
/// in the future in the coarser one, so the tiers do not overlap for
/// realistic timestamps:
///
/// * above `1e18`: nanoseconds, divided by `1e9`
/// * above `1e15`: microseconds, divided by `1e6`
/// * above `1e12`: milliseconds, divided by `1e3`
/// * otherwise: already seconds, returned unchanged
///
/// Values at or below `1e12`, including zero and negatives, are returned
/// as-is.
fn normalize_epoch(v: i64) -> i64 {
    const MILLIS_FLOOR: i64 = 1_000_000_000_000;
    const MICROS_FLOOR: i64 = 1_000_000_000_000_000;
    const NANOS_FLOOR: i64 = 1_000_000_000_000_000_000;

    if v > NANOS_FLOOR {
        v / 1_000_000_000
    } else if v > MICROS_FLOOR {
        v / 1_000_000
    } else if v > MILLIS_FLOOR {
        v / 1_000
    } else {
        v
    }
}
/// Parses a textual timestamp into epoch seconds.
///
/// RFC 3339 is tried first. If that fails, four zone-less layouts are tried
/// in order (`T` or space separator, with or without fractional seconds) and
/// the result is interpreted as UTC. Returns `None` if nothing matches.
fn parse_timestamp_string(s: &str) -> Option<i64> {
    // Order matters only in that the first successful layout wins.
    const NAIVE_FORMATS: [&str; 4] = [
        "%Y-%m-%dT%H:%M:%S",
        "%Y-%m-%d %H:%M:%S",
        "%Y-%m-%dT%H:%M:%S%.f",
        "%Y-%m-%d %H:%M:%S%.f",
    ];

    if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
        return Some(dt.timestamp());
    }

    NAIVE_FORMATS
        .iter()
        .find_map(|layout| chrono::NaiveDateTime::parse_from_str(s, layout).ok())
        .map(|naive| Utc.from_utc_datetime(&naive).timestamp())
}
/// Renders a scalar JSON value as the string used for distinct-value
/// counting. Strings are taken verbatim, numbers and booleans use their
/// textual form, and JSON null becomes `"null"`. Arrays and objects are not
/// countable and yield `None`.
fn value_to_string_for_count(v: &serde_json::Value) -> Option<String> {
    let rendered = match v {
        serde_json::Value::Null => "null".to_string(),
        serde_json::Value::Bool(flag) => flag.to_string(),
        serde_json::Value::Number(num) => num.to_string(),
        serde_json::Value::String(text) => text.clone(),
        _ => return None,
    };
    Some(rendered)
}
/// Converts a JSON value into a finite `f64` for numeric aggregation.
///
/// Numbers are converted directly; strings are parsed as floats. Any other
/// JSON type yields `None`.
///
/// Non-finite results are rejected: `str::parse::<f64>` accepts spellings
/// such as `"nan"`, `"inf"` and `"infinity"`, and a single such value coming
/// from event data would poison sums, averages and percentile ordering for
/// the whole window.
fn value_to_f64(v: &serde_json::Value) -> Option<f64> {
    let parsed = match v {
        serde_json::Value::Number(n) => n.as_f64(),
        serde_json::Value::String(s) => s.parse::<f64>().ok(),
        _ => None,
    };
    parsed.filter(|n| n.is_finite())
}
#[cfg(test)]
mod tests {
use super::*;
use rsigma_parser::parse_sigma_yaml;
use serde_json::json;
#[test]
fn test_parse_timestamp_epoch_secs() {
    // Epoch seconds are returned unchanged.
    let secs = json!(1720612200);
    let parsed = parse_timestamp_value(&secs);
    assert_eq!(parsed, Some(1720612200));
}
#[test]
fn test_parse_timestamp_epoch_millis() {
    // Epoch milliseconds are scaled down to seconds.
    let millis = json!(1720612200000i64);
    let parsed = parse_timestamp_value(&millis);
    assert_eq!(parsed, Some(1720612200));
}
#[test]
fn test_parse_timestamp_rfc3339() {
    // RFC 3339 string with an explicit UTC designator.
    let parsed = parse_timestamp_value(&json!("2024-07-10T12:30:00Z"));
    assert_eq!(parsed.unwrap(), 1720614600);
}
#[test]
fn test_parse_timestamp_naive() {
    // Zone-less `T`-separated string is interpreted as UTC.
    let parsed = parse_timestamp_value(&json!("2024-07-10T12:30:00"));
    assert_eq!(parsed.unwrap(), 1720614600);
}
#[test]
fn test_parse_timestamp_with_space() {
    // Zone-less space-separated string is interpreted as UTC.
    let parsed = parse_timestamp_value(&json!("2024-07-10 12:30:00"));
    assert_eq!(parsed.unwrap(), 1720614600);
}
#[test]
fn test_parse_timestamp_fractional() {
    // Fractional seconds are dropped from the resulting epoch seconds.
    let parsed = parse_timestamp_value(&json!("2024-07-10T12:30:00.123Z"));
    assert_eq!(parsed.unwrap(), 1720614600);
}
#[test]
fn test_extract_timestamp_from_event() {
    // A single configured field that is present and parseable.
    let engine = CorrelationEngine::new(CorrelationConfig {
        timestamp_fields: vec!["@timestamp".to_string()],
        max_state_entries: 100_000,
        ..Default::default()
    });
    let v = json!({"@timestamp": "2024-07-10T12:30:00Z", "data": "test"});
    let event = Event::from_value(&v);
    assert_eq!(engine.extract_event_timestamp(&event), Some(1720614600));
}
#[test]
fn test_extract_timestamp_fallback_fields() {
    // The first field is missing, so the second one in the list is used.
    let fields = ["@timestamp", "timestamp", "EventTime"];
    let engine = CorrelationEngine::new(CorrelationConfig {
        timestamp_fields: fields.iter().map(|f| f.to_string()).collect(),
        max_state_entries: 100_000,
        ..Default::default()
    });
    let v = json!({"timestamp": 1720613400, "data": "test"});
    let event = Event::from_value(&v);
    assert_eq!(engine.extract_event_timestamp(&event), Some(1720613400));
}
#[test]
fn test_extract_timestamp_returns_none_when_missing() {
    // No configured field exists on the event.
    let engine = CorrelationEngine::new(CorrelationConfig {
        timestamp_fields: vec!["@timestamp".to_string()],
        ..Default::default()
    });
    let v = json!({"data": "no timestamp here"});
    let event = Event::from_value(&v);
    let extracted = engine.extract_event_timestamp(&event);
    assert_eq!(extracted, None);
}
// With `TimestampFallback::Skip`, events without a timestamp still produce
// detections but never update or fire a correlation.
#[test]
fn test_timestamp_fallback_skip() {
    // Nested keys must be indented: a flat fixture turns `product`,
    // `selection`, `type`, ... into top-level keys and the rules no longer
    // parse as intended.
    let yaml = r#"
title: test rule
id: ts-skip-rule
logsource:
    product: test
detection:
    selection:
        action: click
    condition: selection
level: low
---
title: test correlation
correlation:
    type: event_count
    rules:
        - ts-skip-rule
    group-by:
        - User
    timespan: 10s
    condition:
        gte: 2
level: high
"#;
    let collection = parse_sigma_yaml(yaml).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig {
        timestamp_fallback: TimestampFallback::Skip,
        ..Default::default()
    });
    engine.add_collection(&collection).unwrap();
    assert_eq!(engine.correlation_rule_count(), 1);
    // The event carries no timestamp field at all.
    let v = json!({"action": "click", "User": "alice"});
    let event = Event::from_value(&v);
    let r1 = engine.process_event(&event);
    assert!(!r1.detections.is_empty(), "detection should still fire");
    let r2 = engine.process_event(&event);
    assert!(!r2.detections.is_empty(), "detection should still fire");
    let r3 = engine.process_event(&event);
    assert!(!r3.detections.is_empty(), "detection should still fire");
    assert!(r1.correlations.is_empty());
    assert!(r2.correlations.is_empty());
    assert!(r3.correlations.is_empty());
}
// With the default `WallClock` fallback, events without a timestamp are
// stamped with the current time and can therefore still correlate.
#[test]
fn test_timestamp_fallback_wallclock_default() {
    // Nested keys must be indented for the fixture to describe a valid
    // detection rule and correlation.
    let yaml = r#"
title: test rule
id: ts-wc-rule
logsource:
    product: test
detection:
    selection:
        action: click
    condition: selection
level: low
---
title: test correlation
correlation:
    type: event_count
    rules:
        - ts-wc-rule
    group-by:
        - User
    timespan: 60s
    condition:
        gte: 2
level: high
"#;
    let collection = parse_sigma_yaml(yaml).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&collection).unwrap();
    assert_eq!(engine.correlation_rule_count(), 1);
    let v = json!({"action": "click", "User": "alice"});
    let event = Event::from_value(&v);
    let _r1 = engine.process_event(&event);
    let _r2 = engine.process_event(&event);
    let r3 = engine.process_event(&event);
    assert!(
        !r3.correlations.is_empty(),
        "WallClock fallback should allow correlation"
    );
}
// An `event_count` correlation with `gte: 3` fires on the third matching
// event of the same group inside the window, and not before.
#[test]
fn test_event_count_basic() {
    // Nested keys must be indented for the fixture to describe a valid
    // detection rule and correlation.
    let yaml = r#"
title: Base Rule
id: base-rule-001
name: base_rule
logsource:
    product: windows
    category: process_creation
detection:
    selection:
        CommandLine|contains: 'whoami'
    condition: selection
level: low
---
title: Multiple Whoami
id: corr-001
correlation:
    type: event_count
    rules:
        - base-rule-001
    group-by:
        - User
    timespan: 60s
    condition:
        gte: 3
level: high
"#;
    let collection = parse_sigma_yaml(yaml).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&collection).unwrap();
    assert_eq!(engine.detection_rule_count(), 1);
    assert_eq!(engine.correlation_rule_count(), 1);
    let base_ts = 1000i64;
    for i in 0..3 {
        let v = json!({"CommandLine": "whoami", "User": "admin"});
        let event = Event::from_value(&v);
        let result = engine.process_event_at(&event, base_ts + i * 10);
        assert_eq!(result.detections.len(), 1);
        if i < 2 {
            assert!(result.correlations.is_empty());
        } else {
            assert_eq!(result.correlations.len(), 1);
            assert_eq!(result.correlations[0].rule_title, "Multiple Whoami");
            assert_eq!(result.correlations[0].aggregated_value, 3.0);
        }
    }
}
// Counts are kept per group: two events for one user do not fire, three for
// another user do, and the result carries that user's group key.
#[test]
fn test_event_count_different_groups() {
    // Nested keys must be indented for the fixture to describe a valid
    // detection rule and correlation.
    let yaml = r#"
title: Login
id: login-001
logsource:
    category: auth
detection:
    selection:
        EventType: login
    condition: selection
level: low
---
title: Many Logins
id: corr-login
correlation:
    type: event_count
    rules:
        - login-001
    group-by:
        - User
    timespan: 60s
    condition:
        gte: 3
level: high
"#;
    let collection = parse_sigma_yaml(yaml).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&collection).unwrap();
    let ts = 1000i64;
    for i in 0..2 {
        let v = json!({"EventType": "login", "User": "alice"});
        let event = Event::from_value(&v);
        let r = engine.process_event_at(&event, ts + i);
        assert!(r.correlations.is_empty());
    }
    for i in 0..3 {
        let v = json!({"EventType": "login", "User": "bob"});
        let event = Event::from_value(&v);
        let r = engine.process_event_at(&event, ts + i);
        if i == 2 {
            assert_eq!(r.correlations.len(), 1);
            assert_eq!(
                r.correlations[0].group_key,
                vec![("User".to_string(), "bob".to_string())]
            );
        }
    }
}
/// With a 10s timespan, events at t=0 and t=1 followed by one at t=15 must
/// not produce a correlation for a `gte: 3` threshold.
#[test]
fn test_event_count_window_expiry() {
    let rules = r#"
title: Base
id: base-002
logsource:
category: test
detection:
selection:
action: click
condition: selection
---
title: Rapid Clicks
id: corr-002
correlation:
type: event_count
rules:
- base-002
group-by:
- User
timespan: 10s
condition:
gte: 3
level: medium
"#;
    let parsed = parse_sigma_yaml(rules).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&parsed).unwrap();

    let click = json!({"action": "click", "User": "admin"});
    let event = Event::from_value(&click);

    // Two early clicks...
    for early in 0..2 {
        engine.process_event_at(&event, early);
    }
    // ...and a third one 15 seconds after the first.
    let late = engine.process_event_at(&event, 15);
    assert!(late.correlations.is_empty());
}
/// `value_count` over the `User` field: three different users failing to log
/// in on the same `Host` yield an aggregated value of 3 on the third event.
#[test]
fn test_value_count() {
    let rules = r#"
title: Failed Login
id: failed-login-001
logsource:
category: auth
detection:
selection:
EventType: failed_login
condition: selection
level: low
---
title: Failed Logins From Many Users
id: corr-vc-001
correlation:
type: value_count
rules:
- failed-login-001
group-by:
- Host
timespan: 60s
condition:
field: User
gte: 3
level: high
"#;
    let parsed = parse_sigma_yaml(rules).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&parsed).unwrap();

    let start = 1000i64;
    let users = ["alice", "bob", "charlie"];
    for (idx, user) in users.iter().enumerate() {
        let payload = json!({"EventType": "failed_login", "Host": "srv01", "User": user});
        let outcome = engine.process_event_at(&Event::from_value(&payload), start + idx as i64);
        // Only the third event's result is checked.
        if idx == 2 {
            assert_eq!(outcome.correlations.len(), 1);
            assert_eq!(outcome.correlations[0].aggregated_value, 3.0);
        }
    }
}
/// A `temporal` correlation over two rules fires once both referenced rules
/// have matched for the same `User` within the timespan.
#[test]
fn test_temporal() {
    let rules = r#"
title: Recon A
id: recon-a
name: recon_a
logsource:
category: process
detection:
selection:
CommandLine|contains: 'whoami'
condition: selection
---
title: Recon B
id: recon-b
name: recon_b
logsource:
category: process
detection:
selection:
CommandLine|contains: 'ipconfig'
condition: selection
---
title: Recon Combo
id: corr-temporal
correlation:
type: temporal
rules:
- recon-a
- recon-b
group-by:
- User
timespan: 60s
condition:
gte: 2
level: high
"#;
    let parsed = parse_sigma_yaml(rules).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&parsed).unwrap();

    let start = 1000i64;

    // First rule only: nothing to correlate yet.
    let whoami = json!({"CommandLine": "whoami", "User": "admin"});
    let after_first = engine.process_event_at(&Event::from_value(&whoami), start);
    assert!(after_first.correlations.is_empty());

    // Second rule ten seconds later for the same user.
    let ipconfig = json!({"CommandLine": "ipconfig /all", "User": "admin"});
    let after_second = engine.process_event_at(&Event::from_value(&ipconfig), start + 10);
    assert_eq!(after_second.correlations.len(), 1);
    assert_eq!(after_second.correlations[0].rule_title, "Recon Combo");
}
/// `temporal_ordered`: a failed login followed by a successful login for the
/// same `User` (the order listed in `rules`) produces one correlation.
#[test]
fn test_temporal_ordered() {
    let rules = r#"
title: Failed Login
id: failed-001
name: failed_login
logsource:
category: auth
detection:
selection:
EventType: failed_login
condition: selection
---
title: Success Login
id: success-001
name: successful_login
logsource:
category: auth
detection:
selection:
EventType: success_login
condition: selection
---
title: Brute Force Then Login
id: corr-bf
correlation:
type: temporal_ordered
rules:
- failed-001
- success-001
group-by:
- User
timespan: 60s
condition:
gte: 2
level: critical
"#;
    let parsed = parse_sigma_yaml(rules).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&parsed).unwrap();

    let start = 1000i64;

    let failure = json!({"EventType": "failed_login", "User": "admin"});
    let after_failure = engine.process_event_at(&Event::from_value(&failure), start);
    assert!(after_failure.correlations.is_empty());

    let success = json!({"EventType": "success_login", "User": "admin"});
    let after_success = engine.process_event_at(&Event::from_value(&success), start + 10);
    assert_eq!(after_success.correlations.len(), 1);
}
/// `temporal_ordered` must stay silent when the rules match in the reverse
/// of the listed order (B first, then A).
#[test]
fn test_temporal_ordered_wrong_order() {
    let rules = r#"
title: Rule A
id: rule-a
logsource:
category: test
detection:
selection:
type: a
condition: selection
---
title: Rule B
id: rule-b
logsource:
category: test
detection:
selection:
type: b
condition: selection
---
title: A then B
id: corr-ab
correlation:
type: temporal_ordered
rules:
- rule-a
- rule-b
group-by:
- User
timespan: 60s
condition:
gte: 2
level: high
"#;
    let parsed = parse_sigma_yaml(rules).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&parsed).unwrap();

    let start = 1000i64;

    // B arrives first...
    let type_b = json!({"type": "b", "User": "admin"});
    engine.process_event_at(&Event::from_value(&type_b), start);

    // ...then A, which is the wrong order for "A then B".
    let type_a = json!({"type": "a", "User": "admin"});
    let outcome = engine.process_event_at(&Event::from_value(&type_a), start + 10);
    assert!(outcome.correlations.is_empty());
}
/// `value_sum` over `bytes_sent`: 600 alone stays under `gt: 1000`, while
/// 600 + 500 crosses it and reports an aggregated value of 1100.
#[test]
fn test_value_sum() {
    let rules = r#"
title: Web Access
id: web-001
logsource:
category: web
detection:
selection:
action: upload
condition: selection
---
title: Large Upload
id: corr-sum
correlation:
type: value_sum
rules:
- web-001
group-by:
- User
timespan: 60s
condition:
field: bytes_sent
gt: 1000
level: high
"#;
    let parsed = parse_sigma_yaml(rules).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&parsed).unwrap();

    let start = 1000i64;

    let first_upload = json!({"action": "upload", "User": "alice", "bytes_sent": 600});
    let after_first = engine.process_event_at(&Event::from_value(&first_upload), start);
    assert!(after_first.correlations.is_empty());

    let second_upload = json!({"action": "upload", "User": "alice", "bytes_sent": 500});
    let after_second = engine.process_event_at(&Event::from_value(&second_upload), start + 5);
    assert_eq!(after_second.correlations.len(), 1);
    let total = after_second.correlations[0].aggregated_value;
    assert!((total - 1100.0).abs() < f64::EPSILON);
}
/// `value_avg` over `latency_ms`: after samples 400, 600 and 800 the third
/// event reports one correlation with an average of 600.
#[test]
fn test_value_avg() {
    let rules = r#"
title: Request
id: req-001
logsource:
category: web
detection:
selection:
type: request
condition: selection
---
title: High Avg Latency
id: corr-avg
correlation:
type: value_avg
rules:
- req-001
group-by:
- Service
timespan: 60s
condition:
field: latency_ms
gt: 500
level: medium
"#;
    let parsed = parse_sigma_yaml(rules).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&parsed).unwrap();

    let start = 1000i64;
    let samples = [400, 600, 800];
    for (idx, latency) in samples.iter().enumerate() {
        let payload = json!({"type": "request", "Service": "api", "latency_ms": latency});
        let outcome = engine.process_event_at(&Event::from_value(&payload), start + idx as i64);
        // Only the result of the last sample is checked.
        if idx == 2 {
            assert_eq!(outcome.correlations.len(), 1);
            let average = outcome.correlations[0].aggregated_value;
            assert!((average - 600.0).abs() < f64::EPSILON);
        }
    }
}
/// `state_count()` grows by one per distinct group key and drops to zero
/// after `evict_expired` is called with a time well past the 60s window.
#[test]
fn test_state_count() {
    let rules = r#"
title: Base
id: base-sc
logsource:
category: test
detection:
selection:
action: test
condition: selection
---
title: Count
id: corr-sc
correlation:
type: event_count
rules:
- base-sc
group-by:
- User
timespan: 60s
condition:
gte: 100
level: low
"#;
    let parsed = parse_sigma_yaml(rules).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&parsed).unwrap();

    // First group key: alice.
    let from_alice = json!({"action": "test", "User": "alice"});
    engine.process_event_at(&Event::from_value(&from_alice), 1000);
    assert_eq!(engine.state_count(), 1);

    // Second group key: bob.
    let from_bob = json!({"action": "test", "User": "bob"});
    engine.process_event_at(&Event::from_value(&from_bob), 1001);
    assert_eq!(engine.state_count(), 2);

    // Both windows are long expired at t=2000.
    engine.evict_expired(2000);
    assert_eq!(engine.state_count(), 0);
}
/// With the default config and a correlation rule that has no `generate`
/// key, a single matching event yields one detection and, because the
/// threshold is `gte: 1`, one correlation.
#[test]
fn test_generate_flag_default_false() {
    let rules = r#"
title: Base
id: gen-base
logsource:
category: test
detection:
selection:
action: test
condition: selection
---
title: Correlation
id: gen-corr
correlation:
type: event_count
rules:
- gen-base
group-by:
- User
timespan: 60s
condition:
gte: 1
level: high
"#;
    let parsed = parse_sigma_yaml(rules).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&parsed).unwrap();

    let payload = json!({"action": "test", "User": "alice"});
    let outcome = engine.process_event_at(&Event::from_value(&payload), 1000);
    assert_eq!(outcome.detections.len(), 1);
    assert_eq!(outcome.correlations.len(), 1);
}
/// Five `ListBuckets` events from the same ARN, one minute apart, satisfy
/// the `gte: 5` / `1h` event-count correlation on the fifth event.
#[test]
fn test_aws_bucket_enumeration() {
    let rules = r#"
title: Potential Bucket Enumeration on AWS
id: f305fd62-beca-47da-ad95-7690a0620084
logsource:
product: aws
service: cloudtrail
detection:
selection:
eventSource: "s3.amazonaws.com"
eventName: "ListBuckets"
condition: selection
level: low
---
title: Multiple AWS bucket enumerations
id: be246094-01d3-4bba-88de-69e582eba0cc
status: experimental
correlation:
type: event_count
rules:
- f305fd62-beca-47da-ad95-7690a0620084
group-by:
- userIdentity.arn
timespan: 1h
condition:
gte: 5
level: high
"#;
    let parsed = parse_sigma_yaml(rules).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&parsed).unwrap();

    let start = 1_700_000_000i64;
    // The identical CloudTrail-style record is replayed each minute.
    let record = json!({
        "eventSource": "s3.amazonaws.com",
        "eventName": "ListBuckets",
        "userIdentity.arn": "arn:aws:iam::123456789:user/attacker"
    });
    let event = Event::from_value(&record);
    for minute in 0..5 {
        let outcome = engine.process_event_at(&event, start + minute * 60);
        if minute != 4 {
            continue;
        }
        assert_eq!(outcome.correlations.len(), 1);
        assert_eq!(
            outcome.correlations[0].rule_title,
            "Multiple AWS bucket enumerations"
        );
        assert_eq!(outcome.correlations[0].aggregated_value, 5.0);
    }
}
/// Loads a correlation that references another correlation (event_count
/// feeding temporal_ordered). Verifies the inner event_count fires on the
/// third failed login and that a later successful login is detected.
/// The output of the outer `temporal_ordered` rule is not asserted here.
#[test]
fn test_chaining_event_count_to_temporal() {
    let rules = r#"
title: Single failed login
id: failed-login-chain
name: failed_login
logsource:
category: auth
detection:
selection:
EventType: failed_login
condition: selection
---
title: Successful login
id: success-login-chain
name: successful_login
logsource:
category: auth
detection:
selection:
EventType: success_login
condition: selection
---
title: Multiple failed logins
id: many-failed-chain
name: multiple_failed_login
correlation:
type: event_count
rules:
- failed-login-chain
group-by:
- User
timespan: 60s
condition:
gte: 3
level: medium
---
title: Brute Force Followed by Login
id: brute-force-chain
correlation:
type: temporal_ordered
rules:
- many-failed-chain
- success-login-chain
group-by:
- User
timespan: 120s
condition:
gte: 2
level: critical
"#;
    let parsed = parse_sigma_yaml(rules).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&parsed).unwrap();
    // Two detection rules and two correlation rules are registered.
    assert_eq!(engine.detection_rule_count(), 2);
    assert_eq!(engine.correlation_rule_count(), 2);

    let start = 1000i64;

    // Three failed logins, one second apart.
    let failure = json!({"EventType": "failed_login", "User": "victim"});
    let failure_event = Event::from_value(&failure);
    for offset in 0..3 {
        let outcome = engine.process_event_at(&failure_event, start + offset);
        if offset == 2 {
            let inner_fired = outcome
                .correlations
                .iter()
                .any(|c| c.rule_title == "Multiple failed logins");
            assert!(inner_fired, "Expected event_count correlation to fire");
        }
    }

    // A successful login 30 seconds after the first failure.
    let success = json!({"EventType": "success_login", "User": "victim"});
    let outcome = engine.process_event_at(&Event::from_value(&success), start + 30);
    assert_eq!(outcome.detections.len(), 1);
    assert_eq!(outcome.detections[0].rule_title, "Successful login");
}
/// The `aliases` section maps `internal_ip` to `destination.ip` for one rule
/// and `source.ip` for the other; events sharing `10.0.0.5` under those
/// fields must correlate, and the group key is reported under the alias.
#[test]
fn test_field_aliases() {
    let rules = r#"
title: Internal Error
id: internal-error-001
name: internal_error
logsource:
category: web
detection:
selection:
http.response.status_code: 500
condition: selection
---
title: New Connection
id: new-conn-001
name: new_network_connection
logsource:
category: network
detection:
selection:
event.type: connection
condition: selection
---
title: Error Then Connection
id: corr-alias
correlation:
type: temporal
rules:
- internal-error-001
- new-conn-001
group-by:
- internal_ip
timespan: 60s
condition:
gte: 2
aliases:
internal_ip:
internal_error: destination.ip
new_network_connection: source.ip
level: high
"#;
    let parsed = parse_sigma_yaml(rules).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&parsed).unwrap();

    let start = 1000i64;

    // First rule: the IP lives in `destination.ip`.
    let http_error = json!({
        "http.response.status_code": 500,
        "destination.ip": "10.0.0.5"
    });
    let after_error = engine.process_event_at(&Event::from_value(&http_error), start);
    assert_eq!(after_error.detections.len(), 1);
    assert!(after_error.correlations.is_empty());

    // Second rule: the same IP lives in `source.ip`.
    let connection = json!({
        "event.type": "connection",
        "source.ip": "10.0.0.5"
    });
    let after_conn = engine.process_event_at(&Event::from_value(&connection), start + 5);
    assert_eq!(after_conn.detections.len(), 1);
    assert_eq!(after_conn.correlations.len(), 1);
    assert_eq!(after_conn.correlations[0].rule_title, "Error Then Connection");
    assert!(
        after_conn.correlations[0]
            .group_key
            .iter()
            .any(|(field, value)| field == "internal_ip" && value == "10.0.0.5")
    );
}
/// Feeds five numeric `image` values through a `value_percentile`
/// correlation. The results are discarded and nothing is asserted, so this
/// only checks that loading and processing complete without panicking.
#[test]
fn test_value_percentile() {
    let rules = r#"
title: Process Creation
id: proc-001
logsource:
category: process
detection:
selection:
type: process_creation
condition: selection
---
title: Rare Process
id: corr-percentile
correlation:
type: value_percentile
rules:
- proc-001
group-by:
- ComputerName
timespan: 60s
condition:
field: image
lte: 50
level: medium
"#;
    let parsed = parse_sigma_yaml(rules).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&parsed).unwrap();

    let start = 1000i64;
    let samples = [10.0, 20.0, 30.0, 40.0, 50.0];
    for (idx, sample) in samples.iter().enumerate() {
        let payload =
            json!({"type": "process_creation", "ComputerName": "srv01", "image": sample});
        // Result intentionally ignored.
        let _ = engine.process_event_at(&Event::from_value(&payload), start + idx as i64);
    }
}
/// Extended (expression) condition `login-attempt and password-change`:
/// the temporal correlation fires only after both rules matched for the
/// same `User`.
#[test]
fn test_extended_temporal_and_condition() {
    let rules = r#"
title: Login Attempt
id: login-attempt
logsource:
category: auth
detection:
selection:
EventType: login_failure
condition: selection
---
title: Password Change
id: password-change
logsource:
category: auth
detection:
selection:
EventType: password_change
condition: selection
---
title: Credential Attack
correlation:
type: temporal
rules:
- login-attempt
- password-change
group-by:
- User
timespan: 300s
condition: login-attempt and password-change
level: high
"#;
    let parsed = parse_sigma_yaml(rules).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&parsed).unwrap();

    let start = 1000i64;

    let login_failure = json!({"EventType": "login_failure", "User": "alice"});
    let failure_event = Event::from_value(&login_failure);
    let after_failure = engine.process_event_at(&failure_event, start);
    assert!(after_failure.correlations.is_empty(), "only one rule fired so far");

    let password_change = json!({"EventType": "password_change", "User": "alice"});
    let change_event = Event::from_value(&password_change);
    let after_change = engine.process_event_at(&change_event, start + 10);
    assert_eq!(
        after_change.correlations.len(),
        1,
        "temporal correlation should fire: both rules matched"
    );
    assert_eq!(after_change.correlations[0].rule_title, "Credential Attack");
}
/// Extended condition `ssh-login or vpn-login`: a single SSH login is
/// enough to fire the temporal correlation.
#[test]
fn test_extended_temporal_or_condition() {
    let rules = r#"
title: SSH Login
id: ssh-login
logsource:
category: auth
detection:
selection:
EventType: ssh_login
condition: selection
---
title: VPN Login
id: vpn-login
logsource:
category: auth
detection:
selection:
EventType: vpn_login
condition: selection
---
title: Any Remote Access
correlation:
type: temporal
rules:
- ssh-login
- vpn-login
group-by:
- User
timespan: 60s
condition: ssh-login or vpn-login
level: medium
"#;
    let parsed = parse_sigma_yaml(rules).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&parsed).unwrap();

    let ssh_login = json!({"EventType": "ssh_login", "User": "bob"});
    let event = Event::from_value(&ssh_login);
    let outcome = engine.process_event_at(&event, 1000);
    assert_eq!(outcome.correlations.len(), 1);
    assert_eq!(outcome.correlations[0].rule_title, "Any Remote Access");
}
/// Extended condition `recon-1 and recon-2`: one matching rule alone does
/// not fire; the correlation appears once the second rule matches on the
/// same `Host`.
#[test]
fn test_extended_temporal_partial_and_no_fire() {
    let rules = r#"
title: Recon Step 1
id: recon-1
logsource:
category: process
detection:
selection:
CommandLine|contains: 'whoami'
condition: selection
---
title: Recon Step 2
id: recon-2
logsource:
category: process
detection:
selection:
CommandLine|contains: 'ipconfig'
condition: selection
---
title: Full Recon
correlation:
type: temporal
rules:
- recon-1
- recon-2
group-by:
- Host
timespan: 120s
condition: recon-1 and recon-2
level: high
"#;
    let parsed = parse_sigma_yaml(rules).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&parsed).unwrap();

    // Step 1 only.
    let whoami = json!({"CommandLine": "whoami", "Host": "srv01"});
    let whoami_event = Event::from_value(&whoami);
    let partial = engine.process_event_at(&whoami_event, 1000);
    assert!(partial.correlations.is_empty(), "only one of two AND rules fired");

    // Step 2 completes the pair.
    let ipconfig = json!({"CommandLine": "ipconfig /all", "Host": "srv01"});
    let ipconfig_event = Event::from_value(&ipconfig);
    let complete = engine.process_event_at(&ipconfig_event, 1010);
    assert_eq!(complete.correlations.len(), 1);
    assert_eq!(complete.correlations[0].rule_title, "Full Recon");
}
/// A Sigma filter on `User|startswith: 'svc_'` is attached to the base rule;
/// events from `svc_backup` must never produce a correlation, while three
/// events from `alice` fire the `Brute Force` rule.
#[test]
fn test_filter_with_correlation() {
    let rules = r#"
title: Failed Auth
id: failed-auth
logsource:
category: auth
detection:
selection:
EventType: auth_failure
condition: selection
---
title: Exclude Service Accounts
filter:
rules:
- failed-auth
selection:
User|startswith: 'svc_'
condition: selection
---
title: Brute Force
correlation:
type: event_count
rules:
- failed-auth
group-by:
- User
timespan: 300s
condition:
gte: 3
level: critical
"#;
    let parsed = parse_sigma_yaml(rules).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&parsed).unwrap();

    let start = 1000i64;

    // Five failures from a service account: none may correlate.
    let service_failure = json!({"EventType": "auth_failure", "User": "svc_backup"});
    let service_event = Event::from_value(&service_failure);
    for offset in 0..5 {
        let outcome = engine.process_event_at(&service_event, start + offset);
        assert!(
            outcome.correlations.is_empty(),
            "service account should be filtered, no correlation"
        );
    }

    // Regular user: first two failures stay under the threshold.
    let user_failure = json!({"EventType": "auth_failure", "User": "alice"});
    let user_event = Event::from_value(&user_failure);
    for offset in 0..2 {
        let outcome = engine.process_event_at(&user_event, start + 10 + offset);
        assert!(outcome.correlations.is_empty(), "not yet 3 events");
    }

    // Third failure reaches `gte: 3`.
    let outcome = engine.process_event_at(&user_event, start + 12);
    assert_eq!(outcome.correlations.len(), 1);
    assert_eq!(outcome.correlations[0].rule_title, "Brute Force");
}
/// A rule produced through `action: repeat` (`file-b`) can be referenced by
/// a correlation alongside its template (`file-a`): a mix of `.docx` and
/// `.xlsx` accesses by one user counts toward the same threshold.
#[test]
fn test_repeat_rules_in_correlation() {
    let rules = r#"
title: File Access A
id: file-a
logsource:
category: file_access
detection:
selection:
FileName|endswith: '.docx'
condition: selection
---
action: repeat
title: File Access B
id: file-b
detection:
selection:
FileName|endswith: '.xlsx'
condition: selection
---
title: Mass File Access
correlation:
type: event_count
rules:
- file-a
- file-b
group-by:
- User
timespan: 60s
condition:
gte: 3
level: high
"#;
    let parsed = parse_sigma_yaml(rules).unwrap();
    // The parser yields two detection rules from the YAML above.
    assert_eq!(parsed.rules.len(), 2);

    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&parsed).unwrap();
    assert_eq!(engine.detection_rule_count(), 2);

    let start = 1000i64;

    let docx_first = json!({"FileName": "report.docx", "User": "bob"});
    let docx_first_event = Event::from_value(&docx_first);
    engine.process_event_at(&docx_first_event, start);

    let xlsx = json!({"FileName": "data.xlsx", "User": "bob"});
    let xlsx_event = Event::from_value(&xlsx);
    engine.process_event_at(&xlsx_event, start + 1);

    let docx_second = json!({"FileName": "notes.docx", "User": "bob"});
    let docx_second_event = Event::from_value(&docx_second);
    let outcome = engine.process_event_at(&docx_second_event, start + 2);
    assert_eq!(outcome.correlations.len(), 1);
    assert_eq!(outcome.correlations[0].rule_title, "Mass File Access");
}
/// The `expand` modifier substitutes `%User%` from the event: alice touching
/// her own temp path matches (and correlates on the second hit), whereas the
/// same path reported with `User: bob` produces no detection.
#[test]
fn test_expand_modifier_with_correlation() {
    let rules = r#"
title: User Temp File
id: user-temp
logsource:
category: file_access
detection:
selection:
FilePath|expand: 'C:\Users\%User%\Temp'
condition: selection
---
title: Excessive Temp Access
correlation:
type: event_count
rules:
- user-temp
group-by:
- User
timespan: 60s
condition:
gte: 2
level: medium
"#;
    let parsed = parse_sigma_yaml(rules).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&parsed).unwrap();

    let start = 1000i64;

    let first_access = json!({"FilePath": "C:\\Users\\alice\\Temp", "User": "alice"});
    let first_event = Event::from_value(&first_access);
    let after_first = engine.process_event_at(&first_event, start);
    assert!(after_first.correlations.is_empty());

    let second_access = json!({"FilePath": "C:\\Users\\alice\\Temp", "User": "alice"});
    let second_event = Event::from_value(&second_access);
    let after_second = engine.process_event_at(&second_event, start + 1);
    assert_eq!(after_second.correlations.len(), 1);
    assert_eq!(after_second.correlations[0].rule_title, "Excessive Temp Access");

    // Path names alice but the event's User is bob: no match expected.
    let mismatched = json!({"FilePath": "C:\\Users\\alice\\Temp", "User": "bob"});
    let mismatched_event = Event::from_value(&mismatched);
    let after_mismatch = engine.process_event_at(&mismatched_event, start + 2);
    assert_eq!(after_mismatch.detections.len(), 0);
}
/// The `Timestamp|hour: 3` modifier restricts the base rule to logins whose
/// `Timestamp` is in hour 3; two such logins for alice correlate, and a
/// noon login does not match the detection at all.
#[test]
fn test_timestamp_modifier_with_correlation() {
    let rules = r#"
title: Night Login
id: night-login
logsource:
category: auth
detection:
login:
EventType: login
night:
Timestamp|hour: 3
condition: login and night
---
title: Frequent Night Logins
correlation:
type: event_count
rules:
- night-login
group-by:
- User
timespan: 3600s
condition:
gte: 2
level: high
"#;
    let parsed = parse_sigma_yaml(rules).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&parsed).unwrap();

    let start = 1000i64;

    let first_night =
        json!({"EventType": "login", "User": "alice", "Timestamp": "2024-01-15T03:10:00Z"});
    let first_event = Event::from_value(&first_night);
    let after_first = engine.process_event_at(&first_event, start);
    assert_eq!(after_first.detections.len(), 1);
    assert!(after_first.correlations.is_empty());

    let second_night =
        json!({"EventType": "login", "User": "alice", "Timestamp": "2024-01-15T03:45:00Z"});
    let second_event = Event::from_value(&second_night);
    let after_second = engine.process_event_at(&second_event, start + 1);
    assert_eq!(after_second.correlations.len(), 1);
    assert_eq!(after_second.correlations[0].rule_title, "Frequent Night Logins");

    let noon =
        json!({"EventType": "login", "User": "bob", "Timestamp": "2024-01-15T12:00:00Z"});
    let noon_event = Event::from_value(&noon);
    let after_noon = engine.process_event_at(&noon_event, start + 2);
    assert!(
        after_noon.detections.is_empty(),
        "noon login should not match night rule"
    );
}
/// Range condition `gt: 2` combined with `lte: 5`: the correlation fires for
/// the 3rd, 4th and 5th event and stops firing on the 6th.
#[test]
fn test_event_count_range_condition() {
    let rules = r#"
title: Login Attempt
id: login-attempt-001
name: login_attempt
logsource:
product: windows
detection:
selection:
EventType: login
condition: selection
level: low
---
title: Login Count Range
id: corr-range-001
correlation:
type: event_count
rules:
- login-attempt-001
group-by:
- User
timespan: 3600s
condition:
gt: 2
lte: 5
level: high
"#;
    let parsed = parse_sigma_yaml(rules).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&parsed).unwrap();

    let ts: i64 = 1_000_000;
    // Every event in this test carries the same payload.
    let login = json!({"EventType": "login", "User": "alice"});
    let event = Event::from_value(&login);

    // Events 1 and 2: below the lower bound.
    for i in 0..2 {
        let outcome = engine.process_event_at(&event, ts + i);
        assert!(outcome.correlations.is_empty(), "2 events should not fire (gt:2)");
    }

    // Event 3: first count inside the range.
    let third = engine.process_event_at(&event, ts + 3);
    assert_eq!(third.correlations.len(), 1, "3 events: gt:2 AND lte:5");

    // Events 4 and 5: still inside the range.
    for i in 4..=5 {
        let outcome = engine.process_event_at(&event, ts + i);
        assert_eq!(outcome.correlations.len(), 1, "{i} events still in range");
    }

    // Event 6: above the upper bound.
    let sixth = engine.process_event_at(&event, ts + 6);
    assert!(
        sixth.correlations.is_empty(),
        "6 events exceeds lte:5, should not fire"
    );
}
/// Shared YAML fixture for the suppression / action / emit tests: a `Login`
/// detection rule (`id: login-base`) plus a `Many Logins` event-count
/// correlation grouped by `User` with threshold `gte: 3`.
fn suppression_yaml() -> &'static str {
    const FIXTURE: &str = r#"
title: Login
id: login-base
logsource:
category: auth
detection:
selection:
EventType: login
condition: selection
---
title: Many Logins
correlation:
type: event_count
rules:
- login-base
group-by:
- User
timeframe: 60s
condition:
gte: 3
level: high
"#;
    FIXTURE
}
/// With `suppress: Some(10)`, after the correlation fires at `ts + 2` the
/// events at `ts + 3` and `ts + 9` are suppressed and the one at `ts + 13`
/// fires again.
#[test]
fn test_suppression_window() {
    let parsed = parse_sigma_yaml(suppression_yaml()).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig {
        suppress: Some(10),
        ..Default::default()
    });
    engine.add_collection(&parsed).unwrap();

    let login = json!({"EventType": "login", "User": "alice"});
    let event = Event::from_value(&login);
    let ts = 1000;

    // Two priming events, then the third reaches the threshold.
    for offset in 0..2 {
        engine.process_event_at(&event, ts + offset);
    }
    let third = engine.process_event_at(&event, ts + 2);
    assert_eq!(third.correlations.len(), 1, "should fire on 3rd event");

    let fourth = engine.process_event_at(&event, ts + 3);
    assert!(
        fourth.correlations.is_empty(),
        "should be suppressed within 10s window"
    );

    let fifth = engine.process_event_at(&event, ts + 9);
    assert!(
        fifth.correlations.is_empty(),
        "should be suppressed at ts+9 (< ts+2+10)"
    );

    let sixth = engine.process_event_at(&event, ts + 13);
    assert_eq!(
        sixth.correlations.len(),
        1,
        "should fire again after suppress window expires"
    );
}
/// Suppression is tracked per group key: alice firing (and then being
/// suppressed) does not stop bob from firing on his own third login.
#[test]
fn test_suppression_per_group_key() {
    let parsed = parse_sigma_yaml(suppression_yaml()).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig {
        suppress: Some(60),
        ..Default::default()
    });
    engine.add_collection(&parsed).unwrap();

    let ts = 1000;

    // alice reaches the threshold at ts + 2.
    let alice_login = json!({"EventType": "login", "User": "alice"});
    let alice_event = Event::from_value(&alice_login);
    engine.process_event_at(&alice_event, ts);
    engine.process_event_at(&alice_event, ts + 1);
    let outcome = engine.process_event_at(&alice_event, ts + 2);
    assert_eq!(outcome.correlations.len(), 1, "alice should fire");

    // bob reaches the threshold at ts + 5.
    let bob_login = json!({"EventType": "login", "User": "bob"});
    let bob_event = Event::from_value(&bob_login);
    engine.process_event_at(&bob_event, ts + 3);
    engine.process_event_at(&bob_event, ts + 4);
    let outcome = engine.process_event_at(&bob_event, ts + 5);
    assert_eq!(outcome.correlations.len(), 1, "bob should fire independently");

    // alice again, still inside her 60s suppression window.
    let outcome = engine.process_event_at(&alice_event, ts + 6);
    assert!(outcome.correlations.is_empty(), "alice still suppressed");
}
/// With `action_on_match: Reset`, the window state is cleared after a fire:
/// three further events are needed before the correlation fires again.
#[test]
fn test_action_reset() {
    let parsed = parse_sigma_yaml(suppression_yaml()).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig {
        action_on_match: CorrelationAction::Reset,
        ..Default::default()
    });
    engine.add_collection(&parsed).unwrap();

    let login = json!({"EventType": "login", "User": "alice"});
    let event = Event::from_value(&login);
    let ts = 1000;

    // First batch of three.
    for offset in 0..2 {
        engine.process_event_at(&event, ts + offset);
    }
    let third = engine.process_event_at(&event, ts + 2);
    assert_eq!(third.correlations.len(), 1, "should fire on 3rd event");

    // Second batch of three, counted from scratch.
    let fourth = engine.process_event_at(&event, ts + 3);
    assert!(fourth.correlations.is_empty(), "reset: need 3 more events");
    let fifth = engine.process_event_at(&event, ts + 4);
    assert!(fifth.correlations.is_empty(), "reset: still only 2");
    let sixth = engine.process_event_at(&event, ts + 5);
    assert_eq!(
        sixth.correlations.len(),
        1,
        "should fire again after 3 events post-reset"
    );
}
/// Under `CorrelationConfig::default()` a matching event is reported in
/// `detections`.
#[test]
fn test_emit_detections_true_by_default() {
    let parsed = parse_sigma_yaml(suppression_yaml()).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&parsed).unwrap();

    let login = json!({"EventType": "login", "User": "alice"});
    let event = Event::from_value(&login);
    let outcome = engine.process_event_at(&event, 1000);
    assert_eq!(outcome.detections.len(), 1, "by default detections are emitted");
}
/// With `emit_detections: false`, a matching event for a rule that feeds a
/// correlation is not reported in `detections`.
#[test]
fn test_emit_detections_false_suppresses() {
    let parsed = parse_sigma_yaml(suppression_yaml()).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig {
        emit_detections: false,
        ..Default::default()
    });
    engine.add_collection(&parsed).unwrap();

    let login = json!({"EventType": "login", "User": "alice"});
    let event = Event::from_value(&login);
    let outcome = engine.process_event_at(&event, 1000);
    assert!(
        outcome.detections.is_empty(),
        "detection matches should be suppressed when emit_detections=false"
    );
}
/// A correlation rule carrying `generate: true` keeps the base rule's
/// detection in the output even when the engine runs with
/// `emit_detections: false`.
#[test]
fn test_generate_true_keeps_detections() {
    let rules = r#"
title: Login
id: login-gen
logsource:
category: auth
detection:
selection:
EventType: login
condition: selection
---
title: Many Logins
correlation:
type: event_count
rules:
- login-gen
group-by:
- User
timeframe: 60s
condition:
gte: 3
generate: true
level: high
"#;
    let parsed = parse_sigma_yaml(rules).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig {
        emit_detections: false,
        ..Default::default()
    });
    engine.add_collection(&parsed).unwrap();

    let login = json!({"EventType": "login", "User": "alice"});
    let event = Event::from_value(&login);
    let outcome = engine.process_event_at(&event, 1000);
    assert_eq!(
        outcome.detections.len(),
        1,
        "generate:true keeps detection output"
    );
}
/// Combines `suppress: Some(5)` with `action_on_match: Reset` and walks
/// through fire → suppressed → fire → suppressed using the timestamps below.
#[test]
fn test_suppress_and_reset_combined() {
    let parsed = parse_sigma_yaml(suppression_yaml()).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig {
        suppress: Some(5),
        action_on_match: CorrelationAction::Reset,
        ..Default::default()
    });
    engine.add_collection(&parsed).unwrap();

    let login = json!({"EventType": "login", "User": "alice"});
    let event = Event::from_value(&login);
    let ts = 1000;

    // ts .. ts+2: first fire.
    engine.process_event_at(&event, ts);
    engine.process_event_at(&event, ts + 1);
    let first_fire = engine.process_event_at(&event, ts + 2);
    assert_eq!(first_fire.correlations.len(), 1, "fires on 3rd event");

    // ts+3 .. ts+5: three more events, but inside the suppress window.
    engine.process_event_at(&event, ts + 3);
    engine.process_event_at(&event, ts + 4);
    let suppressed = engine.process_event_at(&event, ts + 5);
    assert!(
        suppressed.correlations.is_empty(),
        "threshold met again but still suppressed"
    );

    // ts+8: suppress window from ts+2 is over.
    let second_fire = engine.process_event_at(&event, ts + 8);
    assert_eq!(
        second_fire.correlations.len(),
        1,
        "fires after suppress expires (accumulated events + new one)"
    );

    // ts+9 .. ts+11: threshold reached again, new suppress window active.
    engine.process_event_at(&event, ts + 9);
    engine.process_event_at(&event, ts + 10);
    let suppressed_again = engine.process_event_at(&event, ts + 11);
    assert!(
        suppressed_again.correlations.is_empty(),
        "threshold met but suppress window hasn't expired (ts+11 - ts+8 = 3 < 5)"
    );
}
/// Without suppression (default config) the correlation fires on every
/// event once the threshold has been reached.
#[test]
fn test_no_suppression_fires_every_event() {
    let parsed = parse_sigma_yaml(suppression_yaml()).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&parsed).unwrap();

    let login = json!({"EventType": "login", "User": "alice"});
    let event = Event::from_value(&login);
    let ts = 1000;

    for offset in 0..2 {
        engine.process_event_at(&event, ts + offset);
    }
    let third = engine.process_event_at(&event, ts + 2);
    assert_eq!(third.correlations.len(), 1);

    let fourth = engine.process_event_at(&event, ts + 3);
    assert_eq!(
        fourth.correlations.len(),
        1,
        "no suppression: fires on every event after threshold"
    );

    let fifth = engine.process_event_at(&event, ts + 4);
    assert_eq!(fifth.correlations.len(), 1, "still fires");
}
/// The `rsigma.timestamp_field` custom attribute is placed at the front of
/// `config.timestamp_fields`, and `@timestamp` is still present afterwards.
#[test]
fn test_custom_attr_timestamp_field() {
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    let attrs = std::collections::HashMap::from([(
        "rsigma.timestamp_field".to_string(),
        "time".to_string(),
    )]);
    engine.apply_custom_attributes(&attrs);

    assert_eq!(
        engine.config.timestamp_fields[0], "time",
        "rsigma.timestamp_field should be prepended"
    );
    let fields = &engine.config.timestamp_fields;
    assert!(fields.contains(&"@timestamp".to_string()));
}
/// Applying the same `rsigma.timestamp_field` attribute twice leaves exactly
/// one `time` entry in `config.timestamp_fields`.
#[test]
fn test_custom_attr_timestamp_field_no_duplicates() {
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    let attrs = std::collections::HashMap::from([(
        "rsigma.timestamp_field".to_string(),
        "time".to_string(),
    )]);
    for _ in 0..2 {
        engine.apply_custom_attributes(&attrs);
    }

    let occurrences = engine
        .config
        .timestamp_fields
        .iter()
        .filter(|name| *name == "time")
        .count();
    assert_eq!(occurrences, 1, "should not duplicate timestamp_field entries");
}
/// The `rsigma.suppress` custom attribute `"5m"` sets `config.suppress` to
/// 300 when no suppression was configured beforehand.
#[test]
fn test_custom_attr_suppress() {
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    // Default config starts without suppression.
    assert!(engine.config.suppress.is_none());

    let attrs = std::collections::HashMap::from([(
        "rsigma.suppress".to_string(),
        "5m".to_string(),
    )]);
    engine.apply_custom_attributes(&attrs);
    assert_eq!(engine.config.suppress, Some(300));
}
/// A suppress value already present in the config (`Some(60)`) must survive
/// an `rsigma.suppress` custom attribute.
#[test]
fn test_custom_attr_suppress_does_not_override_cli() {
    let mut engine = CorrelationEngine::new(CorrelationConfig {
        suppress: Some(60),
        ..Default::default()
    });

    let attrs = std::collections::HashMap::from([(
        "rsigma.suppress".to_string(),
        "5m".to_string(),
    )]);
    engine.apply_custom_attributes(&attrs);

    assert_eq!(
        engine.config.suppress,
        Some(60),
        "CLI suppress should not be overridden by custom attribute"
    );
}
/// `rsigma.action: reset` switches `action_on_match` from the default `Alert`
/// to `Reset`.
#[test]
fn test_custom_attr_action() {
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    assert_eq!(engine.config.action_on_match, CorrelationAction::Alert);

    let attrs = std::collections::HashMap::from([(
        "rsigma.action".to_string(),
        "reset".to_string(),
    )]);
    engine.apply_custom_attributes(&attrs);

    assert_eq!(engine.config.action_on_match, CorrelationAction::Reset);
}
/// An `action_on_match` of `Reset` supplied through the config must not be
/// replaced by an `rsigma.action: alert` custom attribute.
#[test]
fn test_custom_attr_action_does_not_override_cli() {
    let mut engine = CorrelationEngine::new(CorrelationConfig {
        action_on_match: CorrelationAction::Reset,
        ..Default::default()
    });

    let attrs = std::collections::HashMap::from([(
        "rsigma.action".to_string(),
        "alert".to_string(),
    )]);
    engine.apply_custom_attributes(&attrs);

    assert_eq!(
        engine.config.action_on_match,
        CorrelationAction::Reset,
        "CLI action should not be overridden by custom attribute"
    );
}
/// With `event_time` inserted at the front of `timestamp_fields`, the engine
/// extracts a timestamp in the expected epoch range from an event that only
/// carries `event_time`.
#[test]
fn test_custom_attr_timestamp_field_used_for_extraction() {
    let rules = parse_sigma_yaml(suppression_yaml()).unwrap();

    let mut config = CorrelationConfig::default();
    config.timestamp_fields.insert(0, "event_time".to_string());

    let mut engine = CorrelationEngine::new(config);
    engine.add_collection(&rules).unwrap();

    let raw = json!({
        "EventType": "login",
        "User": "alice",
        "event_time": "2026-02-11T12:00:00Z"
    });

    // NOTE(review): this disjunction is also satisfied whenever no correlation
    // fires, so it is a fairly weak check — confirm the intent.
    let outcome = engine.process_event(&Event::from_value(&raw));
    assert!(!outcome.detections.is_empty() || outcome.correlations.is_empty());

    let ts = engine
        .extract_event_timestamp(&Event::from_value(&raw))
        .expect("should extract timestamp");
    assert!(
        1_700_000_000 < ts && ts < 1_800_000_000,
        "timestamp should be ~2026 epoch, got {ts}"
    );
}
/// Two correlations that reference each other (`corr-a` <-> `corr-b`) must be
/// rejected by `add_collection` with an error naming the cycle and both ids.
#[test]
fn test_correlation_cycle_direct() {
    let yaml = r#"
title: detection rule
id: det-rule
logsource:
product: test
detection:
selection:
action: click
condition: selection
level: low
---
title: correlation A
id: corr-a
correlation:
type: event_count
rules:
- corr-b
group-by:
- User
timespan: 5m
condition:
gte: 2
level: high
---
title: correlation B
id: corr-b
correlation:
type: event_count
rules:
- corr-a
group-by:
- User
timespan: 5m
condition:
gte: 2
level: high
"#;
    let parsed = parse_sigma_yaml(yaml).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());

    let outcome = engine.add_collection(&parsed);
    assert!(outcome.is_err(), "should detect direct cycle");

    let err = outcome.unwrap_err().to_string();
    assert!(err.contains("cycle"), "error should mention cycle: {err}");
    // Both participants of the cycle must appear in the message.
    assert!(
        ["corr-a", "corr-b"].iter().all(|id| err.contains(*id)),
        "error should name both correlations: {err}"
    );
}
/// A correlation listing its own id under `rules` must be rejected with an
/// error that mentions the cycle and the correlation id.
#[test]
fn test_correlation_cycle_self() {
    let yaml = r#"
title: detection rule
id: det-rule
logsource:
product: test
detection:
selection:
action: click
condition: selection
level: low
---
title: self-ref correlation
id: self-corr
correlation:
type: event_count
rules:
- self-corr
group-by:
- User
timespan: 5m
condition:
gte: 2
level: high
"#;
    let parsed = parse_sigma_yaml(yaml).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());

    let outcome = engine.add_collection(&parsed);
    assert!(outcome.is_err(), "should detect self-referencing cycle");

    let err = outcome.unwrap_err().to_string();
    assert!(err.contains("cycle"), "error should mention cycle: {err}");
    assert!(
        err.contains("self-corr"),
        "error should name the correlation: {err}"
    );
}
/// A linear chain (`det-rule` -> `corr-a` -> `corr-b`) contains no cycle and
/// must be accepted by `add_collection`.
#[test]
fn test_correlation_no_cycle_valid_chain() {
    let yaml = r#"
title: detection rule
id: det-rule
logsource:
product: test
detection:
selection:
action: click
condition: selection
level: low
---
title: correlation A
id: corr-a
correlation:
type: event_count
rules:
- det-rule
group-by:
- User
timespan: 5m
condition:
gte: 2
level: high
---
title: correlation B
id: corr-b
correlation:
type: event_count
rules:
- corr-a
group-by:
- User
timespan: 5m
condition:
gte: 2
level: high
"#;
    let parsed = parse_sigma_yaml(yaml).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());

    let result = engine.add_collection(&parsed);
    assert!(
        result.is_ok(),
        "valid chain should not be rejected: {result:?}"
    );
}
/// A three-way loop (`corr-a` -> `corr-c` -> `corr-b` -> `corr-a`) must be
/// rejected with an error that mentions the cycle.
#[test]
fn test_correlation_cycle_transitive() {
    let yaml = r#"
title: detection rule
id: det-rule
logsource:
product: test
detection:
selection:
action: click
condition: selection
level: low
---
title: correlation A
id: corr-a
correlation:
type: event_count
rules:
- corr-c
group-by:
- User
timespan: 5m
condition:
gte: 2
level: high
---
title: correlation B
id: corr-b
correlation:
type: event_count
rules:
- corr-a
group-by:
- User
timespan: 5m
condition:
gte: 2
level: high
---
title: correlation C
id: corr-c
correlation:
type: event_count
rules:
- corr-b
group-by:
- User
timespan: 5m
condition:
gte: 2
level: high
"#;
    let parsed = parse_sigma_yaml(yaml).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());

    let outcome = engine.add_collection(&parsed);
    assert!(outcome.is_err(), "should detect transitive cycle");

    let err = outcome.unwrap_err().to_string();
    assert!(err.contains("cycle"), "error should mention cycle: {err}");
}
/// With the default configuration, a fired correlation carries no `events`
/// payload.
#[test]
fn test_correlation_events_disabled_by_default() {
    let yaml = r#"
title: Login
id: login-rule
logsource:
category: auth
detection:
selection:
EventType: login
condition: selection
---
title: Many Logins
correlation:
type: event_count
rules:
- login-rule
group-by:
- User
timespan: 60s
condition:
gte: 3
level: high
"#;
    let parsed = parse_sigma_yaml(yaml).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&parsed).unwrap();

    for i in 0..3 {
        let ts = 1000 + i;
        let payload = json!({"EventType": "login", "User": "admin", "@timestamp": ts});
        let outcome = engine.process_event_at(&Event::from_value(&payload), ts);

        // Only the third event's result is inspected.
        if i != 2 {
            continue;
        }
        assert_eq!(outcome.correlations.len(), 1);
        assert!(outcome.correlations[0].events.is_none());
    }
}
/// In `Full` event mode the fired correlation exposes all three contributing
/// events, in order, with their original field values.
#[test]
fn test_correlation_events_included_when_enabled() {
    let yaml = r#"
title: Login
id: login-rule
logsource:
category: auth
detection:
selection:
EventType: login
condition: selection
---
title: Many Logins
correlation:
type: event_count
rules:
- login-rule
group-by:
- User
timespan: 60s
condition:
gte: 3
level: high
"#;
    let parsed = parse_sigma_yaml(yaml).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig {
        correlation_event_mode: CorrelationEventMode::Full,
        max_correlation_events: 10,
        ..Default::default()
    });
    engine.add_collection(&parsed).unwrap();

    let events_sent: Vec<serde_json::Value> = (0..3)
        .map(|i| json!({"EventType": "login", "User": "admin", "@timestamp": 1000 + i}))
        .collect();

    // Pair each event with its processing timestamp (1000, 1001, 1002) and
    // remember the most recent result that contained a correlation.
    let mut fired = None;
    for (ts, raw) in (1000_i64..).zip(events_sent.iter()) {
        let outcome = engine.process_event_at(&Event::from_value(raw), ts);
        if !outcome.correlations.is_empty() {
            fired = Some(outcome);
        }
    }

    let result = fired.expect("correlation should have fired");
    let stored = result.correlations[0]
        .events
        .as_ref()
        .expect("events should be present");
    assert_eq!(
        stored.len(),
        3,
        "all 3 contributing events should be stored"
    );
    for (expected_ts, event) in (1000_i64..).zip(stored.iter()) {
        assert_eq!(event["EventType"], "login");
        assert_eq!(event["User"], "admin");
        assert_eq!(event["@timestamp"], expected_ts);
    }
}
/// With `max_correlation_events` set to 3 and five events sent, the fired
/// correlation holds exactly the events with `idx` 2, 3 and 4.
#[test]
fn test_correlation_events_max_cap() {
    let yaml = r#"
title: Login
id: login-rule
logsource:
category: auth
detection:
selection:
EventType: login
condition: selection
---
title: Many Logins
correlation:
type: event_count
rules:
- login-rule
group-by:
- User
timespan: 60s
condition:
gte: 5
level: high
"#;
    let parsed = parse_sigma_yaml(yaml).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig {
        correlation_event_mode: CorrelationEventMode::Full,
        max_correlation_events: 3,
        ..Default::default()
    });
    engine.add_collection(&parsed).unwrap();

    let mut fired = None;
    for i in 0..5 {
        let payload = json!({"EventType": "login", "User": "admin", "idx": i});
        let outcome = engine.process_event_at(&Event::from_value(&payload), 1000 + i);
        if !outcome.correlations.is_empty() {
            fired = Some(outcome);
        }
    }

    let result = fired.expect("correlation should have fired");
    let events = result.correlations[0]
        .events
        .as_ref()
        .expect("events should be present");
    assert_eq!(events.len(), 3);
    // Slots 0..=2 must hold idx 2..=4 respectively.
    for (slot, expected_idx) in (2..=4).enumerate() {
        assert_eq!(events[slot]["idx"], expected_idx);
    }
}
/// With `action_on_match: Reset` and `Full` event mode, the second firing only
/// contains events sent after the first firing (`round` 2).
#[test]
fn test_correlation_events_with_reset_action() {
    let yaml = r#"
title: Login
id: login-rule
logsource:
category: auth
detection:
selection:
EventType: login
condition: selection
---
title: Many Logins
correlation:
type: event_count
rules:
- login-rule
group-by:
- User
timespan: 60s
condition:
gte: 2
level: high
"#;
    let parsed = parse_sigma_yaml(yaml).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig {
        correlation_event_mode: CorrelationEventMode::Full,
        action_on_match: CorrelationAction::Reset,
        ..Default::default()
    });
    engine.add_collection(&parsed).unwrap();

    // Round 1: the second event is expected to fire with both events attached.
    for i in 0..2 {
        let payload = json!({"EventType": "login", "User": "admin", "round": 1, "idx": i});
        let outcome = engine.process_event_at(&Event::from_value(&payload), 1000 + i);
        if i != 1 {
            continue;
        }
        assert_eq!(outcome.correlations.len(), 1);
        let events = outcome.correlations[0].events.as_ref().unwrap();
        assert_eq!(events.len(), 2);
    }

    // Round 2, first event: a single event must not fire.
    let first = json!({"EventType": "login", "User": "admin", "round": 2, "idx": 0});
    let after_first = engine.process_event_at(&Event::from_value(&first), 1010);
    assert!(
        after_first.correlations.is_empty(),
        "should not fire with only 1 event after reset"
    );

    // Round 2, second event: fires again, carrying only round-2 events.
    let second = json!({"EventType": "login", "User": "admin", "round": 2, "idx": 1});
    let after_second = engine.process_event_at(&Event::from_value(&second), 1011);
    assert_eq!(after_second.correlations.len(), 1);
    let events = after_second.correlations[0].events.as_ref().unwrap();
    assert_eq!(events.len(), 2);
    for event in events.iter() {
        assert_eq!(event["round"], 2);
    }
}
/// Switching to `Full` event mode through `set_correlation_event_mode` after
/// the rules are loaded makes the fired correlation include its two events.
#[test]
fn test_correlation_events_with_set_include() {
    let yaml = r#"
title: Login
id: login-rule
logsource:
category: auth
detection:
selection:
EventType: login
condition: selection
---
title: Many Logins
correlation:
type: event_count
rules:
- login-rule
group-by:
- User
timespan: 60s
condition:
gte: 2
level: high
"#;
    let parsed = parse_sigma_yaml(yaml).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&parsed).unwrap();
    engine.set_correlation_event_mode(CorrelationEventMode::Full);

    for i in 0..2 {
        let payload = json!({"EventType": "login", "User": "admin"});
        let outcome = engine.process_event_at(&Event::from_value(&payload), 1000 + i);

        // Only the second event's result is inspected.
        if i != 1 {
            continue;
        }
        assert_eq!(outcome.correlations.len(), 1);
        let attached = outcome.correlations[0].events.as_ref();
        assert!(attached.is_some());
        assert_eq!(attached.unwrap().len(), 2);
    }
}
/// Events sent at 1000 and 1001 must not contribute to a firing at 1017 under
/// a 10s timespan: the stored events at that point all have `idx >= 2`.
#[test]
fn test_correlation_events_eviction_syncs_with_window() {
    let yaml = r#"
title: Login
id: login-rule
logsource:
category: auth
detection:
selection:
EventType: login
condition: selection
---
title: Many Logins
correlation:
type: event_count
rules:
- login-rule
group-by:
- User
timespan: 10s
condition:
gte: 3
level: high
"#;
    let parsed = parse_sigma_yaml(yaml).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig {
        correlation_event_mode: CorrelationEventMode::Full,
        max_correlation_events: 100,
        ..Default::default()
    });
    engine.add_collection(&parsed).unwrap();

    // Two early events at timestamps 1000 and 1001.
    for i in 0..2 {
        let payload = json!({"EventType": "login", "User": "admin", "idx": i});
        engine.process_event_at(&Event::from_value(&payload), 1000 + i);
    }

    // A third event at 1015 must not fire.
    let late = json!({"EventType": "login", "User": "admin", "idx": 2});
    let late_outcome = engine.process_event_at(&Event::from_value(&late), 1015);
    assert!(
        late_outcome.correlations.is_empty(),
        "should not fire — old events evicted"
    );

    // Events idx 3 and 4 arrive at 1016 and 1017; the latter must fire.
    for i in 3..5 {
        let payload = json!({"EventType": "login", "User": "admin", "idx": i});
        let outcome = engine.process_event_at(&Event::from_value(&payload), 1016 + i - 3);
        if i != 4 {
            continue;
        }
        assert_eq!(outcome.correlations.len(), 1);
        let events = outcome.correlations[0].events.as_ref().unwrap();
        assert_eq!(events.len(), 3);
        assert!(events.iter().all(|ev| ev["idx"].as_i64().unwrap() >= 2));
    }
}
/// `event_buffer_count` and `event_buffer_bytes` start at zero and report one
/// buffer with a non-zero size after five events for the same user.
#[test]
fn test_event_buffer_monitoring() {
    let yaml = r#"
title: Login
id: login-rule
logsource:
category: auth
detection:
selection:
EventType: login
condition: selection
---
title: Many Logins
correlation:
type: event_count
rules:
- login-rule
group-by:
- User
timespan: 60s
condition:
gte: 100
level: high
"#;
    let parsed = parse_sigma_yaml(yaml).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig {
        correlation_event_mode: CorrelationEventMode::Full,
        ..Default::default()
    });
    engine.add_collection(&parsed).unwrap();

    // Nothing is buffered before any event is processed.
    assert_eq!(engine.event_buffer_count(), 0);
    assert_eq!(engine.event_buffer_bytes(), 0);

    for ts in 1000..1005 {
        let payload = json!({"EventType": "login", "User": "admin"});
        engine.process_event_at(&Event::from_value(&payload), ts);
    }

    assert_eq!(engine.event_buffer_count(), 1);
    assert!(engine.event_buffer_bytes() > 0);
}
/// In `Refs` mode the fired correlation has no full `events`, but exposes
/// three `event_refs` carrying the timestamp and the event's `id` field.
#[test]
fn test_correlation_refs_mode_basic() {
    let yaml = r#"
title: Login
id: login-rule
logsource:
category: auth
detection:
selection:
EventType: login
condition: selection
---
title: Many Logins
correlation:
type: event_count
rules:
- login-rule
group-by:
- User
timespan: 60s
condition:
gte: 3
level: high
"#;
    let parsed = parse_sigma_yaml(yaml).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig {
        correlation_event_mode: CorrelationEventMode::Refs,
        max_correlation_events: 10,
        ..Default::default()
    });
    engine.add_collection(&parsed).unwrap();

    let mut fired = None;
    for i in 0..3 {
        let payload = json!({
            "EventType": "login",
            "User": "admin",
            "id": format!("evt-{i}"),
            "@timestamp": 1000 + i
        });
        let outcome = engine.process_event_at(&Event::from_value(&payload), 1000 + i);
        if !outcome.correlations.is_empty() {
            fired = Some(outcome.correlations[0].clone());
        }
    }

    let correlation = fired.expect("correlation should have fired");
    assert!(
        correlation.events.is_none(),
        "Full events should not be included in refs mode"
    );

    let refs = correlation
        .event_refs
        .expect("event_refs should be present in refs mode");
    assert_eq!(refs.len(), 3);
    assert_eq!(refs[0].timestamp, 1000);
    assert_eq!(refs[0].id, Some("evt-0".to_string()));
    assert_eq!(refs[1].id, Some("evt-1".to_string()));
    assert_eq!(refs[2].id, Some("evt-2".to_string()));
}
/// In `Refs` mode, events without an `id` field produce references whose `id`
/// is `None`.
#[test]
fn test_correlation_refs_mode_no_id_field() {
    let yaml = r#"
title: Login
id: login-rule
logsource:
category: auth
detection:
selection:
EventType: login
condition: selection
---
title: Many Logins
correlation:
type: event_count
rules:
- login-rule
group-by:
- User
timespan: 60s
condition:
gte: 2
level: high
"#;
    let parsed = parse_sigma_yaml(yaml).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig {
        correlation_event_mode: CorrelationEventMode::Refs,
        ..Default::default()
    });
    engine.add_collection(&parsed).unwrap();

    let mut fired = None;
    for i in 0..2 {
        let payload = json!({"EventType": "login", "User": "admin"});
        let outcome = engine.process_event_at(&Event::from_value(&payload), 1000 + i);
        if !outcome.correlations.is_empty() {
            fired = Some(outcome.correlations[0].clone());
        }
    }

    let correlation = fired.expect("correlation should have fired");
    let refs = correlation
        .event_refs
        .expect("event_refs should be present");
    for reference in refs.iter() {
        assert_eq!(reference.id, None);
    }
}
/// `custom_attributes` on the correlation rule itself (event mode `refs`) take
/// effect even though the engine is built with the default configuration.
#[test]
fn test_per_correlation_custom_attributes_from_yaml() {
    let yaml = r#"
title: Login
id: login-rule
logsource:
category: auth
detection:
selection:
EventType: login
condition: selection
---
title: Many Logins
custom_attributes:
rsigma.correlation_event_mode: refs
rsigma.max_correlation_events: "5"
correlation:
type: event_count
rules:
- login-rule
group-by:
- User
timespan: 60s
condition:
gte: 3
level: high
"#;
    let parsed = parse_sigma_yaml(yaml).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&parsed).unwrap();

    let mut fired = None;
    for i in 0..3 {
        let payload = json!({"EventType": "login", "User": "admin", "id": format!("e{i}")});
        let outcome = engine.process_event_at(&Event::from_value(&payload), 1000 + i);
        if !outcome.correlations.is_empty() {
            fired = Some(outcome.correlations[0].clone());
        }
    }

    let correlation =
        fired.expect("correlation should fire with per-correlation refs mode");
    assert!(correlation.events.is_none());

    let refs = correlation
        .event_refs
        .expect("event_refs via per-correlation override");
    assert_eq!(refs.len(), 3);
    assert_eq!(refs[0].id, Some("e0".to_string()));
}
/// `rsigma.suppress` and `rsigma.action` declared on a correlation rule are
/// stored on the compiled correlation as `suppress_secs` and `action`.
#[test]
fn test_per_correlation_custom_attr_suppress_and_action() {
    let yaml = r#"
title: Login
id: login-rule
logsource:
category: auth
detection:
selection:
EventType: login
condition: selection
---
title: Many Logins
custom_attributes:
rsigma.suppress: 10s
rsigma.action: reset
correlation:
type: event_count
rules:
- login-rule
group-by:
- User
timespan: 60s
condition:
gte: 2
level: high
"#;
    let parsed = parse_sigma_yaml(yaml).unwrap();
    let mut engine = CorrelationEngine::new(CorrelationConfig::default());
    engine.add_collection(&parsed).unwrap();

    let compiled = &engine.correlations[0];
    assert_eq!(compiled.suppress_secs, Some(10));
    assert_eq!(compiled.action, Some(CorrelationAction::Reset));
}
/// Running `evaluate` followed by `process_with_detections` must yield the
/// same number of detections and correlations per event as `process_event_at`.
#[test]
fn test_process_with_detections_matches_process_event_at() {
    let yaml = r#"
title: Login Failure
id: login-fail
logsource:
category: auth
detection:
selection:
EventType: login_failure
condition: selection
---
title: Brute Force
correlation:
type: event_count
rules:
- login-fail
group-by:
- User
timespan: 60s
condition:
gte: 3
level: high
"#;
    let parsed = parse_sigma_yaml(yaml).unwrap();
    let events: Vec<serde_json::Value> = (0..5)
        .map(|i| json!({"EventType": "login_failure", "User": "admin", "@timestamp": format!("2025-01-01T00:00:0{}Z", i + 1)}))
        .collect();

    // Reference run: the combined entry point.
    let mut engine1 = CorrelationEngine::new(CorrelationConfig::default());
    engine1.add_collection(&parsed).unwrap();
    let mut results1: Vec<ProcessResult> = Vec::with_capacity(events.len());
    for (i, raw) in events.iter().enumerate() {
        let event = Event::from_value(raw);
        results1.push(engine1.process_event_at(&event, 1000 + i as i64));
    }

    // Split run: evaluate first, then hand the detections back to the engine.
    let mut engine2 = CorrelationEngine::new(CorrelationConfig::default());
    engine2.add_collection(&parsed).unwrap();
    let mut results2: Vec<ProcessResult> = Vec::with_capacity(events.len());
    for (i, raw) in events.iter().enumerate() {
        let event = Event::from_value(raw);
        let detections = engine2.evaluate(&event);
        results2.push(engine2.process_with_detections(&event, detections, 1000 + i as i64));
    }

    assert_eq!(results1.len(), results2.len());
    for (combined, split) in results1.iter().zip(&results2) {
        assert_eq!(combined.detections.len(), split.detections.len());
        assert_eq!(combined.correlations.len(), split.correlations.len());
    }
}
/// `process_batch` must return one result per event whose detection and
/// correlation counts match those of event-by-event `process_event_at` calls.
#[test]
fn test_process_batch_matches_sequential() {
    let yaml = r#"
title: Login Failure
id: login-fail-batch
logsource:
category: auth
detection:
selection:
EventType: login_failure
condition: selection
---
title: Brute Force Batch
correlation:
type: event_count
rules:
- login-fail-batch
group-by:
- User
timespan: 60s
condition:
gte: 3
level: high
"#;
    let parsed = parse_sigma_yaml(yaml).unwrap();
    let event_values: Vec<serde_json::Value> = (0..5)
        .map(|i| json!({"EventType": "login_failure", "User": "admin", "@timestamp": format!("2025-01-01T00:00:0{}Z", i + 1)}))
        .collect();

    // Sequential run, one event at a time with explicit timestamps.
    let mut engine1 = CorrelationEngine::new(CorrelationConfig::default());
    engine1.add_collection(&parsed).unwrap();
    let mut sequential: Vec<ProcessResult> = Vec::with_capacity(event_values.len());
    for (i, raw) in event_values.iter().enumerate() {
        let event = Event::from_value(raw);
        sequential.push(engine1.process_event_at(&event, 1000 + i as i64));
    }

    // Batch run over the same events on a fresh engine.
    let mut engine2 = CorrelationEngine::new(CorrelationConfig::default());
    engine2.add_collection(&parsed).unwrap();
    let events: Vec<Event> = event_values.iter().map(Event::from_value).collect();
    let refs: Vec<&Event> = events.iter().collect();
    let batch = engine2.process_batch(&refs);

    assert_eq!(sequential.len(), batch.len());
    for (one_by_one, batched) in sequential.iter().zip(batch.iter()) {
        assert_eq!(one_by_one.detections.len(), batched.detections.len());
        assert_eq!(one_by_one.correlations.len(), batched.correlations.len());
    }
}
}