Skip to main content

reddb_server/storage/engine/
btree.rs

1//! B+ Tree Implementation for Page-Based Storage
2//!
3//! A B+ tree optimized for disk I/O with the following properties:
4//! - All values stored in leaf nodes
5//! - Interior nodes only contain keys and child pointers
6//! - Leaf nodes form a doubly-linked list for range scans
7//! - Each node fits in a single 4KB page
8//!
9//! # Page Layout
10//!
11//! Interior Node:
12//! ```text
13//! ┌────────────────────────────────────────────────────────────┐
14//! │ PageHeader (32 bytes)                                      │
15//! ├────────────────────────────────────────────────────────────┤
16//! │ right_child: u32 - Rightmost child pointer                 │
17//! │ cells: [key_len: u16, key: [u8], child: u32]...           │
18//! └────────────────────────────────────────────────────────────┘
19//! ```
20//!
21//! Leaf Node:
22//! ```text
23//! ┌────────────────────────────────────────────────────────────┐
24//! │ PageHeader (32 bytes)                                      │
25//! ├────────────────────────────────────────────────────────────┤
26//! │ prev_leaf: u32 - Previous leaf (0 = none)                  │
27//! │ next_leaf: u32 - Next leaf (0 = none)                      │
28//! │ cells: [key_len: u16, val_len: u16, key: [u8], val: [u8]] │
29//! └────────────────────────────────────────────────────────────┘
30//! ```
31
32use std::cmp::Ordering;
33use std::sync::{Arc, RwLock};
34
35use super::page::{Page, PageType, HEADER_SIZE, PAGE_SIZE};
36use super::pager::{Pager, PagerError};
37
38/// Maximum key size (to ensure at least 2 keys per node)
39pub const MAX_KEY_SIZE: usize = 1024;
40
41/// Hard upper bound on logical value size.
42///
43/// Per ADR 0023 (large-value overflow), values up to 256 MiB are stored
44/// transparently — inline below [`value_layout::OVERFLOW_THRESHOLD`],
45/// inline+compressed when LZ4 shrinks them under the same threshold, or
46/// spilled into an [`overflow::OverflowChain`] otherwise. Values above
47/// this ceiling are rejected with [`BTreeError::ValueTooLarge`] before
48/// any compression or page allocation runs.
49///
50/// Re-exported from [`value_layout::MAX_VALUE_SIZE`] so callers that
51/// historically read this constant (`unified::store::impl_pages`,
52/// `query::planner::stats_catalog`) continue to compile.
53pub const MAX_VALUE_SIZE: usize = value_layout::MAX_VALUE_SIZE;
54
55/// Minimum fill factor before merge (as percentage)
56pub const MIN_FILL_FACTOR: usize = 25;
57
58/// Offset of prev_leaf in leaf page
59const LEAF_PREV_OFFSET: usize = HEADER_SIZE;
60
61/// Offset of next_leaf in leaf page
62const LEAF_NEXT_OFFSET: usize = HEADER_SIZE + 4;
63
64/// Start of the slot array in leaf pages.
65///
66/// Slotted-page layout:
67/// ```text
68/// [PageHeader 32B][prev u32 | next u32 | 8B]
69/// [slot 0: u16][slot 1: u16]...[slot N-1: u16]  ← grows forward
70/// ... free space ...
71/// [cell N-1][cell N-2]...[cell 0]               ← grows backward from free_end
72/// ```
73/// Each cell is laid out as `[key_len:u16][val_len:u16][key][val]`.
74/// `page.cell_count()` is N; `page.free_end()` is the offset of the
75/// lowest (most recently written) cell. The slot array lives right
76/// after the leaf-chain links and each u16 slot is the absolute page
77/// offset of its cell.
78const LEAF_SLOT_ARRAY_OFFSET: usize = HEADER_SIZE + 8;
79
80/// Kept for source-compat with older code paths (e.g. interior-node
81/// helpers); equivalent to `LEAF_SLOT_ARRAY_OFFSET` for leaves.
82const LEAF_DATA_OFFSET: usize = LEAF_SLOT_ARRAY_OFFSET;
83
84/// Start of cell data in interior pages (right_child is in header)
85const INTERIOR_DATA_OFFSET: usize = HEADER_SIZE;
86
87/// B+ Tree error types
88#[derive(Debug, Clone)]
89pub enum BTreeError {
90    /// Key not found
91    NotFound,
92    /// Key already exists
93    DuplicateKey,
94    /// Key too large
95    KeyTooLarge(usize),
96    /// Value too large
97    ValueTooLarge(usize),
98    /// Tree is corrupted
99    Corrupted(String),
100    /// Pager error
101    Pager(String),
102    /// Internal lock was poisoned by a panicked thread
103    LockPoisoned(String),
104    /// Large-value layout encode/decode failed (overflow chain or LZ4).
105    ValueLayout(String),
106}
107
108impl std::fmt::Display for BTreeError {
109    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
110        match self {
111            Self::NotFound => write!(f, "Key not found"),
112            Self::DuplicateKey => write!(f, "Key already exists"),
113            Self::KeyTooLarge(size) => {
114                write!(f, "Key too large: {} bytes (max {})", size, MAX_KEY_SIZE)
115            }
116            Self::ValueTooLarge(size) => write!(
117                f,
118                "Value too large: {} bytes (max {})",
119                size, MAX_VALUE_SIZE
120            ),
121            Self::Corrupted(msg) => write!(f, "B-tree corrupted: {}", msg),
122            Self::Pager(msg) => write!(f, "Pager error: {}", msg),
123            Self::LockPoisoned(msg) => write!(f, "Lock poisoned: {}", msg),
124            Self::ValueLayout(msg) => write!(f, "Value layout: {}", msg),
125        }
126    }
127}
128
129impl From<value_layout::ValueLayoutError> for BTreeError {
130    fn from(e: value_layout::ValueLayoutError) -> Self {
131        match e {
132            value_layout::ValueLayoutError::ValueTooLarge(n) => Self::ValueTooLarge(n),
133            other => Self::ValueLayout(other.to_string()),
134        }
135    }
136}
137
138impl std::error::Error for BTreeError {}
139
140impl From<PagerError> for BTreeError {
141    fn from(e: PagerError) -> Self {
142        Self::Pager(e.to_string())
143    }
144}
145
146impl From<super::page::PageError> for BTreeError {
147    fn from(e: super::page::PageError) -> Self {
148        Self::Corrupted(e.to_string())
149    }
150}
151
152/// Result type for B+ tree operations
153pub type BTreeResult<T> = Result<T, BTreeError>;
154
155/// B+ Tree cursor for iteration
156pub struct BTreeCursor {
157    /// Current leaf page ID
158    leaf_page_id: u32,
159    /// Current position within leaf
160    position: usize,
161    /// Pager reference
162    pager: Arc<Pager>,
163    /// A6 — true once we've issued a prefetch hint for the next leaf on
164    /// this leaf. Reset to false every time we move to a new leaf so the
165    /// hint fires once per leaf, at the halfway point.
166    prefetched_next: bool,
167}
168
169impl BTreeCursor {
170    /// Move to next entry
171    pub fn next(&mut self) -> BTreeResult<Option<(Vec<u8>, Vec<u8>)>> {
172        if self.leaf_page_id == 0 {
173            return Ok(None);
174        }
175
176        let page = self.pager.read_page(self.leaf_page_id)?;
177        let cell_count = page.cell_count() as usize;
178
179        // A6 — prefetch the next leaf when we cross the halfway point of
180        // the current one. The hint fires at most once per leaf (guarded
181        // by `prefetched_next`) so the syscall overhead is amortized over
182        // ~half a leaf's worth of entries. The kernel uses the hint to
183        // start DMA-ing the next page while we finish the current one.
184        if !self.prefetched_next && self.position >= cell_count / 2 && cell_count > 0 {
185            let next_leaf = read_next_leaf(&page);
186            if next_leaf != 0 {
187                self.pager.prefetch_hint(next_leaf);
188            }
189            self.prefetched_next = true;
190        }
191
192        // Check if we have more cells in current page
193        if self.position < cell_count {
194            let (key, stored) = read_leaf_cell(&page, self.position)?;
195            let value = value_layout::decode(&self.pager, &stored)?;
196            self.position += 1;
197            return Ok(Some((key, value)));
198        }
199
200        // Move to next leaf
201        let next_leaf = read_next_leaf(&page);
202        if next_leaf == 0 {
203            self.leaf_page_id = 0;
204            return Ok(None);
205        }
206
207        self.leaf_page_id = next_leaf;
208        self.position = 0;
209        self.prefetched_next = false; // reset for the new leaf
210
211        // Read from new leaf
212        let page = self.pager.read_page(self.leaf_page_id)?;
213        if page.cell_count() == 0 {
214            return Ok(None);
215        }
216
217        let (key, stored) = read_leaf_cell(&page, 0)?;
218        let value = value_layout::decode(&self.pager, &stored)?;
219        self.position = 1;
220        Ok(Some((key, value)))
221    }
222
223    /// Peek at current entry without advancing
224    pub fn peek(&self) -> BTreeResult<Option<(Vec<u8>, Vec<u8>)>> {
225        if self.leaf_page_id == 0 {
226            return Ok(None);
227        }
228
229        let page = self.pager.read_page(self.leaf_page_id)?;
230        let cell_count = page.cell_count() as usize;
231
232        if self.position >= cell_count {
233            return Ok(None);
234        }
235
236        let (key, stored) = read_leaf_cell(&page, self.position)?;
237        let value = value_layout::decode(&self.pager, &stored)?;
238        Ok(Some((key, value)))
239    }
240}
241
242/// B+ Tree implementation
243pub struct BTree {
244    /// Pager for page I/O
245    pager: Arc<Pager>,
246    /// Root page ID (0 = empty tree)
247    root_page_id: RwLock<u32>,
248    /// D2 fastpath: cached rightmost leaf (page_id, high_key).
249    ///
250    /// When the next key to insert is strictly greater than `high_key`,
251    /// skip `find_leaf` (tree descent from root) and go directly to the
252    /// cached page. Invalidated on splits and deletes.
253    ///
254    /// Mirrors `BTPageCacheIsValid` / `BTREE_FASTPATH_MIN_LEVEL` in
255    /// `nbtinsert.c:31`.
256    rightmost_leaf: RwLock<Option<(u32, Vec<u8>)>>,
257}
258
259#[path = "btree/impl.rs"]
260mod btree_impl;
261
262#[path = "btree/lehman_yao.rs"]
263pub mod lehman_yao;
264
265#[path = "btree/value_layout.rs"]
266pub mod value_layout;
267// ==================== Search Helpers ====================
268
269enum SearchResult {
270    Found(usize),
271    NotFound(usize),
272}
273
274fn search_leaf(page: &Page, key: &[u8]) -> BTreeResult<SearchResult> {
275    let cell_count = page.cell_count() as usize;
276
277    // Binary search
278    let mut low = 0;
279    let mut high = cell_count;
280
281    while low < high {
282        let mid = (low + high) / 2;
283        let (cell_key, _) = read_leaf_cell(page, mid)?;
284
285        match cell_key.as_slice().cmp(key) {
286            Ordering::Less => low = mid + 1,
287            Ordering::Greater => high = mid,
288            Ordering::Equal => return Ok(SearchResult::Found(mid)),
289        }
290    }
291
292    Ok(SearchResult::NotFound(low))
293}
294
295fn find_child(page: &Page, key: &[u8]) -> BTreeResult<u32> {
296    let cell_count = page.cell_count() as usize;
297
298    // Binary search for the correct child
299    for i in 0..cell_count {
300        let (cell_key, child) = read_interior_cell(page, i)?;
301        if key < cell_key.as_slice() {
302            return Ok(child);
303        }
304    }
305
306    // Key is >= all keys, use right child
307    Ok(page.right_child())
308}
309
310fn find_first_child(page: &Page) -> BTreeResult<u32> {
311    if page.cell_count() == 0 {
312        return Ok(page.right_child());
313    }
314    let (_, child) = read_interior_cell(page, 0)?;
315    Ok(child)
316}
317
318fn leaf_min_bytes() -> usize {
319    (PAGE_SIZE - LEAF_DATA_OFFSET) * MIN_FILL_FACTOR / 100
320}
321
322fn interior_min_bytes() -> usize {
323    (PAGE_SIZE - INTERIOR_DATA_OFFSET) * MIN_FILL_FACTOR / 100
324}
325
326/// Bytes consumed by one leaf entry in the slotted layout: one u16
327/// slot in the pointer array plus the cell itself (`[key_len:u16]
328/// [val_len:u16][key][val]`).
329fn leaf_entry_size(entry: &(Vec<u8>, Vec<u8>)) -> usize {
330    2 + 4 + entry.0.len() + entry.1.len()
331}
332
333fn leaf_entries_size(entries: &[(Vec<u8>, Vec<u8>)]) -> usize {
334    entries.iter().map(leaf_entry_size).sum()
335}
336
337#[inline]
338fn leaf_slot_offset_for(index: usize) -> usize {
339    LEAF_SLOT_ARRAY_OFFSET + index * 2
340}
341
342#[inline]
343fn leaf_read_slot(page: &Page, index: usize) -> BTreeResult<usize> {
344    let data = page.as_bytes();
345    let slot_pos = leaf_slot_offset_for(index);
346    if slot_pos + 2 > PAGE_SIZE {
347        return Err(BTreeError::Corrupted("slot array overflows page".into()));
348    }
349    Ok(u16::from_le_bytes([data[slot_pos], data[slot_pos + 1]]) as usize)
350}
351
352#[inline]
353fn leaf_write_slot(page: &mut Page, index: usize, cell_offset: u16) -> BTreeResult<()> {
354    let data = page.as_bytes_mut();
355    let slot_pos = leaf_slot_offset_for(index);
356    if slot_pos + 2 > PAGE_SIZE {
357        return Err(BTreeError::Corrupted("slot array overflows page".into()));
358    }
359    data[slot_pos..slot_pos + 2].copy_from_slice(&cell_offset.to_le_bytes());
360    Ok(())
361}
362
363#[inline]
364fn leaf_slots_end(page: &Page) -> usize {
365    LEAF_SLOT_ARRAY_OFFSET + (page.cell_count() as usize) * 2
366}
367
368#[inline]
369fn leaf_cells_start(page: &Page) -> usize {
370    let end = page.free_end() as usize;
371    if end == 0 {
372        PAGE_SIZE
373    } else {
374        end
375    }
376}
377
378#[inline]
379fn leaf_free_bytes(page: &Page) -> usize {
380    let slot_end = leaf_slots_end(page);
381    let cells = leaf_cells_start(page);
382    cells.saturating_sub(slot_end)
383}
384
385fn interior_key_size(key: &[u8]) -> usize {
386    2 + key.len() + 4
387}
388
389fn interior_entries_size(keys: &[Vec<u8>]) -> usize {
390    keys.iter().map(|k| interior_key_size(k)).sum()
391}
392
393// ==================== Leaf Page Helpers ====================
394
395/// Read the cell at slot `index` in O(1). Follows the u16 slot pointer
396/// into the cell data area; the cell header is `[key_len:u16][val_len:u16]`
397/// followed by the raw key and value bytes.
398fn read_leaf_cell(page: &Page, index: usize) -> BTreeResult<(Vec<u8>, Vec<u8>)> {
399    let cell_count = page.cell_count() as usize;
400    if index >= cell_count {
401        return Err(BTreeError::Corrupted("Cell index out of range".into()));
402    }
403    let offset = leaf_read_slot(page, index)?;
404    let data = page.as_bytes();
405    if offset + 4 > PAGE_SIZE {
406        return Err(BTreeError::Corrupted("cell header out of range".into()));
407    }
408    let key_len = u16::from_le_bytes([data[offset], data[offset + 1]]) as usize;
409    let val_len = u16::from_le_bytes([data[offset + 2], data[offset + 3]]) as usize;
410    let end = offset + 4 + key_len + val_len;
411    if end > PAGE_SIZE {
412        return Err(BTreeError::Corrupted("cell body out of range".into()));
413    }
414    let key = data[offset + 4..offset + 4 + key_len].to_vec();
415    let value = data[offset + 4 + key_len..end].to_vec();
416    Ok((key, value))
417}
418
419fn read_leaf_entries(page: &Page) -> BTreeResult<Vec<(Vec<u8>, Vec<u8>)>> {
420    let cell_count = page.cell_count() as usize;
421    let mut entries = Vec::with_capacity(cell_count);
422    for i in 0..cell_count {
423        entries.push(read_leaf_cell(page, i)?);
424    }
425    Ok(entries)
426}
427
428/// Wipe and re-lay an entire leaf in slotted form. Used by the split
429/// path (which hands us a pre-sorted `entries` vector) and by
430/// `clear_leaf_cells` under the hood. Entries are appended at the end
431/// of the free area so the highest-index slot ends up at the lowest
432/// offset — same shape the single-key insert path produces.
433fn write_leaf_entries(page: &mut Page, entries: &[(Vec<u8>, Vec<u8>)]) -> BTreeResult<()> {
434    clear_leaf_cells(page);
435    for (i, (k, v)) in entries.iter().enumerate() {
436        let cell_size = 4 + k.len() + v.len();
437        let cells_start = leaf_cells_start(page);
438        let slot_end = LEAF_SLOT_ARRAY_OFFSET + (i + 1) * 2;
439        if slot_end + cell_size > cells_start {
440            return Err(BTreeError::Corrupted("leaf rebuild overflowed page".into()));
441        }
442        let cell_offset = cells_start - cell_size;
443        {
444            let data = page.as_bytes_mut();
445            data[cell_offset..cell_offset + 2].copy_from_slice(&(k.len() as u16).to_le_bytes());
446            data[cell_offset + 2..cell_offset + 4].copy_from_slice(&(v.len() as u16).to_le_bytes());
447            data[cell_offset + 4..cell_offset + 4 + k.len()].copy_from_slice(k);
448            data[cell_offset + 4 + k.len()..cell_offset + 4 + k.len() + v.len()].copy_from_slice(v);
449        }
450        page.set_free_end(cell_offset as u16);
451        leaf_write_slot(page, i, cell_offset as u16)?;
452    }
453    page.set_cell_count(entries.len() as u16);
454    page.set_free_start((LEAF_SLOT_ARRAY_OFFSET + entries.len() * 2) as u16);
455    Ok(())
456}
457
458fn can_insert_leaf(page: &Page, key: &[u8], value: &[u8]) -> bool {
459    let needed = 2 + 4 + key.len() + value.len();
460    leaf_free_bytes(page) >= needed
461}
462
463/// Insert `(key, value)` into the slotted leaf in O(log M) search +
464/// O(M) slot-array memmove. Cell data is appended at the tail of the
465/// free area (backward from `free_end`); the slot pointer is inserted
466/// Overwrite the value portion of cell at `index` in-place. Valid when
467/// `new_value.len() <= existing_value.len()` — the caller
468/// (typically `BTree::upsert`) checks this before calling.
469///
470/// Same-length writes are trivial (byte copy). Shrinks update the cell's
471/// `v_len` header and leave the trailing bytes as dead space until the
472/// page is naturally rewritten; the slotted layout keeps cells
473/// addressable by their own offset+len so the gap is harmless.
474///
475/// Fixed-size column updates (SET score = X on a typed schema) hit this
476/// path, avoiding the delete + re-insert + rebalance cycle that
477/// dominates bulk_update.
478pub(crate) fn overwrite_leaf_value(
479    page: &mut Page,
480    index: usize,
481    new_value: &[u8],
482) -> BTreeResult<()> {
483    let cell_offset = leaf_read_slot(page, index)?;
484    let data = page.as_bytes();
485    if cell_offset + 4 > data.len() {
486        return Err(BTreeError::Corrupted(
487            "leaf cell header out of bounds".into(),
488        ));
489    }
490    let k_len = u16::from_le_bytes([data[cell_offset], data[cell_offset + 1]]) as usize;
491    let old_v_len = u16::from_le_bytes([data[cell_offset + 2], data[cell_offset + 3]]) as usize;
492    if new_value.len() > old_v_len {
493        return Err(BTreeError::Corrupted(
494            "overwrite_leaf_value: new value larger than slot".into(),
495        ));
496    }
497    let value_start = cell_offset + 4 + k_len;
498    if value_start + new_value.len() > data.len() {
499        return Err(BTreeError::Corrupted(
500            "leaf cell value out of bounds".into(),
501        ));
502    }
503    let data = page.as_bytes_mut();
504    // Update the v_len header if the value shrank. read_leaf_cell reads
505    // v_len from this header so the shortened value deserializes correctly.
506    if new_value.len() != old_v_len {
507        let new_len = new_value.len() as u16;
508        data[cell_offset + 2..cell_offset + 4].copy_from_slice(&new_len.to_le_bytes());
509    }
510    data[value_start..value_start + new_value.len()].copy_from_slice(new_value);
511    Ok(())
512}
513
514/// at the sorted position.
515fn insert_into_leaf(page: &mut Page, key: &[u8], value: &[u8]) -> BTreeResult<()> {
516    // 1. Binary search the slot array to find the insertion index.
517    let cell_count = page.cell_count() as usize;
518    let mut low = 0;
519    let mut high = cell_count;
520    while low < high {
521        let mid = (low + high) / 2;
522        let (cell_key, _) = read_leaf_cell(page, mid)?;
523        match cell_key.as_slice().cmp(key) {
524            Ordering::Less => low = mid + 1,
525            Ordering::Greater => high = mid,
526            Ordering::Equal => {
527                // Duplicate keys are tolerated by the B-tree (caller
528                // decides semantics); append after the existing run.
529                low = mid + 1;
530                break;
531            }
532        }
533    }
534    let insert_pos = low;
535
536    // 2. Reserve the cell at the tail of the free area.
537    let cell_size = 4 + key.len() + value.len();
538    let slot_end_after = LEAF_SLOT_ARRAY_OFFSET + (cell_count + 1) * 2;
539    let cells_start = leaf_cells_start(page);
540    if slot_end_after + cell_size > cells_start {
541        return Err(BTreeError::Corrupted("leaf page full".into()));
542    }
543    let cell_offset = cells_start - cell_size;
544    {
545        let data = page.as_bytes_mut();
546        data[cell_offset..cell_offset + 2].copy_from_slice(&(key.len() as u16).to_le_bytes());
547        data[cell_offset + 2..cell_offset + 4].copy_from_slice(&(value.len() as u16).to_le_bytes());
548        data[cell_offset + 4..cell_offset + 4 + key.len()].copy_from_slice(key);
549        data[cell_offset + 4 + key.len()..cell_offset + 4 + key.len() + value.len()]
550            .copy_from_slice(value);
551    }
552    page.set_free_end(cell_offset as u16);
553
554    // 3. Shift the slot-array tail right by one slot, then write the
555    //    new pointer into the freed slot. This is a single memmove on
556    //    a couple dozen u16s — far cheaper than the O(M²) rebuild.
557    {
558        let shift_from = leaf_slot_offset_for(insert_pos);
559        let shift_to = shift_from + 2;
560        let shift_len = (cell_count - insert_pos) * 2;
561        if shift_len > 0 {
562            let data = page.as_bytes_mut();
563            data.copy_within(shift_from..shift_from + shift_len, shift_to);
564        }
565    }
566    leaf_write_slot(page, insert_pos, cell_offset as u16)?;
567
568    // 4. Bump counters.
569    page.set_cell_count((cell_count + 1) as u16);
570    page.set_free_start((LEAF_SLOT_ARRAY_OFFSET + (cell_count + 1) * 2) as u16);
571    Ok(())
572}
573
574/// Remove the slot at `index`. Cell bytes are left in place (lazy
575/// compaction); the slot-array tail is shifted left to close the gap.
576/// The caller is expected to call `clear_leaf_cells` + rebuild if the
577/// page wants its free space reclaimed.
578fn delete_from_leaf(page: &mut Page, index: usize) -> BTreeResult<()> {
579    let cell_count = page.cell_count() as usize;
580    if index >= cell_count {
581        return Err(BTreeError::Corrupted("delete index out of range".into()));
582    }
583    if index + 1 < cell_count {
584        let shift_from = leaf_slot_offset_for(index + 1);
585        let shift_to = leaf_slot_offset_for(index);
586        let shift_len = (cell_count - index - 1) * 2;
587        let data = page.as_bytes_mut();
588        data.copy_within(shift_from..shift_from + shift_len, shift_to);
589    }
590    page.set_cell_count((cell_count - 1) as u16);
591    page.set_free_start((LEAF_SLOT_ARRAY_OFFSET + (cell_count - 1) * 2) as u16);
592    Ok(())
593}
594
595/// Reset the leaf to an empty slotted state — cell count 0, free
596/// cursors at the extremes. Zeroes the slot array + cell data area so
597/// stale bytes never leak through a corrupted read.
598fn clear_leaf_cells(page: &mut Page) {
599    {
600        let data = page.as_bytes_mut();
601        for byte in &mut data[LEAF_SLOT_ARRAY_OFFSET..] {
602            *byte = 0;
603        }
604    }
605    page.set_cell_count(0);
606    page.set_free_start(LEAF_SLOT_ARRAY_OFFSET as u16);
607    page.set_free_end(PAGE_SIZE as u16);
608}
609
610fn init_leaf_links(page: &mut Page, prev: u32, next: u32) {
611    let data = page.as_bytes_mut();
612    data[LEAF_PREV_OFFSET..LEAF_PREV_OFFSET + 4].copy_from_slice(&prev.to_le_bytes());
613    data[LEAF_NEXT_OFFSET..LEAF_NEXT_OFFSET + 4].copy_from_slice(&next.to_le_bytes());
614}
615
616fn read_next_leaf(page: &Page) -> u32 {
617    let data = page.as_bytes();
618    u32::from_le_bytes([
619        data[LEAF_NEXT_OFFSET],
620        data[LEAF_NEXT_OFFSET + 1],
621        data[LEAF_NEXT_OFFSET + 2],
622        data[LEAF_NEXT_OFFSET + 3],
623    ])
624}
625
626/// Returns the right-sibling page ID for a leaf page.
627/// 0 means this is the rightmost leaf — used by the D2 fastpath cache.
628#[inline]
629fn leaf_right_sibling(page: &Page) -> u32 {
630    read_next_leaf(page)
631}
632
633/// Returns the maximum (last) key stored in a leaf page, or `None` if empty.
634/// This is the Lehman-Yao "high_key" — any key > this value belongs on a
635/// right-sibling page. Reads only the last slot so it is O(1).
636fn leaf_high_key(page: &Page) -> BTreeResult<Option<Vec<u8>>> {
637    let cell_count = page.cell_count() as usize;
638    if cell_count == 0 {
639        return Ok(None);
640    }
641    let (key, _) = read_leaf_cell(page, cell_count - 1)?;
642    Ok(Some(key))
643}
644
645fn set_prev_leaf(page: &mut Page, prev: u32) {
646    let data = page.as_bytes_mut();
647    data[LEAF_PREV_OFFSET..LEAF_PREV_OFFSET + 4].copy_from_slice(&prev.to_le_bytes());
648}
649
650fn set_next_leaf(page: &mut Page, next: u32) {
651    let data = page.as_bytes_mut();
652    data[LEAF_NEXT_OFFSET..LEAF_NEXT_OFFSET + 4].copy_from_slice(&next.to_le_bytes());
653}
654
655// ==================== Interior Page Helpers ====================
656//
657// Slotted-page layout mirroring the leaf format:
658//
659// ```text
660// [PageHeader 32B]
661// [slot 0: u16][slot 1: u16]...[slot N-1: u16]  ← grows forward
662// ... free space ...
663// [cell N-1][cell N-2]...[cell 0]               ← grows backward from free_end
664// ```
665//
666// Each cell is `[key_len:u16][key][child:u32]`. The right-most child
667// pointer lives in `PageHeader.right_child` exactly as before.
668
669#[inline]
670fn interior_slot_offset_for(index: usize) -> usize {
671    INTERIOR_DATA_OFFSET + index * 2
672}
673
674#[inline]
675fn interior_read_slot(page: &Page, index: usize) -> BTreeResult<usize> {
676    let data = page.as_bytes();
677    let slot_pos = interior_slot_offset_for(index);
678    if slot_pos + 2 > PAGE_SIZE {
679        return Err(BTreeError::Corrupted(
680            "interior slot array overflows page".into(),
681        ));
682    }
683    Ok(u16::from_le_bytes([data[slot_pos], data[slot_pos + 1]]) as usize)
684}
685
686#[inline]
687fn interior_write_slot(page: &mut Page, index: usize, cell_offset: u16) -> BTreeResult<()> {
688    let data = page.as_bytes_mut();
689    let slot_pos = interior_slot_offset_for(index);
690    if slot_pos + 2 > PAGE_SIZE {
691        return Err(BTreeError::Corrupted(
692            "interior slot array overflows page".into(),
693        ));
694    }
695    data[slot_pos..slot_pos + 2].copy_from_slice(&cell_offset.to_le_bytes());
696    Ok(())
697}
698
699#[inline]
700fn interior_cells_start(page: &Page) -> usize {
701    let end = page.free_end() as usize;
702    if end == 0 {
703        PAGE_SIZE
704    } else {
705        end
706    }
707}
708
709#[inline]
710fn interior_free_bytes(page: &Page) -> usize {
711    let slot_end = INTERIOR_DATA_OFFSET + (page.cell_count() as usize) * 2;
712    interior_cells_start(page).saturating_sub(slot_end)
713}
714
715fn read_interior_cell(page: &Page, index: usize) -> BTreeResult<(Vec<u8>, u32)> {
716    let cell_count = page.cell_count() as usize;
717    if index >= cell_count {
718        return Err(BTreeError::Corrupted("Cell index out of range".into()));
719    }
720    let offset = interior_read_slot(page, index)?;
721    let data = page.as_bytes();
722    if offset + 2 > PAGE_SIZE {
723        return Err(BTreeError::Corrupted(
724            "interior cell header out of range".into(),
725        ));
726    }
727    let key_len = u16::from_le_bytes([data[offset], data[offset + 1]]) as usize;
728    let end = offset + 2 + key_len + 4;
729    if end > PAGE_SIZE {
730        return Err(BTreeError::Corrupted(
731            "interior cell body out of range".into(),
732        ));
733    }
734    let key = data[offset + 2..offset + 2 + key_len].to_vec();
735    let child = u32::from_le_bytes([
736        data[offset + 2 + key_len],
737        data[offset + 2 + key_len + 1],
738        data[offset + 2 + key_len + 2],
739        data[offset + 2 + key_len + 3],
740    ]);
741    Ok((key, child))
742}
743
744fn read_interior_keys_children(page: &Page) -> BTreeResult<(Vec<Vec<u8>>, Vec<u32>)> {
745    let cell_count = page.cell_count() as usize;
746    let mut keys = Vec::with_capacity(cell_count);
747    let mut children = Vec::with_capacity(cell_count + 1);
748
749    for i in 0..cell_count {
750        let (key, child) = read_interior_cell(page, i)?;
751        keys.push(key);
752        children.push(child);
753    }
754
755    if cell_count == 0 {
756        let right_child = page.right_child();
757        if right_child != 0 {
758            children.push(right_child);
759        }
760    } else {
761        children.push(page.right_child());
762    }
763
764    Ok((keys, children))
765}
766
767/// Bulk-write an interior page from scratch in slotted form. Used by
768/// the split path. `keys` and `children` follow the B+ tree contract:
769/// `children.len() == keys.len() + 1`, with the last child landing in
770/// `PageHeader.right_child` (as before).
771fn write_interior_entries(page: &mut Page, keys: &[Vec<u8>], children: &[u32]) -> BTreeResult<()> {
772    if !keys.is_empty() && children.len() != keys.len() + 1 {
773        return Err(BTreeError::Corrupted(
774            "Interior keys/children length mismatch".into(),
775        ));
776    }
777
778    clear_interior_cells(page);
779    if keys.is_empty() {
780        let right_child = children.first().copied().unwrap_or(0);
781        page.set_right_child(right_child);
782        return Ok(());
783    }
784
785    for (i, key) in keys.iter().enumerate() {
786        let cell_size = 2 + key.len() + 4;
787        let cells_start = interior_cells_start(page);
788        let slot_end = INTERIOR_DATA_OFFSET + (i + 1) * 2;
789        if slot_end + cell_size > cells_start {
790            return Err(BTreeError::Corrupted(
791                "interior rebuild overflowed page".into(),
792            ));
793        }
794        let cell_offset = cells_start - cell_size;
795        {
796            let data = page.as_bytes_mut();
797            data[cell_offset..cell_offset + 2].copy_from_slice(&(key.len() as u16).to_le_bytes());
798            data[cell_offset + 2..cell_offset + 2 + key.len()].copy_from_slice(key);
799            data[cell_offset + 2 + key.len()..cell_offset + 2 + key.len() + 4]
800                .copy_from_slice(&children[i].to_le_bytes());
801        }
802        page.set_free_end(cell_offset as u16);
803        interior_write_slot(page, i, cell_offset as u16)?;
804    }
805    page.set_cell_count(keys.len() as u16);
806    page.set_free_start((INTERIOR_DATA_OFFSET + keys.len() * 2) as u16);
807    page.set_right_child(*children.last().ok_or_else(|| {
808        BTreeError::Corrupted("write_interior_entries: children empty with non-empty keys".into())
809    })?);
810    Ok(())
811}
812
813fn can_insert_interior(page: &Page, key: &[u8]) -> bool {
814    let needed = 2 + 2 + key.len() + 4;
815    interior_free_bytes(page) >= needed
816}
817
818/// Insert a `(key, left_child)` separator into the interior page.
819/// `right_child` replaces whatever child used to sit to the right of
820/// the inserted key (either a middle child's pointer or the page's
821/// `right_child` when the key is the new maximum).
822fn insert_into_interior(
823    page: &mut Page,
824    key: &[u8],
825    left_child: u32,
826    right_child: u32,
827) -> BTreeResult<()> {
828    // 1. Binary search the slot array for the insert position.
829    let cell_count = page.cell_count() as usize;
830    let mut low = 0;
831    let mut high = cell_count;
832    while low < high {
833        let mid = (low + high) / 2;
834        let (cell_key, _) = read_interior_cell(page, mid)?;
835        match cell_key.as_slice().cmp(key) {
836            Ordering::Less => low = mid + 1,
837            Ordering::Greater | Ordering::Equal => high = mid,
838        }
839    }
840    let insert_pos = low;
841
842    // 2. Redirect the previous owner of the split to `right_child`.
843    //    If the insertion is at the tail, the previous owner was the
844    //    page's right_child pointer; otherwise it was the child slot
845    //    of the cell currently at `insert_pos`.
846    if insert_pos < cell_count {
847        let old_offset = interior_read_slot(page, insert_pos)?;
848        let data = page.as_bytes();
849        let key_len = u16::from_le_bytes([data[old_offset], data[old_offset + 1]]) as usize;
850        let child_pos = old_offset + 2 + key_len;
851        let data = page.as_bytes_mut();
852        data[child_pos..child_pos + 4].copy_from_slice(&right_child.to_le_bytes());
853    } else {
854        page.set_right_child(right_child);
855    }
856
857    // 3. Reserve the new cell at the tail of the free area.
858    let cell_size = 2 + key.len() + 4;
859    let slot_end_after = INTERIOR_DATA_OFFSET + (cell_count + 1) * 2;
860    let cells_start = interior_cells_start(page);
861    if slot_end_after + cell_size > cells_start {
862        return Err(BTreeError::Corrupted("interior page full".into()));
863    }
864    let cell_offset = cells_start - cell_size;
865    {
866        let data = page.as_bytes_mut();
867        data[cell_offset..cell_offset + 2].copy_from_slice(&(key.len() as u16).to_le_bytes());
868        data[cell_offset + 2..cell_offset + 2 + key.len()].copy_from_slice(key);
869        data[cell_offset + 2 + key.len()..cell_offset + 2 + key.len() + 4]
870            .copy_from_slice(&left_child.to_le_bytes());
871    }
872    page.set_free_end(cell_offset as u16);
873
874    // 4. Shift the slot-array tail right by one slot, write the new
875    //    pointer into the freed slot, bump counters.
876    {
877        let shift_from = interior_slot_offset_for(insert_pos);
878        let shift_to = shift_from + 2;
879        let shift_len = (cell_count - insert_pos) * 2;
880        if shift_len > 0 {
881            let data = page.as_bytes_mut();
882            data.copy_within(shift_from..shift_from + shift_len, shift_to);
883        }
884    }
885    interior_write_slot(page, insert_pos, cell_offset as u16)?;
886    page.set_cell_count((cell_count + 1) as u16);
887    page.set_free_start((INTERIOR_DATA_OFFSET + (cell_count + 1) * 2) as u16);
888    Ok(())
889}
890
891fn clear_interior_cells(page: &mut Page) {
892    {
893        let data = page.as_bytes_mut();
894        for byte in &mut data[INTERIOR_DATA_OFFSET..] {
895            *byte = 0;
896        }
897    }
898    page.set_cell_count(0);
899    page.set_free_start(INTERIOR_DATA_OFFSET as u16);
900    page.set_free_end(PAGE_SIZE as u16);
901}
902
903#[cfg(test)]
904mod tests {
905    use super::*;
906    use std::path::PathBuf;
907
908    fn temp_db_path() -> PathBuf {
909        use std::sync::atomic::{AtomicU64, Ordering};
910        static COUNTER: AtomicU64 = AtomicU64::new(0);
911        let id = COUNTER.fetch_add(1, Ordering::Relaxed);
912        let mut path = std::env::temp_dir();
913        path.push(format!("reddb_btree_test_{}_{}.db", std::process::id(), id));
914        path
915    }
916
917    fn cleanup(path: &PathBuf) {
918        let _ = std::fs::remove_file(path);
919    }
920
921    #[test]
922    fn test_btree_empty() {
923        let path = temp_db_path();
924        cleanup(&path);
925
926        let pager = Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
927        let tree = BTree::new(pager);
928
929        assert!(tree.is_empty());
930        assert_eq!(tree.root_page_id(), 0);
931        assert_eq!(tree.get(b"key").expect("get() should succeed"), None);
932        assert_eq!(tree.count().expect("count() should succeed"), 0);
933
934        cleanup(&path);
935    }
936
937    #[test]
938    fn test_btree_single_insert() {
939        let path = temp_db_path();
940        cleanup(&path);
941
942        let pager = Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
943        let tree = BTree::new(pager);
944
945        tree.insert(b"hello", b"world")
946            .expect("insert() should succeed");
947
948        assert!(!tree.is_empty());
949        assert_eq!(
950            tree.get(b"hello").expect("get() should succeed"),
951            Some(b"world".to_vec())
952        );
953        assert_eq!(tree.get(b"other").expect("get() should succeed"), None);
954        assert_eq!(tree.count().expect("count() should succeed"), 1);
955
956        cleanup(&path);
957    }
958
959    #[test]
960    fn test_btree_multiple_inserts() {
961        let path = temp_db_path();
962        cleanup(&path);
963
964        let pager = Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
965        let tree = BTree::new(pager);
966
967        tree.insert(b"c", b"3").expect("insert() should succeed");
968        tree.insert(b"a", b"1").expect("insert() should succeed");
969        tree.insert(b"b", b"2").expect("insert() should succeed");
970
971        assert_eq!(
972            tree.get(b"a").expect("get() should succeed"),
973            Some(b"1".to_vec())
974        );
975        assert_eq!(
976            tree.get(b"b").expect("get() should succeed"),
977            Some(b"2".to_vec())
978        );
979        assert_eq!(
980            tree.get(b"c").expect("get() should succeed"),
981            Some(b"3".to_vec())
982        );
983        assert_eq!(tree.count().expect("count() should succeed"), 3);
984
985        cleanup(&path);
986    }
987
988    #[test]
989    fn test_btree_duplicate_key() {
990        let path = temp_db_path();
991        cleanup(&path);
992
993        let pager = Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
994        let tree = BTree::new(pager);
995
996        tree.insert(b"key", b"value1")
997            .expect("insert() should succeed");
998        let result = tree.insert(b"key", b"value2");
999
1000        assert!(matches!(result, Err(BTreeError::DuplicateKey)));
1001        assert_eq!(
1002            tree.get(b"key").expect("get() should succeed"),
1003            Some(b"value1".to_vec())
1004        );
1005
1006        cleanup(&path);
1007    }
1008
1009    #[test]
1010    fn test_btree_delete() {
1011        let path = temp_db_path();
1012        cleanup(&path);
1013
1014        let pager = Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1015        let tree = BTree::new(pager);
1016
1017        tree.insert(b"a", b"1").expect("insert() should succeed");
1018        tree.insert(b"b", b"2").expect("insert() should succeed");
1019        tree.insert(b"c", b"3").expect("insert() should succeed");
1020
1021        assert!(tree.delete(b"b").expect("delete() should succeed"));
1022        assert!(!tree.delete(b"d").expect("delete() should succeed"));
1023
1024        assert_eq!(
1025            tree.get(b"a").expect("get() should succeed"),
1026            Some(b"1".to_vec())
1027        );
1028        assert_eq!(tree.get(b"b").expect("get() should succeed"), None);
1029        assert_eq!(
1030            tree.get(b"c").expect("get() should succeed"),
1031            Some(b"3".to_vec())
1032        );
1033        assert_eq!(tree.count().expect("count() should succeed"), 2);
1034
1035        cleanup(&path);
1036    }
1037
1038    #[test]
1039    fn test_btree_delete_rebalance_removes_empty_leaf() {
1040        let path = temp_db_path();
1041        cleanup(&path);
1042
1043        let pager = Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1044        let tree = BTree::new(pager.clone());
1045
1046        let value = vec![b'v'; 200];
1047        for i in 0..300u32 {
1048            let key = format!("key{:03}", i);
1049            tree.insert(key.as_bytes(), &value)
1050                .expect("insert() should succeed");
1051        }
1052
1053        let root_id = tree.root_page_id();
1054        let first_leaf = tree
1055            .find_first_leaf(root_id)
1056            .expect("find_first_leaf() should succeed");
1057        let mut leaf_ids = Vec::new();
1058        let mut current = first_leaf;
1059        loop {
1060            leaf_ids.push(current);
1061            let page = pager
1062                .read_page(current)
1063                .expect("read_page() should succeed");
1064            let next = read_next_leaf(&page);
1065            if next == 0 {
1066                break;
1067            }
1068            current = next;
1069        }
1070
1071        assert!(leaf_ids.len() >= 3);
1072
1073        let target_leaf = leaf_ids[1];
1074        let page = pager
1075            .read_page(target_leaf)
1076            .expect("read_page() should succeed");
1077        let cell_count = page.cell_count() as usize;
1078        let mut keys = Vec::with_capacity(cell_count);
1079        for i in 0..cell_count {
1080            let (key, _) = read_leaf_cell(&page, i).expect("read_leaf_cell() should succeed");
1081            keys.push(key);
1082        }
1083
1084        for key in &keys {
1085            tree.delete(key).expect("delete() should succeed");
1086        }
1087
1088        let expected = 300 - keys.len();
1089        assert_eq!(tree.count().expect("count() should succeed"), expected);
1090
1091        let mut cursor = tree.cursor_first().expect("cursor_first() should succeed");
1092        let mut results = Vec::new();
1093        while let Some((key, _)) = cursor.next().expect("next() should succeed") {
1094            results.push(key);
1095        }
1096
1097        assert_eq!(results.len(), expected);
1098        let last_key = format!("key{:03}", 299).into_bytes();
1099        assert_eq!(results.last(), Some(&last_key));
1100
1101        cleanup(&path);
1102    }
1103
1104    #[test]
1105    fn test_btree_cursor() {
1106        let path = temp_db_path();
1107        cleanup(&path);
1108
1109        let pager = Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1110        let tree = BTree::new(pager);
1111
1112        tree.insert(b"c", b"3").expect("insert() should succeed");
1113        tree.insert(b"a", b"1").expect("insert() should succeed");
1114        tree.insert(b"b", b"2").expect("insert() should succeed");
1115
1116        let mut cursor = tree.cursor_first().expect("cursor_first() should succeed");
1117        let mut results: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
1118
1119        while let Some(entry) = cursor.next().expect("next() should succeed") {
1120            results.push(entry);
1121        }
1122
1123        assert_eq!(results.len(), 3);
1124        assert_eq!(results[0], (b"a".to_vec(), b"1".to_vec()));
1125        assert_eq!(results[1], (b"b".to_vec(), b"2".to_vec()));
1126        assert_eq!(results[2], (b"c".to_vec(), b"3".to_vec()));
1127
1128        cleanup(&path);
1129    }
1130
1131    #[test]
1132    fn test_btree_range() {
1133        let path = temp_db_path();
1134        cleanup(&path);
1135
1136        let pager = Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1137        let tree = BTree::new(pager);
1138
1139        for i in 0..10u8 {
1140            let key = format!("key{:02}", i);
1141            let value = format!("val{:02}", i);
1142            tree.insert(key.as_bytes(), value.as_bytes())
1143                .expect("insert() should succeed");
1144        }
1145
1146        let results = tree
1147            .range(b"key03", b"key06")
1148            .expect("range() should succeed");
1149
1150        assert_eq!(results.len(), 4);
1151        assert_eq!(results[0].0, b"key03".to_vec());
1152        assert_eq!(results[3].0, b"key06".to_vec());
1153
1154        cleanup(&path);
1155    }
1156
1157    #[test]
1158    fn test_btree_large_keys() {
1159        let path = temp_db_path();
1160        cleanup(&path);
1161
1162        let pager = Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1163        let tree = BTree::new(pager);
1164
1165        let key = vec![b'x'; 500];
1166        let value = vec![b'y'; 500];
1167
1168        tree.insert(&key, &value).expect("insert() should succeed");
1169        assert_eq!(tree.get(&key).expect("get() should succeed"), Some(value));
1170
1171        cleanup(&path);
1172    }
1173
1174    #[test]
1175    fn test_btree_key_too_large() {
1176        let path = temp_db_path();
1177        cleanup(&path);
1178
1179        let pager = Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1180        let tree = BTree::new(pager);
1181
1182        let key = vec![b'x'; MAX_KEY_SIZE + 1];
1183        let result = tree.insert(&key, b"value");
1184
1185        assert!(matches!(result, Err(BTreeError::KeyTooLarge(_))));
1186
1187        cleanup(&path);
1188    }
1189
1190    #[test]
1191    fn test_btree_many_inserts() {
1192        let path = temp_db_path();
1193        cleanup(&path);
1194
1195        let pager = Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1196        let tree = BTree::new(pager);
1197
1198        // Insert enough entries to force splits
1199        for i in 0..100u32 {
1200            let key = format!("key{:08}", i);
1201            let value = format!("value{:08}", i);
1202            tree.insert(key.as_bytes(), value.as_bytes())
1203                .expect("insert() should succeed");
1204        }
1205
1206        // Verify all entries
1207        for i in 0..100u32 {
1208            let key = format!("key{:08}", i);
1209            let expected = format!("value{:08}", i);
1210            assert_eq!(
1211                tree.get(key.as_bytes()).expect("get() should succeed"),
1212                Some(expected.into_bytes())
1213            );
1214        }
1215
1216        assert_eq!(tree.count().expect("count() should succeed"), 100);
1217
1218        cleanup(&path);
1219    }
1220
1221    #[test]
1222    fn test_btree_bulk_insert_sorted_preserves_leaf_layout() {
1223        let path = temp_db_path();
1224        cleanup(&path);
1225
1226        let pager = Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1227        let tree = BTree::new(pager);
1228
1229        let items: Vec<(Vec<u8>, Vec<u8>)> = (0..240u32)
1230            .map(|i| {
1231                (
1232                    format!("key{:08}", i).into_bytes(),
1233                    vec![b'v'; 32 + (i as usize % 7)],
1234                )
1235            })
1236            .collect();
1237
1238        tree.bulk_insert_sorted(&items)
1239            .expect("bulk_insert_sorted() should succeed");
1240
1241        assert_eq!(tree.count().expect("count() should succeed"), items.len());
1242
1243        for (key, value) in &items {
1244            assert_eq!(
1245                tree.get(key).expect("get() should succeed"),
1246                Some(value.clone())
1247            );
1248        }
1249
1250        cleanup(&path);
1251    }
1252
1253    /// Property test for `BTree::upsert_batch_sorted` (#159).
1254    ///
1255    /// Generates random batches of (key, value) updates and applies them to
1256    /// two independently-built trees: the baseline uses the existing
1257    /// loop-of-`upsert` path, and the candidate uses `upsert_batch_sorted`
1258    /// after the caller-side sort.  Both trees must end up with identical
1259    /// key→value contents (verified via per-key `get` and a full cursor
1260    /// scan), proving the new path is observationally equivalent for
1261    /// arbitrary batches up to 200 entries.
1262    mod prop_upsert_batch_sorted {
1263        use super::*;
1264        use proptest::prelude::*;
1265        use std::collections::BTreeMap;
1266
1267        /// Build a tree pre-populated with `seed` and return the path so
1268        /// the caller can clean it up.
1269        fn populate_tree(seed: &[(Vec<u8>, Vec<u8>)]) -> (BTree, PathBuf, Arc<Pager>) {
1270            let path = temp_db_path();
1271            cleanup(&path);
1272            let pager =
1273                Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1274            let tree = BTree::new(Arc::clone(&pager));
1275            for (k, v) in seed {
1276                tree.upsert(k, v).expect("upsert() should succeed");
1277            }
1278            (tree, path, pager)
1279        }
1280
1281        fn key_strategy() -> impl Strategy<Value = Vec<u8>> {
1282            // Short keys keep many entries per leaf so a single batch
1283            // hits multiple keys in the same leaf — the path the new
1284            // method optimises.  1..=4 bytes from a small alphabet.
1285            prop::collection::vec(0u8..16u8, 1..=4)
1286        }
1287
1288        fn value_strategy() -> impl Strategy<Value = Vec<u8>> {
1289            // Tiny values — same-or-shrink fits the in-place overwrite
1290            // fast path; growing values fall back to the generic
1291            // single-key `upsert`, exercising both branches.
1292            prop::collection::vec(0u8..255u8, 0..=8)
1293        }
1294
1295        fn pair_strategy() -> impl Strategy<Value = (Vec<u8>, Vec<u8>)> {
1296            (key_strategy(), value_strategy())
1297        }
1298
1299        proptest! {
1300            #![proptest_config(ProptestConfig {
1301                cases: 64,
1302                .. ProptestConfig::default()
1303            })]
1304
1305            #[test]
1306            fn batch_upsert_matches_loop_upsert(
1307                seed in prop::collection::vec(pair_strategy(), 0..50),
1308                batch in prop::collection::vec(pair_strategy(), 1..200),
1309            ) {
1310                // Dedup the seed by key (last write wins) so both trees
1311                // start from the same well-defined state.
1312                let mut seed_map: BTreeMap<Vec<u8>, Vec<u8>> = BTreeMap::new();
1313                for (k, v) in seed {
1314                    seed_map.insert(k, v);
1315                }
1316                let seed: Vec<(Vec<u8>, Vec<u8>)> = seed_map.into_iter().collect();
1317
1318                // Baseline: loop-of-upsert.
1319                let (baseline, baseline_path, _baseline_pager) = populate_tree(&seed);
1320                for (k, v) in &batch {
1321                    baseline.upsert(k, v).expect("upsert() should succeed");
1322                }
1323
1324                // Candidate: upsert_batch_sorted after caller-side sort
1325                // (matches the contract documented on the method).
1326                let (candidate, candidate_path, _candidate_pager) = populate_tree(&seed);
1327                let mut sorted = batch.clone();
1328                sorted.sort_by(|a, b| a.0.cmp(&b.0));
1329                candidate.upsert_batch_sorted(&sorted).expect("upsert_batch_sorted() should succeed");
1330
1331                // Compute the expected end state from BTreeMap (last
1332                // write wins), independent of insertion order.
1333                let mut expected: BTreeMap<Vec<u8>, Vec<u8>> = BTreeMap::new();
1334                for (k, v) in &seed {
1335                    expected.insert(k.clone(), v.clone());
1336                }
1337                for (k, v) in &batch {
1338                    expected.insert(k.clone(), v.clone());
1339                }
1340
1341                // Per-key get matches expected on both trees.
1342                for (k, v) in &expected {
1343                    let baseline_got = baseline.get(k).expect("get() should succeed");
1344                    let candidate_got = candidate.get(k).expect("get() should succeed");
1345                    prop_assert_eq!(baseline_got.as_ref(), Some(v));
1346                    prop_assert_eq!(candidate_got.as_ref(), Some(v));
1347                }
1348
1349                // Full cursor scan yields identical sorted contents.
1350                let collect = |t: &BTree| -> Vec<(Vec<u8>, Vec<u8>)> {
1351                    let mut out = Vec::new();
1352                    let mut cur = t.cursor_first().expect("cursor_first() should succeed");
1353                    while let Some(entry) = cur.next().expect("next() should succeed") {
1354                        out.push(entry);
1355                    }
1356                    out
1357                };
1358                let baseline_contents = collect(&baseline);
1359                let candidate_contents = collect(&candidate);
1360                prop_assert_eq!(&baseline_contents, &candidate_contents);
1361
1362                // And both equal the BTreeMap-derived expectation.
1363                let expected_contents: Vec<(Vec<u8>, Vec<u8>)> =
1364                    expected.into_iter().collect();
1365                prop_assert_eq!(&candidate_contents, &expected_contents);
1366
1367                cleanup(&baseline_path);
1368                cleanup(&candidate_path);
1369            }
1370        }
1371    }
1372
1373    #[test]
1374    fn test_btree_sorted_iteration() {
1375        let path = temp_db_path();
1376        cleanup(&path);
1377
1378        let pager = Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1379        let tree = BTree::new(pager);
1380
1381        // Insert in random order
1382        let keys = vec![50, 25, 75, 10, 30, 60, 80, 5, 15, 27, 35, 55, 65, 77, 90];
1383        for k in &keys {
1384            let key = format!("{:03}", k);
1385            tree.insert(key.as_bytes(), key.as_bytes())
1386                .expect("insert() should succeed");
1387        }
1388
1389        // Should iterate in sorted order
1390        let mut cursor = tree.cursor_first().expect("cursor_first() should succeed");
1391        let mut prev: Option<Vec<u8>> = None;
1392
1393        while let Some((key, _)) = cursor.next().expect("next() should succeed") {
1394            if let Some(p) = &prev {
1395                assert!(p < &key, "Keys not in sorted order");
1396            }
1397            prev = Some(key);
1398        }
1399
1400        cleanup(&path);
1401    }
1402}
1403
1404/// Acceptance tests for slice E of PRD #662 — spill pipeline that wires
1405/// the per-cell value layout (`value_layout`), the LZ4 codec
1406/// (`vector_btree::value_codec`), and the overflow chain (`overflow`)
1407/// into the B-tree's `insert` / `bulk_insert_sorted` / `get` paths.
1408#[cfg(test)]
1409mod overflow_pipeline_tests {
1410    use super::value_layout::{MAX_VALUE_SIZE, OVERFLOW_THRESHOLD};
1411    use super::*;
1412    use std::path::PathBuf;
1413
1414    fn temp_db_path(tag: &str) -> PathBuf {
1415        use std::sync::atomic::{AtomicU64, Ordering};
1416        static COUNTER: AtomicU64 = AtomicU64::new(0);
1417        let id = COUNTER.fetch_add(1, Ordering::Relaxed);
1418        let mut path = std::env::temp_dir();
1419        path.push(format!(
1420            "reddb_btree_spill_{}_{}_{}.db",
1421            tag,
1422            std::process::id(),
1423            id
1424        ));
1425        path
1426    }
1427
1428    fn cleanup(path: &std::path::Path) {
1429        let _ = std::fs::remove_file(path);
1430        for sidecar in reddb_file::layout::pager_shadow_sidecar_paths(path) {
1431            let _ = std::fs::remove_file(sidecar);
1432        }
1433    }
1434
1435    /// AC #1: a value at `OVERFLOW_THRESHOLD - 1` stores inline (raw or
1436    /// compressed) and **no overflow pages are allocated**. We count
1437    /// the file's pages before and after; the only growth should be the
1438    /// leaf page itself.
1439    #[test]
1440    fn inline_path_allocates_no_overflow_pages() {
1441        let path = temp_db_path("inline_no_overflow");
1442        cleanup(&path);
1443        {
1444            let pager =
1445                Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1446            let tree = BTree::new(pager.clone());
1447            let before = pager.page_count().expect("page_count() should succeed");
1448
1449            let value = vec![0x5Au8; OVERFLOW_THRESHOLD - 1];
1450            tree.insert(b"k", &value).expect("insert() should succeed");
1451
1452            let after = pager.page_count().expect("page_count() should succeed");
1453            // Exactly one new page (the root leaf). An overflow path
1454            // would have allocated ≥ 2 pages (leaf + chain head).
1455            assert_eq!(
1456                after - before,
1457                1,
1458                "inline value must allocate only the root leaf"
1459            );
1460            assert_eq!(tree.get(b"k").expect("get() should succeed"), Some(value));
1461        }
1462        cleanup(&path);
1463    }
1464
1465    /// AC #2: a 44 KB highly-compressible markdown payload stores via
1466    /// the compressed-inline path (or via overflow if it still exceeds
1467    /// the threshold after LZ4) and `get` returns byte-identical bytes.
1468    #[test]
1469    fn compressible_44kb_round_trips() {
1470        let path = temp_db_path("compressible_44kb");
1471        cleanup(&path);
1472        {
1473            let pager =
1474                Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1475            let tree = BTree::new(pager);
1476
1477            // Repeating markdown chunk — compresses far below threshold.
1478            let snippet = "# heading\n- bullet line that says the same thing over and over\n";
1479            let mut value = Vec::with_capacity(44 * 1024);
1480            while value.len() < 44 * 1024 {
1481                value.extend_from_slice(snippet.as_bytes());
1482            }
1483            assert!(value.len() > OVERFLOW_THRESHOLD);
1484
1485            tree.insert(b"doc", &value)
1486                .expect("insert() should succeed");
1487            assert_eq!(tree.get(b"doc").expect("get() should succeed"), Some(value));
1488        }
1489        cleanup(&path);
1490    }
1491
1492    /// AC #3: a 5 MB incompressible payload spills via overflow chain
1493    /// and `get` returns byte-identical bytes.
1494    #[test]
1495    fn incompressible_5mb_spills_and_round_trips() {
1496        let path = temp_db_path("incompressible_5mb");
1497        cleanup(&path);
1498        {
1499            let pager =
1500                Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1501            let tree = BTree::new(pager.clone());
1502
1503            // xorshift produces incompressible bytes deterministically
1504            // so the test never flakes on the codec's heuristic.
1505            let mut state: u64 = 0xC0FFEE_DEAD_F00D;
1506            let value: Vec<u8> = (0..5 * 1024 * 1024)
1507                .map(|_| {
1508                    state ^= state << 13;
1509                    state ^= state >> 7;
1510                    state ^= state << 17;
1511                    state as u8
1512                })
1513                .collect();
1514
1515            let before = pager.page_count().expect("page_count() should succeed");
1516            tree.insert(b"blob", &value)
1517                .expect("insert() should succeed");
1518            let after = pager.page_count().expect("page_count() should succeed");
1519            // 5 MB / overflow-page payload ≈ many pages — the leaf
1520            // alone could not absorb this.
1521            assert!(
1522                after - before > 2,
1523                "5 MB spill must allocate overflow pages (alloc'd {} pages)",
1524                after - before
1525            );
1526
1527            assert_eq!(
1528                tree.get(b"blob").expect("get() should succeed").as_deref(),
1529                Some(value.as_slice())
1530            );
1531        }
1532        cleanup(&path);
1533    }
1534
1535    /// AC #4: a value just above `MAX_VALUE_SIZE` is rejected with
1536    /// `ValueTooLarge`; nothing is allocated (no LZ4 work, no overflow
1537    /// pages, no leaf page).
1538    #[test]
1539    fn value_above_ceiling_rejected_without_allocation() {
1540        let path = temp_db_path("ceiling_reject");
1541        cleanup(&path);
1542        {
1543            let pager =
1544                Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1545            let tree = BTree::new(pager.clone());
1546            let before = pager.page_count().expect("page_count() should succeed");
1547
1548            let value = vec![0u8; MAX_VALUE_SIZE + 1];
1549            let err = tree.insert(b"too_big", &value).unwrap_err();
1550            assert!(matches!(err, BTreeError::ValueTooLarge(_)));
1551
1552            assert_eq!(
1553                pager.page_count().expect("page_count() should succeed"),
1554                before,
1555                "rejected value must not allocate pages"
1556            );
1557            // Tree stays empty — no root leaf was created.
1558            assert!(tree.is_empty());
1559        }
1560        cleanup(&path);
1561    }
1562
1563    /// AC #5: `bulk_insert_sorted` with mixed value sizes inserts all
1564    /// valid rows; an oversized row in the middle is rejected but the
1565    /// rest land. The function still surfaces the violation in its
1566    /// return value so callers do not silently lose data.
1567    #[test]
1568    fn bulk_insert_skips_oversized_row_keeps_rest() {
1569        let path = temp_db_path("bulk_mixed");
1570        cleanup(&path);
1571        {
1572            let pager =
1573                Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1574            let tree = BTree::new(pager);
1575
1576            let mut items: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
1577            // Five small rows ...
1578            for i in 0..5u32 {
1579                items.push((format!("k{:04}", i).into_bytes(), vec![b'a' + i as u8; 64]));
1580            }
1581            // ... one oversize row in the middle ...
1582            items.push((b"k0005_huge".to_vec(), vec![0u8; MAX_VALUE_SIZE + 1]));
1583            // ... and five more small rows after it. Keys stay sorted.
1584            for i in 6..11u32 {
1585                items.push((format!("k{:04}", i).into_bytes(), vec![b'z' - i as u8; 32]));
1586            }
1587
1588            let res = tree.bulk_insert_sorted(&items);
1589            assert!(matches!(res, Err(BTreeError::ValueTooLarge(_))));
1590
1591            // All valid rows persisted.
1592            for (k, v) in items.iter().filter(|(_, v)| v.len() <= MAX_VALUE_SIZE) {
1593                assert_eq!(
1594                    tree.get(k).expect("get() should succeed").as_deref(),
1595                    Some(v.as_slice()),
1596                    "valid row {:?} must land",
1597                    k
1598                );
1599            }
1600            // The oversize row did not land.
1601            assert_eq!(tree.get(b"k0005_huge").expect("get() should succeed"), None);
1602        }
1603        cleanup(&path);
1604    }
1605
1606    /// AC #6: values that used to fail (≥ 1024 bytes) now persist
1607    /// transparently. Walk a handful of sizes that straddle the
1608    /// threshold and the compressed-inline / spill boundaries.
1609    #[test]
1610    fn large_values_persist_transparently() {
1611        let path = temp_db_path("large_transparent");
1612        cleanup(&path);
1613        {
1614            let pager =
1615                Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1616            let tree = BTree::new(pager);
1617
1618            // Sizes hit: just over threshold, well over threshold,
1619            // and several MB. Bytes pattern is the index — so the
1620            // values differ across keys and are incompressible enough
1621            // that LZ4 falls back to raw on the largest entries.
1622            for (i, size) in [
1623                OVERFLOW_THRESHOLD + 1,
1624                OVERFLOW_THRESHOLD * 2,
1625                64 * 1024,
1626                1 * 1024 * 1024,
1627            ]
1628            .iter()
1629            .enumerate()
1630            {
1631                let value: Vec<u8> = (0..*size).map(|n| ((n + i) & 0xff) as u8).collect();
1632                let key = format!("size{:08}", size).into_bytes();
1633                tree.insert(&key, &value).expect("insert() should succeed");
1634                assert_eq!(
1635                    tree.get(&key).expect("get() should succeed").as_deref(),
1636                    Some(value.as_slice()),
1637                    "size {} must round-trip",
1638                    size
1639                );
1640            }
1641        }
1642        cleanup(&path);
1643    }
1644}
1645
1646/// Acceptance tests for slice F of PRD #662 — freeing overflow chains
1647/// on delete and shrinking update so storage does not leak.
1648#[cfg(test)]
1649mod overflow_free_tests {
1650    use super::value_layout::OVERFLOW_THRESHOLD;
1651    use super::*;
1652    use std::path::PathBuf;
1653
1654    fn temp_db_path(tag: &str) -> PathBuf {
1655        use std::sync::atomic::{AtomicU64, Ordering};
1656        static COUNTER: AtomicU64 = AtomicU64::new(0);
1657        let id = COUNTER.fetch_add(1, Ordering::Relaxed);
1658        let mut path = std::env::temp_dir();
1659        path.push(format!(
1660            "reddb_btree_free_{}_{}_{}.db",
1661            tag,
1662            std::process::id(),
1663            id
1664        ));
1665        path
1666    }
1667
1668    fn cleanup(path: &std::path::Path) {
1669        let _ = std::fs::remove_file(path);
1670        for sidecar in reddb_file::layout::pager_shadow_sidecar_paths(path) {
1671            let _ = std::fs::remove_file(sidecar);
1672        }
1673    }
1674
1675    fn incompressible(seed: u64, len: usize) -> Vec<u8> {
1676        let mut state = seed | 1;
1677        (0..len)
1678            .map(|_| {
1679                state ^= state << 13;
1680                state ^= state >> 7;
1681                state ^= state << 17;
1682                state as u8
1683            })
1684            .collect()
1685    }
1686
1687    /// AC #1: deleting a row whose value spilled returns every overflow
1688    /// page to the free list. Observable signal: page_count stays
1689    /// constant when we insert a same-size row right after the delete
1690    /// (no file growth on the second insert), and `tree.get` returns
1691    /// the new value.
1692    #[test]
1693    fn delete_spilled_value_frees_overflow_chain() {
1694        let path = temp_db_path("delete_frees");
1695        cleanup(&path);
1696        {
1697            let pager =
1698                Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1699            let tree = BTree::new(pager.clone());
1700
1701            // First spilled insert grows the file.
1702            let v1 = incompressible(0xA11CE, 2 * 1024 * 1024);
1703            tree.insert(b"k", &v1).expect("insert() should succeed");
1704            let after_first = pager.page_count().expect("page_count() should succeed");
1705
1706            // Delete: every overflow page returned to the free list.
1707            assert!(tree.delete(b"k").expect("delete() should succeed"));
1708
1709            // Re-insert the same size. With a working free list, the
1710            // pager reuses the freed pages and the file does not grow.
1711            let v2 = incompressible(0xB0B, 2 * 1024 * 1024);
1712            tree.insert(b"k", &v2).expect("insert() should succeed");
1713            let after_second = pager.page_count().expect("page_count() should succeed");
1714
1715            assert_eq!(
1716                after_second, after_first,
1717                "second spill must reuse freed pages, not grow the file"
1718            );
1719            assert_eq!(
1720                tree.get(b"k").expect("get() should succeed").as_deref(),
1721                Some(v2.as_slice())
1722            );
1723        }
1724        cleanup(&path);
1725    }
1726
1727    /// AC #2: updating a spilled value to a small inline value frees
1728    /// the old chain in full. Same observability trick: insert another
1729    /// spilled value of the original size, see that the file does not
1730    /// grow beyond the original high-water mark.
1731    #[test]
1732    fn shrinking_update_to_inline_frees_chain() {
1733        let path = temp_db_path("shrink_to_inline");
1734        cleanup(&path);
1735        {
1736            let pager =
1737                Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1738            let tree = BTree::new(pager.clone());
1739
1740            let big = incompressible(0xDEAD, 2 * 1024 * 1024);
1741            tree.insert(b"k", &big).expect("insert() should succeed");
1742            let hwm = pager.page_count().expect("page_count() should succeed");
1743
1744            // Shrink to inline raw (fits well under the threshold).
1745            let small = b"tiny".to_vec();
1746            tree.upsert(b"k", &small).expect("upsert() should succeed");
1747            assert_eq!(
1748                tree.get(b"k").expect("get() should succeed").as_deref(),
1749                Some(small.as_slice())
1750            );
1751
1752            // If the chain was not freed, this insert would push the
1753            // page count past `hwm`. If it was, the pager reuses freed
1754            // pages and we stay at the high-water mark.
1755            let big2 = incompressible(0xBEEF, 2 * 1024 * 1024);
1756            tree.insert(b"k2", &big2).expect("insert() should succeed");
1757            assert_eq!(
1758                pager.page_count().expect("page_count() should succeed"),
1759                hwm,
1760                "shrinking update must free the chain — replacement spill should reuse"
1761            );
1762            assert_eq!(
1763                tree.get(b"k2").expect("get() should succeed").as_deref(),
1764                Some(big2.as_slice())
1765            );
1766        }
1767        cleanup(&path);
1768    }
1769
1770    /// AC #3: updating a spilled value to a shorter (still spilled)
1771    /// value frees the old chain in full. The new chain is independent
1772    /// — no in-place reuse — so the freed pages must show up in the
1773    /// allocator before any new growth happens.
1774    #[test]
1775    fn shrinking_update_to_shorter_spill_frees_old_chain() {
1776        let path = temp_db_path("shrink_spill_to_spill");
1777        cleanup(&path);
1778        {
1779            let pager =
1780                Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1781            let tree = BTree::new(pager.clone());
1782
1783            // Two spilled values, the second clearly shorter than the
1784            // first. Both encode through the pointer path because the
1785            // bytes are incompressible.
1786            let big = incompressible(0xF00D, 3 * 1024 * 1024);
1787            tree.insert(b"k", &big).expect("insert() should succeed");
1788            let hwm_after_big = pager.page_count().expect("page_count() should succeed");
1789
1790            // Per slice F's acceptance the new chain is *independent*
1791            // of the old — i.e. no in-place page reuse — so the
1792            // upsert allocates fresh pages for the smaller chain
1793            // before freeing the old one. The file may grow during
1794            // this window; what we care about is that the freed pages
1795            // re-enter the allocator. We measure that by dropping the
1796            // row entirely (which frees the new chain too) and
1797            // re-inserting at the original size: if both chains were
1798            // freed, the pager reuses freed pages and we do not exceed
1799            // the high-water mark observed during the transient.
1800            let smaller = incompressible(0xCAFE, 1 * 1024 * 1024);
1801            tree.upsert(b"k", &smaller)
1802                .expect("upsert() should succeed");
1803            assert_eq!(
1804                tree.get(b"k").expect("get() should succeed").as_deref(),
1805                Some(smaller.as_slice())
1806            );
1807            let hwm_after_upsert = pager.page_count().expect("page_count() should succeed");
1808
1809            assert!(tree.delete(b"k").expect("delete() should succeed"));
1810
1811            let again = incompressible(0x1234, 3 * 1024 * 1024);
1812            tree.insert(b"k", &again).expect("insert() should succeed");
1813            assert!(
1814                pager.page_count().expect("page_count() should succeed") <= hwm_after_upsert,
1815                "after delete+insert of original size, file must not grow past the upsert transient \
1816                 (page_count = {}, transient high-water = {}, original = {}) — the upsert must have \
1817                 freed the old chain so the re-insert reuses freed pages",
1818                pager.page_count().expect("page_count() should succeed"),
1819                hwm_after_upsert,
1820                hwm_after_big,
1821            );
1822            assert_eq!(
1823                tree.get(b"k").expect("get() should succeed").as_deref(),
1824                Some(again.as_slice())
1825            );
1826        }
1827        cleanup(&path);
1828    }
1829
1830    /// AC #5 regression: insert large → delete → insert same-size large
1831    /// → file does not grow on the second insert.
1832    #[test]
1833    fn insert_delete_reinsert_does_not_grow_file() {
1834        let path = temp_db_path("reinsert_no_grow");
1835        cleanup(&path);
1836        {
1837            let pager =
1838                Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1839            let tree = BTree::new(pager.clone());
1840
1841            let v1 = incompressible(0x5EED, 4 * 1024 * 1024);
1842            tree.insert(b"row", &v1).expect("insert() should succeed");
1843            let pages_after_first = pager.page_count().expect("page_count() should succeed");
1844
1845            assert!(tree.delete(b"row").expect("delete() should succeed"));
1846
1847            let v2 = incompressible(0xACE, 4 * 1024 * 1024);
1848            tree.insert(b"row", &v2).expect("insert() should succeed");
1849            let pages_after_second = pager.page_count().expect("page_count() should succeed");
1850
1851            assert_eq!(
1852                pages_after_second, pages_after_first,
1853                "re-inserting at the same size must reuse the freed chain"
1854            );
1855            assert_eq!(
1856                tree.get(b"row").expect("get() should succeed").as_deref(),
1857                Some(v2.as_slice())
1858            );
1859        }
1860        cleanup(&path);
1861    }
1862
1863    /// Belt-and-braces: an in-place shrink via `upsert_batch_sorted`
1864    /// — same in-place fast path as single-key `upsert` — must also
1865    /// free the old chain. Without this hook the batch path would
1866    /// silently leak on every `bulk_update` over spilled rows.
1867    #[test]
1868    fn upsert_batch_in_place_shrink_frees_chain() {
1869        let path = temp_db_path("batch_shrink");
1870        cleanup(&path);
1871        {
1872            let pager =
1873                Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1874            let tree = BTree::new(pager.clone());
1875
1876            // Seed two spilled rows so the batch update lands in-place
1877            // on the same leaf.
1878            let big_a = incompressible(0x1111, 2 * 1024 * 1024);
1879            let big_b = incompressible(0x2222, 2 * 1024 * 1024);
1880            tree.insert(b"a", &big_a).expect("insert() should succeed");
1881            tree.insert(b"b", &big_b).expect("insert() should succeed");
1882            let hwm = pager.page_count().expect("page_count() should succeed");
1883
1884            let updates = vec![
1885                (b"a".to_vec(), b"shrunk-a".to_vec()),
1886                (b"b".to_vec(), b"shrunk-b".to_vec()),
1887            ];
1888            tree.upsert_batch_sorted(&updates)
1889                .expect("upsert_batch_sorted() should succeed");
1890            assert_eq!(
1891                tree.get(b"a").expect("get() should succeed").as_deref(),
1892                Some(b"shrunk-a".as_slice())
1893            );
1894            assert_eq!(
1895                tree.get(b"b").expect("get() should succeed").as_deref(),
1896                Some(b"shrunk-b".as_slice())
1897            );
1898
1899            // Both chains must be freed: a fresh pair of spilled
1900            // inserts re-uses them instead of growing the file.
1901            let again_a = incompressible(0xAAAA, 2 * 1024 * 1024);
1902            let again_b = incompressible(0xBBBB, 2 * 1024 * 1024);
1903            tree.insert(b"c", &again_a)
1904                .expect("insert() should succeed");
1905            tree.insert(b"d", &again_b)
1906                .expect("insert() should succeed");
1907            assert!(
1908                pager.page_count().expect("page_count() should succeed") <= hwm,
1909                "batch in-place shrink must free chains — replacement spills should reuse"
1910            );
1911        }
1912        cleanup(&path);
1913    }
1914}