Skip to main content

priority_lfu/
cache.rs

1#[cfg(feature = "metrics")]
2use std::sync::atomic::AtomicU64;
3use std::sync::atomic::{AtomicUsize, Ordering};
4
5use parking_lot::RwLock;
6
7use crate::erased::{Entry, ErasedKey, ErasedKeyRef};
8use crate::guard::Guard;
9#[cfg(feature = "metrics")]
10use crate::metrics::CacheMetrics;
11use crate::shard::Shard;
12use crate::traits::CacheKey;
13
/// Thread-safe cache with weight-stratified clock eviction.
///
/// The cache can be shared across threads via `Arc<Cache>`. All methods are synchronous
/// but safe to call from async contexts.
///
/// # Weight-Stratified Clock Eviction
///
/// This cache uses a two-level eviction strategy that combines policy-based prioritization
/// with recency and frequency tracking:
///
/// 1. **Policy-based stratification**: Each entry has an eviction policy (Critical, Standard, or
///    Volatile). When space is needed, lower-priority entries are evicted first:
///    - **Volatile** (lowest priority) - evicted first
///    - **Standard** (normal priority) - evicted if Volatile is empty
///    - **Critical** (highest priority) - evicted only as a last resort
///
/// 2. **Clock algorithm**: Within each policy bucket, a "clock sweep" finds eviction candidates.
///    The algorithm uses two signals:
///    - **Clock bit**: Set when an entry is accessed. During eviction, if set, the bit is cleared
///      and the entry gets another chance. If clear, the entry is a candidate.
///    - **Frequency counter** (0-255): Tracks access frequency. Even with a clear clock bit,
///      high-frequency entries get additional chances via frequency decay.
///
/// This approach ensures that frequently-accessed entries resist eviction regardless of policy,
/// while still respecting policy-based priorities during memory pressure.
///
/// # Sharding for Concurrency
///
/// The cache divides its capacity evenly across multiple shards (up to 64 by default). Each
/// shard has its own lock, reducing contention in concurrent workloads. Keys are distributed
/// to shards via the low bits of their hash value.
///
/// The shard count is always a power of two and is automatically scaled based on capacity,
/// targeting at least 4KB of space per shard. This prevents premature eviction due to uneven
/// hash distribution:
///
/// - Large caches (>= 256KB): 64 shards
/// - Smaller caches: Scaled down proportionally (e.g., 64KB → 16 shards, 4KB → 1 shard)
///
/// # Async Usage
///
/// ```ignore
/// let cache = Arc::new(Cache::new(1024 * 1024));
///
/// // Share across tasks
/// let cache_clone = cache.clone();
/// tokio::spawn(async move {
///     // Use get_clone() in async contexts to avoid holding guards across await points
///     if let Some(value) = cache_clone.get_clone(&key) {
///         do_async_work(&value).await;
///     }
/// });
/// ```
pub struct Cache {
	/// Sharded storage; every shard sits behind its own `RwLock`.
	shards: Vec<RwLock<Shard>>,
	/// Current total size in bytes across all shards.
	///
	/// Updated with relaxed atomic operations by `insert`, `remove` and `clear`.
	current_size: AtomicUsize,
	/// Total entry count across all shards (maintained alongside `current_size`).
	entry_count: AtomicUsize,
	/// Number of shards.
	///
	/// Always a power of two, so `shard_count - 1` can serve as a bit mask when
	/// mapping a hash to a shard.
	shard_count: usize,
	/// Maximum capacity in bytes.
	///
	/// Only read back when building `CacheMetrics`, hence the `dead_code` allowance
	/// when the `metrics` feature is disabled.
	#[cfg_attr(not(feature = "metrics"), allow(dead_code))]
	max_size_bytes: usize,
	/// Metrics: cache hits
	#[cfg(feature = "metrics")]
	hits: AtomicU64,
	/// Metrics: cache misses
	#[cfg(feature = "metrics")]
	misses: AtomicU64,
	/// Metrics: new inserts
	#[cfg(feature = "metrics")]
	inserts: AtomicU64,
	/// Metrics: updates (replaced existing key)
	#[cfg(feature = "metrics")]
	updates: AtomicU64,
	/// Metrics: evictions
	#[cfg(feature = "metrics")]
	evictions: AtomicU64,
	/// Metrics: explicit removals
	#[cfg(feature = "metrics")]
	removals: AtomicU64,
}
97
/// Minimum size per shard in bytes.
///
/// This ensures each shard has enough capacity to hold a reasonable number of entries,
/// preventing premature eviction due to hash distribution variance across shards.
/// With 4KB minimum, a shard can comfortably hold dozens of typical cache entries.
///
/// `compute_shard_count` divides the total capacity by this value to bound the
/// number of shards.
const MIN_SHARD_SIZE: usize = 4096;

