1use std::path::{Path, PathBuf};
2use std::sync::Arc;
3
4use arc_swap::ArcSwap;
5use rsigma_eval::event::Event;
6use rsigma_eval::{
7 CorrelationConfig, CorrelationEngine, CorrelationSnapshot, Engine, MatchDetailLevel, Pipeline,
8 ProcessResult, RuleFieldSet, parse_pipeline_file,
9};
10use rsigma_parser::SigmaCollection;
11
12use crate::sources::{self, SourceResolver, TemplateExpander};
13
14pub struct RuntimeEngine {
17 engine: EngineVariant,
18 pipelines: Vec<Pipeline>,
19 pipeline_paths: Vec<PathBuf>,
20 rules_path: std::path::PathBuf,
21 corr_config: CorrelationConfig,
22 include_event: bool,
23 source_resolver: Option<Arc<dyn SourceResolver>>,
24 allow_remote_include: bool,
25 bloom_prefilter: bool,
28 bloom_max_bytes: Option<usize>,
31 match_detail: MatchDetailLevel,
34 #[cfg(feature = "daachorse-index")]
38 cross_rule_ac: bool,
39 rule_field_set: Arc<ArcSwap<RuleFieldSet>>,
44}
45
46enum EngineVariant {
47 DetectionOnly(Box<Engine>),
48 WithCorrelations(Box<CorrelationEngine>),
49}
50
/// Counts describing the currently loaded engine.
///
/// Returned by `RuntimeEngine::load_rules` and `RuntimeEngine::stats`.
/// `PartialEq`/`Eq`/`Hash` allow snapshots to be compared (e.g. to notice
/// that a reload changed nothing), and `Default` yields an all-zero value.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash)]
pub struct EngineStats {
    /// Number of compiled detection rules.
    pub detection_rules: usize,
    /// Number of correlation rules; 0 for a detection-only engine.
    pub correlation_rules: usize,
    /// Number of correlation state entries; 0 for a detection-only engine.
    pub state_entries: usize,
}
58
59impl RuntimeEngine {
60 pub fn new(
61 rules_path: std::path::PathBuf,
62 pipelines: Vec<Pipeline>,
63 corr_config: CorrelationConfig,
64 include_event: bool,
65 ) -> Self {
66 RuntimeEngine {
67 engine: EngineVariant::DetectionOnly(Box::new(Engine::new())),
68 pipelines,
69 pipeline_paths: Vec::new(),
70 rules_path,
71 corr_config,
72 include_event,
73 source_resolver: None,
74 allow_remote_include: false,
75 bloom_prefilter: false,
76 bloom_max_bytes: None,
77 match_detail: MatchDetailLevel::Off,
78 #[cfg(feature = "daachorse-index")]
79 cross_rule_ac: false,
80 rule_field_set: Arc::new(ArcSwap::from_pointee(RuleFieldSet::default())),
81 }
82 }
83
84 pub fn rule_field_set(&self) -> Arc<RuleFieldSet> {
92 self.rule_field_set.load_full()
93 }
94
95 pub fn set_bloom_prefilter(&mut self, enabled: bool) {
99 self.bloom_prefilter = enabled;
100 }
101
102 pub fn bloom_prefilter(&self) -> bool {
107 self.bloom_prefilter
108 }
109
110 pub fn set_bloom_max_bytes(&mut self, max_bytes: usize) {
113 self.bloom_max_bytes = Some(max_bytes);
114 }
115
116 pub fn bloom_max_bytes(&self) -> Option<usize> {
118 self.bloom_max_bytes
119 }
120
121 pub fn set_match_detail(&mut self, level: MatchDetailLevel) {
125 self.match_detail = level;
126 }
127
128 pub fn match_detail(&self) -> MatchDetailLevel {
131 self.match_detail
132 }
133
134 #[cfg(feature = "daachorse-index")]
141 pub fn set_cross_rule_ac(&mut self, enabled: bool) {
142 self.cross_rule_ac = enabled;
143 }
144
145 #[cfg(feature = "daachorse-index")]
147 pub fn cross_rule_ac(&self) -> bool {
148 self.cross_rule_ac
149 }
150
151 pub fn set_source_resolver(&mut self, resolver: Arc<dyn SourceResolver>) {
156 self.source_resolver = Some(resolver);
157 }
158
159 pub fn source_resolver(&self) -> Option<&Arc<dyn SourceResolver>> {
161 self.source_resolver.as_ref()
162 }
163
164 pub fn set_allow_remote_include(&mut self, allow: bool) {
166 self.allow_remote_include = allow;
167 }
168
169 pub fn allow_remote_include(&self) -> bool {
171 self.allow_remote_include
172 }
173
174 pub fn set_pipeline_paths(&mut self, paths: Vec<PathBuf>) {
180 self.pipeline_paths = paths;
181 }
182
183 pub fn pipeline_paths(&self) -> &[PathBuf] {
185 &self.pipeline_paths
186 }
187
188 pub async fn resolve_dynamic_pipelines(&mut self) -> Result<(), String> {
194 let Some(resolver) = &self.source_resolver else {
195 return Ok(());
196 };
197
198 let mut resolved_pipelines = Vec::with_capacity(self.pipelines.len());
199 for pipeline in &self.pipelines {
200 if pipeline.is_dynamic() {
201 match sources::resolve_all(resolver.as_ref(), &pipeline.sources).await {
202 Ok(resolved_data) => {
203 let mut expanded = TemplateExpander::expand(pipeline, &resolved_data);
204 sources::include::expand_includes(
206 &mut expanded,
207 &resolved_data,
208 self.allow_remote_include,
209 )?;
210 resolved_pipelines.push(expanded);
211 }
212 Err(e) => {
213 return Err(format!(
214 "Failed to resolve dynamic pipeline '{}': {e}",
215 pipeline.name
216 ));
217 }
218 }
219 } else {
220 resolved_pipelines.push(pipeline.clone());
221 }
222 }
223 self.pipelines = resolved_pipelines;
224 Ok(())
225 }
226
227 pub fn load_rules(&mut self) -> Result<EngineStats, String> {
240 let load_span = tracing::info_span!("load_rules", rules_path = %self.rules_path.display());
241 let _enter = load_span.enter();
242 let load_start = std::time::Instant::now();
243
244 if !self.pipeline_paths.is_empty() {
245 self.pipelines = reload_pipelines(&self.pipeline_paths)?;
246 }
247
248 if self.source_resolver.is_some() && self.pipelines.iter().any(|p| p.is_dynamic()) {
257 let handle = tokio::runtime::Handle::try_current().map_err(|_| {
258 "Dynamic pipelines require a tokio runtime; refusing to load rules with \
259 unresolved sources"
260 .to_string()
261 })?;
262 let pipelines = std::mem::take(&mut self.pipelines);
263 let resolver = self.source_resolver.clone().unwrap();
264 let allow_remote = self.allow_remote_include;
265 let resolved = tokio::task::block_in_place(|| {
266 handle.block_on(async {
267 resolve_pipelines_async(&resolver, &pipelines, allow_remote).await
268 })
269 });
270 match resolved {
271 Ok(p) => self.pipelines = p,
272 Err(e) => {
273 self.pipelines = pipelines;
277 return Err(format!("Dynamic source resolution failed: {e}"));
278 }
279 }
280 }
281
282 let previous_state = self.export_state();
283 let collection = load_collection(&self.rules_path)?;
284 let has_correlations = !collection.correlations.is_empty();
285
286 if has_correlations {
287 let mut engine = CorrelationEngine::new(self.corr_config.clone());
288 engine.set_include_event(self.include_event);
289 engine.set_match_detail(self.match_detail);
290 if let Some(budget) = self.bloom_max_bytes {
291 engine.set_bloom_max_bytes(budget);
292 }
293 engine.set_bloom_prefilter(self.bloom_prefilter);
294 #[cfg(feature = "daachorse-index")]
295 engine.set_cross_rule_ac(self.cross_rule_ac);
296 for p in &self.pipelines {
297 engine.add_pipeline(p.clone());
298 }
299 engine
300 .add_collection(&collection)
301 .map_err(|e| format!("Error compiling rules: {e}"))?;
302
303 if let Some(snapshot) = previous_state {
304 engine.import_state(snapshot);
305 }
306
307 let stats = EngineStats {
308 detection_rules: engine.detection_rule_count(),
309 correlation_rules: engine.correlation_rule_count(),
310 state_entries: engine.state_count(),
311 };
312 self.engine = EngineVariant::WithCorrelations(Box::new(engine));
313 self.refresh_rule_field_set(&collection);
314 tracing::debug!(
315 detection_rules = stats.detection_rules,
316 correlation_rules = stats.correlation_rules,
317 duration_ms = load_start.elapsed().as_millis() as u64,
318 "Rule load complete",
319 );
320 Ok(stats)
321 } else {
322 let mut engine = Engine::new();
323 engine.set_include_event(self.include_event);
324 engine.set_match_detail(self.match_detail);
325 if let Some(budget) = self.bloom_max_bytes {
326 engine.set_bloom_max_bytes(budget);
327 }
328 engine.set_bloom_prefilter(self.bloom_prefilter);
329 #[cfg(feature = "daachorse-index")]
330 engine.set_cross_rule_ac(self.cross_rule_ac);
331 for p in &self.pipelines {
332 engine.add_pipeline(p.clone());
333 }
334 engine
335 .add_collection(&collection)
336 .map_err(|e| format!("Error compiling rules: {e}"))?;
337
338 let stats = EngineStats {
339 detection_rules: engine.rule_count(),
340 correlation_rules: 0,
341 state_entries: 0,
342 };
343 self.engine = EngineVariant::DetectionOnly(Box::new(engine));
344 self.refresh_rule_field_set(&collection);
345 tracing::debug!(
346 detection_rules = stats.detection_rules,
347 correlation_rules = stats.correlation_rules,
348 duration_ms = load_start.elapsed().as_millis() as u64,
349 "Rule load complete",
350 );
351 Ok(stats)
352 }
353 }
354
355 fn refresh_rule_field_set(&self, collection: &SigmaCollection) {
358 let field_set = RuleFieldSet::collect(collection, &self.pipelines, true);
359 self.rule_field_set.store(Arc::new(field_set));
360 }
361
362 pub fn process_batch<E: Event + Sync>(&mut self, events: &[&E]) -> Vec<ProcessResult> {
367 match &mut self.engine {
368 EngineVariant::DetectionOnly(engine) => engine.evaluate_batch(events),
369 EngineVariant::WithCorrelations(engine) => engine.process_batch(events),
370 }
371 }
372
373 pub fn stats(&self) -> EngineStats {
375 match &self.engine {
376 EngineVariant::DetectionOnly(engine) => EngineStats {
377 detection_rules: engine.rule_count(),
378 correlation_rules: 0,
379 state_entries: 0,
380 },
381 EngineVariant::WithCorrelations(engine) => EngineStats {
382 detection_rules: engine.detection_rule_count(),
383 correlation_rules: engine.correlation_rule_count(),
384 state_entries: engine.state_count(),
385 },
386 }
387 }
388
389 pub fn rules_path(&self) -> &Path {
391 &self.rules_path
392 }
393
394 pub fn pipelines(&self) -> &[Pipeline] {
396 &self.pipelines
397 }
398
399 pub fn corr_config(&self) -> &CorrelationConfig {
401 &self.corr_config
402 }
403
404 pub fn include_event(&self) -> bool {
406 self.include_event
407 }
408
409 pub fn export_state(&self) -> Option<CorrelationSnapshot> {
412 match &self.engine {
413 EngineVariant::DetectionOnly(_) => None,
414 EngineVariant::WithCorrelations(engine) => Some(engine.export_state()),
415 }
416 }
417
418 pub fn import_state(&mut self, snapshot: &CorrelationSnapshot) -> bool {
422 if let EngineVariant::WithCorrelations(engine) = &mut self.engine {
423 engine.import_state(snapshot.clone())
424 } else {
425 true
426 }
427 }
428}
429
430fn load_collection(path: &Path) -> Result<SigmaCollection, String> {
431 let collection = if path.is_dir() {
432 rsigma_parser::parse_sigma_directory(path)
433 .map_err(|e| format!("Error loading rules from {}: {e}", path.display()))?
434 } else {
435 rsigma_parser::parse_sigma_file(path)
436 .map_err(|e| format!("Error loading rule {}: {e}", path.display()))?
437 };
438
439 if !collection.errors.is_empty() {
440 tracing::warn!(
441 count = collection.errors.len(),
442 "Parse errors while loading rules"
443 );
444 for (i, err) in collection.errors.iter().take(3).enumerate() {
445 tracing::warn!(index = i + 1, error = %err, "Rule parse error detail");
446 }
447 }
448
449 Ok(collection)
450}
451
452fn reload_pipelines(paths: &[PathBuf]) -> Result<Vec<Pipeline>, String> {
462 let mut pipelines = Vec::with_capacity(paths.len());
463 for path in paths {
464 let pipeline = parse_pipeline_file(path)
465 .map_err(|e| format!("Error reloading pipeline {}: {e}", path.display()))?;
466 if !pipeline.sources.is_empty() {
467 crate::warn_pipeline_inline_sources(path, &pipeline.name);
468 }
469 pipelines.push(pipeline);
470 }
471 pipelines.sort_by_key(|p| p.priority);
472 Ok(pipelines)
473}
474
475async fn resolve_pipelines_async(
477 resolver: &Arc<dyn SourceResolver>,
478 pipelines: &[Pipeline],
479 allow_remote_include: bool,
480) -> Result<Vec<Pipeline>, String> {
481 let mut resolved_pipelines = Vec::with_capacity(pipelines.len());
482 for pipeline in pipelines {
483 if pipeline.is_dynamic() {
484 let resolved_data = sources::resolve_all(resolver.as_ref(), &pipeline.sources)
485 .await
486 .map_err(|e| {
487 format!(
488 "Failed to resolve dynamic pipeline '{}': {e}",
489 pipeline.name
490 )
491 })?;
492 let mut expanded = TemplateExpander::expand(pipeline, &resolved_data);
493 sources::include::expand_includes(&mut expanded, &resolved_data, allow_remote_include)?;
494 resolved_pipelines.push(expanded);
495 } else {
496 resolved_pipelines.push(pipeline.clone());
497 }
498 }
499 Ok(resolved_pipelines)
500}
501
#[cfg(test)]
mod tests {
    use super::*;
    use crate::pipeline_deprecation::reset_inline_sources_dedup_for_tests;

