1use std::cell::RefCell;
31use std::collections::{BTreeMap, HashMap};
32use std::sync::Arc;
33use std::time::Duration as StdDuration;
34
35use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
36use base64::Engine;
37use serde::{Deserialize, Serialize};
38use subtle::ConstantTimeEq;
39use time::format_description::well_known::Rfc3339;
40use time::OffsetDateTime;
41use uuid::Uuid;
42
43use crate::connectors::MetricsRegistry;
44use crate::event_log::{
45 active_event_log, install_memory_for_current_thread, EventLog, LogEvent, Topic,
46};
47use crate::triggers::inbox::InboxIndex;
48
/// Twenty-four hours expressed in seconds.
///
/// NOTE(review): presumably the default for `WebhookIntakeConfig::dedupe_ttl`;
/// the constant is not referenced elsewhere in this file — confirm at call sites.
pub const DEFAULT_DEDUPE_TTL_SECS: u64 = 24 * 60 * 60;
/// Event-log topic that `emit_rejection` appends `webhook_intake_rejected` events to.
pub const REJECTION_TOPIC: &str = "triggers.webhook_intake.rejections";
/// Queue depth handed to `install_memory_for_current_thread` when no event log is active.
const INTAKE_EVENT_LOG_QUEUE_DEPTH: usize = 128;
55
56#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
59pub struct WebhookIntakeId(pub String);
60
61impl WebhookIntakeId {
62 pub fn new() -> Self {
63 Self(format!("intake_{}", Uuid::now_v7()))
64 }
65
66 pub fn as_str(&self) -> &str {
67 &self.0
68 }
69}
70
71impl Default for WebhookIntakeId {
72 fn default() -> Self {
73 Self::new()
74 }
75}
76
77#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
79#[serde(rename_all = "snake_case")]
80pub enum HmacAlgorithm {
81 #[default]
82 Sha256,
83 Sha1,
84}
85
86impl HmacAlgorithm {
87 pub fn parse(raw: &str) -> Option<Self> {
88 match raw.trim().to_ascii_lowercase().as_str() {
89 "" | "sha256" | "hmac-sha256" => Some(Self::Sha256),
90 "sha1" | "hmac-sha1" => Some(Self::Sha1),
91 _ => None,
92 }
93 }
94
95 pub fn as_str(&self) -> &'static str {
96 match self {
97 Self::Sha256 => "sha256",
98 Self::Sha1 => "sha1",
99 }
100 }
101}
102
103#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
105#[serde(rename_all = "snake_case")]
106pub enum SignatureEncoding {
107 #[default]
108 Hex,
109 Base64,
110}
111
112impl SignatureEncoding {
113 pub fn parse(raw: &str) -> Option<Self> {
114 match raw.trim().to_ascii_lowercase().as_str() {
115 "" | "hex" | "base16" => Some(Self::Hex),
116 "base64" | "b64" => Some(Self::Base64),
117 _ => None,
118 }
119 }
120
121 pub fn as_str(&self) -> &'static str {
122 match self {
123 Self::Hex => "hex",
124 Self::Base64 => "base64",
125 }
126 }
127}
128
129#[derive(Clone, Debug)]
131pub struct WebhookIntakeConfig {
132 pub id: Option<String>,
133 pub path: Option<String>,
134 pub signature_header: String,
135 pub signature_prefix: Option<String>,
136 pub signature_encoding: SignatureEncoding,
137 pub algorithm: HmacAlgorithm,
138 pub secret: Vec<u8>,
139 pub delivery_id_header: String,
140 pub topic: String,
141 pub dedupe_ttl: StdDuration,
142}
143
144impl WebhookIntakeConfig {
145 pub fn default_prefix(algorithm: HmacAlgorithm) -> Option<String> {
149 Some(format!("{}=", algorithm.as_str()))
150 }
151}
152
/// Result category of feeding one delivery into an intake.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum WebhookIntakeStatus {
    Accepted,
    Duplicate,
    Rejected,
}

