Skip to main content

citadel_buffer/
cursor.rs

1//! B+ tree cursor for range iteration using a root-to-leaf path stack.
2
3use std::collections::HashMap;
4use std::hash::BuildHasher;
5use std::sync::Arc;
6
7use citadel_core::types::{PageId, PageType, ValueType};
8use citadel_core::{Error, Result};
9use citadel_page::leaf_node::LeafCell;
10use citadel_page::page::Page;
11use citadel_page::{branch_node, leaf_node};
12
13pub trait PageMap {
14    fn get_page(&self, id: &PageId) -> Option<&Page>;
15}
16
17/// Extends `PageMap` with on-demand page loading for lazy cursor traversal.
18pub trait PageLoader: PageMap {
19    fn ensure_loaded(&mut self, id: PageId) -> Result<()>;
20}
21
22impl<S: BuildHasher> PageMap for HashMap<PageId, Page, S> {
23    fn get_page(&self, id: &PageId) -> Option<&Page> {
24        self.get(id)
25    }
26}
27
28impl<S: BuildHasher> PageMap for HashMap<PageId, Arc<Page>, S> {
29    fn get_page(&self, id: &PageId) -> Option<&Page> {
30        self.get(id).map(|a| a.as_ref())
31    }
32}
33
34pub struct CursorEntry {
35    pub key: Vec<u8>,
36    pub val_type: ValueType,
37    pub value: Vec<u8>,
38}
39
40/// B+ tree cursor position with root-to-leaf path.
41pub struct Cursor {
42    /// Stack of (page_id, child_index) from root to current leaf's parent.
43    path: Vec<(PageId, usize)>,
44    /// Current leaf page ID.
45    leaf: PageId,
46    /// Current cell index within the leaf.
47    cell_idx: u16,
48    /// Whether the cursor is positioned at a valid entry.
49    valid: bool,
50}
51
52impl Cursor {
53    /// Seek to first key >= `key`. Empty key = first entry.
54    pub fn seek(pages: &impl PageMap, root: PageId, key: &[u8]) -> Result<Self> {
55        let mut path = Vec::new();
56        let mut current = root;
57
58        loop {
59            let page = pages
60                .get_page(&current)
61                .ok_or(Error::PageOutOfBounds(current))?;
62            match page.page_type() {
63                Some(PageType::Leaf) => break,
64                Some(PageType::Branch) => {
65                    let child_idx = branch_node::search_child_index(page, key);
66                    let child = branch_node::get_child(page, child_idx);
67                    path.push((current, child_idx));
68                    current = child;
69                }
70                _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
71            }
72        }
73
74        let page = pages.get_page(&current).unwrap();
75        let cell_idx = match leaf_node::search(page, key) {
76            Ok(idx) => idx,
77            Err(idx) => idx,
78        };
79
80        let valid = cell_idx < page.num_cells();
81
82        let mut cursor = Self {
83            path,
84            leaf: current,
85            cell_idx,
86            valid,
87        };
88
89        // Landed past the end of this leaf: advance to next.
90        if !valid && page.num_cells() > 0 {
91            cursor.advance_leaf(pages)?;
92        } else if page.num_cells() == 0 {
93            cursor.valid = false;
94        }
95
96        Ok(cursor)
97    }
98
99    /// Position the cursor at the first entry in the tree.
100    pub fn first(pages: &impl PageMap, root: PageId) -> Result<Self> {
101        let mut path = Vec::new();
102        let mut current = root;
103
104        loop {
105            let page = pages
106                .get_page(&current)
107                .ok_or(Error::PageOutOfBounds(current))?;
108            match page.page_type() {
109                Some(PageType::Leaf) => break,
110                Some(PageType::Branch) => {
111                    let child = branch_node::get_child(page, 0);
112                    path.push((current, 0));
113                    current = child;
114                }
115                _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
116            }
117        }
118
119        let page = pages.get_page(&current).unwrap();
120        let valid = page.num_cells() > 0;
121
122        Ok(Self {
123            path,
124            leaf: current,
125            cell_idx: 0,
126            valid,
127        })
128    }
129
130    /// Position the cursor at the last entry in the tree.
131    pub fn last(pages: &impl PageMap, root: PageId) -> Result<Self> {
132        let mut path = Vec::new();
133        let mut current = root;
134
135        loop {
136            let page = pages
137                .get_page(&current)
138                .ok_or(Error::PageOutOfBounds(current))?;
139            match page.page_type() {
140                Some(PageType::Leaf) => break,
141                Some(PageType::Branch) => {
142                    let n = page.num_cells() as usize;
143                    let child = page.right_child();
144                    path.push((current, n));
145                    current = child;
146                }
147                _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
148            }
149        }
150
151        let page = pages.get_page(&current).unwrap();
152        let n = page.num_cells();
153        let valid = n > 0;
154        let cell_idx = if valid { n - 1 } else { 0 };
155
156        Ok(Self {
157            path,
158            leaf: current,
159            cell_idx,
160            valid,
161        })
162    }
163
164    /// Whether the cursor is at a valid position.
165    pub fn is_valid(&self) -> bool {
166        self.valid
167    }
168
169    /// Current leaf page ID.
170    pub fn leaf_page_id(&self) -> PageId {
171        self.leaf
172    }
173
174    /// Current cell index within the leaf.
175    pub fn cell_index(&self) -> u16 {
176        self.cell_idx
177    }
178
179    /// Update the current leaf page ID after a CoW operation.
180    pub fn set_leaf_page_id(&mut self, id: PageId) {
181        self.leaf = id;
182    }
183
184    pub fn current(&self, pages: &impl PageMap) -> Option<CursorEntry> {
185        if !self.valid {
186            return None;
187        }
188        let page = pages.get_page(&self.leaf)?;
189        let cell = leaf_node::read_cell(page, self.cell_idx);
190        Some(CursorEntry {
191            key: cell.key.to_vec(),
192            val_type: cell.val_type,
193            value: cell.value.to_vec(),
194        })
195    }
196
197    pub fn current_ref<'a, P: PageMap>(&self, pages: &'a P) -> Option<LeafCell<'a>> {
198        if !self.valid {
199            return None;
200        }
201        let page = pages.get_page(&self.leaf)?;
202        Some(leaf_node::read_cell(page, self.cell_idx))
203    }
204
205    /// Move the cursor to the next entry (forward).
206    pub fn next(&mut self, pages: &impl PageMap) -> Result<bool> {
207        if !self.valid {
208            return Ok(false);
209        }
210
211        let page = pages
212            .get_page(&self.leaf)
213            .ok_or(Error::PageOutOfBounds(self.leaf))?;
214
215        if self.cell_idx + 1 < page.num_cells() {
216            self.cell_idx += 1;
217            return Ok(true);
218        }
219
220        self.advance_leaf(pages)
221    }
222
223    /// Move the cursor to the previous entry (backward).
224    pub fn prev(&mut self, pages: &impl PageMap) -> Result<bool> {
225        if !self.valid {
226            return Ok(false);
227        }
228
229        if self.cell_idx > 0 {
230            self.cell_idx -= 1;
231            return Ok(true);
232        }
233
234        self.retreat_leaf(pages)
235    }
236
237    /// Advance to the first cell of the next leaf.
238    fn advance_leaf(&mut self, pages: &impl PageMap) -> Result<bool> {
239        while let Some((parent_id, child_idx)) = self.path.pop() {
240            let parent = pages
241                .get_page(&parent_id)
242                .ok_or(Error::PageOutOfBounds(parent_id))?;
243            let n = parent.num_cells() as usize;
244
245            if child_idx < n {
246                let next_child_idx = child_idx + 1;
247                let next_child = branch_node::get_child(parent, next_child_idx);
248                self.path.push((parent_id, next_child_idx));
249
250                let mut current = next_child;
251                loop {
252                    let page = pages
253                        .get_page(&current)
254                        .ok_or(Error::PageOutOfBounds(current))?;
255                    match page.page_type() {
256                        Some(PageType::Leaf) => {
257                            self.leaf = current;
258                            self.cell_idx = 0;
259                            self.valid = page.num_cells() > 0;
260                            return Ok(self.valid);
261                        }
262                        Some(PageType::Branch) => {
263                            let child = branch_node::get_child(page, 0);
264                            self.path.push((current, 0));
265                            current = child;
266                        }
267                        _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
268                    }
269                }
270            }
271            // child_idx == num_cells (rightmost child) - keep going up
272        }
273
274        // No more siblings - we've exhausted the tree
275        self.valid = false;
276        Ok(false)
277    }
278
279    /// Seek with lazy page loading — only loads the root-to-leaf path.
280    pub fn seek_lazy(pages: &mut impl PageLoader, root: PageId, key: &[u8]) -> Result<Self> {
281        let mut path = Vec::new();
282        let mut current = root;
283
284        loop {
285            pages.ensure_loaded(current)?;
286            let page = pages
287                .get_page(&current)
288                .ok_or(Error::PageOutOfBounds(current))?;
289            match page.page_type() {
290                Some(PageType::Leaf) => break,
291                Some(PageType::Branch) => {
292                    let child_idx = branch_node::search_child_index(page, key);
293                    let child = branch_node::get_child(page, child_idx);
294                    path.push((current, child_idx));
295                    current = child;
296                }
297                _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
298            }
299        }
300
301        let page = pages.get_page(&current).unwrap();
302        let cell_idx = match leaf_node::search(page, key) {
303            Ok(idx) => idx,
304            Err(idx) => idx,
305        };
306
307        let valid = cell_idx < page.num_cells();
308
309        let mut cursor = Self {
310            path,
311            leaf: current,
312            cell_idx,
313            valid,
314        };
315
316        if !valid && page.num_cells() > 0 {
317            cursor.advance_leaf_lazy(pages)?;
318        } else if page.num_cells() == 0 {
319            cursor.valid = false;
320        }
321
322        Ok(cursor)
323    }
324
325    /// Read the current entry, loading the leaf page if needed.
326    pub fn current_ref_lazy<'a, P: PageLoader + ?Sized>(
327        &self,
328        pages: &'a mut P,
329    ) -> Option<LeafCell<'a>> {
330        if !self.valid {
331            return None;
332        }
333        pages.ensure_loaded(self.leaf).ok()?;
334        let page = pages.get_page(&self.leaf)?;
335        Some(leaf_node::read_cell(page, self.cell_idx))
336    }
337
338    /// Advance to the next entry, loading pages on demand.
339    pub fn next_lazy<P: PageLoader + ?Sized>(&mut self, pages: &mut P) -> Result<bool> {
340        if !self.valid {
341            return Ok(false);
342        }
343
344        pages.ensure_loaded(self.leaf)?;
345        let page = pages
346            .get_page(&self.leaf)
347            .ok_or(Error::PageOutOfBounds(self.leaf))?;
348
349        if self.cell_idx + 1 < page.num_cells() {
350            self.cell_idx += 1;
351            return Ok(true);
352        }
353
354        self.advance_leaf_lazy(pages)
355    }
356
357    /// Advance to the next leaf, loading child pages on demand.
358    fn advance_leaf_lazy<P: PageLoader + ?Sized>(&mut self, pages: &mut P) -> Result<bool> {
359        while let Some((parent_id, child_idx)) = self.path.pop() {
360            let parent = pages
361                .get_page(&parent_id)
362                .ok_or(Error::PageOutOfBounds(parent_id))?;
363            let n = parent.num_cells() as usize;
364
365            if child_idx < n {
366                let next_child_idx = child_idx + 1;
367                let next_child = branch_node::get_child(parent, next_child_idx);
368                self.path.push((parent_id, next_child_idx));
369
370                let mut current = next_child;
371                loop {
372                    pages.ensure_loaded(current)?;
373                    let page = pages
374                        .get_page(&current)
375                        .ok_or(Error::PageOutOfBounds(current))?;
376                    match page.page_type() {
377                        Some(PageType::Leaf) => {
378                            self.leaf = current;
379                            self.cell_idx = 0;
380                            self.valid = page.num_cells() > 0;
381                            return Ok(self.valid);
382                        }
383                        Some(PageType::Branch) => {
384                            let child = branch_node::get_child(page, 0);
385                            self.path.push((current, 0));
386                            current = child;
387                        }
388                        _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
389                    }
390                }
391            }
392        }
393
394        self.valid = false;
395        Ok(false)
396    }
397
398    /// Retreat to the last cell of the previous leaf.
399    fn retreat_leaf(&mut self, pages: &impl PageMap) -> Result<bool> {
400        while let Some((parent_id, child_idx)) = self.path.pop() {
401            if child_idx > 0 {
402                let prev_child_idx = child_idx - 1;
403                let parent = pages
404                    .get_page(&parent_id)
405                    .ok_or(Error::PageOutOfBounds(parent_id))?;
406                let prev_child = branch_node::get_child(parent, prev_child_idx);
407                self.path.push((parent_id, prev_child_idx));
408
409                let mut current = prev_child;
410                loop {
411                    let page = pages
412                        .get_page(&current)
413                        .ok_or(Error::PageOutOfBounds(current))?;
414                    match page.page_type() {
415                        Some(PageType::Leaf) => {
416                            self.leaf = current;
417                            let n = page.num_cells();
418                            if n > 0 {
419                                self.cell_idx = n - 1;
420                                self.valid = true;
421                            } else {
422                                self.valid = false;
423                            }
424                            return Ok(self.valid);
425                        }
426                        Some(PageType::Branch) => {
427                            let n = page.num_cells() as usize;
428                            let child = page.right_child();
429                            self.path.push((current, n));
430                            current = child;
431                        }
432                        _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
433                    }
434                }
435            }
436            // child_idx == 0 (leftmost child) - keep going up
437        }
438
439        // No more siblings - we've exhausted the tree
440        self.valid = false;
441        Ok(false)
442    }
443}
444
445#[cfg(test)]
446mod tests {
447    use super::*;
448    use crate::allocator::PageAllocator;
449    use crate::btree::BTree;
450    use citadel_core::types::TxnId;
451
452    fn build_tree(keys: &[&[u8]]) -> (rustc_hash::FxHashMap<PageId, Page>, BTree) {
453        let mut pages = rustc_hash::FxHashMap::default();
454        let mut alloc = PageAllocator::new(0);
455        let mut tree = BTree::new(&mut pages, &mut alloc, TxnId(1));
456        for k in keys {
457            tree.insert(&mut pages, &mut alloc, TxnId(1), k, ValueType::Inline, k)
458                .unwrap();
459        }
460        (pages, tree)
461    }
462
463    #[test]
464    fn cursor_forward_iteration() {
465        let (pages, tree) = build_tree(&[b"c", b"a", b"e", b"b", b"d"]);
466        let mut cursor = Cursor::first(&pages, tree.root).unwrap();
467
468        let mut collected = Vec::new();
469        while cursor.is_valid() {
470            let entry = cursor.current(&pages).unwrap();
471            collected.push(entry.key.clone());
472            cursor.next(&pages).unwrap();
473        }
474
475        assert_eq!(collected, vec![b"a", b"b", b"c", b"d", b"e"]);
476    }
477
478    #[test]
479    fn cursor_backward_iteration() {
480        let (pages, tree) = build_tree(&[b"c", b"a", b"e", b"b", b"d"]);
481        let mut cursor = Cursor::last(&pages, tree.root).unwrap();
482
483        let mut collected = Vec::new();
484        while cursor.is_valid() {
485            let entry = cursor.current(&pages).unwrap();
486            collected.push(entry.key.clone());
487            cursor.prev(&pages).unwrap();
488        }
489
490        assert_eq!(collected, vec![b"e", b"d", b"c", b"b", b"a"]);
491    }
492
493    #[test]
494    fn cursor_seek() {
495        let (pages, tree) = build_tree(&[b"b", b"d", b"f", b"h"]);
496        // Seek to "c" - should land on "d" (first key >= "c")
497        let cursor = Cursor::seek(&pages, tree.root, b"c").unwrap();
498        assert!(cursor.is_valid());
499        let entry = cursor.current(&pages).unwrap();
500        assert_eq!(entry.key, b"d");
501    }
502
503    #[test]
504    fn cursor_seek_exact() {
505        let (pages, tree) = build_tree(&[b"b", b"d", b"f"]);
506        let cursor = Cursor::seek(&pages, tree.root, b"d").unwrap();
507        assert!(cursor.is_valid());
508        let entry = cursor.current(&pages).unwrap();
509        assert_eq!(entry.key, b"d");
510    }
511
512    #[test]
513    fn cursor_seek_past_end() {
514        let (pages, tree) = build_tree(&[b"a", b"b", b"c"]);
515        let cursor = Cursor::seek(&pages, tree.root, b"z").unwrap();
516        assert!(!cursor.is_valid());
517    }
518
519    #[test]
520    fn cursor_empty_tree() {
521        let mut pages = rustc_hash::FxHashMap::default();
522        let mut alloc = PageAllocator::new(0);
523        let tree = BTree::new(&mut pages, &mut alloc, TxnId(1));
524
525        let cursor = Cursor::first(&pages, tree.root).unwrap();
526        assert!(!cursor.is_valid());
527    }
528
529    /// PageLoader backed by a pre-built FxHashMap — tracks unique pages touched.
530    struct TrackingLoader {
531        pages: rustc_hash::FxHashMap<PageId, Page>,
532        touched: std::collections::HashSet<PageId>,
533    }
534
535    impl TrackingLoader {
536        fn new(pages: rustc_hash::FxHashMap<PageId, Page>) -> Self {
537            Self {
538                pages,
539                touched: std::collections::HashSet::new(),
540            }
541        }
542        fn unique_pages_touched(&self) -> usize {
543            self.touched.len()
544        }
545    }
546
547    impl PageMap for TrackingLoader {
548        fn get_page(&self, id: &PageId) -> Option<&Page> {
549            self.pages.get(id)
550        }
551    }
552
553    impl PageLoader for TrackingLoader {
554        fn ensure_loaded(&mut self, id: PageId) -> citadel_core::Result<()> {
555            if self.pages.contains_key(&id) {
556                self.touched.insert(id);
557                Ok(())
558            } else {
559                Err(citadel_core::Error::PageOutOfBounds(id))
560            }
561        }
562    }
563
564    #[test]
565    fn lazy_cursor_forward() {
566        let keys: Vec<Vec<u8>> = (0..2000u32)
567            .map(|i| format!("{i:06}").into_bytes())
568            .collect();
569        let key_refs: Vec<&[u8]> = keys.iter().map(|k| k.as_slice()).collect();
570        let (pages, tree) = build_tree(&key_refs);
571
572        let mut loader = TrackingLoader::new(pages);
573        let mut cursor = Cursor::seek_lazy(&mut loader, tree.root, b"").unwrap();
574        let mut count = 0u32;
575        while cursor.is_valid() {
576            let entry = cursor.current_ref_lazy(&mut loader);
577            assert!(entry.is_some());
578            count += 1;
579            cursor.next_lazy(&mut loader).unwrap();
580        }
581        assert_eq!(count, 2000);
582    }
583
584    #[test]
585    fn lazy_cursor_range_loads_fewer_pages() {
586        let keys: Vec<Vec<u8>> = (0..2000u32)
587            .map(|i| format!("{i:06}").into_bytes())
588            .collect();
589        let key_refs: Vec<&[u8]> = keys.iter().map(|k| k.as_slice()).collect();
590        let (pages, tree) = build_tree(&key_refs);
591        let total_pages = pages.len();
592
593        let mut loader = TrackingLoader::new(pages);
594        let mut cursor = Cursor::seek_lazy(&mut loader, tree.root, b"001000").unwrap();
595        let mut count = 0u32;
596        while cursor.is_valid() {
597            if let Some(entry) = cursor.current_ref_lazy(&mut loader) {
598                if entry.key > b"001009".as_slice() {
599                    break;
600                }
601                count += 1;
602            }
603            cursor.next_lazy(&mut loader).unwrap();
604        }
605        assert_eq!(count, 10);
606        // Lazy loading should touch far fewer unique pages than the full tree
607        let touched = loader.unique_pages_touched();
608        assert!(
609            touched < total_pages,
610            "lazy touched {} unique pages but tree has {} total",
611            touched,
612            total_pages,
613        );
614    }
615
616    #[test]
617    fn cursor_large_tree_forward() {
618        let keys: Vec<Vec<u8>> = (0..2000u32)
619            .map(|i| format!("{i:06}").into_bytes())
620            .collect();
621        let key_refs: Vec<&[u8]> = keys.iter().map(|k| k.as_slice()).collect();
622        let (pages, tree) = build_tree(&key_refs);
623
624        let mut cursor = Cursor::first(&pages, tree.root).unwrap();
625        let mut count = 0u32;
626        let mut prev_key: Option<Vec<u8>> = None;
627        while cursor.is_valid() {
628            let entry = cursor.current(&pages).unwrap();
629            if let Some(ref pk) = prev_key {
630                assert!(entry.key > *pk, "keys should be in sorted order");
631            }
632            prev_key = Some(entry.key);
633            count += 1;
634            cursor.next(&pages).unwrap();
635        }
636        assert_eq!(count, 2000);
637    }
638}