1use std::collections::HashMap;
8use std::future::Future;
9use std::pin::Pin;
10use std::sync::{Arc, Mutex};
11use std::time::{Duration, SystemTime, UNIX_EPOCH};
12
13use axum::extract::FromRequest;
14use bytes::Bytes;
15use http::{HeaderMap, StatusCode};
16use serde::Deserialize;
17use thiserror::Error;
18
19pub use crate::security::config::hmac_sha256_hex;
20
21const DEFAULT_TIMESTAMP_TOLERANCE_SECS: u64 = 300;
22const DEFAULT_REPLAY_WINDOW_SECS: u64 = 24 * 60 * 60;
23const DEFAULT_MAX_BODY_BYTES: usize = 1024 * 1024;
24const IN_MEMORY_REPLAY_CLEANUP_INTERVAL: usize = 128;
25const IN_MEMORY_REPLAY_CLEANUP_HIGH_WATER: usize = 16 * 1024;
26
27#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash, Deserialize)]
29#[serde(rename_all = "lowercase")]
30pub enum WebhookProvider {
31 Stripe,
33 Github,
35 Slack,
37 #[default]
39 Generic,
40}
41
42impl WebhookProvider {
43 #[must_use]
45 pub const fn as_str(self) -> &'static str {
46 match self {
47 Self::Stripe => "stripe",
48 Self::Github => "github",
49 Self::Slack => "slack",
50 Self::Generic => "generic",
51 }
52 }
53}
54
55impl std::fmt::Display for WebhookProvider {
56 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
57 f.write_str(self.as_str())
58 }
59}
60
61#[derive(Debug, Clone, Default, Deserialize)]
63pub struct WebhookConfig {
64 #[serde(default)]
66 pub replay: WebhookReplayConfig,
67 #[serde(default)]
69 pub endpoints: Vec<WebhookEndpointConfig>,
70}
71
72impl WebhookConfig {
73 pub(crate) fn apply_env_overrides_with_env(&mut self, env: &dyn crate::config::Env) {
75 self.replay.apply_env_overrides_with_env(env);
76 self.resolve_secret_envs_with_env(env);
77 }
78
79 fn resolve_secret_envs_with_env(&mut self, env: &dyn crate::config::Env) {
82 for endpoint in &mut self.endpoints {
83 if endpoint.secret.is_none()
84 && let Some(env_name) = endpoint.secret_env.as_deref()
85 && let Ok(secret) = env.var(env_name)
86 {
87 endpoint.secret = Some(secret);
88 }
89
90 for env_name in &endpoint.previous_secret_envs {
91 if let Ok(secret) = env.var(env_name)
92 && !secret.is_empty()
93 {
94 endpoint.previous_secrets.push(secret);
95 }
96 }
97 }
98 }
99
100 pub fn validate(&self, is_production: bool) -> Result<(), WebhookConfigError> {
107 for endpoint in &self.endpoints {
108 endpoint.validate(is_production)?;
109 }
110 validate_unique_endpoint_paths(&self.endpoints)?;
111 if self
112 .endpoints
113 .iter()
114 .any(|endpoint| endpoint.replay_protection)
115 {
116 self.replay.validate(is_production)?;
117 }
118 Ok(())
119 }
120}
121
122#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Deserialize)]
124#[serde(rename_all = "lowercase")]
125pub enum WebhookReplayBackend {
126 #[serde(alias = "local", alias = "in_memory")]
129 #[default]
130 Memory,
131 Redis,
133}
134
135impl WebhookReplayBackend {
136 fn from_env_value(value: &str) -> Option<Self> {
137 match value.trim().to_ascii_lowercase().as_str() {
138 "memory" | "local" | "in_memory" | "in-memory" => Some(Self::Memory),
139 "redis" => Some(Self::Redis),
140 _ => None,
141 }
142 }
143}
144
145#[derive(Debug, Clone, Deserialize)]
147pub struct WebhookReplayConfig {
148 #[serde(default)]
150 pub backend: WebhookReplayBackend,
151 #[serde(default)]
153 pub allow_memory_in_production: bool,
154 #[serde(default)]
156 pub redis: WebhookReplayRedisConfig,
157}
158
159impl Default for WebhookReplayConfig {
160 fn default() -> Self {
161 Self {
162 backend: WebhookReplayBackend::Memory,
163 allow_memory_in_production: false,
164 redis: WebhookReplayRedisConfig::default(),
165 }
166 }
167}
168
169impl WebhookReplayConfig {
170 fn apply_env_overrides_with_env(&mut self, env: &dyn crate::config::Env) {
171 if let Ok(value) = env.var("AUTUMN_SECURITY__WEBHOOKS__REPLAY__BACKEND") {
172 if let Some(backend) = WebhookReplayBackend::from_env_value(&value) {
173 self.backend = backend;
174 } else {
175 eprintln!(
176 "Warning: AUTUMN_SECURITY__WEBHOOKS__REPLAY__BACKEND={value:?} is not valid \
177 (expected memory or redis), ignoring"
178 );
179 }
180 }
181 if let Ok(value) = env.var("AUTUMN_SECURITY__WEBHOOKS__REPLAY__ALLOW_MEMORY_IN_PRODUCTION")
182 {
183 match value.trim().parse::<bool>() {
184 Ok(value) => self.allow_memory_in_production = value,
185 Err(error) => eprintln!(
186 "Warning: AUTUMN_SECURITY__WEBHOOKS__REPLAY__ALLOW_MEMORY_IN_PRODUCTION \
187 could not be parsed as bool: {error}"
188 ),
189 }
190 }
191 if let Ok(value) = env.var("AUTUMN_SECURITY__WEBHOOKS__REPLAY__REDIS__URL") {
192 let value = value.trim();
193 self.redis.url = if value.is_empty() {
194 None
195 } else {
196 Some(value.to_owned())
197 };
198 }
199 if let Ok(value) = env.var("AUTUMN_SECURITY__WEBHOOKS__REPLAY__REDIS__KEY_PREFIX")
200 && !value.trim().is_empty()
201 {
202 self.redis.key_prefix = value;
203 }
204 }
205
206 fn validate(&self, is_production: bool) -> Result<(), WebhookConfigError> {
207 match self.backend {
208 WebhookReplayBackend::Memory => {
209 if is_production && !self.allow_memory_in_production {
210 return Err(WebhookConfigError::MemoryReplayInProduction);
211 }
212 Ok(())
213 }
214 WebhookReplayBackend::Redis => validate_redis_replay_config(&self.redis),
215 }
216 }
217}
218
219#[derive(Debug, Clone, Deserialize)]
221pub struct WebhookReplayRedisConfig {
222 #[serde(default)]
224 pub url: Option<String>,
225 #[serde(default = "default_replay_redis_key_prefix")]
227 pub key_prefix: String,
228}
229
230impl Default for WebhookReplayRedisConfig {
231 fn default() -> Self {
232 Self {
233 url: None,
234 key_prefix: default_replay_redis_key_prefix(),
235 }
236 }
237}
238
239#[derive(Debug, Clone, Deserialize)]
241pub struct WebhookEndpointConfig {
242 pub name: String,
244 pub path: String,
246 #[serde(default)]
248 pub provider: WebhookProvider,
249 #[serde(default)]
251 pub secret: Option<String>,
252 #[serde(default)]
254 pub secret_env: Option<String>,
255 #[serde(default)]
257 pub previous_secrets: Vec<String>,
258 #[serde(default)]
260 pub previous_secret_envs: Vec<String>,
261 #[serde(default = "default_timestamp_tolerance_secs")]
263 pub timestamp_tolerance_secs: u64,
264 #[serde(default = "default_replay_window_secs")]
266 pub replay_window_secs: u64,
267 #[serde(default = "default_true")]
269 pub replay_protection: bool,
270 #[serde(default)]
273 pub signature_header: Option<String>,
274 #[serde(default)]
276 pub signature_prefix: Option<String>,
277 #[serde(default)]
279 pub timestamp_header: Option<String>,
280 #[serde(default)]
282 pub delivery_id_header: Option<String>,
283 #[serde(default)]
285 pub event_type_header: Option<String>,
286 #[serde(default = "default_max_body_bytes")]
288 pub max_body_bytes: usize,
289}
290
291impl Default for WebhookEndpointConfig {
292 fn default() -> Self {
293 Self::provider_defaults(WebhookProvider::Generic)
294 }
295}
296
297impl WebhookEndpointConfig {
298 #[must_use]
300 pub fn new(
301 name: impl Into<String>,
302 path: impl Into<String>,
303 provider: WebhookProvider,
304 secret: impl Into<String>,
305 ) -> Self {
306 let mut config = Self::provider_defaults(provider);
307 config.name = name.into();
308 config.path = path.into();
309 config.secret = Some(secret.into());
310 config
311 }
312
313 #[must_use]
315 pub fn stripe(
316 name: impl Into<String>,
317 path: impl Into<String>,
318 secret: impl Into<String>,
319 ) -> Self {
320 Self::new(name, path, WebhookProvider::Stripe, secret)
321 }
322
323 #[must_use]
325 pub fn github(
326 name: impl Into<String>,
327 path: impl Into<String>,
328 secret: impl Into<String>,
329 ) -> Self {
330 Self::new(name, path, WebhookProvider::Github, secret)
331 }
332
333 #[must_use]
335 pub fn slack(
336 name: impl Into<String>,
337 path: impl Into<String>,
338 secret: impl Into<String>,
339 ) -> Self {
340 Self::new(name, path, WebhookProvider::Slack, secret)
341 }
342
343 #[must_use]
345 pub fn generic(
346 name: impl Into<String>,
347 path: impl Into<String>,
348 secret: impl Into<String>,
349 ) -> Self {
350 Self::new(name, path, WebhookProvider::Generic, secret)
351 }
352
353 #[must_use]
355 pub fn with_previous_secret(mut self, secret: impl Into<String>) -> Self {
356 self.previous_secrets.push(secret.into());
357 self
358 }
359
360 #[must_use]
362 pub const fn with_timestamp_tolerance_secs(mut self, secs: u64) -> Self {
363 self.timestamp_tolerance_secs = secs;
364 self
365 }
366
367 #[must_use]
369 pub const fn with_replay_window_secs(mut self, secs: u64) -> Self {
370 self.replay_window_secs = secs;
371 self
372 }
373
374 #[must_use]
376 pub const fn without_replay_protection(mut self) -> Self {
377 self.replay_protection = false;
378 self
379 }
380
381 fn provider_defaults(provider: WebhookProvider) -> Self {
382 let mut config = Self {
383 name: String::new(),
384 path: String::new(),
385 provider,
386 secret: None,
387 secret_env: None,
388 previous_secrets: Vec::new(),
389 previous_secret_envs: Vec::new(),
390 timestamp_tolerance_secs: DEFAULT_TIMESTAMP_TOLERANCE_SECS,
391 replay_window_secs: DEFAULT_REPLAY_WINDOW_SECS,
392 replay_protection: true,
393 signature_header: None,
394 signature_prefix: None,
395 timestamp_header: None,
396 delivery_id_header: None,
397 event_type_header: None,
398 max_body_bytes: DEFAULT_MAX_BODY_BYTES,
399 };
400
401 match provider {
402 WebhookProvider::Stripe => {
403 config.signature_header = Some("Stripe-Signature".to_owned());
404 }
405 WebhookProvider::Github => {
406 config.signature_header = Some("X-Hub-Signature-256".to_owned());
407 config.signature_prefix = Some("sha256=".to_owned());
408 config.delivery_id_header = Some("X-GitHub-Delivery".to_owned());
409 config.event_type_header = Some("X-GitHub-Event".to_owned());
410 }
411 WebhookProvider::Slack => {
412 config.signature_header = Some("X-Slack-Signature".to_owned());
413 config.signature_prefix = Some("v0=".to_owned());
414 config.timestamp_header = Some("X-Slack-Request-Timestamp".to_owned());
415 }
416 WebhookProvider::Generic => {
417 config.signature_header = Some("X-Webhook-Signature".to_owned());
418 config.signature_prefix = Some("sha256=".to_owned());
419 config.delivery_id_header = Some("X-Webhook-Delivery".to_owned());
420 config.event_type_header = Some("X-Webhook-Event".to_owned());
421 }
422 }
423
424 config
425 }
426
427 fn validate(&self, is_production: bool) -> Result<(), WebhookConfigError> {
428 if self.name.trim().is_empty() {
429 return Err(WebhookConfigError::InvalidEndpoint {
430 name: self.name.clone(),
431 message: "name must not be empty".to_owned(),
432 });
433 }
434 if !self.path.starts_with('/') || self.path.trim() == "/" || self.path.trim().is_empty() {
435 return Err(WebhookConfigError::InvalidEndpoint {
436 name: self.name.clone(),
437 message: format!("path {:?} must start with '/' and not be root", self.path),
438 });
439 }
440 let Some(secret) = self.secret.as_deref().filter(|value| !value.is_empty()) else {
441 return Err(WebhookConfigError::MissingSecret {
442 name: self.name.clone(),
443 path: self.path.clone(),
444 });
445 };
446
447 if is_production {
448 crate::security::config::validate_signing_secret(Some(secret), true).map_err(
449 |reason| WebhookConfigError::InvalidSecret {
450 name: self.name.clone(),
451 reason,
452 },
453 )?;
454 for (index, previous) in self.previous_secrets.iter().enumerate() {
455 crate::security::config::validate_signing_secret(Some(previous), true).map_err(
456 |reason| WebhookConfigError::InvalidPreviousSecret {
457 name: self.name.clone(),
458 index,
459 reason,
460 },
461 )?;
462 }
463 }
464
465 Ok(())
466 }
467
468 fn apply_provider_defaults(&mut self) {
469 let defaults = Self::provider_defaults(self.provider);
470 if self.signature_header.is_none() {
471 self.signature_header = defaults.signature_header;
472 }
473 if self.signature_prefix.is_none() {
474 self.signature_prefix = defaults.signature_prefix;
475 }
476 if self.timestamp_header.is_none() {
477 self.timestamp_header = defaults.timestamp_header;
478 }
479 if self.delivery_id_header.is_none() {
480 self.delivery_id_header = defaults.delivery_id_header;
481 }
482 if self.event_type_header.is_none() {
483 self.event_type_header = defaults.event_type_header;
484 }
485 }
486}
487
488#[derive(Debug, Clone, PartialEq, Eq, Error)]
490pub enum WebhookConfigError {
491 #[error("webhook endpoint {name:?} at {path:?} is missing a secret")]
493 MissingSecret {
494 name: String,
496 path: String,
498 },
499 #[error("webhook endpoint {name:?} is invalid: {message}")]
501 InvalidEndpoint {
502 name: String,
504 message: String,
506 },
507 #[error(
509 "duplicate webhook endpoint path {path:?}: endpoints {first_name:?} and \
510 {duplicate_name:?} would shadow each other"
511 )]
512 DuplicatePath {
513 path: String,
515 first_name: String,
517 duplicate_name: String,
519 },
520 #[error("webhook endpoint {name:?} has invalid secret: {reason}")]
522 InvalidSecret {
523 name: String,
525 reason: crate::security::config::SigningSecretError,
527 },
528 #[error("webhook endpoint {name:?} has invalid previous secret {index}: {reason}")]
530 InvalidPreviousSecret {
531 name: String,
533 index: usize,
535 reason: crate::security::config::SigningSecretError,
537 },
538 #[error(
540 "webhook replay backend memory is not allowed in production; set \
541 security.webhooks.replay.backend = \"redis\" or explicitly set \
542 security.webhooks.replay.allow_memory_in_production = true"
543 )]
544 MemoryReplayInProduction,
545 #[error("webhook redis replay backend requires security.webhooks.replay.redis.url")]
547 RedisReplayMissingUrl,
548 #[error("webhook redis replay backend URL is invalid: {0}")]
550 RedisReplayInvalidUrl(String),
551 #[error("webhook redis replay backend requires the autumn-web redis feature")]
553 RedisReplayFeatureDisabled,
554}
555
556#[derive(Debug)]
557struct ResolvedWebhookEndpoint {
558 config: WebhookEndpointConfig,
559 keys: crate::security::config::ResolvedSigningKeys,
560}
561
562#[derive(Clone, Debug)]
564pub struct WebhookRegistry {
565 endpoints_by_path: Arc<HashMap<String, Arc<ResolvedWebhookEndpoint>>>,
566 replay_store: Arc<dyn WebhookReplayStore>,
567}
568
569impl WebhookRegistry {
570 pub fn from_config(config: &WebhookConfig) -> Result<Self, WebhookConfigError> {
576 let replay_store = if config
577 .endpoints
578 .iter()
579 .any(|endpoint| endpoint.replay_protection)
580 {
581 replay_store_from_config(&config.replay)?
582 } else {
583 Arc::new(InMemoryWebhookReplayStore::default())
584 };
585 Self::from_config_with_shared_replay_store(config, replay_store)
586 }
587
588 pub fn from_config_with_replay_store(
594 config: &WebhookConfig,
595 replay_store: impl WebhookReplayStore + 'static,
596 ) -> Result<Self, WebhookConfigError> {
597 Self::from_config_with_shared_replay_store(config, Arc::new(replay_store))
598 }
599
600 pub fn from_config_with_shared_replay_store(
609 config: &WebhookConfig,
610 replay_store: Arc<dyn WebhookReplayStore>,
611 ) -> Result<Self, WebhookConfigError> {
612 validate_unique_endpoint_paths(&config.endpoints)?;
613 let mut endpoints_by_path = HashMap::new();
614 for endpoint in &config.endpoints {
615 let mut endpoint = endpoint.clone();
616 endpoint.apply_provider_defaults();
617 endpoint.validate(false)?;
618 let Some(secret) = endpoint.secret.as_ref() else {
619 return Err(WebhookConfigError::MissingSecret {
620 name: endpoint.name.clone(),
621 path: endpoint.path.clone(),
622 });
623 };
624 let current = secret.as_bytes().to_vec();
625 let previous = endpoint
626 .previous_secrets
627 .iter()
628 .map(|secret| secret.as_bytes().to_vec())
629 .collect();
630 endpoints_by_path.insert(
631 endpoint.path.clone(),
632 Arc::new(ResolvedWebhookEndpoint {
633 config: endpoint,
634 keys: crate::security::config::ResolvedSigningKeys::new(current, previous),
635 }),
636 );
637 }
638
639 Ok(Self {
640 endpoints_by_path: Arc::new(endpoints_by_path),
641 replay_store,
642 })
643 }
644
645 fn endpoint_for_path(&self, path: &str) -> Option<Arc<ResolvedWebhookEndpoint>> {
646 self.endpoints_by_path.get(path).cloned()
647 }
648}
649
650fn validate_unique_endpoint_paths(
651 endpoints: &[WebhookEndpointConfig],
652) -> Result<(), WebhookConfigError> {
653 let mut seen_paths = HashMap::new();
654 for endpoint in endpoints {
655 if let Some(first_name) = seen_paths.insert(endpoint.path.as_str(), endpoint.name.as_str())
656 {
657 return Err(WebhookConfigError::DuplicatePath {
658 path: endpoint.path.clone(),
659 first_name: first_name.to_owned(),
660 duplicate_name: endpoint.name.clone(),
661 });
662 }
663 }
664 Ok(())
665}
666
667pub type WebhookReplayFuture<'a> =
669 Pin<Box<dyn Future<Output = Result<bool, WebhookReplayStoreError>> + Send + 'a>>;
670
671pub trait WebhookReplayStore: Send + Sync + std::fmt::Debug {
673 fn check_and_insert<'a>(
676 &'a self,
677 key: &'a str,
678 received_at: SystemTime,
679 window: Duration,
680 ) -> WebhookReplayFuture<'a>;
681
682 fn remove<'a>(&'a self, key: &'a str) -> WebhookReplayFuture<'a>;
684}
685
686impl<T> WebhookReplayStore for Arc<T>
687where
688 T: WebhookReplayStore + ?Sized,
689{
690 fn check_and_insert<'a>(
691 &'a self,
692 key: &'a str,
693 received_at: SystemTime,
694 window: Duration,
695 ) -> WebhookReplayFuture<'a> {
696 self.as_ref().check_and_insert(key, received_at, window)
697 }
698
699 fn remove<'a>(&'a self, key: &'a str) -> WebhookReplayFuture<'a> {
700 self.as_ref().remove(key)
701 }
702}
703
704#[derive(Debug, Clone, Error)]
706#[error("{message}")]
707pub struct WebhookReplayStoreError {
708 message: String,
709}
710
711impl WebhookReplayStoreError {
712 #[must_use]
719 pub fn new(message: impl Into<String>) -> Self {
720 Self {
721 message: message.into(),
722 }
723 }
724}
725
726#[cfg(feature = "redis")]
727impl WebhookReplayStoreError {
728 fn backend(operation: &'static str, error: impl std::fmt::Display) -> Self {
729 Self::new(format!("webhook replay store {operation} failed: {error}"))
730 }
731}
732
733#[derive(Debug, Default)]
738pub struct InMemoryWebhookReplayStore {
739 state: Mutex<InMemoryWebhookReplayState>,
740}
741
742#[derive(Debug, Default)]
743struct InMemoryWebhookReplayState {
744 seen: HashMap<String, SystemTime>,
745 checks_since_cleanup: usize,
746}
747
748impl InMemoryWebhookReplayStore {
749 fn check_and_insert_sync(&self, key: &str, received_at: SystemTime, window: Duration) -> bool {
750 {
751 let mut state = self
752 .state
753 .lock()
754 .expect("webhook replay store lock poisoned");
755 state.checks_since_cleanup = state.checks_since_cleanup.saturating_add(1);
756
757 if let Some(expires_at) = state.seen.get(key).copied() {
758 if expires_at.duration_since(received_at).is_ok() {
759 Self::cleanup_if_due(&mut state, received_at);
760 drop(state);
761 return false;
762 }
763 state.seen.remove(key);
764 }
765
766 let expires_at = received_at.checked_add(window).unwrap_or(received_at);
767 state.seen.insert(key.to_owned(), expires_at);
768 Self::cleanup_if_due(&mut state, received_at);
769 drop(state);
770 }
771 true
772 }
773
774 fn cleanup_if_due(state: &mut InMemoryWebhookReplayState, received_at: SystemTime) {
775 if state.checks_since_cleanup < IN_MEMORY_REPLAY_CLEANUP_INTERVAL
776 && state.seen.len() <= IN_MEMORY_REPLAY_CLEANUP_HIGH_WATER
777 {
778 return;
779 }
780
781 state.checks_since_cleanup = 0;
782 state
783 .seen
784 .retain(|_, expires_at| expires_at.duration_since(received_at).is_ok());
785 }
786}
787
788impl WebhookReplayStore for InMemoryWebhookReplayStore {
789 fn check_and_insert<'a>(
790 &'a self,
791 key: &'a str,
792 received_at: SystemTime,
793 window: Duration,
794 ) -> WebhookReplayFuture<'a> {
795 Box::pin(async move { Ok(self.check_and_insert_sync(key, received_at, window)) })
796 }
797
798 fn remove<'a>(&'a self, key: &'a str) -> WebhookReplayFuture<'a> {
799 self.state
800 .lock()
801 .expect("webhook replay store lock poisoned")
802 .seen
803 .remove(key);
804 Box::pin(async move { Ok(true) })
805 }
806}
807
808#[cfg(feature = "redis")]
810#[derive(Clone, Debug)]
811pub struct RedisWebhookReplayStore {
812 connection: redis::aio::ConnectionManager,
813 key_prefix: String,
814}
815
816#[cfg(feature = "redis")]
817impl RedisWebhookReplayStore {
818 pub fn from_config(config: &WebhookReplayRedisConfig) -> Result<Self, WebhookConfigError> {
824 let url = config
825 .url
826 .as_deref()
827 .filter(|url| !url.trim().is_empty())
828 .ok_or(WebhookConfigError::RedisReplayMissingUrl)?;
829 let client = redis::Client::open(url)
830 .map_err(|error| WebhookConfigError::RedisReplayInvalidUrl(error.to_string()))?;
831 let connection = redis::aio::ConnectionManager::new_lazy_with_config(
832 client,
833 redis::aio::ConnectionManagerConfig::new(),
834 )
835 .map_err(|error| WebhookConfigError::RedisReplayInvalidUrl(error.to_string()))?;
836 Ok(Self {
837 connection,
838 key_prefix: config.key_prefix.clone(),
839 })
840 }
841
842 fn key_for(&self, replay_key: &str) -> String {
843 format!("{}:{replay_key}", self.key_prefix)
844 }
845}
846
847#[cfg(feature = "redis")]
848impl WebhookReplayStore for RedisWebhookReplayStore {
849 fn check_and_insert<'a>(
850 &'a self,
851 key: &'a str,
852 received_at: SystemTime,
853 window: Duration,
854 ) -> WebhookReplayFuture<'a> {
855 Box::pin(async move {
856 let mut connection = self.connection.clone();
857 let key = self.key_for(key);
858 let ttl_secs = window.as_secs().max(1);
859 let received_unix = received_at
860 .duration_since(UNIX_EPOCH)
861 .map_err(|error| WebhookReplayStoreError::backend("timestamp", error))?
862 .as_secs()
863 .to_string();
864 let inserted: Option<String> = redis::cmd("SET")
865 .arg(&key)
866 .arg(received_unix)
867 .arg("NX")
868 .arg("EX")
869 .arg(ttl_secs)
870 .query_async(&mut connection)
871 .await
872 .map_err(|error| WebhookReplayStoreError::backend("insert", error))?;
873 Ok(inserted.is_some())
874 })
875 }
876
877 fn remove<'a>(&'a self, key: &'a str) -> WebhookReplayFuture<'a> {
878 Box::pin(async move {
879 let mut connection = self.connection.clone();
880 let key = self.key_for(key);
881 let _: () = redis::cmd("DEL")
882 .arg(&key)
883 .query_async(&mut connection)
884 .await
885 .map_err(|error| WebhookReplayStoreError::backend("delete", error))?;
886 Ok(true)
887 })
888 }
889}
890
891#[derive(Debug, Clone)]
893pub struct SignedWebhook {
894 provider: WebhookProvider,
895 endpoint: String,
896 delivery_id: Option<String>,
897 event_type: Option<String>,
898 received_at: SystemTime,
899 raw_body: Bytes,
900}
901
902impl SignedWebhook {
903 #[must_use]
905 pub const fn provider(&self) -> &'static str {
906 self.provider.as_str()
907 }
908
909 #[must_use]
911 pub fn endpoint(&self) -> &str {
912 &self.endpoint
913 }
914
915 #[must_use]
917 pub fn delivery_id(&self) -> Option<&str> {
918 self.delivery_id.as_deref()
919 }
920
921 #[must_use]
923 pub fn event_type(&self) -> Option<&str> {
924 self.event_type.as_deref()
925 }
926
927 #[must_use]
929 pub const fn received_at(&self) -> SystemTime {
930 self.received_at
931 }
932
933 #[must_use]
935 pub fn raw_body(&self) -> &[u8] {
936 &self.raw_body
937 }
938
939 pub fn json<T: serde::de::DeserializeOwned>(&self) -> Result<T, serde_json::Error> {
946 serde_json::from_slice(&self.raw_body)
947 }
948}
949
950impl FromRequest<crate::AppState> for SignedWebhook {
951 type Rejection = crate::AutumnError;
952
953 async fn from_request(
954 req: axum::extract::Request,
955 state: &crate::AppState,
956 ) -> Result<Self, Self::Rejection> {
957 let (parts, body) = req.into_parts();
958 let path = parts.uri.path().to_owned();
959 let registry = state
960 .extension::<WebhookRegistry>()
961 .ok_or_else(|| WebhookVerifyError::RegistryMissing.into_autumn_error())?;
962 let endpoint = registry
963 .endpoint_for_path(&path)
964 .ok_or_else(|| WebhookVerifyError::EndpointMissing(path.clone()).into_autumn_error())?;
965 let body = axum::body::to_bytes(body, endpoint.config.max_body_bytes)
966 .await
967 .map_err(|err| {
968 crate::AutumnError::bad_request_msg(format!(
969 "webhook body could not be read: {err}"
970 ))
971 })?;
972 let received_at = SystemTime::now();
973 verify_request(®istry, &endpoint, &parts.headers, body, received_at)
974 .await
975 .map_err(WebhookVerifyError::into_autumn_error)
976 }
977}
978
979#[derive(Debug, Error)]
980enum WebhookVerifyError {
981 #[error("signed webhook registry is not installed")]
982 RegistryMissing,
983 #[error("no signed webhook endpoint is configured for path {0}")]
984 EndpointMissing(String),
985 #[error("missing required webhook header {0}")]
986 MissingHeader(String),
987 #[error("malformed webhook signature")]
988 MalformedSignature,
989 #[error("malformed webhook timestamp")]
990 MalformedTimestamp,
991 #[error("webhook timestamp is outside the accepted tolerance")]
992 StaleTimestamp,
993 #[error("webhook signature mismatch")]
994 SignatureMismatch,
995 #[error("missing webhook delivery ID")]
996 MissingDeliveryId,
997 #[error("duplicate webhook delivery")]
998 DuplicateDelivery,
999 #[error("webhook replay store unavailable: {0}")]
1000 ReplayStoreUnavailable(String),
1001}
1002
1003impl WebhookVerifyError {
1004 const fn status(&self) -> StatusCode {
1005 match self {
1006 Self::RegistryMissing | Self::EndpointMissing(_) => StatusCode::INTERNAL_SERVER_ERROR,
1007 Self::MissingHeader(_)
1008 | Self::MalformedSignature
1009 | Self::MalformedTimestamp
1010 | Self::MissingDeliveryId => StatusCode::BAD_REQUEST,
1011 Self::StaleTimestamp | Self::SignatureMismatch => StatusCode::UNAUTHORIZED,
1012 Self::DuplicateDelivery => StatusCode::CONFLICT,
1013 Self::ReplayStoreUnavailable(_) => StatusCode::SERVICE_UNAVAILABLE,
1014 }
1015 }
1016
1017 fn into_autumn_error(self) -> crate::AutumnError {
1018 crate::AutumnError::bad_request_msg(self.to_string()).with_status(self.status())
1019 }
1020}
1021
1022type ReplayStoreCell =
1023 std::sync::Arc<std::sync::Mutex<Option<(std::sync::Arc<dyn WebhookReplayStore>, Vec<String>)>>>;
1024
1025tokio::task_local! {
1026 pub static WEBHOOK_REPLAY_KEY: ReplayStoreCell;
1027}
1028
1029struct ReplayKeyGuard {
1030 cell: ReplayStoreCell,
1031 completed: bool,
1032}
1033
1034impl Drop for ReplayKeyGuard {
1035 fn drop(&mut self) {
1036 if !self.completed {
1037 let to_remove = match self.cell.lock() {
1038 Ok(mut guard) => guard.take(),
1039 Err(poisoned) => poisoned.into_inner().take(),
1040 };
1041 if let Some((store, keys)) = to_remove {
1042 tokio::spawn(async move {
1043 for key in keys {
1044 tracing::debug!(key = %key, "Releasing webhook replay key due to panic in handler");
1045 let _ = store.remove(&key).await;
1046 }
1047 });
1048 }
1049 }
1050 }
1051}
1052
1053pub async fn webhook_replay_cleanup_middleware(
1055 req: axum::extract::Request,
1056 next: axum::middleware::Next,
1057) -> axum::response::Response {
1058 let cell = std::sync::Arc::new(std::sync::Mutex::new(None));
1059 let cell_cloned = cell.clone();
1060
1061 let mut guard = ReplayKeyGuard {
1062 cell: cell_cloned.clone(),
1063 completed: false,
1064 };
1065
1066 let response = WEBHOOK_REPLAY_KEY
1067 .scope(cell, async move { next.run(req).await })
1068 .await;
1069
1070 guard.completed = true;
1071
1072 if response.status().is_server_error() {
1073 let to_remove = match cell_cloned.lock() {
1074 Ok(mut guard) => guard.take(),
1075 Err(poisoned) => poisoned.into_inner().take(),
1076 };
1077 if let Some((store, keys)) = to_remove {
1078 for key in keys {
1079 tracing::debug!(key = %key, "Releasing webhook replay key due to 5xx server error");
1080 let _ = store.remove(&key).await;
1081 }
1082 }
1083 }
1084
1085 response
1086}
1087
1088async fn verify_request(
1089 registry: &WebhookRegistry,
1090 endpoint: &ResolvedWebhookEndpoint,
1091 headers: &HeaderMap,
1092 body: Bytes,
1093 received_at: SystemTime,
1094) -> Result<SignedWebhook, WebhookVerifyError> {
1095 match endpoint.config.provider {
1096 WebhookProvider::Stripe => verify_stripe(endpoint, headers, &body, received_at)?,
1097 WebhookProvider::Github | WebhookProvider::Generic => {
1098 verify_body_hmac(endpoint, headers, &body, None, received_at)?;
1099 }
1100 WebhookProvider::Slack => verify_slack(endpoint, headers, &body, received_at)?,
1101 }
1102
1103 let json_body = serde_json::from_slice::<serde_json::Value>(&body).ok();
1104 let delivery_id = resolve_delivery_id(&endpoint.config, headers, json_body.as_ref());
1105 if endpoint.config.replay_protection {
1106 let delivery_id = delivery_id
1107 .as_deref()
1108 .ok_or(WebhookVerifyError::MissingDeliveryId)?;
1109
1110 let mut keys_to_check = vec![format!(
1111 "{}:{}:id:{delivery_id}",
1112 endpoint.config.provider.as_str(),
1113 endpoint.config.name
1114 )];
1115
1116 if matches!(
1117 endpoint.config.provider,
1118 WebhookProvider::Github | WebhookProvider::Generic
1119 ) {
1120 let sig_hdr = signature_header(endpoint);
1121 if let Some(sig_val) = headers.get(sig_hdr).and_then(|v| v.to_str().ok()) {
1122 use sha2::{Digest, Sha256};
1123 let mut hasher = Sha256::new();
1124 hasher.update(sig_val.as_bytes());
1125 let sig_hash = hex::encode(hasher.finalize());
1126 keys_to_check.push(format!(
1127 "{}:{}:sig:{sig_hash}",
1128 endpoint.config.provider.as_str(),
1129 endpoint.config.name
1130 ));
1131 }
1132 }
1133
1134 let window = Duration::from_secs(endpoint.config.replay_window_secs);
1135 let mut inserted_keys = Vec::new();
1136 for key in keys_to_check {
1137 match registry
1138 .replay_store
1139 .check_and_insert(&key, received_at, window)
1140 .await
1141 {
1142 Ok(true) => {
1143 inserted_keys.push(key);
1144 }
1145 Ok(false) => {
1146 for inserted in inserted_keys {
1148 let _ = registry.replay_store.remove(&inserted).await;
1149 }
1150 return Err(WebhookVerifyError::DuplicateDelivery);
1151 }
1152 Err(error) => {
1153 for inserted in inserted_keys {
1155 let _ = registry.replay_store.remove(&inserted).await;
1156 }
1157 return Err(WebhookVerifyError::ReplayStoreUnavailable(
1158 error.to_string(),
1159 ));
1160 }
1161 }
1162 }
1163
1164 let _ = WEBHOOK_REPLAY_KEY.try_with(|cell| {
1165 let guard = match cell.lock() {
1166 Ok(g) => Some(g),
1167 Err(poisoned) => Some(poisoned.into_inner()),
1168 };
1169 if let Some(mut guard) = guard {
1170 *guard = Some((std::sync::Arc::clone(®istry.replay_store), inserted_keys));
1171 }
1172 });
1173 }
1174
1175 Ok(SignedWebhook {
1176 provider: endpoint.config.provider,
1177 endpoint: endpoint.config.name.clone(),
1178 delivery_id,
1179 event_type: resolve_event_type(&endpoint.config, headers, json_body.as_ref()),
1180 received_at,
1181 raw_body: body,
1182 })
1183}
1184
1185fn verify_stripe(
1186 endpoint: &ResolvedWebhookEndpoint,
1187 headers: &HeaderMap,
1188 body: &[u8],
1189 received_at: SystemTime,
1190) -> Result<(), WebhookVerifyError> {
1191 let header = required_header(headers, signature_header(endpoint))?;
1192 let (timestamp, signatures) = parse_stripe_signature(header)?;
1193 verify_timestamp(
1194 timestamp,
1195 received_at,
1196 endpoint.config.timestamp_tolerance_secs,
1197 )?;
1198
1199 let timestamp = timestamp.to_string();
1200 let mut signed_payload = Vec::with_capacity(timestamp.len() + 1 + body.len());
1201 signed_payload.extend_from_slice(timestamp.as_bytes());
1202 signed_payload.push(b'.');
1203 signed_payload.extend_from_slice(body);
1204
1205 if signatures
1206 .iter()
1207 .any(|signature| endpoint.keys.verify(&signed_payload, signature))
1208 {
1209 Ok(())
1210 } else {
1211 Err(WebhookVerifyError::SignatureMismatch)
1212 }
1213}
1214
1215fn verify_slack(
1216 endpoint: &ResolvedWebhookEndpoint,
1217 headers: &HeaderMap,
1218 body: &[u8],
1219 received_at: SystemTime,
1220) -> Result<(), WebhookVerifyError> {
1221 let timestamp_header = endpoint
1222 .config
1223 .timestamp_header
1224 .as_deref()
1225 .ok_or(WebhookVerifyError::MalformedTimestamp)?;
1226 let timestamp = required_header(headers, timestamp_header)?
1227 .parse::<i64>()
1228 .map_err(|_| WebhookVerifyError::MalformedTimestamp)?;
1229 verify_timestamp(
1230 timestamp,
1231 received_at,
1232 endpoint.config.timestamp_tolerance_secs,
1233 )?;
1234
1235 let timestamp = timestamp.to_string();
1236 let mut signed_payload = Vec::with_capacity(3 + timestamp.len() + 1 + body.len());
1237 signed_payload.extend_from_slice(b"v0:");
1238 signed_payload.extend_from_slice(timestamp.as_bytes());
1239 signed_payload.push(b':');
1240 signed_payload.extend_from_slice(body);
1241 verify_body_hmac(
1242 endpoint,
1243 headers,
1244 &signed_payload,
1245 endpoint.config.signature_prefix.as_deref(),
1246 received_at,
1247 )
1248}
1249
1250fn verify_body_hmac(
1251 endpoint: &ResolvedWebhookEndpoint,
1252 headers: &HeaderMap,
1253 body_or_base: &[u8],
1254 explicit_prefix: Option<&str>,
1255 received_at: SystemTime,
1256) -> Result<(), WebhookVerifyError> {
1257 if let Some(timestamp_header) = endpoint.config.timestamp_header.as_deref()
1258 && endpoint.config.provider != WebhookProvider::Slack
1259 {
1260 let timestamp = required_header(headers, timestamp_header)?
1261 .parse::<i64>()
1262 .map_err(|_| WebhookVerifyError::MalformedTimestamp)?;
1263 verify_timestamp(
1264 timestamp,
1265 received_at,
1266 endpoint.config.timestamp_tolerance_secs,
1267 )?;
1268 }
1269
1270 let mut signature = required_header(headers, signature_header(endpoint))?;
1271 let prefix = explicit_prefix.or(endpoint.config.signature_prefix.as_deref());
1272 if let Some(prefix) = prefix {
1273 signature = signature
1274 .strip_prefix(prefix)
1275 .ok_or(WebhookVerifyError::MalformedSignature)?;
1276 }
1277
1278 if endpoint.keys.verify(body_or_base, signature) {
1279 Ok(())
1280 } else {
1281 Err(WebhookVerifyError::SignatureMismatch)
1282 }
1283}
1284
1285fn signature_header(endpoint: &ResolvedWebhookEndpoint) -> &str {
1286 endpoint
1287 .config
1288 .signature_header
1289 .as_deref()
1290 .unwrap_or("X-Webhook-Signature")
1291}
1292
1293fn required_header<'a>(headers: &'a HeaderMap, name: &str) -> Result<&'a str, WebhookVerifyError> {
1294 headers
1295 .get(name)
1296 .ok_or_else(|| WebhookVerifyError::MissingHeader(name.to_owned()))?
1297 .to_str()
1298 .map_err(|_| WebhookVerifyError::MalformedSignature)
1299}
1300
1301fn parse_stripe_signature(header: &str) -> Result<(i64, Vec<&str>), WebhookVerifyError> {
1302 let mut timestamp = None;
1303 let mut signatures = Vec::new();
1304
1305 for part in header.split(',') {
1306 let Some((key, value)) = part.split_once('=') else {
1307 return Err(WebhookVerifyError::MalformedSignature);
1308 };
1309 match key.trim() {
1310 "t" => {
1311 timestamp = Some(
1312 value
1313 .trim()
1314 .parse::<i64>()
1315 .map_err(|_| WebhookVerifyError::MalformedTimestamp)?,
1316 );
1317 }
1318 "v1" => signatures.push(value.trim()),
1319 _ => {}
1320 }
1321 }
1322
1323 let timestamp = timestamp.ok_or(WebhookVerifyError::MalformedTimestamp)?;
1324 if signatures.is_empty() {
1325 return Err(WebhookVerifyError::MalformedSignature);
1326 }
1327 Ok((timestamp, signatures))
1328}
1329
1330fn verify_timestamp(
1331 timestamp: i64,
1332 received_at: SystemTime,
1333 tolerance_secs: u64,
1334) -> Result<(), WebhookVerifyError> {
1335 let now = i64::try_from(
1336 received_at
1337 .duration_since(UNIX_EPOCH)
1338 .map_err(|_| WebhookVerifyError::MalformedTimestamp)?
1339 .as_secs(),
1340 )
1341 .map_err(|_| WebhookVerifyError::MalformedTimestamp)?;
1342 let skew = now.abs_diff(timestamp);
1343 if skew > tolerance_secs {
1344 return Err(WebhookVerifyError::StaleTimestamp);
1345 }
1346 Ok(())
1347}
1348
1349fn resolve_delivery_id(
1350 config: &WebhookEndpointConfig,
1351 headers: &HeaderMap,
1352 json_body: Option<&serde_json::Value>,
1353) -> Option<String> {
1354 let header = config
1355 .delivery_id_header
1356 .as_deref()
1357 .and_then(|header| optional_header(headers, header));
1358
1359 match config.provider {
1360 WebhookProvider::Slack => header
1361 .or_else(|| slack_delivery_id(json_body))
1362 .or_else(|| json_string_field(json_body, "id")),
1363 _ => header.or_else(|| json_string_field(json_body, "id")),
1364 }
1365}
1366
1367fn resolve_event_type(
1368 config: &WebhookEndpointConfig,
1369 headers: &HeaderMap,
1370 json_body: Option<&serde_json::Value>,
1371) -> Option<String> {
1372 config
1373 .event_type_header
1374 .as_deref()
1375 .and_then(|header| optional_header(headers, header))
1376 .or_else(|| json_string_field(json_body, "type"))
1377 .or_else(|| nested_json_string_field(json_body, "event", "type"))
1378}
1379
1380fn optional_header(headers: &HeaderMap, name: &str) -> Option<String> {
1381 headers
1382 .get(name)
1383 .and_then(|value| value.to_str().ok())
1384 .filter(|value| !value.trim().is_empty())
1385 .map(str::to_owned)
1386}
1387
1388fn slack_delivery_id(json_body: Option<&serde_json::Value>) -> Option<String> {
1389 json_string_field(json_body, "event_id").or_else(|| {
1390 let value = json_body?;
1391 if value.get("type").and_then(serde_json::Value::as_str) == Some("url_verification") {
1392 value
1393 .get("challenge")
1394 .and_then(serde_json::Value::as_str)
1395 .map(str::to_owned)
1396 } else {
1397 None
1398 }
1399 })
1400}
1401
1402fn json_string_field(value: Option<&serde_json::Value>, field: &str) -> Option<String> {
1403 let value = value?;
1404 value
1405 .get(field)
1406 .and_then(serde_json::Value::as_str)
1407 .map(str::to_owned)
1408}
1409
1410fn nested_json_string_field(
1411 value: Option<&serde_json::Value>,
1412 parent: &str,
1413 field: &str,
1414) -> Option<String> {
1415 let value = value?;
1416 value
1417 .get(parent)
1418 .and_then(|parent_value| parent_value.get(field))
1419 .and_then(serde_json::Value::as_str)
1420 .map(str::to_owned)
1421}
1422
1423const fn default_timestamp_tolerance_secs() -> u64 {
1424 DEFAULT_TIMESTAMP_TOLERANCE_SECS
1425}
1426
1427const fn default_replay_window_secs() -> u64 {
1428 DEFAULT_REPLAY_WINDOW_SECS
1429}
1430
1431const fn default_max_body_bytes() -> usize {
1432 DEFAULT_MAX_BODY_BYTES
1433}
1434
1435const fn default_true() -> bool {
1436 true
1437}
1438
1439fn default_replay_redis_key_prefix() -> String {
1440 "autumn:webhooks:replay".to_owned()
1441}
1442
1443fn validate_redis_replay_config(
1444 config: &WebhookReplayRedisConfig,
1445) -> Result<(), WebhookConfigError> {
1446 let url = config
1447 .url
1448 .as_deref()
1449 .filter(|url| !url.trim().is_empty())
1450 .ok_or(WebhookConfigError::RedisReplayMissingUrl)?;
1451
1452 #[cfg(feature = "redis")]
1453 {
1454 redis::Client::open(url)
1455 .map_err(|error| WebhookConfigError::RedisReplayInvalidUrl(error.to_string()))?;
1456 Ok(())
1457 }
1458
1459 #[cfg(not(feature = "redis"))]
1460 {
1461 let _ = url;
1462 Err(WebhookConfigError::RedisReplayFeatureDisabled)
1463 }
1464}
1465
1466fn replay_store_from_config(
1467 config: &WebhookReplayConfig,
1468) -> Result<Arc<dyn WebhookReplayStore>, WebhookConfigError> {
1469 match config.backend {
1470 WebhookReplayBackend::Memory => Ok(Arc::new(InMemoryWebhookReplayStore::default())),
1471 WebhookReplayBackend::Redis => {
1472 #[cfg(feature = "redis")]
1473 {
1474 Ok(Arc::new(RedisWebhookReplayStore::from_config(
1475 &config.redis,
1476 )?))
1477 }
1478
1479 #[cfg(not(feature = "redis"))]
1480 {
1481 Err(WebhookConfigError::RedisReplayFeatureDisabled)
1482 }
1483 }
1484 }
1485}
1486
1487pub(crate) fn install_registry_from_config(
1488 state: &crate::AppState,
1489 config: &WebhookConfig,
1490) -> Result<(), WebhookConfigError> {
1491 if config.endpoints.is_empty() {
1492 return Ok(());
1493 }
1494 let registry = WebhookRegistry::from_config(config)?;
1495 state.insert_extension(registry);
1496 Ok(())
1497}