Skip to main content

rsigma_runtime/enrichment/
mod.rs

1//! Post-evaluation enrichment for the rsigma daemon.
2//!
3//! Enrichment runs in the daemon's sink task, after `Engine::evaluate()` has
4//! produced a [`ProcessResult`](rsigma_eval::ProcessResult) (a flat
5//! `Vec<EvaluationResult>`) and before that result is serialized to a sink.
6//! Each enricher inspects an [`EvaluationResult`], optionally fetches
7//! context (HTTP, command, source cache, pure template), and writes the
8//! result into `result.header.enrichments` under a configured
9//! `inject_field`.
10//!
11//! # Architecture
12//!
13//! A single [`Enricher`] trait covers every primitive (`template`, `lookup`,
14//! `http`, `command`) and any bespoke Rust-coded enrichers. Each enricher
15//! declares an [`EnricherKind`] at config time; the [`EnrichmentPipeline`]
16//! filters results by that declared kind against the
17//! [`EvaluationResult::body`] variant before invoking `enrich()`. There are no
18//! parallel `DetectionEnricher` / `CorrelationEnricher` traits and no separate
19//! context types; enrichers consume `&mut EvaluationResult` directly and
20//! match on `result.body` for kind-specific fields.
21//!
22//! # Concurrency
23//!
//! Within one call to [`EnrichmentPipeline::run`], results are currently
//! enriched one at a time. Each result holds a permit from the pipeline's
//! single [`tokio::sync::Semaphore`] while its enricher chain runs, so the
//! semaphore bounds in-flight results across any `run` calls that overlap.
//! Within a single result the enricher chain runs sequentially (so later
//! enrichers can depend on earlier ones via `${detection.enrichments.*}` in a
//! follow-up implementation; for this initial cut the chain is linear and
//! enrichers are independent).
30//!
31//! # Errors
32//!
33//! Failures are scoped per enricher and do not abort the chain by default;
34//! the per-enricher `on_error` policy ([`OnError`]) decides whether to
35//! `skip` the field, inject a JSON `null` for it, or `drop` the entire
36//! result before serialization. Timeouts are enforced via
37//! [`tokio::time::timeout`] using each enricher's [`Enricher::timeout`].
38
39use std::sync::Arc;
40use std::time::Duration;
41
42use async_trait::async_trait;
43use rsigma_eval::{EvaluationResult, ResultBody};
44use tokio::sync::Semaphore;
45
46use crate::metrics::{MetricsHook, NoopMetrics};
47
48mod command;
49mod http;
50pub mod http_cache;
51mod lookup;
52mod scope;
53mod template;
54#[cfg(test)]
55mod tests;
56
57pub use command::{CommandEnricher, OutputFormat};
58pub use http::{
59    DEFAULT_ENRICHER_MAX_RESPONSE_BYTES, HttpEnricher, HttpEnricherClient,
60    build_default_http_client,
61};
62pub use http_cache::{CacheKey, CacheOutcome, HttpResponseCache};
63pub use lookup::LookupEnricher;
64pub use scope::Scope;
65pub use template::{TemplateEnricher, TemplateError, validate_template_namespace};
66
/// The kind of [`EvaluationResult`] an [`Enricher`] applies to.
///
/// Fixed at config load. Used for two things:
/// 1. **Template-namespace validation at config load**: a `Detection` enricher
///    may only reference `${detection.*}`; a `Correlation` enricher may only
///    reference `${correlation.*}`. Cross-namespace references fail fast at
///    startup.
/// 2. **Runtime gating in the pipeline**: enrichers whose declared kind does
///    not match `result.body`'s variant are filtered out before
///    [`Enricher::enrich`] is invoked.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum EnricherKind {
    /// Applies to detection results ([`ResultBody::Detection`]).
    Detection,
    /// Applies to correlation results ([`ResultBody::Correlation`]).
    Correlation,
}
84
85impl EnricherKind {
86    /// String label used in metrics, logs, and config errors.
87    pub fn as_str(&self) -> &'static str {
88        match self {
89            EnricherKind::Detection => "detection",
90            EnricherKind::Correlation => "correlation",
91        }
92    }
93
94    /// Returns true if this kind matches the given result body variant.
95    pub fn matches(&self, body: &ResultBody) -> bool {
96        matches!(
97            (self, body),
98            (EnricherKind::Detection, ResultBody::Detection(_))
99                | (EnricherKind::Correlation, ResultBody::Correlation(_))
100        )
101    }
102}
103
/// Behavior when an enricher fails (timeout, fetch error, parse error, …).
///
/// Applied per enricher. Defaults to [`OnError::Skip`] so a single failed
/// enrichment never breaks the result stream.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum OnError {
    /// Deliver the result unenriched for this field. Default. Logs a warning.
    #[default]
    Skip,
    /// Inject `null` under `inject_field` so downstream consumers see a
    /// "we tried" marker rather than missing-field ambiguity.
    Null,
    /// Suppress the result entirely before serialization. Useful for
    /// dedup / pre-filter style enrichers that intentionally drop matches
    /// based on external context.
    Drop,
}
121
122/// A typed enrichment failure attributed to a specific enricher.
123#[derive(Debug, Clone)]
124pub struct EnrichError {
125    /// Stable ID of the enricher that produced the error.
126    pub enricher_id: String,
127    /// Categorized failure kind.
128    pub kind: EnrichErrorKind,
129}
130
131impl std::fmt::Display for EnrichError {
132    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
133        write!(f, "enricher '{}': {}", self.enricher_id, self.kind)
134    }
135}
136
137impl std::error::Error for EnrichError {}
138
/// Categorized enrichment failure.
///
/// The `String` payloads carry a human-readable detail message that is
/// appended to the category prefix when the value is displayed.
#[derive(Debug, Clone)]
pub enum EnrichErrorKind {
    /// The per-enricher timeout elapsed before [`Enricher::enrich`] returned.
    Timeout,
    /// External fetch failed (HTTP non-2xx, connection refused, command exit
    /// status non-zero, etc.).
    Fetch(String),
    /// External response could not be parsed (e.g. invalid JSON).
    Parse(String),
    /// Extract expression (jq / jsonpath / cel) failed during evaluation.
    Extract(String),
    /// Template rendering failed at runtime (missing variable in a strict
    /// resolver, invalid expansion, …). The config-load-time validator
    /// catches namespace violations earlier.
    TemplateRender(String),
}

