
reddb_server/storage/engine/btree.rs

//! B+ Tree Implementation for Page-Based Storage
//!
//! A B+ tree optimized for disk I/O with the following properties:
//! - All values stored in leaf nodes
//! - Interior nodes only contain keys and child pointers
//! - Leaf nodes form a doubly-linked list for range scans
//! - Each node fits in a single 4KB page
//!
//! # Page Layout
//!
//! Both node types use a slotted layout: a forward-growing array of u16
//! slot pointers addresses cells that grow backward from the end of the
//! page (see the notes on `LEAF_SLOT_ARRAY_OFFSET` and the interior-page
//! helpers below).
//!
//! Interior Node (the rightmost child pointer lives in the page header):
//! ```text
//! ┌────────────────────────────────────────────────────────────┐
//! │ PageHeader (32 bytes, incl. right_child: u32)              │
//! ├────────────────────────────────────────────────────────────┤
//! │ slots: [u16 cell offset]...                ← grows forward │
//! │ ... free space ...                                         │
//! │ cells: [key_len: u16, key: [u8], child: u32]  ← grows back │
//! └────────────────────────────────────────────────────────────┘
//! ```
//!
//! Leaf Node:
//! ```text
//! ┌────────────────────────────────────────────────────────────┐
//! │ PageHeader (32 bytes)                                      │
//! ├────────────────────────────────────────────────────────────┤
//! │ prev_leaf: u32 - Previous leaf (0 = none)                  │
//! │ next_leaf: u32 - Next leaf (0 = none)                      │
//! │ slots: [u16 cell offset]...                ← grows forward │
//! │ ... free space ...                                         │
//! │ cells: [key_len: u16, val_len: u16, key: [u8], val: [u8]]  │
//! └────────────────────────────────────────────────────────────┘
//! ```

32use std::cmp::Ordering;
33use std::sync::{Arc, RwLock};
34
35use super::page::{Page, PageType, HEADER_SIZE, PAGE_SIZE};
36use super::pager::{Pager, PagerError};
37
38/// Maximum key size (to ensure at least 2 keys per node)
39pub const MAX_KEY_SIZE: usize = 1024;
40
41/// Maximum value size for inline storage
42pub const MAX_VALUE_SIZE: usize = 1024;
43
44/// Minimum fill factor before merge (as percentage)
45pub const MIN_FILL_FACTOR: usize = 25;
46
47/// Offset of prev_leaf in leaf page
48const LEAF_PREV_OFFSET: usize = HEADER_SIZE;
49
50/// Offset of next_leaf in leaf page
51const LEAF_NEXT_OFFSET: usize = HEADER_SIZE + 4;
52
53/// Start of the slot array in leaf pages.
54///
55/// Slotted-page layout:
56/// ```text
57/// [PageHeader 32B][prev u32 | next u32 | 8B]
58/// [slot 0: u16][slot 1: u16]...[slot N-1: u16]  ← grows forward
59/// ... free space ...
60/// [cell N-1][cell N-2]...[cell 0]               ← grows backward from free_end
61/// ```
62/// Each cell is laid out as `[key_len:u16][val_len:u16][key][val]`.
63/// `page.cell_count()` is N; `page.free_end()` is the offset of the
64/// lowest (most recently written) cell. The slot array lives right
65/// after the leaf-chain links and each u16 slot is the absolute page
66/// offset of its cell.
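///
/// As a worked example (assuming the 32-byte header described in the
/// module doc): the chain links occupy bytes 32..40, slot `i` sits at
/// byte `40 + 2 * i`, and a leaf holding three cells keeps its slot
/// array in bytes 40..46 while the cell bytes themselves sit just below
/// `PAGE_SIZE`, growing downward as cells are appended.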
67const LEAF_SLOT_ARRAY_OFFSET: usize = HEADER_SIZE + 8;
68
69/// Kept for source-compat with older code paths (e.g. interior-node
70/// helpers); equivalent to `LEAF_SLOT_ARRAY_OFFSET` for leaves.
71const LEAF_DATA_OFFSET: usize = LEAF_SLOT_ARRAY_OFFSET;
72
73/// Start of cell data in interior pages (right_child is in header)
74const INTERIOR_DATA_OFFSET: usize = HEADER_SIZE;
75
76/// B+ Tree error types
77#[derive(Debug, Clone)]
78pub enum BTreeError {
79    /// Key not found
80    NotFound,
81    /// Key already exists
82    DuplicateKey,
83    /// Key too large
84    KeyTooLarge(usize),
85    /// Value too large
86    ValueTooLarge(usize),
87    /// Tree is corrupted
88    Corrupted(String),
89    /// Pager error
90    Pager(String),
91    /// Internal lock was poisoned by a panicked thread
92    LockPoisoned(String),
93}
94
95impl std::fmt::Display for BTreeError {
96    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
97        match self {
98            Self::NotFound => write!(f, "Key not found"),
99            Self::DuplicateKey => write!(f, "Key already exists"),
100            Self::KeyTooLarge(size) => {
101                write!(f, "Key too large: {} bytes (max {})", size, MAX_KEY_SIZE)
102            }
103            Self::ValueTooLarge(size) => write!(
104                f,
105                "Value too large: {} bytes (max {})",
106                size, MAX_VALUE_SIZE
107            ),
108            Self::Corrupted(msg) => write!(f, "B-tree corrupted: {}", msg),
109            Self::Pager(msg) => write!(f, "Pager error: {}", msg),
110            Self::LockPoisoned(msg) => write!(f, "Lock poisoned: {}", msg),
111        }
112    }
113}
114
115impl std::error::Error for BTreeError {}
116
117impl From<PagerError> for BTreeError {
118    fn from(e: PagerError) -> Self {
119        Self::Pager(e.to_string())
120    }
121}
122
123impl From<super::page::PageError> for BTreeError {
124    fn from(e: super::page::PageError) -> Self {
125        Self::Corrupted(e.to_string())
126    }
127}
128
129/// Result type for B+ tree operations
130pub type BTreeResult<T> = Result<T, BTreeError>;
131
132/// B+ Tree cursor for iteration
133pub struct BTreeCursor {
134    /// Current leaf page ID
135    leaf_page_id: u32,
136    /// Current position within leaf
137    position: usize,
138    /// Pager reference
139    pager: Arc<Pager>,
    /// A6 — true once we've issued the prefetch hint for the next leaf
    /// while positioned on this one. Reset to false every time we move to
    /// a new leaf so the hint fires once per leaf, at the halfway point.
143    prefetched_next: bool,
144}
145
146impl BTreeCursor {
147    /// Move to next entry
148    pub fn next(&mut self) -> BTreeResult<Option<(Vec<u8>, Vec<u8>)>> {
149        if self.leaf_page_id == 0 {
150            return Ok(None);
151        }
152
153        let page = self.pager.read_page(self.leaf_page_id)?;
154        let cell_count = page.cell_count() as usize;
155
156        // A6 — prefetch the next leaf when we cross the halfway point of
157        // the current one. The hint fires at most once per leaf (guarded
158        // by `prefetched_next`) so the syscall overhead is amortized over
159        // ~half a leaf's worth of entries. The kernel uses the hint to
160        // start DMA-ing the next page while we finish the current one.
161        if !self.prefetched_next && self.position >= cell_count / 2 && cell_count > 0 {
162            let next_leaf = read_next_leaf(&page);
163            if next_leaf != 0 {
164                self.pager.prefetch_hint(next_leaf);
165            }
166            self.prefetched_next = true;
167        }
168
169        // Check if we have more cells in current page
170        if self.position < cell_count {
171            let (key, value) = read_leaf_cell(&page, self.position)?;
172            self.position += 1;
173            return Ok(Some((key, value)));
174        }
175
176        // Move to next leaf
177        let next_leaf = read_next_leaf(&page);
178        if next_leaf == 0 {
179            self.leaf_page_id = 0;
180            return Ok(None);
181        }
182
183        self.leaf_page_id = next_leaf;
184        self.position = 0;
185        self.prefetched_next = false; // reset for the new leaf
186
187        // Read from new leaf
188        let page = self.pager.read_page(self.leaf_page_id)?;
189        if page.cell_count() == 0 {
190            return Ok(None);
191        }
192
193        let (key, value) = read_leaf_cell(&page, 0)?;
194        self.position = 1;
195        Ok(Some((key, value)))
196    }
197
198    /// Peek at current entry without advancing
199    pub fn peek(&self) -> BTreeResult<Option<(Vec<u8>, Vec<u8>)>> {
200        if self.leaf_page_id == 0 {
201            return Ok(None);
202        }
203
204        let page = self.pager.read_page(self.leaf_page_id)?;
205        let cell_count = page.cell_count() as usize;
206
207        if self.position >= cell_count {
208            return Ok(None);
209        }
210
211        let (key, value) = read_leaf_cell(&page, self.position)?;
212        Ok(Some((key, value)))
213    }
214}
215
216/// B+ Tree implementation
217pub struct BTree {
218    /// Pager for page I/O
219    pager: Arc<Pager>,
220    /// Root page ID (0 = empty tree)
221    root_page_id: RwLock<u32>,
222    /// D2 fastpath: cached rightmost leaf (page_id, high_key).
223    ///
224    /// When the next key to insert is strictly greater than `high_key`,
225    /// skip `find_leaf` (tree descent from root) and go directly to the
226    /// cached page. Invalidated on splits and deletes.
227    ///
228    /// Mirrors `BTPageCacheIsValid` / `BTREE_FASTPATH_MIN_LEVEL` in
229    /// `nbtinsert.c:31`.
230    rightmost_leaf: RwLock<Option<(u32, Vec<u8>)>>,
231}
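
// A minimal sketch (not part of the original source) of how the D2
// fastpath cache above is meant to be consulted; the real insert path
// lives in `btree/impl.rs`, and this free function and its name are
// illustrative only. If the incoming key sorts strictly after the cached
// high key, the cached rightmost leaf is the only page that can receive
// it, so the descent from the root can be skipped.
#[allow(dead_code)]
fn rightmost_fastpath_candidate(
    cache: &RwLock<Option<(u32, Vec<u8>)>>,
    key: &[u8],
) -> Option<u32> {
    // A poisoned or empty cache simply means "no fastpath"; the caller
    // falls back to the normal `find_leaf` descent from the root.
    let guard = cache.read().ok()?;
    match guard.as_ref() {
        Some((page_id, high_key)) if key > high_key.as_slice() => Some(*page_id),
        _ => None,
    }
}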
232
233#[path = "btree/impl.rs"]
234mod btree_impl;
235
236#[path = "btree/lehman_yao.rs"]
237pub mod lehman_yao;
238// ==================== Search Helpers ====================
239
240enum SearchResult {
241    Found(usize),
242    NotFound(usize),
243}
244
245fn search_leaf(page: &Page, key: &[u8]) -> BTreeResult<SearchResult> {
246    let cell_count = page.cell_count() as usize;
247
248    // Binary search
249    let mut low = 0;
250    let mut high = cell_count;
251
252    while low < high {
253        let mid = (low + high) / 2;
254        let (cell_key, _) = read_leaf_cell(page, mid)?;
255
256        match cell_key.as_slice().cmp(key) {
257            Ordering::Less => low = mid + 1,
258            Ordering::Greater => high = mid,
259            Ordering::Equal => return Ok(SearchResult::Found(mid)),
260        }
261    }
262
263    Ok(SearchResult::NotFound(low))
264}
265
266fn find_child(page: &Page, key: &[u8]) -> BTreeResult<u32> {
267    let cell_count = page.cell_count() as usize;
268
    // Linear scan: return the child of the first separator key greater than `key`
270    for i in 0..cell_count {
271        let (cell_key, child) = read_interior_cell(page, i)?;
272        if key < cell_key.as_slice() {
273            return Ok(child);
274        }
275    }
276
277    // Key is >= all keys, use right child
278    Ok(page.right_child())
279}
280
281fn find_first_child(page: &Page) -> BTreeResult<u32> {
282    if page.cell_count() == 0 {
283        return Ok(page.right_child());
284    }
285    let (_, child) = read_interior_cell(page, 0)?;
286    Ok(child)
287}
288
289fn leaf_min_bytes() -> usize {
290    (PAGE_SIZE - LEAF_DATA_OFFSET) * MIN_FILL_FACTOR / 100
291}
292
293fn interior_min_bytes() -> usize {
294    (PAGE_SIZE - INTERIOR_DATA_OFFSET) * MIN_FILL_FACTOR / 100
295}
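
// As a worked example, with the 4 KB pages described in the module doc
// (PAGE_SIZE = 4096, 32-byte header), leaf_min_bytes() works out to
// (4096 - 40) * 25 / 100 = 1014 bytes and interior_min_bytes() to
// (4096 - 32) * 25 / 100 = 1016 bytes; a page holding less payload than
// this becomes a merge candidate.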
296
297/// Bytes consumed by one leaf entry in the slotted layout: one u16
298/// slot in the pointer array plus the cell itself (`[key_len:u16]
299/// [val_len:u16][key][val]`).
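///
/// For example, a 7-byte key paired with a 12-byte value consumes
/// 2 + 4 + 7 + 12 = 25 bytes of the page.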
300fn leaf_entry_size(entry: &(Vec<u8>, Vec<u8>)) -> usize {
301    2 + 4 + entry.0.len() + entry.1.len()
302}
303
304fn leaf_entries_size(entries: &[(Vec<u8>, Vec<u8>)]) -> usize {
305    entries.iter().map(leaf_entry_size).sum()
306}
307
308#[inline]
309fn leaf_slot_offset_for(index: usize) -> usize {
310    LEAF_SLOT_ARRAY_OFFSET + index * 2
311}
312
313#[inline]
314fn leaf_read_slot(page: &Page, index: usize) -> BTreeResult<usize> {
315    let data = page.as_bytes();
316    let slot_pos = leaf_slot_offset_for(index);
317    if slot_pos + 2 > PAGE_SIZE {
318        return Err(BTreeError::Corrupted("slot array overflows page".into()));
319    }
320    Ok(u16::from_le_bytes([data[slot_pos], data[slot_pos + 1]]) as usize)
321}
322
323#[inline]
324fn leaf_write_slot(page: &mut Page, index: usize, cell_offset: u16) -> BTreeResult<()> {
325    let data = page.as_bytes_mut();
326    let slot_pos = leaf_slot_offset_for(index);
327    if slot_pos + 2 > PAGE_SIZE {
328        return Err(BTreeError::Corrupted("slot array overflows page".into()));
329    }
330    data[slot_pos..slot_pos + 2].copy_from_slice(&cell_offset.to_le_bytes());
331    Ok(())
332}
333
334#[inline]
335fn leaf_slots_end(page: &Page) -> usize {
336    LEAF_SLOT_ARRAY_OFFSET + (page.cell_count() as usize) * 2
337}
338
339#[inline]
340fn leaf_cells_start(page: &Page) -> usize {
341    let end = page.free_end() as usize;
342    if end == 0 {
343        PAGE_SIZE
344    } else {
345        end
346    }
347}
348
349#[inline]
350fn leaf_free_bytes(page: &Page) -> usize {
351    let slot_end = leaf_slots_end(page);
352    let cells = leaf_cells_start(page);
353    cells.saturating_sub(slot_end)
354}
355
356fn interior_key_size(key: &[u8]) -> usize {
357    2 + key.len() + 4
358}
359
360fn interior_entries_size(keys: &[Vec<u8>]) -> usize {
361    keys.iter().map(|k| interior_key_size(k)).sum()
362}
363
364// ==================== Leaf Page Helpers ====================
365
366/// Read the cell at slot `index` in O(1). Follows the u16 slot pointer
367/// into the cell data area; the cell header is `[key_len:u16][val_len:u16]`
368/// followed by the raw key and value bytes.
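///
/// For example, a slot pointing at offset 4060 whose header decodes to
/// `key_len = 2`, `val_len = 3` yields the key bytes at 4064..4066 and
/// the value bytes at 4066..4069 (offsets assume the 4 KB page size).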
369fn read_leaf_cell(page: &Page, index: usize) -> BTreeResult<(Vec<u8>, Vec<u8>)> {
370    let cell_count = page.cell_count() as usize;
371    if index >= cell_count {
372        return Err(BTreeError::Corrupted("Cell index out of range".into()));
373    }
374    let offset = leaf_read_slot(page, index)?;
375    let data = page.as_bytes();
376    if offset + 4 > PAGE_SIZE {
377        return Err(BTreeError::Corrupted("cell header out of range".into()));
378    }
379    let key_len = u16::from_le_bytes([data[offset], data[offset + 1]]) as usize;
380    let val_len = u16::from_le_bytes([data[offset + 2], data[offset + 3]]) as usize;
381    let end = offset + 4 + key_len + val_len;
382    if end > PAGE_SIZE {
383        return Err(BTreeError::Corrupted("cell body out of range".into()));
384    }
385    let key = data[offset + 4..offset + 4 + key_len].to_vec();
386    let value = data[offset + 4 + key_len..end].to_vec();
387    Ok((key, value))
388}
389
390fn read_leaf_entries(page: &Page) -> BTreeResult<Vec<(Vec<u8>, Vec<u8>)>> {
391    let cell_count = page.cell_count() as usize;
392    let mut entries = Vec::with_capacity(cell_count);
393    for i in 0..cell_count {
394        entries.push(read_leaf_cell(page, i)?);
395    }
396    Ok(entries)
397}
398
399/// Wipe and re-lay an entire leaf in slotted form. Used by the split
400/// path (which hands us a pre-sorted `entries` vector) and by
401/// `clear_leaf_cells` under the hood. Entries are appended at the end
402/// of the free area so the highest-index slot ends up at the lowest
403/// offset — same shape the single-key insert path produces.
404fn write_leaf_entries(page: &mut Page, entries: &[(Vec<u8>, Vec<u8>)]) -> BTreeResult<()> {
405    clear_leaf_cells(page);
406    for (i, (k, v)) in entries.iter().enumerate() {
407        let cell_size = 4 + k.len() + v.len();
408        let cells_start = leaf_cells_start(page);
409        let slot_end = LEAF_SLOT_ARRAY_OFFSET + (i + 1) * 2;
410        if slot_end + cell_size > cells_start {
411            return Err(BTreeError::Corrupted("leaf rebuild overflowed page".into()));
412        }
413        let cell_offset = cells_start - cell_size;
414        {
415            let data = page.as_bytes_mut();
416            data[cell_offset..cell_offset + 2].copy_from_slice(&(k.len() as u16).to_le_bytes());
417            data[cell_offset + 2..cell_offset + 4].copy_from_slice(&(v.len() as u16).to_le_bytes());
418            data[cell_offset + 4..cell_offset + 4 + k.len()].copy_from_slice(k);
419            data[cell_offset + 4 + k.len()..cell_offset + 4 + k.len() + v.len()].copy_from_slice(v);
420        }
421        page.set_free_end(cell_offset as u16);
422        leaf_write_slot(page, i, cell_offset as u16)?;
423    }
424    page.set_cell_count(entries.len() as u16);
425    page.set_free_start((LEAF_SLOT_ARRAY_OFFSET + entries.len() * 2) as u16);
426    Ok(())
427}
428
429fn can_insert_leaf(page: &Page, key: &[u8], value: &[u8]) -> bool {
430    let needed = 2 + 4 + key.len() + value.len();
431    leaf_free_bytes(page) >= needed
432}
433
437/// Overwrite the value portion of cell at `index` in-place. Valid when
438/// `new_value.len() <= existing_value.len()` — the caller
439/// (typically `BTree::upsert`) checks this before calling.
440///
441/// Same-length writes are trivial (byte copy). Shrinks update the cell's
442/// `v_len` header and leave the trailing bytes as dead space until the
443/// page is naturally rewritten; the slotted layout keeps cells
444/// addressable by their own offset+len so the gap is harmless.
445///
446/// Fixed-size column updates (SET score = X on a typed schema) hit this
447/// path, avoiding the delete + re-insert + rebalance cycle that
448/// dominates bulk_update.
449pub(crate) fn overwrite_leaf_value(
450    page: &mut Page,
451    index: usize,
452    new_value: &[u8],
453) -> BTreeResult<()> {
454    let cell_offset = leaf_read_slot(page, index)?;
455    let data = page.as_bytes();
456    if cell_offset + 4 > data.len() {
457        return Err(BTreeError::Corrupted(
458            "leaf cell header out of bounds".into(),
459        ));
460    }
461    let k_len = u16::from_le_bytes([data[cell_offset], data[cell_offset + 1]]) as usize;
462    let old_v_len = u16::from_le_bytes([data[cell_offset + 2], data[cell_offset + 3]]) as usize;
463    if new_value.len() > old_v_len {
464        return Err(BTreeError::Corrupted(
465            "overwrite_leaf_value: new value larger than slot".into(),
466        ));
467    }
468    let value_start = cell_offset + 4 + k_len;
469    if value_start + new_value.len() > data.len() {
470        return Err(BTreeError::Corrupted(
471            "leaf cell value out of bounds".into(),
472        ));
473    }
474    let data = page.as_bytes_mut();
475    // Update the v_len header if the value shrank. read_leaf_cell reads
476    // v_len from this header so the shortened value deserializes correctly.
477    if new_value.len() != old_v_len {
478        let new_len = new_value.len() as u16;
479        data[cell_offset + 2..cell_offset + 4].copy_from_slice(&new_len.to_le_bytes());
480    }
481    data[value_start..value_start + new_value.len()].copy_from_slice(new_value);
482    Ok(())
483}
484
/// Insert `(key, value)` into the slotted leaf in O(log M) search +
/// O(M) slot-array memmove. Cell data is appended at the tail of the
/// free area (backward from `free_end`); the slot pointer is inserted
/// at the sorted position.
486fn insert_into_leaf(page: &mut Page, key: &[u8], value: &[u8]) -> BTreeResult<()> {
487    // 1. Binary search the slot array to find the insertion index.
488    let cell_count = page.cell_count() as usize;
489    let mut low = 0;
490    let mut high = cell_count;
491    while low < high {
492        let mid = (low + high) / 2;
493        let (cell_key, _) = read_leaf_cell(page, mid)?;
494        match cell_key.as_slice().cmp(key) {
495            Ordering::Less => low = mid + 1,
496            Ordering::Greater => high = mid,
497            Ordering::Equal => {
                // Duplicate keys are tolerated at this level (the caller
                // decides semantics); insert just after the matched entry.
500                low = mid + 1;
501                break;
502            }
503        }
504    }
505    let insert_pos = low;
506
507    // 2. Reserve the cell at the tail of the free area.
508    let cell_size = 4 + key.len() + value.len();
509    let slot_end_after = LEAF_SLOT_ARRAY_OFFSET + (cell_count + 1) * 2;
510    let cells_start = leaf_cells_start(page);
511    if slot_end_after + cell_size > cells_start {
512        return Err(BTreeError::Corrupted("leaf page full".into()));
513    }
514    let cell_offset = cells_start - cell_size;
515    {
516        let data = page.as_bytes_mut();
517        data[cell_offset..cell_offset + 2].copy_from_slice(&(key.len() as u16).to_le_bytes());
518        data[cell_offset + 2..cell_offset + 4].copy_from_slice(&(value.len() as u16).to_le_bytes());
519        data[cell_offset + 4..cell_offset + 4 + key.len()].copy_from_slice(key);
520        data[cell_offset + 4 + key.len()..cell_offset + 4 + key.len() + value.len()]
521            .copy_from_slice(value);
522    }
523    page.set_free_end(cell_offset as u16);
524
525    // 3. Shift the slot-array tail right by one slot, then write the
526    //    new pointer into the freed slot. This is a single memmove on
527    //    a couple dozen u16s — far cheaper than the O(M²) rebuild.
528    {
529        let shift_from = leaf_slot_offset_for(insert_pos);
530        let shift_to = shift_from + 2;
531        let shift_len = (cell_count - insert_pos) * 2;
532        if shift_len > 0 {
533            let data = page.as_bytes_mut();
534            data.copy_within(shift_from..shift_from + shift_len, shift_to);
535        }
536    }
537    leaf_write_slot(page, insert_pos, cell_offset as u16)?;
538
539    // 4. Bump counters.
540    page.set_cell_count((cell_count + 1) as u16);
541    page.set_free_start((LEAF_SLOT_ARRAY_OFFSET + (cell_count + 1) * 2) as u16);
542    Ok(())
543}
544
545/// Remove the slot at `index`. Cell bytes are left in place (lazy
546/// compaction); the slot-array tail is shifted left to close the gap.
547/// The caller is expected to call `clear_leaf_cells` + rebuild if the
548/// page wants its free space reclaimed.
549fn delete_from_leaf(page: &mut Page, index: usize) -> BTreeResult<()> {
550    let cell_count = page.cell_count() as usize;
551    if index >= cell_count {
552        return Err(BTreeError::Corrupted("delete index out of range".into()));
553    }
554    if index + 1 < cell_count {
555        let shift_from = leaf_slot_offset_for(index + 1);
556        let shift_to = leaf_slot_offset_for(index);
557        let shift_len = (cell_count - index - 1) * 2;
558        let data = page.as_bytes_mut();
559        data.copy_within(shift_from..shift_from + shift_len, shift_to);
560    }
561    page.set_cell_count((cell_count - 1) as u16);
562    page.set_free_start((LEAF_SLOT_ARRAY_OFFSET + (cell_count - 1) * 2) as u16);
563    Ok(())
564}
565
566/// Reset the leaf to an empty slotted state — cell count 0, free
567/// cursors at the extremes. Zeroes the slot array + cell data area so
568/// stale bytes never leak through a corrupted read.
569fn clear_leaf_cells(page: &mut Page) {
570    {
571        let data = page.as_bytes_mut();
572        for byte in &mut data[LEAF_SLOT_ARRAY_OFFSET..] {
573            *byte = 0;
574        }
575    }
576    page.set_cell_count(0);
577    page.set_free_start(LEAF_SLOT_ARRAY_OFFSET as u16);
578    page.set_free_end(PAGE_SIZE as u16);
579}
580
581fn init_leaf_links(page: &mut Page, prev: u32, next: u32) {
582    let data = page.as_bytes_mut();
583    data[LEAF_PREV_OFFSET..LEAF_PREV_OFFSET + 4].copy_from_slice(&prev.to_le_bytes());
584    data[LEAF_NEXT_OFFSET..LEAF_NEXT_OFFSET + 4].copy_from_slice(&next.to_le_bytes());
585}
586
587fn read_next_leaf(page: &Page) -> u32 {
588    let data = page.as_bytes();
589    u32::from_le_bytes([
590        data[LEAF_NEXT_OFFSET],
591        data[LEAF_NEXT_OFFSET + 1],
592        data[LEAF_NEXT_OFFSET + 2],
593        data[LEAF_NEXT_OFFSET + 3],
594    ])
595}
596
597/// Returns the right-sibling page ID for a leaf page.
598/// 0 means this is the rightmost leaf — used by the D2 fastpath cache.
599#[inline]
600fn leaf_right_sibling(page: &Page) -> u32 {
601    read_next_leaf(page)
602}
603
604/// Returns the maximum (last) key stored in a leaf page, or `None` if empty.
605/// This is the Lehman-Yao "high_key" — any key > this value belongs on a
606/// right-sibling page. Reads only the last slot so it is O(1).
607fn leaf_high_key(page: &Page) -> BTreeResult<Option<Vec<u8>>> {
608    let cell_count = page.cell_count() as usize;
609    if cell_count == 0 {
610        return Ok(None);
611    }
612    let (key, _) = read_leaf_cell(page, cell_count - 1)?;
613    Ok(Some(key))
614}
615
616fn set_prev_leaf(page: &mut Page, prev: u32) {
617    let data = page.as_bytes_mut();
618    data[LEAF_PREV_OFFSET..LEAF_PREV_OFFSET + 4].copy_from_slice(&prev.to_le_bytes());
619}
620
621fn set_next_leaf(page: &mut Page, next: u32) {
622    let data = page.as_bytes_mut();
623    data[LEAF_NEXT_OFFSET..LEAF_NEXT_OFFSET + 4].copy_from_slice(&next.to_le_bytes());
624}
625
626// ==================== Interior Page Helpers ====================
627//
628// Slotted-page layout mirroring the leaf format:
629//
630// ```text
631// [PageHeader 32B]
632// [slot 0: u16][slot 1: u16]...[slot N-1: u16]  ← grows forward
633// ... free space ...
634// [cell N-1][cell N-2]...[cell 0]               ← grows backward from free_end
635// ```
636//
637// Each cell is `[key_len:u16][key][child:u32]`. The right-most child
638// pointer lives in `PageHeader.right_child` exactly as before.
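//
// As a worked example, a 5-byte separator key costs 2 (slot) + 2
// (key_len) + 5 (key) + 4 (child) = 13 bytes of the page, which is the
// same accounting `can_insert_interior` performs below.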
639
640#[inline]
641fn interior_slot_offset_for(index: usize) -> usize {
642    INTERIOR_DATA_OFFSET + index * 2
643}
644
645#[inline]
646fn interior_read_slot(page: &Page, index: usize) -> BTreeResult<usize> {
647    let data = page.as_bytes();
648    let slot_pos = interior_slot_offset_for(index);
649    if slot_pos + 2 > PAGE_SIZE {
650        return Err(BTreeError::Corrupted(
651            "interior slot array overflows page".into(),
652        ));
653    }
654    Ok(u16::from_le_bytes([data[slot_pos], data[slot_pos + 1]]) as usize)
655}
656
657#[inline]
658fn interior_write_slot(page: &mut Page, index: usize, cell_offset: u16) -> BTreeResult<()> {
659    let data = page.as_bytes_mut();
660    let slot_pos = interior_slot_offset_for(index);
661    if slot_pos + 2 > PAGE_SIZE {
662        return Err(BTreeError::Corrupted(
663            "interior slot array overflows page".into(),
664        ));
665    }
666    data[slot_pos..slot_pos + 2].copy_from_slice(&cell_offset.to_le_bytes());
667    Ok(())
668}
669
670#[inline]
671fn interior_cells_start(page: &Page) -> usize {
672    let end = page.free_end() as usize;
673    if end == 0 {
674        PAGE_SIZE
675    } else {
676        end
677    }
678}
679
680#[inline]
681fn interior_free_bytes(page: &Page) -> usize {
682    let slot_end = INTERIOR_DATA_OFFSET + (page.cell_count() as usize) * 2;
683    interior_cells_start(page).saturating_sub(slot_end)
684}
685
686fn read_interior_cell(page: &Page, index: usize) -> BTreeResult<(Vec<u8>, u32)> {
687    let cell_count = page.cell_count() as usize;
688    if index >= cell_count {
689        return Err(BTreeError::Corrupted("Cell index out of range".into()));
690    }
691    let offset = interior_read_slot(page, index)?;
692    let data = page.as_bytes();
693    if offset + 2 > PAGE_SIZE {
694        return Err(BTreeError::Corrupted(
695            "interior cell header out of range".into(),
696        ));
697    }
698    let key_len = u16::from_le_bytes([data[offset], data[offset + 1]]) as usize;
699    let end = offset + 2 + key_len + 4;
700    if end > PAGE_SIZE {
701        return Err(BTreeError::Corrupted(
702            "interior cell body out of range".into(),
703        ));
704    }
705    let key = data[offset + 2..offset + 2 + key_len].to_vec();
706    let child = u32::from_le_bytes([
707        data[offset + 2 + key_len],
708        data[offset + 2 + key_len + 1],
709        data[offset + 2 + key_len + 2],
710        data[offset + 2 + key_len + 3],
711    ]);
712    Ok((key, child))
713}
714
715fn read_interior_keys_children(page: &Page) -> BTreeResult<(Vec<Vec<u8>>, Vec<u32>)> {
716    let cell_count = page.cell_count() as usize;
717    let mut keys = Vec::with_capacity(cell_count);
718    let mut children = Vec::with_capacity(cell_count + 1);
719
720    for i in 0..cell_count {
721        let (key, child) = read_interior_cell(page, i)?;
722        keys.push(key);
723        children.push(child);
724    }
725
726    if cell_count == 0 {
727        let right_child = page.right_child();
728        if right_child != 0 {
729            children.push(right_child);
730        }
731    } else {
732        children.push(page.right_child());
733    }
734
735    Ok((keys, children))
736}
737
738/// Bulk-write an interior page from scratch in slotted form. Used by
739/// the split path. `keys` and `children` follow the B+ tree contract:
740/// `children.len() == keys.len() + 1`, with the last child landing in
741/// `PageHeader.right_child` (as before).
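///
/// For example, separators `["g", "p"]` come with three children
/// (illustrative names): cell 0 stores `("g", child_lt_g)` for keys < "g",
/// cell 1 stores `("p", child_g_to_p)` for "g" <= keys < "p", and
/// `right_child` covers keys >= "p".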
742fn write_interior_entries(page: &mut Page, keys: &[Vec<u8>], children: &[u32]) -> BTreeResult<()> {
743    if !keys.is_empty() && children.len() != keys.len() + 1 {
744        return Err(BTreeError::Corrupted(
745            "Interior keys/children length mismatch".into(),
746        ));
747    }
748
749    clear_interior_cells(page);
750    if keys.is_empty() {
751        let right_child = children.first().copied().unwrap_or(0);
752        page.set_right_child(right_child);
753        return Ok(());
754    }
755
756    for (i, key) in keys.iter().enumerate() {
757        let cell_size = 2 + key.len() + 4;
758        let cells_start = interior_cells_start(page);
759        let slot_end = INTERIOR_DATA_OFFSET + (i + 1) * 2;
760        if slot_end + cell_size > cells_start {
761            return Err(BTreeError::Corrupted(
762                "interior rebuild overflowed page".into(),
763            ));
764        }
765        let cell_offset = cells_start - cell_size;
766        {
767            let data = page.as_bytes_mut();
768            data[cell_offset..cell_offset + 2].copy_from_slice(&(key.len() as u16).to_le_bytes());
769            data[cell_offset + 2..cell_offset + 2 + key.len()].copy_from_slice(key);
770            data[cell_offset + 2 + key.len()..cell_offset + 2 + key.len() + 4]
771                .copy_from_slice(&children[i].to_le_bytes());
772        }
773        page.set_free_end(cell_offset as u16);
774        interior_write_slot(page, i, cell_offset as u16)?;
775    }
776    page.set_cell_count(keys.len() as u16);
777    page.set_free_start((INTERIOR_DATA_OFFSET + keys.len() * 2) as u16);
778    page.set_right_child(*children.last().ok_or_else(|| {
779        BTreeError::Corrupted("write_interior_entries: children empty with non-empty keys".into())
780    })?);
781    Ok(())
782}
783
784fn can_insert_interior(page: &Page, key: &[u8]) -> bool {
785    let needed = 2 + 2 + key.len() + 4;
786    interior_free_bytes(page) >= needed
787}
788
789/// Insert a `(key, left_child)` separator into the interior page.
790/// `right_child` replaces whatever child used to sit to the right of
791/// the inserted key (either a middle child's pointer or the page's
792/// `right_child` when the key is the new maximum).
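///
/// For example, when a rightmost child `P1` splits and produces a new
/// right sibling `P2` with separator `"m"`, calling this with
/// `(key = "m", left_child = P1, right_child = P2)` appends the cell
/// `("m", P1)` and redirects the page's `right_child` to `P2`.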
793fn insert_into_interior(
794    page: &mut Page,
795    key: &[u8],
796    left_child: u32,
797    right_child: u32,
798) -> BTreeResult<()> {
799    // 1. Binary search the slot array for the insert position.
800    let cell_count = page.cell_count() as usize;
801    let mut low = 0;
802    let mut high = cell_count;
803    while low < high {
804        let mid = (low + high) / 2;
805        let (cell_key, _) = read_interior_cell(page, mid)?;
806        match cell_key.as_slice().cmp(key) {
807            Ordering::Less => low = mid + 1,
808            Ordering::Greater | Ordering::Equal => high = mid,
809        }
810    }
811    let insert_pos = low;
812
813    // 2. Redirect the previous owner of the split to `right_child`.
814    //    If the insertion is at the tail, the previous owner was the
815    //    page's right_child pointer; otherwise it was the child slot
816    //    of the cell currently at `insert_pos`.
817    if insert_pos < cell_count {
818        let old_offset = interior_read_slot(page, insert_pos)?;
819        let data = page.as_bytes();
820        let key_len = u16::from_le_bytes([data[old_offset], data[old_offset + 1]]) as usize;
821        let child_pos = old_offset + 2 + key_len;
822        let data = page.as_bytes_mut();
823        data[child_pos..child_pos + 4].copy_from_slice(&right_child.to_le_bytes());
824    } else {
825        page.set_right_child(right_child);
826    }
827
828    // 3. Reserve the new cell at the tail of the free area.
829    let cell_size = 2 + key.len() + 4;
830    let slot_end_after = INTERIOR_DATA_OFFSET + (cell_count + 1) * 2;
831    let cells_start = interior_cells_start(page);
832    if slot_end_after + cell_size > cells_start {
833        return Err(BTreeError::Corrupted("interior page full".into()));
834    }
835    let cell_offset = cells_start - cell_size;
836    {
837        let data = page.as_bytes_mut();
838        data[cell_offset..cell_offset + 2].copy_from_slice(&(key.len() as u16).to_le_bytes());
839        data[cell_offset + 2..cell_offset + 2 + key.len()].copy_from_slice(key);
840        data[cell_offset + 2 + key.len()..cell_offset + 2 + key.len() + 4]
841            .copy_from_slice(&left_child.to_le_bytes());
842    }
843    page.set_free_end(cell_offset as u16);
844
845    // 4. Shift the slot-array tail right by one slot, write the new
846    //    pointer into the freed slot, bump counters.
847    {
848        let shift_from = interior_slot_offset_for(insert_pos);
849        let shift_to = shift_from + 2;
850        let shift_len = (cell_count - insert_pos) * 2;
851        if shift_len > 0 {
852            let data = page.as_bytes_mut();
853            data.copy_within(shift_from..shift_from + shift_len, shift_to);
854        }
855    }
856    interior_write_slot(page, insert_pos, cell_offset as u16)?;
857    page.set_cell_count((cell_count + 1) as u16);
858    page.set_free_start((INTERIOR_DATA_OFFSET + (cell_count + 1) * 2) as u16);
859    Ok(())
860}
861
862fn clear_interior_cells(page: &mut Page) {
863    {
864        let data = page.as_bytes_mut();
865        for byte in &mut data[INTERIOR_DATA_OFFSET..] {
866            *byte = 0;
867        }
868    }
869    page.set_cell_count(0);
870    page.set_free_start(INTERIOR_DATA_OFFSET as u16);
871    page.set_free_end(PAGE_SIZE as u16);
872}
873
874#[cfg(test)]
875mod tests {
876    use super::*;
877    use std::path::PathBuf;
878
879    fn temp_db_path() -> PathBuf {
880        use std::sync::atomic::{AtomicU64, Ordering};
881        static COUNTER: AtomicU64 = AtomicU64::new(0);
882        let id = COUNTER.fetch_add(1, Ordering::Relaxed);
883        let mut path = std::env::temp_dir();
884        path.push(format!("reddb_btree_test_{}_{}.db", std::process::id(), id));
885        path
886    }
887
888    fn cleanup(path: &PathBuf) {
889        let _ = std::fs::remove_file(path);
890    }
891
892    #[test]
893    fn test_btree_empty() {
894        let path = temp_db_path();
895        cleanup(&path);
896
897        let pager = Arc::new(Pager::open_default(&path).unwrap());
898        let tree = BTree::new(pager);
899
900        assert!(tree.is_empty());
901        assert_eq!(tree.root_page_id(), 0);
902        assert_eq!(tree.get(b"key").unwrap(), None);
903        assert_eq!(tree.count().unwrap(), 0);
904
905        cleanup(&path);
906    }
907
908    #[test]
909    fn test_btree_single_insert() {
910        let path = temp_db_path();
911        cleanup(&path);
912
913        let pager = Arc::new(Pager::open_default(&path).unwrap());
914        let tree = BTree::new(pager);
915
916        tree.insert(b"hello", b"world").unwrap();
917
918        assert!(!tree.is_empty());
919        assert_eq!(tree.get(b"hello").unwrap(), Some(b"world".to_vec()));
920        assert_eq!(tree.get(b"other").unwrap(), None);
921        assert_eq!(tree.count().unwrap(), 1);
922
923        cleanup(&path);
924    }
925
926    #[test]
927    fn test_btree_multiple_inserts() {
928        let path = temp_db_path();
929        cleanup(&path);
930
931        let pager = Arc::new(Pager::open_default(&path).unwrap());
932        let tree = BTree::new(pager);
933
934        tree.insert(b"c", b"3").unwrap();
935        tree.insert(b"a", b"1").unwrap();
936        tree.insert(b"b", b"2").unwrap();
937
938        assert_eq!(tree.get(b"a").unwrap(), Some(b"1".to_vec()));
939        assert_eq!(tree.get(b"b").unwrap(), Some(b"2".to_vec()));
940        assert_eq!(tree.get(b"c").unwrap(), Some(b"3".to_vec()));
941        assert_eq!(tree.count().unwrap(), 3);
942
943        cleanup(&path);
944    }
945
946    #[test]
947    fn test_btree_duplicate_key() {
948        let path = temp_db_path();
949        cleanup(&path);
950
951        let pager = Arc::new(Pager::open_default(&path).unwrap());
952        let tree = BTree::new(pager);
953
954        tree.insert(b"key", b"value1").unwrap();
955        let result = tree.insert(b"key", b"value2");
956
957        assert!(matches!(result, Err(BTreeError::DuplicateKey)));
958        assert_eq!(tree.get(b"key").unwrap(), Some(b"value1".to_vec()));
959
960        cleanup(&path);
961    }
962
963    #[test]
964    fn test_btree_delete() {
965        let path = temp_db_path();
966        cleanup(&path);
967
968        let pager = Arc::new(Pager::open_default(&path).unwrap());
969        let tree = BTree::new(pager);
970
971        tree.insert(b"a", b"1").unwrap();
972        tree.insert(b"b", b"2").unwrap();
973        tree.insert(b"c", b"3").unwrap();
974
975        assert!(tree.delete(b"b").unwrap());
976        assert!(!tree.delete(b"d").unwrap());
977
978        assert_eq!(tree.get(b"a").unwrap(), Some(b"1".to_vec()));
979        assert_eq!(tree.get(b"b").unwrap(), None);
980        assert_eq!(tree.get(b"c").unwrap(), Some(b"3".to_vec()));
981        assert_eq!(tree.count().unwrap(), 2);
982
983        cleanup(&path);
984    }
985
986    #[test]
987    fn test_btree_delete_rebalance_removes_empty_leaf() {
988        let path = temp_db_path();
989        cleanup(&path);
990
991        let pager = Arc::new(Pager::open_default(&path).unwrap());
992        let tree = BTree::new(pager.clone());
993
994        let value = vec![b'v'; 200];
995        for i in 0..60u32 {
996            let key = format!("key{:03}", i);
997            tree.insert(key.as_bytes(), &value).unwrap();
998        }
999
1000        let root_id = tree.root_page_id();
1001        let first_leaf = tree.find_first_leaf(root_id).unwrap();
1002        let mut leaf_ids = Vec::new();
1003        let mut current = first_leaf;
1004        loop {
1005            leaf_ids.push(current);
1006            let page = pager.read_page(current).unwrap();
1007            let next = read_next_leaf(&page);
1008            if next == 0 {
1009                break;
1010            }
1011            current = next;
1012        }
1013
1014        assert!(leaf_ids.len() >= 3);
1015
1016        let target_leaf = leaf_ids[1];
1017        let page = pager.read_page(target_leaf).unwrap();
1018        let cell_count = page.cell_count() as usize;
1019        let mut keys = Vec::with_capacity(cell_count);
1020        for i in 0..cell_count {
1021            let (key, _) = read_leaf_cell(&page, i).unwrap();
1022            keys.push(key);
1023        }
1024
1025        for key in &keys {
1026            tree.delete(key).unwrap();
1027        }
1028
1029        let expected = 60 - keys.len();
1030        assert_eq!(tree.count().unwrap(), expected);
1031
1032        let mut cursor = tree.cursor_first().unwrap();
1033        let mut results = Vec::new();
1034        while let Some((key, _)) = cursor.next().unwrap() {
1035            results.push(key);
1036        }
1037
1038        assert_eq!(results.len(), expected);
1039        let last_key = format!("key{:03}", 59).into_bytes();
1040        assert_eq!(results.last(), Some(&last_key));
1041
1042        cleanup(&path);
1043    }
1044
1045    #[test]
1046    fn test_btree_cursor() {
1047        let path = temp_db_path();
1048        cleanup(&path);
1049
1050        let pager = Arc::new(Pager::open_default(&path).unwrap());
1051        let tree = BTree::new(pager);
1052
1053        tree.insert(b"c", b"3").unwrap();
1054        tree.insert(b"a", b"1").unwrap();
1055        tree.insert(b"b", b"2").unwrap();
1056
1057        let mut cursor = tree.cursor_first().unwrap();
1058        let mut results: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
1059
1060        while let Some(entry) = cursor.next().unwrap() {
1061            results.push(entry);
1062        }
1063
1064        assert_eq!(results.len(), 3);
1065        assert_eq!(results[0], (b"a".to_vec(), b"1".to_vec()));
1066        assert_eq!(results[1], (b"b".to_vec(), b"2".to_vec()));
1067        assert_eq!(results[2], (b"c".to_vec(), b"3".to_vec()));
1068
1069        cleanup(&path);
1070    }
1071
1072    #[test]
1073    fn test_btree_range() {
1074        let path = temp_db_path();
1075        cleanup(&path);
1076
1077        let pager = Arc::new(Pager::open_default(&path).unwrap());
1078        let tree = BTree::new(pager);
1079
1080        for i in 0..10u8 {
1081            let key = format!("key{:02}", i);
1082            let value = format!("val{:02}", i);
1083            tree.insert(key.as_bytes(), value.as_bytes()).unwrap();
1084        }
1085
1086        let results = tree.range(b"key03", b"key06").unwrap();
1087
1088        assert_eq!(results.len(), 4);
1089        assert_eq!(results[0].0, b"key03".to_vec());
1090        assert_eq!(results[3].0, b"key06".to_vec());
1091
1092        cleanup(&path);
1093    }
1094
1095    #[test]
1096    fn test_btree_large_keys() {
1097        let path = temp_db_path();
1098        cleanup(&path);
1099
1100        let pager = Arc::new(Pager::open_default(&path).unwrap());
1101        let tree = BTree::new(pager);
1102
1103        let key = vec![b'x'; 500];
1104        let value = vec![b'y'; 500];
1105
1106        tree.insert(&key, &value).unwrap();
1107        assert_eq!(tree.get(&key).unwrap(), Some(value));
1108
1109        cleanup(&path);
1110    }
1111
1112    #[test]
1113    fn test_btree_key_too_large() {
1114        let path = temp_db_path();
1115        cleanup(&path);
1116
1117        let pager = Arc::new(Pager::open_default(&path).unwrap());
1118        let tree = BTree::new(pager);
1119
1120        let key = vec![b'x'; MAX_KEY_SIZE + 1];
1121        let result = tree.insert(&key, b"value");
1122
1123        assert!(matches!(result, Err(BTreeError::KeyTooLarge(_))));
1124
1125        cleanup(&path);
1126    }
1127
1128    #[test]
1129    fn test_btree_many_inserts() {
1130        let path = temp_db_path();
1131        cleanup(&path);
1132
1133        let pager = Arc::new(Pager::open_default(&path).unwrap());
1134        let tree = BTree::new(pager);
1135
1136        // Insert enough entries to force splits
1137        for i in 0..100u32 {
1138            let key = format!("key{:08}", i);
1139            let value = format!("value{:08}", i);
1140            tree.insert(key.as_bytes(), value.as_bytes()).unwrap();
1141        }
1142
1143        // Verify all entries
1144        for i in 0..100u32 {
1145            let key = format!("key{:08}", i);
1146            let expected = format!("value{:08}", i);
1147            assert_eq!(
1148                tree.get(key.as_bytes()).unwrap(),
1149                Some(expected.into_bytes())
1150            );
1151        }
1152
1153        assert_eq!(tree.count().unwrap(), 100);
1154
1155        cleanup(&path);
1156    }
1157
1158    #[test]
1159    fn test_btree_bulk_insert_sorted_preserves_leaf_layout() {
1160        let path = temp_db_path();
1161        cleanup(&path);
1162
1163        let pager = Arc::new(Pager::open_default(&path).unwrap());
1164        let tree = BTree::new(pager);
1165
1166        let items: Vec<(Vec<u8>, Vec<u8>)> = (0..240u32)
1167            .map(|i| {
1168                (
1169                    format!("key{:08}", i).into_bytes(),
1170                    vec![b'v'; 32 + (i as usize % 7)],
1171                )
1172            })
1173            .collect();
1174
1175        tree.bulk_insert_sorted(&items).unwrap();
1176
1177        assert_eq!(tree.count().unwrap(), items.len());
1178
1179        for (key, value) in &items {
1180            assert_eq!(tree.get(key).unwrap(), Some(value.clone()));
1181        }
1182
1183        cleanup(&path);
1184    }
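
    /// Exercises the shrink-in-place path documented on
    /// `overwrite_leaf_value`: a same-key upsert with a shorter value must
    /// read back at its new length. The assertions hold regardless of which
    /// internal path `upsert` takes, so this is a behavioral check rather
    /// than a claim about the exact code path.
    #[test]
    fn test_btree_upsert_shrinking_value() {
        let path = temp_db_path();
        cleanup(&path);

        let pager = Arc::new(Pager::open_default(&path).unwrap());
        let tree = BTree::new(pager);

        tree.upsert(b"counter", b"0000000100").unwrap();
        tree.upsert(b"counter", b"99").unwrap();

        assert_eq!(tree.get(b"counter").unwrap(), Some(b"99".to_vec()));
        assert_eq!(tree.count().unwrap(), 1);

        cleanup(&path);
    }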
1185
1186    /// Property test for `BTree::upsert_batch_sorted` (#159).
1187    ///
1188    /// Generates random batches of (key, value) updates and applies them to
1189    /// two independently-built trees: the baseline uses the existing
1190    /// loop-of-`upsert` path, and the candidate uses `upsert_batch_sorted`
1191    /// after the caller-side sort.  Both trees must end up with identical
1192    /// key→value contents (verified via per-key `get` and a full cursor
1193    /// scan), proving the new path is observationally equivalent for
1194    /// arbitrary batches up to 200 entries.
1195    mod prop_upsert_batch_sorted {
1196        use super::*;
1197        use proptest::prelude::*;
1198        use std::collections::BTreeMap;
1199
1200        /// Build a tree pre-populated with `seed` and return the path so
1201        /// the caller can clean it up.
1202        fn populate_tree(seed: &[(Vec<u8>, Vec<u8>)]) -> (BTree, PathBuf, Arc<Pager>) {
1203            let path = temp_db_path();
1204            cleanup(&path);
1205            let pager = Arc::new(Pager::open_default(&path).unwrap());
1206            let tree = BTree::new(Arc::clone(&pager));
1207            for (k, v) in seed {
1208                tree.upsert(k, v).unwrap();
1209            }
1210            (tree, path, pager)
1211        }
1212
1213        fn key_strategy() -> impl Strategy<Value = Vec<u8>> {
1214            // Short keys keep many entries per leaf so a single batch
1215            // hits multiple keys in the same leaf — the path the new
1216            // method optimises.  1..=4 bytes from a small alphabet.
1217            prop::collection::vec(0u8..16u8, 1..=4)
1218        }
1219
1220        fn value_strategy() -> impl Strategy<Value = Vec<u8>> {
1221            // Tiny values — same-or-shrink fits the in-place overwrite
1222            // fast path; growing values fall back to the generic
1223            // single-key `upsert`, exercising both branches.
1224            prop::collection::vec(0u8..255u8, 0..=8)
1225        }
1226
1227        fn pair_strategy() -> impl Strategy<Value = (Vec<u8>, Vec<u8>)> {
1228            (key_strategy(), value_strategy())
1229        }
1230
1231        proptest! {
1232            #![proptest_config(ProptestConfig {
1233                cases: 64,
1234                .. ProptestConfig::default()
1235            })]
1236
1237            #[test]
1238            fn batch_upsert_matches_loop_upsert(
1239                seed in prop::collection::vec(pair_strategy(), 0..50),
1240                batch in prop::collection::vec(pair_strategy(), 1..200),
1241            ) {
1242                // Dedup the seed by key (last write wins) so both trees
1243                // start from the same well-defined state.
1244                let mut seed_map: BTreeMap<Vec<u8>, Vec<u8>> = BTreeMap::new();
1245                for (k, v) in seed {
1246                    seed_map.insert(k, v);
1247                }
1248                let seed: Vec<(Vec<u8>, Vec<u8>)> = seed_map.into_iter().collect();
1249
1250                // Baseline: loop-of-upsert.
1251                let (baseline, baseline_path, _baseline_pager) = populate_tree(&seed);
1252                for (k, v) in &batch {
1253                    baseline.upsert(k, v).unwrap();
1254                }
1255
1256                // Candidate: upsert_batch_sorted after caller-side sort
1257                // (matches the contract documented on the method).
1258                let (candidate, candidate_path, _candidate_pager) = populate_tree(&seed);
1259                let mut sorted = batch.clone();
1260                sorted.sort_by(|a, b| a.0.cmp(&b.0));
1261                candidate.upsert_batch_sorted(&sorted).unwrap();
1262
1263                // Compute the expected end state from BTreeMap (last
1264                // write wins), independent of insertion order.
1265                let mut expected: BTreeMap<Vec<u8>, Vec<u8>> = BTreeMap::new();
1266                for (k, v) in &seed {
1267                    expected.insert(k.clone(), v.clone());
1268                }
1269                for (k, v) in &batch {
1270                    expected.insert(k.clone(), v.clone());
1271                }
1272
1273                // Per-key get matches expected on both trees.
1274                for (k, v) in &expected {
1275                    let baseline_got = baseline.get(k).unwrap();
1276                    let candidate_got = candidate.get(k).unwrap();
1277                    prop_assert_eq!(baseline_got.as_ref(), Some(v));
1278                    prop_assert_eq!(candidate_got.as_ref(), Some(v));
1279                }
1280
1281                // Full cursor scan yields identical sorted contents.
1282                let collect = |t: &BTree| -> Vec<(Vec<u8>, Vec<u8>)> {
1283                    let mut out = Vec::new();
1284                    let mut cur = t.cursor_first().unwrap();
1285                    while let Some(entry) = cur.next().unwrap() {
1286                        out.push(entry);
1287                    }
1288                    out
1289                };
1290                let baseline_contents = collect(&baseline);
1291                let candidate_contents = collect(&candidate);
1292                prop_assert_eq!(&baseline_contents, &candidate_contents);
1293
1294                // And both equal the BTreeMap-derived expectation.
1295                let expected_contents: Vec<(Vec<u8>, Vec<u8>)> =
1296                    expected.into_iter().collect();
1297                prop_assert_eq!(&candidate_contents, &expected_contents);
1298
1299                cleanup(&baseline_path);
1300                cleanup(&candidate_path);
1301            }
1302        }
1303    }
1304
1305    #[test]
1306    fn test_btree_sorted_iteration() {
1307        let path = temp_db_path();
1308        cleanup(&path);
1309
1310        let pager = Arc::new(Pager::open_default(&path).unwrap());
1311        let tree = BTree::new(pager);
1312
1313        // Insert in random order
1314        let keys = vec![50, 25, 75, 10, 30, 60, 80, 5, 15, 27, 35, 55, 65, 77, 90];
1315        for k in &keys {
1316            let key = format!("{:03}", k);
1317            tree.insert(key.as_bytes(), key.as_bytes()).unwrap();
1318        }
1319
1320        // Should iterate in sorted order
1321        let mut cursor = tree.cursor_first().unwrap();
1322        let mut prev: Option<Vec<u8>> = None;
1323
1324        while let Some((key, _)) = cursor.next().unwrap() {
1325            if let Some(p) = &prev {
1326                assert!(p < &key, "Keys not in sorted order");
1327            }
1328            prev = Some(key);
1329        }
1330
1331        cleanup(&path);
1332    }
1333}