Skip to main content

iqdb_cache/
cached.rs

1//! The [`CachedIndex`] wrapper.
2
3use core::time::Duration;
4use std::sync::Arc;
5use std::sync::Mutex;
6use std::sync::atomic::{AtomicU64, Ordering};
7
8use clock_lib::{Clock, Monotonic, SystemClock};
9use iqdb_index::{IndexCore, IndexStats};
10use iqdb_types::{DistanceMetric, Hit, Metadata, Result, SearchParams, VectorId};
11
12use crate::config::{CacheConfig, EvictionPolicy};
13use crate::key::ResultKey;
14use crate::policy::PolicyCache;
15use crate::stats::CacheStats;
16
17/// A cached search result plus the moment it was stored.
18///
19/// `stamp` is recorded only when a TTL is configured; with no TTL it is `None`
20/// and the entry never expires on time.
21struct CacheEntry {
22    /// The memoized hits, ready to clone out on a hit.
23    hits: Box<[Hit]>,
24    /// When the entry was written, for TTL expiry; `None` when no TTL applies.
25    stamp: Option<Monotonic>,
26}
27
28/// A drop-in [`IndexCore`] wrapper that memoizes search results.
29///
30/// `CachedIndex` holds any `I: IndexCore` and forwards every call to it, with
31/// one addition: identical [`search`](IndexCore::search) calls — same query
32/// and same [`SearchParams`] — are served from an in-memory LRU cache instead
33/// of re-running the search. Because it *is* an [`IndexCore`], it slots in
34/// anywhere the wrapped index does, including behind `Box<dyn IndexCore>`.
35///
36/// ## Correctness
37///
38/// The cache never returns a stale result. Every mutation that can change the
39/// search space — [`insert`](IndexCore::insert),
40/// [`insert_batch`](IndexCore::insert_batch), and
41/// [`delete`](IndexCore::delete) — invalidates the cache, so a search after a
42/// write always recomputes against the current index. Operations that do not
43/// change the result set ([`flush`](IndexCore::flush) and the read-only
44/// accessors) leave the cache intact.
45///
46/// ## Opt-in
47///
48/// Caching is an optimization a caller chooses by wrapping an index; the
49/// database leaves indexes unwrapped by default. Construct a cache that holds
50/// a fixed number of recent searches with [`new`](CachedIndex::new) or
51/// [`with_capacity`](CachedIndex::with_capacity), or tune it through a
52/// [`CacheConfig`] with [`with_config`](CachedIndex::with_config). A capacity of
53/// `0` disables caching entirely: every search passes straight through, which is
54/// useful for A/B measuring the cache's effect without changing call sites.
55///
56/// ## Time-to-live
57///
58/// A [`CacheConfig::ttl`] gives entries an expiry: a cached result older than
59/// the TTL is treated as a miss and recomputed. Mutations through this wrapper
60/// already invalidate exactly, so the TTL exists to bound staleness from changes
61/// the wrapper *cannot* see — for example, the wrapped index mutated through
62/// another handle. With no TTL (the default) the clock is never consulted.
63///
64/// ## Concurrency
65///
66/// `CachedIndex<I>` is `Send + Sync` whenever `I` is (which every `IndexCore`
67/// is). Reads share the cache behind a [`Mutex`] held only for the lookup and
68/// the insert — never across the wrapped search — so concurrent misses run the
69/// underlying search in parallel rather than serializing on the lock.
70///
71/// # Examples
72///
73/// ```
74/// use std::sync::Arc;
75///
76/// use iqdb_cache::CachedIndex;
77/// use iqdb_index::{Index, IndexCore, IndexStats};
78/// use iqdb_types::{DistanceMetric, Hit, IqdbError, Metadata, Result, SearchParams, VectorId};
79///
80/// // A minimal index that returns one hit per search; enough to show the wrap.
81/// struct Stub {
82///     dim: usize,
83///     metric: DistanceMetric,
84///     ids: Vec<VectorId>,
85/// }
86///
87/// impl IndexCore for Stub {
88///     fn insert(&mut self, id: VectorId, _v: Arc<[f32]>, _m: Option<Metadata>) -> Result<()> {
89///         self.ids.push(id);
90///         Ok(())
91///     }
92///     fn delete(&mut self, id: &VectorId) -> Result<()> {
93///         match self.ids.iter().position(|x| x == id) {
94///             Some(pos) => { let _ = self.ids.remove(pos); Ok(()) }
95///             None => Err(IqdbError::NotFound),
96///         }
97///     }
98///     fn search(&self, _q: &[f32], params: &SearchParams) -> Result<Vec<Hit>> {
99///         Ok(self.ids.iter().take(params.k).map(|id| Hit::new(id.clone(), 0.0)).collect())
100///     }
101///     fn len(&self) -> usize { self.ids.len() }
102///     fn dim(&self) -> usize { self.dim }
103///     fn metric(&self) -> DistanceMetric { self.metric }
104///     fn flush(&mut self) -> Result<()> { Ok(()) }
105///     fn stats(&self) -> IndexStats {
106///         IndexStats { n_vectors: self.ids.len(), index_type: "stub", ..IndexStats::default() }
107///     }
108/// }
109///
110/// # fn main() -> Result<()> {
111/// let stub = Stub { dim: 3, metric: DistanceMetric::Cosine, ids: vec![VectorId::from(1u64)] };
112/// let mut cached = CachedIndex::new(stub);
113///
114/// let params = SearchParams::new(1, DistanceMetric::Cosine);
115/// let first = cached.search(&[1.0, 0.0, 0.0], &params)?;  // miss: runs the search
116/// let again = cached.search(&[1.0, 0.0, 0.0], &params)?;  // hit: served from cache
117/// assert_eq!(first, again);
118///
119/// let stats = cached.cache_stats();
120/// assert_eq!(stats.hits, 1);
121/// assert_eq!(stats.misses, 1);
122/// # Ok(())
123/// # }
124/// ```
125pub struct CachedIndex<I> {
126    /// The wrapped index every call forwards to.
127    inner: I,
128    /// The result cache, guarded for `&self` search access.
129    cache: Mutex<PolicyCache<ResultKey, CacheEntry>>,
130    /// Configured capacity, mirrored here for `0`-means-disabled fast paths.
131    capacity: usize,
132    /// Configured eviction policy, mirrored here for introspection.
133    policy: EvictionPolicy,
134    /// Optional per-entry time-to-live; `None` means entries expire only on
135    /// mutation.
136    ttl: Option<Duration>,
137    /// Time source for TTL expiry. `SystemClock` in production; a mock clock is
138    /// injected in tests. Only read when `ttl` is `Some`.
139    clock: Arc<dyn Clock>,
140    /// Lifetime count of cache hits.
141    hits: AtomicU64,
142    /// Lifetime count of cache misses.
143    misses: AtomicU64,
144    /// Lifetime count of entries discarded by the eviction policy.
145    evictions: AtomicU64,
146}
147
148impl<I: IndexCore> CachedIndex<I> {
149    /// Wraps `inner` with a result cache of the default capacity (1024 recent
150    /// searches) and no TTL.
151    ///
152    /// # Examples
153    ///
154    /// ```
155    /// # use iqdb_cache::CachedIndex;
156    /// # use iqdb_cache::doc_stub::stub_index;
157    /// let cached = CachedIndex::new(stub_index());
158    /// assert!(cached.is_enabled());
159    /// ```
160    #[must_use]
161    pub fn new(inner: I) -> Self {
162        Self::with_config(inner, CacheConfig::new())
163    }
164
165    /// Wraps `inner` with a result cache that holds at most `capacity` recent
166    /// searches and no TTL.
167    ///
168    /// A `capacity` of `0` disables caching: searches pass straight through and
169    /// nothing is stored.
170    ///
171    /// # Examples
172    ///
173    /// ```
174    /// # use iqdb_cache::CachedIndex;
175    /// # use iqdb_cache::doc_stub::stub_index;
176    /// let cached = CachedIndex::with_capacity(stub_index(), 256);
177    /// assert_eq!(cached.capacity(), 256);
178    ///
179    /// let bypass = CachedIndex::with_capacity(stub_index(), 0);
180    /// assert!(!bypass.is_enabled());
181    /// ```
182    #[must_use]
183    pub fn with_capacity(inner: I, capacity: usize) -> Self {
184        Self::with_config(inner, CacheConfig::new().capacity(capacity))
185    }
186
187    /// Wraps `inner` with a result cache built from `config` (the Tier-2 path).
188    ///
189    /// Use [`CacheConfig`] to set the capacity and an optional TTL together.
190    ///
191    /// # Examples
192    ///
193    /// ```
194    /// use std::time::Duration;
195    ///
196    /// use iqdb_cache::{CacheConfig, CachedIndex};
197    /// # use iqdb_cache::doc_stub::stub_index;
198    /// let config = CacheConfig::new().capacity(512).ttl(Duration::from_secs(30));
199    /// let cached = CachedIndex::with_config(stub_index(), config);
200    /// assert_eq!(cached.capacity(), 512);
201    /// assert_eq!(cached.ttl(), Some(Duration::from_secs(30)));
202    /// ```
203    #[must_use]
204    pub fn with_config(inner: I, config: CacheConfig) -> Self {
205        Self::with_config_in(inner, config, Arc::new(SystemClock::new()))
206    }
207
208    /// Construction core shared by every public constructor, with an injectable
209    /// clock for deterministic TTL tests.
210    pub(crate) fn with_config_in(inner: I, config: CacheConfig, clock: Arc<dyn Clock>) -> Self {
211        Self {
212            inner,
213            cache: Mutex::new(PolicyCache::new(config.policy, config.capacity)),
214            capacity: config.capacity,
215            policy: config.policy,
216            ttl: config.ttl,
217            clock,
218            hits: AtomicU64::new(0),
219            misses: AtomicU64::new(0),
220            evictions: AtomicU64::new(0),
221        }
222    }
223
224    /// The configured cache capacity. `0` means caching is disabled.
225    #[inline]
226    #[must_use]
227    pub fn capacity(&self) -> usize {
228        self.capacity
229    }
230
231    /// The configured per-entry time-to-live, or `None` if results expire only
232    /// on mutation.
233    #[inline]
234    #[must_use]
235    pub fn ttl(&self) -> Option<Duration> {
236        self.ttl
237    }
238
239    /// The configured eviction policy.
240    #[inline]
241    #[must_use]
242    pub fn policy(&self) -> EvictionPolicy {
243        self.policy
244    }
245
246    /// Whether caching is active (`capacity > 0`).
247    #[inline]
248    #[must_use]
249    pub fn is_enabled(&self) -> bool {
250        self.capacity > 0
251    }
252
253    /// Borrows the wrapped index.
254    #[inline]
255    #[must_use]
256    pub fn get_ref(&self) -> &I {
257        &self.inner
258    }
259
260    /// Unwraps the cache, returning the index it held.
261    ///
262    /// # Examples
263    ///
264    /// ```
265    /// # use iqdb_cache::CachedIndex;
266    /// # use iqdb_cache::doc_stub::stub_index;
267    /// # use iqdb_index::IndexCore;
268    /// let cached = CachedIndex::new(stub_index());
269    /// let inner = cached.into_inner();
270    /// assert_eq!(inner.dim(), 3);
271    /// ```
272    #[must_use]
273    pub fn into_inner(self) -> I {
274        self.inner
275    }
276
277    /// Drops every cached result, keeping the wrapped index untouched.
278    ///
279    /// Mutations invalidate automatically; call this only to force a cold cache
280    /// (for example, after the wrapped index was changed through a handle other
281    /// than this wrapper).
282    pub fn clear_cache(&mut self) {
283        match self.cache.get_mut() {
284            Ok(cache) => cache.clear(),
285            Err(poisoned) => poisoned.into_inner().clear(),
286        }
287    }
288
289    /// A snapshot of the cache's hit/miss counters and occupancy.
290    #[must_use]
291    pub fn cache_stats(&self) -> CacheStats {
292        let len = self.lock_cache().len();
293        CacheStats {
294            hits: self.hits.load(Ordering::Relaxed),
295            misses: self.misses.load(Ordering::Relaxed),
296            evictions: self.evictions.load(Ordering::Relaxed),
297            len,
298            capacity: self.capacity,
299        }
300    }
301
302    /// Locks the cache, recovering the guard if a previous holder panicked.
303    ///
304    /// A poisoned result cache is safe to keep using: a half-finished insert
305    /// can at worst drop or duplicate a memoized entry, never corrupt a result,
306    /// so recovery is preferable to propagating the panic.
307    fn lock_cache(&self) -> std::sync::MutexGuard<'_, PolicyCache<ResultKey, CacheEntry>> {
308        self.cache
309            .lock()
310            .unwrap_or_else(|poisoned| poisoned.into_inner())
311    }
312
313    /// Whether a cached entry is still live under the configured TTL.
314    ///
315    /// Always `true` when no TTL is set, so the clock is never read on the
316    /// non-TTL hot path.
317    #[inline]
318    fn is_live(&self, entry: &CacheEntry) -> bool {
319        match (self.ttl, entry.stamp) {
320            (Some(ttl), Some(stamp)) => self.clock.now().saturating_duration_since(stamp) < ttl,
321            _ => true,
322        }
323    }
324
325    /// Empties the cache through `&mut self` after a mutation.
326    fn invalidate(&mut self) {
327        // `&mut self` guarantees exclusive access, so no lock is contended.
328        match self.cache.get_mut() {
329            Ok(cache) => cache.clear(),
330            Err(poisoned) => poisoned.into_inner().clear(),
331        }
332    }
333}
334
335impl<I: IndexCore> IndexCore for CachedIndex<I> {
336    fn insert(
337        &mut self,
338        id: VectorId,
339        vector: std::sync::Arc<[f32]>,
340        metadata: Option<Metadata>,
341    ) -> Result<()> {
342        let result = self.inner.insert(id, vector, metadata);
343        if result.is_ok() {
344            self.invalidate();
345        }
346        result
347    }
348
349    fn insert_batch(
350        &mut self,
351        items: Vec<(VectorId, std::sync::Arc<[f32]>, Option<Metadata>)>,
352    ) -> Result<()> {
353        // `insert_batch` is fail-fast and may apply partially, so any outcome
354        // can have changed the search space: always invalidate.
355        let result = self.inner.insert_batch(items);
356        self.invalidate();
357        result
358    }
359
360    fn delete(&mut self, id: &VectorId) -> Result<()> {
361        let result = self.inner.delete(id);
362        if result.is_ok() {
363            self.invalidate();
364        }
365        result
366    }
367
368    fn search(&self, query: &[f32], params: &SearchParams) -> Result<Vec<Hit>> {
369        if self.capacity == 0 {
370            let _ = self.misses.fetch_add(1, Ordering::Relaxed);
371            return self.inner.search(query, params);
372        }
373
374        let key = ResultKey::new(query, params);
375        {
376            let mut cache = self.lock_cache();
377            if let Some(entry) = cache.get(&key) {
378                if self.is_live(entry) {
379                    let _ = self.hits.fetch_add(1, Ordering::Relaxed);
380                    return Ok(entry.hits.to_vec());
381                }
382                // Expired: fall through to recompute. The stale entry stays
383                // until the `put` below overwrites it with a fresh result.
384            }
385        }
386
387        // Miss (or expired): run the search without holding the lock so
388        // concurrent misses do not serialize on it.
389        let hits = self.inner.search(query, params)?;
390        let _ = self.misses.fetch_add(1, Ordering::Relaxed);
391        let stamp = self.ttl.map(|_| self.clock.now());
392        let evicted = {
393            let mut cache = self.lock_cache();
394            cache.put(
395                key,
396                CacheEntry {
397                    hits: hits.clone().into_boxed_slice(),
398                    stamp,
399                },
400            )
401        };
402        if evicted {
403            let _ = self.evictions.fetch_add(1, Ordering::Relaxed);
404        }
405        Ok(hits)
406    }
407
408    fn len(&self) -> usize {
409        self.inner.len()
410    }
411
412    fn is_empty(&self) -> bool {
413        self.inner.is_empty()
414    }
415
416    fn dim(&self) -> usize {
417        self.inner.dim()
418    }
419
420    fn metric(&self) -> DistanceMetric {
421        self.inner.metric()
422    }
423
424    fn flush(&mut self) -> Result<()> {
425        // Flush commits durable state without changing the searchable set, so
426        // the cache stays valid.
427        self.inner.flush()
428    }
429
430    fn stats(&self) -> IndexStats {
431        self.inner.stats()
432    }
433}
434
#[cfg(test)]
mod tests {
    #![allow(clippy::unwrap_used)]

