#![forbid(unsafe_code)]
use crate::error::{Error, Result};
use crate::pager::page::{Page, PAGE_SIZE, PAGE_TRAILER_SIZE};
/// Page-type tag written to the first header byte of an internal B-tree node.
pub const PAGE_TYPE_BTREE_INTERNAL: u8 = 0x02;
/// Page-type tag written to the first header byte of a leaf B-tree node.
pub const PAGE_TYPE_BTREE_LEAF: u8 = 0x03;
/// Size in bytes of the fixed node header; the payload area starts right after it.
pub const NODE_HEADER_SIZE: usize = 24;
// Byte offsets of the individual header fields within the page buffer.
/// Offset of the one-byte page-type tag.
const OFF_PAGE_TYPE: usize = 0;
/// Offset of the one-byte node level (validation requires 0 for leaves, non-zero for internals).
const OFF_LEVEL: usize = 1;
/// Offset of the little-endian `u16` key count.
const OFF_KEY_COUNT: usize = 2;
/// Offset of the little-endian `u32` free-space offset. The encoder writes it; the decoders
/// in this file never read it back.
const OFF_FREE_SPACE_OFF: usize = 4;
/// Offset of the little-endian `u64` next-sibling value.
const OFF_NEXT_SIBLING: usize = 8;
/// Offset of the remaining header bytes, which the encoder leaves zeroed.
const OFF_RESERVED: usize = 16;
/// Bytes per leaf slot-directory entry: one little-endian `u32` offset to the entry.
pub const LEAF_SLOT_BYTES: usize = 4;
/// Bytes per internal slot-directory entry: a `u32` key offset followed by a `u64` right child.
pub const INTERNAL_SLOT_BYTES: usize = 12;
/// Bytes taken by the leftmost child id stored at the very start of an internal node's payload.
pub const INTERNAL_LEFTMOST_CHILD_BYTES: usize = 8;
/// First payload byte; the slot directory grows upward from here.
const PAYLOAD_OFFSET: usize = NODE_HEADER_SIZE;
/// One past the last payload byte; length-prefixed entries are packed downward from here.
/// The bytes from here to the end of the page are presumably the page trailer — confirm
/// against `crate::pager::page`.
const PAYLOAD_END: usize = PAGE_SIZE - PAGE_TRAILER_SIZE;
/// Upper bound on the leaf key count: the number of slots that would fit if the payload
/// held nothing but the slot directory.
pub const LEAF_SLOT_CAP: usize = (PAYLOAD_END - PAYLOAD_OFFSET) / LEAF_SLOT_BYTES;
/// Upper bound on the internal pivot count: the number of slots that would fit after the
/// leftmost child if the payload held nothing else.
pub const INTERNAL_SLOT_CAP: usize =
(PAYLOAD_END - PAYLOAD_OFFSET - INTERNAL_LEFTMOST_CHILD_BYTES) / INTERNAL_SLOT_BYTES;
/// Returns the largest key length, in bytes, that the decoders in this file accept.
///
/// The limit is one quarter of the page size; a decoded key longer than this is
/// reported as corruption.
#[must_use]
pub const fn max_key_len() -> usize {
PAGE_SIZE / 4
}
/// Returns the largest value length, in bytes, that can be stored inline next to a key
/// of `key_len` bytes in a leaf holding a single entry.
///
/// The budget is the payload area minus one slot-directory entry, minus the key bytes,
/// minus a fixed 18-byte reservation for the two varint length prefixes. Returns `0`
/// when the key alone (plus the reservation) already fills or exceeds the budget.
///
/// All arithmetic saturates, so an absurdly large `key_len` (up to `usize::MAX`) yields
/// `0` instead of overflowing.
#[must_use]
pub const fn max_inline_value(key_len: usize) -> usize {
    // Fixed reservation for the key-length and value-length varint prefixes.
    const VARINT_RESERVE: usize = 9 + 9;
    let base = PAYLOAD_END - PAYLOAD_OFFSET - LEAF_SLOT_BYTES;
    // `saturating_add` keeps a huge `key_len` from wrapping around to a small number,
    // which would otherwise make the subtraction below underflow.
    base.saturating_sub(key_len.saturating_add(VARINT_RESERVE))
}
/// The two kinds of B-tree node this file can encode and decode.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NodeKind {
/// A node holding pivot keys and child ids (page type `PAGE_TYPE_BTREE_INTERNAL`).
Internal,
/// A node holding key/value entries (page type `PAGE_TYPE_BTREE_LEAF`).
Leaf,
}
/// One key/value pair stored in a leaf node.
#[derive(Debug, Clone)]
pub struct LeafEntry {
/// Raw key bytes; leaf entries must be strictly ascending by key.
pub key: Vec<u8>,
/// Raw value bytes stored inline after the key.
pub value: Vec<u8>,
}
/// One pivot key stored in an internal node; the matching child id lives in
/// `DecodedNode::children`.
#[derive(Debug, Clone)]
pub struct InternalEntry {
/// Raw pivot key bytes; pivots must be strictly ascending.
pub key: Vec<u8>,
}
/// Identifier of a child node as stored in an internal node; validation rejects the value 0.
pub type ChildEntry = u64;
/// In-memory form of a B-tree node, for either node kind.
///
/// Only the vectors matching `kind` are consulted by the encoder and validators:
/// `leaves` for leaf nodes, `internals` plus `children` for internal nodes.
#[derive(Debug, Clone)]
pub struct DecodedNode {
/// Whether this is a leaf or an internal node.
pub kind: NodeKind,
/// Node level; validation requires 0 for leaves and non-zero for internal nodes.
pub level: u8,
/// Next-sibling value from the header; validation requires 0 for internal nodes.
pub next_sibling: u64,
/// Child ids of an internal node: the leftmost child followed by one right child per pivot.
pub children: Vec<ChildEntry>,
/// Key/value entries of a leaf node, in on-page slot order.
pub leaves: Vec<LeafEntry>,
/// Pivot keys of an internal node, in on-page slot order.
pub internals: Vec<InternalEntry>,
}
impl DecodedNode {
    /// Returns the number of payload bytes this node occupies once encoded.
    ///
    /// For a leaf that is one slot per entry plus each entry's varint-prefixed key and
    /// value. For an internal node it is the leftmost child, one slot per pivot, and
    /// each pivot's varint-prefixed key.
    #[must_use]
    pub fn occupied_bytes(&self) -> usize {
        // Heap footprint of a byte string of `len` bytes together with its varint prefix.
        let prefixed = |len: usize| varint_len(u64_from_usize(len)) + len;

        match self.kind {
            NodeKind::Internal => {
                let mut total =
                    INTERNAL_LEFTMOST_CHILD_BYTES + self.internals.len() * INTERNAL_SLOT_BYTES;
                for pivot in &self.internals {
                    total += prefixed(pivot.key.len());
                }
                total
            }
            NodeKind::Leaf => {
                let mut total = self.leaves.len() * LEAF_SLOT_BYTES;
                for entry in &self.leaves {
                    total += prefixed(entry.key.len()) + prefixed(entry.value.len());
                }
                total
            }
        }
    }

