reasonkit_web/stripe/
processor.rs

1//! Async Event Processing
2//!
3//! Handles background processing of webhook events with retry logic.
4//! The key design principle: acknowledge the webhook quickly (return 200),
5//! then process the event asynchronously.
6//!
7//! # Architecture
8//!
9//! ```text
10//! Webhook Received
11//!       |
12//!       v
13//! [Verify Signature]
14//!       |
15//!       v
16//! [Check Idempotency] --> Already processed? --> Return 202
17//!       |
18//!       v
19//! [Spawn Background Task] --> Return 200 immediately
20//!       |
21//!       v
22//! [Process Event with Retries]
23//!       |
24//!       v
25//! [Update Idempotency Store]
26//! ```
27
28use std::sync::Arc;
29
30use tokio::sync::mpsc;
31use tokio::time::timeout;
32
33use crate::stripe::config::StripeWebhookConfig;
34use crate::stripe::error::StripeWebhookResult;
35use crate::stripe::events::{
36    CustomerEvent, InvoiceEvent, StripeEvent, StripeEventType, SubscriptionEvent,
37};
38use crate::stripe::idempotency::IdempotencyStore;
39
40/// Handler trait for subscription events
41#[async_trait::async_trait]
42pub trait SubscriptionHandler: Send + Sync + 'static {
43    /// Handle new subscription created
44    async fn on_subscription_created(&self, event: &SubscriptionEvent) -> anyhow::Result<()>;
45
46    /// Handle subscription updated (plan change, status change, etc.)
47    async fn on_subscription_updated(&self, event: &SubscriptionEvent) -> anyhow::Result<()>;
48
49    /// Handle subscription deleted/canceled
50    async fn on_subscription_deleted(&self, event: &SubscriptionEvent) -> anyhow::Result<()>;
51
52    /// Handle invoice payment succeeded
53    async fn on_payment_succeeded(&self, event: &InvoiceEvent) -> anyhow::Result<()>;
54
55    /// Handle invoice payment failed
56    async fn on_payment_failed(&self, event: &InvoiceEvent) -> anyhow::Result<()>;
57
58    /// Handle new customer created
59    async fn on_customer_created(&self, event: &CustomerEvent) -> anyhow::Result<()>;
60}
61
62/// Event processor that handles webhook events asynchronously
63pub struct EventProcessor<H: SubscriptionHandler, S: IdempotencyStore> {
64    handler: Arc<H>,
65    idempotency_store: Arc<S>,
66    config: StripeWebhookConfig,
67    /// Channel for background task processing
68    task_sender: mpsc::Sender<ProcessingTask>,
69}
70
71/// A processing task sent to background workers
72struct ProcessingTask {
73    event: StripeEvent,
74    /// Retry attempt counter (reserved for future retry logic)
75    #[allow(dead_code)]
76    attempt: u32,
77}
78
79impl<H: SubscriptionHandler, S: IdempotencyStore> EventProcessor<H, S> {
80    /// Create a new event processor
81    pub fn new(
82        handler: Arc<H>,
83        idempotency_store: Arc<S>,
84        config: StripeWebhookConfig,
85    ) -> (Self, ProcessorHandle<H, S>) {
86        let (tx, rx) = mpsc::channel(1000);
87
88        let processor = Self {
89            handler: handler.clone(),
90            idempotency_store: idempotency_store.clone(),
91            config: config.clone(),
92            task_sender: tx,
93        };
94
95        let handle = ProcessorHandle {
96            handler,
97            idempotency_store,
98            config,
99            task_receiver: rx,
100        };
101
102        (processor, handle)
103    }
104
105    /// Queue an event for async processing
106    ///
107    /// This returns immediately after queuing. The actual processing
108    /// happens in a background task.
109    pub async fn queue_event(&self, event: StripeEvent) -> StripeWebhookResult<()> {
110        let task = ProcessingTask { event, attempt: 0 };
111
112        self.task_sender.send(task).await.map_err(|e| {
113            crate::stripe::error::StripeWebhookError::InternalError(format!(
114                "Failed to queue event: {}",
115                e
116            ))
117        })?;
118
119        Ok(())
120    }
121
122    /// Process an event synchronously (for testing or immediate processing)
123    pub async fn process_event_sync(&self, event: &StripeEvent) -> StripeWebhookResult<()> {
124        process_single_event(&self.handler, &self.idempotency_store, event, &self.config).await
125    }
126}
127
128/// Handle for running the background processor
129pub struct ProcessorHandle<H: SubscriptionHandler, S: IdempotencyStore> {
130    handler: Arc<H>,
131    idempotency_store: Arc<S>,
132    config: StripeWebhookConfig,
133    task_receiver: mpsc::Receiver<ProcessingTask>,
134}
135
136impl<H: SubscriptionHandler, S: IdempotencyStore> ProcessorHandle<H, S> {
137    /// Run the background processor
138    ///
139    /// This should be spawned as a tokio task:
140    ///
141    /// ```rust,ignore
142    /// tokio::spawn(async move {
143    ///     handle.run().await;
144    /// });
145    /// ```
146    pub async fn run(mut self) {
147        tracing::info!("Starting Stripe webhook event processor");
148
149        while let Some(task) = self.task_receiver.recv().await {
150            let handler = self.handler.clone();
151            let store = self.idempotency_store.clone();
152            let config = self.config.clone();
153
154            // Spawn each event processing in its own task
155            tokio::spawn(async move {
156                process_with_retry(handler, store, task.event, &config).await;
157            });
158        }
159
160        tracing::info!("Stripe webhook event processor shutting down");
161    }
162}
163
164/// Process a single event with retry logic
165async fn process_with_retry<H: SubscriptionHandler, S: IdempotencyStore>(
166    handler: Arc<H>,
167    store: Arc<S>,
168    event: StripeEvent,
169    config: &StripeWebhookConfig,
170) {
171    let event_id = event.id.clone();
172    let event_type = event.event_type.clone();
173
174    for attempt in 0..=config.max_retries {
175        if attempt > 0 {
176            let delay = config.retry_delay(attempt - 1);
177            tracing::info!(
178                event_id = %event_id,
179                event_type = %event_type,
180                attempt,
181                delay_ms = delay.as_millis(),
182                "Retrying event processing"
183            );
184            tokio::time::sleep(delay).await;
185        }
186
187        match process_single_event(&handler, &store, &event, config).await {
188            Ok(()) => {
189                tracing::info!(
190                    event_id = %event_id,
191                    event_type = %event_type,
192                    attempts = attempt + 1,
193                    "Event processed successfully"
194                );
195                return;
196            }
197            Err(e) => {
198                tracing::warn!(
199                    event_id = %event_id,
200                    event_type = %event_type,
201                    attempt = attempt + 1,
202                    max_retries = config.max_retries,
203                    error = %e,
204                    "Event processing failed"
205                );
206
207                if attempt == config.max_retries {
208                    // Final failure - mark as failed in idempotency store
209                    if let Err(mark_err) = store.mark_failed(&event_id, &e.to_string()).await {
210                        tracing::error!(
211                            event_id = %event_id,
212                            error = %mark_err,
213                            "Failed to mark event as failed in idempotency store"
214                        );
215                    }
216                }
217            }
218        }
219    }
220}
221
222/// Process a single event
223async fn process_single_event<H: SubscriptionHandler, S: IdempotencyStore>(
224    handler: &Arc<H>,
225    store: &Arc<S>,
226    event: &StripeEvent,
227    config: &StripeWebhookConfig,
228) -> StripeWebhookResult<()> {
229    let event_type = event.typed_event_type();
230
231    // Apply timeout to prevent hanging
232    let result = timeout(config.processing_timeout, async {
233        match event_type {
234            StripeEventType::SubscriptionCreated => {
235                let sub_event = event.as_subscription()?;
236                handler
237                    .on_subscription_created(&sub_event)
238                    .await
239                    .map_err(|e| {
240                        crate::stripe::error::StripeWebhookError::ProcessingFailed(e.to_string())
241                    })
242            }
243            StripeEventType::SubscriptionUpdated => {
244                let sub_event = event.as_subscription()?;
245                handler
246                    .on_subscription_updated(&sub_event)
247                    .await
248                    .map_err(|e| {
249                        crate::stripe::error::StripeWebhookError::ProcessingFailed(e.to_string())
250                    })
251            }
252            StripeEventType::SubscriptionDeleted => {
253                let sub_event = event.as_subscription()?;
254                handler
255                    .on_subscription_deleted(&sub_event)
256                    .await
257                    .map_err(|e| {
258                        crate::stripe::error::StripeWebhookError::ProcessingFailed(e.to_string())
259                    })
260            }
261            StripeEventType::InvoicePaymentSucceeded => {
262                let invoice_event = event.as_invoice()?;
263                handler
264                    .on_payment_succeeded(&invoice_event)
265                    .await
266                    .map_err(|e| {
267                        crate::stripe::error::StripeWebhookError::ProcessingFailed(e.to_string())
268                    })
269            }
270            StripeEventType::InvoicePaymentFailed => {
271                let invoice_event = event.as_invoice()?;
272                handler
273                    .on_payment_failed(&invoice_event)
274                    .await
275                    .map_err(|e| {
276                        crate::stripe::error::StripeWebhookError::ProcessingFailed(e.to_string())
277                    })
278            }
279            StripeEventType::CustomerCreated => {
280                let customer_event = event.as_customer()?;
281                handler
282                    .on_customer_created(&customer_event)
283                    .await
284                    .map_err(|e| {
285                        crate::stripe::error::StripeWebhookError::ProcessingFailed(e.to_string())
286                    })
287            }
288            StripeEventType::Unknown => {
289                tracing::debug!(
290                    event_id = %event.id,
291                    event_type = %event.event_type,
292                    "Ignoring unknown event type"
293                );
294                Ok(())
295            }
296        }
297    })
298    .await;
299
300    match result {
301        Ok(inner_result) => {
302            if inner_result.is_ok() {
303                // Mark as completed in idempotency store
304                store.mark_completed(&event.id).await?;
305            }
306            inner_result
307        }
308        Err(_) => Err(crate::stripe::error::StripeWebhookError::ProcessingFailed(
309            format!("Processing timed out after {:?}", config.processing_timeout),
310        )),
311    }
312}
313
314/// No-op handler for testing
315#[derive(Clone)]
316pub struct NoOpHandler;
317
318#[async_trait::async_trait]
319impl SubscriptionHandler for NoOpHandler {
320    async fn on_subscription_created(&self, _event: &SubscriptionEvent) -> anyhow::Result<()> {
321        Ok(())
322    }
323    async fn on_subscription_updated(&self, _event: &SubscriptionEvent) -> anyhow::Result<()> {
324        Ok(())
325    }
326    async fn on_subscription_deleted(&self, _event: &SubscriptionEvent) -> anyhow::Result<()> {
327        Ok(())
328    }
329    async fn on_payment_succeeded(&self, _event: &InvoiceEvent) -> anyhow::Result<()> {
330        Ok(())
331    }
332    async fn on_payment_failed(&self, _event: &InvoiceEvent) -> anyhow::Result<()> {
333        Ok(())
334    }
335    async fn on_customer_created(&self, _event: &CustomerEvent) -> anyhow::Result<()> {
336        Ok(())
337    }
338}
339
340/// Logging handler that logs all events
341#[derive(Clone)]
342pub struct LoggingHandler;
343
344#[async_trait::async_trait]
345impl SubscriptionHandler for LoggingHandler {
346    async fn on_subscription_created(&self, event: &SubscriptionEvent) -> anyhow::Result<()> {
347        tracing::info!(
348            subscription_id = %event.subscription.id,
349            customer_id = %event.subscription.customer,
350            status = ?event.subscription.status,
351            "Subscription created"
352        );
353        Ok(())
354    }
355
356    async fn on_subscription_updated(&self, event: &SubscriptionEvent) -> anyhow::Result<()> {
357        tracing::info!(
358            subscription_id = %event.subscription.id,
359            customer_id = %event.subscription.customer,
360            status = ?event.subscription.status,
361            cancel_at_period_end = event.subscription.cancel_at_period_end,
362            "Subscription updated"
363        );
364        Ok(())
365    }
366
367    async fn on_subscription_deleted(&self, event: &SubscriptionEvent) -> anyhow::Result<()> {
368        tracing::info!(
369            subscription_id = %event.subscription.id,
370            customer_id = %event.subscription.customer,
371            "Subscription deleted"
372        );
373        Ok(())
374    }
375
376    async fn on_payment_succeeded(&self, event: &InvoiceEvent) -> anyhow::Result<()> {
377        tracing::info!(
378            invoice_id = %event.invoice.id,
379            customer_id = %event.invoice.customer,
380            amount_paid = event.invoice.amount_paid,
381            currency = %event.invoice.currency,
382            "Payment succeeded"
383        );
384        Ok(())
385    }
386
387    async fn on_payment_failed(&self, event: &InvoiceEvent) -> anyhow::Result<()> {
388        tracing::warn!(
389            invoice_id = %event.invoice.id,
390            customer_id = %event.invoice.customer,
391            amount_due = event.invoice.amount_due,
392            currency = %event.invoice.currency,
393            "Payment failed"
394        );
395        Ok(())
396    }
397
398    async fn on_customer_created(&self, event: &CustomerEvent) -> anyhow::Result<()> {
399        tracing::info!(
400            customer_id = %event.customer.id,
401            email = ?event.customer.email,
402            "Customer created"
403        );
404        Ok(())
405    }
406}
407
#[cfg(test)]
mod tests {
    use super::*;
    use crate::stripe::idempotency::InMemoryIdempotencyStore;
    use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
    use std::time::Duration;

