arcly-http 0.3.0

Enterprise-grade NestJS-inspired web framework on axum: zero-lock DI, declarative controllers, multi-tenant data routing, transactional outbox, ABAC, and a self-documenting OpenAPI surface
Documentation
//! Idempotency-Key layer — safe client retries for mutating endpoints.
//!
//! Stripe-style semantics for `#[Idempotent(ttl = "24h")]` routes:
//!
//! - Client sends `Idempotency-Key: <opaque>`; the key is scoped to
//!   `tenant + principal + route`, so keys can never collide across users.
//! - First arrival **claims** the key atomically and runs the handler; the
//!   final response (status < 500) is stored for `ttl`.
//! - A retry with the same key gets the **stored response replayed**
//!   (`Idempotency-Replayed: true`) — no second order, no double charge.
//! - A concurrent duplicate while the first is still running gets
//!   `409 Conflict` (retry after the original completes).
//! - Handler failures ≥ 500 release the claim so the client may retry.
//! - Requests without the header pass through untouched.
//!
//! Pairs deliberately with `#[Timeout]`: deadlines make clients retry; this
//! layer makes those retries safe.

use axum::response::Response;
use futures::future::BoxFuture;

use crate::web::context::RequestContext;
use crate::web::error::{Conflict, HttpError};

/// Upper bound, in bytes, on a response body this layer will persist (256 KiB).
/// A body over the cap is never stored and its claim is released, so a retry
/// runs the handler again instead of replaying a saved response.
const MAX_STORED_BODY: usize = 256 << 10;

// ─── Store contract ───────────────────────────────────────────────────────────

/// Outcome of [`IdempotencyStore::claim`] for one scoped idempotency key.
///
/// The common traits are derived so store implementations can log decisions
/// and compare them in their own tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IdempotencyDecision {
    /// Key claimed — run the handler, then `complete` or `release`.
    Fresh,
    /// A finished response exists — replay it verbatim.
    Replay {
        /// HTTP status code of the stored response.
        status: u16,
        /// Stored response body.
        body: bytes::Bytes,
    },
    /// Another request with this key is currently executing.
    InFlight,
    /// Store unreachable — fail-open (run the handler, skip storing).
    Unavailable,
}

/// Storage backend supplied by the application (Redis `SET NX PX` in
/// production).
///
/// Every method returns a boxed future borrowing both the store and the key.
pub trait IdempotencyStore: Send + Sync + 'static {
    /// Attempts to claim `key` and reports what the caller should do next.
    ///
    /// `ttl_secs` is the lifetime requested for the key
    /// (presumably it also bounds an abandoned claim — confirm per backend).
    fn claim<'s>(
        &'s self,
        key: &'s str,
        ttl_secs: u64,
    ) -> BoxFuture<'s, IdempotencyDecision>;

    /// Records the finished response (`status` + `body`) under `key` so it
    /// can be replayed for `ttl_secs`.
    fn complete<'s>(
        &'s self,
        key: &'s str,
        status: u16,
        body: &'s [u8],
        ttl_secs: u64,
    ) -> BoxFuture<'s, ()>;

    /// Gives up the claim on `key` without storing a response, so a later
    /// request with the same key runs the handler again.
    fn release<'s>(&'s self, key: &'s str) -> BoxFuture<'s, ()>;
}

// ─── Macro entry point ────────────────────────────────────────────────────────

/// Called by the `#[Idempotent]` expansion — outermost wrapper, so replays
/// skip guards, transactions, and audit (the original already recorded them).
#[doc(hidden)]
pub async fn run_idempotent<Fut>(
    ctx: &RequestContext,
    ttl_secs: u64,
    route: &'static str,
    handler: Fut,
) -> Response
where
    Fut: std::future::Future<Output = Response>,
{
    // No header or no store → plain execution.
    let (Some(client_key), Some(store)) = (
        ctx.header("idempotency-key")
            .map(str::trim)
            .filter(|k| !k.is_empty()),
        ctx.try_inject::<Box<dyn IdempotencyStore>>(),
    ) else {
        return handler.await;
    };

    // Scope: tenant + principal + route + client key.
    let tenant = ctx.tenant().map(|t| t.id.as_str()).unwrap_or("-");
    let sub = ctx
        .claims()
        .and_then(|c| c.get("sub"))
        .and_then(|v| v.as_str())
        .unwrap_or("-");
    let key = format!("idem:{tenant}:{sub}:{route}:{client_key}");

    match store.claim(&key, ttl_secs).await {
        IdempotencyDecision::Replay { status, body } => {
            metrics::counter!("idempotency_replays_total").increment(1);
            Response::builder()
                .status(status)
                .header("content-type", "application/json")
                .header("idempotency-replayed", "true")
                .body(axum::body::Body::from(body))
                .unwrap_or_else(|_| Response::new(axum::body::Body::empty()))
        }
        IdempotencyDecision::InFlight => {
            metrics::counter!("idempotency_conflicts_total").increment(1);
            crate::http::IntoResponse::into_response(crate::web::error::HttpException::from(
                Conflict::new("a request with this Idempotency-Key is already in flight"),
            ))
        }
        IdempotencyDecision::Unavailable => {
            // Fail-open: availability over replay protection; the conflict
            // gate is best-effort while the store is down.
            metrics::counter!("idempotency_store_errors_total").increment(1);
            handler.await
        }
        IdempotencyDecision::Fresh => {
            let resp = handler.await;
            let status = resp.status().as_u16();

            if status >= 500 {
                // Let the client retry a genuine failure.
                store.release(&key).await;
                return resp;
            }

            // Buffer (bounded) so the response can be stored AND returned.
            let (parts, body) = resp.into_parts();
            match axum::body::to_bytes(body, MAX_STORED_BODY).await {
                Ok(bytes) => {
                    store.complete(&key, status, &bytes, ttl_secs).await;
                    Response::from_parts(parts, axum::body::Body::from(bytes))
                }
                Err(_) => {
                    // Body exceeded the cap (streaming/huge) — don't store;
                    // release so a retry re-executes rather than 409s forever.
                    store.release(&key).await;
                    Response::from_parts(parts, axum::body::Body::empty())
                }
            }
        }
    }
}

// Compile-time check only: `Conflict` gets its `HttpError` impl from the
// stock_error! macro, and the in-flight branch above converts it through
// `HttpException::from` to produce the standard ProblemDetails body.
const _: fn() = || {
    fn requires_http_error<E>()
    where
        E: HttpError,
    {
    }
    requires_http_error::<Conflict>();
};