Skip to main content

cache_rs/concurrent/
lfu.rs

1//! Concurrent LFU Cache Implementation
2//!
3//! A thread-safe LFU cache using lock striping (segmented storage) for high-performance
4//! concurrent access. This is the multi-threaded counterpart to [`LfuCache`](crate::LfuCache).
5//!
6//! # How It Works
7//!
8//! The cache partitions keys across multiple independent segments, each with its own lock.
9//! This allows concurrent operations on different segments without contention.
10//!
11//! ```text
12//! ┌──────────────────────────────────────────────────────────────────────┐
13//! │                      ConcurrentLfuCache                              │
14//! │                                                                      │
15//! │  hash(key) % N  ──▶  Segment Selection                               │
16//! │                                                                      │
17//! │  ┌──────────────┐ ┌──────────────┐     ┌──────────────┐              │
18//! │  │  Segment 0   │ │  Segment 1   │ ... │  Segment N-1 │              │
19//! │  │  ┌────────┐  │ │  ┌────────┐  │     │  ┌────────┐  │              │
20//! │  │  │ Mutex  │  │ │  │ Mutex  │  │     │  │ Mutex  │  │              │
21//! │  │  └────┬───┘  │ │  └────┬───┘  │     │  └────┬───┘  │              │
22//! │  │       │      │ │       │      │     │       │      │              │
23//! │  │  ┌────▼───┐  │ │  ┌────▼───┐  │     │  ┌────▼───┐  │              │
24//! │  │  │LfuCache│  │ │  │LfuCache│  │     │  │LfuCache│  │              │
25//! │  │  └────────┘  │ │  └────────┘  │     │  └────────┘  │              │
26//! │  └──────────────┘ └──────────────┘     └──────────────┘              │
27//! └──────────────────────────────────────────────────────────────────────┘
28//! ```
29//!
30//! ## Segment Count
31//!
32//! The default segment count is based on available CPU cores (typically 16).
33//! More segments = less contention but more memory overhead.
34//!
35//! ## Trade-offs
36//!
37//! - **Pros**: Near-linear scaling with thread count, excellent scan resistance
38//! - **Cons**: LFU frequency tracking is per-segment, not global. An item accessed
39//!   in segment A doesn't affect frequency tracking in segment B.
40//!
41//! # Performance Characteristics
42//!
43//! | Metric | Value |
44//! |--------|-------|
45//! | Get/Put/Remove | O(log F) per segment, effectively O(1) |
46//! | Concurrency | Near-linear scaling up to segment count |
47//! | Memory overhead | ~150 bytes per entry + one Mutex per segment |
48//! | Scan resistance | Excellent (frequency-based eviction) |
49//!
50//! Where F = distinct frequency values per segment. Since frequencies are small
51//! integers, F is bounded and operations are effectively O(1).
52//!
53//! # When to Use
54//!
55//! **Use ConcurrentLfuCache when:**
56//! - Multiple threads need cache access
57//! - Access patterns have stable popularity (some keys consistently more popular)
58//! - You need excellent scan resistance
59//! - Frequency is more important than recency
60//!
61//! **Consider alternatives when:**
62//! - Single-threaded access only → use `LfuCache`
63//! - Need global frequency tracking → use `Mutex<LfuCache>`
64//! - Popularity changes over time → use `ConcurrentLfudaCache`
65//! - Recency-based access → use `ConcurrentLruCache`
66//!
67//! # Thread Safety
68//!
69//! `ConcurrentLfuCache` is `Send + Sync` and can be shared via `Arc`.
70//!
71//! # Example
72//!
73//! ```rust,ignore
74//! use cache_rs::concurrent::ConcurrentLfuCache;
75//! use cache_rs::config::ConcurrentLfuCacheConfig;
76//! use std::num::NonZeroUsize;
77//! use std::sync::Arc;
78//! use std::thread;
79//!
80//! let config = ConcurrentLfuCacheConfig::new(NonZeroUsize::new(10_000).unwrap());
81//! let cache = Arc::new(ConcurrentLfuCache::from_config(config));
82//!
83//! let handles: Vec<_> = (0..4).map(|i| {
84//!     let cache = Arc::clone(&cache);
85//!     thread::spawn(move || {
86//!         for j in 0..1000 {
87//!             let key = format!("key-{}-{}", i, j);
88//!             cache.put(key.clone(), j, 1);
89//!             // Access popular keys more frequently
90//!             if j % 10 == 0 {
91//!                 for _ in 0..5 {
92//!                     let _ = cache.get(&key);
93//!                 }
94//!             }
95//!         }
96//!     })
97//! }).collect();
98//!
99//! for h in handles {
100//!     h.join().unwrap();
101//! }
102//!
103//! println!("Total entries: {}", cache.len());
104//! ```
105
106extern crate alloc;
107
108use crate::lfu::LfuSegment;
109use crate::metrics::CacheMetrics;
110use alloc::boxed::Box;
111use alloc::collections::BTreeMap;
112use alloc::string::String;
113use alloc::vec::Vec;
114use core::borrow::Borrow;
115use core::hash::{BuildHasher, Hash};
116use core::num::NonZeroUsize;
117use parking_lot::Mutex;
118
119#[cfg(feature = "hashbrown")]
120use hashbrown::DefaultHashBuilder;
121
122#[cfg(not(feature = "hashbrown"))]
123use std::collections::hash_map::RandomState as DefaultHashBuilder;
124
/// A thread-safe LFU cache with segmented storage for high concurrency.
///
/// Keys are partitioned across segments by hash; each segment is an
/// independent [`LfuSegment`] protected by its own `parking_lot::Mutex`, so
/// operations on keys in different segments never contend on a lock.
pub struct ConcurrentLfuCache<K, V, S = DefaultHashBuilder> {
    // Fixed-size set of independently locked segments; the count is chosen
    // at construction time and never changes afterwards.
    segments: Box<[Mutex<LfuSegment<K, V, S>>]>,
    // Hash builder used to route a key to its segment. Each segment is
    // constructed with a clone of this builder, so hashing is consistent
    // cache-wide.
    hash_builder: S,
}
130
131impl<K, V> ConcurrentLfuCache<K, V, DefaultHashBuilder>
132where
133    K: Hash + Eq + Clone + Send,
134    V: Clone + Send,
135{
136    /// Creates a new concurrent LFU cache from a configuration.
137    ///
138    /// This is the **recommended** way to create a concurrent LFU cache.
139    ///
140    /// # Arguments
141    /// * `config` - The cache configuration
142    /// * `hasher` - Optional custom hash builder. If `None`, uses the default.
143    pub fn init(
144        config: crate::config::ConcurrentLfuCacheConfig,
145        hasher: Option<DefaultHashBuilder>,
146    ) -> Self {
147        let segment_count = config.segments;
148        let capacity = config.base.capacity;
149        let max_size = config.base.max_size;
150
151        let segment_capacity = capacity.get() / segment_count;
152        let segment_cap = NonZeroUsize::new(segment_capacity.max(1)).unwrap();
153        let segment_max_size = max_size / segment_count as u64;
154
155        let hash_builder = hasher.unwrap_or_default();
156
157        let segments: Vec<_> = (0..segment_count)
158            .map(|_| {
159                let segment_config = crate::config::LfuCacheConfig {
160                    capacity: segment_cap,
161                    max_size: segment_max_size,
162                };
163                Mutex::new(LfuSegment::init(segment_config, hash_builder.clone()))
164            })
165            .collect();
166
167        Self {
168            segments: segments.into_boxed_slice(),
169            hash_builder,
170        }
171    }
172}
173
174impl<K, V, S> ConcurrentLfuCache<K, V, S>
175where
176    K: Hash + Eq + Clone + Send,
177    V: Clone + Send,
178    S: BuildHasher + Clone + Send,
179{
180    #[inline]
181    fn segment_index<Q>(&self, key: &Q) -> usize
182    where
183        K: Borrow<Q>,
184        Q: ?Sized + Hash,
185    {
186        (self.hash_builder.hash_one(key) as usize) % self.segments.len()
187    }
188
189    /// Returns the total capacity across all segments.
190    pub fn capacity(&self) -> usize {
191        self.segments.iter().map(|s| s.lock().cap().get()).sum()
192    }
193
194    /// Returns the number of segments in the cache.
195    pub fn segment_count(&self) -> usize {
196        self.segments.len()
197    }
198
199    /// Returns the total number of entries across all segments.
200    pub fn len(&self) -> usize {
201        self.segments.iter().map(|s| s.lock().len()).sum()
202    }
203
204    /// Returns `true` if the cache contains no entries.
205    pub fn is_empty(&self) -> bool {
206        self.segments.iter().all(|s| s.lock().is_empty())
207    }
208
209    /// Gets a value from the cache.
210    ///
211    /// This clones the value to avoid holding the lock. For zero-copy access,
212    /// use `get_with()` instead.
213    pub fn get<Q>(&self, key: &Q) -> Option<V>
214    where
215        K: Borrow<Q>,
216        Q: ?Sized + Hash + Eq,
217    {
218        let idx = self.segment_index(key);
219        let mut segment = self.segments[idx].lock();
220        segment.get(key).cloned()
221    }
222
223    /// Gets a value and applies a function to it while holding the lock.
224    ///
225    /// This is more efficient than `get()` when you only need to read from the value,
226    /// as it avoids cloning.
227    pub fn get_with<Q, F, R>(&self, key: &Q, f: F) -> Option<R>
228    where
229        K: Borrow<Q>,
230        Q: ?Sized + Hash + Eq,
231        F: FnOnce(&V) -> R,
232    {
233        let idx = self.segment_index(key);
234        let mut segment = self.segments[idx].lock();
235        segment.get(key).map(f)
236    }
237
238    /// Inserts a key-value pair into the cache with optional size tracking.
239    ///
240    /// If the cache is at capacity, the least frequently used entry is evicted.
241    /// Use `SIZE_UNIT` (1) for count-based caching.
242    pub fn put(&self, key: K, value: V, size: u64) -> Option<Vec<(K, V)>> {
243        let idx = self.segment_index(&key);
244        let mut segment = self.segments[idx].lock();
245        segment.put(key, value, size)
246    }
247
248    /// Removes a key from the cache, returning the value if it existed.
249    pub fn remove<Q>(&self, key: &Q) -> Option<V>
250    where
251        K: Borrow<Q>,
252        Q: ?Sized + Hash + Eq,
253    {
254        let idx = self.segment_index(key);
255        let mut segment = self.segments[idx].lock();
256        segment.remove(key)
257    }
258
259    /// Clears all entries from the cache.
260    pub fn clear(&self) {
261        for segment in self.segments.iter() {
262            segment.lock().clear();
263        }
264    }
265
266    /// Returns the current total size of cached content across all segments.
267    pub fn current_size(&self) -> u64 {
268        self.segments.iter().map(|s| s.lock().current_size()).sum()
269    }
270
271    /// Returns the maximum content size the cache can hold across all segments.
272    pub fn max_size(&self) -> u64 {
273        self.segments.iter().map(|s| s.lock().max_size()).sum()
274    }
275
276    /// Checks if the cache contains a key without updating frequency.
277    ///
278    /// This is a pure existence check that does **not** update the entry's frequency.
279    ///
280    /// # Example
281    ///
282    /// ```rust,ignore
283    /// if cache.contains(&"key".to_string()) {
284    ///     println!("Key exists!");
285    /// }
286    /// ```
287    pub fn contains<Q>(&self, key: &Q) -> bool
288    where
289        K: Borrow<Q>,
290        Q: ?Sized + Hash + Eq,
291    {
292        let idx = self.segment_index(key);
293        let segment = self.segments[idx].lock();
294        segment.contains(key)
295    }
296
297    /// Returns a clone of the value without updating frequency or access metadata.
298    ///
299    /// Unlike [`get()`](Self::get), this does NOT increment the entry's frequency
300    /// or change its position. Returns a cloned value because the internal lock
301    /// cannot be held across the return boundary.
302    ///
303    /// # Example
304    ///
305    /// ```rust,ignore
306    /// let value = cache.peek(&"key".to_string());
307    /// ```
308    pub fn peek<Q>(&self, key: &Q) -> Option<V>
309    where
310        K: Borrow<Q>,
311        Q: ?Sized + Hash + Eq,
312        V: Clone,
313    {
314        let idx = self.segment_index(key);
315        let segment = self.segments[idx].lock();
316        segment.peek(key).cloned()
317    }
318}
319
320impl<K, V, S> CacheMetrics for ConcurrentLfuCache<K, V, S>
321where
322    K: Hash + Eq + Clone + Send,
323    V: Clone + Send,
324    S: BuildHasher + Clone + Send,
325{
326    fn metrics(&self) -> BTreeMap<String, f64> {
327        let mut aggregated = BTreeMap::new();
328        for segment in self.segments.iter() {
329            let segment_metrics = segment.lock().metrics().metrics();
330            for (key, value) in segment_metrics {
331                *aggregated.entry(key).or_insert(0.0) += value;
332            }
333        }
334        aggregated
335    }
336
337    fn algorithm_name(&self) -> &'static str {
338        "ConcurrentLFU"
339    }
340}
341
// SAFETY: all shared mutable state lives behind the per-segment
// `parking_lot::Mutex`es, so moving the cache across threads only requires
// the contained key/value/hasher data to be `Send`.
// NOTE(review): these manual impls bypass the compiler's auto-trait
// derivation. If `LfuSegment<K, V, S>` is `Send` whenever `K`, `V`, `S` are
// (i.e. it holds no raw pointers or other auto-trait-suppressing fields),
// the auto impls would apply and these lines can be deleted — confirm
// against `LfuSegment`'s definition before relying on them.
unsafe impl<K: Send, V: Send, S: Send> Send for ConcurrentLfuCache<K, V, S> {}
// SAFETY: `&self` access to segment contents always goes through `Mutex`
// locking; the only field touched without a lock is `hash_builder`, which is
// read-only here, and `S: Sync` covers hashing from multiple threads.
unsafe impl<K: Send, V: Send, S: Send + Sync> Sync for ConcurrentLfuCache<K, V, S> {}
344
345impl<K, V, S> core::fmt::Debug for ConcurrentLfuCache<K, V, S>
346where
347    K: Hash + Eq + Clone + Send,
348    V: Clone + Send,
349    S: BuildHasher + Clone + Send,
350{
351    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
352        f.debug_struct("ConcurrentLfuCache")
353            .field("segment_count", &self.segments.len())
354            .field("total_len", &self.len())
355            .finish()
356    }
357}
358
#[cfg(test)]
mod tests {
    //! Unit tests covering basic operations, concurrency, capacity
    //! distribution, metrics aggregation, and borrowed-key lookups.

