axess_cache/cache.rs
1//! [`ClockTtlCache`] and its single-flight in-flight machinery.
2//!
3//! The cache implementation itself lives here; the public counters
4//! snapshot is in [`crate::stats`].
5
6use crate::stats::{CacheCounters, CacheStats};
7use axess_clock::Clock;
8use lru::LruCache;
9use parking_lot::Mutex;
10use std::collections::HashMap;
11use std::hash::Hash;
12use std::num::NonZeroUsize;
13use std::sync::Arc;
14use std::sync::atomic::Ordering;
15use std::time::Duration;
16use tokio::sync::OnceCell;
17
/// One cached value together with its absolute expiry deadline.
///
/// The deadline is fixed at write time as `clock.now() + ttl`, expressed in
/// epoch microseconds. Every read compares it against a fresh `clock.now()`.
/// The deadline is stored as an absolute instant rather than as
/// "insert time + a static TTL". As a result, changing the cache's TTL later
/// cannot make an already-stale entry look valid again.
struct Entry<V> {
    /// Payload handed back to callers on a hit.
    value: V,
    /// Absolute expiry instant (epoch µs). The entry counts as live while
    /// `clock.now()` is at or before this value.
    expires_at_micros: i64,
}
28
29/// TTL+LRU cache with all time decisions routed through an injected [`Clock`].
30///
31/// # Capacity vs. TTL
32///
33/// Two independent eviction triggers:
34///
35/// - **TTL**: checked on read. Expired entries are removed lazily when
36/// touched; [`cleanup_expired`](Self::cleanup_expired) sweeps the whole
37/// map for callers who want to reclaim memory eagerly.
38/// - **Capacity**: enforced at insert time by [`LruCache`]. Inserting into a
39/// full cache evicts the least-recently-used entry.
40///
41/// # Concurrency
42///
43/// Internal state is wrapped in a [`parking_lot::Mutex`]. Reads briefly
44/// acquire the lock to update LRU recency. The single mutex becomes the
45/// bottleneck only once multiple threads contend on the same hot key;
46/// `benches/cache_bench.rs` quantifies the regression. A sharded variant
47/// is deferred until real profiling shows it matters.
48///
49/// # DST guarantees
50///
51/// Every TTL decision and every `expires_at_micros` calculation goes
52/// through `clock.now()`. Tests using
53/// [`MockClock`](axess_clock::testing::MockClock) can advance time in
54/// arbitrary jumps and observe deterministic eviction. There are no
55/// background tasks and no calls into `Instant::now()` /
56/// `chrono::Utc::now()`.
57pub struct ClockTtlCache<K, V>
58where
59 K: Hash + Eq + Send + Sync,
60 V: Clone + Send + Sync,
61{
62 inner: Mutex<LruCache<K, Entry<V>>>,
63 /// In-flight loads for [`Self::get_or_try_insert_with`]. Held under a
64 /// dedicated mutex so cache reads aren't blocked by single-flight
65 /// bookkeeping. The TOCTOU between this map and `inner` is benign:
66 /// the worst case is one extra fetch (which is the work we'd be
67 /// doing anyway), and `OnceCell` semantics still guarantee a single
68 /// fetcher per cell across concurrent waiters.
69 inflight: Mutex<HashMap<K, Arc<OnceCell<V>>>>,
70 clock: Arc<dyn Clock>,
71 ttl: Duration,
72 pub(crate) stats: CacheCounters,
73}
74
75impl<K, V> ClockTtlCache<K, V>
76where
77 K: Hash + Eq + Clone + Send + Sync,
78 V: Clone + Send + Sync,
79{
80 /// Construct a cache with the given capacity, TTL, and Clock.
81 pub fn new(capacity: NonZeroUsize, ttl: Duration, clock: Arc<dyn Clock>) -> Self {
82 Self {
83 inner: Mutex::new(LruCache::new(capacity)),
84 inflight: Mutex::new(HashMap::new()),
85 clock,
86 ttl,
87 stats: CacheCounters::default(),
88 }
89 }
90
91 /// Snapshot of the live observability counters (hits, misses, evictions,
92 /// single-flight joins, …). See [`CacheStats`] for field semantics.
93 ///
94 /// Cheap (atomic loads only); safe to call on the hot path or from a
95 /// metrics scrape endpoint.
96 pub fn stats(&self) -> CacheStats {
97 self.stats.snapshot()
98 }
99
100 /// Reset every counter to zero. Useful for tests and for resetting a
101 /// metrics window after a scrape; production telemetry pipelines
102 /// typically just take rate-of-change of the monotonic counters and
103 /// shouldn't need this.
104 pub fn reset_stats(&self) {
105 self.stats.reset();
106 }
107
108 /// Look up a key. Returns `None` if absent or expired (and removes
109 /// the expired entry as a side effect; lazy TTL sweep).
110 pub fn get(&self, key: &K) -> Option<V> {
111 let now_micros = self.clock.now().timestamp_micros();
112 let mut guard = self.inner.lock();
113 // LruCache::get updates recency, which is what we want on a hit;
114 // on an expired-hit we then `pop` to evict.
115 if let Some(entry) = guard.get(key) {
116 if entry.expires_at_micros >= now_micros {
117 let v = entry.value.clone();
118 drop(guard);
119 self.stats.hits.fetch_add(1, Ordering::Relaxed);
120 return Some(v);
121 }
122 }
123 // Either absent or stale. `pop` is a no-op for the absent case;
124 // we count both as a miss (TTL-expired hits are semantically
125 // misses to the caller; they get None).
126 guard.pop(key);
127 drop(guard);
128 self.stats.misses.fetch_add(1, Ordering::Relaxed);
129 None
130 }
131
132 /// Insert or replace a value with the configured TTL.
133 pub fn insert(&self, key: K, value: V) {
134 let now_micros = self.clock.now().timestamp_micros();
135 let expires_at_micros = now_micros.saturating_add(self.ttl.as_micros() as i64);
136 let mut guard = self.inner.lock();
137 let len_before = guard.len();
138 let cap = guard.cap().get();
139 guard.put(
140 key,
141 Entry {
142 value,
143 expires_at_micros,
144 },
145 );
146 // LruCache::put on a full cache evicts the LRU entry to make room.
147 // Detect the eviction by checking that we hit (or were already at)
148 // capacity and the cache size didn't grow. Counts both
149 // "displaced existing key" and "popped LRU" paths; the former
150 // is also useful telemetry (tells you about overwrites under
151 // capacity pressure).
152 let inserted_displaced = len_before >= cap;
153 drop(guard);
154 self.stats.inserts.fetch_add(1, Ordering::Relaxed);
155 if inserted_displaced {
156 self.stats
157 .capacity_evictions
158 .fetch_add(1, Ordering::Relaxed);
159 }
160 }
161
162 /// Remove a single entry. Also removes any in-flight load cell for
163 /// the same key, so a concurrent
164 /// [`get_or_try_insert_with`](Self::get_or_try_insert_with) load that
165 /// resolves *after* this call cannot re-cache the now-invalidated
166 /// value. Returns `true` if anything was removed (cache row, in-flight
167 /// cell, or both).
168 ///
169 /// # Concurrency
170 ///
171 /// Two separate critical sections: inflight first, then LRU.
172 /// Lock-ordering rule (inflight before LRU) is preserved. A concurrent
173 /// load whose post-resolve runs *between* our inflight removal and
174 /// our LRU removal sees its cell missing from inflight and skips the
175 /// LRU promotion (see `get_or_try_insert_with`'s `Ok` arm). A
176 /// concurrent load whose post-resolve runs *before* our inflight
177 /// removal already inserted into LRU, which our LRU pop then
178 /// removes. Both paths converge on "key is not cached afterward."
179 pub fn invalidate(&self, key: &K) -> bool {
180 let inflight_removed = self.inflight.lock().remove(key).is_some();
181 let lru_removed = self.inner.lock().pop(key).is_some();
182 let removed = inflight_removed || lru_removed;
183 if removed {
184 self.stats.invalidations.fetch_add(1, Ordering::Relaxed);
185 }
186 removed
187 }
188
189 /// Remove every cache entry whose key matches `predicate`. Also
190 /// removes matching in-flight load cells so concurrent loads
191 /// resolving after this call cannot re-cache invalidated values.
192 /// Returns the number of cache entries removed (LRU entries only;
193 /// in-flight removals are bookkeeping side effects).
194 ///
195 /// O(n) over the cache plus O(m) over the in-flight map. Use for
196 /// "evict everything for principal X" after a role change; small N
197 /// for typical caches; if your cardinality is large enough that
198 /// this is too slow, consider maintaining a secondary index outside
199 /// this primitive.
200 #[tracing::instrument(
201 name = "axess_cache.invalidate_by",
202 skip(self, predicate),
203 fields(removed = tracing::field::Empty),
204 )]
205 pub fn invalidate_by(&self, predicate: impl Fn(&K) -> bool) -> usize {
206 // Inflight first, then LRU; lock-ordering rule.
207 {
208 let mut inflight = self.inflight.lock();
209 let to_remove: Vec<K> = inflight.keys().filter(|k| predicate(k)).cloned().collect();
210 for k in &to_remove {
211 inflight.remove(k);
212 }
213 }
214
215 let mut guard = self.inner.lock();
216 let to_remove: Vec<K> = guard
217 .iter()
218 .filter_map(|(k, _)| if predicate(k) { Some(k.clone()) } else { None })
219 .collect();
220 let count = to_remove.len();
221 for k in &to_remove {
222 guard.pop(k);
223 }
224 drop(guard);
225 if count > 0 {
226 self.stats
227 .invalidations
228 .fetch_add(count as u64, Ordering::Relaxed);
229 }
230 tracing::Span::current().record("removed", count);
231 count
232 }
233
234 /// Drop every entry from the cache. Also drops every in-flight load
235 /// cell, so any load resolving after this call is barred from
236 /// promoting its value to the cache (post-`invalidate_all` cache
237 /// state must be empty).
238 #[tracing::instrument(
239 name = "axess_cache.invalidate_all",
240 skip(self),
241 fields(removed = tracing::field::Empty),
242 )]
243 pub fn invalidate_all(&self) {
244 // Inflight first, then LRU; lock-ordering rule.
245 self.inflight.lock().clear();
246
247 let mut guard = self.inner.lock();
248 let count = guard.len();
249 guard.clear();
250 drop(guard);
251 if count > 0 {
252 self.stats
253 .invalidations
254 .fetch_add(count as u64, Ordering::Relaxed);
255 }
256 tracing::Span::current().record("removed", count);
257 }
258
259 /// Walk the cache and remove every expired entry. Returns the number
260 /// of entries reclaimed.
261 ///
262 /// Optional: the cache evicts expired entries lazily on access via
263 /// [`get`](Self::get). Call this if you want to reclaim memory ahead
264 /// of next access (e.g. from a periodic Clock-aware scheduled task).
265 #[tracing::instrument(
266 name = "axess_cache.cleanup_expired",
267 skip(self),
268 fields(removed = tracing::field::Empty),
269 )]
270 pub fn cleanup_expired(&self) -> usize {
271 let now_micros = self.clock.now().timestamp_micros();
272 let mut guard = self.inner.lock();
273 let to_remove: Vec<K> = guard
274 .iter()
275 .filter_map(|(k, e)| {
276 if e.expires_at_micros < now_micros {
277 Some(k.clone())
278 } else {
279 None
280 }
281 })
282 .collect();
283 let count = to_remove.len();
284 for k in &to_remove {
285 guard.pop(k);
286 }
287 drop(guard);
288 if count > 0 {
289 self.stats
290 .invalidations
291 .fetch_add(count as u64, Ordering::Relaxed);
292 }
293 tracing::Span::current().record("removed", count);
294 count
295 }
296
297 /// Load `key` from the cache, or run `fetcher` to populate it.
298 /// Single-flight: concurrent calls for the same key share one fetcher
299 /// invocation, and the rest await the in-flight cell.
300 ///
301 /// # Errors
302 ///
303 /// On `fetcher` error the in-flight cell is removed so subsequent
304 /// callers retry rather than re-await a known-failing future. The
305 /// error is propagated to every concurrent caller that joined this
306 /// in-flight load.
307 ///
308 /// # Panic safety
309 ///
310 /// If `fetcher` panics, the in-flight cell is removed via an RAII
311 /// guard whose `Drop` runs during panic-unwind. Subsequent callers
312 /// for the same key get a fresh fetcher invocation rather than
313 /// joining a permanently-uninitialised cell.
314 ///
315 /// # Interaction with concurrent `invalidate`
316 ///
317 /// Invalidate wins against concurrent loads. If `invalidate` /
318 /// `invalidate_by` / `invalidate_all` runs while a load is in flight
319 /// for `key`, the loaded value is **not** promoted to the cache. The
320 /// caller that triggered the in-flight load (and joiners) still
321 /// receive the loaded value (the load started before the
322 /// invalidate), but the cache stays in its post-invalidate state and
323 /// the next lookup runs a fresh load. Aborting an in-flight load to
324 /// make joiners retry is not possible with [`tokio::sync::OnceCell`]
325 /// semantics.
326 ///
327 /// # Fetcher determinism
328 ///
329 /// Concurrent callers may each pass a different `fetcher` closure,
330 /// but only the first one to reach the cell is invoked; the rest are
331 /// dropped. Callers should pass fetchers that are pure functions of
332 /// the key, otherwise only the winning caller's side effects fire.
333 #[tracing::instrument(
334 name = "axess_cache.get_or_try_insert_with",
335 skip(self, key, fetcher),
336 fields(joined = tracing::field::Empty, error = tracing::field::Empty),
337 )]
338 pub async fn get_or_try_insert_with<F, Fut, E>(&self, key: K, fetcher: F) -> Result<V, E>
339 where
340 F: FnOnce() -> Fut,
341 Fut: std::future::Future<Output = Result<V, E>>,
342 {
343 // Fast path: cache hit.
344 if let Some(v) = self.get(&key) {
345 return Ok(v);
346 }
347
348 // Slow path: claim or join an in-flight cell.
349 let (cell, joined) = {
350 let mut inflight = self.inflight.lock();
351 match inflight.get(&key) {
352 Some(existing) => (existing.clone(), true),
353 None => {
354 let cell = Arc::new(OnceCell::<V>::new());
355 inflight.insert(key.clone(), cell.clone());
356 (cell, false)
357 }
358 }
359 };
360 tracing::Span::current().record("joined", joined);
361 if joined {
362 self.stats
363 .single_flight_joins
364 .fetch_add(1, Ordering::Relaxed);
365 }
366
367 // RAII cleanup of the in-flight slot. Drops on Ok / Err / panic-
368 // unwind, ensuring the slot doesn't leak even if `fetcher` panics
369 // mid-await. Same `Arc::ptr_eq`-gated removal as before.
370 let inflight_guard = InflightGuard {
371 cache: self,
372 key: key.clone(),
373 cell: cell.clone(),
374 };
375
376 // Race-resolve: `OnceCell::get_or_try_init` ensures exactly one
377 // closure runs across all concurrent callers for this cell.
378 // Subsequent callers `.await` the same future internally.
379 let result: Result<&V, E> = cell.get_or_try_init(fetcher).await;
380
381 let outcome = match result {
382 Ok(v_ref) => {
383 let v = v_ref.clone();
384 // Atomic publication against `invalidate*`: hold the
385 // inflight lock while we (a) check that our cell is still
386 // the active one and (b) promote to LRU. Since invalidate
387 // *must* take inflight first (lock-ordering rule), it
388 // cannot interleave between our check and our LRU insert.
389 {
390 let mut inflight = self.inflight.lock();
391 if let Some(existing) = inflight.get(&key)
392 && Arc::ptr_eq(existing, &cell)
393 {
394 // Still our cell. Insert into LRU under inflight
395 // hold so a racing invalidate is forced to wait;
396 // when it does run, it'll then remove the just-
397 // inserted LRU entry, converging on "invalidated."
398 // Critically: when invalidate runs *during* our
399 // load (before this point), the cell is removed
400 // from inflight, so this `if let Some(...)` won't
401 // match and we skip the promotion entirely.
402 self.insert(key.clone(), v.clone());
403 inflight.remove(&key);
404 }
405 // else: either invalidate ran during our load, or
406 // another caller's cleanup ran first. Either way,
407 // refuse to promote.
408 };
409 // Guard's Drop is now a no-op for the active path (we
410 // removed our own slot); for the !active path it's also
411 // a no-op (slot was already gone).
412 Ok(v)
413 }
414 Err(e) => {
415 self.stats
416 .single_flight_errors
417 .fetch_add(1, Ordering::Relaxed);
418 tracing::Span::current().record("error", true);
419 // Guard's Drop removes the slot so retries get a fresh
420 // fetcher rather than re-awaiting the failed future.
421 Err(e)
422 }
423 };
424 drop(inflight_guard);
425 outcome
426 }
427
428 /// Current number of entries (including any that are expired but not
429 /// yet evicted by a read or [`cleanup_expired`](Self::cleanup_expired)).
430 pub fn len(&self) -> usize {
431 self.inner.lock().len()
432 }
433
434 /// `true` if the cache holds zero entries.
435 pub fn is_empty(&self) -> bool {
436 self.inner.lock().len() == 0
437 }
438
439 /// Configured capacity (LRU bound).
440 pub fn capacity(&self) -> NonZeroUsize {
441 self.inner.lock().cap()
442 }
443
444 /// Number of in-flight loads currently waiting to resolve.
445 ///
446 /// Useful for ops monitoring ("is this pod stuck on slow fetchers?")
447 /// and for tests that verify `get_or_try_insert_with` cleans up its
448 /// in-flight cells correctly even on panic-unwind.
449 pub fn pending_loads_count(&self) -> usize {
450 self.inflight.lock().len()
451 }
452}
453
454/// RAII guard that removes an in-flight cell from the cache's
455/// pending-loads map on drop (including panic-unwind), gated on
456/// `Arc::ptr_eq` so a slot replaced mid-resolve is left intact.
457struct InflightGuard<'a, K, V>
458where
459 K: Hash + Eq + Clone + Send + Sync,
460 V: Clone + Send + Sync,
461{
462 cache: &'a ClockTtlCache<K, V>,
463 key: K,
464 cell: Arc<OnceCell<V>>,
465}
466
467impl<K, V> Drop for InflightGuard<'_, K, V>
468where
469 K: Hash + Eq + Clone + Send + Sync,
470 V: Clone + Send + Sync,
471{
472 fn drop(&mut self) {
473 let mut inflight = self.cache.inflight.lock();
474 if let Some(existing) = inflight.get(&self.key)
475 && Arc::ptr_eq(existing, &self.cell)
476 {
477 inflight.remove(&self.key);
478 }
479 }
480}