#![allow(clippy::drop_non_drop)]
use ferntree::Tree;
use std::collections::BTreeMap;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Barrier};
use std::thread;
use std::time::Duration;
#[test]
fn reader_during_split_yields_correct_successor() {
let tree = Arc::new(Tree::<u32, u32>::new());
for i in 0..64u32 {
tree.insert(i * 2, i * 2);
}
let barrier = Arc::new(Barrier::new(2));
let writer_barrier = Arc::clone(&barrier);
let tree_writer = Arc::clone(&tree);
let writer = thread::spawn(move || {
writer_barrier.wait();
for i in 0..64u32 {
tree_writer.insert(i * 2 + 1, i * 2 + 1);
}
});
{
let mut iter = tree.raw_iter();
iter.seek_to_first();
barrier.wait();
let mut last: Option<u32> = None;
while let Some((k, _v)) = iter.next() {
if let Some(prev) = last {
assert!(*k > prev, "iterator non-monotonic: {prev} then {k}");
}
last = Some(*k);
}
}
writer.join().unwrap();
tree.assert_invariants();
}
#[test]
fn reader_during_merge_yields_consistent_sequence() {
let tree = Arc::new(Tree::<u32, u32>::new());
for i in 0..512u32 {
tree.insert(i, i);
}
tree.assert_invariants();
let tree_writer = Arc::clone(&tree);
let writer = thread::spawn(move || {
for i in 0..256u32 {
tree_writer.remove(&i);
}
});
let mut seen_count = 0usize;
{
let mut iter = tree.raw_iter();
iter.seek_to_first();
let mut prev: Option<u32> = None;
while let Some((k, _)) = iter.next() {
if let Some(p) = prev {
assert!(*k > p, "iter went backwards: {p} then {k}");
}
prev = Some(*k);
seen_count += 1;
}
} writer.join().unwrap();
assert!(seen_count > 0);
tree.assert_invariants();
}
#[test]
fn two_writers_split_same_leaf_settles_to_consistent_state() {
let tree = Arc::new(Tree::<u32, u32>::new());
for i in 0..32u32 {
tree.insert(i * 4, i);
}
tree.assert_invariants();
let barrier = Arc::new(Barrier::new(2));
let writers: Vec<_> = (0..2u32)
.map(|tid| {
let tree = Arc::clone(&tree);
let barrier = Arc::clone(&barrier);
thread::spawn(move || {
barrier.wait();
for i in 0..128u32 {
let key = if tid == 0 {
1 + 4 * i
} else {
3 + 4 * i
};
tree.insert(key, key);
}
})
})
.collect();
for h in writers {
h.join().unwrap();
}
tree.assert_invariants();
for i in 0..128u32 {
assert_eq!(tree.lookup(&(1 + 4 * i), |v| *v), Some(1 + 4 * i));
assert_eq!(tree.lookup(&(3 + 4 * i), |v| *v), Some(3 + 4 * i));
}
for i in 0..32u32 {
assert_eq!(tree.lookup(&(i * 4), |v| *v), Some(i));
}
}
#[test]
fn concurrent_pop_first_returns_unique_entries() {
let tree = Arc::new(Tree::<u32, u32>::new());
const N: u32 = 1024;
for i in 0..N {
tree.insert(i, i.wrapping_mul(31));
}
let popped = Arc::new(parking_lot::Mutex::new(Vec::new()));
let workers: Vec<_> = (0..4)
.map(|_| {
let tree = Arc::clone(&tree);
let popped = Arc::clone(&popped);
thread::spawn(move || {
let mut local = Vec::new();
while let Some(kv) = tree.pop_first() {
local.push(kv);
}
popped.lock().extend(local);
})
})
.collect();
for h in workers {
h.join().unwrap();
}
let popped = popped.lock();
assert_eq!(popped.len(), N as usize, "wrong number of pops");
let mut seen: BTreeMap<u32, u32> = BTreeMap::new();
for (k, v) in popped.iter() {
assert!(seen.insert(*k, *v).is_none(), "duplicate pop for key {k}");
assert_eq!(*v, k.wrapping_mul(31), "value mismatch for key {k}");
}
for i in 0..N {
assert!(seen.contains_key(&i), "missing key {i} from pops");
}
tree.assert_invariants();
assert!(tree.is_empty());
}
#[test]
fn concurrent_pop_last_returns_unique_entries() {
let tree = Arc::new(Tree::<u32, u32>::new());
const N: u32 = 1024;
for i in 0..N {
tree.insert(i, i);
}
let popped = Arc::new(parking_lot::Mutex::new(Vec::new()));
let workers: Vec<_> = (0..4)
.map(|_| {
let tree = Arc::clone(&tree);
let popped = Arc::clone(&popped);
thread::spawn(move || {
let mut local = Vec::new();
while let Some(kv) = tree.pop_last() {
local.push(kv);
}
popped.lock().extend(local);
})
})
.collect();
for h in workers {
h.join().unwrap();
}
let popped = popped.lock();
let mut seen = std::collections::HashSet::new();
for (k, _) in popped.iter() {
assert!(seen.insert(*k), "duplicate pop for key {k}");
}
assert_eq!(popped.len(), N as usize);
assert!(tree.is_empty());
}
#[test]
fn iterator_outlives_concurrent_clear() {
let tree = Arc::new(Tree::<u32, u32>::new());
for i in 0..1024u32 {
tree.insert(i, i);
}
let tree_writer = Arc::clone(&tree);
let done = Arc::new(AtomicBool::new(false));
let done_w = Arc::clone(&done);
let writer = thread::spawn(move || {
while !done_w.load(Ordering::Relaxed) {
tree_writer.clear();
for i in 0..256u32 {
tree_writer.insert(i, i);
}
}
});
for _ in 0..16 {
let mut iter = tree.raw_iter();
iter.seek_to_first();
let mut last: Option<u32> = None;
while let Some((k, _)) = iter.next() {
if let Some(p) = last {
if *k < p {
}
}
last = Some(*k);
}
}
done.store(true, Ordering::Relaxed);
writer.join().unwrap();
tree.assert_invariants();
}
#[test]
fn concurrent_operations_produce_consistent_final_state() {
let tree = Arc::new(Tree::<u32, u32>::new());
let workers: Vec<_> = (0..4u32)
.map(|tid| {
let tree = Arc::clone(&tree);
thread::spawn(move || {
let base = tid * 10_000;
for i in 0..2_500u32 {
tree.insert(base + i, base + i);
}
for i in 0..1_000u32 {
assert_eq!(tree.remove(&(base + i)), Some(base + i));
}
})
})
.collect();
for h in workers {
h.join().unwrap();
}
tree.assert_invariants();
for tid in 0..4u32 {
let base = tid * 10_000;
for i in 0..1_000u32 {
assert_eq!(tree.lookup(&(base + i), |v| *v), None, "should be removed");
}
for i in 1_000..2_500u32 {
assert_eq!(tree.lookup(&(base + i), |v| *v), Some(base + i));
}
}
}
#[test]
fn concurrent_churn_drops_values_exactly() {
let drop_count = Arc::new(AtomicUsize::new(0));
#[derive(Clone)]
struct Counted(Arc<AtomicUsize>);
impl Drop for Counted {
fn drop(&mut self) {
self.0.fetch_add(1, Ordering::SeqCst);
}
}
unsafe impl ferntree::OptimisticRead for Counted {
const EPOCH_DEFERRED_DROP: bool = true;
type Slot = ferntree::atomic_slot::BoxedSlot<Self>;
}
let inserts = 200usize;
let threads = 4usize;
{
let tree = Arc::new(Tree::<u32, Counted>::new());
let workers: Vec<_> = (0..threads as u32)
.map(|tid| {
let tree = Arc::clone(&tree);
let drop_count = Arc::clone(&drop_count);
thread::spawn(move || {
for i in 0..inserts as u32 {
let key = tid.wrapping_mul(0x1000_0000) ^ i;
tree.insert(key, Counted(Arc::clone(&drop_count)));
}
for i in 0..inserts as u32 {
let key = tid.wrapping_mul(0x1000_0000) ^ i;
tree.remove(&key);
}
})
})
.collect();
for h in workers {
h.join().unwrap();
}
tree.assert_invariants();
assert!(tree.is_empty());
drop(tree);
}
for _ in 0..32 {
drop(crossbeam_epoch::pin());
}
thread::sleep(Duration::from_millis(50));
for _ in 0..32 {
drop(crossbeam_epoch::pin());
}
let observed = drop_count.load(Ordering::SeqCst);
let expected = 2 * inserts * threads;
assert!(observed <= expected, "observed {observed} drops > {expected} inserts (double-drop?)");
assert!(
observed >= expected.saturating_sub(2 * threads),
"observed {observed} drops < expected {expected} (likely leak)"
);
}