use crate::{
btreemap::Allocator,
read_address_vec, read_struct, read_to_vec, read_u32, read_u64,
storable::Storable,
types::{Address, Bytes},
write, write_struct, write_u32, Memory,
};
use std::borrow::{Borrow, Cow};
use std::cell::OnceCell;
mod io;
#[cfg(test)]
mod tests;
mod v1;
mod v2;
use io::NodeReader;
const B: usize = 6;
const CAPACITY: usize = 2 * B - 1;
const LAYOUT_VERSION_1: u8 = 1;
const LAYOUT_VERSION_2: u8 = 2;
const MAGIC: &[u8; 3] = b"BTN";
const LEAF_NODE_TYPE: u8 = 0;
const INTERNAL_NODE_TYPE: u8 = 1;
const U32_SIZE: Bytes = Bytes::new(4);
/// The type of a B-Tree node: a `Leaf` holds only entries, while an
/// `Internal` node additionally holds child-node addresses.
#[derive(Debug, PartialEq, Copy, Clone, Eq)]
#[cfg_attr(test, derive(test_strategy::Arbitrary))]
pub enum NodeType {
    Leaf,
    Internal,
}
/// An owned key/value entry as exposed to callers.
pub type Entry<K> = (K, Vec<u8>);
/// A borrowed view of an entry.
pub type EntryRef<'a, K> = (&'a K, &'a [u8]);
/// An entry whose key and value may still live on stable memory and are
/// deserialized on first access.
type LazyEntry<K> = (LazyKey<K>, LazyValue);
/// An in-memory representation of a B-Tree node, loaded from (and saved to)
/// stable memory. Keys and values are held lazily and only deserialized when
/// first accessed.
#[derive(Debug)]
pub struct Node<K: Storable + Ord + Clone> {
    // The address of this node's page in stable memory.
    address: Address,
    // The node's entries, kept in sorted-key order (see `search`).
    entries: Vec<LazyEntry<K>>,
    // Addresses of child nodes; empty for leaf nodes.
    children: Vec<Address>,
    node_type: NodeType,
    // The on-disk layout version this node was loaded with / will be saved as.
    version: Version,
    // Additional pages holding data that did not fit in the node's main page;
    // read transparently via `NodeReader` and freed in `deallocate`.
    overflows: Vec<Address>,
}
impl<K: Storable + Ord + Clone> Node<K> {
    /// Loads a node from stable memory at `address`.
    ///
    /// Dispatches on the layout version stored in the node's header:
    /// V1 nodes require a `PageSize::Derived` (page size computed from the
    /// maximum key/value sizes), while V2 nodes accept any `PageSize`.
    ///
    /// # Panics
    /// Panics if the header's magic bytes or version are not recognized, or
    /// if a V1 node is loaded with a non-derived page size.
    pub fn load<M: Memory>(address: Address, page_size: PageSize, memory: &M) -> Self {
        let header: NodeHeader = read_struct(address, memory);
        assert_eq!(&header.magic, MAGIC, "Bad magic.");
        match header.version {
            LAYOUT_VERSION_1 => match page_size {
                PageSize::Derived(DerivedPageSize {
                    max_key_size,
                    max_value_size,
                }) => Self::load_v1(header, address, max_key_size, max_value_size, memory),
                PageSize::Value(_) => {
                    unreachable!("Tried to load a V1 node without a derived PageSize.")
                }
            },
            LAYOUT_VERSION_2 => Self::load_v2(address, page_size, header, memory),
            unknown_version => unreachable!("Unsupported version {unknown_version}."),
        }
    }

    /// Saves the node to stable memory using the writer matching its layout
    /// version.
    pub fn save<M: Memory>(&mut self, allocator: &mut Allocator<M>) {
        match self.version {
            Version::V1(_) => self.save_v1(allocator.memory()),
            Version::V2(_) => self.save_v2(allocator),
        }
    }

    /// Returns the address of the node's page in stable memory.
    pub fn address(&self) -> Address {
        self.address
    }

