use std::fmt::{self as StdFmt, Debug, Formatter};
use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
use crossbeam_queue::SegQueue;
use seize::LocalGuard;
use crate::alloc_trait::TreeAllocator;
use crate::key::Key;
use crate::leaf15::{LAYER_KEYLENX, LeafNode15};
use crate::link::Linker;
use crate::nodeversion::{LockGuard, NodeVersion};
use crate::policy::LeafPolicy;
use crate::tree::MassTreeGeneric;
use crate::tree::remove::{ImmediateParentResult, NodeCleaner};
/// Path to a sublayer: the sequence of layer-bound ikeys walked from the root
/// trie layer down to the parent leaf that owns the sublayer pointer.
pub type Route = Vec<u64>;
/// Maximum number of times a work item may be re-deferred before it is counted
/// as abandoned instead of requeued (see `requeue_chain` / `requeue_sublayer`).
const MAX_REQUEUE_COUNT: u8 = 10;
/// Upper bound on B-link `next` hops taken by `walk_chain_to_ikey` before the
/// walk gives up with `RouteLookupResult::HopLimit`.
const MAX_GC_BLINK_HOPS: usize = 64;
/// Work item: an empty leaf scheduled for removal from its B-link chain.
#[derive(Debug, Clone)]
struct ChainEntry {
    // Type-erased pointer to the LeafNode15 to coalesce; cast back in
    // `try_remove_chain`.
    leaf_ptr: *mut u8,
    // The leaf's ikey bound, used to re-locate the leaf if this item has to be
    // deferred (the pointer is not trusted after a defer).
    ikey_bound: u64,
    // How many times this item has been re-deferred; capped by
    // MAX_REQUEUE_COUNT.
    requeue_count: u8,
}
/// A deferred chain-coalesce item. Unlike `ChainEntry` it carries no pointer:
/// the target leaf is re-located by `find_chain_leaf` using `ikey_bound`.
#[derive(Debug, Clone)]
struct DeferredChainEntry {
    // ikey bound identifying the empty leaf to re-find.
    ikey_bound: u64,
    // Defer count; capped by MAX_REQUEUE_COUNT.
    requeue_count: u8,
}
/// Work item: an empty sublayer scheduled for unlinking from its parent leaf,
/// identified positionally by its `Route`.
#[derive(Debug, Clone)]
struct SublayerEntry {
    // Ikey path from the root layer to the parent slot holding the sublayer.
    route: Route,
    // Defer count; capped by MAX_REQUEUE_COUNT.
    requeue_count: u8,
}
/// Lock-free multi-producer queue of pending coalesce (empty-node GC) work.
///
/// Live work goes on `chains`/`sublayers`; items that hit transient contention
/// are parked on the `deferred_*` queues and retried later. Items that exhaust
/// their retry budget only bump `abandoned`.
pub struct CoalesceQueue {
    // Freshly scheduled empty leaves (with raw pointers).
    chains: SegQueue<ChainEntry>,
    // Freshly scheduled empty sublayers (by route).
    sublayers: SegQueue<SublayerEntry>,
    // Chain items being retried; pointer dropped, re-found by ikey bound.
    deferred_chains: SegQueue<DeferredChainEntry>,
    // Sublayer items being retried.
    deferred_sublayers: SegQueue<SublayerEntry>,
    // Count of items dropped after MAX_REQUEUE_COUNT retries.
    abandoned: AtomicUsize,
}
// SAFETY(review): `ChainEntry` stores a raw `*mut u8`, which suppresses the
// auto Send/Sync impls for `SegQueue<ChainEntry>`. These impls assert that the
// queue itself only moves the pointer value between threads and never
// dereferences it; dereferencing happens in `Coalesce::try_remove_chain` under
// an epoch `LocalGuard`. TODO confirm a queued pointer cannot outlive its
// leaf's reclamation epoch.
#[allow(clippy::non_send_fields_in_send_ty)]
unsafe impl Send for CoalesceQueue {}
unsafe impl Sync for CoalesceQueue {}
impl Default for CoalesceQueue {
    /// Equivalent to [`CoalesceQueue::new`]: an empty queue.
    fn default() -> Self {
        Self::new()
    }
}
impl CoalesceQueue {
    /// Creates an empty queue with no pending or abandoned work.
    #[must_use]
    pub const fn new() -> Self {
        Self {
            abandoned: AtomicUsize::new(0),
            chains: SegQueue::new(),
            sublayers: SegQueue::new(),
            deferred_chains: SegQueue::new(),
            deferred_sublayers: SegQueue::new(),
        }
    }
    /// Schedules an empty leaf (type-erased pointer plus its ikey bound) for
    /// chain coalescing with a fresh retry budget.
    #[inline(always)]
    pub fn schedule_chain(&self, leaf_ptr: *mut u8, ikey_bound: u64) {
        let entry = ChainEntry {
            leaf_ptr,
            ikey_bound,
            requeue_count: 0,
        };
        self.chains.push(entry);
    }
    /// Schedules a sublayer (identified by its route) for removal with a fresh
    /// retry budget.
    #[inline(always)]
    pub fn schedule_sublayer(&self, route: Route) {
        let entry = SublayerEntry {
            route,
            requeue_count: 0,
        };
        self.sublayers.push(entry);
    }
    /// Moves every deferred sublayer entry back onto the live sublayer queue.
    /// Deferred chain entries are intentionally left alone: the processing
    /// loop consumes them directly from `deferred_chains`.
    fn drain_deferred(&self) {
        loop {
            match self.deferred_sublayers.pop() {
                Some(entry) => self.sublayers.push(entry),
                None => break,
            }
        }
    }
    /// `true` when no live or deferred work is queued.
    #[must_use]
    #[inline]
    #[allow(dead_code)]
    pub fn is_empty(&self) -> bool {
        let live_empty = self.chains.is_empty() && self.sublayers.is_empty();
        let deferred_empty =
            self.deferred_chains.is_empty() && self.deferred_sublayers.is_empty();
        live_empty && deferred_empty
    }
    /// Total number of queued entries across all four internal queues.
    #[must_use]
    #[inline]
    pub fn len(&self) -> usize {
        let live = self.chains.len() + self.sublayers.len();
        let deferred = self.deferred_chains.len() + self.deferred_sublayers.len();
        live + deferred
    }
    /// Number of entries currently parked on the deferred queues.
    #[must_use]
    #[inline]
    pub fn deferred_len(&self) -> usize {
        self.deferred_chains.len() + self.deferred_sublayers.len()
    }
    /// Count of entries dropped after exhausting their retry budget.
    #[must_use]
    #[inline]
    pub fn abandoned(&self) -> usize {
        self.abandoned.load(AtomicOrdering::Relaxed)
    }
    /// Discards all queued work. The abandoned counter is left untouched.
    pub fn clear(&self) {
        std::iter::from_fn(|| self.chains.pop()).for_each(drop);
        std::iter::from_fn(|| self.sublayers.pop()).for_each(drop);
        std::iter::from_fn(|| self.deferred_chains.pop()).for_each(drop);
        std::iter::from_fn(|| self.deferred_sublayers.pop()).for_each(drop);
    }
}
impl Debug for CoalesceQueue {
    /// Reports queue depths and the abandoned count rather than the entries
    /// themselves (entries hold raw pointers and are not useful to print).
    fn fmt(&self, f: &mut Formatter<'_>) -> StdFmt::Result {
        let mut builder = f.debug_struct("CoalesceQueue");
        builder.field("chains", &self.chains.len());
        builder.field("sublayers", &self.sublayers.len());
        builder.field("deferred_chains", &self.deferred_chains.len());
        builder.field("deferred_sublayers", &self.deferred_sublayers.len());
        builder.field("abandoned", &self.abandoned());
        builder.finish()
    }
}
/// Outcome of a guarded lookup walk through the tree.
enum RouteLookupResult<T> {
    // Target located; payload is lookup-specific (node or layer pointer).
    Found(T),
    // Transient interference (concurrent split/delete in progress); callers
    // requeue the work item.
    Retry,
    // Target definitively absent; the work item can be dropped.
    NotFound,
    // Walk exceeded MAX_GC_BLINK_HOPS; callers treat this like `Retry`.
    HopLimit,
}
/// Result of `find_sublayer_parent`: the leaf holding the sublayer pointer,
/// plus the final ikey of the route used to find the slot within it.
struct FoundParent<P: LeafPolicy> {
    // Parent leaf containing the layer slot.
    parent_ptr: *mut LeafNode15<P>,
    // Last ikey of the route; keys the layer slot inside the parent.
    last_ikey: u64,
}
/// Unit type namespacing the coalesce (empty-node GC) passes.
pub struct Coalesce;
impl Coalesce {
    /// Drains deferred sublayer work back into the live queue, then processes
    /// items until all queues are empty. Returns the number of items consumed
    /// (requeued items count as consumed).
    pub fn process_all<P, A>(tree: &MassTreeGeneric<P, A>, guard: &LocalGuard<'_>) -> usize
    where
        P: LeafPolicy,
        A: TreeAllocator<P>,
    {
        tree.coalesce_queue().drain_deferred();
        let mut processed: usize = 0;
        while Self::try_remove_one::<P, A>(tree, guard) {
            processed += 1;
        }
        processed
    }
    /// Like [`Self::process_all`] but stops after consuming at most `limit`
    /// items. Returns the number actually consumed.
    pub fn process_batch<P, A>(
        tree: &MassTreeGeneric<P, A>,
        guard: &LocalGuard<'_>,
        limit: usize,
    ) -> usize
    where
        P: LeafPolicy,
        A: TreeAllocator<P>,
    {
        tree.coalesce_queue().drain_deferred();
        let mut processed: usize = 0;
        while processed < limit && Self::try_remove_one::<P, A>(tree, guard) {
            processed += 1;
        }
        processed
    }
    /// Pops and handles one work item. Priority order: deferred chains, then
    /// sublayers, then fresh chains. Returns `false` only when all three
    /// queues were empty.
    fn try_remove_one<P, A>(tree: &MassTreeGeneric<P, A>, guard: &LocalGuard<'_>) -> bool
    where
        P: LeafPolicy,
        A: TreeAllocator<P>,
    {
        let queue: &CoalesceQueue = tree.coalesce_queue();
        if let Some(entry) = queue.deferred_chains.pop() {
            return Self::try_remove_chain_deferred::<P, A>(
                tree,
                queue,
                entry.ikey_bound,
                entry.requeue_count,
                guard,
            );
        }
        if let Some(entry) = queue.sublayers.pop() {
            return Self::try_remove_sublayer::<P, A>(
                tree,
                queue,
                entry.route,
                entry.requeue_count,
                guard,
            );
        }
        if let Some(entry) = queue.chains.pop() {
            return Self::try_remove_chain::<P, A>(
                tree,
                queue,
                entry.leaf_ptr,
                entry.ikey_bound,
                entry.requeue_count,
                guard,
            );
        }
        false
    }
    /// Attempts to unlink an empty leaf (scheduled by pointer) from its chain.
    /// Always returns `true`: the queue entry is consumed even when the work is
    /// requeued or dropped.
    fn try_remove_chain<P, A>(
        tree: &MassTreeGeneric<P, A>,
        queue: &CoalesceQueue,
        leaf_ptr_erased: *mut u8,
        ikey_bound: u64,
        requeue_count: u8,
        guard: &LocalGuard<'_>,
    ) -> bool
    where
        P: LeafPolicy,
        A: TreeAllocator<P>,
    {
        let leaf_ptr: *mut LeafNode15<P> = leaf_ptr_erased.cast();
        // NOTE(review): dereferences the queued raw pointer. Sound only if a
        // scheduled leaf cannot be freed before its entry is processed — TODO
        // confirm retirement is gated on the queued flag / epoch guard.
        let leaf: &LeafNode15<P> = unsafe { &*leaf_ptr };
        let Some(mut lock) = leaf.version().try_lock() else {
            // Lock contention: retry later via the deferred (ikey-keyed) path.
            Self::requeue_chain(queue, ikey_bound, requeue_count);
            return true;
        };
        if leaf.size() > 0 {
            // Leaf was repopulated since scheduling; nothing to coalesce.
            drop(lock);
            return true;
        }
        if leaf.deleted_layer() {
            // Whole layer already torn down; entry is stale.
            drop(lock);
            return true;
        }
        if leaf.prev(guard).is_null() {
            // Leftmost leaf of its chain is never unlinked; drop the work.
            leaf.clear_queued();
            drop(lock);
            return true;
        }
        Self::finish_chain_coalesce::<P, A>(
            tree,
            queue,
            leaf,
            leaf_ptr,
            ikey_bound,
            requeue_count,
            &mut lock,
            guard,
        );
        true
    }
    /// Deferred variant of [`Self::try_remove_chain`]: the leaf pointer is not
    /// trusted anymore, so the leaf is re-located by `ikey_bound` first.
    #[cold]
    #[inline(never)]
    fn try_remove_chain_deferred<P, A>(
        tree: &MassTreeGeneric<P, A>,
        queue: &CoalesceQueue,
        ikey_bound: u64,
        requeue_count: u8,
        guard: &LocalGuard<'_>,
    ) -> bool
    where
        P: LeafPolicy,
        A: TreeAllocator<P>,
    {
        let leaf_ptr: *mut LeafNode15<P> =
            match Self::find_chain_leaf::<P, A>(tree, ikey_bound, guard) {
                RouteLookupResult::Found(ptr) => ptr,
                RouteLookupResult::NotFound => {
                    // Stale entry (leaf gone or no longer eligible): drop it
                    // without counting it as abandoned.
                    return true;
                }
                RouteLookupResult::Retry | RouteLookupResult::HopLimit => {
                    Self::requeue_chain(queue, ikey_bound, requeue_count);
                    return true;
                }
            };
        let leaf: &LeafNode15<P> = unsafe { &*leaf_ptr };
        let Some(mut lock) = leaf.version().try_lock() else {
            Self::requeue_chain(queue, ikey_bound, requeue_count);
            return true;
        };
        // Re-validate under the lock: the leaf may have changed since lookup.
        if leaf.size() > 0 || leaf.deleted_layer() {
            drop(lock);
            return true;
        }
        if leaf.prev(guard).is_null() {
            // Leftmost leaf of its chain is never unlinked.
            leaf.clear_queued();
            drop(lock);
            return true;
        }
        Self::finish_chain_coalesce::<P, A>(
            tree,
            queue,
            leaf,
            leaf_ptr,
            ikey_bound,
            requeue_count,
            &mut lock,
            guard,
        );
        true
    }
    /// Shared tail of chain coalescing: removes the (locked, empty, non-
    /// leftmost) leaf from its immediate parent, then unlinks and retires it.
    /// On parent-removal failure the work is requeued instead.
    #[expect(
        clippy::too_many_arguments,
        reason = "Shared coalesce tail needs full context"
    )]
    fn finish_chain_coalesce<P, A>(
        tree: &MassTreeGeneric<P, A>,
        queue: &CoalesceQueue,
        leaf: &LeafNode15<P>,
        leaf_ptr: *mut LeafNode15<P>,
        ikey_bound: u64,
        requeue_count: u8,
        lock: &mut LockGuard<'_>,
        guard: &LocalGuard<'_>,
    ) where
        P: LeafPolicy,
        A: TreeAllocator<P>,
    {
        let leaf_ikey_bound: u64 = leaf.ikey_bound();
        let result: ImmediateParentResult<'_> =
            NodeCleaner::remove_leaf_from_immediate_parent::<P, A>(
                tree.allocator(),
                guard,
                leaf_ptr,
                lock,
                leaf_ikey_bound,
            );
        match result {
            ImmediateParentResult::Success => {
                // Order: mark deleted (so chain walkers skip via next), then
                // unlink, then hand to epoch-based reclamation.
                lock.mark_deleted();
                unsafe { leaf.unlink_from_chain() };
                unsafe { tree.allocator().retire_leaf(leaf_ptr, guard) };
            }
            ImmediateParentResult::SuccessNeedsCascade {
                parent_ptr,
                parent_lock,
            } => {
                lock.mark_deleted();
                unsafe { leaf.unlink_from_chain() };
                unsafe { tree.allocator().retire_leaf(leaf_ptr, guard) };
                // Parent internode may itself have become removable; try to
                // cascade the cleanup upward.
                NodeCleaner::try_cascade_internodes::<P, A>(
                    tree.allocator(),
                    guard,
                    parent_ptr,
                    parent_lock,
                    leaf_ikey_bound,
                );
            }
            ImmediateParentResult::Failure => {
                Self::requeue_chain(queue, ikey_bound, requeue_count);
            }
        }
    }
    /// Parks a chain item on the deferred queue, or counts it as abandoned
    /// once the retry budget is exhausted.
    fn requeue_chain(queue: &CoalesceQueue, ikey_bound: u64, count: u8) {
        if count < MAX_REQUEUE_COUNT {
            queue.deferred_chains.push(DeferredChainEntry {
                ikey_bound,
                requeue_count: count + 1,
            });
        } else {
            queue.abandoned.fetch_add(1, AtomicOrdering::Relaxed);
        }
    }
    /// Re-locates the leaf for a deferred chain item by descending from the
    /// root and walking the B-link chain. Only returns `Found` if the leaf
    /// still matches the ikey bound, is empty, and is still flagged queued.
    fn find_chain_leaf<P, A>(
        tree: &MassTreeGeneric<P, A>,
        ikey_bound: u64,
        guard: &LocalGuard<'_>,
    ) -> RouteLookupResult<*mut LeafNode15<P>>
    where
        P: LeafPolicy,
        A: TreeAllocator<P>,
    {
        let root: *const u8 = tree.load_root_ptr_generic(guard);
        let key: Key<'_> = Key::from_ikey(ikey_bound);
        let start: *mut LeafNode15<P> =
            tree.reach_leaf_concurrent_generic(root, &key, false, guard);
        match Self::walk_chain_to_ikey(start, ikey_bound, guard) {
            RouteLookupResult::Found(leaf_ptr) => {
                let leaf: &LeafNode15<P> = unsafe { &*leaf_ptr };
                // Re-check eligibility: anything else means the entry is stale.
                if leaf.ikey_bound() == ikey_bound && leaf.is_empty_state() && leaf.is_queued() {
                    RouteLookupResult::Found(leaf_ptr)
                } else {
                    RouteLookupResult::NotFound
                }
            }
            RouteLookupResult::Retry => RouteLookupResult::Retry,
            RouteLookupResult::HopLimit => RouteLookupResult::HopLimit,
            RouteLookupResult::NotFound => RouteLookupResult::NotFound,
        }
    }
    /// Attempts to detach an empty sublayer from its parent leaf's layer slot.
    /// Lock order: parent first, then sublayer. Always returns `true` (entry
    /// consumed), requeueing or abandoning on contention.
    #[cold]
    #[inline(never)]
    fn try_remove_sublayer<P, A>(
        tree: &MassTreeGeneric<P, A>,
        queue: &CoalesceQueue,
        route: Route,
        requeue_count: u8,
        guard: &LocalGuard<'_>,
    ) -> bool
    where
        P: LeafPolicy,
        A: TreeAllocator<P>,
    {
        let found = match Self::find_sublayer_parent::<P, A>(tree, &route, guard) {
            RouteLookupResult::Found(f) => f,
            RouteLookupResult::Retry | RouteLookupResult::HopLimit => {
                Self::requeue_sublayer(queue, route, requeue_count);
                return true;
            }
            RouteLookupResult::NotFound => {
                // Route no longer resolves; record as abandoned.
                queue.abandoned.fetch_add(1, AtomicOrdering::Relaxed);
                return true;
            }
        };
        let parent: &LeafNode15<P> = unsafe { &*found.parent_ptr };
        let Some(mut parent_lock) = parent.version().try_lock() else {
            Self::requeue_sublayer(queue, route, requeue_count);
            return true;
        };
        let Some(parent_slot) = Self::find_layer_slot(parent, found.last_ikey) else {
            // Slot moved (e.g. concurrent split) — retry with a fresh lookup.
            drop(parent_lock);
            Self::requeue_sublayer(queue, route, requeue_count);
            return true;
        };
        let layer_ptr: *mut u8 = parent.load_layer_raw(parent_slot);
        if layer_ptr.is_null() {
            drop(parent_lock);
            return true;
        }
        let sublayer: &LeafNode15<P> = unsafe { &*layer_ptr.cast::<LeafNode15<P>>() };
        let Some(sublayer_lock) = sublayer.version().try_lock() else {
            drop(parent_lock);
            Self::requeue_sublayer(queue, route, requeue_count);
            return true;
        };
        if sublayer.deleted_layer() || sublayer.size() > 0 {
            // Sublayer repopulated or already torn down: drop the work.
            sublayer.clear_queued();
            drop(sublayer_lock);
            drop(parent_lock);
            return true;
        }
        if !sublayer.prev(guard).is_null() || !sublayer.safe_next(guard).is_null() {
            // Only a sole-leaf (single-node chain) sublayer can be detached;
            // otherwise the chain must shrink first — retry later.
            drop(sublayer_lock);
            drop(parent_lock);
            Self::requeue_sublayer(queue, route, requeue_count);
            return true;
        }
        // Clear the layer slot in the parent under a version bump.
        parent_lock.mark_insert();
        parent.clear_slot_and_permutation(parent_slot);
        let parent_now_empty: bool = parent.size() == 0;
        // If this removal emptied the parent, try to claim the queued flag so
        // exactly one scheduler propagates the cleanup upward.
        let parent_newly_queued: bool = if parent_now_empty {
            parent.mark_empty();
            parent.try_mark_queued()
        } else {
            false
        };
        let parent_ikey_bound: u64 = parent.ikey_bound();
        sublayer.mark_deleted_layer();
        drop(sublayer_lock);
        drop(parent_lock);
        unsafe {
            tree.allocator()
                .retire_leaf(layer_ptr.cast::<LeafNode15<P>>(), guard);
        }
        if parent_newly_queued {
            if route.len() > 1 {
                // Parent is itself a sublayer root: schedule it by its route
                // (this route minus the last ikey).
                let mut parent_route: Route = route;
                parent_route.pop();
                queue.schedule_sublayer(parent_route);
            } else {
                // Parent lives in the top layer: schedule chain coalescing.
                queue.schedule_chain(found.parent_ptr.cast::<u8>(), parent_ikey_bound);
            }
        }
        true
    }
    /// Parks a sublayer item on the deferred queue, or counts it as abandoned
    /// once the retry budget is exhausted.
    fn requeue_sublayer(queue: &CoalesceQueue, route: Route, requeue_count: u8) {
        if requeue_count < MAX_REQUEUE_COUNT {
            queue.deferred_sublayers.push(SublayerEntry {
                route,
                requeue_count: requeue_count + 1,
            });
        } else {
            queue.abandoned.fetch_add(1, AtomicOrdering::Relaxed);
        }
    }
    /// Scans the leaf's permutation for the slot whose ikey matches
    /// `target_ikey` and whose keylenx marks it as a layer pointer.
    fn find_layer_slot<P: LeafPolicy>(leaf: &LeafNode15<P>, target_ikey: u64) -> Option<usize> {
        let perm = leaf.permutation();
        let size: usize = perm.size();
        for ki in 0..size {
            let kp: usize = perm.get(ki);
            if leaf.ikey_relaxed(kp) == target_ikey && leaf.keylenx(kp) >= LAYER_KEYLENX {
                return Some(kp);
            }
        }
        None
    }
    /// Descends the route layer by layer: each ikey except the last selects a
    /// layer pointer to follow; the last ikey locates the parent leaf that
    /// holds the target sublayer's slot.
    fn find_sublayer_parent<P, A>(
        tree: &MassTreeGeneric<P, A>,
        route: &Route,
        guard: &LocalGuard<'_>,
    ) -> RouteLookupResult<FoundParent<P>>
    where
        P: LeafPolicy,
        A: TreeAllocator<P>,
    {
        debug_assert!(!route.is_empty());
        // NOTE(review): loads root_ptr directly, whereas find_chain_leaf uses
        // tree.load_root_ptr_generic(guard) — confirm the direct Acquire load
        // is equivalent under the guard, or unify the two.
        let mut current_root: *const u8 = tree.root_ptr.load(AtomicOrdering::Acquire);
        for &ikey in &route[..route.len() - 1] {
            let key: Key<'_> = Key::from_ikey(ikey);
            let leaf_ptr: *mut LeafNode15<P> =
                tree.reach_leaf_concurrent_generic(current_root, &key, true, guard);
            match Self::find_layer_in_chain(leaf_ptr, ikey, guard) {
                RouteLookupResult::Found(ptr) => current_root = ptr.cast_const(),
                RouteLookupResult::Retry => return RouteLookupResult::Retry,
                RouteLookupResult::NotFound => return RouteLookupResult::NotFound,
                RouteLookupResult::HopLimit => return RouteLookupResult::HopLimit,
            }
        }
        let last_ikey: u64 = route[route.len() - 1];
        let key: Key<'_> = Key::from_ikey(last_ikey);
        let leaf_ptr: *mut LeafNode15<P> =
            tree.reach_leaf_concurrent_generic(current_root, &key, true, guard);
        match Self::walk_chain_to_ikey(leaf_ptr, last_ikey, guard) {
            RouteLookupResult::Found(parent_ptr) => RouteLookupResult::Found(FoundParent {
                parent_ptr,
                last_ikey,
            }),
            RouteLookupResult::Retry => RouteLookupResult::Retry,
            RouteLookupResult::NotFound => RouteLookupResult::NotFound,
            RouteLookupResult::HopLimit => RouteLookupResult::HopLimit,
        }
    }
    /// Walks right along the B-link chain from `start` until it reaches the
    /// leaf responsible for `target_ikey`, skipping deleted nodes via their
    /// forwarding `next` pointers. Bounded by MAX_GC_BLINK_HOPS.
    fn walk_chain_to_ikey<P: LeafPolicy>(
        start: *mut LeafNode15<P>,
        target_ikey: u64,
        guard: &LocalGuard<'_>,
    ) -> RouteLookupResult<*mut LeafNode15<P>> {
        let mut leaf_ptr: *mut LeafNode15<P> = start;
        let mut hops: usize = 0;
        loop {
            if hops >= MAX_GC_BLINK_HOPS {
                return RouteLookupResult::HopLimit;
            }
            let leaf: &LeafNode15<P> = unsafe { &*leaf_ptr };
            if leaf.version().is_deleted() {
                // Deleted node: follow its forwarding pointer (unmarked).
                let next_raw: *mut LeafNode15<P> = leaf.next_raw(guard);
                let next_ptr: *mut LeafNode15<P> = Linker::unmark_ptr(next_raw);
                if next_ptr.is_null() {
                    return RouteLookupResult::NotFound;
                }
                leaf_ptr = next_ptr;
                hops += 1;
                continue;
            }
            if leaf.deleted_layer() {
                return RouteLookupResult::Retry;
            }
            let next_raw: *mut LeafNode15<P> = leaf.next_raw(guard);
            if Linker::is_marked(next_raw) {
                // Split in flight: wait for it to settle, then re-read.
                // NOTE(review): this path does not count toward the hop limit —
                // confirm wait_for_split cannot livelock here.
                leaf.wait_for_split();
                continue;
            }
            let next_ptr: *mut LeafNode15<P> = Linker::unmark_ptr(next_raw);
            if !next_ptr.is_null() {
                let next_leaf: &LeafNode15<P> = unsafe { &*next_ptr };
                // B-link rule: if the target is >= the right sibling's bound,
                // keep moving right.
                if target_ikey >= next_leaf.ikey_bound() {
                    leaf_ptr = next_ptr;
                    hops += 1;
                    continue;
                }
            }
            return RouteLookupResult::Found(leaf_ptr);
        }
    }
    /// Finds the layer pointer keyed by `target_ikey` in the chain starting at
    /// `start`, using optimistic version validation (stable read, re-check
    /// after each dependent load) instead of locking.
    fn find_layer_in_chain<P: LeafPolicy>(
        start: *mut LeafNode15<P>,
        target_ikey: u64,
        guard: &LocalGuard<'_>,
    ) -> RouteLookupResult<*mut u8> {
        let leaf_ptr: *mut LeafNode15<P> = match Self::walk_chain_to_ikey(start, target_ikey, guard)
        {
            RouteLookupResult::Found(ptr) => ptr,
            RouteLookupResult::Retry => return RouteLookupResult::Retry,
            RouteLookupResult::NotFound => return RouteLookupResult::NotFound,
            RouteLookupResult::HopLimit => return RouteLookupResult::HopLimit,
        };
        let leaf: &LeafNode15<P> = unsafe { &*leaf_ptr };
        let version: u32 = match leaf.version().try_stable() {
            Some(v) => v,
            None => return RouteLookupResult::Retry,
        };
        let slot: Option<usize> = Self::find_layer_slot(leaf, target_ikey);
        // Validate the slot scan against the snapshot version.
        if leaf.version().has_changed_or_locked(version) {
            return RouteLookupResult::Retry;
        }
        let Some(kp) = slot else {
            return RouteLookupResult::NotFound;
        };
        let layer_ptr: *mut u8 = leaf.load_layer_raw(kp);
        // Validate again after loading the layer pointer.
        if leaf.version().has_changed_or_locked(version) {
            return RouteLookupResult::Retry;
        }
        if layer_ptr.is_null() {
            return RouteLookupResult::NotFound;
        }
        #[expect(clippy::cast_ptr_alignment, reason = "NodeVersion is first field")]
        let layer_version: &NodeVersion = unsafe { &*layer_ptr.cast::<NodeVersion>() };
        if layer_version.is_deleted() {
            return RouteLookupResult::Retry;
        }
        RouteLookupResult::Found(layer_ptr)
    }
}
#[cfg(test)]
mod tests {
    //! Unit tests for the coalesce queue bookkeeping plus a few end-to-end
    //! checks of deferred-entry handling against a real tree.
    use super::*;
    use std::ptr;
    // Empty state, scheduling, len accounting, and clear().
    #[test]
    fn test_queue_basic_operations() {
        let queue: CoalesceQueue = CoalesceQueue::new();
        assert!(queue.is_empty());
        assert_eq!(queue.len(), 0);
        assert_eq!(queue.deferred_len(), 0);
        assert_eq!(queue.abandoned(), 0);
        queue.schedule_chain(ptr::null_mut(), 100);
        queue.schedule_chain(ptr::null_mut(), 200);
        assert!(!queue.is_empty());
        assert_eq!(queue.len(), 2);
        queue.clear();
        assert!(queue.is_empty());
    }
    // Debug output names the struct and its counter fields.
    #[test]
    fn test_debug_impl() {
        let queue = CoalesceQueue::new();
        queue.schedule_chain(ptr::null_mut(), 42);
        let debug_str = format!("{queue:?}");
        assert!(debug_str.contains("CoalesceQueue"));
        assert!(debug_str.contains("chains"));
        assert!(debug_str.contains("deferred_chains"));
        assert!(debug_str.contains("abandoned"));
    }
    // Sublayer and chain entries both count toward len().
    #[test]
    fn test_schedule_sublayer() {
        let queue = CoalesceQueue::new();
        queue.schedule_sublayer(vec![0x1234, 0x5678]);
        assert_eq!(queue.len(), 1);
        queue.schedule_chain(ptr::null_mut(), 300);
        assert_eq!(queue.len(), 2);
        queue.clear();
        assert!(queue.is_empty());
    }
    // Entry construction plus compile-time sanity bounds on the retry budget.
    #[test]
    fn test_requeue_count_limit() {
        let entry: ChainEntry = ChainEntry {
            leaf_ptr: ptr::null_mut(),
            ikey_bound: 42,
            requeue_count: 0,
        };
        assert_eq!(entry.requeue_count, 0);
        let sub_entry: SublayerEntry = SublayerEntry {
            route: vec![1, 2, 3],
            requeue_count: 0,
        };
        assert_eq!(sub_entry.requeue_count, 0);
        let deferred_entry: DeferredChainEntry = DeferredChainEntry {
            ikey_bound: 42,
            requeue_count: 5,
        };
        assert_eq!(deferred_entry.requeue_count, 5);
        const {
            assert!(
                MAX_REQUEUE_COUNT >= 5,
                "MAX_REQUEUE_COUNT should be at least 5"
            );
        }
        const {
            assert!(
                MAX_REQUEUE_COUNT <= 20,
                "MAX_REQUEUE_COUNT should be at most 20"
            );
        }
    }
    // drain_deferred moves only sublayer entries; deferred chains stay put.
    #[test]
    fn test_deferred_queue_operations() {
        let queue = CoalesceQueue::new();
        queue.deferred_chains.push(DeferredChainEntry {
            ikey_bound: 100,
            requeue_count: 1,
        });
        queue.deferred_sublayers.push(SublayerEntry {
            route: vec![0x42],
            requeue_count: 2,
        });
        assert_eq!(queue.deferred_len(), 2);
        assert_eq!(queue.len(), 2);
        assert!(!queue.is_empty());
        queue.drain_deferred();
        assert_eq!(queue.sublayers.len(), 1);
        assert_eq!(queue.deferred_chains.len(), 1);
        assert_eq!(queue.deferred_sublayers.len(), 0);
        queue.clear();
        assert!(queue.is_empty());
    }
    // abandoned() reflects direct counter increments.
    #[test]
    fn test_abandoned_counter() {
        let queue = CoalesceQueue::new();
        assert_eq!(queue.abandoned(), 0);
        queue.abandoned.fetch_add(1, AtomicOrdering::Relaxed);
        assert_eq!(queue.abandoned(), 1);
        queue.abandoned.fetch_add(3, AtomicOrdering::Relaxed);
        assert_eq!(queue.abandoned(), 4);
    }
    // Below the limit a chain item is deferred; at the limit it is abandoned.
    #[test]
    fn test_requeue_chain_exhaustion() {
        let queue = CoalesceQueue::new();
        Coalesce::requeue_chain(&queue, 42, 5);
        assert_eq!(queue.deferred_chains.len(), 1);
        Coalesce::requeue_chain(&queue, 42, MAX_REQUEUE_COUNT);
        assert_eq!(queue.deferred_chains.len(), 1);
        assert_eq!(queue.abandoned(), 1);
    }
    // Same exhaustion behavior for sublayer items.
    #[test]
    fn test_requeue_sublayer_exhaustion() {
        let queue = CoalesceQueue::new();
        Coalesce::requeue_sublayer(&queue, vec![1, 2], 5);
        assert_eq!(queue.deferred_sublayers.len(), 1);
        Coalesce::requeue_sublayer(&queue, vec![1, 2], MAX_REQUEUE_COUNT);
        assert_eq!(queue.deferred_sublayers.len(), 1);
        assert_eq!(queue.abandoned(), 1);
    }
    // A deferred entry whose leaf no longer exists resolves as NotFound:
    // consumed without being requeued or counted as abandoned.
    #[test]
    fn test_deferred_chain_stale_entry_dropped() {
        use crate::tree::MassTree15;
        let tree: MassTree15<u64> = MassTree15::new();
        tree.insert(b"hello", 1);
        let _ = tree.remove(b"hello");
        let guard = tree.guard();
        tree.process_coalesce(&guard);
        drop(guard);
        tree.coalesce_queue()
            .deferred_chains
            .push(DeferredChainEntry {
                ikey_bound: 0xDEAD_BEEF,
                requeue_count: 3,
            });
        assert_eq!(tree.coalesce_queue().deferred_chains.len(), 1);
        let guard = tree.guard();
        let processed = tree.process_coalesce(&guard);
        assert!(processed >= 1, "stale deferred entry should be processed");
        assert_eq!(
            tree.coalesce_queue().deferred_chains.len(),
            0,
            "stale entry should not be requeued"
        );
        assert_eq!(
            tree.coalesce_abandoned(),
            0,
            "stale entry should not increment abandoned"
        );
    }
    // Walking the requeue path up to the limit defers each time; one more
    // attempt abandons exactly once without pushing again.
    #[test]
    fn test_deferred_chain_retry_exhaustion_increments_abandoned() {
        let queue = CoalesceQueue::new();
        for count in 0..MAX_REQUEUE_COUNT {
            Coalesce::requeue_chain(&queue, 0x42, count);
        }
        assert_eq!(
            queue.deferred_chains.len(),
            MAX_REQUEUE_COUNT as usize,
            "each requeue below limit should push to deferred_chains"
        );
        assert_eq!(queue.abandoned(), 0, "no abandonment below limit");
        Coalesce::requeue_chain(&queue, 0x42, MAX_REQUEUE_COUNT);
        assert_eq!(
            queue.abandoned(),
            1,
            "exhaustion should increment abandoned exactly once"
        );
        assert_eq!(
            queue.deferred_chains.len(),
            MAX_REQUEUE_COUNT as usize,
            "exhausted entry should not be requeued"
        );
    }
    // A deferred entry pointing at a live, non-empty leaf is NotFound (the
    // leaf fails the empty/queued eligibility check) and is simply dropped.
    #[test]
    fn test_deferred_chain_mismatched_leaf_is_not_found() {
        use crate::tree::MassTree15;
        let tree: MassTree15<u64> = MassTree15::new();
        tree.insert(b"testkey1", 100);
        let ikey_bound: u64 = u64::from_be_bytes(*b"testkey1");
        tree.coalesce_queue()
            .deferred_chains
            .push(DeferredChainEntry {
                ikey_bound,
                requeue_count: 0,
            });
        let guard = tree.guard();
        let processed = tree.process_coalesce(&guard);
        assert!(processed >= 1);
        assert_eq!(
            tree.coalesce_queue().deferred_chains.len(),
            0,
            "mismatched entry should be consumed (NotFound), not requeued"
        );
        assert_eq!(tree.coalesce_abandoned(), 0);
        assert_eq!(tree.get(b"testkey1"), Some(100));
    }
}