Skip to main content

autumn_web/
webhook.rs

1//! Signed webhook intake for third-party callbacks.
2//!
3//! The [`SignedWebhook`] extractor verifies provider signatures against the
4//! exact HTTP request bytes before handler code runs. Configure endpoints with
5//! [`WebhookEndpointConfig`] under `security.webhooks.endpoints`.
6
7use std::collections::HashMap;
8use std::future::Future;
9use std::pin::Pin;
10use std::sync::{Arc, Mutex};
11use std::time::{Duration, SystemTime, UNIX_EPOCH};
12
13use axum::extract::FromRequest;
14use bytes::Bytes;
15use http::{HeaderMap, StatusCode};
16use serde::Deserialize;
17use thiserror::Error;
18
19pub use crate::security::config::hmac_sha256_hex;
20
/// Default timestamp tolerance for endpoints, in seconds (five minutes).
const DEFAULT_TIMESTAMP_TOLERANCE_SECS: u64 = 5 * 60;
/// Default replay rejection window for endpoints, in seconds (one day).
const DEFAULT_REPLAY_WINDOW_SECS: u64 = 86_400;
/// Default cap on raw body bytes for endpoints (1 MiB).
const DEFAULT_MAX_BODY_BYTES: usize = 1 << 20;
/// Number of in-memory replay checks after which expired entries are swept.
const IN_MEMORY_REPLAY_CLEANUP_INTERVAL: usize = 128;
/// In-memory entry count above which every replay check sweeps expired entries.
const IN_MEMORY_REPLAY_CLEANUP_HIGH_WATER: usize = 16 << 10;
26
27/// Provider preset for signed webhook verification.
28#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash, Deserialize)]
29#[serde(rename_all = "lowercase")]
30pub enum WebhookProvider {
31    /// Stripe-style `Stripe-Signature: t=...,v1=...`.
32    Stripe,
33    /// GitHub-style `X-Hub-Signature-256: sha256=...`.
34    Github,
35    /// Slack-style `X-Slack-Signature: v0=...`.
36    Slack,
37    /// Generic HMAC-SHA256 over the raw request body.
38    #[default]
39    Generic,
40}
41
42impl WebhookProvider {
43    /// Stable lower-case provider name.
44    #[must_use]
45    pub const fn as_str(self) -> &'static str {
46        match self {
47            Self::Stripe => "stripe",
48            Self::Github => "github",
49            Self::Slack => "slack",
50            Self::Generic => "generic",
51        }
52    }
53}
54
55impl std::fmt::Display for WebhookProvider {
56    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
57        f.write_str(self.as_str())
58    }
59}
60
61/// Signed webhook configuration section.
62#[derive(Debug, Clone, Default, Deserialize)]
63pub struct WebhookConfig {
64    /// Replay protection backend used for all replay-protected endpoints.
65    #[serde(default)]
66    pub replay: WebhookReplayConfig,
67    /// Configured signed webhook endpoints.
68    #[serde(default)]
69    pub endpoints: Vec<WebhookEndpointConfig>,
70}
71
72impl WebhookConfig {
73    /// Apply environment-sourced webhook configuration.
74    pub(crate) fn apply_env_overrides_with_env(&mut self, env: &dyn crate::config::Env) {
75        self.replay.apply_env_overrides_with_env(env);
76        self.resolve_secret_envs_with_env(env);
77    }
78
79    /// Resolve `secret_env` and `previous_secret_envs` entries from the
80    /// configured environment.
81    fn resolve_secret_envs_with_env(&mut self, env: &dyn crate::config::Env) {
82        for endpoint in &mut self.endpoints {
83            if endpoint.secret.is_none()
84                && let Some(env_name) = endpoint.secret_env.as_deref()
85                && let Ok(secret) = env.var(env_name)
86            {
87                endpoint.secret = Some(secret);
88            }
89
90            for env_name in &endpoint.previous_secret_envs {
91                if let Ok(secret) = env.var(env_name)
92                    && !secret.is_empty()
93                {
94                    endpoint.previous_secrets.push(secret);
95                }
96            }
97        }
98    }
99
100    /// Validate configured signed webhook endpoints.
101    ///
102    /// # Errors
103    ///
104    /// Returns [`WebhookConfigError`] when an endpoint has no usable secret,
105    /// an invalid path, or a weak production secret.
106    pub fn validate(&self, is_production: bool) -> Result<(), WebhookConfigError> {
107        for endpoint in &self.endpoints {
108            endpoint.validate(is_production)?;
109        }
110        validate_unique_endpoint_paths(&self.endpoints)?;
111        if self
112            .endpoints
113            .iter()
114            .any(|endpoint| endpoint.replay_protection)
115        {
116            self.replay.validate(is_production)?;
117        }
118        Ok(())
119    }
120}
121
122/// Replay protection backend for signed webhooks.
123#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Deserialize)]
124#[serde(rename_all = "lowercase")]
125pub enum WebhookReplayBackend {
126    /// Process-local memory store. Suitable for tests, development, and
127    /// explicitly acknowledged single-replica deployments.
128    #[serde(alias = "local", alias = "in_memory")]
129    #[default]
130    Memory,
131    /// Redis `SET NX EX` store shared by every application replica.
132    Redis,
133}
134
135impl WebhookReplayBackend {
136    fn from_env_value(value: &str) -> Option<Self> {
137        match value.trim().to_ascii_lowercase().as_str() {
138            "memory" | "local" | "in_memory" | "in-memory" => Some(Self::Memory),
139            "redis" => Some(Self::Redis),
140            _ => None,
141        }
142    }
143}
144
145/// Replay protection storage configuration.
146#[derive(Debug, Clone, Deserialize)]
147pub struct WebhookReplayConfig {
148    /// Active replay backend.
149    #[serde(default)]
150    pub backend: WebhookReplayBackend,
151    /// Explicit production escape hatch for single-replica deployments.
152    #[serde(default)]
153    pub allow_memory_in_production: bool,
154    /// Redis backend options.
155    #[serde(default)]
156    pub redis: WebhookReplayRedisConfig,
157}
158
159impl Default for WebhookReplayConfig {
160    fn default() -> Self {
161        Self {
162            backend: WebhookReplayBackend::Memory,
163            allow_memory_in_production: false,
164            redis: WebhookReplayRedisConfig::default(),
165        }
166    }
167}
168
169impl WebhookReplayConfig {
170    fn apply_env_overrides_with_env(&mut self, env: &dyn crate::config::Env) {
171        if let Ok(value) = env.var("AUTUMN_SECURITY__WEBHOOKS__REPLAY__BACKEND") {
172            if let Some(backend) = WebhookReplayBackend::from_env_value(&value) {
173                self.backend = backend;
174            } else {
175                eprintln!(
176                    "Warning: AUTUMN_SECURITY__WEBHOOKS__REPLAY__BACKEND={value:?} is not valid \
177                     (expected memory or redis), ignoring"
178                );
179            }
180        }
181        if let Ok(value) = env.var("AUTUMN_SECURITY__WEBHOOKS__REPLAY__ALLOW_MEMORY_IN_PRODUCTION")
182        {
183            match value.trim().parse::<bool>() {
184                Ok(value) => self.allow_memory_in_production = value,
185                Err(error) => eprintln!(
186                    "Warning: AUTUMN_SECURITY__WEBHOOKS__REPLAY__ALLOW_MEMORY_IN_PRODUCTION \
187                     could not be parsed as bool: {error}"
188                ),
189            }
190        }
191        if let Ok(value) = env.var("AUTUMN_SECURITY__WEBHOOKS__REPLAY__REDIS__URL") {
192            let value = value.trim();
193            self.redis.url = if value.is_empty() {
194                None
195            } else {
196                Some(value.to_owned())
197            };
198        }
199        if let Ok(value) = env.var("AUTUMN_SECURITY__WEBHOOKS__REPLAY__REDIS__KEY_PREFIX")
200            && !value.trim().is_empty()
201        {
202            self.redis.key_prefix = value;
203        }
204    }
205
206    fn validate(&self, is_production: bool) -> Result<(), WebhookConfigError> {
207        match self.backend {
208            WebhookReplayBackend::Memory => {
209                if is_production && !self.allow_memory_in_production {
210                    return Err(WebhookConfigError::MemoryReplayInProduction);
211                }
212                Ok(())
213            }
214            WebhookReplayBackend::Redis => validate_redis_replay_config(&self.redis),
215        }
216    }
217}
218
219/// Redis replay protection backend configuration.
220#[derive(Debug, Clone, Deserialize)]
221pub struct WebhookReplayRedisConfig {
222    /// Redis connection URL.
223    #[serde(default)]
224    pub url: Option<String>,
225    /// Prefix for all replay keys stored in Redis.
226    #[serde(default = "default_replay_redis_key_prefix")]
227    pub key_prefix: String,
228}
229
230impl Default for WebhookReplayRedisConfig {
231    fn default() -> Self {
232        Self {
233            url: None,
234            key_prefix: default_replay_redis_key_prefix(),
235        }
236    }
237}
238
239/// Configuration for one signed webhook endpoint.
240#[derive(Debug, Clone, Deserialize)]
241pub struct WebhookEndpointConfig {
242    /// Unique endpoint name used in diagnostics and replay keys.
243    pub name: String,
244    /// Exact route path protected by this config.
245    pub path: String,
246    /// Provider verification preset.
247    #[serde(default)]
248    pub provider: WebhookProvider,
249    /// Current webhook signing secret.
250    #[serde(default)]
251    pub secret: Option<String>,
252    /// Environment variable that provides the current secret.
253    #[serde(default)]
254    pub secret_env: Option<String>,
255    /// Previous secrets accepted during a rotation grace window.
256    #[serde(default)]
257    pub previous_secrets: Vec<String>,
258    /// Environment variables that provide previous rotation secrets.
259    #[serde(default)]
260    pub previous_secret_envs: Vec<String>,
261    /// Maximum timestamp skew accepted for timestamped providers.
262    #[serde(default = "default_timestamp_tolerance_secs")]
263    pub timestamp_tolerance_secs: u64,
264    /// Replay rejection window for duplicate delivery IDs.
265    #[serde(default = "default_replay_window_secs")]
266    pub replay_window_secs: u64,
267    /// Whether duplicate delivery IDs are rejected.
268    #[serde(default = "default_true")]
269    pub replay_protection: bool,
270    /// Header carrying the signature. Provider defaults are applied by
271    /// constructors and by `Default`.
272    #[serde(default)]
273    pub signature_header: Option<String>,
274    /// Optional prefix stripped from header signatures before comparison.
275    #[serde(default)]
276    pub signature_prefix: Option<String>,
277    /// Optional header carrying a Unix timestamp.
278    #[serde(default)]
279    pub timestamp_header: Option<String>,
280    /// Optional header carrying provider delivery ID.
281    #[serde(default)]
282    pub delivery_id_header: Option<String>,
283    /// Optional header carrying provider event type.
284    #[serde(default)]
285    pub event_type_header: Option<String>,
286    /// Maximum raw body bytes read by the extractor.
287    #[serde(default = "default_max_body_bytes")]
288    pub max_body_bytes: usize,
289}
290
291impl Default for WebhookEndpointConfig {
292    fn default() -> Self {
293        Self::provider_defaults(WebhookProvider::Generic)
294    }
295}
296
297impl WebhookEndpointConfig {
298    /// Create a provider-shaped endpoint config with a current secret.
299    #[must_use]
300    pub fn new(
301        name: impl Into<String>,
302        path: impl Into<String>,
303        provider: WebhookProvider,
304        secret: impl Into<String>,
305    ) -> Self {
306        let mut config = Self::provider_defaults(provider);
307        config.name = name.into();
308        config.path = path.into();
309        config.secret = Some(secret.into());
310        config
311    }
312
313    /// Create a Stripe-style endpoint.
314    #[must_use]
315    pub fn stripe(
316        name: impl Into<String>,
317        path: impl Into<String>,
318        secret: impl Into<String>,
319    ) -> Self {
320        Self::new(name, path, WebhookProvider::Stripe, secret)
321    }
322
323    /// Create a GitHub-style endpoint.
324    #[must_use]
325    pub fn github(
326        name: impl Into<String>,
327        path: impl Into<String>,
328        secret: impl Into<String>,
329    ) -> Self {
330        Self::new(name, path, WebhookProvider::Github, secret)
331    }
332
333    /// Create a Slack-style endpoint.
334    #[must_use]
335    pub fn slack(
336        name: impl Into<String>,
337        path: impl Into<String>,
338        secret: impl Into<String>,
339    ) -> Self {
340        Self::new(name, path, WebhookProvider::Slack, secret)
341    }
342
343    /// Create a generic HMAC-SHA256 endpoint.
344    #[must_use]
345    pub fn generic(
346        name: impl Into<String>,
347        path: impl Into<String>,
348        secret: impl Into<String>,
349    ) -> Self {
350        Self::new(name, path, WebhookProvider::Generic, secret)
351    }
352
353    /// Accept one previous secret during rotation.
354    #[must_use]
355    pub fn with_previous_secret(mut self, secret: impl Into<String>) -> Self {
356        self.previous_secrets.push(secret.into());
357        self
358    }
359
360    /// Override timestamp tolerance.
361    #[must_use]
362    pub const fn with_timestamp_tolerance_secs(mut self, secs: u64) -> Self {
363        self.timestamp_tolerance_secs = secs;
364        self
365    }
366
367    /// Override replay rejection window.
368    #[must_use]
369    pub const fn with_replay_window_secs(mut self, secs: u64) -> Self {
370        self.replay_window_secs = secs;
371        self
372    }
373
374    /// Disable duplicate delivery rejection for this endpoint.
375    #[must_use]
376    pub const fn without_replay_protection(mut self) -> Self {
377        self.replay_protection = false;
378        self
379    }
380
381    fn provider_defaults(provider: WebhookProvider) -> Self {
382        let mut config = Self {
383            name: String::new(),
384            path: String::new(),
385            provider,
386            secret: None,
387            secret_env: None,
388            previous_secrets: Vec::new(),
389            previous_secret_envs: Vec::new(),
390            timestamp_tolerance_secs: DEFAULT_TIMESTAMP_TOLERANCE_SECS,
391            replay_window_secs: DEFAULT_REPLAY_WINDOW_SECS,
392            replay_protection: true,
393            signature_header: None,
394            signature_prefix: None,
395            timestamp_header: None,
396            delivery_id_header: None,
397            event_type_header: None,
398            max_body_bytes: DEFAULT_MAX_BODY_BYTES,
399        };
400
401        match provider {
402            WebhookProvider::Stripe => {
403                config.signature_header = Some("Stripe-Signature".to_owned());
404            }
405            WebhookProvider::Github => {
406                config.signature_header = Some("X-Hub-Signature-256".to_owned());
407                config.signature_prefix = Some("sha256=".to_owned());
408                config.delivery_id_header = Some("X-GitHub-Delivery".to_owned());
409                config.event_type_header = Some("X-GitHub-Event".to_owned());
410            }
411            WebhookProvider::Slack => {
412                config.signature_header = Some("X-Slack-Signature".to_owned());
413                config.signature_prefix = Some("v0=".to_owned());
414                config.timestamp_header = Some("X-Slack-Request-Timestamp".to_owned());
415            }
416            WebhookProvider::Generic => {
417                config.signature_header = Some("X-Webhook-Signature".to_owned());
418                config.signature_prefix = Some("sha256=".to_owned());
419                config.delivery_id_header = Some("X-Webhook-Delivery".to_owned());
420                config.event_type_header = Some("X-Webhook-Event".to_owned());
421            }
422        }
423
424        config
425    }
426
427    fn validate(&self, is_production: bool) -> Result<(), WebhookConfigError> {
428        if self.name.trim().is_empty() {
429            return Err(WebhookConfigError::InvalidEndpoint {
430                name: self.name.clone(),
431                message: "name must not be empty".to_owned(),
432            });
433        }
434        if !self.path.starts_with('/') || self.path.trim() == "/" || self.path.trim().is_empty() {
435            return Err(WebhookConfigError::InvalidEndpoint {
436                name: self.name.clone(),
437                message: format!("path {:?} must start with '/' and not be root", self.path),
438            });
439        }
440        let Some(secret) = self.secret.as_deref().filter(|value| !value.is_empty()) else {
441            return Err(WebhookConfigError::MissingSecret {
442                name: self.name.clone(),
443                path: self.path.clone(),
444            });
445        };
446
447        if is_production {
448            crate::security::config::validate_signing_secret(Some(secret), true).map_err(
449                |reason| WebhookConfigError::InvalidSecret {
450                    name: self.name.clone(),
451                    reason,
452                },
453            )?;
454            for (index, previous) in self.previous_secrets.iter().enumerate() {
455                crate::security::config::validate_signing_secret(Some(previous), true).map_err(
456                    |reason| WebhookConfigError::InvalidPreviousSecret {
457                        name: self.name.clone(),
458                        index,
459                        reason,
460                    },
461                )?;
462            }
463        }
464
465        Ok(())
466    }
467
468    fn apply_provider_defaults(&mut self) {
469        let defaults = Self::provider_defaults(self.provider);
470        if self.signature_header.is_none() {
471            self.signature_header = defaults.signature_header;
472        }
473        if self.signature_prefix.is_none() {
474            self.signature_prefix = defaults.signature_prefix;
475        }
476        if self.timestamp_header.is_none() {
477            self.timestamp_header = defaults.timestamp_header;
478        }
479        if self.delivery_id_header.is_none() {
480            self.delivery_id_header = defaults.delivery_id_header;
481        }
482        if self.event_type_header.is_none() {
483            self.event_type_header = defaults.event_type_header;
484        }
485    }
486}
487
488/// Webhook configuration validation failure.
489#[derive(Debug, Clone, PartialEq, Eq, Error)]
490pub enum WebhookConfigError {
491    /// A configured endpoint has no current secret.
492    #[error("webhook endpoint {name:?} at {path:?} is missing a secret")]
493    MissingSecret {
494        /// Endpoint name.
495        name: String,
496        /// Endpoint path.
497        path: String,
498    },
499    /// A configured endpoint is invalid.
500    #[error("webhook endpoint {name:?} is invalid: {message}")]
501    InvalidEndpoint {
502        /// Endpoint name.
503        name: String,
504        /// Validation message.
505        message: String,
506    },
507    /// Two endpoints declare the same route path.
508    #[error(
509        "duplicate webhook endpoint path {path:?}: endpoints {first_name:?} and \
510         {duplicate_name:?} would shadow each other"
511    )]
512    DuplicatePath {
513        /// Shared endpoint path.
514        path: String,
515        /// Name of the first endpoint using the path.
516        first_name: String,
517        /// Name of the later endpoint using the same path.
518        duplicate_name: String,
519    },
520    /// Current production secret is weak or malformed.
521    #[error("webhook endpoint {name:?} has invalid secret: {reason}")]
522    InvalidSecret {
523        /// Endpoint name.
524        name: String,
525        /// Signing secret validation error.
526        reason: crate::security::config::SigningSecretError,
527    },
528    /// Previous production secret is weak or malformed.
529    #[error("webhook endpoint {name:?} has invalid previous secret {index}: {reason}")]
530    InvalidPreviousSecret {
531        /// Endpoint name.
532        name: String,
533        /// Previous secret index.
534        index: usize,
535        /// Signing secret validation error.
536        reason: crate::security::config::SigningSecretError,
537    },
538    /// Process-local replay storage was selected for production webhooks.
539    #[error(
540        "webhook replay backend memory is not allowed in production; set \
541         security.webhooks.replay.backend = \"redis\" or explicitly set \
542         security.webhooks.replay.allow_memory_in_production = true"
543    )]
544    MemoryReplayInProduction,
545    /// Redis replay protection was selected without a URL.
546    #[error("webhook redis replay backend requires security.webhooks.replay.redis.url")]
547    RedisReplayMissingUrl,
548    /// Redis replay URL is malformed.
549    #[error("webhook redis replay backend URL is invalid: {0}")]
550    RedisReplayInvalidUrl(String),
551    /// Redis replay protection was selected without compiling the Redis feature.
552    #[error("webhook redis replay backend requires the autumn-web redis feature")]
553    RedisReplayFeatureDisabled,
554}
555
556#[derive(Debug)]
557struct ResolvedWebhookEndpoint {
558    config: WebhookEndpointConfig,
559    keys: crate::security::config::ResolvedSigningKeys,
560}
561
562/// Runtime registry for signed webhook endpoints.
563#[derive(Clone, Debug)]
564pub struct WebhookRegistry {
565    endpoints_by_path: Arc<HashMap<String, Arc<ResolvedWebhookEndpoint>>>,
566    replay_store: Arc<dyn WebhookReplayStore>,
567}
568
569impl WebhookRegistry {
570    /// Build a registry from config using the built-in in-memory replay store.
571    ///
572    /// # Errors
573    ///
574    /// Returns [`WebhookConfigError`] when any endpoint is missing a secret.
575    pub fn from_config(config: &WebhookConfig) -> Result<Self, WebhookConfigError> {
576        let replay_store = if config
577            .endpoints
578            .iter()
579            .any(|endpoint| endpoint.replay_protection)
580        {
581            replay_store_from_config(&config.replay)?
582        } else {
583            Arc::new(InMemoryWebhookReplayStore::default())
584        };
585        Self::from_config_with_shared_replay_store(config, replay_store)
586    }
587
588    /// Build a registry from config with a custom replay store.
589    ///
590    /// # Errors
591    ///
592    /// Returns [`WebhookConfigError`] when any endpoint is missing a secret.
593    pub fn from_config_with_replay_store(
594        config: &WebhookConfig,
595        replay_store: impl WebhookReplayStore + 'static,
596    ) -> Result<Self, WebhookConfigError> {
597        Self::from_config_with_shared_replay_store(config, Arc::new(replay_store))
598    }
599
600    /// Build a registry from config with a shared replay store.
601    ///
602    /// This is useful when multiple test app instances should share replay
603    /// state, or when an integration constructs its own durable backend.
604    ///
605    /// # Errors
606    ///
607    /// Returns [`WebhookConfigError`] when any endpoint is missing a secret.
608    pub fn from_config_with_shared_replay_store(
609        config: &WebhookConfig,
610        replay_store: Arc<dyn WebhookReplayStore>,
611    ) -> Result<Self, WebhookConfigError> {
612        validate_unique_endpoint_paths(&config.endpoints)?;
613        let mut endpoints_by_path = HashMap::new();
614        for endpoint in &config.endpoints {
615            let mut endpoint = endpoint.clone();
616            endpoint.apply_provider_defaults();
617            endpoint.validate(false)?;
618            let Some(secret) = endpoint.secret.as_ref() else {
619                return Err(WebhookConfigError::MissingSecret {
620                    name: endpoint.name.clone(),
621                    path: endpoint.path.clone(),
622                });
623            };
624            let current = secret.as_bytes().to_vec();
625            let previous = endpoint
626                .previous_secrets
627                .iter()
628                .map(|secret| secret.as_bytes().to_vec())
629                .collect();
630            endpoints_by_path.insert(
631                endpoint.path.clone(),
632                Arc::new(ResolvedWebhookEndpoint {
633                    config: endpoint,
634                    keys: crate::security::config::ResolvedSigningKeys::new(current, previous),
635                }),
636            );
637        }
638
639        Ok(Self {
640            endpoints_by_path: Arc::new(endpoints_by_path),
641            replay_store,
642        })
643    }
644
645    fn endpoint_for_path(&self, path: &str) -> Option<Arc<ResolvedWebhookEndpoint>> {
646        self.endpoints_by_path.get(path).cloned()
647    }
648}
649
650fn validate_unique_endpoint_paths(
651    endpoints: &[WebhookEndpointConfig],
652) -> Result<(), WebhookConfigError> {
653    let mut seen_paths = HashMap::new();
654    for endpoint in endpoints {
655        if let Some(first_name) = seen_paths.insert(endpoint.path.as_str(), endpoint.name.as_str())
656        {
657            return Err(WebhookConfigError::DuplicatePath {
658                path: endpoint.path.clone(),
659                first_name: first_name.to_owned(),
660                duplicate_name: endpoint.name.clone(),
661            });
662        }
663    }
664    Ok(())
665}
666
667/// Boxed replay store operation future.
668pub type WebhookReplayFuture<'a> =
669    Pin<Box<dyn Future<Output = Result<bool, WebhookReplayStoreError>> + Send + 'a>>;
670
671/// Replay store used to reject duplicate provider delivery IDs.
672pub trait WebhookReplayStore: Send + Sync + std::fmt::Debug {
673    /// Insert a delivery key and return `true`; return `false` if it already
674    /// exists inside the replay window.
675    fn check_and_insert<'a>(
676        &'a self,
677        key: &'a str,
678        received_at: SystemTime,
679        window: Duration,
680    ) -> WebhookReplayFuture<'a>;
681}
682
683impl<T> WebhookReplayStore for Arc<T>
684where
685    T: WebhookReplayStore + ?Sized,
686{
687    fn check_and_insert<'a>(
688        &'a self,
689        key: &'a str,
690        received_at: SystemTime,
691        window: Duration,
692    ) -> WebhookReplayFuture<'a> {
693        self.as_ref().check_and_insert(key, received_at, window)
694    }
695}
696
697/// Replay store operation failure.
698#[derive(Debug, Clone, Error)]
699#[error("{message}")]
700pub struct WebhookReplayStoreError {
701    message: String,
702}
703
704impl WebhookReplayStoreError {
705    /// Create a replay-store failure with a human-readable diagnostic.
706    ///
707    /// Custom [`WebhookReplayStore`] implementations should return this when
708    /// their durable backend is unavailable or cannot complete the atomic
709    /// delivery-ID claim. Autumn surfaces the failure as `503 Service
710    /// Unavailable` before the webhook handler runs.
711    #[must_use]
712    pub fn new(message: impl Into<String>) -> Self {
713        Self {
714            message: message.into(),
715        }
716    }
717}
718
#[cfg(feature = "redis")]
impl WebhookReplayStoreError {
    /// Describe a failed backend `operation`, appending the underlying
    /// `error` to the message.
    fn backend(operation: &'static str, error: impl std::fmt::Display) -> Self {
        let message = format!("webhook replay store {operation} failed: {error}");
        Self::new(message)
    }
}
725
726/// In-memory replay protection store.
727///
728/// This is suitable for tests, development, and single-process deployments. A
729/// multi-replica production fleet should configure the Redis replay backend.
730#[derive(Debug, Default)]
731pub struct InMemoryWebhookReplayStore {
732    state: Mutex<InMemoryWebhookReplayState>,
733}
734
/// Mutable contents of [`InMemoryWebhookReplayStore`].
#[derive(Default, Debug)]
struct InMemoryWebhookReplayState {
    /// Replay key mapped to the instant at which it stops being rejected.
    seen: HashMap<String, SystemTime>,
    /// Number of checks performed since expired entries were last swept.
    checks_since_cleanup: usize,
}
740
741impl InMemoryWebhookReplayStore {
742    fn check_and_insert_sync(&self, key: &str, received_at: SystemTime, window: Duration) -> bool {
743        {
744            let mut state = self
745                .state
746                .lock()
747                .expect("webhook replay store lock poisoned");
748            state.checks_since_cleanup = state.checks_since_cleanup.saturating_add(1);
749
750            if let Some(expires_at) = state.seen.get(key).copied() {
751                if expires_at.duration_since(received_at).is_ok() {
752                    Self::cleanup_if_due(&mut state, received_at);
753                    drop(state);
754                    return false;
755                }
756                state.seen.remove(key);
757            }
758
759            let expires_at = received_at.checked_add(window).unwrap_or(received_at);
760            state.seen.insert(key.to_owned(), expires_at);
761            Self::cleanup_if_due(&mut state, received_at);
762            drop(state);
763        }
764        true
765    }
766
767    fn cleanup_if_due(state: &mut InMemoryWebhookReplayState, received_at: SystemTime) {
768        if state.checks_since_cleanup < IN_MEMORY_REPLAY_CLEANUP_INTERVAL
769            && state.seen.len() <= IN_MEMORY_REPLAY_CLEANUP_HIGH_WATER
770        {
771            return;
772        }
773
774        state.checks_since_cleanup = 0;
775        state
776            .seen
777            .retain(|_, expires_at| expires_at.duration_since(received_at).is_ok());
778    }
779}
780
781impl WebhookReplayStore for InMemoryWebhookReplayStore {
782    fn check_and_insert<'a>(
783        &'a self,
784        key: &'a str,
785        received_at: SystemTime,
786        window: Duration,
787    ) -> WebhookReplayFuture<'a> {
788        Box::pin(async move { Ok(self.check_and_insert_sync(key, received_at, window)) })
789    }
790}
791
/// Replay protection store backed by Redis.
#[cfg(feature = "redis")]
#[derive(Debug, Clone)]
pub struct RedisWebhookReplayStore {
    /// Connection manager used to issue Redis commands.
    connection: redis::aio::ConnectionManager,
    /// Text placed before `:<replay key>` in every Redis key.
    key_prefix: String,
}
799
#[cfg(feature = "redis")]
impl RedisWebhookReplayStore {
    /// Create a Redis replay store from `config`, using a lazily-created
    /// connection manager.
    ///
    /// # Errors
    ///
    /// Returns [`WebhookConfigError::RedisReplayMissingUrl`] when no non-blank
    /// URL is configured, and [`WebhookConfigError::RedisReplayInvalidUrl`]
    /// when the client or the connection manager cannot be created.
    pub fn from_config(config: &WebhookReplayRedisConfig) -> Result<Self, WebhookConfigError> {
        /// Wrap a Redis setup failure as an invalid-URL configuration error.
        fn invalid_url(error: impl std::fmt::Display) -> WebhookConfigError {
            WebhookConfigError::RedisReplayInvalidUrl(error.to_string())
        }

        let url = match config.url.as_deref() {
            Some(url) if !url.trim().is_empty() => url,
            _ => return Err(WebhookConfigError::RedisReplayMissingUrl),
        };

        let client = redis::Client::open(url).map_err(invalid_url)?;
        let manager_config = redis::aio::ConnectionManagerConfig::new();
        let connection =
            redis::aio::ConnectionManager::new_lazy_with_config(client, manager_config)
                .map_err(invalid_url)?;

        Ok(Self {
            connection,
            key_prefix: config.key_prefix.clone(),
        })
    }

