Skip to main content

citadeldb_buffer/
cursor.rs

1//! Cursor for B+ tree range iteration.
2//!
3//! Uses a saved root-to-leaf path stack (no sibling pointers needed).
4//! Supports forward and backward iteration across leaf boundaries.
5
6use std::collections::HashMap;
7use citadel_core::{Error, Result};
8use citadel_core::types::{PageId, PageType, ValueType};
9use citadel_page::page::Page;
10use citadel_page::{branch_node, leaf_node};
11
12/// A key-value entry returned by the cursor.
13pub struct CursorEntry {
14    pub key: Vec<u8>,
15    pub val_type: ValueType,
16    pub value: Vec<u8>,
17}
18
19/// Cursor position within the B+ tree.
20/// Stores the path from root to the current leaf.
21pub struct Cursor {
22    /// Stack of (page_id, child_index) from root to current leaf's parent.
23    path: Vec<(PageId, usize)>,
24    /// Current leaf page ID.
25    leaf: PageId,
26    /// Current cell index within the leaf.
27    cell_idx: u16,
28    /// Whether the cursor is positioned at a valid entry.
29    valid: bool,
30}
31
32impl Cursor {
33    /// Position the cursor at the first key >= `key` (seek).
34    /// If `key` is empty, positions at the first entry in the tree.
35    pub fn seek(
36        pages: &HashMap<PageId, Page>,
37        root: PageId,
38        key: &[u8],
39    ) -> Result<Self> {
40        let mut path = Vec::new();
41        let mut current = root;
42
43        // Walk to the leaf
44        loop {
45            let page = pages.get(&current)
46                .ok_or(Error::PageOutOfBounds(current))?;
47            match page.page_type() {
48                Some(PageType::Leaf) => break,
49                Some(PageType::Branch) => {
50                    let child_idx = branch_node::search_child_index(page, key);
51                    let child = branch_node::get_child(page, child_idx);
52                    path.push((current, child_idx));
53                    current = child;
54                }
55                _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
56            }
57        }
58
59        // Find the cell index in the leaf
60        let page = pages.get(&current).unwrap();
61        let cell_idx = match leaf_node::search(page, key) {
62            Ok(idx) => idx,
63            Err(idx) => idx,
64        };
65
66        let valid = cell_idx < page.num_cells();
67
68        let mut cursor = Self { path, leaf: current, cell_idx, valid };
69
70        // If we landed past the end of this leaf, advance to next leaf
71        if !valid && page.num_cells() > 0 {
72            cursor.advance_leaf(pages)?;
73        } else if page.num_cells() == 0 {
74            cursor.valid = false;
75        }
76
77        Ok(cursor)
78    }
79
80    /// Position the cursor at the first entry in the tree.
81    pub fn first(
82        pages: &HashMap<PageId, Page>,
83        root: PageId,
84    ) -> Result<Self> {
85        let mut path = Vec::new();
86        let mut current = root;
87
88        // Walk to the leftmost leaf
89        loop {
90            let page = pages.get(&current)
91                .ok_or(Error::PageOutOfBounds(current))?;
92            match page.page_type() {
93                Some(PageType::Leaf) => break,
94                Some(PageType::Branch) => {
95                    let child = branch_node::get_child(page, 0);
96                    path.push((current, 0));
97                    current = child;
98                }
99                _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
100            }
101        }
102
103        let page = pages.get(&current).unwrap();
104        let valid = page.num_cells() > 0;
105
106        Ok(Self { path, leaf: current, cell_idx: 0, valid })
107    }
108
109    /// Position the cursor at the last entry in the tree.
110    pub fn last(
111        pages: &HashMap<PageId, Page>,
112        root: PageId,
113    ) -> Result<Self> {
114        let mut path = Vec::new();
115        let mut current = root;
116
117        // Walk to the rightmost leaf
118        loop {
119            let page = pages.get(&current)
120                .ok_or(Error::PageOutOfBounds(current))?;
121            match page.page_type() {
122                Some(PageType::Leaf) => break,
123                Some(PageType::Branch) => {
124                    let n = page.num_cells() as usize;
125                    let child = page.right_child();
126                    path.push((current, n));
127                    current = child;
128                }
129                _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
130            }
131        }
132
133        let page = pages.get(&current).unwrap();
134        let n = page.num_cells();
135        let valid = n > 0;
136        let cell_idx = if valid { n - 1 } else { 0 };
137
138        Ok(Self { path, leaf: current, cell_idx, valid })
139    }
140
141    /// Whether the cursor is at a valid position.
142    pub fn is_valid(&self) -> bool {
143        self.valid
144    }
145
146    /// Get the current entry without advancing.
147    pub fn current(&self, pages: &HashMap<PageId, Page>) -> Option<CursorEntry> {
148        if !self.valid {
149            return None;
150        }
151        let page = pages.get(&self.leaf)?;
152        let cell = leaf_node::read_cell(page, self.cell_idx);
153        Some(CursorEntry {
154            key: cell.key.to_vec(),
155            val_type: cell.val_type,
156            value: cell.value.to_vec(),
157        })
158    }
159
160    /// Move the cursor to the next entry (forward).
161    pub fn next(&mut self, pages: &HashMap<PageId, Page>) -> Result<bool> {
162        if !self.valid {
163            return Ok(false);
164        }
165
166        let page = pages.get(&self.leaf)
167            .ok_or(Error::PageOutOfBounds(self.leaf))?;
168
169        if self.cell_idx + 1 < page.num_cells() {
170            self.cell_idx += 1;
171            return Ok(true);
172        }
173
174        // Need to move to the next leaf
175        self.advance_leaf(pages)
176    }
177
178    /// Move the cursor to the previous entry (backward).
179    pub fn prev(&mut self, pages: &HashMap<PageId, Page>) -> Result<bool> {
180        if !self.valid {
181            return Ok(false);
182        }
183
184        if self.cell_idx > 0 {
185            self.cell_idx -= 1;
186            return Ok(true);
187        }
188
189        // Need to move to the previous leaf
190        self.retreat_leaf(pages)
191    }
192
193    /// Advance to the first cell of the next leaf.
194    fn advance_leaf(&mut self, pages: &HashMap<PageId, Page>) -> Result<bool> {
195        // Walk up the path to find a parent where we can go right
196        while let Some((parent_id, child_idx)) = self.path.pop() {
197            let parent = pages.get(&parent_id)
198                .ok_or(Error::PageOutOfBounds(parent_id))?;
199            let n = parent.num_cells() as usize;
200
201            if child_idx < n {
202                // There's a sibling to the right: child_idx + 1
203                let next_child_idx = child_idx + 1;
204                let next_child = branch_node::get_child(parent, next_child_idx);
205                self.path.push((parent_id, next_child_idx));
206
207                // Walk down to the leftmost leaf of this subtree
208                let mut current = next_child;
209                loop {
210                    let page = pages.get(&current)
211                        .ok_or(Error::PageOutOfBounds(current))?;
212                    match page.page_type() {
213                        Some(PageType::Leaf) => {
214                            self.leaf = current;
215                            self.cell_idx = 0;
216                            self.valid = page.num_cells() > 0;
217                            return Ok(self.valid);
218                        }
219                        Some(PageType::Branch) => {
220                            let child = branch_node::get_child(page, 0);
221                            self.path.push((current, 0));
222                            current = child;
223                        }
224                        _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
225                    }
226                }
227            }
228            // child_idx == num_cells (rightmost child) — keep going up
229        }
230
231        // No more siblings — we've exhausted the tree
232        self.valid = false;
233        Ok(false)
234    }
235
236    /// Retreat to the last cell of the previous leaf.
237    fn retreat_leaf(&mut self, pages: &HashMap<PageId, Page>) -> Result<bool> {
238        // Walk up the path to find a parent where we can go left
239        while let Some((parent_id, child_idx)) = self.path.pop() {
240            if child_idx > 0 {
241                // There's a sibling to the left: child_idx - 1
242                let prev_child_idx = child_idx - 1;
243                let parent = pages.get(&parent_id)
244                    .ok_or(Error::PageOutOfBounds(parent_id))?;
245                let prev_child = branch_node::get_child(parent, prev_child_idx);
246                self.path.push((parent_id, prev_child_idx));
247
248                // Walk down to the rightmost leaf of this subtree
249                let mut current = prev_child;
250                loop {
251                    let page = pages.get(&current)
252                        .ok_or(Error::PageOutOfBounds(current))?;
253                    match page.page_type() {
254                        Some(PageType::Leaf) => {
255                            self.leaf = current;
256                            let n = page.num_cells();
257                            if n > 0 {
258                                self.cell_idx = n - 1;
259                                self.valid = true;
260                            } else {
261                                self.valid = false;
262                            }
263                            return Ok(self.valid);
264                        }
265                        Some(PageType::Branch) => {
266                            let n = page.num_cells() as usize;
267                            let child = page.right_child();
268                            self.path.push((current, n));
269                            current = child;
270                        }
271                        _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
272                    }
273                }
274            }
275            // child_idx == 0 (leftmost child) — keep going up
276        }
277
278        // No more siblings — we've exhausted the tree
279        self.valid = false;
280        Ok(false)
281    }
282}
283
284#[cfg(test)]
285mod tests {
286    use super::*;
287    use crate::allocator::PageAllocator;
288    use crate::btree::BTree;
289    use citadel_core::types::TxnId;
290
291    fn build_tree(keys: &[&[u8]]) -> (HashMap<PageId, Page>, BTree) {
292        let mut pages = HashMap::new();
293        let mut alloc = PageAllocator::new(0);
294        let mut tree = BTree::new(&mut pages, &mut alloc, TxnId(1));
295        for k in keys {
296            tree.insert(&mut pages, &mut alloc, TxnId(1), k, ValueType::Inline, *k).unwrap();
297        }
298        (pages, tree)
299    }
300
301    #[test]
302    fn cursor_forward_iteration() {
303        let (pages, tree) = build_tree(&[b"c", b"a", b"e", b"b", b"d"]);
304        let mut cursor = Cursor::first(&pages, tree.root).unwrap();
305
306        let mut collected = Vec::new();
307        while cursor.is_valid() {
308            let entry = cursor.current(&pages).unwrap();
309            collected.push(entry.key.clone());
310            cursor.next(&pages).unwrap();
311        }
312
313        assert_eq!(collected, vec![b"a", b"b", b"c", b"d", b"e"]);
314    }
315
316    #[test]
317    fn cursor_backward_iteration() {
318        let (pages, tree) = build_tree(&[b"c", b"a", b"e", b"b", b"d"]);
319        let mut cursor = Cursor::last(&pages, tree.root).unwrap();
320
321        let mut collected = Vec::new();
322        while cursor.is_valid() {
323            let entry = cursor.current(&pages).unwrap();
324            collected.push(entry.key.clone());
325            cursor.prev(&pages).unwrap();
326        }
327
328        assert_eq!(collected, vec![b"e", b"d", b"c", b"b", b"a"]);
329    }
330
331    #[test]
332    fn cursor_seek() {
333        let (pages, tree) = build_tree(&[b"b", b"d", b"f", b"h"]);
334        // Seek to "c" — should land on "d" (first key >= "c")
335        let cursor = Cursor::seek(&pages, tree.root, b"c").unwrap();
336        assert!(cursor.is_valid());
337        let entry = cursor.current(&pages).unwrap();
338        assert_eq!(entry.key, b"d");
339    }
340
341    #[test]
342    fn cursor_seek_exact() {
343        let (pages, tree) = build_tree(&[b"b", b"d", b"f"]);
344        let cursor = Cursor::seek(&pages, tree.root, b"d").unwrap();
345        assert!(cursor.is_valid());
346        let entry = cursor.current(&pages).unwrap();
347        assert_eq!(entry.key, b"d");
348    }
349
350    #[test]
351    fn cursor_seek_past_end() {
352        let (pages, tree) = build_tree(&[b"a", b"b", b"c"]);
353        let cursor = Cursor::seek(&pages, tree.root, b"z").unwrap();
354        assert!(!cursor.is_valid());
355    }
356
357    #[test]
358    fn cursor_empty_tree() {
359        let mut pages = HashMap::new();
360        let mut alloc = PageAllocator::new(0);
361        let tree = BTree::new(&mut pages, &mut alloc, TxnId(1));
362
363        let cursor = Cursor::first(&pages, tree.root).unwrap();
364        assert!(!cursor.is_valid());
365    }
366
367    #[test]
368    fn cursor_large_tree_forward() {
369        let keys: Vec<Vec<u8>> = (0..2000u32)
370            .map(|i| format!("{i:06}").into_bytes())
371            .collect();
372        let key_refs: Vec<&[u8]> = keys.iter().map(|k| k.as_slice()).collect();
373        let (pages, tree) = build_tree(&key_refs);
374
375        let mut cursor = Cursor::first(&pages, tree.root).unwrap();
376        let mut count = 0u32;
377        let mut prev_key: Option<Vec<u8>> = None;
378        while cursor.is_valid() {
379            let entry = cursor.current(&pages).unwrap();
380            if let Some(ref pk) = prev_key {
381                assert!(entry.key > *pk, "keys should be in sorted order");
382            }
383            prev_key = Some(entry.key);
384            count += 1;
385            cursor.next(&pages).unwrap();
386        }
387        assert_eq!(count, 2000);
388    }
389}