// rsigma_runtime/engine.rs

1use std::path::{Path, PathBuf};
2use std::sync::Arc;
3
4use rsigma_eval::event::Event;
5use rsigma_eval::{
6    CorrelationConfig, CorrelationEngine, CorrelationSnapshot, Engine, Pipeline, ProcessResult,
7    parse_pipeline_file,
8};
9use rsigma_parser::SigmaCollection;
10
11use crate::sources::{self, SourceResolver, TemplateExpander};
12
13/// Wraps a CorrelationEngine (or a plain Engine) and provides the interface
14/// the runtime needs: process events, reload rules, and query state.
15pub struct RuntimeEngine {
16    engine: EngineVariant,
17    pipelines: Vec<Pipeline>,
18    pipeline_paths: Vec<PathBuf>,
19    rules_path: std::path::PathBuf,
20    corr_config: CorrelationConfig,
21    include_event: bool,
22    source_resolver: Option<Arc<dyn SourceResolver>>,
23    allow_remote_include: bool,
24    /// Opt-in bloom-filter pre-filtering of positive substring matchers.
25    /// Forwarded to the inner detection engine on every rule reload.
26    bloom_prefilter: bool,
27    /// Optional override for the bloom memory budget in bytes. `None`
28    /// means use the eval crate default.
29    bloom_max_bytes: Option<usize>,
30    /// Opt-in cross-rule Aho-Corasick pre-filter. Forwarded to the inner
31    /// detection engine on every rule reload. Available behind the
32    /// `daachorse-index` Cargo feature.
33    #[cfg(feature = "daachorse-index")]
34    cross_rule_ac: bool,
35}
36
37enum EngineVariant {
38    DetectionOnly(Box<Engine>),
39    WithCorrelations(Box<CorrelationEngine>),
40}
41
/// Summary statistics about the loaded engine state.
///
/// Produced by `RuntimeEngine::load_rules` and `RuntimeEngine::stats`.
/// Derives `Debug`/`Clone`/`PartialEq`/`Eq` so callers can log, copy and
/// compare snapshots of the counts.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EngineStats {
    /// Number of compiled detection rules.
    pub detection_rules: usize,
    /// Number of correlation rules (0 for a detection-only engine).
    pub correlation_rules: usize,
    /// Number of correlation state entries (0 for a detection-only engine).
    pub state_entries: usize,
}
48
49impl RuntimeEngine {
50    pub fn new(
51        rules_path: std::path::PathBuf,
52        pipelines: Vec<Pipeline>,
53        corr_config: CorrelationConfig,
54        include_event: bool,
55    ) -> Self {
56        RuntimeEngine {
57            engine: EngineVariant::DetectionOnly(Box::new(Engine::new())),
58            pipelines,
59            pipeline_paths: Vec::new(),
60            rules_path,
61            corr_config,
62            include_event,
63            source_resolver: None,
64            allow_remote_include: false,
65            bloom_prefilter: false,
66            bloom_max_bytes: None,
67            #[cfg(feature = "daachorse-index")]
68            cross_rule_ac: false,
69        }
70    }
71
72    /// Enable or disable bloom-filter pre-filtering on the inner detection
73    /// engine. Off by default. Applies on the next `load_rules()`; pre-load
74    /// callers should set this before calling `load_rules()`.
75    pub fn set_bloom_prefilter(&mut self, enabled: bool) {
76        self.bloom_prefilter = enabled;
77    }
78
79    /// Override the bloom memory budget on the inner detection engine.
80    /// Applies on the next `load_rules()`.
81    pub fn set_bloom_max_bytes(&mut self, max_bytes: usize) {
82        self.bloom_max_bytes = Some(max_bytes);
83    }
84
85    /// Enable or disable the cross-rule Aho-Corasick pre-filter on the
86    /// inner detection engine. Off by default; the optimization helps only
87    /// on substring-heavy rule sets > ~5K rules. Applies on the next
88    /// `load_rules()`.
89    ///
90    /// Available behind the `daachorse-index` Cargo feature.
91    #[cfg(feature = "daachorse-index")]
92    pub fn set_cross_rule_ac(&mut self, enabled: bool) {
93        self.cross_rule_ac = enabled;
94    }
95
96    /// Set a source resolver for dynamic pipeline sources.
97    ///
98    /// When set, `load_rules()` resolves dynamic sources and expands
99    /// `${source.*}` templates before compiling rules.
100    pub fn set_source_resolver(&mut self, resolver: Arc<dyn SourceResolver>) {
101        self.source_resolver = Some(resolver);
102    }
103
104    /// Get the source resolver, if one is configured.
105    pub fn source_resolver(&self) -> Option<&Arc<dyn SourceResolver>> {
106        self.source_resolver.as_ref()
107    }
108
109    /// Allow `include` directives to reference HTTP/NATS sources.
110    pub fn set_allow_remote_include(&mut self, allow: bool) {
111        self.allow_remote_include = allow;
112    }
113
114    /// Whether remote includes are allowed.
115    pub fn allow_remote_include(&self) -> bool {
116        self.allow_remote_include
117    }
118
119    /// Set the pipeline file paths used for hot-reload.
120    ///
121    /// When paths are set, `load_rules()` re-reads pipeline YAML from disk
122    /// before rebuilding the engine. This enables pipeline hot-reload
123    /// alongside rule hot-reload.
124    pub fn set_pipeline_paths(&mut self, paths: Vec<PathBuf>) {
125        self.pipeline_paths = paths;
126    }
127
128    /// Return the pipeline file paths (used by the daemon to set up watchers).
129    pub fn pipeline_paths(&self) -> &[PathBuf] {
130        &self.pipeline_paths
131    }
132
133    /// Resolve dynamic sources in all pipelines and expand templates.
134    ///
135    /// This is the async entry point for source resolution. Call this before
136    /// `load_rules()` when you have an async context available, or let
137    /// `load_rules()` handle it synchronously via `tokio::runtime::Handle`.
138    pub async fn resolve_dynamic_pipelines(&mut self) -> Result<(), String> {
139        let Some(resolver) = &self.source_resolver else {
140            return Ok(());
141        };
142
143        let mut resolved_pipelines = Vec::with_capacity(self.pipelines.len());
144        for pipeline in &self.pipelines {
145            if pipeline.is_dynamic() {
146                match sources::resolve_all(resolver.as_ref(), &pipeline.sources).await {
147                    Ok(resolved_data) => {
148                        let mut expanded = TemplateExpander::expand(pipeline, &resolved_data);
149                        // Expand include directives
150                        sources::include::expand_includes(
151                            &mut expanded,
152                            &resolved_data,
153                            self.allow_remote_include,
154                        )?;
155                        resolved_pipelines.push(expanded);
156                    }
157                    Err(e) => {
158                        return Err(format!(
159                            "Failed to resolve dynamic pipeline '{}': {e}",
160                            pipeline.name
161                        ));
162                    }
163                }
164            } else {
165                resolved_pipelines.push(pipeline.clone());
166            }
167        }
168        self.pipelines = resolved_pipelines;
169        Ok(())
170    }
171
172    /// Load (or reload) rules from the configured path.
173    ///
174    /// On reload, correlation state is exported before replacing the engine
175    /// and re-imported after, so in-flight windows and suppression state
176    /// survive rule changes (entries for removed correlations are dropped).
177    ///
178    /// If pipeline paths are set (via [`set_pipeline_paths`](Self::set_pipeline_paths)),
179    /// pipelines are re-read from disk before rebuilding the engine. If any
180    /// pipeline file fails to parse, the entire reload is aborted and the
181    /// old engine remains active.
182    ///
183    /// Dynamic pipeline sources are resolved if a source resolver is configured.
184    pub fn load_rules(&mut self) -> Result<EngineStats, String> {
185        let load_span = tracing::info_span!("load_rules", rules_path = %self.rules_path.display());
186        let _enter = load_span.enter();
187        let load_start = std::time::Instant::now();
188
189        if !self.pipeline_paths.is_empty() {
190            self.pipelines = reload_pipelines(&self.pipeline_paths)?;
191        }
192
193        // Resolve dynamic sources if a resolver is set
194        if self.source_resolver.is_some() && self.pipelines.iter().any(|p| p.is_dynamic()) {
195            if let Ok(handle) = tokio::runtime::Handle::try_current() {
196                let pipelines = std::mem::take(&mut self.pipelines);
197                let resolver = self.source_resolver.clone().unwrap();
198                let allow_remote = self.allow_remote_include;
199                let resolved = tokio::task::block_in_place(|| {
200                    handle.block_on(async {
201                        resolve_pipelines_async(&resolver, &pipelines, allow_remote).await
202                    })
203                });
204                match resolved {
205                    Ok(p) => self.pipelines = p,
206                    Err(e) => {
207                        self.pipelines = pipelines;
208                        tracing::warn!(error = %e, "Dynamic source resolution failed, using unresolved pipelines");
209                    }
210                }
211            } else {
212                tracing::warn!("No tokio runtime available for dynamic source resolution");
213            }
214        }
215
216        let previous_state = self.export_state();
217        let collection = load_collection(&self.rules_path)?;
218        let has_correlations = !collection.correlations.is_empty();
219
220        if has_correlations {
221            let mut engine = CorrelationEngine::new(self.corr_config.clone());
222            engine.set_include_event(self.include_event);
223            if let Some(budget) = self.bloom_max_bytes {
224                engine.set_bloom_max_bytes(budget);
225            }
226            engine.set_bloom_prefilter(self.bloom_prefilter);
227            #[cfg(feature = "daachorse-index")]
228            engine.set_cross_rule_ac(self.cross_rule_ac);
229            for p in &self.pipelines {
230                engine.add_pipeline(p.clone());
231            }
232            engine
233                .add_collection(&collection)
234                .map_err(|e| format!("Error compiling rules: {e}"))?;
235
236            if let Some(snapshot) = previous_state {
237                engine.import_state(snapshot);
238            }
239
240            let stats = EngineStats {
241                detection_rules: engine.detection_rule_count(),
242                correlation_rules: engine.correlation_rule_count(),
243                state_entries: engine.state_count(),
244            };
245            self.engine = EngineVariant::WithCorrelations(Box::new(engine));
246            tracing::debug!(
247                detection_rules = stats.detection_rules,
248                correlation_rules = stats.correlation_rules,
249                duration_ms = load_start.elapsed().as_millis() as u64,
250                "Rule load complete",
251            );
252            Ok(stats)
253        } else {
254            let mut engine = Engine::new();
255            engine.set_include_event(self.include_event);
256            if let Some(budget) = self.bloom_max_bytes {
257                engine.set_bloom_max_bytes(budget);
258            }
259            engine.set_bloom_prefilter(self.bloom_prefilter);
260            #[cfg(feature = "daachorse-index")]
261            engine.set_cross_rule_ac(self.cross_rule_ac);
262            for p in &self.pipelines {
263                engine.add_pipeline(p.clone());
264            }
265            engine
266                .add_collection(&collection)
267                .map_err(|e| format!("Error compiling rules: {e}"))?;
268
269            let stats = EngineStats {
270                detection_rules: engine.rule_count(),
271                correlation_rules: 0,
272                state_entries: 0,
273            };
274            self.engine = EngineVariant::DetectionOnly(Box::new(engine));
275            tracing::debug!(
276                detection_rules = stats.detection_rules,
277                correlation_rules = stats.correlation_rules,
278                duration_ms = load_start.elapsed().as_millis() as u64,
279                "Rule load complete",
280            );
281            Ok(stats)
282        }
283    }
284
285    /// Process a batch of events using parallel detection + sequential correlation.
286    ///
287    /// Delegates to `Engine::evaluate_batch` or `CorrelationEngine::process_batch`
288    /// depending on whether correlation rules are loaded.
289    pub fn process_batch<E: Event + Sync>(&mut self, events: &[&E]) -> Vec<ProcessResult> {
290        match &mut self.engine {
291            EngineVariant::DetectionOnly(engine) => {
292                let batch_detections = engine.evaluate_batch(events);
293                batch_detections
294                    .into_iter()
295                    .map(|detections| ProcessResult {
296                        detections,
297                        correlations: vec![],
298                    })
299                    .collect()
300            }
301            EngineVariant::WithCorrelations(engine) => engine.process_batch(events),
302        }
303    }
304
305    /// Return summary statistics about the current engine state.
306    pub fn stats(&self) -> EngineStats {
307        match &self.engine {
308            EngineVariant::DetectionOnly(engine) => EngineStats {
309                detection_rules: engine.rule_count(),
310                correlation_rules: 0,
311                state_entries: 0,
312            },
313            EngineVariant::WithCorrelations(engine) => EngineStats {
314                detection_rules: engine.detection_rule_count(),
315                correlation_rules: engine.correlation_rule_count(),
316                state_entries: engine.state_count(),
317            },
318        }
319    }
320
321    /// Return the path from which rules are loaded.
322    pub fn rules_path(&self) -> &Path {
323        &self.rules_path
324    }
325
326    /// Return the configured processing pipelines.
327    pub fn pipelines(&self) -> &[Pipeline] {
328        &self.pipelines
329    }
330
331    /// Return the correlation configuration.
332    pub fn corr_config(&self) -> &CorrelationConfig {
333        &self.corr_config
334    }
335
336    /// Whether detection results include the matched event.
337    pub fn include_event(&self) -> bool {
338        self.include_event
339    }
340
341    /// Export correlation state as a serializable snapshot.
342    /// Returns `None` if the engine is detection-only (no correlation state to persist).
343    pub fn export_state(&self) -> Option<CorrelationSnapshot> {
344        match &self.engine {
345            EngineVariant::DetectionOnly(_) => None,
346            EngineVariant::WithCorrelations(engine) => Some(engine.export_state()),
347        }
348    }
349
350    /// Import previously exported correlation state.
351    /// Returns `true` if the import succeeded, `false` if the snapshot version
352    /// is incompatible. No-op (returns `true`) if the engine is detection-only.
353    pub fn import_state(&mut self, snapshot: &CorrelationSnapshot) -> bool {
354        if let EngineVariant::WithCorrelations(engine) = &mut self.engine {
355            engine.import_state(snapshot.clone())
356        } else {
357            true
358        }
359    }
360}
361
362fn load_collection(path: &Path) -> Result<SigmaCollection, String> {
363    let collection = if path.is_dir() {
364        rsigma_parser::parse_sigma_directory(path)
365            .map_err(|e| format!("Error loading rules from {}: {e}", path.display()))?
366    } else {
367        rsigma_parser::parse_sigma_file(path)
368            .map_err(|e| format!("Error loading rule {}: {e}", path.display()))?
369    };
370
371    if !collection.errors.is_empty() {
372        tracing::warn!(
373            count = collection.errors.len(),
374            "Parse errors while loading rules"
375        );
376        for (i, err) in collection.errors.iter().take(3).enumerate() {
377            tracing::warn!(index = i + 1, error = %err, "Rule parse error detail");
378        }
379    }
380
381    Ok(collection)
382}
383
384/// Re-read and parse all pipeline files from disk, sorted by priority.
385fn reload_pipelines(paths: &[PathBuf]) -> Result<Vec<Pipeline>, String> {
386    let mut pipelines = Vec::with_capacity(paths.len());
387    for path in paths {
388        let pipeline = parse_pipeline_file(path)
389            .map_err(|e| format!("Error reloading pipeline {}: {e}", path.display()))?;
390        pipelines.push(pipeline);
391    }
392    pipelines.sort_by_key(|p| p.priority);
393    Ok(pipelines)
394}
395
396/// Resolve dynamic sources in pipelines asynchronously.
397async fn resolve_pipelines_async(
398    resolver: &Arc<dyn SourceResolver>,
399    pipelines: &[Pipeline],
400    allow_remote_include: bool,
401) -> Result<Vec<Pipeline>, String> {
402    let mut resolved_pipelines = Vec::with_capacity(pipelines.len());
403    for pipeline in pipelines {
404        if pipeline.is_dynamic() {
405            let resolved_data = sources::resolve_all(resolver.as_ref(), &pipeline.sources)
406                .await
407                .map_err(|e| {
408                    format!(
409                        "Failed to resolve dynamic pipeline '{}': {e}",
410                        pipeline.name
411                    )
412                })?;
413            let mut expanded = TemplateExpander::expand(pipeline, &resolved_data);
414            sources::include::expand_includes(&mut expanded, &resolved_data, allow_remote_include)?;
415            resolved_pipelines.push(expanded);
416        } else {
417            resolved_pipelines.push(pipeline.clone());
418        }
419    }
420    Ok(resolved_pipelines)
421}