use crate::{
rc, Cursor, Db, Error, MutTxn, PageCursor, PageItem, PageT, Representable, Transaction,
MAX_RECORD_SIZE, N_CURSORS, PAGE_SIZE_I16, PAGE_SIZE_U16,
};
use crate::skiplist;
use crate::skiplist::{record_size, FIRST_BINDING_SIZE, NIL, SKIPLIST_ROOT};
use crate::transaction::{Cow, Env, Exclusive, MutPage};
use rand::Rng;
use std;
use std::borrow::Borrow;
#[derive(Debug)]
enum Deleted<K, V> {
Merged {
page: u64,
left_free: u64,
right_free: u64,
middle_ptr: *const u8,
former_binding_len: u16,
},
Rebalanced {
key: K,
value: V,
left: u64,
right: u64,
middle_ptr: *const u8,
former_binding_len: u16,
},
Split {
key: K,
value: V,
left: u64,
right: u64,
old_page: u64,
},
Leaf {
deleted_ptr: *const u8,
former_binding_len: u16,
},
Updated {
page: u64,
replaced_by: u64,
},
}
impl<K: Representable, V: Representable> Deleted<K, V> {
fn size_difference(&self) -> i16 {
match *self {
Deleted::Merged {
former_binding_len, ..
}
| Deleted::Leaf {
former_binding_len, ..
} => -(former_binding_len as i16),
Deleted::Rebalanced {
former_binding_len,
key,
value,
..
} => record_size(key, value) as i16 - (former_binding_len as i16),
Deleted::Split { key, value, .. } => record_size(key, value) as i16,
Deleted::Updated { .. } => 0,
}
}
}
#[derive(Debug)]
struct Delete<K, V> {
cursor: Cursor,
last_operation: Deleted<K, V>,
free_stack: [[u64; 2]; N_CURSORS],
rc_level: usize,
}
impl<K, V> std::ops::Index<usize> for Delete<K, V> {
type Output = PageCursor;
fn index(&self, i: usize) -> &PageCursor {
self.cursor.stack.index(i)
}
}
impl<K, V> std::ops::IndexMut<usize> for Delete<K, V> {
fn index_mut(&mut self, i: usize) -> &mut PageCursor {
self.cursor.stack.index_mut(i)
}
}
impl<K, V> Delete<K, V> {
fn new(cursors: Cursor) -> Self {
Delete {
cursor: cursors,
last_operation: Deleted::Updated {
page: 0,
replaced_by: 0,
},
free_stack: [[0; 2]; N_CURSORS],
rc_level: 200,
}
}
fn current(&mut self) -> &mut PageCursor {
&mut self.cursor.stack[self.cursor.pointer]
}
}
#[derive(Copy, Clone, Debug)]
struct Replacement<K, V> {
page: u64,
k: K,
v: V,
deleted_ptr: *const u8,
deleted_len: u16,
}
#[derive(Debug, Clone, Copy)]
enum ReplOn {
None,
Left,
Right,
Middle,
}
impl<E: Borrow<Env<Exclusive>>, T> MutTxn<E, T> {
fn find_smallest_descendant<K, V>(&mut self, del: &mut Delete<K, V>) -> Result<(), Error> {
let mut page = self.load_cow_page(del.current().page)?;
let next = skiplist::next_at_level(&page, del.current()[0], 0);
let right_page = skiplist::right_child(&page, next);
if right_page > 0 {
del.cursor.pointer += 1;
del.current().page = right_page;
del.current().cursor.reset();
loop {
page = self.load_cow_page(del.current().page)?;
if del.cursor.first_rc_level >= N_CURSORS
&& rc(self, del[del.cursor.pointer].page)? >= 2
{
debug!(
"first_rc_level: {:?} {:?}",
del.cursor.pointer, del.cursor.stack[del.cursor.pointer].page
);
del.cursor.first_rc_level = del.cursor.pointer
}
let left_page = skiplist::right_child(&page, SKIPLIST_ROOT);
if left_page == 0 {
break;
} else {
del.cursor.pointer += 1;
del.current().page = left_page;
del.current().cursor.reset()
}
}
}
Ok(())
}
pub fn del<R: Rng, K: Representable, V: Representable>(
&mut self,
rng: &mut R,
db: &mut Db<K, V>,
key: K,
value: Option<V>,
) -> Result<bool, Error> {
info!("del starting");
let mut del = {
let (stack, found) = self.set_cursors(db, Some((key, value)))?;
if !found {
return Ok(false);
}
Delete::new(stack)
};
let cursor_pointer0 = del.cursor.pointer;
debug!("stack {:?}", &del.cursor.stack[..=del.cursor.pointer]);
self.find_smallest_descendant(&mut del)?;
let page = self.load_cow_page(del.current().page)?;
debug!(
"cursor_pointer: {:?} {:?}",
del.cursor.pointer, cursor_pointer0
);
if del.cursor.pointer == cursor_pointer0 {
let next = skiplist::next_at_level(&page, del.current()[0], 0);
let (key, value) = unsafe { skiplist::read_key_value::<_, K, V>(&page, next) };
info!("deleting at leaf: {:?} {:?}", key, value,);
del.last_operation = Deleted::Leaf {
deleted_ptr: page.offset(next as isize),
former_binding_len: record_size(key, value),
};
if cursor_pointer0 < del.cursor.first_rc_level {
info!("decr_value {:?} {:?}", key, value);
key.drop_value(self, rng)?;
value.drop_value(self, rng)?;
info!("/decr_value");
}
del.cursor.pointer -= 1;
while del.cursor.pointer >= 1 {
self.merge_or_rebalance(rng, &mut del, None)?;
del.cursor.pointer -= 1;
}
self.update_del(rng, &mut del, None)?;
self.update_root(rng, db, &mut del)?;
} else {
let first = skiplist::next_at_level(&page, SKIPLIST_ROOT, 0);
let (key, value) = unsafe { skiplist::read_key_value::<_, K, V>(&page, first) };
info!(
"deleting internal key, replacement size: {:?} {:?}",
key.onpage_size(),
value.onpage_size()
);
del.last_operation = Deleted::Leaf {
deleted_ptr: page.offset(first as isize),
former_binding_len: record_size(key, value),
};
if del.cursor.pointer >= del.cursor.first_rc_level {
for offset in key.page_offsets() {
self.incr_rc(rng, offset)?
}
for offset in value.page_offsets() {
self.incr_rc(rng, offset)?
}
}
let repl = {
let page0 = self.load_cow_page(del[cursor_pointer0].page)?;
debug!("reading at {:?}", del[cursor_pointer0][0]);
let ptr0 = skiplist::next_at_level(&page0, del[cursor_pointer0][0], 0);
let (k0, v0) = { unsafe { skiplist::read_key_value::<_, K, V>(&page0, ptr0) } };
info!(
"deleting k0, v0: {:?} {:?}",
k0.onpage_size(),
v0.onpage_size()
);
if cursor_pointer0 < del.cursor.first_rc_level {
info!("decr_value {:?} {:?}", key, value);
k0.drop_value(self, rng)?;
v0.drop_value(self, rng)?;
info!("/decr_value");
}
Replacement {
page: del[cursor_pointer0].page,
k: key,
v: value,
deleted_ptr: page0.offset(ptr0 as isize),
deleted_len: record_size(k0, v0),
}
};
debug!(
"Internal, updating {:?}",
&del.cursor.stack[..=del.cursor.pointer]
);
del.cursor.pointer -= 1;
while del.cursor.pointer >= 1 {
info!(
"merge_or_rebalance, del.cursor.pointer = {:?}",
del.cursor.pointer
);
self.merge_or_rebalance(rng, &mut del, Some(repl))?;
del.cursor.pointer -= 1;
}
debug!("Internal, done updating");
self.update_del(rng, &mut del, Some(repl))?;
self.update_root(rng, db, &mut del)?;
}
debug!("stack {:?}", del.cursor.stack[1]);
unsafe {
let level = del.cursor.first_rc_level + 1;
for freed in del.free_stack.iter().take(level) {
if freed[0] > 0 {
self.rc_decr_del(rng, freed[0])?;
if freed[1] > 0 {
assert!(freed[0] != freed[1]);
self.rc_decr_del(rng, freed[1])?
}
}
}
}
Ok(true)
}
fn merge_or_rebalance<R: Rng, K: Representable, V: Representable>(
&mut self,
rng: &mut R,
del: &mut Delete<K, V>,
replacement: Option<Replacement<K, V>>,
) -> Result<(), Error> {
debug!("merge or rebalance");
let mut cursor_was_moved_backwards = false;
let page = self.load_cow_page(del.current().page)?;
debug!("page = {:?}", page);
let mut next = skiplist::next_at_level(&page, del.current()[0], 0);
if next == NIL {
cursor_was_moved_backwards = true;
debug!("moving backwards in page {:?}", page);
let backwards = skiplist::move_cursor_backward(&page, &mut del.current().cursor);
assert!(backwards);
next = skiplist::next_at_level(&page, del.current()[0], 0)
}
let page_a = skiplist::right_child(&page, del.current()[0]);
let page_a = self.load_cow_page(page_a)?;
let page_b = skiplist::right_child(&page, next);
let page_b = self.load_cow_page(page_b)?;
let next_ptr: *const u8 = page.offset(next as isize);
let (middle_key, middle_value) = unsafe { skiplist::read_key_value(&page, next) };
let middle_size = record_size(middle_key, middle_value);
let (repl_pos, repl_diff) = match (&del.last_operation, replacement) {
(_, Some(repl)) if repl.deleted_ptr == next_ptr => {
let diff = record_size(repl.k, repl.v) as i16 - repl.deleted_len as i16;
(ReplOn::Middle, diff)
}
(Deleted::Merged { .. }, _) | (Deleted::Rebalanced { .. }, _) => {
(ReplOn::None, 0)
}
(_, Some(repl)) => {
let diff = record_size(repl.k, repl.v) as i16 - repl.deleted_len as i16;
if repl.page == page_a.page_offset() {
(ReplOn::Left, diff)
} else if repl.page == page_b.page_offset() {
(ReplOn::Right, diff)
} else {
(ReplOn::None, 0)
}
}
_ => {
(ReplOn::None, 0)
}
};
let new_total_len = {
(skiplist::occupied(&page_a) + skiplist::occupied(&page_b) + middle_size) as i16
+ del.last_operation.size_difference()
+ repl_diff
- FIRST_BINDING_SIZE as i16
};
debug!(
"page_a = {:?}, page_b = {:?}, middle = {:?}, last.size_diff = {:?}, repl_diff = {:?}",
skiplist::occupied(&page_a),
skiplist::occupied(&page_b),
middle_size,
del.last_operation.size_difference(),
repl_diff
);
debug!("repl = {:?} {:?}", repl_pos, repl_diff);
debug!("last_operation = {:?}", del.last_operation);
if new_total_len as u16 <= PAGE_SIZE_U16 {
let new_page = self.merge(
rng,
page_a,
page_b,
next_ptr,
middle_key,
middle_value,
del,
replacement,
)?;
let new_page = self.load_cow_page(new_page)?;
assert_eq!(skiplist::occupied(&new_page), new_total_len as u16);
return Ok(());
}
let (new_a, new_b) = if cursor_was_moved_backwards {
(
skiplist::occupied(&page_a) as i16
+ if let ReplOn::Left = repl_pos {
repl_diff
} else {
0
},
skiplist::occupied(&page_b) as i16
+ if let ReplOn::Right = repl_pos {
repl_diff
} else {
0
}
+ del.last_operation.size_difference(),
)
} else {
(
skiplist::occupied(&page_a) as i16
+ if let ReplOn::Left = repl_pos {
repl_diff
} else {
0
}
+ del.last_operation.size_difference(),
skiplist::occupied(&page_b) as i16
+ if let ReplOn::Right = repl_pos {
repl_diff
} else {
0
},
)
};
if new_a as u16 <= (MAX_RECORD_SIZE + FIRST_BINDING_SIZE)
|| new_b as u16 <= (MAX_RECORD_SIZE + FIRST_BINDING_SIZE)
{
self.rebalance(
rng,
page_a,
page_b,
next_ptr,
middle_key,
middle_value,
new_total_len as u16 + FIRST_BINDING_SIZE,
del,
replacement,
)?;
return Ok(());
}
if cursor_was_moved_backwards {
let forward = skiplist::move_cursor_forward(&page, &mut del.current().cursor);
assert!(forward)
}
self.update_del(rng, del, replacement)
}
fn update_root<R: Rng, K: Representable, V: Representable>(
&mut self,
rng: &mut R,
db: &mut Db<K, V>,
del: &mut Delete<K, V>,
) -> Result<(), Error> {
debug!("UPDATE ROOT: {:?}", del.last_operation);
debug!("stack: {:?}", &del.cursor.stack[..(del.cursor.pointer + 1)]);
match del.last_operation {
Deleted::Split {
key,
value,
left,
right,
..
} => {
db.0 = self.split_root(rng, &mut del.cursor, left, right, key, value)?;
debug!("Split {:?}", db.0);
}
Deleted::Updated { replaced_by, .. } => {
db.0 = replaced_by;
let root = self.load_cow_page(db.0)?;
let next = skiplist::next_at_level(&root, SKIPLIST_ROOT, 0);
debug!(
"next = {:?} {:?}",
root,
skiplist::next_at_level(&root, next, 0)
);
if skiplist::next_at_level(&root, SKIPLIST_ROOT, 0) == NIL {
let right = skiplist::right_child(&root, SKIPLIST_ROOT);
debug!("freeing");
if right > 0 {
del.free_stack[1][0] = del[1].page;
db.0 = right
}
}
}
_ => {
}
}
Ok(())
}
fn update_del<R: Rng, K: Representable, V: Representable>(
&mut self,
rng: &mut R,
del: &mut Delete<K, V>,
replacement: Option<Replacement<K, V>>,
) -> Result<(), Error> {
let mut page = self.load_cow_page(del[del.cursor.pointer + 1].page)?;
debug!(
"pointers/rc: {:?} {:?}",
del.cursor.pointer, del.cursor.first_rc_level
);
debug!("update_del {:?}", page);
match page {
Cow::MutPage(ref mut p) if del.cursor.pointer + 1 < del.cursor.first_rc_level => {
debug!("mut page, no RC");
self.update_mut(rng, del, replacement, p)?
}
Cow::MutPage(ref p) => {
debug!("mut page, RC");
self.update_const(rng, del, replacement, p)?
}
Cow::Page(ref p) => {
debug!("const page");
self.update_const(rng, del, replacement, p)?
}
}
Ok(())
}
fn update_mut<R: Rng, K: Representable, V: Representable>(
&mut self,
rng: &mut R,
del: &mut Delete<K, V>,
replacement: Option<Replacement<K, V>>,
p: &mut MutPage,
) -> Result<(), Error> {
match del.last_operation {
Deleted::Leaf { .. } => {
debug_assert_eq!(p.offset, del[del.cursor.pointer + 1].page);
self.skiplist_cursor_delete_next::<K, V>(p, &del[del.cursor.pointer + 1].cursor);
del.last_operation = Deleted::Updated {
page: 0,
replaced_by: p.page_offset(),
}
}
Deleted::Merged { page, .. } => {
debug_assert_eq!(p.offset, del[del.cursor.pointer + 1].page);
self.skiplist_cursor_delete_next::<K, V>(p, &del[del.cursor.pointer + 1].cursor);
self.set_right_child(p, del[del.cursor.pointer + 1][0], page);
del.last_operation = Deleted::Updated {
page: 0,
replaced_by: p.page_offset(),
}
}
Deleted::Updated { replaced_by, .. } => {
self.update_mut_updated(rng, del, replacement, p, replaced_by)?
}
Deleted::Rebalanced {
key,
value,
right,
left,
former_binding_len,
..
} => {
self.update_mut_rebalance(rng, del, key, value, right, left, former_binding_len, p)?
}
Deleted::Split {
key,
value,
right,
left,
..
} => {
debug!("update_mut, split, {:?} {:?}", left, right);
debug!("current = {:?}", del.cursor.pointer);
let size_diff = record_size(key, value) as i16
+ (if let Some(repl) = replacement {
if repl.page == p.offset {
record_size(repl.k, repl.v) as i16 - repl.deleted_len as i16
} else {
0
}
} else {
0
});
if skiplist::occupied(p) as i16 + size_diff < PAGE_SIZE_I16 {
if skiplist::first_free(p) as i16 + size_diff < PAGE_SIZE_I16 {
debug_assert_eq!(p.offset, del[del.cursor.pointer + 1].page);
{
let ref mut skip_cursor =
del.cursor.stack[del.cursor.pointer + 1].cursor;
if let Some(repl) = replacement {
if repl.page == p.offset {
self.skiplist_cursor_delete_next::<K, V>(p, &skip_cursor);
self.skiplist_insert_after_cursor(
p,
rng,
skip_cursor,
repl.k,
repl.v,
left,
);
}
}
}
self.set_right_child(p, del[del.cursor.pointer + 1][0], left);
{
let ref mut skip_cursor =
del.cursor.stack[del.cursor.pointer + 1].cursor;
self.skiplist_insert_after_cursor(
p,
rng,
skip_cursor,
key,
value,
right,
);
}
del.last_operation = Deleted::Updated {
page: 0,
replaced_by: p.page_offset(),
};
} else {
debug!("update_mut, needs compaction");
self.update_const(rng, del, replacement, p)?
}
} else {
debug!("cascading split");
self.split(rng, p, del, replacement)?;
}
}
}
Ok(())
}
fn update_const<R: Rng, P: PageT + Sized, K: Representable, V: Representable>(
&mut self,
rng: &mut R,
del: &mut Delete<K, V>,
replacement: Option<Replacement<K, V>>,
p: &P,
) -> Result<(), Error> {
let it_size = skiplist::occupied(p) as i16
+ del.last_operation.size_difference()
+ (if let Some(repl) = replacement {
if repl.page == p.page_offset() {
record_size(repl.k, repl.v) as i16
} else {
0
}
} else {
0
});
if it_size <= PAGE_SIZE_I16 {
let new_page = {
debug!("it on page {:?}", p.page_offset());
let incr_children_rc = del.cursor.pointer + 1 >= del.cursor.first_rc_level;
debug!("{:?} {:?}", del.cursor.pointer, del.cursor.first_rc_level);
let it = skiplist::iter_all(p).map(|(a, b, c)| PageItem {
ptr: a,
kv: b,
right: c,
incr_right_rc: incr_children_rc,
incr_kv_rc: incr_children_rc,
});
let it = WithChanges::new(del, it, replacement);
self.spread_on_one_page(rng, it)?
};
debug!("new_page: {:?}", new_page);
let curs = del.cursor.pointer;
del[curs + 1].page = new_page;
debug!("curs: {:?}", &del.cursor.stack[..10]);
del.free_stack[del.cursor.pointer + 1][0] = p.page_offset();
del.last_operation = Deleted::Updated {
page: p.page_offset(),
replaced_by: new_page,
};
Ok(())
} else {
debug!("update_const, splitting");
self.split(rng, p, del, replacement)
}
}
fn update_mut_updated<R: Rng, K: Representable, V: Representable>(
&mut self,
rng: &mut R,
del: &mut Delete<K, V>,
replacement: Option<Replacement<K, V>>,
p: &mut MutPage,
replaced_by: u64,
) -> Result<(), Error> {
debug!("update_mut_no_operation, page = {:?}", p.offset);
debug!("replacement = {:?}", replacement);
if let Some(repl) = replacement {
if p.offset == repl.page {
let size = record_size(repl.k, repl.v);
if skiplist::occupied(p) - repl.deleted_len + size <= PAGE_SIZE_U16 {
if skiplist::first_free(p) + size < PAGE_SIZE_U16 {
let pcurs = del.cursor.pointer;
{
let ref mut skip_cursor = del[pcurs + 1].cursor;
self.skiplist_cursor_delete_next::<K, V>(p, &skip_cursor);
self.skiplist_insert_after_cursor(
p,
rng,
skip_cursor,
repl.k,
repl.v,
replaced_by,
);
}
del.last_operation = Deleted::Updated {
page: 0,
replaced_by: p.page_offset(),
}
} else {
debug!("compact and delete");
self.update_const(rng, del, replacement, p)?
}
} else {
self.split(rng, p, del, replacement)?;
}
return Ok(());
}
}
let cur = del[del.cursor.pointer + 1][0];
self.set_right_child(p, cur, replaced_by);
del.last_operation = Deleted::Updated {
page: 0,
replaced_by: p.page_offset(),
};
Ok(())
}
fn update_mut_rebalance<R: Rng, K: Representable, V: Representable>(
&mut self,
rng: &mut R,
del: &mut Delete<K, V>,
key: K,
value: V,
right: u64,
left: u64,
former_binding_len: u16,
p: &mut MutPage,
) -> Result<(), Error> {
debug!("update_mut, rebalance");
let size = record_size(key, value);
if skiplist::occupied(p) + size - former_binding_len < PAGE_SIZE_U16 {
if skiplist::first_free(p) + size < PAGE_SIZE_U16 {
debug_assert_eq!(p.offset, del[del.cursor.pointer + 1].page);
self.skiplist_cursor_delete_next::<K, V>(p, &del[del.cursor.pointer + 1].cursor);
self.set_right_child(p, del[del.cursor.pointer + 1][0], left);
{
let ref mut skip_cursor = del.cursor.stack[del.cursor.pointer + 1].cursor;
self.skiplist_insert_after_cursor(p, rng, skip_cursor, key, value, right);
}
del.last_operation = Deleted::Updated {
page: 0,
replaced_by: p.page_offset(),
}
} else {
debug!("update_mut, needs compaction");
self.update_const(rng, del, None, p)?
}
} else {
self.split(rng, p, del, None)?
}
Ok(())
}
unsafe fn rc_decr_del<R: Rng>(&mut self, rng: &mut R, page: u64) -> Result<(), Error> {
let rc = rc(self, page)?;
debug!("rc_decr_del: {:?} {:?}", page, rc);
if rc == 1 {
self.remove_rc(rng, page)?;
self.free_page(page)
} else if rc == 0 {
self.free_page(page)
} else {
self.set_rc(rng, page, rc - 1)?
}
Ok(())
}
fn split<R: Rng, P: PageT + Sized, K: Representable, V: Representable>(
&mut self,
rng: &mut R,
p: &P,
del: &mut Delete<K, V>,
replacement: Option<Replacement<K, V>>,
) -> Result<(), Error> {
debug!("splitting {:?}", del[del.cursor.pointer + 1].page);
debug!("last = {:?}", del.last_operation);
let (left, right, key, value) = {
let incr_children_rc = del.cursor.pointer + 1 >= del.cursor.first_rc_level;
let it = skiplist::iter_all(p).map(|(a, b, c)| PageItem {
ptr: a,
kv: b,
right: c,
incr_right_rc: incr_children_rc,
incr_kv_rc: incr_children_rc,
});
let it = WithChanges::new(del, it, replacement);
self.spread_on_two_pages(rng, it, skiplist::occupied(p))?
};
debug!("split onto {:?} and {:?}", left, right);
del.free_stack[del.cursor.pointer + 1][0] = p.page_offset();
del.last_operation = Deleted::Split {
key,
value,
left,
right,
old_page: p.page_offset(),
};
Ok(())
}
fn merge<R: Rng, K: Representable, V: Representable>(
&mut self,
rng: &mut R,
page_a: Cow,
page_b: Cow,
middle_ptr: *const u8,
middle_key: K,
middle_value: V,
del: &mut Delete<K, V>,
replacement: Option<Replacement<K, V>>,
) -> Result<u64, Error> {
let incr_middle = del.cursor.pointer >= del.cursor.first_rc_level;
let incr_a_children = incr_middle || rc(self, page_a.page_offset())? >= 2;
let incr_b_children = incr_middle || rc(self, page_b.page_offset())? >= 2;
let it_a = skiplist::iter_all(&page_a).map(|(ptr, kv, right)| PageItem {
ptr,
kv,
right,
incr_right_rc: incr_a_children,
incr_kv_rc: incr_a_children,
});
let mut it_b = skiplist::iter_all(&page_b).map(|(ptr, kv, right)| PageItem {
ptr,
kv,
right,
incr_right_rc: incr_b_children,
incr_kv_rc: incr_b_children,
});
debug!(
"MERGE {:?} {:?} {:?}",
del.cursor.pointer, del.cursor.first_rc_level, middle_key
);
let b0 = it_b.next().unwrap();
debug_assert!(b0.kv.is_none());
let new_page = {
let it = it_a
.chain(std::iter::once(PageItem {
ptr: middle_ptr,
kv: Some((middle_key, middle_value)),
right: b0.right,
incr_kv_rc: incr_middle,
incr_right_rc: incr_b_children,
}))
.chain(it_b);
let it = WithChanges::new(del, it, replacement);
self.spread_on_one_page(rng, it)?
};
debug!("finished {:?}", new_page);
del.free_stack[del.cursor.pointer + 1][0] = page_a.page_offset();
del.free_stack[del.cursor.pointer + 1][1] = page_b.page_offset();
del.last_operation = Deleted::Merged {
page: new_page,
left_free: page_a.page_offset(),
right_free: page_b.page_offset(),
middle_ptr: middle_ptr,
former_binding_len: record_size(middle_key, middle_value),
};
Ok(new_page)
}
fn rebalance<R: Rng, K: Representable, V: Representable>(
&mut self,
rng: &mut R,
page_a: Cow,
page_b: Cow,
middle_ptr: *const u8,
middle_key: K,
middle_value: V,
total_child_size: u16,
del: &mut Delete<K, V>,
replacement: Option<Replacement<K, V>>,
) -> Result<(), Error> {
let incr_middle = del.cursor.pointer >= del.cursor.first_rc_level;
let incr_a_children = incr_middle || rc(self, page_a.page_offset())? >= 2;
let incr_b_children = incr_middle || rc(self, page_b.page_offset())? >= 2;
let it_a = skiplist::iter_all(&page_a).map(|(ptr, kv, right)| PageItem {
ptr,
kv,
right,
incr_right_rc: incr_a_children,
incr_kv_rc: incr_a_children,
});
let mut it_b = skiplist::iter_all(&page_b).map(|(ptr, kv, right)| PageItem {
ptr,
kv,
right,
incr_right_rc: incr_b_children,
incr_kv_rc: incr_b_children,
});
let b0 = it_b.next().unwrap();
debug_assert!(b0.kv.is_none());
debug!(
"REBALANCE {:?} {:?}",
del.cursor.pointer, del.cursor.first_rc_level
);
let (new_a, new_b, k, v) = {
let it = it_a
.chain(std::iter::once(PageItem {
ptr: middle_ptr,
kv: Some((middle_key, middle_value)),
right: b0.right,
incr_right_rc: incr_b_children,
incr_kv_rc: incr_middle,
}))
.chain(it_b);
let it = WithChanges::new(del, it, replacement);
self.spread_on_two_pages(rng, it, total_child_size)?
};
debug!(
"rebalanced {:?} {:?} onto {:?} {:?}",
page_a.page_offset(),
page_b.page_offset(),
new_a,
new_b
);
del.free_stack[del.cursor.pointer + 1][0] = page_a.page_offset();
del.free_stack[del.cursor.pointer + 1][1] = page_b.page_offset();
del.last_operation = Deleted::Rebalanced {
key: k,
value: v,
left: new_a,
right: new_b,
former_binding_len: record_size(middle_key, middle_value),
middle_ptr,
};
Ok(())
}
}
struct WithChanges<'b, I: Iterator, K: 'b, V: 'b> {
del: &'b mut Delete<K, V>,
next: Option<PageItem<K, V>>,
it: std::iter::Peekable<I>,
replacement: Option<Replacement<K, V>>,
}
impl<'b, K, V, I: Iterator> WithChanges<'b, I, K, V> {
fn new(
del: &'b mut Delete<K, V>,
it: I,
replacement: Option<Replacement<K, V>>,
) -> WithChanges<'b, I, K, V> {
WithChanges {
del: del,
next: None,
it: it.peekable(),
replacement: replacement,
}
}
}
impl<'b, K: Representable, V: Representable, P: Iterator<Item = PageItem<K, V>>> Iterator
for WithChanges<'b, P, K, V>
{
type Item = PageItem<K, V>;
fn next(&mut self) -> Option<Self::Item> {
let result = self.iter_next();
debug!("{:?}", result);
result
}
}
impl<'b, K: Representable, V: Representable, P: Iterator<Item = PageItem<K, V>>>
WithChanges<'b, P, K, V>
{
fn iter_next(&mut self) -> Option<PageItem<K, V>> {
loop {
if let Some(next) = self.next.take() {
return Some(next);
}
let mut item = if let Some(next) = self.it.next() {
next
} else {
return None;
};
if let Some(replacement) = self.replacement.take() {
if item.ptr == replacement.deleted_ptr {
item.kv = Some((replacement.k, replacement.v))
} else {
self.replacement = Some(replacement)
}
}
match self.del.last_operation {
Deleted::Rebalanced {
key,
value,
right,
left,
middle_ptr,
..
} => {
debug!(
"Rebalanced {:?} {:?} {:?} {:?} {:?}",
key, value, right, left, middle_ptr
);
if middle_ptr == item.ptr {
debug_assert!(item.kv.is_some());
return Some(PageItem {
ptr: std::ptr::null(),
kv: Some((key, value)),
right,
incr_kv_rc: false,
incr_right_rc: false,
});
} else if let Some(item_) = self.it.peek() {
if middle_ptr == item_.ptr {
return Some(PageItem {
ptr: std::ptr::null(),
kv: item.kv,
right: left,
incr_kv_rc: false,
incr_right_rc: false,
});
}
}
}
Deleted::Split {
left,
key,
value,
right,
old_page,
..
} => {
debug!("split {:?} {:?} {:?}", item.ptr, item.right, key);
if old_page == item.right {
self.next = Some(PageItem {
ptr: std::ptr::null(),
kv: Some((key, value)),
right,
incr_right_rc: false,
incr_kv_rc: false,
});
return Some(PageItem {
ptr: std::ptr::null(),
kv: item.kv,
right: left,
incr_right_rc: false,
incr_kv_rc: false,
});
}
}
Deleted::Merged {
page, middle_ptr, ..
} => {
debug!("merged {:?} {:?}", middle_ptr, item.ptr);
if item.ptr == middle_ptr {
continue;
} else if let Some(item_) = self.it.peek() {
if item_.ptr == middle_ptr {
return Some(PageItem {
ptr: std::ptr::null(),
kv: item.kv,
right: page,
incr_kv_rc: item.incr_kv_rc,
incr_right_rc: false,
});
}
}
}
Deleted::Updated { page, replaced_by } => {
debug!(
"Updated {:?} {:?} {:?}",
page, replaced_by, item.incr_right_rc
);
if item.right == page {
return Some(PageItem {
ptr: std::ptr::null(),
kv: item.kv,
right: replaced_by,
incr_kv_rc: item.incr_kv_rc,
incr_right_rc: false,
});
} else {
return Some(item);
}
}
Deleted::Leaf { deleted_ptr, .. } => {
debug!("Deleted? {:?} {:?}", item.ptr, deleted_ptr);
if item.ptr == deleted_ptr {
continue;
}
}
}
return Some(item);
}
}
}