/// Default number of shards for large caches.
///
/// More shards reduce contention but increase memory overhead.
/// This value is used when capacity is large enough to support it: `Cache::new`
/// passes it to `compute_shard_count` as the desired shard count.
///
/// It is a power of two, matching the mask-based shard selection in `Cache::get_shard`.
const DEFAULT_SHARD_COUNT: usize = 64;
110
111/// Compute the optimal shard count for a given capacity.
112///
113/// This ensures each shard has at least `MIN_SHARD_SIZE` bytes to prevent
114/// premature eviction due to uneven hash distribution.
115fn compute_shard_count(capacity: usize, desired_shards: usize) -> usize {
116	// Calculate max shards that maintain MIN_SHARD_SIZE per shard
117	let max_shards = (capacity / MIN_SHARD_SIZE).max(1);
118
119	// Use the smaller of desired and max, ensuring power of 2
120	desired_shards.min(max_shards).next_power_of_two().max(1)
121}
122
123impl Cache {
124	/// Create a new cache with the given maximum size in bytes.
125	///
126	/// Uses default configuration with automatic shard scaling. The number of shards
127	/// is chosen to balance concurrency (more shards = less contention) with per-shard
128	/// capacity (each shard needs enough space to avoid premature eviction).
129	///
130	/// - Large caches (>= 256KB): 64 shards
131	/// - Smaller caches: Scaled down to ensure at least 4KB per shard
132	///
133	/// For explicit control over shard count, use [`CacheBuilder`] or [`with_shards`].
134	pub fn new(max_size_bytes: usize) -> Self {
135		let shard_count = compute_shard_count(max_size_bytes, DEFAULT_SHARD_COUNT);
136		Self::with_shards_internal(max_size_bytes, shard_count)
137	}
138
139	/// Create with custom shard count.
140	///
141	/// More shards reduce contention but increase memory overhead.
142	/// Recommended: `num_cpus * 8` to `num_cpus * 16`.
143	///
144	/// **Note**: The shard count may be reduced if the capacity is too small to
145	/// support the requested number of shards (minimum 4KB per shard). This prevents
146	/// premature eviction due to uneven hash distribution.
147	pub fn with_shards(max_size_bytes: usize, shard_count: usize) -> Self {
148		let shard_count = compute_shard_count(max_size_bytes, shard_count);
149		Self::with_shards_internal(max_size_bytes, shard_count)
150	}
151
152	/// Internal constructor that uses the shard count directly (already validated).
153	fn with_shards_internal(max_size_bytes: usize, shard_count: usize) -> Self {
154		// Divide capacity per shard
155		let size_per_shard = max_size_bytes / shard_count;
156
157		// Create shards
158		let shards = (0..shard_count).map(|_| RwLock::new(Shard::new(size_per_shard))).collect();
159
160		Self {
161			shards,
162			current_size: AtomicUsize::new(0),
163			entry_count: AtomicUsize::new(0),
164			shard_count,
165			max_size_bytes,
166			#[cfg(feature = "metrics")]
167			hits: AtomicU64::new(0),
168			#[cfg(feature = "metrics")]
169			misses: AtomicU64::new(0),
170			#[cfg(feature = "metrics")]
171			inserts: AtomicU64::new(0),
172			#[cfg(feature = "metrics")]
173			updates: AtomicU64::new(0),
174			#[cfg(feature = "metrics")]
175			evictions: AtomicU64::new(0),
176			#[cfg(feature = "metrics")]
177			removals: AtomicU64::new(0),
178		}
179	}
180
181	/// Insert a key-value pair. Evicts items if necessary.
182	///
183	/// Returns the previous value if the key existed.
184	///
185	/// # Runtime Complexity
186	///
187	/// Expected case: O(1) for successful insertion without eviction.
188	///
189	/// Worst case: O(n) where n is the number of entries per shard.
190	/// Eviction happens via clock hand advancement within the shard.
191	pub fn insert<K: CacheKey>(&self, key: K, value: K::Value) -> Option<K::Value> {
192		let erased_key = ErasedKey::new(&key);
193		let policy = key.policy();
194		let entry = Entry::new(value, policy);
195		let entry_size = entry.size;
196
197		// Get the shard
198		let shard_lock = self.get_shard(erased_key.hash);
199
200		// Acquire write lock
201		let mut shard = shard_lock.write();
202
203		// Insert (handles eviction internally via Clock-PRO)
204		let (old_entry, (num_evictions, evicted_size)) = shard.insert(erased_key, entry);
205
206		if let Some(ref old) = old_entry {
207			// Update size (might be different)
208			let size_diff = entry_size as isize - old.size as isize;
209			if size_diff > 0 {
210				self.current_size.fetch_add(size_diff as usize, Ordering::Relaxed);
211			} else {
212				self.current_size.fetch_sub((-size_diff) as usize, Ordering::Relaxed);
213			}
214			// Metrics: track update
215			#[cfg(feature = "metrics")]
216			self.updates.fetch_add(1, Ordering::Relaxed);
217		} else {
218			// New entry
219			self.current_size.fetch_add(entry_size, Ordering::Relaxed);
220			self.entry_count.fetch_add(1, Ordering::Relaxed);
221			// Metrics: track insert
222			#[cfg(feature = "metrics")]
223			self.inserts.fetch_add(1, Ordering::Relaxed);
224		}
225
226		// Account for evictions
227		if num_evictions > 0 {
228			self.entry_count.fetch_sub(num_evictions, Ordering::Relaxed);
229			self.current_size.fetch_sub(evicted_size, Ordering::Relaxed);
230			// Metrics: track evictions
231			#[cfg(feature = "metrics")]
232			self.evictions.fetch_add(num_evictions as u64, Ordering::Relaxed);
233		}
234
235		old_entry.and_then(|e| e.into_value::<K::Value>())
236	}
237
238	/// Retrieve a value via guard. The guard holds a read lock on the shard.
239	///
240	/// # Warning
241	///
242	/// Do NOT hold this guard across `.await` points. Use `get_clone()` instead
243	/// for async contexts.
244	///
245	/// # Runtime Complexity
246	///
247	/// Expected case: O(1)
248	///
249	/// This method performs a zero-allocation hash table lookup and atomically
250	/// increments the reference counter for Clock-PRO.
251	pub fn get<K: CacheKey>(&self, key: &K) -> Option<Guard<'_, K::Value>> {
252		let key_ref = ErasedKeyRef::new(key); // Zero allocation
253		let shard_lock = self.get_shard(key_ref.hash);
254
255		// Acquire read lock
256		let shard = shard_lock.read();
257
258		// Look up entry (bumps reference counter internally, zero allocation)
259		let Some(entry) = shard.get_ref(&key_ref) else {
260			// Metrics: track miss
261			#[cfg(feature = "metrics")]
262			self.misses.fetch_add(1, Ordering::Relaxed);
263			return None;
264		};
265
266		// Metrics: track hit
267		#[cfg(feature = "metrics")]
268		self.hits.fetch_add(1, Ordering::Relaxed);
269
270		// Get pointer to value (use value_ref for type-safe downcast)
271		let value_ref = entry.value_ref::<K::Value>()?;
272		let value_ptr = value_ref as *const K::Value;
273
274		// SAFETY: We hold a read lock on the shard, so the entry won't be modified
275		// or dropped while the guard exists. The guard ties its lifetime to the lock.
276		unsafe { Some(Guard::new(shard, value_ptr)) }
277	}
278
279	/// Retrieve a cloned value. Safe to hold across `.await` points.
280	///
281	/// Requires `V: Clone`. This is the preferred method for async code.
282	///
283	/// # Runtime Complexity
284	///
285	/// Expected case: O(1) + O(m) where m is the cost of cloning the value.
286	///
287	/// This method performs a hash table lookup in O(1) expected time, then clones
288	/// the underlying value. If cloning is expensive, consider using `Arc<T>` as your
289	/// value type, which makes cloning O(1).
290	pub fn get_clone<K: CacheKey>(&self, key: &K) -> Option<K::Value>
291	where
292		K::Value: Clone,
293	{
294		let key_ref = ErasedKeyRef::new(key); // Zero allocation
295		let shard_lock = self.get_shard(key_ref.hash);
296
297		// Short-lived read lock
298		let shard = shard_lock.read();
299
300		let Some(entry) = shard.get_ref(&key_ref) else {
301			// Metrics: track miss
302			#[cfg(feature = "metrics")]
303			self.misses.fetch_add(1, Ordering::Relaxed);
304			return None;
305		};
306
307		// Metrics: track hit
308		#[cfg(feature = "metrics")]
309		self.hits.fetch_add(1, Ordering::Relaxed);
310
311		// Clone the value
312		entry.value_ref::<K::Value>().cloned()
313	}
314
315	/// Remove a key from the cache.
316	///
317	/// # Runtime Complexity
318	///
319	/// Expected case: O(1)
320	///
321	/// Worst case: O(n) where n is the number of entries per shard.
322	///
323	/// The worst case occurs when the entry is removed from an IndexMap segment,
324	/// which preserves insertion order and may require shifting elements. In practice,
325	/// most removals are O(1) amortized.
326	pub fn remove<K: CacheKey>(&self, key: &K) -> Option<K::Value> {
327		let erased_key = ErasedKey::new(key);
328		let shard_lock = self.get_shard(erased_key.hash);
329
330		let mut shard = shard_lock.write();
331		let entry = shard.remove(&erased_key)?;
332
333		self.current_size.fetch_sub(entry.size, Ordering::Relaxed);
334		self.entry_count.fetch_sub(1, Ordering::Relaxed);
335
336		// Metrics: track removal
337		#[cfg(feature = "metrics")]
338		self.removals.fetch_add(1, Ordering::Relaxed);
339
340		entry.into_value::<K::Value>()
341	}
342
343	/// Check if a key exists (zero allocation).
344	pub fn contains<K: CacheKey>(&self, key: &K) -> bool {
345		let key_ref = ErasedKeyRef::new(key); // Zero allocation
346		let shard_lock = self.get_shard(key_ref.hash);
347		let shard = shard_lock.read();
348
349		// Use get_ref for zero-allocation lookup
350		shard.get_ref(&key_ref).is_some()
351	}
352
353	/// Current total size in bytes.
354	pub fn size(&self) -> usize {
355		self.current_size.load(Ordering::Relaxed)
356	}
357
358	/// Number of entries across all types.
359	pub fn len(&self) -> usize {
360		self.entry_count.load(Ordering::Relaxed)
361	}
362
363	/// Check if cache is empty.
364	pub fn is_empty(&self) -> bool {
365		self.len() == 0
366	}
367
368	/// Clear all entries.
369	///
370	/// # Runtime Complexity
371	///
372	/// O(n) where n is the total number of entries in the cache.
373	///
374	/// This method acquires a write lock on each shard sequentially and clears
375	/// all data structures (HashMap and IndexMaps).
376	pub fn clear(&self) {
377		for shard_lock in &self.shards {
378			let mut shard = shard_lock.write();
379			shard.clear();
380		}
381		self.current_size.store(0, Ordering::Relaxed);
382		self.entry_count.store(0, Ordering::Relaxed);
383
384		// Reset all metrics
385		#[cfg(feature = "metrics")]
386		{
387			self.hits.store(0, Ordering::Relaxed);
388			self.misses.store(0, Ordering::Relaxed);
389			self.inserts.store(0, Ordering::Relaxed);
390			self.updates.store(0, Ordering::Relaxed);
391			self.evictions.store(0, Ordering::Relaxed);
392			self.removals.store(0, Ordering::Relaxed);
393		}
394	}
395
396	/// Get performance metrics snapshot.
397	///
398	/// Returns a snapshot of cache performance metrics including hits, misses,
399	/// evictions, and memory utilization.
400	///
401	/// # Example
402	///
403	/// ```
404	/// use priority_lfu::Cache;
405	///
406	/// let cache = Cache::new(1024 * 1024);
407	/// // ... perform cache operations ...
408	///
409	/// let metrics = cache.metrics();
410	/// println!("Hit rate: {:.2}%", metrics.hit_rate() * 100.0);
411	/// println!("Cache utilization: {:.2}%", metrics.utilization() * 100.0);
412	/// println!("Total evictions: {}", metrics.evictions);
413	/// ```
414	#[cfg(feature = "metrics")]
415	pub fn metrics(&self) -> CacheMetrics {
416		CacheMetrics {
417			hits: self.hits.load(Ordering::Relaxed),
418			misses: self.misses.load(Ordering::Relaxed),
419			inserts: self.inserts.load(Ordering::Relaxed),
420			updates: self.updates.load(Ordering::Relaxed),
421			evictions: self.evictions.load(Ordering::Relaxed),
422			removals: self.removals.load(Ordering::Relaxed),
423			current_size_bytes: self.current_size.load(Ordering::Relaxed),
424			capacity_bytes: self.max_size_bytes,
425			entry_count: self.entry_count.load(Ordering::Relaxed),
426		}
427	}
428
429	/// Get the shard for a given hash.
430	fn get_shard(&self, hash: u64) -> &RwLock<Shard> {
431		let index = (hash as usize) & (self.shard_count - 1);
432		&self.shards[index]
433	}
434}
435
// Thread safety: Cache can be shared across threads.
//
// SAFETY / NOTE(review): every field declared on `Cache` in this file is an atomic,
// a plain `usize`, or a `Vec<RwLock<Shard>>`. These manual impls are therefore only
// load-bearing if `Shard` is not itself `Send`/`Sync`; in that case soundness depends
// on the stored keys and values being safe to move and share across threads, which
// cannot be verified from this file — confirm against `Shard`/`Entry` and the bounds
// on `CacheKey::Value`.
unsafe impl Send for Cache {}
unsafe impl Sync for Cache {}
439
440#[cfg(test)]
441mod tests {
442	use super::*;
443	use crate::DeepSizeOf;
444
	/// Minimal key type for the tests in this module: a newtype over `u64`.
	#[derive(Hash, Eq, PartialEq, Clone, Debug)]
	struct TestKey(u64);

	// Only the associated value type is specified here; any other `CacheKey` items
	// (e.g. `policy()`, which `Cache::insert` calls) presumably come from trait
	// defaults — verify against the trait definition.
	impl CacheKey for TestKey {
		type Value = TestValue;
	}

	/// Value type cached under `TestKey`: a single owned string.
	#[derive(Clone, Debug, PartialEq, DeepSizeOf)]
	struct TestValue {
		data: String,
	}
456
457	#[test]
458	fn test_compute_shard_count_scales_with_capacity() {
459		// Very small capacity: should get 1 shard
460		assert_eq!(compute_shard_count(1024, 64), 1);
461		assert_eq!(compute_shard_count(4095, 64), 1);
462
463		// Exactly MIN_SHARD_SIZE: 1 shard
464		assert_eq!(compute_shard_count(4096, 64), 1);
465
466		// 2x MIN_SHARD_SIZE: 2 shards (power of 2)
467		assert_eq!(compute_shard_count(8192, 64), 2);
468
469		// 16x MIN_SHARD_SIZE: 16 shards
470		assert_eq!(compute_shard_count(65536, 64), 16);
471
472		// Large capacity: full 64 shards
473		assert_eq!(compute_shard_count(256 * 1024, 64), 64);
474		assert_eq!(compute_shard_count(1024 * 1024, 64), 64);
475
476		// Custom shard count is also bounded
477		assert_eq!(compute_shard_count(8192, 128), 2); // Can't have 128 shards with 8KB
478		assert_eq!(compute_shard_count(1024 * 1024, 128), 128); // Large cache can have 128
479	}
480
481	#[test]
482	fn test_cache_insert_and_get() {
483		let cache = Cache::new(1024);
484
485		let key = TestKey(1);
486		let value = TestValue {
487			data: "hello".to_string(),
488		};
489
490		cache.insert(key.clone(), value.clone());
491
492		let retrieved = cache.get_clone(&key).expect("key should exist");
493		assert_eq!(retrieved, value);
494	}
495
496	#[test]
497	fn test_cache_remove() {
498		let cache = Cache::new(1024);
499
500		let key = TestKey(1);
501		let value = TestValue {
502			data: "hello".to_string(),
503		};
504
505		cache.insert(key.clone(), value.clone());
506		assert!(cache.contains(&key));
507
508		let removed = cache.remove(&key).expect("key should exist");
509		assert_eq!(removed, value);
510		assert!(!cache.contains(&key));
511	}
512
513	#[test]
514	fn test_cache_eviction() {
515		// Small cache that will trigger eviction (use fewer shards for small capacity)
516		let cache = Cache::with_shards(1000, 4);
517
518		// Insert values that exceed capacity
519		for i in 0..15 {
520			let key = TestKey(i);
521			let value = TestValue {
522				data: "x".repeat(50),
523			};
524			cache.insert(key, value);
525		}
526
527		// Cache should have evicted some entries
528		assert!(cache.len() < 15, "Cache should have evicted some entries");
529		assert!(cache.size() <= 1000, "Cache size should be <= 1000, got {}", cache.size());
530	}
531
532	#[test]
533	fn test_cache_concurrent_access() {
534		use std::sync::Arc;
535		use std::thread;
536
537		let cache = Arc::new(Cache::new(10240));
538		let mut handles = vec![];
539
540		for t in 0..4 {
541			let cache = cache.clone();
542			handles.push(thread::spawn(move || {
543				for i in 0..100 {
544					let key = TestKey(t * 100 + i);
545					let value = TestValue {
546						data: format!("value-{}", i),
547					};
548					cache.insert(key.clone(), value.clone());
549
550					if let Some(retrieved) = cache.get_clone(&key) {
551						assert_eq!(retrieved, value);
552					}
553				}
554			}));
555		}
556
557		for handle in handles {
558			handle.join().expect("thread should not panic");
559		}
560
561		assert!(!cache.is_empty());
562	}
563
564	#[test]
565	fn test_cache_is_send_sync() {
566		fn assert_send<T: Send>() {}
567		fn assert_sync<T: Sync>() {}
568
569		assert_send::<Cache>();
570		assert_sync::<Cache>();
571	}
572}