Skip to main content

nyx_agent_api/
webhook.rs

1//! `POST /webhook/git` route.
2//!
3//! Accepts a GitHub-shaped push payload, verifies the HMAC-SHA256
4//! signature against the operator's configured shared secret, applies
5//! the optional branch filter, and triggers a scan via the same
6//! [`ScanTrigger`] handle the manual `/api/v1/scan` endpoint uses.
7//!
8//! Layout chosen for maximum compatibility with self-hosted git
9//! servers:
10//!
11//! - Header: `X-Hub-Signature-256: sha256=<hex>` (the GitHub /
12//!   Gitea / Forgejo / Sourcehut convention).
13//! - Body: any JSON object carrying a `"ref": "refs/heads/<branch>"`
14//!   field. Other fields are ignored so a thin Gitea / Bitbucket
15//!   payload also works.
16//! - HMAC: the signature is computed over the raw body bytes; we use
17//!   `subtle::ConstantTimeEq` to avoid timing leaks.
18//! - The webhook bypasses bearer auth because HMAC IS the auth.
19//!
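//! Errors:
//! - Missing / invalid signature → HTTP 401.
//! - Missing / unset secret → HTTP 503 (operator must configure
//!   `triggers.webhook_secret_ref`).
//! - Body larger than [`MAX_WEBHOOK_BODY_BYTES`] → HTTP 413.
//! - Per-IP rate limit or global concurrency cap exceeded → HTTP 429.
//! - Wrong branch, non-push event, or an already-seen delivery id →
//!   HTTP 200 with `triggered=false` so the upstream git server
//!   records a successful delivery and stops retrying.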
26
27use std::collections::{HashMap, HashSet, VecDeque};
28use std::net::{IpAddr, SocketAddr};
29use std::sync::{Arc, Mutex};
30use std::time::Instant;
31
32use axum::body::{to_bytes, Body};
33use axum::extract::{ConnectInfo, State};
34use axum::http::header::CONTENT_LENGTH;
35use axum::http::{HeaderMap, Request, StatusCode};
36use axum::response::IntoResponse;
37use axum::Json;
38use hmac::{Hmac, KeyInit, Mac};
39use serde::Serialize;
40use sha2::Sha256;
41use subtle::ConstantTimeEq;
42
43use crate::state::{ApiError, ScanTrigger, ScanTriggerSource, ServerState};
44
45type HmacSha256 = Hmac<Sha256>;
46
/// Upper bound, in bytes, on the webhook body the handler buffers;
/// anything larger is refused with 413. 1 MiB sits comfortably above
/// every git-server payload observed so far while still capping the
/// memory a hostile caller can make one request consume.
pub const MAX_WEBHOOK_BODY_BYTES: usize = 1 << 20;

/// Request header that carries the body signature.
const SIGNATURE_HEADER: &str = "X-Hub-Signature-256";
/// Scheme tag that precedes the hex digest inside the header value.
const SIGNATURE_PREFIX: &str = "sha256=";

/// Length of the hex digest: a SHA-256 digest is 32 bytes and every
/// byte encodes to two hex characters.
const SIGNATURE_HEX_LEN: usize = 32 * 2;

/// Provider headers that name the upstream event type. The slice order
/// is the precedence applied if several are present at once, which
/// keeps classification deterministic even for odd senders.
const EVENT_HEADERS: &[&str] = &[
    "X-GitHub-Event",
    "X-Gitea-Event",
    "X-Forgejo-Event",
    "X-Gogs-Event",
    "X-Gitlab-Event",
];

/// Provider headers that carry a delivery / replay id, listed in the
/// same provider order as [`EVENT_HEADERS`].
const DELIVERY_HEADERS: &[&str] = &[
    "X-GitHub-Delivery",
    "X-Gitea-Delivery",
    "X-Forgejo-Delivery",
    "X-Gogs-Delivery",
    "X-Gitlab-Event-UUID",
];
73
/// Maximum number of delivery ids the in-memory replay-dedup cache
/// retains. Ids are the provider's raw delivery strings (typically a
/// UUID of about 36 bytes). `DeliveryDedupCache` keeps every id twice —
/// once in its membership set and once in its arrival queue — so for
/// UUID-sized ids budget on the order of 100–200 KiB at this cap. That
/// window covers the largest plausible burst before older deliveries
/// roll off.
pub const DELIVERY_DEDUP_CAP: usize = 1024;

/// Default number of webhook handlers allowed in flight at once. Chosen
/// above the dispatcher's scan-request queue depth so legitimate bursts
/// are absorbed, yet bounded so a flood of deliveries cannot occupy
/// every tokio worker with body reads and HMAC verification at the same
/// time. Operators tune via `[triggers].webhook_max_concurrent`.
pub const DEFAULT_WEBHOOK_MAX_CONCURRENT: usize = 8;

/// Default sustained per-source-IP rate, in deliveries per minute
/// (30 per minute is one push every two seconds). Bursts up to
/// [`DEFAULT_WEBHOOK_RATE_LIMIT_BURST`] are allowed on top of that.
/// Operators tune via `[triggers].webhook_rate_limit_per_minute`.
pub const DEFAULT_WEBHOOK_RATE_LIMIT_PER_MINUTE: u32 = 30;

/// Default burst depth of the per-IP token bucket: how many deliveries
/// a fresh sender may fire back-to-back before the bucket drains. Equal
/// to the default per-minute rate.
/// NOTE(review): `WebhookRateLimiter::per_minute` derives its burst
/// depth from the rate argument rather than reading this constant, so
/// keep the two values in sync.
pub const DEFAULT_WEBHOOK_RATE_LIMIT_BURST: u32 = 30;

/// Default number of distinct source IPs the per-IP rate limiter tracks
/// before it starts evicting the least-recently-seen entry. Bounds
/// memory under a flood of unique source addresses.
pub const DEFAULT_WEBHOOK_RATE_LIMIT_MAX_IPS: usize = 1024;
102
103/// Hand-rolled semaphore-backed concurrency cap on `webhook_git`.
104/// Lives on [`WebhookConfig`] so a router rebuild keeps the live
105/// permit count intact. Wraps an `Arc<Semaphore>` directly rather
106/// than depending on `tower::limit::ConcurrencyLimitLayer` so the
107/// workspace stays off the `tower` base crate.
108pub struct WebhookConcurrencyLimit {
109    inner: Arc<tokio::sync::Semaphore>,
110    permits: usize,
111}
112
113impl WebhookConcurrencyLimit {
114    pub fn new(permits: usize) -> Self {
115        let permits = permits.max(1);
116        Self { inner: Arc::new(tokio::sync::Semaphore::new(permits)), permits }
117    }
118
119    /// Total permits configured. Used by tests and for operator
120    /// reporting.
121    pub fn permits(&self) -> usize {
122        self.permits
123    }
124
125    /// Try to acquire one permit without waiting. Returns
126    /// `Some(permit)` on success; the caller must hold the permit
127    /// until the response is sent. Returns `None` when every permit
128    /// is in flight so the handler can refuse with 429.
129    pub fn try_acquire(&self) -> Option<tokio::sync::OwnedSemaphorePermit> {
130        Arc::clone(&self.inner).try_acquire_owned().ok()
131    }
132}
133
/// Token-bucket rate limiter keyed by source IP.
///
/// Every tracked address owns a bucket that holds at most `capacity`
/// tokens and refills continuously at `refill_per_sec` tokens per
/// second. Admitting a request costs one token; once a bucket holds
/// less than one token, [`WebhookRateLimiter::admit`] returns `false`
/// so the handler can refuse with 429.
///
/// At most `max_ips` buckets are kept. When a not-yet-tracked address
/// arrives while the table is full, the bucket with the oldest
/// `last_refill` instant is dropped to make room. Memory therefore
/// stays bounded under a flood of unique addresses, while senders that
/// keep calling keep their state.
pub struct WebhookRateLimiter {
    capacity: f64,
    refill_per_sec: f64,
    max_ips: usize,
    inner: Mutex<HashMap<IpAddr, TokenBucket>>,
}

/// One address's bucket: the current token balance and the instant the
/// balance was last brought up to date.
#[derive(Debug)]
struct TokenBucket {
    tokens: f64,
    last_refill: Instant,
}

impl WebhookRateLimiter {
    /// Creates a limiter whose buckets start full with `capacity`
    /// tokens and refill at `refill_per_sec` tokens per second, tracking
    /// at most `max_ips` addresses.
    ///
    /// Inputs are sanitised: `capacity` and `max_ips` are raised to at
    /// least 1 and a negative refill rate is treated as 0.
    pub fn new(capacity: u32, refill_per_sec: f64, max_ips: usize) -> Self {
        let capacity = f64::from(capacity.max(1));
        let refill_per_sec = refill_per_sec.max(0.0);
        let max_ips = max_ips.max(1);
        Self { capacity, refill_per_sec, max_ips, inner: Mutex::new(HashMap::new()) }
    }

    /// Creates a limiter from the operator-facing
    /// `webhook_rate_limit_per_minute` knob. The same number is used as
    /// the burst depth, so a fresh sender can fire that many requests
    /// back-to-back before throttling starts. A rate of 0 is treated
    /// as 1.
    pub fn per_minute(rate_per_minute: u32, max_ips: usize) -> Self {
        let burst = rate_per_minute.max(1);
        let per_second = f64::from(burst) / 60.0;
        Self::new(burst, per_second, max_ips)
    }

    /// Spends one token for `ip` using the current time. Returns `true`
    /// when the request is admitted and `false` when the bucket is
    /// empty.
    pub fn admit(&self, ip: IpAddr) -> bool {
        self.admit_at(ip, Instant::now())
    }

    /// Same as [`Self::admit`], but the caller supplies `now`, which
    /// lets tests drive the refill clock deterministically without
    /// sleeping.
    pub fn admit_at(&self, ip: IpAddr, now: Instant) -> bool {
        // A poisoned lock means some earlier holder panicked. Keep
        // using the table as it was left: discarding it (or admitting
        // unconditionally) would switch the throttle off.
        let mut buckets = self.inner.lock().unwrap_or_else(std::sync::PoisonError::into_inner);

        // Make room only when this address is new and the table is full.
        let is_new_ip = !buckets.contains_key(&ip);
        if is_new_ip && buckets.len() >= self.max_ips {
            let stalest =
                buckets.iter().min_by_key(|(_, b)| b.last_refill).map(|(addr, _)| *addr);
            if let Some(addr) = stalest {
                buckets.remove(&addr);
            }
        }

        let full = self.capacity;
        let bucket =
            buckets.entry(ip).or_insert_with(|| TokenBucket { tokens: full, last_refill: now });

        // `now` may precede the stored instant (callers of `admit_at`
        // choose it freely); the saturating difference clamps that to
        // zero so the balance never shrinks during a refill.
        let idle_secs = now.saturating_duration_since(bucket.last_refill).as_secs_f64();
        let topped_up = bucket.tokens + idle_secs * self.refill_per_sec;
        bucket.tokens = topped_up.min(self.capacity);
        bucket.last_refill = now;

        let admitted = bucket.tokens >= 1.0;
        if admitted {
            bucket.tokens -= 1.0;
        }
        admitted
    }

    /// Number of currently tracked IPs (0 if the lock is poisoned).
    /// Used by tests.
    #[cfg(test)]
    pub fn tracked_ips(&self) -> usize {
        match self.inner.lock() {
            Ok(buckets) => buckets.len(),
            Err(_) => 0,
        }
    }
}
232
233/// Quick syntactic check on the signature header. Refuses anything that
234/// is not `sha256=` + exactly 64 lowercase-or-uppercase hex chars. Lets
235/// the handler 401 a forged delivery without buffering the body or
236/// running a full HMAC pass.
237fn signature_header_is_well_formed(header: &str) -> bool {
238    let Some(rest) = header.trim().strip_prefix(SIGNATURE_PREFIX) else { return false };
239    let rest = rest.trim();
240    rest.len() == SIGNATURE_HEX_LEN && rest.bytes().all(|b| b.is_ascii_hexdigit())
241}
242
/// Event type advertised by the upstream, as read from the
/// provider-specific event header by [`classify_event`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum EventKind {
    /// A push event: the only kind that may lead to a scan.
    Push,
    /// GitHub's webhook-creation ping. It is accepted at the transport
    /// layer so the upstream marks the webhook healthy, but it never
    /// triggers a scan.
    Ping,
    /// Any other named event (issues / pull_request / workflow_run /
    /// ...), carrying the header value as sent. Acknowledged with 200 so
    /// the upstream stops retrying; never triggers a scan.
    Other(String),
    /// No recognised event header was present. This is the conservative
    /// path for unknown providers: the handler then insists on a
    /// `ref`-shaped body before it triggers a scan.
    Unknown,
}
264
265/// Read the provider-specific event header into an [`EventKind`].
266pub fn classify_event(headers: &HeaderMap) -> EventKind {
267    for name in EVENT_HEADERS {
268        let Some(raw) = headers.get(*name).and_then(|v| v.to_str().ok()) else { continue };
269        let value = raw.trim();
270        if value.is_empty() {
271            continue;
272        }
273        if value.eq_ignore_ascii_case("push") || value.eq_ignore_ascii_case("push hook") {
274            return EventKind::Push;
275        }
276        if value.eq_ignore_ascii_case("ping") {
277            return EventKind::Ping;
278        }
279        return EventKind::Other(value.to_string());
280    }
281    EventKind::Unknown
282}
283
284/// Read the provider-specific delivery id (if any). Returned trimmed
285/// so trailing whitespace from misbehaving clients does not split the
286/// dedup cache.
287pub fn delivery_id(headers: &HeaderMap) -> Option<String> {
288    for name in DELIVERY_HEADERS {
289        let Some(raw) = headers.get(*name).and_then(|v| v.to_str().ok()) else { continue };
290        let trimmed = raw.trim();
291        if !trimmed.is_empty() {
292            return Some(trimmed.to_string());
293        }
294    }
295    None
296}
297
298/// Bounded LRU-ish set of delivery ids we have already processed.
299/// Insertion is O(1) amortised: a `HashSet` answers membership, a
300/// `VecDeque` records arrival order so the oldest entry rolls off
301/// once the cap is reached. The cap is [`DELIVERY_DEDUP_CAP`].
302#[derive(Default)]
303pub struct DeliveryDedupCache {
304    seen: HashSet<String>,
305    order: VecDeque<String>,
306}
307
308impl DeliveryDedupCache {
309    pub fn new() -> Self {
310        Self::default()
311    }
312
313    /// Record a delivery id. Returns `true` if the id is new, `false`
314    /// if it has already been seen within the cap window.
315    pub fn record(&mut self, id: &str) -> bool {
316        if self.seen.contains(id) {
317            return false;
318        }
319        if self.order.len() >= DELIVERY_DEDUP_CAP {
320            if let Some(old) = self.order.pop_front() {
321                self.seen.remove(&old);
322            }
323        }
324        self.seen.insert(id.to_string());
325        self.order.push_back(id.to_string());
326        true
327    }
328
329    #[cfg(test)]
330    pub fn len(&self) -> usize {
331        self.order.len()
332    }
333
334    #[cfg(test)]
335    pub fn is_empty(&self) -> bool {
336        self.order.is_empty()
337    }
338}
339
/// Strategy for turning the operator's `triggers.webhook_secret_ref`
/// value into the raw bytes used as the HMAC key. Production resolves
/// `env:<NAME>` from the process environment; tests plug in an
/// in-process stub so they never have to mutate the environment.
pub trait WebhookSecretResolver: Send + Sync + 'static {
    /// Returns the key bytes, or `None` when no usable secret is
    /// available.
    fn resolve(&self) -> Option<Vec<u8>>;
}

/// Resolver backed by the process environment.
///
/// A spec of the form `env:<NAME>` is looked up as environment variable
/// `NAME`; any other spec is used verbatim as the secret. The result is
/// `None` — so the handler answers 503 instead of accepting
/// unauthenticated triggers — when no spec is configured, when the
/// referenced variable is unset or not valid Unicode, or when the
/// resulting secret is empty.
pub struct EnvSecretResolver {
    /// Raw value of `triggers.webhook_secret_ref` (for example
    /// `env:NYX_WEBHOOK_SECRET`, or a literal secret). `None` when the
    /// operator has not configured the webhook.
    pub spec: Option<String>,
}

impl WebhookSecretResolver for EnvSecretResolver {
    fn resolve(&self) -> Option<Vec<u8>> {
        let spec = self.spec.as_deref()?;
        let key = match spec.strip_prefix("env:") {
            Some(var_name) => std::env::var(var_name).ok()?.into_bytes(),
            None => spec.as_bytes().to_vec(),
        };
        // An empty key is accepted by the HMAC API, but anyone can
        // compute a MAC under it, so it provides no authentication.
        if key.is_empty() {
            return None;
        }
        Some(key)
    }
}
378
379/// In-memory secret resolver for tests.
380#[derive(Clone)]
381pub struct StaticSecretResolver {
382    pub secret: Option<Vec<u8>>,
383}
384
385impl WebhookSecretResolver for StaticSecretResolver {
386    fn resolve(&self) -> Option<Vec<u8>> {
387        self.secret.clone()
388    }
389}
390
391/// Per-route config attached to the webhook handler.
392#[derive(Clone)]
393pub struct WebhookConfig {
394    /// Resolves the shared secret on every request so a wizard rotate
395    /// flow doesn't require a router rebuild.
396    pub secret: Arc<dyn WebhookSecretResolver>,
397    /// When `Some(branch)`, only payloads whose `ref` equals
398    /// `refs/heads/<branch>` trigger a scan. `None` accepts any branch.
399    pub branch: Option<String>,
400    /// Optional repo filter forwarded to the [`ScanTrigger`]. `None`
401    /// scans every enabled repo, matching the API's manual-trigger
402    /// behaviour.
403    pub repo: Option<String>,
404    /// Bounded set of delivery ids already processed, so a webhook-UI
405    /// redelivery (or a hostile replay of a captured-and-still-valid
406    /// HMAC body) does not retrigger the dispatcher. Shared across
407    /// clones of the config so a router rebuild keeps the cache hot.
408    pub dedup: Arc<Mutex<DeliveryDedupCache>>,
409    /// Provider-specific decoder for the verified push body. Operators
410    /// pick this via `[triggers].webhook_provider`; the default
411    /// (`refheads`) covers GitHub / Gitea / Forgejo / Gogs / GitLab,
412    /// which all ship the branch under top-level `ref`.
413    pub extractor: Arc<dyn WebhookPayloadExtractor>,
414    /// Optional cap on simultaneous in-flight handler invocations.
415    /// `None` disables the gate (every request is admitted). When
416    /// set, requests past the cap return 429 before HMAC verification
417    /// so a flood of valid-signed pushes cannot peg the executor.
418    pub concurrency: Option<Arc<WebhookConcurrencyLimit>>,
419    /// Optional per-source-IP token bucket. `None` disables the gate
420    /// (every IP is admitted). When set, requests past the per-IP
421    /// budget return 429 before HMAC verification. The handler
422    /// reads the source IP from the request's [`ConnectInfo`]
423    /// extension; deployments that route through a reverse proxy
424    /// without preserving the peer address will not see per-IP
425    /// throttling and fall back to the global concurrency cap.
426    pub rate_limit: Option<Arc<WebhookRateLimiter>>,
427}
428
429impl WebhookConfig {
430    /// Build a webhook config with a fresh dedup cache and the default
431    /// `ref: refs/heads/<branch>` extractor.
432    pub fn new(
433        secret: Arc<dyn WebhookSecretResolver>,
434        branch: Option<String>,
435        repo: Option<String>,
436    ) -> Self {
437        Self::with_extractor(secret, branch, repo, Arc::new(RefHeadsExtractor))
438    }
439
440    /// Build a webhook config with a fresh dedup cache and an explicit
441    /// extractor. Used when the operator's `[triggers].webhook_provider`
442    /// names a non-default shape (Bitbucket Server, Sourcehut, ...).
443    pub fn with_extractor(
444        secret: Arc<dyn WebhookSecretResolver>,
445        branch: Option<String>,
446        repo: Option<String>,
447        extractor: Arc<dyn WebhookPayloadExtractor>,
448    ) -> Self {
449        Self {
450            secret,
451            branch,
452            repo,
453            dedup: Arc::new(Mutex::new(DeliveryDedupCache::new())),
454            extractor,
455            concurrency: None,
456            rate_limit: None,
457        }
458    }
459
460    /// Attach a concurrency cap. Subsequent requests past the cap
461    /// return 429 before HMAC verification.
462    pub fn with_concurrency_limit(mut self, limit: Arc<WebhookConcurrencyLimit>) -> Self {
463        self.concurrency = Some(limit);
464        self
465    }
466
467    /// Attach a per-source-IP rate limiter. Requests past the per-IP
468    /// budget return 429 before HMAC verification.
469    pub fn with_rate_limit(mut self, limit: Arc<WebhookRateLimiter>) -> Self {
470        self.rate_limit = Some(limit);
471        self
472    }
473}
474
/// Push fields an extractor pulled out of a signed webhook body.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct ParsedPush {
    /// Bare branch name (`main`, not the full `refs/heads/main` ref), or
    /// `None` when the body carried no branch the extractor recognised.
    pub branch: Option<String>,
    /// Repository identifier as reported by the upstream, when present.
    /// Reserved for a future per-repo trigger path; the handler does
    /// not use it today.
    pub repo_hint: Option<String>,
}
485
486/// Provider-specific decoder for the verified webhook body. Implementors
487/// receive the raw headers (so they can pick a payload shape per
488/// `Content-Type` or X-Event-Key) and the byte slice the HMAC already
489/// covered. Returning `None` signals "this body is not a push we can
490/// route" and the handler responds 200 + `triggered=false`.
491pub trait WebhookPayloadExtractor: Send + Sync + 'static {
492    fn extract(&self, headers: &HeaderMap, body: &[u8]) -> Option<ParsedPush>;
493}
494
495/// Decodes the top-level `"ref": "refs/heads/<branch>"` shape shipped by
496/// GitHub, Gitea, Forgejo, Gogs, and GitLab. Tolerates extra fields and
497/// reads `repository.full_name` when present so future per-repo routing
498/// has a hint to work with.
499pub struct RefHeadsExtractor;
500
501impl WebhookPayloadExtractor for RefHeadsExtractor {
502    fn extract(&self, _headers: &HeaderMap, body: &[u8]) -> Option<ParsedPush> {
503        let value: serde_json::Value = serde_json::from_slice(body).ok()?;
504        let branch = value
505            .get("ref")
506            .and_then(|v| v.as_str())
507            .and_then(|r| r.strip_prefix("refs/heads/"))
508            .map(|s| s.to_string());
509        let repo_hint = value
510            .get("repository")
511            .and_then(|r| r.get("full_name").or_else(|| r.get("name")))
512            .and_then(|v| v.as_str())
513            .map(|s| s.to_string());
514        Some(ParsedPush { branch, repo_hint })
515    }
516}
517
518/// Decodes the Bitbucket Server / Data Center push shape:
519/// `{ "changes": [ { "refId": "refs/heads/<branch>", ... } ], "repository": { "slug": "..." } }`.
520pub struct BitbucketServerExtractor;
521
522impl WebhookPayloadExtractor for BitbucketServerExtractor {
523    fn extract(&self, _headers: &HeaderMap, body: &[u8]) -> Option<ParsedPush> {
524        let value: serde_json::Value = serde_json::from_slice(body).ok()?;
525        let branch = value
526            .get("changes")
527            .and_then(|c| c.as_array())
528            .and_then(|arr| arr.first())
529            .and_then(|first| first.get("refId"))
530            .and_then(|v| v.as_str())
531            .and_then(|r| r.strip_prefix("refs/heads/"))
532            .map(|s| s.to_string());
533        let repo_hint = value
534            .get("repository")
535            .and_then(|r| r.get("slug").or_else(|| r.get("name")))
536            .and_then(|v| v.as_str())
537            .map(|s| s.to_string());
538        Some(ParsedPush { branch, repo_hint })
539    }
540}
541
542/// Decodes the Sourcehut `hgmail`/builds shape that nests the refs under
543/// `event.refs[0].name`. Repo hint comes from `event.repo.name`.
544pub struct SourcehutExtractor;
545
546impl WebhookPayloadExtractor for SourcehutExtractor {
547    fn extract(&self, _headers: &HeaderMap, body: &[u8]) -> Option<ParsedPush> {
548        let value: serde_json::Value = serde_json::from_slice(body).ok()?;
549        let event = value.get("event")?;
550        let branch = event
551            .get("refs")
552            .and_then(|r| r.as_array())
553            .and_then(|arr| arr.first())
554            .and_then(|first| first.get("name"))
555            .and_then(|v| v.as_str())
556            .and_then(|r| r.strip_prefix("refs/heads/").or(Some(r)))
557            .map(|s| s.to_string());
558        let repo_hint = event
559            .get("repo")
560            .and_then(|r| r.get("name"))
561            .and_then(|v| v.as_str())
562            .map(|s| s.to_string());
563        Some(ParsedPush { branch, repo_hint })
564    }
565}
566
567/// Parse the operator's `[triggers].webhook_provider` string into an
568/// extractor. Unknown / empty strings fall back to the default
569/// (`RefHeadsExtractor`) so a typo never silently disables webhooks.
570pub fn extractor_for_provider(name: Option<&str>) -> Arc<dyn WebhookPayloadExtractor> {
571    let Some(raw) = name else { return Arc::new(RefHeadsExtractor) };
572    match raw.trim().to_ascii_lowercase().as_str() {
573        "" | "github" | "gitea" | "forgejo" | "gogs" | "gitlab" | "refheads" => {
574            Arc::new(RefHeadsExtractor)
575        }
576        "bitbucket" | "bitbucket-server" | "bitbucket_data_center" => {
577            Arc::new(BitbucketServerExtractor)
578        }
579        "sourcehut" | "srht" => Arc::new(SourcehutExtractor),
580        // Unknown provider falls back to the default but is worth a
581        // log line so the operator can spot the typo in their config.
582        other => {
583            tracing::warn!(
584                provider = other,
585                "unknown `[triggers].webhook_provider`; defaulting to `refheads`"
586            );
587            Arc::new(RefHeadsExtractor)
588        }
589    }
590}
591
592#[derive(Debug, Serialize)]
593pub struct WebhookResponse {
594    pub triggered: bool,
595    /// Run id when `triggered=true`; `None` when the branch filter
596    /// rejected the payload.
597    pub run_id: Option<String>,
598    /// Operator-readable explanation for an accepted-but-skipped
599    /// delivery (wrong branch). Empty on a triggered scan.
600    pub message: String,
601}
602
603/// Read the peer's IP from the request extensions. Populated by
604/// `axum::serve(_, app.into_make_service_with_connect_info::<SocketAddr>())`.
605/// Returns `None` when the server was launched without
606/// `into_make_service_with_connect_info`, in which case the per-IP
607/// rate limiter is skipped (the global concurrency gate still
608/// applies).
609fn peer_ip_from_request(req: &Request<Body>) -> Option<IpAddr> {
610    req.extensions().get::<ConnectInfo<SocketAddr>>().map(|c| c.0.ip())
611}
612
613/// `POST /webhook/git` handler.
614pub async fn webhook_git(
615    State(state): State<ServerState>,
616    req: Request<Body>,
617) -> Result<impl IntoResponse, ApiError> {
618    let Some(cfg) = state.webhook.as_ref() else {
619        return Err(ApiError::Internal(
620            "webhook not enabled; set [triggers].webhook_secret_ref in nyx-agent.toml".to_string(),
621        ));
622    };
623
624    // Per-source-IP rate limit. Runs first so a hostile sender's
625    // CPU spend per delivery stays at "header parse" before we even
626    // resolve the secret. Skipped when the server was not launched
627    // with `into_make_service_with_connect_info` (peer IP absent).
628    if let Some(limiter) = cfg.rate_limit.as_ref() {
629        if let Some(ip) = peer_ip_from_request(&req) {
630            if !limiter.admit(ip) {
631                return Err(ApiError::TooManyRequests(format!(
632                    "webhook rate limit exceeded for `{ip}`"
633                )));
634            }
635        }
636    }
637
638    // Global concurrency cap on the handler. Holding the permit
639    // until end-of-handler covers HMAC verification + body buffer +
640    // scan-trigger dispatch.
641    let _permit = if let Some(limit) = cfg.concurrency.as_ref() {
642        match limit.try_acquire() {
643            Some(permit) => Some(permit),
644            None => {
645                return Err(ApiError::TooManyRequests(
646                    "webhook concurrency limit reached".to_string(),
647                ));
648            }
649        }
650    } else {
651        None
652    };
653
654    let Some(secret) = cfg.secret.resolve() else {
655        // Webhook is configured but the secret cannot be resolved
656        // (e.g. unset env var). Refuse the delivery: accepting it
657        // would be unauthenticated.
658        return Ok((
659            StatusCode::SERVICE_UNAVAILABLE,
660            Json(WebhookResponse {
661                triggered: false,
662                run_id: None,
663                message: "webhook secret is not configured".to_string(),
664            }),
665        )
666            .into_response());
667    };
668
669    // Pull the signature header BEFORE consuming the body so a missing
670    // or syntactically-malformed header short-circuits without
671    // buffering and without burning an HMAC pass per forged delivery.
672    let sig_header = req
673        .headers()
674        .get(SIGNATURE_HEADER)
675        .and_then(|v| v.to_str().ok())
676        .map(|s| s.to_string())
677        .ok_or(ApiError::Unauthorized)?;
678    if !signature_header_is_well_formed(&sig_header) {
679        return Err(ApiError::Unauthorized);
680    }
681
682    // Refuse non-push event types we can identify by header before
683    // buffering the body. A GitHub `ping` (sent on webhook creation),
684    // `issues`, `pull_request`, `workflow_run`, ... all carry valid
685    // HMAC over the body but must not trigger a scan. The transport
686    // status stays 200 so the upstream marks the delivery healthy and
687    // does not retry.
688    let event = classify_event(req.headers());
689    match &event {
690        EventKind::Push | EventKind::Unknown => {}
691        EventKind::Ping => {
692            return Ok((
693                StatusCode::OK,
694                Json(WebhookResponse {
695                    triggered: false,
696                    run_id: None,
697                    message: "ping event acknowledged".to_string(),
698                }),
699            )
700                .into_response());
701        }
702        EventKind::Other(name) => {
703            return Ok((
704                StatusCode::OK,
705                Json(WebhookResponse {
706                    triggered: false,
707                    run_id: None,
708                    message: format!("event `{name}` is not a push; ignored"),
709                }),
710            )
711                .into_response());
712        }
713    }
714
715    // Reject oversized payloads on the advertised Content-Length before
716    // buffering. `to_bytes` enforces the same cap (covering chunked
717    // transfer encoding where Content-Length is absent), but the
718    // header-side check refuses a hostile sender before any body read.
719    if let Some(declared) = req
720        .headers()
721        .get(CONTENT_LENGTH)
722        .and_then(|v| v.to_str().ok())
723        .and_then(|s| s.parse::<usize>().ok())
724    {
725        if declared > MAX_WEBHOOK_BODY_BYTES {
726            return Err(ApiError::PayloadTooLarge(format!(
727                "webhook body {declared} bytes exceeds {MAX_WEBHOOK_BODY_BYTES} byte limit"
728            )));
729        }
730    }
731
732    let (parts, body) = req.into_parts();
733    let headers = parts.headers;
734    let body_bytes = to_bytes(body, MAX_WEBHOOK_BODY_BYTES).await.map_err(|e| {
735        ApiError::PayloadTooLarge(format!("webhook body exceeded limit or failed to read: {e}"))
736    })?;
737
738    if !verify_signature(&secret, body_bytes.as_ref(), &sig_header) {
739        return Err(ApiError::Unauthorized);
740    }
741
742    // Replay drop: only after HMAC verified, so a hostile sender
743    // without the secret cannot poison the cache by spraying random
744    // delivery ids. Providers that do not emit a delivery header
745    // skip dedup; the HMAC + push-event filter is the floor in that
746    // case.
747    if let Some(delivery) = delivery_id(&headers) {
748        let fresh = match cfg.dedup.lock() {
749            Ok(mut guard) => guard.record(&delivery),
750            // A poisoned mutex means a previous insert panicked. The
751            // safe response is to fail open (treat the delivery as
752            // new) rather than reject every subsequent request.
753            Err(poisoned) => {
754                tracing::warn!("webhook dedup cache poisoned: {poisoned}");
755                true
756            }
757        };
758        if !fresh {
759            return Ok((
760                StatusCode::OK,
761                Json(WebhookResponse {
762                    triggered: false,
763                    run_id: None,
764                    message: format!("delivery `{delivery}` already processed"),
765                }),
766            )
767                .into_response());
768        }
769    }
770
771    // Best-effort decode via the operator-selected extractor. A body
772    // the extractor cannot parse is accepted (some upstream form-encoded
773    // payloads include a JSON value as the `payload` form field; we
774    // tolerate that by reading the branch only when the extractor
775    // recognises the shape).
776    let parsed = cfg.extractor.extract(&headers, body_bytes.as_ref());
777    let branch = parsed.as_ref().and_then(|p| p.branch.clone());
778
779    // A signed-but-branchless body for an Unknown-event provider is not
780    // a push; refuse to trigger. (Push events for known providers were
781    // already classified above; this guard catches the legacy
782    // best-effort path so it stops accepting non-push deliveries
783    // whose provider did not set an event header.)
784    if matches!(event, EventKind::Unknown) && branch.is_none() {
785        return Ok((
786            StatusCode::OK,
787            Json(WebhookResponse {
788                triggered: false,
789                run_id: None,
790                message: "payload carried no recognised ref; not a push event".to_string(),
791            }),
792        )
793            .into_response());
794    }
795
796    if let Some(want) = cfg.branch.as_deref() {
797        match branch.as_deref() {
798            Some(actual) if actual == want => {}
799            other => {
800                return Ok((
801                    StatusCode::OK,
802                    Json(WebhookResponse {
803                        triggered: false,
804                        run_id: None,
805                        message: format!(
806                            "branch filter rejected delivery (want `{want}`, got `{}`)",
807                            other.unwrap_or("<unknown>")
808                        ),
809                    }),
810                )
811                    .into_response());
812            }
813        }
814    }
815
816    let trigger: Arc<dyn ScanTrigger> = Arc::clone(&state.scan);
817    // Webhook config does not yet plumb a project filter; scope-all is
818    // preserved by passing `None` for project_id. An optional
819    // `project = "..."` field in the trigger config block could narrow
820    // this later.
821    let run_id = trigger.trigger(ScanTriggerSource::Webhook, None, cfg.repo.clone(), None).await?;
822    Ok((
823        StatusCode::ACCEPTED,
824        Json(WebhookResponse { triggered: true, run_id: Some(run_id), message: String::new() }),
825    )
826        .into_response())
827}
828
829/// Constant-time HMAC-SHA256 verification.
830pub fn verify_signature(secret: &[u8], body: &[u8], header: &str) -> bool {
831    let Some(hex_sig) = header.trim().strip_prefix(SIGNATURE_PREFIX) else { return false };
832    let Ok(provided) = hex::decode(hex_sig.trim()) else { return false };
833    let Ok(mut mac) = HmacSha256::new_from_slice(secret) else { return false };
834    mac.update(body);
835    let expected = mac.finalize().into_bytes();
836    provided.as_slice().ct_eq(expected.as_slice()).into()
837}
838
839/// Helper used by the daemon's wiring + the test harness to mint the
840/// `sha256=<hex>` header for a given (secret, body).
841pub fn sign(secret: &[u8], body: &[u8]) -> String {
842    let mut mac = HmacSha256::new_from_slice(secret).expect("HMAC accepts any key length");
843    mac.update(body);
844    format!("{}{}", SIGNATURE_PREFIX, hex::encode(mac.finalize().into_bytes()))
845}
846
#[cfg(test)]
mod tests {
    // Unit tests for the webhook building blocks: signature minting and
    // verification, secret resolution, signature-header shape checks, event
    // classification, delivery-id lookup, the delivery dedup cache, the
    // per-provider payload extractors, the per-IP rate limiter and the
    // concurrency limit.

    use std::time::Duration;

    use axum::http::{HeaderName, HeaderValue};

    use super::*;

    /// Builds a `HeaderMap` from literal `(name, value)` pairs.
    fn header_map(pairs: &[(&str, &str)]) -> HeaderMap {
        let mut headers = HeaderMap::new();
        for &(raw_name, raw_value) in pairs {
            let name = HeaderName::from_bytes(raw_name.as_bytes()).expect("header name");
            let value = HeaderValue::from_str(raw_value).expect("header value");
            headers.insert(name, value);
        }
        headers
    }

    /// Parses a textual IP address literal used as a rate-limiter key.
    fn addr(text: &str) -> IpAddr {
        text.parse().unwrap()
    }

    // ---- sign / verify_signature -------------------------------------

    /// A header minted by `sign` verifies against the same secret and body.
    #[test]
    fn signature_roundtrip() {
        let key = b"hunter2";
        let payload = br#"{"ref":"refs/heads/main"}"#;
        let signed = sign(key, payload);
        assert!(signed.starts_with(SIGNATURE_PREFIX));
        assert!(verify_signature(key, payload, &signed));
    }

    /// A signature over one body must not verify a different body.
    #[test]
    fn signature_rejects_modified_body() {
        let key = b"hunter2";
        let signed = sign(key, br#"{"ref":"refs/heads/main"}"#);
        let tampered = br#"{"ref":"refs/heads/evil"}"#;
        assert!(!verify_signature(key, tampered, &signed));
    }

    /// A signature minted with one secret must not verify under another.
    #[test]
    fn signature_rejects_wrong_secret() {
        let payload = br#"{"ref":"refs/heads/main"}"#;
        let signed = sign(b"hunter2", payload);
        assert!(!verify_signature(b"wrong-secret", payload, &signed));
    }

    /// The bare hex digest without the scheme prefix is refused.
    #[test]
    fn signature_rejects_missing_prefix() {
        let key = b"hunter2";
        let payload = b"{}";
        let signed = sign(key, payload);
        // Keep only the hex digest, dropping the leading `sha256=`.
        let bare_digest = &signed[SIGNATURE_PREFIX.len()..];
        assert!(!verify_signature(key, payload, bare_digest));
    }

    // ---- EnvSecretResolver -------------------------------------------

    /// An `env:<NAME>` spec is resolved from the environment at call time.
    #[test]
    fn env_resolver_reads_from_environment() {
        // The process id is folded into the variable name so concurrent
        // test runs do not race on a shared name.
        let var = format!("NYX_TEST_WEBHOOK_{}", std::process::id());
        let resolver = EnvSecretResolver { spec: Some(format!("env:{var}")) };

        std::env::set_var(&var, "shh");
        assert_eq!(resolver.resolve().as_deref(), Some(b"shh".as_slice()));

        std::env::remove_var(&var);
        assert!(resolver.resolve().is_none());
    }

    /// A spec without the `env:` scheme is used verbatim as the secret.
    #[test]
    fn env_resolver_passes_literal_through() {
        let spec = Some("literal-secret".to_string());
        let resolver = EnvSecretResolver { spec };
        assert_eq!(resolver.resolve().as_deref(), Some(b"literal-secret".as_slice()));
    }

    /// No spec configured means no secret.
    #[test]
    fn env_resolver_returns_none_when_unset() {
        let resolver = EnvSecretResolver { spec: None };
        let resolved = resolver.resolve();
        assert!(resolved.is_none());
    }

    /// An empty literal spec must not resolve to a usable secret.
    #[test]
    fn env_resolver_refuses_empty_literal() {
        let spec = Some(String::new());
        let resolver = EnvSecretResolver { spec };
        assert!(resolver.resolve().is_none(), "empty literal secret must not pass HMAC auth");
    }

    /// An environment variable set to the empty string must not resolve.
    #[test]
    fn env_resolver_refuses_empty_env_value() {
        let var = format!("NYX_TEST_WEBHOOK_EMPTY_{}", std::process::id());
        std::env::set_var(&var, "");
        let spec = Some(format!("env:{var}"));
        let resolver = EnvSecretResolver { spec };
        assert!(resolver.resolve().is_none(), "empty env-backed secret must not pass HMAC auth");
        std::env::remove_var(&var);
    }

    // ---- signature_header_is_well_formed -----------------------------

    /// Prefix plus exactly `SIGNATURE_HEX_LEN` hex digits is accepted.
    #[test]
    fn signature_header_shape_accepts_canonical_form() {
        let digest = "a".repeat(SIGNATURE_HEX_LEN);
        let candidate = format!("sha256={}", digest);
        assert!(signature_header_is_well_formed(&candidate));
    }

    /// Upper- and lower-case hex digits may be mixed.
    #[test]
    fn signature_header_shape_accepts_mixed_case_hex() {
        let digest = "AbCdEf0123456789".repeat(4);
        let candidate = format!("sha256={}", digest);
        assert!(signature_header_is_well_formed(&candidate));
    }

    /// A digest of the right length but without the prefix is refused.
    #[test]
    fn signature_header_shape_rejects_missing_prefix() {
        let candidate = "a".repeat(SIGNATURE_HEX_LEN);
        assert!(!signature_header_is_well_formed(&candidate));
    }

    /// One hex digit too few is refused.
    #[test]
    fn signature_header_shape_rejects_short_digest() {
        let digest = "a".repeat(SIGNATURE_HEX_LEN - 1);
        let candidate = format!("sha256={}", digest);
        assert!(!signature_header_is_well_formed(&candidate));
    }

    /// One hex digit too many is refused.
    #[test]
    fn signature_header_shape_rejects_long_digest() {
        let digest = "a".repeat(SIGNATURE_HEX_LEN + 1);
        let candidate = format!("sha256={}", digest);
        assert!(!signature_header_is_well_formed(&candidate));
    }

    /// Characters outside the hex alphabet are refused.
    #[test]
    fn signature_header_shape_rejects_non_hex_chars() {
        let digest = "z".repeat(SIGNATURE_HEX_LEN);
        let candidate = format!("sha256={}", digest);
        assert!(!signature_header_is_well_formed(&candidate));
    }

    // ---- classify_event ----------------------------------------------

    /// `X-GitHub-Event: push` classifies as a push.
    #[test]
    fn classify_event_recognises_github_push() {
        let headers = header_map(&[("X-GitHub-Event", "push")]);
        assert_eq!(classify_event(&headers), EventKind::Push);
    }

    /// The event name is matched without regard to letter case.
    #[test]
    fn classify_event_is_case_insensitive() {
        let headers = header_map(&[("X-GitHub-Event", "PuSh")]);
        assert_eq!(classify_event(&headers), EventKind::Push);
    }

    /// `X-Gitlab-Event: Push Hook` classifies as a push.
    #[test]
    fn classify_event_recognises_gitlab_push_hook() {
        let headers = header_map(&[("X-Gitlab-Event", "Push Hook")]);
        assert_eq!(classify_event(&headers), EventKind::Push);
    }

    /// `X-GitHub-Event: ping` classifies as a ping.
    #[test]
    fn classify_event_recognises_ping() {
        let headers = header_map(&[("X-GitHub-Event", "ping")]);
        assert_eq!(classify_event(&headers), EventKind::Ping);
    }

    /// An unrecognised event name is carried through in `Other`.
    #[test]
    fn classify_event_returns_other_for_unknown_event_name() {
        let headers = header_map(&[("X-GitHub-Event", "issues")]);
        let kind = classify_event(&headers);
        match kind {
            EventKind::Other(name) => assert_eq!(name, "issues"),
            other => panic!("expected Other(\"issues\"), got {other:?}"),
        }
    }

    /// With no provider event header at all the kind is `Unknown`.
    #[test]
    fn classify_event_returns_unknown_when_no_provider_header() {
        let headers = HeaderMap::new();
        assert_eq!(classify_event(&headers), EventKind::Unknown);
    }

    /// An event header with an empty value is treated like a missing one.
    #[test]
    fn classify_event_ignores_empty_header_value() {
        let headers = header_map(&[("X-GitHub-Event", "")]);
        assert_eq!(classify_event(&headers), EventKind::Unknown);
    }

    // ---- delivery_id -------------------------------------------------

    /// The delivery id is taken from `X-GitHub-Delivery`.
    #[test]
    fn delivery_id_reads_github_header() {
        let headers = header_map(&[("X-GitHub-Delivery", "abc-123")]);
        assert_eq!(delivery_id(&headers).as_deref(), Some("abc-123"));
    }

    /// `X-Gitea-Delivery` is used when the GitHub header is absent.
    #[test]
    fn delivery_id_reads_gitea_header_when_github_absent() {
        let headers = header_map(&[("X-Gitea-Delivery", "xyz-789")]);
        assert_eq!(delivery_id(&headers).as_deref(), Some("xyz-789"));
    }

    /// No delivery header means no delivery id.
    #[test]
    fn delivery_id_is_none_when_no_header() {
        let headers = HeaderMap::new();
        assert!(delivery_id(&headers).is_none());
    }

    // ---- DeliveryDedupCache ------------------------------------------

    /// The first sighting of an id is reported as fresh and stored.
    #[test]
    fn dedup_cache_records_new_id() {
        let mut seen = DeliveryDedupCache::new();
        assert!(seen.record("a"));
        assert_eq!(seen.len(), 1);
    }

    /// A repeated id is reported as a duplicate and not stored twice.
    #[test]
    fn dedup_cache_drops_repeat() {
        let mut seen = DeliveryDedupCache::new();
        assert!(seen.record("a"));
        assert!(!seen.record("a"), "second insert must report duplicate");
        assert_eq!(seen.len(), 1);
    }

    /// Once the cache is full, inserting a new id rolls the oldest one off.
    #[test]
    fn dedup_cache_evicts_oldest_at_cap() {
        let mut seen = DeliveryDedupCache::new();
        for i in 0..DELIVERY_DEDUP_CAP {
            let id = format!("d-{i}");
            assert!(seen.record(&id));
        }
        assert_eq!(seen.len(), DELIVERY_DEDUP_CAP);

        // One more insert keeps the size pinned at the cap.
        assert!(seen.record("d-new"));
        assert_eq!(seen.len(), DELIVERY_DEDUP_CAP);

        // The id that rolled off can be recorded as new again.
        assert!(seen.record("d-0"));
    }

    // ---- payload extractors ------------------------------------------

    /// GitHub-shaped push: branch from `ref`, repo from `repository.full_name`.
    #[test]
    fn refheads_extractor_reads_github_push() {
        let payload = br#"{"ref":"refs/heads/main","repository":{"full_name":"acme/api"}}"#;
        let no_headers = HeaderMap::new();
        let parsed = RefHeadsExtractor.extract(&no_headers, payload).expect("parsed");
        assert_eq!(parsed.branch.as_deref(), Some("main"));
        assert_eq!(parsed.repo_hint.as_deref(), Some("acme/api"));
    }

    /// A `refs/tags/...` ref parses but yields no branch.
    #[test]
    fn refheads_extractor_returns_none_branch_for_tag_push() {
        let payload = br#"{"ref":"refs/tags/v1.2.3"}"#;
        let no_headers = HeaderMap::new();
        let parsed = RefHeadsExtractor.extract(&no_headers, payload).expect("parsed");
        assert!(parsed.branch.is_none(), "tag pushes are not branch pushes");
    }

    /// `repository.name` supplies the repo hint when `full_name` is absent.
    #[test]
    fn refheads_extractor_falls_back_to_repo_name() {
        let payload = br#"{"ref":"refs/heads/dev","repository":{"name":"api"}}"#;
        let no_headers = HeaderMap::new();
        let parsed = RefHeadsExtractor.extract(&no_headers, payload).expect("parsed");
        assert_eq!(parsed.repo_hint.as_deref(), Some("api"));
    }

    /// A body that is not JSON produces no parse result at all.
    #[test]
    fn refheads_extractor_returns_none_on_garbage() {
        let no_headers = HeaderMap::new();
        let parsed = RefHeadsExtractor.extract(&no_headers, b"not-json");
        assert!(parsed.is_none());
    }

    /// Bitbucket Server: branch from `changes[].refId`, repo from `repository.slug`.
    #[test]
    fn bitbucket_server_extractor_reads_changes_array() {
        let payload = br#"{
            "changes":[{"refId":"refs/heads/develop","type":"UPDATE"}],
            "repository":{"slug":"api","name":"Api Service"}
        }"#;
        let no_headers = HeaderMap::new();
        let parsed = BitbucketServerExtractor.extract(&no_headers, payload).expect("parsed");
        assert_eq!(parsed.branch.as_deref(), Some("develop"));
        assert_eq!(parsed.repo_hint.as_deref(), Some("api"));
    }

    /// An empty `changes` array yields no branch but still a repo hint.
    #[test]
    fn bitbucket_server_extractor_returns_none_branch_when_changes_empty() {
        let payload = br#"{"changes":[],"repository":{"slug":"api"}}"#;
        let no_headers = HeaderMap::new();
        let parsed = BitbucketServerExtractor.extract(&no_headers, payload).expect("parsed");
        assert!(parsed.branch.is_none());
        assert_eq!(parsed.repo_hint.as_deref(), Some("api"));
    }

    /// Sourcehut: branch from `event.refs[].name`, repo from `event.repo.name`.
    #[test]
    fn sourcehut_extractor_reads_nested_event_refs() {
        let payload =
            br#"{"event":{"refs":[{"name":"refs/heads/main"}],"repo":{"name":"~user/proj"}}}"#;
        let no_headers = HeaderMap::new();
        let parsed = SourcehutExtractor.extract(&no_headers, payload).expect("parsed");
        assert_eq!(parsed.branch.as_deref(), Some("main"));
        assert_eq!(parsed.repo_hint.as_deref(), Some("~user/proj"));
    }

    /// A ref name without the `refs/heads/` prefix is kept as the branch.
    #[test]
    fn sourcehut_extractor_keeps_bare_branch_names() {
        // Some sr.ht builds emit just the branch name without the
        // `refs/heads/` prefix.
        let payload = br#"{"event":{"refs":[{"name":"main"}]}}"#;
        let no_headers = HeaderMap::new();
        let parsed = SourcehutExtractor.extract(&no_headers, payload).expect("parsed");
        assert_eq!(parsed.branch.as_deref(), Some("main"));
    }

    // ---- extractor_for_provider --------------------------------------

    /// With no provider configured, a `ref`-shaped payload is understood.
    #[test]
    fn extractor_for_provider_defaults_when_missing() {
        let payload = br#"{"ref":"refs/heads/main"}"#;
        let no_headers = HeaderMap::new();
        let extractor = extractor_for_provider(None);
        let branch = extractor.extract(&no_headers, payload).and_then(|parsed| parsed.branch);
        assert_eq!(branch.as_deref(), Some("main"));
    }

    /// Every known alias, regardless of case or padding, reads `ref` payloads.
    #[test]
    fn extractor_for_provider_matches_known_aliases() {
        let payload = br#"{"ref":"refs/heads/main"}"#;
        let no_headers = HeaderMap::new();
        for name in ["github", "GITHUB", " gitea ", "forgejo", "gogs", "gitlab", "refheads"] {
            let extractor = extractor_for_provider(Some(name));
            let branch = extractor.extract(&no_headers, payload).and_then(|parsed| parsed.branch);
            assert_eq!(
                branch.as_deref(),
                Some("main"),
                "alias `{name}` should map to RefHeadsExtractor",
            );
        }
    }

    /// `bitbucket` selects an extractor that reads the `changes` array.
    #[test]
    fn extractor_for_provider_picks_bitbucket() {
        let payload = br#"{"changes":[{"refId":"refs/heads/main"}]}"#;
        let no_headers = HeaderMap::new();
        let extractor = extractor_for_provider(Some("bitbucket"));
        let branch = extractor.extract(&no_headers, payload).and_then(|parsed| parsed.branch);
        assert_eq!(branch.as_deref(), Some("main"));
    }

    /// `sourcehut` selects an extractor that reads `event.refs`.
    #[test]
    fn extractor_for_provider_picks_sourcehut() {
        let payload = br#"{"event":{"refs":[{"name":"refs/heads/main"}]}}"#;
        let no_headers = HeaderMap::new();
        let extractor = extractor_for_provider(Some("sourcehut"));
        let branch = extractor.extract(&no_headers, payload).and_then(|parsed| parsed.branch);
        assert_eq!(branch.as_deref(), Some("main"));
    }

    /// An unrecognised provider name still reads `ref`-shaped payloads.
    #[test]
    fn extractor_for_provider_falls_back_on_unknown() {
        let payload = br#"{"ref":"refs/heads/main"}"#;
        let no_headers = HeaderMap::new();
        // Unknown provider warns + falls back to RefHeads.
        let extractor = extractor_for_provider(Some("notarealthing"));
        let branch = extractor.extract(&no_headers, payload).and_then(|parsed| parsed.branch);
        assert_eq!(branch.as_deref(), Some("main"));
    }

    // ---- WebhookRateLimiter ------------------------------------------

    /// With no refill, exactly `burst` requests are admitted.
    #[test]
    fn rate_limiter_admits_until_bucket_empty() {
        let limiter = WebhookRateLimiter::new(3, 0.0, 16);
        let ip = addr("127.0.0.1");
        for _ in 0..3 {
            assert!(limiter.admit(ip));
        }
        assert!(!limiter.admit(ip), "fourth request must be refused");
    }

    /// Tokens regenerate with elapsed time and never exceed the bucket size.
    #[test]
    fn rate_limiter_refills_over_time() {
        // 1 token / second refill. Bucket size 2.
        let limiter = WebhookRateLimiter::new(2, 1.0, 16);
        let ip = addr("10.0.0.5");

        let start = Instant::now();
        assert!(limiter.admit_at(ip, start));
        assert!(limiter.admit_at(ip, start));
        // Same instant: bucket empty.
        assert!(!limiter.admit_at(ip, start));

        // One second later: one token regenerated.
        let one_sec_in = start + Duration::from_secs(1);
        assert!(limiter.admit_at(ip, one_sec_in));
        // Immediately after: empty again.
        assert!(!limiter.admit_at(ip, one_sec_in));

        // Five seconds later: bucket fully refilled, but cap at 2.
        let six_secs_in = one_sec_in + Duration::from_secs(5);
        assert!(limiter.admit_at(ip, six_secs_in));
        assert!(limiter.admit_at(ip, six_secs_in));
        assert!(!limiter.admit_at(ip, six_secs_in));
    }

    /// Exhausting one address leaves another address's bucket untouched.
    #[test]
    fn rate_limiter_per_ip_buckets_are_independent() {
        let limiter = WebhookRateLimiter::new(1, 0.0, 16);
        let first = addr("127.0.0.1");
        let second = addr("127.0.0.2");
        assert!(limiter.admit(first));
        // `first` is exhausted; `second` still has its own token.
        assert!(!limiter.admit(first));
        assert!(limiter.admit(second));
    }

    /// `per_minute(60, ..)` allows a burst of 60 at a single instant.
    #[test]
    fn rate_limiter_per_minute_helper_matches_rate() {
        // 60/min == 1 token / second refill, with burst depth 60.
        let limiter = WebhookRateLimiter::per_minute(60, 64);
        let ip = addr("127.0.0.1");
        let now = Instant::now();
        let mut admitted = 0;
        while admitted < 60 {
            assert!(limiter.admit_at(ip, now));
            admitted += 1;
        }
        assert!(!limiter.admit_at(ip, now));
    }

    /// Tracking never exceeds the configured number of addresses.
    #[test]
    fn rate_limiter_evicts_oldest_ip_at_cap() {
        let limiter = WebhookRateLimiter::new(1, 0.0, 2);
        let start = Instant::now();
        let oldest = addr("127.0.0.1");
        let middle = addr("127.0.0.2");
        let newest = addr("127.0.0.3");
        assert!(limiter.admit_at(oldest, start));
        assert!(limiter.admit_at(middle, start + Duration::from_secs(1)));
        assert!(limiter.admit_at(newest, start + Duration::from_secs(2)));
        // `oldest` should have been evicted to make room for `newest`,
        // so the map carries `middle` and `newest` only.
        assert_eq!(limiter.tracked_ips(), 2);
    }

    // ---- WebhookConcurrencyLimit -------------------------------------

    /// Acquisition fails at the cap and succeeds again after a release.
    #[test]
    fn concurrency_limit_refuses_past_cap() {
        let limit = WebhookConcurrencyLimit::new(2);
        let first = limit.try_acquire().expect("first permit");
        let second = limit.try_acquire().expect("second permit");
        assert!(limit.try_acquire().is_none(), "third acquire must fail when cap is reached");
        drop(first);
        assert!(limit.try_acquire().is_some(), "releasing a permit must make one available again");
        drop(second);
    }

    /// A requested cap of zero is raised to one permit.
    #[test]
    fn concurrency_limit_floor_is_one() {
        let limit = WebhookConcurrencyLimit::new(0);
        assert_eq!(limit.permits(), 1);
        assert!(limit.try_acquire().is_some());
    }
}