rsigma_runtime/enrichment/mod.rs
1//! Post-evaluation enrichment for the rsigma daemon.
2//!
3//! Enrichment runs in the daemon's sink task, after `Engine::evaluate()` has
4//! produced a [`ProcessResult`] (a flat `Vec<EvaluationResult>`) and before
5//! that result is serialized to a sink. Each enricher inspects an
6//! [`EvaluationResult`], optionally fetches context (HTTP, command, source
7//! cache, pure template), and writes the result into
8//! `result.header.enrichments` under a configured `inject_field`.
9//!
10//! # Architecture
11//!
12//! A single [`Enricher`] trait covers every primitive (`template`, `lookup`,
13//! `http`, `command`) and any bespoke Rust-coded enrichers. Each enricher
14//! declares an [`EnricherKind`] at config time; the [`EnrichmentPipeline`]
15//! filters results by that declared kind against the
16//! [`EvaluationResult::body`] variant before invoking `enrich()`. There are no
17//! parallel `DetectionEnricher` / `CorrelationEnricher` traits and no separate
18//! context types; enrichers consume `&mut EvaluationResult` directly and
19//! match on `result.body` for kind-specific fields.
20//!
//! # Concurrency
//!
//! Results within a single sink batch are currently enriched one at a time.
//! Each result's chain runs while holding one permit from the single
//! [`tokio::sync::Semaphore`] owned by the pipeline, so the semaphore bounds
//! in-flight enrichment across any `run` calls that share it; enriching
//! several results of one batch concurrently is left for a follow-up.
//! Within a single result the enricher chain runs sequentially (so later
//! enrichers can depend on earlier ones via `${detection.enrichments.*}` in a
//! follow-up implementation; for this initial cut the chain is linear and
//! enrichers are independent).
29//!
30//! # Errors
31//!
32//! Failures are scoped per enricher and do not abort the chain by default;
33//! the per-enricher `on_error` policy ([`OnError`]) decides whether to
34//! `skip` the field, inject a JSON `null` for it, or `drop` the entire
35//! result before serialization. Timeouts are enforced via
36//! [`tokio::time::timeout`] using each enricher's [`Enricher::timeout`].
37
38use std::sync::Arc;
39use std::time::Duration;
40
41use async_trait::async_trait;
42use rsigma_eval::{EvaluationResult, ResultBody};
43use tokio::sync::Semaphore;
44
45use crate::metrics::{MetricsHook, NoopMetrics};
46
47mod command;
48mod http;
49pub mod http_cache;
50mod lookup;
51mod scope;
52mod template;
53#[cfg(test)]
54mod tests;
55
56pub use command::{CommandEnricher, OutputFormat};
57pub use http::{HttpEnricher, HttpEnricherClient, build_default_http_client};
58pub use http_cache::{CacheKey, CacheOutcome, HttpResponseCache};
59pub use lookup::LookupEnricher;
60pub use scope::Scope;
61pub use template::{TemplateEnricher, TemplateError, validate_template_namespace};
62
/// Which flavour of [`EvaluationResult`] an [`Enricher`] is allowed to touch.
///
/// The kind is decided when the configuration is loaded and serves two
/// purposes:
///
/// 1. **Config-load template validation** — a `Detection` enricher may only
///    reference `${detection.*}` and a `Correlation` enricher may only
///    reference `${correlation.*}`; a cross-namespace reference is rejected
///    at startup.
/// 2. **Runtime gating in [`EnrichmentPipeline::run`]** — an enricher whose
///    kind differs from the variant of `result.body` is skipped before
///    [`Enricher::enrich`] is ever called.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum EnricherKind {
    /// Targets detection results ([`ResultBody::Detection`]).
    Detection,
    /// Targets correlation results ([`ResultBody::Correlation`]).
    Correlation,
}
80
81impl EnricherKind {
82 /// String label used in metrics, logs, and config errors.
83 pub fn as_str(&self) -> &'static str {
84 match self {
85 EnricherKind::Detection => "detection",
86 EnricherKind::Correlation => "correlation",
87 }
88 }
89
90 /// Returns true if this kind matches the given result body variant.
91 pub fn matches(&self, body: &ResultBody) -> bool {
92 matches!(
93 (self, body),
94 (EnricherKind::Detection, ResultBody::Detection(_))
95 | (EnricherKind::Correlation, ResultBody::Correlation(_))
96 )
97 }
98}
99
/// Policy applied when an enricher fails (timeout, fetch error, parse
/// error, …).
///
/// The policy is chosen per enricher. The default is [`OnError::Skip`], so
/// one failing enrichment never interrupts the result stream.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum OnError {
    /// Default. The result is delivered without this enricher's field and a
    /// warning is logged.
    #[default]
    Skip,
    /// Write `null` under `inject_field`, giving downstream consumers an
    /// explicit "we tried" marker instead of an ambiguous missing field.
    Null,
    /// Remove the whole result before it is serialized. Intended for
    /// dedup / pre-filter style enrichers that deliberately discard matches
    /// based on external context.
    Drop,
}
117
118/// A typed enrichment failure attributed to a specific enricher.
119#[derive(Debug, Clone)]
120pub struct EnrichError {
121 /// Stable ID of the enricher that produced the error.
122 pub enricher_id: String,
123 /// Categorized failure kind.
124 pub kind: EnrichErrorKind,
125}
126
127impl std::fmt::Display for EnrichError {
128 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
129 write!(f, "enricher '{}': {}", self.enricher_id, self.kind)
130 }
131}
132
133impl std::error::Error for EnrichError {}
134
/// The category of an enrichment failure.
#[derive(Clone, Debug)]
pub enum EnrichErrorKind {
    /// [`Enricher::enrich`] did not return before the per-enricher timeout
    /// elapsed.
    Timeout,
    /// The external fetch failed (HTTP non-2xx, connection refused, command
    /// exit status non-zero, etc.).
    Fetch(String),
    /// The external response could not be parsed (e.g. invalid JSON).
    Parse(String),
    /// Evaluating the extract expression (jq / jsonpath / cel) failed.
    Extract(String),
    /// Rendering a template failed at runtime (missing variable in a strict
    /// resolver, invalid expansion, …). Namespace violations are caught
    /// earlier by the config-load-time validator.
    TemplateRender(String),
}

