use std::collections::HashMap;
use std::path::Path;
use std::time::Duration;
#[cfg(test)]
use rsigma_runtime::NoopMetrics;
use rsigma_runtime::{
CommandEnricher, EnricherKind, EnrichmentPipeline, HttpEnricher, HttpEnricherClient,
HttpResponseCache, LookupEnricher, MetricsHook, OnError, OutputFormat, Scope, SourceCache,
TemplateEnricher, build_default_http_client, lookup_builtin, validate_template_namespace,
};
use serde::Deserialize;
/// Per-enricher timeout used when an entry does not set `timeout`.
const DEFAULT_ENRICHER_TIMEOUT: Duration = Duration::from_secs(5);
/// Pipeline concurrency cap used when `max_concurrent_enrichments` is not set.
const DEFAULT_MAX_CONCURRENT_ENRICHMENTS: usize = 16;
/// Top-level shape of an enrichers YAML file, as parsed by `load_enrichers_file`.
#[derive(Debug, Clone, Deserialize)]
pub struct EnrichersFile {
    /// Concurrency cap handed to `EnrichmentPipeline::new`;
    /// `DEFAULT_MAX_CONCURRENT_ENRICHMENTS` is used when this key is absent.
    #[serde(default)]
    pub max_concurrent_enrichments: Option<usize>,
    /// Enricher entries; they are built in file order. Defaults to an empty list.
    #[serde(default)]
    pub enrichers: Vec<EnricherConfig>,
}
/// One entry of the `enrichers:` list.
///
/// The struct is a superset of every built-in type's options. Which fields are
/// required depends on `type`; `build_one` enforces that. Fields that are not
/// meaningful for the chosen type are accepted and ignored by the built-ins.
/// For bespoke types, the whole entry is re-serialized and handed to the
/// registered factory.
#[derive(Debug, Clone, Deserialize)]
pub struct EnricherConfig {
    /// Identifier used in every error message about this entry.
    pub id: String,
    /// `detection` or `correlation`; also selects which template namespace the
    /// templated fields of this entry may reference.
    pub kind: KindLabel,
    /// YAML key `type`. The built-ins are `template`, `http`, `command` and
    /// `lookup`; any other value is resolved through `lookup_builtin`.
    #[serde(rename = "type")]
    pub type_name: String,
    /// Passed through to the runtime enricher. Presumably this is the name of the
    /// output field the result is written to — confirm against `rsigma_runtime`.
    pub inject_field: String,
    /// humantime string (e.g. `2500ms`). Falls back to `DEFAULT_ENRICHER_TIMEOUT`.
    #[serde(default, with = "humantime_opt")]
    pub timeout: Option<Duration>,
    /// Failure policy: `skip` (default), `null` or `drop`.
    #[serde(default)]
    pub on_error: OnErrorLabel,
    /// Optional rule/tag/level filter. When absent, `Scope::default()` is used.
    #[serde(default)]
    pub scope: Option<ScopeConfig>,
    /// `template` type: the template string (required for that type).
    #[serde(default)]
    pub template: Option<String>,
    /// `http` type: request URL (required for that type). May be templated.
    #[serde(default)]
    pub url: Option<String>,
    /// `http` type: request method; defaults to `GET`.
    #[serde(default)]
    pub method: Option<String>,
    /// `http` type: request headers. Values may be templated.
    #[serde(default)]
    pub headers: HashMap<String, String>,
    /// `http` type: optional request body. May be templated.
    #[serde(default)]
    pub body: Option<String>,
    /// `http` type: humantime TTL for the response cache. Unset means
    /// `Duration::ZERO` is passed to `HttpResponseCache::new`.
    #[serde(default, with = "humantime_opt")]
    pub cache_ttl: Option<Duration>,
    /// `http`/`lookup` types: optional extraction expression.
    #[serde(default)]
    pub extract: Option<String>,
    /// Language of `extract`: `jq` (default), `jsonpath` or `cel`.
    #[serde(default)]
    pub extract_type: Option<String>,
    /// `command` type: argv; must be non-empty for that type. Items may be templated.
    #[serde(default)]
    pub command: Vec<String>,
    /// `command` type: environment variables. Values may be templated.
    #[serde(default)]
    pub env: HashMap<String, String>,
    /// `command` type: how the command output is interpreted (`json` default, or `raw`).
    #[serde(default)]
    pub output: OutputFormatLabel,
    /// `lookup` type: name of the dynamic source to read (required for that type).
    #[serde(default)]
    pub source: Option<String>,
    /// `lookup` type: optional default value, passed through to `LookupEnricher`.
    #[serde(default)]
    pub default: Option<serde_json::Value>,
}
/// Config-side spelling of `EnricherKind`, deserialized from lowercase
/// `detection` / `correlation`.
#[derive(Debug, Clone, Copy, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum KindLabel {
    Detection,
    Correlation,
}
impl From<KindLabel> for EnricherKind {
    /// One-to-one mapping from the YAML label to the runtime kind.
    fn from(label: KindLabel) -> Self {
        match label {
            KindLabel::Detection => Self::Detection,
            KindLabel::Correlation => Self::Correlation,
        }
    }
}
/// Config-side spelling of `OnError`, deserialized from lowercase `skip` / `null` /
/// `drop`. Defaults to `skip` when the key is omitted.
///
/// NOTE(review): the exact runtime meaning of each policy lives on `OnError` in
/// `rsigma_runtime`. Presumably: leave the field out, inject `null`, or drop the
/// result — confirm there.
#[derive(Debug, Clone, Copy, Default, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum OnErrorLabel {
    #[default]
    Skip,
    Null,
    Drop,
}
impl From<OnErrorLabel> for OnError {
    /// Maps each config label onto the runtime policy of the same name.
    fn from(label: OnErrorLabel) -> Self {
        match label {
            OnErrorLabel::Skip => Self::Skip,
            OnErrorLabel::Null => Self::Null,
            OnErrorLabel::Drop => Self::Drop,
        }
    }
}
/// Config-side spelling of `OutputFormat` for the `command` enricher, deserialized
/// from lowercase `json` / `raw`. Defaults to `json` when the key is omitted.
#[derive(Debug, Clone, Copy, Default, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum OutputFormatLabel {
    #[default]
    Json,
    Raw,
}
impl From<OutputFormatLabel> for OutputFormat {
    /// Maps the config label onto the runtime output format of the same name.
    fn from(label: OutputFormatLabel) -> Self {
        match label {
            OutputFormatLabel::Json => Self::Json,
            OutputFormatLabel::Raw => Self::Raw,
        }
    }
}
/// Optional `scope:` block of an enricher entry.
///
/// Each list defaults to empty. The three lists are passed to `Scope::new`, which
/// may reject them with a message; that rejection surfaces as
/// `EnrichersConfigError::Scope`.
///
/// NOTE(review): how empty lists are interpreted (e.g. "match everything") is
/// decided by `Scope` in `rsigma_runtime` — confirm there.
#[derive(Debug, Clone, Default, Deserialize)]
pub struct ScopeConfig {
    #[serde(default)]
    pub rules: Vec<String>,
    #[serde(default)]
    pub tags: Vec<String>,
    #[serde(default)]
    pub levels: Vec<String>,
}
/// Errors produced while loading or building the enrichers configuration.
#[derive(Debug)]
pub enum EnrichersConfigError {
    /// Reading the config file failed; carries the path that was attempted.
    Io(std::io::Error, std::path::PathBuf),
    /// The file is not valid YAML, or does not match the `EnrichersFile` schema.
    Yaml(yaml_serde::Error),
    /// `type` is neither a built-in nor registered via `lookup_builtin`.
    UnknownType {
        enricher_id: String,
        type_name: String,
    },
    /// A field required by the entry's `type` is absent. Also used by `lookup`
    /// entries when no source cache was supplied.
    MissingField {
        enricher_id: String,
        type_name: String,
        field: &'static str,
    },
    /// A templated field failed `validate_template_namespace`.
    Template(rsigma_runtime::TemplateError),
    /// `Scope::new` rejected the entry's scope. Also used for an unknown
    /// `extract_type`.
    Scope {
        enricher_id: String,
        message: String,
    },
    /// A bespoke factory rejected the entry. Also used with id `<global>` when
    /// the shared HTTP client cannot be built, and when re-serializing the entry
    /// for the factory fails.
    BespokeFactory {
        enricher_id: String,
        message: String,
    },
}
impl std::fmt::Display for EnrichersConfigError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
EnrichersConfigError::Io(e, p) => {
write!(f, "failed to read enrichers config '{}': {e}", p.display())
}
EnrichersConfigError::Yaml(e) => write!(f, "invalid enrichers YAML: {e}"),
EnrichersConfigError::UnknownType {
enricher_id,
type_name,
} => write!(
f,
"enricher '{enricher_id}': unknown type '{type_name}' (built-ins: template, lookup, http, command; bespoke types must register_builtin() before daemon start)"
),
EnrichersConfigError::MissingField {
enricher_id,
type_name,
field,
} => write!(
f,
"enricher '{enricher_id}' (type: {type_name}): missing required field '{field}'"
),
EnrichersConfigError::Template(e) => write!(f, "{e}"),
EnrichersConfigError::Scope {
enricher_id,
message,
} => write!(f, "enricher '{enricher_id}': {message}"),
EnrichersConfigError::BespokeFactory {
enricher_id,
message,
} => write!(
f,
"enricher '{enricher_id}': bespoke factory rejected config: {message}"
),
}
}
}
impl std::error::Error for EnrichersConfigError {}
/// Reads and parses an enrichers YAML file without building any enrichers.
///
/// # Errors
///
/// * `EnrichersConfigError::Io` (with `path`) when the file cannot be read.
/// * `EnrichersConfigError::Yaml` when the contents do not parse as `EnrichersFile`.
pub fn load_enrichers_file(path: &Path) -> Result<EnrichersFile, EnrichersConfigError> {
    let text = match std::fs::read_to_string(path) {
        Ok(text) => text,
        Err(err) => return Err(EnrichersConfigError::Io(err, path.to_path_buf())),
    };
    yaml_serde::from_str::<EnrichersFile>(&text).map_err(EnrichersConfigError::Yaml)
}
/// Test convenience: builds the pipeline with no source cache and no-op metrics.
#[cfg(test)]
pub fn build_enrichers(file: EnrichersFile) -> Result<EnrichmentPipeline, EnrichersConfigError> {
    let metrics: std::sync::Arc<dyn MetricsHook> = std::sync::Arc::new(NoopMetrics);
    build_enrichers_full(file, None, metrics)
}
/// Builds the full enrichment pipeline from a parsed enrichers file.
///
/// A single default HTTP client is built once and shared by every entry, as are
/// `source_cache` and `metrics`. Entries are built in file order; the first entry
/// that fails aborts the whole build.
///
/// The concurrency cap comes from `max_concurrent_enrichments`. It falls back to
/// `DEFAULT_MAX_CONCURRENT_ENRICHMENTS` when that key is unset or `0`.
///
/// # Errors
///
/// * `EnrichersConfigError::BespokeFactory` with id `<global>` if the HTTP client
///   cannot be built.
/// * Otherwise, the first error reported by `build_one`.
pub fn build_enrichers_full(
    file: EnrichersFile,
    source_cache: Option<std::sync::Arc<SourceCache>>,
    metrics: std::sync::Arc<dyn MetricsHook>,
) -> Result<EnrichmentPipeline, EnrichersConfigError> {
    let http_client =
        build_default_http_client().map_err(|message| EnrichersConfigError::BespokeFactory {
            enricher_id: "<global>".to_string(),
            message,
        })?;
    let enrichers = file
        .enrichers
        .into_iter()
        .map(|cfg| {
            build_one(
                cfg,
                http_client.clone(),
                source_cache.clone(),
                metrics.clone(),
            )
        })
        .collect::<Result<Vec<Box<dyn rsigma_runtime::Enricher>>, _>>()?;
    // Treat an explicit `0` like "unset", as the
    // `defaults_max_concurrent_when_unset_or_zero` test name intends. Passing a
    // zero cap through would presumably leave the pipeline with no slots to run
    // enrichments.
    let cap = file
        .max_concurrent_enrichments
        .filter(|&n| n > 0)
        .unwrap_or(DEFAULT_MAX_CONCURRENT_ENRICHMENTS);
    Ok(EnrichmentPipeline::new(enrichers, cap).with_metrics(metrics))
}
/// Builds one runtime enricher from a single `enrichers:` entry.
///
/// Steps, in order:
///
/// 1. Resolve `kind`, `on_error` and `timeout`. The timeout falls back to
///    `DEFAULT_ENRICHER_TIMEOUT`.
/// 2. Build the `Scope`.
/// 3. Validate all templated fields against the entry's kind namespace.
/// 4. Dispatch on `type`:
///
/// * `template` — requires `template`.
/// * `http` — requires `url`. `method` defaults to `GET` and `cache_ttl` to zero.
///   The shared `http_client` and `metrics` are attached.
/// * `command` — requires a non-empty `command`.
/// * `lookup` — requires `source` and a `source_cache`.
/// * anything else — resolved via `lookup_builtin`. The entry is re-serialized to
///   JSON and handed to the registered factory.
///
/// # Errors
///
/// * `Scope` — the scope or `extract_type` is invalid.
/// * `Template` — a templated field uses the wrong namespace.
/// * `MissingField` — a field required by the type is absent.
/// * `UnknownType` — `type` is neither built-in nor registered.
/// * `BespokeFactory` — the bespoke path failed.
fn build_one(
    cfg: EnricherConfig,
    http_client: HttpEnricherClient,
    source_cache: Option<std::sync::Arc<SourceCache>>,
    metrics: std::sync::Arc<dyn MetricsHook>,
) -> Result<Box<dyn rsigma_runtime::Enricher>, EnrichersConfigError> {
    let kind: EnricherKind = cfg.kind.into();
    let on_error: OnError = cfg.on_error.into();
    let timeout = cfg.timeout.unwrap_or(DEFAULT_ENRICHER_TIMEOUT);
    let scope = match &cfg.scope {
        Some(s) => Scope::new(s.rules.clone(), s.tags.clone(), s.levels.clone()).map_err(
            |message| EnrichersConfigError::Scope {
                enricher_id: cfg.id.clone(),
                message,
            },
        )?,
        None => Scope::default(),
    };
    validate_templated_fields(&cfg, kind)?;

    // Build `MissingField` errors lazily. The previous `ok_or(..)` calls cloned
    // `id` and `type_name` even when the field was present.
    let missing = |field: &'static str| EnrichersConfigError::MissingField {
        enricher_id: cfg.id.clone(),
        type_name: cfg.type_name.clone(),
        field,
    };

    match cfg.type_name.as_str() {
        "template" => {
            let template = cfg.template.clone().ok_or_else(|| missing("template"))?;
            Ok(Box::new(TemplateEnricher::new(
                cfg.id,
                kind,
                cfg.inject_field,
                template,
                timeout,
                on_error,
                scope,
            )))
        }
        "http" => {
            let url = cfg.url.clone().ok_or_else(|| missing("url"))?;
            let method = cfg.method.clone().unwrap_or_else(|| "GET".to_string());
            let headers: Vec<(String, String)> = cfg
                .headers
                .iter()
                .map(|(k, v)| (k.clone(), v.clone()))
                .collect();
            let extract = build_extract_expr(&cfg)?;
            // Unset TTL becomes `Duration::ZERO`.
            let cache_ttl = cfg.cache_ttl.unwrap_or_default();
            let cache = HttpResponseCache::new(cache_ttl);
            Ok(Box::new(
                HttpEnricher::new(
                    cfg.id,
                    kind,
                    cfg.inject_field,
                    method,
                    url,
                    headers,
                    cfg.body.clone(),
                    timeout,
                    on_error,
                    scope,
                    extract,
                    http_client,
                    cache,
                )
                .with_metrics(metrics),
            ))
        }
        "command" => {
            if cfg.command.is_empty() {
                return Err(missing("command"));
            }
            Ok(Box::new(CommandEnricher::new(
                cfg.id,
                kind,
                cfg.inject_field,
                cfg.command,
                cfg.env,
                timeout,
                on_error,
                scope,
                cfg.output.into(),
            )))
        }
        "lookup" => {
            let source = cfg.source.clone().ok_or_else(|| missing("source"))?;
            // Lookups read from the daemon-wide dynamic source cache. Without one,
            // report it through `MissingField` with an actionable hint.
            let cache = source_cache.ok_or_else(|| {
                missing(
                    "<source_cache: no dynamic sources configured; \
                     pass --source <file> to the daemon to declare sources>",
                )
            })?;
            let extract = build_extract_expr(&cfg)?;
            Ok(Box::new(LookupEnricher::new(
                cfg.id,
                kind,
                cfg.inject_field,
                source,
                extract,
                cfg.default,
                timeout,
                on_error,
                scope,
                cache,
            )))
        }
        other => {
            let factory = lookup_builtin(other).ok_or_else(|| EnrichersConfigError::UnknownType {
                enricher_id: cfg.id.clone(),
                type_name: other.to_string(),
            })?;
            // Bespoke factories receive the whole entry as a JSON value.
            let raw =
                serde_json::to_value(&cfg).map_err(|e| EnrichersConfigError::BespokeFactory {
                    enricher_id: cfg.id.clone(),
                    message: format!("internal: re-serialize failed: {e}"),
                })?;
            factory(&raw).map_err(|message| EnrichersConfigError::BespokeFactory {
                enricher_id: cfg.id.clone(),
                message,
            })
        }
    }
}
/// Turns the entry's optional `extract` / `extract_type` pair into an `ExtractExpr`.
///
/// Returns `Ok(None)` when `extract` is unset, regardless of `extract_type`.
/// `extract_type` defaults to `jq`. Any value other than `jq`, `jsonpath` or `cel`
/// is reported as an `EnrichersConfigError::Scope` error for this enricher.
fn build_extract_expr(
    cfg: &EnricherConfig,
) -> Result<Option<rsigma_eval::pipeline::sources::ExtractExpr>, EnrichersConfigError> {
    use rsigma_eval::pipeline::sources::ExtractExpr;

    let expression = match cfg.extract.as_ref() {
        Some(expression) => expression.clone(),
        None => return Ok(None),
    };
    let parsed = match cfg.extract_type.as_deref() {
        None | Some("jq") => ExtractExpr::Jq(expression),
        Some("jsonpath") => ExtractExpr::JsonPath(expression),
        Some("cel") => ExtractExpr::Cel(expression),
        Some(other) => {
            return Err(EnrichersConfigError::Scope {
                enricher_id: cfg.id.clone(),
                message: format!("unknown extract_type '{other}' (expected jq | jsonpath | cel)"),
            });
        }
    };
    Ok(Some(parsed))
}
/// Checks every templated field of `cfg` against the namespace allowed for `kind`.
///
/// The fields checked are:
///
/// * `template`, `url`, `body` and `extract`;
/// * every header value;
/// * every `command` item;
/// * every `env` value.
///
/// The first failure is returned as `EnrichersConfigError::Template`.
///
/// `validate_template_namespace` takes a `&'static str` field label. Labels built
/// at runtime (`headers.<name>`, `command[<i>]`, `env.<name>`) have to be leaked to
/// get one. Leaking them on every successful load would grow memory with each
/// config (re)load. Instead, each dynamic field is validated first with a static
/// coarse label, and the precise label is leaked only to produce the error
/// message once a field has actually failed.
///
/// NOTE(review): this assumes the validator's verdict depends only on the template
/// text and `kind`, not on the field label — confirm in `rsigma_runtime`.
fn validate_templated_fields(
    cfg: &EnricherConfig,
    kind: EnricherKind,
) -> Result<(), EnrichersConfigError> {
    let id = cfg.id.as_str();
    let check = |s: &str, field: &'static str| -> Result<(), EnrichersConfigError> {
        validate_template_namespace(s, kind, id, field).map_err(EnrichersConfigError::Template)
    };
    // Error path only: re-run with the precise (leaked) label so the message names
    // the exact header / argv index / env key.
    let check_dynamic =
        |s: &str, coarse: &'static str, precise: String| -> Result<(), EnrichersConfigError> {
            if check(s, coarse).is_ok() {
                return Ok(());
            }
            let label: &'static str = Box::leak(precise.into_boxed_str());
            check(s, label)
        };

    if let Some(t) = &cfg.template {
        check(t, "template")?;
    }
    if let Some(u) = &cfg.url {
        check(u, "url")?;
    }
    for (k, v) in &cfg.headers {
        check_dynamic(v, "headers", format!("headers.{k}"))?;
    }
    if let Some(b) = &cfg.body {
        check(b, "body")?;
    }
    for (i, c) in cfg.command.iter().enumerate() {
        check_dynamic(c, "command", format!("command[{i}]"))?;
    }
    for (k, v) in &cfg.env {
        check_dynamic(v, "env", format!("env.{k}"))?;
    }
    if let Some(e) = &cfg.extract {
        check(e, "extract")?;
    }
    Ok(())
}
impl serde::Serialize for EnricherConfig {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
use serde::ser::SerializeMap;
let mut m = serializer.serialize_map(None)?;
m.serialize_entry("id", &self.id)?;
m.serialize_entry(
"kind",
match self.kind {
KindLabel::Detection => "detection",
KindLabel::Correlation => "correlation",
},
)?;
m.serialize_entry("type", &self.type_name)?;
m.serialize_entry("inject_field", &self.inject_field)?;
if let Some(t) = &self.timeout {
m.serialize_entry("timeout_ms", &(t.as_millis() as u64))?;
}
m.serialize_entry(
"on_error",
match self.on_error {
OnErrorLabel::Skip => "skip",
OnErrorLabel::Null => "null",
OnErrorLabel::Drop => "drop",
},
)?;
if let Some(s) = &self.scope {
m.serialize_entry("scope", s)?;
}
if let Some(t) = &self.template {
m.serialize_entry("template", t)?;
}
if let Some(u) = &self.url {
m.serialize_entry("url", u)?;
}
if let Some(meth) = &self.method {
m.serialize_entry("method", meth)?;
}
if !self.headers.is_empty() {
m.serialize_entry("headers", &self.headers)?;
}
if let Some(b) = &self.body {
m.serialize_entry("body", b)?;
}
if let Some(c) = &self.cache_ttl {
m.serialize_entry("cache_ttl_ms", &(c.as_millis() as u64))?;
}
if let Some(e) = &self.extract {
m.serialize_entry("extract", e)?;
}
if let Some(et) = &self.extract_type {
m.serialize_entry("extract_type", et)?;
}
if !self.command.is_empty() {
m.serialize_entry("command", &self.command)?;
}
if !self.env.is_empty() {
m.serialize_entry("env", &self.env)?;
}
if let Some(s) = &self.source {
m.serialize_entry("source", s)?;
}
if let Some(d) = &self.default {
m.serialize_entry("default", d)?;
}
m.end()
}
}
impl serde::Serialize for ScopeConfig {
    /// Emits only the non-empty filter lists, in `rules`, `tags`, `levels` order.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        use serde::ser::SerializeMap;
        let lists: [(&str, &Vec<String>); 3] = [
            ("rules", &self.rules),
            ("tags", &self.tags),
            ("levels", &self.levels),
        ];
        let mut map = serializer.serialize_map(None)?;
        for (key, values) in lists {
            if !values.is_empty() {
                map.serialize_entry(key, values)?;
            }
        }
        map.end()
    }
}
mod humantime_opt {
use std::time::Duration;
use serde::{Deserialize, Deserializer};
pub fn deserialize<'de, D>(d: D) -> Result<Option<Duration>, D::Error>
where
D: Deserializer<'de>,
{
let raw: Option<String> = Option::deserialize(d)?;
match raw {
Some(s) => humantime::parse_duration(&s)
.map(Some)
.map_err(serde::de::Error::custom),
None => Ok(None),
}
}
}
/// Unit tests for config parsing and pipeline construction. They go through the
/// test-only `build_enrichers` wrapper (no source cache, no-op metrics).
#[cfg(test)]
mod tests {
    use super::*;

