1use std::sync::Arc;
7use std::time::Duration;
8
9use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
10use rsigma_eval::{EvaluationResult, ProcessResult};
11
12use crate::enrichment::{HttpEnricherClient, render_template, render_template_json};
13use crate::error::RuntimeError;
14use crate::io::DeliveryContext;
15use crate::metrics::MetricsHook;
16
17use super::config::WebhookKind;
18use super::signing::WebhookSigner;
19
/// Byte budget for `drain_body`: once at least this many bytes of a response body have
/// been read, the rest of the body is left unread. The check happens between chunks, so
/// the total actually read can exceed this value by up to one chunk.
const DRAIN_CAP: usize = 64 * 1024;

/// Longest time `WebhookSink` will sleep in response to a `Retry-After` header; larger
/// server-supplied values are clamped to this.
const MAX_RETRY_AFTER: Duration = Duration::from_secs(60);
28
/// Token-bucket rate limiter state.
///
/// The bucket starts full, is topped up continuously based on elapsed time, and hands
/// out one token per `acquire` call.
pub(crate) struct TokenBucket {
    /// Tokens currently available; fractional because refill is proportional to time.
    tokens: f64,
    /// Maximum number of tokens the bucket can hold.
    capacity: f64,
    /// Tokens added per second of elapsed time.
    refill_per_sec: f64,
    /// Instant of the most recent refill that credited tokens.
    last: tokio::time::Instant,
}
41
42impl TokenBucket {
43 pub(crate) fn new(requests: u32, per: Duration) -> Self {
46 let capacity = f64::from(requests);
47 let refill_per_sec = capacity / per.as_secs_f64();
48 TokenBucket {
49 tokens: capacity,
50 capacity,
51 refill_per_sec,
52 last: tokio::time::Instant::now(),
53 }
54 }
55
56 fn refill(&mut self) {
57 let now = tokio::time::Instant::now();
58 let elapsed = now.saturating_duration_since(self.last).as_secs_f64();
59 if elapsed > 0.0 {
60 self.tokens = (self.tokens + elapsed * self.refill_per_sec).min(self.capacity);
61 self.last = now;
62 }
63 }
64
65 async fn acquire(&mut self) -> bool {
68 self.refill();
69 if self.tokens >= 1.0 {
70 self.tokens -= 1.0;
71 return false;
72 }
73 let needed = 1.0 - self.tokens;
74 let wait = Duration::from_secs_f64(needed / self.refill_per_sec);
75 tokio::time::sleep(wait).await;
76 self.refill();
77 self.tokens = (self.tokens - 1.0).max(0.0);
78 true
79 }
80}
81
/// Sink that delivers evaluation results to an HTTP endpoint, one request per result.
pub struct WebhookSink {
    /// Identifier used in error messages and passed to the metrics hook.
    id: String,
    /// `'static` copy of `id`, produced by leaking the string in `new`.
    label: &'static str,
    /// Result-kind filter: results whose body it does not match are skipped.
    kind: WebhookKind,
    /// HTTP method used for every request.
    method: reqwest::Method,
    /// URL template, rendered against each result before sending.
    url: String,
    /// Header `(name, value template)` pairs; only the value is rendered per result.
    headers: Vec<(String, String)>,
    /// Optional body template, rendered with `render_template_json`.
    body: Option<String>,
    /// Timeout applied to each request.
    timeout: Duration,
    /// Additional filter: results it does not match are skipped.
    scope: crate::enrichment::Scope,
    /// Optional rate limiter consulted before each delivery.
    limiter: Option<TokenBucket>,
    /// HTTP client used to build and send requests.
    client: HttpEnricherClient,
    /// Hook notified about rate limiting and request outcomes.
    metrics: Arc<dyn MetricsHook>,
    /// Optional signer whose headers are added to each request.
    signer: Option<WebhookSigner>,
}
106
107impl WebhookSink {
108 #[allow(clippy::too_many_arguments)]
109 pub(crate) fn new(
110 id: String,
111 kind: WebhookKind,
112 method: reqwest::Method,
113 url: String,
114 headers: Vec<(String, String)>,
115 body: Option<String>,
116 timeout: Duration,
117 scope: crate::enrichment::Scope,
118 limiter: Option<TokenBucket>,
119 client: HttpEnricherClient,
120 metrics: Arc<dyn MetricsHook>,
121 signer: Option<WebhookSigner>,
122 ) -> Self {
123 let label: &'static str = Box::leak(id.clone().into_boxed_str());
124 WebhookSink {
125 id,
126 label,
127 kind,
128 method,
129 url,
130 headers,
131 body,
132 timeout,
133 scope,
134 limiter,
135 client,
136 metrics,
137 signer,
138 }
139 }
140
141 pub fn id(&self) -> &str {
143 &self.id
144 }
145
146 pub fn label(&self) -> &'static str {
150 self.label
151 }
152
153 pub async fn send(
161 &mut self,
162 result: &ProcessResult,
163 ctx: &DeliveryContext,
164 ) -> Result<(), RuntimeError> {
165 for (idx, eval) in result.iter().enumerate() {
166 if !self.kind.matches(&eval.body) || !self.scope.matches(eval) {
167 continue;
168 }
169 self.deliver_one(eval, ctx, idx).await?;
170 }
171 Ok(())
172 }
173
174 async fn deliver_one(
175 &mut self,
176 eval: &EvaluationResult,
177 ctx: &DeliveryContext,
178 idx: usize,
179 ) -> Result<(), RuntimeError> {
180 let waited = match &mut self.limiter {
181 Some(limiter) => limiter.acquire().await,
182 None => false,
183 };
184 if waited {
185 self.metrics.on_webhook_rate_limited(&self.id);
186 }
187
188 let url = render_template(&self.url, eval);
189 let signing_headroom = if self.signer.is_some() { 3 } else { 0 };
193 let mut header_map = HeaderMap::with_capacity(self.headers.len() + signing_headroom);
194 for (name, value_template) in &self.headers {
195 let rendered = render_template(value_template, eval);
196 let header_name = HeaderName::from_bytes(name.as_bytes()).map_err(|e| {
197 RuntimeError::Permanent(format!(
198 "webhook {}: invalid header name '{name}': {e}",
199 self.id
200 ))
201 })?;
202 let header_value = HeaderValue::from_str(&rendered).map_err(|e| {
203 RuntimeError::Permanent(format!(
204 "webhook {}: invalid header value for '{name}': {e}",
205 self.id
206 ))
207 })?;
208 header_map.insert(header_name, header_value);
209 }
210 let body = self.body.as_ref().map(|b| render_template_json(b, eval));
211
212 if let Some(signer) = &self.signer {
216 let request_id = format!("{}-{idx}", ctx.id_base());
217 for (name, value) in signer.sign(
218 body.as_deref().unwrap_or(""),
219 ctx.first_attempt(),
220 &request_id,
221 ) {
222 let header_name = HeaderName::from_bytes(name.as_bytes()).map_err(|e| {
223 RuntimeError::Permanent(format!(
224 "webhook {}: invalid signing header name '{name}': {e}",
225 self.id
226 ))
227 })?;
228 let header_value = HeaderValue::from_str(&value).map_err(|e| {
229 RuntimeError::Permanent(format!(
230 "webhook {}: invalid signing header value for '{name}': {e}",
231 self.id
232 ))
233 })?;
234 header_map.insert(header_name, header_value);
235 }
236 }
237
238 let mut req = self
239 .client
240 .inner()
241 .request(self.method.clone(), &url)
242 .headers(header_map)
243 .timeout(self.timeout);
244 if let Some(b) = &body {
245 req = req.body(b.clone());
246 }
247
248 let started = std::time::Instant::now();
249 let resp = match req.send().await {
250 Ok(r) => r,
251 Err(e) => {
254 return Err(RuntimeError::Io(std::io::Error::other(format!(
255 "webhook {}: request error: {e}",
256 self.id
257 ))));
258 }
259 };
260
261 let status = resp.status();
262 let elapsed = started.elapsed().as_secs_f64();
263
264 if status.is_success() {
265 drain_body(resp).await;
266 self.metrics
267 .on_webhook_request(&self.id, "success", elapsed);
268 return Ok(());
269 }
270
271 let retry_after = parse_retry_after(&resp);
272 drain_body(resp).await;
273
274 if status.as_u16() == 429 || status.is_server_error() {
275 if let Some(wait) = retry_after {
278 tokio::time::sleep(wait.min(MAX_RETRY_AFTER)).await;
279 }
280 return Err(RuntimeError::Io(std::io::Error::other(format!(
281 "webhook {}: HTTP {status}",
282 self.id
283 ))));
284 }
285
286 self.metrics
289 .on_webhook_request(&self.id, "permanent_failure", elapsed);
290 Err(RuntimeError::Permanent(format!(
291 "webhook {}: HTTP {status}",
292 self.id
293 )))
294 }
295}
296
297async fn drain_body(mut resp: reqwest::Response) {
299 let mut read = 0usize;
300 while read < DRAIN_CAP {
301 match resp.chunk().await {
302 Ok(Some(chunk)) => read += chunk.len(),
303 _ => break,
304 }
305 }
306}
307
308fn parse_retry_after(resp: &reqwest::Response) -> Option<Duration> {
311 resp.headers()
312 .get(reqwest::header::RETRY_AFTER)?
313 .to_str()
314 .ok()?
315 .trim()
316 .parse::<u64>()
317 .ok()
318 .map(Duration::from_secs)
319}
320
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    use base64::Engine as _;
    use base64::engine::general_purpose::STANDARD as BASE64;
    use hmac::{Hmac, KeyInit, Mac};
    use rsigma_eval::result::{DetectionBody, ResultBody, RuleHeader};
    use rsigma_parser::Level;
    use sha2::Sha256;
    use wiremock::matchers::{method, path};
    use wiremock::{Mock, MockServer, ResponseTemplate};

    use zeroize::Zeroizing;

    use super::super::signing::SigningScheme;
    use crate::metrics::NoopMetrics;

    /// Builds a minimal high-severity detection result with the given rule title.
    fn detection(title: &str) -> EvaluationResult {
        EvaluationResult {
            header: RuleHeader {
                rule_title: title.to_string(),
                rule_id: Some("rule-1".to_string()),
                level: Some(Level::High),
                tags: vec![],
                custom_attributes: Arc::new(HashMap::new()),
                enrichments: None,
            },
            body: ResultBody::Detection(DetectionBody {
                matched_selections: vec!["sel".to_string()],
                matched_fields: vec![],
                event: None,
            }),
        }
    }

    /// Shared fixture: a detection-only POST sink with a JSON body template, no rate
    /// limiter, no-op metrics, and the given optional signer.
    fn build_sink(url: String, signer: Option<WebhookSigner>) -> WebhookSink {
        WebhookSink::new(
            "test".to_string(),
            WebhookKind::Detection,
            reqwest::Method::POST,
            url,
            vec![("Content-Type".to_string(), "application/json".to_string())],
            Some(r#"{"text":"${detection.rule.title}"}"#.to_string()),
            Duration::from_secs(5),
            crate::enrichment::Scope::default(),
            None,
            crate::enrichment::build_default_http_client().unwrap(),
            Arc::new(NoopMetrics),
            signer,
        )
    }

    /// Unsigned sink pointed at `url`.
    fn sink_to(url: String) -> WebhookSink {
        build_sink(url, None)
    }

    /// Fresh delivery context.
    fn ctx() -> DeliveryContext {
        DeliveryContext::new()
    }

    /// Signed sink pointed at `url`.
    fn signed_sink_to(url: String, signer: WebhookSigner) -> WebhookSink {
        build_sink(url, Some(signer))
    }

    /// Returns the named header of a recorded request as text, or an empty string when
    /// it is missing or not valid visible ASCII.
    fn header_text(req: &wiremock::Request, name: &str) -> String {
        req.headers
            .get(name)
            .and_then(|v| v.to_str().ok())
            .unwrap_or_default()
            .to_string()
    }

    #[tokio::test]
    async fn success_2xx_is_ok() {
        let server = MockServer::start().await;
        Mock::given(method("POST"))
            .and(path("/hook"))
            .respond_with(ResponseTemplate::new(204))
            .mount(&server)
            .await;
        let mut sink = sink_to(format!("{}/hook", server.uri()));
        let result: ProcessResult = vec![detection("hi")];
        assert!(sink.send(&result, &ctx()).await.is_ok());
    }

    #[tokio::test]
    async fn server_error_is_retryable() {
        let server = MockServer::start().await;
        Mock::given(method("POST"))
            .respond_with(ResponseTemplate::new(500))
            .mount(&server)
            .await;
        let mut sink = sink_to(format!("{}/hook", server.uri()));
        let result: ProcessResult = vec![detection("hi")];
        match sink.send(&result, &ctx()).await {
            Err(RuntimeError::Io(_)) => {}
            other => panic!("expected retryable Io error, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn client_error_is_permanent() {
        let server = MockServer::start().await;
        Mock::given(method("POST"))
            .respond_with(ResponseTemplate::new(400))
            .mount(&server)
            .await;
        let mut sink = sink_to(format!("{}/hook", server.uri()));
        let result: ProcessResult = vec![detection("hi")];
        match sink.send(&result, &ctx()).await {
            Err(RuntimeError::Permanent(_)) => {}
            other => panic!("expected permanent error, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn non_matching_kind_is_skipped_without_request() {
        // No mock is mounted on purpose: the sink must not contact the server at all.
        let server = MockServer::start().await;
        let mut sink = sink_to(format!("{}/hook", server.uri()));
        let correlation = EvaluationResult {
            header: RuleHeader {
                rule_title: "corr".to_string(),
                rule_id: None,
                level: None,
                tags: vec![],
                custom_attributes: Arc::new(HashMap::new()),
                enrichments: None,
            },
            body: ResultBody::Correlation(rsigma_eval::result::CorrelationBody {
                correlation_type: rsigma_parser::CorrelationType::EventCount,
                aggregated_value: 1.0,
                timespan_secs: 60,
                group_key: vec![],
                events: None,
                event_refs: None,
            }),
        };
        let result: ProcessResult = vec![correlation];
        assert!(sink.send(&result, &ctx()).await.is_ok());

        // Pin the "without request" half of the test name explicitly.
        let received = server.received_requests().await.unwrap();
        assert!(
            received.is_empty(),
            "a skipped result must not reach the server"
        );
    }

    #[test]
    fn slack_recipe_body_renders_to_pinned_json() {
        let body = r#"{"text":":rotating_light: ${detection.rule.title} (${detection.rule.level}) cmd=${detection.fields.CommandLine}"}"#;
        // The field value contains double quotes, which must come out JSON-escaped.
        let mut r = detection("Encoded PowerShell");
        if let ResultBody::Detection(d) = &mut r.body {
            d.matched_fields.push(rsigma_eval::result::FieldMatch::new(
                "CommandLine",
                serde_json::json!(r#"powershell -enc "AAA""#),
            ));
        }
        let rendered = crate::enrichment::render_template_json(body, &r);
        assert_eq!(
            rendered,
            r#"{"text":":rotating_light: Encoded PowerShell (high) cmd=powershell -enc \"AAA\""}"#,
        );
        // The rendered payload must also parse as JSON.
        let _: serde_json::Value = serde_json::from_str(&rendered).expect("valid JSON");
    }

    #[tokio::test]
    async fn signed_request_carries_a_verifiable_signature() {
        let server = MockServer::start().await;
        Mock::given(method("POST"))
            .and(path("/hook"))
            .respond_with(ResponseTemplate::new(200))
            .mount(&server)
            .await;

        let secret = b"shared-secret".to_vec();
        let signer = WebhookSigner::new(
            SigningScheme::Standard,
            vec![Zeroizing::new(secret.clone())],
        );
        let mut sink = signed_sink_to(format!("{}/hook", server.uri()), signer);
        sink.send(&vec![detection("hi")], &ctx()).await.unwrap();

        let reqs = server.received_requests().await.unwrap();
        let req = &reqs[0];
        let id = header_text(req, "webhook-id");
        let ts = header_text(req, "webhook-timestamp");
        let sig = header_text(req, "webhook-signature");
        assert!(id.starts_with("msg_"), "id should be msg_<uuid>: {id}");

        // Recompute the HMAC over `<id>.<timestamp>.<body>` using the exact bytes the
        // server received.
        let body = std::str::from_utf8(&req.body).unwrap();
        let signed = format!("{id}.{ts}.{body}");
        let mut mac = Hmac::<Sha256>::new_from_slice(&secret).unwrap();
        mac.update(signed.as_bytes());
        let expected = format!("v1,{}", BASE64.encode(mac.finalize().into_bytes()));
        assert_eq!(sig, expected, "signature must verify over the wire body");
    }

    #[tokio::test]
    async fn retries_with_the_same_context_reproduce_the_signature() {
        let server = MockServer::start().await;
        Mock::given(method("POST"))
            .respond_with(ResponseTemplate::new(200))
            .mount(&server)
            .await;

        let signer =
            WebhookSigner::new(SigningScheme::Standard, vec![Zeroizing::new(b"k".to_vec())]);
        let mut sink = signed_sink_to(format!("{}/hook", server.uri()), signer);
        let result: ProcessResult = vec![detection("hi")];

        // Sending twice with one context stands in for a retry of the same delivery.
        let context = ctx();
        sink.send(&result, &context).await.unwrap();
        sink.send(&result, &context).await.unwrap();

        let reqs = server.received_requests().await.unwrap();
        assert_eq!(reqs.len(), 2);
        for name in ["webhook-id", "webhook-timestamp", "webhook-signature"] {
            assert_eq!(
                header_text(&reqs[0], name),
                header_text(&reqs[1], name),
                "{name} must be identical across retries"
            );
        }
    }

    #[tokio::test]
    async fn token_bucket_waits_when_empty() {
        let mut tb = TokenBucket::new(2, Duration::from_millis(100));
        // Two tokens per 100 ms: the bucket starts with two, then refills one per 50 ms.
        assert!(!tb.acquire().await, "first token is free");
        assert!(!tb.acquire().await, "second token is free");
        let start = std::time::Instant::now();
        assert!(tb.acquire().await, "third token must wait");
        assert!(start.elapsed() >= Duration::from_millis(40));
    }
}