Skip to main content

harn_vm/triggers/
webhook_intake.rs

1//! Forge-agnostic webhook intake substrate.
2//!
3//! This is the lowest-level layer connectors can wire into to absorb webhook
4//! deliveries. It is deliberately ignorant of any specific provider (GitHub,
5//! GitLab, Linear, Slack, Stripe, ...). A connector declares:
6//!
7//! * a path scope (e.g. `/hooks/github`)
8//! * a signature header + algorithm + format the substrate should verify
9//! * a delivery-id header the substrate should dedupe on
10//! * a topic to republish accepted deliveries onto
11//!
12//! and the substrate handles the rest: HMAC verification, delivery-id
13//! deduplication (durable across process restarts via the trigger inbox), and
14//! republishing to the chosen event-log topic. Per-forge event normalization
15//! lives in the connector that consumes the topic.
16//!
//! The substrate's state is thread-local (the registry and the inbox cache live
//! in `thread_local!` slots), mirroring the rest of the trigger runtime.
18//! It is exposed to Harn scripts through the `webhook_intake_*` builtins.
19//
20// Design notes:
21// - A connector should be able to wire intake in <30 lines of Harn (issue
22//   #1011). The Harn-facing surface is therefore a single `register` call that
23//   takes a config dict, plus `feed` for hand-driving the substrate from a
24//   handler or a test fixture.
25// - Dedupe survives process restart because we delegate to the existing
26//   `InboxIndex`, which rehydrates from the durable claim topic on construction.
27// - Multiple connectors on different paths must not collide — each intake gets
28//   its own intake id, and the inbox dedupe is namespaced by intake id.
29
30use std::cell::RefCell;
31use std::collections::{BTreeMap, HashMap};
32use std::sync::Arc;
33use std::time::Duration as StdDuration;
34
35use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
36use base64::Engine;
37use serde::{Deserialize, Serialize};
38use subtle::ConstantTimeEq;
39use time::format_description::well_known::Rfc3339;
40use time::OffsetDateTime;
41use uuid::Uuid;
42
43use crate::connectors::MetricsRegistry;
44use crate::event_log::{
45    active_event_log, install_memory_for_current_thread, EventLog, LogEvent, Topic,
46};
47use crate::triggers::inbox::InboxIndex;
48
/// Default delivery-id retention used when callers omit the field. Matches the
/// 24-hour window most providers retry within.
pub const DEFAULT_DEDUPE_TTL_SECS: u64 = 24 * 60 * 60;
/// Topic the substrate appends `webhook_intake_rejected` audit events on.
pub const REJECTION_TOPIC: &str = "triggers.webhook_intake.rejections";
/// Queue depth handed to `install_memory_for_current_thread` when no event log
/// is active and the substrate has to install an in-memory one itself.
const INTAKE_EVENT_LOG_QUEUE_DEPTH: usize = 128;
55
56/// Identifier of a registered intake. Stable for the lifetime of the process,
57/// or until `webhook_intake_deregister` is called.
58#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
59pub struct WebhookIntakeId(pub String);
60
61impl WebhookIntakeId {
62    pub fn new() -> Self {
63        Self(format!("intake_{}", Uuid::now_v7()))
64    }
65
66    pub fn as_str(&self) -> &str {
67        &self.0
68    }
69}
70
71impl Default for WebhookIntakeId {
72    fn default() -> Self {
73        Self::new()
74    }
75}
76
77/// Hash algorithm used to recompute the HMAC over the request body.
78#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
79#[serde(rename_all = "snake_case")]
80pub enum HmacAlgorithm {
81    #[default]
82    Sha256,
83    Sha1,
84}
85
86impl HmacAlgorithm {
87    pub fn parse(raw: &str) -> Option<Self> {
88        match raw.trim().to_ascii_lowercase().as_str() {
89            "" | "sha256" | "hmac-sha256" => Some(Self::Sha256),
90            "sha1" | "hmac-sha1" => Some(Self::Sha1),
91            _ => None,
92        }
93    }
94
95    pub fn as_str(&self) -> &'static str {
96        match self {
97            Self::Sha256 => "sha256",
98            Self::Sha1 => "sha1",
99        }
100    }
101}
102
103/// Encoding the wire signature is presented in.
104#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
105#[serde(rename_all = "snake_case")]
106pub enum SignatureEncoding {
107    #[default]
108    Hex,
109    Base64,
110}
111
112impl SignatureEncoding {
113    pub fn parse(raw: &str) -> Option<Self> {
114        match raw.trim().to_ascii_lowercase().as_str() {
115            "" | "hex" | "base16" => Some(Self::Hex),
116            "base64" | "b64" => Some(Self::Base64),
117            _ => None,
118        }
119    }
120
121    pub fn as_str(&self) -> &'static str {
122        match self {
123            Self::Hex => "hex",
124            Self::Base64 => "base64",
125        }
126    }
127}
128
129/// Configuration for a single registered intake.
130#[derive(Clone, Debug)]
131pub struct WebhookIntakeConfig {
132    pub id: Option<String>,
133    pub path: Option<String>,
134    pub signature_header: String,
135    pub signature_prefix: Option<String>,
136    pub signature_encoding: SignatureEncoding,
137    pub algorithm: HmacAlgorithm,
138    pub secret: Vec<u8>,
139    pub delivery_id_header: String,
140    pub topic: String,
141    pub dedupe_ttl: StdDuration,
142}
143
144impl WebhookIntakeConfig {
145    /// Common forges (GitHub, GitLab, Bitbucket) prefix the digest with the
146    /// algorithm name. Helper for callers that want the same default the
147    /// stdlib parser applies when `signature_prefix` is omitted.
148    pub fn default_prefix(algorithm: HmacAlgorithm) -> Option<String> {
149        Some(format!("{}=", algorithm.as_str()))
150    }
151}
152
/// Disposition of a delivery after it has passed through the substrate.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum WebhookIntakeStatus {
    Accepted,
    Duplicate,
    Rejected,
}

