ferro_stripe/idempotency.rs
1//! Idempotency primitives for Stripe webhook processing.
2//!
3//! Stripe retries webhooks, so every handler must be idempotent. Deduplicate
4//! on the Stripe event id (`evt_xxx`) using a persistent log. The trait
5//! below is the contract; the consuming app ships the impl backed by its DB.
6//!
7//! ## Recommended SQL schema
8//!
9//! ```sql
10//! CREATE TABLE stripe_processed_events (
11//! event_id TEXT PRIMARY KEY,
12//! event_type TEXT NOT NULL,
13//! received_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
14//! );
15//! ```
16//!
17//! Ferro does not ship this migration. Applications own the table.
18//! The `PRIMARY KEY` on `event_id` is the idempotency fence: the app's
19//! `try_mark_processed` impl SHOULD perform a conditional `INSERT` and
20//! return `Ok(true)` when a row was inserted, `Ok(false)` when the row
21//! already existed (unique-constraint violation).
22//!
23//! [`MemoryProcessedLog`] is provided for tests and single-process dev
24//! scenarios. Production systems MUST implement [`ProcessedEventLog`]
25//! against a shared database — in-process state does not survive restarts
26//! or protect against horizontal scaling.
27
28use crate::Error;
29use async_trait::async_trait;
30
31/// Records Stripe webhook events that have been processed, so that
32/// retries from Stripe do not cause duplicate side effects.
33///
34/// Implementations SHOULD be backed by a durable store with a unique
35/// constraint on `event_id`. See the module-level doc for the
36/// recommended SQL schema.
37#[async_trait]
38pub trait ProcessedEventLog: Send + Sync {
39 /// Attempts to mark `event_id` as processed.
40 ///
41 /// Returns `Ok(true)` when this call is the first to record the id
42 /// (the handler SHOULD proceed with side effects).
43 /// Returns `Ok(false)` when the id was already present
44 /// (the handler MUST skip side effects — this is a retry).
45 async fn try_mark_processed(&self, event_id: &str) -> Result<bool, Error>;
46}
47
48/// In-memory [`ProcessedEventLog`] backed by [`dashmap::DashMap`].
49///
50/// Intended for tests and single-process development. State is lost on
51/// process restart and is not shared across processes. Use a DB-backed
52/// impl in production.
53pub struct MemoryProcessedLog {
54 seen: dashmap::DashMap<String, ()>,
55}
56
57impl MemoryProcessedLog {
58 /// Creates an empty in-memory log.
59 pub fn new() -> Self {
60 Self {
61 seen: dashmap::DashMap::new(),
62 }
63 }
64}
65
66impl Default for MemoryProcessedLog {
67 fn default() -> Self {
68 Self::new()
69 }
70}
71
72#[async_trait]
73impl ProcessedEventLog for MemoryProcessedLog {
74 async fn try_mark_processed(&self, event_id: &str) -> Result<bool, Error> {
75 // DashMap::insert returns None when the key was absent (first time,
76 // so we return Ok(true)) and Some(()) when the key was already
77 // present (already seen, so Ok(false)). DashMap shard locking
78 // makes this atomic per key across concurrent callers.
79 Ok(self.seen.insert(event_id.to_string(), ()).is_none())
80 }
81}
82
83#[cfg(test)]
84mod tests {
85 use super::*;
86
87 #[tokio::test]
88 async fn memory_log_true_then_false() {
89 let log = MemoryProcessedLog::new();
90 assert!(
91 log.try_mark_processed("evt_001").await.unwrap(),
92 "first call with a new id must return Ok(true)"
93 );
94 assert!(
95 !log.try_mark_processed("evt_001").await.unwrap(),
96 "second call with the same id must return Ok(false)"
97 );
98 assert!(
99 log.try_mark_processed("evt_002").await.unwrap(),
100 "different id must return Ok(true) even after an earlier id was seen"
101 );
102 }
103
104 #[tokio::test]
105 async fn memory_log_concurrent_insert_applies_once() {
106 use std::sync::Arc;
107 let log = Arc::new(MemoryProcessedLog::new());
108 let log2 = Arc::clone(&log);
109
110 let t1 = tokio::spawn(async move { log.try_mark_processed("evt_race_001").await });
111 let t2 = tokio::spawn(async move { log2.try_mark_processed("evt_race_001").await });
112
113 let (r1, r2) = tokio::join!(t1, t2);
114 let v1 = r1.unwrap().unwrap();
115 let v2 = r2.unwrap().unwrap();
116 assert_ne!(
117 v1,
118 v2,
119 "concurrent inserts must apply exactly once: one Ok(true), one Ok(false), got ({v1}, {v2})"
120 );
121 }
122}