1use std::sync::Arc;
29
30use tokio::sync::mpsc;
31use tokio::time::timeout;
32
33use crate::stripe::config::StripeWebhookConfig;
34use crate::stripe::error::StripeWebhookResult;
35use crate::stripe::events::{
36 CustomerEvent, InvoiceEvent, StripeEvent, StripeEventType, SubscriptionEvent,
37};
38use crate::stripe::idempotency::IdempotencyStore;
39
40#[async_trait::async_trait]
42pub trait SubscriptionHandler: Send + Sync + 'static {
43 async fn on_subscription_created(&self, event: &SubscriptionEvent) -> anyhow::Result<()>;
45
46 async fn on_subscription_updated(&self, event: &SubscriptionEvent) -> anyhow::Result<()>;
48
49 async fn on_subscription_deleted(&self, event: &SubscriptionEvent) -> anyhow::Result<()>;
51
52 async fn on_payment_succeeded(&self, event: &InvoiceEvent) -> anyhow::Result<()>;
54
55 async fn on_payment_failed(&self, event: &InvoiceEvent) -> anyhow::Result<()>;
57
58 async fn on_customer_created(&self, event: &CustomerEvent) -> anyhow::Result<()>;
60}
61
62pub struct EventProcessor<H: SubscriptionHandler, S: IdempotencyStore> {
64 handler: Arc<H>,
65 idempotency_store: Arc<S>,
66 config: StripeWebhookConfig,
67 task_sender: mpsc::Sender<ProcessingTask>,
69}
70
71struct ProcessingTask {
73 event: StripeEvent,
74 #[allow(dead_code)]
76 attempt: u32,
77}
78
79impl<H: SubscriptionHandler, S: IdempotencyStore> EventProcessor<H, S> {
80 pub fn new(
82 handler: Arc<H>,
83 idempotency_store: Arc<S>,
84 config: StripeWebhookConfig,
85 ) -> (Self, ProcessorHandle<H, S>) {
86 let (tx, rx) = mpsc::channel(1000);
87
88 let processor = Self {
89 handler: handler.clone(),
90 idempotency_store: idempotency_store.clone(),
91 config: config.clone(),
92 task_sender: tx,
93 };
94
95 let handle = ProcessorHandle {
96 handler,
97 idempotency_store,
98 config,
99 task_receiver: rx,
100 };
101
102 (processor, handle)
103 }
104
105 pub async fn queue_event(&self, event: StripeEvent) -> StripeWebhookResult<()> {
110 let task = ProcessingTask { event, attempt: 0 };
111
112 self.task_sender.send(task).await.map_err(|e| {
113 crate::stripe::error::StripeWebhookError::InternalError(format!(
114 "Failed to queue event: {}",
115 e
116 ))
117 })?;
118
119 Ok(())
120 }
121
122 pub async fn process_event_sync(&self, event: &StripeEvent) -> StripeWebhookResult<()> {
124 process_single_event(&self.handler, &self.idempotency_store, event, &self.config).await
125 }
126}
127
128pub struct ProcessorHandle<H: SubscriptionHandler, S: IdempotencyStore> {
130 handler: Arc<H>,
131 idempotency_store: Arc<S>,
132 config: StripeWebhookConfig,
133 task_receiver: mpsc::Receiver<ProcessingTask>,
134}
135
136impl<H: SubscriptionHandler, S: IdempotencyStore> ProcessorHandle<H, S> {
137 pub async fn run(mut self) {
147 tracing::info!("Starting Stripe webhook event processor");
148
149 while let Some(task) = self.task_receiver.recv().await {
150 let handler = self.handler.clone();
151 let store = self.idempotency_store.clone();
152 let config = self.config.clone();
153
154 tokio::spawn(async move {
156 process_with_retry(handler, store, task.event, &config).await;
157 });
158 }
159
160 tracing::info!("Stripe webhook event processor shutting down");
161 }
162}
163
164async fn process_with_retry<H: SubscriptionHandler, S: IdempotencyStore>(
166 handler: Arc<H>,
167 store: Arc<S>,
168 event: StripeEvent,
169 config: &StripeWebhookConfig,
170) {
171 let event_id = event.id.clone();
172 let event_type = event.event_type.clone();
173
174 for attempt in 0..=config.max_retries {
175 if attempt > 0 {
176 let delay = config.retry_delay(attempt - 1);
177 tracing::info!(
178 event_id = %event_id,
179 event_type = %event_type,
180 attempt,
181 delay_ms = delay.as_millis(),
182 "Retrying event processing"
183 );
184 tokio::time::sleep(delay).await;
185 }
186
187 match process_single_event(&handler, &store, &event, config).await {
188 Ok(()) => {
189 tracing::info!(
190 event_id = %event_id,
191 event_type = %event_type,
192 attempts = attempt + 1,
193 "Event processed successfully"
194 );
195 return;
196 }
197 Err(e) => {
198 tracing::warn!(
199 event_id = %event_id,
200 event_type = %event_type,
201 attempt = attempt + 1,
202 max_retries = config.max_retries,
203 error = %e,
204 "Event processing failed"
205 );
206
207 if attempt == config.max_retries {
208 if let Err(mark_err) = store.mark_failed(&event_id, &e.to_string()).await {
210 tracing::error!(
211 event_id = %event_id,
212 error = %mark_err,
213 "Failed to mark event as failed in idempotency store"
214 );
215 }
216 }
217 }
218 }
219 }
220}
221
222async fn process_single_event<H: SubscriptionHandler, S: IdempotencyStore>(
224 handler: &Arc<H>,
225 store: &Arc<S>,
226 event: &StripeEvent,
227 config: &StripeWebhookConfig,
228) -> StripeWebhookResult<()> {
229 let event_type = event.typed_event_type();
230
231 let result = timeout(config.processing_timeout, async {
233 match event_type {
234 StripeEventType::SubscriptionCreated => {
235 let sub_event = event.as_subscription()?;
236 handler
237 .on_subscription_created(&sub_event)
238 .await
239 .map_err(|e| {
240 crate::stripe::error::StripeWebhookError::ProcessingFailed(e.to_string())
241 })
242 }
243 StripeEventType::SubscriptionUpdated => {
244 let sub_event = event.as_subscription()?;
245 handler
246 .on_subscription_updated(&sub_event)
247 .await
248 .map_err(|e| {
249 crate::stripe::error::StripeWebhookError::ProcessingFailed(e.to_string())
250 })
251 }
252 StripeEventType::SubscriptionDeleted => {
253 let sub_event = event.as_subscription()?;
254 handler
255 .on_subscription_deleted(&sub_event)
256 .await
257 .map_err(|e| {
258 crate::stripe::error::StripeWebhookError::ProcessingFailed(e.to_string())
259 })
260 }
261 StripeEventType::InvoicePaymentSucceeded => {
262 let invoice_event = event.as_invoice()?;
263 handler
264 .on_payment_succeeded(&invoice_event)
265 .await
266 .map_err(|e| {
267 crate::stripe::error::StripeWebhookError::ProcessingFailed(e.to_string())
268 })
269 }
270 StripeEventType::InvoicePaymentFailed => {
271 let invoice_event = event.as_invoice()?;
272 handler
273 .on_payment_failed(&invoice_event)
274 .await
275 .map_err(|e| {
276 crate::stripe::error::StripeWebhookError::ProcessingFailed(e.to_string())
277 })
278 }
279 StripeEventType::CustomerCreated => {
280 let customer_event = event.as_customer()?;
281 handler
282 .on_customer_created(&customer_event)
283 .await
284 .map_err(|e| {
285 crate::stripe::error::StripeWebhookError::ProcessingFailed(e.to_string())
286 })
287 }
288 StripeEventType::Unknown => {
289 tracing::debug!(
290 event_id = %event.id,
291 event_type = %event.event_type,
292 "Ignoring unknown event type"
293 );
294 Ok(())
295 }
296 }
297 })
298 .await;
299
300 match result {
301 Ok(inner_result) => {
302 if inner_result.is_ok() {
303 store.mark_completed(&event.id).await?;
305 }
306 inner_result
307 }
308 Err(_) => Err(crate::stripe::error::StripeWebhookError::ProcessingFailed(
309 format!("Processing timed out after {:?}", config.processing_timeout),
310 )),
311 }
312}
313
314#[derive(Clone)]
316pub struct NoOpHandler;
317
318#[async_trait::async_trait]
319impl SubscriptionHandler for NoOpHandler {
320 async fn on_subscription_created(&self, _event: &SubscriptionEvent) -> anyhow::Result<()> {
321 Ok(())
322 }
323 async fn on_subscription_updated(&self, _event: &SubscriptionEvent) -> anyhow::Result<()> {
324 Ok(())
325 }
326 async fn on_subscription_deleted(&self, _event: &SubscriptionEvent) -> anyhow::Result<()> {
327 Ok(())
328 }
329 async fn on_payment_succeeded(&self, _event: &InvoiceEvent) -> anyhow::Result<()> {
330 Ok(())
331 }
332 async fn on_payment_failed(&self, _event: &InvoiceEvent) -> anyhow::Result<()> {
333 Ok(())
334 }
335 async fn on_customer_created(&self, _event: &CustomerEvent) -> anyhow::Result<()> {
336 Ok(())
337 }
338}
339
340#[derive(Clone)]
342pub struct LoggingHandler;
343
344#[async_trait::async_trait]
345impl SubscriptionHandler for LoggingHandler {
346 async fn on_subscription_created(&self, event: &SubscriptionEvent) -> anyhow::Result<()> {
347 tracing::info!(
348 subscription_id = %event.subscription.id,
349 customer_id = %event.subscription.customer,
350 status = ?event.subscription.status,
351 "Subscription created"
352 );
353 Ok(())
354 }
355
356 async fn on_subscription_updated(&self, event: &SubscriptionEvent) -> anyhow::Result<()> {
357 tracing::info!(
358 subscription_id = %event.subscription.id,
359 customer_id = %event.subscription.customer,
360 status = ?event.subscription.status,
361 cancel_at_period_end = event.subscription.cancel_at_period_end,
362 "Subscription updated"
363 );
364 Ok(())
365 }
366
367 async fn on_subscription_deleted(&self, event: &SubscriptionEvent) -> anyhow::Result<()> {
368 tracing::info!(
369 subscription_id = %event.subscription.id,
370 customer_id = %event.subscription.customer,
371 "Subscription deleted"
372 );
373 Ok(())
374 }
375
376 async fn on_payment_succeeded(&self, event: &InvoiceEvent) -> anyhow::Result<()> {
377 tracing::info!(
378 invoice_id = %event.invoice.id,
379 customer_id = %event.invoice.customer,
380 amount_paid = event.invoice.amount_paid,
381 currency = %event.invoice.currency,
382 "Payment succeeded"
383 );
384 Ok(())
385 }
386
387 async fn on_payment_failed(&self, event: &InvoiceEvent) -> anyhow::Result<()> {
388 tracing::warn!(
389 invoice_id = %event.invoice.id,
390 customer_id = %event.invoice.customer,
391 amount_due = event.invoice.amount_due,
392 currency = %event.invoice.currency,
393 "Payment failed"
394 );
395 Ok(())
396 }
397
398 async fn on_customer_created(&self, event: &CustomerEvent) -> anyhow::Result<()> {
399 tracing::info!(
400 customer_id = %event.customer.id,
401 email = ?event.customer.email,
402 "Customer created"
403 );
404 Ok(())
405 }
406}
407
#[cfg(test)]
mod tests {
    use super::*;
    use crate::stripe::idempotency::InMemoryIdempotencyStore;
    use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
    use std::time::Duration;

