1use citadel_core::types::{PageId, PageType, ValueType};
7use citadel_core::{Error, Result};
8use citadel_page::page::Page;
9use citadel_page::{branch_node, leaf_node};
10use std::collections::HashMap;
11
/// An owned copy of the leaf cell a [`Cursor`] is positioned on.
///
/// Produced by [`Cursor::current`], which copies the key and value bytes out
/// of the page so the entry does not borrow from the page map.
pub struct CursorEntry {
    /// Key bytes of the cell.
    pub key: Vec<u8>,
    /// Value type tag stored alongside the cell.
    pub val_type: ValueType,
    /// Value bytes of the cell.
    pub value: Vec<u8>,
}
18
/// A position inside a B+tree whose pages live in a `HashMap<PageId, Page>`.
///
/// The cursor does not hold a reference to the page map; every operation
/// takes the map as an argument.
pub struct Cursor {
    /// Branch pages visited from the root down to `leaf`, each paired with
    /// the child index that was followed out of that branch. The rightmost
    /// child is recorded with index `num_cells`.
    path: Vec<(PageId, usize)>,
    /// Id of the leaf page the cursor currently sits on.
    leaf: PageId,
    /// Index of the current cell within `leaf`; only meaningful while
    /// `valid` is true.
    cell_idx: u16,
    /// Whether the cursor currently points at an entry.
    valid: bool,
}
31
32impl Cursor {
33 pub fn seek(pages: &HashMap<PageId, Page>, root: PageId, key: &[u8]) -> Result<Self> {
36 let mut path = Vec::new();
37 let mut current = root;
38
39 loop {
41 let page = pages.get(¤t).ok_or(Error::PageOutOfBounds(current))?;
42 match page.page_type() {
43 Some(PageType::Leaf) => break,
44 Some(PageType::Branch) => {
45 let child_idx = branch_node::search_child_index(page, key);
46 let child = branch_node::get_child(page, child_idx);
47 path.push((current, child_idx));
48 current = child;
49 }
50 _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
51 }
52 }
53
54 let page = pages.get(¤t).unwrap();
56 let cell_idx = match leaf_node::search(page, key) {
57 Ok(idx) => idx,
58 Err(idx) => idx,
59 };
60
61 let valid = cell_idx < page.num_cells();
62
63 let mut cursor = Self {
64 path,
65 leaf: current,
66 cell_idx,
67 valid,
68 };
69
70 if !valid && page.num_cells() > 0 {
72 cursor.advance_leaf(pages)?;
73 } else if page.num_cells() == 0 {
74 cursor.valid = false;
75 }
76
77 Ok(cursor)
78 }
79
80 pub fn first(pages: &HashMap<PageId, Page>, root: PageId) -> Result<Self> {
82 let mut path = Vec::new();
83 let mut current = root;
84
85 loop {
87 let page = pages.get(¤t).ok_or(Error::PageOutOfBounds(current))?;
88 match page.page_type() {
89 Some(PageType::Leaf) => break,
90 Some(PageType::Branch) => {
91 let child = branch_node::get_child(page, 0);
92 path.push((current, 0));
93 current = child;
94 }
95 _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
96 }
97 }
98
99 let page = pages.get(¤t).unwrap();
100 let valid = page.num_cells() > 0;
101
102 Ok(Self {
103 path,
104 leaf: current,
105 cell_idx: 0,
106 valid,
107 })
108 }
109
110 pub fn last(pages: &HashMap<PageId, Page>, root: PageId) -> Result<Self> {
112 let mut path = Vec::new();
113 let mut current = root;
114
115 loop {
117 let page = pages.get(¤t).ok_or(Error::PageOutOfBounds(current))?;
118 match page.page_type() {
119 Some(PageType::Leaf) => break,
120 Some(PageType::Branch) => {
121 let n = page.num_cells() as usize;
122 let child = page.right_child();
123 path.push((current, n));
124 current = child;
125 }
126 _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
127 }
128 }
129
130 let page = pages.get(¤t).unwrap();
131 let n = page.num_cells();
132 let valid = n > 0;
133 let cell_idx = if valid { n - 1 } else { 0 };
134
135 Ok(Self {
136 path,
137 leaf: current,
138 cell_idx,
139 valid,
140 })
141 }
142
143 pub fn is_valid(&self) -> bool {
145 self.valid
146 }
147
148 pub fn current(&self, pages: &HashMap<PageId, Page>) -> Option<CursorEntry> {
150 if !self.valid {
151 return None;
152 }
153 let page = pages.get(&self.leaf)?;
154 let cell = leaf_node::read_cell(page, self.cell_idx);
155 Some(CursorEntry {
156 key: cell.key.to_vec(),
157 val_type: cell.val_type,
158 value: cell.value.to_vec(),
159 })
160 }
161
162 pub fn next(&mut self, pages: &HashMap<PageId, Page>) -> Result<bool> {
164 if !self.valid {
165 return Ok(false);
166 }
167
168 let page = pages
169 .get(&self.leaf)
170 .ok_or(Error::PageOutOfBounds(self.leaf))?;
171
172 if self.cell_idx + 1 < page.num_cells() {
173 self.cell_idx += 1;
174 return Ok(true);
175 }
176
177 self.advance_leaf(pages)
179 }
180
181 pub fn prev(&mut self, pages: &HashMap<PageId, Page>) -> Result<bool> {
183 if !self.valid {
184 return Ok(false);
185 }
186
187 if self.cell_idx > 0 {
188 self.cell_idx -= 1;
189 return Ok(true);
190 }
191
192 self.retreat_leaf(pages)
194 }
195
196 fn advance_leaf(&mut self, pages: &HashMap<PageId, Page>) -> Result<bool> {
198 while let Some((parent_id, child_idx)) = self.path.pop() {
200 let parent = pages
201 .get(&parent_id)
202 .ok_or(Error::PageOutOfBounds(parent_id))?;
203 let n = parent.num_cells() as usize;
204
205 if child_idx < n {
206 let next_child_idx = child_idx + 1;
208 let next_child = branch_node::get_child(parent, next_child_idx);
209 self.path.push((parent_id, next_child_idx));
210
211 let mut current = next_child;
213 loop {
214 let page = pages.get(¤t).ok_or(Error::PageOutOfBounds(current))?;
215 match page.page_type() {
216 Some(PageType::Leaf) => {
217 self.leaf = current;
218 self.cell_idx = 0;
219 self.valid = page.num_cells() > 0;
220 return Ok(self.valid);
221 }
222 Some(PageType::Branch) => {
223 let child = branch_node::get_child(page, 0);
224 self.path.push((current, 0));
225 current = child;
226 }
227 _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
228 }
229 }
230 }
231 }
233
234 self.valid = false;
236 Ok(false)
237 }
238
239 fn retreat_leaf(&mut self, pages: &HashMap<PageId, Page>) -> Result<bool> {
241 while let Some((parent_id, child_idx)) = self.path.pop() {
243 if child_idx > 0 {
244 let prev_child_idx = child_idx - 1;
246 let parent = pages
247 .get(&parent_id)
248 .ok_or(Error::PageOutOfBounds(parent_id))?;
249 let prev_child = branch_node::get_child(parent, prev_child_idx);
250 self.path.push((parent_id, prev_child_idx));
251
252 let mut current = prev_child;
254 loop {
255 let page = pages.get(¤t).ok_or(Error::PageOutOfBounds(current))?;
256 match page.page_type() {
257 Some(PageType::Leaf) => {
258 self.leaf = current;
259 let n = page.num_cells();
260 if n > 0 {
261 self.cell_idx = n - 1;
262 self.valid = true;
263 } else {
264 self.valid = false;
265 }
266 return Ok(self.valid);
267 }
268 Some(PageType::Branch) => {
269 let n = page.num_cells() as usize;
270 let child = page.right_child();
271 self.path.push((current, n));
272 current = child;
273 }
274 _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
275 }
276 }
277 }
278 }
280
281 self.valid = false;
283 Ok(false)
284 }
285}
286
#[cfg(test)]
mod tests {
    use super::*;
    use crate::allocator::PageAllocator;
    use crate::btree::BTree;
    use citadel_core::types::TxnId;

