Skip to main content

rsigma_runtime/
engine.rs

1use std::path::{Path, PathBuf};
2use std::sync::Arc;
3
4use arc_swap::ArcSwap;
5use rsigma_eval::event::Event;
6use rsigma_eval::{
7    CorrelationConfig, CorrelationEngine, CorrelationSnapshot, CorrelationStateSnapshot, Engine,
8    LogSourceExtractor, MatchDetailLevel, Pipeline, ProcessResult, RoutingPlan, RuleFieldSet,
9    SchemaClassifier, SchemaRouter, parse_pipeline_file,
10};
11use rsigma_parser::SigmaCollection;
12
13use crate::sources::{self, SourceResolver, TemplateExpander};
14
/// Wraps a CorrelationEngine (or a plain Engine) and provides the interface
/// the runtime needs: process events, reload rules, and query state.
///
/// The configuration fields below live on this wrapper because
/// `load_rules()` constructs a fresh inner engine on every (re)load and
/// re-applies them to it.
pub struct RuntimeEngine {
    /// Currently active engine variant. Replaced by every successful
    /// `load_rules()`.
    engine: EngineVariant,
    /// Processing pipelines added to the inner engine on load. Overwritten
    /// when pipeline files are re-read or dynamic sources are resolved.
    pipelines: Vec<Pipeline>,
    /// Pipeline files re-read at the start of `load_rules()`. When empty,
    /// the in-memory `pipelines` are used as they are.
    pipeline_paths: Vec<PathBuf>,
    /// File or directory the rules are parsed from.
    rules_path: std::path::PathBuf,
    /// Correlation configuration handed to each newly built correlation
    /// engine or schema router.
    corr_config: CorrelationConfig,
    /// Forwarded to the inner engine via `set_include_event` on every load.
    include_event: bool,
    /// Resolver for dynamic pipeline sources. `None` skips source
    /// resolution entirely.
    source_resolver: Option<Arc<dyn SourceResolver>>,
    /// Passed to include expansion when dynamic pipelines are resolved.
    allow_remote_include: bool,
    /// Opt-in bloom-filter pre-filtering of positive substring matchers.
    /// Forwarded to the inner detection engine on every rule reload.
    bloom_prefilter: bool,
    /// Optional override for the bloom memory budget in bytes. `None`
    /// means use the eval crate default.
    bloom_max_bytes: Option<usize>,
    /// Match-detail verbosity forwarded to the inner detection engine on
    /// every rule reload. `Off` by default (historical wire shape).
    match_detail: MatchDetailLevel,
    /// Opt-in cross-rule Aho-Corasick pre-filter. Forwarded to the inner
    /// detection engine on every rule reload. Available behind the
    /// `daachorse-index` Cargo feature.
    #[cfg(feature = "daachorse-index")]
    cross_rule_ac: bool,
    /// Post-pipeline rule field set, refreshed at the end of every
    /// `load_rules()`. Wrapped in `ArcSwap` so readers (e.g. the daemon's
    /// `/api/v1/fields/*` endpoints) can snapshot a stable view without
    /// blocking the hot path during a reload.
    rule_field_set: Arc<ArcSwap<RuleFieldSet>>,
    /// Optional schema-routing spec. When set, `load_rules` builds a
    /// [`SchemaRouter`] (one detection engine per pipeline-set plus a shared
    /// correlation store) instead of a single engine, and rebuilds it on
    /// hot-reload from the same spec.
    routing: Option<RoutingSpec>,
    /// Opt-in conflict-based logsource extractor. Forwarded to the inner
    /// detection engine(s) on every rule reload and carried across hot-reload.
    logsource_extractor: Option<LogSourceExtractor>,
}
54
/// Everything needed to (re)build a [`SchemaRouter`] on rule load. Pipeline
/// sets are pre-resolved by the caller (builtin names + files), index-aligned
/// with `plan.pipeline_sets()`.
///
/// `load_rules()` clones the whole spec on every load and passes the three
/// fields to `SchemaRouter::build`.
#[derive(Clone)]
pub struct RoutingSpec {
    /// Classifier handed to `SchemaRouter::build`.
    pub classifier: SchemaClassifier,
    /// Routing plan handed to `SchemaRouter::build`.
    pub plan: RoutingPlan,
    /// One pipeline list per routed pipeline-set. Dynamic pipelines in these
    /// sets are resolved inside `load_rules()` when a source resolver is
    /// configured.
    pub pipeline_sets: Vec<Vec<Pipeline>>,
}
64
/// The concrete engine currently backing a [`RuntimeEngine`]. `load_rules()`
/// picks the variant on every load; each payload is boxed.
enum EngineVariant {
    /// Plain detection engine: used when the loaded collection has no
    /// correlations and no routing spec is configured.
    DetectionOnly(Box<Engine>),
    /// Correlation engine: used when the loaded collection contains
    /// correlations and no routing spec is configured.
    WithCorrelations(Box<CorrelationEngine>),
    /// Schema router: used whenever a routing spec is configured.
    Routed(Box<SchemaRouter>),
}
70
/// Summary statistics about the loaded engine state.
///
/// A small `Copy` value returned by `RuntimeEngine::load_rules()` and
/// `RuntimeEngine::stats()`. It derives `Default` (all counters zero) and
/// `PartialEq`/`Eq` so callers and tests can compare two snapshots directly.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct EngineStats {
    /// Number of detection rules in the active engine.
    pub detection_rules: usize,
    /// Number of correlation rules. Always `0` for a detection-only engine.
    pub correlation_rules: usize,
    /// Number of correlation state entries. Always `0` for a detection-only
    /// engine.
    pub state_entries: usize,
}
78
79impl RuntimeEngine {
80    pub fn new(
81        rules_path: std::path::PathBuf,
82        pipelines: Vec<Pipeline>,
83        corr_config: CorrelationConfig,
84        include_event: bool,
85    ) -> Self {
86        RuntimeEngine {
87            engine: EngineVariant::DetectionOnly(Box::new(Engine::new())),
88            pipelines,
89            pipeline_paths: Vec::new(),
90            rules_path,
91            corr_config,
92            include_event,
93            source_resolver: None,
94            allow_remote_include: false,
95            bloom_prefilter: false,
96            bloom_max_bytes: None,
97            match_detail: MatchDetailLevel::Off,
98            #[cfg(feature = "daachorse-index")]
99            cross_rule_ac: false,
100            rule_field_set: Arc::new(ArcSwap::from_pointee(RuleFieldSet::default())),
101            routing: None,
102            logsource_extractor: None,
103        }
104    }
105
106    /// Enable schema routing. The next `load_rules()` builds a
107    /// [`SchemaRouter`] from this spec instead of a single engine. Pass `None`
108    /// to disable. Set before `load_rules()`; hot-reload carries it forward.
109    pub fn set_routing(&mut self, spec: Option<RoutingSpec>) {
110        self.routing = spec;
111    }
112
113    /// Whether schema routing is configured.
114    pub fn has_routing(&self) -> bool {
115        self.routing.is_some()
116    }
117
118    /// Return the routing spec, if any. Used by hot-reload to carry the
119    /// configuration across a `RuntimeEngine` swap.
120    pub fn routing(&self) -> Option<RoutingSpec> {
121        self.routing.clone()
122    }
123
124    /// Set the opt-in logsource extractor. The next `load_rules()` forwards it
125    /// to the inner detection engine(s); hot-reload carries it forward. Pass
126    /// `None` to disable. Set before `load_rules()`.
127    pub fn set_logsource_extractor(&mut self, extractor: Option<LogSourceExtractor>) {
128        self.logsource_extractor = extractor;
129    }
130
131    /// Return the logsource extractor, if any. Used by hot-reload to carry the
132    /// configuration across a `RuntimeEngine` swap.
133    pub fn logsource_extractor(&self) -> Option<LogSourceExtractor> {
134        self.logsource_extractor.clone()
135    }
136
137    /// Total rule candidates pruned by logsource across the active engine
138    /// variant since the last load. Zero when no extractor is configured.
139    pub fn logsource_pruned_total(&self) -> u64 {
140        match &self.engine {
141            EngineVariant::DetectionOnly(engine) => engine.logsource_pruned_total(),
142            EngineVariant::WithCorrelations(engine) => engine.logsource_pruned_total(),
143            EngineVariant::Routed(router) => router.logsource_pruned_total(),
144        }
145    }
146
147    /// Total evaluate calls with no extractable event logsource (fail-open)
148    /// across the active engine variant.
149    pub fn logsource_absent_total(&self) -> u64 {
150        match &self.engine {
151            EngineVariant::DetectionOnly(engine) => engine.logsource_absent_total(),
152            EngineVariant::WithCorrelations(engine) => engine.logsource_absent_total(),
153            EngineVariant::Routed(router) => router.logsource_absent_total(),
154        }
155    }
156
157    /// Return an immutable snapshot of the post-pipeline rule field set.
158    ///
159    /// Cheap to call: returns a refcounted handle that stays valid even if
160    /// `load_rules()` runs concurrently. The daemon's field-observability
161    /// endpoints use this to compute the intersection between observed
162    /// event keys and rule-referenced fields without coordinating with the
163    /// engine lock.
164    pub fn rule_field_set(&self) -> Arc<RuleFieldSet> {
165        self.rule_field_set.load_full()
166    }
167
168    /// Enable or disable bloom-filter pre-filtering on the inner detection
169    /// engine. Off by default. Applies on the next `load_rules()`; pre-load
170    /// callers should set this before calling `load_rules()`.
171    pub fn set_bloom_prefilter(&mut self, enabled: bool) {
172        self.bloom_prefilter = enabled;
173    }
174
175    /// Return the current bloom pre-filter setting. Used by hot-reload to
176    /// carry tuning across a `RuntimeEngine` swap so a daemon-startup
177    /// `set_bloom_prefilter(true)` does not silently revert on the first
178    /// reload.
179    pub fn bloom_prefilter(&self) -> bool {
180        self.bloom_prefilter
181    }
182
183    /// Override the bloom memory budget on the inner detection engine.
184    /// Applies on the next `load_rules()`.
185    pub fn set_bloom_max_bytes(&mut self, max_bytes: usize) {
186        self.bloom_max_bytes = Some(max_bytes);
187    }
188
189    /// Return the configured bloom memory budget, if one was set.
190    pub fn bloom_max_bytes(&self) -> Option<usize> {
191        self.bloom_max_bytes
192    }
193
194    /// Set the match-detail verbosity on the inner detection engine.
195    /// `Off` by default. Applies on the next `load_rules()`; pre-load
196    /// callers should set this before calling `load_rules()`.
197    pub fn set_match_detail(&mut self, level: MatchDetailLevel) {
198        self.match_detail = level;
199    }
200
201    /// Return the current match-detail verbosity. Used by hot-reload to
202    /// carry the setting across a `RuntimeEngine` swap.
203    pub fn match_detail(&self) -> MatchDetailLevel {
204        self.match_detail
205    }
206
207    /// Enable or disable the cross-rule Aho-Corasick pre-filter on the
208    /// inner detection engine. Off by default; the optimization helps only
209    /// on substring-heavy rule sets > ~5K rules. Applies on the next
210    /// `load_rules()`.
211    ///
212    /// Available behind the `daachorse-index` Cargo feature.
213    #[cfg(feature = "daachorse-index")]
214    pub fn set_cross_rule_ac(&mut self, enabled: bool) {
215        self.cross_rule_ac = enabled;
216    }
217
218    /// Return the current cross-rule Aho-Corasick setting.
219    #[cfg(feature = "daachorse-index")]
220    pub fn cross_rule_ac(&self) -> bool {
221        self.cross_rule_ac
222    }
223
224    /// Set a source resolver for dynamic pipeline sources.
225    ///
226    /// When set, `load_rules()` resolves dynamic sources and expands
227    /// `${source.*}` templates before compiling rules.
228    pub fn set_source_resolver(&mut self, resolver: Arc<dyn SourceResolver>) {
229        self.source_resolver = Some(resolver);
230    }
231
232    /// Get the source resolver, if one is configured.
233    pub fn source_resolver(&self) -> Option<&Arc<dyn SourceResolver>> {
234        self.source_resolver.as_ref()
235    }
236
237    /// Allow `include` directives to reference HTTP/NATS sources.
238    pub fn set_allow_remote_include(&mut self, allow: bool) {
239        self.allow_remote_include = allow;
240    }
241
242    /// Whether remote includes are allowed.
243    pub fn allow_remote_include(&self) -> bool {
244        self.allow_remote_include
245    }
246
247    /// Set the pipeline file paths used for hot-reload.
248    ///
249    /// When paths are set, `load_rules()` re-reads pipeline YAML from disk
250    /// before rebuilding the engine. This enables pipeline hot-reload
251    /// alongside rule hot-reload.
252    pub fn set_pipeline_paths(&mut self, paths: Vec<PathBuf>) {
253        self.pipeline_paths = paths;
254    }
255
256    /// Return the pipeline file paths (used by the daemon to set up watchers).
257    pub fn pipeline_paths(&self) -> &[PathBuf] {
258        &self.pipeline_paths
259    }
260
261    /// Resolve dynamic sources in all pipelines and expand templates.
262    ///
263    /// This is the async entry point for source resolution. Call this before
264    /// `load_rules()` when you have an async context available, or let
265    /// `load_rules()` handle it synchronously via `tokio::runtime::Handle`.
266    pub async fn resolve_dynamic_pipelines(&mut self) -> Result<(), String> {
267        let Some(resolver) = &self.source_resolver else {
268            return Ok(());
269        };
270
271        let mut resolved_pipelines = Vec::with_capacity(self.pipelines.len());
272        for pipeline in &self.pipelines {
273            if pipeline.is_dynamic() {
274                match sources::resolve_all(resolver.as_ref(), &pipeline.sources).await {
275                    Ok(resolved_data) => {
276                        let mut expanded = TemplateExpander::expand(pipeline, &resolved_data);
277                        // Expand include directives
278                        sources::include::expand_includes(
279                            &mut expanded,
280                            &resolved_data,
281                            self.allow_remote_include,
282                        )?;
283                        resolved_pipelines.push(expanded);
284                    }
285                    Err(e) => {
286                        return Err(format!(
287                            "Failed to resolve dynamic pipeline '{}': {e}",
288                            pipeline.name
289                        ));
290                    }
291                }
292            } else {
293                resolved_pipelines.push(pipeline.clone());
294            }
295        }
296        self.pipelines = resolved_pipelines;
297        Ok(())
298    }
299
300    /// Load (or reload) rules from the configured path.
301    ///
302    /// On reload, correlation state is exported before replacing the engine
303    /// and re-imported after, so in-flight windows and suppression state
304    /// survive rule changes (entries for removed correlations are dropped).
305    ///
306    /// If pipeline paths are set (via [`set_pipeline_paths`](Self::set_pipeline_paths)),
307    /// pipelines are re-read from disk before rebuilding the engine. If any
308    /// pipeline file fails to parse, the entire reload is aborted and the
309    /// old engine remains active.
310    ///
311    /// Dynamic pipeline sources are resolved if a source resolver is configured.
312    pub fn load_rules(&mut self) -> Result<EngineStats, String> {
313        let load_span = tracing::info_span!("load_rules", rules_path = %self.rules_path.display());
314        let _enter = load_span.enter();
315        let load_start = std::time::Instant::now();
316
317        if !self.pipeline_paths.is_empty() {
318            self.pipelines = reload_pipelines(&self.pipeline_paths)?;
319        }
320
321        // Resolve dynamic sources if a resolver is set.
322        //
323        // Both error cases must fail closed. Loading rules with unresolved
324        // `${source.*}` templates produces rules whose semantics differ
325        // from what the operator wrote; on a hot-reload, the previous
326        // engine is still serving traffic, so returning an error here
327        // keeps it active rather than silently replacing it with a broken
328        // one.
329        if self.source_resolver.is_some() && self.pipelines.iter().any(|p| p.is_dynamic()) {
330            let handle = tokio::runtime::Handle::try_current().map_err(|_| {
331                "Dynamic pipelines require a tokio runtime; refusing to load rules with \
332                 unresolved sources"
333                    .to_string()
334            })?;
335            let pipelines = std::mem::take(&mut self.pipelines);
336            let resolver = self.source_resolver.clone().unwrap();
337            let allow_remote = self.allow_remote_include;
338            let resolved = tokio::task::block_in_place(|| {
339                handle.block_on(async {
340                    resolve_pipelines_async(&resolver, &pipelines, allow_remote).await
341                })
342            });
343            match resolved {
344                Ok(p) => self.pipelines = p,
345                Err(e) => {
346                    // Restore the captured pipelines so a higher-level
347                    // retry can re-run the same load against the same
348                    // inputs.
349                    self.pipelines = pipelines;
350                    return Err(format!("Dynamic source resolution failed: {e}"));
351                }
352            }
353        }
354
355        let previous_state = self.export_state();
356        let collection = load_collection(&self.rules_path)?;
357        let has_correlations = !collection.correlations.is_empty();
358
359        if let Some(mut spec) = self.routing.clone() {
360            // Resolve dynamic `${source.*}` sources in the routed pipeline-sets,
361            // with the same fail-closed policy as the single-engine path above.
362            if let Some(resolver) = self.source_resolver.clone() {
363                let needs_resolve = spec
364                    .pipeline_sets
365                    .iter()
366                    .any(|set| set.iter().any(|p| p.is_dynamic()));
367                if needs_resolve {
368                    let handle = tokio::runtime::Handle::try_current().map_err(|_| {
369                        "Dynamic pipelines require a tokio runtime; refusing to load rules with \
370                         unresolved sources"
371                            .to_string()
372                    })?;
373                    let allow_remote = self.allow_remote_include;
374                    let mut resolved_sets = Vec::with_capacity(spec.pipeline_sets.len());
375                    for set in spec.pipeline_sets {
376                        let resolved = tokio::task::block_in_place(|| {
377                            handle.block_on(async {
378                                resolve_pipelines_async(&resolver, &set, allow_remote).await
379                            })
380                        });
381                        match resolved {
382                            Ok(p) => resolved_sets.push(p),
383                            Err(e) => {
384                                return Err(format!(
385                                    "Dynamic source resolution failed (schema routing): {e}"
386                                ));
387                            }
388                        }
389                    }
390                    spec.pipeline_sets = resolved_sets;
391                }
392            }
393
394            let mut router = SchemaRouter::build(
395                &collection,
396                spec.classifier,
397                spec.plan,
398                spec.pipeline_sets,
399                self.corr_config.clone(),
400                self.include_event,
401                self.match_detail,
402                self.logsource_extractor.clone(),
403            )
404            .map_err(|e| format!("Error building schema router: {e}"))?;
405
406            if let Some(snapshot) = previous_state {
407                router.import_state(snapshot);
408            }
409
410            let stats = EngineStats {
411                detection_rules: router.detection_rule_count(),
412                correlation_rules: router.correlation_rule_count(),
413                state_entries: router.state_count(),
414            };
415            self.engine = EngineVariant::Routed(Box::new(router));
416            self.refresh_rule_field_set(&collection);
417            tracing::debug!(
418                detection_rules = stats.detection_rules,
419                correlation_rules = stats.correlation_rules,
420                duration_ms = load_start.elapsed().as_millis() as u64,
421                "Rule load complete (schema routing)",
422            );
423            return Ok(stats);
424        }
425
426        if has_correlations {
427            let mut engine = CorrelationEngine::new(self.corr_config.clone());
428            engine.set_include_event(self.include_event);
429            engine.set_match_detail(self.match_detail);
430            if let Some(budget) = self.bloom_max_bytes {
431                engine.set_bloom_max_bytes(budget);
432            }
433            engine.set_bloom_prefilter(self.bloom_prefilter);
434            #[cfg(feature = "daachorse-index")]
435            engine.set_cross_rule_ac(self.cross_rule_ac);
436            engine.set_logsource_extractor(self.logsource_extractor.clone());
437            for p in &self.pipelines {
438                engine.add_pipeline(p.clone());
439            }
440            engine
441                .add_collection(&collection)
442                .map_err(|e| format!("Error compiling rules: {e}"))?;
443
444            if let Some(snapshot) = previous_state {
445                engine.import_state(snapshot);
446            }
447
448            let stats = EngineStats {
449                detection_rules: engine.detection_rule_count(),
450                correlation_rules: engine.correlation_rule_count(),
451                state_entries: engine.state_count(),
452            };
453            self.engine = EngineVariant::WithCorrelations(Box::new(engine));
454            self.refresh_rule_field_set(&collection);
455            tracing::debug!(
456                detection_rules = stats.detection_rules,
457                correlation_rules = stats.correlation_rules,
458                duration_ms = load_start.elapsed().as_millis() as u64,
459                "Rule load complete",
460            );
461            Ok(stats)
462        } else {
463            let mut engine = Engine::new();
464            engine.set_include_event(self.include_event);
465            engine.set_match_detail(self.match_detail);
466            if let Some(budget) = self.bloom_max_bytes {
467                engine.set_bloom_max_bytes(budget);
468            }
469            engine.set_bloom_prefilter(self.bloom_prefilter);
470            #[cfg(feature = "daachorse-index")]
471            engine.set_cross_rule_ac(self.cross_rule_ac);
472            engine.set_logsource_extractor(self.logsource_extractor.clone());
473            for p in &self.pipelines {
474                engine.add_pipeline(p.clone());
475            }
476            engine
477                .add_collection(&collection)
478                .map_err(|e| format!("Error compiling rules: {e}"))?;
479
480            let stats = EngineStats {
481                detection_rules: engine.rule_count(),
482                correlation_rules: 0,
483                state_entries: 0,
484            };
485            self.engine = EngineVariant::DetectionOnly(Box::new(engine));
486            self.refresh_rule_field_set(&collection);
487            tracing::debug!(
488                detection_rules = stats.detection_rules,
489                correlation_rules = stats.correlation_rules,
490                duration_ms = load_start.elapsed().as_millis() as u64,
491                "Rule load complete",
492            );
493            Ok(stats)
494        }
495    }
496
497    /// Recompute the post-pipeline rule field set and publish it. Called at
498    /// the end of every successful `load_rules()` branch.
499    fn refresh_rule_field_set(&self, collection: &SigmaCollection) {
500        let field_set = RuleFieldSet::collect(collection, &self.pipelines, true);
501        self.rule_field_set.store(Arc::new(field_set));
502    }
503
504    /// Process a batch of events using parallel detection + sequential correlation.
505    ///
506    /// Delegates to `Engine::evaluate_batch` or `CorrelationEngine::process_batch`
507    /// depending on whether correlation rules are loaded.
508    pub fn process_batch<E: Event + Sync>(&mut self, events: &[&E]) -> Vec<ProcessResult> {
509        match &mut self.engine {
510            EngineVariant::DetectionOnly(engine) => engine.evaluate_batch(events),
511            EngineVariant::WithCorrelations(engine) => engine.process_batch(events),
512            EngineVariant::Routed(router) => router.process_batch(events),
513        }
514    }
515
516    /// Return summary statistics about the current engine state.
517    pub fn stats(&self) -> EngineStats {
518        match &self.engine {
519            EngineVariant::DetectionOnly(engine) => EngineStats {
520                detection_rules: engine.rule_count(),
521                correlation_rules: 0,
522                state_entries: 0,
523            },
524            EngineVariant::WithCorrelations(engine) => EngineStats {
525                detection_rules: engine.detection_rule_count(),
526                correlation_rules: engine.correlation_rule_count(),
527                state_entries: engine.state_count(),
528            },
529            EngineVariant::Routed(router) => EngineStats {
530                detection_rules: router.detection_rule_count(),
531                correlation_rules: router.correlation_rule_count(),
532                state_entries: router.state_count(),
533            },
534        }
535    }
536
537    /// Return the path from which rules are loaded.
538    pub fn rules_path(&self) -> &Path {
539        &self.rules_path
540    }
541
542    /// Return the configured processing pipelines.
543    pub fn pipelines(&self) -> &[Pipeline] {
544        &self.pipelines
545    }
546
547    /// Return the correlation configuration.
548    pub fn corr_config(&self) -> &CorrelationConfig {
549        &self.corr_config
550    }
551
552    /// Whether detection results include the matched event.
553    pub fn include_event(&self) -> bool {
554        self.include_event
555    }
556
557    /// Export correlation state as a serializable snapshot.
558    /// Returns `None` if the engine is detection-only (no correlation state to persist).
559    pub fn export_state(&self) -> Option<CorrelationSnapshot> {
560        match &self.engine {
561            EngineVariant::DetectionOnly(_) => None,
562            EngineVariant::WithCorrelations(engine) => Some(engine.export_state()),
563            EngineVariant::Routed(router) => router.export_state(),
564        }
565    }
566
567    /// Read-only introspection of the correlation window state, filtered by
568    /// correlation id and/or group-key substring. `None` for a detection-only
569    /// engine or a routed engine with no correlation rules.
570    pub fn introspect_correlations(
571        &self,
572        id: Option<&str>,
573        group: Option<&str>,
574    ) -> Option<CorrelationStateSnapshot> {
575        match &self.engine {
576            EngineVariant::DetectionOnly(_) => None,
577            EngineVariant::WithCorrelations(engine) => Some(engine.introspect_filtered(id, group)),
578            EngineVariant::Routed(router) => router.correlation_introspect(id, group),
579        }
580    }
581
582    /// Import previously exported correlation state.
583    /// Returns `true` if the import succeeded, `false` if the snapshot version
584    /// is incompatible. No-op (returns `true`) if the engine is detection-only.
585    pub fn import_state(&mut self, snapshot: &CorrelationSnapshot) -> bool {
586        match &mut self.engine {
587            EngineVariant::WithCorrelations(engine) => engine.import_state(snapshot.clone()),
588            EngineVariant::Routed(router) => router.import_state(snapshot.clone()),
589            EngineVariant::DetectionOnly(_) => true,
590        }
591    }
592}
593
594fn load_collection(path: &Path) -> Result<SigmaCollection, String> {
595    let collection = if path.is_dir() {
596        rsigma_parser::parse_sigma_directory(path)
597            .map_err(|e| format!("Error loading rules from {}: {e}", path.display()))?
598    } else {
599        rsigma_parser::parse_sigma_file(path)
600            .map_err(|e| format!("Error loading rule {}: {e}", path.display()))?
601    };
602
603    if !collection.errors.is_empty() {
604        tracing::warn!(
605            count = collection.errors.len(),
606            "Parse errors while loading rules"
607        );
608        for (i, err) in collection.errors.iter().take(3).enumerate() {
609            tracing::warn!(index = i + 1, error = %err, "Rule parse error detail");
610        }
611    }
612
613    Ok(collection)
614}
615
616/// Re-read and parse all pipeline files from disk, sorted by priority.
617///
618/// Every pipeline file that still declares an inline `sources:` block
619/// triggers the [`warn_pipeline_inline_sources`](crate::warn_pipeline_inline_sources)
620/// deprecation notice. The helper deduplicates by canonical path across the
621/// whole process, so the warning surfaces on the first daemon hot-reload that
622/// observes a deprecated pipeline and is silent on subsequent reloads of the
623/// same file (every SIGHUP, file-watcher event, or `POST /api/v1/reload`
624/// thereafter is a noop for the dedup set).
625fn reload_pipelines(paths: &[PathBuf]) -> Result<Vec<Pipeline>, String> {
626    let mut pipelines = Vec::with_capacity(paths.len());
627    for path in paths {
628        let pipeline = parse_pipeline_file(path)
629            .map_err(|e| format!("Error reloading pipeline {}: {e}", path.display()))?;
630        if !pipeline.sources.is_empty() {
631            crate::warn_pipeline_inline_sources(path, &pipeline.name);
632        }
633        pipelines.push(pipeline);
634    }
635    pipelines.sort_by_key(|p| p.priority);
636    Ok(pipelines)
637}
638
639/// Resolve dynamic sources in pipelines asynchronously.
640async fn resolve_pipelines_async(
641    resolver: &Arc<dyn SourceResolver>,
642    pipelines: &[Pipeline],
643    allow_remote_include: bool,
644) -> Result<Vec<Pipeline>, String> {
645    let mut resolved_pipelines = Vec::with_capacity(pipelines.len());
646    for pipeline in pipelines {
647        if pipeline.is_dynamic() {
648            let resolved_data = sources::resolve_all(resolver.as_ref(), &pipeline.sources)
649                .await
650                .map_err(|e| {
651                    format!(
652                        "Failed to resolve dynamic pipeline '{}': {e}",
653                        pipeline.name
654                    )
655                })?;
656            let mut expanded = TemplateExpander::expand(pipeline, &resolved_data);
657            sources::include::expand_includes(&mut expanded, &resolved_data, allow_remote_include)?;
658            resolved_pipelines.push(expanded);
659        } else {
660            resolved_pipelines.push(pipeline.clone());
661        }
662    }
663    Ok(resolved_pipelines)
664}
665
666#[cfg(test)]
667mod tests {
668    use super::*;
669    use crate::pipeline_deprecation::reset_inline_sources_dedup_for_tests;
670
671    // The pipeline-embedded `sources:` dedup set is process-wide, so tests
672    // that read it must serialize against every other test that touches it,
673    // including the unit tests in `pipeline_deprecation`. They share one lock
674    // (`DEDUP_TEST_LOCK`) so cargo's parallel test threads don't race on the
675    // global set. `serial_guard` recovers a poisoned lock so a failing test
676    // does not cascade into the others.
677    fn serial_guard() -> std::sync::MutexGuard<'static, ()> {
678        crate::pipeline_deprecation::DEDUP_TEST_LOCK
679            .lock()
680            .unwrap_or_else(|poisoned| poisoned.into_inner())
681    }
682
    // Minimal Sigma detection rule; the tests below write it to a temporary
    // file and point the engine's rules path at it.
    const RULE_YAML: &str = r#"
