1use std::path::{Path, PathBuf};
2use std::sync::Arc;
3
4use rsigma_eval::event::Event;
5use rsigma_eval::{
6 CorrelationConfig, CorrelationEngine, CorrelationSnapshot, Engine, Pipeline, ProcessResult,
7 parse_pipeline_file,
8};
9use rsigma_parser::SigmaCollection;
10
11use crate::sources::{self, SourceResolver, TemplateExpander};
12
13pub struct RuntimeEngine {
16 engine: EngineVariant,
17 pipelines: Vec<Pipeline>,
18 pipeline_paths: Vec<PathBuf>,
19 rules_path: std::path::PathBuf,
20 corr_config: CorrelationConfig,
21 include_event: bool,
22 source_resolver: Option<Arc<dyn SourceResolver>>,
23 allow_remote_include: bool,
24 bloom_prefilter: bool,
27 bloom_max_bytes: Option<usize>,
30 #[cfg(feature = "daachorse-index")]
34 cross_rule_ac: bool,
35}
36
37enum EngineVariant {
38 DetectionOnly(Box<Engine>),
39 WithCorrelations(Box<CorrelationEngine>),
40}
41
/// Counters describing the currently loaded engine.
///
/// Plain data: cheap to copy, comparable, and printable for logs and tests.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct EngineStats {
    /// Number of detection rules reported by the engine.
    pub detection_rules: usize,
    /// Number of correlation rules; always `0` for a detection-only engine.
    pub correlation_rules: usize,
    /// Number of correlation state entries; always `0` for a detection-only engine.
    pub state_entries: usize,
}
48
49impl RuntimeEngine {
50 pub fn new(
51 rules_path: std::path::PathBuf,
52 pipelines: Vec<Pipeline>,
53 corr_config: CorrelationConfig,
54 include_event: bool,
55 ) -> Self {
56 RuntimeEngine {
57 engine: EngineVariant::DetectionOnly(Box::new(Engine::new())),
58 pipelines,
59 pipeline_paths: Vec::new(),
60 rules_path,
61 corr_config,
62 include_event,
63 source_resolver: None,
64 allow_remote_include: false,
65 bloom_prefilter: false,
66 bloom_max_bytes: None,
67 #[cfg(feature = "daachorse-index")]
68 cross_rule_ac: false,
69 }
70 }
71
72 pub fn set_bloom_prefilter(&mut self, enabled: bool) {
76 self.bloom_prefilter = enabled;
77 }
78
79 pub fn set_bloom_max_bytes(&mut self, max_bytes: usize) {
82 self.bloom_max_bytes = Some(max_bytes);
83 }
84
85 #[cfg(feature = "daachorse-index")]
92 pub fn set_cross_rule_ac(&mut self, enabled: bool) {
93 self.cross_rule_ac = enabled;
94 }
95
96 pub fn set_source_resolver(&mut self, resolver: Arc<dyn SourceResolver>) {
101 self.source_resolver = Some(resolver);
102 }
103
104 pub fn source_resolver(&self) -> Option<&Arc<dyn SourceResolver>> {
106 self.source_resolver.as_ref()
107 }
108
109 pub fn set_allow_remote_include(&mut self, allow: bool) {
111 self.allow_remote_include = allow;
112 }
113
114 pub fn allow_remote_include(&self) -> bool {
116 self.allow_remote_include
117 }
118
119 pub fn set_pipeline_paths(&mut self, paths: Vec<PathBuf>) {
125 self.pipeline_paths = paths;
126 }
127
128 pub fn pipeline_paths(&self) -> &[PathBuf] {
130 &self.pipeline_paths
131 }
132
133 pub async fn resolve_dynamic_pipelines(&mut self) -> Result<(), String> {
139 let Some(resolver) = &self.source_resolver else {
140 return Ok(());
141 };
142
143 let mut resolved_pipelines = Vec::with_capacity(self.pipelines.len());
144 for pipeline in &self.pipelines {
145 if pipeline.is_dynamic() {
146 match sources::resolve_all(resolver.as_ref(), &pipeline.sources).await {
147 Ok(resolved_data) => {
148 let mut expanded = TemplateExpander::expand(pipeline, &resolved_data);
149 sources::include::expand_includes(
151 &mut expanded,
152 &resolved_data,
153 self.allow_remote_include,
154 )?;
155 resolved_pipelines.push(expanded);
156 }
157 Err(e) => {
158 return Err(format!(
159 "Failed to resolve dynamic pipeline '{}': {e}",
160 pipeline.name
161 ));
162 }
163 }
164 } else {
165 resolved_pipelines.push(pipeline.clone());
166 }
167 }
168 self.pipelines = resolved_pipelines;
169 Ok(())
170 }
171
172 pub fn load_rules(&mut self) -> Result<EngineStats, String> {
185 if !self.pipeline_paths.is_empty() {
186 self.pipelines = reload_pipelines(&self.pipeline_paths)?;
187 }
188
189 if self.source_resolver.is_some() && self.pipelines.iter().any(|p| p.is_dynamic()) {
191 if let Ok(handle) = tokio::runtime::Handle::try_current() {
192 let pipelines = std::mem::take(&mut self.pipelines);
193 let resolver = self.source_resolver.clone().unwrap();
194 let allow_remote = self.allow_remote_include;
195 let resolved = tokio::task::block_in_place(|| {
196 handle.block_on(async {
197 resolve_pipelines_async(&resolver, &pipelines, allow_remote).await
198 })
199 });
200 match resolved {
201 Ok(p) => self.pipelines = p,
202 Err(e) => {
203 self.pipelines = pipelines;
204 tracing::warn!(error = %e, "Dynamic source resolution failed, using unresolved pipelines");
205 }
206 }
207 } else {
208 tracing::warn!("No tokio runtime available for dynamic source resolution");
209 }
210 }
211
212 let previous_state = self.export_state();
213 let collection = load_collection(&self.rules_path)?;
214 let has_correlations = !collection.correlations.is_empty();
215
216 if has_correlations {
217 let mut engine = CorrelationEngine::new(self.corr_config.clone());
218 engine.set_include_event(self.include_event);
219 if let Some(budget) = self.bloom_max_bytes {
220 engine.set_bloom_max_bytes(budget);
221 }
222 engine.set_bloom_prefilter(self.bloom_prefilter);
223 #[cfg(feature = "daachorse-index")]
224 engine.set_cross_rule_ac(self.cross_rule_ac);
225 for p in &self.pipelines {
226 engine.add_pipeline(p.clone());
227 }
228 engine
229 .add_collection(&collection)
230 .map_err(|e| format!("Error compiling rules: {e}"))?;
231
232 if let Some(snapshot) = previous_state {
233 engine.import_state(snapshot);
234 }
235
236 let stats = EngineStats {
237 detection_rules: engine.detection_rule_count(),
238 correlation_rules: engine.correlation_rule_count(),
239 state_entries: engine.state_count(),
240 };
241 self.engine = EngineVariant::WithCorrelations(Box::new(engine));
242 Ok(stats)
243 } else {
244 let mut engine = Engine::new();
245 engine.set_include_event(self.include_event);
246 if let Some(budget) = self.bloom_max_bytes {
247 engine.set_bloom_max_bytes(budget);
248 }
249 engine.set_bloom_prefilter(self.bloom_prefilter);
250 #[cfg(feature = "daachorse-index")]
251 engine.set_cross_rule_ac(self.cross_rule_ac);
252 for p in &self.pipelines {
253 engine.add_pipeline(p.clone());
254 }
255 engine
256 .add_collection(&collection)
257 .map_err(|e| format!("Error compiling rules: {e}"))?;
258
259 let stats = EngineStats {
260 detection_rules: engine.rule_count(),
261 correlation_rules: 0,
262 state_entries: 0,
263 };
264 self.engine = EngineVariant::DetectionOnly(Box::new(engine));
265 Ok(stats)
266 }
267 }
268
269 pub fn process_batch<E: Event + Sync>(&mut self, events: &[&E]) -> Vec<ProcessResult> {
274 match &mut self.engine {
275 EngineVariant::DetectionOnly(engine) => {
276 let batch_detections = engine.evaluate_batch(events);
277 batch_detections
278 .into_iter()
279 .map(|detections| ProcessResult {
280 detections,
281 correlations: vec![],
282 })
283 .collect()
284 }
285 EngineVariant::WithCorrelations(engine) => engine.process_batch(events),
286 }
287 }
288
289 pub fn stats(&self) -> EngineStats {
291 match &self.engine {
292 EngineVariant::DetectionOnly(engine) => EngineStats {
293 detection_rules: engine.rule_count(),
294 correlation_rules: 0,
295 state_entries: 0,
296 },
297 EngineVariant::WithCorrelations(engine) => EngineStats {
298 detection_rules: engine.detection_rule_count(),
299 correlation_rules: engine.correlation_rule_count(),
300 state_entries: engine.state_count(),
301 },
302 }
303 }
304
305 pub fn rules_path(&self) -> &Path {
307 &self.rules_path
308 }
309
310 pub fn pipelines(&self) -> &[Pipeline] {
312 &self.pipelines
313 }
314
315 pub fn corr_config(&self) -> &CorrelationConfig {
317 &self.corr_config
318 }
319
320 pub fn include_event(&self) -> bool {
322 self.include_event
323 }
324
325 pub fn export_state(&self) -> Option<CorrelationSnapshot> {
328 match &self.engine {
329 EngineVariant::DetectionOnly(_) => None,
330 EngineVariant::WithCorrelations(engine) => Some(engine.export_state()),
331 }
332 }
333
334 pub fn import_state(&mut self, snapshot: &CorrelationSnapshot) -> bool {
338 if let EngineVariant::WithCorrelations(engine) = &mut self.engine {
339 engine.import_state(snapshot.clone())
340 } else {
341 true
342 }
343 }
344}
345
346fn load_collection(path: &Path) -> Result<SigmaCollection, String> {
347 let collection = if path.is_dir() {
348 rsigma_parser::parse_sigma_directory(path)
349 .map_err(|e| format!("Error loading rules from {}: {e}", path.display()))?
350 } else {
351 rsigma_parser::parse_sigma_file(path)
352 .map_err(|e| format!("Error loading rule {}: {e}", path.display()))?
353 };
354
355 if !collection.errors.is_empty() {
356 tracing::warn!(
357 count = collection.errors.len(),
358 "Parse errors while loading rules"
359 );
360 }
361
362 Ok(collection)
363}
364
365fn reload_pipelines(paths: &[PathBuf]) -> Result<Vec<Pipeline>, String> {
367 let mut pipelines = Vec::with_capacity(paths.len());
368 for path in paths {
369 let pipeline = parse_pipeline_file(path)
370 .map_err(|e| format!("Error reloading pipeline {}: {e}", path.display()))?;
371 pipelines.push(pipeline);
372 }
373 pipelines.sort_by_key(|p| p.priority);
374 Ok(pipelines)
375}
376
377async fn resolve_pipelines_async(
379 resolver: &Arc<dyn SourceResolver>,
380 pipelines: &[Pipeline],
381 allow_remote_include: bool,
382) -> Result<Vec<Pipeline>, String> {
383 let mut resolved_pipelines = Vec::with_capacity(pipelines.len());
384 for pipeline in pipelines {
385 if pipeline.is_dynamic() {
386 let resolved_data = sources::resolve_all(resolver.as_ref(), &pipeline.sources)
387 .await
388 .map_err(|e| {
389 format!(
390 "Failed to resolve dynamic pipeline '{}': {e}",
391 pipeline.name
392 )
393 })?;
394 let mut expanded = TemplateExpander::expand(pipeline, &resolved_data);
395 sources::include::expand_includes(&mut expanded, &resolved_data, allow_remote_include)?;
396 resolved_pipelines.push(expanded);
397 } else {
398 resolved_pipelines.push(pipeline.clone());
399 }
400 }
401 Ok(resolved_pipelines)
402}