use crate::{
changes::Changes, keys::InfiniteKeys, min_keys, nearest_node, put::propagate_changes_up_tree,
wchildren, Child, HyperbeeError, KeyValue, KeyValueData, NodePath, SharedNode, Tree, MAX_KEYS,
};
use tracing::info;
use Side::{Left, Right};
/// Which sibling of an underflowing ("deficient") child a rebalancing step works with.
///
/// `Left` refers to the sibling at `deficient_index - 1` and `Right` to the sibling at
/// `deficient_index + 1` within the father's children.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Side {
    /// The sibling immediately to the left of the deficient child.
    Left,
    /// The sibling immediately to the right of the deficient child.
    Right,
}
impl Side {
    /// Refills the slot of a key that was just removed from the internal node `node`.
    ///
    /// `key_index` is the position the removed key occupied. The subtrees on either side of that
    /// position are therefore the children at `key_index` and `key_index + 1`.
    ///
    /// The replacement comes from one of two leaves:
    /// * the leaf reached by descending the left child towards `InfiniteKeys::Positive` (its last
    ///   key is taken), or
    /// * the leaf reached by descending the right child towards `InfiniteKeys::Negative` (its
    ///   first key is taken).
    ///
    /// Whichever leaf currently holds more keys donates; the left leaf wins ties.
    ///
    /// On success `path` is extended with `(node, child_index)` followed by the path down to the
    /// donor leaf. The caller can then check that leaf for underflow and propagate changes upward.
    ///
    /// This helper does not depend on a `Side` value; it is grouped here with the other
    /// delete-rebalancing steps.
    async fn replace_removed_internal_key_with_a_key_from_leaf(
        path: &mut NodePath,
        node: SharedNode,
        key_index: usize,
    ) -> Result<(), HyperbeeError> {
        let left_child_index = key_index;
        let right_child_index = left_child_index + 1;

        let left_child = node.read().await.get_child(left_child_index).await?;
        let (_, left_path) = nearest_node(left_child, &InfiniteKeys::Positive).await?;
        let left_leaf = &left_path
            .last()
            .expect("nearest_node always returns at least one node")
            .0;
        let num_left_keys = left_leaf.read().await.keys.len();

        let right_child = node.read().await.get_child(right_child_index).await?;
        let (_, right_path) = nearest_node(right_child, &InfiniteKeys::Negative).await?;
        let right_leaf = &right_path
            .last()
            .expect("nearest_node always returns at least one node")
            .0;
        let num_right_keys = right_leaf.read().await.keys.len();

        // Take from the leaf with more keys (left on ties).
        let (child_index, replacement_key, path_to_bottom) = if num_left_keys < num_right_keys {
            let donor = right_leaf.write().await.keys.remove(0);
            (right_child_index, donor, right_path)
        } else {
            let donor = left_leaf
                .write()
                .await
                .keys
                .pop()
                .expect("donor leaf has at least one key");
            (left_child_index, donor, left_path)
        };

        node.write().await.keys.insert(key_index, replacement_key);
        path.push((node, child_index));
        path.extend(path_to_bottom);
        Ok(())
    }

    /// Returns the child index of the sibling on this side of the child at `deficient_index`.
    ///
    /// Returns `None` when `father` has no child there.
    async fn get_donor_index(&self, father: SharedNode, deficient_index: usize) -> Option<usize> {
        match self {
            Left => deficient_index.checked_sub(1),
            Right => {
                let n_children = father.read().await.n_children().await;
                let candidate = deficient_index + 1;
                (candidate < n_children).then_some(candidate)
            }
        }
    }

    /// Index in the father's `keys` of the separator between the deficient child and the
    /// sibling on this side.
    ///
    /// Only meaningful when that sibling exists. For `Left`, `deficient_index` must be > 0.
    fn get_key_index(&self, deficient_index: usize) -> usize {
        match self {
            Right => deficient_index,
            Left => deficient_index - 1,
        }
    }

    /// Removes and returns the key `donor` gives up in a rotation.
    ///
    /// A right sibling gives its first key; a left sibling gives its last key.
    async fn get_donor_key(&self, donor: SharedNode) -> KeyValue {
        match self {
            Right => donor.write().await.keys.remove(0),
            Left => donor
                .write()
                .await
                .keys
                .pop()
                .expect("keys should not be empty"),
        }
    }

