Skip to main content

featherdb_storage/
btree.rs

1//! B-tree implementation for FeatherDB
2
3use crate::{BufferPool, Page, PageType};
4use bytes::BufMut;
5use featherdb_core::{constants, Error, PageId, Result};
6use std::sync::Arc;
7
/// Sentinel written into the `value_len` field of a leaf cell when the value
/// is not stored inline but lives in a chain of overflow pages.
const OVERFLOW_MARKER: u32 = u32::MAX;

/// Size in bytes of the chunk header on an overflow page.
///
/// Overflow page layout (after the standard page header):
///   [chunk_len:u16][data...]
/// Overflow pages are chained through the existing right_sibling pointer.
const OVERFLOW_CHUNK_HEADER: usize = std::mem::size_of::<u16>(); // u16 chunk length
16
17/// Usable data bytes per overflow page.
18fn overflow_data_capacity(page_size: usize) -> usize {
19    page_size - constants::PAGE_HEADER_SIZE - OVERFLOW_CHUNK_HEADER
20}
21
22/// Maximum inline cell size.  If the encoded leaf cell (key_len + value_len
23/// header + key + value) exceeds this threshold the value will be spilled
24/// to overflow pages.  We leave room for at least one slot entry so that
25/// the cell can always be inserted into an empty page.
26fn max_inline_cell_size(page_size: usize) -> usize {
27    // usable = page_size - header; need room for cell + 1 slot entry
28    page_size - constants::PAGE_HEADER_SIZE - constants::SLOT_SIZE
29}
30
31/// B-tree key-value store
32pub struct BTree {
33    /// Root page ID
34    root_page: PageId,
35    /// Buffer pool for page access
36    buffer_pool: Arc<BufferPool>,
37}
38
39impl BTree {
40    /// Create a new B-tree with an empty root page
41    pub fn new(buffer_pool: Arc<BufferPool>) -> Result<Self> {
42        let (root_page, _guard) = buffer_pool.allocate_page(PageType::Leaf)?;
43
44        Ok(BTree {
45            root_page,
46            buffer_pool,
47        })
48    }
49
50    /// Open an existing B-tree
51    pub fn open(root_page: PageId, buffer_pool: Arc<BufferPool>) -> Self {
52        BTree {
53            root_page,
54            buffer_pool,
55        }
56    }
57
58    /// Get root page ID
59    pub fn root_page(&self) -> PageId {
60        self.root_page
61    }
62
63    /// Get a value by key
64    pub fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
65        let (leaf_page_id, _) = self.find_leaf(key)?;
66        let guard = self.buffer_pool.get_page(leaf_page_id)?;
67        let page = guard.read();
68
69        // Search the leaf page for the key
70        // Note: cells may not be in sorted order after updates (delete+re-insert),
71        // so we scan all cells rather than relying on sorted order for early exit.
72        for i in 0..page.item_count() as usize {
73            // Skip deleted entries
74            if let Some(slot) = page.get_slot(i) {
75                if slot.is_deleted() {
76                    continue;
77                }
78            }
79
80            if let Some(cell) = page.get_cell(i) {
81                let (cell_key, _) = Self::decode_leaf_cell(cell)?;
82
83                if cell_key == key {
84                    // Need to handle overflow cells
85                    if Self::cell_is_overflow(cell) {
86                        let key_len = u16::from_le_bytes([cell[0], cell[1]]) as usize;
87                        let meta = &cell[6 + key_len..];
88                        let (overflow_page, total_len) = Self::decode_overflow_meta(meta)?;
89                        let value = self.read_overflow_pages(overflow_page, total_len as usize)?;
90                        return Ok(Some(value));
91                    } else {
92                        let (_, cell_value) = Self::decode_leaf_cell(cell)?;
93                        return Ok(Some(cell_value.to_vec()));
94                    }
95                }
96            }
97        }
98
99        Ok(None)
100    }
101
102    /// Insert a key-value pair
103    /// Returns the old value if the key already existed
104    pub fn insert(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
105        let (leaf_page_id, path) = self.find_leaf(key)?;
106
107        // Get the leaf page for writing
108        let guard = self.buffer_pool.get_page_for_write(leaf_page_id)?;
109        let mut page = guard.write();
110
111        // Check if key exists
112        let mut old_value = None;
113
114        for i in 0..page.item_count() as usize {
115            // Skip deleted entries
116            if let Some(slot) = page.get_slot(i) {
117                if slot.is_deleted() {
118                    continue;
119                }
120            }
121
122            if let Some(cell) = page.get_cell(i) {
123                let (cell_key, _) = Self::decode_leaf_cell(cell)?;
124
125                if cell_key == key {
126                    // Key exists - read old value (handling overflow)
127                    if Self::cell_is_overflow(cell) {
128                        let key_len = u16::from_le_bytes([cell[0], cell[1]]) as usize;
129                        let meta = &cell[6 + key_len..];
130                        let (overflow_page, total_len) = Self::decode_overflow_meta(meta)?;
131                        old_value =
132                            Some(self.read_overflow_pages(overflow_page, total_len as usize)?);
133                        // Free the old overflow pages
134                        self.free_overflow_pages(overflow_page)?;
135                    } else {
136                        let (_, cell_value) = Self::decode_leaf_cell(cell)?;
137                        old_value = Some(cell_value.to_vec());
138                    }
139                    page.delete_cell(i);
140                    break;
141                }
142            }
143        }
144
145        // Compact the page to reclaim space from deleted entries before
146        // attempting the insert. Without this, repeated updates to the same
147        // key accumulate dead cells until the page appears full and triggers
148        // a split — which can move the root and break fixed-root-page trees
149        // (e.g., the schema catalog B-tree).
150        if old_value.is_some() {
151            page.compact();
152        }
153
154        // Determine if the value needs overflow storage
155        let ps = page.as_bytes().len();
156        let inline_cell = Self::encode_leaf_cell(key, value);
157        let needs_overflow = inline_cell.len() > max_inline_cell_size(ps);
158
159        let cell = if needs_overflow {
160            // Write value to overflow pages and store a compact reference
161            drop(page);
162            drop(guard);
163            let overflow_page_id = self.write_overflow_pages(value)?;
164            let overflow_cell =
165                Self::encode_overflow_cell(key, overflow_page_id, value.len() as u32);
166
167            // Re-acquire the leaf page
168            let guard = self.buffer_pool.get_page_for_write(leaf_page_id)?;
169            let mut page = guard.write();
170
171            if page.insert_cell(&overflow_cell).is_some() {
172                return Ok(old_value);
173            }
174
175            // Still need to split even with the compact overflow cell
176            drop(page);
177            drop(guard);
178            self.insert_overflow_with_split(
179                key,
180                overflow_page_id,
181                value.len() as u32,
182                leaf_page_id,
183                &path,
184            )?;
185            return Ok(old_value);
186        } else {
187            inline_cell
188        };
189
190        // Try to insert into current page
191        if page.insert_cell(&cell).is_some() {
192            return Ok(old_value);
193        }
194
195        // Page is full - need to split
196        drop(page);
197        drop(guard);
198
199        self.insert_with_split(key, value, leaf_page_id, &path)?;
200
201        Ok(old_value)
202    }
203
204    /// Delete a key
205    /// Returns the old value if the key existed
206    pub fn delete(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
207        let (leaf_page_id, _) = self.find_leaf(key)?;
208
209        let guard = self.buffer_pool.get_page_for_write(leaf_page_id)?;
210        let mut page = guard.write();
211
212        for i in 0..page.item_count() as usize {
213            if let Some(cell) = page.get_cell(i) {
214                let (cell_key, _) = Self::decode_leaf_cell(cell)?;
215
216                if cell_key == key {
217                    let old_value = if Self::cell_is_overflow(cell) {
218                        let key_len = u16::from_le_bytes([cell[0], cell[1]]) as usize;
219                        let meta = &cell[6 + key_len..];
220                        let (overflow_page, total_len) = Self::decode_overflow_meta(meta)?;
221                        let val = self.read_overflow_pages(overflow_page, total_len as usize)?;
222                        self.free_overflow_pages(overflow_page)?;
223                        val
224                    } else {
225                        let (_, cell_value) = Self::decode_leaf_cell(cell)?;
226                        cell_value.to_vec()
227                    };
228                    page.delete_cell(i);
229                    return Ok(Some(old_value));
230                }
231            }
232        }
233
234        Ok(None)
235    }
236
237    /// Create a range iterator
238    pub fn range<'a>(&'a self, start: &[u8], end: &[u8]) -> Result<RangeIterator<'a>> {
239        let (leaf_id, _) = self.find_leaf(start)?;
240
241        Ok(RangeIterator {
242            btree: self,
243            current_page: Some(leaf_id),
244            start_key: start.to_vec(),
245            end_key: end.to_vec(),
246            started: false,
247            buffered: Vec::new(),
248        })
249    }
250
251    /// Iterate over all entries
252    pub fn iter(&self) -> Result<RangeIterator<'_>> {
253        // Find the leftmost leaf
254        let mut current = self.root_page;
255
256        loop {
257            let guard = self.buffer_pool.get_page(current)?;
258            let page = guard.read();
259
260            match page.page_type() {
261                PageType::Leaf => {
262                    return Ok(RangeIterator {
263                        btree: self,
264                        current_page: Some(current),
265                        start_key: vec![],
266                        end_key: vec![0xFF; 256], // Max key
267                        started: true,
268                        buffered: Vec::new(),
269                    });
270                }
271                PageType::Branch => {
272                    // Go to first child
273                    if let Some(cell) = page.get_cell(0) {
274                        let (child_id, _) = Self::decode_branch_cell(cell)?;
275                        current = child_id;
276                    } else {
277                        return Err(Error::Internal("Empty branch page".into()));
278                    }
279                }
280                _ => return Err(Error::Internal("Unexpected page type in B-tree".into())),
281            }
282        }
283    }
284
285    /// Iterate all values in the B-tree, calling `f` with borrowed value bytes.
286    /// For inline cells, the value slice points directly into page memory (zero-copy).
287    /// For overflow cells, value bytes are assembled and passed as owned.
288    /// Return `Ok(false)` from the callback to stop iteration early.
289    pub fn for_each_value<F>(&self, mut f: F) -> Result<()>
290    where
291        F: FnMut(&[u8]) -> Result<bool>,
292    {
293        // Find the leftmost leaf
294        let mut current = self.root_page;
295        loop {
296            let guard = self.buffer_pool.get_page(current)?;
297            let page = guard.read();
298            match page.page_type() {
299                PageType::Leaf => break,
300                PageType::Branch => {
301                    if let Some(cell) = page.get_cell(0) {
302                        let (child_id, _) = Self::decode_branch_cell(cell)?;
303                        current = child_id;
304                    } else {
305                        return Err(Error::Internal("Empty branch page".into()));
306                    }
307                }
308                _ => return Err(Error::Internal("Unexpected page type in B-tree".into())),
309            }
310        }
311
312        // Walk leaf pages left-to-right
313        let mut leaf_page = Some(current);
314        while let Some(page_id) = leaf_page {
315            let guard = self.buffer_pool.get_page(page_id)?;
316            let page = guard.read();
317
318            let count = page.item_count() as usize;
319            for i in 0..count {
320                // Check deleted flag directly from slot bytes
321                let slot = match page.get_slot(i) {
322                    Some(s) => s,
323                    None => continue,
324                };
325                if slot.is_deleted() {
326                    continue;
327                }
328
329                if let Some(cell) = page.get_cell(i) {
330                    if Self::cell_is_overflow(cell) {
331                        // Overflow: must assemble value from overflow pages
332                        let key_len = u16::from_le_bytes([cell[0], cell[1]]) as usize;
333                        let meta = &cell[6 + key_len..];
334                        let (overflow_page, total_len) = Self::decode_overflow_meta(meta)?;
335                        let value = self.read_overflow_pages(overflow_page, total_len as usize)?;
336                        if !f(&value)? {
337                            return Ok(());
338                        }
339                    } else {
340                        // Inline: zero-copy slice into page memory
341                        let key_len = u16::from_le_bytes([cell[0], cell[1]]) as usize;
342                        let value_len =
343                            u32::from_le_bytes([cell[2], cell[3], cell[4], cell[5]]) as usize;
344                        let value_start = 6 + key_len;
345                        let value = &cell[value_start..value_start + value_len];
346                        if !f(value)? {
347                            return Ok(());
348                        }
349                    }
350                }
351            }
352
353            leaf_page = page.right_sibling();
354        }
355
356        Ok(())
357    }
358
359    /// Count all non-deleted leaf entries without reading cell data.
360    /// This only inspects slot flags — O(pages) with minimal per-slot work.
361    pub fn count_leaf_items(&self) -> Result<u64> {
362        // Find the leftmost leaf
363        let mut current = self.root_page;
364        loop {
365            let guard = self.buffer_pool.get_page(current)?;
366            let page = guard.read();
367            match page.page_type() {
368                PageType::Leaf => break,
369                PageType::Branch => {
370                    if let Some(cell) = page.get_cell(0) {
371                        let (child_id, _) = Self::decode_branch_cell(cell)?;
372                        current = child_id;
373                    } else {
374                        return Err(Error::Internal("Empty branch page".into()));
375                    }
376                }
377                _ => return Err(Error::Internal("Unexpected page type in B-tree".into())),
378            }
379        }
380
381        let mut total: u64 = 0;
382        let mut leaf_page = Some(current);
383        while let Some(page_id) = leaf_page {
384            let guard = self.buffer_pool.get_page(page_id)?;
385            let page = guard.read();
386            total += page.count_live_items() as u64;
387            leaf_page = page.right_sibling();
388        }
389
390        Ok(total)
391    }
392
393    /// Find the leaf page containing a key, plus the path from root
394    fn find_leaf(&self, key: &[u8]) -> Result<(PageId, Vec<PageId>)> {
395        let mut path = Vec::new();
396        let mut current = self.root_page;
397
398        loop {
399            path.push(current);
400            let guard = self.buffer_pool.get_page(current)?;
401            let page = guard.read();
402
403            match page.page_type() {
404                PageType::Leaf => return Ok((current, path)),
405                PageType::Branch => {
406                    current = self.find_child(&page, key)?;
407                }
408                _ => {
409                    return Err(Error::InvalidPageType {
410                        page_id: current,
411                        page_type: page.page_type() as u8,
412                    })
413                }
414            }
415        }
416    }
417
418    /// Find the child page for a key in a branch page
419    fn find_child(&self, page: &Page, key: &[u8]) -> Result<PageId> {
420        // Branch cells: [child_ptr, separator_key]
421        // We go left if key < separator, right otherwise
422
423        for i in 0..page.item_count() as usize {
424            if let Some(cell) = page.get_cell(i) {
425                let (child_id, separator) = Self::decode_branch_cell(cell)?;
426
427                // Last cell has no separator - it's the rightmost pointer
428                if separator.is_empty() || key < separator {
429                    return Ok(child_id);
430                }
431            }
432        }
433
434        // Key is greater than all separators - use last child
435        if let Some(cell) = page.get_cell(page.item_count() as usize - 1) {
436            let (child_id, _) = Self::decode_branch_cell(cell)?;
437            return Ok(child_id);
438        }
439
440        Err(Error::Internal("Branch page has no children".into()))
441    }
442
443    /// Insert with split when leaf is full.
444    /// This handles both inline and overflow cells by preserving raw cell bytes.
445    fn insert_with_split(
446        &mut self,
447        key: &[u8],
448        value: &[u8],
449        leaf_page_id: PageId,
450        path: &[PageId],
451    ) -> Result<()> {
452        let new_cell = Self::encode_leaf_cell(key, value);
453        self.split_and_insert_cell(key, &new_cell, leaf_page_id, path)
454    }
455
456    /// Insert an overflow reference cell with split.
457    fn insert_overflow_with_split(
458        &mut self,
459        key: &[u8],
460        overflow_page: PageId,
461        total_len: u32,
462        leaf_page_id: PageId,
463        path: &[PageId],
464    ) -> Result<()> {
465        let new_cell = Self::encode_overflow_cell(key, overflow_page, total_len);
466        self.split_and_insert_cell(key, &new_cell, leaf_page_id, path)
467    }
468
469    /// Common split logic: collects raw cells from the leaf, inserts the new
470    /// raw cell in key order, then splits across two pages.
471    fn split_and_insert_cell(
472        &mut self,
473        key: &[u8],
474        new_cell: &[u8],
475        leaf_page_id: PageId,
476        path: &[PageId],
477    ) -> Result<()> {
478        // Collect all raw cells (key, raw_cell_bytes) from the leaf
479        let guard = self.buffer_pool.get_page(leaf_page_id)?;
480        let page = guard.read();
481
482        // entries: (sort_key, raw_cell_bytes)
483        let mut entries: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
484        for i in 0..page.item_count() as usize {
485            if let Some(cell) = page.get_cell(i) {
486                let slot = page.get_slot(i).unwrap();
487                if !slot.is_deleted() {
488                    let (k, _) = Self::decode_leaf_cell(cell)?;
489                    entries.push((k.to_vec(), cell.to_vec()));
490                }
491            }
492        }
493
494        let right_sibling = page.right_sibling();
495        drop(page);
496        drop(guard);
497
498        // Insert the new cell in sorted key order
499        let insert_pos = entries
500            .iter()
501            .position(|(k, _)| k.as_slice() > key)
502            .unwrap_or(entries.len());
503        entries.insert(insert_pos, (key.to_vec(), new_cell.to_vec()));
504
505        // Split entries in half
506        let mid = entries.len() / 2;
507        let left_entries = &entries[..mid];
508        let right_entries = &entries[mid..];
509
510        // The split key is the first key of the right page
511        let split_key = right_entries[0].0.clone();
512
513        // Allocate new page for right half
514        let (new_page_id, new_guard) = self.buffer_pool.allocate_page(PageType::Leaf)?;
515        {
516            let mut new_page = new_guard.write();
517            new_page.set_right_sibling(right_sibling);
518
519            for (_, raw_cell) in right_entries {
520                new_page.insert_cell(raw_cell);
521            }
522        }
523
524        // Rewrite left page with left entries
525        let guard = self.buffer_pool.get_page_for_write(leaf_page_id)?;
526        {
527            let mut page = guard.write();
528
529            // Clear the page
530            let page_id = page.page_id();
531            let page_type = page.page_type();
532            let parent = page.parent();
533            let page_size = page.as_bytes().len();
534
535            *page = Page::new(page_id, page_type, page_size);
536            page.set_parent(parent);
537            page.set_right_sibling(Some(new_page_id));
538
539            for (_, raw_cell) in left_entries {
540                page.insert_cell(raw_cell);
541            }
542        }
543
544        // Insert separator into parent
545        if path.len() > 1 {
546            let parent_id = path[path.len() - 2];
547            self.insert_into_parent(parent_id, &split_key, new_page_id, &path[..path.len() - 1])?;
548        } else {
549            // Root was split - create new root
550            self.create_new_root(leaf_page_id, &split_key, new_page_id)?;
551        }
552
553        Ok(())
554    }
555
556    /// Insert a separator and child pointer into a parent branch page
557    fn insert_into_parent(
558        &mut self,
559        parent_id: PageId,
560        separator: &[u8],
561        child_id: PageId,
562        path: &[PageId],
563    ) -> Result<()> {
564        let guard = self.buffer_pool.get_page_for_write(parent_id)?;
565        let mut page = guard.write();
566
567        let cell = Self::encode_branch_cell(child_id, separator);
568
569        if page.insert_cell(&cell).is_some() {
570            return Ok(());
571        }
572
573        // Parent is also full - need to split it too
574        // Collect all entries from this branch page
575        let mut entries: Vec<(Vec<u8>, PageId)> = Vec::new();
576        for i in 0..page.item_count() as usize {
577            if let Some(cell) = page.get_cell(i) {
578                let slot = page.get_slot(i).unwrap();
579                if !slot.is_deleted() {
580                    let (child, sep) = Self::decode_branch_cell(cell)?;
581                    entries.push((sep.to_vec(), child));
582                }
583            }
584        }
585        drop(page);
586        drop(guard);
587
588        // Add the new entry in sorted order
589        let insert_pos = entries
590            .iter()
591            .position(|(k, _)| k.as_slice() > separator)
592            .unwrap_or(entries.len());
593        entries.insert(insert_pos, (separator.to_vec(), child_id));
594
595        // Split entries - the middle key goes to the parent
596        let mid = entries.len() / 2;
597        let left_entries = &entries[..mid];
598        let right_entries = &entries[mid + 1..]; // Skip the middle entry - it goes up
599        let (middle_key, _) = &entries[mid];
600
601        // Allocate new page for right half
602        let (new_page_id, new_guard) = self.buffer_pool.allocate_page(PageType::Branch)?;
603        {
604            let mut new_page = new_guard.write();
605            for (sep, child) in right_entries {
606                let cell = Self::encode_branch_cell(*child, sep);
607                new_page.insert_cell(&cell);
608            }
609        }
610
611        // Rewrite parent page with left entries
612        let guard = self.buffer_pool.get_page_for_write(parent_id)?;
613        {
614            let mut page = guard.write();
615
616            // Clear the page
617            let page_id = page.page_id();
618            let page_type = page.page_type();
619            let grandparent = page.parent();
620            let page_size = page.as_bytes().len();
621
622            *page = Page::new(page_id, page_type, page_size);
623            page.set_parent(grandparent);
624
625            for (sep, child) in left_entries {
626                let cell = Self::encode_branch_cell(*child, sep);
627                page.insert_cell(&cell);
628            }
629        }
630        drop(guard);
631
632        // Update parent pointers in children of the new page
633        for (_, child_page_id) in right_entries {
634            let guard = self.buffer_pool.get_page_for_write(*child_page_id)?;
635            let mut page = guard.write();
636            page.set_parent(new_page_id);
637        }
638
639        // Find grandparent and insert the middle key
640        // path contains the path from root to the current node
641        let parent_pos = path.iter().position(|&p| p == parent_id);
642        if let Some(pos) = parent_pos {
643            if pos > 0 {
644                // Has grandparent - recursively insert
645                let grandparent_id = path[pos - 1];
646                self.insert_into_parent(grandparent_id, middle_key, new_page_id, &path[..pos])?;
647            } else {
648                // Parent was root - create new root
649                self.create_new_root(parent_id, middle_key, new_page_id)?;
650            }
651        } else {
652            // Parent was root (path might be empty or not contain it)
653            self.create_new_root(parent_id, middle_key, new_page_id)?;
654        }
655
656        Ok(())
657    }
658
659    /// Create a new root when the old root splits
660    fn create_new_root(
661        &mut self,
662        left_child: PageId,
663        separator: &[u8],
664        right_child: PageId,
665    ) -> Result<()> {
666        let (new_root_id, guard) = self.buffer_pool.allocate_page(PageType::Branch)?;
667        {
668            let mut page = guard.write();
669
670            // Left child pointer (with empty separator - goes left of everything)
671            let left_cell = Self::encode_branch_cell(left_child, &[]);
672            page.insert_cell(&left_cell);
673
674            // Right child pointer with separator
675            let right_cell = Self::encode_branch_cell(right_child, separator);
676            page.insert_cell(&right_cell);
677        }
678
679        // Update parent pointers in children
680        {
681            let guard = self.buffer_pool.get_page_for_write(left_child)?;
682            let mut page = guard.write();
683            page.set_parent(new_root_id);
684        }
685        {
686            let guard = self.buffer_pool.get_page_for_write(right_child)?;
687            let mut page = guard.write();
688            page.set_parent(new_root_id);
689        }
690
691        self.root_page = new_root_id;
692        Ok(())
693    }
694
695    /// Encode a leaf cell: [key_len:u16][value_len:u32][key][value]
696    fn encode_leaf_cell(key: &[u8], value: &[u8]) -> Vec<u8> {
697        let mut buf = Vec::with_capacity(6 + key.len() + value.len());
698        buf.put_u16_le(key.len() as u16);
699        buf.put_u32_le(value.len() as u32);
700        buf.extend_from_slice(key);
701        buf.extend_from_slice(value);
702        buf
703    }
704
705    /// Encode an overflow reference cell stored in the leaf page.
706    /// Format: [key_len:u16][OVERFLOW_MARKER:u32][key][overflow_page_id:u64][total_value_len:u32]
707    fn encode_overflow_cell(key: &[u8], overflow_page: PageId, total_len: u32) -> Vec<u8> {
708        let mut buf = Vec::with_capacity(6 + key.len() + 12);
709        buf.put_u16_le(key.len() as u16);
710        buf.put_u32_le(OVERFLOW_MARKER);
711        buf.extend_from_slice(key);
712        buf.put_u64_le(overflow_page.0);
713        buf.put_u32_le(total_len);
714        buf
715    }
716
717    /// Decode a leaf cell.  Returns (key, value) for inline cells.
718    /// For overflow cells this only returns the inline portion; callers
719    /// that need the full value should use `decode_leaf_cell_full`.
720    fn decode_leaf_cell(cell: &[u8]) -> Result<(&[u8], &[u8])> {
721        if cell.len() < 6 {
722            return Err(Error::Internal("Leaf cell too small".into()));
723        }
724
725        let key_len = u16::from_le_bytes([cell[0], cell[1]]) as usize;
726        let value_len = u32::from_le_bytes([cell[2], cell[3], cell[4], cell[5]]);
727
728        if value_len == OVERFLOW_MARKER {
729            // Overflow cell – return key and the raw overflow metadata as
730            // "value" so callers that only need the key still work.
731            let key_end = 6 + key_len;
732            if cell.len() < key_end {
733                return Err(Error::Internal("Overflow cell key truncated".into()));
734            }
735            let key = &cell[6..key_end];
736            // Return the overflow metadata bytes as the "value" slice.
737            // Callers doing key-only comparisons won't inspect this.
738            let meta = &cell[key_end..];
739            Ok((key, meta))
740        } else {
741            let value_len = value_len as usize;
742            if cell.len() < 6 + key_len + value_len {
743                return Err(Error::Internal("Leaf cell data truncated".into()));
744            }
745            let key = &cell[6..6 + key_len];
746            let value = &cell[6 + key_len..6 + key_len + value_len];
747            Ok((key, value))
748        }
749    }
750
751    /// Check whether a raw cell uses overflow storage.
752    fn cell_is_overflow(cell: &[u8]) -> bool {
753        if cell.len() < 6 {
754            return false;
755        }
756        u32::from_le_bytes([cell[2], cell[3], cell[4], cell[5]]) == OVERFLOW_MARKER
757    }
758
759    /// Write a large value into one or more overflow pages.
760    /// Returns the page ID of the first overflow page.
761    fn write_overflow_pages(&self, value: &[u8]) -> Result<PageId> {
762        let ps = self.page_size();
763        let cap = overflow_data_capacity(ps);
764
765        let mut remaining = value;
766        let mut first_page: Option<PageId> = None;
767        let mut prev_page: Option<PageId> = None;
768
769        while !remaining.is_empty() {
770            let chunk_len = remaining.len().min(cap);
771            let (chunk, rest) = remaining.split_at(chunk_len);
772            remaining = rest;
773
774            let (page_id, guard) = self.buffer_pool.allocate_page(PageType::Overflow)?;
775            {
776                let mut page = guard.write();
777                let bytes = page.as_bytes_mut();
778                // Write chunk_len (u16) right after the standard page header
779                let hdr = constants::PAGE_HEADER_SIZE;
780                bytes[hdr..hdr + 2].copy_from_slice(&(chunk_len as u16).to_le_bytes());
781                // Write chunk data after the chunk header
782                let data_start = hdr + OVERFLOW_CHUNK_HEADER;
783                bytes[data_start..data_start + chunk_len].copy_from_slice(chunk);
784            }
785
786            // Chain: set previous overflow page's right_sibling to this page
787            if let Some(prev_id) = prev_page {
788                let prev_guard = self.buffer_pool.get_page_for_write(prev_id)?;
789                let mut prev = prev_guard.write();
790                prev.set_right_sibling(Some(page_id));
791            }
792
793            if first_page.is_none() {
794                first_page = Some(page_id);
795            }
796            prev_page = Some(page_id);
797        }
798
799        first_page.ok_or_else(|| Error::Internal("Empty overflow write".into()))
800    }
801
802    /// Read a value from a chain of overflow pages.
803    fn read_overflow_pages(&self, first_page: PageId, total_len: usize) -> Result<Vec<u8>> {
804        let mut result = Vec::with_capacity(total_len);
805        let mut current = Some(first_page);
806
807        while let Some(page_id) = current {
808            let guard = self.buffer_pool.get_page(page_id)?;
809            let page = guard.read();
810            let bytes = page.as_bytes();
811            let hdr = constants::PAGE_HEADER_SIZE;
812            let chunk_len = u16::from_le_bytes([bytes[hdr], bytes[hdr + 1]]) as usize;
813            let data_start = hdr + OVERFLOW_CHUNK_HEADER;
814            if data_start + chunk_len > bytes.len() {
815                return Err(Error::Internal(
816                    "Overflow page data exceeds page size".into(),
817                ));
818            }
819            result.extend_from_slice(&bytes[data_start..data_start + chunk_len]);
820            current = page.right_sibling();
821        }
822
823        if result.len() != total_len {
824            return Err(Error::Internal(format!(
825                "Overflow data length mismatch: expected {}, got {}",
826                total_len,
827                result.len()
828            )));
829        }
830        Ok(result)
831    }
832
833    /// Decode overflow metadata from the value portion of a decoded overflow cell.
834    fn decode_overflow_meta(meta: &[u8]) -> Result<(PageId, u32)> {
835        if meta.len() < 12 {
836            return Err(Error::Internal("Overflow metadata too small".into()));
837        }
838        let page_id = PageId(u64::from_le_bytes(meta[0..8].try_into().unwrap()));
839        let total_len = u32::from_le_bytes(meta[8..12].try_into().unwrap());
840        Ok((page_id, total_len))
841    }
842
843    /// Fully decode a leaf cell, reading overflow pages if necessary.
844    /// Returns (key, value) with owned value data.
845    fn decode_leaf_cell_full(&self, cell: &[u8]) -> Result<(Vec<u8>, Vec<u8>)> {
846        let (key, val_or_meta) = Self::decode_leaf_cell(cell)?;
847        if Self::cell_is_overflow(cell) {
848            let (overflow_page, total_len) = Self::decode_overflow_meta(val_or_meta)?;
849            let value = self.read_overflow_pages(overflow_page, total_len as usize)?;
850            Ok((key.to_vec(), value))
851        } else {
852            Ok((key.to_vec(), val_or_meta.to_vec()))
853        }
854    }
855
856    /// Free a chain of overflow pages (best-effort; pages become unreachable).
857    fn free_overflow_pages(&self, first_page: PageId) -> Result<()> {
858        let mut current = Some(first_page);
859        while let Some(page_id) = current {
860            let guard = self.buffer_pool.get_page(page_id)?;
861            let page = guard.read();
862            current = page.right_sibling();
863            // We don't have a real free-list integration here, but marking
864            // the page type as Free makes it reclaimable.
865            drop(page);
866            drop(guard);
867            let guard = self.buffer_pool.get_page_for_write(page_id)?;
868            let mut page = guard.write();
869            let ps = page.as_bytes().len();
870            *page = Page::new(page_id, PageType::Free, ps);
871        }
872        Ok(())
873    }
874
875    /// Determine the page size from the buffer pool.
876    fn page_size(&self) -> usize {
877        // We read the root page to determine the page size.
878        // This is cached, so it's cheap.
879        self.buffer_pool
880            .get_page(self.root_page)
881            .map(|g| g.read().as_bytes().len())
882            .unwrap_or(4096)
883    }
884
885    /// Encode a branch cell: [child_id:u64][key_len:u16][key]
886    fn encode_branch_cell(child_id: PageId, key: &[u8]) -> Vec<u8> {
887        let mut buf = Vec::with_capacity(10 + key.len());
888        buf.put_u64_le(child_id.0);
889        buf.put_u16_le(key.len() as u16);
890        buf.extend_from_slice(key);
891        buf
892    }
893
894    /// Decode a branch cell
895    fn decode_branch_cell(cell: &[u8]) -> Result<(PageId, &[u8])> {
896        if cell.len() < 10 {
897            return Err(Error::Internal("Branch cell too small".into()));
898        }
899
900        let child_id = PageId(u64::from_le_bytes(cell[0..8].try_into().unwrap()));
901        let key_len = u16::from_le_bytes([cell[8], cell[9]]) as usize;
902
903        if cell.len() < 10 + key_len {
904            return Err(Error::Internal("Branch cell data truncated".into()));
905        }
906
907        let key = &cell[10..10 + key_len];
908
909        Ok((child_id, key))
910    }
911}
912
913/// Iterator over a range of keys in the B-tree
914pub struct RangeIterator<'a> {
915    btree: &'a BTree,
916    current_page: Option<PageId>,
917    start_key: Vec<u8>,
918    end_key: Vec<u8>,
919    started: bool,
920    /// Buffered cells from current page to avoid per-cell get_page() calls
921    buffered: Vec<(Vec<u8>, Vec<u8>)>,
922}
923
924impl<'a> Iterator for RangeIterator<'a> {
925    type Item = Result<(Vec<u8>, Vec<u8>)>;
926
927    fn next(&mut self) -> Option<Self::Item> {
928        loop {
929            // Yield from buffer first
930            if let Some(pair) = self.buffered.pop() {
931                // Check end bound
932                if pair.0.as_slice() > self.end_key.as_slice() {
933                    return None;
934                }
935                return Some(Ok(pair)); // moved, not cloned
936            }
937
938            // Buffer exhausted - load next page
939            let page_id = self.current_page?;
940            let guard = match self.btree.buffer_pool.get_page(page_id) {
941                Ok(g) => g,
942                Err(e) => return Some(Err(e)),
943            };
944            let page = guard.read();
945
946            self.buffered.clear();
947
948            // Determine starting cell index
949            let start_cell = if !self.started {
950                self.started = true;
951                let mut found_start = page.item_count() as usize; // default: past end
952                for i in 0..page.item_count() as usize {
953                    if let Some(cell) = page.get_cell(i) {
954                        match BTree::decode_leaf_cell(cell) {
955                            Ok((key, _)) if key >= self.start_key.as_slice() => {
956                                found_start = i;
957                                break;
958                            }
959                            Ok(_) => {}
960                            Err(e) => return Some(Err(e)),
961                        }
962                    }
963                }
964                found_start
965            } else {
966                0
967            };
968
969            // Collect all valid cells from this page into buffer
970            for i in start_cell..page.item_count() as usize {
971                if let Some(cell) = page.get_cell(i) {
972                    let slot = match page.get_slot(i) {
973                        Some(s) => s,
974                        None => continue,
975                    };
976                    if slot.is_deleted() {
977                        continue;
978                    }
979                    // Use full decode which resolves overflow pages
980                    match self.btree.decode_leaf_cell_full(cell) {
981                        Ok((key, value)) => {
982                            self.buffered.push((key, value));
983                        }
984                        Err(e) => return Some(Err(e)),
985                    }
986                }
987            }
988
989            // Reverse buffer so pop() returns items in ascending order
990            self.buffered.reverse();
991
992            // Move to next page for future loads
993            self.current_page = page.right_sibling();
994            // guard dropped here - page unpinned
995
996            // If we collected nothing from this page, try next page
997            // (this handles empty pages or pages with only deleted entries)
998        }
999    }
1000}
1001
#[cfg(test)]
mod tests {
    use super::*;
    use crate::FileManager;
    use featherdb_core::Config;
    use tempfile::TempDir;

