Skip to main content

lance_core/cache/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Lance cache system.
5//!
6//! ## For cache users
7//!
8//! Use [`LanceCache`] (or [`WeakLanceCache`]) to store and retrieve typed
9//! values. Define a [`CacheKey`] (or [`UnsizedCacheKey`] for trait objects) to
10//! describe what you're caching and its type.
11//!
12//! To make a value type serializable (so persistent backends can store it),
13//! implement [`CacheCodecImpl`] on the type, then override [`CacheKey::codec`]:
14//!
15//! ```ignore
16//! impl CacheCodecImpl for MyData {
17//!     fn serialize(&self, w: &mut dyn Write) -> Result<()> { /* ... */ }
18//!     fn deserialize(data: &Bytes) -> Result<Self> { /* ... */ }
19//! }
20//!
21//! impl CacheKey for MyDataKey {
22//!     type ValueType = MyData;
23//!     fn key(&self) -> Cow<'_, str> { /* ... */ }
24//!     fn type_name() -> &'static str { "MyData" }
25//!     fn codec() -> Option<CacheCodec> {
26//!         Some(CacheCodec::from_impl::<MyData>())
27//!     }
28//! }
29//! ```
30//!
31//! ## For backend implementors
32//!
33//! Implement [`CacheBackend`] to provide a custom storage layer (disk, Redis,
34//! etc.). Backends receive [`InternalCacheKey`] keys and type-erased
35//! [`CacheEntry`] values — the typed wrapping is handled by [`LanceCache`].
36//! See the [`backend`] module for details.
37//!
38//! ## Serialization flow
39//!
40//! When a [`CacheKey`] provides a codec via [`CacheKey::codec`]:
41//!
42//! 1. [`LanceCache`] wraps the [`CacheCodec`] and passes it to the backend
43//!    alongside the entry on `insert` and `get` calls.
44//! 2. In-memory backends (like [`MokaCacheBackend`]) ignore the codec.
45//! 3. Persistent backends use `codec.serialize(entry, writer)` on insert and
46//!    `codec.deserialize(reader)` on get to persist entries across restarts.
47
48pub mod backend;
49pub mod codec;
50mod entry_io;
51mod moka;
52
53pub use backend::{CacheBackend, CacheEntry, CacheKeyIterator, InternalCacheKey};
54pub use codec::{
55    CacheCodec, CacheCodecImpl, CacheDecode, CacheMissReason, MAGIC, has_cache_envelope,
56};
57pub use entry_io::{CacheEntryReader, CacheEntryWriter};
58pub use moka::MokaCacheBackend;
59
60use std::borrow::Cow;
61use std::sync::{
62    Arc,
63    atomic::{AtomicU64, Ordering},
64};
65
66use futures::{Future, FutureExt};
67
68use crate::Result;
69
70pub use crate::deepsize::{Context, DeepSizeOf};
71
72// ---------------------------------------------------------------------------
73// CacheKey / UnsizedCacheKey — typed key traits for cache users
74// ---------------------------------------------------------------------------
75
76/// Typed cache key for sized value types.
77///
78/// Implement this trait to define a new type of cached entry. [`LanceCache`]
79/// uses the key string and type name to construct an [`InternalCacheKey`]
80/// for the backend.
81///
82/// # Example
83///
84/// ```ignore
85/// struct MyKey { id: u64 }
86///
87/// impl CacheKey for MyKey {
88///     type ValueType = MyData;
89///     fn key(&self) -> Cow<'_, str> { self.id.to_string().into() }
90///     fn type_name() -> &'static str { "MyData" }
91/// }
92/// ```
93pub trait CacheKey {
94    type ValueType: 'static;
95
96    fn key(&self) -> Cow<'_, str>;
97
98    /// Short, stable string identifying this value type.
99    ///
100    /// Two `CacheKey` impls that store different `ValueType`s **must** return
101    /// different type names; if they collide, gets will silently return `None`
102    /// due to failed downcasts.
103    ///
104    /// Use a short literal (e.g. `"Vec<IndexMetadata>"`), not
105    /// `std::any::type_name` — the latter is not guaranteed stable across
106    /// compiler versions or build configurations.
107    fn type_name() -> &'static str;
108
109    /// Optional codec for serializing/deserializing this key's value type.
110    ///
111    /// Returns `None` by default. Cache backends that support persistence
112    /// (e.g. disk-backed caches) use this to serialize entries on insert and
113    /// deserialize on get. Types without a codec will only be stored in-memory.
114    ///
115    /// [`CacheCodec`] is `Copy` (two plain function pointers), so returning it
116    /// by value is cheap — no allocation needed.
117    fn codec() -> Option<CacheCodec> {
118        None
119    }
120}
121
/// Counterpart of [`CacheKey`] for unsized value types such as `dyn Trait`.
///
/// Callers pass in and get back `Arc<T>` with `T: ?Sized`. Internally the
/// cache stores that `Arc` inside one more `Arc`, i.e. as `Arc<Arc<T>>`.
///
/// Entries stored through this trait never carry a serialization codec, so
/// they live in memory only. Use a sized [`CacheKey`] when the value needs to
/// be persisted.
pub trait UnsizedCacheKey {
    /// The (possibly unsized) value type stored under this key.
    type ValueType: 'static + ?Sized;

