// cache_rs/concurrent/slru.rs

//! Concurrent SLRU Cache Implementation
//!
//! A thread-safe Segmented LRU cache using lock striping (segmented storage) for
//! high-performance concurrent access. This is the multi-threaded counterpart to
//! [`SlruCache`](crate::SlruCache).
//!
//! # How It Works
//!
//! SLRU maintains two segments per shard: **probationary** (for new items) and
//! **protected** (for frequently accessed items). Items enter probationary and
//! are promoted to protected on subsequent access.
//!
//! The cache partitions keys across multiple independent shards using hash-based sharding.
//!
//! ```text
//! ┌──────────────────────────────────────────────────────────────────────────────┐
//! │                        ConcurrentSlruCache                                   │
//! │                                                                              │
//! │  hash(key) % N  ──▶  Shard Selection                                         │
//! │                                                                              │
//! │  ┌────────────────────┐ ┌────────────────────┐     ┌────────────────────┐    │
//! │  │     Shard 0        │ │     Shard 1        │ ... │    Shard N-1       │    │
//! │  │  ┌──────────────┐  │ │  ┌──────────────┐  │     │  ┌──────────────┐  │    │
//! │  │  │    Mutex     │  │ │  │    Mutex     │  │     │  │    Mutex     │  │    │
//! │  │  └──────┬───────┘  │ │  └──────┬───────┘  │     │  └──────┬───────┘  │    │
//! │  │         │          │ │         │          │     │         │          │    │
//! │  │  ┌──────▼───────┐  │ │  ┌──────▼───────┐  │     │  ┌──────▼───────┐  │    │
//! │  │  │ Protected    │  │ │  │ Protected    │  │     │  │ Protected    │  │    │
//! │  │  │ [hot items]  │  │ │  │ [hot items]  │  │     │  │ [hot items]  │  │    │
//! │  │  ├──────────────┤  │ │  ├──────────────┤  │     │  ├──────────────┤  │    │
//! │  │  │ Probationary │  │ │  │ Probationary │  │     │  │ Probationary │  │    │
//! │  │  │ [new items]  │  │ │  │ [new items]  │  │     │  │ [new items]  │  │    │
//! │  │  └──────────────┘  │ │  └──────────────┘  │     │  └──────────────┘  │    │
//! │  └────────────────────┘ └────────────────────┘     └────────────────────┘    │
//! └──────────────────────────────────────────────────────────────────────────────┘
//! ```
//!
//! ## Segment Structure
//!
//! Each shard contains:
//! - **Probationary segment**: New items enter here (default: 80% of shard capacity)
//! - **Protected segment**: Items promoted on second access (default: 20% of shard capacity)
//!
//! ## Trade-offs
//!
//! - **Pros**: Near-linear scaling, excellent scan resistance per shard
//! - **Cons**: Protection is per-shard, not global. An item hot in shard A
//!   doesn't influence protection in shard B.
//!
//! # Performance Characteristics
//!
//! | Metric | Value |
//! |--------|-------|
//! | Get/Put/Remove | O(1) average |
//! | Concurrency | Near-linear scaling up to shard count |
//! | Memory overhead | ~140 bytes per entry + one Mutex per shard |
//! | Scan resistance | Good (two-tier protection) |
//!
//! # When to Use
//!
//! **Use ConcurrentSlruCache when:**
//! - Multiple threads need cache access
//! - Workload mixes hot items with sequential scans
//! - You need scan resistance in a concurrent environment
//! - Some items are accessed repeatedly while others are one-time
//!
//! **Consider alternatives when:**
//! - Single-threaded access only → use `SlruCache`
//! - Pure recency patterns → use `ConcurrentLruCache` (simpler)
//! - Frequency-dominant patterns → use `ConcurrentLfuCache`
//! - Need global protection coordination → use `Mutex<SlruCache>`
//!
//! # Thread Safety
//!
//! `ConcurrentSlruCache` is `Send + Sync` and can be shared via `Arc`.
//!
//! # Example
//!
//! ```rust,ignore
//! use cache_rs::concurrent::ConcurrentSlruCache;
//! use cache_rs::config::{ConcurrentSlruCacheConfig, ConcurrentCacheConfig, SlruCacheConfig};
//! use std::num::NonZeroUsize;
//! use std::sync::Arc;
//! use std::thread;
//!
//! // Total capacity 10000, protected segment 2000 (20%)
//! let config: ConcurrentSlruCacheConfig = ConcurrentCacheConfig {
//!     base: SlruCacheConfig {
//!         capacity: NonZeroUsize::new(10_000).unwrap(),
//!         protected_capacity: NonZeroUsize::new(2_000).unwrap(),
//!         max_size: u64::MAX,
//!     },
//!     segments: 16,
//! };
//! let cache = Arc::new(ConcurrentSlruCache::init(config, None));
//!
//! // Thread 1: Establish hot items
//! let cache1 = Arc::clone(&cache);
//! let hot_handle = thread::spawn(move || {
//!     for i in 0..100 {
//!         let key = format!("hot-{}", i);
//!         // `put` takes (key, value, size); use size 1 for count-based caching
//!         cache1.put(key.clone(), i, 1);
//!         // Second access promotes to protected
//!         let _ = cache1.get(&key);
//!     }
//! });
//!
//! // Thread 2: Sequential scan (shouldn't evict hot items)
//! let cache2 = Arc::clone(&cache);
//! let scan_handle = thread::spawn(move || {
//!     for i in 0..50000 {
//!         cache2.put(format!("scan-{}", i), i as i32, 1);
//!         // No second access - stays in probationary
//!     }
//! });
//!
//! hot_handle.join().unwrap();
//! scan_handle.join().unwrap();
//!
//! // Hot items should still be accessible (protected from scan)
//! for i in 0..100 {
//!     assert!(cache.get(&format!("hot-{}", i)).is_some());
//! }
//! ```

