1use crate::config::sinks::SinkType;
19use crate::events::sinks::Sink;
20use anyhow::{Result, anyhow};
21use async_trait::async_trait;
22use serde_json::Value;
23use std::collections::HashMap;
24use std::sync::Arc;
25use std::time::Duration;
26
27#[async_trait]
29pub trait HttpSender: Send + Sync + std::fmt::Debug {
30 async fn send(
32 &self,
33 method: &str,
34 url: &str,
35 headers: &HashMap<String, String>,
36 body: Value,
37 ) -> Result<u16>;
38}
39
#[derive(Debug, Clone)]
/// Settings that control where and how a webhook is delivered.
pub struct WebhookConfig {
    /// Destination URL. Delivery is rejected up front when this is empty.
    pub url: String,

    /// HTTP method name handed to the sender unchanged (for example `"POST"`).
    pub method: String,

    /// Header name/value pairs handed to the sender with every request.
    pub headers: HashMap<String, String>,

    /// Number of retries allowed after the first attempt, so a delivery makes
    /// at most `max_retries + 1` requests.
    pub max_retries: u32,

    /// Delays to wait before each retry: entry `i` precedes retry `i + 1`, and
    /// the last entry is reused once the retries outnumber the entries.
    pub backoff: Vec<Duration>,

