1extern crate alloc;
107
108use crate::lfu::LfuSegment;
109use crate::metrics::CacheMetrics;
110use alloc::boxed::Box;
111use alloc::collections::BTreeMap;
112use alloc::string::String;
113use alloc::vec::Vec;
114use core::borrow::Borrow;
115use core::hash::{BuildHasher, Hash};
116use core::num::NonZeroUsize;
117use parking_lot::Mutex;
118
119#[cfg(feature = "hashbrown")]
120use hashbrown::DefaultHashBuilder;
121
122#[cfg(not(feature = "hashbrown"))]
123use std::collections::hash_map::RandomState as DefaultHashBuilder;
124
125pub struct ConcurrentLfuCache<K, V, S = DefaultHashBuilder> {
127 segments: Box<[Mutex<LfuSegment<K, V, S>>]>,
128 hash_builder: S,
129}
130
131impl<K, V> ConcurrentLfuCache<K, V, DefaultHashBuilder>
132where
133 K: Hash + Eq + Clone + Send,
134 V: Clone + Send,
135{
136 pub fn init(
144 config: crate::config::ConcurrentLfuCacheConfig,
145 hasher: Option<DefaultHashBuilder>,
146 ) -> Self {
147 let segment_count = config.segments;
148 let capacity = config.base.capacity;
149 let max_size = config.base.max_size;
150
151 let segment_capacity = capacity.get() / segment_count;
152 let segment_cap = NonZeroUsize::new(segment_capacity.max(1)).unwrap();
153 let segment_max_size = max_size / segment_count as u64;
154
155 let hash_builder = hasher.unwrap_or_default();
156
157 let segments: Vec<_> = (0..segment_count)
158 .map(|_| {
159 let segment_config = crate::config::LfuCacheConfig {
160 capacity: segment_cap,
161 max_size: segment_max_size,
162 };
163 Mutex::new(LfuSegment::init(segment_config, hash_builder.clone()))
164 })
165 .collect();
166
167 Self {
168 segments: segments.into_boxed_slice(),
169 hash_builder,
170 }
171 }
172}
173
174impl<K, V, S> ConcurrentLfuCache<K, V, S>
175where
176 K: Hash + Eq + Clone + Send,
177 V: Clone + Send,
178 S: BuildHasher + Clone + Send,
179{
180 #[inline]
181 fn segment_index<Q>(&self, key: &Q) -> usize
182 where
183 K: Borrow<Q>,
184 Q: ?Sized + Hash,
185 {
186 (self.hash_builder.hash_one(key) as usize) % self.segments.len()
187 }
188
189 pub fn capacity(&self) -> usize {
191 self.segments.iter().map(|s| s.lock().cap().get()).sum()
192 }
193
194 pub fn segment_count(&self) -> usize {
196 self.segments.len()
197 }
198
199 pub fn len(&self) -> usize {
201 self.segments.iter().map(|s| s.lock().len()).sum()
202 }
203
204 pub fn is_empty(&self) -> bool {
206 self.segments.iter().all(|s| s.lock().is_empty())
207 }
208
209 pub fn get<Q>(&self, key: &Q) -> Option<V>
214 where
215 K: Borrow<Q>,
216 Q: ?Sized + Hash + Eq,
217 {
218 let idx = self.segment_index(key);
219 let mut segment = self.segments[idx].lock();
220 segment.get(key).cloned()
221 }
222
223 pub fn get_with<Q, F, R>(&self, key: &Q, f: F) -> Option<R>
228 where
229 K: Borrow<Q>,
230 Q: ?Sized + Hash + Eq,
231 F: FnOnce(&V) -> R,
232 {
233 let idx = self.segment_index(key);
234 let mut segment = self.segments[idx].lock();
235 segment.get(key).map(f)
236 }
237
238 pub fn put(&self, key: K, value: V, size: u64) -> Option<Vec<(K, V)>> {
243 let idx = self.segment_index(&key);
244 let mut segment = self.segments[idx].lock();
245 segment.put(key, value, size)
246 }
247
248 pub fn remove<Q>(&self, key: &Q) -> Option<V>
250 where
251 K: Borrow<Q>,
252 Q: ?Sized + Hash + Eq,
253 {
254 let idx = self.segment_index(key);
255 let mut segment = self.segments[idx].lock();
256 segment.remove(key)
257 }
258
259 pub fn clear(&self) {
261 for segment in self.segments.iter() {
262 segment.lock().clear();
263 }
264 }
265
266 pub fn current_size(&self) -> u64 {
268 self.segments.iter().map(|s| s.lock().current_size()).sum()
269 }
270
271 pub fn max_size(&self) -> u64 {
273 self.segments.iter().map(|s| s.lock().max_size()).sum()
274 }
275
276 pub fn contains<Q>(&self, key: &Q) -> bool
288 where
289 K: Borrow<Q>,
290 Q: ?Sized + Hash + Eq,
291 {
292 let idx = self.segment_index(key);
293 let segment = self.segments[idx].lock();
294 segment.contains(key)
295 }
296
297 pub fn peek<Q>(&self, key: &Q) -> Option<V>
309 where
310 K: Borrow<Q>,
311 Q: ?Sized + Hash + Eq,
312 V: Clone,
313 {
314 let idx = self.segment_index(key);
315 let segment = self.segments[idx].lock();
316 segment.peek(key).cloned()
317 }
318}
319
320impl<K, V, S> CacheMetrics for ConcurrentLfuCache<K, V, S>
321where
322 K: Hash + Eq + Clone + Send,
323 V: Clone + Send,
324 S: BuildHasher + Clone + Send,
325{
326 fn metrics(&self) -> BTreeMap<String, f64> {
327 let mut aggregated = BTreeMap::new();
328 for segment in self.segments.iter() {
329 let segment_metrics = segment.lock().metrics().metrics();
330 for (key, value) in segment_metrics {
331 *aggregated.entry(key).or_insert(0.0) += value;
332 }
333 }
334 aggregated
335 }
336
337 fn algorithm_name(&self) -> &'static str {
338 "ConcurrentLFU"
339 }
340}
341
342unsafe impl<K: Send, V: Send, S: Send> Send for ConcurrentLfuCache<K, V, S> {}
343unsafe impl<K: Send, V: Send, S: Send + Sync> Sync for ConcurrentLfuCache<K, V, S> {}
344
345impl<K, V, S> core::fmt::Debug for ConcurrentLfuCache<K, V, S>
346where
347 K: Hash + Eq + Clone + Send,
348 V: Clone + Send,
349 S: BuildHasher + Clone + Send,
350{
351 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
352 f.debug_struct("ConcurrentLfuCache")
353 .field("segment_count", &self.segments.len())
354 .field("total_len", &self.len())
355 .finish()
356 }
357}
358
359#[cfg(test)]
360mod tests {
361 use super::*;
362 use crate::config::{ConcurrentCacheConfig, ConcurrentLfuCacheConfig, LfuCacheConfig};
363
364 extern crate std;
365 use std::string::ToString;
366 use std::sync::Arc;
367 use std::thread;
368 use std::vec::Vec;
369
370 fn make_config(capacity: usize, segments: usize) -> ConcurrentLfuCacheConfig {
371 ConcurrentCacheConfig {
372 base: LfuCacheConfig {
373 capacity: NonZeroUsize::new(capacity).unwrap(),
374 max_size: u64::MAX,
375 },
376 segments,
377 }
378 }
379
380 #[test]
381 fn test_basic_operations() {
382 let cache: ConcurrentLfuCache<String, i32> =
383 ConcurrentLfuCache::init(make_config(100, 16), None);
384
385 cache.put("a".to_string(), 1, 1);
386 cache.put("b".to_string(), 2, 1);
387
388 assert_eq!(cache.get(&"a".to_string()), Some(1));
389 assert_eq!(cache.get(&"b".to_string()), Some(2));
390 }
391
392 #[test]
393 fn test_concurrent_access() {
394 let cache: Arc<ConcurrentLfuCache<String, i32>> =
395 Arc::new(ConcurrentLfuCache::init(make_config(1000, 16), None));
396 let num_threads = 8;
397 let ops_per_thread = 500;
398
399 let mut handles: Vec<std::thread::JoinHandle<()>> = Vec::new();
400
401 for t in 0..num_threads {
402 let cache = Arc::clone(&cache);
403 handles.push(thread::spawn(move || {
404 for i in 0..ops_per_thread {
405 let key = std::format!("key_{}_{}", t, i);
406 cache.put(key.clone(), i, 1);
407 if i % 3 == 0 {
409 let _ = cache.get(&key);
410 let _ = cache.get(&key);
411 }
412 }
413 }));
414 }
415
416 for handle in handles {
417 handle.join().unwrap();
418 }
419
420 assert!(!cache.is_empty());
421 }
422
423 #[test]
424 fn test_capacity() {
425 let cache: ConcurrentLfuCache<String, i32> =
426 ConcurrentLfuCache::init(make_config(100, 16), None);
427
428 let capacity = cache.capacity();
430 assert!(capacity >= 16);
431 assert!(capacity <= 100);
432 }
433
434 #[test]
435 fn test_segment_count() {
436 let cache: ConcurrentLfuCache<String, i32> =
437 ConcurrentLfuCache::init(make_config(100, 8), None);
438
439 assert_eq!(cache.segment_count(), 8);
440 }
441
442 #[test]
443 fn test_len_and_is_empty() {
444 let cache: ConcurrentLfuCache<String, i32> =
445 ConcurrentLfuCache::init(make_config(100, 16), None);
446
447 assert!(cache.is_empty());
448 assert_eq!(cache.len(), 0);
449
450 cache.put("key1".to_string(), 1, 1);
451 assert_eq!(cache.len(), 1);
452 assert!(!cache.is_empty());
453
454 cache.put("key2".to_string(), 2, 1);
455 assert_eq!(cache.len(), 2);
456 }
457
458 #[test]
459 fn test_remove() {
460 let cache: ConcurrentLfuCache<String, i32> =
461 ConcurrentLfuCache::init(make_config(100, 16), None);
462
463 cache.put("key1".to_string(), 1, 1);
464 cache.put("key2".to_string(), 2, 1);
465
466 assert_eq!(cache.remove(&"key1".to_string()), Some(1));
467 assert_eq!(cache.len(), 1);
468 assert_eq!(cache.get(&"key1".to_string()), None);
469
470 assert_eq!(cache.remove(&"nonexistent".to_string()), None);
471 }
472
473 #[test]
474 fn test_clear() {
475 let cache: ConcurrentLfuCache<String, i32> =
476 ConcurrentLfuCache::init(make_config(100, 16), None);
477
478 cache.put("key1".to_string(), 1, 1);
479 cache.put("key2".to_string(), 2, 1);
480 cache.put("key3".to_string(), 3, 1);
481
482 assert_eq!(cache.len(), 3);
483
484 cache.clear();
485
486 assert_eq!(cache.len(), 0);
487 assert!(cache.is_empty());
488 assert_eq!(cache.get(&"key1".to_string()), None);
489 }
490
491 #[test]
492 fn test_contains_key() {
493 let cache: ConcurrentLfuCache<String, i32> =
494 ConcurrentLfuCache::init(make_config(100, 16), None);
495
496 cache.put("exists".to_string(), 1, 1);
497
498 assert!(cache.contains(&"exists".to_string()));
499 assert!(!cache.contains(&"missing".to_string()));
500 }
501
502 #[test]
503 fn test_get_with() {
504 let cache: ConcurrentLfuCache<String, String> =
505 ConcurrentLfuCache::init(make_config(100, 16), None);
506
507 cache.put("key".to_string(), "hello world".to_string(), 1);
508
509 let len = cache.get_with(&"key".to_string(), |v: &String| v.len());
510 assert_eq!(len, Some(11));
511
512 let missing = cache.get_with(&"missing".to_string(), |v: &String| v.len());
513 assert_eq!(missing, None);
514 }
515
516 #[test]
517 fn test_frequency_eviction() {
518 let cache: ConcurrentLfuCache<String, i32> =
519 ConcurrentLfuCache::init(make_config(48, 16), None);
520
521 cache.put("a".to_string(), 1, 1);
522 cache.put("b".to_string(), 2, 1);
523 cache.put("c".to_string(), 3, 1);
524
525 for _ in 0..5 {
527 let _ = cache.get(&"a".to_string());
528 let _ = cache.get(&"c".to_string());
529 }
530
531 cache.put("d".to_string(), 4, 1);
533
534 assert!(cache.len() <= 48);
535 }
536
537 #[test]
538 fn test_eviction_on_capacity() {
539 let cache: ConcurrentLfuCache<String, i32> =
540 ConcurrentLfuCache::init(make_config(80, 16), None);
541
542 for i in 0..10 {
544 cache.put(std::format!("key{}", i), i, 1);
545 }
546
547 assert!(cache.len() <= 80);
549 }
550
551 #[test]
552 fn test_metrics() {
553 let cache: ConcurrentLfuCache<String, i32> =
554 ConcurrentLfuCache::init(make_config(100, 16), None);
555
556 cache.put("a".to_string(), 1, 1);
557 cache.put("b".to_string(), 2, 1);
558
559 let metrics = cache.metrics();
560 assert!(!metrics.is_empty());
562 }
563
564 #[test]
565 fn test_algorithm_name() {
566 let cache: ConcurrentLfuCache<String, i32> =
567 ConcurrentLfuCache::init(make_config(100, 16), None);
568
569 assert_eq!(cache.algorithm_name(), "ConcurrentLFU");
570 }
571
572 #[test]
573 fn test_empty_cache_operations() {
574 let cache: ConcurrentLfuCache<String, i32> =
575 ConcurrentLfuCache::init(make_config(100, 16), None);
576
577 assert!(cache.is_empty());
578 assert_eq!(cache.len(), 0);
579 assert_eq!(cache.get(&"missing".to_string()), None);
580 assert_eq!(cache.remove(&"missing".to_string()), None);
581 assert!(!cache.contains(&"missing".to_string()));
582 }
583
584 #[test]
585 fn test_borrowed_key_lookup() {
586 let cache: ConcurrentLfuCache<String, i32> =
587 ConcurrentLfuCache::init(make_config(100, 16), None);
588
589 cache.put("test_key".to_string(), 42, 1);
590
591 let key_str = "test_key";
593 assert_eq!(cache.get(key_str), Some(42));
594 assert!(cache.contains(key_str));
595 assert_eq!(cache.remove(key_str), Some(42));
596 }
597
598 #[test]
599 fn test_frequency_tracking() {
600 let cache: ConcurrentLfuCache<String, i32> =
601 ConcurrentLfuCache::init(make_config(100, 16), None);
602
603 cache.put("key".to_string(), 1, 1);
604
605 for _ in 0..10 {
607 let _ = cache.get(&"key".to_string());
608 }
609
610 assert_eq!(cache.get(&"key".to_string()), Some(1));
612 }
613
614 #[test]
615 fn test_contains_non_promoting() {
616 let cache: ConcurrentLfuCache<String, i32> =
617 ConcurrentLfuCache::init(make_config(100, 16), None);
618
619 cache.put("a".to_string(), 1, 1);
620 cache.put("b".to_string(), 2, 1);
621
622 assert!(cache.contains(&"a".to_string()));
624 assert!(cache.contains(&"b".to_string()));
625 assert!(!cache.contains(&"c".to_string()));
626 }
627}