Skip to main content

citadel_buffer/
cursor.rs

1//! B+ tree cursor for range iteration using a root-to-leaf path stack.
2
3use std::collections::HashMap;
4use std::hash::BuildHasher;
5use std::sync::Arc;
6
7use citadel_core::types::{PageId, PageType, ValueType};
8use citadel_core::{Error, Result};
9use citadel_page::leaf_node::LeafCell;
10use citadel_page::page::Page;
11use citadel_page::{branch_node, leaf_node};
12
13pub trait PageMap {
14    fn get_page(&self, id: &PageId) -> Option<&Page>;
15}
16
17/// Extends `PageMap` with on-demand page loading for lazy cursor traversal.
18pub trait PageLoader: PageMap {
19    fn ensure_loaded(&mut self, id: PageId) -> Result<()>;
20}
21
22impl<S: BuildHasher> PageMap for HashMap<PageId, Page, S> {
23    fn get_page(&self, id: &PageId) -> Option<&Page> {
24        self.get(id)
25    }
26}
27
28impl<S: BuildHasher> PageMap for HashMap<PageId, Arc<Page>, S> {
29    fn get_page(&self, id: &PageId) -> Option<&Page> {
30        self.get(id).map(|a| a.as_ref())
31    }
32}
33
34pub struct CursorEntry {
35    pub key: Vec<u8>,
36    pub val_type: ValueType,
37    pub value: Vec<u8>,
38}
39
40/// B+ tree cursor position with root-to-leaf path.
41pub struct Cursor {
42    /// Stack of (page_id, child_index) from root to current leaf's parent.
43    path: Vec<(PageId, usize)>,
44    /// Current leaf page ID.
45    leaf: PageId,
46    /// Current cell index within the leaf.
47    cell_idx: u16,
48    /// Whether the cursor is positioned at a valid entry.
49    valid: bool,
50}
51
52impl Cursor {
53    /// Seek to first key >= `key`. Empty key = first entry.
54    pub fn seek(pages: &impl PageMap, root: PageId, key: &[u8]) -> Result<Self> {
55        let mut path = Vec::new();
56        let mut current = root;
57
58        loop {
59            let page = pages
60                .get_page(&current)
61                .ok_or(Error::PageOutOfBounds(current))?;
62            match page.page_type() {
63                Some(PageType::Leaf) => break,
64                Some(PageType::Branch) => {
65                    let child_idx = branch_node::search_child_index(page, key);
66                    let child = branch_node::get_child(page, child_idx);
67                    path.push((current, child_idx));
68                    current = child;
69                }
70                _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
71            }
72        }
73
74        let page = pages.get_page(&current).unwrap();
75        let cell_idx = match leaf_node::search(page, key) {
76            Ok(idx) => idx,
77            Err(idx) => idx,
78        };
79
80        let valid = cell_idx < page.num_cells();
81
82        let mut cursor = Self {
83            path,
84            leaf: current,
85            cell_idx,
86            valid,
87        };
88
89        // Landed past the end of this leaf: advance to next.
90        if !valid && page.num_cells() > 0 {
91            cursor.advance_leaf(pages)?;
92        } else if page.num_cells() == 0 {
93            cursor.valid = false;
94        }
95
96        Ok(cursor)
97    }
98
99    /// Position the cursor at the first entry in the tree.
100    pub fn first(pages: &impl PageMap, root: PageId) -> Result<Self> {
101        let mut path = Vec::new();
102        let mut current = root;
103
104        loop {
105            let page = pages
106                .get_page(&current)
107                .ok_or(Error::PageOutOfBounds(current))?;
108            match page.page_type() {
109                Some(PageType::Leaf) => break,
110                Some(PageType::Branch) => {
111                    let child = branch_node::get_child(page, 0);
112                    path.push((current, 0));
113                    current = child;
114                }
115                _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
116            }
117        }
118
119        let page = pages.get_page(&current).unwrap();
120        let valid = page.num_cells() > 0;
121
122        Ok(Self {
123            path,
124            leaf: current,
125            cell_idx: 0,
126            valid,
127        })
128    }
129
130    /// Position the cursor at the last entry in the tree.
131    pub fn last(pages: &impl PageMap, root: PageId) -> Result<Self> {
132        let mut path = Vec::new();
133        let mut current = root;
134
135        loop {
136            let page = pages
137                .get_page(&current)
138                .ok_or(Error::PageOutOfBounds(current))?;
139            match page.page_type() {
140                Some(PageType::Leaf) => break,
141                Some(PageType::Branch) => {
142                    let n = page.num_cells() as usize;
143                    let child = page.right_child();
144                    path.push((current, n));
145                    current = child;
146                }
147                _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
148            }
149        }
150
151        let page = pages.get_page(&current).unwrap();
152        let n = page.num_cells();
153        let valid = n > 0;
154        let cell_idx = if valid { n - 1 } else { 0 };
155
156        Ok(Self {
157            path,
158            leaf: current,
159            cell_idx,
160            valid,
161        })
162    }
163
164    /// Whether the cursor is at a valid position.
165    pub fn is_valid(&self) -> bool {
166        self.valid
167    }
168
169    /// Current leaf page ID.
170    pub fn leaf_page_id(&self) -> PageId {
171        self.leaf
172    }
173
174    /// Current cell index within the leaf.
175    pub fn cell_index(&self) -> u16 {
176        self.cell_idx
177    }
178
179    /// Update the current leaf page ID after a CoW operation.
180    pub fn set_leaf_page_id(&mut self, id: PageId) {
181        self.leaf = id;
182    }
183
184    pub fn current(&self, pages: &impl PageMap) -> Option<CursorEntry> {
185        if !self.valid {
186            return None;
187        }
188        let page = pages.get_page(&self.leaf)?;
189        let cell = leaf_node::read_cell(page, self.cell_idx);
190        Some(CursorEntry {
191            key: cell.key.to_vec(),
192            val_type: cell.val_type,
193            value: cell.value.to_vec(),
194        })
195    }
196
197    pub fn current_ref<'a, P: PageMap>(&self, pages: &'a P) -> Option<LeafCell<'a>> {
198        if !self.valid {
199            return None;
200        }
201        let page = pages.get_page(&self.leaf)?;
202        Some(leaf_node::read_cell(page, self.cell_idx))
203    }
204
205    /// Move the cursor to the next entry (forward).
206    pub fn next(&mut self, pages: &impl PageMap) -> Result<bool> {
207        if !self.valid {
208            return Ok(false);
209        }
210
211        let page = pages
212            .get_page(&self.leaf)
213            .ok_or(Error::PageOutOfBounds(self.leaf))?;
214
215        if self.cell_idx + 1 < page.num_cells() {
216            self.cell_idx += 1;
217            return Ok(true);
218        }
219
220        self.advance_leaf(pages)
221    }
222
223    /// Move the cursor to the previous entry (backward).
224    pub fn prev(&mut self, pages: &impl PageMap) -> Result<bool> {
225        if !self.valid {
226            return Ok(false);
227        }
228
229        if self.cell_idx > 0 {
230            self.cell_idx -= 1;
231            return Ok(true);
232        }
233
234        self.retreat_leaf(pages)
235    }
236
237    /// Advance to the first cell of the next leaf.
238    fn advance_leaf(&mut self, pages: &impl PageMap) -> Result<bool> {
239        while let Some((parent_id, child_idx)) = self.path.pop() {
240            let parent = pages
241                .get_page(&parent_id)
242                .ok_or(Error::PageOutOfBounds(parent_id))?;
243            let n = parent.num_cells() as usize;
244
245            if child_idx < n {
246                let next_child_idx = child_idx + 1;
247                let next_child = branch_node::get_child(parent, next_child_idx);
248                self.path.push((parent_id, next_child_idx));
249
250                let mut current = next_child;
251                loop {
252                    let page = pages
253                        .get_page(&current)
254                        .ok_or(Error::PageOutOfBounds(current))?;
255                    match page.page_type() {
256                        Some(PageType::Leaf) => {
257                            self.leaf = current;
258                            self.cell_idx = 0;
259                            self.valid = page.num_cells() > 0;
260                            return Ok(self.valid);
261                        }
262                        Some(PageType::Branch) => {
263                            let child = branch_node::get_child(page, 0);
264                            self.path.push((current, 0));
265                            current = child;
266                        }
267                        _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
268                    }
269                }
270            }
271            // child_idx == num_cells (rightmost child) - keep going up
272        }
273
274        // No more siblings - we've exhausted the tree
275        self.valid = false;
276        Ok(false)
277    }
278
279    /// Seek with lazy page loading — only loads the root-to-leaf path.
280    pub fn seek_lazy(pages: &mut impl PageLoader, root: PageId, key: &[u8]) -> Result<Self> {
281        let mut path = Vec::new();
282        let mut current = root;
283
284        loop {
285            pages.ensure_loaded(current)?;
286            let page = pages
287                .get_page(&current)
288                .ok_or(Error::PageOutOfBounds(current))?;
289            match page.page_type() {
290                Some(PageType::Leaf) => break,
291                Some(PageType::Branch) => {
292                    let child_idx = branch_node::search_child_index(page, key);
293                    let child = branch_node::get_child(page, child_idx);
294                    path.push((current, child_idx));
295                    current = child;
296                }
297                _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
298            }
299        }
300
301        let page = pages.get_page(&current).unwrap();
302        let cell_idx = match leaf_node::search(page, key) {
303            Ok(idx) => idx,
304            Err(idx) => idx,
305        };
306
307        let valid = cell_idx < page.num_cells();
308
309        let mut cursor = Self {
310            path,
311            leaf: current,
312            cell_idx,
313            valid,
314        };
315
316        if !valid && page.num_cells() > 0 {
317            cursor.advance_leaf_lazy(pages)?;
318        } else if page.num_cells() == 0 {
319            cursor.valid = false;
320        }
321
322        Ok(cursor)
323    }
324
325    /// Read the current entry, loading the leaf page if needed.
326    pub fn current_ref_lazy<'a, P: PageLoader + ?Sized>(
327        &self,
328        pages: &'a mut P,
329    ) -> Option<LeafCell<'a>> {
330        if !self.valid {
331            return None;
332        }
333        pages.ensure_loaded(self.leaf).ok()?;
334        let page = pages.get_page(&self.leaf)?;
335        Some(leaf_node::read_cell(page, self.cell_idx))
336    }
337
338    /// Advance to the next entry, loading pages on demand.
339    pub fn next_lazy<P: PageLoader + ?Sized>(&mut self, pages: &mut P) -> Result<bool> {
340        if !self.valid {
341            return Ok(false);
342        }
343
344        pages.ensure_loaded(self.leaf)?;
345        let page = pages
346            .get_page(&self.leaf)
347            .ok_or(Error::PageOutOfBounds(self.leaf))?;
348
349        if self.cell_idx + 1 < page.num_cells() {
350            self.cell_idx += 1;
351            return Ok(true);
352        }
353
354        self.advance_leaf_lazy(pages)
355    }
356
357    /// Advance to the next leaf, loading child pages on demand.
358    fn advance_leaf_lazy<P: PageLoader + ?Sized>(&mut self, pages: &mut P) -> Result<bool> {
359        while let Some((parent_id, child_idx)) = self.path.pop() {
360            let parent = pages
361                .get_page(&parent_id)
362                .ok_or(Error::PageOutOfBounds(parent_id))?;
363            let n = parent.num_cells() as usize;
364
365            if child_idx < n {
366                let next_child_idx = child_idx + 1;
367                let next_child = branch_node::get_child(parent, next_child_idx);
368                self.path.push((parent_id, next_child_idx));
369
370                let mut current = next_child;
371                loop {
372                    pages.ensure_loaded(current)?;
373                    let page = pages
374                        .get_page(&current)
375                        .ok_or(Error::PageOutOfBounds(current))?;
376                    match page.page_type() {
377                        Some(PageType::Leaf) => {
378                            self.leaf = current;
379                            self.cell_idx = 0;
380                            self.valid = page.num_cells() > 0;
381                            return Ok(self.valid);
382                        }
383                        Some(PageType::Branch) => {
384                            let child = branch_node::get_child(page, 0);
385                            self.path.push((current, 0));
386                            current = child;
387                        }
388                        _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
389                    }
390                }
391            }
392        }
393
394        self.valid = false;
395        Ok(false)
396    }
397
398    /// Retreat to the last cell of the previous leaf.
399    fn retreat_leaf(&mut self, pages: &impl PageMap) -> Result<bool> {
400        while let Some((parent_id, child_idx)) = self.path.pop() {
401            if child_idx > 0 {
402                let prev_child_idx = child_idx - 1;
403                let parent = pages
404                    .get_page(&parent_id)
405                    .ok_or(Error::PageOutOfBounds(parent_id))?;
406                let prev_child = branch_node::get_child(parent, prev_child_idx);
407                self.path.push((parent_id, prev_child_idx));
408
409                let mut current = prev_child;
410                loop {
411                    let page = pages
412                        .get_page(&current)
413                        .ok_or(Error::PageOutOfBounds(current))?;
414                    match page.page_type() {
415                        Some(PageType::Leaf) => {
416                            self.leaf = current;
417                            let n = page.num_cells();
418                            if n > 0 {
419                                self.cell_idx = n - 1;
420                                self.valid = true;
421                            } else {
422                                self.valid = false;
423                            }
424                            return Ok(self.valid);
425                        }
426                        Some(PageType::Branch) => {
427                            let n = page.num_cells() as usize;
428                            let child = page.right_child();
429                            self.path.push((current, n));
430                            current = child;
431                        }
432                        _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
433                    }
434                }
435            }
436            // child_idx == 0 (leftmost child) - keep going up
437        }
438
439        // No more siblings - we've exhausted the tree
440        self.valid = false;
441        Ok(false)
442    }
443}
444
445#[cfg(test)]
446#[path = "cursor_tests.rs"]
447mod tests;