lance_core/cache.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! Cache implementation
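//!
//! # Example
//!
//! A hedged usage sketch (not compiled as a doc-test); `FileMetaKey` is a
//! hypothetical [`CacheKey`] implementation with `ValueType = Vec<u32>`:
//!
//! ```ignore
//! let cache = LanceCache::with_capacity(128 * 1024 * 1024);
//! let meta = cache
//!     .get_or_insert_with_key(FileMetaKey { id: 42 }, || async { Ok(vec![1, 2, 3]) })
//!     .await?;
//! println!("hit ratio: {:.2}", cache.stats().await.hit_ratio());
//! ```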

use std::any::{Any, TypeId};
use std::borrow::Cow;
use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};

use futures::{Future, FutureExt};
use moka::future::Cache;
use snafu::location;

use crate::Result;

pub use deepsize::{Context, DeepSizeOf};

type ArcAny = Arc<dyn Any + Send + Sync>;

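/// A type-erased cache entry paired with a closure that reports its deep size,
/// so that entries of any type can be weighed by the cache.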
#[derive(Clone)]
pub struct SizedRecord {
    record: ArcAny,
    size_accessor: Arc<dyn Fn(&ArcAny) -> usize + Send + Sync>,
}

impl std::fmt::Debug for SizedRecord {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SizedRecord")
            .field("record", &self.record)
            .finish()
    }
}

impl DeepSizeOf for SizedRecord {
    fn deep_size_of_children(&self, _: &mut Context) -> usize {
        (self.size_accessor)(&self.record)
    }
}

impl SizedRecord {
    fn new<T: DeepSizeOf + Send + Sync + 'static>(record: Arc<T>) -> Self {
        // +8 for the size of the Arc pointer itself
        let size_accessor =
            |record: &ArcAny| -> usize { record.downcast_ref::<T>().unwrap().deep_size_of() + 8 };
        Self {
            record,
            size_accessor: Arc::new(size_accessor),
        }
    }
}

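/// A size-bounded, thread-safe cache of type-erased values keyed by
/// `(key string, TypeId)`.
///
/// Cloning is cheap: clones share the same underlying cache and hit/miss
/// counters, optionally under a different key prefix (see
/// [`LanceCache::with_key_prefix`]).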
#[derive(Clone)]
pub struct LanceCache {
    cache: Arc<Cache<(String, TypeId), SizedRecord>>,
    prefix: String,
    hits: Arc<AtomicU64>,
    misses: Arc<AtomicU64>,
}

impl std::fmt::Debug for LanceCache {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("LanceCache")
            .field("cache", &self.cache)
            .finish()
    }
}

impl DeepSizeOf for LanceCache {
    fn deep_size_of_children(&self, _: &mut Context) -> usize {
        self.cache
            .iter()
            .map(|(_, v)| (v.size_accessor)(&v.record))
            .sum()
    }
}

impl LanceCache {
    pub fn with_capacity(capacity: usize) -> Self {
        let cache = Cache::builder()
            .max_capacity(capacity as u64)
            .weigher(|_, v: &SizedRecord| {
                (v.size_accessor)(&v.record).try_into().unwrap_or(u32::MAX)
            })
            .support_invalidation_closures()
            .build();
        Self {
            cache: Arc::new(cache),
            prefix: String::new(),
            hits: Arc::new(AtomicU64::new(0)),
            misses: Arc::new(AtomicU64::new(0)),
        }
    }

    pub fn no_cache() -> Self {
        Self {
            cache: Arc::new(Cache::new(0)),
            prefix: String::new(),
            hits: Arc::new(AtomicU64::new(0)),
            misses: Arc::new(AtomicU64::new(0)),
        }
    }

    /// Returns a handle to the same underlying cache with `prefix` appended to
    /// the key prefix.
    ///
    /// If this cache already has a prefix, the new prefix is appended to the
    /// existing one.
    ///
    /// Prefixes namespace the cache keys, avoiding collisions between
    /// different logical caches that share the same storage.
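    ///
    /// # Example
    ///
    /// A minimal sketch (illustrative, not compiled as a doc-test):
    ///
    /// ```ignore
    /// let cache = LanceCache::with_capacity(1024 * 1024);
    /// let files = cache.with_key_prefix("files");
    /// let indices = cache.with_key_prefix("indices");
    /// // Both handles share the same storage and statistics, but their keys
    /// // live in separate namespaces.
    /// ```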
    pub fn with_key_prefix(&self, prefix: &str) -> Self {
        Self {
            cache: self.cache.clone(),
            prefix: format!("{}{}/", self.prefix, prefix),
            hits: self.hits.clone(),
            misses: self.misses.clone(),
        }
    }

    fn get_key(&self, key: &str) -> String {
        if self.prefix.is_empty() {
            key.to_string()
        } else {
            format!("{}/{}", self.prefix, key)
        }
    }

    /// Invalidate all entries in the cache that start with the given prefix
    ///
    /// The given prefix is appended to the existing prefix of the cache. If you
    /// want to invalidate all at the current prefix, pass an empty string.
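    ///
    /// ```ignore
    /// // Illustrative sketch: drop everything under the "fragments" namespace,
    /// // or pass "" to drop everything under this cache's current prefix.
    /// cache.invalidate_prefix("fragments");
    /// cache.invalidate_prefix("");
    /// ```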
    pub fn invalidate_prefix(&self, prefix: &str) {
        let full_prefix = format!("{}{}", self.prefix, prefix);
        self.cache
            .invalidate_entries_if(move |(key, _typeid), _value| key.starts_with(&full_prefix))
            .expect("Cache configured correctly");
    }

    pub async fn size(&self) -> usize {
        self.cache.run_pending_tasks().await;
        self.cache.entry_count() as usize
    }

    pub fn approx_size(&self) -> usize {
        self.cache.entry_count() as usize
    }

    pub async fn size_bytes(&self) -> usize {
        self.cache.run_pending_tasks().await;
        self.approx_size_bytes()
    }

    pub fn approx_size_bytes(&self) -> usize {
        self.cache.weighted_size() as usize
    }

    async fn insert<T: DeepSizeOf + Send + Sync + 'static>(&self, key: &str, metadata: Arc<T>) {
        let key = self.get_key(key);
        let record = SizedRecord::new(metadata);
        tracing::trace!(
            target: "lance_cache::insert",
            key = key,
            type_id = std::any::type_name::<T>(),
            size = (record.size_accessor)(&record.record),
        );
        self.cache.insert((key, TypeId::of::<T>()), record).await;
    }

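    /// Insert a value whose type is unsized (e.g. a trait object).
    ///
    /// The `Arc<T>` is wrapped in another `Arc` so that the stored value is
    /// `Sized`; retrieve it with [`Self::get_unsized`].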
    pub async fn insert_unsized<T: DeepSizeOf + Send + Sync + 'static + ?Sized>(
        &self,
        key: &str,
        metadata: Arc<T>,
    ) {
        // Wrapping the Arc<T> in another Arc makes the stored value Sized; the
        // entry is keyed by the TypeId of Arc<T>.
        self.insert(key, Arc::new(metadata)).await
    }

    async fn get<T: DeepSizeOf + Send + Sync + 'static>(&self, key: &str) -> Option<Arc<T>> {
        let key = self.get_key(key);
        if let Some(metadata) = self.cache.get(&(key, TypeId::of::<T>())).await {
            self.hits.fetch_add(1, Ordering::Relaxed);
            Some(metadata.record.clone().downcast::<T>().unwrap())
        } else {
            self.misses.fetch_add(1, Ordering::Relaxed);
            None
        }
    }

    pub async fn get_unsized<T: DeepSizeOf + Send + Sync + 'static + ?Sized>(
        &self,
        key: &str,
    ) -> Option<Arc<T>> {
        let outer = self.get::<Arc<T>>(key).await?;
        Some(outer.as_ref().clone())
    }

    /// Get an item, loading it on a miss.
    ///
    /// If the item is already cached, return it. Otherwise run `loader` to
    /// produce the item, insert it into the cache, and return it. Concurrent
    /// callers for the same key share a single load.
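    ///
    /// ```ignore
    /// // Crate-internal sketch: the loader runs only on a miss.
    /// let value: Arc<Vec<i32>> = cache
    ///     .get_or_insert("frag/0/meta".to_string(), |_key| async { Ok(vec![1, 2, 3]) })
    ///     .await?;
    /// ```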
    async fn get_or_insert<T: DeepSizeOf + Send + Sync + 'static, F, Fut>(
        &self,
        key: String,
        loader: F,
    ) -> Result<Arc<T>>
    where
        F: FnOnce(&str) -> Fut,
        Fut: Future<Output = Result<T>> + Send,
    {
        let full_key = self.get_key(&key);
        let cache_key = (full_key, TypeId::of::<T>());

        // Use optionally_get_with to handle concurrent requests
        let hits = self.hits.clone();
        let misses = self.misses.clone();

        // Use oneshot channels to track both errors and whether init was run
        let (error_tx, error_rx) = tokio::sync::oneshot::channel();
        let (init_run_tx, mut init_run_rx) = tokio::sync::oneshot::channel();

        let init = Box::pin(async move {
            let _ = init_run_tx.send(());
            misses.fetch_add(1, Ordering::Relaxed);
            match loader(&key).await {
                Ok(value) => Some(SizedRecord::new(Arc::new(value))),
                Err(e) => {
                    let _ = error_tx.send(e);
                    None
                }
            }
        });

        match self.cache.optionally_get_with(cache_key, init).await {
            Some(metadata) => {
                // Check if init was run or if this was a cache hit
                match init_run_rx.try_recv() {
                    Ok(()) => {
                        // Init was run, miss was already recorded
                    }
                    Err(_) => {
                        // Init was not run, this is a cache hit
                        hits.fetch_add(1, Ordering::Relaxed);
                    }
                }
                Ok(metadata.record.clone().downcast::<T>().unwrap())
            }
            None => {
                // The loader returned an error, retrieve it from the channel
                match error_rx.await {
                    Ok(err) => Err(err),
                    Err(_) => Err(crate::Error::Internal {
                        message: "Failed to retrieve error from cache loader".into(),
                        location: location!(),
                    }),
                }
            }
        }
    }

    pub async fn stats(&self) -> CacheStats {
        self.cache.run_pending_tasks().await;
        CacheStats {
            hits: self.hits.load(Ordering::Relaxed),
            misses: self.misses.load(Ordering::Relaxed),
            num_entries: self.cache.entry_count() as usize,
            size_bytes: self.cache.weighted_size() as usize,
        }
    }

    pub async fn clear(&self) {
        self.cache.invalidate_all();
        self.cache.run_pending_tasks().await;
        self.hits.store(0, Ordering::Relaxed);
        self.misses.store(0, Ordering::Relaxed);
    }

    // CacheKey-based methods
    pub async fn insert_with_key<K>(&self, cache_key: &K, metadata: Arc<K::ValueType>)
    where
        K: CacheKey,
        K::ValueType: DeepSizeOf + Send + Sync + 'static,
    {
        self.insert(&cache_key.key(), metadata).boxed().await
    }

    pub async fn get_with_key<K>(&self, cache_key: &K) -> Option<Arc<K::ValueType>>
    where
        K: CacheKey,
        K::ValueType: DeepSizeOf + Send + Sync + 'static,
    {
        self.get::<K::ValueType>(&cache_key.key()).boxed().await
    }

    pub async fn get_or_insert_with_key<K, F, Fut>(
        &self,
        cache_key: K,
        loader: F,
    ) -> Result<Arc<K::ValueType>>
    where
        K: CacheKey,
        K::ValueType: DeepSizeOf + Send + Sync + 'static,
        F: FnOnce() -> Fut,
        Fut: Future<Output = Result<K::ValueType>> + Send,
    {
        let key_str = cache_key.key().into_owned();
        Box::pin(self.get_or_insert(key_str, |_| loader())).await
    }

    pub async fn insert_unsized_with_key<K>(&self, cache_key: &K, metadata: Arc<K::ValueType>)
    where
        K: UnsizedCacheKey,
        K::ValueType: DeepSizeOf + Send + Sync + 'static,
    {
        self.insert_unsized(&cache_key.key(), metadata)
            .boxed()
            .await
    }

    pub async fn get_unsized_with_key<K>(&self, cache_key: &K) -> Option<Arc<K::ValueType>>
    where
        K: UnsizedCacheKey,
        K::ValueType: DeepSizeOf + Send + Sync + 'static,
    {
        self.get_unsized::<K::ValueType>(&cache_key.key())
            .boxed()
            .await
    }
}

/// A weak reference to a LanceCache, used by indices to avoid circular references.
/// When the original cache is dropped, operations on this will gracefully no-op.
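///
/// ```ignore
/// // Illustrative sketch: the weak handle works while the strong cache is
/// // alive and degrades to a no-op once it is dropped.
/// let cache = LanceCache::with_capacity(1024 * 1024);
/// let weak = WeakLanceCache::from(&cache);
/// assert!(weak.insert("key", Arc::new(vec![1, 2, 3])).await);
/// drop(cache);
/// assert!(!weak.insert("key", Arc::new(vec![4, 5, 6])).await);
/// ```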
#[derive(Clone, Debug)]
pub struct WeakLanceCache {
    inner: std::sync::Weak<Cache<(String, TypeId), SizedRecord>>,
    prefix: String,
    hits: Arc<AtomicU64>,
    misses: Arc<AtomicU64>,
}

impl WeakLanceCache {
    /// Create a weak reference from a strong LanceCache
    pub fn from(cache: &LanceCache) -> Self {
        Self {
            inner: Arc::downgrade(&cache.cache),
            prefix: cache.prefix.clone(),
            hits: cache.hits.clone(),
            misses: cache.misses.clone(),
        }
    }

    /// Appends a prefix to the cache key
    pub fn with_key_prefix(&self, prefix: &str) -> Self {
        Self {
            inner: self.inner.clone(),
            prefix: format!("{}{}/", self.prefix, prefix),
            hits: self.hits.clone(),
            misses: self.misses.clone(),
        }
    }

    fn get_key(&self, key: &str) -> String {
        if self.prefix.is_empty() {
            key.to_string()
        } else {
            format!("{}/{}", self.prefix, key)
        }
    }

    /// Get an item from cache if the cache is still alive
    pub async fn get<T: DeepSizeOf + Send + Sync + 'static>(&self, key: &str) -> Option<Arc<T>> {
        let cache = self.inner.upgrade()?;
        let key = self.get_key(key);
        if let Some(metadata) = cache.get(&(key, TypeId::of::<T>())).await {
            self.hits.fetch_add(1, Ordering::Relaxed);
            Some(metadata.record.clone().downcast::<T>().unwrap())
        } else {
            self.misses.fetch_add(1, Ordering::Relaxed);
            None
        }
    }

    /// Insert an item if the cache is still alive
    ///
    /// Returns true if the item was inserted, false if the cache is no longer available
    pub async fn insert<T: DeepSizeOf + Send + Sync + 'static>(
        &self,
        key: &str,
        value: Arc<T>,
    ) -> bool {
        if let Some(cache) = self.inner.upgrade() {
            let key = self.get_key(key);
            let record = SizedRecord::new(value);
            cache.insert((key, TypeId::of::<T>()), record).await;
            true
        } else {
            log::warn!("WeakLanceCache: cache no longer available, unable to insert item");
            false
        }
    }

    /// Get or insert an item, computing it if necessary
    pub async fn get_or_insert<T, F, Fut>(&self, key: &str, f: F) -> Result<Arc<T>>
    where
        T: DeepSizeOf + Send + Sync + 'static,
        F: FnOnce() -> Fut,
        Fut: Future<Output = Result<T>> + Send,
    {
        if let Some(cache) = self.inner.upgrade() {
            let full_key = self.get_key(key);
            let cache_key = (full_key.clone(), TypeId::of::<T>());

            // Use optionally_get_with to handle concurrent requests properly
            let hits = self.hits.clone();
            let misses = self.misses.clone();

            // Track whether init was run (for metrics)
            let (init_run_tx, mut init_run_rx) = tokio::sync::oneshot::channel();
            let (error_tx, error_rx) = tokio::sync::oneshot::channel();

            let init = Box::pin(async move {
                let _ = init_run_tx.send(());
                misses.fetch_add(1, Ordering::Relaxed);
                match f().await {
                    Ok(value) => Some(SizedRecord::new(Arc::new(value))),
                    Err(e) => {
                        let _ = error_tx.send(e);
                        None
                    }
                }
            });

            match cache.optionally_get_with(cache_key, init).await {
                Some(record) => {
                    // Check if init was run or if this was a cache hit
                    match init_run_rx.try_recv() {
                        Ok(()) => {
                            // Init was run, miss was already recorded
                        }
                        Err(_) => {
                            // Init was not run, this was a cache hit
                            hits.fetch_add(1, Ordering::Relaxed);
                        }
                    }
                    Ok(record.record.clone().downcast::<T>().unwrap())
                }
                None => {
                    // Init returned None, which means there was an error
                    match error_rx.await {
                        Ok(e) => Err(e),
                        Err(_) => Err(crate::Error::Internal {
                            message: "Failed to receive error from cache init function".to_string(),
                            location: location!(),
                        }),
                    }
                }
            }
        } else {
            log::warn!("WeakLanceCache: cache no longer available, computing without caching");
            f().await.map(Arc::new)
        }
    }

    /// Get or insert an item with a cache key type
    pub async fn get_or_insert_with_key<K, F, Fut>(
        &self,
        cache_key: K,
        loader: F,
    ) -> Result<Arc<K::ValueType>>
    where
        K: CacheKey,
        K::ValueType: DeepSizeOf + Send + Sync + 'static,
        F: FnOnce() -> Fut,
        Fut: Future<Output = Result<K::ValueType>> + Send,
    {
        let key_str = cache_key.key().into_owned();
        self.get_or_insert(&key_str, loader).await
    }

    /// Insert with a cache key type
    ///
    /// Returns true if the item was inserted, false if the cache is no longer available
    pub async fn insert_with_key<K>(&self, cache_key: &K, value: Arc<K::ValueType>) -> bool
    where
        K: CacheKey,
        K::ValueType: DeepSizeOf + Send + Sync + 'static,
    {
        let key_str = cache_key.key().into_owned();
        self.insert(&key_str, value).await
    }

    /// Get with a cache key type
    pub async fn get_with_key<K>(&self, cache_key: &K) -> Option<Arc<K::ValueType>>
    where
        K: CacheKey,
        K::ValueType: DeepSizeOf + Send + Sync + 'static,
    {
        let key_str = cache_key.key().into_owned();
        self.get(&key_str).await
    }

    /// Get unsized item from cache
    pub async fn get_unsized<T: DeepSizeOf + Send + Sync + 'static + ?Sized>(
        &self,
        key: &str,
    ) -> Option<Arc<T>> {
        // Unsized values are stored double-wrapped (Arc<Arc<T>>) and keyed by
        // the TypeId of Arc<T>.
        let cache = self.inner.upgrade()?;
        let key = self.get_key(key);
        if let Some(metadata) = cache.get(&(key, TypeId::of::<Arc<T>>())).await {
            metadata
                .record
                .clone()
                .downcast::<Arc<T>>()
                .ok()
                .map(|arc| arc.as_ref().clone())
        } else {
            None
        }
    }

    /// Insert unsized item into cache
    pub async fn insert_unsized<T: DeepSizeOf + Send + Sync + 'static + ?Sized>(
        &self,
        key: &str,
        value: Arc<T>,
    ) {
        if let Some(cache) = self.inner.upgrade() {
            let key = self.get_key(key);
            let record = SizedRecord::new(Arc::new(value));
            cache.insert((key, TypeId::of::<Arc<T>>()), record).await;
        } else {
            log::warn!("WeakLanceCache: cache no longer available, unable to insert unsized item");
        }
    }

    /// Get unsized with a cache key type
    pub async fn get_unsized_with_key<K>(&self, cache_key: &K) -> Option<Arc<K::ValueType>>
    where
        K: UnsizedCacheKey,
        K::ValueType: DeepSizeOf + Send + Sync + 'static,
    {
        let key_str = cache_key.key();
        self.get_unsized(&key_str).await
    }

    /// Insert unsized with a cache key type
    pub async fn insert_unsized_with_key<K>(&self, cache_key: &K, value: Arc<K::ValueType>)
    where
        K: UnsizedCacheKey,
        K::ValueType: DeepSizeOf + Send + Sync + 'static,
    {
        let key_str = cache_key.key();
        self.insert_unsized(&key_str, value).await
    }
}

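/// A typed cache key: a key string paired with the type of the cached value.
///
/// # Example
///
/// A hypothetical key type (illustrative only):
///
/// ```ignore
/// struct FragmentMetaKey {
///     fragment_id: u64,
/// }
///
/// impl CacheKey for FragmentMetaKey {
///     type ValueType = Vec<u32>;
///
///     fn key(&self) -> Cow<'_, str> {
///         Cow::Owned(format!("fragment-meta/{}", self.fragment_id))
///     }
/// }
/// ```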
pub trait CacheKey {
    type ValueType;

    fn key(&self) -> Cow<'_, str>;
}

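/// Like [`CacheKey`], but for unsized value types (e.g. trait objects), which
/// the cache stores behind an extra `Arc`.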
pub trait UnsizedCacheKey {
    type ValueType: ?Sized;

    fn key(&self) -> Cow<'_, str>;
}

#[derive(Debug, Clone)]
pub struct CacheStats {
    /// Number of times `get`, `get_unsized`, or `get_or_insert` found an item in the cache.
    pub hits: u64,
    /// Number of times `get`, `get_unsized`, or `get_or_insert` did not find an item in the cache.
    pub misses: u64,
    /// Number of entries currently in the cache.
    pub num_entries: usize,
    /// Total size in bytes of all entries in the cache.
    pub size_bytes: usize,
}

impl CacheStats {
    pub fn hit_ratio(&self) -> f32 {
        if self.hits + self.misses == 0 {
            0.0
        } else {
            self.hits as f32 / (self.hits + self.misses) as f32
        }
    }

    pub fn miss_ratio(&self) -> f32 {
        if self.hits + self.misses == 0 {
            0.0
        } else {
            self.misses as f32 / (self.hits + self.misses) as f32
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_cache_bytes() {
        let item = Arc::new(vec![1, 2, 3]);
        let item_size = item.deep_size_of(); // Size of Arc<Vec<i32>>
        let capacity = 10 * item_size;

        let cache = LanceCache::with_capacity(capacity);
        assert_eq!(cache.size_bytes().await, 0);
        assert_eq!(cache.approx_size_bytes(), 0);

        let item = Arc::new(vec![1, 2, 3]);
        cache.insert("key", item.clone()).await;
        assert_eq!(cache.size().await, 1);
        assert_eq!(cache.size_bytes().await, item_size);
        assert_eq!(cache.approx_size_bytes(), item_size);

        let retrieved = cache.get::<Vec<i32>>("key").await.unwrap();
        assert_eq!(*retrieved, *item);

        // Test eviction based on size
        for i in 0..20 {
            cache
                .insert(&format!("key_{}", i), Arc::new(vec![i, i, i]))
                .await;
        }
        assert_eq!(cache.size_bytes().await, capacity);
        assert_eq!(cache.size().await, 10);
    }

    #[tokio::test]
    async fn test_cache_trait_objects() {
        #[derive(Debug, DeepSizeOf)]
        struct MyType(i32);

        trait MyTrait: DeepSizeOf + Send + Sync + Any {
            fn as_any(&self) -> &dyn Any;
        }

        impl MyTrait for MyType {
            fn as_any(&self) -> &dyn Any {
                self
            }
        }

        let item = Arc::new(MyType(42));
        let item_dyn: Arc<dyn MyTrait> = item;

        let cache = LanceCache::with_capacity(1000);
        cache.insert_unsized("test", item_dyn).await;

        let retrieved = cache.get_unsized::<dyn MyTrait>("test").await.unwrap();
        let retrieved = retrieved.as_any().downcast_ref::<MyType>().unwrap();
        assert_eq!(retrieved.0, 42);
    }

    #[tokio::test]
    async fn test_cache_stats_basic() {
        let cache = LanceCache::with_capacity(1000);

        // Initially no hits or misses
        let stats = cache.stats().await;
        assert_eq!(stats.hits, 0);
        assert_eq!(stats.misses, 0);

        // Miss on first get
        let result = cache.get::<Vec<i32>>("nonexistent");
        assert!(result.await.is_none());
        let stats = cache.stats().await;
        assert_eq!(stats.hits, 0);
        assert_eq!(stats.misses, 1);

        // Insert and then hit
        cache.insert("key1", Arc::new(vec![1, 2, 3])).await;
        let result = cache.get::<Vec<i32>>("key1");
        assert!(result.await.is_some());
        let stats = cache.stats().await;
        assert_eq!(stats.hits, 1);
        assert_eq!(stats.misses, 1);

        // Another hit
        let result = cache.get::<Vec<i32>>("key1");
        assert!(result.await.is_some());
        let stats = cache.stats().await;
        assert_eq!(stats.hits, 2);
        assert_eq!(stats.misses, 1);

        // Another miss
        let result = cache.get::<Vec<i32>>("nonexistent2");
        assert!(result.await.is_none());
        let stats = cache.stats().await;
        assert_eq!(stats.hits, 2);
        assert_eq!(stats.misses, 2);
    }

    #[tokio::test]
    async fn test_cache_stats_with_prefixes() {
        let base_cache = LanceCache::with_capacity(1000);
        let prefixed_cache = base_cache.with_key_prefix("test");

        // Stats should be shared between base and prefixed cache
        let stats = base_cache.stats().await;
        assert_eq!(stats.hits, 0);
        assert_eq!(stats.misses, 0);

        let stats = prefixed_cache.stats().await;
        assert_eq!(stats.hits, 0);
        assert_eq!(stats.misses, 0);

        // Miss on prefixed cache
        let result = prefixed_cache.get::<Vec<i32>>("key1");
        assert!(result.await.is_none());

        // Both should show the miss
        let stats = base_cache.stats().await;
        assert_eq!(stats.hits, 0);
        assert_eq!(stats.misses, 1);

        let stats = prefixed_cache.stats().await;
        assert_eq!(stats.hits, 0);
        assert_eq!(stats.misses, 1);

        // Insert through prefixed cache and hit
        prefixed_cache.insert("key1", Arc::new(vec![1, 2, 3])).await;
        let result = prefixed_cache.get::<Vec<i32>>("key1");
        assert!(result.await.is_some());

        // Both should show the hit
        let stats = base_cache.stats().await;
        assert_eq!(stats.hits, 1);
        assert_eq!(stats.misses, 1);

        let stats = prefixed_cache.stats().await;
        assert_eq!(stats.hits, 1);
        assert_eq!(stats.misses, 1);
    }

    #[tokio::test]
    async fn test_cache_stats_unsized() {
        #[derive(Debug, DeepSizeOf)]
        struct MyType(i32);

        trait MyTrait: DeepSizeOf + Send + Sync + Any {}

        impl MyTrait for MyType {}

        let cache = LanceCache::with_capacity(1000);

        // Miss on unsized get
        let result = cache.get_unsized::<dyn MyTrait>("test");
        assert!(result.await.is_none());
        let stats = cache.stats().await;
        assert_eq!(stats.hits, 0);
        assert_eq!(stats.misses, 1);

        // Insert and hit on unsized
        let item = Arc::new(MyType(42));
        let item_dyn: Arc<dyn MyTrait> = item;
        cache.insert_unsized("test", item_dyn).await;

        let result = cache.get_unsized::<dyn MyTrait>("test");
        assert!(result.await.is_some());
        let stats = cache.stats().await;
        assert_eq!(stats.hits, 1);
        assert_eq!(stats.misses, 1);
    }

    #[tokio::test]
    async fn test_cache_stats_get_or_insert() {
        let cache = LanceCache::with_capacity(1000);

        // First call should be a miss and load the value
        let result: Arc<Vec<i32>> = cache
            .get_or_insert("key1".to_string(), |_key| async { Ok(vec![1, 2, 3]) })
            .await
            .unwrap();
        assert_eq!(*result, vec![1, 2, 3]);

        let stats = cache.stats().await;
        assert_eq!(stats.hits, 0);
        assert_eq!(stats.misses, 1);

        // Second call should be a hit
        let result: Arc<Vec<i32>> = cache
            .get_or_insert("key1".to_string(), |_key| async {
                panic!("Should not be called")
            })
            .await
            .unwrap();
        assert_eq!(*result, vec![1, 2, 3]);

        let stats = cache.stats().await;
        assert_eq!(stats.hits, 1);
        assert_eq!(stats.misses, 1);

        // Different key should be another miss
        let result: Arc<Vec<i32>> = cache
            .get_or_insert("key2".to_string(), |_key| async { Ok(vec![4, 5, 6]) })
            .await
            .unwrap();
        assert_eq!(*result, vec![4, 5, 6]);

        let stats = cache.stats().await;
        assert_eq!(stats.hits, 1);
        assert_eq!(stats.misses, 2);
    }
}