1use std::collections::HashMap;
19use std::path::Path;
20use std::sync::Arc;
21use std::time::Duration;
22
23use rsigma_eval::ResultBody;
24use serde::Deserialize;
25
26use crate::enrichment::{
27 EnricherKind, HttpEnricherClient, Scope, TemplateError, build_default_http_client,
28 validate_template_namespace,
29};
30use crate::io::DeliveryConfig;
31use crate::metrics::MetricsHook;
32
33use super::sink::{TokenBucket, WebhookSink};
34
35pub const DEFAULT_WEBHOOK_TIMEOUT: Duration = Duration::from_secs(10);
37pub const DEFAULT_WEBHOOK_ATTEMPTS: u32 = 3;
39pub const DEFAULT_WEBHOOK_BACKOFF: Duration = Duration::from_secs(1);
41pub const DEFAULT_WEBHOOK_MAX_BACKOFF: Duration = Duration::from_secs(30);
43pub const DEFAULT_WEBHOOK_QUEUE_SIZE: usize = 1024;
45
46#[derive(Debug, Clone, Deserialize)]
59pub struct WebhooksFile {
60 #[serde(default)]
63 pub webhooks: Vec<WebhookConfig>,
64}
65
66#[derive(Debug, Clone, Deserialize)]
68pub struct WebhookConfig {
69 pub id: String,
71 pub kind: String,
75 pub url: String,
77 #[serde(default)]
79 pub method: Option<String>,
80 #[serde(default)]
82 pub headers: HashMap<String, String>,
83 #[serde(default)]
86 pub body: Option<String>,
87 #[serde(default, with = "humantime_opt")]
89 pub timeout: Option<Duration>,
90 #[serde(default)]
92 pub retry: Option<RetryConfig>,
93 #[serde(default)]
95 pub rate_limit: Option<RateLimitConfig>,
96 #[serde(default)]
98 pub scope: Option<ScopeConfig>,
99 #[serde(default)]
101 pub queue_size: Option<usize>,
102 #[serde(default)]
106 pub tls: Option<WebhookTlsConfig>,
107}
108
109#[derive(Debug, Clone, Default, Deserialize)]
111pub struct WebhookTlsConfig {
112 #[serde(default)]
115 pub ca: Option<String>,
116 #[serde(default)]
119 pub client_cert: Option<String>,
120 #[serde(default)]
123 pub client_key: Option<String>,
124}
125
126#[derive(Debug, Clone, Default, Deserialize)]
128pub struct RetryConfig {
129 #[serde(default)]
132 pub attempts: Option<u32>,
133 #[serde(default, with = "humantime_opt")]
135 pub backoff: Option<Duration>,
136 #[serde(default, with = "humantime_opt")]
138 pub max_backoff: Option<Duration>,
139}
140
141#[derive(Debug, Clone, Deserialize)]
143pub struct RateLimitConfig {
144 pub requests: u32,
146 #[serde(with = "humantime_dur")]
148 pub per: Duration,
149}
150
151#[derive(Debug, Clone, Default, Deserialize)]
153pub struct ScopeConfig {
154 #[serde(default)]
155 pub rules: Vec<String>,
156 #[serde(default)]
157 pub tags: Vec<String>,
158 #[serde(default)]
159 pub levels: Vec<String>,
160}
161
162#[derive(Debug, Clone, Copy, PartialEq, Eq)]
167pub enum WebhookKind {
168 Detection,
170 Correlation,
172}
173
174impl WebhookKind {
175 pub fn as_str(self) -> &'static str {
177 match self {
178 WebhookKind::Detection => "detection",
179 WebhookKind::Correlation => "correlation",
180 }
181 }
182
183 pub(crate) fn as_enricher_kind(self) -> EnricherKind {
186 match self {
187 WebhookKind::Detection => EnricherKind::Detection,
188 WebhookKind::Correlation => EnricherKind::Correlation,
189 }
190 }
191
192 pub fn matches(self, body: &ResultBody) -> bool {
194 self.as_enricher_kind().matches(body)
195 }
196
197 fn parse(s: &str) -> Option<Self> {
198 match s {
199 "detection" => Some(WebhookKind::Detection),
200 "correlation" => Some(WebhookKind::Correlation),
201 _ => None,
202 }
203 }
204}
205
206pub struct BuiltWebhook {
210 pub sink: WebhookSink,
211 pub delivery: DeliveryConfig,
212}
213
214#[derive(Debug)]
216pub enum WebhookConfigError {
217 Io(std::io::Error, std::path::PathBuf),
219 Yaml(yaml_serde::Error),
221 UnknownKind { webhook_id: String, kind: String },
223 MissingField {
225 webhook_id: String,
226 field: &'static str,
227 },
228 InvalidMethod { webhook_id: String, method: String },
230 CrossNamespace {
232 webhook_id: String,
233 kind: &'static str,
234 reference: String,
235 field: &'static str,
236 },
237 MalformedTemplate {
239 webhook_id: String,
240 reference: String,
241 field: &'static str,
242 },
243 InvalidRetry { webhook_id: String, message: String },
245 InvalidRateLimit { webhook_id: String, message: String },
247 Scope { webhook_id: String, message: String },
249 Tls { webhook_id: String, message: String },
251 Client { message: String },
253}
254
255impl std::fmt::Display for WebhookConfigError {
256 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
257 match self {
258 WebhookConfigError::Io(e, p) => {
259 write!(f, "failed to read webhooks config '{}': {e}", p.display())
260 }
261 WebhookConfigError::Yaml(e) => write!(f, "invalid webhooks YAML: {e}"),
262 WebhookConfigError::UnknownKind { webhook_id, kind } => write!(
263 f,
264 "webhook '{webhook_id}': unknown kind '{kind}' (valid kinds: detection, correlation; incident arrives with roadmap item #48)"
265 ),
266 WebhookConfigError::MissingField { webhook_id, field } => {
267 write!(
268 f,
269 "webhook '{webhook_id}': missing required field '{field}'"
270 )
271 }
272 WebhookConfigError::InvalidMethod { webhook_id, method } => {
273 write!(f, "webhook '{webhook_id}': invalid HTTP method '{method}'")
274 }
275 WebhookConfigError::CrossNamespace {
276 webhook_id,
277 kind,
278 reference,
279 field,
280 } => write!(
281 f,
282 "webhook '{webhook_id}' (kind: {kind}): template reference '${{{reference}}}' in field '{field}' is the wrong namespace for a {kind} webhook"
283 ),
284 WebhookConfigError::MalformedTemplate {
285 webhook_id,
286 reference,
287 field,
288 } => write!(
289 f,
290 "webhook '{webhook_id}': malformed template reference '${{{reference}}}' in field '{field}'; expected ${{detection.*}}, ${{correlation.*}}, or ${{ENV_VAR}}"
291 ),
292 WebhookConfigError::InvalidRetry {
293 webhook_id,
294 message,
295 } => write!(f, "webhook '{webhook_id}': {message}"),
296 WebhookConfigError::InvalidRateLimit {
297 webhook_id,
298 message,
299 } => write!(f, "webhook '{webhook_id}': {message}"),
300 WebhookConfigError::Scope {
301 webhook_id,
302 message,
303 } => write!(f, "webhook '{webhook_id}': {message}"),
304 WebhookConfigError::Tls {
305 webhook_id,
306 message,
307 } => write!(f, "webhook '{webhook_id}': {message}"),
308 WebhookConfigError::Client { message } => {
309 write!(f, "webhook HTTP client build failed: {message}")
310 }
311 }
312 }
313}
314
315impl std::error::Error for WebhookConfigError {}
316
317pub fn load_webhooks_file(path: &Path) -> Result<WebhooksFile, WebhookConfigError> {
320 let text =
321 std::fs::read_to_string(path).map_err(|e| WebhookConfigError::Io(e, path.to_path_buf()))?;
322 let parsed: WebhooksFile = yaml_serde::from_str(&text).map_err(WebhookConfigError::Yaml)?;
323 Ok(parsed)
324}
325
326pub fn build_webhooks(
334 file: WebhooksFile,
335 metrics: Arc<dyn MetricsHook>,
336) -> Result<Vec<BuiltWebhook>, WebhookConfigError> {
337 let client =
338 build_default_http_client().map_err(|message| WebhookConfigError::Client { message })?;
339 let mut built = Vec::with_capacity(file.webhooks.len());
340 for cfg in file.webhooks {
341 built.push(build_one(cfg, client.clone(), metrics.clone())?);
342 }
343 Ok(built)
344}
345
346fn build_one(
347 cfg: WebhookConfig,
348 default_client: HttpEnricherClient,
349 metrics: Arc<dyn MetricsHook>,
350) -> Result<BuiltWebhook, WebhookConfigError> {
351 let kind = WebhookKind::parse(&cfg.kind).ok_or_else(|| WebhookConfigError::UnknownKind {
352 webhook_id: cfg.id.clone(),
353 kind: cfg.kind.clone(),
354 })?;
355 if cfg.url.trim().is_empty() {
356 return Err(WebhookConfigError::MissingField {
357 webhook_id: cfg.id.clone(),
358 field: "url",
359 });
360 }
361
362 let ek = kind.as_enricher_kind();
363 check_template(&cfg.url, ek, &cfg.id, "url")?;
364 for (name, value) in &cfg.headers {
365 let field: &'static str = Box::leak(format!("headers.{name}").into_boxed_str());
366 check_template(value, ek, &cfg.id, field)?;
367 }
368 if let Some(body) = &cfg.body {
369 check_template(body, ek, &cfg.id, "body")?;
370 }
371
372 let method = match &cfg.method {
373 Some(m) => {
374 reqwest::Method::from_bytes(m.to_ascii_uppercase().as_bytes()).map_err(|_| {
375 WebhookConfigError::InvalidMethod {
376 webhook_id: cfg.id.clone(),
377 method: m.clone(),
378 }
379 })?
380 }
381 None => reqwest::Method::POST,
382 };
383
384 let scope =
385 match &cfg.scope {
386 Some(s) => Scope::new(s.rules.clone(), s.tags.clone(), s.levels.clone()).map_err(
387 |message| WebhookConfigError::Scope {
388 webhook_id: cfg.id.clone(),
389 message,
390 },
391 )?,
392 None => Scope::default(),
393 };
394
395 let limiter = match &cfg.rate_limit {
396 Some(rl) => {
397 if rl.requests == 0 {
398 return Err(WebhookConfigError::InvalidRateLimit {
399 webhook_id: cfg.id.clone(),
400 message: "rate_limit.requests must be at least 1".to_string(),
401 });
402 }
403 if rl.per.is_zero() {
404 return Err(WebhookConfigError::InvalidRateLimit {
405 webhook_id: cfg.id.clone(),
406 message: "rate_limit.per must be greater than zero".to_string(),
407 });
408 }
409 Some(TokenBucket::new(rl.requests, rl.per))
410 }
411 None => None,
412 };
413
414 let retry = cfg.retry.clone().unwrap_or_default();
415 let attempts = retry.attempts.unwrap_or(DEFAULT_WEBHOOK_ATTEMPTS);
416 if attempts == 0 {
417 return Err(WebhookConfigError::InvalidRetry {
418 webhook_id: cfg.id.clone(),
419 message: "retry.attempts must be at least 1".to_string(),
420 });
421 }
422
423 let delivery = DeliveryConfig {
424 queue_depth: cfg.queue_size.unwrap_or(DEFAULT_WEBHOOK_QUEUE_SIZE),
425 batch_max: 1,
428 batch_flush: DeliveryConfig::default().batch_flush,
429 retry_max: attempts.saturating_sub(1),
430 backoff_base: retry.backoff.unwrap_or(DEFAULT_WEBHOOK_BACKOFF),
431 backoff_max: retry.max_backoff.unwrap_or(DEFAULT_WEBHOOK_MAX_BACKOFF),
432 };
433
434 let timeout = cfg.timeout.unwrap_or(DEFAULT_WEBHOOK_TIMEOUT);
435 let headers: Vec<(String, String)> = cfg
436 .headers
437 .iter()
438 .map(|(k, v)| (k.clone(), v.clone()))
439 .collect();
440
441 let client = match &cfg.tls {
444 Some(tls) => build_tls_client(&cfg.id, tls)?,
445 None => default_client,
446 };
447
448 metrics.register_webhook(&cfg.id);
449 let sink = WebhookSink::new(
450 cfg.id, kind, method, cfg.url, headers, cfg.body, timeout, scope, limiter, client, metrics,
451 );
452 Ok(BuiltWebhook { sink, delivery })
453}
454
455fn build_tls_client(
461 id: &str,
462 tls: &WebhookTlsConfig,
463) -> Result<HttpEnricherClient, WebhookConfigError> {
464 let err = |message: String| WebhookConfigError::Tls {
465 webhook_id: id.to_string(),
466 message,
467 };
468 match (&tls.client_cert, &tls.client_key) {
469 (Some(_), Some(_)) | (None, None) => {}
470 _ => {
471 return Err(err(
472 "tls.client_cert and tls.client_key must be set together for mutual TLS"
473 .to_string(),
474 ));
475 }
476 }
477
478 ensure_crypto_provider();
479 let resolver =
480 crate::egress::EgressFilteredResolver::new(crate::egress::default_egress_policy())
481 .into_dns_resolver();
482 let mut builder = reqwest::Client::builder().dns_resolver(resolver);
483
484 if let Some(ca_path) = &tls.ca {
485 let pem = std::fs::read(ca_path)
486 .map_err(|e| err(format!("failed to read tls.ca '{ca_path}': {e}")))?;
487 let cert = reqwest::Certificate::from_pem(&pem)
488 .map_err(|e| err(format!("invalid tls.ca PEM: {e}")))?;
489 builder = builder.add_root_certificate(cert);
490 }
491
492 if let (Some(cert_path), Some(key_path)) = (&tls.client_cert, &tls.client_key) {
493 let cert = std::fs::read(cert_path)
494 .map_err(|e| err(format!("failed to read tls.client_cert '{cert_path}': {e}")))?;
495 let key = std::fs::read(key_path)
496 .map_err(|e| err(format!("failed to read tls.client_key '{key_path}': {e}")))?;
497 let mut pem = cert;
499 pem.push(b'\n');
500 pem.extend_from_slice(&key);
501 let identity = reqwest::Identity::from_pem(&pem)
502 .map_err(|e| err(format!("invalid client identity PEM: {e}")))?;
503 builder = builder.identity(identity);
504 }
505
506 builder
507 .build()
508 .map(|c| HttpEnricherClient::from_reqwest(std::sync::Arc::new(c)))
509 .map_err(|e| err(format!("TLS client build failed: {e}")))
510}
511
512fn ensure_crypto_provider() {
520 #[cfg(feature = "otlp")]
521 {
522 let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
523 }
524}
525
526fn check_template(
527 text: &str,
528 kind: EnricherKind,
529 id: &str,
530 field: &'static str,
531) -> Result<(), WebhookConfigError> {
532 validate_template_namespace(text, kind, id, field).map_err(|te| match te {
533 TemplateError::CrossNamespace {
534 reference, field, ..
535 } => WebhookConfigError::CrossNamespace {
536 webhook_id: id.to_string(),
537 kind: kind.as_str(),
538 reference,
539 field,
540 },
541 TemplateError::Malformed {
542 reference, field, ..
543 } => WebhookConfigError::MalformedTemplate {
544 webhook_id: id.to_string(),
545 reference,
546 field,
547 },
548 })
549}
550
551mod humantime_opt {
553 use std::time::Duration;
554
555 use serde::{Deserialize, Deserializer};
556
557 pub fn deserialize<'de, D>(d: D) -> Result<Option<Duration>, D::Error>
558 where
559 D: Deserializer<'de>,
560 {
561 let raw: Option<String> = Option::deserialize(d)?;
562 match raw {
563 Some(s) => humantime::parse_duration(&s)
564 .map(Some)
565 .map_err(serde::de::Error::custom),
566 None => Ok(None),
567 }
568 }
569}
570
571mod humantime_dur {
573 use std::time::Duration;
574
575 use serde::{Deserialize, Deserializer};
576
577 pub fn deserialize<'de, D>(d: D) -> Result<Duration, D::Error>
578 where
579 D: Deserializer<'de>,
580 {
581 let s = String::deserialize(d)?;
582 humantime::parse_duration(&s).map_err(serde::de::Error::custom)
583 }
584}
585
586#[cfg(test)]
587mod tests {
588 use super::*;
589 use crate::metrics::NoopMetrics;
590
591 fn build(yaml: &str) -> Result<Vec<BuiltWebhook>, WebhookConfigError> {
592 let file: WebhooksFile = yaml_serde::from_str(yaml).expect("yaml parses");
593 build_webhooks(file, Arc::new(NoopMetrics))
594 }
595
596 fn build_err(yaml: &str) -> WebhookConfigError {
599 build(yaml).map(|_| ()).unwrap_err()
600 }
601
602 #[test]
603 fn minimal_detection_webhook_builds() {
604 let built = build(
605 r#"
606webhooks:
607 - id: slack
608 kind: detection
609 url: https://example.test/hook
610 body: '{"text":"${detection.rule.title}"}'
611"#,
612 )
613 .expect("valid config");
614 assert_eq!(built.len(), 1);
615 assert_eq!(built[0].delivery.retry_max, 2);
617 assert_eq!(built[0].delivery.batch_max, 1);
618 assert_eq!(built[0].delivery.queue_depth, 1024);
619 }
620
621 #[test]
622 fn unknown_kind_is_rejected_with_incident_hint() {
623 let err = build_err(
624 r#"
625webhooks:
626 - id: pd
627 kind: incident
628 url: https://example.test/hook
629"#,
630 );
631 let msg = err.to_string();
632 assert!(msg.contains("unknown kind 'incident'"), "{msg}");
633 assert!(msg.contains("roadmap item #48"), "{msg}");
634 }
635
636 #[test]
637 fn missing_url_is_rejected() {
638 let err = build_err(
639 r#"
640webhooks:
641 - id: x
642 kind: detection
643 url: " "
644"#,
645 );
646 assert!(err.to_string().contains("missing required field 'url'"));
647 }
648
649 #[test]
650 fn cross_namespace_template_points_at_the_field() {
651 let err = build_err(
652 r#"
653webhooks:
654 - id: x
655 kind: detection
656 url: https://example.test/hook
657 body: '{"t":"${correlation.rule.title}"}'
658"#,
659 );
660 let msg = err.to_string();
661 assert!(
662 msg.contains("wrong namespace for a detection webhook"),
663 "{msg}"
664 );
665 assert!(msg.contains("field 'body'"), "{msg}");
666 }
667
668 #[test]
669 fn zero_attempts_is_rejected() {
670 let err = build_err(
671 r#"
672webhooks:
673 - id: x
674 kind: detection
675 url: https://example.test/hook
676 retry:
677 attempts: 0
678"#,
679 );
680 assert!(
681 err.to_string()
682 .contains("retry.attempts must be at least 1")
683 );
684 }
685
686 #[test]
687 fn retry_and_queue_override_delivery_defaults() {
688 let built = build(
689 r#"
690webhooks:
691 - id: x
692 kind: detection
693 url: https://example.test/hook
694 retry:
695 attempts: 5
696 backoff: 2s
697 max_backoff: 45s
698 queue_size: 256
699"#,
700 )
701 .expect("valid config");
702 let d = &built[0].delivery;
703 assert_eq!(d.retry_max, 4);
704 assert_eq!(d.backoff_base, Duration::from_secs(2));
705 assert_eq!(d.backoff_max, Duration::from_secs(45));
706 assert_eq!(d.queue_depth, 256);
707 }
708
709 #[test]
710 fn malformed_duration_is_rejected() {
711 let file: Result<WebhooksFile, _> = yaml_serde::from_str(
712 r#"
713webhooks:
714 - id: x
715 kind: detection
716 url: https://example.test/hook
717 timeout: "not-a-duration"
718"#,
719 );
720 assert!(file.is_err(), "humantime parse should fail at deserialize");
721 }
722
723 #[test]
724 fn tls_webhook_with_ca_and_identity_builds() {
725 use std::io::Write;
726
727 use rcgen::{BasicConstraints, CertificateParams, IsCa, KeyPair};
728
729 let mut ca_params = CertificateParams::new(Vec::<String>::new()).unwrap();
730 ca_params.is_ca = IsCa::Ca(BasicConstraints::Unconstrained);
731 let ca_key = KeyPair::generate().unwrap();
732 let ca_pem = ca_params.self_signed(&ca_key).unwrap().pem();
733
734 let client_key = KeyPair::generate().unwrap();
735 let client_pem = CertificateParams::new(vec!["client".to_string()])
736 .unwrap()
737 .self_signed(&client_key)
738 .unwrap()
739 .pem();
740 let client_key_pem = client_key.serialize_pem();
741
742 let write = |contents: &str| {
743 let mut f = tempfile::Builder::new().suffix(".pem").tempfile().unwrap();
744 f.write_all(contents.as_bytes()).unwrap();
745 f.flush().unwrap();
746 f
747 };
748 let ca = write(&ca_pem);
749 let cert = write(&client_pem);
750 let key = write(&client_key_pem);
751
752 let yaml = format!(
753 r#"
754webhooks:
755 - id: internal
756 kind: detection
757 url: https://relay.internal/hook
758 tls:
759 ca: {ca}
760 client_cert: {cert}
761 client_key: {key}
762"#,
763 ca = ca.path().display(),
764 cert = cert.path().display(),
765 key = key.path().display(),
766 );
767 let built = build(&yaml).expect("a webhook with a CA and client identity should build");
768 assert_eq!(built.len(), 1);
769 }
770
771 #[test]
772 fn tls_client_cert_without_key_is_rejected() {
773 let err = build_err(
774 r#"
775webhooks:
776 - id: internal
777 kind: detection
778 url: https://relay.internal/hook
779 tls:
780 client_cert: /nonexistent/cert.pem
781"#,
782 );
783 assert!(
784 err.to_string()
785 .contains("must be set together for mutual TLS"),
786 "{err}"
787 );
788 }
789
790 #[test]
791 fn tls_unreadable_ca_is_rejected() {
792 let err = build_err(
793 r#"
794webhooks:
795 - id: internal
796 kind: detection
797 url: https://relay.internal/hook
798 tls:
799 ca: /nonexistent/ca.pem
800"#,
801 );
802 assert!(err.to_string().contains("failed to read tls.ca"), "{err}");
803 }
804
805 #[test]
806 fn rate_limit_requires_positive_budget() {
807 let err = build_err(
808 r#"
809webhooks:
810 - id: x
811 kind: detection
812 url: https://example.test/hook
813 rate_limit:
814 requests: 0
815 per: 1m
816"#,
817 );
818 assert!(
819 err.to_string()
820 .contains("rate_limit.requests must be at least 1")
821 );
822 }
823}