Skip to main content

rsigma_runtime/enrichment/
mod.rs

1//! Post-evaluation enrichment for the rsigma daemon.
2//!
3//! Enrichment runs in the daemon's sink task, after `Engine::evaluate()` has
4//! produced a [`ProcessResult`] (a flat `Vec<EvaluationResult>`) and before
5//! that result is serialized to a sink. Each enricher inspects an
6//! [`EvaluationResult`], optionally fetches context (HTTP, command, source
7//! cache, pure template), and writes the result into
8//! `result.header.enrichments` under a configured `inject_field`.
9//!
10//! # Architecture
11//!
12//! A single [`Enricher`] trait covers every primitive (`template`, `lookup`,
13//! `http`, `command`) and any bespoke Rust-coded enrichers. Each enricher
14//! declares an [`EnricherKind`] at config time; the [`EnrichmentPipeline`]
15//! filters results by that declared kind against the
16//! [`EvaluationResult::body`] variant before invoking `enrich()`. There are no
17//! parallel `DetectionEnricher` / `CorrelationEnricher` traits and no separate
18//! context types; enrichers consume `&mut EvaluationResult` directly and
19//! match on `result.body` for kind-specific fields.
20//!
21//! # Concurrency
22//!
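//! Each result acquires a permit from a single [`tokio::sync::Semaphore`]
//! owned by the pipeline before its enricher chain runs, which bounds how
//! many results are being enriched at once. Within one call to
//! [`EnrichmentPipeline::run`] the results of a batch are currently
//! processed one after another, so the bound only comes into play across
//! `run` calls that share the semaphore. Within a single result the enricher
//! chain runs sequentially (so later enrichers can depend on earlier ones via
//! `${detection.enrichments.*}` in a follow-up implementation; for this
//! initial cut the chain is linear and enrichers are independent).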
29//!
30//! # Errors
31//!
32//! Failures are scoped per enricher and do not abort the chain by default;
33//! the per-enricher `on_error` policy ([`OnError`]) decides whether to
34//! `skip` the field, inject a JSON `null` for it, or `drop` the entire
35//! result before serialization. Timeouts are enforced via
36//! [`tokio::time::timeout`] using each enricher's [`Enricher::timeout`].
37
38use std::sync::Arc;
39use std::time::Duration;
40
41use async_trait::async_trait;
42use rsigma_eval::{EvaluationResult, ResultBody};
43use tokio::sync::Semaphore;
44
45use crate::metrics::{MetricsHook, NoopMetrics};
46
47mod command;
48mod http;
49pub mod http_cache;
50mod lookup;
51mod scope;
52mod template;
53#[cfg(test)]
54mod tests;
55
56pub use command::{CommandEnricher, OutputFormat};
57pub use http::{HttpEnricher, HttpEnricherClient, build_default_http_client};
58pub use http_cache::{CacheKey, CacheOutcome, HttpResponseCache};
59pub use lookup::LookupEnricher;
60pub use scope::Scope;
61pub use template::{TemplateEnricher, TemplateError, validate_template_namespace};
62
/// Which family of [`EvaluationResult`] an [`Enricher`] targets.
///
/// Chosen once when the configuration is loaded, then used in two places:
///
/// 1. **Config-load template validation**: a `Detection` enricher may only
///    reference `${detection.*}` and a `Correlation` enricher may only
///    reference `${correlation.*}`. A reference into the other namespace is
///    rejected at startup.
/// 2. **Runtime gating in [`EnrichmentPipeline::run`]**: an enricher whose
///    kind does not match the variant of `result.body` is skipped without
///    calling [`Enricher::enrich`].
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum EnricherKind {
    /// Targets detection results ([`ResultBody::Detection`]).
    Detection,
    /// Targets correlation results ([`ResultBody::Correlation`]).
    Correlation,
}
80
81impl EnricherKind {
82    /// String label used in metrics, logs, and config errors.
83    pub fn as_str(&self) -> &'static str {
84        match self {
85            EnricherKind::Detection => "detection",
86            EnricherKind::Correlation => "correlation",
87        }
88    }
89
90    /// Returns true if this kind matches the given result body variant.
91    pub fn matches(&self, body: &ResultBody) -> bool {
92        matches!(
93            (self, body),
94            (EnricherKind::Detection, ResultBody::Detection(_))
95                | (EnricherKind::Correlation, ResultBody::Correlation(_))
96        )
97    }
98}
99
/// What the pipeline does when an enricher fails (timeout, fetch error,
/// parse error, …).
///
/// Configured per enricher. The default, [`OnError::Skip`], guarantees that
/// one failed enrichment never interrupts the result stream.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum OnError {
    /// Default. Log a warning and deliver the result without this
    /// enricher's field.
    #[default]
    Skip,
    /// Write JSON `null` under `inject_field`. Consumers then see an explicit
    /// "enrichment was attempted" marker instead of an ambiguous missing
    /// field.
    Null,
    /// Remove the result before it is serialized. Intended for dedup or
    /// pre-filter enrichers that deliberately discard matches based on
    /// external context.
    Drop,
}
117
118/// A typed enrichment failure attributed to a specific enricher.
119#[derive(Debug, Clone)]
120pub struct EnrichError {
121    /// Stable ID of the enricher that produced the error.
122    pub enricher_id: String,
123    /// Categorized failure kind.
124    pub kind: EnrichErrorKind,
125}
126
127impl std::fmt::Display for EnrichError {
128    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
129        write!(f, "enricher '{}': {}", self.enricher_id, self.kind)
130    }
131}
132
133impl std::error::Error for EnrichError {}
134
/// Why an enrichment failed.
///
/// Every variant except [`EnrichErrorKind::Timeout`] carries a
/// human-readable detail message.
#[derive(Clone, Debug)]
pub enum EnrichErrorKind {
    /// The per-enricher timeout ([`Enricher::timeout`]) expired before
    /// [`Enricher::enrich`] completed.
    Timeout,
    /// Fetching external context failed, e.g. an HTTP non-2xx response, a
    /// refused connection, or a command exiting with a non-zero status.
    Fetch(String),
    /// The external response could not be parsed (e.g. invalid JSON).
    Parse(String),
    /// Evaluating an extract expression (jq / jsonpath / cel) failed.
    Extract(String),
    /// Rendering a template failed at runtime (missing variable in a strict
    /// resolver, invalid expansion, …). Namespace violations are rejected
    /// earlier, by the config-load validator.
    TemplateRender(String),
}
152
153impl std::fmt::Display for EnrichErrorKind {
154    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
155        match self {
156            EnrichErrorKind::Timeout => write!(f, "timeout"),
157            EnrichErrorKind::Fetch(m) => write!(f, "fetch failed: {m}"),
158            EnrichErrorKind::Parse(m) => write!(f, "parse failed: {m}"),
159            EnrichErrorKind::Extract(m) => write!(f, "extract failed: {m}"),
160            EnrichErrorKind::TemplateRender(m) => write!(f, "template render failed: {m}"),
161        }
162    }
163}
164
165/// Trait implemented by every enrichment primitive (`template`, `lookup`,
166/// `http`, `command`) and by any bespoke Rust-coded named enricher
167/// registered via [`register_builtin`].
168///
169/// Implementations read shared rule metadata from [`EvaluationResult::header`]
170/// and dispatch on `result.body` for kind-specific fields. The pipeline
171/// guarantees that `result.body` matches `self.kind()` before calling
172/// `enrich()`, so implementations may rely on the matching variant
173/// (e.g. via [`EvaluationResult::as_detection`] /
174/// [`EvaluationResult::as_correlation`]).
175///
176/// `enrich` is async to accommodate I/O-bound primitives (HTTP, command).
177/// Pure transformations (`template`) still implement `enrich` as `async fn`
178/// even though they perform no I/O.
179#[async_trait]
180pub trait Enricher: Send + Sync {
181    /// The kind of result this enricher applies to. Fixed at config load.
182    fn kind(&self) -> EnricherKind;
183
184    /// Stable identifier for this enricher instance. Used as a metric label
185    /// and in structured log fields. Conventionally something like
186    /// `asset_lookup_det` or `enrich_hash_virustotal`.
187    fn id(&self) -> &str;
188
189    /// Field name under [`RuleHeader::enrichments`](rsigma_eval::RuleHeader::enrichments)
190    /// that this enricher writes to.
191    fn inject_field(&self) -> &str;
192
193    /// Per-enricher timeout. The pipeline wraps each `enrich()` call in
194    /// [`tokio::time::timeout`] using this value. Defaults to 5 seconds.
195    fn timeout(&self) -> Duration {
196        Duration::from_secs(5)
197    }
198
199    /// Optional scope filter. Applied after the kind-vs-body filter and
200    /// before `enrich()` runs. Default is [`Scope::default`] (always fires).
201    fn scope(&self) -> &Scope;
202
203    /// Behavior when this enricher fails (timeout, fetch error, …).
204    /// Defaults to [`OnError::Skip`].
205    fn on_error(&self) -> OnError {
206        OnError::Skip
207    }
208
209    /// Run the enrichment.
210    ///
211    /// Implementations write into [`RuleHeader::enrichments`](rsigma_eval::RuleHeader::enrichments)
212    /// under the configured [`Self::inject_field`]. The pipeline initializes
213    /// the map (`None` → `Some(empty)`) before invoking the first enricher
214    /// for a given result, so implementations can `unwrap` the map safely.
215    async fn enrich(&self, result: &mut EvaluationResult) -> Result<(), EnrichError>;
216}
217
/// What happened when one enricher was applied to one result.
///
/// Produced internally by `EnrichmentPipeline::run_one`. The driver in
/// [`EnrichmentPipeline::run`] uses it to decide whether the result
/// survives. Every case is kept distinct so the driver's match stays
/// exhaustive.
enum EnrichOutcome {
    /// `enrich()` succeeded (and may have written into `enrichments`).
    Ok,
    /// `enrich()` failed or timed out under `on_error: skip`.
    Skip,
    /// `enrich()` failed or timed out under `on_error: null`. The pipeline
    /// has already written `null` under `inject_field`.
    Null,
    /// `enrich()` failed or timed out under `on_error: drop`. The pipeline
    /// must remove this result from the output vec.
    Drop,
    /// The enricher never ran because its kind or scope did not match.
    Filtered,
}
236
237/// Execution surface for a configured set of enrichers.
238///
239/// One pipeline owns one `Vec<Box<dyn Enricher>>` plus a shared
240/// [`Semaphore`] that bounds the number of in-flight enrichments across
241/// all results in a batch.
242///
243/// The pipeline is constructed by the daemon config layer
244/// (`crates/rsigma-cli/src/daemon/enrichment/config.rs`) and held inside
245/// the daemon's sink task. Each [`ProcessResult`](rsigma_eval::ProcessResult)
246/// (a `Vec<EvaluationResult>`) flows through [`EnrichmentPipeline::run`]
247/// before it is serialized.
248pub struct EnrichmentPipeline {
249    enrichers: Vec<Box<dyn Enricher>>,
250    semaphore: Arc<Semaphore>,
251    metrics: Arc<dyn MetricsHook>,
252}
253
254impl std::fmt::Debug for EnrichmentPipeline {
255    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
256        f.debug_struct("EnrichmentPipeline")
257            .field("enrichers", &self.enrichers.len())
258            .field("permits", &self.semaphore.available_permits())
259            .finish()
260    }
261}
262
263impl EnrichmentPipeline {
264    /// Build a pipeline from a list of configured enrichers.
265    ///
266    /// `max_concurrent_enrichments` bounds the number of results that can
267    /// be enriched in parallel; defaults to 16 if zero is passed.
268    /// Metrics default to a no-op sink; call [`Self::with_metrics`] to
269    /// route counters and latency histograms into a real
270    /// [`MetricsHook`] implementation.
271    pub fn new(enrichers: Vec<Box<dyn Enricher>>, max_concurrent_enrichments: usize) -> Self {
272        let permits = if max_concurrent_enrichments == 0 {
273            16
274        } else {
275            max_concurrent_enrichments
276        };
277        Self {
278            enrichers,
279            semaphore: Arc::new(Semaphore::new(permits)),
280            metrics: Arc::new(NoopMetrics),
281        }
282    }
283
284    /// Replace the metrics hook this pipeline reports into. The daemon
285    /// passes its Prometheus-backed `Metrics` here; library consumers
286    /// can pass any [`MetricsHook`] implementation.
287    ///
288    /// Pre-registers `(enricher_id, kind)` for every configured
289    /// enricher so `rsigma_enrichment_total{...}` and
290    /// `rsigma_enrichment_duration_seconds{...}` are emitted on
291    /// `/metrics` from the first scrape, even before any enricher has
292    /// fired.
293    pub fn with_metrics(mut self, metrics: Arc<dyn MetricsHook>) -> Self {
294        for enricher in &self.enrichers {
295            metrics.register_enricher(enricher.id(), enricher.kind().as_str());
296        }
297        self.metrics = metrics;
298        self
299    }
300
301    /// Returns true if no enrichers are configured.
302    ///
303    /// The daemon sink task uses this to skip enrichment work entirely
304    /// (no permit acquisition, no per-result loop) when no enrichers are
305    /// configured.
306    pub fn is_empty(&self) -> bool {
307        self.enrichers.is_empty()
308    }
309
310    /// Number of configured enrichers (across both kinds).
311    pub fn len(&self) -> usize {
312        self.enrichers.len()
313    }
314
315    /// Iterate over the configured enrichers (read-only view, used for
316    /// reload-diff logging and tests).
317    pub fn enrichers(&self) -> impl Iterator<Item = &dyn Enricher> {
318        self.enrichers.iter().map(|e| &**e)
319    }
320
321    /// Run every applicable enricher against each result in `results`.
322    ///
323    /// For each result the pipeline:
324    /// 1. Acquires a global semaphore permit (bounded concurrency).
325    /// 2. Iterates the configured enricher list.
326    /// 3. Skips enrichers whose [`EnricherKind`] does not match the
327    ///    result's body variant.
328    /// 4. Skips enrichers whose [`Scope`] excludes this result.
329    /// 5. Wraps each remaining `enrich()` call in
330    ///    [`tokio::time::timeout`] using the enricher's timeout.
331    /// 6. On error, applies the enricher's [`OnError`] policy.
332    /// 7. If any enricher in the chain returns
333    ///    [`EnrichOutcome::Drop`], the result is removed from the output.
334    ///
335    /// The pipeline initializes `result.header.enrichments` to
336    /// `Some(empty)` lazily on first successful injection, so the
337    /// `skip_serializing_if = "Option::is_none"` contract on
338    /// `RuleHeader::enrichments` is preserved when no enricher writes.
339    ///
340    /// Currently sequential per result; concurrent across results would
341    /// require splitting `results` into chunks across futures and is left
342    /// for a follow-up tuning pass once we have realistic throughput
343    /// numbers from the integration tests.
344    pub async fn run(&self, results: &mut Vec<EvaluationResult>) {
345        if self.enrichers.is_empty() || results.is_empty() {
346            return;
347        }
348
349        // Single-pass enrichment with drop bookkeeping.
350        //
351        // We cannot use `Vec::retain_mut` together with `.await` (the
352        // closure must be sync), so we collect drop indices first and
353        // apply them in a single linear pass at the end.
354        let mut drop_indices: Vec<usize> = Vec::new();
355
356        for (idx, result) in results.iter_mut().enumerate() {
357            let permit = self.semaphore.clone().acquire_owned().await.ok();
358            if permit.is_none() {
359                // Semaphore closed (only happens at shutdown). Drain
360                // remaining results unenriched rather than blocking.
361                tracing::debug!("Enrichment semaphore closed, draining remaining results");
362                return;
363            }
364            let _permit = permit.unwrap();
365
366            let mut should_drop = false;
367            for enricher in &self.enrichers {
368                match Self::run_one(enricher.as_ref(), result, self.metrics.as_ref()).await {
369                    EnrichOutcome::Drop => {
370                        should_drop = true;
371                        break;
372                    }
373                    EnrichOutcome::Ok
374                    | EnrichOutcome::Skip
375                    | EnrichOutcome::Null
376                    | EnrichOutcome::Filtered => {}
377                }
378            }
379            if should_drop {
380                drop_indices.push(idx);
381            }
382        }
383
384        if !drop_indices.is_empty() {
385            // Remove from the back so earlier indices stay valid.
386            for idx in drop_indices.into_iter().rev() {
387                results.swap_remove(idx);
388            }
389        }
390    }
391
392    /// Run a single enricher against a single result, applying the
393    /// kind-vs-body filter, scope filter, timeout, and on_error policy.
394    /// Records `rsigma_enrichment_total{enricher_id, kind, status}` and
395    /// `rsigma_enrichment_duration_seconds{enricher_id, kind}` via the
396    /// configured `MetricsHook` for every non-filtered call.
397    async fn run_one(
398        enricher: &dyn Enricher,
399        result: &mut EvaluationResult,
400        metrics: &dyn MetricsHook,
401    ) -> EnrichOutcome {
402        if !enricher.kind().matches(&result.body) {
403            return EnrichOutcome::Filtered;
404        }
405        if !enricher.scope().matches(result) {
406            return EnrichOutcome::Filtered;
407        }
408
409        let inject_field = enricher.inject_field().to_string();
410        let timeout = enricher.timeout();
411        let id = enricher.id().to_string();
412        let kind_label = enricher.kind().as_str();
413        let on_error = enricher.on_error();
414
415        metrics.on_enrichment_queue_depth_change(1);
416        let started = std::time::Instant::now();
417        let outcome = tokio::time::timeout(timeout, enricher.enrich(result)).await;
418        let elapsed = started.elapsed().as_secs_f64();
419        metrics.on_enrichment_queue_depth_change(-1);
420
421        let err = match outcome {
422            Ok(Ok(())) => {
423                metrics.on_enrichment_completed(&id, kind_label, "success", elapsed);
424                return EnrichOutcome::Ok;
425            }
426            Ok(Err(e)) => e,
427            Err(_) => EnrichError {
428                enricher_id: id.clone(),
429                kind: EnrichErrorKind::Timeout,
430            },
431        };
432
433        let is_timeout = matches!(err.kind, EnrichErrorKind::Timeout);
434        match on_error {
435            OnError::Skip => {
436                tracing::warn!(
437                    enricher_id = %id,
438                    kind = %kind_label,
439                    error = %err,
440                    "Enricher failed, skipping"
441                );
442                metrics.on_enrichment_completed(
443                    &id,
444                    kind_label,
445                    if is_timeout { "timeout" } else { "skip" },
446                    elapsed,
447                );
448                EnrichOutcome::Skip
449            }
450            OnError::Null => {
451                tracing::warn!(
452                    enricher_id = %id,
453                    kind = %kind_label,
454                    error = %err,
455                    "Enricher failed, injecting null"
456                );
457                let map = result
458                    .header
459                    .enrichments
460                    .get_or_insert_with(serde_json::Map::new);
461                map.insert(inject_field, serde_json::Value::Null);
462                metrics.on_enrichment_completed(
463                    &id,
464                    kind_label,
465                    if is_timeout { "timeout" } else { "error" },
466                    elapsed,
467                );
468                EnrichOutcome::Null
469            }
470            OnError::Drop => {
471                tracing::warn!(
472                    enricher_id = %id,
473                    kind = %kind_label,
474                    error = %err,
475                    "Enricher failed, dropping result"
476                );
477                metrics.on_enrichment_completed(&id, kind_label, "drop", elapsed);
478                EnrichOutcome::Drop
479            }
480        }
481    }
482}
483
484impl Default for EnrichmentPipeline {
485    fn default() -> Self {
486        Self::new(Vec::new(), 16)
487    }
488}
489
490impl Clone for EnrichmentPipeline {
491    fn clone(&self) -> Self {
492        // Boxes of `dyn Enricher` are not `Clone`, so a true deep clone
493        // is not possible here. The hot-reload path always rebuilds the
494        // pipeline from config rather than cloning, but `Clone` is
495        // useful for tests and for `ArcSwap` adapter code that wants a
496        // throwaway snapshot. Returning an empty pipeline that shares
497        // the metrics hook is the safest behaviour: a misuse degrades
498        // to "no enrichment" rather than panicking or silently
499        // double-counting.
500        Self {
501            enrichers: Vec::new(),
502            semaphore: Arc::clone(&self.semaphore),
503            metrics: Arc::clone(&self.metrics),
504        }
505    }
506}
507
508/// Helper for [`Enricher::enrich`] implementations: write `value` into
509/// `result.header.enrichments` under `inject_field`, allocating the map
510/// if it was previously `None`.
511pub fn inject_enrichment(
512    result: &mut EvaluationResult,
513    inject_field: &str,
514    value: serde_json::Value,
515) {
516    let map = result
517        .header
518        .enrichments
519        .get_or_insert_with(serde_json::Map::new);
520    map.insert(inject_field.to_string(), value);
521}
522
523// ---------------------------------------------------------------------------
524// Bespoke enricher registration
525// ---------------------------------------------------------------------------
526
527/// Factory function signature used by [`register_builtin`].
528///
529/// External crates that ship a bespoke enricher type register a
530/// `Box<dyn Fn(&serde_json::Value) -> Result<Box<dyn Enricher>, String>>`
531/// so the daemon config layer can construct the enricher from its YAML
532/// config block at startup. The `serde_json::Value` argument is the raw
533/// enricher-config block (after `kind` / `id` / `type` fields are
534/// extracted by the loader).
535pub type EnricherFactory =
536    Arc<dyn Fn(&serde_json::Value) -> Result<Box<dyn Enricher>, String> + Send + Sync>;
537
538/// Process-wide registry of bespoke enricher factories keyed by `type`.
539///
540/// External crates call [`register_builtin`] once at startup (typically
541/// in their `lib.rs` via `ctor` or an explicit init function) to wire a
542/// new `type: <name>` value into the daemon's config loader. Generic
543/// primitives (`template`, `lookup`, `http`, `command`) are not in this
544/// registry; they are constructed directly by the loader.
545///
546/// The registry is global and append-only — registering the same name
547/// twice is an error. Concurrent reads are lock-free via [`std::sync::OnceLock`]
548/// at the outer level and a [`std::sync::RwLock`] at the inner level for the
549/// (rare) `register_builtin` writes.
550fn registry() -> &'static std::sync::RwLock<std::collections::HashMap<String, EnricherFactory>> {
551    use std::sync::OnceLock;
552    static REGISTRY: OnceLock<
553        std::sync::RwLock<std::collections::HashMap<String, EnricherFactory>>,
554    > = OnceLock::new();
555    REGISTRY.get_or_init(|| std::sync::RwLock::new(std::collections::HashMap::new()))
556}
557
558/// Register a bespoke enricher factory under `type: <name>`.
559///
560/// Returns an error if `name` is already registered (registration is
561/// process-global and append-only) or if `name` collides with a built-in
562/// primitive type (`template`, `lookup`, `http`, `command`).
563///
564/// External crates call this once at startup before the daemon loads its
565/// config. After config load, the registry is read-only in practice.
566pub fn register_builtin(name: &str, factory: EnricherFactory) -> Result<(), String> {
567    if matches!(name, "template" | "lookup" | "http" | "command") {
568        return Err(format!(
569            "cannot register '{name}': name is reserved for a built-in primitive"
570        ));
571    }
572    let reg = registry();
573    let mut guard = reg
574        .write()
575        .map_err(|_| "enricher registry poisoned".to_string())?;
576    if guard.contains_key(name) {
577        return Err(format!("enricher type '{name}' is already registered"));
578    }
579    guard.insert(name.to_string(), factory);
580    Ok(())
581}
582
583/// Look up a registered bespoke enricher factory by `type` name.
584///
585/// Returns `None` if `name` is not registered. The daemon config loader
586/// uses this to construct bespoke enrichers; missing names are surfaced
587/// to the operator as a clear startup error.
588pub fn lookup_builtin(name: &str) -> Option<EnricherFactory> {
589    let reg = registry();
590    let guard = reg.read().ok()?;
591    guard.get(name).cloned()
592}
593
/// Clear the bespoke enricher registry.
///
/// **Test-only**: used by unit tests that need to register or re-register
/// the same name. Not exposed to downstream crates.
#[cfg(test)]
pub(crate) fn clear_builtin_registry() {
    // Recover from poisoning (a panic while the write lock was held) rather
    // than silently skipping the clear. Skipping would leave stale
    // registrations behind and make later tests fail with confusing
    // "already registered" errors.
    registry()
        .write()
        .unwrap_or_else(std::sync::PoisonError::into_inner)
        .clear();
}