Skip to main content

citadel_buffer/
cursor.rs

//! B+ tree cursor for range iteration using a root-to-leaf path stack.
2
3use std::collections::HashMap;
4use std::sync::Arc;
5
6use citadel_core::types::{PageId, PageType, ValueType};
7use citadel_core::{Error, Result};
8use citadel_page::leaf_node::LeafCell;
9use citadel_page::page::Page;
10use citadel_page::{branch_node, leaf_node};
11
12pub trait PageMap {
13    fn get_page(&self, id: &PageId) -> Option<&Page>;
14}
15
16/// Extends `PageMap` with on-demand page loading for lazy cursor traversal.
17pub trait PageLoader: PageMap {
18    fn ensure_loaded(&mut self, id: PageId) -> Result<()>;
19}
20
21impl PageMap for HashMap<PageId, Page> {
22    fn get_page(&self, id: &PageId) -> Option<&Page> {
23        self.get(id)
24    }
25}
26
27impl PageMap for HashMap<PageId, Arc<Page>> {
28    fn get_page(&self, id: &PageId) -> Option<&Page> {
29        self.get(id).map(|a| a.as_ref())
30    }
31}
32
33pub struct CursorEntry {
34    pub key: Vec<u8>,
35    pub val_type: ValueType,
36    pub value: Vec<u8>,
37}
38
39/// B+ tree cursor position with root-to-leaf path.
40pub struct Cursor {
41    /// Stack of (page_id, child_index) from root to current leaf's parent.
42    path: Vec<(PageId, usize)>,
43    /// Current leaf page ID.
44    leaf: PageId,
45    /// Current cell index within the leaf.
46    cell_idx: u16,
47    /// Whether the cursor is positioned at a valid entry.
48    valid: bool,
49}
50
51impl Cursor {
52    /// Seek to first key >= `key`. Empty key = first entry.
53    pub fn seek(pages: &impl PageMap, root: PageId, key: &[u8]) -> Result<Self> {
54        let mut path = Vec::new();
55        let mut current = root;
56
57        // Walk to the leaf
58        loop {
59            let page = pages
60                .get_page(&current)
61                .ok_or(Error::PageOutOfBounds(current))?;
62            match page.page_type() {
63                Some(PageType::Leaf) => break,
64                Some(PageType::Branch) => {
65                    let child_idx = branch_node::search_child_index(page, key);
66                    let child = branch_node::get_child(page, child_idx);
67                    path.push((current, child_idx));
68                    current = child;
69                }
70                _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
71            }
72        }
73
74        // Find the cell index in the leaf
75        let page = pages.get_page(&current).unwrap();
76        let cell_idx = match leaf_node::search(page, key) {
77            Ok(idx) => idx,
78            Err(idx) => idx,
79        };
80
81        let valid = cell_idx < page.num_cells();
82
83        let mut cursor = Self {
84            path,
85            leaf: current,
86            cell_idx,
87            valid,
88        };
89
90        // If we landed past the end of this leaf, advance to next leaf
91        if !valid && page.num_cells() > 0 {
92            cursor.advance_leaf(pages)?;
93        } else if page.num_cells() == 0 {
94            cursor.valid = false;
95        }
96
97        Ok(cursor)
98    }
99
100    /// Position the cursor at the first entry in the tree.
101    pub fn first(pages: &impl PageMap, root: PageId) -> Result<Self> {
102        let mut path = Vec::new();
103        let mut current = root;
104
105        // Walk to the leftmost leaf
106        loop {
107            let page = pages
108                .get_page(&current)
109                .ok_or(Error::PageOutOfBounds(current))?;
110            match page.page_type() {
111                Some(PageType::Leaf) => break,
112                Some(PageType::Branch) => {
113                    let child = branch_node::get_child(page, 0);
114                    path.push((current, 0));
115                    current = child;
116                }
117                _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
118            }
119        }
120
121        let page = pages.get_page(&current).unwrap();
122        let valid = page.num_cells() > 0;
123
124        Ok(Self {
125            path,
126            leaf: current,
127            cell_idx: 0,
128            valid,
129        })
130    }
131
132    /// Position the cursor at the last entry in the tree.
133    pub fn last(pages: &impl PageMap, root: PageId) -> Result<Self> {
134        let mut path = Vec::new();
135        let mut current = root;
136
137        // Walk to the rightmost leaf
138        loop {
139            let page = pages
140                .get_page(&current)
141                .ok_or(Error::PageOutOfBounds(current))?;
142            match page.page_type() {
143                Some(PageType::Leaf) => break,
144                Some(PageType::Branch) => {
145                    let n = page.num_cells() as usize;
146                    let child = page.right_child();
147                    path.push((current, n));
148                    current = child;
149                }
150                _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
151            }
152        }
153
154        let page = pages.get_page(&current).unwrap();
155        let n = page.num_cells();
156        let valid = n > 0;
157        let cell_idx = if valid { n - 1 } else { 0 };
158
159        Ok(Self {
160            path,
161            leaf: current,
162            cell_idx,
163            valid,
164        })
165    }
166
167    /// Whether the cursor is at a valid position.
168    pub fn is_valid(&self) -> bool {
169        self.valid
170    }
171
172    /// Current leaf page ID.
173    pub fn leaf_page_id(&self) -> PageId {
174        self.leaf
175    }
176
177    /// Current cell index within the leaf.
178    pub fn cell_index(&self) -> u16 {
179        self.cell_idx
180    }
181
182    /// Update the current leaf page ID after a CoW operation.
183    pub fn set_leaf_page_id(&mut self, id: PageId) {
184        self.leaf = id;
185    }
186
187    pub fn current(&self, pages: &impl PageMap) -> Option<CursorEntry> {
188        if !self.valid {
189            return None;
190        }
191        let page = pages.get_page(&self.leaf)?;
192        let cell = leaf_node::read_cell(page, self.cell_idx);
193        Some(CursorEntry {
194            key: cell.key.to_vec(),
195            val_type: cell.val_type,
196            value: cell.value.to_vec(),
197        })
198    }
199
200    pub fn current_ref<'a, P: PageMap>(&self, pages: &'a P) -> Option<LeafCell<'a>> {
201        if !self.valid {
202            return None;
203        }
204        let page = pages.get_page(&self.leaf)?;
205        Some(leaf_node::read_cell(page, self.cell_idx))
206    }
207
208    /// Move the cursor to the next entry (forward).
209    pub fn next(&mut self, pages: &impl PageMap) -> Result<bool> {
210        if !self.valid {
211            return Ok(false);
212        }
213
214        let page = pages
215            .get_page(&self.leaf)
216            .ok_or(Error::PageOutOfBounds(self.leaf))?;
217
218        if self.cell_idx + 1 < page.num_cells() {
219            self.cell_idx += 1;
220            return Ok(true);
221        }
222
223        // Need to move to the next leaf
224        self.advance_leaf(pages)
225    }
226
227    /// Move the cursor to the previous entry (backward).
228    pub fn prev(&mut self, pages: &impl PageMap) -> Result<bool> {
229        if !self.valid {
230            return Ok(false);
231        }
232
233        if self.cell_idx > 0 {
234            self.cell_idx -= 1;
235            return Ok(true);
236        }
237
238        // Need to move to the previous leaf
239        self.retreat_leaf(pages)
240    }
241
242    /// Advance to the first cell of the next leaf.
243    fn advance_leaf(&mut self, pages: &impl PageMap) -> Result<bool> {
244        // Walk up the path to find a parent where we can go right
245        while let Some((parent_id, child_idx)) = self.path.pop() {
246            let parent = pages
247                .get_page(&parent_id)
248                .ok_or(Error::PageOutOfBounds(parent_id))?;
249            let n = parent.num_cells() as usize;
250
251            if child_idx < n {
252                // There's a sibling to the right: child_idx + 1
253                let next_child_idx = child_idx + 1;
254                let next_child = branch_node::get_child(parent, next_child_idx);
255                self.path.push((parent_id, next_child_idx));
256
257                // Walk down to the leftmost leaf of this subtree
258                let mut current = next_child;
259                loop {
260                    let page = pages
261                        .get_page(&current)
262                        .ok_or(Error::PageOutOfBounds(current))?;
263                    match page.page_type() {
264                        Some(PageType::Leaf) => {
265                            self.leaf = current;
266                            self.cell_idx = 0;
267                            self.valid = page.num_cells() > 0;
268                            return Ok(self.valid);
269                        }
270                        Some(PageType::Branch) => {
271                            let child = branch_node::get_child(page, 0);
272                            self.path.push((current, 0));
273                            current = child;
274                        }
275                        _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
276                    }
277                }
278            }
279            // child_idx == num_cells (rightmost child) - keep going up
280        }
281
282        // No more siblings - we've exhausted the tree
283        self.valid = false;
284        Ok(false)
285    }
286
287    // ── Lazy variants (load pages on demand via PageLoader) ─────────
288
289    /// Seek with lazy page loading — only loads the root-to-leaf path.
290    pub fn seek_lazy(pages: &mut impl PageLoader, root: PageId, key: &[u8]) -> Result<Self> {
291        let mut path = Vec::new();
292        let mut current = root;
293
294        loop {
295            pages.ensure_loaded(current)?;
296            let page = pages
297                .get_page(&current)
298                .ok_or(Error::PageOutOfBounds(current))?;
299            match page.page_type() {
300                Some(PageType::Leaf) => break,
301                Some(PageType::Branch) => {
302                    let child_idx = branch_node::search_child_index(page, key);
303                    let child = branch_node::get_child(page, child_idx);
304                    path.push((current, child_idx));
305                    current = child;
306                }
307                _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
308            }
309        }
310
311        let page = pages.get_page(&current).unwrap();
312        let cell_idx = match leaf_node::search(page, key) {
313            Ok(idx) => idx,
314            Err(idx) => idx,
315        };
316
317        let valid = cell_idx < page.num_cells();
318
319        let mut cursor = Self {
320            path,
321            leaf: current,
322            cell_idx,
323            valid,
324        };
325
326        if !valid && page.num_cells() > 0 {
327            cursor.advance_leaf_lazy(pages)?;
328        } else if page.num_cells() == 0 {
329            cursor.valid = false;
330        }
331
332        Ok(cursor)
333    }
334
335    /// Read the current entry, loading the leaf page if needed.
336    pub fn current_ref_lazy<'a, P: PageLoader>(&self, pages: &'a mut P) -> Option<LeafCell<'a>> {
337        if !self.valid {
338            return None;
339        }
340        pages.ensure_loaded(self.leaf).ok()?;
341        let page = pages.get_page(&self.leaf)?;
342        Some(leaf_node::read_cell(page, self.cell_idx))
343    }
344
345    /// Advance to the next entry, loading pages on demand.
346    pub fn next_lazy(&mut self, pages: &mut impl PageLoader) -> Result<bool> {
347        if !self.valid {
348            return Ok(false);
349        }
350
351        pages.ensure_loaded(self.leaf)?;
352        let page = pages
353            .get_page(&self.leaf)
354            .ok_or(Error::PageOutOfBounds(self.leaf))?;
355
356        if self.cell_idx + 1 < page.num_cells() {
357            self.cell_idx += 1;
358            return Ok(true);
359        }
360
361        self.advance_leaf_lazy(pages)
362    }
363
364    /// Advance to the next leaf, loading child pages on demand.
365    fn advance_leaf_lazy(&mut self, pages: &mut impl PageLoader) -> Result<bool> {
366        while let Some((parent_id, child_idx)) = self.path.pop() {
367            let parent = pages
368                .get_page(&parent_id)
369                .ok_or(Error::PageOutOfBounds(parent_id))?;
370            let n = parent.num_cells() as usize;
371
372            if child_idx < n {
373                let next_child_idx = child_idx + 1;
374                let next_child = branch_node::get_child(parent, next_child_idx);
375                self.path.push((parent_id, next_child_idx));
376
377                let mut current = next_child;
378                loop {
379                    pages.ensure_loaded(current)?;
380                    let page = pages
381                        .get_page(&current)
382                        .ok_or(Error::PageOutOfBounds(current))?;
383                    match page.page_type() {
384                        Some(PageType::Leaf) => {
385                            self.leaf = current;
386                            self.cell_idx = 0;
387                            self.valid = page.num_cells() > 0;
388                            return Ok(self.valid);
389                        }
390                        Some(PageType::Branch) => {
391                            let child = branch_node::get_child(page, 0);
392                            self.path.push((current, 0));
393                            current = child;
394                        }
395                        _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
396                    }
397                }
398            }
399        }
400
401        self.valid = false;
402        Ok(false)
403    }
404
405    /// Retreat to the last cell of the previous leaf.
406    fn retreat_leaf(&mut self, pages: &impl PageMap) -> Result<bool> {
407        // Walk up the path to find a parent where we can go left
408        while let Some((parent_id, child_idx)) = self.path.pop() {
409            if child_idx > 0 {
410                // There's a sibling to the left: child_idx - 1
411                let prev_child_idx = child_idx - 1;
412                let parent = pages
413                    .get_page(&parent_id)
414                    .ok_or(Error::PageOutOfBounds(parent_id))?;
415                let prev_child = branch_node::get_child(parent, prev_child_idx);
416                self.path.push((parent_id, prev_child_idx));
417
418                // Walk down to the rightmost leaf of this subtree
419                let mut current = prev_child;
420                loop {
421                    let page = pages
422                        .get_page(&current)
423                        .ok_or(Error::PageOutOfBounds(current))?;
424                    match page.page_type() {
425                        Some(PageType::Leaf) => {
426                            self.leaf = current;
427                            let n = page.num_cells();
428                            if n > 0 {
429                                self.cell_idx = n - 1;
430                                self.valid = true;
431                            } else {
432                                self.valid = false;
433                            }
434                            return Ok(self.valid);
435                        }
436                        Some(PageType::Branch) => {
437                            let n = page.num_cells() as usize;
438                            let child = page.right_child();
439                            self.path.push((current, n));
440                            current = child;
441                        }
442                        _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
443                    }
444                }
445            }
446            // child_idx == 0 (leftmost child) - keep going up
447        }
448
449        // No more siblings - we've exhausted the tree
450        self.valid = false;
451        Ok(false)
452    }
453}
454
#[cfg(test)]
mod tests {
    use super::*;
    use crate::allocator::PageAllocator;
    use crate::btree::BTree;
    use citadel_core::types::TxnId;