    /// Detaches the child of `donor` that travels with a rotated key.
    ///
    /// A right sibling gives its first child; a left sibling gives its last child. Returns
    /// `None` when `donor` is a leaf.
    async fn get_donor_child(&self, donor: SharedNode) -> Option<Child> {
        if donor.read().await.is_leaf().await {
            return None;
        }
        Some(match self {
            Right => wchildren!(donor).remove(0),
            Left => wchildren!(donor).pop().expect("node is not a leaf"),
        })
    }

    /// Stores `key` in the father's separator slot for the deficient child and the donor on this
    /// side, and returns the separator it displaced.
    async fn swap_donor_key_in_father(
        &self,
        father: SharedNode,
        deficient_index: usize,
        key: KeyValue,
    ) -> KeyValue {
        let key_index = self.get_key_index(deficient_index);
        let mut father_node = father.write().await;
        std::mem::replace(&mut father_node.keys[key_index], key)
    }

    /// Adds the rotated key (and the optional rotated child) to `deficient_child`.
    ///
    /// They go on the edge facing the donor: appended for `Right`, prepended for `Left`.
    async fn insert_donations_into_deficient_child(
        &self,
        deficient_child: SharedNode,
        key: KeyValue,
        child: Option<Child>,
    ) {
        match self {
            Right => {
                deficient_child.write().await.keys.push(key);
                if let Some(child) = child {
                    wchildren!(deficient_child).push(child);
                }
            }
            Left => {
                deficient_child.write().await.keys.insert(0, key);
                if let Some(child) = child {
                    wchildren!(deficient_child).insert(0, child);
                }
            }
        }
    }

    /// Returns the donor sibling on this side together with its index, if one can donate.
    ///
    /// A sibling can donate when it exists and holds more than `min_keys(order)` keys.
    async fn can_rotate(
        &self,
        father: SharedNode,
        deficient_index: usize,
        order: usize,
    ) -> Result<Option<(usize, SharedNode)>, HyperbeeError> {
        let Some(donor_index) = self.get_donor_index(father.clone(), deficient_index).await else {
            return Ok(None);
        };
        let donor_child = father.read().await.get_child(donor_index).await?;
        let has_spare_key = min_keys(order) < donor_child.read().await.keys.len();
        Ok(has_spare_key.then_some((donor_index, donor_child)))
    }

    /// Rotates one key through `father` into `deficient_child`.
    ///
    /// Steps:
    /// * The donor gives up its edge key, plus its edge child if it is internal.
    /// * That key replaces the separator in `father`.
    /// * The old separator and the donated child are added to the near edge of
    ///   `deficient_child`.
    ///
    /// Both children are registered in `changes` and their references are updated in `father`,
    /// which is returned. `father` itself is not registered here.
    #[tracing::instrument(skip(self, father, changes))]
    async fn rotate(
        &self,
        father: SharedNode,
        deficient_index: usize,
        deficient_child: SharedNode,
        donor_index: usize,
        donor: SharedNode,
        changes: &mut Changes,
    ) -> Result<SharedNode, HyperbeeError> {
        let donated_key = self.get_donor_key(donor.clone()).await;
        let donated_child = self.get_donor_child(donor.clone()).await;
        let donated_key_from_father = self
            .swap_donor_key_in_father(father.clone(), deficient_index, donated_key)
            .await;
        self.insert_donations_into_deficient_child(
            deficient_child.clone(),
            donated_key_from_father,
            donated_child,
        )
        .await;
        wchildren!(father)[donor_index] = changes.add_node(donor);
        wchildren!(father)[deficient_index] = changes.add_node(deficient_child);
        Ok(father)
    }

    /// Rotates from this side if [`Side::can_rotate`] allows it.
    ///
    /// Otherwise returns `Ok(None)` without modifying anything.
    async fn maybe_rotate(
        &self,
        father: SharedNode,
        deficient_index: usize,
        deficient_child: SharedNode,
        order: usize,
        changes: &mut Changes,
    ) -> Result<Option<SharedNode>, HyperbeeError> {
        let Some((donor_index, donor_child)) = self
            .can_rotate(father.clone(), deficient_index, order)
            .await?
        else {
            return Ok(None);
        };
        let rotated = self
            .rotate(
                father,
                deficient_index,
                deficient_child,
                donor_index,
                donor_child,
                changes,
            )
            .await?;
        Ok(Some(rotated))
    }

