1use std::path::{Path, PathBuf};
2use std::sync::Arc;
3
4use arc_swap::ArcSwap;
5use rsigma_eval::event::Event;
6use rsigma_eval::{
7 CorrelationConfig, CorrelationEngine, CorrelationSnapshot, CorrelationStateSnapshot, Engine,
8 LogSourceExtractor, MatchDetailLevel, Pipeline, ProcessResult, RoutingPlan, RuleFieldSet,
9 SchemaClassifier, SchemaRouter, parse_pipeline_file,
10};
11use rsigma_parser::SigmaCollection;
12
13use crate::sources::{self, SourceResolver, TemplateExpander};
14
15pub struct RuntimeEngine {
18 engine: EngineVariant,
19 pipelines: Vec<Pipeline>,
20 pipeline_paths: Vec<PathBuf>,
21 rules_path: std::path::PathBuf,
22 corr_config: CorrelationConfig,
23 include_event: bool,
24 source_resolver: Option<Arc<dyn SourceResolver>>,
25 allow_remote_include: bool,
26 bloom_prefilter: bool,
29 bloom_max_bytes: Option<usize>,
32 match_detail: MatchDetailLevel,
35 #[cfg(feature = "daachorse-index")]
39 cross_rule_ac: bool,
40 rule_field_set: Arc<ArcSwap<RuleFieldSet>>,
45 routing: Option<RoutingSpec>,
50 logsource_extractor: Option<LogSourceExtractor>,
53}
54
55#[derive(Clone)]
59pub struct RoutingSpec {
60 pub classifier: SchemaClassifier,
61 pub plan: RoutingPlan,
62 pub pipeline_sets: Vec<Vec<Pipeline>>,
63}
64
65enum EngineVariant {
66 DetectionOnly(Box<Engine>),
67 WithCorrelations(Box<CorrelationEngine>),
68 Routed(Box<SchemaRouter>),
69}
70
/// Rule and state counters reported by the active engine.
///
/// All fields are plain counters, so the type is cheap to copy, can be
/// compared for equality, and defaults to all zeros.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct EngineStats {
    /// Number of detection rules loaded.
    pub detection_rules: usize,
    /// Number of correlation rules loaded (zero for a detection-only engine).
    pub correlation_rules: usize,
    /// Number of correlation state entries (zero for a detection-only engine).
    pub state_entries: usize,
}
78
79impl RuntimeEngine {
80 pub fn new(
81 rules_path: std::path::PathBuf,
82 pipelines: Vec<Pipeline>,
83 corr_config: CorrelationConfig,
84 include_event: bool,
85 ) -> Self {
86 RuntimeEngine {
87 engine: EngineVariant::DetectionOnly(Box::new(Engine::new())),
88 pipelines,
89 pipeline_paths: Vec::new(),
90 rules_path,
91 corr_config,
92 include_event,
93 source_resolver: None,
94 allow_remote_include: false,
95 bloom_prefilter: false,
96 bloom_max_bytes: None,
97 match_detail: MatchDetailLevel::Off,
98 #[cfg(feature = "daachorse-index")]
99 cross_rule_ac: false,
100 rule_field_set: Arc::new(ArcSwap::from_pointee(RuleFieldSet::default())),
101 routing: None,
102 logsource_extractor: None,
103 }
104 }
105
106 pub fn set_routing(&mut self, spec: Option<RoutingSpec>) {
110 self.routing = spec;
111 }
112
113 pub fn has_routing(&self) -> bool {
115 self.routing.is_some()
116 }
117
118 pub fn routing(&self) -> Option<RoutingSpec> {
121 self.routing.clone()
122 }
123
124 pub fn set_logsource_extractor(&mut self, extractor: Option<LogSourceExtractor>) {
128 self.logsource_extractor = extractor;
129 }
130
131 pub fn logsource_extractor(&self) -> Option<LogSourceExtractor> {
134 self.logsource_extractor.clone()
135 }
136
137 pub fn logsource_pruned_total(&self) -> u64 {
140 match &self.engine {
141 EngineVariant::DetectionOnly(engine) => engine.logsource_pruned_total(),
142 EngineVariant::WithCorrelations(engine) => engine.logsource_pruned_total(),
143 EngineVariant::Routed(router) => router.logsource_pruned_total(),
144 }
145 }
146
147 pub fn logsource_absent_total(&self) -> u64 {
150 match &self.engine {
151 EngineVariant::DetectionOnly(engine) => engine.logsource_absent_total(),
152 EngineVariant::WithCorrelations(engine) => engine.logsource_absent_total(),
153 EngineVariant::Routed(router) => router.logsource_absent_total(),
154 }
155 }
156
157 pub fn rule_field_set(&self) -> Arc<RuleFieldSet> {
165 self.rule_field_set.load_full()
166 }
167
168 pub fn set_bloom_prefilter(&mut self, enabled: bool) {
172 self.bloom_prefilter = enabled;
173 }
174
175 pub fn bloom_prefilter(&self) -> bool {
180 self.bloom_prefilter
181 }
182
183 pub fn set_bloom_max_bytes(&mut self, max_bytes: usize) {
186 self.bloom_max_bytes = Some(max_bytes);
187 }
188
189 pub fn bloom_max_bytes(&self) -> Option<usize> {
191 self.bloom_max_bytes
192 }
193
194 pub fn set_match_detail(&mut self, level: MatchDetailLevel) {
198 self.match_detail = level;
199 }
200
201 pub fn match_detail(&self) -> MatchDetailLevel {
204 self.match_detail
205 }
206
207 #[cfg(feature = "daachorse-index")]
214 pub fn set_cross_rule_ac(&mut self, enabled: bool) {
215 self.cross_rule_ac = enabled;
216 }
217
218 #[cfg(feature = "daachorse-index")]
220 pub fn cross_rule_ac(&self) -> bool {
221 self.cross_rule_ac
222 }
223
224 pub fn set_source_resolver(&mut self, resolver: Arc<dyn SourceResolver>) {
229 self.source_resolver = Some(resolver);
230 }
231
232 pub fn source_resolver(&self) -> Option<&Arc<dyn SourceResolver>> {
234 self.source_resolver.as_ref()
235 }
236
237 pub fn set_allow_remote_include(&mut self, allow: bool) {
239 self.allow_remote_include = allow;
240 }
241
242 pub fn allow_remote_include(&self) -> bool {
244 self.allow_remote_include
245 }
246
247 pub fn set_pipeline_paths(&mut self, paths: Vec<PathBuf>) {
253 self.pipeline_paths = paths;
254 }
255
256 pub fn pipeline_paths(&self) -> &[PathBuf] {
258 &self.pipeline_paths
259 }
260
261 pub async fn resolve_dynamic_pipelines(&mut self) -> Result<(), String> {
267 let Some(resolver) = &self.source_resolver else {
268 return Ok(());
269 };
270
271 let mut resolved_pipelines = Vec::with_capacity(self.pipelines.len());
272 for pipeline in &self.pipelines {
273 if pipeline.is_dynamic() {
274 match sources::resolve_all(resolver.as_ref(), &pipeline.sources).await {
275 Ok(resolved_data) => {
276 let mut expanded = TemplateExpander::expand(pipeline, &resolved_data);
277 sources::include::expand_includes(
279 &mut expanded,
280 &resolved_data,
281 self.allow_remote_include,
282 )?;
283 resolved_pipelines.push(expanded);
284 }
285 Err(e) => {
286 return Err(format!(
287 "Failed to resolve dynamic pipeline '{}': {e}",
288 pipeline.name
289 ));
290 }
291 }
292 } else {
293 resolved_pipelines.push(pipeline.clone());
294 }
295 }
296 self.pipelines = resolved_pipelines;
297 Ok(())
298 }
299
300 pub fn load_rules(&mut self) -> Result<EngineStats, String> {
313 let load_span = tracing::info_span!("load_rules", rules_path = %self.rules_path.display());
314 let _enter = load_span.enter();
315 let load_start = std::time::Instant::now();
316
317 if !self.pipeline_paths.is_empty() {
318 self.pipelines = reload_pipelines(&self.pipeline_paths)?;
319 }
320
321 if self.source_resolver.is_some() && self.pipelines.iter().any(|p| p.is_dynamic()) {
330 let handle = tokio::runtime::Handle::try_current().map_err(|_| {
331 "Dynamic pipelines require a tokio runtime; refusing to load rules with \
332 unresolved sources"
333 .to_string()
334 })?;
335 let pipelines = std::mem::take(&mut self.pipelines);
336 let resolver = self.source_resolver.clone().unwrap();
337 let allow_remote = self.allow_remote_include;
338 let resolved = tokio::task::block_in_place(|| {
339 handle.block_on(async {
340 resolve_pipelines_async(&resolver, &pipelines, allow_remote).await
341 })
342 });
343 match resolved {
344 Ok(p) => self.pipelines = p,
345 Err(e) => {
346 self.pipelines = pipelines;
350 return Err(format!("Dynamic source resolution failed: {e}"));
351 }
352 }
353 }
354
355 let previous_state = self.export_state();
356 let collection = load_collection(&self.rules_path)?;
357 let has_correlations = !collection.correlations.is_empty();
358
359 if let Some(mut spec) = self.routing.clone() {
360 if let Some(resolver) = self.source_resolver.clone() {
363 let needs_resolve = spec
364 .pipeline_sets
365 .iter()
366 .any(|set| set.iter().any(|p| p.is_dynamic()));
367 if needs_resolve {
368 let handle = tokio::runtime::Handle::try_current().map_err(|_| {
369 "Dynamic pipelines require a tokio runtime; refusing to load rules with \
370 unresolved sources"
371 .to_string()
372 })?;
373 let allow_remote = self.allow_remote_include;
374 let mut resolved_sets = Vec::with_capacity(spec.pipeline_sets.len());
375 for set in spec.pipeline_sets {
376 let resolved = tokio::task::block_in_place(|| {
377 handle.block_on(async {
378 resolve_pipelines_async(&resolver, &set, allow_remote).await
379 })
380 });
381 match resolved {
382 Ok(p) => resolved_sets.push(p),
383 Err(e) => {
384 return Err(format!(
385 "Dynamic source resolution failed (schema routing): {e}"
386 ));
387 }
388 }
389 }
390 spec.pipeline_sets = resolved_sets;
391 }
392 }
393
394 let mut router = SchemaRouter::build(
395 &collection,
396 spec.classifier,
397 spec.plan,
398 spec.pipeline_sets,
399 self.corr_config.clone(),
400 self.include_event,
401 self.match_detail,
402 self.logsource_extractor.clone(),
403 )
404 .map_err(|e| format!("Error building schema router: {e}"))?;
405
406 if let Some(snapshot) = previous_state {
407 router.import_state(snapshot);
408 }
409
410 let stats = EngineStats {
411 detection_rules: router.detection_rule_count(),
412 correlation_rules: router.correlation_rule_count(),
413 state_entries: router.state_count(),
414 };
415 self.engine = EngineVariant::Routed(Box::new(router));
416 self.refresh_rule_field_set(&collection);
417 tracing::debug!(
418 detection_rules = stats.detection_rules,
419 correlation_rules = stats.correlation_rules,
420 duration_ms = load_start.elapsed().as_millis() as u64,
421 "Rule load complete (schema routing)",
422 );
423 return Ok(stats);
424 }
425
426 if has_correlations {
427 let mut engine = CorrelationEngine::new(self.corr_config.clone());
428 engine.set_include_event(self.include_event);
429 engine.set_match_detail(self.match_detail);
430 if let Some(budget) = self.bloom_max_bytes {
431 engine.set_bloom_max_bytes(budget);
432 }
433 engine.set_bloom_prefilter(self.bloom_prefilter);
434 #[cfg(feature = "daachorse-index")]
435 engine.set_cross_rule_ac(self.cross_rule_ac);
436 engine.set_logsource_extractor(self.logsource_extractor.clone());
437 for p in &self.pipelines {
438 engine.add_pipeline(p.clone());
439 }
440 engine
441 .add_collection(&collection)
442 .map_err(|e| format!("Error compiling rules: {e}"))?;
443
444 if let Some(snapshot) = previous_state {
445 engine.import_state(snapshot);
446 }
447
448 let stats = EngineStats {
449 detection_rules: engine.detection_rule_count(),
450 correlation_rules: engine.correlation_rule_count(),
451 state_entries: engine.state_count(),
452 };
453 self.engine = EngineVariant::WithCorrelations(Box::new(engine));
454 self.refresh_rule_field_set(&collection);
455 tracing::debug!(
456 detection_rules = stats.detection_rules,
457 correlation_rules = stats.correlation_rules,
458 duration_ms = load_start.elapsed().as_millis() as u64,
459 "Rule load complete",
460 );
461 Ok(stats)
462 } else {
463 let mut engine = Engine::new();
464 engine.set_include_event(self.include_event);
465 engine.set_match_detail(self.match_detail);
466 if let Some(budget) = self.bloom_max_bytes {
467 engine.set_bloom_max_bytes(budget);
468 }
469 engine.set_bloom_prefilter(self.bloom_prefilter);
470 #[cfg(feature = "daachorse-index")]
471 engine.set_cross_rule_ac(self.cross_rule_ac);
472 engine.set_logsource_extractor(self.logsource_extractor.clone());
473 for p in &self.pipelines {
474 engine.add_pipeline(p.clone());
475 }
476 engine
477 .add_collection(&collection)
478 .map_err(|e| format!("Error compiling rules: {e}"))?;
479
480 let stats = EngineStats {
481 detection_rules: engine.rule_count(),
482 correlation_rules: 0,
483 state_entries: 0,
484 };
485 self.engine = EngineVariant::DetectionOnly(Box::new(engine));
486 self.refresh_rule_field_set(&collection);
487 tracing::debug!(
488 detection_rules = stats.detection_rules,
489 correlation_rules = stats.correlation_rules,
490 duration_ms = load_start.elapsed().as_millis() as u64,
491 "Rule load complete",
492 );
493 Ok(stats)
494 }
495 }
496
497 fn refresh_rule_field_set(&self, collection: &SigmaCollection) {
500 let field_set = RuleFieldSet::collect(collection, &self.pipelines, true);
501 self.rule_field_set.store(Arc::new(field_set));
502 }
503
504 pub fn process_batch<E: Event + Sync>(&mut self, events: &[&E]) -> Vec<ProcessResult> {
509 match &mut self.engine {
510 EngineVariant::DetectionOnly(engine) => engine.evaluate_batch(events),
511 EngineVariant::WithCorrelations(engine) => engine.process_batch(events),
512 EngineVariant::Routed(router) => router.process_batch(events),
513 }
514 }
515
516 pub fn stats(&self) -> EngineStats {
518 match &self.engine {
519 EngineVariant::DetectionOnly(engine) => EngineStats {
520 detection_rules: engine.rule_count(),
521 correlation_rules: 0,
522 state_entries: 0,
523 },
524 EngineVariant::WithCorrelations(engine) => EngineStats {
525 detection_rules: engine.detection_rule_count(),
526 correlation_rules: engine.correlation_rule_count(),
527 state_entries: engine.state_count(),
528 },
529 EngineVariant::Routed(router) => EngineStats {
530 detection_rules: router.detection_rule_count(),
531 correlation_rules: router.correlation_rule_count(),
532 state_entries: router.state_count(),
533 },
534 }
535 }
536
537 pub fn rules_path(&self) -> &Path {
539 &self.rules_path
540 }
541
542 pub fn pipelines(&self) -> &[Pipeline] {
544 &self.pipelines
545 }
546
547 pub fn corr_config(&self) -> &CorrelationConfig {
549 &self.corr_config
550 }
551
552 pub fn include_event(&self) -> bool {
554 self.include_event
555 }
556
557 pub fn export_state(&self) -> Option<CorrelationSnapshot> {
560 match &self.engine {
561 EngineVariant::DetectionOnly(_) => None,
562 EngineVariant::WithCorrelations(engine) => Some(engine.export_state()),
563 EngineVariant::Routed(router) => router.export_state(),
564 }
565 }
566
567 pub fn introspect_correlations(
571 &self,
572 id: Option<&str>,
573 group: Option<&str>,
574 ) -> Option<CorrelationStateSnapshot> {
575 match &self.engine {
576 EngineVariant::DetectionOnly(_) => None,
577 EngineVariant::WithCorrelations(engine) => Some(engine.introspect_filtered(id, group)),
578 EngineVariant::Routed(router) => router.correlation_introspect(id, group),
579 }
580 }
581
582 pub fn import_state(&mut self, snapshot: &CorrelationSnapshot) -> bool {
586 match &mut self.engine {
587 EngineVariant::WithCorrelations(engine) => engine.import_state(snapshot.clone()),
588 EngineVariant::Routed(router) => router.import_state(snapshot.clone()),
589 EngineVariant::DetectionOnly(_) => true,
590 }
591 }
592}
593
594fn load_collection(path: &Path) -> Result<SigmaCollection, String> {
595 let collection = if path.is_dir() {
596 rsigma_parser::parse_sigma_directory(path)
597 .map_err(|e| format!("Error loading rules from {}: {e}", path.display()))?
598 } else {
599 rsigma_parser::parse_sigma_file(path)
600 .map_err(|e| format!("Error loading rule {}: {e}", path.display()))?
601 };
602
603 if !collection.errors.is_empty() {
604 tracing::warn!(
605 count = collection.errors.len(),
606 "Parse errors while loading rules"
607 );
608 for (i, err) in collection.errors.iter().take(3).enumerate() {
609 tracing::warn!(index = i + 1, error = %err, "Rule parse error detail");
610 }
611 }
612
613 Ok(collection)
614}
615
616fn reload_pipelines(paths: &[PathBuf]) -> Result<Vec<Pipeline>, String> {
626 let mut pipelines = Vec::with_capacity(paths.len());
627 for path in paths {
628 let pipeline = parse_pipeline_file(path)
629 .map_err(|e| format!("Error reloading pipeline {}: {e}", path.display()))?;
630 if !pipeline.sources.is_empty() {
631 crate::warn_pipeline_inline_sources(path, &pipeline.name);
632 }
633 pipelines.push(pipeline);
634 }
635 pipelines.sort_by_key(|p| p.priority);
636 Ok(pipelines)
637}
638
639async fn resolve_pipelines_async(
641 resolver: &Arc<dyn SourceResolver>,
642 pipelines: &[Pipeline],
643 allow_remote_include: bool,
644) -> Result<Vec<Pipeline>, String> {
645 let mut resolved_pipelines = Vec::with_capacity(pipelines.len());
646 for pipeline in pipelines {
647 if pipeline.is_dynamic() {
648 let resolved_data = sources::resolve_all(resolver.as_ref(), &pipeline.sources)
649 .await
650 .map_err(|e| {
651 format!(
652 "Failed to resolve dynamic pipeline '{}': {e}",
653 pipeline.name
654 )
655 })?;
656 let mut expanded = TemplateExpander::expand(pipeline, &resolved_data);
657 sources::include::expand_includes(&mut expanded, &resolved_data, allow_remote_include)?;
658 resolved_pipelines.push(expanded);
659 } else {
660 resolved_pipelines.push(pipeline.clone());
661 }
662 }
663 Ok(resolved_pipelines)
664}
665
666#[cfg(test)]
667mod tests {
668 use super::*;
669 use crate::pipeline_deprecation::reset_inline_sources_dedup_for_tests;
670
671 fn serial_guard() -> std::sync::MutexGuard<'static, ()> {
678 crate::pipeline_deprecation::DEDUP_TEST_LOCK
679 .lock()
680 .unwrap_or_else(|poisoned| poisoned.into_inner())
681 }
682
683 const RULE_YAML: &str = r#"
684title: TestRule
685id: 11111111-1111-1111-1111-111111111111
686status: experimental
687logsource:
688 product: test
689detection:
690 selection:
691 EventID: 1
692 condition: selection
693"#;
694
695 const PIPELINE_WITH_SOURCES: &str = r#"
696name: legacy_pipeline_with_inline_sources
697priority: 50
698sources:
699 - id: threat_feed
700 type: file
701 path: /tmp/does-not-matter.json
702 format: json
703transformations:
704 - type: value_placeholders
705"#;
706
707 const PIPELINE_NO_SOURCES: &str = r#"
708name: simple_pipeline
709priority: 10
710transformations:
711 - id: rename
712 type: field_name_mapping
713 mapping:
714 EventID: event.id
715"#;
716
717 fn dedup_set_contains(path: &Path) -> bool {
718 let canonical = path.canonicalize().unwrap_or_else(|_| path.to_path_buf());
719 crate::pipeline_deprecation::tests_only_snapshot().contains(&canonical)
720 }
721
722 #[test]
723 fn load_rules_surfaces_inline_sources_deprecation_through_runtime() {
724 let _guard = serial_guard();
725 reset_inline_sources_dedup_for_tests();
726
727 let dir = tempfile::tempdir().unwrap();
728 let rule_path = dir.path().join("rule.yml");
729 std::fs::write(&rule_path, RULE_YAML).unwrap();
730
731 let pipeline_path = dir.path().join("pipeline.yml");
732 std::fs::write(&pipeline_path, PIPELINE_WITH_SOURCES).unwrap();
733 let pipeline = parse_pipeline_file(&pipeline_path).unwrap();
734
735 let mut engine = RuntimeEngine::new(
736 rule_path,
737 vec![pipeline],
738 CorrelationConfig::default(),
739 false,
740 );
741 engine.set_pipeline_paths(vec![pipeline_path.clone()]);
742 engine.load_rules().unwrap();
743
744 assert!(
745 dedup_set_contains(&pipeline_path),
746 "RuntimeEngine::load_rules should route inline sources through \
747 warn_pipeline_inline_sources so the daemon hot-reload path \
748 covers the deprecation; the canonical pipeline path was not \
749 recorded in the dedup set."
750 );
751 }
752
753 #[test]
754 fn load_rules_does_not_warn_when_pipeline_has_no_inline_sources() {
755 let _guard = serial_guard();
756 reset_inline_sources_dedup_for_tests();
757
758 let dir = tempfile::tempdir().unwrap();
759 let rule_path = dir.path().join("rule.yml");
760 std::fs::write(&rule_path, RULE_YAML).unwrap();
761
762 let pipeline_path = dir.path().join("clean.yml");
763 std::fs::write(&pipeline_path, PIPELINE_NO_SOURCES).unwrap();
764 let pipeline = parse_pipeline_file(&pipeline_path).unwrap();
765
766 let mut engine = RuntimeEngine::new(
767 rule_path,
768 vec![pipeline],
769 CorrelationConfig::default(),
770 false,
771 );
772 engine.set_pipeline_paths(vec![pipeline_path.clone()]);
773 engine.load_rules().unwrap();
774
775 assert!(
776 !dedup_set_contains(&pipeline_path),
777 "a pipeline without inline sources must not register in the \
778 deprecation dedup set."
779 );
780 }
781
782 #[test]
783 fn hot_reload_dedups_inline_sources_warning_for_same_pipeline_path() {
784 let _guard = serial_guard();
785 reset_inline_sources_dedup_for_tests();
786
787 let dir = tempfile::tempdir().unwrap();
788 let rule_path = dir.path().join("rule.yml");
789 std::fs::write(&rule_path, RULE_YAML).unwrap();
790
791 let pipeline_path = dir.path().join("pipeline.yml");
792 std::fs::write(&pipeline_path, PIPELINE_WITH_SOURCES).unwrap();
793 let pipeline = parse_pipeline_file(&pipeline_path).unwrap();
794
795 let mut engine = RuntimeEngine::new(
796 rule_path,
797 vec![pipeline],
798 CorrelationConfig::default(),
799 false,
800 );
801 engine.set_pipeline_paths(vec![pipeline_path.clone()]);
802
803 engine.load_rules().unwrap();
805 assert!(dedup_set_contains(&pipeline_path));
806
807 let canonical = pipeline_path.canonicalize().unwrap();
812 let before = crate::pipeline_deprecation::tests_only_snapshot();
813 engine.load_rules().unwrap();
814 let after = crate::pipeline_deprecation::tests_only_snapshot();
815
816 assert_eq!(
817 before, after,
818 "second load_rules should not change the dedup set",
819 );
820 assert!(after.contains(&canonical));
821 }
822
823 #[tokio::test(flavor = "multi_thread")]
824 async fn load_rules_fails_closed_when_dynamic_source_resolution_fails() {
825 let dir = tempfile::tempdir().unwrap();
832 let rule_path = dir.path().join("rule.yml");
833 std::fs::write(&rule_path, RULE_YAML).unwrap();
834
835 let missing = dir.path().join("missing.json");
838 let pipeline_yaml = format!(
839 r#"
840name: dynamic_missing
841priority: 10
842sources:
843 - id: feed
844 type: file
845 path: {}
846 format: json
847 on_error: fail
848transformations:
849 - type: value_placeholders
850"#,
851 missing.display(),
852 );
853 let pipeline_path = dir.path().join("pipeline.yml");
854 std::fs::write(&pipeline_path, pipeline_yaml).unwrap();
855 let pipeline = parse_pipeline_file(&pipeline_path).unwrap();
856 assert!(
857 pipeline.is_dynamic(),
858 "fixture should produce a dynamic pipeline"
859 );
860
861 let mut engine = RuntimeEngine::new(
862 rule_path,
863 vec![pipeline],
864 CorrelationConfig::default(),
865 false,
866 );
867 engine.set_source_resolver(Arc::new(sources::DefaultSourceResolver::new()));
868
869 let err = engine
870 .load_rules()
871 .expect_err("missing source must cause load_rules to fail closed");
872 assert!(
873 err.contains("Dynamic source resolution failed"),
874 "error should explain the fail-closed path; got: {err}"
875 );
876 }
877}