Skip to main content

this/events/sinks/
webhook.rs

1//! Webhook sink — delivers events to external HTTP endpoints
2//!
3//! Sends processed event payloads to configured webhook URLs via
4//! HTTP POST (or PUT). Supports custom headers, retry with exponential
5//! backoff, and configurable timeouts.
6//!
7//! ```yaml
8//! sinks:
9//!   - name: analytics-webhook
10//!     type: webhook
11//!     config:
12//!       url: https://analytics.example.com/events
13//!       method: POST
14//!       headers:
15//!         Authorization: "Bearer {{ env.ANALYTICS_TOKEN }}"
16//! ```
17
18use crate::config::sinks::SinkType;
19use crate::events::sinks::Sink;
20use anyhow::{Result, anyhow};
21use async_trait::async_trait;
22use serde_json::Value;
23use std::collections::HashMap;
24use std::sync::Arc;
25use std::time::Duration;
26
27/// Trait for sending HTTP requests (abstracts reqwest for testability)
28#[async_trait]
29pub trait HttpSender: Send + Sync + std::fmt::Debug {
30    /// Send an HTTP request and return the status code
31    async fn send(
32        &self,
33        method: &str,
34        url: &str,
35        headers: &HashMap<String, String>,
36        body: Value,
37    ) -> Result<u16>;
38}
39
40/// Webhook sink configuration
41#[derive(Debug, Clone)]
42pub struct WebhookConfig {
43    /// Target URL
44    pub url: String,
45
46    /// HTTP method (POST or PUT)
47    pub method: String,
48
49    /// Custom headers to include
50    pub headers: HashMap<String, String>,
51
52    /// Maximum retry attempts
53    pub max_retries: u32,
54
55    /// Backoff durations for retries
56    pub backoff: Vec<Duration>,
57
58    /// Request timeout
59    pub timeout: Duration,
60}
61
62impl Default for WebhookConfig {
63    fn default() -> Self {
64        Self {
65            url: String::new(),
66            method: "POST".to_string(),
67            headers: HashMap::new(),
68            max_retries: 3,
69            backoff: vec![
70                Duration::from_millis(100),
71                Duration::from_millis(500),
72                Duration::from_secs(2),
73            ],
74            timeout: Duration::from_secs(10),
75        }
76    }
77}
78
79/// Webhook notification sink
80///
81/// Sends event payloads to an HTTP endpoint. Supports retry with
82/// exponential backoff on server errors and network failures.
83#[derive(Debug)]
84pub struct WebhookSink {
85    /// Webhook configuration
86    config: WebhookConfig,
87
88    /// HTTP sender (abstract for testing)
89    sender: Arc<dyn HttpSender>,
90}
91
92impl WebhookSink {
93    /// Create a new WebhookSink with a sender and config
94    pub fn new(sender: Arc<dyn HttpSender>, config: WebhookConfig) -> Self {
95        Self { config, sender }
96    }
97
98    /// Send with retry logic
99    async fn send_with_retry(&self, payload: Value) -> Result<()> {
100        let mut last_error = String::new();
101
102        for attempt in 0..=self.config.max_retries {
103            if attempt > 0 {
104                let backoff_idx = (attempt as usize - 1).min(self.config.backoff.len() - 1);
105                let delay = self.config.backoff[backoff_idx];
106                tracing::debug!(
107                    attempt = attempt,
108                    delay_ms = delay.as_millis(),
109                    "webhook: retrying after backoff"
110                );
111                tokio::time::sleep(delay).await;
112            }
113
114            match self
115                .sender
116                .send(
117                    &self.config.method,
118                    &self.config.url,
119                    &self.config.headers,
120                    payload.clone(),
121                )
122                .await
123            {
124                Ok(status) if (200..300).contains(&status) => {
125                    tracing::debug!(
126                        url = %self.config.url,
127                        status = status,
128                        "webhook: delivered successfully"
129                    );
130                    return Ok(());
131                }
132                Ok(status) if (400..500).contains(&status) => {
133                    // Client error — don't retry
134                    return Err(anyhow!(
135                        "webhook: client error {} from {}",
136                        status,
137                        self.config.url
138                    ));
139                }
140                Ok(status) => {
141                    // Server error — retry
142                    last_error = format!("server error {} from {}", status, self.config.url);
143                    tracing::warn!(
144                        url = %self.config.url,
145                        status = status,
146                        attempt = attempt + 1,
147                        "webhook: server error, will retry"
148                    );
149                }
150                Err(e) => {
151                    // Network error — retry
152                    last_error = format!("network error: {}", e);
153                    tracing::warn!(
154                        url = %self.config.url,
155                        error = %e,
156                        attempt = attempt + 1,
157                        "webhook: network error, will retry"
158                    );
159                }
160            }
161        }
162
163        Err(anyhow!(
164            "webhook: failed after {} retries: {}",
165            self.config.max_retries,
166            last_error
167        ))
168    }
169}
170
171#[async_trait]
172impl Sink for WebhookSink {
173    async fn deliver(
174        &self,
175        payload: Value,
176        _recipient_id: Option<&str>,
177        _context_vars: &HashMap<String, Value>,
178    ) -> Result<()> {
179        if self.config.url.is_empty() {
180            return Err(anyhow!("webhook: URL not configured"));
181        }
182
183        self.send_with_retry(payload).await
184    }
185
186    fn name(&self) -> &str {
187        "webhook"
188    }
189
190    fn sink_type(&self) -> SinkType {
191        SinkType::Webhook
192    }
193}
194
195#[cfg(test)]
196mod tests {
197    use super::*;
198    use serde_json::json;
199    use std::sync::atomic::{AtomicUsize, Ordering};
200    use tokio::sync::Mutex;
201
202    /// Recorded HTTP request
203    #[derive(Debug, Clone)]
204    struct RecordedRequest {
205        method: String,
206        url: String,
207        headers: HashMap<String, String>,
208        body: Value,
209    }
210
211    /// Mock HTTP sender
212    #[derive(Debug)]
213    struct MockHttpSender {
214        /// Status codes to return on successive calls
215        responses: Mutex<Vec<Result<u16>>>,
216        /// Recorded requests
217        requests: Mutex<Vec<RecordedRequest>>,
218        /// Call count
219        call_count: AtomicUsize,
220    }
221
222    impl MockHttpSender {
223        fn with_responses(responses: Vec<Result<u16>>) -> Self {
224            Self {
225                responses: Mutex::new(responses),
226                requests: Mutex::new(Vec::new()),
227                call_count: AtomicUsize::new(0),
228            }
229        }
230
231        fn always_ok() -> Self {
232            Self::with_responses(vec![])
233        }
234    }
235
236    #[async_trait]
237    impl HttpSender for MockHttpSender {
238        async fn send(
239            &self,
240            method: &str,
241            url: &str,
242            headers: &HashMap<String, String>,
243            body: Value,
244        ) -> Result<u16> {
245            let idx = self.call_count.fetch_add(1, Ordering::SeqCst);
246            self.requests.lock().await.push(RecordedRequest {
247                method: method.to_string(),
248                url: url.to_string(),
249                headers: headers.clone(),
250                body,
251            });
252
253            let mut responses = self.responses.lock().await;
254            if idx < responses.len() {
255                // Use a placeholder to avoid shifting
256
257                std::mem::replace(&mut responses[idx], Ok(0))
258            } else {
259                Ok(200) // Default: success
260            }
261        }
262    }
263
264    fn fast_config(url: &str) -> WebhookConfig {
265        WebhookConfig {
266            url: url.to_string(),
267            method: "POST".to_string(),
268            headers: HashMap::new(),
269            max_retries: 3,
270            backoff: vec![
271                Duration::from_millis(1),
272                Duration::from_millis(1),
273                Duration::from_millis(1),
274            ],
275            timeout: Duration::from_secs(5),
276        }
277    }
278
279    #[tokio::test]
280    async fn test_webhook_success() {
281        let sender = Arc::new(MockHttpSender::always_ok());
282        let sink = WebhookSink::new(sender.clone(), fast_config("https://example.com/hook"));
283
284        let payload = json!({"event": "user.created", "user_id": "123"});
285        sink.deliver(payload.clone(), None, &HashMap::new())
286            .await
287            .unwrap();
288
289        let requests = sender.requests.lock().await;
290        assert_eq!(requests.len(), 1);
291        assert_eq!(requests[0].method, "POST");
292        assert_eq!(requests[0].url, "https://example.com/hook");
293        assert_eq!(requests[0].body, payload);
294    }
295
296    #[tokio::test]
297    async fn test_webhook_custom_headers() {
298        let sender = Arc::new(MockHttpSender::always_ok());
299        let mut config = fast_config("https://example.com/hook");
300        config
301            .headers
302            .insert("Authorization".to_string(), "Bearer token123".to_string());
303        config.method = "PUT".to_string();
304
305        let sink = WebhookSink::new(sender.clone(), config);
306        sink.deliver(json!({}), None, &HashMap::new())
307            .await
308            .unwrap();
309
310        let requests = sender.requests.lock().await;
311        assert_eq!(requests[0].method, "PUT");
312        assert_eq!(
313            requests[0].headers.get("Authorization").unwrap(),
314            "Bearer token123"
315        );
316    }
317
318    #[tokio::test]
319    async fn test_webhook_retry_on_server_error() {
320        let sender = Arc::new(MockHttpSender::with_responses(vec![
321            Ok(500), // First: server error
322            Ok(200), // Second: success
323        ]));
324
325        let sink = WebhookSink::new(sender.clone(), fast_config("https://example.com"));
326        sink.deliver(json!({}), None, &HashMap::new())
327            .await
328            .unwrap();
329
330        assert_eq!(sender.call_count.load(Ordering::SeqCst), 2);
331    }
332
333    #[tokio::test]
334    async fn test_webhook_no_retry_on_client_error() {
335        let sender = Arc::new(MockHttpSender::with_responses(vec![
336            Ok(400), // Client error — don't retry
337        ]));
338
339        let sink = WebhookSink::new(sender.clone(), fast_config("https://example.com"));
340        let result = sink.deliver(json!({}), None, &HashMap::new()).await;
341
342        assert!(result.is_err());
343        assert!(result.unwrap_err().to_string().contains("client error 400"));
344        assert_eq!(sender.call_count.load(Ordering::SeqCst), 1);
345    }
346
347    #[tokio::test]
348    async fn test_webhook_retry_on_network_error() {
349        let sender = Arc::new(MockHttpSender::with_responses(vec![
350            Err(anyhow!("connection refused")),
351            Ok(200),
352        ]));
353
354        let sink = WebhookSink::new(sender.clone(), fast_config("https://example.com"));
355        sink.deliver(json!({}), None, &HashMap::new())
356            .await
357            .unwrap();
358
359        assert_eq!(sender.call_count.load(Ordering::SeqCst), 2);
360    }
361
362    #[tokio::test]
363    async fn test_webhook_max_retries_exceeded() {
364        let sender = Arc::new(MockHttpSender::with_responses(vec![
365            Ok(503),
366            Ok(503),
367            Ok(503),
368            Ok(503),
369        ]));
370
371        let sink = WebhookSink::new(sender.clone(), fast_config("https://example.com"));
372        let result = sink.deliver(json!({}), None, &HashMap::new()).await;
373
374        assert!(result.is_err());
375        assert!(result.unwrap_err().to_string().contains("after 3 retries"));
376        assert_eq!(sender.call_count.load(Ordering::SeqCst), 4); // 1 + 3 retries
377    }
378
379    #[tokio::test]
380    async fn test_webhook_empty_url_error() {
381        let sender = Arc::new(MockHttpSender::always_ok());
382        let sink = WebhookSink::new(sender, fast_config(""));
383
384        let result = sink.deliver(json!({}), None, &HashMap::new()).await;
385        assert!(result.is_err());
386        assert!(
387            result
388                .unwrap_err()
389                .to_string()
390                .contains("URL not configured")
391        );
392    }
393
394    #[test]
395    fn test_webhook_sink_name_and_type() {
396        let sender = Arc::new(MockHttpSender::always_ok());
397        let sink = WebhookSink::new(sender, fast_config("https://example.com"));
398        assert_eq!(sink.name(), "webhook");
399        assert_eq!(sink.sink_type(), SinkType::Webhook);
400    }
401}