Skip to main content

courier/sources/
http_webhook.rs

1use anyhow::{Result, bail};
2use async_trait::async_trait;
3use axum::Router;
4use axum::body::Bytes;
5use axum::extract::State;
6use axum::http::{HeaderMap, StatusCode};
7use axum::response::{IntoResponse, Response};
8use axum::routing::post;
9use serde::Deserialize;
10use serde_json::Value;
11use std::net::SocketAddr;
12use tokio::net::TcpListener;
13use tokio::sync::mpsc::Sender;
14use tokio_util::sync::CancellationToken;
15
16use crate::config::{parse_config, redact_secret};
17use crate::envelope::Envelope;
18use crate::observability::trace_context::{TRACEPARENT, TRACESTATE};
19use crate::observability::{NodeCtx, SourceCtx};
20use crate::retry::RetryPolicy;
21use crate::sources::Source;
22
23/// Accepts HTTP webhook requests and emits each valid JSON request body as
24/// an envelope payload. The request is acknowledged only after the envelope
25/// has been sent to the pipeline channel, so normal mpsc backpressure applies
26/// to incoming HTTP traffic.
27pub struct HttpWebhookSource {
28    id: String,
29    bind: SocketAddr,
30    path: String,
31    source_ctx: SourceCtx,
32}
33
34impl HttpWebhookSource {
35    pub fn new(id: impl Into<String>, bind: SocketAddr, path: impl Into<String>) -> Self {
36        let id = id.into();
37        Self {
38            source_ctx: SourceCtx::new(&id),
39            id,
40            bind,
41            path: path.into(),
42        }
43    }
44}
45
46#[async_trait]
47impl Source for HttpWebhookSource {
48    fn id(&self) -> &str {
49        &self.id
50    }
51
52    fn set_node_ctx(&mut self, ctx: NodeCtx) {
53        self.source_ctx = SourceCtx::from_node_ctx(ctx);
54    }
55
56    async fn run(self: Box<Self>, tx: Sender<Envelope>, cancel: CancellationToken) {
57        let state = WebhookState {
58            source_id: self.id.clone(),
59            source_ctx: self.source_ctx.clone(),
60            tx,
61            cancel: cancel.clone(),
62        };
63        let app = Router::new()
64            .route(&self.path, post(handle_webhook))
65            .fallback(not_found)
66            .with_state(state);
67
68        let listener = match TcpListener::bind(self.bind).await {
69            Ok(listener) => listener,
70            Err(e) => {
71                log::error!(
72                    "[{}] failed to bind {}: {e}",
73                    redact_secret(&self.id),
74                    redact_secret(&self.bind.to_string())
75                );
76                return;
77            }
78        };
79
80        let local_addr = listener.local_addr().unwrap_or(self.bind);
81        log::info!(
82            "[{}] listening for POST {} on {}",
83            redact_secret(&self.id),
84            redact_secret(&self.path),
85            local_addr
86        );
87
88        if let Err(e) = axum::serve(listener, app)
89            .with_graceful_shutdown(cancel.cancelled_owned())
90            .await
91        {
92            log::error!("[{}] webhook server failed: {e}", redact_secret(&self.id));
93        }
94    }
95}
96
97#[derive(Clone)]
98struct WebhookState {
99    source_id: String,
100    source_ctx: SourceCtx,
101    tx: Sender<Envelope>,
102    cancel: CancellationToken,
103}
104
105async fn handle_webhook(
106    State(state): State<WebhookState>,
107    headers: HeaderMap,
108    body: Bytes,
109) -> Response {
110    let payload = match serde_json::from_slice::<Value>(&body) {
111        Ok(payload) => payload,
112        Err(e) => {
113            return (
114                StatusCode::BAD_REQUEST,
115                format!("invalid JSON request body: {e}"),
116            )
117                .into_response();
118        }
119    };
120
121    let mut env = Envelope::new(&state.source_id, payload);
122    capture_headers(&headers, &mut env);
123
124    match state.source_ctx.send(&state.tx, env, &state.cancel).await {
125        Ok(()) => (StatusCode::ACCEPTED, "accepted").into_response(),
126        Err(_) => (
127            StatusCode::SERVICE_UNAVAILABLE,
128            "pipeline is not accepting webhook events",
129        )
130            .into_response(),
131    }
132}
133
134async fn not_found() -> impl IntoResponse {
135    (StatusCode::NOT_FOUND, "webhook path not found")
136}
137
138fn capture_headers(headers: &HeaderMap, env: &mut Envelope) {
139    for (name, value) in headers {
140        let Ok(value) = value.to_str() else {
141            continue;
142        };
143        let name_str = name.as_str();
144        if matches!(name_str, TRACEPARENT | TRACESTATE) {
145            // Trace-context headers are owned by the observability layer.
146            // Store them under the bare key so SourceCtx::send can refresh
147            // the span context without leaving a stale `http.header.*` copy.
148            env.meta
149                .headers
150                .insert(name_str.to_string(), value.to_string());
151        } else {
152            env.meta
153                .headers
154                .insert(format!("http.header.{}", name_str), value.to_string());
155        }
156    }
157}
158
159#[derive(Debug, Deserialize)]
160struct HttpWebhookSourceConfig {
161    bind: String,
162    path: String,
163}
164
165/// Registry factory for [`HttpWebhookSource`]. Registered by
166/// `courier::registry::register_builtin` under kind `"http_webhook"`.
167///
168/// `retry` is rejected at config time: webhook is push-based, so there is
169/// nothing to "retry" — the upstream owns the retry decision.
170pub fn http_webhook_source_factory(
171    id: &str,
172    config: Value,
173    retry: Option<RetryPolicy>,
174) -> Result<Box<dyn Source>> {
175    if retry.is_some() {
176        bail!(
177            "invalid config for component type 'http_webhook': retry has no effect on push-based sources"
178        );
179    }
180    let config: HttpWebhookSourceConfig = parse_config("http_webhook", config)?;
181    let bind = config.bind.parse::<SocketAddr>().map_err(|e| {
182        anyhow::anyhow!(
183            "invalid http_webhook bind '{}': {e}",
184            redact_secret(&config.bind)
185        )
186    })?;
187    if !config.path.starts_with('/') {
188        bail!(
189            "invalid http_webhook path '{}': path must start with '/'",
190            redact_secret(&config.path)
191        );
192    }
193
194    Ok(Box::new(HttpWebhookSource::new(id, bind, config.path)))
195}
196
#[cfg(test)]
mod tests {
    use std::net::{SocketAddr, TcpListener as StdTcpListener};
    use std::time::Duration;

