Skip to main content

rsigma_runtime/
engine.rs

1use std::path::{Path, PathBuf};
2use std::sync::Arc;
3
4use arc_swap::ArcSwap;
5use rsigma_eval::event::Event;
6use rsigma_eval::{
7    CorrelationConfig, CorrelationEngine, CorrelationSnapshot, Engine, MatchDetailLevel, Pipeline,
8    ProcessResult, RuleFieldSet, parse_pipeline_file,
9};
10use rsigma_parser::SigmaCollection;
11
12use crate::sources::{self, SourceResolver, TemplateExpander};
13
14/// Wraps a CorrelationEngine (or a plain Engine) and provides the interface
15/// the runtime needs: process events, reload rules, and query state.
16pub struct RuntimeEngine {
17    engine: EngineVariant,
18    pipelines: Vec<Pipeline>,
19    pipeline_paths: Vec<PathBuf>,
20    rules_path: std::path::PathBuf,
21    corr_config: CorrelationConfig,
22    include_event: bool,
23    source_resolver: Option<Arc<dyn SourceResolver>>,
24    allow_remote_include: bool,
25    /// Opt-in bloom-filter pre-filtering of positive substring matchers.
26    /// Forwarded to the inner detection engine on every rule reload.
27    bloom_prefilter: bool,
28    /// Optional override for the bloom memory budget in bytes. `None`
29    /// means use the eval crate default.
30    bloom_max_bytes: Option<usize>,
31    /// Match-detail verbosity forwarded to the inner detection engine on
32    /// every rule reload. `Off` by default (historical wire shape).
33    match_detail: MatchDetailLevel,
34    /// Opt-in cross-rule Aho-Corasick pre-filter. Forwarded to the inner
35    /// detection engine on every rule reload. Available behind the
36    /// `daachorse-index` Cargo feature.
37    #[cfg(feature = "daachorse-index")]
38    cross_rule_ac: bool,
39    /// Post-pipeline rule field set, refreshed at the end of every
40    /// `load_rules()`. Wrapped in `ArcSwap` so readers (e.g. the daemon's
41    /// `/api/v1/fields/*` endpoints) can snapshot a stable view without
42    /// blocking the hot path during a reload.
43    rule_field_set: Arc<ArcSwap<RuleFieldSet>>,
44}
45
/// The inner engine actually serving events.
///
/// `load_rules()` picks the variant from the loaded rule collection: a
/// collection with correlation rules gets `WithCorrelations`, otherwise
/// `DetectionOnly`. Only `WithCorrelations` has state that
/// `export_state()` / `import_state()` can carry across reloads.
enum EngineVariant {
    /// Plain detection engine; no correlation state.
    DetectionOnly(Box<Engine>),
    /// Detection plus correlation engine.
    WithCorrelations(Box<CorrelationEngine>),
}
50
/// Summary statistics about the loaded engine state.
///
/// `Default` yields all-zero counts, which is what an engine with no rules
/// reports. `PartialEq`/`Eq` let callers compare snapshots, e.g. to detect
/// whether a reload changed anything.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct EngineStats {
    /// Number of compiled detection rules.
    pub detection_rules: usize,
    /// Number of compiled correlation rules (0 for a detection-only engine).
    pub correlation_rules: usize,
    /// Number of live correlation state entries (0 for a detection-only engine).
    pub state_entries: usize,
}
58
59impl RuntimeEngine {
60    pub fn new(
61        rules_path: std::path::PathBuf,
62        pipelines: Vec<Pipeline>,
63        corr_config: CorrelationConfig,
64        include_event: bool,
65    ) -> Self {
66        RuntimeEngine {
67            engine: EngineVariant::DetectionOnly(Box::new(Engine::new())),
68            pipelines,
69            pipeline_paths: Vec::new(),
70            rules_path,
71            corr_config,
72            include_event,
73            source_resolver: None,
74            allow_remote_include: false,
75            bloom_prefilter: false,
76            bloom_max_bytes: None,
77            match_detail: MatchDetailLevel::Off,
78            #[cfg(feature = "daachorse-index")]
79            cross_rule_ac: false,
80            rule_field_set: Arc::new(ArcSwap::from_pointee(RuleFieldSet::default())),
81        }
82    }
83
84    /// Return an immutable snapshot of the post-pipeline rule field set.
85    ///
86    /// Cheap to call: returns a refcounted handle that stays valid even if
87    /// `load_rules()` runs concurrently. The daemon's field-observability
88    /// endpoints use this to compute the intersection between observed
89    /// event keys and rule-referenced fields without coordinating with the
90    /// engine lock.
91    pub fn rule_field_set(&self) -> Arc<RuleFieldSet> {
92        self.rule_field_set.load_full()
93    }
94
95    /// Enable or disable bloom-filter pre-filtering on the inner detection
96    /// engine. Off by default. Applies on the next `load_rules()`; pre-load
97    /// callers should set this before calling `load_rules()`.
98    pub fn set_bloom_prefilter(&mut self, enabled: bool) {
99        self.bloom_prefilter = enabled;
100    }
101
102    /// Return the current bloom pre-filter setting. Used by hot-reload to
103    /// carry tuning across a `RuntimeEngine` swap so a daemon-startup
104    /// `set_bloom_prefilter(true)` does not silently revert on the first
105    /// reload.
106    pub fn bloom_prefilter(&self) -> bool {
107        self.bloom_prefilter
108    }
109
110    /// Override the bloom memory budget on the inner detection engine.
111    /// Applies on the next `load_rules()`.
112    pub fn set_bloom_max_bytes(&mut self, max_bytes: usize) {
113        self.bloom_max_bytes = Some(max_bytes);
114    }
115
116    /// Return the configured bloom memory budget, if one was set.
117    pub fn bloom_max_bytes(&self) -> Option<usize> {
118        self.bloom_max_bytes
119    }
120
121    /// Set the match-detail verbosity on the inner detection engine.
122    /// `Off` by default. Applies on the next `load_rules()`; pre-load
123    /// callers should set this before calling `load_rules()`.
124    pub fn set_match_detail(&mut self, level: MatchDetailLevel) {
125        self.match_detail = level;
126    }
127
128    /// Return the current match-detail verbosity. Used by hot-reload to
129    /// carry the setting across a `RuntimeEngine` swap.
130    pub fn match_detail(&self) -> MatchDetailLevel {
131        self.match_detail
132    }
133
134    /// Enable or disable the cross-rule Aho-Corasick pre-filter on the
135    /// inner detection engine. Off by default; the optimization helps only
136    /// on substring-heavy rule sets > ~5K rules. Applies on the next
137    /// `load_rules()`.
138    ///
139    /// Available behind the `daachorse-index` Cargo feature.
140    #[cfg(feature = "daachorse-index")]
141    pub fn set_cross_rule_ac(&mut self, enabled: bool) {
142        self.cross_rule_ac = enabled;
143    }
144
145    /// Return the current cross-rule Aho-Corasick setting.
146    #[cfg(feature = "daachorse-index")]
147    pub fn cross_rule_ac(&self) -> bool {
148        self.cross_rule_ac
149    }
150
151    /// Set a source resolver for dynamic pipeline sources.
152    ///
153    /// When set, `load_rules()` resolves dynamic sources and expands
154    /// `${source.*}` templates before compiling rules.
155    pub fn set_source_resolver(&mut self, resolver: Arc<dyn SourceResolver>) {
156        self.source_resolver = Some(resolver);
157    }
158
159    /// Get the source resolver, if one is configured.
160    pub fn source_resolver(&self) -> Option<&Arc<dyn SourceResolver>> {
161        self.source_resolver.as_ref()
162    }
163
164    /// Allow `include` directives to reference HTTP/NATS sources.
165    pub fn set_allow_remote_include(&mut self, allow: bool) {
166        self.allow_remote_include = allow;
167    }
168
169    /// Whether remote includes are allowed.
170    pub fn allow_remote_include(&self) -> bool {
171        self.allow_remote_include
172    }
173
174    /// Set the pipeline file paths used for hot-reload.
175    ///
176    /// When paths are set, `load_rules()` re-reads pipeline YAML from disk
177    /// before rebuilding the engine. This enables pipeline hot-reload
178    /// alongside rule hot-reload.
179    pub fn set_pipeline_paths(&mut self, paths: Vec<PathBuf>) {
180        self.pipeline_paths = paths;
181    }
182
183    /// Return the pipeline file paths (used by the daemon to set up watchers).
184    pub fn pipeline_paths(&self) -> &[PathBuf] {
185        &self.pipeline_paths
186    }
187
188    /// Resolve dynamic sources in all pipelines and expand templates.
189    ///
190    /// This is the async entry point for source resolution. Call this before
191    /// `load_rules()` when you have an async context available, or let
192    /// `load_rules()` handle it synchronously via `tokio::runtime::Handle`.
193    pub async fn resolve_dynamic_pipelines(&mut self) -> Result<(), String> {
194        let Some(resolver) = &self.source_resolver else {
195            return Ok(());
196        };
197
198        let mut resolved_pipelines = Vec::with_capacity(self.pipelines.len());
199        for pipeline in &self.pipelines {
200            if pipeline.is_dynamic() {
201                match sources::resolve_all(resolver.as_ref(), &pipeline.sources).await {
202                    Ok(resolved_data) => {
203                        let mut expanded = TemplateExpander::expand(pipeline, &resolved_data);
204                        // Expand include directives
205                        sources::include::expand_includes(
206                            &mut expanded,
207                            &resolved_data,
208                            self.allow_remote_include,
209                        )?;
210                        resolved_pipelines.push(expanded);
211                    }
212                    Err(e) => {
213                        return Err(format!(
214                            "Failed to resolve dynamic pipeline '{}': {e}",
215                            pipeline.name
216                        ));
217                    }
218                }
219            } else {
220                resolved_pipelines.push(pipeline.clone());
221            }
222        }
223        self.pipelines = resolved_pipelines;
224        Ok(())
225    }
226
227    /// Load (or reload) rules from the configured path.
228    ///
229    /// On reload, correlation state is exported before replacing the engine
230    /// and re-imported after, so in-flight windows and suppression state
231    /// survive rule changes (entries for removed correlations are dropped).
232    ///
233    /// If pipeline paths are set (via [`set_pipeline_paths`](Self::set_pipeline_paths)),
234    /// pipelines are re-read from disk before rebuilding the engine. If any
235    /// pipeline file fails to parse, the entire reload is aborted and the
236    /// old engine remains active.
237    ///
238    /// Dynamic pipeline sources are resolved if a source resolver is configured.
239    pub fn load_rules(&mut self) -> Result<EngineStats, String> {
240        let load_span = tracing::info_span!("load_rules", rules_path = %self.rules_path.display());
241        let _enter = load_span.enter();
242        let load_start = std::time::Instant::now();
243
244        if !self.pipeline_paths.is_empty() {
245            self.pipelines = reload_pipelines(&self.pipeline_paths)?;
246        }
247
248        // Resolve dynamic sources if a resolver is set.
249        //
250        // Both error cases must fail closed. Loading rules with unresolved
251        // `${source.*}` templates produces rules whose semantics differ
252        // from what the operator wrote; on a hot-reload, the previous
253        // engine is still serving traffic, so returning an error here
254        // keeps it active rather than silently replacing it with a broken
255        // one.
256        if self.source_resolver.is_some() && self.pipelines.iter().any(|p| p.is_dynamic()) {
257            let handle = tokio::runtime::Handle::try_current().map_err(|_| {
258                "Dynamic pipelines require a tokio runtime; refusing to load rules with \
259                 unresolved sources"
260                    .to_string()
261            })?;
262            let pipelines = std::mem::take(&mut self.pipelines);
263            let resolver = self.source_resolver.clone().unwrap();
264            let allow_remote = self.allow_remote_include;
265            let resolved = tokio::task::block_in_place(|| {
266                handle.block_on(async {
267                    resolve_pipelines_async(&resolver, &pipelines, allow_remote).await
268                })
269            });
270            match resolved {
271                Ok(p) => self.pipelines = p,
272                Err(e) => {
273                    // Restore the captured pipelines so a higher-level
274                    // retry can re-run the same load against the same
275                    // inputs.
276                    self.pipelines = pipelines;
277                    return Err(format!("Dynamic source resolution failed: {e}"));
278                }
279            }
280        }
281
282        let previous_state = self.export_state();
283        let collection = load_collection(&self.rules_path)?;
284        let has_correlations = !collection.correlations.is_empty();
285
286        if has_correlations {
287            let mut engine = CorrelationEngine::new(self.corr_config.clone());
288            engine.set_include_event(self.include_event);
289            engine.set_match_detail(self.match_detail);
290            if let Some(budget) = self.bloom_max_bytes {
291                engine.set_bloom_max_bytes(budget);
292            }
293            engine.set_bloom_prefilter(self.bloom_prefilter);
294            #[cfg(feature = "daachorse-index")]
295            engine.set_cross_rule_ac(self.cross_rule_ac);
296            for p in &self.pipelines {
297                engine.add_pipeline(p.clone());
298            }
299            engine
300                .add_collection(&collection)
301                .map_err(|e| format!("Error compiling rules: {e}"))?;
302
303            if let Some(snapshot) = previous_state {
304                engine.import_state(snapshot);
305            }
306
307            let stats = EngineStats {
308                detection_rules: engine.detection_rule_count(),
309                correlation_rules: engine.correlation_rule_count(),
310                state_entries: engine.state_count(),
311            };
312            self.engine = EngineVariant::WithCorrelations(Box::new(engine));
313            self.refresh_rule_field_set(&collection);
314            tracing::debug!(
315                detection_rules = stats.detection_rules,
316                correlation_rules = stats.correlation_rules,
317                duration_ms = load_start.elapsed().as_millis() as u64,
318                "Rule load complete",
319            );
320            Ok(stats)
321        } else {
322            let mut engine = Engine::new();
323            engine.set_include_event(self.include_event);
324            engine.set_match_detail(self.match_detail);
325            if let Some(budget) = self.bloom_max_bytes {
326                engine.set_bloom_max_bytes(budget);
327            }
328            engine.set_bloom_prefilter(self.bloom_prefilter);
329            #[cfg(feature = "daachorse-index")]
330            engine.set_cross_rule_ac(self.cross_rule_ac);
331            for p in &self.pipelines {
332                engine.add_pipeline(p.clone());
333            }
334            engine
335                .add_collection(&collection)
336                .map_err(|e| format!("Error compiling rules: {e}"))?;
337
338            let stats = EngineStats {
339                detection_rules: engine.rule_count(),
340                correlation_rules: 0,
341                state_entries: 0,
342            };
343            self.engine = EngineVariant::DetectionOnly(Box::new(engine));
344            self.refresh_rule_field_set(&collection);
345            tracing::debug!(
346                detection_rules = stats.detection_rules,
347                correlation_rules = stats.correlation_rules,
348                duration_ms = load_start.elapsed().as_millis() as u64,
349                "Rule load complete",
350            );
351            Ok(stats)
352        }
353    }
354
355    /// Recompute the post-pipeline rule field set and publish it. Called at
356    /// the end of every successful `load_rules()` branch.
357    fn refresh_rule_field_set(&self, collection: &SigmaCollection) {
358        let field_set = RuleFieldSet::collect(collection, &self.pipelines, true);
359        self.rule_field_set.store(Arc::new(field_set));
360    }
361
362    /// Process a batch of events using parallel detection + sequential correlation.
363    ///
364    /// Delegates to `Engine::evaluate_batch` or `CorrelationEngine::process_batch`
365    /// depending on whether correlation rules are loaded.
366    pub fn process_batch<E: Event + Sync>(&mut self, events: &[&E]) -> Vec<ProcessResult> {
367        match &mut self.engine {
368            EngineVariant::DetectionOnly(engine) => engine.evaluate_batch(events),
369            EngineVariant::WithCorrelations(engine) => engine.process_batch(events),
370        }
371    }
372
373    /// Return summary statistics about the current engine state.
374    pub fn stats(&self) -> EngineStats {
375        match &self.engine {
376            EngineVariant::DetectionOnly(engine) => EngineStats {
377                detection_rules: engine.rule_count(),
378                correlation_rules: 0,
379                state_entries: 0,
380            },
381            EngineVariant::WithCorrelations(engine) => EngineStats {
382                detection_rules: engine.detection_rule_count(),
383                correlation_rules: engine.correlation_rule_count(),
384                state_entries: engine.state_count(),
385            },
386        }
387    }
388
389    /// Return the path from which rules are loaded.
390    pub fn rules_path(&self) -> &Path {
391        &self.rules_path
392    }
393
394    /// Return the configured processing pipelines.
395    pub fn pipelines(&self) -> &[Pipeline] {
396        &self.pipelines
397    }
398
399    /// Return the correlation configuration.
400    pub fn corr_config(&self) -> &CorrelationConfig {
401        &self.corr_config
402    }
403
404    /// Whether detection results include the matched event.
405    pub fn include_event(&self) -> bool {
406        self.include_event
407    }
408
409    /// Export correlation state as a serializable snapshot.
410    /// Returns `None` if the engine is detection-only (no correlation state to persist).
411    pub fn export_state(&self) -> Option<CorrelationSnapshot> {
412        match &self.engine {
413            EngineVariant::DetectionOnly(_) => None,
414            EngineVariant::WithCorrelations(engine) => Some(engine.export_state()),
415        }
416    }
417
418    /// Import previously exported correlation state.
419    /// Returns `true` if the import succeeded, `false` if the snapshot version
420    /// is incompatible. No-op (returns `true`) if the engine is detection-only.
421    pub fn import_state(&mut self, snapshot: &CorrelationSnapshot) -> bool {
422        if let EngineVariant::WithCorrelations(engine) = &mut self.engine {
423            engine.import_state(snapshot.clone())
424        } else {
425            true
426        }
427    }
428}
429
430fn load_collection(path: &Path) -> Result<SigmaCollection, String> {
431    let collection = if path.is_dir() {
432        rsigma_parser::parse_sigma_directory(path)
433            .map_err(|e| format!("Error loading rules from {}: {e}", path.display()))?
434    } else {
435        rsigma_parser::parse_sigma_file(path)
436            .map_err(|e| format!("Error loading rule {}: {e}", path.display()))?
437    };
438
439    if !collection.errors.is_empty() {
440        tracing::warn!(
441            count = collection.errors.len(),
442            "Parse errors while loading rules"
443        );
444        for (i, err) in collection.errors.iter().take(3).enumerate() {
445            tracing::warn!(index = i + 1, error = %err, "Rule parse error detail");
446        }
447    }
448
449    Ok(collection)
450}
451
452/// Re-read and parse all pipeline files from disk, sorted by priority.
453///
454/// Every pipeline file that still declares an inline `sources:` block
455/// triggers the [`warn_pipeline_inline_sources`](crate::warn_pipeline_inline_sources)
456/// deprecation notice. The helper deduplicates by canonical path across the
457/// whole process, so the warning surfaces on the first daemon hot-reload that
458/// observes a deprecated pipeline and is silent on subsequent reloads of the
459/// same file (every SIGHUP, file-watcher event, or `POST /api/v1/reload`
460/// thereafter is a noop for the dedup set).
461fn reload_pipelines(paths: &[PathBuf]) -> Result<Vec<Pipeline>, String> {
462    let mut pipelines = Vec::with_capacity(paths.len());
463    for path in paths {
464        let pipeline = parse_pipeline_file(path)
465            .map_err(|e| format!("Error reloading pipeline {}: {e}", path.display()))?;
466        if !pipeline.sources.is_empty() {
467            crate::warn_pipeline_inline_sources(path, &pipeline.name);
468        }
469        pipelines.push(pipeline);
470    }
471    pipelines.sort_by_key(|p| p.priority);
472    Ok(pipelines)
473}
474
475/// Resolve dynamic sources in pipelines asynchronously.
476async fn resolve_pipelines_async(
477    resolver: &Arc<dyn SourceResolver>,
478    pipelines: &[Pipeline],
479    allow_remote_include: bool,
480) -> Result<Vec<Pipeline>, String> {
481    let mut resolved_pipelines = Vec::with_capacity(pipelines.len());
482    for pipeline in pipelines {
483        if pipeline.is_dynamic() {
484            let resolved_data = sources::resolve_all(resolver.as_ref(), &pipeline.sources)
485                .await
486                .map_err(|e| {
487                    format!(
488                        "Failed to resolve dynamic pipeline '{}': {e}",
489                        pipeline.name
490                    )
491                })?;
492            let mut expanded = TemplateExpander::expand(pipeline, &resolved_data);
493            sources::include::expand_includes(&mut expanded, &resolved_data, allow_remote_include)?;
494            resolved_pipelines.push(expanded);
495        } else {
496            resolved_pipelines.push(pipeline.clone());
497        }
498    }
499    Ok(resolved_pipelines)
500}
501
#[cfg(test)]
mod tests {
    use super::*;
    use crate::pipeline_deprecation::reset_inline_sources_dedup_for_tests;

