1pub mod backend;
49pub mod codec;
50mod entry_io;
51mod moka;
52
53pub use backend::{CacheBackend, CacheEntry, CacheKeyIterator, InternalCacheKey};
54pub use codec::{
55 CacheCodec, CacheCodecImpl, CacheDecode, CacheMissReason, MAGIC, has_cache_envelope,
56};
57pub use entry_io::{CacheEntryReader, CacheEntryWriter};
58pub use moka::MokaCacheBackend;
59
60use std::borrow::Cow;
61use std::sync::{
62 Arc,
63 atomic::{AtomicU64, Ordering},
64};
65
66use futures::{Future, FutureExt};
67
68use crate::Result;
69
70pub use crate::deepsize::{Context, DeepSizeOf};
71
72pub trait CacheKey {
94 type ValueType: 'static;
95
96 fn key(&self) -> Cow<'_, str>;
97
98 fn type_name() -> &'static str;
108
109 fn codec() -> Option<CacheCodec> {
118 None
119 }
120}
121
/// A typed cache key whose value type may be unsized (for example a trait
/// object).
pub trait UnsizedCacheKey {
    /// Type of the value stored under this key; it is allowed to be `?Sized`.
    type ValueType: ?Sized + 'static;

    /// Name identifying this kind of entry; part of the backend key.
    fn type_name() -> &'static str;

