Skip to main content

rsigma_runtime/
engine.rs

1use std::path::{Path, PathBuf};
2use std::sync::Arc;
3
4use rsigma_eval::event::Event;
5use rsigma_eval::{
6    CorrelationConfig, CorrelationEngine, CorrelationSnapshot, Engine, Pipeline, ProcessResult,
7    parse_pipeline_file,
8};
9use rsigma_parser::SigmaCollection;
10
11use crate::sources::{self, SourceResolver, TemplateExpander};
12
13/// Wraps a CorrelationEngine (or a plain Engine) and provides the interface
14/// the runtime needs: process events, reload rules, and query state.
15pub struct RuntimeEngine {
16    engine: EngineVariant,
17    pipelines: Vec<Pipeline>,
18    pipeline_paths: Vec<PathBuf>,
19    rules_path: std::path::PathBuf,
20    corr_config: CorrelationConfig,
21    include_event: bool,
22    source_resolver: Option<Arc<dyn SourceResolver>>,
23    allow_remote_include: bool,
24}
25
/// The concrete engine currently backing a `RuntimeEngine`.
///
/// `load_rules()` picks the variant based on whether the loaded collection
/// contains any correlation rules.
enum EngineVariant {
    /// Stateless detection engine; used when no correlation rules are loaded.
    DetectionOnly(Engine),
    /// Detection plus correlation engine; used when correlation rules exist.
    // NOTE(review): boxed presumably to keep the enum small — confirm.
    WithCorrelations(Box<CorrelationEngine>),
}
30
/// Summary statistics about the loaded engine state.
///
/// All counters are zero for a freshly constructed (rule-less) engine, which
/// is also what [`Default`] yields.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct EngineStats {
    /// Number of detection rules currently loaded.
    pub detection_rules: usize,
    /// Number of correlation rules currently loaded (0 when detection-only).
    pub correlation_rules: usize,
    /// Number of correlation state entries (0 when detection-only).
    pub state_entries: usize,
}
37
38impl RuntimeEngine {
39    pub fn new(
40        rules_path: std::path::PathBuf,
41        pipelines: Vec<Pipeline>,
42        corr_config: CorrelationConfig,
43        include_event: bool,
44    ) -> Self {
45        RuntimeEngine {
46            engine: EngineVariant::DetectionOnly(Engine::new()),
47            pipelines,
48            pipeline_paths: Vec::new(),
49            rules_path,
50            corr_config,
51            include_event,
52            source_resolver: None,
53            allow_remote_include: false,
54        }
55    }
56
57    /// Set a source resolver for dynamic pipeline sources.
58    ///
59    /// When set, `load_rules()` resolves dynamic sources and expands
60    /// `${source.*}` templates before compiling rules.
61    pub fn set_source_resolver(&mut self, resolver: Arc<dyn SourceResolver>) {
62        self.source_resolver = Some(resolver);
63    }
64
65    /// Get the source resolver, if one is configured.
66    pub fn source_resolver(&self) -> Option<&Arc<dyn SourceResolver>> {
67        self.source_resolver.as_ref()
68    }
69
70    /// Allow `include` directives to reference HTTP/NATS sources.
71    pub fn set_allow_remote_include(&mut self, allow: bool) {
72        self.allow_remote_include = allow;
73    }
74
75    /// Whether remote includes are allowed.
76    pub fn allow_remote_include(&self) -> bool {
77        self.allow_remote_include
78    }
79
80    /// Set the pipeline file paths used for hot-reload.
81    ///
82    /// When paths are set, `load_rules()` re-reads pipeline YAML from disk
83    /// before rebuilding the engine. This enables pipeline hot-reload
84    /// alongside rule hot-reload.
85    pub fn set_pipeline_paths(&mut self, paths: Vec<PathBuf>) {
86        self.pipeline_paths = paths;
87    }
88
89    /// Return the pipeline file paths (used by the daemon to set up watchers).
90    pub fn pipeline_paths(&self) -> &[PathBuf] {
91        &self.pipeline_paths
92    }
93
94    /// Resolve dynamic sources in all pipelines and expand templates.
95    ///
96    /// This is the async entry point for source resolution. Call this before
97    /// `load_rules()` when you have an async context available, or let
98    /// `load_rules()` handle it synchronously via `tokio::runtime::Handle`.
99    pub async fn resolve_dynamic_pipelines(&mut self) -> Result<(), String> {
100        let Some(resolver) = &self.source_resolver else {
101            return Ok(());
102        };
103
104        let mut resolved_pipelines = Vec::with_capacity(self.pipelines.len());
105        for pipeline in &self.pipelines {
106            if pipeline.is_dynamic() {
107                match sources::resolve_all(resolver.as_ref(), &pipeline.sources).await {
108                    Ok(resolved_data) => {
109                        let mut expanded = TemplateExpander::expand(pipeline, &resolved_data);
110                        // Expand include directives
111                        sources::include::expand_includes(
112                            &mut expanded,
113                            &resolved_data,
114                            self.allow_remote_include,
115                        )?;
116                        resolved_pipelines.push(expanded);
117                    }
118                    Err(e) => {
119                        return Err(format!(
120                            "Failed to resolve dynamic pipeline '{}': {e}",
121                            pipeline.name
122                        ));
123                    }
124                }
125            } else {
126                resolved_pipelines.push(pipeline.clone());
127            }
128        }
129        self.pipelines = resolved_pipelines;
130        Ok(())
131    }
132
133    /// Load (or reload) rules from the configured path.
134    ///
135    /// On reload, correlation state is exported before replacing the engine
136    /// and re-imported after, so in-flight windows and suppression state
137    /// survive rule changes (entries for removed correlations are dropped).
138    ///
139    /// If pipeline paths are set (via [`set_pipeline_paths`](Self::set_pipeline_paths)),
140    /// pipelines are re-read from disk before rebuilding the engine. If any
141    /// pipeline file fails to parse, the entire reload is aborted and the
142    /// old engine remains active.
143    ///
144    /// Dynamic pipeline sources are resolved if a source resolver is configured.
145    pub fn load_rules(&mut self) -> Result<EngineStats, String> {
146        if !self.pipeline_paths.is_empty() {
147            self.pipelines = reload_pipelines(&self.pipeline_paths)?;
148        }
149
150        // Resolve dynamic sources if a resolver is set
151        if self.source_resolver.is_some() && self.pipelines.iter().any(|p| p.is_dynamic()) {
152            if let Ok(handle) = tokio::runtime::Handle::try_current() {
153                let pipelines = std::mem::take(&mut self.pipelines);
154                let resolver = self.source_resolver.clone().unwrap();
155                let allow_remote = self.allow_remote_include;
156                let resolved = tokio::task::block_in_place(|| {
157                    handle.block_on(async {
158                        resolve_pipelines_async(&resolver, &pipelines, allow_remote).await
159                    })
160                });
161                match resolved {
162                    Ok(p) => self.pipelines = p,
163                    Err(e) => {
164                        self.pipelines = pipelines;
165                        tracing::warn!(error = %e, "Dynamic source resolution failed, using unresolved pipelines");
166                    }
167                }
168            } else {
169                tracing::warn!("No tokio runtime available for dynamic source resolution");
170            }
171        }
172
173        let previous_state = self.export_state();
174        let collection = load_collection(&self.rules_path)?;
175        let has_correlations = !collection.correlations.is_empty();
176
177        if has_correlations {
178            let mut engine = CorrelationEngine::new(self.corr_config.clone());
179            engine.set_include_event(self.include_event);
180            for p in &self.pipelines {
181                engine.add_pipeline(p.clone());
182            }
183            engine
184                .add_collection(&collection)
185                .map_err(|e| format!("Error compiling rules: {e}"))?;
186
187            if let Some(snapshot) = previous_state {
188                engine.import_state(snapshot);
189            }
190
191            let stats = EngineStats {
192                detection_rules: engine.detection_rule_count(),
193                correlation_rules: engine.correlation_rule_count(),
194                state_entries: engine.state_count(),
195            };
196            self.engine = EngineVariant::WithCorrelations(Box::new(engine));
197            Ok(stats)
198        } else {
199            let mut engine = Engine::new();
200            engine.set_include_event(self.include_event);
201            for p in &self.pipelines {
202                engine.add_pipeline(p.clone());
203            }
204            engine
205                .add_collection(&collection)
206                .map_err(|e| format!("Error compiling rules: {e}"))?;
207
208            let stats = EngineStats {
209                detection_rules: engine.rule_count(),
210                correlation_rules: 0,
211                state_entries: 0,
212            };
213            self.engine = EngineVariant::DetectionOnly(engine);
214            Ok(stats)
215        }
216    }
217
218    /// Process a batch of events using parallel detection + sequential correlation.
219    ///
220    /// Delegates to `Engine::evaluate_batch` or `CorrelationEngine::process_batch`
221    /// depending on whether correlation rules are loaded.
222    pub fn process_batch<E: Event + Sync>(&mut self, events: &[&E]) -> Vec<ProcessResult> {
223        match &mut self.engine {
224            EngineVariant::DetectionOnly(engine) => {
225                let batch_detections = engine.evaluate_batch(events);
226                batch_detections
227                    .into_iter()
228                    .map(|detections| ProcessResult {
229                        detections,
230                        correlations: vec![],
231                    })
232                    .collect()
233            }
234            EngineVariant::WithCorrelations(engine) => engine.process_batch(events),
235        }
236    }
237
238    /// Return summary statistics about the current engine state.
239    pub fn stats(&self) -> EngineStats {
240        match &self.engine {
241            EngineVariant::DetectionOnly(engine) => EngineStats {
242                detection_rules: engine.rule_count(),
243                correlation_rules: 0,
244                state_entries: 0,
245            },
246            EngineVariant::WithCorrelations(engine) => EngineStats {
247                detection_rules: engine.detection_rule_count(),
248                correlation_rules: engine.correlation_rule_count(),
249                state_entries: engine.state_count(),
250            },
251        }
252    }
253
254    /// Return the path from which rules are loaded.
255    pub fn rules_path(&self) -> &Path {
256        &self.rules_path
257    }
258
259    /// Return the configured processing pipelines.
260    pub fn pipelines(&self) -> &[Pipeline] {
261        &self.pipelines
262    }
263
264    /// Return the correlation configuration.
265    pub fn corr_config(&self) -> &CorrelationConfig {
266        &self.corr_config
267    }
268
269    /// Whether detection results include the matched event.
270    pub fn include_event(&self) -> bool {
271        self.include_event
272    }
273
274    /// Export correlation state as a serializable snapshot.
275    /// Returns `None` if the engine is detection-only (no correlation state to persist).
276    pub fn export_state(&self) -> Option<CorrelationSnapshot> {
277        match &self.engine {
278            EngineVariant::DetectionOnly(_) => None,
279            EngineVariant::WithCorrelations(engine) => Some(engine.export_state()),
280        }
281    }
282
283    /// Import previously exported correlation state.
284    /// Returns `true` if the import succeeded, `false` if the snapshot version
285    /// is incompatible. No-op (returns `true`) if the engine is detection-only.
286    pub fn import_state(&mut self, snapshot: &CorrelationSnapshot) -> bool {
287        if let EngineVariant::WithCorrelations(engine) = &mut self.engine {
288            engine.import_state(snapshot.clone())
289        } else {
290            true
291        }
292    }
293}
294
295fn load_collection(path: &Path) -> Result<SigmaCollection, String> {
296    let collection = if path.is_dir() {
297        rsigma_parser::parse_sigma_directory(path)
298            .map_err(|e| format!("Error loading rules from {}: {e}", path.display()))?
299    } else {
300        rsigma_parser::parse_sigma_file(path)
301            .map_err(|e| format!("Error loading rule {}: {e}", path.display()))?
302    };
303
304    if !collection.errors.is_empty() {
305        tracing::warn!(
306            count = collection.errors.len(),
307            "Parse errors while loading rules"
308        );
309    }
310
311    Ok(collection)
312}
313
314/// Re-read and parse all pipeline files from disk, sorted by priority.
315fn reload_pipelines(paths: &[PathBuf]) -> Result<Vec<Pipeline>, String> {
316    let mut pipelines = Vec::with_capacity(paths.len());
317    for path in paths {
318        let pipeline = parse_pipeline_file(path)
319            .map_err(|e| format!("Error reloading pipeline {}: {e}", path.display()))?;
320        pipelines.push(pipeline);
321    }
322    pipelines.sort_by_key(|p| p.priority);
323    Ok(pipelines)
324}
325
326/// Resolve dynamic sources in pipelines asynchronously.
327async fn resolve_pipelines_async(
328    resolver: &Arc<dyn SourceResolver>,
329    pipelines: &[Pipeline],
330    allow_remote_include: bool,
331) -> Result<Vec<Pipeline>, String> {
332    let mut resolved_pipelines = Vec::with_capacity(pipelines.len());
333    for pipeline in pipelines {
334        if pipeline.is_dynamic() {
335            let resolved_data = sources::resolve_all(resolver.as_ref(), &pipeline.sources)
336                .await
337                .map_err(|e| {
338                    format!(
339                        "Failed to resolve dynamic pipeline '{}': {e}",
340                        pipeline.name
341                    )
342                })?;
343            let mut expanded = TemplateExpander::expand(pipeline, &resolved_data);
344            sources::include::expand_includes(&mut expanded, &resolved_data, allow_remote_include)?;
345            resolved_pipelines.push(expanded);
346        } else {
347            resolved_pipelines.push(pipeline.clone());
348        }
349    }
350    Ok(resolved_pipelines)
351}