impl WebhookIntakeStatus {
    /// Lowercase wire name of the status, as stored in
    /// `WebhookIntakeOutcome::status`.
    pub fn as_str(&self) -> &'static str {
        match *self {
            WebhookIntakeStatus::Rejected => "rejected",
            WebhookIntakeStatus::Duplicate => "duplicate",
            WebhookIntakeStatus::Accepted => "accepted",
        }
    }
}
170
/// Result of feeding a delivery through the substrate.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct WebhookIntakeOutcome {
    /// One of the `WebhookIntakeStatus::as_str` values: `accepted`,
    /// `duplicate` or `rejected`.
    pub status: String,
    /// Id of the intake that processed the delivery.
    pub intake_id: String,
    /// Topic the intake republishes accepted deliveries onto.
    pub topic: String,
    /// Delivery id read from the request; `None` when the delivery was rejected.
    pub delivery_id: Option<String>,
    /// Id returned by the topic append; only set for accepted deliveries.
    pub topic_event_id: Option<u64>,
    /// Why the delivery was rejected; `None` for accepted and duplicate outcomes.
    pub reason: Option<String>,
    /// Receive time formatted as RFC 3339.
    pub received_at: String,
}
182
/// Snapshot of an intake registration. Returned by `register` and `list`.
///
/// Mirrors `WebhookIntakeConfig` minus the secret, with enums rendered as
/// their string names.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct WebhookIntakeSnapshot {
    /// Resolved intake id (caller-supplied or minted on registration).
    pub id: String,
    /// Path scope the intake was registered with, if any.
    pub path: Option<String>,
    /// Topic accepted deliveries are appended to.
    pub topic: String,
    /// Signature header name, trimmed and ASCII-lowercased.
    pub signature_header: String,
    /// Prefix stripped from the signature value before decoding, if any.
    pub signature_prefix: Option<String>,
    /// `SignatureEncoding::as_str` of the configured encoding.
    pub signature_encoding: String,
    /// `HmacAlgorithm::as_str` of the configured algorithm.
    pub algorithm: String,
    /// Delivery-id header name, trimmed and ASCII-lowercased.
    pub delivery_id_header: String,
    /// Dedupe retention in whole seconds (`Duration::as_secs` of the config value).
    pub dedupe_ttl_seconds: u64,
}
196
/// Raw inbound delivery as fed to the substrate.
#[derive(Clone, Debug, Default)]
pub struct WebhookIntakeRequest {
    /// Request headers; names are matched ASCII-case-insensitively on lookup.
    pub headers: BTreeMap<String, String>,
    /// Raw request body the HMAC is computed over.
    pub body: Vec<u8>,
    /// Path the delivery arrived on. Only compared against the intake's
    /// configured path when both are present.
    pub path: Option<String>,
    /// Receive time; `feed_webhook_intake` falls back to the current UTC time
    /// when this is `None`.
    pub received_at: Option<OffsetDateTime>,
}
205
/// Failures the substrate reports to its callers (as opposed to deliveries it
/// merely rejects, which are reported through `WebhookIntakeOutcome`).
#[derive(Debug)]
pub enum WebhookIntakeError {
    /// The supplied intake configuration is invalid.
    Config(String),
    /// No intake is registered under the given id.
    UnknownIntake(String),
    /// A collaborator (event log, inbox) failed.
    Internal(String),
}

