#![cfg(feature = "concurrent")]
use cache_rs::config::{
ConcurrentCacheConfig, ConcurrentGdsfCacheConfig, ConcurrentLfuCacheConfig,
ConcurrentLfudaCacheConfig, ConcurrentLruCacheConfig, ConcurrentSlruCacheConfig,
GdsfCacheConfig, LfuCacheConfig, LfudaCacheConfig, LruCacheConfig, SlruCacheConfig,
};
use cache_rs::{
ConcurrentGdsfCache, ConcurrentLfuCache, ConcurrentLfudaCache, ConcurrentLruCache,
ConcurrentSlruCache,
};
use std::num::NonZeroUsize;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
// Number of worker threads spawned by each stress test.
const NUM_THREADS: usize = 16;
// Number of cache operations each worker thread performs.
const OPS_PER_THREAD: usize = 10_000;
/// Builds a concurrent LRU cache configuration with the given total
/// `capacity` spread over `segments` shards. Byte-size limiting is
/// effectively disabled by setting `max_size` to `u64::MAX`.
///
/// Panics if `capacity` is zero (test-helper invariant).
fn lru_config(capacity: usize, segments: usize) -> ConcurrentLruCacheConfig {
    ConcurrentCacheConfig {
        base: LruCacheConfig {
            capacity: NonZeroUsize::new(capacity).expect("capacity must be non-zero"),
            max_size: u64::MAX,
        },
        segments,
    }
}
/// Builds a concurrent SLRU cache configuration: total `capacity`, a
/// `protected` segment size, sharded into `segments`. Byte-size limiting
/// is effectively disabled via `max_size: u64::MAX`.
///
/// Panics if `capacity` or `protected` is zero (test-helper invariant).
fn slru_config(capacity: usize, protected: usize, segments: usize) -> ConcurrentSlruCacheConfig {
    ConcurrentCacheConfig {
        base: SlruCacheConfig {
            capacity: NonZeroUsize::new(capacity).expect("capacity must be non-zero"),
            protected_capacity: NonZeroUsize::new(protected)
                .expect("protected capacity must be non-zero"),
            max_size: u64::MAX,
        },
        segments,
    }
}
/// Builds a concurrent LFU cache configuration with the given total
/// `capacity` spread over `segments` shards; byte-size limiting is
/// effectively disabled via `max_size: u64::MAX`.
///
/// Panics if `capacity` is zero (test-helper invariant).
fn lfu_config(capacity: usize, segments: usize) -> ConcurrentLfuCacheConfig {
    ConcurrentCacheConfig {
        base: LfuCacheConfig {
            capacity: NonZeroUsize::new(capacity).expect("capacity must be non-zero"),
            max_size: u64::MAX,
        },
        segments,
    }
}
/// Builds a concurrent LFUDA cache configuration with the given total
/// `capacity` spread over `segments` shards. The aging counter starts at 0
/// and byte-size limiting is effectively disabled via `max_size: u64::MAX`.
///
/// Panics if `capacity` is zero (test-helper invariant).
fn lfuda_config(capacity: usize, segments: usize) -> ConcurrentLfudaCacheConfig {
    ConcurrentCacheConfig {
        base: LfudaCacheConfig {
            capacity: NonZeroUsize::new(capacity).expect("capacity must be non-zero"),
            initial_age: 0,
            max_size: u64::MAX,
        },
        segments,
    }
}
/// Builds a concurrent GDSF cache configuration with the given total
/// `capacity` spread over `segments` shards. The aging value starts at 0.0
/// and byte-size limiting is effectively disabled via `max_size: u64::MAX`.
///
/// Panics if `capacity` is zero (test-helper invariant).
fn gdsf_config(capacity: usize, segments: usize) -> ConcurrentGdsfCacheConfig {
    ConcurrentCacheConfig {
        base: GdsfCacheConfig {
            capacity: NonZeroUsize::new(capacity).expect("capacity must be non-zero"),
            initial_age: 0.0,
            max_size: u64::MAX,
        },
        segments,
    }
}
/// Hammers a tiny key space (10 keys) from 16 threads: even-numbered
/// threads write, odd-numbered threads read. Afterwards the cache must not
/// exceed its configured capacity.
#[test]
fn stress_lru_high_contention() {
    let cache = Arc::new(ConcurrentLruCache::<usize, usize>::init(
        lru_config(100, 16),
        None,
    ));
    let workers: Vec<_> = (0..NUM_THREADS)
        .map(|t| {
            let cache = Arc::clone(&cache);
            thread::spawn(move || {
                for i in 0..OPS_PER_THREAD {
                    let key = i % 10;
                    if t % 2 == 0 {
                        cache.put(key, t * OPS_PER_THREAD + i, 1);
                    } else {
                        let _ = cache.get(&key);
                    }
                }
            })
        })
        .collect();
    for worker in workers {
        worker.join().expect("Thread panicked");
    }
    assert!(cache.len() <= 100);
}
/// Exercises the cache across a range of segment counts with disjoint key
/// ranges per thread, then verifies the reported segment count and the
/// capacity bound.
#[test]
fn stress_segment_counts() {
    for segments in [1, 2, 4, 8, 16, 32] {
        let cache = Arc::new(ConcurrentLruCache::<usize, usize>::init(
            lru_config(1000, segments),
            None,
        ));
        let workers: Vec<_> = (0..8)
            .map(|t| {
                let cache = Arc::clone(&cache);
                thread::spawn(move || {
                    for i in 0..1000 {
                        let key = t * 1000 + i;
                        cache.put(key, i, 1);
                        let _ = cache.get(&key);
                    }
                })
            })
            .collect();
        for worker in workers {
            worker.join().expect("Thread panicked");
        }
        assert_eq!(cache.segment_count(), segments);
        assert!(cache.len() <= 1000);
    }
}
/// Concurrent lookups against a cache that was never written to must all
/// miss, and the cache must remain empty.
#[test]
fn stress_empty_cache() {
    let cache = Arc::new(ConcurrentLruCache::<usize, usize>::init(
        lru_config(100, 16),
        None,
    ));
    let readers: Vec<_> = (0..NUM_THREADS)
        .map(|_| {
            let cache = Arc::clone(&cache);
            thread::spawn(move || {
                for key in 0..1000 {
                    assert!(cache.get(&key).is_none());
                }
            })
        })
        .collect();
    for reader in readers {
        reader.join().expect("Thread panicked");
    }
    assert!(cache.is_empty());
}
/// Each thread repeatedly overwrites and reads back its own single key
/// (key == thread index) in a cache with one slot per thread; the final
/// size must not exceed the capacity.
#[test]
fn stress_single_item_cache() {
    let cache = Arc::new(ConcurrentLruCache::<usize, usize>::init(
        lru_config(16, 16),
        None,
    ));
    let workers: Vec<_> = (0..NUM_THREADS)
        .map(|t| {
            let cache = Arc::clone(&cache);
            thread::spawn(move || {
                for i in 0..1000 {
                    cache.put(t, i, 1);
                    let _ = cache.get(&t);
                }
            })
        })
        .collect();
    for worker in workers {
        worker.join().expect("Thread panicked");
    }
    assert!(cache.len() <= 16);
}
/// Inserts far more distinct keys than the cache can hold from many
/// threads at once; eviction must keep the size within capacity.
#[test]
fn stress_capacity_limits() {
    let capacity = 100;
    let cache = Arc::new(ConcurrentLruCache::<usize, usize>::init(
        lru_config(capacity, 16),
        None,
    ));
    let writers: Vec<_> = (0..NUM_THREADS)
        .map(|t| {
            let cache = Arc::clone(&cache);
            thread::spawn(move || {
                for i in 0..OPS_PER_THREAD {
                    cache.put(t * OPS_PER_THREAD + i, i, 1);
                }
            })
        })
        .collect();
    for writer in writers {
        writer.join().expect("Thread panicked");
    }
    assert!(cache.len() <= capacity);
}
/// Pre-populates 1000 entries, then has every thread race to remove the
/// same key range. Each key's successful removal must be observed at most
/// once across all threads, and the cache must end up empty.
#[test]
fn stress_concurrent_removes() {
    let cache = Arc::new(ConcurrentLruCache::<usize, usize>::init(
        lru_config(1000, 16),
        None,
    ));
    for key in 0..1000 {
        cache.put(key, key, 1);
    }
    let removed_count = Arc::new(AtomicUsize::new(0));
    let workers: Vec<_> = (0..NUM_THREADS)
        .map(|_| {
            let cache = Arc::clone(&cache);
            let removed = Arc::clone(&removed_count);
            thread::spawn(move || {
                for key in 0..1000 {
                    if cache.remove(&key).is_some() {
                        removed.fetch_add(1, Ordering::Relaxed);
                    }
                }
            })
        })
        .collect();
    for worker in workers {
        worker.join().expect("Thread panicked");
    }
    let total_removed = removed_count.load(Ordering::Relaxed);
    assert!(
        total_removed <= 1000,
        "Removed {} items, expected <= 1000",
        total_removed
    );
    assert!(cache.is_empty());
}
/// Interleaves inserts with periodic full clears from every thread; the
/// cache must still respect its capacity bound afterwards.
#[test]
fn stress_concurrent_clear() {
    let cache = Arc::new(ConcurrentLruCache::<usize, usize>::init(
        lru_config(1000, 16),
        None,
    ));
    let workers: Vec<_> = (0..NUM_THREADS)
        .map(|t| {
            let cache = Arc::clone(&cache);
            thread::spawn(move || {
                for i in 0..1000 {
                    cache.put(t * 1000 + i, i, 1);
                    // Every 100th iteration wipes the whole cache while
                    // other threads keep inserting.
                    if i % 100 == 0 {
                        cache.clear();
                    }
                }
            })
        })
        .collect();
    for worker in workers {
        worker.join().expect("Thread panicked");
    }
    assert!(cache.len() <= 1000);
}
/// Stresses the segmented-LRU cache: each inserted key is immediately read
/// three times (enough accesses to be promoted under SLRU policies), and
/// the final size must stay within capacity.
#[test]
fn stress_slru() {
    let cache = Arc::new(ConcurrentSlruCache::<usize, usize>::init(
        slru_config(1000, 500, 16),
        None,
    ));
    let workers: Vec<_> = (0..NUM_THREADS)
        .map(|t| {
            let cache = Arc::clone(&cache);
            thread::spawn(move || {
                for i in 0..OPS_PER_THREAD {
                    let key = t * OPS_PER_THREAD + i;
                    cache.put(key, i, 1);
                    for _ in 0..3 {
                        let _ = cache.get(&key);
                    }
                }
            })
        })
        .collect();
    for worker in workers {
        worker.join().expect("Thread panicked");
    }
    assert!(cache.len() <= 1000);
}
/// Stresses the LFU cache with a skewed access pattern: every 10th key
/// gets five extra reads (boosting its frequency) while the rest are
/// inserted once. The final size must stay within capacity.
#[test]
fn stress_lfu() {
    let cache = Arc::new(ConcurrentLfuCache::<usize, usize>::init(
        lfu_config(1000, 16),
        None,
    ));
    let workers: Vec<_> = (0..NUM_THREADS)
        .map(|t| {
            let cache = Arc::clone(&cache);
            thread::spawn(move || {
                for i in 0..OPS_PER_THREAD {
                    let key = t * OPS_PER_THREAD + i;
                    cache.put(key, i, 1);
                    if i % 10 == 0 {
                        for _ in 0..5 {
                            let _ = cache.get(&key);
                        }
                    }
                }
            })
        })
        .collect();
    for worker in workers {
        worker.join().expect("Thread panicked");
    }
    assert!(cache.len() <= 1000);
}
/// Stresses the LFUDA cache with a put-then-get pattern over disjoint key
/// ranges; the final size must stay within capacity.
#[test]
fn stress_lfuda() {
    let cache = Arc::new(ConcurrentLfudaCache::<usize, usize>::init(
        lfuda_config(1000, 16),
        None,
    ));
    let workers: Vec<_> = (0..NUM_THREADS)
        .map(|t| {
            let cache = Arc::clone(&cache);
            thread::spawn(move || {
                for i in 0..OPS_PER_THREAD {
                    let key = t * OPS_PER_THREAD + i;
                    cache.put(key, i, 1);
                    let _ = cache.get(&key);
                }
            })
        })
        .collect();
    for worker in workers {
        worker.join().expect("Thread panicked");
    }
    assert!(cache.len() <= 1000);
}
/// Stresses the GDSF cache with entries of varying reported sizes (1..=10
/// bytes); after the run the cache must have retained at least something.
#[test]
fn stress_gdsf() {
    let cache = Arc::new(ConcurrentGdsfCache::<usize, usize>::init(
        gdsf_config(10000, 16),
        None,
    ));
    let workers: Vec<_> = (0..NUM_THREADS)
        .map(|t| {
            let cache = Arc::clone(&cache);
            thread::spawn(move || {
                for i in 0..OPS_PER_THREAD {
                    let key = t * OPS_PER_THREAD + i;
                    // Cycle the reported entry size through 1..=10.
                    let size = ((i % 10) + 1) as u64;
                    cache.put(key, i, size);
                    let _ = cache.get(&key);
                }
            })
        })
        .collect();
    for worker in workers {
        worker.join().expect("Thread panicked");
    }
    assert!(!cache.is_empty());
}
/// Mixes all four operations (put/get/remove/contains) with string keys
/// and values across 8 threads; the cache must respect its capacity bound.
#[test]
fn stress_mixed_all_caches() {
    let lru = Arc::new(ConcurrentLruCache::<String, String>::init(
        lru_config(500, 16),
        None,
    ));
    let workers: Vec<_> = (0..8)
        .map(|t| {
            let cache = Arc::clone(&lru);
            thread::spawn(move || {
                for i in 0..5000 {
                    let key = format!("key_{}_{}", t, i);
                    let value = format!("value_{}", i);
                    // Rotate through the four operations.
                    match i % 4 {
                        0 => {
                            cache.put(key, value, 1);
                        }
                        1 => {
                            let _ = cache.get(&key);
                        }
                        2 => {
                            let _ = cache.remove(&key);
                        }
                        _ => {
                            let _ = cache.contains(&key);
                        }
                    }
                }
            })
        })
        .collect();
    for worker in workers {
        worker.join().expect("Thread panicked");
    }
    assert!(lru.len() <= 500);
}
/// Exercises `get_with` (read-only access through a closure) from many
/// threads over pre-populated `Vec` values, accumulating the observed
/// lengths; at least one hit must have occurred.
#[test]
fn stress_get_with() {
    let cache = Arc::new(ConcurrentLruCache::<usize, Vec<usize>>::init(
        lru_config(100, 16),
        None,
    ));
    for key in 0..100 {
        cache.put(key, vec![key; 10], 1);
    }
    let sum = Arc::new(AtomicUsize::new(0));
    let readers: Vec<_> = (0..NUM_THREADS)
        .map(|_| {
            let cache = Arc::clone(&cache);
            let sum = Arc::clone(&sum);
            thread::spawn(move || {
                for i in 0..1000 {
                    let key = i % 100;
                    if let Some(len) = cache.get_with(&key, |v| v.len()) {
                        sum.fetch_add(len, Ordering::Relaxed);
                    }
                }
            })
        })
        .collect();
    for reader in readers {
        reader.join().expect("Thread panicked");
    }
    assert!(sum.load(Ordering::Relaxed) > 0);
}