arcly_http/web/cache.rs
//! Per-route response cache, driven by `#[CacheTTL(N)]` on the route method.
//!
//! Implemented as a singleton `Interceptor` whose `around` reads the matched
//! route's `RouteSpec.cache_ttl_secs`. If the TTL is non-zero, the interceptor
//! consults a process-global, sharded, lock-striped store (`dashmap`) keyed
//! by `method + path + query` (or by the route's custom `cache_key`, if one is
//! set). On a hit, the cached body bytes are replayed and the inner handler is
//! never invoked. On a miss, the inner chain runs and a 2xx response body is
//! buffered in memory and stored.
//!
//! Why dashmap rather than a single `RwLock<HashMap>`: dashmap locks **per
//! shard**. A lookup or insert locks only its key's shard, and concurrent
//! reads of the same shard do not block each other, so there is no single
//! global lock on the request path. Writes touch one shard.
//!
//! Why not lock-free? `papaya`/seize-style stores would be truly lock-free
//! but add a non-trivial dependency footprint. dashmap is the standard
//! pragmatic choice and matches the audit's "no global lock on the active
//! request path" intent.
19
20use std::sync::atomic::{AtomicU64, Ordering};
21use std::time::{Duration, Instant};
22
23use axum::body::{to_bytes, Body};
24use axum::http::HeaderValue;
25use axum::response::Response;
26use dashmap::DashMap;
27use futures::future::BoxFuture;
28use once_cell::sync::Lazy;
29
30use crate::web::context::RequestContext;
31use crate::web::interceptors::{Interceptor, NextHandler};
32
33/// One cached response.
34#[derive(Clone)]
35struct CachedEntry {
36 bytes: bytes::Bytes,
37 content_type: Option<HeaderValue>,
38 status: u16,
39 inserted_at: Instant,
40 ttl: Duration,
41}
42
43impl CachedEntry {
44 #[inline]
45 fn is_fresh(&self) -> bool {
46 self.inserted_at.elapsed() < self.ttl
47 }
48}
49
50/// Process-global store. `dashmap` is concurrent-safe with sharded locking —
51/// reads scale linearly until shards collide.
52static STORE: Lazy<DashMap<String, CachedEntry>> = Lazy::new(DashMap::new);
53static HITS: AtomicU64 = AtomicU64::new(0);
54static MISSES: AtomicU64 = AtomicU64::new(0);
55static REJECTED: AtomicU64 = AtomicU64::new(0);
56
/// Entry ceiling in effect until `set_capacity` overrides it.
const DEFAULT_CAPACITY: usize = 10_000;

/// Hard ceiling on the number of cached entries.
///
/// The cache key includes the query string, so without a bound an attacker
/// spraying random querystrings at any `#[CacheTTL]` route could grow the map
/// without limit (memory-exhaustion DoS). Configured at launch via
/// `LaunchConfig::cache_max_entries`.
static CAPACITY: std::sync::atomic::AtomicUsize =
    std::sync::atomic::AtomicUsize::new(DEFAULT_CAPACITY);
