Skip to main content

rsigma_runtime/
engine.rs

1use std::path::{Path, PathBuf};
2use std::sync::Arc;
3
4use rsigma_eval::event::Event;
5use rsigma_eval::{
6    CorrelationConfig, CorrelationEngine, CorrelationSnapshot, Engine, Pipeline, ProcessResult,
7    parse_pipeline_file,
8};
9use rsigma_parser::SigmaCollection;
10
11use crate::sources::{self, SourceResolver, TemplateExpander};
12
13/// Wraps a CorrelationEngine (or a plain Engine) and provides the interface
14/// the runtime needs: process events, reload rules, and query state.
15pub struct RuntimeEngine {
16    engine: EngineVariant,
17    pipelines: Vec<Pipeline>,
18    pipeline_paths: Vec<PathBuf>,
19    rules_path: std::path::PathBuf,
20    corr_config: CorrelationConfig,
21    include_event: bool,
22    source_resolver: Option<Arc<dyn SourceResolver>>,
23    allow_remote_include: bool,
24    /// Opt-in bloom-filter pre-filtering of positive substring matchers.
25    /// Forwarded to the inner detection engine on every rule reload.
26    bloom_prefilter: bool,
27    /// Optional override for the bloom memory budget in bytes. `None`
28    /// means use the eval crate default.
29    bloom_max_bytes: Option<usize>,
30    /// Opt-in cross-rule Aho-Corasick pre-filter. Forwarded to the inner
31    /// detection engine on every rule reload. Available behind the
32    /// `daachorse-index` Cargo feature.
33    #[cfg(feature = "daachorse-index")]
34    cross_rule_ac: bool,
35}
36
37enum EngineVariant {
38    DetectionOnly(Box<Engine>),
39    WithCorrelations(Box<CorrelationEngine>),
40}
41
/// Summary statistics about the loaded engine state.
///
/// All counters are plain `usize` values, so the type derives the common
/// value-type traits: it can be logged with `{:?}`, cloned, compared in
/// tests, and default-constructed as all zeroes.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct EngineStats {
    /// Number of detection rules in the active engine.
    pub detection_rules: usize,
    /// Number of correlation rules; `0` for a detection-only engine.
    pub correlation_rules: usize,
    /// Number of correlation state entries; `0` for a detection-only engine.
    pub state_entries: usize,
}
48
49impl RuntimeEngine {
50    pub fn new(
51        rules_path: std::path::PathBuf,
52        pipelines: Vec<Pipeline>,
53        corr_config: CorrelationConfig,
54        include_event: bool,
55    ) -> Self {
56        RuntimeEngine {
57            engine: EngineVariant::DetectionOnly(Box::new(Engine::new())),
58            pipelines,
59            pipeline_paths: Vec::new(),
60            rules_path,
61            corr_config,
62            include_event,
63            source_resolver: None,
64            allow_remote_include: false,
65            bloom_prefilter: false,
66            bloom_max_bytes: None,
67            #[cfg(feature = "daachorse-index")]
68            cross_rule_ac: false,
69        }
70    }
71
72    /// Enable or disable bloom-filter pre-filtering on the inner detection
73    /// engine. Off by default. Applies on the next `load_rules()`; pre-load
74    /// callers should set this before calling `load_rules()`.
75    pub fn set_bloom_prefilter(&mut self, enabled: bool) {
76        self.bloom_prefilter = enabled;
77    }
78
79    /// Override the bloom memory budget on the inner detection engine.
80    /// Applies on the next `load_rules()`.
81    pub fn set_bloom_max_bytes(&mut self, max_bytes: usize) {
82        self.bloom_max_bytes = Some(max_bytes);
83    }
84
85    /// Enable or disable the cross-rule Aho-Corasick pre-filter on the
86    /// inner detection engine. Off by default; the optimization helps only
87    /// on substring-heavy rule sets > ~5K rules. Applies on the next
88    /// `load_rules()`.
89    ///
90    /// Available behind the `daachorse-index` Cargo feature.
91    #[cfg(feature = "daachorse-index")]
92    pub fn set_cross_rule_ac(&mut self, enabled: bool) {
93        self.cross_rule_ac = enabled;
94    }
95
96    /// Set a source resolver for dynamic pipeline sources.
97    ///
98    /// When set, `load_rules()` resolves dynamic sources and expands
99    /// `${source.*}` templates before compiling rules.
100    pub fn set_source_resolver(&mut self, resolver: Arc<dyn SourceResolver>) {
101        self.source_resolver = Some(resolver);
102    }
103
104    /// Get the source resolver, if one is configured.
105    pub fn source_resolver(&self) -> Option<&Arc<dyn SourceResolver>> {
106        self.source_resolver.as_ref()
107    }
108
109    /// Allow `include` directives to reference HTTP/NATS sources.
110    pub fn set_allow_remote_include(&mut self, allow: bool) {
111        self.allow_remote_include = allow;
112    }
113
114    /// Whether remote includes are allowed.
115    pub fn allow_remote_include(&self) -> bool {
116        self.allow_remote_include
117    }
118
119    /// Set the pipeline file paths used for hot-reload.
120    ///
121    /// When paths are set, `load_rules()` re-reads pipeline YAML from disk
122    /// before rebuilding the engine. This enables pipeline hot-reload
123    /// alongside rule hot-reload.
124    pub fn set_pipeline_paths(&mut self, paths: Vec<PathBuf>) {
125        self.pipeline_paths = paths;
126    }
127
128    /// Return the pipeline file paths (used by the daemon to set up watchers).
129    pub fn pipeline_paths(&self) -> &[PathBuf] {
130        &self.pipeline_paths
131    }
132
133    /// Resolve dynamic sources in all pipelines and expand templates.
134    ///
135    /// This is the async entry point for source resolution. Call this before
136    /// `load_rules()` when you have an async context available, or let
137    /// `load_rules()` handle it synchronously via `tokio::runtime::Handle`.
138    pub async fn resolve_dynamic_pipelines(&mut self) -> Result<(), String> {
139        let Some(resolver) = &self.source_resolver else {
140            return Ok(());
141        };
142
143        let mut resolved_pipelines = Vec::with_capacity(self.pipelines.len());
144        for pipeline in &self.pipelines {
145            if pipeline.is_dynamic() {
146                match sources::resolve_all(resolver.as_ref(), &pipeline.sources).await {
147                    Ok(resolved_data) => {
148                        let mut expanded = TemplateExpander::expand(pipeline, &resolved_data);
149                        // Expand include directives
150                        sources::include::expand_includes(
151                            &mut expanded,
152                            &resolved_data,
153                            self.allow_remote_include,
154                        )?;
155                        resolved_pipelines.push(expanded);
156                    }
157                    Err(e) => {
158                        return Err(format!(
159                            "Failed to resolve dynamic pipeline '{}': {e}",
160                            pipeline.name
161                        ));
162                    }
163                }
164            } else {
165                resolved_pipelines.push(pipeline.clone());
166            }
167        }
168        self.pipelines = resolved_pipelines;
169        Ok(())
170    }
171
172    /// Load (or reload) rules from the configured path.
173    ///
174    /// On reload, correlation state is exported before replacing the engine
175    /// and re-imported after, so in-flight windows and suppression state
176    /// survive rule changes (entries for removed correlations are dropped).
177    ///
178    /// If pipeline paths are set (via [`set_pipeline_paths`](Self::set_pipeline_paths)),
179    /// pipelines are re-read from disk before rebuilding the engine. If any
180    /// pipeline file fails to parse, the entire reload is aborted and the
181    /// old engine remains active.
182    ///
183    /// Dynamic pipeline sources are resolved if a source resolver is configured.
184    pub fn load_rules(&mut self) -> Result<EngineStats, String> {
185        if !self.pipeline_paths.is_empty() {
186            self.pipelines = reload_pipelines(&self.pipeline_paths)?;
187        }
188
189        // Resolve dynamic sources if a resolver is set
190        if self.source_resolver.is_some() && self.pipelines.iter().any(|p| p.is_dynamic()) {
191            if let Ok(handle) = tokio::runtime::Handle::try_current() {
192                let pipelines = std::mem::take(&mut self.pipelines);
193                let resolver = self.source_resolver.clone().unwrap();
194                let allow_remote = self.allow_remote_include;
195                let resolved = tokio::task::block_in_place(|| {
196                    handle.block_on(async {
197                        resolve_pipelines_async(&resolver, &pipelines, allow_remote).await
198                    })
199                });
200                match resolved {
201                    Ok(p) => self.pipelines = p,
202                    Err(e) => {
203                        self.pipelines = pipelines;
204                        tracing::warn!(error = %e, "Dynamic source resolution failed, using unresolved pipelines");
205                    }
206                }
207            } else {
208                tracing::warn!("No tokio runtime available for dynamic source resolution");
209            }
210        }
211
212        let previous_state = self.export_state();
213        let collection = load_collection(&self.rules_path)?;
214        let has_correlations = !collection.correlations.is_empty();
215
216        if has_correlations {
217            let mut engine = CorrelationEngine::new(self.corr_config.clone());
218            engine.set_include_event(self.include_event);
219            if let Some(budget) = self.bloom_max_bytes {
220                engine.set_bloom_max_bytes(budget);
221            }
222            engine.set_bloom_prefilter(self.bloom_prefilter);
223            #[cfg(feature = "daachorse-index")]
224            engine.set_cross_rule_ac(self.cross_rule_ac);
225            for p in &self.pipelines {
226                engine.add_pipeline(p.clone());
227            }
228            engine
229                .add_collection(&collection)
230                .map_err(|e| format!("Error compiling rules: {e}"))?;
231
232            if let Some(snapshot) = previous_state {
233                engine.import_state(snapshot);
234            }
235
236            let stats = EngineStats {
237                detection_rules: engine.detection_rule_count(),
238                correlation_rules: engine.correlation_rule_count(),
239                state_entries: engine.state_count(),
240            };
241            self.engine = EngineVariant::WithCorrelations(Box::new(engine));
242            Ok(stats)
243        } else {
244            let mut engine = Engine::new();
245            engine.set_include_event(self.include_event);
246            if let Some(budget) = self.bloom_max_bytes {
247                engine.set_bloom_max_bytes(budget);
248            }
249            engine.set_bloom_prefilter(self.bloom_prefilter);
250            #[cfg(feature = "daachorse-index")]
251            engine.set_cross_rule_ac(self.cross_rule_ac);
252            for p in &self.pipelines {
253                engine.add_pipeline(p.clone());
254            }
255            engine
256                .add_collection(&collection)
257                .map_err(|e| format!("Error compiling rules: {e}"))?;
258
259            let stats = EngineStats {
260                detection_rules: engine.rule_count(),
261                correlation_rules: 0,
262                state_entries: 0,
263            };
264            self.engine = EngineVariant::DetectionOnly(Box::new(engine));
265            Ok(stats)
266        }
267    }
268
269    /// Process a batch of events using parallel detection + sequential correlation.
270    ///
271    /// Delegates to `Engine::evaluate_batch` or `CorrelationEngine::process_batch`
272    /// depending on whether correlation rules are loaded.
273    pub fn process_batch<E: Event + Sync>(&mut self, events: &[&E]) -> Vec<ProcessResult> {
274        match &mut self.engine {
275            EngineVariant::DetectionOnly(engine) => {
276                let batch_detections = engine.evaluate_batch(events);
277                batch_detections
278                    .into_iter()
279                    .map(|detections| ProcessResult {
280                        detections,
281                        correlations: vec![],
282                    })
283                    .collect()
284            }
285            EngineVariant::WithCorrelations(engine) => engine.process_batch(events),
286        }
287    }
288
289    /// Return summary statistics about the current engine state.
290    pub fn stats(&self) -> EngineStats {
291        match &self.engine {
292            EngineVariant::DetectionOnly(engine) => EngineStats {
293                detection_rules: engine.rule_count(),
294                correlation_rules: 0,
295                state_entries: 0,
296            },
297            EngineVariant::WithCorrelations(engine) => EngineStats {
298                detection_rules: engine.detection_rule_count(),
299                correlation_rules: engine.correlation_rule_count(),
300                state_entries: engine.state_count(),
301            },
302        }
303    }
304
305    /// Return the path from which rules are loaded.
306    pub fn rules_path(&self) -> &Path {
307        &self.rules_path
308    }
309
310    /// Return the configured processing pipelines.
311    pub fn pipelines(&self) -> &[Pipeline] {
312        &self.pipelines
313    }
314
315    /// Return the correlation configuration.
316    pub fn corr_config(&self) -> &CorrelationConfig {
317        &self.corr_config
318    }
319
320    /// Whether detection results include the matched event.
321    pub fn include_event(&self) -> bool {
322        self.include_event
323    }
324
325    /// Export correlation state as a serializable snapshot.
326    /// Returns `None` if the engine is detection-only (no correlation state to persist).
327    pub fn export_state(&self) -> Option<CorrelationSnapshot> {
328        match &self.engine {
329            EngineVariant::DetectionOnly(_) => None,
330            EngineVariant::WithCorrelations(engine) => Some(engine.export_state()),
331        }
332    }
333
334    /// Import previously exported correlation state.
335    /// Returns `true` if the import succeeded, `false` if the snapshot version
336    /// is incompatible. No-op (returns `true`) if the engine is detection-only.
337    pub fn import_state(&mut self, snapshot: &CorrelationSnapshot) -> bool {
338        if let EngineVariant::WithCorrelations(engine) = &mut self.engine {
339            engine.import_state(snapshot.clone())
340        } else {
341            true
342        }
343    }
344}
345
346fn load_collection(path: &Path) -> Result<SigmaCollection, String> {
347    let collection = if path.is_dir() {
348        rsigma_parser::parse_sigma_directory(path)
349            .map_err(|e| format!("Error loading rules from {}: {e}", path.display()))?
350    } else {
351        rsigma_parser::parse_sigma_file(path)
352            .map_err(|e| format!("Error loading rule {}: {e}", path.display()))?
353    };
354
355    if !collection.errors.is_empty() {
356        tracing::warn!(
357            count = collection.errors.len(),
358            "Parse errors while loading rules"
359        );
360    }
361
362    Ok(collection)
363}
364
365/// Re-read and parse all pipeline files from disk, sorted by priority.
366fn reload_pipelines(paths: &[PathBuf]) -> Result<Vec<Pipeline>, String> {
367    let mut pipelines = Vec::with_capacity(paths.len());
368    for path in paths {
369        let pipeline = parse_pipeline_file(path)
370            .map_err(|e| format!("Error reloading pipeline {}: {e}", path.display()))?;
371        pipelines.push(pipeline);
372    }
373    pipelines.sort_by_key(|p| p.priority);
374    Ok(pipelines)
375}
376
377/// Resolve dynamic sources in pipelines asynchronously.
378async fn resolve_pipelines_async(
379    resolver: &Arc<dyn SourceResolver>,
380    pipelines: &[Pipeline],
381    allow_remote_include: bool,
382) -> Result<Vec<Pipeline>, String> {
383    let mut resolved_pipelines = Vec::with_capacity(pipelines.len());
384    for pipeline in pipelines {
385        if pipeline.is_dynamic() {
386            let resolved_data = sources::resolve_all(resolver.as_ref(), &pipeline.sources)
387                .await
388                .map_err(|e| {
389                    format!(
390                        "Failed to resolve dynamic pipeline '{}': {e}",
391                        pipeline.name
392                    )
393                })?;
394            let mut expanded = TemplateExpander::expand(pipeline, &resolved_data);
395            sources::include::expand_includes(&mut expanded, &resolved_data, allow_remote_include)?;
396            resolved_pipelines.push(expanded);
397        } else {
398            resolved_pipelines.push(pipeline.clone());
399        }
400    }
401    Ok(resolved_pipelines)
402}