1extern crate alloc;
100
101use crate::lru::LruSegment;
102use crate::metrics::CacheMetrics;
103use alloc::boxed::Box;
104use alloc::collections::BTreeMap;
105use alloc::string::String;
106use alloc::vec::Vec;
107use core::borrow::Borrow;
108use core::hash::{BuildHasher, Hash};
109use core::num::NonZeroUsize;
110use parking_lot::Mutex;
111
112#[cfg(feature = "hashbrown")]
113use hashbrown::DefaultHashBuilder;
114
115#[cfg(not(feature = "hashbrown"))]
116use std::collections::hash_map::RandomState as DefaultHashBuilder;
117
118pub struct ConcurrentLruCache<K, V, S = DefaultHashBuilder> {
150 segments: Box<[Mutex<LruSegment<K, V, S>>]>,
151 hash_builder: S,
152}
153
154impl<K, V> ConcurrentLruCache<K, V, DefaultHashBuilder>
155where
156 K: Hash + Eq + Clone + Send,
157 V: Clone + Send,
158{
159 pub fn init(
196 config: crate::config::ConcurrentLruCacheConfig,
197 hasher: Option<DefaultHashBuilder>,
198 ) -> Self {
199 let segment_count = config.segments;
200 let capacity = config.base.capacity;
201 let max_size = config.base.max_size;
202
203 let segment_capacity = capacity.get() / segment_count;
204 let segment_cap = NonZeroUsize::new(segment_capacity.max(1)).unwrap();
205 let segment_max_size = max_size / segment_count as u64;
206
207 let hash_builder = hasher.unwrap_or_default();
208 let segments: Vec<_> = (0..segment_count)
209 .map(|_| {
210 let segment_config = crate::config::LruCacheConfig {
211 capacity: segment_cap,
212 max_size: segment_max_size,
213 };
214 Mutex::new(crate::lru::LruSegment::init(
215 segment_config,
216 hash_builder.clone(),
217 ))
218 })
219 .collect();
220
221 Self {
222 segments: segments.into_boxed_slice(),
223 hash_builder,
224 }
225 }
226}
227
228impl<K, V, S> ConcurrentLruCache<K, V, S>
229where
230 K: Hash + Eq + Clone + Send,
231 V: Clone + Send,
232 S: BuildHasher + Clone + Send,
233{
234 #[inline]
238 fn segment_index<Q>(&self, key: &Q) -> usize
239 where
240 K: Borrow<Q>,
241 Q: ?Sized + Hash,
242 {
243 (self.hash_builder.hash_one(key) as usize) % self.segments.len()
244 }
245
246 pub fn capacity(&self) -> usize {
248 self.segments.iter().map(|s| s.lock().cap().get()).sum()
249 }
250
251 pub fn segment_count(&self) -> usize {
253 self.segments.len()
254 }
255
256 pub fn len(&self) -> usize {
261 self.segments.iter().map(|s| s.lock().len()).sum()
262 }
263
264 pub fn is_empty(&self) -> bool {
266 self.segments.iter().all(|s| s.lock().is_empty())
267 }
268
269 pub fn get<Q>(&self, key: &Q) -> Option<V>
282 where
283 K: Borrow<Q>,
284 Q: ?Sized + Hash + Eq,
285 {
286 let idx = self.segment_index(key);
287 let mut segment = self.segments[idx].lock();
288 segment.get(key).cloned()
289 }
290
291 pub fn get_with<Q, F, R>(&self, key: &Q, f: F) -> Option<R>
308 where
309 K: Borrow<Q>,
310 Q: ?Sized + Hash + Eq,
311 F: FnOnce(&V) -> R,
312 {
313 let idx = self.segment_index(key);
314 let mut segment = self.segments[idx].lock();
315 segment.get(key).map(f)
316 }
317
318 pub fn get_mut_with<Q, F, R>(&self, key: &Q, f: F) -> Option<R>
329 where
330 K: Borrow<Q>,
331 Q: ?Sized + Hash + Eq,
332 F: FnOnce(&mut V) -> R,
333 {
334 let idx = self.segment_index(key);
335 let mut segment = self.segments[idx].lock();
336 segment.get_mut(key).map(f)
337 }
338
339 pub fn put(&self, key: K, value: V, size: u64) -> Option<Vec<(K, V)>> {
356 let idx = self.segment_index(&key);
357 let mut segment = self.segments[idx].lock();
358 segment.put(key, value, size)
359 }
360
361 pub fn remove<Q>(&self, key: &Q) -> Option<V>
368 where
369 K: Borrow<Q>,
370 Q: ?Sized + Hash + Eq,
371 {
372 let idx = self.segment_index(key);
373 let mut segment = self.segments[idx].lock();
374 segment.remove(key)
375 }
376
377 pub fn clear(&self) {
381 for segment in self.segments.iter() {
382 segment.lock().clear();
383 }
384 }
385
386 pub fn current_size(&self) -> u64 {
390 self.segments.iter().map(|s| s.lock().current_size()).sum()
391 }
392
393 pub fn max_size(&self) -> u64 {
395 self.segments.iter().map(|s| s.lock().max_size()).sum()
396 }
397
398 pub fn record_miss(&self, object_size: u64) {
402 if let Some(segment) = self.segments.first() {
404 segment.lock().record_miss(object_size);
405 }
406 }
407
408 pub fn contains<Q>(&self, key: &Q) -> bool
420 where
421 K: Borrow<Q>,
422 Q: ?Sized + Hash + Eq,
423 {
424 let idx = self.segment_index(key);
425 let segment = self.segments[idx].lock();
426 segment.contains(key)
427 }
428
429 pub fn peek<Q>(&self, key: &Q) -> Option<V>
441 where
442 K: Borrow<Q>,
443 Q: ?Sized + Hash + Eq,
444 V: Clone,
445 {
446 let idx = self.segment_index(key);
447 let segment = self.segments[idx].lock();
448 segment.peek(key).cloned()
449 }
450}
451
452impl<K, V, S> CacheMetrics for ConcurrentLruCache<K, V, S>
453where
454 K: Hash + Eq + Clone + Send,
455 V: Clone + Send,
456 S: BuildHasher + Clone + Send,
457{
458 fn metrics(&self) -> BTreeMap<String, f64> {
459 let mut aggregated = BTreeMap::new();
461
462 for segment in self.segments.iter() {
463 let segment_metrics = segment.lock().metrics().metrics();
464 for (key, value) in segment_metrics {
465 *aggregated.entry(key).or_insert(0.0) += value;
466 }
467 }
468
469 aggregated
470 }
471
472 fn algorithm_name(&self) -> &'static str {
473 "ConcurrentLRU"
474 }
475}
476
477unsafe impl<K: Send, V: Send, S: Send> Send for ConcurrentLruCache<K, V, S> {}
480unsafe impl<K: Send, V: Send, S: Send + Sync> Sync for ConcurrentLruCache<K, V, S> {}
481
482impl<K, V, S> core::fmt::Debug for ConcurrentLruCache<K, V, S>
483where
484 K: Hash + Eq + Clone + Send,
485 V: Clone + Send,
486 S: BuildHasher + Clone + Send,
487{
488 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
489 f.debug_struct("ConcurrentLruCache")
490 .field("segment_count", &self.segments.len())
491 .field("total_len", &self.len())
492 .finish()
493 }
494}
495
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{ConcurrentCacheConfig, ConcurrentLruCacheConfig, LruCacheConfig};

    extern crate std;
    use std::string::ToString;
    use std::sync::Arc;
    use std::thread;
    use std::vec::Vec;

    /// Config with the given entry capacity and segment count; `max_size` is
    /// set to `u64::MAX`.
    fn make_config(capacity: usize, segments: usize) -> ConcurrentLruCacheConfig {
        let base = LruCacheConfig {
            capacity: NonZeroUsize::new(capacity).unwrap(),
            max_size: u64::MAX,
        };
        ConcurrentCacheConfig { base, segments }
    }

    /// Builds a `String`-keyed cache with the default hasher.
    fn string_cache<V>(capacity: usize, segments: usize) -> ConcurrentLruCache<String, V>
    where
        V: Clone + Send,
    {
        ConcurrentLruCache::init(make_config(capacity, segments), None)
    }

    /// Inserts every `(key, value)` pair in order, each with a size of 1.
    fn put_all(cache: &ConcurrentLruCache<String, i32>, entries: &[(&str, i32)]) {
        for &(key, value) in entries {
            cache.put(key.to_string(), value, 1);
        }
    }

    #[test]
    fn test_basic_operations() {
        let cache = string_cache::<i32>(100, 16);

        assert!(cache.is_empty());
        assert_eq!(cache.len(), 0);

        put_all(&cache, &[("a", 1), ("b", 2), ("c", 3)]);

        assert_eq!(cache.len(), 3);
        assert!(!cache.is_empty());

        let expectations = [("a", Some(1)), ("b", Some(2)), ("c", Some(3)), ("d", None)];
        for (key, expected) in expectations {
            assert_eq!(cache.get(&key.to_string()), expected);
        }
    }

    #[test]
    fn test_get_with() {
        let cache = string_cache::<String>(100, 16);

        cache.put("key".to_string(), "hello world".to_string(), 1);

        let present = cache.get_with(&"key".to_string(), String::len);
        assert_eq!(present, Some(11));

        let absent = cache.get_with(&"missing".to_string(), String::len);
        assert_eq!(absent, None);
    }

    #[test]
    fn test_get_mut_with() {
        let cache = string_cache::<i32>(100, 16);
        let counter = "counter".to_string();

        cache.put(counter.clone(), 0, 1);

        for _ in 0..2 {
            cache.get_mut_with(&counter, |count: &mut i32| *count += 1);
        }

        assert_eq!(cache.get(&counter), Some(2));
    }

    #[test]
    fn test_remove() {
        let cache = string_cache::<i32>(100, 16);
        let removed_key = "a".to_string();

        put_all(&cache, &[("a", 1), ("b", 2)]);

        assert_eq!(cache.remove(&removed_key), Some(1));
        assert_eq!(cache.get(&removed_key), None);
        assert_eq!(cache.len(), 1);

        assert_eq!(cache.remove(&"nonexistent".to_string()), None);
    }

    #[test]
    fn test_clear() {
        let cache = string_cache::<i32>(100, 16);

        put_all(&cache, &[("a", 1), ("b", 2), ("c", 3)]);
        assert_eq!(cache.len(), 3);

        cache.clear();

        assert_eq!(cache.len(), 0);
        assert!(cache.is_empty());
    }

    #[test]
    fn test_contains_key() {
        let cache = string_cache::<i32>(100, 16);

        put_all(&cache, &[("exists", 1)]);

        assert!(cache.contains(&"exists".to_string()));
        assert!(!cache.contains(&"missing".to_string()));
    }

    #[test]
    fn test_concurrent_access() {
        let cache = Arc::new(string_cache::<i32>(1000, 16));
        let num_threads = 8;
        let ops_per_thread = 1000;

        // Every worker writes and immediately reads back its own key range.
        let workers: Vec<thread::JoinHandle<()>> = (0..num_threads)
            .map(|t| {
                let shared = Arc::clone(&cache);
                thread::spawn(move || {
                    for i in 0..ops_per_thread {
                        let key = std::format!("thread_{}_key_{}", t, i);
                        shared.put(key.clone(), t * 1000 + i, 1);
                        let _ = shared.get(&key);
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }

        assert!(!cache.is_empty());
    }

    #[test]
    fn test_concurrent_mixed_operations() {
        let cache = Arc::new(string_cache::<i32>(100, 16));
        let num_threads = 8;
        let ops_per_thread = 500;

        // All workers hammer the same 200 keys with a rotating operation mix.
        let workers: Vec<thread::JoinHandle<()>> = (0..num_threads)
            .map(|t| {
                let shared = Arc::clone(&cache);
                thread::spawn(move || {
                    for i in 0..ops_per_thread {
                        let key = std::format!("key_{}", i % 200);

                        match i % 4 {
                            0 => {
                                shared.put(key, i, 1);
                            }
                            1 => {
                                let _ = shared.get(&key);
                            }
                            2 => {
                                shared.get_mut_with(&key, |v: &mut i32| *v += 1);
                            }
                            3 => {
                                let _ = shared.remove(&key);
                            }
                            _ => unreachable!(),
                        }

                        // One worker wipes the cache midway through its run.
                        if t == 0 && i == 250 {
                            shared.clear();
                        }
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }

        assert!(cache.len() <= 100);
    }

    #[test]
    fn test_segment_count() {
        let cache = string_cache::<i32>(100, 8);

        assert_eq!(cache.segment_count(), 8);
    }

    #[test]
    fn test_capacity() {
        let cache = string_cache::<i32>(100, 16);

        let capacity = cache.capacity();

        // At least one entry per segment, at most the requested total.
        assert!((16..=100).contains(&capacity));
    }

    #[test]
    fn test_eviction_on_capacity() {
        let cache = string_cache::<i32>(48, 16);

        put_all(&cache, &[("a", 1), ("b", 2), ("c", 3)]);
        assert_eq!(cache.len(), 3);

        put_all(&cache, &[("d", 4)]);

        assert!(cache.len() <= 48);
        assert!(cache.contains(&"d".to_string()));
    }

    #[test]
    fn test_update_existing_key() {
        let cache = string_cache::<i32>(100, 16);
        let key = "key".to_string();

        cache.put(key.clone(), 1, 1);
        assert_eq!(cache.get(&key), Some(1));

        cache.put(key.clone(), 2, 1);
        assert_eq!(cache.get(&key), Some(2));
        assert_eq!(cache.len(), 1);
    }

    #[test]
    fn test_lru_ordering() {
        let cache = string_cache::<i32>(48, 16);

        put_all(&cache, &[("a", 1), ("b", 2), ("c", 3)]);

        // Touch "a" before inserting another key.
        let _ = cache.get(&"a".to_string());

        put_all(&cache, &[("d", 4)]);

        assert!(cache.contains(&"a".to_string()));
        assert!(cache.contains(&"d".to_string()));
    }

    #[test]
    fn test_metrics() {
        let cache = string_cache::<i32>(100, 16);

        put_all(&cache, &[("a", 1), ("b", 2)]);

        let snapshot = cache.metrics();
        assert!(!snapshot.is_empty());
    }

    #[test]
    fn test_record_miss() {
        let cache = string_cache::<i32>(100, 16);

        for object_size in [100, 200] {
            cache.record_miss(object_size);
        }

        let snapshot = cache.metrics();
        assert!(!snapshot.is_empty());
    }

    #[test]
    fn test_empty_cache_operations() {
        let cache = string_cache::<i32>(100, 16);
        let missing = "missing".to_string();

        assert!(cache.is_empty());
        assert_eq!(cache.len(), 0);
        assert_eq!(cache.get(&missing), None);
        assert_eq!(cache.remove(&missing), None);
        assert!(!cache.contains(&missing));

        let looked_up = cache.get_with(&missing, |v: &i32| *v);
        assert_eq!(looked_up, None);
    }

    #[test]
    fn test_single_item_cache() {
        // One entry of capacity per segment.
        let cache = string_cache::<i32>(16, 16);

        put_all(&cache, &[("a", 1)]);
        assert!(!cache.is_empty());

        put_all(&cache, &[("b", 2)]);
        assert!(cache.len() <= 16);
    }

    #[test]
    fn test_borrowed_key_lookup() {
        let cache = string_cache::<i32>(100, 16);

        put_all(&cache, &[("test_key", 42)]);

        // Look up with `&str` although the cache is keyed by `String`.
        let borrowed: &str = "test_key";
        assert_eq!(cache.get(borrowed), Some(42));
        assert!(cache.contains(borrowed));
        assert_eq!(cache.remove(borrowed), Some(42));
    }

    #[test]
    fn test_algorithm_name() {
        let cache = string_cache::<i32>(100, 16);

        assert_eq!(cache.algorithm_name(), "ConcurrentLRU");
    }

    #[test]
    fn test_contains_non_promoting() {
        let cache = string_cache::<i32>(100, 16);

        put_all(&cache, &[("a", 1), ("b", 2)]);

        assert!(cache.contains(&"a".to_string()));
        assert!(cache.contains(&"b".to_string()));
        assert!(!cache.contains(&"c".to_string()));
    }
}