1use std::collections::{HashMap, HashSet, VecDeque};
28use std::net::{IpAddr, SocketAddr};
29use std::sync::{Arc, Mutex};
30use std::time::Instant;
31
32use axum::body::{to_bytes, Body};
33use axum::extract::{ConnectInfo, State};
34use axum::http::header::CONTENT_LENGTH;
35use axum::http::{HeaderMap, Request, StatusCode};
36use axum::response::IntoResponse;
37use axum::Json;
38use hmac::{Hmac, KeyInit, Mac};
39use serde::Serialize;
40use sha2::Sha256;
41use subtle::ConstantTimeEq;
42
43use crate::state::{ApiError, ScanTrigger, ScanTriggerSource, ServerState};
44
/// HMAC-SHA256 instance used to sign and verify webhook bodies.
type HmacSha256 = Hmac<Sha256>;

/// Largest webhook body, in bytes, that `webhook_git` accepts (1 MiB).
///
/// The handler compares a declared `Content-Length` against this value and
/// also passes it as the read limit when buffering the body.
pub const MAX_WEBHOOK_BODY_BYTES: usize = 1024 * 1024;

/// Request header that carries the HMAC signature of the body.
const SIGNATURE_HEADER: &str = "X-Hub-Signature-256";
/// Prefix that must precede the hex digest inside the signature header.
const SIGNATURE_PREFIX: &str = "sha256=";

/// Exact number of hex characters required after [`SIGNATURE_PREFIX`].
const SIGNATURE_HEX_LEN: usize = 64;

/// Event-name headers, probed in this order by `classify_event`.
const EVENT_HEADERS: &[&str] =
    &["X-GitHub-Event", "X-Gitea-Event", "X-Forgejo-Event", "X-Gogs-Event", "X-Gitlab-Event"];

/// Delivery-identifier headers, probed in this order by `delivery_id`.
const DELIVERY_HEADERS: &[&str] = &[
    "X-GitHub-Delivery",
    "X-Gitea-Delivery",
    "X-Forgejo-Delivery",
    "X-Gogs-Delivery",
    "X-Gitlab-Event-UUID",
];

/// Maximum number of delivery ids `DeliveryDedupCache` retains before it
/// starts evicting the oldest entry.
pub const DELIVERY_DEDUP_CAP: usize = 1024;

/// Default number of concurrently handled webhook requests.
///
/// NOTE(review): not referenced in this file; presumably consumed by the
/// configuration wiring that builds `WebhookConcurrencyLimit` — confirm.
pub const DEFAULT_WEBHOOK_MAX_CONCURRENT: usize = 8;

/// Default per-IP request rate, in requests per minute.
///
/// NOTE(review): not referenced in this file; presumably passed to
/// `WebhookRateLimiter::per_minute` by the caller — confirm.
pub const DEFAULT_WEBHOOK_RATE_LIMIT_PER_MINUTE: u32 = 30;

/// Default burst size for the per-IP rate limiter.
///
/// NOTE(review): not referenced in this file; presumably the bucket capacity
/// chosen by the caller — confirm.
pub const DEFAULT_WEBHOOK_RATE_LIMIT_BURST: u32 = 30;

/// Default cap on the number of distinct IPs the rate limiter tracks.
///
/// NOTE(review): not referenced in this file; presumably passed as `max_ips`
/// when the limiter is constructed — confirm.
pub const DEFAULT_WEBHOOK_RATE_LIMIT_MAX_IPS: usize = 1024;
102
103pub struct WebhookConcurrencyLimit {
109 inner: Arc<tokio::sync::Semaphore>,
110 permits: usize,
111}
112
113impl WebhookConcurrencyLimit {
114 pub fn new(permits: usize) -> Self {
115 let permits = permits.max(1);
116 Self { inner: Arc::new(tokio::sync::Semaphore::new(permits)), permits }
117 }
118
119 pub fn permits(&self) -> usize {
122 self.permits
123 }
124
125 pub fn try_acquire(&self) -> Option<tokio::sync::OwnedSemaphorePermit> {
130 Arc::clone(&self.inner).try_acquire_owned().ok()
131 }
132}
133
/// Per-client-IP token-bucket rate limiter for the webhook endpoint.
///
/// Every IP owns a bucket holding at most `capacity` tokens that refills at
/// `refill_per_sec` tokens per second; a request is admitted when it can take
/// one whole token. At most `max_ips` buckets are tracked: when a new IP
/// arrives at the cap, the bucket with the oldest refill timestamp is dropped.
pub struct WebhookRateLimiter {
    /// Maximum (and initial) number of tokens in a bucket.
    capacity: f64,
    /// Tokens added to a bucket per elapsed second.
    refill_per_sec: f64,
    /// Upper bound on the number of tracked IPs.
    max_ips: usize,
    /// Buckets keyed by client IP.
    inner: Mutex<HashMap<IpAddr, TokenBucket>>,
}

/// State of a single client's bucket.
#[derive(Debug)]
struct TokenBucket {
    /// Tokens currently available.
    tokens: f64,
    /// Latest instant up to which refill has been credited.
    last_refill: Instant,
}

impl WebhookRateLimiter {
    /// Creates a limiter.
    ///
    /// `capacity` and `max_ips` are raised to at least 1, and a negative (or
    /// NaN) `refill_per_sec` is treated as 0, i.e. buckets never refill.
    pub fn new(capacity: u32, refill_per_sec: f64, max_ips: usize) -> Self {
        Self {
            capacity: f64::from(capacity.max(1)),
            refill_per_sec: refill_per_sec.max(0.0),
            max_ips: max_ips.max(1),
            inner: Mutex::new(HashMap::new()),
        }
    }

