use seize::LocalGuard;
use std::fmt::{self as StdFmt, Display, Formatter};
use std::hint as StdHint;
use std::ptr as StdPtr;
use crate::ksearch::upper_bound_internode_generic;
use crate::leaf15::LeafNode15;
use crate::tree::coalesce::Route;
use crate::tree::split::Propagation;
use crate::{
TreeInternode, TreeLeafNode,
alloc_trait::TreeAllocator,
internode::InternodeNode,
key::Key,
leaf15::{KSUF_KEYLENX, LAYER_KEYLENX},
nodeversion::{LockGuard, NodeVersion},
policy::LeafPolicy,
tree::MassTreeGeneric,
};
/// Error returned by the concurrent remove path.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RemoveError {
    /// The remove gave up after `MAX_RETRIES` descent attempts.
    RetryLimitExceeded,
}

impl Display for RemoveError {
    /// Writes the human-readable message for this error.
    fn fmt(&self, f: &mut Formatter<'_>) -> StdFmt::Result {
        let msg: &str = match self {
            Self::RetryLimitExceeded => "retry limit exceeded",
        };
        f.write_str(msg)
    }
}

impl std::error::Error for RemoveError {}
/// Outcome of an optimistic (lock-free) slot search in a leaf.
#[derive(Debug)]
enum RemoveSearchResult {
    /// No slot matching the key's current 8-byte slice exists in this leaf.
    NotFound,
    /// A matching slot was found at permutation index `ki`, physical slot `kp`.
    /// `kp` is re-resolved under the lock before use.
    Found {
        ki: usize,
        #[allow(dead_code, reason = "Captured for debugging, verified under lock")]
        kp: usize,
    },
    /// The slot holds a pointer to a deeper key layer; continue there.
    DescendLayer {
        layer_ptr: *mut u8,
    },
}
/// Outcome of locking a leaf and re-validating a previously observed slot.
enum RemoveLockResult<'t, 'g, P, A>
where
    P: LeafPolicy,
    A: TreeAllocator<P>,
{
    /// The key is treated as absent (the slot's sublayer pointer was no
    /// longer valid under the lock).
    NotFound,
    /// The leaf is locked and the slot verified; the cursor can finish the remove.
    Ready(RemoveCursor<'t, 'g, P, A>),
    /// The slot turned into a sublayer pointer under the lock; descend into it.
    DescendLayer {
        layer_ptr: *mut u8,
    },
    /// The leaf changed underneath us; redo the optimistic search.
    Retry,
    /// The leaf became a deleted layer; restart the descent from the tree root.
    RestartFromRoot,
}
/// Upper bound on full-descent attempts in `remove_concurrent_generic`
/// before giving up with `RemoveError::RetryLimitExceeded`.
const MAX_RETRIES: usize = 1000;
/// Upper bound on lock-the-parent attempts in `locked_parent_generic`.
const MAX_PARENT_RETRIES: usize = 100;
/// Maximum in-slice key length in bytes (one packed 8-byte ikey).
const IKEY_SIZE: u8 = 8;
/// Result of acquiring a node's parent lock (with re-validation).
enum LockedParentResult<'a> {
    /// Parent locked; the pointer is the (still-current) locked internode.
    Locked(LockGuard<'a>, *mut u8),
    /// The node's parent pointer is null (it is a root).
    NoParent,
    /// The parent pointer kept changing; gave up after `MAX_PARENT_RETRIES`.
    RetryExhausted,
}
/// Result of unlinking an emptied leaf from its immediate parent internode.
#[allow(clippy::redundant_pub_crate)]
pub(crate) enum ImmediateParentResult<'a> {
    /// The leaf was unlinked (or had no parent / was already gone);
    /// no further work is needed.
    Success,
    /// The leaf was unlinked and the parent itself became empty: the caller
    /// must cascade the removal upward from `parent_ptr`, whose lock is
    /// still held via `parent_lock`.
    SuccessNeedsCascade {
        parent_ptr: *mut u8,
        parent_lock: LockGuard<'a>,
    },
    /// The parent could not be locked (retries exhausted); nothing changed.
    Failure,
}
/// A locked, verified position in a leaf from which a remove can be
/// completed via `finish_remove`.
///
/// Holding `lock` keeps the (`ki`, `kp`) pair stable until the cursor is
/// consumed.
#[derive(Debug)]
pub struct RemoveCursor<'t, 'g, P, A>
where
    P: LeafPolicy,
    A: TreeAllocator<P>,
{
    // Owning tree; used for the element count and the coalesce queue.
    tree: &'t MassTreeGeneric<P, A>,
    // Leaf containing the entry to remove.
    leaf: *mut LeafNode15<P>,
    // Write lock on `leaf`, held for the cursor's lifetime.
    lock: LockGuard<'t>,
    // Index into the leaf's permutation.
    ki: usize,
    // Physical slot index resolved through the permutation under the lock.
    kp: usize,
    // ikeys of the layers descended through to reach this leaf
    // (empty for a top-level leaf).
    route: Route,
    // Epoch guard for deferred reclamation of the removed value/suffix.
    guard: &'g LocalGuard<'g>,
}
impl<'t, 'g, P, A> RemoveCursor<'t, 'g, P, A>
where
    P: LeafPolicy,
    A: TreeAllocator<P>,
{
    /// Bundles verified remove state; see the field comments on the type.
    #[inline(always)]
    #[expect(clippy::too_many_arguments, reason = "Complex state management")]
    const fn new(
        tree: &'t MassTreeGeneric<P, A>,
        leaf: *mut LeafNode15<P>,
        lock: LockGuard<'t>,
        ki: usize,
        kp: usize,
        route: Route,
        guard: &'g LocalGuard<'g>,
    ) -> Self {
        Self {
            tree,
            leaf,
            lock,
            ki,
            kp,
            route,
            guard,
        }
    }
    /// Completes the removal: retires the value (if the policy requires it),
    /// clears the slot and any owned suffix, drops the entry from the
    /// permutation, and decrements the tree's element count. If the leaf
    /// became empty, schedules it for coalescing.
    ///
    /// Returns the removed value, if any.
    #[must_use]
    pub fn finish_remove(mut self) -> Option<P::Output> {
        // SAFETY-relevant: `self.lock` guarantees exclusive write access
        // to the leaf for the duration of this method.
        let leaf: &LeafNode15<P> = unsafe { &*self.leaf };
        // Read before clearing: tells us whether the slot owned a suffix.
        let slot_keylenx: u8 = leaf.keylenx(self.kp);
        // NOTE(review): the removal is published via the insert bit on the
        // version word; presumably readers revalidate on any permutation
        // change, so a distinct remove marker is unnecessary — confirm
        // against the NodeVersion protocol.
        self.lock.mark_insert();
        let value: Option<P::Output> = leaf.take_value(self.kp);
        if P::NEEDS_RETIREMENT
            && let Some(ref v) = value
        {
            // Defer destruction until no concurrent reader can still
            // observe the value (epoch guard).
            unsafe { P::retire_output(P::clone_output(v), self.guard) };
        }
        leaf.set_keylenx(self.kp, 0);
        if slot_keylenx == KSUF_KEYLENX {
            // The slot owned an out-of-line key suffix; release it too.
            unsafe { leaf.clear_ksuf(self.kp, self.guard) };
        }
        // Publish the removal by installing a permutation without `ki`.
        let mut new_perm: <LeafNode15<P> as TreeLeafNode<P>>::Perm = leaf.permutation();
        new_perm.remove(self.ki);
        leaf.set_permutation(new_perm);
        self.tree.dec_count();
        if new_perm.size() == 0 {
            // Leaf is now empty: hand it to the coalescer (cold path).
            self.schedule_coalesce_cold(leaf);
        }
        value
    }
    /// Marks the now-empty leaf and enqueues it for coalescing at most once
    /// (`try_mark_queued` dedupes). A top-level leaf (empty route) is
    /// scheduled by its chain position; a leaf inside a sublayer is
    /// scheduled with the route of layer ikeys leading to it.
    #[cold]
    #[inline(never)]
    fn schedule_coalesce_cold(&mut self, leaf: &LeafNode15<P>) {
        leaf.mark_empty();
        if leaf.try_mark_queued() {
            if self.route.is_empty() {
                let ikey_bound: u64 = leaf.ikey_bound();
                self.tree
                    .coalesce_queue
                    .schedule_chain(self.leaf.cast::<u8>(), ikey_bound);
            } else {
                self.tree
                    .coalesce_queue
                    .schedule_sublayer(std::mem::take(&mut self.route));
            }
        }
    }
}
/// Stateless namespace for unlinking emptied leaves/internodes from their
/// parents and repairing ancestor separator keys afterwards.
#[derive(Debug)]
pub struct NodeCleaner;
impl NodeCleaner {
/// Unlinks an empty leaf from its immediate parent internode.
///
/// The caller holds the leaf's lock (`_leaf_lock`). The leaf's child slot
/// is located in the parent first by `ikey_bound`, then by a linear scan
/// fallback; the slot is nulled, the parent's keys/children are shifted
/// down, and ancestor separator keys are redirected when slot 0 was
/// affected.
///
/// Returns `SuccessNeedsCascade` (with the parent's lock still held) when
/// the parent itself became empty and is not a root, `Failure` when the
/// parent could not be locked, and `Success` otherwise.
#[cold]
pub(crate) fn remove_leaf_from_immediate_parent<'a, P, A>(
    _allocator: &A,
    _guard: &LocalGuard<'_>,
    leaf_ptr: *mut LeafNode15<P>,
    _leaf_lock: &LockGuard<'_>,
    ikey_bound: u64,
) -> ImmediateParentResult<'a>
where
    P: LeafPolicy,
    A: TreeAllocator<P>,
{
    let current: *mut u8 = leaf_ptr.cast();
    // SAFETY: `current` is a live leaf kept alive by the caller's lock.
    let parent_result: LockedParentResult<'_> =
        unsafe { Self::locked_parent_generic::<P>(current) };
    let (mut parent_lock, parent_ptr) = match parent_result {
        LockedParentResult::Locked(lock, ptr) => (lock, ptr),
        LockedParentResult::NoParent => {
            // Root leaf: nothing to unlink.
            return ImmediateParentResult::Success;
        }
        LockedParentResult::RetryExhausted => {
            return ImmediateParentResult::Failure;
        }
    };
    // SAFETY: `locked_parent_generic` verified this is a live internode.
    let parent: &InternodeNode = unsafe { &*parent_ptr.cast::<InternodeNode>() };
    parent_lock.mark_insert();
    debug_assert!(
        !parent.version().is_deleted(),
        "remove_leaf_from_immediate_parent: parent should not be deleted"
    );
    // Fast path: locate the child slot via the leaf's ikey bound.
    let mut kp: usize = upper_bound_internode_generic(ikey_bound, parent);
    if TreeInternode::child(parent, kp) != current {
        // Fallback: the bound no longer matches (e.g. keys shifted);
        // scan all child slots for the leaf.
        let nkeys: usize = parent.nkeys();
        let mut found_kp: Option<usize> = None;
        for i in 0..=nkeys {
            if TreeInternode::child(parent, i) == current {
                found_kp = Some(i);
                break;
            }
        }
        if let Some(actual_kp) = found_kp {
            kp = actual_kp;
        } else {
            // The leaf is already unlinked; nothing to do.
            drop(parent_lock);
            return ImmediateParentResult::Success;
        }
    }
    parent.set_child(kp, StdPtr::null_mut());
    if kp > 0 {
        // Close the gap: shift keys/children above `kp` down one slot.
        Self::shift_internode_down_generic::<InternodeNode>(parent, kp);
    }
    // If slot 0 ended up null, ancestors may still carry the removed
    // leaf's ikey bound as a separator; rewrite it to the new bound.
    if (kp <= 1) && (parent.nkeys() > 0) && TreeInternode::child(parent, 0).is_null() {
        let new_ikey: u64 = parent.ikey(0);
        Self::redirect_ikey_bounds_generic::<P>(parent_ptr, ikey_bound, new_ikey);
    }
    if parent.nkeys() > 0 || parent.version().is_root() {
        drop(parent_lock);
        ImmediateParentResult::Success
    } else {
        // Parent became empty: caller must cascade upward while we still
        // hold its lock.
        ImmediateParentResult::SuccessNeedsCascade {
            parent_ptr,
            parent_lock,
        }
    }
}
/// Retires a chain of internodes that became empty after a leaf removal.
///
/// Starting from `start_ptr` (locked via `start_lock`), each iteration
/// splices the node's sole remaining child (slot 0, possibly null) into
/// its grandparent, marks the node deleted, retires it, and continues
/// upward as long as the grandparent also ends up empty and is not a
/// root. Locking is hand-over-hand: the child's lock is dropped only
/// after the grandparent's edits are done.
#[cold]
pub(crate) fn try_cascade_internodes<P, A>(
    allocator: &A,
    guard: &LocalGuard<'_>,
    start_ptr: *mut u8,
    start_lock: LockGuard<'_>,
    ikey_bound: u64,
) where
    P: LeafPolicy,
    A: TreeAllocator<P>,
{
    let mut current: *mut u8 = start_ptr;
    let mut current_lock: LockGuard<'_> = start_lock;
    let mut current_ikey: u64 = ikey_bound;
    loop {
        // SAFETY: `current` is a live internode we hold the lock for.
        let parent: &InternodeNode = unsafe { &*current.cast::<InternodeNode>() };
        // The only surviving subtree of this empty internode, if any.
        let child0: *mut u8 = TreeInternode::child(parent, 0);
        current_lock.mark_deleted();
        parent.set_child(0, StdPtr::null_mut());
        // SAFETY: reclamation is deferred via the epoch guard, so readers
        // that still hold a pointer to `current` stay safe.
        unsafe { allocator.retire_internode_erased(current, guard) };
        let grandparent_result: LockedParentResult<'_> =
            unsafe { Self::locked_parent_generic::<P>(current) };
        let (mut grandparent_lock, grandparent_ptr) = match grandparent_result {
            LockedParentResult::Locked(lock, ptr) => (lock, ptr),
            LockedParentResult::NoParent => {
                drop(current_lock);
                return;
            }
            LockedParentResult::RetryExhausted => {
                drop(current_lock);
                return;
            }
        };
        // SAFETY: `locked_parent_generic` verified this is a live internode.
        let grandparent: &InternodeNode = unsafe { &*grandparent_ptr.cast::<InternodeNode>() };
        grandparent_lock.mark_insert();
        // Find which grandparent slot points at the node being removed.
        let nkeys: usize = grandparent.nkeys();
        let mut kp: Option<usize> = None;
        for i in 0..=nkeys {
            if TreeInternode::child(grandparent, i) == current {
                kp = Some(i);
                break;
            }
        }
        let Some(kp) = kp else {
            // Already unlinked by someone else; stop here.
            drop(current_lock);
            drop(grandparent_lock);
            return;
        };
        // Splice the surviving subtree into the grandparent.
        grandparent.set_child(kp, child0);
        if !child0.is_null() {
            // SAFETY: `child0` is live; its old parent was just retired.
            unsafe { Self::set_parent_erased::<P>(child0, grandparent_ptr) };
        } else if kp > 0 {
            // No survivor: close the gap in the grandparent.
            Self::shift_internode_down_generic::<InternodeNode>(grandparent, kp);
        }
        // Repair ancestor separator keys if slot 0 ended up null.
        if (kp <= 1)
            && (grandparent.nkeys() > 0)
            && TreeInternode::child(grandparent, 0).is_null()
        {
            let new_ikey: u64 = grandparent.ikey(0);
            Self::redirect_ikey_bounds_generic::<P>(grandparent_ptr, current_ikey, new_ikey);
            current_ikey = new_ikey;
        }
        // Hand-over-hand: release the child only after the grandparent edit.
        drop(current_lock);
        if grandparent.nkeys() > 0 || grandparent.version().is_root() {
            drop(grandparent_lock);
            return;
        }
        // Grandparent also became empty: continue the cascade.
        current = grandparent_ptr;
        current_lock = grandparent_lock;
    }
}
/// Removes `key_bytes` from the tree, descending through key layers as
/// needed.
///
/// Returns the removed value (`Ok(None)` if the key is absent), or
/// `RemoveError::RetryLimitExceeded` after `MAX_RETRIES` descents.
pub fn remove_concurrent_generic<P, A>(
    tree: &MassTreeGeneric<P, A>,
    key_bytes: &[u8],
    guard: &LocalGuard<'_>,
) -> Result<Option<P::Output>, RemoveError>
where
    P: LeafPolicy,
    A: TreeAllocator<P>,
{
    let mut key: Key<'_> = Key::new(key_bytes);
    // A key with no suffix can never descend into a sublayer; use the
    // specialized single-layer search in that case.
    let single_layer_mode: bool = !key.has_suffix();
    let mut retry_count: usize = 0;
    let mut layer_root: *mut u8 = tree.load_root_ptr_generic(guard).cast_mut();
    // ikeys of the layers descended through so far (for coalesce routing).
    let mut route: Route = Vec::new();
    'layer_loop: loop {
        if retry_count >= MAX_RETRIES {
            return Err(RemoveError::RetryLimitExceeded);
        }
        retry_count += 1;
        let leaf_ptr: *mut LeafNode15<P> =
            tree.reach_leaf_concurrent_generic(layer_root, &key, false, guard);
        let leaf: &LeafNode15<P> = unsafe { &*leaf_ptr };
        // Optimistic read loop: search without a lock, then validate the
        // version word and redo the search if the leaf changed.
        // NOTE(review): iterations of 'forward do not count toward
        // `retry_count`; presumably bounded by writer progress — confirm.
        'forward: loop {
            let version: u32 = leaf.version().stable();
            let perm: <LeafNode15<P> as TreeLeafNode<P>>::Perm = leaf.permutation();
            let search_result: RemoveSearchResult = if single_layer_mode {
                Self::search_for_remove::<true, P>(leaf, &key, &perm)
            } else {
                Self::search_for_remove::<false, P>(leaf, &key, &perm)
            };
            if leaf.version().has_changed(version) {
                // Concurrent writer touched the leaf; the search result is
                // not trustworthy. Redo it.
                continue 'forward;
            }
            match search_result {
                RemoveSearchResult::NotFound => {
                    return Ok(None);
                }
                RemoveSearchResult::Found { ki, kp: _ } => {
                    // Lock the leaf and re-verify the slot before mutating.
                    let lock_result = Self::lock_and_verify_for_remove(
                        tree, leaf_ptr, ki, &key, &mut route, guard,
                    );
                    match lock_result {
                        RemoveLockResult::NotFound => {
                            return Ok(None);
                        }
                        RemoveLockResult::Ready(cursor) => {
                            return Ok(cursor.finish_remove());
                        }
                        RemoveLockResult::DescendLayer { layer_ptr: lp } => {
                            // Remember the layer ikey, then continue with
                            // the next 8-byte slice of the key.
                            route.push(key.ikey());
                            layer_root = lp;
                            key.shift();
                            continue 'layer_loop;
                        }
                        RemoveLockResult::Retry => {
                            // Fall through: redo the optimistic search.
                        }
                        RemoveLockResult::RestartFromRoot => {
                            // The leaf became a deleted layer; restore the
                            // full key and start over from the tree root.
                            key.unshift_all();
                            layer_root = tree.load_root_ptr_generic(guard).cast_mut();
                            route.clear();
                            continue 'layer_loop;
                        }
                    }
                }
                RemoveSearchResult::DescendLayer { layer_ptr } => {
                    debug_assert!(
                        !single_layer_mode,
                        "DescendLayer unreachable in single-layer mode"
                    );
                    if !Self::is_sublayer_valid(layer_ptr) {
                        return Ok(None);
                    }
                    route.push(key.ikey());
                    layer_root = layer_ptr;
                    key.shift();
                    continue 'layer_loop;
                }
            }
        }
    }
}
/// Lock-free scan of `leaf` (in permutation order) for a slot matching
/// `key`'s current 8-byte slice.
///
/// `SINGLE_LAYER` is true when the whole key fits in one ikey, which
/// drops the sublayer/suffix branches from the hot loop. The early
/// `NotFound` return relies on the permutation ordering slots by ikey.
/// The caller must validate the leaf's version after this returns.
#[inline(always)]
fn search_for_remove<const SINGLE_LAYER: bool, P>(
    leaf: &LeafNode15<P>,
    key: &Key<'_>,
    perm: &<LeafNode15<P> as TreeLeafNode<P>>::Perm,
) -> RemoveSearchResult
where
    P: LeafPolicy,
{
    let target_ikey: u64 = key.ikey();
    #[expect(clippy::cast_possible_truncation, reason = "key.current_len() <= 8")]
    let search_keylenx: u8 = key.current_len() as u8;
    let size: usize = perm.size();
    for ki in 0..size {
        let kp: usize = perm.get(ki);
        // Relaxed loads are fine here: the caller revalidates the version
        // word before trusting the result.
        let slot_ikey: u64 = leaf.ikey_relaxed(kp);
        if slot_ikey < target_ikey {
            continue;
        }
        if slot_ikey > target_ikey {
            // Slots are visited in ascending ikey order: no match exists.
            return RemoveSearchResult::NotFound;
        }
        leaf.prefetch_value(kp);
        let slot_keylenx: u8 = leaf.keylenx_relaxed(kp);
        if SINGLE_LAYER {
            // Suffix-free key: only an exact length match counts.
            if slot_keylenx == search_keylenx {
                return RemoveSearchResult::Found { ki, kp };
            }
        } else {
            if slot_keylenx >= LAYER_KEYLENX {
                // Slot is a sublayer pointer for this ikey.
                if key.has_suffix() {
                    let layer_ptr: *mut u8 = leaf.load_layer_raw(kp);
                    return RemoveSearchResult::DescendLayer { layer_ptr };
                }
                // Our key ends at this ikey but the slot demands more bytes.
                return RemoveSearchResult::NotFound;
            }
            if slot_keylenx == KSUF_KEYLENX {
                // Slot stores an out-of-line suffix; compare it with ours.
                if !key.has_suffix() {
                    continue;
                }
                leaf.prefetch_suffix();
                let suffix: &[u8] = key.suffix();
                if leaf.ksuf_equals(kp, suffix) {
                    return RemoveSearchResult::Found { ki, kp };
                }
                continue;
            }
            // In-slice key: exact length match within one ikey.
            if search_keylenx <= IKEY_SIZE && slot_keylenx == search_keylenx {
                return RemoveSearchResult::Found { ki, kp };
            }
        }
    }
    RemoveSearchResult::NotFound
}
/// Locks `leaf_ptr` and re-verifies that permutation index `ki` still
/// refers to a slot matching `key` before handing back a `RemoveCursor`.
///
/// On `Ready`, `route` is moved into the cursor (and left empty).
#[inline]
fn lock_and_verify_for_remove<'t, 'g, P, A>(
    tree: &'t MassTreeGeneric<P, A>,
    leaf_ptr: *mut LeafNode15<P>,
    ki: usize,
    key: &Key<'_>,
    route: &mut Route,
    guard: &'g LocalGuard<'g>,
) -> RemoveLockResult<'t, 'g, P, A>
where
    P: LeafPolicy,
    A: TreeAllocator<P>,
{
    let leaf: &LeafNode15<P> = unsafe { &*leaf_ptr };
    leaf.prefetch_for_search();
    let lock: LockGuard<'_> = leaf.version().lock_bounded();
    if leaf.deleted_layer() {
        // The whole layer is gone; the caller must restart at the root.
        drop(lock);
        return RemoveLockResult::RestartFromRoot;
    }
    // Everything observed before the lock may be stale: re-read the
    // permutation and the slot.
    let new_perm: <LeafNode15<P> as TreeLeafNode<P>>::Perm = leaf.permutation();
    if new_perm.size() <= ki {
        // The entry count shrank; the index is no longer valid.
        drop(lock);
        return RemoveLockResult::Retry;
    }
    let new_kp: usize = new_perm.get(ki);
    leaf.prefetch_value(new_kp);
    let slot_ikey: u64 = leaf.ikey(new_kp);
    let slot_keylenx: u8 = leaf.keylenx(new_kp);
    if slot_ikey != key.ikey() {
        // The slot was reused for a different ikey; search again.
        drop(lock);
        return RemoveLockResult::Retry;
    }
    if slot_keylenx >= LAYER_KEYLENX {
        // The slot turned into a sublayer pointer under the lock.
        let lp: *mut u8 = leaf.load_layer_raw(new_kp);
        drop(lock);
        if !Self::is_sublayer_valid(lp) {
            return RemoveLockResult::NotFound;
        }
        return RemoveLockResult::DescendLayer { layer_ptr: lp };
    }
    // Verified: hand the locked position to the caller for the actual remove.
    let cursor: RemoveCursor<'_, '_, P, A> = RemoveCursor::new(
        tree,
        leaf_ptr,
        lock,
        ki,
        new_kp,
        std::mem::take(route),
        guard,
    );
    RemoveLockResult::Ready(cursor)
}
/// Checks `layer_ptr` via `NodeVersion::is_valid_sublayer`; a `false`
/// result makes callers treat the key as absent instead of descending.
#[inline(always)]
fn is_sublayer_valid(layer_ptr: *mut u8) -> bool {
    // SAFETY: `layer_ptr` was read from a leaf's layer slot and is kept
    // reachable by the caller's epoch guard.
    unsafe { NodeVersion::is_valid_sublayer(layer_ptr) }
}
/// Closes the gap left at `removed_pos` after a child slot was cleared:
/// keys and children above the hole slide down one position and the key
/// count shrinks by one.
///
/// Keys are moved before children, matching the original write order.
#[cold]
#[inline(never)]
fn shift_internode_down_generic<I>(inode: &I, removed_pos: usize)
where
    I: TreeInternode,
{
    let nkeys: usize = inode.nkeys();
    debug_assert!(removed_pos > 0, "shift_down: removed_pos must be > 0");
    debug_assert!(
        removed_pos <= nkeys,
        "shift_down: removed_pos out of bounds"
    );
    // Slide keys [removed_pos, nkeys) down into [removed_pos - 1, nkeys - 1).
    for src in removed_pos..nkeys {
        let moved_key: u64 = inode.ikey(src);
        inode.set_ikey(src - 1, moved_key);
    }
    // Slide children [removed_pos + 1, nkeys] down into [removed_pos, nkeys - 1].
    for src in (removed_pos + 1)..=nkeys {
        let moved_child: *mut u8 = inode.child(src);
        inode.set_child(src - 1, moved_child);
    }
    #[expect(clippy::cast_possible_truncation, reason = "nkeys <= WIDTH")]
    inode.set_nkeys((nkeys - 1) as u8);
}
/// Walks upward from `start_internode`, rewriting the ancestor separator
/// key `old_ikey` to `new_ikey` wherever the current node sits at slot 0
/// (or at slot 1 next to a null child 0).
///
/// Uses hand-over-hand locking: the previous level's lock (`owned_lock`)
/// is released only after the next parent's lock is acquired.
/// `kp == -1` doubles as the "first iteration, no lock to release" flag.
#[cold]
#[inline(never)]
#[expect(clippy::cast_sign_loss)]
fn redirect_ikey_bounds_generic<P>(start_internode: *mut u8, old_ikey: u64, new_ikey: u64)
where
    P: LeafPolicy,
{
    let mut current: *mut u8 = start_internode;
    let mut kp: i32 = -1;
    let mut owned_lock: Option<LockGuard<'_>> = None;
    loop {
        // SAFETY: `current` is a live node (held by the previous level's
        // lock, or by the caller's lock on the first iteration).
        let parent_result: LockedParentResult<'_> =
            unsafe { Self::locked_parent_generic::<P>(current) };
        let (parent_lock, parent_ptr) = match parent_result {
            LockedParentResult::Locked(lock, ptr) => (lock, ptr),
            LockedParentResult::NoParent | LockedParentResult::RetryExhausted => {
                drop(owned_lock);
                return;
            }
        };
        // Parent is now locked: safe to release the previous level.
        if kp >= 0 {
            drop(owned_lock.take());
        }
        // SAFETY: verified to be a live internode by `locked_parent_generic`.
        let parent: &InternodeNode = unsafe { &*(parent_ptr.cast::<InternodeNode>()) };
        #[expect(clippy::cast_possible_wrap, clippy::cast_possible_truncation)]
        {
            kp = upper_bound_internode_generic(old_ikey, parent) as i32;
        }
        if TreeInternode::child(parent, kp as usize) != current {
            // The bound no longer matches; fall back to a linear scan.
            let nkeys: usize = parent.nkeys();
            let mut found: bool = false;
            for i in 0..=nkeys {
                #[expect(clippy::cast_possible_wrap, clippy::cast_possible_truncation)]
                if TreeInternode::child(parent, i) == current {
                    kp = i as i32;
                    found = true;
                    break;
                }
            }
            if !found {
                // `current` was unlinked concurrently; stop.
                drop(owned_lock);
                drop(parent_lock);
                return;
            }
        }
        if kp > 0 {
            // Separator `kp - 1` bounds this subtree: rewrite it.
            parent.set_ikey((kp - 1) as usize, new_ikey);
        }
        current = parent_ptr;
        owned_lock = Some(parent_lock);
        // Keep climbing only while the old bound may still appear higher
        // up: at slot 0, or at slot 1 next to a null child 0.
        let should_continue: bool =
            (kp == 0) || ((kp == 1) && TreeInternode::child(parent, 0).is_null());
        if !should_continue {
            drop(owned_lock);
            return;
        }
    }
}
/// Reads the parent pointer of a type-erased node, dispatching on leaf
/// vs internode via the `NodeVersion` read from the node's base address.
///
/// # Safety
/// `node_ptr` must point to a live node whose `NodeVersion` sits at the
/// base address (the cast below relies on that layout).
unsafe fn get_parent_erased<P: LeafPolicy>(node_ptr: *mut u8) -> *mut u8 {
    #[expect(clippy::cast_ptr_alignment)]
    let version: &NodeVersion = unsafe { &*(node_ptr.cast::<NodeVersion>()) };
    Propagation::get_parent::<P>(node_ptr, version.is_leaf())
}
/// Locks the parent of `current_ptr`, retrying while the parent pointer
/// changes between the read and the lock acquisition (up to
/// `MAX_PARENT_RETRIES` attempts).
///
/// # Safety
/// `current_ptr` must point to a live node, and any non-null parent
/// pointer read from it must point to a live internode.
unsafe fn locked_parent_generic<'a, P: LeafPolicy>(
    current_ptr: *mut u8,
) -> LockedParentResult<'a> {
    for _ in 0..MAX_PARENT_RETRIES {
        let parent_ptr: *mut u8 = unsafe { Self::get_parent_erased::<P>(current_ptr) };
        if parent_ptr.is_null() {
            return LockedParentResult::NoParent;
        }
        // SAFETY: per the function contract, a non-null parent pointer
        // refers to a live internode.
        let parent: &InternodeNode = unsafe { &*(parent_ptr.cast::<InternodeNode>()) };
        let parent_lock: LockGuard<'_> = parent.version().lock();
        // Re-read after locking: the parent may have changed (e.g. a
        // concurrent split) between the read and the lock.
        let current_parent: *mut u8 = unsafe { Self::get_parent_erased::<P>(current_ptr) };
        if current_parent == parent_ptr {
            debug_assert!(
                !parent.version().is_leaf(),
                "locked_parent: parent must be an internode"
            );
            return LockedParentResult::Locked(parent_lock, parent_ptr);
        }
        drop(parent_lock);
        StdHint::spin_loop();
    }
    LockedParentResult::RetryExhausted
}
/// Stores `new_parent` as the parent of the type-erased node at
/// `node_ptr`, dispatching on leaf vs internode via the node's
/// `NodeVersion` header.
///
/// # Safety
/// `node_ptr` must point to a live node whose `NodeVersion` sits at the
/// base address.
#[inline(always)]
unsafe fn set_parent_erased<P: LeafPolicy>(node_ptr: *mut u8, new_parent: *mut u8) {
    #[expect(clippy::cast_ptr_alignment, reason = "Checked by caller")]
    let version: &NodeVersion = unsafe { &*(node_ptr.cast::<NodeVersion>()) };
    Propagation::set_parent::<P>(node_ptr, new_parent, version.is_leaf());
}
}
#[cfg(test)]
#[expect(clippy::unwrap_used, reason = "Tests")]
mod unit_tests;