impl std::fmt::Display for EnrichErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            // No payload to interpolate, so skip the formatting machinery.
            EnrichErrorKind::Timeout => f.write_str("timeout"),
            EnrichErrorKind::Fetch(m) => write!(f, "fetch failed: {m}"),
            EnrichErrorKind::Parse(m) => write!(f, "parse failed: {m}"),
            EnrichErrorKind::Extract(m) => write!(f, "extract failed: {m}"),
            EnrichErrorKind::TemplateRender(m) => write!(f, "template render failed: {m}"),
        }
    }
}
168
169/// Trait implemented by every enrichment primitive (`template`, `lookup`,
170/// `http`, `command`) and by any bespoke Rust-coded named enricher
171/// registered via [`register_builtin`].
172///
173/// Implementations read shared rule metadata from [`EvaluationResult::header`]
174/// and dispatch on `result.body` for kind-specific fields. The pipeline
175/// guarantees that `result.body` matches `self.kind()` before calling
176/// `enrich()`, so implementations may rely on the matching variant
177/// (e.g. via [`EvaluationResult::as_detection`] /
178/// [`EvaluationResult::as_correlation`]).
179///
180/// `enrich` is async to accommodate I/O-bound primitives (HTTP, command).
181/// Pure transformations (`template`) still implement `enrich` as `async fn`
182/// even though they perform no I/O.
183#[async_trait]
184pub trait Enricher: Send + Sync {
185    /// The kind of result this enricher applies to. Fixed at config load.
186    fn kind(&self) -> EnricherKind;
187
188    /// Stable identifier for this enricher instance. Used as a metric label
189    /// and in structured log fields. Conventionally something like
190    /// `asset_lookup_det` or `enrich_hash_virustotal`.
191    fn id(&self) -> &str;
192
193    /// Field name under [`RuleHeader::enrichments`](rsigma_eval::RuleHeader::enrichments)
194    /// that this enricher writes to.
195    fn inject_field(&self) -> &str;
196
197    /// Per-enricher timeout. The pipeline wraps each `enrich()` call in
198    /// [`tokio::time::timeout`] using this value. Defaults to 5 seconds.
199    fn timeout(&self) -> Duration {
200        Duration::from_secs(5)
201    }
202
203    /// Optional scope filter. Applied after the kind-vs-body filter and
204    /// before `enrich()` runs. Default is [`Scope::default`] (always fires).
205    fn scope(&self) -> &Scope;
206
207    /// Behavior when this enricher fails (timeout, fetch error, …).
208    /// Defaults to [`OnError::Skip`].
209    fn on_error(&self) -> OnError {
210        OnError::Skip
211    }
212
213    /// Run the enrichment.
214    ///
215    /// Implementations write into [`RuleHeader::enrichments`](rsigma_eval::RuleHeader::enrichments)
216    /// under the configured [`Self::inject_field`]. The pipeline initializes
217    /// the map (`None` → `Some(empty)`) before invoking the first enricher
218    /// for a given result, so implementations can `unwrap` the map safely.
219    async fn enrich(&self, result: &mut EvaluationResult) -> Result<(), EnrichError>;
220}
221
/// Outcome of running a single enricher against a single result.
///
/// Returned internally by [`EnrichmentPipeline::run_one`] so the pipeline
/// driver can decide whether to drop the entire result, log, or continue.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum EnrichOutcome {
    /// Enricher ran and (possibly) wrote into `enrichments`.
    Ok,
    /// Enricher errored or timed out and `on_error: skip` applied.
    Skip,
    /// Enricher errored or timed out and `on_error: null` applied; the
    /// pipeline injected `null` under `inject_field`.
    Null,
    /// Enricher errored or timed out and `on_error: drop` applied; the
    /// pipeline must remove this result from the output vec.
    Drop,
    /// Enricher was filtered out (kind or scope mismatch) before running.
    Filtered,
}
240
241/// Execution surface for a configured set of enrichers.
242///
243/// One pipeline owns one `Vec<Box<dyn Enricher>>` plus a shared
244/// [`Semaphore`] that bounds the number of in-flight enrichments across
245/// all results in a batch.
246///
247/// The pipeline is constructed by the daemon config layer
248/// (`crates/rsigma-cli/src/daemon/enrichment/config.rs`) and held inside
249/// the daemon's sink task. Each [`ProcessResult`](rsigma_eval::ProcessResult)
250/// (a `Vec<EvaluationResult>`) flows through [`EnrichmentPipeline::run`]
251/// before it is serialized.
252pub struct EnrichmentPipeline {
253    enrichers: Vec<Box<dyn Enricher>>,
254    semaphore: Arc<Semaphore>,
255    metrics: Arc<dyn MetricsHook>,
256}
257
258impl std::fmt::Debug for EnrichmentPipeline {
259    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
260        f.debug_struct("EnrichmentPipeline")
261            .field("enrichers", &self.enrichers.len())
262            .field("permits", &self.semaphore.available_permits())
263            .finish()
264    }
265}
266
267impl EnrichmentPipeline {
268    /// Build a pipeline from a list of configured enrichers.
269    ///
270    /// `max_concurrent_enrichments` bounds the number of results that can
271    /// be enriched in parallel; defaults to 16 if zero is passed.
272    /// Metrics default to a no-op sink; call [`Self::with_metrics`] to
273    /// route counters and latency histograms into a real
274    /// [`MetricsHook`] implementation.
275    pub fn new(enrichers: Vec<Box<dyn Enricher>>, max_concurrent_enrichments: usize) -> Self {
276        let permits = if max_concurrent_enrichments == 0 {
277            16
278        } else {
279            max_concurrent_enrichments
280        };
281        Self {
282            enrichers,
283            semaphore: Arc::new(Semaphore::new(permits)),
284            metrics: Arc::new(NoopMetrics),
285        }
286    }
287
288    /// Replace the metrics hook this pipeline reports into. The daemon
289    /// passes its Prometheus-backed `Metrics` here; library consumers
290    /// can pass any [`MetricsHook`] implementation.
291    ///
292    /// Pre-registers `(enricher_id, kind)` for every configured
293    /// enricher so `rsigma_enrichment_total{...}` and
294    /// `rsigma_enrichment_duration_seconds{...}` are emitted on
295    /// `/metrics` from the first scrape, even before any enricher has
296    /// fired.
297    pub fn with_metrics(mut self, metrics: Arc<dyn MetricsHook>) -> Self {
298        for enricher in &self.enrichers {
299            metrics.register_enricher(enricher.id(), enricher.kind().as_str());
300        }
301        self.metrics = metrics;
302        self
303    }
304
305    /// Returns true if no enrichers are configured.
306    ///
307    /// The daemon sink task uses this to skip enrichment work entirely
308    /// (no permit acquisition, no per-result loop) when no enrichers are
309    /// configured.
310    pub fn is_empty(&self) -> bool {
311        self.enrichers.is_empty()
312    }
313
314    /// Number of configured enrichers (across both kinds).
315    pub fn len(&self) -> usize {
316        self.enrichers.len()
317    }
318
319    /// Iterate over the configured enrichers (read-only view, used for
320    /// reload-diff logging and tests).
321    pub fn enrichers(&self) -> impl Iterator<Item = &dyn Enricher> {
322        self.enrichers.iter().map(|e| &**e)
323    }
324
325    /// Run every applicable enricher against each result in `results`.
326    ///
327    /// For each result the pipeline:
328    /// 1. Acquires a global semaphore permit (bounded concurrency).
329    /// 2. Iterates the configured enricher list.
330    /// 3. Skips enrichers whose [`EnricherKind`] does not match the
331    ///    result's body variant.
332    /// 4. Skips enrichers whose [`Scope`] excludes this result.
333    /// 5. Wraps each remaining `enrich()` call in
334    ///    [`tokio::time::timeout`] using the enricher's timeout.
335    /// 6. On error, applies the enricher's [`OnError`] policy.
336    /// 7. If any enricher in the chain returns the internal `Drop`
337    ///    outcome (via an enricher whose [`OnError`] policy is set
338    ///    to [`OnError::Drop`]), the result is removed from the
339    ///    output.
340    ///
341    /// The pipeline initializes `result.header.enrichments` to
342    /// `Some(empty)` lazily on first successful injection, so the
343    /// `skip_serializing_if = "Option::is_none"` contract on
344    /// `RuleHeader::enrichments` is preserved when no enricher writes.
345    ///
346    /// Currently sequential per result; concurrent across results would
347    /// require splitting `results` into chunks across futures and is left
348    /// for a follow-up tuning pass once we have realistic throughput
349    /// numbers from the integration tests.
350    pub async fn run(&self, results: &mut Vec<EvaluationResult>) {
351        if self.enrichers.is_empty() || results.is_empty() {
352            return;
353        }
354
355        // Single-pass enrichment with drop bookkeeping.
356        //
357        // We cannot use `Vec::retain_mut` together with `.await` (the
358        // closure must be sync), so we collect drop indices first and
359        // apply them in a single linear pass at the end.
360        let mut drop_indices: Vec<usize> = Vec::new();
361
362        for (idx, result) in results.iter_mut().enumerate() {
363            let permit = self.semaphore.clone().acquire_owned().await.ok();
364            if permit.is_none() {
365                // Semaphore closed (only happens at shutdown). Drain
366                // remaining results unenriched rather than blocking.
367                tracing::debug!("Enrichment semaphore closed, draining remaining results");
368                return;
369            }
370            let _permit = permit.unwrap();
371
372            let mut should_drop = false;
373            for enricher in &self.enrichers {
374                match Self::run_one(enricher.as_ref(), result, self.metrics.as_ref()).await {
375                    EnrichOutcome::Drop => {
376                        should_drop = true;
377                        break;
378                    }
379                    EnrichOutcome::Ok
380                    | EnrichOutcome::Skip
381                    | EnrichOutcome::Null
382                    | EnrichOutcome::Filtered => {}
383                }
384            }
385            if should_drop {
386                drop_indices.push(idx);
387            }
388        }
389
390        if !drop_indices.is_empty() {
391            // Remove from the back so earlier indices stay valid.
392            for idx in drop_indices.into_iter().rev() {
393                results.swap_remove(idx);
394            }
395        }
396    }
397
398    /// Run a single enricher against a single result, applying the
399    /// kind-vs-body filter, scope filter, timeout, and on_error policy.
400    /// Records `rsigma_enrichment_total{enricher_id, kind, status}` and
401    /// `rsigma_enrichment_duration_seconds{enricher_id, kind}` via the
402    /// configured `MetricsHook` for every non-filtered call.
403    async fn run_one(
404        enricher: &dyn Enricher,
405        result: &mut EvaluationResult,
406        metrics: &dyn MetricsHook,
407    ) -> EnrichOutcome {
408        if !enricher.kind().matches(&result.body) {
409            return EnrichOutcome::Filtered;
410        }
411        if !enricher.scope().matches(result) {
412            return EnrichOutcome::Filtered;
413        }
414
415        let inject_field = enricher.inject_field().to_string();
416        let timeout = enricher.timeout();
417        let id = enricher.id().to_string();
418        let kind_label = enricher.kind().as_str();
419        let on_error = enricher.on_error();
420
421        metrics.on_enrichment_queue_depth_change(1);
422        let started = std::time::Instant::now();
423        let outcome = tokio::time::timeout(timeout, enricher.enrich(result)).await;
424        let elapsed = started.elapsed().as_secs_f64();
425        metrics.on_enrichment_queue_depth_change(-1);
426
427        let err = match outcome {
428            Ok(Ok(())) => {
429                metrics.on_enrichment_completed(&id, kind_label, "success", elapsed);
430                return EnrichOutcome::Ok;
431            }
432            Ok(Err(e)) => e,
433            Err(_) => EnrichError {
434                enricher_id: id.clone(),
435                kind: EnrichErrorKind::Timeout,
436            },
437        };
438
439        let is_timeout = matches!(err.kind, EnrichErrorKind::Timeout);
440        match on_error {
441            OnError::Skip => {
442                tracing::warn!(
443                    enricher_id = %id,
444                    kind = %kind_label,
445                    error = %err,
446                    "Enricher failed, skipping"
447                );
448                metrics.on_enrichment_completed(
449                    &id,
450                    kind_label,
451                    if is_timeout { "timeout" } else { "skip" },
452                    elapsed,
453                );
454                EnrichOutcome::Skip
455            }
456            OnError::Null => {
457                tracing::warn!(
458                    enricher_id = %id,
459                    kind = %kind_label,
460                    error = %err,
461                    "Enricher failed, injecting null"
462                );
463                let map = result
464                    .header
465                    .enrichments
466                    .get_or_insert_with(serde_json::Map::new);
467                map.insert(inject_field, serde_json::Value::Null);
468                metrics.on_enrichment_completed(
469                    &id,
470                    kind_label,
471                    if is_timeout { "timeout" } else { "error" },
472                    elapsed,
473                );
474                EnrichOutcome::Null
475            }
476            OnError::Drop => {
477                tracing::warn!(
478                    enricher_id = %id,
479                    kind = %kind_label,
480                    error = %err,
481                    "Enricher failed, dropping result"
482                );
483                metrics.on_enrichment_completed(&id, kind_label, "drop", elapsed);
484                EnrichOutcome::Drop
485            }
486        }
487    }
488}
489
490impl Default for EnrichmentPipeline {
491    fn default() -> Self {
492        Self::new(Vec::new(), 16)
493    }
494}
495
496impl Clone for EnrichmentPipeline {
497    fn clone(&self) -> Self {
498        // Boxes of `dyn Enricher` are not `Clone`, so a true deep clone
499        // is not possible here. The hot-reload path always rebuilds the
500        // pipeline from config rather than cloning, but `Clone` is
501        // useful for tests and for `ArcSwap` adapter code that wants a
502        // throwaway snapshot. Returning an empty pipeline that shares
503        // the metrics hook is the safest behaviour: a misuse degrades
504        // to "no enrichment" rather than panicking or silently
505        // double-counting.
506        Self {
507            enrichers: Vec::new(),
508            semaphore: Arc::clone(&self.semaphore),
509            metrics: Arc::clone(&self.metrics),
510        }
511    }
512}
513
514/// Helper for [`Enricher::enrich`] implementations: write `value` into
515/// `result.header.enrichments` under `inject_field`, allocating the map
516/// if it was previously `None`.
517pub fn inject_enrichment(
518    result: &mut EvaluationResult,
519    inject_field: &str,
520    value: serde_json::Value,
521) {
522    let map = result
523        .header
524        .enrichments
525        .get_or_insert_with(serde_json::Map::new);
526    map.insert(inject_field.to_string(), value);
527}
528
529// ---------------------------------------------------------------------------
530// Bespoke enricher registration
531// ---------------------------------------------------------------------------
532
533/// Factory function signature used by [`register_builtin`].
534///
535/// External crates that ship a bespoke enricher type register a
536/// `Box<dyn Fn(&serde_json::Value) -> Result<Box<dyn Enricher>, String>>`
537/// so the daemon config layer can construct the enricher from its YAML
538/// config block at startup. The `serde_json::Value` argument is the raw
539/// enricher-config block (after `kind` / `id` / `type` fields are
540/// extracted by the loader).
541pub type EnricherFactory =
542    Arc<dyn Fn(&serde_json::Value) -> Result<Box<dyn Enricher>, String> + Send + Sync>;
543
544/// Process-wide registry of bespoke enricher factories keyed by `type`.
545///
546/// External crates call [`register_builtin`] once at startup (typically
547/// in their `lib.rs` via `ctor` or an explicit init function) to wire a
548/// new `type: <name>` value into the daemon's config loader. Generic
549/// primitives (`template`, `lookup`, `http`, `command`) are not in this
550/// registry; they are constructed directly by the loader.
551///
552/// The registry is global and append-only — registering the same name
553/// twice is an error. Concurrent reads are lock-free via [`std::sync::OnceLock`]
554/// at the outer level and a [`std::sync::RwLock`] at the inner level for the
555/// (rare) `register_builtin` writes.
556fn registry() -> &'static std::sync::RwLock<std::collections::HashMap<String, EnricherFactory>> {
557    use std::sync::OnceLock;
558    static REGISTRY: OnceLock<
559        std::sync::RwLock<std::collections::HashMap<String, EnricherFactory>>,
560    > = OnceLock::new();
561    REGISTRY.get_or_init(|| std::sync::RwLock::new(std::collections::HashMap::new()))
562}
563
564/// Register a bespoke enricher factory under `type: <name>`.
565///
566/// Returns an error if `name` is already registered (registration is
567/// process-global and append-only) or if `name` collides with a built-in
568/// primitive type (`template`, `lookup`, `http`, `command`).
569///
570/// External crates call this once at startup before the daemon loads its
571/// config. After config load, the registry is read-only in practice.
572pub fn register_builtin(name: &str, factory: EnricherFactory) -> Result<(), String> {
573    if matches!(name, "template" | "lookup" | "http" | "command") {
574        return Err(format!(
575            "cannot register '{name}': name is reserved for a built-in primitive"
576        ));
577    }
578    let reg = registry();
579    let mut guard = reg
580        .write()
581        .map_err(|_| "enricher registry poisoned".to_string())?;
582    if guard.contains_key(name) {
583        return Err(format!("enricher type '{name}' is already registered"));
584    }
585    guard.insert(name.to_string(), factory);
586    Ok(())
587}
588
589/// Look up a registered bespoke enricher factory by `type` name.
590///
591/// Returns `None` if `name` is not registered. The daemon config loader
592/// uses this to construct bespoke enrichers; missing names are surfaced
593/// to the operator as a clear startup error.
594pub fn lookup_builtin(name: &str) -> Option<EnricherFactory> {
595    let reg = registry();
596    let guard = reg.read().ok()?;
597    guard.get(name).cloned()
598}
599
/// Clear the bespoke enricher registry. **Test-only**: used by unit tests
/// that need to register / re-register the same name. Not exposed to
/// downstream crates.
///
/// Does nothing if the registry lock is poisoned.
#[cfg(test)]
pub(crate) fn clear_builtin_registry() {
    if let Ok(mut guard) = registry().write() {
        guard.clear();
    }
}