use crate::Result;
use crate::error::Error;
use crate::format::{
BTREE_MAGIC, BTREE_VERSION, MAX_PSIZE, META_SIZE, MIN_PSIZE, Meta, P_INVALID, P_ROOT,
PAGE_HEADER_SIZE, R_RECNO, SAVEMETA,
};
use crate::iter::{Entry, Iter};
use crate::page::{ItemRef, PageView};
use memmap2::{Advice, Mmap};
use std::borrow::Cow;
use std::cmp::Ordering;
use std::fs::File;
use std::path::Path;
/// Read-only handle to a B-tree database file that is accessed through a
/// memory map.
///
/// Instances are created with [`Db::open`], which validates the metadata
/// header before any page is read.
#[derive(Debug)]
pub struct Db {
/// Memory map covering the whole file; every page is a sub-slice of it.
mmap: Mmap,
/// Page size in bytes, taken from the metadata header and validated by
/// [`Db::open`].
psize: usize,
/// Flag returned by `Meta::parse` and forwarded to `PageView::parse`.
/// NOTE(review): presumably `true` when the file's byte order differs from
/// the host's — confirm against `Meta::parse`.
swap: bool,
}
impl Db {
pub fn open(path: impl AsRef<Path>) -> Result<Self> {
let file = File::open(path.as_ref())?;
let mmap = unsafe { Mmap::map(&file)? };
let _ = mmap.advise(Advice::Sequential);
let hdr: &[u8; META_SIZE] = mmap.first_chunk().ok_or(Error::ShortFile)?;
let (meta, swap) = Meta::parse(hdr);
if meta.magic != BTREE_MAGIC {
return Err(Error::BadMagic { magic: meta.magic });
}
if meta.version != BTREE_VERSION {
return Err(Error::BadVersion {
version: meta.version,
});
}
let psize = meta.psize as usize;
if !(MIN_PSIZE..=MAX_PSIZE).contains(&psize) || (psize & 1) != 0 {
return Err(Error::BadPageSize { psize: meta.psize });
}
if meta.flags & !SAVEMETA != 0 || meta.flags & R_RECNO != 0 {
return Err(Error::UnsupportedFlags { flags: meta.flags });
}
Ok(Self { mmap, psize, swap })
}
pub fn get(&self, key: &[u8]) -> Result<Option<Cow<'_, [u8]>>> {
let Some(pos) = self.search(key)? else {
return Ok(None);
};
let view = PageView::parse(pos.pgno, self.page(pos.pgno)?, self.swap)?;
let entry = view.leaf_entry(pos.index)?;
Ok(Some(self.materialise(&view, entry.data)?))
}
/// Returns an [`Iter`] over this database, built with `Iter::new`.
///
/// The same iterator is produced by `IntoIterator for &Db`.
#[must_use]
pub const fn iter(&self) -> Iter<'_> {
Iter::new(self)
}
/// Returns the raw bytes of page `pgno`, i.e. the `psize`-byte slice of the
/// memory map starting at byte offset `pgno * psize`.
///
/// # Errors
///
/// Returns [`Error::PageOutOfBounds`] when the page does not lie entirely
/// inside the mapped file, including when the byte offset cannot be
/// represented in a `usize`.
#[inline]
pub(crate) fn page(&self, pgno: u32) -> Result<&[u8]> {
    // `pgno` comes from untrusted on-disk data. Checked arithmetic keeps an
    // overflowing offset from panicking (debug) or wrapping around to some
    // unrelated in-bounds page (release) on targets with a narrow `usize`.
    let span = usize::try_from(pgno)
        .ok()
        .and_then(|n| n.checked_mul(self.psize))
        .and_then(|start| Some(start..start.checked_add(self.psize)?));
    span.and_then(|range| self.mmap.get(range))
        .ok_or(Error::PageOutOfBounds { pgno })
}
/// Returns the `swap` flag that [`Db::open`] obtained from `Meta::parse`.
///
/// NOTE(review): presumably `true` means multi-byte on-disk fields must be
/// byte-swapped before use — confirm against `Meta::parse`.
#[inline]
pub(crate) const fn swap(&self) -> bool {
self.swap
}
fn search(&self, key: &[u8]) -> Result<Option<Position>> {
let mut pgno = P_ROOT;
loop {
let view = PageView::parse(pgno, self.page(pgno)?, self.swap)?;
if view.is_leaf() {
return self.search_leaf(&view, key);
}
if !view.is_internal() {
return Err(Error::CorruptPage {
pgno,
reason: "non-internal page during descent",
});
}
pgno = self.descend_internal(&view, key)?;
}
}
/// Binary-searches the entries of the leaf page `view` for `key`.
///
/// Returns the page number and entry index of an entry whose key compares
/// equal to `key`, or `Ok(None)` when no entry matches.
///
/// # Errors
///
/// Propagates errors from decoding a leaf entry or from comparing against a
/// key stored on overflow pages.
fn search_leaf(&self, view: &PageView<'_>, key: &[u8]) -> Result<Option<Position>> {
    // Half-open search window `[lo, hi)`; probing the midpoint visits the
    // entries in the usual binary-search order.
    let (mut lo, mut hi) = (0usize, view.nentries());
    while lo < hi {
        let mid = lo + (hi - lo) / 2;
        let probe = view.leaf_entry(mid)?;
        match self.compare_item(key, view, probe.key)? {
            Ordering::Less => hi = mid,
            Ordering::Greater => lo = mid + 1,
            Ordering::Equal => {
                let found = Position {
                    pgno: view.pgno,
                    index: mid,
                };
                return Ok(Some(found));
            }
        }
    }
    Ok(None)
}
/// Picks the child page of the internal page `view` that should be followed
/// when looking for `key`.
///
/// The chosen entry is the one whose key equals `key` if the binary search
/// hits one, otherwise the last entry whose key is smaller than `key`
/// (falling back to entry 0 when there is none).
///
/// # Errors
///
/// Propagates errors from decoding an internal entry or from comparing
/// against a key stored on overflow pages.
fn descend_internal(&self, view: &PageView<'_>, key: &[u8]) -> Result<u32> {
    // On a page with no left sibling, entry 0 is treated as smaller than every
    // search key, so its stored key is never compared.
    let on_left_edge = view.prev_pg == P_INVALID;

    let (mut lo, mut hi) = (0usize, view.nentries());
    let mut exact: Option<usize> = None;
    while lo < hi {
        let mid = lo + (hi - lo) / 2;
        let probe = view.internal_entry(mid)?;
        let ord = if on_left_edge && mid == 0 {
            Ordering::Greater
        } else {
            self.compare_item(key, view, probe.key)?
        };
        match ord {
            Ordering::Less => hi = mid,
            Ordering::Greater => lo = mid + 1,
            Ordering::Equal => {
                exact = Some(mid);
                break;
            }
        }
    }

    // Without an exact hit `lo` is the first entry greater than `key`; the
    // subtree to follow hangs off the entry just before it.
    let slot = exact.unwrap_or_else(|| lo.saturating_sub(1));
    Ok(view.internal_entry(slot)?.child)
}
/// Compares `key` with the key bytes that `item` refers to, using
/// lexicographic byte order.
///
/// Inline items are compared in place on `view`; overflow items are first
/// copied out of their overflow pages.
///
/// # Errors
///
/// Propagates errors from resolving the inline slice or from reading the
/// overflow chain.
fn compare_item(&self, key: &[u8], view: &PageView<'_>, item: ItemRef) -> Result<Ordering> {
    let ordering = match item {
        ItemRef::Overflow { pgno, size } => {
            let stored = self.fetch_overflow(pgno, size)?;
            key.cmp(stored.as_slice())
        }
        ItemRef::Inline { .. } => key.cmp(view.inline(item)?),
    };
    Ok(ordering)
}
/// Resolves `item` to its bytes.
///
/// Inline items are returned as a borrow obtained from `view`; overflow items
/// are copied out of their overflow pages into an owned buffer.
///
/// # Errors
///
/// Propagates errors from resolving the inline slice or from reading the
/// overflow chain.
pub(crate) fn materialise<'a>(
    &'a self,
    view: &PageView<'a>,
    item: ItemRef,
) -> Result<Cow<'a, [u8]>> {
    let bytes = match item {
        ItemRef::Overflow { pgno, size } => Cow::Owned(self.fetch_overflow(pgno, size)?),
        ItemRef::Inline { .. } => Cow::Borrowed(view.inline(item)?),
    };
    Ok(bytes)
}
fn fetch_overflow(&self, start_pgno: u32, size: u32) -> Result<Vec<u8>> {
let mut out = Vec::with_capacity(size as usize);
let payload_per_page = self.psize - PAGE_HEADER_SIZE;
let mut pgno = start_pgno;
let mut remaining = size as usize;
while remaining > 0 {
if pgno == P_INVALID {
return Err(Error::CorruptOverflow { pgno: start_pgno });
}
let view = PageView::parse(pgno, self.page(pgno)?, self.swap)?;
if !view.is_overflow() {
return Err(Error::CorruptOverflow { pgno: start_pgno });
}
let take = remaining.min(payload_per_page);
let payload = view.overflow_payload();
if take > payload.len() {
return Err(Error::CorruptOverflow { pgno: start_pgno });
}
out.extend_from_slice(&payload[..take]);
remaining -= take;
pgno = view.next_pg;
}
Ok(out)
}
}
/// Location of a leaf entry found by `Db::search`.
#[derive(Clone, Copy)]
struct Position {
/// Page number of the leaf page holding the entry.
pgno: u32,
/// Index of the entry within that leaf page.
index: usize,
}
impl<'a> IntoIterator for &'a Db {
type Item = Result<Entry<'a>>;
type IntoIter = Iter<'a>;
fn into_iter(self) -> Self::IntoIter {
self.iter()
}
}