rsigma_runtime/enrichment/mod.rs
1//! Post-evaluation enrichment for the rsigma daemon.
2//!
3//! Enrichment runs in the daemon's sink task, after `Engine::evaluate()` has
4//! produced a [`ProcessResult`](rsigma_eval::ProcessResult) (a flat
5//! `Vec<EvaluationResult>`) and before that result is serialized to a sink.
6//! Each enricher inspects an [`EvaluationResult`], optionally fetches
7//! context (HTTP, command, source cache, pure template), and writes the
8//! result into `result.header.enrichments` under a configured
9//! `inject_field`.
10//!
11//! # Architecture
12//!
13//! A single [`Enricher`] trait covers every primitive (`template`, `lookup`,
14//! `http`, `command`) and any bespoke Rust-coded enrichers. Each enricher
15//! declares an [`EnricherKind`] at config time; the [`EnrichmentPipeline`]
16//! filters results by that declared kind against the
17//! [`EvaluationResult::body`] variant before invoking `enrich()`. There are no
18//! parallel `DetectionEnricher` / `CorrelationEnricher` traits and no separate
19//! context types; enrichers consume `&mut EvaluationResult` directly and
20//! match on `result.body` for kind-specific fields.
21//!
22//! # Concurrency
23//!
//! Within a single [`EnrichmentPipeline::run`] call, results are currently
//! enriched one at a time: each result holds one permit from the pipeline's
//! [`tokio::sync::Semaphore`] while its enricher chain runs, so the semaphore
//! only bounds enrichment across `run` calls that overlap in time.
//! Within a single result the enricher chain runs sequentially (so later
//! enrichers can depend on earlier ones via `${detection.enrichments.*}` in a
//! follow-up implementation; for this initial cut the chain is linear and
//! enrichers are independent).
30//!
31//! # Errors
32//!
33//! Failures are scoped per enricher and do not abort the chain by default;
34//! the per-enricher `on_error` policy ([`OnError`]) decides whether to
35//! `skip` the field, inject a JSON `null` for it, or `drop` the entire
36//! result before serialization. Timeouts are enforced via
37//! [`tokio::time::timeout`] using each enricher's [`Enricher::timeout`].
38
39use std::sync::Arc;
40use std::time::Duration;
41
42use async_trait::async_trait;
43use rsigma_eval::{EvaluationResult, ResultBody};
44use tokio::sync::Semaphore;
45
46use crate::metrics::{MetricsHook, NoopMetrics};
47
48mod command;
49mod http;
50pub mod http_cache;
51mod lookup;
52mod scope;
53mod template;
54#[cfg(test)]
55mod tests;
56
57pub use command::{CommandEnricher, OutputFormat};
58pub use http::{
59 DEFAULT_ENRICHER_MAX_RESPONSE_BYTES, HttpEnricher, HttpEnricherClient,
60 build_default_http_client,
61};
62pub use http_cache::{CacheKey, CacheOutcome, HttpResponseCache};
63pub use lookup::LookupEnricher;
64pub use scope::Scope;
65pub use template::{TemplateEnricher, TemplateError, validate_template_namespace};
66
67/// The kind of [`EvaluationResult`] an [`Enricher`] applies to.
68///
69/// Fixed at config load. Used for two things:
70/// 1. **Template-namespace validation at config load**: a `Detection` enricher
71/// may only reference `${detection.*}`; a `Correlation` enricher may only
72/// reference `${correlation.*}`. Cross-namespace references fail fast at
73/// startup.
74/// 2. **Runtime gating in [`EnrichmentPipeline::run`]**: enrichers whose
75/// declared kind does not match `result.body`'s variant are skipped before
76/// [`Enricher::enrich`] is invoked.
77#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
78pub enum EnricherKind {
79 /// Applies to detection results ([`ResultBody::Detection`]).
80 Detection,
81 /// Applies to correlation results ([`ResultBody::Correlation`]).
82 Correlation,
83}
84
85impl EnricherKind {
86 /// String label used in metrics, logs, and config errors.
87 pub fn as_str(&self) -> &'static str {
88 match self {
89 EnricherKind::Detection => "detection",
90 EnricherKind::Correlation => "correlation",
91 }
92 }
93
94 /// Returns true if this kind matches the given result body variant.
95 pub fn matches(&self, body: &ResultBody) -> bool {
96 matches!(
97 (self, body),
98 (EnricherKind::Detection, ResultBody::Detection(_))
99 | (EnricherKind::Correlation, ResultBody::Correlation(_))
100 )
101 }
102}
103
/// Policy applied when a single enricher fails (error return or timeout).
///
/// The policy is read per enricher via [`Enricher::on_error`]. The default
/// is [`OnError::Skip`], so one failing enrichment does not interrupt the
/// result stream.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum OnError {
    /// Log a warning and deliver the result without this enricher's field.
    /// This is the default.
    #[default]
    Skip,
    /// Log a warning and write JSON `null` under the enricher's
    /// `inject_field`, so consumers can tell "attempted and failed" apart
    /// from "never attempted".
    Null,
    /// Log a warning and remove the whole result from the batch before it
    /// is serialized. Intended for dedup / pre-filter style enrichers that
    /// suppress matches based on external context.
    Drop,
}
121
122/// A typed enrichment failure attributed to a specific enricher.
123#[derive(Debug, Clone)]
124pub struct EnrichError {
125 /// Stable ID of the enricher that produced the error.
126 pub enricher_id: String,
127 /// Categorized failure kind.
128 pub kind: EnrichErrorKind,
129}
130
131impl std::fmt::Display for EnrichError {
132 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
133 write!(f, "enricher '{}': {}", self.enricher_id, self.kind)
134 }
135}
136
137impl std::error::Error for EnrichError {}
138
/// Category of an enrichment failure.
///
/// Every variant except [`EnrichErrorKind::Timeout`] carries a free-form
/// message that is appended to the category when displayed.
#[derive(Debug, Clone)]
pub enum EnrichErrorKind {
    /// [`Enricher::enrich`] did not return within the per-enricher timeout.
    Timeout,
    /// The external fetch failed (HTTP non-2xx, connection refused, command
    /// exit status non-zero, etc.).
    Fetch(String),
    /// The external response could not be parsed (e.g. invalid JSON).
    Parse(String),
    /// Evaluating the extract expression (jq / jsonpath / cel) failed.
    Extract(String),
    /// Rendering a template failed at runtime (missing variable in a strict
    /// resolver, invalid expansion, …). Namespace violations are reported
    /// earlier, by the config-load-time validator.
    TemplateRender(String),
}

