1use std::collections::HashMap;
25
26use rsigma_parser::SigmaCollection;
27
28use crate::correlation_engine::{
29 CorrelationConfig, CorrelationEngine, CorrelationSnapshot, CorrelationStateSnapshot,
30 ProcessResult,
31};
32use crate::engine::Engine;
33use crate::error::Result;
34use crate::event::{Event, MappedEvent};
35use crate::logsource::LogSourceExtractor;
36use crate::pipeline::Pipeline;
37use crate::pipeline::transformations::Transformation;
38use crate::result::EvaluationResult;
39use crate::result::MatchDetailLevel;
40use crate::schema::{OnUnknown, RouteDecision, RoutingPlan, SchemaClassifier};
41
/// How the router disposed of a single event.
///
/// Produced by [`SchemaRouter::route`] and carried in [`RouteResult::outcome`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum RouteOutcome {
    /// The routing plan selected an engine and did not flag the event as
    /// unknown; the event was evaluated.
    Evaluated,
    /// The routing plan selected an engine but flagged the decision as
    /// `unknown`; the event was still evaluated.
    EvaluatedUnknown,
    /// The routing plan decided to drop the event; nothing was evaluated.
    Dropped,
    /// The routing plan decided the event is an error; nothing was evaluated.
    Errored,
}
55
56pub struct RouteResult {
58 pub results: ProcessResult,
60 pub schema: Option<String>,
62 pub outcome: RouteOutcome,
64}
65
66fn collect_field_map(pipelines: &[Pipeline]) -> HashMap<String, Vec<String>> {
69 let mut map: HashMap<String, Vec<String>> = HashMap::new();
70 for pipeline in pipelines {
71 for item in &pipeline.transformations {
72 if let Transformation::FieldNameMapping { mapping } = &item.transformation {
73 for (from, to) in mapping {
74 map.entry(from.clone())
75 .or_default()
76 .extend(to.iter().cloned());
77 }
78 }
79 }
80 }
81 map
82}
83
84enum Routed1 {
86 Skip,
88 Eval {
90 set: usize,
91 detections: Vec<EvaluationResult>,
92 },
93}
94
95fn detect_one<E: Event>(
98 classifier: &SchemaClassifier,
99 plan: &RoutingPlan,
100 engines: &[Engine],
101 event: &E,
102) -> Routed1 {
103 let schema = classifier.classify(event).map(|m| m.name);
104 match plan.decide(schema.as_deref()) {
105 RouteDecision::Drop | RouteDecision::Error => Routed1::Skip,
106 RouteDecision::Evaluate { set, .. } => Routed1::Eval {
107 set,
108 detections: engines[set].evaluate(event),
109 },
110 }
111}
112
113pub struct SchemaRouter {
116 classifier: SchemaClassifier,
117 plan: RoutingPlan,
118 engines: Vec<Engine>,
120 field_maps: Vec<HashMap<String, Vec<String>>>,
122 correlation: Option<CorrelationEngine>,
124}
125
126impl SchemaRouter {
127 #[allow(clippy::too_many_arguments)]
130 pub fn build(
131 collection: &SigmaCollection,
132 classifier: SchemaClassifier,
133 plan: RoutingPlan,
134 pipeline_sets: Vec<Vec<Pipeline>>,
135 corr_config: CorrelationConfig,
136 include_event: bool,
137 match_detail: MatchDetailLevel,
138 logsource_extractor: Option<LogSourceExtractor>,
139 ) -> Result<Self> {
140 let mut engines = Vec::with_capacity(pipeline_sets.len());
141 let mut field_maps = Vec::with_capacity(pipeline_sets.len());
142 for set in &pipeline_sets {
143 let mut engine = Engine::new();
144 engine.set_include_event(include_event);
145 engine.set_match_detail(match_detail);
146 engine.set_logsource_extractor(logsource_extractor.clone());
147 for p in set {
148 engine.add_pipeline(p.clone());
149 }
150 engine.add_collection(collection)?;
151 engines.push(engine);
152 field_maps.push(collect_field_map(set));
153 }
154
155 let correlation = if collection.correlations.is_empty() {
159 None
160 } else {
161 let mut ce = CorrelationEngine::new(corr_config);
162 ce.set_include_event(include_event);
163 ce.set_match_detail(match_detail);
164 ce.add_collection(collection)?;
165 Some(ce)
166 };
167
168 Ok(SchemaRouter {
169 classifier,
170 plan,
171 engines,
172 field_maps,
173 correlation,
174 })
175 }
176
177 pub fn on_unknown(&self) -> OnUnknown {
179 self.plan.on_unknown()
180 }
181
182 pub fn has_correlations(&self) -> bool {
184 self.correlation.is_some()
185 }
186
187 pub fn detection_rule_count(&self) -> usize {
189 self.engines.first().map(|e| e.rule_count()).unwrap_or(0)
190 }
191
192 pub fn logsource_pruned_total(&self) -> u64 {
195 self.engines
196 .iter()
197 .map(Engine::logsource_pruned_total)
198 .sum()
199 }
200
201 pub fn logsource_absent_total(&self) -> u64 {
204 self.engines
205 .iter()
206 .map(Engine::logsource_absent_total)
207 .sum()
208 }
209
210 pub fn correlation_rule_count(&self) -> usize {
212 self.correlation
213 .as_ref()
214 .map(|c| c.correlation_rule_count())
215 .unwrap_or(0)
216 }
217
218 pub fn state_count(&self) -> usize {
220 self.correlation
221 .as_ref()
222 .map(|c| c.state_count())
223 .unwrap_or(0)
224 }
225
226 pub fn correlation_introspect(
228 &self,
229 id: Option<&str>,
230 group: Option<&str>,
231 ) -> Option<CorrelationStateSnapshot> {
232 self.correlation
233 .as_ref()
234 .map(|c| c.introspect_filtered(id, group))
235 }
236
237 pub fn export_state(&self) -> Option<CorrelationSnapshot> {
239 self.correlation.as_ref().map(|c| c.export_state())
240 }
241
242 pub fn import_state(&mut self, snapshot: CorrelationSnapshot) -> bool {
245 match &mut self.correlation {
246 Some(c) => c.import_state(snapshot),
247 None => true,
248 }
249 }
250
251 pub fn process_batch<E: Event + Sync>(&mut self, events: &[&E]) -> Vec<ProcessResult> {
257 let classifier = &self.classifier;
260 let plan = &self.plan;
261 let engines = &self.engines;
262 let phase1: Vec<Routed1> = {
263 #[cfg(feature = "parallel")]
264 {
265 use rayon::prelude::*;
266 events
267 .par_iter()
268 .map(|e| detect_one(classifier, plan, engines, *e))
269 .collect()
270 }
271 #[cfg(not(feature = "parallel"))]
272 {
273 events
274 .iter()
275 .map(|e| detect_one(classifier, plan, engines, *e))
276 .collect()
277 }
278 };
279
280 let field_maps = &self.field_maps;
284 let correlation = &mut self.correlation;
285 phase1
286 .into_iter()
287 .zip(events)
288 .map(|(routed, event)| match routed {
289 Routed1::Skip => Vec::new(),
290 Routed1::Eval { set, detections } => match correlation {
291 Some(ce) => {
292 let mapped = MappedEvent::new(*event, &field_maps[set]);
293 ce.correlate_detections(&mapped, detections)
294 }
295 None => detections,
296 },
297 })
298 .collect()
299 }
300
301 pub fn route(&mut self, event: &impl Event) -> RouteResult {
303 let schema = self.classifier.classify(event).map(|m| m.name);
304 match self.plan.decide(schema.as_deref()) {
305 RouteDecision::Drop => RouteResult {
306 results: Vec::new(),
307 schema,
308 outcome: RouteOutcome::Dropped,
309 },
310 RouteDecision::Error => RouteResult {
311 results: Vec::new(),
312 schema,
313 outcome: RouteOutcome::Errored,
314 },
315 RouteDecision::Evaluate { set, unknown } => {
316 let detections = self.engines[set].evaluate(event);
317 let results = match &mut self.correlation {
318 Some(ce) => {
319 let mapped = MappedEvent::new(event, &self.field_maps[set]);
320 ce.correlate_detections(&mapped, detections)
321 }
322 None => detections,
323 };
324 RouteResult {
325 results,
326 schema,
327 outcome: if unknown {
328 RouteOutcome::EvaluatedUnknown
329 } else {
330 RouteOutcome::Evaluated
331 },
332 }
333 }
334 }
335 }
336}
337
#[cfg(test)]
mod tests {
    use super::*;
    use crate::JsonEvent;
    use crate::pipeline::parse_pipeline;
    use crate::schema::RoutingConfig;
    use rsigma_parser::parse_sigma_yaml;
    use serde_json::json;