    use clock_lib::ManualClock;

    use super::*;
    use crate::doc_stub::stub_index;

    /// The one query vector every test searches with.
    const QUERY: [f32; 3] = [1.0, 0.0, 0.0];

    /// Search parameters shared by every test: top-1 under cosine distance.
    fn params() -> SearchParams {
        SearchParams::new(1, DistanceMetric::Cosine)
    }

    /// Runs the fixed query once and discards the hits; panics if the search
    /// fails.
    fn probe<I: IndexCore>(index: &CachedIndex<I>) {
        let _hits = index.search(&QUERY, &params()).unwrap();
    }

    /// Current lifetime hit counter.
    fn hit_count<I: IndexCore>(index: &CachedIndex<I>) -> u64 {
        index.cache_stats().hits
    }

    /// Current lifetime miss counter.
    fn miss_count<I: IndexCore>(index: &CachedIndex<I>) -> u64 {
        index.cache_stats().misses
    }

    #[test]
    fn ttl_entry_is_recomputed_after_expiry() {
        let clock = Arc::new(ManualClock::new());
        let config = CacheConfig::new().capacity(8).ttl(Duration::from_secs(60));
        let cached = CachedIndex::with_config_in(stub_index(), config, clock.clone());

        // First call misses and populates the cache; the second one hits.
        probe(&cached);
        probe(&cached);
        assert_eq!(hit_count(&cached), 1);

        // Just inside the TTL: still a hit.
        clock.advance(Duration::from_secs(59));
        probe(&cached);
        assert_eq!(hit_count(&cached), 2);

        // Past the TTL: the entry expires and the search recomputes (a miss).
        clock.advance(Duration::from_secs(2));
        probe(&cached);
        assert_eq!(hit_count(&cached), 2);
        assert_eq!(miss_count(&cached), 2);

        // The recompute refreshed the entry, so the next search hits again.
        probe(&cached);
        assert_eq!(hit_count(&cached), 3);
    }

    #[test]
    fn ttl_boundary_is_exclusive() {
        let clock = Arc::new(ManualClock::new());
        let config = CacheConfig::new().capacity(8).ttl(Duration::from_secs(10));
        let cached = CachedIndex::with_config_in(stub_index(), config, clock.clone());

        probe(&cached);
        // Exactly at the TTL counts as expired (`elapsed >= ttl`).
        clock.advance(Duration::from_secs(10));
        probe(&cached);
        assert_eq!(hit_count(&cached), 0);
        assert_eq!(miss_count(&cached), 2);
    }

    #[test]
    fn no_ttl_never_expires_even_as_time_passes() {
        let clock = Arc::new(ManualClock::new());
        let config = CacheConfig::new().capacity(8); // no TTL
        let cached = CachedIndex::with_config_in(stub_index(), config, clock.clone());

        probe(&cached);
        probe(&cached);
        // Advance far beyond any plausible TTL: still a hit, because none is set.
        clock.advance(Duration::from_secs(60 * 60 * 24 * 365));
        probe(&cached);
        assert_eq!(hit_count(&cached), 2);
        assert_eq!(miss_count(&cached), 1);
    }
}