1use crate::{
7 application::services::webhook::{
8 DeliveryStatus, WebhookDelivery, WebhookRegistry, WebhookSubscription,
9 },
10 domain::entities::Event,
11 store::WebhookDeliveryTask,
12};
13use chrono::Utc;
14use std::sync::Arc;
15use tokio::sync::mpsc;
16use uuid::Uuid;
17
/// Maximum number of delivery attempts made for one webhook delivery before it is
/// recorded as failed.
const MAX_ATTEMPTS: u32 = 5;

/// Base delay, in seconds, of the exponential backoff; it is doubled for each further attempt.
const BASE_BACKOFF_SECS: u64 = 2;
23
24pub async fn run_webhook_delivery_worker(
29 mut rx: mpsc::UnboundedReceiver<WebhookDeliveryTask>,
30 registry: Arc<WebhookRegistry>,
31) {
32 tracing::info!("Webhook delivery worker started");
33
34 while let Some(task) = rx.recv().await {
35 let registry = Arc::clone(®istry);
36 tokio::spawn(async move {
37 deliver_with_retry(&task.webhook, &task.event, ®istry).await;
38 });
39 }
40
41 tracing::info!("Webhook delivery worker stopped (channel closed)");
42}
43
44async fn deliver_with_retry(
46 webhook: &WebhookSubscription,
47 event: &Event,
48 registry: &WebhookRegistry,
49) {
50 let delivery_id = Uuid::new_v4();
51 let client = reqwest::Client::builder()
52 .timeout(std::time::Duration::from_secs(10))
53 .build()
54 .unwrap_or_default();
55
56 let payload = build_payload(webhook, event);
58
59 let signature = compute_signature(&webhook.secret, &payload);
61
62 for attempt in 1..=MAX_ATTEMPTS {
63 let result = client
64 .post(&webhook.url)
65 .header("Content-Type", "application/json")
66 .header("X-AllSource-Signature", &signature)
67 .header("X-AllSource-Event-Type", event.event_type_str())
68 .header("X-AllSource-Delivery-Id", delivery_id.to_string())
69 .header("X-AllSource-Webhook-Id", webhook.id.to_string())
70 .body(payload.clone())
71 .send()
72 .await;
73
74 match result {
75 Ok(response) => {
76 let status_code = response.status().as_u16();
77 let body = response.text().await.unwrap_or_else(|_| String::new());
78
79 if (200..300).contains(&status_code) {
80 registry.record_delivery(WebhookDelivery {
82 id: delivery_id,
83 webhook_id: webhook.id,
84 event_id: event.id,
85 status: DeliveryStatus::Success,
86 attempt,
87 max_attempts: MAX_ATTEMPTS,
88 response_status: Some(status_code),
89 response_body: if body.is_empty() {
90 None
91 } else {
92 Some(truncate_string(&body, 500))
93 },
94 error: None,
95 created_at: Utc::now(),
96 next_retry_at: None,
97 });
98
99 tracing::debug!(
100 "Webhook delivered: {} -> {} (attempt {})",
101 event.id,
102 webhook.url,
103 attempt
104 );
105 return;
106 } else {
107 let delay = backoff_delay(attempt);
109 tracing::warn!(
110 "Webhook delivery failed: {} -> {} (status {}, attempt {}/{}), retrying in {}s",
111 event.id,
112 webhook.url,
113 status_code,
114 attempt,
115 MAX_ATTEMPTS,
116 delay.as_secs()
117 );
118
119 registry.record_delivery(WebhookDelivery {
120 id: delivery_id,
121 webhook_id: webhook.id,
122 event_id: event.id,
123 status: if attempt < MAX_ATTEMPTS {
124 DeliveryStatus::Retrying
125 } else {
126 DeliveryStatus::Failed
127 },
128 attempt,
129 max_attempts: MAX_ATTEMPTS,
130 response_status: Some(status_code),
131 response_body: Some(truncate_string(&body, 500)),
132 error: None,
133 created_at: Utc::now(),
134 next_retry_at: if attempt < MAX_ATTEMPTS {
135 Some(Utc::now() + chrono::Duration::seconds(delay.as_secs() as i64))
136 } else {
137 None
138 },
139 });
140
141 if attempt < MAX_ATTEMPTS {
142 tokio::time::sleep(delay).await;
143 }
144 }
145 }
146 Err(e) => {
147 let delay = backoff_delay(attempt);
149 tracing::warn!(
150 "Webhook delivery error: {} -> {} ({}, attempt {}/{}), retrying in {}s",
151 event.id,
152 webhook.url,
153 e,
154 attempt,
155 MAX_ATTEMPTS,
156 delay.as_secs()
157 );
158
159 registry.record_delivery(WebhookDelivery {
160 id: delivery_id,
161 webhook_id: webhook.id,
162 event_id: event.id,
163 status: if attempt < MAX_ATTEMPTS {
164 DeliveryStatus::Retrying
165 } else {
166 DeliveryStatus::Failed
167 },
168 attempt,
169 max_attempts: MAX_ATTEMPTS,
170 response_status: None,
171 response_body: None,
172 error: Some(e.to_string()),
173 created_at: Utc::now(),
174 next_retry_at: if attempt < MAX_ATTEMPTS {
175 Some(Utc::now() + chrono::Duration::seconds(delay.as_secs() as i64))
176 } else {
177 None
178 },
179 });
180
181 if attempt < MAX_ATTEMPTS {
182 tokio::time::sleep(delay).await;
183 }
184 }
185 }
186 }
187
188 tracing::error!(
189 "Webhook delivery permanently failed after {} attempts: {} -> {}",
190 MAX_ATTEMPTS,
191 event.id,
192 webhook.url
193 );
194}
195
196fn build_payload(webhook: &WebhookSubscription, event: &Event) -> String {
198 let payload = serde_json::json!({
199 "webhook_id": webhook.id,
200 "event": {
201 "id": event.id,
202 "event_type": event.event_type_str(),
203 "entity_id": event.entity_id_str(),
204 "timestamp": event.timestamp,
205 "payload": event.payload,
206 "metadata": event.metadata,
207 },
208 "delivered_at": Utc::now(),
209 });
210 serde_json::to_string(&payload).unwrap_or_default()
211}
212
213fn compute_signature(secret: &str, payload: &str) -> String {
215 use hmac::{Hmac, Mac};
216 use sha2::Sha256;
217
218 type HmacSha256 = Hmac<Sha256>;
219 let mut mac =
220 HmacSha256::new_from_slice(secret.as_bytes()).expect("HMAC can take key of any size");
221 mac.update(payload.as_bytes());
222 let result = mac.finalize();
223 let code_bytes = result.into_bytes();
224
225 format!("sha256={}", hex::encode(code_bytes))
227}
228
229fn backoff_delay(attempt: u32) -> std::time::Duration {
231 let secs = BASE_BACKOFF_SECS * 2u64.pow(attempt.saturating_sub(1));
232 std::time::Duration::from_secs(secs.min(300))
234}
235
/// Returns `s` limited to at most `max_len` bytes, with `...` appended when it was cut.
///
/// Strings of `max_len` bytes or fewer are returned unchanged. Longer strings are cut at the
/// last UTF-8 character boundary at or before `max_len`, so a multi-byte character is never
/// split (slicing a `str` inside a character panics).
fn truncate_string(s: &str, max_len: usize) -> String {
    if s.len() <= max_len {
        return s.to_string();
    }

    // Here `max_len < s.len()`. Index 0 is always a char boundary, so the search always
    // finds a cut point; `unwrap_or(0)` only avoids an unwrap.
    let cut = (0..=max_len)
        .rev()
        .find(|&idx| s.is_char_boundary(idx))
        .unwrap_or(0);

    format!("{}...", &s[..cut])
}
244
#[cfg(test)]
mod tests {
    use super::*;

