Skip to main content

rsigma_runtime/io/webhook/
sink.rs

1//! `WebhookSink`: render a templated HTTP request per result and classify the
2//! response. The queue, retry/backoff, and DLQ routing belong to the shared
3//! `crate::io::delivery` layer; this type owns only the webhook-specific
4//! request behavior (render, rate limit, retryable-vs-permanent classification).
5
6use std::sync::Arc;
7use std::time::Duration;
8
9use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
10use rsigma_eval::{EvaluationResult, ProcessResult};
11
12use crate::enrichment::{HttpEnricherClient, render_template, render_template_json};
13use crate::error::RuntimeError;
14use crate::io::DeliveryContext;
15use crate::metrics::MetricsHook;
16
17use super::config::WebhookKind;
18use super::signing::WebhookSigner;
19
/// Upper bound, in bytes, on how much of a response body is read and thrown
/// away per request. Webhook responses are never interpreted; reading a
/// bounded prefix is enough to let reqwest recycle the connection while
/// refusing to follow a hostile endpoint that streams without end.
const DRAIN_CAP: usize = 64 * 1024;

/// Longest `Retry-After` delay that will be honored. Anything larger is
/// truncated to this value so a hostile or misconfigured endpoint cannot park
/// a worker in an arbitrarily long sleep.
const MAX_RETRY_AFTER: Duration = Duration::from_secs(60);
28
29/// A per-entry token bucket: `capacity` tokens, refilled at `refill_per_sec`.
30///
31/// Driven from `WebhookSink::deliver_one` under `&mut self`, so the worker
32/// processes one request at a time and the bucket needs no interior locking.
33/// Uses [`tokio::time::Instant`] so it stays consistent under paused time in
34/// tests.
35pub(crate) struct TokenBucket {
36    tokens: f64,
37    capacity: f64,
38    refill_per_sec: f64,
39    last: tokio::time::Instant,
40}
41
42impl TokenBucket {
43    /// `requests` tokens per `per` window; starts full (burst = `requests`).
44    /// Callers must ensure `requests >= 1` and `per > 0`.
45    pub(crate) fn new(requests: u32, per: Duration) -> Self {
46        let capacity = f64::from(requests);
47        let refill_per_sec = capacity / per.as_secs_f64();
48        TokenBucket {
49            tokens: capacity,
50            capacity,
51            refill_per_sec,
52            last: tokio::time::Instant::now(),
53        }
54    }
55
56    fn refill(&mut self) {
57        let now = tokio::time::Instant::now();
58        let elapsed = now.saturating_duration_since(self.last).as_secs_f64();
59        if elapsed > 0.0 {
60            self.tokens = (self.tokens + elapsed * self.refill_per_sec).min(self.capacity);
61            self.last = now;
62        }
63    }
64
65    /// Acquire one token, sleeping until the bucket refills if empty. Returns
66    /// `true` if it had to wait (so the caller can record the rate-limit metric).
67    async fn acquire(&mut self) -> bool {
68        self.refill();
69        if self.tokens >= 1.0 {
70            self.tokens -= 1.0;
71            return false;
72        }
73        let needed = 1.0 - self.tokens;
74        let wait = Duration::from_secs_f64(needed / self.refill_per_sec);
75        tokio::time::sleep(wait).await;
76        self.refill();
77        self.tokens = (self.tokens - 1.0).max(0.0);
78        true
79    }
80}
81
82/// One configured webhook. Filters each result by kind and scope, then renders
83/// and posts a templated request, classifying the outcome for the delivery
84/// layer.
85pub struct WebhookSink {
86    id: String,
87    /// `id` leaked to `&'static str` so it can serve as the shared per-sink
88    /// delivery label (queue depth, retries, DLQ), giving a one-to-one mapping
89    /// to the webhook-specific `rsigma_webhook_*` series. Bounded by the
90    /// configured webhook count, leaked once at startup.
91    label: &'static str,
92    kind: WebhookKind,
93    method: reqwest::Method,
94    url: String,
95    headers: Vec<(String, String)>,
96    body: Option<String>,
97    timeout: Duration,
98    scope: crate::enrichment::Scope,
99    limiter: Option<TokenBucket>,
100    client: HttpEnricherClient,
101    metrics: Arc<dyn MetricsHook>,
102    /// Optional HMAC request signer. When set, every delivery carries
103    /// signature headers over the rendered body bytes.
104    signer: Option<WebhookSigner>,
105}
106
107impl WebhookSink {
108    #[allow(clippy::too_many_arguments)]
109    pub(crate) fn new(
110        id: String,
111        kind: WebhookKind,
112        method: reqwest::Method,
113        url: String,
114        headers: Vec<(String, String)>,
115        body: Option<String>,
116        timeout: Duration,
117        scope: crate::enrichment::Scope,
118        limiter: Option<TokenBucket>,
119        client: HttpEnricherClient,
120        metrics: Arc<dyn MetricsHook>,
121        signer: Option<WebhookSigner>,
122    ) -> Self {
123        let label: &'static str = Box::leak(id.clone().into_boxed_str());
124        WebhookSink {
125            id,
126            label,
127            kind,
128            method,
129            url,
130            headers,
131            body,
132            timeout,
133            scope,
134            limiter,
135            client,
136            metrics,
137            signer,
138        }
139    }
140
141    /// The webhook id, used as the webhook-specific metric label.
142    pub fn id(&self) -> &str {
143        &self.id
144    }
145
146    /// The webhook id as a `&'static str`, used as the shared per-sink
147    /// delivery label so its queue/retry/DLQ series map one-to-one to the
148    /// `rsigma_webhook_*` series.
149    pub fn label(&self) -> &'static str {
150        self.label
151    }
152
153    /// Deliver every matching result in `result`.
154    ///
155    /// Results that do not match this webhook's `kind` or `scope` are skipped
156    /// (a no-op success). On the first delivery error the call short-circuits
157    /// with that error so the shared worker can apply retry/backoff (for a
158    /// retryable error) or route to the DLQ (for a [`RuntimeError::Permanent`]
159    /// or after the retry budget is spent).
160    pub async fn send(
161        &mut self,
162        result: &ProcessResult,
163        ctx: &DeliveryContext,
164    ) -> Result<(), RuntimeError> {
165        for (idx, eval) in result.iter().enumerate() {
166            if !self.kind.matches(&eval.body) || !self.scope.matches(eval) {
167                continue;
168            }
169            self.deliver_one(eval, ctx, idx).await?;
170        }
171        Ok(())
172    }
173
174    async fn deliver_one(
175        &mut self,
176        eval: &EvaluationResult,
177        ctx: &DeliveryContext,
178        idx: usize,
179    ) -> Result<(), RuntimeError> {
180        let waited = match &mut self.limiter {
181            Some(limiter) => limiter.acquire().await,
182            None => false,
183        };
184        if waited {
185            self.metrics.on_webhook_rate_limited(&self.id);
186        }
187
188        let url = render_template(&self.url, eval);
189        // Reserve for the user headers plus up to three signing headers
190        // (webhook-id/timestamp/signature is the widest scheme) so a signed
191        // request does not reallocate the map.
192        let signing_headroom = if self.signer.is_some() { 3 } else { 0 };
193        let mut header_map = HeaderMap::with_capacity(self.headers.len() + signing_headroom);
194        for (name, value_template) in &self.headers {
195            let rendered = render_template(value_template, eval);
196            let header_name = HeaderName::from_bytes(name.as_bytes()).map_err(|e| {
197                RuntimeError::Permanent(format!(
198                    "webhook {}: invalid header name '{name}': {e}",
199                    self.id
200                ))
201            })?;
202            let header_value = HeaderValue::from_str(&rendered).map_err(|e| {
203                RuntimeError::Permanent(format!(
204                    "webhook {}: invalid header value for '{name}': {e}",
205                    self.id
206                ))
207            })?;
208            header_map.insert(header_name, header_value);
209        }
210        let body = self.body.as_ref().map(|b| render_template_json(b, eval));
211
212        // Sign the exact body bytes that go on the wire. The id and timestamp
213        // come from the per-delivery context, so a retry reproduces the same
214        // signature. The id is unique per result within the delivery.
215        if let Some(signer) = &self.signer {
216            let request_id = format!("{}-{idx}", ctx.id_base());
217            for (name, value) in signer.sign(
218                body.as_deref().unwrap_or(""),
219                ctx.first_attempt(),
220                &request_id,
221            ) {
222                let header_name = HeaderName::from_bytes(name.as_bytes()).map_err(|e| {
223                    RuntimeError::Permanent(format!(
224                        "webhook {}: invalid signing header name '{name}': {e}",
225                        self.id
226                    ))
227                })?;
228                let header_value = HeaderValue::from_str(&value).map_err(|e| {
229                    RuntimeError::Permanent(format!(
230                        "webhook {}: invalid signing header value for '{name}': {e}",
231                        self.id
232                    ))
233                })?;
234                header_map.insert(header_name, header_value);
235            }
236        }
237
238        let mut req = self
239            .client
240            .inner()
241            .request(self.method.clone(), &url)
242            .headers(header_map)
243            .timeout(self.timeout);
244        if let Some(b) = &body {
245            req = req.body(b.clone());
246        }
247
248        let started = std::time::Instant::now();
249        let resp = match req.send().await {
250            Ok(r) => r,
251            // Transport-level failures (connect, DNS/egress block, timeout)
252            // heal on retry, so they are retryable, not permanent.
253            Err(e) => {
254                return Err(RuntimeError::Io(std::io::Error::other(format!(
255                    "webhook {}: request error: {e}",
256                    self.id
257                ))));
258            }
259        };
260
261        let status = resp.status();
262        let elapsed = started.elapsed().as_secs_f64();
263
264        if status.is_success() {
265            drain_body(resp).await;
266            self.metrics
267                .on_webhook_request(&self.id, "success", elapsed);
268            return Ok(());
269        }
270
271        let retry_after = parse_retry_after(&resp);
272        drain_body(resp).await;
273
274        if status.as_u16() == 429 || status.is_server_error() {
275            // Retryable: honor Retry-After (capped) before yielding to the
276            // shared worker's own backoff schedule.
277            if let Some(wait) = retry_after {
278                tokio::time::sleep(wait.min(MAX_RETRY_AFTER)).await;
279            }
280            return Err(RuntimeError::Io(std::io::Error::other(format!(
281                "webhook {}: HTTP {status}",
282                self.id
283            ))));
284        }
285
286        // Other 4xx (and any non-2xx, non-429, non-5xx): a misrendered payload
287        // or bad endpoint will not heal on retry.
288        self.metrics
289            .on_webhook_request(&self.id, "permanent_failure", elapsed);
290        Err(RuntimeError::Permanent(format!(
291            "webhook {}: HTTP {status}",
292            self.id
293        )))
294    }
295}
296
297/// Read and discard up to [`DRAIN_CAP`] bytes of the response body.
298async fn drain_body(mut resp: reqwest::Response) {
299    let mut read = 0usize;
300    while read < DRAIN_CAP {
301        match resp.chunk().await {
302            Ok(Some(chunk)) => read += chunk.len(),
303            _ => break,
304        }
305    }
306}
307
308/// Parse a numeric `Retry-After` header (delay in seconds). The HTTP-date form
309/// is intentionally not supported; chat/paging endpoints use delay-seconds.
310fn parse_retry_after(resp: &reqwest::Response) -> Option<Duration> {
311    resp.headers()
312        .get(reqwest::header::RETRY_AFTER)?
313        .to_str()
314        .ok()?
315        .trim()
316        .parse::<u64>()
317        .ok()
318        .map(Duration::from_secs)
319}
320
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    use base64::Engine as _;
    use base64::engine::general_purpose::STANDARD as BASE64;
    use hmac::{Hmac, KeyInit, Mac};
    use rsigma_eval::result::{DetectionBody, ResultBody, RuleHeader};
    use rsigma_parser::Level;
    use sha2::Sha256;
    use wiremock::matchers::{method, path};
    use wiremock::{Mock, MockServer, ResponseTemplate};

    use zeroize::Zeroizing;

    use super::super::signing::SigningScheme;
    use crate::metrics::NoopMetrics;

    /// A minimal high-level detection result titled `title`.
    fn detection(title: &str) -> EvaluationResult {
        let header = RuleHeader {
            rule_title: title.to_string(),
            rule_id: Some("rule-1".to_string()),
            level: Some(Level::High),
            tags: vec![],
            custom_attributes: Arc::new(HashMap::new()),
            enrichments: None,
        };
        let body = ResultBody::Detection(DetectionBody {
            matched_selections: vec!["sel".to_string()],
            matched_fields: vec![],
            event: None,
        });
        EvaluationResult { header, body }
    }

    /// Shared fixture: a detection webhook that POSTs a small JSON body to
    /// `url`, with no rate limiter and an optional signer.
    fn build_sink(url: String, signer: Option<WebhookSigner>) -> WebhookSink {
        WebhookSink::new(
            "test".to_string(),
            WebhookKind::Detection,
            reqwest::Method::POST,
            url,
            vec![("Content-Type".to_string(), "application/json".to_string())],
            Some(r#"{"text":"${detection.rule.title}"}"#.to_string()),
            Duration::from_secs(5),
            crate::enrichment::Scope::default(),
            None,
            crate::enrichment::build_default_http_client().unwrap(),
            Arc::new(NoopMetrics),
            signer,
        )
    }

    /// Unsigned fixture sink.
    fn sink_to(url: String) -> WebhookSink {
        build_sink(url, None)
    }

    /// A fresh delivery context.
    fn ctx() -> DeliveryContext {
        DeliveryContext::new()
    }

    /// Fixture sink that signs every request with `signer`.
    fn signed_sink_to(url: String, signer: WebhookSigner) -> WebhookSink {
        build_sink(url, Some(signer))
    }

    /// Value of header `name` on a recorded request, or an empty string when
    /// the header is missing or not valid text.
    fn header_of(req: &wiremock::Request, name: &str) -> String {
        req.headers
            .get(name)
            .and_then(|v| v.to_str().ok())
            .unwrap_or_default()
            .to_string()
    }

    #[tokio::test]
    async fn success_2xx_is_ok() {
        let server = MockServer::start().await;
        Mock::given(method("POST"))
            .and(path("/hook"))
            .respond_with(ResponseTemplate::new(204))
            .mount(&server)
            .await;
        let mut sink = sink_to(format!("{}/hook", server.uri()));
        let result: ProcessResult = vec![detection("hi")];
        let outcome = sink.send(&result, &ctx()).await;
        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn server_error_is_retryable() {
        let server = MockServer::start().await;
        Mock::given(method("POST"))
            .respond_with(ResponseTemplate::new(500))
            .mount(&server)
            .await;
        let mut sink = sink_to(format!("{}/hook", server.uri()));
        let result: ProcessResult = vec![detection("hi")];
        let outcome = sink.send(&result, &ctx()).await;
        assert!(
            matches!(outcome, Err(RuntimeError::Io(_))),
            "expected retryable Io error, got {outcome:?}"
        );
    }

    #[tokio::test]
    async fn client_error_is_permanent() {
        let server = MockServer::start().await;
        Mock::given(method("POST"))
            .respond_with(ResponseTemplate::new(400))
            .mount(&server)
            .await;
        let mut sink = sink_to(format!("{}/hook", server.uri()));
        let result: ProcessResult = vec![detection("hi")];
        let outcome = sink.send(&result, &ctx()).await;
        assert!(
            matches!(outcome, Err(RuntimeError::Permanent(_))),
            "expected permanent error, got {outcome:?}"
        );
    }

    #[tokio::test]
    async fn non_matching_kind_is_skipped_without_request() {
        // No mock mounted: any request would 404 and fail. A correlation-only
        // result must be skipped by a detection webhook, so send() is a no-op.
        let server = MockServer::start().await;
        let mut sink = sink_to(format!("{}/hook", server.uri()));
        let header = RuleHeader {
            rule_title: "corr".to_string(),
            rule_id: None,
            level: None,
            tags: vec![],
            custom_attributes: Arc::new(HashMap::new()),
            enrichments: None,
        };
        let body = ResultBody::Correlation(rsigma_eval::result::CorrelationBody {
            correlation_type: rsigma_parser::CorrelationType::EventCount,
            aggregated_value: 1.0,
            timespan_secs: 60,
            group_key: vec![],
            events: None,
            event_refs: None,
        });
        let result: ProcessResult = vec![EvaluationResult { header, body }];
        assert!(sink.send(&result, &ctx()).await.is_ok());
    }

    #[test]
    fn slack_recipe_body_renders_to_pinned_json() {
        // Pin the template-engine-plus-JSON-escaping contract for a realistic
        // Slack-style body: the matched CommandLine carries embedded quotes
        // that must be escaped so the rendered body stays valid JSON.
        let template = r#"{"text":":rotating_light: ${detection.rule.title} (${detection.rule.level}) cmd=${detection.fields.CommandLine}"}"#;
        let mut eval = detection("Encoded PowerShell");
        if let ResultBody::Detection(d) = &mut eval.body {
            let field = rsigma_eval::result::FieldMatch::new(
                "CommandLine",
                serde_json::json!(r#"powershell -enc "AAA""#),
            );
            d.matched_fields.push(field);
        }
        let rendered = crate::enrichment::render_template_json(template, &eval);
        assert_eq!(
            rendered,
            r#"{"text":":rotating_light: Encoded PowerShell (high) cmd=powershell -enc \"AAA\""}"#,
        );
        // The escaped body must parse as JSON despite the embedded quotes.
        let _: serde_json::Value = serde_json::from_str(&rendered).expect("valid JSON");
    }

    #[tokio::test]
    async fn signed_request_carries_a_verifiable_signature() {
        let server = MockServer::start().await;
        Mock::given(method("POST"))
            .and(path("/hook"))
            .respond_with(ResponseTemplate::new(200))
            .mount(&server)
            .await;

        let secret = b"shared-secret".to_vec();
        let signer = WebhookSigner::new(
            SigningScheme::Standard,
            vec![Zeroizing::new(secret.clone())],
        );
        let mut sink = signed_sink_to(format!("{}/hook", server.uri()), signer);
        sink.send(&vec![detection("hi")], &ctx()).await.unwrap();

        let reqs = server.received_requests().await.unwrap();
        let req = &reqs[0];
        let id = header_of(req, "webhook-id");
        let ts = header_of(req, "webhook-timestamp");
        let sig = header_of(req, "webhook-signature");
        assert!(id.starts_with("msg_"), "id should be msg_<uuid>: {id}");

        // Recompute the HMAC over the exact bytes the receiver would: the
        // signed content is "{id}.{timestamp}.{body}" and body is the wire body.
        let wire_body = std::str::from_utf8(&req.body).unwrap();
        let signed = format!("{id}.{ts}.{wire_body}");
        let mut mac = Hmac::<Sha256>::new_from_slice(&secret).unwrap();
        mac.update(signed.as_bytes());
        let expected = format!("v1,{}", BASE64.encode(mac.finalize().into_bytes()));
        assert_eq!(sig, expected, "signature must verify over the wire body");
    }

    #[tokio::test]
    async fn retries_with_the_same_context_reproduce_the_signature() {
        let server = MockServer::start().await;
        Mock::given(method("POST"))
            .respond_with(ResponseTemplate::new(200))
            .mount(&server)
            .await;

        let signer =
            WebhookSigner::new(SigningScheme::Standard, vec![Zeroizing::new(b"k".to_vec())]);
        let mut sink = signed_sink_to(format!("{}/hook", server.uri()), signer);
        let result: ProcessResult = vec![detection("hi")];

        // The delivery worker reuses one context across retries; emulate that by
        // sending twice with the same context.
        let context = ctx();
        sink.send(&result, &context).await.unwrap();
        sink.send(&result, &context).await.unwrap();

        let reqs = server.received_requests().await.unwrap();
        assert_eq!(reqs.len(), 2);
        for name in ["webhook-id", "webhook-timestamp", "webhook-signature"] {
            assert_eq!(
                header_of(&reqs[0], name),
                header_of(&reqs[1], name),
                "{name} must be identical across retries"
            );
        }
    }

    #[tokio::test]
    async fn token_bucket_waits_when_empty() {
        // 2 tokens per 100ms => one token refills in ~50ms.
        let mut bucket = TokenBucket::new(2, Duration::from_millis(100));
        assert!(!bucket.acquire().await, "first token is free");
        assert!(!bucket.acquire().await, "second token is free");
        let began = std::time::Instant::now();
        assert!(bucket.acquire().await, "third token must wait");
        assert!(began.elapsed() >= Duration::from_millis(40));
    }
}