Skip to main content

obj_core/btree/
range.rs

1//! Forward range scan iterator.
2//!
3//! See `docs/format.md` § B+tree write semantics for the snapshot
4//! contract this iterator implements. In summary: the iterator
5//! captures the tree's root page-id at construction; subsequent
6//! writes (which COW the touched pages) cannot disturb the
7//! iterator's traversal because every page the iterator references
8//! remains intact on disk for the lifetime of the iterator. Real
9//! MVCC reader snapshots across concurrent writers arrive in M6.
10//!
11//! # Navigation strategy
12//!
13//! The iterator advances from one leaf to the next by **descending
14//! from the snapshot root** with the search key set to "the
15//! smallest key strictly greater than the last emitted key". It
16//! does NOT follow leaf `next_sibling` pointers.
17//!
18//! Why not sibling pointers? In a copy-on-write B+tree the page-id
19//! of a leaf changes every time the leaf is rewritten. A left-
20//! neighbour leaf that was NOT touched by the rewrite still carries
21//! the OLD `next_sibling` page-id, which now points at a freed page
22//! (or, post-recycling, at a completely different node). Updating
23//! every potentially-affected sibling on every COW would cascade
24//! the rewrite up arbitrarily; descending from the snapshot root
25//! is O(log n) per leaf-step but always correct.
26//!
27//! `next_sibling` is still part of the on-disk node header (per
28//! `docs/format.md` § Node header) and is set correctly by every
29//! split, but M4 readers do not use it. M6 MVCC reader snapshots
30//! may revisit the trade-off when the storage engine can identify
31//! a snapshot under which sibling pointers are stable.
32//!
33//! # Power-of-ten posture
34//!
35//! - **Rule 1.** Descent uses the same explicit stack as
36//!   `BTree::get`; the iterator's per-step transition is a
37//!   bounded-depth descent.
38//! - **Rule 2.** Each iterator carries a `nodes_visited` counter
39//!   and surfaces `Error::BTreeScanLimitExceeded` past
40//!   `MAX_RANGE_NODES = 1_000_000`.
41//! - **Rule 7.** Per-step errors (decode failures, depth-bound
42//!   trips, scan-limit trips, I/O failures from the pager) are
43//!   surfaced in-band as `Some(Err(...))`; the iterator latches
44//!   itself shut after the first error so subsequent calls return
45//!   `None`.
46
47#![forbid(unsafe_code)]
48
49use core::ops::{Bound, RangeBounds};
50
51use crate::btree::node::{decode_node, DecodedNode, NodeKind};
52use crate::btree::{BTree, MAX_BTREE_DEPTH, MAX_RANGE_NODES};
53use crate::error::{Error, Result};
54use crate::pager::page::PageId;
55use crate::pager::Pager;
56use crate::platform::FileBackend;
57
58use heapless::Vec as HeaplessVec;
59
60/// A forward iterator over `(key, value)` pairs in a B+tree.
61///
62/// Implements `Iterator<Item = Result<(Vec<u8>, Vec<u8>)>>`.
63/// Returns items in ascending key order over the half-open range
64/// `start..end` (or `..end`, `start..`, `..` depending on the
65/// bounds passed to [`BTree::range`]).
66///
67/// # Snapshot guarantee
68///
69/// The iterator captures the tree's root page-id at construction.
70/// Because every B+tree mutation copies-on-write (see
71/// `docs/format.md` § B+tree write semantics), any page the
72/// iterator's current traversal references continues to exist on
73/// disk until the iterator drops, *even if a concurrent
74/// `BTree::insert` or `BTree::delete` has produced a new tree
75/// shape*. M3 enforces single-writer at the file level, so
76/// concurrent writes from a different process are not in scope
77/// for M4; the snapshot guarantee covers concurrent in-process
78/// mutations only.
79pub struct RangeIter<'a, F: FileBackend> {
80    pager: &'a mut Pager<F>,
81    /// Snapshot root captured at construction. Every leaf descent
82    /// starts here so the iterator's view is stable across COW
83    /// writes that may have happened since.
84    root: PageId,
85    current_leaf: Option<DecodedNode>,
86    slot_index: usize,
87    start_bound: Bound<Vec<u8>>,
88    end_bound: Bound<Vec<u8>>,
89    /// The last key emitted by `next`. Used to find the next leaf
90    /// via a root-descent with start = `Excluded(last_key)`.
91    last_emitted_key: Option<Vec<u8>>,
92    nodes_visited: usize,
93    finished: bool,
94}
95
96impl<F: FileBackend> std::fmt::Debug for RangeIter<'_, F> {
97    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
98        f.debug_struct("RangeIter")
99            .field("slot_index", &self.slot_index)
100            .field("start_bound", &self.start_bound)
101            .field("end_bound", &self.end_bound)
102            .field("nodes_visited", &self.nodes_visited)
103            .field("finished", &self.finished)
104            .finish()
105    }
106}
107
108impl<F: FileBackend> BTree<F> {
109    /// Construct a forward iterator over the half-open range
110    /// `range`. Bounds are inclusive on `start` (per `RangeBounds`)
111    /// and exclusive on `end`. An unbounded `start`/`end` is
112    /// permitted (yielding `..end`, `start..`, or `..`).
113    ///
114    /// Bounds are taken as `RangeBounds<Vec<u8>>` so the four
115    /// idiomatic Rust range syntaxes — `..`, `start..`, `..end`,
116    /// `start..end`, `start..=end` — and the `(Bound<Vec<u8>>,
117    /// Bound<Vec<u8>>)` tuple all compose naturally.
118    ///
119    /// # Errors
120    ///
121    /// Returns [`Error::BTreeDepthExceeded`], [`Error::Corruption`],
122    /// or [`Error::Io`] propagated from the descent to the first
123    /// leaf. Per-step errors during iteration are surfaced via
124    /// `Some(Err(...))` from `next`.
125    pub fn range<'a, R>(&self, pager: &'a mut Pager<F>, range: R) -> Result<RangeIter<'a, F>>
126    where
127        R: RangeBounds<Vec<u8>>,
128    {
129        let start_bound = clone_bound_vec(range.start_bound());
130        let end_bound = clone_bound_vec(range.end_bound());
131        Self::build_range_iter(pager, self.root, start_bound, end_bound)
132    }
133
134    /// Iterate over every `(key, value)` pair in the tree. Sugar
135    /// for `range(..)`.
136    ///
137    /// The Clippy `iter_not_returning_iterator` warning would
138    /// normally fire here because the return type is wrapped in a
139    /// `Result` (construction can fail). We accept the deviation
140    /// because the alternative — making `iter` infallible and
141    /// returning errors on the first `next()` — surprises the
142    /// caller more.
143    ///
144    /// # Errors
145    ///
146    /// As for [`BTree::range`].
147    #[allow(clippy::iter_not_returning_iterator)] // construction is fallible; see doc comment
148    pub fn iter<'a>(&self, pager: &'a mut Pager<F>) -> Result<RangeIter<'a, F>> {
149        self.range(pager, ..)
150    }
151
152    fn build_range_iter(
153        pager: &mut Pager<F>,
154        root: PageId,
155        start_bound: Bound<Vec<u8>>,
156        end_bound: Bound<Vec<u8>>,
157    ) -> Result<RangeIter<'_, F>> {
158        let (leaf, slot_index, nodes_visited) =
159            locate_first_in_range_leaf(pager, root, &start_bound)?;
160        let finished = slot_index >= leaf.leaves.len();
161        Ok(RangeIter {
162            pager,
163            root,
164            current_leaf: Some(leaf),
165            slot_index,
166            start_bound,
167            end_bound,
168            last_emitted_key: None,
169            nodes_visited,
170            finished,
171        })
172    }
173
174    /// Snapshot-consistent variant of [`BTree::range`] — every page
175    /// read during descent and leaf-scan is resolved as-of `snapshot`
176    /// via [`crate::pager::ReaderSnapshot::read_page`] rather than the
177    /// live `Pager::read_page` (which consults the WAL overlay,
178    /// including a concurrent writer's post-snapshot commits).
179    ///
180    /// This is the range/count analogue of [`BTree::get_via_snapshot`]
181    /// (M12 #12): a read transaction documented as snapshot-isolated
182    /// must enumerate range and count results consistently with its
183    /// pinned LSN. Walking the live pager would let a concurrent
184    /// writer's post-snapshot index entries leak into the read txn's
185    /// range/count.
186    ///
187    /// `root` is the B+tree root captured at the reader's
188    /// snapshot-time view of the catalog (the descriptor read via
189    /// [`crate::Catalog::lookup_via_snapshot`]); the caller is
190    /// responsible for handing in the snapshot-time root.
191    ///
192    /// # Errors
193    ///
194    /// As [`BTree::range`], plus snapshot read errors propagated from
195    /// [`crate::pager::ReaderSnapshot::read_page`].
196    pub fn range_via_snapshot<'a, R>(
197        pager: &'a Pager<F>,
198        snapshot: &'a crate::pager::ReaderSnapshot<F>,
199        root: PageId,
200        range: R,
201    ) -> Result<SnapshotRangeIter<'a, F>>
202    where
203        R: RangeBounds<Vec<u8>>,
204    {
205        let start_bound = clone_bound_vec(range.start_bound());
206        let end_bound = clone_bound_vec(range.end_bound());
207        let (leaf, slot_index, nodes_visited) =
208            snap_locate_first_in_range_leaf(pager, snapshot, root, &start_bound)?;
209        let finished = slot_index >= leaf.leaves.len();
210        Ok(SnapshotRangeIter {
211            pager,
212            snapshot,
213            root,
214            current_leaf: Some(leaf),
215            slot_index,
216            start_bound,
217            end_bound,
218            last_emitted_key: None,
219            nodes_visited,
220            finished,
221        })
222    }
223}
224
225/// Find the leaf + slot that holds the first key satisfying
226/// `start_bound`. Returns the leaf, the in-leaf slot index (which may
227/// equal `leaf.leaves.len()` when the snapshot has no in-range keys
228/// at all — the caller treats that as a finished iterator), and the
229/// number of leaf nodes visited during the search (counted against
230/// `MAX_RANGE_NODES`).
231///
232/// If every key in the landing leaf sorts BEFORE `start_bound`,
233/// `position_in_leaf` returns past-end. We then walk forward through
234/// right-sibling leaves (via root-descent on the current leaf's last
235/// key — leaf page-ids are unstable under COW, so we re-descend
236/// rather than follow `next_sibling`) until we find a leaf with an
237/// in-bound slot, or the snapshot has no more leaves to scan.
238///
239/// Bound the walk via the same `MAX_RANGE_NODES` budget the iterator
240/// uses per-step; an unbounded walk here would be a power-of-ten
241/// Rule 2 violation. In practice the loop executes at most twice —
242/// by the B+tree's pivot invariant, a leaf adjacent-right of the
243/// landing leaf has its first key strictly greater than every key in
244/// the landing leaf, so `position_in_leaf` on the very next leaf
245/// returns 0 for any start bound whose value is ≤ the landing leaf's
246/// largest key.
247fn locate_first_in_range_leaf<F: FileBackend>(
248    pager: &mut Pager<F>,
249    root: PageId,
250    start_bound: &Bound<Vec<u8>>,
251) -> Result<(DecodedNode, usize, usize)> {
252    // Descend to the leaf that should hold the first
253    // in-range key. Uses an explicit `heapless::Vec` stack to
254    // bound depth.
255    let descend_key = match start_bound {
256        Bound::Included(k) | Bound::Excluded(k) => k.as_slice(),
257        Bound::Unbounded => &[][..],
258    };
259    let leaf_id = descend_to_start_leaf(pager, root, descend_key)?;
260    let mut leaf = read_leaf(pager, leaf_id)?;
261    let mut slot_index = position_in_leaf(&leaf, start_bound);
262    let mut nodes_visited: usize = 1;
263    while slot_index >= leaf.leaves.len() {
264        let Some(last_key) = leaf.leaves.last().map(|e| e.key.clone()) else {
265            // Landing leaf is empty — the snapshot has nothing
266            // to scan. Surface as a finished iterator below.
267            break;
268        };
269        nodes_visited += 1;
270        if nodes_visited > MAX_RANGE_NODES {
271            return Err(Error::BTreeScanLimitExceeded {
272                limit: MAX_RANGE_NODES,
273            });
274        }
275        let Some(next_id) = descend_to_leaf_after(pager, root, last_key.as_slice())? else {
276            // No more leaves to the right — return an empty
277            // finished iterator below.
278            break;
279        };
280        leaf = read_leaf(pager, next_id)?;
281        slot_index = position_in_leaf(&leaf, start_bound);
282    }
283    Ok((leaf, slot_index, nodes_visited))
284}
285
286fn clone_bound_vec(b: Bound<&Vec<u8>>) -> Bound<Vec<u8>> {
287    match b {
288        Bound::Included(s) => Bound::Included(s.clone()),
289        Bound::Excluded(s) => Bound::Excluded(s.clone()),
290        Bound::Unbounded => Bound::Unbounded,
291    }
292}
293
294fn descend_to_start_leaf<F: FileBackend>(
295    pager: &mut Pager<F>,
296    root: PageId,
297    key: &[u8],
298) -> Result<PageId> {
299    let mut path: HeaplessVec<PageId, MAX_BTREE_DEPTH> = HeaplessVec::new();
300    let mut current = root;
301    loop {
302        path.push(current).map_err(|_| Error::BTreeDepthExceeded {
303            limit: MAX_BTREE_DEPTH,
304        })?;
305        let decoded = {
306            let pr = pager.read_page(current)?;
307            decode_node(pr.as_bytes())?
308        };
309        match decoded.kind {
310            NodeKind::Leaf => return Ok(current),
311            NodeKind::Internal => {
312                let idx = pivot_index_for_start(&decoded, key);
313                current =
314                    PageId::new(decoded.children[idx]).ok_or(Error::BTreeInvariantViolated {
315                        reason: "range descent: zero child page-id",
316                    })?;
317            }
318        }
319    }
320}
321
322fn pivot_index_for_start(node: &DecodedNode, key: &[u8]) -> usize {
323    // Same as `descend_to_leaf`: find smallest `i` with
324    // `pivot[i] > key`; otherwise rightmost child.
325    let mut idx = node.internals.len();
326    for (i, p) in node.internals.iter().enumerate() {
327        if p.key.as_slice() > key {
328            idx = i;
329            break;
330        }
331    }
332    idx
333}
334
335fn read_leaf<F: FileBackend>(pager: &mut Pager<F>, id: PageId) -> Result<DecodedNode> {
336    let pr = pager.read_page(id)?;
337    let decoded = decode_node(pr.as_bytes())?;
338    if !matches!(decoded.kind, NodeKind::Leaf) {
339        return Err(Error::BTreeInvariantViolated {
340            reason: "range: expected leaf",
341        });
342    }
343    Ok(decoded)
344}
345
346/// First slot index in `leaf` whose key satisfies the start bound.
347fn position_in_leaf(leaf: &DecodedNode, start: &Bound<Vec<u8>>) -> usize {
348    match start {
349        Bound::Unbounded => 0,
350        Bound::Included(k) => leaf
351            .leaves
352            .iter()
353            .position(|e| e.key.as_slice() >= k.as_slice())
354            .unwrap_or(leaf.leaves.len()),
355        Bound::Excluded(k) => leaf
356            .leaves
357            .iter()
358            .position(|e| e.key.as_slice() > k.as_slice())
359            .unwrap_or(leaf.leaves.len()),
360    }
361}
362
363fn within_end(key: &[u8], end: &Bound<Vec<u8>>) -> bool {
364    match end {
365        Bound::Unbounded => true,
366        Bound::Included(k) => key <= k.as_slice(),
367        Bound::Excluded(k) => key < k.as_slice(),
368    }
369}
370
371impl<F: FileBackend> Iterator for RangeIter<'_, F> {
372    type Item = Result<(Vec<u8>, Vec<u8>)>;
373
374    fn next(&mut self) -> Option<Self::Item> {
375        if self.finished {
376            return None;
377        }
378        loop {
379            let leaf = self.current_leaf.as_ref()?;
380            if self.slot_index < leaf.leaves.len() {
381                let entry = &leaf.leaves[self.slot_index];
382                if !within_end(entry.key.as_slice(), &self.end_bound) {
383                    self.finished = true;
384                    return None;
385                }
386                let item = (entry.key.clone(), entry.value.clone());
387                self.slot_index += 1;
388                self.last_emitted_key = Some(item.0.clone());
389                return Some(Ok(item));
390            }
391            // In-leaf slot exhausted; descend from the snapshot
392            // root to find the next leaf whose smallest key is
393            // strictly greater than the last emitted key.
394            match self.advance_to_next_leaf() {
395                Ok(true) => (),
396                Ok(false) => {
397                    self.finished = true;
398                    return None;
399                }
400                Err(e) => {
401                    self.finished = true;
402                    return Some(Err(e));
403                }
404            }
405        }
406    }
407}
408
409impl<F: FileBackend> RangeIter<'_, F> {
410    /// Advance `current_leaf` to the next leaf in the snapshot.
411    /// Returns `Ok(true)` if a new leaf with at least one key
412    /// strictly greater than the last emitted was loaded; `Ok(false)`
413    /// if no more keys remain in the snapshot.
414    fn advance_to_next_leaf(&mut self) -> Result<bool> {
415        let Some(last) = self.last_emitted_key.clone() else {
416            // No keys emitted yet and the initial leaf was empty —
417            // nothing else to scan in this snapshot.
418            return Ok(false);
419        };
420        self.nodes_visited += 1;
421        if self.nodes_visited > MAX_RANGE_NODES {
422            return Err(Error::BTreeScanLimitExceeded {
423                limit: MAX_RANGE_NODES,
424            });
425        }
426        let Some(leaf_id) = descend_to_leaf_after(self.pager, self.root, last.as_slice())? else {
427            return Ok(false);
428        };
429        let leaf = read_leaf(self.pager, leaf_id)?;
430        // The leaf landed-on is guaranteed to have its first key
431        // > `last` by construction. But defensively scan past any
432        // entries <= last (only relevant if a future change to
433        // `descend_to_leaf_after` weakens its post-condition).
434        let slot_index = leaf
435            .leaves
436            .iter()
437            .position(|e| e.key.as_slice() > last.as_slice())
438            .unwrap_or(leaf.leaves.len());
439        if slot_index == leaf.leaves.len() {
440            return Ok(false);
441        }
442        self.current_leaf = Some(leaf);
443        self.slot_index = slot_index;
444        Ok(true)
445    }
446}
447
448/// Descend from `root` to the leaf whose smallest key is strictly
449/// greater than `key`. Returns `Ok(None)` if no such leaf exists.
450///
451/// Algorithm: descend to the leaf that would *contain* `key` while
452/// recording the (internal, `child_index`) frames of the path. If
453/// the landing leaf has any key > `key`, return it. Otherwise walk
454/// the recorded path back up looking for an internal whose next
455/// child (`child_index` + 1) exists; descend the leftmost leaf of
456/// that subtree.
457fn descend_to_leaf_after<F: FileBackend>(
458    pager: &mut Pager<F>,
459    root: PageId,
460    key: &[u8],
461) -> Result<Option<PageId>> {
462    let mut frames: HeaplessVec<DescendFrame, MAX_BTREE_DEPTH> = HeaplessVec::new();
463    let mut current = root;
464    loop {
465        let decoded = {
466            let pr = pager.read_page(current)?;
467            decode_node(pr.as_bytes())?
468        };
469        match decoded.kind {
470            NodeKind::Leaf => {
471                if decoded.leaves.iter().any(|e| e.key.as_slice() > key) {
472                    return Ok(Some(current));
473                }
474                break;
475            }
476            NodeKind::Internal => {
477                let child_index = pivot_index_for_start(&decoded, key);
478                let raw = decoded.children[child_index];
479                frames
480                    .push(DescendFrame {
481                        node: decoded,
482                        child_index,
483                    })
484                    .map_err(|_| Error::BTreeDepthExceeded {
485                        limit: MAX_BTREE_DEPTH,
486                    })?;
487                current = PageId::new(raw).ok_or(Error::BTreeInvariantViolated {
488                    reason: "descend_to_leaf_after: zero child id",
489                })?;
490            }
491        }
492    }
493    // Landing leaf has no key > key. Walk parents looking for one
494    // whose next child exists; descend its leftmost leaf.
495    while let Some(frame) = frames.pop() {
496        let next_child = frame.child_index + 1;
497        if next_child < frame.node.children.len() {
498            let raw = frame.node.children[next_child];
499            let next_root = PageId::new(raw).ok_or(Error::BTreeInvariantViolated {
500                reason: "descend_to_leaf_after: zero next-child id",
501            })?;
502            return descend_leftmost_leaf(pager, next_root).map(Some);
503        }
504    }
505    Ok(None)
506}
507
508struct DescendFrame {
509    node: DecodedNode,
510    child_index: usize,
511}
512
513/// Descend from `root` to the leftmost leaf in its subtree.
514fn descend_leftmost_leaf<F: FileBackend>(pager: &mut Pager<F>, root: PageId) -> Result<PageId> {
515    let mut path: HeaplessVec<PageId, MAX_BTREE_DEPTH> = HeaplessVec::new();
516    let mut current = root;
517    loop {
518        path.push(current).map_err(|_| Error::BTreeDepthExceeded {
519            limit: MAX_BTREE_DEPTH,
520        })?;
521        let decoded = {
522            let pr = pager.read_page(current)?;
523            decode_node(pr.as_bytes())?
524        };
525        match decoded.kind {
526            NodeKind::Leaf => return Ok(current),
527            NodeKind::Internal => {
528                let raw = decoded.children[0];
529                current = PageId::new(raw).ok_or(Error::BTreeInvariantViolated {
530                    reason: "descend_leftmost_leaf: zero child id",
531                })?;
532            }
533        }
534    }
535}
536
537// ----------------------------------------------------------------------
538// Snapshot-pinned forward range scan (M12 #12).
539//
540// Mirrors `RangeIter` exactly, but every page read resolves through
541// `ReaderSnapshot::read_page(pager, id)` (which serves WAL frames at
542// `LSN <= pinned_lsn` at snapshot creation time, never a concurrent
543// writer's post-snapshot commit) rather than the live `Pager::read_page`.
544//
545// `ReaderSnapshot::read_page` takes `&Pager` and returns an owned
546// `Page`, so this iterator borrows the pager immutably for its whole
547// lifetime — distinct from `RangeIter`, which holds `&mut Pager`.
548// ----------------------------------------------------------------------
549
550/// Snapshot-consistent forward iterator over `(key, value)` pairs.
551///
552/// Constructed by [`BTree::range_via_snapshot`]. Yields items in
553/// ascending key order, identical in semantics to [`RangeIter`], but
554/// every traversal read is resolved as-of the pinned snapshot.
555pub struct SnapshotRangeIter<'a, F: FileBackend> {
556    pager: &'a Pager<F>,
557    snapshot: &'a crate::pager::ReaderSnapshot<F>,
558    /// Snapshot root captured at construction; every leaf descent
559    /// restarts here, matching [`RangeIter`].
560    root: PageId,
561    current_leaf: Option<DecodedNode>,
562    slot_index: usize,
563    start_bound: Bound<Vec<u8>>,
564    end_bound: Bound<Vec<u8>>,
565    last_emitted_key: Option<Vec<u8>>,
566    nodes_visited: usize,
567    finished: bool,
568}
569
570impl<F: FileBackend> std::fmt::Debug for SnapshotRangeIter<'_, F> {
571    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
572        f.debug_struct("SnapshotRangeIter")
573            .field("slot_index", &self.slot_index)
574            .field("start_bound", &self.start_bound)
575            .field("end_bound", &self.end_bound)
576            .field("nodes_visited", &self.nodes_visited)
577            .field("finished", &self.finished)
578            .finish()
579    }
580}
581
582impl<F: FileBackend> Iterator for SnapshotRangeIter<'_, F> {
583    type Item = Result<(Vec<u8>, Vec<u8>)>;
584
585    fn next(&mut self) -> Option<Self::Item> {
586        if self.finished {
587            return None;
588        }
589        loop {
590            let leaf = self.current_leaf.as_ref()?;
591            if self.slot_index < leaf.leaves.len() {
592                let entry = &leaf.leaves[self.slot_index];
593                if !within_end(entry.key.as_slice(), &self.end_bound) {
594                    self.finished = true;
595                    return None;
596                }
597                let item = (entry.key.clone(), entry.value.clone());
598                self.slot_index += 1;
599                self.last_emitted_key = Some(item.0.clone());
600                return Some(Ok(item));
601            }
602            match self.advance_to_next_leaf() {
603                Ok(true) => (),
604                Ok(false) => {
605                    self.finished = true;
606                    return None;
607                }
608                Err(e) => {
609                    self.finished = true;
610                    return Some(Err(e));
611                }
612            }
613        }
614    }
615}
616
617impl<F: FileBackend> SnapshotRangeIter<'_, F> {
618    /// Advance `current_leaf` to the next leaf in the snapshot.
619    /// Snapshot mirror of [`RangeIter::advance_to_next_leaf`].
620    fn advance_to_next_leaf(&mut self) -> Result<bool> {
621        let Some(last) = self.last_emitted_key.clone() else {
622            return Ok(false);
623        };
624        self.nodes_visited += 1;
625        if self.nodes_visited > MAX_RANGE_NODES {
626            return Err(Error::BTreeScanLimitExceeded {
627                limit: MAX_RANGE_NODES,
628            });
629        }
630        let Some(leaf_id) =
631            snap_descend_to_leaf_after(self.pager, self.snapshot, self.root, last.as_slice())?
632        else {
633            return Ok(false);
634        };
635        let leaf = snap_read_leaf(self.pager, self.snapshot, leaf_id)?;
636        let slot_index = leaf
637            .leaves
638            .iter()
639            .position(|e| e.key.as_slice() > last.as_slice())
640            .unwrap_or(leaf.leaves.len());
641        if slot_index == leaf.leaves.len() {
642            return Ok(false);
643        }
644        self.current_leaf = Some(leaf);
645        self.slot_index = slot_index;
646        Ok(true)
647    }
648}
649
650/// Snapshot mirror of [`locate_first_in_range_leaf`].
651fn snap_locate_first_in_range_leaf<F: FileBackend>(
652    pager: &Pager<F>,
653    snapshot: &crate::pager::ReaderSnapshot<F>,
654    root: PageId,
655    start_bound: &Bound<Vec<u8>>,
656) -> Result<(DecodedNode, usize, usize)> {
657    let descend_key = match start_bound {
658        Bound::Included(k) | Bound::Excluded(k) => k.as_slice(),
659        Bound::Unbounded => &[][..],
660    };
661    let leaf_id = snap_descend_to_start_leaf(pager, snapshot, root, descend_key)?;
662    let mut leaf = snap_read_leaf(pager, snapshot, leaf_id)?;
663    let mut slot_index = position_in_leaf(&leaf, start_bound);
664    let mut nodes_visited: usize = 1;
665    while slot_index >= leaf.leaves.len() {
666        let Some(last_key) = leaf.leaves.last().map(|e| e.key.clone()) else {
667            break;
668        };
669        nodes_visited += 1;
670        if nodes_visited > MAX_RANGE_NODES {
671            return Err(Error::BTreeScanLimitExceeded {
672                limit: MAX_RANGE_NODES,
673            });
674        }
675        let Some(next_id) = snap_descend_to_leaf_after(pager, snapshot, root, last_key.as_slice())?
676        else {
677            break;
678        };
679        leaf = snap_read_leaf(pager, snapshot, next_id)?;
680        slot_index = position_in_leaf(&leaf, start_bound);
681    }
682    Ok((leaf, slot_index, nodes_visited))
683}
684
685/// Snapshot mirror of [`descend_to_start_leaf`].
686fn snap_descend_to_start_leaf<F: FileBackend>(
687    pager: &Pager<F>,
688    snapshot: &crate::pager::ReaderSnapshot<F>,
689    root: PageId,
690    key: &[u8],
691) -> Result<PageId> {
692    let mut path: HeaplessVec<PageId, MAX_BTREE_DEPTH> = HeaplessVec::new();
693    let mut current = root;
694    loop {
695        path.push(current).map_err(|_| Error::BTreeDepthExceeded {
696            limit: MAX_BTREE_DEPTH,
697        })?;
698        let page = snapshot.read_page(pager, current)?;
699        let decoded = decode_node(page.as_bytes())?;
700        match decoded.kind {
701            NodeKind::Leaf => return Ok(current),
702            NodeKind::Internal => {
703                let idx = pivot_index_for_start(&decoded, key);
704                current =
705                    PageId::new(decoded.children[idx]).ok_or(Error::BTreeInvariantViolated {
706                        reason: "range descent: zero child page-id",
707                    })?;
708            }
709        }
710    }
711}
712
713/// Snapshot mirror of [`read_leaf`].
714fn snap_read_leaf<F: FileBackend>(
715    pager: &Pager<F>,
716    snapshot: &crate::pager::ReaderSnapshot<F>,
717    id: PageId,
718) -> Result<DecodedNode> {
719    let page = snapshot.read_page(pager, id)?;
720    let decoded = decode_node(page.as_bytes())?;
721    if !matches!(decoded.kind, NodeKind::Leaf) {
722        return Err(Error::BTreeInvariantViolated {
723            reason: "range: expected leaf",
724        });
725    }
726    Ok(decoded)
727}
728
729/// Snapshot mirror of [`descend_to_leaf_after`].
730fn snap_descend_to_leaf_after<F: FileBackend>(
731    pager: &Pager<F>,
732    snapshot: &crate::pager::ReaderSnapshot<F>,
733    root: PageId,
734    key: &[u8],
735) -> Result<Option<PageId>> {
736    let mut frames: HeaplessVec<DescendFrame, MAX_BTREE_DEPTH> = HeaplessVec::new();
737    let mut current = root;
738    loop {
739        let page = snapshot.read_page(pager, current)?;
740        let decoded = decode_node(page.as_bytes())?;
741        match decoded.kind {
742            NodeKind::Leaf => {
743                if decoded.leaves.iter().any(|e| e.key.as_slice() > key) {
744                    return Ok(Some(current));
745                }
746                break;
747            }
748            NodeKind::Internal => {
749                let child_index = pivot_index_for_start(&decoded, key);
750                let raw = decoded.children[child_index];
751                frames
752                    .push(DescendFrame {
753                        node: decoded,
754                        child_index,
755                    })
756                    .map_err(|_| Error::BTreeDepthExceeded {
757                        limit: MAX_BTREE_DEPTH,
758                    })?;
759                current = PageId::new(raw).ok_or(Error::BTreeInvariantViolated {
760                    reason: "descend_to_leaf_after: zero child id",
761                })?;
762            }
763        }
764    }
765    while let Some(frame) = frames.pop() {
766        let next_child = frame.child_index + 1;
767        if next_child < frame.node.children.len() {
768            let raw = frame.node.children[next_child];
769            let next_root = PageId::new(raw).ok_or(Error::BTreeInvariantViolated {
770                reason: "descend_to_leaf_after: zero next-child id",
771            })?;
772            return snap_descend_leftmost_leaf(pager, snapshot, next_root).map(Some);
773        }
774    }
775    Ok(None)
776}
777
778/// Snapshot mirror of [`descend_leftmost_leaf`].
779fn snap_descend_leftmost_leaf<F: FileBackend>(
780    pager: &Pager<F>,
781    snapshot: &crate::pager::ReaderSnapshot<F>,
782    root: PageId,
783) -> Result<PageId> {
784    let mut path: HeaplessVec<PageId, MAX_BTREE_DEPTH> = HeaplessVec::new();
785    let mut current = root;
786    loop {
787        path.push(current).map_err(|_| Error::BTreeDepthExceeded {
788            limit: MAX_BTREE_DEPTH,
789        })?;
790        let page = snapshot.read_page(pager, current)?;
791        let decoded = decode_node(page.as_bytes())?;
792        match decoded.kind {
793            NodeKind::Leaf => return Ok(current),
794            NodeKind::Internal => {
795                let raw = decoded.children[0];
796                current = PageId::new(raw).ok_or(Error::BTreeInvariantViolated {
797                    reason: "descend_leftmost_leaf: zero child id",
798                })?;
799            }
800        }
801    }
802}
803
804#[cfg(test)]
805mod tests {
806    use super::*;
807    use crate::pager::{Config, Pager};
808    use crate::platform::FileHandle;
809
810    use rand::Rng;
811    use rand::SeedableRng;
812    use rand_chacha::ChaCha8Rng;
813    use std::collections::BTreeMap;
814    use std::ops::Bound;
815
816    fn config() -> Config {
817        Config::default()
818    }
819
820    fn collect_all(
821        pager: &mut Pager<FileHandle>,
822        tree: &BTree<FileHandle>,
823    ) -> Vec<(Vec<u8>, Vec<u8>)> {
824        let iter = tree.iter(pager).expect("iter");
825        iter.map(|r| r.expect("step")).collect()
826    }
827
828    #[test]
829    fn iter_empty_tree() {
830        let mut pager = Pager::<FileHandle>::memory(config()).expect("pager");
831        let tree = BTree::<FileHandle>::empty(&mut pager).expect("empty");
832        let v = collect_all(&mut pager, &tree);
833        assert!(v.is_empty());
834    }
835
836    #[test]
837    fn iter_single_leaf() {
838        let mut pager = Pager::<FileHandle>::memory(config()).expect("pager");
839        let mut tree = BTree::<FileHandle>::empty(&mut pager).expect("empty");
840        for (k, v) in [("alpha", "A"), ("bravo", "B"), ("charlie", "C")] {
841            tree.insert(&mut pager, k.as_bytes(), v.as_bytes())
842                .expect("ins");
843        }
844        let got = collect_all(&mut pager, &tree);
845        let want: Vec<(Vec<u8>, Vec<u8>)> = [("alpha", "A"), ("bravo", "B"), ("charlie", "C")]
846            .iter()
847            .map(|(k, v)| (k.as_bytes().to_vec(), v.as_bytes().to_vec()))
848            .collect();
849        assert_eq!(got, want);
850    }
851
852    #[test]
853    fn range_across_leaf_split_boundary() {
854        // Force at least one split by inserting many 256-byte values.
855        let mut pager = Pager::<FileHandle>::memory(config()).expect("pager");
856        let mut tree = BTree::<FileHandle>::empty(&mut pager).expect("empty");
857        let value = vec![0xAAu8; 256];
858        for i in 0..50u32 {
859            let key = format!("key-{i:08}");
860            tree.insert(&mut pager, key.as_bytes(), &value)
861                .expect("ins");
862        }
863        // Confirm the tree has split (root is internal).
864        let root = tree.root();
865        let root_decoded = {
866            let pr = pager.read_page(root).expect("read");
867            decode_node(pr.as_bytes()).expect("dec")
868        };
869        assert!(root_decoded.level >= 1, "expected root split");
870        // Walk every key — must be sorted, no gaps.
871        let iter = tree.iter(&mut pager).expect("iter");
872        let mut prev: Option<Vec<u8>> = None;
873        let mut count = 0;
874        for item in iter {
875            let (k, _v) = item.expect("step");
876            if let Some(p) = &prev {
877                assert!(p.as_slice() < k.as_slice(), "non-monotonic at {k:?}");
878            }
879            prev = Some(k);
880            count += 1;
881        }
882        assert_eq!(count, 50);
883    }
884
885    #[test]
886    fn range_bounded_subset() {
887        let mut pager = Pager::<FileHandle>::memory(config()).expect("pager");
888        let mut tree = BTree::<FileHandle>::empty(&mut pager).expect("empty");
889        for i in 0..20u32 {
890            let key = format!("k-{i:03}");
891            tree.insert(&mut pager, key.as_bytes(), b"v").expect("ins");
892        }
893        // Inclusive-start, exclusive-end. Bound is k-005..k-010.
894        let iter = tree
895            .range(
896                &mut pager,
897                (
898                    Bound::Included(b"k-005".to_vec()),
899                    Bound::Excluded(b"k-010".to_vec()),
900                ),
901            )
902            .expect("range");
903        let got: Vec<String> = iter
904            .map(|r| String::from_utf8(r.expect("step").0).expect("utf8"))
905            .collect();
906        assert_eq!(got, vec!["k-005", "k-006", "k-007", "k-008", "k-009"]);
907    }
908
909    #[test]
910    fn range_oracle_1000_ops() {
911        for seed in 0..3u64 {
912            run_range_oracle(seed, 1_000);
913        }
914    }
915
916    /// Regression for the bug surfaced by issue #29's 1M-op oracle:
917    /// when `RangeIter::build_range_iter` descends to the leaf that
918    /// should hold the start key but every entry in that leaf sorts
919    /// strictly before the start bound, the constructor's
920    /// `position_in_leaf` returns past-end. The original
921    /// implementation then short-circuited the first `next()` to
922    /// `None` via `last_emitted_key.is_none()`, wrongly reporting an
923    /// empty range even though right-sibling leaves contained
924    /// matching keys.
925    ///
926    /// This test forces a tree split, then queries a range whose
927    /// start key sorts AFTER every key in the leftmost leaf but
928    /// BEFORE keys in the next leaf. The fixed iterator must walk
929    /// to the right sibling and return those keys.
930    #[test]
931    fn range_start_bound_past_landing_leaf_end() {
932        let (mut pager, tree, value) = build_split_fixture();
933        let (start_key, want) = construct_past_landing_leaf_query(&mut pager, &tree, &value);
934        let iter = tree
935            .range(
936                &mut pager,
937                (Bound::Included(start_key), Bound::<Vec<u8>>::Unbounded),
938            )
939            .expect("range");
940        let got: Vec<(Vec<u8>, Vec<u8>)> = iter.map(|r| r.expect("step")).collect();
941        assert_eq!(got, want, "iterator must advance past empty landing leaf");
942    }
943
944    /// Builds a B+tree with enough 256-byte-value entries to force
945    /// at least one leaf split (root becomes internal).
946    fn build_split_fixture() -> (Pager<FileHandle>, BTree<FileHandle>, Vec<u8>) {
947        let mut pager = Pager::<FileHandle>::memory(config()).expect("pager");
948        let mut tree = BTree::<FileHandle>::empty(&mut pager).expect("empty");
949        let value = vec![0xCDu8; 256];
950        for i in 0..50u32 {
951            let key = format!("key-{i:08}");
952            tree.insert(&mut pager, key.as_bytes(), &value)
953                .expect("ins");
954        }
955        (pager, tree, value)
956    }
957
958    type ExpectedRangeResult = Vec<(Vec<u8>, Vec<u8>)>;
959
960    /// Constructs a start key strictly greater than every key in the
961    /// leftmost leaf but strictly less than the pivot above it, and
962    /// returns the oracle-computed expected results for the
963    /// `Included(start_key)..` range.
964    fn construct_past_landing_leaf_query(
965        pager: &mut Pager<FileHandle>,
966        tree: &BTree<FileHandle>,
967        value: &[u8],
968    ) -> (Vec<u8>, ExpectedRangeResult) {
969        let root_decoded = {
970            let pr = pager.read_page(tree.root()).expect("read");
971            decode_node(pr.as_bytes()).expect("dec")
972        };
973        assert!(root_decoded.level >= 1, "fixture must split the tree");
974        let leftmost_id = PageId::new(root_decoded.children[0]).expect("nz");
975        let leftmost = read_leaf(pager, leftmost_id).expect("leaf");
976        let last_in_leftmost = leftmost.leaves.last().expect("non-empty").key.clone();
977        let pivot = root_decoded.internals[0].key.clone();
978        // Appending a NUL byte produces a key lexicographically just
979        // past the leaf's last entry and (by key structure) below
980        // the pivot, forcing the descent to land in leaf 0.
981        let mut start_key = last_in_leftmost.clone();
982        start_key.push(0);
983        assert!(start_key.as_slice() > last_in_leftmost.as_slice());
984        assert!(start_key.as_slice() < pivot.as_slice());
985        let oracle: BTreeMap<Vec<u8>, Vec<u8>> = (0..50u32)
986            .map(|i| (format!("key-{i:08}").into_bytes(), value.to_vec()))
987            .collect();
988        let want: Vec<(Vec<u8>, Vec<u8>)> = oracle
989            .range::<Vec<u8>, _>((Bound::Included(&start_key), Bound::<&Vec<u8>>::Unbounded))
990            .map(|(k, v)| (k.clone(), v.clone()))
991            .collect();
992        assert!(!want.is_empty(), "oracle must return sibling-leaf entries");
993        (start_key, want)
994    }
995
996    fn run_range_oracle(seed: u64, ops: usize) {
997        let mut rng = ChaCha8Rng::seed_from_u64(seed);
998        let mut pager = Pager::<FileHandle>::memory(config()).expect("pager");
999        let mut tree = BTree::<FileHandle>::empty(&mut pager).expect("empty");
1000        let mut oracle: BTreeMap<Vec<u8>, Vec<u8>> = BTreeMap::new();
1001        for op in 0..ops {
1002            match rng.random_range(0u32..6) {
1003                0..=2 => insert_step(&mut pager, &mut tree, &mut oracle, &mut rng, seed, op),
1004                3 => delete_step(&mut pager, &mut tree, &mut oracle, &mut rng, seed, op),
1005                _ => range_check(&mut pager, &tree, &oracle, &mut rng, seed, op),
1006            }
1007        }
1008        // Final pass: full iter vs full BTreeMap range.
1009        let got = collect_all(&mut pager, &tree);
1010        let want: Vec<(Vec<u8>, Vec<u8>)> =
1011            oracle.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
1012        assert_eq!(got, want, "seed {seed}: final iter mismatch");
1013    }
1014
1015    fn insert_step(
1016        pager: &mut Pager<FileHandle>,
1017        tree: &mut BTree<FileHandle>,
1018        oracle: &mut BTreeMap<Vec<u8>, Vec<u8>>,
1019        rng: &mut ChaCha8Rng,
1020        seed: u64,
1021        op: usize,
1022    ) {
1023        let key = random_key(rng);
1024        let value = random_value(rng);
1025        if let std::collections::btree_map::Entry::Vacant(slot) = oracle.entry(key.clone()) {
1026            tree.insert(pager, &key, &value)
1027                .unwrap_or_else(|e| panic!("seed {seed} op {op} ins: {e:?}"));
1028            slot.insert(value);
1029        }
1030    }
1031
1032    fn delete_step(
1033        pager: &mut Pager<FileHandle>,
1034        tree: &mut BTree<FileHandle>,
1035        oracle: &mut BTreeMap<Vec<u8>, Vec<u8>>,
1036        rng: &mut ChaCha8Rng,
1037        seed: u64,
1038        op: usize,
1039    ) {
1040        if oracle.is_empty() {
1041            return;
1042        }
1043        let pick = rng.random_range(0..oracle.len());
1044        let key = oracle.keys().nth(pick).cloned().unwrap_or_default();
1045        oracle.remove(&key);
1046        tree.delete(pager, &key)
1047            .unwrap_or_else(|e| panic!("seed {seed} op {op} del: {e:?}"));
1048    }
1049
1050    fn range_check(
1051        pager: &mut Pager<FileHandle>,
1052        tree: &BTree<FileHandle>,
1053        oracle: &BTreeMap<Vec<u8>, Vec<u8>>,
1054        rng: &mut ChaCha8Rng,
1055        seed: u64,
1056        op: usize,
1057    ) {
1058        let (start, end) = random_bounds(rng);
1059        // BTreeMap::range panics if start > end. Skip those random
1060        // draws — they're not interesting cases for the iterator
1061        // (semantically empty), and `RangeBounds` itself doesn't
1062        // require well-orderedness.
1063        if !bounds_well_ordered(&start, &end) {
1064            return;
1065        }
1066        let iter = tree
1067            .range(pager, (start.clone(), end.clone()))
1068            .expect("range");
1069        let got: Vec<(Vec<u8>, Vec<u8>)> = iter.map(|r| r.expect("step")).collect();
1070        let want: Vec<(Vec<u8>, Vec<u8>)> = oracle
1071            .range::<Vec<u8>, _>((start.as_ref(), end.as_ref()))
1072            .map(|(k, v)| (k.clone(), v.clone()))
1073            .collect();
1074        assert_eq!(got, want, "seed {seed} op {op}: range mismatch");
1075    }
1076
1077    fn bounds_well_ordered(start: &Bound<Vec<u8>>, end: &Bound<Vec<u8>>) -> bool {
1078        match (start, end) {
1079            (Bound::Unbounded, _) | (_, Bound::Unbounded) => true,
1080            (Bound::Excluded(s), Bound::Excluded(e)) => s.as_slice() < e.as_slice(),
1081            (Bound::Included(s) | Bound::Excluded(s), Bound::Included(e) | Bound::Excluded(e)) => {
1082                s.as_slice() <= e.as_slice()
1083            }
1084        }
1085    }
1086
1087    fn random_bounds(rng: &mut ChaCha8Rng) -> (Bound<Vec<u8>>, Bound<Vec<u8>>) {
1088        let s = match rng.random_range(0u32..3) {
1089            0 => Bound::Unbounded,
1090            1 => Bound::Included(random_key(rng)),
1091            _ => Bound::Excluded(random_key(rng)),
1092        };
1093        let e = match rng.random_range(0u32..3) {
1094            0 => Bound::Unbounded,
1095            1 => Bound::Included(random_key(rng)),
1096            _ => Bound::Excluded(random_key(rng)),
1097        };
1098        (s, e)
1099    }
1100
1101    fn random_key(rng: &mut ChaCha8Rng) -> Vec<u8> {
1102        let len = rng.random_range(1..6);
1103        (0..len).map(|_| rng.random_range(b'a'..=b'c')).collect()
1104    }
1105
1106    fn random_value(rng: &mut ChaCha8Rng) -> Vec<u8> {
1107        let len = rng.random_range(0..16);
1108        (0..len).map(|_| rng.random()).collect()
1109    }
1110}