1use std::sync::Arc;
22
23use axum::{
24 body::Bytes,
25 extract::State,
26 http::{HeaderMap, StatusCode},
27 response::{IntoResponse, Response},
28 routing::post,
29 Json, Router,
30};
31use serde::Serialize;
32
33use crate::stripe::config::StripeWebhookConfig;
34use crate::stripe::error::StripeWebhookError;
35use crate::stripe::events::StripeEvent;
36use crate::stripe::idempotency::{
37 IdempotencyMiddleware, IdempotencyStore, InMemoryIdempotencyStore,
38};
39use crate::stripe::processor::{EventProcessor, SubscriptionHandler};
40use crate::stripe::signature::SignatureVerifier;
41
42pub struct StripeWebhookState<
44 H: SubscriptionHandler,
45 S: IdempotencyStore = InMemoryIdempotencyStore,
46> {
47 pub verifier: SignatureVerifier,
49 pub idempotency: IdempotencyMiddleware<S>,
51 pub processor: EventProcessor<H, S>,
53 pub config: StripeWebhookConfig,
55 pub log_payloads: bool,
57}
58
59impl<H: SubscriptionHandler> StripeWebhookState<H, InMemoryIdempotencyStore> {
60 pub fn new(
62 config: StripeWebhookConfig,
63 handler: Arc<H>,
64 ) -> (
65 Self,
66 crate::stripe::processor::ProcessorHandle<H, InMemoryIdempotencyStore>,
67 ) {
68 let store = Arc::new(InMemoryIdempotencyStore::from_config(&config));
69 Self::with_store(config, handler, store)
70 }
71}
72
73impl<H: SubscriptionHandler, S: IdempotencyStore> StripeWebhookState<H, S> {
74 pub fn with_store(
76 config: StripeWebhookConfig,
77 handler: Arc<H>,
78 store: Arc<S>,
79 ) -> (Self, crate::stripe::processor::ProcessorHandle<H, S>) {
80 let verifier = SignatureVerifier::new(&config);
81 let idempotency = IdempotencyMiddleware::new(store.clone());
82 let (processor, handle) = EventProcessor::new(handler, store, config.clone());
83 let log_payloads = config.log_payloads;
84
85 let state = Self {
86 verifier,
87 idempotency,
88 processor,
89 config,
90 log_payloads,
91 };
92
93 (state, handle)
94 }
95}
96
97#[derive(Debug, Clone, Serialize)]
99pub struct WebhookResponse {
100 pub received: bool,
101 #[serde(skip_serializing_if = "Option::is_none")]
102 pub event_id: Option<String>,
103 #[serde(skip_serializing_if = "Option::is_none")]
104 pub message: Option<String>,
105}
106
107impl WebhookResponse {
108 fn success(event_id: &str) -> Self {
109 Self {
110 received: true,
111 event_id: Some(event_id.to_string()),
112 message: None,
113 }
114 }
115
116 fn already_processed(event_id: &str) -> Self {
117 Self {
118 received: true,
119 event_id: Some(event_id.to_string()),
120 message: Some("Event already processed".to_string()),
121 }
122 }
123}
124
125pub fn stripe_webhook_router<H: SubscriptionHandler, S: IdempotencyStore>(
146 state: Arc<StripeWebhookState<H, S>>,
147) -> Router {
148 Router::new()
149 .route("/webhooks/stripe", post(stripe_webhook_handler))
150 .with_state(state)
151}
152
153pub async fn stripe_webhook_handler<H: SubscriptionHandler, S: IdempotencyStore>(
163 State(state): State<Arc<StripeWebhookState<H, S>>>,
164 headers: HeaderMap,
165 body: Bytes,
166) -> Response {
167 let signature = match headers.get("stripe-signature") {
169 Some(sig) => match sig.to_str() {
170 Ok(s) => s,
171 Err(_) => {
172 return StripeWebhookError::InvalidSignatureFormat(
173 "Invalid header encoding".to_string(),
174 )
175 .into_response();
176 }
177 },
178 None => {
179 return StripeWebhookError::MissingSignature.into_response();
180 }
181 };
182
183 if let Err(e) = state.verifier.verify(signature, &body) {
185 tracing::warn!(error = %e, "Stripe webhook signature verification failed");
186 return e.into_response();
187 }
188
189 let event = match StripeEvent::from_bytes(&body) {
191 Ok(e) => e,
192 Err(e) => {
193 tracing::warn!(error = %e, "Failed to parse Stripe webhook payload");
194 return e.into_response();
195 }
196 };
197
198 if state.log_payloads {
200 tracing::debug!(
201 event_id = %event.id,
202 event_type = %event.event_type,
203 livemode = event.livemode,
204 "Received Stripe webhook"
205 );
206 } else {
207 tracing::info!(
208 event_id = %event.id,
209 event_type = %event.event_type,
210 "Received Stripe webhook"
211 );
212 }
213
214 match state.idempotency.should_process(&event.id).await {
216 Ok(true) => {
217 if let Err(e) = state.processor.queue_event(event.clone()).await {
219 tracing::error!(
220 event_id = %event.id,
221 error = %e,
222 "Failed to queue event for processing"
223 );
224 }
226
227 (StatusCode::OK, Json(WebhookResponse::success(&event.id))).into_response()
228 }
229 Ok(false) => {
230 (
232 StatusCode::ACCEPTED,
233 Json(WebhookResponse::already_processed(&event.id)),
234 )
235 .into_response()
236 }
237 Err(StripeWebhookError::AlreadyProcessed { event_id }) => {
238 tracing::debug!(
240 event_id = %event_id,
241 "Stripe webhook already processed (idempotent)"
242 );
243 (
244 StatusCode::ACCEPTED,
245 Json(WebhookResponse::already_processed(&event_id)),
246 )
247 .into_response()
248 }
249 Err(e) => {
250 tracing::error!(
252 event_id = %event.id,
253 error = %e,
254 "Idempotency check failed"
255 );
256 e.into_response()
257 }
258 }
259}
260
261pub async fn webhook_health() -> impl IntoResponse {
263 (
264 StatusCode::OK,
265 Json(serde_json::json!({
266 "status": "healthy",
267 "service": "stripe-webhook-handler"
268 })),
269 )
270}
271
#[cfg(test)]
mod tests {
    use super::*;
    use crate::stripe::processor::NoOpHandler;
    use axum::body::Body;
    use axum::http::{Method, Request};
    use std::time::{SystemTime, UNIX_EPOCH};
    use tower::ServiceExt;