    /// Builds an in-memory tree by inserting `keys` in the given order; each
    /// key doubles as its own inline value.
    fn build_tree(keys: &[&[u8]]) -> (HashMap<PageId, Page>, BTree) {
        let mut pages = HashMap::new();
        let mut alloc = PageAllocator::new(0);
        let mut tree = BTree::new(&mut pages, &mut alloc, TxnId(1));
        for key in keys {
            tree.insert(&mut pages, &mut alloc, TxnId(1), key, ValueType::Inline, key)
                .unwrap();
        }
        (pages, tree)
    }

    /// Produces `count` six-digit, zero-padded decimal keys: "000000", "000001", ...
    fn sequential_keys(count: u32) -> Vec<Vec<u8>> {
        (0..count)
            .map(|i| format!("{i:06}").into_bytes())
            .collect()
    }

    #[test]
    fn cursor_forward_iteration() {
        let (pages, tree) = build_tree(&[b"c", b"a", b"e", b"b", b"d"]);
        let mut cursor = Cursor::first(&pages, tree.root).unwrap();

        let mut seen = Vec::new();
        while cursor.is_valid() {
            seen.push(cursor.current(&pages).unwrap().key);
            cursor.next(&pages).unwrap();
        }

        assert_eq!(seen, vec![b"a", b"b", b"c", b"d", b"e"]);
    }

