Skip to main content

arcly_http/web/
idempotency.rs

1//! Idempotency-Key layer — safe client retries for mutating endpoints.
2//!
3//! Stripe-style semantics for `#[Idempotent(ttl = "24h")]` routes:
4//!
5//! - Client sends `Idempotency-Key: <opaque>`; the key is scoped to
6//!   `tenant + principal + route`, so keys can never collide across users.
7//! - First arrival **claims** the key atomically and runs the handler; the
8//!   final response (status < 500) is stored for `ttl`.
9//! - A retry with the same key gets the **stored response replayed**
10//!   (`Idempotency-Replayed: true`) — no second order, no double charge.
11//! - A concurrent duplicate while the first is still running gets
12//!   `409 Conflict` (retry after the original completes).
13//! - Handler failures ≥ 500 release the claim so the client may retry.
14//! - Requests without the header pass through untouched.
15//!
16//! Pairs deliberately with `#[Timeout]`: deadlines make clients retry; this
17//! layer makes those retries safe.
18
19use axum::response::Response;
20use futures::future::BoxFuture;
21
22use crate::web::context::RequestContext;
23use crate::web::error::{Conflict, HttpError};
24
/// Upper bound, in bytes, on a response body this layer will buffer and
/// store for replay (256 KiB).
///
/// A response whose body does not fit under this cap is not stored; its claim
/// is released instead, so a retry with the same key re-executes the handler.
const MAX_STORED_BODY: usize = 1 << 18;
28
29// ─── Store contract ───────────────────────────────────────────────────────────
30
/// Outcome of [`IdempotencyStore::claim`] for one scoped idempotency key.
///
/// `run_idempotent` matches on this value to decide whether the handler runs
/// and whether its response is stored afterwards.
pub enum IdempotencyDecision {
    /// Key claimed — run the handler, then `complete` or `release`.
    Fresh,
    /// A finished response exists — replay it instead of running the handler.
    ///
    /// `status` is the stored HTTP status code and `body` the stored response
    /// body. Only these two values are carried; `run_idempotent` builds the
    /// replay with `content-type: application/json` and
    /// `idempotency-replayed: true`.
    Replay { status: u16, body: bytes::Bytes },
    /// Another request with this key is currently executing; the caller is
    /// answered with a `Conflict` error response.
    InFlight,
    /// Store unreachable — fail-open (run the handler, skip storing).
    Unavailable,
}
41
/// App-provided storage (Redis `SET NX PX` in production).
///
/// `run_idempotent` looks the store up through the request context as a
/// `Box<dyn IdempotencyStore>` and drives it in this order: one `claim`,
/// then — only when the claim was `Fresh` — exactly one of `complete` or
/// `release`.
///
/// None of the methods return an error. A failing `claim` reports
/// [`IdempotencyDecision::Unavailable`]; `complete` and `release` have no
/// way to signal failure to the caller.
pub trait IdempotencyStore: Send + Sync + 'static {
    /// Tries to claim `key` and reports what the caller should do next.
    ///
    /// `key` is the fully scoped key built by `run_idempotent`
    /// (`idem:{tenant}:{sub}:{route}:{client_key}`). `ttl_secs` is the route's
    /// configured TTL in seconds — presumably also the lifetime of the claim
    /// itself; confirm against the implementation.
    fn claim<'a>(&'a self, key: &'a str, ttl_secs: u64) -> BoxFuture<'a, IdempotencyDecision>;
    /// Records the finished response (`status` and `body`) for `key`.
    ///
    /// Called after the handler returned a status below 500 and its body was
    /// buffered within the size cap. `ttl_secs` is the same value that was
    /// passed to `claim`.
    fn complete<'a>(
        &'a self,
        key: &'a str,
        status: u16,
        body: &'a [u8],
        ttl_secs: u64,
    ) -> BoxFuture<'a, ()>;
    /// Drops the claim on `key` without storing a response.
    ///
    /// Called when the handler returned a status of 500 or above, or when
    /// its body could not be buffered for storage.
    fn release<'a>(&'a self, key: &'a str) -> BoxFuture<'a, ()>;
}
54
55// ─── Macro entry point ────────────────────────────────────────────────────────
56
57/// Called by the `#[Idempotent]` expansion — outermost wrapper, so replays
58/// skip guards, transactions, and audit (the original already recorded them).
59#[doc(hidden)]
60pub async fn run_idempotent<Fut>(
61    ctx: &RequestContext,
62    ttl_secs: u64,
63    route: &'static str,
64    handler: Fut,
65) -> Response
66where
67    Fut: std::future::Future<Output = Response>,
68{
69    // No header or no store → plain execution.
70    let (Some(client_key), Some(store)) = (
71        ctx.header("idempotency-key")
72            .map(str::trim)
73            .filter(|k| !k.is_empty()),
74        ctx.try_inject::<Box<dyn IdempotencyStore>>(),
75    ) else {
76        return handler.await;
77    };
78
79    // Scope: tenant + principal + route + client key.
80    let tenant = ctx.tenant().map(|t| t.id.as_str()).unwrap_or("-");
81    let sub = ctx
82        .claims()
83        .and_then(|c| c.get("sub"))
84        .and_then(|v| v.as_str())
85        .unwrap_or("-");
86    let key = format!("idem:{tenant}:{sub}:{route}:{client_key}");
87
88    match store.claim(&key, ttl_secs).await {
89        IdempotencyDecision::Replay { status, body } => {
90            metrics::counter!("idempotency_replays_total").increment(1);
91            Response::builder()
92                .status(status)
93                .header("content-type", "application/json")
94                .header("idempotency-replayed", "true")
95                .body(axum::body::Body::from(body))
96                .unwrap_or_else(|_| Response::new(axum::body::Body::empty()))
97        }
98        IdempotencyDecision::InFlight => {
99            metrics::counter!("idempotency_conflicts_total").increment(1);
100            crate::http::IntoResponse::into_response(crate::web::error::HttpException::from(
101                Conflict::new("a request with this Idempotency-Key is already in flight"),
102            ))
103        }
104        IdempotencyDecision::Unavailable => {
105            // Fail-open: availability over replay protection; the conflict
106            // gate is best-effort while the store is down.
107            metrics::counter!("idempotency_store_errors_total").increment(1);
108            handler.await
109        }
110        IdempotencyDecision::Fresh => {
111            let resp = handler.await;
112            let status = resp.status().as_u16();
113
114            if status >= 500 {
115                // Let the client retry a genuine failure.
116                store.release(&key).await;
117                return resp;
118            }
119
120            // Buffer (bounded) so the response can be stored AND returned.
121            let (parts, body) = resp.into_parts();
122            match axum::body::to_bytes(body, MAX_STORED_BODY).await {
123                Ok(bytes) => {
124                    store.complete(&key, status, &bytes, ttl_secs).await;
125                    Response::from_parts(parts, axum::body::Body::from(bytes))
126                }
127                Err(_) => {
128                    // Body exceeded the cap (streaming/huge) — don't store;
129                    // release so a retry re-executes rather than 409s forever.
130                    store.release(&key).await;
131                    Response::from_parts(parts, axum::body::Body::empty())
132                }
133            }
134        }
135    }
136}
137
138// Conflict already implements HttpError via the stock_error! macro; the use
139// above goes through HttpException::from for the standard ProblemDetails body.
140const _: fn() = || {
141    fn assert_http_error<T: HttpError>() {}
142    assert_http_error::<Conflict>();
143};