// axon/webhook_delivery.rs

//! Webhook Delivery — async HTTP delivery with retry and exponential backoff.
//!
//! When an event is dispatched to matching webhooks, this module performs the
//! actual HTTP POST to each webhook URL:
//!   - Per-request timeout (configurable, default 10s)
//!   - Exponential backoff with deterministic jitter on failure (configurable retries)
//!   - HMAC signature header (`X-Axon-Signature`) when the webhook has a secret
//!   - Delivery results recorded back into `WebhookRegistry`
//!
//! Usage:
//!   - `deliver_one()` — single delivery attempt to one webhook
//!   - `deliver_with_retry()` — delivery with exponential backoff retries
//!   - `dispatch_all()` — spawn tokio tasks for all matched webhooks

use std::sync::{Arc, Mutex, PoisonError};
use std::time::Duration;

use serde::Serialize;

use crate::webhooks::WebhookRegistry;

22// ── Configuration ───────────────────────────────────────────────────────
23
24/// Configuration for webhook delivery behavior.
25#[derive(Debug, Clone, Serialize)]
26pub struct DeliveryConfig {
27    /// Timeout per HTTP request.
28    pub timeout: Duration,
29    /// Maximum retry attempts (0 = no retries, 1 = one retry, etc.).
30    pub max_retries: u32,
31    /// Base delay for exponential backoff (doubles each retry).
32    pub base_delay: Duration,
33    /// Maximum backoff delay cap.
34    pub max_delay: Duration,
35}
36
37impl Default for DeliveryConfig {
38    fn default() -> Self {
39        DeliveryConfig {
40            timeout: Duration::from_secs(10),
41            max_retries: 3,
42            base_delay: Duration::from_millis(500),
43            max_delay: Duration::from_secs(30),
44        }
45    }
46}
47
48// ── Delivery result ─────────────────────────────────────────────────────
49
50/// Result of a single delivery attempt.
51#[derive(Debug, Clone, Serialize)]
52pub struct DeliveryResult {
53    pub webhook_id: String,
54    pub topic: String,
55    pub status_code: u16,
56    pub success: bool,
57    pub latency_ms: u64,
58    pub attempts: u32,
59    pub error: Option<String>,
60}
61
62// ── Delivery payload ────────────────────────────────────────────────────
63
64/// Payload sent to webhook URL.
65#[derive(Debug, Clone, Serialize)]
66pub struct WebhookPayload {
67    pub event: String,
68    pub payload: serde_json::Value,
69    pub source: String,
70    pub timestamp: u64,
71}
72
73// ── Single delivery ─────────────────────────────────────────────────────
74
75/// Deliver a single POST request to a webhook URL.
76///
77/// Returns (status_code, latency_ms, error).
78/// status_code is 0 if the connection failed entirely.
79pub async fn deliver_one(
80    url: &str,
81    body: &WebhookPayload,
82    signature: Option<&str>,
83    timeout: Duration,
84) -> (u16, u64, Option<String>) {
85    let start = std::time::Instant::now();
86
87    let client = match reqwest::Client::builder()
88        .timeout(timeout)
89        .build()
90    {
91        Ok(c) => c,
92        Err(e) => return (0, 0, Some(format!("client build error: {e}"))),
93    };
94
95    let mut request = client.post(url)
96        .header("Content-Type", "application/json")
97        .header("User-Agent", "AxonServer-Webhook/1.0");
98
99    if let Some(sig) = signature {
100        request = request.header("X-Axon-Signature", sig);
101    }
102
103    let body_bytes = match serde_json::to_vec(body) {
104        Ok(b) => b,
105        Err(e) => return (0, 0, Some(format!("serialize error: {e}"))),
106    };
107
108    request = request.body(body_bytes);
109
110    match request.send().await {
111        Ok(response) => {
112            let latency = start.elapsed().as_millis() as u64;
113            let status = response.status().as_u16();
114            if (200..300).contains(&status) {
115                (status, latency, None)
116            } else {
117                let error_text = response.text().await.unwrap_or_default();
118                let msg = if error_text.len() > 200 {
119                    format!("HTTP {status}: {}...", &error_text[..200])
120                } else {
121                    format!("HTTP {status}: {error_text}")
122                };
123                (status, latency, Some(msg))
124            }
125        }
126        Err(e) => {
127            let latency = start.elapsed().as_millis() as u64;
128            if e.is_timeout() {
129                (0, latency, Some("timeout".to_string()))
130            } else if e.is_connect() {
131                (0, latency, Some(format!("connection error: {e}")))
132            } else {
133                (0, latency, Some(format!("request error: {e}")))
134            }
135        }
136    }
137}
138
139// ── Delivery with retry ─────────────────────────────────────────────────
140
141/// Deliver to a webhook with exponential backoff retries.
142///
143/// Retries on: connection errors, timeouts, 5xx status codes.
144/// Does NOT retry on: 4xx status codes (client error, won't improve).
145pub async fn deliver_with_retry(
146    url: &str,
147    body: &WebhookPayload,
148    signature: Option<&str>,
149    config: &DeliveryConfig,
150) -> DeliveryResult {
151    let webhook_id = String::new(); // filled by caller
152    let topic = body.event.clone();
153    let mut last_status = 0u16;
154    let mut _last_latency = 0u64;
155    let mut last_error = None;
156    let total_start = std::time::Instant::now();
157
158    for attempt in 0..=config.max_retries {
159        let (status, latency, error) = deliver_one(url, body, signature, config.timeout).await;
160
161        last_status = status;
162        _last_latency = latency;
163        last_error = error.clone();
164
165        // Success: 2xx
166        if (200..300).contains(&status) {
167            return DeliveryResult {
168                webhook_id,
169                topic,
170                status_code: status,
171                success: true,
172                latency_ms: total_start.elapsed().as_millis() as u64,
173                attempts: attempt + 1,
174                error: None,
175            };
176        }
177
178        // Don't retry 4xx (client errors won't improve)
179        if (400..500).contains(&status) {
180            return DeliveryResult {
181                webhook_id,
182                topic,
183                status_code: status,
184                success: false,
185                latency_ms: total_start.elapsed().as_millis() as u64,
186                attempts: attempt + 1,
187                error,
188            };
189        }
190
191        // Retry: 5xx, timeout, connection error
192        if attempt < config.max_retries {
193            let delay = compute_backoff(attempt, config.base_delay, config.max_delay);
194            tokio::time::sleep(delay).await;
195        }
196    }
197
198    // All retries exhausted
199    DeliveryResult {
200        webhook_id,
201        topic,
202        status_code: last_status,
203        success: false,
204        latency_ms: total_start.elapsed().as_millis() as u64,
205        attempts: config.max_retries + 1,
206        error: last_error,
207    }
208}
209
/// Compute the backoff delay (with deterministic jitter) to wait after failed
/// attempt number `attempt` (0-based).
///
/// The delay is `base * 2^min(attempt, 10)`, capped at `max`, then scaled by a
/// jitter factor that cycles with `attempt % 4` through 100%, 75%, 125% and
/// 110%. Because jitter is applied after the cap, the result can exceed `max`
/// by up to 25%. Sub-millisecond parts of `base` and `max` are ignored.
///
/// All arithmetic is done on `u128` milliseconds with saturation, and the
/// result is clamped to `u64::MAX` milliseconds, so extreme durations cannot
/// overflow (which would panic in debug builds or wrap in release builds).
fn compute_backoff(attempt: u32, base: Duration, max: Duration) -> Duration {
    // The shift is clamped, so the multiplier never exceeds 2^10 = 1024.
    let exp_ms = base.as_millis().saturating_mul(1u128 << attempt.min(10));
    let capped_ms = exp_ms.min(max.as_millis());
    // Fixed (not random) jitter keeps delays reproducible: -25% .. +25%.
    let jitter_percent: u128 = match attempt % 4 {
        0 => 100,
        1 => 75,
        2 => 125,
        _ => 110,
    };
    let final_ms = capped_ms.saturating_mul(jitter_percent) / 100;
    Duration::from_millis(u64::try_from(final_ms).unwrap_or(u64::MAX))
}

