Skip to main content

axess_cache/
cache.rs

1//! [`ClockTtlCache`] and its single-flight in-flight machinery.
2//!
3//! The cache implementation itself lives here; the public counters
4//! snapshot is in [`crate::stats`].
5
6use crate::stats::{CacheCounters, CacheStats};
7use axess_clock::Clock;
8use lru::LruCache;
9use parking_lot::Mutex;
10use std::collections::HashMap;
11use std::hash::Hash;
12use std::num::NonZeroUsize;
13use std::sync::Arc;
14use std::sync::atomic::Ordering;
15use std::time::Duration;
16use tokio::sync::OnceCell;
17
/// One cached row: the value handed back to callers plus its expiry.
///
/// The expiry is kept as an absolute instant (epoch microseconds, as
/// produced by `clock.now().timestamp_micros()`), fixed when the row is
/// written as `now + ttl`. A read therefore needs only a single comparison
/// against the current clock reading. The row's lifetime is decided at
/// write time rather than re-derived from the cache's TTL on every read.
struct Entry<V> {
    /// Absolute expiry instant in epoch microseconds; the row counts as
    /// live while the clock reads at or before this value.
    expires_at_micros: i64,
    /// The cached payload; a clone of it is returned on every hit.
    value: V,
}
28
29/// TTL+LRU cache with all time decisions routed through an injected [`Clock`].
30///
31/// # Capacity vs. TTL
32///
33/// Two independent eviction triggers:
34///
35/// - **TTL**: checked on read. Expired entries are removed lazily when
36///   touched; [`cleanup_expired`](Self::cleanup_expired) sweeps the whole
37///   map for callers who want to reclaim memory eagerly.
38/// - **Capacity**: enforced at insert time by [`LruCache`]. Inserting into a
39///   full cache evicts the least-recently-used entry.
40///
41/// # Concurrency
42///
43/// Internal state is wrapped in a [`parking_lot::Mutex`]. Reads briefly
44/// acquire the lock to update LRU recency. The single mutex becomes the
45/// bottleneck only once multiple threads contend on the same hot key;
46/// `benches/cache_bench.rs` quantifies the regression. A sharded variant
47/// is deferred until real profiling shows it matters.
48///
49/// # DST guarantees
50///
51/// Every TTL decision and every `expires_at_micros` calculation goes
52/// through `clock.now()`. Tests using
53/// [`MockClock`](axess_clock::testing::MockClock) can advance time in
54/// arbitrary jumps and observe deterministic eviction. There are no
55/// background tasks and no calls into `Instant::now()` /
56/// `chrono::Utc::now()`.
57pub struct ClockTtlCache<K, V>
58where
59    K: Hash + Eq + Send + Sync,
60    V: Clone + Send + Sync,
61{
62    inner: Mutex<LruCache<K, Entry<V>>>,
63    /// In-flight loads for [`Self::get_or_try_insert_with`]. Held under a
64    /// dedicated mutex so cache reads aren't blocked by single-flight
65    /// bookkeeping. The TOCTOU between this map and `inner` is benign:
66    /// the worst case is one extra fetch (which is the work we'd be
67    /// doing anyway), and `OnceCell` semantics still guarantee a single
68    /// fetcher per cell across concurrent waiters.
69    inflight: Mutex<HashMap<K, Arc<OnceCell<V>>>>,
70    clock: Arc<dyn Clock>,
71    ttl: Duration,
72    pub(crate) stats: CacheCounters,
73}
74
75impl<K, V> ClockTtlCache<K, V>
76where
77    K: Hash + Eq + Clone + Send + Sync,
78    V: Clone + Send + Sync,
79{
80    /// Construct a cache with the given capacity, TTL, and Clock.
81    pub fn new(capacity: NonZeroUsize, ttl: Duration, clock: Arc<dyn Clock>) -> Self {
82        Self {
83            inner: Mutex::new(LruCache::new(capacity)),
84            inflight: Mutex::new(HashMap::new()),
85            clock,
86            ttl,
87            stats: CacheCounters::default(),
88        }
89    }
90
91    /// Snapshot of the live observability counters (hits, misses, evictions,
92    /// single-flight joins, …). See [`CacheStats`] for field semantics.
93    ///
94    /// Cheap (atomic loads only); safe to call on the hot path or from a
95    /// metrics scrape endpoint.
96    pub fn stats(&self) -> CacheStats {
97        self.stats.snapshot()
98    }
99
100    /// Reset every counter to zero. Useful for tests and for resetting a
101    /// metrics window after a scrape; production telemetry pipelines
102    /// typically just take rate-of-change of the monotonic counters and
103    /// shouldn't need this.
104    pub fn reset_stats(&self) {
105        self.stats.reset();
106    }
107
108    /// Look up a key. Returns `None` if absent or expired (and removes
109    /// the expired entry as a side effect; lazy TTL sweep).
110    pub fn get(&self, key: &K) -> Option<V> {
111        let now_micros = self.clock.now().timestamp_micros();
112        let mut guard = self.inner.lock();
113        // LruCache::get updates recency, which is what we want on a hit;
114        // on an expired-hit we then `pop` to evict.
115        if let Some(entry) = guard.get(key) {
116            if entry.expires_at_micros >= now_micros {
117                let v = entry.value.clone();
118                drop(guard);
119                self.stats.hits.fetch_add(1, Ordering::Relaxed);
120                return Some(v);
121            }
122        }
123        // Either absent or stale. `pop` is a no-op for the absent case;
124        // we count both as a miss (TTL-expired hits are semantically
125        // misses to the caller; they get None).
126        guard.pop(key);
127        drop(guard);
128        self.stats.misses.fetch_add(1, Ordering::Relaxed);
129        None
130    }
131
132    /// Insert or replace a value with the configured TTL.
133    pub fn insert(&self, key: K, value: V) {
134        let now_micros = self.clock.now().timestamp_micros();
135        let expires_at_micros = now_micros.saturating_add(self.ttl.as_micros() as i64);
136        let mut guard = self.inner.lock();
137        let len_before = guard.len();
138        let cap = guard.cap().get();
139        guard.put(
140            key,
141            Entry {
142                value,
143                expires_at_micros,
144            },
145        );
146        // LruCache::put on a full cache evicts the LRU entry to make room.
147        // Detect the eviction by checking that we hit (or were already at)
148        // capacity and the cache size didn't grow. Counts both
149        // "displaced existing key" and "popped LRU" paths; the former
150        // is also useful telemetry (tells you about overwrites under
151        // capacity pressure).
152        let inserted_displaced = len_before >= cap;
153        drop(guard);
154        self.stats.inserts.fetch_add(1, Ordering::Relaxed);
155        if inserted_displaced {
156            self.stats
157                .capacity_evictions
158                .fetch_add(1, Ordering::Relaxed);
159        }
160    }
161
162    /// Remove a single entry. Also removes any in-flight load cell for
163    /// the same key, so a concurrent
164    /// [`get_or_try_insert_with`](Self::get_or_try_insert_with) load that
165    /// resolves *after* this call cannot re-cache the now-invalidated
166    /// value. Returns `true` if anything was removed (cache row, in-flight
167    /// cell, or both).
168    ///
169    /// # Concurrency
170    ///
171    /// Two separate critical sections: inflight first, then LRU.
172    /// Lock-ordering rule (inflight before LRU) is preserved. A concurrent
173    /// load whose post-resolve runs *between* our inflight removal and
174    /// our LRU removal sees its cell missing from inflight and skips the
175    /// LRU promotion (see `get_or_try_insert_with`'s `Ok` arm). A
176    /// concurrent load whose post-resolve runs *before* our inflight
177    /// removal already inserted into LRU, which our LRU pop then
178    /// removes. Both paths converge on "key is not cached afterward."
179    pub fn invalidate(&self, key: &K) -> bool {
180        let inflight_removed = self.inflight.lock().remove(key).is_some();
181        let lru_removed = self.inner.lock().pop(key).is_some();
182        let removed = inflight_removed || lru_removed;
183        if removed {
184            self.stats.invalidations.fetch_add(1, Ordering::Relaxed);
185        }
186        removed
187    }
188
189    /// Remove every cache entry whose key matches `predicate`. Also
190    /// removes matching in-flight load cells so concurrent loads
191    /// resolving after this call cannot re-cache invalidated values.
192    /// Returns the number of cache entries removed (LRU entries only;
193    /// in-flight removals are bookkeeping side effects).
194    ///
195    /// O(n) over the cache plus O(m) over the in-flight map. Use for
196    /// "evict everything for principal X" after a role change; small N
197    /// for typical caches; if your cardinality is large enough that
198    /// this is too slow, consider maintaining a secondary index outside
199    /// this primitive.
200    #[tracing::instrument(
201        name = "axess_cache.invalidate_by",
202        skip(self, predicate),
203        fields(removed = tracing::field::Empty),
204    )]
205    pub fn invalidate_by(&self, predicate: impl Fn(&K) -> bool) -> usize {
206        // Inflight first, then LRU; lock-ordering rule.
207        {
208            let mut inflight = self.inflight.lock();
209            let to_remove: Vec<K> = inflight.keys().filter(|k| predicate(k)).cloned().collect();
210            for k in &to_remove {
211                inflight.remove(k);
212            }
213        }
214
215        let mut guard = self.inner.lock();
216        let to_remove: Vec<K> = guard
217            .iter()
218            .filter_map(|(k, _)| if predicate(k) { Some(k.clone()) } else { None })
219            .collect();
220        let count = to_remove.len();
221        for k in &to_remove {
222            guard.pop(k);
223        }
224        drop(guard);
225        if count > 0 {
226            self.stats
227                .invalidations
228                .fetch_add(count as u64, Ordering::Relaxed);
229        }
230        tracing::Span::current().record("removed", count);
231        count
232    }
233
234    /// Drop every entry from the cache. Also drops every in-flight load
235    /// cell, so any load resolving after this call is barred from
236    /// promoting its value to the cache (post-`invalidate_all` cache
237    /// state must be empty).
238    #[tracing::instrument(
239        name = "axess_cache.invalidate_all",
240        skip(self),
241        fields(removed = tracing::field::Empty),
242    )]
243    pub fn invalidate_all(&self) {
244        // Inflight first, then LRU; lock-ordering rule.
245        self.inflight.lock().clear();
246
247        let mut guard = self.inner.lock();
248        let count = guard.len();
249        guard.clear();
250        drop(guard);
251        if count > 0 {
252            self.stats
253                .invalidations
254                .fetch_add(count as u64, Ordering::Relaxed);
255        }
256        tracing::Span::current().record("removed", count);
257    }
258
259    /// Walk the cache and remove every expired entry. Returns the number
260    /// of entries reclaimed.
261    ///
262    /// Optional: the cache evicts expired entries lazily on access via
263    /// [`get`](Self::get). Call this if you want to reclaim memory ahead
264    /// of next access (e.g. from a periodic Clock-aware scheduled task).
265    #[tracing::instrument(
266        name = "axess_cache.cleanup_expired",
267        skip(self),
268        fields(removed = tracing::field::Empty),
269    )]
270    pub fn cleanup_expired(&self) -> usize {
271        let now_micros = self.clock.now().timestamp_micros();
272        let mut guard = self.inner.lock();
273        let to_remove: Vec<K> = guard
274            .iter()
275            .filter_map(|(k, e)| {
276                if e.expires_at_micros < now_micros {
277                    Some(k.clone())
278                } else {
279                    None
280                }
281            })
282            .collect();
283        let count = to_remove.len();
284        for k in &to_remove {
285            guard.pop(k);
286        }
287        drop(guard);
288        if count > 0 {
289            self.stats
290                .invalidations
291                .fetch_add(count as u64, Ordering::Relaxed);
292        }
293        tracing::Span::current().record("removed", count);
294        count
295    }
296
297    /// Load `key` from the cache, or run `fetcher` to populate it.
298    /// Single-flight: concurrent calls for the same key share one fetcher
299    /// invocation, and the rest await the in-flight cell.
300    ///
301    /// # Errors
302    ///
303    /// On `fetcher` error the in-flight cell is removed so subsequent
304    /// callers retry rather than re-await a known-failing future. The
305    /// error is propagated to every concurrent caller that joined this
306    /// in-flight load.
307    ///
308    /// # Panic safety
309    ///
310    /// If `fetcher` panics, the in-flight cell is removed via an RAII
311    /// guard whose `Drop` runs during panic-unwind. Subsequent callers
312    /// for the same key get a fresh fetcher invocation rather than
313    /// joining a permanently-uninitialised cell.
314    ///
315    /// # Interaction with concurrent `invalidate`
316    ///
317    /// Invalidate wins against concurrent loads. If `invalidate` /
318    /// `invalidate_by` / `invalidate_all` runs while a load is in flight
319    /// for `key`, the loaded value is **not** promoted to the cache. The
320    /// caller that triggered the in-flight load (and joiners) still
321    /// receive the loaded value (the load started before the
322    /// invalidate), but the cache stays in its post-invalidate state and
323    /// the next lookup runs a fresh load. Aborting an in-flight load to
324    /// make joiners retry is not possible with [`tokio::sync::OnceCell`]
325    /// semantics.
326    ///
327    /// # Fetcher determinism
328    ///
329    /// Concurrent callers may each pass a different `fetcher` closure,
330    /// but only the first one to reach the cell is invoked; the rest are
331    /// dropped. Callers should pass fetchers that are pure functions of
332    /// the key, otherwise only the winning caller's side effects fire.
333    #[tracing::instrument(
334        name = "axess_cache.get_or_try_insert_with",
335        skip(self, key, fetcher),
336        fields(joined = tracing::field::Empty, error = tracing::field::Empty),
337    )]
338    pub async fn get_or_try_insert_with<F, Fut, E>(&self, key: K, fetcher: F) -> Result<V, E>
339    where
340        F: FnOnce() -> Fut,
341        Fut: std::future::Future<Output = Result<V, E>>,
342    {
343        // Fast path: cache hit.
344        if let Some(v) = self.get(&key) {
345            return Ok(v);
346        }
347
348        // Slow path: claim or join an in-flight cell.
349        let (cell, joined) = {
350            let mut inflight = self.inflight.lock();
351            match inflight.get(&key) {
352                Some(existing) => (existing.clone(), true),
353                None => {
354                    let cell = Arc::new(OnceCell::<V>::new());
355                    inflight.insert(key.clone(), cell.clone());
356                    (cell, false)
357                }
358            }
359        };
360        tracing::Span::current().record("joined", joined);
361        if joined {
362            self.stats
363                .single_flight_joins
364                .fetch_add(1, Ordering::Relaxed);
365        }
366
367        // RAII cleanup of the in-flight slot. Drops on Ok / Err / panic-
368        // unwind, ensuring the slot doesn't leak even if `fetcher` panics
369        // mid-await. Same `Arc::ptr_eq`-gated removal as before.
370        let inflight_guard = InflightGuard {
371            cache: self,
372            key: key.clone(),
373            cell: cell.clone(),
374        };
375
376        // Race-resolve: `OnceCell::get_or_try_init` ensures exactly one
377        // closure runs across all concurrent callers for this cell.
378        // Subsequent callers `.await` the same future internally.
379        let result: Result<&V, E> = cell.get_or_try_init(fetcher).await;
380
381        let outcome = match result {
382            Ok(v_ref) => {
383                let v = v_ref.clone();
384                // Atomic publication against `invalidate*`: hold the
385                // inflight lock while we (a) check that our cell is still
386                // the active one and (b) promote to LRU. Since invalidate
387                // *must* take inflight first (lock-ordering rule), it
388                // cannot interleave between our check and our LRU insert.
389                {
390                    let mut inflight = self.inflight.lock();
391                    if let Some(existing) = inflight.get(&key)
392                        && Arc::ptr_eq(existing, &cell)
393                    {
394                        // Still our cell. Insert into LRU under inflight
395                        // hold so a racing invalidate is forced to wait;
396                        // when it does run, it'll then remove the just-
397                        // inserted LRU entry, converging on "invalidated."
398                        // Critically: when invalidate runs *during* our
399                        // load (before this point), the cell is removed
400                        // from inflight, so this `if let Some(...)` won't
401                        // match and we skip the promotion entirely.
402                        self.insert(key.clone(), v.clone());
403                        inflight.remove(&key);
404                    }
405                    // else: either invalidate ran during our load, or
406                    // another caller's cleanup ran first. Either way,
407                    // refuse to promote.
408                };
409                // Guard's Drop is now a no-op for the active path (we
410                // removed our own slot); for the !active path it's also
411                // a no-op (slot was already gone).
412                Ok(v)
413            }
414            Err(e) => {
415                self.stats
416                    .single_flight_errors
417                    .fetch_add(1, Ordering::Relaxed);
418                tracing::Span::current().record("error", true);
419                // Guard's Drop removes the slot so retries get a fresh
420                // fetcher rather than re-awaiting the failed future.
421                Err(e)
422            }
423        };
424        drop(inflight_guard);
425        outcome
426    }
427
428    /// Current number of entries (including any that are expired but not
429    /// yet evicted by a read or [`cleanup_expired`](Self::cleanup_expired)).
430    pub fn len(&self) -> usize {
431        self.inner.lock().len()
432    }
433
434    /// `true` if the cache holds zero entries.
435    pub fn is_empty(&self) -> bool {
436        self.inner.lock().len() == 0
437    }
438
439    /// Configured capacity (LRU bound).
440    pub fn capacity(&self) -> NonZeroUsize {
441        self.inner.lock().cap()
442    }
443
444    /// Number of in-flight loads currently waiting to resolve.
445    ///
446    /// Useful for ops monitoring ("is this pod stuck on slow fetchers?")
447    /// and for tests that verify `get_or_try_insert_with` cleans up its
448    /// in-flight cells correctly even on panic-unwind.
449    pub fn pending_loads_count(&self) -> usize {
450        self.inflight.lock().len()
451    }
452}
453
454/// RAII guard that removes an in-flight cell from the cache's
455/// pending-loads map on drop (including panic-unwind), gated on
456/// `Arc::ptr_eq` so a slot replaced mid-resolve is left intact.
457struct InflightGuard<'a, K, V>
458where
459    K: Hash + Eq + Clone + Send + Sync,
460    V: Clone + Send + Sync,
461{
462    cache: &'a ClockTtlCache<K, V>,
463    key: K,
464    cell: Arc<OnceCell<V>>,
465}
466
467impl<K, V> Drop for InflightGuard<'_, K, V>
468where
469    K: Hash + Eq + Clone + Send + Sync,
470    V: Clone + Send + Sync,
471{
472    fn drop(&mut self) {
473        let mut inflight = self.cache.inflight.lock();
474        if let Some(existing) = inflight.get(&self.key)
475            && Arc::ptr_eq(existing, &self.cell)
476        {
477            inflight.remove(&self.key);
478        }
479    }
480}