62
63#[doc(hidden)]
64pub fn set_capacity(max_entries: usize) {
65 CAPACITY.store(max_entries, Ordering::Relaxed);
66}
67
68/// Drop every expired entry. Called periodically by the launch-spawned
69/// sweeper so entries that are never re-requested still get reclaimed
70/// (the hit path only evicts keys it happens to touch).
71pub(crate) fn sweep_expired() {
72 STORE.retain(|_, e| e.is_fresh());
73}
74
75/// Spawn the background sweeper. Runs off the request path; `retain` locks
76/// one shard at a time, briefly, every `interval`.
77pub(crate) fn spawn_sweeper(interval: Duration) {
78 tokio::spawn(async move {
79 let mut tick = tokio::time::interval(interval);
80 tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
81 loop {
82 tick.tick().await;
83 sweep_expired();
84 }
85 });
86}
87
88/// Telemetry snapshot for the cache. Plugins / handlers can expose this.
89#[derive(Debug, Clone, Copy, serde::Serialize)]
90pub struct CacheStats {
91 pub hits: u64,
92 pub misses: u64,
93 pub entries: u64,
94 /// Inserts skipped because the store was at capacity.
95 pub rejected: u64,
96}
97
98#[inline]
99pub fn stats() -> CacheStats {
100 CacheStats {
101 hits: HITS.load(Ordering::Relaxed),
102 misses: MISSES.load(Ordering::Relaxed),
103 entries: STORE.len() as u64,
104 rejected: REJECTED.load(Ordering::Relaxed),
105 }
106}
107
108/// Manually invalidate all cached entries. Useful for tests + admin endpoints.
109pub fn clear() {
110 STORE.clear();
111}
112
113/// Build the lookup key for one request. If the route configured a custom
114/// `cache_key` template, that string takes precedence and the user assumes
115/// responsibility for variability. Otherwise: `METHOD + ' ' + path + '?' + query`.
116#[inline]
117fn key_for(ctx: &RequestContext) -> String {
118 if let Some(spec) = ctx.route_spec() {
119 if !spec.cache_key.is_empty() {
120 return spec.cache_key.to_owned();
121 }
122 }
123 let q = ctx.query_string().unwrap_or("");
124 if q.is_empty() {
125 format!("{} {}", ctx.method(), ctx.path())
126 } else {
127 format!("{} {}?{}", ctx.method(), ctx.path(), q)
128 }
129}
130
/// Stateless unit struct implementing the caching `Interceptor`.
///
/// Having no fields lets it live in a `static` referenced from
/// macro-generated code (see `wrap_interceptors` in the macros crate); all
/// cache state lives in this module's process-global statics.
pub struct CacheInterceptor;
135
136impl Interceptor for CacheInterceptor {
137 fn around(
138 &'static self,
139 ctx: RequestContext,
140 next: NextHandler,
141 ) -> BoxFuture<'static, Response> {
142 Box::pin(async move {
143 let ttl_secs = ctx.route_spec().map(|s| s.cache_ttl_secs).unwrap_or(0);
144 if ttl_secs == 0 {
145 // Cache disabled for this route — pass through with no work.
146 return next.run(ctx).await;
147 }
148 let key = key_for(&ctx);
149
150 // ── Hit path ──────────────────────────────────────────────
151 if let Some(entry) = STORE.get(&key) {
152 if entry.is_fresh() {
153 HITS.fetch_add(1, Ordering::Relaxed);
154 let mut resp = Response::builder()
155 .status(entry.status)
156 .header("x-cache", "HIT")
157 .body(Body::from(entry.bytes.clone()))
158 .expect("cache hit response builds");
159 if let Some(ct) = entry.content_type.clone() {
160 resp.headers_mut().insert("content-type", ct);
161 }
162 return resp;
163 } else {
164 // Entry exists but is stale — evict it now rather than
165 // letting the map grow unboundedly with dead entries.
166 drop(entry);
167 STORE.remove(&key);
168 }
169 }
170
171 MISSES.fetch_add(1, Ordering::Relaxed);
172 // ── Miss path: run inner, then fingerprint + store the body ──
173 let resp = next.run(ctx).await;
174 if !resp.status().is_success() {
175 return resp;
176 }
177
178 let (mut parts, body) = resp.into_parts();
179 const MAX_CACHE_BODY: usize = 8 * 1024 * 1024;
180 let bytes = to_bytes(body, MAX_CACHE_BODY).await.unwrap_or_default();
181 let content_type = parts.headers.get("content-type").cloned();
182
183 // Capacity gate: serve the response uncached rather than grow
184 // without bound. Existing keys may still refresh in place.
185 if STORE.len() < CAPACITY.load(Ordering::Relaxed) || STORE.contains_key(&key) {
186 STORE.insert(
187 key,
188 CachedEntry {
189 bytes: bytes.clone(),
190 content_type,
191 status: parts.status.as_u16(),
192 inserted_at: Instant::now(),
193 ttl: Duration::from_secs(ttl_secs),
194 },
195 );
196 } else {
197 REJECTED.fetch_add(1, Ordering::Relaxed);
198 }
199 parts
200 .headers
201 .insert("x-cache", HeaderValue::from_static("MISS"));
202 Response::from_parts(parts, Body::from(bytes))
203 })
204 }
205}