use crate::{
application::services::webhook::{
DeliveryStatus, WebhookDelivery, WebhookRegistry, WebhookSubscription,
},
domain::entities::Event,
store::WebhookDeliveryTask,
};
use chrono::Utc;
use std::sync::Arc;
use tokio::sync::mpsc;
use uuid::Uuid;
const MAX_ATTEMPTS: u32 = 5;
const BASE_BACKOFF_SECS: u64 = 2;
#[cfg_attr(feature = "hotpath", hotpath::measure)]
pub async fn run_webhook_delivery_worker(
mut rx: mpsc::UnboundedReceiver<WebhookDeliveryTask>,
registry: Arc<WebhookRegistry>,
) {
tracing::info!("Webhook delivery worker started");
while let Some(task) = rx.recv().await {
let registry = Arc::clone(®istry);
tokio::spawn(async move {
deliver_with_retry(&task.webhook, &task.event, ®istry).await;
});
}
tracing::info!("Webhook delivery worker stopped (channel closed)");
}
async fn deliver_with_retry(
webhook: &WebhookSubscription,
event: &Event,
registry: &WebhookRegistry,
) {
let delivery_id = Uuid::new_v4();
let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(10))
.build()
.unwrap_or_default();
let payload = build_payload(webhook, event);
let signature = compute_signature(&webhook.secret, &payload);
for attempt in 1..=MAX_ATTEMPTS {
let result = client
.post(&webhook.url)
.header("Content-Type", "application/json")
.header("X-AllSource-Signature", &signature)
.header("X-AllSource-Event-Type", event.event_type_str())
.header("X-AllSource-Delivery-Id", delivery_id.to_string())
.header("X-AllSource-Webhook-Id", webhook.id.to_string())
.body(payload.clone())
.send()
.await;
match result {
Ok(response) => {
let status_code = response.status().as_u16();
let body = response.text().await.unwrap_or_else(|_| String::new());
if (200..300).contains(&status_code) {
registry.record_delivery(WebhookDelivery {
id: delivery_id,
webhook_id: webhook.id,
event_id: event.id,
status: DeliveryStatus::Success,
attempt,
max_attempts: MAX_ATTEMPTS,
response_status: Some(status_code),
response_body: if body.is_empty() {
None
} else {
Some(truncate_string(&body, 500))
},
error: None,
created_at: Utc::now(),
next_retry_at: None,
});
tracing::debug!(
"Webhook delivered: {} -> {} (attempt {})",
event.id,
webhook.url,
attempt
);
return;
}
let delay = backoff_delay(attempt);
tracing::warn!(
"Webhook delivery failed: {} -> {} (status {}, attempt {}/{}), retrying in {}s",
event.id,
webhook.url,
status_code,
attempt,
MAX_ATTEMPTS,
delay.as_secs()
);
registry.record_delivery(WebhookDelivery {
id: delivery_id,
webhook_id: webhook.id,
event_id: event.id,
status: if attempt < MAX_ATTEMPTS {
DeliveryStatus::Retrying
} else {
DeliveryStatus::Failed
},
attempt,
max_attempts: MAX_ATTEMPTS,
response_status: Some(status_code),
response_body: Some(truncate_string(&body, 500)),
error: None,
created_at: Utc::now(),
next_retry_at: if attempt < MAX_ATTEMPTS {
Some(Utc::now() + chrono::Duration::seconds(delay.as_secs() as i64))
} else {
None
},
});
if attempt < MAX_ATTEMPTS {
tokio::time::sleep(delay).await;
}
}
Err(e) => {
let delay = backoff_delay(attempt);
tracing::warn!(
"Webhook delivery error: {} -> {} ({}, attempt {}/{}), retrying in {}s",
event.id,
webhook.url,
e,
attempt,
MAX_ATTEMPTS,
delay.as_secs()
);
registry.record_delivery(WebhookDelivery {
id: delivery_id,
webhook_id: webhook.id,
event_id: event.id,
status: if attempt < MAX_ATTEMPTS {
DeliveryStatus::Retrying
} else {
DeliveryStatus::Failed
},
attempt,
max_attempts: MAX_ATTEMPTS,
response_status: None,
response_body: None,
error: Some(e.to_string()),
created_at: Utc::now(),
next_retry_at: if attempt < MAX_ATTEMPTS {
Some(Utc::now() + chrono::Duration::seconds(delay.as_secs() as i64))
} else {
None
},
});
if attempt < MAX_ATTEMPTS {
tokio::time::sleep(delay).await;
}
}
}
}
tracing::error!(
"Webhook delivery permanently failed after {} attempts: {} -> {}",
MAX_ATTEMPTS,
event.id,
webhook.url
);
}
fn build_payload(webhook: &WebhookSubscription, event: &Event) -> String {
let payload = serde_json::json!({
"webhook_id": webhook.id,
"event": {
"id": event.id,
"event_type": event.event_type_str(),
"entity_id": event.entity_id_str(),
"timestamp": event.timestamp,
"payload": event.payload,
"metadata": event.metadata,
},
"delivered_at": Utc::now(),
});
serde_json::to_string(&payload).unwrap_or_default()
}
fn compute_signature(secret: &str, payload: &str) -> String {
use hmac::{Hmac, Mac};
use sha2::Sha256;
type HmacSha256 = Hmac<Sha256>;
let mut mac =
HmacSha256::new_from_slice(secret.as_bytes()).expect("HMAC can take key of any size");
mac.update(payload.as_bytes());
let result = mac.finalize();
let code_bytes = result.into_bytes();
format!("sha256={}", hex::encode(code_bytes))
}
fn backoff_delay(attempt: u32) -> std::time::Duration {
let secs = BASE_BACKOFF_SECS * 2u64.pow(attempt.saturating_sub(1));
std::time::Duration::from_secs(secs.min(300))
}
fn truncate_string(s: &str, max_len: usize) -> String {
if s.len() <= max_len {
s.to_string()
} else {
format!("{}...", &s[..max_len])
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_backoff_delay() {
assert_eq!(backoff_delay(1), std::time::Duration::from_secs(2));
assert_eq!(backoff_delay(2), std::time::Duration::from_secs(4));
assert_eq!(backoff_delay(3), std::time::Duration::from_secs(8));
assert_eq!(backoff_delay(4), std::time::Duration::from_secs(16));
assert_eq!(backoff_delay(5), std::time::Duration::from_secs(32));
}
#[test]
fn test_backoff_delay_capped() {
assert_eq!(backoff_delay(20), std::time::Duration::from_secs(300));
}
#[test]
fn test_compute_signature() {
let sig = compute_signature("test-secret", r#"{"test": true}"#);
assert!(sig.starts_with("sha256="));
assert_eq!(sig.len(), 7 + 64); }
#[test]
fn test_truncate_string() {
assert_eq!(truncate_string("short", 10), "short");
assert_eq!(truncate_string("a longer string", 5), "a lon...");
}
#[test]
fn test_build_payload() {
let webhook = WebhookSubscription {
id: Uuid::new_v4(),
tenant_id: "tenant-1".to_string(),
url: "https://example.com/hook".to_string(),
event_types: vec![],
entity_ids: vec![],
secret: "secret".to_string(),
active: true,
created_at: Utc::now(),
updated_at: Utc::now(),
description: None,
};
let event = Event::from_strings(
"user.created".to_string(),
"user-1".to_string(),
"default".to_string(),
serde_json::json!({"name": "Test"}),
None,
)
.unwrap();
let payload = build_payload(&webhook, &event);
let parsed: serde_json::Value = serde_json::from_str(&payload).unwrap();
assert_eq!(parsed["event"]["event_type"], "user.created");
assert_eq!(parsed["event"]["entity_id"], "user-1");
assert_eq!(parsed["webhook_id"], webhook.id.to_string());
}
}