1use std::collections::HashMap;
16use std::path::Path;
17use std::time::Duration;
18
19use serde::Deserialize;
20
21use crate::{
22 CommandEnricher, EnricherKind, EnrichmentPipeline, HttpEnricher, HttpEnricherClient,
23 HttpResponseCache, LookupEnricher, MetricsHook, NoopMetrics, OnError, OutputFormat, Scope,
24 SourceCache, TemplateEnricher, build_default_http_client, lookup_builtin,
25 validate_template_namespace,
26};
27
/// Per-enricher timeout applied when an entry does not set `timeout`.
const DEFAULT_ENRICHER_TIMEOUT: Duration = Duration::from_secs(5);

/// Concurrency cap handed to the pipeline when the config file does not set
/// `max_concurrent_enrichments`.
const DEFAULT_MAX_CONCURRENT_ENRICHMENTS: usize = 16;
33
34#[derive(Debug, Clone, Deserialize)]
46pub struct EnrichersFile {
47 #[serde(default)]
51 pub max_concurrent_enrichments: Option<usize>,
52
53 #[serde(default)]
57 pub enrichers: Vec<EnricherConfig>,
58}
59
60#[derive(Debug, Clone, Deserialize)]
62pub struct EnricherConfig {
63 pub id: String,
65 pub kind: KindLabel,
67 #[serde(rename = "type")]
71 pub type_name: String,
72 pub inject_field: String,
74 #[serde(default, with = "humantime_opt")]
76 pub timeout: Option<Duration>,
77 #[serde(default)]
79 pub on_error: OnErrorLabel,
80 #[serde(default)]
82 pub scope: Option<ScopeConfig>,
83
84 #[serde(default)]
89 pub template: Option<String>,
90
91 #[serde(default)]
97 pub url: Option<String>,
98 #[serde(default)]
101 pub method: Option<String>,
102 #[serde(default)]
104 pub headers: HashMap<String, String>,
105 #[serde(default)]
107 pub body: Option<String>,
108 #[serde(default, with = "humantime_opt")]
110 pub cache_ttl: Option<Duration>,
111 #[serde(default)]
114 pub extract: Option<String>,
115 #[serde(default)]
118 pub extract_type: Option<String>,
119 #[serde(default)]
122 pub command: Vec<String>,
123 #[serde(default)]
125 pub env: HashMap<String, String>,
126 #[serde(default)]
128 pub output: OutputFormatLabel,
129 #[serde(default)]
131 pub source: Option<String>,
132 #[serde(default)]
135 pub default: Option<serde_json::Value>,
136}
137
138#[derive(Debug, Clone, Copy, Deserialize)]
141#[serde(rename_all = "lowercase")]
142pub enum KindLabel {
143 Detection,
144 Correlation,
145}
146
147impl From<KindLabel> for EnricherKind {
148 fn from(k: KindLabel) -> Self {
149 match k {
150 KindLabel::Detection => EnricherKind::Detection,
151 KindLabel::Correlation => EnricherKind::Correlation,
152 }
153 }
154}
155
156#[derive(Debug, Clone, Copy, Default, Deserialize)]
158#[serde(rename_all = "lowercase")]
159pub enum OnErrorLabel {
160 #[default]
161 Skip,
162 Null,
163 Drop,
164}
165
166impl From<OnErrorLabel> for OnError {
167 fn from(o: OnErrorLabel) -> Self {
168 match o {
169 OnErrorLabel::Skip => OnError::Skip,
170 OnErrorLabel::Null => OnError::Null,
171 OnErrorLabel::Drop => OnError::Drop,
172 }
173 }
174}
175
176#[derive(Debug, Clone, Copy, Default, Deserialize)]
178#[serde(rename_all = "lowercase")]
179pub enum OutputFormatLabel {
180 #[default]
182 Json,
183 Raw,
185}
186
187impl From<OutputFormatLabel> for OutputFormat {
188 fn from(o: OutputFormatLabel) -> Self {
189 match o {
190 OutputFormatLabel::Json => OutputFormat::Json,
191 OutputFormatLabel::Raw => OutputFormat::Raw,
192 }
193 }
194}
195
196#[derive(Debug, Clone, Default, Deserialize)]
198pub struct ScopeConfig {
199 #[serde(default)]
200 pub rules: Vec<String>,
201 #[serde(default)]
202 pub tags: Vec<String>,
203 #[serde(default)]
204 pub levels: Vec<String>,
205}
206
207#[derive(Debug)]
209pub enum EnrichersConfigError {
210 Io(std::io::Error, std::path::PathBuf),
212 Yaml(yaml_serde::Error),
214 UnknownType {
216 enricher_id: String,
217 type_name: String,
218 },
219 MissingField {
222 enricher_id: String,
223 type_name: String,
224 field: &'static str,
225 },
226 Template(crate::TemplateError),
228 Scope {
230 enricher_id: String,
231 message: String,
232 },
233 BespokeFactory {
235 enricher_id: String,
236 message: String,
237 },
238}
239
240impl std::fmt::Display for EnrichersConfigError {
241 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
242 match self {
243 EnrichersConfigError::Io(e, p) => {
244 write!(f, "failed to read enrichers config '{}': {e}", p.display())
245 }
246 EnrichersConfigError::Yaml(e) => write!(f, "invalid enrichers YAML: {e}"),
247 EnrichersConfigError::UnknownType {
248 enricher_id,
249 type_name,
250 } => write!(
251 f,
252 "enricher '{enricher_id}': unknown type '{type_name}' (built-ins: template, lookup, http, command; bespoke types must register_builtin() before daemon start)"
253 ),
254 EnrichersConfigError::MissingField {
255 enricher_id,
256 type_name,
257 field,
258 } => write!(
259 f,
260 "enricher '{enricher_id}' (type: {type_name}): missing required field '{field}'"
261 ),
262 EnrichersConfigError::Template(e) => write!(f, "{e}"),
263 EnrichersConfigError::Scope {
264 enricher_id,
265 message,
266 } => write!(f, "enricher '{enricher_id}': {message}"),
267 EnrichersConfigError::BespokeFactory {
268 enricher_id,
269 message,
270 } => write!(
271 f,
272 "enricher '{enricher_id}': bespoke factory rejected config: {message}"
273 ),
274 }
275 }
276}
277
278impl std::error::Error for EnrichersConfigError {}
279
280pub fn load_enrichers_file(path: &Path) -> Result<EnrichersFile, EnrichersConfigError> {
282 let text = std::fs::read_to_string(path)
283 .map_err(|e| EnrichersConfigError::Io(e, path.to_path_buf()))?;
284 let parsed: EnrichersFile = yaml_serde::from_str(&text).map_err(EnrichersConfigError::Yaml)?;
285 Ok(parsed)
286}
287
288pub fn build_enrichers(file: EnrichersFile) -> Result<EnrichmentPipeline, EnrichersConfigError> {
297 build_enrichers_full(file, None, std::sync::Arc::new(NoopMetrics))
298}
299
300pub fn build_enrichers_full(
309 file: EnrichersFile,
310 source_cache: Option<std::sync::Arc<SourceCache>>,
311 metrics: std::sync::Arc<dyn MetricsHook>,
312) -> Result<EnrichmentPipeline, EnrichersConfigError> {
313 let http_client =
314 build_default_http_client().map_err(|message| EnrichersConfigError::BespokeFactory {
315 enricher_id: "<global>".to_string(),
316 message,
317 })?;
318 let mut enrichers: Vec<Box<dyn crate::Enricher>> = Vec::with_capacity(file.enrichers.len());
319 for cfg in file.enrichers {
320 enrichers.push(build_one(
321 cfg,
322 http_client.clone(),
323 source_cache.clone(),
324 metrics.clone(),
325 )?);
326 }
327 let cap = file
328 .max_concurrent_enrichments
329 .unwrap_or(DEFAULT_MAX_CONCURRENT_ENRICHMENTS);
330 Ok(EnrichmentPipeline::new(enrichers, cap).with_metrics(metrics))
331}
332
333fn build_one(
339 cfg: EnricherConfig,
340 http_client: HttpEnricherClient,
341 source_cache: Option<std::sync::Arc<SourceCache>>,
342 metrics: std::sync::Arc<dyn MetricsHook>,
343) -> Result<Box<dyn crate::Enricher>, EnrichersConfigError> {
344 let kind: EnricherKind = cfg.kind.into();
345 let on_error: OnError = cfg.on_error.into();
346 let timeout = cfg.timeout.unwrap_or(DEFAULT_ENRICHER_TIMEOUT);
347
348 let scope =
349 match &cfg.scope {
350 Some(s) => Scope::new(s.rules.clone(), s.tags.clone(), s.levels.clone()).map_err(
351 |message| EnrichersConfigError::Scope {
352 enricher_id: cfg.id.clone(),
353 message,
354 },
355 )?,
356 None => Scope::default(),
357 };
358
359 validate_templated_fields(&cfg, kind)?;
364
365 match cfg.type_name.as_str() {
366 "template" => {
367 let template = cfg
368 .template
369 .clone()
370 .ok_or(EnrichersConfigError::MissingField {
371 enricher_id: cfg.id.clone(),
372 type_name: cfg.type_name.clone(),
373 field: "template",
374 })?;
375 Ok(Box::new(TemplateEnricher::new(
376 cfg.id,
377 kind,
378 cfg.inject_field,
379 template,
380 timeout,
381 on_error,
382 scope,
383 )))
384 }
385 "http" => {
386 let url = cfg.url.clone().ok_or(EnrichersConfigError::MissingField {
387 enricher_id: cfg.id.clone(),
388 type_name: cfg.type_name.clone(),
389 field: "url",
390 })?;
391 let method = cfg.method.clone().unwrap_or_else(|| "GET".to_string());
392 let headers: Vec<(String, String)> = cfg
393 .headers
394 .iter()
395 .map(|(k, v)| (k.clone(), v.clone()))
396 .collect();
397 let extract = build_extract_expr(&cfg)?;
398 let cache_ttl = cfg.cache_ttl.unwrap_or_default();
399 let cache = HttpResponseCache::new(cache_ttl);
400 Ok(Box::new(
401 HttpEnricher::new(
402 cfg.id,
403 kind,
404 cfg.inject_field,
405 method,
406 url,
407 headers,
408 cfg.body.clone(),
409 timeout,
410 on_error,
411 scope,
412 extract,
413 http_client,
414 cache,
415 )
416 .with_metrics(metrics),
417 ))
418 }
419 "command" => {
420 if cfg.command.is_empty() {
421 return Err(EnrichersConfigError::MissingField {
422 enricher_id: cfg.id.clone(),
423 type_name: cfg.type_name.clone(),
424 field: "command",
425 });
426 }
427 Ok(Box::new(CommandEnricher::new(
428 cfg.id,
429 kind,
430 cfg.inject_field,
431 cfg.command,
432 cfg.env,
433 timeout,
434 on_error,
435 scope,
436 cfg.output.into(),
437 )))
438 }
439 "lookup" => {
440 let source = cfg
441 .source
442 .clone()
443 .ok_or(EnrichersConfigError::MissingField {
444 enricher_id: cfg.id.clone(),
445 type_name: cfg.type_name.clone(),
446 field: "source",
447 })?;
448 let cache = source_cache.ok_or(EnrichersConfigError::MissingField {
449 enricher_id: cfg.id.clone(),
450 type_name: cfg.type_name.clone(),
451 field: "<source_cache: no dynamic sources configured; \
452 pass --source <file> to the daemon to declare sources>",
453 })?;
454 let extract = build_extract_expr(&cfg)?;
455 Ok(Box::new(LookupEnricher::new(
456 cfg.id,
457 kind,
458 cfg.inject_field,
459 source,
460 extract,
461 cfg.default,
462 timeout,
463 on_error,
464 scope,
465 cache,
466 )))
467 }
468 other => {
469 let factory = lookup_builtin(other).ok_or(EnrichersConfigError::UnknownType {
471 enricher_id: cfg.id.clone(),
472 type_name: other.to_string(),
473 })?;
474 let raw =
477 serde_json::to_value(&cfg).map_err(|e| EnrichersConfigError::BespokeFactory {
478 enricher_id: cfg.id.clone(),
479 message: format!("internal: re-serialize failed: {e}"),
480 })?;
481 factory(&raw).map_err(|message| EnrichersConfigError::BespokeFactory {
482 enricher_id: cfg.id.clone(),
483 message,
484 })
485 }
486 }
487}
488
489fn build_extract_expr(
494 cfg: &EnricherConfig,
495) -> Result<Option<rsigma_eval::pipeline::sources::ExtractExpr>, EnrichersConfigError> {
496 use rsigma_eval::pipeline::sources::ExtractExpr;
497 let Some(expr) = cfg.extract.clone() else {
498 return Ok(None);
499 };
500 let kind = cfg.extract_type.as_deref().unwrap_or("jq");
501 Ok(Some(match kind {
502 "jq" => ExtractExpr::Jq(expr),
503 "jsonpath" => ExtractExpr::JsonPath(expr),
504 "cel" => ExtractExpr::Cel(expr),
505 other => {
506 return Err(EnrichersConfigError::Scope {
507 enricher_id: cfg.id.clone(),
508 message: format!("unknown extract_type '{other}' (expected jq | jsonpath | cel)"),
509 });
510 }
511 }))
512}
513
514fn validate_templated_fields(
517 cfg: &EnricherConfig,
518 kind: EnricherKind,
519) -> Result<(), EnrichersConfigError> {
520 let id = cfg.id.as_str();
521 let check = |s: &str, field: &'static str| -> Result<(), EnrichersConfigError> {
522 validate_template_namespace(s, kind, id, field).map_err(EnrichersConfigError::Template)
523 };
524 if let Some(t) = &cfg.template {
525 check(t, "template")?;
526 }
527 if let Some(u) = &cfg.url {
528 check(u, "url")?;
529 }
530 for (k, v) in &cfg.headers {
531 let static_field: &'static str = Box::leak(format!("headers.{k}").into_boxed_str());
535 check(v, static_field)?;
536 }
537 if let Some(b) = &cfg.body {
538 check(b, "body")?;
539 }
540 for (i, c) in cfg.command.iter().enumerate() {
541 let static_field: &'static str = Box::leak(format!("command[{i}]").into_boxed_str());
542 check(c, static_field)?;
543 }
544 for (k, v) in &cfg.env {
545 let static_field: &'static str = Box::leak(format!("env.{k}").into_boxed_str());
546 check(v, static_field)?;
547 }
548 if let Some(e) = &cfg.extract {
549 check(e, "extract")?;
550 }
551 Ok(())
552}
553
554impl serde::Serialize for EnricherConfig {
560 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
561 where
562 S: serde::Serializer,
563 {
564 use serde::ser::SerializeMap;
565 let mut m = serializer.serialize_map(None)?;
566 m.serialize_entry("id", &self.id)?;
567 m.serialize_entry(
568 "kind",
569 match self.kind {
570 KindLabel::Detection => "detection",
571 KindLabel::Correlation => "correlation",
572 },
573 )?;
574 m.serialize_entry("type", &self.type_name)?;
575 m.serialize_entry("inject_field", &self.inject_field)?;
576 if let Some(t) = &self.timeout {
577 m.serialize_entry("timeout_ms", &(t.as_millis() as u64))?;
578 }
579 m.serialize_entry(
580 "on_error",
581 match self.on_error {
582 OnErrorLabel::Skip => "skip",
583 OnErrorLabel::Null => "null",
584 OnErrorLabel::Drop => "drop",
585 },
586 )?;
587 if let Some(s) = &self.scope {
588 m.serialize_entry("scope", s)?;
589 }
590 if let Some(t) = &self.template {
591 m.serialize_entry("template", t)?;
592 }
593 if let Some(u) = &self.url {
594 m.serialize_entry("url", u)?;
595 }
596 if let Some(meth) = &self.method {
597 m.serialize_entry("method", meth)?;
598 }
599 if !self.headers.is_empty() {
600 m.serialize_entry("headers", &self.headers)?;
601 }
602 if let Some(b) = &self.body {
603 m.serialize_entry("body", b)?;
604 }
605 if let Some(c) = &self.cache_ttl {
606 m.serialize_entry("cache_ttl_ms", &(c.as_millis() as u64))?;
607 }
608 if let Some(e) = &self.extract {
609 m.serialize_entry("extract", e)?;
610 }
611 if let Some(et) = &self.extract_type {
612 m.serialize_entry("extract_type", et)?;
613 }
614 if !self.command.is_empty() {
615 m.serialize_entry("command", &self.command)?;
616 }
617 if !self.env.is_empty() {
618 m.serialize_entry("env", &self.env)?;
619 }
620 if let Some(s) = &self.source {
621 m.serialize_entry("source", s)?;
622 }
623 if let Some(d) = &self.default {
624 m.serialize_entry("default", d)?;
625 }
626 m.end()
627 }
628}
629
630impl serde::Serialize for ScopeConfig {
631 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
632 where
633 S: serde::Serializer,
634 {
635 use serde::ser::SerializeMap;
636 let mut m = serializer.serialize_map(None)?;
637 if !self.rules.is_empty() {
638 m.serialize_entry("rules", &self.rules)?;
639 }
640 if !self.tags.is_empty() {
641 m.serialize_entry("tags", &self.tags)?;
642 }
643 if !self.levels.is_empty() {
644 m.serialize_entry("levels", &self.levels)?;
645 }
646 m.end()
647 }
648}
649
650mod humantime_opt {
656 use std::time::Duration;
657
658 use serde::{Deserialize, Deserializer};
659
660 pub fn deserialize<'de, D>(d: D) -> Result<Option<Duration>, D::Error>
661 where
662 D: Deserializer<'de>,
663 {
664 let raw: Option<String> = Option::deserialize(d)?;
665 match raw {
666 Some(s) => humantime::parse_duration(&s)
667 .map(Some)
668 .map_err(serde::de::Error::custom),
669 None => Ok(None),
670 }
671 }
672}
673
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses a YAML document that is expected to be structurally valid.
    fn parse(yaml: &str) -> EnrichersFile {
        yaml_serde::from_str(yaml).unwrap()
    }

    /// Two template enrichers (one per kind) plus an explicit concurrency cap.
    fn cfg_template_yaml() -> &'static str {
        r#"
