use crate::bplustree::node::NodeId;
use crate::keyfmt::KeyBlockFormat; use crate::keyfmt::KeyFormat; use crate::keyfmt::ScratchBuf;
use crate::keyfmt::raw;
use crate::keyfmt::resolve_key_format; use crate::layout::PAGE_SIZE;
use crate::page::INTERNAL_NODE_TAG;
use crate::page::PageError;
use zerocopy::{AsBytes, FromBytes, FromZeroes};
use std::convert::TryInto;
use std::fmt;
use std::fmt::Debug;
/// Decodes a little-endian `u64` from `buf`.
///
/// # Panics
/// Panics if `buf` is not exactly eight bytes long.
#[inline]
fn read_u64_le(buf: &[u8]) -> u64 {
    let bytes: [u8; 8] = buf.try_into().unwrap();
    u64::from_le_bytes(bytes)
}
/// Writes `v` as eight little-endian bytes into `buf` starting at `off`.
///
/// # Panics
/// Panics if `buf` does not contain eight bytes starting at `off`.
#[inline]
fn write_u64_le(buf: &mut [u8], off: usize, v: u64) {
    let end = off + std::mem::size_of::<u64>();
    buf[off..end].copy_from_slice(&v.to_le_bytes());
}
/// Fixed-size header stored at the start of every [`InternalPage`].
///
/// `#[repr(C)]` keeps the fields in declaration order so the struct can be
/// viewed as raw bytes through the `zerocopy` derives.
#[repr(C)]
#[derive(Clone, Copy, AsBytes, FromZeroes, FromBytes, Debug)]
pub struct Header {
/// Node type tag; `InternalPage::new` stores `INTERNAL_NODE_TAG` here.
kind: u8,
/// Id of the key block format, resolved through `resolve_key_format`.
keyfmt_id: u8,
/// Number of separator keys held by the page.
key_count: u16,
/// Length in bytes of the encoded key block at the start of the payload.
key_block_len: u16,
}
/// Size in bytes of one child id slot in the child directory.
const CHILD_ID_SIZE: usize = core::mem::size_of::<NodeId>();
/// Size in bytes of the page [`Header`].
pub(crate) const HEADER_SIZE: usize = std::mem::size_of::<Header>();
/// Payload bytes left in a page once the header is subtracted.
pub(crate) const BUFFER_SIZE: usize = PAGE_SIZE - HEADER_SIZE;
/// Internal (non-leaf) B+tree page: a [`Header`] followed by a payload buffer.
///
/// The payload holds the encoded key block at offset `keys_start()`,
/// immediately followed by `key_count + 1` child ids of `CHILD_ID_SIZE`
/// bytes each (see `children_base` / `children_len`).
#[repr(C)]
#[derive(Clone, Copy, AsBytes, FromZeroes, FromBytes)]
pub struct InternalPage {
/// Page metadata (type tag, key format id, key count, key block length).
header: Header,
/// Payload: key block first, child id directory right after it.
buf: [u8; BUFFER_SIZE],
}
impl InternalPage {
pub fn new(keyfmt_id: KeyFormat) -> Self {
InternalPage {
header: Header {
kind: INTERNAL_NODE_TAG,
keyfmt_id: keyfmt_id.id(),
key_count: 0u16,
key_block_len: 0u16,
},
buf: [0u8; BUFFER_SIZE],
}
}
/// Reinterprets a raw page buffer as an `InternalPage` without copying.
///
/// # Errors
/// Returns `PageError::FromBytesError` when `zerocopy` rejects the buffer
/// (NOTE(review): per the zerocopy docs this covers size or alignment
/// mismatches — confirm for the version in use).
#[inline]
pub fn from_bytes(buf: &[u8; PAGE_SIZE]) -> Result<&Self, PageError> {
    // Build the message lazily so the success path does not allocate.
    InternalPage::ref_from(buf).ok_or_else(|| PageError::FromBytesError {
        msg: "Failed to convert bytes to InternalPage".to_string(),
    })
}
/// Views the whole page (header and payload) as a `PAGE_SIZE` byte array.
///
/// # Errors
/// Returns `TryFromSliceError` if the byte view is not `PAGE_SIZE` long.
#[inline]
#[allow(clippy::wrong_self_convention)]
pub fn to_bytes(&self) -> Result<&[u8; PAGE_SIZE], std::array::TryFromSliceError> {
    let bytes: &[u8] = self.as_bytes();
    bytes.try_into()
}
#[inline]
pub fn kind(&self) -> u8 {
self.header.kind
}
#[inline]
pub fn key_count(&self) -> u16 {
self.header.key_count
}
#[inline]
pub fn keyfmt_id(&self) -> u8 {
self.header.keyfmt_id
}
/// Overwrites the key count in the header; payload bytes are not touched.
#[inline]
fn set_key_count(&mut self, n: u16) {
    let header = &mut self.header;
    header.key_count = n;
}
#[inline]
fn key_block_len(&self) -> u16 {
self.header.key_block_len
}
/// Overwrites the key block length in the header; payload bytes are not touched.
#[inline]
fn set_key_block_len(&mut self, n: u16) {
    let header = &mut self.header;
    header.key_block_len = n;
}
/// Offset inside `buf` at which the key block begins.
#[inline]
fn keys_start(&self) -> usize {
    // The key block sits at the very beginning of the payload.
    const KEY_BLOCK_OFFSET: usize = 0;
    KEY_BLOCK_OFFSET
}
/// Offset inside `buf` one past the last byte of the key block.
#[inline]
fn keys_end(&self) -> usize {
    usize::from(self.key_block_len())
}
/// Offset inside `buf` of the first child id; the directory starts right
/// where the key block ends.
#[inline]
fn children_base(&self) -> usize {
    let key_block_end = self.keys_end();
    key_block_end
}
/// Size in bytes of the child directory: one slot more than there are keys.
#[inline]
fn children_len(&self) -> usize {
    let slots = usize::from(self.key_count()) + 1;
    slots * CHILD_ID_SIZE
}
/// Offset inside `buf` one past the last child id slot.
#[inline]
fn children_end(&self) -> usize {
    let start = self.children_base();
    start + self.children_len()
}
/// Number of payload bytes in use: key block plus child directory.
pub fn used_bytes(&self) -> usize {
    let key_bytes = usize::from(self.key_block_len());
    let child_bytes = self.children_len();
    key_bytes + child_bytes
}
/// Borrows the encoded key block from the payload.
#[inline]
fn key_block(&self) -> &[u8] {
    let (lo, hi) = (self.keys_start(), self.keys_end());
    &self.buf[lo..hi]
}
/// Resolves the key block format implementation for this page.
///
/// # Panics
/// Panics if the stored format id is not known to `resolve_key_format`.
pub fn key_fmt(&self) -> &dyn KeyBlockFormat {
    let id = self.keyfmt_id();
    let resolved = resolve_key_format(id);
    resolved.expect("unknown key format id; register it in keyfmt::resolve_key_format")
}
/// Returns the key format id; same value as [`InternalPage::keyfmt_id`].
pub fn key_fmt_id(&self) -> u8 {
    self.header.keyfmt_id
}
/// Bundles the key block with its format so it can be searched.
fn key_run<'s>(&'s self) -> PageKeyRun<'s> {
    let body = self.key_block();
    let fmt = self.key_fmt();
    PageKeyRun { body, fmt }
}
/// Searches the key block for `key_enc` using the page's key format.
///
/// The `Ok`/`Err` index is whatever the format's `seek` reports
/// (presumably `Ok` = exact match, `Err` = insertion point; verify against
/// `KeyBlockFormat::seek`).
pub fn lower_bound(&self, key_enc: &[u8], scratch: &mut ScratchBuf) -> Result<usize, usize> {
    let run = self.key_run();
    run.seek(key_enc, scratch)
}
/// Like [`InternalPage::lower_bound`], but orders keys with the supplied
/// comparator by delegating to the format's `seek_with_cmp`.
pub fn lower_bound_cmp(
    &self,
    key_enc: &[u8],
    scratch: &mut ScratchBuf,
    cmp: fn(&[u8], &[u8]) -> core::cmp::Ordering,
) -> Result<usize, usize> {
    let format = self.key_fmt();
    let block = self.key_block();
    format.seek_with_cmp(block, key_enc, scratch, cmp)
}
/// Locates the slot for `key_enc`; performs the same search as
/// [`InternalPage::lower_bound`].
pub fn find_slot(&self, key_enc: &[u8], scratch: &mut ScratchBuf) -> Result<usize, usize> {
    self.lower_bound(key_enc, scratch)
}
/// Inserts separator `key` at key index `idx` and `right_child` as the child
/// immediately to its right (child slot `idx + 1`).
///
/// # Errors
/// * `PageError::IndexOutOfBounds` if `idx > key_count`.
/// * `PageError::PageFull` if the grown key block plus `key_count + 2`
///   child slots would not fit in the payload buffer. The page is left
///   untouched in that case.
pub fn insert_separator(
    &mut self,
    idx: usize,
    key: &[u8],
    right_child: u64,
) -> Result<(), PageError> {
    if idx > self.key_count() as usize {
        return Err(PageError::IndexOutOfBounds {});
    }
    let mut scratch = ScratchBuf::new();
    let (range, repl) = self
        .key_fmt()
        .insert_plan(self.key_block(), idx, key, &mut scratch);

    let ks = self.keys_start();
    let old_len = self.key_block_len() as usize;
    let old_end = self.children_end();
    let removed = range.end - range.start;
    let new_len = old_len + repl.len() - removed;
    let new_end = old_end + repl.len() - removed;

    // The payload is `buf`, not the whole page, and after this call it must
    // hold one more child slot than it does now.
    if new_end + CHILD_ID_SIZE > self.buf.len() || new_len > usize::from(u16::MAX) {
        return Err(PageError::PageFull {});
    }

    // One overlapping-safe move shifts the key tail and the whole child
    // directory together, so neither can clobber the other.
    let tail_dst = ks + range.start + repl.len();
    self.buf.copy_within(ks + range.end..old_end, tail_dst);
    self.buf[ks + range.start..tail_dst].copy_from_slice(&repl);
    self.set_key_block_len(new_len as u16);

    // key_count is still the old value here, which is what the shift helper
    // expects: it moves exactly the children that currently exist.
    self.children_shift_right_from(idx + 1);
    self.write_child_at(idx + 1, right_child)?;
    self.set_key_count(self.key_count() + 1);
    Ok(())
}
/// Inserts `key` at key index `idx` without adding a child id.
///
/// The child directory is moved along with the key block, and `key_count`
/// is incremented, so the directory gains one trailing slot that the caller
/// is expected to fill (as `push_front` does).
///
/// # Errors
/// * `PageError::IndexOutOfBounds` if `idx > key_count`.
/// * `PageError::PageFull` if the grown key block plus `key_count + 2`
///   child slots would not fit in the payload buffer. The page is left
///   untouched in that case.
pub fn insert_key_at(&mut self, idx: usize, key: &[u8]) -> Result<(), PageError> {
    if idx > self.key_count() as usize {
        return Err(PageError::IndexOutOfBounds {});
    }
    let mut scratch = ScratchBuf::new();
    let (range, repl) = self
        .key_fmt()
        .insert_plan(self.key_block(), idx, key, &mut scratch);

    let ks = self.keys_start();
    let old_len = self.key_block_len() as usize;
    let old_end = self.children_end();
    let removed = range.end - range.start;
    let new_len = old_len + repl.len() - removed;
    let new_end = old_end + repl.len() - removed;

    // Incrementing key_count makes the directory one slot longer, so that
    // slot must fit inside `buf` as well.
    if new_end + CHILD_ID_SIZE > self.buf.len() || new_len > usize::from(u16::MAX) {
        return Err(PageError::PageFull {});
    }

    // Move key tail and child directory in one overlapping-safe copy.
    let tail_dst = ks + range.start + repl.len();
    self.buf.copy_within(ks + range.end..old_end, tail_dst);
    self.buf[ks + range.start..tail_dst].copy_from_slice(&repl);
    self.set_key_block_len(new_len as u16);
    self.set_key_count(self.key_count() + 1);
    Ok(())
}
/// Appends `sep_key_enc` after the current last key, with `right_child` as
/// its right-hand child.
///
/// # Errors
/// Propagates any error from [`InternalPage::insert_separator`].
pub fn append(&mut self, sep_key_enc: &[u8], right_child: u64) -> Result<(), PageError> {
    let last = usize::from(self.key_count());
    self.insert_separator(last, sep_key_enc, right_child)
}
pub fn push_front(&mut self, sep_key_enc: &[u8], left_child: u64) -> Result<(), PageError> {
self.insert_key_at(0, sep_key_enc)?;
self.children_shift_right_from(0);
self.write_child_at(0, left_child)?;
Ok(())
}
pub fn delete_separator(&mut self, idx: usize) -> Result<(), PageError> {
if idx >= self.key_count() as usize {
return Err(PageError::IndexOutOfBounds {});
}
let mut scratch = ScratchBuf::new();
let (range, repl) = self
.key_fmt()
.delete_plan(self.key_block(), idx, &mut scratch); let delta_k = repl.len() as isize - (range.end - range.start) as isize;
let ks = self.keys_start();
let old_len = self.key_block_len() as usize;
let new_len = (old_len as isize + delta_k) as usize;
self.children_shift_left_from(idx + 1);
let tail_src_start = ks + range.end;
let tail_src_end = ks + old_len + self.children_len(); let tail_dst_start = (tail_src_start as isize + delta_k) as usize;
self.buf
.copy_within(tail_src_start..tail_src_end, tail_dst_start);
self.set_key_block_len(new_len as u16);
self.set_key_count(self.key_count() - 1);
Ok(())
}
/// Replaces the key at `idx` with `key_bytes`, leaving all child ids as-is.
///
/// # Errors
/// * `PageError::IndexOutOfBounds` if `idx >= key_count`.
/// * `PageError::PageFull` if the resized key block plus the child
///   directory would not fit in the payload buffer. The page is left
///   untouched in that case.
pub fn replace_key_at(&mut self, idx: usize, key_bytes: &[u8]) -> Result<(), PageError> {
    if idx >= self.key_count() as usize {
        return Err(PageError::IndexOutOfBounds {});
    }
    let mut scratch = ScratchBuf::new();
    let kb = self.key_block();
    let (range, repl) = self
        .key_fmt()
        .replace_plan(kb, idx, key_bytes, &mut scratch);

    let ks = self.keys_start();
    let old_len = self.key_block_len() as usize;
    let old_end = self.children_end();
    let removed = range.end - range.start;
    let new_len = old_len + repl.len() - removed;
    let new_end = old_end + repl.len() - removed;
    if new_end > self.buf.len() || new_len > usize::from(u16::MAX) {
        return Err(PageError::PageFull {});
    }

    // Move the key tail and the child directory in a single
    // overlapping-safe copy. Moving the directory first (as a separate
    // step) would overwrite the end of the key tail whenever the new key is
    // shorter than the old one.
    let tail_dst = ks + range.start + repl.len();
    self.buf.copy_within(ks + range.end..old_end, tail_dst);
    self.buf[ks + range.start..tail_dst].copy_from_slice(&repl);
    self.set_key_block_len(new_len as u16);
    Ok(())
}
pub fn delete_key_at(
&mut self,
idx: usize,
scratch: &mut ScratchBuf,
) -> Result<Vec<u8>, PageError> {
let key = self
.key_fmt()
.decode_at(self.key_block(), idx, scratch)
.to_vec();
let (range, repl) = self.key_fmt().delete_plan(self.key_block(), idx, scratch);
let delta_k = repl.len() as isize - (range.end - range.start) as isize;
let ks = self.keys_start();
let old_len = self.key_block_len() as usize;
let new_len = (old_len as isize + delta_k) as usize;
let tail_src_start = ks + range.end;
let tail_src_end = ks + old_len + self.children_len(); let tail_dst_start = (tail_src_start as isize + delta_k) as usize;
self.buf
.copy_within(tail_src_start..tail_src_end, tail_dst_start);
self.set_key_block_len(new_len as u16);
self.set_key_count(self.key_count() - 1);
Ok(key)
}
pub fn pop_last_key(&mut self, scratch: &mut ScratchBuf) -> Result<Vec<u8>, PageError> {
let idx = self.key_count() as usize - 1;
self.delete_key_at(idx, scratch)
}
/// Finds the slot for `key` with [`InternalPage::find_slot`] and inserts
/// the key there with `child` as its right-hand child.
///
/// Both search outcomes are treated alike: the reported index is used as
/// the insertion position.
pub fn insert_encoded(&mut self, key: &[u8], child: u64) -> Result<(), PageError> {
    let mut scratch = ScratchBuf::new();
    let idx = self.find_slot(key, &mut scratch).unwrap_or_else(|slot| slot);
    self.insert_separator(idx, key, child)
}
/// Reads the child id stored in directory slot `idx`.
///
/// Only the payload buffer bounds are checked; `idx` is not compared
/// against `key_count`.
///
/// # Errors
/// Returns `PageError::IndexOutOfBounds` if the slot does not lie entirely
/// inside the payload buffer.
#[inline]
pub fn read_child_at(&self, idx: usize) -> Result<u64, PageError> {
    // Offsets are relative to `buf`, so the limit is the buffer length, not
    // the page size. Checked arithmetic keeps huge indices from wrapping.
    let end = idx
        .checked_add(1)
        .and_then(|slots| slots.checked_mul(CHILD_ID_SIZE))
        .and_then(|bytes| bytes.checked_add(self.children_base()))
        .filter(|&end| end <= self.buf.len())
        .ok_or(PageError::IndexOutOfBounds {})?;
    Ok(read_u64_le(&self.buf[end - CHILD_ID_SIZE..end]))
}
/// Stores `child` in directory slot `idx`.
///
/// Only the payload buffer bounds are checked; `idx` is not compared
/// against `key_count`, so a slot just past the current directory may be
/// written before the key count is raised.
///
/// # Errors
/// Returns `PageError::IndexOutOfBounds` if the slot does not lie entirely
/// inside the payload buffer.
#[inline]
pub fn write_child_at(&mut self, idx: usize, child: u64) -> Result<(), PageError> {
    // Offsets are relative to `buf`, so the limit is the buffer length, not
    // the page size. Checked arithmetic keeps huge indices from wrapping.
    let end = idx
        .checked_add(1)
        .and_then(|slots| slots.checked_mul(CHILD_ID_SIZE))
        .and_then(|bytes| bytes.checked_add(self.children_base()))
        .filter(|&end| end <= self.buf.len())
        .ok_or(PageError::IndexOutOfBounds {})?;
    write_u64_le(&mut self.buf, end - CHILD_ID_SIZE, child);
    Ok(())
}
#[inline]
pub fn write_leftmost_child(&mut self, child: u64) -> Result<(), PageError> {
self.write_child_at(0, child)
}
/// Reads the child id stored in the first directory slot.
///
/// Takes `&self`: reading does not mutate the page, so callers holding only
/// a shared reference can use it too.
#[inline]
pub fn read_leftmost_child(&self) -> Result<u64, PageError> {
    self.read_child_at(0)
}
#[inline]
pub fn replace_child_at(&mut self, idx: usize, child: u64) -> Result<(), PageError> {
self.write_child_at(idx, child)
}
#[inline]
pub fn delete_child_at(&mut self, idx: usize) -> Result<(), PageError> {
self.children_shift_left_from(idx);
Ok(())
}
/// Moves child slots `from..=key_count` one slot to the right, opening a
/// hole at slot `from`. The caller must have ensured there is room.
fn children_shift_right_from(&mut self, from: usize) {
    let slot = CHILD_ID_SIZE;
    let total = self.key_count() as usize + 1;
    let src = self.children_base() + from * slot;
    let moved = (total - from) * slot;
    self.buf.copy_within(src..src + moved, src + slot);
}
/// Slides the child slots after `from` one slot to the left, overwriting
/// slot `from`.
///
/// `key_count + 1 - from` slots are copied starting at slot `from + 1`,
/// i.e. one slot beyond the directory the header currently describes
/// (presumably so callers that already lowered `key_count` still move
/// their last child — verify against callers). That range is clamped to
/// the payload buffer so a full page cannot trigger an out-of-bounds panic,
/// and a `from` past the directory is a no-op instead of an underflow.
fn children_shift_left_from(&mut self, from: usize) {
    let base = self.children_base();
    let len = self.key_count() as usize + 1;
    let dst = base + from * CHILD_ID_SIZE;
    let src = dst + CHILD_ID_SIZE;
    let bytes = len.saturating_sub(from) * CHILD_ID_SIZE;
    let end = (src + bytes).min(self.buf.len());
    if src < end {
        self.buf.copy_within(src..end, dst);
    }
}
/// Moves the whole child directory by `delta_k` bytes (right when positive,
/// left when negative) to make room for, or reclaim space from, the key
/// block. The header is not updated here.
///
/// # Errors
/// Returns `PageError::PageFull` if moving right would push the directory
/// past the end of the payload buffer.
fn move_child_dir(&mut self, delta_k: isize) -> Result<(), PageError> {
    if delta_k == 0 {
        return Ok(());
    }
    let base = self.children_base();
    let end = self.children_end();
    if delta_k > 0 {
        let dk = delta_k as usize;
        // Offsets are relative to `buf`, which is shorter than the page by
        // the header size, so the limit is the buffer length.
        if end + dk > self.buf.len() {
            return Err(PageError::PageFull {});
        }
        self.buf.copy_within(base..end, base + dk);
    } else {
        let dk = (-delta_k) as usize;
        // Clamp at the start of the buffer rather than underflowing.
        let dst = base.saturating_sub(dk);
        self.buf.copy_within(base..end, dst);
    }
    Ok(())
}
/// Decodes and returns the key at index `idx`.
///
/// # Errors
/// Returns `PageError::IndexOutOfBounds` if `idx >= key_count`.
#[inline]
pub fn get_key_at(&self, idx: usize, scratch: &mut ScratchBuf) -> Result<&[u8], PageError> {
    if idx < usize::from(self.key_count()) {
        Ok(self.key_fmt().decode_at(self.key_block(), idx, scratch))
    } else {
        Err(PageError::IndexOutOfBounds {})
    }
}
/// Splits this page at key index `split_idx`, moving keys
/// `split_idx..key_count` and children `split_idx..=key_count` into `right`.
///
/// This page keeps the first `split_idx` keys. The returned bytes are the
/// decoded first key of `right` after the split. `right`'s key block and
/// child slots are overwritten from offset zero; its key format is assumed
/// to match this page's (not checked here).
///
/// # Errors
/// * `PageError::IndexOutOfBounds` if `split_idx` is `0` or `>= key_count`.
/// * Propagates errors from reading or writing child ids and from moving the
///   child directory.
pub fn split_off_into(
    &mut self,
    split_idx: usize,
    right: &mut InternalPage,
) -> Result<Vec<u8>, PageError> {
    let key_count = self.key_count() as usize;
    if !(1..key_count).contains(&split_idx) {
        return Err(PageError::IndexOutOfBounds {});
    }

    // Let the key format produce the two re-encoded key blocks.
    let mut left_kb = Vec::new();
    let mut right_kb = Vec::new();
    let old_len = {
        let kb = self.key_block();
        self.key_fmt()
            .split_into(kb, split_idx, &mut left_kb, &mut right_kb);
        kb.len()
    };

    // Snapshot the children that move before anything is shifted.
    let moved_cldrn = (split_idx..=key_count)
        .map(|i| self.read_child_at(i))
        .collect::<Result<Vec<u64>, PageError>>()?;

    // Shrink the left page: directory first, then the new key block.
    let delta_k = left_kb.len() as isize - old_len as isize;
    self.move_child_dir(delta_k)?;
    let ks = self.keys_start();
    self.buf[ks..ks + left_kb.len()].copy_from_slice(&left_kb);
    self.set_key_block_len(left_kb.len() as u16);
    self.set_key_count(split_idx as u16);

    // Populate the right page.
    let ks_r = right.keys_start();
    right.buf[ks_r..ks_r + right_kb.len()].copy_from_slice(&right_kb);
    right.set_key_block_len(right_kb.len() as u16);
    right.set_key_count((key_count - split_idx) as u16);
    for (slot, child_ptr) in moved_cldrn.into_iter().enumerate() {
        right.write_child_at(slot, child_ptr)?;
    }

    let mut scratch = ScratchBuf::new();
    let sep = self
        .key_fmt()
        .decode_at(right.key_block(), 0, &mut scratch)
        .to_vec();
    Ok(sep)
}
}
/// The default page is an empty page using the raw key format.
impl Default for InternalPage {
    fn default() -> Self {
        let format = KeyFormat::Raw(raw::RawFormat);
        Self::new(format)
    }
}
/// A borrowed key block paired with the format that knows how to search it.
struct PageKeyRun<'a> {
/// Encoded key block bytes.
body: &'a [u8],
/// Key format used to interpret `body`.
fmt: &'a dyn KeyBlockFormat,
}
impl<'a> PageKeyRun<'a> {
    /// Searches the key block for `needle` by delegating to the format's
    /// `seek`, returning its result unchanged.
    fn seek(&self, needle: &[u8], scratch: &mut ScratchBuf) -> Result<usize, usize> {
        let Self { body, fmt } = *self;
        fmt.seek(body, needle, scratch)
    }
}
/// Diagnostic dump: layout offsets, every key in hex, and up to the first
/// ten child ids (a child that cannot be read is shown as an empty string).
impl fmt::Debug for InternalPage {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let key_count = self.key_count() as usize;
        let key_block_len = self.key_block_len() as usize;
        let keys_end = self.keys_end();
        let children_base = self.children_base();
        let children_end = self.children_end();

