#![cfg(feature = "webhook-signing")]
use std::time::{Duration, Instant};
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
use solid_pod_rs::notifications::{
ChangeNotification, WebhookChannelManager, WebhookDelivery,
};
fn sample_note(object: &str) -> ChangeNotification {
ChangeNotification {
context: "https://www.w3.org/ns/activitystreams".into(),
id: format!("urn:uuid:{}", uuid::Uuid::new_v4()),
kind: "Create".into(),
object: object.into(),
published: chrono::Utc::now().to_rfc3339(),
}
}
#[tokio::test]
async fn webhook_4xx_retains_subscription_except_410() {
let server_401 = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/hook-401"))
.respond_with(ResponseTemplate::new(401))
.mount(&server_401)
.await;
let server_410 = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/hook-410"))
.respond_with(ResponseTemplate::new(410))
.mount(&server_410)
.await;
let mgr = WebhookChannelManager::new()
.with_max_attempts(1)
.with_circuit_threshold(100);
let sub_401 = mgr
.subscribe(
"/public/",
&format!("{}/hook-401", server_401.uri()),
)
.await;
let sub_410 = mgr
.subscribe(
"/public/",
&format!("{}/hook-410", server_410.uri()),
)
.await;
assert_eq!(mgr.active_subscriptions().await, 2);
let note = sample_note("https://pod.example/public/a.ttl");
let outcomes = mgr.deliver_all(¬e, |t| t == "/public/").await;
assert_eq!(outcomes.len(), 2);
let out_401 = outcomes
.iter()
.find(|(id, _)| id == &sub_401.id)
.expect("401 sub must be in outcomes")
.1
.clone();
assert!(
matches!(out_401, WebhookDelivery::TransientRetry { .. }),
"401 should be TransientRetry, got {out_401:?}"
);
let out_410 = outcomes
.iter()
.find(|(id, _)| id == &sub_410.id)
.expect("410 sub must be in outcomes")
.1
.clone();
assert!(
matches!(out_410, WebhookDelivery::FatalDrop { status: 410 }),
"410 should be FatalDrop, got {out_410:?}"
);
assert_eq!(
mgr.active_subscriptions().await,
1,
"only the 410 subscription should have been dropped"
);
}
#[tokio::test]
async fn webhook_5xx_retry_honours_retry_after_then_succeeds() {
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/hook"))
.respond_with(
ResponseTemplate::new(503).insert_header("Retry-After", "1"),
)
.up_to_n_times(2)
.mount(&server)
.await;
Mock::given(method("POST"))
.and(path("/hook"))
.respond_with(ResponseTemplate::new(200))
.mount(&server)
.await;
let mgr = WebhookChannelManager::new()
.with_max_attempts(3)
.with_max_backoff(Duration::from_secs(10))
.with_circuit_threshold(100);
let note = sample_note("https://pod.example/public/a.ttl");
let url = format!("{}/hook", server.uri());
let start = Instant::now();
let outcome = mgr.deliver_one(&url, ¬e).await;
let elapsed = start.elapsed();
assert!(
matches!(outcome, WebhookDelivery::Delivered { status: 200 }),
"expected Delivered{{status:200}}, got {outcome:?}"
);
assert!(
elapsed >= Duration::from_millis(1800),
"Retry-After not honoured: only slept {elapsed:?}"
);
assert_eq!(mgr.consecutive_failures(), 0);
}
#[tokio::test]
async fn webhook_circuit_breaker_opens_after_threshold() {
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/hook"))
.respond_with(ResponseTemplate::new(500))
.mount(&server)
.await;
let threshold = 3_u32;
let mgr = WebhookChannelManager::new()
.with_max_attempts(1) .with_max_backoff(Duration::from_millis(10))
.with_circuit_threshold(threshold);
let note = sample_note("https://pod.example/public/a.ttl");
let url = format!("{}/hook", server.uri());
for _ in 0..threshold {
let outcome = mgr.deliver_one(&url, ¬e).await;
assert!(matches!(outcome, WebhookDelivery::TransientRetry { .. }));
}
assert!(mgr.circuit_open(), "breaker should be OPEN after threshold");
let blocked = mgr.deliver_one(&url, ¬e).await;
match blocked {
WebhookDelivery::TransientRetry { reason } => {
assert!(
reason.contains("circuit open"),
"expected circuit-open reason, got {reason}"
);
}
other => panic!("expected TransientRetry(circuit open), got {other:?}"),
}
mgr.reset_circuit();
assert!(!mgr.circuit_open());
}
#[test]
fn webhook_jitter_within_window() {
let mgr = WebhookChannelManager::new()
.with_max_backoff(Duration::from_secs(10));
let cap = Duration::from_millis(500) * 4;
let mut min = Duration::from_secs(u64::MAX / 2);
let mut max = Duration::ZERO;
for _ in 0..100 {
let d = mgr.compute_backoff(2);
assert!(
d <= cap,
"back-off {d:?} exceeded cap {cap:?}"
);
let floor = Duration::from_nanos(
(cap.as_nanos() as f64 * 0.8) as u64,
);
assert!(
d >= floor,
"back-off {d:?} below 80% floor {floor:?}"
);
if d < min {
min = d;
}
if d > max {
max = d;
}
}
let spread = max.saturating_sub(min);
assert!(
spread >= Duration::from_nanos((cap.as_nanos() as f64 * 0.05) as u64),
"jitter spread {spread:?} too narrow — suggests constant back-off"
);
}