pub mod builtin;
pub mod conditions;
pub mod finalizers;
mod parsing;
pub mod sources;
pub mod state;
pub mod transformations;
#[cfg(test)]
mod tests;
use std::collections::HashMap;
use rsigma_parser::{CorrelationRule, SigmaCollection, SigmaRule};
use crate::error::{EvalError, Result};
pub use conditions::{
DetectionItemCondition, FieldNameCondition, NamedRuleCondition, RuleCondition,
eval_condition_expr,
};
pub use finalizers::Finalizer;
pub use parsing::{parse_pipeline, parse_pipeline_file, parse_transformation_items};
pub use state::PipelineState;
pub use transformations::Transformation;
/// A processing pipeline: an ordered list of transformation items plus the
/// configuration needed to run them against Sigma rules and correlation rules.
#[derive(Debug, Clone)]
pub struct Pipeline {
    /// Name of the pipeline.
    pub name: String,
    /// Ordering key; `merge_pipelines` sorts pipelines by ascending priority.
    pub priority: i32,
    /// Variables used to seed a fresh `PipelineState` whenever this pipeline runs.
    pub vars: HashMap<String, Vec<String>>,
    /// Transformation items, applied in vector order by `apply` / `apply_to_correlation`.
    pub transformations: Vec<TransformationItem>,
    /// Finalizers attached to this pipeline (not invoked by the methods in this module
    /// section — NOTE(review): confirm where they are run).
    pub finalizers: Vec<Finalizer>,
    /// Dynamic sources declared by the pipeline; a non-empty list makes `is_dynamic` true.
    pub sources: Vec<sources::DynamicSource>,
    /// References to dynamic sources; returned by `dynamic_references` and also make
    /// `is_dynamic` true when non-empty.
    pub source_refs: Vec<sources::SourceRef>,
}
/// One step of a pipeline: a transformation together with the conditions that gate it.
#[derive(Debug, Clone)]
pub struct TransformationItem {
    /// Optional identifier; when the transformation reports it was applied, this id is
    /// recorded in the pipeline state via `mark_applied`.
    pub id: Option<String>,
    /// The transformation to run.
    pub transformation: Transformation,
    /// Rule-level conditions checked before the transformation runs. An empty list means
    /// the item always runs.
    pub rule_conditions: Vec<NamedRuleCondition>,
    /// Optional boolean expression combining `rule_conditions` by id (a condition without an
    /// id is referred to as `cond_<index>`). When absent, all rule conditions must match.
    pub rule_cond_expr: Option<String>,
    /// Detection-item conditions, forwarded to `Transformation::apply` for rule processing.
    pub detection_item_conditions: Vec<DetectionItemCondition>,
    /// Field-name conditions, forwarded to `Transformation::apply` for rule processing.
    pub field_name_conditions: Vec<FieldNameCondition>,
    /// Flag forwarded to `Transformation::apply` alongside `field_name_conditions`
    /// (presumably negates them — NOTE(review): confirm in `transformations`).
    pub field_name_cond_not: bool,
}
impl Pipeline {
pub fn apply(&self, rule: &mut SigmaRule, state: &mut PipelineState) -> Result<()> {
state.reset_rule();
for item in &self.transformations {
if !self.check_rule_conditions(rule, state, item) {
continue;
}
state.reset_detection_item();
let applied = item.transformation.apply(
rule,
state,
&item.detection_item_conditions,
&item.field_name_conditions,
item.field_name_cond_not,
)?;
if applied && let Some(ref id) = item.id {
state.mark_applied(id);
}
}
Ok(())
}
pub fn apply_to_collection(&self, collection: &SigmaCollection) -> Result<Vec<SigmaRule>> {
let mut state = PipelineState::new(self.vars.clone());
let mut transformed = Vec::with_capacity(collection.rules.len());
for rule in &collection.rules {
let mut cloned = rule.clone();
self.apply(&mut cloned, &mut state)?;
transformed.push(cloned);
}
Ok(transformed)
}
fn check_rule_conditions(
&self,
rule: &SigmaRule,
state: &PipelineState,
item: &TransformationItem,
) -> bool {
if item.rule_conditions.is_empty() {
return true;
}
if let Some(ref expr) = item.rule_cond_expr {
let mut results = HashMap::new();
for (i, named) in item.rule_conditions.iter().enumerate() {
let id = named.id.clone().unwrap_or_else(|| format!("cond_{i}"));
results.insert(id, named.condition.matches_rule(rule, state));
}
return eval_condition_expr(expr, &results);
}
item.rule_conditions
.iter()
.all(|c| c.condition.matches_rule(rule, state))
}
pub fn apply_to_correlation(
&self,
corr: &mut CorrelationRule,
state: &mut PipelineState,
) -> Result<()> {
state.reset_rule();
for item in &self.transformations {
if !self.check_correlation_conditions(corr, state, item) {
continue;
}
state.reset_detection_item();
let applied = apply_correlation_transformation(corr, &item.transformation, state)?;
if applied && let Some(ref id) = item.id {
state.mark_applied(id);
}
}
Ok(())
}
pub fn is_dynamic(&self) -> bool {
!self.sources.is_empty() || !self.source_refs.is_empty()
}
pub fn dynamic_references(&self) -> &[sources::SourceRef] {
&self.source_refs
}
fn check_correlation_conditions(
&self,
corr: &CorrelationRule,
state: &PipelineState,
item: &TransformationItem,
) -> bool {
if item.rule_conditions.is_empty() {
return true;
}
if let Some(ref expr) = item.rule_cond_expr {
let mut results = HashMap::new();
for (i, named) in item.rule_conditions.iter().enumerate() {
let id = named.id.clone().unwrap_or_else(|| format!("cond_{i}"));
results.insert(id, named.condition.matches_correlation(corr, state));
}
return eval_condition_expr(expr, &results);
}
item.rule_conditions
.iter()
.all(|c| c.condition.matches_correlation(corr, state))
}
}
/// Applies a single pipeline `transformation` to a correlation rule.
///
/// Only the field-renaming transformations, `SetCustomAttribute`, `SetState` and
/// `RuleFailure` have a correlation-level meaning. Every other transformation is ignored
/// and reported as not applied (`Ok(false)`).
///
/// For `FieldNameMapping`:
/// * alias mapping values and threshold-condition field references are renamed only when
///   the mapping is one-to-one, because a single slot cannot hold several names;
/// * `group_by` entries naming one of the correlation's aliases are left untouched, and
///   other entries are expanded to every alternative (logging a warning when there is more
///   than one);
/// * a mapping entry with an empty alternatives list is treated as "no mapping" and leaves
///   the field unchanged, rather than panicking or dropping it from `group_by`.
///
/// # Errors
///
/// Returns `EvalError::InvalidModifiers` for a one-to-many mapping of an alias value or a
/// condition field reference, and for a `RuleFailure` transformation.
fn apply_correlation_transformation(
    corr: &mut CorrelationRule,
    transformation: &Transformation,
    state: &mut PipelineState,
) -> Result<bool> {
    match transformation {
        Transformation::FieldNameMapping { mapping } => {
            // Alias names are virtual fields defined by the correlation itself, so they must
            // not be remapped when they appear in `group_by`.
            let alias_names: std::collections::HashSet<String> =
                corr.aliases.iter().map(|a| a.alias.clone()).collect();
            for alias in &mut corr.aliases {
                for (rule_ref, field_name) in &mut alias.mapping {
                    let Some(alts) = mapping.get(field_name.as_str()) else {
                        continue;
                    };
                    match alts.as_slice() {
                        // Nothing to map to: keep the original name instead of panicking.
                        [] => {}
                        [single] => *field_name = single.clone(),
                        many => {
                            return Err(EvalError::InvalidModifiers(format!(
                                "field_name_mapping one-to-many cannot be applied to \
                                 correlation alias mapping (alias '{}', rule '{}', \
                                 field '{}' maps to {} alternatives)",
                                alias.alias,
                                rule_ref,
                                field_name,
                                many.len(),
                            )));
                        }
                    }
                }
            }
            corr.group_by = corr
                .group_by
                .iter()
                .flat_map(|field_name| {
                    if alias_names.contains(field_name.as_str()) {
                        return vec![field_name.clone()];
                    }
                    match mapping.get(field_name.as_str()) {
                        Some(alts) if !alts.is_empty() => {
                            if alts.len() > 1 {
                                log::warn!(
                                    "correlation '{}': group_by field '{}' has a one-to-many \
                                     mapping ({} alternatives: {:?}); expanding all — \
                                     correlation grouping may be broader than intended",
                                    corr.title,
                                    field_name,
                                    alts.len(),
                                    alts,
                                );
                            }
                            alts.clone()
                        }
                        // Unmapped, or mapped to nothing: keep the original name.
                        _ => vec![field_name.clone()],
                    }
                })
                .collect();
            if let rsigma_parser::CorrelationCondition::Threshold { ref mut field, .. } =
                corr.condition
                && let Some(fields) = field.as_mut()
            {
                for f in fields.iter_mut() {
                    let Some(alts) = mapping.get(f.as_str()) else {
                        continue;
                    };
                    match alts.as_slice() {
                        // Nothing to map to: keep the original name instead of panicking.
                        [] => {}
                        [single] => *f = single.clone(),
                        many => {
                            return Err(EvalError::InvalidModifiers(format!(
                                "field_name_mapping one-to-many cannot be applied to \
                                 correlation condition field reference ('{}' maps to \
                                 {} alternatives)",
                                f,
                                many.len(),
                            )));
                        }
                    }
                }
            }
            Ok(true)
        }
        Transformation::FieldNamePrefixMapping { mapping } => {
            // The first matching prefix wins.
            remap_correlation_fields(corr, |name| {
                for (prefix, replacement) in mapping {
                    if let Some(rest) = name.strip_prefix(prefix.as_str()) {
                        return Some(format!("{replacement}{rest}"));
                    }
                }
                None
            });
            Ok(true)
        }
        Transformation::FieldNamePrefix { prefix } => {
            remap_correlation_fields(corr, |name| Some(format!("{prefix}{name}")));
            Ok(true)
        }
        Transformation::FieldNameSuffix { suffix } => {
            remap_correlation_fields(corr, |name| Some(format!("{name}{suffix}")));
            Ok(true)
        }
        Transformation::SetCustomAttribute { attribute, value } => {
            corr.custom_attributes
                .insert(attribute.clone(), yaml_serde::Value::String(value.clone()));
            Ok(true)
        }
        Transformation::SetState { key, value } => {
            state.set_state(key.clone(), serde_json::Value::String(value.clone()));
            Ok(true)
        }
        Transformation::RuleFailure { message } => Err(EvalError::InvalidModifiers(format!(
            "Pipeline rule failure: {message} (correlation: {})",
            corr.title
        ))),
        // Remaining transformations only make sense for detection rules.
        _ => Ok(false),
    }
}
fn remap_correlation_fields(corr: &mut CorrelationRule, mapper: impl Fn(&str) -> Option<String>) {
for field in &mut corr.group_by {
if let Some(new_name) = mapper(field) {
*field = new_name;
}
}
for alias in &mut corr.aliases {
let remapped: HashMap<String, String> = alias
.mapping
.iter()
.map(|(rule_ref, field_name)| {
let new_name = mapper(field_name).unwrap_or_else(|| field_name.clone());
(rule_ref.clone(), new_name)
})
.collect();
alias.mapping = remapped;
}
if let rsigma_parser::CorrelationCondition::Threshold { ref mut field, .. } = corr.condition
&& let Some(fields) = field.as_mut()
{
for f in fields.iter_mut() {
if let Some(new_name) = mapper(f) {
*f = new_name;
}
}
}
}
/// Orders `pipelines` in place by ascending `priority`.
///
/// The sort is stable, so pipelines with equal priority keep their relative order.
/// Despite the name, nothing is combined; the slice is only reordered.
pub fn merge_pipelines(pipelines: &mut [Pipeline]) {
    pipelines.sort_by(|left, right| left.priority.cmp(&right.priority));
}
/// Runs each pipeline in `pipelines`, in slice order, against `rule`.
///
/// Every pipeline gets its own `PipelineState` built from its `vars`, and that state is
/// discarded afterwards. Use `apply_pipelines_with_state` to keep it.
///
/// # Errors
///
/// Stops at and returns the first error raised by any pipeline.
pub fn apply_pipelines(pipelines: &[Pipeline], rule: &mut SigmaRule) -> Result<()> {
    pipelines.iter().try_for_each(|pipeline| {
        let mut pipeline_state = PipelineState::new(pipeline.vars.clone());
        pipeline.apply(rule, &mut pipeline_state)
    })
}
/// Runs each pipeline in `pipelines`, in slice order, against `rule` and returns the merged
/// pipeline state.
///
/// Every pipeline starts from a fresh `PipelineState` seeded with its own `vars`. Its final
/// state is then folded into the result (see `absorb_pipeline_state`).
///
/// # Errors
///
/// Stops at and returns the first error raised by any pipeline.
pub fn apply_pipelines_with_state(
    pipelines: &[Pipeline],
    rule: &mut SigmaRule,
) -> Result<PipelineState> {
    let mut merged = PipelineState::default();
    for pipeline in pipelines {
        let mut state = PipelineState::new(pipeline.vars.clone());
        pipeline.apply(rule, &mut state)?;
        absorb_pipeline_state(&mut merged, state);
    }
    Ok(merged)
}

/// Runs each pipeline in `pipelines`, in slice order, against the correlation rule `corr`
/// and returns the merged pipeline state.
///
/// Every pipeline starts from a fresh `PipelineState` seeded with its own `vars`. Its final
/// state is then folded into the result (see `absorb_pipeline_state`).
///
/// # Errors
///
/// Stops at and returns the first error raised by any pipeline.
pub fn apply_pipelines_to_correlation(
    pipelines: &[Pipeline],
    corr: &mut CorrelationRule,
) -> Result<PipelineState> {
    let mut merged = PipelineState::default();
    for pipeline in pipelines {
        let mut state = PipelineState::new(pipeline.vars.clone());
        pipeline.apply_to_correlation(corr, &mut state)?;
        absorb_pipeline_state(&mut merged, state);
    }
    Ok(merged)
}

/// Folds one pipeline's final `state` into the accumulated `merged` state.
///
/// State entries and vars with the same key are replaced by the later pipeline's value.
/// Applied item ids are accumulated.
fn absorb_pipeline_state(merged: &mut PipelineState, state: PipelineState) {
    merged.state.extend(state.state);
    merged.applied_items.extend(state.applied_items);
    merged.vars.extend(state.vars);
}