1use anyhow::{Result, bail};
2use async_trait::async_trait;
3use axum::Router;
4use axum::body::Bytes;
5use axum::extract::State;
6use axum::http::{HeaderMap, StatusCode};
7use axum::response::{IntoResponse, Response};
8use axum::routing::post;
9use serde::Deserialize;
10use serde_json::Value;
11use std::net::SocketAddr;
12use tokio::net::TcpListener;
13use tokio::sync::mpsc::Sender;
14use tokio_util::sync::CancellationToken;
15
16use crate::config::{parse_config, redact_secret};
17use crate::envelope::Envelope;
18use crate::observability::trace_context::{TRACEPARENT, TRACESTATE};
19use crate::observability::{NodeCtx, SourceCtx};
20use crate::retry::RetryPolicy;
21use crate::sources::Source;
22
23pub struct HttpWebhookSource {
28 id: String,
29 bind: SocketAddr,
30 path: String,
31 source_ctx: SourceCtx,
32}
33
34impl HttpWebhookSource {
35 pub fn new(id: impl Into<String>, bind: SocketAddr, path: impl Into<String>) -> Self {
36 let id = id.into();
37 Self {
38 source_ctx: SourceCtx::new(&id),
39 id,
40 bind,
41 path: path.into(),
42 }
43 }
44}
45
46#[async_trait]
47impl Source for HttpWebhookSource {
48 fn id(&self) -> &str {
49 &self.id
50 }
51
52 fn set_node_ctx(&mut self, ctx: NodeCtx) {
53 self.source_ctx = SourceCtx::from_node_ctx(ctx);
54 }
55
56 async fn run(self: Box<Self>, tx: Sender<Envelope>, cancel: CancellationToken) {
57 let state = WebhookState {
58 source_id: self.id.clone(),
59 source_ctx: self.source_ctx.clone(),
60 tx,
61 cancel: cancel.clone(),
62 };
63 let app = Router::new()
64 .route(&self.path, post(handle_webhook))
65 .fallback(not_found)
66 .with_state(state);
67
68 let listener = match TcpListener::bind(self.bind).await {
69 Ok(listener) => listener,
70 Err(e) => {
71 log::error!(
72 "[{}] failed to bind {}: {e}",
73 redact_secret(&self.id),
74 redact_secret(&self.bind.to_string())
75 );
76 return;
77 }
78 };
79
80 let local_addr = listener.local_addr().unwrap_or(self.bind);
81 log::info!(
82 "[{}] listening for POST {} on {}",
83 redact_secret(&self.id),
84 redact_secret(&self.path),
85 local_addr
86 );
87
88 if let Err(e) = axum::serve(listener, app)
89 .with_graceful_shutdown(cancel.cancelled_owned())
90 .await
91 {
92 log::error!("[{}] webhook server failed: {e}", redact_secret(&self.id));
93 }
94 }
95}
96
97#[derive(Clone)]
98struct WebhookState {
99 source_id: String,
100 source_ctx: SourceCtx,
101 tx: Sender<Envelope>,
102 cancel: CancellationToken,
103}
104
105async fn handle_webhook(
106 State(state): State<WebhookState>,
107 headers: HeaderMap,
108 body: Bytes,
109) -> Response {
110 let payload = match serde_json::from_slice::<Value>(&body) {
111 Ok(payload) => payload,
112 Err(e) => {
113 return (
114 StatusCode::BAD_REQUEST,
115 format!("invalid JSON request body: {e}"),
116 )
117 .into_response();
118 }
119 };
120
121 let mut env = Envelope::new(&state.source_id, payload);
122 capture_headers(&headers, &mut env);
123
124 match state.source_ctx.send(&state.tx, env, &state.cancel).await {
125 Ok(()) => (StatusCode::ACCEPTED, "accepted").into_response(),
126 Err(_) => (
127 StatusCode::SERVICE_UNAVAILABLE,
128 "pipeline is not accepting webhook events",
129 )
130 .into_response(),
131 }
132}
133
134async fn not_found() -> impl IntoResponse {
135 (StatusCode::NOT_FOUND, "webhook path not found")
136}
137
138fn capture_headers(headers: &HeaderMap, env: &mut Envelope) {
139 for (name, value) in headers {
140 let Ok(value) = value.to_str() else {
141 continue;
142 };
143 let name_str = name.as_str();
144 if matches!(name_str, TRACEPARENT | TRACESTATE) {
145 env.meta
149 .headers
150 .insert(name_str.to_string(), value.to_string());
151 } else {
152 env.meta
153 .headers
154 .insert(format!("http.header.{}", name_str), value.to_string());
155 }
156 }
157}
158
159#[derive(Debug, Deserialize)]
160struct HttpWebhookSourceConfig {
161 bind: String,
162 path: String,
163}
164
165pub fn http_webhook_source_factory(
171 id: &str,
172 config: Value,
173 retry: Option<RetryPolicy>,
174) -> Result<Box<dyn Source>> {
175 if retry.is_some() {
176 bail!(
177 "invalid config for component type 'http_webhook': retry has no effect on push-based sources"
178 );
179 }
180 let config: HttpWebhookSourceConfig = parse_config("http_webhook", config)?;
181 let bind = config.bind.parse::<SocketAddr>().map_err(|e| {
182 anyhow::anyhow!(
183 "invalid http_webhook bind '{}': {e}",
184 redact_secret(&config.bind)
185 )
186 })?;
187 if !config.path.starts_with('/') {
188 bail!(
189 "invalid http_webhook path '{}': path must start with '/'",
190 redact_secret(&config.path)
191 );
192 }
193
194 Ok(Box::new(HttpWebhookSource::new(id, bind, config.path)))
195}
196
#[cfg(test)]
mod tests {
    use std::net::{SocketAddr, TcpListener as StdTcpListener};
    use std::time::{Duration, Instant};

