1use std::collections::HashMap;
7use citadel_core::{Error, Result};
8use citadel_core::types::{PageId, PageType, ValueType};
9use citadel_page::page::Page;
10use citadel_page::{branch_node, leaf_node};
11
12pub struct CursorEntry {
14 pub key: Vec<u8>,
15 pub val_type: ValueType,
16 pub value: Vec<u8>,
17}
18
19pub struct Cursor {
22 path: Vec<(PageId, usize)>,
24 leaf: PageId,
26 cell_idx: u16,
28 valid: bool,
30}
31
32impl Cursor {
33 pub fn seek(
36 pages: &HashMap<PageId, Page>,
37 root: PageId,
38 key: &[u8],
39 ) -> Result<Self> {
40 let mut path = Vec::new();
41 let mut current = root;
42
43 loop {
45 let page = pages.get(¤t)
46 .ok_or(Error::PageOutOfBounds(current))?;
47 match page.page_type() {
48 Some(PageType::Leaf) => break,
49 Some(PageType::Branch) => {
50 let child_idx = branch_node::search_child_index(page, key);
51 let child = branch_node::get_child(page, child_idx);
52 path.push((current, child_idx));
53 current = child;
54 }
55 _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
56 }
57 }
58
59 let page = pages.get(¤t).unwrap();
61 let cell_idx = match leaf_node::search(page, key) {
62 Ok(idx) => idx,
63 Err(idx) => idx,
64 };
65
66 let valid = cell_idx < page.num_cells();
67
68 let mut cursor = Self { path, leaf: current, cell_idx, valid };
69
70 if !valid && page.num_cells() > 0 {
72 cursor.advance_leaf(pages)?;
73 } else if page.num_cells() == 0 {
74 cursor.valid = false;
75 }
76
77 Ok(cursor)
78 }
79
80 pub fn first(
82 pages: &HashMap<PageId, Page>,
83 root: PageId,
84 ) -> Result<Self> {
85 let mut path = Vec::new();
86 let mut current = root;
87
88 loop {
90 let page = pages.get(¤t)
91 .ok_or(Error::PageOutOfBounds(current))?;
92 match page.page_type() {
93 Some(PageType::Leaf) => break,
94 Some(PageType::Branch) => {
95 let child = branch_node::get_child(page, 0);
96 path.push((current, 0));
97 current = child;
98 }
99 _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
100 }
101 }
102
103 let page = pages.get(¤t).unwrap();
104 let valid = page.num_cells() > 0;
105
106 Ok(Self { path, leaf: current, cell_idx: 0, valid })
107 }
108
109 pub fn last(
111 pages: &HashMap<PageId, Page>,
112 root: PageId,
113 ) -> Result<Self> {
114 let mut path = Vec::new();
115 let mut current = root;
116
117 loop {
119 let page = pages.get(¤t)
120 .ok_or(Error::PageOutOfBounds(current))?;
121 match page.page_type() {
122 Some(PageType::Leaf) => break,
123 Some(PageType::Branch) => {
124 let n = page.num_cells() as usize;
125 let child = page.right_child();
126 path.push((current, n));
127 current = child;
128 }
129 _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
130 }
131 }
132
133 let page = pages.get(¤t).unwrap();
134 let n = page.num_cells();
135 let valid = n > 0;
136 let cell_idx = if valid { n - 1 } else { 0 };
137
138 Ok(Self { path, leaf: current, cell_idx, valid })
139 }
140
141 pub fn is_valid(&self) -> bool {
143 self.valid
144 }
145
146 pub fn current(&self, pages: &HashMap<PageId, Page>) -> Option<CursorEntry> {
148 if !self.valid {
149 return None;
150 }
151 let page = pages.get(&self.leaf)?;
152 let cell = leaf_node::read_cell(page, self.cell_idx);
153 Some(CursorEntry {
154 key: cell.key.to_vec(),
155 val_type: cell.val_type,
156 value: cell.value.to_vec(),
157 })
158 }
159
160 pub fn next(&mut self, pages: &HashMap<PageId, Page>) -> Result<bool> {
162 if !self.valid {
163 return Ok(false);
164 }
165
166 let page = pages.get(&self.leaf)
167 .ok_or(Error::PageOutOfBounds(self.leaf))?;
168
169 if self.cell_idx + 1 < page.num_cells() {
170 self.cell_idx += 1;
171 return Ok(true);
172 }
173
174 self.advance_leaf(pages)
176 }
177
178 pub fn prev(&mut self, pages: &HashMap<PageId, Page>) -> Result<bool> {
180 if !self.valid {
181 return Ok(false);
182 }
183
184 if self.cell_idx > 0 {
185 self.cell_idx -= 1;
186 return Ok(true);
187 }
188
189 self.retreat_leaf(pages)
191 }
192
193 fn advance_leaf(&mut self, pages: &HashMap<PageId, Page>) -> Result<bool> {
195 while let Some((parent_id, child_idx)) = self.path.pop() {
197 let parent = pages.get(&parent_id)
198 .ok_or(Error::PageOutOfBounds(parent_id))?;
199 let n = parent.num_cells() as usize;
200
201 if child_idx < n {
202 let next_child_idx = child_idx + 1;
204 let next_child = branch_node::get_child(parent, next_child_idx);
205 self.path.push((parent_id, next_child_idx));
206
207 let mut current = next_child;
209 loop {
210 let page = pages.get(¤t)
211 .ok_or(Error::PageOutOfBounds(current))?;
212 match page.page_type() {
213 Some(PageType::Leaf) => {
214 self.leaf = current;
215 self.cell_idx = 0;
216 self.valid = page.num_cells() > 0;
217 return Ok(self.valid);
218 }
219 Some(PageType::Branch) => {
220 let child = branch_node::get_child(page, 0);
221 self.path.push((current, 0));
222 current = child;
223 }
224 _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
225 }
226 }
227 }
228 }
230
231 self.valid = false;
233 Ok(false)
234 }
235
236 fn retreat_leaf(&mut self, pages: &HashMap<PageId, Page>) -> Result<bool> {
238 while let Some((parent_id, child_idx)) = self.path.pop() {
240 if child_idx > 0 {
241 let prev_child_idx = child_idx - 1;
243 let parent = pages.get(&parent_id)
244 .ok_or(Error::PageOutOfBounds(parent_id))?;
245 let prev_child = branch_node::get_child(parent, prev_child_idx);
246 self.path.push((parent_id, prev_child_idx));
247
248 let mut current = prev_child;
250 loop {
251 let page = pages.get(¤t)
252 .ok_or(Error::PageOutOfBounds(current))?;
253 match page.page_type() {
254 Some(PageType::Leaf) => {
255 self.leaf = current;
256 let n = page.num_cells();
257 if n > 0 {
258 self.cell_idx = n - 1;
259 self.valid = true;
260 } else {
261 self.valid = false;
262 }
263 return Ok(self.valid);
264 }
265 Some(PageType::Branch) => {
266 let n = page.num_cells() as usize;
267 let child = page.right_child();
268 self.path.push((current, n));
269 current = child;
270 }
271 _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
272 }
273 }
274 }
275 }
277
278 self.valid = false;
280 Ok(false)
281 }
282}
283
#[cfg(test)]
mod tests {
    use super::*;
    use crate::allocator::PageAllocator;
    use crate::btree::BTree;
    use citadel_core::types::TxnId;