    /// Test handler that counts calls per callback and can be told to fail
    /// `on_subscription_created`.
    #[derive(Default)]
    struct TestHandler {
        subscription_created_calls: AtomicU32,
        subscription_updated_calls: AtomicU32,
        subscription_deleted_calls: AtomicU32,
        payment_succeeded_calls: AtomicU32,
        payment_failed_calls: AtomicU32,
        customer_created_calls: AtomicU32,
        /// When set, `on_subscription_created` returns an error after counting the call.
        should_fail: AtomicBool,
    }

    impl TestHandler {
        /// All counters at zero, failure simulation off.
        fn new() -> Self {
            Self::default()
        }
    }

    #[async_trait::async_trait]
    impl SubscriptionHandler for TestHandler {
        async fn on_subscription_created(&self, _event: &SubscriptionEvent) -> anyhow::Result<()> {
            self.subscription_created_calls
                .fetch_add(1, Ordering::SeqCst);
            if self.should_fail.load(Ordering::SeqCst) {
                anyhow::bail!("Simulated failure");
            }
            Ok(())
        }
        async fn on_subscription_updated(&self, _event: &SubscriptionEvent) -> anyhow::Result<()> {
            self.subscription_updated_calls
                .fetch_add(1, Ordering::SeqCst);
            Ok(())
        }
        async fn on_subscription_deleted(&self, _event: &SubscriptionEvent) -> anyhow::Result<()> {
            self.subscription_deleted_calls
                .fetch_add(1, Ordering::SeqCst);
            Ok(())
        }
        async fn on_payment_succeeded(&self, _event: &InvoiceEvent) -> anyhow::Result<()> {
            self.payment_succeeded_calls.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }
        async fn on_payment_failed(&self, _event: &InvoiceEvent) -> anyhow::Result<()> {
            self.payment_failed_calls.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }
        async fn on_customer_created(&self, _event: &CustomerEvent) -> anyhow::Result<()> {
            self.customer_created_calls.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }
    }