    #[test]
    fn cursor_backward_iteration() {
        let (pages, tree) = build_tree(&[b"c", b"a", b"e", b"b", b"d"]);
        let mut cursor = Cursor::last(&pages, tree.root).unwrap();

        let mut seen = Vec::new();
        while cursor.is_valid() {
            seen.push(cursor.current(&pages).unwrap().key);
            cursor.prev(&pages).unwrap();
        }

        assert_eq!(seen, vec![b"e", b"d", b"c", b"b", b"a"]);
    }

    #[test]
    fn cursor_seek() {
        let (pages, tree) = build_tree(&[b"b", b"d", b"f", b"h"]);
        // "c" is absent, so the cursor must land on "d", the first key >= "c".
        let cursor = Cursor::seek(&pages, tree.root, b"c").unwrap();
        assert!(cursor.is_valid());
        assert_eq!(cursor.current(&pages).unwrap().key, b"d");
    }

    #[test]
    fn cursor_seek_exact() {
        let (pages, tree) = build_tree(&[b"b", b"d", b"f"]);
        let cursor = Cursor::seek(&pages, tree.root, b"d").unwrap();
        assert!(cursor.is_valid());
        assert_eq!(cursor.current(&pages).unwrap().key, b"d");
    }

    #[test]
    fn cursor_seek_past_end() {
        let (pages, tree) = build_tree(&[b"a", b"b", b"c"]);
        let cursor = Cursor::seek(&pages, tree.root, b"z").unwrap();
        assert!(!cursor.is_valid());
    }

