1use std::path::{Path, PathBuf};
2use std::sync::Arc;
3
4use rsigma_eval::event::Event;
5use rsigma_eval::{
6 CorrelationConfig, CorrelationEngine, CorrelationSnapshot, Engine, Pipeline, ProcessResult,
7 parse_pipeline_file,
8};
9use rsigma_parser::SigmaCollection;
10
11use crate::sources::{self, SourceResolver, TemplateExpander};
12
13pub struct RuntimeEngine {
16 engine: EngineVariant,
17 pipelines: Vec<Pipeline>,
18 pipeline_paths: Vec<PathBuf>,
19 rules_path: std::path::PathBuf,
20 corr_config: CorrelationConfig,
21 include_event: bool,
22 source_resolver: Option<Arc<dyn SourceResolver>>,
23 allow_remote_include: bool,
24 bloom_prefilter: bool,
27 bloom_max_bytes: Option<usize>,
30 #[cfg(feature = "daachorse-index")]
34 cross_rule_ac: bool,
35}
36
37enum EngineVariant {
38 DetectionOnly(Box<Engine>),
39 WithCorrelations(Box<CorrelationEngine>),
40}
41
42pub struct EngineStats {
44 pub detection_rules: usize,
45 pub correlation_rules: usize,
46 pub state_entries: usize,
47}
48
49impl RuntimeEngine {
50 pub fn new(
51 rules_path: std::path::PathBuf,
52 pipelines: Vec<Pipeline>,
53 corr_config: CorrelationConfig,
54 include_event: bool,
55 ) -> Self {
56 RuntimeEngine {
57 engine: EngineVariant::DetectionOnly(Box::new(Engine::new())),
58 pipelines,
59 pipeline_paths: Vec::new(),
60 rules_path,
61 corr_config,
62 include_event,
63 source_resolver: None,
64 allow_remote_include: false,
65 bloom_prefilter: false,
66 bloom_max_bytes: None,
67 #[cfg(feature = "daachorse-index")]
68 cross_rule_ac: false,
69 }
70 }
71
72 pub fn set_bloom_prefilter(&mut self, enabled: bool) {
76 self.bloom_prefilter = enabled;
77 }
78
79 pub fn set_bloom_max_bytes(&mut self, max_bytes: usize) {
82 self.bloom_max_bytes = Some(max_bytes);
83 }
84
85 #[cfg(feature = "daachorse-index")]
92 pub fn set_cross_rule_ac(&mut self, enabled: bool) {
93 self.cross_rule_ac = enabled;
94 }
95
96 pub fn set_source_resolver(&mut self, resolver: Arc<dyn SourceResolver>) {
101 self.source_resolver = Some(resolver);
102 }
103
104 pub fn source_resolver(&self) -> Option<&Arc<dyn SourceResolver>> {
106 self.source_resolver.as_ref()
107 }
108
109 pub fn set_allow_remote_include(&mut self, allow: bool) {
111 self.allow_remote_include = allow;
112 }
113
114 pub fn allow_remote_include(&self) -> bool {
116 self.allow_remote_include
117 }
118
119 pub fn set_pipeline_paths(&mut self, paths: Vec<PathBuf>) {
125 self.pipeline_paths = paths;
126 }
127
128 pub fn pipeline_paths(&self) -> &[PathBuf] {
130 &self.pipeline_paths
131 }
132
133 pub async fn resolve_dynamic_pipelines(&mut self) -> Result<(), String> {
139 let Some(resolver) = &self.source_resolver else {
140 return Ok(());
141 };
142
143 let mut resolved_pipelines = Vec::with_capacity(self.pipelines.len());
144 for pipeline in &self.pipelines {
145 if pipeline.is_dynamic() {
146 match sources::resolve_all(resolver.as_ref(), &pipeline.sources).await {
147 Ok(resolved_data) => {
148 let mut expanded = TemplateExpander::expand(pipeline, &resolved_data);
149 sources::include::expand_includes(
151 &mut expanded,
152 &resolved_data,
153 self.allow_remote_include,
154 )?;
155 resolved_pipelines.push(expanded);
156 }
157 Err(e) => {
158 return Err(format!(
159 "Failed to resolve dynamic pipeline '{}': {e}",
160 pipeline.name
161 ));
162 }
163 }
164 } else {
165 resolved_pipelines.push(pipeline.clone());
166 }
167 }
168 self.pipelines = resolved_pipelines;
169 Ok(())
170 }
171
172 pub fn load_rules(&mut self) -> Result<EngineStats, String> {
185 let load_span = tracing::info_span!("load_rules", rules_path = %self.rules_path.display());
186 let _enter = load_span.enter();
187 let load_start = std::time::Instant::now();
188
189 if !self.pipeline_paths.is_empty() {
190 self.pipelines = reload_pipelines(&self.pipeline_paths)?;
191 }
192
193 if self.source_resolver.is_some() && self.pipelines.iter().any(|p| p.is_dynamic()) {
195 if let Ok(handle) = tokio::runtime::Handle::try_current() {
196 let pipelines = std::mem::take(&mut self.pipelines);
197 let resolver = self.source_resolver.clone().unwrap();
198 let allow_remote = self.allow_remote_include;
199 let resolved = tokio::task::block_in_place(|| {
200 handle.block_on(async {
201 resolve_pipelines_async(&resolver, &pipelines, allow_remote).await
202 })
203 });
204 match resolved {
205 Ok(p) => self.pipelines = p,
206 Err(e) => {
207 self.pipelines = pipelines;
208 tracing::warn!(error = %e, "Dynamic source resolution failed, using unresolved pipelines");
209 }
210 }
211 } else {
212 tracing::warn!("No tokio runtime available for dynamic source resolution");
213 }
214 }
215
216 let previous_state = self.export_state();
217 let collection = load_collection(&self.rules_path)?;
218 let has_correlations = !collection.correlations.is_empty();
219
220 if has_correlations {
221 let mut engine = CorrelationEngine::new(self.corr_config.clone());
222 engine.set_include_event(self.include_event);
223 if let Some(budget) = self.bloom_max_bytes {
224 engine.set_bloom_max_bytes(budget);
225 }
226 engine.set_bloom_prefilter(self.bloom_prefilter);
227 #[cfg(feature = "daachorse-index")]
228 engine.set_cross_rule_ac(self.cross_rule_ac);
229 for p in &self.pipelines {
230 engine.add_pipeline(p.clone());
231 }
232 engine
233 .add_collection(&collection)
234 .map_err(|e| format!("Error compiling rules: {e}"))?;
235
236 if let Some(snapshot) = previous_state {
237 engine.import_state(snapshot);
238 }
239
240 let stats = EngineStats {
241 detection_rules: engine.detection_rule_count(),
242 correlation_rules: engine.correlation_rule_count(),
243 state_entries: engine.state_count(),
244 };
245 self.engine = EngineVariant::WithCorrelations(Box::new(engine));
246 tracing::debug!(
247 detection_rules = stats.detection_rules,
248 correlation_rules = stats.correlation_rules,
249 duration_ms = load_start.elapsed().as_millis() as u64,
250 "Rule load complete",
251 );
252 Ok(stats)
253 } else {
254 let mut engine = Engine::new();
255 engine.set_include_event(self.include_event);
256 if let Some(budget) = self.bloom_max_bytes {
257 engine.set_bloom_max_bytes(budget);
258 }
259 engine.set_bloom_prefilter(self.bloom_prefilter);
260 #[cfg(feature = "daachorse-index")]
261 engine.set_cross_rule_ac(self.cross_rule_ac);
262 for p in &self.pipelines {
263 engine.add_pipeline(p.clone());
264 }
265 engine
266 .add_collection(&collection)
267 .map_err(|e| format!("Error compiling rules: {e}"))?;
268
269 let stats = EngineStats {
270 detection_rules: engine.rule_count(),
271 correlation_rules: 0,
272 state_entries: 0,
273 };
274 self.engine = EngineVariant::DetectionOnly(Box::new(engine));
275 tracing::debug!(
276 detection_rules = stats.detection_rules,
277 correlation_rules = stats.correlation_rules,
278 duration_ms = load_start.elapsed().as_millis() as u64,
279 "Rule load complete",
280 );
281 Ok(stats)
282 }
283 }
284
285 pub fn process_batch<E: Event + Sync>(&mut self, events: &[&E]) -> Vec<ProcessResult> {
290 match &mut self.engine {
291 EngineVariant::DetectionOnly(engine) => {
292 let batch_detections = engine.evaluate_batch(events);
293 batch_detections
294 .into_iter()
295 .map(|detections| ProcessResult {
296 detections,
297 correlations: vec![],
298 })
299 .collect()
300 }
301 EngineVariant::WithCorrelations(engine) => engine.process_batch(events),
302 }
303 }
304
305 pub fn stats(&self) -> EngineStats {
307 match &self.engine {
308 EngineVariant::DetectionOnly(engine) => EngineStats {
309 detection_rules: engine.rule_count(),
310 correlation_rules: 0,
311 state_entries: 0,
312 },
313 EngineVariant::WithCorrelations(engine) => EngineStats {
314 detection_rules: engine.detection_rule_count(),
315 correlation_rules: engine.correlation_rule_count(),
316 state_entries: engine.state_count(),
317 },
318 }
319 }
320
321 pub fn rules_path(&self) -> &Path {
323 &self.rules_path
324 }
325
326 pub fn pipelines(&self) -> &[Pipeline] {
328 &self.pipelines
329 }
330
331 pub fn corr_config(&self) -> &CorrelationConfig {
333 &self.corr_config
334 }
335
336 pub fn include_event(&self) -> bool {
338 self.include_event
339 }
340
341 pub fn export_state(&self) -> Option<CorrelationSnapshot> {
344 match &self.engine {
345 EngineVariant::DetectionOnly(_) => None,
346 EngineVariant::WithCorrelations(engine) => Some(engine.export_state()),
347 }
348 }
349
350 pub fn import_state(&mut self, snapshot: &CorrelationSnapshot) -> bool {
354 if let EngineVariant::WithCorrelations(engine) = &mut self.engine {
355 engine.import_state(snapshot.clone())
356 } else {
357 true
358 }
359 }
360}
361
362fn load_collection(path: &Path) -> Result<SigmaCollection, String> {
363 let collection = if path.is_dir() {
364 rsigma_parser::parse_sigma_directory(path)
365 .map_err(|e| format!("Error loading rules from {}: {e}", path.display()))?
366 } else {
367 rsigma_parser::parse_sigma_file(path)
368 .map_err(|e| format!("Error loading rule {}: {e}", path.display()))?
369 };
370
371 if !collection.errors.is_empty() {
372 tracing::warn!(
373 count = collection.errors.len(),
374 "Parse errors while loading rules"
375 );
376 for (i, err) in collection.errors.iter().take(3).enumerate() {
377 tracing::warn!(index = i + 1, error = %err, "Rule parse error detail");
378 }
379 }
380
381 Ok(collection)
382}
383
384fn reload_pipelines(paths: &[PathBuf]) -> Result<Vec<Pipeline>, String> {
386 let mut pipelines = Vec::with_capacity(paths.len());
387 for path in paths {
388 let pipeline = parse_pipeline_file(path)
389 .map_err(|e| format!("Error reloading pipeline {}: {e}", path.display()))?;
390 pipelines.push(pipeline);
391 }
392 pipelines.sort_by_key(|p| p.priority);
393 Ok(pipelines)
394}
395
396async fn resolve_pipelines_async(
398 resolver: &Arc<dyn SourceResolver>,
399 pipelines: &[Pipeline],
400 allow_remote_include: bool,
401) -> Result<Vec<Pipeline>, String> {
402 let mut resolved_pipelines = Vec::with_capacity(pipelines.len());
403 for pipeline in pipelines {
404 if pipeline.is_dynamic() {
405 let resolved_data = sources::resolve_all(resolver.as_ref(), &pipeline.sources)
406 .await
407 .map_err(|e| {
408 format!(
409 "Failed to resolve dynamic pipeline '{}': {e}",
410 pipeline.name
411 )
412 })?;
413 let mut expanded = TemplateExpander::expand(pipeline, &resolved_data);
414 sources::include::expand_includes(&mut expanded, &resolved_data, allow_remote_include)?;
415 resolved_pipelines.push(expanded);
416 } else {
417 resolved_pipelines.push(pipeline.clone());
418 }
419 }
420 Ok(resolved_pipelines)
421}