Skip to main content

rsigma_eval/engine/
mod.rs

//! Rule evaluation engine with logsource routing.
//!
//! The `Engine` manages a set of compiled Sigma rules and evaluates events
//! against them. It supports optional logsource-based pre-filtering to
//! reduce the number of rules evaluated per event.
6
7pub(crate) mod bloom_index;
8#[cfg(feature = "daachorse-index")]
9pub(crate) mod cross_rule_ac;
10mod filters;
11#[cfg(test)]
12mod tests;
13
14use std::sync::atomic::{AtomicU64, Ordering};
15
16use rsigma_parser::{
17    ConditionExpr, FilterRule, FilterRuleTarget, LogSource, SigmaCollection, SigmaRule,
18};
19
20use crate::compiler::{CompiledRule, compile_detection, compile_rule, evaluate_rule_with_bloom};
21use crate::error::{EvalError, Result};
22use crate::event::Event;
23use crate::logsource::LogSourceExtractor;
24use crate::pipeline::{Pipeline, apply_pipelines};
25use crate::result::{EvaluationResult, MatchDetailLevel};
26use crate::rule_index::RuleIndex;
27
28use bloom_index::{BloomCache, FieldBloomIndex};
29
30use filters::{
31    filter_logsource_contains, logsource_compatible, logsource_matches,
32    rewrite_condition_identifiers,
33};
34
35/// The main rule evaluation engine.
36///
37/// Holds a set of compiled rules and provides methods to evaluate events
38/// against them. Supports optional logsource routing for performance.
39///
40/// # Example
41///
42/// ```rust
43/// use rsigma_parser::parse_sigma_yaml;
44/// use rsigma_eval::{Engine, Event};
45/// use rsigma_eval::event::JsonEvent;
46/// use serde_json::json;
47///
48/// let yaml = r#"
49/// title: Detect Whoami
50/// logsource:
51///     product: windows
52///     category: process_creation
53/// detection:
54///     selection:
55///         CommandLine|contains: 'whoami'
56///     condition: selection
57/// level: medium
58/// "#;
59///
60/// let collection = parse_sigma_yaml(yaml).unwrap();
61/// let mut engine = Engine::new();
62/// engine.add_collection(&collection).unwrap();
63///
64/// let event_val = json!({"CommandLine": "cmd /c whoami"});
65/// let event = JsonEvent::borrow(&event_val);
66/// let matches = engine.evaluate(&event);
67/// assert_eq!(matches.len(), 1);
68/// assert_eq!(matches[0].header.rule_title, "Detect Whoami");
69/// ```
70pub struct Engine {
71    rules: Vec<CompiledRule>,
72    pipelines: Vec<Pipeline>,
73    /// Global override: include the full event JSON in all match results.
74    /// When `true`, overrides per-rule `rsigma.include_event` custom attributes.
75    include_event: bool,
76    /// Verbosity of the match detail recorded on detection results.
77    /// `Off` by default, which preserves the historical `{ field, value }`
78    /// wire shape. See [`Engine::set_match_detail`].
79    match_detail: MatchDetailLevel,
80    /// Monotonic counter used to namespace injected filter detections,
81    /// preventing key collisions when multiple filters share detection names.
82    filter_counter: usize,
83    /// Inverted index mapping `(field, exact_value)` to candidate rule indices.
84    /// Rebuilt after every rule mutation (add, filter).
85    rule_index: RuleIndex,
86    /// Per-field bloom filter over positive substring needles. Rebuilt
87    /// alongside `rule_index`. Consulted only when `bloom_prefilter` is
88    /// enabled.
89    bloom_index: FieldBloomIndex,
90    /// Toggle for bloom pre-filtering. Off by default: the per-event probe
91    /// overhead exceeds the savings on rule sets where most events overlap
92    /// with at least one needle's trigrams. Workloads with many substring
93    /// rules and mostly-non-matching events (e.g. high-volume telemetry
94    /// streams against an active threat-intel ruleset) opt in via
95    /// [`Engine::set_bloom_prefilter`].
96    bloom_prefilter: bool,
97    /// Memory budget the bloom builder is allowed to consume across all
98    /// per-field filters. `None` means use the crate default
99    /// (`bloom_index::DEFAULT_MAX_TOTAL_BYTES`, 1 MB).
100    bloom_max_bytes: Option<usize>,
101    /// Opt-in event-logsource extractor for conflict-based rule pruning.
102    /// `None` (default) leaves the hot path unchanged; when `Some`, the
103    /// engine extracts each event's logsource once and skips rules whose
104    /// logsource conflicts (see [`Engine::set_logsource_extractor`]).
105    logsource_extractor: Option<LogSourceExtractor>,
106    /// Monotonic count of always-evaluated rules skipped because their
107    /// product conflicts with the event's. Incremented only when an extractor
108    /// is set; surfaced via [`Engine::logsource_pruned_total`].
109    logsource_pruned: AtomicU64,
110    /// Monotonic count of `evaluate` calls where the extractor produced no
111    /// logsource at all (fail-open: every rule was evaluated). Surfaced via
112    /// [`Engine::logsource_absent_total`].
113    logsource_absent: AtomicU64,
114    /// Cross-rule Aho-Corasick index for substring patterns, gated on the
115    /// `daachorse-index` feature. Built only when [`cross_rule_ac_enabled`]
116    /// is `true`; [`cross_rule_ac_prunable`] is the conservative per-rule
117    /// flag computed at the same time so the `evaluate` hot path can drop
118    /// rules safely.
119    ///
120    /// [`cross_rule_ac_enabled`]: Self::cross_rule_ac_enabled
121    /// [`cross_rule_ac_prunable`]: Self::cross_rule_ac_prunable
122    #[cfg(feature = "daachorse-index")]
123    cross_rule_ac_index: cross_rule_ac::CrossRuleAcIndex,
124    /// Toggle for the cross-rule AC pre-filter. Off by default; the index
125    /// only pays off on rule sets > 5K rules with many shared substring
126    /// patterns. See [`Engine::set_cross_rule_ac`].
127    #[cfg(feature = "daachorse-index")]
128    cross_rule_ac_enabled: bool,
129    /// Per-rule conservative AC-prunability flag. `true` iff the rule's
130    /// firing requires at least one positive substring match (no `Exact`,
131    /// `Regex`, `Numeric`, `Not`, etc.), so dropping the rule on a
132    /// "no AC hit" verdict is provably correct.
133    #[cfg(feature = "daachorse-index")]
134    cross_rule_ac_prunable: Vec<bool>,
135}
136
137impl Engine {
138    /// Create a new empty engine.
139    pub fn new() -> Self {
140        Engine {
141            rules: Vec::new(),
142            pipelines: Vec::new(),
143            include_event: false,
144            match_detail: MatchDetailLevel::Off,
145            filter_counter: 0,
146            rule_index: RuleIndex::empty(),
147            bloom_index: FieldBloomIndex::empty(),
148            bloom_prefilter: false,
149            bloom_max_bytes: None,
150            logsource_extractor: None,
151            logsource_pruned: AtomicU64::new(0),
152            logsource_absent: AtomicU64::new(0),
153            #[cfg(feature = "daachorse-index")]
154            cross_rule_ac_index: cross_rule_ac::CrossRuleAcIndex::empty(),
155            #[cfg(feature = "daachorse-index")]
156            cross_rule_ac_enabled: false,
157            #[cfg(feature = "daachorse-index")]
158            cross_rule_ac_prunable: Vec::new(),
159        }
160    }
161
162    /// Create a new engine with a pipeline.
163    pub fn new_with_pipeline(pipeline: Pipeline) -> Self {
164        Engine {
165            rules: Vec::new(),
166            pipelines: vec![pipeline],
167            include_event: false,
168            match_detail: MatchDetailLevel::Off,
169            filter_counter: 0,
170            rule_index: RuleIndex::empty(),
171            bloom_index: FieldBloomIndex::empty(),
172            bloom_prefilter: false,
173            bloom_max_bytes: None,
174            logsource_extractor: None,
175            logsource_pruned: AtomicU64::new(0),
176            logsource_absent: AtomicU64::new(0),
177            #[cfg(feature = "daachorse-index")]
178            cross_rule_ac_index: cross_rule_ac::CrossRuleAcIndex::empty(),
179            #[cfg(feature = "daachorse-index")]
180            cross_rule_ac_enabled: false,
181            #[cfg(feature = "daachorse-index")]
182            cross_rule_ac_prunable: Vec::new(),
183        }
184    }
185
186    /// Enable or disable bloom-filter pre-filtering of positive substring
187    /// detection items.
188    ///
189    /// When enabled, `evaluate*` short-circuits any positive substring
190    /// matcher (`Contains` / `StartsWith` / `EndsWith` / `AhoCorasickSet`,
191    /// alone or wrapped in `CaseInsensitiveGroup`) whose field cannot
192    /// possibly contain a needle trigram.
193    ///
194    /// Disabled by default. The per-event probe (trigram extraction +
195    /// double hashing) costs ~1 µs on a typical CommandLine field, which
196    /// outweighs the savings on rule sets where most events overlap with
197    /// at least one needle. Enable for workloads that pair many substring
198    /// rules with mostly-non-matching events; benchmark with
199    /// `eval_bloom_rejection` before flipping it on in production.
200    pub fn set_bloom_prefilter(&mut self, enabled: bool) {
201        self.bloom_prefilter = enabled;
202    }
203
204    /// Returns whether bloom pre-filtering is currently enabled.
205    pub fn bloom_prefilter_enabled(&self) -> bool {
206        self.bloom_prefilter
207    }
208
209    /// Set the memory budget for the per-field bloom index.
210    ///
211    /// Must be called **before** `add_collection` / `add_rule` for the new
212    /// budget to take effect on the existing rule set; otherwise it is
213    /// applied at the next index rebuild. The default budget is 1 MB,
214    /// shared across all per-field filters. Lower the cap on memory-
215    /// constrained deployments; raise it for large rule sets where the
216    /// default starts evicting useful filters.
217    pub fn set_bloom_max_bytes(&mut self, max_bytes: usize) {
218        self.bloom_max_bytes = Some(max_bytes);
219        if !self.rules.is_empty() {
220            self.rebuild_index();
221        }
222    }
223
224    /// Returns the configured bloom memory budget, if one has been set
225    /// explicitly. `None` means the crate default (1 MB) is in use.
226    pub fn bloom_max_bytes(&self) -> Option<usize> {
227        self.bloom_max_bytes
228    }
229
230    /// Enable or disable opt-in, conflict-based logsource pruning.
231    ///
232    /// When set to `Some`, `evaluate` extracts each event's logsource once via
233    /// the [`LogSourceExtractor`] and skips any candidate rule whose logsource
234    /// conflicts with the event's (a dimension set on both sides that
235    /// disagrees). A dimension unset on either side is a wildcard, so an event
236    /// tagged only `product: windows` skips `product: linux` rules while still
237    /// evaluating Windows-category and logsource-less rules.
238    ///
239    /// Disabled by default (`None`), leaving the hot path unchanged. Pruning
240    /// fails open: an event with no extractable logsource evaluates every
241    /// rule. The extractor is read on every `evaluate` call, so it can be
242    /// swapped at runtime (e.g. carried across a hot-reload).
243    pub fn set_logsource_extractor(&mut self, extractor: Option<LogSourceExtractor>) {
244        self.logsource_extractor = extractor;
245    }
246
247    /// Returns the configured logsource extractor, if any. `None` means
248    /// logsource pruning is disabled.
249    pub fn logsource_extractor(&self) -> Option<&LogSourceExtractor> {
250        self.logsource_extractor.as_ref()
251    }
252
253    /// Total always-evaluated rules skipped by logsource product pruning since
254    /// engine creation. Zero unless an extractor is set.
255    pub fn logsource_pruned_total(&self) -> u64 {
256        self.logsource_pruned.load(Ordering::Relaxed)
257    }
258
259    /// Total `evaluate` calls where the extractor produced no logsource and
260    /// pruning failed open (every rule evaluated). Zero unless an extractor
261    /// is set.
262    pub fn logsource_absent_total(&self) -> u64 {
263        self.logsource_absent.load(Ordering::Relaxed)
264    }
265
/// Enable or disable the cross-rule Aho-Corasick pre-filter.
///
/// When enabled, the engine builds a single per-field
/// `DoubleArrayAhoCorasick` automaton over every positive substring
/// needle from every rule and drops AC-prunable rules from the
/// candidate set when none of their patterns hit the event. Enabling on
/// a non-empty engine rebuilds the indexes immediately.
///
/// When disabled, the automaton and the per-rule prunable flags are
/// released right away. They are never consulted while the toggle is
/// off and would go stale as rules are appended incrementally, so
/// keeping them would only hold memory. Re-enabling rebuilds them from
/// the current rule set.
///
/// Off by default. Pays off on large rule sets (> ~5K rules) with many
/// shared substring patterns (threat-intel feeds, IOC packs). For
/// smaller rule sets the per-rule [`AhoCorasickSet`] matcher already
/// handles the workload optimally; the cross-rule index only adds
/// build-time and lookup overhead. Benchmark with `eval_cross_rule_ac`
/// against representative rule sets before enabling in production.
///
/// Available behind the `daachorse-index` Cargo feature.
///
/// [`AhoCorasickSet`]: crate::matcher::CompiledMatcher::AhoCorasickSet
#[cfg(feature = "daachorse-index")]
pub fn set_cross_rule_ac(&mut self, enabled: bool) {
    self.cross_rule_ac_enabled = enabled;
    if !enabled {
        // Mirror the disabled branch of `rebuild_index` without paying
        // for a rebuild of the other indexes.
        self.cross_rule_ac_index = cross_rule_ac::CrossRuleAcIndex::empty();
        self.cross_rule_ac_prunable = Vec::new();
    } else if !self.rules.is_empty() {
        self.rebuild_index();
    }
}
290
/// Reports the current state of the cross-rule AC pre-filter toggle set
/// by [`Engine::set_cross_rule_ac`].
///
/// Available behind the `daachorse-index` Cargo feature.
#[cfg(feature = "daachorse-index")]
pub fn cross_rule_ac_enabled(&self) -> bool {
    self.cross_rule_ac_enabled
}
297
298    /// Set global `include_event` — when `true`, all match results include
299    /// the full event JSON regardless of per-rule custom attributes.
300    pub fn set_include_event(&mut self, include: bool) {
301        self.include_event = include;
302    }
303
304    /// Set the match-detail verbosity for detection results.
305    ///
306    /// `Off` (default) records each match as `{ field, value }`, identical to
307    /// pre-enrichment releases. `Summary` adds the originating selection, the
308    /// matcher kind, and case sensitivity, and reports keyword and absence
309    /// matches that `Off` omits. `Full` additionally records the pattern that
310    /// fired. The extra work runs only when a rule matches and only above
311    /// `Off`, so the default hot path is unchanged.
312    pub fn set_match_detail(&mut self, level: MatchDetailLevel) {
313        self.match_detail = level;
314    }
315
316    /// Returns the configured match-detail verbosity.
317    pub fn match_detail(&self) -> MatchDetailLevel {
318        self.match_detail
319    }
320
321    /// Add a pipeline to the engine.
322    ///
323    /// Pipelines are applied to rules during `add_rule` / `add_collection`.
324    /// Only affects rules added **after** this call.
325    pub fn add_pipeline(&mut self, pipeline: Pipeline) {
326        self.pipelines.push(pipeline);
327        self.pipelines.sort_by_key(|p| p.priority);
328    }
329
330    /// Add a single parsed Sigma rule.
331    ///
332    /// If pipelines are set, the rule is cloned and transformed before
333    /// compilation. The rule index folds the new rule incrementally; the
334    /// bloom index also folds it incrementally and only triggers a full
335    /// rebuild when its doubling watermark is reached, so this call is
336    /// amortized O(1) per rule. With the `daachorse-index` feature
337    /// enabled **and** the cross-rule AC index turned on at runtime, the
338    /// call falls back to a full rebuild because the daachorse automaton
339    /// has no incremental update path.
340    pub fn add_rule(&mut self, rule: &SigmaRule) -> Result<()> {
341        let compiled = self.compile_with_pipelines(rule)?;
342        self.rules.push(compiled);
343        self.index_append_last_rule();
344        Ok(())
345    }
346
347    /// Add many parsed Sigma rules in a single batch.
348    ///
349    /// Each rule is compiled (with the engine's pipelines applied, if any)
350    /// and pushed onto the rule set. Compilation errors are collected and
351    /// returned as `(rule_index_in_input, error)` pairs without aborting the
352    /// batch; rules that did compile remain loaded. The inverted index and
353    /// per-field bloom filter are rebuilt **once** at the end of the batch.
354    ///
355    /// Prefer this over a loop of [`Engine::add_rule`] when loading large
356    /// rule sets: the per-call rebuild is O(N) in the total rule count, so
357    /// per-rule adds turn a 3K-rule corpus into O(N²) work.
358    pub fn add_rules<'a, I>(&mut self, rules: I) -> Vec<(usize, EvalError)>
359    where
360        I: IntoIterator<Item = &'a SigmaRule>,
361    {
362        let mut errors = Vec::new();
363        for (idx, rule) in rules.into_iter().enumerate() {
364            match self.compile_with_pipelines(rule) {
365                Ok(compiled) => self.rules.push(compiled),
366                Err(e) => errors.push((idx, e)),
367            }
368        }
369        self.rebuild_index();
370        errors
371    }
372
373    /// Add all detection rules from a parsed collection, then apply filters.
374    ///
375    /// Filter rules modify referenced detection rules by appending exclusion
376    /// conditions. Correlation rules are handled by `CorrelationEngine`.
377    /// The inverted index is rebuilt once after all rules and filters are loaded.
378    pub fn add_collection(&mut self, collection: &SigmaCollection) -> Result<()> {
379        for rule in &collection.rules {
380            let compiled = self.compile_with_pipelines(rule)?;
381            self.rules.push(compiled);
382        }
383        for filter in &collection.filters {
384            self.apply_filter_no_rebuild(filter)?;
385        }
386        self.rebuild_index();
387        Ok(())
388    }
389
390    /// Compile a rule, applying any configured pipelines first. Shared by
391    /// the single- and batched-add paths so they stay behaviourally
392    /// identical.
393    fn compile_with_pipelines(&self, rule: &SigmaRule) -> Result<CompiledRule> {
394        if self.pipelines.is_empty() {
395            compile_rule(rule)
396        } else {
397            let mut transformed = rule.clone();
398            apply_pipelines(&self.pipelines, &mut transformed)?;
399            compile_rule(&transformed)
400        }
401    }
402
403    /// Add all detection rules from a collection, applying the given pipelines.
404    ///
405    /// This is a convenience method that temporarily sets pipelines, adds the
406    /// collection, then clears them. The inverted index is rebuilt once after
407    /// all rules and filters are loaded.
408    pub fn add_collection_with_pipelines(
409        &mut self,
410        collection: &SigmaCollection,
411        pipelines: &[Pipeline],
412    ) -> Result<()> {
413        let prev = std::mem::take(&mut self.pipelines);
414        self.pipelines = pipelines.to_vec();
415        self.pipelines.sort_by_key(|p| p.priority);
416        let result = self.add_collection(collection);
417        self.pipelines = prev;
418        result
419    }
420
421    /// Apply a filter rule to all referenced detection rules and rebuild the index.
422    pub fn apply_filter(&mut self, filter: &FilterRule) -> Result<()> {
423        self.apply_filter_no_rebuild(filter)?;
424        self.rebuild_index();
425        Ok(())
426    }
427
428    /// Apply a filter rule without rebuilding the index.
429    /// Used internally when multiple mutations are batched.
430    fn apply_filter_no_rebuild(&mut self, filter: &FilterRule) -> Result<()> {
431        // Compile filter detections
432        let mut filter_detections = Vec::new();
433        for (name, detection) in &filter.detection.named {
434            let compiled = compile_detection(detection)?;
435            filter_detections.push((name.clone(), compiled));
436        }
437
438        if filter_detections.is_empty() {
439            return Ok(());
440        }
441
442        let fc = self.filter_counter;
443        self.filter_counter += 1;
444
445        // Rewrite the filter's own condition expression with namespaced identifiers
446        // so that `selection` becomes `__filter_0_selection`, etc.
447        let rewritten_cond = if let Some(cond_expr) = filter.detection.conditions.first() {
448            rewrite_condition_identifiers(cond_expr, fc)
449        } else {
450            // No explicit condition: AND all detections (legacy fallback)
451            if filter_detections.len() == 1 {
452                ConditionExpr::Identifier(format!("__filter_{fc}_{}", filter_detections[0].0))
453            } else {
454                ConditionExpr::And(
455                    filter_detections
456                        .iter()
457                        .map(|(name, _)| ConditionExpr::Identifier(format!("__filter_{fc}_{name}")))
458                        .collect(),
459                )
460            }
461        };
462
463        // Find and modify referenced rules
464        let mut matched_any = false;
465        for rule in &mut self.rules {
466            let rule_matches = match &filter.rules {
467                FilterRuleTarget::Any => true,
468                FilterRuleTarget::Specific(refs) => refs
469                    .iter()
470                    .any(|r| rule.id.as_deref() == Some(r.as_str()) || rule.title == *r),
471            };
472
473            // Also check logsource compatibility if the filter specifies one
474            if rule_matches {
475                if let Some(ref filter_ls) = filter.logsource
476                    && !filter_logsource_contains(filter_ls, &rule.logsource)
477                {
478                    continue;
479                }
480
481                // Inject filter detections into the rule
482                for (name, compiled) in &filter_detections {
483                    rule.detections
484                        .insert(format!("__filter_{fc}_{name}"), compiled.clone());
485                }
486
487                // Wrap each existing rule condition with the filter condition
488                rule.conditions = rule
489                    .conditions
490                    .iter()
491                    .map(|cond| ConditionExpr::And(vec![cond.clone(), rewritten_cond.clone()]))
492                    .collect();
493                matched_any = true;
494            }
495        }
496
497        if let FilterRuleTarget::Specific(_) = &filter.rules
498            && !matched_any
499        {
500            log::warn!(
501                "filter '{}' references rules {:?} but none matched any loaded rule",
502                filter.title,
503                filter.rules
504            );
505        }
506
507        Ok(())
508    }
509
510    /// Add a pre-compiled rule directly. The rule index folds the new
511    /// rule incrementally; the bloom index also folds it incrementally
512    /// and only triggers a full rebuild when its doubling watermark is
513    /// reached, so this call is amortized O(1) per rule. With the
514    /// cross-rule AC index enabled (`daachorse-index` feature, runtime
515    /// toggle), this falls back to a full rebuild because daachorse has
516    /// no incremental update path.
517    pub fn add_compiled_rule(&mut self, rule: CompiledRule) {
518        self.rules.push(rule);
519        self.index_append_last_rule();
520    }
521
522    /// Add many pre-compiled rules in a single batch. The inverted index
523    /// and bloom filter are rebuilt exactly once at the end, regardless of
524    /// how many rules are appended.
525    pub fn extend_compiled_rules<I>(&mut self, rules: I)
526    where
527        I: IntoIterator<Item = CompiledRule>,
528    {
529        self.rules.extend(rules);
530        self.rebuild_index();
531    }
532
533    /// Rebuild every per-engine index from the current rule set.
534    ///
535    /// Used by batched rule loads (`add_rules`, `extend_compiled_rules`,
536    /// `add_collection`) and by mutations that rewrite existing rules
537    /// (`apply_filter`), where rebuilding once over the final shape is
538    /// cheaper than maintaining incremental state across mutations. The
539    /// single-rule paths use [`Engine::index_append_last_rule`] instead.
540    fn rebuild_index(&mut self) {
541        self.rule_index = RuleIndex::build(&self.rules);
542        self.bloom_index = match self.bloom_max_bytes {
543            Some(budget) => FieldBloomIndex::build_with_budget(&self.rules, budget),
544            None => FieldBloomIndex::build(&self.rules),
545        };
546        #[cfg(feature = "daachorse-index")]
547        {
548            if self.cross_rule_ac_enabled {
549                self.cross_rule_ac_index = cross_rule_ac::CrossRuleAcIndex::build(&self.rules);
550                self.cross_rule_ac_prunable = self
551                    .rules
552                    .iter()
553                    .map(cross_rule_ac::rule_is_ac_prunable)
554                    .collect();
555            } else {
556                self.cross_rule_ac_index = cross_rule_ac::CrossRuleAcIndex::empty();
557                self.cross_rule_ac_prunable.clear();
558            }
559        }
560    }
561
562    /// Fold the rule most recently pushed onto `self.rules` into the
563    /// inverted and bloom indexes incrementally. Cost is bounded by the
564    /// new rule's detection tree size, not by the total rule count.
565    ///
566    /// The bloom index periodically forces a full rebuild via its
567    /// doubling watermark to re-enforce the memory budget and reset the
568    /// FPR drift that incremental inserts accumulate. Cross-rule AC
569    /// (daachorse) has no incremental story, so when it is enabled this
570    /// call falls back to [`Engine::rebuild_index`].
571    fn index_append_last_rule(&mut self) {
572        #[cfg(feature = "daachorse-index")]
573        {
574            if self.cross_rule_ac_enabled {
575                self.rebuild_index();
576                return;
577            }
578        }
579
580        let new_idx = self.rules.len() - 1;
581        let rule = &self.rules[new_idx];
582        self.rule_index.append_rule(new_idx, rule);
583        self.bloom_index.append_rule(rule);
584
585        if self.bloom_index.should_rebuild(self.rules.len()) {
586            self.bloom_index = match self.bloom_max_bytes {
587                Some(budget) => FieldBloomIndex::build_with_budget(&self.rules, budget),
588                None => FieldBloomIndex::build(&self.rules),
589            };
590        }
591    }
592
593    /// Evaluate an event against candidate rules using the inverted index.
594    pub fn evaluate<E: Event>(&self, event: &E) -> Vec<EvaluationResult> {
595        if self.bloom_prefilter {
596            self.evaluate_with_bloom_path(event)
597        } else {
598            self.evaluate_no_bloom_path(event)
599        }
600    }
601
/// Compute the cross-rule AC keep-mask for `event`.
///
/// Returns `None` when the cross-rule index is disabled or empty, in
/// which case no filtering is needed. Otherwise returns one flag per
/// rule: `mask[idx] == true` means the rule survives the filter, `false`
/// means it can be dropped. Rules that are not AC-prunable are always
/// kept.
#[cfg(feature = "daachorse-index")]
fn cross_rule_ac_keep_mask<E: Event>(&self, event: &E) -> Option<Vec<bool>> {
    let active = self.cross_rule_ac_enabled && !self.cross_rule_ac_index.is_empty();
    if !active {
        return None;
    }

    // Start from the raw AC verdict: `true` where a pattern hit.
    let mut keep = vec![false; self.rules.len()];
    self.cross_rule_ac_index.mark_hits(event, &mut keep);

    // keep = ac_hit OR !ac_prunable. A rule without a prunable entry is
    // treated as non-prunable and therefore kept. `rebuild_index` keeps
    // the prunable vector aligned with the rule slice.
    for (idx, slot) in keep.iter_mut().enumerate() {
        let prunable = self
            .cross_rule_ac_prunable
            .get(idx)
            .copied()
            .unwrap_or(false);
        *slot = *slot || !prunable;
    }
    Some(keep)
}
629
630    #[cfg(not(feature = "daachorse-index"))]
631    #[inline(always)]
632    fn cross_rule_ac_keep_mask<E: Event>(&self, _event: &E) -> Option<Vec<bool>> {
633        None
634    }
635
636    /// Pick the candidate rule set for `event`. When a logsource extractor
637    /// produced an event logsource, the product-partitioned index drops
638    /// always-evaluated rules of a conflicting product; otherwise the full
639    /// candidate set is returned (zero behaviour change when pruning is off).
640    fn logsource_candidates<E: Event>(
641        &self,
642        event: &E,
643        event_logsource: Option<&LogSource>,
644    ) -> Vec<usize> {
645        match event_logsource {
646            Some(ls) => {
647                // Observability: count the fail-open case (no logsource at all)
648                // and the always-evaluated rules pruned by product conflict.
649                if ls.product.is_none() && ls.service.is_none() && ls.category.is_none() {
650                    self.logsource_absent.fetch_add(1, Ordering::Relaxed);
651                }
652                let pruned = self
653                    .rule_index
654                    .conflicting_unindexable_count(ls.product.as_deref());
655                if pruned > 0 {
656                    self.logsource_pruned
657                        .fetch_add(pruned as u64, Ordering::Relaxed);
658                }
659                self.rule_index
660                    .candidates_with_logsource(event, ls.product.as_deref())
661            }
662            None => self.rule_index.candidates(event),
663        }
664    }
665
666    fn evaluate_no_bloom_path<E: Event>(&self, event: &E) -> Vec<EvaluationResult> {
667        // Pass the zero-sized `NoBloom` lookup so this monomorphizes to the
668        // same straight-line code as the pre-bloom engine while still
669        // threading the configured match-detail level.
670        let keep = self.cross_rule_ac_keep_mask(event);
671        // Extract the event logsource once when pruning is enabled; `None`
672        // (the default) leaves the loop's behaviour unchanged.
673        let event_logsource = self
674            .logsource_extractor
675            .as_ref()
676            .map(|ex| ex.extract(event));
677        let candidates = self.logsource_candidates(event, event_logsource.as_ref());
678        let mut results = Vec::new();
679        for idx in candidates {
680            if let Some(ref mask) = keep
681                && !mask[idx]
682            {
683                continue;
684            }
685            let rule = &self.rules[idx];
686            if let Some(ref event_ls) = event_logsource
687                && !logsource_compatible(&rule.logsource, event_ls)
688            {
689                continue;
690            }
691            if let Some(mut m) =
692                evaluate_rule_with_bloom(rule, event, &bloom_index::NoBloom, self.match_detail)
693            {
694                if self.include_event
695                    && let Some(d) = m.as_detection_mut()
696                    && d.event.is_none()
697                {
698                    d.event = Some(event.to_json());
699                }
700                results.push(m);
701            }
702        }
703        results
704    }
705
706    fn evaluate_with_bloom_path<E: Event>(&self, event: &E) -> Vec<EvaluationResult> {
707        let bloom = BloomCache::new(&self.bloom_index, event);
708        let keep = self.cross_rule_ac_keep_mask(event);
709        // Extract the event logsource once when pruning is enabled; `None`
710        // (the default) leaves the loop's behaviour unchanged.
711        let event_logsource = self
712            .logsource_extractor
713            .as_ref()
714            .map(|ex| ex.extract(event));
715        let candidates = self.logsource_candidates(event, event_logsource.as_ref());
716        let mut results = Vec::new();
717        for idx in candidates {
718            if let Some(ref mask) = keep
719                && !mask[idx]
720            {
721                continue;
722            }
723            let rule = &self.rules[idx];
724            if let Some(ref event_ls) = event_logsource
725                && !logsource_compatible(&rule.logsource, event_ls)
726            {
727                continue;
728            }
729            if let Some(mut m) = evaluate_rule_with_bloom(rule, event, &bloom, self.match_detail) {
730                if self.include_event
731                    && let Some(d) = m.as_detection_mut()
732                    && d.event.is_none()
733                {
734                    d.event = Some(event.to_json());
735                }
736                results.push(m);
737            }
738        }
739        results
740    }
741
742    /// Evaluate an event against candidate rules matching the given logsource.
743    ///
744    /// Uses the inverted index for candidate pre-filtering, then applies the
745    /// logsource constraint. Only rules whose logsource is compatible with
746    /// `event_logsource` are evaluated.
747    pub fn evaluate_with_logsource<E: Event>(
748        &self,
749        event: &E,
750        event_logsource: &LogSource,
751    ) -> Vec<EvaluationResult> {
752        if self.bloom_prefilter {
753            self.evaluate_with_logsource_with_bloom(event, event_logsource)
754        } else {
755            self.evaluate_with_logsource_no_bloom(event, event_logsource)
756        }
757    }
758
759    fn evaluate_with_logsource_no_bloom<E: Event>(
760        &self,
761        event: &E,
762        event_logsource: &LogSource,
763    ) -> Vec<EvaluationResult> {
764        let keep = self.cross_rule_ac_keep_mask(event);
765        let mut results = Vec::new();
766        for idx in self.rule_index.candidates(event) {
767            if let Some(ref mask) = keep
768                && !mask[idx]
769            {
770                continue;
771            }
772            let rule = &self.rules[idx];
773            if logsource_matches(&rule.logsource, event_logsource)
774                && let Some(mut m) =
775                    evaluate_rule_with_bloom(rule, event, &bloom_index::NoBloom, self.match_detail)
776            {
777                if self.include_event
778                    && let Some(d) = m.as_detection_mut()
779                    && d.event.is_none()
780                {
781                    d.event = Some(event.to_json());
782                }
783                results.push(m);
784            }
785        }
786        results
787    }
788
789    fn evaluate_with_logsource_with_bloom<E: Event>(
790        &self,
791        event: &E,
792        event_logsource: &LogSource,
793    ) -> Vec<EvaluationResult> {
794        let bloom = BloomCache::new(&self.bloom_index, event);
795        let keep = self.cross_rule_ac_keep_mask(event);
796        let mut results = Vec::new();
797        for idx in self.rule_index.candidates(event) {
798            if let Some(ref mask) = keep
799                && !mask[idx]
800            {
801                continue;
802            }
803            let rule = &self.rules[idx];
804            if logsource_matches(&rule.logsource, event_logsource)
805                && let Some(mut m) =
806                    evaluate_rule_with_bloom(rule, event, &bloom, self.match_detail)
807            {
808                if self.include_event
809                    && let Some(d) = m.as_detection_mut()
810                    && d.event.is_none()
811                {
812                    d.event = Some(event.to_json());
813                }
814                results.push(m);
815            }
816        }
817        results
818    }
819
820    /// Evaluate a batch of events, returning per-event match results.
821    ///
822    /// When the `parallel` feature is enabled, events are evaluated concurrently
823    /// using rayon's work-stealing thread pool. Otherwise, falls back to
824    /// sequential evaluation.
825    pub fn evaluate_batch<E: Event + Sync>(&self, events: &[&E]) -> Vec<Vec<EvaluationResult>> {
826        #[cfg(feature = "parallel")]
827        {
828            use rayon::prelude::*;
829            events.par_iter().map(|e| self.evaluate(e)).collect()
830        }
831        #[cfg(not(feature = "parallel"))]
832        {
833            events.iter().map(|e| self.evaluate(e)).collect()
834        }
835    }
836
837    /// Number of rules loaded in the engine.
838    pub fn rule_count(&self) -> usize {
839        self.rules.len()
840    }
841
842    /// Access the compiled rules.
843    pub fn rules(&self) -> &[CompiledRule] {
844        &self.rules
845    }
846}
847
848impl Default for Engine {
849    fn default() -> Self {
850        Self::new()
851    }
852}