use std::collections::{HashMap, HashSet, VecDeque};
use std::net::{IpAddr, SocketAddr};
use std::sync::{Arc, Mutex};
use std::time::Instant;
use axum::body::{to_bytes, Body};
use axum::extract::{ConnectInfo, State};
use axum::http::header::CONTENT_LENGTH;
use axum::http::{HeaderMap, Request, StatusCode};
use axum::response::IntoResponse;
use axum::Json;
use hmac::{Hmac, KeyInit, Mac};
use serde::Serialize;
use sha2::Sha256;
use subtle::ConstantTimeEq;
use crate::state::{ApiError, ScanTrigger, ScanTriggerSource, ServerState};
/// HMAC-SHA256 instance used by `sign` and `verify_signature`.
type HmacSha256 = Hmac<Sha256>;
/// Largest webhook body accepted, in bytes (1 MiB). `webhook_git` compares a declared
/// `Content-Length` against it and also passes it as the cap when buffering the body.
pub const MAX_WEBHOOK_BODY_BYTES: usize = 1024 * 1024;
/// Request header that carries the body signature.
const SIGNATURE_HEADER: &str = "X-Hub-Signature-256";
/// Literal prefix that must precede the hex digest in the signature header.
const SIGNATURE_PREFIX: &str = "sha256=";
/// Required number of hex characters after the prefix (a SHA-256 digest is 32 bytes).
const SIGNATURE_HEX_LEN: usize = 64;
/// Event-type headers consulted by `classify_event`, in lookup order.
const EVENT_HEADERS: &[&str] =
&["X-GitHub-Event", "X-Gitea-Event", "X-Forgejo-Event", "X-Gogs-Event", "X-Gitlab-Event"];
/// Delivery-id headers consulted by `delivery_id`, in lookup order.
const DELIVERY_HEADERS: &[&str] = &[
"X-GitHub-Delivery",
"X-Gitea-Delivery",
"X-Forgejo-Delivery",
"X-Gogs-Delivery",
"X-Gitlab-Event-UUID",
];
/// Maximum number of delivery ids `DeliveryDedupCache` retains before evicting the oldest.
pub const DELIVERY_DEDUP_CAP: usize = 1024;
// The DEFAULT_* values below are not read anywhere in this file; presumably they are the
// fallbacks applied by the configuration layer — verify against the caller.
/// Default permit count for `WebhookConcurrencyLimit`.
pub const DEFAULT_WEBHOOK_MAX_CONCURRENT: usize = 8;
/// Default sustained per-IP request rate, in requests per minute.
pub const DEFAULT_WEBHOOK_RATE_LIMIT_PER_MINUTE: u32 = 30;
/// Default per-IP burst size.
pub const DEFAULT_WEBHOOK_RATE_LIMIT_BURST: u32 = 30;
/// Default number of distinct client IPs tracked by `WebhookRateLimiter`.
pub const DEFAULT_WEBHOOK_RATE_LIMIT_MAX_IPS: usize = 1024;
pub struct WebhookConcurrencyLimit {
inner: Arc<tokio::sync::Semaphore>,
permits: usize,
}
impl WebhookConcurrencyLimit {
pub fn new(permits: usize) -> Self {
let permits = permits.max(1);
Self { inner: Arc::new(tokio::sync::Semaphore::new(permits)), permits }
}
pub fn permits(&self) -> usize {
self.permits
}
pub fn try_acquire(&self) -> Option<tokio::sync::OwnedSemaphorePermit> {
Arc::clone(&self.inner).try_acquire_owned().ok()
}
}
/// Per-client-IP token-bucket rate limiter.
///
/// Every tracked IP owns a bucket holding up to `capacity` tokens that refills at
/// `refill_per_sec` tokens per second. A request is admitted when it can spend one token.
/// At most `max_ips` buckets are kept; when a new IP arrives at the cap, the bucket with
/// the oldest refill timestamp is evicted.
pub struct WebhookRateLimiter {
    capacity: f64,
    refill_per_sec: f64,
    max_ips: usize,
    inner: Mutex<HashMap<IpAddr, TokenBucket>>,
}

/// Token count and refill clock for one client IP.
#[derive(Debug)]
struct TokenBucket {
    tokens: f64,
    last_refill: Instant,
}

impl WebhookRateLimiter {
    /// Creates a limiter with an explicit burst `capacity` and refill rate.
    ///
    /// `capacity` and `max_ips` are raised to at least one; a negative (or NaN)
    /// `refill_per_sec` is treated as zero, i.e. buckets never refill.
    pub fn new(capacity: u32, refill_per_sec: f64, max_ips: usize) -> Self {
        Self {
            capacity: f64::from(capacity.max(1)),
            refill_per_sec: refill_per_sec.max(0.0),
            max_ips: max_ips.max(1),
            inner: Mutex::new(HashMap::new()),
        }
    }

    /// Creates a limiter that allows `rate_per_minute` requests per minute, with a burst
    /// equal to the same number. A rate of zero is raised to one.
    pub fn per_minute(rate_per_minute: u32, max_ips: usize) -> Self {
        let rate = rate_per_minute.max(1);
        Self::new(rate, f64::from(rate) / 60.0, max_ips)
    }

    /// Decides whether a request from `ip` is admitted at the current instant.
    pub fn admit(&self, ip: IpAddr) -> bool {
        self.admit_at(ip, Instant::now())
    }