    /// The caller-chosen part of the key.
    fn key(&self) -> Cow<'_, str>;

    /// Short, stable identifier for `ValueType`.
    ///
    /// The same uniqueness and stability rules as [`CacheKey::type_name`] apply.
    fn type_name() -> &'static str;
}
138
139// ---------------------------------------------------------------------------
140// Internal helpers
141// ---------------------------------------------------------------------------
142
143/// Size of a cached `Arc<T>`, accounting for the Arc overhead (two atomic counters).
144fn cache_entry_size<T: DeepSizeOf + ?Sized>(value: &T) -> usize {
145    value.deep_size_of() + std::mem::size_of::<std::sync::atomic::AtomicUsize>() * 2
146}
147
148/// Build an [`InternalCacheKey`] from a cache's prefix, a user key string,
149/// and a type name.
150fn build_key(prefix: &Arc<str>, key: &str, type_name: &'static str) -> InternalCacheKey {
151    InternalCacheKey::new(prefix.clone(), Arc::from(key), type_name)
152}
153
154// ---------------------------------------------------------------------------
155// LanceCache — typed wrapper around dyn CacheBackend
156// ---------------------------------------------------------------------------
157
158/// Typed cache wrapper that handles key construction and type safety.
159///
160/// Internally delegates to a [`CacheBackend`]. The default backend is
161/// [`MokaCacheBackend`]; pass a custom backend via [`LanceCache::with_backend`].
162#[derive(Clone)]
163pub struct LanceCache {
164    cache: Arc<dyn CacheBackend>,
165    prefix: Arc<str>,
166    hits: Arc<AtomicU64>,
167    misses: Arc<AtomicU64>,
168}
169
170impl std::fmt::Debug for LanceCache {
171    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
172        f.debug_struct("LanceCache")
173            .field("cache", &self.cache)
174            .finish()
175    }
176}
177
178impl DeepSizeOf for LanceCache {
179    fn deep_size_of_children(&self, _: &mut Context) -> usize {
180        self.cache.approx_size_bytes()
181    }
182}
183
184impl LanceCache {
185    pub fn with_capacity(capacity: usize) -> Self {
186        Self {
187            cache: Arc::new(MokaCacheBackend::with_capacity(capacity)),
188            prefix: Arc::from(""),
189            hits: Arc::new(AtomicU64::new(0)),
190            misses: Arc::new(AtomicU64::new(0)),
191        }
192    }
193
194    /// Create a cache backed by a custom [`CacheBackend`].
195    pub fn with_backend(backend: Arc<dyn CacheBackend>) -> Self {
196        Self {
197            cache: backend,
198            prefix: Arc::from(""),
199            hits: Arc::new(AtomicU64::new(0)),
200            misses: Arc::new(AtomicU64::new(0)),
201        }
202    }
203
204    pub fn no_cache() -> Self {
205        Self {
206            cache: Arc::new(MokaCacheBackend::no_cache()),
207            prefix: Arc::from(""),
208            hits: Arc::new(AtomicU64::new(0)),
209            misses: Arc::new(AtomicU64::new(0)),
210        }
211    }
212
213    /// Create a cache with the given backend and an exact prefix string.
214    /// Unlike `with_key_prefix`, this sets the prefix verbatim (no trailing slash added).
215    pub fn with_backend_and_prefix(backend: Arc<dyn CacheBackend>, prefix: String) -> Self {
216        Self {
217            cache: backend,
218            prefix: Arc::from(prefix),
219            hits: Arc::new(AtomicU64::new(0)),
220            misses: Arc::new(AtomicU64::new(0)),
221        }
222    }
223
224    /// Appends a prefix to the cache key.
225    pub fn with_key_prefix(&self, prefix: &str) -> Self {
226        Self {
227            cache: self.cache.clone(),
228            prefix: Arc::from(format!("{}{}/", self.prefix, prefix)),
229            hits: self.hits.clone(),
230            misses: self.misses.clone(),
231        }
232    }
233
234    /// Invalidate all entries whose prefix starts with the given string.
235    pub async fn invalidate_prefix(&self, prefix: &str) {
236        let full_prefix = format!("{}{}", self.prefix, prefix);
237        self.cache.invalidate_prefix(&full_prefix).await;
238    }
239
240    pub async fn size(&self) -> usize {
241        self.cache.num_entries().await
242    }
243
244    pub fn approx_size(&self) -> usize {
245        self.cache.approx_num_entries()
246    }
247
248    pub async fn size_bytes(&self) -> usize {
249        self.cache.size_bytes().await
250    }
251
252    /// Return an iterator over keys currently stored under this cache's prefix.
253    ///
254    /// Returns `None` when the backend does not support key inventory. The
255    /// iterator is intended for diagnostics and may be weakly consistent with
256    /// concurrent cache mutations.
257    ///
258    /// # Examples
259    ///
260    /// ```
261    /// # use std::{borrow::Cow, sync::Arc};
262    /// # use lance_core::cache::{CacheKey, LanceCache};
263    /// # struct MyKey;
264    /// # impl CacheKey for MyKey {
265    /// #     type ValueType = Vec<i32>;
266    /// #     fn key(&self) -> Cow<'_, str> { Cow::Borrowed("my-key") }
267    /// #     fn type_name() -> &'static str { "VecI32" }
268    /// # }
269    /// # async fn example() {
270    /// let cache = LanceCache::with_capacity(1024);
271    /// cache.insert_with_key(&MyKey, Arc::new(vec![1, 2, 3])).await;
272    ///
273    /// let mut keys = cache.keys().await.expect("Moka supports key inventory");
274    /// assert_eq!(keys.next().unwrap().key(), "my-key");
275    /// # }
276    /// ```
277    pub async fn keys(&self) -> Option<CacheKeyIterator<'_>> {
278        Some(Box::new(
279            self.cache
280                .keys()
281                .await?
282                .filter(|key| key.starts_with(&self.prefix)),
283        ))
284    }
285
286    // -- Sized insert/get (internal, shared by sized and unsized paths) --------
287
288    async fn insert_with_id<T: DeepSizeOf + Send + Sync + 'static>(
289        &self,
290        key: &str,
291        type_name: &'static str,
292        codec: Option<CacheCodec>,
293        metadata: Arc<T>,
294    ) {
295        let size = cache_entry_size(&*metadata);
296        let cache_key = build_key(&self.prefix, key, type_name);
297        self.cache.insert(&cache_key, metadata, size, codec).await;
298    }
299
300    async fn get_with_id<T: Send + Sync + 'static>(
301        &self,
302        key: &str,
303        type_name: &'static str,
304        codec: Option<CacheCodec>,
305    ) -> Option<Arc<T>> {
306        let cache_key = build_key(&self.prefix, key, type_name);
307        if let Some(entry) = self.cache.get(&cache_key, codec).await {
308            match entry.downcast::<T>() {
309                Ok(val) => {
310                    self.hits.fetch_add(1, Ordering::Relaxed);
311                    Some(val)
312                }
313                Err(_) => {
314                    // Type mismatch: the backend returned a different concrete
315                    // type than expected (e.g. a disk cache may store
316                    // intermediate state). Treat as a miss.
317                    self.misses.fetch_add(1, Ordering::Relaxed);
318                    None
319                }
320            }
321        } else {
322            self.misses.fetch_add(1, Ordering::Relaxed);
323            None
324        }
325    }
326
327    // -- Stats / clear --------------------------------------------------------
328
329    pub async fn stats(&self) -> CacheStats {
330        CacheStats {
331            hits: self.hits.load(Ordering::Relaxed),
332            misses: self.misses.load(Ordering::Relaxed),
333            num_entries: self.cache.num_entries().await,
334            size_bytes: self.cache.size_bytes().await,
335        }
336    }
337
338    pub async fn clear(&self) {
339        self.cache.clear().await;
340        self.hits.store(0, Ordering::Relaxed);
341        self.misses.store(0, Ordering::Relaxed);
342    }
343
344    // -- CacheKey-based methods -----------------------------------------------
345
346    pub async fn insert_with_key<K>(&self, cache_key: &K, metadata: Arc<K::ValueType>)
347    where
348        K: CacheKey,
349        K::ValueType: DeepSizeOf + Send + Sync + 'static,
350    {
351        self.insert_with_id(&cache_key.key(), K::type_name(), K::codec(), metadata)
352            .boxed()
353            .await
354    }
355
356    pub async fn get_with_key<K>(&self, cache_key: &K) -> Option<Arc<K::ValueType>>
357    where
358        K: CacheKey,
359        K::ValueType: DeepSizeOf + Send + Sync + 'static,
360    {
361        self.get_with_id::<K::ValueType>(&cache_key.key(), K::type_name(), K::codec())
362            .boxed()
363            .await
364    }
365
366    pub async fn get_or_insert_with_key<K, F, Fut>(
367        &self,
368        cache_key: K,
369        loader: F,
370    ) -> Result<Arc<K::ValueType>>
371    where
372        K: CacheKey,
373        K::ValueType: DeepSizeOf + Send + Sync + 'static,
374        F: FnOnce() -> Fut + Send,
375        Fut: Future<Output = Result<K::ValueType>> + Send,
376    {
377        let key = build_key(&self.prefix, &cache_key.key(), K::type_name());
378
379        let typed_loader = Box::pin(async move {
380            let value = loader().await?;
381            let arc = Arc::new(value);
382            let size = cache_entry_size(&*arc);
383            Ok((arc as CacheEntry, size))
384        });
385
386        let (entry, was_cached) = self
387            .cache
388            .get_or_insert(&key, typed_loader, K::codec())
389            .await?;
390
391        if was_cached {
392            self.hits.fetch_add(1, Ordering::Relaxed);
393        } else {
394            self.misses.fetch_add(1, Ordering::Relaxed);
395        }
396
397        Ok(entry.downcast::<K::ValueType>().unwrap())
398    }
399
400    pub async fn insert_unsized_with_key<K>(&self, cache_key: &K, metadata: Arc<K::ValueType>)
401    where
402        K: UnsizedCacheKey,
403        K::ValueType: DeepSizeOf + Send + Sync + 'static,
404    {
405        self.insert_with_id(&cache_key.key(), K::type_name(), None, Arc::new(metadata))
406            .boxed()
407            .await
408    }
409
410    pub async fn get_unsized_with_key<K>(&self, cache_key: &K) -> Option<Arc<K::ValueType>>
411    where
412        K: UnsizedCacheKey,
413        K::ValueType: DeepSizeOf + Send + Sync + 'static,
414    {
415        let outer = self
416            .get_with_id::<Arc<K::ValueType>>(&cache_key.key(), K::type_name(), None)
417            .boxed()
418            .await?;
419        Some(outer.as_ref().clone())
420    }
421}
422
423// ---------------------------------------------------------------------------
424// WeakLanceCache
425// ---------------------------------------------------------------------------
426
427/// A weak reference to a LanceCache, used by indices to avoid circular references.
428/// When the original cache is dropped, operations on this will gracefully no-op.
429#[derive(Clone, Debug)]
430pub struct WeakLanceCache {
431    inner: std::sync::Weak<dyn CacheBackend>,
432    prefix: Arc<str>,
433    hits: Arc<AtomicU64>,
434    misses: Arc<AtomicU64>,
435}
436
437impl WeakLanceCache {
438    pub fn from(cache: &LanceCache) -> Self {
439        Self {
440            inner: Arc::downgrade(&cache.cache),
441            prefix: cache.prefix.clone(),
442            hits: cache.hits.clone(),
443            misses: cache.misses.clone(),
444        }
445    }
446
447    pub fn with_key_prefix(&self, prefix: &str) -> Self {
448        Self {
449            inner: self.inner.clone(),
450            prefix: Arc::from(format!("{}{}/", self.prefix, prefix)),
451            hits: self.hits.clone(),
452            misses: self.misses.clone(),
453        }
454    }
455
456    /// The key prefix used for all entries in this cache.
457    pub fn prefix(&self) -> &str {
458        &self.prefix
459    }
460
461    pub async fn get_with_key<K>(&self, cache_key: &K) -> Option<Arc<K::ValueType>>
462    where
463        K: CacheKey,
464        K::ValueType: DeepSizeOf + Send + Sync + 'static,
465    {
466        let cache = self.inner.upgrade()?;
467        let key = build_key(&self.prefix, &cache_key.key(), K::type_name());
468        if let Some(entry) = cache.get(&key, K::codec()).await {
469            self.hits.fetch_add(1, Ordering::Relaxed);
470            Some(entry.downcast::<K::ValueType>().unwrap())
471        } else {
472            self.misses.fetch_add(1, Ordering::Relaxed);
473            None
474        }
475    }
476
477    pub async fn insert_with_key<K>(&self, cache_key: &K, value: Arc<K::ValueType>) -> bool
478    where
479        K: CacheKey,
480        K::ValueType: DeepSizeOf + Send + Sync + 'static,
481    {
482        if let Some(cache) = self.inner.upgrade() {
483            let size = cache_entry_size(&*value);
484            let key = build_key(&self.prefix, &cache_key.key(), K::type_name());
485            cache.insert(&key, value, size, K::codec()).await;
486            true
487        } else {
488            log::warn!("WeakLanceCache: cache no longer available, unable to insert item");
489            false
490        }
491    }
492
493    /// Get or insert an item, computing it if necessary.
494    ///
495    /// Deduplication of concurrent loads is handled by the backend.
496    pub async fn get_or_insert_with_key<K, F, Fut>(
497        &self,
498        cache_key: K,
499        loader: F,
500    ) -> Result<Arc<K::ValueType>>
501    where
502        K: CacheKey,
503        K::ValueType: DeepSizeOf + Send + Sync + 'static,
504        F: FnOnce() -> Fut + Send,
505        Fut: Future<Output = Result<K::ValueType>> + Send,
506    {
507        if let Some(cache) = self.inner.upgrade() {
508            let key = build_key(&self.prefix, &cache_key.key(), K::type_name());
509            let typed_loader = Box::pin(async move {
510                let value = loader().await?;
511                let arc = Arc::new(value);
512                let size = cache_entry_size(&*arc);
513                Ok((arc as CacheEntry, size))
514            });
515            let (entry, was_cached) = cache.get_or_insert(&key, typed_loader, K::codec()).await?;
516            if was_cached {
517                self.hits.fetch_add(1, Ordering::Relaxed);
518            } else {
519                self.misses.fetch_add(1, Ordering::Relaxed);
520            }
521            Ok(entry.downcast::<K::ValueType>().unwrap())
522        } else {
523            log::warn!("WeakLanceCache: cache no longer available, computing without caching");
524            loader().await.map(Arc::new)
525        }
526    }
527
528    pub async fn get_unsized_with_key<K>(&self, cache_key: &K) -> Option<Arc<K::ValueType>>
529    where
530        K: UnsizedCacheKey,
531        K::ValueType: DeepSizeOf + Send + Sync + 'static,
532    {
533        let cache = self.inner.upgrade()?;
534        let key = build_key(&self.prefix, &cache_key.key(), K::type_name());
535        if let Some(entry) = cache.get(&key, None).await {
536            entry
537                .downcast::<Arc<K::ValueType>>()
538                .ok()
539                .map(|arc| arc.as_ref().clone())
540        } else {
541            None
542        }
543    }
544
545    pub async fn insert_unsized_with_key<K>(&self, cache_key: &K, value: Arc<K::ValueType>)
546    where
547        K: UnsizedCacheKey,
548        K::ValueType: DeepSizeOf + Send + Sync + 'static,
549    {
550        if let Some(cache) = self.inner.upgrade() {
551            let wrapper = Arc::new(value);
552            let size = cache_entry_size(&*wrapper);
553            let key = build_key(&self.prefix, &cache_key.key(), K::type_name());
554            cache.insert(&key, wrapper, size, None).await;
555        } else {
556            log::warn!("WeakLanceCache: cache no longer available, unable to insert unsized item");
557        }
558    }
559}
560
561// ---------------------------------------------------------------------------
562// CacheStats
563// ---------------------------------------------------------------------------
564
/// Snapshot of cache usage, as returned by `LanceCache::stats`.
///
/// The hit/miss counters cover every lookup made through views sharing the
/// same counters. The entry count and byte size come from the backend.
#[derive(Debug, Clone)]
pub struct CacheStats {
    /// Lookups via `get`, `get_unsized`, or `get_or_insert` that found an item in the cache.
    pub hits: u64,
    /// Lookups via `get`, `get_unsized`, or `get_or_insert` that did not find an item in the cache.
    pub misses: u64,
    /// Entries currently held by the cache.
    pub num_entries: usize,
    /// Total size in bytes of all entries currently held by the cache.
    pub size_bytes: usize,
}
576
577impl CacheStats {
578    pub fn hit_ratio(&self) -> f32 {
579        if self.hits + self.misses == 0 {
580            0.0
581        } else {
582            self.hits as f32 / (self.hits + self.misses) as f32
583        }
584    }
585
586    pub fn miss_ratio(&self) -> f32 {
587        if self.hits + self.misses == 0 {
588            0.0
589        } else {
590            self.misses as f32 / (self.hits + self.misses) as f32
591        }
592    }
593}
594
595#[cfg(test)]
596mod tests {
597    use super::*;
598    use std::collections::{BTreeSet, HashMap};
599    use std::marker::PhantomData;
600
601    struct TestKey<T: 'static> {
602        key: String,
603        _phantom: PhantomData<T>,
604    }
605
606    impl<T: 'static> TestKey<T> {
607        fn new(key: &str) -> Self {
608            Self {
609                key: key.to_string(),
610                _phantom: PhantomData,
611            }
612        }
613    }
614
615    impl<T: 'static> CacheKey for TestKey<T> {
616        type ValueType = T;
617        fn key(&self) -> std::borrow::Cow<'_, str> {
618            std::borrow::Cow::Borrowed(&self.key)
619        }
620        fn type_name() -> &'static str {
621            std::any::type_name::<T>()
622        }
623    }
624
625    /// Test helper: an UnsizedCacheKey for trait object values.
626    struct TestUnsizedKey<T: 'static + ?Sized> {
627        key: String,
628        _phantom: PhantomData<T>,
629    }
630
631    impl<T: 'static + ?Sized> TestUnsizedKey<T> {
632        fn new(key: &str) -> Self {
633            Self {
634                key: key.to_string(),
635                _phantom: PhantomData,
636            }
637        }
638    }
639
640    impl<T: 'static + ?Sized> UnsizedCacheKey for TestUnsizedKey<T> {
641        type ValueType = T;
642        fn key(&self) -> std::borrow::Cow<'_, str> {
643            std::borrow::Cow::Borrowed(&self.key)
644        }
645        fn type_name() -> &'static str {
646            std::any::type_name::<T>()
647        }
648    }
649
650    fn key_fields(keys: &[InternalCacheKey]) -> BTreeSet<(String, String, &'static str)> {
651        keys.iter()
652            .map(|key| {
653                (
654                    key.prefix().to_string(),
655                    key.key().to_string(),
656                    key.type_name(),
657                )
658            })
659            .collect()
660    }
661
662    #[tokio::test]
663    async fn test_cache_bytes() {
664        let item = Arc::new(vec![1, 2, 3]);
665        let item_size = item.deep_size_of();
666        let capacity = 10 * item_size;
667        let cache = LanceCache::with_capacity(capacity);
668
669        cache
670            .insert_with_key(&TestKey::<Vec<i32>>::new("key"), item.clone())
671            .await;
672        assert_eq!(cache.size().await, 1);
673
674        let retrieved = cache
675            .get_with_key(&TestKey::<Vec<i32>>::new("key"))
676            .await
677            .unwrap();
678        assert_eq!(*retrieved, *item);
679
680        for i in 0..20 {
681            cache
682                .insert_with_key(
683                    &TestKey::<Vec<i32>>::new(&format!("key_{}", i)),
684                    Arc::new(vec![i, i, i]),
685                )
686                .await;
687        }
688        assert!(cache.size_bytes().await <= capacity);
689    }
690
691    #[tokio::test]
692    async fn test_cache_trait_objects() {
693        #[derive(Debug, DeepSizeOf)]
694        struct MyType(i32);
695
696        trait MyTrait: DeepSizeOf + Send + Sync + std::any::Any {
697            fn as_any(&self) -> &dyn std::any::Any;
698        }
699
700        impl MyTrait for MyType {
701            fn as_any(&self) -> &dyn std::any::Any {
702                self
703            }
704        }
705
706        let item: Arc<dyn MyTrait> = Arc::new(MyType(42));
707        let cache = LanceCache::with_capacity(1000);
708        cache
709            .insert_unsized_with_key(&TestUnsizedKey::<dyn MyTrait>::new("test"), item)
710            .await;
711
712        let retrieved = cache
713            .get_unsized_with_key(&TestUnsizedKey::<dyn MyTrait>::new("test"))
714            .await
715            .unwrap();
716        assert_eq!(retrieved.as_any().downcast_ref::<MyType>().unwrap().0, 42);
717    }
718
719    #[tokio::test]
720    async fn test_cache_stats_basic() {
721        let cache = LanceCache::with_capacity(1000);
722        assert_eq!(cache.stats().await.hits, 0);
723
724        // Miss
725        assert!(
726            cache
727                .get_with_key(&TestKey::<Vec<i32>>::new("x"))
728                .await
729                .is_none()
730        );
731        assert_eq!(cache.stats().await.misses, 1);
732
733        // Insert then hit
734        cache
735            .insert_with_key(&TestKey::new("k"), Arc::new(vec![1, 2, 3]))
736            .await;
737        assert!(
738            cache
739                .get_with_key(&TestKey::<Vec<i32>>::new("k"))
740                .await
741                .is_some()
742        );
743        assert_eq!(cache.stats().await.hits, 1);
744    }
745
746    #[tokio::test]
747    async fn test_cache_stats_with_prefixes() {
748        let base = LanceCache::with_capacity(1000);
749        let prefixed = base.with_key_prefix("ns");
750
751        assert!(
752            prefixed
753                .get_with_key(&TestKey::<Vec<i32>>::new("k"))
754                .await
755                .is_none()
756        );
757        assert_eq!(base.stats().await.misses, 1);
758
759        prefixed
760            .insert_with_key(&TestKey::new("k"), Arc::new(vec![1]))
761            .await;
762        assert!(
763            prefixed
764                .get_with_key(&TestKey::<Vec<i32>>::new("k"))
765                .await
766                .is_some()
767        );
768        assert_eq!(base.stats().await.hits, 1);
769    }
770
771    #[tokio::test]
772    async fn test_cache_keys_with_prefixes() {
773        let base = LanceCache::with_capacity(1000);
774        let prefixed = base.with_key_prefix("ns");
775        let nested = prefixed.with_key_prefix("index");
776        let other = base.with_key_prefix("ns-other");
777
778        base.insert_with_key(&TestKey::new("root"), Arc::new(vec![0]))
779            .await;
780        prefixed
781            .insert_with_key(&TestKey::new("child"), Arc::new(vec![1]))
782            .await;
783        nested
784            .insert_with_key(&TestKey::new("nested"), Arc::new(vec![2]))
785            .await;
786        other
787            .insert_with_key(&TestKey::new("other"), Arc::new(vec![3]))
788            .await;
789
790        let base_keys = base.keys().await.unwrap().collect::<Vec<_>>();
791        assert_eq!(
792            key_fields(&base_keys),
793            BTreeSet::from([
794                (
795                    "".to_string(),
796                    "root".to_string(),
797                    TestKey::<Vec<i32>>::type_name()
798                ),
799                (
800                    "ns/".to_string(),
801                    "child".to_string(),
802                    TestKey::<Vec<i32>>::type_name()
803                ),
804                (
805                    "ns/index/".to_string(),
806                    "nested".to_string(),
807                    TestKey::<Vec<i32>>::type_name()
808                ),
809                (
810                    "ns-other/".to_string(),
811                    "other".to_string(),
812                    TestKey::<Vec<i32>>::type_name()
813                ),
814            ])
815        );
816
817        let prefixed_keys = prefixed.keys().await.unwrap().collect::<Vec<_>>();
818        assert_eq!(
819            key_fields(&prefixed_keys),
820            BTreeSet::from([
821                (
822                    "ns/".to_string(),
823                    "child".to_string(),
824                    TestKey::<Vec<i32>>::type_name()
825                ),
826                (
827                    "ns/index/".to_string(),
828                    "nested".to_string(),
829                    TestKey::<Vec<i32>>::type_name()
830                ),
831            ])
832        );
833    }
834
835    #[tokio::test]
836    async fn test_cache_keys_reflect_invalidation_and_clear() {
837        let base = LanceCache::with_capacity(1000);
838        let prefixed = base.with_key_prefix("ns");
839        let other = base.with_key_prefix("other");
840
841        prefixed
842            .insert_with_key(&TestKey::new("child"), Arc::new(vec![1]))
843            .await;
844        other
845            .insert_with_key(&TestKey::new("other"), Arc::new(vec![2]))
846            .await;
847        assert_eq!(base.keys().await.unwrap().count(), 2);
848
849        prefixed.invalidate_prefix("").await;
850        let keys = base.keys().await.unwrap().collect::<Vec<_>>();
851        assert_eq!(
852            key_fields(&keys),
853            BTreeSet::from([(
854                "other/".to_string(),
855                "other".to_string(),
856                TestKey::<Vec<i32>>::type_name()
857            )])
858        );
859
860        base.clear().await;
861        assert_eq!(base.keys().await.unwrap().count(), 0);
862    }
863
864    #[tokio::test]
865    async fn test_cache_get_or_insert() {
866        let cache = LanceCache::with_capacity(1000);
867
868        let v: Arc<Vec<i32>> = cache
869            .get_or_insert_with_key(TestKey::<Vec<i32>>::new("k"), || async {
870                Ok(vec![1, 2, 3])
871            })
872            .await
873            .unwrap();
874        assert_eq!(*v, vec![1, 2, 3]);
875        assert_eq!(cache.stats().await.misses, 1);
876        assert_eq!(cache.stats().await.hits, 0);
877
878        // Second call should not invoke loader and should be a hit
879        let v: Arc<Vec<i32>> = cache
880            .get_or_insert_with_key(TestKey::<Vec<i32>>::new("k"), || async {
881                panic!("should not be called")
882            })
883            .await
884            .unwrap();
885        assert_eq!(*v, vec![1, 2, 3]);
886        assert_eq!(cache.stats().await.hits, 1);
887    }
888
889    #[tokio::test]
890    async fn test_custom_backend() {
891        use async_trait::async_trait;
892        use tokio::sync::Mutex;
893
894        #[derive(Debug)]
895        struct HashMapBackend {
896            map: Mutex<HashMap<InternalCacheKey, (CacheEntry, usize)>>,
897        }
898
899        impl HashMapBackend {
900            fn new() -> Self {
901                Self {
902                    map: Mutex::new(HashMap::new()),
903                }
904            }
905        }
906
907        #[async_trait]
908        impl CacheBackend for HashMapBackend {
909            async fn get(
910                &self,
911                key: &InternalCacheKey,
912                _codec: Option<CacheCodec>,
913            ) -> Option<CacheEntry> {
914                self.map.lock().await.get(key).map(|(e, _)| e.clone())
915            }
916            async fn insert(
917                &self,
918                key: &InternalCacheKey,
919                entry: CacheEntry,
920                size_bytes: usize,
921                _codec: Option<CacheCodec>,
922            ) {
923                self.map
924                    .lock()
925                    .await
926                    .insert(key.clone(), (entry, size_bytes));
927            }
928            async fn get_or_insert<'a>(
929                &self,
930                key: &InternalCacheKey,
931                loader: std::pin::Pin<
932                    Box<dyn futures::Future<Output = Result<(CacheEntry, usize)>> + Send + 'a>,
933                >,
934                _codec: Option<CacheCodec>,
935            ) -> Result<(CacheEntry, bool)> {
936                if let Some((entry, _)) = self.map.lock().await.get(key) {
937                    Ok((entry.clone(), true))
938                } else {
939                    let (entry, size) = loader.await?;
940                    self.map
941                        .lock()
942                        .await
943                        .insert(key.clone(), (entry.clone(), size));
944                    Ok((entry, false))
945                }
946            }
947            async fn invalidate_prefix(&self, prefix: &str) {
948                self.map.lock().await.retain(|k, _| !k.starts_with(prefix));
949            }
950            async fn clear(&self) {
951                self.map.lock().await.clear();
952            }
953            async fn num_entries(&self) -> usize {
954                self.map.lock().await.len()
955            }
956            async fn size_bytes(&self) -> usize {
957                self.map.lock().await.values().map(|(_, s)| *s).sum()
958            }
959        }
960
961        let cache = LanceCache::with_backend(Arc::new(HashMapBackend::new()));
962
963        cache
964            .insert_with_key(&TestKey::new("k"), Arc::new(vec![1, 2, 3]))
965            .await;
966        assert!(
967            cache
968                .get_with_key(&TestKey::<Vec<i32>>::new("k"))
969                .await
970                .is_some()
971        );
972        // Different type at same key = miss
973        assert!(
974            cache
975                .get_with_key(&TestKey::<Vec<u8>>::new("k"))
976                .await
977                .is_none()
978        );
979        assert!(cache.keys().await.is_none());
980    }
981
982    #[tokio::test]
983    async fn test_get_or_insert_dedup() {
984        use std::sync::atomic::AtomicUsize;
985
986        let load_count = Arc::new(AtomicUsize::new(0));
987        let cache = LanceCache::with_capacity(10000);
988
989        let (barrier_tx, _) = tokio::sync::broadcast::channel::<()>(1);
990        let mut handles = Vec::new();
991        for _ in 0..5 {
992            let cache = cache.clone();
993            let load_count = load_count.clone();
994            let mut barrier_rx = barrier_tx.subscribe();
995            handles.push(tokio::spawn(async move {
996                barrier_rx.recv().await.ok();
997                cache
998                    .get_or_insert_with_key(TestKey::<Vec<i32>>::new("key"), || {
999                        let load_count = load_count.clone();
1000                        async move {
1001                            load_count.fetch_add(1, Ordering::SeqCst);
1002                            tokio::task::yield_now().await;
1003                            Ok(vec![1, 2, 3])
1004                        }
1005                    })
1006                    .await
1007            }));
1008        }
1009        barrier_tx.send(()).unwrap();
1010        for h in handles {
1011            let result: Arc<Vec<i32>> = h.await.unwrap().unwrap();
1012            assert_eq!(*result, vec![1, 2, 3]);
1013        }
1014
1015        assert_eq!(load_count.load(Ordering::SeqCst), 1);
1016    }
1017}