    /// Per-request timeout.
    ///
    /// NOTE(review): nothing in this file reads this field; presumably the
    /// `HttpSender` implementation is expected to enforce it - confirm.
    pub timeout: Duration,
}
61
62impl Default for WebhookConfig {
63 fn default() -> Self {
64 Self {
65 url: String::new(),
66 method: "POST".to_string(),
67 headers: HashMap::new(),
68 max_retries: 3,
69 backoff: vec![
70 Duration::from_millis(100),
71 Duration::from_millis(500),
72 Duration::from_secs(2),
73 ],
74 timeout: Duration::from_secs(10),
75 }
76 }
77}
78
79#[derive(Debug)]
84pub struct WebhookSink {
85 config: WebhookConfig,
87
88 sender: Arc<dyn HttpSender>,
90}
91
92impl WebhookSink {
93 pub fn new(sender: Arc<dyn HttpSender>, config: WebhookConfig) -> Self {
95 Self { config, sender }
96 }
97
98 async fn send_with_retry(&self, payload: Value) -> Result<()> {
100 let mut last_error = String::new();
101
102 for attempt in 0..=self.config.max_retries {
103 if attempt > 0 {
104 let backoff_idx = (attempt as usize - 1).min(self.config.backoff.len() - 1);
105 let delay = self.config.backoff[backoff_idx];
106 tracing::debug!(
107 attempt = attempt,
108 delay_ms = delay.as_millis(),
109 "webhook: retrying after backoff"
110 );
111 tokio::time::sleep(delay).await;
112 }
113
114 match self
115 .sender
116 .send(
117 &self.config.method,
118 &self.config.url,
119 &self.config.headers,
120 payload.clone(),
121 )
122 .await
123 {
124 Ok(status) if (200..300).contains(&status) => {
125 tracing::debug!(
126 url = %self.config.url,
127 status = status,
128 "webhook: delivered successfully"
129 );
130 return Ok(());
131 }
132 Ok(status) if (400..500).contains(&status) => {
133 return Err(anyhow!(
135 "webhook: client error {} from {}",
136 status,
137 self.config.url
138 ));
139 }
140 Ok(status) => {
141 last_error = format!("server error {} from {}", status, self.config.url);
143 tracing::warn!(
144 url = %self.config.url,
145 status = status,
146 attempt = attempt + 1,
147 "webhook: server error, will retry"
148 );
149 }
150 Err(e) => {
151 last_error = format!("network error: {}", e);
153 tracing::warn!(
154 url = %self.config.url,
155 error = %e,
156 attempt = attempt + 1,
157 "webhook: network error, will retry"
158 );
159 }
160 }
161 }
162
163 Err(anyhow!(
164 "webhook: failed after {} retries: {}",
165 self.config.max_retries,
166 last_error
167 ))
168 }
169}
170
171#[async_trait]
172impl Sink for WebhookSink {
173 async fn deliver(
174 &self,
175 payload: Value,
176 _recipient_id: Option<&str>,
177 _context_vars: &HashMap<String, Value>,
178 ) -> Result<()> {
179 if self.config.url.is_empty() {
180 return Err(anyhow!("webhook: URL not configured"));
181 }
182
183 self.send_with_retry(payload).await
184 }
185
186 fn name(&self) -> &str {
187 "webhook"
188 }
189
190 fn sink_type(&self) -> SinkType {
191 SinkType::Webhook
192 }
193}
194
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use tokio::sync::Mutex;

    #[derive(Debug, Clone)]
    /// Copy of the arguments of one call made to the mock sender.
    struct RecordedRequest {
        method: String,
        url: String,
        headers: HashMap<String, String>,
        body: Value,
    }

    #[derive(Debug)]
    /// Scripted `HttpSender` that records every request it receives.
    struct MockHttpSender {
        /// Results to hand out, indexed by call number.
        responses: Mutex<Vec<Result<u16>>>,
        /// Every request seen so far, in call order.
        requests: Mutex<Vec<RecordedRequest>>,
        /// Number of times `send` has been called.
        call_count: AtomicUsize,
    }

    impl MockHttpSender {
        /// Mock that answers call `i` with `responses[i]`, then 200 once the
        /// script runs out.
        fn with_responses(responses: Vec<Result<u16>>) -> Self {
            Self {
                responses: Mutex::new(responses),
                requests: Mutex::new(Vec::new()),
                call_count: AtomicUsize::new(0),
            }
        }

        /// Mock with an empty script, so every call is answered with 200.
        fn always_ok() -> Self {
            Self::with_responses(Vec::new())
        }
    }

    #[async_trait]
    impl HttpSender for MockHttpSender {
        async fn send(
            &self,
            method: &str,
            url: &str,
            headers: &HashMap<String, String>,
            body: Value,
        ) -> Result<u16> {
            let call_idx = self.call_count.fetch_add(1, Ordering::SeqCst);

            let recorded = RecordedRequest {
                method: method.to_owned(),
                url: url.to_owned(),
                headers: headers.clone(),
                body,
            };
            self.requests.lock().await.push(recorded);

            let mut scripted = self.responses.lock().await;
            match scripted.get_mut(call_idx) {
                // Results are not `Clone`, so the scripted one is moved out
                // and a placeholder is left in its slot.
                Some(slot) => std::mem::replace(slot, Ok(0)),
                None => Ok(200),
            }
        }
    }

    /// Config aimed at `url` with 1 ms backoffs so retry tests stay fast.
    fn fast_config(url: &str) -> WebhookConfig {
        WebhookConfig {
            url: String::from(url),
            method: String::from("POST"),
            headers: HashMap::new(),
            max_retries: 3,
            backoff: vec![Duration::from_millis(1); 3],
            timeout: Duration::from_secs(5),
        }
    }

    /// Delivers `payload` through `sink` with no recipient and no context.
    async fn deliver_to(sink: &WebhookSink, payload: Value) -> Result<()> {
        let no_context = HashMap::new();
        sink.deliver(payload, None, &no_context).await
    }

    #[tokio::test]
    async fn test_webhook_success() {
        let sender = Arc::new(MockHttpSender::always_ok());
        let sink = WebhookSink::new(sender.clone(), fast_config("https://example.com/hook"));

        let payload = json!({"event": "user.created", "user_id": "123"});
        deliver_to(&sink, payload.clone()).await.unwrap();

        let seen = sender.requests.lock().await;
        assert_eq!(seen.len(), 1);
        let first = &seen[0];
        assert_eq!(first.method, "POST");
        assert_eq!(first.url, "https://example.com/hook");
        assert_eq!(first.body, payload);
    }

    #[tokio::test]
    async fn test_webhook_custom_headers() {
        let sender = Arc::new(MockHttpSender::always_ok());

        let mut config = fast_config("https://example.com/hook");
        config.method = "PUT".to_string();
        config
            .headers
            .insert("Authorization".to_string(), "Bearer token123".to_string());

        let sink = WebhookSink::new(sender.clone(), config);
        deliver_to(&sink, json!({})).await.unwrap();

        let seen = sender.requests.lock().await;
        let first = &seen[0];
        assert_eq!(first.method, "PUT");
        assert_eq!(
            first.headers.get("Authorization").unwrap(),
            "Bearer token123"
        );
    }

    #[tokio::test]
    async fn test_webhook_retry_on_server_error() {
        let script = vec![Ok(500), Ok(200)];
        let sender = Arc::new(MockHttpSender::with_responses(script));

        let sink = WebhookSink::new(sender.clone(), fast_config("https://example.com"));
        deliver_to(&sink, json!({})).await.unwrap();

        assert_eq!(sender.call_count.load(Ordering::SeqCst), 2);
    }

    #[tokio::test]
    async fn test_webhook_no_retry_on_client_error() {
        let script = vec![Ok(400)];
        let sender = Arc::new(MockHttpSender::with_responses(script));

        let sink = WebhookSink::new(sender.clone(), fast_config("https://example.com"));
        let result = deliver_to(&sink, json!({})).await;

        assert!(result.is_err());
        let message = result.unwrap_err().to_string();
        assert!(message.contains("client error 400"));
        assert_eq!(sender.call_count.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn test_webhook_retry_on_network_error() {
        let script = vec![Err(anyhow!("connection refused")), Ok(200)];
        let sender = Arc::new(MockHttpSender::with_responses(script));

        let sink = WebhookSink::new(sender.clone(), fast_config("https://example.com"));
        deliver_to(&sink, json!({})).await.unwrap();

        assert_eq!(sender.call_count.load(Ordering::SeqCst), 2);
    }

    #[tokio::test]
    async fn test_webhook_max_retries_exceeded() {
        let script = vec![Ok(503), Ok(503), Ok(503), Ok(503)];
        let sender = Arc::new(MockHttpSender::with_responses(script));

        let sink = WebhookSink::new(sender.clone(), fast_config("https://example.com"));
        let result = deliver_to(&sink, json!({})).await;

        assert!(result.is_err());
        let message = result.unwrap_err().to_string();
        assert!(message.contains("after 3 retries"));
        // One initial attempt plus three retries.
        assert_eq!(sender.call_count.load(Ordering::SeqCst), 4);
    }

    #[tokio::test]
    async fn test_webhook_empty_url_error() {
        let sender = Arc::new(MockHttpSender::always_ok());
        let sink = WebhookSink::new(sender, fast_config(""));

        let result = deliver_to(&sink, json!({})).await;
        assert!(result.is_err());

        let message = result.unwrap_err().to_string();
        assert!(message.contains("URL not configured"));
    }

    #[test]
    fn test_webhook_sink_name_and_type() {
        let sender = Arc::new(MockHttpSender::always_ok());
        let sink = WebhookSink::new(sender, fast_config("https://example.com"));

        assert_eq!(sink.name(), "webhook");
        assert_eq!(sink.sink_type(), SinkType::Webhook);
    }
}