Skip to main content

autumn_web/
webhook.rs

1//! Signed webhook intake for third-party callbacks.
2//!
3//! The [`SignedWebhook`] extractor verifies provider signatures against the
4//! exact HTTP request bytes before handler code runs. Configure endpoints with
5//! [`WebhookEndpointConfig`] under `security.webhooks.endpoints`.
6
7use std::collections::HashMap;
8use std::future::Future;
9use std::pin::Pin;
10use std::sync::{Arc, Mutex};
11use std::time::{Duration, SystemTime, UNIX_EPOCH};
12
13use axum::extract::FromRequest;
14use bytes::Bytes;
15use http::{HeaderMap, StatusCode};
16use serde::Deserialize;
17use thiserror::Error;
18
19pub use crate::security::config::hmac_sha256_hex;
20
21const DEFAULT_TIMESTAMP_TOLERANCE_SECS: u64 = 300;
22const DEFAULT_REPLAY_WINDOW_SECS: u64 = 24 * 60 * 60;
23const DEFAULT_MAX_BODY_BYTES: usize = 1024 * 1024;
24const IN_MEMORY_REPLAY_CLEANUP_INTERVAL: usize = 128;
25const IN_MEMORY_REPLAY_CLEANUP_HIGH_WATER: usize = 16 * 1024;
26
27/// Provider preset for signed webhook verification.
28#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash, Deserialize)]
29#[serde(rename_all = "lowercase")]
30pub enum WebhookProvider {
31    /// Stripe-style `Stripe-Signature: t=...,v1=...`.
32    Stripe,
33    /// GitHub-style `X-Hub-Signature-256: sha256=...`.
34    Github,
35    /// Slack-style `X-Slack-Signature: v0=...`.
36    Slack,
37    /// Generic HMAC-SHA256 over the raw request body.
38    #[default]
39    Generic,
40}
41
42impl WebhookProvider {
43    /// Stable lower-case provider name.
44    #[must_use]
45    pub const fn as_str(self) -> &'static str {
46        match self {
47            Self::Stripe => "stripe",
48            Self::Github => "github",
49            Self::Slack => "slack",
50            Self::Generic => "generic",
51        }
52    }
53}
54
55impl std::fmt::Display for WebhookProvider {
56    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
57        f.write_str(self.as_str())
58    }
59}
60
61/// Signed webhook configuration section.
62#[derive(Debug, Clone, Default, Deserialize)]
63pub struct WebhookConfig {
64    /// Replay protection backend used for all replay-protected endpoints.
65    #[serde(default)]
66    pub replay: WebhookReplayConfig,
67    /// Configured signed webhook endpoints.
68    #[serde(default)]
69    pub endpoints: Vec<WebhookEndpointConfig>,
70}
71
72impl WebhookConfig {
73    /// Apply environment-sourced webhook configuration.
74    pub(crate) fn apply_env_overrides_with_env(&mut self, env: &dyn crate::config::Env) {
75        self.replay.apply_env_overrides_with_env(env);
76        self.resolve_secret_envs_with_env(env);
77    }
78
79    /// Resolve `secret_env` and `previous_secret_envs` entries from the
80    /// configured environment.
81    fn resolve_secret_envs_with_env(&mut self, env: &dyn crate::config::Env) {
82        for endpoint in &mut self.endpoints {
83            if endpoint.secret.is_none()
84                && let Some(env_name) = endpoint.secret_env.as_deref()
85                && let Ok(secret) = env.var(env_name)
86            {
87                endpoint.secret = Some(secret);
88            }
89
90            for env_name in &endpoint.previous_secret_envs {
91                if let Ok(secret) = env.var(env_name)
92                    && !secret.is_empty()
93                {
94                    endpoint.previous_secrets.push(secret);
95                }
96            }
97        }
98    }
99
100    /// Validate configured signed webhook endpoints.
101    ///
102    /// # Errors
103    ///
104    /// Returns [`WebhookConfigError`] when an endpoint has no usable secret,
105    /// an invalid path, or a weak production secret.
106    pub fn validate(&self, is_production: bool) -> Result<(), WebhookConfigError> {
107        for endpoint in &self.endpoints {
108            endpoint.validate(is_production)?;
109        }
110        validate_unique_endpoint_paths(&self.endpoints)?;
111        if self
112            .endpoints
113            .iter()
114            .any(|endpoint| endpoint.replay_protection)
115        {
116            self.replay.validate(is_production)?;
117        }
118        Ok(())
119    }
120}
121
122/// Replay protection backend for signed webhooks.
123#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Deserialize)]
124#[serde(rename_all = "lowercase")]
125pub enum WebhookReplayBackend {
126    /// Process-local memory store. Suitable for tests, development, and
127    /// explicitly acknowledged single-replica deployments.
128    #[serde(alias = "local", alias = "in_memory")]
129    #[default]
130    Memory,
131    /// Redis `SET NX EX` store shared by every application replica.
132    Redis,
133}
134
135impl WebhookReplayBackend {
136    fn from_env_value(value: &str) -> Option<Self> {
137        match value.trim().to_ascii_lowercase().as_str() {
138            "memory" | "local" | "in_memory" | "in-memory" => Some(Self::Memory),
139            "redis" => Some(Self::Redis),
140            _ => None,
141        }
142    }
143}
144
145/// Replay protection storage configuration.
146#[derive(Debug, Clone, Deserialize)]
147pub struct WebhookReplayConfig {
148    /// Active replay backend.
149    #[serde(default)]
150    pub backend: WebhookReplayBackend,
151    /// Explicit production escape hatch for single-replica deployments.
152    #[serde(default)]
153    pub allow_memory_in_production: bool,
154    /// Redis backend options.
155    #[serde(default)]
156    pub redis: WebhookReplayRedisConfig,
157}
158
159impl Default for WebhookReplayConfig {
160    fn default() -> Self {
161        Self {
162            backend: WebhookReplayBackend::Memory,
163            allow_memory_in_production: false,
164            redis: WebhookReplayRedisConfig::default(),
165        }
166    }
167}
168
169impl WebhookReplayConfig {
170    fn apply_env_overrides_with_env(&mut self, env: &dyn crate::config::Env) {
171        if let Ok(value) = env.var("AUTUMN_SECURITY__WEBHOOKS__REPLAY__BACKEND") {
172            if let Some(backend) = WebhookReplayBackend::from_env_value(&value) {
173                self.backend = backend;
174            } else {
175                eprintln!(
176                    "Warning: AUTUMN_SECURITY__WEBHOOKS__REPLAY__BACKEND={value:?} is not valid \
177                     (expected memory or redis), ignoring"
178                );
179            }
180        }
181        if let Ok(value) = env.var("AUTUMN_SECURITY__WEBHOOKS__REPLAY__ALLOW_MEMORY_IN_PRODUCTION")
182        {
183            match value.trim().parse::<bool>() {
184                Ok(value) => self.allow_memory_in_production = value,
185                Err(error) => eprintln!(
186                    "Warning: AUTUMN_SECURITY__WEBHOOKS__REPLAY__ALLOW_MEMORY_IN_PRODUCTION \
187                     could not be parsed as bool: {error}"
188                ),
189            }
190        }
191        if let Ok(value) = env.var("AUTUMN_SECURITY__WEBHOOKS__REPLAY__REDIS__URL") {
192            let value = value.trim();
193            self.redis.url = if value.is_empty() {
194                None
195            } else {
196                Some(value.to_owned())
197            };
198        }
199        if let Ok(value) = env.var("AUTUMN_SECURITY__WEBHOOKS__REPLAY__REDIS__KEY_PREFIX")
200            && !value.trim().is_empty()
201        {
202            self.redis.key_prefix = value;
203        }
204    }
205
206    fn validate(&self, is_production: bool) -> Result<(), WebhookConfigError> {
207        match self.backend {
208            WebhookReplayBackend::Memory => {
209                if is_production && !self.allow_memory_in_production {
210                    return Err(WebhookConfigError::MemoryReplayInProduction);
211                }
212                Ok(())
213            }
214            WebhookReplayBackend::Redis => validate_redis_replay_config(&self.redis),
215        }
216    }
217}
218
219/// Redis replay protection backend configuration.
220#[derive(Debug, Clone, Deserialize)]
221pub struct WebhookReplayRedisConfig {
222    /// Redis connection URL.
223    #[serde(default)]
224    pub url: Option<String>,
225    /// Prefix for all replay keys stored in Redis.
226    #[serde(default = "default_replay_redis_key_prefix")]
227    pub key_prefix: String,
228}
229
230impl Default for WebhookReplayRedisConfig {
231    fn default() -> Self {
232        Self {
233            url: None,
234            key_prefix: default_replay_redis_key_prefix(),
235        }
236    }
237}
238
239/// Configuration for one signed webhook endpoint.
240#[derive(Debug, Clone, Deserialize)]
241pub struct WebhookEndpointConfig {
242    /// Unique endpoint name used in diagnostics and replay keys.
243    pub name: String,
244    /// Exact route path protected by this config.
245    pub path: String,
246    /// Provider verification preset.
247    #[serde(default)]
248    pub provider: WebhookProvider,
249    /// Current webhook signing secret.
250    #[serde(default)]
251    pub secret: Option<String>,
252    /// Environment variable that provides the current secret.
253    #[serde(default)]
254    pub secret_env: Option<String>,
255    /// Previous secrets accepted during a rotation grace window.
256    #[serde(default)]
257    pub previous_secrets: Vec<String>,
258    /// Environment variables that provide previous rotation secrets.
259    #[serde(default)]
260    pub previous_secret_envs: Vec<String>,
261    /// Maximum timestamp skew accepted for timestamped providers.
262    #[serde(default = "default_timestamp_tolerance_secs")]
263    pub timestamp_tolerance_secs: u64,
264    /// Replay rejection window for duplicate delivery IDs.
265    #[serde(default = "default_replay_window_secs")]
266    pub replay_window_secs: u64,
267    /// Whether duplicate delivery IDs are rejected.
268    #[serde(default = "default_true")]
269    pub replay_protection: bool,
270    /// Header carrying the signature. Provider defaults are applied by
271    /// constructors and by `Default`.
272    #[serde(default)]
273    pub signature_header: Option<String>,
274    /// Optional prefix stripped from header signatures before comparison.
275    #[serde(default)]
276    pub signature_prefix: Option<String>,
277    /// Optional header carrying a Unix timestamp.
278    #[serde(default)]
279    pub timestamp_header: Option<String>,
280    /// Optional header carrying provider delivery ID.
281    #[serde(default)]
282    pub delivery_id_header: Option<String>,
283    /// Optional header carrying provider event type.
284    #[serde(default)]
285    pub event_type_header: Option<String>,
286    /// Maximum raw body bytes read by the extractor.
287    #[serde(default = "default_max_body_bytes")]
288    pub max_body_bytes: usize,
289}
290
291impl Default for WebhookEndpointConfig {
292    fn default() -> Self {
293        Self::provider_defaults(WebhookProvider::Generic)
294    }
295}
296
297impl WebhookEndpointConfig {
298    /// Create a provider-shaped endpoint config with a current secret.
299    #[must_use]
300    pub fn new(
301        name: impl Into<String>,
302        path: impl Into<String>,
303        provider: WebhookProvider,
304        secret: impl Into<String>,
305    ) -> Self {
306        let mut config = Self::provider_defaults(provider);
307        config.name = name.into();
308        config.path = path.into();
309        config.secret = Some(secret.into());
310        config
311    }
312
313    /// Create a Stripe-style endpoint.
314    #[must_use]
315    pub fn stripe(
316        name: impl Into<String>,
317        path: impl Into<String>,
318        secret: impl Into<String>,
319    ) -> Self {
320        Self::new(name, path, WebhookProvider::Stripe, secret)
321    }
322
323    /// Create a GitHub-style endpoint.
324    #[must_use]
325    pub fn github(
326        name: impl Into<String>,
327        path: impl Into<String>,
328        secret: impl Into<String>,
329    ) -> Self {
330        Self::new(name, path, WebhookProvider::Github, secret)
331    }
332
333    /// Create a Slack-style endpoint.
334    #[must_use]
335    pub fn slack(
336        name: impl Into<String>,
337        path: impl Into<String>,
338        secret: impl Into<String>,
339    ) -> Self {
340        Self::new(name, path, WebhookProvider::Slack, secret)
341    }
342
343    /// Create a generic HMAC-SHA256 endpoint.
344    #[must_use]
345    pub fn generic(
346        name: impl Into<String>,
347        path: impl Into<String>,
348        secret: impl Into<String>,
349    ) -> Self {
350        Self::new(name, path, WebhookProvider::Generic, secret)
351    }
352
353    /// Accept one previous secret during rotation.
354    #[must_use]
355    pub fn with_previous_secret(mut self, secret: impl Into<String>) -> Self {
356        self.previous_secrets.push(secret.into());
357        self
358    }
359
360    /// Override timestamp tolerance.
361    #[must_use]
362    pub const fn with_timestamp_tolerance_secs(mut self, secs: u64) -> Self {
363        self.timestamp_tolerance_secs = secs;
364        self
365    }
366
367    /// Override replay rejection window.
368    #[must_use]
369    pub const fn with_replay_window_secs(mut self, secs: u64) -> Self {
370        self.replay_window_secs = secs;
371        self
372    }
373
374    /// Disable duplicate delivery rejection for this endpoint.
375    #[must_use]
376    pub const fn without_replay_protection(mut self) -> Self {
377        self.replay_protection = false;
378        self
379    }
380
381    fn provider_defaults(provider: WebhookProvider) -> Self {
382        let mut config = Self {
383            name: String::new(),
384            path: String::new(),
385            provider,
386            secret: None,
387            secret_env: None,
388            previous_secrets: Vec::new(),
389            previous_secret_envs: Vec::new(),
390            timestamp_tolerance_secs: DEFAULT_TIMESTAMP_TOLERANCE_SECS,
391            replay_window_secs: DEFAULT_REPLAY_WINDOW_SECS,
392            replay_protection: true,
393            signature_header: None,
394            signature_prefix: None,
395            timestamp_header: None,
396            delivery_id_header: None,
397            event_type_header: None,
398            max_body_bytes: DEFAULT_MAX_BODY_BYTES,
399        };
400
401        match provider {
402            WebhookProvider::Stripe => {
403                config.signature_header = Some("Stripe-Signature".to_owned());
404            }
405            WebhookProvider::Github => {
406                config.signature_header = Some("X-Hub-Signature-256".to_owned());
407                config.signature_prefix = Some("sha256=".to_owned());
408                config.delivery_id_header = Some("X-GitHub-Delivery".to_owned());
409                config.event_type_header = Some("X-GitHub-Event".to_owned());
410            }
411            WebhookProvider::Slack => {
412                config.signature_header = Some("X-Slack-Signature".to_owned());
413                config.signature_prefix = Some("v0=".to_owned());
414                config.timestamp_header = Some("X-Slack-Request-Timestamp".to_owned());
415            }
416            WebhookProvider::Generic => {
417                config.signature_header = Some("X-Webhook-Signature".to_owned());
418                config.signature_prefix = Some("sha256=".to_owned());
419                config.delivery_id_header = Some("X-Webhook-Delivery".to_owned());
420                config.event_type_header = Some("X-Webhook-Event".to_owned());
421            }
422        }
423
424        config
425    }
426
427    fn validate(&self, is_production: bool) -> Result<(), WebhookConfigError> {
428        if self.name.trim().is_empty() {
429            return Err(WebhookConfigError::InvalidEndpoint {
430                name: self.name.clone(),
431                message: "name must not be empty".to_owned(),
432            });
433        }
434        if !self.path.starts_with('/') || self.path.trim() == "/" || self.path.trim().is_empty() {
435            return Err(WebhookConfigError::InvalidEndpoint {
436                name: self.name.clone(),
437                message: format!("path {:?} must start with '/' and not be root", self.path),
438            });
439        }
440        let Some(secret) = self.secret.as_deref().filter(|value| !value.is_empty()) else {
441            return Err(WebhookConfigError::MissingSecret {
442                name: self.name.clone(),
443                path: self.path.clone(),
444            });
445        };
446
447        if is_production {
448            crate::security::config::validate_signing_secret(Some(secret), true).map_err(
449                |reason| WebhookConfigError::InvalidSecret {
450                    name: self.name.clone(),
451                    reason,
452                },
453            )?;
454            for (index, previous) in self.previous_secrets.iter().enumerate() {
455                crate::security::config::validate_signing_secret(Some(previous), true).map_err(
456                    |reason| WebhookConfigError::InvalidPreviousSecret {
457                        name: self.name.clone(),
458                        index,
459                        reason,
460                    },
461                )?;
462            }
463        }
464
465        Ok(())
466    }
467
468    fn apply_provider_defaults(&mut self) {
469        let defaults = Self::provider_defaults(self.provider);
470        if self.signature_header.is_none() {
471            self.signature_header = defaults.signature_header;
472        }
473        if self.signature_prefix.is_none() {
474            self.signature_prefix = defaults.signature_prefix;
475        }
476        if self.timestamp_header.is_none() {
477            self.timestamp_header = defaults.timestamp_header;
478        }
479        if self.delivery_id_header.is_none() {
480            self.delivery_id_header = defaults.delivery_id_header;
481        }
482        if self.event_type_header.is_none() {
483            self.event_type_header = defaults.event_type_header;
484        }
485    }
486}
487
488/// Webhook configuration validation failure.
489#[derive(Debug, Clone, PartialEq, Eq, Error)]
490pub enum WebhookConfigError {
491    /// A configured endpoint has no current secret.
492    #[error("webhook endpoint {name:?} at {path:?} is missing a secret")]
493    MissingSecret {
494        /// Endpoint name.
495        name: String,
496        /// Endpoint path.
497        path: String,
498    },
499    /// A configured endpoint is invalid.
500    #[error("webhook endpoint {name:?} is invalid: {message}")]
501    InvalidEndpoint {
502        /// Endpoint name.
503        name: String,
504        /// Validation message.
505        message: String,
506    },
507    /// Two endpoints declare the same route path.
508    #[error(
509        "duplicate webhook endpoint path {path:?}: endpoints {first_name:?} and \
510         {duplicate_name:?} would shadow each other"
511    )]
512    DuplicatePath {
513        /// Shared endpoint path.
514        path: String,
515        /// Name of the first endpoint using the path.
516        first_name: String,
517        /// Name of the later endpoint using the same path.
518        duplicate_name: String,
519    },
520    /// Current production secret is weak or malformed.
521    #[error("webhook endpoint {name:?} has invalid secret: {reason}")]
522    InvalidSecret {
523        /// Endpoint name.
524        name: String,
525        /// Signing secret validation error.
526        reason: crate::security::config::SigningSecretError,
527    },
528    /// Previous production secret is weak or malformed.
529    #[error("webhook endpoint {name:?} has invalid previous secret {index}: {reason}")]
530    InvalidPreviousSecret {
531        /// Endpoint name.
532        name: String,
533        /// Previous secret index.
534        index: usize,
535        /// Signing secret validation error.
536        reason: crate::security::config::SigningSecretError,
537    },
538    /// Process-local replay storage was selected for production webhooks.
539    #[error(
540        "webhook replay backend memory is not allowed in production; set \
541         security.webhooks.replay.backend = \"redis\" or explicitly set \
542         security.webhooks.replay.allow_memory_in_production = true"
543    )]
544    MemoryReplayInProduction,
545    /// Redis replay protection was selected without a URL.
546    #[error("webhook redis replay backend requires security.webhooks.replay.redis.url")]
547    RedisReplayMissingUrl,
548    /// Redis replay URL is malformed.
549    #[error("webhook redis replay backend URL is invalid: {0}")]
550    RedisReplayInvalidUrl(String),
551    /// Redis replay protection was selected without compiling the Redis feature.
552    #[error("webhook redis replay backend requires the autumn-web redis feature")]
553    RedisReplayFeatureDisabled,
554}
555
556#[derive(Debug)]
557struct ResolvedWebhookEndpoint {
558    config: WebhookEndpointConfig,
559    keys: crate::security::config::ResolvedSigningKeys,
560}
561
562/// Runtime registry for signed webhook endpoints.
563#[derive(Clone, Debug)]
564pub struct WebhookRegistry {
565    endpoints_by_path: Arc<HashMap<String, Arc<ResolvedWebhookEndpoint>>>,
566    replay_store: Arc<dyn WebhookReplayStore>,
567}
568
569impl WebhookRegistry {
570    /// Build a registry from config using the built-in in-memory replay store.
571    ///
572    /// # Errors
573    ///
574    /// Returns [`WebhookConfigError`] when any endpoint is missing a secret.
575    pub fn from_config(config: &WebhookConfig) -> Result<Self, WebhookConfigError> {
576        let replay_store = if config
577            .endpoints
578            .iter()
579            .any(|endpoint| endpoint.replay_protection)
580        {
581            replay_store_from_config(&config.replay)?
582        } else {
583            Arc::new(InMemoryWebhookReplayStore::default())
584        };
585        Self::from_config_with_shared_replay_store(config, replay_store)
586    }
587
588    /// Build a registry from config with a custom replay store.
589    ///
590    /// # Errors
591    ///
592    /// Returns [`WebhookConfigError`] when any endpoint is missing a secret.
593    pub fn from_config_with_replay_store(
594        config: &WebhookConfig,
595        replay_store: impl WebhookReplayStore + 'static,
596    ) -> Result<Self, WebhookConfigError> {
597        Self::from_config_with_shared_replay_store(config, Arc::new(replay_store))
598    }
599
600    /// Build a registry from config with a shared replay store.
601    ///
602    /// This is useful when multiple test app instances should share replay
603    /// state, or when an integration constructs its own durable backend.
604    ///
605    /// # Errors
606    ///
607    /// Returns [`WebhookConfigError`] when any endpoint is missing a secret.
608    pub fn from_config_with_shared_replay_store(
609        config: &WebhookConfig,
610        replay_store: Arc<dyn WebhookReplayStore>,
611    ) -> Result<Self, WebhookConfigError> {
612        validate_unique_endpoint_paths(&config.endpoints)?;
613        let mut endpoints_by_path = HashMap::new();
614        for endpoint in &config.endpoints {
615            let mut endpoint = endpoint.clone();
616            endpoint.apply_provider_defaults();
617            endpoint.validate(false)?;
618            let Some(secret) = endpoint.secret.as_ref() else {
619                return Err(WebhookConfigError::MissingSecret {
620                    name: endpoint.name.clone(),
621                    path: endpoint.path.clone(),
622                });
623            };
624            let current = secret.as_bytes().to_vec();
625            let previous = endpoint
626                .previous_secrets
627                .iter()
628                .map(|secret| secret.as_bytes().to_vec())
629                .collect();
630            endpoints_by_path.insert(
631                endpoint.path.clone(),
632                Arc::new(ResolvedWebhookEndpoint {
633                    config: endpoint,
634                    keys: crate::security::config::ResolvedSigningKeys::new(current, previous),
635                }),
636            );
637        }
638
639        Ok(Self {
640            endpoints_by_path: Arc::new(endpoints_by_path),
641            replay_store,
642        })
643    }
644
645    fn endpoint_for_path(&self, path: &str) -> Option<Arc<ResolvedWebhookEndpoint>> {
646        self.endpoints_by_path.get(path).cloned()
647    }
648}
649
650fn validate_unique_endpoint_paths(
651    endpoints: &[WebhookEndpointConfig],
652) -> Result<(), WebhookConfigError> {
653    let mut seen_paths = HashMap::new();
654    for endpoint in endpoints {
655        if let Some(first_name) = seen_paths.insert(endpoint.path.as_str(), endpoint.name.as_str())
656        {
657            return Err(WebhookConfigError::DuplicatePath {
658                path: endpoint.path.clone(),
659                first_name: first_name.to_owned(),
660                duplicate_name: endpoint.name.clone(),
661            });
662        }
663    }
664    Ok(())
665}
666
667/// Boxed replay store operation future.
668pub type WebhookReplayFuture<'a> =
669    Pin<Box<dyn Future<Output = Result<bool, WebhookReplayStoreError>> + Send + 'a>>;
670
671/// Replay store used to reject duplicate provider delivery IDs.
672pub trait WebhookReplayStore: Send + Sync + std::fmt::Debug {
673    /// Insert a delivery key and return `true`; return `false` if it already
674    /// exists inside the replay window.
675    fn check_and_insert<'a>(
676        &'a self,
677        key: &'a str,
678        received_at: SystemTime,
679        window: Duration,
680    ) -> WebhookReplayFuture<'a>;
681
682    /// Remove a delivery key from the store.
683    fn remove<'a>(&'a self, key: &'a str) -> WebhookReplayFuture<'a>;
684}
685
686impl<T> WebhookReplayStore for Arc<T>
687where
688    T: WebhookReplayStore + ?Sized,
689{
690    fn check_and_insert<'a>(
691        &'a self,
692        key: &'a str,
693        received_at: SystemTime,
694        window: Duration,
695    ) -> WebhookReplayFuture<'a> {
696        self.as_ref().check_and_insert(key, received_at, window)
697    }
698
699    fn remove<'a>(&'a self, key: &'a str) -> WebhookReplayFuture<'a> {
700        self.as_ref().remove(key)
701    }
702}
703
704/// Replay store operation failure.
705#[derive(Debug, Clone, Error)]
706#[error("{message}")]
707pub struct WebhookReplayStoreError {
708    message: String,
709}
710
711impl WebhookReplayStoreError {
712    /// Create a replay-store failure with a human-readable diagnostic.
713    ///
714    /// Custom [`WebhookReplayStore`] implementations should return this when
715    /// their durable backend is unavailable or cannot complete the atomic
716    /// delivery-ID claim. Autumn surfaces the failure as `503 Service
717    /// Unavailable` before the webhook handler runs.
718    #[must_use]
719    pub fn new(message: impl Into<String>) -> Self {
720        Self {
721            message: message.into(),
722        }
723    }
724}
725
726#[cfg(feature = "redis")]
727impl WebhookReplayStoreError {
728    fn backend(operation: &'static str, error: impl std::fmt::Display) -> Self {
729        Self::new(format!("webhook replay store {operation} failed: {error}"))
730    }
731}
732
733/// In-memory replay protection store.
734///
735/// This is suitable for tests, development, and single-process deployments. A
736/// multi-replica production fleet should configure the Redis replay backend.
737#[derive(Debug, Default)]
738pub struct InMemoryWebhookReplayStore {
739    state: Mutex<InMemoryWebhookReplayState>,
740}
741
742#[derive(Debug, Default)]
743struct InMemoryWebhookReplayState {
744    seen: HashMap<String, SystemTime>,
745    checks_since_cleanup: usize,
746}
747
748impl InMemoryWebhookReplayStore {
749    fn check_and_insert_sync(&self, key: &str, received_at: SystemTime, window: Duration) -> bool {
750        {
751            let mut state = self
752                .state
753                .lock()
754                .expect("webhook replay store lock poisoned");
755            state.checks_since_cleanup = state.checks_since_cleanup.saturating_add(1);
756
757            if let Some(expires_at) = state.seen.get(key).copied() {
758                if expires_at.duration_since(received_at).is_ok() {
759                    Self::cleanup_if_due(&mut state, received_at);
760                    drop(state);
761                    return false;
762                }
763                state.seen.remove(key);
764            }
765
766            let expires_at = received_at.checked_add(window).unwrap_or(received_at);
767            state.seen.insert(key.to_owned(), expires_at);
768            Self::cleanup_if_due(&mut state, received_at);
769            drop(state);
770        }
771        true
772    }
773
774    fn cleanup_if_due(state: &mut InMemoryWebhookReplayState, received_at: SystemTime) {
775        if state.checks_since_cleanup < IN_MEMORY_REPLAY_CLEANUP_INTERVAL
776            && state.seen.len() <= IN_MEMORY_REPLAY_CLEANUP_HIGH_WATER
777        {
778            return;
779        }
780
781        state.checks_since_cleanup = 0;
782        state
783            .seen
784            .retain(|_, expires_at| expires_at.duration_since(received_at).is_ok());
785    }
786}
787
788impl WebhookReplayStore for InMemoryWebhookReplayStore {
789    fn check_and_insert<'a>(
790        &'a self,
791        key: &'a str,
792        received_at: SystemTime,
793        window: Duration,
794    ) -> WebhookReplayFuture<'a> {
795        Box::pin(async move { Ok(self.check_and_insert_sync(key, received_at, window)) })
796    }
797
798    fn remove<'a>(&'a self, key: &'a str) -> WebhookReplayFuture<'a> {
799        self.state
800            .lock()
801            .expect("webhook replay store lock poisoned")
802            .seen
803            .remove(key);
804        Box::pin(async move { Ok(true) })
805    }
806}
807
808/// Redis replay protection store.
809#[cfg(feature = "redis")]
810#[derive(Clone, Debug)]
811pub struct RedisWebhookReplayStore {
812    connection: redis::aio::ConnectionManager,
813    key_prefix: String,
814}
815
816#[cfg(feature = "redis")]
817impl RedisWebhookReplayStore {
818    /// Build a Redis replay store from config.
819    ///
820    /// # Errors
821    ///
822    /// Returns [`WebhookConfigError`] when the URL is absent or malformed.
823    pub fn from_config(config: &WebhookReplayRedisConfig) -> Result<Self, WebhookConfigError> {
824        let url = config
825            .url
826            .as_deref()
827            .filter(|url| !url.trim().is_empty())
828            .ok_or(WebhookConfigError::RedisReplayMissingUrl)?;
829        let client = redis::Client::open(url)
830            .map_err(|error| WebhookConfigError::RedisReplayInvalidUrl(error.to_string()))?;
831        let connection = redis::aio::ConnectionManager::new_lazy_with_config(
832            client,
833            redis::aio::ConnectionManagerConfig::new(),
834        )
835        .map_err(|error| WebhookConfigError::RedisReplayInvalidUrl(error.to_string()))?;
836        Ok(Self {
837            connection,
838            key_prefix: config.key_prefix.clone(),
839        })
840    }
841
842    fn key_for(&self, replay_key: &str) -> String {
843        format!("{}:{replay_key}", self.key_prefix)
844    }
845}
846
847#[cfg(feature = "redis")]
848impl WebhookReplayStore for RedisWebhookReplayStore {
849    fn check_and_insert<'a>(
850        &'a self,
851        key: &'a str,
852        received_at: SystemTime,
853        window: Duration,
854    ) -> WebhookReplayFuture<'a> {
855        Box::pin(async move {
856            let mut connection = self.connection.clone();
857            let key = self.key_for(key);
858            let ttl_secs = window.as_secs().max(1);
859            let received_unix = received_at
860                .duration_since(UNIX_EPOCH)
861                .map_err(|error| WebhookReplayStoreError::backend("timestamp", error))?
862                .as_secs()
863                .to_string();
864            let inserted: Option<String> = redis::cmd("SET")
865                .arg(&key)
866                .arg(received_unix)
867                .arg("NX")
868                .arg("EX")
869                .arg(ttl_secs)
870                .query_async(&mut connection)
871                .await
872                .map_err(|error| WebhookReplayStoreError::backend("insert", error))?;
873            Ok(inserted.is_some())
874        })
875    }
876
877    fn remove<'a>(&'a self, key: &'a str) -> WebhookReplayFuture<'a> {
878        Box::pin(async move {
879            let mut connection = self.connection.clone();
880            let key = self.key_for(key);
881            let _: () = redis::cmd("DEL")
882                .arg(&key)
883                .query_async(&mut connection)
884                .await
885                .map_err(|error| WebhookReplayStoreError::backend("delete", error))?;
886            Ok(true)
887        })
888    }
889}
890
891/// A request that has passed signed webhook verification.
892#[derive(Debug, Clone)]
893pub struct SignedWebhook {
894    provider: WebhookProvider,
895    endpoint: String,
896    delivery_id: Option<String>,
897    event_type: Option<String>,
898    received_at: SystemTime,
899    raw_body: Bytes,
900}
901
902impl SignedWebhook {
903    /// Provider preset that verified this request.
904    #[must_use]
905    pub const fn provider(&self) -> &'static str {
906        self.provider.as_str()
907    }
908
909    /// Configured endpoint name.
910    #[must_use]
911    pub fn endpoint(&self) -> &str {
912        &self.endpoint
913    }
914
915    /// Provider delivery ID, when present.
916    #[must_use]
917    pub fn delivery_id(&self) -> Option<&str> {
918        self.delivery_id.as_deref()
919    }
920
921    /// Provider event type, when present.
922    #[must_use]
923    pub fn event_type(&self) -> Option<&str> {
924        self.event_type.as_deref()
925    }
926
927    /// Request receive time used for timestamp and replay checks.
928    #[must_use]
929    pub const fn received_at(&self) -> SystemTime {
930        self.received_at
931    }
932
933    /// Exact verified request body bytes.
934    #[must_use]
935    pub fn raw_body(&self) -> &[u8] {
936        &self.raw_body
937    }
938
939    /// Decode the verified body as JSON.
940    ///
941    /// # Errors
942    ///
943    /// Returns `serde_json::Error` when the verified body is not valid JSON for
944    /// the requested type.
945    pub fn json<T: serde::de::DeserializeOwned>(&self) -> Result<T, serde_json::Error> {
946        serde_json::from_slice(&self.raw_body)
947    }
948}
949
950impl FromRequest<crate::AppState> for SignedWebhook {
951    type Rejection = crate::AutumnError;
952
953    async fn from_request(
954        req: axum::extract::Request,
955        state: &crate::AppState,
956    ) -> Result<Self, Self::Rejection> {
957        let (parts, body) = req.into_parts();
958        let path = parts.uri.path().to_owned();
959        let registry = state
960            .extension::<WebhookRegistry>()
961            .ok_or_else(|| WebhookVerifyError::RegistryMissing.into_autumn_error())?;
962        let endpoint = registry
963            .endpoint_for_path(&path)
964            .ok_or_else(|| WebhookVerifyError::EndpointMissing(path.clone()).into_autumn_error())?;
965        let body = axum::body::to_bytes(body, endpoint.config.max_body_bytes)
966            .await
967            .map_err(|err| {
968                crate::AutumnError::bad_request_msg(format!(
969                    "webhook body could not be read: {err}"
970                ))
971            })?;
972        let received_at = SystemTime::now();
973        verify_request(&registry, &endpoint, &parts.headers, body, received_at)
974            .await
975            .map_err(WebhookVerifyError::into_autumn_error)
976    }
977}
978
979#[derive(Debug, Error)]
980enum WebhookVerifyError {
981    #[error("signed webhook registry is not installed")]
982    RegistryMissing,
983    #[error("no signed webhook endpoint is configured for path {0}")]
984    EndpointMissing(String),
985    #[error("missing required webhook header {0}")]
986    MissingHeader(String),
987    #[error("malformed webhook signature")]
988    MalformedSignature,
989    #[error("malformed webhook timestamp")]
990    MalformedTimestamp,
991    #[error("webhook timestamp is outside the accepted tolerance")]
992    StaleTimestamp,
993    #[error("webhook signature mismatch")]
994    SignatureMismatch,
995    #[error("missing webhook delivery ID")]
996    MissingDeliveryId,
997    #[error("duplicate webhook delivery")]
998    DuplicateDelivery,
999    #[error("webhook replay store unavailable: {0}")]
1000    ReplayStoreUnavailable(String),
1001}
1002
1003impl WebhookVerifyError {
1004    const fn status(&self) -> StatusCode {
1005        match self {
1006            Self::RegistryMissing | Self::EndpointMissing(_) => StatusCode::INTERNAL_SERVER_ERROR,
1007            Self::MissingHeader(_)
1008            | Self::MalformedSignature
1009            | Self::MalformedTimestamp
1010            | Self::MissingDeliveryId => StatusCode::BAD_REQUEST,
1011            Self::StaleTimestamp | Self::SignatureMismatch => StatusCode::UNAUTHORIZED,
1012            Self::DuplicateDelivery => StatusCode::CONFLICT,
1013            Self::ReplayStoreUnavailable(_) => StatusCode::SERVICE_UNAVAILABLE,
1014        }
1015    }
1016
1017    fn into_autumn_error(self) -> crate::AutumnError {
1018        crate::AutumnError::bad_request_msg(self.to_string()).with_status(self.status())
1019    }
1020}
1021
1022type ReplayStoreCell =
1023    std::sync::Arc<std::sync::Mutex<Option<(std::sync::Arc<dyn WebhookReplayStore>, Vec<String>)>>>;
1024
1025tokio::task_local! {
1026    pub static WEBHOOK_REPLAY_KEY: ReplayStoreCell;
1027}
1028
1029struct ReplayKeyGuard {
1030    cell: ReplayStoreCell,
1031    completed: bool,
1032}
1033
1034impl Drop for ReplayKeyGuard {
1035    fn drop(&mut self) {
1036        if !self.completed {
1037            let to_remove = match self.cell.lock() {
1038                Ok(mut guard) => guard.take(),
1039                Err(poisoned) => poisoned.into_inner().take(),
1040            };
1041            if let Some((store, keys)) = to_remove {
1042                tokio::spawn(async move {
1043                    for key in keys {
1044                        tracing::debug!(key = %key, "Releasing webhook replay key due to panic in handler");
1045                        let _ = store.remove(&key).await;
1046                    }
1047                });
1048            }
1049        }
1050    }
1051}
1052
1053/// Middleware to clean up webhook replay keys on handler failure.
1054pub async fn webhook_replay_cleanup_middleware(
1055    req: axum::extract::Request,
1056    next: axum::middleware::Next,
1057) -> axum::response::Response {
1058    let cell = std::sync::Arc::new(std::sync::Mutex::new(None));
1059    let cell_cloned = cell.clone();
1060
1061    let mut guard = ReplayKeyGuard {
1062        cell: cell_cloned.clone(),
1063        completed: false,
1064    };
1065
1066    let response = WEBHOOK_REPLAY_KEY
1067        .scope(cell, async move { next.run(req).await })
1068        .await;
1069
1070    guard.completed = true;
1071
1072    if response.status().is_server_error() {
1073        let to_remove = match cell_cloned.lock() {
1074            Ok(mut guard) => guard.take(),
1075            Err(poisoned) => poisoned.into_inner().take(),
1076        };
1077        if let Some((store, keys)) = to_remove {
1078            for key in keys {
1079                tracing::debug!(key = %key, "Releasing webhook replay key due to 5xx server error");
1080                let _ = store.remove(&key).await;
1081            }
1082        }
1083    }
1084
1085    response
1086}
1087
1088async fn verify_request(
1089    registry: &WebhookRegistry,
1090    endpoint: &ResolvedWebhookEndpoint,
1091    headers: &HeaderMap,
1092    body: Bytes,
1093    received_at: SystemTime,
1094) -> Result<SignedWebhook, WebhookVerifyError> {
1095    match endpoint.config.provider {
1096        WebhookProvider::Stripe => verify_stripe(endpoint, headers, &body, received_at)?,
1097        WebhookProvider::Github | WebhookProvider::Generic => {
1098            verify_body_hmac(endpoint, headers, &body, None, received_at)?;
1099        }
1100        WebhookProvider::Slack => verify_slack(endpoint, headers, &body, received_at)?,
1101    }
1102
1103    let json_body = serde_json::from_slice::<serde_json::Value>(&body).ok();
1104    let delivery_id = resolve_delivery_id(&endpoint.config, headers, json_body.as_ref());
1105    if endpoint.config.replay_protection {
1106        let delivery_id = delivery_id
1107            .as_deref()
1108            .ok_or(WebhookVerifyError::MissingDeliveryId)?;
1109
1110        let mut keys_to_check = vec![format!(
1111            "{}:{}:id:{delivery_id}",
1112            endpoint.config.provider.as_str(),
1113            endpoint.config.name
1114        )];
1115
1116        if matches!(
1117            endpoint.config.provider,
1118            WebhookProvider::Github | WebhookProvider::Generic
1119        ) {
1120            let sig_hdr = signature_header(endpoint);
1121            if let Some(sig_val) = headers.get(sig_hdr).and_then(|v| v.to_str().ok()) {
1122                use sha2::{Digest, Sha256};
1123                let mut hasher = Sha256::new();
1124                hasher.update(sig_val.as_bytes());
1125                let sig_hash = hex::encode(hasher.finalize());
1126                keys_to_check.push(format!(
1127                    "{}:{}:sig:{sig_hash}",
1128                    endpoint.config.provider.as_str(),
1129                    endpoint.config.name
1130                ));
1131            }
1132        }
1133
1134        let window = Duration::from_secs(endpoint.config.replay_window_secs);
1135        let mut inserted_keys = Vec::new();
1136        for key in keys_to_check {
1137            match registry
1138                .replay_store
1139                .check_and_insert(&key, received_at, window)
1140                .await
1141            {
1142                Ok(true) => {
1143                    inserted_keys.push(key);
1144                }
1145                Ok(false) => {
1146                    // Rollback already inserted keys for this request
1147                    for inserted in inserted_keys {
1148                        let _ = registry.replay_store.remove(&inserted).await;
1149                    }
1150                    return Err(WebhookVerifyError::DuplicateDelivery);
1151                }
1152                Err(error) => {
1153                    // Rollback already inserted keys for this request
1154                    for inserted in inserted_keys {
1155                        let _ = registry.replay_store.remove(&inserted).await;
1156                    }
1157                    return Err(WebhookVerifyError::ReplayStoreUnavailable(
1158                        error.to_string(),
1159                    ));
1160                }
1161            }
1162        }
1163
1164        let _ = WEBHOOK_REPLAY_KEY.try_with(|cell| {
1165            let guard = match cell.lock() {
1166                Ok(g) => Some(g),
1167                Err(poisoned) => Some(poisoned.into_inner()),
1168            };
1169            if let Some(mut guard) = guard {
1170                *guard = Some((std::sync::Arc::clone(&registry.replay_store), inserted_keys));
1171            }
1172        });
1173    }
1174
1175    Ok(SignedWebhook {
1176        provider: endpoint.config.provider,
1177        endpoint: endpoint.config.name.clone(),
1178        delivery_id,
1179        event_type: resolve_event_type(&endpoint.config, headers, json_body.as_ref()),
1180        received_at,
1181        raw_body: body,
1182    })
1183}
1184
1185fn verify_stripe(
1186    endpoint: &ResolvedWebhookEndpoint,
1187    headers: &HeaderMap,
1188    body: &[u8],
1189    received_at: SystemTime,
1190) -> Result<(), WebhookVerifyError> {
1191    let header = required_header(headers, signature_header(endpoint))?;
1192    let (timestamp, signatures) = parse_stripe_signature(header)?;
1193    verify_timestamp(
1194        timestamp,
1195        received_at,
1196        endpoint.config.timestamp_tolerance_secs,
1197    )?;
1198
1199    let timestamp = timestamp.to_string();
1200    let mut signed_payload = Vec::with_capacity(timestamp.len() + 1 + body.len());
1201    signed_payload.extend_from_slice(timestamp.as_bytes());
1202    signed_payload.push(b'.');
1203    signed_payload.extend_from_slice(body);
1204
1205    if signatures
1206        .iter()
1207        .any(|signature| endpoint.keys.verify(&signed_payload, signature))
1208    {
1209        Ok(())
1210    } else {
1211        Err(WebhookVerifyError::SignatureMismatch)
1212    }
1213}
1214
1215fn verify_slack(
1216    endpoint: &ResolvedWebhookEndpoint,
1217    headers: &HeaderMap,
1218    body: &[u8],
1219    received_at: SystemTime,
1220) -> Result<(), WebhookVerifyError> {
1221    let timestamp_header = endpoint
1222        .config
1223        .timestamp_header
1224        .as_deref()
1225        .ok_or(WebhookVerifyError::MalformedTimestamp)?;
1226    let timestamp = required_header(headers, timestamp_header)?
1227        .parse::<i64>()
1228        .map_err(|_| WebhookVerifyError::MalformedTimestamp)?;
1229    verify_timestamp(
1230        timestamp,
1231        received_at,
1232        endpoint.config.timestamp_tolerance_secs,
1233    )?;
1234
1235    let timestamp = timestamp.to_string();
1236    let mut signed_payload = Vec::with_capacity(3 + timestamp.len() + 1 + body.len());
1237    signed_payload.extend_from_slice(b"v0:");
1238    signed_payload.extend_from_slice(timestamp.as_bytes());
1239    signed_payload.push(b':');
1240    signed_payload.extend_from_slice(body);
1241    verify_body_hmac(
1242        endpoint,
1243        headers,
1244        &signed_payload,
1245        endpoint.config.signature_prefix.as_deref(),
1246        received_at,
1247    )
1248}
1249
1250fn verify_body_hmac(
1251    endpoint: &ResolvedWebhookEndpoint,
1252    headers: &HeaderMap,
1253    body_or_base: &[u8],
1254    explicit_prefix: Option<&str>,
1255    received_at: SystemTime,
1256) -> Result<(), WebhookVerifyError> {
1257    if let Some(timestamp_header) = endpoint.config.timestamp_header.as_deref()
1258        && endpoint.config.provider != WebhookProvider::Slack
1259    {
1260        let timestamp = required_header(headers, timestamp_header)?
1261            .parse::<i64>()
1262            .map_err(|_| WebhookVerifyError::MalformedTimestamp)?;
1263        verify_timestamp(
1264            timestamp,
1265            received_at,
1266            endpoint.config.timestamp_tolerance_secs,
1267        )?;
1268    }
1269
1270    let mut signature = required_header(headers, signature_header(endpoint))?;
1271    let prefix = explicit_prefix.or(endpoint.config.signature_prefix.as_deref());
1272    if let Some(prefix) = prefix {
1273        signature = signature
1274            .strip_prefix(prefix)
1275            .ok_or(WebhookVerifyError::MalformedSignature)?;
1276    }
1277
1278    if endpoint.keys.verify(body_or_base, signature) {
1279        Ok(())
1280    } else {
1281        Err(WebhookVerifyError::SignatureMismatch)
1282    }
1283}
1284
1285fn signature_header(endpoint: &ResolvedWebhookEndpoint) -> &str {
1286    endpoint
1287        .config
1288        .signature_header
1289        .as_deref()
1290        .unwrap_or("X-Webhook-Signature")
1291}
1292
1293fn required_header<'a>(headers: &'a HeaderMap, name: &str) -> Result<&'a str, WebhookVerifyError> {
1294    headers
1295        .get(name)
1296        .ok_or_else(|| WebhookVerifyError::MissingHeader(name.to_owned()))?
1297        .to_str()
1298        .map_err(|_| WebhookVerifyError::MalformedSignature)
1299}
1300
1301fn parse_stripe_signature(header: &str) -> Result<(i64, Vec<&str>), WebhookVerifyError> {
1302    let mut timestamp = None;
1303    let mut signatures = Vec::new();
1304
1305    for part in header.split(',') {
1306        let Some((key, value)) = part.split_once('=') else {
1307            return Err(WebhookVerifyError::MalformedSignature);
1308        };
1309        match key.trim() {
1310            "t" => {
1311                timestamp = Some(
1312                    value
1313                        .trim()
1314                        .parse::<i64>()
1315                        .map_err(|_| WebhookVerifyError::MalformedTimestamp)?,
1316                );
1317            }
1318            "v1" => signatures.push(value.trim()),
1319            _ => {}
1320        }
1321    }
1322
1323    let timestamp = timestamp.ok_or(WebhookVerifyError::MalformedTimestamp)?;
1324    if signatures.is_empty() {
1325        return Err(WebhookVerifyError::MalformedSignature);
1326    }
1327    Ok((timestamp, signatures))
1328}
1329
1330fn verify_timestamp(
1331    timestamp: i64,
1332    received_at: SystemTime,
1333    tolerance_secs: u64,
1334) -> Result<(), WebhookVerifyError> {
1335    let now = i64::try_from(
1336        received_at
1337            .duration_since(UNIX_EPOCH)
1338            .map_err(|_| WebhookVerifyError::MalformedTimestamp)?
1339            .as_secs(),
1340    )
1341    .map_err(|_| WebhookVerifyError::MalformedTimestamp)?;
1342    let skew = now.abs_diff(timestamp);
1343    if skew > tolerance_secs {
1344        return Err(WebhookVerifyError::StaleTimestamp);
1345    }
1346    Ok(())
1347}
1348
1349fn resolve_delivery_id(
1350    config: &WebhookEndpointConfig,
1351    headers: &HeaderMap,
1352    json_body: Option<&serde_json::Value>,
1353) -> Option<String> {
1354    let header = config
1355        .delivery_id_header
1356        .as_deref()
1357        .and_then(|header| optional_header(headers, header));
1358
1359    match config.provider {
1360        WebhookProvider::Slack => header
1361            .or_else(|| slack_delivery_id(json_body))
1362            .or_else(|| json_string_field(json_body, "id")),
1363        _ => header.or_else(|| json_string_field(json_body, "id")),
1364    }
1365}
1366
1367fn resolve_event_type(
1368    config: &WebhookEndpointConfig,
1369    headers: &HeaderMap,
1370    json_body: Option<&serde_json::Value>,
1371) -> Option<String> {
1372    config
1373        .event_type_header
1374        .as_deref()
1375        .and_then(|header| optional_header(headers, header))
1376        .or_else(|| json_string_field(json_body, "type"))
1377        .or_else(|| nested_json_string_field(json_body, "event", "type"))
1378}
1379
1380fn optional_header(headers: &HeaderMap, name: &str) -> Option<String> {
1381    headers
1382        .get(name)
1383        .and_then(|value| value.to_str().ok())
1384        .filter(|value| !value.trim().is_empty())
1385        .map(str::to_owned)
1386}
1387
1388fn slack_delivery_id(json_body: Option<&serde_json::Value>) -> Option<String> {
1389    json_string_field(json_body, "event_id").or_else(|| {
1390        let value = json_body?;
1391        if value.get("type").and_then(serde_json::Value::as_str) == Some("url_verification") {
1392            value
1393                .get("challenge")
1394                .and_then(serde_json::Value::as_str)
1395                .map(str::to_owned)
1396        } else {
1397            None
1398        }
1399    })
1400}
1401
1402fn json_string_field(value: Option<&serde_json::Value>, field: &str) -> Option<String> {
1403    let value = value?;
1404    value
1405        .get(field)
1406        .and_then(serde_json::Value::as_str)
1407        .map(str::to_owned)
1408}
1409
1410fn nested_json_string_field(
1411    value: Option<&serde_json::Value>,
1412    parent: &str,
1413    field: &str,
1414) -> Option<String> {
1415    let value = value?;
1416    value
1417        .get(parent)
1418        .and_then(|parent_value| parent_value.get(field))
1419        .and_then(serde_json::Value::as_str)
1420        .map(str::to_owned)
1421}
1422
1423const fn default_timestamp_tolerance_secs() -> u64 {
1424    DEFAULT_TIMESTAMP_TOLERANCE_SECS
1425}
1426
1427const fn default_replay_window_secs() -> u64 {
1428    DEFAULT_REPLAY_WINDOW_SECS
1429}
1430
1431const fn default_max_body_bytes() -> usize {
1432    DEFAULT_MAX_BODY_BYTES
1433}
1434
1435const fn default_true() -> bool {
1436    true
1437}
1438
1439fn default_replay_redis_key_prefix() -> String {
1440    "autumn:webhooks:replay".to_owned()
1441}
1442
1443fn validate_redis_replay_config(
1444    config: &WebhookReplayRedisConfig,
1445) -> Result<(), WebhookConfigError> {
1446    let url = config
1447        .url
1448        .as_deref()
1449        .filter(|url| !url.trim().is_empty())
1450        .ok_or(WebhookConfigError::RedisReplayMissingUrl)?;
1451
1452    #[cfg(feature = "redis")]
1453    {
1454        redis::Client::open(url)
1455            .map_err(|error| WebhookConfigError::RedisReplayInvalidUrl(error.to_string()))?;
1456        Ok(())
1457    }
1458
1459    #[cfg(not(feature = "redis"))]
1460    {
1461        let _ = url;
1462        Err(WebhookConfigError::RedisReplayFeatureDisabled)
1463    }
1464}
1465
1466fn replay_store_from_config(
1467    config: &WebhookReplayConfig,
1468) -> Result<Arc<dyn WebhookReplayStore>, WebhookConfigError> {
1469    match config.backend {
1470        WebhookReplayBackend::Memory => Ok(Arc::new(InMemoryWebhookReplayStore::default())),
1471        WebhookReplayBackend::Redis => {
1472            #[cfg(feature = "redis")]
1473            {
1474                Ok(Arc::new(RedisWebhookReplayStore::from_config(
1475                    &config.redis,
1476                )?))
1477            }
1478
1479            #[cfg(not(feature = "redis"))]
1480            {
1481                Err(WebhookConfigError::RedisReplayFeatureDisabled)
1482            }
1483        }
1484    }
1485}
1486
1487pub(crate) fn install_registry_from_config(
1488    state: &crate::AppState,
1489    config: &WebhookConfig,
1490) -> Result<(), WebhookConfigError> {
1491    if config.endpoints.is_empty() {
1492        return Ok(());
1493    }
1494    let registry = WebhookRegistry::from_config(config)?;
1495    state.insert_extension(registry);
1496    Ok(())
1497}