use crate::common::bucket::{BucketHeader, BUCKET_HEADER_SIZE};
use crate::common::cell::{Ref, RefMut};
use crate::common::memory::{BCell, IsAligned};
use crate::common::page::{
CoerciblePage, MutPage, PageHeader, RefPage, BUCKET_LEAF_FLAG, LEAF_PAGE_FLAG, PAGE_HEADER_SIZE,
};
use crate::common::tree::{
MappedBranchPage, MappedLeafPage, TreePage, BRANCH_PAGE_ELEMENT_SIZE, LEAF_PAGE_ELEMENT_SIZE,
};
use crate::common::{BVec, HashMap, PgId, SplitRef, ZERO_PGID};
use crate::cursor::{CursorIApi, CursorImpl, CursorRwIApi, CursorRwImpl, InnerCursor, PageNode};
use crate::node::NodeRwCell;
use crate::tx::{TxCell, TxIApi, TxRwCell, TxRwIApi};
use crate::Error::{
BucketExists, BucketNameRequired, BucketNotFound, IncompatibleValue, KeyRequired, KeyTooLarge,
ValueTooLarge,
};
use crate::{CursorRwApi, Error};
use bumpalo::Bump;
use bytemuck::{Pod, Zeroable};
use getset::CopyGetters;
use std::alloc::Layout;
use std::marker::PhantomData;
use std::ops::{AddAssign, Deref};
use std::ptr::slice_from_raw_parts_mut;
use std::slice::{from_raw_parts, from_raw_parts_mut};
use std::{mem, ptr};
/// Read-only bucket operations shared by every public bucket handle.
///
/// Keys and values passed to the `for_each*` callbacks are borrowed for the transaction
/// lifetime `'tx`.
pub trait BucketApi<'tx>
where
Self: Sized,
{
/// Returns the root page id recorded in the bucket header.
fn root(&self) -> PgId;
/// Returns `true` when the handle wraps a writable bucket.
fn writable(&self) -> bool;
/// Creates a cursor over this bucket.
fn cursor(&self) -> CursorImpl<'tx>;
/// Opens the nested bucket stored under `name`.
///
/// The implementations in this file return `None` when the key is absent or when the entry
/// is not flagged as a bucket.
fn bucket<T: AsRef<[u8]>>(&self, name: T) -> Option<BucketImpl<'tx>>;
/// Returns the value stored under `key`.
///
/// The implementations in this file return `None` when the key is absent or when the entry
/// is a nested bucket.
fn get<T: AsRef<[u8]>>(&self, key: T) -> Option<&[u8]>;
/// Returns the sequence number stored in the bucket header.
fn sequence(&self) -> u64;
/// Calls `f` for each entry in cursor order and stops at the first error `f` returns.
///
/// The value is optional; presumably it is `None` for nested-bucket entries (confirm against
/// the cursor API).
fn for_each<F: FnMut(&'tx [u8], Option<&'tx [u8]>) -> crate::Result<()>>(
&self, f: F,
) -> crate::Result<()>;
/// Calls `f` with the key of every nested-bucket entry and stops at the first error.
fn for_each_bucket<F: FnMut(&'tx [u8]) -> crate::Result<()>>(&self, f: F) -> crate::Result<()>;
/// Collects page, key and nesting statistics for this bucket and its nested buckets.
fn stats(&self) -> BucketStats;
}
/// Mutating bucket operations, available on top of [`BucketApi`].
///
/// The error notes below describe the `BucketRwImpl` implementation in this file.
pub trait BucketRwApi<'tx>: BucketApi<'tx> {
/// Opens the nested bucket stored under `name` for writing; `None` when no such bucket exists.
fn bucket_mut<T: AsRef<[u8]>>(&mut self, name: T) -> Option<impl BucketRwApi<'tx>>;
/// Creates a nested bucket under `key`.
///
/// # Errors
/// `BucketNameRequired` for an empty key, `BucketExists` when a bucket already uses the key,
/// `IncompatibleValue` when the key holds a plain value.
fn create_bucket<T: AsRef<[u8]>>(&mut self, key: T) -> crate::Result<impl BucketRwApi<'tx>>;
/// Like `create_bucket`, but returns the existing bucket instead of failing with
/// `BucketExists`.
fn create_bucket_if_not_exists<T: AsRef<[u8]>>(
&mut self, key: T,
) -> crate::Result<impl BucketRwApi<'tx>>;
/// Creates a cursor that can also modify the bucket.
fn cursor_mut(&self) -> impl CursorRwApi<'tx>;
/// Deletes the nested bucket stored under `key`, including its own nested buckets.
///
/// # Errors
/// `BucketNotFound` when the key is absent, `IncompatibleValue` when it holds a plain value.
fn delete_bucket<T: AsRef<[u8]>>(&mut self, key: T) -> crate::Result<()>;
/// Stores `data` under `key`, replacing any existing value.
///
/// # Errors
/// `KeyRequired`, `KeyTooLarge`, `ValueTooLarge`, or `IncompatibleValue` when the key names a
/// nested bucket.
fn put<T: AsRef<[u8]>, U: AsRef<[u8]>>(&mut self, key: T, data: U) -> crate::Result<()>;
/// Removes `key` from the bucket.
///
/// # Errors
/// `IncompatibleValue` when the key names a nested bucket.
fn delete<T: AsRef<[u8]>>(&mut self, key: T) -> crate::Result<()>;
/// Overwrites the bucket's sequence number with `v`.
fn set_sequence(&mut self, v: u64) -> crate::Result<()>;
/// Increments the bucket's sequence number and returns the new value.
fn next_sequence(&mut self) -> crate::Result<u64>;
/// Sets the fill percentage stored on the bucket's writable state.
fn set_fill_percent(&mut self, fill_percent: f64);
}
/// Public bucket handle exposing the read-only [`BucketApi`].
///
/// It can wrap either a read-only or a writable bucket cell (see [`BucketWrapper`]).
pub struct BucketImpl<'tx> {
// The wrapped cell; every `BucketApi` method dispatches on its variant.
b: BucketWrapper<'tx>,
}
/// The two kinds of bucket cell a [`BucketImpl`] can wrap.
pub enum BucketWrapper<'tx> {
/// A bucket cell bound to a read-only transaction cell.
R(BucketCell<'tx>),
/// A bucket cell bound to a read-write transaction cell.
RW(BucketRwCell<'tx>),
}
/// Wraps a read-only bucket cell in the public [`BucketImpl`] handle.
impl<'tx> From<BucketCell<'tx>> for BucketImpl<'tx> {
  fn from(value: BucketCell<'tx>) -> Self {
    let b = BucketWrapper::R(value);
    Self { b }
  }
}
/// Wraps a writable bucket cell in the public, read-only [`BucketImpl`] handle.
impl<'tx> From<BucketRwCell<'tx>> for BucketImpl<'tx> {
  fn from(value: BucketRwCell<'tx>) -> Self {
    let b = BucketWrapper::RW(value);
    Self { b }
  }
}
/// Read-only API for [`BucketImpl`]: every method forwards to the wrapped cell's internal
/// `api_*` counterpart, whichever variant is held.
impl<'tx> BucketApi<'tx> for BucketImpl<'tx> {
  fn root(&self) -> PgId {
    match self.b {
      BucketWrapper::R(ref r) => r.root(),
      BucketWrapper::RW(ref rw) => rw.root(),
    }
  }

  fn writable(&self) -> bool {
    match self.b {
      BucketWrapper::R(ref r) => r.is_writeable(),
      BucketWrapper::RW(ref rw) => rw.is_writeable(),
    }
  }

  fn cursor(&self) -> CursorImpl<'tx> {
    // The cursor is built over a copy of the cell and uses the transaction's bump arena.
    match self.b {
      BucketWrapper::R(ref r) => InnerCursor::new(*r, r.tx().bump()).into(),
      BucketWrapper::RW(ref rw) => InnerCursor::new(*rw, rw.tx().bump()).into(),
    }
  }

  fn bucket<T: AsRef<[u8]>>(&self, name: T) -> Option<BucketImpl<'tx>> {
    let name = name.as_ref();
    match self.b {
      BucketWrapper::R(ref r) => r.api_bucket(name).map(BucketImpl::from),
      BucketWrapper::RW(ref rw) => rw.api_bucket(name).map(BucketImpl::from),
    }
  }

  fn get<T: AsRef<[u8]>>(&self, key: T) -> Option<&[u8]> {
    let key = key.as_ref();
    match self.b {
      BucketWrapper::R(ref r) => r.api_get(key),
      BucketWrapper::RW(ref rw) => rw.api_get(key),
    }
  }

  fn sequence(&self) -> u64 {
    match self.b {
      BucketWrapper::R(ref r) => r.api_sequence(),
      BucketWrapper::RW(ref rw) => rw.api_sequence(),
    }
  }

  fn for_each<F: FnMut(&'tx [u8], Option<&'tx [u8]>) -> crate::Result<()>>(
    &self, f: F,
  ) -> crate::Result<()> {
    match self.b {
      BucketWrapper::R(ref r) => r.api_for_each(f),
      BucketWrapper::RW(ref rw) => rw.api_for_each(f),
    }
  }

  fn for_each_bucket<F: FnMut(&'tx [u8]) -> crate::Result<()>>(&self, f: F) -> crate::Result<()> {
    match self.b {
      BucketWrapper::R(ref r) => r.api_for_each_bucket(f),
      BucketWrapper::RW(ref rw) => rw.api_for_each_bucket(f),
    }
  }

  fn stats(&self) -> BucketStats {
    match self.b {
      BucketWrapper::R(ref r) => r.api_stats(),
      BucketWrapper::RW(ref rw) => rw.api_stats(),
    }
  }
}
/// Public handle for a writable bucket; implements both [`BucketApi`] and [`BucketRwApi`].
pub struct BucketRwImpl<'tx> {
// The writable bucket cell all operations are forwarded to.
b: BucketRwCell<'tx>,
}
impl<'tx> From<BucketRwCell<'tx>> for BucketRwImpl<'tx> {
fn from(value: BucketRwCell<'tx>) -> Self {
BucketRwImpl { b: value }
}
}
/// Read-only API for [`BucketRwImpl`]; each method forwards to the wrapped writable cell.
impl<'tx> BucketApi<'tx> for BucketRwImpl<'tx> {
  fn root(&self) -> PgId {
    let cell = self.b;
    cell.root()
  }

  fn writable(&self) -> bool {
    let cell = self.b;
    cell.is_writeable()
  }

  fn cursor(&self) -> CursorImpl<'tx> {
    // The cell is a cheap copyable handle; the cursor uses the transaction's bump arena.
    let cell = self.b;
    let bump = cell.tx().bump();
    InnerCursor::new(cell, bump).into()
  }

  fn bucket<T: AsRef<[u8]>>(&self, name: T) -> Option<BucketImpl<'tx>> {
    let child = self.b.api_bucket(name.as_ref())?;
    Some(BucketImpl::from(child))
  }

  fn get<T: AsRef<[u8]>>(&self, key: T) -> Option<&[u8]> {
    let cell = self.b;
    cell.api_get(key.as_ref())
  }

  fn sequence(&self) -> u64 {
    let cell = self.b;
    cell.api_sequence()
  }

  fn for_each<F: FnMut(&'tx [u8], Option<&'tx [u8]>) -> crate::Result<()>>(
    &self, f: F,
  ) -> crate::Result<()> {
    let cell = self.b;
    cell.api_for_each(f)
  }

  fn for_each_bucket<F: FnMut(&'tx [u8]) -> crate::Result<()>>(&self, f: F) -> crate::Result<()> {
    let cell = self.b;
    cell.api_for_each_bucket(f)
  }

  fn stats(&self) -> BucketStats {
    let cell = self.b;
    cell.api_stats()
  }
}
/// Mutating API for [`BucketRwImpl`]; each method forwards to the wrapped writable cell.
impl<'tx> BucketRwApi<'tx> for BucketRwImpl<'tx> {
  fn bucket_mut<T: AsRef<[u8]>>(&mut self, name: T) -> Option<impl BucketRwApi<'tx>> {
    let child = self.b.api_bucket(name.as_ref())?;
    Some(BucketRwImpl::from(child))
  }

  fn create_bucket<T: AsRef<[u8]>>(&mut self, key: T) -> crate::Result<impl BucketRwApi<'tx>> {
    let child = self.b.api_create_bucket(key.as_ref())?;
    Ok(BucketRwImpl::from(child))
  }

  fn create_bucket_if_not_exists<T: AsRef<[u8]>>(
    &mut self, key: T,
  ) -> crate::Result<impl BucketRwApi<'tx>> {
    let child = self.b.api_create_bucket_if_not_exists(key.as_ref())?;
    Ok(BucketRwImpl::from(child))
  }

  fn cursor_mut(&self) -> impl CursorRwApi<'tx> {
    let cell = self.b;
    let inner = InnerCursor::new(cell, cell.tx().bump());
    CursorRwImpl::new(inner)
  }

  fn delete_bucket<T: AsRef<[u8]>>(&mut self, key: T) -> crate::Result<()> {
    let cell = self.b;
    cell.api_delete_bucket(key.as_ref())
  }

  fn put<T: AsRef<[u8]>, U: AsRef<[u8]>>(&mut self, key: T, data: U) -> crate::Result<()> {
    let cell = self.b;
    cell.api_put(key.as_ref(), data.as_ref())
  }

  fn delete<T: AsRef<[u8]>>(&mut self, key: T) -> crate::Result<()> {
    let cell = self.b;
    cell.api_delete(key.as_ref())
  }

  fn set_sequence(&mut self, v: u64) -> crate::Result<()> {
    let cell = self.b;
    cell.api_set_sequence(v)
  }

  fn next_sequence(&mut self) -> crate::Result<u64> {
    let cell = self.b;
    cell.api_next_sequence()
  }

  fn set_fill_percent(&mut self, fill_percent: f64) {
    // The value is stored as given; no range check happens here.
    let mut bucket = self.b.cell.borrow_mut();
    bucket.w.fill_percent = fill_percent;
  }
}
/// Statistics about a bucket and its nested buckets, as computed by `api_stats`.
///
/// A copying getter is generated for every field.
#[derive(Copy, Clone, Eq, PartialEq, Default, Debug, CopyGetters)]
#[getset(get_copy = "pub")]
pub struct BucketStats {
/// Number of branch pages visited.
branch_page_n: i64,
/// Sum of the `overflow` counts of the branch pages.
branch_overflow_n: i64,
/// Number of leaf pages belonging to non-inline buckets.
leaf_page_n: i64,
/// Sum of the `overflow` counts of those leaf pages.
leaf_overflow_n: i64,
/// Sum of the element counts of all leaf pages.
pub(crate) key_n: i64,
/// Deepest page level seen, plus the depth of nested buckets.
depth: i64,
/// `(branch_page_n + branch_overflow_n) * page_size`.
branch_alloc: i64,
/// Bytes in use on branch pages.
branch_in_use: i64,
/// `(leaf_page_n + leaf_overflow_n) * page_size`.
leaf_alloc: i64,
/// Bytes in use on leaf pages of non-inline buckets.
leaf_in_use: i64,
/// Number of buckets counted, including the bucket the stats were requested for.
bucket_n: i64,
/// Number of counted buckets whose root is `ZERO_PGID` (inline buckets).
inline_bucket_n: i64,
/// Bytes in use by inline buckets.
inline_bucket_in_use: i64,
}
/// Merges another set of statistics into this one.
///
/// Every counter is summed, except `depth`, which keeps the larger of the two values.
impl AddAssign<BucketStats> for BucketStats {
  fn add_assign(&mut self, rhs: BucketStats) {
    // Page and key counters.
    self.branch_page_n += rhs.branch_page_n;
    self.branch_overflow_n += rhs.branch_overflow_n;
    self.leaf_page_n += rhs.leaf_page_n;
    self.leaf_overflow_n += rhs.leaf_overflow_n;
    self.key_n += rhs.key_n;

    // Depth is a maximum, not a sum.
    self.depth = self.depth.max(rhs.depth);

    // Byte totals.
    self.branch_alloc += rhs.branch_alloc;
    self.branch_in_use += rhs.branch_in_use;
    self.leaf_alloc += rhs.leaf_alloc;
    self.leaf_in_use += rhs.leaf_in_use;

    // Bucket counters.
    self.bucket_n += rhs.bucket_n;
    self.inline_bucket_n += rhs.inline_bucket_n;
    self.inline_bucket_in_use += rhs.inline_bucket_in_use;
  }
}
/// Fill percentage given to newly created writable bucket state.
const DEFAULT_FILL_PERCENT: f64 = 0.5;
/// Largest key length, in bytes, accepted by `api_put`.
const MAX_KEY_SIZE: u32 = 32768;
/// Largest value length, in bytes, accepted by `api_put`.
const MAX_VALUE_SIZE: u32 = (1 << 31) - 2;
/// Alignment used for bump allocations that hold a bucket header or an inline bucket.
const INLINE_BUCKET_ALIGNMENT: usize = mem::align_of::<InlineBucket>();
/// Size in bytes of an empty inline bucket (bucket header followed by a page header).
const INLINE_BUCKET_SIZE: usize = mem::size_of::<InlineBucket>();
// NOTE(review): the two bounds below are not used in this part of the file; presumably they
// clamp the fill percentage elsewhere in the crate — confirm at the use site.
pub(crate) const MIN_FILL_PERCENT: f64 = 0.1;
pub(crate) const MAX_FILL_PERCENT: f64 = 1.0;
/// Byte layout of an inline bucket value: a bucket header immediately followed by a page header.
#[repr(C)]
#[derive(Copy, Clone, Pod, Zeroable)]
struct InlineBucket {
// Bucket header (root page id and sequence).
header: BucketHeader,
// Header of the page embedded in the value.
page: PageHeader,
}
/// The default inline bucket is empty: its header has root `ZERO_PGID` and sequence 0, and its
/// embedded page is a leaf page with no elements and no overflow.
impl Default for InlineBucket {
  fn default() -> Self {
    let header = BucketHeader::new(ZERO_PGID, 0);
    let page = PageHeader {
      id: Default::default(),
      flags: LEAF_PAGE_FLAG,
      count: 0,
      overflow: 0,
    };
    Self { header, page }
  }
}
pub(crate) trait BucketIApi<'tx, T: TxIApi<'tx>>:
SplitRef<BucketR<'tx>, T, InnerBucketW<'tx, T, Self>>
{
/// Builds a bucket handle from `bucket_header`, bound to transaction `tx`, with its storage
/// taken from `bump`. `inline_page` is the page embedded in the bucket value, if any.
fn new_in(
bump: &'tx Bump, bucket_header: BucketHeader, tx: T, inline_page: Option<RefPage<'tx>>,
) -> Self;
/// Returns `true` when this bucket type supports mutation.
fn is_writeable(&self) -> bool;
/// Returns the transaction handle this bucket is bound to.
fn tx(self) -> T {
self.split_bound()
}
/// Returns the root page id stored in the bucket header.
fn root(self) -> PgId {
self.split_r().bucket_header.root()
}
/// Creates an internal cursor over this bucket, allocated in the transaction's bump arena.
///
/// The transaction's cursor counter is incremented first.
///
/// # Panics
/// Panics if the transaction's `stats` is `None`.
fn i_cursor(self) -> InnerCursor<'tx, T, Self> {
let tx = self.tx();
tx.split_r().stats.as_ref().unwrap().inc_cursor_count(1);
InnerCursor::new(self, tx.bump())
}
/// Opens the nested bucket stored under `name`.
///
/// Returns `None` when no entry with exactly that key exists or the entry is not a bucket.
/// For writable buckets the opened child is cached, so later lookups return the same handle.
fn api_bucket(self, name: &[u8]) -> Option<Self> {
  // Writable buckets keep a cache of already-opened children; consult it first.
  let cached = self
    .split_ow()
    .and_then(|w| w.buckets.get(name).copied());
  if cached.is_some() {
    return cached;
  }

  // Seek to the key and make sure it is an exact match that is flagged as a bucket.
  let mut cursor = self.i_cursor();
  let (found_key, value, flags) = cursor.i_seek(name)?;
  let is_bucket = (flags & BUCKET_LEAF_FLAG) != 0;
  if !is_bucket || name != found_key {
    return None;
  }

  let child = self.open_bucket(value);
  if let Some(mut w) = self.split_ow_mut() {
    // The cache key must live as long as the transaction, so copy it into the bump arena.
    let cached_name = self.split_bound().bump().alloc_slice_copy(name);
    w.buckets.insert(cached_name, child);
  }
  Some(child)
}
/// Builds a bucket handle from the raw bytes stored as a bucket entry's value.
///
/// `value` starts with a `BucketHeader`; when the header's root is `ZERO_PGID` the bucket is
/// inline and its page follows the header inside `value`.
///
/// # Panics
/// Panics if `value` is shorter than `BUCKET_HEADER_SIZE`, or if the bucket is inline and
/// `value` is shorter than `INLINE_BUCKET_SIZE`.
fn open_bucket(self, mut value: &[u8]) -> Self {
let tx = self.tx();
let bump = tx.bump();
// The header is read in place below, so a misaligned value is first copied into a bump
// allocation aligned for `InlineBucket`.
if !IsAligned::is_aligned_to::<InlineBucket>(value.as_ptr()) {
let layout = Layout::from_size_align(value.len(), INLINE_BUCKET_ALIGNMENT).unwrap();
// The fresh allocation is zeroed before a slice reference to it is formed.
let new_value = unsafe {
let data = bump.alloc_layout(layout).as_ptr();
ptr::write_bytes(data, 0, value.len());
&mut *slice_from_raw_parts_mut(data, value.len())
};
new_value.copy_from_slice(value);
value = new_value;
}
let bucket_header = *bytemuck::from_bytes::<BucketHeader>(value.split_at(BUCKET_HEADER_SIZE).0);
// A zero root marks an inline bucket: its page sits right after the header.
let ref_page = if bucket_header.root() == ZERO_PGID {
assert!(
value.len() >= INLINE_BUCKET_SIZE,
"subbucket value not large enough. Expected at least {} bytes. Was {}",
INLINE_BUCKET_SIZE,
value.len()
);
// The assert above guarantees the page header lies within `value`.
unsafe {
let ref_page_ptr = value.as_ptr().add(BUCKET_HEADER_SIZE);
Some(RefPage::new(ref_page_ptr))
}
} else {
None
};
Self::new_in(bump, bucket_header, tx, ref_page)
}
/// Returns the value stored under `key`.
///
/// Returns `None` when the seek finds nothing, when the entry found is a nested bucket, or
/// when the entry's key is not exactly `key`.
fn api_get(self, key: &[u8]) -> Option<&'tx [u8]> {
  let (found_key, value, flags) = self.i_cursor().i_seek(key)?;
  let is_bucket = (flags & BUCKET_LEAF_FLAG) != 0;
  if is_bucket || key != found_key {
    None
  } else {
    Some(value)
  }
}
/// Calls `f` for every entry in cursor order, from the first entry onwards.
///
/// Iteration stops at, and returns, the first error produced by `f`.
fn api_for_each<F: FnMut(&'tx [u8], Option<&'tx [u8]>) -> crate::Result<()>>(
  self, mut f: F,
) -> crate::Result<()> {
  let mut cursor = self.i_cursor();
  let mut entry = cursor.api_first();
  loop {
    let Some((key, value)) = entry else {
      return Ok(());
    };
    f(key, value)?;
    // Advance only after the callback has seen the current entry.
    entry = cursor.api_next();
  }
}
/// Calls `f` with the key of every entry flagged as a nested bucket, in cursor order.
///
/// Iteration stops at, and returns, the first error produced by `f`.
fn api_for_each_bucket<F: FnMut(&'tx [u8]) -> crate::Result<()>>(
  self, mut f: F,
) -> crate::Result<()> {
  let mut cursor = self.i_cursor();
  let mut entry = cursor.i_first();
  loop {
    let Some((key, _, flags)) = entry else {
      return Ok(());
    };
    let is_bucket = flags & BUCKET_LEAF_FLAG != 0;
    if is_bucket {
      f(key)?;
    }
    entry = cursor.i_next();
  }
}
/// Calls `f` for every page of this bucket.
///
/// An inline bucket has exactly one page: `f` is called once with it, depth 0 and a page-id
/// stack holding only the bucket's root id. Otherwise the walk is delegated to the
/// transaction, starting at the root page.
fn for_each_page<F: FnMut(&RefPage<'tx>, usize, &mut BVec<PgId>)>(self, f: &mut F) {
  let tx = self.tx();
  let root_id = {
    let bucket = self.split_r();
    let root_id = bucket.bucket_header.root();
    match &bucket.inline_page {
      Some(inline) => {
        let mut stack = BVec::with_capacity_in(1, tx.bump());
        stack.push(root_id);
        f(inline, 0, &mut stack);
        return;
      }
      None => root_id,
    }
  };
  tx.for_each_page(root_id, f)
}
/// Calls `f` for every page or materialized node of this bucket, with its depth.
///
/// An inline bucket yields just its inline page at depth 0; otherwise the tree is walked
/// recursively from the root page id.
fn for_each_page_node<F: FnMut(&PageNode<'tx>, usize) + Copy>(self, mut f: F) {
  let root_id = {
    let bucket = self.split_r();
    match &bucket.inline_page {
      Some(inline) => {
        f(&PageNode::Page(*inline), 0);
        return;
      }
      None => bucket.bucket_header.root(),
    }
  };
  self._for_each_page_node(root_id, 0, f);
}
/// Recursive step of `for_each_page_node`: visits the page or node for `root`, then each of
/// its children one level deeper.
///
/// `f` is `Copy`, so every recursive call receives its own copy of the callback.
fn _for_each_page_node<F: FnMut(&PageNode<'tx>, usize) + Copy>(
  self, root: PgId, depth: usize, mut f: F,
) {
  let current = self.page_node(root);
  f(&current, depth);
  match &current {
    PageNode::Page(page) => {
      // Only branch pages have children; other pages end the descent.
      if let Some(branch_page) = MappedBranchPage::coerce_ref(page) {
        for elem in branch_page.elements().iter() {
          self._for_each_page_node(elem.pgid(), depth + 1, f);
        }
      }
    }
    PageNode::Node(node) => {
      let bump = self.tx().bump();
      // Copy the child ids out first so the node is no longer borrowed while recursing.
      let child_ids = {
        let node_borrow = node.cell.borrow();
        let mut ids = BVec::with_capacity_in(node_borrow.inodes.len(), bump);
        ids.extend(node_borrow.inodes.iter().map(|inode| inode.pgid()));
        ids
      };
      for pgid in child_ids.into_iter() {
        self._for_each_page_node(pgid, depth + 1, f);
      }
    }
  }
}
/// Resolves `id` to a materialized node when one exists, otherwise to the on-disk page.
///
/// For an inline bucket (root `ZERO_PGID`) the result is the materialized root node if there
/// is one, else the inline page.
///
/// # Panics
/// Panics if the bucket is inline and `id` is not `ZERO_PGID`, or if the bucket is inline and
/// has neither a materialized root node nor an inline page.
fn page_node(self, id: PgId) -> PageNode<'tx> {
  let (r, w) = self.split_ref();

  if r.bucket_header.root() == ZERO_PGID {
    if id != ZERO_PGID {
      panic!("inline bucket non-zero page access(2): {} != 0", id)
    }
    return match w.and_then(|wb| wb.root_node) {
      Some(root_node) => PageNode::Node(root_node),
      None => PageNode::Page(r.inline_page.unwrap()),
    };
  }

  // Prefer a node already materialized by this transaction over the stored page.
  let materialized = w.as_ref().and_then(|wb| wb.nodes.get(&id).copied());
  if let Some(node) = materialized {
    return PageNode::Node(node);
  }
  PageNode::Page(self.tx().mem_page(id))
}
/// Returns the sequence number stored in the bucket header.
fn api_sequence(self) -> u64 {
self.split_r().bucket_header.sequence()
}
/// Returns the size threshold for inlining a bucket: a quarter of the transaction's page size.
fn max_inline_bucket_size(self) -> usize {
self.tx().page_size() / 4
}
/// Computes statistics for this bucket, including all nested buckets reachable from its
/// leaf pages.
fn api_stats(self) -> BucketStats {
// `s` collects this bucket's own numbers; `sub_stats` collects those of nested buckets.
let mut s = BucketStats::default();
let mut sub_stats = BucketStats::default();
let page_size = self.tx().page_size();
s.bucket_n += 1;
// A zero root marks an inline bucket.
if self.root() == ZERO_PGID {
s.inline_bucket_n += 1;
}
self.for_each_page(&mut |p, depth, _| {
if let Some(leaf_page) = MappedLeafPage::coerce_ref(p) {
s.key_n += p.count as i64;
// `used` totals the bytes in use on this page, starting with the page header.
let mut used = PAGE_HEADER_SIZE;
if let Some(last_element) = leaf_page.elements().last() {
// Headers of all elements before the last one, then everything up to the end of the
// last element's key and value (measured from the last element's position).
used += LEAF_PAGE_ELEMENT_SIZE * (p.count - 1) as usize;
used += last_element.pos() as usize
+ last_element.key_size() as usize
+ last_element.value_size() as usize;
}
if self.root() == ZERO_PGID {
// Inline buckets only contribute to the inline usage total.
s.inline_bucket_in_use += used as i64;
} else {
s.leaf_page_n += 1;
s.leaf_in_use += used as i64;
s.leaf_overflow_n += leaf_page.overflow as i64;
// Recurse into every nested bucket stored on this leaf page.
for leaf_elem in leaf_page.iter() {
if leaf_elem.is_bucket_entry() {
sub_stats += self.open_bucket(leaf_elem.value()).api_stats();
}
}
}
} else if let Some(branch_page) = MappedBranchPage::coerce_ref(p) {
s.branch_page_n += 1;
if let Some(last_element) = branch_page.elements().last() {
let mut used =
PAGE_HEADER_SIZE + (BRANCH_PAGE_ELEMENT_SIZE * (branch_page.count - 1) as usize);
used += last_element.pos() as usize + last_element.key_size() as usize;
s.branch_in_use += used as i64;
s.branch_overflow_n += branch_page.overflow as i64;
}
}
// Track the deepest level seen; depths reported by the walk are zero-based.
if (depth + 1) as i64 > s.depth {
s.depth = (depth + 1) as i64;
}
});
// Allocation totals follow from page counts and the page size.
s.branch_alloc = (s.branch_page_n + s.branch_overflow_n) * page_size as i64;
s.leaf_alloc = (s.leaf_page_n + s.leaf_overflow_n) * page_size as i64;
// Add the nested buckets' depth first: the merge below keeps the larger depth, it does
// not sum depths.
s.depth += sub_stats.depth;
s += sub_stats;
s
}
/// Converts this internal bucket handle into the public [`BucketImpl`] handle.
fn into_impl(self) -> BucketImpl<'tx>;
}
/// Internal mutating operations for buckets bound to a read-write transaction.
///
/// The notes below describe the `BucketRwCell` implementation in this file.
pub(crate) trait BucketRwIApi<'tx>: BucketIApi<'tx, TxRwCell<'tx>> {
/// Returns the bucket's root node, reading it from the root page if it is not cached yet.
fn materialize_root(self) -> NodeRwCell<'tx>;
/// Creates a new, empty nested bucket under `key` and returns it.
fn api_create_bucket(self, key: &[u8]) -> crate::Result<Self>;
/// Creates the nested bucket under `key`, or returns it if it already exists.
fn api_create_bucket_if_not_exists(self, key: &[u8]) -> crate::Result<Self>;
/// Deletes the nested bucket under `key`, after recursively deleting its own nested buckets.
fn api_delete_bucket(self, key: &[u8]) -> crate::Result<()>;
/// Stores `value` under `key`.
fn api_put(self, key: &[u8], value: &[u8]) -> crate::Result<()>;
/// Removes `key` from the bucket.
fn api_delete(self, key: &[u8]) -> crate::Result<()>;
/// Sets the bucket's sequence number to `v`.
fn api_set_sequence(self, v: u64) -> crate::Result<()>;
/// Increments the bucket's sequence number and returns the new value.
fn api_next_sequence(self) -> crate::Result<u64>;
/// Releases every page and node of the bucket and resets its root to `ZERO_PGID`.
fn free(self);
/// Writes cached child buckets back into this bucket, then spills the root node and records
/// its new page id as the bucket's root.
fn spill(self, bump: &'tx Bump) -> crate::Result<()>;
/// Serializes the bucket header followed by the root node into bytes allocated from `bump`.
fn write(self, bump: &'tx Bump) -> &'tx [u8];
/// Returns `true` when the bucket can be stored inline inside its parent.
fn inlineable(self) -> bool;
/// Calls `own_in` on the root of the root node and on every cached child bucket.
fn own_in(self);
/// Returns the node for `pgid`, reading and caching it under `parent` when not cached yet.
fn node(self, pgid: PgId, parent: Option<NodeRwCell<'tx>>) -> NodeRwCell<'tx>;
/// Rebalances every cached node and every cached child bucket.
fn rebalance(self);
}
/// Read-side state of a bucket.
pub struct BucketR<'tx> {
/// The bucket's header (root page id and sequence).
pub(crate) bucket_header: BucketHeader,
/// The page embedded in the bucket's value, when one is known.
pub(crate) inline_page: Option<RefPage<'tx>>,
// Marker field carrying the `'tx` lifetime.
p: PhantomData<&'tx u8>,
}
impl<'tx> BucketR<'tx> {
pub fn new(in_bucket: BucketHeader) -> BucketR<'tx> {
BucketR {
bucket_header: in_bucket,
inline_page: None,
p: Default::default(),
}
}
}
/// Write-side state of a bucket: caches built up while a transaction modifies it.
pub struct InnerBucketW<'tx, T: TxIApi<'tx>, B: BucketIApi<'tx, T>> {
/// The materialized root node, once one has been read or created.
pub(crate) root_node: Option<NodeRwCell<'tx>>,
// Nested buckets already opened through this bucket, keyed by name.
buckets: HashMap<'tx, &'tx [u8], B>,
/// Materialized nodes, keyed by the page id they were read from.
pub(crate) nodes: HashMap<'tx, PgId, NodeRwCell<'tx>>,
/// Fill percentage for this bucket; starts at `DEFAULT_FILL_PERCENT`.
pub(crate) fill_percent: f64,
// Marker field for the otherwise unused transaction type parameter.
phantom_t: PhantomData<T>,
}
impl<'tx, T: TxIApi<'tx>, B: BucketIApi<'tx, T>> InnerBucketW<'tx, T, B> {
  /// Creates empty write-side state whose caches allocate from `bump`.
  ///
  /// There is no root node yet, both caches are empty, and the fill percentage is the default.
  pub fn new_in(bump: &'tx Bump) -> InnerBucketW<'tx, T, B> {
    let buckets = HashMap::with_capacity_in(0, bump);
    let nodes = HashMap::with_capacity_in(0, bump);
    Self {
      root_node: None,
      buckets,
      nodes,
      fill_percent: DEFAULT_FILL_PERCENT,
      phantom_t: PhantomData,
    }
  }
}
/// Write-side bucket state specialised for read-write transactions and writable bucket cells.
pub type BucketW<'tx> = InnerBucketW<'tx, TxRwCell<'tx>, BucketRwCell<'tx>>;
/// Complete state of a writable bucket: the read side plus the write-side caches.
pub struct BucketRW<'tx> {
/// Header and inline page.
pub(crate) r: BucketR<'tx>,
/// Root node, node cache, child-bucket cache and fill percentage.
pub(crate) w: BucketW<'tx>,
}
/// Copyable handle to a read-only bucket, bound to a read-only transaction cell.
#[derive(Copy, Clone)]
pub(crate) struct BucketCell<'tx> {
// Holds the bucket's read-side state together with its transaction.
cell: BCell<'tx, BucketR<'tx>, TxCell<'tx>>,
}
impl<'tx> BucketIApi<'tx, TxCell<'tx>> for BucketCell<'tx> {
fn new_in(
bump: &'tx Bump, bucket_header: BucketHeader, tx: TxCell<'tx>,
inline_page: Option<RefPage<'tx>>,
) -> Self {
let r = BucketR {
bucket_header,
inline_page,
p: Default::default(),
};
BucketCell {
cell: BCell::new_in(r, tx, bump),
}
}
#[inline(always)]
fn is_writeable(&self) -> bool {
false
}
fn into_impl(self) -> BucketImpl<'tx> {
self.into()
}
}
/// Borrow accessors for read-only buckets: there is never a write side, so every
/// write-side accessor yields `None`.
impl<'tx> SplitRef<BucketR<'tx>, TxCell<'tx>, InnerBucketW<'tx, TxCell<'tx>, BucketCell<'tx>>>
  for BucketCell<'tx>
{
  fn split_r(&self) -> Ref<BucketR<'tx>> {
    self.cell.borrow()
  }

  fn split_ref(
    &self,
  ) -> (
    Ref<BucketR<'tx>>,
    Option<Ref<InnerBucketW<'tx, TxCell<'tx>, BucketCell<'tx>>>>,
  ) {
    let read_side = self.cell.borrow();
    (read_side, None)
  }

  fn split_ow(&self) -> Option<Ref<InnerBucketW<'tx, TxCell<'tx>, BucketCell<'tx>>>> {
    None
  }

  fn split_bound(&self) -> TxCell<'tx> {
    self.cell.bound()
  }

  fn split_r_mut(&self) -> RefMut<BucketR<'tx>> {
    self.cell.borrow_mut()
  }

  fn split_ow_mut(&self) -> Option<RefMut<InnerBucketW<'tx, TxCell<'tx>, BucketCell<'tx>>>> {
    None
  }
}
/// Copyable handle to a writable bucket, bound to a read-write transaction cell.
#[derive(Copy, Clone)]
pub(crate) struct BucketRwCell<'tx> {
/// Holds the bucket's read and write state together with its transaction.
pub(crate) cell: BCell<'tx, BucketRW<'tx>, TxRwCell<'tx>>,
}
/// Borrow accessors for writable buckets: each one borrows the shared cell and projects the
/// borrow onto the read side (`r`), the write side (`w`), or both.
impl<'tx> SplitRef<BucketR<'tx>, TxRwCell<'tx>, BucketW<'tx>> for BucketRwCell<'tx> {
// Shared borrow of the read side.
fn split_r(&self) -> Ref<BucketR<'tx>> {
Ref::map(self.cell.borrow(), |b| &b.r)
}
// One shared borrow, split into read side and write side.
fn split_ref(&self) -> (Ref<BucketR<'tx>>, Option<Ref<BucketW<'tx>>>) {
let (r, w) = Ref::map_split(self.cell.borrow(), |b| (&b.r, &b.w));
(r, Some(w))
}
// Shared borrow of the write side; always `Some` for writable buckets.
fn split_ow(&self) -> Option<Ref<BucketW<'tx>>> {
Some(Ref::map(self.cell.borrow(), |b| &b.w))
}
// The read-write transaction this bucket is bound to.
fn split_bound(&self) -> TxRwCell<'tx> {
self.cell.bound()
}
// Exclusive borrow of the read side.
fn split_r_mut(&self) -> RefMut<BucketR<'tx>> {
RefMut::map(self.cell.borrow_mut(), |b| &mut b.r)
}
// Exclusive borrow of the write side; always `Some` for writable buckets.
fn split_ow_mut(&self) -> Option<RefMut<BucketW<'tx>>> {
Some(RefMut::map(self.cell.borrow_mut(), |b| &mut b.w))
}
}
impl<'tx> BucketIApi<'tx, TxRwCell<'tx>> for BucketRwCell<'tx> {
fn new_in(
bump: &'tx Bump, bucket_header: BucketHeader, tx: TxRwCell<'tx>,
inline_page: Option<RefPage<'tx>>,
) -> Self {
let r = BucketR {
bucket_header,
inline_page,
p: Default::default(),
};
let w = BucketW::new_in(bump);
BucketRwCell {
cell: BCell::new_in(BucketRW { r, w }, tx, bump),
}
}
#[inline(always)]
fn is_writeable(&self) -> bool {
true
}
fn into_impl(self) -> BucketImpl<'tx> {
self.into()
}
}
impl<'tx> BucketRwIApi<'tx> for BucketRwCell<'tx> {
/// Returns the bucket's root node, reading it from the root page when it is not cached yet.
fn materialize_root(self) -> NodeRwCell<'tx> {
  let root_id = {
    let bucket = self.cell.borrow();
    if let Some(existing) = bucket.w.root_node {
      return existing;
    }
    bucket.r.bucket_header.root()
  };
  // The borrow above has ended; `node` needs to borrow the cell itself.
  self.node(root_id, None)
}
/// Creates a new, empty nested bucket under `key` and returns a handle to it.
///
/// # Errors
/// `BucketNameRequired` when `key` is empty, `BucketExists` when a bucket already uses the
/// key, `IncompatibleValue` when the key holds a plain value.
fn api_create_bucket(self, key: &[u8]) -> crate::Result<Self> {
if key.is_empty() {
return Err(BucketNameRequired);
}
// Move the cursor to the key's position and refuse to overwrite an existing entry.
let mut c = self.i_cursor();
if let Some((k, _, flags)) = c.i_seek(key) {
if k == key {
if flags & BUCKET_LEAF_FLAG != 0 {
return Err(BucketExists);
}
return Err(IncompatibleValue);
}
}
// The new bucket starts out inline: its value is an empty `InlineBucket` image.
let inline_page = InlineBucket::default();
let layout = Layout::from_size_align(INLINE_BUCKET_SIZE, INLINE_BUCKET_ALIGNMENT).unwrap();
let bump = self.tx().bump();
let data = bump.alloc_layout(layout).as_ptr();
// `data` was just allocated with exactly `INLINE_BUCKET_SIZE` bytes and is zeroed before
// the slice over it is formed.
let value = unsafe {
ptr::write_bytes(data, 0, INLINE_BUCKET_SIZE);
from_raw_parts_mut(data, INLINE_BUCKET_SIZE)
};
value.copy_from_slice(bytemuck::bytes_of(&inline_page));
// Copy the key into the bump arena so the stored entry does not borrow from the caller.
let key = bump.alloc_slice_clone(key) as &[u8];
c.node().put(key, key, value, ZERO_PGID, BUCKET_LEAF_FLAG);
// Drop this bucket's own inline page reference now that an entry has been added through a
// node. NOTE(review): presumably because a bucket holding nested buckets can no longer be
// inline — confirm.
self.split_r_mut().inline_page = None;
Ok(self.api_bucket(key).unwrap())
}
fn api_create_bucket_if_not_exists(self, key: &[u8]) -> crate::Result<Self> {
match self.api_create_bucket(key) {
Ok(child) => Ok(child),
Err(error) => {
if error == BucketExists {
Ok(self.api_bucket(key).unwrap())
} else {
Err(error)
}
}
}
}
fn api_delete_bucket(self, key: &[u8]) -> crate::Result<()> {
let mut c = self.i_cursor();
let (k, _, flags) = c.i_seek(key).unwrap_or((&[], &[], 0));
if key != k {
return Err(BucketNotFound);
} else if flags & BUCKET_LEAF_FLAG == 0 {
return Err(IncompatibleValue);
}
let child = self.api_bucket(key).unwrap();
child.api_for_each_bucket(|k| {
match child.api_delete_bucket(k) {
Ok(_) => Ok(()),
Err(e) => Err(Error::Other(e.into())),
}
})?;
{
self.cell.borrow_mut().w.buckets.remove(key);
let mut child_mut = child.cell.borrow_mut();
child_mut.w.nodes.clear();
child_mut.w.root_node = None;
}
child.free();
c.node().del(key);
Ok(())
}
/// Stores `value` under `key`, replacing any existing plain value.
///
/// # Errors
/// `KeyRequired` for an empty key, `KeyTooLarge` / `ValueTooLarge` when a size limit is
/// exceeded, `IncompatibleValue` when `key` names a nested bucket.
fn api_put(self, key: &[u8], value: &[u8]) -> crate::Result<()> {
  if key.is_empty() {
    return Err(KeyRequired);
  }
  if key.len() > MAX_KEY_SIZE as usize {
    return Err(KeyTooLarge);
  }
  if value.len() > MAX_VALUE_SIZE as usize {
    return Err(ValueTooLarge);
  }

  // Position the cursor on the key and refuse to overwrite a nested bucket.
  let mut cursor = self.i_cursor();
  let names_bucket = matches!(
    cursor.i_seek(key),
    Some((k, _, flags)) if key == k && (flags & BUCKET_LEAF_FLAG) != 0
  );
  if names_bucket {
    return Err(IncompatibleValue);
  }

  // Copy key and value into the bump arena so the node does not borrow from the caller.
  let bump = self.tx().bump();
  let key = &*bump.alloc_slice_clone(key);
  let value = &*bump.alloc_slice_clone(value);
  cursor.node().put(key, key, value, ZERO_PGID, 0);
  Ok(())
}
/// Removes `key` from the bucket.
///
/// Deleting a key that does not exist is a no-op and returns `Ok(())`. This includes the case
/// where the seek finds no entry at all, which previously panicked on `unwrap()`.
///
/// # Errors
/// `IncompatibleValue` when `key` names a nested bucket.
fn api_delete(self, key: &[u8]) -> crate::Result<()> {
  let mut c = self.i_cursor();
  // Nothing at or after `key`: the key cannot exist, so there is nothing to delete.
  let Some((k, _, flags)) = c.i_seek(key) else {
    return Ok(());
  };
  // The seek landed on a different key, so `key` itself is absent.
  if key != k {
    return Ok(());
  }
  if flags & BUCKET_LEAF_FLAG != 0 {
    return Err(IncompatibleValue);
  }
  c.node().del(key);
  Ok(())
}
/// Sets the bucket's sequence number to `v`.
///
/// The root node is materialized first. NOTE(review): presumably so the bucket is written
/// back when the transaction commits — confirm.
fn api_set_sequence(self, v: u64) -> crate::Result<()> {
  self.materialize_root();
  let mut bucket = self.split_r_mut();
  bucket.bucket_header.set_sequence(v);
  Ok(())
}
/// Increments the bucket's sequence number and returns the new value.
///
/// The root node is materialized first, as in `api_set_sequence`.
fn api_next_sequence(self) -> crate::Result<u64> {
  self.materialize_root();
  let mut bucket = self.split_r_mut();
  let header = &mut bucket.bucket_header;
  header.inc_sequence();
  Ok(header.sequence())
}
/// Releases every page and node of this bucket and resets its root to `ZERO_PGID`.
///
/// Does nothing when the root is already `ZERO_PGID`.
fn free(self) {
  let has_no_root_page = self.split_r().bucket_header.root() == ZERO_PGID;
  if has_no_root_page {
    return;
  }
  let tx = self.tx();
  let txid = tx.meta().txid();
  // Stored pages go to the transaction's freelist; materialized nodes free themselves.
  self.for_each_page_node(|page_node, _depth| match page_node {
    PageNode::Node(node) => node.free(),
    PageNode::Page(page) => tx.freelist_free_page(txid, page),
  });
  self.split_r_mut().bucket_header.set_root(ZERO_PGID);
}
/// Writes cached child buckets back into this bucket, then spills this bucket's own root node
/// and records its new page id in the bucket header.
///
/// # Panics
/// Panics if a child's entry cannot be found again in this bucket or is not flagged as a
/// bucket, or if the spilled root's page id is not below the transaction's high water mark.
fn spill(self, bump: &'tx Bump) -> crate::Result<()> {
// Snapshot the cached children so the cell is not borrowed while they are processed.
let v = {
let bucket = self.cell.borrow();
let mut v = BVec::with_capacity_in(bucket.w.buckets.len(), bump);
for (name, child) in &bucket.w.buckets {
v.push((*name, *child));
}
v
};
for (name, child) in v.into_iter() {
// A child small enough to be inlined is freed and serialized in full (header plus root
// node); any other child is spilled and only its header becomes the new value.
let value = if child.inlineable() {
child.free();
child.write(bump)
} else {
child.spill(bump)?;
let layout = Layout::from_size_align(BUCKET_HEADER_SIZE, INLINE_BUCKET_ALIGNMENT).unwrap();
let inline_bucket_ptr = bump.alloc_layout(layout).as_ptr();
// The allocation is `BUCKET_HEADER_SIZE` bytes and is fully written before being exposed
// as a byte slice.
unsafe {
let inline_bucket = &mut (*(inline_bucket_ptr as *mut BucketHeader));
*inline_bucket = child.split_r().bucket_header;
from_raw_parts(inline_bucket_ptr, BUCKET_HEADER_SIZE)
}
};
// A child without a materialized root node does not get its entry rewritten.
if child.cell.borrow().w.root_node.is_none() {
continue;
}
// Overwrite the child's entry in this bucket with the new value.
let mut c = self.i_cursor();
let (k, _, flags) = c.i_seek(name).unwrap();
assert_eq!(name, k, "misplaced bucket header");
assert_ne!(
flags & BUCKET_LEAF_FLAG,
0,
"unexpected bucket header flag: {:x}",
flags
);
c.node().put(name, name, value, ZERO_PGID, BUCKET_LEAF_FLAG);
}
// Without a materialized root node there is nothing of this bucket's own to spill.
let root_node = match self.cell.borrow().w.root_node {
None => return Ok(()),
Some(root_node) => root_node,
};
root_node.spill()?;
{
// Re-read the root after the spill and record it as this bucket's root node.
let mut self_borrow = self.cell.borrow_mut();
let new_root = root_node.root();
self_borrow.w.root_node = Some(new_root);
let borrow_root = new_root.cell.borrow_mut();
let new_pgid = borrow_root.pgid;
let tx_pgid = self.cell.bound().meta().pgid();
if new_pgid >= tx_pgid {
panic!("pgid ({}) above high water mark ({})", new_pgid, tx_pgid);
}
self_borrow.r.bucket_header.set_root(new_pgid);
}
Ok(())
}
/// Serializes this bucket as an inline value: the bucket header followed by the root node
/// written as a page. The bytes are allocated from `bump`.
fn write(self, bump: &'tx Bump) -> &'tx [u8] {
let root_node = self.materialize_root();
// Total size of the value: the header plus the size reported by the root node.
let page_size = BUCKET_HEADER_SIZE + root_node.size();
let layout = Layout::from_size_align(page_size, INLINE_BUCKET_ALIGNMENT).unwrap();
let inline_bucket_ptr = bump.alloc_layout(layout).as_ptr();
// The allocation holds `page_size` bytes: the header is written at offset 0 and the page
// starts at `BUCKET_HEADER_SIZE`.
// NOTE(review): the allocation is not zeroed here; this assumes the header write plus
// `root_node.write` initialize every byte of the returned slice — confirm.
unsafe {
let inline_bucket = &mut (*(inline_bucket_ptr as *mut BucketHeader));
*inline_bucket = self.cell.borrow().r.bucket_header;
let mut mut_page = MutPage::new(inline_bucket_ptr.add(BUCKET_HEADER_SIZE));
mut_page.id = PgId(0);
mut_page.count = 0;
mut_page.overflow = 0;
root_node.write(&mut mut_page);
from_raw_parts(inline_bucket_ptr, page_size)
}
}
/// Returns `true` when this bucket can be stored inline inside its parent.
///
/// That requires a materialized root node that is a leaf, contains no nested buckets, and
/// whose serialized size stays within `max_inline_bucket_size()`.
fn inlineable(self) -> bool {
  // The limit does not change while the inodes are scanned, so compute it once.
  let max_size = self.max_inline_bucket_size();
  // This is a read-only inspection, so a shared borrow is sufficient.
  let bucket = self.cell.borrow();
  let Some(root) = bucket.w.root_node else {
    return false;
  };
  let node_ref = root.cell.borrow();
  if !node_ref.is_leaf {
    return false;
  }
  // Add up the page header plus each element's header, key and value, bailing out as soon
  // as a nested bucket is found or the limit is exceeded.
  let mut size = PAGE_HEADER_SIZE;
  for inode in node_ref.inodes.deref() {
    size += LEAF_PAGE_ELEMENT_SIZE + inode.key().len() + inode.value().len();
    if inode.flags() & BUCKET_LEAF_FLAG != 0 || size > max_size {
      return false;
    }
  }
  true
}
/// Calls `own_in` on the root of this bucket's root node (if materialized) and then on every
/// cached child bucket.
fn own_in(self) {
  let tx = self.split_bound();
  let bump = tx.bump();
  // Snapshot the root node and children so the cell is not borrowed during the recursion.
  let (root_node, children) = {
    let bucket = self.cell.borrow();
    let mut children: BVec<BucketRwCell<'tx>> =
      BVec::with_capacity_in(bucket.w.buckets.len(), bump);
    children.extend(bucket.w.buckets.values());
    (bucket.w.root_node, children)
  };
  if let Some(node) = root_node {
    node.root().own_in(bump);
  }
  for child in children.into_iter() {
    child.own_in();
  }
}
/// Returns the materialized node for `pgid`, creating and caching it when necessary.
///
/// A new node is read from the bucket's inline page when there is one, otherwise from the
/// transaction's page `pgid`. With no `parent` it becomes the bucket's root node; otherwise
/// it is appended to the parent's children.
///
/// # Panics
/// Panics if the transaction's `stats` is `None`.
fn node(self, pgid: PgId, parent: Option<NodeRwCell<'tx>>) -> NodeRwCell<'tx> {
let inline_page = {
let self_borrow = self.cell.borrow_mut();
// Already materialized: hand back the cached node.
if let Some(n) = self_borrow.w.nodes.get(&pgid) {
return *n;
}
self_borrow.r.inline_page
};
// The borrow above has ended before the page is read.
let page = match inline_page {
None => self.tx().mem_page(pgid),
Some(page) => page,
};
let n = NodeRwCell::read_in(self, parent, &page);
let mut bucket = self.cell.borrow_mut();
let wb = &mut bucket.w;
match parent {
None => wb.root_node = Some(n),
Some(parent_node) => parent_node.cell.borrow_mut().children.push(n),
}
wb.nodes.insert(pgid, n);
// Count the newly materialized node in the transaction statistics.
self
.split_bound()
.split_r()
.stats
.as_ref()
.unwrap()
.inc_node_count(1);
n
}
/// Rebalances every materialized node of this bucket, then every cached child bucket.
fn rebalance(self) {
  let bump = self.tx().bump();
  // Snapshot both caches so the cell is not borrowed while nodes and children rebalance.
  let (nodes, buckets) = {
    let borrow = self.cell.borrow();
    let nodes = BVec::from_iter_in(borrow.w.nodes.values().cloned(), bump);
    let buckets = BVec::from_iter_in(borrow.w.buckets.values().cloned(), bump);
    (nodes, buckets)
  };
  for node in nodes.into_iter() {
    node.rebalance();
  }
  for bucket in buckets.into_iter() {
    bucket.rebalance();
  }
}
}
#[cfg(test)]
mod tests {
use crate::bucket::MAX_VALUE_SIZE;
use crate::test_support::TestDb;
use crate::{
BucketApi, BucketRwApi, BucketStats, CursorApi, DbApi, DbRwAPI, Error, TxApi, TxRwRefApi,
};
use anyhow::anyhow;
use itertools::Itertools;
use rand::rngs::StdRng;
use rand::seq::SliceRandom;
use rand::SeedableRng;
use std::sync::atomic::{AtomicU32, Ordering};
#[test]
fn test_bucket_get_non_existent() -> crate::Result<()> {
let mut db = TestDb::new()?;
db.update(|mut tx| {
let b = tx.create_bucket(b"widgets")?;
assert_eq!(None, b.get(b"foo"));
Ok(())
})?;
Ok(())
}
#[test]
fn test_bucket_get_from_node() -> crate::Result<()> {
let mut db = TestDb::new()?;
db.update(|mut tx| {
let mut b = tx.create_bucket(b"widgets")?;
b.put(b"foo", b"bar")?;
assert_eq!(Some(b"bar".as_slice()), b.get(b"foo"));
Ok(())
})?;
Ok(())
}
#[test]
fn test_bucket_get_incompatible_value() -> crate::Result<()> {
let mut db = TestDb::new()?;
db.update(|mut tx| {
let _ = tx.create_bucket(b"widgets")?;
tx.bucket_mut(b"widgets").unwrap().create_bucket(b"foo")?;
assert_eq!(None, tx.bucket(b"widgets").unwrap().get(b"foo"));
Ok(())
})?;
Ok(())
}
#[test]
#[ignore]
fn test_bucket_get_capacity() -> crate::Result<()> {
let mut db = TestDb::new()?;
db.update(|mut tx| {
let mut b = tx.create_bucket(b"widgets")?;
b.put(b"key", b"val")?;
Ok(())
})?;
db.update(|tx| {
let b = tx.bucket(b"widgets").unwrap();
let mut c = b.cursor();
if let Some((_k, Some(_v))) = c.first() {
todo!("We don't allow modifying values in place for this first version");
}
Ok(())
})?;
Ok(())
}
#[test]
fn test_bucket_put() -> crate::Result<()> {
let mut db = TestDb::new()?;
db.update(|mut tx| {
{
let mut b = tx.create_bucket(b"widgets")?;
b.put(b"foo", b"bar")?;
}
assert_eq!(
Some(b"bar".as_slice()),
tx.bucket(b"widgets").unwrap().get(b"foo")
);
Ok(())
})
}
#[test]
fn test_bucket_put_repeat() -> crate::Result<()> {
let mut db = TestDb::new()?;
db.update(|mut tx| {
{
let mut b = tx.create_bucket(b"widgets")?;
b.put(b"foo", b"bar")?;
b.put(b"foo", b"baz")?;
}
assert_eq!(
Some(b"baz".as_slice()),
tx.bucket(b"widgets").unwrap().get(b"foo")
);
Ok(())
})
}
/// Stores keys and values of many different sizes, then checks that every value reads back
/// with the expected length and content.
#[test]
#[cfg(not(miri))]
fn test_bucket_put_large() -> crate::Result<()> {
  let mut db = TestDb::new()?;
  let count = 100;
  let factor = 200;
  db.update(|mut tx| {
    let mut b = tx.create_bucket(b"widgets")?;
    for i in 1..count {
      b.put(
        "0".repeat(i * factor).as_bytes(),
        "X".repeat((count - i) * factor).as_bytes(),
      )?;
    }
    Ok(())
  })?;
  db.view(|tx| {
    let b = tx.bucket(b"widgets").unwrap();
    for i in 1..count {
      let v = b.get("0".repeat(i * factor).as_bytes()).unwrap();
      assert_eq!((count - i) * factor, v.len());
      // The result of `all` was previously discarded, so the content was never checked.
      assert!(
        v.iter().all(|c| c == &b'X'),
        "value for key #{} contains bytes other than 'X'",
        i
      );
    }
    Ok(())
  })
}
#[test]
#[cfg(all(not(miri), feature = "long-tests"))]
fn test_db_put_very_large() -> crate::Result<()> {
let mut db = TestDb::new()?;
let n = 400000u64;
let batch_n = 200000u64;
let v = [0u8; 500];
for i in (0..n).step_by(batch_n as usize) {
db.update(|mut tx| {
let mut b = tx.create_bucket_if_not_exists(b"widgets")?;
for j in 0..batch_n {
b.put((i + j).to_be_bytes().as_slice(), v)?;
}
Ok(())
})?;
}
Ok(())
}
#[test]
fn test_bucket_put_incompatible_value() -> crate::Result<()> {
let mut db = TestDb::new()?;
db.update(|mut tx| {
let _ = tx.create_bucket(b"widgets")?;
tx.bucket_mut(b"widgets").unwrap().create_bucket(b"foo")?;
assert_eq!(
Err(Error::IncompatibleValue),
tx.bucket_mut(b"widgets").unwrap().put(b"foo", b"bar")
);
Ok(())
})
}
#[test]
fn test_bucket_delete() -> crate::Result<()> {
let mut db = TestDb::new()?;
db.update(|mut tx| {
let mut b = tx.create_bucket(b"widgets")?;
b.put(b"foo", b"bar")?;
b.delete(b"foo")?;
assert_eq!(None, b.get(b"foo"));
Ok(())
})
}
#[test]
#[cfg(not(miri))]
fn test_bucket_delete_large() -> crate::Result<()> {
let mut db = TestDb::new()?;
let var = [b'*'; 1024];
db.update(|mut tx| {
let mut b = tx.create_bucket(b"widgets")?;
for i in 0..100 {
b.put(format!("{}", i).as_bytes(), var)?;
}
Ok(())
})?;
db.update(|mut tx| {
let mut b = tx.bucket_mut(b"widgets").unwrap();
for i in 0..100 {
b.delete(format!("{}", i).as_bytes())?;
}
Ok(())
})?;
db.view(|tx| {
let b = tx.bucket(b"widgets").unwrap();
for i in 0..100 {
assert_eq!(None, b.get(format!("{}", i).as_bytes()));
}
Ok(())
})?;
Ok(())
}
#[test]
#[cfg(all(not(miri), feature = "long_tests"))]
fn test_bucket_delete_freelist_overflow() -> crate::Result<()> {
let mut db = TestDb::new()?;
for i in 0u64..8192 {
db.update(|mut tx| {
let mut b = tx.create_bucket_if_not_exists(b"0")?;
for j in 0u64..1000 {
let mut k = [0u8; 16];
let (k0, k1) = k.split_at_mut(8);
k0.copy_from_slice(i.to_be_bytes().as_slice());
k1.copy_from_slice(j.to_be_bytes().as_slice());
b.put(k, [])?;
}
Ok(())
})?;
}
db.update(|mut tx| {
let b = tx.bucket_mut(b"0").unwrap();
let mut c = b.cursor_mut();
let mut node = c.first();
while node.is_some() {
c.delete()?;
node = c.next();
}
Ok(())
})?;
let stats = db.stats();
let free_page_n = stats.free_page_n();
let pending_page_n = stats.pending_page_n();
let free_pages = free_page_n + pending_page_n;
assert!(
free_pages > 0xFFFF,
"expected more than 0xFFFF free pages. Got {}",
free_pages
);
db.must_close();
db.must_reopen();
let stats = db.stats();
let reopen_free_pages = stats.free_page_n();
assert_eq!(
free_pages, reopen_free_pages,
"Expected {} free pages, got {:?}",
free_pages, reopen_free_pages
);
Ok(())
}
/// Verifies that deleting a key that was never inserted succeeds and leaves
/// an existing nested bucket untouched.
#[test]
fn test_bucket_delete_non_existing() -> crate::Result<()> {
    let mut db = TestDb::new()?;

    // Set up `widgets` with a single nested bucket and no plain keys.
    db.update(|mut tx| {
        let mut widgets = tx.create_bucket(b"widgets")?;
        widgets.create_bucket(b"nested")?;
        Ok(())
    })?;

    db.update(|mut tx| {
        let mut widgets = tx.bucket_mut(b"widgets").unwrap();
        // `foo` does not exist; the delete must still return `Ok`.
        widgets.delete(b"foo")?;
        let nested = widgets.bucket(b"nested");
        assert!(nested.is_some(), "nested bucket has been deleted");
        Ok(())
    })?;
    Ok(())
}
/// Exercises reads and writes on a bucket and its nested bucket across
/// several transactions, running the database consistency check after each
/// write transaction.
#[test]
#[cfg(not(miri))]
fn test_bucket_nested() -> crate::Result<()> {
    let mut db = TestDb::new()?;

    // Create `widgets`, a nested `foo` bucket, and one plain key.
    db.update(|mut tx| {
        let mut widgets = tx.create_bucket(b"widgets")?;
        widgets.create_bucket(b"foo")?;
        widgets.put(b"bar", b"0000")?;
        Ok(())
    })?;
    db.must_check();

    // Overwrite the plain key.
    db.update(|mut tx| {
        let mut widgets = tx.bucket_mut(b"widgets").unwrap();
        widgets.put(b"bar", b"xxxx")?;
        Ok(())
    })?;
    db.must_check();

    // Insert 10000 keys whose value equals the key.
    db.update(|mut tx| {
        let mut widgets = tx.bucket_mut(b"widgets").unwrap();
        for n in 0..10000 {
            let digits = format!("{n}");
            widgets.put(digits.as_bytes(), digits.as_bytes())?;
        }
        Ok(())
    })?;
    db.must_check();

    // Write into the nested bucket, then into its parent again.
    db.update(|mut tx| {
        let mut widgets = tx.bucket_mut(b"widgets").unwrap();
        {
            let mut foo = widgets.bucket_mut(b"foo").unwrap();
            foo.put(b"baz", b"yyyy")?;
        }
        widgets.put(b"bar", b"xxxx")?;
        Ok(())
    })?;
    db.must_check();

    // Everything written above must be readable.
    db.view(|tx| {
        let widgets = tx.bucket(b"widgets").unwrap();
        let foo = widgets.bucket(b"foo").unwrap();
        assert_eq!(Some(b"yyyy".as_slice()), foo.get(b"baz"));
        assert_eq!(Some(b"xxxx".as_slice()), widgets.get(b"bar"));
        for n in 0..10000 {
            let digits = format!("{n}");
            assert_eq!(Some(digits.as_bytes()), widgets.get(digits.as_bytes()));
        }
        Ok(())
    })?;
    Ok(())
}
/// Verifies that calling `delete` on a key that names a nested bucket fails
/// with `Error::IncompatibleValue`.
#[test]
fn test_bucket_delete_bucket() -> crate::Result<()> {
    let mut db = TestDb::new()?;
    db.update(|mut tx| {
        let mut widgets = tx.create_bucket(b"widgets")?;
        widgets.create_bucket(b"foo")?;
        let outcome = widgets.delete(b"foo");
        assert_eq!(Err(Error::IncompatibleValue), outcome);
        Ok(())
    })?;
    Ok(())
}
/// Verifies that `delete_bucket` succeeds on a bucket that itself contains a
/// nested bucket, all within one write transaction.
#[test]
fn test_bucket_delete_bucket_nested() -> crate::Result<()> {
    let mut db = TestDb::new()?;
    db.update(|mut tx| {
        // Build widgets -> foo -> bar and store one key in the innermost bucket.
        // The scope ends the bucket handles before `tx` is used again.
        {
            let mut outer = tx.create_bucket(b"widgets")?;
            let mut middle = outer.create_bucket(b"foo")?;
            let mut inner = middle.create_bucket(b"bar")?;
            inner.put(b"baz", b"bat")?;
        }
        let mut outer = tx.bucket_mut(b"widgets").unwrap();
        outer.delete_bucket(b"foo")?;
        Ok(())
    })?;
    Ok(())
}
/// Verifies that a committed bucket hierarchy can be deleted from its root in
/// a later transaction and is no longer visible afterwards.
#[test]
fn test_bucket_delete_bucket_nested2() -> crate::Result<()> {
    let mut db = TestDb::new()?;

    // Commit widgets -> foo -> bar with a single key in `bar`.
    db.update(|mut tx| {
        let mut outer = tx.create_bucket(b"widgets")?;
        let mut middle = outer.create_bucket(b"foo")?;
        let mut inner = middle.create_bucket(b"bar")?;
        inner.put(b"baz", b"bat")?;
        Ok(())
    })?;

    db.update(|mut tx| {
        // Confirm the committed data is readable before deleting the root bucket.
        {
            let outer = tx.bucket(b"widgets").unwrap();
            let middle = outer.bucket(b"foo").unwrap();
            let inner = middle.bucket(b"bar").unwrap();
            assert_eq!(Some(b"bat".as_slice()), inner.get(b"baz"));
        }
        tx.delete_bucket(b"widgets")?;
        Ok(())
    })?;

    db.view(|tx| {
        let outer = tx.bucket(b"widgets");
        assert!(outer.is_none());
        Ok(())
    })?;
    Ok(())
}
/// Verifies that a top-level bucket can be deleted when its nested bucket
/// holds 1000 keys with 100-character values.
#[test]
#[cfg(not(miri))]
fn test_bucket_delete_bucket_large() -> crate::Result<()> {
    let mut db = TestDb::new()?;

    db.update(|mut tx| {
        let mut widgets = tx.create_bucket(b"widgets")?;
        let mut foo = widgets.create_bucket(b"foo")?;
        for n in 0..1000 {
            let key = format!("{n}");
            // Value is the index zero-padded to a width of 100.
            let value = format!("{n:0100}");
            foo.put(key.as_bytes(), value.as_bytes())?;
        }
        Ok(())
    })?;

    db.update(|mut tx| {
        tx.delete_bucket(b"widgets")?;
        Ok(())
    })?;
    Ok(())
}
/// Verifies that looking up a plain key/value entry through `bucket` yields
/// `None`.
#[test]
fn test_bucket_bucket_incompatible_value() -> crate::Result<()> {
    let mut db = TestDb::new()?;
    db.update(|mut tx| {
        let mut parent = tx.create_bucket(b"widgets")?;
        parent.put(b"foo", b"bar")?;
        // `foo` holds a value, not a bucket.
        let as_bucket = parent.bucket(b"foo");
        assert!(as_bucket.is_none());
        Ok(())
    })?;
    Ok(())
}
/// Verifies that `create_bucket` fails with `Error::IncompatibleValue` when
/// the requested name is already used by a plain key/value entry.
#[test]
fn test_bucket_create_bucket_incompatible_value() -> crate::Result<()> {
    let mut db = TestDb::new()?;
    db.update(|mut tx| {
        let mut parent = tx.create_bucket(b"widgets")?;
        parent.put(b"foo", b"bar")?;
        let failure = parent.create_bucket(b"foo").err();
        assert_eq!(Some(Error::IncompatibleValue), failure);
        Ok(())
    })?;
    Ok(())
}
/// Verifies that `delete_bucket` fails with `Error::IncompatibleValue` when
/// the named entry is a plain key/value pair rather than a bucket.
#[test]
fn test_bucket_delete_bucket_incompatible_value() -> crate::Result<()> {
    let mut db = TestDb::new()?;
    db.update(|mut tx| {
        let mut parent = tx.create_bucket(b"widgets")?;
        parent.put(b"foo", b"bar")?;
        let failure = parent.delete_bucket(b"foo").err();
        assert_eq!(Some(Error::IncompatibleValue), failure);
        Ok(())
    })?;
    Ok(())
}
/// Verifies that a new bucket's sequence starts at 0, that `set_sequence`
/// changes it, and that the new value is visible in a later read transaction.
#[test]
fn test_bucket_sequence() -> crate::Result<()> {
    let mut db = TestDb::new()?;

    db.update(|mut tx| {
        let mut bucket = tx.create_bucket(b"0")?;
        let initial = bucket.sequence();
        assert_eq!(0, initial);

        bucket.set_sequence(1000)?;
        let updated = bucket.sequence();
        assert_eq!(1000, updated);
        Ok(())
    })?;

    // The value set above must have been committed.
    db.view(|tx| {
        let bucket = tx.bucket(b"0").unwrap();
        let persisted = bucket.sequence();
        assert_eq!(1000, persisted);
        Ok(())
    })?;
    Ok(())
}
/// Verifies that `next_sequence` increments per bucket: two calls on
/// `widgets` return 1 then 2, while the first call on `woojits` returns 1.
#[test]
fn test_bucket_next_sequence() -> crate::Result<()> {
    let mut db = TestDb::new()?;
    db.update(|mut tx| {
        tx.create_bucket(b"widgets")?;
        tx.create_bucket(b"woojits")?;

        // Scope the first handle so it is released before the second is taken.
        {
            let mut first = tx.bucket_mut("widgets").unwrap();
            assert_eq!(1, first.next_sequence()?);
            assert_eq!(2, first.next_sequence()?);
        }

        // A different bucket keeps its own counter.
        let mut second = tx.bucket_mut("woojits").unwrap();
        assert_eq!(1, second.next_sequence()?);
        Ok(())
    })?;
    Ok(())
}
/// Verifies that the value produced by `next_sequence` carries over between
/// write transactions in which nothing else is written.
#[test]
fn test_bucket_next_sequence_persist() -> crate::Result<()> {
    let mut db = TestDb::new()?;

    db.update(|mut tx| {
        let _ = tx.create_bucket(b"widgets")?;
        Ok(())
    })?;

    // First transaction: the sequence advances to 1.
    db.update(|mut tx| {
        let mut bucket = tx.bucket_mut(b"widgets").unwrap();
        let seq = bucket.next_sequence()?;
        assert_eq!(1, seq);
        Ok(())
    })?;

    // Second transaction: it continues from the committed value.
    db.update(|mut tx| {
        let mut bucket = tx.bucket_mut(b"widgets").unwrap();
        let seq = bucket.next_sequence()?;
        assert_eq!(2, seq);
        Ok(())
    })?;
    Ok(())
}
/// Test helper: runs `for_each` on `b` and returns every visited
/// `(key, value)` pair in visit order.
///
/// Any error returned by `for_each` is propagated to the caller.
fn for_each_collect_kv<'tx, B: BucketApi<'tx>>(
    b: B,
) -> crate::Result<Vec<(&'tx [u8], Option<&'tx [u8]>)>> {
    // `for_each` accepts an `FnMut`, so the closure can push into the vector directly.
    let mut collected = Vec::new();
    b.for_each(|key, value| {
        collected.push((key, value));
        Ok(())
    })?;
    Ok(collected)
}
/// Test helper: runs `for_each_bucket` on `b` and returns every visited key
/// in visit order.
///
/// Any error returned by `for_each_bucket` is propagated to the caller.
fn for_each_bucket_collect_k<'tx, B: BucketApi<'tx>>(b: B) -> crate::Result<Vec<&'tx [u8]>> {
    // `for_each_bucket` accepts an `FnMut`, so the closure can push into the vector directly.
    let mut names = Vec::new();
    b.for_each_bucket(|name| {
        names.push(name);
        Ok(())
    })?;
    Ok(names)
}
/// Verifies that `for_each` visits every entry in key order, reporting a
/// nested bucket as a key with a `None` value, both inside the writing
/// transaction and from a later read transaction.
#[test]
fn test_bucket_for_each() -> crate::Result<()> {
    // Expected output is sorted by key even though the inserts below are not.
    let expected = [
        (b"bar".as_slice(), Some(b"0002".as_slice())),
        (b"baz".as_slice(), Some(b"0001".as_slice())),
        (b"csubbucket".as_slice(), None),
        (b"foo".as_slice(), Some(b"0000".as_slice())),
    ];
    let mut db = TestDb::new()?;

    db.update(|mut tx| {
        let mut widgets = tx.create_bucket(b"widgets")?;
        widgets.put(b"foo", b"0000")?;
        widgets.put(b"baz", b"0001")?;
        widgets.put(b"bar", b"0002")?;
        let _ = widgets.create_bucket(b"csubbucket")?;

        let seen = for_each_collect_kv(widgets)?;
        assert_eq!(
            expected.as_slice(),
            &seen,
            "what we iterated (ForEach) is not what we put"
        );
        Ok(())
    })?;

    db.view(|tx| {
        let widgets = tx.bucket(b"widgets").unwrap();
        let seen = for_each_collect_kv(widgets)?;
        assert_eq!(
            expected.as_slice(),
            &seen,
            "what we iterated (ForEach) is not what we put"
        );
        Ok(())
    })?;
    Ok(())
}
/// Verifies that `for_each_bucket` reports only nested buckets, in key order,
/// and skips plain key/value entries, both inside the writing transaction and
/// from a later read transaction.
#[test]
fn test_bucket_for_each_bucket() -> crate::Result<()> {
    let expected = [b"csubbucket".as_slice(), b"zsubbucket".as_slice()];
    let mut db = TestDb::new()?;

    db.update(|mut tx| {
        let mut widgets = tx.create_bucket(b"widgets")?;
        // Interleave plain keys with nested buckets.
        widgets.put(b"foo", b"0000")?;
        widgets.create_bucket(b"zsubbucket")?;
        widgets.put(b"baz", b"0001")?;
        widgets.put(b"bar", b"0002")?;
        widgets.create_bucket(b"csubbucket")?;

        let seen = for_each_bucket_collect_k(widgets)?;
        assert_eq!(
            expected.as_slice(),
            &seen,
            "what we iterated (ForEach) is not what we put"
        );
        Ok(())
    })?;

    db.view(|tx| {
        let widgets = tx.bucket(b"widgets").unwrap();
        let seen = for_each_bucket_collect_k(widgets)?;
        assert_eq!(
            expected.as_slice(),
            &seen,
            "what we iterated (ForEach) is not what we put"
        );
        Ok(())
    })?;
    Ok(())
}
/// Verifies that `for_each_bucket` visits nothing when the bucket contains
/// only plain key/value entries.
#[test]
fn test_bucket_for_each_bucket_no_buckets() -> crate::Result<()> {
    let mut db = TestDb::new()?;

    db.update(|mut tx| {
        let mut widgets = tx.create_bucket(b"widgets")?;
        widgets.put(b"foo", b"0000")?;
        widgets.put(b"baz", b"0001")?;

        let seen = for_each_bucket_collect_k(widgets)?;
        assert!(
            seen.is_empty(),
            "what we iterated (ForEach) is not what we put"
        );
        Ok(())
    })?;

    db.view(|tx| {
        let widgets = tx.bucket(b"widgets").unwrap();
        let seen = for_each_bucket_collect_k(widgets)?;
        assert!(
            seen.is_empty(),
            "what we iterated (ForEach) is not what we put"
        );
        Ok(())
    })?;
    Ok(())
}
/// Verifies that `for_each` stops at the first callback error and propagates
/// that error to the caller.
///
/// The callback fails on key `baz`. The sibling `for_each` tests in this
/// module show iteration in sorted key order, so `bar` and `baz` are visited
/// and `foo` must not be: the callback has to run exactly twice.
#[test]
fn test_bucket_for_each_short_circuit() -> crate::Result<()> {
    let mut db = TestDb::new()?;
    let result = db.update(|mut tx| {
        {
            let mut b = tx.create_bucket(b"widgets")?;
            b.put(b"bar", b"0000")?;
            b.put(b"baz", b"0000")?;
            b.put(b"foo", b"0000")?;
        }
        let visited = AtomicU32::new(0);
        let outcome = tx.bucket(b"widgets").unwrap().for_each(|k, _| {
            visited.fetch_add(1, Ordering::Relaxed);
            if k == b"baz" {
                return Err(Error::Other(anyhow!("marker")));
            }
            Ok(())
        });
        // Check the count before returning the error; previously the counter
        // was incremented but never inspected, so a non-short-circuiting
        // implementation would still have passed.
        assert_eq!(
            2,
            visited.load(Ordering::Relaxed),
            "for_each kept iterating after the callback returned an error"
        );
        outcome
    });
    let e = result.map_err(|e| e.to_string()).err().unwrap();
    assert_eq!("marker", e);
    Ok(())
}
/// Verifies that `put` rejects an empty key with `Error::KeyRequired`.
#[test]
fn test_bucket_put_empty_key() -> crate::Result<()> {
    let mut db = TestDb::new()?;
    db.update(|mut tx| {
        let mut bucket = tx.create_bucket(b"widgets")?;
        let failure = bucket.put([], []).err();
        assert_eq!(Some(Error::KeyRequired), failure);
        Ok(())
    })?;
    Ok(())
}
/// Verifies that `put` rejects a 32769-byte key with `Error::KeyTooLarge`.
#[test]
fn test_bucket_put_key_too_large() -> crate::Result<()> {
    let mut db = TestDb::new()?;
    // NOTE(review): 32769 is presumably one byte over the maximum key size — confirm.
    let oversized_key = [0u8; 32769];
    db.update(|mut tx| {
        let mut bucket = tx.create_bucket(b"widgets")?;
        let failure = bucket.put(oversized_key.as_slice(), b"bar").err();
        assert_eq!(Some(Error::KeyTooLarge), failure);
        Ok(())
    })?;
    Ok(())
}
/// Verifies that `put` rejects a value one byte longer than `MAX_VALUE_SIZE`
/// with `Error::ValueTooLarge`.
#[test]
fn test_bucket_put_value_too_large() -> crate::Result<()> {
    let mut db = TestDb::new()?;
    let oversized_value = vec![0u8; MAX_VALUE_SIZE as usize + 1];
    db.update(|mut tx| {
        let mut bucket = tx.create_bucket(b"widgets")?;
        let failure = bucket.put(b"foo", oversized_value.as_slice()).err();
        assert_eq!(Some(Error::ValueTooLarge), failure);
        Ok(())
    })?;
    Ok(())
}
/// Checks `stats()` for a bucket filled with 500 small entries (one per
/// transaction) plus one entry whose value is `10 * 4096 + 17` bytes long.
///
/// The expected figures are hard-coded; the binding name `expected_4096`
/// suggests they assume a 4096-byte page size (TODO confirm).
#[test]
#[cfg(feature = "long-tests")]
fn test_bucket_stats() -> crate::Result<()> {
    let mut db = TestDb::new()?;
    // Key under which the single large value is stored.
    let large_value_key = "really-big-value";

    // 500 entries: zero-padded three-character key, unpadded decimal value.
    for n in 0..500 {
        db.update(|mut tx| {
            let mut woojits = tx.create_bucket_if_not_exists("woojits")?;
            woojits.put(format!("{n:03}"), format!("{n}"))?;
            Ok(())
        })?;
    }

    // Length in bytes of the large value.
    let large_value_len = 10 * 4096 + 17;
    db.update(|mut tx| {
        let mut woojits = tx.bucket_mut("woojits").unwrap();
        woojits.put(large_value_key, "*".repeat(large_value_len))?;
        Ok(())
    })?;
    db.must_check();

    let expected_4096 = BucketStats {
        branch_page_n: 1,
        branch_overflow_n: 0,
        leaf_page_n: 7,
        leaf_overflow_n: 10,
        key_n: 501,
        depth: 2,
        branch_alloc: 4096,
        branch_in_use: 149,
        leaf_alloc: 69_632,
        // NOTE(review): the meaning of the two `* 16` terms is assumed — confirm.
        leaf_in_use: 7 * 16 // presumably a 16-byte header for each of the 7 leaf pages
            + 501 * 16 // presumably a 16-byte element for each of the 501 keys
            + 500 * 3 + large_value_key.len() as i64 // key bytes: 500 three-character keys + the long one
            + 1 * 10 + 2 * 90 + 3 * 400 // value bytes: 10 one-, 90 two-, 400 three-digit values
            + large_value_len as i64, // the single large value
        bucket_n: 1,
        inline_bucket_n: 0,
        inline_bucket_in_use: 0,
    };

    db.view(|tx| {
        let woojits = tx.bucket("woojits").unwrap();
        let actual = woojits.stats();
        assert_eq!(expected_4096, actual, "stats differs from expectations");
        Ok(())
    })?;
    Ok(())
}
/// Checks `stats()` after inserting 100 000 keys in a pseudo-random order
/// with a fill percent of 0.90.
///
/// The RNG is seeded with a fixed value, so the insertion order is the same
/// on every run. The outer loop runs 1000 transactions; each inserts 100 keys
/// of the form `j * 1000 + i`, zero-padded to 15 characters.
#[test]
#[cfg(feature = "long-tests")]
fn test_bucket_stats_random_fill() -> crate::Result<()> {
    let mut db = TestDb::new()?;
    let mut rng = StdRng::seed_from_u64(42);
    let mut outer_range = (0..1000).collect_vec();
    outer_range.shuffle(&mut rng);
    for i in outer_range {
        db.update(|mut tx| {
            let mut b = tx.create_bucket_if_not_exists("woojits")?;
            b.set_fill_percent(0.90);
            let mut inner_range = (0..100).collect_vec();
            inner_range.shuffle(&mut rng);
            for j in inner_range {
                let index = (j * 1000) + i;
                b.put(format!("{:015}", index), "0000000000")?;
            }
            Ok(())
        })?;
    }
    db.must_check();
    db.view(|tx| {
        let b = tx.bucket("woojits").unwrap();
        let stats = b.stats();
        assert_eq!(68, stats.branch_page_n, "unexpected BranchPageN");
        assert_eq!(0, stats.branch_overflow_n, "unexpected BranchOverflowN");
        assert_eq!(2946, stats.leaf_page_n, "unexpected LeafPageN");
        assert_eq!(0, stats.leaf_overflow_n, "unexpected LeafOverflowN");
        // 1000 transactions x 100 keys each.
        assert_eq!(100_000, stats.key_n, "unexpected KeyN");
        assert_eq!(94_491, stats.branch_in_use, "unexpected BranchInuse");
        assert_eq!(4_147_136, stats.leaf_in_use, "unexpected LeafInuse");
        assert_eq!(278_528, stats.branch_alloc, "unexpected BranchAlloc");
        assert_eq!(12_066_816, stats.leaf_alloc, "unexpected LeafAlloc");
        Ok(())
    })?;
    Ok(())
}
/// Checks `stats()` for a bucket holding a single small key/value pair: the
/// bucket is reported as inline, with no branch or leaf pages of its own.
#[test]
fn test_bucket_stats_small() -> crate::Result<()> {
    let mut db = TestDb::new()?;

    db.update(|mut tx| {
        let mut bucket = tx.create_bucket("whozawhats")?;
        bucket.put("foo", "bar")?;
        Ok(())
    })?;
    db.must_check();

    db.view(|tx| {
        let bucket = tx.bucket("whozawhats").unwrap();
        let actual = bucket.stats();

        // Page counts.
        assert_eq!(0, actual.branch_page_n, "unexpected BranchPageN");
        assert_eq!(0, actual.branch_overflow_n, "unexpected BranchOverflowN");
        assert_eq!(0, actual.leaf_page_n, "unexpected LeafPageN");
        assert_eq!(0, actual.leaf_overflow_n, "unexpected LeafOverflowN");

        // Tree shape.
        assert_eq!(1, actual.key_n, "unexpected KeyN");
        assert_eq!(1, actual.depth, "unexpected Depth");

        // Byte accounting for dedicated pages.
        assert_eq!(0, actual.branch_in_use, "unexpected BranchInuse");
        assert_eq!(0, actual.leaf_in_use, "unexpected LeafInuse");
        assert_eq!(0, actual.branch_alloc, "unexpected BranchAlloc");
        assert_eq!(0, actual.leaf_alloc, "unexpected LeafAlloc");

        // Bucket accounting.
        assert_eq!(1, actual.bucket_n, "unexpected BucketN");
        assert_eq!(1, actual.inline_bucket_n, "unexpected InlineBucketN");
        // NOTE(review): presumably 16-byte page header + 16-byte leaf element
        // + 6 bytes of key/value data ("foo" + "bar") — confirm.
        assert_eq!(
            16 + 16 + 6,
            actual.inline_bucket_in_use,
            "unexpected InlineBucketInuse"
        );
        Ok(())
    })?;
    Ok(())
}
/// Checks `stats()` for a freshly created bucket with no entries: it is
/// reported as a single inline bucket with zero keys.
#[test]
fn test_bucket_stats_empty_bucket() -> crate::Result<()> {
    let mut db = TestDb::new()?;

    db.update(|mut tx| {
        let _ = tx.create_bucket("whozawhats")?;
        Ok(())
    })?;
    db.must_check();

    db.view(|tx| {
        let bucket = tx.bucket("whozawhats").unwrap();
        let actual = bucket.stats();

        // Page counts.
        assert_eq!(0, actual.branch_page_n, "unexpected BranchPageN");
        assert_eq!(0, actual.branch_overflow_n, "unexpected BranchOverflowN");
        assert_eq!(0, actual.leaf_page_n, "unexpected LeafPageN");
        assert_eq!(0, actual.leaf_overflow_n, "unexpected LeafOverflowN");

        // Tree shape.
        assert_eq!(0, actual.key_n, "unexpected KeyN");
        assert_eq!(1, actual.depth, "unexpected Depth");

        // Byte accounting for dedicated pages.
        assert_eq!(0, actual.branch_in_use, "unexpected BranchInuse");
        assert_eq!(0, actual.leaf_in_use, "unexpected LeafInuse");
        assert_eq!(0, actual.branch_alloc, "unexpected BranchAlloc");
        assert_eq!(0, actual.leaf_alloc, "unexpected LeafAlloc");

        // Bucket accounting.
        assert_eq!(1, actual.bucket_n, "unexpected BucketN");
        assert_eq!(1, actual.inline_bucket_n, "unexpected InlineBucketN");
        // NOTE(review): presumably just the 16-byte page header — confirm.
        assert_eq!(
            16, actual.inline_bucket_in_use,
            "unexpected InlineBucketInuse"
        );
        Ok(())
    })?;
    Ok(())
}
/// Checks `stats()` on a three-level hierarchy: `foo` with 100 two-character
/// entries, nested `bar` with 10 one-character entries, and `baz` nested in
/// `bar` with another 10 one-character entries.
#[test]
fn test_bucket_stats_nested() -> crate::Result<()> {
    let mut db = TestDb::new()?;

    db.update(|mut tx| {
        let mut foo = tx.create_bucket("foo")?;
        for n in 0..100 {
            let text = format!("{n:02}");
            foo.put(&text, &text)?;
        }

        let mut bar = foo.create_bucket("bar")?;
        for n in 0..10 {
            let text = format!("{n}");
            bar.put(&text, &text)?;
        }

        let mut baz = bar.create_bucket("baz")?;
        for n in 0..10 {
            let text = format!("{n}");
            baz.put(&text, &text)?;
        }
        Ok(())
    })?;
    db.must_check();

    db.view(|tx| {
        let foo = tx.bucket("foo").unwrap();
        let actual = foo.stats();
        assert_eq!(0, actual.branch_page_n, "unexpected BranchPageN");
        assert_eq!(0, actual.branch_overflow_n, "unexpected BranchOverflowN");
        assert_eq!(2, actual.leaf_page_n, "unexpected LeafPageN");
        assert_eq!(0, actual.leaf_overflow_n, "unexpected LeafOverflowN");
        assert_eq!(122, actual.key_n, "unexpected KeyN");
        assert_eq!(3, actual.depth, "unexpected Depth");
        assert_eq!(0, actual.branch_in_use, "unexpected BranchInuse");

        // NOTE(review): the labels on the 16-byte terms below (page header,
        // leaf element, bucket header) are assumed from the layout — confirm.
        let foo_bytes = 16 // page header
            + 101 * 16 // leaf elements: 100 keys + the `bar` entry
            + (100 * 2 + 100 * 2) // 100 two-character keys and values
            + (3 + 16); // the `bar` entry: 3-byte name + 16-byte value
        let bar_bytes = 16 // page header
            + 11 * 16 // leaf elements: 10 keys + the `baz` entry
            + (10 + 10) // 10 one-character keys and values
            + (3 + 16); // the `baz` entry: 3-byte name + 16-byte value
        let baz_bytes = 16 // page header
            + 10 * 16 // leaf elements: 10 keys
            + (10 + 10); // 10 one-character keys and values
        assert_eq!(
            foo_bytes + bar_bytes + baz_bytes,
            actual.leaf_in_use,
            "unexpected LeafInuse"
        );

        assert_eq!(0, actual.branch_alloc, "unexpected BranchAlloc");
        assert_eq!(8192, actual.leaf_alloc, "unexpected LeafAlloc");
        assert_eq!(3, actual.bucket_n, "unexpected BucketN");
        assert_eq!(1, actual.inline_bucket_n, "unexpected InlineBucketN");
        // The one inline bucket is expected to account for exactly `baz`'s bytes.
        assert_eq!(
            baz_bytes, actual.inline_bucket_in_use,
            "unexpected InlineBucketInuse"
        );
        Ok(())
    })?;
    Ok(())
}
/// Checks `stats()` for a bucket filled with 100 000 entries, written as 100
/// transactions of 1000 sequentially numbered keys whose value equals the key.
///
/// The expected figures are hard-coded; the binding name `expected_4096`
/// suggests they assume a 4096-byte page size (TODO confirm).
#[test]
#[cfg(feature = "long-tests")]
fn test_bucket_stats_large() -> crate::Result<()> {
    let mut db = TestDb::new()?;

    // Running key counter shared across all transactions.
    let mut next_key = 0;
    for _ in 0..100 {
        db.update(|mut tx| {
            let mut widgets = tx.create_bucket_if_not_exists("widgets")?;
            for _ in 0..1000 {
                let text = format!("{next_key}");
                widgets.put(&text, &text)?;
                next_key += 1;
            }
            Ok(())
        })?;
    }
    db.must_check();

    let expected_4096 = BucketStats {
        branch_page_n: 13,
        branch_overflow_n: 0,
        leaf_page_n: 1196,
        leaf_overflow_n: 0,
        key_n: 100_000,
        depth: 3,
        branch_alloc: 53_248,
        branch_in_use: 25_257,
        leaf_alloc: 4_898_816,
        leaf_in_use: 2_596_916,
        bucket_n: 1,
        inline_bucket_n: 0,
        inline_bucket_in_use: 0,
    };

    db.view(|tx| {
        let widgets = tx.bucket("widgets").unwrap();
        let actual = widgets.stats();
        assert_eq!(expected_4096, actual, "stats differs from expectations");
        Ok(())
    })?;
    Ok(())
}
// Placeholder for a single-put test that has not been written yet.
// The body is `todo!`, so it would panic if run; `#[ignore]` keeps it out of
// normal test runs. NOTE(review): "quick-check" presumably refers to a
// property-based test that still needs porting — confirm.
#[test]
#[ignore]
#[cfg(feature = "long-tests")]
fn test_bucket_put_single() -> crate::Result<()> {
todo!("quick-check")
}
// Placeholder for a multiple-put test that has not been written yet.
// The body is `todo!`, so it would panic if run; `#[ignore]` keeps it out of
// normal test runs. NOTE(review): "quick-check" presumably refers to a
// property-based test that still needs porting — confirm.
#[test]
#[ignore]
#[cfg(feature = "long-tests")]
fn test_bucket_put_multiple() -> crate::Result<()> {
todo!("quick-check")
}
// Placeholder for a delete test that has not been written yet.
// The body is `todo!`, so it would panic if run; `#[ignore]` keeps it out of
// normal test runs. NOTE(review): "quick-check" presumably refers to a
// property-based test that still needs porting — confirm.
#[test]
#[ignore]
#[cfg(feature = "long-tests")]
fn test_bucket_delete_quick() -> crate::Result<()> {
todo!("quick-check")
}
}