Skip to main content

citadel_buffer/
cursor.rs

1//! Cursor for B+ tree range iteration.
2//!
3//! Uses a saved root-to-leaf path stack (no sibling pointers needed).
4//! Supports forward and backward iteration across leaf boundaries.
5
6use std::collections::HashMap;
7use std::sync::Arc;
8
9use citadel_core::types::{PageId, PageType, ValueType};
10use citadel_core::{Error, Result};
11use citadel_page::leaf_node::LeafCell;
12use citadel_page::page::Page;
13use citadel_page::{branch_node, leaf_node};
14
15pub trait PageMap {
16    fn get_page(&self, id: &PageId) -> Option<&Page>;
17}
18
19impl PageMap for HashMap<PageId, Page> {
20    fn get_page(&self, id: &PageId) -> Option<&Page> {
21        self.get(id)
22    }
23}
24
25impl PageMap for HashMap<PageId, Arc<Page>> {
26    fn get_page(&self, id: &PageId) -> Option<&Page> {
27        self.get(id).map(|a| a.as_ref())
28    }
29}
30
31pub struct CursorEntry {
32    pub key: Vec<u8>,
33    pub val_type: ValueType,
34    pub value: Vec<u8>,
35}
36
37/// Cursor position within the B+ tree.
38/// Stores the path from root to the current leaf.
39pub struct Cursor {
40    /// Stack of (page_id, child_index) from root to current leaf's parent.
41    path: Vec<(PageId, usize)>,
42    /// Current leaf page ID.
43    leaf: PageId,
44    /// Current cell index within the leaf.
45    cell_idx: u16,
46    /// Whether the cursor is positioned at a valid entry.
47    valid: bool,
48}
49
50impl Cursor {
51    /// Position the cursor at the first key >= `key` (seek).
52    /// If `key` is empty, positions at the first entry in the tree.
53    pub fn seek(pages: &impl PageMap, root: PageId, key: &[u8]) -> Result<Self> {
54        let mut path = Vec::new();
55        let mut current = root;
56
57        // Walk to the leaf
58        loop {
59            let page = pages
60                .get_page(&current)
61                .ok_or(Error::PageOutOfBounds(current))?;
62            match page.page_type() {
63                Some(PageType::Leaf) => break,
64                Some(PageType::Branch) => {
65                    let child_idx = branch_node::search_child_index(page, key);
66                    let child = branch_node::get_child(page, child_idx);
67                    path.push((current, child_idx));
68                    current = child;
69                }
70                _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
71            }
72        }
73
74        // Find the cell index in the leaf
75        let page = pages.get_page(&current).unwrap();
76        let cell_idx = match leaf_node::search(page, key) {
77            Ok(idx) => idx,
78            Err(idx) => idx,
79        };
80
81        let valid = cell_idx < page.num_cells();
82
83        let mut cursor = Self {
84            path,
85            leaf: current,
86            cell_idx,
87            valid,
88        };
89
90        // If we landed past the end of this leaf, advance to next leaf
91        if !valid && page.num_cells() > 0 {
92            cursor.advance_leaf(pages)?;
93        } else if page.num_cells() == 0 {
94            cursor.valid = false;
95        }
96
97        Ok(cursor)
98    }
99
100    /// Position the cursor at the first entry in the tree.
101    pub fn first(pages: &impl PageMap, root: PageId) -> Result<Self> {
102        let mut path = Vec::new();
103        let mut current = root;
104
105        // Walk to the leftmost leaf
106        loop {
107            let page = pages
108                .get_page(&current)
109                .ok_or(Error::PageOutOfBounds(current))?;
110            match page.page_type() {
111                Some(PageType::Leaf) => break,
112                Some(PageType::Branch) => {
113                    let child = branch_node::get_child(page, 0);
114                    path.push((current, 0));
115                    current = child;
116                }
117                _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
118            }
119        }
120
121        let page = pages.get_page(&current).unwrap();
122        let valid = page.num_cells() > 0;
123
124        Ok(Self {
125            path,
126            leaf: current,
127            cell_idx: 0,
128            valid,
129        })
130    }
131
132    /// Position the cursor at the last entry in the tree.
133    pub fn last(pages: &impl PageMap, root: PageId) -> Result<Self> {
134        let mut path = Vec::new();
135        let mut current = root;
136
137        // Walk to the rightmost leaf
138        loop {
139            let page = pages
140                .get_page(&current)
141                .ok_or(Error::PageOutOfBounds(current))?;
142            match page.page_type() {
143                Some(PageType::Leaf) => break,
144                Some(PageType::Branch) => {
145                    let n = page.num_cells() as usize;
146                    let child = page.right_child();
147                    path.push((current, n));
148                    current = child;
149                }
150                _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
151            }
152        }
153
154        let page = pages.get_page(&current).unwrap();
155        let n = page.num_cells();
156        let valid = n > 0;
157        let cell_idx = if valid { n - 1 } else { 0 };
158
159        Ok(Self {
160            path,
161            leaf: current,
162            cell_idx,
163            valid,
164        })
165    }
166
167    /// Whether the cursor is at a valid position.
168    pub fn is_valid(&self) -> bool {
169        self.valid
170    }
171
172    pub fn current(&self, pages: &impl PageMap) -> Option<CursorEntry> {
173        if !self.valid {
174            return None;
175        }
176        let page = pages.get_page(&self.leaf)?;
177        let cell = leaf_node::read_cell(page, self.cell_idx);
178        Some(CursorEntry {
179            key: cell.key.to_vec(),
180            val_type: cell.val_type,
181            value: cell.value.to_vec(),
182        })
183    }
184
185    pub fn current_ref<'a, P: PageMap>(&self, pages: &'a P) -> Option<LeafCell<'a>> {
186        if !self.valid {
187            return None;
188        }
189        let page = pages.get_page(&self.leaf)?;
190        Some(leaf_node::read_cell(page, self.cell_idx))
191    }
192
193    /// Move the cursor to the next entry (forward).
194    pub fn next(&mut self, pages: &impl PageMap) -> Result<bool> {
195        if !self.valid {
196            return Ok(false);
197        }
198
199        let page = pages
200            .get_page(&self.leaf)
201            .ok_or(Error::PageOutOfBounds(self.leaf))?;
202
203        if self.cell_idx + 1 < page.num_cells() {
204            self.cell_idx += 1;
205            return Ok(true);
206        }
207
208        // Need to move to the next leaf
209        self.advance_leaf(pages)
210    }
211
212    /// Move the cursor to the previous entry (backward).
213    pub fn prev(&mut self, pages: &impl PageMap) -> Result<bool> {
214        if !self.valid {
215            return Ok(false);
216        }
217
218        if self.cell_idx > 0 {
219            self.cell_idx -= 1;
220            return Ok(true);
221        }
222
223        // Need to move to the previous leaf
224        self.retreat_leaf(pages)
225    }
226
227    /// Advance to the first cell of the next leaf.
228    fn advance_leaf(&mut self, pages: &impl PageMap) -> Result<bool> {
229        // Walk up the path to find a parent where we can go right
230        while let Some((parent_id, child_idx)) = self.path.pop() {
231            let parent = pages
232                .get_page(&parent_id)
233                .ok_or(Error::PageOutOfBounds(parent_id))?;
234            let n = parent.num_cells() as usize;
235
236            if child_idx < n {
237                // There's a sibling to the right: child_idx + 1
238                let next_child_idx = child_idx + 1;
239                let next_child = branch_node::get_child(parent, next_child_idx);
240                self.path.push((parent_id, next_child_idx));
241
242                // Walk down to the leftmost leaf of this subtree
243                let mut current = next_child;
244                loop {
245                    let page = pages
246                        .get_page(&current)
247                        .ok_or(Error::PageOutOfBounds(current))?;
248                    match page.page_type() {
249                        Some(PageType::Leaf) => {
250                            self.leaf = current;
251                            self.cell_idx = 0;
252                            self.valid = page.num_cells() > 0;
253                            return Ok(self.valid);
254                        }
255                        Some(PageType::Branch) => {
256                            let child = branch_node::get_child(page, 0);
257                            self.path.push((current, 0));
258                            current = child;
259                        }
260                        _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
261                    }
262                }
263            }
264            // child_idx == num_cells (rightmost child) - keep going up
265        }
266
267        // No more siblings - we've exhausted the tree
268        self.valid = false;
269        Ok(false)
270    }
271
272    /// Retreat to the last cell of the previous leaf.
273    fn retreat_leaf(&mut self, pages: &impl PageMap) -> Result<bool> {
274        // Walk up the path to find a parent where we can go left
275        while let Some((parent_id, child_idx)) = self.path.pop() {
276            if child_idx > 0 {
277                // There's a sibling to the left: child_idx - 1
278                let prev_child_idx = child_idx - 1;
279                let parent = pages
280                    .get_page(&parent_id)
281                    .ok_or(Error::PageOutOfBounds(parent_id))?;
282                let prev_child = branch_node::get_child(parent, prev_child_idx);
283                self.path.push((parent_id, prev_child_idx));
284
285                // Walk down to the rightmost leaf of this subtree
286                let mut current = prev_child;
287                loop {
288                    let page = pages
289                        .get_page(&current)
290                        .ok_or(Error::PageOutOfBounds(current))?;
291                    match page.page_type() {
292                        Some(PageType::Leaf) => {
293                            self.leaf = current;
294                            let n = page.num_cells();
295                            if n > 0 {
296                                self.cell_idx = n - 1;
297                                self.valid = true;
298                            } else {
299                                self.valid = false;
300                            }
301                            return Ok(self.valid);
302                        }
303                        Some(PageType::Branch) => {
304                            let n = page.num_cells() as usize;
305                            let child = page.right_child();
306                            self.path.push((current, n));
307                            current = child;
308                        }
309                        _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
310                    }
311                }
312            }
313            // child_idx == 0 (leftmost child) - keep going up
314        }
315
316        // No more siblings - we've exhausted the tree
317        self.valid = false;
318        Ok(false)
319    }
320}
321
322#[cfg(test)]
323mod tests {
324    use super::*;
325    use crate::allocator::PageAllocator;
326    use crate::btree::BTree;
327    use citadel_core::types::TxnId;
328
329    fn build_tree(keys: &[&[u8]]) -> (HashMap<PageId, Page>, BTree) {
330        let mut pages = HashMap::new();
331        let mut alloc = PageAllocator::new(0);
332        let mut tree = BTree::new(&mut pages, &mut alloc, TxnId(1));
333        for k in keys {
334            tree.insert(&mut pages, &mut alloc, TxnId(1), k, ValueType::Inline, k)
335                .unwrap();
336        }
337        (pages, tree)
338    }
339
340    #[test]
341    fn cursor_forward_iteration() {
342        let (pages, tree) = build_tree(&[b"c", b"a", b"e", b"b", b"d"]);
343        let mut cursor = Cursor::first(&pages, tree.root).unwrap();
344
345        let mut collected = Vec::new();
346        while cursor.is_valid() {
347            let entry = cursor.current(&pages).unwrap();
348            collected.push(entry.key.clone());
349            cursor.next(&pages).unwrap();
350        }
351
352        assert_eq!(collected, vec![b"a", b"b", b"c", b"d", b"e"]);
353    }
354
355    #[test]
356    fn cursor_backward_iteration() {
357        let (pages, tree) = build_tree(&[b"c", b"a", b"e", b"b", b"d"]);
358        let mut cursor = Cursor::last(&pages, tree.root).unwrap();
359
360        let mut collected = Vec::new();
361        while cursor.is_valid() {
362            let entry = cursor.current(&pages).unwrap();
363            collected.push(entry.key.clone());
364            cursor.prev(&pages).unwrap();
365        }
366
367        assert_eq!(collected, vec![b"e", b"d", b"c", b"b", b"a"]);
368    }
369
370    #[test]
371    fn cursor_seek() {
372        let (pages, tree) = build_tree(&[b"b", b"d", b"f", b"h"]);
373        // Seek to "c" - should land on "d" (first key >= "c")
374        let cursor = Cursor::seek(&pages, tree.root, b"c").unwrap();
375        assert!(cursor.is_valid());
376        let entry = cursor.current(&pages).unwrap();
377        assert_eq!(entry.key, b"d");
378    }
379
380    #[test]
381    fn cursor_seek_exact() {
382        let (pages, tree) = build_tree(&[b"b", b"d", b"f"]);
383        let cursor = Cursor::seek(&pages, tree.root, b"d").unwrap();
384        assert!(cursor.is_valid());
385        let entry = cursor.current(&pages).unwrap();
386        assert_eq!(entry.key, b"d");
387    }
388
389    #[test]
390    fn cursor_seek_past_end() {
391        let (pages, tree) = build_tree(&[b"a", b"b", b"c"]);
392        let cursor = Cursor::seek(&pages, tree.root, b"z").unwrap();
393        assert!(!cursor.is_valid());
394    }
395
396    #[test]
397    fn cursor_empty_tree() {
398        let mut pages = HashMap::new();
399        let mut alloc = PageAllocator::new(0);
400        let tree = BTree::new(&mut pages, &mut alloc, TxnId(1));
401
402        let cursor = Cursor::first(&pages, tree.root).unwrap();
403        assert!(!cursor.is_valid());
404    }
405
406    #[test]
407    fn cursor_large_tree_forward() {
408        let keys: Vec<Vec<u8>> = (0..2000u32)
409            .map(|i| format!("{i:06}").into_bytes())
410            .collect();
411        let key_refs: Vec<&[u8]> = keys.iter().map(|k| k.as_slice()).collect();
412        let (pages, tree) = build_tree(&key_refs);
413
414        let mut cursor = Cursor::first(&pages, tree.root).unwrap();
415        let mut count = 0u32;
416        let mut prev_key: Option<Vec<u8>> = None;
417        while cursor.is_valid() {
418            let entry = cursor.current(&pages).unwrap();
419            if let Some(ref pk) = prev_key {
420                assert!(entry.key > *pk, "keys should be in sorted order");
421            }
422            prev_key = Some(entry.key);
423            count += 1;
424            cursor.next(&pages).unwrap();
425        }
426        assert_eq!(count, 2000);
427    }
428}