    /// Handler that counts how often each callback is invoked and can be
    /// told to fail `on_subscription_created`.
    #[derive(Default)]
    struct TestHandler {
        subscription_created_calls: AtomicU32,
        subscription_updated_calls: AtomicU32,
        subscription_deleted_calls: AtomicU32,
        payment_succeeded_calls: AtomicU32,
        payment_failed_calls: AtomicU32,
        customer_created_calls: AtomicU32,
        should_fail: AtomicBool,
    }

    impl TestHandler {
        /// All counters start at zero and failure simulation is off.
        fn new() -> Self {
            Self::default()
        }
    }

    #[async_trait::async_trait]
    impl SubscriptionHandler for TestHandler {
        async fn on_subscription_created(&self, _event: &SubscriptionEvent) -> anyhow::Result<()> {
            self.subscription_created_calls
                .fetch_add(1, Ordering::SeqCst);
            if self.should_fail.load(Ordering::SeqCst) {
                anyhow::bail!("Simulated failure");
            }
            Ok(())
        }

        async fn on_subscription_updated(&self, _event: &SubscriptionEvent) -> anyhow::Result<()> {
            self.subscription_updated_calls
                .fetch_add(1, Ordering::SeqCst);
            Ok(())
        }

        async fn on_subscription_deleted(&self, _event: &SubscriptionEvent) -> anyhow::Result<()> {
            self.subscription_deleted_calls
                .fetch_add(1, Ordering::SeqCst);
            Ok(())
        }