    /// Serializes tests that read or reset the shared inline-sources dedup set.
    ///
    /// A poisoned lock (left by a test that panicked while holding it) is
    /// recovered instead of propagated, so one failure does not cascade into
    /// every later test.
    fn serial_guard() -> std::sync::MutexGuard<'static, ()> {
        crate::pipeline_deprecation::DEDUP_TEST_LOCK
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    // Minimal detection-only rule: no correlations, so `load_rules` builds a
    // detection-only engine.
    const RULE_YAML: &str = r#"
title: TestRule
id: 11111111-1111-1111-1111-111111111111
status: experimental
logsource:
    product: test
detection:
    selection:
        EventID: 1
    condition: selection
"#;

    // Pipeline that still declares inline `sources`; reloading it from disk
    // should route through `warn_pipeline_inline_sources`.
    const PIPELINE_WITH_SOURCES: &str = r#"
name: legacy_pipeline_with_inline_sources
priority: 50
sources:
  - id: threat_feed
    type: file
    path: /tmp/does-not-matter.json
    format: json
transformations:
  - type: value_placeholders
"#;

    // Pipeline without `sources`; must not touch the dedup set.
    const PIPELINE_NO_SOURCES: &str = r#"
name: simple_pipeline
priority: 10
transformations:
  - id: rename
    type: field_name_mapping
    mapping:
      EventID: event.id
"#;