    use super::*;
    use crate::config::{ConcurrentCacheConfig, ConcurrentLfuCacheConfig, LfuCacheConfig};

    extern crate std;
    use std::string::ToString;
    use std::sync::Arc;
    use std::thread;
    use std::vec::Vec;

    // Builds a size-unlimited (max_size = u64::MAX) config with the given
    // entry capacity and segment count.
    fn make_config(capacity: usize, segments: usize) -> ConcurrentLfuCacheConfig {
        ConcurrentCacheConfig {
            base: LfuCacheConfig {
                capacity: NonZeroUsize::new(capacity).unwrap(),
                max_size: u64::MAX,
            },
            segments,
        }
    }

    #[test]
    fn test_basic_operations() {
        let cache: ConcurrentLfuCache<String, i32> =
            ConcurrentLfuCache::init(make_config(100, 16), None);

        cache.put("a".to_string(), 1, 1);
        cache.put("b".to_string(), 2, 1);

        assert_eq!(cache.get(&"a".to_string()), Some(1));
        assert_eq!(cache.get(&"b".to_string()), Some(2));
    }

    #[test]
    fn test_concurrent_access() {
        let cache: Arc<ConcurrentLfuCache<String, i32>> =
            Arc::new(ConcurrentLfuCache::init(make_config(1000, 16), None));
        let num_threads = 8;
        let ops_per_thread = 500;

        let mut handles: Vec<std::thread::JoinHandle<()>> = Vec::new();

        for t in 0..num_threads {
            let cache = Arc::clone(&cache);
            handles.push(thread::spawn(move || {
                for i in 0..ops_per_thread {
                    // Thread-unique keys avoid cross-thread overwrites.
                    let key = std::format!("key_{}_{}", t, i);
                    cache.put(key.clone(), i, 1);
                    // Access multiple times to test frequency tracking
                    if i % 3 == 0 {
                        let _ = cache.get(&key);
                        let _ = cache.get(&key);
                    }
                }
            }));
        }

        for handle in handles {
            handle.join().unwrap();
        }

        assert!(!cache.is_empty());
    }