    /// Builds a fresh tree and inserts every key in the given order, using the
    /// key bytes as the inline value as well.
    fn build_tree(keys: &[&[u8]]) -> (HashMap<PageId, Page>, BTree) {
        let mut pages = HashMap::new();
        let mut alloc = PageAllocator::new(0);
        let mut tree = BTree::new(&mut pages, &mut alloc, TxnId(1));
        for &key in keys {
            tree.insert(&mut pages, &mut alloc, TxnId(1), key, ValueType::Inline, key)
                .unwrap();
        }
        (pages, tree)
    }

    /// Collects every key from `Cursor::first` onwards by stepping with `next`.
    fn collect_forward(pages: &HashMap<PageId, Page>, root: PageId) -> Vec<Vec<u8>> {
        let mut cursor = Cursor::first(pages, root).unwrap();
        let mut keys = Vec::new();
        while cursor.is_valid() {
            keys.push(cursor.current(pages).unwrap().key);
            cursor.next(pages).unwrap();
        }
        keys
    }

    /// Collects every key from `Cursor::last` backwards by stepping with `prev`.
    fn collect_backward(pages: &HashMap<PageId, Page>, root: PageId) -> Vec<Vec<u8>> {
        let mut cursor = Cursor::last(pages, root).unwrap();
        let mut keys = Vec::new();
        while cursor.is_valid() {
            keys.push(cursor.current(pages).unwrap().key);
            cursor.prev(pages).unwrap();
        }
        keys
    }

