rsigma_eval/engine/mod.rs
1//! Rule evaluation engine with logsource routing.
2//!
3//! The `Engine` manages a set of compiled Sigma rules and evaluates events
4//! against them. It supports optional logsource-based pre-filtering to
5//! reduce the number of rules evaluated per event.
6
7pub(crate) mod bloom_index;
8#[cfg(feature = "daachorse-index")]
9pub(crate) mod cross_rule_ac;
10mod filters;
11#[cfg(test)]
12mod tests;
13
14use std::sync::atomic::{AtomicU64, Ordering};
15
16use rsigma_parser::{
17 ConditionExpr, FilterRule, FilterRuleTarget, LogSource, SigmaCollection, SigmaRule,
18};
19
20use crate::compiler::{CompiledRule, compile_detection, compile_rule, evaluate_rule_with_bloom};
21use crate::error::{EvalError, Result};
22use crate::event::Event;
23use crate::logsource::LogSourceExtractor;
24use crate::pipeline::{Pipeline, apply_pipelines};
25use crate::result::{EvaluationResult, MatchDetailLevel};
26use crate::rule_index::RuleIndex;
27
28use bloom_index::{BloomCache, FieldBloomIndex};
29
30use filters::{
31 filter_logsource_contains, logsource_compatible, logsource_matches,
32 rewrite_condition_identifiers,
33};
34
35/// The main rule evaluation engine.
36///
37/// Holds a set of compiled rules and provides methods to evaluate events
38/// against them. Supports optional logsource routing for performance.
39///
40/// # Example
41///
42/// ```rust
43/// use rsigma_parser::parse_sigma_yaml;
44/// use rsigma_eval::{Engine, Event};
45/// use rsigma_eval::event::JsonEvent;
46/// use serde_json::json;
47///
48/// let yaml = r#"
49/// title: Detect Whoami
50/// logsource:
51/// product: windows
52/// category: process_creation
53/// detection:
54/// selection:
55/// CommandLine|contains: 'whoami'
56/// condition: selection
57/// level: medium
58/// "#;
59///
60/// let collection = parse_sigma_yaml(yaml).unwrap();
61/// let mut engine = Engine::new();
62/// engine.add_collection(&collection).unwrap();
63///
64/// let event_val = json!({"CommandLine": "cmd /c whoami"});
65/// let event = JsonEvent::borrow(&event_val);
66/// let matches = engine.evaluate(&event);
67/// assert_eq!(matches.len(), 1);
68/// assert_eq!(matches[0].header.rule_title, "Detect Whoami");
69/// ```
70pub struct Engine {
71 rules: Vec<CompiledRule>,
72 pipelines: Vec<Pipeline>,
73 /// Global override: include the full event JSON in all match results.
74 /// When `true`, overrides per-rule `rsigma.include_event` custom attributes.
75 include_event: bool,
76 /// Verbosity of the match detail recorded on detection results.
77 /// `Off` by default, which preserves the historical `{ field, value }`
78 /// wire shape. See [`Engine::set_match_detail`].
79 match_detail: MatchDetailLevel,
80 /// Monotonic counter used to namespace injected filter detections,
81 /// preventing key collisions when multiple filters share detection names.
82 filter_counter: usize,
83 /// Inverted index mapping `(field, exact_value)` to candidate rule indices.
84 /// Rebuilt after every rule mutation (add, filter).
85 rule_index: RuleIndex,
86 /// Per-field bloom filter over positive substring needles. Rebuilt
87 /// alongside `rule_index`. Consulted only when `bloom_prefilter` is
88 /// enabled.
89 bloom_index: FieldBloomIndex,
90 /// Toggle for bloom pre-filtering. Off by default: the per-event probe
91 /// overhead exceeds the savings on rule sets where most events overlap
92 /// with at least one needle's trigrams. Workloads with many substring
93 /// rules and mostly-non-matching events (e.g. high-volume telemetry
94 /// streams against an active threat-intel ruleset) opt in via
95 /// [`Engine::set_bloom_prefilter`].
96 bloom_prefilter: bool,
97 /// Memory budget the bloom builder is allowed to consume across all
98 /// per-field filters. `None` means use the crate default
99 /// (`bloom_index::DEFAULT_MAX_TOTAL_BYTES`, 1 MB).
100 bloom_max_bytes: Option<usize>,
101 /// Opt-in event-logsource extractor for conflict-based rule pruning.
102 /// `None` (default) leaves the hot path unchanged; when `Some`, the
103 /// engine extracts each event's logsource once and skips rules whose
104 /// logsource conflicts (see [`Engine::set_logsource_extractor`]).
105 logsource_extractor: Option<LogSourceExtractor>,
106 /// Monotonic count of always-evaluated rules skipped because their
107 /// product conflicts with the event's. Incremented only when an extractor
108 /// is set; surfaced via [`Engine::logsource_pruned_total`].
109 logsource_pruned: AtomicU64,
110 /// Monotonic count of `evaluate` calls where the extractor produced no
111 /// logsource at all (fail-open: every rule was evaluated). Surfaced via
112 /// [`Engine::logsource_absent_total`].
113 logsource_absent: AtomicU64,
114 /// Cross-rule Aho-Corasick index for substring patterns, gated on the
115 /// `daachorse-index` feature. Built only when [`cross_rule_ac_enabled`]
116 /// is `true`; [`cross_rule_ac_prunable`] is the conservative per-rule
117 /// flag computed at the same time so the `evaluate` hot path can drop
118 /// rules safely.
119 ///
120 /// [`cross_rule_ac_enabled`]: Self::cross_rule_ac_enabled
121 /// [`cross_rule_ac_prunable`]: Self::cross_rule_ac_prunable
122 #[cfg(feature = "daachorse-index")]
123 cross_rule_ac_index: cross_rule_ac::CrossRuleAcIndex,
124 /// Toggle for the cross-rule AC pre-filter. Off by default; the index
125 /// only pays off on rule sets > 5K rules with many shared substring
126 /// patterns. See [`Engine::set_cross_rule_ac`].
127 #[cfg(feature = "daachorse-index")]
128 cross_rule_ac_enabled: bool,
129 /// Per-rule conservative AC-prunability flag. `true` iff the rule's
130 /// firing requires at least one positive substring match (no `Exact`,
131 /// `Regex`, `Numeric`, `Not`, etc.), so dropping the rule on a
132 /// "no AC hit" verdict is provably correct.
133 #[cfg(feature = "daachorse-index")]
134 cross_rule_ac_prunable: Vec<bool>,
135}
136
137impl Engine {
138 /// Create a new empty engine.
139 pub fn new() -> Self {
140 Engine {
141 rules: Vec::new(),
142 pipelines: Vec::new(),
143 include_event: false,
144 match_detail: MatchDetailLevel::Off,
145 filter_counter: 0,
146 rule_index: RuleIndex::empty(),
147 bloom_index: FieldBloomIndex::empty(),
148 bloom_prefilter: false,
149 bloom_max_bytes: None,
150 logsource_extractor: None,
151 logsource_pruned: AtomicU64::new(0),
152 logsource_absent: AtomicU64::new(0),
153 #[cfg(feature = "daachorse-index")]
154 cross_rule_ac_index: cross_rule_ac::CrossRuleAcIndex::empty(),
155 #[cfg(feature = "daachorse-index")]
156 cross_rule_ac_enabled: false,
157 #[cfg(feature = "daachorse-index")]
158 cross_rule_ac_prunable: Vec::new(),
159 }
160 }
161
162 /// Create a new engine with a pipeline.
163 pub fn new_with_pipeline(pipeline: Pipeline) -> Self {
164 Engine {
165 rules: Vec::new(),
166 pipelines: vec![pipeline],
167 include_event: false,
168 match_detail: MatchDetailLevel::Off,
169 filter_counter: 0,
170 rule_index: RuleIndex::empty(),
171 bloom_index: FieldBloomIndex::empty(),
172 bloom_prefilter: false,
173 bloom_max_bytes: None,
174 logsource_extractor: None,
175 logsource_pruned: AtomicU64::new(0),
176 logsource_absent: AtomicU64::new(0),
177 #[cfg(feature = "daachorse-index")]
178 cross_rule_ac_index: cross_rule_ac::CrossRuleAcIndex::empty(),
179 #[cfg(feature = "daachorse-index")]
180 cross_rule_ac_enabled: false,
181 #[cfg(feature = "daachorse-index")]
182 cross_rule_ac_prunable: Vec::new(),
183 }
184 }
185
186 /// Enable or disable bloom-filter pre-filtering of positive substring
187 /// detection items.
188 ///
189 /// When enabled, `evaluate*` short-circuits any positive substring
190 /// matcher (`Contains` / `StartsWith` / `EndsWith` / `AhoCorasickSet`,
191 /// alone or wrapped in `CaseInsensitiveGroup`) whose field cannot
192 /// possibly contain a needle trigram.
193 ///
194 /// Disabled by default. The per-event probe (trigram extraction +
195 /// double hashing) costs ~1 µs on a typical CommandLine field, which
196 /// outweighs the savings on rule sets where most events overlap with
197 /// at least one needle. Enable for workloads that pair many substring
198 /// rules with mostly-non-matching events; benchmark with
199 /// `eval_bloom_rejection` before flipping it on in production.
200 pub fn set_bloom_prefilter(&mut self, enabled: bool) {
201 self.bloom_prefilter = enabled;
202 }
203
204 /// Returns whether bloom pre-filtering is currently enabled.
205 pub fn bloom_prefilter_enabled(&self) -> bool {
206 self.bloom_prefilter
207 }
208
209 /// Set the memory budget for the per-field bloom index.
210 ///
211 /// Must be called **before** `add_collection` / `add_rule` for the new
212 /// budget to take effect on the existing rule set; otherwise it is
213 /// applied at the next index rebuild. The default budget is 1 MB,
214 /// shared across all per-field filters. Lower the cap on memory-
215 /// constrained deployments; raise it for large rule sets where the
216 /// default starts evicting useful filters.
217 pub fn set_bloom_max_bytes(&mut self, max_bytes: usize) {
218 self.bloom_max_bytes = Some(max_bytes);
219 if !self.rules.is_empty() {
220 self.rebuild_index();
221 }
222 }
223
224 /// Returns the configured bloom memory budget, if one has been set
225 /// explicitly. `None` means the crate default (1 MB) is in use.
226 pub fn bloom_max_bytes(&self) -> Option<usize> {
227 self.bloom_max_bytes
228 }
229
230 /// Enable or disable opt-in, conflict-based logsource pruning.
231 ///
232 /// When set to `Some`, `evaluate` extracts each event's logsource once via
233 /// the [`LogSourceExtractor`] and skips any candidate rule whose logsource
234 /// conflicts with the event's (a dimension set on both sides that
235 /// disagrees). A dimension unset on either side is a wildcard, so an event
236 /// tagged only `product: windows` skips `product: linux` rules while still
237 /// evaluating Windows-category and logsource-less rules.
238 ///
239 /// Disabled by default (`None`), leaving the hot path unchanged. Pruning
240 /// fails open: an event with no extractable logsource evaluates every
241 /// rule. The extractor is read on every `evaluate` call, so it can be
242 /// swapped at runtime (e.g. carried across a hot-reload).
243 pub fn set_logsource_extractor(&mut self, extractor: Option<LogSourceExtractor>) {
244 self.logsource_extractor = extractor;
245 }
246
247 /// Returns the configured logsource extractor, if any. `None` means
248 /// logsource pruning is disabled.
249 pub fn logsource_extractor(&self) -> Option<&LogSourceExtractor> {
250 self.logsource_extractor.as_ref()
251 }
252
253 /// Total always-evaluated rules skipped by logsource product pruning since
254 /// engine creation. Zero unless an extractor is set.
255 pub fn logsource_pruned_total(&self) -> u64 {
256 self.logsource_pruned.load(Ordering::Relaxed)
257 }
258
259 /// Total `evaluate` calls where the extractor produced no logsource and
260 /// pruning failed open (every rule evaluated). Zero unless an extractor
261 /// is set.
262 pub fn logsource_absent_total(&self) -> u64 {
263 self.logsource_absent.load(Ordering::Relaxed)
264 }
265
266 /// Enable or disable the cross-rule Aho-Corasick pre-filter.
267 ///
268 /// When enabled, the engine builds a single per-field
269 /// `DoubleArrayAhoCorasick` automaton over every positive substring
270 /// needle from every rule and drops AC-prunable rules from the
271 /// candidate set when none of their patterns hit the event.
272 ///
273 /// Off by default. Pays off on large rule sets (> ~5K rules) with many
274 /// shared substring patterns (threat-intel feeds, IOC packs). For
275 /// smaller rule sets the per-rule [`AhoCorasickSet`] matcher already
276 /// handles the workload optimally; the cross-rule index only adds
277 /// build-time and lookup overhead. Benchmark with `eval_cross_rule_ac`
278 /// against representative rule sets before enabling in production.
279 ///
280 /// Available behind the `daachorse-index` Cargo feature.
281 ///
282 /// [`AhoCorasickSet`]: crate::matcher::CompiledMatcher::AhoCorasickSet
283 #[cfg(feature = "daachorse-index")]
284 pub fn set_cross_rule_ac(&mut self, enabled: bool) {
285 self.cross_rule_ac_enabled = enabled;
286 if enabled && !self.rules.is_empty() {
287 self.rebuild_index();
288 }
289 }
290
291 /// Returns whether the cross-rule AC pre-filter is currently enabled.
292 /// Available behind the `daachorse-index` Cargo feature.
293 #[cfg(feature = "daachorse-index")]
294 pub fn cross_rule_ac_enabled(&self) -> bool {
295 self.cross_rule_ac_enabled
296 }
297
298 /// Set global `include_event` — when `true`, all match results include
299 /// the full event JSON regardless of per-rule custom attributes.
300 pub fn set_include_event(&mut self, include: bool) {
301 self.include_event = include;
302 }
303
304 /// Set the match-detail verbosity for detection results.
305 ///
306 /// `Off` (default) records each match as `{ field, value }`, identical to
307 /// pre-enrichment releases. `Summary` adds the originating selection, the
308 /// matcher kind, and case sensitivity, and reports keyword and absence
309 /// matches that `Off` omits. `Full` additionally records the pattern that
310 /// fired. The extra work runs only when a rule matches and only above
311 /// `Off`, so the default hot path is unchanged.
312 pub fn set_match_detail(&mut self, level: MatchDetailLevel) {
313 self.match_detail = level;
314 }
315
316 /// Returns the configured match-detail verbosity.
317 pub fn match_detail(&self) -> MatchDetailLevel {
318 self.match_detail
319 }
320
321 /// Add a pipeline to the engine.
322 ///
323 /// Pipelines are applied to rules during `add_rule` / `add_collection`.
324 /// Only affects rules added **after** this call.
325 pub fn add_pipeline(&mut self, pipeline: Pipeline) {
326 self.pipelines.push(pipeline);
327 self.pipelines.sort_by_key(|p| p.priority);
328 }
329
330 /// Add a single parsed Sigma rule.
331 ///
332 /// If pipelines are set, the rule is cloned and transformed before
333 /// compilation. The rule index folds the new rule incrementally; the
334 /// bloom index also folds it incrementally and only triggers a full
335 /// rebuild when its doubling watermark is reached, so this call is
336 /// amortized O(1) per rule. With the `daachorse-index` feature
337 /// enabled **and** the cross-rule AC index turned on at runtime, the
338 /// call falls back to a full rebuild because the daachorse automaton
339 /// has no incremental update path.
340 pub fn add_rule(&mut self, rule: &SigmaRule) -> Result<()> {
341 let compiled = self.compile_with_pipelines(rule)?;
342 self.rules.push(compiled);
343 self.index_append_last_rule();
344 Ok(())
345 }
346
347 /// Add many parsed Sigma rules in a single batch.
348 ///
349 /// Each rule is compiled (with the engine's pipelines applied, if any)
350 /// and pushed onto the rule set. Compilation errors are collected and
351 /// returned as `(rule_index_in_input, error)` pairs without aborting the
352 /// batch; rules that did compile remain loaded. The inverted index and
353 /// per-field bloom filter are rebuilt **once** at the end of the batch.
354 ///
355 /// Prefer this over a loop of [`Engine::add_rule`] when loading large
356 /// rule sets: the per-call rebuild is O(N) in the total rule count, so
357 /// per-rule adds turn a 3K-rule corpus into O(N²) work.
358 pub fn add_rules<'a, I>(&mut self, rules: I) -> Vec<(usize, EvalError)>
359 where
360 I: IntoIterator<Item = &'a SigmaRule>,
361 {
362 let mut errors = Vec::new();
363 for (idx, rule) in rules.into_iter().enumerate() {
364 match self.compile_with_pipelines(rule) {
365 Ok(compiled) => self.rules.push(compiled),
366 Err(e) => errors.push((idx, e)),
367 }
368 }
369 self.rebuild_index();
370 errors
371 }
372
373 /// Add all detection rules from a parsed collection, then apply filters.
374 ///
375 /// Filter rules modify referenced detection rules by appending exclusion
376 /// conditions. Correlation rules are handled by `CorrelationEngine`.
377 /// The inverted index is rebuilt once after all rules and filters are loaded.
378 pub fn add_collection(&mut self, collection: &SigmaCollection) -> Result<()> {
379 for rule in &collection.rules {
380 let compiled = self.compile_with_pipelines(rule)?;
381 self.rules.push(compiled);
382 }
383 for filter in &collection.filters {
384 self.apply_filter_no_rebuild(filter)?;
385 }
386 self.rebuild_index();
387 Ok(())
388 }
389
390 /// Compile a rule, applying any configured pipelines first. Shared by
391 /// the single- and batched-add paths so they stay behaviourally
392 /// identical.
393 fn compile_with_pipelines(&self, rule: &SigmaRule) -> Result<CompiledRule> {
394 if self.pipelines.is_empty() {
395 compile_rule(rule)
396 } else {
397 let mut transformed = rule.clone();
398 apply_pipelines(&self.pipelines, &mut transformed)?;
399 compile_rule(&transformed)
400 }
401 }
402
403 /// Add all detection rules from a collection, applying the given pipelines.
404 ///
405 /// This is a convenience method that temporarily sets pipelines, adds the
406 /// collection, then clears them. The inverted index is rebuilt once after
407 /// all rules and filters are loaded.
408 pub fn add_collection_with_pipelines(
409 &mut self,
410 collection: &SigmaCollection,
411 pipelines: &[Pipeline],
412 ) -> Result<()> {
413 let prev = std::mem::take(&mut self.pipelines);
414 self.pipelines = pipelines.to_vec();
415 self.pipelines.sort_by_key(|p| p.priority);
416 let result = self.add_collection(collection);
417 self.pipelines = prev;
418 result
419 }
420
421 /// Apply a filter rule to all referenced detection rules and rebuild the index.
422 pub fn apply_filter(&mut self, filter: &FilterRule) -> Result<()> {
423 self.apply_filter_no_rebuild(filter)?;
424 self.rebuild_index();
425 Ok(())
426 }
427
428 /// Apply a filter rule without rebuilding the index.
429 /// Used internally when multiple mutations are batched.
430 fn apply_filter_no_rebuild(&mut self, filter: &FilterRule) -> Result<()> {
431 // Compile filter detections
432 let mut filter_detections = Vec::new();
433 for (name, detection) in &filter.detection.named {
434 let compiled = compile_detection(detection)?;
435 filter_detections.push((name.clone(), compiled));
436 }
437
438 if filter_detections.is_empty() {
439 return Ok(());
440 }
441
442 let fc = self.filter_counter;
443 self.filter_counter += 1;
444
445 // Rewrite the filter's own condition expression with namespaced identifiers
446 // so that `selection` becomes `__filter_0_selection`, etc.
447 let rewritten_cond = if let Some(cond_expr) = filter.detection.conditions.first() {
448 rewrite_condition_identifiers(cond_expr, fc)
449 } else {
450 // No explicit condition: AND all detections (legacy fallback)
451 if filter_detections.len() == 1 {
452 ConditionExpr::Identifier(format!("__filter_{fc}_{}", filter_detections[0].0))
453 } else {
454 ConditionExpr::And(
455 filter_detections
456 .iter()
457 .map(|(name, _)| ConditionExpr::Identifier(format!("__filter_{fc}_{name}")))
458 .collect(),
459 )
460 }
461 };
462
463 // Find and modify referenced rules
464 let mut matched_any = false;
465 for rule in &mut self.rules {
466 let rule_matches = match &filter.rules {
467 FilterRuleTarget::Any => true,
468 FilterRuleTarget::Specific(refs) => refs
469 .iter()
470 .any(|r| rule.id.as_deref() == Some(r.as_str()) || rule.title == *r),
471 };
472
473 // Also check logsource compatibility if the filter specifies one
474 if rule_matches {
475 if let Some(ref filter_ls) = filter.logsource
476 && !filter_logsource_contains(filter_ls, &rule.logsource)
477 {
478 continue;
479 }
480
481 // Inject filter detections into the rule
482 for (name, compiled) in &filter_detections {
483 rule.detections
484 .insert(format!("__filter_{fc}_{name}"), compiled.clone());
485 }
486
487 // Wrap each existing rule condition with the filter condition
488 rule.conditions = rule
489 .conditions
490 .iter()
491 .map(|cond| ConditionExpr::And(vec![cond.clone(), rewritten_cond.clone()]))
492 .collect();
493 matched_any = true;
494 }
495 }
496
497 if let FilterRuleTarget::Specific(_) = &filter.rules
498 && !matched_any
499 {
500 log::warn!(
501 "filter '{}' references rules {:?} but none matched any loaded rule",
502 filter.title,
503 filter.rules
504 );
505 }
506
507 Ok(())
508 }
509
510 /// Add a pre-compiled rule directly. The rule index folds the new
511 /// rule incrementally; the bloom index also folds it incrementally
512 /// and only triggers a full rebuild when its doubling watermark is
513 /// reached, so this call is amortized O(1) per rule. With the
514 /// cross-rule AC index enabled (`daachorse-index` feature, runtime
515 /// toggle), this falls back to a full rebuild because daachorse has
516 /// no incremental update path.
517 pub fn add_compiled_rule(&mut self, rule: CompiledRule) {
518 self.rules.push(rule);
519 self.index_append_last_rule();
520 }
521
522 /// Add many pre-compiled rules in a single batch. The inverted index
523 /// and bloom filter are rebuilt exactly once at the end, regardless of
524 /// how many rules are appended.
525 pub fn extend_compiled_rules<I>(&mut self, rules: I)
526 where
527 I: IntoIterator<Item = CompiledRule>,
528 {
529 self.rules.extend(rules);
530 self.rebuild_index();
531 }
532
533 /// Rebuild every per-engine index from the current rule set.
534 ///
535 /// Used by batched rule loads (`add_rules`, `extend_compiled_rules`,
536 /// `add_collection`) and by mutations that rewrite existing rules
537 /// (`apply_filter`), where rebuilding once over the final shape is
538 /// cheaper than maintaining incremental state across mutations. The
539 /// single-rule paths use [`Engine::index_append_last_rule`] instead.
540 fn rebuild_index(&mut self) {
541 self.rule_index = RuleIndex::build(&self.rules);
542 self.bloom_index = match self.bloom_max_bytes {
543 Some(budget) => FieldBloomIndex::build_with_budget(&self.rules, budget),
544 None => FieldBloomIndex::build(&self.rules),
545 };
546 #[cfg(feature = "daachorse-index")]
547 {
548 if self.cross_rule_ac_enabled {
549 self.cross_rule_ac_index = cross_rule_ac::CrossRuleAcIndex::build(&self.rules);
550 self.cross_rule_ac_prunable = self
551 .rules
552 .iter()
553 .map(cross_rule_ac::rule_is_ac_prunable)
554 .collect();
555 } else {
556 self.cross_rule_ac_index = cross_rule_ac::CrossRuleAcIndex::empty();
557 self.cross_rule_ac_prunable.clear();
558 }
559 }
560 }
561
562 /// Fold the rule most recently pushed onto `self.rules` into the
563 /// inverted and bloom indexes incrementally. Cost is bounded by the
564 /// new rule's detection tree size, not by the total rule count.
565 ///
566 /// The bloom index periodically forces a full rebuild via its
567 /// doubling watermark to re-enforce the memory budget and reset the
568 /// FPR drift that incremental inserts accumulate. Cross-rule AC
569 /// (daachorse) has no incremental story, so when it is enabled this
570 /// call falls back to [`Engine::rebuild_index`].
571 fn index_append_last_rule(&mut self) {
572 #[cfg(feature = "daachorse-index")]
573 {
574 if self.cross_rule_ac_enabled {
575 self.rebuild_index();
576 return;
577 }
578 }
579
580 let new_idx = self.rules.len() - 1;
581 let rule = &self.rules[new_idx];
582 self.rule_index.append_rule(new_idx, rule);
583 self.bloom_index.append_rule(rule);
584
585 if self.bloom_index.should_rebuild(self.rules.len()) {
586 self.bloom_index = match self.bloom_max_bytes {
587 Some(budget) => FieldBloomIndex::build_with_budget(&self.rules, budget),
588 None => FieldBloomIndex::build(&self.rules),
589 };
590 }
591 }
592
593 /// Evaluate an event against candidate rules using the inverted index.
594 pub fn evaluate<E: Event>(&self, event: &E) -> Vec<EvaluationResult> {
595 if self.bloom_prefilter {
596 self.evaluate_with_bloom_path(event)
597 } else {
598 self.evaluate_no_bloom_path(event)
599 }
600 }
601
602 /// Build the cross-rule AC keep-mask for `event`, or `None` when the
603 /// cross-rule index is disabled or empty (no filtering needed).
604 ///
605 /// `Some(mask)` answers "should this rule survive the cross-rule AC
606 /// filter": `mask[idx] = true` means keep, `false` means drop.
607 /// Non-AC-prunable rules are always kept.
608 #[cfg(feature = "daachorse-index")]
609 fn cross_rule_ac_keep_mask<E: Event>(&self, event: &E) -> Option<Vec<bool>> {
610 if !self.cross_rule_ac_enabled || self.cross_rule_ac_index.is_empty() {
611 return None;
612 }
613 let mut hits = vec![false; self.rules.len()];
614 self.cross_rule_ac_index.mark_hits(event, &mut hits);
615 // Compose: keep = !ac_prunable OR ac_hit. The prunable vector and
616 // the rule slice are kept aligned by `rebuild_index`.
617 for (idx, slot) in hits.iter_mut().enumerate() {
618 if !self
619 .cross_rule_ac_prunable
620 .get(idx)
621 .copied()
622 .unwrap_or(false)
623 {
624 *slot = true;
625 }
626 }
627 Some(hits)
628 }
629
630 #[cfg(not(feature = "daachorse-index"))]
631 #[inline(always)]
632 fn cross_rule_ac_keep_mask<E: Event>(&self, _event: &E) -> Option<Vec<bool>> {
633 None
634 }
635
636 /// Pick the candidate rule set for `event`. When a logsource extractor
637 /// produced an event logsource, the product-partitioned index drops
638 /// always-evaluated rules of a conflicting product; otherwise the full
639 /// candidate set is returned (zero behaviour change when pruning is off).
640 fn logsource_candidates<E: Event>(
641 &self,
642 event: &E,
643 event_logsource: Option<&LogSource>,
644 ) -> Vec<usize> {
645 match event_logsource {
646 Some(ls) => {
647 // Observability: count the fail-open case (no logsource at all)
648 // and the always-evaluated rules pruned by product conflict.
649 if ls.product.is_none() && ls.service.is_none() && ls.category.is_none() {
650 self.logsource_absent.fetch_add(1, Ordering::Relaxed);
651 }
652 let pruned = self
653 .rule_index
654 .conflicting_unindexable_count(ls.product.as_deref());
655 if pruned > 0 {
656 self.logsource_pruned
657 .fetch_add(pruned as u64, Ordering::Relaxed);
658 }
659 self.rule_index
660 .candidates_with_logsource(event, ls.product.as_deref())
661 }
662 None => self.rule_index.candidates(event),
663 }
664 }
665
666 fn evaluate_no_bloom_path<E: Event>(&self, event: &E) -> Vec<EvaluationResult> {
667 // Pass the zero-sized `NoBloom` lookup so this monomorphizes to the
668 // same straight-line code as the pre-bloom engine while still
669 // threading the configured match-detail level.
670 let keep = self.cross_rule_ac_keep_mask(event);
671 // Extract the event logsource once when pruning is enabled; `None`
672 // (the default) leaves the loop's behaviour unchanged.
673 let event_logsource = self
674 .logsource_extractor
675 .as_ref()
676 .map(|ex| ex.extract(event));
677 let candidates = self.logsource_candidates(event, event_logsource.as_ref());
678 let mut results = Vec::new();
679 for idx in candidates {
680 if let Some(ref mask) = keep
681 && !mask[idx]
682 {
683 continue;
684 }
685 let rule = &self.rules[idx];
686 if let Some(ref event_ls) = event_logsource
687 && !logsource_compatible(&rule.logsource, event_ls)
688 {
689 continue;
690 }
691 if let Some(mut m) =
692 evaluate_rule_with_bloom(rule, event, &bloom_index::NoBloom, self.match_detail)
693 {
694 if self.include_event
695 && let Some(d) = m.as_detection_mut()
696 && d.event.is_none()
697 {
698 d.event = Some(event.to_json());
699 }
700 results.push(m);
701 }
702 }
703 results
704 }
705
706 fn evaluate_with_bloom_path<E: Event>(&self, event: &E) -> Vec<EvaluationResult> {
707 let bloom = BloomCache::new(&self.bloom_index, event);
708 let keep = self.cross_rule_ac_keep_mask(event);
709 // Extract the event logsource once when pruning is enabled; `None`
710 // (the default) leaves the loop's behaviour unchanged.
711 let event_logsource = self
712 .logsource_extractor
713 .as_ref()
714 .map(|ex| ex.extract(event));
715 let candidates = self.logsource_candidates(event, event_logsource.as_ref());
716 let mut results = Vec::new();
717 for idx in candidates {
718 if let Some(ref mask) = keep
719 && !mask[idx]
720 {
721 continue;
722 }
723 let rule = &self.rules[idx];
724 if let Some(ref event_ls) = event_logsource
725 && !logsource_compatible(&rule.logsource, event_ls)
726 {
727 continue;
728 }
729 if let Some(mut m) = evaluate_rule_with_bloom(rule, event, &bloom, self.match_detail) {
730 if self.include_event
731 && let Some(d) = m.as_detection_mut()
732 && d.event.is_none()
733 {
734 d.event = Some(event.to_json());
735 }
736 results.push(m);
737 }
738 }
739 results
740 }
741
742 /// Evaluate an event against candidate rules matching the given logsource.
743 ///
744 /// Uses the inverted index for candidate pre-filtering, then applies the
745 /// logsource constraint. Only rules whose logsource is compatible with
746 /// `event_logsource` are evaluated.
747 pub fn evaluate_with_logsource<E: Event>(
748 &self,
749 event: &E,
750 event_logsource: &LogSource,
751 ) -> Vec<EvaluationResult> {
752 if self.bloom_prefilter {
753 self.evaluate_with_logsource_with_bloom(event, event_logsource)
754 } else {
755 self.evaluate_with_logsource_no_bloom(event, event_logsource)
756 }
757 }
758
759 fn evaluate_with_logsource_no_bloom<E: Event>(
760 &self,
761 event: &E,
762 event_logsource: &LogSource,
763 ) -> Vec<EvaluationResult> {
764 let keep = self.cross_rule_ac_keep_mask(event);
765 let mut results = Vec::new();
766 for idx in self.rule_index.candidates(event) {
767 if let Some(ref mask) = keep
768 && !mask[idx]
769 {
770 continue;
771 }
772 let rule = &self.rules[idx];
773 if logsource_matches(&rule.logsource, event_logsource)
774 && let Some(mut m) =
775 evaluate_rule_with_bloom(rule, event, &bloom_index::NoBloom, self.match_detail)
776 {
777 if self.include_event
778 && let Some(d) = m.as_detection_mut()
779 && d.event.is_none()
780 {
781 d.event = Some(event.to_json());
782 }
783 results.push(m);
784 }
785 }
786 results
787 }
788
789 fn evaluate_with_logsource_with_bloom<E: Event>(
790 &self,
791 event: &E,
792 event_logsource: &LogSource,
793 ) -> Vec<EvaluationResult> {
794 let bloom = BloomCache::new(&self.bloom_index, event);
795 let keep = self.cross_rule_ac_keep_mask(event);
796 let mut results = Vec::new();
797 for idx in self.rule_index.candidates(event) {
798 if let Some(ref mask) = keep
799 && !mask[idx]
800 {
801 continue;
802 }
803 let rule = &self.rules[idx];
804 if logsource_matches(&rule.logsource, event_logsource)
805 && let Some(mut m) =
806 evaluate_rule_with_bloom(rule, event, &bloom, self.match_detail)
807 {
808 if self.include_event
809 && let Some(d) = m.as_detection_mut()
810 && d.event.is_none()
811 {
812 d.event = Some(event.to_json());
813 }
814 results.push(m);
815 }
816 }
817 results
818 }
819
820 /// Evaluate a batch of events, returning per-event match results.
821 ///
822 /// When the `parallel` feature is enabled, events are evaluated concurrently
823 /// using rayon's work-stealing thread pool. Otherwise, falls back to
824 /// sequential evaluation.
825 pub fn evaluate_batch<E: Event + Sync>(&self, events: &[&E]) -> Vec<Vec<EvaluationResult>> {
826 #[cfg(feature = "parallel")]
827 {
828 use rayon::prelude::*;
829 events.par_iter().map(|e| self.evaluate(e)).collect()
830 }
831 #[cfg(not(feature = "parallel"))]
832 {
833 events.iter().map(|e| self.evaluate(e)).collect()
834 }
835 }
836
837 /// Number of rules loaded in the engine.
838 pub fn rule_count(&self) -> usize {
839 self.rules.len()
840 }
841
842 /// Access the compiled rules.
843 pub fn rules(&self) -> &[CompiledRule] {
844 &self.rules
845 }
846}
847
848impl Default for Engine {
849 fn default() -> Self {
850 Self::new()
851 }
852}