225// ── Batch dispatch ──────────────────────────────────────────────────────
226
227/// Spawn async delivery tasks for all matching webhooks.
228///
229/// This function reads the webhook configs from the registry, then spawns
230/// independent tokio tasks for each delivery. Results are recorded back
231/// into the registry when each task completes.
232///
233/// Returns the number of delivery tasks spawned.
234pub fn dispatch_all(
235    registry: Arc<Mutex<WebhookRegistry>>,
236    matched_ids: Vec<String>,
237    topic: String,
238    payload: serde_json::Value,
239    source: String,
240    config: DeliveryConfig,
241) -> usize {
242    // Collect webhook info we need outside the lock
243    let mut targets: Vec<(String, String, Option<String>)> = Vec::new(); // (id, url, secret)
244
245    {
246        let reg = registry.lock().unwrap();
247        for id in &matched_ids {
248            if let Some(wh) = reg.get(id) {
249                targets.push((wh.id.clone(), wh.url.clone(), wh.secret.clone()));
250            }
251        }
252    }
253
254    let count = targets.len();
255    let timestamp = std::time::SystemTime::now()
256        .duration_since(std::time::UNIX_EPOCH)
257        .unwrap_or_default()
258        .as_secs();
259
260    for (webhook_id, url, secret) in targets {
261        let registry = registry.clone();
262        let topic = topic.clone();
263        let payload = payload.clone();
264        let source = source.clone();
265        let config = config.clone();
266
267        tokio::spawn(async move {
268            let body = WebhookPayload {
269                event: topic.clone(),
270                payload,
271                source,
272                timestamp,
273            };
274
275            let signature = secret.as_ref().map(|s| {
276                let body_bytes = serde_json::to_vec(&body).unwrap_or_default();
277                WebhookRegistry::compute_signature(s, &body_bytes)
278            });
279
280            let mut result = deliver_with_retry(
281                &url,
282                &body,
283                signature.as_deref(),
284                &config,
285            ).await;
286
287            result.webhook_id = webhook_id.clone();
288
289            // Record result back in registry
290            if let Ok(mut reg) = registry.lock() {
291                reg.record_completed(
292                    &webhook_id,
293                    &topic,
294                    result.status_code,
295                    result.latency_ms,
296                    result.error.clone(),
297                    result.attempts - 1,
298                );
299            }
300        });
301    }
302
303    count
304}
305
// ── Tests ────────────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
    use super::*;
    use std::net::TcpListener;

    /// Minimal payload used by the network tests.
    fn test_body() -> WebhookPayload {
        WebhookPayload {
            event: "test".to_string(),
            payload: serde_json::json!(null),
            source: "test".to_string(),
            timestamp: 0,
        }
    }

    /// URL on 127.0.0.1 whose port should refuse connections. The OS assigns
    /// a free ephemeral port, which is released again before returning. This
    /// is far less likely to hit a live service than a hard-coded port number.
    fn refused_url() -> String {
        let listener = TcpListener::bind("127.0.0.1:0").expect("bind ephemeral port");
        let port = listener.local_addr().expect("read local addr").port();
        drop(listener);
        format!("http://127.0.0.1:{port}/nonexistent")
    }

    #[test]
    fn delivery_config_defaults() {
        let config = DeliveryConfig::default();
        assert_eq!(config.timeout, Duration::from_secs(10));
        assert_eq!(config.max_retries, 3);
        assert_eq!(config.base_delay, Duration::from_millis(500));
        assert_eq!(config.max_delay, Duration::from_secs(30));
    }

    #[test]
    fn delivery_config_serializable() {
        let config = DeliveryConfig::default();
        let json = serde_json::to_value(&config).unwrap();
        assert!(json["timeout"].is_object()); // Duration serializes as {secs, nanos}
        assert_eq!(json["max_retries"], 3);
    }

    #[test]
    fn delivery_result_serializable() {
        let result = DeliveryResult {
            webhook_id: "wh_1".to_string(),
            topic: "deploy".to_string(),
            status_code: 200,
            success: true,
            latency_ms: 45,
            attempts: 1,
            error: None,
        };
        let json = serde_json::to_value(&result).unwrap();
        assert_eq!(json["webhook_id"], "wh_1");
        assert_eq!(json["status_code"], 200);
        assert_eq!(json["success"], true);
        assert_eq!(json["attempts"], 1);
        assert!(json["error"].is_null());
    }

    #[test]
    fn delivery_result_with_error() {
        let result = DeliveryResult {
            webhook_id: "wh_2".to_string(),
            topic: "config.updated".to_string(),
            status_code: 500,
            success: false,
            latency_ms: 120,
            attempts: 4,
            error: Some("HTTP 500: Internal Server Error".to_string()),
        };
        let json = serde_json::to_value(&result).unwrap();
        assert_eq!(json["success"], false);
        assert_eq!(json["attempts"], 4);
        assert_eq!(json["error"], "HTTP 500: Internal Server Error");
    }

    #[test]
    fn webhook_payload_serializable() {
        let payload = WebhookPayload {
            event: "deploy".to_string(),
            payload: serde_json::json!({"flows": ["FlowA"]}),
            source: "server".to_string(),
            timestamp: 1700000000,
        };
        let json = serde_json::to_value(&payload).unwrap();
        assert_eq!(json["event"], "deploy");
        assert_eq!(json["source"], "server");
        assert_eq!(json["timestamp"], 1700000000u64);
        assert!(json["payload"]["flows"].is_array());
    }

    #[test]
    fn compute_backoff_exponential() {
        let base = Duration::from_millis(500);
        let max = Duration::from_secs(30);

        // Attempt 0: 500ms * 1 * jitter(100%) = 500ms
        assert_eq!(compute_backoff(0, base, max).as_millis(), 500);
        // Attempt 1: 500ms * 2 * jitter(75%) = 750ms
        assert_eq!(compute_backoff(1, base, max).as_millis(), 750);
        // Attempt 2: 500ms * 4 * jitter(125%) = 2500ms
        assert_eq!(compute_backoff(2, base, max).as_millis(), 2500);
        // Attempt 3: 500ms * 8 * jitter(110%) = 4400ms
        assert_eq!(compute_backoff(3, base, max).as_millis(), 4400);
    }

    #[test]
    fn compute_backoff_capped() {
        let base = Duration::from_secs(10);
        let max = Duration::from_secs(30);

        // Attempt 5: 10s * 32 = 320s, capped at 30s, then jitter(75%) = 22.5s
        assert_eq!(compute_backoff(5, base, max).as_millis(), 22_500);
        // Attempt 6: capped at 30s, then jitter(125%) = 37.5s. Jitter is applied
        // after the cap, so the delay may exceed `max` by up to 25%.
        assert_eq!(compute_backoff(6, base, max).as_millis(), 37_500);
    }

    #[tokio::test]
    async fn deliver_one_connection_refused() {
        // Deliver to a port that's not listening
        let url = refused_url();
        let body = test_body();

        let (status, _latency, error) =
            deliver_one(&url, &body, None, Duration::from_secs(2)).await;

        assert_eq!(status, 0);
        assert!(error.is_some());
        let err_msg = error.unwrap();
        // Error message varies by platform; just check it's non-empty
        assert!(!err_msg.is_empty(), "expected non-empty error, got: {err_msg}");
    }

    #[tokio::test]
    async fn deliver_with_retry_connection_refused_exhausts_retries() {
        let url = refused_url();
        let body = test_body();

        let config = DeliveryConfig {
            timeout: Duration::from_secs(1),
            max_retries: 1, // Only 1 retry to keep test fast
            base_delay: Duration::from_millis(50),
            max_delay: Duration::from_millis(100),
        };

        let result = deliver_with_retry(&url, &body, None, &config).await;

        assert!(!result.success);
        assert_eq!(result.attempts, 2); // initial + 1 retry
        assert!(result.error.is_some());
    }
}