        async fn on_payment_succeeded(&self, _event: &InvoiceEvent) -> anyhow::Result<()> {
            self.payment_succeeded_calls.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }

        async fn on_payment_failed(&self, _event: &InvoiceEvent) -> anyhow::Result<()> {
            self.payment_failed_calls.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }

        async fn on_customer_created(&self, _event: &CustomerEvent) -> anyhow::Result<()> {
            self.customer_created_calls.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }
    }

    /// In-memory idempotency store with the settings every test here uses.
    fn new_test_store() -> Arc<InMemoryIdempotencyStore> {
        Arc::new(InMemoryIdempotencyStore::new(
            Duration::from_secs(3600),
            1000,
        ))
    }

    /// Parses a minimal `customer.subscription.created` event.
    fn create_test_subscription_event() -> StripeEvent {
        let json = r#"{
            "id": "evt_test_123",
            "type": "customer.subscription.created",
            "created": 1614556800,
            "livemode": false,
            "pending_webhooks": 1,
            "data": {
                "object": {
                    "id": "sub_test_123",
                    "customer": "cus_test_123",
                    "status": "active",
                    "current_period_start": 1614556800,
                    "current_period_end": 1617235200,
                    "cancel_at_period_end": false,
                    "items": {
                        "data": [{
                            "id": "si_test_123",
                            "price": {
                                "id": "price_test_123",
                                "product": "prod_test_123",
                                "unit_amount": 2000,
                                "currency": "usd",
                                "recurring": {
                                    "interval": "month",
                                    "interval_count": 1
                                }
                            },
                            "quantity": 1
                        }]
                    },
                    "metadata": {},
                    "livemode": false
                }
            }
        }"#;