    use reqwest::Client;
    use serde_json::json;
    use tokio::net::TcpStream;
    use tokio::sync::mpsc;
    use tokio::task::JoinHandle;

    use super::*;

    /// A spawned [`HttpWebhookSource`] serving `/events` on a loopback port,
    /// together with the handles a test needs to drive and stop it.
    struct RunningSource {
        bind: SocketAddr,
        cancel: CancellationToken,
        task: JoinHandle<()>,
    }

    impl RunningSource {
        /// Spawns the source and returns only once its listener accepts
        /// connections, so tests never race server start-up.
        async fn start(tx: mpsc::Sender<Envelope>) -> Self {
            let bind = unused_local_addr();
            let source = HttpWebhookSource::new("webhook", bind, "/events");
            let cancel = CancellationToken::new();

            let c = cancel.clone();
            let task = tokio::spawn(async move { Box::new(source).run(tx, c).await });
            wait_until_listening(bind).await;

            Self { bind, cancel, task }
        }

        /// URL of the webhook endpoint.
        fn url(&self) -> String {
            format!("http://{}/events", self.bind)
        }

        /// Cancels the source and waits (bounded) for its task to finish.
        async fn stop(self) {
            self.cancel.cancel();
            let _ = tokio::time::timeout(Duration::from_secs(1), self.task).await;
        }
    }