extern crate alloc;

use alloc::boxed::Box;
use alloc::collections::BTreeMap;
use alloc::string::String;
use alloc::vec::Vec;
use core::borrow::Borrow;
use core::hash::{BuildHasher, Hash};
use core::num::NonZeroUsize;

use parking_lot::Mutex;

use crate::metrics::CacheMetrics;
use crate::slru::SlruInner;

#[cfg(feature = "hashbrown")]
use hashbrown::DefaultHashBuilder;

#[cfg(not(feature = "hashbrown"))]
use std::collections::hash_map::RandomState as DefaultHashBuilder;
/// A thread-safe SLRU cache with segmented storage for high concurrency.
pub struct ConcurrentSlruCache<K, V, S = DefaultHashBuilder> {
    // One independently locked SLRU shard per segment; a key is always routed
    // to the same shard via `hash(key) % segments.len()`.
    segments: Box<[Mutex<SlruInner<K, V, S>>]>,
    // Hasher used for shard selection; a clone of it is also handed to each shard.
    hash_builder: S,
}

151impl<K, V> ConcurrentSlruCache<K, V, DefaultHashBuilder>
152where
153    K: Hash + Eq + Clone + Send,
154    V: Clone + Send,
155{
156    /// Creates a new concurrent SLRU cache from a configuration.
157    ///
158    /// This is the **recommended** way to create a concurrent SLRU cache.
159    ///
160    /// # Arguments
161    /// * `config` - The cache configuration
162    /// * `hasher` - Optional custom hasher. If `None`, uses the default hasher.
163    pub fn init(
164        config: crate::config::ConcurrentSlruCacheConfig,
165        hasher: Option<DefaultHashBuilder>,
166    ) -> Self {
167        let segment_count = config.segments;
168        let capacity = config.base.capacity;
169        let protected_capacity = config.base.protected_capacity;
170        let max_size = config.base.max_size;
171
172        let hash_builder = hasher.unwrap_or_default();
173
174        let segment_capacity = capacity.get() / segment_count;
175        let segment_protected = protected_capacity.get() / segment_count;
176        let segment_cap = NonZeroUsize::new(segment_capacity.max(1)).unwrap();
177        let segment_protected_cap = NonZeroUsize::new(segment_protected.max(1)).unwrap();
178        let segment_max_size = max_size / segment_count as u64;
179
180        let segments: Vec<_> = (0..segment_count)
181            .map(|_| {
182                let segment_config = crate::config::SlruCacheConfig {
183                    capacity: segment_cap,
184                    protected_capacity: segment_protected_cap,
185                    max_size: segment_max_size,
186                };
187                Mutex::new(SlruInner::init(segment_config, hash_builder.clone()))
188            })
189            .collect();
190
191        Self {
192            segments: segments.into_boxed_slice(),
193            hash_builder,
194        }
195    }
196}
197
198impl<K, V, S> ConcurrentSlruCache<K, V, S>
199where
200    K: Hash + Eq + Clone + Send,
201    V: Clone + Send,
202    S: BuildHasher + Clone + Send,
203{
204    #[inline]
205    fn segment_index<Q>(&self, key: &Q) -> usize
206    where
207        K: Borrow<Q>,
208        Q: ?Sized + Hash,
209    {
210        (self.hash_builder.hash_one(key) as usize) % self.segments.len()
211    }
212
213    /// Returns the total capacity across all segments.
214    pub fn capacity(&self) -> usize {
215        self.segments.iter().map(|s| s.lock().cap().get()).sum()
216    }
217
218    /// Returns the number of segments in the cache.
219    pub fn segment_count(&self) -> usize {
220        self.segments.len()
221    }
222
223    /// Returns the total number of entries across all segments.
224    pub fn len(&self) -> usize {
225        self.segments.iter().map(|s| s.lock().len()).sum()
226    }
227
228    /// Returns `true` if the cache contains no entries.
229    pub fn is_empty(&self) -> bool {
230        self.segments.iter().all(|s| s.lock().is_empty())
231    }
232
233    /// Gets a value from the cache.
234    ///
235    /// This clones the value to avoid holding the lock. For zero-copy access,
236    /// use `get_with()` instead.
237    pub fn get<Q>(&self, key: &Q) -> Option<V>
238    where
239        K: Borrow<Q>,
240        Q: ?Sized + Hash + Eq,
241    {
242        let idx = self.segment_index(key);
243        let mut segment = self.segments[idx].lock();
244        segment.get(key).cloned()
245    }
246
247    /// Gets a value and applies a function to it while holding the lock.
248    ///
249    /// This is more efficient than `get()` when you only need to read from the value,
250    /// as it avoids cloning.
251    pub fn get_with<Q, F, R>(&self, key: &Q, f: F) -> Option<R>
252    where
253        K: Borrow<Q>,
254        Q: ?Sized + Hash + Eq,
255        F: FnOnce(&V) -> R,
256    {
257        let idx = self.segment_index(key);
258        let mut segment = self.segments[idx].lock();
259        segment.get(key).map(f)
260    }
261
262    /// Inserts a key-value pair into the cache with optional size tracking.
263    ///
264    /// New items enter the probationary segment and are promoted to the protected
265    /// segment on subsequent access. Use `SIZE_UNIT` (1) for count-based caching.
266    pub fn put(&self, key: K, value: V, size: u64) -> Option<Vec<(K, V)>> {
267        let idx = self.segment_index(&key);
268        let mut segment = self.segments[idx].lock();
269        segment.put(key, value, size)
270    }
271
272    /// Removes a key from the cache, returning the value if it existed.
273    pub fn remove<Q>(&self, key: &Q) -> Option<V>
274    where
275        K: Borrow<Q>,
276        Q: ?Sized + Hash + Eq,
277    {
278        let idx = self.segment_index(key);
279        let mut segment = self.segments[idx].lock();
280        segment.remove(key)
281    }
282
283    /// Clears all entries from the cache.
284    pub fn clear(&self) {
285        for segment in self.segments.iter() {
286            segment.lock().clear();
287        }
288    }
289
290    /// Returns the current total size of cached content across all segments.
291    pub fn current_size(&self) -> u64 {
292        self.segments.iter().map(|s| s.lock().current_size()).sum()
293    }
294
295    /// Returns the maximum content size the cache can hold across all segments.
296    pub fn max_size(&self) -> u64 {
297        self.segments.iter().map(|s| s.lock().max_size()).sum()
298    }
299
300    /// Checks if the cache contains a key without promoting it.
301    ///
302    /// This is a pure existence check that does **not** update the entry's recency
303    /// or promote it between segments.
304    ///
305    /// # Example
306    ///
307    /// ```rust,ignore
308    /// if cache.contains(&"key".to_string()) {
309    ///     println!("Key exists!");
310    /// }
311    /// ```
312    pub fn contains<Q>(&self, key: &Q) -> bool
313    where
314        K: Borrow<Q>,
315        Q: ?Sized + Hash + Eq,
316    {
317        let idx = self.segment_index(key);
318        let segment = self.segments[idx].lock();
319        segment.contains(key)
320    }
321
322    /// Returns a clone of the value without promoting or updating access metadata.
323    ///
324    /// Unlike [`get()`](Self::get), this does NOT promote the entry from
325    /// probationary to protected. Returns a cloned value because the internal
326    /// lock cannot be held across the return boundary.
327    ///
328    /// # Example
329    ///
330    /// ```rust,ignore
331    /// let value = cache.peek(&"key".to_string());
332    /// ```
333    pub fn peek<Q>(&self, key: &Q) -> Option<V>
334    where
335        K: Borrow<Q>,
336        Q: ?Sized + Hash + Eq,
337        V: Clone,
338    {
339        let idx = self.segment_index(key);
340        let segment = self.segments[idx].lock();
341        segment.peek(key).cloned()
342    }
343}
344
345impl<K, V, S> CacheMetrics for ConcurrentSlruCache<K, V, S>
346where
347    K: Hash + Eq + Clone + Send,
348    V: Clone + Send,
349    S: BuildHasher + Clone + Send,
350{
351    fn metrics(&self) -> BTreeMap<String, f64> {
352        let mut aggregated = BTreeMap::new();
353        for segment in self.segments.iter() {
354            let segment_metrics = segment.lock().metrics().metrics();
355            for (key, value) in segment_metrics {
356                *aggregated.entry(key).or_insert(0.0) += value;
357            }
358        }
359        aggregated
360    }
361
362    fn algorithm_name(&self) -> &'static str {
363        "ConcurrentSLRU"
364    }
365}
366
// SAFETY(review): these manual impls assert cross-thread safety that the
// compiler could not derive — presumably because SlruInner uses raw pointers
// internally (not visible from this file; confirm). Soundness rests on every
// shard being accessed only through its parking_lot::Mutex, and on
// SlruInner<K, V, S> actually being Send when K, V, S are Send — verify against
// crate::slru before relying on these bounds.
unsafe impl<K: Send, V: Send, S: Send> Send for ConcurrentSlruCache<K, V, S> {}
// Sync additionally requires S: Sync since the hash_builder field is read
// directly through `&self` (no lock) during shard selection.
unsafe impl<K: Send, V: Send, S: Send + Sync> Sync for ConcurrentSlruCache<K, V, S> {}