    /// Build an empty B-tree backed by a database file in a fresh temporary
    /// directory. The `TempDir` is returned so it outlives the tree.
    fn create_test_btree() -> (TempDir, BTree) {
        let tmp = TempDir::new().unwrap();
        let db_path = tmp.path().join("test.db");
        let config = Config::new(&db_path);

        let file_manager = Arc::new(FileManager::open(&config).unwrap());
        file_manager.init_if_needed(&config).unwrap();

        let pool = Arc::new(BufferPool::new(100, file_manager));
        (tmp, BTree::new(pool).unwrap())
    }

    /// Inserted keys are retrievable; unknown keys yield `None`.
    #[test]
    fn test_btree_insert_get() {
        let (_tmp, mut btree) = create_test_btree();
        let pairs: [(&[u8], &[u8]); 3] = [
            (b"key1", b"value1"),
            (b"key2", b"value2"),
            (b"key3", b"value3"),
        ];

        for (key, value) in pairs.iter() {
            btree.insert(key, value).unwrap();
        }

        for (key, value) in pairs.iter() {
            assert_eq!(btree.get(key).unwrap(), Some(value.to_vec()));
        }
        assert_eq!(btree.get(b"nonexistent").unwrap(), None);
    }

    /// Re-inserting a key returns the previous value and stores the new one.
    #[test]
    fn test_btree_update() {
        let (_tmp, mut btree) = create_test_btree();

        btree.insert(b"key", b"value1").unwrap();
        let previous = btree.insert(b"key", b"value2").unwrap();

        assert_eq!(previous, Some(b"value1".to_vec()));
        assert_eq!(btree.get(b"key").unwrap(), Some(b"value2".to_vec()));
    }

