Skip to main content

rsigma_eval/
router.rs

1//! Multi-engine schema router: classify each event, route it to the detection
2//! engine built for its schema's pipeline-set, and feed every detection into
3//! one shared correlation store.
4//!
5//! # Design
6//!
7//! - One [`Engine`] per deduplicated pipeline-set (index-aligned with
8//!   [`RoutingPlan::pipeline_sets`]). The schema's pipeline is applied to the
9//!   detection rules in its engine, exactly as a single-pipeline run would.
10//! - One shared [`CorrelationEngine`] (present only when the rule set has
11//!   correlation rules), built Sigma-native (no pipeline). Detections from any
12//!   per-schema engine feed into it via
13//!   [`CorrelationEngine::correlate_detections`].
14//! - Cross-schema correlation grouping works because the group-by extraction is
15//!   schema-aware: each set carries a `Sigma -> event field` map (derived from
16//!   its pipelines' field-name mappings), and the event is wrapped in a
17//!   [`MappedEvent`] before correlation so the Sigma-native group-by names
18//!   resolve to the schema's field names. The window store stays shared, keyed
19//!   by the logical correlation plus the extracted group values.
20//!
21//! This subsumes the single-schema case (one pipeline-set is the degenerate
22//! configuration), so there is no separate code path for "routing off".
23
24use std::collections::HashMap;
25
26use rsigma_parser::SigmaCollection;
27
28use crate::correlation_engine::{
29    CorrelationConfig, CorrelationEngine, CorrelationSnapshot, CorrelationStateSnapshot,
30    ProcessResult,
31};
32use crate::engine::Engine;
33use crate::error::Result;
34use crate::event::{Event, MappedEvent};
35use crate::logsource::LogSourceExtractor;
36use crate::pipeline::Pipeline;
37use crate::pipeline::transformations::Transformation;
38use crate::result::EvaluationResult;
39use crate::result::MatchDetailLevel;
40use crate::schema::{OnUnknown, RouteDecision, RoutingPlan, SchemaClassifier};
41
/// What the router did with an event, for reporting and `on_unknown` handling.
///
/// The enum carries no data, so it is `Copy` (callers can pass it around
/// without cloning) and `Hash` (it can key a per-outcome counter map).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RouteOutcome {
    /// Evaluated against a bound or known schema's set.
    Evaluated,
    /// Evaluated against the default set because the schema was unrecognized
    /// (`on_unknown: warn` or `passthrough`).
    EvaluatedUnknown,
    /// Dropped without evaluating (`on_unknown: drop`).
    Dropped,
    /// Dropped and flagged as an error (`on_unknown: error`).
    Errored,
}
55
/// The result of routing one event through [`SchemaRouter::route`].
pub struct RouteResult {
    /// Evaluation results for the event (empty when dropped or errored).
    /// When the router has a correlation store, this is the output of the
    /// correlation step; otherwise it is the detections as evaluated.
    pub results: ProcessResult,
    /// The classified schema name, or `None` when the classifier produced no
    /// match for the event.
    pub schema: Option<String>,
    /// What the router did with the event.
    pub outcome: RouteOutcome,
}
65
66/// Collect a combined `Sigma -> [event field]` map from a pipeline-set's
67/// field-name mappings, used for schema-aware correlation group-by extraction.
68fn collect_field_map(pipelines: &[Pipeline]) -> HashMap<String, Vec<String>> {
69    let mut map: HashMap<String, Vec<String>> = HashMap::new();
70    for pipeline in pipelines {
71        for item in &pipeline.transformations {
72            if let Transformation::FieldNameMapping { mapping } = &item.transformation {
73                for (from, to) in mapping {
74                    map.entry(from.clone())
75                        .or_default()
76                        .extend(to.iter().cloned());
77                }
78            }
79        }
80    }
81    map
82}
83
/// Outcome of the stateless phase for one event in [`SchemaRouter::process_batch`].
enum Routed1 {
    /// Dropped or errored (`on_unknown`): no results.
    Skip,
    /// The event was evaluated by the detection engine for pipeline-set `set`.
    /// The detections still have to pass through the shared correlation store
    /// (when there is one) in the stateful phase.
    Eval {
        /// Index of the pipeline-set (and engine) the event was routed to.
        set: usize,
        /// Detections produced by that engine for the event.
        detections: Vec<EvaluationResult>,
    },
}
94
95/// Stateless detection for one event: classify, decide, evaluate. Borrows only
96/// shared state so it can run in parallel across a batch.
97fn detect_one<E: Event>(
98    classifier: &SchemaClassifier,
99    plan: &RoutingPlan,
100    engines: &[Engine],
101    event: &E,
102) -> Routed1 {
103    let schema = classifier.classify(event).map(|m| m.name);
104    match plan.decide(schema.as_deref()) {
105        RouteDecision::Drop | RouteDecision::Error => Routed1::Skip,
106        RouteDecision::Evaluate { set, .. } => Routed1::Eval {
107            set,
108            detections: engines[set].evaluate(event),
109        },
110    }
111}
112
/// A multi-engine router over a classifier, a [`RoutingPlan`], one detection
/// engine per pipeline-set, and one shared correlation store.
///
/// `engines` and `field_maps` are parallel vectors: entry `i` of each belongs
/// to pipeline-set `i`.
pub struct SchemaRouter {
    /// Classifies each incoming event into a schema (or no schema).
    classifier: SchemaClassifier,
    /// Turns the classified schema name into a routing decision.
    plan: RoutingPlan,
    /// One detection engine per pipeline-set (index = set index).
    engines: Vec<Engine>,
    /// `Sigma -> event field` map per pipeline-set, for correlation group-by.
    field_maps: Vec<HashMap<String, Vec<String>>>,
    /// Shared correlation store; `None` when there are no correlation rules.
    correlation: Option<CorrelationEngine>,
}
125
126impl SchemaRouter {
127    /// Build a router. `pipeline_sets` must be index-aligned with
128    /// `plan.pipeline_sets()` (one resolved pipeline list per set).
129    #[allow(clippy::too_many_arguments)]
130    pub fn build(
131        collection: &SigmaCollection,
132        classifier: SchemaClassifier,
133        plan: RoutingPlan,
134        pipeline_sets: Vec<Vec<Pipeline>>,
135        corr_config: CorrelationConfig,
136        include_event: bool,
137        match_detail: MatchDetailLevel,
138        logsource_extractor: Option<LogSourceExtractor>,
139    ) -> Result<Self> {
140        let mut engines = Vec::with_capacity(pipeline_sets.len());
141        let mut field_maps = Vec::with_capacity(pipeline_sets.len());
142        for set in &pipeline_sets {
143            let mut engine = Engine::new();
144            engine.set_include_event(include_event);
145            engine.set_match_detail(match_detail);
146            engine.set_logsource_extractor(logsource_extractor.clone());
147            for p in set {
148                engine.add_pipeline(p.clone());
149            }
150            engine.add_collection(collection)?;
151            engines.push(engine);
152            field_maps.push(collect_field_map(set));
153        }
154
155        // The shared correlation store is Sigma-native (no pipeline): group-by
156        // names stay logical and are mapped per schema at feed time. Its inner
157        // detection engine is unused (routed detection runs in `engines`).
158        let correlation = if collection.correlations.is_empty() {
159            None
160        } else {
161            let mut ce = CorrelationEngine::new(corr_config);
162            ce.set_include_event(include_event);
163            ce.set_match_detail(match_detail);
164            ce.add_collection(collection)?;
165            Some(ce)
166        };
167
168        Ok(SchemaRouter {
169            classifier,
170            plan,
171            engines,
172            field_maps,
173            correlation,
174        })
175    }
176
    /// The unknown-handling policy this router enforces, as reported by the
    /// routing plan.
    pub fn on_unknown(&self) -> OnUnknown {
        self.plan.on_unknown()
    }
181
    /// Whether this router has a correlation store, i.e. whether the rule
    /// collection it was built from contained correlation rules.
    pub fn has_correlations(&self) -> bool {
        self.correlation.is_some()
    }
186
187    /// Number of detection rules (same across every per-schema engine).
188    pub fn detection_rule_count(&self) -> usize {
189        self.engines.first().map(|e| e.rule_count()).unwrap_or(0)
190    }
191
192    /// Total rule candidates pruned by logsource across every per-schema
193    /// engine (each event routes to exactly one engine).
194    pub fn logsource_pruned_total(&self) -> u64 {
195        self.engines
196            .iter()
197            .map(Engine::logsource_pruned_total)
198            .sum()
199    }
200
201    /// Total evaluate calls with no extractable event logsource (fail-open)
202    /// across every per-schema engine.
203    pub fn logsource_absent_total(&self) -> u64 {
204        self.engines
205            .iter()
206            .map(Engine::logsource_absent_total)
207            .sum()
208    }
209
210    /// Number of correlation rules in the shared store (0 when none).
211    pub fn correlation_rule_count(&self) -> usize {
212        self.correlation
213            .as_ref()
214            .map(|c| c.correlation_rule_count())
215            .unwrap_or(0)
216    }
217
218    /// Number of live correlation window-state entries (0 when none).
219    pub fn state_count(&self) -> usize {
220        self.correlation
221            .as_ref()
222            .map(|c| c.state_count())
223            .unwrap_or(0)
224    }
225
226    /// Introspect the shared correlation store, if any (id/group filtered).
227    pub fn correlation_introspect(
228        &self,
229        id: Option<&str>,
230        group: Option<&str>,
231    ) -> Option<CorrelationStateSnapshot> {
232        self.correlation
233            .as_ref()
234            .map(|c| c.introspect_filtered(id, group))
235    }
236
237    /// Export the shared correlation state, if any, for hot-reload carry-over.
238    pub fn export_state(&self) -> Option<CorrelationSnapshot> {
239        self.correlation.as_ref().map(|c| c.export_state())
240    }
241
242    /// Import previously exported correlation state into the shared store.
243    /// No-op (returns `true`) when there is no correlation store.
244    pub fn import_state(&mut self, snapshot: CorrelationSnapshot) -> bool {
245        match &mut self.correlation {
246            Some(c) => c.import_state(snapshot),
247            None => true,
248        }
249    }
250
251    /// Route a batch of events: parallel classify + detection, then sequential
252    /// correlation into the shared store. Mirrors
253    /// `CorrelationEngine::process_batch`: the stateless phase runs concurrently
254    /// (under the `parallel` feature) and the stateful correlation phase runs
255    /// in order. Drop/error outcomes yield empty results for that event.
256    pub fn process_batch<E: Event + Sync>(&mut self, events: &[&E]) -> Vec<ProcessResult> {
257        // Stateless phase: classify + route + detect. Borrows only `&self`
258        // fields, so it parallelizes; correlation state is untouched here.
259        let classifier = &self.classifier;
260        let plan = &self.plan;
261        let engines = &self.engines;
262        let phase1: Vec<Routed1> = {
263            #[cfg(feature = "parallel")]
264            {
265                use rayon::prelude::*;
266                events
267                    .par_iter()
268                    .map(|e| detect_one(classifier, plan, engines, *e))
269                    .collect()
270            }
271            #[cfg(not(feature = "parallel"))]
272            {
273                events
274                    .iter()
275                    .map(|e| detect_one(classifier, plan, engines, *e))
276                    .collect()
277            }
278        };
279
280        // Stateful phase: feed detections into the shared correlation store in
281        // event order. Disjoint field borrows let the field maps and the
282        // correlation store be held at once.
283        let field_maps = &self.field_maps;
284        let correlation = &mut self.correlation;
285        phase1
286            .into_iter()
287            .zip(events)
288            .map(|(routed, event)| match routed {
289                Routed1::Skip => Vec::new(),
290                Routed1::Eval { set, detections } => match correlation {
291                    Some(ce) => {
292                        let mapped = MappedEvent::new(*event, &field_maps[set]);
293                        ce.correlate_detections(&mapped, detections)
294                    }
295                    None => detections,
296                },
297            })
298            .collect()
299    }
300
301    /// Classify and route one event.
302    pub fn route(&mut self, event: &impl Event) -> RouteResult {
303        let schema = self.classifier.classify(event).map(|m| m.name);
304        match self.plan.decide(schema.as_deref()) {
305            RouteDecision::Drop => RouteResult {
306                results: Vec::new(),
307                schema,
308                outcome: RouteOutcome::Dropped,
309            },
310            RouteDecision::Error => RouteResult {
311                results: Vec::new(),
312                schema,
313                outcome: RouteOutcome::Errored,
314            },
315            RouteDecision::Evaluate { set, unknown } => {
316                let detections = self.engines[set].evaluate(event);
317                let results = match &mut self.correlation {
318                    Some(ce) => {
319                        let mapped = MappedEvent::new(event, &self.field_maps[set]);
320                        ce.correlate_detections(&mapped, detections)
321                    }
322                    None => detections,
323                };
324                RouteResult {
325                    results,
326                    schema,
327                    outcome: if unknown {
328                        RouteOutcome::EvaluatedUnknown
329                    } else {
330                        RouteOutcome::Evaluated
331                    },
332                }
333            }
334        }
335    }
336}
337
338#[cfg(test)]
339mod tests {
340    use super::*;
341    use crate::JsonEvent;
342    use crate::pipeline::parse_pipeline;
343    use crate::schema::RoutingConfig;
344    use rsigma_parser::parse_sigma_yaml;
345    use serde_json::json;
346
    /// Sigma YAML fixture with a single detection rule (`rule-whoami`) that
    /// selects events whose `CommandLine` contains `whoami`.
    const RULES: &str = r#"
title: Whoami
id: rule-whoami
logsource:
    category: process_creation
    product: windows
detection:
    selection:
        CommandLine|contains: whoami
    condition: selection
level: high
"#;
359
    /// Pipeline YAML fixture named `ecs_test`: one field-name mapping that
    /// renames `CommandLine` to `process.command_line` and `User` to
    /// `user.name`.
    const ECS_PIPELINE: &str = r#"
name: ecs_test
priority: 20
transformations:
  - id: map
    type: field_name_mapping
    mapping:
      CommandLine: process.command_line
      User: user.name
"#;
370
371    fn plan(bindings: &[(&str, &[&str])]) -> RoutingPlan {
372        let config = RoutingConfig {
373            on_unknown: OnUnknown::Warn,
374            default_pipelines: vec![],
375            bindings: bindings
376                .iter()
377                .map(|(s, ps)| crate::schema::SchemaBinding {
378                    schema: (*s).to_string(),
379                    pipelines: ps.iter().map(|p| (*p).to_string()).collect(),
380                })
381                .collect(),
382        };
383        RoutingPlan::from_config(&config)
384    }
385
386    #[test]
387    fn routes_ecs_event_to_ecs_engine() {
388        let collection = parse_sigma_yaml(RULES).unwrap();
389        // set 0 = default (no pipeline, Sigma-native fields), set 1 = ECS.
390        let ecs = parse_pipeline(ECS_PIPELINE).unwrap();
391        let plan = plan(&[("ecs", &["ecs_test"])]);
392        let mut router = SchemaRouter::build(
393            &collection,
394            SchemaClassifier::builtin(),
395            plan,
396            vec![vec![], vec![ecs]],
397            CorrelationConfig::default(),
398            false,
399            MatchDetailLevel::Off,
400            None,
401        )
402        .unwrap();
403
404        // ECS event: fields are renamed; only the ECS engine matches it.
405        let ecs_event = json!({"ecs.version": "8.0.0", "process.command_line": "cmd /c whoami"});
406        let r = router.route(&JsonEvent::borrow(&ecs_event));
407        assert_eq!(r.schema.as_deref(), Some("ecs"));
408        assert_eq!(r.outcome, RouteOutcome::Evaluated);
409        assert_eq!(r.results.len(), 1, "ECS event matches via the ECS engine");
410
411        // A Sigma-native event with the same command is unrecognized here
412        // (no ecs.version, no sysmon markers) -> generic_json -> default set,
413        // which has no pipeline, so the rule's CommandLine matches it.
414        let native = json!({"CommandLine": "cmd /c whoami"});
415        let r = router.route(&JsonEvent::borrow(&native));
416        assert_eq!(r.schema.as_deref(), Some("generic_json"));
417        assert_eq!(r.results.len(), 1);
418    }
419
420    #[test]
421    fn cross_schema_correlation_groups_the_same_entity() {
422        // A detection rule plus an event_count correlation grouped by User.
423        // The same user appears once as an ECS event (user.name) and once as a
424        // Sigma-native event (User); they must land in the same window and fire.
425        let rules = r#"
426title: Whoami
427id: rule-whoami
428logsource:
429    category: process_creation
430    product: windows
431detection:
432    selection:
433        CommandLine|contains: whoami
434    condition: selection
435level: high
436---
437title: Repeated whoami by user
438correlation:
439    type: event_count
440    rules:
441        - rule-whoami
442    group-by:
443        - User
444    timespan: 1h
445    condition:
446        gte: 2
447level: high
448"#;
449        let collection = parse_sigma_yaml(rules).unwrap();
450        let ecs = parse_pipeline(ECS_PIPELINE).unwrap();
451        // set 0 = default (Sigma-native), set 1 = ECS. ecs schema -> set 1;
452        // everything else (incl. the generic event) -> default set 0.
453        let plan = plan(&[("ecs", &["ecs_test"])]);
454
455        let config = CorrelationConfig {
456            timestamp_fallback: crate::correlation_engine::TimestampFallback::WallClock,
457            ..Default::default()
458        };
459
460        let mut router = SchemaRouter::build(
461            &collection,
462            SchemaClassifier::builtin(),
463            plan,
464            vec![vec![], vec![ecs]],
465            config,
466            false,
467            MatchDetailLevel::Off,
468            None,
469        )
470        .unwrap();
471
472        // First occurrence: ECS event for user alice.
473        let ecs_event = json!({
474            "ecs.version": "8.0.0",
475            "process.command_line": "cmd /c whoami",
476            "user.name": "alice"
477        });
478        let r1 = router.route(&JsonEvent::borrow(&ecs_event));
479        assert_eq!(r1.schema.as_deref(), Some("ecs"));
480        assert!(
481            !r1.results.iter().any(|r| r.is_correlation()),
482            "first event must not fire the count>=2 correlation yet"
483        );
484
485        // Second occurrence: Sigma-native event for the SAME user alice.
486        let native_event = json!({"CommandLine": "cmd /c whoami", "User": "alice"});
487        let r2 = router.route(&JsonEvent::borrow(&native_event));
488        assert!(
489            r2.results.iter().any(|r| r.is_correlation()),
490            "the two events share group User=alice across schemas and must correlate"
491        );
492    }
493
494    #[test]
495    fn drop_policy_skips_unknown_events() {
496        let collection = parse_sigma_yaml(RULES).unwrap();
497        let config = RoutingConfig {
498            on_unknown: OnUnknown::Drop,
499            default_pipelines: vec![],
500            // Bind generic_json away so a plain event is truly unknown.
501            bindings: vec![],
502        };
503        let plan = RoutingPlan::from_config(&config);
504        let mut router = SchemaRouter::build(
505            &collection,
506            // Classifier with no generic_json: only ECS recognized, everything
507            // else is unknown.
508            SchemaClassifier::new(vec![]),
509            plan,
510            vec![vec![]],
511            CorrelationConfig::default(),
512            false,
513            MatchDetailLevel::Off,
514            None,
515        )
516        .unwrap();
517
518        let native = json!({"CommandLine": "cmd /c whoami"});
519        let r = router.route(&JsonEvent::borrow(&native));
520        assert_eq!(r.schema, None);
521        assert_eq!(r.outcome, RouteOutcome::Dropped);
522        assert!(r.results.is_empty());
523    }
524}