#![cfg(feature = "concurrent")]
use cache_rs::config::{
ConcurrentCacheConfig, ConcurrentGdsfCacheConfig, ConcurrentLfuCacheConfig,
ConcurrentLfudaCacheConfig, ConcurrentLruCacheConfig, ConcurrentSlruCacheConfig,
GdsfCacheConfig, LfuCacheConfig, LfudaCacheConfig, LruCacheConfig, SlruCacheConfig,
};
use cache_rs::metrics::CacheMetrics;
use cache_rs::{
ConcurrentGdsfCache, ConcurrentLfuCache, ConcurrentLfudaCache, ConcurrentLruCache,
ConcurrentSlruCache,
};
use std::num::NonZeroUsize;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
const NUM_THREADS: usize = 8;
const OPS_PER_THREAD: usize = 500;
/// Builds a segmented LRU config with unbounded byte budget (`max_size = u64::MAX`),
/// so only the entry-count capacity drives eviction.
fn lru_config(capacity: usize, segments: usize) -> ConcurrentLruCacheConfig {
    ConcurrentCacheConfig {
        base: LruCacheConfig {
            capacity: NonZeroUsize::new(capacity).expect("capacity must be non-zero"),
            max_size: u64::MAX,
        },
        segments,
    }
}
/// Builds a segmented SLRU config with unbounded byte budget.
/// `protected` is the size of the protected segment within each SLRU.
fn slru_config(capacity: usize, protected: usize, segments: usize) -> ConcurrentSlruCacheConfig {
    ConcurrentCacheConfig {
        base: SlruCacheConfig {
            capacity: NonZeroUsize::new(capacity).expect("capacity must be non-zero"),
            protected_capacity: NonZeroUsize::new(protected)
                .expect("protected capacity must be non-zero"),
            max_size: u64::MAX,
        },
        segments,
    }
}
/// Builds a segmented LFU config with unbounded byte budget.
fn lfu_config(capacity: usize, segments: usize) -> ConcurrentLfuCacheConfig {
    ConcurrentCacheConfig {
        base: LfuCacheConfig {
            capacity: NonZeroUsize::new(capacity).expect("capacity must be non-zero"),
            max_size: u64::MAX,
        },
        segments,
    }
}
/// Builds a segmented LFUDA config with unbounded byte budget and a zero starting age.
fn lfuda_config(capacity: usize, segments: usize) -> ConcurrentLfudaCacheConfig {
    ConcurrentCacheConfig {
        base: LfudaCacheConfig {
            capacity: NonZeroUsize::new(capacity).expect("capacity must be non-zero"),
            initial_age: 0,
            max_size: u64::MAX,
        },
        segments,
    }
}
/// Builds a segmented GDSF config with unbounded byte budget and a zero starting age.
fn gdsf_config(capacity: usize, segments: usize) -> ConcurrentGdsfCacheConfig {
    ConcurrentCacheConfig {
        base: GdsfCacheConfig {
            capacity: NonZeroUsize::new(capacity).expect("capacity must be non-zero"),
            initial_age: 0.0,
            max_size: u64::MAX,
        },
        segments,
    }
}
/// Like [`lru_config`], but with an explicit total byte budget (`max_size`).
fn lru_config_with_size(
    capacity: usize,
    max_size: u64,
    segments: usize,
) -> ConcurrentLruCacheConfig {
    ConcurrentCacheConfig {
        base: LruCacheConfig {
            capacity: NonZeroUsize::new(capacity).expect("capacity must be non-zero"),
            max_size,
        },
        segments,
    }
}
/// Like [`lfu_config`], but with an explicit total byte budget (`max_size`).
fn lfu_config_with_size(
    capacity: usize,
    max_size: u64,
    segments: usize,
) -> ConcurrentLfuCacheConfig {
    ConcurrentCacheConfig {
        base: LfuCacheConfig {
            capacity: NonZeroUsize::new(capacity).expect("capacity must be non-zero"),
            max_size,
        },
        segments,
    }
}
/// Like [`lfuda_config`], but with an explicit total byte budget (`max_size`).
fn lfuda_config_with_size(
    capacity: usize,
    max_size: u64,
    segments: usize,
) -> ConcurrentLfudaCacheConfig {
    ConcurrentCacheConfig {
        base: LfudaCacheConfig {
            capacity: NonZeroUsize::new(capacity).expect("capacity must be non-zero"),
            initial_age: 0,
            max_size,
        },
        segments,
    }
}
/// Like [`gdsf_config`], but with an explicit total byte budget (`max_size`).
fn gdsf_config_with_size(
    capacity: usize,
    max_size: u64,
    segments: usize,
) -> ConcurrentGdsfCacheConfig {
    ConcurrentCacheConfig {
        base: GdsfCacheConfig {
            capacity: NonZeroUsize::new(capacity).expect("capacity must be non-zero"),
            initial_age: 0.0,
            max_size,
        },
        segments,
    }
}
/// Like [`slru_config`], but with an explicit total byte budget (`max_size`).
fn slru_config_with_size(
    capacity: usize,
    protected: usize,
    max_size: u64,
    segments: usize,
) -> ConcurrentSlruCacheConfig {
    ConcurrentCacheConfig {
        base: SlruCacheConfig {
            capacity: NonZeroUsize::new(capacity).expect("capacity must be non-zero"),
            protected_capacity: NonZeroUsize::new(protected)
                .expect("protected capacity must be non-zero"),
            max_size,
        },
        segments,
    }
}
/// Hammers a small LRU cache (capacity 50, 4 segments) with disjoint keys from
/// many threads and checks eviction keeps the entry count within capacity.
#[test]
fn test_concurrent_lru_basic_eviction() {
    let cache: Arc<ConcurrentLruCache<i32, i32>> =
        Arc::new(ConcurrentLruCache::init(lru_config(50, 4), None));
    let mut handles = vec![];
    for t in 0..NUM_THREADS {
        let c = Arc::clone(&cache);
        handles.push(thread::spawn(move || {
            for i in 0..OPS_PER_THREAD {
                // Unique key per (thread, iteration) pair.
                let key = (t * OPS_PER_THREAD + i) as i32;
                c.put(key, key * 10, 1);
                let _ = c.get(&key);
                if i % 5 == 0 {
                    let _ = c.contains(&key);
                }
                if i % 10 == 0 {
                    let _ = c.peek(&key);
                }
            }
        }));
    }
    for handle in handles {
        handle.join().expect("Thread panicked");
    }
    assert!(
        cache.len() <= 50,
        "Cache should maintain capacity after concurrent evictions"
    );
}
/// Mixes frequent reads of a small hot set with a stream of cold inserts and
/// removals; verifies the cache never exceeds its capacity under that load.
#[test]
fn test_concurrent_lru_access_prevents_eviction() {
    let cache: Arc<ConcurrentLruCache<i32, i32>> =
        Arc::new(ConcurrentLruCache::init(lru_config(100, 4), None));
    let hot_keys: Vec<i32> = (0..10).collect();
    for &key in &hot_keys {
        cache.put(key, key * 100, 1);
    }
    let mut handles = vec![];
    let hot_keys_arc = Arc::new(hot_keys.clone());
    for t in 0..NUM_THREADS {
        let c = Arc::clone(&cache);
        let hk = Arc::clone(&hot_keys_arc);
        handles.push(thread::spawn(move || {
            for i in 0..OPS_PER_THREAD {
                let hot_key = hk[i % hk.len()];
                let _ = c.get(&hot_key);
                // Cold keys are offset so they never collide with the hot set.
                let cold_key = 1000 + (t * OPS_PER_THREAD + i) as i32;
                c.put(cold_key, cold_key, 1);
                if i % 3 == 0 {
                    let _ = c.contains(&hot_key);
                }
                if i % 7 == 0 {
                    let _ = c.remove(&cold_key);
                }
            }
        }));
    }
    for handle in handles {
        handle.join().expect("Thread panicked");
    }
    assert!(cache.len() <= 100, "Cache should maintain capacity");
}
/// Inserts, re-reads, and occasionally removes keys across all segments;
/// verifies the aggregate entry count never exceeds capacity.
#[test]
fn test_concurrent_lru_multi_segment_eviction() {
    let cache: Arc<ConcurrentLruCache<i32, i32>> =
        Arc::new(ConcurrentLruCache::init(lru_config(40, 4), None));
    let mut handles = vec![];
    for t in 0..NUM_THREADS {
        let c = Arc::clone(&cache);
        handles.push(thread::spawn(move || {
            for i in 0..OPS_PER_THREAD {
                let key = (t * OPS_PER_THREAD + i) as i32;
                c.put(key, key * 10, 1);
                for _ in 0..3 {
                    let _ = c.get(&key);
                }
                if i % 4 == 0 {
                    let exists = c.contains(&key);
                    // Only remove a key the same thread just confirmed present.
                    if exists && i % 8 == 0 {
                        let _ = c.remove(&key);
                    }
                }
            }
        }));
    }
    for handle in handles {
        handle.join().expect("Thread panicked");
    }
    assert!(cache.len() <= 40, "Cache should not exceed capacity");
}
/// Pure write workload from several threads; the final length must respect
/// the configured capacity.
#[test]
fn test_concurrent_lru_concurrent_writes_maintain_capacity() {
    let cache: Arc<ConcurrentLruCache<i32, i32>> =
        Arc::new(ConcurrentLruCache::init(lru_config(20, 4), None));
    let mut handles = vec![];
    for t in 0..4 {
        let cache = Arc::clone(&cache);
        handles.push(thread::spawn(move || {
            for i in 0..100 {
                let key = t * 1000 + i;
                cache.put(key, key, 1);
            }
        }));
    }
    for handle in handles {
        handle.join().expect("Thread panicked");
    }
    assert!(
        cache.len() <= 20,
        "Concurrent writes should not exceed capacity"
    );
}
/// Drives an LFU cache with repeated hot-key reads plus a cold-insert stream;
/// verifies capacity is maintained under the mixed workload.
#[test]
fn test_concurrent_lfu_frequency_based_eviction() {
    let cache: Arc<ConcurrentLfuCache<i32, i32>> =
        Arc::new(ConcurrentLfuCache::init(lfu_config(50, 4), None));
    let hot_keys: Vec<i32> = (0..10).collect();
    for &key in &hot_keys {
        cache.put(key, key * 100, 1);
    }
    let mut handles = vec![];
    let hot_keys_arc = Arc::new(hot_keys.clone());
    for t in 0..NUM_THREADS {
        let c = Arc::clone(&cache);
        let hk = Arc::clone(&hot_keys_arc);
        handles.push(thread::spawn(move || {
            for i in 0..OPS_PER_THREAD {
                let hot_key = hk[i % hk.len()];
                // Boost the hot key's frequency relative to cold keys.
                for _ in 0..3 {
                    let _ = c.get(&hot_key);
                }
                let cold_key = 1000 + (t * OPS_PER_THREAD + i) as i32;
                c.put(cold_key, cold_key, 1);
                if i % 5 == 0 {
                    let _ = c.peek(&hot_key);
                }
                if i % 7 == 0 {
                    let _ = c.contains(&cold_key);
                }
            }
        }));
    }
    for handle in handles {
        handle.join().expect("Thread panicked");
    }
    assert!(cache.len() <= 50, "Cache should maintain capacity");
}
/// Concurrent gets on one key should accumulate its LFU frequency so that it
/// survives later inserts that force evictions.
#[test]
fn test_concurrent_lfu_frequency_accumulation() {
    let cache: Arc<ConcurrentLfuCache<String, i32>> =
        Arc::new(ConcurrentLfuCache::init(lfu_config(6, 2), None));
    cache.put("hot".to_string(), 1, 1);
    cache.put("warm".to_string(), 2, 1);
    cache.put("cold".to_string(), 3, 1);
    let mut handles = vec![];
    for _ in 0..4 {
        let c = Arc::clone(&cache);
        handles.push(thread::spawn(move || {
            for _ in 0..50 {
                c.get(&"hot".to_string());
            }
        }));
    }
    for handle in handles {
        handle.join().expect("Thread panicked");
    }
    // Insert enough new entries to trigger evictions; "hot" has 200 accesses
    // banked, so it should outrank the fresh entries.
    cache.put("new1".to_string(), 4, 1);
    cache.put("new2".to_string(), 5, 1);
    cache.put("new3".to_string(), 6, 1);
    cache.put("new4".to_string(), 7, 1);
    assert!(
        cache.get(&"hot".to_string()).is_some(),
        "Hot key should survive due to high concurrent access frequency"
    );
}
/// Mixed put/get/contains/peek/remove workload on a multi-segment LFU cache;
/// verifies the aggregate length never exceeds capacity.
#[test]
fn test_concurrent_lfu_multi_segment_correctness() {
    let cache: Arc<ConcurrentLfuCache<i32, i32>> =
        Arc::new(ConcurrentLfuCache::init(lfu_config(80, 4), None));
    let mut handles = vec![];
    for t in 0..NUM_THREADS {
        let c = Arc::clone(&cache);
        handles.push(thread::spawn(move || {
            for i in 0..OPS_PER_THREAD {
                let key = (t * OPS_PER_THREAD + i) as i32;
                c.put(key, key * 10, 1);
                // Roughly 30% of keys get extra reads to spread frequencies.
                if key % 10 < 3 {
                    for _ in 0..5 {
                        let _ = c.get(&key);
                    }
                }
                if i % 4 == 0 {
                    let _ = c.contains(&key);
                }
                if i % 6 == 0 {
                    let _ = c.peek(&key);
                }
                if i % 10 == 0 {
                    let _ = c.remove(&key);
                }
            }
        }));
    }
    for handle in handles {
        handle.join().expect("Thread panicked");
    }
    assert!(cache.len() <= 80, "Should maintain capacity");
}
/// Hot-set reads plus cold inserts/removes on an LFUDA cache; verifies
/// capacity is maintained under concurrency.
#[test]
fn test_concurrent_lfuda_priority_eviction() {
    let cache: Arc<ConcurrentLfudaCache<i32, i32>> =
        Arc::new(ConcurrentLfudaCache::init(lfuda_config(50, 4), None));
    let hot_keys: Vec<i32> = (0..10).collect();
    for &key in &hot_keys {
        cache.put(key, key * 100, 1);
    }
    let mut handles = vec![];
    let hot_keys_arc = Arc::new(hot_keys.clone());
    for t in 0..NUM_THREADS {
        let c = Arc::clone(&cache);
        let hk = Arc::clone(&hot_keys_arc);
        handles.push(thread::spawn(move || {
            for i in 0..OPS_PER_THREAD {
                let hot_key = hk[i % hk.len()];
                for _ in 0..3 {
                    let _ = c.get(&hot_key);
                }
                let cold_key = 1000 + (t * OPS_PER_THREAD + i) as i32;
                c.put(cold_key, cold_key, 1);
                if i % 5 == 0 {
                    let _ = c.peek(&hot_key);
                }
                if i % 8 == 0 {
                    let _ = c.contains(&cold_key);
                }
                if i % 12 == 0 {
                    let _ = c.remove(&cold_key);
                }
            }
        }));
    }
    for handle in handles {
        handle.join().expect("Thread panicked");
    }
    assert!(cache.len() <= 50, "Cache should maintain capacity");
}
/// Exercises LFUDA's aging under constant eviction pressure (every insert is
/// a new key); verifies the cache stays within capacity.
#[test]
fn test_concurrent_lfuda_aging_mechanism() {
    let cache: Arc<ConcurrentLfudaCache<i32, i32>> =
        Arc::new(ConcurrentLfudaCache::init(lfuda_config(50, 4), None));
    let mut handles = vec![];
    for t in 0..NUM_THREADS {
        let c = Arc::clone(&cache);
        handles.push(thread::spawn(move || {
            for i in 0..OPS_PER_THREAD {
                let key = (t * OPS_PER_THREAD + i) as i32;
                c.put(key, key * 10, 1);
                if i % 3 == 0 {
                    for _ in 0..5 {
                        let _ = c.get(&key);
                    }
                }
                if i % 4 == 0 {
                    let _ = c.contains(&key);
                }
                if i % 7 == 0 {
                    let _ = c.peek(&key);
                }
            }
        }));
    }
    for handle in handles {
        handle.join().expect("Thread panicked");
    }
    assert!(cache.len() <= 50, "Should maintain capacity");
}
/// Mixed workload on an SLRU cache (probationary/protected segments);
/// verifies the aggregate length stays within capacity.
#[test]
fn test_concurrent_slru_segment_behavior() {
    let cache: Arc<ConcurrentSlruCache<i32, i32>> =
        Arc::new(ConcurrentSlruCache::init(slru_config(60, 20, 4), None));
    let mut handles = vec![];
    for t in 0..NUM_THREADS {
        let c = Arc::clone(&cache);
        handles.push(thread::spawn(move || {
            for i in 0..OPS_PER_THREAD {
                let key = (t * OPS_PER_THREAD + i) as i32;
                c.put(key, key * 10, 1);
                // Repeat reads so some keys are candidates for promotion.
                if i % 3 == 0 {
                    for _ in 0..3 {
                        let _ = c.get(&key);
                    }
                }
                if i % 5 == 0 {
                    let _ = c.peek(&key);
                }
                if i % 7 == 0 {
                    let _ = c.contains(&key);
                }
                if i % 10 == 0 {
                    let _ = c.remove(&key);
                }
            }
        }));
    }
    for handle in handles {
        handle.join().expect("Thread panicked");
    }
    assert!(cache.len() <= 60, "Should maintain capacity");
}
/// Pre-seeds entries, then re-reads each new key several times so promotions
/// to the protected segment race with inserts and removals.
#[test]
fn test_concurrent_slru_promotion_under_concurrency() {
    let cache: Arc<ConcurrentSlruCache<i32, i32>> =
        Arc::new(ConcurrentSlruCache::init(slru_config(50, 15, 4), None));
    for i in 0..20 {
        cache.put(i, i * 10, 1);
    }
    let mut handles = vec![];
    for t in 0..NUM_THREADS {
        let c = Arc::clone(&cache);
        handles.push(thread::spawn(move || {
            for i in 0..OPS_PER_THREAD {
                let key = (t * OPS_PER_THREAD + i) as i32;
                c.put(key, key * 10, 1);
                for _ in 0..4 {
                    let _ = c.get(&key);
                }
                if i % 5 == 0 {
                    let _ = c.peek(&key);
                }
                if i % 6 == 0 {
                    let _ = c.contains(&key);
                }
                if i % 9 == 0 {
                    let _ = c.remove(&key);
                }
            }
        }));
    }
    for handle in handles {
        handle.join().expect("Thread panicked");
    }
    assert!(cache.len() <= 50, "Cache should maintain capacity");
}
/// Inserts a mix of small (size 1) and occasional large (size 100) objects
/// into a GDSF cache; verifies entry count stays within capacity.
#[test]
fn test_concurrent_gdsf_size_aware_eviction() {
    let cache: Arc<ConcurrentGdsfCache<i32, i32>> =
        Arc::new(ConcurrentGdsfCache::init(gdsf_config(50, 4), None));
    let mut handles = vec![];
    for t in 0..NUM_THREADS {
        let c = Arc::clone(&cache);
        handles.push(thread::spawn(move || {
            for i in 0..OPS_PER_THREAD {
                let key = (t * OPS_PER_THREAD + i) as i32;
                // Every 5th object is 100x larger, so GDSF's size term matters.
                let size = if i % 5 == 0 { 100 } else { 1 };
                c.put(key, key * 10, size);
                if i % 3 == 0 {
                    for _ in 0..3 {
                        let _ = c.get(&key);
                    }
                }
                if i % 4 == 0 {
                    let _ = c.peek(&key);
                }
                if i % 6 == 0 {
                    let _ = c.contains(&key);
                }
            }
        }));
    }
    for handle in handles {
        handle.join().expect("Thread panicked");
    }
    assert!(cache.len() <= 50, "Cache should maintain capacity");
}
/// Hot-set reads plus cold inserts of varying sizes on a GDSF cache;
/// verifies capacity is maintained.
#[test]
fn test_concurrent_gdsf_frequency_matters() {
    let cache: Arc<ConcurrentGdsfCache<i32, i32>> =
        Arc::new(ConcurrentGdsfCache::init(gdsf_config(50, 4), None));
    let hot_keys: Vec<i32> = (0..10).collect();
    for &key in &hot_keys {
        cache.put(key, key * 100, 1);
    }
    let mut handles = vec![];
    let hot_keys_arc = Arc::new(hot_keys.clone());
    for t in 0..NUM_THREADS {
        let c = Arc::clone(&cache);
        let hk = Arc::clone(&hot_keys_arc);
        handles.push(thread::spawn(move || {
            for i in 0..OPS_PER_THREAD {
                let hot_key = hk[i % hk.len()];
                for _ in 0..3 {
                    let _ = c.get(&hot_key);
                }
                let cold_key = 1000 + (t * OPS_PER_THREAD + i) as i32;
                // Cold objects vary in size from 1 to 10.
                let size = ((i % 10) + 1) as u64;
                c.put(cold_key, cold_key, size);
                if i % 5 == 0 {
                    let _ = c.peek(&hot_key);
                }
                if i % 7 == 0 {
                    let _ = c.contains(&cold_key);
                }
                if i % 11 == 0 {
                    let _ = c.remove(&cold_key);
                }
            }
        }));
    }
    for handle in handles {
        handle.join().expect("Thread panicked");
    }
    assert!(cache.len() <= 50, "Cache should maintain capacity");
}
/// Concurrent inserts of differently-sized objects; verifies the byte-size
/// counter is non-zero afterwards and the entry cap holds.
#[test]
fn test_concurrent_gdsf_concurrent_size_tracking() {
    let cache: Arc<ConcurrentGdsfCache<i32, i32>> =
        Arc::new(ConcurrentGdsfCache::init(gdsf_config(10, 2), None));
    let mut handles = vec![];
    for t in 0..4 {
        let c = Arc::clone(&cache);
        handles.push(thread::spawn(move || {
            for i in 0..5 {
                let key = t * 10 + i;
                // Sizes 10, 20, 30, 40, 50 per thread.
                let size = ((i + 1) * 10) as u64;
                c.put(key, key, size);
            }
        }));
    }
    for handle in handles {
        handle.join().expect("Thread panicked");
    }
    let size = cache.current_size();
    assert!(size > 0, "Size should be tracked");
    assert!(cache.len() <= 10, "Should maintain entry capacity");
}
/// Asserts after *every* insert — not just at the end — that the LRU cache
/// never reports more entries than its capacity, and that all writes landed.
#[test]
fn test_capacity_never_exceeded_lru() {
    let capacity = 50;
    let cache: Arc<ConcurrentLruCache<i32, i32>> =
        Arc::new(ConcurrentLruCache::init(lru_config(capacity, 4), None));
    let mut handles = vec![];
    let write_count = Arc::new(AtomicUsize::new(0));
    for t in 0..8 {
        let c = Arc::clone(&cache);
        let wc = Arc::clone(&write_count);
        handles.push(thread::spawn(move || {
            for i in 0..500 {
                let key = t * 1000 + i;
                c.put(key, key, 1);
                wc.fetch_add(1, Ordering::Relaxed);
                assert!(
                    c.len() <= capacity,
                    "Capacity exceeded during concurrent writes!"
                );
            }
        }));
    }
    for handle in handles {
        handle.join().expect("Thread panicked");
    }
    assert_eq!(write_count.load(Ordering::Relaxed), 8 * 500);
    assert!(cache.len() <= capacity, "Final capacity check failed");
}
/// Per-insert capacity invariant check for the LFU cache.
#[test]
fn test_capacity_never_exceeded_lfu() {
    let capacity = 50;
    let cache: Arc<ConcurrentLfuCache<i32, i32>> =
        Arc::new(ConcurrentLfuCache::init(lfu_config(capacity, 4), None));
    let mut handles = vec![];
    for t in 0..8 {
        let c = Arc::clone(&cache);
        handles.push(thread::spawn(move || {
            for i in 0..500 {
                let key = t * 1000 + i;
                c.put(key, key, 1);
                assert!(c.len() <= capacity, "Capacity exceeded!");
            }
        }));
    }
    for handle in handles {
        handle.join().expect("Thread panicked");
    }
    assert!(cache.len() <= capacity);
}
/// Per-insert capacity invariant check for the SLRU cache.
#[test]
fn test_capacity_never_exceeded_slru() {
    let capacity = 50;
    let cache: Arc<ConcurrentSlruCache<i32, i32>> =
        Arc::new(ConcurrentSlruCache::init(slru_config(capacity, 20, 4), None));
    let mut handles = vec![];
    for t in 0..8 {
        let c = Arc::clone(&cache);
        handles.push(thread::spawn(move || {
            for i in 0..500 {
                let key = t * 1000 + i;
                c.put(key, key, 1);
                assert!(c.len() <= capacity, "Capacity exceeded!");
            }
        }));
    }
    for handle in handles {
        handle.join().expect("Thread panicked");
    }
    assert!(cache.len() <= capacity);
}
/// Per-insert capacity invariant check for the LFUDA cache.
#[test]
fn test_capacity_never_exceeded_lfuda() {
    let capacity = 50;
    let cache: Arc<ConcurrentLfudaCache<i32, i32>> =
        Arc::new(ConcurrentLfudaCache::init(lfuda_config(capacity, 4), None));
    let mut handles = vec![];
    for t in 0..8 {
        let c = Arc::clone(&cache);
        handles.push(thread::spawn(move || {
            for i in 0..500 {
                let key = t * 1000 + i;
                c.put(key, key, 1);
                assert!(c.len() <= capacity, "Capacity exceeded!");
            }
        }));
    }
    for handle in handles {
        handle.join().expect("Thread panicked");
    }
    assert!(cache.len() <= capacity);
}
/// Per-insert capacity invariant check for the GDSF cache.
#[test]
fn test_capacity_never_exceeded_gdsf() {
    let capacity = 50;
    let cache: Arc<ConcurrentGdsfCache<i32, i32>> =
        Arc::new(ConcurrentGdsfCache::init(gdsf_config(capacity, 4), None));
    let mut handles = vec![];
    for t in 0..8 {
        let c = Arc::clone(&cache);
        handles.push(thread::spawn(move || {
            for i in 0..500 {
                let key = t * 1000 + i;
                c.put(key, key, 1);
                assert!(c.len() <= capacity, "Capacity exceeded!");
            }
        }));
    }
    for handle in handles {
        handle.join().expect("Thread panicked");
    }
    assert!(cache.len() <= capacity);
}
/// Concurrent readers verify that every value returned by `get` matches the
/// value originally stored for that key (no cross-key corruption).
#[test]
fn test_get_returns_correct_value() {
    let cache: Arc<ConcurrentLruCache<i32, i32>> =
        Arc::new(ConcurrentLruCache::init(lru_config(100, 4), None));
    for i in 0..50 {
        cache.put(i, i * 100, 1);
    }
    let errors = Arc::new(AtomicUsize::new(0));
    let mut handles = vec![];
    for _ in 0..8 {
        let c = Arc::clone(&cache);
        let err = Arc::clone(&errors);
        handles.push(thread::spawn(move || {
            for i in 0..50 {
                // A missing key is fine (could have been evicted); a wrong
                // value is a correctness failure.
                if let Some(val) = c.get(&i) {
                    if val != i * 100 {
                        err.fetch_add(1, Ordering::Relaxed);
                    }
                }
            }
        }));
    }
    for handle in handles {
        handle.join().expect("Thread panicked");
    }
    assert_eq!(errors.load(Ordering::Relaxed), 0, "Values were corrupted");
}
/// Four threads repeatedly overwrite the same key with their own thread id;
/// the final value must be one of those ids (no torn/invalid writes).
#[test]
fn test_update_is_atomic() {
    let cache: Arc<ConcurrentLruCache<i32, i32>> =
        Arc::new(ConcurrentLruCache::init(lru_config(10, 2), None));
    cache.put(1, 0, 1);
    let mut handles = vec![];
    for t in 0..4 {
        let c = Arc::clone(&cache);
        handles.push(thread::spawn(move || {
            for _ in 0..100 {
                c.put(1, t, 1);
            }
        }));
    }
    for handle in handles {
        handle.join().expect("Thread panicked");
    }
    let value = cache.get(&1).unwrap();
    assert!(
        (0..=3).contains(&value),
        "Value should be a valid thread ID"
    );
}
/// Four threads race to remove the same 50 keys; each key must be removed by
/// exactly one thread, leaving the cache empty.
#[test]
fn test_remove_consistency() {
    let cache: Arc<ConcurrentLruCache<i32, i32>> =
        Arc::new(ConcurrentLruCache::init(lru_config(200, 4), None));
    for i in 0..50 {
        cache.put(i, i, 1);
    }
    let initial_count = cache.len();
    assert_eq!(initial_count, 50, "All 50 items should be inserted");
    let successful_removes = Arc::new(AtomicUsize::new(0));
    let mut handles = vec![];
    for _ in 0..4 {
        let c = Arc::clone(&cache);
        let sr = Arc::clone(&successful_removes);
        handles.push(thread::spawn(move || {
            for i in 0..50 {
                // Only the thread that wins the race gets Some back.
                if c.remove(&i).is_some() {
                    sr.fetch_add(1, Ordering::Relaxed);
                }
            }
        }));
    }
    for handle in handles {
        handle.join().expect("Thread panicked");
    }
    assert_eq!(
        successful_removes.load(Ordering::Relaxed),
        50,
        "Each key should be removed exactly once"
    );
    assert!(cache.is_empty(), "Cache should be empty after all removes");
}
/// Dedicated writer, reader, and remover thread groups operate on overlapping
/// key ranges of the LRU cache simultaneously; capacity must hold.
#[test]
fn test_mixed_operations_lru() {
    let cache: Arc<ConcurrentLruCache<i32, i32>> =
        Arc::new(ConcurrentLruCache::init(lru_config(100, 4), None));
    let mut handles = vec![];
    // Writers.
    for t in 0..4 {
        let c = Arc::clone(&cache);
        handles.push(thread::spawn(move || {
            for i in 0..200 {
                c.put(t * 1000 + i, i, 1);
            }
        }));
    }
    // Readers over the same key ranges.
    for t in 0..4 {
        let c = Arc::clone(&cache);
        handles.push(thread::spawn(move || {
            for i in 0..200 {
                let _ = c.get(&(t * 1000 + i));
            }
        }));
    }
    // Removers over a subset of the key ranges.
    for t in 0..2 {
        let c = Arc::clone(&cache);
        handles.push(thread::spawn(move || {
            for i in 0..100 {
                c.remove(&(t * 1000 + i));
            }
        }));
    }
    for handle in handles {
        handle.join().expect("Thread panicked");
    }
    assert!(cache.len() <= 100);
}
/// Interleaved put+get workload on the LFU cache; capacity must hold.
#[test]
fn test_mixed_operations_lfu() {
    let cache: Arc<ConcurrentLfuCache<i32, i32>> =
        Arc::new(ConcurrentLfuCache::init(lfu_config(100, 4), None));
    let mut handles = vec![];
    for t in 0..4 {
        let c = Arc::clone(&cache);
        handles.push(thread::spawn(move || {
            for i in 0..200 {
                c.put(t * 1000 + i, i, 1);
                let _ = c.get(&(t * 1000 + i));
            }
        }));
    }
    for handle in handles {
        handle.join().expect("Thread panicked");
    }
    assert!(cache.len() <= 100);
}
/// Interleaved put + repeated get workload on the SLRU cache (repeat reads
/// drive promotions); capacity must hold.
#[test]
fn test_mixed_operations_slru() {
    let cache: Arc<ConcurrentSlruCache<i32, i32>> =
        Arc::new(ConcurrentSlruCache::init(slru_config(100, 40, 4), None));
    let mut handles = vec![];
    for t in 0..4 {
        let c = Arc::clone(&cache);
        handles.push(thread::spawn(move || {
            for i in 0..200 {
                c.put(t * 1000 + i, i, 1);
                for _ in 0..3 {
                    let _ = c.get(&(t * 1000 + i));
                }
            }
        }));
    }
    for handle in handles {
        handle.join().expect("Thread panicked");
    }
    assert!(cache.len() <= 100);
}
/// Interleaved put+get workload with varying object sizes on the GDSF cache;
/// capacity must hold.
#[test]
fn test_mixed_operations_gdsf() {
    let cache: Arc<ConcurrentGdsfCache<i32, i32>> =
        Arc::new(ConcurrentGdsfCache::init(gdsf_config(100, 4), None));
    let mut handles = vec![];
    for t in 0..4 {
        let c = Arc::clone(&cache);
        handles.push(thread::spawn(move || {
            for i in 0..200 {
                // Sizes cycle through 1..=10.
                let size = (i % 10 + 1) as u64;
                c.put(t * 1000 + i, i, size);
                let _ = c.get(&(t * 1000 + i));
            }
        }));
    }
    for handle in handles {
        handle.join().expect("Thread panicked");
    }
    assert!(cache.len() <= 100);
}
/// Writers insert continuously while another thread periodically calls
/// `clear()`; after the clearing thread raises the stop flag, the cache must
/// still respect its capacity.
#[test]
fn test_clear_during_operations() {
    let cache: Arc<ConcurrentLruCache<i32, i32>> =
        Arc::new(ConcurrentLruCache::init(lru_config(100, 4), None));
    // 0 = keep writing, 1 = stop (set by the clearing thread when done).
    let stop_flag = Arc::new(AtomicUsize::new(0));
    let mut handles = vec![];
    for t in 0..4 {
        let c = Arc::clone(&cache);
        let sf = Arc::clone(&stop_flag);
        handles.push(thread::spawn(move || {
            let mut i = 0;
            while sf.load(Ordering::Relaxed) == 0 {
                c.put(t * 10000 + i, i, 1);
                i += 1;
            }
        }));
    }
    let cache_clear = Arc::clone(&cache);
    let stop_flag_clear = Arc::clone(&stop_flag);
    handles.push(thread::spawn(move || {
        for _ in 0..10 {
            thread::sleep(std::time::Duration::from_millis(20));
            cache_clear.clear();
        }
        stop_flag_clear.store(1, Ordering::Relaxed);
    }));
    for handle in handles {
        handle.join().expect("Thread panicked");
    }
    assert!(cache.len() <= 100);
}
/// 100 inserts of 10 "bytes" each into a cache that will not evict (capacity
/// 200); the tracked byte size must equal entries * 10 exactly.
#[test]
fn test_size_tracking_concurrent_lru() {
    let cache: Arc<ConcurrentLruCache<i32, String>> =
        Arc::new(ConcurrentLruCache::init(lru_config(200, 4), None));
    let mut handles = vec![];
    for t in 0..4 {
        let c = Arc::clone(&cache);
        handles.push(thread::spawn(move || {
            for i in 0..25 {
                let key = t * 100 + i;
                c.put(key, format!("value_{}", key), 10);
            }
        }));
    }
    for handle in handles {
        handle.join().expect("Thread panicked");
    }
    let size = cache.current_size();
    let len = cache.len();
    assert!(len <= 200, "Should not exceed entry limit");
    assert!(size > 0, "Size should be tracked");
    assert_eq!(size, len as u64 * 10, "Size should match entries * 10");
}
/// Same size-tracking smoke test as the LRU variant, against the LFU cache.
#[test]
fn test_size_tracking_concurrent_lfu() {
    let cache: Arc<ConcurrentLfuCache<i32, String>> =
        Arc::new(ConcurrentLfuCache::init(lfu_config(200, 4), None));
    let mut handles = vec![];
    for t in 0..4 {
        let c = Arc::clone(&cache);
        handles.push(thread::spawn(move || {
            for i in 0..25 {
                let key = t * 100 + i;
                c.put(key, format!("value_{}", key), 10);
            }
        }));
    }
    for handle in handles {
        handle.join().expect("Thread panicked");
    }
    assert!(cache.current_size() > 0, "Size should be tracked");
    assert!(cache.len() <= 200);
}
/// Concurrent reads on a cache that was never written must all miss, and the
/// cache must remain empty.
#[test]
fn test_concurrent_access_empty_cache() {
    let cache: Arc<ConcurrentLruCache<i32, i32>> =
        Arc::new(ConcurrentLruCache::init(lru_config(10, 2), None));
    let mut handles = vec![];
    for _ in 0..8 {
        let c = Arc::clone(&cache);
        handles.push(thread::spawn(move || {
            for i in 0..100 {
                assert!(c.get(&i).is_none(), "Empty cache should return None");
            }
        }));
    }
    for handle in handles {
        handle.join().expect("Thread panicked");
    }
    assert!(cache.is_empty());
}
/// All threads hammer a single key with put+get; afterwards the key must
/// exist, the cache must hold exactly one entry, and all puts were counted.
#[test]
fn test_concurrent_single_key() {
    let cache: Arc<ConcurrentLruCache<i32, i32>> =
        Arc::new(ConcurrentLruCache::init(lru_config(10, 2), None));
    let put_count = Arc::new(AtomicUsize::new(0));
    let get_count = Arc::new(AtomicUsize::new(0));
    let mut handles = vec![];
    for _ in 0..8 {
        let c = Arc::clone(&cache);
        let pc = Arc::clone(&put_count);
        let gc = Arc::clone(&get_count);
        handles.push(thread::spawn(move || {
            for i in 0..100 {
                c.put(1, i, 1);
                pc.fetch_add(1, Ordering::Relaxed);
                if c.get(&1).is_some() {
                    gc.fetch_add(1, Ordering::Relaxed);
                }
            }
        }));
    }
    for handle in handles {
        handle.join().expect("Thread panicked");
    }
    assert!(cache.get(&1).is_some(), "Key should exist");
    assert_eq!(cache.len(), 1, "Should have exactly 1 key");
    assert_eq!(put_count.load(Ordering::Relaxed), 8 * 100);
}
/// Degenerate case: single segment with capacity 1. Whatever the interleaving,
/// exactly one entry may remain at the end.
#[test]
fn test_concurrent_capacity_one() {
    let cache: Arc<ConcurrentLruCache<i32, i32>> =
        Arc::new(ConcurrentLruCache::init(lru_config(1, 1), None));
    let mut handles = vec![];
    for t in 0..4 {
        let c = Arc::clone(&cache);
        handles.push(thread::spawn(move || {
            for i in 0..100 {
                c.put(t * 100 + i, i, 1);
            }
        }));
    }
    for handle in handles {
        handle.join().expect("Thread panicked");
    }
    assert_eq!(cache.len(), 1, "Cache with capacity 1 should have 1 entry");
}
/// `contains` followed by `get` from multiple threads; a smoke test that the
/// two read paths can interleave without panicking or deadlocking.
#[test]
fn test_contains_key_consistency() {
    let cache: Arc<ConcurrentLruCache<i32, i32>> =
        Arc::new(ConcurrentLruCache::init(lru_config(50, 4), None));
    for i in 0..30 {
        cache.put(i, i, 1);
    }
    let mut handles = vec![];
    for _ in 0..4 {
        let c = Arc::clone(&cache);
        handles.push(thread::spawn(move || {
            for i in 0..30 {
                if c.contains(&i) {
                    let _ = c.get(&i);
                }
            }
        }));
    }
    for handle in handles {
        handle.join().expect("Thread panicked");
    }
}
/// Runs the same mixed put/get/remove workload against all five cache
/// variants in parallel and checks each respects its capacity.
#[test]
fn test_all_concurrent_caches_len_consistency() {
    let lru: Arc<ConcurrentLruCache<i32, i32>> =
        Arc::new(ConcurrentLruCache::init(lru_config(50, 4), None));
    let lfu: Arc<ConcurrentLfuCache<i32, i32>> =
        Arc::new(ConcurrentLfuCache::init(lfu_config(50, 4), None));
    let lfuda: Arc<ConcurrentLfudaCache<i32, i32>> =
        Arc::new(ConcurrentLfudaCache::init(lfuda_config(50, 4), None));
    let slru: Arc<ConcurrentSlruCache<i32, i32>> =
        Arc::new(ConcurrentSlruCache::init(slru_config(50, 15, 4), None));
    let gdsf: Arc<ConcurrentGdsfCache<i32, i32>> =
        Arc::new(ConcurrentGdsfCache::init(gdsf_config(50, 4), None));
    let mut handles = vec![];
    for t in 0..NUM_THREADS {
        let lru_c = Arc::clone(&lru);
        let lfu_c = Arc::clone(&lfu);
        let lfuda_c = Arc::clone(&lfuda);
        let slru_c = Arc::clone(&slru);
        let gdsf_c = Arc::clone(&gdsf);
        handles.push(thread::spawn(move || {
            for i in 0..OPS_PER_THREAD {
                let key = (t * OPS_PER_THREAD + i) as i32;
                lru_c.put(key, key, 1);
                lfu_c.put(key, key, 1);
                lfuda_c.put(key, key, 1);
                slru_c.put(key, key, 1);
                gdsf_c.put(key, key, 1);
                if i % 3 == 0 {
                    let _ = lru_c.get(&key);
                    let _ = lfu_c.get(&key);
                    let _ = lfuda_c.get(&key);
                    let _ = slru_c.get(&key);
                    let _ = gdsf_c.get(&key);
                }
                if i % 7 == 0 {
                    let _ = lru_c.remove(&key);
                    let _ = lfu_c.remove(&key);
                    let _ = lfuda_c.remove(&key);
                    let _ = slru_c.remove(&key);
                    let _ = gdsf_c.remove(&key);
                }
            }
        }));
    }
    for handle in handles {
        handle.join().expect("Thread panicked");
    }
    assert!(lru.len() <= 50, "LRU exceeded capacity");
    assert!(lfu.len() <= 50, "LFU exceeded capacity");
    assert!(lfuda.len() <= 50, "LFUDA exceeded capacity");
    assert!(slru.len() <= 50, "SLRU exceeded capacity");
    assert!(gdsf.len() <= 50, "GDSF exceeded capacity");
}
/// Interleaves inserts with periodic `clear()` calls on all five variants,
/// then clears once more after joining; every cache must end up empty.
#[test]
fn test_all_concurrent_caches_clear() {
    let lru: Arc<ConcurrentLruCache<i32, i32>> =
        Arc::new(ConcurrentLruCache::init(lru_config(100, 4), None));
    let lfu: Arc<ConcurrentLfuCache<i32, i32>> =
        Arc::new(ConcurrentLfuCache::init(lfu_config(100, 4), None));
    let lfuda: Arc<ConcurrentLfudaCache<i32, i32>> =
        Arc::new(ConcurrentLfudaCache::init(lfuda_config(100, 4), None));
    let slru: Arc<ConcurrentSlruCache<i32, i32>> =
        Arc::new(ConcurrentSlruCache::init(slru_config(100, 30, 4), None));
    let gdsf: Arc<ConcurrentGdsfCache<i32, i32>> =
        Arc::new(ConcurrentGdsfCache::init(gdsf_config(100, 4), None));
    let mut handles = vec![];
    for t in 0..NUM_THREADS {
        let lru_c = Arc::clone(&lru);
        let lfu_c = Arc::clone(&lfu);
        let lfuda_c = Arc::clone(&lfuda);
        let slru_c = Arc::clone(&slru);
        let gdsf_c = Arc::clone(&gdsf);
        handles.push(thread::spawn(move || {
            for i in 0..OPS_PER_THREAD {
                let key = (t * OPS_PER_THREAD + i) as i32;
                lru_c.put(key, key, 1);
                lfu_c.put(key, key, 1);
                lfuda_c.put(key, key, 1);
                slru_c.put(key, key, 1);
                gdsf_c.put(key, key, 1);
                if i % 100 == 0 {
                    lru_c.clear();
                    lfu_c.clear();
                    lfuda_c.clear();
                    slru_c.clear();
                    gdsf_c.clear();
                }
            }
        }));
    }
    for handle in handles {
        handle.join().expect("Thread panicked");
    }
    // Final clear after all writers have stopped — now emptiness is guaranteed.
    lru.clear();
    lfu.clear();
    lfuda.clear();
    slru.clear();
    gdsf.clear();
    assert!(lru.is_empty(), "LRU should be empty after clear");
    assert!(lfu.is_empty(), "LFU should be empty after clear");
    assert!(lfuda.is_empty(), "LFUDA should be empty after clear");
    assert!(slru.is_empty(), "SLRU should be empty after clear");
    assert!(gdsf.is_empty(), "GDSF should be empty after clear");
}
/// Byte-budget eviction test: 1 KiB objects against a 100 KiB budget; the
/// tracked size must never exceed `max_size` when checked after the run.
#[test]
fn test_concurrent_lru_size_based_eviction() {
    let max_size: u64 = 100 * 1024;
    let segment_count = 4;
    let cache: Arc<ConcurrentLruCache<i32, String>> = Arc::new(ConcurrentLruCache::init(
        lru_config_with_size(200, max_size, segment_count),
        None,
    ));
    let object_size: u64 = 1024;
    let mut handles = vec![];
    for t in 0..NUM_THREADS {
        let c = Arc::clone(&cache);
        handles.push(thread::spawn(move || {
            for i in 0..OPS_PER_THREAD {
                let key = (t * OPS_PER_THREAD + i) as i32;
                c.put(key, format!("value_{}", key), object_size);
                if i % 3 == 0 {
                    let _ = c.get(&key);
                }
                if i % 5 == 0 {
                    let _ = c.peek(&key);
                }
                if i % 7 == 0 {
                    let _ = c.contains(&key);
                }
                if i % 11 == 0 {
                    let _ = c.remove(&key);
                }
            }
        }));
    }
    for handle in handles {
        handle.join().expect("Thread panicked");
    }
    let final_size = cache.current_size();
    assert!(
        final_size <= max_size,
        "Size {} should not exceed max_size {}",
        final_size,
        max_size
    );
}
/// Byte-budget eviction test for the LFU cache (1 KiB objects, 100 KiB budget).
#[test]
fn test_concurrent_lfu_size_based_eviction() {
    let max_size: u64 = 100 * 1024;
    let segment_count = 4;
    let cache: Arc<ConcurrentLfuCache<i32, String>> = Arc::new(ConcurrentLfuCache::init(
        lfu_config_with_size(200, max_size, segment_count),
        None,
    ));
    let object_size: u64 = 1024;
    let mut handles = vec![];
    for t in 0..NUM_THREADS {
        let c = Arc::clone(&cache);
        handles.push(thread::spawn(move || {
            for i in 0..OPS_PER_THREAD {
                let key = (t * OPS_PER_THREAD + i) as i32;
                c.put(key, format!("value_{}", key), object_size);
                if i % 5 == 0 {
                    for _ in 0..3 {
                        let _ = c.get(&key);
                    }
                }
                if i % 4 == 0 {
                    let _ = c.peek(&key);
                }
                if i % 6 == 0 {
                    let _ = c.contains(&key);
                }
            }
        }));
    }
    for handle in handles {
        handle.join().expect("Thread panicked");
    }
    let final_size = cache.current_size();
    assert!(
        final_size <= max_size,
        "LFU Size {} should not exceed max_size {}",
        final_size,
        max_size
    );
}
/// Byte-budget eviction test for the LFUDA cache (1 KiB objects, 100 KiB budget).
#[test]
fn test_concurrent_lfuda_size_based_eviction() {
    let max_size: u64 = 100 * 1024;
    let segment_count = 4;
    let cache: Arc<ConcurrentLfudaCache<i32, String>> = Arc::new(ConcurrentLfudaCache::init(
        lfuda_config_with_size(200, max_size, segment_count),
        None,
    ));
    let object_size: u64 = 1024;
    let mut handles = vec![];
    for t in 0..NUM_THREADS {
        let c = Arc::clone(&cache);
        handles.push(thread::spawn(move || {
            for i in 0..OPS_PER_THREAD {
                let key = (t * OPS_PER_THREAD + i) as i32;
                c.put(key, format!("value_{}", key), object_size);
                if i % 4 == 0 {
                    for _ in 0..3 {
                        let _ = c.get(&key);
                    }
                }
                if i % 5 == 0 {
                    let _ = c.peek(&key);
                }
                if i % 7 == 0 {
                    let _ = c.contains(&key);
                }
                if i % 10 == 0 {
                    let _ = c.remove(&key);
                }
            }
        }));
    }
    for handle in handles {
        handle.join().expect("Thread panicked");
    }
    let final_size = cache.current_size();
    assert!(
        final_size <= max_size,
        "LFUDA Size {} should not exceed max_size {}",
        final_size,
        max_size
    );
}
/// Byte-budget eviction test for the GDSF cache with variable object sizes
/// (256 B .. 1280 B) against a 100 KiB budget.
#[test]
fn test_concurrent_gdsf_size_based_eviction() {
    let max_size: u64 = 100 * 1024;
    let segment_count = 4;
    let cache: Arc<ConcurrentGdsfCache<i32, String>> = Arc::new(ConcurrentGdsfCache::init(
        gdsf_config_with_size(200, max_size, segment_count),
        None,
    ));
    let mut handles = vec![];
    for t in 0..NUM_THREADS {
        let c = Arc::clone(&cache);
        handles.push(thread::spawn(move || {
            for i in 0..OPS_PER_THREAD {
                let key = (t * OPS_PER_THREAD + i) as i32;
                // Sizes cycle through 256, 512, 768, 1024, 1280 bytes.
                let size = ((i % 5) + 1) as u64 * 256;
                c.put(key, format!("value_{}", key), size);
                if i % 3 == 0 {
                    for _ in 0..3 {
                        let _ = c.get(&key);
                    }
                }
                if i % 5 == 0 {
                    let _ = c.peek(&key);
                }
                if i % 6 == 0 {
                    let _ = c.contains(&key);
                }
            }
        }));
    }
    for handle in handles {
        handle.join().expect("Thread panicked");
    }
    let final_size = cache.current_size();
    assert!(
        final_size <= max_size,
        "GDSF Size {} should not exceed max_size {}",
        final_size,
        max_size
    );
}
/// Checks the byte budget is respected *during* the run, not just at the end:
/// every inserting thread asserts `current_size() <= max_size` after each put.
#[test]
fn test_concurrent_lru_per_segment_size_limit() {
    let max_size: u64 = 100 * 1024;
    let segment_count = 4;
    let cache: Arc<ConcurrentLruCache<i32, String>> = Arc::new(ConcurrentLruCache::init(
        lru_config_with_size(200, max_size, segment_count),
        None,
    ));
    let mut handles = vec![];
    for t in 0..NUM_THREADS {
        let c = Arc::clone(&cache);
        handles.push(thread::spawn(move || {
            for i in 0..OPS_PER_THREAD {
                let key = (t * OPS_PER_THREAD + i) as i32;
                // Sizes cycle through 2 KiB, 4 KiB, 6 KiB, 8 KiB.
                let size = ((i % 4) + 1) as u64 * 2048;
                c.put(key, format!("large_value_{}", key), size);
                let current = c.current_size();
                assert!(
                    current <= max_size,
                    "Size {} exceeded max_size {} during concurrent insert",
                    current,
                    max_size
                );
                if i % 4 == 0 {
                    let _ = c.get(&key);
                }
                if i % 6 == 0 {
                    let _ = c.peek(&key);
                }
            }
        }));
    }
    for handle in handles {
        handle.join().expect("Thread panicked");
    }
    let final_size = cache.current_size();
    assert!(
        final_size <= max_size,
        "Final size {} should not exceed max_size {}",
        final_size,
        max_size
    );
}
/// After several threads each insert a disjoint set of fixed-size items,
/// both the entry count and the byte-size accounting must match exactly.
#[test]
fn test_concurrent_size_tracking_accuracy() {
    let max_size: u64 = 1024 * 1024;
    let segment_count = 4;
    let cache: Arc<ConcurrentLruCache<i32, String>> = Arc::new(ConcurrentLruCache::init(
        lru_config_with_size(1000, max_size, segment_count),
        None,
    ));
    let item_size: u64 = 100;
    // Derive the expected item count from the loop bounds so the assertion
    // can never drift out of sync with the insertion loops below.
    let threads: usize = 4;
    let items_per_thread: usize = 25;
    let num_items = threads * items_per_thread;
    let handles: Vec<_> = (0..threads)
        .map(|t| {
            let cache = Arc::clone(&cache);
            thread::spawn(move || {
                for i in 0..items_per_thread {
                    // Disjoint key ranges per thread: every put is a new entry.
                    let key = (t * items_per_thread + i) as i32;
                    cache.put(key, format!("value_{}", key), item_size);
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    let expected_size = num_items as u64 * item_size;
    let actual_size = cache.current_size();
    let actual_len = cache.len();
    println!(
        "Size tracking: expected={}B, actual={}B, len={}",
        expected_size, actual_size, actual_len
    );
    assert_eq!(actual_len, num_items, "All items should be present");
    assert_eq!(
        actual_size, expected_size,
        "Size should match expected total"
    );
}
/// Removals racing with inserts across threads must leave the byte-size
/// accounting within the configured budget.
#[test]
fn test_concurrent_size_tracking_on_remove() {
    let max_size: u64 = 500 * 1024;
    let segment_count = 4;
    let cache: Arc<ConcurrentLruCache<i32, String>> = Arc::new(ConcurrentLruCache::init(
        lru_config_with_size(500, max_size, segment_count),
        None,
    ));
    let item_size: u64 = 1024;
    // Seed 100 fixed-size entries so there is something to remove.
    for i in 0..100 {
        cache.put(i, format!("value_{}", i), item_size);
    }
    let size_before = cache.current_size();
    assert_eq!(size_before, 100 * item_size, "Size should be 100KB");
    let workers: Vec<_> = (0..NUM_THREADS)
        .map(|t| {
            let shared = Arc::clone(&cache);
            thread::spawn(move || {
                for op in 0..OPS_PER_THREAD {
                    let key = (t * OPS_PER_THREAD + op) as i32;
                    if op < 50 {
                        // All threads race to remove the same seeded keys;
                        // only one removal per key can succeed.
                        let _ = shared.remove(&(op as i32));
                    }
                    shared.put(key + 1000, format!("new_value_{}", key), item_size);
                    if op % 3 == 0 {
                        let _ = shared.get(&(key + 1000));
                    }
                    if op % 5 == 0 {
                        let _ = shared.contains(&(key + 1000));
                    }
                }
            })
        })
        .collect();
    for w in workers {
        w.join().expect("Thread panicked");
    }
    let final_size = cache.current_size();
    assert!(
        final_size <= max_size,
        "Size {} should be <= max_size {}",
        final_size,
        max_size
    );
}
/// The concurrent LRU splits `max_size` across segments; the total it
/// reports may differ from the request by at most one byte per segment.
#[test]
fn test_concurrent_lru_with_max_size() {
    let max_size: u64 = 1024 * 1024;
    let cache: Arc<ConcurrentLruCache<String, Vec<u8>>> = Arc::new(ConcurrentLruCache::init(
        lru_config_with_size(10000, max_size, 4),
        None,
    ));
    let segments = cache.segment_count() as u64;
    let actual_max = cache.max_size();
    assert!(
        (max_size - segments..=max_size + segments).contains(&actual_max),
        "max_size should be approximately {} but was {}",
        max_size,
        actual_max
    );
    assert_eq!(cache.current_size(), 0);
    let workers: Vec<_> = (0..NUM_THREADS)
        .map(|t| {
            let shared = Arc::clone(&cache);
            thread::spawn(move || {
                for op in 0..OPS_PER_THREAD {
                    let key = format!("key_{}_{}", t, op);
                    let data = vec![0u8; 100];
                    shared.put(key.clone(), data, 100);
                    if op % 3 == 0 {
                        let _ = shared.get(&key);
                    }
                    if op % 5 == 0 {
                        let _ = shared.contains(&key);
                    }
                }
            })
        })
        .collect();
    for w in workers {
        w.join().expect("Thread panicked");
    }
    assert!(cache.current_size() > 0, "Size should be tracked");
}
/// put() racing against clear() from other threads must not corrupt the
/// LRU's counters; a final clear leaves it fully empty.
#[test]
fn test_concurrent_lru_with_limits() {
    let max_size: u64 = 100_000;
    let cache: Arc<ConcurrentLruCache<i32, String>> = Arc::new(ConcurrentLruCache::init(
        lru_config_with_size(200, max_size, 4),
        None,
    ));
    assert_eq!(cache.max_size(), max_size);
    assert_eq!(cache.current_size(), 0);
    let workers: Vec<_> = (0..NUM_THREADS)
        .map(|t| {
            let shared = Arc::clone(&cache);
            thread::spawn(move || {
                for op in 0..OPS_PER_THREAD {
                    let key = (t * OPS_PER_THREAD + op) as i32;
                    shared.put(key, format!("value_{}", key), 100);
                    if op % 50 == 0 {
                        // Wipe everything mid-stream from this thread.
                        shared.clear();
                    }
                }
            })
        })
        .collect();
    for w in workers {
        w.join().expect("Thread panicked");
    }
    cache.clear();
    assert_eq!(cache.len(), 0);
    assert_eq!(cache.current_size(), 0);
}
/// The concurrent LFU splits `max_size` across segments; the reported
/// total may deviate by at most one byte per segment.
#[test]
fn test_concurrent_lfu_with_max_size() {
    let max_size: u64 = 1024 * 1024;
    let cache: Arc<ConcurrentLfuCache<String, Vec<u8>>> = Arc::new(ConcurrentLfuCache::init(
        lfu_config_with_size(10000, max_size, 4),
        None,
    ));
    let segments = cache.segment_count() as u64;
    let actual_max = cache.max_size();
    assert!(
        (max_size - segments..=max_size + segments).contains(&actual_max),
        "max_size should be approximately {} but was {}",
        max_size,
        actual_max
    );
    assert_eq!(cache.current_size(), 0);
    let workers: Vec<_> = (0..NUM_THREADS)
        .map(|t| {
            let shared = Arc::clone(&cache);
            thread::spawn(move || {
                for op in 0..OPS_PER_THREAD {
                    let key = format!("key_{}_{}", t, op);
                    shared.put(key.clone(), vec![1, 2, 3], 100);
                    if op % 3 == 0 {
                        let _ = shared.get(&key);
                    }
                }
            })
        })
        .collect();
    for w in workers {
        w.join().expect("Thread panicked");
    }
    assert!(cache.current_size() > 0, "Size should be tracked");
}
/// Concurrent puts interleaved with clear() must leave the LFU internally
/// consistent; a final clear empties it completely.
#[test]
fn test_concurrent_lfu_with_limits() {
    let max_size: u64 = 100_000;
    let cache: Arc<ConcurrentLfuCache<i32, String>> = Arc::new(ConcurrentLfuCache::init(
        lfu_config_with_size(200, max_size, 4),
        None,
    ));
    let workers: Vec<_> = (0..NUM_THREADS)
        .map(|t| {
            let shared = Arc::clone(&cache);
            thread::spawn(move || {
                for op in 0..OPS_PER_THREAD {
                    let key = (t * OPS_PER_THREAD + op) as i32;
                    shared.put(key, format!("value_{}", key), 100);
                    if op % 50 == 0 {
                        shared.clear();
                    }
                }
            })
        })
        .collect();
    for w in workers {
        w.join().expect("Thread panicked");
    }
    cache.clear();
    assert_eq!(cache.len(), 0);
    assert_eq!(cache.current_size(), 0);
}
/// The concurrent LFUDA splits `max_size` across segments; the reported
/// total may deviate by at most one byte per segment.
#[test]
fn test_concurrent_lfuda_with_max_size() {
    let max_size: u64 = 1024 * 1024;
    let cache: Arc<ConcurrentLfudaCache<String, Vec<u8>>> = Arc::new(ConcurrentLfudaCache::init(
        lfuda_config_with_size(10000, max_size, 4),
        None,
    ));
    let segments = cache.segment_count() as u64;
    let actual_max = cache.max_size();
    assert!(
        (max_size - segments..=max_size + segments).contains(&actual_max),
        "max_size should be approximately {} but was {}",
        max_size,
        actual_max
    );
    assert_eq!(cache.current_size(), 0);
    let workers: Vec<_> = (0..NUM_THREADS)
        .map(|t| {
            let shared = Arc::clone(&cache);
            thread::spawn(move || {
                for op in 0..OPS_PER_THREAD {
                    let key = format!("key_{}_{}", t, op);
                    shared.put(key.clone(), vec![1, 2, 3], 100);
                    if op % 3 == 0 {
                        let _ = shared.get(&key);
                    }
                }
            })
        })
        .collect();
    for w in workers {
        w.join().expect("Thread panicked");
    }
    assert!(cache.current_size() > 0, "Size should be tracked");
}
/// Concurrent puts interleaved with clear() must leave the LFUDA
/// internally consistent; a final clear empties it completely.
#[test]
fn test_concurrent_lfuda_with_limits() {
    let max_size: u64 = 100_000;
    let cache: Arc<ConcurrentLfudaCache<i32, String>> = Arc::new(ConcurrentLfudaCache::init(
        lfuda_config_with_size(200, max_size, 4),
        None,
    ));
    let workers: Vec<_> = (0..NUM_THREADS)
        .map(|t| {
            let shared = Arc::clone(&cache);
            thread::spawn(move || {
                for op in 0..OPS_PER_THREAD {
                    let key = (t * OPS_PER_THREAD + op) as i32;
                    shared.put(key, format!("value_{}", key), 100);
                    if op % 50 == 0 {
                        shared.clear();
                    }
                }
            })
        })
        .collect();
    for w in workers {
        w.join().expect("Thread panicked");
    }
    cache.clear();
    assert_eq!(cache.len(), 0);
    assert_eq!(cache.current_size(), 0);
}
/// The concurrent GDSF splits `max_size` across segments; the reported
/// total may deviate by at most one byte per segment.
#[test]
fn test_concurrent_gdsf_with_max_size() {
    let max_size: u64 = 1024 * 1024;
    let cache: Arc<ConcurrentGdsfCache<String, Vec<u8>>> = Arc::new(ConcurrentGdsfCache::init(
        gdsf_config_with_size(10000, max_size, 4),
        None,
    ));
    let segments = cache.segment_count() as u64;
    let actual_max = cache.max_size();
    assert!(
        (max_size - segments..=max_size + segments).contains(&actual_max),
        "max_size should be approximately {} but was {}",
        max_size,
        actual_max
    );
    assert_eq!(cache.current_size(), 0);
    let workers: Vec<_> = (0..NUM_THREADS)
        .map(|t| {
            let shared = Arc::clone(&cache);
            thread::spawn(move || {
                for op in 0..OPS_PER_THREAD {
                    let key = format!("key_{}_{}", t, op);
                    shared.put(key.clone(), vec![1, 2, 3], 100);
                    if op % 3 == 0 {
                        let _ = shared.get(&key);
                    }
                }
            })
        })
        .collect();
    for w in workers {
        w.join().expect("Thread panicked");
    }
    assert!(cache.current_size() > 0, "Size should be tracked");
}
/// Concurrent puts interleaved with clear() must leave the GDSF
/// internally consistent; a final clear empties it completely.
#[test]
fn test_concurrent_gdsf_with_limits() {
    let max_size: u64 = 100_000;
    let cache: Arc<ConcurrentGdsfCache<i32, String>> = Arc::new(ConcurrentGdsfCache::init(
        gdsf_config_with_size(200, max_size, 4),
        None,
    ));
    let workers: Vec<_> = (0..NUM_THREADS)
        .map(|t| {
            let shared = Arc::clone(&cache);
            thread::spawn(move || {
                for op in 0..OPS_PER_THREAD {
                    let key = (t * OPS_PER_THREAD + op) as i32;
                    shared.put(key, format!("value_{}", key), 100);
                    if op % 50 == 0 {
                        shared.clear();
                    }
                }
            })
        })
        .collect();
    for w in workers {
        w.join().expect("Thread panicked");
    }
    cache.clear();
    assert_eq!(cache.len(), 0);
    assert_eq!(cache.current_size(), 0);
}
/// The concurrent SLRU splits `max_size` across segments; the reported
/// total may deviate by at most one byte per segment.
#[test]
fn test_concurrent_slru_with_max_size() {
    let max_size: u64 = 1024 * 1024;
    let cache: Arc<ConcurrentSlruCache<String, Vec<u8>>> = Arc::new(ConcurrentSlruCache::init(
        slru_config_with_size(10000, 2000, max_size, 4),
        None,
    ));
    let segments = cache.segment_count() as u64;
    let actual_max = cache.max_size();
    assert!(
        (max_size - segments..=max_size + segments).contains(&actual_max),
        "max_size should be approximately {} but was {}",
        max_size,
        actual_max
    );
    assert_eq!(cache.current_size(), 0);
    let workers: Vec<_> = (0..NUM_THREADS)
        .map(|t| {
            let shared = Arc::clone(&cache);
            thread::spawn(move || {
                for op in 0..OPS_PER_THREAD {
                    let key = format!("key_{}_{}", t, op);
                    shared.put(key.clone(), vec![1, 2, 3], 100);
                    // Repeated hits can promote entries to the protected segment.
                    for _ in 0..3 {
                        let _ = shared.get(&key);
                    }
                }
            })
        })
        .collect();
    for w in workers {
        w.join().expect("Thread panicked");
    }
    assert!(cache.current_size() > 0, "Size should be tracked");
}
/// Concurrent puts and hits interleaved with clear() must leave the SLRU
/// internally consistent; a final clear empties it completely.
#[test]
fn test_concurrent_slru_with_limits() {
    let max_size: u64 = 100_000;
    let cache: Arc<ConcurrentSlruCache<i32, String>> = Arc::new(ConcurrentSlruCache::init(
        slru_config_with_size(200, 50, max_size, 4),
        None,
    ));
    let workers: Vec<_> = (0..NUM_THREADS)
        .map(|t| {
            let shared = Arc::clone(&cache);
            thread::spawn(move || {
                for op in 0..OPS_PER_THREAD {
                    let key = (t * OPS_PER_THREAD + op) as i32;
                    shared.put(key, format!("value_{}", key), 100);
                    // Extra hits can promote entries into the protected segment.
                    for _ in 0..3 {
                        let _ = shared.get(&key);
                    }
                    if op % 50 == 0 {
                        shared.clear();
                    }
                }
            })
        })
        .collect();
    for w in workers {
        w.join().expect("Thread panicked");
    }
    cache.clear();
    assert_eq!(cache.len(), 0);
    assert_eq!(cache.current_size(), 0);
}
/// Repeated clear() from one thread while the main thread keeps inserting
/// must leave the cache in a consistent state within its capacity.
#[test]
fn test_concurrent_clear_during_operations() {
    let cache: Arc<ConcurrentLruCache<i32, i32>> =
        Arc::new(ConcurrentLruCache::init(lru_config(1000, 4), None));
    // Seed some entries so the first clear has work to do.
    for i in 0..100 {
        cache.put(i, i, 1);
    }
    assert_eq!(cache.len(), 100);
    let cache_clone = Arc::clone(&cache);
    let handle = thread::spawn(move || {
        for _ in 0..5 {
            cache_clone.clear();
            // Give the main thread time to interleave puts between clears.
            std::thread::sleep(std::time::Duration::from_millis(20));
        }
    });
    for i in 100..200 {
        cache.put(i, i, 1);
    }
    handle.join().unwrap();
    // Compare against the configured capacity instead of repeating the
    // magic number 1000 (matches the sibling tests' `len <= capacity` form).
    let len = cache.len();
    assert!(len <= cache.capacity(), "Cache should respect capacity");
}
/// record_miss() is invoked exactly once per op per thread, so the
/// aggregated miss counter must reach at least NUM_THREADS * OPS_PER_THREAD.
#[test]
fn test_concurrent_lru_record_miss() {
    let cache: Arc<ConcurrentLruCache<i32, i32>> =
        Arc::new(ConcurrentLruCache::init(lru_config(100, 4), None));
    let mut handles = vec![];
    for t in 0..NUM_THREADS {
        let c = Arc::clone(&cache);
        handles.push(thread::spawn(move || {
            for i in 0..OPS_PER_THREAD {
                // One explicit miss per iteration (100 = size of missed object).
                c.record_miss(100);
                let key = (t * OPS_PER_THREAD + i) as i32;
                c.put(key, key, 1);
                let _ = c.get(&key);
            }
        }));
    }
    for handle in handles {
        handle.join().expect("Thread panicked");
    }
    let metrics = cache.metrics();
    // Every explicit record_miss call must be counted; gets on keys evicted
    // by other threads may add more, so this is a lower bound.
    let expected_min = (NUM_THREADS * OPS_PER_THREAD) as f64;
    assert!(
        *metrics.get("cache_misses").unwrap_or(&0.0) >= expected_min,
        "Should have recorded misses"
    );
}
/// get_with applies a closure to the cached value; present keys yield the
/// mapped result and absent keys yield None, even under contention.
#[test]
fn test_concurrent_lru_get_with() {
    let cache: Arc<ConcurrentLruCache<i32, i32>> =
        Arc::new(ConcurrentLruCache::init(lru_config(10000, 4), None));
    let workers: Vec<_> = (0..NUM_THREADS)
        .map(|t| {
            let shared = Arc::clone(&cache);
            thread::spawn(move || {
                for op in 0..OPS_PER_THREAD {
                    let key = (t * OPS_PER_THREAD + op) as i32;
                    shared.put(key, key * 2, 1);
                    // Stored value is key*2; the mapping doubles it again.
                    if let Some(val) = shared.get_with(&key, |v| v * 2) {
                        assert_eq!(val, key * 4);
                    }
                    // A key far outside the inserted range is never present.
                    let _ = shared.get_with(&(key + 100000), |v| v * 2);
                }
            })
        })
        .collect();
    for w in workers {
        w.join().expect("Thread panicked");
    }
    assert!(cache.len() <= cache.capacity());
}
/// get_with on the LFU: present keys yield the mapped result and absent
/// keys yield None, even under contention.
#[test]
fn test_concurrent_lfu_get_with() {
    let cache: Arc<ConcurrentLfuCache<i32, i32>> =
        Arc::new(ConcurrentLfuCache::init(lfu_config(10000, 4), None));
    let workers: Vec<_> = (0..NUM_THREADS)
        .map(|t| {
            let shared = Arc::clone(&cache);
            thread::spawn(move || {
                for op in 0..OPS_PER_THREAD {
                    let key = (t * OPS_PER_THREAD + op) as i32;
                    shared.put(key, key * 2, 1);
                    // Stored value is key*2; the mapping doubles it again.
                    if let Some(val) = shared.get_with(&key, |v| v * 2) {
                        assert_eq!(val, key * 4);
                    }
                    // A key far outside the inserted range is never present.
                    let _ = shared.get_with(&(key + 100000), |v| v * 2);
                }
            })
        })
        .collect();
    for w in workers {
        w.join().expect("Thread panicked");
    }
    assert!(cache.len() <= cache.capacity());
}
/// get_with on the LFUDA: present keys yield the mapped result and absent
/// keys yield None, even under contention.
#[test]
fn test_concurrent_lfuda_get_with() {
    let cache: Arc<ConcurrentLfudaCache<i32, i32>> =
        Arc::new(ConcurrentLfudaCache::init(lfuda_config(10000, 4), None));
    let workers: Vec<_> = (0..NUM_THREADS)
        .map(|t| {
            let shared = Arc::clone(&cache);
            thread::spawn(move || {
                for op in 0..OPS_PER_THREAD {
                    let key = (t * OPS_PER_THREAD + op) as i32;
                    shared.put(key, key * 2, 1);
                    // Stored value is key*2; the mapping doubles it again.
                    if let Some(val) = shared.get_with(&key, |v| v * 2) {
                        assert_eq!(val, key * 4);
                    }
                    // A key far outside the inserted range is never present.
                    let _ = shared.get_with(&(key + 100000), |v| v * 2);
                }
            })
        })
        .collect();
    for w in workers {
        w.join().expect("Thread panicked");
    }
    assert!(cache.len() <= cache.capacity());
}
/// get_with on the SLRU: present keys yield the mapped result and absent
/// keys yield None, even under contention.
#[test]
fn test_concurrent_slru_get_with() {
    let cache: Arc<ConcurrentSlruCache<i32, i32>> =
        Arc::new(ConcurrentSlruCache::init(slru_config(10000, 2000, 4), None));
    let workers: Vec<_> = (0..NUM_THREADS)
        .map(|t| {
            let shared = Arc::clone(&cache);
            thread::spawn(move || {
                for op in 0..OPS_PER_THREAD {
                    let key = (t * OPS_PER_THREAD + op) as i32;
                    shared.put(key, key * 2, 1);
                    // Stored value is key*2; the mapping doubles it again.
                    if let Some(val) = shared.get_with(&key, |v| v * 2) {
                        assert_eq!(val, key * 4);
                    }
                    // A key far outside the inserted range is never present.
                    let _ = shared.get_with(&(key + 100000), |v| v * 2);
                }
            })
        })
        .collect();
    for w in workers {
        w.join().expect("Thread panicked");
    }
    assert!(cache.len() <= cache.capacity());
}
/// get_with on the GDSF: present keys yield the mapped result and absent
/// keys yield None, even under contention.
#[test]
fn test_concurrent_gdsf_get_with() {
    let cache: Arc<ConcurrentGdsfCache<i32, i32>> =
        Arc::new(ConcurrentGdsfCache::init(gdsf_config(10000, 4), None));
    let workers: Vec<_> = (0..NUM_THREADS)
        .map(|t| {
            let shared = Arc::clone(&cache);
            thread::spawn(move || {
                for op in 0..OPS_PER_THREAD {
                    let key = (t * OPS_PER_THREAD + op) as i32;
                    shared.put(key, key * 2, 1);
                    // Stored value is key*2; the mapping doubles it again.
                    if let Some(val) = shared.get_with(&key, |v| v * 2) {
                        assert_eq!(val, key * 4);
                    }
                    // A key far outside the inserted range is never present.
                    let _ = shared.get_with(&(key + 100000), |v| v * 2);
                }
            })
        })
        .collect();
    for w in workers {
        w.join().expect("Thread panicked");
    }
    assert!(cache.len() <= cache.capacity());
}
/// get_mut_with mutates the value in place and returns the closure's
/// result — here the pre-mutation value, which must equal the key because
/// each thread owns a disjoint key range.
#[test]
fn test_concurrent_lru_get_mut_with() {
    let cache: Arc<ConcurrentLruCache<i32, i32>> =
        Arc::new(ConcurrentLruCache::init(lru_config(10000, 4), None));
    let workers: Vec<_> = (0..NUM_THREADS)
        .map(|t| {
            let shared = Arc::clone(&cache);
            thread::spawn(move || {
                for op in 0..OPS_PER_THREAD {
                    let key = (t * OPS_PER_THREAD + op) as i32;
                    shared.put(key, key, 1);
                    let previous = shared.get_mut_with(&key, |v| {
                        let old = *v;
                        *v += 100;
                        old
                    });
                    if let Some(val) = previous {
                        assert_eq!(val, key);
                    }
                }
            })
        })
        .collect();
    for w in workers {
        w.join().expect("Thread panicked");
    }
    assert!(cache.len() <= cache.capacity());
}
/// contains() must be safe to interleave with put() and remove() on every
/// cache flavor simultaneously.
#[test]
fn test_concurrent_all_caches_contains() {
    let lru: Arc<ConcurrentLruCache<i32, i32>> =
        Arc::new(ConcurrentLruCache::init(lru_config(1000, 4), None));
    let lfu: Arc<ConcurrentLfuCache<i32, i32>> =
        Arc::new(ConcurrentLfuCache::init(lfu_config(1000, 4), None));
    let lfuda: Arc<ConcurrentLfudaCache<i32, i32>> =
        Arc::new(ConcurrentLfudaCache::init(lfuda_config(1000, 4), None));
    let slru: Arc<ConcurrentSlruCache<i32, i32>> =
        Arc::new(ConcurrentSlruCache::init(slru_config(1000, 200, 4), None));
    let gdsf: Arc<ConcurrentGdsfCache<i32, i32>> =
        Arc::new(ConcurrentGdsfCache::init(gdsf_config(1000, 4), None));
    let workers: Vec<_> = (0..NUM_THREADS)
        .map(|t| {
            let lru_c = Arc::clone(&lru);
            let lfu_c = Arc::clone(&lfu);
            let lfuda_c = Arc::clone(&lfuda);
            let slru_c = Arc::clone(&slru);
            let gdsf_c = Arc::clone(&gdsf);
            thread::spawn(move || {
                for op in 0..OPS_PER_THREAD {
                    let key = (t * OPS_PER_THREAD + op) as i32;
                    // Probe before insert as well: must not panic on a miss.
                    let _ = lru_c.contains(&key);
                    lru_c.put(key, key, 1);
                    let _ = lru_c.contains(&key);
                    if op % 10 == 0 {
                        lru_c.remove(&key);
                    }
                    lfu_c.put(key, key, 1);
                    let _ = lfu_c.contains(&key);
                    if op % 10 == 0 {
                        lfu_c.remove(&key);
                    }
                    lfuda_c.put(key, key, 1);
                    let _ = lfuda_c.contains(&key);
                    if op % 10 == 0 {
                        lfuda_c.remove(&key);
                    }
                    slru_c.put(key, key, 1);
                    let _ = slru_c.contains(&key);
                    if op % 10 == 0 {
                        slru_c.remove(&key);
                    }
                    gdsf_c.put(key, key, 1);
                    let _ = gdsf_c.contains(&key);
                    if op % 10 == 0 {
                        gdsf_c.remove(&key);
                    }
                }
            })
        })
        .collect();
    for w in workers {
        w.join().expect("Thread panicked");
    }
}
/// peek on the LRU under contention: freshly inserted keys return the
/// stored value; keys far outside the inserted range return None.
#[test]
fn test_concurrent_lru_peek() {
    let cache: Arc<ConcurrentLruCache<i32, i32>> =
        Arc::new(ConcurrentLruCache::init(lru_config(10000, 4), None));
    let workers: Vec<_> = (0..NUM_THREADS)
        .map(|t| {
            let shared = Arc::clone(&cache);
            thread::spawn(move || {
                for op in 0..OPS_PER_THREAD {
                    let key = (t * OPS_PER_THREAD + op) as i32;
                    shared.put(key, key, 1);
                    if let Some(val) = shared.peek(&key) {
                        assert_eq!(val, key);
                    }
                    // Never inserted: must be a clean miss.
                    let _ = shared.peek(&(key + 100000));
                }
            })
        })
        .collect();
    for w in workers {
        w.join().expect("Thread panicked");
    }
    assert!(cache.len() <= cache.capacity());
}
/// peek on the LFU under contention: freshly inserted keys return the
/// stored value; keys far outside the inserted range return None.
#[test]
fn test_concurrent_lfu_peek() {
    let cache: Arc<ConcurrentLfuCache<i32, i32>> =
        Arc::new(ConcurrentLfuCache::init(lfu_config(10000, 4), None));
    let workers: Vec<_> = (0..NUM_THREADS)
        .map(|t| {
            let shared = Arc::clone(&cache);
            thread::spawn(move || {
                for op in 0..OPS_PER_THREAD {
                    let key = (t * OPS_PER_THREAD + op) as i32;
                    shared.put(key, key, 1);
                    if let Some(val) = shared.peek(&key) {
                        assert_eq!(val, key);
                    }
                    // Never inserted: must be a clean miss.
                    let _ = shared.peek(&(key + 100000));
                }
            })
        })
        .collect();
    for w in workers {
        w.join().expect("Thread panicked");
    }
    assert!(cache.len() <= cache.capacity());
}
/// peek on the LFUDA under contention: freshly inserted keys return the
/// stored value; keys far outside the inserted range return None.
#[test]
fn test_concurrent_lfuda_peek() {
    let cache: Arc<ConcurrentLfudaCache<i32, i32>> =
        Arc::new(ConcurrentLfudaCache::init(lfuda_config(10000, 4), None));
    let workers: Vec<_> = (0..NUM_THREADS)
        .map(|t| {
            let shared = Arc::clone(&cache);
            thread::spawn(move || {
                for op in 0..OPS_PER_THREAD {
                    let key = (t * OPS_PER_THREAD + op) as i32;
                    shared.put(key, key, 1);
                    if let Some(val) = shared.peek(&key) {
                        assert_eq!(val, key);
                    }
                    // Never inserted: must be a clean miss.
                    let _ = shared.peek(&(key + 100000));
                }
            })
        })
        .collect();
    for w in workers {
        w.join().expect("Thread panicked");
    }
    assert!(cache.len() <= cache.capacity());
}
/// peek on the SLRU under contention: freshly inserted keys return the
/// stored value; keys far outside the inserted range return None.
#[test]
fn test_concurrent_slru_peek() {
    let cache: Arc<ConcurrentSlruCache<i32, i32>> =
        Arc::new(ConcurrentSlruCache::init(slru_config(10000, 2000, 4), None));
    let workers: Vec<_> = (0..NUM_THREADS)
        .map(|t| {
            let shared = Arc::clone(&cache);
            thread::spawn(move || {
                for op in 0..OPS_PER_THREAD {
                    let key = (t * OPS_PER_THREAD + op) as i32;
                    shared.put(key, key, 1);
                    if let Some(val) = shared.peek(&key) {
                        assert_eq!(val, key);
                    }
                    // Never inserted: must be a clean miss.
                    let _ = shared.peek(&(key + 100000));
                }
            })
        })
        .collect();
    for w in workers {
        w.join().expect("Thread panicked");
    }
    assert!(cache.len() <= cache.capacity());
}
/// peek on the GDSF under contention: freshly inserted keys return the
/// stored value; keys far outside the inserted range return None.
#[test]
fn test_concurrent_gdsf_peek() {
    let cache: Arc<ConcurrentGdsfCache<i32, i32>> =
        Arc::new(ConcurrentGdsfCache::init(gdsf_config(10000, 4), None));
    let workers: Vec<_> = (0..NUM_THREADS)
        .map(|t| {
            let shared = Arc::clone(&cache);
            thread::spawn(move || {
                for op in 0..OPS_PER_THREAD {
                    let key = (t * OPS_PER_THREAD + op) as i32;
                    shared.put(key, key, 1);
                    if let Some(val) = shared.peek(&key) {
                        assert_eq!(val, key);
                    }
                    // Never inserted: must be a clean miss.
                    let _ = shared.peek(&(key + 100000));
                }
            })
        })
        .collect();
    for w in workers {
        w.join().expect("Thread panicked");
    }
    assert!(cache.len() <= cache.capacity());
}
/// capacity() and segment_count() are fixed at construction; reading them
/// while other threads insert must always return the configured values.
#[test]
fn test_concurrent_all_caches_capacity_and_segments() {
    let lru: Arc<ConcurrentLruCache<i32, i32>> =
        Arc::new(ConcurrentLruCache::init(lru_config(1000, 4), None));
    let lfu: Arc<ConcurrentLfuCache<i32, i32>> =
        Arc::new(ConcurrentLfuCache::init(lfu_config(1000, 8), None));
    let lfuda: Arc<ConcurrentLfudaCache<i32, i32>> =
        Arc::new(ConcurrentLfudaCache::init(lfuda_config(1000, 4), None));
    let slru: Arc<ConcurrentSlruCache<i32, i32>> =
        Arc::new(ConcurrentSlruCache::init(slru_config(1000, 200, 8), None));
    let gdsf: Arc<ConcurrentGdsfCache<i32, i32>> =
        Arc::new(ConcurrentGdsfCache::init(gdsf_config(1000, 4), None));
    let workers: Vec<_> = (0..NUM_THREADS)
        .map(|t| {
            let lru_c = Arc::clone(&lru);
            let lfu_c = Arc::clone(&lfu);
            let lfuda_c = Arc::clone(&lfuda);
            let slru_c = Arc::clone(&slru);
            let gdsf_c = Arc::clone(&gdsf);
            thread::spawn(move || {
                for op in 0..OPS_PER_THREAD {
                    let key = (t * OPS_PER_THREAD + op) as i32;
                    // Re-check the configured constants after every insert.
                    lru_c.put(key, key, 1);
                    assert_eq!(lru_c.capacity(), 1000);
                    assert_eq!(lru_c.segment_count(), 4);
                    lfu_c.put(key, key, 1);
                    assert_eq!(lfu_c.capacity(), 1000);
                    assert_eq!(lfu_c.segment_count(), 8);
                    lfuda_c.put(key, key, 1);
                    assert_eq!(lfuda_c.capacity(), 1000);
                    assert_eq!(lfuda_c.segment_count(), 4);
                    slru_c.put(key, key, 1);
                    assert_eq!(slru_c.capacity(), 1000);
                    assert_eq!(slru_c.segment_count(), 8);
                    gdsf_c.put(key, key, 1);
                    assert_eq!(gdsf_c.capacity(), 1000);
                    assert_eq!(gdsf_c.segment_count(), 4);
                }
            })
        })
        .collect();
    for w in workers {
        w.join().expect("Thread panicked");
    }
}
/// algorithm_name() is a constant per cache type and must be readable
/// while other threads insert.
#[test]
fn test_concurrent_all_caches_algorithm_name() {
    let lru: Arc<ConcurrentLruCache<i32, i32>> =
        Arc::new(ConcurrentLruCache::init(lru_config(1000, 4), None));
    let lfu: Arc<ConcurrentLfuCache<i32, i32>> =
        Arc::new(ConcurrentLfuCache::init(lfu_config(1000, 4), None));
    let lfuda: Arc<ConcurrentLfudaCache<i32, i32>> =
        Arc::new(ConcurrentLfudaCache::init(lfuda_config(1000, 4), None));
    let slru: Arc<ConcurrentSlruCache<i32, i32>> =
        Arc::new(ConcurrentSlruCache::init(slru_config(1000, 200, 4), None));
    let gdsf: Arc<ConcurrentGdsfCache<i32, i32>> =
        Arc::new(ConcurrentGdsfCache::init(gdsf_config(1000, 4), None));
    let workers: Vec<_> = (0..NUM_THREADS)
        .map(|t| {
            let lru_c = Arc::clone(&lru);
            let lfu_c = Arc::clone(&lfu);
            let lfuda_c = Arc::clone(&lfuda);
            let slru_c = Arc::clone(&slru);
            let gdsf_c = Arc::clone(&gdsf);
            thread::spawn(move || {
                for op in 0..OPS_PER_THREAD {
                    let key = (t * OPS_PER_THREAD + op) as i32;
                    lru_c.put(key, key, 1);
                    assert_eq!(lru_c.algorithm_name(), "ConcurrentLRU");
                    lfu_c.put(key, key, 1);
                    assert_eq!(lfu_c.algorithm_name(), "ConcurrentLFU");
                    lfuda_c.put(key, key, 1);
                    assert_eq!(lfuda_c.algorithm_name(), "ConcurrentLFUDA");
                    slru_c.put(key, key, 1);
                    assert_eq!(slru_c.algorithm_name(), "ConcurrentSLRU");
                    gdsf_c.put(key, key, 1);
                    assert_eq!(gdsf_c.algorithm_name(), "ConcurrentGDSF");
                }
            })
        })
        .collect();
    for w in workers {
        w.join().expect("Thread panicked");
    }
}
/// metrics() snapshots taken under load must always expose the
/// "cache_hits" counter on every cache flavor.
#[test]
fn test_concurrent_all_caches_metrics() {
    let lru: Arc<ConcurrentLruCache<i32, i32>> =
        Arc::new(ConcurrentLruCache::init(lru_config(1000, 4), None));
    let lfu: Arc<ConcurrentLfuCache<i32, i32>> =
        Arc::new(ConcurrentLfuCache::init(lfu_config(1000, 4), None));
    let lfuda: Arc<ConcurrentLfudaCache<i32, i32>> =
        Arc::new(ConcurrentLfudaCache::init(lfuda_config(1000, 4), None));
    let slru: Arc<ConcurrentSlruCache<i32, i32>> =
        Arc::new(ConcurrentSlruCache::init(slru_config(1000, 200, 4), None));
    let gdsf: Arc<ConcurrentGdsfCache<i32, i32>> =
        Arc::new(ConcurrentGdsfCache::init(gdsf_config(1000, 4), None));
    let workers: Vec<_> = (0..NUM_THREADS)
        .map(|t| {
            let lru_c = Arc::clone(&lru);
            let lfu_c = Arc::clone(&lfu);
            let lfuda_c = Arc::clone(&lfuda);
            let slru_c = Arc::clone(&slru);
            let gdsf_c = Arc::clone(&gdsf);
            thread::spawn(move || {
                for op in 0..OPS_PER_THREAD {
                    let key = (t * OPS_PER_THREAD + op) as i32;
                    // For each cache: generate a hit, then snapshot metrics.
                    lru_c.put(key, key, 1);
                    let _ = lru_c.get(&key);
                    let metrics = lru_c.metrics();
                    assert!(metrics.contains_key("cache_hits"), "LRU should track hits");
                    lfu_c.put(key, key, 1);
                    let _ = lfu_c.get(&key);
                    let metrics = lfu_c.metrics();
                    assert!(metrics.contains_key("cache_hits"), "LFU should track hits");
                    lfuda_c.put(key, key, 1);
                    let _ = lfuda_c.get(&key);
                    let metrics = lfuda_c.metrics();
                    assert!(
                        metrics.contains_key("cache_hits"),
                        "LFUDA should track hits"
                    );
                    slru_c.put(key, key, 1);
                    let _ = slru_c.get(&key);
                    let metrics = slru_c.metrics();
                    assert!(metrics.contains_key("cache_hits"), "SLRU should track hits");
                    gdsf_c.put(key, key, 1);
                    let _ = gdsf_c.get(&key);
                    let metrics = gdsf_c.metrics();
                    assert!(metrics.contains_key("cache_hits"), "GDSF should track hits");
                }
            })
        })
        .collect();
    for w in workers {
        w.join().expect("Thread panicked");
    }
}