reasonkit_web/stripe/
idempotency.rs1use std::collections::HashMap;
21use std::sync::Arc;
22use std::time::{Duration, Instant};
23
24use tokio::sync::RwLock;
25
26use crate::stripe::error::{StripeWebhookError, StripeWebhookResult};
27
28#[derive(Debug, Clone, PartialEq, Eq)]
30pub enum ProcessingStatus {
31 InProgress,
33 Completed,
35 Failed { error: String },
37}
38
39#[derive(Debug, Clone)]
41pub struct IdempotencyEntry {
42 pub event_id: String,
44 pub status: ProcessingStatus,
46 pub received_at: Instant,
48 pub updated_at: Instant,
50 pub attempts: u32,
52}
53
54#[async_trait::async_trait]
56pub trait IdempotencyStore: Send + Sync + 'static {
57 async fn check_and_record(&self, event_id: &str) -> StripeWebhookResult<bool>;
65
66 async fn mark_completed(&self, event_id: &str) -> StripeWebhookResult<()>;
68
69 async fn mark_failed(&self, event_id: &str, error: &str) -> StripeWebhookResult<()>;
71
72 async fn get_status(&self, event_id: &str) -> StripeWebhookResult<Option<IdempotencyEntry>>;
74
75 async fn cleanup(&self) -> StripeWebhookResult<usize>;
77}
78
79pub struct InMemoryIdempotencyStore {
84 entries: Arc<RwLock<HashMap<String, IdempotencyEntry>>>,
86 ttl: Duration,
88 max_entries: usize,
90}
91
92impl InMemoryIdempotencyStore {
93 pub fn new(ttl: Duration, max_entries: usize) -> Self {
95 Self {
96 entries: Arc::new(RwLock::new(HashMap::new())),
97 ttl,
98 max_entries,
99 }
100 }
101
102 pub fn from_config(config: &crate::stripe::config::StripeWebhookConfig) -> Self {
104 Self::new(config.idempotency_ttl, config.idempotency_max_entries)
105 }
106
107 fn is_expired(&self, entry: &IdempotencyEntry) -> bool {
109 entry.received_at.elapsed() > self.ttl
110 }
111
112 pub async fn len(&self) -> usize {
114 self.entries.read().await.len()
115 }
116
117 pub async fn is_empty(&self) -> bool {
119 self.entries.read().await.is_empty()
120 }
121}
122
123#[async_trait::async_trait]
124impl IdempotencyStore for InMemoryIdempotencyStore {
125 async fn check_and_record(&self, event_id: &str) -> StripeWebhookResult<bool> {
126 let mut entries = self.entries.write().await;
127 let now = Instant::now();
128
129 if let Some(existing) = entries.get(event_id) {
131 if !self.is_expired(existing) {
132 tracing::debug!(
133 event_id,
134 status = ?existing.status,
135 "Event already in idempotency store"
136 );
137 return Ok(false);
138 }
139 entries.remove(event_id);
141 }
142
143 if entries.len() >= self.max_entries {
145 let mut to_remove: Vec<String> = entries
147 .iter()
148 .filter(|(_, entry)| self.is_expired(entry))
149 .map(|(id, _)| id.clone())
150 .collect();
151
152 if to_remove.len() < entries.len() / 10 {
154 let mut by_age: Vec<_> = entries.iter().collect();
155 by_age.sort_by_key(|(_, entry)| entry.received_at);
156 to_remove.extend(
157 by_age
158 .iter()
159 .take(entries.len() / 10)
160 .map(|(id, _)| (*id).clone()),
161 );
162 }
163
164 for id in to_remove {
165 entries.remove(&id);
166 }
167
168 tracing::info!(
169 remaining = entries.len(),
170 max = self.max_entries,
171 "Evicted old idempotency entries"
172 );
173 }
174
175 entries.insert(
177 event_id.to_string(),
178 IdempotencyEntry {
179 event_id: event_id.to_string(),
180 status: ProcessingStatus::InProgress,
181 received_at: now,
182 updated_at: now,
183 attempts: 1,
184 },
185 );
186
187 tracing::debug!(event_id, "New event recorded in idempotency store");
188 Ok(true)
189 }
190
191 async fn mark_completed(&self, event_id: &str) -> StripeWebhookResult<()> {
192 let mut entries = self.entries.write().await;
193
194 if let Some(entry) = entries.get_mut(event_id) {
195 entry.status = ProcessingStatus::Completed;
196 entry.updated_at = Instant::now();
197 tracing::debug!(event_id, "Event marked as completed");
198 } else {
199 tracing::warn!(
200 event_id,
201 "Attempted to mark non-existent event as completed"
202 );
203 }
204
205 Ok(())
206 }
207
208 async fn mark_failed(&self, event_id: &str, error: &str) -> StripeWebhookResult<()> {
209 let mut entries = self.entries.write().await;
210
211 if let Some(entry) = entries.get_mut(event_id) {
212 entry.status = ProcessingStatus::Failed {
213 error: error.to_string(),
214 };
215 entry.updated_at = Instant::now();
216 tracing::debug!(event_id, error, "Event marked as failed");
217 } else {
218 tracing::warn!(event_id, "Attempted to mark non-existent event as failed");
219 }
220
221 Ok(())
222 }
223
224 async fn get_status(&self, event_id: &str) -> StripeWebhookResult<Option<IdempotencyEntry>> {
225 let entries = self.entries.read().await;
226
227 if let Some(entry) = entries.get(event_id) {
228 if self.is_expired(entry) {
229 return Ok(None);
230 }
231 return Ok(Some(entry.clone()));
232 }
233
234 Ok(None)
235 }
236
237 async fn cleanup(&self) -> StripeWebhookResult<usize> {
238 let mut entries = self.entries.write().await;
239 let before = entries.len();
240
241 entries.retain(|_, entry| !self.is_expired(entry));
242
243 let removed = before - entries.len();
244 if removed > 0 {
245 tracing::info!(
246 removed,
247 remaining = entries.len(),
248 "Cleaned up expired idempotency entries"
249 );
250 }
251
252 Ok(removed)
253 }
254}
255
256pub struct IdempotencyMiddleware<S: IdempotencyStore> {
258 store: Arc<S>,
259}
260
261impl<S: IdempotencyStore> IdempotencyMiddleware<S> {
262 pub fn new(store: Arc<S>) -> Self {
264 Self { store }
265 }
266
267 pub async fn should_process(&self, event_id: &str) -> StripeWebhookResult<bool> {
275 match self.store.check_and_record(event_id).await {
276 Ok(true) => Ok(true),
277 Ok(false) => {
278 Err(StripeWebhookError::AlreadyProcessed {
280 event_id: event_id.to_string(),
281 })
282 }
283 Err(e) => Err(e),
284 }
285 }
286
287 pub async fn complete(&self, event_id: &str) -> StripeWebhookResult<()> {
289 self.store.mark_completed(event_id).await
290 }
291
292 pub async fn fail(&self, event_id: &str, error: &str) -> StripeWebhookResult<()> {
294 self.store.mark_failed(event_id, error).await
295 }
296}
297
298#[cfg(test)]
299mod tests {
300 use super::*;
301
302 #[tokio::test]
303 async fn test_check_and_record_new_event() {
304 let store = InMemoryIdempotencyStore::new(Duration::from_secs(3600), 1000);
305
306 let result = store.check_and_record("evt_123").await.unwrap();
307 assert!(result); let result = store.check_and_record("evt_123").await.unwrap();
310 assert!(!result); }
312
313 #[tokio::test]
314 async fn test_check_and_record_different_events() {
315 let store = InMemoryIdempotencyStore::new(Duration::from_secs(3600), 1000);
316
317 assert!(store.check_and_record("evt_1").await.unwrap());
318 assert!(store.check_and_record("evt_2").await.unwrap());
319 assert!(store.check_and_record("evt_3").await.unwrap());
320
321 assert!(!store.check_and_record("evt_1").await.unwrap());
322 assert!(!store.check_and_record("evt_2").await.unwrap());
323 }
324
325 #[tokio::test]
326 async fn test_mark_completed() {
327 let store = InMemoryIdempotencyStore::new(Duration::from_secs(3600), 1000);
328
329 store.check_and_record("evt_123").await.unwrap();
330 store.mark_completed("evt_123").await.unwrap();
331
332 let entry = store.get_status("evt_123").await.unwrap().unwrap();
333 assert_eq!(entry.status, ProcessingStatus::Completed);
334 }
335
336 #[tokio::test]
337 async fn test_mark_failed() {
338 let store = InMemoryIdempotencyStore::new(Duration::from_secs(3600), 1000);
339
340 store.check_and_record("evt_123").await.unwrap();
341 store
342 .mark_failed("evt_123", "Database error")
343 .await
344 .unwrap();
345
346 let entry = store.get_status("evt_123").await.unwrap().unwrap();
347 assert!(matches!(entry.status, ProcessingStatus::Failed { .. }));
348 }
349
350 #[tokio::test]
351 async fn test_expired_entries() {
352 let store = InMemoryIdempotencyStore::new(Duration::from_millis(10), 1000);
353
354 store.check_and_record("evt_123").await.unwrap();
355
356 tokio::time::sleep(Duration::from_millis(20)).await;
358
359 assert!(store.check_and_record("evt_123").await.unwrap());
361 }
362
363 #[tokio::test]
364 async fn test_cleanup() {
365 let store = InMemoryIdempotencyStore::new(Duration::from_millis(10), 1000);
366
367 store.check_and_record("evt_1").await.unwrap();
368 store.check_and_record("evt_2").await.unwrap();
369
370 tokio::time::sleep(Duration::from_millis(20)).await;
372
373 let removed = store.cleanup().await.unwrap();
374 assert_eq!(removed, 2);
375 assert!(store.is_empty().await);
376 }
377
378 #[tokio::test]
379 async fn test_max_entries_eviction() {
380 let store = InMemoryIdempotencyStore::new(Duration::from_secs(3600), 10);
381
382 for i in 0..15 {
384 store.check_and_record(&format!("evt_{}", i)).await.unwrap();
385 }
386
387 assert!(store.len().await <= 15);
389 }
390
391 #[tokio::test]
392 async fn test_idempotency_middleware() {
393 let store = Arc::new(InMemoryIdempotencyStore::new(
394 Duration::from_secs(3600),
395 1000,
396 ));
397 let middleware = IdempotencyMiddleware::new(store);
398
399 assert!(middleware.should_process("evt_123").await.is_ok());
401
402 let result = middleware.should_process("evt_123").await;
404 assert!(matches!(
405 result,
406 Err(StripeWebhookError::AlreadyProcessed { .. })
407 ));
408
409 middleware.complete("evt_123").await.unwrap();
411 }
412}