Skip to main content

rsigma_runtime/enrichment/
mod.rs

1//! Post-evaluation enrichment for the rsigma daemon.
2//!
3//! Enrichment runs in the daemon's sink task, after `Engine::evaluate()` has
4//! produced a [`ProcessResult`](rsigma_eval::ProcessResult) (a flat
5//! `Vec<EvaluationResult>`) and before that result is serialized to a sink.
6//! Each enricher inspects an [`EvaluationResult`], optionally fetches
7//! context (HTTP, command, source cache, pure template), and writes the
8//! result into `result.header.enrichments` under a configured
9//! `inject_field`.
10//!
11//! # Architecture
12//!
13//! A single [`Enricher`] trait covers every primitive (`template`, `lookup`,
14//! `http`, `command`) and any bespoke Rust-coded enrichers. Each enricher
15//! declares an [`EnricherKind`] at config time; the [`EnrichmentPipeline`]
16//! filters results by that declared kind against the
17//! [`EvaluationResult::body`] variant before invoking `enrich()`. There are no
18//! parallel `DetectionEnricher` / `CorrelationEnricher` traits and no separate
19//! context types; enrichers consume `&mut EvaluationResult` directly and
20//! match on `result.body` for kind-specific fields.
21//!
22//! # Concurrency
23//!
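//! Each result acquires a permit from a single [`tokio::sync::Semaphore`]
//! owned by the pipeline before its enricher chain runs, which bounds how
//! many results are enriched at the same time across concurrent
//! [`EnrichmentPipeline::run`] calls. Within one `run` call the results of a
//! batch are currently enriched one after another; fanning a batch out
//! across concurrent futures is left for a follow-up. Within a single result
//! the enricher chain also runs sequentially (so later enrichers could
//! depend on earlier ones via `${detection.enrichments.*}` in a follow-up
//! implementation; for this initial cut the chain is linear and enrichers
//! are independent).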
30//!
31//! # Errors
32//!
33//! Failures are scoped per enricher and do not abort the chain by default;
34//! the per-enricher `on_error` policy ([`OnError`]) decides whether to
35//! `skip` the field, inject a JSON `null` for it, or `drop` the entire
36//! result before serialization. Timeouts are enforced via
37//! [`tokio::time::timeout`] using each enricher's [`Enricher::timeout`].
38
39use std::sync::Arc;
40use std::time::Duration;
41
42use async_trait::async_trait;
43use rsigma_eval::{EvaluationResult, ResultBody};
44use tokio::sync::Semaphore;
45
46use crate::metrics::{MetricsHook, NoopMetrics};
47
48mod command;
49pub mod config;
50mod http;
51pub mod http_cache;
52mod lookup;
53mod template;
54#[cfg(test)]
55mod tests;
56
57pub use crate::scope::Scope;
58pub use command::{CommandEnricher, OutputFormat};
59pub use http::{
60    DEFAULT_ENRICHER_MAX_RESPONSE_BYTES, HttpEnricher, HttpEnricherClient,
61    build_default_http_client,
62};
63pub use http_cache::{CacheKey, CacheOutcome, HttpResponseCache};
64pub use lookup::LookupEnricher;
65pub use template::{
66    TemplateEnricher, TemplateError, render_template, render_template_json,
67    validate_template_namespace,
68};
69
70/// The kind of [`EvaluationResult`] an [`Enricher`] applies to.
71///
72/// Fixed at config load. Used for two things:
73/// 1. **Template-namespace validation at config load**: a `Detection` enricher
74///    may only reference `${detection.*}`; a `Correlation` enricher may only
75///    reference `${correlation.*}`. Cross-namespace references fail fast at
76///    startup.
77/// 2. **Runtime gating in [`EnrichmentPipeline::run`]**: enrichers whose
78///    declared kind does not match `result.body`'s variant are skipped before
79///    [`Enricher::enrich`] is invoked.
80#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
81pub enum EnricherKind {
82    /// Applies to detection results ([`ResultBody::Detection`]).
83    Detection,
84    /// Applies to correlation results ([`ResultBody::Correlation`]).
85    Correlation,
86}
87
88impl EnricherKind {
89    /// String label used in metrics, logs, and config errors.
90    pub fn as_str(&self) -> &'static str {
91        match self {
92            EnricherKind::Detection => "detection",
93            EnricherKind::Correlation => "correlation",
94        }
95    }
96
97    /// Returns true if this kind matches the given result body variant.
98    pub fn matches(&self, body: &ResultBody) -> bool {
99        matches!(
100            (self, body),
101            (EnricherKind::Detection, ResultBody::Detection(_))
102                | (EnricherKind::Correlation, ResultBody::Correlation(_))
103        )
104    }
105}
106
/// What the pipeline does when an enricher fails (timeout, fetch error,
/// parse error, …).
///
/// Configured per enricher. [`OnError::Skip`] is the default, so a single
/// failing enrichment never breaks the result stream.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum OnError {
    /// Leave this enricher's field out and still deliver the result. This is
    /// the default; a warning is logged.
    #[default]
    Skip,
    /// Write `null` under `inject_field`, giving downstream consumers an
    /// explicit "attempted but failed" marker instead of an ambiguous
    /// missing field.
    Null,
    /// Remove the whole result before it is serialized. Intended for
    /// dedup / pre-filter style enrichers that deliberately suppress
    /// matches based on external context.
    Drop,
}
124
/// An enrichment failure, tagged with the enricher that caused it.
#[derive(Debug, Clone)]
pub struct EnrichError {
    /// Stable ID of the enricher that produced the error.
    pub enricher_id: String,
    /// What went wrong, by category.
    pub kind: EnrichErrorKind,
}

impl std::fmt::Display for EnrichError {
    /// Renders as `enricher '<id>': <kind>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { enricher_id, kind } = self;
        write!(f, "enricher '{}': {}", enricher_id, kind)
    }
}

impl std::error::Error for EnrichError {}

/// Category of an enrichment failure; every variant except `Timeout`
/// carries a human-readable detail message.
#[derive(Debug, Clone)]
pub enum EnrichErrorKind {
    /// The per-enricher timeout elapsed before [`Enricher::enrich`] finished.
    Timeout,
    /// Fetching external context failed (HTTP non-2xx, connection refused,
    /// non-zero command exit status, etc.).
    Fetch(String),
    /// The external response could not be parsed (e.g. invalid JSON).
    Parse(String),
    /// Evaluating an extract expression (jq / jsonpath / cel) failed.
    Extract(String),
    /// Rendering a template failed at runtime (missing variable in a strict
    /// resolver, invalid expansion, …). Namespace violations are caught
    /// earlier, by the config-load-time validator.
    TemplateRender(String),
}

impl std::fmt::Display for EnrichErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Timeout => f.write_str("timeout"),
            Self::Fetch(m) => write!(f, "fetch failed: {m}"),
            Self::Parse(m) => write!(f, "parse failed: {m}"),
            Self::Extract(m) => write!(f, "extract failed: {m}"),
            Self::TemplateRender(m) => write!(f, "template render failed: {m}"),
        }
    }
}
171
172/// Trait implemented by every enrichment primitive (`template`, `lookup`,
173/// `http`, `command`) and by any bespoke Rust-coded named enricher
174/// registered via [`register_builtin`].
175///
176/// Implementations read shared rule metadata from [`EvaluationResult::header`]
177/// and dispatch on `result.body` for kind-specific fields. The pipeline
178/// guarantees that `result.body` matches `self.kind()` before calling
179/// `enrich()`, so implementations may rely on the matching variant
180/// (e.g. via [`EvaluationResult::as_detection`] /
181/// [`EvaluationResult::as_correlation`]).
182///
183/// `enrich` is async to accommodate I/O-bound primitives (HTTP, command).
184/// Pure transformations (`template`) still implement `enrich` as `async fn`
185/// even though they perform no I/O.
186#[async_trait]
187pub trait Enricher: Send + Sync {
188    /// The kind of result this enricher applies to. Fixed at config load.
189    fn kind(&self) -> EnricherKind;
190
191    /// Stable identifier for this enricher instance. Used as a metric label
192    /// and in structured log fields. Conventionally something like
193    /// `asset_lookup_det` or `enrich_hash_virustotal`.
194    fn id(&self) -> &str;
195
196    /// Field name under [`RuleHeader::enrichments`](rsigma_eval::RuleHeader::enrichments)
197    /// that this enricher writes to.
198    fn inject_field(&self) -> &str;
199
200    /// Per-enricher timeout. The pipeline wraps each `enrich()` call in
201    /// [`tokio::time::timeout`] using this value. Defaults to 5 seconds.
202    fn timeout(&self) -> Duration {
203        Duration::from_secs(5)
204    }
205
206    /// Optional scope filter. Applied after the kind-vs-body filter and
207    /// before `enrich()` runs. Default is [`Scope::default`] (always fires).
208    fn scope(&self) -> &Scope;
209
210    /// Behavior when this enricher fails (timeout, fetch error, …).
211    /// Defaults to [`OnError::Skip`].
212    fn on_error(&self) -> OnError {
213        OnError::Skip
214    }
215
216    /// Run the enrichment.
217    ///
218    /// Implementations write into [`RuleHeader::enrichments`](rsigma_eval::RuleHeader::enrichments)
219    /// under the configured [`Self::inject_field`]. The pipeline initializes
220    /// the map (`None` → `Some(empty)`) before invoking the first enricher
221    /// for a given result, so implementations can `unwrap` the map safely.
222    async fn enrich(&self, result: &mut EvaluationResult) -> Result<(), EnrichError>;
223}
224
/// Result of applying one enricher to one evaluation result.
///
/// Produced internally by [`EnrichmentPipeline::run_one`]; the driver loop
/// in [`EnrichmentPipeline::run`] inspects it to decide whether to continue
/// the chain or drop the result.
enum EnrichOutcome {
    /// The enricher ran to completion (it may or may not have written into
    /// `enrichments`).
    Ok,
    /// The enricher failed or timed out under `on_error: skip`.
    Skip,
    /// The enricher failed or timed out under `on_error: null`; the pipeline
    /// wrote `null` under `inject_field`.
    Null,
    /// The enricher failed or timed out under `on_error: drop`; the result
    /// has to be removed from the output vec.
    Drop,
    /// The enricher never ran because its kind or scope did not match.
    Filtered,
}
243
244/// Execution surface for a configured set of enrichers.
245///
246/// One pipeline owns one `Vec<Box<dyn Enricher>>` plus a shared
247/// [`Semaphore`] that bounds the number of in-flight enrichments across
248/// all results in a batch.
249///
250/// The pipeline is constructed by the daemon config layer
251/// (`crates/rsigma-cli/src/daemon/enrichment/config.rs`) and held inside
252/// the daemon's sink task. Each [`ProcessResult`](rsigma_eval::ProcessResult)
253/// (a `Vec<EvaluationResult>`) flows through [`EnrichmentPipeline::run`]
254/// before it is serialized.
255pub struct EnrichmentPipeline {
256    enrichers: Vec<Box<dyn Enricher>>,
257    semaphore: Arc<Semaphore>,
258    metrics: Arc<dyn MetricsHook>,
259}
260
261impl std::fmt::Debug for EnrichmentPipeline {
262    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
263        f.debug_struct("EnrichmentPipeline")
264            .field("enrichers", &self.enrichers.len())
265            .field("permits", &self.semaphore.available_permits())
266            .finish()
267    }
268}
269
270impl EnrichmentPipeline {
271    /// Build a pipeline from a list of configured enrichers.
272    ///
273    /// `max_concurrent_enrichments` bounds the number of results that can
274    /// be enriched in parallel; defaults to 16 if zero is passed.
275    /// Metrics default to a no-op sink; call [`Self::with_metrics`] to
276    /// route counters and latency histograms into a real
277    /// [`MetricsHook`] implementation.
278    pub fn new(enrichers: Vec<Box<dyn Enricher>>, max_concurrent_enrichments: usize) -> Self {
279        let permits = if max_concurrent_enrichments == 0 {
280            16
281        } else {
282            max_concurrent_enrichments
283        };
284        Self {
285            enrichers,
286            semaphore: Arc::new(Semaphore::new(permits)),
287            metrics: Arc::new(NoopMetrics),
288        }
289    }
290
291    /// Replace the metrics hook this pipeline reports into. The daemon
292    /// passes its Prometheus-backed `Metrics` here; library consumers
293    /// can pass any [`MetricsHook`] implementation.
294    ///
295    /// Pre-registers `(enricher_id, kind)` for every configured
296    /// enricher so `rsigma_enrichment_total{...}` and
297    /// `rsigma_enrichment_duration_seconds{...}` are emitted on
298    /// `/metrics` from the first scrape, even before any enricher has
299    /// fired.
300    pub fn with_metrics(mut self, metrics: Arc<dyn MetricsHook>) -> Self {
301        for enricher in &self.enrichers {
302            metrics.register_enricher(enricher.id(), enricher.kind().as_str());
303        }
304        self.metrics = metrics;
305        self
306    }
307
308    /// Returns true if no enrichers are configured.
309    ///
310    /// The daemon sink task uses this to skip enrichment work entirely
311    /// (no permit acquisition, no per-result loop) when no enrichers are
312    /// configured.
313    pub fn is_empty(&self) -> bool {
314        self.enrichers.is_empty()
315    }
316
317    /// Number of configured enrichers (across both kinds).
318    pub fn len(&self) -> usize {
319        self.enrichers.len()
320    }
321
322    /// Iterate over the configured enrichers (read-only view, used for
323    /// reload-diff logging and tests).
324    pub fn enrichers(&self) -> impl Iterator<Item = &dyn Enricher> {
325        self.enrichers.iter().map(|e| &**e)
326    }
327
328    /// Run every applicable enricher against each result in `results`.
329    ///
330    /// For each result the pipeline:
331    /// 1. Acquires a global semaphore permit (bounded concurrency).
332    /// 2. Iterates the configured enricher list.
333    /// 3. Skips enrichers whose [`EnricherKind`] does not match the
334    ///    result's body variant.
335    /// 4. Skips enrichers whose [`Scope`] excludes this result.
336    /// 5. Wraps each remaining `enrich()` call in
337    ///    [`tokio::time::timeout`] using the enricher's timeout.
338    /// 6. On error, applies the enricher's [`OnError`] policy.
339    /// 7. If any enricher in the chain returns the internal `Drop`
340    ///    outcome (via an enricher whose [`OnError`] policy is set
341    ///    to [`OnError::Drop`]), the result is removed from the
342    ///    output.
343    ///
344    /// The pipeline initializes `result.header.enrichments` to
345    /// `Some(empty)` lazily on first successful injection, so the
346    /// `skip_serializing_if = "Option::is_none"` contract on
347    /// `RuleHeader::enrichments` is preserved when no enricher writes.
348    ///
349    /// Currently sequential per result; concurrent across results would
350    /// require splitting `results` into chunks across futures and is left
351    /// for a follow-up tuning pass once we have realistic throughput
352    /// numbers from the integration tests.
353    pub async fn run(&self, results: &mut Vec<EvaluationResult>) {
354        if self.enrichers.is_empty() || results.is_empty() {
355            return;
356        }
357
358        // Single-pass enrichment with drop bookkeeping.
359        //
360        // We cannot use `Vec::retain_mut` together with `.await` (the
361        // closure must be sync), so we collect drop indices first and
362        // apply them in a single linear pass at the end.
363        let mut drop_indices: Vec<usize> = Vec::new();
364
365        for (idx, result) in results.iter_mut().enumerate() {
366            let permit = self.semaphore.clone().acquire_owned().await.ok();
367            if permit.is_none() {
368                // Semaphore closed (only happens at shutdown). Drain
369                // remaining results unenriched rather than blocking.
370                tracing::debug!("Enrichment semaphore closed, draining remaining results");
371                return;
372            }
373            let _permit = permit.unwrap();
374
375            let mut should_drop = false;
376            for enricher in &self.enrichers {
377                match Self::run_one(enricher.as_ref(), result, self.metrics.as_ref()).await {
378                    EnrichOutcome::Drop => {
379                        should_drop = true;
380                        break;
381                    }
382                    EnrichOutcome::Ok
383                    | EnrichOutcome::Skip
384                    | EnrichOutcome::Null
385                    | EnrichOutcome::Filtered => {}
386                }
387            }
388            if should_drop {
389                drop_indices.push(idx);
390            }
391        }
392
393        if !drop_indices.is_empty() {
394            // Remove from the back so earlier indices stay valid.
395            for idx in drop_indices.into_iter().rev() {
396                results.swap_remove(idx);
397            }
398        }
399    }
400
401    /// Run a single enricher against a single result, applying the
402    /// kind-vs-body filter, scope filter, timeout, and on_error policy.
403    /// Records `rsigma_enrichment_total{enricher_id, kind, status}` and
404    /// `rsigma_enrichment_duration_seconds{enricher_id, kind}` via the
405    /// configured `MetricsHook` for every non-filtered call.
406    async fn run_one(
407        enricher: &dyn Enricher,
408        result: &mut EvaluationResult,
409        metrics: &dyn MetricsHook,
410    ) -> EnrichOutcome {
411        if !enricher.kind().matches(&result.body) {
412            return EnrichOutcome::Filtered;
413        }
414        if !enricher.scope().matches(result) {
415            return EnrichOutcome::Filtered;
416        }
417
418        let inject_field = enricher.inject_field().to_string();
419        let timeout = enricher.timeout();
420        let id = enricher.id().to_string();
421        let kind_label = enricher.kind().as_str();
422        let on_error = enricher.on_error();
423
424        metrics.on_enrichment_queue_depth_change(1);
425        let started = std::time::Instant::now();
426        let outcome = tokio::time::timeout(timeout, enricher.enrich(result)).await;
427        let elapsed = started.elapsed().as_secs_f64();
428        metrics.on_enrichment_queue_depth_change(-1);
429
430        let err = match outcome {
431            Ok(Ok(())) => {
432                metrics.on_enrichment_completed(&id, kind_label, "success", elapsed);
433                return EnrichOutcome::Ok;
434            }
435            Ok(Err(e)) => e,
436            Err(_) => EnrichError {
437                enricher_id: id.clone(),
438                kind: EnrichErrorKind::Timeout,
439            },
440        };
441
442        let is_timeout = matches!(err.kind, EnrichErrorKind::Timeout);
443        match on_error {
444            OnError::Skip => {
445                tracing::warn!(
446                    enricher_id = %id,
447                    kind = %kind_label,
448                    error = %err,
449                    "Enricher failed, skipping"
450                );
451                metrics.on_enrichment_completed(
452                    &id,
453                    kind_label,
454                    if is_timeout { "timeout" } else { "skip" },
455                    elapsed,
456                );
457                EnrichOutcome::Skip
458            }
459            OnError::Null => {
460                tracing::warn!(
461                    enricher_id = %id,
462                    kind = %kind_label,
463                    error = %err,
464                    "Enricher failed, injecting null"
465                );
466                let map = result
467                    .header
468                    .enrichments
469                    .get_or_insert_with(serde_json::Map::new);
470                map.insert(inject_field, serde_json::Value::Null);
471                metrics.on_enrichment_completed(
472                    &id,
473                    kind_label,
474                    if is_timeout { "timeout" } else { "error" },
475                    elapsed,
476                );
477                EnrichOutcome::Null
478            }
479            OnError::Drop => {
480                tracing::warn!(
481                    enricher_id = %id,
482                    kind = %kind_label,
483                    error = %err,
484                    "Enricher failed, dropping result"
485                );
486                metrics.on_enrichment_completed(&id, kind_label, "drop", elapsed);
487                EnrichOutcome::Drop
488            }
489        }
490    }
491}
492
493impl Default for EnrichmentPipeline {
494    fn default() -> Self {
495        Self::new(Vec::new(), 16)
496    }
497}
498
499impl Clone for EnrichmentPipeline {
500    fn clone(&self) -> Self {
501        // Boxes of `dyn Enricher` are not `Clone`, so a true deep clone
502        // is not possible here. The hot-reload path always rebuilds the
503        // pipeline from config rather than cloning, but `Clone` is
504        // useful for tests and for `ArcSwap` adapter code that wants a
505        // throwaway snapshot. Returning an empty pipeline that shares
506        // the metrics hook is the safest behaviour: a misuse degrades
507        // to "no enrichment" rather than panicking or silently
508        // double-counting.
509        Self {
510            enrichers: Vec::new(),
511            semaphore: Arc::clone(&self.semaphore),
512            metrics: Arc::clone(&self.metrics),
513        }
514    }
515}
516
517/// Helper for [`Enricher::enrich`] implementations: write `value` into
518/// `result.header.enrichments` under `inject_field`, allocating the map
519/// if it was previously `None`.
520pub fn inject_enrichment(
521    result: &mut EvaluationResult,
522    inject_field: &str,
523    value: serde_json::Value,
524) {
525    let map = result
526        .header
527        .enrichments
528        .get_or_insert_with(serde_json::Map::new);
529    map.insert(inject_field.to_string(), value);
530}
531
532// ---------------------------------------------------------------------------
533// Bespoke enricher registration
534// ---------------------------------------------------------------------------
535
536/// Factory function signature used by [`register_builtin`].
537///
538/// External crates that ship a bespoke enricher type register a
539/// `Box<dyn Fn(&serde_json::Value) -> Result<Box<dyn Enricher>, String>>`
540/// so the daemon config layer can construct the enricher from its YAML
541/// config block at startup. The `serde_json::Value` argument is the raw
542/// enricher-config block (after `kind` / `id` / `type` fields are
543/// extracted by the loader).
544pub type EnricherFactory =
545    Arc<dyn Fn(&serde_json::Value) -> Result<Box<dyn Enricher>, String> + Send + Sync>;
546
547/// Process-wide registry of bespoke enricher factories keyed by `type`.
548///
549/// External crates call [`register_builtin`] once at startup (typically
550/// in their `lib.rs` via `ctor` or an explicit init function) to wire a
551/// new `type: <name>` value into the daemon's config loader. Generic
552/// primitives (`template`, `lookup`, `http`, `command`) are not in this
553/// registry; they are constructed directly by the loader.
554///
555/// The registry is global and append-only — registering the same name
556/// twice is an error. Concurrent reads are lock-free via [`std::sync::OnceLock`]
557/// at the outer level and a [`std::sync::RwLock`] at the inner level for the
558/// (rare) `register_builtin` writes.
559fn registry() -> &'static std::sync::RwLock<std::collections::HashMap<String, EnricherFactory>> {
560    use std::sync::OnceLock;
561    static REGISTRY: OnceLock<
562        std::sync::RwLock<std::collections::HashMap<String, EnricherFactory>>,
563    > = OnceLock::new();
564    REGISTRY.get_or_init(|| std::sync::RwLock::new(std::collections::HashMap::new()))
565}
566
567/// Register a bespoke enricher factory under `type: <name>`.
568///
569/// Returns an error if `name` is already registered (registration is
570/// process-global and append-only) or if `name` collides with a built-in
571/// primitive type (`template`, `lookup`, `http`, `command`).
572///
573/// External crates call this once at startup before the daemon loads its
574/// config. After config load, the registry is read-only in practice.
575pub fn register_builtin(name: &str, factory: EnricherFactory) -> Result<(), String> {
576    if matches!(name, "template" | "lookup" | "http" | "command") {
577        return Err(format!(
578            "cannot register '{name}': name is reserved for a built-in primitive"
579        ));
580    }
581    let reg = registry();
582    let mut guard = reg
583        .write()
584        .map_err(|_| "enricher registry poisoned".to_string())?;
585    if guard.contains_key(name) {
586        return Err(format!("enricher type '{name}' is already registered"));
587    }
588    guard.insert(name.to_string(), factory);
589    Ok(())
590}
591
592/// Look up a registered bespoke enricher factory by `type` name.
593///
594/// Returns `None` if `name` is not registered. The daemon config loader
595/// uses this to construct bespoke enrichers; missing names are surfaced
596/// to the operator as a clear startup error.
597pub fn lookup_builtin(name: &str) -> Option<EnricherFactory> {
598    let reg = registry();
599    let guard = reg.read().ok()?;
600    guard.get(name).cloned()
601}
602
/// Clear the bespoke enricher registry. **Test-only**: used by unit tests
/// that need to register / re-register the same name. Not exposed to
/// downstream crates.
#[cfg(test)]
pub(crate) fn clear_builtin_registry() {
    // Recover from a poisoned lock (e.g. left by a panicking test) instead of
    // silently skipping the clear, which would leak registrations into
    // later tests.
    registry()
        .write()
        .unwrap_or_else(std::sync::PoisonError::into_inner)
        .clear();
}