    // The pipeline-embedded `sources:` dedup set is process-wide, so tests
    // that read it must serialize against every other test that touches it,
    // including the unit tests in `pipeline_deprecation`. They share one lock
    // (`DEDUP_TEST_LOCK`) so cargo's parallel test threads don't race on the
    // global set. `serial_guard` recovers a poisoned lock so a failing test
    // does not cascade into the others.
    fn serial_guard() -> std::sync::MutexGuard<'static, ()> {
        crate::pipeline_deprecation::DEDUP_TEST_LOCK
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    const RULE_YAML: &str = r#"
title: TestRule
id: 11111111-1111-1111-1111-111111111111
status: experimental
logsource:
    product: test
detection:
    selection:
        EventID: 1
    condition: selection
"#;

    const PIPELINE_WITH_SOURCES: &str = r#"
name: legacy_pipeline_with_inline_sources
priority: 50
sources:
  - id: threat_feed
    type: file
    path: /tmp/does-not-matter.json
    format: json
transformations:
  - type: value_placeholders
"#;

    const PIPELINE_NO_SOURCES: &str = r#"
name: simple_pipeline
priority: 10
transformations:
  - id: rename
    type: field_name_mapping
    mapping:
      EventID: event.id
"#;

    /// Whether the deprecation dedup set holds `path` (canonicalized when
    /// possible, matching how the set is keyed).
    fn dedup_set_contains(path: &Path) -> bool {
        let canonical = path.canonicalize().unwrap_or_else(|_| path.to_path_buf());
        crate::pipeline_deprecation::tests_only_snapshot().contains(&canonical)
    }

