Skip to main content

harn_vm/triggers/
webhook_intake.rs

1//! Forge-agnostic webhook intake substrate.
2//!
3//! This is the lowest-level layer connectors can wire into to absorb webhook
4//! deliveries. It is deliberately ignorant of any specific provider (GitHub,
5//! GitLab, Linear, Slack, Stripe, ...). A connector declares:
6//!
7//! * a path scope (e.g. `/hooks/github`)
8//! * a signature header + algorithm + format the substrate should verify
9//! * a delivery-id header the substrate should dedupe on
10//! * a topic to republish accepted deliveries onto
11//!
12//! and the substrate handles the rest: HMAC verification, delivery-id
13//! deduplication (durable across process restarts via the trigger inbox), and
14//! republishing to the chosen event-log topic. Per-forge event normalization
15//! lives in the connector that consumes the topic.
16//!
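//! The substrate's state (intake registry and cached inbox) lives in
//! thread-local storage, so it is shared by everything running on one thread
//! but not across threads, mirroring the rest of the trigger runtime.
//! It is exposed to Harn scripts through the `webhook_intake_*` builtins.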
19//
20// Design notes:
21// - A connector should be able to wire intake in <30 lines of Harn (issue
22//   #1011). The Harn-facing surface is therefore a single `register` call that
23//   takes a config dict, plus `feed` for hand-driving the substrate from a
24//   handler or a test fixture.
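// - Dedupe survives process restart because we delegate to the existing
//   `InboxIndex`, which rehydrates from the durable claim topic on construction.
//   That only holds when a durable event log is active; the fallback log the
//   substrate installs via `install_memory_for_current_thread` is presumably
//   in-memory and would not survive a restart.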
27// - Multiple connectors on different paths must not collide — each intake gets
28//   its own intake id, and the inbox dedupe is namespaced by intake id.
29
30use std::cell::RefCell;
31use std::collections::{BTreeMap, HashMap};
32use std::sync::Arc;
33use std::time::Duration as StdDuration;
34
35use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
36use base64::Engine;
37use serde::{Deserialize, Serialize};
38use subtle::ConstantTimeEq;
39use time::format_description::well_known::Rfc3339;
40use time::OffsetDateTime;
41use uuid::Uuid;
42
43use crate::connectors::MetricsRegistry;
44use crate::event_log::{
45    active_event_log, install_memory_for_current_thread, EventLog, LogEvent, Topic,
46};
47use crate::runtime_limits::RuntimeLimits;
48use crate::triggers::inbox::InboxIndex;
49
50/// Default delivery-id retention used when callers omit the field. Matches the
51/// 24-hour window most providers retry within.
52pub const DEFAULT_DEDUPE_TTL_SECS: u64 = 24 * 60 * 60;
53/// Topic the substrate appends `webhook_intake_rejected` audit events on.
54pub const REJECTION_TOPIC: &str = "triggers.webhook_intake.rejections";
55const INTAKE_EVENT_LOG_QUEUE_DEPTH: usize = RuntimeLimits::DEFAULT.default_event_log_queue_depth;
56
57/// Identifier of a registered intake. Stable for the lifetime of the process,
58/// or until `webhook_intake_deregister` is called.
59#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
60pub struct WebhookIntakeId(pub String);
61
62impl WebhookIntakeId {
63    pub fn new() -> Self {
64        Self(format!("intake_{}", Uuid::now_v7()))
65    }
66
67    pub fn as_str(&self) -> &str {
68        &self.0
69    }
70}
71
72impl Default for WebhookIntakeId {
73    fn default() -> Self {
74        Self::new()
75    }
76}
77
78/// Hash algorithm used to recompute the HMAC over the request body.
79#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
80#[serde(rename_all = "snake_case")]
81pub enum HmacAlgorithm {
82    #[default]
83    Sha256,
84    Sha1,
85}
86
87impl HmacAlgorithm {
88    pub fn parse(raw: &str) -> Option<Self> {
89        Self::parse_with_legacy_sha1(raw, false)
90    }
91
92    pub fn parse_with_legacy_sha1(raw: &str, allow_legacy_sha1: bool) -> Option<Self> {
93        match raw.trim().to_ascii_lowercase().as_str() {
94            "" | "sha256" | "hmac-sha256" => Some(Self::Sha256),
95            "sha1" | "hmac-sha1" if allow_legacy_sha1 => Some(Self::Sha1),
96            _ => None,
97        }
98    }
99
100    pub fn is_legacy_sha1_alias(raw: &str) -> bool {
101        matches!(
102            raw.trim().to_ascii_lowercase().as_str(),
103            "sha1" | "hmac-sha1"
104        )
105    }
106
107    pub fn as_str(&self) -> &'static str {
108        match self {
109            Self::Sha256 => "sha256",
110            Self::Sha1 => "sha1",
111        }
112    }
113}
114
115/// Encoding the wire signature is presented in.
116#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
117#[serde(rename_all = "snake_case")]
118pub enum SignatureEncoding {
119    #[default]
120    Hex,
121    Base64,
122}
123
124impl SignatureEncoding {
125    pub fn parse(raw: &str) -> Option<Self> {
126        match raw.trim().to_ascii_lowercase().as_str() {
127            "" | "hex" | "base16" => Some(Self::Hex),
128            "base64" | "b64" => Some(Self::Base64),
129            _ => None,
130        }
131    }
132
133    pub fn as_str(&self) -> &'static str {
134        match self {
135            Self::Hex => "hex",
136            Self::Base64 => "base64",
137        }
138    }
139}
140
141/// Configuration for a single registered intake.
142#[derive(Clone, Debug)]
143pub struct WebhookIntakeConfig {
144    pub id: Option<String>,
145    pub path: Option<String>,
146    pub signature_header: String,
147    pub signature_prefix: Option<String>,
148    pub signature_encoding: SignatureEncoding,
149    pub algorithm: HmacAlgorithm,
150    pub allow_legacy_sha1: bool,
151    pub secret: Vec<u8>,
152    pub delivery_id_header: String,
153    pub topic: String,
154    pub dedupe_ttl: StdDuration,
155}
156
157impl WebhookIntakeConfig {
158    /// Common forges (GitHub, GitLab, Bitbucket) prefix the digest with the
159    /// algorithm name. Helper for callers that want the same default the
160    /// stdlib parser applies when `signature_prefix` is omitted.
161    pub fn default_prefix(algorithm: HmacAlgorithm) -> Option<String> {
162        Some(format!("{}=", algorithm.as_str()))
163    }
164}
165
/// What the substrate did with a delivery.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum WebhookIntakeStatus {
    /// Verified, newly claimed, and appended to the intake's topic.
    Accepted,
    /// The delivery id was already claimed within the dedupe TTL.
    Duplicate,
    /// A path, signature, or delivery-id check failed.
    Rejected,
}

impl WebhookIntakeStatus {
    /// Wire name used for `WebhookIntakeOutcome::status`.
    pub fn as_str(&self) -> &'static str {
        match *self {
            WebhookIntakeStatus::Rejected => "rejected",
            WebhookIntakeStatus::Duplicate => "duplicate",
            WebhookIntakeStatus::Accepted => "accepted",
        }
    }
}
183
/// Result of feeding a delivery through the substrate.
// Field order is also the serialized (serde) and Debug field order.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct WebhookIntakeOutcome {
    /// One of `WebhookIntakeStatus::as_str`: "accepted", "duplicate" or "rejected".
    pub status: String,
    /// Id of the intake the delivery was fed to.
    pub intake_id: String,
    /// The intake's configured topic (reported even when nothing was appended).
    pub topic: String,
    /// Delivery id read from the configured header; `None` on every rejection.
    pub delivery_id: Option<String>,
    /// Id returned by the topic append; only `Some` for accepted deliveries.
    pub topic_event_id: Option<u64>,
    /// Human-readable rejection reason; only `Some` for rejected deliveries.
    pub reason: Option<String>,
    /// RFC 3339 receipt timestamp (empty if formatting the timestamp failed).
    pub received_at: String,
}

/// Snapshot of an intake registration. Returned by `register` and `list`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct WebhookIntakeSnapshot {
    pub id: String,
    /// Path bound to this intake, if any.
    pub path: Option<String>,
    pub topic: String,
    /// Signature header name, trimmed and ASCII-lowercased at registration.
    pub signature_header: String,
    /// Prefix as configured (not normalized).
    pub signature_prefix: Option<String>,
    /// Canonical `SignatureEncoding::as_str` name.
    pub signature_encoding: String,
    /// Canonical `HmacAlgorithm::as_str` name.
    pub algorithm: String,
    pub allow_legacy_sha1: bool,
    /// Delivery-id header name, trimmed and ASCII-lowercased at registration.
    pub delivery_id_header: String,
    /// Dedupe TTL in whole seconds (any sub-second part is truncated).
    pub dedupe_ttl_seconds: u64,
}

/// Raw inbound delivery as fed to the substrate.
#[derive(Clone, Debug, Default)]
pub struct WebhookIntakeRequest {
    /// Request headers; lookups by name ignore ASCII case.
    pub headers: BTreeMap<String, String>,
    /// Exact body bytes; the HMAC is computed over these bytes.
    pub body: Vec<u8>,
    /// Request path; when `None` the intake's path binding is not checked.
    pub path: Option<String>,
    /// Receipt time; `None` means "now" (UTC) at feed time.
    pub received_at: Option<OffsetDateTime>,
}
219
/// Errors returned by the intake API itself. Rejected deliveries are not
/// errors; they come back as a `WebhookIntakeOutcome` with status "rejected".
#[derive(Debug)]
pub enum WebhookIntakeError {
    /// Invalid registration config, or an id/path that is already taken.
    Config(String),
    /// No intake is registered under the given id.
    UnknownIntake(String),
    /// Unexpected failure in topic handling, the inbox, or the event log.
    Internal(String),
}

impl std::fmt::Display for WebhookIntakeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            WebhookIntakeError::Config(detail) => {
                f.write_str("intake config: ")?;
                f.write_str(detail)
            }
            WebhookIntakeError::UnknownIntake(id) => {
                write!(f, "intake `{}` is not registered", id)
            }
            WebhookIntakeError::Internal(detail) => {
                f.write_str("intake substrate: ")?;
                f.write_str(detail)
            }
        }
    }
}

impl std::error::Error for WebhookIntakeError {}
238
/// One registration: the snapshot handed back to callers, plus the stored
/// config (with `id` filled in and header names normalized) used by `feed`.
#[derive(Clone)]
struct RegisteredIntake {
    snapshot: WebhookIntakeSnapshot,
    config: WebhookIntakeConfig,
}

/// All intakes registered on the current thread.
#[derive(Default)]
struct WebhookIntakeRegistry {
    // intake id -> registration
    by_id: HashMap<String, RegisteredIntake>,
    // bound path -> owning intake id (only intakes registered with a path)
    by_path: HashMap<String, String>,
}

// NOTE(review): storage is thread-local, so each thread has its own registry;
// an intake registered on one thread is invisible to feed/list calls made on
// another thread.
thread_local! {
    static REGISTRY: RefCell<WebhookIntakeRegistry> =
        RefCell::new(WebhookIntakeRegistry::default());
}
255
256fn with_registry<R>(f: impl FnOnce(&WebhookIntakeRegistry) -> R) -> R {
257    REGISTRY.with(|slot| f(&slot.borrow()))
258}
259
260fn with_registry_mut<R>(f: impl FnOnce(&mut WebhookIntakeRegistry) -> R) -> R {
261    REGISTRY.with(|slot| f(&mut slot.borrow_mut()))
262}
263
264/// Reset all in-process intake state. Called between conformance tests.
265pub fn clear_webhook_intake_state() {
266    with_registry_mut(|state| {
267        state.by_id.clear();
268        state.by_path.clear();
269    });
270    reset_cached_inbox();
271}
272
273/// Snapshot of all currently-registered intakes.
274pub fn snapshot_webhook_intakes() -> Vec<WebhookIntakeSnapshot> {
275    with_registry(|state| {
276        let mut out: Vec<_> = state
277            .by_id
278            .values()
279            .map(|entry| entry.snapshot.clone())
280            .collect();
281        out.sort_by(|left, right| left.id.cmp(&right.id));
282        out
283    })
284}
285
286/// Resolve the intake registered against `path`, if any.
287pub fn intake_for_path(path: &str) -> Option<String> {
288    with_registry(|state| state.by_path.get(path).cloned())
289}
290
291/// Register a new intake. Returns its assigned id.
292pub fn register_webhook_intake(
293    config: WebhookIntakeConfig,
294) -> Result<WebhookIntakeSnapshot, WebhookIntakeError> {
295    if config.signature_header.trim().is_empty() {
296        return Err(WebhookIntakeError::Config(
297            "signature_header cannot be empty".to_string(),
298        ));
299    }
300    if config.delivery_id_header.trim().is_empty() {
301        return Err(WebhookIntakeError::Config(
302            "delivery_id_header cannot be empty".to_string(),
303        ));
304    }
305    if config.topic.trim().is_empty() {
306        return Err(WebhookIntakeError::Config(
307            "topic cannot be empty".to_string(),
308        ));
309    }
310    if config.secret.is_empty() {
311        return Err(WebhookIntakeError::Config(
312            "secret cannot be empty".to_string(),
313        ));
314    }
315    if config.algorithm == HmacAlgorithm::Sha1 && !config.allow_legacy_sha1 {
316        return Err(WebhookIntakeError::Config(
317            "algorithm `sha1` is legacy; set `allow_legacy_sha1: true` to verify SHA-1 HMAC signatures for an existing provider".to_string(),
318        ));
319    }
320    Topic::new(config.topic.clone())
321        .map_err(|error| WebhookIntakeError::Config(format!("invalid topic: {error}")))?;
322
323    let id = config
324        .id
325        .clone()
326        .filter(|raw| !raw.trim().is_empty())
327        .unwrap_or_else(|| WebhookIntakeId::new().0);
328
329    with_registry_mut(|state| {
330        if state.by_id.contains_key(&id) {
331            return Err(WebhookIntakeError::Config(format!(
332                "intake `{id}` is already registered"
333            )));
334        }
335        if let Some(path) = config.path.as_ref() {
336            if state.by_path.contains_key(path) {
337                return Err(WebhookIntakeError::Config(format!(
338                    "path `{path}` is already bound to intake `{}`",
339                    state.by_path[path]
340                )));
341            }
342        }
343
344        let snapshot = WebhookIntakeSnapshot {
345            id: id.clone(),
346            path: config.path.clone(),
347            topic: config.topic.clone(),
348            signature_header: normalize_header(&config.signature_header),
349            signature_prefix: config.signature_prefix.clone(),
350            signature_encoding: config.signature_encoding.as_str().to_string(),
351            algorithm: config.algorithm.as_str().to_string(),
352            allow_legacy_sha1: config.allow_legacy_sha1,
353            delivery_id_header: normalize_header(&config.delivery_id_header),
354            dedupe_ttl_seconds: config.dedupe_ttl.as_secs(),
355        };
356
357        let mut stored = config;
358        stored.id = Some(id.clone());
359        stored.signature_header = normalize_header(&stored.signature_header);
360        stored.delivery_id_header = normalize_header(&stored.delivery_id_header);
361
362        if let Some(path) = stored.path.as_ref() {
363            state.by_path.insert(path.clone(), id.clone());
364        }
365        state.by_id.insert(
366            id,
367            RegisteredIntake {
368                snapshot: snapshot.clone(),
369                config: stored,
370            },
371        );
372        Ok(snapshot)
373    })
374}
375
376/// Remove an intake. Returns `true` if it existed.
377pub fn deregister_webhook_intake(intake_id: &str) -> bool {
378    with_registry_mut(|state| {
379        let Some(entry) = state.by_id.remove(intake_id) else {
380            return false;
381        };
382        if let Some(path) = entry.snapshot.path {
383            state.by_path.remove(&path);
384        }
385        true
386    })
387}
388
389/// Feed a delivery through the substrate.
390///
391/// On `Accepted`, the substrate has appended a `webhook_delivery` event to the
392/// configured topic and claimed the delivery id in the inbox. Subsequent calls
393/// with the same delivery id within the dedupe TTL return `Duplicate`. Any
394/// signature failure (missing/invalid header, bad HMAC) returns `Rejected`.
395pub async fn feed_webhook_intake(
396    intake_id: &str,
397    request: WebhookIntakeRequest,
398) -> Result<WebhookIntakeOutcome, WebhookIntakeError> {
399    let entry = with_registry(|state| state.by_id.get(intake_id).cloned())
400        .ok_or_else(|| WebhookIntakeError::UnknownIntake(intake_id.to_string()))?;
401    let received_at = request.received_at.unwrap_or_else(OffsetDateTime::now_utc);
402    let received_at_str = format_rfc3339(received_at);
403    let log = ensure_event_log();
404    let topic = Topic::new(entry.snapshot.topic.clone())
405        .map_err(|error| WebhookIntakeError::Internal(format!("invalid topic: {error}")))?;
406
407    if let Some(expected_path) = entry.config.path.as_deref() {
408        if let Some(actual_path) = request.path.as_deref() {
409            if actual_path != expected_path {
410                let reason =
411                    format!("path mismatch: expected `{expected_path}`, got `{actual_path}`");
412                emit_rejection(&log, &entry.snapshot, &reason, &received_at_str).await;
413                return Ok(WebhookIntakeOutcome {
414                    status: WebhookIntakeStatus::Rejected.as_str().to_string(),
415                    intake_id: entry.snapshot.id.clone(),
416                    topic: entry.snapshot.topic.clone(),
417                    delivery_id: None,
418                    topic_event_id: None,
419                    reason: Some(reason),
420                    received_at: received_at_str,
421                });
422            }
423        }
424    }
425
426    let signature = match find_header(&request.headers, &entry.config.signature_header) {
427        Some(value) => value,
428        None => {
429            let reason = format!(
430                "missing signature header `{}`",
431                entry.config.signature_header
432            );
433            emit_rejection(&log, &entry.snapshot, &reason, &received_at_str).await;
434            return Ok(WebhookIntakeOutcome {
435                status: WebhookIntakeStatus::Rejected.as_str().to_string(),
436                intake_id: entry.snapshot.id.clone(),
437                topic: entry.snapshot.topic.clone(),
438                delivery_id: None,
439                topic_event_id: None,
440                reason: Some(reason),
441                received_at: received_at_str,
442            });
443        }
444    };
445
446    if let Err(reason) = verify_signature(&entry.config, signature.as_str(), &request.body) {
447        emit_rejection(&log, &entry.snapshot, &reason, &received_at_str).await;
448        return Ok(WebhookIntakeOutcome {
449            status: WebhookIntakeStatus::Rejected.as_str().to_string(),
450            intake_id: entry.snapshot.id.clone(),
451            topic: entry.snapshot.topic.clone(),
452            delivery_id: None,
453            topic_event_id: None,
454            reason: Some(reason),
455            received_at: received_at_str,
456        });
457    }
458
459    let delivery_id = match find_header(&request.headers, &entry.config.delivery_id_header) {
460        Some(value) if !value.trim().is_empty() => value,
461        _ => {
462            let reason = format!(
463                "missing delivery id header `{}`",
464                entry.config.delivery_id_header
465            );
466            emit_rejection(&log, &entry.snapshot, &reason, &received_at_str).await;
467            return Ok(WebhookIntakeOutcome {
468                status: WebhookIntakeStatus::Rejected.as_str().to_string(),
469                intake_id: entry.snapshot.id.clone(),
470                topic: entry.snapshot.topic.clone(),
471                delivery_id: None,
472                topic_event_id: None,
473                reason: Some(reason),
474                received_at: received_at_str,
475            });
476        }
477    };
478
479    let inbox = ensure_inbox().await?;
480    let claim_key = format!("intake:{}", entry.snapshot.id);
481    let claimed = inbox
482        .insert_if_new(&claim_key, &delivery_id, entry.config.dedupe_ttl)
483        .await
484        .map_err(|error| WebhookIntakeError::Internal(format!("inbox claim: {error}")))?;
485    if !claimed {
486        return Ok(WebhookIntakeOutcome {
487            status: WebhookIntakeStatus::Duplicate.as_str().to_string(),
488            intake_id: entry.snapshot.id.clone(),
489            topic: entry.snapshot.topic.clone(),
490            delivery_id: Some(delivery_id),
491            topic_event_id: None,
492            reason: None,
493            received_at: received_at_str,
494        });
495    }
496
497    let payload = serde_json::json!({
498        "intake_id": entry.snapshot.id,
499        "delivery_id": delivery_id,
500        "received_at": received_at_str,
501        "headers": request.headers,
502        "body_b64": BASE64_STANDARD.encode(&request.body),
503        "body_text": std::str::from_utf8(&request.body).ok().map(str::to_string),
504        "path": entry.config.path,
505        "signature_header": entry.config.signature_header,
506        "delivery_id_header": entry.config.delivery_id_header,
507        "algorithm": entry.config.algorithm.as_str(),
508    });
509    let mut headers_meta = BTreeMap::new();
510    headers_meta.insert("intake_id".to_string(), entry.snapshot.id.clone());
511    headers_meta.insert("delivery_id".to_string(), delivery_id.clone());
512    let event_id = log
513        .append(
514            &topic,
515            LogEvent::new("webhook_delivery", payload).with_headers(headers_meta),
516        )
517        .await
518        .map_err(|error| WebhookIntakeError::Internal(format!("topic append: {error}")))?;
519
520    Ok(WebhookIntakeOutcome {
521        status: WebhookIntakeStatus::Accepted.as_str().to_string(),
522        intake_id: entry.snapshot.id.clone(),
523        topic: entry.snapshot.topic.clone(),
524        delivery_id: Some(delivery_id),
525        topic_event_id: Some(event_id),
526        reason: None,
527        received_at: received_at_str,
528    })
529}
530
531/// Read the most recent `limit` accepted deliveries on an intake's topic.
532/// Provides the bounded replay buffer surface called out in #1011.
533pub async fn recent_webhook_deliveries(
534    intake_id: &str,
535    limit: usize,
536) -> Result<Vec<serde_json::Value>, WebhookIntakeError> {
537    let topic_name = with_registry(|state| {
538        state
539            .by_id
540            .get(intake_id)
541            .map(|entry| entry.snapshot.topic.clone())
542    })
543    .ok_or_else(|| WebhookIntakeError::UnknownIntake(intake_id.to_string()))?;
544    let log = ensure_event_log();
545    let topic = Topic::new(topic_name)
546        .map_err(|error| WebhookIntakeError::Internal(format!("invalid topic: {error}")))?;
547    let events = log
548        .read_range(&topic, None, usize::MAX)
549        .await
550        .map_err(|error| WebhookIntakeError::Internal(format!("topic read: {error}")))?;
551    let mut payloads: Vec<serde_json::Value> = events
552        .into_iter()
553        .filter(|(_, event)| {
554            event.kind == "webhook_delivery"
555                && event
556                    .headers
557                    .get("intake_id")
558                    .map(|value| value == intake_id)
559                    .unwrap_or(true)
560        })
561        .map(|(_, event)| event.payload)
562        .collect();
563    if payloads.len() > limit {
564        let drop = payloads.len() - limit;
565        payloads.drain(0..drop);
566    }
567    Ok(payloads)
568}
569
570fn verify_signature(
571    config: &WebhookIntakeConfig,
572    raw_signature: &str,
573    body: &[u8],
574) -> Result<(), String> {
575    let stripped = strip_prefix(raw_signature, config.signature_prefix.as_deref())?;
576    let provided = decode_signature(stripped, config.signature_encoding)?;
577    let expected = compute_hmac(config.algorithm, &config.secret, body);
578    if provided.len() != expected.len() {
579        return Err(format!(
580            "signature length mismatch (expected {} bytes, got {})",
581            expected.len(),
582            provided.len()
583        ));
584    }
585    if expected.ct_eq(&provided).into() {
586        Ok(())
587    } else {
588        Err("signature mismatch".to_string())
589    }
590}
591
/// Trim `raw` and remove the expected signature prefix from it.
///
/// A `None` or empty prefix means no prefix is required, and the trimmed input
/// is returned as-is. Matching is case-sensitive. Whitespace after the prefix
/// is not removed.
fn strip_prefix<'a>(raw: &'a str, prefix: Option<&str>) -> Result<&'a str, String> {
    let trimmed = raw.trim();
    let Some(expected) = prefix.filter(|candidate| !candidate.is_empty()) else {
        return Ok(trimmed);
    };
    trimmed
        .strip_prefix(expected)
        .ok_or_else(|| format!("signature missing expected prefix `{expected}`"))
}
601
602fn decode_signature(raw: &str, encoding: SignatureEncoding) -> Result<Vec<u8>, String> {
603    match encoding {
604        SignatureEncoding::Hex => hex::decode(raw).map_err(|error| format!("invalid hex: {error}")),
605        SignatureEncoding::Base64 => BASE64_STANDARD
606            .decode(raw)
607            .map_err(|error| format!("invalid base64: {error}")),
608    }
609}
610
611fn compute_hmac(algorithm: HmacAlgorithm, secret: &[u8], data: &[u8]) -> Vec<u8> {
612    match algorithm {
613        HmacAlgorithm::Sha256 => crate::connectors::hmac::hmac_sha256(secret, data),
614        HmacAlgorithm::Sha1 => crate::connectors::hmac::hmac_sha1(secret, data),
615    }
616}
617
/// Look up a header by name, ignoring ASCII case.
///
/// Returns a copy of the first matching value in the map's sorted key order,
/// or `None` when no header matches. Comparison uses `eq_ignore_ascii_case`,
/// so no lowercase copy of `name` or of each key is allocated per lookup.
fn find_header(headers: &BTreeMap<String, String>, name: &str) -> Option<String> {
    headers
        .iter()
        .find(|(key, _)| key.eq_ignore_ascii_case(name))
        .map(|(_, value)| value.clone())
}
625
/// Canonical form of a header name: surrounding whitespace removed and ASCII
/// letters lowercased (non-ASCII characters are left untouched).
fn normalize_header(name: &str) -> String {
    let mut normalized = name.trim().to_owned();
    normalized.make_ascii_lowercase();
    normalized
}
629
630fn format_rfc3339(value: OffsetDateTime) -> String {
631    value.format(&Rfc3339).unwrap_or_default()
632}
633
634fn ensure_event_log() -> Arc<crate::event_log::AnyEventLog> {
635    active_event_log()
636        .unwrap_or_else(|| install_memory_for_current_thread(INTAKE_EVENT_LOG_QUEUE_DEPTH))
637}
638
639thread_local! {
640    static CACHED_INBOX: RefCell<Option<(Arc<crate::event_log::AnyEventLog>, Arc<InboxIndex>)>> =
641        const { RefCell::new(None) };
642}
643
644async fn ensure_inbox() -> Result<Arc<InboxIndex>, WebhookIntakeError> {
645    let log = ensure_event_log();
646    if let Some(existing) = CACHED_INBOX.with(|slot| {
647        slot.borrow()
648            .as_ref()
649            .filter(|(cached_log, _)| Arc::ptr_eq(cached_log, &log))
650            .map(|(_, inbox)| inbox.clone())
651    }) {
652        return Ok(existing);
653    }
654    let metrics = Arc::new(MetricsRegistry::default());
655    let inbox = Arc::new(
656        InboxIndex::new(log.clone(), metrics)
657            .await
658            .map_err(|error| WebhookIntakeError::Internal(format!("inbox init: {error}")))?,
659    );
660    CACHED_INBOX.with(|slot| {
661        *slot.borrow_mut() = Some((log, inbox.clone()));
662    });
663    Ok(inbox)
664}
665
666fn reset_cached_inbox() {
667    CACHED_INBOX.with(|slot| *slot.borrow_mut() = None);
668}
669
670async fn emit_rejection(
671    log: &Arc<crate::event_log::AnyEventLog>,
672    snapshot: &WebhookIntakeSnapshot,
673    reason: &str,
674    received_at: &str,
675) {
676    let topic = match Topic::new(REJECTION_TOPIC) {
677        Ok(topic) => topic,
678        Err(_) => return,
679    };
680    let mut headers = BTreeMap::new();
681    headers.insert("intake_id".to_string(), snapshot.id.clone());
682    let payload = serde_json::json!({
683        "intake_id": snapshot.id,
684        "topic": snapshot.topic,
685        "path": snapshot.path,
686        "reason": reason,
687        "received_at": received_at,
688    });
689    let _ = log
690        .append(
691            &topic,
692            LogEvent::new("webhook_intake_rejected", payload).with_headers(headers),
693        )
694        .await;
695}
696
697/// Helper: build a `WebhookIntakeRequest` from a `headers` dict, body, and
698/// optional fields. Used by the stdlib bindings to keep the parsing logic out
699/// of the substrate proper.
700pub fn build_request(
701    headers: BTreeMap<String, String>,
702    body: Vec<u8>,
703    path: Option<String>,
704    received_at: Option<OffsetDateTime>,
705) -> WebhookIntakeRequest {
706    WebhookIntakeRequest {
707        headers,
708        body,
709        path,
710        received_at,
711    }
712}
713
714#[cfg(test)]
715mod tests {
716    use super::*;
717    use crate::event_log::{install_default_for_base_dir, AnyEventLog, FileEventLog};
718
719    fn make_config(secret: &[u8]) -> WebhookIntakeConfig {
720        WebhookIntakeConfig {
721            id: None,
722            path: None,
723            signature_header: "x-test-signature".to_string(),
724            signature_prefix: Some("sha256=".to_string()),
725            signature_encoding: SignatureEncoding::Hex,
726            algorithm: HmacAlgorithm::Sha256,
727            allow_legacy_sha1: false,
728            secret: secret.to_vec(),
729            delivery_id_header: "x-test-delivery".to_string(),
730            topic: "tests.webhook_intake".to_string(),
731            dedupe_ttl: StdDuration::from_secs(60),
732        }
733    }
734
735    fn signed_headers(
736        config: &WebhookIntakeConfig,
737        body: &[u8],
738        delivery_id: &str,
739    ) -> BTreeMap<String, String> {
740        let digest = compute_hmac(config.algorithm, &config.secret, body);
741        let signature = match config.signature_encoding {
742            SignatureEncoding::Hex => hex::encode(&digest),
743            SignatureEncoding::Base64 => BASE64_STANDARD.encode(&digest),
744        };
745        let prefix = config.signature_prefix.clone().unwrap_or_default();
746        let mut headers = BTreeMap::new();
747        headers.insert(
748            config.signature_header.clone(),
749            format!("{prefix}{signature}"),
750        );
751        headers.insert(config.delivery_id_header.clone(), delivery_id.to_string());
752        headers
753    }
754
755    async fn reset() {
756        clear_webhook_intake_state();
757        crate::event_log::reset_active_event_log();
758    }
759
760    #[tokio::test(flavor = "current_thread")]
761    async fn round_trip_accepts_then_dedupes() {
762        reset().await;
763        let body = br#"{"hello":"world"}"#.to_vec();
764        let config = make_config(b"super-secret");
765        let snapshot = register_webhook_intake(config.clone()).expect("register");
766
767        let headers = signed_headers(&config, &body, "delivery-1");
768        let outcome = feed_webhook_intake(
769            snapshot.id.as_str(),
770            build_request(headers.clone(), body.clone(), None, None),
771        )
772        .await
773        .expect("feed");
774        assert_eq!(outcome.status, "accepted");
775        assert_eq!(outcome.delivery_id.as_deref(), Some("delivery-1"));
776        assert!(outcome.topic_event_id.is_some());
777
778        let dup = feed_webhook_intake(
779            snapshot.id.as_str(),
780            build_request(headers, body, None, None),
781        )
782        .await
783        .expect("feed dup");
784        assert_eq!(dup.status, "duplicate");
785    }
786
787    #[tokio::test(flavor = "current_thread")]
788    async fn rejects_bad_signature() {
789        reset().await;
790        let body = b"payload".to_vec();
791        let config = make_config(b"correct-secret");
792        let snapshot = register_webhook_intake(config.clone()).expect("register");
793
794        let mut headers = signed_headers(&config, &body, "delivery-1");
795        headers.insert(
796            config.signature_header.clone(),
797            "sha256=deadbeef".to_string(),
798        );
799        let outcome = feed_webhook_intake(
800            snapshot.id.as_str(),
801            build_request(headers, body, None, None),
802        )
803        .await
804        .expect("feed");
805        assert_eq!(outcome.status, "rejected");
806        assert!(outcome
807            .reason
808            .as_deref()
809            .unwrap_or("")
810            .contains("signature"));
811    }
812
813    #[tokio::test(flavor = "current_thread")]
814    async fn isolates_two_intakes_on_different_paths() {
815        reset().await;
816        let body = b"shared-body".to_vec();
817        let mut config_a = make_config(b"secret-a");
818        config_a.path = Some("/hooks/a".to_string());
819        config_a.topic = "tests.intake_a".to_string();
820        config_a.delivery_id_header = "x-a-delivery".to_string();
821
822        let mut config_b = make_config(b"secret-b");
823        config_b.path = Some("/hooks/b".to_string());
824        config_b.topic = "tests.intake_b".to_string();
825        config_b.delivery_id_header = "x-b-delivery".to_string();
826
827        let snap_a = register_webhook_intake(config_a.clone()).expect("register a");
828        let snap_b = register_webhook_intake(config_b.clone()).expect("register b");
829
830        // Same delivery id on both intakes must NOT collide.
831        let headers_a = signed_headers(&config_a, &body, "shared-delivery");
832        let outcome_a = feed_webhook_intake(
833            snap_a.id.as_str(),
834            build_request(headers_a, body.clone(), Some("/hooks/a".to_string()), None),
835        )
836        .await
837        .expect("feed a");
838        assert_eq!(outcome_a.status, "accepted");
839
840        let headers_b = signed_headers(&config_b, &body, "shared-delivery");
841        let outcome_b = feed_webhook_intake(
842            snap_b.id.as_str(),
843            build_request(headers_b, body, Some("/hooks/b".to_string()), None),
844        )
845        .await
846        .expect("feed b");
847        assert_eq!(outcome_b.status, "accepted");
848
849        // And path mismatch is rejected.
850        let body = b"more".to_vec();
851        let headers = signed_headers(&config_a, &body, "another-delivery");
852        let outcome = feed_webhook_intake(
853            snap_a.id.as_str(),
854            build_request(headers, body, Some("/hooks/wrong".to_string()), None),
855        )
856        .await
857        .expect("feed mismatch");
858        assert_eq!(outcome.status, "rejected");
859        assert!(outcome.reason.unwrap().contains("path mismatch"));
860    }
861
862    #[tokio::test(flavor = "current_thread")]
863    async fn supports_sha1_legacy() {
864        reset().await;
865        let body = b"legacy".to_vec();
866        let mut config = make_config(b"legacy-secret");
867        config.algorithm = HmacAlgorithm::Sha1;
868        config.allow_legacy_sha1 = true;
869        config.signature_prefix = Some("sha1=".to_string());
870        let snapshot = register_webhook_intake(config.clone()).expect("register");
871
872        let headers = signed_headers(&config, &body, "legacy-delivery");
873        let outcome = feed_webhook_intake(
874            snapshot.id.as_str(),
875            build_request(headers, body, None, None),
876        )
877        .await
878        .expect("feed");
879        assert_eq!(outcome.status, "accepted");
880    }
881
882    #[test]
883    fn hmac_algorithm_parser_gates_sha1() {
884        assert_eq!(HmacAlgorithm::parse("sha256"), Some(HmacAlgorithm::Sha256));
885        assert_eq!(HmacAlgorithm::parse("sha1"), None);
886        assert_eq!(
887            HmacAlgorithm::parse_with_legacy_sha1("sha1", true),
888            Some(HmacAlgorithm::Sha1)
889        );
890    }
891
892    #[tokio::test(flavor = "current_thread")]
893    async fn rejects_sha1_without_legacy_opt_in() {
894        reset().await;
895        let mut config = make_config(b"legacy-secret");
896        config.algorithm = HmacAlgorithm::Sha1;
897        config.signature_prefix = Some("sha1=".to_string());
898
899        let error = register_webhook_intake(config).expect_err("sha1 requires opt-in");
900        assert!(error.to_string().contains("set `allow_legacy_sha1: true`"));
901    }
902
903    #[tokio::test(flavor = "current_thread")]
904    async fn dedupe_survives_process_restart() {
905        clear_webhook_intake_state();
906        crate::event_log::reset_active_event_log();
907        let tmp = tempfile::tempdir().expect("tempdir");
908        // Install a file-backed event log so the inbox can rehydrate from disk.
909        let log = Arc::new(AnyEventLog::File(
910            FileEventLog::open(tmp.path().to_path_buf(), 32).expect("file log"),
911        ));
912        crate::event_log::install_active_event_log(log.clone());
913
914        let body = br#"{"hello":"durable"}"#.to_vec();
915        let mut config = make_config(b"durable-secret");
916        // Pin the id so the post-restart re-register uses the same dedupe
917        // namespace; without this each register call would mint a fresh id.
918        config.id = Some("durable-intake".to_string());
919        let snapshot = register_webhook_intake(config.clone()).expect("register");
920        let headers = signed_headers(&config, &body, "durable-1");
921
922        let first = feed_webhook_intake(
923            snapshot.id.as_str(),
924            build_request(headers.clone(), body.clone(), None, None),
925        )
926        .await
927        .expect("first feed");
928        assert_eq!(first.status, "accepted");
929
930        // Simulate a process restart by clearing in-memory intake state and the
931        // active event log handle, then reinstalling a fresh file-backed log on
932        // the same directory.
933        clear_webhook_intake_state();
934        crate::event_log::reset_active_event_log();
935        let log = Arc::new(AnyEventLog::File(
936            FileEventLog::open(tmp.path().to_path_buf(), 32).expect("file log"),
937        ));
938        crate::event_log::install_active_event_log(log);
939        let snapshot = register_webhook_intake(config.clone()).expect("re-register");
940
941        let second = feed_webhook_intake(
942            snapshot.id.as_str(),
943            build_request(headers, body, None, None),
944        )
945        .await
946        .expect("second feed");
947        assert_eq!(second.status, "duplicate");
948
949        // Use install_default_for_base_dir to keep parity with stdlib path
950        // (no assertion — just ensures the helper still works).
951        let _log = install_default_for_base_dir(tmp.path()).expect("install default");
952    }
953}