    /// Builds a tree by inserting each key (in the given order) with the key
    /// bytes doubling as the inline value.
    fn build_tree(keys: &[&[u8]]) -> (HashMap<PageId, Page>, BTree) {
        let mut pages = HashMap::new();
        let mut alloc = PageAllocator::new(0);
        let mut tree = BTree::new(&mut pages, &mut alloc, TxnId(1));
        for k in keys {
            tree.insert(&mut pages, &mut alloc, TxnId(1), k, ValueType::Inline, *k).unwrap();
        }
        (pages, tree)
    }

    #[test]
    fn cursor_forward_iteration() {
        let (pages, tree) = build_tree(&[b"c", b"a", b"e", b"b", b"d"]);
        let mut cursor = Cursor::first(&pages, tree.root).unwrap();

        let mut collected = Vec::new();
        while cursor.is_valid() {
            let entry = cursor.current(&pages).unwrap();
            // `entry` is owned, so the key can be moved out directly.
            collected.push(entry.key);
            cursor.next(&pages).unwrap();
        }

        assert_eq!(collected, vec![b"a", b"b", b"c", b"d", b"e"]);
    }

    #[test]
    fn cursor_backward_iteration() {
        let (pages, tree) = build_tree(&[b"c", b"a", b"e", b"b", b"d"]);
        let mut cursor = Cursor::last(&pages, tree.root).unwrap();

        let mut collected = Vec::new();
        while cursor.is_valid() {
            let entry = cursor.current(&pages).unwrap();
            collected.push(entry.key);
            cursor.prev(&pages).unwrap();
        }

        assert_eq!(collected, vec![b"e", b"d", b"c", b"b", b"a"]);
    }

    #[test]
    fn cursor_seek() {
        // Seeking a key that is absent lands on the next larger key.
        let (pages, tree) = build_tree(&[b"b", b"d", b"f", b"h"]);
        let cursor = Cursor::seek(&pages, tree.root, b"c").unwrap();
        assert!(cursor.is_valid());
        let entry = cursor.current(&pages).unwrap();
        assert_eq!(entry.key, b"d");
    }

    #[test]
    fn cursor_seek_exact() {
        let (pages, tree) = build_tree(&[b"b", b"d", b"f"]);
        let cursor = Cursor::seek(&pages, tree.root, b"d").unwrap();
        assert!(cursor.is_valid());
        let entry = cursor.current(&pages).unwrap();
        assert_eq!(entry.key, b"d");
    }

    #[test]
    fn cursor_seek_past_end() {
        let (pages, tree) = build_tree(&[b"a", b"b", b"c"]);
        let cursor = Cursor::seek(&pages, tree.root, b"z").unwrap();
        assert!(!cursor.is_valid());
    }

    #[test]
    fn cursor_empty_tree() {
        let mut pages = HashMap::new();
        let mut alloc = PageAllocator::new(0);
        let tree = BTree::new(&mut pages, &mut alloc, TxnId(1));

        let cursor = Cursor::first(&pages, tree.root).unwrap();
        assert!(!cursor.is_valid());
    }

    #[test]
    fn cursor_large_tree_forward() {
        // Zero-padded decimal keys, so byte order equals numeric order.
        let keys: Vec<Vec<u8>> = (0..2000u32)
            .map(|i| format!("{i:06}").into_bytes())
            .collect();
        let key_refs: Vec<&[u8]> = keys.iter().map(|k| k.as_slice()).collect();
        let (pages, tree) = build_tree(&key_refs);

        let mut cursor = Cursor::first(&pages, tree.root).unwrap();
        let mut count = 0u32;
        let mut prev_key: Option<Vec<u8>> = None;
        while cursor.is_valid() {
            let entry = cursor.current(&pages).unwrap();
            if let Some(ref pk) = prev_key {
                assert!(entry.key > *pk, "keys should be in sorted order");
            }
            prev_key = Some(entry.key);
            count += 1;
            cursor.next(&pages).unwrap();
        }
        assert_eq!(count, 2000);
    }
}