    /// Decides whether a request from `ip` is admitted at the instant `now`.
    ///
    /// Returns `true` and spends one token when the bucket holds at least one token,
    /// `false` otherwise. A poisoned lock is recovered rather than propagated.
    pub fn admit_at(&self, ip: IpAddr, now: Instant) -> bool {
        let mut buckets = self.inner.lock().unwrap_or_else(std::sync::PoisonError::into_inner);

        // Make room before inserting a bucket for a previously unseen IP.
        if !buckets.contains_key(&ip) && buckets.len() >= self.max_ips {
            let victim =
                buckets.iter().min_by_key(|(_, bucket)| bucket.last_refill).map(|(addr, _)| *addr);
            if let Some(victim) = victim {
                buckets.remove(&victim);
            }
        }

        let bucket = buckets
            .entry(ip)
            .or_insert_with(|| TokenBucket { tokens: self.capacity, last_refill: now });

        let elapsed = now.saturating_duration_since(bucket.last_refill).as_secs_f64();
        bucket.tokens = (bucket.tokens + elapsed * self.refill_per_sec).min(self.capacity);
        // Never move the refill clock backwards. `admit` samples the clock before taking
        // the lock, so a caller can arrive here with a `now` older than `last_refill`;
        // rewinding would let the next call be credited twice for the same interval.
        bucket.last_refill = bucket.last_refill.max(now);

        if bucket.tokens >= 1.0 {
            bucket.tokens -= 1.0;
            true
        } else {
            false
        }
    }

    /// Number of IPs that currently own a bucket (test helper).
    #[cfg(test)]
    pub fn tracked_ips(&self) -> usize {
        self.inner.lock().unwrap_or_else(std::sync::PoisonError::into_inner).len()
    }
}
/// Cheap shape check for the signature header: after trimming, it must be the
/// `SIGNATURE_PREFIX` followed by exactly `SIGNATURE_HEX_LEN` ASCII hex digits.
///
/// This says nothing about whether the digest is correct; it only lets the handler
/// reject obviously malformed headers without reading the body.
fn signature_header_is_well_formed(header: &str) -> bool {
    match header.trim().strip_prefix(SIGNATURE_PREFIX) {
        None => false,
        Some(rest) => {
            let digest = rest.trim().as_bytes();
            digest.len() == SIGNATURE_HEX_LEN && digest.iter().all(u8::is_ascii_hexdigit)
        }
    }
}
/// Event type of a webhook delivery as derived from its headers by `classify_event`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum EventKind {
/// The event header said `push` or `push hook` (compared case-insensitively).
Push,
/// The event header said `ping` (compared case-insensitively).
Ping,
/// Any other non-empty event name, kept as sent (trimmed).
Other(String),
/// No recognised event header carried a non-empty value.
Unknown,
}
/// Derives the event kind from the provider event headers.
///
/// The headers in `EVENT_HEADERS` are consulted in order; headers that are absent, not
/// valid visible ASCII, or blank after trimming are skipped. The first remaining value
/// decides the result, and `EventKind::Unknown` is returned when there is none.
pub fn classify_event(headers: &HeaderMap) -> EventKind {
    let first_value = EVENT_HEADERS
        .iter()
        .filter_map(|name| headers.get(*name)?.to_str().ok())
        .map(str::trim)
        .find(|value| !value.is_empty());

    match first_value {
        None => EventKind::Unknown,
        Some(v) if v.eq_ignore_ascii_case("push") || v.eq_ignore_ascii_case("push hook") => {
            EventKind::Push
        }
        Some(v) if v.eq_ignore_ascii_case("ping") => EventKind::Ping,
        Some(v) => EventKind::Other(v.to_string()),
    }
}
/// Returns the delivery id of a webhook request, if any provider header carries one.
///
/// The headers in `DELIVERY_HEADERS` are consulted in order and the first value that is
/// readable as a string and non-empty after trimming wins.
pub fn delivery_id(headers: &HeaderMap) -> Option<String> {
    DELIVERY_HEADERS
        .iter()
        .filter_map(|name| headers.get(*name)?.to_str().ok())
        .map(str::trim)
        .find(|id| !id.is_empty())
        .map(String::from)
}
/// Bounded memory of delivery ids that have already been seen.
///
/// `seen` answers membership queries; `order` remembers insertion order so the oldest
/// id can be dropped once `DELIVERY_DEDUP_CAP` ids are held.
#[derive(Default)]
pub struct DeliveryDedupCache {
    seen: HashSet<String>,
    order: VecDeque<String>,
}

impl DeliveryDedupCache {
    /// Creates an empty cache.
    pub fn new() -> Self {
        Self { seen: HashSet::new(), order: VecDeque::new() }
    }

    /// Remembers `id` and reports whether it was new.
    ///
    /// Returns `false` (and changes nothing) when `id` is already held. Otherwise the
    /// oldest id is evicted first if the cache is full, `id` is stored, and `true` is
    /// returned.
    pub fn record(&mut self, id: &str) -> bool {
        if self.seen.contains(id) {
            return false;
        }

        let at_capacity = self.order.len() >= DELIVERY_DEDUP_CAP;
        let evicted = if at_capacity { self.order.pop_front() } else { None };
        if let Some(stale) = evicted {
            self.seen.remove(&stale);
        }

        let owned = id.to_owned();
        self.order.push_back(owned.clone());
        self.seen.insert(owned);
        true
    }

    /// Number of ids currently held (test helper).
    #[cfg(test)]
    pub fn len(&self) -> usize {
        self.order.len()
    }

