use std::sync::Arc;
use crate::webhook::events::WebhookEvent;
use crate::webhook::sync::SyncDispatcher;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ProcessStripeWebhook {
pub event_type: String,
pub raw_body: String,
pub connect_account_id: Option<String>,
#[serde(skip)]
pub dispatcher: Option<Arc<SyncDispatcher>>,
}
impl ProcessStripeWebhook {
pub fn new(
event_type: String,
raw_body: String,
connect_account_id: Option<String>,
dispatcher: Arc<SyncDispatcher>,
) -> Self {
Self {
event_type,
raw_body,
connect_account_id,
dispatcher: Some(dispatcher),
}
}
}
#[ferro_queue::async_trait]
impl ferro_queue::Job for ProcessStripeWebhook {
async fn handle(&self) -> Result<(), ferro_queue::Error> {
let dispatcher = self
.dispatcher
.as_ref()
.ok_or_else(|| ferro_queue::Error::JobFailed {
job: "ProcessStripeWebhook".to_string(),
message: "dispatcher not injected — use ProcessStripeWebhook::new()".to_string(),
})?;
let event =
WebhookEvent::from_json(&self.raw_body).map_err(|e| ferro_queue::Error::JobFailed {
job: "ProcessStripeWebhook".to_string(),
message: format!("parse stripe event: {e}"),
})?;
dispatcher
.dispatch(event)
.await
.map_err(|e| ferro_queue::Error::JobFailed {
job: "ProcessStripeWebhook".to_string(),
message: e.to_string(),
})
}
fn name(&self) -> &'static str {
"ProcessStripeWebhook"
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::webhook::events::StripeCheckoutCompleted;
use std::sync::atomic::{AtomicBool, Ordering};
#[test]
fn process_stripe_webhook_job_name() {
let dispatcher = Arc::new(SyncDispatcher::new());
let job = ProcessStripeWebhook::new(
"invoice.paid".to_string(),
"{}".to_string(),
None,
dispatcher,
);
use ferro_queue::Job;
assert_eq!(job.name(), "ProcessStripeWebhook");
}
#[test]
fn new_sets_dispatcher_to_some() {
let dispatcher = Arc::new(SyncDispatcher::new());
let job = ProcessStripeWebhook::new(
"checkout.session.completed".to_string(),
"{}".to_string(),
None,
Arc::clone(&dispatcher),
);
assert!(job.dispatcher.is_some());
}
#[tokio::test]
async fn handle_dispatches_parsed_event_through_dispatcher() {
let flag = Arc::new(AtomicBool::new(false));
let flag_clone = Arc::clone(&flag);
let dispatcher = Arc::new(SyncDispatcher::new().on(move |_: StripeCheckoutCompleted| {
let flag = Arc::clone(&flag_clone);
async move {
flag.store(true, Ordering::SeqCst);
Ok(())
}
}));
let raw =
include_str!("../../tests/fixtures/stripe_events/checkout_session_completed.json");
let job = ProcessStripeWebhook::new(
"checkout.session.completed".to_string(),
raw.to_string(),
None,
dispatcher,
);
use ferro_queue::Job;
let result = job.handle().await;
assert!(result.is_ok(), "handle should succeed, got {result:?}");
assert!(
flag.load(Ordering::SeqCst),
"registered handler should have run"
);
}
#[tokio::test]
async fn handle_maps_parse_errors_to_job_failed() {
let dispatcher = Arc::new(SyncDispatcher::new());
let job = ProcessStripeWebhook::new(
"invoice.paid".to_string(),
"not-json".to_string(),
None,
dispatcher,
);
use ferro_queue::Job;
let result = job.handle().await;
match result {
Err(ferro_queue::Error::JobFailed { message, .. }) => {
assert!(message.starts_with("parse stripe event:"), "got: {message}");
}
other => panic!("expected JobFailed, got {other:?}"),
}
}
}