ferro_stripe/webhook/
queue.rs1use std::sync::Arc;
24
25use crate::webhook::events::WebhookEvent;
26use crate::webhook::sync::SyncDispatcher;
27
28#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
33pub struct ProcessStripeWebhook {
34 pub event_type: String,
36 pub raw_body: String,
38 pub connect_account_id: Option<String>,
40 #[serde(skip)]
43 pub dispatcher: Option<Arc<SyncDispatcher>>,
44}
45
46impl ProcessStripeWebhook {
47 pub fn new(
53 event_type: String,
54 raw_body: String,
55 connect_account_id: Option<String>,
56 dispatcher: Arc<SyncDispatcher>,
57 ) -> Self {
58 Self {
59 event_type,
60 raw_body,
61 connect_account_id,
62 dispatcher: Some(dispatcher),
63 }
64 }
65}
66
67#[ferro_queue::async_trait]
68impl ferro_queue::Job for ProcessStripeWebhook {
69 async fn handle(&self) -> Result<(), ferro_queue::Error> {
70 let dispatcher = self
71 .dispatcher
72 .as_ref()
73 .ok_or_else(|| ferro_queue::Error::JobFailed {
74 job: "ProcessStripeWebhook".to_string(),
75 message: "dispatcher not injected — use ProcessStripeWebhook::new()".to_string(),
76 })?;
77 let event =
78 WebhookEvent::from_json(&self.raw_body).map_err(|e| ferro_queue::Error::JobFailed {
79 job: "ProcessStripeWebhook".to_string(),
80 message: format!("parse stripe event: {e}"),
81 })?;
82 dispatcher
83 .dispatch(event)
84 .await
85 .map_err(|e| ferro_queue::Error::JobFailed {
86 job: "ProcessStripeWebhook".to_string(),
87 message: e.to_string(),
88 })
89 }
90
91 fn name(&self) -> &'static str {
92 "ProcessStripeWebhook"
93 }
94}
95
#[cfg(test)]
mod tests {
    use super::*;
    use crate::webhook::events::StripeCheckoutCompleted;
    use ferro_queue::Job;
    use std::sync::atomic::{AtomicBool, Ordering};

    /// Builds a job without a connect account id around the given dispatcher.
    fn job_for(
        event_type: &str,
        raw_body: &str,
        dispatcher: Arc<SyncDispatcher>,
    ) -> ProcessStripeWebhook {
        ProcessStripeWebhook::new(
            event_type.to_string(),
            raw_body.to_string(),
            None,
            dispatcher,
        )
    }

    /// `name()` reports the fixed job identifier.
    #[test]
    fn process_stripe_webhook_job_name() {
        let job = job_for("invoice.paid", "{}", Arc::new(SyncDispatcher::new()));
        assert_eq!(job.name(), "ProcessStripeWebhook");
    }

    /// The constructor always stores the dispatcher it was given.
    #[test]
    fn new_sets_dispatcher_to_some() {
        let shared = Arc::new(SyncDispatcher::new());
        let job = job_for("checkout.session.completed", "{}", Arc::clone(&shared));
        assert!(job.dispatcher.is_some());
    }

    /// A valid fixture body is parsed and reaches the registered handler.
    #[tokio::test]
    async fn handle_dispatches_parsed_event_through_dispatcher() {
        let handler_ran = Arc::new(AtomicBool::new(false));
        let handler_ran_for_closure = Arc::clone(&handler_ran);
        let dispatcher = Arc::new(SyncDispatcher::new().on(move |_: StripeCheckoutCompleted| {
            // Fresh handle per invocation so the returned future owns its flag.
            let seen = Arc::clone(&handler_ran_for_closure);
            async move {
                seen.store(true, Ordering::SeqCst);
                Ok(())
            }
        }));

        let fixture =
            include_str!("../../tests/fixtures/stripe_events/checkout_session_completed.json");
        let job = job_for("checkout.session.completed", fixture, dispatcher);

        let result = job.handle().await;
        assert!(result.is_ok(), "handle should succeed, got {result:?}");
        assert!(
            handler_ran.load(Ordering::SeqCst),
            "registered handler should have run"
        );
    }

    /// An unparseable body surfaces as `JobFailed` with the parse prefix.
    #[tokio::test]
    async fn handle_maps_parse_errors_to_job_failed() {
        let job = job_for("invoice.paid", "not-json", Arc::new(SyncDispatcher::new()));

        let outcome = job.handle().await;
        if let Err(ferro_queue::Error::JobFailed { message, .. }) = &outcome {
            assert!(message.starts_with("parse stripe event:"), "got: {message}");
        } else {
            panic!("expected JobFailed, got {outcome:?}");
        }
    }
}