    /// Polls `addr` until a TCP connection succeeds; panics after ~2 s.
    async fn wait_until_listening(addr: SocketAddr) {
        for _ in 0..200 {
            if TcpStream::connect(addr).await.is_ok() {
                return;
            }
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
        panic!("webhook source never started listening on {addr}");
    }

    /// Asks the OS for a free loopback port and releases it immediately.
    ///
    /// Another process could claim the port before the source binds it; the
    /// source only accepts a fixed `SocketAddr`, so that window cannot be
    /// closed from here.
    fn unused_local_addr() -> SocketAddr {
        let listener = StdTcpListener::bind("127.0.0.1:0").unwrap();
        listener.local_addr().unwrap()
    }

    #[tokio::test]
    async fn emits_envelope_for_valid_webhook_request() {
        let (tx, mut rx) = mpsc::channel(8);
        let server = RunningSource::start(tx).await;

        let response = Client::new()
            .post(server.url())
            .header("x-event-id", "evt-1")
            .header(
                "traceparent",
                "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
            )
            .json(&json!({ "event": "created" }))
            .send()
            .await
            .unwrap();

        assert_eq!(response.status(), StatusCode::ACCEPTED);

        let env = tokio::time::timeout(Duration::from_secs(2), rx.recv())
            .await
            .expect("webhook timed out")
            .expect("source closed before emitting");

        assert_eq!(env.meta.source_id, "webhook");
        assert_eq!(env.payload, json!({ "event": "created" }));
        assert_eq!(
            env.meta.headers.get("http.header.x-event-id"),
            Some(&"evt-1".to_string())
        );
        assert!(env.meta.headers.contains_key(TRACEPARENT));
        // Trace-context headers are not duplicated under http.header.* so
        // that SourceCtx::send can refresh the bare key without leaving a
        // stale copy behind.
        assert!(
            !env.meta.headers.contains_key("http.header.traceparent"),
            "traceparent should not be duplicated under http.header.*"
        );

        server.stop().await;
    }

    #[tokio::test]
    async fn rejects_wrong_method_and_invalid_json() {
        // `_rx` stays alive so the channel remains open for the whole test.
        let (tx, _rx) = mpsc::channel(8);
        let server = RunningSource::start(tx).await;

        let client = Client::new();
        let wrong_method = client.get(server.url()).send().await.unwrap();
        assert_eq!(wrong_method.status(), StatusCode::METHOD_NOT_ALLOWED);

        let invalid_json = client
            .post(server.url())
            .body("not json")
            .send()
            .await
            .unwrap();
        assert_eq!(invalid_json.status(), StatusCode::BAD_REQUEST);

        server.stop().await;
    }

    #[tokio::test]
    async fn response_waits_for_channel_capacity_before_returning_accepted() {
        // Capacity 1: the first request fills the channel, the second must wait.
        let (tx, mut rx) = mpsc::channel(1);
        let server = RunningSource::start(tx).await;

        let client = Client::new();
        let first = client
            .post(server.url())
            .json(&json!({ "n": 1 }))
            .send()
            .await
            .unwrap();
        assert_eq!(first.status(), StatusCode::ACCEPTED);

        let second_client = client.clone();
        let second_url = server.url();
        let second = tokio::spawn(async move {
            second_client
                .post(second_url)
                .json(&json!({ "n": 2 }))
                .send()
                .await
                .unwrap()
        });

        tokio::time::sleep(Duration::from_millis(100)).await;
        assert!(
            !second.is_finished(),
            "second request returned before downstream channel had capacity"
        );

        let first_env = rx.recv().await.expect("expected first envelope");
        assert_eq!(first_env.payload, json!({ "n": 1 }));

        let second_response = tokio::time::timeout(Duration::from_secs(2), second)
            .await
            .expect("second request stayed blocked after capacity was freed")
            .expect("second request task failed");
        assert_eq!(second_response.status(), StatusCode::ACCEPTED);

        let second_env = rx.recv().await.expect("expected second envelope");
        assert_eq!(second_env.payload, json!({ "n": 2 }));

        server.stop().await;
    }

    #[tokio::test]
    async fn blocked_request_returns_unavailable_when_cancelled() {
        // The receiver is never drained, so the second request stays blocked
        // until cancellation.
        let (tx, _rx) = mpsc::channel(1);
        let server = RunningSource::start(tx).await;

        let client = Client::new();
        let first = client
            .post(server.url())
            .json(&json!({ "n": 1 }))
            .send()
            .await
            .unwrap();
        assert_eq!(first.status(), StatusCode::ACCEPTED);

        let second_client = client.clone();
        let second_url = server.url();
        let second = tokio::spawn(async move {
            second_client
                .post(second_url)
                .json(&json!({ "n": 2 }))
                .send()
                .await
                .unwrap()
        });

        tokio::time::sleep(Duration::from_millis(100)).await;
        assert!(
            !second.is_finished(),
            "second request returned before cancellation"
        );

        server.cancel.cancel();
        let second_response = tokio::time::timeout(Duration::from_secs(2), second)
            .await
            .expect("second request stayed blocked after cancellation")
            .expect("second request task failed");
        assert_eq!(second_response.status(), StatusCode::SERVICE_UNAVAILABLE);

        // Cancelling again inside `stop` is harmless; it then awaits the task.
        server.stop().await;
    }

    #[test]
    fn factory_rejects_invalid_path() {
        let err = match http_webhook_source_factory(
            "webhook",
            json!({
                "bind": "127.0.0.1:8080",
                "path": "events",
            }),
            None,
        ) {
            Ok(_) => panic!("expected invalid path error"),
            Err(err) => err,
        };

        assert!(err.to_string().contains("path must start with '/'"));
    }

    #[test]
    fn factory_builds_with_required_fields() {
        let source = http_webhook_source_factory(
            "webhook",
            json!({
                "bind": "127.0.0.1:8080",
                "path": "/events",
            }),
            None,
        )
        .unwrap();

        assert_eq!(source.id(), "webhook");
    }

    #[test]
    fn factory_rejects_retry_policy() {
        use crate::retry::{ExhaustedPolicy, RetryPolicy};

        let err = http_webhook_source_factory(
            "webhook",
            json!({ "bind": "127.0.0.1:8080", "path": "/events" }),
            Some(RetryPolicy {
                max_attempts: 3,
                initial_delay_ms: 100,
                backoff_multiplier: 2.0,
                max_delay_ms: 1000,
                on_exhausted: ExhaustedPolicy::Propagate,
            }),
        )
        .err()
        .expect("expected retry rejection");
        let msg = format!("{err:#}");
        assert!(
            msg.contains("retry has no effect on push-based sources"),
            "{msg}"
        );
    }
}