rsigma_runtime/enrichment/mod.rs
1//! Post-evaluation enrichment for the rsigma daemon.
2//!
3//! Enrichment runs in the daemon's sink task, after `Engine::evaluate()` has
4//! produced a [`ProcessResult`](rsigma_eval::ProcessResult) (a flat
5//! `Vec<EvaluationResult>`) and before that result is serialized to a sink.
6//! Each enricher inspects an [`EvaluationResult`], optionally fetches
7//! context (HTTP, command, source cache, pure template), and writes the
8//! result into `result.header.enrichments` under a configured
9//! `inject_field`.
10//!
11//! # Architecture
12//!
13//! A single [`Enricher`] trait covers every primitive (`template`, `lookup`,
14//! `http`, `command`) and any bespoke Rust-coded enrichers. Each enricher
15//! declares an [`EnricherKind`] at config time; the [`EnrichmentPipeline`]
16//! filters results by that declared kind against the
17//! [`EvaluationResult::body`] variant before invoking `enrich()`. There are no
18//! parallel `DetectionEnricher` / `CorrelationEnricher` traits and no separate
19//! context types; enrichers consume `&mut EvaluationResult` directly and
20//! match on `result.body` for kind-specific fields.
21//!
22//! # Concurrency
23//!
//! Every result acquires a permit from a single [`tokio::sync::Semaphore`]
//! owned by the pipeline before its enricher chain runs, which bounds the
//! number of in-flight enrichments. [`EnrichmentPipeline::run`] currently
//! walks the results of a batch one at a time; fanning a batch out across
//! concurrent futures is left for a follow-up. Within a single result the
//! enricher chain runs sequentially. For this initial cut the chain is
//! linear and enrichers are independent; a follow-up implementation may let
//! later enrichers depend on earlier ones via `${detection.enrichments.*}`.
30//!
31//! # Errors
32//!
33//! Failures are scoped per enricher and do not abort the chain by default;
34//! the per-enricher `on_error` policy ([`OnError`]) decides whether to
35//! `skip` the field, inject a JSON `null` for it, or `drop` the entire
36//! result before serialization. Timeouts are enforced via
37//! [`tokio::time::timeout`] using each enricher's [`Enricher::timeout`].
38
39use std::sync::Arc;
40use std::time::Duration;
41
42use async_trait::async_trait;
43use rsigma_eval::{EvaluationResult, ResultBody};
44use tokio::sync::Semaphore;
45
46use crate::metrics::{MetricsHook, NoopMetrics};
47
48mod command;
49pub mod config;
50mod http;
51pub mod http_cache;
52mod lookup;
53mod template;
54#[cfg(test)]
55mod tests;
56
57pub use crate::scope::Scope;
58pub use command::{CommandEnricher, OutputFormat};
59pub use http::{
60 DEFAULT_ENRICHER_MAX_RESPONSE_BYTES, HttpEnricher, HttpEnricherClient,
61 build_default_http_client,
62};
63pub use http_cache::{CacheKey, CacheOutcome, HttpResponseCache};
64pub use lookup::LookupEnricher;
65pub use template::{
66 TemplateEnricher, TemplateError, render_template, render_template_json,
67 validate_template_namespace,
68};
69
/// Which [`EvaluationResult`] body variant an [`Enricher`] is meant for.
///
/// The kind is fixed when the configuration is loaded and serves two purposes:
/// 1. **Config-load template validation**: a `Detection` enricher may only
///    reference `${detection.*}` and a `Correlation` enricher may only
///    reference `${correlation.*}`; cross-namespace references fail fast at
///    startup.
/// 2. **Runtime gating in [`EnrichmentPipeline::run`]**: an enricher whose
///    kind does not match the variant of `result.body` is skipped before
///    [`Enricher::enrich`] is called.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum EnricherKind {
    /// Targets detection results ([`ResultBody::Detection`]).
    Detection,
    /// Targets correlation results ([`ResultBody::Correlation`]).
    Correlation,
}
87
88impl EnricherKind {
89 /// String label used in metrics, logs, and config errors.
90 pub fn as_str(&self) -> &'static str {
91 match self {
92 EnricherKind::Detection => "detection",
93 EnricherKind::Correlation => "correlation",
94 }
95 }
96
97 /// Returns true if this kind matches the given result body variant.
98 pub fn matches(&self, body: &ResultBody) -> bool {
99 matches!(
100 (self, body),
101 (EnricherKind::Detection, ResultBody::Detection(_))
102 | (EnricherKind::Correlation, ResultBody::Correlation(_))
103 )
104 }
105}
106
/// Policy applied when an enricher fails (timeout, fetch error, parse
/// error, …).
///
/// The policy is chosen per enricher. The default is [`OnError::Skip`], so
/// one failed enrichment never breaks the result stream.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum OnError {
    /// Default. Leave this field out and deliver the result anyway; a
    /// warning is logged.
    #[default]
    Skip,
    /// Write `null` under `inject_field`, giving downstream consumers a
    /// "we tried" marker instead of an ambiguous missing field.
    Null,
    /// Remove the whole result before serialization. Intended for
    /// dedup / pre-filter style enrichers that deliberately drop matches
    /// based on external context.
    Drop,
}
124
125/// A typed enrichment failure attributed to a specific enricher.
126#[derive(Debug, Clone)]
127pub struct EnrichError {
128 /// Stable ID of the enricher that produced the error.
129 pub enricher_id: String,
130 /// Categorized failure kind.
131 pub kind: EnrichErrorKind,
132}
133
134impl std::fmt::Display for EnrichError {
135 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
136 write!(f, "enricher '{}': {}", self.enricher_id, self.kind)
137 }
138}
139
140impl std::error::Error for EnrichError {}
141
/// Category of an enrichment failure.
#[derive(Debug, Clone)]
pub enum EnrichErrorKind {
    /// [`Enricher::enrich`] did not return before the per-enricher timeout
    /// elapsed.
    Timeout,
    /// The external fetch failed (HTTP non-2xx, connection refused,
    /// non-zero command exit status, etc.).
    Fetch(String),
    /// The external response could not be parsed (e.g. invalid JSON).
    Parse(String),
    /// Evaluating the extract expression (jq / jsonpath / cel) failed.
    Extract(String),
    /// Rendering a template failed at runtime (missing variable in a strict
    /// resolver, invalid expansion, …). Namespace violations are caught
    /// earlier by the config-load-time validator.
    TemplateRender(String),
}
159
160impl std::fmt::Display for EnrichErrorKind {
161 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
162 match self {
163 EnrichErrorKind::Timeout => write!(f, "timeout"),
164 EnrichErrorKind::Fetch(m) => write!(f, "fetch failed: {m}"),
165 EnrichErrorKind::Parse(m) => write!(f, "parse failed: {m}"),
166 EnrichErrorKind::Extract(m) => write!(f, "extract failed: {m}"),
167 EnrichErrorKind::TemplateRender(m) => write!(f, "template render failed: {m}"),
168 }
169 }
170}
171
172/// Trait implemented by every enrichment primitive (`template`, `lookup`,
173/// `http`, `command`) and by any bespoke Rust-coded named enricher
174/// registered via [`register_builtin`].
175///
176/// Implementations read shared rule metadata from [`EvaluationResult::header`]
177/// and dispatch on `result.body` for kind-specific fields. The pipeline
178/// guarantees that `result.body` matches `self.kind()` before calling
179/// `enrich()`, so implementations may rely on the matching variant
180/// (e.g. via [`EvaluationResult::as_detection`] /
181/// [`EvaluationResult::as_correlation`]).
182///
183/// `enrich` is async to accommodate I/O-bound primitives (HTTP, command).
184/// Pure transformations (`template`) still implement `enrich` as `async fn`
185/// even though they perform no I/O.
186#[async_trait]
187pub trait Enricher: Send + Sync {
188 /// The kind of result this enricher applies to. Fixed at config load.
189 fn kind(&self) -> EnricherKind;
190
191 /// Stable identifier for this enricher instance. Used as a metric label
192 /// and in structured log fields. Conventionally something like
193 /// `asset_lookup_det` or `enrich_hash_virustotal`.
194 fn id(&self) -> &str;
195
196 /// Field name under [`RuleHeader::enrichments`](rsigma_eval::RuleHeader::enrichments)
197 /// that this enricher writes to.
198 fn inject_field(&self) -> &str;
199
200 /// Per-enricher timeout. The pipeline wraps each `enrich()` call in
201 /// [`tokio::time::timeout`] using this value. Defaults to 5 seconds.
202 fn timeout(&self) -> Duration {
203 Duration::from_secs(5)
204 }
205
206 /// Optional scope filter. Applied after the kind-vs-body filter and
207 /// before `enrich()` runs. Default is [`Scope::default`] (always fires).
208 fn scope(&self) -> &Scope;
209
210 /// Behavior when this enricher fails (timeout, fetch error, …).
211 /// Defaults to [`OnError::Skip`].
212 fn on_error(&self) -> OnError {
213 OnError::Skip
214 }
215
216 /// Run the enrichment.
217 ///
218 /// Implementations write into [`RuleHeader::enrichments`](rsigma_eval::RuleHeader::enrichments)
219 /// under the configured [`Self::inject_field`]. The pipeline initializes
220 /// the map (`None` → `Some(empty)`) before invoking the first enricher
221 /// for a given result, so implementations can `unwrap` the map safely.
222 async fn enrich(&self, result: &mut EvaluationResult) -> Result<(), EnrichError>;
223}
224
/// What happened when one enricher was applied to one result.
///
/// Produced internally by [`EnrichmentPipeline::run_one`]; the pipeline
/// driver inspects it to decide whether to drop the whole result or carry
/// on with the next enricher.
enum EnrichOutcome {
    /// The enricher ran and (possibly) wrote into `enrichments`.
    Ok,
    /// The enricher failed or timed out under `on_error: skip`.
    Skip,
    /// The enricher failed or timed out under `on_error: null`; the
    /// pipeline wrote `null` under `inject_field`.
    Null,
    /// The enricher failed or timed out under `on_error: drop`; the
    /// pipeline must remove this result from the output vec.
    Drop,
    /// The enricher never ran because the kind or scope filter rejected
    /// the result.
    Filtered,
}
243
244/// Execution surface for a configured set of enrichers.
245///
246/// One pipeline owns one `Vec<Box<dyn Enricher>>` plus a shared
247/// [`Semaphore`] that bounds the number of in-flight enrichments across
248/// all results in a batch.
249///
250/// The pipeline is constructed by the daemon config layer
251/// (`crates/rsigma-cli/src/daemon/enrichment/config.rs`) and held inside
252/// the daemon's sink task. Each [`ProcessResult`](rsigma_eval::ProcessResult)
253/// (a `Vec<EvaluationResult>`) flows through [`EnrichmentPipeline::run`]
254/// before it is serialized.
255pub struct EnrichmentPipeline {
256 enrichers: Vec<Box<dyn Enricher>>,
257 semaphore: Arc<Semaphore>,
258 metrics: Arc<dyn MetricsHook>,
259}
260
261impl std::fmt::Debug for EnrichmentPipeline {
262 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
263 f.debug_struct("EnrichmentPipeline")
264 .field("enrichers", &self.enrichers.len())
265 .field("permits", &self.semaphore.available_permits())
266 .finish()
267 }
268}
269
270impl EnrichmentPipeline {
271 /// Build a pipeline from a list of configured enrichers.
272 ///
273 /// `max_concurrent_enrichments` bounds the number of results that can
274 /// be enriched in parallel; defaults to 16 if zero is passed.
275 /// Metrics default to a no-op sink; call [`Self::with_metrics`] to
276 /// route counters and latency histograms into a real
277 /// [`MetricsHook`] implementation.
278 pub fn new(enrichers: Vec<Box<dyn Enricher>>, max_concurrent_enrichments: usize) -> Self {
279 let permits = if max_concurrent_enrichments == 0 {
280 16
281 } else {
282 max_concurrent_enrichments
283 };
284 Self {
285 enrichers,
286 semaphore: Arc::new(Semaphore::new(permits)),
287 metrics: Arc::new(NoopMetrics),
288 }
289 }
290
291 /// Replace the metrics hook this pipeline reports into. The daemon
292 /// passes its Prometheus-backed `Metrics` here; library consumers
293 /// can pass any [`MetricsHook`] implementation.
294 ///
295 /// Pre-registers `(enricher_id, kind)` for every configured
296 /// enricher so `rsigma_enrichment_total{...}` and
297 /// `rsigma_enrichment_duration_seconds{...}` are emitted on
298 /// `/metrics` from the first scrape, even before any enricher has
299 /// fired.
300 pub fn with_metrics(mut self, metrics: Arc<dyn MetricsHook>) -> Self {
301 for enricher in &self.enrichers {
302 metrics.register_enricher(enricher.id(), enricher.kind().as_str());
303 }
304 self.metrics = metrics;
305 self
306 }
307
308 /// Returns true if no enrichers are configured.
309 ///
310 /// The daemon sink task uses this to skip enrichment work entirely
311 /// (no permit acquisition, no per-result loop) when no enrichers are
312 /// configured.
313 pub fn is_empty(&self) -> bool {
314 self.enrichers.is_empty()
315 }
316
317 /// Number of configured enrichers (across both kinds).
318 pub fn len(&self) -> usize {
319 self.enrichers.len()
320 }
321
322 /// Iterate over the configured enrichers (read-only view, used for
323 /// reload-diff logging and tests).
324 pub fn enrichers(&self) -> impl Iterator<Item = &dyn Enricher> {
325 self.enrichers.iter().map(|e| &**e)
326 }
327
328 /// Run every applicable enricher against each result in `results`.
329 ///
330 /// For each result the pipeline:
331 /// 1. Acquires a global semaphore permit (bounded concurrency).
332 /// 2. Iterates the configured enricher list.
333 /// 3. Skips enrichers whose [`EnricherKind`] does not match the
334 /// result's body variant.
335 /// 4. Skips enrichers whose [`Scope`] excludes this result.
336 /// 5. Wraps each remaining `enrich()` call in
337 /// [`tokio::time::timeout`] using the enricher's timeout.
338 /// 6. On error, applies the enricher's [`OnError`] policy.
339 /// 7. If any enricher in the chain returns the internal `Drop`
340 /// outcome (via an enricher whose [`OnError`] policy is set
341 /// to [`OnError::Drop`]), the result is removed from the
342 /// output.
343 ///
344 /// The pipeline initializes `result.header.enrichments` to
345 /// `Some(empty)` lazily on first successful injection, so the
346 /// `skip_serializing_if = "Option::is_none"` contract on
347 /// `RuleHeader::enrichments` is preserved when no enricher writes.
348 ///
349 /// Currently sequential per result; concurrent across results would
350 /// require splitting `results` into chunks across futures and is left
351 /// for a follow-up tuning pass once we have realistic throughput
352 /// numbers from the integration tests.
353 pub async fn run(&self, results: &mut Vec<EvaluationResult>) {
354 if self.enrichers.is_empty() || results.is_empty() {
355 return;
356 }
357
358 // Single-pass enrichment with drop bookkeeping.
359 //
360 // We cannot use `Vec::retain_mut` together with `.await` (the
361 // closure must be sync), so we collect drop indices first and
362 // apply them in a single linear pass at the end.
363 let mut drop_indices: Vec<usize> = Vec::new();
364
365 for (idx, result) in results.iter_mut().enumerate() {
366 let permit = self.semaphore.clone().acquire_owned().await.ok();
367 if permit.is_none() {
368 // Semaphore closed (only happens at shutdown). Drain
369 // remaining results unenriched rather than blocking.
370 tracing::debug!("Enrichment semaphore closed, draining remaining results");
371 return;
372 }
373 let _permit = permit.unwrap();
374
375 let mut should_drop = false;
376 for enricher in &self.enrichers {
377 match Self::run_one(enricher.as_ref(), result, self.metrics.as_ref()).await {
378 EnrichOutcome::Drop => {
379 should_drop = true;
380 break;
381 }
382 EnrichOutcome::Ok
383 | EnrichOutcome::Skip
384 | EnrichOutcome::Null
385 | EnrichOutcome::Filtered => {}
386 }
387 }
388 if should_drop {
389 drop_indices.push(idx);
390 }
391 }
392
393 if !drop_indices.is_empty() {
394 // Remove from the back so earlier indices stay valid.
395 for idx in drop_indices.into_iter().rev() {
396 results.swap_remove(idx);
397 }
398 }
399 }
400
401 /// Run a single enricher against a single result, applying the
402 /// kind-vs-body filter, scope filter, timeout, and on_error policy.
403 /// Records `rsigma_enrichment_total{enricher_id, kind, status}` and
404 /// `rsigma_enrichment_duration_seconds{enricher_id, kind}` via the
405 /// configured `MetricsHook` for every non-filtered call.
406 async fn run_one(
407 enricher: &dyn Enricher,
408 result: &mut EvaluationResult,
409 metrics: &dyn MetricsHook,
410 ) -> EnrichOutcome {
411 if !enricher.kind().matches(&result.body) {
412 return EnrichOutcome::Filtered;
413 }
414 if !enricher.scope().matches(result) {
415 return EnrichOutcome::Filtered;
416 }
417
418 let inject_field = enricher.inject_field().to_string();
419 let timeout = enricher.timeout();
420 let id = enricher.id().to_string();
421 let kind_label = enricher.kind().as_str();
422 let on_error = enricher.on_error();
423
424 metrics.on_enrichment_queue_depth_change(1);
425 let started = std::time::Instant::now();
426 let outcome = tokio::time::timeout(timeout, enricher.enrich(result)).await;
427 let elapsed = started.elapsed().as_secs_f64();
428 metrics.on_enrichment_queue_depth_change(-1);
429
430 let err = match outcome {
431 Ok(Ok(())) => {
432 metrics.on_enrichment_completed(&id, kind_label, "success", elapsed);
433 return EnrichOutcome::Ok;
434 }
435 Ok(Err(e)) => e,
436 Err(_) => EnrichError {
437 enricher_id: id.clone(),
438 kind: EnrichErrorKind::Timeout,
439 },
440 };
441
442 let is_timeout = matches!(err.kind, EnrichErrorKind::Timeout);
443 match on_error {
444 OnError::Skip => {
445 tracing::warn!(
446 enricher_id = %id,
447 kind = %kind_label,
448 error = %err,
449 "Enricher failed, skipping"
450 );
451 metrics.on_enrichment_completed(
452 &id,
453 kind_label,
454 if is_timeout { "timeout" } else { "skip" },
455 elapsed,
456 );
457 EnrichOutcome::Skip
458 }
459 OnError::Null => {
460 tracing::warn!(
461 enricher_id = %id,
462 kind = %kind_label,
463 error = %err,
464 "Enricher failed, injecting null"
465 );
466 let map = result
467 .header
468 .enrichments
469 .get_or_insert_with(serde_json::Map::new);
470 map.insert(inject_field, serde_json::Value::Null);
471 metrics.on_enrichment_completed(
472 &id,
473 kind_label,
474 if is_timeout { "timeout" } else { "error" },
475 elapsed,
476 );
477 EnrichOutcome::Null
478 }
479 OnError::Drop => {
480 tracing::warn!(
481 enricher_id = %id,
482 kind = %kind_label,
483 error = %err,
484 "Enricher failed, dropping result"
485 );
486 metrics.on_enrichment_completed(&id, kind_label, "drop", elapsed);
487 EnrichOutcome::Drop
488 }
489 }
490 }
491}
492
493impl Default for EnrichmentPipeline {
494 fn default() -> Self {
495 Self::new(Vec::new(), 16)
496 }
497}
498
499impl Clone for EnrichmentPipeline {
500 fn clone(&self) -> Self {
501 // Boxes of `dyn Enricher` are not `Clone`, so a true deep clone
502 // is not possible here. The hot-reload path always rebuilds the
503 // pipeline from config rather than cloning, but `Clone` is
504 // useful for tests and for `ArcSwap` adapter code that wants a
505 // throwaway snapshot. Returning an empty pipeline that shares
506 // the metrics hook is the safest behaviour: a misuse degrades
507 // to "no enrichment" rather than panicking or silently
508 // double-counting.
509 Self {
510 enrichers: Vec::new(),
511 semaphore: Arc::clone(&self.semaphore),
512 metrics: Arc::clone(&self.metrics),
513 }
514 }
515}
516
517/// Helper for [`Enricher::enrich`] implementations: write `value` into
518/// `result.header.enrichments` under `inject_field`, allocating the map
519/// if it was previously `None`.
520pub fn inject_enrichment(
521 result: &mut EvaluationResult,
522 inject_field: &str,
523 value: serde_json::Value,
524) {
525 let map = result
526 .header
527 .enrichments
528 .get_or_insert_with(serde_json::Map::new);
529 map.insert(inject_field.to_string(), value);
530}
531
532// ---------------------------------------------------------------------------
533// Bespoke enricher registration
534// ---------------------------------------------------------------------------
535
536/// Factory function signature used by [`register_builtin`].
537///
538/// External crates that ship a bespoke enricher type register a
539/// `Box<dyn Fn(&serde_json::Value) -> Result<Box<dyn Enricher>, String>>`
540/// so the daemon config layer can construct the enricher from its YAML
541/// config block at startup. The `serde_json::Value` argument is the raw
542/// enricher-config block (after `kind` / `id` / `type` fields are
543/// extracted by the loader).
544pub type EnricherFactory =
545 Arc<dyn Fn(&serde_json::Value) -> Result<Box<dyn Enricher>, String> + Send + Sync>;
546
547/// Process-wide registry of bespoke enricher factories keyed by `type`.
548///
549/// External crates call [`register_builtin`] once at startup (typically
550/// in their `lib.rs` via `ctor` or an explicit init function) to wire a
551/// new `type: <name>` value into the daemon's config loader. Generic
552/// primitives (`template`, `lookup`, `http`, `command`) are not in this
553/// registry; they are constructed directly by the loader.
554///
555/// The registry is global and append-only — registering the same name
556/// twice is an error. Concurrent reads are lock-free via [`std::sync::OnceLock`]
557/// at the outer level and a [`std::sync::RwLock`] at the inner level for the
558/// (rare) `register_builtin` writes.
559fn registry() -> &'static std::sync::RwLock<std::collections::HashMap<String, EnricherFactory>> {
560 use std::sync::OnceLock;
561 static REGISTRY: OnceLock<
562 std::sync::RwLock<std::collections::HashMap<String, EnricherFactory>>,
563 > = OnceLock::new();
564 REGISTRY.get_or_init(|| std::sync::RwLock::new(std::collections::HashMap::new()))
565}
566
567/// Register a bespoke enricher factory under `type: <name>`.
568///
569/// Returns an error if `name` is already registered (registration is
570/// process-global and append-only) or if `name` collides with a built-in
571/// primitive type (`template`, `lookup`, `http`, `command`).
572///
573/// External crates call this once at startup before the daemon loads its
574/// config. After config load, the registry is read-only in practice.
575pub fn register_builtin(name: &str, factory: EnricherFactory) -> Result<(), String> {
576 if matches!(name, "template" | "lookup" | "http" | "command") {
577 return Err(format!(
578 "cannot register '{name}': name is reserved for a built-in primitive"
579 ));
580 }
581 let reg = registry();
582 let mut guard = reg
583 .write()
584 .map_err(|_| "enricher registry poisoned".to_string())?;
585 if guard.contains_key(name) {
586 return Err(format!("enricher type '{name}' is already registered"));
587 }
588 guard.insert(name.to_string(), factory);
589 Ok(())
590}
591
592/// Look up a registered bespoke enricher factory by `type` name.
593///
594/// Returns `None` if `name` is not registered. The daemon config loader
595/// uses this to construct bespoke enrichers; missing names are surfaced
596/// to the operator as a clear startup error.
597pub fn lookup_builtin(name: &str) -> Option<EnricherFactory> {
598 let reg = registry();
599 let guard = reg.read().ok()?;
600 guard.get(name).cloned()
601}
602
/// Empty the bespoke enricher registry. **Test-only**: lets unit tests
/// register / re-register the same name. Not exposed to downstream crates.
///
/// Does nothing if the registry lock is poisoned.
#[cfg(test)]
pub(crate) fn clear_builtin_registry() {
    let Ok(mut table) = registry().write() else {
        return;
    };
    table.clear();
}