use std::collections::HashMap;
use rsigma_parser::SigmaCollection;
use crate::correlation_engine::{
CorrelationConfig, CorrelationEngine, CorrelationSnapshot, CorrelationStateSnapshot,
ProcessResult,
};
use crate::engine::Engine;
use crate::error::Result;
use crate::event::{Event, MappedEvent};
use crate::logsource::LogSourceExtractor;
use crate::pipeline::Pipeline;
use crate::pipeline::transformations::Transformation;
use crate::result::EvaluationResult;
use crate::result::MatchDetailLevel;
use crate::schema::{OnUnknown, RouteDecision, RoutingPlan, SchemaClassifier};
/// How [`SchemaRouter::route`] disposed of a single event.
///
/// The variant mirrors the routing plan's decision for the event's
/// (possibly absent) schema. The enum carries no data, so it is `Copy` and
/// can be used as a key, e.g. for per-outcome counters.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RouteOutcome {
    /// The plan chose a pipeline set and the event was evaluated with that
    /// set's engine.
    Evaluated,
    /// The plan flagged the event's schema as unknown but still chose a
    /// pipeline set, and the event was evaluated with it.
    EvaluatedUnknown,
    /// The plan decided to drop the event; no rules were evaluated.
    Dropped,
    /// The plan decided the event is an error; no rules were evaluated.
    Errored,
}
/// Everything [`SchemaRouter::route`] produced for a single event.
pub struct RouteResult {
    /// Detection results, passed through the correlation engine when one is
    /// loaded. Empty when the event was dropped or errored.
    pub results: ProcessResult,
    /// Name of the schema the classifier matched, or `None` when no schema
    /// matched.
    pub schema: Option<String>,
    /// What the router did with the event.
    pub outcome: RouteOutcome,
}
/// Merge every `field_name_mapping` transformation in `pipelines` into one
/// table from Sigma field name to its candidate target field names.
///
/// Targets are appended in pipeline order, then transformation order. A field
/// mapped by several transformations therefore keeps all of their targets,
/// duplicates included.
fn collect_field_map(pipelines: &[Pipeline]) -> HashMap<String, Vec<String>> {
    pipelines
        .iter()
        .flat_map(|pipeline| &pipeline.transformations)
        .filter_map(|item| match &item.transformation {
            Transformation::FieldNameMapping { mapping } => Some(mapping),
            _ => None,
        })
        .fold(HashMap::new(), |mut table, mapping| {
            for (source, targets) in mapping {
                table
                    .entry(source.clone())
                    .or_default()
                    .extend(targets.iter().cloned());
            }
            table
        })
}
/// Per-event output of the detection phase of `SchemaRouter::process_batch`,
/// before correlation is applied.
enum Routed1 {
    /// The routing plan dropped or errored the event; there is nothing to
    /// correlate.
    Skip,
    /// The event was evaluated by the engine of pipeline set `set`.
    Eval {
        /// Index of the pipeline set (engine and field map) that handled the
        /// event.
        set: usize,
        /// Raw detection results from that engine.
        detections: Vec<EvaluationResult>,
    },
}
fn detect_one<E: Event>(
classifier: &SchemaClassifier,
plan: &RoutingPlan,
engines: &[Engine],
event: &E,
) -> Routed1 {
let schema = classifier.classify(event).map(|m| m.name);
match plan.decide(schema.as_deref()) {
RouteDecision::Drop | RouteDecision::Error => Routed1::Skip,
RouteDecision::Evaluate { set, .. } => Routed1::Eval {
set,
detections: engines[set].evaluate(event),
},
}
}
/// Routes events to a per-schema detection engine and, optionally, a shared
/// correlation engine.
///
/// One [`Engine`] exists per pipeline set. For each event, the routing plan
/// picks the set based on the schema reported by the classifier.
pub struct SchemaRouter {
    /// Identifies the schema of each incoming event.
    classifier: SchemaClassifier,
    /// Turns a classified schema (or its absence) into a routing decision.
    plan: RoutingPlan,
    /// Detection engines, indexed by pipeline-set number.
    engines: Vec<Engine>,
    /// Per pipeline set (same indexing as `engines`): Sigma field name to the
    /// target field names produced by that set's `field_name_mapping`s.
    field_maps: Vec<HashMap<String, Vec<String>>>,
    /// Correlation engine shared by all sets; `None` when the collection has
    /// no correlation rules.
    correlation: Option<CorrelationEngine>,
}
impl SchemaRouter {
#[allow(clippy::too_many_arguments)]
pub fn build(
collection: &SigmaCollection,
classifier: SchemaClassifier,
plan: RoutingPlan,
pipeline_sets: Vec<Vec<Pipeline>>,
corr_config: CorrelationConfig,
include_event: bool,
match_detail: MatchDetailLevel,
logsource_extractor: Option<LogSourceExtractor>,
) -> Result<Self> {
let mut engines = Vec::with_capacity(pipeline_sets.len());
let mut field_maps = Vec::with_capacity(pipeline_sets.len());
for set in &pipeline_sets {
let mut engine = Engine::new();
engine.set_include_event(include_event);
engine.set_match_detail(match_detail);
engine.set_logsource_extractor(logsource_extractor.clone());
for p in set {
engine.add_pipeline(p.clone());
}
engine.add_collection(collection)?;
engines.push(engine);
field_maps.push(collect_field_map(set));
}
let correlation = if collection.correlations.is_empty() {
None
} else {
let mut ce = CorrelationEngine::new(corr_config);
ce.set_include_event(include_event);
ce.set_match_detail(match_detail);
ce.add_collection(collection)?;
Some(ce)
};
Ok(SchemaRouter {
classifier,
plan,
engines,
field_maps,
correlation,
})
}
pub fn on_unknown(&self) -> OnUnknown {
self.plan.on_unknown()
}
pub fn has_correlations(&self) -> bool {
self.correlation.is_some()
}
pub fn detection_rule_count(&self) -> usize {
self.engines.first().map(|e| e.rule_count()).unwrap_or(0)
}
pub fn logsource_pruned_total(&self) -> u64 {
self.engines
.iter()
.map(Engine::logsource_pruned_total)
.sum()
}
pub fn logsource_absent_total(&self) -> u64 {
self.engines
.iter()
.map(Engine::logsource_absent_total)
.sum()
}
pub fn correlation_rule_count(&self) -> usize {
self.correlation
.as_ref()
.map(|c| c.correlation_rule_count())
.unwrap_or(0)
}
pub fn state_count(&self) -> usize {
self.correlation
.as_ref()
.map(|c| c.state_count())
.unwrap_or(0)
}
pub fn correlation_introspect(
&self,
id: Option<&str>,
group: Option<&str>,
) -> Option<CorrelationStateSnapshot> {
self.correlation
.as_ref()
.map(|c| c.introspect_filtered(id, group))
}
pub fn export_state(&self) -> Option<CorrelationSnapshot> {
self.correlation.as_ref().map(|c| c.export_state())
}
pub fn import_state(&mut self, snapshot: CorrelationSnapshot) -> bool {
match &mut self.correlation {
Some(c) => c.import_state(snapshot),
None => true,
}
}
pub fn process_batch<E: Event + Sync>(&mut self, events: &[&E]) -> Vec<ProcessResult> {
let classifier = &self.classifier;
let plan = &self.plan;
let engines = &self.engines;
let phase1: Vec<Routed1> = {
#[cfg(feature = "parallel")]
{
use rayon::prelude::*;
events
.par_iter()
.map(|e| detect_one(classifier, plan, engines, *e))
.collect()
}
#[cfg(not(feature = "parallel"))]
{
events
.iter()
.map(|e| detect_one(classifier, plan, engines, *e))
.collect()
}
};
let field_maps = &self.field_maps;
let correlation = &mut self.correlation;
phase1
.into_iter()
.zip(events)
.map(|(routed, event)| match routed {
Routed1::Skip => Vec::new(),
Routed1::Eval { set, detections } => match correlation {
Some(ce) => {
let mapped = MappedEvent::new(*event, &field_maps[set]);
ce.correlate_detections(&mapped, detections)
}
None => detections,
},
})
.collect()
}
pub fn route(&mut self, event: &impl Event) -> RouteResult {
let schema = self.classifier.classify(event).map(|m| m.name);
match self.plan.decide(schema.as_deref()) {
RouteDecision::Drop => RouteResult {
results: Vec::new(),
schema,
outcome: RouteOutcome::Dropped,
},
RouteDecision::Error => RouteResult {
results: Vec::new(),
schema,
outcome: RouteOutcome::Errored,
},
RouteDecision::Evaluate { set, unknown } => {
let detections = self.engines[set].evaluate(event);
let results = match &mut self.correlation {
Some(ce) => {
let mapped = MappedEvent::new(event, &self.field_maps[set]);
ce.correlate_detections(&mapped, detections)
}
None => detections,
};
RouteResult {
results,
schema,
outcome: if unknown {
RouteOutcome::EvaluatedUnknown
} else {
RouteOutcome::Evaluated
},
}
}
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::JsonEvent;
    use crate::pipeline::parse_pipeline;
    use crate::schema::RoutingConfig;
    use rsigma_parser::parse_sigma_yaml;
    use serde_json::json;

    /// A single detection rule matching `whoami` in the Sigma `CommandLine`
    /// field.
    const RULES: &str = r#"
title: Whoami
id: rule-whoami
logsource:
    category: process_creation
    product: windows
detection:
    selection:
        CommandLine|contains: whoami
    condition: selection
level: high
"#;

    /// Pipeline that maps Sigma field names to their ECS equivalents.
    const ECS_PIPELINE: &str = r#"
name: ecs_test
priority: 20
transformations:
    - id: map
      type: field_name_mapping
      mapping:
          CommandLine: process.command_line
          User: user.name
"#;

    /// Build a routing plan with `OnUnknown::Warn`, no default pipelines and
    /// one binding per `(schema, pipeline names)` pair.
    fn plan(bindings: &[(&str, &[&str])]) -> RoutingPlan {
        let config = RoutingConfig {
            on_unknown: OnUnknown::Warn,
            default_pipelines: vec![],
            bindings: bindings
                .iter()
                .map(|(s, ps)| crate::schema::SchemaBinding {
                    schema: (*s).to_string(),
                    pipelines: ps.iter().map(|p| (*p).to_string()).collect(),
                })
                .collect(),
        };
        RoutingPlan::from_config(&config)
    }

    /// An ECS event goes through the ECS-mapped engine; a native event still
    /// matches through the unmapped engine.
    #[test]
    fn routes_ecs_event_to_ecs_engine() {
        let collection = parse_sigma_yaml(RULES).unwrap();
        let ecs = parse_pipeline(ECS_PIPELINE).unwrap();
        let plan = plan(&[("ecs", &["ecs_test"])]);
        let mut router = SchemaRouter::build(
            &collection,
            SchemaClassifier::builtin(),
            plan,
            vec![vec![], vec![ecs]],
            CorrelationConfig::default(),
            false,
            MatchDetailLevel::Off,
            None,
        )
        .unwrap();
        let ecs_event = json!({"ecs.version": "8.0.0", "process.command_line": "cmd /c whoami"});
        let r = router.route(&JsonEvent::borrow(&ecs_event));
        assert_eq!(r.schema.as_deref(), Some("ecs"));
        assert_eq!(r.outcome, RouteOutcome::Evaluated);
        assert_eq!(r.results.len(), 1, "ECS event matches via the ECS engine");
        let native = json!({"CommandLine": "cmd /c whoami"});
        let r = router.route(&JsonEvent::borrow(&native));
        assert_eq!(r.schema.as_deref(), Some("generic_json"));
        assert_eq!(r.results.len(), 1);
    }

    /// The batch path must agree with `route`. Matching events in either
    /// schema yield one detection, a non-matching event yields none, and
    /// output order follows input order.
    #[test]
    fn process_batch_agrees_with_route() {
        let collection = parse_sigma_yaml(RULES).unwrap();
        let ecs = parse_pipeline(ECS_PIPELINE).unwrap();
        let plan = plan(&[("ecs", &["ecs_test"])]);
        let mut router = SchemaRouter::build(
            &collection,
            SchemaClassifier::builtin(),
            plan,
            vec![vec![], vec![ecs]],
            CorrelationConfig::default(),
            false,
            MatchDetailLevel::Off,
            None,
        )
        .unwrap();
        let ecs_event = json!({"ecs.version": "8.0.0", "process.command_line": "cmd /c whoami"});
        let native = json!({"CommandLine": "cmd /c whoami"});
        let benign = json!({"CommandLine": "cmd /c dir"});
        let events = [
            JsonEvent::borrow(&ecs_event),
            JsonEvent::borrow(&benign),
            JsonEvent::borrow(&native),
        ];
        let refs: Vec<_> = events.iter().collect();
        let out = router.process_batch(&refs);
        assert_eq!(out.len(), 3, "one result per input event");
        assert_eq!(out[0].len(), 1, "ECS event matches via the ECS engine");
        assert!(out[1].is_empty(), "benign event yields no detections");
        assert_eq!(out[2].len(), 1, "native event matches via the default engine");
    }

    /// Correlation `group-by` fields resolve through each set's field map, so
    /// an ECS event and a native event for the same user land in one group.
    #[test]
    fn cross_schema_correlation_groups_the_same_entity() {
        let rules = r#"
title: Whoami
id: rule-whoami
logsource:
    category: process_creation
    product: windows
detection:
    selection:
        CommandLine|contains: whoami
    condition: selection
level: high
---
title: Repeated whoami by user
correlation:
    type: event_count
    rules:
        - rule-whoami
    group-by:
        - User
    timespan: 1h
    condition:
        gte: 2
level: high
"#;
        let collection = parse_sigma_yaml(rules).unwrap();
        let ecs = parse_pipeline(ECS_PIPELINE).unwrap();
        let plan = plan(&[("ecs", &["ecs_test"])]);
        let config = CorrelationConfig {
            timestamp_fallback: crate::correlation_engine::TimestampFallback::WallClock,
            ..Default::default()
        };
        let mut router = SchemaRouter::build(
            &collection,
            SchemaClassifier::builtin(),
            plan,
            vec![vec![], vec![ecs]],
            config,
            false,
            MatchDetailLevel::Off,
            None,
        )
        .unwrap();
        let ecs_event = json!({
            "ecs.version": "8.0.0",
            "process.command_line": "cmd /c whoami",
            "user.name": "alice"
        });
        let r1 = router.route(&JsonEvent::borrow(&ecs_event));
        assert_eq!(r1.schema.as_deref(), Some("ecs"));
        assert!(
            !r1.results.iter().any(|r| r.is_correlation()),
            "first event must not fire the count>=2 correlation yet"
        );
        let native_event = json!({"CommandLine": "cmd /c whoami", "User": "alice"});
        let r2 = router.route(&JsonEvent::borrow(&native_event));
        assert!(
            r2.results.iter().any(|r| r.is_correlation()),
            "the two events share group User=alice across schemas and must correlate"
        );
    }

    /// With `OnUnknown::Drop` and no classifier rules, every event is dropped
    /// unevaluated.
    #[test]
    fn drop_policy_skips_unknown_events() {
        let collection = parse_sigma_yaml(RULES).unwrap();
        let config = RoutingConfig {
            on_unknown: OnUnknown::Drop,
            default_pipelines: vec![],
            bindings: vec![],
        };
        let plan = RoutingPlan::from_config(&config);
        let mut router = SchemaRouter::build(
            &collection,
            SchemaClassifier::new(vec![]),
            plan,
            vec![vec![]],
            CorrelationConfig::default(),
            false,
            MatchDetailLevel::Off,
            None,
        )
        .unwrap();
        let native = json!({"CommandLine": "cmd /c whoami"});
        let r = router.route(&JsonEvent::borrow(&native));
        assert_eq!(r.schema, None);
        assert_eq!(r.outcome, RouteOutcome::Dropped);
        assert!(r.results.is_empty());
    }
}