iqdb_cache/cached.rs
1//! The [`CachedIndex`] wrapper.
2
3use core::time::Duration;
4use std::sync::Arc;
5use std::sync::Mutex;
6use std::sync::atomic::{AtomicU64, Ordering};
7
8use clock_lib::{Clock, Monotonic, SystemClock};
9use iqdb_index::{IndexCore, IndexStats};
10use iqdb_types::{DistanceMetric, Hit, Metadata, Result, SearchParams, VectorId};
11
12use crate::config::CacheConfig;
13use crate::key::ResultKey;
14use crate::lru::LruCache;
15use crate::stats::CacheStats;
16
17/// A cached search result plus the moment it was stored.
18///
19/// `stamp` is recorded only when a TTL is configured; with no TTL it is `None`
20/// and the entry never expires on time.
21struct CacheEntry {
22 /// The memoized hits, ready to clone out on a hit.
23 hits: Box<[Hit]>,
24 /// When the entry was written, for TTL expiry; `None` when no TTL applies.
25 stamp: Option<Monotonic>,
26}
27
28/// A drop-in [`IndexCore`] wrapper that memoizes search results.
29///
30/// `CachedIndex` holds any `I: IndexCore` and forwards every call to it, with
31/// one addition: identical [`search`](IndexCore::search) calls — same query
32/// and same [`SearchParams`] — are served from an in-memory LRU cache instead
33/// of re-running the search. Because it *is* an [`IndexCore`], it slots in
34/// anywhere the wrapped index does, including behind `Box<dyn IndexCore>`.
35///
36/// ## Correctness
37///
38/// The cache never returns a stale result. Every mutation that can change the
39/// search space — [`insert`](IndexCore::insert),
40/// [`insert_batch`](IndexCore::insert_batch), and
41/// [`delete`](IndexCore::delete) — invalidates the cache, so a search after a
42/// write always recomputes against the current index. Operations that do not
43/// change the result set ([`flush`](IndexCore::flush) and the read-only
44/// accessors) leave the cache intact.
45///
46/// ## Opt-in
47///
48/// Caching is an optimization a caller chooses by wrapping an index; the
49/// database leaves indexes unwrapped by default. Construct a cache that holds
50/// a fixed number of recent searches with [`new`](CachedIndex::new) or
51/// [`with_capacity`](CachedIndex::with_capacity), or tune it through a
52/// [`CacheConfig`] with [`with_config`](CachedIndex::with_config). A capacity of
53/// `0` disables caching entirely: every search passes straight through, which is
54/// useful for A/B measuring the cache's effect without changing call sites.
55///
56/// ## Time-to-live
57///
58/// A [`CacheConfig::ttl`] gives entries an expiry: a cached result older than
59/// the TTL is treated as a miss and recomputed. Mutations through this wrapper
60/// already invalidate exactly, so the TTL exists to bound staleness from changes
61/// the wrapper *cannot* see — for example, the wrapped index mutated through
62/// another handle. With no TTL (the default) the clock is never consulted.
63///
64/// ## Concurrency
65///
66/// `CachedIndex<I>` is `Send + Sync` whenever `I` is (which every `IndexCore`
67/// is). Reads share the cache behind a [`Mutex`] held only for the lookup and
68/// the insert — never across the wrapped search — so concurrent misses run the
69/// underlying search in parallel rather than serializing on the lock.
70///
71/// # Examples
72///
73/// ```
74/// use std::sync::Arc;
75///
76/// use iqdb_cache::CachedIndex;
77/// use iqdb_index::{Index, IndexCore, IndexStats};
78/// use iqdb_types::{DistanceMetric, Hit, IqdbError, Metadata, Result, SearchParams, VectorId};
79///
80/// // A minimal index that returns one hit per search; enough to show the wrap.
81/// struct Stub {
82/// dim: usize,
83/// metric: DistanceMetric,
84/// ids: Vec<VectorId>,
85/// }
86///
87/// impl IndexCore for Stub {
88/// fn insert(&mut self, id: VectorId, _v: Arc<[f32]>, _m: Option<Metadata>) -> Result<()> {
89/// self.ids.push(id);
90/// Ok(())
91/// }
92/// fn delete(&mut self, id: &VectorId) -> Result<()> {
93/// match self.ids.iter().position(|x| x == id) {
94/// Some(pos) => { let _ = self.ids.remove(pos); Ok(()) }
95/// None => Err(IqdbError::NotFound),
96/// }
97/// }
98/// fn search(&self, _q: &[f32], params: &SearchParams) -> Result<Vec<Hit>> {
99/// Ok(self.ids.iter().take(params.k).map(|id| Hit::new(id.clone(), 0.0)).collect())
100/// }
101/// fn len(&self) -> usize { self.ids.len() }
102/// fn dim(&self) -> usize { self.dim }
103/// fn metric(&self) -> DistanceMetric { self.metric }
104/// fn flush(&mut self) -> Result<()> { Ok(()) }
105/// fn stats(&self) -> IndexStats {
106/// IndexStats { n_vectors: self.ids.len(), index_type: "stub", ..IndexStats::default() }
107/// }
108/// }
109///
110/// # fn main() -> Result<()> {
111/// let stub = Stub { dim: 3, metric: DistanceMetric::Cosine, ids: vec![VectorId::from(1u64)] };
112/// let mut cached = CachedIndex::new(stub);
113///
114/// let params = SearchParams::new(1, DistanceMetric::Cosine);
115/// let first = cached.search(&[1.0, 0.0, 0.0], ¶ms)?; // miss: runs the search
116/// let again = cached.search(&[1.0, 0.0, 0.0], ¶ms)?; // hit: served from cache
117/// assert_eq!(first, again);
118///
119/// let stats = cached.cache_stats();
120/// assert_eq!(stats.hits, 1);
121/// assert_eq!(stats.misses, 1);
122/// # Ok(())
123/// # }
124/// ```
125pub struct CachedIndex<I> {
126 /// The wrapped index every call forwards to.
127 inner: I,
128 /// The result cache, guarded for `&self` search access.
129 cache: Mutex<LruCache<ResultKey, CacheEntry>>,
130 /// Configured capacity, mirrored here for `0`-means-disabled fast paths.
131 capacity: usize,
132 /// Optional per-entry time-to-live; `None` means entries expire only on
133 /// mutation.
134 ttl: Option<Duration>,
135 /// Time source for TTL expiry. `SystemClock` in production; a mock clock is
136 /// injected in tests. Only read when `ttl` is `Some`.
137 clock: Arc<dyn Clock>,
138 /// Lifetime count of cache hits.
139 hits: AtomicU64,
140 /// Lifetime count of cache misses.
141 misses: AtomicU64,
142}
143
144impl<I: IndexCore> CachedIndex<I> {
145 /// Wraps `inner` with a result cache of the default capacity (1024 recent
146 /// searches) and no TTL.
147 ///
148 /// # Examples
149 ///
150 /// ```
151 /// # use iqdb_cache::CachedIndex;
152 /// # use iqdb_cache::doc_stub::stub_index;
153 /// let cached = CachedIndex::new(stub_index());
154 /// assert!(cached.is_enabled());
155 /// ```
156 #[must_use]
157 pub fn new(inner: I) -> Self {
158 Self::with_config(inner, CacheConfig::new())
159 }
160
161 /// Wraps `inner` with a result cache that holds at most `capacity` recent
162 /// searches and no TTL.
163 ///
164 /// A `capacity` of `0` disables caching: searches pass straight through and
165 /// nothing is stored.
166 ///
167 /// # Examples
168 ///
169 /// ```
170 /// # use iqdb_cache::CachedIndex;
171 /// # use iqdb_cache::doc_stub::stub_index;
172 /// let cached = CachedIndex::with_capacity(stub_index(), 256);
173 /// assert_eq!(cached.capacity(), 256);
174 ///
175 /// let bypass = CachedIndex::with_capacity(stub_index(), 0);
176 /// assert!(!bypass.is_enabled());
177 /// ```
178 #[must_use]
179 pub fn with_capacity(inner: I, capacity: usize) -> Self {
180 Self::with_config(inner, CacheConfig::new().capacity(capacity))
181 }
182
183 /// Wraps `inner` with a result cache built from `config` (the Tier-2 path).
184 ///
185 /// Use [`CacheConfig`] to set the capacity and an optional TTL together.
186 ///
187 /// # Examples
188 ///
189 /// ```
190 /// use std::time::Duration;
191 ///
192 /// use iqdb_cache::{CacheConfig, CachedIndex};
193 /// # use iqdb_cache::doc_stub::stub_index;
194 /// let config = CacheConfig::new().capacity(512).ttl(Duration::from_secs(30));
195 /// let cached = CachedIndex::with_config(stub_index(), config);
196 /// assert_eq!(cached.capacity(), 512);
197 /// assert_eq!(cached.ttl(), Some(Duration::from_secs(30)));
198 /// ```
199 #[must_use]
200 pub fn with_config(inner: I, config: CacheConfig) -> Self {
201 Self::with_config_in(inner, config, Arc::new(SystemClock::new()))
202 }
203
204 /// Construction core shared by every public constructor, with an injectable
205 /// clock for deterministic TTL tests.
206 pub(crate) fn with_config_in(inner: I, config: CacheConfig, clock: Arc<dyn Clock>) -> Self {
207 Self {
208 inner,
209 cache: Mutex::new(LruCache::with_capacity(config.capacity)),
210 capacity: config.capacity,
211 ttl: config.ttl,
212 clock,
213 hits: AtomicU64::new(0),
214 misses: AtomicU64::new(0),
215 }
216 }
217
218 /// The configured cache capacity. `0` means caching is disabled.
219 #[inline]
220 #[must_use]
221 pub fn capacity(&self) -> usize {
222 self.capacity
223 }
224
225 /// The configured per-entry time-to-live, or `None` if results expire only
226 /// on mutation.
227 #[inline]
228 #[must_use]
229 pub fn ttl(&self) -> Option<Duration> {
230 self.ttl
231 }
232
233 /// Whether caching is active (`capacity > 0`).
234 #[inline]
235 #[must_use]
236 pub fn is_enabled(&self) -> bool {
237 self.capacity > 0
238 }
239
240 /// Borrows the wrapped index.
241 #[inline]
242 #[must_use]
243 pub fn get_ref(&self) -> &I {
244 &self.inner
245 }
246
247 /// Unwraps the cache, returning the index it held.
248 ///
249 /// # Examples
250 ///
251 /// ```
252 /// # use iqdb_cache::CachedIndex;
253 /// # use iqdb_cache::doc_stub::stub_index;
254 /// # use iqdb_index::IndexCore;
255 /// let cached = CachedIndex::new(stub_index());
256 /// let inner = cached.into_inner();
257 /// assert_eq!(inner.dim(), 3);
258 /// ```
259 #[must_use]
260 pub fn into_inner(self) -> I {
261 self.inner
262 }
263
264 /// Drops every cached result, keeping the wrapped index untouched.
265 ///
266 /// Mutations invalidate automatically; call this only to force a cold cache
267 /// (for example, after the wrapped index was changed through a handle other
268 /// than this wrapper).
269 pub fn clear_cache(&mut self) {
270 match self.cache.get_mut() {
271 Ok(cache) => cache.clear(),
272 Err(poisoned) => poisoned.into_inner().clear(),
273 }
274 }
275
276 /// A snapshot of the cache's hit/miss counters and occupancy.
277 #[must_use]
278 pub fn cache_stats(&self) -> CacheStats {
279 let len = self.lock_cache().len();
280 CacheStats {
281 hits: self.hits.load(Ordering::Relaxed),
282 misses: self.misses.load(Ordering::Relaxed),
283 len,
284 capacity: self.capacity,
285 }
286 }
287
288 /// Locks the cache, recovering the guard if a previous holder panicked.
289 ///
290 /// A poisoned result cache is safe to keep using: a half-finished insert
291 /// can at worst drop or duplicate a memoized entry, never corrupt a result,
292 /// so recovery is preferable to propagating the panic.
293 fn lock_cache(&self) -> std::sync::MutexGuard<'_, LruCache<ResultKey, CacheEntry>> {
294 self.cache
295 .lock()
296 .unwrap_or_else(|poisoned| poisoned.into_inner())
297 }
298
299 /// Whether a cached entry is still live under the configured TTL.
300 ///
301 /// Always `true` when no TTL is set, so the clock is never read on the
302 /// non-TTL hot path.
303 #[inline]
304 fn is_live(&self, entry: &CacheEntry) -> bool {
305 match (self.ttl, entry.stamp) {
306 (Some(ttl), Some(stamp)) => self.clock.now().saturating_duration_since(stamp) < ttl,
307 _ => true,
308 }
309 }
310
311 /// Empties the cache through `&mut self` after a mutation.
312 fn invalidate(&mut self) {
313 // `&mut self` guarantees exclusive access, so no lock is contended.
314 match self.cache.get_mut() {
315 Ok(cache) => cache.clear(),
316 Err(poisoned) => poisoned.into_inner().clear(),
317 }
318 }
319}
320
321impl<I: IndexCore> IndexCore for CachedIndex<I> {
322 fn insert(
323 &mut self,
324 id: VectorId,
325 vector: std::sync::Arc<[f32]>,
326 metadata: Option<Metadata>,
327 ) -> Result<()> {
328 let result = self.inner.insert(id, vector, metadata);
329 if result.is_ok() {
330 self.invalidate();
331 }
332 result
333 }
334
335 fn insert_batch(
336 &mut self,
337 items: Vec<(VectorId, std::sync::Arc<[f32]>, Option<Metadata>)>,
338 ) -> Result<()> {
339 // `insert_batch` is fail-fast and may apply partially, so any outcome
340 // can have changed the search space: always invalidate.
341 let result = self.inner.insert_batch(items);
342 self.invalidate();
343 result
344 }
345
346 fn delete(&mut self, id: &VectorId) -> Result<()> {
347 let result = self.inner.delete(id);
348 if result.is_ok() {
349 self.invalidate();
350 }
351 result
352 }
353
354 fn search(&self, query: &[f32], params: &SearchParams) -> Result<Vec<Hit>> {
355 if self.capacity == 0 {
356 let _ = self.misses.fetch_add(1, Ordering::Relaxed);
357 return self.inner.search(query, params);
358 }
359
360 let key = ResultKey::new(query, params);
361 {
362 let mut cache = self.lock_cache();
363 if let Some(entry) = cache.get(&key) {
364 if self.is_live(entry) {
365 let _ = self.hits.fetch_add(1, Ordering::Relaxed);
366 return Ok(entry.hits.to_vec());
367 }
368 // Expired: fall through to recompute. The stale entry stays
369 // until the `put` below overwrites it with a fresh result.
370 }
371 }
372
373 // Miss (or expired): run the search without holding the lock so
374 // concurrent misses do not serialize on it.
375 let hits = self.inner.search(query, params)?;
376 let _ = self.misses.fetch_add(1, Ordering::Relaxed);
377 let stamp = self.ttl.map(|_| self.clock.now());
378 {
379 let mut cache = self.lock_cache();
380 let _evicted = cache.put(
381 key,
382 CacheEntry {
383 hits: hits.clone().into_boxed_slice(),
384 stamp,
385 },
386 );
387 }
388 Ok(hits)
389 }
390
391 fn len(&self) -> usize {
392 self.inner.len()
393 }
394
395 fn is_empty(&self) -> bool {
396 self.inner.is_empty()
397 }
398
399 fn dim(&self) -> usize {
400 self.inner.dim()
401 }
402
403 fn metric(&self) -> DistanceMetric {
404 self.inner.metric()
405 }
406
407 fn flush(&mut self) -> Result<()> {
408 // Flush commits durable state without changing the searchable set, so
409 // the cache stays valid.
410 self.inner.flush()
411 }
412
413 fn stats(&self) -> IndexStats {
414 self.inner.stats()
415 }
416}
417
#[cfg(test)]
mod tests {
    #![allow(clippy::unwrap_used)]