        StripeEvent::from_bytes(json.as_bytes()).unwrap()
    }

    #[tokio::test]
    async fn test_process_subscription_created() {
        let handler = Arc::new(TestHandler::new());
        let store = new_test_store();
        let config = StripeWebhookConfig::test_config();
        let event = create_test_subscription_event();

        // Register the event first so that it can be marked completed.
        store.check_and_record(&event.id).await.unwrap();

        process_single_event(&handler, &store, &event, &config)
            .await
            .unwrap();

        let created = handler.subscription_created_calls.load(Ordering::SeqCst);
        assert_eq!(created, 1);
    }

    #[tokio::test]
    async fn test_processor_queue_and_run() {
        let handler = Arc::new(TestHandler::new());
        let store = new_test_store();
        let config = StripeWebhookConfig::test_config();

        let (processor, handle) =
            EventProcessor::new(Arc::clone(&handler), Arc::clone(&store), config);

        // Drive the consumer side in the background.
        let processor_task = tokio::spawn(async move {
            handle.run().await;
        });

        let event = create_test_subscription_event();
        store.check_and_record(&event.id).await.unwrap();
        processor.queue_event(event).await.unwrap();

        // Give the background task time to pick up and process the event.
        tokio::time::sleep(Duration::from_millis(100)).await;

        let created = handler.subscription_created_calls.load(Ordering::SeqCst);
        assert_eq!(created, 1);

        processor_task.abort();
    }

    #[tokio::test]
    async fn test_unknown_event_type_ignored() {
        let handler = Arc::new(TestHandler::new());
        let store = new_test_store();
        let config = StripeWebhookConfig::test_config();

        let json = r#"{
            "id": "evt_unknown_123",
            "type": "some.unknown.event",
            "created": 1614556800,
            "livemode": false,
            "pending_webhooks": 1,
            "data": {
                "object": {}
            }
        }"#;

        let event = StripeEvent::from_bytes(json.as_bytes()).unwrap();
        store.check_and_record(&event.id).await.unwrap();

        process_single_event(&handler, &store, &event, &config)
            .await
            .unwrap();

        // Unknown event types must not reach any handler callback.
        let created = handler.subscription_created_calls.load(Ordering::SeqCst);
        assert_eq!(created, 0);
    }
}