Skip to main content

rsigma_runtime/
engine.rs

1use std::path::{Path, PathBuf};
2use std::sync::Arc;
3
4use arc_swap::ArcSwap;
5use rsigma_eval::event::Event;
6use rsigma_eval::{
7    CorrelationConfig, CorrelationEngine, CorrelationSnapshot, Engine, Pipeline, ProcessResult,
8    RuleFieldSet, parse_pipeline_file,
9};
10use rsigma_parser::SigmaCollection;
11
12use crate::sources::{self, SourceResolver, TemplateExpander};
13
14/// Wraps a CorrelationEngine (or a plain Engine) and provides the interface
15/// the runtime needs: process events, reload rules, and query state.
16pub struct RuntimeEngine {
17    engine: EngineVariant,
18    pipelines: Vec<Pipeline>,
19    pipeline_paths: Vec<PathBuf>,
20    rules_path: std::path::PathBuf,
21    corr_config: CorrelationConfig,
22    include_event: bool,
23    source_resolver: Option<Arc<dyn SourceResolver>>,
24    allow_remote_include: bool,
25    /// Opt-in bloom-filter pre-filtering of positive substring matchers.
26    /// Forwarded to the inner detection engine on every rule reload.
27    bloom_prefilter: bool,
28    /// Optional override for the bloom memory budget in bytes. `None`
29    /// means use the eval crate default.
30    bloom_max_bytes: Option<usize>,
31    /// Opt-in cross-rule Aho-Corasick pre-filter. Forwarded to the inner
32    /// detection engine on every rule reload. Available behind the
33    /// `daachorse-index` Cargo feature.
34    #[cfg(feature = "daachorse-index")]
35    cross_rule_ac: bool,
36    /// Post-pipeline rule field set, refreshed at the end of every
37    /// `load_rules()`. Wrapped in `ArcSwap` so readers (e.g. the daemon's
38    /// `/api/v1/fields/*` endpoints) can snapshot a stable view without
39    /// blocking the hot path during a reload.
40    rule_field_set: Arc<ArcSwap<RuleFieldSet>>,
41}
42
/// The concrete engine currently backing a `RuntimeEngine`.
///
/// `RuntimeEngine::new` starts with `DetectionOnly`; `load_rules()` picks
/// the variant based on whether the loaded collection contains any
/// correlations.
enum EngineVariant {
    /// Plain detection engine, used when no correlation rules are loaded.
    DetectionOnly(Box<Engine>),
    /// Correlation-capable engine, used when at least one correlation
    /// rule is loaded. Only this variant has state to export or import.
    WithCorrelations(Box<CorrelationEngine>),
}
47
/// Summary statistics about the loaded engine state.
///
/// Returned by `RuntimeEngine::load_rules()` and `RuntimeEngine::stats()`.
#[derive(Debug, Clone, Copy)]
pub struct EngineStats {
    /// Number of detection rules reported by the active engine.
    pub detection_rules: usize,
    /// Number of correlation rules; always `0` for a detection-only engine.
    pub correlation_rules: usize,
    /// Number of correlation state entries; always `0` for a
    /// detection-only engine.
    pub state_entries: usize,
}
55
56impl RuntimeEngine {
57    pub fn new(
58        rules_path: std::path::PathBuf,
59        pipelines: Vec<Pipeline>,
60        corr_config: CorrelationConfig,
61        include_event: bool,
62    ) -> Self {
63        RuntimeEngine {
64            engine: EngineVariant::DetectionOnly(Box::new(Engine::new())),
65            pipelines,
66            pipeline_paths: Vec::new(),
67            rules_path,
68            corr_config,
69            include_event,
70            source_resolver: None,
71            allow_remote_include: false,
72            bloom_prefilter: false,
73            bloom_max_bytes: None,
74            #[cfg(feature = "daachorse-index")]
75            cross_rule_ac: false,
76            rule_field_set: Arc::new(ArcSwap::from_pointee(RuleFieldSet::default())),
77        }
78    }
79
    /// Return an immutable snapshot of the post-pipeline rule field set.
    ///
    /// Cheap to call: returns a refcounted handle that stays valid even if
    /// `load_rules()` runs concurrently. The daemon's field-observability
    /// endpoints use this to compute the intersection between observed
    /// event keys and rule-referenced fields without coordinating with the
    /// engine lock.
    ///
    /// The returned value reflects the most recent successful
    /// `load_rules()`; before the first load it is the default field set
    /// installed by `new()`.
    pub fn rule_field_set(&self) -> Arc<RuleFieldSet> {
        self.rule_field_set.load_full()
    }
90
    /// Enable or disable bloom-filter pre-filtering on the inner detection
    /// engine. Off by default. Applies on the next `load_rules()`; pre-load
    /// callers should set this before calling `load_rules()`.
    ///
    /// This only records the flag: the currently active engine is not
    /// touched until it is rebuilt.
    pub fn set_bloom_prefilter(&mut self, enabled: bool) {
        self.bloom_prefilter = enabled;
    }
97
    /// Return the current bloom pre-filter setting.
    ///
    /// Used by hot-reload to carry tuning across a `RuntimeEngine` swap so
    /// a daemon-startup `set_bloom_prefilter(true)` does not silently
    /// revert on the first reload.
    pub fn bloom_prefilter(&self) -> bool {
        self.bloom_prefilter
    }
105
    /// Override the bloom memory budget (in bytes) on the inner detection
    /// engine. Applies on the next `load_rules()`.
    ///
    /// Once set, the override stays in place; this setter offers no way to
    /// go back to the eval crate default.
    pub fn set_bloom_max_bytes(&mut self, max_bytes: usize) {
        self.bloom_max_bytes = Some(max_bytes);
    }
111
    /// Return the configured bloom memory budget in bytes, or `None` if no
    /// override was set via `set_bloom_max_bytes()`.
    pub fn bloom_max_bytes(&self) -> Option<usize> {
        self.bloom_max_bytes
    }
116
    /// Enable or disable the cross-rule Aho-Corasick pre-filter on the
    /// inner detection engine. Off by default; the optimization helps only
    /// on substring-heavy rule sets > ~5K rules. Applies on the next
    /// `load_rules()`.
    ///
    /// This only records the flag; the active engine is left as-is until
    /// it is rebuilt.
    ///
    /// Available behind the `daachorse-index` Cargo feature.
    #[cfg(feature = "daachorse-index")]
    pub fn set_cross_rule_ac(&mut self, enabled: bool) {
        self.cross_rule_ac = enabled;
    }
127
    /// Return the current cross-rule Aho-Corasick setting.
    ///
    /// Available behind the `daachorse-index` Cargo feature.
    #[cfg(feature = "daachorse-index")]
    pub fn cross_rule_ac(&self) -> bool {
        self.cross_rule_ac
    }
133
    /// Set a source resolver for dynamic pipeline sources.
    ///
    /// When set, `load_rules()` resolves dynamic sources and expands
    /// `${source.*}` templates before compiling rules. Any previously
    /// configured resolver is replaced.
    pub fn set_source_resolver(&mut self, resolver: Arc<dyn SourceResolver>) {
        self.source_resolver = Some(resolver);
    }
141
    /// Get the source resolver, if one is configured.
    ///
    /// Returns a borrow of the shared handle; clone the `Arc` to keep it
    /// beyond the lifetime of `self`.
    pub fn source_resolver(&self) -> Option<&Arc<dyn SourceResolver>> {
        self.source_resolver.as_ref()
    }
146
    /// Allow `include` directives to reference HTTP/NATS sources.
    ///
    /// Off by default. The flag is passed to include expansion the next
    /// time dynamic pipelines are resolved.
    pub fn set_allow_remote_include(&mut self, allow: bool) {
        self.allow_remote_include = allow;
    }
151
    /// Whether remote includes are allowed (see
    /// `set_allow_remote_include()`).
    pub fn allow_remote_include(&self) -> bool {
        self.allow_remote_include
    }
156
    /// Set the pipeline file paths used for hot-reload.
    ///
    /// When paths are set, `load_rules()` re-reads pipeline YAML from disk
    /// before rebuilding the engine. This enables pipeline hot-reload
    /// alongside rule hot-reload.
    ///
    /// Passing an empty vector turns disk re-reading off again, so the
    /// in-memory pipelines are reused on the next load.
    pub fn set_pipeline_paths(&mut self, paths: Vec<PathBuf>) {
        self.pipeline_paths = paths;
    }
165
    /// Return the pipeline file paths (used by the daemon to set up
    /// watchers). Empty when pipeline hot-reload is not configured.
    pub fn pipeline_paths(&self) -> &[PathBuf] {
        &self.pipeline_paths
    }
170
171    /// Resolve dynamic sources in all pipelines and expand templates.
172    ///
173    /// This is the async entry point for source resolution. Call this before
174    /// `load_rules()` when you have an async context available, or let
175    /// `load_rules()` handle it synchronously via `tokio::runtime::Handle`.
176    pub async fn resolve_dynamic_pipelines(&mut self) -> Result<(), String> {
177        let Some(resolver) = &self.source_resolver else {
178            return Ok(());
179        };
180
181        let mut resolved_pipelines = Vec::with_capacity(self.pipelines.len());
182        for pipeline in &self.pipelines {
183            if pipeline.is_dynamic() {
184                match sources::resolve_all(resolver.as_ref(), &pipeline.sources).await {
185                    Ok(resolved_data) => {
186                        let mut expanded = TemplateExpander::expand(pipeline, &resolved_data);
187                        // Expand include directives
188                        sources::include::expand_includes(
189                            &mut expanded,
190                            &resolved_data,
191                            self.allow_remote_include,
192                        )?;
193                        resolved_pipelines.push(expanded);
194                    }
195                    Err(e) => {
196                        return Err(format!(
197                            "Failed to resolve dynamic pipeline '{}': {e}",
198                            pipeline.name
199                        ));
200                    }
201                }
202            } else {
203                resolved_pipelines.push(pipeline.clone());
204            }
205        }
206        self.pipelines = resolved_pipelines;
207        Ok(())
208    }
209
210    /// Load (or reload) rules from the configured path.
211    ///
212    /// On reload, correlation state is exported before replacing the engine
213    /// and re-imported after, so in-flight windows and suppression state
214    /// survive rule changes (entries for removed correlations are dropped).
215    ///
216    /// If pipeline paths are set (via [`set_pipeline_paths`](Self::set_pipeline_paths)),
217    /// pipelines are re-read from disk before rebuilding the engine. If any
218    /// pipeline file fails to parse, the entire reload is aborted and the
219    /// old engine remains active.
220    ///
221    /// Dynamic pipeline sources are resolved if a source resolver is configured.
222    pub fn load_rules(&mut self) -> Result<EngineStats, String> {
223        let load_span = tracing::info_span!("load_rules", rules_path = %self.rules_path.display());
224        let _enter = load_span.enter();
225        let load_start = std::time::Instant::now();
226
227        if !self.pipeline_paths.is_empty() {
228            self.pipelines = reload_pipelines(&self.pipeline_paths)?;
229        }
230
231        // Resolve dynamic sources if a resolver is set.
232        //
233        // Both error cases must fail closed. Loading rules with unresolved
234        // `${source.*}` templates produces rules whose semantics differ
235        // from what the operator wrote; on a hot-reload, the previous
236        // engine is still serving traffic, so returning an error here
237        // keeps it active rather than silently replacing it with a broken
238        // one.
239        if self.source_resolver.is_some() && self.pipelines.iter().any(|p| p.is_dynamic()) {
240            let handle = tokio::runtime::Handle::try_current().map_err(|_| {
241                "Dynamic pipelines require a tokio runtime; refusing to load rules with \
242                 unresolved sources"
243                    .to_string()
244            })?;
245            let pipelines = std::mem::take(&mut self.pipelines);
246            let resolver = self.source_resolver.clone().unwrap();
247            let allow_remote = self.allow_remote_include;
248            let resolved = tokio::task::block_in_place(|| {
249                handle.block_on(async {
250                    resolve_pipelines_async(&resolver, &pipelines, allow_remote).await
251                })
252            });
253            match resolved {
254                Ok(p) => self.pipelines = p,
255                Err(e) => {
256                    // Restore the captured pipelines so a higher-level
257                    // retry can re-run the same load against the same
258                    // inputs.
259                    self.pipelines = pipelines;
260                    return Err(format!("Dynamic source resolution failed: {e}"));
261                }
262            }
263        }
264
265        let previous_state = self.export_state();
266        let collection = load_collection(&self.rules_path)?;
267        let has_correlations = !collection.correlations.is_empty();
268
269        if has_correlations {
270            let mut engine = CorrelationEngine::new(self.corr_config.clone());
271            engine.set_include_event(self.include_event);
272            if let Some(budget) = self.bloom_max_bytes {
273                engine.set_bloom_max_bytes(budget);
274            }
275            engine.set_bloom_prefilter(self.bloom_prefilter);
276            #[cfg(feature = "daachorse-index")]
277            engine.set_cross_rule_ac(self.cross_rule_ac);
278            for p in &self.pipelines {
279                engine.add_pipeline(p.clone());
280            }
281            engine
282                .add_collection(&collection)
283                .map_err(|e| format!("Error compiling rules: {e}"))?;
284
285            if let Some(snapshot) = previous_state {
286                engine.import_state(snapshot);
287            }
288
289            let stats = EngineStats {
290                detection_rules: engine.detection_rule_count(),
291                correlation_rules: engine.correlation_rule_count(),
292                state_entries: engine.state_count(),
293            };
294            self.engine = EngineVariant::WithCorrelations(Box::new(engine));
295            self.refresh_rule_field_set(&collection);
296            tracing::debug!(
297                detection_rules = stats.detection_rules,
298                correlation_rules = stats.correlation_rules,
299                duration_ms = load_start.elapsed().as_millis() as u64,
300                "Rule load complete",
301            );
302            Ok(stats)
303        } else {
304            let mut engine = Engine::new();
305            engine.set_include_event(self.include_event);
306            if let Some(budget) = self.bloom_max_bytes {
307                engine.set_bloom_max_bytes(budget);
308            }
309            engine.set_bloom_prefilter(self.bloom_prefilter);
310            #[cfg(feature = "daachorse-index")]
311            engine.set_cross_rule_ac(self.cross_rule_ac);
312            for p in &self.pipelines {
313                engine.add_pipeline(p.clone());
314            }
315            engine
316                .add_collection(&collection)
317                .map_err(|e| format!("Error compiling rules: {e}"))?;
318
319            let stats = EngineStats {
320                detection_rules: engine.rule_count(),
321                correlation_rules: 0,
322                state_entries: 0,
323            };
324            self.engine = EngineVariant::DetectionOnly(Box::new(engine));
325            self.refresh_rule_field_set(&collection);
326            tracing::debug!(
327                detection_rules = stats.detection_rules,
328                correlation_rules = stats.correlation_rules,
329                duration_ms = load_start.elapsed().as_millis() as u64,
330                "Rule load complete",
331            );
332            Ok(stats)
333        }
334    }
335
336    /// Recompute the post-pipeline rule field set and publish it. Called at
337    /// the end of every successful `load_rules()` branch.
338    fn refresh_rule_field_set(&self, collection: &SigmaCollection) {
339        let field_set = RuleFieldSet::collect(collection, &self.pipelines, true);
340        self.rule_field_set.store(Arc::new(field_set));
341    }
342
    /// Process a batch of events using parallel detection + sequential correlation.
    ///
    /// Delegates to `Engine::evaluate_batch` or `CorrelationEngine::process_batch`
    /// depending on whether correlation rules are loaded.
    ///
    /// Returns whatever the underlying engine produces for `events`.
    pub fn process_batch<E: Event + Sync>(&mut self, events: &[&E]) -> Vec<ProcessResult> {
        match &mut self.engine {
            EngineVariant::DetectionOnly(engine) => engine.evaluate_batch(events),
            EngineVariant::WithCorrelations(engine) => engine.process_batch(events),
        }
    }
353
354    /// Return summary statistics about the current engine state.
355    pub fn stats(&self) -> EngineStats {
356        match &self.engine {
357            EngineVariant::DetectionOnly(engine) => EngineStats {
358                detection_rules: engine.rule_count(),
359                correlation_rules: 0,
360                state_entries: 0,
361            },
362            EngineVariant::WithCorrelations(engine) => EngineStats {
363                detection_rules: engine.detection_rule_count(),
364                correlation_rules: engine.correlation_rule_count(),
365                state_entries: engine.state_count(),
366            },
367        }
368    }
369
    /// Return the path (file or directory) from which rules are loaded.
    pub fn rules_path(&self) -> &Path {
        &self.rules_path
    }
374
    /// Return the configured processing pipelines.
    ///
    /// After dynamic sources have been resolved, the resolved and expanded
    /// pipelines are stored here in place of the originals.
    pub fn pipelines(&self) -> &[Pipeline] {
        &self.pipelines
    }
379
    /// Return the correlation configuration used when a
    /// `CorrelationEngine` is built in `load_rules()`.
    pub fn corr_config(&self) -> &CorrelationConfig {
        &self.corr_config
    }
384
    /// Whether detection results include the matched event. Fixed at
    /// construction time and forwarded to the inner engine on every load.
    pub fn include_event(&self) -> bool {
        self.include_event
    }
389
390    /// Export correlation state as a serializable snapshot.
391    /// Returns `None` if the engine is detection-only (no correlation state to persist).
392    pub fn export_state(&self) -> Option<CorrelationSnapshot> {
393        match &self.engine {
394            EngineVariant::DetectionOnly(_) => None,
395            EngineVariant::WithCorrelations(engine) => Some(engine.export_state()),
396        }
397    }
398
399    /// Import previously exported correlation state.
400    /// Returns `true` if the import succeeded, `false` if the snapshot version
401    /// is incompatible. No-op (returns `true`) if the engine is detection-only.
402    pub fn import_state(&mut self, snapshot: &CorrelationSnapshot) -> bool {
403        if let EngineVariant::WithCorrelations(engine) = &mut self.engine {
404            engine.import_state(snapshot.clone())
405        } else {
406            true
407        }
408    }
409}
410
411fn load_collection(path: &Path) -> Result<SigmaCollection, String> {
412    let collection = if path.is_dir() {
413        rsigma_parser::parse_sigma_directory(path)
414            .map_err(|e| format!("Error loading rules from {}: {e}", path.display()))?
415    } else {
416        rsigma_parser::parse_sigma_file(path)
417            .map_err(|e| format!("Error loading rule {}: {e}", path.display()))?
418    };
419
420    if !collection.errors.is_empty() {
421        tracing::warn!(
422            count = collection.errors.len(),
423            "Parse errors while loading rules"
424        );
425        for (i, err) in collection.errors.iter().take(3).enumerate() {
426            tracing::warn!(index = i + 1, error = %err, "Rule parse error detail");
427        }
428    }
429
430    Ok(collection)
431}
432
433/// Re-read and parse all pipeline files from disk, sorted by priority.
434///
435/// Every pipeline file that still declares an inline `sources:` block
436/// triggers the [`warn_pipeline_inline_sources`](crate::warn_pipeline_inline_sources)
437/// deprecation notice. The helper deduplicates by canonical path across the
438/// whole process, so the warning surfaces on the first daemon hot-reload that
439/// observes a deprecated pipeline and is silent on subsequent reloads of the
440/// same file (every SIGHUP, file-watcher event, or `POST /api/v1/reload`
441/// thereafter is a noop for the dedup set).
442fn reload_pipelines(paths: &[PathBuf]) -> Result<Vec<Pipeline>, String> {
443    let mut pipelines = Vec::with_capacity(paths.len());
444    for path in paths {
445        let pipeline = parse_pipeline_file(path)
446            .map_err(|e| format!("Error reloading pipeline {}: {e}", path.display()))?;
447        if !pipeline.sources.is_empty() {
448            crate::warn_pipeline_inline_sources(path, &pipeline.name);
449        }
450        pipelines.push(pipeline);
451    }
452    pipelines.sort_by_key(|p| p.priority);
453    Ok(pipelines)
454}
455
456/// Resolve dynamic sources in pipelines asynchronously.
457async fn resolve_pipelines_async(
458    resolver: &Arc<dyn SourceResolver>,
459    pipelines: &[Pipeline],
460    allow_remote_include: bool,
461) -> Result<Vec<Pipeline>, String> {
462    let mut resolved_pipelines = Vec::with_capacity(pipelines.len());
463    for pipeline in pipelines {
464        if pipeline.is_dynamic() {
465            let resolved_data = sources::resolve_all(resolver.as_ref(), &pipeline.sources)
466                .await
467                .map_err(|e| {
468                    format!(
469                        "Failed to resolve dynamic pipeline '{}': {e}",
470                        pipeline.name
471                    )
472                })?;
473            let mut expanded = TemplateExpander::expand(pipeline, &resolved_data);
474            sources::include::expand_includes(&mut expanded, &resolved_data, allow_remote_include)?;
475            resolved_pipelines.push(expanded);
476        } else {
477            resolved_pipelines.push(pipeline.clone());
478        }
479    }
480    Ok(resolved_pipelines)
481}
482
483#[cfg(test)]
484mod tests {
485    use super::*;
486    use crate::pipeline_deprecation::reset_inline_sources_dedup_for_tests;
487    use std::sync::Mutex;
488
    // The pipeline-embedded `sources:` dedup set is process-wide, so tests
    // that reset or read it need to serialize. cargo test runs the tests of
    // a binary concurrently; holding this guard for the duration of a test
    // turns those into sequential probes of the shared set.
    static DEDUP_TEST_GUARD: Mutex<()> = Mutex::new(());
494
    // Minimal detection-only Sigma rule (no correlations) used as the rule
    // file in every test below.
    const RULE_YAML: &str = r#"
title: TestRule
id: 11111111-1111-1111-1111-111111111111
status: experimental
logsource:
    product: test
detection:
    selection:
        EventID: 1
    condition: selection
"#;
506
    // Pipeline that declares an inline `sources:` block, i.e. the shape
    // that should trigger the inline-sources deprecation warning. The
    // tests using it configure no source resolver, so the source path is
    // never read.
    const PIPELINE_WITH_SOURCES: &str = r#"
name: legacy_pipeline_with_inline_sources
priority: 50
sources:
  - id: threat_feed
    type: file
    path: /tmp/does-not-matter.json
    format: json
transformations:
  - type: value_placeholders
"#;
518
    // Pipeline without any `sources:` block; loading it must not touch the
    // deprecation dedup set.
    const PIPELINE_NO_SOURCES: &str = r#"
name: simple_pipeline
priority: 10
transformations:
  - id: rename
    type: field_name_mapping
    mapping:
      EventID: event.id
"#;
528
529    fn dedup_set_contains(path: &Path) -> bool {
530        let canonical = path.canonicalize().unwrap_or_else(|_| path.to_path_buf());
531        crate::pipeline_deprecation::tests_only_snapshot().contains(&canonical)
532    }
533
534    #[test]
535    fn load_rules_surfaces_inline_sources_deprecation_through_runtime() {
536        let _guard = DEDUP_TEST_GUARD.lock().unwrap();
537        reset_inline_sources_dedup_for_tests();
538
539        let dir = tempfile::tempdir().unwrap();
540        let rule_path = dir.path().join("rule.yml");
541        std::fs::write(&rule_path, RULE_YAML).unwrap();
542
543        let pipeline_path = dir.path().join("pipeline.yml");
544        std::fs::write(&pipeline_path, PIPELINE_WITH_SOURCES).unwrap();
545        let pipeline = parse_pipeline_file(&pipeline_path).unwrap();
546
547        let mut engine = RuntimeEngine::new(
548            rule_path,
549            vec![pipeline],
550            CorrelationConfig::default(),
551            false,
552        );
553        engine.set_pipeline_paths(vec![pipeline_path.clone()]);
554        engine.load_rules().unwrap();
555
556        assert!(
557            dedup_set_contains(&pipeline_path),
558            "RuntimeEngine::load_rules should route inline sources through \
559             warn_pipeline_inline_sources so the daemon hot-reload path \
560             covers the deprecation; the canonical pipeline path was not \
561             recorded in the dedup set."
562        );
563    }
564
565    #[test]
566    fn load_rules_does_not_warn_when_pipeline_has_no_inline_sources() {
567        let _guard = DEDUP_TEST_GUARD.lock().unwrap();
568        reset_inline_sources_dedup_for_tests();
569
570        let dir = tempfile::tempdir().unwrap();
571        let rule_path = dir.path().join("rule.yml");
572        std::fs::write(&rule_path, RULE_YAML).unwrap();
573
574        let pipeline_path = dir.path().join("clean.yml");
575        std::fs::write(&pipeline_path, PIPELINE_NO_SOURCES).unwrap();
576        let pipeline = parse_pipeline_file(&pipeline_path).unwrap();
577
578        let mut engine = RuntimeEngine::new(
579            rule_path,
580            vec![pipeline],
581            CorrelationConfig::default(),
582            false,
583        );
584        engine.set_pipeline_paths(vec![pipeline_path.clone()]);
585        engine.load_rules().unwrap();
586
587        assert!(
588            !dedup_set_contains(&pipeline_path),
589            "a pipeline without inline sources must not register in the \
590             deprecation dedup set."
591        );
592    }
593
594    #[test]
595    fn hot_reload_dedups_inline_sources_warning_for_same_pipeline_path() {
596        let _guard = DEDUP_TEST_GUARD.lock().unwrap();
597        reset_inline_sources_dedup_for_tests();
598
599        let dir = tempfile::tempdir().unwrap();
600        let rule_path = dir.path().join("rule.yml");
601        std::fs::write(&rule_path, RULE_YAML).unwrap();
602
603        let pipeline_path = dir.path().join("pipeline.yml");
604        std::fs::write(&pipeline_path, PIPELINE_WITH_SOURCES).unwrap();
605        let pipeline = parse_pipeline_file(&pipeline_path).unwrap();
606
607        let mut engine = RuntimeEngine::new(
608            rule_path,
609            vec![pipeline],
610            CorrelationConfig::default(),
611            false,
612        );
613        engine.set_pipeline_paths(vec![pipeline_path.clone()]);
614
615        // Initial daemon startup loads the pipeline once.
616        engine.load_rules().unwrap();
617        assert!(dedup_set_contains(&pipeline_path));
618
619        // A hot-reload (SIGHUP, file-watcher event, POST /api/v1/reload)
620        // re-enters reload_pipelines; the dedup set must already contain
621        // the canonical path so the warning does not re-fire. The proof is
622        // that the set state is unchanged after the second reload.
623        let canonical = pipeline_path.canonicalize().unwrap();
624        let before = crate::pipeline_deprecation::tests_only_snapshot();
625        engine.load_rules().unwrap();
626        let after = crate::pipeline_deprecation::tests_only_snapshot();
627
628        assert_eq!(
629            before, after,
630            "second load_rules should not change the dedup set",
631        );
632        assert!(after.contains(&canonical));
633    }
634
635    #[tokio::test(flavor = "multi_thread")]
636    async fn load_rules_fails_closed_when_dynamic_source_resolution_fails() {
637        // A dynamic pipeline whose source cannot resolve must surface the
638        // error so callers (LogProcessor::reload_rules in particular) can
639        // keep the previous engine active. The historical behavior logged
640        // a warning and loaded rules with unexpanded `${source.*}`
641        // placeholders, which silently produced detection rules with
642        // different semantics from what the operator wrote.
643        let dir = tempfile::tempdir().unwrap();
644        let rule_path = dir.path().join("rule.yml");
645        std::fs::write(&rule_path, RULE_YAML).unwrap();
646
647        // Pipeline declares a file dynamic source pointing at a path that
648        // does not exist; the resolver returns SourceError on first read.
649        let missing = dir.path().join("missing.json");
650        let pipeline_yaml = format!(
651            r#"
652name: dynamic_missing
653priority: 10
654sources:
655  - id: feed
656    type: file
657    path: {}
658    format: json
659    on_error: fail
660transformations:
661  - type: value_placeholders
662"#,
663            missing.display(),
664        );
665        let pipeline_path = dir.path().join("pipeline.yml");
666        std::fs::write(&pipeline_path, pipeline_yaml).unwrap();
667        let pipeline = parse_pipeline_file(&pipeline_path).unwrap();
668        assert!(
669            pipeline.is_dynamic(),
670            "fixture should produce a dynamic pipeline"
671        );
672
673        let mut engine = RuntimeEngine::new(
674            rule_path,
675            vec![pipeline],
676            CorrelationConfig::default(),
677            false,
678        );
679        engine.set_source_resolver(Arc::new(sources::DefaultSourceResolver::new()));
680
681        let err = engine
682            .load_rules()
683            .expect_err("missing source must cause load_rules to fail closed");
684        assert!(
685            err.contains("Dynamic source resolution failed"),
686            "error should explain the fail-closed path; got: {err}"
687        );
688    }
689}