Skip to main content

arcly_http_core/web/
cache.rs

//! Per-route response cache, driven by `#[CacheTTL(N)]` on the route method.
//!
//! Implemented as a singleton `Interceptor` whose `around` reads the matched
//! route's `RouteSpec.cache_ttl_secs`. If the TTL is non-zero, the interceptor
//! consults a process-global, sharded, lock-striped store (`dashmap`) keyed
//! by `method + path + query`, or by the route's custom `cache_key` when one
//! is set. On a hit, the cached body bytes are replayed and the inner handler
//! is never invoked. On a miss, the inner chain runs and, for a successful
//! response, the body is buffered in memory and stored.
//!
//! Why dashmap rather than a single `RwLock<HashMap>`: dashmap locks **per
//! shard**. Concurrent readers of a hot cached endpoint share that shard's
//! read lock, and they wait only on writes landing in the same shard. There is
//! no single global lock on the request path, and each write touches one shard.
//!
//! Why not lock-free? `papaya`/seize-style stores would be truly lock-free
//! but add a non-trivial dependency footprint. dashmap is the standard,
//! pragmatic choice and satisfies the audit's requirement of "no global lock
//! on the active request path".
19
20use std::sync::atomic::{AtomicU64, Ordering};
21use std::time::{Duration, Instant};
22
23use axum::body::{to_bytes, Body};
24use axum::http::HeaderValue;
25use axum::response::Response;
26use dashmap::DashMap;
27use futures::future::BoxFuture;
28use once_cell::sync::Lazy;
29
30use crate::web::context::RequestContext;
31use crate::web::interceptors::{Interceptor, NextHandler};
32
33/// One cached response.
34#[derive(Clone)]
35struct CachedEntry {
36    bytes: bytes::Bytes,
37    content_type: Option<HeaderValue>,
38    status: u16,
39    inserted_at: Instant,
40    ttl: Duration,
41}
42
43impl CachedEntry {
44    #[inline]
45    fn is_fresh(&self) -> bool {
46        self.inserted_at.elapsed() < self.ttl
47    }
48}
49
50/// Process-global store. `dashmap` is concurrent-safe with sharded locking —
51/// reads scale linearly until shards collide.
52static STORE: Lazy<DashMap<String, CachedEntry>> = Lazy::new(DashMap::new);
53static HITS: AtomicU64 = AtomicU64::new(0);
54static MISSES: AtomicU64 = AtomicU64::new(0);
55static REJECTED: AtomicU64 = AtomicU64::new(0);
56
57/// Hard ceiling on cached entries. The cache key includes the query string,
58/// so without a bound an attacker spraying random querystrings at any
59/// `#[CacheTTL]` route grows the map without limit (memory-exhaustion DoS).
60/// Configured at launch via `LaunchConfig::cache_max_entries`.
61static CAPACITY: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(10_000);
62
63#[doc(hidden)]
64pub fn set_capacity(max_entries: usize) {
65    CAPACITY.store(max_entries, Ordering::Relaxed);
66}
67
68/// Drop every expired entry. Called periodically by the launch-spawned
69/// sweeper so entries that are never re-requested still get reclaimed
70/// (the hit path only evicts keys it happens to touch).
71pub(crate) fn sweep_expired() {
72    STORE.retain(|_, e| e.is_fresh());
73}
74
75/// Spawn the background sweeper. Runs off the request path; `retain` locks
76/// one shard at a time, briefly, every `interval`.
77#[doc(hidden)]
78pub fn spawn_sweeper(interval: Duration) {
79    tokio::spawn(async move {
80        let mut tick = tokio::time::interval(interval);
81        tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
82        loop {
83            tick.tick().await;
84            sweep_expired();
85        }
86    });
87}
88
89/// Telemetry snapshot for the cache. Plugins / handlers can expose this.
90#[derive(Debug, Clone, Copy, serde::Serialize)]
91pub struct CacheStats {
92    pub hits: u64,
93    pub misses: u64,
94    pub entries: u64,
95    /// Inserts skipped because the store was at capacity.
96    pub rejected: u64,
97}
98
99#[inline]
100pub fn stats() -> CacheStats {
101    CacheStats {
102        hits: HITS.load(Ordering::Relaxed),
103        misses: MISSES.load(Ordering::Relaxed),
104        entries: STORE.len() as u64,
105        rejected: REJECTED.load(Ordering::Relaxed),
106    }
107}
108
109/// Manually invalidate all cached entries. Useful for tests + admin endpoints.
110pub fn clear() {
111    STORE.clear();
112}
113
114/// Build the lookup key for one request. If the route configured a custom
115/// `cache_key` template, that string takes precedence and the user assumes
116/// responsibility for variability. Otherwise: `METHOD + ' ' + path + '?' + query`.
117#[inline]
118fn key_for(ctx: &RequestContext) -> String {
119    if let Some(spec) = ctx.route_spec() {
120        if !spec.cache_key.is_empty() {
121            return spec.cache_key.to_owned();
122        }
123    }
124    let q = ctx.query_string().unwrap_or("");
125    if q.is_empty() {
126        format!("{} {}", ctx.method(), ctx.path())
127    } else {
128        format!("{} {}?{}", ctx.method(), ctx.path(), q)
129    }
130}
131
132/// The interceptor itself — a unit struct so it can be referenced from a
133/// `static` in macro-generated code (see `wrap_interceptors` in the macros
134/// crate).
135pub struct CacheInterceptor;
136
137impl Interceptor for CacheInterceptor {
138    fn around(
139        &'static self,
140        ctx: RequestContext,
141        next: NextHandler,
142    ) -> BoxFuture<'static, Response> {
143        Box::pin(async move {
144            let ttl_secs = ctx.route_spec().map(|s| s.cache_ttl_secs).unwrap_or(0);
145            if ttl_secs == 0 {
146                // Cache disabled for this route — pass through with no work.
147                return next.run(ctx).await;
148            }
149            let key = key_for(&ctx);
150
151            // ── Hit path ──────────────────────────────────────────────
152            if let Some(entry) = STORE.get(&key) {
153                if entry.is_fresh() {
154                    HITS.fetch_add(1, Ordering::Relaxed);
155                    let mut resp = Response::builder()
156                        .status(entry.status)
157                        .header("x-cache", "HIT")
158                        .body(Body::from(entry.bytes.clone()))
159                        .expect("cache hit response builds");
160                    if let Some(ct) = entry.content_type.clone() {
161                        resp.headers_mut().insert("content-type", ct);
162                    }
163                    return resp;
164                } else {
165                    // Entry exists but is stale — evict it now rather than
166                    // letting the map grow unboundedly with dead entries.
167                    drop(entry);
168                    STORE.remove(&key);
169                }
170            }
171
172            MISSES.fetch_add(1, Ordering::Relaxed);
173            // ── Miss path: run inner, then fingerprint + store the body ──
174            let resp = next.run(ctx).await;
175            if !resp.status().is_success() {
176                return resp;
177            }
178
179            let (mut parts, body) = resp.into_parts();
180            const MAX_CACHE_BODY: usize = 8 * 1024 * 1024;
181            let bytes = to_bytes(body, MAX_CACHE_BODY).await.unwrap_or_default();
182            let content_type = parts.headers.get("content-type").cloned();
183
184            // Capacity gate: serve the response uncached rather than grow
185            // without bound. Existing keys may still refresh in place.
186            if STORE.len() < CAPACITY.load(Ordering::Relaxed) || STORE.contains_key(&key) {
187                STORE.insert(
188                    key,
189                    CachedEntry {
190                        bytes: bytes.clone(),
191                        content_type,
192                        status: parts.status.as_u16(),
193                        inserted_at: Instant::now(),
194                        ttl: Duration::from_secs(ttl_secs),
195                    },
196                );
197            } else {
198                REJECTED.fetch_add(1, Ordering::Relaxed);
199            }
200            parts
201                .headers
202                .insert("x-cache", HeaderValue::from_static("MISS"));
203            Response::from_parts(parts, Body::from(bytes))
204        })
205    }
206}