    #[test]
    fn load_rules_surfaces_inline_sources_deprecation_through_runtime() {
        let _guard = serial_guard();
        reset_inline_sources_dedup_for_tests();

        let dir = tempfile::tempdir().unwrap();
        let rule_path = dir.path().join("rule.yml");
        std::fs::write(&rule_path, RULE_YAML).unwrap();

        let pipeline_path = dir.path().join("pipeline.yml");
        std::fs::write(&pipeline_path, PIPELINE_WITH_SOURCES).unwrap();
        let pipeline = parse_pipeline_file(&pipeline_path).unwrap();

        let mut engine = RuntimeEngine::new(
            rule_path,
            vec![pipeline],
            CorrelationConfig::default(),
            false,
        );
        engine.set_pipeline_paths(vec![pipeline_path.clone()]);
        engine.load_rules().unwrap();

        assert!(
            dedup_set_contains(&pipeline_path),
            "RuntimeEngine::load_rules should route inline sources through \
             warn_pipeline_inline_sources so the daemon hot-reload path \
             covers the deprecation; the canonical pipeline path was not \
             recorded in the dedup set."
        );
    }

    #[test]
    fn load_rules_does_not_warn_when_pipeline_has_no_inline_sources() {
        let _guard = serial_guard();
        reset_inline_sources_dedup_for_tests();

        let dir = tempfile::tempdir().unwrap();
        let rule_path = dir.path().join("rule.yml");
        std::fs::write(&rule_path, RULE_YAML).unwrap();

        let pipeline_path = dir.path().join("clean.yml");
        std::fs::write(&pipeline_path, PIPELINE_NO_SOURCES).unwrap();
        let pipeline = parse_pipeline_file(&pipeline_path).unwrap();

        let mut engine = RuntimeEngine::new(
            rule_path,
            vec![pipeline],
            CorrelationConfig::default(),
            false,
        );
        engine.set_pipeline_paths(vec![pipeline_path.clone()]);
        engine.load_rules().unwrap();

        assert!(
            !dedup_set_contains(&pipeline_path),
            "a pipeline without inline sources must not register in the \
             deprecation dedup set."
        );
    }

