arcly-http 0.2.0

Enterprise-grade NestJS-inspired web framework on axum: zero-lock DI, declarative controllers, multi-tenant data routing, transactional outbox, ABAC, and a self-documenting OpenAPI surface
Documentation
//! Per-route response cache, driven by `#[CacheTTL(N)]` on the route method.
//!
//! Implemented as a singleton `Interceptor` whose `around` reads the matched
//! route's `RouteSpec.cache_ttl_secs`. If the TTL is non-zero the interceptor
//! consults a process-global, sharded, lock-striped store (`dashmap`) keyed
//! by `method + path + query`. On hit, the cached body bytes are replayed —
//! the inner handler is never invoked. On miss, the inner chain runs and the
//! 2xx response body is buffered and stored.
//!
//! Why dashmap rather than a single `RwLock<HashMap>`: read concurrency in
//! dashmap is **per-shard** — a hot cached endpoint serializes only against
//! the small minority of requests targeting the same shard. There is no
//! single global lock on the request path. Writes touch one shard.
//!
//! Why not lock-free? `papaya`/seize-style stores would be truly lock-free
//! but add a non-trivial dependency footprint. dashmap is the standard
//! pragmatic choice and matches the audit's "no global lock on the active
//! request path" intent.

use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};

use axum::body::{to_bytes, Body};
use axum::http::HeaderValue;
use axum::response::Response;
use dashmap::DashMap;
use futures::future::BoxFuture;
use once_cell::sync::Lazy;

use crate::web::context::RequestContext;
use crate::web::interceptors::{Interceptor, NextHandler};

/// One cached response.
///
/// Holds what the hit path needs to rebuild a response (body, status and
/// optional `content-type`) plus the bookkeeping used by `is_fresh`.
#[derive(Clone)]
struct CachedEntry {
    /// Buffered response body.
    bytes: bytes::Bytes,
    /// `content-type` header captured from the original response, if it had one.
    content_type: Option<HeaderValue>,
    /// HTTP status code of the original response.
    status: u16,
    /// When this entry was written into the store.
    inserted_at: Instant,
    /// How long after `inserted_at` the entry still counts as fresh.
    ttl: Duration,
}

impl CachedEntry {
    /// Returns `true` while less than `ttl` has passed since `inserted_at`.
    #[inline]
    fn is_fresh(&self) -> bool {
        let age = Instant::now().duration_since(self.inserted_at);
        age < self.ttl
    }
}

/// Process-global store. `dashmap` is concurrent-safe with sharded locking —
/// reads scale linearly until shards collide.
///
/// Keys are the strings produced by `key_for`.
static STORE: Lazy<DashMap<String, CachedEntry>> = Lazy::new(DashMap::new);
/// Requests answered from a fresh cached entry.
static HITS: AtomicU64 = AtomicU64::new(0);
/// Requests on cache-enabled routes that had to run the inner handler.
static MISSES: AtomicU64 = AtomicU64::new(0);
/// Inserts skipped because the store was at capacity.
static REJECTED: AtomicU64 = AtomicU64::new(0);

/// Hard ceiling on cached entries. The cache key includes the query string,
/// so without a bound an attacker spraying random querystrings at any
/// `#[CacheTTL]` route grows the map without limit (memory-exhaustion DoS).
/// Configured at launch via `LaunchConfig::cache_max_entries`.
///
/// Defaults to 10 000 until `set_capacity` is called. The interceptor reads
/// the store length and then inserts as two separate steps, so concurrent
/// misses can briefly overshoot this value.
static CAPACITY: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(10_000);

/// Replace the maximum number of entries the cache may hold.
///
/// Only affects future inserts: entries already in the store are not evicted
/// when the limit is lowered.
#[doc(hidden)]
pub fn set_capacity(max_entries: usize) {
    CAPACITY.store(max_entries, Ordering::Relaxed);
}

/// Remove every entry whose TTL has elapsed.
///
/// The hit path only evicts stale keys it happens to look up, so entries that
/// are never requested again would otherwise stay in the map. The
/// launch-spawned sweeper calls this periodically to reclaim them.
pub(crate) fn sweep_expired() {
    STORE.retain(|_key, cached| cached.is_fresh());
}

/// Spawn the background sweeper. Runs off the request path; `retain` locks
/// one shard at a time, briefly, every `interval`.
///
/// A zero `interval` disables the sweeper: nothing is spawned. Missed ticks
/// are skipped rather than replayed in a burst.
///
/// Must be called from within a Tokio runtime, because it uses `tokio::spawn`.
/// The task is detached and loops for as long as the runtime runs.
pub(crate) fn spawn_sweeper(interval: Duration) {
    // `tokio::time::interval` panics on a zero period. Inside a detached task
    // that panic would go unnoticed and the sweeper would silently never run,
    // so treat zero as "no sweeper" up front.
    if interval.is_zero() {
        return;
    }
    tokio::spawn(async move {
        let mut tick = tokio::time::interval(interval);
        tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
        loop {
            tick.tick().await;
            sweep_expired();
        }
    });
}