    /// Fresh in-memory idempotency store shared by all tests.
    fn create_test_store() -> Arc<InMemoryIdempotencyStore> {
        Arc::new(InMemoryIdempotencyStore::new(
            Duration::from_secs(3600),
            1000,
        ))
    }

    /// A well-formed `customer.subscription.created` event.
    fn create_test_subscription_event() -> StripeEvent {
        let json = r#"{
            "id": "evt_test_123",
            "type": "customer.subscription.created",
            "created": 1614556800,
            "livemode": false,
            "pending_webhooks": 1,
            "data": {
                "object": {
                    "id": "sub_test_123",
                    "customer": "cus_test_123",
                    "status": "active",
                    "current_period_start": 1614556800,
                    "current_period_end": 1617235200,
                    "cancel_at_period_end": false,
                    "items": {
                        "data": [{
                            "id": "si_test_123",
                            "price": {
                                "id": "price_test_123",
                                "product": "prod_test_123",
                                "unit_amount": 2000,
                                "currency": "usd",
                                "recurring": {
                                    "interval": "month",
                                    "interval_count": 1
                                }
                            },
                            "quantity": 1
                        }]
                    },
                    "metadata": {},
                    "livemode": false
                }
            }
        }"#;

        StripeEvent::from_bytes(json.as_bytes()).unwrap()
    }

    #[tokio::test]
    async fn test_process_subscription_created() {
        let handler = Arc::new(TestHandler::new());
        let store = create_test_store();
        let config = StripeWebhookConfig::test_config();

        let event = create_test_subscription_event();

        // Record in idempotency store first
        store.check_and_record(&event.id).await.unwrap();

        // Process the event
        process_single_event(&handler, &store, &event, &config)
            .await
            .unwrap();

        assert_eq!(handler.subscription_created_calls.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn test_handler_failure_is_propagated() {
        let handler = Arc::new(TestHandler::new());
        handler.should_fail.store(true, Ordering::SeqCst);
        let store = create_test_store();
        let config = StripeWebhookConfig::test_config();

        let event = create_test_subscription_event();
        store.check_and_record(&event.id).await.unwrap();

        // A single attempt must surface the handler's error to the caller.
        let result = process_single_event(&handler, &store, &event, &config).await;

        assert!(result.is_err());
        assert_eq!(handler.subscription_created_calls.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn test_processor_queue_and_run() {
        let handler = Arc::new(TestHandler::new());
        let store = create_test_store();
        let config = StripeWebhookConfig::test_config();

        let (processor, handle) = EventProcessor::new(handler.clone(), store.clone(), config);

        // Start the background processor
        let processor_task = tokio::spawn(async move {
            handle.run().await;
        });

        // Queue an event
        let event = create_test_subscription_event();
        store.check_and_record(&event.id).await.unwrap();
        processor.queue_event(event).await.unwrap();

        // Poll for the handler call instead of sleeping a fixed interval, so
        // the test finishes as soon as the event is processed and does not
        // fail spuriously on a slow machine.
        let waited = tokio::time::timeout(Duration::from_secs(5), async {
            while handler.subscription_created_calls.load(Ordering::SeqCst) == 0 {
                tokio::time::sleep(Duration::from_millis(5)).await;
            }
        })
        .await;
        assert!(waited.is_ok(), "queued event was not processed in time");

        // Verify handler was called exactly once
        assert_eq!(handler.subscription_created_calls.load(Ordering::SeqCst), 1);

        // Cleanup
        processor_task.abort();
    }

    #[tokio::test]
    async fn test_unknown_event_type_ignored() {
        let handler = Arc::new(TestHandler::new());
        let store = create_test_store();
        let config = StripeWebhookConfig::test_config();

        let json = r#"{
            "id": "evt_unknown_123",
            "type": "some.unknown.event",
            "created": 1614556800,
            "livemode": false,
            "pending_webhooks": 1,
            "data": {
                "object": {}
            }
        }"#;

        let event = StripeEvent::from_bytes(json.as_bytes()).unwrap();
        store.check_and_record(&event.id).await.unwrap();

        // Should succeed (unknown events are ignored)
        process_single_event(&handler, &store, &event, &config)
            .await
            .unwrap();

        // No handlers should have been called
        assert_eq!(handler.subscription_created_calls.load(Ordering::SeqCst), 0);
        assert_eq!(handler.subscription_updated_calls.load(Ordering::SeqCst), 0);
        assert_eq!(handler.subscription_deleted_calls.load(Ordering::SeqCst), 0);
        assert_eq!(handler.payment_succeeded_calls.load(Ordering::SeqCst), 0);
        assert_eq!(handler.payment_failed_calls.load(Ordering::SeqCst), 0);
        assert_eq!(handler.customer_created_calls.load(Ordering::SeqCst), 0);
    }
}