#![allow(unsafe_code)]
use crossbeam_epoch::{Guard, Shared};
use crossbeam_utils::Backoff;
use crate::config::Config;
use crate::key::Key;
use crate::node::{is_child, Node, SLOT_DATA, SLOT_EMPTY, SLOT_TOMBSTONE, SLOT_WRITING};
/// Removes `key` from the node hierarchy rooted at `node`.
///
/// Starting at `node`, each level asks `predict_slot` for the slot that
/// `key` maps to and inspects that slot's state:
///
/// * empty or tombstone slot, a data slot holding a different key, a null
///   child pointer, or an unrecognised state: the key is treated as absent
///   and the function returns;
/// * slot being written, or child pointer carrying a non-zero tag: the
///   function backs off and looks at the same slot again;
/// * child slot: the descent continues in the child node;
/// * data slot holding `key`: the slot is switched to a tombstone with
///   `cas_data_to_tombstone`, and on success `dec_keys` / `inc_tombstones`
///   are called on the node that held the key.
///
/// After a successful tombstone CAS, the first child pointer that was followed
/// from `node` is loaded again. If it no longer equals the pointer seen on the
/// way down, the whole descent is restarted from `node` (the removal is
/// remembered, so the final result is still `true`).
///
/// When `config.auto_rebuild` is set and the node's `tombstone_ratio()` exceeds
/// `config.tombstone_ratio_threshold`, `crate::rebuild::try_rebuild_subtree` is
/// invoked on the first (parent, slot) pair that was descended through. No
/// rebuild is attempted when the key was found directly in `node`.
///
/// Returns `true` if this call tombstoned a slot holding `key` (in any pass),
/// `false` otherwise.
pub fn remove<K: Key, V: Clone + Send + Sync>(
node: &Node<K, V>,
key: &K,
config: &Config,
guard: &Guard,
) -> bool {
// Set once a tombstone CAS succeeded but the descent had to be restarted;
// every "not found" exit below reports this flag instead of a plain `false`.
let mut was_removed = false;
'retry: loop {
// A fresh backoff and fresh descent bookkeeping for every pass from the root.
let backoff = Backoff::new();
let mut current_node = node;
// First (parent, slot) edge followed in this pass; target of a possible rebuild.
let mut rebuild_candidate: Option<(&Node<K, V>, usize)> = None;
// Same edge plus the child pointer observed through it, used to detect that
// the pointer was replaced while this call was working below it.
#[allow(clippy::type_complexity)]
let mut descent_snapshot: Option<(&Node<K, V>, usize, Shared<'_, Node<K, V>>)> = None;
loop {
let slot_idx = current_node.predict_slot(key);
let state = current_node.slot_state(slot_idx);
match state {
// Nothing live in the predicted slot: the key is not present on this path.
SLOT_EMPTY | SLOT_TOMBSTONE => return was_removed,
// Another operation is mid-write on this slot; wait and re-read its state.
SLOT_WRITING => {
backoff.snooze();
}
SLOT_DATA => {
// SAFETY (assumed, confirm against `Node::read_key`'s contract): the slot
// state was just observed as SLOT_DATA, so the key is presumed initialised.
// NOTE(review): the key is read before the CAS below. If a slot can go
// DATA -> TOMBSTONE -> DATA with a different key in between, the CAS could
// tombstone that other key; confirm the slot state machine rules this out.
let k = unsafe { current_node.read_key(slot_idx) };
if k != key {
// The predicted slot holds some other key.
return was_removed;
}
// If the CAS fails, this arm falls through and the inner loop re-reads the
// slot state on the next iteration.
if current_node.cas_data_to_tombstone(slot_idx) {
current_node.dec_keys();
current_node.inc_tombstones();
// Re-validate the first child pointer followed in this pass. A changed
// pointer (presumably a concurrent rebuild swapped the subtree; confirm)
// means the tombstone may have landed in a detached node, so descend again
// from the root while remembering that a removal already happened.
if let Some((parent, pidx, expected)) = descent_snapshot {
if parent.load_child(pidx, guard) != expected {
was_removed = true;
continue 'retry;
}
}
// Optional compaction once this node has accumulated too many tombstones.
if config.auto_rebuild
&& current_node.tombstone_ratio() > config.tombstone_ratio_threshold
{
// `rebuild_candidate` is `None` when the key lived directly in `node`.
if let Some((parent, idx)) = rebuild_candidate {
crate::rebuild::try_rebuild_subtree(parent, idx, config, guard);
}
}
return true;
}
}
s if is_child(s) => {
let child_shared = current_node.load_child(slot_idx, guard);
// A non-zero tag makes this call wait and reload the pointer
// (presumably the tag marks a child that is being replaced; confirm).
if child_shared.tag() != 0 {
backoff.snooze();
continue;
}
if child_shared.is_null() {
return was_removed;
}
// Both records are written only once per pass, i.e. for the edge leaving
// the root `node`; deeper edges are not tracked.
if rebuild_candidate.is_none() {
rebuild_candidate = Some((current_node, slot_idx));
}
if descent_snapshot.is_none() {
descent_snapshot = Some((current_node, slot_idx, child_shared));
}
// SAFETY (assumed, confirm): the pointer is non-null (checked above) and was
// loaded under `guard`, which is presumed to keep the child node alive for
// as long as the guard is held.
current_node = unsafe { child_shared.deref() };
}
// Any other state value is treated like "key not present".
_ => return was_removed, }
}
} }
#[cfg(test)]
mod tests {
    // Single-threaded checks for `remove`: each test builds a small index,
    // removes keys from it and verifies what lookups and key counts report.
    use super::*;
    use crate::insert;
    use crate::node::Node;
    use crossbeam_epoch as epoch;

