use crate::bucket::{BucketCell, BucketIApi, BucketRwIApi, MAX_FILL_PERCENT, MIN_FILL_PERCENT};
use crate::common::inode::INode;
use crate::common::memory::{CodSlice, LCell, SplitArray, VecOrSplit};
use crate::common::page::tree::branch::{MappedBranchPage, BRANCH_PAGE_ELEMENT_SIZE};
use crate::common::page::tree::leaf::{MappedLeafPage, LEAF_PAGE_ELEMENT_SIZE};
use crate::common::page::tree::{TreePage, MIN_KEYS_PER_PAGE};
use crate::common::page::{CoerciblePage, MutPage, RefPage, PAGE_HEADER_SIZE};
use crate::common::{BVec, PgId, SplitRef, ZERO_PGID};
use crate::tx::{TxIApi, TxRwIApi};
use bumpalo::Bump;
use std::mem;
use std::ops::{Deref, DerefMut};
/// Writable, in-memory representation of a single B+tree page (leaf or branch).
pub struct NodeW<'tx> {
/// `true` for a leaf node, `false` for a branch node.
pub(crate) is_leaf: bool,
/// Key identifying this node; `read_in` takes it from the first inode and `spill` refreshes it.
pub(crate) key: CodSlice<'tx, u8>,
/// Id of the page backing this node; `spill` resets it to `ZERO_PGID` before allocating anew.
pub(crate) pgid: PgId,
/// Elements stored in the node; lookups binary-search by key, so they are kept in key order.
pub(crate) inodes: VecOrSplit<'tx, INode<'tx>>,
/// Bucket this node belongs to.
bucket: BucketCell<'tx>,
/// Parent node, or `None` when this node is the root.
parent: Option<NodeRwCell<'tx>>,
/// Set by `del` when an element is removed; cleared again by `rebalance`.
is_unbalanced: bool,
/// Set once `spill` has written this node to a page.
is_spilled: bool,
/// Child nodes attached in memory (appended by split and rebalance, cleared by `spill`).
pub(crate) children: BVec<'tx, NodeRwCell<'tx>>,
}
/// Nodes compare equal when both their page id and their key match.
impl<'tx> PartialEq for NodeW<'tx> {
    fn eq(&self, other: &Self) -> bool {
        let same_page = self.pgid == other.pgid;
        same_page && self.key == other.key
    }
}
// Marker impl: declares the `PartialEq` comparison above to be a full equivalence relation.
impl<'tx> Eq for NodeW<'tx> {}
impl<'tx> NodeW<'tx> {
fn new_parent_in(bucket: BucketCell<'tx>) -> NodeW<'tx> {
let bump = bucket.tx().bump();
NodeW {
is_leaf: false,
key: CodSlice::Owned(&[]),
pgid: Default::default(),
inodes: BVec::with_capacity_in(0, bump).into(),
bucket,
parent: None,
is_unbalanced: false,
is_spilled: false,
children: BVec::with_capacity_in(0, bump),
}
}
fn new_child_in(bucket: BucketCell<'tx>, is_leaf: bool, parent: NodeRwCell<'tx>) -> NodeW<'tx> {
let bump = bucket.tx().bump();
NodeW {
is_leaf,
key: CodSlice::Owned(&[]),
pgid: Default::default(),
inodes: BVec::with_capacity_in(0, bump).into(),
bucket,
parent: Some(parent),
is_unbalanced: false,
is_spilled: false,
children: BVec::with_capacity_in(0, bump),
}
}
/// Builds a node from an on-disk tree page.
///
/// The node's key is the key of the page's first element, or an empty slice when the page has
/// no elements.
///
/// # Panics
///
/// Panics if `page` is neither a leaf nor a branch page.
pub(crate) fn read_in(
    bucket: BucketCell<'tx>, parent: Option<NodeRwCell<'tx>>, page: &RefPage<'tx>,
) -> NodeW<'tx> {
    assert!(page.is_leaf() || page.is_branch(), "Non-tree page read");
    let bump = bucket.tx().bump();
    let mut inodes = BVec::with_capacity_in(page.count as usize, bump);
    INode::read_inodes_in(&mut inodes, page);
    // `first()` folds the emptiness check and the lookup into one step.
    let key = match inodes.first() {
        Some(first) => CodSlice::Mapped(first.key()),
        None => CodSlice::Mapped(&[]),
    };
    NodeW {
        is_leaf: page.is_leaf(),
        key,
        pgid: page.id,
        inodes: inodes.into(),
        bucket,
        parent,
        is_unbalanced: false,
        is_spilled: false,
        children: BVec::with_capacity_in(page.count as usize, bump),
    }
}
/// Size in bytes of one element header for this node's page type (leaf or branch).
pub(crate) fn page_element_size(&self) -> usize {
    match self.is_leaf {
        true => LEAF_PAGE_ELEMENT_SIZE,
        false => BRANCH_PAGE_ELEMENT_SIZE,
    }
}
/// Minimum number of keys this node should hold: 1 for a leaf, 2 for a branch.
pub(crate) fn min_keys(&self) -> usize {
    if !self.is_leaf {
        return 2;
    }
    1
}
/// Returns the node's key as a byte slice valid for the transaction lifetime.
pub(crate) fn key(&self) -> &'tx [u8] {
self.key.get_ref()
}
/// Serialized size of the node: the page header plus, for every element, its element header,
/// key bytes and value bytes.
pub(crate) fn size(&self) -> usize {
    let per_element = self.page_element_size();
    self.inodes
        .deref()
        .iter()
        .fold(PAGE_HEADER_SIZE, |total, inode| {
            total + (per_element + inode.key().len() + inode.value().len())
        })
}
/// Writes the node's elements into page `p`, using the leaf or branch layout as appropriate.
///
/// # Panics
///
/// Panics when the node holds `0xFFFF` or more inodes.
fn write(&self, p: &mut MutPage) {
    let count = self.inodes.len();
    if count >= 0xFFFF {
        panic!("inode overflow: {} (pgid={})", count, p.id);
    }
    if !self.is_leaf {
        MappedBranchPage::mut_into(p).write_elements(&self.inodes);
    } else {
        MappedLeafPage::mut_into(p).write_elements(&self.inodes);
    }
}
/// Removes the element whose key equals `key`, if present, and marks the node as unbalanced.
/// A missing key leaves the node untouched.
fn del(&mut self, key: &[u8]) {
    let found = self.inodes.binary_search_by(|probe| probe.key().cmp(key));
    match found {
        Ok(index) => {
            self.inodes.get_mut_vec().remove(index);
            self.is_unbalanced = true;
        }
        Err(_) => {}
    }
}
/// Detaches `target` from the in-memory child list; does nothing when it is not listed.
fn remove_child(&mut self, target: NodeRwCell<'tx>) {
    let located = self.children.iter().position(|candidate| *candidate == target);
    if let Some(slot) = located {
        self.children.remove(slot);
    }
}
}
/// Lazy iterator state for dividing one node's elements into page-sized nodes.
struct NodeSplit<'tx> {
/// Page size in bytes; a remainder that fits in one page is not split further.
page_size: usize,
/// Fill target in bytes (page size times the bucket's clamped fill percent) used to pick
/// the split point.
threshold: usize,
/// Per-element header size of the node being split.
page_element_size: usize,
/// Elements not yet assigned to an output node.
split_array: SplitArray<'tx, INode<'tx>>,
/// Node that receives the next chunk of elements; `None` once the split is finished.
next: Option<NodeRwCell<'tx>>,
}
impl<'tx> NodeSplit<'tx> {
/// Assigns the leading part of the remaining elements to `node` and, when elements are left
/// over, creates a sibling node to carry on with.
///
/// Returns `node` together with the new sibling, or `None` when no further split is needed.
/// A parent node is created on demand if `node` has none.
fn split_two<'a>(
&'a mut self, node: NodeRwCell<'tx>,
) -> (NodeRwCell<'tx>, Option<NodeRwCell<'tx>>)
where
'tx: 'a,
{
let mut cell = node.cell.borrow_mut();
// Stop splitting when there are too few elements for two pages or when everything that is
// left fits into a single page: `node` takes all remaining elements.
if self.split_array.len() <= MIN_KEYS_PER_PAGE * 2 || self.size_less_than(&self.split_array) {
cell.inodes = self
.split_array
.split_left_off(self.split_array.len())
.into();
return (node, None);
}
// Only the index is needed here; the accumulated size is ignored.
let (split_index, _) = self.split_index(&self.split_array);
cell.inodes = self.split_array.split_left_off(split_index).into();
// Reuse the existing parent, or create one and register `node` as its first child.
let parent = {
if let Some(parent) = cell.parent {
parent
} else {
let parent = NodeRwCell::new_parent_in(cell.bucket);
cell.parent = Some(parent);
parent.cell.borrow_mut().children.push(node);
parent
}
};
// The sibling receives a clone of whatever is still left in the split array.
let next = NodeRwCell::new_child_in(cell.bucket, cell.is_leaf, parent);
let mut next_cell = next.cell.borrow_mut();
next_cell.inodes = self.split_array.clone().into();
parent.cell.borrow_mut().children.push(next);
// Record the split in the transaction statistics.
cell
.bucket
.tx()
.split_r()
.stats
.as_ref()
.unwrap()
.inc_split(1);
(node, Some(next))
}
/// Finds the position at which `rem` should be split so the first part stays within the fill
/// threshold.
///
/// Returns `(index, size)`: the split index and the byte size accumulated for the elements
/// before it. The last `MIN_KEYS_PER_PAGE` elements are never considered, and the index is
/// never advanced past the threshold before at least `MIN_KEYS_PER_PAGE` elements are kept.
fn split_index(&self, rem: &[INode]) -> (usize, usize) {
    let mut total = PAGE_HEADER_SIZE;
    let mut split_at = 0;
    if rem.len() > MIN_KEYS_PER_PAGE {
        let candidates = &rem[..rem.len() - MIN_KEYS_PER_PAGE];
        for (position, inode) in candidates.iter().enumerate() {
            split_at = position;
            let entry_size = self.page_element_size + inode.key().len() + inode.value().len();
            if split_at >= MIN_KEYS_PER_PAGE && total + entry_size > self.threshold {
                break;
            }
            total += entry_size;
        }
    }
    (split_at, total)
}
/// Returns `true` when the page header plus all elements of `rem` fit within `page_size`.
/// Evaluation stops at the first element that pushes the running total over the limit.
fn size_less_than(&self, rem: &[INode]) -> bool {
    let mut running = PAGE_HEADER_SIZE;
    rem.iter().all(|inode| {
        running += self.page_element_size + inode.key().len() + inode.value().len();
        running <= self.page_size
    })
}
}
/// Yields the nodes produced by the split one at a time, splitting lazily on each step.
impl<'tx> Iterator for NodeSplit<'tx> {
    type Item = NodeRwCell<'tx>;

