#![forbid(unsafe_code)]
use core::ops::{Bound, RangeBounds};
use crate::btree::node::{decode_node, DecodedNode, NodeKind};
use crate::btree::{BTree, MAX_BTREE_DEPTH, MAX_RANGE_NODES};
use crate::error::{Error, Result};
use crate::pager::page::PageId;
use crate::pager::Pager;
use crate::platform::FileBackend;
use heapless::Vec as HeaplessVec;
/// Forward iterator over the `(key, value)` entries of a B-tree that fall
/// inside a key range.
///
/// Built by `BTree::range` / `BTree::iter`. It borrows the pager mutably for
/// its whole lifetime and yields `Result<(Vec<u8>, Vec<u8>)>` items.
pub struct RangeIter<'a, F: FileBackend> {
/// Pager used for every page read performed while iterating.
pager: &'a mut Pager<F>,
/// Root page; each move to a following leaf re-descends from here.
root: PageId,
/// Decoded copy of the leaf currently being drained.
current_leaf: Option<DecodedNode>,
/// Index of the next entry to yield from `current_leaf`.
slot_index: usize,
/// Lower bound of the range (in this file it is only read by `Debug`).
start_bound: Bound<Vec<u8>>,
/// Upper bound of the range; checked against every candidate key.
end_bound: Bound<Vec<u8>>,
/// Key of the most recently yielded entry, used to find the next leaf.
last_emitted_key: Option<Vec<u8>>,
/// Number of leaves visited so far, compared against `MAX_RANGE_NODES`.
nodes_visited: usize,
/// Once set, `next` returns `None` without touching the pager.
finished: bool,
}
/// Hand-written `Debug` that prints only the scalar cursor state and the
/// bounds; the pager, root and decoded leaf are left out.
impl<F: FileBackend> std::fmt::Debug for RangeIter<'_, F> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            slot_index,
            start_bound,
            end_bound,
            nodes_visited,
            finished,
            ..
        } = self;
        let mut out = f.debug_struct("RangeIter");
        out.field("slot_index", slot_index);
        out.field("start_bound", start_bound);
        out.field("end_bound", end_bound);
        out.field("nodes_visited", nodes_visited);
        out.field("finished", finished);
        out.finish()
    }
}
impl<F: FileBackend> BTree<F> {
    /// Returns an iterator over the entries whose keys fall inside `range`.
    ///
    /// Both bounds are cloned into the iterator, so `range` does not need to
    /// outlive the call. The first matching leaf is located eagerly.
    ///
    /// # Errors
    ///
    /// Propagates any error raised while locating the first leaf (page
    /// reads, node decoding, depth / scan limits, invariant violations).
    pub fn range<'a, R>(&self, pager: &'a mut Pager<F>, range: R) -> Result<RangeIter<'a, F>>
    where
        R: RangeBounds<Vec<u8>>,
    {
        Self::build_range_iter(
            pager,
            self.root,
            clone_bound_vec(range.start_bound()),
            clone_bound_vec(range.end_bound()),
        )
    }

    /// Returns an iterator over every entry of the tree (an unbounded range).
    ///
    /// # Errors
    ///
    /// Same as [`BTree::range`].
    // The return type is `Result<RangeIter>`, not the iterator itself, which
    // is what trips the lint; the fallible constructor is intentional.
    #[allow(clippy::iter_not_returning_iterator)]
    pub fn iter<'a>(&self, pager: &'a mut Pager<F>) -> Result<RangeIter<'a, F>> {
        self.range(pager, ..)
    }

    /// Positions a new [`RangeIter`] on the first entry satisfying
    /// `start_bound`, marking it finished right away when no such entry
    /// was found in the located leaf.
    fn build_range_iter(
        pager: &mut Pager<F>,
        root: PageId,
        start_bound: Bound<Vec<u8>>,
        end_bound: Bound<Vec<u8>>,
    ) -> Result<RangeIter<'_, F>> {
        let (first_leaf, first_slot, visited) =
            locate_first_in_range_leaf(pager, root, &start_bound)?;
        // Must be computed before `first_leaf` is moved into the iterator.
        let exhausted = first_slot >= first_leaf.leaves.len();
        let iter = RangeIter {
            pager,
            root,
            current_leaf: Some(first_leaf),
            slot_index: first_slot,
            start_bound,
            end_bound,
            last_emitted_key: None,
            nodes_visited: visited,
            finished: exhausted,
        };
        Ok(iter)
    }

    /// Snapshot-backed counterpart of [`BTree::range`].
    ///
    /// This is an associated function: the caller supplies `root`
    /// explicitly, and every page is read through
    /// `snapshot.read_page(pager, ..)`, so the pager is only borrowed
    /// immutably.
    ///
    /// # Errors
    ///
    /// Propagates any error raised while locating the first leaf.
    pub fn range_via_snapshot<'a, R>(
        pager: &'a Pager<F>,
        snapshot: &'a crate::pager::ReaderSnapshot<F>,
        root: PageId,
        range: R,
    ) -> Result<SnapshotRangeIter<'a, F>>
    where
        R: RangeBounds<Vec<u8>>,
    {
        let lower = clone_bound_vec(range.start_bound());
        let upper = clone_bound_vec(range.end_bound());
        let (first_leaf, first_slot, visited) =
            snap_locate_first_in_range_leaf(pager, snapshot, root, &lower)?;
        // Must be computed before `first_leaf` is moved into the iterator.
        let exhausted = first_slot >= first_leaf.leaves.len();
        let iter = SnapshotRangeIter {
            pager,
            snapshot,
            root,
            current_leaf: Some(first_leaf),
            slot_index: first_slot,
            start_bound: lower,
            end_bound: upper,
            last_emitted_key: None,
            nodes_visited: visited,
            finished: exhausted,
        };
        Ok(iter)
    }
}
/// Finds the leaf and slot of the first entry that satisfies `start_bound`.
///
/// Returns `(leaf, slot_index, nodes_visited)`. When `slot_index` equals
/// `leaf.leaves.len()` no matching entry was found.
///
/// The initial descent may land on a leaf whose keys all sort before the
/// bound; in that case the search hops to following leaves until a match
/// is found, no further leaf exists, or an empty leaf is reached.
///
/// # Errors
///
/// Returns `Error::BTreeScanLimitExceeded` when more than `MAX_RANGE_NODES`
/// leaves are visited, and propagates errors from the descent helpers.
fn locate_first_in_range_leaf<F: FileBackend>(
    pager: &mut Pager<F>,
    root: PageId,
    start_bound: &Bound<Vec<u8>>,
) -> Result<(DecodedNode, usize, usize)> {
    // An unbounded start descends with the empty key.
    let descend_key = match start_bound {
        Bound::Included(k) | Bound::Excluded(k) => k.as_slice(),
        Bound::Unbounded => &[][..],
    };
    let leaf_id = descend_to_start_leaf(pager, root, descend_key)?;
    let mut leaf = read_leaf(pager, leaf_id)?;
    let mut slot_index = position_in_leaf(&leaf, start_bound);
    let mut nodes_visited: usize = 1;
    while slot_index >= leaf.leaves.len() {
        // Borrow the last entry instead of cloning its key: the borrow ends
        // before `leaf` is reassigned below.
        let Some(last_entry) = leaf.leaves.last() else {
            break;
        };
        nodes_visited += 1;
        if nodes_visited > MAX_RANGE_NODES {
            return Err(Error::BTreeScanLimitExceeded {
                limit: MAX_RANGE_NODES,
            });
        }
        let Some(next_id) = descend_to_leaf_after(pager, root, last_entry.key.as_slice())? else {
            break;
        };
        leaf = read_leaf(pager, next_id)?;
        slot_index = position_in_leaf(&leaf, start_bound);
    }
    Ok((leaf, slot_index, nodes_visited))
}
/// Turns a borrowed range bound into an owned one by cloning the key, if any.
fn clone_bound_vec(b: Bound<&Vec<u8>>) -> Bound<Vec<u8>> {
    b.cloned()
}
fn descend_to_start_leaf<F: FileBackend>(
pager: &mut Pager<F>,
root: PageId,
key: &[u8],
) -> Result<PageId> {
let mut path: HeaplessVec<PageId, MAX_BTREE_DEPTH> = HeaplessVec::new();
let mut current = root;
loop {
path.push(current).map_err(|_| Error::BTreeDepthExceeded {
limit: MAX_BTREE_DEPTH,
})?;
let decoded = {
let pr = pager.read_page(current)?;
decode_node(pr.as_bytes())?
};
match decoded.kind {
NodeKind::Leaf => return Ok(current),
NodeKind::Internal => {
let idx = pivot_index_for_start(&decoded, key);
current =
PageId::new(decoded.children[idx]).ok_or(Error::BTreeInvariantViolated {
reason: "range descent: zero child page-id",
})?;
}
}
}
}
/// Returns the index of the child to follow for `key`: the position of the
/// first pivot strictly greater than `key`, or the pivot count when no
/// pivot is greater.
fn pivot_index_for_start(node: &DecodedNode, key: &[u8]) -> usize {
    let pivots = &node.internals;
    pivots
        .iter()
        .position(|pivot| pivot.key.as_slice() > key)
        .unwrap_or(pivots.len())
}
fn read_leaf<F: FileBackend>(pager: &mut Pager<F>, id: PageId) -> Result<DecodedNode> {
let pr = pager.read_page(id)?;
let decoded = decode_node(pr.as_bytes())?;
if !matches!(decoded.kind, NodeKind::Leaf) {
return Err(Error::BTreeInvariantViolated {
reason: "range: expected leaf",
});
}
Ok(decoded)
}
/// Returns the index of the first entry in `leaf` that satisfies the lower
/// bound `start`, or `leaf.leaves.len()` when no entry does.
///
/// An unbounded start always maps to index `0`, even for an empty leaf.
fn position_in_leaf(leaf: &DecodedNode, start: &Bound<Vec<u8>>) -> usize {
    let entries = &leaf.leaves;
    let first_match = match start {
        Bound::Unbounded => return 0,
        Bound::Included(lo) => entries
            .iter()
            .position(|entry| entry.key.as_slice() >= lo.as_slice()),
        Bound::Excluded(lo) => entries
            .iter()
            .position(|entry| entry.key.as_slice() > lo.as_slice()),
    };
    first_match.unwrap_or(entries.len())
}
/// Reports whether `key` still lies inside the upper bound `end`.
///
/// Keys are compared lexicographically as byte slices.
fn within_end(key: &[u8], end: &Bound<Vec<u8>>) -> bool {
    let (limit, inclusive) = match end {
        Bound::Unbounded => return true,
        Bound::Included(k) => (k.as_slice(), true),
        Bound::Excluded(k) => (k.as_slice(), false),
    };
    match key.cmp(limit) {
        core::cmp::Ordering::Less => true,
        core::cmp::Ordering::Equal => inclusive,
        core::cmp::Ordering::Greater => false,
    }
}
/// Yields entries in leaf order until the end bound is passed, the tree is
/// exhausted, or an error occurs. After any of those the iterator keeps
/// returning `None`.
impl<F: FileBackend> Iterator for RangeIter<'_, F> {
    type Item = Result<(Vec<u8>, Vec<u8>)>;

    fn next(&mut self) -> Option<Self::Item> {
        while !self.finished {
            let leaf = self.current_leaf.as_ref()?;
            if let Some(entry) = leaf.leaves.get(self.slot_index) {
                if !within_end(entry.key.as_slice(), &self.end_bound) {
                    self.finished = true;
                    return None;
                }
                let key = entry.key.clone();
                let value = entry.value.clone();
                self.slot_index += 1;
                // Remembered so the next leaf can be located by key.
                self.last_emitted_key = Some(key.clone());
                return Some(Ok((key, value)));
            }
            // Current leaf is drained: move on, or stop on end / error.
            match self.advance_to_next_leaf() {
                Ok(true) => continue,
                outcome => {
                    self.finished = true;
                    // `Ok(false)` becomes `None`; an error is surfaced once.
                    return outcome.err().map(Err);
                }
            }
        }
        None
    }
}
impl<F: FileBackend> RangeIter<'_, F> {
    /// Moves the cursor to the first entry after the last emitted key that
    /// lives in a following leaf.
    ///
    /// Returns `Ok(true)` when the cursor now points at such an entry and
    /// `Ok(false)` when nothing has been emitted yet, no further leaf
    /// exists, or the located leaf holds no key greater than the last one.
    ///
    /// # Errors
    ///
    /// Returns `Error::BTreeScanLimitExceeded` once more than
    /// `MAX_RANGE_NODES` leaves have been visited, and propagates errors
    /// from the descent and the leaf read.
    fn advance_to_next_leaf(&mut self) -> Result<bool> {
        // Borrowing the key is enough: it lives in a different field from
        // the ones mutated below, so no per-leaf clone is needed.
        let Some(last) = self.last_emitted_key.as_deref() else {
            return Ok(false);
        };
        self.nodes_visited += 1;
        if self.nodes_visited > MAX_RANGE_NODES {
            return Err(Error::BTreeScanLimitExceeded {
                limit: MAX_RANGE_NODES,
            });
        }
        let Some(leaf_id) = descend_to_leaf_after(self.pager, self.root, last)? else {
            return Ok(false);
        };
        let leaf = read_leaf(self.pager, leaf_id)?;
        let slot_index = leaf
            .leaves
            .iter()
            .position(|e| e.key.as_slice() > last)
            .unwrap_or(leaf.leaves.len());
        if slot_index == leaf.leaves.len() {
            return Ok(false);
        }
        self.current_leaf = Some(leaf);
        self.slot_index = slot_index;
        Ok(true)
    }
}
fn descend_to_leaf_after<F: FileBackend>(
pager: &mut Pager<F>,
root: PageId,
key: &[u8],
) -> Result<Option<PageId>> {
let mut frames: HeaplessVec<DescendFrame, MAX_BTREE_DEPTH> = HeaplessVec::new();
let mut current = root;
loop {
let decoded = {
let pr = pager.read_page(current)?;
decode_node(pr.as_bytes())?
};
match decoded.kind {
NodeKind::Leaf => {
if decoded.leaves.iter().any(|e| e.key.as_slice() > key) {
return Ok(Some(current));
}
break;
}
NodeKind::Internal => {
let child_index = pivot_index_for_start(&decoded, key);
let raw = decoded.children[child_index];
frames
.push(DescendFrame {
node: decoded,
child_index,
})
.map_err(|_| Error::BTreeDepthExceeded {
limit: MAX_BTREE_DEPTH,
})?;
current = PageId::new(raw).ok_or(Error::BTreeInvariantViolated {
reason: "descend_to_leaf_after: zero child id",
})?;
}
}
}
while let Some(frame) = frames.pop() {
let next_child = frame.child_index + 1;
if next_child < frame.node.children.len() {
let raw = frame.node.children[next_child];
let next_root = PageId::new(raw).ok_or(Error::BTreeInvariantViolated {
reason: "descend_to_leaf_after: zero next-child id",
})?;
return descend_leftmost_leaf(pager, next_root).map(Some);
}
}
Ok(None)
}
/// One internal node recorded on the way down, so the descent can later
/// backtrack to it and look at the child to the right of the one taken.
struct DescendFrame {
/// Decoded internal node that was traversed.
node: DecodedNode,
/// Index into `node.children` of the child that was followed.
child_index: usize,
}
fn descend_leftmost_leaf<F: FileBackend>(pager: &mut Pager<F>, root: PageId) -> Result<PageId> {
let mut path: HeaplessVec<PageId, MAX_BTREE_DEPTH> = HeaplessVec::new();
let mut current = root;
loop {
path.push(current).map_err(|_| Error::BTreeDepthExceeded {
limit: MAX_BTREE_DEPTH,
})?;
let decoded = {
let pr = pager.read_page(current)?;
decode_node(pr.as_bytes())?
};
match decoded.kind {
NodeKind::Leaf => return Ok(current),
NodeKind::Internal => {
let raw = decoded.children[0];
current = PageId::new(raw).ok_or(Error::BTreeInvariantViolated {
reason: "descend_leftmost_leaf: zero child id",
})?;
}
}
}
}
/// Forward iterator over the `(key, value)` entries of a B-tree inside a key
/// range, reading every page through a `ReaderSnapshot`.
///
/// Built by `BTree::range_via_snapshot`. Unlike `RangeIter` it only needs a
/// shared borrow of the pager.
pub struct SnapshotRangeIter<'a, F: FileBackend> {
/// Pager handed to `snapshot.read_page` for every page read.
pager: &'a Pager<F>,
/// Snapshot through which all pages are read.
snapshot: &'a crate::pager::ReaderSnapshot<F>,
/// Root page; each move to a following leaf re-descends from here.
root: PageId,
/// Decoded copy of the leaf currently being drained.
current_leaf: Option<DecodedNode>,
/// Index of the next entry to yield from `current_leaf`.
slot_index: usize,
/// Lower bound of the range (in this file it is only read by `Debug`).
start_bound: Bound<Vec<u8>>,
/// Upper bound of the range; checked against every candidate key.
end_bound: Bound<Vec<u8>>,
/// Key of the most recently yielded entry, used to find the next leaf.
last_emitted_key: Option<Vec<u8>>,
/// Number of leaves visited so far, compared against `MAX_RANGE_NODES`.
nodes_visited: usize,
/// Once set, `next` returns `None` without reading any page.
finished: bool,
}
/// Hand-written `Debug` that prints only the scalar cursor state and the
/// bounds; the pager, snapshot, root and decoded leaf are left out.
impl<F: FileBackend> std::fmt::Debug for SnapshotRangeIter<'_, F> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            slot_index,
            start_bound,
            end_bound,
            nodes_visited,
            finished,
            ..
        } = self;
        let mut out = f.debug_struct("SnapshotRangeIter");
        out.field("slot_index", slot_index);
        out.field("start_bound", start_bound);
        out.field("end_bound", end_bound);
        out.field("nodes_visited", nodes_visited);
        out.field("finished", finished);
        out.finish()
    }
}
/// Yields entries in leaf order until the end bound is passed, the tree is
/// exhausted, or an error occurs. After any of those the iterator keeps
/// returning `None`.
impl<F: FileBackend> Iterator for SnapshotRangeIter<'_, F> {
    type Item = Result<(Vec<u8>, Vec<u8>)>;

    fn next(&mut self) -> Option<Self::Item> {
        while !self.finished {
            let leaf = self.current_leaf.as_ref()?;
            if let Some(entry) = leaf.leaves.get(self.slot_index) {
                if !within_end(entry.key.as_slice(), &self.end_bound) {
                    self.finished = true;
                    return None;
                }
                let key = entry.key.clone();
                let value = entry.value.clone();
                self.slot_index += 1;
                // Remembered so the next leaf can be located by key.
                self.last_emitted_key = Some(key.clone());
                return Some(Ok((key, value)));
            }
            // Current leaf is drained: move on, or stop on end / error.
            match self.advance_to_next_leaf() {
                Ok(true) => continue,
                outcome => {
                    self.finished = true;
                    // `Ok(false)` becomes `None`; an error is surfaced once.
                    return outcome.err().map(Err);
                }
            }
        }
        None
    }
}
impl<F: FileBackend> SnapshotRangeIter<'_, F> {
    /// Moves the cursor to the first entry after the last emitted key that
    /// lives in a following leaf, reading pages through the snapshot.
    ///
    /// Returns `Ok(true)` when the cursor now points at such an entry and
    /// `Ok(false)` when nothing has been emitted yet, no further leaf
    /// exists, or the located leaf holds no key greater than the last one.
    ///
    /// # Errors
    ///
    /// Returns `Error::BTreeScanLimitExceeded` once more than
    /// `MAX_RANGE_NODES` leaves have been visited, and propagates errors
    /// from the descent and the leaf read.
    fn advance_to_next_leaf(&mut self) -> Result<bool> {
        // Borrowing the key is enough: it lives in a different field from
        // the ones mutated below, so no per-leaf clone is needed.
        let Some(last) = self.last_emitted_key.as_deref() else {
            return Ok(false);
        };
        self.nodes_visited += 1;
        if self.nodes_visited > MAX_RANGE_NODES {
            return Err(Error::BTreeScanLimitExceeded {
                limit: MAX_RANGE_NODES,
            });
        }
        let Some(leaf_id) =
            snap_descend_to_leaf_after(self.pager, self.snapshot, self.root, last)?
        else {
            return Ok(false);
        };
        let leaf = snap_read_leaf(self.pager, self.snapshot, leaf_id)?;
        let slot_index = leaf
            .leaves
            .iter()
            .position(|e| e.key.as_slice() > last)
            .unwrap_or(leaf.leaves.len());
        if slot_index == leaf.leaves.len() {
            return Ok(false);
        }
        self.current_leaf = Some(leaf);
        self.slot_index = slot_index;
        Ok(true)
    }
}
/// Snapshot-backed search for the leaf and slot of the first entry that
/// satisfies `start_bound`.
///
/// Returns `(leaf, slot_index, nodes_visited)`. When `slot_index` equals
/// `leaf.leaves.len()` no matching entry was found.
///
/// The initial descent may land on a leaf whose keys all sort before the
/// bound; in that case the search hops to following leaves until a match
/// is found, no further leaf exists, or an empty leaf is reached.
///
/// # Errors
///
/// Returns `Error::BTreeScanLimitExceeded` when more than `MAX_RANGE_NODES`
/// leaves are visited, and propagates errors from the descent helpers.
fn snap_locate_first_in_range_leaf<F: FileBackend>(
    pager: &Pager<F>,
    snapshot: &crate::pager::ReaderSnapshot<F>,
    root: PageId,
    start_bound: &Bound<Vec<u8>>,
) -> Result<(DecodedNode, usize, usize)> {
    // An unbounded start descends with the empty key.
    let descend_key = match start_bound {
        Bound::Included(k) | Bound::Excluded(k) => k.as_slice(),
        Bound::Unbounded => &[][..],
    };
    let leaf_id = snap_descend_to_start_leaf(pager, snapshot, root, descend_key)?;
    let mut leaf = snap_read_leaf(pager, snapshot, leaf_id)?;
    let mut slot_index = position_in_leaf(&leaf, start_bound);
    let mut nodes_visited: usize = 1;
    while slot_index >= leaf.leaves.len() {
        // Borrow the last entry instead of cloning its key: the borrow ends
        // before `leaf` is reassigned below.
        let Some(last_entry) = leaf.leaves.last() else {
            break;
        };
        nodes_visited += 1;
        if nodes_visited > MAX_RANGE_NODES {
            return Err(Error::BTreeScanLimitExceeded {
                limit: MAX_RANGE_NODES,
            });
        }
        let Some(next_id) =
            snap_descend_to_leaf_after(pager, snapshot, root, last_entry.key.as_slice())?
        else {
            break;
        };
        leaf = snap_read_leaf(pager, snapshot, next_id)?;
        slot_index = position_in_leaf(&leaf, start_bound);
    }
    Ok((leaf, slot_index, nodes_visited))
}
fn snap_descend_to_start_leaf<F: FileBackend>(
pager: &Pager<F>,
snapshot: &crate::pager::ReaderSnapshot<F>,
root: PageId,
key: &[u8],
) -> Result<PageId> {
let mut path: HeaplessVec<PageId, MAX_BTREE_DEPTH> = HeaplessVec::new();
let mut current = root;
loop {
path.push(current).map_err(|_| Error::BTreeDepthExceeded {
limit: MAX_BTREE_DEPTH,
})?;
let page = snapshot.read_page(pager, current)?;
let decoded = decode_node(page.as_bytes())?;
match decoded.kind {
NodeKind::Leaf => return Ok(current),
NodeKind::Internal => {
let idx = pivot_index_for_start(&decoded, key);
current =
PageId::new(decoded.children[idx]).ok_or(Error::BTreeInvariantViolated {
reason: "range descent: zero child page-id",
})?;
}
}
}
}
fn snap_read_leaf<F: FileBackend>(
pager: &Pager<F>,
snapshot: &crate::pager::ReaderSnapshot<F>,
id: PageId,
) -> Result<DecodedNode> {
let page = snapshot.read_page(pager, id)?;
let decoded = decode_node(page.as_bytes())?;
if !matches!(decoded.kind, NodeKind::Leaf) {
return Err(Error::BTreeInvariantViolated {
reason: "range: expected leaf",
});
}
Ok(decoded)
}
fn snap_descend_to_leaf_after<F: FileBackend>(
pager: &Pager<F>,
snapshot: &crate::pager::ReaderSnapshot<F>,
root: PageId,
key: &[u8],
) -> Result<Option<PageId>> {
let mut frames: HeaplessVec<DescendFrame, MAX_BTREE_DEPTH> = HeaplessVec::new();
let mut current = root;
loop {
let page = snapshot.read_page(pager, current)?;
let decoded = decode_node(page.as_bytes())?;
match decoded.kind {
NodeKind::Leaf => {
if decoded.leaves.iter().any(|e| e.key.as_slice() > key) {
return Ok(Some(current));
}
break;
}
NodeKind::Internal => {
let child_index = pivot_index_for_start(&decoded, key);
let raw = decoded.children[child_index];
frames
.push(DescendFrame {
node: decoded,
child_index,
})
.map_err(|_| Error::BTreeDepthExceeded {
limit: MAX_BTREE_DEPTH,
})?;
current = PageId::new(raw).ok_or(Error::BTreeInvariantViolated {
reason: "descend_to_leaf_after: zero child id",
})?;
}
}
}
while let Some(frame) = frames.pop() {
let next_child = frame.child_index + 1;
if next_child < frame.node.children.len() {
let raw = frame.node.children[next_child];
let next_root = PageId::new(raw).ok_or(Error::BTreeInvariantViolated {
reason: "descend_to_leaf_after: zero next-child id",
})?;
return snap_descend_leftmost_leaf(pager, snapshot, next_root).map(Some);
}
}
Ok(None)
}
fn snap_descend_leftmost_leaf<F: FileBackend>(
pager: &Pager<F>,
snapshot: &crate::pager::ReaderSnapshot<F>,
root: PageId,
) -> Result<PageId> {
let mut path: HeaplessVec<PageId, MAX_BTREE_DEPTH> = HeaplessVec::new();
let mut current = root;
loop {
path.push(current).map_err(|_| Error::BTreeDepthExceeded {
limit: MAX_BTREE_DEPTH,
})?;
let page = snapshot.read_page(pager, current)?;
let decoded = decode_node(page.as_bytes())?;
match decoded.kind {
NodeKind::Leaf => return Ok(current),
NodeKind::Internal => {
let raw = decoded.children[0];
current = PageId::new(raw).ok_or(Error::BTreeInvariantViolated {
reason: "descend_leftmost_leaf: zero child id",
})?;
}
}
}
}
// Tests for the pager-backed range iterator: fixed scenarios plus a
// randomized comparison against `std::collections::BTreeMap`.
#[cfg(test)]
mod tests {
use super::*;
use crate::pager::{Config, Pager};
use crate::platform::FileHandle;
use rand::Rng;
use rand::SeedableRng;
use rand_chacha::ChaCha8Rng;
use std::collections::BTreeMap;
use std::ops::Bound;
// Pager configuration shared by every test: the default one.
fn config() -> Config {
Config::default()
}
// Drains `tree.iter(pager)` into a vector, panicking on the first error.
fn collect_all(
pager: &mut Pager<FileHandle>,
tree: &BTree<FileHandle>,
) -> Vec<(Vec<u8>, Vec<u8>)> {
let iter = tree.iter(pager).expect("iter");
iter.map(|r| r.expect("step")).collect()
}
// A freshly created tree yields no entries.
#[test]
fn iter_empty_tree() {
let mut pager = Pager::<FileHandle>::memory(config()).expect("pager");
let tree = BTree::<FileHandle>::empty(&mut pager).expect("empty");
let v = collect_all(&mut pager, &tree);
assert!(v.is_empty());
}
// Three small entries come back in key order.
// NOTE(review): presumably few enough to stay in one leaf, as the name says.
#[test]
fn iter_single_leaf() {
let mut pager = Pager::<FileHandle>::memory(config()).expect("pager");
let mut tree = BTree::<FileHandle>::empty(&mut pager).expect("empty");
for (k, v) in [("alpha", "A"), ("bravo", "B"), ("charlie", "C")] {
tree.insert(&mut pager, k.as_bytes(), v.as_bytes())
.expect("ins");
}
let got = collect_all(&mut pager, &tree);
let want: Vec<(Vec<u8>, Vec<u8>)> = [("alpha", "A"), ("bravo", "B"), ("charlie", "C")]
.iter()
.map(|(k, v)| (k.as_bytes().to_vec(), v.as_bytes().to_vec()))
.collect();
assert_eq!(got, want);
}
// Fifty entries with 256-byte values: the test asserts the root is no longer
// at level 0, then checks the full scan is strictly increasing and complete.
#[test]
fn range_across_leaf_split_boundary() {
let mut pager = Pager::<FileHandle>::memory(config()).expect("pager");
let mut tree = BTree::<FileHandle>::empty(&mut pager).expect("empty");
let value = vec![0xAAu8; 256];
for i in 0..50u32 {
let key = format!("key-{i:08}");
tree.insert(&mut pager, key.as_bytes(), &value)
.expect("ins");
}
let root = tree.root();
let root_decoded = {
let pr = pager.read_page(root).expect("read");
decode_node(pr.as_bytes()).expect("dec")
};
assert!(root_decoded.level >= 1, "expected root split");
let iter = tree.iter(&mut pager).expect("iter");
let mut prev: Option<Vec<u8>> = None;
let mut count = 0;
for item in iter {
let (k, _v) = item.expect("step");
if let Some(p) = &prev {
assert!(p.as_slice() < k.as_slice(), "non-monotonic at {k:?}");
}
prev = Some(k);
count += 1;
}
assert_eq!(count, 50);
}
// An inclusive start with an exclusive end yields exactly k-005 ..= k-009.
#[test]
fn range_bounded_subset() {
let mut pager = Pager::<FileHandle>::memory(config()).expect("pager");
let mut tree = BTree::<FileHandle>::empty(&mut pager).expect("empty");
for i in 0..20u32 {
let key = format!("k-{i:03}");
tree.insert(&mut pager, key.as_bytes(), b"v").expect("ins");
}
let iter = tree
.range(
&mut pager,
(
Bound::Included(b"k-005".to_vec()),
Bound::Excluded(b"k-010".to_vec()),
),
)
.expect("range");
let got: Vec<String> = iter
.map(|r| String::from_utf8(r.expect("step").0).expect("utf8"))
.collect();
assert_eq!(got, vec!["k-005", "k-006", "k-007", "k-008", "k-009"]);
}
// Runs the randomized oracle comparison for three seeds, 1000 ops each.
#[test]
fn range_oracle_1000_ops() {
for seed in 0..3u64 {
run_range_oracle(seed, 1_000);
}
}
// The start key sorts after every key of the leftmost leaf but before the
// first root pivot, so the iterator has to move on to a later leaf.
#[test]
fn range_start_bound_past_landing_leaf_end() {
let (mut pager, tree, value) = build_split_fixture();
let (start_key, want) = construct_past_landing_leaf_query(&mut pager, &tree, &value);
let iter = tree
.range(
&mut pager,
(Bound::Included(start_key), Bound::<Vec<u8>>::Unbounded),
)
.expect("range");
let got: Vec<(Vec<u8>, Vec<u8>)> = iter.map(|r| r.expect("step")).collect();
assert_eq!(got, want, "iterator must advance past empty landing leaf");
}
// Builds a tree with fifty `key-XXXXXXXX` entries carrying 256-byte values
// and returns it with its pager and the shared value.
fn build_split_fixture() -> (Pager<FileHandle>, BTree<FileHandle>, Vec<u8>) {
let mut pager = Pager::<FileHandle>::memory(config()).expect("pager");
let mut tree = BTree::<FileHandle>::empty(&mut pager).expect("empty");
let value = vec![0xCDu8; 256];
for i in 0..50u32 {
let key = format!("key-{i:08}");
tree.insert(&mut pager, key.as_bytes(), &value)
.expect("ins");
}
(pager, tree, value)
}
// Entries expected from a range query, in key order.
type ExpectedRangeResult = Vec<(Vec<u8>, Vec<u8>)>;
// Derives a start key that lies strictly between the last key of the
// leftmost leaf and the first root pivot (last key with a 0 byte appended),
// and computes the expected result from a `BTreeMap` holding the same data.
fn construct_past_landing_leaf_query(
pager: &mut Pager<FileHandle>,
tree: &BTree<FileHandle>,
value: &[u8],
) -> (Vec<u8>, ExpectedRangeResult) {
let root_decoded = {
let pr = pager.read_page(tree.root()).expect("read");
decode_node(pr.as_bytes()).expect("dec")
};
assert!(root_decoded.level >= 1, "fixture must split the tree");
let leftmost_id = PageId::new(root_decoded.children[0]).expect("nz");
let leftmost = read_leaf(pager, leftmost_id).expect("leaf");
let last_in_leftmost = leftmost.leaves.last().expect("non-empty").key.clone();
let pivot = root_decoded.internals[0].key.clone();
let mut start_key = last_in_leftmost.clone();
start_key.push(0);
// Sanity-check the constructed key really sits in the gap.
assert!(start_key.as_slice() > last_in_leftmost.as_slice());
assert!(start_key.as_slice() < pivot.as_slice());
let oracle: BTreeMap<Vec<u8>, Vec<u8>> = (0..50u32)
.map(|i| (format!("key-{i:08}").into_bytes(), value.to_vec()))
.collect();
let want: Vec<(Vec<u8>, Vec<u8>)> = oracle
.range::<Vec<u8>, _>((Bound::Included(&start_key), Bound::<&Vec<u8>>::Unbounded))
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
assert!(!want.is_empty(), "oracle must return sibling-leaf entries");
(start_key, want)
}
// Applies `ops` random operations to a tree and a `BTreeMap` oracle
// (draws 0..=2 insert, 3 deletes, 4..=5 run a range check), then compares a
// full scan of the tree with the oracle.
fn run_range_oracle(seed: u64, ops: usize) {
let mut rng = ChaCha8Rng::seed_from_u64(seed);
let mut pager = Pager::<FileHandle>::memory(config()).expect("pager");
let mut tree = BTree::<FileHandle>::empty(&mut pager).expect("empty");
let mut oracle: BTreeMap<Vec<u8>, Vec<u8>> = BTreeMap::new();
for op in 0..ops {
match rng.random_range(0u32..6) {
0..=2 => insert_step(&mut pager, &mut tree, &mut oracle, &mut rng, seed, op),
3 => delete_step(&mut pager, &mut tree, &mut oracle, &mut rng, seed, op),
_ => range_check(&mut pager, &tree, &oracle, &mut rng, seed, op),
}
}
let got = collect_all(&mut pager, &tree);
let want: Vec<(Vec<u8>, Vec<u8>)> =
oracle.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
assert_eq!(got, want, "seed {seed}: final iter mismatch");
}
// Inserts a random key/value into both tree and oracle, but only when the
// key is not already present in the oracle.
fn insert_step(
pager: &mut Pager<FileHandle>,
tree: &mut BTree<FileHandle>,
oracle: &mut BTreeMap<Vec<u8>, Vec<u8>>,
rng: &mut ChaCha8Rng,
seed: u64,
op: usize,
) {
let key = random_key(rng);
let value = random_value(rng);
if let std::collections::btree_map::Entry::Vacant(slot) = oracle.entry(key.clone()) {
tree.insert(pager, &key, &value)
.unwrap_or_else(|e| panic!("seed {seed} op {op} ins: {e:?}"));
slot.insert(value);
}
}
// Removes a randomly chosen existing key from both oracle and tree; does
// nothing when the oracle is empty.
fn delete_step(
pager: &mut Pager<FileHandle>,
tree: &mut BTree<FileHandle>,
oracle: &mut BTreeMap<Vec<u8>, Vec<u8>>,
rng: &mut ChaCha8Rng,
seed: u64,
op: usize,
) {
if oracle.is_empty() {
return;
}
let pick = rng.random_range(0..oracle.len());
let key = oracle.keys().nth(pick).cloned().unwrap_or_default();
oracle.remove(&key);
tree.delete(pager, &key)
.unwrap_or_else(|e| panic!("seed {seed} op {op} del: {e:?}"));
}
// Compares `tree.range` with `BTreeMap::range` for random bounds. Bound
// pairs rejected by `bounds_well_ordered` are skipped.
fn range_check(
pager: &mut Pager<FileHandle>,
tree: &BTree<FileHandle>,
oracle: &BTreeMap<Vec<u8>, Vec<u8>>,
rng: &mut ChaCha8Rng,
seed: u64,
op: usize,
) {
let (start, end) = random_bounds(rng);
if !bounds_well_ordered(&start, &end) {
return;
}
let iter = tree
.range(pager, (start.clone(), end.clone()))
.expect("range");
let got: Vec<(Vec<u8>, Vec<u8>)> = iter.map(|r| r.expect("step")).collect();
let want: Vec<(Vec<u8>, Vec<u8>)> = oracle
.range::<Vec<u8>, _>((start.as_ref(), end.as_ref()))
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
assert_eq!(got, want, "seed {seed} op {op}: range mismatch");
}
// True when either side is unbounded, when start <= end, or, for two
// exclusive bounds, when start < end.
// NOTE(review): this appears to mirror the inputs `BTreeMap::range` accepts
// without panicking — confirm against the std documentation.
fn bounds_well_ordered(start: &Bound<Vec<u8>>, end: &Bound<Vec<u8>>) -> bool {
match (start, end) {
(Bound::Unbounded, _) | (_, Bound::Unbounded) => true,
(Bound::Excluded(s), Bound::Excluded(e)) => s.as_slice() < e.as_slice(),
(Bound::Included(s) | Bound::Excluded(s), Bound::Included(e) | Bound::Excluded(e)) => {
s.as_slice() <= e.as_slice()
}
}
}
// Picks each bound independently: unbounded, inclusive or exclusive with a
// random key.
fn random_bounds(rng: &mut ChaCha8Rng) -> (Bound<Vec<u8>>, Bound<Vec<u8>>) {
let s = match rng.random_range(0u32..3) {
0 => Bound::Unbounded,
1 => Bound::Included(random_key(rng)),
_ => Bound::Excluded(random_key(rng)),
};
let e = match rng.random_range(0u32..3) {
0 => Bound::Unbounded,
1 => Bound::Included(random_key(rng)),
_ => Bound::Excluded(random_key(rng)),
};
(s, e)
}
// Random key of 1 to 5 bytes drawn from `a`, `b`, `c`; the tiny alphabet
// makes collisions between generated keys likely.
fn random_key(rng: &mut ChaCha8Rng) -> Vec<u8> {
let len = rng.random_range(1..6);
(0..len).map(|_| rng.random_range(b'a'..=b'c')).collect()
}
// Random value of 0 to 15 arbitrary bytes.
fn random_value(rng: &mut ChaCha8Rng) -> Vec<u8> {
let len = rng.random_range(0..16);
(0..len).map(|_| rng.random()).collect()
}
}