use crate::{
btreemap::Allocator,
read_struct, read_u32, read_u64,
storable::Storable,
types::{Address, Bytes},
write, write_struct, write_u32, Memory,
};
use std::borrow::{Borrow, Cow};
use std::cell::{Ref, RefCell};
mod io;
#[cfg(test)]
mod tests;
mod v1;
mod v2;
use io::NodeReader;
/// Branching parameter of the tree: `CAPACITY` is derived from it, `split` moves the entries
/// from index `B` onwards into the sibling, and `at_minimum` compares the entry count to it.
const B: usize = 6;
/// Number of entries at which `is_full` reports a node as full.
const CAPACITY: usize = 2 * B - 1;
/// Header `version` byte that makes `Node::load` take the V1 loading path.
const LAYOUT_VERSION_1: u8 = 1;
/// Header `version` byte that makes `Node::load` take the V2 loading path.
const LAYOUT_VERSION_2: u8 = 2;
/// Expected content of the header's `magic` field; `Node::load` asserts on it.
const MAGIC: &[u8; 3] = b"BTN";
// NOTE(review): the two constants below are presumably the encodings of `NodeType` in the
// header's `node_type` byte; their users are not in this section (likely v1/v2) — confirm.
const LEAF_NODE_TYPE: u8 = 0;
const INTERNAL_NODE_TYPE: u8 = 1;
/// Width of the `u32` length prefix that `Node::value` skips before reading a value's bytes.
const U32_SIZE: Bytes = Bytes::new(4);
/// The kind of a node. `get_min`/`get_max` answer directly for a `Leaf` and descend into a
/// child for an `Internal` node; `split` only moves children for `Internal` nodes.
#[derive(Debug, PartialEq, Copy, Clone, Eq)]
#[cfg_attr(test, derive(test_strategy::Arbitrary))]
pub enum NodeType {
// A node whose entries are read directly (no child is consulted by `get_min`/`get_max`).
Leaf,
// A node that has children.
Internal,
}
/// A key paired with the raw bytes of its value.
pub type Entry<K> = (K, Vec<u8>);
/// In-memory view of a single B-tree node.
///
/// Values may still live in memory (`Value::ByRef`) and are only read, then cached, when
/// `Node::value` is called for them.
#[derive(Debug)]
pub struct Node<K: Storable + Ord + Clone> {
// Address of the node; base address for `NodeReader` and released by `deallocate`.
address: Address,
// Keys of the entries. `search` runs a binary search over them, so they are expected to be
// kept sorted.
keys: Vec<K>,
// Values of the entries, index-aligned with `keys`. Wrapped in a `RefCell` so that `value`
// can cache a loaded value through `&self`.
encoded_values: RefCell<Vec<Value>>,
// Addresses of the child nodes.
children: Vec<Address>,
// Whether this node is a leaf or an internal node.
node_type: NodeType,
// Layout version of the node; selects the `save` path and provides the page size.
version: Version,
// Additional addresses handed to `NodeReader` and released together with the node in
// `deallocate`.
overflows: Vec<Address>,
}
impl<K: Storable + Ord + Clone> Node<K> {
/// Loads the node stored at `address` from `memory`.
///
/// The header is read first; its `version` byte decides whether the V1 or the V2 loader is
/// used. A V1 node needs the maximum key/value sizes, so it can only be loaded with a
/// `PageSize::Derived`.
///
/// # Panics
///
/// Panics if the header's magic bytes are wrong, if a V1 node is loaded with a
/// `PageSize::Value`, or if the version byte is not a known layout version.
pub fn load<M: Memory>(address: Address, page_size: PageSize, memory: &M) -> Self {
    let header: NodeHeader = read_struct(address, memory);
    assert_eq!(&header.magic, MAGIC, "Bad magic.");

    // Dispatch on the layout version and the kind of page size in one go.
    match (header.version, page_size) {
        (
            LAYOUT_VERSION_1,
            PageSize::Derived(DerivedPageSize {
                max_key_size,
                max_value_size,
            }),
        ) => Self::load_v1(header, address, max_key_size, max_value_size, memory),
        (LAYOUT_VERSION_1, PageSize::Value(_)) => {
            unreachable!("Tried to load a V1 node without a derived PageSize.")
        }
        (LAYOUT_VERSION_2, _) => Self::load_v2(address, page_size, header, memory),
        (unknown_version, _) => unreachable!("Unsupported version {unknown_version}."),
    }
}
/// Writes the node back using the layout matching its version.
///
/// A V1 node only needs the allocator's memory; a V2 node is given the allocator itself.
pub fn save<M: Memory>(&mut self, allocator: &mut Allocator<M>) {
    let layout = self.version;
    match layout {
        Version::V2(_) => self.save_v2(allocator),
        Version::V1(_) => self.save_v1(allocator.memory()),
    }
}
pub fn address(&self) -> Address {
self.address
}
pub fn node_type(&self) -> NodeType {
self.node_type
}
/// Returns the last entry of the subtree rooted at this node.
///
/// For a leaf this is the node's own last entry. For an internal node the last child is
/// loaded from `memory` and queried recursively. Since keys are kept in ascending order
/// (`search` binary-searches them), this is the entry with the largest key.
///
/// # Panics
///
/// Panics if a leaf has no entries or an internal node has no children.
pub fn get_max<M: Memory>(&self, memory: &M) -> Entry<K> {
    match self.node_type {
        NodeType::Leaf => {
            // Use a checked subtraction so an empty node fails with the invariant message
            // rather than with an arithmetic overflow (or a wrapped index in release builds).
            let last_idx = self
                .entries_len()
                .checked_sub(1)
                .expect("A node can never be empty");
            self.entry(last_idx, memory)
        }
        NodeType::Internal => {
            let last_child = Self::load(
                *self
                    .children
                    .last()
                    .expect("An internal node must have children."),
                self.version.page_size(),
                memory,
            );
            last_child.get_max(memory)
        }
    }
}
/// Returns the first entry of the subtree rooted at this node.
///
/// A leaf answers with its own first entry; an internal node loads its first child from
/// `memory` and asks it recursively.
///
/// # Panics
///
/// Panics if a leaf has no entries or an internal node has no children.
pub fn get_min<M: Memory>(&self, memory: &M) -> Entry<K> {
    if self.node_type == NodeType::Leaf {
        return self.entry(0, memory);
    }

    // Internal node: the smallest entry lives in the subtree of the first child.
    let leftmost_child_address = self.children[0];
    let leftmost_child = Self::load(leftmost_child_address, self.version.page_size(), memory);
    leftmost_child.get_min(memory)
}
/// Returns `true` once the node holds at least `CAPACITY` entries.
pub fn is_full(&self) -> bool {
    let entry_count = self.keys.len();
    entry_count >= CAPACITY
}
/// Replaces the entry at `idx` with the given one and returns the entry that was there.
///
/// The previous value is read from `memory` first if it had not been loaded yet.
///
/// # Panics
///
/// Panics if `idx` is out of bounds.
pub fn swap_entry<M: Memory>(
    &mut self,
    idx: usize,
    (key, value): Entry<K>,
    memory: &M,
) -> Entry<K> {
    // Key first, then the old value, then the overwrite — the old value has to be copied
    // out before its slot is replaced.
    let previous_key = core::mem::replace(&mut self.keys[idx], key);
    let previous_value = self.value(idx, memory).to_vec();
    self.encoded_values.borrow_mut()[idx] = Value::ByVal(value);
    (previous_key, previous_value)
}
/// Returns a copy of the entry at `idx`: the cloned key and the value bytes, which are read
/// from `memory` if they were not loaded yet.
///
/// # Panics
///
/// Panics if `idx` is out of bounds.
pub fn entry<M: Memory>(&self, idx: usize, memory: &M) -> Entry<K> {
    let key = self.keys[idx].clone();
    let value = self.value(idx, memory).to_vec();
    (key, value)
}
/// Returns the value bytes at position `idx`, reading them from `memory` on first access.
///
/// A value that is still a `Value::ByRef` is read through a `NodeReader` (a `u32` length
/// prefix followed by that many bytes) and then cached as `Value::ByVal`, so later calls
/// for the same index do not touch `memory` again.
///
/// # Panics
///
/// Panics if `idx` is out of bounds, or if a value has to be loaded while a `Ref` returned
/// by an earlier call is still alive (the cache cannot be updated while it is borrowed).
pub fn value<M: Memory>(&self, idx: usize, memory: &M) -> Ref<[u8]> {
    // Peek with a shared borrow first. Taking a mutable borrow unconditionally would panic
    // whenever a previously returned `Ref` is still alive, even for an already cached value.
    let unloaded_offset = match self.encoded_values.borrow()[idx] {
        Value::ByRef(offset) => Some(offset),
        Value::ByVal(_) => None,
    };

    if let Some(offset) = unloaded_offset {
        let reader = NodeReader {
            address: self.address,
            overflows: &self.overflows,
            page_size: self.page_size(),
            memory,
        };

        // The value is stored as a u32 length followed by the bytes themselves.
        let value_len = read_u32(&reader, Address::from(offset.get())) as usize;
        let mut value = vec![0; value_len];
        reader.read((offset + U32_SIZE).get(), &mut value);

        // Cache the loaded bytes; the mutable borrow lasts only for this statement.
        self.encoded_values.borrow_mut()[idx] = Value::ByVal(value);
    }

    Ref::map(self.encoded_values.borrow(), |values| {
        if let Value::ByVal(v) = &values[idx] {
            &v[..]
        } else {
            unreachable!("value must have been loaded already.");
        }
    })
}
fn page_size(&self) -> PageSize {
self.version.page_size()
}
pub fn key(&self, idx: usize) -> &K {
&self.keys[idx]
}
pub fn child(&self, idx: usize) -> Address {
self.children[idx]
}
/// Inserts a child address at position `idx`, shifting later children to the right.
///
/// # Panics
///
/// Panics if `idx` is greater than the current number of children.
pub fn insert_child(&mut self, idx: usize, address: Address) {
    let children = &mut self.children;
    children.insert(idx, address);
}
/// Appends a child address after the existing children.
pub fn push_child(&mut self, address: Address) {
    let children = &mut self.children;
    children.push(address);
}
/// Removes the child at `idx` and returns its address.
///
/// # Panics
///
/// Panics if `idx` is out of bounds.
pub fn remove_child(&mut self, idx: usize) -> Address {
    let removed = self.children.remove(idx);
    removed
}
pub fn children_len(&self) -> usize {
self.children.len()
}
/// Removes and returns the last child address, or `None` if there are no children.
pub fn pop_child(&mut self) -> Option<Address> {
    let children = &mut self.children;
    children.pop()
}
/// Inserts an entry at position `idx`, shifting later entries to the right.
///
/// The value is stored as already-loaded bytes (`Value::ByVal`).
///
/// # Panics
///
/// Panics if `idx` is greater than the current number of entries.
pub fn insert_entry(&mut self, idx: usize, (key, encoded_value): Entry<K>) {
    self.keys.insert(idx, key);

    let loaded = Value::ByVal(encoded_value);
    let mut values = self.encoded_values.borrow_mut();
    values.insert(idx, loaded);
}
/// Removes the entry at `idx` and returns it.
///
/// The value is read from `memory` first if it had not been loaded yet.
///
/// # Panics
///
/// Panics if `idx` is out of bounds.
pub fn remove_entry<M: Memory>(&mut self, idx: usize, memory: &M) -> Entry<K> {
    // Copy the value out before its slot disappears.
    let removed_value = self.value(idx, memory).to_vec();
    let _ = self.encoded_values.borrow_mut().remove(idx);
    let removed_key = self.keys.remove(idx);
    (removed_key, removed_value)
}
/// Appends an entry after the existing ones; the value is stored as already-loaded bytes.
pub fn push_entry(&mut self, (key, encoded_value): Entry<K>) {
    self.keys.push(key);

    let loaded = Value::ByVal(encoded_value);
    let mut values = self.encoded_values.borrow_mut();
    values.push(loaded);
}
/// Removes and returns the last entry, or `None` if the node has no entries.
///
/// The value is read from `memory` first if it had not been loaded yet.
pub fn pop_entry<M: Memory>(&mut self, memory: &M) -> Option<Entry<K>> {
    // `checked_sub` yields `None` exactly when there is nothing to pop.
    let last_idx = self.entries_len().checked_sub(1)?;

    let key = self.keys.pop().expect("node must not be empty");
    // The value slot is still in place, so it can be loaded by index before it is popped.
    let last_value = self.value(last_idx, memory).to_vec();
    let _popped_slot = self
        .encoded_values
        .borrow_mut()
        .pop()
        .expect("node must not be empty");

    Some((key, last_value))
}
/// Merges `source` and `median` into this node, saves this node and frees `source`.
///
/// The node with the smaller first key becomes the left part, followed by `median`, then
/// the other node. The result always ends up in `self`.
///
/// # Panics
///
/// Panics if either node is empty, if the node types differ, or if `median` does not sort
/// strictly between the two nodes (see `append`).
pub fn merge<M: Memory>(
    &mut self,
    mut source: Node<K>,
    median: Entry<K>,
    allocator: &mut Allocator<M>,
) {
    // Values of `source` are about to change owner, so read all of them out of memory
    // while they can still be resolved relative to `source`.
    for value_idx in 0..source.entries_len() {
        source.value(value_idx, allocator.memory());
    }

    let source_follows_self = source.key(0) > self.key(0);
    if !source_follows_self {
        // `self` holds the larger keys: append it onto `source`, then move everything back.
        Self::append(&mut source, self, median);
        self.keys = core::mem::take(&mut source.keys);
        self.encoded_values = core::mem::take(&mut source.encoded_values);
        self.children = core::mem::take(&mut source.children);
    } else {
        // `source` holds the larger keys: append it onto `self` directly.
        Self::append(self, &mut source, median);
    }

    self.save(allocator);
    source.deallocate(allocator);
}
/// Moves `median` and then every entry and child of `b` onto the end of `a`, leaving `b`
/// empty.
///
/// # Panics
///
/// Panics if the node types differ, if either node has no entries, or unless
/// `last key of a < median key < first key of b`.
fn append(a: &mut Node<K>, b: &mut Node<K>, median: Entry<K>) {
    let a_len = a.entries_len();

    // Preconditions, checked in this order.
    assert_eq!(a.node_type(), b.node_type());
    assert!(b.entries_len() > 0);
    assert!(a_len > 0);
    assert!(a.key(a_len - 1) < &median.0);
    assert!(&median.0 < b.key(0));

    a.push_entry(median);

    // Drain everything out of `b` onto the end of `a`.
    a.keys.extend(b.keys.drain(..));
    {
        let mut target_values = a.encoded_values.borrow_mut();
        let mut donor_values = b.encoded_values.borrow_mut();
        target_values.extend(donor_values.drain(..));
    }
    a.children.extend(b.children.drain(..));

    // `b` must have been emptied completely.
    assert_eq!(b.keys.len(), 0);
    assert_eq!(b.encoded_values.borrow().len(), 0);
    assert_eq!(b.children.len(), 0);
}
/// Test helper: returns a copy of every entry of the node, in order, loading values from
/// `memory` as needed.
#[cfg(test)]
pub fn entries<M: Memory>(&self, memory: &M) -> Vec<Entry<K>> {
    let entry_count = self.keys.len();
    (0..entry_count)
        .map(|idx| self.entry(idx, memory))
        .collect()
}
/// Test helper: returns the node's keys as a slice.
#[cfg(test)]
pub fn keys(&self) -> &[K] {
    self.keys.as_slice()
}
/// Test helper: returns the node's overflow addresses as a slice.
#[cfg(test)]
pub fn overflows(&self) -> &[Address] {
    self.overflows.as_slice()
}
pub fn entries_len(&self) -> usize {
self.keys.len()
}
/// Binary-searches the node's keys for `key`.
///
/// Returns `Ok(index)` if the key is present, otherwise `Err(index)` with the position
/// where it could be inserted while keeping the keys sorted.
pub fn search(&self, key: &K) -> Result<usize, usize> {
    self.keys.as_slice().binary_search(key)
}
/// Returns the size computed by `v1::size_v1` for the given maximum key and value sizes.
// NOTE(review): presumably the maximum size of a V1 node — confirm against `v1::size_v1`.
pub fn max_size(max_key_size: u32, max_value_size: u32) -> Bytes {
    let size = v1::size_v1(max_key_size, max_value_size);
    size
}
/// Returns `true` if the node holds fewer than `B` entries.
pub fn at_minimum(&self) -> bool {
    let entry_count = self.keys.len();
    entry_count < B
}
/// Returns `true` if the node is not at its minimum, i.e. it holds at least `B` entries.
pub fn can_remove_entry_without_merging(&self) -> bool {
    let at_minimum = self.at_minimum();
    !at_minimum
}
/// Splits a full node: entries from index `B` onwards (and, for an internal node, children
/// from index `B` onwards) move into `sibling`, and the last entry left in this node is
/// removed and returned as the median.
///
/// # Panics
///
/// Panics if the node has fewer than `B` entries (in debug builds, if it is not full).
pub fn split<M: Memory>(&mut self, sibling: &mut Node<K>, memory: &M) -> Entry<K> {
    debug_assert!(self.is_full());

    // The values that change owner must be read from memory while they still belong to
    // this node.
    let entry_count = self.entries_len();
    for moving_idx in B..entry_count {
        self.value(moving_idx, memory);
    }

    // Hand the upper part over to the sibling.
    sibling.keys = self.keys.split_off(B);
    let upper_values = self.encoded_values.borrow_mut().split_off(B);
    *sibling.encoded_values.borrow_mut() = upper_values;
    if matches!(self.node_type, NodeType::Internal) {
        sibling.children = self.children.split_off(B);
    }

    // What is now the last entry of this node is the median.
    let median = self.pop_entry(memory);
    median.expect("An initially full node cannot be empty")
}
pub fn deallocate<M: Memory>(self, allocator: &mut Allocator<M>) {
for overflow in self.overflows.into_iter() {
allocator.deallocate(overflow);
}
allocator.deallocate(self.address);
}
}
/// Header read from the start of a node by `Node::load`.
///
/// The struct is `repr(C, packed)`, so it has no padding and occupies 7 bytes.
#[repr(C, packed)]
struct NodeHeader {
    // Must equal `MAGIC`; asserted in `Node::load`.
    magic: [u8; 3],
    // Layout version; `Node::load` dispatches on it.
    version: u8,
    // NOTE(review): presumably `LEAF_NODE_TYPE` or `INTERNAL_NODE_TYPE` — confirm in v1/v2.
    node_type: u8,
    // NOTE(review): presumably the number of entries stored in the node — confirm in v1/v2.
    num_entries: u16,
}