    /// Creates a limiter whose bucket holds `rate_per_minute` tokens (at
    /// least 1) and refills at that same rate spread over sixty seconds.
    pub fn per_minute(rate_per_minute: u32, max_ips: usize) -> Self {
        let rate = rate_per_minute.max(1);
        Self::new(rate, f64::from(rate) / 60.0, max_ips)
    }

    /// Decides whether a request from `ip` is admitted right now.
    pub fn admit(&self, ip: IpAddr) -> bool {
        self.admit_at(ip, Instant::now())
    }

    /// Decides whether a request from `ip` is admitted at instant `now`.
    ///
    /// Returns `true` and consumes one token when the bucket holds at least
    /// one; returns `false` otherwise. `now` may be older than a timestamp
    /// seen earlier for the same IP (callers sample the clock before taking
    /// the lock); such a call earns no refill and does not rewind the clock.
    pub fn admit_at(&self, ip: IpAddr, now: Instant) -> bool {
        // A poisoned lock only means another thread panicked while holding
        // it; the map is still usable, so recover instead of failing requests.
        let mut buckets = match self.inner.lock() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        };

        // Make room for a new IP by dropping the least recently refilled one.
        if !buckets.contains_key(&ip) && buckets.len() >= self.max_ips {
            let victim = buckets.iter().min_by_key(|(_, b)| b.last_refill).map(|(k, _)| *k);
            if let Some(victim) = victim {
                buckets.remove(&victim);
            }
        }

        let bucket = buckets
            .entry(ip)
            .or_insert_with(|| TokenBucket { tokens: self.capacity, last_refill: now });

        let elapsed = now.saturating_duration_since(bucket.last_refill).as_secs_f64();
        bucket.tokens = (bucket.tokens + elapsed * self.refill_per_sec).min(self.capacity);
        // Only ever move the refill clock forward. Assigning an older `now`
        // here would let the next call re-credit an interval that has already
        // been paid out, handing the client extra tokens.
        if now > bucket.last_refill {
            bucket.last_refill = now;
        }

        if bucket.tokens >= 1.0 {
            bucket.tokens -= 1.0;
            true
        } else {
            false
        }
    }

    /// Number of IPs currently tracked (test helper).
    ///
    /// Recovers from a poisoned lock the same way `admit_at` does.
    #[cfg(test)]
    pub fn tracked_ips(&self) -> usize {
        match self.inner.lock() {
            Ok(guard) => guard.len(),
            Err(poisoned) => poisoned.into_inner().len(),
        }
    }
}
232
233fn signature_header_is_well_formed(header: &str) -> bool {
238 let Some(rest) = header.trim().strip_prefix(SIGNATURE_PREFIX) else { return false };
239 let rest = rest.trim();
240 rest.len() == SIGNATURE_HEX_LEN && rest.bytes().all(|b| b.is_ascii_hexdigit())
241}
242
/// Classification of a delivery based on its provider event header, as
/// produced by `classify_event`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum EventKind {
    /// The header value was `push` or `push hook` (compared case-insensitively).
    Push,
    /// The header value was `ping` (compared case-insensitively).
    Ping,
    /// Any other non-empty header value; carries the trimmed value.
    Other(String),
    /// No known event header with a non-empty, readable value was present.
    Unknown,
}
264
265pub fn classify_event(headers: &HeaderMap) -> EventKind {
267 for name in EVENT_HEADERS {
268 let Some(raw) = headers.get(*name).and_then(|v| v.to_str().ok()) else { continue };
269 let value = raw.trim();
270 if value.is_empty() {
271 continue;
272 }
273 if value.eq_ignore_ascii_case("push") || value.eq_ignore_ascii_case("push hook") {
274 return EventKind::Push;
275 }
276 if value.eq_ignore_ascii_case("ping") {
277 return EventKind::Ping;
278 }
279 return EventKind::Other(value.to_string());
280 }
281 EventKind::Unknown
282}
283
284pub fn delivery_id(headers: &HeaderMap) -> Option<String> {
288 for name in DELIVERY_HEADERS {
289 let Some(raw) = headers.get(*name).and_then(|v| v.to_str().ok()) else { continue };
290 let trimmed = raw.trim();
291 if !trimmed.is_empty() {
292 return Some(trimmed.to_string());
293 }
294 }
295 None
296}
297
298#[derive(Default)]
303pub struct DeliveryDedupCache {
304 seen: HashSet<String>,
305 order: VecDeque<String>,
306}
307
308impl DeliveryDedupCache {
309 pub fn new() -> Self {
310 Self::default()
311 }
312
313 pub fn record(&mut self, id: &str) -> bool {
316 if self.seen.contains(id) {
317 return false;
318 }
319 if self.order.len() >= DELIVERY_DEDUP_CAP {
320 if let Some(old) = self.order.pop_front() {
321 self.seen.remove(&old);
322 }
323 }
324 self.seen.insert(id.to_string());
325 self.order.push_back(id.to_string());
326 true
327 }
328
329 #[cfg(test)]
330 pub fn len(&self) -> usize {
331 self.order.len()
332 }
333
334 #[cfg(test)]
335 pub fn is_empty(&self) -> bool {
336 self.order.is_empty()
337 }
338}
339
/// Source of the shared secret used to authenticate webhook bodies.
pub trait WebhookSecretResolver: Send + Sync + 'static {
    /// Returns the secret bytes, or `None` when no secret is available.
    fn resolve(&self) -> Option<Vec<u8>>;
}

/// Resolver driven by a textual spec: either `env:NAME` or a literal secret.
pub struct EnvSecretResolver {
    /// `Some("env:NAME")` reads environment variable `NAME` on every call;
    /// any other `Some` value is used verbatim; `None` means no secret.
    pub spec: Option<String>,
}