    use reqwest::Client;
    use serde_json::json;
    use tokio::net::TcpStream;
    use tokio::sync::mpsc;
    use tokio::task::JoinHandle;

    use super::*;

    /// A webhook source running on a background task for the duration of a test.
    struct RunningSource {
        /// Address the source was asked to bind to.
        bind: SocketAddr,
        /// Receiving end of the pipeline channel. It must stay alive for the
        /// whole test, even when unread, so that sends block on a full channel
        /// instead of failing on a closed one.
        rx: mpsc::Receiver<Envelope>,
        /// Token that stops the source when cancelled.
        cancel: CancellationToken,
        /// Handle of the task driving `Source::run`.
        handle: JoinHandle<()>,
    }

    impl RunningSource {
        /// Cancels the source and waits up to one second for its task to end.
        async fn shutdown(self) {
            self.cancel.cancel();
            let _ = tokio::time::timeout(Duration::from_secs(1), self.handle).await;
        }
    }

    /// Starts a source for `POST /events` on a free local port with a pipeline
    /// channel of the given capacity, and returns once it accepts connections.
    async fn start_source(capacity: usize) -> RunningSource {
        let bind = unused_local_addr();
        let source = HttpWebhookSource::new("webhook", bind, "/events");
        let (tx, rx) = mpsc::channel(capacity);
        let cancel = CancellationToken::new();

        let c = cancel.clone();
        let handle = tokio::spawn(async move { Box::new(source).run(tx, c).await });
        wait_until_listening(bind).await;

        RunningSource {
            bind,
            rx,
            cancel,
            handle,
        }
    }

    /// Polls `addr` until a TCP connection succeeds.
    ///
    /// This replaces a fixed start-up sleep, which raced with the spawned
    /// server task on slow machines. Panics with a clear message if the
    /// listener does not come up within two seconds.
    async fn wait_until_listening(addr: SocketAddr) {
        let deadline = Instant::now() + Duration::from_secs(2);
        while TcpStream::connect(addr).await.is_err() {
            assert!(
                Instant::now() < deadline,
                "webhook source did not start listening on {addr} within 2s"
            );
            tokio::time::sleep(Duration::from_millis(5)).await;
        }
    }

    #[tokio::test]
    async fn emits_envelope_for_valid_webhook_request() {
        let mut running = start_source(8).await;
        let bind = running.bind;

        let response = Client::new()
            .post(format!("http://{bind}/events"))
            .header("x-event-id", "evt-1")
            .header(
                "traceparent",
                "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
            )
            .json(&json!({ "event": "created" }))
            .send()
            .await
            .unwrap();

        assert_eq!(response.status(), StatusCode::ACCEPTED);

        let env = tokio::time::timeout(Duration::from_secs(2), running.rx.recv())
            .await
            .expect("webhook timed out")
            .expect("source closed before emitting");

        assert_eq!(env.meta.source_id, "webhook");
        assert_eq!(env.payload, json!({ "event": "created" }));
        assert_eq!(
            env.meta.headers.get("http.header.x-event-id"),
            Some(&"evt-1".to_string())
        );
        assert!(env.meta.headers.contains_key(TRACEPARENT));
        // Trace headers are stored under their bare name only.
        assert!(
            !env.meta.headers.contains_key("http.header.traceparent"),
            "traceparent should not be duplicated under http.header.*"
        );

        running.shutdown().await;
    }