    /// Redis key for `replay_key`: the configured prefix, a colon, then the
    /// replay key.
    fn key_for(&self, replay_key: &str) -> String {
        let prefix = &self.key_prefix;
        format!("{prefix}:{replay_key}")
    }
}
830
#[cfg(feature = "redis")]
impl WebhookReplayStore for RedisWebhookReplayStore {
    /// Claim `key` with `SET <key> <unix seconds> NX EX <ttl>`.
    ///
    /// Resolves to `true` when Redis stored the key and `false` when the key
    /// already existed. The TTL is `window` in whole seconds, with a minimum
    /// of one second.
    fn check_and_insert<'a>(
        &'a self,
        key: &'a str,
        received_at: SystemTime,
        window: Duration,
    ) -> WebhookReplayFuture<'a> {
        Box::pin(async move {
            let mut connection = self.connection.clone();
            let redis_key = self.key_for(key);
            let ttl_secs = std::cmp::max(window.as_secs(), 1);
            let since_epoch = received_at
                .duration_since(UNIX_EPOCH)
                .map_err(|error| WebhookReplayStoreError::backend("timestamp", error))?;
            let received_unix = since_epoch.as_secs().to_string();

            let mut command = redis::cmd("SET");
            command
                .arg(&redis_key)
                .arg(received_unix)
                .arg("NX")
                .arg("EX")
                .arg(ttl_secs);

            // `SET ... NX` replies with a value only when the key was stored.
            let reply: Option<String> = command
                .query_async(&mut connection)
                .await
                .map_err(|error| WebhookReplayStoreError::backend("insert", error))?;
            Ok(reply.is_some())
        })
    }
}
861
/// A request that has passed signed webhook verification.
///
/// Built by `verify_request` after the provider signature check and, when the
/// endpoint enables it, the replay check have both succeeded.
#[derive(Debug, Clone)]
pub struct SignedWebhook {
    /// Provider preset copied from the matched endpoint's configuration.
    provider: WebhookProvider,
    /// Name of the configured endpoint that handled the request.
    endpoint: String,
    /// Delivery ID resolved from the request headers or JSON body, if any.
    delivery_id: Option<String>,
    /// Event type resolved from the request headers or JSON body, if any.
    event_type: Option<String>,
    /// Receive time that was passed to the timestamp and replay checks.
    received_at: SystemTime,
    /// Request body bytes exactly as they were verified.
    raw_body: Bytes,
}
872
873impl SignedWebhook {
874    /// Provider preset that verified this request.
875    #[must_use]
876    pub const fn provider(&self) -> &'static str {
877        self.provider.as_str()
878    }
879
880    /// Configured endpoint name.
881    #[must_use]
882    pub fn endpoint(&self) -> &str {
883        &self.endpoint
884    }
885
886    /// Provider delivery ID, when present.
887    #[must_use]
888    pub fn delivery_id(&self) -> Option<&str> {
889        self.delivery_id.as_deref()
890    }
891
892    /// Provider event type, when present.
893    #[must_use]
894    pub fn event_type(&self) -> Option<&str> {
895        self.event_type.as_deref()
896    }
897
898    /// Request receive time used for timestamp and replay checks.
899    #[must_use]
900    pub const fn received_at(&self) -> SystemTime {
901        self.received_at
902    }
903
904    /// Exact verified request body bytes.
905    #[must_use]
906    pub fn raw_body(&self) -> &[u8] {
907        &self.raw_body
908    }
909
910    /// Decode the verified body as JSON.
911    ///
912    /// # Errors
913    ///
914    /// Returns `serde_json::Error` when the verified body is not valid JSON for
915    /// the requested type.
916    pub fn json<T: serde::de::DeserializeOwned>(&self) -> Result<T, serde_json::Error> {
917        serde_json::from_slice(&self.raw_body)
918    }
919}
920
921impl FromRequest<crate::AppState> for SignedWebhook {
922    type Rejection = crate::AutumnError;
923
924    async fn from_request(
925        req: axum::extract::Request,
926        state: &crate::AppState,
927    ) -> Result<Self, Self::Rejection> {
928        let (parts, body) = req.into_parts();
929        let path = parts.uri.path().to_owned();
930        let registry = state
931            .extension::<WebhookRegistry>()
932            .ok_or_else(|| WebhookVerifyError::RegistryMissing.into_autumn_error())?;
933        let endpoint = registry
934            .endpoint_for_path(&path)
935            .ok_or_else(|| WebhookVerifyError::EndpointMissing(path.clone()).into_autumn_error())?;
936        let body = axum::body::to_bytes(body, endpoint.config.max_body_bytes)
937            .await
938            .map_err(|err| {
939                crate::AutumnError::bad_request_msg(format!(
940                    "webhook body could not be read: {err}"
941                ))
942            })?;
943        let received_at = SystemTime::now();
944        verify_request(&registry, &endpoint, &parts.headers, body, received_at)
945            .await
946            .map_err(WebhookVerifyError::into_autumn_error)
947    }
948}
949
/// Reasons a webhook request is rejected before reaching handler code.
///
/// Each variant maps to an HTTP status in `WebhookVerifyError::status`.
#[derive(Debug, Error)]
enum WebhookVerifyError {
    /// The application state carries no `WebhookRegistry` extension (HTTP 500).
    #[error("signed webhook registry is not installed")]
    RegistryMissing,
    /// No configured endpoint matches the request path held in the payload
    /// (HTTP 500).
    #[error("no signed webhook endpoint is configured for path {0}")]
    EndpointMissing(String),
    /// A header needed for verification is absent; the payload is the header
    /// name (HTTP 400).
    #[error("missing required webhook header {0}")]
    MissingHeader(String),
    /// The signature header could not be parsed or lacks the expected prefix.
    /// Also produced when any required header value cannot be read as a
    /// string (HTTP 400).
    #[error("malformed webhook signature")]
    MalformedSignature,
    /// The timestamp is missing or not an integer. Also produced when a Slack
    /// endpoint has no timestamp header configured, or when the receive time
    /// cannot be expressed as seconds since the Unix epoch (HTTP 400).
    #[error("malformed webhook timestamp")]
    MalformedTimestamp,
    /// The signed timestamp differs from the receive time by more than the
    /// configured tolerance (HTTP 401).
    #[error("webhook timestamp is outside the accepted tolerance")]
    StaleTimestamp,
    /// No configured key verifies the supplied signature (HTTP 401).
    #[error("webhook signature mismatch")]
    SignatureMismatch,
    /// Replay protection is enabled but no delivery ID could be resolved
    /// (HTTP 400).
    #[error("missing webhook delivery ID")]
    MissingDeliveryId,
    /// The replay store reported that this delivery was already recorded
    /// (HTTP 409).
    #[error("duplicate webhook delivery")]
    DuplicateDelivery,
    /// The replay store returned an error; the payload is its message
    /// (HTTP 503).
    #[error("webhook replay store unavailable: {0}")]
    ReplayStoreUnavailable(String),
}
973
974impl WebhookVerifyError {
975    const fn status(&self) -> StatusCode {
976        match self {
977            Self::RegistryMissing | Self::EndpointMissing(_) => StatusCode::INTERNAL_SERVER_ERROR,
978            Self::MissingHeader(_)
979            | Self::MalformedSignature
980            | Self::MalformedTimestamp
981            | Self::MissingDeliveryId => StatusCode::BAD_REQUEST,
982            Self::StaleTimestamp | Self::SignatureMismatch => StatusCode::UNAUTHORIZED,
983            Self::DuplicateDelivery => StatusCode::CONFLICT,
984            Self::ReplayStoreUnavailable(_) => StatusCode::SERVICE_UNAVAILABLE,
985        }
986    }
987
988    fn into_autumn_error(self) -> crate::AutumnError {
989        crate::AutumnError::bad_request_msg(self.to_string()).with_status(self.status())
990    }
991}
992
993async fn verify_request(
994    registry: &WebhookRegistry,
995    endpoint: &ResolvedWebhookEndpoint,
996    headers: &HeaderMap,
997    body: Bytes,
998    received_at: SystemTime,
999) -> Result<SignedWebhook, WebhookVerifyError> {
1000    match endpoint.config.provider {
1001        WebhookProvider::Stripe => verify_stripe(endpoint, headers, &body, received_at)?,
1002        WebhookProvider::Github | WebhookProvider::Generic => {
1003            verify_body_hmac(endpoint, headers, &body, None, received_at)?;
1004        }
1005        WebhookProvider::Slack => verify_slack(endpoint, headers, &body, received_at)?,
1006    }
1007
1008    let json_body = serde_json::from_slice::<serde_json::Value>(&body).ok();
1009    let delivery_id = resolve_delivery_id(&endpoint.config, headers, json_body.as_ref());
1010    if endpoint.config.replay_protection {
1011        let delivery_id = delivery_id
1012            .as_deref()
1013            .ok_or(WebhookVerifyError::MissingDeliveryId)?;
1014        let replay_key = format!(
1015            "{}:{}:{delivery_id}",
1016            endpoint.config.provider.as_str(),
1017            endpoint.config.name
1018        );
1019        let window = Duration::from_secs(endpoint.config.replay_window_secs);
1020        if !registry
1021            .replay_store
1022            .check_and_insert(&replay_key, received_at, window)
1023            .await
1024            .map_err(|error| WebhookVerifyError::ReplayStoreUnavailable(error.to_string()))?
1025        {
1026            return Err(WebhookVerifyError::DuplicateDelivery);
1027        }
1028    }
1029
1030    Ok(SignedWebhook {
1031        provider: endpoint.config.provider,
1032        endpoint: endpoint.config.name.clone(),
1033        delivery_id,
1034        event_type: resolve_event_type(&endpoint.config, headers, json_body.as_ref()),
1035        received_at,
1036        raw_body: body,
1037    })
1038}
1039
1040fn verify_stripe(
1041    endpoint: &ResolvedWebhookEndpoint,
1042    headers: &HeaderMap,
1043    body: &[u8],
1044    received_at: SystemTime,
1045) -> Result<(), WebhookVerifyError> {
1046    let header = required_header(headers, signature_header(endpoint))?;
1047    let (timestamp, signatures) = parse_stripe_signature(header)?;
1048    verify_timestamp(
1049        timestamp,
1050        received_at,
1051        endpoint.config.timestamp_tolerance_secs,
1052    )?;
1053
1054    let timestamp = timestamp.to_string();
1055    let mut signed_payload = Vec::with_capacity(timestamp.len() + 1 + body.len());
1056    signed_payload.extend_from_slice(timestamp.as_bytes());
1057    signed_payload.push(b'.');
1058    signed_payload.extend_from_slice(body);
1059
1060    if signatures
1061        .iter()
1062        .any(|signature| endpoint.keys.verify(&signed_payload, signature))
1063    {
1064        Ok(())
1065    } else {
1066        Err(WebhookVerifyError::SignatureMismatch)
1067    }
1068}
1069
1070fn verify_slack(
1071    endpoint: &ResolvedWebhookEndpoint,
1072    headers: &HeaderMap,
1073    body: &[u8],
1074    received_at: SystemTime,
1075) -> Result<(), WebhookVerifyError> {
1076    let timestamp_header = endpoint
1077        .config
1078        .timestamp_header
1079        .as_deref()
1080        .ok_or(WebhookVerifyError::MalformedTimestamp)?;
1081    let timestamp = required_header(headers, timestamp_header)?
1082        .parse::<i64>()
1083        .map_err(|_| WebhookVerifyError::MalformedTimestamp)?;
1084    verify_timestamp(
1085        timestamp,
1086        received_at,
1087        endpoint.config.timestamp_tolerance_secs,
1088    )?;
1089
1090    let timestamp = timestamp.to_string();
1091    let mut signed_payload = Vec::with_capacity(3 + timestamp.len() + 1 + body.len());
1092    signed_payload.extend_from_slice(b"v0:");
1093    signed_payload.extend_from_slice(timestamp.as_bytes());
1094    signed_payload.push(b':');
1095    signed_payload.extend_from_slice(body);
1096    verify_body_hmac(
1097        endpoint,
1098        headers,
1099        &signed_payload,
1100        endpoint.config.signature_prefix.as_deref(),
1101        received_at,
1102    )
1103}
1104
1105fn verify_body_hmac(
1106    endpoint: &ResolvedWebhookEndpoint,
1107    headers: &HeaderMap,
1108    body_or_base: &[u8],
1109    explicit_prefix: Option<&str>,
1110    received_at: SystemTime,
1111) -> Result<(), WebhookVerifyError> {
1112    if let Some(timestamp_header) = endpoint.config.timestamp_header.as_deref()
1113        && endpoint.config.provider != WebhookProvider::Slack
1114    {
1115        let timestamp = required_header(headers, timestamp_header)?
1116            .parse::<i64>()
1117            .map_err(|_| WebhookVerifyError::MalformedTimestamp)?;
1118        verify_timestamp(
1119            timestamp,
1120            received_at,
1121            endpoint.config.timestamp_tolerance_secs,
1122        )?;
1123    }
1124
1125    let mut signature = required_header(headers, signature_header(endpoint))?;
1126    let prefix = explicit_prefix.or(endpoint.config.signature_prefix.as_deref());
1127    if let Some(prefix) = prefix {
1128        signature = signature
1129            .strip_prefix(prefix)
1130            .ok_or(WebhookVerifyError::MalformedSignature)?;
1131    }
1132
1133    if endpoint.keys.verify(body_or_base, signature) {
1134        Ok(())
1135    } else {
1136        Err(WebhookVerifyError::SignatureMismatch)
1137    }
1138}
1139
1140fn signature_header(endpoint: &ResolvedWebhookEndpoint) -> &str {
1141    endpoint
1142        .config
1143        .signature_header
1144        .as_deref()
1145        .unwrap_or("X-Webhook-Signature")
1146}
1147
1148fn required_header<'a>(headers: &'a HeaderMap, name: &str) -> Result<&'a str, WebhookVerifyError> {
1149    headers
1150        .get(name)
1151        .ok_or_else(|| WebhookVerifyError::MissingHeader(name.to_owned()))?
1152        .to_str()
1153        .map_err(|_| WebhookVerifyError::MalformedSignature)
1154}
1155
1156fn parse_stripe_signature(header: &str) -> Result<(i64, Vec<&str>), WebhookVerifyError> {
1157    let mut timestamp = None;
1158    let mut signatures = Vec::new();
1159
1160    for part in header.split(',') {
1161        let Some((key, value)) = part.split_once('=') else {
1162            return Err(WebhookVerifyError::MalformedSignature);
1163        };
1164        match key.trim() {
1165            "t" => {
1166                timestamp = Some(
1167                    value
1168                        .trim()
1169                        .parse::<i64>()
1170                        .map_err(|_| WebhookVerifyError::MalformedTimestamp)?,
1171                );
1172            }
1173            "v1" => signatures.push(value.trim()),
1174            _ => {}
1175        }
1176    }
1177
1178    let timestamp = timestamp.ok_or(WebhookVerifyError::MalformedTimestamp)?;
1179    if signatures.is_empty() {
1180        return Err(WebhookVerifyError::MalformedSignature);
1181    }
1182    Ok((timestamp, signatures))
1183}
1184
1185fn verify_timestamp(
1186    timestamp: i64,
1187    received_at: SystemTime,
1188    tolerance_secs: u64,
1189) -> Result<(), WebhookVerifyError> {
1190    let now = i64::try_from(
1191        received_at
1192            .duration_since(UNIX_EPOCH)
1193            .map_err(|_| WebhookVerifyError::MalformedTimestamp)?
1194            .as_secs(),
1195    )
1196    .map_err(|_| WebhookVerifyError::MalformedTimestamp)?;
1197    let skew = now.abs_diff(timestamp);
1198    if skew > tolerance_secs {
1199        return Err(WebhookVerifyError::StaleTimestamp);
1200    }
1201    Ok(())
1202}
1203
1204fn resolve_delivery_id(
1205    config: &WebhookEndpointConfig,
1206    headers: &HeaderMap,
1207    json_body: Option<&serde_json::Value>,
1208) -> Option<String> {
1209    let header = config
1210        .delivery_id_header
1211        .as_deref()
1212        .and_then(|header| optional_header(headers, header));
1213
1214    match config.provider {
1215        WebhookProvider::Slack => header
1216            .or_else(|| slack_delivery_id(json_body))
1217            .or_else(|| json_string_field(json_body, "id")),
1218        _ => header.or_else(|| json_string_field(json_body, "id")),
1219    }
1220}
1221
1222fn resolve_event_type(
1223    config: &WebhookEndpointConfig,
1224    headers: &HeaderMap,
1225    json_body: Option<&serde_json::Value>,
1226) -> Option<String> {
1227    config
1228        .event_type_header
1229        .as_deref()
1230        .and_then(|header| optional_header(headers, header))
1231        .or_else(|| json_string_field(json_body, "type"))
1232        .or_else(|| nested_json_string_field(json_body, "event", "type"))
1233}
1234
1235fn optional_header(headers: &HeaderMap, name: &str) -> Option<String> {
1236    headers
1237        .get(name)
1238        .and_then(|value| value.to_str().ok())
1239        .filter(|value| !value.trim().is_empty())
1240        .map(str::to_owned)
1241}
1242
1243fn slack_delivery_id(json_body: Option<&serde_json::Value>) -> Option<String> {
1244    json_string_field(json_body, "event_id").or_else(|| {
1245        let value = json_body?;
1246        if value.get("type").and_then(serde_json::Value::as_str) == Some("url_verification") {
1247            value
1248                .get("challenge")
1249                .and_then(serde_json::Value::as_str)
1250                .map(str::to_owned)
1251        } else {
1252            None
1253        }
1254    })
1255}
1256
1257fn json_string_field(value: Option<&serde_json::Value>, field: &str) -> Option<String> {
1258    let value = value?;
1259    value
1260        .get(field)
1261        .and_then(serde_json::Value::as_str)
1262        .map(str::to_owned)
1263}
1264
1265fn nested_json_string_field(
1266    value: Option<&serde_json::Value>,
1267    parent: &str,
1268    field: &str,
1269) -> Option<String> {
1270    let value = value?;
1271    value
1272        .get(parent)
1273        .and_then(|parent_value| parent_value.get(field))
1274        .and_then(serde_json::Value::as_str)
1275        .map(str::to_owned)
1276}
1277
// NOTE(review): the helpers below look like serde `default = "..."` providers
// for the webhook configuration structs; confirm against the struct
// definitions earlier in the file.