impl WebhookSecretResolver for EnvSecretResolver {
    /// Resolves the spec to secret bytes.
    ///
    /// Returns `None` when there is no spec, when the environment variable
    /// cannot be read, or when the resulting secret is empty.
    fn resolve(&self) -> Option<Vec<u8>> {
        let spec = match &self.spec {
            Some(text) => text.as_str(),
            None => return None,
        };
        let bytes = match spec.strip_prefix("env:") {
            Some(var_name) => match std::env::var(var_name) {
                Ok(value) => value.into_bytes(),
                Err(_) => return None,
            },
            None => Vec::from(spec.as_bytes()),
        };
        // An empty secret is treated the same as a missing one.
        Some(bytes).filter(|secret| !secret.is_empty())
    }
}

/// Resolver that always hands back the same, pre-supplied secret.
#[derive(Clone)]
pub struct StaticSecretResolver {
    /// Secret returned by `resolve`; `None` means no secret.
    pub secret: Option<Vec<u8>>,
}

impl WebhookSecretResolver for StaticSecretResolver {
    /// Returns a copy of the stored secret, unchanged.
    fn resolve(&self) -> Option<Vec<u8>> {
        self.secret.as_ref().map(|bytes| bytes.to_vec())
    }
}
390
391#[derive(Clone)]
393pub struct WebhookConfig {
394 pub secret: Arc<dyn WebhookSecretResolver>,
397 pub branch: Option<String>,
400 pub repo: Option<String>,
404 pub dedup: Arc<Mutex<DeliveryDedupCache>>,
409 pub extractor: Arc<dyn WebhookPayloadExtractor>,
414 pub concurrency: Option<Arc<WebhookConcurrencyLimit>>,
419 pub rate_limit: Option<Arc<WebhookRateLimiter>>,
427}
428
429impl WebhookConfig {
430 pub fn new(
433 secret: Arc<dyn WebhookSecretResolver>,
434 branch: Option<String>,
435 repo: Option<String>,
436 ) -> Self {
437 Self::with_extractor(secret, branch, repo, Arc::new(RefHeadsExtractor))
438 }
439
440 pub fn with_extractor(
444 secret: Arc<dyn WebhookSecretResolver>,
445 branch: Option<String>,
446 repo: Option<String>,
447 extractor: Arc<dyn WebhookPayloadExtractor>,
448 ) -> Self {
449 Self {
450 secret,
451 branch,
452 repo,
453 dedup: Arc::new(Mutex::new(DeliveryDedupCache::new())),
454 extractor,
455 concurrency: None,
456 rate_limit: None,
457 }
458 }
459
460 pub fn with_concurrency_limit(mut self, limit: Arc<WebhookConcurrencyLimit>) -> Self {
463 self.concurrency = Some(limit);
464 self
465 }
466
467 pub fn with_rate_limit(mut self, limit: Arc<WebhookRateLimiter>) -> Self {
470 self.rate_limit = Some(limit);
471 self
472 }
473}
474
/// Fields a `WebhookPayloadExtractor` pulls out of a push payload.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct ParsedPush {
    /// Branch the push targeted, or `None` when the payload named no branch.
    pub branch: Option<String>,
    /// Repository name reported by the payload, if any.
    pub repo_hint: Option<String>,
}
485
/// Provider-specific strategy for reading a push payload.
pub trait WebhookPayloadExtractor: Send + Sync + 'static {
    /// Extracts branch and repository hints from the request `headers` and
    /// raw `body`. The implementations in this file return `None` when the
    /// body is not valid JSON.
    fn extract(&self, headers: &HeaderMap, body: &[u8]) -> Option<ParsedPush>;
}
494
495pub struct RefHeadsExtractor;
500
501impl WebhookPayloadExtractor for RefHeadsExtractor {
502 fn extract(&self, _headers: &HeaderMap, body: &[u8]) -> Option<ParsedPush> {
503 let value: serde_json::Value = serde_json::from_slice(body).ok()?;
504 let branch = value
505 .get("ref")
506 .and_then(|v| v.as_str())
507 .and_then(|r| r.strip_prefix("refs/heads/"))
508 .map(|s| s.to_string());
509 let repo_hint = value
510 .get("repository")
511 .and_then(|r| r.get("full_name").or_else(|| r.get("name")))
512 .and_then(|v| v.as_str())
513 .map(|s| s.to_string());
514 Some(ParsedPush { branch, repo_hint })
515 }
516}
517
518pub struct BitbucketServerExtractor;
521
522impl WebhookPayloadExtractor for BitbucketServerExtractor {
523 fn extract(&self, _headers: &HeaderMap, body: &[u8]) -> Option<ParsedPush> {
524 let value: serde_json::Value = serde_json::from_slice(body).ok()?;
525 let branch = value
526 .get("changes")
527 .and_then(|c| c.as_array())
528 .and_then(|arr| arr.first())
529 .and_then(|first| first.get("refId"))
530 .and_then(|v| v.as_str())
531 .and_then(|r| r.strip_prefix("refs/heads/"))
532 .map(|s| s.to_string());
533 let repo_hint = value
534 .get("repository")
535 .and_then(|r| r.get("slug").or_else(|| r.get("name")))
536 .and_then(|v| v.as_str())
537 .map(|s| s.to_string());
538 Some(ParsedPush { branch, repo_hint })
539 }
540}
541
542pub struct SourcehutExtractor;
545
546impl WebhookPayloadExtractor for SourcehutExtractor {
547 fn extract(&self, _headers: &HeaderMap, body: &[u8]) -> Option<ParsedPush> {
548 let value: serde_json::Value = serde_json::from_slice(body).ok()?;
549 let event = value.get("event")?;
550 let branch = event
551 .get("refs")
552 .and_then(|r| r.as_array())
553 .and_then(|arr| arr.first())
554 .and_then(|first| first.get("name"))
555 .and_then(|v| v.as_str())
556 .and_then(|r| r.strip_prefix("refs/heads/").or(Some(r)))
557 .map(|s| s.to_string());
558 let repo_hint = event
559 .get("repo")
560 .and_then(|r| r.get("name"))
561 .and_then(|v| v.as_str())
562 .map(|s| s.to_string());
563 Some(ParsedPush { branch, repo_hint })
564 }
565}
566
567pub fn extractor_for_provider(name: Option<&str>) -> Arc<dyn WebhookPayloadExtractor> {
571 let Some(raw) = name else { return Arc::new(RefHeadsExtractor) };
572 match raw.trim().to_ascii_lowercase().as_str() {
573 "" | "github" | "gitea" | "forgejo" | "gogs" | "gitlab" | "refheads" => {
574 Arc::new(RefHeadsExtractor)
575 }
576 "bitbucket" | "bitbucket-server" | "bitbucket_data_center" => {
577 Arc::new(BitbucketServerExtractor)
578 }
579 "sourcehut" | "srht" => Arc::new(SourcehutExtractor),
580 other => {
583 tracing::warn!(
584 provider = other,
585 "unknown `[triggers].webhook_provider`; defaulting to `refheads`"
586 );
587 Arc::new(RefHeadsExtractor)
588 }
589 }
590}
591
/// JSON body returned by the webhook endpoint.
#[derive(Debug, Serialize)]
pub struct WebhookResponse {
    /// `true` only when the delivery started a scan.
    pub triggered: bool,
    /// Identifier returned by the scan trigger; `None` when nothing started.
    pub run_id: Option<String>,
    /// Explanation of why no scan was started; empty on success.
    pub message: String,
}
602
603fn peer_ip_from_request(req: &Request<Body>) -> Option<IpAddr> {
610 req.extensions().get::<ConnectInfo<SocketAddr>>().map(|c| c.0.ip())
611}
612
613pub async fn webhook_git(
615 State(state): State<ServerState>,
616 req: Request<Body>,
617) -> Result<impl IntoResponse, ApiError> {
618 let Some(cfg) = state.webhook.as_ref() else {
619 return Err(ApiError::Internal(
620 "webhook not enabled; set [triggers].webhook_secret_ref in nyx-agent.toml".to_string(),
621 ));
622 };
623
624 if let Some(limiter) = cfg.rate_limit.as_ref() {
629 if let Some(ip) = peer_ip_from_request(&req) {
630 if !limiter.admit(ip) {
631 return Err(ApiError::TooManyRequests(format!(
632 "webhook rate limit exceeded for `{ip}`"
633 )));
634 }
635 }
636 }
637
638 let _permit = if let Some(limit) = cfg.concurrency.as_ref() {
642 match limit.try_acquire() {
643 Some(permit) => Some(permit),
644 None => {
645 return Err(ApiError::TooManyRequests(
646 "webhook concurrency limit reached".to_string(),
647 ));
648 }
649 }
650 } else {
651 None
652 };
653
654 let Some(secret) = cfg.secret.resolve() else {
655 return Ok((
659 StatusCode::SERVICE_UNAVAILABLE,
660 Json(WebhookResponse {
661 triggered: false,
662 run_id: None,
663 message: "webhook secret is not configured".to_string(),
664 }),
665 )
666 .into_response());
667 };
668
669 let sig_header = req
673 .headers()
674 .get(SIGNATURE_HEADER)
675 .and_then(|v| v.to_str().ok())
676 .map(|s| s.to_string())
677 .ok_or(ApiError::Unauthorized)?;
678 if !signature_header_is_well_formed(&sig_header) {
679 return Err(ApiError::Unauthorized);
680 }
681
682 let event = classify_event(req.headers());
689 match &event {
690 EventKind::Push | EventKind::Unknown => {}
691 EventKind::Ping => {
692 return Ok((
693 StatusCode::OK,
694 Json(WebhookResponse {
695 triggered: false,
696 run_id: None,
697 message: "ping event acknowledged".to_string(),
698 }),
699 )
700 .into_response());
701 }
702 EventKind::Other(name) => {
703 return Ok((
704 StatusCode::OK,
705 Json(WebhookResponse {
706 triggered: false,
707 run_id: None,
708 message: format!("event `{name}` is not a push; ignored"),
709 }),
710 )
711 .into_response());
712 }
713 }
714
715 if let Some(declared) = req
720 .headers()
721 .get(CONTENT_LENGTH)
722 .and_then(|v| v.to_str().ok())
723 .and_then(|s| s.parse::<usize>().ok())
724 {
725 if declared > MAX_WEBHOOK_BODY_BYTES {
726 return Err(ApiError::PayloadTooLarge(format!(
727 "webhook body {declared} bytes exceeds {MAX_WEBHOOK_BODY_BYTES} byte limit"
728 )));
729 }
730 }
731
732 let (parts, body) = req.into_parts();
733 let headers = parts.headers;
734 let body_bytes = to_bytes(body, MAX_WEBHOOK_BODY_BYTES).await.map_err(|e| {
735 ApiError::PayloadTooLarge(format!("webhook body exceeded limit or failed to read: {e}"))
736 })?;
737
738 if !verify_signature(&secret, body_bytes.as_ref(), &sig_header) {
739 return Err(ApiError::Unauthorized);
740 }
741
742 if let Some(delivery) = delivery_id(&headers) {
748 let fresh = match cfg.dedup.lock() {
749 Ok(mut guard) => guard.record(&delivery),
750 Err(poisoned) => {
754 tracing::warn!("webhook dedup cache poisoned: {poisoned}");
755 true
756 }
757 };
758 if !fresh {
759 return Ok((
760 StatusCode::OK,
761 Json(WebhookResponse {
762 triggered: false,
763 run_id: None,
764 message: format!("delivery `{delivery}` already processed"),
765 }),
766 )
767 .into_response());
768 }
769 }
770
771 let parsed = cfg.extractor.extract(&headers, body_bytes.as_ref());
777 let branch = parsed.as_ref().and_then(|p| p.branch.clone());
778
779 if matches!(event, EventKind::Unknown) && branch.is_none() {
785 return Ok((
786 StatusCode::OK,
787 Json(WebhookResponse {
788 triggered: false,
789 run_id: None,
790 message: "payload carried no recognised ref; not a push event".to_string(),
791 }),
792 )
793 .into_response());
794 }
795
796 if let Some(want) = cfg.branch.as_deref() {
797 match branch.as_deref() {
798 Some(actual) if actual == want => {}
799 other => {
800 return Ok((
801 StatusCode::OK,
802 Json(WebhookResponse {
803 triggered: false,
804 run_id: None,
805 message: format!(
806 "branch filter rejected delivery (want `{want}`, got `{}`)",
807 other.unwrap_or("<unknown>")
808 ),
809 }),
810 )
811 .into_response());
812 }
813 }
814 }
815
816 let trigger: Arc<dyn ScanTrigger> = Arc::clone(&state.scan);
817 let run_id = trigger.trigger(ScanTriggerSource::Webhook, None, cfg.repo.clone(), None).await?;
822 Ok((
823 StatusCode::ACCEPTED,
824 Json(WebhookResponse { triggered: true, run_id: Some(run_id), message: String::new() }),
825 )
826 .into_response())
827}
828
829pub fn verify_signature(secret: &[u8], body: &[u8], header: &str) -> bool {
831 let Some(hex_sig) = header.trim().strip_prefix(SIGNATURE_PREFIX) else { return false };
832 let Ok(provided) = hex::decode(hex_sig.trim()) else { return false };
833 let Ok(mut mac) = HmacSha256::new_from_slice(secret) else { return false };
834 mac.update(body);
835 let expected = mac.finalize().into_bytes();
836 provided.as_slice().ct_eq(expected.as_slice()).into()
837}
838
839pub fn sign(secret: &[u8], body: &[u8]) -> String {
842 let mut mac = HmacSha256::new_from_slice(secret).expect("HMAC accepts any key length");
843 mac.update(body);
844 format!("{}{}", SIGNATURE_PREFIX, hex::encode(mac.finalize().into_bytes()))
845}
846
/// Unit tests for signing, header parsing, secret resolution, payload
/// extraction, deduplication, rate limiting and the concurrency cap.
#[cfg(test)]
mod tests {
    use super::*;