    #[tokio::test]
    async fn rejects_wrong_method_and_invalid_json() {
        let running = start_source(8).await;
        let bind = running.bind;

        let client = Client::new();
        let wrong_method = client
            .get(format!("http://{bind}/events"))
            .send()
            .await
            .unwrap();
        assert_eq!(wrong_method.status(), StatusCode::METHOD_NOT_ALLOWED);

        let invalid_json = client
            .post(format!("http://{bind}/events"))
            .body("not json")
            .send()
            .await
            .unwrap();
        assert_eq!(invalid_json.status(), StatusCode::BAD_REQUEST);

        running.shutdown().await;
    }

    #[tokio::test]
    async fn response_waits_for_channel_capacity_before_returning_accepted() {
        // Capacity 1: the first envelope fills the channel, so the second
        // request must block until the test drains it.
        let mut running = start_source(1).await;
        let bind = running.bind;

        let client = Client::new();
        let first = client
            .post(format!("http://{bind}/events"))
            .json(&json!({ "n": 1 }))
            .send()
            .await
            .unwrap();
        assert_eq!(first.status(), StatusCode::ACCEPTED);

        let second_client = client.clone();
        let second = tokio::spawn(async move {
            second_client
                .post(format!("http://{bind}/events"))
                .json(&json!({ "n": 2 }))
                .send()
                .await
                .unwrap()
        });

        tokio::time::sleep(Duration::from_millis(100)).await;
        assert!(
            !second.is_finished(),
            "second request returned before downstream channel had capacity"
        );

        let first_env = running.rx.recv().await.expect("expected first envelope");
        assert_eq!(first_env.payload, json!({ "n": 1 }));

        let second_response = tokio::time::timeout(Duration::from_secs(2), second)
            .await
            .expect("second request stayed blocked after capacity was freed")
            .expect("second request task failed");
        assert_eq!(second_response.status(), StatusCode::ACCEPTED);

        let second_env = running.rx.recv().await.expect("expected second envelope");
        assert_eq!(second_env.payload, json!({ "n": 2 }));

        running.shutdown().await;
    }

    #[tokio::test]
    async fn blocked_request_returns_unavailable_when_cancelled() {
        // The receiver inside `running` is never read but stays alive until the
        // end of the test, so the second send blocks rather than failing fast.
        let running = start_source(1).await;
        let bind = running.bind;

        let client = Client::new();
        let first = client
            .post(format!("http://{bind}/events"))
            .json(&json!({ "n": 1 }))
            .send()
            .await
            .unwrap();
        assert_eq!(first.status(), StatusCode::ACCEPTED);

        let second_client = client.clone();
        let second = tokio::spawn(async move {
            second_client
                .post(format!("http://{bind}/events"))
                .json(&json!({ "n": 2 }))
                .send()
                .await
                .unwrap()
        });

        tokio::time::sleep(Duration::from_millis(100)).await;
        assert!(
            !second.is_finished(),
            "second request returned before cancellation"
        );

        running.cancel.cancel();
        let second_response = tokio::time::timeout(Duration::from_secs(2), second)
            .await
            .expect("second request stayed blocked after cancellation")
            .expect("second request task failed");
        assert_eq!(second_response.status(), StatusCode::SERVICE_UNAVAILABLE);

        let _ = tokio::time::timeout(Duration::from_secs(1), running.handle).await;
    }

    #[test]
    fn factory_rejects_invalid_path() {
        let err = match http_webhook_source_factory(
            "webhook",
            json!({
                "bind": "127.0.0.1:8080",
                "path": "events",
            }),
            None,
        ) {
            Ok(_) => panic!("expected invalid path error"),
            Err(err) => err,
        };

        assert!(err.to_string().contains("path must start with '/'"));
    }

    #[test]
    fn factory_builds_with_required_fields() {
        let source = http_webhook_source_factory(
            "webhook",
            json!({
                "bind": "127.0.0.1:8080",
                "path": "/events",
            }),
            None,
        )
        .unwrap();

        assert_eq!(source.id(), "webhook");
    }

    #[test]
    fn factory_rejects_retry_policy() {
        use crate::retry::{ExhaustedPolicy, RetryPolicy};

        let err = http_webhook_source_factory(
            "webhook",
            json!({ "bind": "127.0.0.1:8080", "path": "/events" }),
            Some(RetryPolicy {
                max_attempts: 3,
                initial_delay_ms: 100,
                backoff_multiplier: 2.0,
                max_delay_ms: 1000,
                on_exhausted: ExhaustedPolicy::Propagate,
            }),
        )
        .err()
        .expect("expected retry rejection");
        let msg = format!("{err:#}");
        assert!(
            msg.contains("retry has no effect on push-based sources"),
            "{msg}"
        );
    }

    /// Asks the OS for a free loopback port and returns its address.
    ///
    /// The probing listener is dropped on return, so the port is only
    /// probably free by the time the source binds it.
    fn unused_local_addr() -> SocketAddr {
        let listener = StdTcpListener::bind("127.0.0.1:0").unwrap();
        listener.local_addr().unwrap()
    }
}