1use std::sync::{Arc, Mutex};
16use std::time::Duration;
17
18use serde::Serialize;
19
20use crate::webhooks::WebhookRegistry;
21
22#[derive(Debug, Clone, Serialize)]
26pub struct DeliveryConfig {
27 pub timeout: Duration,
29 pub max_retries: u32,
31 pub base_delay: Duration,
33 pub max_delay: Duration,
35}
36
37impl Default for DeliveryConfig {
38 fn default() -> Self {
39 DeliveryConfig {
40 timeout: Duration::from_secs(10),
41 max_retries: 3,
42 base_delay: Duration::from_millis(500),
43 max_delay: Duration::from_secs(30),
44 }
45 }
46}
47
48#[derive(Debug, Clone, Serialize)]
52pub struct DeliveryResult {
53 pub webhook_id: String,
54 pub topic: String,
55 pub status_code: u16,
56 pub success: bool,
57 pub latency_ms: u64,
58 pub attempts: u32,
59 pub error: Option<String>,
60}
61
62#[derive(Debug, Clone, Serialize)]
66pub struct WebhookPayload {
67 pub event: String,
68 pub payload: serde_json::Value,
69 pub source: String,
70 pub timestamp: u64,
71}
72
73pub async fn deliver_one(
80 url: &str,
81 body: &WebhookPayload,
82 signature: Option<&str>,
83 timeout: Duration,
84) -> (u16, u64, Option<String>) {
85 let start = std::time::Instant::now();
86
87 let client = match reqwest::Client::builder()
88 .timeout(timeout)
89 .build()
90 {
91 Ok(c) => c,
92 Err(e) => return (0, 0, Some(format!("client build error: {e}"))),
93 };
94
95 let mut request = client.post(url)
96 .header("Content-Type", "application/json")
97 .header("User-Agent", "AxonServer-Webhook/1.0");
98
99 if let Some(sig) = signature {
100 request = request.header("X-Axon-Signature", sig);
101 }
102
103 let body_bytes = match serde_json::to_vec(body) {
104 Ok(b) => b,
105 Err(e) => return (0, 0, Some(format!("serialize error: {e}"))),
106 };
107
108 request = request.body(body_bytes);
109
110 match request.send().await {
111 Ok(response) => {
112 let latency = start.elapsed().as_millis() as u64;
113 let status = response.status().as_u16();
114 if (200..300).contains(&status) {
115 (status, latency, None)
116 } else {
117 let error_text = response.text().await.unwrap_or_default();
118 let msg = if error_text.len() > 200 {
119 format!("HTTP {status}: {}...", &error_text[..200])
120 } else {
121 format!("HTTP {status}: {error_text}")
122 };
123 (status, latency, Some(msg))
124 }
125 }
126 Err(e) => {
127 let latency = start.elapsed().as_millis() as u64;
128 if e.is_timeout() {
129 (0, latency, Some("timeout".to_string()))
130 } else if e.is_connect() {
131 (0, latency, Some(format!("connection error: {e}")))
132 } else {
133 (0, latency, Some(format!("request error: {e}")))
134 }
135 }
136 }
137}
138
139pub async fn deliver_with_retry(
146 url: &str,
147 body: &WebhookPayload,
148 signature: Option<&str>,
149 config: &DeliveryConfig,
150) -> DeliveryResult {
151 let webhook_id = String::new(); let topic = body.event.clone();
153 let mut last_status = 0u16;
154 let mut _last_latency = 0u64;
155 let mut last_error = None;
156 let total_start = std::time::Instant::now();
157
158 for attempt in 0..=config.max_retries {
159 let (status, latency, error) = deliver_one(url, body, signature, config.timeout).await;
160
161 last_status = status;
162 _last_latency = latency;
163 last_error = error.clone();
164
165 if (200..300).contains(&status) {
167 return DeliveryResult {
168 webhook_id,
169 topic,
170 status_code: status,
171 success: true,
172 latency_ms: total_start.elapsed().as_millis() as u64,
173 attempts: attempt + 1,
174 error: None,
175 };
176 }
177
178 if (400..500).contains(&status) {
180 return DeliveryResult {
181 webhook_id,
182 topic,
183 status_code: status,
184 success: false,
185 latency_ms: total_start.elapsed().as_millis() as u64,
186 attempts: attempt + 1,
187 error,
188 };
189 }
190
191 if attempt < config.max_retries {
193 let delay = compute_backoff(attempt, config.base_delay, config.max_delay);
194 tokio::time::sleep(delay).await;
195 }
196 }
197
198 DeliveryResult {
200 webhook_id,
201 topic,
202 status_code: last_status,
203 success: false,
204 latency_ms: total_start.elapsed().as_millis() as u64,
205 attempts: config.max_retries + 1,
206 error: last_error,
207 }
208}
209
/// Computes how long to wait after failed attempt number `attempt` (0-based).
///
/// The delay is `base * 2^min(attempt, 10)`, clamped to `max`, then scaled by
/// a deterministic factor chosen by `attempt % 4`: 100%, 75%, 125%, 110%.
/// The factor is applied after clamping, so the result can exceed `max` by up
/// to 25%.
///
/// The arithmetic is done in `u128` and saturates at `u64::MAX` milliseconds,
/// so arbitrarily large `base`/`max` values neither overflow nor truncate.
fn compute_backoff(attempt: u32, base: Duration, max: Duration) -> Duration {
    // `Duration::as_millis` is below 2^75, so shifting by at most 10 bits and
    // multiplying by at most 125 stays far inside the `u128` range.
    let exp_ms: u128 = base.as_millis() << attempt.min(10);
    let capped_ms = exp_ms.min(max.as_millis());

    let jitter_percent: u128 = match attempt % 4 {
        0 => 100,
        1 => 75,
        2 => 125,
        _ => 110,
    };

    let final_ms = capped_ms * jitter_percent / 100;
    Duration::from_millis(u64::try_from(final_ms).unwrap_or(u64::MAX))
}
224
225pub fn dispatch_all(
235 registry: Arc<Mutex<WebhookRegistry>>,
236 matched_ids: Vec<String>,
237 topic: String,
238 payload: serde_json::Value,
239 source: String,
240 config: DeliveryConfig,
241) -> usize {
242 let mut targets: Vec<(String, String, Option<String>)> = Vec::new(); {
246 let reg = registry.lock().unwrap();
247 for id in &matched_ids {
248 if let Some(wh) = reg.get(id) {
249 targets.push((wh.id.clone(), wh.url.clone(), wh.secret.clone()));
250 }
251 }
252 }
253
254 let count = targets.len();
255 let timestamp = std::time::SystemTime::now()
256 .duration_since(std::time::UNIX_EPOCH)
257 .unwrap_or_default()
258 .as_secs();
259
260 for (webhook_id, url, secret) in targets {
261 let registry = registry.clone();
262 let topic = topic.clone();
263 let payload = payload.clone();
264 let source = source.clone();
265 let config = config.clone();
266
267 tokio::spawn(async move {
268 let body = WebhookPayload {
269 event: topic.clone(),
270 payload,
271 source,
272 timestamp,
273 };
274
275 let signature = secret.as_ref().map(|s| {
276 let body_bytes = serde_json::to_vec(&body).unwrap_or_default();
277 WebhookRegistry::compute_signature(s, &body_bytes)
278 });
279
280 let mut result = deliver_with_retry(
281 &url,
282 &body,
283 signature.as_deref(),
284 &config,
285 ).await;
286
287 result.webhook_id = webhook_id.clone();
288
289 if let Ok(mut reg) = registry.lock() {
291 reg.record_completed(
292 &webhook_id,
293 &topic,
294 result.status_code,
295 result.latency_ms,
296 result.error.clone(),
297 result.attempts - 1,
298 );
299 }
300 });
301 }
302
303 count
304}
305
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a minimal payload for the network-facing tests.
    fn sample_payload() -> WebhookPayload {
        WebhookPayload {
            event: "test".to_string(),
            payload: serde_json::json!(null),
            source: "test".to_string(),
            timestamp: 0,
        }
    }

    #[test]
    fn delivery_config_defaults() {
        let config = DeliveryConfig::default();
        assert_eq!(config.timeout, Duration::from_secs(10));
        assert_eq!(config.max_retries, 3);
        assert_eq!(config.base_delay, Duration::from_millis(500));
        assert_eq!(config.max_delay, Duration::from_secs(30));
    }

    #[test]
    fn delivery_config_serializable() {
        let config = DeliveryConfig::default();
        let json = serde_json::to_value(&config).unwrap();
        // serde serializes `Duration` as an object.
        assert!(json["timeout"].is_object());
        assert_eq!(json["max_retries"], 3);
    }

    #[test]
    fn delivery_result_serializable() {
        let result = DeliveryResult {
            webhook_id: "wh_1".to_string(),
            topic: "deploy".to_string(),
            status_code: 200,
            success: true,
            latency_ms: 45,
            attempts: 1,
            error: None,
        };
        let json = serde_json::to_value(&result).unwrap();
        assert_eq!(json["webhook_id"], "wh_1");
        assert_eq!(json["status_code"], 200);
        assert_eq!(json["success"], true);
        assert_eq!(json["attempts"], 1);
        assert!(json["error"].is_null());
    }

    #[test]
    fn delivery_result_with_error() {
        let result = DeliveryResult {
            webhook_id: "wh_2".to_string(),
            topic: "config.updated".to_string(),
            status_code: 500,
            success: false,
            latency_ms: 120,
            attempts: 4,
            error: Some("HTTP 500: Internal Server Error".to_string()),
        };
        let json = serde_json::to_value(&result).unwrap();
        assert_eq!(json["success"], false);
        assert_eq!(json["attempts"], 4);
        assert_eq!(json["error"], "HTTP 500: Internal Server Error");
    }

    #[test]
    fn webhook_payload_serializable() {
        let payload = WebhookPayload {
            event: "deploy".to_string(),
            payload: serde_json::json!({"flows": ["FlowA"]}),
            source: "server".to_string(),
            timestamp: 1700000000,
        };
        let json = serde_json::to_value(&payload).unwrap();
        assert_eq!(json["event"], "deploy");
        assert_eq!(json["source"], "server");
        assert_eq!(json["timestamp"], 1700000000u64);
        assert!(json["payload"]["flows"].is_array());
    }

    #[test]
    fn compute_backoff_exponential() {
        let base = Duration::from_millis(500);
        let max = Duration::from_secs(30);

        // attempt 0: 500 ms * 1 * 100%
        assert_eq!(compute_backoff(0, base, max).as_millis(), 500);
        // attempt 1: 500 ms * 2 * 75%
        assert_eq!(compute_backoff(1, base, max).as_millis(), 750);
        // attempt 2: 500 ms * 4 * 125%
        assert_eq!(compute_backoff(2, base, max).as_millis(), 2500);
    }

    #[test]
    fn compute_backoff_capped() {
        let base = Duration::from_secs(10);
        let max = Duration::from_secs(30);

        // attempt 5: 320 s clamps to 30 s, then the 75% factor gives 22.5 s.
        assert_eq!(compute_backoff(5, base, max).as_millis(), 22_500);
        // attempt 2: 40 s clamps to 30 s, then the 125% factor gives 37.5 s,
        // so the factor is applied after the cap and may exceed `max`.
        assert_eq!(compute_backoff(2, base, max).as_millis(), 37_500);
    }

    #[tokio::test]
    async fn deliver_one_connection_refused() {
        let body = sample_payload();

        // Assumes nothing is listening on local port 19999.
        let (status, _latency, error) = deliver_one(
            "http://127.0.0.1:19999/nonexistent",
            &body,
            None,
            Duration::from_secs(2),
        )
        .await;

        assert_eq!(status, 0);
        assert!(error.is_some());
        let err_msg = error.unwrap();
        assert!(!err_msg.is_empty(), "expected non-empty error, got: {err_msg}");
    }

    #[tokio::test]
    async fn deliver_with_retry_connection_refused_exhausts_retries() {
        let body = sample_payload();

        let config = DeliveryConfig {
            timeout: Duration::from_secs(1),
            max_retries: 1,
            base_delay: Duration::from_millis(50),
            max_delay: Duration::from_millis(100),
        };

        let result = deliver_with_retry(
            "http://127.0.0.1:19999/nonexistent",
            &body,
            None,
            &config,
        )
        .await;

        assert!(!result.success);
        // One initial attempt plus one retry.
        assert_eq!(result.attempts, 2);
        assert!(result.error.is_some());
    }
}