#![allow(
clippy::panic,
clippy::pedantic,
clippy::needless_collect,
clippy::indexing_slicing
)]
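//! Removal, coalesce, and layer-GC tests for `MassTree15`, plus unit tests for
//! the `NodeCleaner` helpers. Several tests poke raw node pointers directly;
//! the thread-spawning tests are kept out of Miri via `#[cfg(not(miri))]` or
//! `#[cfg_attr(miri, ignore)]`.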
use super::{LockedParentResult, NodeCleaner};
use crate::internode::InternodeNode;
use crate::leaf15::LeafNode15;
use crate::nodeversion::{LockGuard, NodeVersion};
use crate::policy::{BoxPolicy, ValuePtr};
use crate::tree::MassTree15;
use std::ptr as StdPtr;
use std::sync::Arc;
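// Thin wrapper over `assert_eq!`, used for all value comparisons in these tests.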
macro_rules! assert_val_eq {
($got:expr, $expected:expr) => {
assert_eq!($got, $expected);
};
($got:expr, $expected:expr, $($arg:tt)*) => {
assert_eq!($got, $expected, $($arg)*);
};
}
type TestLeaf = LeafNode15<BoxPolicy<u64>>;
type TestInternode = InternodeNode;
type TestTree = MassTree15<u64>;
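// Basic removal semantics through the public tree API: single keys, missing
// keys, length accounting, suffix keys, and remove/reinsert cycles.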
#[test]
fn test_remove_single_key() {
let tree: MassTree15<u64> = MassTree15::new();
tree.insert(b"key1", 42);
assert_eq!(tree.len(), 1);
let removed = tree.remove(b"key1").unwrap();
assert_val_eq!(removed, Some(42));
assert_eq!(tree.len(), 0);
}
#[test]
fn test_remove_nonexistent_key() {
let tree: MassTree15<u64> = MassTree15::new();
tree.insert(b"key1", 42);
let result = tree.remove(b"key2");
assert!(matches!(result, Ok(None)));
assert_val_eq!(tree.get(b"key1"), Some(42));
}
#[test]
fn test_remove_updates_count() {
let tree: MassTree15<u64> = MassTree15::new();
for i in 0..10u64 {
tree.insert(&i.to_be_bytes(), i);
}
assert_eq!(tree.len(), 10);
for i in 0..5u64 {
let _ = tree.remove(&i.to_be_bytes());
}
assert_eq!(tree.len(), 5);
for i in 5..10u64 {
assert!(tree.get(&i.to_be_bytes()).is_some());
}
for i in 0..5u64 {
assert!(tree.get(&i.to_be_bytes()).is_none());
}
}
#[test]
fn test_remove_returns_old_value() {
let tree: MassTree15<String> = MassTree15::new();
tree.insert(b"key", "hello".to_string());
tree.insert(b"key", "world".to_string());
let removed = tree.remove(b"key").unwrap();
assert_val_eq!(removed, Some("world".to_string()));
}
#[test]
fn test_remove_short_key() {
let tree: MassTree15<u64> = MassTree15::new();
tree.insert(&[42], 1);
assert_val_eq!(tree.remove(&[42]).unwrap(), Some(1));
let key8 = [1, 2, 3, 4, 5, 6, 7, 8];
tree.insert(&key8, 8);
assert_val_eq!(tree.remove(&key8).unwrap(), Some(8));
}
#[test]
fn test_remove_with_suffix() {
let tree: MassTree15<u64> = MassTree15::new();
let key16 = b"0123456789ABCDEF";
tree.insert(key16, 16);
let removed = tree.remove(key16).unwrap();
assert_val_eq!(removed, Some(16));
assert!(tree.get(key16).is_none());
}
#[test]
fn test_remove_all_keys_empties_tree() {
let tree: MassTree15<u64> = MassTree15::new();
let keys: Vec<_> = (0..100u64).map(u64::to_be_bytes).collect();
for (i, key) in keys.iter().enumerate() {
tree.insert(key, i as u64);
}
assert_eq!(tree.len(), 100);
for key in &keys {
let _ = tree.remove(key);
}
assert_eq!(tree.len(), 0);
assert!(tree.is_empty());
}
#[test]
fn test_remove_in_reverse_order() {
let tree: MassTree15<u64> = MassTree15::new();
for i in 0..50u64 {
tree.insert(&i.to_be_bytes(), i);
}
for i in (0..50u64).rev() {
let removed = tree.remove(&i.to_be_bytes()).unwrap();
assert_val_eq!(removed, Some(i));
}
assert!(tree.is_empty());
}
#[test]
fn test_remove_alternating() {
let tree: MassTree15<u64> = MassTree15::new();
for i in 0..100u64 {
tree.insert(&i.to_be_bytes(), i);
}
for i in (0..100u64).step_by(2) {
let _ = tree.remove(&i.to_be_bytes());
}
assert_eq!(tree.len(), 50);
for i in (1..100u64).step_by(2) {
assert!(tree.get(&i.to_be_bytes()).is_some());
}
}
#[test]
fn test_remove_and_reinsert_same_key() {
let tree: MassTree15<u64> = MassTree15::new();
tree.insert(b"key", 1);
let _ = tree.remove(b"key");
tree.insert(b"key", 2);
assert_val_eq!(tree.get(b"key"), Some(2));
}
#[test]
fn test_remove_reinsert_cycle() {
let tree: MassTree15<u64> = MassTree15::new();
let key = b"test_key";
for i in 0..10u64 {
tree.insert(key, i);
assert_val_eq!(tree.get(key), Some(i));
let removed = tree.remove(key).unwrap();
assert_val_eq!(removed, Some(i));
assert!(tree.get(key).is_none());
}
}
#[test]
fn test_remove_from_empty_tree() {
let tree: MassTree15<u64> = MassTree15::new();
let result = tree.remove(b"key");
assert!(matches!(result, Ok(None)));
}
#[test]
fn test_remove_empty_key() {
let tree: MassTree15<u64> = MassTree15::new();
tree.insert(&[], 0);
let removed = tree.remove(&[]).unwrap();
assert_val_eq!(removed, Some(0));
}
#[test]
fn test_remove_preserves_other_keys() {
let tree: MassTree15<u64> = MassTree15::new();
tree.insert(b"aaa", 1);
tree.insert(b"bbb", 2);
tree.insert(b"ccc", 3);
let _ = tree.remove(b"bbb");
assert_val_eq!(tree.get(b"aaa"), Some(1));
assert!(tree.get(b"bbb").is_none());
assert_val_eq!(tree.get(b"ccc"), Some(3));
}
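// NodeCleaner parent-pointer helpers, driven on raw boxed nodes. Each
// Box::into_raw below is paired with a Box::from_raw at the end of the test so
// the nodes are freed again.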
#[test]
fn test_get_parent_erased_leaf() {
let parent_inode: Box<TestInternode> = TestInternode::new(0);
let parent_ptr: *mut u8 = Box::into_raw(parent_inode).cast();
let leaf: Box<TestLeaf> = TestLeaf::new_boxed();
leaf.set_parent(parent_ptr);
let leaf_ptr: *mut u8 = Box::into_raw(leaf).cast();
let got_parent: *mut u8 = unsafe { NodeCleaner::get_parent_erased::<BoxPolicy<u64>>(leaf_ptr) };
assert_eq!(got_parent, parent_ptr);
let _: Box<TestLeaf> = unsafe { Box::from_raw(leaf_ptr.cast::<TestLeaf>()) };
let _: Box<TestInternode> = unsafe { Box::from_raw(parent_ptr.cast::<TestInternode>()) };
}
#[test]
fn test_get_parent_erased_internode() {
let grandparent: Box<TestInternode> = TestInternode::new(1);
let grandparent_ptr: *mut u8 = Box::into_raw(grandparent).cast();
let inode: Box<TestInternode> = TestInternode::new(0);
inode.set_parent(grandparent_ptr);
let inode_ptr: *mut u8 = Box::into_raw(inode).cast();
let got_parent: *mut u8 =
unsafe { NodeCleaner::get_parent_erased::<BoxPolicy<u64>>(inode_ptr) };
assert_eq!(got_parent, grandparent_ptr);
let _: Box<TestInternode> = unsafe { Box::from_raw(inode_ptr.cast::<TestInternode>()) };
let _: Box<TestInternode> = unsafe { Box::from_raw(grandparent_ptr.cast::<TestInternode>()) };
}
#[test]
fn test_get_parent_erased_null_parent() {
let leaf: Box<TestLeaf> = TestLeaf::new_root_boxed();
let leaf_ptr: *mut u8 = Box::into_raw(leaf).cast();
let parent: *mut u8 = unsafe { NodeCleaner::get_parent_erased::<BoxPolicy<u64>>(leaf_ptr) };
assert!(parent.is_null());
let _: Box<TestLeaf> = unsafe { Box::from_raw(leaf_ptr.cast::<TestLeaf>()) };
}
#[test]
fn test_set_parent_erased_leaf() {
let new_parent_node: Box<TestInternode> = TestInternode::new(0);
let new_parent: *mut u8 = Box::into_raw(new_parent_node).cast();
let leaf: Box<TestLeaf> = TestLeaf::new_boxed();
let leaf_ptr: *mut u8 = Box::into_raw(leaf).cast();
assert!(unsafe { (*leaf_ptr.cast::<TestLeaf>()).parent_unguarded().is_null() });
unsafe {
NodeCleaner::set_parent_erased::<BoxPolicy<u64>>(leaf_ptr, new_parent);
}
let actual_parent: *mut u8 = unsafe { (*leaf_ptr.cast::<TestLeaf>()).parent_unguarded() };
assert_eq!(actual_parent, new_parent);
let _: Box<TestLeaf> = unsafe { Box::from_raw(leaf_ptr.cast::<TestLeaf>()) };
let _: Box<TestInternode> = unsafe { Box::from_raw(new_parent.cast::<TestInternode>()) };
}
#[test]
fn test_set_parent_erased_internode() {
let new_parent_node: Box<TestInternode> = TestInternode::new(1);
let new_parent: *mut u8 = Box::into_raw(new_parent_node).cast();
let inode: Box<TestInternode> = TestInternode::new(0);
let inode_ptr: *mut u8 = Box::into_raw(inode).cast();
assert!(unsafe {
(*inode_ptr.cast::<TestInternode>())
.parent_unguarded()
.is_null()
});
unsafe {
NodeCleaner::set_parent_erased::<BoxPolicy<u64>>(inode_ptr, new_parent);
}
let actual_parent: *mut u8 = unsafe { (*inode_ptr.cast::<TestInternode>()).parent_unguarded() };
assert_eq!(actual_parent, new_parent);
let _: Box<TestInternode> = unsafe { Box::from_raw(inode_ptr.cast::<TestInternode>()) };
let _: Box<TestInternode> = unsafe { Box::from_raw(new_parent.cast::<TestInternode>()) };
}
#[test]
fn test_set_parent_erased_type_dispatch() {
let leaf: Box<TestLeaf> = TestLeaf::new_boxed();
let inode: Box<TestInternode> = TestInternode::new(1);
assert!(leaf.version().is_leaf());
assert!(!inode.version().is_leaf());
}
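// locked_parent_generic: called with the child lock held, it either reports
// NoParent for a root node or returns the parent pointer together with a lock
// on that parent.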
#[test]
fn test_locked_parent_null_parent() {
let leaf: Box<TestLeaf> = TestLeaf::new_root_boxed();
let leaf_ptr: *mut u8 = Box::into_raw(leaf).cast();
let leaf_ref: &TestLeaf = unsafe { &*leaf_ptr.cast::<TestLeaf>() };
let _leaf_lock: LockGuard<'_> = leaf_ref.version().lock();
let result: LockedParentResult<'_> =
unsafe { NodeCleaner::locked_parent_generic::<BoxPolicy<u64>>(leaf_ptr) };
assert!(matches!(result, LockedParentResult::NoParent));
drop(_leaf_lock);
let _: Box<TestLeaf> = unsafe { Box::from_raw(leaf_ptr.cast::<TestLeaf>()) };
}
#[test]
fn test_locked_parent_basic() {
let parent: Box<TestInternode> = TestInternode::new(0);
let parent_ptr: *mut TestInternode = Box::into_raw(parent);
let leaf: Box<TestLeaf> = TestLeaf::new_boxed();
leaf.set_parent(parent_ptr.cast());
let leaf_ptr: *mut u8 = Box::into_raw(leaf).cast();
unsafe { (*parent_ptr).set_child(0, leaf_ptr) };
let leaf_ref: &TestLeaf = unsafe { &*leaf_ptr.cast::<TestLeaf>() };
let _leaf_lock: LockGuard<'_> = leaf_ref.version().lock();
let result: LockedParentResult<'_> =
unsafe { NodeCleaner::locked_parent_generic::<BoxPolicy<u64>>(leaf_ptr) };
let (lock, returned_parent) = match result {
LockedParentResult::Locked(l, p) => (l, p),
_ => panic!("Expected Locked result"),
};
assert_eq!(returned_parent, parent_ptr.cast::<u8>());
let parent_ref: &TestInternode = unsafe { &*parent_ptr };
assert!(parent_ref.version().is_locked());
drop(lock);
drop(_leaf_lock);
let _: Box<TestLeaf> = unsafe { Box::from_raw(leaf_ptr.cast::<TestLeaf>()) };
let _: Box<TestInternode> = unsafe { Box::from_raw(parent_ptr) };
}
#[test]
fn test_locked_parent_returns_internode() {
let grandparent: Box<TestInternode> = TestInternode::new(1);
grandparent.version().mark_root();
let grandparent_ptr: *mut TestInternode = Box::into_raw(grandparent);
let parent: Box<TestInternode> = TestInternode::new(0);
parent.set_parent(grandparent_ptr.cast());
let parent_ptr: *mut TestInternode = Box::into_raw(parent);
unsafe { (*grandparent_ptr).set_child(0, parent_ptr.cast()) };
let leaf: Box<TestLeaf> = TestLeaf::new_boxed();
leaf.set_parent(parent_ptr.cast());
let leaf_ptr: *mut u8 = Box::into_raw(leaf).cast();
unsafe { (*parent_ptr).set_child(0, leaf_ptr) };
let leaf_ref: &TestLeaf = unsafe { &*leaf_ptr.cast::<TestLeaf>() };
let _leaf_lock: LockGuard<'_> = leaf_ref.version().lock();
let result: LockedParentResult<'_> =
unsafe { NodeCleaner::locked_parent_generic::<BoxPolicy<u64>>(leaf_ptr) };
let (lock, returned_parent) = match result {
LockedParentResult::Locked(l, p) => (l, p),
_ => panic!("Expected Locked result"),
};
assert_eq!(returned_parent, parent_ptr.cast::<u8>());
let parent_version: &NodeVersion = unsafe { &*(returned_parent.cast::<NodeVersion>()) };
assert!(!parent_version.is_leaf());
drop(lock);
drop(_leaf_lock);
let _: Box<TestLeaf> = unsafe { Box::from_raw(leaf_ptr.cast::<TestLeaf>()) };
let _: Box<TestInternode> = unsafe { Box::from_raw(parent_ptr) };
let _: Box<TestInternode> = unsafe { Box::from_raw(grandparent_ptr) };
}
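// shift_internode_down_generic: after the child at the given slot has been
// nulled out, the later ikeys/children shift left by one and nkeys drops by one.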
#[test]
fn test_shift_internode_down_middle() {
let inode: Box<TestInternode> = TestInternode::new(0);
inode.set_ikey(0, 10);
inode.set_ikey(1, 20);
inode.set_ikey(2, 30);
inode.set_nkeys(3);
let leaves: Vec<Box<TestLeaf>> = (0..4).map(|_| TestLeaf::new_boxed()).collect();
let ptrs: Vec<*mut u8> = leaves
.into_iter()
.map(|l| Box::into_raw(l) as *mut u8)
.collect();
let (c0, c1, c2, c3) = (ptrs[0], ptrs[1], ptrs[2], ptrs[3]);
inode.set_child(0, c0);
inode.set_child(1, c1);
inode.set_child(2, c2);
inode.set_child(3, c3);
inode.set_child(2, StdPtr::null_mut());
NodeCleaner::shift_internode_down_generic::<TestInternode>(&inode, 2);
assert_eq!(inode.ikey(0), 10);
assert_eq!(inode.ikey(1), 30);
assert_eq!(unsafe { inode.child_unguarded(0) }, c0);
assert_eq!(unsafe { inode.child_unguarded(1) }, c1);
assert_eq!(unsafe { inode.child_unguarded(2) }, c3);
assert_eq!(inode.nkeys(), 2);
for ptr in ptrs {
let _: Box<TestLeaf> = unsafe { Box::from_raw(ptr.cast::<TestLeaf>()) };
}
}
#[test]
fn test_shift_internode_down_last() {
let inode: Box<TestInternode> = TestInternode::new(0);
inode.set_ikey(0, 10);
inode.set_ikey(1, 20);
inode.set_ikey(2, 30);
inode.set_nkeys(3);
let leaves: Vec<Box<TestLeaf>> = (0..4).map(|_| TestLeaf::new_boxed()).collect();
let ptrs: Vec<*mut u8> = leaves
.into_iter()
.map(|l| Box::into_raw(l) as *mut u8)
.collect();
let (c0, c1, c2, c3) = (ptrs[0], ptrs[1], ptrs[2], ptrs[3]);
inode.set_child(0, c0);
inode.set_child(1, c1);
inode.set_child(2, c2);
inode.set_child(3, c3);
inode.set_child(3, StdPtr::null_mut());
NodeCleaner::shift_internode_down_generic::<TestInternode>(&inode, 3);
assert_eq!(inode.ikey(0), 10);
assert_eq!(inode.ikey(1), 20);
assert_eq!(unsafe { inode.child_unguarded(0) }, c0);
assert_eq!(unsafe { inode.child_unguarded(1) }, c1);
assert_eq!(unsafe { inode.child_unguarded(2) }, c2);
assert_eq!(inode.nkeys(), 2);
for ptr in ptrs {
let _: Box<TestLeaf> = unsafe { Box::from_raw(ptr.cast::<TestLeaf>()) };
}
}
#[test]
fn test_shift_internode_down_second() {
let inode: Box<TestInternode> = TestInternode::new(0);
inode.set_ikey(0, 10);
inode.set_ikey(1, 20);
inode.set_nkeys(2);
let leaves: Vec<Box<TestLeaf>> = (0..3).map(|_| TestLeaf::new_boxed()).collect();
let ptrs: Vec<*mut u8> = leaves
.into_iter()
.map(|l| Box::into_raw(l) as *mut u8)
.collect();
let (c0, c1, c2) = (ptrs[0], ptrs[1], ptrs[2]);
inode.set_child(0, c0);
inode.set_child(1, c1);
inode.set_child(2, c2);
inode.set_child(1, StdPtr::null_mut());
NodeCleaner::shift_internode_down_generic::<TestInternode>(&inode, 1);
assert_eq!(inode.ikey(0), 20);
assert_eq!(unsafe { inode.child_unguarded(0) }, c0);
assert_eq!(unsafe { inode.child_unguarded(1) }, c2);
assert_eq!(inode.nkeys(), 1);
for ptr in ptrs {
let _: Box<TestLeaf> = unsafe { Box::from_raw(ptr.cast::<TestLeaf>()) };
}
}
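// unlink_from_chain on a locked leaf splices its neighbours together in the
// doubly linked leaf list (or just truncates the list when it was the tail).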
#[test]
fn test_unlink_from_chain_middle() {
let leaf_a: Box<TestLeaf> = TestLeaf::new_boxed();
let leaf_b: Box<TestLeaf> = TestLeaf::new_boxed();
let leaf_c: Box<TestLeaf> = TestLeaf::new_boxed();
let a_ptr: *mut TestLeaf = Box::into_raw(leaf_a);
let b_ptr: *mut TestLeaf = Box::into_raw(leaf_b);
let c_ptr: *mut TestLeaf = Box::into_raw(leaf_c);
unsafe {
(*a_ptr).set_next(b_ptr);
(*b_ptr).set_prev(a_ptr);
(*b_ptr).set_next(c_ptr);
(*c_ptr).set_prev(b_ptr);
}
let b_ref: &TestLeaf = unsafe { &*b_ptr };
let _lock: LockGuard<'_> = b_ref.version().lock();
unsafe { b_ref.unlink_from_chain() };
assert_eq!(unsafe { (*a_ptr).safe_next_unguarded() }, c_ptr);
assert_eq!(unsafe { (*c_ptr).prev_unguarded() }, a_ptr);
drop(_lock);
let _: Box<TestLeaf> = unsafe { Box::from_raw(a_ptr) };
let _: Box<TestLeaf> = unsafe { Box::from_raw(b_ptr) };
let _: Box<TestLeaf> = unsafe { Box::from_raw(c_ptr) };
}
#[test]
fn test_unlink_from_chain_last() {
let leaf_a: Box<TestLeaf> = TestLeaf::new_boxed();
let leaf_b: Box<TestLeaf> = TestLeaf::new_boxed();
let a_ptr: *mut TestLeaf = Box::into_raw(leaf_a);
let b_ptr: *mut TestLeaf = Box::into_raw(leaf_b);
unsafe {
(*a_ptr).set_next(b_ptr);
(*b_ptr).set_prev(a_ptr);
}
let b_ref: &TestLeaf = unsafe { &*b_ptr };
let _lock: LockGuard<'_> = b_ref.version().lock();
unsafe { b_ref.unlink_from_chain() };
assert!(unsafe { (*a_ptr).safe_next_unguarded().is_null() });
drop(_lock);
let _: Box<TestLeaf> = unsafe { Box::from_raw(a_ptr) };
let _: Box<TestLeaf> = unsafe { Box::from_raw(b_ptr) };
}
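// Tree-level removals exercising parent child-pointer updates and the redirect
// paths hit by sequential and alternating removal.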
#[test]
fn test_remove_leaf_updates_parent_child_ptr() {
let tree: TestTree = TestTree::new();
tree.insert(&50_u64.to_be_bytes(), 50);
tree.insert(&150_u64.to_be_bytes(), 150);
let removed = tree.remove(&150_u64.to_be_bytes());
assert!(removed.is_ok());
assert_val_eq!(tree.get(&50_u64.to_be_bytes()), Some(50));
assert_eq!(tree.get(&150_u64.to_be_bytes()), None);
}
#[test]
fn test_remove_leaf_leftmost_not_removed() {
let tree: TestTree = TestTree::new();
tree.insert(&42_u64.to_be_bytes(), 42);
let removed = tree.remove(&42_u64.to_be_bytes());
assert!(removed.is_ok());
assert_eq!(tree.len(), 0);
tree.insert(&100_u64.to_be_bytes(), 100);
assert_val_eq!(tree.get(&100_u64.to_be_bytes()), Some(100));
}
#[test]
fn test_redirect_via_sequential_removal() {
let tree: TestTree = TestTree::new();
eprintln!("Inserting 50 keys...");
for i in 0_u64..50 {
tree.insert(&i.to_be_bytes(), i);
}
eprintln!("Inserted 50 keys, len = {}", tree.len());
eprintln!("Removing keys 0-24...");
for i in 0_u64..25 {
eprintln!(" Removing key {}", i);
let _ = tree.remove(&i.to_be_bytes());
eprintln!(" Removed key {}, len = {}", i, tree.len());
}
eprintln!("Verifying remaining keys...");
for i in 25_u64..50 {
eprintln!(" Getting key {}", i);
assert_val_eq!(tree.get(&i.to_be_bytes()), Some(i));
}
eprintln!("Verifying removed keys are gone...");
for i in 0_u64..25 {
eprintln!(" Checking key {} is gone", i);
assert!(tree.get(&i.to_be_bytes()).is_none());
}
eprintln!("Done!");
}
#[test]
fn test_redirect_alternating_removal() {
let tree: TestTree = TestTree::new();
for i in (0_u64..100).step_by(2) {
tree.insert(&i.to_be_bytes(), i);
}
for i in (0_u64..100).step_by(4) {
let _ = tree.remove(&i.to_be_bytes());
}
for i in (0_u64..100).step_by(2) {
if i % 4 == 0 {
assert!(tree.get(&i.to_be_bytes()).is_none());
} else {
assert_val_eq!(tree.get(&i.to_be_bytes()), Some(i));
}
}
}
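// Concurrent remove/get stress tests. These spawn real threads and are
// compiled out under Miri.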
#[test]
#[cfg(not(miri))]
fn test_concurrent_remove_and_get() {
use std::sync::Barrier;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
let tree: Arc<TestTree> = Arc::new(TestTree::new());
let done = Arc::new(AtomicBool::new(false));
let barrier = Arc::new(Barrier::new(2));
for i in 0_u64..1000 {
tree.insert(&i.to_be_bytes(), i);
}
let tree_clone = Arc::clone(&tree);
let done_clone = Arc::clone(&done);
let barrier_clone = Arc::clone(&barrier);
let reader = thread::spawn(move || {
let mut found = 0_u64;
let mut not_found = 0_u64;
barrier_clone.wait();
for i in 0_u64..100 {
let key: u64 = (i * 7) % 1000;
if tree_clone.get(&key.to_be_bytes()).is_some() {
found += 1;
} else {
not_found += 1;
}
}
while !done_clone.load(Ordering::Acquire) {
for i in 0_u64..100 {
let key: u64 = (i * 7) % 1000;
if tree_clone.get(&key.to_be_bytes()).is_some() {
found += 1;
} else {
not_found += 1;
}
}
}
(found, not_found)
});
barrier.wait();
for i in (0_u64..1000).step_by(2) {
let _ = tree.remove(&i.to_be_bytes());
}
done.store(true, Ordering::Release);
let (found, not_found) = reader.join().unwrap();
assert!(found > 0 || not_found > 0);
for i in (1_u64..1000).step_by(2) {
assert_val_eq!(tree.get(&i.to_be_bytes()), Some(i));
}
}
#[test]
#[cfg(not(miri))]
fn test_concurrent_remove_same_keys() {
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
let tree: Arc<TestTree> = Arc::new(TestTree::new());
let removed_count = Arc::new(AtomicUsize::new(0));
for i in 0_u64..100 {
tree.insert(&i.to_be_bytes(), i);
}
let mut handles = vec![];
for _ in 0..4 {
let tree_clone = Arc::clone(&tree);
let count_clone = Arc::clone(&removed_count);
handles.push(thread::spawn(move || {
let mut local_removed = 0;
for i in 0_u64..100 {
if tree_clone.remove(&i.to_be_bytes()).unwrap().is_some() {
local_removed += 1;
}
}
count_clone.fetch_add(local_removed, Ordering::Relaxed);
}));
}
for h in handles {
let _ = h.join();
}
assert_eq!(removed_count.load(Ordering::Relaxed), 100);
assert_eq!(tree.len(), 0);
}
#[test]
#[cfg(not(miri))]
fn test_stress_remove_all_concurrent() {
use std::thread;
let tree: Arc<TestTree> = Arc::new(TestTree::new());
let key_count: u64 = 10_000;
for i in 0..key_count {
tree.insert(&i.to_be_bytes(), i);
}
let mut handles = vec![];
let threads: u64 = 8;
let keys_per_thread: u64 = key_count / threads;
for t in 0..threads {
let tree_clone = Arc::clone(&tree);
let start: u64 = t * keys_per_thread;
let end: u64 = start + keys_per_thread;
handles.push(thread::spawn(move || {
for i in start..end {
let _ = tree_clone.remove(&i.to_be_bytes());
}
}));
}
for h in handles {
let _ = h.join();
}
assert_eq!(tree.len(), 0);
for i in 0..key_count {
assert!(tree.get(&i.to_be_bytes()).is_none());
}
}
#[test]
#[cfg(not(miri))]
fn test_no_infinite_loop_deleted_node() {
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::Duration;
let tree: Arc<TestTree> = Arc::new(TestTree::new());
let reader_done = Arc::new(AtomicBool::new(false));
for i in 0_u64..100 {
tree.insert(&i.to_be_bytes(), i);
}
let tree_clone = Arc::clone(&tree);
let done_clone = Arc::clone(&reader_done);
let reader = thread::spawn(move || {
for _ in 0..1000 {
for i in 0_u64..100 {
let _ = tree_clone.get(&i.to_be_bytes());
}
}
done_clone.store(true, Ordering::Release);
});
thread::sleep(Duration::from_millis(10));
for i in (0_u64..100).step_by(2) {
let _ = tree.remove(&i.to_be_bytes());
}
let result = reader.join();
assert!(result.is_ok());
assert!(reader_done.load(Ordering::Acquire));
}
#[test]
fn test_reader_retry_succeeds_after_coalesce() {
let tree: TestTree = TestTree::new();
tree.insert(&42_u64.to_be_bytes(), 42);
tree.insert(&100_u64.to_be_bytes(), 100);
let _ = tree.remove(&42_u64.to_be_bytes());
assert!(tree.get(&42_u64.to_be_bytes()).is_none());
assert_val_eq!(tree.get(&100_u64.to_be_bytes()), Some(100));
}
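// Miri-sized smoke tests: small removals plus the erased parent helpers.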
#[test]
fn test_miri_remove_single_key() {
let tree: TestTree = TestTree::new();
tree.insert(&1_u64.to_be_bytes(), 1);
assert_val_eq!(tree.remove(&1_u64.to_be_bytes()).unwrap(), Some(1));
assert!(tree.get(&1_u64.to_be_bytes()).is_none());
}
#[test]
fn test_miri_remove_multiple_keys() {
let tree: TestTree = TestTree::new();
for i in 0_u64..10 {
tree.insert(&i.to_be_bytes(), i);
}
for i in 0_u64..10 {
assert_val_eq!(tree.remove(&i.to_be_bytes()).unwrap(), Some(i));
}
assert_eq!(tree.len(), 0);
}
#[test]
fn test_miri_parent_erased_helpers() {
let parent_node: Box<TestInternode> = TestInternode::new(0);
let parent_ptr: *mut u8 = Box::into_raw(parent_node).cast();
let leaf: Box<TestLeaf> = TestLeaf::new_boxed();
let leaf_ptr: *mut u8 = Box::into_raw(leaf).cast();
unsafe {
NodeCleaner::set_parent_erased::<BoxPolicy<u64>>(leaf_ptr, parent_ptr);
}
let got: *mut u8 = unsafe { NodeCleaner::get_parent_erased::<BoxPolicy<u64>>(leaf_ptr) };
assert_eq!(got, parent_ptr);
let _: Box<TestLeaf> = unsafe { Box::from_raw(leaf_ptr.cast::<TestLeaf>()) };
let _: Box<TestInternode> = unsafe { Box::from_raw(parent_ptr.cast::<TestInternode>()) };
}
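// Coalesce processing: draining the pending queue (fully or in batches) must
// leave the tree usable for later inserts, gets, and range scans.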
#[test]
fn test_coalesce_safety_no_infinite_loop() {
let tree: TestTree = TestTree::new();
for i in 0_u64..50 {
tree.insert(&i.to_be_bytes(), i);
}
for i in 0_u64..50 {
let _ = tree.remove(&i.to_be_bytes());
}
let guard = tree.guard();
let processed = tree.process_coalesce(&guard);
assert!(
processed > 0,
"Expected some coalesce entries to be processed"
);
assert_eq!(tree.len(), 0);
for i in 100_u64..110 {
tree.insert(&i.to_be_bytes(), i);
}
for i in 100_u64..110 {
assert_val_eq!(tree.get(&i.to_be_bytes()), Some(i));
}
}
#[test]
#[cfg(not(miri))]
fn test_coalesce_concurrent_with_reads() {
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
let tree: Arc<TestTree> = Arc::new(TestTree::new());
let test_complete = Arc::new(AtomicBool::new(false));
for i in 0_u64..100 {
tree.insert(&i.to_be_bytes(), i);
}
for i in 0_u64..50 {
let _ = tree.remove(&i.to_be_bytes());
}
let tree_reader = Arc::clone(&tree);
let complete_reader = Arc::clone(&test_complete);
let reader = thread::spawn(move || {
while !complete_reader.load(Ordering::Acquire) {
for i in 50_u64..100 {
let _ = tree_reader.get(&i.to_be_bytes());
}
}
});
let tree_coalesce = Arc::clone(&tree);
let coalesce_result = thread::spawn(move || {
let guard = tree_coalesce.guard();
tree_coalesce.process_coalesce(&guard)
});
let result = coalesce_result.join();
test_complete.store(true, Ordering::Release);
let _ = reader.join();
assert!(result.is_ok(), "Coalesce should not panic");
for i in 50_u64..100 {
assert_val_eq!(tree.get(&i.to_be_bytes()), Some(i));
}
}
#[test]
fn test_insert_through_deleted_nodes() {
let tree: TestTree = TestTree::new();
for i in 0_u64..30 {
tree.insert(&i.to_be_bytes(), i);
}
for i in 10_u64..20 {
let _ = tree.remove(&i.to_be_bytes());
}
let guard = tree.guard();
let _ = tree.process_coalesce(&guard);
for i in 10_u64..20 {
tree.insert(&i.to_be_bytes(), i * 10);
}
for i in 0_u64..10 {
assert_val_eq!(tree.get(&i.to_be_bytes()), Some(i));
}
for i in 10_u64..20 {
assert_val_eq!(tree.get(&i.to_be_bytes()), Some(i * 10));
}
for i in 20_u64..30 {
assert_val_eq!(tree.get(&i.to_be_bytes()), Some(i));
}
}
#[test]
fn test_coalesce_multiple_cycles() {
let tree: TestTree = TestTree::new();
let guard = tree.guard();
for cycle in 0..5 {
let base: u64 = cycle * 100;
for i in 0_u64..50 {
tree.insert(&(base + i).to_be_bytes(), base + i);
}
for i in 0_u64..50 {
let _ = tree.remove(&(base + i).to_be_bytes());
}
let processed = tree.process_coalesce(&guard);
assert!(processed > 0, "Cycle {cycle}: should process some entries");
assert_eq!(tree.len(), 0, "Cycle {cycle}: tree should be empty");
assert_eq!(
tree.pending_coalesce(),
0,
"Cycle {cycle}: pending coalesce should be 0"
);
}
}
#[test]
fn test_coalesce_preserves_leftmost_leaf() {
let tree: TestTree = TestTree::new();
let guard = tree.guard();
for i in 0_u64..10 {
tree.insert(&i.to_be_bytes(), i);
}
for i in 0_u64..10 {
let _ = tree.remove(&i.to_be_bytes());
}
let _ = tree.process_coalesce(&guard);
for i in 0_u64..10 {
tree.insert(&i.to_be_bytes(), i * 2);
}
for i in 0_u64..10 {
assert_val_eq!(tree.get(&i.to_be_bytes()), Some(i * 2));
}
}
#[test]
fn test_coalesce_interleaved_operations() {
let tree: TestTree = TestTree::new();
let guard = tree.guard();
for i in 0_u64..100 {
tree.insert(&i.to_be_bytes(), i);
}
for i in 0_u64..25 {
let _ = tree.remove(&i.to_be_bytes());
}
let _ = tree.process_coalesce(&guard);
for i in 0_u64..25 {
tree.insert(&i.to_be_bytes(), i + 1000);
}
for i in 50_u64..75 {
let _ = tree.remove(&i.to_be_bytes());
}
let _ = tree.process_coalesce(&guard);
for i in 50_u64..75 {
tree.insert(&i.to_be_bytes(), i + 2000);
}
for i in 0_u64..25 {
assert_val_eq!(
tree.get(&i.to_be_bytes()),
Some(i + 1000),
"Key {i} should have value {}",
i + 1000
);
}
for i in 25_u64..50 {
assert_val_eq!(
tree.get(&i.to_be_bytes()),
Some(i),
"Key {i} should have original value"
);
}
for i in 50_u64..75 {
assert_val_eq!(
tree.get(&i.to_be_bytes()),
Some(i + 2000),
"Key {i} should have value {}",
i + 2000
);
}
for i in 75_u64..100 {
assert_val_eq!(
tree.get(&i.to_be_bytes()),
Some(i),
"Key {i} should have original value"
);
}
}
#[test]
#[expect(clippy::panic)]
fn test_coalesce_batch_processing() {
let tree: TestTree = TestTree::new();
let guard = tree.guard();
for i in 0_u64..200 {
tree.insert(&i.to_be_bytes(), i);
}
for i in 0_u64..200 {
let _ = tree.remove(&i.to_be_bytes());
}
let initial_pending = tree.pending_coalesce();
assert!(initial_pending > 0, "Should have pending coalesce entries");
let mut total_processed: usize = 0;
let batch_limit: usize = 5;
while tree.pending_coalesce() > 0 {
let processed = tree.process_coalesce_batch(&guard, batch_limit);
total_processed += processed;
assert!(
processed <= batch_limit,
"Batch processed {processed}, expected <= {batch_limit}"
);
if total_processed > initial_pending * 3 {
panic!("Too many iterations, possible infinite loop");
}
}
assert_eq!(
tree.pending_coalesce(),
0,
"All entries should be processed"
);
}
#[test]
#[cfg(not(miri))]
fn test_coalesce_concurrent_with_writers() {
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::Duration;
let tree: Arc<TestTree> = Arc::new(TestTree::new());
let stop_flag = Arc::new(AtomicBool::new(false));
for i in 0_u64..100 {
tree.insert(&i.to_be_bytes(), i);
}
let tree_writer = Arc::clone(&tree);
let stop_writer = Arc::clone(&stop_flag);
let writer = thread::spawn(move || {
let mut counter: u64 = 1000;
while !stop_writer.load(Ordering::Acquire) {
let key = counter;
let _ = tree_writer.insert(&key.to_be_bytes(), key);
let remove_key = (counter % 200) + 100;
tree_writer.remove(&remove_key.to_be_bytes()).ok();
counter += 1;
if counter > 10000 {
counter = 1000;
}
}
});
let tree_coalesce = Arc::clone(&tree);
let stop_coalesce = Arc::clone(&stop_flag);
let coalescer = thread::spawn(move || {
let mut cycles: usize = 0;
while !stop_coalesce.load(Ordering::Acquire) {
let guard = tree_coalesce.guard();
let _ = tree_coalesce.process_coalesce(&guard);
cycles += 1;
thread::yield_now();
}
cycles
});
thread::sleep(Duration::from_millis(100));
stop_flag.store(true, Ordering::Release);
let writer_result = writer.join();
let coalesce_result = coalescer.join();
assert!(writer_result.is_ok(), "Writer should not panic");
assert!(coalesce_result.is_ok(), "Coalescer should not panic");
let cycles = coalesce_result.unwrap();
assert!(cycles > 0, "Should have run some coalesce cycles");
}
#[test]
fn test_coalesce_empty_tree() {
let tree: TestTree = TestTree::new();
let guard = tree.guard();
assert_eq!(tree.pending_coalesce(), 0);
let processed = tree.process_coalesce(&guard);
assert_eq!(processed, 0, "Empty tree should process 0 entries");
}
#[test]
fn test_coalesce_with_range_scan() {
use crate::RangeBound;
let tree: TestTree = TestTree::new();
let guard = tree.guard();
for i in (0_u64..100).step_by(2) {
tree.insert(&i.to_be_bytes(), i);
}
for i in (20_u64..40).step_by(2) {
let _ = tree.remove(&i.to_be_bytes());
}
let _ = tree.process_coalesce(&guard);
let mut found: Vec<u64> = Vec::new();
tree.scan(
RangeBound::Unbounded,
RangeBound::Unbounded,
|k: &[u8], v: ValuePtr<u64>| {
let key = u64::from_be_bytes(k.try_into().unwrap());
found.push(key);
assert_eq!(*v, key, "Value should match key");
true
},
&guard,
);
let expected: Vec<u64> = (0_u64..100)
.step_by(2)
.filter(|&i| !(20..40).contains(&i))
.collect();
assert_eq!(found, expected, "Range scan should return correct keys");
}
#[test]
fn test_coalesce_stress_rapid_cycles() {
let tree: TestTree = TestTree::new();
let guard = tree.guard();
for cycle in 0_u64..20 {
for i in 0_u64..20 {
tree.insert(&(cycle * 100 + i).to_be_bytes(), i);
}
for i in 0_u64..20 {
let _ = tree.remove(&(cycle * 100 + i).to_be_bytes());
}
tree.process_coalesce(&guard);
}
assert_eq!(tree.len(), 0);
assert_eq!(tree.pending_coalesce(), 0);
for i in 0_u64..50 {
tree.insert(&i.to_be_bytes(), i);
}
for i in 0_u64..50 {
assert_val_eq!(tree.get(&i.to_be_bytes()), Some(i));
}
}
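// Layer GC: these keys share an 8-byte prefix, which in Masstree-style layering
// pushes them into a sublayer. The tests empty such sublayers, run coalesce,
// and check that siblings survive and the prefix can be reused by new inserts.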
#[test]
fn test_gc_layer_basic_sublayer_cleanup() {
let tree: TestTree = TestTree::new();
let guard = tree.guard();
let key1 = b"prefix00A";
let key2 = b"prefix00B";
tree.insert(key1, 1);
tree.insert(key2, 2);
assert_eq!(tree.len(), 2);
assert_val_eq!(tree.get(key1), Some(1));
assert_val_eq!(tree.get(key2), Some(2));
assert_val_eq!(tree.remove(key1).unwrap(), Some(1));
assert_val_eq!(tree.remove(key2).unwrap(), Some(2));
assert_eq!(tree.len(), 0);
let processed = tree.process_coalesce(&guard);
assert!(processed > 0, "Should process the empty sublayer");
tree.insert(key1, 10);
assert_val_eq!(tree.get(key1), Some(10));
}
#[test]
fn test_gc_layer_multiple_sublayers() {
let tree: TestTree = TestTree::new();
let guard = tree.guard();
let prefixes = [b"aaaaaaaa", b"bbbbbbbb", b"cccccccc"];
let suffixes = [b"1", b"2", b"3"];
for prefix in &prefixes {
for (i, suffix) in suffixes.iter().enumerate() {
let mut key = Vec::with_capacity(9);
key.extend_from_slice(*prefix);
key.extend_from_slice(*suffix);
tree.insert(&key, i as u64);
}
}
assert_eq!(tree.len(), 9);
for suffix in &suffixes {
let mut key = Vec::with_capacity(9);
key.extend_from_slice(b"aaaaaaaa");
key.extend_from_slice(*suffix);
let _ = tree.remove(&key);
}
assert_eq!(tree.len(), 6);
tree.process_coalesce(&guard);
assert_val_eq!(tree.get(b"bbbbbbbb1"), Some(0));
assert_val_eq!(tree.get(b"cccccccc2"), Some(1));
tree.insert(b"aaaaaaaaX", 99);
assert_val_eq!(tree.get(b"aaaaaaaaX"), Some(99));
}
#[test]
fn test_gc_layer_deep_chain() {
let tree: TestTree = TestTree::new();
let guard = tree.guard();
let key_l2_a = b"level000level001level002A";
let key_l2_b = b"level000level001level002B";
tree.insert(key_l2_a, 1);
tree.insert(key_l2_b, 2);
assert_eq!(tree.len(), 2);
let _ = tree.remove(key_l2_a);
assert_eq!(tree.len(), 1);
tree.process_coalesce(&guard);
assert_val_eq!(tree.get(key_l2_b), Some(2));
let _ = tree.remove(key_l2_b);
assert_eq!(tree.len(), 0);
tree.process_coalesce(&guard);
tree.insert(key_l2_a, 100);
assert_val_eq!(tree.get(key_l2_a), Some(100));
}
#[test]
fn test_gc_layer_preserves_siblings() {
let tree: TestTree = TestTree::new();
let guard = tree.guard();
let key_a1 = b"prefix_Akey1";
let key_a2 = b"prefix_Akey2";
let key_b1 = b"prefix_Bkey1";
let key_b2 = b"prefix_Bkey2";
tree.insert(key_a1, 1);
tree.insert(key_a2, 2);
tree.insert(key_b1, 3);
tree.insert(key_b2, 4);
assert_eq!(tree.len(), 4);
let _ = tree.remove(key_a1);
let _ = tree.remove(key_a2);
assert_eq!(tree.len(), 2);
tree.process_coalesce(&guard);
assert_val_eq!(tree.get(key_b1), Some(3));
assert_val_eq!(tree.get(key_b2), Some(4));
tree.insert(key_a1, 10);
assert_val_eq!(tree.get(key_a1), Some(10));
}
#[test]
#[cfg_attr(miri, ignore)]
fn test_gc_layer_concurrent_reads() {
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
let tree: Arc<TestTree> = Arc::new(TestTree::new());
let done = Arc::new(AtomicBool::new(false));
let key1 = b"sublayer0key1xxx";
let key2 = b"sublayer0key2xxx";
tree.insert(key1, 1);
tree.insert(key2, 2);
for i in 0_u64..10 {
tree.insert(&i.to_be_bytes(), i);
}
let tree_reader = Arc::clone(&tree);
let done_reader = Arc::clone(&done);
let reader = thread::spawn(move || {
while !done_reader.load(Ordering::Acquire) {
for i in 0_u64..10 {
let _ = tree_reader.get(&i.to_be_bytes());
}
let _ = tree_reader.get(b"sublayer0key1xxx");
}
});
let _ = tree.remove(key1);
let _ = tree.remove(key2);
let guard = tree.guard();
tree.process_coalesce(&guard);
done.store(true, Ordering::Release);
#[expect(clippy::expect_used, reason = "test code - panicking is appropriate")]
reader.join().expect("Reader thread panicked");
for i in 0_u64..10 {
assert_val_eq!(tree.get(&i.to_be_bytes()), Some(i));
}
}
#[test]
fn test_gc_layer_slot_changed() {
let tree: TestTree = TestTree::new();
let guard = tree.guard();
let key1 = b"changedXXkey1";
let key2 = b"changedXXkey2";
tree.insert(key1, 1);
tree.insert(key2, 2);
let _ = tree.remove(key1);
let _ = tree.remove(key2);
tree.insert(b"changedXXnewkey", 99);
tree.process_coalesce(&guard);
assert_val_eq!(tree.get(b"changedXXnewkey"), Some(99));
}
#[test]
fn test_gc_layer_stress() {
let tree: TestTree = TestTree::new();
let guard = tree.guard();
for cycle in 0_u32..50 {
let prefix = format!("cyc{cycle:05}");
let key1 = format!("{prefix}key1");
let key2 = format!("{prefix}key2");
let key3 = format!("{prefix}key3");
tree.insert(key1.as_bytes(), cycle as u64);
tree.insert(key2.as_bytes(), cycle as u64 + 1);
tree.insert(key3.as_bytes(), cycle as u64 + 2);
let _ = tree.remove(key1.as_bytes());
let _ = tree.remove(key2.as_bytes());
let _ = tree.remove(key3.as_bytes());
if cycle % 5 == 4 {
tree.process_coalesce(&guard);
}
}
tree.process_coalesce(&guard);
assert_eq!(tree.len(), 0);
tree.insert(b"finaltest!", 12345);
assert_val_eq!(tree.get(b"finaltest!"), Some(12345));
}
#[test]
#[cfg_attr(miri, ignore)]
fn test_gc_layer_requeue_under_parent_contention() {
use std::sync::Barrier;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::thread;
use std::time::Duration;
let tree: Arc<TestTree> = Arc::new(TestTree::new());
let key_a = b"parentXXchild_a!";
let key_b = b"parentXXchild_b!";
tree.insert(key_a, 1);
tree.insert(key_b, 2);
let sibling = b"siblingZ";
tree.insert(sibling, 99);
let _ = tree.remove(key_a);
let _ = tree.remove(key_b);
assert!(tree.pending_coalesce() > 0);
let barrier = Arc::new(Barrier::new(2));
let done = Arc::new(AtomicBool::new(false));
let writer_ops = Arc::new(AtomicUsize::new(0));
let tree_w = Arc::clone(&tree);
let barrier_w = Arc::clone(&barrier);
let done_w = Arc::clone(&done);
let ops_w = Arc::clone(&writer_ops);
let writer = thread::spawn(move || {
barrier_w.wait();
let mut i: u64 = 1000;
while !done_w.load(Ordering::Acquire) {
tree_w.insert(sibling, i);
i += 1;
ops_w.fetch_add(1, Ordering::Relaxed);
}
});
barrier.wait();
thread::sleep(Duration::from_millis(1));
let guard = tree.guard();
let mut total_processed = 0;
for _ in 0..100 {
total_processed += tree.process_coalesce(&guard);
if tree.pending_coalesce() == 0 {
break;
}
thread::yield_now();
}
done.store(true, Ordering::Release);
#[expect(clippy::expect_used, reason = "test code")]
writer.join().expect("writer panicked");
assert!(
total_processed > 0,
"coalesce should have processed the gc_layer entry"
);
assert!(tree.get(sibling).is_some());
tree.insert(key_a, 42);
assert_val_eq!(tree.get(key_a), Some(42));
}
#[test]
fn test_gc_layer_requeue_preserves_context_chain() {
let tree: TestTree = TestTree::new();
let guard = tree.guard();
let key1 = b"layer000layer001deep_key_A_here";
let key2 = b"layer000layer001deep_key_B_here";
tree.insert(key1, 1);
tree.insert(key2, 2);
assert_eq!(tree.len(), 2);
let _ = tree.remove(key1);
let _ = tree.remove(key2);
assert_eq!(tree.len(), 0);
let mut rounds = 0;
while tree.pending_coalesce() > 0 && rounds < 50 {
tree.process_coalesce_batch(&guard, 1);
rounds += 1;
}
assert_eq!(
tree.pending_coalesce(),
0,
"coalesce queue should be drained after sufficient rounds"
);
tree.insert(key1, 100);
assert_val_eq!(tree.get(key1), Some(100));
}
#[test]
#[cfg_attr(miri, ignore)]
fn test_gc_layer_max_requeue_drop() {
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::Duration;
let tree: Arc<TestTree> = Arc::new(TestTree::new());
let key1 = b"requeueXchild01!";
let key2 = b"requeueXchild02!";
tree.insert(key1, 1);
tree.insert(key2, 2);
let anchor = b"requeueY";
tree.insert(anchor, 999);
let _ = tree.remove(key1);
let _ = tree.remove(key2);
assert!(tree.pending_coalesce() > 0);
let done = Arc::new(AtomicBool::new(false));
let tree_w = Arc::clone(&tree);
let done_w = Arc::clone(&done);
let writer = thread::spawn(move || {
let mut i: u64 = 0;
while !done_w.load(Ordering::Acquire) {
tree_w.insert(anchor, i);
i += 1;
}
});
thread::sleep(Duration::from_millis(2));
let guard = tree.guard();
for _ in 0..200 {
tree.process_coalesce(&guard);
}
done.store(true, Ordering::Release);
#[expect(clippy::expect_used, reason = "test code")]
writer.join().expect("writer panicked");
assert_eq!(
tree.pending_coalesce(),
0,
"queue should be drained (either processed or dropped)"
);
assert!(tree.get(anchor).is_some());
}
#[test]
#[cfg_attr(miri, ignore)]
fn test_gc_layer_concurrent_insert_remove_cycle() {
use std::sync::Barrier;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
let tree: Arc<TestTree> = Arc::new(TestTree::new());
let barrier = Arc::new(Barrier::new(5));
let done = Arc::new(AtomicBool::new(false));
let handles: Vec<_> = (0..4)
.map(|tid: u32| {
let tree = Arc::clone(&tree);
let barrier = Arc::clone(&barrier);
let done = Arc::clone(&done);
thread::spawn(move || {
let prefix = format!("pfx{tid:05}");
let suffix_a = "keyA!!!!".to_string();
let suffix_b = "keyB!!!!".to_string();
let key_a: Vec<u8> = [prefix.as_bytes(), suffix_a.as_bytes()].concat();
let key_b: Vec<u8> = [prefix.as_bytes(), suffix_b.as_bytes()].concat();
barrier.wait();
let mut cycle: u64 = 0;
while !done.load(Ordering::Acquire) {
tree.insert(&key_a, cycle);
tree.insert(&key_b, cycle + 1);
let _ = tree.remove(&key_a);
let _ = tree.remove(&key_b);
let guard = tree.guard();
tree.process_coalesce(&guard);
cycle += 2;
}
})
})
.collect();
barrier.wait();
thread::sleep(std::time::Duration::from_millis(100));
done.store(true, Ordering::Release);
for h in handles {
#[expect(clippy::expect_used, reason = "test code")]
h.join().expect("worker panicked");
}
let guard = tree.guard();
tree.process_coalesce(&guard);
tree.insert(b"pfx00000verify!", 1);
assert_val_eq!(tree.get(b"pfx00000verify!"), Some(1));
}
#[test]
#[cfg_attr(miri, ignore)]
fn test_gc_layer_does_not_block_concurrent_reads() {
use std::sync::Barrier;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::thread;
use std::time::Duration;
let tree: Arc<TestTree> = Arc::new(TestTree::new());
let drain_a = b"blockXXXchild_a!";
let drain_b = b"blockXXXchild_b!";
tree.insert(drain_a, 1);
tree.insert(drain_b, 2);
let probe = b"blockYYY";
tree.insert(probe, 100);
let barrier = Arc::new(Barrier::new(3));
let done = Arc::new(AtomicBool::new(false));
let reader_completed = Arc::new(AtomicU64::new(0));
let tree_r = Arc::clone(&tree);
let barrier_r = Arc::clone(&barrier);
let done_r = Arc::clone(&done);
let completed = Arc::clone(&reader_completed);
let reader = thread::spawn(move || {
barrier_r.wait();
while !done_r.load(Ordering::Acquire) {
let _ = tree_r.get(probe);
completed.fetch_add(1, Ordering::Relaxed);
}
});
let tree_c = Arc::clone(&tree);
let barrier_c = Arc::clone(&barrier);
let done_c = Arc::clone(&done);
let coalescer = thread::spawn(move || {
barrier_c.wait();
let mut cycle: u64 = 0;
while !done_c.load(Ordering::Acquire) {
let _ = tree_c.remove(drain_a);
let _ = tree_c.remove(drain_b);
let guard = tree_c.guard();
tree_c.process_coalesce(&guard);
tree_c.insert(drain_a, cycle);
tree_c.insert(drain_b, cycle + 1);
cycle += 2;
}
});
barrier.wait();
thread::sleep(Duration::from_millis(100));
done.store(true, Ordering::Release);
#[expect(clippy::expect_used, reason = "test code")]
reader.join().expect("reader panicked");
#[expect(clippy::expect_used, reason = "test code")]
coalescer.join().expect("coalescer panicked");
let reads = reader_completed.load(Ordering::Relaxed);
assert!(
reads > 100,
"reader only completed {reads} reads in 100ms, \
gc_layer may be holding locks too long"
);
assert_val_eq!(tree.get(probe), Some(100));
}
#[test]
#[cfg(not(miri))]
fn test_concurrent_remove_and_get_long_keys() {
use std::sync::Barrier;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
let tree: Arc<TestTree> = Arc::new(TestTree::new());
let barrier = Arc::new(Barrier::new(3));
let done = Arc::new(AtomicBool::new(false));
let make_key = |i: u64| format!("long_key_prefix_{i:06}").into_bytes();
for i in 0_u64..500 {
tree.insert(&make_key(i), i);
}
let tree_r = Arc::clone(&tree);
let done_r = Arc::clone(&done);
let barrier_r = Arc::clone(&barrier);
let reader = thread::spawn(move || {
barrier_r.wait();
let mut reads = 0_u64;
for i in 0_u64..500 {
let _ = tree_r.get(&make_key(i));
reads += 1;
}
while !done_r.load(Ordering::Acquire) {
for i in 0_u64..500 {
let _ = tree_r.get(&make_key(i));
reads += 1;
}
}
reads
});
let tree_w = Arc::clone(&tree);
let barrier_w = Arc::clone(&barrier);
let writer = thread::spawn(move || {
barrier_w.wait();
for i in (0_u64..500).step_by(2) {
let _ = tree_w.remove(&make_key(i));
}
});
barrier.wait();
writer.join().unwrap();
done.store(true, Ordering::Release);
let reads = reader.join().unwrap();
assert!(reads > 0, "reader must complete some reads");
for i in (1_u64..500).step_by(2) {
assert_val_eq!(
tree.get(&make_key(i)),
Some(i),
"odd key {i} missing after concurrent remove"
);
}
for i in (0_u64..500).step_by(2) {
assert!(
tree.get(&make_key(i)).is_none(),
"even key {i} still present after remove"
);
}
}
#[test]
#[cfg(not(miri))]
fn test_concurrent_multi_reader_multi_writer_remove() {
use std::sync::Barrier;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
let tree: Arc<TestTree> = Arc::new(TestTree::new());
let n_readers = 4_usize;
let n_writers = 4_usize;
let key_count = 2000_u64;
let barrier = Arc::new(Barrier::new(n_readers + n_writers));
let done = Arc::new(AtomicBool::new(false));
for i in 0..key_count {
tree.insert(&i.to_be_bytes(), i);
}
let mut handles = vec![];
for _ in 0..n_readers {
let tree_c = Arc::clone(&tree);
let done_c = Arc::clone(&done);
let barrier_c = Arc::clone(&barrier);
handles.push(thread::spawn(move || {
barrier_c.wait();
let mut reads = 0_u64;
for i in 0..key_count {
let _ = tree_c.get(&i.to_be_bytes());
reads += 1;
}
while !done_c.load(Ordering::Acquire) {
for i in 0..key_count {
let _ = tree_c.get(&i.to_be_bytes());
reads += 1;
}
}
reads
}));
}
let keys_per_writer = key_count / n_writers as u64;
for t in 0..n_writers {
let tree_c = Arc::clone(&tree);
let barrier_c = Arc::clone(&barrier);
let start = t as u64 * keys_per_writer;
let end = start + keys_per_writer;
handles.push(thread::spawn(move || {
barrier_c.wait();
for i in start..end {
let _ = tree_c.remove(&i.to_be_bytes());
}
0_u64
}));
}
for h in handles.drain(n_readers..) {
h.join().unwrap();
}
done.store(true, Ordering::Release);
for h in handles {
let reads = h.join().unwrap();
assert!(reads > 0, "reader must complete some reads");
}
assert_eq!(tree.len(), 0);
}
#[test]
fn test_remove_then_get_immediate() {
let tree: TestTree = TestTree::new();
for i in 0_u64..200 {
tree.insert(&i.to_be_bytes(), i);
}
for i in 0_u64..200 {
assert_val_eq!(tree.get(&i.to_be_bytes()), Some(i));
let removed = tree.remove(&i.to_be_bytes()).unwrap();
assert_val_eq!(removed, Some(i));
assert!(tree.get(&i.to_be_bytes()).is_none());
}
assert_eq!(tree.len(), 0);
}
#[test]
fn test_remove_reinsert_get_cycle_many() {
let tree: TestTree = TestTree::new();
for i in 0_u64..100 {
tree.insert(&i.to_be_bytes(), i);
}
for i in 0_u64..100 {
let _ = tree.remove(&i.to_be_bytes());
}
assert_eq!(tree.len(), 0);
for i in 0_u64..100 {
tree.insert(&i.to_be_bytes(), i + 1000);
}
for i in 0_u64..100 {
assert_val_eq!(tree.get(&i.to_be_bytes()), Some(i + 1000));
}
}
#[test]
#[cfg(not(miri))]
fn test_concurrent_remove_reinsert_get() {
use std::sync::Barrier;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
let tree: Arc<TestTree> = Arc::new(TestTree::new());
let barrier = Arc::new(Barrier::new(3));
let done = Arc::new(AtomicBool::new(false));
let key_count = 200_u64;
for i in 0..key_count {
tree.insert(&i.to_be_bytes(), i);
}
let tree_rem = Arc::clone(&tree);
let barrier_rem = Arc::clone(&barrier);
let done_rem = Arc::clone(&done);
let remover = thread::spawn(move || {
barrier_rem.wait();
while !done_rem.load(Ordering::Relaxed) {
for i in (0..key_count).step_by(2) {
let _ = tree_rem.remove(&i.to_be_bytes());
}
}
});
let tree_ins = Arc::clone(&tree);
let barrier_ins = Arc::clone(&barrier);
let done_ins = Arc::clone(&done);
let inserter = thread::spawn(move || {
barrier_ins.wait();
while !done_ins.load(Ordering::Relaxed) {
for i in (0..key_count).step_by(2) {
tree_ins.insert(&i.to_be_bytes(), i + 5000);
}
}
});
barrier.wait();
thread::sleep(std::time::Duration::from_millis(50));
done.store(true, Ordering::Relaxed);
remover.join().unwrap();
inserter.join().unwrap();
for i in (1..key_count).step_by(2) {
assert_val_eq!(
tree.get(&i.to_be_bytes()),
Some(i),
"odd key {i} corrupted by concurrent remove+reinsert"
);
}
}
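// Boundary cases around a single leaf (LeafNode15 suggests a 15-entry leaf),
// removals across a split, and keys that differ only in their suffix.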
#[test]
fn test_remove_single_leaf_boundary() {
let tree: TestTree = TestTree::new();
for i in 0_u64..15 {
tree.insert(&i.to_be_bytes(), i);
}
for i in 0_u64..15 {
assert_val_eq!(tree.get(&i.to_be_bytes()), Some(i));
let _ = tree.remove(&i.to_be_bytes());
assert!(tree.get(&i.to_be_bytes()).is_none());
}
assert_eq!(tree.len(), 0);
}
#[test]
fn test_remove_across_split_boundary() {
let tree: TestTree = TestTree::new();
for i in 0_u64..30 {
tree.insert(&i.to_be_bytes(), i);
}
assert_eq!(tree.len(), 30);
for i in 0_u64..15 {
let removed = tree.remove(&i.to_be_bytes()).unwrap();
assert_val_eq!(removed, Some(i));
}
for i in 15_u64..30 {
assert_val_eq!(tree.get(&i.to_be_bytes()), Some(i));
}
for i in 15_u64..30 {
let removed = tree.remove(&i.to_be_bytes()).unwrap();
assert_val_eq!(removed, Some(i));
}
assert_eq!(tree.len(), 0);
}
#[test]
fn test_remove_suffix_keys() {
let tree: TestTree = TestTree::new();
let keys: Vec<Vec<u8>> = (0_u64..20)
.map(|i| {
let mut k = b"sameprefix".to_vec();
k.extend_from_slice(&i.to_be_bytes());
k
})
.collect();
for (i, key) in keys.iter().enumerate() {
tree.insert(key, i as u64);
}
for i in (0..20).step_by(2) {
let removed = tree.remove(&keys[i]).unwrap();
assert_val_eq!(removed, Some(i as u64));
}
for i in (1..20).step_by(2) {
assert_val_eq!(tree.get(&keys[i]), Some(i as u64));
}
for i in (0..20).step_by(2) {
assert!(tree.get(&keys[i]).is_none());
}
}
#[test]
#[cfg(not(miri))]
fn test_concurrent_remove_and_get_suffix_keys() {
use std::sync::Barrier;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
let tree: Arc<TestTree> = Arc::new(TestTree::new());
let barrier = Arc::new(Barrier::new(2));
let done = Arc::new(AtomicBool::new(false));
let make_key = |i: u64| {
let mut k = b"shared__".to_vec(); k.extend_from_slice(&i.to_be_bytes());
k
};
for i in 0_u64..300 {
tree.insert(&make_key(i), i);
}
let tree_r = Arc::clone(&tree);
let done_r = Arc::clone(&done);
let barrier_r = Arc::clone(&barrier);
let reader = thread::spawn(move || {
barrier_r.wait();
let mut reads = 0_u64;
for i in 0_u64..300 {
let _ = tree_r.get(&make_key(i));
reads += 1;
}
while !done_r.load(Ordering::Acquire) {
for i in 0_u64..300 {
let _ = tree_r.get(&make_key(i));
reads += 1;
}
}
reads
});
barrier.wait();
for i in (0_u64..300).step_by(3) {
let _ = tree.remove(&make_key(i));
}
done.store(true, Ordering::Release);
let reads = reader.join().unwrap();
assert!(reads > 0);
for i in 0_u64..300 {
if i % 3 == 0 {
assert!(
tree.get(&make_key(i)).is_none(),
"key {i} should be removed"
);
} else {
assert_val_eq!(tree.get(&make_key(i)), Some(i), "key {i} should survive");
}
}
}
#[test]
fn test_double_remove_returns_none() {
let tree: TestTree = TestTree::new();
tree.insert(&1_u64.to_be_bytes(), 100);
let first = tree.remove(&1_u64.to_be_bytes()).unwrap();
assert_val_eq!(first, Some(100));
let second = tree.remove(&1_u64.to_be_bytes()).unwrap();
assert!(second.is_none());
assert!(tree.get(&1_u64.to_be_bytes()).is_none());
}
#[test]
fn test_remove_empty_key_edge() {
let tree: TestTree = TestTree::new();
tree.insert(b"", 999);
assert_val_eq!(tree.get(b""), Some(999));
let removed = tree.remove(b"").unwrap();
assert_val_eq!(removed, Some(999));
assert!(tree.get(b"").is_none());
let again = tree.remove(b"").unwrap();
assert!(again.is_none());
}
#[test]
fn test_remove_with_guard_correctness() {
let tree: TestTree = TestTree::new();
for i in 0_u64..50 {
tree.insert(&i.to_be_bytes(), i * 10);
}
let guard = tree.guard();
for i in 0_u64..50 {
let result = tree.remove_with_guard(&i.to_be_bytes(), &guard);
match result {
Ok(Some(val)) => assert_eq!(*val, i * 10),
other => panic!("expected Ok(Some({})), got {other:?}", i * 10),
}
assert!(tree.get_with_guard(&i.to_be_bytes(), &guard).is_none());
}
}
#[test]
#[cfg(not(miri))]
fn test_concurrent_get_during_remove_insert_churn() {
use std::sync::Barrier;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::thread;
let tree: Arc<TestTree> = Arc::new(TestTree::new());
let barrier = Arc::new(Barrier::new(3));
let done = Arc::new(AtomicBool::new(false));
let reader_ops = Arc::new(AtomicU64::new(0));
let key_count = 20_u64;
for i in 0..key_count {
tree.insert(&i.to_be_bytes(), i);
}
let tree_churn = Arc::clone(&tree);
let barrier_churn = Arc::clone(&barrier);
let done_churn = Arc::clone(&done);
let churner = thread::spawn(move || {
barrier_churn.wait();
let mut cycles = 0_u64;
while !done_churn.load(Ordering::Relaxed) {
for i in 0..key_count {
let _ = tree_churn.remove(&i.to_be_bytes());
tree_churn.insert(&i.to_be_bytes(), i + cycles * 1000);
}
cycles += 1;
}
cycles
});
let tree_r = Arc::clone(&tree);
let barrier_r = Arc::clone(&barrier);
let done_r = Arc::clone(&done);
let ops = Arc::clone(&reader_ops);
let reader = thread::spawn(move || {
barrier_r.wait();
while !done_r.load(Ordering::Relaxed) {
for i in 0..key_count {
let val = tree_r.get(&i.to_be_bytes());
if let Some(v) = val {
assert!(v >= i, "key {i} returned unexpected value {v}");
}
ops.fetch_add(1, Ordering::Relaxed);
}
}
});
barrier.wait();
thread::sleep(std::time::Duration::from_millis(100));
done.store(true, Ordering::Relaxed);
let cycles = churner.join().unwrap();
reader.join().unwrap();
let total_reads = reader_ops.load(Ordering::Relaxed);
assert!(cycles > 0, "churner must complete some cycles");
assert!(total_reads > 0, "reader must complete some operations");
}
#[test]
fn test_remove_all_large_tree_then_get() {
let tree: TestTree = TestTree::new();
let n = 500_u64;
for i in 0..n {
tree.insert(&i.to_be_bytes(), i);
}
for i in (0..n).rev() {
let removed = tree.remove(&i.to_be_bytes()).unwrap();
assert_val_eq!(removed, Some(i));
}
assert_eq!(tree.len(), 0);
for i in 0..n {
assert!(tree.get(&i.to_be_bytes()).is_none());
}
}
#[test]
#[cfg(not(miri))]
fn test_concurrent_get_with_guard_during_remove() {
use std::sync::Barrier;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
let tree: Arc<TestTree> = Arc::new(TestTree::new());
let barrier = Arc::new(Barrier::new(2));
let done = Arc::new(AtomicBool::new(false));
for i in 0_u64..1000 {
tree.insert(&i.to_be_bytes(), i);
}
let tree_r = Arc::clone(&tree);
let done_r = Arc::clone(&done);
let barrier_r = Arc::clone(&barrier);
let reader = thread::spawn(move || {
let guard = tree_r.guard();
barrier_r.wait();
let mut ops = 0_u64;
for i in 0_u64..1000 {
let _ = tree_r.get_with_guard(&i.to_be_bytes(), &guard);
ops += 1;
}
while !done_r.load(Ordering::Acquire) {
for i in 0_u64..1000 {
let _ = tree_r.get_with_guard(&i.to_be_bytes(), &guard);
ops += 1;
}
}
ops
});
barrier.wait();
{
let guard = tree.guard();
for i in 0_u64..1000 {
let _ = tree.remove_with_guard(&i.to_be_bytes(), &guard);
}
}
done.store(true, Ordering::Release);
let ops = reader.join().unwrap();
assert!(ops > 0);
assert_eq!(tree.len(), 0);
}
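// Routing after cascade coalesce: surviving and re-inserted keys must stay
// reachable, and coalesce_abandoned() must remain 0.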
#[test]
fn test_routing_correct_after_cascade_coalesce() {
let tree = TestTree::new();
let key_count: u64 = 500;
for i in 0..key_count {
tree.insert(&i.to_be_bytes(), i);
}
assert_eq!(tree.len(), key_count as usize);
let guard = tree.guard();
for i in 0..key_count / 2 {
let _ = tree.remove_with_guard(&i.to_be_bytes(), &guard);
}
drop(guard);
let guard = tree.guard();
tree.process_coalesce(&guard);
drop(guard);
for i in key_count / 2..key_count {
assert_val_eq!(
tree.get(&i.to_be_bytes()),
Some(i),
"key {i} should be reachable after cascade coalesce"
);
}
for i in 0..key_count / 2 {
assert_val_eq!(
tree.get(&i.to_be_bytes()),
None,
"key {i} should be absent after removal"
);
}
for i in 0..key_count / 2 {
tree.insert(&i.to_be_bytes(), i + 1000);
}
for i in 0..key_count / 2 {
assert_val_eq!(
tree.get(&i.to_be_bytes()),
Some(i + 1000),
"re-inserted key {i} should be reachable"
);
}
for i in key_count / 2..key_count {
assert_val_eq!(
tree.get(&i.to_be_bytes()),
Some(i),
"surviving key {i} should still be reachable"
);
}
assert_eq!(tree.len(), key_count as usize);
assert_eq!(tree.coalesce_abandoned(), 0);
}
#[test]
fn test_multi_round_cascade_routing() {
let tree = TestTree::new();
let key_count: u64 = 300;
for i in 0..key_count {
tree.insert(&i.to_be_bytes(), i);
}
{
let guard = tree.guard();
for i in 0..key_count / 4 {
let _ = tree.remove_with_guard(&i.to_be_bytes(), &guard);
}
tree.process_coalesce(&guard);
}
{
let guard = tree.guard();
for i in key_count / 4..key_count / 2 {
let _ = tree.remove_with_guard(&i.to_be_bytes(), &guard);
}
tree.process_coalesce(&guard);
}
for i in key_count / 2..key_count {
assert_val_eq!(
tree.get(&i.to_be_bytes()),
Some(i),
"key {i} should survive two cascade rounds"
);
}
for i in 0..key_count / 2 {
tree.insert(&i.to_be_bytes(), i + 5000);
}
for i in 0..key_count / 2 {
assert_val_eq!(
tree.get(&i.to_be_bytes()),
Some(i + 5000),
"re-inserted key {i} after multi-round cascade"
);
}
assert_eq!(tree.len(), key_count as usize);
assert_eq!(tree.coalesce_abandoned(), 0);
}