#![cfg(feature = "sync")]
use moka::sync::Cache;
use moka::Expiry;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
/// Value type stored in the caches under test: either a successful `T` or a
/// `String` error message.
type HandleResult<T> = Result<T, String>;
/// Expiry policy used by the tests in this file; its `Expiry` implementation
/// hands `error_ttl` back for entries whose value is `Err`.
struct CustomExpiry {
    /// Time-to-live reported for `Err` values.
    error_ttl: Duration,
}

impl CustomExpiry {
    /// TTL installed by [`Default::default`]: 100 milliseconds.
    const DEFAULT_ERROR_TTL: Duration = Duration::from_millis(100);
}

impl Default for CustomExpiry {
    /// Builds a policy whose error TTL is [`CustomExpiry::DEFAULT_ERROR_TTL`].
    fn default() -> Self {
        let error_ttl = Self::DEFAULT_ERROR_TTL;
        Self { error_ttl }
    }
}
/// Per-entry expiry driven purely by the stored value: `Err` values get
/// `error_ttl`, `Ok` values get `None`.
impl<K, V> Expiry<K, HandleResult<V>> for CustomExpiry {
    /// Returns `Some(error_ttl)` when the newly created value is `Err`,
    /// otherwise `None`.
    fn expire_after_create(
        &self,
        _key: &K,
        value: &HandleResult<V>,
        _created_at: Instant,
    ) -> Option<Duration> {
        value.as_ref().err().map(|_| self.error_ttl)
    }