title: TestRule
id: 11111111-1111-1111-1111-111111111111
status: experimental
logsource:
    product: test
detection:
    selection:
        EventID: 1
    condition: selection
"#;

    // Pipeline fixture that declares an inline `sources:` block — the case
    // `reload_pipelines` reports through `warn_pipeline_inline_sources`.
    const PIPELINE_WITH_SOURCES: &str = r#"
name: legacy_pipeline_with_inline_sources
priority: 50
sources:
  - id: threat_feed
    type: file
    path: /tmp/does-not-matter.json
    format: json
transformations:
  - type: value_placeholders
"#;

    // Pipeline fixture with no `sources:` block; loading it must not record
    // anything in the deprecation dedup set.
    const PIPELINE_NO_SOURCES: &str = r#"
name: simple_pipeline
priority: 10
transformations:
  - id: rename
    type: field_name_mapping
    mapping:
      EventID: event.id
"#;
716
717    fn dedup_set_contains(path: &Path) -> bool {
718        let canonical = path.canonicalize().unwrap_or_else(|_| path.to_path_buf());
719        crate::pipeline_deprecation::tests_only_snapshot().contains(&canonical)
720    }
721
722    #[test]
723    fn load_rules_surfaces_inline_sources_deprecation_through_runtime() {
724        let _guard = serial_guard();
725        reset_inline_sources_dedup_for_tests();
726
727        let dir = tempfile::tempdir().unwrap();
728        let rule_path = dir.path().join("rule.yml");
729        std::fs::write(&rule_path, RULE_YAML).unwrap();
730
731        let pipeline_path = dir.path().join("pipeline.yml");
732        std::fs::write(&pipeline_path, PIPELINE_WITH_SOURCES).unwrap();
733        let pipeline = parse_pipeline_file(&pipeline_path).unwrap();
734
735        let mut engine = RuntimeEngine::new(
736            rule_path,
737            vec![pipeline],
738            CorrelationConfig::default(),
739            false,
740        );
741        engine.set_pipeline_paths(vec![pipeline_path.clone()]);
742        engine.load_rules().unwrap();
743
744        assert!(
745            dedup_set_contains(&pipeline_path),
746            "RuntimeEngine::load_rules should route inline sources through \
747             warn_pipeline_inline_sources so the daemon hot-reload path \
748             covers the deprecation; the canonical pipeline path was not \
749             recorded in the dedup set."
750        );
751    }
752
753    #[test]
754    fn load_rules_does_not_warn_when_pipeline_has_no_inline_sources() {
755        let _guard = serial_guard();
756        reset_inline_sources_dedup_for_tests();
757
758        let dir = tempfile::tempdir().unwrap();
759        let rule_path = dir.path().join("rule.yml");
760        std::fs::write(&rule_path, RULE_YAML).unwrap();
761
762        let pipeline_path = dir.path().join("clean.yml");
763        std::fs::write(&pipeline_path, PIPELINE_NO_SOURCES).unwrap();
764        let pipeline = parse_pipeline_file(&pipeline_path).unwrap();
765
766        let mut engine = RuntimeEngine::new(
767            rule_path,
768            vec![pipeline],
769            CorrelationConfig::default(),
770            false,
771        );
772        engine.set_pipeline_paths(vec![pipeline_path.clone()]);
773        engine.load_rules().unwrap();
774
775        assert!(
776            !dedup_set_contains(&pipeline_path),
777            "a pipeline without inline sources must not register in the \
778             deprecation dedup set."
779        );
780    }
781
782    #[test]
783    fn hot_reload_dedups_inline_sources_warning_for_same_pipeline_path() {
784        let _guard = serial_guard();
785        reset_inline_sources_dedup_for_tests();
786
787        let dir = tempfile::tempdir().unwrap();
788        let rule_path = dir.path().join("rule.yml");
789        std::fs::write(&rule_path, RULE_YAML).unwrap();
790
791        let pipeline_path = dir.path().join("pipeline.yml");
792        std::fs::write(&pipeline_path, PIPELINE_WITH_SOURCES).unwrap();
793        let pipeline = parse_pipeline_file(&pipeline_path).unwrap();
794
795        let mut engine = RuntimeEngine::new(
796            rule_path,
797            vec![pipeline],
798            CorrelationConfig::default(),
799            false,
800        );
801        engine.set_pipeline_paths(vec![pipeline_path.clone()]);
802
803        // Initial daemon startup loads the pipeline once.
804        engine.load_rules().unwrap();
805        assert!(dedup_set_contains(&pipeline_path));
806
807        // A hot-reload (SIGHUP, file-watcher event, POST /api/v1/reload)
808        // re-enters reload_pipelines; the dedup set must already contain
809        // the canonical path so the warning does not re-fire. The proof is
810        // that the set state is unchanged after the second reload.
811        let canonical = pipeline_path.canonicalize().unwrap();
812        let before = crate::pipeline_deprecation::tests_only_snapshot();
813        engine.load_rules().unwrap();
814        let after = crate::pipeline_deprecation::tests_only_snapshot();
815
816        assert_eq!(
817            before, after,
818            "second load_rules should not change the dedup set",
819        );
820        assert!(after.contains(&canonical));
821    }
822
823    #[tokio::test(flavor = "multi_thread")]
824    async fn load_rules_fails_closed_when_dynamic_source_resolution_fails() {
825        // A dynamic pipeline whose source cannot resolve must surface the
826        // error so callers (LogProcessor::reload_rules in particular) can
827        // keep the previous engine active. The historical behavior logged
828        // a warning and loaded rules with unexpanded `${source.*}`
829        // placeholders, which silently produced detection rules with
830        // different semantics from what the operator wrote.
831        let dir = tempfile::tempdir().unwrap();
832        let rule_path = dir.path().join("rule.yml");
833        std::fs::write(&rule_path, RULE_YAML).unwrap();
834
835        // Pipeline declares a file dynamic source pointing at a path that
836        // does not exist; the resolver returns SourceError on first read.
837        let missing = dir.path().join("missing.json");
838        let pipeline_yaml = format!(
839            r#"
840name: dynamic_missing
841priority: 10
842sources:
843  - id: feed
844    type: file
845    path: {}
846    format: json
847    on_error: fail
848transformations:
849  - type: value_placeholders
850"#,
851            missing.display(),
852        );
853        let pipeline_path = dir.path().join("pipeline.yml");
854        std::fs::write(&pipeline_path, pipeline_yaml).unwrap();
855        let pipeline = parse_pipeline_file(&pipeline_path).unwrap();
856        assert!(
857            pipeline.is_dynamic(),
858            "fixture should produce a dynamic pipeline"
859        );
860
861        let mut engine = RuntimeEngine::new(
862            rule_path,
863            vec![pipeline],
864            CorrelationConfig::default(),
865            false,
866        );
867        engine.set_source_resolver(Arc::new(sources::DefaultSourceResolver::new()));
868
869        let err = engine
870            .load_rules()
871            .expect_err("missing source must cause load_rules to fail closed");
872        assert!(
873            err.contains("Dynamic source resolution failed"),
874            "error should explain the fail-closed path; got: {err}"
875        );
876    }
877}