    /// Whether `path` (canonicalized when possible) is recorded in the dedup set.
    fn dedup_set_contains(path: &Path) -> bool {
        let canonical = path.canonicalize().unwrap_or_else(|_| path.to_path_buf());
        crate::pipeline_deprecation::tests_only_snapshot().contains(&canonical)
    }

    #[test]
    fn load_rules_surfaces_inline_sources_deprecation_through_runtime() {
        let _guard = serial_guard();
        reset_inline_sources_dedup_for_tests();

        let dir = tempfile::tempdir().unwrap();
        let rule_path = dir.path().join("rule.yml");
        std::fs::write(&rule_path, RULE_YAML).unwrap();

        let pipeline_path = dir.path().join("pipeline.yml");
        std::fs::write(&pipeline_path, PIPELINE_WITH_SOURCES).unwrap();
        let pipeline = parse_pipeline_file(&pipeline_path).unwrap();

        let mut engine = RuntimeEngine::new(
            rule_path,
            vec![pipeline],
            CorrelationConfig::default(),
            false,
        );
        // Setting pipeline paths makes `load_rules` re-read the file, which is
        // the code path that emits the deprecation warning.
        engine.set_pipeline_paths(vec![pipeline_path.clone()]);
        engine.load_rules().unwrap();

        assert!(
            dedup_set_contains(&pipeline_path),
            "RuntimeEngine::load_rules should route inline sources through \
             warn_pipeline_inline_sources so the daemon hot-reload path \
             covers the deprecation; the canonical pipeline path was not \
             recorded in the dedup set."
        );
    }