    /// One detection rule that matches `whoami` in `CommandLine`.
    const RULES: &str = r#"
title: Whoami
id: rule-whoami
logsource:
    category: process_creation
    product: windows
detection:
    selection:
        CommandLine|contains: whoami
    condition: selection
level: high
"#;

    /// Pipeline that maps the rule-side field names onto dotted ECS names.
    const ECS_PIPELINE: &str = r#"
name: ecs_test
priority: 20
transformations:
  - id: map
    type: field_name_mapping
    mapping:
      CommandLine: process.command_line
      User: user.name
"#;

    /// Builds a routing plan with the `Warn` unknown-policy, no default
    /// pipelines and one binding per `(schema, pipeline names)` pair.
    fn plan(bindings: &[(&str, &[&str])]) -> RoutingPlan {
        let bindings = bindings
            .iter()
            .map(|&(schema, pipelines)| crate::schema::SchemaBinding {
                schema: schema.to_string(),
                pipelines: pipelines.iter().map(|name| (*name).to_string()).collect(),
            })
            .collect();
        RoutingPlan::from_config(&RoutingConfig {
            on_unknown: OnUnknown::Warn,
            default_pipelines: vec![],
            bindings,
        })
    }

    /// Builds a router with the settings every test shares: events are not
    /// included in results, match detail is off and there is no logsource
    /// extractor.
    fn build_router(
        collection: &SigmaCollection,
        classifier: SchemaClassifier,
        plan: RoutingPlan,
        pipeline_sets: Vec<Vec<Pipeline>>,
        corr_config: CorrelationConfig,
    ) -> SchemaRouter {
        SchemaRouter::build(
            collection,
            classifier,
            plan,
            pipeline_sets,
            corr_config,
            false,
            MatchDetailLevel::Off,
            None,
        )
        .unwrap()
    }