    /// The string key of this entry, without any cache prefix.
    fn key(&self) -> Cow<'_, str>;
}
138
139fn cache_entry_size<T: DeepSizeOf + ?Sized>(value: &T) -> usize {
145 value.deep_size_of() + std::mem::size_of::<std::sync::atomic::AtomicUsize>() * 2
146}
147
148fn build_key(prefix: &Arc<str>, key: &str, type_name: &'static str) -> InternalCacheKey {
151 InternalCacheKey::new(prefix.clone(), Arc::from(key), type_name)
152}
153
154#[derive(Clone)]
163pub struct LanceCache {
164 cache: Arc<dyn CacheBackend>,
165 prefix: Arc<str>,
166 hits: Arc<AtomicU64>,
167 misses: Arc<AtomicU64>,
168}
169
170impl std::fmt::Debug for LanceCache {
171 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
172 f.debug_struct("LanceCache")
173 .field("cache", &self.cache)
174 .finish()
175 }
176}
177
178impl DeepSizeOf for LanceCache {
179 fn deep_size_of_children(&self, _: &mut Context) -> usize {
180 self.cache.approx_size_bytes()
181 }
182}
183
184impl LanceCache {
185 pub fn with_capacity(capacity: usize) -> Self {
186 Self {
187 cache: Arc::new(MokaCacheBackend::with_capacity(capacity)),
188 prefix: Arc::from(""),
189 hits: Arc::new(AtomicU64::new(0)),
190 misses: Arc::new(AtomicU64::new(0)),
191 }
192 }
193
194 pub fn with_backend(backend: Arc<dyn CacheBackend>) -> Self {
196 Self {
197 cache: backend,
198 prefix: Arc::from(""),
199 hits: Arc::new(AtomicU64::new(0)),
200 misses: Arc::new(AtomicU64::new(0)),
201 }
202 }
203
204 pub fn no_cache() -> Self {
205 Self {
206 cache: Arc::new(MokaCacheBackend::no_cache()),
207 prefix: Arc::from(""),
208 hits: Arc::new(AtomicU64::new(0)),
209 misses: Arc::new(AtomicU64::new(0)),
210 }
211 }
212
213 pub fn with_backend_and_prefix(backend: Arc<dyn CacheBackend>, prefix: String) -> Self {
216 Self {
217 cache: backend,
218 prefix: Arc::from(prefix),
219 hits: Arc::new(AtomicU64::new(0)),
220 misses: Arc::new(AtomicU64::new(0)),
221 }
222 }
223
224 pub fn with_key_prefix(&self, prefix: &str) -> Self {
226 Self {
227 cache: self.cache.clone(),
228 prefix: Arc::from(format!("{}{}/", self.prefix, prefix)),
229 hits: self.hits.clone(),
230 misses: self.misses.clone(),
231 }
232 }
233
234 pub async fn invalidate_prefix(&self, prefix: &str) {
236 let full_prefix = format!("{}{}", self.prefix, prefix);
237 self.cache.invalidate_prefix(&full_prefix).await;
238 }
239
240 pub async fn size(&self) -> usize {
241 self.cache.num_entries().await
242 }
243
244 pub fn approx_size(&self) -> usize {
245 self.cache.approx_num_entries()
246 }
247
248 pub async fn size_bytes(&self) -> usize {
249 self.cache.size_bytes().await
250 }
251
252 pub async fn keys(&self) -> Option<CacheKeyIterator<'_>> {
278 Some(Box::new(
279 self.cache
280 .keys()
281 .await?
282 .filter(|key| key.starts_with(&self.prefix)),
283 ))
284 }
285
286 async fn insert_with_id<T: DeepSizeOf + Send + Sync + 'static>(
289 &self,
290 key: &str,
291 type_name: &'static str,
292 codec: Option<CacheCodec>,
293 metadata: Arc<T>,
294 ) {
295 let size = cache_entry_size(&*metadata);
296 let cache_key = build_key(&self.prefix, key, type_name);
297 self.cache.insert(&cache_key, metadata, size, codec).await;
298 }
299
300 async fn get_with_id<T: Send + Sync + 'static>(
301 &self,
302 key: &str,
303 type_name: &'static str,
304 codec: Option<CacheCodec>,
305 ) -> Option<Arc<T>> {
306 let cache_key = build_key(&self.prefix, key, type_name);
307 if let Some(entry) = self.cache.get(&cache_key, codec).await {
308 match entry.downcast::<T>() {
309 Ok(val) => {
310 self.hits.fetch_add(1, Ordering::Relaxed);
311 Some(val)
312 }
313 Err(_) => {
314 self.misses.fetch_add(1, Ordering::Relaxed);
318 None
319 }
320 }
321 } else {
322 self.misses.fetch_add(1, Ordering::Relaxed);
323 None
324 }
325 }
326
327 pub async fn stats(&self) -> CacheStats {
330 CacheStats {
331 hits: self.hits.load(Ordering::Relaxed),
332 misses: self.misses.load(Ordering::Relaxed),
333 num_entries: self.cache.num_entries().await,
334 size_bytes: self.cache.size_bytes().await,
335 }
336 }
337
338 pub async fn clear(&self) {
339 self.cache.clear().await;
340 self.hits.store(0, Ordering::Relaxed);
341 self.misses.store(0, Ordering::Relaxed);
342 }
343
344 pub async fn insert_with_key<K>(&self, cache_key: &K, metadata: Arc<K::ValueType>)
347 where
348 K: CacheKey,
349 K::ValueType: DeepSizeOf + Send + Sync + 'static,
350 {
351 self.insert_with_id(&cache_key.key(), K::type_name(), K::codec(), metadata)
352 .boxed()
353 .await
354 }
355
356 pub async fn get_with_key<K>(&self, cache_key: &K) -> Option<Arc<K::ValueType>>
357 where
358 K: CacheKey,
359 K::ValueType: DeepSizeOf + Send + Sync + 'static,
360 {
361 self.get_with_id::<K::ValueType>(&cache_key.key(), K::type_name(), K::codec())
362 .boxed()
363 .await
364 }
365
366 pub async fn get_or_insert_with_key<K, F, Fut>(
367 &self,
368 cache_key: K,
369 loader: F,
370 ) -> Result<Arc<K::ValueType>>
371 where
372 K: CacheKey,
373 K::ValueType: DeepSizeOf + Send + Sync + 'static,
374 F: FnOnce() -> Fut + Send,
375 Fut: Future<Output = Result<K::ValueType>> + Send,
376 {
377 let key = build_key(&self.prefix, &cache_key.key(), K::type_name());
378
379 let typed_loader = Box::pin(async move {
380 let value = loader().await?;
381 let arc = Arc::new(value);
382 let size = cache_entry_size(&*arc);
383 Ok((arc as CacheEntry, size))
384 });
385
386 let (entry, was_cached) = self
387 .cache
388 .get_or_insert(&key, typed_loader, K::codec())
389 .await?;
390
391 if was_cached {
392 self.hits.fetch_add(1, Ordering::Relaxed);
393 } else {
394 self.misses.fetch_add(1, Ordering::Relaxed);
395 }
396
397 Ok(entry.downcast::<K::ValueType>().unwrap())
398 }
399
400 pub async fn insert_unsized_with_key<K>(&self, cache_key: &K, metadata: Arc<K::ValueType>)
401 where
402 K: UnsizedCacheKey,
403 K::ValueType: DeepSizeOf + Send + Sync + 'static,
404 {
405 self.insert_with_id(&cache_key.key(), K::type_name(), None, Arc::new(metadata))
406 .boxed()
407 .await
408 }
409
410 pub async fn get_unsized_with_key<K>(&self, cache_key: &K) -> Option<Arc<K::ValueType>>
411 where
412 K: UnsizedCacheKey,
413 K::ValueType: DeepSizeOf + Send + Sync + 'static,
414 {
415 let outer = self
416 .get_with_id::<Arc<K::ValueType>>(&cache_key.key(), K::type_name(), None)
417 .boxed()
418 .await?;
419 Some(outer.as_ref().clone())
420 }
421}
422
423#[derive(Clone, Debug)]
430pub struct WeakLanceCache {
431 inner: std::sync::Weak<dyn CacheBackend>,
432 prefix: Arc<str>,
433 hits: Arc<AtomicU64>,
434 misses: Arc<AtomicU64>,
435}
436
437impl WeakLanceCache {
438 pub fn from(cache: &LanceCache) -> Self {
439 Self {
440 inner: Arc::downgrade(&cache.cache),
441 prefix: cache.prefix.clone(),
442 hits: cache.hits.clone(),
443 misses: cache.misses.clone(),
444 }
445 }
446
447 pub fn with_key_prefix(&self, prefix: &str) -> Self {
448 Self {
449 inner: self.inner.clone(),
450 prefix: Arc::from(format!("{}{}/", self.prefix, prefix)),
451 hits: self.hits.clone(),
452 misses: self.misses.clone(),
453 }
454 }
455
456 pub fn prefix(&self) -> &str {
458 &self.prefix
459 }
460
461 pub async fn get_with_key<K>(&self, cache_key: &K) -> Option<Arc<K::ValueType>>
462 where
463 K: CacheKey,
464 K::ValueType: DeepSizeOf + Send + Sync + 'static,
465 {
466 let cache = self.inner.upgrade()?;
467 let key = build_key(&self.prefix, &cache_key.key(), K::type_name());
468 if let Some(entry) = cache.get(&key, K::codec()).await {
469 self.hits.fetch_add(1, Ordering::Relaxed);
470 Some(entry.downcast::<K::ValueType>().unwrap())
471 } else {
472 self.misses.fetch_add(1, Ordering::Relaxed);
473 None
474 }
475 }
476
477 pub async fn insert_with_key<K>(&self, cache_key: &K, value: Arc<K::ValueType>) -> bool
478 where
479 K: CacheKey,
480 K::ValueType: DeepSizeOf + Send + Sync + 'static,
481 {
482 if let Some(cache) = self.inner.upgrade() {
483 let size = cache_entry_size(&*value);
484 let key = build_key(&self.prefix, &cache_key.key(), K::type_name());
485 cache.insert(&key, value, size, K::codec()).await;
486 true
487 } else {
488 log::warn!("WeakLanceCache: cache no longer available, unable to insert item");
489 false
490 }
491 }
492
493 pub async fn get_or_insert_with_key<K, F, Fut>(
497 &self,
498 cache_key: K,
499 loader: F,
500 ) -> Result<Arc<K::ValueType>>
501 where
502 K: CacheKey,
503 K::ValueType: DeepSizeOf + Send + Sync + 'static,
504 F: FnOnce() -> Fut + Send,
505 Fut: Future<Output = Result<K::ValueType>> + Send,
506 {
507 if let Some(cache) = self.inner.upgrade() {
508 let key = build_key(&self.prefix, &cache_key.key(), K::type_name());
509 let typed_loader = Box::pin(async move {
510 let value = loader().await?;
511 let arc = Arc::new(value);
512 let size = cache_entry_size(&*arc);
513 Ok((arc as CacheEntry, size))
514 });
515 let (entry, was_cached) = cache.get_or_insert(&key, typed_loader, K::codec()).await?;
516 if was_cached {
517 self.hits.fetch_add(1, Ordering::Relaxed);
518 } else {
519 self.misses.fetch_add(1, Ordering::Relaxed);
520 }
521 Ok(entry.downcast::<K::ValueType>().unwrap())
522 } else {
523 log::warn!("WeakLanceCache: cache no longer available, computing without caching");
524 loader().await.map(Arc::new)
525 }
526 }
527
528 pub async fn get_unsized_with_key<K>(&self, cache_key: &K) -> Option<Arc<K::ValueType>>
529 where
530 K: UnsizedCacheKey,
531 K::ValueType: DeepSizeOf + Send + Sync + 'static,
532 {
533 let cache = self.inner.upgrade()?;
534 let key = build_key(&self.prefix, &cache_key.key(), K::type_name());
535 if let Some(entry) = cache.get(&key, None).await {
536 entry
537 .downcast::<Arc<K::ValueType>>()
538 .ok()
539 .map(|arc| arc.as_ref().clone())
540 } else {
541 None
542 }
543 }
544
545 pub async fn insert_unsized_with_key<K>(&self, cache_key: &K, value: Arc<K::ValueType>)
546 where
547 K: UnsizedCacheKey,
548 K::ValueType: DeepSizeOf + Send + Sync + 'static,
549 {
550 if let Some(cache) = self.inner.upgrade() {
551 let wrapper = Arc::new(value);
552 let size = cache_entry_size(&*wrapper);
553 let key = build_key(&self.prefix, &cache_key.key(), K::type_name());
554 cache.insert(&key, wrapper, size, None).await;
555 } else {
556 log::warn!("WeakLanceCache: cache no longer available, unable to insert unsized item");
557 }
558 }
559}
560
/// Snapshot of cache counters and size figures.
#[derive(Debug, Clone)]
pub struct CacheStats {
    /// Number of lookups recorded as hits.
    pub hits: u64,
    /// Number of lookups recorded as misses.
    pub misses: u64,
    /// Number of entries reported by the backend.
    pub num_entries: usize,
    /// Size in bytes reported by the backend.
    pub size_bytes: usize,
}