impl WebhookIntakeStatus {
    /// Lowercase wire name of the status.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Rejected => "rejected",
            Self::Duplicate => "duplicate",
            Self::Accepted => "accepted",
        }
    }
}
170
/// Serializable result returned by `feed_webhook_intake` for a single delivery.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct WebhookIntakeOutcome {
    /// One of the `WebhookIntakeStatus::as_str` values: `accepted`, `duplicate`, `rejected`.
    pub status: String,
    /// Id of the intake that handled the delivery.
    pub intake_id: String,
    /// Topic the intake is configured to append to.
    pub topic: String,
    /// Delivery id from the request; `None` for rejected deliveries.
    pub delivery_id: Option<String>,
    /// Id returned by the event-log append; set only for accepted deliveries.
    pub topic_event_id: Option<u64>,
    /// Rejection reason; set only for rejected deliveries.
    pub reason: Option<String>,
    /// Receive timestamp formatted as RFC 3339.
    pub received_at: String,
}
182
/// Serializable view of a registered intake; mirrors the config but omits the secret.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct WebhookIntakeSnapshot {
    /// Intake id (explicit or generated at registration).
    pub id: String,
    /// Path bound to this intake, if any.
    pub path: Option<String>,
    /// Event-log topic accepted deliveries are appended to.
    pub topic: String,
    /// Normalized (trimmed, lowercased) signature header name.
    pub signature_header: String,
    /// Prefix expected before the encoded signature, if any.
    pub signature_prefix: Option<String>,
    /// `SignatureEncoding::as_str` value.
    pub signature_encoding: String,
    /// `HmacAlgorithm::as_str` value.
    pub algorithm: String,
    /// Normalized (trimmed, lowercased) delivery-id header name.
    pub delivery_id_header: String,
    /// Dedupe TTL in whole seconds.
    pub dedupe_ttl_seconds: u64,
}
196
/// One inbound webhook delivery as handed to `feed_webhook_intake`.
#[derive(Clone, Debug, Default)]
pub struct WebhookIntakeRequest {
    /// Request headers; names are matched ASCII case-insensitively.
    pub headers: BTreeMap<String, String>,
    /// Raw body bytes; the HMAC is computed over exactly these bytes.
    pub body: Vec<u8>,
    /// Request path; compared with the intake's path only when both are present.
    pub path: Option<String>,
    /// Receive time; the current UTC time is used when absent.
    pub received_at: Option<OffsetDateTime>,
}
205
/// Failures surfaced by the intake API (as opposed to per-delivery rejections).
#[derive(Debug)]
pub enum WebhookIntakeError {
    /// The supplied configuration is invalid or conflicts with an existing intake.
    Config(String),
    /// No intake is registered under the given id.
    UnknownIntake(String),
    /// The event log or inbox failed.
    Internal(String),
}

impl std::fmt::Display for WebhookIntakeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Internal(detail) => f.write_fmt(format_args!("intake substrate: {detail}")),
            Self::UnknownIntake(id) => {
                f.write_fmt(format_args!("intake `{id}` is not registered"))
            }
            Self::Config(detail) => f.write_fmt(format_args!("intake config: {detail}")),
        }
    }
}