    /// Whether no ids are currently held (test helper).
    #[cfg(test)]
    pub fn is_empty(&self) -> bool {
        self.order.is_empty()
    }
}
/// Source of the shared secret used to verify webhook signatures.
pub trait WebhookSecretResolver: Send + Sync + 'static {
/// Returns the secret bytes, or `None` when no secret is available.
/// `webhook_git` answers `503 Service Unavailable` in the `None` case.
fn resolve(&self) -> Option<Vec<u8>>;
}
/// Resolves the webhook secret from a textual spec.
///
/// A spec of the form `env:NAME` is read from the environment variable `NAME` each time
/// `resolve` is called; any other spec is used literally as the secret.
pub struct EnvSecretResolver {
    pub spec: Option<String>,
}

impl WebhookSecretResolver for EnvSecretResolver {
    /// Returns the secret bytes, or `None` when there is no spec, the environment
    /// variable cannot be read, or the resulting secret is empty.
    fn resolve(&self) -> Option<Vec<u8>> {
        let spec = self.spec.as_deref()?;
        let bytes = match spec.strip_prefix("env:") {
            Some(var) => std::env::var(var).ok()?.into_bytes(),
            None => spec.as_bytes().to_vec(),
        };
        // An empty secret must never be handed to the HMAC check.
        (!bytes.is_empty()).then_some(bytes)
    }
}
/// Resolver that always hands out the same, pre-configured secret.
#[derive(Clone)]
pub struct StaticSecretResolver {
    pub secret: Option<Vec<u8>>,
}

impl WebhookSecretResolver for StaticSecretResolver {
    /// Returns a copy of the stored secret, or `None` when none was stored.
    fn resolve(&self) -> Option<Vec<u8>> {
        self.secret.as_deref().map(<[u8]>::to_vec)
    }
}
/// Everything `webhook_git` needs to authenticate, filter and throttle deliveries.
#[derive(Clone)]
pub struct WebhookConfig {
    /// Source of the HMAC secret.
    pub secret: Arc<dyn WebhookSecretResolver>,
    /// When set, only pushes to this branch trigger a scan.
    pub branch: Option<String>,
    /// Repository value handed to the scan trigger.
    pub repo: Option<String>,
    /// Delivery ids that have already been handled.
    pub dedup: Arc<Mutex<DeliveryDedupCache>>,
    /// Provider-specific payload parser.
    pub extractor: Arc<dyn WebhookPayloadExtractor>,
    /// Optional cap on concurrently handled deliveries.
    pub concurrency: Option<Arc<WebhookConcurrencyLimit>>,
    /// Optional per-IP rate limiter.
    pub rate_limit: Option<Arc<WebhookRateLimiter>>,
}

impl WebhookConfig {
    /// Builds a config that parses payloads with `RefHeadsExtractor` and has no
    /// concurrency or rate limit.
    pub fn new(
        secret: Arc<dyn WebhookSecretResolver>,
        branch: Option<String>,
        repo: Option<String>,
    ) -> Self {
        let default_extractor: Arc<dyn WebhookPayloadExtractor> = Arc::new(RefHeadsExtractor);
        Self::with_extractor(secret, branch, repo, default_extractor)
    }

    /// Builds a config with an explicit payload extractor, a fresh dedup cache, and no
    /// concurrency or rate limit.
    pub fn with_extractor(
        secret: Arc<dyn WebhookSecretResolver>,
        branch: Option<String>,
        repo: Option<String>,
        extractor: Arc<dyn WebhookPayloadExtractor>,
    ) -> Self {
        let dedup = Arc::new(Mutex::new(DeliveryDedupCache::new()));
        Self { secret, branch, repo, dedup, extractor, concurrency: None, rate_limit: None }
    }

    /// Returns the config with the given concurrency limit installed.
    pub fn with_concurrency_limit(self, limit: Arc<WebhookConcurrencyLimit>) -> Self {
        Self { concurrency: Some(limit), ..self }
    }

    /// Returns the config with the given rate limiter installed.
    pub fn with_rate_limit(self, limit: Arc<WebhookRateLimiter>) -> Self {
        Self { rate_limit: Some(limit), ..self }
    }
}
/// What a payload extractor could read out of a push payload.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct ParsedPush {
/// Branch name the push targeted, when the payload named one.
pub branch: Option<String>,
/// Repository name taken from the payload, when present.
pub repo_hint: Option<String>,
}
/// Provider-specific parser that pulls branch and repository out of a webhook payload.
pub trait WebhookPayloadExtractor: Send + Sync + 'static {
/// Parses `body` (with access to the request `headers`) into a `ParsedPush`.
/// The implementations in this file return `None` when the body is not valid JSON.
fn extract(&self, headers: &HeaderMap, body: &[u8]) -> Option<ParsedPush>;
}
/// Extractor for payloads that carry the pushed ref in a top-level `ref` field and the
/// repository under `repository.full_name` / `repository.name`.
pub struct RefHeadsExtractor;