    /// The delay doubles with every attempt, starting from 2 seconds.
    #[test]
    fn test_backoff_delay() {
        let expected_secs_by_attempt = [(1u32, 2u64), (2, 4), (3, 8), (4, 16), (5, 32)];

        for &(attempt, expected_secs) in expected_secs_by_attempt.iter() {
            assert_eq!(
                backoff_delay(attempt),
                std::time::Duration::from_secs(expected_secs)
            );
        }
    }

    /// A late attempt is limited to the 300 second cap.
    #[test]
    fn test_backoff_delay_capped() {
        let capped = backoff_delay(20);
        assert_eq!(capped, std::time::Duration::from_secs(300));
    }

    /// The signature is the `sha256=` prefix followed by 64 hex characters.
    #[test]
    fn test_compute_signature() {
        let signature = compute_signature("test-secret", r#"{"test": true}"#);

        assert!(signature.starts_with("sha256="));
        assert_eq!(signature.len(), 7 + 64);
    }

    /// Short input passes through; long input is cut and suffixed with `...`.
    #[test]
    fn test_truncate_string() {
        let untouched = truncate_string("short", 10);
        assert_eq!(untouched, "short");

        let shortened = truncate_string("a longer string", 5);
        assert_eq!(shortened, "a lon...");
    }

    /// The serialized payload exposes the event type, entity id and webhook id.
    #[test]
    fn test_build_payload() {
        let subscription = WebhookSubscription {
            id: Uuid::new_v4(),
            tenant_id: "tenant-1".to_string(),
            url: "https://example.com/hook".to_string(),
            event_types: vec![],
            entity_ids: vec![],
            secret: "secret".to_string(),
            active: true,
            created_at: Utc::now(),
            updated_at: Utc::now(),
            description: None,
        };

        let sample_event = Event::from_strings(
            "user.created".to_string(),
            "user-1".to_string(),
            "default".to_string(),
            serde_json::json!({"name": "Test"}),
            None,
        )
        .unwrap();

        let raw = build_payload(&subscription, &sample_event);
        let decoded: serde_json::Value = serde_json::from_str(&raw).unwrap();

        assert_eq!(decoded["event"]["event_type"], "user.created");
        assert_eq!(decoded["event"]["entity_id"], "user-1");
        assert_eq!(decoded["webhook_id"], subscription.id.to_string());
    }
}