rsigma_eval/engine/mod.rs
1//! Rule evaluation engine with logsource routing.
2//!
3//! The `Engine` manages a set of compiled Sigma rules and evaluates events
4//! against them. It supports optional logsource-based pre-filtering to
5//! reduce the number of rules evaluated per event.
6
7pub(crate) mod bloom_index;
8#[cfg(feature = "daachorse-index")]
9pub(crate) mod cross_rule_ac;
10mod filters;
11#[cfg(test)]
12mod tests;
13
14use rsigma_parser::{
15 ConditionExpr, FilterRule, FilterRuleTarget, LogSource, SigmaCollection, SigmaRule,
16};
17
18use crate::compiler::{
19 CompiledRule, compile_detection, compile_rule, evaluate_rule, evaluate_rule_with_bloom,
20};
21use crate::error::Result;
22use crate::event::Event;
23use crate::pipeline::{Pipeline, apply_pipelines};
24use crate::result::MatchResult;
25use crate::rule_index::RuleIndex;
26
27use bloom_index::{BloomCache, FieldBloomIndex};
28
29use filters::{filter_logsource_contains, logsource_matches, rewrite_condition_identifiers};
30
31/// The main rule evaluation engine.
32///
33/// Holds a set of compiled rules and provides methods to evaluate events
34/// against them. Supports optional logsource routing for performance.
35///
36/// # Example
37///
38/// ```rust
39/// use rsigma_parser::parse_sigma_yaml;
40/// use rsigma_eval::{Engine, Event};
41/// use rsigma_eval::event::JsonEvent;
42/// use serde_json::json;
43///
44/// let yaml = r#"
45/// title: Detect Whoami
46/// logsource:
47/// product: windows
48/// category: process_creation
49/// detection:
50/// selection:
51/// CommandLine|contains: 'whoami'
52/// condition: selection
53/// level: medium
54/// "#;
55///
56/// let collection = parse_sigma_yaml(yaml).unwrap();
57/// let mut engine = Engine::new();
58/// engine.add_collection(&collection).unwrap();
59///
60/// let event_val = json!({"CommandLine": "cmd /c whoami"});
61/// let event = JsonEvent::borrow(&event_val);
62/// let matches = engine.evaluate(&event);
63/// assert_eq!(matches.len(), 1);
64/// assert_eq!(matches[0].rule_title, "Detect Whoami");
65/// ```
66pub struct Engine {
67 rules: Vec<CompiledRule>,
68 pipelines: Vec<Pipeline>,
69 /// Global override: include the full event JSON in all match results.
70 /// When `true`, overrides per-rule `rsigma.include_event` custom attributes.
71 include_event: bool,
72 /// Monotonic counter used to namespace injected filter detections,
73 /// preventing key collisions when multiple filters share detection names.
74 filter_counter: usize,
75 /// Inverted index mapping `(field, exact_value)` to candidate rule indices.
76 /// Rebuilt after every rule mutation (add, filter).
77 rule_index: RuleIndex,
78 /// Per-field bloom filter over positive substring needles. Rebuilt
79 /// alongside `rule_index`. Consulted only when `bloom_prefilter` is
80 /// enabled.
81 bloom_index: FieldBloomIndex,
82 /// Toggle for bloom pre-filtering. Off by default: the per-event probe
83 /// overhead exceeds the savings on rule sets where most events overlap
84 /// with at least one needle's trigrams. Workloads with many substring
85 /// rules and mostly-non-matching events (e.g. high-volume telemetry
86 /// streams against an active threat-intel ruleset) opt in via
87 /// [`Engine::set_bloom_prefilter`].
88 bloom_prefilter: bool,
89 /// Memory budget the bloom builder is allowed to consume across all
90 /// per-field filters. `None` means use the crate default
91 /// (`bloom_index::DEFAULT_MAX_TOTAL_BYTES`, 1 MB).
92 bloom_max_bytes: Option<usize>,
93 /// Cross-rule Aho-Corasick index for substring patterns, gated on the
94 /// `daachorse-index` feature. Built only when [`cross_rule_ac_enabled`]
95 /// is `true`; [`cross_rule_ac_prunable`] is the conservative per-rule
96 /// flag computed at the same time so the `evaluate` hot path can drop
97 /// rules safely.
98 ///
99 /// [`cross_rule_ac_enabled`]: Self::cross_rule_ac_enabled
100 /// [`cross_rule_ac_prunable`]: Self::cross_rule_ac_prunable
101 #[cfg(feature = "daachorse-index")]
102 cross_rule_ac_index: cross_rule_ac::CrossRuleAcIndex,
103 /// Toggle for the cross-rule AC pre-filter. Off by default; the index
104 /// only pays off on rule sets > 5K rules with many shared substring
105 /// patterns. See [`Engine::set_cross_rule_ac`].
106 #[cfg(feature = "daachorse-index")]
107 cross_rule_ac_enabled: bool,
108 /// Per-rule conservative AC-prunability flag. `true` iff the rule's
109 /// firing requires at least one positive substring match (no `Exact`,
110 /// `Regex`, `Numeric`, `Not`, etc.), so dropping the rule on a
111 /// "no AC hit" verdict is provably correct.
112 #[cfg(feature = "daachorse-index")]
113 cross_rule_ac_prunable: Vec<bool>,
114}
115
116impl Engine {
117 /// Create a new empty engine.
118 pub fn new() -> Self {
119 Engine {
120 rules: Vec::new(),
121 pipelines: Vec::new(),
122 include_event: false,
123 filter_counter: 0,
124 rule_index: RuleIndex::empty(),
125 bloom_index: FieldBloomIndex::empty(),
126 bloom_prefilter: false,
127 bloom_max_bytes: None,
128 #[cfg(feature = "daachorse-index")]
129 cross_rule_ac_index: cross_rule_ac::CrossRuleAcIndex::empty(),
130 #[cfg(feature = "daachorse-index")]
131 cross_rule_ac_enabled: false,
132 #[cfg(feature = "daachorse-index")]
133 cross_rule_ac_prunable: Vec::new(),
134 }
135 }
136
137 /// Create a new engine with a pipeline.
138 pub fn new_with_pipeline(pipeline: Pipeline) -> Self {
139 Engine {
140 rules: Vec::new(),
141 pipelines: vec![pipeline],
142 include_event: false,
143 filter_counter: 0,
144 rule_index: RuleIndex::empty(),
145 bloom_index: FieldBloomIndex::empty(),
146 bloom_prefilter: false,
147 bloom_max_bytes: None,
148 #[cfg(feature = "daachorse-index")]
149 cross_rule_ac_index: cross_rule_ac::CrossRuleAcIndex::empty(),
150 #[cfg(feature = "daachorse-index")]
151 cross_rule_ac_enabled: false,
152 #[cfg(feature = "daachorse-index")]
153 cross_rule_ac_prunable: Vec::new(),
154 }
155 }
156
157 /// Enable or disable bloom-filter pre-filtering of positive substring
158 /// detection items.
159 ///
160 /// When enabled, `evaluate*` short-circuits any positive substring
161 /// matcher (`Contains` / `StartsWith` / `EndsWith` / `AhoCorasickSet`,
162 /// alone or wrapped in `CaseInsensitiveGroup`) whose field cannot
163 /// possibly contain a needle trigram.
164 ///
165 /// Disabled by default. The per-event probe (trigram extraction +
166 /// double hashing) costs ~1 µs on a typical CommandLine field, which
167 /// outweighs the savings on rule sets where most events overlap with
168 /// at least one needle. Enable for workloads that pair many substring
169 /// rules with mostly-non-matching events; benchmark with
170 /// `eval_bloom_rejection` before flipping it on in production.
171 pub fn set_bloom_prefilter(&mut self, enabled: bool) {
172 self.bloom_prefilter = enabled;
173 }
174
175 /// Returns whether bloom pre-filtering is currently enabled.
176 pub fn bloom_prefilter_enabled(&self) -> bool {
177 self.bloom_prefilter
178 }
179
180 /// Set the memory budget for the per-field bloom index.
181 ///
182 /// Must be called **before** `add_collection` / `add_rule` for the new
183 /// budget to take effect on the existing rule set; otherwise it is
184 /// applied at the next index rebuild. The default budget is 1 MB,
185 /// shared across all per-field filters. Lower the cap on memory-
186 /// constrained deployments; raise it for large rule sets where the
187 /// default starts evicting useful filters.
188 pub fn set_bloom_max_bytes(&mut self, max_bytes: usize) {
189 self.bloom_max_bytes = Some(max_bytes);
190 if !self.rules.is_empty() {
191 self.rebuild_index();
192 }
193 }
194
195 /// Returns the configured bloom memory budget, if one has been set
196 /// explicitly. `None` means the crate default (1 MB) is in use.
197 pub fn bloom_max_bytes(&self) -> Option<usize> {
198 self.bloom_max_bytes
199 }
200
201 /// Enable or disable the cross-rule Aho-Corasick pre-filter.
202 ///
203 /// When enabled, the engine builds a single per-field
204 /// `DoubleArrayAhoCorasick` automaton over every positive substring
205 /// needle from every rule and drops AC-prunable rules from the
206 /// candidate set when none of their patterns hit the event.
207 ///
208 /// Off by default. Pays off on large rule sets (> ~5K rules) with many
209 /// shared substring patterns (threat-intel feeds, IOC packs). For
210 /// smaller rule sets the per-rule [`AhoCorasickSet`] matcher already
211 /// handles the workload optimally; the cross-rule index only adds
212 /// build-time and lookup overhead. Benchmark with `eval_cross_rule_ac`
213 /// against representative rule sets before enabling in production.
214 ///
215 /// Available behind the `daachorse-index` Cargo feature.
216 ///
217 /// [`AhoCorasickSet`]: crate::matcher::CompiledMatcher::AhoCorasickSet
218 #[cfg(feature = "daachorse-index")]
219 pub fn set_cross_rule_ac(&mut self, enabled: bool) {
220 self.cross_rule_ac_enabled = enabled;
221 if enabled && !self.rules.is_empty() {
222 self.rebuild_index();
223 }
224 }
225
226 /// Returns whether the cross-rule AC pre-filter is currently enabled.
227 /// Available behind the `daachorse-index` Cargo feature.
228 #[cfg(feature = "daachorse-index")]
229 pub fn cross_rule_ac_enabled(&self) -> bool {
230 self.cross_rule_ac_enabled
231 }
232
233 /// Set global `include_event` — when `true`, all match results include
234 /// the full event JSON regardless of per-rule custom attributes.
235 pub fn set_include_event(&mut self, include: bool) {
236 self.include_event = include;
237 }
238
239 /// Add a pipeline to the engine.
240 ///
241 /// Pipelines are applied to rules during `add_rule` / `add_collection`.
242 /// Only affects rules added **after** this call.
243 pub fn add_pipeline(&mut self, pipeline: Pipeline) {
244 self.pipelines.push(pipeline);
245 self.pipelines.sort_by_key(|p| p.priority);
246 }
247
248 /// Add a single parsed Sigma rule.
249 ///
250 /// If pipelines are set, the rule is cloned and transformed before compilation.
251 /// The inverted index is rebuilt after adding the rule.
252 pub fn add_rule(&mut self, rule: &SigmaRule) -> Result<()> {
253 let compiled = if self.pipelines.is_empty() {
254 compile_rule(rule)?
255 } else {
256 let mut transformed = rule.clone();
257 apply_pipelines(&self.pipelines, &mut transformed)?;
258 compile_rule(&transformed)?
259 };
260 self.rules.push(compiled);
261 self.rebuild_index();
262 Ok(())
263 }
264
265 /// Add all detection rules from a parsed collection, then apply filters.
266 ///
267 /// Filter rules modify referenced detection rules by appending exclusion
268 /// conditions. Correlation rules are handled by `CorrelationEngine`.
269 /// The inverted index is rebuilt once after all rules and filters are loaded.
270 pub fn add_collection(&mut self, collection: &SigmaCollection) -> Result<()> {
271 for rule in &collection.rules {
272 let compiled = if self.pipelines.is_empty() {
273 compile_rule(rule)?
274 } else {
275 let mut transformed = rule.clone();
276 apply_pipelines(&self.pipelines, &mut transformed)?;
277 compile_rule(&transformed)?
278 };
279 self.rules.push(compiled);
280 }
281 for filter in &collection.filters {
282 self.apply_filter_no_rebuild(filter)?;
283 }
284 self.rebuild_index();
285 Ok(())
286 }
287
288 /// Add all detection rules from a collection, applying the given pipelines.
289 ///
290 /// This is a convenience method that temporarily sets pipelines, adds the
291 /// collection, then clears them. The inverted index is rebuilt once after
292 /// all rules and filters are loaded.
293 pub fn add_collection_with_pipelines(
294 &mut self,
295 collection: &SigmaCollection,
296 pipelines: &[Pipeline],
297 ) -> Result<()> {
298 let prev = std::mem::take(&mut self.pipelines);
299 self.pipelines = pipelines.to_vec();
300 self.pipelines.sort_by_key(|p| p.priority);
301 let result = self.add_collection(collection);
302 self.pipelines = prev;
303 result
304 }
305
306 /// Apply a filter rule to all referenced detection rules and rebuild the index.
307 pub fn apply_filter(&mut self, filter: &FilterRule) -> Result<()> {
308 self.apply_filter_no_rebuild(filter)?;
309 self.rebuild_index();
310 Ok(())
311 }
312
313 /// Apply a filter rule without rebuilding the index.
314 /// Used internally when multiple mutations are batched.
315 fn apply_filter_no_rebuild(&mut self, filter: &FilterRule) -> Result<()> {
316 // Compile filter detections
317 let mut filter_detections = Vec::new();
318 for (name, detection) in &filter.detection.named {
319 let compiled = compile_detection(detection)?;
320 filter_detections.push((name.clone(), compiled));
321 }
322
323 if filter_detections.is_empty() {
324 return Ok(());
325 }
326
327 let fc = self.filter_counter;
328 self.filter_counter += 1;
329
330 // Rewrite the filter's own condition expression with namespaced identifiers
331 // so that `selection` becomes `__filter_0_selection`, etc.
332 let rewritten_cond = if let Some(cond_expr) = filter.detection.conditions.first() {
333 rewrite_condition_identifiers(cond_expr, fc)
334 } else {
335 // No explicit condition: AND all detections (legacy fallback)
336 if filter_detections.len() == 1 {
337 ConditionExpr::Identifier(format!("__filter_{fc}_{}", filter_detections[0].0))
338 } else {
339 ConditionExpr::And(
340 filter_detections
341 .iter()
342 .map(|(name, _)| ConditionExpr::Identifier(format!("__filter_{fc}_{name}")))
343 .collect(),
344 )
345 }
346 };
347
348 // Find and modify referenced rules
349 let mut matched_any = false;
350 for rule in &mut self.rules {
351 let rule_matches = match &filter.rules {
352 FilterRuleTarget::Any => true,
353 FilterRuleTarget::Specific(refs) => refs
354 .iter()
355 .any(|r| rule.id.as_deref() == Some(r.as_str()) || rule.title == *r),
356 };
357
358 // Also check logsource compatibility if the filter specifies one
359 if rule_matches {
360 if let Some(ref filter_ls) = filter.logsource
361 && !filter_logsource_contains(filter_ls, &rule.logsource)
362 {
363 continue;
364 }
365
366 // Inject filter detections into the rule
367 for (name, compiled) in &filter_detections {
368 rule.detections
369 .insert(format!("__filter_{fc}_{name}"), compiled.clone());
370 }
371
372 // Wrap each existing rule condition with the filter condition
373 rule.conditions = rule
374 .conditions
375 .iter()
376 .map(|cond| ConditionExpr::And(vec![cond.clone(), rewritten_cond.clone()]))
377 .collect();
378 matched_any = true;
379 }
380 }
381
382 if let FilterRuleTarget::Specific(_) = &filter.rules
383 && !matched_any
384 {
385 log::warn!(
386 "filter '{}' references rules {:?} but none matched any loaded rule",
387 filter.title,
388 filter.rules
389 );
390 }
391
392 Ok(())
393 }
394
395 /// Add a pre-compiled rule directly and rebuild the index.
396 pub fn add_compiled_rule(&mut self, rule: CompiledRule) {
397 self.rules.push(rule);
398 self.rebuild_index();
399 }
400
401 /// Rebuild every per-engine index from the current rule set.
402 ///
403 /// Called after every rule mutation. Both the inverted index and the
404 /// per-field bloom filter must reflect the same view of the rules,
405 /// so they share a single rebuild path.
406 fn rebuild_index(&mut self) {
407 self.rule_index = RuleIndex::build(&self.rules);
408 self.bloom_index = match self.bloom_max_bytes {
409 Some(budget) => FieldBloomIndex::build_with_budget(&self.rules, budget),
410 None => FieldBloomIndex::build(&self.rules),
411 };
412 #[cfg(feature = "daachorse-index")]
413 {
414 if self.cross_rule_ac_enabled {
415 self.cross_rule_ac_index = cross_rule_ac::CrossRuleAcIndex::build(&self.rules);
416 self.cross_rule_ac_prunable = self
417 .rules
418 .iter()
419 .map(cross_rule_ac::rule_is_ac_prunable)
420 .collect();
421 } else {
422 self.cross_rule_ac_index = cross_rule_ac::CrossRuleAcIndex::empty();
423 self.cross_rule_ac_prunable.clear();
424 }
425 }
426 }
427
428 /// Evaluate an event against candidate rules using the inverted index.
429 pub fn evaluate<E: Event>(&self, event: &E) -> Vec<MatchResult> {
430 if self.bloom_prefilter {
431 self.evaluate_with_bloom_path(event)
432 } else {
433 self.evaluate_no_bloom_path(event)
434 }
435 }
436
437 /// Build the cross-rule AC keep-mask for `event`, or `None` when the
438 /// cross-rule index is disabled or empty (no filtering needed).
439 ///
440 /// `Some(mask)` answers "should this rule survive the cross-rule AC
441 /// filter": `mask[idx] = true` means keep, `false` means drop.
442 /// Non-AC-prunable rules are always kept.
443 #[cfg(feature = "daachorse-index")]
444 fn cross_rule_ac_keep_mask<E: Event>(&self, event: &E) -> Option<Vec<bool>> {
445 if !self.cross_rule_ac_enabled || self.cross_rule_ac_index.is_empty() {
446 return None;
447 }
448 let mut hits = vec![false; self.rules.len()];
449 self.cross_rule_ac_index.mark_hits(event, &mut hits);
450 // Compose: keep = !ac_prunable OR ac_hit. The prunable vector and
451 // the rule slice are kept aligned by `rebuild_index`.
452 for (idx, slot) in hits.iter_mut().enumerate() {
453 if !self
454 .cross_rule_ac_prunable
455 .get(idx)
456 .copied()
457 .unwrap_or(false)
458 {
459 *slot = true;
460 }
461 }
462 Some(hits)
463 }
464
465 #[cfg(not(feature = "daachorse-index"))]
466 #[inline(always)]
467 fn cross_rule_ac_keep_mask<E: Event>(&self, _event: &E) -> Option<Vec<bool>> {
468 None
469 }
470
471 fn evaluate_no_bloom_path<E: Event>(&self, event: &E) -> Vec<MatchResult> {
472 // The public `evaluate_rule` is not generic over `BloomLookup`, so
473 // the no-bloom hot path lowers to straight-line code identical to
474 // the pre-bloom engine.
475 let keep = self.cross_rule_ac_keep_mask(event);
476 let mut results = Vec::new();
477 for idx in self.rule_index.candidates(event) {
478 if let Some(ref mask) = keep
479 && !mask[idx]
480 {
481 continue;
482 }
483 let rule = &self.rules[idx];
484 if let Some(mut m) = evaluate_rule(rule, event) {
485 if self.include_event && m.event.is_none() {
486 m.event = Some(event.to_json());
487 }
488 results.push(m);
489 }
490 }
491 results
492 }
493
494 fn evaluate_with_bloom_path<E: Event>(&self, event: &E) -> Vec<MatchResult> {
495 let bloom = BloomCache::new(&self.bloom_index, event);
496 let keep = self.cross_rule_ac_keep_mask(event);
497 let mut results = Vec::new();
498 for idx in self.rule_index.candidates(event) {
499 if let Some(ref mask) = keep
500 && !mask[idx]
501 {
502 continue;
503 }
504 let rule = &self.rules[idx];
505 if let Some(mut m) = evaluate_rule_with_bloom(rule, event, &bloom) {
506 if self.include_event && m.event.is_none() {
507 m.event = Some(event.to_json());
508 }
509 results.push(m);
510 }
511 }
512 results
513 }
514
515 /// Evaluate an event against candidate rules matching the given logsource.
516 ///
517 /// Uses the inverted index for candidate pre-filtering, then applies the
518 /// logsource constraint. Only rules whose logsource is compatible with
519 /// `event_logsource` are evaluated.
520 pub fn evaluate_with_logsource<E: Event>(
521 &self,
522 event: &E,
523 event_logsource: &LogSource,
524 ) -> Vec<MatchResult> {
525 if self.bloom_prefilter {
526 self.evaluate_with_logsource_with_bloom(event, event_logsource)
527 } else {
528 self.evaluate_with_logsource_no_bloom(event, event_logsource)
529 }
530 }
531
532 fn evaluate_with_logsource_no_bloom<E: Event>(
533 &self,
534 event: &E,
535 event_logsource: &LogSource,
536 ) -> Vec<MatchResult> {
537 let keep = self.cross_rule_ac_keep_mask(event);
538 let mut results = Vec::new();
539 for idx in self.rule_index.candidates(event) {
540 if let Some(ref mask) = keep
541 && !mask[idx]
542 {
543 continue;
544 }
545 let rule = &self.rules[idx];
546 if logsource_matches(&rule.logsource, event_logsource)
547 && let Some(mut m) = evaluate_rule(rule, event)
548 {
549 if self.include_event && m.event.is_none() {
550 m.event = Some(event.to_json());
551 }
552 results.push(m);
553 }
554 }
555 results
556 }
557
558 fn evaluate_with_logsource_with_bloom<E: Event>(
559 &self,
560 event: &E,
561 event_logsource: &LogSource,
562 ) -> Vec<MatchResult> {
563 let bloom = BloomCache::new(&self.bloom_index, event);
564 let keep = self.cross_rule_ac_keep_mask(event);
565 let mut results = Vec::new();
566 for idx in self.rule_index.candidates(event) {
567 if let Some(ref mask) = keep
568 && !mask[idx]
569 {
570 continue;
571 }
572 let rule = &self.rules[idx];
573 if logsource_matches(&rule.logsource, event_logsource)
574 && let Some(mut m) = evaluate_rule_with_bloom(rule, event, &bloom)
575 {
576 if self.include_event && m.event.is_none() {
577 m.event = Some(event.to_json());
578 }
579 results.push(m);
580 }
581 }
582 results
583 }
584
585 /// Evaluate a batch of events, returning per-event match results.
586 ///
587 /// When the `parallel` feature is enabled, events are evaluated concurrently
588 /// using rayon's work-stealing thread pool. Otherwise, falls back to
589 /// sequential evaluation.
590 pub fn evaluate_batch<E: Event + Sync>(&self, events: &[&E]) -> Vec<Vec<MatchResult>> {
591 #[cfg(feature = "parallel")]
592 {
593 use rayon::prelude::*;
594 events.par_iter().map(|e| self.evaluate(e)).collect()
595 }
596 #[cfg(not(feature = "parallel"))]
597 {
598 events.iter().map(|e| self.evaluate(e)).collect()
599 }
600 }
601
602 /// Number of rules loaded in the engine.
603 pub fn rule_count(&self) -> usize {
604 self.rules.len()
605 }
606
607 /// Access the compiled rules.
608 pub fn rules(&self) -> &[CompiledRule] {
609 &self.rules
610 }
611}
612
613impl Default for Engine {
614 fn default() -> Self {
615 Self::new()
616 }
617}