    #[test]
    fn test_capacity() {
        let cache: ConcurrentLfuCache<String, i32> =
            ConcurrentLfuCache::init(make_config(100, 16), None);

        // Capacity is distributed across segments
        let capacity = cache.capacity();
        assert!(capacity >= 16);
        assert!(capacity <= 100);
    }

    #[test]
    fn test_segment_count() {
        let cache: ConcurrentLfuCache<String, i32> =
            ConcurrentLfuCache::init(make_config(100, 8), None);

        assert_eq!(cache.segment_count(), 8);
    }

    #[test]
    fn test_len_and_is_empty() {
        let cache: ConcurrentLfuCache<String, i32> =
            ConcurrentLfuCache::init(make_config(100, 16), None);

        assert!(cache.is_empty());
        assert_eq!(cache.len(), 0);

        cache.put("key1".to_string(), 1, 1);
        assert_eq!(cache.len(), 1);
        assert!(!cache.is_empty());

        cache.put("key2".to_string(), 2, 1);
        assert_eq!(cache.len(), 2);
    }

    #[test]
    fn test_remove() {
        let cache: ConcurrentLfuCache<String, i32> =
            ConcurrentLfuCache::init(make_config(100, 16), None);

        cache.put("key1".to_string(), 1, 1);
        cache.put("key2".to_string(), 2, 1);

        assert_eq!(cache.remove(&"key1".to_string()), Some(1));
        assert_eq!(cache.len(), 1);
        assert_eq!(cache.get(&"key1".to_string()), None);

        // Removing an absent key is a no-op that returns None.
        assert_eq!(cache.remove(&"nonexistent".to_string()), None);
    }