impl NodeHeader {
    /// Returns the in-memory size of the header as `Bytes`.
    fn size() -> Bytes {
        let header_len = core::mem::size_of::<NodeHeader>() as u64;
        Bytes::from(header_len)
    }
}
/// A value of a node entry, either already loaded or still to be read from memory.
#[derive(Debug)]
enum Value {
// The value's bytes, held in memory.
ByVal(Vec<u8>),
// Offset at which `Node::value` reads the value (a `u32` length followed by the bytes)
// through a `NodeReader`.
ByRef(Bytes),
}
/// Layout version of a node, together with the page size information it carries.
#[derive(Debug, PartialEq, Copy, Clone, Eq)]
pub enum Version {
    /// V1 layout; its page size is always derived from the maximum key and value sizes.
    V1(DerivedPageSize),
    /// V2 layout; carries any kind of `PageSize`.
    V2(PageSize),
}

impl Version {
    /// Returns the page size of this version; a V1 size is wrapped in `PageSize::Derived`.
    pub fn page_size(&self) -> PageSize {
        match *self {
            Self::V2(page_size) => page_size,
            Self::V1(derived) => PageSize::Derived(derived),
        }
    }
}
/// Page size of a node, given either explicitly or through maximum key and value sizes.
#[allow(dead_code)]
#[derive(Debug, PartialEq, Copy, Clone, Eq)]
pub enum PageSize {
    /// Page size computed from the maximum key and value sizes.
    Derived(DerivedPageSize),
    /// Page size given directly.
    Value(u32),
}

impl PageSize {
    /// Returns the page size as a number: the stored value, or the derived size.
    pub fn get(&self) -> u32 {
        match *self {
            Self::Derived(derived) => derived.get(),
            Self::Value(explicit) => explicit,
        }
    }
}
/// Maximum key and value sizes from which a page size is computed via `v1::size_v1`.
#[derive(Debug, PartialEq, Copy, Clone, Eq)]
pub struct DerivedPageSize {
    /// Maximum key size passed to `v1::size_v1`.
    pub max_key_size: u32,
    /// Maximum value size passed to `v1::size_v1`.
    pub max_value_size: u32,
}

impl DerivedPageSize {
    /// Returns the size computed by `v1::size_v1` for these maximum sizes.
    fn get(&self) -> u32 {
        let Self {
            max_key_size,
            max_value_size,
        } = *self;
        let size = v1::size_v1(max_key_size, max_value_size);
        // NOTE(review): `as u32` silently truncates if the computed size exceeds `u32::MAX`.
        size.get() as u32
    }
}