    /// Merges `deficient_child` with its sibling on this side, pulling their separator down
    /// from `father`.
    ///
    /// The left node of the pair receives the separator, then all keys and children of the
    /// right node. The right node's slot is removed from `father`, and the merged node is
    /// registered in `changes` and stored in the left slot. Returns `Ok(None)` if there is no
    /// sibling on this side.
    #[tracing::instrument(skip(self, father, changes))]
    async fn maybe_merge(
        &self,
        father: SharedNode,
        deficient_index: usize,
        deficient_child: SharedNode,
        changes: &mut Changes,
    ) -> Result<Option<SharedNode>, HyperbeeError> {
        let Some(donor_index) = self.get_donor_index(father.clone(), deficient_index).await else {
            return Ok(None);
        };
        let donor_child = father.read().await.get_child(donor_index).await?;
        let (left, right) = match self {
            Right => (deficient_child, donor_child),
            Left => (donor_child, deficient_child),
        };

        // The separator at `key_index` sits between the children at `key_index` (left) and
        // `key_index + 1` (right).
        let key_index = self.get_key_index(deficient_index);
        let left_child_index = key_index;
        let right_child_index = key_index + 1;

        let separator = father.write().await.keys.remove(key_index);
        wchildren!(father).remove(right_child_index);

        // left.keys ++ [separator] ++ right.keys, and left.children ++ right.children.
        let mut keys_to_add = vec![separator];
        keys_to_add.append(&mut right.write().await.keys);
        left.write().await.keys.extend(keys_to_add);
        wchildren!(left).extend(wchildren!(right).drain(..));

        info!("add merged nodes father changes: {left:#?}");
        let left_ref = changes.add_node(left);
        wchildren!(father)[left_child_index] = left_ref;
        Ok(Some(father))
    }
}
/// Restores the minimum-key invariant for `deficient_child` and returns `father`.
///
/// `deficient_child` is the child of `father` at `deficient_index`. The strategies are tried in
/// this order, stopping at the first that applies:
/// 1. rotate a key in from the left sibling;
/// 2. rotate a key in from the right sibling;
/// 3. merge with the left sibling;
/// 4. merge with the right sibling.
///
/// Children modified by the chosen strategy are registered in `changes`. `father` itself is not
/// registered; the caller decides whether it needs further repair.
///
/// # Panics
/// Panics if `deficient_child` has no sibling, i.e. `father` has a single child.
#[tracing::instrument(skip(father, changes))]
async fn repair_one(
    father: SharedNode,
    deficient_index: usize,
    deficient_child: SharedNode,
    order: usize,
    changes: &mut Changes,
) -> Result<SharedNode, HyperbeeError> {
    // Rotations come first: they swap one separator in `father` and keep its key count intact.
    for (side, label) in [(Left, "left"), (Right, "right")] {
        if let Some(res) = side
            .maybe_rotate(
                father.clone(),
                deficient_index,
                deficient_child.clone(),
                order,
                changes,
            )
            .await?
        {
            info!("rotated from {label}");
            return Ok(res);
        }
    }
    // No sibling can spare a key: merge with one. This removes a key from `father`.
    for (side, label) in [(Left, "left"), (Right, "right")] {
        if let Some(res) = side
            .maybe_merge(
                father.clone(),
                deficient_index,
                deficient_child.clone(),
                changes,
            )
            .await?
        {
            info!("merged from {label}");
            return Ok(res);
        }
    }
    unreachable!("a deficient child always has at least one sibling to rotate or merge with")
}
/// Repairs underflow bottom-up along `path` and returns the reference for the topmost node it
/// touched.
///
/// The last entry of `path` is the father of the deficient node paired with that node's child
/// index. Each step runs [`repair_one`] on that father, then one of these happens:
/// * The father is the root (`path` is exhausted), the repair left it with no keys, and it has
///   exactly one child: that child's existing reference is returned in place of the root.
/// * The father is the root, or it still has at least `min_keys(order)` keys: it is registered
///   in `changes` and its new reference is returned.
/// * Otherwise the father itself is now deficient, and the walk moves one level up.
///
/// Consumed entries are popped from `path`; whatever remains are the ancestors of the returned
/// node.
///
/// # Panics
/// Panics if `path` is empty on entry.
#[tracing::instrument(skip(path, changes))]
async fn repair(
    path: &mut NodePath,
    order: usize,
    changes: &mut Changes,
) -> Result<Child, HyperbeeError> {
    let (mut parent, mut child_index) =
        path.pop().expect("path.len() > 0 should be checked before");
    let mut underflowing = parent.read().await.get_child(child_index).await?;
    loop {
        let repaired = repair_one(parent, child_index, underflowing, order, changes).await?;

        if path.is_empty() {
            let root_is_hollow = repaired.read().await.keys.is_empty()
                && repaired.read().await.n_children().await == 1;
            if root_is_hollow {
                info!("\nrepair removed all keys from root. Replacing root with it's child");
                let sole_child = repaired.read().await.children.children.read().await[0].clone();
                return Ok(sole_child);
            }
            return Ok(changes.add_node(repaired));
        }

        let still_deficient = repaired.read().await.keys.len() < min_keys(order);
        if !still_deficient {
            return Ok(changes.add_node(repaired));
        }

        underflowing = repaired;
        (parent, child_index) = path.pop().expect("path.is_empty() checked above");
    }
}
/// Compare-and-swap predicate that approves every stored value.
///
/// Passing it to `Tree::del_compare_and_swap` makes the deletion unconditional.
fn cas_always_true(_: &KeyValueData) -> bool {
    true
}
impl Tree {
    /// Deletes `key` if it is present and `compare_and_swap` accepts its current value.
    ///
    /// Returns:
    /// * `Ok(None)` if there is no root or `key` is not present;
    /// * `Ok(Some((false, seq)))` if `compare_and_swap` returned `false` (nothing is written);
    /// * `Ok(Some((true, seq)))` once the key has been removed and the changes written.
    ///
    /// `seq` is the sequence number `nearest_node` reports for the matched key.
    ///
    /// # Errors
    /// Propagates errors from loading nodes, rebalancing, and writing the resulting changes.
    pub async fn del_compare_and_swap(
        &self,
        key: &[u8],
        compare_and_swap: impl FnOnce(&KeyValueData) -> bool,
    ) -> Result<Option<(bool, u64)>, HyperbeeError> {
        let Some(root) = self.get_root(false).await? else {
            return Ok(None);
        };
        let (matched, mut path) = nearest_node(root, key).await?;
        let Some(seq) = matched else {
            return Ok(None);
        };

        // Let the caller veto the deletion based on the currently stored value.
        {
            let (node, index) = path
                .last()
                .expect("nearest_node always returns at least one node");
            let kv = node.read().await.get_key_value(*index).await?;
            if !compare_and_swap(&kv) {
                return Ok(Some((false, seq)));
            }
        }

        let mut changes = Changes::new(self.version().await, key, None);
        let (cur_node, cur_index) = path
            .pop()
            .expect("nearest_node always returns at least one node");
        cur_node.write().await.keys.remove(cur_index);
        if cur_node.read().await.is_leaf().await {
            info!("deleted key from leaf");
            // Restore the leaf so it is treated uniformly as the bottom of `path` below.
            path.push((cur_node, cur_index));
        } else {
            info!("deleted key from internal node");
            // Fill the hole with a key taken from a leaf. This extends `path` down to that leaf.
            Side::replace_removed_internal_key_with_a_key_from_leaf(&mut path, cur_node, cur_index)
                .await?;
        }

        let (bottom_node, _) = path.pop().expect("if/else above ensures path is not empty");
        // A root (empty `path`) may hold fewer than the minimum number of keys.
        let needs_rebalance =
            !path.is_empty() && bottom_node.read().await.keys.len() < min_keys(MAX_KEYS);
        let child_ref = if needs_rebalance {
            info!("del requires a rebalance");
            repair(&mut path, MAX_KEYS, &mut changes).await?
        } else {
            info!("del does not require a rebalance");
            changes.add_node(bottom_node)
        };

        if !path.is_empty() {
            info!("propagating changes");
            propagate_changes_up_tree(&mut changes, path, child_ref).await;
        }
        self.blocks.read().await.add_changes(changes).await?;
        Ok(Some((true, seq)))
    }