    /// Same rule as on creation; the previously remaining duration is ignored,
    /// so the result depends only on the new value.
    fn expire_after_update(
        &self,
        _key: &K,
        value: &HandleResult<V>,
        _updated_at: Instant,
        _duration_until_expiry: Option<Duration>,
    ) -> Option<Duration> {
        value.as_ref().err().map(|_| self.error_ttl)
    }
}
/// Runs writer, reader and housekeeping threads against one shared cache for
/// five seconds and asserts that none of them panicked.
///
/// Writers keep overwriting their keys with alternating `Err` and `Ok` values,
/// so (per `CustomExpiry`) every key keeps flipping between "expires after
/// `error_ttl`" and "no per-entry expiry". Readers call `get` on the writers'
/// key ranges, and one housekeeping thread calls `run_pending_tasks` every
/// 50 ms.
#[test]
fn test_timer_wheel_panic() {
    // Writer `tid` owns the keys `tid * KEY_STRIDE .. tid * KEY_STRIDE + NUM_KEYS`.
    const NUM_INSERT_THREADS: u64 = 4;
    const NUM_GET_THREADS: u64 = 4;
    const KEY_STRIDE: u64 = 10_000;
    const NUM_KEYS: u64 = if cfg!(miri) { 25 } else { 100 };

    // Renders a panic payload as text. Payloads produced by `panic!` are
    // `&str` or `String`; anything else is reported with a placeholder.
    fn panic_message(payload: &(dyn std::any::Any + Send + 'static)) -> String {
        if let Some(text) = payload.downcast_ref::<&str>() {
            (*text).to_owned()
        } else if let Some(text) = payload.downcast_ref::<String>() {
            text.clone()
        } else {
            String::from("<non-string panic payload>")
        }
    }

    let test_duration = Duration::from_secs(5);
    let panics = Arc::new(AtomicUsize::new(0));
    let cache: Cache<u64, HandleResult<u64>> = Cache::builder()
        .name("test_cache")
        .expire_after(CustomExpiry::default())
        .time_to_idle(Duration::from_secs(240))
        .max_capacity(10000)
        .build();
    let cache = Arc::new(cache);
    let start = Instant::now();

    let insert_handles: Vec<_> = (0..NUM_INSERT_THREADS)
        .map(|tid| {
            let cache = Arc::clone(&cache);
            let panics = Arc::clone(&panics);
            thread::spawn(move || {
                std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                    let mut ops = 0u64;
                    while start.elapsed() < test_duration {
                        for key in 0..NUM_KEYS {
                            let key = key + tid * KEY_STRIDE;
                            // Err -> Ok (-> Err on every third op) toggles the
                            // entry's per-entry expiry on and off.
                            cache.insert(key, Err("error".into()));
                            cache.insert(key, Ok(ops));
                            if ops % 3 == 0 {
                                cache.insert(key, Err("retry".into()));
                            }
                            ops += 1;
                        }
                    }
                    println!("[Insert {}] done: {} ops", tid, ops);
                }))
                .unwrap_or_else(|payload| {
                    panics.fetch_add(1, Ordering::Relaxed);
                    eprintln!("[Insert {}] PANICKED: {}", tid, panic_message(&*payload));
                });
            })
        })
        .collect();

    let get_handles: Vec<_> = (0..NUM_GET_THREADS)
        .map(|tid| {
            let cache = Arc::clone(&cache);
            let panics = Arc::clone(&panics);
            thread::spawn(move || {
                std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                    let mut ops = 0u64;
                    while start.elapsed() < test_duration {
                        for key in 0..NUM_KEYS {
                            // Pick one of the writers' key ranges.
                            let key = key + (ops % NUM_INSERT_THREADS) * KEY_STRIDE;
                            let _ = cache.get(&key);
                            ops += 1;
                        }
                    }
                    println!("[Get {}] done: {} ops", tid, ops);
                }))
                .unwrap_or_else(|payload| {
                    panics.fetch_add(1, Ordering::Relaxed);
                    eprintln!("[Get {}] PANICKED: {}", tid, panic_message(&*payload));
                });
            })
        })
        .collect();

    let cache_hk = Arc::clone(&cache);
    let panics_hk = Arc::clone(&panics);
    let hk = thread::spawn(move || {
        std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            while start.elapsed() < test_duration {
                cache_hk.run_pending_tasks();
                thread::sleep(Duration::from_millis(50));
            }
        }))
        .unwrap_or_else(|payload| {
            panics_hk.fetch_add(1, Ordering::Relaxed);
            eprintln!("[HK] PANICKED: {}", panic_message(&*payload));
        });
    });

    // A join error means a thread panicked outside its `catch_unwind` guard;
    // count it instead of discarding it so the assertion below still fails.
    let all_handles = insert_handles
        .into_iter()
        .chain(get_handles)
        .chain(std::iter::once(hk));
    for handle in all_handles {
        if let Err(payload) = handle.join() {
            panics.fetch_add(1, Ordering::Relaxed);
            eprintln!(
                "[join] thread panicked outside catch_unwind: {}",
                panic_message(&*payload)
            );
        }
    }

    let total_panics = panics.load(Ordering::Relaxed);
    assert_eq!(
        total_panics, 0,
        "Timer wheel panic detected! {} threads panicked",
        total_panics
    );
}
/// Long-running variant of the concurrent insert/get/housekeeping test; it is
/// `#[ignore]`d and must be requested explicitly.
///
/// Writers overwrite their keys with alternating `Err` and `Ok` values and
/// invalidate the key on every seventh op. Readers call `get` on the writers'
/// key ranges. Outside Miri, two housekeeping threads call `run_pending_tasks`
/// every 25 ms. The test fails if any thread panicked.
#[test]
#[ignore]
fn stress_test_timer_wheel_panic() {
    const DURATION_MINUTES: u64 = if cfg!(miri) { 20 } else { 1 };
    const NUM_INSERT_THREADS: u64 = if cfg!(miri) { 6 } else { 8 };
    const NUM_GET_THREADS: u64 = if cfg!(miri) { 4 } else { 8 };
    // No housekeeping threads are started under Miri.
    const NUM_HK_THREADS: u64 = if cfg!(miri) { 0 } else { 2 };
    // Writer `tid` owns the keys `tid * KEY_STRIDE .. tid * KEY_STRIDE + KEYS_PER_THREAD`.
    const KEYS_PER_THREAD: u64 = 200;
    const KEY_STRIDE: u64 = 10_000;

    // Renders a panic payload as text. Payloads produced by `panic!` are
    // `&str` or `String`; `{:?}` on the boxed payload would only print
    // `Any { .. }`.
    fn panic_message(payload: &(dyn std::any::Any + Send + 'static)) -> String {
        if let Some(text) = payload.downcast_ref::<&str>() {
            (*text).to_owned()
        } else if let Some(text) = payload.downcast_ref::<String>() {
            text.clone()
        } else {
            String::from("<non-string panic payload>")
        }
    }

    let test_duration = Duration::from_secs(DURATION_MINUTES * 60);
    let panics = Arc::new(AtomicUsize::new(0));
    let cache: Cache<u64, HandleResult<u64>> = Cache::builder()
        .name("stress_test_cache")
        .expire_after(CustomExpiry::default())
        .time_to_idle(Duration::from_secs(240))
        .max_capacity(10000)
        .build();
    let cache = Arc::new(cache);
    let start = Instant::now();

    let insert_handles: Vec<_> = (0..NUM_INSERT_THREADS)
        .map(|tid| {
            let cache = Arc::clone(&cache);
            let panics = Arc::clone(&panics);
            thread::spawn(move || {
                std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                    const REPORT_INTERVAL: u64 = if cfg!(miri) { 5 } else { 100_000 };
                    let mut ops = 0u64;
                    'run: loop {
                        for key in 0..KEYS_PER_THREAD {
                            let key = key + tid * KEY_STRIDE;
                            // Err -> Ok (-> Err on every third op) toggles the
                            // entry's per-entry expiry on and off.
                            cache.insert(key, Err("error".into()));
                            cache.insert(key, Ok(ops));
                            if ops % 3 == 0 {
                                cache.insert(key, Err("retry".into()));
                            }
                            if ops % 7 == 0 {
                                cache.invalidate(&key);
                            }
                            ops += 1;
                            if ops % REPORT_INTERVAL == 0 {
                                println!("[Insert {}] running: {} ops", tid, ops);
                            }
                            // Checked per op so the thread stops promptly.
                            if start.elapsed() >= test_duration {
                                break 'run;
                            }
                        }
                    }
                    println!("[Insert {}] done: {} ops", tid, ops);
                }))
                .unwrap_or_else(|payload| {
                    panics.fetch_add(1, Ordering::Relaxed);
                    eprintln!("[Insert {}] PANICKED: {}", tid, panic_message(&*payload));
                });
            })
        })
        .collect();

    let get_handles: Vec<_> = (0..NUM_GET_THREADS)
        .map(|tid| {
            let cache = Arc::clone(&cache);
            let panics = Arc::clone(&panics);
            thread::spawn(move || {
                std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                    const REPORT_INTERVAL: u64 = if cfg!(miri) { 50 } else { 100_000 };
                    let mut ops = 0u64;
                    'run: loop {
                        for key in 0..KEYS_PER_THREAD {
                            // Key ranges are defined by writer id, so the range is
                            // chosen modulo the writer count (not the reader
                            // count); otherwise some writers' keys are never read
                            // when the two counts differ.
                            let key = key + (ops % NUM_INSERT_THREADS) * KEY_STRIDE;
                            let _ = cache.get(&key);
                            ops += 1;
                            if ops % REPORT_INTERVAL == 0 {
                                println!("[Get {}] running: {} ops", tid, ops);
                            }
                            if start.elapsed() >= test_duration {
                                break 'run;
                            }
                        }
                    }
                    println!("[Get {}] done: {} ops", tid, ops);
                }))
                .unwrap_or_else(|payload| {
                    panics.fetch_add(1, Ordering::Relaxed);
                    eprintln!("[Get {}] PANICKED: {}", tid, panic_message(&*payload));
                });
            })
        })
        .collect();

    let hk_handles: Vec<_> = (0..NUM_HK_THREADS)
        .map(|tid| {
            let cache = Arc::clone(&cache);
            let panics = Arc::clone(&panics);
            thread::spawn(move || {
                std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                    let mut ops = 0u64;
                    while start.elapsed() < test_duration {
                        cache.run_pending_tasks();
                        thread::sleep(Duration::from_millis(25));
                        ops += 1;
                        if ops % 50 == 0 {
                            println!("[HK {}] running: {} ops", tid, ops);
                        }
                    }
                }))
                .unwrap_or_else(|payload| {
                    panics.fetch_add(1, Ordering::Relaxed);
                    eprintln!("[HK {}] PANICKED: {}", tid, panic_message(&*payload));
                });
            })
        })
        .collect();

    // A join error means a thread panicked outside its `catch_unwind` guard;
    // count it instead of discarding it so the assertion below still fails.
    let all_handles = insert_handles
        .into_iter()
        .chain(get_handles)
        .chain(hk_handles);
    for handle in all_handles {
        if let Err(payload) = handle.join() {
            panics.fetch_add(1, Ordering::Relaxed);
            eprintln!(
                "[join] thread panicked outside catch_unwind: {}",
                panic_message(&*payload)
            );
        }
    }

    let total_panics = panics.load(Ordering::Relaxed);
    assert_eq!(
        total_panics, 0,
        "Timer wheel panic detected! {} threads panicked during stress test.",
        total_panics
    );
}