// cache_rs/concurrent/lfuda.rs
1//! Concurrent LFUDA Cache Implementation
2//!
3//! A thread-safe LFUDA cache using lock striping (segmented storage) for high-performance
4//! concurrent access. This is the multi-threaded counterpart to [`LfudaCache`](crate::LfudaCache).
5//!
6//! # How It Works
7//!
8//! LFUDA (LFU with Dynamic Aging) addresses the cache pollution problem in LFU by
9//! incorporating a global age factor. When an item is evicted, the age is set to its
10//! priority. Newly inserted items start with priority = age + 1, giving them a fair
11//! chance against long-cached items with high frequency counts.
12//!
13//! The cache partitions keys across multiple independent segments, each with its own
14//! lock and independent aging state.
15//!
16//! ```text
17//! ┌──────────────────────────────────────────────────────────────────────┐
18//! │                     ConcurrentLfudaCache                             │
19//! │                                                                      │
20//! │  hash(key) % N  ──>  Segment Selection                               │
21//! │                                                                      │
22//! │  ┌──────────────┐ ┌──────────────┐     ┌──────────────┐              │
23//! │  │  Segment 0   │ │  Segment 1   │ ... │  Segment N-1 │              │
24//! │  │  age=100     │ │  age=150     │     │  age=120     │              │
25//! │  │  ┌────────┐  │ │  ┌────────┐  │     │  ┌────────┐  │              │
26//! │  │  │ Mutex  │  │ │  │ Mutex  │  │     │  │ Mutex  │  │              │
27//! │  │  └────┬───┘  │ │  └────┬───┘  │     │  └────┬───┘  │              │
28//! │  │       │      │ │       │      │     │       │      │              │
29//! │  │  ┌────▼────┐ │ │  ┌────▼────┐ │     │  ┌────▼────┐ │              │
30//! │  │  │LfudaSeg │ │ │  │LfudaSeg │ │     │  │LfudaSeg │ │              │
31//! │  │  └─────────┘ │ │  └─────────┘ │     │  └─────────┘ │              │
32//! │  └──────────────┘ └──────────────┘     └──────────────┘              │
33//! └──────────────────────────────────────────────────────────────────────┘
34//! ```
35//!
36//! ## Per-Segment Aging
37//!
38//! Each segment maintains its own global age counter. This means:
39//! - Aging happens independently in each segment
40//! - High-activity segments may age faster than low-activity ones
41//! - Items in different segments are not directly comparable by priority
42//!
43//! This is a deliberate trade-off: global aging would require cross-segment
44//! coordination, hurting concurrency.
45//!
46//! ## Trade-offs
47//!
48//! - **Pros**: Near-linear scaling, adapts to changing popularity per segment
49//! - **Cons**: Aging is local to each segment, not global
50//!
51//! # Performance Characteristics
52//!
53//! | Metric | Value |
54//! |--------|-------|
55//! | Get/Put/Remove | O(log P) per segment |
56//! | Concurrency | Near-linear scaling up to segment count |
57//! | Memory overhead | ~160 bytes per entry + one Mutex per segment |
58//! | Adaptability | Handles changing popularity patterns |
59//!
60//! Where P = distinct priority values per segment. Priority = frequency + age,
61//! so P can grow with segment size.
62//!
63//! # When to Use
64//!
65//! **Use ConcurrentLfudaCache when:**
66//! - Multiple threads need cache access
67//! - Item popularity changes over time
68//! - Long-running applications where old items should eventually age out
69//! - You need frequency-based eviction with adaptation
70//!
71//! **Consider alternatives when:**
72//! - Single-threaded access only → use `LfudaCache`
73//! - Static popularity patterns → use `ConcurrentLfuCache` (simpler)
74//! - Recency-based access → use `ConcurrentLruCache`
75//! - Need global aging coordination → use `Mutex<LfudaCache>`
76//!
77//! # Thread Safety
78//!
79//! `ConcurrentLfudaCache` is `Send + Sync` and can be shared via `Arc`.
80//!
81//! # Example
82//!
83//! ```rust,ignore
84//! use cache_rs::concurrent::ConcurrentLfudaCache;
85//! use cache_rs::config::ConcurrentLfudaCacheConfig;
86//! use std::num::NonZeroUsize;
87//! use std::sync::Arc;
88//! use std::thread;
89//!
90//! // Create a cache that adapts to changing popularity
91//! let config = ConcurrentLfudaCacheConfig::new(NonZeroUsize::new(10_000).unwrap());
92//! let cache = Arc::new(ConcurrentLfudaCache::init(config, None));
93//!
94//! // Phase 1: Establish initial popularity
95//! for i in 0..1000 {
96//!     cache.put(format!("old-{}", i), i, 1);
97//!     for _ in 0..10 {
98//!         cache.get(&format!("old-{}", i));
99//!     }
100//! }
101//!
102//! // Phase 2: New content arrives, old content ages out
103//! let handles: Vec<_> = (0..4).map(|t| {
104//!     let cache = Arc::clone(&cache);
105//!     thread::spawn(move || {
106//!         for i in 0..5000 {
107//!             let key = format!("new-{}-{}", t, i);
108//!             cache.put(key.clone(), i as i32, 1);
109//!             let _ = cache.get(&key);
110//!         }
111//!     })
112//! }).collect();
113//!
114//! for h in handles {
115//!     h.join().unwrap();
116//! }
117//!
118//! // Old items gradually evicted despite high historical frequency
119//! println!("Cache size: {}", cache.len());
120//! ```
121
122extern crate alloc;
123
124use crate::lfuda::LfudaSegment;
125use crate::metrics::CacheMetrics;
126use alloc::boxed::Box;
127use alloc::collections::BTreeMap;
128use alloc::string::String;
129use alloc::vec::Vec;
130use core::borrow::Borrow;
131use core::hash::{BuildHasher, Hash};
132use core::num::NonZeroUsize;
133use parking_lot::Mutex;
134
135#[cfg(feature = "hashbrown")]
136use hashbrown::DefaultHashBuilder;
137
138#[cfg(not(feature = "hashbrown"))]
139use std::collections::hash_map::RandomState as DefaultHashBuilder;
140
/// A thread-safe LFUDA cache with segmented storage for high concurrency.
///
/// Keys are partitioned across segments by `hash(key) % segment_count`; each
/// segment is an independent [`LfudaSegment`] behind its own `Mutex`, so
/// operations on keys in different segments never contend.
pub struct ConcurrentLfudaCache<K, V, S = DefaultHashBuilder> {
    // One independently locked LFUDA segment per stripe; the count is fixed
    // at construction time (segment selection is `hash % len`).
    segments: Box<[Mutex<LfudaSegment<K, V, S>>]>,
    // Hash builder used to route a key to its segment. Each segment also
    // receives a clone of this builder at init, so routing and in-segment
    // hashing agree for the lifetime of the cache.
    hash_builder: S,
}
146
147impl<K, V> ConcurrentLfudaCache<K, V, DefaultHashBuilder>
148where
149    K: Hash + Eq + Clone + Send,
150    V: Clone + Send,
151{
152    /// Creates a new concurrent LFUDA cache from a configuration.
153    ///
154    /// This is the **recommended** way to create a concurrent LFUDA cache.
155    ///
156    /// # Arguments
157    ///
158    /// * `config` - The cache configuration specifying capacity, segments, etc.
159    /// * `hasher` - Optional custom hash builder. If `None`, uses the default.
160    pub fn init(
161        config: crate::config::ConcurrentLfudaCacheConfig,
162        hasher: Option<DefaultHashBuilder>,
163    ) -> Self {
164        let segment_count = config.segments;
165        let capacity = config.base.capacity;
166        let max_size = config.base.max_size;
167        let initial_age = config.base.initial_age;
168
169        let hash_builder = hasher.unwrap_or_default();
170
171        let segment_capacity = capacity.get() / segment_count;
172        let segment_cap = NonZeroUsize::new(segment_capacity.max(1)).unwrap();
173        let segment_max_size = max_size / segment_count as u64;
174
175        let segments: Vec<_> = (0..segment_count)
176            .map(|_| {
177                let segment_config = crate::config::LfudaCacheConfig {
178                    capacity: segment_cap,
179                    initial_age,
180                    max_size: segment_max_size,
181                };
182                Mutex::new(LfudaSegment::init(segment_config, hash_builder.clone()))
183            })
184            .collect();
185
186        Self {
187            segments: segments.into_boxed_slice(),
188            hash_builder,
189        }
190    }
191}
192
193impl<K, V, S> ConcurrentLfudaCache<K, V, S>
194where
195    K: Hash + Eq + Clone + Send,
196    V: Clone + Send,
197    S: BuildHasher + Clone + Send,
198{
199    #[inline]
200    fn segment_index<Q>(&self, key: &Q) -> usize
201    where
202        K: Borrow<Q>,
203        Q: ?Sized + Hash,
204    {
205        (self.hash_builder.hash_one(key) as usize) % self.segments.len()
206    }
207
208    /// Returns the total capacity across all segments.
209    pub fn capacity(&self) -> usize {
210        let mut total = 0usize;
211        for segment in self.segments.iter() {
212            total += segment.lock().cap().get();
213        }
214        total
215    }
216
217    /// Returns the number of segments in the cache.
218    pub fn segment_count(&self) -> usize {
219        self.segments.len()
220    }
221
222    /// Returns the total number of entries across all segments.
223    pub fn len(&self) -> usize {
224        let mut total = 0usize;
225        for segment in self.segments.iter() {
226            total += segment.lock().len();
227        }
228        total
229    }
230
231    /// Returns `true` if the cache contains no entries.
232    pub fn is_empty(&self) -> bool {
233        for segment in self.segments.iter() {
234            if !segment.lock().is_empty() {
235                return false;
236            }
237        }
238        true
239    }
240
241    /// Gets a value from the cache.
242    ///
243    /// This clones the value to avoid holding the lock. For zero-copy access,
244    /// use `get_with()` instead.
245    pub fn get<Q>(&self, key: &Q) -> Option<V>
246    where
247        K: Borrow<Q>,
248        Q: ?Sized + Hash + Eq,
249    {
250        let idx = self.segment_index(key);
251        let mut segment = self.segments[idx].lock();
252        segment.get(key).cloned()
253    }
254
255    /// Gets a value and applies a function to it while holding the lock.
256    ///
257    /// This is more efficient than `get()` when you only need to read from the value,
258    /// as it avoids cloning.
259    pub fn get_with<Q, F, R>(&self, key: &Q, f: F) -> Option<R>
260    where
261        K: Borrow<Q>,
262        Q: ?Sized + Hash + Eq,
263        F: FnOnce(&V) -> R,
264    {
265        let idx = self.segment_index(key);
266        let mut segment = self.segments[idx].lock();
267        segment.get(key).map(f)
268    }
269
270    /// Inserts a key-value pair into the cache with optional size tracking.
271    ///
272    /// If the cache is at capacity, the entry with lowest priority (frequency + age) is evicted.
273    /// Use `SIZE_UNIT` (1) for count-based caching.
274    pub fn put(&self, key: K, value: V, size: u64) -> Option<Vec<(K, V)>> {
275        let idx = self.segment_index(&key);
276        let mut segment = self.segments[idx].lock();
277        segment.put(key, value, size)
278    }
279
280    /// Removes a key from the cache, returning the value if it existed.
281    pub fn remove<Q>(&self, key: &Q) -> Option<V>
282    where
283        K: Borrow<Q>,
284        Q: ?Sized + Hash + Eq,
285    {
286        let idx = self.segment_index(key);
287        let mut segment = self.segments[idx].lock();
288        segment.remove(key)
289    }
290
291    /// Clears all entries from the cache.
292    pub fn clear(&self) {
293        for segment in self.segments.iter() {
294            segment.lock().clear();
295        }
296    }
297
298    /// Returns the current total size of cached content across all segments.
299    pub fn current_size(&self) -> u64 {
300        self.segments.iter().map(|s| s.lock().current_size()).sum()
301    }
302
303    /// Returns the maximum content size the cache can hold across all segments.
304    pub fn max_size(&self) -> u64 {
305        self.segments.iter().map(|s| s.lock().max_size()).sum()
306    }
307
308    /// Checks if the cache contains a key without updating priority.
309    ///
310    /// This is a pure existence check that does **not** update the entry's priority
311    /// or frequency.
312    ///
313    /// # Example
314    ///
315    /// ```rust,ignore
316    /// if cache.contains(&"key".to_string()) {
317    ///     println!("Key exists!");
318    /// }
319    /// ```
320    pub fn contains<Q>(&self, key: &Q) -> bool
321    where
322        K: Borrow<Q>,
323        Q: ?Sized + Hash + Eq,
324    {
325        let idx = self.segment_index(key);
326        let segment = self.segments[idx].lock();
327        segment.contains(key)
328    }
329
330    /// Returns a clone of the value without updating priority or access metadata.
331    ///
332    /// Unlike [`get()`](Self::get), this does NOT update the entry's frequency
333    /// or priority. Returns a cloned value because the internal lock cannot be
334    /// held across the return boundary.
335    ///
336    /// # Example
337    ///
338    /// ```rust,ignore
339    /// let value = cache.peek(&"key".to_string());
340    /// ```
341    pub fn peek<Q>(&self, key: &Q) -> Option<V>
342    where
343        K: Borrow<Q>,
344        Q: ?Sized + Hash + Eq,
345        V: Clone,
346    {
347        let idx = self.segment_index(key);
348        let segment = self.segments[idx].lock();
349        segment.peek(key).cloned()
350    }
351}
352
353impl<K, V, S> CacheMetrics for ConcurrentLfudaCache<K, V, S>
354where
355    K: Hash + Eq + Clone + Send,
356    V: Clone + Send,
357    S: BuildHasher + Clone + Send,
358{
359    fn metrics(&self) -> BTreeMap<String, f64> {
360        let mut aggregated = BTreeMap::new();
361        for segment in self.segments.iter() {
362            let segment_metrics = segment.lock().metrics().metrics();
363            for (key, value) in segment_metrics {
364                *aggregated.entry(key).or_insert(0.0) += value;
365            }
366        }
367        aggregated
368    }
369
370    fn algorithm_name(&self) -> &'static str {
371        "ConcurrentLFUDA"
372    }
373}
374
// SAFETY: all mutable shared state lives behind the per-segment `Mutex`es;
// the only other field, `hash_builder`, is only read (`hash_one`) via `&self`.
// NOTE(review): these manual impls assert that `LfudaSegment<K, V, S>` is safe
// to send/share under these bounds — confirm it holds no thread-affine state
// (e.g. `Rc` or raw pointers aliased outside the Mutex) that the auto traits
// would otherwise reject.
unsafe impl<K: Send, V: Send, S: Send> Send for ConcurrentLfudaCache<K, V, S> {}
// SAFETY: `Mutex<T>: Sync` requires `T: Send` (covered by the `Send` bounds),
// and `S: Sync` permits concurrent `&self` access to `hash_builder`.
unsafe impl<K: Send, V: Send, S: Send + Sync> Sync for ConcurrentLfudaCache<K, V, S> {}
377
378impl<K, V, S> core::fmt::Debug for ConcurrentLfudaCache<K, V, S>
379where
380    K: Hash + Eq + Clone + Send,
381    V: Clone + Send,
382    S: BuildHasher + Clone + Send,
383{
384    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
385        f.debug_struct("ConcurrentLfudaCache")
386            .field("segment_count", &self.segments.len())
387            .field("total_len", &self.len())
388            .finish()
389    }
390}
391
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{ConcurrentCacheConfig, ConcurrentLfudaCacheConfig, LfudaCacheConfig};

    extern crate std;
    use std::string::ToString;
    use std::sync::Arc;
    use std::thread;
    use std::vec::Vec;

    /// Count-based config: unbounded byte budget, aging starts at zero.
    fn make_config(capacity: usize, segments: usize) -> ConcurrentLfudaCacheConfig {
        let base = LfudaCacheConfig {
            capacity: NonZeroUsize::new(capacity).unwrap(),
            initial_age: 0,
            max_size: u64::MAX,
        };
        ConcurrentCacheConfig { base, segments }
    }

    /// Shorthand for the `String -> i32` cache most tests use.
    fn int_cache(capacity: usize, segments: usize) -> ConcurrentLfudaCache<String, i32> {
        ConcurrentLfudaCache::init(make_config(capacity, segments), None)
    }

    #[test]
    fn test_basic_operations() {
        let cache = int_cache(100, 16);

        cache.put("a".to_string(), 1, 1);
        cache.put("b".to_string(), 2, 1);

        assert_eq!(cache.get("a"), Some(1));
        assert_eq!(cache.get("b"), Some(2));
    }

    #[test]
    fn test_concurrent_access() {
        let cache = Arc::new(int_cache(1000, 16));
        let ops_per_thread = 500;

        let workers: Vec<_> = (0..8)
            .map(|t| {
                let cache = Arc::clone(&cache);
                thread::spawn(move || {
                    for i in 0..ops_per_thread {
                        let key = std::format!("key_{}_{}", t, i);
                        cache.put(key.clone(), i, 1);
                        let _ = cache.get(&key);
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }

        assert!(!cache.is_empty());
    }

    #[test]
    fn test_capacity() {
        let cache = int_cache(100, 16);

        // Total capacity is split across segments (integer division), so it
        // lies between one-per-segment and the configured total.
        let total = cache.capacity();
        assert!((16..=100).contains(&total));
    }

    #[test]
    fn test_segment_count() {
        assert_eq!(int_cache(100, 8).segment_count(), 8);
    }

    #[test]
    fn test_len_and_is_empty() {
        let cache = int_cache(100, 16);

        assert!(cache.is_empty());
        assert_eq!(cache.len(), 0);

        cache.put("key1".to_string(), 1, 1);
        assert_eq!(cache.len(), 1);
        assert!(!cache.is_empty());

        cache.put("key2".to_string(), 2, 1);
        assert_eq!(cache.len(), 2);
    }

    #[test]
    fn test_remove() {
        let cache = int_cache(100, 16);

        cache.put("key1".to_string(), 1, 1);
        cache.put("key2".to_string(), 2, 1);

        assert_eq!(cache.remove("key1"), Some(1));
        assert_eq!(cache.len(), 1);
        assert_eq!(cache.get("key1"), None);

        assert_eq!(cache.remove("nonexistent"), None);
    }

    #[test]
    fn test_clear() {
        let cache = int_cache(100, 16);

        for (i, k) in ["key1", "key2", "key3"].iter().enumerate() {
            cache.put(k.to_string(), (i + 1) as i32, 1);
        }
        assert_eq!(cache.len(), 3);

        cache.clear();

        assert_eq!(cache.len(), 0);
        assert!(cache.is_empty());
        assert_eq!(cache.get("key1"), None);
    }

    #[test]
    fn test_contains_key() {
        let cache = int_cache(100, 16);

        cache.put("exists".to_string(), 1, 1);

        assert!(cache.contains("exists"));
        assert!(!cache.contains("missing"));
    }

    #[test]
    fn test_get_with() {
        let cache: ConcurrentLfudaCache<String, String> =
            ConcurrentLfudaCache::init(make_config(100, 16), None);

        cache.put("key".to_string(), "hello world".to_string(), 1);

        assert_eq!(cache.get_with("key", |v: &String| v.len()), Some(11));
        assert_eq!(cache.get_with("missing", |v: &String| v.len()), None);
    }

    #[test]
    fn test_aging_behavior() {
        let cache = int_cache(48, 16);

        cache.put("a".to_string(), 1, 1);
        cache.put("b".to_string(), 2, 1);
        cache.put("c".to_string(), 3, 1);

        // Bump frequencies on "a" and "c".
        for _ in 0..5 {
            let _ = cache.get("a");
            let _ = cache.get("c");
        }

        // A new insert exercises the aging/priority path.
        cache.put("d".to_string(), 4, 1);

        assert!(cache.len() <= 48);
    }

    #[test]
    fn test_eviction_on_capacity() {
        let cache = int_cache(80, 16);

        for i in 0..10 {
            cache.put(std::format!("key{}", i), i, 1);
        }

        // Never exceeds the configured capacity.
        assert!(cache.len() <= 80);
    }

    #[test]
    fn test_metrics() {
        let cache = int_cache(100, 16);

        cache.put("a".to_string(), 1, 1);
        cache.put("b".to_string(), 2, 1);

        // Aggregated metrics across segments should be non-empty.
        assert!(!cache.metrics().is_empty());
    }

    #[test]
    fn test_algorithm_name() {
        assert_eq!(int_cache(100, 16).algorithm_name(), "ConcurrentLFUDA");
    }

    #[test]
    fn test_empty_cache_operations() {
        let cache = int_cache(100, 16);

        assert!(cache.is_empty());
        assert_eq!(cache.len(), 0);
        assert_eq!(cache.get("missing"), None);
        assert_eq!(cache.remove("missing"), None);
        assert!(!cache.contains("missing"));
    }

    #[test]
    fn test_borrowed_key_lookup() {
        let cache = int_cache(100, 16);

        cache.put("test_key".to_string(), 42, 1);

        // All lookup methods accept a borrowed `&str` for `String` keys.
        let key: &str = "test_key";
        assert_eq!(cache.get(key), Some(42));
        assert!(cache.contains(key));
        assert_eq!(cache.remove(key), Some(42));
    }

    #[test]
    fn test_frequency_with_aging() {
        let cache = int_cache(100, 16);

        cache.put("key".to_string(), 1, 1);

        // Repeated hits keep the entry accessible.
        for _ in 0..10 {
            let _ = cache.get("key");
        }

        assert_eq!(cache.get("key"), Some(1));
    }

    #[test]
    fn test_dynamic_aging() {
        let cache = int_cache(80, 16);

        // Items with increasing access counts.
        for i in 0..5 {
            cache.put(std::format!("key{}", i), i, 1);
            for _ in 0..i {
                let _ = cache.get(&std::format!("key{}", i));
            }
        }

        // Additional inserts exercise eviction with aging.
        for i in 5..10 {
            cache.put(std::format!("key{}", i), i, 1);
        }

        assert!(cache.len() <= 80);
    }

    #[test]
    fn test_contains_non_promoting() {
        let cache = int_cache(100, 16);

        cache.put("a".to_string(), 1, 1);
        cache.put("b".to_string(), 2, 1);

        // Existence checks must not alter priority.
        assert!(cache.contains("a"));
        assert!(cache.contains("b"));
        assert!(!cache.contains("c"));
    }
}