    #[test]
    fn hot_reload_dedups_inline_sources_warning_for_same_pipeline_path() {
        let _guard = serial_guard();
        reset_inline_sources_dedup_for_tests();

        let dir = tempfile::tempdir().unwrap();
        let rule_path = dir.path().join("rule.yml");
        std::fs::write(&rule_path, RULE_YAML).unwrap();

        let pipeline_path = dir.path().join("pipeline.yml");
        std::fs::write(&pipeline_path, PIPELINE_WITH_SOURCES).unwrap();
        let pipeline = parse_pipeline_file(&pipeline_path).unwrap();

        let mut engine = RuntimeEngine::new(
            rule_path,
            vec![pipeline],
            CorrelationConfig::default(),
            false,
        );
        engine.set_pipeline_paths(vec![pipeline_path.clone()]);

        // Initial daemon startup loads the pipeline once.
        engine.load_rules().unwrap();
        assert!(dedup_set_contains(&pipeline_path));

        // A hot-reload (SIGHUP, file-watcher event, POST /api/v1/reload)
        // re-enters reload_pipelines; the dedup set must already contain
        // the canonical path so the warning does not re-fire. The proof is
        // that the set state is unchanged after the second reload.
        let canonical = pipeline_path.canonicalize().unwrap();
        let before = crate::pipeline_deprecation::tests_only_snapshot();
        engine.load_rules().unwrap();
        let after = crate::pipeline_deprecation::tests_only_snapshot();

        assert_eq!(
            before, after,
            "second load_rules should not change the dedup set",
        );
        assert!(after.contains(&canonical));
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn load_rules_fails_closed_when_dynamic_source_resolution_fails() {
        // A dynamic pipeline whose source cannot resolve must surface the
        // error so callers (LogProcessor::reload_rules in particular) can
        // keep the previous engine active. The historical behavior logged
        // a warning and loaded rules with unexpanded `${source.*}`
        // placeholders, which silently produced detection rules with
        // different semantics from what the operator wrote.
        let dir = tempfile::tempdir().unwrap();
        let rule_path = dir.path().join("rule.yml");
        std::fs::write(&rule_path, RULE_YAML).unwrap();

        // Pipeline declares a file dynamic source pointing at a path that
        // does not exist; the resolver returns SourceError on first read.
        let missing = dir.path().join("missing.json");
        let pipeline_yaml = format!(
            r#"
name: dynamic_missing
priority: 10
sources:
  - id: feed
    type: file
    path: {}
    format: json
    on_error: fail
transformations:
  - type: value_placeholders
"#,
            missing.display(),
        );
        let pipeline_path = dir.path().join("pipeline.yml");
        std::fs::write(&pipeline_path, pipeline_yaml).unwrap();
        let pipeline = parse_pipeline_file(&pipeline_path).unwrap();
        assert!(
            pipeline.is_dynamic(),
            "fixture should produce a dynamic pipeline"
        );

        let mut engine = RuntimeEngine::new(
            rule_path,
            vec![pipeline],
            CorrelationConfig::default(),
            false,
        );
        engine.set_source_resolver(Arc::new(sources::DefaultSourceResolver::new()));

        let err = engine
            .load_rules()
            .expect_err("missing source must cause load_rules to fail closed");
        assert!(
            err.contains("Dynamic source resolution failed"),
            "error should explain the fail-closed path; got: {err}"
        );

        // Failing closed also means the failed load left nothing behind:
        // the unresolved pipeline is still configured (so a retry sees the
        // same inputs) and the initial empty engine is still the active one.
        let pipelines = engine.pipelines();
        assert_eq!(pipelines.len(), 1, "pipelines must survive a failed load");
        assert!(
            pipelines[0].is_dynamic(),
            "a failed load must not leave a half-resolved pipeline behind"
        );
        assert_eq!(
            engine.stats().detection_rules,
            0,
            "the previous (empty) engine must remain active after a failed load"
        );
    }
}