use ferntree::Tree;
use rand::prelude::*;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::mpsc::{channel, RecvTimeoutError};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
fn run_with_timeout<F, R>(timeout: Duration, name: &str, f: F) -> R
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let (tx, rx) = channel();
let name = name.to_string();
let handle = thread::spawn(move || {
let result = f();
let _ = tx.send(result);
});
match rx.recv_timeout(timeout) {
Ok(result) => {
handle.join().expect("Thread panicked");
result
}
Err(RecvTimeoutError::Timeout) => {
panic!(
"TIMEOUT: '{}' did not complete within {:?} - potential deadlock detected",
name, timeout
);
}
Err(RecvTimeoutError::Disconnected) => {
handle.join().expect("Thread panicked without sending result");
panic!("Thread terminated unexpectedly without completing");
}
}
}
#[allow(dead_code)]
fn run_concurrent_with_timeout<F>(timeout: Duration, name: &str, num_threads: usize, f: F)
where
F: Fn(usize) + Send + Sync + 'static,
{
let f = Arc::new(f);
let (tx, rx) = channel();
let name = name.to_string();
let handles: Vec<_> = (0..num_threads)
.map(|i| {
let f = Arc::clone(&f);
let tx = tx.clone();
thread::spawn(move || {
f(i);
let _ = tx.send(i);
})
})
.collect();
drop(tx);
let start = Instant::now();
let mut completed = 0;
while completed < num_threads {
let remaining = timeout.saturating_sub(start.elapsed());
if remaining.is_zero() {
panic!(
"TIMEOUT: '{}' - only {}/{} threads completed within {:?} - potential deadlock",
name, completed, num_threads, timeout
);
}
match rx.recv_timeout(remaining) {
Ok(_) => completed += 1,
Err(RecvTimeoutError::Timeout) => {
panic!(
"TIMEOUT: '{}' - only {}/{} threads completed within {:?} - potential deadlock",
name, completed, num_threads, timeout
);
}
Err(RecvTimeoutError::Disconnected) => {
break;
}
}
}
for handle in handles {
handle.join().expect("Thread panicked");
}
}
#[test]
fn deadlock_multiple_writers_different_orders() {
run_with_timeout(Duration::from_secs(10), "multiple_writers_different_orders", || {
let tree = Arc::new(Tree::<i32, i32>::new());
for i in 0..100 {
tree.insert(i, i);
}
let num_threads = 4;
let iterations = 100;
let handles: Vec<_> = (0..num_threads)
.map(|t| {
let tree = Arc::clone(&tree);
thread::spawn(move || {
let mut rng = rand::rng();
for _ in 0..iterations {
let keys: Vec<i32> = (0..10).map(|_| rng.random_range(0..100)).collect();
for key in keys {
tree.insert(key, t);
}
}
})
})
.collect();
for h in handles {
h.join().unwrap();
}
tree.assert_invariants();
});
}
#[test]
fn deadlock_reader_writer_interleaving() {
run_with_timeout(Duration::from_secs(10), "reader_writer_interleaving", || {
let tree = Arc::new(Tree::<i32, i32>::new());
for i in 0..100 {
tree.insert(i, i);
}
let running = Arc::new(AtomicBool::new(true));
let num_readers = 4;
let num_writers = 2;
let reader_handles: Vec<_> = (0..num_readers)
.map(|_| {
let tree = Arc::clone(&tree);
let running = Arc::clone(&running);
thread::spawn(move || {
let mut count = 0u64;
while running.load(Ordering::Relaxed) {
let mut iter = tree.raw_iter();
iter.seek_to_first();
while iter.next().is_some() {
count += 1;
}
}
count
})
})
.collect();
let writer_handles: Vec<_> = (0..num_writers)
.map(|t| {
let tree = Arc::clone(&tree);
let running = Arc::clone(&running);
thread::spawn(move || {
let mut rng = rand::rng();
let mut count = 0u64;
while running.load(Ordering::Relaxed) {
let key: i32 = rng.random_range(0..100);
if rng.random_bool(0.5) {
tree.insert(key, t);
} else {
tree.remove(&key);
}
count += 1;
}
count
})
})
.collect();
thread::sleep(Duration::from_millis(500));
running.store(false, Ordering::Relaxed);
let reader_ops: u64 = reader_handles.into_iter().map(|h| h.join().unwrap()).sum();
let writer_ops: u64 = writer_handles.into_iter().map(|h| h.join().unwrap()).sum();
assert!(reader_ops > 0, "Readers made no progress");
assert!(writer_ops > 0, "Writers made no progress");
tree.assert_invariants();
});
}
#[test]
fn deadlock_concurrent_splits() {
run_with_timeout(Duration::from_secs(15), "concurrent_splits", || {
let tree = Arc::new(Tree::<i32, i32>::new());
let num_threads = 8;
let entries_per_thread = 500;
let handles: Vec<_> = (0..num_threads)
.map(|t| {
let tree = Arc::clone(&tree);
thread::spawn(move || {
for i in 0..entries_per_thread {
let key = t * entries_per_thread + i;
tree.insert(key, key * 10);
}
})
})
.collect();
for h in handles {
h.join().unwrap();
}
tree.assert_invariants();
assert_eq!(tree.len(), (num_threads * entries_per_thread) as usize);
assert!(tree.height() > 1, "Tree should have split");
});
}
#[test]
fn deadlock_concurrent_merges() {
run_with_timeout(Duration::from_secs(15), "concurrent_merges", || {
let tree = Arc::new(Tree::<i32, i32>::new());
for i in 0..2000 {
tree.insert(i, i);
}
tree.assert_invariants();
let num_threads = 4;
let entries_per_thread = 500;
let handles: Vec<_> = (0..num_threads)
.map(|t| {
let tree = Arc::clone(&tree);
thread::spawn(move || {
for i in 0..entries_per_thread {
let key = t * entries_per_thread + i;
tree.remove(&key);
}
})
})
.collect();
for h in handles {
h.join().unwrap();
}
tree.assert_invariants();
assert!(tree.is_empty(), "All entries should be removed");
});
}
#[test]
fn deadlock_interleaved_splits_and_merges() {
run_with_timeout(Duration::from_secs(20), "interleaved_splits_and_merges", || {
let tree = Arc::new(Tree::<i32, i32>::new());
for i in 0..500 {
tree.insert(i, i);
}
let running = Arc::new(AtomicBool::new(true));
let num_threads = 6;
let handles: Vec<_> = (0..num_threads)
.map(|t| {
let tree = Arc::clone(&tree);
let running = Arc::clone(&running);
thread::spawn(move || {
let mut rng = rand::rng();
let mut ops = 0u64;
while running.load(Ordering::Relaxed) {
let key: i32 = rng.random_range(0..1000);
if t % 2 == 0 {
if rng.random_bool(0.7) {
tree.insert(key, t);
} else {
tree.remove(&key);
}
} else {
if rng.random_bool(0.7) {
tree.remove(&key);
} else {
tree.insert(key, t);
}
}
ops += 1;
}
ops
})
})
.collect();
thread::sleep(Duration::from_millis(1000));
running.store(false, Ordering::Relaxed);
let total_ops: u64 = handles.into_iter().map(|h| h.join().unwrap()).sum();
tree.assert_invariants();
assert!(total_ops > 100, "Should have performed many operations");
});
}
#[test]
fn same_thread_insert_different_leaf() {
run_with_timeout(Duration::from_secs(5), "insert_different_leaf", || {
let tree: Tree<i32, i32> = Tree::new();
for i in 0..200 {
tree.insert(i, i);
}
tree.assert_invariants();
assert!(tree.height() > 1, "Need multiple leaves for this test");
let mut iter = tree.raw_iter();
iter.seek(&0);
let (k, _) = iter.next().unwrap();
assert_eq!(*k, 0);
tree.insert(1000, 1000);
let (k, _) = iter.next().unwrap();
assert_eq!(*k, 1);
tree.assert_invariants();
});
}
#[test]
fn same_thread_remove_different_leaf() {
run_with_timeout(Duration::from_secs(5), "remove_different_leaf", || {
let tree: Tree<i32, i32> = Tree::new();
for i in 0..200 {
tree.insert(i, i);
}
tree.assert_invariants();
tree.insert(1000, 1000);
let mut iter = tree.raw_iter();
iter.seek(&0);
let (k, _) = iter.next().unwrap();
assert_eq!(*k, 0);
let removed = tree.remove(&1000);
assert_eq!(removed, Some(1000));
let (k, _) = iter.next().unwrap();
assert_eq!(*k, 1);
tree.assert_invariants();
});
}
#[test]
fn same_thread_iterator_releases_locks_on_leaf_change() {
run_with_timeout(Duration::from_secs(5), "iterator_releases_locks", || {
let tree: Tree<i32, i32> = Tree::new();
for i in 0..200 {
tree.insert(i, i);
}
tree.assert_invariants();
let mut iter = tree.raw_iter();
iter.seek_to_first();
let mut count = 0;
while iter.next().is_some() {
count += 1;
}
assert_eq!(count, 200);
tree.insert(0, 999);
assert_eq!(tree.lookup(&0, |v| *v), Some(999));
tree.assert_invariants();
});
}
#[test]
fn same_thread_exclusive_iter_different_leaf() {
run_with_timeout(Duration::from_secs(5), "exclusive_iter_different_leaf", || {
let tree: Tree<i32, i32> = Tree::new();
for i in 0..200 {
tree.insert(i, i);
}
tree.assert_invariants();
let mut iter = tree.raw_iter_mut();
iter.seek(&0);
if let Some((_, v)) = iter.next() {
*v = 999;
}
drop(iter);
tree.insert(1000, 1000);
tree.assert_invariants();
});
}
#[test]
fn concurrent_delete_ahead_of_iterator() {
run_with_timeout(Duration::from_secs(10), "delete_ahead_of_iterator", || {
let tree = Arc::new(Tree::<i32, i32>::new());
for i in 0..500 {
tree.insert(i, i);
}
tree.assert_invariants();
let tree_reader = Arc::clone(&tree);
let tree_deleter = Arc::clone(&tree);
let reader = thread::spawn(move || {
let mut iter = tree_reader.raw_iter();
iter.seek_to_first();
let mut prev = -1i32;
let mut count = 0;
while let Some((k, v)) = iter.next() {
assert!(*k > prev, "Order violation: {} not > {} at count {}", k, prev, count);
assert_eq!(*k, *v, "Value mismatch at key {}", k);
prev = *k;
count += 1;
}
count
});
let deleter = thread::spawn(move || {
for i in (250..500).rev() {
tree_deleter.remove(&i);
}
});
deleter.join().unwrap();
let count = reader.join().unwrap();
assert!(count > 0, "Reader saw no entries");
tree.assert_invariants();
});
}
#[test]
fn concurrent_delete_behind_iterator() {
run_with_timeout(Duration::from_secs(10), "delete_behind_iterator", || {
let tree = Arc::new(Tree::<i32, i32>::new());
for i in 0..500 {
tree.insert(i, i);
}
tree.assert_invariants();
let tree_reader = Arc::clone(&tree);
let tree_deleter = Arc::clone(&tree);
let reader = thread::spawn(move || {
let mut iter = tree_reader.raw_iter();
iter.seek(&250);
let mut prev = 249i32;
let mut count = 0;
while let Some((k, _)) = iter.next() {
assert!(*k > prev, "Order violation");
prev = *k;
count += 1;
}
count
});
let deleter = thread::spawn(move || {
for i in 0..250 {
tree_deleter.remove(&i);
}
});
deleter.join().unwrap();
let count = reader.join().unwrap();
assert!(count > 0, "Reader saw no entries");
tree.assert_invariants();
});
}
#[test]
fn concurrent_delete_during_reverse_iteration() {
run_with_timeout(Duration::from_secs(10), "delete_during_reverse_iteration", || {
let tree = Arc::new(Tree::<i32, i32>::new());
for i in 0..500 {
tree.insert(i, i);
}
tree.assert_invariants();
let tree_reader = Arc::clone(&tree);
let tree_deleter = Arc::clone(&tree);
let reader = thread::spawn(move || {
let mut iter = tree_reader.raw_iter();
iter.seek_to_last();
let mut prev = 500i32;
let mut count = 0;
while let Some((k, _)) = iter.prev() {
assert!(*k < prev, "Reverse order violation: {} not < {}", k, prev);
prev = *k;
count += 1;
}
count
});
let deleter = thread::spawn(move || {
let mut rng = rand::rng();
for _ in 0..200 {
let key: i32 = rng.random_range(0..500);
tree_deleter.remove(&key);
}
});
deleter.join().unwrap();
let count = reader.join().unwrap();
assert!(count > 0, "Reader saw no entries");
tree.assert_invariants();
});
}
#[test]
fn concurrent_structural_changes_during_iteration() {
run_with_timeout(Duration::from_secs(15), "structural_changes_during_iteration", || {
let tree = Arc::new(Tree::<i32, i32>::new());
for i in 0..200 {
tree.insert(i, i);
}
tree.assert_invariants();
let running = Arc::new(AtomicBool::new(true));
let tree_reader = Arc::clone(&tree);
let tree_modifier = Arc::clone(&tree);
let running_reader = Arc::clone(&running);
let running_modifier = Arc::clone(&running);
let reader = thread::spawn(move || {
let mut iterations = 0u64;
while running_reader.load(Ordering::Relaxed) {
let mut iter = tree_reader.raw_iter();
iter.seek_to_first();
let mut prev = -1i32;
while let Some((k, _)) = iter.next() {
assert!(*k > prev, "Order violation during iteration {}", iterations);
prev = *k;
}
iterations += 1;
}
iterations
});
let modifier = thread::spawn(move || {
let mut rng = rand::rng();
let mut ops = 0u64;
while running_modifier.load(Ordering::Relaxed) {
let key: i32 = rng.random_range(0..1000);
if rng.random_bool(0.5) {
tree_modifier.insert(key, key);
} else {
tree_modifier.remove(&key);
}
ops += 1;
}
ops
});
thread::sleep(Duration::from_millis(1000));
running.store(false, Ordering::Relaxed);
let read_ops = reader.join().unwrap();
let modify_ops = modifier.join().unwrap();
assert!(read_ops > 0, "Reader made no progress");
assert!(modify_ops > 0, "Modifier made no progress");
tree.assert_invariants();
});
}
#[test]
fn concurrent_iteration_maintains_sorted_order() {
run_with_timeout(Duration::from_secs(10), "iteration_maintains_sorted_order", || {
let tree = Arc::new(Tree::<i32, i32>::new());
for i in 0..200 {
tree.insert(i, i);
}
let tree_reader = Arc::clone(&tree);
let tree_writer = Arc::clone(&tree);
let reader = thread::spawn(move || {
for _ in 0..10 {
let mut iter = tree_reader.raw_iter();
iter.seek_to_first();
let mut keys = Vec::new();
while let Some((k, _)) = iter.next() {
keys.push(*k);
}
for window in keys.windows(2) {
assert!(
window[0] < window[1],
"Order violation: {} >= {}",
window[0],
window[1]
);
}
}
});
let writer = thread::spawn(move || {
let mut rng = rand::rng();
for _ in 0..100 {
let key: i32 = rng.random_range(0..500);
if rng.random_bool(0.5) {
tree_writer.insert(key, key);
} else {
tree_writer.remove(&key);
}
}
});
reader.join().unwrap();
writer.join().unwrap();
tree.assert_invariants();
});
}
#[test]
fn stress_timeout_concurrent_mixed_high_contention() {
run_with_timeout(Duration::from_secs(30), "stress_mixed_high_contention", || {
let tree = Arc::new(Tree::<i32, i32>::new());
let num_threads = 8;
let key_range = 100; let ops_per_thread = 2000;
for i in 0..key_range {
tree.insert(i, i);
}
let handles: Vec<_> = (0..num_threads)
.map(|t| {
let tree = Arc::clone(&tree);
thread::spawn(move || {
let mut rng = rand::rng();
for _ in 0..ops_per_thread {
let key: i32 = rng.random_range(0..key_range);
match rng.random_range(0..4) {
0 => {
tree.insert(key, t);
}
1 => {
tree.remove(&key);
}
2 => {
tree.lookup(&key, |v| *v);
}
3 => {
let mut iter = tree.raw_iter();
iter.seek_to_first();
while iter.next().is_some() {}
}
_ => unreachable!(),
}
}
})
})
.collect();
for h in handles {
h.join().unwrap();
}
tree.assert_invariants();
});
}
#[test]
fn stress_timeout_single_key_contention() {
run_with_timeout(Duration::from_secs(15), "stress_single_key_contention", || {
let tree = Arc::new(Tree::<i32, i32>::new());
let num_threads = 8;
let iterations = 5000;
tree.insert(42, 0);
let handles: Vec<_> = (0..num_threads)
.map(|t| {
let tree = Arc::clone(&tree);
thread::spawn(move || {
for i in 0..iterations {
match i % 3 {
0 => {
tree.insert(42, t);
}
1 => {
tree.lookup(&42, |v| *v);
}
2 => {
tree.remove(&42);
tree.insert(42, t);
}
_ => unreachable!(),
}
}
})
})
.collect();
for h in handles {
h.join().unwrap();
}
tree.assert_invariants();
});
}
#[test]
fn stress_timeout_rapid_split_merge_cycles() {
run_with_timeout(Duration::from_secs(30), "stress_rapid_split_merge", || {
let tree = Arc::new(Tree::<i32, i32>::new());
let num_threads = 4;
let cycles = 10;
for cycle in 0..cycles {
let handles: Vec<_> = (0..num_threads)
.map(|t| {
let tree = Arc::clone(&tree);
thread::spawn(move || {
let base = t * 200;
for i in 0..200 {
tree.insert(base + i, cycle);
}
})
})
.collect();
for h in handles {
h.join().unwrap();
}
let handles: Vec<_> = (0..num_threads)
.map(|t| {
let tree = Arc::clone(&tree);
thread::spawn(move || {
let base = t * 200;
for i in 0..200 {
tree.remove(&(base + i));
}
})
})
.collect();
for h in handles {
h.join().unwrap();
}
tree.assert_invariants();
}
});
}
#[test]
fn starvation_writer_under_heavy_reads() {
run_with_timeout(Duration::from_secs(30), "writer_starvation", || {
let tree = Arc::new(Tree::<i32, i32>::new());
for i in 0..500 {
tree.insert(i, i);
}
let num_readers = 8;
let writer_target_ops = 100;
let running = Arc::new(AtomicBool::new(true));
let writer_ops = Arc::new(AtomicU64::new(0));
let reader_ops = Arc::new(AtomicU64::new(0));
let reader_handles: Vec<_> = (0..num_readers)
.map(|_| {
let tree = Arc::clone(&tree);
let running = Arc::clone(&running);
let ops = Arc::clone(&reader_ops);
thread::spawn(move || {
while running.load(Ordering::Relaxed) {
let mut iter = tree.raw_iter();
iter.seek_to_first();
while iter.next().is_some() {
ops.fetch_add(1, Ordering::Relaxed);
}
}
})
})
.collect();
thread::sleep(Duration::from_millis(50));
let tree_writer = Arc::clone(&tree);
let writer_ops_clone = Arc::clone(&writer_ops);
let start = Instant::now();
let writer = thread::spawn(move || {
for i in 0..writer_target_ops {
let key = 1000 + i;
tree_writer.insert(key, i);
writer_ops_clone.fetch_add(1, Ordering::Relaxed);
}
});
writer.join().unwrap();
let write_duration = start.elapsed();
running.store(false, Ordering::Relaxed);
for h in reader_handles {
h.join().unwrap();
}
let final_writer_ops = writer_ops.load(Ordering::Relaxed);
let final_reader_ops = reader_ops.load(Ordering::Relaxed);
assert_eq!(
final_writer_ops, writer_target_ops as u64,
"Writer didn't complete all operations"
);
assert!(
write_duration < Duration::from_secs(10),
"Writer took too long ({:?}) - possible starvation",
write_duration
);
assert!(final_reader_ops > 0, "Readers made no progress");
tree.assert_invariants();
for i in 0..writer_target_ops {
let key = 1000 + i;
assert!(tree.contains_key(&key), "Writer's key {} not found", key);
}
});
}
#[test]
fn starvation_reader_under_heavy_writes() {
run_with_timeout(Duration::from_secs(30), "reader_starvation", || {
let tree = Arc::new(Tree::<i32, i32>::new());
for i in 0..200 {
tree.insert(i, i);
}
let num_writers = 8;
let reader_target_iterations = 10;
let running = Arc::new(AtomicBool::new(true));
let reader_iterations = Arc::new(AtomicU64::new(0));
let writer_ops = Arc::new(AtomicU64::new(0));
let writer_handles: Vec<_> = (0..num_writers)
.map(|t| {
let tree = Arc::clone(&tree);
let running = Arc::clone(&running);
let ops = Arc::clone(&writer_ops);
thread::spawn(move || {
let mut rng = rand::rng();
while running.load(Ordering::Relaxed) {
let key: i32 = rng.random_range(0..500);
if rng.random_bool(0.5) {
tree.insert(key, t);
} else {
tree.remove(&key);
}
ops.fetch_add(1, Ordering::Relaxed);
}
})
})
.collect();
thread::sleep(Duration::from_millis(50));
let tree_reader = Arc::clone(&tree);
let reader_iterations_clone = Arc::clone(&reader_iterations);
let start = Instant::now();
let reader = thread::spawn(move || {
for _ in 0..reader_target_iterations {
let mut iter = tree_reader.raw_iter();
iter.seek_to_first();
let mut prev = -1i32;
while let Some((k, _)) = iter.next() {
assert!(*k > prev, "Order violation during read under write pressure");
prev = *k;
}
reader_iterations_clone.fetch_add(1, Ordering::Relaxed);
}
});
reader.join().unwrap();
let read_duration = start.elapsed();
running.store(false, Ordering::Relaxed);
for h in writer_handles {
h.join().unwrap();
}
let final_reader_iters = reader_iterations.load(Ordering::Relaxed);
let final_writer_ops = writer_ops.load(Ordering::Relaxed);
assert_eq!(
final_reader_iters, reader_target_iterations as u64,
"Reader didn't complete all iterations"
);
assert!(
read_duration < Duration::from_secs(15),
"Reader took too long ({:?}) - possible starvation",
read_duration
);
assert!(final_writer_ops > 0, "Writers made no progress");
tree.assert_invariants();
});
}
#[test]
fn starvation_fairness_mixed_workload() {
run_with_timeout(Duration::from_secs(30), "fairness_mixed_workload", || {
let tree = Arc::new(Tree::<i32, i32>::new());
for i in 0..200 {
tree.insert(i, i);
}
let num_readers = 4;
let num_writers = 4;
let test_duration = Duration::from_secs(3);
let running = Arc::new(AtomicBool::new(true));
let reader_ops = Arc::new(AtomicU64::new(0));
let writer_ops = Arc::new(AtomicU64::new(0));
let reader_handles: Vec<_> = (0..num_readers)
.map(|_| {
let tree = Arc::clone(&tree);
let running = Arc::clone(&running);
let ops = Arc::clone(&reader_ops);
thread::spawn(move || {
while running.load(Ordering::Relaxed) {
let mut iter = tree.raw_iter();
iter.seek_to_first();
while iter.next().is_some() {
ops.fetch_add(1, Ordering::Relaxed);
}
}
})
})
.collect();
let writer_handles: Vec<_> = (0..num_writers)
.map(|t| {
let tree = Arc::clone(&tree);
let running = Arc::clone(&running);
let ops = Arc::clone(&writer_ops);
thread::spawn(move || {
let mut rng = rand::rng();
while running.load(Ordering::Relaxed) {
let key: i32 = rng.random_range(0..500);
if rng.random_bool(0.5) {
tree.insert(key, t);
} else {
tree.remove(&key);
}
ops.fetch_add(1, Ordering::Relaxed);
}
})
})
.collect();
thread::sleep(test_duration);
running.store(false, Ordering::Relaxed);
for h in reader_handles {
h.join().unwrap();
}
for h in writer_handles {
h.join().unwrap();
}
let final_reader_ops = reader_ops.load(Ordering::Relaxed);
let final_writer_ops = writer_ops.load(Ordering::Relaxed);
assert!(
final_reader_ops > 100,
"Readers made insufficient progress: {} ops",
final_reader_ops
);
assert!(
final_writer_ops > 100,
"Writers made insufficient progress: {} ops",
final_writer_ops
);
let ratio = if final_reader_ops > final_writer_ops {
final_reader_ops as f64 / final_writer_ops as f64
} else {
final_writer_ops as f64 / final_reader_ops as f64
};
assert!(
ratio < 100.0,
"Unfair workload distribution: reader_ops={}, writer_ops={}, ratio={}",
final_reader_ops,
final_writer_ops,
ratio
);
tree.assert_invariants();
});
}
#[test]
#[ignore]
fn starvation_long_running_fairness() {
run_with_timeout(Duration::from_secs(120), "long_running_fairness", || {
let tree = Arc::new(Tree::<i32, i32>::new());
for i in 0..1000 {
tree.insert(i, i);
}
let num_readers = 8;
let num_writers = 4;
let test_duration = Duration::from_secs(60);
let running = Arc::new(AtomicBool::new(true));
let reader_ops = Arc::new(AtomicU64::new(0));
let writer_ops = Arc::new(AtomicU64::new(0));
let reader_handles: Vec<_> = (0..num_readers)
.map(|_| {
let tree = Arc::clone(&tree);
let running = Arc::clone(&running);
let ops = Arc::clone(&reader_ops);
thread::spawn(move || {
while running.load(Ordering::Relaxed) {
let mut iter = tree.raw_iter();
iter.seek_to_first();
while iter.next().is_some() {}
ops.fetch_add(1, Ordering::Relaxed);
}
})
})
.collect();
let writer_handles: Vec<_> = (0..num_writers)
.map(|t| {
let tree = Arc::clone(&tree);
let running = Arc::clone(&running);
let ops = Arc::clone(&writer_ops);
thread::spawn(move || {
let mut rng = rand::rng();
while running.load(Ordering::Relaxed) {
let key: i32 = rng.random_range(0..2000);
if rng.random_bool(0.5) {
tree.insert(key, t);
} else {
tree.remove(&key);
}
ops.fetch_add(1, Ordering::Relaxed);
}
})
})
.collect();
thread::sleep(test_duration);
running.store(false, Ordering::Relaxed);
for h in reader_handles {
h.join().unwrap();
}
for h in writer_handles {
h.join().unwrap();
}
let final_reader_ops = reader_ops.load(Ordering::Relaxed);
let final_writer_ops = writer_ops.load(Ordering::Relaxed);
tree.assert_invariants();
assert!(
final_reader_ops > 1000,
"Readers potentially starved: only {} iterations",
final_reader_ops
);
assert!(
final_writer_ops > 10000,
"Writers potentially starved: only {} ops",
final_writer_ops
);
});
}