Skip to main content

allsource_core/
webhook_worker.rs

1//! Async webhook delivery worker.
2//!
3//! Receives webhook delivery tasks from the event ingestion path via an mpsc channel,
4//! and delivers events to registered webhook URLs with retry and exponential backoff.
5
6use crate::{
7    application::services::webhook::{
8        DeliveryStatus, WebhookDelivery, WebhookRegistry, WebhookSubscription,
9    },
10    domain::entities::Event,
11    store::WebhookDeliveryTask,
12};
13use chrono::Utc;
14use std::sync::Arc;
15use tokio::sync::mpsc;
16use uuid::Uuid;
17
/// Maximum number of delivery attempts per event, counting the initial attempt.
const MAX_ATTEMPTS: u32 = 5;

/// Base delay, in seconds, for the exponential backoff between delivery attempts.
const BASE_BACKOFF_SECS: u64 = 2;
23
24/// Run the webhook delivery worker.
25///
26/// Consumes tasks from the channel and delivers them asynchronously.
27/// Each delivery is spawned as an independent tokio task for parallelism.
28#[cfg_attr(feature = "hotpath", hotpath::measure)]
29pub async fn run_webhook_delivery_worker(
30    mut rx: mpsc::UnboundedReceiver<WebhookDeliveryTask>,
31    registry: Arc<WebhookRegistry>,
32) {
33    tracing::info!("Webhook delivery worker started");
34
35    while let Some(task) = rx.recv().await {
36        let registry = Arc::clone(&registry);
37        tokio::spawn(async move {
38            deliver_with_retry(&task.webhook, &task.event, &registry).await;
39        });
40    }
41
42    tracing::info!("Webhook delivery worker stopped (channel closed)");
43}
44
45/// Deliver an event to a webhook URL with exponential backoff retry.
46async fn deliver_with_retry(
47    webhook: &WebhookSubscription,
48    event: &Event,
49    registry: &WebhookRegistry,
50) {
51    let delivery_id = Uuid::new_v4();
52    let client = reqwest::Client::builder()
53        .timeout(std::time::Duration::from_secs(10))
54        .build()
55        .unwrap_or_default();
56
57    // Build the payload
58    let payload = build_payload(webhook, event);
59
60    // Compute HMAC-SHA256 signature
61    let signature = compute_signature(&webhook.secret, &payload);
62
63    for attempt in 1..=MAX_ATTEMPTS {
64        let result = client
65            .post(&webhook.url)
66            .header("Content-Type", "application/json")
67            .header("X-AllSource-Signature", &signature)
68            .header("X-AllSource-Event-Type", event.event_type_str())
69            .header("X-AllSource-Delivery-Id", delivery_id.to_string())
70            .header("X-AllSource-Webhook-Id", webhook.id.to_string())
71            .body(payload.clone())
72            .send()
73            .await;
74
75        match result {
76            Ok(response) => {
77                let status_code = response.status().as_u16();
78                let body = response.text().await.unwrap_or_else(|_| String::new());
79
80                if (200..300).contains(&status_code) {
81                    // Success
82                    registry.record_delivery(WebhookDelivery {
83                        id: delivery_id,
84                        webhook_id: webhook.id,
85                        event_id: event.id,
86                        status: DeliveryStatus::Success,
87                        attempt,
88                        max_attempts: MAX_ATTEMPTS,
89                        response_status: Some(status_code),
90                        response_body: if body.is_empty() {
91                            None
92                        } else {
93                            Some(truncate_string(&body, 500))
94                        },
95                        error: None,
96                        created_at: Utc::now(),
97                        next_retry_at: None,
98                    });
99
100                    tracing::debug!(
101                        "Webhook delivered: {} -> {} (attempt {})",
102                        event.id,
103                        webhook.url,
104                        attempt
105                    );
106                    return;
107                }
108                // Non-2xx response — retry
109                let delay = backoff_delay(attempt);
110                tracing::warn!(
111                    "Webhook delivery failed: {} -> {} (status {}, attempt {}/{}), retrying in {}s",
112                    event.id,
113                    webhook.url,
114                    status_code,
115                    attempt,
116                    MAX_ATTEMPTS,
117                    delay.as_secs()
118                );
119
120                registry.record_delivery(WebhookDelivery {
121                    id: delivery_id,
122                    webhook_id: webhook.id,
123                    event_id: event.id,
124                    status: if attempt < MAX_ATTEMPTS {
125                        DeliveryStatus::Retrying
126                    } else {
127                        DeliveryStatus::Failed
128                    },
129                    attempt,
130                    max_attempts: MAX_ATTEMPTS,
131                    response_status: Some(status_code),
132                    response_body: Some(truncate_string(&body, 500)),
133                    error: None,
134                    created_at: Utc::now(),
135                    next_retry_at: if attempt < MAX_ATTEMPTS {
136                        Some(Utc::now() + chrono::Duration::seconds(delay.as_secs() as i64))
137                    } else {
138                        None
139                    },
140                });
141
142                if attempt < MAX_ATTEMPTS {
143                    tokio::time::sleep(delay).await;
144                }
145            }
146            Err(e) => {
147                // Connection/timeout error — retry
148                let delay = backoff_delay(attempt);
149                tracing::warn!(
150                    "Webhook delivery error: {} -> {} ({}, attempt {}/{}), retrying in {}s",
151                    event.id,
152                    webhook.url,
153                    e,
154                    attempt,
155                    MAX_ATTEMPTS,
156                    delay.as_secs()
157                );
158
159                registry.record_delivery(WebhookDelivery {
160                    id: delivery_id,
161                    webhook_id: webhook.id,
162                    event_id: event.id,
163                    status: if attempt < MAX_ATTEMPTS {
164                        DeliveryStatus::Retrying
165                    } else {
166                        DeliveryStatus::Failed
167                    },
168                    attempt,
169                    max_attempts: MAX_ATTEMPTS,
170                    response_status: None,
171                    response_body: None,
172                    error: Some(e.to_string()),
173                    created_at: Utc::now(),
174                    next_retry_at: if attempt < MAX_ATTEMPTS {
175                        Some(Utc::now() + chrono::Duration::seconds(delay.as_secs() as i64))
176                    } else {
177                        None
178                    },
179                });
180
181                if attempt < MAX_ATTEMPTS {
182                    tokio::time::sleep(delay).await;
183                }
184            }
185        }
186    }
187
188    tracing::error!(
189        "Webhook delivery permanently failed after {} attempts: {} -> {}",
190        MAX_ATTEMPTS,
191        event.id,
192        webhook.url
193    );
194}
195
196/// Build the JSON payload for webhook delivery
197fn build_payload(webhook: &WebhookSubscription, event: &Event) -> String {
198    let payload = serde_json::json!({
199        "webhook_id": webhook.id,
200        "event": {
201            "id": event.id,
202            "event_type": event.event_type_str(),
203            "entity_id": event.entity_id_str(),
204            "timestamp": event.timestamp,
205            "payload": event.payload,
206            "metadata": event.metadata,
207        },
208        "delivered_at": Utc::now(),
209    });
210    serde_json::to_string(&payload).unwrap_or_default()
211}
212
213/// Compute HMAC-SHA256 signature for webhook payload verification
214fn compute_signature(secret: &str, payload: &str) -> String {
215    use hmac::{Hmac, Mac};
216    use sha2::Sha256;
217
218    type HmacSha256 = Hmac<Sha256>;
219    let mut mac =
220        HmacSha256::new_from_slice(secret.as_bytes()).expect("HMAC can take key of any size");
221    mac.update(payload.as_bytes());
222    let result = mac.finalize();
223    let code_bytes = result.into_bytes();
224
225    // Return as hex string with sha256= prefix
226    format!("sha256={}", hex::encode(code_bytes))
227}
228
229/// Calculate exponential backoff delay: base * 2^(attempt-1)
230fn backoff_delay(attempt: u32) -> std::time::Duration {
231    let secs = BASE_BACKOFF_SECS * 2u64.pow(attempt.saturating_sub(1));
232    // Cap at 5 minutes
233    std::time::Duration::from_secs(secs.min(300))
234}
235
/// Truncate a string to at most `max_len` bytes, appending `...` when shortened.
///
/// Strings that already fit are returned unchanged. When `max_len` falls inside a
/// multi-byte UTF-8 character, the cut moves back to the previous character
/// boundary, so the function never panics on non-ASCII input (slicing a `str` in
/// the middle of a character would).
fn truncate_string(s: &str, max_len: usize) -> String {
    if s.len() <= max_len {
        return s.to_string();
    }

    // Index 0 is always a char boundary, so this loop terminates.
    let mut end = max_len;
    while !s.is_char_boundary(end) {
        end -= 1;
    }

    format!("{}...", &s[..end])
}
244
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// The delay doubles with every attempt, starting at 2 seconds.
    #[test]
    fn test_backoff_delay() {
        let expected = [(1, 2), (2, 4), (3, 8), (4, 16), (5, 32)];
        for (attempt, secs) in expected {
            assert_eq!(backoff_delay(attempt), Duration::from_secs(secs));
        }
    }

    /// A very high attempt number is capped at 300 seconds.
    #[test]
    fn test_backoff_delay_capped() {
        let capped = backoff_delay(20);
        assert_eq!(capped, Duration::from_secs(300));
    }

    /// The signature is `sha256=` followed by 64 hex characters.
    #[test]
    fn test_compute_signature() {
        let signature = compute_signature("test-secret", r#"{"test": true}"#);
        assert!(signature.starts_with("sha256="));
        assert_eq!(signature.len(), 7 + 64);
    }

    /// Short strings pass through; long ones are cut and suffixed with `...`.
    #[test]
    fn test_truncate_string() {
        let untouched = truncate_string("short", 10);
        assert_eq!(untouched, "short");

        let shortened = truncate_string("a longer string", 5);
        assert_eq!(shortened, "a lon...");
    }

    /// The payload round-trips through JSON and exposes the expected fields.
    #[test]
    fn test_build_payload() {
        let now = Utc::now();
        let subscription = WebhookSubscription {
            id: Uuid::new_v4(),
            tenant_id: "tenant-1".to_string(),
            url: "https://example.com/hook".to_string(),
            event_types: vec![],
            entity_ids: vec![],
            secret: "secret".to_string(),
            active: true,
            created_at: now,
            updated_at: now,
            description: None,
        };

        let sample_event = Event::from_strings(
            "user.created".to_string(),
            "user-1".to_string(),
            "default".to_string(),
            serde_json::json!({"name": "Test"}),
            None,
        )
        .unwrap();

        let raw = build_payload(&subscription, &sample_event);
        let document: serde_json::Value = serde_json::from_str(&raw).unwrap();

        assert_eq!(document["webhook_id"], subscription.id.to_string());
        assert_eq!(document["event"]["event_type"], "user.created");
        assert_eq!(document["event"]["entity_id"], "user-1");
    }
}