    /// Two template enrichers, one per kind, sharing the same `inject_field`.
    fn cfg_template_yaml() -> &'static str {
        r#"
max_concurrent_enrichments: 8
enrichers:
  - id: runbook_det
    kind: detection
    type: template
    template: "https://wiki/runbooks/${detection.rule.id}"
    inject_field: runbook_url
  - id: runbook_corr
    kind: correlation
    type: template
    template: "https://wiki/runbooks/${correlation.rule.id}"
    inject_field: runbook_url
"#
    }

    #[test]
    fn loads_minimal_template_config() {
        let parsed: EnrichersFile = yaml_serde::from_str(cfg_template_yaml()).unwrap();
        let pipeline = build_enrichers(parsed).unwrap();
        assert_eq!(pipeline.len(), 2);
    }

    /// A detection enricher must not reference the `correlation.*` namespace.
    #[test]
    fn rejects_cross_namespace_in_detection_enricher() {
        let yaml = r#"
enrichers:
  - id: bad
    kind: detection
    type: template
    inject_field: out
    template: "https://wiki/${correlation.rule.id}"
"#;
        let parsed: EnrichersFile = yaml_serde::from_str(yaml).unwrap();
        let err = build_enrichers(parsed).unwrap_err();
        let msg = format!("{err}");
        assert!(msg.contains("wrong namespace"), "got: {msg}");
    }

    /// Types that are neither built-in nor registered are rejected at build time.
    #[test]
    fn rejects_unknown_type() {
        let yaml = r#"
enrichers:
  - id: weird
    kind: detection
    type: something_unknown
    inject_field: out
"#;
        let parsed: EnrichersFile = yaml_serde::from_str(yaml).unwrap();
        let err = build_enrichers(parsed).unwrap_err();
        let msg = format!("{err}");
        assert!(msg.contains("unknown type"), "got: {msg}");
    }

    /// One logical enricher is expressed as two kind-specific entries.
    #[test]
    fn two_kind_aware_entries_for_one_logical_enricher() {
        let yaml = r#"
enrichers:
  - id: runbook_det
    kind: detection
    type: template
    inject_field: runbook_url
    template: "https://wiki/${detection.rule.id}"
  - id: runbook_corr
    kind: correlation
    type: template
    inject_field: runbook_url
    template: "https://wiki/${correlation.rule.id}"
"#;
        let parsed: EnrichersFile = yaml_serde::from_str(yaml).unwrap();
        let pipeline = build_enrichers(parsed).unwrap();
        assert_eq!(pipeline.len(), 2);
    }

    #[test]
    fn rejects_missing_template_field() {
        let yaml = r#"
enrichers:
  - id: t
    kind: detection
    type: template
    inject_field: out
"#;
        let parsed: EnrichersFile = yaml_serde::from_str(yaml).unwrap();
        let err = build_enrichers(parsed).unwrap_err();
        let msg = format!("{err}");
        assert!(
            msg.contains("missing required field 'template'"),
            "got: {msg}"
        );
    }

    /// NOTE(review): despite the name, only the "unset" case is exercised, and the
    /// resulting cap is not asserted — only that an empty list builds an empty
    /// pipeline.
    #[test]
    fn defaults_max_concurrent_when_unset_or_zero() {
        let yaml = r#"
enrichers: []
"#;
        let parsed: EnrichersFile = yaml_serde::from_str(yaml).unwrap();
        let pipeline = build_enrichers(parsed).unwrap();
        assert!(pipeline.is_empty());
    }

    /// `timeout` accepts humantime strings such as `2500ms`.
    #[test]
    fn timeout_string_parses_humantime() {
        let yaml = r#"
enrichers:
  - id: t
    kind: detection
    type: template
    inject_field: out
    template: "x"
    timeout: 2500ms
"#;
        let parsed: EnrichersFile = yaml_serde::from_str(yaml).unwrap();
        build_enrichers(parsed).unwrap();
    }
}