1use std::collections::HashMap;
2use std::sync::{Arc, Mutex};
3use std::time::{Duration, Instant};
4
5use serde_json::Value;
6
7#[derive(Debug, Clone)]
8pub struct StoredResponse {
9 pub status: u16,
10 pub envelope: Value,
11}
12
13#[derive(Debug, Clone)]
14pub enum IdempotencyCheck {
15 New,
16 Replay(StoredResponse),
17 Conflict,
18}
19
20pub trait IdempotencyStore: Send + Sync {
21 fn check(&self, method: &str, key: &str, params: &Value) -> IdempotencyCheck;
22 fn store(&self, method: &str, key: &str, params: &Value, response: &StoredResponse);
23}
24
25#[derive(Debug, Clone)]
26pub struct InMemoryIdempotencyStore {
27 entries: Arc<Mutex<HashMap<String, Entry>>>,
28 ttl: Duration,
29}
30
31#[derive(Debug, Clone)]
32struct Entry {
33 params: Value,
34 response: StoredResponse,
35 created_at: Instant,
36}
37
38impl InMemoryIdempotencyStore {
39 pub fn new(ttl: Duration) -> Self {
40 Self {
41 entries: Arc::new(Mutex::new(HashMap::new())),
42 ttl,
43 }
44 }
45
46 fn cleanup(&self, entries: &mut HashMap<String, Entry>) {
47 let ttl = self.ttl;
48 entries.retain(|_, entry| entry.created_at.elapsed() <= ttl);
49 }
50
51 fn make_key(method: &str, key: &str) -> String {
52 format!("{method}:{key}")
53 }
54}
55
56impl IdempotencyStore for InMemoryIdempotencyStore {
57 fn check(&self, method: &str, key: &str, params: &Value) -> IdempotencyCheck {
58 let mut guard = self
59 .entries
60 .lock()
61 .expect("idempotency store mutex should not be poisoned");
62 self.cleanup(&mut guard);
63
64 let store_key = Self::make_key(method, key);
65 match guard.get(&store_key) {
66 None => IdempotencyCheck::New,
67 Some(entry) if entry.params == *params => {
68 IdempotencyCheck::Replay(entry.response.clone())
69 }
70 Some(_) => IdempotencyCheck::Conflict,
71 }
72 }
73
74 fn store(&self, method: &str, key: &str, params: &Value, response: &StoredResponse) {
75 let mut guard = self
76 .entries
77 .lock()
78 .expect("idempotency store mutex should not be poisoned");
79 self.cleanup(&mut guard);
80 guard.insert(
81 Self::make_key(method, key),
82 Entry {
83 params: params.clone(),
84 response: response.clone(),
85 created_at: Instant::now(),
86 },
87 );
88 }
89}
90
91#[cfg(test)]
92mod tests {
93 use std::time::Duration;
94
95 use serde_json::json;
96
97 use super::{IdempotencyCheck, IdempotencyStore, InMemoryIdempotencyStore, StoredResponse};
98
99 #[test]
100 fn replays_same_method_key_and_params() {
101 let store = InMemoryIdempotencyStore::new(Duration::from_secs(60));
102 let params = json!({ "value": 1 });
103 let response = StoredResponse {
104 status: 200,
105 envelope: json!({ "ok": true }),
106 };
107
108 assert!(matches!(
109 store.check("task.create", "idem-1", ¶ms),
110 IdempotencyCheck::New
111 ));
112
113 store.store("task.create", "idem-1", ¶ms, &response);
114
115 let replay = store.check("task.create", "idem-1", ¶ms);
116 match replay {
117 IdempotencyCheck::Replay(actual) => assert_eq!(actual.envelope, response.envelope),
118 _ => panic!("expected replay"),
119 }
120 }
121
122 #[test]
123 fn detects_conflict_for_same_key_with_different_params() {
124 let store = InMemoryIdempotencyStore::new(Duration::from_secs(60));
125 store.store(
126 "task.create",
127 "idem-2",
128 &json!({ "value": 1 }),
129 &StoredResponse {
130 status: 200,
131 envelope: json!({ "ok": true }),
132 },
133 );
134
135 assert!(matches!(
136 store.check("task.create", "idem-2", &json!({ "value": 2 })),
137 IdempotencyCheck::Conflict
138 ));
139 }
140}