    #[test]
    fn test_clear() {
        let cache: ConcurrentLfuCache<String, i32> =
            ConcurrentLfuCache::init(make_config(100, 16), None);

        cache.put("key1".to_string(), 1, 1);
        cache.put("key2".to_string(), 2, 1);
        cache.put("key3".to_string(), 3, 1);

        assert_eq!(cache.len(), 3);

        cache.clear();

        assert_eq!(cache.len(), 0);
        assert!(cache.is_empty());
        assert_eq!(cache.get(&"key1".to_string()), None);
    }

    #[test]
    fn test_contains_key() {
        let cache: ConcurrentLfuCache<String, i32> =
            ConcurrentLfuCache::init(make_config(100, 16), None);

        cache.put("exists".to_string(), 1, 1);

        assert!(cache.contains(&"exists".to_string()));
        assert!(!cache.contains(&"missing".to_string()));
    }

    #[test]
    fn test_get_with() {
        let cache: ConcurrentLfuCache<String, String> =
            ConcurrentLfuCache::init(make_config(100, 16), None);

        cache.put("key".to_string(), "hello world".to_string(), 1);

        // Closure runs under the segment lock; no value clone is made.
        let len = cache.get_with(&"key".to_string(), |v: &String| v.len());
        assert_eq!(len, Some(11));

        let missing = cache.get_with(&"missing".to_string(), |v: &String| v.len());
        assert_eq!(missing, None);
    }

