1use crate::{
7 application::services::webhook::{
8 DeliveryStatus, WebhookDelivery, WebhookRegistry, WebhookSubscription,
9 },
10 domain::entities::Event,
11 store::WebhookDeliveryTask,
12};
13use chrono::Utc;
14use std::sync::Arc;
15use tokio::sync::mpsc;
16use uuid::Uuid;
17
18const MAX_ATTEMPTS: u32 = 5;
20
21const BASE_BACKOFF_SECS: u64 = 2;
23
24#[cfg_attr(feature = "hotpath", hotpath::measure)]
29pub async fn run_webhook_delivery_worker(
30 mut rx: mpsc::UnboundedReceiver<WebhookDeliveryTask>,
31 registry: Arc<WebhookRegistry>,
32) {
33 tracing::info!("Webhook delivery worker started");
34
35 while let Some(task) = rx.recv().await {
36 let registry = Arc::clone(®istry);
37 tokio::spawn(async move {
38 deliver_with_retry(&task.webhook, &task.event, ®istry).await;
39 });
40 }
41
42 tracing::info!("Webhook delivery worker stopped (channel closed)");
43}
44
45async fn deliver_with_retry(
47 webhook: &WebhookSubscription,
48 event: &Event,
49 registry: &WebhookRegistry,
50) {
51 let delivery_id = Uuid::new_v4();
52 let client = reqwest::Client::builder()
53 .timeout(std::time::Duration::from_secs(10))
54 .build()
55 .unwrap_or_default();
56
57 let payload = build_payload(webhook, event);
59
60 let signature = compute_signature(&webhook.secret, &payload);
62
63 for attempt in 1..=MAX_ATTEMPTS {
64 let result = client
65 .post(&webhook.url)
66 .header("Content-Type", "application/json")
67 .header("X-AllSource-Signature", &signature)
68 .header("X-AllSource-Event-Type", event.event_type_str())
69 .header("X-AllSource-Delivery-Id", delivery_id.to_string())
70 .header("X-AllSource-Webhook-Id", webhook.id.to_string())
71 .body(payload.clone())
72 .send()
73 .await;
74
75 match result {
76 Ok(response) => {
77 let status_code = response.status().as_u16();
78 let body = response.text().await.unwrap_or_else(|_| String::new());
79
80 if (200..300).contains(&status_code) {
81 registry.record_delivery(WebhookDelivery {
83 id: delivery_id,
84 webhook_id: webhook.id,
85 event_id: event.id,
86 status: DeliveryStatus::Success,
87 attempt,
88 max_attempts: MAX_ATTEMPTS,
89 response_status: Some(status_code),
90 response_body: if body.is_empty() {
91 None
92 } else {
93 Some(truncate_string(&body, 500))
94 },
95 error: None,
96 created_at: Utc::now(),
97 next_retry_at: None,
98 });
99
100 tracing::debug!(
101 "Webhook delivered: {} -> {} (attempt {})",
102 event.id,
103 webhook.url,
104 attempt
105 );
106 return;
107 }
108 let delay = backoff_delay(attempt);
110 tracing::warn!(
111 "Webhook delivery failed: {} -> {} (status {}, attempt {}/{}), retrying in {}s",
112 event.id,
113 webhook.url,
114 status_code,
115 attempt,
116 MAX_ATTEMPTS,
117 delay.as_secs()
118 );
119
120 registry.record_delivery(WebhookDelivery {
121 id: delivery_id,
122 webhook_id: webhook.id,
123 event_id: event.id,
124 status: if attempt < MAX_ATTEMPTS {
125 DeliveryStatus::Retrying
126 } else {
127 DeliveryStatus::Failed
128 },
129 attempt,
130 max_attempts: MAX_ATTEMPTS,
131 response_status: Some(status_code),
132 response_body: Some(truncate_string(&body, 500)),
133 error: None,
134 created_at: Utc::now(),
135 next_retry_at: if attempt < MAX_ATTEMPTS {
136 Some(Utc::now() + chrono::Duration::seconds(delay.as_secs() as i64))
137 } else {
138 None
139 },
140 });
141
142 if attempt < MAX_ATTEMPTS {
143 tokio::time::sleep(delay).await;
144 }
145 }
146 Err(e) => {
147 let delay = backoff_delay(attempt);
149 tracing::warn!(
150 "Webhook delivery error: {} -> {} ({}, attempt {}/{}), retrying in {}s",
151 event.id,
152 webhook.url,
153 e,
154 attempt,
155 MAX_ATTEMPTS,
156 delay.as_secs()
157 );
158
159 registry.record_delivery(WebhookDelivery {
160 id: delivery_id,
161 webhook_id: webhook.id,
162 event_id: event.id,
163 status: if attempt < MAX_ATTEMPTS {
164 DeliveryStatus::Retrying
165 } else {
166 DeliveryStatus::Failed
167 },
168 attempt,
169 max_attempts: MAX_ATTEMPTS,
170 response_status: None,
171 response_body: None,
172 error: Some(e.to_string()),
173 created_at: Utc::now(),
174 next_retry_at: if attempt < MAX_ATTEMPTS {
175 Some(Utc::now() + chrono::Duration::seconds(delay.as_secs() as i64))
176 } else {
177 None
178 },
179 });
180
181 if attempt < MAX_ATTEMPTS {
182 tokio::time::sleep(delay).await;
183 }
184 }
185 }
186 }
187
188 tracing::error!(
189 "Webhook delivery permanently failed after {} attempts: {} -> {}",
190 MAX_ATTEMPTS,
191 event.id,
192 webhook.url
193 );
194}
195
196fn build_payload(webhook: &WebhookSubscription, event: &Event) -> String {
198 let payload = serde_json::json!({
199 "webhook_id": webhook.id,
200 "event": {
201 "id": event.id,
202 "event_type": event.event_type_str(),
203 "entity_id": event.entity_id_str(),
204 "timestamp": event.timestamp,
205 "payload": event.payload,
206 "metadata": event.metadata,
207 },
208 "delivered_at": Utc::now(),
209 });
210 serde_json::to_string(&payload).unwrap_or_default()
211}
212
213fn compute_signature(secret: &str, payload: &str) -> String {
215 use hmac::{Hmac, Mac};
216 use sha2::Sha256;
217
218 type HmacSha256 = Hmac<Sha256>;
219 let mut mac =
220 HmacSha256::new_from_slice(secret.as_bytes()).expect("HMAC can take key of any size");
221 mac.update(payload.as_bytes());
222 let result = mac.finalize();
223 let code_bytes = result.into_bytes();
224
225 format!("sha256={}", hex::encode(code_bytes))
227}
228
229fn backoff_delay(attempt: u32) -> std::time::Duration {
231 let secs = BASE_BACKOFF_SECS * 2u64.pow(attempt.saturating_sub(1));
232 std::time::Duration::from_secs(secs.min(300))
234}
235
236fn truncate_string(s: &str, max_len: usize) -> String {
238 if s.len() <= max_len {
239 s.to_string()
240 } else {
241 format!("{}...", &s[..max_len])
242 }
243}
244
245#[cfg(test)]
246mod tests {
247 use super::*;
248
249 #[test]
250 fn test_backoff_delay() {
251 assert_eq!(backoff_delay(1), std::time::Duration::from_secs(2));
252 assert_eq!(backoff_delay(2), std::time::Duration::from_secs(4));
253 assert_eq!(backoff_delay(3), std::time::Duration::from_secs(8));
254 assert_eq!(backoff_delay(4), std::time::Duration::from_secs(16));
255 assert_eq!(backoff_delay(5), std::time::Duration::from_secs(32));
256 }
257
258 #[test]
259 fn test_backoff_delay_capped() {
260 assert_eq!(backoff_delay(20), std::time::Duration::from_secs(300));
262 }
263
264 #[test]
265 fn test_compute_signature() {
266 let sig = compute_signature("test-secret", r#"{"test": true}"#);
267 assert!(sig.starts_with("sha256="));
268 assert_eq!(sig.len(), 7 + 64); }
270
271 #[test]
272 fn test_truncate_string() {
273 assert_eq!(truncate_string("short", 10), "short");
274 assert_eq!(truncate_string("a longer string", 5), "a lon...");
275 }
276
277 #[test]
278 fn test_build_payload() {
279 let webhook = WebhookSubscription {
280 id: Uuid::new_v4(),
281 tenant_id: "tenant-1".to_string(),
282 url: "https://example.com/hook".to_string(),
283 event_types: vec![],
284 entity_ids: vec![],
285 secret: "secret".to_string(),
286 active: true,
287 created_at: Utc::now(),
288 updated_at: Utc::now(),
289 description: None,
290 };
291
292 let event = Event::from_strings(
293 "user.created".to_string(),
294 "user-1".to_string(),
295 "default".to_string(),
296 serde_json::json!({"name": "Test"}),
297 None,
298 )
299 .unwrap();
300
301 let payload = build_payload(&webhook, &event);
302 let parsed: serde_json::Value = serde_json::from_str(&payload).unwrap();
303
304 assert_eq!(parsed["event"]["event_type"], "user.created");
305 assert_eq!(parsed["event"]["entity_id"], "user-1");
306 assert_eq!(parsed["webhook_id"], webhook.id.to_string());
307 }
308}