#![forbid(unsafe_code)]
use heapless::Vec as HeaplessVec;
use crate::btree::node::{
decode_node, encode_node, max_inline_value, max_key_len, DecodedNode, InternalEntry, LeafEntry,
NodeKind, INTERNAL_LEFTMOST_CHILD_BYTES, INTERNAL_SLOT_BYTES, LEAF_SLOT_BYTES,
};
use crate::btree::{BTree, MAX_BTREE_DEPTH};
use crate::error::{Error, Result};
use crate::pager::page::{Page, PageId, PAGE_SIZE, PAGE_TRAILER_SIZE};
use crate::pager::Pager;
use crate::platform::FileBackend;
struct PathFrame {
page_id: PageId,
node: DecodedNode,
child_index: usize,
}
struct Promoted {
key: Vec<u8>,
right_id: PageId,
}
enum ReplaceOutcome {
Fits { new_id: PageId },
Split {
left_id: PageId,
promoted: Vec<Promoted>,
},
}
const PAYLOAD_BYTES: usize = PAGE_SIZE - PAGE_TRAILER_SIZE - crate::btree::node::NODE_HEADER_SIZE;
impl<F: FileBackend> BTree<F> {
pub fn insert(&mut self, pager: &mut Pager<F>, key: &[u8], value: &[u8]) -> Result<()> {
check_key_value_size(key, value)?;
let path = self.descend_with_path(pager, key)?;
self.apply_insert(pager, path, key, value)
}
fn descend_with_path(
&self,
pager: &mut Pager<F>,
key: &[u8],
) -> Result<HeaplessVec<PathFrame, MAX_BTREE_DEPTH>> {
let mut path: HeaplessVec<PathFrame, MAX_BTREE_DEPTH> = HeaplessVec::new();
let mut current = self.root;
loop {
let decoded = {
let page_ref = pager.read_page(current)?;
decode_node(page_ref.as_bytes())?
};
match decoded.kind {
NodeKind::Leaf => {
let frame = PathFrame {
page_id: current,
node: decoded,
child_index: 0,
};
if path.push(frame).is_err() {
return Err(Error::BTreeDepthExceeded {
limit: MAX_BTREE_DEPTH,
});
}
return Ok(path);
}
NodeKind::Internal => {
let child_index = pivot_index(&decoded, key);
let raw = decoded.children[child_index];
let next = PageId::new(raw).ok_or(Error::BTreeInvariantViolated {
reason: "internal node had zero child page-id",
})?;
let frame = PathFrame {
page_id: current,
node: decoded,
child_index,
};
if path.push(frame).is_err() {
return Err(Error::BTreeDepthExceeded {
limit: MAX_BTREE_DEPTH,
});
}
current = next;
}
}
}
}
fn apply_insert(
&mut self,
pager: &mut Pager<F>,
mut path: HeaplessVec<PathFrame, MAX_BTREE_DEPTH>,
key: &[u8],
value: &[u8],
) -> Result<()> {
let mut freed: HeaplessVec<PageId, { MAX_BTREE_DEPTH * 2 }> = HeaplessVec::new();
let Some(leaf_frame) = path.pop() else {
return Err(Error::BTreeInvariantViolated {
reason: "insert: descend returned empty path",
});
};
let mut outcome = replace_leaf(pager, leaf_frame, key, value, &mut freed)?;
while let Some(parent_frame) = path.pop() {
outcome = replace_internal(pager, parent_frame, outcome, &mut freed)?;
}
let new_root = build_new_root(pager, outcome)?;
self.root = new_root;
for old_id in freed.iter().copied() {
pager.free_page(old_id)?;
}
Ok(())
}
}
fn replace_leaf<F: FileBackend>(
pager: &mut Pager<F>,
frame: PathFrame,
key: &[u8],
value: &[u8],
freed: &mut HeaplessVec<PageId, { MAX_BTREE_DEPTH * 2 }>,
) -> Result<ReplaceOutcome> {
let mut leaf = frame.node;
if leaf.leaves.iter().any(|e| e.key.as_slice() == key) {
return Err(Error::BTreeKeyExists);
}
let insert_at = leaf
.leaves
.iter()
.position(|e| e.key.as_slice() > key)
.unwrap_or(leaf.leaves.len());
leaf.leaves.insert(
insert_at,
LeafEntry {
key: key.to_vec(),
value: value.to_vec(),
},
);
push_freed(freed, frame.page_id)?;
if leaf.occupied_bytes() <= PAYLOAD_BYTES {
let new_id = write_new_node(pager, &leaf)?;
return Ok(ReplaceOutcome::Fits { new_id });
}
split_leaf(pager, leaf)
}
fn replace_internal<F: FileBackend>(
pager: &mut Pager<F>,
frame: PathFrame,
child_outcome: ReplaceOutcome,
freed: &mut HeaplessVec<PageId, { MAX_BTREE_DEPTH * 2 }>,
) -> Result<ReplaceOutcome> {
let mut internal = frame.node;
let idx = frame.child_index;
match child_outcome {
ReplaceOutcome::Fits { new_id } => {
internal.children[idx] = new_id.get();
}
ReplaceOutcome::Split { left_id, promoted } => {
internal.children[idx] = left_id.get();
for (i, p) in promoted.into_iter().enumerate() {
internal
.internals
.insert(idx + i, InternalEntry { key: p.key });
internal.children.insert(idx + 1 + i, p.right_id.get());
}
}
}
push_freed(freed, frame.page_id)?;
if internal.occupied_bytes() <= PAYLOAD_BYTES {
let new_id = write_new_node(pager, &internal)?;
return Ok(ReplaceOutcome::Fits { new_id });
}
split_internal(pager, internal)
}
fn build_new_root<F: FileBackend>(
pager: &mut Pager<F>,
mut outcome: ReplaceOutcome,
) -> Result<PageId> {
for _ in 0..MAX_BTREE_DEPTH {
let (left_id, promoted) = match outcome {
ReplaceOutcome::Fits { new_id } => return Ok(new_id),
ReplaceOutcome::Split { left_id, promoted } => (left_id, promoted),
};
let level = node_level_after_split(pager, left_id)?;
let next_level = level.checked_add(1).ok_or(Error::BTreeDepthExceeded {
limit: MAX_BTREE_DEPTH,
})?;
let mut children = Vec::with_capacity(promoted.len() + 1);
let mut internals = Vec::with_capacity(promoted.len());
children.push(left_id.get());
for p in promoted {
internals.push(InternalEntry { key: p.key });
children.push(p.right_id.get());
}
let root_node = DecodedNode {
kind: NodeKind::Internal,
level: next_level,
next_sibling: 0,
children,
leaves: Vec::new(),
internals,
};
if root_node.occupied_bytes() <= PAYLOAD_BYTES {
return write_new_node(pager, &root_node);
}
outcome = split_internal(pager, root_node)?;
}
Err(Error::BTreeDepthExceeded {
limit: MAX_BTREE_DEPTH,
})
}
fn push_freed(freed: &mut HeaplessVec<PageId, { MAX_BTREE_DEPTH * 2 }>, id: PageId) -> Result<()> {
freed.push(id).map_err(|_| Error::BTreeInvariantViolated {
reason: "insert: too many displaced pages to track",
})
}
fn pivot_index(node: &DecodedNode, key: &[u8]) -> usize {
let mut idx = node.internals.len();
for (i, pivot) in node.internals.iter().enumerate() {
if pivot.key.as_slice() > key {
idx = i;
break;
}
}
idx
}
fn check_key_value_size(key: &[u8], value: &[u8]) -> Result<()> {
if key.len() > max_key_len() {
return Err(Error::BTreeKeyTooLarge {
key_len: key.len(),
max: max_key_len(),
});
}
let v_max = max_inline_value(key.len());
if value.len() > v_max {
return Err(Error::BTreeValueTooLarge {
value_len: value.len(),
max: v_max,
});
}
Ok(())
}
pub(crate) fn write_new_node<F: FileBackend>(
pager: &mut Pager<F>,
node: &DecodedNode,
) -> Result<PageId> {
let new_id = pager.alloc_page()?;
let mut page = Page::zeroed();
encode_node(node, &mut page)?;
pager.write_page(new_id, &page)?;
Ok(new_id)
}
fn leaf_entry_bytes(entry: &LeafEntry) -> usize {
use crate::btree::node::varint_len;
LEAF_SLOT_BYTES
+ varint_len(entry.key.len() as u64)
+ entry.key.len()
+ varint_len(entry.value.len() as u64)
+ entry.value.len()
}
fn internal_pivot_bytes(entry: &InternalEntry) -> usize {
use crate::btree::node::varint_len;
INTERNAL_SLOT_BYTES + varint_len(entry.key.len() as u64) + entry.key.len()
}
fn split_leaf<F: FileBackend>(pager: &mut Pager<F>, leaf: DecodedNode) -> Result<ReplaceOutcome> {
debug_assert!(matches!(leaf.kind, NodeKind::Leaf));
debug_assert!(leaf.leaves.len() >= 2, "leaf split needs ≥ 2 entries");
let original_sibling = leaf.next_sibling;
let groups = pack_leaf_groups(leaf.leaves)?;
write_leaf_groups(pager, groups, original_sibling)
}
fn pack_leaf_groups(entries: Vec<LeafEntry>) -> Result<Vec<Vec<LeafEntry>>> {
let mut groups: Vec<Vec<LeafEntry>> = Vec::new();
let mut current: Vec<LeafEntry> = Vec::new();
let mut current_bytes = 0usize;
for entry in entries {
let entry_bytes = leaf_entry_bytes(&entry);
if !current.is_empty() && current_bytes + entry_bytes > PAYLOAD_BYTES {
groups.push(std::mem::take(&mut current));
current_bytes = 0;
}
current_bytes += entry_bytes;
current.push(entry);
}
if !current.is_empty() {
groups.push(current);
}
if groups.len() < 2 {
return Err(Error::BTreeInvariantViolated {
reason: "leaf split produced fewer than two groups",
});
}
Ok(groups)
}
fn write_leaf_groups<F: FileBackend>(
pager: &mut Pager<F>,
groups: Vec<Vec<LeafEntry>>,
original_sibling: u64,
) -> Result<ReplaceOutcome> {
let count = groups.len();
let mut next_sibling = original_sibling;
let mut ids: Vec<(PageId, Vec<u8>)> = Vec::with_capacity(count);
for (i, entries) in groups.into_iter().enumerate().rev() {
let promoted_key =
entries
.first()
.map(|e| e.key.clone())
.ok_or(Error::BTreeInvariantViolated {
reason: "leaf split produced an empty group",
})?;
let node = DecodedNode {
kind: NodeKind::Leaf,
level: 0,
next_sibling,
children: Vec::new(),
leaves: entries,
internals: Vec::new(),
};
let id = write_new_node(pager, &node)?;
next_sibling = id.get();
let key = if i == 0 { Vec::new() } else { promoted_key };
ids.push((id, key));
}
ids.reverse();
assemble_split_outcome(ids)
}
fn assemble_split_outcome(mut ids: Vec<(PageId, Vec<u8>)>) -> Result<ReplaceOutcome> {
if ids.len() < 2 {
return Err(Error::BTreeInvariantViolated {
reason: "split outcome needs a left node and ≥ 1 promoted sibling",
});
}
let mut iter = ids.drain(..);
let (left_id, _) = iter.next().ok_or(Error::BTreeInvariantViolated {
reason: "split outcome missing left node",
})?;
let mut promoted = Vec::with_capacity(iter.len());
for (right_id, key) in iter {
promoted.push(Promoted { key, right_id });
}
Ok(ReplaceOutcome::Split { left_id, promoted })
}
fn split_internal<F: FileBackend>(
pager: &mut Pager<F>,
internal: DecodedNode,
) -> Result<ReplaceOutcome> {
debug_assert!(matches!(internal.kind, NodeKind::Internal));
debug_assert!(
internal.internals.len() >= 2,
"internal split needs ≥ 2 pivots"
);
let level = internal.level;
let groups = pack_internal_groups(internal.internals, internal.children)?;
write_internal_groups(pager, groups, level)
}
struct InternalGroup {
children: Vec<u64>,
internals: Vec<InternalEntry>,
promoted: Option<Vec<u8>>,
}
fn pack_internal_groups(
pivots: Vec<InternalEntry>,
children: Vec<u64>,
) -> Result<Vec<InternalGroup>> {
if children.len() != pivots.len() + 1 {
return Err(Error::BTreeInvariantViolated {
reason: "internal split: children.len() != pivots+1",
});
}
let mut groups: Vec<InternalGroup> = Vec::new();
let mut cur_children: Vec<u64> = Vec::new();
let mut cur_pivots: Vec<InternalEntry> = Vec::new();
let mut cur_bytes = INTERNAL_LEFTMOST_CHILD_BYTES;
let mut pending_promote: Option<Vec<u8>> = None;
let mut child_iter = children.into_iter();
let leftmost = child_iter.next().ok_or(Error::BTreeInvariantViolated {
reason: "internal split: missing leftmost child",
})?;
cur_children.push(leftmost);
for pivot in pivots {
let right_child = child_iter.next().ok_or(Error::BTreeInvariantViolated {
reason: "internal split: missing right child for pivot",
})?;
let pivot_bytes = internal_pivot_bytes(&pivot);
if !cur_pivots.is_empty() && cur_bytes + pivot_bytes > PAYLOAD_BYTES {
groups.push(InternalGroup {
children: std::mem::take(&mut cur_children),
internals: std::mem::take(&mut cur_pivots),
promoted: pending_promote.take(),
});
pending_promote = Some(pivot.key);
cur_children.push(right_child);
cur_bytes = INTERNAL_LEFTMOST_CHILD_BYTES;
} else {
cur_bytes += pivot_bytes;
cur_pivots.push(pivot);
cur_children.push(right_child);
}
}
groups.push(InternalGroup {
children: cur_children,
internals: cur_pivots,
promoted: pending_promote,
});
if groups.len() < 2 {
return Err(Error::BTreeInvariantViolated {
reason: "internal split produced fewer than two groups",
});
}
Ok(groups)
}
fn write_internal_groups<F: FileBackend>(
pager: &mut Pager<F>,
groups: Vec<InternalGroup>,
level: u8,
) -> Result<ReplaceOutcome> {
let mut left_id: Option<PageId> = None;
let mut promoted: Vec<Promoted> = Vec::new();
for group in groups {
let node = DecodedNode {
kind: NodeKind::Internal,
level,
next_sibling: 0,
children: group.children,
leaves: Vec::new(),
internals: group.internals,
};
let id = write_new_node(pager, &node)?;
match group.promoted {
None => left_id = Some(id),
Some(key) => promoted.push(Promoted { key, right_id: id }),
}
}
let left_id = left_id.ok_or(Error::BTreeInvariantViolated {
reason: "internal split produced no left node",
})?;
if promoted.is_empty() {
return Err(Error::BTreeInvariantViolated {
reason: "internal split produced no promoted siblings",
});
}
Ok(ReplaceOutcome::Split { left_id, promoted })
}
fn node_level_after_split<F: FileBackend>(pager: &mut Pager<F>, id: PageId) -> Result<u8> {
let page_ref = pager.read_page(id)?;
let decoded = decode_node(page_ref.as_bytes())?;
Ok(decoded.level)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::pager::{Config, Pager};
use crate::platform::FileHandle;
use proptest::prelude::*;
use rand::prelude::IndexedRandom;
use rand::SeedableRng;
use rand_chacha::ChaCha8Rng;
use std::collections::BTreeMap;
fn config() -> Config {
Config::default()
}
#[test]
fn insert_single_key_round_trip() {
let mut pager = Pager::<FileHandle>::memory(config()).expect("pager");
let mut tree = BTree::<FileHandle>::empty(&mut pager).expect("empty");
tree.insert(&mut pager, b"hello", b"world").expect("ins");
assert_eq!(
tree.get(&mut pager, b"hello").expect("get"),
Some(b"world".to_vec())
);
}
#[test]
fn duplicate_key_errors() {
let mut pager = Pager::<FileHandle>::memory(config()).expect("pager");
let mut tree = BTree::<FileHandle>::empty(&mut pager).expect("empty");
tree.insert(&mut pager, b"k", b"v1").expect("ins");
let err = tree
.insert(&mut pager, b"k", b"v2")
.expect_err("dup must fail");
assert!(matches!(err, Error::BTreeKeyExists));
}
#[test]
fn insert_growth_splits_root() {
let mut pager = Pager::<FileHandle>::memory(config()).expect("pager");
let mut tree = BTree::<FileHandle>::empty(&mut pager).expect("empty");
let value = vec![0xABu8; 256];
for i in 0..200u32 {
let key = format!("key-{i:08}");
tree.insert(&mut pager, key.as_bytes(), &value)
.expect("ins");
}
for i in 0..200u32 {
let key = format!("key-{i:08}");
assert_eq!(
tree.get(&mut pager, key.as_bytes()).expect("get"),
Some(value.clone()),
"key {key}"
);
}
let root = tree.root();
let page_ref = pager.read_page(root).expect("read root");
let decoded = decode_node(page_ref.as_bytes()).expect("decode root");
assert!(
decoded.level >= 1,
"expected internal root, got {decoded:?}"
);
}
proptest! {
#![proptest_config(ProptestConfig {
cases: 16,
max_shrink_iters: 32,
.. ProptestConfig::default()
})]
#[test]
fn insert_oracle_property(seed in any::<u64>()) {
run_insert_oracle(seed, 200);
}
}
#[test]
fn insert_oracle_10k() {
for seed in 0..3u64 {
run_insert_oracle(seed, 10_000);
}
}
fn run_insert_oracle(seed: u64, ops: usize) {
let mut rng = ChaCha8Rng::seed_from_u64(seed);
let mut pager = Pager::<FileHandle>::memory(config()).expect("pager");
let mut tree = BTree::<FileHandle>::empty(&mut pager).expect("empty");
let mut oracle: BTreeMap<Vec<u8>, Vec<u8>> = BTreeMap::new();
for op in 0..ops {
let key = random_key(&mut rng);
let value = random_value(&mut rng);
let key_already = oracle.contains_key(&key);
let res = tree.insert(&mut pager, &key, &value);
if key_already {
assert!(
matches!(res, Err(Error::BTreeKeyExists)),
"seed {seed} op {op}: expected BTreeKeyExists, got {res:?}"
);
} else {
res.unwrap_or_else(|e| panic!("seed {seed} op {op}: insert err {e:?}"));
oracle.insert(key.clone(), value.clone());
}
if op.is_multiple_of(127) {
let keys: Vec<&Vec<u8>> = oracle.keys().collect();
if !keys.is_empty() {
let sample: Vec<&Vec<u8>> =
keys.choose_multiple(&mut rng, 4).copied().collect();
for k in sample {
assert_eq!(
tree.get(&mut pager, k).expect("get").as_ref(),
oracle.get(k),
"seed {seed} op {op}: key {k:?}"
);
}
}
}
}
for (k, v) in &oracle {
assert_eq!(
tree.get(&mut pager, k).expect("get").as_ref(),
Some(v),
"seed {seed} final: key {k:?}"
);
}
}
fn random_key(rng: &mut ChaCha8Rng) -> Vec<u8> {
use rand::Rng;
let len = rng.random_range(1..16);
(0..len).map(|_| rng.random_range(b'a'..=b'z')).collect()
}
fn random_value(rng: &mut ChaCha8Rng) -> Vec<u8> {
use rand::Rng;
let len = rng.random_range(0..64);
(0..len).map(|_| rng.random()).collect()
}
use crate::btree::node::{validate_node, NODE_HEADER_SIZE};
const TEST_PAYLOAD_BYTES: usize = PAGE_SIZE - PAGE_TRAILER_SIZE - NODE_HEADER_SIZE;
fn fuzz_key(rng: &mut ChaCha8Rng) -> Vec<u8> {
use rand::Rng;
let cap = max_key_len();
let len = match rng.random_range(0u32..100) {
0..=84 => rng.random_range(1..=16usize),
85..=96 => rng.random_range(16..=256usize),
_ => rng.random_range(256..=cap),
};
let len = len.clamp(1, cap);
(0..len).map(|_| rng.random::<u8>()).collect()
}
fn fuzz_value(rng: &mut ChaCha8Rng, key_len: usize) -> Vec<u8> {
use rand::Rng;
let cap = max_inline_value(key_len);
if cap == 0 {
return Vec::new();
}
let len = match rng.random_range(0u32..100) {
0..=49 => rng.random_range(0..=64usize),
50..=79 => rng.random_range(64..=512usize),
_ => rng.random_range(512..=cap.max(512)),
};
let len = len.min(cap);
(0..len).map(|_| rng.random::<u8>()).collect()
}
fn assert_tree_invariants(pager: &mut Pager<FileHandle>, root: PageId) {
let mut stack: Vec<PageId> = vec![root];
let budget = usize::try_from(pager.page_count())
.unwrap_or(usize::MAX)
.saturating_add(1);
let mut visited = 0usize;
while let Some(id) = stack.pop() {
visited += 1;
assert!(visited <= budget, "node walk exceeded page-count bound");
let node = {
let pr = pager.read_page(id).expect("read node");
decode_node(pr.as_bytes()).expect("decode node")
};
validate_node(&node).expect("node must satisfy validate_node");
assert!(
node.occupied_bytes() <= TEST_PAYLOAD_BYTES,
"node {id:?} occupies {} bytes > payload cap {} — a split \
left an overflowing node (issue #137 regression)",
node.occupied_bytes(),
TEST_PAYLOAD_BYTES,
);
if matches!(node.kind, NodeKind::Internal) {
for &child in &node.children {
stack.push(PageId::new(child).expect("child id nonzero"));
}
}
}
}
fn run_split_oracle(seed: u64, ops: usize) {
let mut rng = ChaCha8Rng::seed_from_u64(seed);
let mut pager = Pager::<FileHandle>::memory(config()).expect("pager");
let mut tree = BTree::<FileHandle>::empty(&mut pager).expect("empty");
let mut oracle: BTreeMap<Vec<u8>, Vec<u8>> = BTreeMap::new();
for op in 0..ops {
let key = fuzz_key(&mut rng);
let value = fuzz_value(&mut rng, key.len());
let already = oracle.contains_key(&key);
let res = tree.insert(&mut pager, &key, &value);
match res {
Ok(()) => {
assert!(!already, "seed {seed} op {op}: dup insert unexpectedly Ok");
oracle.insert(key, value);
}
Err(Error::BTreeKeyExists) => {
assert!(already, "seed {seed} op {op}: spurious BTreeKeyExists");
}
Err(e) => panic!(
"seed {seed} op {op}: unexpected insert error {e:?} \
(key_len={}, value_len={})",
key.len(),
value.len(),
),
}
if op.is_multiple_of(521) {
assert_tree_invariants(&mut pager, tree.root());
}
}
for (k, v) in &oracle {
assert_eq!(
tree.get(&mut pager, k).expect("get").as_ref(),
Some(v),
"seed {seed} final: key {k:?} value mismatch",
);
}
let scanned: Vec<(Vec<u8>, Vec<u8>)> = tree
.range(&mut pager, ..)
.expect("range")
.map(|r| r.expect("range item"))
.collect();
let expected: Vec<(Vec<u8>, Vec<u8>)> =
oracle.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
assert_eq!(
scanned.len(),
expected.len(),
"seed {seed}: range scan count disagrees with oracle",
);
assert_eq!(
scanned, expected,
"seed {seed}: ordered range scan mismatch"
);
assert_tree_invariants(&mut pager, tree.root());
}
proptest! {
#![proptest_config(ProptestConfig {
cases: 24,
max_shrink_iters: 16,
.. ProptestConfig::default()
})]
#[test]
fn split_oracle_property(seed in any::<u64>()) {
run_split_oracle(seed, 600);
}
}
#[test]
fn split_oracle_large_values_10k() {
for seed in 0..5u64 {
run_split_oracle(seed, 10_000);
}
}
#[test]
fn issue_137_large_entry_into_populated_leaf() {
let mut pager = Pager::<FileHandle>::memory(config()).expect("pager");
let mut tree = BTree::<FileHandle>::empty(&mut pager).expect("empty");
for i in 0..30u32 {
let key = format!("k{i:04}");
let value = vec![0x5Au8; 8];
tree.insert(&mut pager, key.as_bytes(), &value)
.expect("small insert");
}
let big_key = vec![b'm'; 16];
let big_value = vec![0xA5u8; 1_800];
tree.insert(&mut pager, &big_key, &big_value)
.expect("large insert must succeed (issue #137)");
let big_key2 = vec![b'n'; 16];
let big_value2 = vec![0xC3u8; max_inline_value(big_key2.len())];
tree.insert(&mut pager, &big_key2, &big_value2)
.expect("near-max large insert must succeed");
assert_eq!(
tree.get(&mut pager, &big_key).expect("get"),
Some(big_value)
);
assert_eq!(
tree.get(&mut pager, &big_key2).expect("get"),
Some(big_value2)
);
for i in 0..30u32 {
let key = format!("k{i:04}");
assert_eq!(
tree.get(&mut pager, key.as_bytes()).expect("get"),
Some(vec![0x5Au8; 8]),
"small key {key} must survive the split",
);
}
assert_tree_invariants(&mut pager, tree.root());
}
#[test]
fn issue_137_large_entry_in_the_middle_multiway() {
let mut pager = Pager::<FileHandle>::memory(config()).expect("pager");
let mut tree = BTree::<FileHandle>::empty(&mut pager).expect("empty");
for i in 0..20u32 {
let key = format!("a{i:03}");
let value = vec![0x11u8; 180];
tree.insert(&mut pager, key.as_bytes(), &value)
.expect("fill");
}
let mid_key = b"a0095".to_vec();
let mid_value = vec![0x22u8; max_inline_value(mid_key.len())];
tree.insert(&mut pager, &mid_key, &mid_value)
.expect("mid large insert must succeed");
assert_eq!(
tree.get(&mut pager, &mid_key).expect("get"),
Some(mid_value)
);
assert_tree_invariants(&mut pager, tree.root());
}
#[test]
fn split_then_delete_integrity_walk() {
use rand::Rng;
for seed in 0..4u64 {
let mut rng = ChaCha8Rng::seed_from_u64(seed);
let mut pager = Pager::<FileHandle>::memory(config()).expect("pager");
let mut tree = BTree::<FileHandle>::empty(&mut pager).expect("empty");
let mut oracle: BTreeMap<Vec<u8>, Vec<u8>> = BTreeMap::new();
for _ in 0..6_000usize {
let deleting = !oracle.is_empty() && rng.random_range(0u32..3) == 0;
if deleting {
let n = oracle.len();
let i = rng.random_range(0..n);
let cand = oracle.keys().nth(i).cloned().unwrap_or_default();
let want = oracle.remove(&cand).is_some();
let got = tree.delete(&mut pager, &cand).expect("delete");
assert_eq!(got, want, "seed {seed}: delete presence");
} else {
let key = fuzz_key(&mut rng);
let value = fuzz_value(&mut rng, key.len());
if let std::collections::btree_map::Entry::Vacant(slot) =
oracle.entry(key.clone())
{
tree.insert(&mut pager, &key, &value).expect("insert");
slot.insert(value);
}
}
}
for (k, v) in &oracle {
assert_eq!(
tree.get(&mut pager, k).expect("get").as_ref(),
Some(v),
"seed {seed}: surviving key {k:?}",
);
}
assert_tree_invariants(&mut pager, tree.root());
}
}
}