    /// Builds a tree from `keys` and seeks `target` in it.
    fn seek_in(keys: &[&[u8]], target: &[u8]) -> (HashMap<PageId, Page>, Cursor) {
        let (pages, tree) = build_tree(keys);
        let cursor = Cursor::seek(&pages, tree.root, target).unwrap();
        (pages, cursor)
    }

    #[test]
    fn cursor_forward_iteration() {
        let (pages, tree) = build_tree(&[b"c", b"a", b"e", b"b", b"d"]);

        let collected = collect_forward(&pages, tree.root);

        assert_eq!(collected, vec![b"a", b"b", b"c", b"d", b"e"]);
    }

    #[test]
    fn cursor_backward_iteration() {
        let (pages, tree) = build_tree(&[b"c", b"a", b"e", b"b", b"d"]);

        let collected = collect_backward(&pages, tree.root);

        assert_eq!(collected, vec![b"e", b"d", b"c", b"b", b"a"]);
    }

    #[test]
    fn cursor_seek() {
        // "c" is absent; the cursor must land on the next key present.
        let (pages, cursor) = seek_in(&[b"b", b"d", b"f", b"h"], b"c");
        assert!(cursor.is_valid());
        let found = cursor.current(&pages).unwrap().key;
        assert_eq!(found, b"d");
    }

    #[test]
    fn cursor_seek_exact() {
        let (pages, cursor) = seek_in(&[b"b", b"d", b"f"], b"d");
        assert!(cursor.is_valid());
        let found = cursor.current(&pages).unwrap().key;
        assert_eq!(found, b"d");
    }

    #[test]
    fn cursor_seek_past_end() {
        let (_pages, cursor) = seek_in(&[b"a", b"b", b"c"], b"z");
        assert!(!cursor.is_valid());
    }

    #[test]
    fn cursor_empty_tree() {
        let mut pages = HashMap::new();
        let mut alloc = PageAllocator::new(0);
        let tree = BTree::new(&mut pages, &mut alloc, TxnId(1));

        let cursor = Cursor::first(&pages, tree.root).unwrap();
        assert!(!cursor.is_valid());
    }

    #[test]
    fn cursor_large_tree_forward() {
        // Zero-padded decimal keys, so byte order matches numeric order.
        let keys: Vec<Vec<u8>> = (0..2000u32)
            .map(|i| format!("{i:06}").into_bytes())
            .collect();
        let key_refs: Vec<&[u8]> = keys.iter().map(Vec::as_slice).collect();
        let (pages, tree) = build_tree(&key_refs);

        let seen = collect_forward(&pages, tree.root);

        assert!(
            seen.windows(2).all(|pair| pair[0] < pair[1]),
            "keys should be in sorted order"
        );
        assert_eq!(seen.len(), 2000);
    }
}