    #[test]
    fn load_rules_does_not_warn_when_pipeline_has_no_inline_sources() {
        let _guard = serial_guard();
        reset_inline_sources_dedup_for_tests();

        let dir = tempfile::tempdir().unwrap();
        let rule_path = dir.path().join("rule.yml");
        std::fs::write(&rule_path, RULE_YAML).unwrap();

        let pipeline_path = dir.path().join("clean.yml");
        std::fs::write(&pipeline_path, PIPELINE_NO_SOURCES).unwrap();
        let pipeline = parse_pipeline_file(&pipeline_path).unwrap();

        let mut engine = RuntimeEngine::new(
            rule_path,
            vec![pipeline],
            CorrelationConfig::default(),
            false,
        );
        engine.set_pipeline_paths(vec![pipeline_path.clone()]);
        engine.load_rules().unwrap();

        assert!(
            !dedup_set_contains(&pipeline_path),
            "a pipeline without inline sources must not register in the \
             deprecation dedup set."
        );
    }

    #[test]
    fn hot_reload_dedups_inline_sources_warning_for_same_pipeline_path() {
        let _guard = serial_guard();
        reset_inline_sources_dedup_for_tests();

        let dir = tempfile::tempdir().unwrap();
        let rule_path = dir.path().join("rule.yml");
        std::fs::write(&rule_path, RULE_YAML).unwrap();

        let pipeline_path = dir.path().join("pipeline.yml");
        std::fs::write(&pipeline_path, PIPELINE_WITH_SOURCES).unwrap();
        let pipeline = parse_pipeline_file(&pipeline_path).unwrap();

        let mut engine = RuntimeEngine::new(
            rule_path,
            vec![pipeline],
            CorrelationConfig::default(),
            false,
        );
        engine.set_pipeline_paths(vec![pipeline_path.clone()]);

        // First load records the pipeline path.
        engine.load_rules().unwrap();
        assert!(dedup_set_contains(&pipeline_path));

        // A second load of the same, unchanged path simulates a hot reload;
        // the dedup set must be identical before and after it.
        let canonical = pipeline_path.canonicalize().unwrap();
        let before = crate::pipeline_deprecation::tests_only_snapshot();
        engine.load_rules().unwrap();
        let after = crate::pipeline_deprecation::tests_only_snapshot();

        assert_eq!(
            before, after,
            "second load_rules should not change the dedup set",
        );
        assert!(after.contains(&canonical));
    }

