reasonkit_web/stripe/
handler.rs

1//! Axum Handler and Router for Stripe Webhooks
2//!
3//! This module provides the HTTP layer for receiving and processing Stripe webhooks.
4//!
5//! # Key Features
6//!
7//! - **Raw Body Extraction**: Captures the exact bytes for signature verification
8//! - **Quick Acknowledgment**: Returns 200/202 immediately, processes asynchronously
9//! - **Proper Error Responses**: Returns appropriate HTTP status codes
10//!
11//! # Endpoint
12//!
13//! `POST /webhooks/stripe`
14//!
15//! # Headers
16//!
17//! Required:
18//! - `stripe-signature`: The Stripe webhook signature
19//! - `Content-Type: application/json`
20
21use std::sync::Arc;
22
23use axum::{
24    body::Bytes,
25    extract::State,
26    http::{HeaderMap, StatusCode},
27    response::{IntoResponse, Response},
28    routing::post,
29    Json, Router,
30};
31use serde::Serialize;
32
33use crate::stripe::config::StripeWebhookConfig;
34use crate::stripe::error::StripeWebhookError;
35use crate::stripe::events::StripeEvent;
36use crate::stripe::idempotency::{
37    IdempotencyMiddleware, IdempotencyStore, InMemoryIdempotencyStore,
38};
39use crate::stripe::processor::{EventProcessor, SubscriptionHandler};
40use crate::stripe::signature::SignatureVerifier;
41
42/// Shared state for the webhook handler
43pub struct StripeWebhookState<
44    H: SubscriptionHandler,
45    S: IdempotencyStore = InMemoryIdempotencyStore,
46> {
47    /// Signature verifier
48    pub verifier: SignatureVerifier,
49    /// Idempotency middleware
50    pub idempotency: IdempotencyMiddleware<S>,
51    /// Event processor
52    pub processor: EventProcessor<H, S>,
53    /// Configuration
54    pub config: StripeWebhookConfig,
55    /// Whether to log payloads (DISABLE in production)
56    pub log_payloads: bool,
57}
58
59impl<H: SubscriptionHandler> StripeWebhookState<H, InMemoryIdempotencyStore> {
60    /// Create new state with in-memory idempotency store
61    pub fn new(
62        config: StripeWebhookConfig,
63        handler: Arc<H>,
64    ) -> (
65        Self,
66        crate::stripe::processor::ProcessorHandle<H, InMemoryIdempotencyStore>,
67    ) {
68        let store = Arc::new(InMemoryIdempotencyStore::from_config(&config));
69        Self::with_store(config, handler, store)
70    }
71}
72
73impl<H: SubscriptionHandler, S: IdempotencyStore> StripeWebhookState<H, S> {
74    /// Create new state with custom idempotency store
75    pub fn with_store(
76        config: StripeWebhookConfig,
77        handler: Arc<H>,
78        store: Arc<S>,
79    ) -> (Self, crate::stripe::processor::ProcessorHandle<H, S>) {
80        let verifier = SignatureVerifier::new(&config);
81        let idempotency = IdempotencyMiddleware::new(store.clone());
82        let (processor, handle) = EventProcessor::new(handler, store, config.clone());
83        let log_payloads = config.log_payloads;
84
85        let state = Self {
86            verifier,
87            idempotency,
88            processor,
89            config,
90            log_payloads,
91        };
92
93        (state, handle)
94    }
95}
96
97/// Success response for webhook acknowledgment
98#[derive(Debug, Clone, Serialize)]
99pub struct WebhookResponse {
100    pub received: bool,
101    #[serde(skip_serializing_if = "Option::is_none")]
102    pub event_id: Option<String>,
103    #[serde(skip_serializing_if = "Option::is_none")]
104    pub message: Option<String>,
105}
106
107impl WebhookResponse {
108    fn success(event_id: &str) -> Self {
109        Self {
110            received: true,
111            event_id: Some(event_id.to_string()),
112            message: None,
113        }
114    }
115
116    fn already_processed(event_id: &str) -> Self {
117        Self {
118            received: true,
119            event_id: Some(event_id.to_string()),
120            message: Some("Event already processed".to_string()),
121        }
122    }
123}
124
125/// Create the Stripe webhook router
126///
127/// # Example
128///
129/// ```rust,ignore
130/// use reasonkit_web::stripe::{StripeWebhookConfig, StripeWebhookState, stripe_webhook_router};
131/// use std::sync::Arc;
132///
133/// let config = StripeWebhookConfig::from_env()?;
134/// let handler = Arc::new(MyHandler);
135/// let (state, processor_handle) = StripeWebhookState::new(config, handler);
136///
137/// // Start the background processor
138/// tokio::spawn(async move {
139///     processor_handle.run().await;
140/// });
141///
142/// // Create the router
143/// let app = stripe_webhook_router(Arc::new(state));
144/// ```
145pub fn stripe_webhook_router<H: SubscriptionHandler, S: IdempotencyStore>(
146    state: Arc<StripeWebhookState<H, S>>,
147) -> Router {
148    Router::new()
149        .route("/webhooks/stripe", post(stripe_webhook_handler))
150        .with_state(state)
151}
152
153/// Main webhook handler
154///
155/// Processes incoming Stripe webhook requests:
156///
157/// 1. Extracts the `stripe-signature` header
158/// 2. Verifies the signature using HMAC-SHA256
159/// 3. Checks idempotency (have we seen this event?)
160/// 4. Queues the event for async processing
161/// 5. Returns 200 (new event) or 202 (already processed)
162pub async fn stripe_webhook_handler<H: SubscriptionHandler, S: IdempotencyStore>(
163    State(state): State<Arc<StripeWebhookState<H, S>>>,
164    headers: HeaderMap,
165    body: Bytes,
166) -> Response {
167    // Extract signature header
168    let signature = match headers.get("stripe-signature") {
169        Some(sig) => match sig.to_str() {
170            Ok(s) => s,
171            Err(_) => {
172                return StripeWebhookError::InvalidSignatureFormat(
173                    "Invalid header encoding".to_string(),
174                )
175                .into_response();
176            }
177        },
178        None => {
179            return StripeWebhookError::MissingSignature.into_response();
180        }
181    };
182
183    // Verify signature
184    if let Err(e) = state.verifier.verify(signature, &body) {
185        tracing::warn!(error = %e, "Stripe webhook signature verification failed");
186        return e.into_response();
187    }
188
189    // Parse the event
190    let event = match StripeEvent::from_bytes(&body) {
191        Ok(e) => e,
192        Err(e) => {
193            tracing::warn!(error = %e, "Failed to parse Stripe webhook payload");
194            return e.into_response();
195        }
196    };
197
198    // Log the event (if enabled - DISABLE in production for PII)
199    if state.log_payloads {
200        tracing::debug!(
201            event_id = %event.id,
202            event_type = %event.event_type,
203            livemode = event.livemode,
204            "Received Stripe webhook"
205        );
206    } else {
207        tracing::info!(
208            event_id = %event.id,
209            event_type = %event.event_type,
210            "Received Stripe webhook"
211        );
212    }
213
214    // Check idempotency
215    match state.idempotency.should_process(&event.id).await {
216        Ok(true) => {
217            // New event - queue for processing and return 200
218            if let Err(e) = state.processor.queue_event(event.clone()).await {
219                tracing::error!(
220                    event_id = %event.id,
221                    error = %e,
222                    "Failed to queue event for processing"
223                );
224                // Still return 200 - we recorded it, will retry on next delivery
225            }
226
227            (StatusCode::OK, Json(WebhookResponse::success(&event.id))).into_response()
228        }
229        Ok(false) => {
230            // Already in progress (should not happen with our implementation)
231            (
232                StatusCode::ACCEPTED,
233                Json(WebhookResponse::already_processed(&event.id)),
234            )
235                .into_response()
236        }
237        Err(StripeWebhookError::AlreadyProcessed { event_id }) => {
238            // Already processed - return 202 to acknowledge without reprocessing
239            tracing::debug!(
240                event_id = %event_id,
241                "Stripe webhook already processed (idempotent)"
242            );
243            (
244                StatusCode::ACCEPTED,
245                Json(WebhookResponse::already_processed(&event_id)),
246            )
247                .into_response()
248        }
249        Err(e) => {
250            // Other error
251            tracing::error!(
252                event_id = %event.id,
253                error = %e,
254                "Idempotency check failed"
255            );
256            e.into_response()
257        }
258    }
259}
260
261/// Health check endpoint for the webhook handler
262pub async fn webhook_health() -> impl IntoResponse {
263    (
264        StatusCode::OK,
265        Json(serde_json::json!({
266            "status": "healthy",
267            "service": "stripe-webhook-handler"
268        })),
269    )
270}
271
272#[cfg(test)]
273mod tests {
274    use super::*;
275    use crate::stripe::processor::NoOpHandler;
276    use axum::body::Body;
277    use axum::http::{Method, Request};
278    use std::time::{SystemTime, UNIX_EPOCH};
279    use tower::ServiceExt;
280
281    fn current_timestamp() -> i64 {
282        SystemTime::now()
283            .duration_since(UNIX_EPOCH)
284            .unwrap()
285            .as_secs() as i64
286    }
287
288    fn create_test_state() -> (
289        Arc<StripeWebhookState<NoOpHandler, InMemoryIdempotencyStore>>,
290        crate::stripe::processor::ProcessorHandle<NoOpHandler, InMemoryIdempotencyStore>,
291    ) {
292        let config = StripeWebhookConfig::test_config();
293        let handler = Arc::new(NoOpHandler);
294        let (state, handle) = StripeWebhookState::new(config, handler);
295        (Arc::new(state), handle)
296    }
297
298    fn create_valid_webhook_request(
299        state: &StripeWebhookState<NoOpHandler, InMemoryIdempotencyStore>,
300    ) -> (String, Vec<u8>) {
301        let payload = r#"{
302            "id": "evt_test_handler_123",
303            "type": "customer.subscription.created",
304            "created": 1614556800,
305            "livemode": false,
306            "pending_webhooks": 1,
307            "data": {
308                "object": {
309                    "id": "sub_test_123",
310                    "customer": "cus_test_123",
311                    "status": "active",
312                    "current_period_start": 1614556800,
313                    "current_period_end": 1617235200,
314                    "cancel_at_period_end": false,
315                    "items": {
316                        "data": [{
317                            "id": "si_test_123",
318                            "price": {
319                                "id": "price_test_123",
320                                "product": "prod_test_123",
321                                "unit_amount": 2000,
322                                "currency": "usd",
323                                "recurring": {
324                                    "interval": "month",
325                                    "interval_count": 1
326                                }
327                            },
328                            "quantity": 1
329                        }]
330                    },
331                    "metadata": {},
332                    "livemode": false
333                }
334            }
335        }"#;
336
337        let timestamp = current_timestamp();
338        let signature = state
339            .verifier
340            .generate_test_signature_public(payload.as_bytes(), timestamp);
341        let header = format!("t={},v1={}", timestamp, signature);
342
343        (header, payload.as_bytes().to_vec())
344    }
345
346    #[tokio::test]
347    async fn test_webhook_handler_success() {
348        let (state, _handle) = create_test_state();
349        let app = stripe_webhook_router(state.clone());
350
351        let (signature, payload) = create_valid_webhook_request(&state);
352
353        let request = Request::builder()
354            .method(Method::POST)
355            .uri("/webhooks/stripe")
356            .header("content-type", "application/json")
357            .header("stripe-signature", signature)
358            .body(Body::from(payload))
359            .unwrap();
360
361        let response = app.oneshot(request).await.unwrap();
362        assert_eq!(response.status(), StatusCode::OK);
363    }
364
365    #[tokio::test]
366    async fn test_webhook_handler_missing_signature() {
367        let (state, _handle) = create_test_state();
368        let app = stripe_webhook_router(state);
369
370        let request = Request::builder()
371            .method(Method::POST)
372            .uri("/webhooks/stripe")
373            .header("content-type", "application/json")
374            // No stripe-signature header
375            .body(Body::from(r#"{"id": "test"}"#))
376            .unwrap();
377
378        let response = app.oneshot(request).await.unwrap();
379        assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
380    }
381
382    #[tokio::test]
383    async fn test_webhook_handler_invalid_signature() {
384        let (state, _handle) = create_test_state();
385        let app = stripe_webhook_router(state);
386
387        let timestamp = current_timestamp();
388        let invalid_signature = format!(
389            "t={},v1=0000000000000000000000000000000000000000000000000000000000000000",
390            timestamp
391        );
392
393        let request = Request::builder()
394            .method(Method::POST)
395            .uri("/webhooks/stripe")
396            .header("content-type", "application/json")
397            .header("stripe-signature", invalid_signature)
398            .body(Body::from(r#"{"id": "test"}"#))
399            .unwrap();
400
401        let response = app.oneshot(request).await.unwrap();
402        assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
403    }
404
405    #[tokio::test]
406    async fn test_webhook_handler_invalid_payload() {
407        let (state, _handle) = create_test_state();
408        let app = stripe_webhook_router(state.clone());
409
410        let payload = b"not valid json";
411        let timestamp = current_timestamp();
412        let signature = state
413            .verifier
414            .generate_test_signature_public(payload, timestamp);
415        let header = format!("t={},v1={}", timestamp, signature);
416
417        let request = Request::builder()
418            .method(Method::POST)
419            .uri("/webhooks/stripe")
420            .header("content-type", "application/json")
421            .header("stripe-signature", header)
422            .body(Body::from(payload.to_vec()))
423            .unwrap();
424
425        let response = app.oneshot(request).await.unwrap();
426        assert_eq!(response.status(), StatusCode::BAD_REQUEST);
427    }
428
429    #[tokio::test]
430    async fn test_webhook_handler_idempotency() {
431        let (state, _handle) = create_test_state();
432
433        // Use unique event IDs for each test run
434        let unique_id = format!("evt_idempotency_test_{}", current_timestamp());
435        let payload = format!(
436            r#"{{
437            "id": "{}",
438            "type": "customer.subscription.created",
439            "created": 1614556800,
440            "livemode": false,
441            "pending_webhooks": 1,
442            "data": {{
443                "object": {{
444                    "id": "sub_test_123",
445                    "customer": "cus_test_123",
446                    "status": "active",
447                    "current_period_start": 1614556800,
448                    "current_period_end": 1617235200,
449                    "cancel_at_period_end": false,
450                    "items": {{
451                        "data": [{{
452                            "id": "si_test_123",
453                            "price": {{
454                                "id": "price_test_123",
455                                "product": "prod_test_123",
456                                "unit_amount": 2000,
457                                "currency": "usd",
458                                "recurring": {{
459                                    "interval": "month",
460                                    "interval_count": 1
461                                }}
462                            }},
463                            "quantity": 1
464                        }}]
465                    }},
466                    "metadata": {{}},
467                    "livemode": false
468                }}
469            }}
470        }}"#,
471            unique_id
472        );
473
474        let timestamp = current_timestamp();
475        let signature = state
476            .verifier
477            .generate_test_signature_public(payload.as_bytes(), timestamp);
478        let header = format!("t={},v1={}", timestamp, signature);
479
480        // First request - should return 200
481        let app1 = stripe_webhook_router(state.clone());
482        let request1 = Request::builder()
483            .method(Method::POST)
484            .uri("/webhooks/stripe")
485            .header("content-type", "application/json")
486            .header("stripe-signature", header.clone())
487            .body(Body::from(payload.clone()))
488            .unwrap();
489
490        let response1 = app1.oneshot(request1).await.unwrap();
491        assert_eq!(response1.status(), StatusCode::OK);
492
493        // Second request with same event - should return 202
494        let app2 = stripe_webhook_router(state.clone());
495        let request2 = Request::builder()
496            .method(Method::POST)
497            .uri("/webhooks/stripe")
498            .header("content-type", "application/json")
499            .header("stripe-signature", header)
500            .body(Body::from(payload))
501            .unwrap();
502
503        let response2 = app2.oneshot(request2).await.unwrap();
504        assert_eq!(response2.status(), StatusCode::ACCEPTED);
505    }
506}