1extern crate alloc;
107
108use crate::lfu::LfuSegment;
109use crate::metrics::CacheMetrics;
110use alloc::boxed::Box;
111use alloc::collections::BTreeMap;
112use alloc::string::String;
113use alloc::vec::Vec;
114use core::borrow::Borrow;
115use core::hash::{BuildHasher, Hash};
116use core::num::NonZeroUsize;
117use parking_lot::Mutex;
118
119#[cfg(feature = "hashbrown")]
120use hashbrown::DefaultHashBuilder;
121
122#[cfg(not(feature = "hashbrown"))]
123use std::collections::hash_map::RandomState as DefaultHashBuilder;
124
125pub struct ConcurrentLfuCache<K, V, S = DefaultHashBuilder> {
127 segments: Box<[Mutex<LfuSegment<K, V, S>>]>,
128 hash_builder: S,
129}
130
131impl<K, V> ConcurrentLfuCache<K, V, DefaultHashBuilder>
132where
133 K: Hash + Eq + Clone + Send,
134 V: Clone + Send,
135{
136 pub fn init(
144 config: crate::config::ConcurrentLfuCacheConfig,
145 hasher: Option<DefaultHashBuilder>,
146 ) -> Self {
147 let segment_count = config.segments;
148 let capacity = config.base.capacity;
149 let max_size = config.base.max_size;
150
151 let segment_capacity = capacity.get() / segment_count;
152 let segment_cap = NonZeroUsize::new(segment_capacity.max(1)).unwrap();
153 let segment_max_size = max_size / segment_count as u64;
154
155 let hash_builder = hasher.unwrap_or_default();
156
157 let segments: Vec<_> = (0..segment_count)
158 .map(|_| {
159 let segment_config = crate::config::LfuCacheConfig {
160 capacity: segment_cap,
161 max_size: segment_max_size,
162 };
163 Mutex::new(LfuSegment::init(segment_config, hash_builder.clone()))
164 })
165 .collect();
166
167 Self {
168 segments: segments.into_boxed_slice(),
169 hash_builder,
170 }
171 }
172}
173
174impl<K, V, S> ConcurrentLfuCache<K, V, S>
175where
176 K: Hash + Eq + Clone + Send,
177 V: Clone + Send,
178 S: BuildHasher + Clone + Send,
179{
180 #[inline]
181 fn segment_index<Q>(&self, key: &Q) -> usize
182 where
183 K: Borrow<Q>,
184 Q: ?Sized + Hash,
185 {
186 (self.hash_builder.hash_one(key) as usize) % self.segments.len()
187 }
188
189 pub fn capacity(&self) -> usize {
191 self.segments.iter().map(|s| s.lock().cap().get()).sum()
192 }
193
194 pub fn segment_count(&self) -> usize {
196 self.segments.len()
197 }
198
199 pub fn len(&self) -> usize {
201 self.segments.iter().map(|s| s.lock().len()).sum()
202 }
203
204 pub fn is_empty(&self) -> bool {
206 self.segments.iter().all(|s| s.lock().is_empty())
207 }
208
209 pub fn get<Q>(&self, key: &Q) -> Option<V>
214 where
215 K: Borrow<Q>,
216 Q: ?Sized + Hash + Eq,
217 {
218 let idx = self.segment_index(key);
219 let mut segment = self.segments[idx].lock();
220 segment.get(key).cloned()
221 }
222
223 pub fn get_with<Q, F, R>(&self, key: &Q, f: F) -> Option<R>
228 where
229 K: Borrow<Q>,
230 Q: ?Sized + Hash + Eq,
231 F: FnOnce(&V) -> R,
232 {
233 let idx = self.segment_index(key);
234 let mut segment = self.segments[idx].lock();
235 segment.get(key).map(f)
236 }
237
238 pub fn put(&self, key: K, value: V) -> Option<(K, V)> {
242 let idx = self.segment_index(&key);
243 let mut segment = self.segments[idx].lock();
244 segment.put(key, value)
245 }
246
247 pub fn put_with_size(&self, key: K, value: V, size: u64) -> Option<(K, V)> {
249 let idx = self.segment_index(&key);
250 let mut segment = self.segments[idx].lock();
251 segment.put_with_size(key, value, size)
252 }
253
254 pub fn remove<Q>(&self, key: &Q) -> Option<V>
256 where
257 K: Borrow<Q>,
258 Q: ?Sized + Hash + Eq,
259 {
260 let idx = self.segment_index(key);
261 let mut segment = self.segments[idx].lock();
262 segment.remove(key)
263 }
264
265 pub fn contains_key<Q>(&self, key: &Q) -> bool
267 where
268 K: Borrow<Q>,
269 Q: ?Sized + Hash + Eq,
270 {
271 let idx = self.segment_index(key);
272 let mut segment = self.segments[idx].lock();
273 segment.get(key).is_some()
274 }
275
276 pub fn clear(&self) {
278 for segment in self.segments.iter() {
279 segment.lock().clear();
280 }
281 }
282
283 pub fn current_size(&self) -> u64 {
285 self.segments.iter().map(|s| s.lock().current_size()).sum()
286 }
287
288 pub fn max_size(&self) -> u64 {
290 self.segments.iter().map(|s| s.lock().max_size()).sum()
291 }
292}
293
294impl<K, V, S> CacheMetrics for ConcurrentLfuCache<K, V, S>
295where
296 K: Hash + Eq + Clone + Send,
297 V: Clone + Send,
298 S: BuildHasher + Clone + Send,
299{
300 fn metrics(&self) -> BTreeMap<String, f64> {
301 let mut aggregated = BTreeMap::new();
302 for segment in self.segments.iter() {
303 let segment_metrics = segment.lock().metrics().metrics();
304 for (key, value) in segment_metrics {
305 *aggregated.entry(key).or_insert(0.0) += value;
306 }
307 }
308 aggregated
309 }
310
311 fn algorithm_name(&self) -> &'static str {
312 "ConcurrentLFU"
313 }
314}
315
316unsafe impl<K: Send, V: Send, S: Send> Send for ConcurrentLfuCache<K, V, S> {}
317unsafe impl<K: Send, V: Send, S: Send + Sync> Sync for ConcurrentLfuCache<K, V, S> {}
318
319impl<K, V, S> core::fmt::Debug for ConcurrentLfuCache<K, V, S>
320where
321 K: Hash + Eq + Clone + Send,
322 V: Clone + Send,
323 S: BuildHasher + Clone + Send,
324{
325 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
326 f.debug_struct("ConcurrentLfuCache")
327 .field("segment_count", &self.segments.len())
328 .field("total_len", &self.len())
329 .finish()
330 }
331}
332
333#[cfg(test)]
334mod tests {
335 use super::*;
336 use crate::config::{ConcurrentCacheConfig, ConcurrentLfuCacheConfig, LfuCacheConfig};
337
338 extern crate std;
339 use std::string::ToString;
340 use std::sync::Arc;
341 use std::thread;
342 use std::vec::Vec;
343
344 fn make_config(capacity: usize, segments: usize) -> ConcurrentLfuCacheConfig {
345 ConcurrentCacheConfig {
346 base: LfuCacheConfig {
347 capacity: NonZeroUsize::new(capacity).unwrap(),
348 max_size: u64::MAX,
349 },
350 segments,
351 }
352 }
353
354 #[test]
355 fn test_basic_operations() {
356 let cache: ConcurrentLfuCache<String, i32> =
357 ConcurrentLfuCache::init(make_config(100, 16), None);
358
359 cache.put("a".to_string(), 1);
360 cache.put("b".to_string(), 2);
361
362 assert_eq!(cache.get(&"a".to_string()), Some(1));
363 assert_eq!(cache.get(&"b".to_string()), Some(2));
364 }
365
366 #[test]
367 fn test_concurrent_access() {
368 let cache: Arc<ConcurrentLfuCache<String, i32>> =
369 Arc::new(ConcurrentLfuCache::init(make_config(1000, 16), None));
370 let num_threads = 8;
371 let ops_per_thread = 500;
372
373 let mut handles: Vec<std::thread::JoinHandle<()>> = Vec::new();
374
375 for t in 0..num_threads {
376 let cache = Arc::clone(&cache);
377 handles.push(thread::spawn(move || {
378 for i in 0..ops_per_thread {
379 let key = std::format!("key_{}_{}", t, i);
380 cache.put(key.clone(), i);
381 if i % 3 == 0 {
383 let _ = cache.get(&key);
384 let _ = cache.get(&key);
385 }
386 }
387 }));
388 }
389
390 for handle in handles {
391 handle.join().unwrap();
392 }
393
394 assert!(!cache.is_empty());
395 }
396
397 #[test]
398 fn test_capacity() {
399 let cache: ConcurrentLfuCache<String, i32> =
400 ConcurrentLfuCache::init(make_config(100, 16), None);
401
402 let capacity = cache.capacity();
404 assert!(capacity >= 16);
405 assert!(capacity <= 100);
406 }
407
408 #[test]
409 fn test_segment_count() {
410 let cache: ConcurrentLfuCache<String, i32> =
411 ConcurrentLfuCache::init(make_config(100, 8), None);
412
413 assert_eq!(cache.segment_count(), 8);
414 }
415
416 #[test]
417 fn test_len_and_is_empty() {
418 let cache: ConcurrentLfuCache<String, i32> =
419 ConcurrentLfuCache::init(make_config(100, 16), None);
420
421 assert!(cache.is_empty());
422 assert_eq!(cache.len(), 0);
423
424 cache.put("key1".to_string(), 1);
425 assert_eq!(cache.len(), 1);
426 assert!(!cache.is_empty());
427
428 cache.put("key2".to_string(), 2);
429 assert_eq!(cache.len(), 2);
430 }
431
432 #[test]
433 fn test_remove() {
434 let cache: ConcurrentLfuCache<String, i32> =
435 ConcurrentLfuCache::init(make_config(100, 16), None);
436
437 cache.put("key1".to_string(), 1);
438 cache.put("key2".to_string(), 2);
439
440 assert_eq!(cache.remove(&"key1".to_string()), Some(1));
441 assert_eq!(cache.len(), 1);
442 assert_eq!(cache.get(&"key1".to_string()), None);
443
444 assert_eq!(cache.remove(&"nonexistent".to_string()), None);
445 }
446
447 #[test]
448 fn test_clear() {
449 let cache: ConcurrentLfuCache<String, i32> =
450 ConcurrentLfuCache::init(make_config(100, 16), None);
451
452 cache.put("key1".to_string(), 1);
453 cache.put("key2".to_string(), 2);
454 cache.put("key3".to_string(), 3);
455
456 assert_eq!(cache.len(), 3);
457
458 cache.clear();
459
460 assert_eq!(cache.len(), 0);
461 assert!(cache.is_empty());
462 assert_eq!(cache.get(&"key1".to_string()), None);
463 }
464
465 #[test]
466 fn test_contains_key() {
467 let cache: ConcurrentLfuCache<String, i32> =
468 ConcurrentLfuCache::init(make_config(100, 16), None);
469
470 cache.put("exists".to_string(), 1);
471
472 assert!(cache.contains_key(&"exists".to_string()));
473 assert!(!cache.contains_key(&"missing".to_string()));
474 }
475
476 #[test]
477 fn test_get_with() {
478 let cache: ConcurrentLfuCache<String, String> =
479 ConcurrentLfuCache::init(make_config(100, 16), None);
480
481 cache.put("key".to_string(), "hello world".to_string());
482
483 let len = cache.get_with(&"key".to_string(), |v: &String| v.len());
484 assert_eq!(len, Some(11));
485
486 let missing = cache.get_with(&"missing".to_string(), |v: &String| v.len());
487 assert_eq!(missing, None);
488 }
489
490 #[test]
491 fn test_frequency_eviction() {
492 let cache: ConcurrentLfuCache<String, i32> =
493 ConcurrentLfuCache::init(make_config(48, 16), None);
494
495 cache.put("a".to_string(), 1);
496 cache.put("b".to_string(), 2);
497 cache.put("c".to_string(), 3);
498
499 for _ in 0..5 {
501 let _ = cache.get(&"a".to_string());
502 let _ = cache.get(&"c".to_string());
503 }
504
505 cache.put("d".to_string(), 4);
507
508 assert!(cache.len() <= 48);
509 }
510
511 #[test]
512 fn test_eviction_on_capacity() {
513 let cache: ConcurrentLfuCache<String, i32> =
514 ConcurrentLfuCache::init(make_config(80, 16), None);
515
516 for i in 0..10 {
518 cache.put(std::format!("key{}", i), i);
519 }
520
521 assert!(cache.len() <= 80);
523 }
524
525 #[test]
526 fn test_metrics() {
527 let cache: ConcurrentLfuCache<String, i32> =
528 ConcurrentLfuCache::init(make_config(100, 16), None);
529
530 cache.put("a".to_string(), 1);
531 cache.put("b".to_string(), 2);
532
533 let metrics = cache.metrics();
534 assert!(!metrics.is_empty());
536 }
537
538 #[test]
539 fn test_algorithm_name() {
540 let cache: ConcurrentLfuCache<String, i32> =
541 ConcurrentLfuCache::init(make_config(100, 16), None);
542
543 assert_eq!(cache.algorithm_name(), "ConcurrentLFU");
544 }
545
546 #[test]
547 fn test_empty_cache_operations() {
548 let cache: ConcurrentLfuCache<String, i32> =
549 ConcurrentLfuCache::init(make_config(100, 16), None);
550
551 assert!(cache.is_empty());
552 assert_eq!(cache.len(), 0);
553 assert_eq!(cache.get(&"missing".to_string()), None);
554 assert_eq!(cache.remove(&"missing".to_string()), None);
555 assert!(!cache.contains_key(&"missing".to_string()));
556 }
557
558 #[test]
559 fn test_borrowed_key_lookup() {
560 let cache: ConcurrentLfuCache<String, i32> =
561 ConcurrentLfuCache::init(make_config(100, 16), None);
562
563 cache.put("test_key".to_string(), 42);
564
565 let key_str = "test_key";
567 assert_eq!(cache.get(key_str), Some(42));
568 assert!(cache.contains_key(key_str));
569 assert_eq!(cache.remove(key_str), Some(42));
570 }
571
572 #[test]
573 fn test_frequency_tracking() {
574 let cache: ConcurrentLfuCache<String, i32> =
575 ConcurrentLfuCache::init(make_config(100, 16), None);
576
577 cache.put("key".to_string(), 1);
578
579 for _ in 0..10 {
581 let _ = cache.get(&"key".to_string());
582 }
583
584 assert_eq!(cache.get(&"key".to_string()), Some(1));
586 }
587}