    // ---- signature creation and verification ----

    /// A header produced by `sign` verifies against the same secret and body.
    #[test]
    fn signature_roundtrip() {
        let secret = b"hunter2";
        let body = br#"{"ref":"refs/heads/main"}"#;
        let header = sign(secret, body);
        assert!(header.starts_with(SIGNATURE_PREFIX));
        assert!(verify_signature(secret, body, &header));
    }

    /// A signature must not verify for a different body.
    #[test]
    fn signature_rejects_modified_body() {
        let secret = b"hunter2";
        let body = br#"{"ref":"refs/heads/main"}"#;
        let header = sign(secret, body);
        assert!(!verify_signature(secret, br#"{"ref":"refs/heads/evil"}"#, &header));
    }

    /// A signature must not verify under a different secret.
    #[test]
    fn signature_rejects_wrong_secret() {
        let secret = b"hunter2";
        let body = br#"{"ref":"refs/heads/main"}"#;
        let header = sign(secret, body);
        assert!(!verify_signature(b"wrong-secret", body, &header));
    }

    /// A bare hex digest without the `sha256=` prefix is rejected.
    #[test]
    fn signature_rejects_missing_prefix() {
        let secret = b"hunter2";
        let body = b"{}";
        let mut header = sign(secret, body);
        // Cut the prefix off, leaving only the hex digest.
        header.replace_range(..SIGNATURE_PREFIX.len(), "");
        assert!(!verify_signature(secret, body, &header));
    }

    // ---- secret resolution ----

    /// An `env:` spec follows the environment variable as it is set and removed.
    #[test]
    fn env_resolver_reads_from_environment() {
        // The process id keeps the variable name unique to this test process.
        let var = format!("NYX_TEST_WEBHOOK_{}", std::process::id());
        std::env::set_var(&var, "shh");
        let resolver = EnvSecretResolver { spec: Some(format!("env:{var}")) };
        assert_eq!(resolver.resolve().as_deref(), Some(b"shh".as_slice()));
        std::env::remove_var(&var);
        assert!(resolver.resolve().is_none());
    }

    /// A spec without the `env:` prefix is used as the secret itself.
    #[test]
    fn env_resolver_passes_literal_through() {
        let resolver = EnvSecretResolver { spec: Some("literal-secret".to_string()) };
        assert_eq!(resolver.resolve().as_deref(), Some(b"literal-secret".as_slice()));
    }

    /// No spec means no secret.
    #[test]
    fn env_resolver_returns_none_when_unset() {
        let resolver = EnvSecretResolver { spec: None };
        assert!(resolver.resolve().is_none());
    }

    /// An empty literal spec is treated as "no secret".
    #[test]
    fn env_resolver_refuses_empty_literal() {
        let resolver = EnvSecretResolver { spec: Some(String::new()) };
        assert!(resolver.resolve().is_none(), "empty literal secret must not pass HMAC auth");
    }

    /// An environment variable set to the empty string is treated as "no secret".
    #[test]
    fn env_resolver_refuses_empty_env_value() {
        let var = format!("NYX_TEST_WEBHOOK_EMPTY_{}", std::process::id());
        std::env::set_var(&var, "");
        let resolver = EnvSecretResolver { spec: Some(format!("env:{var}")) };
        assert!(resolver.resolve().is_none(), "empty env-backed secret must not pass HMAC auth");
        std::env::remove_var(&var);
    }

    // ---- signature header shape ----

    /// Prefix plus exactly `SIGNATURE_HEX_LEN` hex characters is accepted.
    #[test]
    fn signature_header_shape_accepts_canonical_form() {
        let header = format!("sha256={}", "a".repeat(SIGNATURE_HEX_LEN));
        assert!(signature_header_is_well_formed(&header));
    }

    /// Upper- and lower-case hex digits are both accepted.
    #[test]
    fn signature_header_shape_accepts_mixed_case_hex() {
        let header = format!("sha256={}", "AbCdEf0123456789".repeat(4));
        assert!(signature_header_is_well_formed(&header));
    }

    /// A digest without the prefix is rejected.
    #[test]
    fn signature_header_shape_rejects_missing_prefix() {
        let header = "a".repeat(SIGNATURE_HEX_LEN);
        assert!(!signature_header_is_well_formed(&header));
    }

    /// One hex character too few is rejected.
    #[test]
    fn signature_header_shape_rejects_short_digest() {
        let header = format!("sha256={}", "a".repeat(SIGNATURE_HEX_LEN - 1));
        assert!(!signature_header_is_well_formed(&header));
    }

    /// One hex character too many is rejected.
    #[test]
    fn signature_header_shape_rejects_long_digest() {
        let header = format!("sha256={}", "a".repeat(SIGNATURE_HEX_LEN + 1));
        assert!(!signature_header_is_well_formed(&header));
    }

    /// Characters outside the hex alphabet are rejected.
    #[test]
    fn signature_header_shape_rejects_non_hex_chars() {
        let header = format!("sha256={}", "z".repeat(SIGNATURE_HEX_LEN));
        assert!(!signature_header_is_well_formed(&header));
    }

    /// Builds a `HeaderMap` from `(name, value)` pairs; panics on invalid input.
    fn map(pairs: &[(&str, &str)]) -> HeaderMap {
        let mut m = HeaderMap::new();
        for (k, v) in pairs {
            m.insert(
                axum::http::HeaderName::from_bytes(k.as_bytes()).expect("header name"),
                axum::http::HeaderValue::from_str(v).expect("header value"),
            );
        }
        m
    }

    // ---- event classification ----

    /// `X-GitHub-Event: push` is a push.
    #[test]
    fn classify_event_recognises_github_push() {
        assert_eq!(classify_event(&map(&[("X-GitHub-Event", "push")])), EventKind::Push);
    }

    /// The event name is matched without regard to case.
    #[test]
    fn classify_event_is_case_insensitive() {
        assert_eq!(classify_event(&map(&[("X-GitHub-Event", "PuSh")])), EventKind::Push);
    }

    /// `X-Gitlab-Event: Push Hook` is a push.
    #[test]
    fn classify_event_recognises_gitlab_push_hook() {
        assert_eq!(classify_event(&map(&[("X-Gitlab-Event", "Push Hook")])), EventKind::Push);
    }

    /// `ping` is classified as a ping.
    #[test]
    fn classify_event_recognises_ping() {
        assert_eq!(classify_event(&map(&[("X-GitHub-Event", "ping")])), EventKind::Ping);
    }

    /// Any other event name is returned verbatim in `Other`.
    #[test]
    fn classify_event_returns_other_for_unknown_event_name() {
        match classify_event(&map(&[("X-GitHub-Event", "issues")])) {
            EventKind::Other(name) => assert_eq!(name, "issues"),
            other => panic!("expected Other(\"issues\"), got {other:?}"),
        }
    }

    /// Without any event header the result is `Unknown`.
    #[test]
    fn classify_event_returns_unknown_when_no_provider_header() {
        assert_eq!(classify_event(&HeaderMap::new()), EventKind::Unknown);
    }

    /// An empty event header counts as absent.
    #[test]
    fn classify_event_ignores_empty_header_value() {
        assert_eq!(classify_event(&map(&[("X-GitHub-Event", "")])), EventKind::Unknown);
    }

    // ---- delivery id ----

    /// The GitHub delivery header is read.
    #[test]
    fn delivery_id_reads_github_header() {
        let id = delivery_id(&map(&[("X-GitHub-Delivery", "abc-123")]));
        assert_eq!(id.as_deref(), Some("abc-123"));
    }

    /// The Gitea delivery header is used when the GitHub one is missing.
    #[test]
    fn delivery_id_reads_gitea_header_when_github_absent() {
        let id = delivery_id(&map(&[("X-Gitea-Delivery", "xyz-789")]));
        assert_eq!(id.as_deref(), Some("xyz-789"));
    }

    /// No delivery header means no id.
    #[test]
    fn delivery_id_is_none_when_no_header() {
        assert!(delivery_id(&HeaderMap::new()).is_none());
    }

    // ---- dedup cache basics ----

    /// A first-time id is reported as new and stored.
    #[test]
    fn dedup_cache_records_new_id() {
        let mut cache = DeliveryDedupCache::new();
        assert!(cache.record("a"));
        assert_eq!(cache.len(), 1);
    }

    /// A repeated id is reported as a duplicate and not stored twice.
    #[test]
    fn dedup_cache_drops_repeat() {
        let mut cache = DeliveryDedupCache::new();
        assert!(cache.record("a"));
        assert!(!cache.record("a"), "second insert must report duplicate");
        assert_eq!(cache.len(), 1);
    }

    // ---- payload extractors ----

    /// Branch and `full_name` are read from a GitHub-style push payload.
    #[test]
    fn refheads_extractor_reads_github_push() {
        let body = br#"{"ref":"refs/heads/main","repository":{"full_name":"acme/api"}}"#;
        let parsed = RefHeadsExtractor.extract(&HeaderMap::new(), body).expect("parsed");
        assert_eq!(parsed.branch.as_deref(), Some("main"));
        assert_eq!(parsed.repo_hint.as_deref(), Some("acme/api"));
    }

    /// A tag ref parses but yields no branch.
    #[test]
    fn refheads_extractor_returns_none_branch_for_tag_push() {
        let body = br#"{"ref":"refs/tags/v1.2.3"}"#;
        let parsed = RefHeadsExtractor.extract(&HeaderMap::new(), body).expect("parsed");
        assert!(parsed.branch.is_none(), "tag pushes are not branch pushes");
    }

    /// `repository.name` is used when `full_name` is missing.
    #[test]
    fn refheads_extractor_falls_back_to_repo_name() {
        let body = br#"{"ref":"refs/heads/dev","repository":{"name":"api"}}"#;
        let parsed = RefHeadsExtractor.extract(&HeaderMap::new(), body).expect("parsed");
        assert_eq!(parsed.repo_hint.as_deref(), Some("api"));
    }

    /// A body that is not JSON yields `None`.
    #[test]
    fn refheads_extractor_returns_none_on_garbage() {
        assert!(RefHeadsExtractor.extract(&HeaderMap::new(), b"not-json").is_none());
    }

    /// The branch comes from the first change's `refId`; `slug` wins over `name`.
    #[test]
    fn bitbucket_server_extractor_reads_changes_array() {
        let body = br#"{
            "changes":[{"refId":"refs/heads/develop","type":"UPDATE"}],
            "repository":{"slug":"api","name":"Api Service"}
        }"#;
        let parsed = BitbucketServerExtractor.extract(&HeaderMap::new(), body).expect("parsed");
        assert_eq!(parsed.branch.as_deref(), Some("develop"));
        assert_eq!(parsed.repo_hint.as_deref(), Some("api"));
    }

    /// An empty `changes` array yields no branch but still a repo hint.
    #[test]
    fn bitbucket_server_extractor_returns_none_branch_when_changes_empty() {
        let body = br#"{"changes":[],"repository":{"slug":"api"}}"#;
        let parsed = BitbucketServerExtractor.extract(&HeaderMap::new(), body).expect("parsed");
        assert!(parsed.branch.is_none());
        assert_eq!(parsed.repo_hint.as_deref(), Some("api"));
    }

    /// Branch and repo are read from beneath the `event` object.
    #[test]
    fn sourcehut_extractor_reads_nested_event_refs() {
        let body =
            br#"{"event":{"refs":[{"name":"refs/heads/main"}],"repo":{"name":"~user/proj"}}}"#;
        let parsed = SourcehutExtractor.extract(&HeaderMap::new(), body).expect("parsed");
        assert_eq!(parsed.branch.as_deref(), Some("main"));
        assert_eq!(parsed.repo_hint.as_deref(), Some("~user/proj"));
    }

    /// A ref name without the `refs/heads/` prefix is kept as the branch.
    #[test]
    fn sourcehut_extractor_keeps_bare_branch_names() {
        let body = br#"{"event":{"refs":[{"name":"main"}]}}"#;
        let parsed = SourcehutExtractor.extract(&HeaderMap::new(), body).expect("parsed");
        assert_eq!(parsed.branch.as_deref(), Some("main"));
    }

    // ---- provider selection ----

    /// With no provider name the extractor reads a top-level `ref`.
    #[test]
    fn extractor_for_provider_defaults_when_missing() {
        let body = br#"{"ref":"refs/heads/main"}"#;
        let ex = extractor_for_provider(None);
        assert_eq!(
            ex.extract(&HeaderMap::new(), body).and_then(|p| p.branch).as_deref(),
            Some("main"),
        );
    }

    /// Every `refheads` alias, in any case and with padding, reads a top-level `ref`.
    #[test]
    fn extractor_for_provider_matches_known_aliases() {
        for name in ["github", "GITHUB", " gitea ", "forgejo", "gogs", "gitlab", "refheads"] {
            let ex = extractor_for_provider(Some(name));
            let body = br#"{"ref":"refs/heads/main"}"#;
            assert_eq!(
                ex.extract(&HeaderMap::new(), body).and_then(|p| p.branch).as_deref(),
                Some("main"),
                "alias `{name}` should map to RefHeadsExtractor",
            );
        }
    }

    /// `bitbucket` selects an extractor that understands `changes[].refId`.
    #[test]
    fn extractor_for_provider_picks_bitbucket() {
        let ex = extractor_for_provider(Some("bitbucket"));
        let body = br#"{"changes":[{"refId":"refs/heads/main"}]}"#;
        assert_eq!(
            ex.extract(&HeaderMap::new(), body).and_then(|p| p.branch).as_deref(),
            Some("main"),
        );
    }

    /// `sourcehut` selects an extractor that understands `event.refs[].name`.
    #[test]
    fn extractor_for_provider_picks_sourcehut() {
        let ex = extractor_for_provider(Some("sourcehut"));
        let body = br#"{"event":{"refs":[{"name":"refs/heads/main"}]}}"#;
        assert_eq!(
            ex.extract(&HeaderMap::new(), body).and_then(|p| p.branch).as_deref(),
            Some("main"),
        );
    }

    /// An unrecognised provider name behaves like the default extractor.
    #[test]
    fn extractor_for_provider_falls_back_on_unknown() {
        let ex = extractor_for_provider(Some("notarealthing"));
        let body = br#"{"ref":"refs/heads/main"}"#;
        assert_eq!(
            ex.extract(&HeaderMap::new(), body).and_then(|p| p.branch).as_deref(),
            Some("main"),
        );
    }

    // ---- rate limiter ----

    /// With no refill, exactly `capacity` requests are admitted.
    #[test]
    fn rate_limiter_admits_until_bucket_empty() {
        let limiter = WebhookRateLimiter::new(3, 0.0, 16);
        let ip: IpAddr = "127.0.0.1".parse().unwrap();
        assert!(limiter.admit(ip));
        assert!(limiter.admit(ip));
        assert!(limiter.admit(ip));
        assert!(!limiter.admit(ip), "fourth request must be refused");
    }

    /// Tokens come back at the refill rate and never exceed the capacity.
    #[test]
    fn rate_limiter_refills_over_time() {
        // Capacity 2, one token per second.
        let limiter = WebhookRateLimiter::new(2, 1.0, 16);
        let ip: IpAddr = "10.0.0.5".parse().unwrap();
        let t0 = Instant::now();
        assert!(limiter.admit_at(ip, t0));
        assert!(limiter.admit_at(ip, t0));
        assert!(!limiter.admit_at(ip, t0));
        // One second later exactly one token has been refilled.
        let t1 = t0 + std::time::Duration::from_secs(1);
        assert!(limiter.admit_at(ip, t1));
        assert!(!limiter.admit_at(ip, t1));
        // Five more seconds would earn five tokens, but the bucket caps at two.
        let t6 = t1 + std::time::Duration::from_secs(5);
        assert!(limiter.admit_at(ip, t6));
        assert!(limiter.admit_at(ip, t6));
        assert!(!limiter.admit_at(ip, t6));
    }

    /// Draining one IP's bucket does not affect another IP.
    #[test]
    fn rate_limiter_per_ip_buckets_are_independent() {
        let limiter = WebhookRateLimiter::new(1, 0.0, 16);
        let a: IpAddr = "127.0.0.1".parse().unwrap();
        let b: IpAddr = "127.0.0.2".parse().unwrap();
        assert!(limiter.admit(a));
        assert!(!limiter.admit(a));
        // `b` still has its own full bucket.
        assert!(limiter.admit(b));
    }

    /// `per_minute(60, ..)` admits 60 requests at a single instant, then refuses.
    #[test]
    fn rate_limiter_per_minute_helper_matches_rate() {
        let limiter = WebhookRateLimiter::per_minute(60, 64);
        let ip: IpAddr = "127.0.0.1".parse().unwrap();
        let t0 = Instant::now();
        for _ in 0..60 {
            assert!(limiter.admit_at(ip, t0));
        }
        assert!(!limiter.admit_at(ip, t0));
    }

    /// A new IP arriving at the cap is admitted and the map stays at the cap.
    #[test]
    fn rate_limiter_evicts_oldest_ip_at_cap() {
        let limiter = WebhookRateLimiter::new(1, 0.0, 2);
        let t0 = Instant::now();
        let a: IpAddr = "127.0.0.1".parse().unwrap();
        let b: IpAddr = "127.0.0.2".parse().unwrap();
        let c: IpAddr = "127.0.0.3".parse().unwrap();
        assert!(limiter.admit_at(a, t0));
        assert!(limiter.admit_at(b, t0 + std::time::Duration::from_secs(1)));
        assert!(limiter.admit_at(c, t0 + std::time::Duration::from_secs(2)));
        assert_eq!(limiter.tracked_ips(), 2);
    }

    // ---- concurrency limit ----

    /// Acquisition fails at the cap and succeeds again after a permit is dropped.
    #[test]
    fn concurrency_limit_refuses_past_cap() {
        let limit = WebhookConcurrencyLimit::new(2);
        let p1 = limit.try_acquire().expect("first permit");
        let p2 = limit.try_acquire().expect("second permit");
        assert!(limit.try_acquire().is_none(), "third acquire must fail when cap is reached");
        drop(p1);
        assert!(limit.try_acquire().is_some(), "releasing a permit must make one available again");
        drop(p2);
    }

    /// A requested cap of zero is raised to one.
    #[test]
    fn concurrency_limit_floor_is_one() {
        let limit = WebhookConcurrencyLimit::new(0);
        assert_eq!(limit.permits(), 1);
        assert!(limit.try_acquire().is_some());
    }

    // ---- dedup cache eviction ----

    /// At the cap the oldest id is forgotten, so it is accepted as new again.
    #[test]
    fn dedup_cache_evicts_oldest_at_cap() {
        let mut cache = DeliveryDedupCache::new();
        for i in 0..DELIVERY_DEDUP_CAP {
            assert!(cache.record(&format!("d-{i}")));
        }
        assert_eq!(cache.len(), DELIVERY_DEDUP_CAP);
        assert!(cache.record("d-new"));
        // The size stays at the cap because `d-0` was evicted to make room.
        assert_eq!(cache.len(), DELIVERY_DEDUP_CAP);
        assert!(cache.record("d-0"));
    }
}