impl std::error::Error for WebhookIntakeError {}
224
/// Registry entry pairing the public snapshot with the stored config (which holds the secret).
#[derive(Clone)]
struct RegisteredIntake {
    // Secret-free view returned to callers.
    snapshot: WebhookIntakeSnapshot,
    // Config with the id filled in and header names normalized.
    config: WebhookIntakeConfig,
}
230
/// In-memory index of registered intakes.
#[derive(Default)]
struct WebhookIntakeRegistry {
    // Intake id -> registered entry.
    by_id: HashMap<String, RegisteredIntake>,
    // Bound path -> intake id; only intakes configured with a path appear here.
    by_path: HashMap<String, String>,
}
236
// Intake registry stored per thread: intakes registered on one thread are not
// visible from another.
thread_local! {
    static REGISTRY: RefCell<WebhookIntakeRegistry> =
        RefCell::new(WebhookIntakeRegistry::default());
}
241
242fn with_registry<R>(f: impl FnOnce(&WebhookIntakeRegistry) -> R) -> R {
243 REGISTRY.with(|slot| f(&slot.borrow()))
244}
245
246fn with_registry_mut<R>(f: impl FnOnce(&mut WebhookIntakeRegistry) -> R) -> R {
247 REGISTRY.with(|slot| f(&mut slot.borrow_mut()))
248}
249
250pub fn clear_webhook_intake_state() {
252 with_registry_mut(|state| {
253 state.by_id.clear();
254 state.by_path.clear();
255 });
256 reset_cached_inbox();
257}
258
259pub fn snapshot_webhook_intakes() -> Vec<WebhookIntakeSnapshot> {
261 with_registry(|state| {
262 let mut out: Vec<_> = state
263 .by_id
264 .values()
265 .map(|entry| entry.snapshot.clone())
266 .collect();
267 out.sort_by(|left, right| left.id.cmp(&right.id));
268 out
269 })
270}
271
272pub fn intake_for_path(path: &str) -> Option<String> {
274 with_registry(|state| state.by_path.get(path).cloned())
275}
276
277pub fn register_webhook_intake(
279 config: WebhookIntakeConfig,
280) -> Result<WebhookIntakeSnapshot, WebhookIntakeError> {
281 if config.signature_header.trim().is_empty() {
282 return Err(WebhookIntakeError::Config(
283 "signature_header cannot be empty".to_string(),
284 ));
285 }
286 if config.delivery_id_header.trim().is_empty() {
287 return Err(WebhookIntakeError::Config(
288 "delivery_id_header cannot be empty".to_string(),
289 ));
290 }
291 if config.topic.trim().is_empty() {
292 return Err(WebhookIntakeError::Config(
293 "topic cannot be empty".to_string(),
294 ));
295 }
296 if config.secret.is_empty() {
297 return Err(WebhookIntakeError::Config(
298 "secret cannot be empty".to_string(),
299 ));
300 }
301 Topic::new(config.topic.clone())
302 .map_err(|error| WebhookIntakeError::Config(format!("invalid topic: {error}")))?;
303
304 let id = config
305 .id
306 .clone()
307 .filter(|raw| !raw.trim().is_empty())
308 .unwrap_or_else(|| WebhookIntakeId::new().0);
309
310 with_registry_mut(|state| {
311 if state.by_id.contains_key(&id) {
312 return Err(WebhookIntakeError::Config(format!(
313 "intake `{id}` is already registered"
314 )));
315 }
316 if let Some(path) = config.path.as_ref() {
317 if state.by_path.contains_key(path) {
318 return Err(WebhookIntakeError::Config(format!(
319 "path `{path}` is already bound to intake `{}`",
320 state.by_path[path]
321 )));
322 }
323 }
324
325 let snapshot = WebhookIntakeSnapshot {
326 id: id.clone(),
327 path: config.path.clone(),
328 topic: config.topic.clone(),
329 signature_header: normalize_header(&config.signature_header),
330 signature_prefix: config.signature_prefix.clone(),
331 signature_encoding: config.signature_encoding.as_str().to_string(),
332 algorithm: config.algorithm.as_str().to_string(),
333 delivery_id_header: normalize_header(&config.delivery_id_header),
334 dedupe_ttl_seconds: config.dedupe_ttl.as_secs(),
335 };
336
337 let mut stored = config;
338 stored.id = Some(id.clone());
339 stored.signature_header = normalize_header(&stored.signature_header);
340 stored.delivery_id_header = normalize_header(&stored.delivery_id_header);
341
342 if let Some(path) = stored.path.as_ref() {
343 state.by_path.insert(path.clone(), id.clone());
344 }
345 state.by_id.insert(
346 id,
347 RegisteredIntake {
348 snapshot: snapshot.clone(),
349 config: stored,
350 },
351 );
352 Ok(snapshot)
353 })
354}
355
356pub fn deregister_webhook_intake(intake_id: &str) -> bool {
358 with_registry_mut(|state| {
359 let Some(entry) = state.by_id.remove(intake_id) else {
360 return false;
361 };
362 if let Some(path) = entry.snapshot.path {
363 state.by_path.remove(&path);
364 }
365 true
366 })
367}
368
369pub async fn feed_webhook_intake(
376 intake_id: &str,
377 request: WebhookIntakeRequest,
378) -> Result<WebhookIntakeOutcome, WebhookIntakeError> {
379 let entry = with_registry(|state| state.by_id.get(intake_id).cloned())
380 .ok_or_else(|| WebhookIntakeError::UnknownIntake(intake_id.to_string()))?;
381 let received_at = request.received_at.unwrap_or_else(OffsetDateTime::now_utc);
382 let received_at_str = format_rfc3339(received_at);
383 let log = ensure_event_log();
384 let topic = Topic::new(entry.snapshot.topic.clone())
385 .map_err(|error| WebhookIntakeError::Internal(format!("invalid topic: {error}")))?;
386
387 if let Some(expected_path) = entry.config.path.as_deref() {
388 if let Some(actual_path) = request.path.as_deref() {
389 if actual_path != expected_path {
390 let reason =
391 format!("path mismatch: expected `{expected_path}`, got `{actual_path}`");
392 emit_rejection(&log, &entry.snapshot, &reason, &received_at_str).await;
393 return Ok(WebhookIntakeOutcome {
394 status: WebhookIntakeStatus::Rejected.as_str().to_string(),
395 intake_id: entry.snapshot.id.clone(),
396 topic: entry.snapshot.topic.clone(),
397 delivery_id: None,
398 topic_event_id: None,
399 reason: Some(reason),
400 received_at: received_at_str,
401 });
402 }
403 }
404 }
405
406 let signature = match find_header(&request.headers, &entry.config.signature_header) {
407 Some(value) => value,
408 None => {
409 let reason = format!(
410 "missing signature header `{}`",
411 entry.config.signature_header
412 );
413 emit_rejection(&log, &entry.snapshot, &reason, &received_at_str).await;
414 return Ok(WebhookIntakeOutcome {
415 status: WebhookIntakeStatus::Rejected.as_str().to_string(),
416 intake_id: entry.snapshot.id.clone(),
417 topic: entry.snapshot.topic.clone(),
418 delivery_id: None,
419 topic_event_id: None,
420 reason: Some(reason),
421 received_at: received_at_str,
422 });
423 }
424 };
425
426 if let Err(reason) = verify_signature(&entry.config, signature.as_str(), &request.body) {
427 emit_rejection(&log, &entry.snapshot, &reason, &received_at_str).await;
428 return Ok(WebhookIntakeOutcome {
429 status: WebhookIntakeStatus::Rejected.as_str().to_string(),
430 intake_id: entry.snapshot.id.clone(),
431 topic: entry.snapshot.topic.clone(),
432 delivery_id: None,
433 topic_event_id: None,
434 reason: Some(reason),
435 received_at: received_at_str,
436 });
437 }
438
439 let delivery_id = match find_header(&request.headers, &entry.config.delivery_id_header) {
440 Some(value) if !value.trim().is_empty() => value,
441 _ => {
442 let reason = format!(
443 "missing delivery id header `{}`",
444 entry.config.delivery_id_header
445 );
446 emit_rejection(&log, &entry.snapshot, &reason, &received_at_str).await;
447 return Ok(WebhookIntakeOutcome {
448 status: WebhookIntakeStatus::Rejected.as_str().to_string(),
449 intake_id: entry.snapshot.id.clone(),
450 topic: entry.snapshot.topic.clone(),
451 delivery_id: None,
452 topic_event_id: None,
453 reason: Some(reason),
454 received_at: received_at_str,
455 });
456 }
457 };
458
459 let inbox = ensure_inbox().await?;
460 let claim_key = format!("intake:{}", entry.snapshot.id);
461 let claimed = inbox
462 .insert_if_new(&claim_key, &delivery_id, entry.config.dedupe_ttl)
463 .await
464 .map_err(|error| WebhookIntakeError::Internal(format!("inbox claim: {error}")))?;
465 if !claimed {
466 return Ok(WebhookIntakeOutcome {
467 status: WebhookIntakeStatus::Duplicate.as_str().to_string(),
468 intake_id: entry.snapshot.id.clone(),
469 topic: entry.snapshot.topic.clone(),
470 delivery_id: Some(delivery_id),
471 topic_event_id: None,
472 reason: None,
473 received_at: received_at_str,
474 });
475 }
476
477 let payload = serde_json::json!({
478 "intake_id": entry.snapshot.id,
479 "delivery_id": delivery_id,
480 "received_at": received_at_str,
481 "headers": request.headers,
482 "body_b64": BASE64_STANDARD.encode(&request.body),
483 "body_text": std::str::from_utf8(&request.body).ok().map(str::to_string),
484 "path": entry.config.path,
485 "signature_header": entry.config.signature_header,
486 "delivery_id_header": entry.config.delivery_id_header,
487 "algorithm": entry.config.algorithm.as_str(),
488 });
489 let mut headers_meta = BTreeMap::new();
490 headers_meta.insert("intake_id".to_string(), entry.snapshot.id.clone());
491 headers_meta.insert("delivery_id".to_string(), delivery_id.clone());
492 let event_id = log
493 .append(
494 &topic,
495 LogEvent::new("webhook_delivery", payload).with_headers(headers_meta),
496 )
497 .await
498 .map_err(|error| WebhookIntakeError::Internal(format!("topic append: {error}")))?;
499
500 Ok(WebhookIntakeOutcome {
501 status: WebhookIntakeStatus::Accepted.as_str().to_string(),
502 intake_id: entry.snapshot.id.clone(),
503 topic: entry.snapshot.topic.clone(),
504 delivery_id: Some(delivery_id),
505 topic_event_id: Some(event_id),
506 reason: None,
507 received_at: received_at_str,
508 })
509}
510
511pub async fn recent_webhook_deliveries(
514 intake_id: &str,
515 limit: usize,
516) -> Result<Vec<serde_json::Value>, WebhookIntakeError> {
517 let topic_name = with_registry(|state| {
518 state
519 .by_id
520 .get(intake_id)
521 .map(|entry| entry.snapshot.topic.clone())
522 })
523 .ok_or_else(|| WebhookIntakeError::UnknownIntake(intake_id.to_string()))?;
524 let log = ensure_event_log();
525 let topic = Topic::new(topic_name)
526 .map_err(|error| WebhookIntakeError::Internal(format!("invalid topic: {error}")))?;
527 let events = log
528 .read_range(&topic, None, usize::MAX)
529 .await
530 .map_err(|error| WebhookIntakeError::Internal(format!("topic read: {error}")))?;
531 let mut payloads: Vec<serde_json::Value> = events
532 .into_iter()
533 .filter(|(_, event)| {
534 event.kind == "webhook_delivery"
535 && event
536 .headers
537 .get("intake_id")
538 .map(|value| value == intake_id)
539 .unwrap_or(true)
540 })
541 .map(|(_, event)| event.payload)
542 .collect();
543 if payloads.len() > limit {
544 let drop = payloads.len() - limit;
545 payloads.drain(0..drop);
546 }
547 Ok(payloads)
548}
549
550fn verify_signature(
551 config: &WebhookIntakeConfig,
552 raw_signature: &str,
553 body: &[u8],
554) -> Result<(), String> {
555 let stripped = strip_prefix(raw_signature, config.signature_prefix.as_deref())?;
556 let provided = decode_signature(stripped, config.signature_encoding)?;
557 let expected = compute_hmac(config.algorithm, &config.secret, body);
558 if provided.len() != expected.len() {
559 return Err(format!(
560 "signature length mismatch (expected {} bytes, got {})",
561 expected.len(),
562 provided.len()
563 ));
564 }
565 if expected.ct_eq(&provided).into() {
566 Ok(())
567 } else {
568 Err("signature mismatch".to_string())
569 }
570}
571
/// Trims `raw` and removes the expected `prefix` from its start.
///
/// A missing or empty prefix returns the trimmed input unchanged. When a
/// non-empty prefix is expected but absent, the `Err` names that prefix.
fn strip_prefix<'a>(raw: &'a str, prefix: Option<&str>) -> Result<&'a str, String> {
    let candidate = raw.trim();
    let expected = match prefix {
        Some(value) if !value.is_empty() => value,
        _ => return Ok(candidate),
    };
    match candidate.strip_prefix(expected) {
        Some(rest) => Ok(rest),
        None => Err(format!("signature missing expected prefix `{expected}`")),
    }
}
581
582fn decode_signature(raw: &str, encoding: SignatureEncoding) -> Result<Vec<u8>, String> {
583 match encoding {
584 SignatureEncoding::Hex => hex::decode(raw).map_err(|error| format!("invalid hex: {error}")),
585 SignatureEncoding::Base64 => BASE64_STANDARD
586 .decode(raw)
587 .map_err(|error| format!("invalid base64: {error}")),
588 }
589}
590
591fn compute_hmac(algorithm: HmacAlgorithm, secret: &[u8], data: &[u8]) -> Vec<u8> {
592 match algorithm {
593 HmacAlgorithm::Sha256 => crate::connectors::hmac::hmac_sha256(secret, data),
594 HmacAlgorithm::Sha1 => crate::connectors::hmac::hmac_sha1(secret, data),
595 }
596}
597
/// Returns a copy of the value of the first header whose name equals `name`,
/// ignoring ASCII case.
///
/// Headers are scanned in the map's key order, so when several keys differ
/// only by case the lexicographically smallest one wins. Comparison is done
/// in place with `eq_ignore_ascii_case`, avoiding a lowercased allocation for
/// every key on every lookup.
fn find_header(headers: &BTreeMap<String, String>, name: &str) -> Option<String> {
    headers
        .iter()
        .find(|(key, _)| key.eq_ignore_ascii_case(name))
        .map(|(_, value)| value.clone())
}
605
/// Canonical form of a header name: surrounding whitespace removed, ASCII lowercased.
fn normalize_header(name: &str) -> String {
    let mut normalized = name.trim().to_owned();
    normalized.make_ascii_lowercase();
    normalized
}
609
610fn format_rfc3339(value: OffsetDateTime) -> String {
611 value.format(&Rfc3339).unwrap_or_default()
612}
613
614fn ensure_event_log() -> Arc<crate::event_log::AnyEventLog> {
615 active_event_log()
616 .unwrap_or_else(|| install_memory_for_current_thread(INTAKE_EVENT_LOG_QUEUE_DEPTH))
617}
618
// Per-thread cache of the inbox index together with the event log it was
// built on; `ensure_inbox` reuses the inbox only while the active log is the
// same `Arc` (pointer equality).
thread_local! {
    static CACHED_INBOX: RefCell<Option<(Arc<crate::event_log::AnyEventLog>, Arc<InboxIndex>)>> =
        const { RefCell::new(None) };
}
623
624async fn ensure_inbox() -> Result<Arc<InboxIndex>, WebhookIntakeError> {
625 let log = ensure_event_log();
626 if let Some(existing) = CACHED_INBOX.with(|slot| {
627 slot.borrow()
628 .as_ref()
629 .filter(|(cached_log, _)| Arc::ptr_eq(cached_log, &log))
630 .map(|(_, inbox)| inbox.clone())
631 }) {
632 return Ok(existing);
633 }
634 let metrics = Arc::new(MetricsRegistry::default());
635 let inbox = Arc::new(
636 InboxIndex::new(log.clone(), metrics)
637 .await
638 .map_err(|error| WebhookIntakeError::Internal(format!("inbox init: {error}")))?,
639 );
640 CACHED_INBOX.with(|slot| {
641 *slot.borrow_mut() = Some((log, inbox.clone()));
642 });
643 Ok(inbox)
644}
645
646fn reset_cached_inbox() {
647 CACHED_INBOX.with(|slot| *slot.borrow_mut() = None);
648}
649
650async fn emit_rejection(
651 log: &Arc<crate::event_log::AnyEventLog>,
652 snapshot: &WebhookIntakeSnapshot,
653 reason: &str,
654 received_at: &str,
655) {
656 let topic = match Topic::new(REJECTION_TOPIC) {
657 Ok(topic) => topic,
658 Err(_) => return,
659 };
660 let mut headers = BTreeMap::new();
661 headers.insert("intake_id".to_string(), snapshot.id.clone());
662 let payload = serde_json::json!({
663 "intake_id": snapshot.id,
664 "topic": snapshot.topic,
665 "path": snapshot.path,
666 "reason": reason,
667 "received_at": received_at,
668 });
669 let _ = log
670 .append(
671 &topic,
672 LogEvent::new("webhook_intake_rejected", payload).with_headers(headers),
673 )
674 .await;
675}
676
677pub fn build_request(
681 headers: BTreeMap<String, String>,
682 body: Vec<u8>,
683 path: Option<String>,
684 received_at: Option<OffsetDateTime>,
685) -> WebhookIntakeRequest {
686 WebhookIntakeRequest {
687 headers,
688 body,
689 path,
690 received_at,
691 }
692}
693
// Tests run on the current-thread Tokio runtime because the registry, the
// cached inbox and the fallback event log are all thread-local.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event_log::{install_default_for_base_dir, AnyEventLog, FileEventLog};

    /// Baseline config: hex-encoded HMAC-SHA256 with a `sha256=` prefix, no id, no path.
    fn make_config(secret: &[u8]) -> WebhookIntakeConfig {
        WebhookIntakeConfig {
            id: None,
            path: None,
            signature_header: "x-test-signature".to_string(),
            signature_prefix: Some("sha256=".to_string()),
            signature_encoding: SignatureEncoding::Hex,
            algorithm: HmacAlgorithm::Sha256,
            secret: secret.to_vec(),
            delivery_id_header: "x-test-delivery".to_string(),
            topic: "tests.webhook_intake".to_string(),
            dedupe_ttl: StdDuration::from_secs(60),
        }
    }

    /// Builds headers carrying a valid signature for `body` plus the delivery id.
    fn signed_headers(
        config: &WebhookIntakeConfig,
        body: &[u8],
        delivery_id: &str,
    ) -> BTreeMap<String, String> {
        let digest = compute_hmac(config.algorithm, &config.secret, body);
        let signature = match config.signature_encoding {
            SignatureEncoding::Hex => hex::encode(&digest),
            SignatureEncoding::Base64 => BASE64_STANDARD.encode(&digest),
        };
        let prefix = config.signature_prefix.clone().unwrap_or_default();
        let mut headers = BTreeMap::new();
        headers.insert(
            config.signature_header.clone(),
            format!("{prefix}{signature}"),
        );
        headers.insert(config.delivery_id_header.clone(), delivery_id.to_string());
        headers
    }

    /// Clears the registry, the cached inbox and the active event log.
    async fn reset() {
        clear_webhook_intake_state();
        crate::event_log::reset_active_event_log();
    }

    /// A valid delivery is accepted once; replaying it is reported as a duplicate.
    #[tokio::test(flavor = "current_thread")]
    async fn round_trip_accepts_then_dedupes() {
        reset().await;
        let body = br#"{"hello":"world"}"#.to_vec();
        let config = make_config(b"super-secret");
        let snapshot = register_webhook_intake(config.clone()).expect("register");

        let headers = signed_headers(&config, &body, "delivery-1");
        let outcome = feed_webhook_intake(
            snapshot.id.as_str(),
            build_request(headers.clone(), body.clone(), None, None),
        )
        .await
        .expect("feed");
        assert_eq!(outcome.status, "accepted");
        assert_eq!(outcome.delivery_id.as_deref(), Some("delivery-1"));
        assert!(outcome.topic_event_id.is_some());

        let dup = feed_webhook_intake(
            snapshot.id.as_str(),
            build_request(headers, body, None, None),
        )
        .await
        .expect("feed dup");
        assert_eq!(dup.status, "duplicate");
    }

    /// A well-formed but wrong signature is rejected with a signature-related reason.
    #[tokio::test(flavor = "current_thread")]
    async fn rejects_bad_signature() {
        reset().await;
        let body = b"payload".to_vec();
        let config = make_config(b"correct-secret");
        let snapshot = register_webhook_intake(config.clone()).expect("register");

        let mut headers = signed_headers(&config, &body, "delivery-1");
        // Overwrite the valid signature with a short bogus digest.
        headers.insert(
            config.signature_header.clone(),
            "sha256=deadbeef".to_string(),
        );
        let outcome = feed_webhook_intake(
            snapshot.id.as_str(),
            build_request(headers, body, None, None),
        )
        .await
        .expect("feed");
        assert_eq!(outcome.status, "rejected");
        assert!(outcome
            .reason
            .as_deref()
            .unwrap_or("")
            .contains("signature"));
    }

    /// Two intakes dedupe independently, and a request on the wrong path is rejected.
    #[tokio::test(flavor = "current_thread")]
    async fn isolates_two_intakes_on_different_paths() {
        reset().await;
        let body = b"shared-body".to_vec();
        let mut config_a = make_config(b"secret-a");
        config_a.path = Some("/hooks/a".to_string());
        config_a.topic = "tests.intake_a".to_string();
        config_a.delivery_id_header = "x-a-delivery".to_string();

        let mut config_b = make_config(b"secret-b");
        config_b.path = Some("/hooks/b".to_string());
        config_b.topic = "tests.intake_b".to_string();
        config_b.delivery_id_header = "x-b-delivery".to_string();

        let snap_a = register_webhook_intake(config_a.clone()).expect("register a");
        let snap_b = register_webhook_intake(config_b.clone()).expect("register b");

        // Same delivery id on both intakes: each must accept it once.
        let headers_a = signed_headers(&config_a, &body, "shared-delivery");
        let outcome_a = feed_webhook_intake(
            snap_a.id.as_str(),
            build_request(headers_a, body.clone(), Some("/hooks/a".to_string()), None),
        )
        .await
        .expect("feed a");
        assert_eq!(outcome_a.status, "accepted");

        let headers_b = signed_headers(&config_b, &body, "shared-delivery");
        let outcome_b = feed_webhook_intake(
            snap_b.id.as_str(),
            build_request(headers_b, body, Some("/hooks/b".to_string()), None),
        )
        .await
        .expect("feed b");
        assert_eq!(outcome_b.status, "accepted");

        // Correctly signed for intake A but delivered on an unrelated path.
        let body = b"more".to_vec();
        let headers = signed_headers(&config_a, &body, "another-delivery");
        let outcome = feed_webhook_intake(
            snap_a.id.as_str(),
            build_request(headers, body, Some("/hooks/wrong".to_string()), None),
        )
        .await
        .expect("feed mismatch");
        assert_eq!(outcome.status, "rejected");
        assert!(outcome.reason.unwrap().contains("path mismatch"));
    }

    /// HMAC-SHA1 with a `sha1=` prefix is accepted.
    #[tokio::test(flavor = "current_thread")]
    async fn supports_sha1_legacy() {
        reset().await;
        let body = b"legacy".to_vec();
        let mut config = make_config(b"legacy-secret");
        config.algorithm = HmacAlgorithm::Sha1;
        config.signature_prefix = Some("sha1=".to_string());
        let snapshot = register_webhook_intake(config.clone()).expect("register");

        let headers = signed_headers(&config, &body, "legacy-delivery");
        let outcome = feed_webhook_intake(
            snapshot.id.as_str(),
            build_request(headers, body, None, None),
        )
        .await
        .expect("feed");
        assert_eq!(outcome.status, "accepted");
    }

    /// Dedupe state lives in the file-backed log: after dropping all in-memory
    /// state and reopening the same directory, the replay is still a duplicate.
    #[tokio::test(flavor = "current_thread")]
    async fn dedupe_survives_process_restart() {
        clear_webhook_intake_state();
        crate::event_log::reset_active_event_log();
        let tmp = tempfile::tempdir().expect("tempdir");
        let log = Arc::new(AnyEventLog::File(
            FileEventLog::open(tmp.path().to_path_buf(), 32).expect("file log"),
        ));
        crate::event_log::install_active_event_log(log.clone());

        let body = br#"{"hello":"durable"}"#.to_vec();
        let mut config = make_config(b"durable-secret");
        // Fixed id so the claim key is identical before and after the "restart".
        config.id = Some("durable-intake".to_string());
        let snapshot = register_webhook_intake(config.clone()).expect("register");
        let headers = signed_headers(&config, &body, "durable-1");

        let first = feed_webhook_intake(
            snapshot.id.as_str(),
            build_request(headers.clone(), body.clone(), None, None),
        )
        .await
        .expect("first feed");
        assert_eq!(first.status, "accepted");

        // Simulate a restart: wipe registry and caches, reopen the log from disk.
        clear_webhook_intake_state();
        crate::event_log::reset_active_event_log();
        let log = Arc::new(AnyEventLog::File(
            FileEventLog::open(tmp.path().to_path_buf(), 32).expect("file log"),
        ));
        crate::event_log::install_active_event_log(log);
        let snapshot = register_webhook_intake(config.clone()).expect("re-register");

        let second = feed_webhook_intake(
            snapshot.id.as_str(),
            build_request(headers, body, None, None),
        )
        .await
        .expect("second feed");
        assert_eq!(second.status, "duplicate");

        // NOTE(review): the purpose of this trailing install is not evident from
        // the code (its original comment is missing) — confirm it is still needed.
        let _log = install_default_for_base_dir(tmp.path()).expect("install default");
    }
}