// cache_rs/concurrent/slru.rs

//! Concurrent SLRU Cache Implementation
//!
//! A thread-safe Segmented LRU cache using lock striping (segmented storage) for
//! high-performance concurrent access. This is the multi-threaded counterpart to
//! [`SlruCache`](crate::SlruCache).
//!
//! # How It Works
//!
//! SLRU maintains two segments per shard: **probationary** (for new items) and
//! **protected** (for frequently accessed items). Items enter probationary and
//! are promoted to protected on subsequent access.
//!
//! Keys are partitioned across multiple independent shards by hash. In the API and
//! configuration these shards are called `segments`; they should not be confused with
//! the probationary/protected segments inside each shard.
//!
//! ```text
//! ┌──────────────────────────────────────────────────────────────────────────────┐
//! │                        ConcurrentSlruCache                                   │
//! │                                                                              │
//! │  hash(key) % N  ──▶  Shard Selection                                         │
//! │                                                                              │
//! │  ┌────────────────────┐ ┌────────────────────┐     ┌────────────────────┐    │
//! │  │     Shard 0        │ │     Shard 1        │ ... │    Shard N-1       │    │
//! │  │  ┌──────────────┐  │ │  ┌──────────────┐  │     │  ┌──────────────┐  │    │
//! │  │  │    Mutex     │  │ │  │    Mutex     │  │     │  │    Mutex     │  │    │
//! │  │  └──────┬───────┘  │ │  └──────┬───────┘  │     │  └──────┬───────┘  │    │
//! │  │         │          │ │         │          │     │         │          │    │
//! │  │  ┌──────▼───────┐  │ │  ┌──────▼───────┐  │     │  ┌──────▼───────┐  │    │
//! │  │  │ Protected    │  │ │  │ Protected    │  │     │  │ Protected    │  │    │
//! │  │  │ [hot items]  │  │ │  │ [hot items]  │  │     │  │ [hot items]  │  │    │
//! │  │  ├──────────────┤  │ │  ├──────────────┤  │     │  ├──────────────┤  │    │
//! │  │  │ Probationary │  │ │  │ Probationary │  │     │  │ Probationary │  │    │
//! │  │  │ [new items]  │  │ │  │ [new items]  │  │     │  │ [new items]  │  │    │
//! │  │  └──────────────┘  │ │  └──────────────┘  │     │  └──────────────┘  │    │
//! │  └────────────────────┘ └────────────────────┘     └────────────────────┘    │
//! └──────────────────────────────────────────────────────────────────────────────┘
//! ```
//!
//! ## Segment Structure
//!
//! Each shard contains:
//! - **Probationary segment**: New items enter here (default: 80% of shard capacity)
//! - **Protected segment**: Items promoted on second access (default: 20% of shard capacity)
//!
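//! The `init` constructor splits both budgets evenly across shards using integer
//! division. A worked sketch with the numbers from the example below:
//!
//! ```text
//! capacity           = 10_000, segments = 16  →  per-shard capacity  = 10_000 / 16 = 625
//! protected_capacity =  2_000, segments = 16  →  per-shard protected =  2_000 / 16 = 125
//! ```
//!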
//! ## Trade-offs
//!
//! - **Pros**: Near-linear scaling, excellent scan resistance per shard
//! - **Cons**: Protection is per-shard, not global. An item hot in shard A
//!   doesn't influence protection in shard B.
//!
//! # Performance Characteristics
//!
//! | Metric | Value |
//! |--------|-------|
//! | Get/Put/Remove | O(1) average |
//! | Concurrency | Near-linear scaling up to shard count |
//! | Memory overhead | ~140 bytes per entry + one Mutex per shard |
//! | Scan resistance | Good (two-tier protection) |
//!
//! # When to Use
//!
//! **Use ConcurrentSlruCache when:**
//! - Multiple threads need cache access
//! - Workload mixes hot items with sequential scans
//! - You need scan resistance in a concurrent environment
//! - Some items are accessed repeatedly while others are one-time
//!
//! **Consider alternatives when:**
//! - Single-threaded access only → use `SlruCache`
//! - Pure recency patterns → use `ConcurrentLruCache` (simpler)
//! - Frequency-dominant patterns → use `ConcurrentLfuCache`
//! - Need global protection coordination → use `Mutex<SlruCache>`
//!
//! # Thread Safety
//!
//! `ConcurrentSlruCache` is `Send + Sync` and can be shared via `Arc`.
//!
//! # Example
//!
//! ```rust,ignore
//! use cache_rs::concurrent::ConcurrentSlruCache;
//! use cache_rs::config::{ConcurrentSlruCacheConfig, ConcurrentCacheConfig, SlruCacheConfig};
//! use std::num::NonZeroUsize;
//! use std::sync::Arc;
//! use std::thread;
//!
//! // Total capacity 10000, protected segment 2000 (20%)
//! let config: ConcurrentSlruCacheConfig = ConcurrentCacheConfig {
//!     base: SlruCacheConfig {
//!         capacity: NonZeroUsize::new(10_000).unwrap(),
//!         protected_capacity: NonZeroUsize::new(2_000).unwrap(),
//!         max_size: u64::MAX,
//!     },
//!     segments: 16,
//! };
//! let cache = Arc::new(ConcurrentSlruCache::init(config, None));
//!
//! // Thread 1: Establish hot items
//! let cache1 = Arc::clone(&cache);
//! let hot_handle = thread::spawn(move || {
//!     for i in 0..100 {
//!         let key = format!("hot-{}", i);
//!         cache1.put(key.clone(), i);
//!         // Second access promotes to protected
//!         let _ = cache1.get(&key);
//!     }
//! });
//!
//! // Thread 2: Sequential scan (shouldn't evict hot items)
//! let cache2 = Arc::clone(&cache);
//! let scan_handle = thread::spawn(move || {
//!     for i in 0..50000 {
//!         cache2.put(format!("scan-{}", i), i as i32);
//!         // No second access - stays in probationary
//!     }
//! });
//!
//! hot_handle.join().unwrap();
//! scan_handle.join().unwrap();
//!
//! // Hot items should still be accessible (protected from scan)
//! for i in 0..100 {
//!     assert!(cache.get(&format!("hot-{}", i)).is_some());
//! }
//! ```
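//!
//! # Metrics
//!
//! Per-shard counters can be aggregated through the [`CacheMetrics`](crate::metrics::CacheMetrics)
//! trait. A minimal sketch, continuing with the `cache` from the example above:
//!
//! ```rust,ignore
//! use cache_rs::metrics::CacheMetrics;
//!
//! let metrics = cache.metrics(); // values summed across all shards
//! println!("{}: {:?}", cache.algorithm_name(), metrics);
//! ```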

extern crate alloc;

use crate::metrics::CacheMetrics;
use crate::slru::SlruSegment;
use alloc::boxed::Box;
use alloc::collections::BTreeMap;
use alloc::string::String;
use alloc::vec::Vec;
use core::borrow::Borrow;
use core::hash::{BuildHasher, Hash};
use core::num::NonZeroUsize;
use parking_lot::Mutex;

#[cfg(feature = "hashbrown")]
use hashbrown::DefaultHashBuilder;

#[cfg(not(feature = "hashbrown"))]
use std::collections::hash_map::RandomState as DefaultHashBuilder;

/// A thread-safe SLRU cache with segmented storage for high concurrency.
pub struct ConcurrentSlruCache<K, V, S = DefaultHashBuilder> {
    /// One independently locked SLRU shard per stripe; keys map to a shard by hash.
    segments: Box<[Mutex<SlruSegment<K, V, S>>]>,
    /// Hasher used for shard selection; a clone is handed to each shard.
    hash_builder: S,
}

impl<K, V> ConcurrentSlruCache<K, V, DefaultHashBuilder>
where
    K: Hash + Eq + Clone + Send,
    V: Clone + Send,
{
    /// Creates a new concurrent SLRU cache from a configuration.
    ///
    /// This is the **recommended** way to create a concurrent SLRU cache.
    ///
    /// # Arguments
    /// * `config` - The cache configuration
    /// * `hasher` - Optional custom hasher. If `None`, uses the default hasher.
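    ///
    /// # Example
    ///
    /// A minimal construction sketch (mirroring the module-level example):
    ///
    /// ```rust,ignore
    /// use cache_rs::concurrent::ConcurrentSlruCache;
    /// use cache_rs::config::{ConcurrentCacheConfig, SlruCacheConfig};
    /// use std::num::NonZeroUsize;
    ///
    /// let config = ConcurrentCacheConfig {
    ///     base: SlruCacheConfig {
    ///         capacity: NonZeroUsize::new(1_024).unwrap(),
    ///         protected_capacity: NonZeroUsize::new(256).unwrap(),
    ///         max_size: u64::MAX,
    ///     },
    ///     segments: 8,
    /// };
    /// let cache: ConcurrentSlruCache<String, i32> = ConcurrentSlruCache::init(config, None);
    /// ```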
    pub fn init(
        config: crate::config::ConcurrentSlruCacheConfig,
        hasher: Option<DefaultHashBuilder>,
    ) -> Self {
        let segment_count = config.segments;
        let capacity = config.base.capacity;
        let protected_capacity = config.base.protected_capacity;
        let max_size = config.base.max_size;

        let hash_builder = hasher.unwrap_or_default();

        // Split the global budgets evenly across shards; every shard gets at least one slot.
        let segment_capacity = capacity.get() / segment_count;
        let segment_protected = protected_capacity.get() / segment_count;
        let segment_cap = NonZeroUsize::new(segment_capacity.max(1)).unwrap();
        let segment_protected_cap = NonZeroUsize::new(segment_protected.max(1)).unwrap();
        let segment_max_size = max_size / segment_count as u64;

        let segments: Vec<_> = (0..segment_count)
            .map(|_| {
                let segment_config = crate::config::SlruCacheConfig {
                    capacity: segment_cap,
                    protected_capacity: segment_protected_cap,
                    max_size: segment_max_size,
                };
                Mutex::new(SlruSegment::init(segment_config, hash_builder.clone()))
            })
            .collect();

        Self {
            segments: segments.into_boxed_slice(),
            hash_builder,
        }
    }
}

impl<K, V, S> ConcurrentSlruCache<K, V, S>
where
    K: Hash + Eq + Clone + Send,
    V: Clone + Send,
    S: BuildHasher + Clone + Send,
{
    /// Maps a key to its shard: hash the key, then take the hash modulo the shard count.
    #[inline]
    fn segment_index<Q>(&self, key: &Q) -> usize
    where
        K: Borrow<Q>,
        Q: ?Sized + Hash,
    {
        (self.hash_builder.hash_one(key) as usize) % self.segments.len()
    }

    /// Returns the total capacity across all segments.
    ///
    /// The configured capacity is split across shards with integer division (with a
    /// minimum of one slot per shard), so the sum reported here can differ slightly
    /// from the configured value.
    pub fn capacity(&self) -> usize {
        self.segments.iter().map(|s| s.lock().cap().get()).sum()
    }

    /// Returns the number of segments in the cache.
    pub fn segment_count(&self) -> usize {
        self.segments.len()
    }

    /// Returns the total number of entries across all segments.
    ///
    /// Shards are locked one at a time, so the result is a best-effort snapshot while
    /// other threads are modifying the cache.
    pub fn len(&self) -> usize {
        self.segments.iter().map(|s| s.lock().len()).sum()
    }

    /// Returns `true` if the cache contains no entries.
    pub fn is_empty(&self) -> bool {
        self.segments.iter().all(|s| s.lock().is_empty())
    }

    /// Gets a value from the cache.
    ///
    /// The value is cloned so the shard lock is released before returning. For
    /// clone-free access, use [`get_with`](Self::get_with) instead.
    pub fn get<Q>(&self, key: &Q) -> Option<V>
    where
        K: Borrow<Q>,
        Q: ?Sized + Hash + Eq,
    {
        let idx = self.segment_index(key);
        let mut segment = self.segments[idx].lock();
        segment.get(key).cloned()
    }

    /// Gets a value and applies a function to it while holding the shard lock.
    ///
    /// This is more efficient than [`get`](Self::get) when you only need to read from
    /// the value, as it avoids cloning. Avoid calling back into the cache from `f`:
    /// the shard lock is held for the duration of the call.
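    ///
    /// # Example
    ///
    /// A minimal sketch (assuming a `ConcurrentSlruCache<String, String>` named `cache`):
    ///
    /// ```rust,ignore
    /// cache.put("key".to_string(), "hello world".to_string());
    /// // Read the length without cloning the stored String.
    /// let len = cache.get_with("key", |v| v.len());
    /// assert_eq!(len, Some(11));
    /// ```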
    pub fn get_with<Q, F, R>(&self, key: &Q, f: F) -> Option<R>
    where
        K: Borrow<Q>,
        Q: ?Sized + Hash + Eq,
        F: FnOnce(&V) -> R,
    {
        let idx = self.segment_index(key);
        let mut segment = self.segments[idx].lock();
        segment.get(key).map(f)
    }

    /// Inserts a key-value pair into the cache.
    ///
    /// New items enter the probationary segment and are promoted to the protected
    /// segment on subsequent access.
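    ///
    /// # Example
    ///
    /// A minimal sketch (assuming a `ConcurrentSlruCache<String, i32>` named `cache`):
    ///
    /// ```rust,ignore
    /// cache.put("k".to_string(), 1);       // enters the probationary segment
    /// let _ = cache.get(&"k".to_string()); // a later hit promotes it to protected
    /// ```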
    pub fn put(&self, key: K, value: V) -> Option<(K, V)> {
        let idx = self.segment_index(&key);
        let mut segment = self.segments[idx].lock();
        segment.put(key, value)
    }

    /// Inserts a key-value pair with an explicit entry size (in the same units as
    /// `max_size`) used for size-based accounting.
    pub fn put_with_size(&self, key: K, value: V, size: u64) -> Option<(K, V)> {
        let idx = self.segment_index(&key);
        let mut segment = self.segments[idx].lock();
        segment.put_with_size(key, value, size)
    }

    /// Removes a key from the cache, returning the value if it existed.
    pub fn remove<Q>(&self, key: &Q) -> Option<V>
    where
        K: Borrow<Q>,
        Q: ?Sized + Hash + Eq,
    {
        let idx = self.segment_index(key);
        let mut segment = self.segments[idx].lock();
        segment.remove(key)
    }

    /// Returns `true` if the cache contains the specified key.
    ///
    /// Note: this is implemented in terms of the segment's `get`, so it counts as an
    /// access for the SLRU promotion logic.
    pub fn contains_key<Q>(&self, key: &Q) -> bool
    where
        K: Borrow<Q>,
        Q: ?Sized + Hash + Eq,
    {
        let idx = self.segment_index(key);
        let mut segment = self.segments[idx].lock();
        segment.get(key).is_some()
    }

    /// Clears all entries from the cache.
    pub fn clear(&self) {
        for segment in self.segments.iter() {
            segment.lock().clear();
        }
    }

    /// Returns the current total size of cached content across all segments.
    pub fn current_size(&self) -> u64 {
        self.segments.iter().map(|s| s.lock().current_size()).sum()
    }

    /// Returns the maximum content size the cache can hold across all segments.
    ///
    /// As with [`capacity`](Self::capacity), integer division during initialization
    /// means this can be slightly less than the configured `max_size`.
    pub fn max_size(&self) -> u64 {
        self.segments.iter().map(|s| s.lock().max_size()).sum()
    }
}

impl<K, V, S> CacheMetrics for ConcurrentSlruCache<K, V, S>
where
    K: Hash + Eq + Clone + Send,
    V: Clone + Send,
    S: BuildHasher + Clone + Send,
{
    fn metrics(&self) -> BTreeMap<String, f64> {
        // Sum each shard's metric values into a single map keyed by metric name.
        let mut aggregated = BTreeMap::new();
        for segment in self.segments.iter() {
            let segment_metrics = segment.lock().metrics().metrics();
            for (key, value) in segment_metrics {
                *aggregated.entry(key).or_insert(0.0) += value;
            }
        }
        aggregated
    }

    fn algorithm_name(&self) -> &'static str {
        "ConcurrentSLRU"
    }
}

// SAFETY: shard contents are only reached through their `parking_lot::Mutex`es, and the
// top-level `hash_builder` is accessed through `&self` only, so sharing the cache across
// threads reduces to the `Send`/`Sync` bounds below.
unsafe impl<K: Send, V: Send, S: Send> Send for ConcurrentSlruCache<K, V, S> {}
unsafe impl<K: Send, V: Send, S: Send + Sync> Sync for ConcurrentSlruCache<K, V, S> {}

impl<K, V, S> core::fmt::Debug for ConcurrentSlruCache<K, V, S>
where
    K: Hash + Eq + Clone + Send,
    V: Clone + Send,
    S: BuildHasher + Clone + Send,
{
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.debug_struct("ConcurrentSlruCache")
            .field("segment_count", &self.segments.len())
            .field("total_len", &self.len())
            .finish()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{ConcurrentCacheConfig, ConcurrentSlruCacheConfig, SlruCacheConfig};

    extern crate std;
    use std::string::ToString;
    use std::sync::Arc;
    use std::thread;
    use std::vec::Vec;

    fn make_config(
        capacity: usize,
        protected: usize,
        segments: usize,
    ) -> ConcurrentSlruCacheConfig {
        ConcurrentCacheConfig {
            base: SlruCacheConfig {
                capacity: NonZeroUsize::new(capacity).unwrap(),
                protected_capacity: NonZeroUsize::new(protected).unwrap(),
                max_size: u64::MAX,
            },
            segments,
        }
    }

    #[test]
    fn test_basic_operations() {
        let cache: ConcurrentSlruCache<String, i32> =
            ConcurrentSlruCache::init(make_config(100, 50, 16), None);

        cache.put("a".to_string(), 1);
        cache.put("b".to_string(), 2);

        assert_eq!(cache.get(&"a".to_string()), Some(1));
        assert_eq!(cache.get(&"b".to_string()), Some(2));
    }

    #[test]
    fn test_concurrent_access() {
        let cache: Arc<ConcurrentSlruCache<String, i32>> =
            Arc::new(ConcurrentSlruCache::init(make_config(1000, 500, 16), None));
        let num_threads = 8;
        let ops_per_thread = 500;

        let mut handles: Vec<std::thread::JoinHandle<()>> = Vec::new();

        for t in 0..num_threads {
            let cache = Arc::clone(&cache);
            handles.push(thread::spawn(move || {
                for i in 0..ops_per_thread {
                    let key = std::format!("key_{}_{}", t, i);
                    cache.put(key.clone(), i);
                    let _ = cache.get(&key);
                }
            }));
        }

        for handle in handles {
            handle.join().unwrap();
        }

        assert!(!cache.is_empty());
    }

    #[test]
    fn test_capacity() {
        let cache: ConcurrentSlruCache<String, i32> =
            ConcurrentSlruCache::init(make_config(100, 50, 16), None);

        // Capacity is distributed across segments
        let capacity = cache.capacity();
        assert!(capacity >= 16);
        assert!(capacity <= 100);
    }

    #[test]
    fn test_segment_count() {
        let cache: ConcurrentSlruCache<String, i32> =
            ConcurrentSlruCache::init(make_config(100, 50, 8), None);

        assert_eq!(cache.segment_count(), 8);
    }

    #[test]
    fn test_len_and_is_empty() {
        let cache: ConcurrentSlruCache<String, i32> =
            ConcurrentSlruCache::init(make_config(100, 50, 16), None);

        assert!(cache.is_empty());
        assert_eq!(cache.len(), 0);

        cache.put("key1".to_string(), 1);
        assert_eq!(cache.len(), 1);
        assert!(!cache.is_empty());

        cache.put("key2".to_string(), 2);
        assert_eq!(cache.len(), 2);
    }

    #[test]
    fn test_remove() {
        let cache: ConcurrentSlruCache<String, i32> =
            ConcurrentSlruCache::init(make_config(100, 50, 16), None);

        cache.put("key1".to_string(), 1);
        cache.put("key2".to_string(), 2);

        assert_eq!(cache.remove(&"key1".to_string()), Some(1));
        assert_eq!(cache.len(), 1);
        assert_eq!(cache.get(&"key1".to_string()), None);

        assert_eq!(cache.remove(&"nonexistent".to_string()), None);
    }

    #[test]
    fn test_clear() {
        let cache: ConcurrentSlruCache<String, i32> =
            ConcurrentSlruCache::init(make_config(100, 50, 16), None);

        cache.put("key1".to_string(), 1);
        cache.put("key2".to_string(), 2);
        cache.put("key3".to_string(), 3);

        assert_eq!(cache.len(), 3);

        cache.clear();

        assert_eq!(cache.len(), 0);
        assert!(cache.is_empty());
        assert_eq!(cache.get(&"key1".to_string()), None);
    }

    #[test]
    fn test_contains_key() {
        let cache: ConcurrentSlruCache<String, i32> =
            ConcurrentSlruCache::init(make_config(100, 50, 16), None);

        cache.put("exists".to_string(), 1);

        assert!(cache.contains_key(&"exists".to_string()));
        assert!(!cache.contains_key(&"missing".to_string()));
    }

    #[test]
    fn test_get_with() {
        let cache: ConcurrentSlruCache<String, String> =
            ConcurrentSlruCache::init(make_config(100, 50, 16), None);

        cache.put("key".to_string(), "hello world".to_string());

        let len = cache.get_with(&"key".to_string(), |v: &String| v.len());
        assert_eq!(len, Some(11));

        let missing = cache.get_with(&"missing".to_string(), |v: &String| v.len());
        assert_eq!(missing, None);
    }

    #[test]
    fn test_eviction_on_capacity() {
        let cache: ConcurrentSlruCache<String, i32> =
            ConcurrentSlruCache::init(make_config(80, 48, 16), None);

        // Insert more items than the total capacity
        for i in 0..200 {
            cache.put(std::format!("key{}", i), i);
        }

        // Cache should not exceed capacity
        assert!(cache.len() <= 80);
    }

    #[test]
    fn test_promotion_to_protected() {
        let cache: ConcurrentSlruCache<String, i32> =
            ConcurrentSlruCache::init(make_config(160, 80, 16), None);

        cache.put("key".to_string(), 1);

        // Access multiple times to promote to protected segment
        for _ in 0..3 {
            let _ = cache.get(&"key".to_string());
        }

        // Item should still be accessible
        assert_eq!(cache.get(&"key".to_string()), Some(1));
    }

    #[test]
    fn test_metrics() {
        let cache: ConcurrentSlruCache<String, i32> =
            ConcurrentSlruCache::init(make_config(100, 50, 16), None);

        cache.put("a".to_string(), 1);
        cache.put("b".to_string(), 2);

        let metrics = cache.metrics();
        // Metrics aggregation across segments
        assert!(!metrics.is_empty());
    }

    #[test]
    fn test_algorithm_name() {
        let cache: ConcurrentSlruCache<String, i32> =
            ConcurrentSlruCache::init(make_config(100, 50, 16), None);

        assert_eq!(cache.algorithm_name(), "ConcurrentSLRU");
    }

    #[test]
    fn test_empty_cache_operations() {
        let cache: ConcurrentSlruCache<String, i32> =
            ConcurrentSlruCache::init(make_config(100, 50, 16), None);

        assert!(cache.is_empty());
        assert_eq!(cache.len(), 0);
        assert_eq!(cache.get(&"missing".to_string()), None);
        assert_eq!(cache.remove(&"missing".to_string()), None);
        assert!(!cache.contains_key(&"missing".to_string()));
    }

    #[test]
    fn test_borrowed_key_lookup() {
        let cache: ConcurrentSlruCache<String, i32> =
            ConcurrentSlruCache::init(make_config(100, 50, 16), None);

        cache.put("test_key".to_string(), 42);

        // Test with borrowed key
        let key_str = "test_key";
        assert_eq!(cache.get(key_str), Some(42));
        assert!(cache.contains_key(key_str));
        assert_eq!(cache.remove(key_str), Some(42));
    }
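
    // Added sketch: basic coverage for `put_with_size`, assuming an entry inserted with
    // an explicit size behaves like any other entry for lookup purposes.
    #[test]
    fn test_put_with_size() {
        let cache: ConcurrentSlruCache<String, i32> =
            ConcurrentSlruCache::init(make_config(100, 50, 16), None);

        cache.put_with_size("sized".to_string(), 7, 10);

        assert_eq!(cache.get(&"sized".to_string()), Some(7));
        assert_eq!(cache.len(), 1);
    }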
}