1use std::collections::HashMap;
8use std::future::Future;
9use std::pin::Pin;
10use std::sync::{Arc, Mutex};
11use std::time::{Duration, SystemTime, UNIX_EPOCH};
12
13use axum::extract::FromRequest;
14use bytes::Bytes;
15use http::{HeaderMap, StatusCode};
16use serde::Deserialize;
17use thiserror::Error;
18
19pub use crate::security::config::hmac_sha256_hex;
20
21const DEFAULT_TIMESTAMP_TOLERANCE_SECS: u64 = 300;
22const DEFAULT_REPLAY_WINDOW_SECS: u64 = 24 * 60 * 60;
23const DEFAULT_MAX_BODY_BYTES: usize = 1024 * 1024;
24const IN_MEMORY_REPLAY_CLEANUP_INTERVAL: usize = 128;
25const IN_MEMORY_REPLAY_CLEANUP_HIGH_WATER: usize = 16 * 1024;
26
27#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash, Deserialize)]
29#[serde(rename_all = "lowercase")]
30pub enum WebhookProvider {
31 Stripe,
33 Github,
35 Slack,
37 #[default]
39 Generic,
40}
41
42impl WebhookProvider {
43 #[must_use]
45 pub const fn as_str(self) -> &'static str {
46 match self {
47 Self::Stripe => "stripe",
48 Self::Github => "github",
49 Self::Slack => "slack",
50 Self::Generic => "generic",
51 }
52 }
53}
54
55impl std::fmt::Display for WebhookProvider {
56 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
57 f.write_str(self.as_str())
58 }
59}
60
61#[derive(Debug, Clone, Default, Deserialize)]
63pub struct WebhookConfig {
64 #[serde(default)]
66 pub replay: WebhookReplayConfig,
67 #[serde(default)]
69 pub endpoints: Vec<WebhookEndpointConfig>,
70}
71
72impl WebhookConfig {
73 pub(crate) fn apply_env_overrides_with_env(&mut self, env: &dyn crate::config::Env) {
75 self.replay.apply_env_overrides_with_env(env);
76 self.resolve_secret_envs_with_env(env);
77 }
78
79 fn resolve_secret_envs_with_env(&mut self, env: &dyn crate::config::Env) {
82 for endpoint in &mut self.endpoints {
83 if endpoint.secret.is_none()
84 && let Some(env_name) = endpoint.secret_env.as_deref()
85 && let Ok(secret) = env.var(env_name)
86 {
87 endpoint.secret = Some(secret);
88 }
89
90 for env_name in &endpoint.previous_secret_envs {
91 if let Ok(secret) = env.var(env_name)
92 && !secret.is_empty()
93 {
94 endpoint.previous_secrets.push(secret);
95 }
96 }
97 }
98 }
99
100 pub fn validate(&self, is_production: bool) -> Result<(), WebhookConfigError> {
107 for endpoint in &self.endpoints {
108 endpoint.validate(is_production)?;
109 }
110 validate_unique_endpoint_paths(&self.endpoints)?;
111 if self
112 .endpoints
113 .iter()
114 .any(|endpoint| endpoint.replay_protection)
115 {
116 self.replay.validate(is_production)?;
117 }
118 Ok(())
119 }
120}
121
122#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Deserialize)]
124#[serde(rename_all = "lowercase")]
125pub enum WebhookReplayBackend {
126 #[serde(alias = "local", alias = "in_memory")]
129 #[default]
130 Memory,
131 Redis,
133}
134
135impl WebhookReplayBackend {
136 fn from_env_value(value: &str) -> Option<Self> {
137 match value.trim().to_ascii_lowercase().as_str() {
138 "memory" | "local" | "in_memory" | "in-memory" => Some(Self::Memory),
139 "redis" => Some(Self::Redis),
140 _ => None,
141 }
142 }
143}
144
145#[derive(Debug, Clone, Deserialize)]
147pub struct WebhookReplayConfig {
148 #[serde(default)]
150 pub backend: WebhookReplayBackend,
151 #[serde(default)]
153 pub allow_memory_in_production: bool,
154 #[serde(default)]
156 pub redis: WebhookReplayRedisConfig,
157}
158
159impl Default for WebhookReplayConfig {
160 fn default() -> Self {
161 Self {
162 backend: WebhookReplayBackend::Memory,
163 allow_memory_in_production: false,
164 redis: WebhookReplayRedisConfig::default(),
165 }
166 }
167}
168
169impl WebhookReplayConfig {
170 fn apply_env_overrides_with_env(&mut self, env: &dyn crate::config::Env) {
171 if let Ok(value) = env.var("AUTUMN_SECURITY__WEBHOOKS__REPLAY__BACKEND") {
172 if let Some(backend) = WebhookReplayBackend::from_env_value(&value) {
173 self.backend = backend;
174 } else {
175 eprintln!(
176 "Warning: AUTUMN_SECURITY__WEBHOOKS__REPLAY__BACKEND={value:?} is not valid \
177 (expected memory or redis), ignoring"
178 );
179 }
180 }
181 if let Ok(value) = env.var("AUTUMN_SECURITY__WEBHOOKS__REPLAY__ALLOW_MEMORY_IN_PRODUCTION")
182 {
183 match value.trim().parse::<bool>() {
184 Ok(value) => self.allow_memory_in_production = value,
185 Err(error) => eprintln!(
186 "Warning: AUTUMN_SECURITY__WEBHOOKS__REPLAY__ALLOW_MEMORY_IN_PRODUCTION \
187 could not be parsed as bool: {error}"
188 ),
189 }
190 }
191 if let Ok(value) = env.var("AUTUMN_SECURITY__WEBHOOKS__REPLAY__REDIS__URL") {
192 let value = value.trim();
193 self.redis.url = if value.is_empty() {
194 None
195 } else {
196 Some(value.to_owned())
197 };
198 }
199 if let Ok(value) = env.var("AUTUMN_SECURITY__WEBHOOKS__REPLAY__REDIS__KEY_PREFIX")
200 && !value.trim().is_empty()
201 {
202 self.redis.key_prefix = value;
203 }
204 }
205
206 fn validate(&self, is_production: bool) -> Result<(), WebhookConfigError> {
207 match self.backend {
208 WebhookReplayBackend::Memory => {
209 if is_production && !self.allow_memory_in_production {
210 return Err(WebhookConfigError::MemoryReplayInProduction);
211 }
212 Ok(())
213 }
214 WebhookReplayBackend::Redis => validate_redis_replay_config(&self.redis),
215 }
216 }
217}
218
219#[derive(Debug, Clone, Deserialize)]
221pub struct WebhookReplayRedisConfig {
222 #[serde(default)]
224 pub url: Option<String>,
225 #[serde(default = "default_replay_redis_key_prefix")]
227 pub key_prefix: String,
228}
229
230impl Default for WebhookReplayRedisConfig {
231 fn default() -> Self {
232 Self {
233 url: None,
234 key_prefix: default_replay_redis_key_prefix(),
235 }
236 }
237}
238
239#[derive(Debug, Clone, Deserialize)]
241pub struct WebhookEndpointConfig {
242 pub name: String,
244 pub path: String,
246 #[serde(default)]
248 pub provider: WebhookProvider,
249 #[serde(default)]
251 pub secret: Option<String>,
252 #[serde(default)]
254 pub secret_env: Option<String>,
255 #[serde(default)]
257 pub previous_secrets: Vec<String>,
258 #[serde(default)]
260 pub previous_secret_envs: Vec<String>,
261 #[serde(default = "default_timestamp_tolerance_secs")]
263 pub timestamp_tolerance_secs: u64,
264 #[serde(default = "default_replay_window_secs")]
266 pub replay_window_secs: u64,
267 #[serde(default = "default_true")]
269 pub replay_protection: bool,
270 #[serde(default)]
273 pub signature_header: Option<String>,
274 #[serde(default)]
276 pub signature_prefix: Option<String>,
277 #[serde(default)]
279 pub timestamp_header: Option<String>,
280 #[serde(default)]
282 pub delivery_id_header: Option<String>,
283 #[serde(default)]
285 pub event_type_header: Option<String>,
286 #[serde(default = "default_max_body_bytes")]
288 pub max_body_bytes: usize,
289}
290
291impl Default for WebhookEndpointConfig {
292 fn default() -> Self {
293 Self::provider_defaults(WebhookProvider::Generic)
294 }
295}
296
297impl WebhookEndpointConfig {
298 #[must_use]
300 pub fn new(
301 name: impl Into<String>,
302 path: impl Into<String>,
303 provider: WebhookProvider,
304 secret: impl Into<String>,
305 ) -> Self {
306 let mut config = Self::provider_defaults(provider);
307 config.name = name.into();
308 config.path = path.into();
309 config.secret = Some(secret.into());
310 config
311 }
312
313 #[must_use]
315 pub fn stripe(
316 name: impl Into<String>,
317 path: impl Into<String>,
318 secret: impl Into<String>,
319 ) -> Self {
320 Self::new(name, path, WebhookProvider::Stripe, secret)
321 }
322
323 #[must_use]
325 pub fn github(
326 name: impl Into<String>,
327 path: impl Into<String>,
328 secret: impl Into<String>,
329 ) -> Self {
330 Self::new(name, path, WebhookProvider::Github, secret)
331 }
332
333 #[must_use]
335 pub fn slack(
336 name: impl Into<String>,
337 path: impl Into<String>,
338 secret: impl Into<String>,
339 ) -> Self {
340 Self::new(name, path, WebhookProvider::Slack, secret)
341 }
342
343 #[must_use]
345 pub fn generic(
346 name: impl Into<String>,
347 path: impl Into<String>,
348 secret: impl Into<String>,
349 ) -> Self {
350 Self::new(name, path, WebhookProvider::Generic, secret)
351 }
352
353 #[must_use]
355 pub fn with_previous_secret(mut self, secret: impl Into<String>) -> Self {
356 self.previous_secrets.push(secret.into());
357 self
358 }
359
360 #[must_use]
362 pub const fn with_timestamp_tolerance_secs(mut self, secs: u64) -> Self {
363 self.timestamp_tolerance_secs = secs;
364 self
365 }
366
367 #[must_use]
369 pub const fn with_replay_window_secs(mut self, secs: u64) -> Self {
370 self.replay_window_secs = secs;
371 self
372 }
373
374 #[must_use]
376 pub const fn without_replay_protection(mut self) -> Self {
377 self.replay_protection = false;
378 self
379 }
380
381 fn provider_defaults(provider: WebhookProvider) -> Self {
382 let mut config = Self {
383 name: String::new(),
384 path: String::new(),
385 provider,
386 secret: None,
387 secret_env: None,
388 previous_secrets: Vec::new(),
389 previous_secret_envs: Vec::new(),
390 timestamp_tolerance_secs: DEFAULT_TIMESTAMP_TOLERANCE_SECS,
391 replay_window_secs: DEFAULT_REPLAY_WINDOW_SECS,
392 replay_protection: true,
393 signature_header: None,
394 signature_prefix: None,
395 timestamp_header: None,
396 delivery_id_header: None,
397 event_type_header: None,
398 max_body_bytes: DEFAULT_MAX_BODY_BYTES,
399 };
400
401 match provider {
402 WebhookProvider::Stripe => {
403 config.signature_header = Some("Stripe-Signature".to_owned());
404 }
405 WebhookProvider::Github => {
406 config.signature_header = Some("X-Hub-Signature-256".to_owned());
407 config.signature_prefix = Some("sha256=".to_owned());
408 config.delivery_id_header = Some("X-GitHub-Delivery".to_owned());
409 config.event_type_header = Some("X-GitHub-Event".to_owned());
410 }
411 WebhookProvider::Slack => {
412 config.signature_header = Some("X-Slack-Signature".to_owned());
413 config.signature_prefix = Some("v0=".to_owned());
414 config.timestamp_header = Some("X-Slack-Request-Timestamp".to_owned());
415 }
416 WebhookProvider::Generic => {
417 config.signature_header = Some("X-Webhook-Signature".to_owned());
418 config.signature_prefix = Some("sha256=".to_owned());
419 config.delivery_id_header = Some("X-Webhook-Delivery".to_owned());
420 config.event_type_header = Some("X-Webhook-Event".to_owned());
421 }
422 }
423
424 config
425 }
426
427 fn validate(&self, is_production: bool) -> Result<(), WebhookConfigError> {
428 if self.name.trim().is_empty() {
429 return Err(WebhookConfigError::InvalidEndpoint {
430 name: self.name.clone(),
431 message: "name must not be empty".to_owned(),
432 });
433 }
434 if !self.path.starts_with('/') || self.path.trim() == "/" || self.path.trim().is_empty() {
435 return Err(WebhookConfigError::InvalidEndpoint {
436 name: self.name.clone(),
437 message: format!("path {:?} must start with '/' and not be root", self.path),
438 });
439 }
440 let Some(secret) = self.secret.as_deref().filter(|value| !value.is_empty()) else {
441 return Err(WebhookConfigError::MissingSecret {
442 name: self.name.clone(),
443 path: self.path.clone(),
444 });
445 };
446
447 if is_production {
448 crate::security::config::validate_signing_secret(Some(secret), true).map_err(
449 |reason| WebhookConfigError::InvalidSecret {
450 name: self.name.clone(),
451 reason,
452 },
453 )?;
454 for (index, previous) in self.previous_secrets.iter().enumerate() {
455 crate::security::config::validate_signing_secret(Some(previous), true).map_err(
456 |reason| WebhookConfigError::InvalidPreviousSecret {
457 name: self.name.clone(),
458 index,
459 reason,
460 },
461 )?;
462 }
463 }
464
465 Ok(())
466 }
467
468 fn apply_provider_defaults(&mut self) {
469 let defaults = Self::provider_defaults(self.provider);
470 if self.signature_header.is_none() {
471 self.signature_header = defaults.signature_header;
472 }
473 if self.signature_prefix.is_none() {
474 self.signature_prefix = defaults.signature_prefix;
475 }
476 if self.timestamp_header.is_none() {
477 self.timestamp_header = defaults.timestamp_header;
478 }
479 if self.delivery_id_header.is_none() {
480 self.delivery_id_header = defaults.delivery_id_header;
481 }
482 if self.event_type_header.is_none() {
483 self.event_type_header = defaults.event_type_header;
484 }
485 }
486}
487
488#[derive(Debug, Clone, PartialEq, Eq, Error)]
490pub enum WebhookConfigError {
491 #[error("webhook endpoint {name:?} at {path:?} is missing a secret")]
493 MissingSecret {
494 name: String,
496 path: String,
498 },
499 #[error("webhook endpoint {name:?} is invalid: {message}")]
501 InvalidEndpoint {
502 name: String,
504 message: String,
506 },
507 #[error(
509 "duplicate webhook endpoint path {path:?}: endpoints {first_name:?} and \
510 {duplicate_name:?} would shadow each other"
511 )]
512 DuplicatePath {
513 path: String,
515 first_name: String,
517 duplicate_name: String,
519 },
520 #[error("webhook endpoint {name:?} has invalid secret: {reason}")]
522 InvalidSecret {
523 name: String,
525 reason: crate::security::config::SigningSecretError,
527 },
528 #[error("webhook endpoint {name:?} has invalid previous secret {index}: {reason}")]
530 InvalidPreviousSecret {
531 name: String,
533 index: usize,
535 reason: crate::security::config::SigningSecretError,
537 },
538 #[error(
540 "webhook replay backend memory is not allowed in production; set \
541 security.webhooks.replay.backend = \"redis\" or explicitly set \
542 security.webhooks.replay.allow_memory_in_production = true"
543 )]
544 MemoryReplayInProduction,
545 #[error("webhook redis replay backend requires security.webhooks.replay.redis.url")]
547 RedisReplayMissingUrl,
548 #[error("webhook redis replay backend URL is invalid: {0}")]
550 RedisReplayInvalidUrl(String),
551 #[error("webhook redis replay backend requires the autumn-web redis feature")]
553 RedisReplayFeatureDisabled,
554}
555
556#[derive(Debug)]
557struct ResolvedWebhookEndpoint {
558 config: WebhookEndpointConfig,
559 keys: crate::security::config::ResolvedSigningKeys,
560}
561
562#[derive(Clone, Debug)]
564pub struct WebhookRegistry {
565 endpoints_by_path: Arc<HashMap<String, Arc<ResolvedWebhookEndpoint>>>,
566 replay_store: Arc<dyn WebhookReplayStore>,
567}
568
569impl WebhookRegistry {
570 pub fn from_config(config: &WebhookConfig) -> Result<Self, WebhookConfigError> {
576 let replay_store = if config
577 .endpoints
578 .iter()
579 .any(|endpoint| endpoint.replay_protection)
580 {
581 replay_store_from_config(&config.replay)?
582 } else {
583 Arc::new(InMemoryWebhookReplayStore::default())
584 };
585 Self::from_config_with_shared_replay_store(config, replay_store)
586 }
587
588 pub fn from_config_with_replay_store(
594 config: &WebhookConfig,
595 replay_store: impl WebhookReplayStore + 'static,
596 ) -> Result<Self, WebhookConfigError> {
597 Self::from_config_with_shared_replay_store(config, Arc::new(replay_store))
598 }
599
600 pub fn from_config_with_shared_replay_store(
609 config: &WebhookConfig,
610 replay_store: Arc<dyn WebhookReplayStore>,
611 ) -> Result<Self, WebhookConfigError> {
612 validate_unique_endpoint_paths(&config.endpoints)?;
613 let mut endpoints_by_path = HashMap::new();
614 for endpoint in &config.endpoints {
615 let mut endpoint = endpoint.clone();
616 endpoint.apply_provider_defaults();
617 endpoint.validate(false)?;
618 let Some(secret) = endpoint.secret.as_ref() else {
619 return Err(WebhookConfigError::MissingSecret {
620 name: endpoint.name.clone(),
621 path: endpoint.path.clone(),
622 });
623 };
624 let current = secret.as_bytes().to_vec();
625 let previous = endpoint
626 .previous_secrets
627 .iter()
628 .map(|secret| secret.as_bytes().to_vec())
629 .collect();
630 endpoints_by_path.insert(
631 endpoint.path.clone(),
632 Arc::new(ResolvedWebhookEndpoint {
633 config: endpoint,
634 keys: crate::security::config::ResolvedSigningKeys::new(current, previous),
635 }),
636 );
637 }
638
639 Ok(Self {
640 endpoints_by_path: Arc::new(endpoints_by_path),
641 replay_store,
642 })
643 }
644
645 fn endpoint_for_path(&self, path: &str) -> Option<Arc<ResolvedWebhookEndpoint>> {
646 self.endpoints_by_path.get(path).cloned()
647 }
648}
649
650fn validate_unique_endpoint_paths(
651 endpoints: &[WebhookEndpointConfig],
652) -> Result<(), WebhookConfigError> {
653 let mut seen_paths = HashMap::new();
654 for endpoint in endpoints {
655 if let Some(first_name) = seen_paths.insert(endpoint.path.as_str(), endpoint.name.as_str())
656 {
657 return Err(WebhookConfigError::DuplicatePath {
658 path: endpoint.path.clone(),
659 first_name: first_name.to_owned(),
660 duplicate_name: endpoint.name.clone(),
661 });
662 }
663 }
664 Ok(())
665}
666
667pub type WebhookReplayFuture<'a> =
669 Pin<Box<dyn Future<Output = Result<bool, WebhookReplayStoreError>> + Send + 'a>>;
670
671pub trait WebhookReplayStore: Send + Sync + std::fmt::Debug {
673 fn check_and_insert<'a>(
676 &'a self,
677 key: &'a str,
678 received_at: SystemTime,
679 window: Duration,
680 ) -> WebhookReplayFuture<'a>;
681}
682
683impl<T> WebhookReplayStore for Arc<T>
684where
685 T: WebhookReplayStore + ?Sized,
686{
687 fn check_and_insert<'a>(
688 &'a self,
689 key: &'a str,
690 received_at: SystemTime,
691 window: Duration,
692 ) -> WebhookReplayFuture<'a> {
693 self.as_ref().check_and_insert(key, received_at, window)
694 }
695}
696
697#[derive(Debug, Clone, Error)]
699#[error("{message}")]
700pub struct WebhookReplayStoreError {
701 message: String,
702}
703
704impl WebhookReplayStoreError {
705 #[must_use]
712 pub fn new(message: impl Into<String>) -> Self {
713 Self {
714 message: message.into(),
715 }
716 }
717}
718
719#[cfg(feature = "redis")]
720impl WebhookReplayStoreError {
721 fn backend(operation: &'static str, error: impl std::fmt::Display) -> Self {
722 Self::new(format!("webhook replay store {operation} failed: {error}"))
723 }
724}
725
726#[derive(Debug, Default)]
731pub struct InMemoryWebhookReplayStore {
732 state: Mutex<InMemoryWebhookReplayState>,
733}
734
735#[derive(Debug, Default)]
736struct InMemoryWebhookReplayState {
737 seen: HashMap<String, SystemTime>,
738 checks_since_cleanup: usize,
739}
740
741impl InMemoryWebhookReplayStore {
742 fn check_and_insert_sync(&self, key: &str, received_at: SystemTime, window: Duration) -> bool {
743 {
744 let mut state = self
745 .state
746 .lock()
747 .expect("webhook replay store lock poisoned");
748 state.checks_since_cleanup = state.checks_since_cleanup.saturating_add(1);
749
750 if let Some(expires_at) = state.seen.get(key).copied() {
751 if expires_at.duration_since(received_at).is_ok() {
752 Self::cleanup_if_due(&mut state, received_at);
753 drop(state);
754 return false;
755 }
756 state.seen.remove(key);
757 }
758
759 let expires_at = received_at.checked_add(window).unwrap_or(received_at);
760 state.seen.insert(key.to_owned(), expires_at);
761 Self::cleanup_if_due(&mut state, received_at);
762 drop(state);
763 }
764 true
765 }
766
767 fn cleanup_if_due(state: &mut InMemoryWebhookReplayState, received_at: SystemTime) {
768 if state.checks_since_cleanup < IN_MEMORY_REPLAY_CLEANUP_INTERVAL
769 && state.seen.len() <= IN_MEMORY_REPLAY_CLEANUP_HIGH_WATER
770 {
771 return;
772 }
773
774 state.checks_since_cleanup = 0;
775 state
776 .seen
777 .retain(|_, expires_at| expires_at.duration_since(received_at).is_ok());
778 }
779}
780
781impl WebhookReplayStore for InMemoryWebhookReplayStore {
782 fn check_and_insert<'a>(
783 &'a self,
784 key: &'a str,
785 received_at: SystemTime,
786 window: Duration,
787 ) -> WebhookReplayFuture<'a> {
788 Box::pin(async move { Ok(self.check_and_insert_sync(key, received_at, window)) })
789 }
790}
791
792#[cfg(feature = "redis")]
794#[derive(Clone, Debug)]
795pub struct RedisWebhookReplayStore {
796 connection: redis::aio::ConnectionManager,
797 key_prefix: String,
798}
799
800#[cfg(feature = "redis")]
801impl RedisWebhookReplayStore {
802 pub fn from_config(config: &WebhookReplayRedisConfig) -> Result<Self, WebhookConfigError> {
808 let url = config
809 .url
810 .as_deref()
811 .filter(|url| !url.trim().is_empty())
812 .ok_or(WebhookConfigError::RedisReplayMissingUrl)?;
813 let client = redis::Client::open(url)
814 .map_err(|error| WebhookConfigError::RedisReplayInvalidUrl(error.to_string()))?;
815 let connection = redis::aio::ConnectionManager::new_lazy_with_config(
816 client,
817 redis::aio::ConnectionManagerConfig::new(),
818 )
819 .map_err(|error| WebhookConfigError::RedisReplayInvalidUrl(error.to_string()))?;
820 Ok(Self {
821 connection,
822 key_prefix: config.key_prefix.clone(),
823 })
824 }
825
826 fn key_for(&self, replay_key: &str) -> String {
827 format!("{}:{replay_key}", self.key_prefix)
828 }
829}
830
831#[cfg(feature = "redis")]
832impl WebhookReplayStore for RedisWebhookReplayStore {
833 fn check_and_insert<'a>(
834 &'a self,
835 key: &'a str,
836 received_at: SystemTime,
837 window: Duration,
838 ) -> WebhookReplayFuture<'a> {
839 Box::pin(async move {
840 let mut connection = self.connection.clone();
841 let key = self.key_for(key);
842 let ttl_secs = window.as_secs().max(1);
843 let received_unix = received_at
844 .duration_since(UNIX_EPOCH)
845 .map_err(|error| WebhookReplayStoreError::backend("timestamp", error))?
846 .as_secs()
847 .to_string();
848 let inserted: Option<String> = redis::cmd("SET")
849 .arg(&key)
850 .arg(received_unix)
851 .arg("NX")
852 .arg("EX")
853 .arg(ttl_secs)
854 .query_async(&mut connection)
855 .await
856 .map_err(|error| WebhookReplayStoreError::backend("insert", error))?;
857 Ok(inserted.is_some())
858 })
859 }
860}
861
862#[derive(Debug, Clone)]
864pub struct SignedWebhook {
865 provider: WebhookProvider,
866 endpoint: String,
867 delivery_id: Option<String>,
868 event_type: Option<String>,
869 received_at: SystemTime,
870 raw_body: Bytes,
871}
872
873impl SignedWebhook {
874 #[must_use]
876 pub const fn provider(&self) -> &'static str {
877 self.provider.as_str()
878 }
879
880 #[must_use]
882 pub fn endpoint(&self) -> &str {
883 &self.endpoint
884 }
885
886 #[must_use]
888 pub fn delivery_id(&self) -> Option<&str> {
889 self.delivery_id.as_deref()
890 }
891
892 #[must_use]
894 pub fn event_type(&self) -> Option<&str> {
895 self.event_type.as_deref()
896 }
897
898 #[must_use]
900 pub const fn received_at(&self) -> SystemTime {
901 self.received_at
902 }
903
904 #[must_use]
906 pub fn raw_body(&self) -> &[u8] {
907 &self.raw_body
908 }
909
910 pub fn json<T: serde::de::DeserializeOwned>(&self) -> Result<T, serde_json::Error> {
917 serde_json::from_slice(&self.raw_body)
918 }
919}
920
921impl FromRequest<crate::AppState> for SignedWebhook {
922 type Rejection = crate::AutumnError;
923
924 async fn from_request(
925 req: axum::extract::Request,
926 state: &crate::AppState,
927 ) -> Result<Self, Self::Rejection> {
928 let (parts, body) = req.into_parts();
929 let path = parts.uri.path().to_owned();
930 let registry = state
931 .extension::<WebhookRegistry>()
932 .ok_or_else(|| WebhookVerifyError::RegistryMissing.into_autumn_error())?;
933 let endpoint = registry
934 .endpoint_for_path(&path)
935 .ok_or_else(|| WebhookVerifyError::EndpointMissing(path.clone()).into_autumn_error())?;
936 let body = axum::body::to_bytes(body, endpoint.config.max_body_bytes)
937 .await
938 .map_err(|err| {
939 crate::AutumnError::bad_request_msg(format!(
940 "webhook body could not be read: {err}"
941 ))
942 })?;
943 let received_at = SystemTime::now();
944 verify_request(®istry, &endpoint, &parts.headers, body, received_at)
945 .await
946 .map_err(WebhookVerifyError::into_autumn_error)
947 }
948}
949
950#[derive(Debug, Error)]
951enum WebhookVerifyError {
952 #[error("signed webhook registry is not installed")]
953 RegistryMissing,
954 #[error("no signed webhook endpoint is configured for path {0}")]
955 EndpointMissing(String),
956 #[error("missing required webhook header {0}")]
957 MissingHeader(String),
958 #[error("malformed webhook signature")]
959 MalformedSignature,
960 #[error("malformed webhook timestamp")]
961 MalformedTimestamp,
962 #[error("webhook timestamp is outside the accepted tolerance")]
963 StaleTimestamp,
964 #[error("webhook signature mismatch")]
965 SignatureMismatch,
966 #[error("missing webhook delivery ID")]
967 MissingDeliveryId,
968 #[error("duplicate webhook delivery")]
969 DuplicateDelivery,
970 #[error("webhook replay store unavailable: {0}")]
971 ReplayStoreUnavailable(String),
972}
973
974impl WebhookVerifyError {
975 const fn status(&self) -> StatusCode {
976 match self {
977 Self::RegistryMissing | Self::EndpointMissing(_) => StatusCode::INTERNAL_SERVER_ERROR,
978 Self::MissingHeader(_)
979 | Self::MalformedSignature
980 | Self::MalformedTimestamp
981 | Self::MissingDeliveryId => StatusCode::BAD_REQUEST,
982 Self::StaleTimestamp | Self::SignatureMismatch => StatusCode::UNAUTHORIZED,
983 Self::DuplicateDelivery => StatusCode::CONFLICT,
984 Self::ReplayStoreUnavailable(_) => StatusCode::SERVICE_UNAVAILABLE,
985 }
986 }
987
988 fn into_autumn_error(self) -> crate::AutumnError {
989 crate::AutumnError::bad_request_msg(self.to_string()).with_status(self.status())
990 }
991}
992
993async fn verify_request(
994 registry: &WebhookRegistry,
995 endpoint: &ResolvedWebhookEndpoint,
996 headers: &HeaderMap,
997 body: Bytes,
998 received_at: SystemTime,
999) -> Result<SignedWebhook, WebhookVerifyError> {
1000 match endpoint.config.provider {
1001 WebhookProvider::Stripe => verify_stripe(endpoint, headers, &body, received_at)?,
1002 WebhookProvider::Github | WebhookProvider::Generic => {
1003 verify_body_hmac(endpoint, headers, &body, None, received_at)?;
1004 }
1005 WebhookProvider::Slack => verify_slack(endpoint, headers, &body, received_at)?,
1006 }
1007
1008 let json_body = serde_json::from_slice::<serde_json::Value>(&body).ok();
1009 let delivery_id = resolve_delivery_id(&endpoint.config, headers, json_body.as_ref());
1010 if endpoint.config.replay_protection {
1011 let delivery_id = delivery_id
1012 .as_deref()
1013 .ok_or(WebhookVerifyError::MissingDeliveryId)?;
1014 let replay_key = format!(
1015 "{}:{}:{delivery_id}",
1016 endpoint.config.provider.as_str(),
1017 endpoint.config.name
1018 );
1019 let window = Duration::from_secs(endpoint.config.replay_window_secs);
1020 if !registry
1021 .replay_store
1022 .check_and_insert(&replay_key, received_at, window)
1023 .await
1024 .map_err(|error| WebhookVerifyError::ReplayStoreUnavailable(error.to_string()))?
1025 {
1026 return Err(WebhookVerifyError::DuplicateDelivery);
1027 }
1028 }
1029
1030 Ok(SignedWebhook {
1031 provider: endpoint.config.provider,
1032 endpoint: endpoint.config.name.clone(),
1033 delivery_id,
1034 event_type: resolve_event_type(&endpoint.config, headers, json_body.as_ref()),
1035 received_at,
1036 raw_body: body,
1037 })
1038}
1039
1040fn verify_stripe(
1041 endpoint: &ResolvedWebhookEndpoint,
1042 headers: &HeaderMap,
1043 body: &[u8],
1044 received_at: SystemTime,
1045) -> Result<(), WebhookVerifyError> {
1046 let header = required_header(headers, signature_header(endpoint))?;
1047 let (timestamp, signatures) = parse_stripe_signature(header)?;
1048 verify_timestamp(
1049 timestamp,
1050 received_at,
1051 endpoint.config.timestamp_tolerance_secs,
1052 )?;
1053
1054 let timestamp = timestamp.to_string();
1055 let mut signed_payload = Vec::with_capacity(timestamp.len() + 1 + body.len());
1056 signed_payload.extend_from_slice(timestamp.as_bytes());
1057 signed_payload.push(b'.');
1058 signed_payload.extend_from_slice(body);
1059
1060 if signatures
1061 .iter()
1062 .any(|signature| endpoint.keys.verify(&signed_payload, signature))
1063 {
1064 Ok(())
1065 } else {
1066 Err(WebhookVerifyError::SignatureMismatch)
1067 }
1068}
1069
1070fn verify_slack(
1071 endpoint: &ResolvedWebhookEndpoint,
1072 headers: &HeaderMap,
1073 body: &[u8],
1074 received_at: SystemTime,
1075) -> Result<(), WebhookVerifyError> {
1076 let timestamp_header = endpoint
1077 .config
1078 .timestamp_header
1079 .as_deref()
1080 .ok_or(WebhookVerifyError::MalformedTimestamp)?;
1081 let timestamp = required_header(headers, timestamp_header)?
1082 .parse::<i64>()
1083 .map_err(|_| WebhookVerifyError::MalformedTimestamp)?;
1084 verify_timestamp(
1085 timestamp,
1086 received_at,
1087 endpoint.config.timestamp_tolerance_secs,
1088 )?;
1089
1090 let timestamp = timestamp.to_string();
1091 let mut signed_payload = Vec::with_capacity(3 + timestamp.len() + 1 + body.len());
1092 signed_payload.extend_from_slice(b"v0:");
1093 signed_payload.extend_from_slice(timestamp.as_bytes());
1094 signed_payload.push(b':');
1095 signed_payload.extend_from_slice(body);
1096 verify_body_hmac(
1097 endpoint,
1098 headers,
1099 &signed_payload,
1100 endpoint.config.signature_prefix.as_deref(),
1101 received_at,
1102 )
1103}
1104
1105fn verify_body_hmac(
1106 endpoint: &ResolvedWebhookEndpoint,
1107 headers: &HeaderMap,
1108 body_or_base: &[u8],
1109 explicit_prefix: Option<&str>,
1110 received_at: SystemTime,
1111) -> Result<(), WebhookVerifyError> {
1112 if let Some(timestamp_header) = endpoint.config.timestamp_header.as_deref()
1113 && endpoint.config.provider != WebhookProvider::Slack
1114 {
1115 let timestamp = required_header(headers, timestamp_header)?
1116 .parse::<i64>()
1117 .map_err(|_| WebhookVerifyError::MalformedTimestamp)?;
1118 verify_timestamp(
1119 timestamp,
1120 received_at,
1121 endpoint.config.timestamp_tolerance_secs,
1122 )?;
1123 }
1124
1125 let mut signature = required_header(headers, signature_header(endpoint))?;
1126 let prefix = explicit_prefix.or(endpoint.config.signature_prefix.as_deref());
1127 if let Some(prefix) = prefix {
1128 signature = signature
1129 .strip_prefix(prefix)
1130 .ok_or(WebhookVerifyError::MalformedSignature)?;
1131 }
1132
1133 if endpoint.keys.verify(body_or_base, signature) {
1134 Ok(())
1135 } else {
1136 Err(WebhookVerifyError::SignatureMismatch)
1137 }
1138}
1139
1140fn signature_header(endpoint: &ResolvedWebhookEndpoint) -> &str {
1141 endpoint
1142 .config
1143 .signature_header
1144 .as_deref()
1145 .unwrap_or("X-Webhook-Signature")
1146}
1147
1148fn required_header<'a>(headers: &'a HeaderMap, name: &str) -> Result<&'a str, WebhookVerifyError> {
1149 headers
1150 .get(name)
1151 .ok_or_else(|| WebhookVerifyError::MissingHeader(name.to_owned()))?
1152 .to_str()
1153 .map_err(|_| WebhookVerifyError::MalformedSignature)
1154}
1155
1156fn parse_stripe_signature(header: &str) -> Result<(i64, Vec<&str>), WebhookVerifyError> {
1157 let mut timestamp = None;
1158 let mut signatures = Vec::new();
1159
1160 for part in header.split(',') {
1161 let Some((key, value)) = part.split_once('=') else {
1162 return Err(WebhookVerifyError::MalformedSignature);
1163 };
1164 match key.trim() {
1165 "t" => {
1166 timestamp = Some(
1167 value
1168 .trim()
1169 .parse::<i64>()
1170 .map_err(|_| WebhookVerifyError::MalformedTimestamp)?,
1171 );
1172 }
1173 "v1" => signatures.push(value.trim()),
1174 _ => {}
1175 }
1176 }
1177
1178 let timestamp = timestamp.ok_or(WebhookVerifyError::MalformedTimestamp)?;
1179 if signatures.is_empty() {
1180 return Err(WebhookVerifyError::MalformedSignature);
1181 }
1182 Ok((timestamp, signatures))
1183}
1184
1185fn verify_timestamp(
1186 timestamp: i64,
1187 received_at: SystemTime,
1188 tolerance_secs: u64,
1189) -> Result<(), WebhookVerifyError> {
1190 let now = i64::try_from(
1191 received_at
1192 .duration_since(UNIX_EPOCH)
1193 .map_err(|_| WebhookVerifyError::MalformedTimestamp)?
1194 .as_secs(),
1195 )
1196 .map_err(|_| WebhookVerifyError::MalformedTimestamp)?;
1197 let skew = now.abs_diff(timestamp);
1198 if skew > tolerance_secs {
1199 return Err(WebhookVerifyError::StaleTimestamp);
1200 }
1201 Ok(())
1202}
1203
1204fn resolve_delivery_id(
1205 config: &WebhookEndpointConfig,
1206 headers: &HeaderMap,
1207 json_body: Option<&serde_json::Value>,
1208) -> Option<String> {
1209 let header = config
1210 .delivery_id_header
1211 .as_deref()
1212 .and_then(|header| optional_header(headers, header));
1213
1214 match config.provider {
1215 WebhookProvider::Slack => header
1216 .or_else(|| slack_delivery_id(json_body))
1217 .or_else(|| json_string_field(json_body, "id")),
1218 _ => header.or_else(|| json_string_field(json_body, "id")),
1219 }
1220}
1221
1222fn resolve_event_type(
1223 config: &WebhookEndpointConfig,
1224 headers: &HeaderMap,
1225 json_body: Option<&serde_json::Value>,
1226) -> Option<String> {
1227 config
1228 .event_type_header
1229 .as_deref()
1230 .and_then(|header| optional_header(headers, header))
1231 .or_else(|| json_string_field(json_body, "type"))
1232 .or_else(|| nested_json_string_field(json_body, "event", "type"))
1233}
1234
1235fn optional_header(headers: &HeaderMap, name: &str) -> Option<String> {
1236 headers
1237 .get(name)
1238 .and_then(|value| value.to_str().ok())
1239 .filter(|value| !value.trim().is_empty())
1240 .map(str::to_owned)
1241}
1242
1243fn slack_delivery_id(json_body: Option<&serde_json::Value>) -> Option<String> {
1244 json_string_field(json_body, "event_id").or_else(|| {
1245 let value = json_body?;
1246 if value.get("type").and_then(serde_json::Value::as_str) == Some("url_verification") {
1247 value
1248 .get("challenge")
1249 .and_then(serde_json::Value::as_str)
1250 .map(str::to_owned)
1251 } else {
1252 None
1253 }
1254 })
1255}
1256
1257fn json_string_field(value: Option<&serde_json::Value>, field: &str) -> Option<String> {
1258 let value = value?;
1259 value
1260 .get(field)
1261 .and_then(serde_json::Value::as_str)
1262 .map(str::to_owned)
1263}
1264
1265fn nested_json_string_field(
1266 value: Option<&serde_json::Value>,
1267 parent: &str,
1268 field: &str,
1269) -> Option<String> {
1270 let value = value?;
1271 value
1272 .get(parent)
1273 .and_then(|parent_value| parent_value.get(field))
1274 .and_then(serde_json::Value::as_str)
1275 .map(str::to_owned)
1276}
1277
1278const fn default_timestamp_tolerance_secs() -> u64 {
1279 DEFAULT_TIMESTAMP_TOLERANCE_SECS
1280}
1281
1282const fn default_replay_window_secs() -> u64 {
1283 DEFAULT_REPLAY_WINDOW_SECS
1284}
1285
1286const fn default_max_body_bytes() -> usize {
1287 DEFAULT_MAX_BODY_BYTES
1288}
1289
1290const fn default_true() -> bool {
1291 true
1292}
1293
1294fn default_replay_redis_key_prefix() -> String {
1295 "autumn:webhooks:replay".to_owned()
1296}
1297
1298fn validate_redis_replay_config(
1299 config: &WebhookReplayRedisConfig,
1300) -> Result<(), WebhookConfigError> {
1301 let url = config
1302 .url
1303 .as_deref()
1304 .filter(|url| !url.trim().is_empty())
1305 .ok_or(WebhookConfigError::RedisReplayMissingUrl)?;
1306
1307 #[cfg(feature = "redis")]
1308 {
1309 redis::Client::open(url)
1310 .map_err(|error| WebhookConfigError::RedisReplayInvalidUrl(error.to_string()))?;
1311 Ok(())
1312 }
1313
1314 #[cfg(not(feature = "redis"))]
1315 {
1316 let _ = url;
1317 Err(WebhookConfigError::RedisReplayFeatureDisabled)
1318 }
1319}
1320
1321fn replay_store_from_config(
1322 config: &WebhookReplayConfig,
1323) -> Result<Arc<dyn WebhookReplayStore>, WebhookConfigError> {
1324 match config.backend {
1325 WebhookReplayBackend::Memory => Ok(Arc::new(InMemoryWebhookReplayStore::default())),
1326 WebhookReplayBackend::Redis => {
1327 #[cfg(feature = "redis")]
1328 {
1329 Ok(Arc::new(RedisWebhookReplayStore::from_config(
1330 &config.redis,
1331 )?))
1332 }
1333
1334 #[cfg(not(feature = "redis"))]
1335 {
1336 Err(WebhookConfigError::RedisReplayFeatureDisabled)
1337 }
1338 }
1339 }
1340}
1341
1342pub(crate) fn install_registry_from_config(
1343 state: &crate::AppState,
1344 config: &WebhookConfig,
1345) -> Result<(), WebhookConfigError> {
1346 if config.endpoints.is_empty() {
1347 return Ok(());
1348 }
1349 let registry = WebhookRegistry::from_config(config)?;
1350 state.insert_extension(registry);
1351 Ok(())
1352}