    /// Returns the node's type (leaf or internal).
    pub fn node_type(&self) -> NodeType {
        self.node_type
    }

    /// Returns the entry with the maximum key in the subtree rooted at this
    /// node, descending into the right-most child for internal nodes.
    ///
    /// # Panics
    /// Panics if the node (or a descendant) is empty.
    pub fn get_max<M: Memory>(&self, memory: &M) -> Entry<K> {
        match self.node_type {
            NodeType::Leaf => {
                // Entries are kept sorted, so the last entry holds the max key.
                let last_entry = self.entries.last().expect("A node can never be empty");
                (
                    self.get_key(last_entry, memory).clone(),
                    self.get_value(last_entry, memory).to_vec(),
                )
            }
            NodeType::Internal => {
                let last_child = Self::load(
                    *self
                        .children
                        .last()
                        .expect("An internal node must have children."),
                    self.version.page_size(),
                    memory,
                );
                last_child.get_max(memory)
            }
        }
    }

    /// Returns the entry with the minimum key in the subtree rooted at this
    /// node, descending into the left-most child for internal nodes.
    ///
    /// # Panics
    /// Panics if the node (or a descendant) is empty.
    pub fn get_min<M: Memory>(&self, memory: &M) -> Entry<K> {
        match self.node_type {
            NodeType::Leaf => {
                // Entries are sorted, so the first entry holds the min key.
                let entry = self.entry(0, memory);
                (entry.0.clone(), entry.1.to_vec())
            }
            NodeType::Internal => {
                let first_child = Self::load(
                    self.children[0],
                    self.version.page_size(),
                    memory,
                );
                first_child.get_min(memory)
            }
        }
    }

    /// Returns true if the node holds the maximum number of entries
    /// (`CAPACITY = 2B - 1`) and must be split before another insertion.
    pub fn is_full(&self) -> bool {
        self.entries.len() >= CAPACITY
    }

    /// Replaces the value at `idx` with `new`, returning the old value
    /// (loading it from memory first if it was never materialized).
    pub fn swap_value<M: Memory>(&mut self, idx: usize, new: Vec<u8>, memory: &M) -> Vec<u8> {
        let old = core::mem::replace(&mut self.entries[idx].1, LazyValue::by_value(new));
        self.extract_value(old, memory)
    }

    /// Replaces the entry at `idx` with `(key, value)`, returning the old
    /// entry (loading it from memory first if needed).
    pub fn swap_entry<M: Memory>(
        &mut self,
        idx: usize,
        (key, value): Entry<K>,
        memory: &M,
    ) -> Entry<K> {
        let (old_key, old_value) = core::mem::replace(
            &mut self.entries[idx],
            (LazyKey::by_value(key), LazyValue::by_value(value)),
        );
        (
            self.extract_key(old_key, memory),
            self.extract_value(old_value, memory),
        )
    }