        let mut dbg = f.debug_struct("InternalPage");
        dbg.field("fmt_id", &self.keyfmt_id());
        dbg.field("keys", &key_count);
        dbg.field("children", &(key_count + 1));
        dbg.field("key_block_len", &key_block_len);
        dbg.field("keys_end", &keys_end);
        dbg.field("children_base", &children_base);
        dbg.field("children_end", &children_end);
        dbg.field("used_bytes", &children_end);

        // Hex rendering of every key on the page.
        let fmt_impl = self.key_fmt();
        let mut scratch = ScratchBuf::new();
        let key_previews: Vec<String> = (0..key_count)
            .map(|i| {
                let key = fmt_impl.decode_at(self.key_block(), i, &mut scratch);
                let mut hex = String::new();
                for byte in key {
                    hex.push_str(&format!("{:02x}", byte));
                }
                hex
            })
            .collect();
        dbg.field("keys_preview(hex)", &key_previews);

        // Decimal rendering of at most ten child ids.
        let shown = (key_count + 1).min(10);
        let child_previews: Vec<String> = (0..shown)
            .map(|i| match self.read_child_at(i) {
                Ok(child) => child.to_string(),
                Err(_) => String::new(),
            })
            .collect();
        dbg.field("children_preview", &child_previews);
        dbg.finish()
    }
}
#[cfg(test)]
mod tests {
use super::*;
/// A single key inserted into an empty page can be read back with its child.
#[test]
fn test_internal_page() {
    let key = b"test_key";
    let child = 42;
    let mut page = InternalPage::new(KeyFormat::Raw(raw::RawFormat));
    assert!(page.insert_encoded(key, child).is_ok());

    let mut scratch = ScratchBuf::new();
    let retrieved_key = page.get_key_at(0, &mut scratch).unwrap();
    let retrieved_child = page.read_child_at(1).unwrap();
    assert_eq!(
        String::from_utf8(retrieved_key.to_vec()),
        String::from_utf8(key.to_vec())
    );
    assert_eq!(retrieved_child, child);
}
/// Appending keys of growing length keeps every key and child readable.
#[test]
fn test_internal_page_multiples() {
    let mut page = InternalPage::new(KeyFormat::Raw(raw::RawFormat));
    let iterations = 10;
    let mut keys: Vec<String> = Vec::with_capacity(iterations);
    let mut scratch = ScratchBuf::new();
    page.write_leftmost_child(0).unwrap();
    for i in 0..iterations {
        // "key{i}" repeated i + 1 times.
        let key = format!("key{}", i).repeat(i + 1);
        let child = i as u64 + 1;
        assert!(page.insert_separator(i, key.as_bytes(), child).is_ok());
        assert_eq!(page.get_key_at(i, &mut scratch).unwrap(), key.as_bytes());
        assert_eq!(page.read_child_at(i + 1).unwrap(), child);
        keys.push(key);
    }
    for (i, key) in keys.iter().enumerate().take(iterations) {
        let mut scratch = ScratchBuf::new();
        assert_eq!(page.get_key_at(i, &mut scratch).unwrap(), key.as_bytes());
    }
}
/// Inserting at a random position places the key and child at that slot.
#[test]
fn test_internal_page_random_inserts() {
    use rand::Rng;
    let mut rng = rand::thread_rng();
    let mut page = InternalPage::new(KeyFormat::Raw(raw::RawFormat));
    let iterations = 10;
    let mut scratch = ScratchBuf::new();
    page.write_leftmost_child(0).unwrap();
    for i in 0..iterations {
        // "key{i}" repeated i + 1 times.
        let key = format!("key{}", i).repeat(i + 1);
        let child = i as u64 + 1;
        assert!(page.insert_separator(i, key.as_bytes(), child).is_ok());
        assert_eq!(page.get_key_at(i, &mut scratch).unwrap(), key.as_bytes());
        assert_eq!(page.read_child_at(i + 1).unwrap(), child);
    }

    let key = "SomeKeyWithRandomSize";
    let idx_rand: usize = rng.gen_range(0..iterations - 1);
    let res = page.insert_separator(idx_rand, key.as_bytes(), 999);
    assert!(res.is_ok());
    let retrieved_value = page.read_child_at(idx_rand + 1).unwrap();
    let retrieved_key = page.get_key_at(idx_rand, &mut scratch).unwrap();
    assert_eq!(retrieved_value, 999);
    assert_eq!(retrieved_key, key.as_bytes());
}
/// Replacing each key in turn yields the new key at the same index.
#[test]
fn test_internal_page_replaces() {
    let mut page = InternalPage::new(KeyFormat::Raw(raw::RawFormat));
    let iterations = 10;
    let mut scratch = ScratchBuf::new();
    page.write_leftmost_child(0).unwrap();
    for i in 0..iterations {
        // "key{i}" repeated i + 1 times.
        let key = format!("key{}", i).repeat(i + 1);
        let child = i as u64 + 1;
        assert!(page.insert_separator(i, key.as_bytes(), child).is_ok());
        assert_eq!(page.get_key_at(i, &mut scratch).unwrap(), key.as_bytes());
        assert_eq!(page.read_child_at(i + 1).unwrap(), child);
    }
    for i in 0..iterations {
        let new_key = format!("new_key{}", i);
        assert!(page.replace_key_at(i, new_key.as_bytes()).is_ok());
        assert_eq!(
            page.get_key_at(i, &mut scratch).unwrap(),
            new_key.as_bytes()
        );
    }
}
/// Popping keys returns them in reverse insertion order and lowers the count.
#[test]
fn test_internal_page_pop_last() {
    let mut page = InternalPage::new(KeyFormat::Raw(raw::RawFormat));
    let iterations = 10;
    let mut scratch = ScratchBuf::new();
    page.write_leftmost_child(0).unwrap();
    for i in 0..iterations {
        // "key{i}" repeated i + 1 times.
        let key = format!("key{}", i).repeat(i + 1);
        let child = i as u64 + 1;
        assert!(page.insert_separator(i, key.as_bytes(), child).is_ok());
        assert_eq!(page.get_key_at(i, &mut scratch).unwrap(), key.as_bytes());
        assert_eq!(page.read_child_at(i + 1).unwrap(), child);
    }
    for i in (0..iterations).rev() {
        let expected_key = format!("key{}", i).repeat(i + 1);
        let popped_key = page.pop_last_key(&mut scratch).unwrap();
        assert_eq!(popped_key, expected_key.as_bytes());
        assert_eq!(page.key_count(), i as u16);
    }
}
/// Deleting separators at random positions never leaves the original key
/// at the deleted index.
#[test]
fn test_internal_page_removals() {
    use rand::Rng;
    let mut rng = rand::thread_rng();
    let mut page = InternalPage::new(KeyFormat::Raw(raw::RawFormat));
    let mut scratch = ScratchBuf::new();
    let iterations = 10;
    let mut keys: Vec<String> = Vec::with_capacity(iterations);
    page.write_leftmost_child(0).unwrap();
    for i in 0..iterations {
        // "key{i}" repeated i + 1 times.
        let key = format!("key{}", i).repeat(i + 1);
        let child = i as u64 + 1;
        assert!(page.insert_separator(i, key.as_bytes(), child).is_ok());
        assert_eq!(page.get_key_at(i, &mut scratch).unwrap(), key.as_bytes());
        assert_eq!(page.read_child_at(i + 1).unwrap(), child);
        keys.push(key);
    }
    while page.key_count() > 0 {
        let bound = page.key_count() as usize - 1;
        let idx: usize = rng.gen_range(0..=bound);
        assert!(page.delete_separator(idx).is_ok());

        let remaining = page.key_count() as usize;
        if remaining == 0 {
            break;
        }
        if idx >= remaining {
            // The last key was removed, so the index is now out of range.
            assert!(page.get_key_at(idx, &mut scratch).is_err());
        } else {
            let retrieved_key = page.get_key_at(idx, &mut scratch).unwrap();
            assert_ne!(retrieved_key, keys[idx].as_bytes());
        }
    }
}
/// Deleting the last key repeatedly returns each original key in turn.
#[test]
fn test_delete_key_at() {
    let mut page = InternalPage::new(KeyFormat::Raw(raw::RawFormat));
    let mut scratch = ScratchBuf::new();
    let iterations = 10;
    let mut keys: Vec<String> = Vec::with_capacity(iterations);
    page.write_leftmost_child(0).unwrap();
    for i in 0..iterations {
        // "key{i}" repeated i + 1 times.
        let key = format!("key{}", i).repeat(i + 1);
        let child = i as u64 + 1;
        assert!(page.insert_separator(i, key.as_bytes(), child).is_ok());
        assert_eq!(page.get_key_at(i, &mut scratch).unwrap(), key.as_bytes());
        assert_eq!(page.read_child_at(i + 1).unwrap(), child);
        keys.push(key);
    }
    while page.key_count() > 0 {
        let idx = page.key_count() as usize - 1;
        let deleted_key = page.delete_key_at(idx, &mut scratch).unwrap();
        assert_eq!(deleted_key, keys[idx].as_bytes());

        let remaining = page.key_count() as usize;
        if remaining == 0 {
            break;
        }
        if idx >= remaining {
            // The last key was removed, so the index is now out of range.
            assert!(page.get_key_at(idx, &mut scratch).is_err());
        } else {
            let retrieved_key = page.get_key_at(idx, &mut scratch).unwrap();
            assert_ne!(retrieved_key, keys[idx].as_bytes());
        }
    }
}
/// Splitting at a random index keeps the left keys/children in place, moves
/// the rest to the right page, and returns the right page's first key.
#[test]
fn test_internal_page_split() {
    use rand::Rng;
    let mut rng = rand::thread_rng();
    let mut page = InternalPage::new(KeyFormat::Raw(raw::RawFormat));
    let mut scratch = ScratchBuf::new();
    let iterations = 10;
    let mut keys: Vec<String> = Vec::with_capacity(iterations);
    let mut children: Vec<u64> = Vec::with_capacity(iterations);
    page.write_leftmost_child(0).unwrap();
    for i in 0..iterations {
        // "key{i}" repeated i + 1 times.
        let key = format!("key{}", i).repeat(i + 1);
        let child = i as u64 + 1;
        assert!(page.insert_separator(i, key.as_bytes(), child).is_ok());
        assert_eq!(page.get_key_at(i, &mut scratch).unwrap(), key.as_bytes());
        assert_eq!(page.read_child_at(i + 1).unwrap(), child);
        keys.push(key);
        children.push(child);
    }

    let mut right = InternalPage::new(KeyFormat::Raw(raw::RawFormat));
    let split_idx = rng.gen_range(1..(page.key_count() as usize - 1));
    let sep = page.split_off_into(split_idx, &mut right).unwrap();
    let separator = right.get_key_at(0, &mut scratch).unwrap();

    // Left page keeps the first `split_idx` keys and their right children.
    let mut left_scratch = ScratchBuf::new();
    for i in 0..split_idx {
        let retrieved_key = page.get_key_at(i, &mut left_scratch).unwrap();
        assert_eq!(retrieved_key, keys[i].as_bytes());
        assert_eq!(page.read_child_at(i + 1).unwrap(), children[i]);
    }

    // Right page holds the remaining keys and children, in order.
    let mut right_scratch = ScratchBuf::new();
    for i in 0..(right.key_count() as usize) {
        let retrieved_key = right.get_key_at(i, &mut right_scratch).unwrap();
        assert_eq!(retrieved_key, keys[split_idx + i].as_bytes());
        assert_eq!(right.read_child_at(i + 1).unwrap(), children[split_idx + i]);
    }
    assert_eq!(sep, separator);
}
}