use super::*;
use crate::btreemap::Allocator;
use crate::{btreemap::node::io::NodeWriter, types::NULL};
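
// Memory layout of a v2 node, as written by `save_v2` below:
//
//   Offset  0: NodeHeader (magic, layout version, node type, # entries) -- 7 bytes
//   Offset  7: Address of the node's first overflow page (NULL if none) -- 8 bytes
//   Offset 15: Children addresses (internal nodes only), followed by all keys
//              (each prefixed with its size unless the key type is fixed-size),
//              followed by all values (each prefixed with its size).
//
// A node that does not fit in a single page spills into a chain of overflow
// pages. Each overflow page starts with the magic "NOF" and the address of the
// next page in the chain, followed by the overflowed data.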
// Offsets within the node's initial page.
pub(super) const OVERFLOW_ADDRESS_OFFSET: Bytes = Bytes::new(7);
const ENTRIES_OFFSET: Bytes = Bytes::new(15);

// Overflow page layout: the magic "NOF", the address of the next overflow
// page, then the overflowed data.
pub(super) const OVERFLOW_MAGIC: &[u8; 3] = b"NOF";
pub(super) const PAGE_OVERFLOW_NEXT_OFFSET: Bytes = Bytes::new(3);
pub(super) const PAGE_OVERFLOW_DATA_OFFSET: Bytes = Bytes::new(11);

// The smallest page size a v2 node may be configured with.
const MINIMUM_PAGE_SIZE: u32 = 128;

impl<K: Storable + Ord + Clone> Node<K> {
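    /// Creates a new empty v2 node of the given type at `address`, using pages
    /// of `page_size` bytes.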
pub fn new_v2(address: Address, node_type: NodeType, page_size: PageSize) -> Node<K> {
assert!(page_size.get() >= MINIMUM_PAGE_SIZE);
Node {
address,
node_type,
version: Version::V2(page_size),
entries: vec![],
children: vec![],
overflows: Vec::with_capacity(0),
}
}
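
    /// Loads a v2 node from `memory` at `address`, given its already-parsed
    /// header.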
pub(super) fn load_v2<M: Memory>(
address: Address,
page_size: PageSize,
header: NodeHeader,
memory: &M,
) -> Self {
#[cfg(feature = "bench_scope")]
let _p = canbench_rs::bench_scope("node_load_v2");
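        // Collect the node's overflow pages up front so the reader below can
        // follow data that spills across them.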
let overflows = read_overflows(address, memory);
let reader = NodeReader {
address,
overflows: &overflows,
page_size,
memory,
};
let mut offset = Address::from(0);
let node_type = match header.node_type {
LEAF_NODE_TYPE => NodeType::Leaf,
INTERNAL_NODE_TYPE => NodeType::Internal,
other => unreachable!("Unknown node type {}", other),
};
let num_entries = header.num_entries as usize;
offset += ENTRIES_OFFSET;
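        // An internal node stores its `num_entries + 1` child addresses before
        // the keys and values. Small counts are read address by address;
        // larger counts are fetched in a single batch read.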
let children = if node_type == NodeType::Internal {
const CHILDREN_BATCH_READ_THRESHOLD: usize = 4;
let count = num_entries + 1;
if count <= CHILDREN_BATCH_READ_THRESHOLD {
let mut children = Vec::with_capacity(count);
for _ in 0..count {
children.push(Address::from(read_u64(&reader, offset)));
offset += Address::size();
}
children
} else {
let children = read_address_vec(&reader, offset, count);
offset += Bytes::from(count) * Address::size();
children
}
} else {
vec![]
};
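        // Keys of at most this many bytes are decoded eagerly while loading;
        // larger keys are only referenced here and decoded on first access.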
const EAGER_LOAD_KEY_SIZE_THRESHOLD: u32 = 16;
let mut entries = Vec::with_capacity(num_entries);
let mut buf = vec![];
for _ in 0..num_entries {
let key_offset = Bytes::from(offset.get());
let key_size = if K::BOUND.is_fixed_size() {
K::BOUND.max_size()
} else {
let size = read_u32(&reader, offset);
offset += U32_SIZE;
size
};
let key = if key_size <= EAGER_LOAD_KEY_SIZE_THRESHOLD {
read_to_vec(
&reader,
Address::from(offset.get()),
&mut buf,
key_size as usize,
);
LazyKey::by_value(K::from_bytes(Cow::Borrowed(&buf)))
} else {
LazyKey::by_ref(key_offset, key_size)
};
offset += Bytes::from(key_size);
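            // Value placeholder; patched in the second pass below.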
entries.push((key, LazyValue::by_ref(Bytes::from(0_u64), 0)));
}
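        // Second pass: record each value's offset and size so the value can be
        // read lazily on demand.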
for (_key, value) in entries.iter_mut() {
let value_size = read_u32(&reader, offset);
*value = LazyValue::by_ref(Bytes::from(offset.get()), value_size);
offset += U32_SIZE + Bytes::from(value_size as u64);
}
Self {
address,
entries,
children,
node_type,
version: Version::V2(page_size),
overflows,
}
}
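
    /// Saves the node to memory in the v2 layout, allocating or freeing
    /// overflow pages through `allocator` as needed.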
pub(super) fn save_v2<M: Memory>(&mut self, allocator: &mut Allocator<M>) {
#[cfg(feature = "bench_scope")]
let _p = canbench_rs::bench_scope("node_save_v2");
let page_size = self.version.page_size().get();
assert!(page_size >= MINIMUM_PAGE_SIZE);
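        // Load every entry into memory first: the write below reuses (and may
        // free) the pages that lazily referenced keys and values still point
        // into.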
for i in 0..self.entries.len() {
self.entry(i, allocator.memory());
}
let mut writer = NodeWriter::new(
self.address,
self.overflows.clone(),
self.page_size(),
allocator,
);
let mut offset = Address::from(0);
let header = NodeHeader {
magic: *MAGIC,
version: LAYOUT_VERSION_2,
node_type: match self.node_type {
NodeType::Leaf => LEAF_NODE_TYPE,
NodeType::Internal => INTERNAL_NODE_TYPE,
},
num_entries: self.entries.len() as u16,
};
writer.write_struct(&header, offset);
offset += NodeHeader::size();
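        // Write the address of the node's first overflow page (NULL if none).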
writer.write_u64(offset, self.overflows.first().unwrap_or(&NULL).get());
offset += Bytes::from(8u64);
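        // Serialize the children addresses into one buffer and write them in a
        // single call.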
let count = self.children.len();
let byte_len = count * Address::size().get() as usize;
let mut bytes = Vec::with_capacity(byte_len);
for child in &self.children {
bytes.extend_from_slice(&child.get().to_le_bytes());
}
writer.write(offset, &bytes);
offset += Bytes::from(byte_len);
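        // Write the keys. Variable-size key types are prefixed with the key's
        // size; fixed-size key types are written without a prefix.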
for i in 0..self.entries.len() {
let key = self.key(i, writer.memory());
let key_bytes = key.to_bytes_checked();
if !K::BOUND.is_fixed_size() {
writer.write_u32(offset, key_bytes.len() as u32);
offset += U32_SIZE;
}
writer.write(offset, key_bytes.borrow());
offset += Bytes::from(key_bytes.len());
}
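        // Write the values, each prefixed with its size.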
for i in 0..self.entries.len() {
let value = self.value(i, writer.memory());
writer.write_u32(offset, value.len() as u32);
offset += U32_SIZE;
writer.write(offset, value);
offset += Bytes::from(value.len());
}
self.overflows = writer.finish();
}
}
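
/// Reads the addresses of all overflow pages of the node at `address`,
/// following the chain of `next` pointers until NULL.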
fn read_overflows<M: Memory>(address: Address, memory: &M) -> Vec<Address> {
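    // The on-disk header of an overflow page: the magic "NOF" followed by the
    // address of the next overflow page in the chain.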
#[repr(C, packed)]
struct OverflowPageHeader {
magic: [u8; 3],
next: Address,
}
let mut overflows = vec![];
let mut overflow = Address::from(read_u64(memory, address + OVERFLOW_ADDRESS_OFFSET));
while overflow != NULL {
overflows.push(overflow);
let header: OverflowPageHeader = read_struct(overflow, memory);
assert_eq!(&header.magic, OVERFLOW_MAGIC, "Bad overflow magic.");
overflow = header.next;
}
overflows
}