Skip to main content

axon/
idempotency.rs

1//! §Fase 32.f — Idempotency-Key store for first-class axonendpoint routes.
2//!
3//! Stripe-compatible Idempotency-Key semantics for POST/PUT routes,
4//! the banking-grade primitive that makes safe client retries possible
5//! on flaky networks. The invariant ratified in D7:
6//!
7//!   **same_key + same_body ⟹ same_response** (within retention window).
8//!
9//! ## D7 truth table (per plan vivo §7.2)
10//!
11//! | request key | endpoint method | cache state | response                                |
12//! |-------------|-----------------|-------------|-----------------------------------------|
13//! | absent      | any             | n/a         | normal execute (no caching)             |
14//! | present     | POST or PUT     | miss        | execute + cache + 200 (or original)     |
15//! | present     | POST or PUT     | hit, same   | byte-identical cached body + Idempotency-Status: replayed |
16//! | present     | POST or PUT     | hit, differ | 422 `idempotency_key_reused_with_different_request` |
17//! | present     | GET or DELETE   | n/a         | key ignored (logged); HTTP-spec idempotent natively |
18//!
19//! ## Cross-tenant isolation
20//!
21//! Cache key = `(client_id, endpoint_path, idempotency_key)`. Two
22//! tenants cannot collide on the same Idempotency-Key because the
23//! `client_id` (from auth bearer or `"anonymous"` fallback) namespaces
24//! the entry. This honors PCI DSS Req 8 (account-level segregation)
25//! and SOC 2 CC6 (logical access controls).
26//!
27//! ## Retention
28//!
29//! Default 24h sliding window per Stripe / Plaid convention. Entries
30//! older than the window are evicted lazily on lookup; a periodic
31//! reaper (`reap_expired`) is exposed so a server task can run it
32//! out-of-band.
33//!
34//! ## Pillar trace per D12
35//!
36//! - **MATHEMATICS** — the cache is a partial function with retention:
37//!   `lookup : (client_id, path, key, body_hash, now) → Option<Response>`.
38//!   Single-valued for every input; the body_hash check forbids
39//!   silent body drift collapsing two distinct requests into one cached
40//!   response.
41//! - **LOGIC** — `same_key + same_body ⟹ same_response` invariant
42//!   provably preserved when the cached response is returned verbatim
43//!   (status + headers + body cloned byte-for-byte).
44//! - **PHILOSOPHY** — the language honors the industry standard verbatim:
45//!   Stripe / Plaid / Square clients work unchanged when pointed at
46//!   axon endpoints.
47//! - **COMPUTING** — D9 backwards-compat absolute: requests without
48//!   the header AND endpoints without `method: POST|PUT` are unaffected;
49//!   no client behavior changes.
50
51use std::collections::HashMap;
52use std::time::{Duration, Instant};
53
54use sha2::{Digest, Sha256};
55
56/// Default retention window per Stripe / Plaid convention.
57pub const DEFAULT_RETENTION: Duration = Duration::from_secs(24 * 60 * 60);
58
59/// One cached response entry. Holds enough metadata to project the
60/// original wire response back verbatim (status, body, content-type)
61/// AND to detect body drift for the same key (request_body_hash).
62#[derive(Debug, Clone)]
63pub struct IdempotencyEntry {
64    /// SHA-256 of the canonicalized request body. Used to detect
65    /// "same key, different body" → 422.
66    pub request_body_hash: [u8; 32],
67    /// HTTP status code of the cached response.
68    pub status: u16,
69    /// Content-Type header of the cached response (preserved verbatim
70    /// so the replay matches the original wire format — JSON, SSE,
71    /// ndjson, etc.).
72    pub content_type: String,
73    /// Cached response body bytes.
74    pub body: Vec<u8>,
75    /// When this entry was inserted. Used by the retention sweep to
76    /// evict entries older than the configured window.
77    pub inserted_at: Instant,
78}
79
80/// Composite key namespacing each entry by client + endpoint + key.
81/// Cross-tenant isolation is a property of this struct's identity:
82/// two clients cannot collide on the same Idempotency-Key value
83/// because their `client_id` prefixes differ.
84#[derive(Debug, Clone, PartialEq, Eq, Hash)]
85pub struct IdempotencyCacheKey {
86    pub client_id: String,
87    pub endpoint_path: String,
88    pub idempotency_key: String,
89}
90
91/// Result of a cache lookup. Total enum — every input either misses,
92/// hits with a matching body (→ replay), or hits with a different
93/// body for the same key (→ 422 conflict).
94#[derive(Debug, Clone)]
95pub enum IdempotencyVerdict {
96    Miss,
97    Hit(IdempotencyEntry),
98    Conflict {
99        /// The conflict diagnostic surfaces the cached body's hash
100        /// (hex prefix) so the adopter can correlate the failing
101        /// request with whatever the original request body was.
102        cached_body_hash_hex: String,
103    },
104}
105
106/// In-memory Idempotency-Key store. Bounded by capacity (default
107/// 10_000 entries — generous for the high-traffic banking POST case);
108/// once full, the oldest entry is evicted on insert.
109#[derive(Debug)]
110pub struct IdempotencyStore {
111    entries: HashMap<IdempotencyCacheKey, IdempotencyEntry>,
112    capacity: usize,
113    retention: Duration,
114}
115
116impl Default for IdempotencyStore {
117    fn default() -> Self {
118        Self::new(10_000, DEFAULT_RETENTION)
119    }
120}
121
122impl IdempotencyStore {
123    pub fn new(capacity: usize, retention: Duration) -> Self {
124        Self {
125            entries: HashMap::new(),
126            capacity,
127            retention,
128        }
129    }
130
131    pub fn len(&self) -> usize {
132        self.entries.len()
133    }
134
135    pub fn is_empty(&self) -> bool {
136        self.entries.is_empty()
137    }
138
139    /// Hex-encode the first 8 bytes of a SHA-256 digest. Enough
140    /// entropy for adopter-side correlation, doesn't leak the full
141    /// hash (defense-in-depth).
142    pub fn hash_prefix_hex(hash: &[u8; 32]) -> String {
143        let mut s = String::with_capacity(16);
144        for byte in &hash[..8] {
145            s.push_str(&format!("{byte:02x}"));
146        }
147        s
148    }
149
150    /// Compute the canonical body hash. We hash the raw bytes the
151    /// client sent — adopters submitting JSON with whitespace
152    /// differences will hash DIFFERENTLY, which is the safer default
153    /// (the client must canonicalize on its side if it wants
154    /// semantic equality). Matches Stripe's behavior.
155    pub fn hash_body(body: &[u8]) -> [u8; 32] {
156        let mut h = Sha256::new();
157        h.update(body);
158        h.finalize().into()
159    }
160
161    /// Look up a cached entry. Three-way verdict:
162    ///   - Miss: no entry for this key.
163    ///   - Hit(entry): entry found, body hash matches — replay.
164    ///   - Conflict: entry found, body hash MISMATCH — return 422.
165    /// Expired entries are evicted lazily and reported as Miss.
166    pub fn lookup(
167        &mut self,
168        key: &IdempotencyCacheKey,
169        request_body_hash: &[u8; 32],
170    ) -> IdempotencyVerdict {
171        let now = Instant::now();
172        let entry = match self.entries.get(key) {
173            Some(e) => e.clone(),
174            None => return IdempotencyVerdict::Miss,
175        };
176        if now.duration_since(entry.inserted_at) > self.retention {
177            self.entries.remove(key);
178            return IdempotencyVerdict::Miss;
179        }
180        if &entry.request_body_hash == request_body_hash {
181            IdempotencyVerdict::Hit(entry)
182        } else {
183            IdempotencyVerdict::Conflict {
184                cached_body_hash_hex: Self::hash_prefix_hex(&entry.request_body_hash),
185            }
186        }
187    }
188
189    /// Insert (or overwrite) an entry. Caller is responsible for
190    /// only caching successful responses (the gate in
191    /// `dynamic_endpoint_handler` only caches 2xx — preserving the
192    /// semantic that retries genuinely retry execution on failure).
193    pub fn insert(&mut self, key: IdempotencyCacheKey, entry: IdempotencyEntry) {
194        // Evict if at capacity (oldest entry first).
195        if self.entries.len() >= self.capacity && !self.entries.contains_key(&key) {
196            // Find the oldest entry. Linear scan is acceptable at
197            // the default capacity (10k); for larger stores a BTree
198            // by insertion-time would replace this.
199            if let Some(oldest_key) = self
200                .entries
201                .iter()
202                .min_by_key(|(_, e)| e.inserted_at)
203                .map(|(k, _)| k.clone())
204            {
205                self.entries.remove(&oldest_key);
206            }
207        }
208        self.entries.insert(key, entry);
209    }
210
211    /// Sweep expired entries. Returns the number reaped. Intended to
212    /// be called periodically by a server task to bound memory.
213    pub fn reap_expired(&mut self) -> usize {
214        let now = Instant::now();
215        let before = self.entries.len();
216        let retention = self.retention;
217        self.entries
218            .retain(|_, e| now.duration_since(e.inserted_at) <= retention);
219        before - self.entries.len()
220    }
221
222    /// Reconfigure retention (for tests + per-endpoint future tuning).
223    pub fn set_retention(&mut self, retention: Duration) {
224        self.retention = retention;
225    }
226}
227
228#[cfg(test)]
229mod tests {
230    use super::*;
231
232    fn key(c: &str, p: &str, k: &str) -> IdempotencyCacheKey {
233        IdempotencyCacheKey {
234            client_id: c.to_string(),
235            endpoint_path: p.to_string(),
236            idempotency_key: k.to_string(),
237        }
238    }
239
240    fn entry(body: &str, status: u16) -> (IdempotencyEntry, [u8; 32]) {
241        let body_bytes = body.as_bytes().to_vec();
242        let hash = IdempotencyStore::hash_body(&body_bytes);
243        (
244            IdempotencyEntry {
245                request_body_hash: hash,
246                status,
247                content_type: "application/json".to_string(),
248                body: body_bytes,
249                inserted_at: Instant::now(),
250            },
251            hash,
252        )
253    }
254
255    #[test]
256    fn miss_on_empty_store() {
257        let mut s = IdempotencyStore::default();
258        let h = IdempotencyStore::hash_body(b"{}");
259        assert!(matches!(
260            s.lookup(&key("c1", "/p", "k1"), &h),
261            IdempotencyVerdict::Miss
262        ));
263    }
264
265    #[test]
266    fn hit_on_same_key_and_body() {
267        let mut s = IdempotencyStore::default();
268        let (e, h) = entry("{\"amount\":42}", 200);
269        s.insert(key("c1", "/p", "k1"), e.clone());
270        let verdict = s.lookup(&key("c1", "/p", "k1"), &h);
271        match verdict {
272            IdempotencyVerdict::Hit(got) => {
273                assert_eq!(got.status, 200);
274                assert_eq!(got.body, e.body);
275            }
276            _ => panic!("expected Hit"),
277        }
278    }
279
280    #[test]
281    fn conflict_on_same_key_different_body() {
282        let mut s = IdempotencyStore::default();
283        let (e, _h) = entry("{\"amount\":42}", 200);
284        s.insert(key("c1", "/p", "k1"), e);
285        let h_other = IdempotencyStore::hash_body(b"{\"amount\":99}");
286        match s.lookup(&key("c1", "/p", "k1"), &h_other) {
287            IdempotencyVerdict::Conflict { cached_body_hash_hex } => {
288                assert_eq!(cached_body_hash_hex.len(), 16);
289            }
290            _ => panic!("expected Conflict"),
291        }
292    }
293
294    #[test]
295    fn cross_tenant_isolation() {
296        let mut s = IdempotencyStore::default();
297        let (e, h) = entry("{\"x\":1}", 200);
298        s.insert(key("c1", "/p", "k1"), e);
299        // Different client_id, same key — must be a miss.
300        assert!(matches!(
301            s.lookup(&key("c2", "/p", "k1"), &h),
302            IdempotencyVerdict::Miss
303        ));
304        // Different path, same key — must be a miss.
305        assert!(matches!(
306            s.lookup(&key("c1", "/other", "k1"), &h),
307            IdempotencyVerdict::Miss
308        ));
309    }
310
311    #[test]
312    fn retention_expiry_evicts_old_entry() {
313        let mut s = IdempotencyStore::new(10, Duration::from_millis(0));
314        let (e, h) = entry("{}", 200);
315        s.insert(key("c1", "/p", "k1"), e);
316        std::thread::sleep(Duration::from_millis(2));
317        assert!(matches!(
318            s.lookup(&key("c1", "/p", "k1"), &h),
319            IdempotencyVerdict::Miss
320        ));
321        // Eviction happens during lookup — store should be empty.
322        assert_eq!(s.len(), 0);
323    }
324
325    #[test]
326    fn reap_expired_returns_count() {
327        let mut s = IdempotencyStore::new(10, Duration::from_millis(0));
328        let (e1, _) = entry("{\"a\":1}", 200);
329        let (e2, _) = entry("{\"a\":2}", 200);
330        s.insert(key("c1", "/p", "k1"), e1);
331        s.insert(key("c1", "/p", "k2"), e2);
332        assert_eq!(s.len(), 2);
333        std::thread::sleep(Duration::from_millis(2));
334        assert_eq!(s.reap_expired(), 2);
335        assert_eq!(s.len(), 0);
336    }
337
338    #[test]
339    fn capacity_eviction_drops_oldest_on_overflow() {
340        let mut s = IdempotencyStore::new(2, DEFAULT_RETENTION);
341        let (e1, h1) = entry("{\"a\":1}", 200);
342        s.insert(key("c1", "/p", "k1"), e1);
343        std::thread::sleep(Duration::from_millis(1));
344        let (e2, _) = entry("{\"a\":2}", 200);
345        s.insert(key("c1", "/p", "k2"), e2);
346        std::thread::sleep(Duration::from_millis(1));
347        let (e3, _) = entry("{\"a\":3}", 200);
348        s.insert(key("c1", "/p", "k3"), e3);
349        assert_eq!(s.len(), 2);
350        // k1 (oldest) was evicted.
351        assert!(matches!(
352            s.lookup(&key("c1", "/p", "k1"), &h1),
353            IdempotencyVerdict::Miss
354        ));
355    }
356
357    #[test]
358    fn hash_prefix_hex_is_16_chars_lowercase() {
359        let h = IdempotencyStore::hash_body(b"hello");
360        let prefix = IdempotencyStore::hash_prefix_hex(&h);
361        assert_eq!(prefix.len(), 16);
362        for c in prefix.chars() {
363            assert!(c.is_ascii_hexdigit() && !c.is_ascii_uppercase());
364        }
365    }
366
367    #[test]
368    fn hash_body_deterministic() {
369        // Same bytes ⟹ same hash. The fundamental invariant.
370        let a = IdempotencyStore::hash_body(b"{\"x\":1}");
371        let b = IdempotencyStore::hash_body(b"{\"x\":1}");
372        assert_eq!(a, b);
373    }
374
375    #[test]
376    fn hash_body_sensitive_to_whitespace() {
377        // Whitespace differences hash differently — adopters who want
378        // semantic equality must canonicalize on the client. Matches
379        // Stripe's documented behavior.
380        let a = IdempotencyStore::hash_body(b"{\"x\":1}");
381        let b = IdempotencyStore::hash_body(b"{ \"x\": 1 }");
382        assert_ne!(a, b);
383    }
384}