    /// Deleting a key returns its value and makes it unreachable.
    #[test]
    fn test_btree_delete() {
        let (_tmp, mut btree) = create_test_btree();

        btree.insert(b"key", b"value").unwrap();
        let removed = btree.delete(b"key").unwrap();

        assert_eq!(removed, Some(b"value".to_vec()));
        assert_eq!(btree.get(b"key").unwrap(), None);
    }

    /// A full iteration visits every inserted entry.
    #[test]
    fn test_btree_iterator() {
        let (_tmp, mut btree) = create_test_btree();

        btree.insert(b"a", b"1").unwrap();
        btree.insert(b"b", b"2").unwrap();
        btree.insert(b"c", b"3").unwrap();

        let visited = btree.iter().unwrap().count();
        assert_eq!(visited, 3);
    }

    /// Enough inserts to cause page splits; every key must stay retrievable.
    #[test]
    fn test_btree_many_inserts() {
        let (_tmp, mut btree) = create_test_btree();
        let entry = |i: u32| (format!("key{:04}", i), format!("value{}", i));

        for i in 0..100 {
            let (key, value) = entry(i);
            btree.insert(key.as_bytes(), value.as_bytes()).unwrap();
        }

        for i in 0..100 {
            let (key, value) = entry(i);
            assert_eq!(btree.get(key.as_bytes()).unwrap(), Some(value.into_bytes()));
        }
    }
}