    /// Returns a borrowed view of the entry at `idx`, materializing the key
    /// and value if necessary.
    #[inline(always)]
    pub fn entry<M: Memory>(&self, idx: usize, memory: &M) -> EntryRef<'_, K> {
        (self.key(idx, memory), self.value(idx, memory))
    }

    // Returns a reference to the entry's key, deserializing it from stable
    // memory on first access.
    #[inline(always)]
    fn get_key<'a, M: Memory>(&'a self, (k, _): &'a LazyEntry<K>, memory: &M) -> &'a K {
        k.get_or_load(|offset, size| self.load_key_from_memory(offset, size, memory))
    }

    // Returns a reference to the entry's value bytes, loading them from
    // stable memory on first access.
    #[inline(always)]
    fn get_value<'a, M: Memory>(&'a self, (_, v): &'a LazyEntry<K>, memory: &M) -> &'a [u8] {
        v.get_or_load(|offset, size| self.load_value_from_memory(offset, size, memory))
    }

    /// Returns a reference to the key at `idx`.
    #[inline(always)]
    pub fn key<M: Memory>(&self, idx: usize, memory: &M) -> &K {
        self.get_key(&self.entries[idx], memory)
    }

    /// Returns a reference to the value bytes at `idx`.
    #[inline(always)]
    pub fn value<M: Memory>(&self, idx: usize, memory: &M) -> &[u8] {
        self.get_value(&self.entries[idx], memory)
    }

    // Converts a `LazyKey` into an owned key, loading it if necessary.
    fn extract_key<M: Memory>(&self, key: LazyKey<K>, memory: &M) -> K {
        key.take_or_load(|offset, size| self.load_key_from_memory(offset, size, memory))
    }

    // Converts a `LazyValue` into owned bytes, loading them if necessary.
    fn extract_value<M: Memory>(&self, value: LazyValue, memory: &M) -> Vec<u8> {
        value.take_or_load(|offset, size| self.load_value_from_memory(offset, size, memory))
    }

    /// Deserializes a key stored at `offset` within this node's pages.
    ///
    /// On-disk layout: `[size: u32 | payload]` for V1 and for V2 with
    /// variable-size keys; V2 fixed-size keys are stored as the bare payload
    /// (no length prefix) at the type's maximum size.
    fn load_key_from_memory<M: Memory>(
        &self,
        mut offset: Bytes,
        loaded_size: u32,
        memory: &M,
    ) -> K {
        // The reader transparently stitches the node's main page and
        // overflow pages into one address space.
        let reader = NodeReader {
            address: self.address,
            overflows: &self.overflows,
            page_size: self.page_size(),
            memory,
        };
        let size = match self.version {
            Version::V1(_) => {
                // Skip the u32 size prefix; `loaded_size` presumably holds
                // the value read from it when the node was loaded — confirm.
                offset += U32_SIZE;
                loaded_size
            }
            Version::V2(_) => {
                if K::BOUND.is_fixed_size() {
                    // Fixed-size keys have no length prefix on disk.
                    K::BOUND.max_size()
                } else {
                    offset += U32_SIZE;
                    loaded_size
                }
            }
        } as usize;
        let mut bytes = Vec::with_capacity(size);
        read_to_vec(&reader, Address::from(offset.get()), &mut bytes, size);
        K::from_bytes(Cow::Borrowed(&bytes))
    }

    /// Loads a value's bytes stored at `offset` within this node's pages.
    /// Values are always stored as `[size: u32 | payload]`.
    fn load_value_from_memory<M: Memory>(
        &self,
        offset: Bytes,
        loaded_size: u32,
        memory: &M,
    ) -> Vec<u8> {
        let reader = NodeReader {
            address: self.address,
            overflows: &self.overflows,
            page_size: self.page_size(),
            memory,
        };
        let size = loaded_size as usize;
        let mut bytes = Vec::with_capacity(size);
        read_to_vec(
            &reader,
            // Skip the u32 size prefix preceding the payload.
            Address::from((offset + U32_SIZE).get()),
            &mut bytes,
            size,
        );
        bytes
    }

    // Returns the page size implied by this node's layout version.
    fn page_size(&self) -> PageSize {
        self.version.page_size()
    }

    /// Returns the address of the child at `idx`.
    pub fn child(&self, idx: usize) -> Address {
        self.children[idx]
    }

    /// Inserts a child address at `idx`.
    pub fn insert_child(&mut self, idx: usize, address: Address) {
        self.children.insert(idx, address)
    }

    /// Appends a child address.
    pub fn push_child(&mut self, address: Address) {
        self.children.push(address)
    }

    /// Removes and returns the child address at `idx`.
    pub fn remove_child(&mut self, idx: usize) -> Address {
        self.children.remove(idx)
    }

    /// Returns the number of children.
    pub fn children_len(&self) -> usize {
        self.children.len()
    }

    /// Removes and returns the last child address, if any.
    pub fn pop_child(&mut self) -> Option<Address> {
        self.children.pop()
    }

    /// Inserts an entry at `idx`, shifting later entries right.
    pub fn insert_entry(&mut self, idx: usize, (key, value): Entry<K>) {
        self.entries
            .insert(idx, (LazyKey::by_value(key), LazyValue::by_value(value)));
    }

    /// Removes and returns the entry at `idx` using `swap_remove`.
    ///
    /// NOTE(review): `swap_remove` moves the *last* entry into position
    /// `idx`, which breaks the sorted order of `entries` unless `idx` is the
    /// last index — confirm that callers only use this where the remaining
    /// order no longer matters (or always pass the last index).
    pub fn extract_entry_at<M: Memory>(&mut self, idx: usize, memory: &M) -> Entry<K> {
        let (key, value) = self.entries.swap_remove(idx);
        (
            self.extract_key(key, memory),
            self.extract_value(value, memory),
        )
    }

    /// Removes and returns the entry at `idx`, preserving the order of the
    /// remaining entries.
    pub fn remove_entry<M: Memory>(&mut self, idx: usize, memory: &M) -> Entry<K> {
        let (key, value) = self.entries.remove(idx);
        (
            self.extract_key(key, memory),
            self.extract_value(value, memory),
        )
    }

    /// Appends an entry at the end of the node.
    pub fn push_entry(&mut self, (key, value): Entry<K>) {
        self.entries
            .push((LazyKey::by_value(key), LazyValue::by_value(value)));
    }

    /// Removes and returns the last entry, or `None` if the node is empty.
    pub fn pop_entry<M: Memory>(&mut self, memory: &M) -> Option<Entry<K>> {
        let len = self.entries_len();
        if len == 0 {
            return None;
        }
        let (key, value) = self.entries.pop().expect("node must not be empty");
        Some((
            self.extract_key(key, memory),
            self.extract_value(value, memory),
        ))
    }

    /// Merges the entries and children of `source` into `self`, with
    /// `median` (the separating entry from the parent) placed between them.
    /// `source` is deallocated afterwards.
    pub fn merge<M: Memory>(
        &mut self,
        mut source: Node<K>,
        median: Entry<K>,
        allocator: &mut Allocator<M>,
    ) {
        // Materialize all of `source`'s lazy entries now: their lazy offsets
        // refer to `source`'s pages, which are deallocated below.
        for i in 0..source.entries_len() {
            source.entry(i, allocator.memory());
        }
        // Append the node holding the larger keys onto the one holding the
        // smaller keys, so the merged entries stay sorted.
        if source.key(0, allocator.memory()) > self.key(0, allocator.memory()) {
            Self::append(self, &mut source, median, allocator.memory());
        } else {
            Self::append(&mut source, self, median, allocator.memory());
            // The merged contents ended up in `source`; move them into `self`.
            self.entries = core::mem::take(&mut source.entries);
            self.children = core::mem::take(&mut source.children);
        }
        self.save(allocator);
        source.deallocate(allocator);
    }

    // Appends the median and all of `b`'s entries/children onto `a`.
    //
    // PRECONDITION: `a` and `b` have the same node type, both are non-empty,
    // and every key of `a` < `median.0` < every key of `b`.
    fn append<M: Memory>(a: &mut Node<K>, b: &mut Node<K>, median: Entry<K>, memory: &M) {
        let a_len = a.entries_len();
        assert_eq!(a.node_type(), b.node_type());
        assert!(b.entries_len() > 0);
        assert!(a_len > 0);
        assert!(a.key(a_len - 1, memory) < &median.0);
        assert!(&median.0 < b.key(0, memory));
        a.push_entry(median);
        a.entries.append(&mut b.entries);
        a.children.append(&mut b.children);
        // `Vec::append` drains its argument, so `b` must now be empty.
        assert_eq!(b.entries.len(), 0);
        assert_eq!(b.children.len(), 0);
    }

    /// Returns all entries as owned key/value pairs (test helper).
    #[cfg(test)]
    pub fn entries<M: Memory>(&self, memory: &M) -> Vec<Entry<K>> {
        (0..self.entries.len())
            .map(|i| (self.key(i, memory).clone(), self.value(i, memory).to_vec()))
            .collect()
    }

    /// Returns references to all keys (test helper).
    #[cfg(test)]
    pub fn keys<M: Memory>(&self, memory: &M) -> Vec<&K> {
        (0..self.entries.len())
            .map(|i| self.key(i, memory))
            .collect()
    }

    /// Returns the node's overflow page addresses (test helper).
    #[cfg(test)]
    pub fn overflows(&self) -> &[Address] {
        &self.overflows
    }

    /// Returns the number of entries in the node.
    pub fn entries_len(&self) -> usize {
        self.entries.len()
    }

    /// Binary-searches the node's (sorted) entries for `key`.
    /// Returns `Ok(idx)` if found, or `Err(idx)` with the insertion point.
    pub fn search<M: Memory>(&self, key: &K, memory: &M) -> Result<usize, usize> {
        self.entries
            .binary_search_by_key(&key, |entry| self.get_key(entry, memory))
    }

    /// Returns the maximum on-disk size of a V1 node with the given bounds.
    pub fn max_size(max_key_size: u32, max_value_size: u32) -> Bytes {
        v1::size_v1(max_key_size, max_value_size)
    }

    /// Returns true if the node is at (or below) the minimum entry count
    /// (`B - 1`), i.e. removing an entry would require rebalancing.
    pub fn at_minimum(&self) -> bool {
        self.entries.len() < B
    }

    /// Returns true if an entry can be removed without triggering a merge.
    pub fn can_remove_entry_without_merging(&self) -> bool {
        !self.at_minimum()
    }

    /// Splits a full node, moving the upper half of its entries (and, for
    /// internal nodes, children) into `sibling`, and returns the median
    /// entry, which belongs in the parent.
    pub fn split<M: Memory>(&mut self, sibling: &mut Node<K>, memory: &M) -> Entry<K> {
        debug_assert!(self.is_full());
        // Materialize the entries about to move: their lazy offsets refer to
        // this node's pages, which `sibling` cannot read from.
        for idx in B..self.entries_len() {
            self.entry(idx, memory);
        }
        sibling.entries = self.entries.split_off(B);
        if self.node_type == NodeType::Internal {
            sibling.children = self.children.split_off(B);
        }
        // After the split-off, the median (index B - 1) is the last entry.
        self.pop_entry(memory)
            .expect("An initially full node cannot be empty")
    }

    /// Releases the node's overflow pages and its main page back to the
    /// allocator, consuming the node.
    pub fn deallocate<M: Memory>(self, allocator: &mut Allocator<M>) {
        for overflow in self.overflows.into_iter() {
            allocator.deallocate(overflow);
        }
        allocator.deallocate(self.address);
    }
}
/// The on-disk header of a node.
// `repr(C, packed)` so the in-memory struct matches the serialized byte
// layout exactly (no padding), allowing `read_struct`/`write_struct`.
#[repr(C, packed)]
struct NodeHeader {
    // Must equal `MAGIC` ("BTN"); validated on load.
    magic: [u8; 3],
    // One of `LAYOUT_VERSION_1` / `LAYOUT_VERSION_2`.
    version: u8,
    // One of `LEAF_NODE_TYPE` / `INTERNAL_NODE_TYPE`.
    node_type: u8,
    num_entries: u16,
}
impl NodeHeader {
    /// Returns the serialized size of the header in bytes.
    fn size() -> Bytes {
        // `repr(C, packed)` guarantees the in-memory size equals the
        // on-disk size.
        let byte_count = core::mem::size_of::<NodeHeader>() as u64;
        Bytes::from(byte_count)
    }
}
/// A value that is either held directly (`ByVal`) or described by its
/// location in stable memory (`ByRef`) and materialized at most once on
/// first access.
#[derive(Debug)]
enum LazyObject<T> {
    // The value is resident in memory.
    ByVal(T),
    // The value's serialized form lives at `offset` with byte length `size`;
    // `loaded` caches the deserialized value after the first access.
    ByRef {
        offset: Bytes,
        size: u32,
        loaded: OnceCell<T>,
    },
}
impl<T> LazyObject<T> {
#[inline(always)]
pub fn by_value(value: T) -> Self {
LazyObject::ByVal(value)
}
#[inline(always)]
pub fn by_ref(offset: Bytes, size: u32) -> Self {
LazyObject::ByRef {
offset,
size,
loaded: OnceCell::new(),
}
}
#[inline(always)]
pub fn get_or_load(&self, load: impl FnOnce(Bytes, u32) -> T) -> &T {
match self {
LazyObject::ByVal(value) => value,
LazyObject::ByRef {
offset,
size,
loaded,
} => loaded.get_or_init(|| load(*offset, *size)),
}
}
#[inline(always)]
pub fn take_or_load(self, load: impl FnOnce(Bytes, u32) -> T) -> T {
match self {
LazyObject::ByVal(value) => value,
LazyObject::ByRef {
offset,
size,
loaded,
} => loaded.into_inner().unwrap_or_else(|| load(offset, size)),
}
}
}
/// An owned byte buffer holding a serialized value.
type Blob = Vec<u8>;
/// A lazily-loaded value blob; a thin newtype over [`LazyObject`].
#[derive(Debug)]
struct LazyValue(LazyObject<Blob>);
impl LazyValue {
#[inline(always)]
pub fn by_value(value: Blob) -> Self {
Self(LazyObject::by_value(value))
}
#[inline(always)]
pub fn by_ref(offset: Bytes, size: u32) -> Self {
Self(LazyObject::by_ref(offset, size))
}
#[inline(always)]
pub fn get_or_load(&self, load: impl FnOnce(Bytes, u32) -> Blob) -> &Blob {
self.0.get_or_load(load)
}
#[inline(always)]
pub fn take_or_load(self, load: impl FnOnce(Bytes, u32) -> Blob) -> Blob {
self.0.take_or_load(load)
}
}
/// A lazily-loaded, lazily-deserialized key; a thin newtype over [`LazyObject`].
#[derive(Debug)]
struct LazyKey<K>(LazyObject<K>);
impl<K> LazyKey<K> {
#[inline(always)]
pub fn by_value(value: K) -> Self {
Self(LazyObject::by_value(value))
}
#[inline(always)]
pub fn by_ref(offset: Bytes, size: u32) -> Self {
Self(LazyObject::by_ref(offset, size))
}
#[inline(always)]
pub fn get_or_load(&self, load: impl FnOnce(Bytes, u32) -> K) -> &K {
self.0.get_or_load(load)
}
#[inline(always)]
pub fn take_or_load(self, load: impl FnOnce(Bytes, u32) -> K) -> K {
self.0.take_or_load(load)
}
}
/// The on-disk layout version of a node.
#[derive(Debug, PartialEq, Copy, Clone, Eq)]
pub enum Version {
    /// V1 layout: page size is derived from fixed maximum key/value sizes.
    V1(DerivedPageSize),
    /// V2 layout: carries its `PageSize` explicitly.
    V2(PageSize),
}
impl Version {
pub fn page_size(&self) -> PageSize {
match self {
Self::V1(page_size) => PageSize::Derived(*page_size),
Self::V2(page_size) => *page_size,
}
}
}
/// The size of a node's page in stable memory.
#[allow(dead_code)]
#[derive(Debug, PartialEq, Copy, Clone, Eq)]
pub enum PageSize {
    /// Derived from maximum key/value sizes (V1-style).
    Derived(DerivedPageSize),
    /// An explicit size in bytes.
    Value(u32),
}
impl PageSize {
pub fn get(&self) -> u32 {
match self {
Self::Value(page_size) => *page_size,
Self::Derived(page_size) => page_size.get(),
}
}
}
/// Bounds from which a V1 node's page size is derived.
#[derive(Debug, PartialEq, Copy, Clone, Eq)]
pub struct DerivedPageSize {
    pub max_key_size: u32,
    pub max_value_size: u32,
}
impl DerivedPageSize {
    /// Computes the V1 page size in bytes implied by the maximum key and
    /// value sizes.
    fn get(&self) -> u32 {
        let size = v1::size_v1(self.max_key_size, self.max_value_size);
        size.get() as u32
    }
}