Skip to main content

citadel_buffer/
cursor.rs

1//! Cursor for B+ tree range iteration.
2//!
3//! Uses a saved root-to-leaf path stack (no sibling pointers needed).
4//! Supports forward and backward iteration across leaf boundaries.
5
6use citadel_core::types::{PageId, PageType, ValueType};
7use citadel_core::{Error, Result};
8use citadel_page::page::Page;
9use citadel_page::{branch_node, leaf_node};
10use std::collections::HashMap;
11
12/// A key-value entry returned by the cursor.
13pub struct CursorEntry {
14    pub key: Vec<u8>,
15    pub val_type: ValueType,
16    pub value: Vec<u8>,
17}
18
19/// Cursor position within the B+ tree.
20/// Stores the path from root to the current leaf.
21pub struct Cursor {
22    /// Stack of (page_id, child_index) from root to current leaf's parent.
23    path: Vec<(PageId, usize)>,
24    /// Current leaf page ID.
25    leaf: PageId,
26    /// Current cell index within the leaf.
27    cell_idx: u16,
28    /// Whether the cursor is positioned at a valid entry.
29    valid: bool,
30}
31
32impl Cursor {
33    /// Position the cursor at the first key >= `key` (seek).
34    /// If `key` is empty, positions at the first entry in the tree.
35    pub fn seek(pages: &HashMap<PageId, Page>, root: PageId, key: &[u8]) -> Result<Self> {
36        let mut path = Vec::new();
37        let mut current = root;
38
39        // Walk to the leaf
40        loop {
41            let page = pages.get(&current).ok_or(Error::PageOutOfBounds(current))?;
42            match page.page_type() {
43                Some(PageType::Leaf) => break,
44                Some(PageType::Branch) => {
45                    let child_idx = branch_node::search_child_index(page, key);
46                    let child = branch_node::get_child(page, child_idx);
47                    path.push((current, child_idx));
48                    current = child;
49                }
50                _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
51            }
52        }
53
54        // Find the cell index in the leaf
55        let page = pages.get(&current).unwrap();
56        let cell_idx = match leaf_node::search(page, key) {
57            Ok(idx) => idx,
58            Err(idx) => idx,
59        };
60
61        let valid = cell_idx < page.num_cells();
62
63        let mut cursor = Self {
64            path,
65            leaf: current,
66            cell_idx,
67            valid,
68        };
69
70        // If we landed past the end of this leaf, advance to next leaf
71        if !valid && page.num_cells() > 0 {
72            cursor.advance_leaf(pages)?;
73        } else if page.num_cells() == 0 {
74            cursor.valid = false;
75        }
76
77        Ok(cursor)
78    }
79
80    /// Position the cursor at the first entry in the tree.
81    pub fn first(pages: &HashMap<PageId, Page>, root: PageId) -> Result<Self> {
82        let mut path = Vec::new();
83        let mut current = root;
84
85        // Walk to the leftmost leaf
86        loop {
87            let page = pages.get(&current).ok_or(Error::PageOutOfBounds(current))?;
88            match page.page_type() {
89                Some(PageType::Leaf) => break,
90                Some(PageType::Branch) => {
91                    let child = branch_node::get_child(page, 0);
92                    path.push((current, 0));
93                    current = child;
94                }
95                _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
96            }
97        }
98
99        let page = pages.get(&current).unwrap();
100        let valid = page.num_cells() > 0;
101
102        Ok(Self {
103            path,
104            leaf: current,
105            cell_idx: 0,
106            valid,
107        })
108    }
109
110    /// Position the cursor at the last entry in the tree.
111    pub fn last(pages: &HashMap<PageId, Page>, root: PageId) -> Result<Self> {
112        let mut path = Vec::new();
113        let mut current = root;
114
115        // Walk to the rightmost leaf
116        loop {
117            let page = pages.get(&current).ok_or(Error::PageOutOfBounds(current))?;
118            match page.page_type() {
119                Some(PageType::Leaf) => break,
120                Some(PageType::Branch) => {
121                    let n = page.num_cells() as usize;
122                    let child = page.right_child();
123                    path.push((current, n));
124                    current = child;
125                }
126                _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
127            }
128        }
129
130        let page = pages.get(&current).unwrap();
131        let n = page.num_cells();
132        let valid = n > 0;
133        let cell_idx = if valid { n - 1 } else { 0 };
134
135        Ok(Self {
136            path,
137            leaf: current,
138            cell_idx,
139            valid,
140        })
141    }
142
143    /// Whether the cursor is at a valid position.
144    pub fn is_valid(&self) -> bool {
145        self.valid
146    }
147
148    /// Get the current entry without advancing.
149    pub fn current(&self, pages: &HashMap<PageId, Page>) -> Option<CursorEntry> {
150        if !self.valid {
151            return None;
152        }
153        let page = pages.get(&self.leaf)?;
154        let cell = leaf_node::read_cell(page, self.cell_idx);
155        Some(CursorEntry {
156            key: cell.key.to_vec(),
157            val_type: cell.val_type,
158            value: cell.value.to_vec(),
159        })
160    }
161
162    /// Move the cursor to the next entry (forward).
163    pub fn next(&mut self, pages: &HashMap<PageId, Page>) -> Result<bool> {
164        if !self.valid {
165            return Ok(false);
166        }
167
168        let page = pages
169            .get(&self.leaf)
170            .ok_or(Error::PageOutOfBounds(self.leaf))?;
171
172        if self.cell_idx + 1 < page.num_cells() {
173            self.cell_idx += 1;
174            return Ok(true);
175        }
176
177        // Need to move to the next leaf
178        self.advance_leaf(pages)
179    }
180
181    /// Move the cursor to the previous entry (backward).
182    pub fn prev(&mut self, pages: &HashMap<PageId, Page>) -> Result<bool> {
183        if !self.valid {
184            return Ok(false);
185        }
186
187        if self.cell_idx > 0 {
188            self.cell_idx -= 1;
189            return Ok(true);
190        }
191
192        // Need to move to the previous leaf
193        self.retreat_leaf(pages)
194    }
195
196    /// Advance to the first cell of the next leaf.
197    fn advance_leaf(&mut self, pages: &HashMap<PageId, Page>) -> Result<bool> {
198        // Walk up the path to find a parent where we can go right
199        while let Some((parent_id, child_idx)) = self.path.pop() {
200            let parent = pages
201                .get(&parent_id)
202                .ok_or(Error::PageOutOfBounds(parent_id))?;
203            let n = parent.num_cells() as usize;
204
205            if child_idx < n {
206                // There's a sibling to the right: child_idx + 1
207                let next_child_idx = child_idx + 1;
208                let next_child = branch_node::get_child(parent, next_child_idx);
209                self.path.push((parent_id, next_child_idx));
210
211                // Walk down to the leftmost leaf of this subtree
212                let mut current = next_child;
213                loop {
214                    let page = pages.get(&current).ok_or(Error::PageOutOfBounds(current))?;
215                    match page.page_type() {
216                        Some(PageType::Leaf) => {
217                            self.leaf = current;
218                            self.cell_idx = 0;
219                            self.valid = page.num_cells() > 0;
220                            return Ok(self.valid);
221                        }
222                        Some(PageType::Branch) => {
223                            let child = branch_node::get_child(page, 0);
224                            self.path.push((current, 0));
225                            current = child;
226                        }
227                        _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
228                    }
229                }
230            }
231            // child_idx == num_cells (rightmost child) — keep going up
232        }
233
234        // No more siblings — we've exhausted the tree
235        self.valid = false;
236        Ok(false)
237    }
238
239    /// Retreat to the last cell of the previous leaf.
240    fn retreat_leaf(&mut self, pages: &HashMap<PageId, Page>) -> Result<bool> {
241        // Walk up the path to find a parent where we can go left
242        while let Some((parent_id, child_idx)) = self.path.pop() {
243            if child_idx > 0 {
244                // There's a sibling to the left: child_idx - 1
245                let prev_child_idx = child_idx - 1;
246                let parent = pages
247                    .get(&parent_id)
248                    .ok_or(Error::PageOutOfBounds(parent_id))?;
249                let prev_child = branch_node::get_child(parent, prev_child_idx);
250                self.path.push((parent_id, prev_child_idx));
251
252                // Walk down to the rightmost leaf of this subtree
253                let mut current = prev_child;
254                loop {
255                    let page = pages.get(&current).ok_or(Error::PageOutOfBounds(current))?;
256                    match page.page_type() {
257                        Some(PageType::Leaf) => {
258                            self.leaf = current;
259                            let n = page.num_cells();
260                            if n > 0 {
261                                self.cell_idx = n - 1;
262                                self.valid = true;
263                            } else {
264                                self.valid = false;
265                            }
266                            return Ok(self.valid);
267                        }
268                        Some(PageType::Branch) => {
269                            let n = page.num_cells() as usize;
270                            let child = page.right_child();
271                            self.path.push((current, n));
272                            current = child;
273                        }
274                        _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
275                    }
276                }
277            }
278            // child_idx == 0 (leftmost child) — keep going up
279        }
280
281        // No more siblings — we've exhausted the tree
282        self.valid = false;
283        Ok(false)
284    }
285}
286
287#[cfg(test)]
288mod tests {
289    use super::*;
290    use crate::allocator::PageAllocator;
291    use crate::btree::BTree;
292    use citadel_core::types::TxnId;
293
294    fn build_tree(keys: &[&[u8]]) -> (HashMap<PageId, Page>, BTree) {
295        let mut pages = HashMap::new();
296        let mut alloc = PageAllocator::new(0);
297        let mut tree = BTree::new(&mut pages, &mut alloc, TxnId(1));
298        for k in keys {
299            tree.insert(&mut pages, &mut alloc, TxnId(1), k, ValueType::Inline, k)
300                .unwrap();
301        }
302        (pages, tree)
303    }
304
305    #[test]
306    fn cursor_forward_iteration() {
307        let (pages, tree) = build_tree(&[b"c", b"a", b"e", b"b", b"d"]);
308        let mut cursor = Cursor::first(&pages, tree.root).unwrap();
309
310        let mut collected = Vec::new();
311        while cursor.is_valid() {
312            let entry = cursor.current(&pages).unwrap();
313            collected.push(entry.key.clone());
314            cursor.next(&pages).unwrap();
315        }
316
317        assert_eq!(collected, vec![b"a", b"b", b"c", b"d", b"e"]);
318    }
319
320    #[test]
321    fn cursor_backward_iteration() {
322        let (pages, tree) = build_tree(&[b"c", b"a", b"e", b"b", b"d"]);
323        let mut cursor = Cursor::last(&pages, tree.root).unwrap();
324
325        let mut collected = Vec::new();
326        while cursor.is_valid() {
327            let entry = cursor.current(&pages).unwrap();
328            collected.push(entry.key.clone());
329            cursor.prev(&pages).unwrap();
330        }
331
332        assert_eq!(collected, vec![b"e", b"d", b"c", b"b", b"a"]);
333    }
334
335    #[test]
336    fn cursor_seek() {
337        let (pages, tree) = build_tree(&[b"b", b"d", b"f", b"h"]);
338        // Seek to "c" — should land on "d" (first key >= "c")
339        let cursor = Cursor::seek(&pages, tree.root, b"c").unwrap();
340        assert!(cursor.is_valid());
341        let entry = cursor.current(&pages).unwrap();
342        assert_eq!(entry.key, b"d");
343    }
344
345    #[test]
346    fn cursor_seek_exact() {
347        let (pages, tree) = build_tree(&[b"b", b"d", b"f"]);
348        let cursor = Cursor::seek(&pages, tree.root, b"d").unwrap();
349        assert!(cursor.is_valid());
350        let entry = cursor.current(&pages).unwrap();
351        assert_eq!(entry.key, b"d");
352    }
353
354    #[test]
355    fn cursor_seek_past_end() {
356        let (pages, tree) = build_tree(&[b"a", b"b", b"c"]);
357        let cursor = Cursor::seek(&pages, tree.root, b"z").unwrap();
358        assert!(!cursor.is_valid());
359    }
360
361    #[test]
362    fn cursor_empty_tree() {
363        let mut pages = HashMap::new();
364        let mut alloc = PageAllocator::new(0);
365        let tree = BTree::new(&mut pages, &mut alloc, TxnId(1));
366
367        let cursor = Cursor::first(&pages, tree.root).unwrap();
368        assert!(!cursor.is_valid());
369    }
370
371    #[test]
372    fn cursor_large_tree_forward() {
373        let keys: Vec<Vec<u8>> = (0..2000u32)
374            .map(|i| format!("{i:06}").into_bytes())
375            .collect();
376        let key_refs: Vec<&[u8]> = keys.iter().map(|k| k.as_slice()).collect();
377        let (pages, tree) = build_tree(&key_refs);
378
379        let mut cursor = Cursor::first(&pages, tree.root).unwrap();
380        let mut count = 0u32;
381        let mut prev_key: Option<Vec<u8>> = None;
382        while cursor.is_valid() {
383            let entry = cursor.current(&pages).unwrap();
384            if let Some(ref pk) = prev_key {
385                assert!(entry.key > *pk, "keys should be in sorted order");
386            }
387            prev_key = Some(entry.key);
388            count += 1;
389            cursor.next(&pages).unwrap();
390        }
391        assert_eq!(count, 2000);
392    }
393}