    use clock_lib::ManualClock;

    use super::*;
    use crate::doc_stub::stub_index;

    /// Shared search parameters: top-1 by cosine distance.
    fn params() -> SearchParams {
        SearchParams::new(1, DistanceMetric::Cosine)
    }

    #[test]
    fn ttl_entry_is_recomputed_after_expiry() {
        let clock = Arc::new(ManualClock::new());
        let config = CacheConfig::new().capacity(8).ttl(Duration::from_secs(60));
        let cached = CachedIndex::with_config_in(stub_index(), config, clock.clone());

        let _miss = cached.search(&[1.0, 0.0, 0.0], &params()).unwrap();
        let _hit = cached.search(&[1.0, 0.0, 0.0], &params()).unwrap();
        assert_eq!(cached.cache_stats().hits, 1);

        // Just inside the TTL: still a hit.
        clock.advance(Duration::from_secs(59));
        let _hit2 = cached.search(&[1.0, 0.0, 0.0], &params()).unwrap();
        assert_eq!(cached.cache_stats().hits, 2);

        // Past the TTL: the entry expires and the search recomputes (a miss).
        clock.advance(Duration::from_secs(2));
        let _expired = cached.search(&[1.0, 0.0, 0.0], &params()).unwrap();
        assert_eq!(cached.cache_stats().hits, 2);
        assert_eq!(cached.cache_stats().misses, 2);

        // The recompute refreshed the entry, so the next search hits again.
        let _hit3 = cached.search(&[1.0, 0.0, 0.0], &params()).unwrap();
        assert_eq!(cached.cache_stats().hits, 3);
    }