max_concurrent_enrichments: 8
enrichers:
  - id: runbook_det
    kind: detection
    type: template
    template: "https://wiki/runbooks/${detection.rule.id}"
    inject_field: runbook_url

  - id: runbook_corr
    kind: correlation
    type: template
    template: "https://wiki/runbooks/${correlation.rule.id}"
    inject_field: runbook_url
"#
    }

    #[test]
    fn loads_minimal_template_config() {
        let pipeline = build_enrichers(parse(cfg_template_yaml())).unwrap();
        assert_eq!(pipeline.len(), 2);
    }

    #[test]
    fn rejects_cross_namespace_in_detection_enricher() {
        // A detection enricher must not reference the correlation namespace.
        let yaml = r#"
enrichers:
  - id: bad
    kind: detection
    type: template
    inject_field: out
    template: "https://wiki/${correlation.rule.id}"
"#;
        let msg = build_enrichers(parse(yaml)).unwrap_err().to_string();
        assert!(msg.contains("wrong namespace"), "got: {msg}");
    }

    #[test]
    fn rejects_unknown_type() {
        let yaml = r#"
enrichers:
  - id: weird
    kind: detection
    type: something_unknown
    inject_field: out
"#;
        let msg = build_enrichers(parse(yaml)).unwrap_err().to_string();
        assert!(msg.contains("unknown type"), "got: {msg}");
    }

    #[test]
    fn two_kind_aware_entries_for_one_logical_enricher() {
        let yaml = r#"
enrichers:
  - id: runbook_det
    kind: detection
    type: template
    inject_field: runbook_url
    template: "https://wiki/${detection.rule.id}"

  - id: runbook_corr
    kind: correlation
    type: template
    inject_field: runbook_url
    template: "https://wiki/${correlation.rule.id}"
"#;
        let pipeline = build_enrichers(parse(yaml)).unwrap();
        assert_eq!(pipeline.len(), 2);
    }

    #[test]
    fn rejects_missing_template_field() {
        let yaml = r#"
enrichers:
  - id: t
    kind: detection
    type: template
    inject_field: out
"#;
        let msg = build_enrichers(parse(yaml)).unwrap_err().to_string();
        assert!(
            msg.contains("missing required field 'template'"),
            "got: {msg}"
        );
    }

    #[test]
    fn defaults_max_concurrent_when_unset_or_zero() {
        // Only the "unset" half is exercised here: no cap, no enrichers.
        let yaml = r#"
enrichers: []
"#;
        let pipeline = build_enrichers(parse(yaml)).unwrap();
        assert!(pipeline.is_empty());
    }

    #[test]
    fn timeout_string_parses_humantime() {
        // Succeeds only if `2500ms` is accepted by the humantime helper.
        let yaml = r#"
enrichers:
  - id: t
    kind: detection
    type: template
    inject_field: out
    template: "x"
    timeout: 2500ms
"#;
        build_enrichers(parse(yaml)).unwrap();
    }
}