Skip to main content

rsigma_runtime/
engine.rs

1use std::path::{Path, PathBuf};
2use std::sync::Arc;
3
4use arc_swap::ArcSwap;
5use rsigma_eval::event::Event;
6use rsigma_eval::{
7    CorrelationConfig, CorrelationEngine, CorrelationSnapshot, Engine, Pipeline, ProcessResult,
8    RuleFieldSet, parse_pipeline_file,
9};
10use rsigma_parser::SigmaCollection;
11
12use crate::sources::{self, SourceResolver, TemplateExpander};
13
14/// Wraps a CorrelationEngine (or a plain Engine) and provides the interface
15/// the runtime needs: process events, reload rules, and query state.
16pub struct RuntimeEngine {
17    engine: EngineVariant,
18    pipelines: Vec<Pipeline>,
19    pipeline_paths: Vec<PathBuf>,
20    rules_path: std::path::PathBuf,
21    corr_config: CorrelationConfig,
22    include_event: bool,
23    source_resolver: Option<Arc<dyn SourceResolver>>,
24    allow_remote_include: bool,
25    /// Opt-in bloom-filter pre-filtering of positive substring matchers.
26    /// Forwarded to the inner detection engine on every rule reload.
27    bloom_prefilter: bool,
28    /// Optional override for the bloom memory budget in bytes. `None`
29    /// means use the eval crate default.
30    bloom_max_bytes: Option<usize>,
31    /// Opt-in cross-rule Aho-Corasick pre-filter. Forwarded to the inner
32    /// detection engine on every rule reload. Available behind the
33    /// `daachorse-index` Cargo feature.
34    #[cfg(feature = "daachorse-index")]
35    cross_rule_ac: bool,
36    /// Post-pipeline rule field set, refreshed at the end of every
37    /// `load_rules()`. Wrapped in `ArcSwap` so readers (e.g. the daemon's
38    /// `/api/v1/fields/*` endpoints) can snapshot a stable view without
39    /// blocking the hot path during a reload.
40    rule_field_set: Arc<ArcSwap<RuleFieldSet>>,
41}
42
43enum EngineVariant {
44    DetectionOnly(Box<Engine>),
45    WithCorrelations(Box<CorrelationEngine>),
46}
47
/// Summary statistics about the loaded engine state.
///
/// Returned by `RuntimeEngine::load_rules` and `RuntimeEngine::stats`. It is
/// a small plain-data value, so it is `Copy` and can be compared and logged
/// directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct EngineStats {
    /// Number of compiled detection rules.
    pub detection_rules: usize,
    /// Number of compiled correlation rules (0 for a detection-only engine).
    pub correlation_rules: usize,
    /// Number of live correlation state entries (0 for a detection-only engine).
    pub state_entries: usize,
}
54
55impl RuntimeEngine {
56    pub fn new(
57        rules_path: std::path::PathBuf,
58        pipelines: Vec<Pipeline>,
59        corr_config: CorrelationConfig,
60        include_event: bool,
61    ) -> Self {
62        RuntimeEngine {
63            engine: EngineVariant::DetectionOnly(Box::new(Engine::new())),
64            pipelines,
65            pipeline_paths: Vec::new(),
66            rules_path,
67            corr_config,
68            include_event,
69            source_resolver: None,
70            allow_remote_include: false,
71            bloom_prefilter: false,
72            bloom_max_bytes: None,
73            #[cfg(feature = "daachorse-index")]
74            cross_rule_ac: false,
75            rule_field_set: Arc::new(ArcSwap::from_pointee(RuleFieldSet::default())),
76        }
77    }
78
79    /// Return an immutable snapshot of the post-pipeline rule field set.
80    ///
81    /// Cheap to call: returns a refcounted handle that stays valid even if
82    /// `load_rules()` runs concurrently. The daemon's field-observability
83    /// endpoints use this to compute the intersection between observed
84    /// event keys and rule-referenced fields without coordinating with the
85    /// engine lock.
86    pub fn rule_field_set(&self) -> Arc<RuleFieldSet> {
87        self.rule_field_set.load_full()
88    }
89
90    /// Enable or disable bloom-filter pre-filtering on the inner detection
91    /// engine. Off by default. Applies on the next `load_rules()`; pre-load
92    /// callers should set this before calling `load_rules()`.
93    pub fn set_bloom_prefilter(&mut self, enabled: bool) {
94        self.bloom_prefilter = enabled;
95    }
96
97    /// Override the bloom memory budget on the inner detection engine.
98    /// Applies on the next `load_rules()`.
99    pub fn set_bloom_max_bytes(&mut self, max_bytes: usize) {
100        self.bloom_max_bytes = Some(max_bytes);
101    }
102
103    /// Enable or disable the cross-rule Aho-Corasick pre-filter on the
104    /// inner detection engine. Off by default; the optimization helps only
105    /// on substring-heavy rule sets > ~5K rules. Applies on the next
106    /// `load_rules()`.
107    ///
108    /// Available behind the `daachorse-index` Cargo feature.
109    #[cfg(feature = "daachorse-index")]
110    pub fn set_cross_rule_ac(&mut self, enabled: bool) {
111        self.cross_rule_ac = enabled;
112    }
113
114    /// Set a source resolver for dynamic pipeline sources.
115    ///
116    /// When set, `load_rules()` resolves dynamic sources and expands
117    /// `${source.*}` templates before compiling rules.
118    pub fn set_source_resolver(&mut self, resolver: Arc<dyn SourceResolver>) {
119        self.source_resolver = Some(resolver);
120    }
121
122    /// Get the source resolver, if one is configured.
123    pub fn source_resolver(&self) -> Option<&Arc<dyn SourceResolver>> {
124        self.source_resolver.as_ref()
125    }
126
127    /// Allow `include` directives to reference HTTP/NATS sources.
128    pub fn set_allow_remote_include(&mut self, allow: bool) {
129        self.allow_remote_include = allow;
130    }
131
132    /// Whether remote includes are allowed.
133    pub fn allow_remote_include(&self) -> bool {
134        self.allow_remote_include
135    }
136
137    /// Set the pipeline file paths used for hot-reload.
138    ///
139    /// When paths are set, `load_rules()` re-reads pipeline YAML from disk
140    /// before rebuilding the engine. This enables pipeline hot-reload
141    /// alongside rule hot-reload.
142    pub fn set_pipeline_paths(&mut self, paths: Vec<PathBuf>) {
143        self.pipeline_paths = paths;
144    }
145
146    /// Return the pipeline file paths (used by the daemon to set up watchers).
147    pub fn pipeline_paths(&self) -> &[PathBuf] {
148        &self.pipeline_paths
149    }
150
151    /// Resolve dynamic sources in all pipelines and expand templates.
152    ///
153    /// This is the async entry point for source resolution. Call this before
154    /// `load_rules()` when you have an async context available, or let
155    /// `load_rules()` handle it synchronously via `tokio::runtime::Handle`.
156    pub async fn resolve_dynamic_pipelines(&mut self) -> Result<(), String> {
157        let Some(resolver) = &self.source_resolver else {
158            return Ok(());
159        };
160
161        let mut resolved_pipelines = Vec::with_capacity(self.pipelines.len());
162        for pipeline in &self.pipelines {
163            if pipeline.is_dynamic() {
164                match sources::resolve_all(resolver.as_ref(), &pipeline.sources).await {
165                    Ok(resolved_data) => {
166                        let mut expanded = TemplateExpander::expand(pipeline, &resolved_data);
167                        // Expand include directives
168                        sources::include::expand_includes(
169                            &mut expanded,
170                            &resolved_data,
171                            self.allow_remote_include,
172                        )?;
173                        resolved_pipelines.push(expanded);
174                    }
175                    Err(e) => {
176                        return Err(format!(
177                            "Failed to resolve dynamic pipeline '{}': {e}",
178                            pipeline.name
179                        ));
180                    }
181                }
182            } else {
183                resolved_pipelines.push(pipeline.clone());
184            }
185        }
186        self.pipelines = resolved_pipelines;
187        Ok(())
188    }
189
190    /// Load (or reload) rules from the configured path.
191    ///
192    /// On reload, correlation state is exported before replacing the engine
193    /// and re-imported after, so in-flight windows and suppression state
194    /// survive rule changes (entries for removed correlations are dropped).
195    ///
196    /// If pipeline paths are set (via [`set_pipeline_paths`](Self::set_pipeline_paths)),
197    /// pipelines are re-read from disk before rebuilding the engine. If any
198    /// pipeline file fails to parse, the entire reload is aborted and the
199    /// old engine remains active.
200    ///
201    /// Dynamic pipeline sources are resolved if a source resolver is configured.
202    pub fn load_rules(&mut self) -> Result<EngineStats, String> {
203        let load_span = tracing::info_span!("load_rules", rules_path = %self.rules_path.display());
204        let _enter = load_span.enter();
205        let load_start = std::time::Instant::now();
206
207        if !self.pipeline_paths.is_empty() {
208            self.pipelines = reload_pipelines(&self.pipeline_paths)?;
209        }
210
211        // Resolve dynamic sources if a resolver is set
212        if self.source_resolver.is_some() && self.pipelines.iter().any(|p| p.is_dynamic()) {
213            if let Ok(handle) = tokio::runtime::Handle::try_current() {
214                let pipelines = std::mem::take(&mut self.pipelines);
215                let resolver = self.source_resolver.clone().unwrap();
216                let allow_remote = self.allow_remote_include;
217                let resolved = tokio::task::block_in_place(|| {
218                    handle.block_on(async {
219                        resolve_pipelines_async(&resolver, &pipelines, allow_remote).await
220                    })
221                });
222                match resolved {
223                    Ok(p) => self.pipelines = p,
224                    Err(e) => {
225                        self.pipelines = pipelines;
226                        tracing::warn!(error = %e, "Dynamic source resolution failed, using unresolved pipelines");
227                    }
228                }
229            } else {
230                tracing::warn!("No tokio runtime available for dynamic source resolution");
231            }
232        }
233
234        let previous_state = self.export_state();
235        let collection = load_collection(&self.rules_path)?;
236        let has_correlations = !collection.correlations.is_empty();
237
238        if has_correlations {
239            let mut engine = CorrelationEngine::new(self.corr_config.clone());
240            engine.set_include_event(self.include_event);
241            if let Some(budget) = self.bloom_max_bytes {
242                engine.set_bloom_max_bytes(budget);
243            }
244            engine.set_bloom_prefilter(self.bloom_prefilter);
245            #[cfg(feature = "daachorse-index")]
246            engine.set_cross_rule_ac(self.cross_rule_ac);
247            for p in &self.pipelines {
248                engine.add_pipeline(p.clone());
249            }
250            engine
251                .add_collection(&collection)
252                .map_err(|e| format!("Error compiling rules: {e}"))?;
253
254            if let Some(snapshot) = previous_state {
255                engine.import_state(snapshot);
256            }
257
258            let stats = EngineStats {
259                detection_rules: engine.detection_rule_count(),
260                correlation_rules: engine.correlation_rule_count(),
261                state_entries: engine.state_count(),
262            };
263            self.engine = EngineVariant::WithCorrelations(Box::new(engine));
264            self.refresh_rule_field_set(&collection);
265            tracing::debug!(
266                detection_rules = stats.detection_rules,
267                correlation_rules = stats.correlation_rules,
268                duration_ms = load_start.elapsed().as_millis() as u64,
269                "Rule load complete",
270            );
271            Ok(stats)
272        } else {
273            let mut engine = Engine::new();
274            engine.set_include_event(self.include_event);
275            if let Some(budget) = self.bloom_max_bytes {
276                engine.set_bloom_max_bytes(budget);
277            }
278            engine.set_bloom_prefilter(self.bloom_prefilter);
279            #[cfg(feature = "daachorse-index")]
280            engine.set_cross_rule_ac(self.cross_rule_ac);
281            for p in &self.pipelines {
282                engine.add_pipeline(p.clone());
283            }
284            engine
285                .add_collection(&collection)
286                .map_err(|e| format!("Error compiling rules: {e}"))?;
287
288            let stats = EngineStats {
289                detection_rules: engine.rule_count(),
290                correlation_rules: 0,
291                state_entries: 0,
292            };
293            self.engine = EngineVariant::DetectionOnly(Box::new(engine));
294            self.refresh_rule_field_set(&collection);
295            tracing::debug!(
296                detection_rules = stats.detection_rules,
297                correlation_rules = stats.correlation_rules,
298                duration_ms = load_start.elapsed().as_millis() as u64,
299                "Rule load complete",
300            );
301            Ok(stats)
302        }
303    }
304
305    /// Recompute the post-pipeline rule field set and publish it. Called at
306    /// the end of every successful `load_rules()` branch.
307    fn refresh_rule_field_set(&self, collection: &SigmaCollection) {
308        let field_set = RuleFieldSet::collect(collection, &self.pipelines, true);
309        self.rule_field_set.store(Arc::new(field_set));
310    }
311
312    /// Process a batch of events using parallel detection + sequential correlation.
313    ///
314    /// Delegates to `Engine::evaluate_batch` or `CorrelationEngine::process_batch`
315    /// depending on whether correlation rules are loaded.
316    pub fn process_batch<E: Event + Sync>(&mut self, events: &[&E]) -> Vec<ProcessResult> {
317        match &mut self.engine {
318            EngineVariant::DetectionOnly(engine) => engine.evaluate_batch(events),
319            EngineVariant::WithCorrelations(engine) => engine.process_batch(events),
320        }
321    }
322
323    /// Return summary statistics about the current engine state.
324    pub fn stats(&self) -> EngineStats {
325        match &self.engine {
326            EngineVariant::DetectionOnly(engine) => EngineStats {
327                detection_rules: engine.rule_count(),
328                correlation_rules: 0,
329                state_entries: 0,
330            },
331            EngineVariant::WithCorrelations(engine) => EngineStats {
332                detection_rules: engine.detection_rule_count(),
333                correlation_rules: engine.correlation_rule_count(),
334                state_entries: engine.state_count(),
335            },
336        }
337    }
338
339    /// Return the path from which rules are loaded.
340    pub fn rules_path(&self) -> &Path {
341        &self.rules_path
342    }
343
344    /// Return the configured processing pipelines.
345    pub fn pipelines(&self) -> &[Pipeline] {
346        &self.pipelines
347    }
348
349    /// Return the correlation configuration.
350    pub fn corr_config(&self) -> &CorrelationConfig {
351        &self.corr_config
352    }
353
354    /// Whether detection results include the matched event.
355    pub fn include_event(&self) -> bool {
356        self.include_event
357    }
358
359    /// Export correlation state as a serializable snapshot.
360    /// Returns `None` if the engine is detection-only (no correlation state to persist).
361    pub fn export_state(&self) -> Option<CorrelationSnapshot> {
362        match &self.engine {
363            EngineVariant::DetectionOnly(_) => None,
364            EngineVariant::WithCorrelations(engine) => Some(engine.export_state()),
365        }
366    }
367
368    /// Import previously exported correlation state.
369    /// Returns `true` if the import succeeded, `false` if the snapshot version
370    /// is incompatible. No-op (returns `true`) if the engine is detection-only.
371    pub fn import_state(&mut self, snapshot: &CorrelationSnapshot) -> bool {
372        if let EngineVariant::WithCorrelations(engine) = &mut self.engine {
373            engine.import_state(snapshot.clone())
374        } else {
375            true
376        }
377    }
378}
379
380fn load_collection(path: &Path) -> Result<SigmaCollection, String> {
381    let collection = if path.is_dir() {
382        rsigma_parser::parse_sigma_directory(path)
383            .map_err(|e| format!("Error loading rules from {}: {e}", path.display()))?
384    } else {
385        rsigma_parser::parse_sigma_file(path)
386            .map_err(|e| format!("Error loading rule {}: {e}", path.display()))?
387    };
388
389    if !collection.errors.is_empty() {
390        tracing::warn!(
391            count = collection.errors.len(),
392            "Parse errors while loading rules"
393        );
394        for (i, err) in collection.errors.iter().take(3).enumerate() {
395            tracing::warn!(index = i + 1, error = %err, "Rule parse error detail");
396        }
397    }
398
399    Ok(collection)
400}
401
402/// Re-read and parse all pipeline files from disk, sorted by priority.
403fn reload_pipelines(paths: &[PathBuf]) -> Result<Vec<Pipeline>, String> {
404    let mut pipelines = Vec::with_capacity(paths.len());
405    for path in paths {
406        let pipeline = parse_pipeline_file(path)
407            .map_err(|e| format!("Error reloading pipeline {}: {e}", path.display()))?;
408        pipelines.push(pipeline);
409    }
410    pipelines.sort_by_key(|p| p.priority);
411    Ok(pipelines)
412}
413
414/// Resolve dynamic sources in pipelines asynchronously.
415async fn resolve_pipelines_async(
416    resolver: &Arc<dyn SourceResolver>,
417    pipelines: &[Pipeline],
418    allow_remote_include: bool,
419) -> Result<Vec<Pipeline>, String> {
420    let mut resolved_pipelines = Vec::with_capacity(pipelines.len());
421    for pipeline in pipelines {
422        if pipeline.is_dynamic() {
423            let resolved_data = sources::resolve_all(resolver.as_ref(), &pipeline.sources)
424                .await
425                .map_err(|e| {
426                    format!(
427                        "Failed to resolve dynamic pipeline '{}': {e}",
428                        pipeline.name
429                    )
430                })?;
431            let mut expanded = TemplateExpander::expand(pipeline, &resolved_data);
432            sources::include::expand_includes(&mut expanded, &resolved_data, allow_remote_include)?;
433            resolved_pipelines.push(expanded);
434        } else {
435            resolved_pipelines.push(pipeline.clone());
436        }
437    }
438    Ok(resolved_pipelines)
439}