    #[test]
    fn ttl_boundary_is_exclusive() {
        let clock = Arc::new(ManualClock::new());
        let config = CacheConfig::new().capacity(8).ttl(Duration::from_secs(10));
        let cached = CachedIndex::with_config_in(stub_index(), config, clock.clone());

        let _miss = cached.search(&[1.0, 0.0, 0.0], &params()).unwrap();
        // Exactly at the TTL counts as expired (`elapsed >= ttl`).
        clock.advance(Duration::from_secs(10));
        let _again = cached.search(&[1.0, 0.0, 0.0], &params()).unwrap();
        assert_eq!(cached.cache_stats().hits, 0);
        assert_eq!(cached.cache_stats().misses, 2);
    }

    #[test]
    fn no_ttl_never_expires_even_as_time_passes() {
        let clock = Arc::new(ManualClock::new());
        let config = CacheConfig::new().capacity(8); // no TTL
        let cached = CachedIndex::with_config_in(stub_index(), config, clock.clone());

        let _miss = cached.search(&[1.0, 0.0, 0.0], &params()).unwrap();
        let _hit = cached.search(&[1.0, 0.0, 0.0], &params()).unwrap();
        // Advance far beyond any plausible TTL: still a hit, because none is set.
        clock.advance(Duration::from_secs(60 * 60 * 24 * 365));
        let _hit2 = cached.search(&[1.0, 0.0, 0.0], &params()).unwrap();
        assert_eq!(cached.cache_stats().hits, 2);
        assert_eq!(cached.cache_stats().misses, 1);
    }

