Skip to main content

ferro_stripe/webhook/
queue.rs

1//! Queue-based Stripe webhook dispatch (eventual-consistency path).
2//!
3//! [`ProcessStripeWebhook`] is a [`ferro_queue::Job`] that holds a raw
4//! Stripe event body and dispatches it through a shared
5//! [`SyncDispatcher`] on the background worker. The same dispatcher
6//! instance serves both the synchronous HTTP path and the queue path —
7//! register handlers once with [`SyncDispatcher::on`] and enqueue this
8//! job for events that tolerate eventual consistency.
9//!
10//! # Persistence
11//!
12//! The job persists `event_type` and `raw_body` through serde. The
13//! `dispatcher` field is runtime-only (`#[serde(skip)]`) and must be
14//! re-injected by the caller at enqueue time via
15//! [`ProcessStripeWebhook::new`]. Jobs deserialized without a dispatcher
16//! will return [`ferro_queue::Error::JobFailed`] on
17//! [`handle`][ferro_queue::Job::handle] so the queue can mark the job
18//! failed and continue processing other jobs.
19//!
20//! [`SyncDispatcher`]: crate::webhook::sync::SyncDispatcher
21//! [`SyncDispatcher::on`]: crate::webhook::sync::SyncDispatcher::on
22
23use std::sync::Arc;
24
25use crate::webhook::events::WebhookEvent;
26use crate::webhook::sync::SyncDispatcher;
27
28/// Background job that dispatches a verified Stripe webhook event through
29/// a shared [`SyncDispatcher`].
30///
31/// [`SyncDispatcher`]: crate::webhook::sync::SyncDispatcher
32#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
33pub struct ProcessStripeWebhook {
34    /// The Stripe event type string (e.g. `"checkout.session.completed"`).
35    pub event_type: String,
36    /// The raw JSON body of the already-verified Stripe event.
37    pub raw_body: String,
38    /// Connect account id for Connect webhooks; `None` for platform events.
39    pub connect_account_id: Option<String>,
40    /// Runtime-only: not persisted by the queue. Injected via
41    /// [`ProcessStripeWebhook::new`] at enqueue time.
42    #[serde(skip)]
43    pub dispatcher: Option<Arc<SyncDispatcher>>,
44}
45
46impl ProcessStripeWebhook {
47    /// Constructs a new queue job with the dispatcher attached.
48    ///
49    /// Use this constructor at enqueue time — the dispatcher is not
50    /// persisted, so jobs deserialized from storage without
51    /// re-injection cannot execute.
52    pub fn new(
53        event_type: String,
54        raw_body: String,
55        connect_account_id: Option<String>,
56        dispatcher: Arc<SyncDispatcher>,
57    ) -> Self {
58        Self {
59            event_type,
60            raw_body,
61            connect_account_id,
62            dispatcher: Some(dispatcher),
63        }
64    }
65}
66
67#[ferro_queue::async_trait]
68impl ferro_queue::Job for ProcessStripeWebhook {
69    async fn handle(&self) -> Result<(), ferro_queue::Error> {
70        let dispatcher = self
71            .dispatcher
72            .as_ref()
73            .ok_or_else(|| ferro_queue::Error::JobFailed {
74                job: "ProcessStripeWebhook".to_string(),
75                message: "dispatcher not injected — use ProcessStripeWebhook::new()".to_string(),
76            })?;
77        let event =
78            WebhookEvent::from_json(&self.raw_body).map_err(|e| ferro_queue::Error::JobFailed {
79                job: "ProcessStripeWebhook".to_string(),
80                message: format!("parse stripe event: {e}"),
81            })?;
82        dispatcher
83            .dispatch(event)
84            .await
85            .map_err(|e| ferro_queue::Error::JobFailed {
86                job: "ProcessStripeWebhook".to_string(),
87                message: e.to_string(),
88            })
89    }
90
91    fn name(&self) -> &'static str {
92        "ProcessStripeWebhook"
93    }
94}
95
96#[cfg(test)]
97mod tests {
98    use super::*;
99    use crate::webhook::events::StripeCheckoutCompleted;
100    use std::sync::atomic::{AtomicBool, Ordering};
101
102    #[test]
103    fn process_stripe_webhook_job_name() {
104        let dispatcher = Arc::new(SyncDispatcher::new());
105        let job = ProcessStripeWebhook::new(
106            "invoice.paid".to_string(),
107            "{}".to_string(),
108            None,
109            dispatcher,
110        );
111        use ferro_queue::Job;
112        assert_eq!(job.name(), "ProcessStripeWebhook");
113    }
114
115    #[test]
116    fn new_sets_dispatcher_to_some() {
117        let dispatcher = Arc::new(SyncDispatcher::new());
118        let job = ProcessStripeWebhook::new(
119            "checkout.session.completed".to_string(),
120            "{}".to_string(),
121            None,
122            Arc::clone(&dispatcher),
123        );
124        assert!(job.dispatcher.is_some());
125    }
126
127    #[tokio::test]
128    async fn handle_dispatches_parsed_event_through_dispatcher() {
129        let flag = Arc::new(AtomicBool::new(false));
130        let flag_clone = Arc::clone(&flag);
131        let dispatcher = Arc::new(SyncDispatcher::new().on(move |_: StripeCheckoutCompleted| {
132            let flag = Arc::clone(&flag_clone);
133            async move {
134                flag.store(true, Ordering::SeqCst);
135                Ok(())
136            }
137        }));
138
139        // Reuse Plan 03 fixture for a realistic stripe::Event JSON body.
140        let raw =
141            include_str!("../../tests/fixtures/stripe_events/checkout_session_completed.json");
142
143        let job = ProcessStripeWebhook::new(
144            "checkout.session.completed".to_string(),
145            raw.to_string(),
146            None,
147            dispatcher,
148        );
149
150        use ferro_queue::Job;
151        let result = job.handle().await;
152        assert!(result.is_ok(), "handle should succeed, got {result:?}");
153        assert!(
154            flag.load(Ordering::SeqCst),
155            "registered handler should have run"
156        );
157    }
158
159    #[tokio::test]
160    async fn handle_maps_parse_errors_to_job_failed() {
161        let dispatcher = Arc::new(SyncDispatcher::new());
162        let job = ProcessStripeWebhook::new(
163            "invoice.paid".to_string(),
164            "not-json".to_string(),
165            None,
166            dispatcher,
167        );
168
169        use ferro_queue::Job;
170        let result = job.handle().await;
171        match result {
172            Err(ferro_queue::Error::JobFailed { message, .. }) => {
173                assert!(message.starts_with("parse stripe event:"), "got: {message}");
174            }
175            other => panic!("expected JobFailed, got {other:?}"),
176        }
177    }
178}