impl WebhookPayloadExtractor for RefHeadsExtractor {
    /// Returns `None` when `body` is not JSON. Otherwise `branch` is the `ref` value with
    /// its `refs/heads/` prefix removed (absent for any other kind of ref), and
    /// `repo_hint` is `repository.full_name`, or `repository.name` when `full_name` is
    /// missing.
    fn extract(&self, _headers: &HeaderMap, body: &[u8]) -> Option<ParsedPush> {
        let payload: serde_json::Value = serde_json::from_slice(body).ok()?;

        let branch = match payload.get("ref").and_then(serde_json::Value::as_str) {
            Some(git_ref) => git_ref.strip_prefix("refs/heads/").map(String::from),
            None => None,
        };

        let repo_hint = match payload.get("repository") {
            Some(repo) => repo
                .get("full_name")
                .or_else(|| repo.get("name"))
                .and_then(serde_json::Value::as_str)
                .map(String::from),
            None => None,
        };

        Some(ParsedPush { branch, repo_hint })
    }
}
/// Extractor for payloads that list ref changes in a top-level `changes` array and name
/// the repository under `repository.slug` / `repository.name`.
pub struct BitbucketServerExtractor;

impl WebhookPayloadExtractor for BitbucketServerExtractor {
    /// Returns `None` when `body` is not JSON. Otherwise `branch` comes from the `refId`
    /// of the first entry in `changes` (only when it starts with `refs/heads/`), and
    /// `repo_hint` is `repository.slug`, or `repository.name` when `slug` is missing.
    fn extract(&self, _headers: &HeaderMap, body: &[u8]) -> Option<ParsedPush> {
        /// `refId` of the first change, if the payload has one.
        fn first_ref_id(payload: &serde_json::Value) -> Option<&str> {
            payload.get("changes")?.as_array()?.first()?.get("refId")?.as_str()
        }

        /// Repository slug, falling back to its name when the slug key is absent.
        fn repo_label(payload: &serde_json::Value) -> Option<&str> {
            let repo = payload.get("repository")?;
            repo.get("slug").or_else(|| repo.get("name"))?.as_str()
        }

        let payload: serde_json::Value = serde_json::from_slice(body).ok()?;
        let branch = first_ref_id(&payload)
            .and_then(|ref_id| ref_id.strip_prefix("refs/heads/"))
            .map(String::from);
        let repo_hint = repo_label(&payload).map(String::from);
        Some(ParsedPush { branch, repo_hint })
    }
}
/// Extractor for payloads that nest the pushed refs and the repository under a
/// top-level `event` object.
pub struct SourcehutExtractor;