impl CacheStats {
    /// Fraction of recorded lookups that were hits, in `0.0..=1.0`.
    ///
    /// Returns `0.0` when no lookups have been recorded.
    pub fn hit_ratio(&self) -> f32 {
        Self::share(self.hits, self.misses)
    }

    /// Fraction of recorded lookups that were misses, in `0.0..=1.0`.
    ///
    /// Returns `0.0` when no lookups have been recorded.
    pub fn miss_ratio(&self) -> f32 {
        Self::share(self.misses, self.hits)
    }

    /// Computes `part / (part + rest)`, or `0.0` when `part` is zero.
    ///
    /// The sum is formed in `f64` so that it cannot overflow `u64`, and so
    /// that large counts are not rounded to `f32` before the division.
    fn share(part: u64, rest: u64) -> f32 {
        if part == 0 {
            // Covers the "no lookups yet" case as well as a plain zero share.
            return 0.0;
        }
        let part = part as f64;
        (part / (part + rest as f64)) as f32
    }
}
594
595#[cfg(test)]
596mod tests {
597 use super::*;
598 use std::collections::{BTreeSet, HashMap};
599 use std::marker::PhantomData;
600
601 struct TestKey<T: 'static> {
602 key: String,
603 _phantom: PhantomData<T>,
604 }
605
606 impl<T: 'static> TestKey<T> {
607 fn new(key: &str) -> Self {
608 Self {
609 key: key.to_string(),
610 _phantom: PhantomData,
611 }
612 }
613 }
614
615 impl<T: 'static> CacheKey for TestKey<T> {
616 type ValueType = T;
617 fn key(&self) -> std::borrow::Cow<'_, str> {
618 std::borrow::Cow::Borrowed(&self.key)
619 }
620 fn type_name() -> &'static str {
621 std::any::type_name::<T>()
622 }
623 }
624
625 struct TestUnsizedKey<T: 'static + ?Sized> {
627 key: String,
628 _phantom: PhantomData<T>,
629 }
630
631 impl<T: 'static + ?Sized> TestUnsizedKey<T> {
632 fn new(key: &str) -> Self {
633 Self {
634 key: key.to_string(),
635 _phantom: PhantomData,
636 }
637 }
638 }
639
640 impl<T: 'static + ?Sized> UnsizedCacheKey for TestUnsizedKey<T> {
641 type ValueType = T;
642 fn key(&self) -> std::borrow::Cow<'_, str> {
643 std::borrow::Cow::Borrowed(&self.key)
644 }
645 fn type_name() -> &'static str {
646 std::any::type_name::<T>()
647 }
648 }
649
650 fn key_fields(keys: &[InternalCacheKey]) -> BTreeSet<(String, String, &'static str)> {
651 keys.iter()
652 .map(|key| {
653 (
654 key.prefix().to_string(),
655 key.key().to_string(),
656 key.type_name(),
657 )
658 })
659 .collect()
660 }
661
662 #[tokio::test]
663 async fn test_cache_bytes() {
664 let item = Arc::new(vec![1, 2, 3]);
665 let item_size = item.deep_size_of();
666 let capacity = 10 * item_size;
667 let cache = LanceCache::with_capacity(capacity);
668
669 cache
670 .insert_with_key(&TestKey::<Vec<i32>>::new("key"), item.clone())
671 .await;
672 assert_eq!(cache.size().await, 1);
673
674 let retrieved = cache
675 .get_with_key(&TestKey::<Vec<i32>>::new("key"))
676 .await
677 .unwrap();
678 assert_eq!(*retrieved, *item);
679
680 for i in 0..20 {
681 cache
682 .insert_with_key(
683 &TestKey::<Vec<i32>>::new(&format!("key_{}", i)),
684 Arc::new(vec![i, i, i]),
685 )
686 .await;
687 }
688 assert!(cache.size_bytes().await <= capacity);
689 }
690
691 #[tokio::test]
692 async fn test_cache_trait_objects() {
693 #[derive(Debug, DeepSizeOf)]
694 struct MyType(i32);
695
696 trait MyTrait: DeepSizeOf + Send + Sync + std::any::Any {
697 fn as_any(&self) -> &dyn std::any::Any;
698 }
699
700 impl MyTrait for MyType {
701 fn as_any(&self) -> &dyn std::any::Any {
702 self
703 }
704 }
705
706 let item: Arc<dyn MyTrait> = Arc::new(MyType(42));
707 let cache = LanceCache::with_capacity(1000);
708 cache
709 .insert_unsized_with_key(&TestUnsizedKey::<dyn MyTrait>::new("test"), item)
710 .await;
711
712 let retrieved = cache
713 .get_unsized_with_key(&TestUnsizedKey::<dyn MyTrait>::new("test"))
714 .await
715 .unwrap();
716 assert_eq!(retrieved.as_any().downcast_ref::<MyType>().unwrap().0, 42);
717 }
718
719 #[tokio::test]
720 async fn test_cache_stats_basic() {
721 let cache = LanceCache::with_capacity(1000);
722 assert_eq!(cache.stats().await.hits, 0);
723
724 assert!(
726 cache
727 .get_with_key(&TestKey::<Vec<i32>>::new("x"))
728 .await
729 .is_none()
730 );
731 assert_eq!(cache.stats().await.misses, 1);
732
733 cache
735 .insert_with_key(&TestKey::new("k"), Arc::new(vec![1, 2, 3]))
736 .await;
737 assert!(
738 cache
739 .get_with_key(&TestKey::<Vec<i32>>::new("k"))
740 .await
741 .is_some()
742 );
743 assert_eq!(cache.stats().await.hits, 1);
744 }
745
746 #[tokio::test]
747 async fn test_cache_stats_with_prefixes() {
748 let base = LanceCache::with_capacity(1000);
749 let prefixed = base.with_key_prefix("ns");
750
751 assert!(
752 prefixed
753 .get_with_key(&TestKey::<Vec<i32>>::new("k"))
754 .await
755 .is_none()
756 );
757 assert_eq!(base.stats().await.misses, 1);
758
759 prefixed
760 .insert_with_key(&TestKey::new("k"), Arc::new(vec![1]))
761 .await;
762 assert!(
763 prefixed
764 .get_with_key(&TestKey::<Vec<i32>>::new("k"))
765 .await
766 .is_some()
767 );
768 assert_eq!(base.stats().await.hits, 1);
769 }
770
771 #[tokio::test]
772 async fn test_cache_keys_with_prefixes() {
773 let base = LanceCache::with_capacity(1000);
774 let prefixed = base.with_key_prefix("ns");
775 let nested = prefixed.with_key_prefix("index");
776 let other = base.with_key_prefix("ns-other");
777
778 base.insert_with_key(&TestKey::new("root"), Arc::new(vec![0]))
779 .await;
780 prefixed
781 .insert_with_key(&TestKey::new("child"), Arc::new(vec![1]))
782 .await;
783 nested
784 .insert_with_key(&TestKey::new("nested"), Arc::new(vec![2]))
785 .await;
786 other
787 .insert_with_key(&TestKey::new("other"), Arc::new(vec![3]))
788 .await;
789
790 let base_keys = base.keys().await.unwrap().collect::<Vec<_>>();
791 assert_eq!(
792 key_fields(&base_keys),
793 BTreeSet::from([
794 (
795 "".to_string(),
796 "root".to_string(),
797 TestKey::<Vec<i32>>::type_name()
798 ),
799 (
800 "ns/".to_string(),
801 "child".to_string(),
802 TestKey::<Vec<i32>>::type_name()
803 ),
804 (
805 "ns/index/".to_string(),
806 "nested".to_string(),
807 TestKey::<Vec<i32>>::type_name()
808 ),
809 (
810 "ns-other/".to_string(),
811 "other".to_string(),
812 TestKey::<Vec<i32>>::type_name()
813 ),
814 ])
815 );
816
817 let prefixed_keys = prefixed.keys().await.unwrap().collect::<Vec<_>>();
818 assert_eq!(
819 key_fields(&prefixed_keys),
820 BTreeSet::from([
821 (
822 "ns/".to_string(),
823 "child".to_string(),
824 TestKey::<Vec<i32>>::type_name()
825 ),
826 (
827 "ns/index/".to_string(),
828 "nested".to_string(),
829 TestKey::<Vec<i32>>::type_name()
830 ),
831 ])
832 );
833 }
834
835 #[tokio::test]
836 async fn test_cache_keys_reflect_invalidation_and_clear() {
837 let base = LanceCache::with_capacity(1000);
838 let prefixed = base.with_key_prefix("ns");
839 let other = base.with_key_prefix("other");
840
841 prefixed
842 .insert_with_key(&TestKey::new("child"), Arc::new(vec![1]))
843 .await;
844 other
845 .insert_with_key(&TestKey::new("other"), Arc::new(vec![2]))
846 .await;
847 assert_eq!(base.keys().await.unwrap().count(), 2);
848
849 prefixed.invalidate_prefix("").await;
850 let keys = base.keys().await.unwrap().collect::<Vec<_>>();
851 assert_eq!(
852 key_fields(&keys),
853 BTreeSet::from([(
854 "other/".to_string(),
855 "other".to_string(),
856 TestKey::<Vec<i32>>::type_name()
857 )])
858 );
859
860 base.clear().await;
861 assert_eq!(base.keys().await.unwrap().count(), 0);
862 }
863
864 #[tokio::test]
865 async fn test_cache_get_or_insert() {
866 let cache = LanceCache::with_capacity(1000);
867
868 let v: Arc<Vec<i32>> = cache
869 .get_or_insert_with_key(TestKey::<Vec<i32>>::new("k"), || async {
870 Ok(vec![1, 2, 3])
871 })
872 .await
873 .unwrap();
874 assert_eq!(*v, vec![1, 2, 3]);
875 assert_eq!(cache.stats().await.misses, 1);
876 assert_eq!(cache.stats().await.hits, 0);
877
878 let v: Arc<Vec<i32>> = cache
880 .get_or_insert_with_key(TestKey::<Vec<i32>>::new("k"), || async {
881 panic!("should not be called")
882 })
883 .await
884 .unwrap();
885 assert_eq!(*v, vec![1, 2, 3]);
886 assert_eq!(cache.stats().await.hits, 1);
887 }
888
889 #[tokio::test]
890 async fn test_custom_backend() {
891 use async_trait::async_trait;
892 use tokio::sync::Mutex;
893
894 #[derive(Debug)]
895 struct HashMapBackend {
896 map: Mutex<HashMap<InternalCacheKey, (CacheEntry, usize)>>,
897 }
898
899 impl HashMapBackend {
900 fn new() -> Self {
901 Self {
902 map: Mutex::new(HashMap::new()),
903 }
904 }
905 }
906
907 #[async_trait]
908 impl CacheBackend for HashMapBackend {
909 async fn get(
910 &self,
911 key: &InternalCacheKey,
912 _codec: Option<CacheCodec>,
913 ) -> Option<CacheEntry> {
914 self.map.lock().await.get(key).map(|(e, _)| e.clone())
915 }
916 async fn insert(
917 &self,
918 key: &InternalCacheKey,
919 entry: CacheEntry,
920 size_bytes: usize,
921 _codec: Option<CacheCodec>,
922 ) {
923 self.map
924 .lock()
925 .await
926 .insert(key.clone(), (entry, size_bytes));
927 }
928 async fn get_or_insert<'a>(
929 &self,
930 key: &InternalCacheKey,
931 loader: std::pin::Pin<
932 Box<dyn futures::Future<Output = Result<(CacheEntry, usize)>> + Send + 'a>,
933 >,
934 _codec: Option<CacheCodec>,
935 ) -> Result<(CacheEntry, bool)> {
936 if let Some((entry, _)) = self.map.lock().await.get(key) {
937 Ok((entry.clone(), true))
938 } else {
939 let (entry, size) = loader.await?;
940 self.map
941 .lock()
942 .await
943 .insert(key.clone(), (entry.clone(), size));
944 Ok((entry, false))
945 }
946 }
947 async fn invalidate_prefix(&self, prefix: &str) {
948 self.map.lock().await.retain(|k, _| !k.starts_with(prefix));
949 }
950 async fn clear(&self) {
951 self.map.lock().await.clear();
952 }
953 async fn num_entries(&self) -> usize {
954 self.map.lock().await.len()
955 }
956 async fn size_bytes(&self) -> usize {
957 self.map.lock().await.values().map(|(_, s)| *s).sum()
958 }
959 }
960
961 let cache = LanceCache::with_backend(Arc::new(HashMapBackend::new()));
962
963 cache
964 .insert_with_key(&TestKey::new("k"), Arc::new(vec![1, 2, 3]))
965 .await;
966 assert!(
967 cache
968 .get_with_key(&TestKey::<Vec<i32>>::new("k"))
969 .await
970 .is_some()
971 );
972 assert!(
974 cache
975 .get_with_key(&TestKey::<Vec<u8>>::new("k"))
976 .await
977 .is_none()
978 );
979 assert!(cache.keys().await.is_none());
980 }
981
982 #[tokio::test]
983 async fn test_get_or_insert_dedup() {
984 use std::sync::atomic::AtomicUsize;
985
986 let load_count = Arc::new(AtomicUsize::new(0));
987 let cache = LanceCache::with_capacity(10000);
988
989 let (barrier_tx, _) = tokio::sync::broadcast::channel::<()>(1);
990 let mut handles = Vec::new();
991 for _ in 0..5 {
992 let cache = cache.clone();
993 let load_count = load_count.clone();
994 let mut barrier_rx = barrier_tx.subscribe();
995 handles.push(tokio::spawn(async move {
996 barrier_rx.recv().await.ok();
997 cache
998 .get_or_insert_with_key(TestKey::<Vec<i32>>::new("key"), || {
999 let load_count = load_count.clone();
1000 async move {
1001 load_count.fetch_add(1, Ordering::SeqCst);
1002 tokio::task::yield_now().await;
1003 Ok(vec![1, 2, 3])
1004 }
1005 })
1006 .await
1007 }));
1008 }
1009 barrier_tx.send(()).unwrap();
1010 for h in handles {
1011 let result: Arc<Vec<i32>> = h.await.unwrap().unwrap();
1012 assert_eq!(*result, vec![1, 2, 3]);
1013 }
1014
1015 assert_eq!(load_count.load(Ordering::SeqCst), 1);
1016 }
1017}