    /// Deletes `key` unconditionally.
    ///
    /// Returns `Ok(Some(seq))` for the deleted entry, or `Ok(None)` if `key` was not present.
    ///
    /// # Errors
    /// Propagates any error from [`Tree::del_compare_and_swap`].
    pub async fn del(&self, key: &[u8]) -> Result<Option<u64>, HyperbeeError> {
        let Some((deleted, seq)) = self.del_compare_and_swap(key, cas_always_true).await? else {
            return Ok(None);
        };
        assert!(deleted, "cas_always_true implies `deleted` always true");
        Ok(Some(seq))
    }
}
// Tests for key deletion.
//
// Most tests build a tree with `crate::test::hb_put!`, delete one or more keys, and assert the
// key can no longer be read. They finish with `check_tree`, which presumably validates the
// tree's structural invariants (defined in `crate::test`; confirm there).
#[cfg(test)]
mod test {
    use crate::{
        test::{check_tree, i32_key_vec, Rand},
        Hyperbee, Tree,
    };
    // Deleting from a freshly created, empty tree reports that nothing was deleted.
    #[tokio::test]
    async fn empty_tree_no_key() -> Result<(), Box<dyn std::error::Error>> {
        let hb = Tree::from_ram().await?;
        let key = vec![1];
        let res = hb.del(&key).await?;
        assert!(res.is_none());
        Ok(())
    }
    // Deleting a key that is not in a populated tree returns `None`.
    #[tokio::test]
    async fn no_key() -> Result<(), Box<dyn std::error::Error>> {
        let (hb, ..) = crate::test::hb_put!(0..10).await?;
        let key = vec![1];
        let res = hb.del(&key).await?;
        assert!(res.is_none());
        Ok(())
    }
    // With only a few keys the root is expected to be a leaf. Deleting one key removes only
    // that key.
    #[tokio::test]
    async fn delete_from_root_that_is_leaf() -> Result<(), Box<dyn std::error::Error>> {
        let (hb, keys) = crate::test::hb_put!(0..4).await?;
        let k = &keys[0].clone();
        let res = hb.del(k).await?;
        assert!(res.is_some());
        let res = hb.get(k).await?;
        assert_eq!(res, None);
        // A sibling key must still be readable.
        let res = hb.get(&keys[1].clone()).await?;
        assert!(res.is_some());
        check_tree(hb).await?;
        Ok(())
    }
    // Deletes the last inserted key from a ten-key tree.
    #[tokio::test]
    async fn delete_from_leaf_no_underflow() -> Result<(), Box<dyn std::error::Error>> {
        let (hb, keys) = crate::test::hb_put!(0..10).await?;
        let k = &keys.last().unwrap().clone();
        let res = hb.del(k).await?;
        assert!(res.is_some());
        let res = hb.get(k).await?;
        assert_eq!(res, None);
        check_tree(hb).await?;
        Ok(())
    }
    // Deleting the only key in the tree.
    #[tokio::test]
    async fn delete_last_key() -> Result<(), Box<dyn std::error::Error>> {
        let (hb, keys) = crate::test::hb_put!(0..1).await?;
        let k = &keys.last().unwrap().clone();
        let res = hb.del(k).await?;
        assert!(res.is_some());
        let res = hb.get(k).await?;
        assert_eq!(res, None);
        check_tree(hb).await?;
        Ok(())
    }
    // Named for the rebalance it is meant to exercise. Which repair path actually runs depends
    // on MAX_KEYS and the resulting tree shape.
    #[tokio::test]
    async fn delete_from_leaf_with_underflow_rotate_left() -> Result<(), Box<dyn std::error::Error>>
    {
        let (hb, keys) = crate::test::hb_put!(0..6).await?;
        let k = keys[0].clone();
        let res = hb.del(&k).await?;
        assert!(res.is_some());
        let res = hb.get(&k).await?;
        assert_eq!(res, None);
        check_tree(hb).await?;
        Ok(())
    }
    // Inserts keys out of order (0 last), then deletes the second-to-last inserted key.
    // Intended to exercise a rotation from the other side.
    #[tokio::test]
    async fn delete_from_leaf_with_underflow_rotate_right() -> Result<(), Box<dyn std::error::Error>>
    {
        let (hb, keys) = crate::test::hb_put!(&[1, 2, 3, 4, 5, 0]).await?;
        let k = keys[keys.len() - 2].clone();
        let res = hb.del(&k).await?;
        assert!(res.is_some());
        let res = hb.get(&k).await?;
        assert_eq!(res, None);
        check_tree(hb).await?;
        Ok(())
    }
    // Intended to exercise a merge after a leaf underflows.
    #[tokio::test]
    async fn delete_from_leaf_with_underflow_merge_left() -> Result<(), Box<dyn std::error::Error>>
    {
        let (hb, keys) = crate::test::hb_put!(0..5).await?;
        let k = keys[0].clone();
        let res = hb.del(&k).await?;
        assert!(res.is_some());
        let res = hb.get(&k).await?;
        assert_eq!(res, None);
        check_tree(hb).await?;
        Ok(())
    }
    // Intended to delete a key stored in an internal node, without causing underflow.
    #[tokio::test]
    async fn delete_from_internal_no_underflow() -> Result<(), Box<dyn std::error::Error>> {
        let (hb, keys) = crate::test::hb_put!(0..19).await?;
        let k = keys[5].clone();
        let res = hb.del(&k).await?;
        assert!(res.is_some());
        let res = hb.get(&k).await?;
        assert_eq!(res, None);
        check_tree(hb).await?;
        Ok(())
    }
    // Intended to delete an internal-node key whose replacement leaves a leaf needing a merge.
    #[tokio::test]
    async fn delete_from_internal_node_with_underflow_merge(
    ) -> Result<(), Box<dyn std::error::Error>> {
        let (hb, keys) = crate::test::hb_put!(0..19).await?;
        let k = keys[10].clone();
        let res = hb.del(&k).await?;
        assert!(res.is_some());
        let res = hb.get(&k).await?;
        assert_eq!(res, None);
        check_tree(hb).await?;
        Ok(())
    }
    // Same deletion as above on a larger tree. Intended to be repaired by a rotation instead.
    #[tokio::test]
    async fn delete_from_internal_node_with_underflow_rotate(
    ) -> Result<(), Box<dyn std::error::Error>> {
        let (hb, keys) = crate::test::hb_put!(0..25).await?;
        let k = keys[10].clone();
        let res = hb.del(&k).await?;
        assert!(res.is_some());
        let res = hb.get(&k).await?;
        assert_eq!(res, None);
        check_tree(hb).await?;
        Ok(())
    }
    // Regression test: delete every key in insertion order, checking the tree after each step.
    #[tokio::test]
    async fn bug_where_root_was_not_getting_replaced() -> Result<(), Box<dyn std::error::Error>> {
        let (hb, keys) = crate::test::hb_put!(0..5).await?;
        for k in keys.iter() {
            hb.del(k).await?;
            let res = hb.get(k).await?;
            assert_eq!(res, None);
            check_tree(hb.clone()).await?;
        }
        Ok(())
    }
    // Inserts 100 keys in shuffled order, then deletes them in a second shuffled order. The
    // tree is checked after each deletion.
    #[tokio::test]
    async fn rand_delete() -> Result<(), Box<dyn std::error::Error>> {
        let rand = Rand::default();
        let hb = Tree::from_ram().await?;
        let keys: Vec<Vec<u8>> = (0..100).map(i32_key_vec).collect();
        let keys = rand.shuffle(keys);
        for k in keys.iter() {
            let val: Option<&[u8]> = Some(k);
            hb.put(k, val).await?;
        }
        for k in rand.shuffle(keys).iter() {
            hb.del(k).await?;
            let res = hb.get(k).await?;
            assert_eq!(res, None);
            check_tree(hb.clone()).await?;
        }
        Ok(())
    }
    // Covers every outcome of `del_compare_and_swap`:
    // * empty tree and absent key -> `None`, whatever the predicate returns;
    // * predicate `false` -> `(false, seq)` and the key remains;
    // * predicate `true` -> `(true, seq)` and the key is removed.
    // `1` is the seq reported for `foo` after the single put.
    #[tokio::test]
    async fn test_del_compare_and_swap() -> Result<(), Box<dyn std::error::Error>> {
        let hb = Hyperbee::from_ram().await?;
        let k = b"foo";
        let res = hb.del_compare_and_swap(k, |_old| false).await?;
        assert_eq!(res, None);
        let _ = hb.put(k, None).await?;
        let res = hb.del_compare_and_swap(b"no_key", |_| false).await?;
        assert_eq!(res, None);
        let res = hb.del_compare_and_swap(b"no_key", |_| true).await?;
        assert_eq!(res, None);
        let res = hb.del_compare_and_swap(k, |_old| false).await?;
        assert_eq!(res, Some((false, 1)));
        assert!(hb.get(k).await?.is_some());
        let res = hb.del_compare_and_swap(k, |_old| true).await?;
        assert_eq!(res, Some((true, 1)));
        assert!(hb.get(k).await?.is_none());
        Ok(())
    }
}