/// Telemetry snapshot for the cache. Plugins / handlers can expose this.
#[derive(Debug, Clone, Copy, serde::Serialize)]
pub struct CacheStats {
    /// Requests answered from a fresh cached entry.
    pub hits: u64,
    /// Requests on cache-enabled routes that ran the inner handler.
    pub misses: u64,
    /// Number of entries in the store when the snapshot was taken.
    pub entries: u64,
    /// Inserts skipped because the store was at capacity.
    pub rejected: u64,
}

/// Read the current cache counters and entry count.
///
/// Each value is loaded on its own, so the snapshot is not atomic across
/// fields while requests are in flight.
#[inline]
pub fn stats() -> CacheStats {
    let hits = HITS.load(Ordering::Relaxed);
    let misses = MISSES.load(Ordering::Relaxed);
    let entries = STORE.len() as u64;
    let rejected = REJECTED.load(Ordering::Relaxed);
    CacheStats {
        hits,
        misses,
        entries,
        rejected,
    }
}

/// Manually invalidate all cached entries. Useful for tests + admin endpoints.
///
/// Only the store is emptied; the hit / miss / rejected counters are left
/// as they are.
pub fn clear() {
    STORE.clear();
}

/// Compute the store key for a request.
///
/// A non-empty `cache_key` on the route spec is returned verbatim, so the
/// route author is responsible for making it vary where it needs to.
/// Otherwise the key is `METHOD path`, with `?query` appended only when the
/// query string is present and non-empty.
#[inline]
fn key_for(ctx: &RequestContext) -> String {
    match ctx.route_spec() {
        Some(spec) if !spec.cache_key.is_empty() => return spec.cache_key.to_owned(),
        _ => {}
    }
    match ctx.query_string() {
        Some(query) if !query.is_empty() => {
            format!("{} {}?{}", ctx.method(), ctx.path(), query)
        }
        _ => format!("{} {}", ctx.method(), ctx.path()),
    }
}

/// The interceptor itself — a unit struct so it can be referenced from a
/// `static` in macro-generated code (see `wrap_interceptors` in the macros
/// crate).
///
/// It carries no state of its own: the cached entries, counters and capacity
/// all live in this module's statics.
pub struct CacheInterceptor;

impl Interceptor for CacheInterceptor {
    fn around(
        &'static self,
        ctx: RequestContext,
        next: NextHandler,
    ) -> BoxFuture<'static, Response> {
        Box::pin(async move {
            let ttl_secs = ctx.route_spec().map(|s| s.cache_ttl_secs).unwrap_or(0);
            if ttl_secs == 0 {
                // Cache disabled for this route — pass through with no work.
                return next.run(ctx).await;
            }
            let key = key_for(&ctx);

            // ── Hit path ──────────────────────────────────────────────
            if let Some(entry) = STORE.get(&key) {
                if entry.is_fresh() {
                    HITS.fetch_add(1, Ordering::Relaxed);
                    let mut resp = Response::builder()
                        .status(entry.status)
                        .header("x-cache", "HIT")
                        .body(Body::from(entry.bytes.clone()))
                        .expect("cache hit response builds");
                    if let Some(ct) = entry.content_type.clone() {
                        resp.headers_mut().insert("content-type", ct);
                    }
                    return resp;
                } else {
                    // Entry exists but is stale — evict it now rather than
                    // letting the map grow unboundedly with dead entries.
                    drop(entry);
                    STORE.remove(&key);
                }
            }

            MISSES.fetch_add(1, Ordering::Relaxed);
            // ── Miss path: run inner, then fingerprint + store the body ──
            let resp = next.run(ctx).await;
            if !resp.status().is_success() {
                return resp;
            }

            let (mut parts, body) = resp.into_parts();
            const MAX_CACHE_BODY: usize = 8 * 1024 * 1024;
            let bytes = to_bytes(body, MAX_CACHE_BODY).await.unwrap_or_default();
            let content_type = parts.headers.get("content-type").cloned();

            // Capacity gate: serve the response uncached rather than grow
            // without bound. Existing keys may still refresh in place.
            if STORE.len() < CAPACITY.load(Ordering::Relaxed) || STORE.contains_key(&key) {
                STORE.insert(
                    key,
                    CachedEntry {
                        bytes: bytes.clone(),
                        content_type,
                        status: parts.status.as_u16(),
                        inserted_at: Instant::now(),
                        ttl: Duration::from_secs(ttl_secs),
                    },
                );
            } else {
                REJECTED.fetch_add(1, Ordering::Relaxed);
            }
            parts
                .headers
                .insert("x-cache", HeaderValue::from_static("MISS"));
            Response::from_parts(parts, Body::from(bytes))
        })
    }
}