    /// Concrete state type shared by every test in this module.
    type TestState = StripeWebhookState<NoOpHandler, InMemoryIdempotencyStore>;

    /// Processor handle returned alongside [`TestState`].
    type TestHandle =
        crate::stripe::processor::ProcessorHandle<NoOpHandler, InMemoryIdempotencyStore>;

    /// Current Unix time in whole seconds.
    fn current_timestamp() -> i64 {
        let since_epoch = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("system clock is set before the Unix epoch");
        // Seconds since the epoch are far below `i64::MAX`, so the cast is lossless.
        since_epoch.as_secs() as i64
    }

    /// Builds state from the test configuration with a no-op handler.
    ///
    /// Tests bind the returned handle to `_handle` so it stays alive for the
    /// duration of the test - presumably dropping it would stop the
    /// processor; confirm against `ProcessorHandle`.
    fn create_test_state() -> (Arc<TestState>, TestHandle) {
        let config = StripeWebhookConfig::test_config();
        let (state, handle) = StripeWebhookState::new(config, Arc::new(NoOpHandler));
        (Arc::new(state), handle)
    }

    /// JSON for a `customer.subscription.created` event with the given id.
    ///
    /// Single source of the fixture so the individual tests do not each
    /// carry their own copy.
    fn subscription_created_payload(event_id: &str) -> String {
        format!(
            r#"{{
                "id": "{}",
                "type": "customer.subscription.created",
                "created": 1614556800,
                "livemode": false,
                "pending_webhooks": 1,
                "data": {{
                    "object": {{
                        "id": "sub_test_123",
                        "customer": "cus_test_123",
                        "status": "active",
                        "current_period_start": 1614556800,
                        "current_period_end": 1617235200,
                        "cancel_at_period_end": false,
                        "items": {{
                            "data": [{{
                                "id": "si_test_123",
                                "price": {{
                                    "id": "price_test_123",
                                    "product": "prod_test_123",
                                    "unit_amount": 2000,
                                    "currency": "usd",
                                    "recurring": {{
                                        "interval": "month",
                                        "interval_count": 1
                                    }}
                                }},
                                "quantity": 1
                            }}]
                        }},
                        "metadata": {{}},
                        "livemode": false
                    }}
                }}
            }}"#,
            event_id
        )
    }

    /// Produces a `t=<timestamp>,v1=<signature>` header value for `payload`,
    /// signed with the state's verifier at the current time.
    fn signature_header(state: &TestState, payload: &[u8]) -> String {
        let timestamp = current_timestamp();
        let signature = state
            .verifier
            .generate_test_signature_public(payload, timestamp);
        format!("t={},v1={}", timestamp, signature)
    }

    /// Returns a matching (signature header, body bytes) pair for a valid event.
    fn create_valid_webhook_request(state: &TestState) -> (String, Vec<u8>) {
        let payload = subscription_created_payload("evt_test_handler_123").into_bytes();
        let header = signature_header(state, &payload);
        (header, payload)
    }

    /// Builds a JSON `POST /webhooks/stripe` request, attaching the
    /// `stripe-signature` header only when one is supplied.
    fn webhook_request(signature: Option<&str>, body: impl Into<Body>) -> Request<Body> {
        let mut builder = Request::builder()
            .method(Method::POST)
            .uri("/webhooks/stripe")
            .header("content-type", "application/json");
        if let Some(signature) = signature {
            builder = builder.header("stripe-signature", signature);
        }
        builder.body(body.into()).unwrap()
    }

    /// Sends `request` through a fresh router over `state` and returns the status.
    async fn send(state: &Arc<TestState>, request: Request<Body>) -> StatusCode {
        stripe_webhook_router(Arc::clone(state))
            .oneshot(request)
            .await
            .unwrap()
            .status()
    }

    #[tokio::test]
    async fn test_webhook_handler_success() {
        let (state, _handle) = create_test_state();
        let (signature, payload) = create_valid_webhook_request(&state);

        let request = webhook_request(Some(&signature), payload);

        assert_eq!(send(&state, request).await, StatusCode::OK);
    }

    #[tokio::test]
    async fn test_webhook_handler_missing_signature() {
        let (state, _handle) = create_test_state();

        let request = webhook_request(None, r#"{"id": "test"}"#);

        assert_eq!(send(&state, request).await, StatusCode::UNAUTHORIZED);
    }

    #[tokio::test]
    async fn test_webhook_handler_invalid_signature() {
        let (state, _handle) = create_test_state();

        let timestamp = current_timestamp();
        let invalid_signature = format!(
            "t={},v1=0000000000000000000000000000000000000000000000000000000000000000",
            timestamp
        );

        let request = webhook_request(Some(&invalid_signature), r#"{"id": "test"}"#);

        assert_eq!(send(&state, request).await, StatusCode::UNAUTHORIZED);
    }

    #[tokio::test]
    async fn test_webhook_handler_invalid_payload() {
        let (state, _handle) = create_test_state();

        // Correctly signed, but the body is not JSON.
        let payload = b"not valid json";
        let header = signature_header(&state, payload);

        let request = webhook_request(Some(&header), payload.to_vec());

        assert_eq!(send(&state, request).await, StatusCode::BAD_REQUEST);
    }

    #[tokio::test]
    async fn test_webhook_handler_idempotency() {
        let (state, _handle) = create_test_state();

        let unique_id = format!("evt_idempotency_test_{}", current_timestamp());
        let payload = subscription_created_payload(&unique_id);
        let header = signature_header(&state, payload.as_bytes());

        // First delivery is accepted for processing.
        let first = webhook_request(Some(&header), payload.clone());
        assert_eq!(send(&state, first).await, StatusCode::OK);

        // Replaying the identical delivery is reported as already processed.
        let second = webhook_request(Some(&header), payload);
        assert_eq!(send(&state, second).await, StatusCode::ACCEPTED);
    }
}