// cache_rs/concurrent/lfu.rs
1//! Concurrent LFU Cache Implementation
2//!
3//! A thread-safe LFU cache using lock striping (segmented storage) for high-performance
4//! concurrent access. This is the multi-threaded counterpart to [`LfuCache`](crate::LfuCache).
5//!
6//! # How It Works
7//!
8//! The cache partitions keys across multiple independent segments, each with its own lock.
9//! This allows concurrent operations on different segments without contention.
10//!
11//! ```text
12//! ┌──────────────────────────────────────────────────────────────────────┐
13//! │                      ConcurrentLfuCache                              │
14//! │                                                                      │
15//! │  hash(key) % N  ──▶  Segment Selection                               │
16//! │                                                                      │
17//! │  ┌──────────────┐ ┌──────────────┐     ┌──────────────┐              │
18//! │  │  Segment 0   │ │  Segment 1   │ ... │  Segment N-1 │              │
19//! │  │  ┌────────┐  │ │  ┌────────┐  │     │  ┌────────┐  │              │
20//! │  │  │ Mutex  │  │ │  │ Mutex  │  │     │  │ Mutex  │  │              │
21//! │  │  └────┬───┘  │ │  └────┬───┘  │     │  └────┬───┘  │              │
22//! │  │       │      │ │       │      │     │       │      │              │
23//! │  │  ┌────▼───┐  │ │  ┌────▼───┐  │     │  ┌────▼───┐  │              │
24//! │  │  │LfuCache│  │ │  │LfuCache│  │     │  │LfuCache│  │              │
25//! │  │  └────────┘  │ │  └────────┘  │     │  └────────┘  │              │
26//! │  └──────────────┘ └──────────────┘     └──────────────┘              │
27//! └──────────────────────────────────────────────────────────────────────┘
28//! ```
29//!
30//! ## Segment Count
31//!
32//! The default segment count is based on available CPU cores (typically 16).
33//! More segments = less contention but more memory overhead.
34//!
35//! ## Trade-offs
36//!
37//! - **Pros**: Near-linear scaling with thread count, excellent scan resistance
38//! - **Cons**: LFU frequency tracking is per-segment, not global. An item accessed
39//!   in segment A doesn't affect frequency tracking in segment B.
40//!
41//! # Performance Characteristics
42//!
43//! | Metric | Value |
44//! |--------|-------|
45//! | Get/Put/Remove | O(log F) per segment, effectively O(1) |
46//! | Concurrency | Near-linear scaling up to segment count |
47//! | Memory overhead | ~150 bytes per entry + one Mutex per segment |
48//! | Scan resistance | Excellent (frequency-based eviction) |
49//!
50//! Where F = distinct frequency values per segment. Since frequencies are small
51//! integers, F is bounded and operations are effectively O(1).
52//!
53//! # When to Use
54//!
55//! **Use ConcurrentLfuCache when:**
56//! - Multiple threads need cache access
57//! - Access patterns have stable popularity (some keys consistently more popular)
58//! - You need excellent scan resistance
59//! - Frequency is more important than recency
60//!
61//! **Consider alternatives when:**
62//! - Single-threaded access only → use `LfuCache`
63//! - Need global frequency tracking → use `Mutex<LfuCache>`
64//! - Popularity changes over time → use `ConcurrentLfudaCache`
65//! - Recency-based access → use `ConcurrentLruCache`
66//!
67//! # Thread Safety
68//!
69//! `ConcurrentLfuCache` is `Send + Sync` and can be shared via `Arc`.
70//!
71//! # Example
72//!
73//! ```rust,ignore
74//! use cache_rs::concurrent::ConcurrentLfuCache;
75//! use cache_rs::config::ConcurrentLfuCacheConfig;
76//! use std::num::NonZeroUsize;
77//! use std::sync::Arc;
78//! use std::thread;
79//!
80//! let config = ConcurrentLfuCacheConfig::new(NonZeroUsize::new(10_000).unwrap());
81//! let cache = Arc::new(ConcurrentLfuCache::from_config(config));
82//!
83//! let handles: Vec<_> = (0..4).map(|i| {
84//!     let cache = Arc::clone(&cache);
85//!     thread::spawn(move || {
86//!         for j in 0..1000 {
87//!             let key = format!("key-{}-{}", i, j);
88//!             cache.put(key.clone(), j);
89//!             // Access popular keys more frequently
90//!             if j % 10 == 0 {
91//!                 for _ in 0..5 {
92//!                     let _ = cache.get(&key);
93//!                 }
94//!             }
95//!         }
96//!     })
97//! }).collect();
98//!
99//! for h in handles {
100//!     h.join().unwrap();
101//! }
102//!
103//! println!("Total entries: {}", cache.len());
104//! ```
105
106extern crate alloc;
107
108use crate::lfu::LfuSegment;
109use crate::metrics::CacheMetrics;
110use alloc::boxed::Box;
111use alloc::collections::BTreeMap;
112use alloc::string::String;
113use alloc::vec::Vec;
114use core::borrow::Borrow;
115use core::hash::{BuildHasher, Hash};
116use core::num::NonZeroUsize;
117use parking_lot::Mutex;
118
119#[cfg(feature = "hashbrown")]
120use hashbrown::DefaultHashBuilder;
121
122#[cfg(not(feature = "hashbrown"))]
123use std::collections::hash_map::RandomState as DefaultHashBuilder;
124
/// A thread-safe LFU cache with segmented storage for high concurrency.
///
/// Keys are partitioned across segments by `hash(key) % segment_count`; each
/// segment is an independently locked [`LfuSegment`], so operations on keys
/// that map to different segments proceed without lock contention.
pub struct ConcurrentLfuCache<K, V, S = DefaultHashBuilder> {
    // One independently locked LFU segment per stripe. Boxed slice because
    // the segment count is fixed at construction time.
    segments: Box<[Mutex<LfuSegment<K, V, S>>]>,
    // Hash builder used to pick the segment for a key; clones of it are also
    // handed to each segment at construction (see `init`).
    hash_builder: S,
}
130
131impl<K, V> ConcurrentLfuCache<K, V, DefaultHashBuilder>
132where
133    K: Hash + Eq + Clone + Send,
134    V: Clone + Send,
135{
136    /// Creates a new concurrent LFU cache from a configuration.
137    ///
138    /// This is the **recommended** way to create a concurrent LFU cache.
139    ///
140    /// # Arguments
141    /// * `config` - The cache configuration
142    /// * `hasher` - Optional custom hash builder. If `None`, uses the default.
143    pub fn init(
144        config: crate::config::ConcurrentLfuCacheConfig,
145        hasher: Option<DefaultHashBuilder>,
146    ) -> Self {
147        let segment_count = config.segments;
148        let capacity = config.base.capacity;
149        let max_size = config.base.max_size;
150
151        let segment_capacity = capacity.get() / segment_count;
152        let segment_cap = NonZeroUsize::new(segment_capacity.max(1)).unwrap();
153        let segment_max_size = max_size / segment_count as u64;
154
155        let hash_builder = hasher.unwrap_or_default();
156
157        let segments: Vec<_> = (0..segment_count)
158            .map(|_| {
159                let segment_config = crate::config::LfuCacheConfig {
160                    capacity: segment_cap,
161                    max_size: segment_max_size,
162                };
163                Mutex::new(LfuSegment::init(segment_config, hash_builder.clone()))
164            })
165            .collect();
166
167        Self {
168            segments: segments.into_boxed_slice(),
169            hash_builder,
170        }
171    }
172}
173
174impl<K, V, S> ConcurrentLfuCache<K, V, S>
175where
176    K: Hash + Eq + Clone + Send,
177    V: Clone + Send,
178    S: BuildHasher + Clone + Send,
179{
180    #[inline]
181    fn segment_index<Q>(&self, key: &Q) -> usize
182    where
183        K: Borrow<Q>,
184        Q: ?Sized + Hash,
185    {
186        (self.hash_builder.hash_one(key) as usize) % self.segments.len()
187    }
188
189    /// Returns the total capacity across all segments.
190    pub fn capacity(&self) -> usize {
191        self.segments.iter().map(|s| s.lock().cap().get()).sum()
192    }
193
194    /// Returns the number of segments in the cache.
195    pub fn segment_count(&self) -> usize {
196        self.segments.len()
197    }
198
199    /// Returns the total number of entries across all segments.
200    pub fn len(&self) -> usize {
201        self.segments.iter().map(|s| s.lock().len()).sum()
202    }
203
204    /// Returns `true` if the cache contains no entries.
205    pub fn is_empty(&self) -> bool {
206        self.segments.iter().all(|s| s.lock().is_empty())
207    }
208
209    /// Gets a value from the cache.
210    ///
211    /// This clones the value to avoid holding the lock. For zero-copy access,
212    /// use `get_with()` instead.
213    pub fn get<Q>(&self, key: &Q) -> Option<V>
214    where
215        K: Borrow<Q>,
216        Q: ?Sized + Hash + Eq,
217    {
218        let idx = self.segment_index(key);
219        let mut segment = self.segments[idx].lock();
220        segment.get(key).cloned()
221    }
222
223    /// Gets a value and applies a function to it while holding the lock.
224    ///
225    /// This is more efficient than `get()` when you only need to read from the value,
226    /// as it avoids cloning.
227    pub fn get_with<Q, F, R>(&self, key: &Q, f: F) -> Option<R>
228    where
229        K: Borrow<Q>,
230        Q: ?Sized + Hash + Eq,
231        F: FnOnce(&V) -> R,
232    {
233        let idx = self.segment_index(key);
234        let mut segment = self.segments[idx].lock();
235        segment.get(key).map(f)
236    }
237
238    /// Inserts a key-value pair into the cache.
239    ///
240    /// If the cache is at capacity, the least frequently used entry is evicted.
241    pub fn put(&self, key: K, value: V) -> Option<(K, V)> {
242        let idx = self.segment_index(&key);
243        let mut segment = self.segments[idx].lock();
244        segment.put(key, value)
245    }
246
247    /// Inserts a key-value pair with explicit size tracking.
248    pub fn put_with_size(&self, key: K, value: V, size: u64) -> Option<(K, V)> {
249        let idx = self.segment_index(&key);
250        let mut segment = self.segments[idx].lock();
251        segment.put_with_size(key, value, size)
252    }
253
254    /// Removes a key from the cache, returning the value if it existed.
255    pub fn remove<Q>(&self, key: &Q) -> Option<V>
256    where
257        K: Borrow<Q>,
258        Q: ?Sized + Hash + Eq,
259    {
260        let idx = self.segment_index(key);
261        let mut segment = self.segments[idx].lock();
262        segment.remove(key)
263    }
264
265    /// Returns `true` if the cache contains the specified key.
266    pub fn contains_key<Q>(&self, key: &Q) -> bool
267    where
268        K: Borrow<Q>,
269        Q: ?Sized + Hash + Eq,
270    {
271        let idx = self.segment_index(key);
272        let mut segment = self.segments[idx].lock();
273        segment.get(key).is_some()
274    }
275
276    /// Clears all entries from the cache.
277    pub fn clear(&self) {
278        for segment in self.segments.iter() {
279            segment.lock().clear();
280        }
281    }
282
283    /// Returns the current total size of cached content across all segments.
284    pub fn current_size(&self) -> u64 {
285        self.segments.iter().map(|s| s.lock().current_size()).sum()
286    }
287
288    /// Returns the maximum content size the cache can hold across all segments.
289    pub fn max_size(&self) -> u64 {
290        self.segments.iter().map(|s| s.lock().max_size()).sum()
291    }
292}
293
294impl<K, V, S> CacheMetrics for ConcurrentLfuCache<K, V, S>
295where
296    K: Hash + Eq + Clone + Send,
297    V: Clone + Send,
298    S: BuildHasher + Clone + Send,
299{
300    fn metrics(&self) -> BTreeMap<String, f64> {
301        let mut aggregated = BTreeMap::new();
302        for segment in self.segments.iter() {
303            let segment_metrics = segment.lock().metrics().metrics();
304            for (key, value) in segment_metrics {
305                *aggregated.entry(key).or_insert(0.0) += value;
306            }
307        }
308        aggregated
309    }
310
311    fn algorithm_name(&self) -> &'static str {
312        "ConcurrentLFU"
313    }
314}
315
// SAFETY: NOTE(review) — these manual impls assert that the cache may be
// transferred (`Send`) and shared (`Sync`) across threads. All K/V state is
// reached only through the per-segment `parking_lot::Mutex`es, which is why
// `Send` bounds on K and V suffice and `Sync` on them is not required. If
// `LfuSegment` is automatically `Send` these impls are redundant; if it is
// not (e.g. it holds raw pointers), soundness depends on `LfuSegment`'s
// internals — confirm against that type before relying on this.
unsafe impl<K: Send, V: Send, S: Send> Send for ConcurrentLfuCache<K, V, S> {}
unsafe impl<K: Send, V: Send, S: Send + Sync> Sync for ConcurrentLfuCache<K, V, S> {}
318
319impl<K, V, S> core::fmt::Debug for ConcurrentLfuCache<K, V, S>
320where
321    K: Hash + Eq + Clone + Send,
322    V: Clone + Send,
323    S: BuildHasher + Clone + Send,
324{
325    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
326        f.debug_struct("ConcurrentLfuCache")
327            .field("segment_count", &self.segments.len())
328            .field("total_len", &self.len())
329            .finish()
330    }
331}
332
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{ConcurrentCacheConfig, ConcurrentLfuCacheConfig, LfuCacheConfig};

    extern crate std;
    use std::string::ToString;
    use std::sync::Arc;
    use std::thread;
    use std::vec::Vec;

    /// Builds a test config with the given slot capacity and segment count
    /// and an unlimited byte budget.
    fn make_config(capacity: usize, segments: usize) -> ConcurrentLfuCacheConfig {
        ConcurrentCacheConfig {
            base: LfuCacheConfig {
                capacity: NonZeroUsize::new(capacity).unwrap(),
                max_size: u64::MAX,
            },
            segments,
        }
    }

    /// Shorthand for the `String -> i32` cache most cases below use.
    fn new_cache(capacity: usize, segments: usize) -> ConcurrentLfuCache<String, i32> {
        ConcurrentLfuCache::init(make_config(capacity, segments), None)
    }

    #[test]
    fn test_basic_operations() {
        let cache = new_cache(100, 16);

        cache.put("a".to_string(), 1);
        cache.put("b".to_string(), 2);

        assert_eq!(cache.get("a"), Some(1));
        assert_eq!(cache.get("b"), Some(2));
    }

    #[test]
    fn test_concurrent_access() {
        let cache = Arc::new(new_cache(1000, 16));
        let num_threads = 8;
        let ops_per_thread = 500;

        let handles: Vec<_> = (0..num_threads)
            .map(|t| {
                let cache = Arc::clone(&cache);
                thread::spawn(move || {
                    for i in 0..ops_per_thread {
                        let key = std::format!("key_{}_{}", t, i);
                        cache.put(key.clone(), i);
                        // Re-read every third key to exercise frequency tracking.
                        if i % 3 == 0 {
                            let _ = cache.get(&key);
                            let _ = cache.get(&key);
                        }
                    }
                })
            })
            .collect();

        for handle in handles {
            handle.join().unwrap();
        }

        assert!(!cache.is_empty());
    }

    #[test]
    fn test_capacity() {
        let cache = new_cache(100, 16);

        // Capacity is distributed across segments; integer division may
        // round down, but it must stay within [segments, requested].
        let total = cache.capacity();
        assert!(total >= 16);
        assert!(total <= 100);
    }

    #[test]
    fn test_segment_count() {
        let cache = new_cache(100, 8);
        assert_eq!(cache.segment_count(), 8);
    }

    #[test]
    fn test_len_and_is_empty() {
        let cache = new_cache(100, 16);

        assert!(cache.is_empty());
        assert_eq!(cache.len(), 0);

        cache.put("key1".to_string(), 1);
        assert_eq!(cache.len(), 1);
        assert!(!cache.is_empty());

        cache.put("key2".to_string(), 2);
        assert_eq!(cache.len(), 2);
    }

    #[test]
    fn test_remove() {
        let cache = new_cache(100, 16);

        cache.put("key1".to_string(), 1);
        cache.put("key2".to_string(), 2);

        assert_eq!(cache.remove("key1"), Some(1));
        assert_eq!(cache.len(), 1);
        assert_eq!(cache.get("key1"), None);

        assert_eq!(cache.remove("nonexistent"), None);
    }

    #[test]
    fn test_clear() {
        let cache = new_cache(100, 16);

        for (i, key) in ["key1", "key2", "key3"].iter().enumerate() {
            cache.put((*key).to_string(), i as i32 + 1);
        }
        assert_eq!(cache.len(), 3);

        cache.clear();

        assert_eq!(cache.len(), 0);
        assert!(cache.is_empty());
        assert_eq!(cache.get("key1"), None);
    }

    #[test]
    fn test_contains_key() {
        let cache = new_cache(100, 16);

        cache.put("exists".to_string(), 1);

        assert!(cache.contains_key("exists"));
        assert!(!cache.contains_key("missing"));
    }

    #[test]
    fn test_get_with() {
        let cache: ConcurrentLfuCache<String, String> =
            ConcurrentLfuCache::init(make_config(100, 16), None);

        cache.put("key".to_string(), "hello world".to_string());

        // Zero-copy read: measure the value's length under the lock.
        assert_eq!(cache.get_with("key", |v| v.len()), Some(11));
        assert_eq!(cache.get_with("missing", |v| v.len()), None);
    }

    #[test]
    fn test_frequency_eviction() {
        let cache = new_cache(48, 16);

        cache.put("a".to_string(), 1);
        cache.put("b".to_string(), 2);
        cache.put("c".to_string(), 3);

        // Bump the frequency of "a" and "c" well above "b".
        for _ in 0..5 {
            let _ = cache.get("a");
            let _ = cache.get("c");
        }

        // Adding one more entry must not push us past total capacity.
        cache.put("d".to_string(), 4);

        assert!(cache.len() <= 48);
    }

    #[test]
    fn test_eviction_on_capacity() {
        let cache = new_cache(80, 16);

        // Fill the cache.
        for i in 0..10 {
            cache.put(std::format!("key{}", i), i);
        }

        // Cache must never exceed its total capacity.
        assert!(cache.len() <= 80);
    }

    #[test]
    fn test_metrics() {
        let cache = new_cache(100, 16);

        cache.put("a".to_string(), 1);
        cache.put("b".to_string(), 2);

        // Aggregation across segments should surface at least one metric.
        let metrics = cache.metrics();
        assert!(!metrics.is_empty());
    }

    #[test]
    fn test_algorithm_name() {
        let cache = new_cache(100, 16);
        assert_eq!(cache.algorithm_name(), "ConcurrentLFU");
    }

    #[test]
    fn test_empty_cache_operations() {
        let cache = new_cache(100, 16);

        assert!(cache.is_empty());
        assert_eq!(cache.len(), 0);
        assert_eq!(cache.get("missing"), None);
        assert_eq!(cache.remove("missing"), None);
        assert!(!cache.contains_key("missing"));
    }

    #[test]
    fn test_borrowed_key_lookup() {
        let cache = new_cache(100, 16);

        cache.put("test_key".to_string(), 42);

        // `&str` lookups must hash to the same segment as the owned String.
        let key_str = "test_key";
        assert_eq!(cache.get(key_str), Some(42));
        assert!(cache.contains_key(key_str));
        assert_eq!(cache.remove(key_str), Some(42));
    }

    #[test]
    fn test_frequency_tracking() {
        let cache = new_cache(100, 16);

        cache.put("key".to_string(), 1);

        // Access the key repeatedly to drive its frequency up.
        for _ in 0..10 {
            let _ = cache.get("key");
        }

        // A hot key must remain resident and readable.
        assert_eq!(cache.get("key"), Some(1));
    }
}