Skip to main content

ferro_stripe/
idempotency.rs

1//! Idempotency primitives for Stripe webhook processing.
2//!
3//! Stripe retries webhooks, so every handler must be idempotent. Deduplicate
4//! on the Stripe event id (`evt_xxx`) using a persistent log. The trait
5//! below is the contract; the consuming app ships the impl backed by its DB.
6//!
7//! ## Recommended SQL schema
8//!
9//! ```sql
10//! CREATE TABLE stripe_processed_events (
11//!   event_id TEXT PRIMARY KEY,
12//!   event_type TEXT NOT NULL,
13//!   received_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
14//! );
15//! ```
16//!
17//! Ferro does not ship this migration. Applications own the table.
18//! The `PRIMARY KEY` on `event_id` is the idempotency fence: the app's
19//! `try_mark_processed` impl SHOULD perform a conditional `INSERT` and
20//! return `Ok(true)` when a row was inserted, `Ok(false)` when the row
21//! already existed (unique-constraint violation).
22//!
23//! [`MemoryProcessedLog`] is provided for tests and single-process dev
24//! scenarios. Production systems MUST implement [`ProcessedEventLog`]
25//! against a shared database — in-process state does not survive restarts
26//! or protect against horizontal scaling.
27
28use crate::Error;
29use async_trait::async_trait;
30
31/// Records Stripe webhook events that have been processed, so that
32/// retries from Stripe do not cause duplicate side effects.
33///
34/// Implementations SHOULD be backed by a durable store with a unique
35/// constraint on `event_id`. See the module-level doc for the
36/// recommended SQL schema.
37#[async_trait]
38pub trait ProcessedEventLog: Send + Sync {
39    /// Attempts to mark `event_id` as processed.
40    ///
41    /// Returns `Ok(true)` when this call is the first to record the id
42    /// (the handler SHOULD proceed with side effects).
43    /// Returns `Ok(false)` when the id was already present
44    /// (the handler MUST skip side effects — this is a retry).
45    async fn try_mark_processed(&self, event_id: &str) -> Result<bool, Error>;
46}
47
48/// In-memory [`ProcessedEventLog`] backed by [`dashmap::DashMap`].
49///
50/// Intended for tests and single-process development. State is lost on
51/// process restart and is not shared across processes. Use a DB-backed
52/// impl in production.
53pub struct MemoryProcessedLog {
54    seen: dashmap::DashMap<String, ()>,
55}
56
57impl MemoryProcessedLog {
58    /// Creates an empty in-memory log.
59    pub fn new() -> Self {
60        Self {
61            seen: dashmap::DashMap::new(),
62        }
63    }
64}
65
66impl Default for MemoryProcessedLog {
67    fn default() -> Self {
68        Self::new()
69    }
70}
71
72#[async_trait]
73impl ProcessedEventLog for MemoryProcessedLog {
74    async fn try_mark_processed(&self, event_id: &str) -> Result<bool, Error> {
75        // DashMap::insert returns None when the key was absent (first time,
76        // so we return Ok(true)) and Some(()) when the key was already
77        // present (already seen, so Ok(false)). DashMap shard locking
78        // makes this atomic per key across concurrent callers.
79        Ok(self.seen.insert(event_id.to_string(), ()).is_none())
80    }
81}
82
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    /// A new id is accepted once, rejected on repeat, and does not affect
    /// other ids.
    #[tokio::test]
    async fn memory_log_true_then_false() {
        let log = MemoryProcessedLog::new();

        let first = log.try_mark_processed("evt_001").await.unwrap();
        assert!(first, "first call with a new id must return Ok(true)");

        let repeat = log.try_mark_processed("evt_001").await.unwrap();
        assert!(
            !repeat,
            "second call with the same id must return Ok(false)"
        );

        let other = log.try_mark_processed("evt_002").await.unwrap();
        assert!(
            other,
            "different id must return Ok(true) even after an earlier id was seen"
        );
    }

    /// Two spawned tasks marking the same id must disagree: exactly one of
    /// them gets `true`.
    // NOTE(review): `#[tokio::test]` defaults to a current-thread runtime, so
    // the two tasks are interleaved rather than run in parallel — confirm
    // whether a multi-thread flavor is wanted here.
    #[tokio::test]
    async fn memory_log_concurrent_insert_applies_once() {
        let shared = Arc::new(MemoryProcessedLog::new());

        let first_task = {
            let log = Arc::clone(&shared);
            tokio::spawn(async move { log.try_mark_processed("evt_race_001").await })
        };
        let second_task =
            tokio::spawn(async move { shared.try_mark_processed("evt_race_001").await });

        let (first_join, second_join) = tokio::join!(first_task, second_task);
        // Outer unwrap: the task did not panic; inner unwrap: the call returned Ok.
        let v1 = first_join.unwrap().unwrap();
        let v2 = second_join.unwrap().unwrap();
        assert_ne!(
            v1,
            v2,
            "concurrent inserts must apply exactly once: one Ok(true), one Ok(false), got ({v1}, {v2})"
        );
    }
}