impl std::fmt::Display for EnrichErrorKind {
    /// Renders `timeout`, or `<category> failed: <detail>` for every variant
    /// that carries a message.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (category, detail) = match self {
            Self::Timeout => return f.write_str("timeout"),
            Self::Fetch(detail) => ("fetch", detail),
            Self::Parse(detail) => ("parse", detail),
            Self::Extract(detail) => ("extract", detail),
            Self::TemplateRender(detail) => ("template render", detail),
        };
        write!(f, "{category} failed: {detail}")
    }
}
164
165/// Trait implemented by every enrichment primitive (`template`, `lookup`,
166/// `http`, `command`) and by any bespoke Rust-coded named enricher
167/// registered via [`register_builtin`].
168///
169/// Implementations read shared rule metadata from [`EvaluationResult::header`]
170/// and dispatch on `result.body` for kind-specific fields. The pipeline
171/// guarantees that `result.body` matches `self.kind()` before calling
172/// `enrich()`, so implementations may rely on the matching variant
173/// (e.g. via [`EvaluationResult::as_detection`] /
174/// [`EvaluationResult::as_correlation`]).
175///
176/// `enrich` is async to accommodate I/O-bound primitives (HTTP, command).
177/// Pure transformations (`template`) still implement `enrich` as `async fn`
178/// even though they perform no I/O.
179#[async_trait]
180pub trait Enricher: Send + Sync {
181 /// The kind of result this enricher applies to. Fixed at config load.
182 fn kind(&self) -> EnricherKind;
183
184 /// Stable identifier for this enricher instance. Used as a metric label
185 /// and in structured log fields. Conventionally something like
186 /// `asset_lookup_det` or `enrich_hash_virustotal`.
187 fn id(&self) -> &str;
188
189 /// Field name under [`RuleHeader::enrichments`](rsigma_eval::RuleHeader::enrichments)
190 /// that this enricher writes to.
191 fn inject_field(&self) -> &str;
192
193 /// Per-enricher timeout. The pipeline wraps each `enrich()` call in
194 /// [`tokio::time::timeout`] using this value. Defaults to 5 seconds.
195 fn timeout(&self) -> Duration {
196 Duration::from_secs(5)
197 }
198
199 /// Optional scope filter. Applied after the kind-vs-body filter and
200 /// before `enrich()` runs. Default is [`Scope::default`] (always fires).
201 fn scope(&self) -> &Scope;
202
203 /// Behavior when this enricher fails (timeout, fetch error, …).
204 /// Defaults to [`OnError::Skip`].
205 fn on_error(&self) -> OnError {
206 OnError::Skip
207 }
208
209 /// Run the enrichment.
210 ///
211 /// Implementations write into [`RuleHeader::enrichments`](rsigma_eval::RuleHeader::enrichments)
212 /// under the configured [`Self::inject_field`]. The pipeline initializes
213 /// the map (`None` → `Some(empty)`) before invoking the first enricher
214 /// for a given result, so implementations can `unwrap` the map safely.
215 async fn enrich(&self, result: &mut EvaluationResult) -> Result<(), EnrichError>;
216}
217
/// What happened when one enricher was applied to one result.
///
/// Produced by [`EnrichmentPipeline::run_one`] and consumed by the pipeline
/// driver, which uses it to decide whether the result must be dropped or the
/// chain may simply continue.
enum EnrichOutcome {
    /// The enricher ran to completion and may have written into
    /// `enrichments`.
    Ok,
    /// The enricher failed or timed out under `on_error: skip`.
    Skip,
    /// The enricher failed or timed out under `on_error: null`; the pipeline
    /// wrote `null` under `inject_field`.
    Null,
    /// The enricher failed or timed out under `on_error: drop`; the pipeline
    /// must remove this result from the output vec.
    Drop,
    /// The enricher never ran because its kind or scope did not match.
    Filtered,
}
236
237/// Execution surface for a configured set of enrichers.
238///
239/// One pipeline owns one `Vec<Box<dyn Enricher>>` plus a shared
240/// [`Semaphore`] that bounds the number of in-flight enrichments across
241/// all results in a batch.
242///
243/// The pipeline is constructed by the daemon config layer
244/// (`crates/rsigma-cli/src/daemon/enrichment/config.rs`) and held inside
245/// the daemon's sink task. Each [`ProcessResult`](rsigma_eval::ProcessResult)
246/// (a `Vec<EvaluationResult>`) flows through [`EnrichmentPipeline::run`]
247/// before it is serialized.
248pub struct EnrichmentPipeline {
249 enrichers: Vec<Box<dyn Enricher>>,
250 semaphore: Arc<Semaphore>,
251 metrics: Arc<dyn MetricsHook>,
252}
253
254impl std::fmt::Debug for EnrichmentPipeline {
255 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
256 f.debug_struct("EnrichmentPipeline")
257 .field("enrichers", &self.enrichers.len())
258 .field("permits", &self.semaphore.available_permits())
259 .finish()
260 }
261}
262
263impl EnrichmentPipeline {
264 /// Build a pipeline from a list of configured enrichers.
265 ///
266 /// `max_concurrent_enrichments` bounds the number of results that can
267 /// be enriched in parallel; defaults to 16 if zero is passed.
268 /// Metrics default to a no-op sink; call [`Self::with_metrics`] to
269 /// route counters and latency histograms into a real
270 /// [`MetricsHook`] implementation.
271 pub fn new(enrichers: Vec<Box<dyn Enricher>>, max_concurrent_enrichments: usize) -> Self {
272 let permits = if max_concurrent_enrichments == 0 {
273 16
274 } else {
275 max_concurrent_enrichments
276 };
277 Self {
278 enrichers,
279 semaphore: Arc::new(Semaphore::new(permits)),
280 metrics: Arc::new(NoopMetrics),
281 }
282 }
283
284 /// Replace the metrics hook this pipeline reports into. The daemon
285 /// passes its Prometheus-backed `Metrics` here; library consumers
286 /// can pass any [`MetricsHook`] implementation.
287 ///
288 /// Pre-registers `(enricher_id, kind)` for every configured
289 /// enricher so `rsigma_enrichment_total{...}` and
290 /// `rsigma_enrichment_duration_seconds{...}` are emitted on
291 /// `/metrics` from the first scrape, even before any enricher has
292 /// fired.
293 pub fn with_metrics(mut self, metrics: Arc<dyn MetricsHook>) -> Self {
294 for enricher in &self.enrichers {
295 metrics.register_enricher(enricher.id(), enricher.kind().as_str());
296 }
297 self.metrics = metrics;
298 self
299 }
300
301 /// Returns true if no enrichers are configured.
302 ///
303 /// The daemon sink task uses this to skip enrichment work entirely
304 /// (no permit acquisition, no per-result loop) when no enrichers are
305 /// configured.
306 pub fn is_empty(&self) -> bool {
307 self.enrichers.is_empty()
308 }
309
310 /// Number of configured enrichers (across both kinds).
311 pub fn len(&self) -> usize {
312 self.enrichers.len()
313 }
314
315 /// Iterate over the configured enrichers (read-only view, used for
316 /// reload-diff logging and tests).
317 pub fn enrichers(&self) -> impl Iterator<Item = &dyn Enricher> {
318 self.enrichers.iter().map(|e| &**e)
319 }
320
321 /// Run every applicable enricher against each result in `results`.
322 ///
323 /// For each result the pipeline:
324 /// 1. Acquires a global semaphore permit (bounded concurrency).
325 /// 2. Iterates the configured enricher list.
326 /// 3. Skips enrichers whose [`EnricherKind`] does not match the
327 /// result's body variant.
328 /// 4. Skips enrichers whose [`Scope`] excludes this result.
329 /// 5. Wraps each remaining `enrich()` call in
330 /// [`tokio::time::timeout`] using the enricher's timeout.
331 /// 6. On error, applies the enricher's [`OnError`] policy.
332 /// 7. If any enricher in the chain returns
333 /// [`EnrichOutcome::Drop`], the result is removed from the output.
334 ///
335 /// The pipeline initializes `result.header.enrichments` to
336 /// `Some(empty)` lazily on first successful injection, so the
337 /// `skip_serializing_if = "Option::is_none"` contract on
338 /// `RuleHeader::enrichments` is preserved when no enricher writes.
339 ///
340 /// Currently sequential per result; concurrent across results would
341 /// require splitting `results` into chunks across futures and is left
342 /// for a follow-up tuning pass once we have realistic throughput
343 /// numbers from the integration tests.
344 pub async fn run(&self, results: &mut Vec<EvaluationResult>) {
345 if self.enrichers.is_empty() || results.is_empty() {
346 return;
347 }
348
349 // Single-pass enrichment with drop bookkeeping.
350 //
351 // We cannot use `Vec::retain_mut` together with `.await` (the
352 // closure must be sync), so we collect drop indices first and
353 // apply them in a single linear pass at the end.
354 let mut drop_indices: Vec<usize> = Vec::new();
355
356 for (idx, result) in results.iter_mut().enumerate() {
357 let permit = self.semaphore.clone().acquire_owned().await.ok();
358 if permit.is_none() {
359 // Semaphore closed (only happens at shutdown). Drain
360 // remaining results unenriched rather than blocking.
361 tracing::debug!("Enrichment semaphore closed, draining remaining results");
362 return;
363 }
364 let _permit = permit.unwrap();
365
366 let mut should_drop = false;
367 for enricher in &self.enrichers {
368 match Self::run_one(enricher.as_ref(), result, self.metrics.as_ref()).await {
369 EnrichOutcome::Drop => {
370 should_drop = true;
371 break;
372 }
373 EnrichOutcome::Ok
374 | EnrichOutcome::Skip
375 | EnrichOutcome::Null
376 | EnrichOutcome::Filtered => {}
377 }
378 }
379 if should_drop {
380 drop_indices.push(idx);
381 }
382 }
383
384 if !drop_indices.is_empty() {
385 // Remove from the back so earlier indices stay valid.
386 for idx in drop_indices.into_iter().rev() {
387 results.swap_remove(idx);
388 }
389 }
390 }
391
392 /// Run a single enricher against a single result, applying the
393 /// kind-vs-body filter, scope filter, timeout, and on_error policy.
394 /// Records `rsigma_enrichment_total{enricher_id, kind, status}` and
395 /// `rsigma_enrichment_duration_seconds{enricher_id, kind}` via the
396 /// configured `MetricsHook` for every non-filtered call.
397 async fn run_one(
398 enricher: &dyn Enricher,
399 result: &mut EvaluationResult,
400 metrics: &dyn MetricsHook,
401 ) -> EnrichOutcome {
402 if !enricher.kind().matches(&result.body) {
403 return EnrichOutcome::Filtered;
404 }
405 if !enricher.scope().matches(result) {
406 return EnrichOutcome::Filtered;
407 }
408
409 let inject_field = enricher.inject_field().to_string();
410 let timeout = enricher.timeout();
411 let id = enricher.id().to_string();
412 let kind_label = enricher.kind().as_str();
413 let on_error = enricher.on_error();
414
415 metrics.on_enrichment_queue_depth_change(1);
416 let started = std::time::Instant::now();
417 let outcome = tokio::time::timeout(timeout, enricher.enrich(result)).await;
418 let elapsed = started.elapsed().as_secs_f64();
419 metrics.on_enrichment_queue_depth_change(-1);
420
421 let err = match outcome {
422 Ok(Ok(())) => {
423 metrics.on_enrichment_completed(&id, kind_label, "success", elapsed);
424 return EnrichOutcome::Ok;
425 }
426 Ok(Err(e)) => e,
427 Err(_) => EnrichError {
428 enricher_id: id.clone(),
429 kind: EnrichErrorKind::Timeout,
430 },
431 };
432
433 let is_timeout = matches!(err.kind, EnrichErrorKind::Timeout);
434 match on_error {
435 OnError::Skip => {
436 tracing::warn!(
437 enricher_id = %id,
438 kind = %kind_label,
439 error = %err,
440 "Enricher failed, skipping"
441 );
442 metrics.on_enrichment_completed(
443 &id,
444 kind_label,
445 if is_timeout { "timeout" } else { "skip" },
446 elapsed,
447 );
448 EnrichOutcome::Skip
449 }
450 OnError::Null => {
451 tracing::warn!(
452 enricher_id = %id,
453 kind = %kind_label,
454 error = %err,
455 "Enricher failed, injecting null"
456 );
457 let map = result
458 .header
459 .enrichments
460 .get_or_insert_with(serde_json::Map::new);
461 map.insert(inject_field, serde_json::Value::Null);
462 metrics.on_enrichment_completed(
463 &id,
464 kind_label,
465 if is_timeout { "timeout" } else { "error" },
466 elapsed,
467 );
468 EnrichOutcome::Null
469 }
470 OnError::Drop => {
471 tracing::warn!(
472 enricher_id = %id,
473 kind = %kind_label,
474 error = %err,
475 "Enricher failed, dropping result"
476 );
477 metrics.on_enrichment_completed(&id, kind_label, "drop", elapsed);
478 EnrichOutcome::Drop
479 }
480 }
481 }
482}
483
484impl Default for EnrichmentPipeline {
485 fn default() -> Self {
486 Self::new(Vec::new(), 16)
487 }
488}
489
490impl Clone for EnrichmentPipeline {
491 fn clone(&self) -> Self {
492 // Boxes of `dyn Enricher` are not `Clone`, so a true deep clone
493 // is not possible here. The hot-reload path always rebuilds the
494 // pipeline from config rather than cloning, but `Clone` is
495 // useful for tests and for `ArcSwap` adapter code that wants a
496 // throwaway snapshot. Returning an empty pipeline that shares
497 // the metrics hook is the safest behaviour: a misuse degrades
498 // to "no enrichment" rather than panicking or silently
499 // double-counting.
500 Self {
501 enrichers: Vec::new(),
502 semaphore: Arc::clone(&self.semaphore),
503 metrics: Arc::clone(&self.metrics),
504 }
505 }
506}
507
508/// Helper for [`Enricher::enrich`] implementations: write `value` into
509/// `result.header.enrichments` under `inject_field`, allocating the map
510/// if it was previously `None`.
511pub fn inject_enrichment(
512 result: &mut EvaluationResult,
513 inject_field: &str,
514 value: serde_json::Value,
515) {
516 let map = result
517 .header
518 .enrichments
519 .get_or_insert_with(serde_json::Map::new);
520 map.insert(inject_field.to_string(), value);
521}
522
523// ---------------------------------------------------------------------------
524// Bespoke enricher registration
525// ---------------------------------------------------------------------------
526
527/// Factory function signature used by [`register_builtin`].
528///
529/// External crates that ship a bespoke enricher type register a
530/// `Box<dyn Fn(&serde_json::Value) -> Result<Box<dyn Enricher>, String>>`
531/// so the daemon config layer can construct the enricher from its YAML
532/// config block at startup. The `serde_json::Value` argument is the raw
533/// enricher-config block (after `kind` / `id` / `type` fields are
534/// extracted by the loader).
535pub type EnricherFactory =
536 Arc<dyn Fn(&serde_json::Value) -> Result<Box<dyn Enricher>, String> + Send + Sync>;
537
538/// Process-wide registry of bespoke enricher factories keyed by `type`.
539///
540/// External crates call [`register_builtin`] once at startup (typically
541/// in their `lib.rs` via `ctor` or an explicit init function) to wire a
542/// new `type: <name>` value into the daemon's config loader. Generic
543/// primitives (`template`, `lookup`, `http`, `command`) are not in this
544/// registry; they are constructed directly by the loader.
545///
546/// The registry is global and append-only — registering the same name
547/// twice is an error. Concurrent reads are lock-free via [`std::sync::OnceLock`]
548/// at the outer level and a [`std::sync::RwLock`] at the inner level for the
549/// (rare) `register_builtin` writes.
550fn registry() -> &'static std::sync::RwLock<std::collections::HashMap<String, EnricherFactory>> {
551 use std::sync::OnceLock;
552 static REGISTRY: OnceLock<
553 std::sync::RwLock<std::collections::HashMap<String, EnricherFactory>>,
554 > = OnceLock::new();
555 REGISTRY.get_or_init(|| std::sync::RwLock::new(std::collections::HashMap::new()))
556}
557
558/// Register a bespoke enricher factory under `type: <name>`.
559///
560/// Returns an error if `name` is already registered (registration is
561/// process-global and append-only) or if `name` collides with a built-in
562/// primitive type (`template`, `lookup`, `http`, `command`).
563///
564/// External crates call this once at startup before the daemon loads its
565/// config. After config load, the registry is read-only in practice.
566pub fn register_builtin(name: &str, factory: EnricherFactory) -> Result<(), String> {
567 if matches!(name, "template" | "lookup" | "http" | "command") {
568 return Err(format!(
569 "cannot register '{name}': name is reserved for a built-in primitive"
570 ));
571 }
572 let reg = registry();
573 let mut guard = reg
574 .write()
575 .map_err(|_| "enricher registry poisoned".to_string())?;
576 if guard.contains_key(name) {
577 return Err(format!("enricher type '{name}' is already registered"));
578 }
579 guard.insert(name.to_string(), factory);
580 Ok(())
581}
582
583/// Look up a registered bespoke enricher factory by `type` name.
584///
585/// Returns `None` if `name` is not registered. The daemon config loader
586/// uses this to construct bespoke enrichers; missing names are surfaced
587/// to the operator as a clear startup error.
588pub fn lookup_builtin(name: &str) -> Option<EnricherFactory> {
589 let reg = registry();
590 let guard = reg.read().ok()?;
591 guard.get(name).cloned()
592}
593
/// Empty the bespoke enricher registry. **Test-only**: lets unit tests
/// register / re-register the same name. Not exposed to downstream crates.
///
/// A poisoned registry lock is left untouched.
#[cfg(test)]
pub(crate) fn clear_builtin_registry() {
    let Ok(mut factories) = registry().write() else {
        return;
    };
    factories.clear();
}