    /// Returns how many payload bytes remain unused, clamped to zero when the node
    /// would not fit in a page at all.
    #[must_use]
    pub fn free_bytes(&self) -> usize {
        (PAYLOAD_END - PAYLOAD_OFFSET).saturating_sub(self.occupied_bytes())
    }
}
pub fn encode_node(node: &DecodedNode, page: &mut Page) -> Result<()> {
validate_node_release(node)?;
let buf = page.as_bytes_mut();
buf.fill(0);
write_node_header(buf, node);
match node.kind {
NodeKind::Leaf => encode_leaf_body(buf, node)?,
NodeKind::Internal => encode_internal_body(buf, node)?,
}
crate::pager::checksum::write_page_trailer(page);
Ok(())
}
/// Writes the fixed node header for `node` into the start of `buf`.
///
/// Fills in the page type, level, key count, free-space offset (payload end minus the
/// total heap bytes) and next-sibling value. The reserved bytes are left untouched.
fn write_node_header(buf: &mut [u8; PAGE_SIZE], node: &DecodedNode) {
    // Heap footprint of a byte string of `len` bytes together with its varint prefix.
    let prefixed = |len: usize| varint_len(u64_from_usize(len)) + len;

    let (page_type, key_count, heap_bytes) = match node.kind {
        NodeKind::Leaf => (
            PAGE_TYPE_BTREE_LEAF,
            node.leaves.len(),
            node.leaves
                .iter()
                .map(|entry| prefixed(entry.key.len()) + prefixed(entry.value.len()))
                .sum::<usize>(),
        ),
        NodeKind::Internal => (
            PAGE_TYPE_BTREE_INTERNAL,
            node.internals.len(),
            node.internals
                .iter()
                .map(|pivot| prefixed(pivot.key.len()))
                .sum::<usize>(),
        ),
    };

    buf[OFF_PAGE_TYPE] = page_type;
    buf[OFF_LEVEL] = node.level;

    let count_bytes = u16_from_usize(key_count).to_le_bytes();
    buf[OFF_KEY_COUNT..OFF_KEY_COUNT + 2].copy_from_slice(&count_bytes);

    let free_space_off = u32_from_usize(PAYLOAD_END.saturating_sub(heap_bytes));
    buf[OFF_FREE_SPACE_OFF..OFF_FREE_SPACE_OFF + 4].copy_from_slice(&free_space_off.to_le_bytes());

    buf[OFF_NEXT_SIBLING..OFF_NEXT_SIBLING + 8].copy_from_slice(&node.next_sibling.to_le_bytes());

    // Nothing is written to the reserved bytes; this only keeps the constant referenced.
    let _ = OFF_RESERVED;
}
fn encode_leaf_body(buf: &mut [u8; PAGE_SIZE], node: &DecodedNode) -> Result<()> {
let mut heap_cursor = PAYLOAD_END;
let mut slot_off = PAYLOAD_OFFSET;
for entry in &node.leaves {
let entry_len = varint_len(u64_from_usize(entry.key.len()))
+ entry.key.len()
+ varint_len(u64_from_usize(entry.value.len()))
+ entry.value.len();
if heap_cursor < entry_len + slot_off + LEAF_SLOT_BYTES {
return Err(Error::BTreeInvariantViolated {
reason: "leaf encode: slot dir and heap collide",
});
}
heap_cursor -= entry_len;
let mut cur = heap_cursor;
let kl = u64_from_usize(entry.key.len());
cur += write_varint(&mut buf[cur..], kl);
buf[cur..cur + entry.key.len()].copy_from_slice(&entry.key);
cur += entry.key.len();
let vl = u64_from_usize(entry.value.len());
cur += write_varint(&mut buf[cur..], vl);
buf[cur..cur + entry.value.len()].copy_from_slice(&entry.value);
let off32 = u32_from_usize(heap_cursor);
buf[slot_off..slot_off + LEAF_SLOT_BYTES].copy_from_slice(&off32.to_le_bytes());
slot_off += LEAF_SLOT_BYTES;
}
Ok(())
}
/// Writes the leftmost child, slot directory and pivot-key heap of an internal node.
///
/// The leftmost child sits at `PAYLOAD_OFFSET`, followed by one slot per pivot holding
/// a `u32` key offset and the `u64` right child. Pivot keys (varint length, then bytes)
/// are packed downward from `PAYLOAD_END`.
///
/// # Errors
///
/// Returns `Error::BTreeInvariantViolated` when the child count is not pivots + 1 or
/// when the next pivot key would overlap the slot directory.
fn encode_internal_body(buf: &mut [u8; PAGE_SIZE], node: &DecodedNode) -> Result<()> {
    if node.internals.len() + 1 != node.children.len() {
        return Err(Error::BTreeInvariantViolated {
            reason: "internal node has children.len() != pivots+1",
        });
    }

    let slots_begin = PAYLOAD_OFFSET + INTERNAL_LEFTMOST_CHILD_BYTES;
    buf[PAYLOAD_OFFSET..slots_begin].copy_from_slice(&node.children[0].to_le_bytes());

    let mut heap_top = PAYLOAD_END;
    let mut slot_start = slots_begin;

    // The length check above guarantees exactly one right child per pivot.
    for (pivot, right_child) in node.internals.iter().zip(&node.children[1..]) {
        let key_len = u64_from_usize(pivot.key.len());
        let record_len = varint_len(key_len) + pivot.key.len();
        let slot_end = slot_start + INTERNAL_SLOT_BYTES;

        if heap_top < record_len + slot_end {
            return Err(Error::BTreeInvariantViolated {
                reason: "internal encode: slot dir and heap collide",
            });
        }
        heap_top -= record_len;

        let key_start = heap_top + write_varint(&mut buf[heap_top..], key_len);
        buf[key_start..key_start + pivot.key.len()].copy_from_slice(&pivot.key);

        // Slot layout: 4-byte key offset, then the 8-byte right child.
        let (offset_field, child_field) = buf[slot_start..slot_end].split_at_mut(4);
        offset_field.copy_from_slice(&u32_from_usize(heap_top).to_le_bytes());
        child_field.copy_from_slice(&right_child.to_le_bytes());

        slot_start = slot_end;
    }

    Ok(())
}
pub fn decode_node(buf: &[u8; PAGE_SIZE]) -> Result<DecodedNode> {
let page_type = buf[OFF_PAGE_TYPE];
let kind = match page_type {
PAGE_TYPE_BTREE_LEAF => NodeKind::Leaf,
PAGE_TYPE_BTREE_INTERNAL => NodeKind::Internal,
_ => return Err(Error::Corruption { page_id: 0 }),
};
let level = buf[OFF_LEVEL];
let key_count = u16::from_le_bytes([buf[OFF_KEY_COUNT], buf[OFF_KEY_COUNT + 1]]) as usize;
let next_sibling = u64::from_le_bytes([
buf[OFF_NEXT_SIBLING],
buf[OFF_NEXT_SIBLING + 1],
buf[OFF_NEXT_SIBLING + 2],
buf[OFF_NEXT_SIBLING + 3],
buf[OFF_NEXT_SIBLING + 4],
buf[OFF_NEXT_SIBLING + 5],
buf[OFF_NEXT_SIBLING + 6],
buf[OFF_NEXT_SIBLING + 7],
]);
let mut node = DecodedNode {
kind,
level,
next_sibling,
children: Vec::new(),
leaves: Vec::new(),
internals: Vec::new(),
};
match kind {
NodeKind::Leaf => decode_leaf_body(buf, &mut node, key_count)?,
NodeKind::Internal => decode_internal_body(buf, &mut node, key_count)?,
}
validate_node_release(&node)?;
Ok(node)
}
pub fn peek_node_kind(buf: &[u8; PAGE_SIZE]) -> Result<NodeKind> {
match buf[OFF_PAGE_TYPE] {
PAGE_TYPE_BTREE_LEAF => Ok(NodeKind::Leaf),
PAGE_TYPE_BTREE_INTERNAL => Ok(NodeKind::Internal),
_ => Err(Error::Corruption { page_id: 0 }),
}
}
/// Looks up `target` in a leaf page without materializing the whole node.
///
/// Returns `Ok(Some(value))` with a copy of the value when the key is present and
/// `Ok(None)` when it is not.
///
/// # Errors
///
/// Returns `Error::BTreeInvariantViolated` when the page is an internal node or a leaf
/// with a non-zero level, and `Error::Corruption` for an unknown page type, an
/// oversized key count, or a malformed entry met during the scan.
pub fn decode_node_find(buf: &[u8; PAGE_SIZE], target: &[u8]) -> Result<Option<Vec<u8>>> {
    if matches!(peek_node_kind(buf)?, NodeKind::Internal) {
        return Err(Error::BTreeInvariantViolated {
            reason: "decode_node_find called on an internal node",
        });
    }

    if buf[OFF_LEVEL] != 0 {
        return Err(Error::BTreeInvariantViolated {
            reason: "leaf has non-zero level",
        });
    }

    let key_count = usize::from(u16::from_le_bytes([
        buf[OFF_KEY_COUNT],
        buf[OFF_KEY_COUNT + 1],
    ]));
    if key_count > LEAF_SLOT_CAP {
        return Err(Error::Corruption { page_id: 0 });
    }

    find_leaf_value(buf, key_count, target)
}
fn find_leaf_value(
buf: &[u8; PAGE_SIZE],
key_count: usize,
target: &[u8],
) -> Result<Option<Vec<u8>>> {
let mut prev_key: Option<&[u8]> = None;
for i in 0..key_count {
let slot_off = PAYLOAD_OFFSET + i * LEAF_SLOT_BYTES;
if slot_off + LEAF_SLOT_BYTES > PAYLOAD_END {
return Err(Error::Corruption { page_id: 0 });
}
let entry_off = u32::from_le_bytes([
buf[slot_off],
buf[slot_off + 1],
buf[slot_off + 2],
buf[slot_off + 3],
]) as usize;
if !(PAYLOAD_OFFSET..PAYLOAD_END).contains(&entry_off) {
return Err(Error::Corruption { page_id: 0 });
}
let (key, after_key) = read_length_prefixed(buf, entry_off)?;
let (value, _) = read_length_prefixed(buf, after_key)?;
if key.len() > max_key_len() {
return Err(Error::Corruption { page_id: 0 });
}
if let Some(prev) = prev_key {
if key <= prev {
return Err(Error::Corruption { page_id: 0 });
}
}
if key == target {
return Ok(Some(value.to_vec()));
}
prev_key = Some(key);
}
Ok(None)
}
fn decode_leaf_body(buf: &[u8; PAGE_SIZE], node: &mut DecodedNode, key_count: usize) -> Result<()> {
if key_count > LEAF_SLOT_CAP {
return Err(Error::Corruption { page_id: 0 });
}
node.leaves.reserve(key_count);
for i in 0..key_count {
let slot_off = PAYLOAD_OFFSET + i * LEAF_SLOT_BYTES;
if slot_off + LEAF_SLOT_BYTES > PAYLOAD_END {
return Err(Error::Corruption { page_id: 0 });
}
let entry_off = u32::from_le_bytes([
buf[slot_off],
buf[slot_off + 1],
buf[slot_off + 2],
buf[slot_off + 3],
]) as usize;
if !(PAYLOAD_OFFSET..PAYLOAD_END).contains(&entry_off) {
return Err(Error::Corruption { page_id: 0 });
}
let (key, after_key) = read_length_prefixed(buf, entry_off)?;
let (value, _) = read_length_prefixed(buf, after_key)?;
if key.len() > max_key_len() {
return Err(Error::Corruption { page_id: 0 });
}
if let Some(prev) = node.leaves.last() {
if key <= prev.key.as_slice() {
return Err(Error::Corruption { page_id: 0 });
}
}
node.leaves.push(LeafEntry {
key: key.to_vec(),
value: value.to_vec(),
});
}
Ok(())
}
/// Decodes the leftmost child and `key_count` pivot slots of an internal node into
/// `node.children` and `node.internals`.
///
/// # Errors
///
/// Returns `Error::Corruption` when the key count exceeds `INTERNAL_SLOT_CAP`, the
/// leftmost child is zero, any slot fails to decode, or the pivot keys are not strictly
/// ascending.
fn decode_internal_body(
    buf: &[u8; PAGE_SIZE],
    node: &mut DecodedNode,
    key_count: usize,
) -> Result<()> {
    if key_count > INTERNAL_SLOT_CAP {
        return Err(Error::Corruption { page_id: 0 });
    }

    let leftmost = read_u64(buf, PAYLOAD_OFFSET);
    if leftmost == 0 {
        return Err(Error::Corruption { page_id: 0 });
    }

    node.children.reserve(key_count + 1);
    node.internals.reserve(key_count);
    node.children.push(leftmost);

    for slot_index in 0..key_count {
        let (key, right_child) = decode_internal_slot(buf, slot_index)?;

        // Pivots must be strictly ascending in slot order.
        let out_of_order = node
            .internals
            .last()
            .map_or(false, |prev| key <= prev.key);
        if out_of_order {
            return Err(Error::Corruption { page_id: 0 });
        }

        node.internals.push(InternalEntry { key });
        node.children.push(right_child);
    }

    Ok(())
}
fn decode_internal_slot(buf: &[u8; PAGE_SIZE], i: usize) -> Result<(Vec<u8>, u64)> {
let slot_off = PAYLOAD_OFFSET + INTERNAL_LEFTMOST_CHILD_BYTES + i * INTERNAL_SLOT_BYTES;
if slot_off + INTERNAL_SLOT_BYTES > PAYLOAD_END {
return Err(Error::Corruption { page_id: 0 });
}
let entry_off = u32::from_le_bytes([
buf[slot_off],
buf[slot_off + 1],
buf[slot_off + 2],
buf[slot_off + 3],
]) as usize;
let right_child = read_u64(buf, slot_off + 4);
if right_child == 0 || !(PAYLOAD_OFFSET..PAYLOAD_END).contains(&entry_off) {
return Err(Error::Corruption { page_id: 0 });
}
let (key, _) = read_length_prefixed(buf, entry_off)?;
if key.len() > max_key_len() {
return Err(Error::Corruption { page_id: 0 });
}
Ok((key.to_vec(), right_child))
}
/// Reads a little-endian `u64` from the eight bytes of `buf` starting at `offset`.
///
/// Panics if those eight bytes are not all inside the page; callers bounds-check first.
fn read_u64(buf: &[u8; PAGE_SIZE], offset: usize) -> u64 {
    let mut raw = [0u8; 8];
    raw.copy_from_slice(&buf[offset..offset + 8]);
    u64::from_le_bytes(raw)
}
fn read_length_prefixed(buf: &[u8; PAGE_SIZE], offset: usize) -> Result<(&[u8], usize)> {
if offset >= PAYLOAD_END {
return Err(Error::Corruption { page_id: 0 });
}
let (len, after) = read_varint(buf, offset)?;
let len_usize = usize_from_u64(len)?;
let end = after
.checked_add(len_usize)
.ok_or(Error::Corruption { page_id: 0 })?;
if end > PAYLOAD_END {
return Err(Error::Corruption { page_id: 0 });
}
Ok((&buf[after..end], end))
}
/// Checks that `node` satisfies the node invariants enforced by this file.
///
/// This is the public entry point; it delegates to the same validation that
/// `encode_node` and `decode_node` run.
///
/// # Errors
///
/// Returns `Error::BTreeInvariantViolated` naming the first invariant that fails.
pub fn validate_node(node: &DecodedNode) -> Result<()> {
validate_node_release(node)
}
/// Runs the structural checks and then, only if those pass, the key-ordering checks.
fn validate_node_release(node: &DecodedNode) -> Result<()> {
    validate_node_structural(node).and_then(|()| validate_node_ordering(node))
}
fn validate_node_structural(node: &DecodedNode) -> Result<()> {
match node.kind {
NodeKind::Leaf => {
if node.level != 0 {
return Err(Error::BTreeInvariantViolated {
reason: "leaf has non-zero level",
});
}
if node.leaves.len() > LEAF_SLOT_CAP {
return Err(Error::BTreeInvariantViolated {
reason: "leaf key count exceeds slot cap",
});
}
}
NodeKind::Internal => {
if node.level == 0 {
return Err(Error::BTreeInvariantViolated {
reason: "internal node at level 0",
});
}
if node.internals.len() > INTERNAL_SLOT_CAP {
return Err(Error::BTreeInvariantViolated {
reason: "internal key count exceeds slot cap",
});
}
if node.children.len() != node.internals.len() + 1 {
return Err(Error::BTreeInvariantViolated {
reason: "internal children.len() != pivots+1",
});
}
for c in &node.children {
if *c == 0 {
return Err(Error::BTreeInvariantViolated {
reason: "internal node has zero child page-id",
});
}
}
if node.next_sibling != 0 {
return Err(Error::BTreeInvariantViolated {
reason: "internal node has non-zero next_sibling",
});
}
}
}
Ok(())
}
fn validate_node_ordering(node: &DecodedNode) -> Result<()> {
match node.kind {
NodeKind::Leaf => {
for w in node.leaves.windows(2) {
if w[0].key.as_slice() >= w[1].key.as_slice() {
return Err(Error::BTreeInvariantViolated {
reason: "leaf keys not strictly sorted",
});
}
}
}
NodeKind::Internal => {
for w in node.internals.windows(2) {
if w[0].key.as_slice() >= w[1].key.as_slice() {
return Err(Error::BTreeInvariantViolated {
reason: "internal keys not strictly sorted",
});
}
}
}
}
Ok(())
}
/// Returns how many bytes `write_varint` emits for `v`: one byte per started group of
/// seven significant bits, and one byte for zero.
#[must_use]
pub fn varint_len(v: u64) -> usize {
    // Number of significant bits; zero still occupies one (all-zero) group.
    let significant_bits = (u64::BITS - v.leading_zeros()).max(1);
    // Ceiling division by the 7 payload bits carried per byte.
    ((significant_bits + 6) / 7) as usize
}
/// Writes `v` to the front of `dst` as a little-endian base-128 varint and returns the
/// number of bytes written.
///
/// Every byte carries seven payload bits; the high bit is set on all bytes except the
/// last. Panics if `dst` is too short for the encoding.
fn write_varint(dst: &mut [u8], mut v: u64) -> usize {
    let mut written = 0;
    // Emit continuation bytes while more than seven bits remain.
    while v > 0x7F {
        dst[written] = (v & 0x7F) as u8 | 0x80;
        written += 1;
        v >>= 7;
    }
    // Final byte: the remaining bits, continuation flag clear.
    dst[written] = (v & 0x7F) as u8;
    written + 1
}
/// Reads a little-endian base-128 varint from `buf` starting at `offset`.
///
/// Returns the decoded value and the offset just past its last byte.
///
/// # Errors
///
/// Returns `Error::Corruption` when the varint runs into or past `PAYLOAD_END`, is
/// longer than ten bytes, or carries payload bits that do not fit in a `u64` (a tenth
/// byte with any bit other than the lowest set).
fn read_varint(buf: &[u8; PAGE_SIZE], offset: usize) -> Result<(u64, usize)> {
    let mut value: u64 = 0;
    let mut shift: u32 = 0;
    let mut i = offset;
    // Ten 7-bit groups are the most a u64 can need; `shift` therefore never exceeds 63.
    for _ in 0..10 {
        if i >= PAYLOAD_END {
            return Err(Error::Corruption { page_id: 0 });
        }
        let byte = buf[i];
        i += 1;
        let chunk = u64::from(byte & 0x7F);
        // Reject groups whose bits would be shifted out of the u64. This only triggers
        // on the tenth byte (shift 63), where just the lowest bit is representable;
        // without it the excess bits would be dropped and a wrong value accepted.
        if chunk > (u64::MAX >> shift) {
            return Err(Error::Corruption { page_id: 0 });
        }
        value |= chunk << shift;
        if (byte & 0x80) == 0 {
            return Ok((value, i));
        }
        shift += 7;
    }
    // Continuation flag still set after ten bytes: the encoding is too long.
    Err(Error::Corruption { page_id: 0 })
}
/// Narrows `v` to `u32`.
///
/// An out-of-range value trips a debug assertion; in builds without debug assertions
/// it saturates to `u32::MAX` instead.
fn u32_from_usize(v: usize) -> u32 {
    let narrowed = u32::try_from(v);
    debug_assert!(narrowed.is_ok());
    narrowed.unwrap_or(u32::MAX)
}
/// Narrows `v` to `u16`.
///
/// An out-of-range value trips a debug assertion; in builds without debug assertions
/// it saturates to `u16::MAX` instead.
fn u16_from_usize(v: usize) -> u16 {
    let narrowed = u16::try_from(v);
    debug_assert!(narrowed.is_ok());
    narrowed.unwrap_or(u16::MAX)
}
/// Widens `v` to `u64` with a plain cast (lossless wherever `usize` is at most 64 bits).
fn u64_from_usize(v: usize) -> u64 {
v as u64
}
fn usize_from_u64(v: u64) -> Result<usize> {
usize::try_from(v).map_err(|_| Error::Corruption { page_id: 0 })
}
// Unit tests for node encoding/decoding: round trips, rejection of malformed pages,
// varint handling, and the leaf point-lookup path.
#[cfg(test)]
mod tests {
use super::*;
/// Builds a leaf node with no entries.
fn empty_leaf() -> DecodedNode {
DecodedNode {
kind: NodeKind::Leaf,
level: 0,
next_sibling: 0,
children: Vec::new(),
leaves: Vec::new(),
internals: Vec::new(),
}
}
/// Builds a leaf from `(key, value)` pairs, keeping the given order (no sorting applied).
fn leaf_with(entries: &[(&[u8], &[u8])]) -> DecodedNode {
let mut leaf = empty_leaf();
for (k, v) in entries {
leaf.leaves.push(LeafEntry {
key: k.to_vec(),
value: v.to_vec(),
});
}
leaf
}
/// An empty leaf survives an encode/decode round trip unchanged.
#[test]
fn round_trip_empty_leaf() {
let leaf = empty_leaf();
let mut page = Page::zeroed();
encode_node(&leaf, &mut page).expect("encode");
let decoded = decode_node(page.as_bytes()).expect("decode");
assert_eq!(decoded.kind, NodeKind::Leaf);
assert_eq!(decoded.level, 0);
assert_eq!(decoded.next_sibling, 0);
assert!(decoded.leaves.is_empty());
}
/// A leaf with three sorted entries decodes back with the same keys and values.
#[test]
fn round_trip_populated_leaf() {
let leaf = leaf_with(&[(b"alpha", b"AAA"), (b"bravo", b"BBB"), (b"charlie", b"CCC")]);
let mut page = Page::zeroed();
encode_node(&leaf, &mut page).expect("encode");
let decoded = decode_node(page.as_bytes()).expect("decode");
assert_eq!(decoded.leaves.len(), 3);
assert_eq!(decoded.leaves[0].key.as_slice(), b"alpha");
assert_eq!(decoded.leaves[0].value.as_slice(), b"AAA");
assert_eq!(decoded.leaves[2].key.as_slice(), b"charlie");
}
/// An internal node with three pivots and four children round-trips intact.
#[test]
fn round_trip_internal() {
let mut internal = DecodedNode {
kind: NodeKind::Internal,
level: 1,
next_sibling: 0,
children: Vec::new(),
leaves: Vec::new(),
internals: Vec::new(),
};
for raw in [10u64, 20, 30, 40] {
internal.children.push(raw);
}
for k in [b"d".as_slice(), b"h", b"m"] {
internal.internals.push(InternalEntry { key: k.to_vec() });
}
let mut page = Page::zeroed();
encode_node(&internal, &mut page).expect("encode");
let decoded = decode_node(page.as_bytes()).expect("decode");
assert_eq!(decoded.kind, NodeKind::Internal);
assert_eq!(decoded.level, 1);
assert_eq!(decoded.internals.len(), 3);
assert_eq!(decoded.children.as_slice(), &[10, 20, 30, 40]);
assert_eq!(decoded.internals[0].key.as_slice(), b"d");
assert_eq!(decoded.internals[2].key.as_slice(), b"m");
}
/// A hand-built leaf whose slots list key "c" before key "a" is rejected as corruption.
#[test]
fn decode_rejects_unsorted_leaf() {
let mut page = Page::zeroed();
let buf = page.as_bytes_mut();
buf[OFF_PAGE_TYPE] = PAGE_TYPE_BTREE_LEAF;
buf[OFF_LEVEL] = 0;
buf[OFF_KEY_COUNT..OFF_KEY_COUNT + 2].copy_from_slice(&2u16.to_le_bytes());
// Slot 0 points at the last four payload bytes, slot 1 at the four bytes before them.
let slot0 = u32_from_usize(PAYLOAD_END - 4);
let slot1 = u32_from_usize(PAYLOAD_END - 8);
buf[PAYLOAD_OFFSET..PAYLOAD_OFFSET + 4].copy_from_slice(&slot0.to_le_bytes());
buf[PAYLOAD_OFFSET + 4..PAYLOAD_OFFSET + 8].copy_from_slice(&slot1.to_le_bytes());
// Entry for slot 0: key "c", value "x" (each with a one-byte length prefix of 1).
buf[PAYLOAD_END - 4] = 1;
buf[PAYLOAD_END - 3] = b'c';
buf[PAYLOAD_END - 2] = 1;
buf[PAYLOAD_END - 1] = b'x';
// Entry for slot 1: key "a", value "y" — sorts before slot 0's key.
buf[PAYLOAD_END - 8] = 1;
buf[PAYLOAD_END - 7] = b'a';
buf[PAYLOAD_END - 6] = 1;
buf[PAYLOAD_END - 5] = b'y';
crate::pager::checksum::write_page_trailer(&mut page);
let err = decode_node(page.as_bytes()).expect_err("decode must reject");
assert!(matches!(err, Error::Corruption { .. }));
}
/// `write_varint`, `varint_len` and `read_varint` agree across small, large and maximal values.
#[test]
fn varint_round_trip() {
for v in [0u64, 1, 127, 128, 0xFFFF, 0xDEAD_BEEF, u64::MAX] {
let mut buf = [0u8; 16];
let n = write_varint(&mut buf, v);
assert_eq!(n, varint_len(v));
let mut page = [0u8; PAGE_SIZE];
page[..n].copy_from_slice(&buf[..n]);
let (decoded, after) = read_varint(&page, 0).expect("decode varint");
assert_eq!(decoded, v);
assert_eq!(after, n);
}
}
/// A leaf entry whose key-length varint encodes `u64::MAX` must yield `Corruption`, not a panic.
#[test]
fn decode_node_varint_max_returns_corruption_not_panic() {
let mut page = Page::zeroed();
{
let buf = page.as_bytes_mut();
buf[OFF_PAGE_TYPE] = PAGE_TYPE_BTREE_LEAF;
buf[OFF_LEVEL] = 0;
buf[OFF_KEY_COUNT..OFF_KEY_COUNT + 2].copy_from_slice(&1u16.to_le_bytes());
let entry_off = PAYLOAD_END - 16;
let slot_off_bytes = u32_from_usize(entry_off).to_le_bytes();
buf[PAYLOAD_OFFSET..PAYLOAD_OFFSET + 4].copy_from_slice(&slot_off_bytes);
// Ten-byte varint encoding of u64::MAX.
let varint = [0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x01];
buf[entry_off..entry_off + varint.len()].copy_from_slice(&varint);
}
crate::pager::checksum::write_page_trailer(&mut page);
let result = decode_node(page.as_bytes());
match result {
Err(Error::Corruption { .. }) => {}
other => panic!(
"expected Err(Error::Corruption), got {:?} \
(a panic here would indicate the M13 #115 bug \
is still present in release builds)",
other.map_or("non-corruption err", |_| "Ok(_)")
),
}
}
/// Same as above, but the oversized key-length varint sits in an internal node's pivot slot.
#[test]
fn decode_internal_node_varint_max_returns_corruption() {
let mut page = Page::zeroed();
{
let buf = page.as_bytes_mut();
buf[OFF_PAGE_TYPE] = PAGE_TYPE_BTREE_INTERNAL;
buf[OFF_LEVEL] = 1;
buf[OFF_KEY_COUNT..OFF_KEY_COUNT + 2].copy_from_slice(&1u16.to_le_bytes());
// Non-zero leftmost child so decoding reaches the pivot slot.
buf[PAYLOAD_OFFSET..PAYLOAD_OFFSET + 8].copy_from_slice(&42u64.to_le_bytes());
let slot_base = PAYLOAD_OFFSET + INTERNAL_LEFTMOST_CHILD_BYTES;
let entry_off = PAYLOAD_END - 16;
buf[slot_base..slot_base + 4].copy_from_slice(&u32_from_usize(entry_off).to_le_bytes());
buf[slot_base + 4..slot_base + INTERNAL_SLOT_BYTES]
.copy_from_slice(&7u64.to_le_bytes());
// Ten-byte varint encoding of u64::MAX.
let varint = [0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x01];
buf[entry_off..entry_off + varint.len()].copy_from_slice(&varint);
}
crate::pager::checksum::write_page_trailer(&mut page);
let result = decode_node(page.as_bytes());
assert!(
matches!(result, Err(Error::Corruption { .. })),
"expected Err(Error::Corruption) on u64::MAX varint in internal slot"
);
}
/// An entry sized by `max_inline_value` plus its slot fits within the payload area.
#[test]
fn max_inline_value_is_below_payload() {
let k = 8;
let v = max_inline_value(k);
let entry_len = varint_len(u64_from_usize(k)) + k + varint_len(u64_from_usize(v)) + v;
let payload = entry_len + LEAF_SLOT_BYTES;
assert!(payload <= PAYLOAD_END - PAYLOAD_OFFSET);
}
/// `decode_node_find` returns the right value for first, middle and last keys, and `None` for a miss.
#[test]
fn decode_node_find_matches_full_decode() {
let leaf = leaf_with(&[(b"alpha", b"AAA"), (b"bravo", b"BBB"), (b"charlie", b"CCC")]);
let mut page = Page::zeroed();
encode_node(&leaf, &mut page).expect("encode");
let found = decode_node_find(page.as_bytes(), b"bravo").expect("find");
assert_eq!(found.as_deref(), Some(b"BBB".as_slice()));
let missing = decode_node_find(page.as_bytes(), b"zeta").expect("find");
assert_eq!(missing, None);
assert_eq!(
decode_node_find(page.as_bytes(), b"alpha")
.expect("find")
.as_deref(),
Some(b"AAA".as_slice())
);
assert_eq!(
decode_node_find(page.as_bytes(), b"charlie")
.expect("find")
.as_deref(),
Some(b"CCC".as_slice())
);
}
/// A first slot whose entry offset lies beyond the payload end is reported as corruption.
#[test]
fn decode_node_find_rejects_corrupted_slot_offset() {
let leaf = leaf_with(&[(b"a", b"x"), (b"b", b"y")]);
let mut page = Page::zeroed();
encode_node(&leaf, &mut page).expect("encode");
let buf = page.as_bytes_mut();
// Overwrite slot 0 with an offset past PAYLOAD_END.
let bad = u32_from_usize(PAYLOAD_END + 4);
buf[PAYLOAD_OFFSET..PAYLOAD_OFFSET + 4].copy_from_slice(&bad.to_le_bytes());
crate::pager::checksum::write_page_trailer(&mut page);
let r = decode_node_find(page.as_bytes(), b"b");
assert!(
matches!(r, Err(Error::Corruption { .. })),
"corrupted slot offset not caught by find path: {r:?}"
);
}
/// The lookup path also rejects a leaf whose slots list key "c" before key "a".
#[test]
fn decode_node_find_rejects_out_of_order_keys() {
let mut page = Page::zeroed();
{
let buf = page.as_bytes_mut();
buf[OFF_PAGE_TYPE] = PAGE_TYPE_BTREE_LEAF;
buf[OFF_LEVEL] = 0;
buf[OFF_KEY_COUNT..OFF_KEY_COUNT + 2].copy_from_slice(&2u16.to_le_bytes());
let slot0 = u32_from_usize(PAYLOAD_END - 4);
let slot1 = u32_from_usize(PAYLOAD_END - 8);
buf[PAYLOAD_OFFSET..PAYLOAD_OFFSET + 4].copy_from_slice(&slot0.to_le_bytes());
buf[PAYLOAD_OFFSET + 4..PAYLOAD_OFFSET + 8].copy_from_slice(&slot1.to_le_bytes());
// Slot 0 entry: key "c", value "x".
buf[PAYLOAD_END - 4] = 1;
buf[PAYLOAD_END - 3] = b'c';
buf[PAYLOAD_END - 2] = 1;
buf[PAYLOAD_END - 1] = b'x';
// Slot 1 entry: key "a", value "y".
buf[PAYLOAD_END - 8] = 1;
buf[PAYLOAD_END - 7] = b'a';
buf[PAYLOAD_END - 6] = 1;
buf[PAYLOAD_END - 5] = b'y';
}
crate::pager::checksum::write_page_trailer(&mut page);
// The target matches neither key, so the scan has to visit both slots.
let r = decode_node_find(page.as_bytes(), b"z");
assert!(
matches!(r, Err(Error::Corruption { .. })),
"out-of-order key not caught by find path: {r:?}"
);
}
/// A key-length prefix one byte above `max_key_len()` is reported as corruption.
#[test]
fn decode_node_find_rejects_over_long_key() {
let mut page = Page::zeroed();
{
let buf = page.as_bytes_mut();
buf[OFF_PAGE_TYPE] = PAGE_TYPE_BTREE_LEAF;
buf[OFF_LEVEL] = 0;
buf[OFF_KEY_COUNT..OFF_KEY_COUNT + 2].copy_from_slice(&1u16.to_le_bytes());
let over = max_key_len() + 1;
// Place the entry right after the single slot; only the length prefix is written,
// the key bytes themselves stay zeroed.
let entry_off = PAYLOAD_OFFSET + LEAF_SLOT_BYTES;
buf[PAYLOAD_OFFSET..PAYLOAD_OFFSET + 4]
.copy_from_slice(&u32_from_usize(entry_off).to_le_bytes());
let n = write_varint(&mut buf[entry_off..], u64_from_usize(over));
assert_eq!(n, varint_len(u64_from_usize(over)));
}
crate::pager::checksum::write_page_trailer(&mut page);
let r = decode_node_find(page.as_bytes(), b"anything");
assert!(
matches!(r, Err(Error::Corruption { .. })),
"over-long key length not caught by find path: {r:?}"
);
}
/// Calling the leaf lookup on an internal page is an invariant violation, not corruption.
#[test]
fn decode_node_find_rejects_internal_node() {
let mut internal = DecodedNode {
kind: NodeKind::Internal,
level: 1,
next_sibling: 0,
children: Vec::new(),
leaves: Vec::new(),
internals: Vec::new(),
};
internal.children.push(10);
internal.children.push(20);
internal
.internals
.push(InternalEntry { key: b"m".to_vec() });
let mut page = Page::zeroed();
encode_node(&internal, &mut page).expect("encode");
let r = decode_node_find(page.as_bytes(), b"m");
assert!(matches!(r, Err(Error::BTreeInvariantViolated { .. })));
}
}