    #[test]
    fn routes_ecs_event_to_ecs_engine() {
        let collection = parse_sigma_yaml(RULES).unwrap();
        let ecs_pipeline = parse_pipeline(ECS_PIPELINE).unwrap();
        let routing = plan(&[("ecs", &["ecs_test"])]);
        // Two pipeline sets: one without pipelines, one with the ECS mapping.
        let mut router = build_router(
            &collection,
            SchemaClassifier::builtin(),
            routing,
            vec![vec![], vec![ecs_pipeline]],
            CorrelationConfig::default(),
        );

        // An event using ECS field names must match through the ECS mapping.
        let ecs_event = json!({"ecs.version": "8.0.0", "process.command_line": "cmd /c whoami"});
        let routed = router.route(&JsonEvent::borrow(&ecs_event));
        assert_eq!(routed.schema.as_deref(), Some("ecs"));
        assert_eq!(routed.outcome, RouteOutcome::Evaluated);
        assert_eq!(routed.results.len(), 1, "ECS event matches via the ECS engine");

        // An event using the rule's own field names must match as well.
        let native = json!({"CommandLine": "cmd /c whoami"});
        let routed = router.route(&JsonEvent::borrow(&native));
        assert_eq!(routed.schema.as_deref(), Some("generic_json"));
        assert_eq!(routed.results.len(), 1);
    }

    #[test]
    fn cross_schema_correlation_groups_the_same_entity() {
        // The whoami detection plus an event_count correlation over it,
        // grouped by the rule-side field name `User`.
        let rules = r#"
title: Whoami
id: rule-whoami
logsource:
    category: process_creation
    product: windows
detection:
    selection:
        CommandLine|contains: whoami
    condition: selection
level: high
---
title: Repeated whoami by user
correlation:
    type: event_count
    rules:
        - rule-whoami
    group-by:
        - User
    timespan: 1h
    condition:
        gte: 2
level: high
"#;
        let collection = parse_sigma_yaml(rules).unwrap();
        let ecs_pipeline = parse_pipeline(ECS_PIPELINE).unwrap();
        let routing = plan(&[("ecs", &["ecs_test"])]);

        // The test events carry no timestamp field, so fall back to wall clock.
        let corr_config = CorrelationConfig {
            timestamp_fallback: crate::correlation_engine::TimestampFallback::WallClock,
            ..Default::default()
        };

        let mut router = build_router(
            &collection,
            SchemaClassifier::builtin(),
            routing,
            vec![vec![], vec![ecs_pipeline]],
            corr_config,
        );

        // First hit: ECS-shaped event, user carried in `user.name`.
        let ecs_event = json!({
            "ecs.version": "8.0.0",
            "process.command_line": "cmd /c whoami",
            "user.name": "alice"
        });
        let first = router.route(&JsonEvent::borrow(&ecs_event));
        assert_eq!(first.schema.as_deref(), Some("ecs"));
        assert!(
            !first.results.iter().any(|r| r.is_correlation()),
            "first event must not fire the count>=2 correlation yet"
        );

        // Second hit: native event, user carried in `User`.
        let native_event = json!({"CommandLine": "cmd /c whoami", "User": "alice"});
        let second = router.route(&JsonEvent::borrow(&native_event));
        assert!(
            second.results.iter().any(|r| r.is_correlation()),
            "the two events share group User=alice across schemas and must correlate"
        );
    }

    #[test]
    fn drop_policy_skips_unknown_events() {
        let collection = parse_sigma_yaml(RULES).unwrap();
        let routing = RoutingPlan::from_config(&RoutingConfig {
            on_unknown: OnUnknown::Drop,
            default_pipelines: vec![],
            bindings: vec![],
        });
        // A classifier built from an empty list: the event below gets no schema.
        let mut router = build_router(
            &collection,
            SchemaClassifier::new(vec![]),
            routing,
            vec![vec![]],
            CorrelationConfig::default(),
        );

        let native = json!({"CommandLine": "cmd /c whoami"});
        let routed = router.route(&JsonEvent::borrow(&native));
        assert_eq!(routed.schema, None);
        assert_eq!(routed.outcome, RouteOutcome::Dropped);
        assert!(routed.results.is_empty());
    }
}