impl std::fmt::Display for WebhookIntakeError {
    /// Render a short human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            WebhookIntakeError::UnknownIntake(id) => {
                f.write_fmt(format_args!("intake `{id}` is not registered"))
            }
            WebhookIntakeError::Internal(detail) => {
                f.write_fmt(format_args!("intake substrate: {detail}"))
            }
            WebhookIntakeError::Config(detail) => {
                f.write_fmt(format_args!("intake config: {detail}"))
            }
        }
    }
}

impl std::error::Error for WebhookIntakeError {}
224
/// Registry entry pairing the caller-facing snapshot with the stored config.
#[derive(Clone)]
struct RegisteredIntake {
    /// Serializable view handed back from `register` and `list`.
    snapshot: WebhookIntakeSnapshot,
    /// Config as stored at registration: resolved id, normalized header names.
    config: WebhookIntakeConfig,
}

/// In-memory index of registered intakes.
#[derive(Default)]
struct WebhookIntakeRegistry {
    /// Registered intakes keyed by intake id.
    by_id: HashMap<String, RegisteredIntake>,
    /// Path -> intake id; only populated for intakes registered with a path.
    by_path: HashMap<String, String>,
}
236
thread_local! {
    // One registry per thread: intakes registered on one thread are not
    // visible from another.
    static REGISTRY: RefCell<WebhookIntakeRegistry> =
        RefCell::new(WebhookIntakeRegistry::default());
}
241
242fn with_registry<R>(f: impl FnOnce(&WebhookIntakeRegistry) -> R) -> R {
243    REGISTRY.with(|slot| f(&slot.borrow()))
244}
245
246fn with_registry_mut<R>(f: impl FnOnce(&mut WebhookIntakeRegistry) -> R) -> R {
247    REGISTRY.with(|slot| f(&mut slot.borrow_mut()))
248}
249
250/// Reset all in-process intake state. Called between conformance tests.
251pub fn clear_webhook_intake_state() {
252    with_registry_mut(|state| {
253        state.by_id.clear();
254        state.by_path.clear();
255    });
256    reset_cached_inbox();
257}
258
259/// Snapshot of all currently-registered intakes.
260pub fn snapshot_webhook_intakes() -> Vec<WebhookIntakeSnapshot> {
261    with_registry(|state| {
262        let mut out: Vec<_> = state
263            .by_id
264            .values()
265            .map(|entry| entry.snapshot.clone())
266            .collect();
267        out.sort_by(|left, right| left.id.cmp(&right.id));
268        out
269    })
270}
271
272/// Resolve the intake registered against `path`, if any.
273pub fn intake_for_path(path: &str) -> Option<String> {
274    with_registry(|state| state.by_path.get(path).cloned())
275}
276
277/// Register a new intake. Returns its assigned id.
278pub fn register_webhook_intake(
279    config: WebhookIntakeConfig,
280) -> Result<WebhookIntakeSnapshot, WebhookIntakeError> {
281    if config.signature_header.trim().is_empty() {
282        return Err(WebhookIntakeError::Config(
283            "signature_header cannot be empty".to_string(),
284        ));
285    }
286    if config.delivery_id_header.trim().is_empty() {
287        return Err(WebhookIntakeError::Config(
288            "delivery_id_header cannot be empty".to_string(),
289        ));
290    }
291    if config.topic.trim().is_empty() {
292        return Err(WebhookIntakeError::Config(
293            "topic cannot be empty".to_string(),
294        ));
295    }
296    if config.secret.is_empty() {
297        return Err(WebhookIntakeError::Config(
298            "secret cannot be empty".to_string(),
299        ));
300    }
301    Topic::new(config.topic.clone())
302        .map_err(|error| WebhookIntakeError::Config(format!("invalid topic: {error}")))?;
303
304    let id = config
305        .id
306        .clone()
307        .filter(|raw| !raw.trim().is_empty())
308        .unwrap_or_else(|| WebhookIntakeId::new().0);
309
310    with_registry_mut(|state| {
311        if state.by_id.contains_key(&id) {
312            return Err(WebhookIntakeError::Config(format!(
313                "intake `{id}` is already registered"
314            )));
315        }
316        if let Some(path) = config.path.as_ref() {
317            if state.by_path.contains_key(path) {
318                return Err(WebhookIntakeError::Config(format!(
319                    "path `{path}` is already bound to intake `{}`",
320                    state.by_path[path]
321                )));
322            }
323        }
324
325        let snapshot = WebhookIntakeSnapshot {
326            id: id.clone(),
327            path: config.path.clone(),
328            topic: config.topic.clone(),
329            signature_header: normalize_header(&config.signature_header),
330            signature_prefix: config.signature_prefix.clone(),
331            signature_encoding: config.signature_encoding.as_str().to_string(),
332            algorithm: config.algorithm.as_str().to_string(),
333            delivery_id_header: normalize_header(&config.delivery_id_header),
334            dedupe_ttl_seconds: config.dedupe_ttl.as_secs(),
335        };
336
337        let mut stored = config;
338        stored.id = Some(id.clone());
339        stored.signature_header = normalize_header(&stored.signature_header);
340        stored.delivery_id_header = normalize_header(&stored.delivery_id_header);
341
342        if let Some(path) = stored.path.as_ref() {
343            state.by_path.insert(path.clone(), id.clone());
344        }
345        state.by_id.insert(
346            id,
347            RegisteredIntake {
348                snapshot: snapshot.clone(),
349                config: stored,
350            },
351        );
352        Ok(snapshot)
353    })
354}
355
356/// Remove an intake. Returns `true` if it existed.
357pub fn deregister_webhook_intake(intake_id: &str) -> bool {
358    with_registry_mut(|state| {
359        let Some(entry) = state.by_id.remove(intake_id) else {
360            return false;
361        };
362        if let Some(path) = entry.snapshot.path {
363            state.by_path.remove(&path);
364        }
365        true
366    })
367}
368
369/// Feed a delivery through the substrate.
370///
371/// On `Accepted`, the substrate has appended a `webhook_delivery` event to the
372/// configured topic and claimed the delivery id in the inbox. Subsequent calls
373/// with the same delivery id within the dedupe TTL return `Duplicate`. Any
374/// signature failure (missing/invalid header, bad HMAC) returns `Rejected`.
375pub async fn feed_webhook_intake(
376    intake_id: &str,
377    request: WebhookIntakeRequest,
378) -> Result<WebhookIntakeOutcome, WebhookIntakeError> {
379    let entry = with_registry(|state| state.by_id.get(intake_id).cloned())
380        .ok_or_else(|| WebhookIntakeError::UnknownIntake(intake_id.to_string()))?;
381    let received_at = request.received_at.unwrap_or_else(OffsetDateTime::now_utc);
382    let received_at_str = format_rfc3339(received_at);
383    let log = ensure_event_log();
384    let topic = Topic::new(entry.snapshot.topic.clone())
385        .map_err(|error| WebhookIntakeError::Internal(format!("invalid topic: {error}")))?;
386
387    if let Some(expected_path) = entry.config.path.as_deref() {
388        if let Some(actual_path) = request.path.as_deref() {
389            if actual_path != expected_path {
390                let reason =
391                    format!("path mismatch: expected `{expected_path}`, got `{actual_path}`");
392                emit_rejection(&log, &entry.snapshot, &reason, &received_at_str).await;
393                return Ok(WebhookIntakeOutcome {
394                    status: WebhookIntakeStatus::Rejected.as_str().to_string(),
395                    intake_id: entry.snapshot.id.clone(),
396                    topic: entry.snapshot.topic.clone(),
397                    delivery_id: None,
398                    topic_event_id: None,
399                    reason: Some(reason),
400                    received_at: received_at_str,
401                });
402            }
403        }
404    }
405
406    let signature = match find_header(&request.headers, &entry.config.signature_header) {
407        Some(value) => value,
408        None => {
409            let reason = format!(
410                "missing signature header `{}`",
411                entry.config.signature_header
412            );
413            emit_rejection(&log, &entry.snapshot, &reason, &received_at_str).await;
414            return Ok(WebhookIntakeOutcome {
415                status: WebhookIntakeStatus::Rejected.as_str().to_string(),
416                intake_id: entry.snapshot.id.clone(),
417                topic: entry.snapshot.topic.clone(),
418                delivery_id: None,
419                topic_event_id: None,
420                reason: Some(reason),
421                received_at: received_at_str,
422            });
423        }
424    };
425
426    if let Err(reason) = verify_signature(&entry.config, signature.as_str(), &request.body) {
427        emit_rejection(&log, &entry.snapshot, &reason, &received_at_str).await;
428        return Ok(WebhookIntakeOutcome {
429            status: WebhookIntakeStatus::Rejected.as_str().to_string(),
430            intake_id: entry.snapshot.id.clone(),
431            topic: entry.snapshot.topic.clone(),
432            delivery_id: None,
433            topic_event_id: None,
434            reason: Some(reason),
435            received_at: received_at_str,
436        });
437    }
438
439    let delivery_id = match find_header(&request.headers, &entry.config.delivery_id_header) {
440        Some(value) if !value.trim().is_empty() => value,
441        _ => {
442            let reason = format!(
443                "missing delivery id header `{}`",
444                entry.config.delivery_id_header
445            );
446            emit_rejection(&log, &entry.snapshot, &reason, &received_at_str).await;
447            return Ok(WebhookIntakeOutcome {
448                status: WebhookIntakeStatus::Rejected.as_str().to_string(),
449                intake_id: entry.snapshot.id.clone(),
450                topic: entry.snapshot.topic.clone(),
451                delivery_id: None,
452                topic_event_id: None,
453                reason: Some(reason),
454                received_at: received_at_str,
455            });
456        }
457    };
458
459    let inbox = ensure_inbox().await?;
460    let claim_key = format!("intake:{}", entry.snapshot.id);
461    let claimed = inbox
462        .insert_if_new(&claim_key, &delivery_id, entry.config.dedupe_ttl)
463        .await
464        .map_err(|error| WebhookIntakeError::Internal(format!("inbox claim: {error}")))?;
465    if !claimed {
466        return Ok(WebhookIntakeOutcome {
467            status: WebhookIntakeStatus::Duplicate.as_str().to_string(),
468            intake_id: entry.snapshot.id.clone(),
469            topic: entry.snapshot.topic.clone(),
470            delivery_id: Some(delivery_id),
471            topic_event_id: None,
472            reason: None,
473            received_at: received_at_str,
474        });
475    }
476
477    let payload = serde_json::json!({
478        "intake_id": entry.snapshot.id,
479        "delivery_id": delivery_id,
480        "received_at": received_at_str,
481        "headers": request.headers,
482        "body_b64": BASE64_STANDARD.encode(&request.body),
483        "body_text": std::str::from_utf8(&request.body).ok().map(str::to_string),
484        "path": entry.config.path,
485        "signature_header": entry.config.signature_header,
486        "delivery_id_header": entry.config.delivery_id_header,
487        "algorithm": entry.config.algorithm.as_str(),
488    });
489    let mut headers_meta = BTreeMap::new();
490    headers_meta.insert("intake_id".to_string(), entry.snapshot.id.clone());
491    headers_meta.insert("delivery_id".to_string(), delivery_id.clone());
492    let event_id = log
493        .append(
494            &topic,
495            LogEvent::new("webhook_delivery", payload).with_headers(headers_meta),
496        )
497        .await
498        .map_err(|error| WebhookIntakeError::Internal(format!("topic append: {error}")))?;
499
500    Ok(WebhookIntakeOutcome {
501        status: WebhookIntakeStatus::Accepted.as_str().to_string(),
502        intake_id: entry.snapshot.id.clone(),
503        topic: entry.snapshot.topic.clone(),
504        delivery_id: Some(delivery_id),
505        topic_event_id: Some(event_id),
506        reason: None,
507        received_at: received_at_str,
508    })
509}
510
511/// Read the most recent `limit` accepted deliveries on an intake's topic.
512/// Provides the bounded replay buffer surface called out in #1011.
513pub async fn recent_webhook_deliveries(
514    intake_id: &str,
515    limit: usize,
516) -> Result<Vec<serde_json::Value>, WebhookIntakeError> {
517    let topic_name = with_registry(|state| {
518        state
519            .by_id
520            .get(intake_id)
521            .map(|entry| entry.snapshot.topic.clone())
522    })
523    .ok_or_else(|| WebhookIntakeError::UnknownIntake(intake_id.to_string()))?;
524    let log = ensure_event_log();
525    let topic = Topic::new(topic_name)
526        .map_err(|error| WebhookIntakeError::Internal(format!("invalid topic: {error}")))?;
527    let events = log
528        .read_range(&topic, None, usize::MAX)
529        .await
530        .map_err(|error| WebhookIntakeError::Internal(format!("topic read: {error}")))?;
531    let mut payloads: Vec<serde_json::Value> = events
532        .into_iter()
533        .filter(|(_, event)| {
534            event.kind == "webhook_delivery"
535                && event
536                    .headers
537                    .get("intake_id")
538                    .map(|value| value == intake_id)
539                    .unwrap_or(true)
540        })
541        .map(|(_, event)| event.payload)
542        .collect();
543    if payloads.len() > limit {
544        let drop = payloads.len() - limit;
545        payloads.drain(0..drop);
546    }
547    Ok(payloads)
548}
549
550fn verify_signature(
551    config: &WebhookIntakeConfig,
552    raw_signature: &str,
553    body: &[u8],
554) -> Result<(), String> {
555    let stripped = strip_prefix(raw_signature, config.signature_prefix.as_deref())?;
556    let provided = decode_signature(stripped, config.signature_encoding)?;
557    let expected = compute_hmac(config.algorithm, &config.secret, body);
558    if provided.len() != expected.len() {
559        return Err(format!(
560            "signature length mismatch (expected {} bytes, got {})",
561            expected.len(),
562            provided.len()
563        ));
564    }
565    if expected.ct_eq(&provided).into() {
566        Ok(())
567    } else {
568        Err("signature mismatch".to_string())
569    }
570}
571
/// Trim `raw` and remove the expected `prefix` from its front.
///
/// With no prefix (or an empty one) the trimmed value is returned as is.
/// With a non-empty prefix the value must start with it, otherwise an error
/// describing the missing prefix is returned.
fn strip_prefix<'a>(raw: &'a str, prefix: Option<&str>) -> Result<&'a str, String> {
    let value = raw.trim();
    let Some(expected) = prefix.filter(|candidate| !candidate.is_empty()) else {
        return Ok(value);
    };
    match value.strip_prefix(expected) {
        Some(rest) => Ok(rest),
        None => Err(format!("signature missing expected prefix `{expected}`")),
    }
}
581
582fn decode_signature(raw: &str, encoding: SignatureEncoding) -> Result<Vec<u8>, String> {
583    match encoding {
584        SignatureEncoding::Hex => hex::decode(raw).map_err(|error| format!("invalid hex: {error}")),
585        SignatureEncoding::Base64 => BASE64_STANDARD
586            .decode(raw)
587            .map_err(|error| format!("invalid base64: {error}")),
588    }
589}
590
591fn compute_hmac(algorithm: HmacAlgorithm, secret: &[u8], data: &[u8]) -> Vec<u8> {
592    match algorithm {
593        HmacAlgorithm::Sha256 => crate::connectors::hmac::hmac_sha256(secret, data),
594        HmacAlgorithm::Sha1 => crate::connectors::hmac::hmac_sha1(secret, data),
595    }
596}
597
/// Look up a header value by name, ignoring ASCII case.
///
/// Returns a clone of the first matching value in the map's key order, or
/// `None` when no header matches. The comparison is done in place with
/// `eq_ignore_ascii_case`, so no lowercase copies of the keys are allocated.
fn find_header(headers: &BTreeMap<String, String>, name: &str) -> Option<String> {
    headers
        .iter()
        .find(|(key, _)| key.eq_ignore_ascii_case(name))
        .map(|(_, value)| value.clone())
}
605
/// Canonical form of a header name: surrounding whitespace removed and ASCII
/// letters lowercased.
fn normalize_header(name: &str) -> String {
    let mut normalized = name.trim().to_owned();
    normalized.make_ascii_lowercase();
    normalized
}
609
610fn format_rfc3339(value: OffsetDateTime) -> String {
611    value.format(&Rfc3339).unwrap_or_default()
612}
613
614fn ensure_event_log() -> Arc<crate::event_log::AnyEventLog> {
615    active_event_log()
616        .unwrap_or_else(|| install_memory_for_current_thread(INTAKE_EVENT_LOG_QUEUE_DEPTH))
617}
618
thread_local! {
    // Per-thread cache of the inbox, stored with the event log it was built
    // on so `ensure_inbox` can detect that the active log has been swapped.
    static CACHED_INBOX: RefCell<Option<(Arc<crate::event_log::AnyEventLog>, Arc<InboxIndex>)>> =
        const { RefCell::new(None) };
}
623
624async fn ensure_inbox() -> Result<Arc<InboxIndex>, WebhookIntakeError> {
625    let log = ensure_event_log();
626    if let Some(existing) = CACHED_INBOX.with(|slot| {
627        slot.borrow()
628            .as_ref()
629            .filter(|(cached_log, _)| Arc::ptr_eq(cached_log, &log))
630            .map(|(_, inbox)| inbox.clone())
631    }) {
632        return Ok(existing);
633    }
634    let metrics = Arc::new(MetricsRegistry::default());
635    let inbox = Arc::new(
636        InboxIndex::new(log.clone(), metrics)
637            .await
638            .map_err(|error| WebhookIntakeError::Internal(format!("inbox init: {error}")))?,
639    );
640    CACHED_INBOX.with(|slot| {
641        *slot.borrow_mut() = Some((log, inbox.clone()));
642    });
643    Ok(inbox)
644}
645
646fn reset_cached_inbox() {
647    CACHED_INBOX.with(|slot| *slot.borrow_mut() = None);
648}
649
650async fn emit_rejection(
651    log: &Arc<crate::event_log::AnyEventLog>,
652    snapshot: &WebhookIntakeSnapshot,
653    reason: &str,
654    received_at: &str,
655) {
656    let topic = match Topic::new(REJECTION_TOPIC) {
657        Ok(topic) => topic,
658        Err(_) => return,
659    };
660    let mut headers = BTreeMap::new();
661    headers.insert("intake_id".to_string(), snapshot.id.clone());
662    let payload = serde_json::json!({
663        "intake_id": snapshot.id,
664        "topic": snapshot.topic,
665        "path": snapshot.path,
666        "reason": reason,
667        "received_at": received_at,
668    });
669    let _ = log
670        .append(
671            &topic,
672            LogEvent::new("webhook_intake_rejected", payload).with_headers(headers),
673        )
674        .await;
675}
676
677/// Helper: build a `WebhookIntakeRequest` from a `headers` dict, body, and
678/// optional fields. Used by the stdlib bindings to keep the parsing logic out
679/// of the substrate proper.
680pub fn build_request(
681    headers: BTreeMap<String, String>,
682    body: Vec<u8>,
683    path: Option<String>,
684    received_at: Option<OffsetDateTime>,
685) -> WebhookIntakeRequest {
686    WebhookIntakeRequest {
687        headers,
688        body,
689        path,
690        received_at,
691    }
692}
693
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event_log::{install_default_for_base_dir, AnyEventLog, FileEventLog};

    /// Baseline config: hex-encoded HMAC-SHA256 with a `sha256=` prefix, no
    /// pinned id or path, and a 60 second dedupe window.
    fn make_config(secret: &[u8]) -> WebhookIntakeConfig {
        WebhookIntakeConfig {
            id: None,
            path: None,
            signature_header: "x-test-signature".to_string(),
            signature_prefix: Some("sha256=".to_string()),
            signature_encoding: SignatureEncoding::Hex,
            algorithm: HmacAlgorithm::Sha256,
            secret: secret.to_vec(),
            delivery_id_header: "x-test-delivery".to_string(),
            topic: "tests.webhook_intake".to_string(),
            dedupe_ttl: StdDuration::from_secs(60),
        }
    }

    /// Build the headers a well-behaved provider would send for `body`: a
    /// valid signature for `config` plus the given delivery id.
    fn signed_headers(
        config: &WebhookIntakeConfig,
        body: &[u8],
        delivery_id: &str,
    ) -> BTreeMap<String, String> {
        let digest = compute_hmac(config.algorithm, &config.secret, body);
        let signature = match config.signature_encoding {
            SignatureEncoding::Hex => hex::encode(&digest),
            SignatureEncoding::Base64 => BASE64_STANDARD.encode(&digest),
        };
        let prefix = config.signature_prefix.clone().unwrap_or_default();
        let mut headers = BTreeMap::new();
        headers.insert(
            config.signature_header.clone(),
            format!("{prefix}{signature}"),
        );
        headers.insert(config.delivery_id_header.clone(), delivery_id.to_string());
        headers
    }

    /// Start each test from a clean slate: no intakes, no cached inbox, no
    /// active event log.
    async fn reset() {
        clear_webhook_intake_state();
        crate::event_log::reset_active_event_log();
    }

    /// A correctly signed delivery is accepted once and reported as a
    /// duplicate when fed again with the same delivery id.
    #[tokio::test(flavor = "current_thread")]
    async fn round_trip_accepts_then_dedupes() {
        reset().await;
        let body = br#"{"hello":"world"}"#.to_vec();
        let config = make_config(b"super-secret");
        let snapshot = register_webhook_intake(config.clone()).expect("register");

        let headers = signed_headers(&config, &body, "delivery-1");
        let outcome = feed_webhook_intake(
            snapshot.id.as_str(),
            build_request(headers.clone(), body.clone(), None, None),
        )
        .await
        .expect("feed");
        assert_eq!(outcome.status, "accepted");
        assert_eq!(outcome.delivery_id.as_deref(), Some("delivery-1"));
        assert!(outcome.topic_event_id.is_some());

        let dup = feed_webhook_intake(
            snapshot.id.as_str(),
            build_request(headers, body, None, None),
        )
        .await
        .expect("feed dup");
        assert_eq!(dup.status, "duplicate");
    }

    /// A signature that does not match the body is rejected with a reason
    /// mentioning the signature.
    #[tokio::test(flavor = "current_thread")]
    async fn rejects_bad_signature() {
        reset().await;
        let body = b"payload".to_vec();
        let config = make_config(b"correct-secret");
        let snapshot = register_webhook_intake(config.clone()).expect("register");

        // Overwrite the valid signature with a bogus (and too short) digest.
        let mut headers = signed_headers(&config, &body, "delivery-1");
        headers.insert(
            config.signature_header.clone(),
            "sha256=deadbeef".to_string(),
        );
        let outcome = feed_webhook_intake(
            snapshot.id.as_str(),
            build_request(headers, body, None, None),
        )
        .await
        .expect("feed");
        assert_eq!(outcome.status, "rejected");
        assert!(outcome
            .reason
            .as_deref()
            .unwrap_or("")
            .contains("signature"));
    }

    /// Two intakes on different paths dedupe independently, and a request on
    /// the wrong path is rejected.
    #[tokio::test(flavor = "current_thread")]
    async fn isolates_two_intakes_on_different_paths() {
        reset().await;
        let body = b"shared-body".to_vec();
        let mut config_a = make_config(b"secret-a");
        config_a.path = Some("/hooks/a".to_string());
        config_a.topic = "tests.intake_a".to_string();
        config_a.delivery_id_header = "x-a-delivery".to_string();

        let mut config_b = make_config(b"secret-b");
        config_b.path = Some("/hooks/b".to_string());
        config_b.topic = "tests.intake_b".to_string();
        config_b.delivery_id_header = "x-b-delivery".to_string();

        let snap_a = register_webhook_intake(config_a.clone()).expect("register a");
        let snap_b = register_webhook_intake(config_b.clone()).expect("register b");

        // Same delivery id on both intakes must NOT collide.
        let headers_a = signed_headers(&config_a, &body, "shared-delivery");
        let outcome_a = feed_webhook_intake(
            snap_a.id.as_str(),
            build_request(headers_a, body.clone(), Some("/hooks/a".to_string()), None),
        )
        .await
        .expect("feed a");
        assert_eq!(outcome_a.status, "accepted");

        let headers_b = signed_headers(&config_b, &body, "shared-delivery");
        let outcome_b = feed_webhook_intake(
            snap_b.id.as_str(),
            build_request(headers_b, body, Some("/hooks/b".to_string()), None),
        )
        .await
        .expect("feed b");
        assert_eq!(outcome_b.status, "accepted");

        // And path mismatch is rejected.
        let body = b"more".to_vec();
        let headers = signed_headers(&config_a, &body, "another-delivery");
        let outcome = feed_webhook_intake(
            snap_a.id.as_str(),
            build_request(headers, body, Some("/hooks/wrong".to_string()), None),
        )
        .await
        .expect("feed mismatch");
        assert_eq!(outcome.status, "rejected");
        assert!(outcome.reason.unwrap().contains("path mismatch"));
    }

    /// HMAC-SHA1 signatures with a `sha1=` prefix are accepted.
    #[tokio::test(flavor = "current_thread")]
    async fn supports_sha1_legacy() {
        reset().await;
        let body = b"legacy".to_vec();
        let mut config = make_config(b"legacy-secret");
        config.algorithm = HmacAlgorithm::Sha1;
        config.signature_prefix = Some("sha1=".to_string());
        let snapshot = register_webhook_intake(config.clone()).expect("register");

        let headers = signed_headers(&config, &body, "legacy-delivery");
        let outcome = feed_webhook_intake(
            snapshot.id.as_str(),
            build_request(headers, body, None, None),
        )
        .await
        .expect("feed");
        assert_eq!(outcome.status, "accepted");
    }

    /// A delivery accepted before a simulated restart is still reported as a
    /// duplicate afterwards when the event log is file-backed.
    #[tokio::test(flavor = "current_thread")]
    async fn dedupe_survives_process_restart() {
        clear_webhook_intake_state();
        crate::event_log::reset_active_event_log();
        let tmp = tempfile::tempdir().expect("tempdir");
        // Install a file-backed event log so the inbox can rehydrate from disk.
        let log = Arc::new(AnyEventLog::File(
            FileEventLog::open(tmp.path().to_path_buf(), 32).expect("file log"),
        ));
        crate::event_log::install_active_event_log(log.clone());

        let body = br#"{"hello":"durable"}"#.to_vec();
        let mut config = make_config(b"durable-secret");
        // Pin the id so the post-restart re-register uses the same dedupe
        // namespace; without this each register call would mint a fresh id.
        config.id = Some("durable-intake".to_string());
        let snapshot = register_webhook_intake(config.clone()).expect("register");
        let headers = signed_headers(&config, &body, "durable-1");

        let first = feed_webhook_intake(
            snapshot.id.as_str(),
            build_request(headers.clone(), body.clone(), None, None),
        )
        .await
        .expect("first feed");
        assert_eq!(first.status, "accepted");

        // Simulate a process restart by clearing in-memory intake state and the
        // active event log handle, then reinstalling a fresh file-backed log on
        // the same directory.
        clear_webhook_intake_state();
        crate::event_log::reset_active_event_log();
        let log = Arc::new(AnyEventLog::File(
            FileEventLog::open(tmp.path().to_path_buf(), 32).expect("file log"),
        ));
        crate::event_log::install_active_event_log(log);
        let snapshot = register_webhook_intake(config.clone()).expect("re-register");

        let second = feed_webhook_intake(
            snapshot.id.as_str(),
            build_request(headers, body, None, None),
        )
        .await
        .expect("second feed");
        assert_eq!(second.status, "duplicate");

        // Use install_default_for_base_dir to keep parity with stdlib path
        // (no assertion — just ensures the helper still works).
        let _log = install_default_for_base_dir(tmp.path()).expect("install default");
    }
}