reasonkit_web/stripe/
idempotency.rs

1//! Idempotency Layer for Stripe Webhooks
2//!
3//! Handles deduplication of webhook deliveries using event IDs.
4//! Stripe may deliver the same webhook multiple times due to:
5//!
6//! - Network issues causing Stripe to not receive our 2xx response
7//! - Retries due to our server returning 5xx
8//! - Stripe's at-least-once delivery guarantee
9//!
10//! # Implementation
11//!
12//! We track processed event IDs with their processing status and timestamp.
13//! Events are automatically cleaned up after the TTL expires.
14//!
15//! # Storage Options
16//!
17//! - `InMemoryIdempotencyStore`: Simple in-memory store (single instance only)
18//! - For production with multiple instances, use Redis or database backing
19
20use std::collections::HashMap;
21use std::sync::Arc;
22use std::time::{Duration, Instant};
23
24use tokio::sync::RwLock;
25
26use crate::stripe::error::{StripeWebhookError, StripeWebhookResult};
27
28/// Status of event processing
29#[derive(Debug, Clone, PartialEq, Eq)]
30pub enum ProcessingStatus {
31    /// Event is currently being processed
32    InProgress,
33    /// Event processing completed successfully
34    Completed,
35    /// Event processing failed (may be retried)
36    Failed { error: String },
37}
38
39/// Entry in the idempotency store
40#[derive(Debug, Clone)]
41pub struct IdempotencyEntry {
42    /// Event ID
43    pub event_id: String,
44    /// Processing status
45    pub status: ProcessingStatus,
46    /// When the event was first received
47    pub received_at: Instant,
48    /// When the status was last updated
49    pub updated_at: Instant,
50    /// Number of processing attempts
51    pub attempts: u32,
52}
53
54/// Trait for idempotency storage backends
55#[async_trait::async_trait]
56pub trait IdempotencyStore: Send + Sync + 'static {
57    /// Check if an event has been processed and record it if not
58    ///
59    /// # Returns
60    ///
61    /// - `Ok(true)` if this is a new event (not seen before)
62    /// - `Ok(false)` if the event was already processed or is in progress
63    /// - `Err` if the check failed
64    async fn check_and_record(&self, event_id: &str) -> StripeWebhookResult<bool>;
65
66    /// Mark an event as completed
67    async fn mark_completed(&self, event_id: &str) -> StripeWebhookResult<()>;
68
69    /// Mark an event as failed
70    async fn mark_failed(&self, event_id: &str, error: &str) -> StripeWebhookResult<()>;
71
72    /// Get the status of an event
73    async fn get_status(&self, event_id: &str) -> StripeWebhookResult<Option<IdempotencyEntry>>;
74
75    /// Clean up expired entries
76    async fn cleanup(&self) -> StripeWebhookResult<usize>;
77}
78
79/// In-memory idempotency store
80///
81/// Suitable for single-instance deployments. For multi-instance deployments,
82/// use a distributed store (Redis, database, etc.)
83pub struct InMemoryIdempotencyStore {
84    /// Event entries keyed by event ID
85    entries: Arc<RwLock<HashMap<String, IdempotencyEntry>>>,
86    /// Time-to-live for entries
87    ttl: Duration,
88    /// Maximum number of entries
89    max_entries: usize,
90}
91
92impl InMemoryIdempotencyStore {
93    /// Create a new in-memory store
94    pub fn new(ttl: Duration, max_entries: usize) -> Self {
95        Self {
96            entries: Arc::new(RwLock::new(HashMap::new())),
97            ttl,
98            max_entries,
99        }
100    }
101
102    /// Create from config
103    pub fn from_config(config: &crate::stripe::config::StripeWebhookConfig) -> Self {
104        Self::new(config.idempotency_ttl, config.idempotency_max_entries)
105    }
106
107    /// Check if entry is expired
108    fn is_expired(&self, entry: &IdempotencyEntry) -> bool {
109        entry.received_at.elapsed() > self.ttl
110    }
111
112    /// Get current entry count
113    pub async fn len(&self) -> usize {
114        self.entries.read().await.len()
115    }
116
117    /// Check if store is empty
118    pub async fn is_empty(&self) -> bool {
119        self.entries.read().await.is_empty()
120    }
121}
122
123#[async_trait::async_trait]
124impl IdempotencyStore for InMemoryIdempotencyStore {
125    async fn check_and_record(&self, event_id: &str) -> StripeWebhookResult<bool> {
126        let mut entries = self.entries.write().await;
127        let now = Instant::now();
128
129        // Check if already exists and not expired
130        if let Some(existing) = entries.get(event_id) {
131            if !self.is_expired(existing) {
132                tracing::debug!(
133                    event_id,
134                    status = ?existing.status,
135                    "Event already in idempotency store"
136                );
137                return Ok(false);
138            }
139            // Entry is expired, remove it and proceed as new
140            entries.remove(event_id);
141        }
142
143        // Check if we need to evict old entries
144        if entries.len() >= self.max_entries {
145            // Remove oldest entries
146            let mut to_remove: Vec<String> = entries
147                .iter()
148                .filter(|(_, entry)| self.is_expired(entry))
149                .map(|(id, _)| id.clone())
150                .collect();
151
152            // If not enough expired entries, remove oldest by received_at
153            if to_remove.len() < entries.len() / 10 {
154                let mut by_age: Vec<_> = entries.iter().collect();
155                by_age.sort_by_key(|(_, entry)| entry.received_at);
156                to_remove.extend(
157                    by_age
158                        .iter()
159                        .take(entries.len() / 10)
160                        .map(|(id, _)| (*id).clone()),
161                );
162            }
163
164            for id in to_remove {
165                entries.remove(&id);
166            }
167
168            tracing::info!(
169                remaining = entries.len(),
170                max = self.max_entries,
171                "Evicted old idempotency entries"
172            );
173        }
174
175        // Record new entry
176        entries.insert(
177            event_id.to_string(),
178            IdempotencyEntry {
179                event_id: event_id.to_string(),
180                status: ProcessingStatus::InProgress,
181                received_at: now,
182                updated_at: now,
183                attempts: 1,
184            },
185        );
186
187        tracing::debug!(event_id, "New event recorded in idempotency store");
188        Ok(true)
189    }
190
191    async fn mark_completed(&self, event_id: &str) -> StripeWebhookResult<()> {
192        let mut entries = self.entries.write().await;
193
194        if let Some(entry) = entries.get_mut(event_id) {
195            entry.status = ProcessingStatus::Completed;
196            entry.updated_at = Instant::now();
197            tracing::debug!(event_id, "Event marked as completed");
198        } else {
199            tracing::warn!(
200                event_id,
201                "Attempted to mark non-existent event as completed"
202            );
203        }
204
205        Ok(())
206    }
207
208    async fn mark_failed(&self, event_id: &str, error: &str) -> StripeWebhookResult<()> {
209        let mut entries = self.entries.write().await;
210
211        if let Some(entry) = entries.get_mut(event_id) {
212            entry.status = ProcessingStatus::Failed {
213                error: error.to_string(),
214            };
215            entry.updated_at = Instant::now();
216            tracing::debug!(event_id, error, "Event marked as failed");
217        } else {
218            tracing::warn!(event_id, "Attempted to mark non-existent event as failed");
219        }
220
221        Ok(())
222    }
223
224    async fn get_status(&self, event_id: &str) -> StripeWebhookResult<Option<IdempotencyEntry>> {
225        let entries = self.entries.read().await;
226
227        if let Some(entry) = entries.get(event_id) {
228            if self.is_expired(entry) {
229                return Ok(None);
230            }
231            return Ok(Some(entry.clone()));
232        }
233
234        Ok(None)
235    }
236
237    async fn cleanup(&self) -> StripeWebhookResult<usize> {
238        let mut entries = self.entries.write().await;
239        let before = entries.len();
240
241        entries.retain(|_, entry| !self.is_expired(entry));
242
243        let removed = before - entries.len();
244        if removed > 0 {
245            tracing::info!(
246                removed,
247                remaining = entries.len(),
248                "Cleaned up expired idempotency entries"
249            );
250        }
251
252        Ok(removed)
253    }
254}
255
256/// Idempotency middleware that wraps event processing
257pub struct IdempotencyMiddleware<S: IdempotencyStore> {
258    store: Arc<S>,
259}
260
261impl<S: IdempotencyStore> IdempotencyMiddleware<S> {
262    /// Create new middleware
263    pub fn new(store: Arc<S>) -> Self {
264        Self { store }
265    }
266
267    /// Check if event should be processed
268    ///
269    /// # Returns
270    ///
271    /// - `Ok(true)` if event should be processed (new event)
272    /// - `Ok(false)` if event was already processed (return 202)
273    /// - `Err` for already-processed events that need special handling
274    pub async fn should_process(&self, event_id: &str) -> StripeWebhookResult<bool> {
275        match self.store.check_and_record(event_id).await {
276            Ok(true) => Ok(true),
277            Ok(false) => {
278                // Already processed or in progress
279                Err(StripeWebhookError::AlreadyProcessed {
280                    event_id: event_id.to_string(),
281                })
282            }
283            Err(e) => Err(e),
284        }
285    }
286
287    /// Mark event processing as complete
288    pub async fn complete(&self, event_id: &str) -> StripeWebhookResult<()> {
289        self.store.mark_completed(event_id).await
290    }
291
292    /// Mark event processing as failed
293    pub async fn fail(&self, event_id: &str, error: &str) -> StripeWebhookResult<()> {
294        self.store.mark_failed(event_id, error).await
295    }
296}
297
298#[cfg(test)]
299mod tests {
300    use super::*;
301
302    #[tokio::test]
303    async fn test_check_and_record_new_event() {
304        let store = InMemoryIdempotencyStore::new(Duration::from_secs(3600), 1000);
305
306        let result = store.check_and_record("evt_123").await.unwrap();
307        assert!(result); // New event
308
309        let result = store.check_and_record("evt_123").await.unwrap();
310        assert!(!result); // Already recorded
311    }
312
313    #[tokio::test]
314    async fn test_check_and_record_different_events() {
315        let store = InMemoryIdempotencyStore::new(Duration::from_secs(3600), 1000);
316
317        assert!(store.check_and_record("evt_1").await.unwrap());
318        assert!(store.check_and_record("evt_2").await.unwrap());
319        assert!(store.check_and_record("evt_3").await.unwrap());
320
321        assert!(!store.check_and_record("evt_1").await.unwrap());
322        assert!(!store.check_and_record("evt_2").await.unwrap());
323    }
324
325    #[tokio::test]
326    async fn test_mark_completed() {
327        let store = InMemoryIdempotencyStore::new(Duration::from_secs(3600), 1000);
328
329        store.check_and_record("evt_123").await.unwrap();
330        store.mark_completed("evt_123").await.unwrap();
331
332        let entry = store.get_status("evt_123").await.unwrap().unwrap();
333        assert_eq!(entry.status, ProcessingStatus::Completed);
334    }
335
336    #[tokio::test]
337    async fn test_mark_failed() {
338        let store = InMemoryIdempotencyStore::new(Duration::from_secs(3600), 1000);
339
340        store.check_and_record("evt_123").await.unwrap();
341        store
342            .mark_failed("evt_123", "Database error")
343            .await
344            .unwrap();
345
346        let entry = store.get_status("evt_123").await.unwrap().unwrap();
347        assert!(matches!(entry.status, ProcessingStatus::Failed { .. }));
348    }
349
350    #[tokio::test]
351    async fn test_expired_entries() {
352        let store = InMemoryIdempotencyStore::new(Duration::from_millis(10), 1000);
353
354        store.check_and_record("evt_123").await.unwrap();
355
356        // Wait for expiry
357        tokio::time::sleep(Duration::from_millis(20)).await;
358
359        // Should be able to record again after expiry
360        assert!(store.check_and_record("evt_123").await.unwrap());
361    }
362
363    #[tokio::test]
364    async fn test_cleanup() {
365        let store = InMemoryIdempotencyStore::new(Duration::from_millis(10), 1000);
366
367        store.check_and_record("evt_1").await.unwrap();
368        store.check_and_record("evt_2").await.unwrap();
369
370        // Wait for expiry
371        tokio::time::sleep(Duration::from_millis(20)).await;
372
373        let removed = store.cleanup().await.unwrap();
374        assert_eq!(removed, 2);
375        assert!(store.is_empty().await);
376    }
377
378    #[tokio::test]
379    async fn test_max_entries_eviction() {
380        let store = InMemoryIdempotencyStore::new(Duration::from_secs(3600), 10);
381
382        // Fill up the store
383        for i in 0..15 {
384            store.check_and_record(&format!("evt_{}", i)).await.unwrap();
385        }
386
387        // Should have evicted some entries
388        assert!(store.len().await <= 15);
389    }
390
391    #[tokio::test]
392    async fn test_idempotency_middleware() {
393        let store = Arc::new(InMemoryIdempotencyStore::new(
394            Duration::from_secs(3600),
395            1000,
396        ));
397        let middleware = IdempotencyMiddleware::new(store);
398
399        // First call should succeed
400        assert!(middleware.should_process("evt_123").await.is_ok());
401
402        // Second call should return AlreadyProcessed error
403        let result = middleware.should_process("evt_123").await;
404        assert!(matches!(
405            result,
406            Err(StripeWebhookError::AlreadyProcessed { .. })
407        ));
408
409        // Mark complete
410        middleware.complete("evt_123").await.unwrap();
411    }
412}