Skip to main content

rsigma_eval/engine/
mod.rs

1//! Rule evaluation engine with logsource routing.
2//!
3//! The `Engine` manages a set of compiled Sigma rules and evaluates events
4//! against them. It supports optional logsource-based pre-filtering to
5//! reduce the number of rules evaluated per event.
6
7pub(crate) mod bloom_index;
8#[cfg(feature = "daachorse-index")]
9pub(crate) mod cross_rule_ac;
10mod filters;
11#[cfg(test)]
12mod tests;
13
14use rsigma_parser::{
15    ConditionExpr, FilterRule, FilterRuleTarget, LogSource, SigmaCollection, SigmaRule,
16};
17
18use crate::compiler::{
19    CompiledRule, compile_detection, compile_rule, evaluate_rule, evaluate_rule_with_bloom,
20};
21use crate::error::Result;
22use crate::event::Event;
23use crate::pipeline::{Pipeline, apply_pipelines};
24use crate::result::MatchResult;
25use crate::rule_index::RuleIndex;
26
27use bloom_index::{BloomCache, FieldBloomIndex};
28
29use filters::{filter_logsource_contains, logsource_matches, rewrite_condition_identifiers};
30
31/// The main rule evaluation engine.
32///
33/// Holds a set of compiled rules and provides methods to evaluate events
34/// against them. Supports optional logsource routing for performance.
35///
36/// # Example
37///
38/// ```rust
39/// use rsigma_parser::parse_sigma_yaml;
40/// use rsigma_eval::{Engine, Event};
41/// use rsigma_eval::event::JsonEvent;
42/// use serde_json::json;
43///
44/// let yaml = r#"
45/// title: Detect Whoami
46/// logsource:
47///     product: windows
48///     category: process_creation
49/// detection:
50///     selection:
51///         CommandLine|contains: 'whoami'
52///     condition: selection
53/// level: medium
54/// "#;
55///
56/// let collection = parse_sigma_yaml(yaml).unwrap();
57/// let mut engine = Engine::new();
58/// engine.add_collection(&collection).unwrap();
59///
60/// let event_val = json!({"CommandLine": "cmd /c whoami"});
61/// let event = JsonEvent::borrow(&event_val);
62/// let matches = engine.evaluate(&event);
63/// assert_eq!(matches.len(), 1);
64/// assert_eq!(matches[0].rule_title, "Detect Whoami");
65/// ```
66pub struct Engine {
67    rules: Vec<CompiledRule>,
68    pipelines: Vec<Pipeline>,
69    /// Global override: include the full event JSON in all match results.
70    /// When `true`, overrides per-rule `rsigma.include_event` custom attributes.
71    include_event: bool,
72    /// Monotonic counter used to namespace injected filter detections,
73    /// preventing key collisions when multiple filters share detection names.
74    filter_counter: usize,
75    /// Inverted index mapping `(field, exact_value)` to candidate rule indices.
76    /// Rebuilt after every rule mutation (add, filter).
77    rule_index: RuleIndex,
78    /// Per-field bloom filter over positive substring needles. Rebuilt
79    /// alongside `rule_index`. Consulted only when `bloom_prefilter` is
80    /// enabled.
81    bloom_index: FieldBloomIndex,
82    /// Toggle for bloom pre-filtering. Off by default: the per-event probe
83    /// overhead exceeds the savings on rule sets where most events overlap
84    /// with at least one needle's trigrams. Workloads with many substring
85    /// rules and mostly-non-matching events (e.g. high-volume telemetry
86    /// streams against an active threat-intel ruleset) opt in via
87    /// [`Engine::set_bloom_prefilter`].
88    bloom_prefilter: bool,
89    /// Memory budget the bloom builder is allowed to consume across all
90    /// per-field filters. `None` means use the crate default
91    /// (`bloom_index::DEFAULT_MAX_TOTAL_BYTES`, 1 MB).
92    bloom_max_bytes: Option<usize>,
93    /// Cross-rule Aho-Corasick index for substring patterns, gated on the
94    /// `daachorse-index` feature. Built only when [`cross_rule_ac_enabled`]
95    /// is `true`; [`cross_rule_ac_prunable`] is the conservative per-rule
96    /// flag computed at the same time so the `evaluate` hot path can drop
97    /// rules safely.
98    ///
99    /// [`cross_rule_ac_enabled`]: Self::cross_rule_ac_enabled
100    /// [`cross_rule_ac_prunable`]: Self::cross_rule_ac_prunable
101    #[cfg(feature = "daachorse-index")]
102    cross_rule_ac_index: cross_rule_ac::CrossRuleAcIndex,
103    /// Toggle for the cross-rule AC pre-filter. Off by default; the index
104    /// only pays off on rule sets > 5K rules with many shared substring
105    /// patterns. See [`Engine::set_cross_rule_ac`].
106    #[cfg(feature = "daachorse-index")]
107    cross_rule_ac_enabled: bool,
108    /// Per-rule conservative AC-prunability flag. `true` iff the rule's
109    /// firing requires at least one positive substring match (no `Exact`,
110    /// `Regex`, `Numeric`, `Not`, etc.), so dropping the rule on a
111    /// "no AC hit" verdict is provably correct.
112    #[cfg(feature = "daachorse-index")]
113    cross_rule_ac_prunable: Vec<bool>,
114}
115
116impl Engine {
117    /// Create a new empty engine.
118    pub fn new() -> Self {
119        Engine {
120            rules: Vec::new(),
121            pipelines: Vec::new(),
122            include_event: false,
123            filter_counter: 0,
124            rule_index: RuleIndex::empty(),
125            bloom_index: FieldBloomIndex::empty(),
126            bloom_prefilter: false,
127            bloom_max_bytes: None,
128            #[cfg(feature = "daachorse-index")]
129            cross_rule_ac_index: cross_rule_ac::CrossRuleAcIndex::empty(),
130            #[cfg(feature = "daachorse-index")]
131            cross_rule_ac_enabled: false,
132            #[cfg(feature = "daachorse-index")]
133            cross_rule_ac_prunable: Vec::new(),
134        }
135    }
136
137    /// Create a new engine with a pipeline.
138    pub fn new_with_pipeline(pipeline: Pipeline) -> Self {
139        Engine {
140            rules: Vec::new(),
141            pipelines: vec![pipeline],
142            include_event: false,
143            filter_counter: 0,
144            rule_index: RuleIndex::empty(),
145            bloom_index: FieldBloomIndex::empty(),
146            bloom_prefilter: false,
147            bloom_max_bytes: None,
148            #[cfg(feature = "daachorse-index")]
149            cross_rule_ac_index: cross_rule_ac::CrossRuleAcIndex::empty(),
150            #[cfg(feature = "daachorse-index")]
151            cross_rule_ac_enabled: false,
152            #[cfg(feature = "daachorse-index")]
153            cross_rule_ac_prunable: Vec::new(),
154        }
155    }
156
157    /// Enable or disable bloom-filter pre-filtering of positive substring
158    /// detection items.
159    ///
160    /// When enabled, `evaluate*` short-circuits any positive substring
161    /// matcher (`Contains` / `StartsWith` / `EndsWith` / `AhoCorasickSet`,
162    /// alone or wrapped in `CaseInsensitiveGroup`) whose field cannot
163    /// possibly contain a needle trigram.
164    ///
165    /// Disabled by default. The per-event probe (trigram extraction +
166    /// double hashing) costs ~1 µs on a typical CommandLine field, which
167    /// outweighs the savings on rule sets where most events overlap with
168    /// at least one needle. Enable for workloads that pair many substring
169    /// rules with mostly-non-matching events; benchmark with
170    /// `eval_bloom_rejection` before flipping it on in production.
171    pub fn set_bloom_prefilter(&mut self, enabled: bool) {
172        self.bloom_prefilter = enabled;
173    }
174
175    /// Returns whether bloom pre-filtering is currently enabled.
176    pub fn bloom_prefilter_enabled(&self) -> bool {
177        self.bloom_prefilter
178    }
179
180    /// Set the memory budget for the per-field bloom index.
181    ///
182    /// Must be called **before** `add_collection` / `add_rule` for the new
183    /// budget to take effect on the existing rule set; otherwise it is
184    /// applied at the next index rebuild. The default budget is 1 MB,
185    /// shared across all per-field filters. Lower the cap on memory-
186    /// constrained deployments; raise it for large rule sets where the
187    /// default starts evicting useful filters.
188    pub fn set_bloom_max_bytes(&mut self, max_bytes: usize) {
189        self.bloom_max_bytes = Some(max_bytes);
190        if !self.rules.is_empty() {
191            self.rebuild_index();
192        }
193    }
194
195    /// Returns the configured bloom memory budget, if one has been set
196    /// explicitly. `None` means the crate default (1 MB) is in use.
197    pub fn bloom_max_bytes(&self) -> Option<usize> {
198        self.bloom_max_bytes
199    }
200
201    /// Enable or disable the cross-rule Aho-Corasick pre-filter.
202    ///
203    /// When enabled, the engine builds a single per-field
204    /// `DoubleArrayAhoCorasick` automaton over every positive substring
205    /// needle from every rule and drops AC-prunable rules from the
206    /// candidate set when none of their patterns hit the event.
207    ///
208    /// Off by default. Pays off on large rule sets (> ~5K rules) with many
209    /// shared substring patterns (threat-intel feeds, IOC packs). For
210    /// smaller rule sets the per-rule [`AhoCorasickSet`] matcher already
211    /// handles the workload optimally; the cross-rule index only adds
212    /// build-time and lookup overhead. Benchmark with `eval_cross_rule_ac`
213    /// against representative rule sets before enabling in production.
214    ///
215    /// Available behind the `daachorse-index` Cargo feature.
216    ///
217    /// [`AhoCorasickSet`]: crate::matcher::CompiledMatcher::AhoCorasickSet
218    #[cfg(feature = "daachorse-index")]
219    pub fn set_cross_rule_ac(&mut self, enabled: bool) {
220        self.cross_rule_ac_enabled = enabled;
221        if enabled && !self.rules.is_empty() {
222            self.rebuild_index();
223        }
224    }
225
226    /// Returns whether the cross-rule AC pre-filter is currently enabled.
227    /// Available behind the `daachorse-index` Cargo feature.
228    #[cfg(feature = "daachorse-index")]
229    pub fn cross_rule_ac_enabled(&self) -> bool {
230        self.cross_rule_ac_enabled
231    }
232
233    /// Set global `include_event` — when `true`, all match results include
234    /// the full event JSON regardless of per-rule custom attributes.
235    pub fn set_include_event(&mut self, include: bool) {
236        self.include_event = include;
237    }
238
239    /// Add a pipeline to the engine.
240    ///
241    /// Pipelines are applied to rules during `add_rule` / `add_collection`.
242    /// Only affects rules added **after** this call.
243    pub fn add_pipeline(&mut self, pipeline: Pipeline) {
244        self.pipelines.push(pipeline);
245        self.pipelines.sort_by_key(|p| p.priority);
246    }
247
248    /// Add a single parsed Sigma rule.
249    ///
250    /// If pipelines are set, the rule is cloned and transformed before compilation.
251    /// The inverted index is rebuilt after adding the rule.
252    pub fn add_rule(&mut self, rule: &SigmaRule) -> Result<()> {
253        let compiled = if self.pipelines.is_empty() {
254            compile_rule(rule)?
255        } else {
256            let mut transformed = rule.clone();
257            apply_pipelines(&self.pipelines, &mut transformed)?;
258            compile_rule(&transformed)?
259        };
260        self.rules.push(compiled);
261        self.rebuild_index();
262        Ok(())
263    }
264
265    /// Add all detection rules from a parsed collection, then apply filters.
266    ///
267    /// Filter rules modify referenced detection rules by appending exclusion
268    /// conditions. Correlation rules are handled by `CorrelationEngine`.
269    /// The inverted index is rebuilt once after all rules and filters are loaded.
270    pub fn add_collection(&mut self, collection: &SigmaCollection) -> Result<()> {
271        for rule in &collection.rules {
272            let compiled = if self.pipelines.is_empty() {
273                compile_rule(rule)?
274            } else {
275                let mut transformed = rule.clone();
276                apply_pipelines(&self.pipelines, &mut transformed)?;
277                compile_rule(&transformed)?
278            };
279            self.rules.push(compiled);
280        }
281        for filter in &collection.filters {
282            self.apply_filter_no_rebuild(filter)?;
283        }
284        self.rebuild_index();
285        Ok(())
286    }
287
288    /// Add all detection rules from a collection, applying the given pipelines.
289    ///
290    /// This is a convenience method that temporarily sets pipelines, adds the
291    /// collection, then clears them. The inverted index is rebuilt once after
292    /// all rules and filters are loaded.
293    pub fn add_collection_with_pipelines(
294        &mut self,
295        collection: &SigmaCollection,
296        pipelines: &[Pipeline],
297    ) -> Result<()> {
298        let prev = std::mem::take(&mut self.pipelines);
299        self.pipelines = pipelines.to_vec();
300        self.pipelines.sort_by_key(|p| p.priority);
301        let result = self.add_collection(collection);
302        self.pipelines = prev;
303        result
304    }
305
306    /// Apply a filter rule to all referenced detection rules and rebuild the index.
307    pub fn apply_filter(&mut self, filter: &FilterRule) -> Result<()> {
308        self.apply_filter_no_rebuild(filter)?;
309        self.rebuild_index();
310        Ok(())
311    }
312
313    /// Apply a filter rule without rebuilding the index.
314    /// Used internally when multiple mutations are batched.
315    fn apply_filter_no_rebuild(&mut self, filter: &FilterRule) -> Result<()> {
316        // Compile filter detections
317        let mut filter_detections = Vec::new();
318        for (name, detection) in &filter.detection.named {
319            let compiled = compile_detection(detection)?;
320            filter_detections.push((name.clone(), compiled));
321        }
322
323        if filter_detections.is_empty() {
324            return Ok(());
325        }
326
327        let fc = self.filter_counter;
328        self.filter_counter += 1;
329
330        // Rewrite the filter's own condition expression with namespaced identifiers
331        // so that `selection` becomes `__filter_0_selection`, etc.
332        let rewritten_cond = if let Some(cond_expr) = filter.detection.conditions.first() {
333            rewrite_condition_identifiers(cond_expr, fc)
334        } else {
335            // No explicit condition: AND all detections (legacy fallback)
336            if filter_detections.len() == 1 {
337                ConditionExpr::Identifier(format!("__filter_{fc}_{}", filter_detections[0].0))
338            } else {
339                ConditionExpr::And(
340                    filter_detections
341                        .iter()
342                        .map(|(name, _)| ConditionExpr::Identifier(format!("__filter_{fc}_{name}")))
343                        .collect(),
344                )
345            }
346        };
347
348        // Find and modify referenced rules
349        let mut matched_any = false;
350        for rule in &mut self.rules {
351            let rule_matches = match &filter.rules {
352                FilterRuleTarget::Any => true,
353                FilterRuleTarget::Specific(refs) => refs
354                    .iter()
355                    .any(|r| rule.id.as_deref() == Some(r.as_str()) || rule.title == *r),
356            };
357
358            // Also check logsource compatibility if the filter specifies one
359            if rule_matches {
360                if let Some(ref filter_ls) = filter.logsource
361                    && !filter_logsource_contains(filter_ls, &rule.logsource)
362                {
363                    continue;
364                }
365
366                // Inject filter detections into the rule
367                for (name, compiled) in &filter_detections {
368                    rule.detections
369                        .insert(format!("__filter_{fc}_{name}"), compiled.clone());
370                }
371
372                // Wrap each existing rule condition with the filter condition
373                rule.conditions = rule
374                    .conditions
375                    .iter()
376                    .map(|cond| ConditionExpr::And(vec![cond.clone(), rewritten_cond.clone()]))
377                    .collect();
378                matched_any = true;
379            }
380        }
381
382        if let FilterRuleTarget::Specific(_) = &filter.rules
383            && !matched_any
384        {
385            log::warn!(
386                "filter '{}' references rules {:?} but none matched any loaded rule",
387                filter.title,
388                filter.rules
389            );
390        }
391
392        Ok(())
393    }
394
395    /// Add a pre-compiled rule directly and rebuild the index.
396    pub fn add_compiled_rule(&mut self, rule: CompiledRule) {
397        self.rules.push(rule);
398        self.rebuild_index();
399    }
400
401    /// Rebuild every per-engine index from the current rule set.
402    ///
403    /// Called after every rule mutation. Both the inverted index and the
404    /// per-field bloom filter must reflect the same view of the rules,
405    /// so they share a single rebuild path.
406    fn rebuild_index(&mut self) {
407        self.rule_index = RuleIndex::build(&self.rules);
408        self.bloom_index = match self.bloom_max_bytes {
409            Some(budget) => FieldBloomIndex::build_with_budget(&self.rules, budget),
410            None => FieldBloomIndex::build(&self.rules),
411        };
412        #[cfg(feature = "daachorse-index")]
413        {
414            if self.cross_rule_ac_enabled {
415                self.cross_rule_ac_index = cross_rule_ac::CrossRuleAcIndex::build(&self.rules);
416                self.cross_rule_ac_prunable = self
417                    .rules
418                    .iter()
419                    .map(cross_rule_ac::rule_is_ac_prunable)
420                    .collect();
421            } else {
422                self.cross_rule_ac_index = cross_rule_ac::CrossRuleAcIndex::empty();
423                self.cross_rule_ac_prunable.clear();
424            }
425        }
426    }
427
428    /// Evaluate an event against candidate rules using the inverted index.
429    pub fn evaluate<E: Event>(&self, event: &E) -> Vec<MatchResult> {
430        if self.bloom_prefilter {
431            self.evaluate_with_bloom_path(event)
432        } else {
433            self.evaluate_no_bloom_path(event)
434        }
435    }
436
437    /// Build the cross-rule AC keep-mask for `event`, or `None` when the
438    /// cross-rule index is disabled or empty (no filtering needed).
439    ///
440    /// `Some(mask)` answers "should this rule survive the cross-rule AC
441    /// filter": `mask[idx] = true` means keep, `false` means drop.
442    /// Non-AC-prunable rules are always kept.
443    #[cfg(feature = "daachorse-index")]
444    fn cross_rule_ac_keep_mask<E: Event>(&self, event: &E) -> Option<Vec<bool>> {
445        if !self.cross_rule_ac_enabled || self.cross_rule_ac_index.is_empty() {
446            return None;
447        }
448        let mut hits = vec![false; self.rules.len()];
449        self.cross_rule_ac_index.mark_hits(event, &mut hits);
450        // Compose: keep = !ac_prunable OR ac_hit. The prunable vector and
451        // the rule slice are kept aligned by `rebuild_index`.
452        for (idx, slot) in hits.iter_mut().enumerate() {
453            if !self
454                .cross_rule_ac_prunable
455                .get(idx)
456                .copied()
457                .unwrap_or(false)
458            {
459                *slot = true;
460            }
461        }
462        Some(hits)
463    }
464
465    #[cfg(not(feature = "daachorse-index"))]
466    #[inline(always)]
467    fn cross_rule_ac_keep_mask<E: Event>(&self, _event: &E) -> Option<Vec<bool>> {
468        None
469    }
470
471    fn evaluate_no_bloom_path<E: Event>(&self, event: &E) -> Vec<MatchResult> {
472        // The public `evaluate_rule` is not generic over `BloomLookup`, so
473        // the no-bloom hot path lowers to straight-line code identical to
474        // the pre-bloom engine.
475        let keep = self.cross_rule_ac_keep_mask(event);
476        let mut results = Vec::new();
477        for idx in self.rule_index.candidates(event) {
478            if let Some(ref mask) = keep
479                && !mask[idx]
480            {
481                continue;
482            }
483            let rule = &self.rules[idx];
484            if let Some(mut m) = evaluate_rule(rule, event) {
485                if self.include_event && m.event.is_none() {
486                    m.event = Some(event.to_json());
487                }
488                results.push(m);
489            }
490        }
491        results
492    }
493
494    fn evaluate_with_bloom_path<E: Event>(&self, event: &E) -> Vec<MatchResult> {
495        let bloom = BloomCache::new(&self.bloom_index, event);
496        let keep = self.cross_rule_ac_keep_mask(event);
497        let mut results = Vec::new();
498        for idx in self.rule_index.candidates(event) {
499            if let Some(ref mask) = keep
500                && !mask[idx]
501            {
502                continue;
503            }
504            let rule = &self.rules[idx];
505            if let Some(mut m) = evaluate_rule_with_bloom(rule, event, &bloom) {
506                if self.include_event && m.event.is_none() {
507                    m.event = Some(event.to_json());
508                }
509                results.push(m);
510            }
511        }
512        results
513    }
514
515    /// Evaluate an event against candidate rules matching the given logsource.
516    ///
517    /// Uses the inverted index for candidate pre-filtering, then applies the
518    /// logsource constraint. Only rules whose logsource is compatible with
519    /// `event_logsource` are evaluated.
520    pub fn evaluate_with_logsource<E: Event>(
521        &self,
522        event: &E,
523        event_logsource: &LogSource,
524    ) -> Vec<MatchResult> {
525        if self.bloom_prefilter {
526            self.evaluate_with_logsource_with_bloom(event, event_logsource)
527        } else {
528            self.evaluate_with_logsource_no_bloom(event, event_logsource)
529        }
530    }
531
532    fn evaluate_with_logsource_no_bloom<E: Event>(
533        &self,
534        event: &E,
535        event_logsource: &LogSource,
536    ) -> Vec<MatchResult> {
537        let keep = self.cross_rule_ac_keep_mask(event);
538        let mut results = Vec::new();
539        for idx in self.rule_index.candidates(event) {
540            if let Some(ref mask) = keep
541                && !mask[idx]
542            {
543                continue;
544            }
545            let rule = &self.rules[idx];
546            if logsource_matches(&rule.logsource, event_logsource)
547                && let Some(mut m) = evaluate_rule(rule, event)
548            {
549                if self.include_event && m.event.is_none() {
550                    m.event = Some(event.to_json());
551                }
552                results.push(m);
553            }
554        }
555        results
556    }
557
558    fn evaluate_with_logsource_with_bloom<E: Event>(
559        &self,
560        event: &E,
561        event_logsource: &LogSource,
562    ) -> Vec<MatchResult> {
563        let bloom = BloomCache::new(&self.bloom_index, event);
564        let keep = self.cross_rule_ac_keep_mask(event);
565        let mut results = Vec::new();
566        for idx in self.rule_index.candidates(event) {
567            if let Some(ref mask) = keep
568                && !mask[idx]
569            {
570                continue;
571            }
572            let rule = &self.rules[idx];
573            if logsource_matches(&rule.logsource, event_logsource)
574                && let Some(mut m) = evaluate_rule_with_bloom(rule, event, &bloom)
575            {
576                if self.include_event && m.event.is_none() {
577                    m.event = Some(event.to_json());
578                }
579                results.push(m);
580            }
581        }
582        results
583    }
584
585    /// Evaluate a batch of events, returning per-event match results.
586    ///
587    /// When the `parallel` feature is enabled, events are evaluated concurrently
588    /// using rayon's work-stealing thread pool. Otherwise, falls back to
589    /// sequential evaluation.
590    pub fn evaluate_batch<E: Event + Sync>(&self, events: &[&E]) -> Vec<Vec<MatchResult>> {
591        #[cfg(feature = "parallel")]
592        {
593            use rayon::prelude::*;
594            events.par_iter().map(|e| self.evaluate(e)).collect()
595        }
596        #[cfg(not(feature = "parallel"))]
597        {
598            events.iter().map(|e| self.evaluate(e)).collect()
599        }
600    }
601
602    /// Number of rules loaded in the engine.
603    pub fn rule_count(&self) -> usize {
604        self.rules.len()
605    }
606
607    /// Access the compiled rules.
608    pub fn rules(&self) -> &[CompiledRule] {
609        &self.rules
610    }
611}
612
613impl Default for Engine {
614    fn default() -> Self {
615        Self::new()
616    }
617}