    /// Pins the current thread and hands back the epoch guard.
    fn pinned() -> epoch::Guard {
        epoch::pin()
    }

    /// The configuration used by tests that do not tune anything.
    fn default_config() -> Config {
        Config::default()
    }

    #[test]
    fn remove_existing_key() {
        let pin = pinned();
        let conf = default_config();
        let entries: Vec<(u64, &str)> = vec![(10, "a"), (20, "b"), (30, "c")];
        let root = crate::build::bulk_load(&entries, &conf).unwrap();

        // Removing a stored key succeeds, shrinks the count and hides the key.
        assert!(remove(&root, &20, &conf, &pin));
        assert_eq!(root.total_keys(&pin), 2);
        assert_eq!(crate::lookup::get(&root, &20, &pin), None);
    }

    #[test]
    fn remove_missing_key() {
        let pin = pinned();
        let conf = default_config();
        let entries: Vec<(u64, &str)> = vec![(10, "a"), (20, "b")];
        let root = crate::build::bulk_load(&entries, &conf).unwrap();

        // A key that was never stored is reported as not removed.
        let outcome = remove(&root, &99, &conf, &pin);
        assert!(!outcome);
        assert_eq!(root.total_keys(&pin), 2);
    }

    #[test]
    fn remove_all_keys() {
        let pin = pinned();
        let conf = default_config();
        let entries: Vec<(u64, char)> = vec![(1, 'a'), (2, 'b'), (3, 'c')];
        let root = crate::build::bulk_load(&entries, &conf).unwrap();

        for key in [1u64, 2, 3] {
            assert!(remove(&root, &key, &conf, &pin));
        }
        assert_eq!(root.total_keys(&pin), 0);
    }

    #[test]
    fn remove_then_reinsert() {
        let pin = pinned();
        let conf = default_config();
        let entries: Vec<(u64, &str)> = vec![(10, "a"), (20, "b")];
        let root = crate::build::bulk_load(&entries, &conf).unwrap();

        // The result of the removal itself is not checked here, only its effect.
        let _ = remove(&root, &10, &conf, &pin);
        assert_eq!(crate::lookup::get(&root, &10, &pin), None);

        insert::insert(&root, 10, &"A", &conf, &pin);
        assert_eq!(crate::lookup::get(&root, &10, &pin), Some(&"A"));
    }

    #[test]
    fn remove_idempotent() {
        let pin = pinned();
        let conf = default_config();
        let entries: Vec<(u64, &str)> = vec![(5, "x")];
        let root = crate::build::bulk_load(&entries, &conf).unwrap();

        // First removal wins, the second one finds nothing left to remove.
        let first = remove(&root, &5, &conf, &pin);
        assert!(first);
        let second = remove(&root, &5, &conf, &pin);
        assert!(!second);
    }

    #[test]
    fn remove_from_child_node() {
        let pin = pinned();
        let conf = default_config();
        let entries: Vec<(u64, &str)> = vec![(10u64, "a"), (20, "b")];
        let root = crate::build::bulk_load(&entries, &conf).unwrap();

        // Add a third key after the bulk load, then take it out again.
        insert::insert(&root, 15, &"c", &conf, &pin);
        assert_eq!(root.total_keys(&pin), 3);

        assert!(remove(&root, &15, &conf, &pin));
        assert_eq!(crate::lookup::get(&root, &15, &pin), None);
    }

    #[test]
    fn tombstone_compaction_preserves_remaining_keys() {
        let pin = pinned();
        let conf = Config::new()
            .auto_rebuild(true)
            .tombstone_ratio_threshold(0.3);
        let model = crate::model::LinearModel::new(0.01, 0.0);
        let root = Node::<u64, u64>::with_capacity(model, 16);

        // Fill the index with 0..40, each key mapped to itself.
        (0..40u64).for_each(|key| {
            insert::insert(&root, key, &key, &conf, &pin);
        });
        assert_eq!(root.total_keys(&pin), 40);

        // Remove the first 30 keys with auto-rebuild enabled.
        (0..30u64).for_each(|key| {
            let _ = remove(&root, &key, &conf, &pin);
        });

        // The untouched tail must still be reachable.
        for i in 30..40u64 {
            let found = crate::lookup::get(&root, &i, &pin);
            assert_eq!(found, Some(&i), "key {i} lost after tombstone compaction");
        }
    }
}