    fn next(&mut self) -> Option<Self::Item> {
        // Nothing pending means the split has been fully consumed.
        let pending = self.next?;
        let (head, remainder) = self.split_two(pending);
        self.next = remainder;
        Some(head)
    }
}
/// Copyable handle to a [`NodeW`]; the node is reached through the wrapped `LCell` via
/// `borrow()` / `borrow_mut()`.
#[derive(Copy, Clone, Eq, PartialEq)]
pub struct NodeRwCell<'tx> {
/// Cell holding the node data (allocated with `LCell::new_in` by the constructors).
pub(crate) cell: LCell<'tx, NodeW<'tx>>,
}
impl<'tx> NodeRwCell<'tx> {
/// Allocates a fresh, empty parent (branch) node in the transaction's bump arena.
fn new_parent_in(bucket: BucketCell<'tx>) -> NodeRwCell<'tx> {
    let node = NodeW::new_parent_in(bucket);
    let cell = LCell::new_in(node, bucket.tx().bump());
    NodeRwCell { cell }
}
/// Allocates a fresh, empty node attached to `parent` in the transaction's bump arena.
fn new_child_in(
    bucket: BucketCell<'tx>, is_leaf: bool, parent: NodeRwCell<'tx>,
) -> NodeRwCell<'tx> {
    let node = NodeW::new_child_in(bucket, is_leaf, parent);
    let cell = LCell::new_in(node, bucket.tx().bump());
    NodeRwCell { cell }
}
/// Reads `page` into a new node (see `NodeW::read_in`) and wraps it in a handle.
pub(crate) fn read_in(
    bucket: BucketCell<'tx>, parent: Option<NodeRwCell<'tx>>, page: &RefPage<'tx>,
) -> NodeRwCell<'tx> {
    let node = NodeW::read_in(bucket, parent, page);
    let cell = LCell::new_in(node, bucket.tx().bump());
    NodeRwCell { cell }
}
/// Walks the parent links upwards and returns the topmost node (the one without a parent).
pub(crate) fn root(self: NodeRwCell<'tx>) -> NodeRwCell<'tx> {
    let mut current = self;
    loop {
        // Copy the parent out first so no borrow is held while moving up.
        let parent = current.cell.borrow().parent;
        match parent {
            Some(above) => current = above,
            None => return current,
        }
    }
}
/// Returns the child node referenced by the element at `index`, resolving it through the
/// owning bucket.
///
/// # Panics
///
/// Panics when called on a leaf node or when `index` is out of range.
pub(crate) fn child_at(self: NodeRwCell<'tx>, index: u32) -> NodeRwCell<'tx> {
    let node = self.cell.borrow();
    if node.is_leaf {
        panic!("invalid child_at {} on leaf node", index);
    }
    let bucket = node.bucket;
    let pgid = node.inodes[index as usize].pgid();
    // Release the borrow before handing this node to the bucket as the parent.
    drop(node);
    bucket.node(pgid, Some(self))
}
/// Returns the index of the element in this node whose key equals `child`'s key.
///
/// # Panics
///
/// Panics with "node not found" when no element carries that key.
pub(crate) fn child_index(self: NodeRwCell<'tx>, child: NodeRwCell<'tx>) -> usize {
    let wanted = child.cell.borrow().key;
    let lookup = self
        .cell
        .borrow()
        .inodes
        .binary_search_by(|probe| probe.key().cmp(&wanted));
    lookup
        .map_err(|_| wanted.as_ref())
        .expect("node not found")
}
/// Returns the number of elements (inodes) stored in this node.
pub(crate) fn num_children(self: NodeRwCell<'tx>) -> usize {
self.cell.borrow().inodes.len()
}
/// Returns the sibling that follows this node under the same parent, or `None` when the node
/// has no parent or is already the last child.
pub(crate) fn next_sibling(self: NodeRwCell<'tx>) -> Option<NodeRwCell<'tx>> {
    let parent_node = self.cell.borrow().parent?;
    let position = parent_node.child_index(self);
    if position >= parent_node.num_children() - 1 {
        None
    } else {
        Some(parent_node.child_at((position + 1) as u32))
    }
}
/// Returns the sibling that precedes this node under the same parent, or `None` when the node
/// has no parent or is already the first child.
pub(crate) fn prev_sibling(self: NodeRwCell<'tx>) -> Option<NodeRwCell<'tx>> {
    let parent_node = self.cell.borrow().parent?;
    let position = parent_node.child_index(self);
    if position == 0 {
        None
    } else {
        Some(parent_node.child_at((position - 1) as u32))
    }
}
/// Inserts or replaces an element.
///
/// The element is located by `old_key`; if an element with that key exists it is overwritten,
/// otherwise the new element is inserted at the sorted position. The stored element carries
/// `new_key`, `value`, `pgid` and `flags`.
///
/// # Panics
///
/// Panics when `pgid` is at or above the transaction's high water mark, or when `old_key` or
/// `new_key` is empty.
pub(crate) fn put(
    self: NodeRwCell<'tx>, old_key: &'tx [u8], new_key: &'tx [u8], value: &'tx [u8], pgid: PgId,
    flags: u32,
) {
    let mut node = self.cell.borrow_mut();
    if pgid >= node.bucket.tx().meta().pgid() {
        panic!(
            "pgid {} above high water mark {}",
            pgid,
            node.bucket.tx().meta().pgid()
        );
    }
    if old_key.is_empty() {
        panic!("put: zero-length old key");
    }
    if new_key.is_empty() {
        panic!("put: zero-length new key");
    }
    let slot = node
        .inodes
        .binary_search_by(|probe| probe.key().cmp(old_key));
    let entry = INode::new_owned_in(flags, pgid, new_key, value, node.bucket.tx().bump());
    // Re-check the key as stored in the freshly built element.
    if entry.key().is_empty() {
        panic!("put: zero-length new key");
    }
    match slot {
        Ok(exact) => {
            *node.inodes.get_mut(exact).unwrap() = entry;
        }
        Err(closest) => {
            node.inodes.get_mut_vec().insert(closest, entry);
        }
    }
}
/// Removes the element with the given `key` from this node, if present (see `NodeW::del`).
pub(crate) fn del(self: NodeRwCell<'tx>, key: &[u8]) {
self.cell.borrow_mut().del(key);
}
/// Returns the serialized size of this node in bytes (see `NodeW::size`).
pub(crate) fn size(self: NodeRwCell<'tx>) -> usize {
self.cell.borrow().size()
}
/// Writes this node's elements into `page`, using the leaf or branch layout as appropriate.
///
/// # Panics
///
/// Panics when the node holds `0xFFFF` or more inodes.
pub(crate) fn write(self: NodeRwCell<'tx>, page: &mut MutPage<'tx>) {
    // Delegate to `NodeW::write` so this entry point shares its inode-count overflow guard
    // instead of duplicating the leaf/branch dispatch without it.
    self.cell.borrow().write(page);
}
/// Writes this node to newly allocated pages, spilling its in-memory children first and
/// splitting the node when it does not fit in a single page.
///
/// Returns early with `Ok(())` when the node was already spilled. Errors from child spills and
/// from page allocation are propagated.
///
/// # Panics
///
/// Panics if an allocated page id is at or above the transaction's high water mark.
pub(crate) fn spill(self) -> crate::Result<()> {
let tx = {
let mut cell = self.cell.borrow_mut();
if cell.is_spilled {
return Ok(());
}
// Spill children in key order.
cell.children.sort_by_key(|child| child.cell.borrow().key());
cell.bucket.tx()
};
// Index-based loop: the child list is re-read on every step and no borrow of this node is
// held while a child spills.
let mut i = 0;
let mut o_child = self.cell.borrow().children.get(i).copied();
while let Some(child) = o_child {
child.spill()?;
i += 1;
o_child = self.cell.borrow().children.get(i).copied()
}
// The children have been spilled; drop the in-memory list.
self.cell.borrow_mut().children.clear();
let page_size = tx.page_size();
for node in self.split(page_size) {
let node_size = {
let mut node_cell = node.cell.borrow_mut();
// Release the page the node previously occupied, if any.
if node_cell.pgid > ZERO_PGID {
let any_page = tx.any_page(node_cell.pgid);
tx.freelist_free_page(tx.api_id(), &any_page);
node_cell.pgid = ZERO_PGID;
}
node_cell.size()
};
// Allocate enough whole pages for the node (size rounded up to a page multiple).
let mut p = tx.allocate((node_size + tx.page_size() - 1) / tx.page_size())?;
if p.id >= tx.meta().pgid() {
panic!("pgid {} above high water mark {}", p.id, tx.meta().pgid())
}
let mut node_cell = node.cell.borrow_mut();
node_cell.pgid = p.id;
node_cell.write(&mut p);
tx.queue_page(p);
node_cell.is_spilled = true;
// Register the node in its parent under its (possibly updated) first key.
if let Some(parent) = node_cell.parent {
// A node without a key yet is looked up by its first element's key.
let key: &'tx [u8] = {
if node_cell.key.len() == 0 {
node_cell.inodes.deref()[0].key()
} else {
node_cell.key()
}
};
parent.put(
key,
node_cell.inodes.deref()[0].key(),
&[],
node_cell.pgid,
0,
);
node_cell.key = node_cell.inodes.deref()[0].cod_key();
}
tx.split_r().stats.as_ref().unwrap().inc_spill(1);
}
// A parent that has no page yet (pgid is ZERO_PGID, e.g. one created by the split) must
// be spilled as well.
let cell = self.cell.borrow();
if let Some(parent) = cell.parent {
drop(cell);
if { parent.cell.borrow().pgid } == ZERO_PGID {
return parent.spill();
}
}
Ok(())
}
/// Prepares a lazy split of this node into nodes that fit `page_size`.
///
/// The node's elements are moved out into the returned [`NodeSplit`]; iterating it hands them
/// back chunk by chunk, starting with this node. The fill threshold is `page_size` times the
/// bucket's fill percent, bounded by `MIN_FILL_PERCENT` and `MAX_FILL_PERCENT`.
fn split(self, page_size: usize) -> NodeSplit<'tx> {
    let configured = self
        .cell
        .borrow()
        .bucket
        .cell
        .borrow()
        .w
        .as_ref()
        .unwrap()
        .fill_percent;
    let fill_percent = configured.max(MIN_FILL_PERCENT).min(MAX_FILL_PERCENT);
    let threshold = (page_size as f64 * fill_percent) as usize;

    let mut node = self.cell.borrow_mut();
    let bump = node.inodes.get_vec().bump();
    // Take the elements out of the node, leaving an empty vector behind.
    let inodes = mem::replace(node.inodes.get_mut_vec(), BVec::with_capacity_in(0, bump));
    NodeSplit {
        page_size,
        threshold,
        page_element_size: node.page_element_size(),
        split_array: SplitArray::new(inodes),
        next: Some(self),
    }
}
/// Restores tree balance after deletions: a node that fell below the fill threshold or the
/// minimum key count is collapsed (root), removed (empty) or merged with a sibling, and the
/// parent is then rebalanced in turn.
///
/// Does nothing unless the node was marked unbalanced.
///
/// # Panics
///
/// Panics if a non-empty, non-root node's parent has fewer than two elements.
pub(crate) fn rebalance(self: NodeRwCell<'tx>) {
let mut self_borrow = self.cell.borrow_mut();
let bucket = self_borrow.bucket;
if !self_borrow.is_unbalanced {
return;
}
self_borrow.is_unbalanced = false;
let tx = self_borrow.bucket.tx();
tx.split_r().stats.as_ref().unwrap().inc_rebalance(1);
// Nodes larger than a quarter page that still hold more than the minimum number of keys
// are left alone.
let threshold = tx.page_size() / 4;
if self_borrow.size() > threshold && self_borrow.inodes.len() > self_borrow.min_keys() {
return;
}
// Root node handling.
if self_borrow.parent.is_none() {
// A branch root with a single element is collapsed: the only child's contents are
// moved up into the root.
if !self_borrow.is_leaf && self_borrow.inodes.len() == 1 {
let child = self_borrow
.bucket
.node(self_borrow.inodes.first().unwrap().pgid(), Some(self));
let mut child_borrow = child.cell.borrow_mut();
self_borrow.is_leaf = child_borrow.is_leaf;
self_borrow.inodes.get_mut_vec().clear();
mem::swap(&mut self_borrow.inodes, &mut child_borrow.inodes);
self_borrow.children.clear();
mem::swap(&mut self_borrow.children, &mut child_borrow.children);
// Re-parent every cached grandchild to the root.
let mut bucket = self_borrow.bucket.cell.borrow_mut();
for inode in self_borrow.inodes.get_vec() {
if let Some(child) = bucket.w.as_mut().unwrap().nodes.get_mut(&inode.pgid()) {
child.cell.borrow_mut().parent = Some(self);
}
}
// Detach the old child, forget it in the bucket's node cache and free its page.
child_borrow.parent = None;
bucket.w.as_mut().unwrap().nodes.remove(&child_borrow.pgid);
drop(bucket);
drop(child_borrow);
child.free()
}
return;
}
let parent = self_borrow.parent.unwrap();
let mut parent_borrow = parent.cell.borrow_mut();
// A node without any elements is removed from its parent and freed.
if self_borrow.inodes.is_empty() {
parent_borrow.del(self_borrow.key());
let bucket = self_borrow.bucket;
let pg_id = self_borrow.pgid;
drop(self_borrow);
parent_borrow.remove_child(self);
{
let mut bucket_borrow = bucket.cell.borrow_mut();
bucket_borrow.w.as_mut().unwrap().nodes.remove(&pg_id);
}
self.free();
// Release the parent borrow before recursing into it.
drop(parent_borrow);
parent.rebalance();
return;
}
assert!(
parent_borrow.inodes.len() > 1,
"parent must have at least 2 children"
);
// The sibling lookups below borrow both cells again, so release them first.
drop(self_borrow);
drop(parent_borrow);
// Merge with the next sibling when this is the first child, otherwise with the previous one.
let use_next_sibling = parent.child_index(self) == 0;
let target = if use_next_sibling {
self.next_sibling().unwrap()
} else {
self.prev_sibling().unwrap()
};
let mut target_borrow = target.cell.borrow_mut();
let mut bucket = bucket.cell.borrow_mut();
let mut self_borrow = self.cell.borrow_mut();
if use_next_sibling {
// Move the sibling's cached children, then its elements, into this node.
{
// NOTE(review): the transmute extends the slice lifetime past `target_borrow`, which is
// dropped on the next line; this relies on the inode storage not being modified or moved
// during the loop — confirm and add a SAFETY justification.
let inodes: &'tx [INode] = unsafe { mem::transmute(target_borrow.inodes.deref()) };
drop(target_borrow);
for inode in inodes {
if let Some(child) = bucket.w.as_ref().unwrap().nodes.get(&inode.pgid()).cloned() {
let child_parent = child.cell.borrow().parent.unwrap();
child_parent.cell.borrow_mut().remove_child(child);
child.cell.borrow_mut().parent = Some(self);
self_borrow.children.push(child);
}
}
}
let mut target_borrow = target.cell.borrow_mut();
self_borrow
.inodes
.get_mut_vec()
.append(target_borrow.inodes.get_mut_vec());
// Remove the now-empty sibling from the parent, the node cache, and free its page.
let parent = self_borrow.parent.unwrap();
parent.del(target_borrow.key());
let target_pgid = target_borrow.pgid;
drop(target_borrow);
drop(self_borrow);
parent.cell.borrow_mut().remove_child(target);
bucket.w.as_mut().unwrap().nodes.remove(&target_pgid);
target.free();
} else {
// Move this node's cached children, then its elements, into the previous sibling.
{
// NOTE(review): same lifetime extension as in the branch above, here over this node's
// inodes while `self_borrow` is dropped — confirm and add a SAFETY justification.
let inodes: &'tx [INode] = unsafe { mem::transmute(self_borrow.inodes.deref()) };
drop(self_borrow);
for inode in inodes {
if let Some(child) = bucket.w.as_ref().unwrap().nodes.get(&inode.pgid()).cloned() {
let child_parent = child.cell.borrow().parent.unwrap();
child_parent.cell.borrow_mut().remove_child(child);
child.cell.borrow_mut().parent = Some(target);
target_borrow.children.push(child);
}
}
}
let mut self_borrow = self.cell.borrow_mut();
target_borrow
.inodes
.get_mut_vec()
.append(self_borrow.inodes.get_mut_vec());
// Remove this (now empty) node from the parent, the node cache, and free its page.
let parent = self_borrow.parent.unwrap();
parent.del(self_borrow.key());
let self_pgid = self_borrow.pgid;
drop(self_borrow);
drop(target_borrow);
parent.cell.borrow_mut().remove_child(self);
bucket.w.as_mut().unwrap().nodes.remove(&self_pgid);
self.free();
}
// Release the bucket borrow, then rebalance the parent, which just lost an element.
drop(bucket);
parent.rebalance();
}
/// Calls `own_in(bump)` on the node's key, on every element and, recursively, on every
/// in-memory child, then bumps the transaction's node-deref counter.
pub(crate) fn own_in(self: NodeRwCell<'tx>, bump: &'tx Bump) {
    let mut node = self.cell.borrow_mut();
    node.key.own_in(bump);
    node.inodes.deref_mut().iter_mut().for_each(|inode| {
        inode.own_in(bump);
    });
    node.children.iter().for_each(|child| {
        child.own_in(bump);
    });
    node.bucket
        .tx()
        .split_r()
        .stats
        .as_ref()
        .unwrap()
        .inc_node_deref(1);
}
/// Hands the node's page to the freelist under the current transaction id. Nodes that have no
/// page (`pgid == ZERO_PGID`) are ignored.
pub(crate) fn free(self: NodeRwCell<'tx>) {
    let node = self.cell.borrow();
    let pgid = node.pgid;
    if pgid == ZERO_PGID {
        return;
    }
    let api_tx = node.bucket.tx();
    // Release the node borrow before talking to the transaction.
    drop(node);
    let page = api_tx.mem_page(pgid);
    let txid = api_tx.meta().txid();
    api_tx.freelist_free_page(txid, &page);
}
}
#[cfg(test)]
mod test {
use crate::bucket::BucketRwIApi;
use crate::common::ids::pd;
use crate::common::page::tree::leaf::{
LeafPageElement, MappedLeafPage, LEAF_PAGE_ELEMENT_SIZE, LEAF_PAGE_FLAG,
};
use crate::common::page::tree::TreePage;
use crate::common::page::{CoerciblePage, MutPage, RefPage, PAGE_HEADER_SIZE};
use crate::common::ZERO_PGID;
use crate::node::NodeW;
use crate::test_support::TestDb;
use crate::tx::check::UnsealRwTx;
use crate::tx::TxRwIApi;
use aligners::{alignment, AlignedBytes};
use itertools::Itertools;
/// Inserting keys out of order keeps the elements sorted, and re-inserting an existing key
/// replaces its value and flags.
#[test]
fn test_node_put() -> crate::Result<()> {
    let mut db = TestDb::new()?;
    let tx = db.begin_rw_unseal()?;
    let txrw = tx.unseal_rw();
    let root_bucket = txrw.root_bucket_mut();
    let root = root_bucket.materialize_root();

    let inserts: [(&[u8], &[u8], u32); 4] = [
        (b"baz", b"2", 0),
        (b"foo", b"0", 0),
        (b"bar", b"1", 0),
        (b"foo", b"3", LEAF_PAGE_FLAG as u32),
    ];
    for (key, value, flags) in inserts {
        root.put(key, key, value, ZERO_PGID, flags);
    }

    let node = root.cell.borrow();
    assert_eq!(3, node.inodes.len());
    // Each literal is the expected key (first three bytes) followed by the expected value.
    for (index, joined) in [b"bar1", b"baz2", b"foo3"].iter().enumerate() {
        let inode = &node.inodes[index];
        assert_eq!(joined.split_at(3), (inode.key(), inode.value()));
    }
    assert_eq!(LEAF_PAGE_FLAG as u32, node.inodes[2].flags());
    Ok(())
}
/// A hand-built leaf page with two elements is read back into a node with the expected keys
/// and values.
#[test]
fn test_node_read_leaf_page() -> crate::Result<()> {
let mut test_db = TestDb::new()?;
let tx = test_db.begin_rw_unseal()?;
let txrw = tx.unseal_rw();
let root_bucket = txrw.root_bucket_mut();
root_bucket.materialize_root();
let n = root_bucket
.cell
.borrow_mut()
.w
.as_ref()
.unwrap()
.root_node
.unwrap();
// Build the page by hand in an aligned 4096-byte buffer.
let mut page = AlignedBytes::<alignment::Page>::new_zeroed(4096);
{
let mut mut_page = MutPage::new(page.as_mut_ptr());
let mapped_page = MappedLeafPage::mut_into(&mut mut_page);
mapped_page.count = 2;
let elements = mapped_page.elements_mut();
// Two element headers; the asserts below expect key "bar" (3 bytes) with value "fooz"
// (4 bytes), and key "helloworld" (10 bytes) with value "bye" (3 bytes).
elements[0] = LeafPageElement::new(0, 32, 3, 4);
elements[1] = LeafPageElement::new(0, 23, 10, 3);
// Key and value bytes are stored back to back right after the two element headers.
let data = b"barfoozhelloworldbye";
let data_offset = PAGE_HEADER_SIZE + (LEAF_PAGE_ELEMENT_SIZE * 2);
page[data_offset..data_offset + data.len()].copy_from_slice(data.as_slice());
}
let ref_page = RefPage::new(page.as_ptr());
let node = NodeW::read_in(root_bucket, Some(n), &ref_page);
assert!(node.is_leaf);
assert_eq!(2, node.inodes.len());
let inodes = &node.inodes;
assert_eq!(
(b"bar".as_slice(), b"fooz".as_slice()),
(inodes[0].key(), inodes[0].value())
);
assert_eq!(
(b"helloworld".as_slice(), b"bye".as_slice()),
(inodes[1].key(), inodes[1].value())
);
Ok(())
}
/// Elements written to a page by a node can be read back, in key order, into a new node.
#[test]
fn test_node_write_leaf_page() -> crate::Result<()> {
let mut page = AlignedBytes::<alignment::Page>::new_zeroed(4096);
let mut test_db = TestDb::new()?;
let tx = test_db.begin_rw_unseal()?;
let txrw = tx.unseal_rw();
let root_bucket = txrw.root_bucket_mut();
root_bucket.materialize_root();
let n = root_bucket
.cell
.borrow_mut()
.w
.as_ref()
.unwrap()
.root_node
.unwrap();
{
// Insert out of order; the node keeps its elements sorted by key.
n.put(b"susy", b"susy", b"que", pd(0), 0);
n.put(b"ricki", b"ricki", b"lake", pd(0), 0);
n.put(b"john", b"john", b"johnson", pd(0), 0);
let mut mut_page = MutPage::new(page.as_mut_ptr());
n.write(&mut mut_page);
// Remove the elements from the root node again after they have been written to the page.
n.del(b"susy");
n.del(b"ricki");
n.del(b"john");
}
// Read the written page back into a separate node and verify its contents.
let ref_page = RefPage::new(page.as_ptr());
let node = NodeW::read_in(root_bucket, Some(n), &ref_page);
assert!(node.is_leaf);
assert_eq!(3, node.inodes.len());
let inodes = &node.inodes;
assert_eq!(
(b"john".as_slice(), b"johnson".as_slice()),
(inodes[0].key(), inodes[0].value())
);
assert_eq!(
(b"ricki".as_slice(), b"lake".as_slice()),
(inodes[1].key(), inodes[1].value())
);
assert_eq!(
(b"susy".as_slice(), b"que".as_slice()),
(inodes[2].key(), inodes[2].value())
);
Ok(())
}
/// Splitting five elements with a 100-byte page size yields two nodes (2 + 3 elements) that
/// share a newly created parent.
#[test]
fn test_node_split() -> crate::Result<()> {
    let mut db = TestDb::new()?;
    let tx = db.begin_rw_unseal()?;
    let txrw = tx.unseal_rw();
    let root_bucket = txrw.root_bucket_mut();
    let root = root_bucket.materialize_root();

    let value = b"0123456701234567";
    for key in [
        b"00000001",
        b"00000002",
        b"00000003",
        b"00000004",
        b"00000005",
    ] {
        root.put(key, key, value, ZERO_PGID, 0);
    }

    let split_nodes = root.split(100).collect_vec();

    let parent = root.cell.borrow().parent.unwrap();
    let sibling_count = parent.cell.borrow().children.len();
    assert_eq!(2, sibling_count);
    assert_eq!(2, split_nodes[0].cell.borrow().inodes.len());
    assert_eq!(3, split_nodes[1].cell.borrow().inodes.len());
    Ok(())
}
/// A node holding only two elements is never split, even with a tiny page size, so no parent
/// is created.
#[test]
fn test_node_split_min_keys() -> crate::Result<()> {
    let mut db = TestDb::new()?;
    let tx = db.begin_rw_unseal()?;
    let txrw = tx.unseal_rw();
    let root_bucket = txrw.root_bucket_mut();
    let root = root_bucket.materialize_root();

    let value = b"0123456701234567";
    for key in [b"00000001", b"00000002"] {
        root.put(key, key, value, ZERO_PGID, 0);
    }

    let _split_nodes = root.split(20).collect_vec();

    let parent = root.cell.borrow().parent;
    assert!(parent.is_none(), "expected none parent");
    Ok(())
}
/// Five small elements fit into a single 4096-byte page, so splitting leaves the node without
/// a parent.
#[test]
fn test_node_split_single_page() -> crate::Result<()> {
    let mut db = TestDb::new()?;
    let tx = db.begin_rw_unseal()?;
    let txrw = tx.unseal_rw();
    let root_bucket = txrw.root_bucket_mut();
    let root = root_bucket.materialize_root();

    let value = b"0123456701234567";
    for key in [
        b"00000001",
        b"00000002",
        b"00000003",
        b"00000004",
        b"00000005",
    ] {
        root.put(key, key, value, ZERO_PGID, 0);
    }

    let _split_nodes = root.split(4096).collect_vec();

    let parent = root.cell.borrow().parent;
    assert!(parent.is_none(), "expected none parent");
    Ok(())
}
}