    #[test]
    fn zero_capacity_passes_every_search_through() {
        let clock = Arc::new(ManualClock::new());
        let config = CacheConfig::new().capacity(0);
        let cached = CachedIndex::with_config_in(stub_index(), config, clock);
        assert!(!cached.is_enabled());

        let _first = cached.search(&[1.0, 0.0, 0.0], &params()).unwrap();
        let _second = cached.search(&[1.0, 0.0, 0.0], &params()).unwrap();
        let stats = cached.cache_stats();
        assert_eq!(stats.hits, 0);
        assert_eq!(stats.misses, 2);
        assert_eq!(stats.len, 0);
    }

    #[test]
    fn clear_cache_forces_a_recompute() {
        let clock = Arc::new(ManualClock::new());
        let config = CacheConfig::new().capacity(8);
        let mut cached = CachedIndex::with_config_in(stub_index(), config, clock);

        let _miss = cached.search(&[1.0, 0.0, 0.0], &params()).unwrap();
        assert_eq!(cached.cache_stats().len, 1);

        cached.clear_cache();
        assert_eq!(cached.cache_stats().len, 0);

        let _miss2 = cached.search(&[1.0, 0.0, 0.0], &params()).unwrap();
        assert_eq!(cached.cache_stats().hits, 0);
        assert_eq!(cached.cache_stats().misses, 2);
    }
}
485}