impl WebhookPayloadExtractor for SourcehutExtractor {
    /// Returns `None` when `body` is not JSON or has no `event` key. Otherwise `branch`
    /// is the `name` of the first entry in `event.refs`, with a leading `refs/heads/`
    /// removed when present (any other name is kept unchanged), and `repo_hint` is
    /// `event.repo.name`.
    fn extract(&self, _headers: &HeaderMap, body: &[u8]) -> Option<ParsedPush> {
        /// Name of the first ref listed in the event, if any.
        fn first_ref_name(event: &serde_json::Value) -> Option<&str> {
            event.get("refs")?.as_array()?.first()?.get("name")?.as_str()
        }

        let payload: serde_json::Value = serde_json::from_slice(body).ok()?;
        let event = payload.get("event")?;

        let branch = first_ref_name(event)
            .map(|name| name.strip_prefix("refs/heads/").unwrap_or(name))
            .map(String::from);
        let repo_hint = event
            .get("repo")
            .and_then(|repo| repo.get("name"))
            .and_then(serde_json::Value::as_str)
            .map(String::from);

        Some(ParsedPush { branch, repo_hint })
    }
}
/// Picks the payload extractor for a configured provider name.
///
/// The name is trimmed and compared case-insensitively. A missing or empty name and the
/// `refheads`-style providers map to `RefHeadsExtractor`; the Bitbucket and Sourcehut
/// aliases map to their own extractors. An unrecognised name logs a warning and falls
/// back to `RefHeadsExtractor`.
pub fn extractor_for_provider(name: Option<&str>) -> Arc<dyn WebhookPayloadExtractor> {
    let normalized = name.map(|raw| raw.trim().to_ascii_lowercase());
    match normalized.as_deref() {
        None | Some("" | "github" | "gitea" | "forgejo" | "gogs" | "gitlab" | "refheads") => {
            Arc::new(RefHeadsExtractor)
        }
        Some("bitbucket" | "bitbucket-server" | "bitbucket_data_center") => {
            Arc::new(BitbucketServerExtractor)
        }
        Some("sourcehut" | "srht") => Arc::new(SourcehutExtractor),
        Some(other) => {
            tracing::warn!(
                provider = other,
                "unknown `[triggers].webhook_provider`; defaulting to `refheads`"
            );
            Arc::new(RefHeadsExtractor)
        }
    }
}
/// JSON body returned by `webhook_git`.
#[derive(Debug, Serialize)]
pub struct WebhookResponse {
/// Whether this delivery started a scan.
pub triggered: bool,
/// Id returned by the scan trigger; only set when `triggered` is true.
pub run_id: Option<String>,
/// Explanation of why nothing was triggered; empty when a scan was started.
pub message: String,
}
fn peer_ip_from_request(req: &Request<Body>) -> Option<IpAddr> {
req.extensions().get::<ConnectInfo<SocketAddr>>().map(|c| c.0.ip())
}
pub async fn webhook_git(
State(state): State<ServerState>,
req: Request<Body>,
) -> Result<impl IntoResponse, ApiError> {
let Some(cfg) = state.webhook.as_ref() else {
return Err(ApiError::Internal(
"webhook not enabled; set [triggers].webhook_secret_ref in nyx-agent.toml".to_string(),
));
};
if let Some(limiter) = cfg.rate_limit.as_ref() {
if let Some(ip) = peer_ip_from_request(&req) {
if !limiter.admit(ip) {
return Err(ApiError::TooManyRequests(format!(
"webhook rate limit exceeded for `{ip}`"
)));
}
}
}
let _permit = if let Some(limit) = cfg.concurrency.as_ref() {
match limit.try_acquire() {
Some(permit) => Some(permit),
None => {
return Err(ApiError::TooManyRequests(
"webhook concurrency limit reached".to_string(),
));
}
}
} else {
None
};
let Some(secret) = cfg.secret.resolve() else {
return Ok((
StatusCode::SERVICE_UNAVAILABLE,
Json(WebhookResponse {
triggered: false,
run_id: None,
message: "webhook secret is not configured".to_string(),
}),
)
.into_response());
};
let sig_header = req
.headers()
.get(SIGNATURE_HEADER)
.and_then(|v| v.to_str().ok())
.map(|s| s.to_string())
.ok_or(ApiError::Unauthorized)?;
if !signature_header_is_well_formed(&sig_header) {
return Err(ApiError::Unauthorized);
}
let event = classify_event(req.headers());
match &event {
EventKind::Push | EventKind::Unknown => {}
EventKind::Ping => {
return Ok((
StatusCode::OK,
Json(WebhookResponse {
triggered: false,
run_id: None,
message: "ping event acknowledged".to_string(),
}),
)
.into_response());
}
EventKind::Other(name) => {
return Ok((
StatusCode::OK,
Json(WebhookResponse {
triggered: false,
run_id: None,
message: format!("event `{name}` is not a push; ignored"),
}),
)
.into_response());
}
}
if let Some(declared) = req
.headers()
.get(CONTENT_LENGTH)
.and_then(|v| v.to_str().ok())
.and_then(|s| s.parse::<usize>().ok())
{
if declared > MAX_WEBHOOK_BODY_BYTES {
return Err(ApiError::PayloadTooLarge(format!(
"webhook body {declared} bytes exceeds {MAX_WEBHOOK_BODY_BYTES} byte limit"
)));
}
}
let (parts, body) = req.into_parts();
let headers = parts.headers;
let body_bytes = to_bytes(body, MAX_WEBHOOK_BODY_BYTES).await.map_err(|e| {
ApiError::PayloadTooLarge(format!("webhook body exceeded limit or failed to read: {e}"))
})?;
if !verify_signature(&secret, body_bytes.as_ref(), &sig_header) {
return Err(ApiError::Unauthorized);
}
if let Some(delivery) = delivery_id(&headers) {
let fresh = match cfg.dedup.lock() {
Ok(mut guard) => guard.record(&delivery),
Err(poisoned) => {
tracing::warn!("webhook dedup cache poisoned: {poisoned}");
true
}
};
if !fresh {
return Ok((
StatusCode::OK,
Json(WebhookResponse {
triggered: false,
run_id: None,
message: format!("delivery `{delivery}` already processed"),
}),
)
.into_response());
}
}
let parsed = cfg.extractor.extract(&headers, body_bytes.as_ref());
let branch = parsed.as_ref().and_then(|p| p.branch.clone());
if matches!(event, EventKind::Unknown) && branch.is_none() {
return Ok((
StatusCode::OK,
Json(WebhookResponse {
triggered: false,
run_id: None,
message: "payload carried no recognised ref; not a push event".to_string(),
}),
)
.into_response());
}
if let Some(want) = cfg.branch.as_deref() {
match branch.as_deref() {
Some(actual) if actual == want => {}
other => {
return Ok((
StatusCode::OK,
Json(WebhookResponse {
triggered: false,
run_id: None,
message: format!(
"branch filter rejected delivery (want `{want}`, got `{}`)",
other.unwrap_or("<unknown>")
),
}),
)
.into_response());
}
}
}
let trigger: Arc<dyn ScanTrigger> = Arc::clone(&state.scan);
let run_id = trigger.trigger(ScanTriggerSource::Webhook, None, cfg.repo.clone(), None).await?;
Ok((
StatusCode::ACCEPTED,
Json(WebhookResponse { triggered: true, run_id: Some(run_id), message: String::new() }),
)
.into_response())
}
/// Checks that `header` carries the HMAC-SHA256 of `body` under `secret`.
///
/// `header` must be the `sha256=` prefix followed by the hex digest (surrounding
/// whitespace is ignored). Returns `false` when the prefix is missing, the digest is not
/// valid hex, or the digest does not match. The comparison is constant-time.
pub fn verify_signature(secret: &[u8], body: &[u8], header: &str) -> bool {
    let decoded =
        header.trim().strip_prefix(SIGNATURE_PREFIX).map(|hex_sig| hex::decode(hex_sig.trim()));
    let provided = match decoded {
        Some(Ok(bytes)) => bytes,
        _ => return false,
    };

    let mut mac = match HmacSha256::new_from_slice(secret) {
        Ok(mac) => mac,
        Err(_) => return false,
    };
    mac.update(body);
    let expected = mac.finalize().into_bytes();

    bool::from(provided.as_slice().ct_eq(expected.as_slice()))
}
/// Produces the signature header value for `body` under `secret`: the `sha256=` prefix
/// followed by the lowercase hex HMAC-SHA256 digest.
///
/// # Panics
/// Panics if the HMAC cannot be keyed with `secret`.
pub fn sign(secret: &[u8], body: &[u8]) -> String {
    let mut mac = HmacSha256::new_from_slice(secret).expect("HMAC accepts any key length");
    mac.update(body);
    let digest = hex::encode(mac.finalize().into_bytes());

    let mut header = String::from(SIGNATURE_PREFIX);
    header.push_str(&digest);
    header
}
// Unit tests for the pure helpers in this file. The `webhook_git` handler itself is not
// exercised here.
#[cfg(test)]
mod tests {
use super::*;
// --- sign / verify_signature ---
#[test]
fn signature_roundtrip() {
let secret = b"hunter2";
let body = br#"{"ref":"refs/heads/main"}"#;
let header = sign(secret, body);
assert!(header.starts_with(SIGNATURE_PREFIX));
assert!(verify_signature(secret, body, &header));
}
#[test]
fn signature_rejects_modified_body() {
let secret = b"hunter2";
let body = br#"{"ref":"refs/heads/main"}"#;
let header = sign(secret, body);
assert!(!verify_signature(secret, br#"{"ref":"refs/heads/evil"}"#, &header));
}
#[test]
fn signature_rejects_wrong_secret() {
let secret = b"hunter2";
let body = br#"{"ref":"refs/heads/main"}"#;
let header = sign(secret, body);
assert!(!verify_signature(b"wrong-secret", body, &header));
}
#[test]
fn signature_rejects_missing_prefix() {
let secret = b"hunter2";
let body = b"{}";
let mut header = sign(secret, body);
// Strip the `sha256=` prefix so only the bare hex digest remains.
header.replace_range(..SIGNATURE_PREFIX.len(), "");
assert!(!verify_signature(secret, body, &header));
}
// --- EnvSecretResolver ---
#[test]
fn env_resolver_reads_from_environment() {
// The process id keeps the variable name unique to this test process.
let var = format!("NYX_TEST_WEBHOOK_{}", std::process::id());
std::env::set_var(&var, "shh");
let resolver = EnvSecretResolver { spec: Some(format!("env:{var}")) };
assert_eq!(resolver.resolve().as_deref(), Some(b"shh".as_slice()));
std::env::remove_var(&var);
assert!(resolver.resolve().is_none());
}
#[test]
fn env_resolver_passes_literal_through() {
let resolver = EnvSecretResolver { spec: Some("literal-secret".to_string()) };
assert_eq!(resolver.resolve().as_deref(), Some(b"literal-secret".as_slice()));
}
#[test]
fn env_resolver_returns_none_when_unset() {
let resolver = EnvSecretResolver { spec: None };
assert!(resolver.resolve().is_none());
}
#[test]
fn env_resolver_refuses_empty_literal() {
let resolver = EnvSecretResolver { spec: Some(String::new()) };
assert!(resolver.resolve().is_none(), "empty literal secret must not pass HMAC auth");
}
#[test]
fn env_resolver_refuses_empty_env_value() {
let var = format!("NYX_TEST_WEBHOOK_EMPTY_{}", std::process::id());
std::env::set_var(&var, "");
let resolver = EnvSecretResolver { spec: Some(format!("env:{var}")) };
assert!(resolver.resolve().is_none(), "empty env-backed secret must not pass HMAC auth");
std::env::remove_var(&var);
}
// --- signature_header_is_well_formed ---
#[test]
fn signature_header_shape_accepts_canonical_form() {
let header = format!("sha256={}", "a".repeat(SIGNATURE_HEX_LEN));
assert!(signature_header_is_well_formed(&header));
}
#[test]
fn signature_header_shape_accepts_mixed_case_hex() {
let header = format!("sha256={}", "AbCdEf0123456789".repeat(4));
assert!(signature_header_is_well_formed(&header));
}
#[test]
fn signature_header_shape_rejects_missing_prefix() {
let header = "a".repeat(SIGNATURE_HEX_LEN);
assert!(!signature_header_is_well_formed(&header));
}
#[test]
fn signature_header_shape_rejects_short_digest() {
let header = format!("sha256={}", "a".repeat(SIGNATURE_HEX_LEN - 1));
assert!(!signature_header_is_well_formed(&header));
}
#[test]
fn signature_header_shape_rejects_long_digest() {
let header = format!("sha256={}", "a".repeat(SIGNATURE_HEX_LEN + 1));
assert!(!signature_header_is_well_formed(&header));
}
#[test]
fn signature_header_shape_rejects_non_hex_chars() {
let header = format!("sha256={}", "z".repeat(SIGNATURE_HEX_LEN));
assert!(!signature_header_is_well_formed(&header));
}
// Builds a `HeaderMap` from `(name, value)` pairs; panics on an invalid name or value.
fn map(pairs: &[(&str, &str)]) -> HeaderMap {
let mut m = HeaderMap::new();
for (k, v) in pairs {
m.insert(
axum::http::HeaderName::from_bytes(k.as_bytes()).expect("header name"),
axum::http::HeaderValue::from_str(v).expect("header value"),
);
}
m
}
// --- classify_event ---
#[test]
fn classify_event_recognises_github_push() {
assert_eq!(classify_event(&map(&[("X-GitHub-Event", "push")])), EventKind::Push);
}
#[test]
fn classify_event_is_case_insensitive() {
assert_eq!(classify_event(&map(&[("X-GitHub-Event", "PuSh")])), EventKind::Push);
}
#[test]
fn classify_event_recognises_gitlab_push_hook() {
assert_eq!(classify_event(&map(&[("X-Gitlab-Event", "Push Hook")])), EventKind::Push);
}
#[test]
fn classify_event_recognises_ping() {
assert_eq!(classify_event(&map(&[("X-GitHub-Event", "ping")])), EventKind::Ping);
}
#[test]
fn classify_event_returns_other_for_unknown_event_name() {
match classify_event(&map(&[("X-GitHub-Event", "issues")])) {
EventKind::Other(name) => assert_eq!(name, "issues"),
other => panic!("expected Other(\"issues\"), got {other:?}"),
}
}
#[test]
fn classify_event_returns_unknown_when_no_provider_header() {
assert_eq!(classify_event(&HeaderMap::new()), EventKind::Unknown);
}
#[test]
fn classify_event_ignores_empty_header_value() {
assert_eq!(classify_event(&map(&[("X-GitHub-Event", "")])), EventKind::Unknown);
}
// --- delivery_id ---
#[test]
fn delivery_id_reads_github_header() {
let id = delivery_id(&map(&[("X-GitHub-Delivery", "abc-123")]));
assert_eq!(id.as_deref(), Some("abc-123"));
}
#[test]
fn delivery_id_reads_gitea_header_when_github_absent() {
let id = delivery_id(&map(&[("X-Gitea-Delivery", "xyz-789")]));
assert_eq!(id.as_deref(), Some("xyz-789"));
}
#[test]
fn delivery_id_is_none_when_no_header() {
assert!(delivery_id(&HeaderMap::new()).is_none());
}
// --- DeliveryDedupCache (the eviction test is at the end of this module) ---
#[test]
fn dedup_cache_records_new_id() {
let mut cache = DeliveryDedupCache::new();
assert!(cache.record("a"));
assert_eq!(cache.len(), 1);
}
#[test]
fn dedup_cache_drops_repeat() {
let mut cache = DeliveryDedupCache::new();
assert!(cache.record("a"));
assert!(!cache.record("a"), "second insert must report duplicate");
assert_eq!(cache.len(), 1);
}
// --- payload extractors ---
#[test]
fn refheads_extractor_reads_github_push() {
let body = br#"{"ref":"refs/heads/main","repository":{"full_name":"acme/api"}}"#;
let parsed = RefHeadsExtractor.extract(&HeaderMap::new(), body).expect("parsed");
assert_eq!(parsed.branch.as_deref(), Some("main"));
assert_eq!(parsed.repo_hint.as_deref(), Some("acme/api"));
}
#[test]
fn refheads_extractor_returns_none_branch_for_tag_push() {
let body = br#"{"ref":"refs/tags/v1.2.3"}"#;
let parsed = RefHeadsExtractor.extract(&HeaderMap::new(), body).expect("parsed");
assert!(parsed.branch.is_none(), "tag pushes are not branch pushes");
}
#[test]
fn refheads_extractor_falls_back_to_repo_name() {
let body = br#"{"ref":"refs/heads/dev","repository":{"name":"api"}}"#;
let parsed = RefHeadsExtractor.extract(&HeaderMap::new(), body).expect("parsed");
assert_eq!(parsed.repo_hint.as_deref(), Some("api"));
}
#[test]
fn refheads_extractor_returns_none_on_garbage() {
assert!(RefHeadsExtractor.extract(&HeaderMap::new(), b"not-json").is_none());
}
#[test]
fn bitbucket_server_extractor_reads_changes_array() {
let body = br#"{
"changes":[{"refId":"refs/heads/develop","type":"UPDATE"}],
"repository":{"slug":"api","name":"Api Service"}
}"#;
let parsed = BitbucketServerExtractor.extract(&HeaderMap::new(), body).expect("parsed");
assert_eq!(parsed.branch.as_deref(), Some("develop"));
// `slug` takes precedence over `name`.
assert_eq!(parsed.repo_hint.as_deref(), Some("api"));
}
#[test]
fn bitbucket_server_extractor_returns_none_branch_when_changes_empty() {
let body = br#"{"changes":[],"repository":{"slug":"api"}}"#;
let parsed = BitbucketServerExtractor.extract(&HeaderMap::new(), body).expect("parsed");
assert!(parsed.branch.is_none());
assert_eq!(parsed.repo_hint.as_deref(), Some("api"));
}
#[test]
fn sourcehut_extractor_reads_nested_event_refs() {
let body =
br#"{"event":{"refs":[{"name":"refs/heads/main"}],"repo":{"name":"~user/proj"}}}"#;
let parsed = SourcehutExtractor.extract(&HeaderMap::new(), body).expect("parsed");
assert_eq!(parsed.branch.as_deref(), Some("main"));
assert_eq!(parsed.repo_hint.as_deref(), Some("~user/proj"));
}
#[test]
fn sourcehut_extractor_keeps_bare_branch_names() {
let body = br#"{"event":{"refs":[{"name":"main"}]}}"#;
let parsed = SourcehutExtractor.extract(&HeaderMap::new(), body).expect("parsed");
assert_eq!(parsed.branch.as_deref(), Some("main"));
}
// --- extractor_for_provider ---
// The returned trait object is identified indirectly: each test feeds it a payload
// that only the expected extractor can read a branch from.
#[test]
fn extractor_for_provider_defaults_when_missing() {
let body = br#"{"ref":"refs/heads/main"}"#;
let ex = extractor_for_provider(None);
assert_eq!(
ex.extract(&HeaderMap::new(), body).and_then(|p| p.branch).as_deref(),
Some("main"),
);
}
#[test]
fn extractor_for_provider_matches_known_aliases() {
for name in ["github", "GITHUB", " gitea ", "forgejo", "gogs", "gitlab", "refheads"] {
let ex = extractor_for_provider(Some(name));
let body = br#"{"ref":"refs/heads/main"}"#;
assert_eq!(
ex.extract(&HeaderMap::new(), body).and_then(|p| p.branch).as_deref(),
Some("main"),
"alias `{name}` should map to RefHeadsExtractor",
);
}
}
#[test]
fn extractor_for_provider_picks_bitbucket() {
let ex = extractor_for_provider(Some("bitbucket"));
let body = br#"{"changes":[{"refId":"refs/heads/main"}]}"#;
assert_eq!(
ex.extract(&HeaderMap::new(), body).and_then(|p| p.branch).as_deref(),
Some("main"),
);
}
#[test]
fn extractor_for_provider_picks_sourcehut() {
let ex = extractor_for_provider(Some("sourcehut"));
let body = br#"{"event":{"refs":[{"name":"refs/heads/main"}]}}"#;
assert_eq!(
ex.extract(&HeaderMap::new(), body).and_then(|p| p.branch).as_deref(),
Some("main"),
);
}
#[test]
fn extractor_for_provider_falls_back_on_unknown() {
let ex = extractor_for_provider(Some("notarealthing"));
let body = br#"{"ref":"refs/heads/main"}"#;
assert_eq!(
ex.extract(&HeaderMap::new(), body).and_then(|p| p.branch).as_deref(),
Some("main"),
);
}
// --- WebhookRateLimiter ---
#[test]
fn rate_limiter_admits_until_bucket_empty() {
// A refill rate of zero means the bucket never regains tokens.
let limiter = WebhookRateLimiter::new(3, 0.0, 16);
let ip: IpAddr = "127.0.0.1".parse().unwrap();
assert!(limiter.admit(ip));
assert!(limiter.admit(ip));
assert!(limiter.admit(ip));
assert!(!limiter.admit(ip), "fourth request must be refused");
}
#[test]
fn rate_limiter_refills_over_time() {
let limiter = WebhookRateLimiter::new(2, 1.0, 16);
let ip: IpAddr = "10.0.0.5".parse().unwrap();
let t0 = Instant::now();
assert!(limiter.admit_at(ip, t0));
assert!(limiter.admit_at(ip, t0));
assert!(!limiter.admit_at(ip, t0));
// One second at one token per second buys exactly one more request.
let t1 = t0 + std::time::Duration::from_secs(1);
assert!(limiter.admit_at(ip, t1));
assert!(!limiter.admit_at(ip, t1));
// Five more seconds would yield five tokens, but the bucket is capped at two.
let t6 = t1 + std::time::Duration::from_secs(5);
assert!(limiter.admit_at(ip, t6));
assert!(limiter.admit_at(ip, t6));
assert!(!limiter.admit_at(ip, t6));
}
#[test]
fn rate_limiter_per_ip_buckets_are_independent() {
let limiter = WebhookRateLimiter::new(1, 0.0, 16);
let a: IpAddr = "127.0.0.1".parse().unwrap();
let b: IpAddr = "127.0.0.2".parse().unwrap();
assert!(limiter.admit(a));
assert!(!limiter.admit(a));
assert!(limiter.admit(b));
}
#[test]
fn rate_limiter_per_minute_helper_matches_rate() {
let limiter = WebhookRateLimiter::per_minute(60, 64);
let ip: IpAddr = "127.0.0.1".parse().unwrap();
let t0 = Instant::now();
for _ in 0..60 {
assert!(limiter.admit_at(ip, t0));
}
assert!(!limiter.admit_at(ip, t0));
}
#[test]
fn rate_limiter_evicts_oldest_ip_at_cap() {
let limiter = WebhookRateLimiter::new(1, 0.0, 2);
let t0 = Instant::now();
let a: IpAddr = "127.0.0.1".parse().unwrap();
let b: IpAddr = "127.0.0.2".parse().unwrap();
let c: IpAddr = "127.0.0.3".parse().unwrap();
assert!(limiter.admit_at(a, t0));
assert!(limiter.admit_at(b, t0 + std::time::Duration::from_secs(1)));
assert!(limiter.admit_at(c, t0 + std::time::Duration::from_secs(2)));
// Only the tracked count is checked here, not which IP was evicted.
assert_eq!(limiter.tracked_ips(), 2);
}
// --- WebhookConcurrencyLimit ---
#[test]
fn concurrency_limit_refuses_past_cap() {
let limit = WebhookConcurrencyLimit::new(2);
let p1 = limit.try_acquire().expect("first permit");
let p2 = limit.try_acquire().expect("second permit");
assert!(limit.try_acquire().is_none(), "third acquire must fail when cap is reached");
drop(p1);
assert!(limit.try_acquire().is_some(), "releasing a permit must make one available again");
drop(p2);
}
#[test]
fn concurrency_limit_floor_is_one() {
let limit = WebhookConcurrencyLimit::new(0);
assert_eq!(limit.permits(), 1);
assert!(limit.try_acquire().is_some());
}
// --- DeliveryDedupCache eviction ---
#[test]
fn dedup_cache_evicts_oldest_at_cap() {
let mut cache = DeliveryDedupCache::new();
for i in 0..DELIVERY_DEDUP_CAP {
assert!(cache.record(&format!("d-{i}")));
}
assert_eq!(cache.len(), DELIVERY_DEDUP_CAP);
assert!(cache.record("d-new"));
assert_eq!(cache.len(), DELIVERY_DEDUP_CAP);
// `d-0` was the oldest id, so it was evicted and is accepted as new again.
assert!(cache.record("d-0"));
}
}