use anyhow::Context;
use crate::contracts::validate_webhook_destination_url;
use crossbeam_channel::Sender;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::sync::Weak;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
#[cfg(test)]
use std::sync::{Arc, OnceLock, RwLock};
use super::super::diagnostics;
use super::super::types::WebhookMessage;
use super::runtime::{DeliveryTask, ScheduledRetry};
// Upper bound applied to every computed retry delay, both before and after jitter.
const WEBHOOK_RETRY_MAX_DELAY: Duration = Duration::from_secs(30);
// Maximum jitter magnitude in parts per thousand; jitter is clamped to ±this value
// (200 per mille = ±20% of the backoff delay).
const WEBHOOK_RETRY_JITTER_PER_MILLE: i32 = 200;
// Monotonic counter mixed into the jitter hash so consecutive calls produce different inputs
// even when the clock reading is identical.
static WEBHOOK_RETRY_JITTER_COUNTER: AtomicU64 = AtomicU64::new(0);
pub(super) fn handle_delivery_task(
task: DeliveryTask,
retry_sender: &Weak<Sender<ScheduledRetry>>,
) {
match deliver_attempt(&task.msg) {
Ok(()) => {
diagnostics::note_delivery_success();
log::debug!(
"Webhook delivered successfully to {}",
redact_webhook_destination(
task.msg
.config
.url
.as_deref()
.unwrap_or("<missing webhook URL>")
)
);
}
Err(err) => {
if task.attempt < task.msg.config.retry_count {
diagnostics::note_retry_attempt();
let retry_number = task.attempt.saturating_add(1);
let scheduled = ScheduledRetry {
ready_at: Instant::now()
+ retry_delay(task.msg.config.retry_backoff, retry_number),
task: DeliveryTask {
msg: task.msg.clone(),
attempt: retry_number,
},
};
log::debug!(
"Webhook attempt {} failed for {}; scheduling retry: {:#}",
retry_number,
redact_webhook_destination(
task.msg
.config
.url
.as_deref()
.unwrap_or("<missing webhook URL>")
),
err
);
let Some(retry_sender) = retry_sender.upgrade() else {
let scheduler_error = anyhow::anyhow!(
"retry scheduler unavailable for webhook: dispatcher shutting down"
);
diagnostics::note_delivery_failure(
&task.msg,
&scheduler_error,
retry_number.saturating_add(1),
);
log::warn!("{scheduler_error:#}");
return;
};
if let Err(send_err) = retry_sender.send(scheduled) {
let scheduler_error =
anyhow::anyhow!("retry scheduler unavailable for webhook: {}", send_err);
diagnostics::note_delivery_failure(
&task.msg,
&scheduler_error,
retry_number.saturating_add(1),
);
log::warn!("{scheduler_error:#}");
}
} else {
let attempts = task.attempt.saturating_add(1);
diagnostics::note_delivery_failure(&task.msg, &err, attempts);
log::warn!(
"Webhook delivery failed after {} attempts: {:#}",
attempts,
err
);
}
}
}
}
/// Computes the delay before retry number `retry_number`, using freshly drawn jitter.
///
/// Thin wrapper over `retry_delay_with_jitter` that supplies the jitter from
/// `random_jitter_per_mille`.
fn retry_delay(base: Duration, retry_number: u32) -> Duration {
    let jitter_per_mille = random_jitter_per_mille();
    retry_delay_with_jitter(base, retry_number, jitter_per_mille)
}
/// Computes an exponential backoff delay with an explicit jitter value.
///
/// The delay is `base * 2^(retry_number - 1)` capped at `WEBHOOK_RETRY_MAX_DELAY`,
/// then scaled by `jitter_per_mille` (clamped to ±`WEBHOOK_RETRY_JITTER_PER_MILLE`),
/// then capped at `WEBHOOK_RETRY_MAX_DELAY` again so jitter can never exceed the limit.
fn retry_delay_with_jitter(base: Duration, retry_number: u32, jitter_per_mille: i32) -> Duration {
    let cap_millis = WEBHOOK_RETRY_MAX_DELAY.as_millis();

    // Saturating arithmetic keeps extreme bases / retry numbers from overflowing.
    let backoff_millis = base
        .as_millis()
        .saturating_mul(exponential_retry_multiplier(retry_number))
        .min(cap_millis);

    let jitter = jitter_per_mille.clamp(
        -WEBHOOK_RETRY_JITTER_PER_MILLE,
        WEBHOOK_RETRY_JITTER_PER_MILLE,
    );

    let delay_millis = apply_jitter(backoff_millis, jitter)
        .min(cap_millis)
        .min(u128::from(u64::MAX));

    // The value was bounded to the u64 range on the line above, so the cast is lossless.
    Duration::from_millis(delay_millis as u64)
}
/// Returns the backoff multiplier for a 1-based retry number.
///
/// Retry `0` yields `0`; retry `n >= 1` yields `2^(n - 1)`. When the shift would
/// not fit in a `u128` the result saturates to `u128::MAX`.
fn exponential_retry_multiplier(retry_number: u32) -> u128 {
    match retry_number.checked_sub(1) {
        // retry_number == 0: no delay multiplier at all.
        None => 0,
        Some(shift) if shift < u128::BITS => 1u128 << shift,
        // Shift amount exceeds the width of u128: saturate.
        Some(_) => u128::MAX,
    }
}
/// Scales `millis` by `(1000 + jitter_per_mille) / 1000`.
///
/// A non-positive scale (jitter of -1000 per mille or lower) yields `0`. The
/// multiplication saturates before the division, so very large inputs do not overflow.
fn apply_jitter(millis: u128, jitter_per_mille: i32) -> u128 {
    let scale = 1000 + i128::from(jitter_per_mille);
    if scale > 0 {
        // `scale` is strictly positive here, so its magnitude equals its value.
        millis.saturating_mul(scale.unsigned_abs()) / 1000
    } else {
        0
    }
}
/// Draws a pseudo-random jitter in the inclusive range
/// `[-WEBHOOK_RETRY_JITTER_PER_MILLE, WEBHOOK_RETRY_JITTER_PER_MILLE]`.
///
/// The value is derived by hashing the current wall-clock time in nanoseconds,
/// a process-wide call counter, and the process id.
fn random_jitter_per_mille() -> i32 {
    let nanos_since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_nanos();
    let sequence = WEBHOOK_RETRY_JITTER_COUNTER.fetch_add(1, Ordering::Relaxed);

    // A tuple hashes its fields in order, feeding the hasher time, counter, then pid.
    let mut state = DefaultHasher::new();
    (nanos_since_epoch, sequence, std::process::id()).hash(&mut state);

    // Number of distinct jitter values: every integer in [-max, +max].
    let bucket_count = (WEBHOOK_RETRY_JITTER_PER_MILLE as u64).saturating_mul(2) + 1;
    (state.finish() % bucket_count) as i32 - WEBHOOK_RETRY_JITTER_PER_MILLE
}
fn deliver_attempt(msg: &WebhookMessage) -> anyhow::Result<()> {
let url = msg
.config
.url
.as_deref()
.ok_or_else(|| anyhow::anyhow!("Webhook URL not configured"))?;
validate_webhook_destination_url(
url,
msg.config.allow_insecure_http,
msg.config.allow_private_targets,
)
.context("webhook URL failed safety validation")?;
let destination = redact_webhook_destination(url);
let body = serde_json::to_string(&msg.payload)?;
let signature = msg
.config
.secret
.as_ref()
.map(|secret| generate_signature(&body, secret));
send_request(url, &body, signature.as_deref(), msg.config.timeout)
.with_context(|| format!("webhook delivery to {destination}"))
}
/// Sends `body` as a JSON `POST` to `url`, optionally attaching a signature header.
///
/// In test builds, an installed test transport intercepts the request and its
/// result is returned instead of performing any HTTP.
///
/// # Errors
/// Returns an error when the request itself fails or when the response status
/// is not a success status.
fn send_request(
    url: &str,
    body: &str,
    signature: Option<&str>,
    timeout: Duration,
) -> anyhow::Result<()> {
    #[cfg(test)]
    if let Some(handler) = test_transport() {
        let intercepted = TestRequest {
            url: url.to_owned(),
            body: body.to_owned(),
            signature: signature.map(str::to_owned),
            timeout,
        };
        return handler(&intercepted);
    }

    // `timeout` bounds the whole request via the agent's global timeout.
    let agent_config = ureq::Agent::config_builder()
        .timeout_global(Some(timeout))
        .build();
    let agent = ureq::Agent::new_with_config(agent_config);

    let request = agent
        .post(url)
        .header("Content-Type", "application/json")
        .header("User-Agent", concat!("cueloop/", env!("CARGO_PKG_VERSION")));
    let request = match signature {
        Some(sig) => request.header("X-CueLoop-Signature", sig),
        None => request,
    };

    let status = request.send(body)?.status();
    if !status.is_success() {
        return Err(anyhow::anyhow!(
            "HTTP {}: webhook endpoint returned error",
            status
        ));
    }
    Ok(())
}
/// Produces a log-safe rendering of a webhook URL.
///
/// The fragment, query string, path contents, and any `userinfo@` credentials
/// are removed. What remains is the scheme (when present) and the host[:port],
/// followed by `/…` when the original had anything after the authority.
///
/// Special cases:
/// * blank input renders as `<missing webhook URL>`;
/// * a URL with a scheme but no host renders as `{scheme}://<redacted>`;
/// * a scheme-less value with no host renders as `<redacted webhook destination>`.
///
/// The authority segment is isolated *before* userinfo is stripped, so an `@`
/// inside the path can never be mistaken for the userinfo delimiter, and the
/// presence of credentials alone does not make the output claim a path exists.
pub(crate) fn redact_webhook_destination(url: &str) -> String {
    let trimmed = url.trim();
    if trimmed.is_empty() {
        return "<missing webhook URL>".to_string();
    }

    // Fragment and query may carry tokens; drop them before looking at anything else.
    let without_fragment = trimmed.split('#').next().unwrap_or(trimmed);
    let without_query = without_fragment
        .split('?')
        .next()
        .unwrap_or(without_fragment);

    // `remainder` is "authority[/path]" with any scheme (and extra leading slashes) removed.
    let (scheme, remainder) = match without_query.split_once("://") {
        Some((scheme, rest)) => (Some(scheme), rest.trim_start_matches('/')),
        None => (None, without_query),
    };

    // Everything up to the first '/' is the authority, possibly prefixed with userinfo.
    let raw_authority = remainder.split('/').next().unwrap_or(remainder);
    let host = raw_authority
        .rsplit_once('@')
        .map_or(raw_authority, |(_userinfo, host)| host);
    // Compare against the raw segment: stripped credentials must not count as a path.
    let has_path = remainder.len() > raw_authority.len();

    match (scheme, host.is_empty(), has_path) {
        (Some(scheme), true, _) => format!("{scheme}://<redacted>"),
        (Some(scheme), false, true) => format!("{scheme}://{host}/…"),
        (Some(scheme), false, false) => format!("{scheme}://{host}"),
        (None, true, _) => "<redacted webhook destination>".to_string(),
        (None, false, true) => format!("{host}/…"),
        (None, false, false) => host.to_string(),
    }
}
/// Computes the HMAC-SHA256 signature of `body` keyed with `secret`.
///
/// Returns the digest as `sha256=<lowercase hex>`. If the MAC cannot be
/// constructed from the key, the error is logged and the placeholder
/// `sha256=invalid` is returned instead.
pub(crate) fn generate_signature(body: &str, secret: &str) -> String {
    use hmac::{Hmac, KeyInit, Mac};
    use sha2::Sha256;

    type HmacSha256 = Hmac<Sha256>;

    match HmacSha256::new_from_slice(secret.as_bytes()) {
        Ok(mut mac) => {
            mac.update(body.as_bytes());
            let digest = mac.finalize().into_bytes();
            format!("sha256={}", hex::encode(digest))
        }
        Err(e) => {
            log::error!("Failed to create HMAC (this should never happen): {}", e);
            "sha256=invalid".to_string()
        }
    }
}
#[cfg(test)]
#[derive(Clone, Debug)]
/// Description of one outbound webhook request, handed to the installed test
/// transport by `send_request` instead of performing real HTTP.
pub(crate) struct TestRequest {
/// Destination URL exactly as passed to `send_request` (not redacted).
pub(crate) url: String,
/// Serialized request body.
pub(crate) body: String,
/// Signature that the real transport would send in the `X-CueLoop-Signature` header, if any.
pub(crate) signature: Option<String>,
/// Timeout that the real transport would apply to the request.
pub(crate) timeout: Duration,
}
#[cfg(test)]
/// Shared, thread-safe callback that stands in for the HTTP transport in tests.
/// Its return value becomes the result of `send_request`.
pub(crate) type TestTransportHandler =
Arc<dyn Fn(&TestRequest) -> anyhow::Result<()> + Send + Sync + 'static>;
#[cfg(test)]
// Process-wide slot for the optional test transport. Lazily initialized to `None`
// by the accessor and installer functions below.
static TEST_TRANSPORT: OnceLock<RwLock<Option<TestTransportHandler>>> = OnceLock::new();
#[cfg(test)]
/// Returns a clone of the currently installed test transport, if any.
///
/// A poisoned lock is recovered rather than propagated, so a panic in one test
/// does not break transport lookup in the next.
fn test_transport() -> Option<TestTransportHandler> {
    let slot = TEST_TRANSPORT.get_or_init(|| RwLock::new(None));
    let guard = slot
        .read()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    guard.clone()
}
#[cfg(test)]
/// Installs `handler` as the test transport, or removes the current one when `None`.
///
/// A poisoned lock is recovered rather than propagated, so the slot can always
/// be overwritten.
pub(crate) fn install_test_transport_for_tests(handler: Option<TestTransportHandler>) {
    let slot = TEST_TRANSPORT.get_or_init(|| RwLock::new(None));
    let mut guard = slot
        .write()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    *guard = handler;
}
#[cfg(test)]
// Unit tests for delivery handling, failure persistence, redaction, and retry-delay math.
// Tests that touch the shared test transport or webhook metrics are marked `#[serial]`.
mod tests {
use super::super::runtime::DeliveryTask;
use super::*;
use crate::contracts::WebhookConfig;
use crate::webhook::diagnostics::{
diagnostics_snapshot, load_failure_records_for_tests, reset_webhook_metrics_for_tests,
};
use crate::webhook::types::{
ResolvedWebhookConfig, WebhookContext, WebhookMessage, WebhookPayload,
};
use crossbeam_channel::bounded;
use serial_test::serial;
use std::sync::Arc;
// Builds a `task_failed` webhook message targeting `url`, with the payload context's
// `repo_root` set to the given path. Uses a 1s timeout and 10ms retry backoff.
fn test_message(
repo_root: &std::path::Path,
url: &str,
retry_count: u32,
allow_insecure_http: bool,
allow_private_targets: bool,
) -> WebhookMessage {
let mut config = WebhookConfig {
enabled: Some(true),
url: Some(url.to_string()),
allow_insecure_http: Some(allow_insecure_http),
allow_private_targets: Some(allow_private_targets),
timeout_secs: Some(1),
retry_count: Some(retry_count),
retry_backoff_ms: Some(10),
queue_capacity: Some(8),
parallel_queue_multiplier: None,
queue_policy: Some(crate::contracts::WebhookQueuePolicy::DropNew),
secret: None,
events: None,
};
let resolved = ResolvedWebhookConfig::from_config(&config);
// NOTE(review): `config` is not read after this assignment; the message below
// uses `resolved`, which was built before the URL was cleared.
config.url = None;
WebhookMessage {
payload: WebhookPayload {
event: "task_failed".to_string(),
timestamp: "2026-04-25T00:00:00Z".to_string(),
task_id: Some("RQ-0006".to_string()),
task_title: Some("Webhook contract".to_string()),
previous_status: None,
current_status: None,
note: None,
context: WebhookContext {
repo_root: Some(repo_root.display().to_string()),
..WebhookContext::default()
},
},
config: resolved,
}
}
// A failed attempt with retries remaining must be recorded as a failure when the
// retry scheduler handle can no longer be upgraded (here: an empty `Weak`).
#[test]
#[serial]
fn handle_delivery_task_records_failure_when_retry_scheduler_upgrade_fails() {
reset_webhook_metrics_for_tests();
install_test_transport_for_tests(Some(Arc::new(|_| anyhow::bail!("simulated failure"))));
let repo_root = tempfile::tempdir().expect("tempdir");
let msg = test_message(
repo_root.path(),
"https://hooks.example.com/fail",
2,
false,
false,
);
handle_delivery_task(
DeliveryTask {
msg: msg.clone(),
attempt: 0,
},
&Weak::new(),
);
let snapshot = diagnostics_snapshot(repo_root.path(), &WebhookConfig::default(), 10)
.expect("snapshot");
let records = load_failure_records_for_tests(repo_root.path()).expect("records");
assert_eq!(snapshot.failed_total, 1);
assert_eq!(records.len(), 1);
assert!(records[0].error.contains("retry scheduler unavailable"));
}
// A failed attempt with retries remaining must be recorded as a failure when the
// retry channel rejects the send (here: the receiver was dropped).
#[test]
#[serial]
fn handle_delivery_task_records_failure_when_retry_scheduler_send_fails() {
reset_webhook_metrics_for_tests();
install_test_transport_for_tests(Some(Arc::new(|_| anyhow::bail!("simulated failure"))));
let repo_root = tempfile::tempdir().expect("tempdir");
let msg = test_message(
repo_root.path(),
"https://hooks.example.com/fail",
2,
false,
false,
);
let (tx, rx) = bounded(1);
// Dropping the receiver disconnects the channel so the send below fails.
drop(rx);
let scheduler = Arc::new(tx);
let weak = Arc::downgrade(&scheduler);
handle_delivery_task(
DeliveryTask {
msg: msg.clone(),
attempt: 0,
},
&weak,
);
let snapshot = diagnostics_snapshot(repo_root.path(), &WebhookConfig::default(), 10)
.expect("snapshot");
let records = load_failure_records_for_tests(repo_root.path()).expect("records");
assert_eq!(snapshot.failed_total, 1);
assert_eq!(records.len(), 1);
assert!(records[0].error.contains("retry scheduler unavailable"));
}
// With insecure HTTP and private targets disallowed, a loopback URL must produce a
// failure record whose destination is redacted and whose error omits the path token
// and the query secret.
#[test]
#[serial]
fn deliver_attempt_rejects_private_targets_without_leaking_secrets() {
reset_webhook_metrics_for_tests();
install_test_transport_for_tests(None);
let repo_root = tempfile::tempdir().expect("tempdir");
let secret_url = "http://127.0.0.1:9000/hooks/private-token?sig=supersecret";
let msg = test_message(repo_root.path(), secret_url, 0, false, false);
handle_delivery_task(DeliveryTask { msg, attempt: 0 }, &Weak::new());
let records = load_failure_records_for_tests(repo_root.path()).expect("records");
assert_eq!(records.len(), 1);
assert_eq!(
records[0].destination.as_deref(),
Some("http://127.0.0.1:9000/…")
);
assert!(!records[0].error.contains("private-token"));
assert!(!records[0].error.contains("supersecret"));
}
// Even when the transport error echoes the full URL and a secret, the persisted record
// must contain only the redacted destination.
#[test]
#[serial]
fn exhausted_delivery_failure_persists_redacted_destination_and_sanitized_error() {
reset_webhook_metrics_for_tests();
let repo_root = tempfile::tempdir().expect("tempdir");
install_test_transport_for_tests(Some(Arc::new(|request| {
anyhow::bail!("delivery to {} failed with token supersecret", request.url)
})));
let msg = test_message(
repo_root.path(),
"https://user:pass@example.com/hooks/private-token?sig=supersecret",
0,
false,
false,
);
handle_delivery_task(DeliveryTask { msg, attempt: 0 }, &Weak::new());
let records = load_failure_records_for_tests(repo_root.path()).expect("records");
assert_eq!(records.len(), 1);
assert_eq!(
records[0].destination.as_deref(),
Some("https://example.com/…")
);
assert!(!records[0].error.contains("private-token"));
assert!(!records[0].error.contains("supersecret"));
assert!(records[0].error.contains("https://example.com/…"));
}
// With zero jitter the delay doubles on each retry: base, 2x, 4x, 8x.
#[test]
fn retry_delay_uses_exponential_sequence_without_jitter() {
let base = Duration::from_millis(100);
assert_eq!(
retry_delay_with_jitter(base, 1, 0),
Duration::from_millis(100)
);
assert_eq!(
retry_delay_with_jitter(base, 2, 0),
Duration::from_millis(200)
);
assert_eq!(
retry_delay_with_jitter(base, 3, 0),
Duration::from_millis(400)
);
assert_eq!(
retry_delay_with_jitter(base, 4, 0),
Duration::from_millis(800)
);
}
// Jitter scales the delay by at most ±WEBHOOK_RETRY_JITTER_PER_MILLE; out-of-range
// jitter values are clamped to that bound.
#[test]
fn retry_delay_applies_bounded_jitter() {
let base = Duration::from_millis(1000);
assert_eq!(
retry_delay_with_jitter(base, 1, -WEBHOOK_RETRY_JITTER_PER_MILLE),
Duration::from_millis(800)
);
assert_eq!(
retry_delay_with_jitter(base, 1, WEBHOOK_RETRY_JITTER_PER_MILLE),
Duration::from_millis(1200)
);
assert_eq!(
retry_delay_with_jitter(base, 1, -999),
Duration::from_millis(800)
);
assert_eq!(
retry_delay_with_jitter(base, 1, 999),
Duration::from_millis(1200)
);
}
// The final delay never exceeds WEBHOOK_RETRY_MAX_DELAY, even after positive jitter.
#[test]
fn retry_delay_caps_final_delay_after_jitter() {
let base = Duration::from_millis(10_000);
assert_eq!(
retry_delay_with_jitter(base, 4, WEBHOOK_RETRY_JITTER_PER_MILLE),
WEBHOOK_RETRY_MAX_DELAY
);
assert_eq!(
retry_delay_with_jitter(base, 10, 0),
WEBHOOK_RETRY_MAX_DELAY
);
}
// Maximal base and retry number must saturate to the cap rather than overflow.
#[test]
fn retry_delay_handles_extreme_retry_numbers() {
let delay = retry_delay_with_jitter(
Duration::from_millis(u64::MAX),
u32::MAX,
WEBHOOK_RETRY_JITTER_PER_MILLE,
);
assert_eq!(delay, WEBHOOK_RETRY_MAX_DELAY);
}
}