mod page_buf;
use crate::Result;
use crate::format::{
B_NODUPS, BINTERNAL_HEADER_SIZE, BLEAF_HEADER_SIZE, BTREE_MAGIC, BTREE_VERSION, MAX_PSIZE,
P_BINTERNAL, P_BLEAF, P_INVALID, P_ROOT, P_TYPE, PAGE_HEADER_SIZE, align_entry,
};
use crate::page::PageView;
use page_buf::{PageBufMut, PageBufRef};
use std::cmp::Ordering;
use std::fs::{File, OpenOptions};
use std::io::{BufWriter, Write};
use std::path::Path;
const DEFAULT_PSIZE: usize = 4096;
#[must_use = "Writer is staged in memory only; call finish() to write the database"]
pub struct Writer {
file: File,
psize: usize,
pages: Vec<u8>,
free: u32,
nrecs: u32,
flags: u32,
}
impl Writer {
fn page_bytes(&self, pgno: u32) -> &[u8] {
let start = pgno as usize * self.psize;
&self.pages[start..start + self.psize]
}
fn page_bytes_mut(&mut self, pgno: u32) -> &mut [u8] {
let start = pgno as usize * self.psize;
let psize = self.psize;
&mut self.pages[start..start + psize]
}
fn page(&self, pgno: u32) -> PageBufRef<'_> {
PageBufRef::new(self.page_bytes(pgno))
}
fn page_mut(&mut self, pgno: u32) -> PageBufMut<'_> {
PageBufMut::new(self.page_bytes_mut(pgno))
}
fn next_free_pgno(&self) -> u32 {
let count = self.pages.len() / self.psize;
u32::try_from(count).expect("page count fits in u32")
}
fn push_page(&mut self, buf: &[u8]) -> u32 {
debug_assert_eq!(buf.len(), self.psize);
let pgno = self.next_free_pgno();
self.pages.extend_from_slice(buf);
pgno
}
fn replace_page(&mut self, pgno: u32, buf: &[u8]) {
debug_assert_eq!(buf.len(), self.psize);
self.page_bytes_mut(pgno).copy_from_slice(buf);
}
pub fn create_new(path: impl AsRef<Path>) -> Result<Self> {
let file = OpenOptions::new()
.read(true)
.write(true)
.create_new(true)
.open(path)?;
let psize = DEFAULT_PSIZE;
let mut pages: Vec<u8> = Vec::with_capacity(64 * psize);
pages.resize(2 * psize, 0);
pages[psize..2 * psize].copy_from_slice(&empty_leaf(P_ROOT, psize));
Ok(Self {
file,
psize,
pages,
free: P_INVALID,
nrecs: 0,
flags: B_NODUPS,
})
}
pub fn del(&mut self, key: &[u8]) -> Result<bool> {
let spine = self.descend(key)?;
let leaf_pgno = spine.leaf_pgno;
let (exact, idx) = leaf_search(leaf_pgno, self.page(leaf_pgno), key)?;
if !exact {
return Ok(false);
}
delete_leaf_entry(&mut self.page_mut(leaf_pgno), idx);
Ok(true)
}
pub fn put(&mut self, key: &[u8], val: &[u8]) -> Result<bool> {
let psize = self.psize;
let leaf_bytes = align_entry(BLEAF_HEADER_SIZE + key.len() + val.len());
let sep_bytes = align_entry(BINTERNAL_HEADER_SIZE + key.len());
let zero_key_bytes = align_entry(BINTERNAL_HEADER_SIZE);
let usable = psize - PAGE_HEADER_SIZE;
if leaf_bytes + 2 > usable || sep_bytes + zero_key_bytes + 4 > usable {
return Err(crate::Error::EntryTooLarge {
key_len: key.len(),
val_len: val.len(),
});
}
let spine = self.descend(key)?;
let leaf_pgno = spine.leaf_pgno;
let leaf = self.page(leaf_pgno);
let (exact, idx) = leaf_search(leaf_pgno, leaf, key)?;
if exact {
return Ok(false);
}
if leaf.free_space() >= leaf_bytes + 2 {
insert_leaf_entry(&mut self.page_mut(leaf_pgno), idx, key, val);
return Ok(true);
}
if leaf_pgno == P_ROOT {
self.split_root_leaf(idx, key, val)?;
return Ok(true);
}
let sep = self.split_non_root_leaf(leaf_pgno, idx, key, val)?;
self.propagate_separator(spine.path, sep)?;
Ok(true)
}
fn descend(&self, key: &[u8]) -> Result<Spine> {
let mut path = Vec::new();
let mut current = P_ROOT;
loop {
let page = self.page(current);
if page.is_leaf() {
return Ok(Spine {
path,
leaf_pgno: current,
});
}
let view = PageView::parse(current, page.bytes(), false)?;
let leftmost = view.prev_pg == P_INVALID;
let chosen = descend_internal_pick(&view, key, leftmost)?;
let child = view.internal_entry(chosen)?.child;
path.push((current, chosen));
current = child;
}
}
fn split_non_root_leaf(
&mut self,
leaf_pgno: u32,
skip: usize,
key: &[u8],
val: &[u8],
) -> Result<Separator> {
let leaf = self.page(leaf_pgno);
if leaf.is_rightmost() && skip == leaf.nentries() {
self.split_leaf_sorted_opt(leaf_pgno, key, val)
} else {
self.split_leaf_full(leaf_pgno, skip, key, val)
}
}
fn split_leaf_sorted_opt(
&mut self,
leaf_pgno: u32,
key: &[u8],
val: &[u8],
) -> Result<Separator> {
let psize = self.psize;
let l_last_key = {
let leaf = self.page(leaf_pgno);
let view = PageView::parse(leaf_pgno, leaf.bytes(), false)?;
let n = view.nentries();
view.inline(view.leaf_entry(n - 1)?.key)?.to_vec()
};
let r_pgno = self.next_free_pgno();
let mut r_buf = empty_leaf(r_pgno, psize);
{
let mut r = PageBufMut::new(&mut r_buf);
r.set_prev_pg(leaf_pgno);
r.set_lower(PAGE_HEADER_SIZE + 2);
write_entry_at_slot(&mut r, 0, key, val);
}
self.page_mut(leaf_pgno).set_next_pg(r_pgno);
self.push_page(&r_buf);
Ok(Separator::Leaf {
sep_key: key.to_vec(),
child: r_pgno,
l_last_key,
})
}
fn split_leaf_full(
&mut self,
leaf_pgno: u32,
skip: usize,
key: &[u8],
val: &[u8],
) -> Result<Separator> {
let psize = self.psize;
let ilen = align_entry(BLEAF_HEADER_SIZE + key.len() + val.len());
let r_pgno = self.next_free_pgno();
let mut l_buf = empty_leaf(leaf_pgno, psize);
let mut r_buf = empty_leaf(r_pgno, psize);
let leaf_next_pg;
let (l_last_key, r_first_key) = {
let old = self.page(leaf_pgno);
let mut l = PageBufMut::new(&mut l_buf);
let mut r = PageBufMut::new(&mut r_buf);
l.set_prev_pg(old.prev_pg());
l.set_next_pg(r_pgno);
r.set_prev_pg(leaf_pgno);
leaf_next_pg = old.next_pg();
r.set_next_pg(leaf_next_pg);
let view = PageView::parse(leaf_pgno, old.bytes(), false)?;
let n = view.nentries();
let plan = compute_psplit_split_leaf(&view, skip, ilen, psize)?;
copy_entries(&view, 0..plan.h_in_l(), plan.l_skip(skip), &mut l)?;
copy_entries(&view, plan.h_in_l()..n, plan.r_skip(skip), &mut r)?;
let target = if plan.right_target { &mut r } else { &mut l };
write_entry_at_slot(target, plan.target_slot(skip), key, val);
(leaf_last_key(l.bytes())?, leaf_first_key(r.bytes())?)
};
self.replace_page(leaf_pgno, &l_buf);
let pushed = self.push_page(&r_buf);
debug_assert_eq!(pushed, r_pgno);
if leaf_next_pg != P_INVALID {
self.page_mut(leaf_next_pg).set_prev_pg(r_pgno);
}
Ok(Separator::Leaf {
sep_key: r_first_key,
child: r_pgno,
l_last_key,
})
}
fn propagate_separator(
&mut self,
mut spine: Vec<(u32, usize)>,
mut sep: Separator,
) -> Result<()> {
while let Some((parent_pgno, parent_idx)) = spine.pop() {
let skip = parent_idx + 1;
match self.try_insert_internal(parent_pgno, skip, &sep)? {
None => return Ok(()),
Some(new_sep) => sep = new_sep,
}
}
unreachable!("propagate_separator ran past the root");
}
fn try_insert_internal(
&mut self,
parent_pgno: u32,
skip: usize,
sep: &Separator,
) -> Result<Option<Separator>> {
let parent = self.page(parent_pgno);
let (nksize, nbytes) = sep.encoded_size(parent.prev_pg(), skip);
if parent.free_space() >= nbytes + 2 {
insert_internal_in_place(
&mut self.page_mut(parent_pgno),
skip,
nksize,
sep.sep_key(),
sep.child(),
);
return Ok(None);
}
if parent_pgno == P_ROOT {
self.split_root_internal(skip, sep, nksize)?;
return Ok(None);
}
if parent.is_rightmost() && skip == parent.nentries() {
return Ok(Some(self.split_internal_sorted_opt(
parent_pgno,
sep,
nksize,
)));
}
Ok(Some(self.split_internal_page(
parent_pgno,
skip,
sep,
nksize,
)?))
}
fn split_internal_sorted_opt(
&mut self,
parent_pgno: u32,
sep: &Separator,
nksize: usize,
) -> Separator {
let psize = self.psize;
let r_pgno = self.next_free_pgno();
let mut r_buf = empty_internal(r_pgno, psize);
{
let mut r = PageBufMut::new(&mut r_buf);
r.set_prev_pg(parent_pgno);
r.set_lower(PAGE_HEADER_SIZE + 2);
write_internal_entry_at_slot(&mut r, 0, nksize, sep.sep_key(), sep.child());
}
self.page_mut(parent_pgno).set_next_pg(r_pgno);
self.push_page(&r_buf);
Separator::Internal {
sep_key: sep.sep_key().to_vec(),
child: r_pgno,
ksize: nksize,
}
}
fn split_root_internal(&mut self, skip: usize, sep: &Separator, nksize: usize) -> Result<()> {
let psize = self.psize;
let ilen = align_entry(BINTERNAL_HEADER_SIZE + nksize);
let l_pgno = self.next_free_pgno();
let r_pgno = l_pgno + 1;
let mut l_buf = empty_internal(l_pgno, psize);
let mut r_buf = empty_internal(r_pgno, psize);
{
let mut l = PageBufMut::new(&mut l_buf);
let mut r = PageBufMut::new(&mut r_buf);
l.set_next_pg(r_pgno);
r.set_prev_pg(l_pgno);
let old_root = self.page(P_ROOT);
let view = PageView::parse(P_ROOT, old_root.bytes(), false)?;
let n = view.nentries();
let plan = compute_psplit_split_internal(&view, skip, ilen, psize)?;
copy_internal_entries(&view, 0..plan.h_in_l(), plan.l_skip(skip), &mut l)?;
copy_internal_entries(&view, plan.h_in_l()..n, plan.r_skip(skip), &mut r)?;
let target = if plan.right_target { &mut r } else { &mut l };
write_internal_entry_at_slot(
target,
plan.target_slot(skip),
nksize,
sep.sep_key(),
sep.child(),
);
}
let r_first_key = first_internal_key(&r_buf)?;
let mut new_root_buf = self.page_bytes(P_ROOT).to_vec();
convert_root_to_internal(
&mut PageBufMut::new(&mut new_root_buf),
psize,
l_pgno,
r_pgno,
&r_first_key,
);
let l_pushed = self.push_page(&l_buf);
debug_assert_eq!(l_pushed, l_pgno);
let r_pushed = self.push_page(&r_buf);
debug_assert_eq!(r_pushed, r_pgno);
self.replace_page(P_ROOT, &new_root_buf);
Ok(())
}
fn split_internal_page(
&mut self,
parent_pgno: u32,
skip: usize,
sep: &Separator,
nksize: usize,
) -> Result<Separator> {
let psize = self.psize;
let ilen = align_entry(BINTERNAL_HEADER_SIZE + nksize);
let r_pgno = self.next_free_pgno();
let mut l_buf = empty_internal(parent_pgno, psize);
let mut r_buf = empty_internal(r_pgno, psize);
let next_pg;
{
let old = self.page(parent_pgno);
let mut l = PageBufMut::new(&mut l_buf);
let mut r = PageBufMut::new(&mut r_buf);
l.set_prev_pg(old.prev_pg());
l.set_next_pg(r_pgno);
r.set_prev_pg(parent_pgno);
next_pg = old.next_pg();
r.set_next_pg(next_pg);
let view = PageView::parse(parent_pgno, old.bytes(), false)?;
let n = view.nentries();
let plan = compute_psplit_split_internal(&view, skip, ilen, psize)?;
copy_internal_entries(&view, 0..plan.h_in_l(), plan.l_skip(skip), &mut l)?;
copy_internal_entries(&view, plan.h_in_l()..n, plan.r_skip(skip), &mut r)?;
let target = if plan.right_target { &mut r } else { &mut l };
write_internal_entry_at_slot(
target,
plan.target_slot(skip),
nksize,
sep.sep_key(),
sep.child(),
);
}
let r_first_key = first_internal_key(&r_buf)?;
self.replace_page(parent_pgno, &l_buf);
let pushed = self.push_page(&r_buf);
debug_assert_eq!(pushed, r_pgno);
if next_pg != P_INVALID {
self.page_mut(next_pg).set_prev_pg(r_pgno);
}
let ksize = r_first_key.len();
Ok(Separator::Internal {
sep_key: r_first_key,
child: r_pgno,
ksize,
})
}
fn split_root_leaf(&mut self, skip: usize, key: &[u8], val: &[u8]) -> Result<()> {
let psize = self.psize;
let ilen = align_entry(BLEAF_HEADER_SIZE + key.len() + val.len());
let l_pgno = self.next_free_pgno();
let r_pgno = l_pgno + 1;
let mut l_buf = empty_leaf(l_pgno, psize);
let mut r_buf = empty_leaf(r_pgno, psize);
let r_first_key = {
let mut l = PageBufMut::new(&mut l_buf);
let mut r = PageBufMut::new(&mut r_buf);
l.set_next_pg(r_pgno);
r.set_prev_pg(l_pgno);
let old_root = self.page(P_ROOT);
let view = PageView::parse(P_ROOT, old_root.bytes(), false)?;
let n = view.nentries();
let plan = compute_psplit_split_leaf(&view, skip, ilen, psize)?;
copy_entries(&view, 0..plan.h_in_l(), plan.l_skip(skip), &mut l)?;
copy_entries(&view, plan.h_in_l()..n, plan.r_skip(skip), &mut r)?;
let target = if plan.right_target { &mut r } else { &mut l };
write_entry_at_slot(target, plan.target_slot(skip), key, val);
leaf_first_key(r.bytes())?
};
let mut new_root_buf = self.page_bytes(P_ROOT).to_vec();
convert_root_to_internal(
&mut PageBufMut::new(&mut new_root_buf),
psize,
l_pgno,
r_pgno,
&r_first_key,
);
let l_pushed = self.push_page(&l_buf);
debug_assert_eq!(l_pushed, l_pgno);
let r_pushed = self.push_page(&r_buf);
debug_assert_eq!(r_pushed, r_pgno);
self.replace_page(P_ROOT, &new_root_buf);
Ok(())
}
pub fn finish(self) -> Result<()> {
let Self {
file,
psize,
mut pages,
free,
nrecs,
flags,
} = self;
assert!(
!pages.is_empty() && pages.len() % psize == 0,
"arena length {len} is not a positive multiple of psize {psize}",
len = pages.len(),
);
pages[..psize].copy_from_slice(&build_meta(psize, free, nrecs, flags));
let mut bw = BufWriter::with_capacity(64 * 1024, file);
bw.write_all(&pages)?;
bw.flush()?;
Ok(())
}
}
fn build_meta(psize: usize, free: u32, nrecs: u32, flags: u32) -> Vec<u8> {
let mut p = vec![0u8; psize];
write_u32(&mut p, 0, BTREE_MAGIC);
write_u32(&mut p, 4, BTREE_VERSION);
write_u32(&mut p, 8, psize_as_u32(psize));
write_u32(&mut p, 12, free);
write_u32(&mut p, 16, nrecs);
write_u32(&mut p, 20, flags);
p
}
fn empty_leaf(pgno: u32, psize: usize) -> Vec<u8> {
empty_page(pgno, psize, P_BLEAF)
}
fn empty_internal(pgno: u32, psize: usize) -> Vec<u8> {
empty_page(pgno, psize, P_BINTERNAL)
}
fn empty_page(pgno: u32, psize: usize, flags: u32) -> Vec<u8> {
let mut buf = vec![0u8; psize];
let mut p = PageBufMut::new(&mut buf);
p.set_pgno(pgno);
p.set_flags(flags);
p.set_lower(PAGE_HEADER_SIZE);
p.set_upper(psize);
buf
}
#[inline]
fn psize_as_u32(psize: usize) -> u32 {
debug_assert!(psize <= MAX_PSIZE);
u32::try_from(psize).expect("psize <= MAX_PSIZE")
}
fn bt_defpfx(a: &[u8], b: &[u8]) -> usize {
match a.iter().zip(b).position(|(x, y)| x != y) {
Some(i) => i + 1,
None if a.len() < b.len() => a.len() + 1,
None => a.len(),
}
}
fn compute_psplit_split_leaf(
view: &PageView<'_>,
skip: usize,
ilen: usize,
psize: usize,
) -> Result<SplitPlan> {
psplit_loop(skip, ilen, psize, view.nentries(), |idx| {
entry_nbytes(view, idx)
})
}
fn entry_nbytes(view: &PageView<'_>, idx: usize) -> Result<usize> {
let entry = view.leaf_entry(idx)?;
let key = view.inline(entry.key)?;
let val = view.inline(entry.data)?;
Ok(align_entry(BLEAF_HEADER_SIZE + key.len() + val.len()))
}
fn copy_entries(
view: &PageView<'_>,
range: std::ops::Range<usize>,
reserve_at: Option<usize>,
dst: &mut PageBufMut<'_>,
) -> Result<()> {
let mut slot = 0usize;
for src_idx in range {
if Some(slot) == reserve_at {
slot += 1;
}
copy_one(view, src_idx, slot, dst)?;
slot += 1;
}
if Some(slot) == reserve_at {
slot += 1;
}
dst.set_lower(PAGE_HEADER_SIZE + slot * 2);
Ok(())
}
fn copy_one(
view: &PageView<'_>,
src_idx: usize,
dst_slot: usize,
dst: &mut PageBufMut<'_>,
) -> Result<()> {
let entry = view.leaf_entry(src_idx)?;
let key = view.inline(entry.key)?;
let val = view.inline(entry.data)?;
let nbytes = align_entry(BLEAF_HEADER_SIZE + key.len() + val.len());
let new_upper = dst.upper() - nbytes;
dst.set_upper(new_upper);
dst.set_linp(dst_slot, new_upper);
write_bleaf_inline(dst.bytes_mut(), new_upper, key, val);
Ok(())
}
fn delete_leaf_entry(page: &mut PageBufMut<'_>, idx: usize) {
let lower = page.lower();
let upper = page.upper();
let n = page.nentries();
let offset = page.linp(idx) as usize;
let ksize = read_u32_at(page.bytes(), offset) as usize;
let dsize = read_u32_at(page.bytes(), offset + 4) as usize;
let nbytes = align_entry(BLEAF_HEADER_SIZE + ksize + dsize);
page.bytes_mut().copy_within(upper..offset, upper + nbytes);
for i in 0..idx {
let v = page.linp(i) as usize;
if v < offset {
page.set_linp(i, v + nbytes);
}
}
for i in idx..n - 1 {
let v = page.linp(i + 1) as usize;
let new = if v < offset { v + nbytes } else { v };
page.set_linp(i, new);
}
page.set_lower(lower - 2);
page.set_upper(upper + nbytes);
}
fn write_entry_at_slot(dst: &mut PageBufMut<'_>, slot: usize, key: &[u8], val: &[u8]) {
let nbytes = align_entry(BLEAF_HEADER_SIZE + key.len() + val.len());
let new_upper = dst.upper() - nbytes;
dst.set_upper(new_upper);
dst.set_linp(slot, new_upper);
write_bleaf_inline(dst.bytes_mut(), new_upper, key, val);
}
fn leaf_first_key(page: &[u8]) -> Result<Vec<u8>> {
let view = PageView::parse(0, page, false)?;
Ok(view.inline(view.leaf_entry(0)?.key)?.to_vec())
}
fn leaf_last_key(page: &[u8]) -> Result<Vec<u8>> {
let view = PageView::parse(0, page, false)?;
let last = view.nentries() - 1;
Ok(view.inline(view.leaf_entry(last)?.key)?.to_vec())
}
fn convert_root_to_internal(
p: &mut PageBufMut<'_>,
psize: usize,
l_pgno: u32,
r_pgno: u32,
r_first_key: &[u8],
) {
let upper0 = psize - align_entry(BINTERNAL_HEADER_SIZE);
let upper1 = upper0 - align_entry(BINTERNAL_HEADER_SIZE + r_first_key.len());
let bytes = p.bytes_mut();
write_binternal_inline(bytes, upper0, 0, &[], l_pgno);
write_binternal_inline(bytes, upper1, r_first_key.len(), r_first_key, r_pgno);
p.set_linp(0, upper0);
p.set_linp(1, upper1);
p.set_lower(PAGE_HEADER_SIZE + 4);
p.set_upper(upper1);
let new_flags = (p.flags() & !P_TYPE) | P_BINTERNAL;
p.set_flags(new_flags);
}
struct Spine {
path: Vec<(u32, usize)>,
leaf_pgno: u32,
}
enum Separator {
Leaf {
sep_key: Vec<u8>,
child: u32,
l_last_key: Vec<u8>,
},
Internal {
sep_key: Vec<u8>,
child: u32,
ksize: usize,
},
}
impl Separator {
const fn child(&self) -> u32 {
match self {
Self::Leaf { child, .. } | Self::Internal { child, .. } => *child,
}
}
fn sep_key(&self) -> &[u8] {
match self {
Self::Leaf { sep_key, .. } | Self::Internal { sep_key, .. } => sep_key,
}
}
fn encoded_size(&self, parent_prev_pg: u32, skip: usize) -> (usize, usize) {
match self {
Self::Internal { ksize, .. } => (*ksize, align_entry(BINTERNAL_HEADER_SIZE + ksize)),
Self::Leaf {
sep_key,
l_last_key,
..
} => {
let full = sep_key.len();
let full_nbytes = align_entry(BINTERNAL_HEADER_SIZE + full);
let allow_pfx = parent_prev_pg != P_INVALID || skip > 1;
if !allow_pfx {
return (full, full_nbytes);
}
let pfx_len = bt_defpfx(l_last_key, sep_key);
let compressed = align_entry(BINTERNAL_HEADER_SIZE + pfx_len);
if compressed < full_nbytes {
(pfx_len, compressed)
} else {
(full, full_nbytes)
}
}
}
}
}
fn descend_internal_pick(view: &PageView<'_>, key: &[u8], leftmost: bool) -> Result<usize> {
let n = view.nentries();
let mut base = 0usize;
let mut lim = n;
let mut matched: Option<usize> = None;
while lim > 0 {
let mid = base + (lim >> 1);
let entry = view.internal_entry(mid)?;
let cmp = if mid == 0 && leftmost {
Ordering::Greater
} else {
key.cmp(view.inline(entry.key)?)
};
match cmp {
Ordering::Equal => {
matched = Some(mid);
break;
}
Ordering::Greater => {
base = mid + 1;
lim -= 1;
}
Ordering::Less => {}
}
lim >>= 1;
}
Ok(match matched {
Some(idx) => idx,
None if base == 0 => 0,
None => base - 1,
})
}
fn compute_psplit_split_internal(
view: &PageView<'_>,
skip: usize,
ilen: usize,
psize: usize,
) -> Result<SplitPlan> {
psplit_loop(skip, ilen, psize, view.nentries(), |idx| {
internal_entry_nbytes(view, idx)
})
}
#[derive(Clone, Copy, Debug)]
struct SplitPlan {
left_slots: usize,
right_target: bool,
}
impl SplitPlan {
const fn h_in_l(self) -> usize {
if self.right_target {
self.left_slots
} else {
self.left_slots - 1
}
}
const fn l_skip(self, skip: usize) -> Option<usize> {
if self.right_target { None } else { Some(skip) }
}
const fn r_skip(self, skip: usize) -> Option<usize> {
if self.right_target {
Some(skip - self.left_slots)
} else {
None
}
}
const fn target_slot(self, skip: usize) -> usize {
if self.right_target {
skip - self.left_slots
} else {
skip
}
}
}
fn psplit_loop(
skip: usize,
ilen: usize,
psize: usize,
top: usize,
mut entry_size: impl FnMut(usize) -> Result<usize>,
) -> Result<SplitPlan> {
let full = psize - PAGE_HEADER_SIZE;
let half = full / 2;
let mut used: usize = 0;
let mut nxt = 0usize;
let mut off = 0usize;
loop {
if nxt >= top {
break;
}
let nbytes = if skip == off { ilen } else { entry_size(nxt)? };
if (skip <= off && used + nbytes + 2 >= full) || nxt + 1 == top {
break;
}
if skip != off {
nxt += 1;
}
used += nbytes + 2;
off += 1;
if used >= half {
break;
}
}
let left_slots = off;
let right_target = skip >= left_slots;
Ok(SplitPlan {
left_slots,
right_target,
})
}
fn internal_entry_nbytes(view: &PageView<'_>, idx: usize) -> Result<usize> {
let entry = view.internal_entry(idx)?;
let key = view.inline(entry.key)?;
Ok(align_entry(BINTERNAL_HEADER_SIZE + key.len()))
}
fn copy_internal_entries(
view: &PageView<'_>,
range: std::ops::Range<usize>,
reserve_at: Option<usize>,
dst: &mut PageBufMut<'_>,
) -> Result<()> {
let mut slot = 0usize;
for src_idx in range {
if Some(slot) == reserve_at {
slot += 1;
}
copy_internal_one(view, src_idx, slot, dst)?;
slot += 1;
}
if Some(slot) == reserve_at {
slot += 1;
}
dst.set_lower(PAGE_HEADER_SIZE + slot * 2);
Ok(())
}
fn copy_internal_one(
view: &PageView<'_>,
src_idx: usize,
dst_slot: usize,
dst: &mut PageBufMut<'_>,
) -> Result<()> {
let entry = view.internal_entry(src_idx)?;
let key = view.inline(entry.key)?;
let nbytes = align_entry(BINTERNAL_HEADER_SIZE + key.len());
let new_upper = dst.upper() - nbytes;
dst.set_upper(new_upper);
dst.set_linp(dst_slot, new_upper);
write_binternal_inline(dst.bytes_mut(), new_upper, key.len(), key, entry.child);
Ok(())
}
fn write_internal_entry_at_slot(
dst: &mut PageBufMut<'_>,
slot: usize,
nksize: usize,
sep_key: &[u8],
child: u32,
) {
let nbytes = align_entry(BINTERNAL_HEADER_SIZE + nksize);
let new_upper = dst.upper() - nbytes;
dst.set_upper(new_upper);
dst.set_linp(slot, new_upper);
write_binternal_inline(dst.bytes_mut(), new_upper, nksize, sep_key, child);
}
fn insert_internal_in_place(
page: &mut PageBufMut<'_>,
skip: usize,
nksize: usize,
sep_key: &[u8],
child: u32,
) {
let nbytes = align_entry(BINTERNAL_HEADER_SIZE + nksize);
let new_upper = page.reserve_slot(skip, nbytes);
write_binternal_inline(page.bytes_mut(), new_upper, nksize, sep_key, child);
}
fn first_internal_key(page: &[u8]) -> Result<Vec<u8>> {
let view = PageView::parse(0, page, false)?;
let entry = view.internal_entry(0)?;
Ok(view.inline(entry.key)?.to_vec())
}
fn leaf_search(pgno: u32, page: PageBufRef<'_>, key: &[u8]) -> Result<(bool, usize)> {
let view = PageView::parse(pgno, page.bytes(), false)?;
let mut base = 0usize;
let mut lim = view.nentries();
while lim > 0 {
let idx = base + (lim >> 1);
let entry = view.leaf_entry(idx)?;
let entry_key = view.inline(entry.key)?;
match key.cmp(entry_key) {
Ordering::Equal => return Ok((true, idx)),
Ordering::Greater => {
base = idx + 1;
lim -= 1;
}
Ordering::Less => {}
}
lim >>= 1;
}
Ok((false, base))
}
fn insert_leaf_entry(page: &mut PageBufMut<'_>, idx: usize, key: &[u8], val: &[u8]) {
let nbytes = align_entry(BLEAF_HEADER_SIZE + key.len() + val.len());
let new_upper = page.reserve_slot(idx, nbytes);
write_bleaf_inline(page.bytes_mut(), new_upper, key, val);
}
#[inline]
fn len_as_u32(len: usize) -> u32 {
debug_assert!(len <= MAX_PSIZE);
u32::try_from(len).expect("len <= MAX_PSIZE")
}
#[inline]
fn write_u32(buf: &mut [u8], off: usize, value: u32) {
buf[off..off + 4].copy_from_slice(&value.to_ne_bytes());
}
#[inline]
fn read_u32_at(buf: &[u8], off: usize) -> u32 {
u32::from_ne_bytes(buf[off..off + 4].try_into().expect("4 bytes"))
}
fn write_bleaf_inline(bytes: &mut [u8], off: usize, key: &[u8], val: &[u8]) {
write_u32(bytes, off, len_as_u32(key.len()));
write_u32(bytes, off + 4, len_as_u32(val.len()));
bytes[off + 8] = 0;
let key_pos = off + BLEAF_HEADER_SIZE;
bytes[key_pos..key_pos + key.len()].copy_from_slice(key);
let val_pos = key_pos + key.len();
bytes[val_pos..val_pos + val.len()].copy_from_slice(val);
}
fn write_binternal_inline(bytes: &mut [u8], off: usize, ksize: usize, key: &[u8], child: u32) {
debug_assert!(key.len() >= ksize);
write_u32(bytes, off, len_as_u32(ksize));
write_u32(bytes, off + 4, child);
bytes[off + 8] = 0;
let key_pos = off + BINTERNAL_HEADER_SIZE;
bytes[key_pos..key_pos + ksize].copy_from_slice(&key[..ksize]);
}