Skip to main content

reddb_server/storage/engine/
btree.rs

1//! B+ Tree Implementation for Page-Based Storage
2//!
3//! A B+ tree optimized for disk I/O with the following properties:
4//! - All values stored in leaf nodes
5//! - Interior nodes only contain keys and child pointers
6//! - Leaf nodes form a doubly-linked list for range scans
//! - Each node fits in a single page of `PAGE_SIZE` bytes (see `super::page`)
8//!
9//! # Page Layout
10//!
//! Interior Node (slotted):
//! ```text
//! [PageHeader (32 bytes), holds right_child: u32 - rightmost child pointer]
//! [slot 0: u16][slot 1: u16]...[slot N-1: u16]   <- grows forward
//! ... free space ...
//! [cell N-1][cell N-2]...[cell 0]                <- grows backward from free_end
//! ```
//! Each interior cell is `[key_len: u16][key: [u8]][child: u32]`.
//!
//! Leaf Node (slotted):
//! ```text
//! [PageHeader (32 bytes)]
//! [prev_leaf: u32 - previous leaf (0 = none)][next_leaf: u32 - next leaf (0 = none)]
//! [slot 0: u16][slot 1: u16]...[slot N-1: u16]   <- grows forward
//! ... free space ...
//! [cell N-1][cell N-2]...[cell 0]                <- grows backward from free_end
//! ```
//! Each leaf cell is `[key_len: u16][val_len: u16][key: [u8]][val: [u8]]`;
//! each `u16` slot holds the absolute page offset of its cell.
31
32use std::cmp::Ordering;
33use std::sync::{Arc, RwLock};
34
35use super::page::{Page, PageType, HEADER_SIZE, PAGE_SIZE};
36use super::pager::{Pager, PagerError};
37
38/// Maximum key size (to ensure at least 2 keys per node)
39pub const MAX_KEY_SIZE: usize = 1024;
40
41/// Maximum value size for inline storage.
42///
43/// Derived as `PAGE_SIZE / 4` so it tracks the page size automatically and
44/// preserves the design invariant of at least two cells per leaf. At the
45/// 16KB page this is 4096 bytes (4x the legacy 4KB-page limit of 1024).
46pub const MAX_VALUE_SIZE: usize = PAGE_SIZE / 4;
47
48/// Minimum fill factor before merge (as percentage)
49pub const MIN_FILL_FACTOR: usize = 25;
50
51/// Offset of prev_leaf in leaf page
52const LEAF_PREV_OFFSET: usize = HEADER_SIZE;
53
54/// Offset of next_leaf in leaf page
55const LEAF_NEXT_OFFSET: usize = HEADER_SIZE + 4;
56
57/// Start of the slot array in leaf pages.
58///
59/// Slotted-page layout:
60/// ```text
61/// [PageHeader 32B][prev u32 | next u32 | 8B]
62/// [slot 0: u16][slot 1: u16]...[slot N-1: u16]  ← grows forward
63/// ... free space ...
64/// [cell N-1][cell N-2]...[cell 0]               ← grows backward from free_end
65/// ```
66/// Each cell is laid out as `[key_len:u16][val_len:u16][key][val]`.
67/// `page.cell_count()` is N; `page.free_end()` is the offset of the
68/// lowest (most recently written) cell. The slot array lives right
69/// after the leaf-chain links and each u16 slot is the absolute page
70/// offset of its cell.
71const LEAF_SLOT_ARRAY_OFFSET: usize = HEADER_SIZE + 8;
72
73/// Kept for source-compat with older code paths (e.g. interior-node
74/// helpers); equivalent to `LEAF_SLOT_ARRAY_OFFSET` for leaves.
75const LEAF_DATA_OFFSET: usize = LEAF_SLOT_ARRAY_OFFSET;
76
77/// Start of cell data in interior pages (right_child is in header)
78const INTERIOR_DATA_OFFSET: usize = HEADER_SIZE;
79
/// Errors produced by B+ tree operations.
///
/// `PartialEq`/`Eq` are derived so callers and tests can compare errors
/// directly (e.g. `assert_eq!(err, BTreeError::NotFound)`) instead of
/// pattern-matching every time.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BTreeError {
    /// The requested key is not present.
    NotFound,
    /// The key is already present.
    DuplicateKey,
    /// The key exceeds the size limit; carries the offending length in bytes.
    KeyTooLarge(usize),
    /// The value exceeds the size limit; carries the offending length in bytes.
    ValueTooLarge(usize),
    /// An on-page structure failed validation; carries a description.
    Corrupted(String),
    /// The pager reported a failure; carries its rendered message.
    Pager(String),
    /// An internal lock was poisoned by a panicked thread.
    LockPoisoned(String),
}
98
99impl std::fmt::Display for BTreeError {
100    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
101        match self {
102            Self::NotFound => write!(f, "Key not found"),
103            Self::DuplicateKey => write!(f, "Key already exists"),
104            Self::KeyTooLarge(size) => {
105                write!(f, "Key too large: {} bytes (max {})", size, MAX_KEY_SIZE)
106            }
107            Self::ValueTooLarge(size) => write!(
108                f,
109                "Value too large: {} bytes (max {})",
110                size, MAX_VALUE_SIZE
111            ),
112            Self::Corrupted(msg) => write!(f, "B-tree corrupted: {}", msg),
113            Self::Pager(msg) => write!(f, "Pager error: {}", msg),
114            Self::LockPoisoned(msg) => write!(f, "Lock poisoned: {}", msg),
115        }
116    }
117}
118
// Marker impl: the `Debug` and `Display` impls satisfy `Error`'s requirements.
// No `source()` is provided because wrapped errors are stored as strings.
impl std::error::Error for BTreeError {}
120
121impl From<PagerError> for BTreeError {
122    fn from(e: PagerError) -> Self {
123        Self::Pager(e.to_string())
124    }
125}
126
127impl From<super::page::PageError> for BTreeError {
128    fn from(e: super::page::PageError) -> Self {
129        Self::Corrupted(e.to_string())
130    }
131}
132
/// Shorthand for `Result<T, BTreeError>`, returned by B+ tree operations
/// and the page helpers in this module.
pub type BTreeResult<T> = Result<T, BTreeError>;
135
/// Forward cursor over the leaf chain of a B+ tree.
///
/// The cursor remembers a leaf page and a cell index inside it. A
/// `leaf_page_id` of 0 means the cursor is exhausted.
pub struct BTreeCursor {
    /// Page ID of the leaf currently being read (0 = exhausted)
    leaf_page_id: u32,
    /// Index of the next cell to return from the current leaf
    position: usize,
    /// Pager used to read leaf pages and to issue prefetch hints
    pager: Arc<Pager>,
    /// A6 — true once we've issued a prefetch hint for the next leaf on
    /// this leaf. Reset to false every time we move to a new leaf so the
    /// hint fires once per leaf, at the halfway point.
    prefetched_next: bool,
}
149
150impl BTreeCursor {
151    /// Move to next entry
152    pub fn next(&mut self) -> BTreeResult<Option<(Vec<u8>, Vec<u8>)>> {
153        if self.leaf_page_id == 0 {
154            return Ok(None);
155        }
156
157        let page = self.pager.read_page(self.leaf_page_id)?;
158        let cell_count = page.cell_count() as usize;
159
160        // A6 — prefetch the next leaf when we cross the halfway point of
161        // the current one. The hint fires at most once per leaf (guarded
162        // by `prefetched_next`) so the syscall overhead is amortized over
163        // ~half a leaf's worth of entries. The kernel uses the hint to
164        // start DMA-ing the next page while we finish the current one.
165        if !self.prefetched_next && self.position >= cell_count / 2 && cell_count > 0 {
166            let next_leaf = read_next_leaf(&page);
167            if next_leaf != 0 {
168                self.pager.prefetch_hint(next_leaf);
169            }
170            self.prefetched_next = true;
171        }
172
173        // Check if we have more cells in current page
174        if self.position < cell_count {
175            let (key, value) = read_leaf_cell(&page, self.position)?;
176            self.position += 1;
177            return Ok(Some((key, value)));
178        }
179
180        // Move to next leaf
181        let next_leaf = read_next_leaf(&page);
182        if next_leaf == 0 {
183            self.leaf_page_id = 0;
184            return Ok(None);
185        }
186
187        self.leaf_page_id = next_leaf;
188        self.position = 0;
189        self.prefetched_next = false; // reset for the new leaf
190
191        // Read from new leaf
192        let page = self.pager.read_page(self.leaf_page_id)?;
193        if page.cell_count() == 0 {
194            return Ok(None);
195        }
196
197        let (key, value) = read_leaf_cell(&page, 0)?;
198        self.position = 1;
199        Ok(Some((key, value)))
200    }
201
202    /// Peek at current entry without advancing
203    pub fn peek(&self) -> BTreeResult<Option<(Vec<u8>, Vec<u8>)>> {
204        if self.leaf_page_id == 0 {
205            return Ok(None);
206        }
207
208        let page = self.pager.read_page(self.leaf_page_id)?;
209        let cell_count = page.cell_count() as usize;
210
211        if self.position >= cell_count {
212            return Ok(None);
213        }
214
215        let (key, value) = read_leaf_cell(&page, self.position)?;
216        Ok(Some((key, value)))
217    }
218}
219
/// B+ tree handle: a pager plus the mutable root pointer and an insert
/// fastpath cache. The operations themselves live in the `btree_impl`
/// submodule declared below.
pub struct BTree {
    /// Pager for page I/O
    pager: Arc<Pager>,
    /// Root page ID (0 = empty tree), guarded by a lock
    root_page_id: RwLock<u32>,
    /// D2 fastpath: cached rightmost leaf (page_id, high_key).
    ///
    /// When the next key to insert is strictly greater than `high_key`,
    /// skip `find_leaf` (tree descent from root) and go directly to the
    /// cached page. Invalidated on splits and deletes.
    ///
    /// Mirrors `BTPageCacheIsValid` / `BTREE_FASTPATH_MIN_LEVEL` in
    /// `nbtinsert.c:31`.
    rightmost_leaf: RwLock<Option<(u32, Vec<u8>)>>,
}
236
237#[path = "btree/impl.rs"]
238mod btree_impl;
239
240#[path = "btree/lehman_yao.rs"]
241pub mod lehman_yao;
242// ==================== Search Helpers ====================
243
/// Outcome of a binary search over a leaf's slot array (see `search_leaf`).
enum SearchResult {
    /// The key is stored at this slot index.
    Found(usize),
    /// The key is absent; this is the slot index where it would be inserted
    /// to keep the leaf sorted.
    NotFound(usize),
}
248
249fn search_leaf(page: &Page, key: &[u8]) -> BTreeResult<SearchResult> {
250    let cell_count = page.cell_count() as usize;
251
252    // Binary search
253    let mut low = 0;
254    let mut high = cell_count;
255
256    while low < high {
257        let mid = (low + high) / 2;
258        let (cell_key, _) = read_leaf_cell(page, mid)?;
259
260        match cell_key.as_slice().cmp(key) {
261            Ordering::Less => low = mid + 1,
262            Ordering::Greater => high = mid,
263            Ordering::Equal => return Ok(SearchResult::Found(mid)),
264        }
265    }
266
267    Ok(SearchResult::NotFound(low))
268}
269
270fn find_child(page: &Page, key: &[u8]) -> BTreeResult<u32> {
271    let cell_count = page.cell_count() as usize;
272
273    // Binary search for the correct child
274    for i in 0..cell_count {
275        let (cell_key, child) = read_interior_cell(page, i)?;
276        if key < cell_key.as_slice() {
277            return Ok(child);
278        }
279    }
280
281    // Key is >= all keys, use right child
282    Ok(page.right_child())
283}
284
285fn find_first_child(page: &Page) -> BTreeResult<u32> {
286    if page.cell_count() == 0 {
287        return Ok(page.right_child());
288    }
289    let (_, child) = read_interior_cell(page, 0)?;
290    Ok(child)
291}
292
293fn leaf_min_bytes() -> usize {
294    (PAGE_SIZE - LEAF_DATA_OFFSET) * MIN_FILL_FACTOR / 100
295}
296
297fn interior_min_bytes() -> usize {
298    (PAGE_SIZE - INTERIOR_DATA_OFFSET) * MIN_FILL_FACTOR / 100
299}
300
/// Page bytes one leaf entry occupies in the slotted layout: a `u16` slot
/// in the pointer array, the 4-byte cell header
/// (`[key_len:u16][val_len:u16]`), and the raw key and value bytes.
fn leaf_entry_size(entry: &(Vec<u8>, Vec<u8>)) -> usize {
    const SLOT_BYTES: usize = 2;
    const CELL_HEADER_BYTES: usize = 4;

    let (key, value) = entry;
    SLOT_BYTES + CELL_HEADER_BYTES + key.len() + value.len()
}
307
308fn leaf_entries_size(entries: &[(Vec<u8>, Vec<u8>)]) -> usize {
309    entries.iter().map(leaf_entry_size).sum()
310}
311
312#[inline]
313fn leaf_slot_offset_for(index: usize) -> usize {
314    LEAF_SLOT_ARRAY_OFFSET + index * 2
315}
316
317#[inline]
318fn leaf_read_slot(page: &Page, index: usize) -> BTreeResult<usize> {
319    let data = page.as_bytes();
320    let slot_pos = leaf_slot_offset_for(index);
321    if slot_pos + 2 > PAGE_SIZE {
322        return Err(BTreeError::Corrupted("slot array overflows page".into()));
323    }
324    Ok(u16::from_le_bytes([data[slot_pos], data[slot_pos + 1]]) as usize)
325}
326
327#[inline]
328fn leaf_write_slot(page: &mut Page, index: usize, cell_offset: u16) -> BTreeResult<()> {
329    let data = page.as_bytes_mut();
330    let slot_pos = leaf_slot_offset_for(index);
331    if slot_pos + 2 > PAGE_SIZE {
332        return Err(BTreeError::Corrupted("slot array overflows page".into()));
333    }
334    data[slot_pos..slot_pos + 2].copy_from_slice(&cell_offset.to_le_bytes());
335    Ok(())
336}
337
338#[inline]
339fn leaf_slots_end(page: &Page) -> usize {
340    LEAF_SLOT_ARRAY_OFFSET + (page.cell_count() as usize) * 2
341}
342
343#[inline]
344fn leaf_cells_start(page: &Page) -> usize {
345    let end = page.free_end() as usize;
346    if end == 0 {
347        PAGE_SIZE
348    } else {
349        end
350    }
351}
352
353#[inline]
354fn leaf_free_bytes(page: &Page) -> usize {
355    let slot_end = leaf_slots_end(page);
356    let cells = leaf_cells_start(page);
357    cells.saturating_sub(slot_end)
358}
359
/// Bytes accounted for one interior key: 2 + key bytes + 4, matching an
/// interior cell's `[key_len:u16][key][child:u32]` layout.
fn interior_key_size(key: &[u8]) -> usize {
    const KEY_LEN_BYTES: usize = 2;
    const CHILD_PTR_BYTES: usize = 4;
    KEY_LEN_BYTES + key.len() + CHILD_PTR_BYTES
}
363
364fn interior_entries_size(keys: &[Vec<u8>]) -> usize {
365    keys.iter().map(|k| interior_key_size(k)).sum()
366}
367
368// ==================== Leaf Page Helpers ====================
369
370/// Read the cell at slot `index` in O(1). Follows the u16 slot pointer
371/// into the cell data area; the cell header is `[key_len:u16][val_len:u16]`
372/// followed by the raw key and value bytes.
373fn read_leaf_cell(page: &Page, index: usize) -> BTreeResult<(Vec<u8>, Vec<u8>)> {
374    let cell_count = page.cell_count() as usize;
375    if index >= cell_count {
376        return Err(BTreeError::Corrupted("Cell index out of range".into()));
377    }
378    let offset = leaf_read_slot(page, index)?;
379    let data = page.as_bytes();
380    if offset + 4 > PAGE_SIZE {
381        return Err(BTreeError::Corrupted("cell header out of range".into()));
382    }
383    let key_len = u16::from_le_bytes([data[offset], data[offset + 1]]) as usize;
384    let val_len = u16::from_le_bytes([data[offset + 2], data[offset + 3]]) as usize;
385    let end = offset + 4 + key_len + val_len;
386    if end > PAGE_SIZE {
387        return Err(BTreeError::Corrupted("cell body out of range".into()));
388    }
389    let key = data[offset + 4..offset + 4 + key_len].to_vec();
390    let value = data[offset + 4 + key_len..end].to_vec();
391    Ok((key, value))
392}
393
394fn read_leaf_entries(page: &Page) -> BTreeResult<Vec<(Vec<u8>, Vec<u8>)>> {
395    let cell_count = page.cell_count() as usize;
396    let mut entries = Vec::with_capacity(cell_count);
397    for i in 0..cell_count {
398        entries.push(read_leaf_cell(page, i)?);
399    }
400    Ok(entries)
401}
402
403/// Wipe and re-lay an entire leaf in slotted form. Used by the split
404/// path (which hands us a pre-sorted `entries` vector) and by
405/// `clear_leaf_cells` under the hood. Entries are appended at the end
406/// of the free area so the highest-index slot ends up at the lowest
407/// offset — same shape the single-key insert path produces.
408fn write_leaf_entries(page: &mut Page, entries: &[(Vec<u8>, Vec<u8>)]) -> BTreeResult<()> {
409    clear_leaf_cells(page);
410    for (i, (k, v)) in entries.iter().enumerate() {
411        let cell_size = 4 + k.len() + v.len();
412        let cells_start = leaf_cells_start(page);
413        let slot_end = LEAF_SLOT_ARRAY_OFFSET + (i + 1) * 2;
414        if slot_end + cell_size > cells_start {
415            return Err(BTreeError::Corrupted("leaf rebuild overflowed page".into()));
416        }
417        let cell_offset = cells_start - cell_size;
418        {
419            let data = page.as_bytes_mut();
420            data[cell_offset..cell_offset + 2].copy_from_slice(&(k.len() as u16).to_le_bytes());
421            data[cell_offset + 2..cell_offset + 4].copy_from_slice(&(v.len() as u16).to_le_bytes());
422            data[cell_offset + 4..cell_offset + 4 + k.len()].copy_from_slice(k);
423            data[cell_offset + 4 + k.len()..cell_offset + 4 + k.len() + v.len()].copy_from_slice(v);
424        }
425        page.set_free_end(cell_offset as u16);
426        leaf_write_slot(page, i, cell_offset as u16)?;
427    }
428    page.set_cell_count(entries.len() as u16);
429    page.set_free_start((LEAF_SLOT_ARRAY_OFFSET + entries.len() * 2) as u16);
430    Ok(())
431}
432
433fn can_insert_leaf(page: &Page, key: &[u8], value: &[u8]) -> bool {
434    let needed = 2 + 4 + key.len() + value.len();
435    leaf_free_bytes(page) >= needed
436}
437
438/// Insert `(key, value)` into the slotted leaf in O(log M) search +
439/// O(M) slot-array memmove. Cell data is appended at the tail of the
440/// free area (backward from `free_end`); the slot pointer is inserted
441/// Overwrite the value portion of cell at `index` in-place. Valid when
442/// `new_value.len() <= existing_value.len()` — the caller
443/// (typically `BTree::upsert`) checks this before calling.
444///
445/// Same-length writes are trivial (byte copy). Shrinks update the cell's
446/// `v_len` header and leave the trailing bytes as dead space until the
447/// page is naturally rewritten; the slotted layout keeps cells
448/// addressable by their own offset+len so the gap is harmless.
449///
450/// Fixed-size column updates (SET score = X on a typed schema) hit this
451/// path, avoiding the delete + re-insert + rebalance cycle that
452/// dominates bulk_update.
453pub(crate) fn overwrite_leaf_value(
454    page: &mut Page,
455    index: usize,
456    new_value: &[u8],
457) -> BTreeResult<()> {
458    let cell_offset = leaf_read_slot(page, index)?;
459    let data = page.as_bytes();
460    if cell_offset + 4 > data.len() {
461        return Err(BTreeError::Corrupted(
462            "leaf cell header out of bounds".into(),
463        ));
464    }
465    let k_len = u16::from_le_bytes([data[cell_offset], data[cell_offset + 1]]) as usize;
466    let old_v_len = u16::from_le_bytes([data[cell_offset + 2], data[cell_offset + 3]]) as usize;
467    if new_value.len() > old_v_len {
468        return Err(BTreeError::Corrupted(
469            "overwrite_leaf_value: new value larger than slot".into(),
470        ));
471    }
472    let value_start = cell_offset + 4 + k_len;
473    if value_start + new_value.len() > data.len() {
474        return Err(BTreeError::Corrupted(
475            "leaf cell value out of bounds".into(),
476        ));
477    }
478    let data = page.as_bytes_mut();
479    // Update the v_len header if the value shrank. read_leaf_cell reads
480    // v_len from this header so the shortened value deserializes correctly.
481    if new_value.len() != old_v_len {
482        let new_len = new_value.len() as u16;
483        data[cell_offset + 2..cell_offset + 4].copy_from_slice(&new_len.to_le_bytes());
484    }
485    data[value_start..value_start + new_value.len()].copy_from_slice(new_value);
486    Ok(())
487}
488
489/// at the sorted position.
490fn insert_into_leaf(page: &mut Page, key: &[u8], value: &[u8]) -> BTreeResult<()> {
491    // 1. Binary search the slot array to find the insertion index.
492    let cell_count = page.cell_count() as usize;
493    let mut low = 0;
494    let mut high = cell_count;
495    while low < high {
496        let mid = (low + high) / 2;
497        let (cell_key, _) = read_leaf_cell(page, mid)?;
498        match cell_key.as_slice().cmp(key) {
499            Ordering::Less => low = mid + 1,
500            Ordering::Greater => high = mid,
501            Ordering::Equal => {
502                // Duplicate keys are tolerated by the B-tree (caller
503                // decides semantics); append after the existing run.
504                low = mid + 1;
505                break;
506            }
507        }
508    }
509    let insert_pos = low;
510
511    // 2. Reserve the cell at the tail of the free area.
512    let cell_size = 4 + key.len() + value.len();
513    let slot_end_after = LEAF_SLOT_ARRAY_OFFSET + (cell_count + 1) * 2;
514    let cells_start = leaf_cells_start(page);
515    if slot_end_after + cell_size > cells_start {
516        return Err(BTreeError::Corrupted("leaf page full".into()));
517    }
518    let cell_offset = cells_start - cell_size;
519    {
520        let data = page.as_bytes_mut();
521        data[cell_offset..cell_offset + 2].copy_from_slice(&(key.len() as u16).to_le_bytes());
522        data[cell_offset + 2..cell_offset + 4].copy_from_slice(&(value.len() as u16).to_le_bytes());
523        data[cell_offset + 4..cell_offset + 4 + key.len()].copy_from_slice(key);
524        data[cell_offset + 4 + key.len()..cell_offset + 4 + key.len() + value.len()]
525            .copy_from_slice(value);
526    }
527    page.set_free_end(cell_offset as u16);
528
529    // 3. Shift the slot-array tail right by one slot, then write the
530    //    new pointer into the freed slot. This is a single memmove on
531    //    a couple dozen u16s — far cheaper than the O(M²) rebuild.
532    {
533        let shift_from = leaf_slot_offset_for(insert_pos);
534        let shift_to = shift_from + 2;
535        let shift_len = (cell_count - insert_pos) * 2;
536        if shift_len > 0 {
537            let data = page.as_bytes_mut();
538            data.copy_within(shift_from..shift_from + shift_len, shift_to);
539        }
540    }
541    leaf_write_slot(page, insert_pos, cell_offset as u16)?;
542
543    // 4. Bump counters.
544    page.set_cell_count((cell_count + 1) as u16);
545    page.set_free_start((LEAF_SLOT_ARRAY_OFFSET + (cell_count + 1) * 2) as u16);
546    Ok(())
547}
548
549/// Remove the slot at `index`. Cell bytes are left in place (lazy
550/// compaction); the slot-array tail is shifted left to close the gap.
551/// The caller is expected to call `clear_leaf_cells` + rebuild if the
552/// page wants its free space reclaimed.
553fn delete_from_leaf(page: &mut Page, index: usize) -> BTreeResult<()> {
554    let cell_count = page.cell_count() as usize;
555    if index >= cell_count {
556        return Err(BTreeError::Corrupted("delete index out of range".into()));
557    }
558    if index + 1 < cell_count {
559        let shift_from = leaf_slot_offset_for(index + 1);
560        let shift_to = leaf_slot_offset_for(index);
561        let shift_len = (cell_count - index - 1) * 2;
562        let data = page.as_bytes_mut();
563        data.copy_within(shift_from..shift_from + shift_len, shift_to);
564    }
565    page.set_cell_count((cell_count - 1) as u16);
566    page.set_free_start((LEAF_SLOT_ARRAY_OFFSET + (cell_count - 1) * 2) as u16);
567    Ok(())
568}
569
570/// Reset the leaf to an empty slotted state — cell count 0, free
571/// cursors at the extremes. Zeroes the slot array + cell data area so
572/// stale bytes never leak through a corrupted read.
573fn clear_leaf_cells(page: &mut Page) {
574    {
575        let data = page.as_bytes_mut();
576        for byte in &mut data[LEAF_SLOT_ARRAY_OFFSET..] {
577            *byte = 0;
578        }
579    }
580    page.set_cell_count(0);
581    page.set_free_start(LEAF_SLOT_ARRAY_OFFSET as u16);
582    page.set_free_end(PAGE_SIZE as u16);
583}
584
585fn init_leaf_links(page: &mut Page, prev: u32, next: u32) {
586    let data = page.as_bytes_mut();
587    data[LEAF_PREV_OFFSET..LEAF_PREV_OFFSET + 4].copy_from_slice(&prev.to_le_bytes());
588    data[LEAF_NEXT_OFFSET..LEAF_NEXT_OFFSET + 4].copy_from_slice(&next.to_le_bytes());
589}
590
591fn read_next_leaf(page: &Page) -> u32 {
592    let data = page.as_bytes();
593    u32::from_le_bytes([
594        data[LEAF_NEXT_OFFSET],
595        data[LEAF_NEXT_OFFSET + 1],
596        data[LEAF_NEXT_OFFSET + 2],
597        data[LEAF_NEXT_OFFSET + 3],
598    ])
599}
600
/// Returns the right-sibling page ID for a leaf page.
/// 0 means this is the rightmost leaf — used by the D2 fastpath cache.
///
/// This is a thin alias for `read_next_leaf`: the right sibling is simply
/// the leaf's `next_leaf` link.
#[inline]
fn leaf_right_sibling(page: &Page) -> u32 {
    read_next_leaf(page)
}
607
608/// Returns the maximum (last) key stored in a leaf page, or `None` if empty.
609/// This is the Lehman-Yao "high_key" — any key > this value belongs on a
610/// right-sibling page. Reads only the last slot so it is O(1).
611fn leaf_high_key(page: &Page) -> BTreeResult<Option<Vec<u8>>> {
612    let cell_count = page.cell_count() as usize;
613    if cell_count == 0 {
614        return Ok(None);
615    }
616    let (key, _) = read_leaf_cell(page, cell_count - 1)?;
617    Ok(Some(key))
618}
619
620fn set_prev_leaf(page: &mut Page, prev: u32) {
621    let data = page.as_bytes_mut();
622    data[LEAF_PREV_OFFSET..LEAF_PREV_OFFSET + 4].copy_from_slice(&prev.to_le_bytes());
623}
624
625fn set_next_leaf(page: &mut Page, next: u32) {
626    let data = page.as_bytes_mut();
627    data[LEAF_NEXT_OFFSET..LEAF_NEXT_OFFSET + 4].copy_from_slice(&next.to_le_bytes());
628}
629
630// ==================== Interior Page Helpers ====================
631//
632// Slotted-page layout mirroring the leaf format:
633//
634// ```text
635// [PageHeader 32B]
636// [slot 0: u16][slot 1: u16]...[slot N-1: u16]  ← grows forward
637// ... free space ...
638// [cell N-1][cell N-2]...[cell 0]               ← grows backward from free_end
639// ```
640//
641// Each cell is `[key_len:u16][key][child:u32]`. The right-most child
642// pointer lives in `PageHeader.right_child` exactly as before.
643
644#[inline]
645fn interior_slot_offset_for(index: usize) -> usize {
646    INTERIOR_DATA_OFFSET + index * 2
647}
648
649#[inline]
650fn interior_read_slot(page: &Page, index: usize) -> BTreeResult<usize> {
651    let data = page.as_bytes();
652    let slot_pos = interior_slot_offset_for(index);
653    if slot_pos + 2 > PAGE_SIZE {
654        return Err(BTreeError::Corrupted(
655            "interior slot array overflows page".into(),
656        ));
657    }
658    Ok(u16::from_le_bytes([data[slot_pos], data[slot_pos + 1]]) as usize)
659}
660
661#[inline]
662fn interior_write_slot(page: &mut Page, index: usize, cell_offset: u16) -> BTreeResult<()> {
663    let data = page.as_bytes_mut();
664    let slot_pos = interior_slot_offset_for(index);
665    if slot_pos + 2 > PAGE_SIZE {
666        return Err(BTreeError::Corrupted(
667            "interior slot array overflows page".into(),
668        ));
669    }
670    data[slot_pos..slot_pos + 2].copy_from_slice(&cell_offset.to_le_bytes());
671    Ok(())
672}
673
674#[inline]
675fn interior_cells_start(page: &Page) -> usize {
676    let end = page.free_end() as usize;
677    if end == 0 {
678        PAGE_SIZE
679    } else {
680        end
681    }
682}
683
684#[inline]
685fn interior_free_bytes(page: &Page) -> usize {
686    let slot_end = INTERIOR_DATA_OFFSET + (page.cell_count() as usize) * 2;
687    interior_cells_start(page).saturating_sub(slot_end)
688}
689
690fn read_interior_cell(page: &Page, index: usize) -> BTreeResult<(Vec<u8>, u32)> {
691    let cell_count = page.cell_count() as usize;
692    if index >= cell_count {
693        return Err(BTreeError::Corrupted("Cell index out of range".into()));
694    }
695    let offset = interior_read_slot(page, index)?;
696    let data = page.as_bytes();
697    if offset + 2 > PAGE_SIZE {
698        return Err(BTreeError::Corrupted(
699            "interior cell header out of range".into(),
700        ));
701    }
702    let key_len = u16::from_le_bytes([data[offset], data[offset + 1]]) as usize;
703    let end = offset + 2 + key_len + 4;
704    if end > PAGE_SIZE {
705        return Err(BTreeError::Corrupted(
706            "interior cell body out of range".into(),
707        ));
708    }
709    let key = data[offset + 2..offset + 2 + key_len].to_vec();
710    let child = u32::from_le_bytes([
711        data[offset + 2 + key_len],
712        data[offset + 2 + key_len + 1],
713        data[offset + 2 + key_len + 2],
714        data[offset + 2 + key_len + 3],
715    ]);
716    Ok((key, child))
717}
718
719fn read_interior_keys_children(page: &Page) -> BTreeResult<(Vec<Vec<u8>>, Vec<u32>)> {
720    let cell_count = page.cell_count() as usize;
721    let mut keys = Vec::with_capacity(cell_count);
722    let mut children = Vec::with_capacity(cell_count + 1);
723
724    for i in 0..cell_count {
725        let (key, child) = read_interior_cell(page, i)?;
726        keys.push(key);
727        children.push(child);
728    }
729
730    if cell_count == 0 {
731        let right_child = page.right_child();
732        if right_child != 0 {
733            children.push(right_child);
734        }
735    } else {
736        children.push(page.right_child());
737    }
738
739    Ok((keys, children))
740}
741
742/// Bulk-write an interior page from scratch in slotted form. Used by
743/// the split path. `keys` and `children` follow the B+ tree contract:
744/// `children.len() == keys.len() + 1`, with the last child landing in
745/// `PageHeader.right_child` (as before).
746fn write_interior_entries(page: &mut Page, keys: &[Vec<u8>], children: &[u32]) -> BTreeResult<()> {
747    if !keys.is_empty() && children.len() != keys.len() + 1 {
748        return Err(BTreeError::Corrupted(
749            "Interior keys/children length mismatch".into(),
750        ));
751    }
752
753    clear_interior_cells(page);
754    if keys.is_empty() {
755        let right_child = children.first().copied().unwrap_or(0);
756        page.set_right_child(right_child);
757        return Ok(());
758    }
759
760    for (i, key) in keys.iter().enumerate() {
761        let cell_size = 2 + key.len() + 4;
762        let cells_start = interior_cells_start(page);
763        let slot_end = INTERIOR_DATA_OFFSET + (i + 1) * 2;
764        if slot_end + cell_size > cells_start {
765            return Err(BTreeError::Corrupted(
766                "interior rebuild overflowed page".into(),
767            ));
768        }
769        let cell_offset = cells_start - cell_size;
770        {
771            let data = page.as_bytes_mut();
772            data[cell_offset..cell_offset + 2].copy_from_slice(&(key.len() as u16).to_le_bytes());
773            data[cell_offset + 2..cell_offset + 2 + key.len()].copy_from_slice(key);
774            data[cell_offset + 2 + key.len()..cell_offset + 2 + key.len() + 4]
775                .copy_from_slice(&children[i].to_le_bytes());
776        }
777        page.set_free_end(cell_offset as u16);
778        interior_write_slot(page, i, cell_offset as u16)?;
779    }
780    page.set_cell_count(keys.len() as u16);
781    page.set_free_start((INTERIOR_DATA_OFFSET + keys.len() * 2) as u16);
782    page.set_right_child(*children.last().ok_or_else(|| {
783        BTreeError::Corrupted("write_interior_entries: children empty with non-empty keys".into())
784    })?);
785    Ok(())
786}
787
788fn can_insert_interior(page: &Page, key: &[u8]) -> bool {
789    let needed = 2 + 2 + key.len() + 4;
790    interior_free_bytes(page) >= needed
791}
792
793/// Insert a `(key, left_child)` separator into the interior page.
794/// `right_child` replaces whatever child used to sit to the right of
795/// the inserted key (either a middle child's pointer or the page's
796/// `right_child` when the key is the new maximum).
797fn insert_into_interior(
798    page: &mut Page,
799    key: &[u8],
800    left_child: u32,
801    right_child: u32,
802) -> BTreeResult<()> {
803    // 1. Binary search the slot array for the insert position.
804    let cell_count = page.cell_count() as usize;
805    let mut low = 0;
806    let mut high = cell_count;
807    while low < high {
808        let mid = (low + high) / 2;
809        let (cell_key, _) = read_interior_cell(page, mid)?;
810        match cell_key.as_slice().cmp(key) {
811            Ordering::Less => low = mid + 1,
812            Ordering::Greater | Ordering::Equal => high = mid,
813        }
814    }
815    let insert_pos = low;
816
817    // 2. Redirect the previous owner of the split to `right_child`.
818    //    If the insertion is at the tail, the previous owner was the
819    //    page's right_child pointer; otherwise it was the child slot
820    //    of the cell currently at `insert_pos`.
821    if insert_pos < cell_count {
822        let old_offset = interior_read_slot(page, insert_pos)?;
823        let data = page.as_bytes();
824        let key_len = u16::from_le_bytes([data[old_offset], data[old_offset + 1]]) as usize;
825        let child_pos = old_offset + 2 + key_len;
826        let data = page.as_bytes_mut();
827        data[child_pos..child_pos + 4].copy_from_slice(&right_child.to_le_bytes());
828    } else {
829        page.set_right_child(right_child);
830    }
831
832    // 3. Reserve the new cell at the tail of the free area.
833    let cell_size = 2 + key.len() + 4;
834    let slot_end_after = INTERIOR_DATA_OFFSET + (cell_count + 1) * 2;
835    let cells_start = interior_cells_start(page);
836    if slot_end_after + cell_size > cells_start {
837        return Err(BTreeError::Corrupted("interior page full".into()));
838    }
839    let cell_offset = cells_start - cell_size;
840    {
841        let data = page.as_bytes_mut();
842        data[cell_offset..cell_offset + 2].copy_from_slice(&(key.len() as u16).to_le_bytes());
843        data[cell_offset + 2..cell_offset + 2 + key.len()].copy_from_slice(key);
844        data[cell_offset + 2 + key.len()..cell_offset + 2 + key.len() + 4]
845            .copy_from_slice(&left_child.to_le_bytes());
846    }
847    page.set_free_end(cell_offset as u16);
848
849    // 4. Shift the slot-array tail right by one slot, write the new
850    //    pointer into the freed slot, bump counters.
851    {
852        let shift_from = interior_slot_offset_for(insert_pos);
853        let shift_to = shift_from + 2;
854        let shift_len = (cell_count - insert_pos) * 2;
855        if shift_len > 0 {
856            let data = page.as_bytes_mut();
857            data.copy_within(shift_from..shift_from + shift_len, shift_to);
858        }
859    }
860    interior_write_slot(page, insert_pos, cell_offset as u16)?;
861    page.set_cell_count((cell_count + 1) as u16);
862    page.set_free_start((INTERIOR_DATA_OFFSET + (cell_count + 1) * 2) as u16);
863    Ok(())
864}
865
866fn clear_interior_cells(page: &mut Page) {
867    {
868        let data = page.as_bytes_mut();
869        for byte in &mut data[INTERIOR_DATA_OFFSET..] {
870            *byte = 0;
871        }
872    }
873    page.set_cell_count(0);
874    page.set_free_start(INTERIOR_DATA_OFFSET as u16);
875    page.set_free_end(PAGE_SIZE as u16);
876}
877
878#[cfg(test)]
879mod tests {
880    use super::*;
881    use std::path::PathBuf;
882
883    fn temp_db_path() -> PathBuf {
884        use std::sync::atomic::{AtomicU64, Ordering};
885        static COUNTER: AtomicU64 = AtomicU64::new(0);
886        let id = COUNTER.fetch_add(1, Ordering::Relaxed);
887        let mut path = std::env::temp_dir();
888        path.push(format!("reddb_btree_test_{}_{}.db", std::process::id(), id));
889        path
890    }
891
892    fn cleanup(path: &PathBuf) {
893        let _ = std::fs::remove_file(path);
894    }
895
896    #[test]
897    fn test_btree_empty() {
898        let path = temp_db_path();
899        cleanup(&path);
900
901        let pager = Arc::new(Pager::open_default(&path).unwrap());
902        let tree = BTree::new(pager);
903
904        assert!(tree.is_empty());
905        assert_eq!(tree.root_page_id(), 0);
906        assert_eq!(tree.get(b"key").unwrap(), None);
907        assert_eq!(tree.count().unwrap(), 0);
908
909        cleanup(&path);
910    }
911
912    #[test]
913    fn test_btree_single_insert() {
914        let path = temp_db_path();
915        cleanup(&path);
916
917        let pager = Arc::new(Pager::open_default(&path).unwrap());
918        let tree = BTree::new(pager);
919
920        tree.insert(b"hello", b"world").unwrap();
921
922        assert!(!tree.is_empty());
923        assert_eq!(tree.get(b"hello").unwrap(), Some(b"world".to_vec()));
924        assert_eq!(tree.get(b"other").unwrap(), None);
925        assert_eq!(tree.count().unwrap(), 1);
926
927        cleanup(&path);
928    }
929
930    #[test]
931    fn test_btree_multiple_inserts() {
932        let path = temp_db_path();
933        cleanup(&path);
934
935        let pager = Arc::new(Pager::open_default(&path).unwrap());
936        let tree = BTree::new(pager);
937
938        tree.insert(b"c", b"3").unwrap();
939        tree.insert(b"a", b"1").unwrap();
940        tree.insert(b"b", b"2").unwrap();
941
942        assert_eq!(tree.get(b"a").unwrap(), Some(b"1".to_vec()));
943        assert_eq!(tree.get(b"b").unwrap(), Some(b"2".to_vec()));
944        assert_eq!(tree.get(b"c").unwrap(), Some(b"3".to_vec()));
945        assert_eq!(tree.count().unwrap(), 3);
946
947        cleanup(&path);
948    }
949
950    #[test]
951    fn test_btree_duplicate_key() {
952        let path = temp_db_path();
953        cleanup(&path);
954
955        let pager = Arc::new(Pager::open_default(&path).unwrap());
956        let tree = BTree::new(pager);
957
958        tree.insert(b"key", b"value1").unwrap();
959        let result = tree.insert(b"key", b"value2");
960
961        assert!(matches!(result, Err(BTreeError::DuplicateKey)));
962        assert_eq!(tree.get(b"key").unwrap(), Some(b"value1".to_vec()));
963
964        cleanup(&path);
965    }
966
967    #[test]
968    fn test_btree_delete() {
969        let path = temp_db_path();
970        cleanup(&path);
971
972        let pager = Arc::new(Pager::open_default(&path).unwrap());
973        let tree = BTree::new(pager);
974
975        tree.insert(b"a", b"1").unwrap();
976        tree.insert(b"b", b"2").unwrap();
977        tree.insert(b"c", b"3").unwrap();
978
979        assert!(tree.delete(b"b").unwrap());
980        assert!(!tree.delete(b"d").unwrap());
981
982        assert_eq!(tree.get(b"a").unwrap(), Some(b"1".to_vec()));
983        assert_eq!(tree.get(b"b").unwrap(), None);
984        assert_eq!(tree.get(b"c").unwrap(), Some(b"3".to_vec()));
985        assert_eq!(tree.count().unwrap(), 2);
986
987        cleanup(&path);
988    }
989
990    #[test]
991    fn test_btree_delete_rebalance_removes_empty_leaf() {
992        let path = temp_db_path();
993        cleanup(&path);
994
995        let pager = Arc::new(Pager::open_default(&path).unwrap());
996        let tree = BTree::new(pager.clone());
997
998        let value = vec![b'v'; 200];
999        for i in 0..300u32 {
1000            let key = format!("key{:03}", i);
1001            tree.insert(key.as_bytes(), &value).unwrap();
1002        }
1003
1004        let root_id = tree.root_page_id();
1005        let first_leaf = tree.find_first_leaf(root_id).unwrap();
1006        let mut leaf_ids = Vec::new();
1007        let mut current = first_leaf;
1008        loop {
1009            leaf_ids.push(current);
1010            let page = pager.read_page(current).unwrap();
1011            let next = read_next_leaf(&page);
1012            if next == 0 {
1013                break;
1014            }
1015            current = next;
1016        }
1017
1018        assert!(leaf_ids.len() >= 3);
1019
1020        let target_leaf = leaf_ids[1];
1021        let page = pager.read_page(target_leaf).unwrap();
1022        let cell_count = page.cell_count() as usize;
1023        let mut keys = Vec::with_capacity(cell_count);
1024        for i in 0..cell_count {
1025            let (key, _) = read_leaf_cell(&page, i).unwrap();
1026            keys.push(key);
1027        }
1028
1029        for key in &keys {
1030            tree.delete(key).unwrap();
1031        }
1032
1033        let expected = 300 - keys.len();
1034        assert_eq!(tree.count().unwrap(), expected);
1035
1036        let mut cursor = tree.cursor_first().unwrap();
1037        let mut results = Vec::new();
1038        while let Some((key, _)) = cursor.next().unwrap() {
1039            results.push(key);
1040        }
1041
1042        assert_eq!(results.len(), expected);
1043        let last_key = format!("key{:03}", 299).into_bytes();
1044        assert_eq!(results.last(), Some(&last_key));
1045
1046        cleanup(&path);
1047    }
1048
1049    #[test]
1050    fn test_btree_cursor() {
1051        let path = temp_db_path();
1052        cleanup(&path);
1053
1054        let pager = Arc::new(Pager::open_default(&path).unwrap());
1055        let tree = BTree::new(pager);
1056
1057        tree.insert(b"c", b"3").unwrap();
1058        tree.insert(b"a", b"1").unwrap();
1059        tree.insert(b"b", b"2").unwrap();
1060
1061        let mut cursor = tree.cursor_first().unwrap();
1062        let mut results: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
1063
1064        while let Some(entry) = cursor.next().unwrap() {
1065            results.push(entry);
1066        }
1067
1068        assert_eq!(results.len(), 3);
1069        assert_eq!(results[0], (b"a".to_vec(), b"1".to_vec()));
1070        assert_eq!(results[1], (b"b".to_vec(), b"2".to_vec()));
1071        assert_eq!(results[2], (b"c".to_vec(), b"3".to_vec()));
1072
1073        cleanup(&path);
1074    }
1075
1076    #[test]
1077    fn test_btree_range() {
1078        let path = temp_db_path();
1079        cleanup(&path);
1080
1081        let pager = Arc::new(Pager::open_default(&path).unwrap());
1082        let tree = BTree::new(pager);
1083
1084        for i in 0..10u8 {
1085            let key = format!("key{:02}", i);
1086            let value = format!("val{:02}", i);
1087            tree.insert(key.as_bytes(), value.as_bytes()).unwrap();
1088        }
1089
1090        let results = tree.range(b"key03", b"key06").unwrap();
1091
1092        assert_eq!(results.len(), 4);
1093        assert_eq!(results[0].0, b"key03".to_vec());
1094        assert_eq!(results[3].0, b"key06".to_vec());
1095
1096        cleanup(&path);
1097    }
1098
1099    #[test]
1100    fn test_btree_large_keys() {
1101        let path = temp_db_path();
1102        cleanup(&path);
1103
1104        let pager = Arc::new(Pager::open_default(&path).unwrap());
1105        let tree = BTree::new(pager);
1106
1107        let key = vec![b'x'; 500];
1108        let value = vec![b'y'; 500];
1109
1110        tree.insert(&key, &value).unwrap();
1111        assert_eq!(tree.get(&key).unwrap(), Some(value));
1112
1113        cleanup(&path);
1114    }
1115
1116    #[test]
1117    fn test_btree_key_too_large() {
1118        let path = temp_db_path();
1119        cleanup(&path);
1120
1121        let pager = Arc::new(Pager::open_default(&path).unwrap());
1122        let tree = BTree::new(pager);
1123
1124        let key = vec![b'x'; MAX_KEY_SIZE + 1];
1125        let result = tree.insert(&key, b"value");
1126
1127        assert!(matches!(result, Err(BTreeError::KeyTooLarge(_))));
1128
1129        cleanup(&path);
1130    }
1131
1132    #[test]
1133    fn test_btree_many_inserts() {
1134        let path = temp_db_path();
1135        cleanup(&path);
1136
1137        let pager = Arc::new(Pager::open_default(&path).unwrap());
1138        let tree = BTree::new(pager);
1139
1140        // Insert enough entries to force splits
1141        for i in 0..100u32 {
1142            let key = format!("key{:08}", i);
1143            let value = format!("value{:08}", i);
1144            tree.insert(key.as_bytes(), value.as_bytes()).unwrap();
1145        }
1146
1147        // Verify all entries
1148        for i in 0..100u32 {
1149            let key = format!("key{:08}", i);
1150            let expected = format!("value{:08}", i);
1151            assert_eq!(
1152                tree.get(key.as_bytes()).unwrap(),
1153                Some(expected.into_bytes())
1154            );
1155        }
1156
1157        assert_eq!(tree.count().unwrap(), 100);
1158
1159        cleanup(&path);
1160    }
1161
1162    #[test]
1163    fn test_btree_bulk_insert_sorted_preserves_leaf_layout() {
1164        let path = temp_db_path();
1165        cleanup(&path);
1166
1167        let pager = Arc::new(Pager::open_default(&path).unwrap());
1168        let tree = BTree::new(pager);
1169
1170        let items: Vec<(Vec<u8>, Vec<u8>)> = (0..240u32)
1171            .map(|i| {
1172                (
1173                    format!("key{:08}", i).into_bytes(),
1174                    vec![b'v'; 32 + (i as usize % 7)],
1175                )
1176            })
1177            .collect();
1178
1179        tree.bulk_insert_sorted(&items).unwrap();
1180
1181        assert_eq!(tree.count().unwrap(), items.len());
1182
1183        for (key, value) in &items {
1184            assert_eq!(tree.get(key).unwrap(), Some(value.clone()));
1185        }
1186
1187        cleanup(&path);
1188    }
1189
1190    /// Property test for `BTree::upsert_batch_sorted` (#159).
1191    ///
1192    /// Generates random batches of (key, value) updates and applies them to
1193    /// two independently-built trees: the baseline uses the existing
1194    /// loop-of-`upsert` path, and the candidate uses `upsert_batch_sorted`
1195    /// after the caller-side sort.  Both trees must end up with identical
1196    /// key→value contents (verified via per-key `get` and a full cursor
1197    /// scan), proving the new path is observationally equivalent for
1198    /// arbitrary batches up to 200 entries.
1199    mod prop_upsert_batch_sorted {
1200        use super::*;
1201        use proptest::prelude::*;
1202        use std::collections::BTreeMap;
1203
1204        /// Build a tree pre-populated with `seed` and return the path so
1205        /// the caller can clean it up.
1206        fn populate_tree(seed: &[(Vec<u8>, Vec<u8>)]) -> (BTree, PathBuf, Arc<Pager>) {
1207            let path = temp_db_path();
1208            cleanup(&path);
1209            let pager = Arc::new(Pager::open_default(&path).unwrap());
1210            let tree = BTree::new(Arc::clone(&pager));
1211            for (k, v) in seed {
1212                tree.upsert(k, v).unwrap();
1213            }
1214            (tree, path, pager)
1215        }
1216
1217        fn key_strategy() -> impl Strategy<Value = Vec<u8>> {
1218            // Short keys keep many entries per leaf so a single batch
1219            // hits multiple keys in the same leaf — the path the new
1220            // method optimises.  1..=4 bytes from a small alphabet.
1221            prop::collection::vec(0u8..16u8, 1..=4)
1222        }
1223
1224        fn value_strategy() -> impl Strategy<Value = Vec<u8>> {
1225            // Tiny values — same-or-shrink fits the in-place overwrite
1226            // fast path; growing values fall back to the generic
1227            // single-key `upsert`, exercising both branches.
1228            prop::collection::vec(0u8..255u8, 0..=8)
1229        }
1230
1231        fn pair_strategy() -> impl Strategy<Value = (Vec<u8>, Vec<u8>)> {
1232            (key_strategy(), value_strategy())
1233        }
1234
1235        proptest! {
1236            #![proptest_config(ProptestConfig {
1237                cases: 64,
1238                .. ProptestConfig::default()
1239            })]
1240
1241            #[test]
1242            fn batch_upsert_matches_loop_upsert(
1243                seed in prop::collection::vec(pair_strategy(), 0..50),
1244                batch in prop::collection::vec(pair_strategy(), 1..200),
1245            ) {
1246                // Dedup the seed by key (last write wins) so both trees
1247                // start from the same well-defined state.
1248                let mut seed_map: BTreeMap<Vec<u8>, Vec<u8>> = BTreeMap::new();
1249                for (k, v) in seed {
1250                    seed_map.insert(k, v);
1251                }
1252                let seed: Vec<(Vec<u8>, Vec<u8>)> = seed_map.into_iter().collect();
1253
1254                // Baseline: loop-of-upsert.
1255                let (baseline, baseline_path, _baseline_pager) = populate_tree(&seed);
1256                for (k, v) in &batch {
1257                    baseline.upsert(k, v).unwrap();
1258                }
1259
1260                // Candidate: upsert_batch_sorted after caller-side sort
1261                // (matches the contract documented on the method).
1262                let (candidate, candidate_path, _candidate_pager) = populate_tree(&seed);
1263                let mut sorted = batch.clone();
1264                sorted.sort_by(|a, b| a.0.cmp(&b.0));
1265                candidate.upsert_batch_sorted(&sorted).unwrap();
1266
1267                // Compute the expected end state from BTreeMap (last
1268                // write wins), independent of insertion order.
1269                let mut expected: BTreeMap<Vec<u8>, Vec<u8>> = BTreeMap::new();
1270                for (k, v) in &seed {
1271                    expected.insert(k.clone(), v.clone());
1272                }
1273                for (k, v) in &batch {
1274                    expected.insert(k.clone(), v.clone());
1275                }
1276
1277                // Per-key get matches expected on both trees.
1278                for (k, v) in &expected {
1279                    let baseline_got = baseline.get(k).unwrap();
1280                    let candidate_got = candidate.get(k).unwrap();
1281                    prop_assert_eq!(baseline_got.as_ref(), Some(v));
1282                    prop_assert_eq!(candidate_got.as_ref(), Some(v));
1283                }
1284
1285                // Full cursor scan yields identical sorted contents.
1286                let collect = |t: &BTree| -> Vec<(Vec<u8>, Vec<u8>)> {
1287                    let mut out = Vec::new();
1288                    let mut cur = t.cursor_first().unwrap();
1289                    while let Some(entry) = cur.next().unwrap() {
1290                        out.push(entry);
1291                    }
1292                    out
1293                };
1294                let baseline_contents = collect(&baseline);
1295                let candidate_contents = collect(&candidate);
1296                prop_assert_eq!(&baseline_contents, &candidate_contents);
1297
1298                // And both equal the BTreeMap-derived expectation.
1299                let expected_contents: Vec<(Vec<u8>, Vec<u8>)> =
1300                    expected.into_iter().collect();
1301                prop_assert_eq!(&candidate_contents, &expected_contents);
1302
1303                cleanup(&baseline_path);
1304                cleanup(&candidate_path);
1305            }
1306        }
1307    }
1308
1309    #[test]
1310    fn test_btree_sorted_iteration() {
1311        let path = temp_db_path();
1312        cleanup(&path);
1313
1314        let pager = Arc::new(Pager::open_default(&path).unwrap());
1315        let tree = BTree::new(pager);
1316
1317        // Insert in random order
1318        let keys = vec![50, 25, 75, 10, 30, 60, 80, 5, 15, 27, 35, 55, 65, 77, 90];
1319        for k in &keys {
1320            let key = format!("{:03}", k);
1321            tree.insert(key.as_bytes(), key.as_bytes()).unwrap();
1322        }
1323
1324        // Should iterate in sorted order
1325        let mut cursor = tree.cursor_first().unwrap();
1326        let mut prev: Option<Vec<u8>> = None;
1327
1328        while let Some((key, _)) = cursor.next().unwrap() {
1329            if let Some(p) = &prev {
1330                assert!(p < &key, "Keys not in sorted order");
1331            }
1332            prev = Some(key);
1333        }
1334
1335        cleanup(&path);
1336    }
1337}