/// Default timestamp tolerance in seconds (`DEFAULT_TIMESTAMP_TOLERANCE_SECS`, 300).
const fn default_timestamp_tolerance_secs() -> u64 {
    DEFAULT_TIMESTAMP_TOLERANCE_SECS
}

/// Default replay window in seconds (`DEFAULT_REPLAY_WINDOW_SECS`, 24 hours).
const fn default_replay_window_secs() -> u64 {
    DEFAULT_REPLAY_WINDOW_SECS
}

/// Default request body limit in bytes (`DEFAULT_MAX_BODY_BYTES`, 1 MiB).
const fn default_max_body_bytes() -> usize {
    DEFAULT_MAX_BODY_BYTES
}

/// Always `true`.
const fn default_true() -> bool {
    true
}

/// Default key prefix for replay entries stored in Redis.
fn default_replay_redis_key_prefix() -> String {
    "autumn:webhooks:replay".to_owned()
}
1297
1298fn validate_redis_replay_config(
1299    config: &WebhookReplayRedisConfig,
1300) -> Result<(), WebhookConfigError> {
1301    let url = config
1302        .url
1303        .as_deref()
1304        .filter(|url| !url.trim().is_empty())
1305        .ok_or(WebhookConfigError::RedisReplayMissingUrl)?;
1306
1307    #[cfg(feature = "redis")]
1308    {
1309        redis::Client::open(url)
1310            .map_err(|error| WebhookConfigError::RedisReplayInvalidUrl(error.to_string()))?;
1311        Ok(())
1312    }
1313
1314    #[cfg(not(feature = "redis"))]
1315    {
1316        let _ = url;
1317        Err(WebhookConfigError::RedisReplayFeatureDisabled)
1318    }
1319}
1320
1321fn replay_store_from_config(
1322    config: &WebhookReplayConfig,
1323) -> Result<Arc<dyn WebhookReplayStore>, WebhookConfigError> {
1324    match config.backend {
1325        WebhookReplayBackend::Memory => Ok(Arc::new(InMemoryWebhookReplayStore::default())),
1326        WebhookReplayBackend::Redis => {
1327            #[cfg(feature = "redis")]
1328            {
1329                Ok(Arc::new(RedisWebhookReplayStore::from_config(
1330                    &config.redis,
1331                )?))
1332            }
1333
1334            #[cfg(not(feature = "redis"))]
1335            {
1336                Err(WebhookConfigError::RedisReplayFeatureDisabled)
1337            }
1338        }
1339    }
1340}
1341
1342pub(crate) fn install_registry_from_config(
1343    state: &crate::AppState,
1344    config: &WebhookConfig,
1345) -> Result<(), WebhookConfigError> {
1346    if config.endpoints.is_empty() {
1347        return Ok(());
1348    }
1349    let registry = WebhookRegistry::from_config(config)?;
1350    state.insert_extension(registry);
1351    Ok(())
1352}