370impl<K, V, S> core::fmt::Debug for ConcurrentSlruCache<K, V, S>
371where
372    K: Hash + Eq + Clone + Send,
373    V: Clone + Send,
374    S: BuildHasher + Clone + Send,
375{
376    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
377        f.debug_struct("ConcurrentSlruCache")
378            .field("segment_count", &self.segments.len())
379            .field("total_len", &self.len())
380            .finish()
381    }
382}
383
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{ConcurrentCacheConfig, ConcurrentSlruCacheConfig, SlruCacheConfig};

    extern crate std;
    use std::string::ToString;
    use std::sync::Arc;
    use std::thread;
    use std::vec::Vec;

    /// Builds a concurrent SLRU config with unlimited byte-size budget.
    fn make_config(
        capacity: usize,
        protected: usize,
        segments: usize,
    ) -> ConcurrentSlruCacheConfig {
        ConcurrentCacheConfig {
            base: SlruCacheConfig {
                capacity: NonZeroUsize::new(capacity).unwrap(),
                protected_capacity: NonZeroUsize::new(protected).unwrap(),
                max_size: u64::MAX,
            },
            segments,
        }
    }

    #[test]
    fn test_basic_operations() {
        let cache: ConcurrentSlruCache<String, i32> =
            ConcurrentSlruCache::init(make_config(100, 50, 16), None);

        cache.put("a".to_string(), 1, 1);
        cache.put("b".to_string(), 2, 1);

        assert_eq!(cache.get(&"a".to_string()), Some(1));
        assert_eq!(cache.get(&"b".to_string()), Some(2));
    }

    #[test]
    fn test_concurrent_access() {
        let cache: Arc<ConcurrentSlruCache<String, i32>> =
            Arc::new(ConcurrentSlruCache::init(make_config(1000, 500, 16), None));
        let num_threads = 8;
        let ops_per_thread = 500;

        let mut handles: Vec<std::thread::JoinHandle<()>> = Vec::new();

        for t in 0..num_threads {
            let cache = Arc::clone(&cache);
            handles.push(thread::spawn(move || {
                for i in 0..ops_per_thread {
                    let key = std::format!("key_{}_{}", t, i);
                    cache.put(key.clone(), i, 1);
                    let _ = cache.get(&key);
                }
            }));
        }

        for handle in handles {
            handle.join().unwrap();
        }

        assert!(!cache.is_empty());
    }

    #[test]
    fn test_capacity() {
        let cache: ConcurrentSlruCache<String, i32> =
            ConcurrentSlruCache::init(make_config(100, 50, 16), None);

        // Capacity is distributed across segments; integer division means the
        // aggregate can be anywhere between `segments` and the requested total.
        let capacity = cache.capacity();
        assert!(capacity >= 16);
        assert!(capacity <= 100);
    }

    #[test]
    fn test_segment_count() {
        let cache: ConcurrentSlruCache<String, i32> =
            ConcurrentSlruCache::init(make_config(100, 50, 8), None);

        assert_eq!(cache.segment_count(), 8);
    }

    #[test]
    fn test_len_and_is_empty() {
        let cache: ConcurrentSlruCache<String, i32> =
            ConcurrentSlruCache::init(make_config(100, 50, 16), None);

        assert!(cache.is_empty());
        assert_eq!(cache.len(), 0);

        cache.put("key1".to_string(), 1, 1);
        assert_eq!(cache.len(), 1);
        assert!(!cache.is_empty());

        cache.put("key2".to_string(), 2, 1);
        assert_eq!(cache.len(), 2);
    }

    #[test]
    fn test_remove() {
        let cache: ConcurrentSlruCache<String, i32> =
            ConcurrentSlruCache::init(make_config(100, 50, 16), None);

        cache.put("key1".to_string(), 1, 1);
        cache.put("key2".to_string(), 2, 1);

        assert_eq!(cache.remove(&"key1".to_string()), Some(1));
        assert_eq!(cache.len(), 1);
        assert_eq!(cache.get(&"key1".to_string()), None);

        assert_eq!(cache.remove(&"nonexistent".to_string()), None);
    }

    #[test]
    fn test_clear() {
        let cache: ConcurrentSlruCache<String, i32> =
            ConcurrentSlruCache::init(make_config(100, 50, 16), None);

        cache.put("key1".to_string(), 1, 1);
        cache.put("key2".to_string(), 2, 1);
        cache.put("key3".to_string(), 3, 1);

        assert_eq!(cache.len(), 3);

        cache.clear();

        assert_eq!(cache.len(), 0);
        assert!(cache.is_empty());
        assert_eq!(cache.get(&"key1".to_string()), None);
    }

    #[test]
    fn test_contains_key() {
        let cache: ConcurrentSlruCache<String, i32> =
            ConcurrentSlruCache::init(make_config(100, 50, 16), None);

        cache.put("exists".to_string(), 1, 1);

        assert!(cache.contains(&"exists".to_string()));
        assert!(!cache.contains(&"missing".to_string()));
    }

    #[test]
    fn test_get_with() {
        let cache: ConcurrentSlruCache<String, String> =
            ConcurrentSlruCache::init(make_config(100, 50, 16), None);

        cache.put("key".to_string(), "hello world".to_string(), 1);

        let len = cache.get_with(&"key".to_string(), |v: &String| v.len());
        assert_eq!(len, Some(11));

        let missing = cache.get_with(&"missing".to_string(), |v: &String| v.len());
        assert_eq!(missing, None);
    }

    #[test]
    fn test_eviction_on_capacity() {
        let cache: ConcurrentSlruCache<String, i32> =
            ConcurrentSlruCache::init(make_config(80, 48, 16), None);

        // Insert well beyond total capacity so eviction must actually occur.
        // (Inserting fewer items than the capacity would make the assertion
        // below vacuously true and leave eviction untested.)
        for i in 0..200 {
            cache.put(std::format!("key{}", i), i, 1);
        }

        // Cache should not exceed capacity
        assert!(cache.len() <= 80);
    }

    #[test]
    fn test_promotion_to_protected() {
        let cache: ConcurrentSlruCache<String, i32> =
            ConcurrentSlruCache::init(make_config(160, 80, 16), None);

        cache.put("key".to_string(), 1, 1);

        // Access multiple times to promote to protected segment
        for _ in 0..3 {
            let _ = cache.get(&"key".to_string());
        }

        // Item should still be accessible
        assert_eq!(cache.get(&"key".to_string()), Some(1));
    }

    #[test]
    fn test_metrics() {
        let cache: ConcurrentSlruCache<String, i32> =
            ConcurrentSlruCache::init(make_config(100, 50, 16), None);

        cache.put("a".to_string(), 1, 1);
        cache.put("b".to_string(), 2, 1);

        let metrics = cache.metrics();
        // Metrics aggregation across segments
        assert!(!metrics.is_empty());
    }

    #[test]
    fn test_algorithm_name() {
        let cache: ConcurrentSlruCache<String, i32> =
            ConcurrentSlruCache::init(make_config(100, 50, 16), None);

        assert_eq!(cache.algorithm_name(), "ConcurrentSLRU");
    }

    #[test]
    fn test_empty_cache_operations() {
        let cache: ConcurrentSlruCache<String, i32> =
            ConcurrentSlruCache::init(make_config(100, 50, 16), None);

        assert!(cache.is_empty());
        assert_eq!(cache.len(), 0);
        assert_eq!(cache.get(&"missing".to_string()), None);
        assert_eq!(cache.remove(&"missing".to_string()), None);
        assert!(!cache.contains(&"missing".to_string()));
    }

    #[test]
    fn test_borrowed_key_lookup() {
        let cache: ConcurrentSlruCache<String, i32> =
            ConcurrentSlruCache::init(make_config(100, 50, 16), None);

        cache.put("test_key".to_string(), 42, 1);

        // Lookups accept &str via Borrow<str> on String keys
        let key_str = "test_key";
        assert_eq!(cache.get(key_str), Some(42));
        assert!(cache.contains(key_str));
        assert_eq!(cache.remove(key_str), Some(42));
    }

    #[test]
    fn test_contains_non_promoting() {
        let cache: ConcurrentSlruCache<String, i32> =
            ConcurrentSlruCache::init(make_config(100, 50, 16), None);

        cache.put("a".to_string(), 1, 1);
        cache.put("b".to_string(), 2, 1);

        // contains() should check without promoting
        assert!(cache.contains(&"a".to_string()));
        assert!(cache.contains(&"b".to_string()));
        assert!(!cache.contains(&"c".to_string()));
    }
}