impl std::fmt::Display for EnrichErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Timeout => f.write_str("timeout"),
            Self::Fetch(reason) => write!(f, "fetch failed: {reason}"),
            Self::Parse(reason) => write!(f, "parse failed: {reason}"),
            Self::Extract(reason) => write!(f, "extract failed: {reason}"),
            Self::TemplateRender(reason) => write!(f, "template render failed: {reason}"),
        }
    }
}
168
169/// Trait implemented by every enrichment primitive (`template`, `lookup`,
170/// `http`, `command`) and by any bespoke Rust-coded named enricher
171/// registered via [`register_builtin`].
172///
173/// Implementations read shared rule metadata from [`EvaluationResult::header`]
174/// and dispatch on `result.body` for kind-specific fields. The pipeline
175/// guarantees that `result.body` matches `self.kind()` before calling
176/// `enrich()`, so implementations may rely on the matching variant
177/// (e.g. via [`EvaluationResult::as_detection`] /
178/// [`EvaluationResult::as_correlation`]).
179///
180/// `enrich` is async to accommodate I/O-bound primitives (HTTP, command).
181/// Pure transformations (`template`) still implement `enrich` as `async fn`
182/// even though they perform no I/O.
183#[async_trait]
184pub trait Enricher: Send + Sync {
185 /// The kind of result this enricher applies to. Fixed at config load.
186 fn kind(&self) -> EnricherKind;
187
188 /// Stable identifier for this enricher instance. Used as a metric label
189 /// and in structured log fields. Conventionally something like
190 /// `asset_lookup_det` or `enrich_hash_virustotal`.
191 fn id(&self) -> &str;
192
193 /// Field name under [`RuleHeader::enrichments`](rsigma_eval::RuleHeader::enrichments)
194 /// that this enricher writes to.
195 fn inject_field(&self) -> &str;
196
197 /// Per-enricher timeout. The pipeline wraps each `enrich()` call in
198 /// [`tokio::time::timeout`] using this value. Defaults to 5 seconds.
199 fn timeout(&self) -> Duration {
200 Duration::from_secs(5)
201 }
202
203 /// Optional scope filter. Applied after the kind-vs-body filter and
204 /// before `enrich()` runs. Default is [`Scope::default`] (always fires).
205 fn scope(&self) -> &Scope;
206
207 /// Behavior when this enricher fails (timeout, fetch error, …).
208 /// Defaults to [`OnError::Skip`].
209 fn on_error(&self) -> OnError {
210 OnError::Skip
211 }
212
213 /// Run the enrichment.
214 ///
215 /// Implementations write into [`RuleHeader::enrichments`](rsigma_eval::RuleHeader::enrichments)
216 /// under the configured [`Self::inject_field`]. The pipeline initializes
217 /// the map (`None` → `Some(empty)`) before invoking the first enricher
218 /// for a given result, so implementations can `unwrap` the map safely.
219 async fn enrich(&self, result: &mut EvaluationResult) -> Result<(), EnrichError>;
220}
221
/// What happened when one enricher was applied to one result.
///
/// Produced by [`EnrichmentPipeline::run_one`] and consumed by
/// [`EnrichmentPipeline::run`], which only acts on [`EnrichOutcome::Drop`];
/// every other outcome lets the enricher chain continue.
enum EnrichOutcome {
    /// The enricher returned `Ok(())` within its timeout.
    Ok,
    /// The enricher failed or timed out under `on_error: skip`; nothing was
    /// written for it.
    Skip,
    /// The enricher failed or timed out under `on_error: null`; the
    /// pipeline wrote `null` under its `inject_field`.
    Null,
    /// The enricher failed or timed out under `on_error: drop`; the caller
    /// must remove this result from the output vec.
    Drop,
    /// The enricher never ran because its kind or scope did not match the
    /// result.
    Filtered,
}
240
241/// Execution surface for a configured set of enrichers.
242///
243/// One pipeline owns one `Vec<Box<dyn Enricher>>` plus a shared
244/// [`Semaphore`] that bounds the number of in-flight enrichments across
245/// all results in a batch.
246///
247/// The pipeline is constructed by the daemon config layer
248/// (`crates/rsigma-cli/src/daemon/enrichment/config.rs`) and held inside
249/// the daemon's sink task. Each [`ProcessResult`](rsigma_eval::ProcessResult)
250/// (a `Vec<EvaluationResult>`) flows through [`EnrichmentPipeline::run`]
251/// before it is serialized.
252pub struct EnrichmentPipeline {
253 enrichers: Vec<Box<dyn Enricher>>,
254 semaphore: Arc<Semaphore>,
255 metrics: Arc<dyn MetricsHook>,
256}
257
258impl std::fmt::Debug for EnrichmentPipeline {
259 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
260 f.debug_struct("EnrichmentPipeline")
261 .field("enrichers", &self.enrichers.len())
262 .field("permits", &self.semaphore.available_permits())
263 .finish()
264 }
265}
266
267impl EnrichmentPipeline {
268 /// Build a pipeline from a list of configured enrichers.
269 ///
270 /// `max_concurrent_enrichments` bounds the number of results that can
271 /// be enriched in parallel; defaults to 16 if zero is passed.
272 /// Metrics default to a no-op sink; call [`Self::with_metrics`] to
273 /// route counters and latency histograms into a real
274 /// [`MetricsHook`] implementation.
275 pub fn new(enrichers: Vec<Box<dyn Enricher>>, max_concurrent_enrichments: usize) -> Self {
276 let permits = if max_concurrent_enrichments == 0 {
277 16
278 } else {
279 max_concurrent_enrichments
280 };
281 Self {
282 enrichers,
283 semaphore: Arc::new(Semaphore::new(permits)),
284 metrics: Arc::new(NoopMetrics),
285 }
286 }
287
288 /// Replace the metrics hook this pipeline reports into. The daemon
289 /// passes its Prometheus-backed `Metrics` here; library consumers
290 /// can pass any [`MetricsHook`] implementation.
291 ///
292 /// Pre-registers `(enricher_id, kind)` for every configured
293 /// enricher so `rsigma_enrichment_total{...}` and
294 /// `rsigma_enrichment_duration_seconds{...}` are emitted on
295 /// `/metrics` from the first scrape, even before any enricher has
296 /// fired.
297 pub fn with_metrics(mut self, metrics: Arc<dyn MetricsHook>) -> Self {
298 for enricher in &self.enrichers {
299 metrics.register_enricher(enricher.id(), enricher.kind().as_str());
300 }
301 self.metrics = metrics;
302 self
303 }
304
305 /// Returns true if no enrichers are configured.
306 ///
307 /// The daemon sink task uses this to skip enrichment work entirely
308 /// (no permit acquisition, no per-result loop) when no enrichers are
309 /// configured.
310 pub fn is_empty(&self) -> bool {
311 self.enrichers.is_empty()
312 }
313
314 /// Number of configured enrichers (across both kinds).
315 pub fn len(&self) -> usize {
316 self.enrichers.len()
317 }
318
319 /// Iterate over the configured enrichers (read-only view, used for
320 /// reload-diff logging and tests).
321 pub fn enrichers(&self) -> impl Iterator<Item = &dyn Enricher> {
322 self.enrichers.iter().map(|e| &**e)
323 }
324
325 /// Run every applicable enricher against each result in `results`.
326 ///
327 /// For each result the pipeline:
328 /// 1. Acquires a global semaphore permit (bounded concurrency).
329 /// 2. Iterates the configured enricher list.
330 /// 3. Skips enrichers whose [`EnricherKind`] does not match the
331 /// result's body variant.
332 /// 4. Skips enrichers whose [`Scope`] excludes this result.
333 /// 5. Wraps each remaining `enrich()` call in
334 /// [`tokio::time::timeout`] using the enricher's timeout.
335 /// 6. On error, applies the enricher's [`OnError`] policy.
336 /// 7. If any enricher in the chain returns the internal `Drop`
337 /// outcome (via an enricher whose [`OnError`] policy is set
338 /// to [`OnError::Drop`]), the result is removed from the
339 /// output.
340 ///
341 /// The pipeline initializes `result.header.enrichments` to
342 /// `Some(empty)` lazily on first successful injection, so the
343 /// `skip_serializing_if = "Option::is_none"` contract on
344 /// `RuleHeader::enrichments` is preserved when no enricher writes.
345 ///
346 /// Currently sequential per result; concurrent across results would
347 /// require splitting `results` into chunks across futures and is left
348 /// for a follow-up tuning pass once we have realistic throughput
349 /// numbers from the integration tests.
350 pub async fn run(&self, results: &mut Vec<EvaluationResult>) {
351 if self.enrichers.is_empty() || results.is_empty() {
352 return;
353 }
354
355 // Single-pass enrichment with drop bookkeeping.
356 //
357 // We cannot use `Vec::retain_mut` together with `.await` (the
358 // closure must be sync), so we collect drop indices first and
359 // apply them in a single linear pass at the end.
360 let mut drop_indices: Vec<usize> = Vec::new();
361
362 for (idx, result) in results.iter_mut().enumerate() {
363 let permit = self.semaphore.clone().acquire_owned().await.ok();
364 if permit.is_none() {
365 // Semaphore closed (only happens at shutdown). Drain
366 // remaining results unenriched rather than blocking.
367 tracing::debug!("Enrichment semaphore closed, draining remaining results");
368 return;
369 }
370 let _permit = permit.unwrap();
371
372 let mut should_drop = false;
373 for enricher in &self.enrichers {
374 match Self::run_one(enricher.as_ref(), result, self.metrics.as_ref()).await {
375 EnrichOutcome::Drop => {
376 should_drop = true;
377 break;
378 }
379 EnrichOutcome::Ok
380 | EnrichOutcome::Skip
381 | EnrichOutcome::Null
382 | EnrichOutcome::Filtered => {}
383 }
384 }
385 if should_drop {
386 drop_indices.push(idx);
387 }
388 }
389
390 if !drop_indices.is_empty() {
391 // Remove from the back so earlier indices stay valid.
392 for idx in drop_indices.into_iter().rev() {
393 results.swap_remove(idx);
394 }
395 }
396 }
397
398 /// Run a single enricher against a single result, applying the
399 /// kind-vs-body filter, scope filter, timeout, and on_error policy.
400 /// Records `rsigma_enrichment_total{enricher_id, kind, status}` and
401 /// `rsigma_enrichment_duration_seconds{enricher_id, kind}` via the
402 /// configured `MetricsHook` for every non-filtered call.
403 async fn run_one(
404 enricher: &dyn Enricher,
405 result: &mut EvaluationResult,
406 metrics: &dyn MetricsHook,
407 ) -> EnrichOutcome {
408 if !enricher.kind().matches(&result.body) {
409 return EnrichOutcome::Filtered;
410 }
411 if !enricher.scope().matches(result) {
412 return EnrichOutcome::Filtered;
413 }
414
415 let inject_field = enricher.inject_field().to_string();
416 let timeout = enricher.timeout();
417 let id = enricher.id().to_string();
418 let kind_label = enricher.kind().as_str();
419 let on_error = enricher.on_error();
420
421 metrics.on_enrichment_queue_depth_change(1);
422 let started = std::time::Instant::now();
423 let outcome = tokio::time::timeout(timeout, enricher.enrich(result)).await;
424 let elapsed = started.elapsed().as_secs_f64();
425 metrics.on_enrichment_queue_depth_change(-1);
426
427 let err = match outcome {
428 Ok(Ok(())) => {
429 metrics.on_enrichment_completed(&id, kind_label, "success", elapsed);
430 return EnrichOutcome::Ok;
431 }
432 Ok(Err(e)) => e,
433 Err(_) => EnrichError {
434 enricher_id: id.clone(),
435 kind: EnrichErrorKind::Timeout,
436 },
437 };
438
439 let is_timeout = matches!(err.kind, EnrichErrorKind::Timeout);
440 match on_error {
441 OnError::Skip => {
442 tracing::warn!(
443 enricher_id = %id,
444 kind = %kind_label,
445 error = %err,
446 "Enricher failed, skipping"
447 );
448 metrics.on_enrichment_completed(
449 &id,
450 kind_label,
451 if is_timeout { "timeout" } else { "skip" },
452 elapsed,
453 );
454 EnrichOutcome::Skip
455 }
456 OnError::Null => {
457 tracing::warn!(
458 enricher_id = %id,
459 kind = %kind_label,
460 error = %err,
461 "Enricher failed, injecting null"
462 );
463 let map = result
464 .header
465 .enrichments
466 .get_or_insert_with(serde_json::Map::new);
467 map.insert(inject_field, serde_json::Value::Null);
468 metrics.on_enrichment_completed(
469 &id,
470 kind_label,
471 if is_timeout { "timeout" } else { "error" },
472 elapsed,
473 );
474 EnrichOutcome::Null
475 }
476 OnError::Drop => {
477 tracing::warn!(
478 enricher_id = %id,
479 kind = %kind_label,
480 error = %err,
481 "Enricher failed, dropping result"
482 );
483 metrics.on_enrichment_completed(&id, kind_label, "drop", elapsed);
484 EnrichOutcome::Drop
485 }
486 }
487 }
488}
489
490impl Default for EnrichmentPipeline {
491 fn default() -> Self {
492 Self::new(Vec::new(), 16)
493 }
494}
495
496impl Clone for EnrichmentPipeline {
497 fn clone(&self) -> Self {
498 // Boxes of `dyn Enricher` are not `Clone`, so a true deep clone
499 // is not possible here. The hot-reload path always rebuilds the
500 // pipeline from config rather than cloning, but `Clone` is
501 // useful for tests and for `ArcSwap` adapter code that wants a
502 // throwaway snapshot. Returning an empty pipeline that shares
503 // the metrics hook is the safest behaviour: a misuse degrades
504 // to "no enrichment" rather than panicking or silently
505 // double-counting.
506 Self {
507 enrichers: Vec::new(),
508 semaphore: Arc::clone(&self.semaphore),
509 metrics: Arc::clone(&self.metrics),
510 }
511 }
512}
513
514/// Helper for [`Enricher::enrich`] implementations: write `value` into
515/// `result.header.enrichments` under `inject_field`, allocating the map
516/// if it was previously `None`.
517pub fn inject_enrichment(
518 result: &mut EvaluationResult,
519 inject_field: &str,
520 value: serde_json::Value,
521) {
522 let map = result
523 .header
524 .enrichments
525 .get_or_insert_with(serde_json::Map::new);
526 map.insert(inject_field.to_string(), value);
527}
528
529// ---------------------------------------------------------------------------
530// Bespoke enricher registration
531// ---------------------------------------------------------------------------
532
533/// Factory function signature used by [`register_builtin`].
534///
535/// External crates that ship a bespoke enricher type register a
536/// `Box<dyn Fn(&serde_json::Value) -> Result<Box<dyn Enricher>, String>>`
537/// so the daemon config layer can construct the enricher from its YAML
538/// config block at startup. The `serde_json::Value` argument is the raw
539/// enricher-config block (after `kind` / `id` / `type` fields are
540/// extracted by the loader).
541pub type EnricherFactory =
542 Arc<dyn Fn(&serde_json::Value) -> Result<Box<dyn Enricher>, String> + Send + Sync>;
543
544/// Process-wide registry of bespoke enricher factories keyed by `type`.
545///
546/// External crates call [`register_builtin`] once at startup (typically
547/// in their `lib.rs` via `ctor` or an explicit init function) to wire a
548/// new `type: <name>` value into the daemon's config loader. Generic
549/// primitives (`template`, `lookup`, `http`, `command`) are not in this
550/// registry; they are constructed directly by the loader.
551///
552/// The registry is global and append-only — registering the same name
553/// twice is an error. Concurrent reads are lock-free via [`std::sync::OnceLock`]
554/// at the outer level and a [`std::sync::RwLock`] at the inner level for the
555/// (rare) `register_builtin` writes.
556fn registry() -> &'static std::sync::RwLock<std::collections::HashMap<String, EnricherFactory>> {
557 use std::sync::OnceLock;
558 static REGISTRY: OnceLock<
559 std::sync::RwLock<std::collections::HashMap<String, EnricherFactory>>,
560 > = OnceLock::new();
561 REGISTRY.get_or_init(|| std::sync::RwLock::new(std::collections::HashMap::new()))
562}
563
564/// Register a bespoke enricher factory under `type: <name>`.
565///
566/// Returns an error if `name` is already registered (registration is
567/// process-global and append-only) or if `name` collides with a built-in
568/// primitive type (`template`, `lookup`, `http`, `command`).
569///
570/// External crates call this once at startup before the daemon loads its
571/// config. After config load, the registry is read-only in practice.
572pub fn register_builtin(name: &str, factory: EnricherFactory) -> Result<(), String> {
573 if matches!(name, "template" | "lookup" | "http" | "command") {
574 return Err(format!(
575 "cannot register '{name}': name is reserved for a built-in primitive"
576 ));
577 }
578 let reg = registry();
579 let mut guard = reg
580 .write()
581 .map_err(|_| "enricher registry poisoned".to_string())?;
582 if guard.contains_key(name) {
583 return Err(format!("enricher type '{name}' is already registered"));
584 }
585 guard.insert(name.to_string(), factory);
586 Ok(())
587}
588
589/// Look up a registered bespoke enricher factory by `type` name.
590///
591/// Returns `None` if `name` is not registered. The daemon config loader
592/// uses this to construct bespoke enrichers; missing names are surfaced
593/// to the operator as a clear startup error.
594pub fn lookup_builtin(name: &str) -> Option<EnricherFactory> {
595 let reg = registry();
596 let guard = reg.read().ok()?;
597 guard.get(name).cloned()
598}
599
/// Remove every entry from the bespoke enricher registry.
///
/// **Test-only**: lets unit tests register / re-register the same name.
/// Not exposed to downstream crates. If the registry lock is poisoned the
/// call does nothing.
#[cfg(test)]
pub(crate) fn clear_builtin_registry() {
    let Ok(mut factories) = registry().write() else {
        return;
    };
    factories.clear();
}