use fibre_cache::{CacheBuilder, builder::TimerWheelMode};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::time::Duration;
#[tokio::test]
async fn test_iter_hangs_on_ttl_expiration_during_iteration() {
println!("\n=== Test 1: TTL Expiration During Iteration ===");
let cache = CacheBuilder::<u32, String>::new()
.capacity(100)
.time_to_live(Duration::from_millis(100))
.timer_mode(TimerWheelMode::HighPrecisionShortLived)
.build()
.unwrap();
cache.insert(1, "job_1".to_string(), 1);
cache.insert(2, "job_2".to_string(), 1);
cache.insert(3, "job_3".to_string(), 1);
println!("Inserted 3 items, waiting 90ms (close to 100ms TTL)...");
tokio::time::sleep(Duration::from_millis(90)).await;
let hung = Arc::new(AtomicBool::new(false));
let hung_clone = hung.clone();
let iteration_count = Arc::new(AtomicUsize::new(0));
let count_clone = iteration_count.clone();
let iteration_task = tokio::task::spawn_blocking(move || {
let start = std::time::Instant::now();
let mut count = 0;
println!("Starting iteration...");
for (_key, _value) in cache.iter() {
count += 1;
count_clone.store(count, Ordering::SeqCst);
if count > 10 {
hung_clone.store(true, Ordering::SeqCst);
println!("ERROR: Iterated {} times on 3 items!", count);
return count;
}
if start.elapsed() > Duration::from_secs(2) {
hung_clone.store(true, Ordering::SeqCst);
println!("ERROR: Iteration taking too long: {:?}", start.elapsed());
return count;
}
}
println!(
"Iteration completed: {} items in {:?}",
count,
start.elapsed()
);
count
});
match tokio::time::timeout(Duration::from_secs(5), iteration_task).await {
Ok(Ok(count)) => {
let is_hung = hung.load(Ordering::SeqCst);
assert!(!is_hung, "Iterator entered infinite loop!");
assert!(count <= 3, "Should iterate at most 3 items, got {}", count);
println!("✓ Test passed: {} iterations", count);
}
Ok(Err(e)) => panic!("Iterator task panicked: {:?}", e),
Err(_) => {
let count = iteration_count.load(Ordering::SeqCst);
panic!(
"Iterator timed out after {} iterations - INFINITE LOOP DETECTED!",
count
);
}
}
}
#[tokio::test]
async fn test_iter_with_concurrent_janitor_expiration() {
println!("\n=== Test 2: Concurrent Janitor Cleanup ===");
let cache = Arc::new(
CacheBuilder::<u32, String>::new()
.capacity(100)
.time_to_live(Duration::from_millis(50))
.timer_mode(TimerWheelMode::HighPrecisionShortLived)
.janitor_tick_interval(Duration::from_millis(10)) .build()
.unwrap(),
);
println!("Inserting 5 items with 50ms TTL...");
for i in 0..5 {
cache.insert(i, format!("job_{}", i), 1);
}
println!("Waiting 45ms (edge of expiration)...");
tokio::time::sleep(Duration::from_millis(45)).await;
println!("Starting iteration while janitor is actively cleaning...");
let cache_clone = cache.clone();
let iter_count = Arc::new(AtomicUsize::new(0));
let count_clone = iter_count.clone();
let iter_task = tokio::task::spawn_blocking(move || {
let start = std::time::Instant::now();
let mut count = 0;
for (_k, _v) in cache_clone.iter() {
count += 1;
count_clone.store(count, Ordering::SeqCst);
if count > 100 {
println!("ERROR: Iterated {} times on 5 items!", count);
return count;
}
if start.elapsed() > Duration::from_secs(2) {
println!("ERROR: Iteration took {:?}", start.elapsed());
return count;
}
}
println!("Completed: {} items in {:?}", count, start.elapsed());
count
});
match tokio::time::timeout(Duration::from_secs(5), iter_task).await {
Ok(Ok(count)) => {
println!("✓ Test passed: {} iterations", count);
assert!(count <= 5, "Should iterate at most 5 items");
}
Ok(Err(e)) => panic!("Task panicked: {:?}", e),
Err(_) => {
let count = iter_count.load(Ordering::SeqCst);
panic!("Iterator hung indefinitely after {} iterations!", count);
}
}
}
#[test]
fn test_iter_edge_case_two_expiring_items() {
println!("\n=== Test 3: Two Expiring Items (Production Scenario) ===");
let cache = CacheBuilder::<u32, String>::new()
.capacity(100)
.time_to_live(Duration::from_millis(100))
.timer_mode(TimerWheelMode::HighPrecisionShortLived)
.build()
.unwrap();
println!("Inserting 2 items...");
cache.insert(1, "job_a".to_string(), 1);
cache.insert(2, "job_b".to_string(), 1);
println!("Waiting 95ms (close to 100ms TTL)...");
std::thread::sleep(Duration::from_millis(95));
println!("Starting iteration (should complete in microseconds)...");
let start = std::time::Instant::now();
let mut iterations = 0;
for (_k, _v) in cache.iter() {
iterations += 1;
if iterations > 100 {
panic!(
"INFINITE LOOP DETECTED! Iterated {} times on 2 items",
iterations
);
}
if start.elapsed() > Duration::from_millis(500) {
panic!(
"Iterator taking too long! {} iterations in {:?}",
iterations,
start.elapsed()
);
}
}
let elapsed = start.elapsed();
println!(
"✓ Completed in {:?} with {} iterations",
elapsed, iterations
);
assert!(
iterations <= 2,
"Should iterate at most 2 items, got {}",
iterations
);
assert!(
elapsed < Duration::from_millis(100),
"Should complete quickly, took {:?}",
elapsed
);
}
#[tokio::test]
async fn test_production_scenario_simulation() {
println!("\n=== Test 4: Production Scenario (1 Hour TTL) ===");
let job_history = Arc::new(
CacheBuilder::<u32, String>::new()
.capacity(10_000)
.time_to_live(Duration::from_secs(2)) .build()
.unwrap(),
);
println!("Inserting 2 completed jobs with 2s TTL...");
job_history.insert(1, "completed_job_1".to_string(), 1);
job_history.insert(2, "completed_job_2".to_string(), 1);
println!("Waiting 1.9s (close to TTL boundary)...");
tokio::time::sleep(Duration::from_millis(1900)).await;
println!("Calling list_all_jobs equivalent...");
let iter_count = Arc::new(AtomicUsize::new(0));
let count_clone = iter_count.clone();
let result = tokio::task::spawn_blocking(move || {
let mut summaries = Vec::new();
let start = std::time::Instant::now();
let mut count = 0;
for (_id, def) in job_history.iter() {
count += 1;
count_clone.store(count, Ordering::SeqCst);
if count > 100 {
println!("ERROR: {} iterations on 2 items!", count);
return summaries;
}
if start.elapsed() > Duration::from_secs(2) {
println!("ERROR: Iteration took {:?}", start.elapsed());
return summaries;
}
summaries.push(def.clone());
}
println!("Completed: {} items in {:?}", count, start.elapsed());
summaries
});
match tokio::time::timeout(Duration::from_secs(5), result).await {
Ok(Ok(summaries)) => {
println!("✓ Success: got {} summaries", summaries.len());
}
Ok(Err(e)) => panic!("Task panicked: {:?}", e),
Err(_) => {
let count = iter_count.load(Ordering::SeqCst);
panic!(
"REPRODUCED BUG: Iterator hung at TTL boundary after {} iterations!",
count
);
}
}
}
#[test]
fn test_multiple_sequential_iterations() {
println!("\n=== Test 5: Multiple Sequential Iterations ===");
let cache = CacheBuilder::<u32, String>::new()
.capacity(100)
.time_to_live(Duration::from_millis(200))
.timer_mode(TimerWheelMode::HighPrecisionShortLived)
.build()
.unwrap();
cache.insert(1, "a".to_string(), 1);
cache.insert(2, "b".to_string(), 1);
cache.insert(3, "c".to_string(), 1);
for attempt in 1..=5 {
let delay_ms = 30 * attempt;
println!(
"Attempt {}: waiting {}ms, then iterating...",
attempt, delay_ms
);
std::thread::sleep(Duration::from_millis(delay_ms));
let start = std::time::Instant::now();
let mut count = 0;
for _ in cache.iter() {
count += 1;
if count > 50 {
panic!(
"Attempt {}: Infinite loop after {} iterations!",
attempt, count
);
}
}
let elapsed = start.elapsed();
println!(" → {} items in {:?}", count, elapsed);
if elapsed > Duration::from_millis(500) {
panic!("Attempt {}: Took too long: {:?}", attempt, elapsed);
}
}
println!("✓ All attempts completed successfully");
}
#[test]
fn test_iter_empty_cache_with_ttl() {
println!("\n=== Test 6: Empty Cache with TTL ===");
let cache = CacheBuilder::<u32, String>::new()
.capacity(100)
.time_to_live(Duration::from_millis(100))
.timer_mode(TimerWheelMode::HighPrecisionShortLived)
.build()
.unwrap();
println!("Inserting 2 items, waiting for expiration...");
cache.insert(1, "a".to_string(), 1);
cache.insert(2, "b".to_string(), 1);
std::thread::sleep(Duration::from_millis(150));
println!("Items should be expired, iterating...");
let start = std::time::Instant::now();
let mut count = 0;
for _ in cache.iter() {
count += 1;
if count > 10 {
panic!("Infinite loop on empty/expired cache!");
}
}
let elapsed = start.elapsed();
println!("✓ Completed: {} items in {:?}", count, elapsed);
assert!(count == 0, "Should iterate 0 items on expired cache");
assert!(
elapsed < Duration::from_millis(100),
"Should complete quickly"
);
}
#[tokio::test]
async fn test_rapid_iterations_during_expiration() {
println!("\n=== Test 7: Rapid Iterations During Expiration ===");
let cache = Arc::new(
CacheBuilder::<u32, String>::new()
.capacity(100)
.time_to_live(Duration::from_millis(100))
.timer_mode(TimerWheelMode::HighPrecisionShortLived)
.build()
.unwrap(),
);
cache.insert(1, "a".to_string(), 1);
cache.insert(2, "b".to_string(), 1);
tokio::time::sleep(Duration::from_millis(90)).await;
println!("Starting rapid iterations during expiration window...");
let mut handles = vec![];
for i in 0..5 {
let cache_clone = cache.clone();
let handle = tokio::task::spawn_blocking(move || {
let start = std::time::Instant::now();
let mut count = 0;
for _ in cache_clone.iter() {
count += 1;
if count > 20 {
return Err(format!("Iterator {}: infinite loop!", i));
}
}
if start.elapsed() > Duration::from_secs(1) {
return Err(format!("Iterator {}: took {:?}", i, start.elapsed()));
}
Ok((i, count))
});
handles.push(handle);
}
for handle in handles {
match tokio::time::timeout(Duration::from_secs(3), handle).await {
Ok(Ok(Ok((id, count)))) => {
println!(" Iterator {} completed: {} items", id, count);
}
Ok(Ok(Err(e))) => panic!("{}", e),
Ok(Err(e)) => panic!("Iterator panicked: {:?}", e),
Err(_) => panic!("Iterator hung!"),
}
}
println!("✓ All rapid iterations completed");
}
#[test]
fn test_iterator_state_consistency() {
println!("\n=== Test 8: Iterator State Consistency ===");
let cache = CacheBuilder::<u32, String>::new()
.capacity(100)
.time_to_live(Duration::from_millis(100))
.timer_mode(TimerWheelMode::HighPrecisionShortLived)
.build()
.unwrap();
cache.insert(1, "a".to_string(), 1);
cache.insert(2, "b".to_string(), 1);
cache.insert(3, "c".to_string(), 1);
std::thread::sleep(Duration::from_millis(90));
println!("Creating iterator and partially consuming it...");
let mut iter = cache.iter();
let mut seen_keys = std::collections::HashSet::new();
if let Some((key, _)) = iter.next() {
seen_keys.insert(key);
println!(" First item: key={}", key);
}
std::thread::sleep(Duration::from_millis(20));
println!("Consuming rest of iterator...");
let start = std::time::Instant::now();
let mut count = 1;
for (key, _) in iter {
count += 1;
if seen_keys.contains(&key) {
panic!("DUPLICATE KEY DETECTED: {} - ITERATOR CYCLE!", key);
}
seen_keys.insert(key);
if count > 20 {
panic!("Infinite loop detected after partial consumption!");
}
if start.elapsed() > Duration::from_millis(500) {
panic!("Partial consumption took too long!");
}
}
println!("✓ Iterator consumed {} unique items", count);
assert!(count <= 3, "Should see at most 3 items");
}
#[test]
#[ignore] fn test_manual_cpu_observation() {
println!("\n=== Manual CPU Observation Test ===");
println!("Monitor CPU usage while this test runs.");
println!("If CPU spikes to 100% and test hangs, bug is reproduced!\n");
let cache = CacheBuilder::<u32, String>::new()
.capacity(100)
.time_to_live(Duration::from_millis(100))
.timer_mode(TimerWheelMode::HighPrecisionShortLived)
.build()
.unwrap();
cache.insert(1, "a".to_string(), 1);
cache.insert(2, "b".to_string(), 1);
println!("Waiting 95ms...");
std::thread::sleep(Duration::from_millis(95));
println!("Starting iteration NOW - check CPU!");
println!("(Test will timeout after 10s if hung)\n");
let start = std::time::Instant::now();
let mut count = 0;
for (_k, _v) in cache.iter() {
count += 1;
if count % 1000 == 0 {
println!(" ... {} iterations in {:?}", count, start.elapsed());
}
if start.elapsed() > Duration::from_secs(10) {
panic!("TIMEOUT: {} iterations in 10 seconds - HUNG!", count);
}
if count > 100000 {
panic!("INFINITE LOOP: {} iterations!", count);
}
}
println!("\n✓ Completed: {} items in {:?}", count, start.elapsed());
}