    #[test]
    fn test_frequency_eviction() {
        let cache: ConcurrentLfuCache<String, i32> =
            ConcurrentLfuCache::init(make_config(48, 16), None);

        cache.put("a".to_string(), 1, 1);
        cache.put("b".to_string(), 2, 1);
        cache.put("c".to_string(), 3, 1);

        // Access "a" and "c" multiple times to increase frequency
        for _ in 0..5 {
            let _ = cache.get(&"a".to_string());
            let _ = cache.get(&"c".to_string());
        }

        // Add a new item
        cache.put("d".to_string(), 4, 1);

        assert!(cache.len() <= 48);
    }

    #[test]
    fn test_eviction_on_capacity() {
        let cache: ConcurrentLfuCache<String, i32> =
            ConcurrentLfuCache::init(make_config(80, 16), None);

        // Fill the cache
        for i in 0..10 {
            cache.put(std::format!("key{}", i), i, 1);
        }

        // Cache should not exceed capacity
        assert!(cache.len() <= 80);
    }

    #[test]
    fn test_metrics() {
        let cache: ConcurrentLfuCache<String, i32> =
            ConcurrentLfuCache::init(make_config(100, 16), None);

        cache.put("a".to_string(), 1, 1);
        cache.put("b".to_string(), 2, 1);

        let metrics = cache.metrics();
        // Metrics aggregation across segments
        assert!(!metrics.is_empty());
    }

