axon/idempotency.rs
1//! §Fase 32.f — Idempotency-Key store for first-class axonendpoint routes.
2//!
3//! Stripe-compatible Idempotency-Key semantics for POST/PUT routes,
4//! the banking-grade primitive that makes safe client retries possible
5//! on flaky networks. The invariant ratified in D7:
6//!
7//! **same_key + same_body ⟹ same_response** (within retention window).
8//!
9//! ## D7 truth table (per plan vivo §7.2)
10//!
11//! | request key | endpoint method | cache state | response |
12//! |-------------|-----------------|-------------|-----------------------------------------|
13//! | absent | any | n/a | normal execute (no caching) |
14//! | present | POST or PUT | miss | execute + cache + 200 (or original) |
15//! | present | POST or PUT | hit, same | byte-identical cached body + Idempotency-Status: replayed |
16//! | present | POST or PUT | hit, differ | 422 `idempotency_key_reused_with_different_request` |
17//! | present | GET or DELETE | n/a | key ignored (logged); HTTP-spec idempotent natively |
18//!
19//! ## Cross-tenant isolation
20//!
21//! Cache key = `(client_id, endpoint_path, idempotency_key)`. Two
22//! tenants cannot collide on the same Idempotency-Key because the
23//! `client_id` (from auth bearer or `"anonymous"` fallback) namespaces
24//! the entry. This honors PCI DSS Req 8 (account-level segregation)
25//! and SOC 2 CC6 (logical access controls).
26//!
27//! ## Retention
28//!
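//! Default 24h retention window per Stripe / Plaid convention, measured
//! from each entry's `inserted_at` timestamp (a replay does not refresh
//! it, so the window is fixed rather than sliding). Entries older than
//! the window are evicted lazily on lookup; a reaper (`reap_expired`)
//! is exposed so a server task can also run it periodically,
//! out-of-band.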
33//!
34//! ## Pillar trace per D12
35//!
36//! - **MATHEMATICS** — the cache is a partial function with retention:
37//! `lookup : (client_id, path, key, body_hash, now) → Option<Response>`.
38//! Single-valued for every input; the body_hash check forbids
39//! silent body drift collapsing two distinct requests into one cached
40//! response.
41//! - **LOGIC** — `same_key + same_body ⟹ same_response` invariant
42//! provably preserved when the cached response is returned verbatim
43//! (status + headers + body cloned byte-for-byte).
44//! - **PHILOSOPHY** — the language honors the industry standard verbatim:
45//! Stripe / Plaid / Square clients work unchanged when pointed at
46//! axon endpoints.
47//! - **COMPUTING** — D9 backwards-compat absolute: requests without
48//! the header AND endpoints without `method: POST|PUT` are unaffected;
49//! no client behavior changes.
50
51use std::collections::HashMap;
52use std::time::{Duration, Instant};
53
54use sha2::{Digest, Sha256};
55
/// Retention window used when none is configured explicitly: 24 hours,
/// following the Stripe / Plaid convention.
pub const DEFAULT_RETENTION: Duration = Duration::from_secs(24 * 3_600);
58
/// A single cached response.
///
/// Carries everything needed to replay the original wire response
/// (status, content type, body) plus the digest of the request that
/// produced it, so that a later request reusing the same key with a
/// different body can be detected.
#[derive(Debug, Clone)]
pub struct IdempotencyEntry {
    /// SHA-256 digest of the request body that produced this response
    /// (see `IdempotencyStore::hash_body`, which hashes the raw bytes).
    /// A lookup presenting a different digest for the same key is
    /// reported as a conflict (→ 422).
    pub request_body_hash: [u8; 32],
    /// HTTP status code to replay.
    pub status: u16,
    /// Content-Type header value to replay, kept verbatim so the
    /// replay uses the same wire format as the original (JSON, SSE,
    /// ndjson, ...).
    pub content_type: String,
    /// Response body bytes to replay.
    pub body: Vec<u8>,
    /// Insertion timestamp. The retention check and the capacity
    /// eviction both order entries by this value.
    pub inserted_at: Instant,
}
79
/// Composite map key: client + endpoint + Idempotency-Key value.
///
/// Equality and hashing cover all three fields, so the same
/// Idempotency-Key sent by two different clients (or to two different
/// endpoints) addresses two distinct entries. That is what provides
/// cross-tenant isolation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct IdempotencyCacheKey {
    /// Identity of the caller that owns the entry.
    pub client_id: String,
    /// Path of the endpoint the request was sent to.
    pub endpoint_path: String,
    /// Value of the client-supplied Idempotency-Key.
    pub idempotency_key: String,
}
90
91/// Result of a cache lookup. Total enum — every input either misses,
92/// hits with a matching body (→ replay), or hits with a different
93/// body for the same key (→ 422 conflict).
94#[derive(Debug, Clone)]
95pub enum IdempotencyVerdict {
96 Miss,
97 Hit(IdempotencyEntry),
98 Conflict {
99 /// The conflict diagnostic surfaces the cached body's hash
100 /// (hex prefix) so the adopter can correlate the failing
101 /// request with whatever the original request body was.
102 cached_body_hash_hex: String,
103 },
104}
105
106/// In-memory Idempotency-Key store. Bounded by capacity (default
107/// 10_000 entries — generous for the high-traffic banking POST case);
108/// once full, the oldest entry is evicted on insert.
109#[derive(Debug)]
110pub struct IdempotencyStore {
111 entries: HashMap<IdempotencyCacheKey, IdempotencyEntry>,
112 capacity: usize,
113 retention: Duration,
114}
115
116impl Default for IdempotencyStore {
117 fn default() -> Self {
118 Self::new(10_000, DEFAULT_RETENTION)
119 }
120}
121
122impl IdempotencyStore {
123 pub fn new(capacity: usize, retention: Duration) -> Self {
124 Self {
125 entries: HashMap::new(),
126 capacity,
127 retention,
128 }
129 }
130
131 pub fn len(&self) -> usize {
132 self.entries.len()
133 }
134
135 pub fn is_empty(&self) -> bool {
136 self.entries.is_empty()
137 }
138
139 /// Hex-encode the first 8 bytes of a SHA-256 digest. Enough
140 /// entropy for adopter-side correlation, doesn't leak the full
141 /// hash (defense-in-depth).
142 pub fn hash_prefix_hex(hash: &[u8; 32]) -> String {
143 let mut s = String::with_capacity(16);
144 for byte in &hash[..8] {
145 s.push_str(&format!("{byte:02x}"));
146 }
147 s
148 }
149
150 /// Compute the canonical body hash. We hash the raw bytes the
151 /// client sent — adopters submitting JSON with whitespace
152 /// differences will hash DIFFERENTLY, which is the safer default
153 /// (the client must canonicalize on its side if it wants
154 /// semantic equality). Matches Stripe's behavior.
155 pub fn hash_body(body: &[u8]) -> [u8; 32] {
156 let mut h = Sha256::new();
157 h.update(body);
158 h.finalize().into()
159 }
160
161 /// Look up a cached entry. Three-way verdict:
162 /// - Miss: no entry for this key.
163 /// - Hit(entry): entry found, body hash matches — replay.
164 /// - Conflict: entry found, body hash MISMATCH — return 422.
165 /// Expired entries are evicted lazily and reported as Miss.
166 pub fn lookup(
167 &mut self,
168 key: &IdempotencyCacheKey,
169 request_body_hash: &[u8; 32],
170 ) -> IdempotencyVerdict {
171 let now = Instant::now();
172 let entry = match self.entries.get(key) {
173 Some(e) => e.clone(),
174 None => return IdempotencyVerdict::Miss,
175 };
176 if now.duration_since(entry.inserted_at) > self.retention {
177 self.entries.remove(key);
178 return IdempotencyVerdict::Miss;
179 }
180 if &entry.request_body_hash == request_body_hash {
181 IdempotencyVerdict::Hit(entry)
182 } else {
183 IdempotencyVerdict::Conflict {
184 cached_body_hash_hex: Self::hash_prefix_hex(&entry.request_body_hash),
185 }
186 }
187 }
188
189 /// Insert (or overwrite) an entry. Caller is responsible for
190 /// only caching successful responses (the gate in
191 /// `dynamic_endpoint_handler` only caches 2xx — preserving the
192 /// semantic that retries genuinely retry execution on failure).
193 pub fn insert(&mut self, key: IdempotencyCacheKey, entry: IdempotencyEntry) {
194 // Evict if at capacity (oldest entry first).
195 if self.entries.len() >= self.capacity && !self.entries.contains_key(&key) {
196 // Find the oldest entry. Linear scan is acceptable at
197 // the default capacity (10k); for larger stores a BTree
198 // by insertion-time would replace this.
199 if let Some(oldest_key) = self
200 .entries
201 .iter()
202 .min_by_key(|(_, e)| e.inserted_at)
203 .map(|(k, _)| k.clone())
204 {
205 self.entries.remove(&oldest_key);
206 }
207 }
208 self.entries.insert(key, entry);
209 }
210
211 /// Sweep expired entries. Returns the number reaped. Intended to
212 /// be called periodically by a server task to bound memory.
213 pub fn reap_expired(&mut self) -> usize {
214 let now = Instant::now();
215 let before = self.entries.len();
216 let retention = self.retention;
217 self.entries
218 .retain(|_, e| now.duration_since(e.inserted_at) <= retention);
219 before - self.entries.len()
220 }
221
222 /// Reconfigure retention (for tests + per-endpoint future tuning).
223 pub fn set_retention(&mut self, retention: Duration) {
224 self.retention = retention;
225 }
226}
227
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a cache key from client id, endpoint path and
    /// Idempotency-Key value.
    fn key(c: &str, p: &str, k: &str) -> IdempotencyCacheKey {
        IdempotencyCacheKey {
            client_id: c.to_owned(),
            endpoint_path: p.to_owned(),
            idempotency_key: k.to_owned(),
        }
    }

    /// Builds a JSON entry stamped with the current instant whose
    /// cached body is `body`, and returns it together with the digest
    /// of `body`.
    fn entry(body: &str, status: u16) -> (IdempotencyEntry, [u8; 32]) {
        let bytes = body.as_bytes().to_vec();
        let digest = IdempotencyStore::hash_body(&bytes);
        let cached = IdempotencyEntry {
            request_body_hash: digest,
            status,
            content_type: String::from("application/json"),
            body: bytes,
            inserted_at: Instant::now(),
        };
        (cached, digest)
    }

    #[test]
    fn miss_on_empty_store() {
        let mut store = IdempotencyStore::default();
        let digest = IdempotencyStore::hash_body(b"{}");
        let verdict = store.lookup(&key("c1", "/p", "k1"), &digest);
        assert!(matches!(verdict, IdempotencyVerdict::Miss));
    }

    #[test]
    fn hit_on_same_key_and_body() {
        let mut store = IdempotencyStore::default();
        let (cached, digest) = entry("{\"amount\":42}", 200);
        store.insert(key("c1", "/p", "k1"), cached.clone());
        if let IdempotencyVerdict::Hit(replayed) = store.lookup(&key("c1", "/p", "k1"), &digest) {
            assert_eq!(replayed.status, 200);
            assert_eq!(replayed.body, cached.body);
        } else {
            panic!("expected Hit");
        }
    }

    #[test]
    fn conflict_on_same_key_different_body() {
        let mut store = IdempotencyStore::default();
        let (cached, _digest) = entry("{\"amount\":42}", 200);
        store.insert(key("c1", "/p", "k1"), cached);
        let other_digest = IdempotencyStore::hash_body(b"{\"amount\":99}");
        if let IdempotencyVerdict::Conflict {
            cached_body_hash_hex,
        } = store.lookup(&key("c1", "/p", "k1"), &other_digest)
        {
            assert_eq!(cached_body_hash_hex.len(), 16);
        } else {
            panic!("expected Conflict");
        }
    }

    #[test]
    fn cross_tenant_isolation() {
        let mut store = IdempotencyStore::default();
        let (cached, digest) = entry("{\"x\":1}", 200);
        store.insert(key("c1", "/p", "k1"), cached);

        // Same key and path under another client_id must not be visible.
        let other_client = store.lookup(&key("c2", "/p", "k1"), &digest);
        assert!(matches!(other_client, IdempotencyVerdict::Miss));

        // Same key and client on another path must not be visible either.
        let other_path = store.lookup(&key("c1", "/other", "k1"), &digest);
        assert!(matches!(other_path, IdempotencyVerdict::Miss));
    }

    #[test]
    fn retention_expiry_evicts_old_entry() {
        let mut store = IdempotencyStore::new(10, Duration::from_millis(0));
        let (cached, digest) = entry("{}", 200);
        store.insert(key("c1", "/p", "k1"), cached);
        std::thread::sleep(Duration::from_millis(2));
        let verdict = store.lookup(&key("c1", "/p", "k1"), &digest);
        assert!(matches!(verdict, IdempotencyVerdict::Miss));
        // The lookup itself performs the eviction, so nothing is left.
        assert_eq!(store.len(), 0);
    }

    #[test]
    fn reap_expired_returns_count() {
        let mut store = IdempotencyStore::new(10, Duration::from_millis(0));
        let (first, _) = entry("{\"a\":1}", 200);
        let (second, _) = entry("{\"a\":2}", 200);
        store.insert(key("c1", "/p", "k1"), first);
        store.insert(key("c1", "/p", "k2"), second);
        assert_eq!(store.len(), 2);
        std::thread::sleep(Duration::from_millis(2));
        let reaped = store.reap_expired();
        assert_eq!(reaped, 2);
        assert_eq!(store.len(), 0);
    }

    #[test]
    fn capacity_eviction_drops_oldest_on_overflow() {
        let mut store = IdempotencyStore::new(2, DEFAULT_RETENTION);
        let (first, first_digest) = entry("{\"a\":1}", 200);
        store.insert(key("c1", "/p", "k1"), first);
        // Sleeps keep the three insertion timestamps strictly ordered.
        std::thread::sleep(Duration::from_millis(1));
        let (second, _) = entry("{\"a\":2}", 200);
        store.insert(key("c1", "/p", "k2"), second);
        std::thread::sleep(Duration::from_millis(1));
        let (third, _) = entry("{\"a\":3}", 200);
        store.insert(key("c1", "/p", "k3"), third);
        assert_eq!(store.len(), 2);
        // k1 carried the oldest timestamp, so it is the one that went.
        let verdict = store.lookup(&key("c1", "/p", "k1"), &first_digest);
        assert!(matches!(verdict, IdempotencyVerdict::Miss));
    }

    #[test]
    fn hash_prefix_hex_is_16_chars_lowercase() {
        let digest = IdempotencyStore::hash_body(b"hello");
        let prefix = IdempotencyStore::hash_prefix_hex(&digest);
        assert_eq!(prefix.len(), 16);
        assert!(prefix
            .chars()
            .all(|c| c.is_ascii_hexdigit() && !c.is_ascii_uppercase()));
    }

    #[test]
    fn hash_body_deterministic() {
        // Identical bytes must always produce the identical digest.
        let first = IdempotencyStore::hash_body(b"{\"x\":1}");
        let second = IdempotencyStore::hash_body(b"{\"x\":1}");
        assert_eq!(first, second);
    }

    #[test]
    fn hash_body_sensitive_to_whitespace() {
        // Raw bytes are hashed, so whitespace-only differences change
        // the digest; clients wanting semantic equality canonicalize
        // on their side. Matches Stripe's documented behavior.
        let compact = IdempotencyStore::hash_body(b"{\"x\":1}");
        let spaced = IdempotencyStore::hash_body(b"{ \"x\": 1 }");
        assert_ne!(compact, spaced);
    }
}