    // Multi-threaded runtime: dynamic resolution in `load_rules` goes through
    // `tokio::task::block_in_place`, which tokio documents as panicking on a
    // `current_thread` runtime.
    #[tokio::test(flavor = "multi_thread")]
    async fn load_rules_fails_closed_when_dynamic_source_resolution_fails() {
        let dir = tempfile::tempdir().unwrap();
        let rule_path = dir.path().join("rule.yml");
        std::fs::write(&rule_path, RULE_YAML).unwrap();

        // Deliberately never created, so the `on_error: fail` source cannot
        // be resolved.
        let missing = dir.path().join("missing.json");
        let pipeline_yaml = format!(
            r#"
name: dynamic_missing
priority: 10
sources:
  - id: feed
    type: file
    path: {}
    format: json
    on_error: fail
transformations:
  - type: value_placeholders
"#,
            missing.display(),
        );
        let pipeline_path = dir.path().join("pipeline.yml");
        std::fs::write(&pipeline_path, pipeline_yaml).unwrap();
        let pipeline = parse_pipeline_file(&pipeline_path).unwrap();
        assert!(
            pipeline.is_dynamic(),
            "fixture should produce a dynamic pipeline"
        );

        let mut engine = RuntimeEngine::new(
            rule_path,
            vec![pipeline],
            CorrelationConfig::default(),
            false,
        );
        // Installing a resolver is what triggers dynamic resolution inside
        // `load_rules`.
        engine.set_source_resolver(Arc::new(sources::DefaultSourceResolver::new()));

        let err = engine
            .load_rules()
            .expect_err("missing source must cause load_rules to fail closed");
        assert!(
            err.contains("Dynamic source resolution failed"),
            "error should explain the fail-closed path; got: {err}"
        );
    }
}