Skip to main content

ralph_api/
idempotency.rs

1use std::collections::HashMap;
2use std::sync::{Arc, Mutex};
3use std::time::{Duration, Instant};
4
5use serde_json::Value;
6
7#[derive(Debug, Clone)]
8pub struct StoredResponse {
9    pub status: u16,
10    pub envelope: Value,
11}
12
13#[derive(Debug, Clone)]
14pub enum IdempotencyCheck {
15    New,
16    Replay(StoredResponse),
17    Conflict,
18}
19
20pub trait IdempotencyStore: Send + Sync {
21    fn check(&self, method: &str, key: &str, params: &Value) -> IdempotencyCheck;
22    fn store(&self, method: &str, key: &str, params: &Value, response: &StoredResponse);
23}
24
25#[derive(Debug, Clone)]
26pub struct InMemoryIdempotencyStore {
27    entries: Arc<Mutex<HashMap<String, Entry>>>,
28    ttl: Duration,
29}
30
31#[derive(Debug, Clone)]
32struct Entry {
33    params: Value,
34    response: StoredResponse,
35    created_at: Instant,
36}
37
38impl InMemoryIdempotencyStore {
39    pub fn new(ttl: Duration) -> Self {
40        Self {
41            entries: Arc::new(Mutex::new(HashMap::new())),
42            ttl,
43        }
44    }
45
46    fn cleanup(&self, entries: &mut HashMap<String, Entry>) {
47        let ttl = self.ttl;
48        entries.retain(|_, entry| entry.created_at.elapsed() <= ttl);
49    }
50
51    fn make_key(method: &str, key: &str) -> String {
52        format!("{method}:{key}")
53    }
54}
55
56impl IdempotencyStore for InMemoryIdempotencyStore {
57    fn check(&self, method: &str, key: &str, params: &Value) -> IdempotencyCheck {
58        let mut guard = self
59            .entries
60            .lock()
61            .expect("idempotency store mutex should not be poisoned");
62        self.cleanup(&mut guard);
63
64        let store_key = Self::make_key(method, key);
65        match guard.get(&store_key) {
66            None => IdempotencyCheck::New,
67            Some(entry) if entry.params == *params => {
68                IdempotencyCheck::Replay(entry.response.clone())
69            }
70            Some(_) => IdempotencyCheck::Conflict,
71        }
72    }
73
74    fn store(&self, method: &str, key: &str, params: &Value, response: &StoredResponse) {
75        let mut guard = self
76            .entries
77            .lock()
78            .expect("idempotency store mutex should not be poisoned");
79        self.cleanup(&mut guard);
80        guard.insert(
81            Self::make_key(method, key),
82            Entry {
83                params: params.clone(),
84                response: response.clone(),
85                created_at: Instant::now(),
86            },
87        );
88    }
89}
90
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use serde_json::json;

    use super::{IdempotencyCheck, IdempotencyStore, InMemoryIdempotencyStore, StoredResponse};

    /// Canned successful response shared by the tests.
    fn ok_response() -> StoredResponse {
        StoredResponse {
            status: 200,
            envelope: json!({ "ok": true }),
        }
    }

    #[test]
    fn replays_same_method_key_and_params() {
        let store = InMemoryIdempotencyStore::new(Duration::from_secs(60));
        let params = json!({ "value": 1 });
        let response = ok_response();

        assert!(matches!(
            store.check("task.create", "idem-1", &params),
            IdempotencyCheck::New
        ));

        store.store("task.create", "idem-1", &params, &response);

        match store.check("task.create", "idem-1", &params) {
            IdempotencyCheck::Replay(actual) => {
                // Both parts of the stored response must survive the round trip.
                assert_eq!(actual.status, response.status);
                assert_eq!(actual.envelope, response.envelope);
            }
            other => panic!("expected replay, got {other:?}"),
        }
    }

    #[test]
    fn detects_conflict_for_same_key_with_different_params() {
        let store = InMemoryIdempotencyStore::new(Duration::from_secs(60));
        store.store(
            "task.create",
            "idem-2",
            &json!({ "value": 1 }),
            &ok_response(),
        );

        assert!(matches!(
            store.check("task.create", "idem-2", &json!({ "value": 2 })),
            IdempotencyCheck::Conflict
        ));
    }

    #[test]
    fn scopes_keys_by_method() {
        let store = InMemoryIdempotencyStore::new(Duration::from_secs(60));
        let params = json!({ "value": 1 });
        store.store("task.create", "idem-3", &params, &ok_response());

        // The same idempotency key under another method is a separate request.
        assert!(matches!(
            store.check("task.delete", "idem-3", &params),
            IdempotencyCheck::New
        ));
    }
}