    #[test]
    fn cursor_empty_tree() {
        let mut pages = HashMap::new();
        let mut alloc = PageAllocator::new(0);
        let tree = BTree::new(&mut pages, &mut alloc, TxnId(1));

        let cursor = Cursor::first(&pages, tree.root).unwrap();
        assert!(!cursor.is_valid());
    }

    /// `PageLoader` over a pre-built `HashMap` that records which page IDs
    /// were requested through `ensure_loaded`.
    struct TrackingLoader {
        pages: HashMap<PageId, Page>,
        touched: std::collections::HashSet<PageId>,
    }

    impl TrackingLoader {
        fn new(pages: HashMap<PageId, Page>) -> Self {
            let touched = std::collections::HashSet::new();
            Self { pages, touched }
        }

        /// Number of distinct pages requested so far.
        fn unique_pages_touched(&self) -> usize {
            self.touched.len()
        }
    }

    impl PageMap for TrackingLoader {
        fn get_page(&self, id: &PageId) -> Option<&Page> {
            self.pages.get(id)
        }
    }

    impl PageLoader for TrackingLoader {
        fn ensure_loaded(&mut self, id: PageId) -> citadel_core::Result<()> {
            if !self.pages.contains_key(&id) {
                return Err(citadel_core::Error::PageOutOfBounds(id));
            }
            self.touched.insert(id);
            Ok(())
        }
    }

