Skip to main content

iqdb_cache/
cached.rs

1//! The [`CachedIndex`] wrapper.
2
3use core::time::Duration;
4use std::sync::Arc;
5
6use clock_lib::{Clock, Monotonic, SystemClock};
7use iqdb_index::{IndexCore, IndexStats};
8use iqdb_types::{DistanceMetric, Hit, Metadata, Result, SearchParams, VectorId};
9
10use crate::config::{CacheConfig, EvictionPolicy};
11use crate::key::ResultKey;
12use crate::policy::PolicyCache;
13use crate::stats::CacheStats;
14use crate::sync::{AtomicU64, Mutex, MutexGuard, Ordering};
15
16/// A cached search result plus the moment it was stored.
17///
18/// `stamp` is recorded only when a TTL is configured; with no TTL it is `None`
19/// and the entry never expires on time.
20struct CacheEntry {
21    /// The memoized hits, ready to clone out on a hit.
22    hits: Box<[Hit]>,
23    /// When the entry was written, for TTL expiry; `None` when no TTL applies.
24    stamp: Option<Monotonic>,
25}
26
27/// A drop-in [`IndexCore`] wrapper that memoizes search results.
28///
29/// `CachedIndex` holds any `I: IndexCore` and forwards every call to it, with
30/// one addition: identical [`search`](IndexCore::search) calls — same query
31/// and same [`SearchParams`] — are served from an in-memory LRU cache instead
32/// of re-running the search. Because it *is* an [`IndexCore`], it slots in
33/// anywhere the wrapped index does, including behind `Box<dyn IndexCore>`.
34///
35/// ## Correctness
36///
37/// The cache never returns a stale result. Every mutation that can change the
38/// search space — [`insert`](IndexCore::insert),
39/// [`insert_batch`](IndexCore::insert_batch), and
40/// [`delete`](IndexCore::delete) — invalidates the cache, so a search after a
41/// write always recomputes against the current index. Operations that do not
42/// change the result set ([`flush`](IndexCore::flush) and the read-only
43/// accessors) leave the cache intact.
44///
45/// ## Opt-in
46///
47/// Caching is an optimization a caller chooses by wrapping an index; the
48/// database leaves indexes unwrapped by default. Construct a cache that holds
49/// a fixed number of recent searches with [`new`](CachedIndex::new) or
50/// [`with_capacity`](CachedIndex::with_capacity), or tune it through a
51/// [`CacheConfig`] with [`with_config`](CachedIndex::with_config). A capacity of
52/// `0` disables caching entirely: every search passes straight through, which is
53/// useful for A/B measuring the cache's effect without changing call sites.
54///
55/// ## Time-to-live
56///
57/// A [`CacheConfig::ttl`] gives entries an expiry: a cached result older than
58/// the TTL is treated as a miss and recomputed. Mutations through this wrapper
59/// already invalidate exactly, so the TTL exists to bound staleness from changes
60/// the wrapper *cannot* see — for example, the wrapped index mutated through
61/// another handle. With no TTL (the default) the clock is never consulted.
62///
63/// ## Concurrency
64///
65/// `CachedIndex<I>` is `Send + Sync` whenever `I` is (which every `IndexCore`
66/// is). Reads share the cache behind a [`Mutex`] held only for the lookup and
67/// the insert — never across the wrapped search — so concurrent misses run the
68/// underlying search in parallel rather than serializing on the lock.
69///
70/// # Examples
71///
72/// ```
73/// use std::sync::Arc;
74///
75/// use iqdb_cache::CachedIndex;
76/// use iqdb_index::{Index, IndexCore, IndexStats};
77/// use iqdb_types::{DistanceMetric, Hit, IqdbError, Metadata, Result, SearchParams, VectorId};
78///
79/// // A minimal index that returns one hit per search; enough to show the wrap.
80/// struct Stub {
81///     dim: usize,
82///     metric: DistanceMetric,
83///     ids: Vec<VectorId>,
84/// }
85///
86/// impl IndexCore for Stub {
87///     fn insert(&mut self, id: VectorId, _v: Arc<[f32]>, _m: Option<Metadata>) -> Result<()> {
88///         self.ids.push(id);
89///         Ok(())
90///     }
91///     fn delete(&mut self, id: &VectorId) -> Result<()> {
92///         match self.ids.iter().position(|x| x == id) {
93///             Some(pos) => { let _ = self.ids.remove(pos); Ok(()) }
94///             None => Err(IqdbError::NotFound),
95///         }
96///     }
97///     fn search(&self, _q: &[f32], params: &SearchParams) -> Result<Vec<Hit>> {
98///         Ok(self.ids.iter().take(params.k).map(|id| Hit::new(id.clone(), 0.0)).collect())
99///     }
100///     fn len(&self) -> usize { self.ids.len() }
101///     fn dim(&self) -> usize { self.dim }
102///     fn metric(&self) -> DistanceMetric { self.metric }
103///     fn flush(&mut self) -> Result<()> { Ok(()) }
104///     fn stats(&self) -> IndexStats {
105///         IndexStats { n_vectors: self.ids.len(), index_type: "stub", ..IndexStats::default() }
106///     }
107/// }
108///
109/// # fn main() -> Result<()> {
110/// let stub = Stub { dim: 3, metric: DistanceMetric::Cosine, ids: vec![VectorId::from(1u64)] };
111/// let mut cached = CachedIndex::new(stub);
112///
113/// let params = SearchParams::new(1, DistanceMetric::Cosine);
114/// let first = cached.search(&[1.0, 0.0, 0.0], &params)?;  // miss: runs the search
115/// let again = cached.search(&[1.0, 0.0, 0.0], &params)?;  // hit: served from cache
116/// assert_eq!(first, again);
117///
118/// let stats = cached.cache_stats();
119/// assert_eq!(stats.hits, 1);
120/// assert_eq!(stats.misses, 1);
121/// # Ok(())
122/// # }
123/// ```
124pub struct CachedIndex<I> {
125    /// The wrapped index every call forwards to.
126    inner: I,
127    /// The result cache, guarded for `&self` search access.
128    cache: Mutex<PolicyCache<ResultKey, CacheEntry>>,
129    /// Configured capacity, mirrored here for `0`-means-disabled fast paths.
130    capacity: usize,
131    /// Configured eviction policy, mirrored here for introspection.
132    policy: EvictionPolicy,
133    /// Optional per-entry time-to-live; `None` means entries expire only on
134    /// mutation.
135    ttl: Option<Duration>,
136    /// Time source for TTL expiry. `SystemClock` in production; a mock clock is
137    /// injected in tests. Only read when `ttl` is `Some`.
138    clock: Arc<dyn Clock>,
139    /// Lifetime count of cache hits.
140    hits: AtomicU64,
141    /// Lifetime count of cache misses.
142    misses: AtomicU64,
143    /// Lifetime count of entries discarded by the eviction policy.
144    evictions: AtomicU64,
145}
146
147impl<I: IndexCore> CachedIndex<I> {
148    /// Wraps `inner` with a result cache of the default capacity (1024 recent
149    /// searches) and no TTL.
150    ///
151    /// # Examples
152    ///
153    /// ```
154    /// # use iqdb_cache::CachedIndex;
155    /// # use iqdb_cache::doc_stub::stub_index;
156    /// let cached = CachedIndex::new(stub_index());
157    /// assert!(cached.is_enabled());
158    /// ```
159    #[must_use]
160    pub fn new(inner: I) -> Self {
161        Self::with_config(inner, CacheConfig::new())
162    }
163
164    /// Wraps `inner` with a result cache that holds at most `capacity` recent
165    /// searches and no TTL.
166    ///
167    /// A `capacity` of `0` disables caching: searches pass straight through and
168    /// nothing is stored.
169    ///
170    /// # Examples
171    ///
172    /// ```
173    /// # use iqdb_cache::CachedIndex;
174    /// # use iqdb_cache::doc_stub::stub_index;
175    /// let cached = CachedIndex::with_capacity(stub_index(), 256);
176    /// assert_eq!(cached.capacity(), 256);
177    ///
178    /// let bypass = CachedIndex::with_capacity(stub_index(), 0);
179    /// assert!(!bypass.is_enabled());
180    /// ```
181    #[must_use]
182    pub fn with_capacity(inner: I, capacity: usize) -> Self {
183        Self::with_config(inner, CacheConfig::new().capacity(capacity))
184    }
185
186    /// Wraps `inner` with a result cache built from `config` (the Tier-2 path).
187    ///
188    /// Use [`CacheConfig`] to set the capacity and an optional TTL together.
189    ///
190    /// # Examples
191    ///
192    /// ```
193    /// use std::time::Duration;
194    ///
195    /// use iqdb_cache::{CacheConfig, CachedIndex};
196    /// # use iqdb_cache::doc_stub::stub_index;
197    /// let config = CacheConfig::new().capacity(512).ttl(Duration::from_secs(30));
198    /// let cached = CachedIndex::with_config(stub_index(), config);
199    /// assert_eq!(cached.capacity(), 512);
200    /// assert_eq!(cached.ttl(), Some(Duration::from_secs(30)));
201    /// ```
202    #[must_use]
203    pub fn with_config(inner: I, config: CacheConfig) -> Self {
204        Self::with_config_in(inner, config, Arc::new(SystemClock::new()))
205    }
206
207    /// Construction core shared by every public constructor, with an injectable
208    /// clock for deterministic TTL tests.
209    pub(crate) fn with_config_in(inner: I, config: CacheConfig, clock: Arc<dyn Clock>) -> Self {
210        Self {
211            inner,
212            cache: Mutex::new(PolicyCache::new(config.policy, config.capacity)),
213            capacity: config.capacity,
214            policy: config.policy,
215            ttl: config.ttl,
216            clock,
217            hits: AtomicU64::new(0),
218            misses: AtomicU64::new(0),
219            evictions: AtomicU64::new(0),
220        }
221    }
222
223    /// The configured cache capacity. `0` means caching is disabled.
224    #[inline]
225    #[must_use]
226    pub fn capacity(&self) -> usize {
227        self.capacity
228    }
229
230    /// The configured per-entry time-to-live, or `None` if results expire only
231    /// on mutation.
232    #[inline]
233    #[must_use]
234    pub fn ttl(&self) -> Option<Duration> {
235        self.ttl
236    }
237
238    /// The configured eviction policy.
239    #[inline]
240    #[must_use]
241    pub fn policy(&self) -> EvictionPolicy {
242        self.policy
243    }
244
245    /// Whether caching is active (`capacity > 0`).
246    #[inline]
247    #[must_use]
248    pub fn is_enabled(&self) -> bool {
249        self.capacity > 0
250    }
251
252    /// Borrows the wrapped index.
253    #[inline]
254    #[must_use]
255    pub fn get_ref(&self) -> &I {
256        &self.inner
257    }
258
259    /// Unwraps the cache, returning the index it held.
260    ///
261    /// # Examples
262    ///
263    /// ```
264    /// # use iqdb_cache::CachedIndex;
265    /// # use iqdb_cache::doc_stub::stub_index;
266    /// # use iqdb_index::IndexCore;
267    /// let cached = CachedIndex::new(stub_index());
268    /// let inner = cached.into_inner();
269    /// assert_eq!(inner.dim(), 3);
270    /// ```
271    #[must_use]
272    pub fn into_inner(self) -> I {
273        self.inner
274    }
275
276    /// Drops every cached result, keeping the wrapped index untouched.
277    ///
278    /// Mutations invalidate automatically; call this only to force a cold cache
279    /// (for example, after the wrapped index was changed through a handle other
280    /// than this wrapper).
281    pub fn clear_cache(&mut self) {
282        self.lock_cache().clear();
283    }
284
285    /// A snapshot of the cache's hit/miss counters and occupancy.
286    #[must_use]
287    pub fn cache_stats(&self) -> CacheStats {
288        let len = self.lock_cache().len();
289        CacheStats {
290            hits: self.hits.load(Ordering::Relaxed),
291            misses: self.misses.load(Ordering::Relaxed),
292            evictions: self.evictions.load(Ordering::Relaxed),
293            len,
294            capacity: self.capacity,
295        }
296    }
297
298    /// Locks the cache, recovering the guard if a previous holder panicked.
299    ///
300    /// A poisoned result cache is safe to keep using: a half-finished insert
301    /// can at worst drop or duplicate a memoized entry, never corrupt a result,
302    /// so recovery is preferable to propagating the panic.
303    fn lock_cache(&self) -> MutexGuard<'_, PolicyCache<ResultKey, CacheEntry>> {
304        self.cache
305            .lock()
306            .unwrap_or_else(|poisoned| poisoned.into_inner())
307    }
308
309    /// Whether a cached entry is still live under the configured TTL.
310    ///
311    /// Always `true` when no TTL is set, so the clock is never read on the
312    /// non-TTL hot path.
313    #[inline]
314    fn is_live(&self, entry: &CacheEntry) -> bool {
315        match (self.ttl, entry.stamp) {
316            (Some(ttl), Some(stamp)) => self.clock.now().saturating_duration_since(stamp) < ttl,
317            _ => true,
318        }
319    }
320
321    /// Empties the cache after a mutation. Takes `&mut self`, so the lock is
322    /// always uncontended here.
323    fn invalidate(&mut self) {
324        self.lock_cache().clear();
325    }
326}
327
328impl<I: IndexCore> IndexCore for CachedIndex<I> {
329    fn insert(
330        &mut self,
331        id: VectorId,
332        vector: std::sync::Arc<[f32]>,
333        metadata: Option<Metadata>,
334    ) -> Result<()> {
335        let result = self.inner.insert(id, vector, metadata);
336        if result.is_ok() {
337            self.invalidate();
338        }
339        result
340    }
341
342    fn insert_batch(
343        &mut self,
344        items: Vec<(VectorId, std::sync::Arc<[f32]>, Option<Metadata>)>,
345    ) -> Result<()> {
346        // `insert_batch` is fail-fast and may apply partially, so any outcome
347        // can have changed the search space: always invalidate.
348        let result = self.inner.insert_batch(items);
349        self.invalidate();
350        result
351    }
352
353    fn delete(&mut self, id: &VectorId) -> Result<()> {
354        let result = self.inner.delete(id);
355        if result.is_ok() {
356            self.invalidate();
357        }
358        result
359    }
360
361    fn search(&self, query: &[f32], params: &SearchParams) -> Result<Vec<Hit>> {
362        if self.capacity == 0 {
363            let _ = self.misses.fetch_add(1, Ordering::Relaxed);
364            return self.inner.search(query, params);
365        }
366
367        let key = ResultKey::new(query, params);
368        {
369            let mut cache = self.lock_cache();
370            if let Some(entry) = cache.get(&key) {
371                if self.is_live(entry) {
372                    let _ = self.hits.fetch_add(1, Ordering::Relaxed);
373                    return Ok(entry.hits.to_vec());
374                }
375                // Expired: fall through to recompute. The stale entry stays
376                // until the `put` below overwrites it with a fresh result.
377            }
378        }
379
380        // Miss (or expired): run the search without holding the lock so
381        // concurrent misses do not serialize on it.
382        let hits = self.inner.search(query, params)?;
383        let _ = self.misses.fetch_add(1, Ordering::Relaxed);
384        let stamp = self.ttl.map(|_| self.clock.now());
385        let evicted = {
386            let mut cache = self.lock_cache();
387            cache.put(
388                key,
389                CacheEntry {
390                    hits: hits.clone().into_boxed_slice(),
391                    stamp,
392                },
393            )
394        };
395        if evicted {
396            let _ = self.evictions.fetch_add(1, Ordering::Relaxed);
397        }
398        Ok(hits)
399    }
400
401    fn len(&self) -> usize {
402        self.inner.len()
403    }
404
405    fn is_empty(&self) -> bool {
406        self.inner.is_empty()
407    }
408
409    fn dim(&self) -> usize {
410        self.inner.dim()
411    }
412
413    fn metric(&self) -> DistanceMetric {
414        self.inner.metric()
415    }
416
417    fn flush(&mut self) -> Result<()> {
418        // Flush commits durable state without changing the searchable set, so
419        // the cache stays valid.
420        self.inner.flush()
421    }
422
423    fn stats(&self) -> IndexStats {
424        self.inner.stats()
425    }
426}
427
428#[cfg(test)]
429mod tests {
430    #![allow(clippy::unwrap_used)]
431
432    use clock_lib::ManualClock;
433
434    use super::*;
435    use crate::doc_stub::stub_index;
436
437    fn params() -> SearchParams {
438        SearchParams::new(1, DistanceMetric::Cosine)
439    }
440
441    #[test]
442    fn ttl_entry_is_recomputed_after_expiry() {
443        let clock = Arc::new(ManualClock::new());
444        let config = CacheConfig::new().capacity(8).ttl(Duration::from_secs(60));
445        let cached = CachedIndex::with_config_in(stub_index(), config, clock.clone());
446
447        let _miss = cached.search(&[1.0, 0.0, 0.0], &params()).unwrap();
448        let _hit = cached.search(&[1.0, 0.0, 0.0], &params()).unwrap();
449        assert_eq!(cached.cache_stats().hits, 1);
450
451        // Just inside the TTL: still a hit.
452        clock.advance(Duration::from_secs(59));
453        let _hit2 = cached.search(&[1.0, 0.0, 0.0], &params()).unwrap();
454        assert_eq!(cached.cache_stats().hits, 2);
455
456        // Past the TTL: the entry expires and the search recomputes (a miss).
457        clock.advance(Duration::from_secs(2));
458        let _expired = cached.search(&[1.0, 0.0, 0.0], &params()).unwrap();
459        assert_eq!(cached.cache_stats().hits, 2);
460        assert_eq!(cached.cache_stats().misses, 2);
461
462        // The recompute refreshed the entry, so the next search hits again.
463        let _hit3 = cached.search(&[1.0, 0.0, 0.0], &params()).unwrap();
464        assert_eq!(cached.cache_stats().hits, 3);
465    }
466
467    #[test]
468    fn ttl_boundary_is_exclusive() {
469        let clock = Arc::new(ManualClock::new());
470        let config = CacheConfig::new().capacity(8).ttl(Duration::from_secs(10));
471        let cached = CachedIndex::with_config_in(stub_index(), config, clock.clone());
472
473        let _miss = cached.search(&[1.0, 0.0, 0.0], &params()).unwrap();
474        // Exactly at the TTL counts as expired (`elapsed >= ttl`).
475        clock.advance(Duration::from_secs(10));
476        let _again = cached.search(&[1.0, 0.0, 0.0], &params()).unwrap();
477        assert_eq!(cached.cache_stats().hits, 0);
478        assert_eq!(cached.cache_stats().misses, 2);
479    }
480
481    #[test]
482    fn no_ttl_never_expires_even_as_time_passes() {
483        let clock = Arc::new(ManualClock::new());
484        let config = CacheConfig::new().capacity(8); // no TTL
485        let cached = CachedIndex::with_config_in(stub_index(), config, clock.clone());
486
487        let _miss = cached.search(&[1.0, 0.0, 0.0], &params()).unwrap();
488        let _hit = cached.search(&[1.0, 0.0, 0.0], &params()).unwrap();
489        // Advance far beyond any plausible TTL: still a hit, because none is set.
490        clock.advance(Duration::from_secs(60 * 60 * 24 * 365));
491        let _hit2 = cached.search(&[1.0, 0.0, 0.0], &params()).unwrap();
492        assert_eq!(cached.cache_stats().hits, 2);
493        assert_eq!(cached.cache_stats().misses, 1);
494    }
495}