    #[test]
    fn test_algorithm_name() {
        let cache: ConcurrentLfuCache<String, i32> =
            ConcurrentLfuCache::init(make_config(100, 16), None);

        assert_eq!(cache.algorithm_name(), "ConcurrentLFU");
    }

    #[test]
    fn test_empty_cache_operations() {
        let cache: ConcurrentLfuCache<String, i32> =
            ConcurrentLfuCache::init(make_config(100, 16), None);

        assert!(cache.is_empty());
        assert_eq!(cache.len(), 0);
        assert_eq!(cache.get(&"missing".to_string()), None);
        assert_eq!(cache.remove(&"missing".to_string()), None);
        assert!(!cache.contains(&"missing".to_string()));
    }

    #[test]
    fn test_borrowed_key_lookup() {
        let cache: ConcurrentLfuCache<String, i32> =
            ConcurrentLfuCache::init(make_config(100, 16), None);

        cache.put("test_key".to_string(), 42, 1);

        // Test with borrowed key
        let key_str = "test_key";
        assert_eq!(cache.get(key_str), Some(42));
        assert!(cache.contains(key_str));
        assert_eq!(cache.remove(key_str), Some(42));
    }

    #[test]
    fn test_frequency_tracking() {
        let cache: ConcurrentLfuCache<String, i32> =
            ConcurrentLfuCache::init(make_config(100, 16), None);

        cache.put("key".to_string(), 1, 1);

        // Access the key multiple times
        for _ in 0..10 {
            let _ = cache.get(&"key".to_string());
        }

        // Item should still be accessible
        assert_eq!(cache.get(&"key".to_string()), Some(1));
    }

    #[test]
    fn test_contains_non_promoting() {
        let cache: ConcurrentLfuCache<String, i32> =
            ConcurrentLfuCache::init(make_config(100, 16), None);

        cache.put("a".to_string(), 1, 1);
        cache.put("b".to_string(), 2, 1);

        // contains() should check without updating frequency
        assert!(cache.contains(&"a".to_string()));
        assert!(cache.contains(&"b".to_string()));
        assert!(!cache.contains(&"c".to_string()));
    }
}
627}