    #[test]
    fn lazy_cursor_forward() {
        let keys = sequential_keys(2000);
        let key_refs: Vec<&[u8]> = keys.iter().map(|k| k.as_slice()).collect();
        let (pages, tree) = build_tree(&key_refs);

        let mut loader = TrackingLoader::new(pages);
        let mut cursor = Cursor::seek_lazy(&mut loader, tree.root, b"").unwrap();
        let mut visited = 0u32;
        while cursor.is_valid() {
            assert!(cursor.current_ref_lazy(&mut loader).is_some());
            visited += 1;
            cursor.next_lazy(&mut loader).unwrap();
        }
        assert_eq!(visited, 2000);
    }

    #[test]
    fn lazy_cursor_range_loads_fewer_pages() {
        let keys = sequential_keys(2000);
        let key_refs: Vec<&[u8]> = keys.iter().map(|k| k.as_slice()).collect();
        let (pages, tree) = build_tree(&key_refs);
        let total_pages = pages.len();

        let mut loader = TrackingLoader::new(pages);
        let mut cursor = Cursor::seek_lazy(&mut loader, tree.root, b"001000").unwrap();
        let mut in_range = 0u32;
        while cursor.is_valid() {
            if let Some(entry) = cursor.current_ref_lazy(&mut loader) {
                // Stop at the first key beyond the ten-key window.
                if entry.key > b"001009".as_slice() {
                    break;
                }
                in_range += 1;
            }
            cursor.next_lazy(&mut loader).unwrap();
        }
        assert_eq!(in_range, 10);

        // A short range scan must not have requested every page of the tree.
        let touched = loader.unique_pages_touched();
        assert!(
            touched < total_pages,
            "lazy touched {} unique pages but tree has {} total",
            touched,
            total_pages,
        );
    }

    #[test]
    fn cursor_large_tree_forward() {
        let keys = sequential_keys(2000);
        let key_refs: Vec<&[u8]> = keys.iter().map(|k| k.as_slice()).collect();
        let (pages, tree) = build_tree(&key_refs);

        let mut cursor = Cursor::first(&pages, tree.root).unwrap();
        let mut visited = 0u32;
        let mut prev_key: Option<Vec<u8>> = None;
        while cursor.is_valid() {
            let entry = cursor.current(&pages).unwrap();
            if let Some(previous) = prev_key.as_ref() {
                assert!(entry.key > *previous, "keys should be in sorted order");
            }
            prev_key = Some(entry.key);
            visited += 1;
            cursor.next(&pages).unwrap();
        }
        assert_eq!(visited, 2000);
    }
}
648}