1use std::cell::RefCell;
31use std::collections::{BTreeMap, HashMap};
32use std::sync::Arc;
33use std::time::Duration as StdDuration;
34
35use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
36use base64::Engine;
37use serde::{Deserialize, Serialize};
38use subtle::ConstantTimeEq;
39use time::format_description::well_known::Rfc3339;
40use time::OffsetDateTime;
41use uuid::Uuid;
42
43use crate::connectors::MetricsRegistry;
44use crate::event_log::{
45 active_event_log, install_memory_for_current_thread, EventLog, LogEvent, Topic,
46};
47use crate::runtime_limits::RuntimeLimits;
48use crate::triggers::inbox::InboxIndex;
49
/// Twenty-four hours expressed in seconds.
///
/// NOTE(review): presumably the default for [`WebhookIntakeConfig::dedupe_ttl`]; nothing in
/// this file reads it, so confirm against callers.
pub const DEFAULT_DEDUPE_TTL_SECS: u64 = 24 * 60 * 60;
/// Event-log topic that receives a `webhook_intake_rejected` event for each rejected delivery.
pub const REJECTION_TOPIC: &str = "triggers.webhook_intake.rejections";
/// Queue depth passed to `install_memory_for_current_thread` when no event log is active.
const INTAKE_EVENT_LOG_QUEUE_DEPTH: usize = RuntimeLimits::DEFAULT.default_event_log_queue_depth;
56
/// Identifier of a webhook intake; a thin newtype around the raw id string.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct WebhookIntakeId(pub String);
61
62impl WebhookIntakeId {
63 pub fn new() -> Self {
64 Self(format!("intake_{}", Uuid::now_v7()))
65 }
66
67 pub fn as_str(&self) -> &str {
68 &self.0
69 }
70}
71
72impl Default for WebhookIntakeId {
73 fn default() -> Self {
74 Self::new()
75 }
76}
77
/// HMAC digest used to verify a delivery's signature.
///
/// Serialized in `snake_case`; `Sha256` is the default.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum HmacAlgorithm {
    /// HMAC-SHA256 (default).
    #[default]
    Sha256,
    /// HMAC-SHA1; registration rejects it unless `allow_legacy_sha1` is set on the config.
    Sha1,
}
86
87impl HmacAlgorithm {
88 pub fn parse(raw: &str) -> Option<Self> {
89 Self::parse_with_legacy_sha1(raw, false)
90 }
91
92 pub fn parse_with_legacy_sha1(raw: &str, allow_legacy_sha1: bool) -> Option<Self> {
93 match raw.trim().to_ascii_lowercase().as_str() {
94 "" | "sha256" | "hmac-sha256" => Some(Self::Sha256),
95 "sha1" | "hmac-sha1" if allow_legacy_sha1 => Some(Self::Sha1),
96 _ => None,
97 }
98 }
99
100 pub fn is_legacy_sha1_alias(raw: &str) -> bool {
101 matches!(
102 raw.trim().to_ascii_lowercase().as_str(),
103 "sha1" | "hmac-sha1"
104 )
105 }
106
107 pub fn as_str(&self) -> &'static str {
108 match self {
109 Self::Sha256 => "sha256",
110 Self::Sha1 => "sha1",
111 }
112 }
113}
114
/// Text encoding of the signature carried in the signature header.
///
/// Serialized in `snake_case`; `Hex` is the default.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SignatureEncoding {
    /// Hexadecimal digits (default).
    #[default]
    Hex,
    /// Standard-alphabet base64.
    Base64,
}
123
124impl SignatureEncoding {
125 pub fn parse(raw: &str) -> Option<Self> {
126 match raw.trim().to_ascii_lowercase().as_str() {
127 "" | "hex" | "base16" => Some(Self::Hex),
128 "base64" | "b64" => Some(Self::Base64),
129 _ => None,
130 }
131 }
132
133 pub fn as_str(&self) -> &'static str {
134 match self {
135 Self::Hex => "hex",
136 Self::Base64 => "base64",
137 }
138 }
139}
140
/// Settings supplied when registering a webhook intake.
#[derive(Clone, Debug)]
pub struct WebhookIntakeConfig {
    /// Explicit intake id; when `None` or blank, registration generates one.
    pub id: Option<String>,
    /// Optional path bound to this intake; a request that carries a different path is rejected.
    pub path: Option<String>,
    /// Name of the header carrying the signature (trimmed and lowercased on registration).
    pub signature_header: String,
    /// Prefix that must precede the encoded signature; `None` or empty means no prefix.
    pub signature_prefix: Option<String>,
    /// Encoding of the signature value after the prefix is removed.
    pub signature_encoding: SignatureEncoding,
    /// HMAC algorithm used to compute the expected signature.
    pub algorithm: HmacAlgorithm,
    /// Must be `true` for an intake using `HmacAlgorithm::Sha1` to register.
    pub allow_legacy_sha1: bool,
    /// HMAC key; must not be empty.
    pub secret: Vec<u8>,
    /// Name of the header carrying the delivery id used for deduplication.
    pub delivery_id_header: String,
    /// Event-log topic that accepted deliveries are appended to.
    pub topic: String,
    /// TTL handed to the inbox when claiming a delivery id.
    pub dedupe_ttl: StdDuration,
}
156
157impl WebhookIntakeConfig {
158 pub fn default_prefix(algorithm: HmacAlgorithm) -> Option<String> {
162 Some(format!("{}=", algorithm.as_str()))
163 }
164}
165
/// Result category of feeding one delivery into an intake.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum WebhookIntakeStatus {
    /// The delivery was verified and appended to the intake's topic.
    Accepted,
    /// The delivery id had already been claimed for this intake.
    Duplicate,
    /// The delivery failed a check (path, signature, or delivery id header).
    Rejected,
}
173
174impl WebhookIntakeStatus {
175 pub fn as_str(&self) -> &'static str {
176 match self {
177 Self::Accepted => "accepted",
178 Self::Duplicate => "duplicate",
179 Self::Rejected => "rejected",
180 }
181 }
182}
183
/// Serializable report of what happened to one fed delivery.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct WebhookIntakeOutcome {
    /// One of the [`WebhookIntakeStatus::as_str`] labels.
    pub status: String,
    /// Id of the intake that handled the delivery.
    pub intake_id: String,
    /// Topic configured for that intake.
    pub topic: String,
    /// Delivery id from the request; `None` when the delivery was rejected.
    pub delivery_id: Option<String>,
    /// Id returned by the topic append; set only for accepted deliveries.
    pub topic_event_id: Option<u64>,
    /// Why the delivery was rejected; `None` otherwise.
    pub reason: Option<String>,
    /// Receive time formatted as RFC 3339.
    pub received_at: String,
}
195
/// Serializable view of a registered intake's settings.
///
/// Mirrors [`WebhookIntakeConfig`] with enums rendered as strings; it has no field for the
/// secret.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct WebhookIntakeSnapshot {
    /// Resolved intake id (explicit or generated).
    pub id: String,
    /// Path bound to the intake, if any.
    pub path: Option<String>,
    /// Topic accepted deliveries are appended to.
    pub topic: String,
    /// Normalized (trimmed, lowercased) signature header name.
    pub signature_header: String,
    /// Expected signature prefix, if any.
    pub signature_prefix: Option<String>,
    /// [`SignatureEncoding::as_str`] label.
    pub signature_encoding: String,
    /// [`HmacAlgorithm::as_str`] label.
    pub algorithm: String,
    /// Whether legacy SHA-1 was opted into.
    pub allow_legacy_sha1: bool,
    /// Normalized (trimmed, lowercased) delivery id header name.
    pub delivery_id_header: String,
    /// Dedupe TTL in whole seconds.
    pub dedupe_ttl_seconds: u64,
}
210
/// One inbound delivery to feed into an intake.
#[derive(Clone, Debug, Default)]
pub struct WebhookIntakeRequest {
    /// Request headers; names are matched ASCII case-insensitively.
    pub headers: BTreeMap<String, String>,
    /// Raw request body; the HMAC is computed over these bytes.
    pub body: Vec<u8>,
    /// Path the request arrived on; only checked when the intake also has a path.
    pub path: Option<String>,
    /// Receive time; the current UTC time is used when `None`.
    pub received_at: Option<OffsetDateTime>,
}
219
/// Errors returned by the intake registry and feed functions.
#[derive(Debug)]
pub enum WebhookIntakeError {
    /// The supplied configuration failed validation or conflicts with an existing intake.
    Config(String),
    /// No intake is registered under the given id.
    UnknownIntake(String),
    /// The event log or inbox reported a failure.
    Internal(String),
}
226
227impl std::fmt::Display for WebhookIntakeError {
228 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
229 match self {
230 Self::Config(detail) => write!(f, "intake config: {detail}"),
231 Self::UnknownIntake(id) => write!(f, "intake `{id}` is not registered"),
232 Self::Internal(detail) => write!(f, "intake substrate: {detail}"),
233 }
234 }
235}
236
// Empty impl: the trait's default methods are used as-is (no `source` is exposed).
impl std::error::Error for WebhookIntakeError {}
238
/// Registry entry pairing the public snapshot with the stored configuration.
#[derive(Clone)]
struct RegisteredIntake {
    /// Serializable view returned to callers.
    snapshot: WebhookIntakeSnapshot,
    /// Stored config with the resolved id and normalized header names.
    config: WebhookIntakeConfig,
}
244
/// In-memory index of registered intakes.
#[derive(Default)]
struct WebhookIntakeRegistry {
    /// Intake id -> registered entry.
    by_id: HashMap<String, RegisteredIntake>,
    /// Bound path -> id of the intake that owns it.
    by_path: HashMap<String, String>,
}
250
thread_local! {
    // Each thread gets its own registry: intakes registered on one thread are not visible
    // from another.
    static REGISTRY: RefCell<WebhookIntakeRegistry> =
        RefCell::new(WebhookIntakeRegistry::default());
}
255
256fn with_registry<R>(f: impl FnOnce(&WebhookIntakeRegistry) -> R) -> R {
257 REGISTRY.with(|slot| f(&slot.borrow()))
258}
259
260fn with_registry_mut<R>(f: impl FnOnce(&mut WebhookIntakeRegistry) -> R) -> R {
261 REGISTRY.with(|slot| f(&mut slot.borrow_mut()))
262}
263
264pub fn clear_webhook_intake_state() {
266 with_registry_mut(|state| {
267 state.by_id.clear();
268 state.by_path.clear();
269 });
270 reset_cached_inbox();
271}
272
273pub fn snapshot_webhook_intakes() -> Vec<WebhookIntakeSnapshot> {
275 with_registry(|state| {
276 let mut out: Vec<_> = state
277 .by_id
278 .values()
279 .map(|entry| entry.snapshot.clone())
280 .collect();
281 out.sort_by(|left, right| left.id.cmp(&right.id));
282 out
283 })
284}
285
286pub fn intake_for_path(path: &str) -> Option<String> {
288 with_registry(|state| state.by_path.get(path).cloned())
289}
290
291pub fn register_webhook_intake(
293 config: WebhookIntakeConfig,
294) -> Result<WebhookIntakeSnapshot, WebhookIntakeError> {
295 if config.signature_header.trim().is_empty() {
296 return Err(WebhookIntakeError::Config(
297 "signature_header cannot be empty".to_string(),
298 ));
299 }
300 if config.delivery_id_header.trim().is_empty() {
301 return Err(WebhookIntakeError::Config(
302 "delivery_id_header cannot be empty".to_string(),
303 ));
304 }
305 if config.topic.trim().is_empty() {
306 return Err(WebhookIntakeError::Config(
307 "topic cannot be empty".to_string(),
308 ));
309 }
310 if config.secret.is_empty() {
311 return Err(WebhookIntakeError::Config(
312 "secret cannot be empty".to_string(),
313 ));
314 }
315 if config.algorithm == HmacAlgorithm::Sha1 && !config.allow_legacy_sha1 {
316 return Err(WebhookIntakeError::Config(
317 "algorithm `sha1` is legacy; set `allow_legacy_sha1: true` to verify SHA-1 HMAC signatures for an existing provider".to_string(),
318 ));
319 }
320 Topic::new(config.topic.clone())
321 .map_err(|error| WebhookIntakeError::Config(format!("invalid topic: {error}")))?;
322
323 let id = config
324 .id
325 .clone()
326 .filter(|raw| !raw.trim().is_empty())
327 .unwrap_or_else(|| WebhookIntakeId::new().0);
328
329 with_registry_mut(|state| {
330 if state.by_id.contains_key(&id) {
331 return Err(WebhookIntakeError::Config(format!(
332 "intake `{id}` is already registered"
333 )));
334 }
335 if let Some(path) = config.path.as_ref() {
336 if state.by_path.contains_key(path) {
337 return Err(WebhookIntakeError::Config(format!(
338 "path `{path}` is already bound to intake `{}`",
339 state.by_path[path]
340 )));
341 }
342 }
343
344 let snapshot = WebhookIntakeSnapshot {
345 id: id.clone(),
346 path: config.path.clone(),
347 topic: config.topic.clone(),
348 signature_header: normalize_header(&config.signature_header),
349 signature_prefix: config.signature_prefix.clone(),
350 signature_encoding: config.signature_encoding.as_str().to_string(),
351 algorithm: config.algorithm.as_str().to_string(),
352 allow_legacy_sha1: config.allow_legacy_sha1,
353 delivery_id_header: normalize_header(&config.delivery_id_header),
354 dedupe_ttl_seconds: config.dedupe_ttl.as_secs(),
355 };
356
357 let mut stored = config;
358 stored.id = Some(id.clone());
359 stored.signature_header = normalize_header(&stored.signature_header);
360 stored.delivery_id_header = normalize_header(&stored.delivery_id_header);
361
362 if let Some(path) = stored.path.as_ref() {
363 state.by_path.insert(path.clone(), id.clone());
364 }
365 state.by_id.insert(
366 id,
367 RegisteredIntake {
368 snapshot: snapshot.clone(),
369 config: stored,
370 },
371 );
372 Ok(snapshot)
373 })
374}
375
376pub fn deregister_webhook_intake(intake_id: &str) -> bool {
378 with_registry_mut(|state| {
379 let Some(entry) = state.by_id.remove(intake_id) else {
380 return false;
381 };
382 if let Some(path) = entry.snapshot.path {
383 state.by_path.remove(&path);
384 }
385 true
386 })
387}
388
389pub async fn feed_webhook_intake(
396 intake_id: &str,
397 request: WebhookIntakeRequest,
398) -> Result<WebhookIntakeOutcome, WebhookIntakeError> {
399 let entry = with_registry(|state| state.by_id.get(intake_id).cloned())
400 .ok_or_else(|| WebhookIntakeError::UnknownIntake(intake_id.to_string()))?;
401 let received_at = request.received_at.unwrap_or_else(OffsetDateTime::now_utc);
402 let received_at_str = format_rfc3339(received_at);
403 let log = ensure_event_log();
404 let topic = Topic::new(entry.snapshot.topic.clone())
405 .map_err(|error| WebhookIntakeError::Internal(format!("invalid topic: {error}")))?;
406
407 if let Some(expected_path) = entry.config.path.as_deref() {
408 if let Some(actual_path) = request.path.as_deref() {
409 if actual_path != expected_path {
410 let reason =
411 format!("path mismatch: expected `{expected_path}`, got `{actual_path}`");
412 emit_rejection(&log, &entry.snapshot, &reason, &received_at_str).await;
413 return Ok(WebhookIntakeOutcome {
414 status: WebhookIntakeStatus::Rejected.as_str().to_string(),
415 intake_id: entry.snapshot.id.clone(),
416 topic: entry.snapshot.topic.clone(),
417 delivery_id: None,
418 topic_event_id: None,
419 reason: Some(reason),
420 received_at: received_at_str,
421 });
422 }
423 }
424 }
425
426 let signature = match find_header(&request.headers, &entry.config.signature_header) {
427 Some(value) => value,
428 None => {
429 let reason = format!(
430 "missing signature header `{}`",
431 entry.config.signature_header
432 );
433 emit_rejection(&log, &entry.snapshot, &reason, &received_at_str).await;
434 return Ok(WebhookIntakeOutcome {
435 status: WebhookIntakeStatus::Rejected.as_str().to_string(),
436 intake_id: entry.snapshot.id.clone(),
437 topic: entry.snapshot.topic.clone(),
438 delivery_id: None,
439 topic_event_id: None,
440 reason: Some(reason),
441 received_at: received_at_str,
442 });
443 }
444 };
445
446 if let Err(reason) = verify_signature(&entry.config, signature.as_str(), &request.body) {
447 emit_rejection(&log, &entry.snapshot, &reason, &received_at_str).await;
448 return Ok(WebhookIntakeOutcome {
449 status: WebhookIntakeStatus::Rejected.as_str().to_string(),
450 intake_id: entry.snapshot.id.clone(),
451 topic: entry.snapshot.topic.clone(),
452 delivery_id: None,
453 topic_event_id: None,
454 reason: Some(reason),
455 received_at: received_at_str,
456 });
457 }
458
459 let delivery_id = match find_header(&request.headers, &entry.config.delivery_id_header) {
460 Some(value) if !value.trim().is_empty() => value,
461 _ => {
462 let reason = format!(
463 "missing delivery id header `{}`",
464 entry.config.delivery_id_header
465 );
466 emit_rejection(&log, &entry.snapshot, &reason, &received_at_str).await;
467 return Ok(WebhookIntakeOutcome {
468 status: WebhookIntakeStatus::Rejected.as_str().to_string(),
469 intake_id: entry.snapshot.id.clone(),
470 topic: entry.snapshot.topic.clone(),
471 delivery_id: None,
472 topic_event_id: None,
473 reason: Some(reason),
474 received_at: received_at_str,
475 });
476 }
477 };
478
479 let inbox = ensure_inbox().await?;
480 let claim_key = format!("intake:{}", entry.snapshot.id);
481 let claimed = inbox
482 .insert_if_new(&claim_key, &delivery_id, entry.config.dedupe_ttl)
483 .await
484 .map_err(|error| WebhookIntakeError::Internal(format!("inbox claim: {error}")))?;
485 if !claimed {
486 return Ok(WebhookIntakeOutcome {
487 status: WebhookIntakeStatus::Duplicate.as_str().to_string(),
488 intake_id: entry.snapshot.id.clone(),
489 topic: entry.snapshot.topic.clone(),
490 delivery_id: Some(delivery_id),
491 topic_event_id: None,
492 reason: None,
493 received_at: received_at_str,
494 });
495 }
496
497 let payload = serde_json::json!({
498 "intake_id": entry.snapshot.id,
499 "delivery_id": delivery_id,
500 "received_at": received_at_str,
501 "headers": request.headers,
502 "body_b64": BASE64_STANDARD.encode(&request.body),
503 "body_text": std::str::from_utf8(&request.body).ok().map(str::to_string),
504 "path": entry.config.path,
505 "signature_header": entry.config.signature_header,
506 "delivery_id_header": entry.config.delivery_id_header,
507 "algorithm": entry.config.algorithm.as_str(),
508 });
509 let mut headers_meta = BTreeMap::new();
510 headers_meta.insert("intake_id".to_string(), entry.snapshot.id.clone());
511 headers_meta.insert("delivery_id".to_string(), delivery_id.clone());
512 let event_id = log
513 .append(
514 &topic,
515 LogEvent::new("webhook_delivery", payload).with_headers(headers_meta),
516 )
517 .await
518 .map_err(|error| WebhookIntakeError::Internal(format!("topic append: {error}")))?;
519
520 Ok(WebhookIntakeOutcome {
521 status: WebhookIntakeStatus::Accepted.as_str().to_string(),
522 intake_id: entry.snapshot.id.clone(),
523 topic: entry.snapshot.topic.clone(),
524 delivery_id: Some(delivery_id),
525 topic_event_id: Some(event_id),
526 reason: None,
527 received_at: received_at_str,
528 })
529}
530
531pub async fn recent_webhook_deliveries(
534 intake_id: &str,
535 limit: usize,
536) -> Result<Vec<serde_json::Value>, WebhookIntakeError> {
537 let topic_name = with_registry(|state| {
538 state
539 .by_id
540 .get(intake_id)
541 .map(|entry| entry.snapshot.topic.clone())
542 })
543 .ok_or_else(|| WebhookIntakeError::UnknownIntake(intake_id.to_string()))?;
544 let log = ensure_event_log();
545 let topic = Topic::new(topic_name)
546 .map_err(|error| WebhookIntakeError::Internal(format!("invalid topic: {error}")))?;
547 let events = log
548 .read_range(&topic, None, usize::MAX)
549 .await
550 .map_err(|error| WebhookIntakeError::Internal(format!("topic read: {error}")))?;
551 let mut payloads: Vec<serde_json::Value> = events
552 .into_iter()
553 .filter(|(_, event)| {
554 event.kind == "webhook_delivery"
555 && event
556 .headers
557 .get("intake_id")
558 .map(|value| value == intake_id)
559 .unwrap_or(true)
560 })
561 .map(|(_, event)| event.payload)
562 .collect();
563 if payloads.len() > limit {
564 let drop = payloads.len() - limit;
565 payloads.drain(0..drop);
566 }
567 Ok(payloads)
568}
569
570fn verify_signature(
571 config: &WebhookIntakeConfig,
572 raw_signature: &str,
573 body: &[u8],
574) -> Result<(), String> {
575 let stripped = strip_prefix(raw_signature, config.signature_prefix.as_deref())?;
576 let provided = decode_signature(stripped, config.signature_encoding)?;
577 let expected = compute_hmac(config.algorithm, &config.secret, body);
578 if provided.len() != expected.len() {
579 return Err(format!(
580 "signature length mismatch (expected {} bytes, got {})",
581 expected.len(),
582 provided.len()
583 ));
584 }
585 if expected.ct_eq(&provided).into() {
586 Ok(())
587 } else {
588 Err("signature mismatch".to_string())
589 }
590}
591
/// Trims `raw` and removes the expected `prefix` from its start.
///
/// A `None` or empty prefix means nothing is required, so the trimmed value is returned
/// unchanged. Otherwise the prefix must be present (case-sensitive) or an error message
/// naming it is returned.
fn strip_prefix<'a>(raw: &'a str, prefix: Option<&str>) -> Result<&'a str, String> {
    let candidate = raw.trim();
    let Some(expected) = prefix.filter(|required| !required.is_empty()) else {
        return Ok(candidate);
    };
    match candidate.strip_prefix(expected) {
        Some(rest) => Ok(rest),
        None => Err(format!("signature missing expected prefix `{expected}`")),
    }
}
601
602fn decode_signature(raw: &str, encoding: SignatureEncoding) -> Result<Vec<u8>, String> {
603 match encoding {
604 SignatureEncoding::Hex => hex::decode(raw).map_err(|error| format!("invalid hex: {error}")),
605 SignatureEncoding::Base64 => BASE64_STANDARD
606 .decode(raw)
607 .map_err(|error| format!("invalid base64: {error}")),
608 }
609}
610
611fn compute_hmac(algorithm: HmacAlgorithm, secret: &[u8], data: &[u8]) -> Vec<u8> {
612 match algorithm {
613 HmacAlgorithm::Sha256 => crate::connectors::hmac::hmac_sha256(secret, data),
614 HmacAlgorithm::Sha1 => crate::connectors::hmac::hmac_sha1(secret, data),
615 }
616}
617
/// Looks up a header by name, ignoring ASCII case, and returns a copy of its value.
///
/// When several keys differ only by case, the first one in the map's key order wins.
/// Names are compared with `eq_ignore_ascii_case`, so no lowercase copy of each key is
/// allocated during the scan.
fn find_header(headers: &BTreeMap<String, String>, name: &str) -> Option<String> {
    headers
        .iter()
        .find(|(key, _)| key.eq_ignore_ascii_case(name))
        .map(|(_, value)| value.clone())
}
625
/// Canonical form of a header name: surrounding whitespace removed, ASCII letters lowercased.
fn normalize_header(name: &str) -> String {
    let mut normalized = String::from(name.trim());
    normalized.make_ascii_lowercase();
    normalized
}
629
630fn format_rfc3339(value: OffsetDateTime) -> String {
631 value.format(&Rfc3339).unwrap_or_default()
632}
633
634fn ensure_event_log() -> Arc<crate::event_log::AnyEventLog> {
635 active_event_log()
636 .unwrap_or_else(|| install_memory_for_current_thread(INTAKE_EVENT_LOG_QUEUE_DEPTH))
637}
638
thread_local! {
    // Per-thread cache of the inbox index, stored together with the event log it was built
    // on so `ensure_inbox` can detect when the active log has been replaced.
    static CACHED_INBOX: RefCell<Option<(Arc<crate::event_log::AnyEventLog>, Arc<InboxIndex>)>> =
        const { RefCell::new(None) };
}
643
644async fn ensure_inbox() -> Result<Arc<InboxIndex>, WebhookIntakeError> {
645 let log = ensure_event_log();
646 if let Some(existing) = CACHED_INBOX.with(|slot| {
647 slot.borrow()
648 .as_ref()
649 .filter(|(cached_log, _)| Arc::ptr_eq(cached_log, &log))
650 .map(|(_, inbox)| inbox.clone())
651 }) {
652 return Ok(existing);
653 }
654 let metrics = Arc::new(MetricsRegistry::default());
655 let inbox = Arc::new(
656 InboxIndex::new(log.clone(), metrics)
657 .await
658 .map_err(|error| WebhookIntakeError::Internal(format!("inbox init: {error}")))?,
659 );
660 CACHED_INBOX.with(|slot| {
661 *slot.borrow_mut() = Some((log, inbox.clone()));
662 });
663 Ok(inbox)
664}
665
666fn reset_cached_inbox() {
667 CACHED_INBOX.with(|slot| *slot.borrow_mut() = None);
668}
669
670async fn emit_rejection(
671 log: &Arc<crate::event_log::AnyEventLog>,
672 snapshot: &WebhookIntakeSnapshot,
673 reason: &str,
674 received_at: &str,
675) {
676 let topic = match Topic::new(REJECTION_TOPIC) {
677 Ok(topic) => topic,
678 Err(_) => return,
679 };
680 let mut headers = BTreeMap::new();
681 headers.insert("intake_id".to_string(), snapshot.id.clone());
682 let payload = serde_json::json!({
683 "intake_id": snapshot.id,
684 "topic": snapshot.topic,
685 "path": snapshot.path,
686 "reason": reason,
687 "received_at": received_at,
688 });
689 let _ = log
690 .append(
691 &topic,
692 LogEvent::new("webhook_intake_rejected", payload).with_headers(headers),
693 )
694 .await;
695}
696
/// Assembles a [`WebhookIntakeRequest`] from its four parts without any validation.
pub fn build_request(
    headers: BTreeMap<String, String>,
    body: Vec<u8>,
    path: Option<String>,
    received_at: Option<OffsetDateTime>,
) -> WebhookIntakeRequest {
    WebhookIntakeRequest {
        headers,
        body,
        path,
        received_at,
    }
}
713
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event_log::{install_default_for_base_dir, AnyEventLog, FileEventLog};

    /// Baseline SHA-256 / hex intake config with no explicit id or path.
    fn make_config(secret: &[u8]) -> WebhookIntakeConfig {
        WebhookIntakeConfig {
            id: None,
            path: None,
            signature_header: "x-test-signature".to_string(),
            signature_prefix: Some("sha256=".to_string()),
            signature_encoding: SignatureEncoding::Hex,
            algorithm: HmacAlgorithm::Sha256,
            allow_legacy_sha1: false,
            secret: secret.to_vec(),
            delivery_id_header: "x-test-delivery".to_string(),
            topic: "tests.webhook_intake".to_string(),
            dedupe_ttl: StdDuration::from_secs(60),
        }
    }

    /// Builds headers carrying a valid signature for `body` plus the given delivery id.
    fn signed_headers(
        config: &WebhookIntakeConfig,
        body: &[u8],
        delivery_id: &str,
    ) -> BTreeMap<String, String> {
        let digest = compute_hmac(config.algorithm, &config.secret, body);
        let signature = match config.signature_encoding {
            SignatureEncoding::Hex => hex::encode(&digest),
            SignatureEncoding::Base64 => BASE64_STANDARD.encode(&digest),
        };
        let prefix = config.signature_prefix.clone().unwrap_or_default();
        let mut headers = BTreeMap::new();
        headers.insert(
            config.signature_header.clone(),
            format!("{prefix}{signature}"),
        );
        headers.insert(config.delivery_id_header.clone(), delivery_id.to_string());
        headers
    }

    /// Clears the registry, the cached inbox and the active event log so each test starts
    /// from a clean slate. It performs no awaits itself.
    async fn reset() {
        clear_webhook_intake_state();
        crate::event_log::reset_active_event_log();
    }

    /// A correctly signed delivery is accepted once; replaying it is reported as a duplicate.
    #[tokio::test(flavor = "current_thread")]
    async fn round_trip_accepts_then_dedupes() {
        reset().await;
        let body = br#"{"hello":"world"}"#.to_vec();
        let config = make_config(b"super-secret");
        let snapshot = register_webhook_intake(config.clone()).expect("register");

        let headers = signed_headers(&config, &body, "delivery-1");
        let outcome = feed_webhook_intake(
            snapshot.id.as_str(),
            build_request(headers.clone(), body.clone(), None, None),
        )
        .await
        .expect("feed");
        assert_eq!(outcome.status, "accepted");
        assert_eq!(outcome.delivery_id.as_deref(), Some("delivery-1"));
        assert!(outcome.topic_event_id.is_some());

        let dup = feed_webhook_intake(
            snapshot.id.as_str(),
            build_request(headers, body, None, None),
        )
        .await
        .expect("feed dup");
        assert_eq!(dup.status, "duplicate");
    }

    /// A signature that does not match the body is rejected with a signature-related reason.
    #[tokio::test(flavor = "current_thread")]
    async fn rejects_bad_signature() {
        reset().await;
        let body = b"payload".to_vec();
        let config = make_config(b"correct-secret");
        let snapshot = register_webhook_intake(config.clone()).expect("register");

        let mut headers = signed_headers(&config, &body, "delivery-1");
        // Overwrite the valid signature with a bogus one.
        headers.insert(
            config.signature_header.clone(),
            "sha256=deadbeef".to_string(),
        );
        let outcome = feed_webhook_intake(
            snapshot.id.as_str(),
            build_request(headers, body, None, None),
        )
        .await
        .expect("feed");
        assert_eq!(outcome.status, "rejected");
        assert!(outcome
            .reason
            .as_deref()
            .unwrap_or("")
            .contains("signature"));
    }

    /// Two intakes on different paths dedupe independently, and a request on the wrong path
    /// is rejected.
    #[tokio::test(flavor = "current_thread")]
    async fn isolates_two_intakes_on_different_paths() {
        reset().await;
        let body = b"shared-body".to_vec();
        let mut config_a = make_config(b"secret-a");
        config_a.path = Some("/hooks/a".to_string());
        config_a.topic = "tests.intake_a".to_string();
        config_a.delivery_id_header = "x-a-delivery".to_string();

        let mut config_b = make_config(b"secret-b");
        config_b.path = Some("/hooks/b".to_string());
        config_b.topic = "tests.intake_b".to_string();
        config_b.delivery_id_header = "x-b-delivery".to_string();

        let snap_a = register_webhook_intake(config_a.clone()).expect("register a");
        let snap_b = register_webhook_intake(config_b.clone()).expect("register b");

        // Both intakes receive the same delivery id; neither may see the other's claim.
        let headers_a = signed_headers(&config_a, &body, "shared-delivery");
        let outcome_a = feed_webhook_intake(
            snap_a.id.as_str(),
            build_request(headers_a, body.clone(), Some("/hooks/a".to_string()), None),
        )
        .await
        .expect("feed a");
        assert_eq!(outcome_a.status, "accepted");

        let headers_b = signed_headers(&config_b, &body, "shared-delivery");
        let outcome_b = feed_webhook_intake(
            snap_b.id.as_str(),
            build_request(headers_b, body, Some("/hooks/b".to_string()), None),
        )
        .await
        .expect("feed b");
        assert_eq!(outcome_b.status, "accepted");

        // A validly signed request for intake A arriving on an unrelated path is rejected.
        let body = b"more".to_vec();
        let headers = signed_headers(&config_a, &body, "another-delivery");
        let outcome = feed_webhook_intake(
            snap_a.id.as_str(),
            build_request(headers, body, Some("/hooks/wrong".to_string()), None),
        )
        .await
        .expect("feed mismatch");
        assert_eq!(outcome.status, "rejected");
        assert!(outcome.reason.unwrap().contains("path mismatch"));
    }

    /// SHA-1 works end to end once `allow_legacy_sha1` is set.
    #[tokio::test(flavor = "current_thread")]
    async fn supports_sha1_legacy() {
        reset().await;
        let body = b"legacy".to_vec();
        let mut config = make_config(b"legacy-secret");
        config.algorithm = HmacAlgorithm::Sha1;
        config.allow_legacy_sha1 = true;
        config.signature_prefix = Some("sha1=".to_string());
        let snapshot = register_webhook_intake(config.clone()).expect("register");

        let headers = signed_headers(&config, &body, "legacy-delivery");
        let outcome = feed_webhook_intake(
            snapshot.id.as_str(),
            build_request(headers, body, None, None),
        )
        .await
        .expect("feed");
        assert_eq!(outcome.status, "accepted");
    }

    /// The plain parser refuses `sha1`; the legacy-aware parser accepts it when allowed.
    #[test]
    fn hmac_algorithm_parser_gates_sha1() {
        assert_eq!(HmacAlgorithm::parse("sha256"), Some(HmacAlgorithm::Sha256));
        assert_eq!(HmacAlgorithm::parse("sha1"), None);
        assert_eq!(
            HmacAlgorithm::parse_with_legacy_sha1("sha1", true),
            Some(HmacAlgorithm::Sha1)
        );
    }

    /// Registering a SHA-1 intake without the opt-in fails with a message naming the flag.
    #[tokio::test(flavor = "current_thread")]
    async fn rejects_sha1_without_legacy_opt_in() {
        reset().await;
        let mut config = make_config(b"legacy-secret");
        config.algorithm = HmacAlgorithm::Sha1;
        config.signature_prefix = Some("sha1=".to_string());

        let error = register_webhook_intake(config).expect_err("sha1 requires opt-in");
        assert!(error.to_string().contains("set `allow_legacy_sha1: true`"));
    }

    /// With a file-backed event log, a delivery accepted before a simulated restart is still
    /// reported as a duplicate afterwards.
    #[tokio::test(flavor = "current_thread")]
    async fn dedupe_survives_process_restart() {
        clear_webhook_intake_state();
        crate::event_log::reset_active_event_log();
        let tmp = tempfile::tempdir().expect("tempdir");
        let log = Arc::new(AnyEventLog::File(
            FileEventLog::open(tmp.path().to_path_buf(), 32).expect("file log"),
        ));
        crate::event_log::install_active_event_log(log.clone());

        let body = br#"{"hello":"durable"}"#.to_vec();
        let mut config = make_config(b"durable-secret");
        // A fixed id keeps the dedupe claim key identical across the simulated restart.
        config.id = Some("durable-intake".to_string());
        let snapshot = register_webhook_intake(config.clone()).expect("register");
        let headers = signed_headers(&config, &body, "durable-1");

        let first = feed_webhook_intake(
            snapshot.id.as_str(),
            build_request(headers.clone(), body.clone(), None, None),
        )
        .await
        .expect("first feed");
        assert_eq!(first.status, "accepted");

        // Simulate a restart: drop in-memory state and reopen the log from the same directory.
        clear_webhook_intake_state();
        crate::event_log::reset_active_event_log();
        let log = Arc::new(AnyEventLog::File(
            FileEventLog::open(tmp.path().to_path_buf(), 32).expect("file log"),
        ));
        crate::event_log::install_active_event_log(log);
        let snapshot = register_webhook_intake(config.clone()).expect("re-register");

        let second = feed_webhook_intake(
            snapshot.id.as_str(),
            build_request(headers, body, None, None),
        )
        .await
        .expect("second feed");
        assert_eq!(second.status, "duplicate");

        // NOTE(review): the purpose of this trailing install is not evident from this block;
        // presumably it keeps `install_default_for_base_dir` exercised — confirm.
        let _log = install_default_for_base_dir(tmp.path()).expect("install default");
    }
}