1use std::collections::HashMap;
4use std::sync::Arc;
5
6use citadel_core::types::{PageId, PageType, ValueType};
7use citadel_core::{Error, Result};
8use citadel_page::leaf_node::LeafCell;
9use citadel_page::page::Page;
10use citadel_page::{branch_node, leaf_node};
11
12pub trait PageMap {
13 fn get_page(&self, id: &PageId) -> Option<&Page>;
14}
15
16pub trait PageLoader: PageMap {
18 fn ensure_loaded(&mut self, id: PageId) -> Result<()>;
19}
20
21impl PageMap for HashMap<PageId, Page> {
22 fn get_page(&self, id: &PageId) -> Option<&Page> {
23 self.get(id)
24 }
25}
26
27impl PageMap for HashMap<PageId, Arc<Page>> {
28 fn get_page(&self, id: &PageId) -> Option<&Page> {
29 self.get(id).map(|a| a.as_ref())
30 }
31}
32
33pub struct CursorEntry {
34 pub key: Vec<u8>,
35 pub val_type: ValueType,
36 pub value: Vec<u8>,
37}
38
39pub struct Cursor {
41 path: Vec<(PageId, usize)>,
43 leaf: PageId,
45 cell_idx: u16,
47 valid: bool,
49}
50
51impl Cursor {
52 pub fn seek(pages: &impl PageMap, root: PageId, key: &[u8]) -> Result<Self> {
54 let mut path = Vec::new();
55 let mut current = root;
56
57 loop {
59 let page = pages
60 .get_page(¤t)
61 .ok_or(Error::PageOutOfBounds(current))?;
62 match page.page_type() {
63 Some(PageType::Leaf) => break,
64 Some(PageType::Branch) => {
65 let child_idx = branch_node::search_child_index(page, key);
66 let child = branch_node::get_child(page, child_idx);
67 path.push((current, child_idx));
68 current = child;
69 }
70 _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
71 }
72 }
73
74 let page = pages.get_page(¤t).unwrap();
76 let cell_idx = match leaf_node::search(page, key) {
77 Ok(idx) => idx,
78 Err(idx) => idx,
79 };
80
81 let valid = cell_idx < page.num_cells();
82
83 let mut cursor = Self {
84 path,
85 leaf: current,
86 cell_idx,
87 valid,
88 };
89
90 if !valid && page.num_cells() > 0 {
92 cursor.advance_leaf(pages)?;
93 } else if page.num_cells() == 0 {
94 cursor.valid = false;
95 }
96
97 Ok(cursor)
98 }
99
100 pub fn first(pages: &impl PageMap, root: PageId) -> Result<Self> {
102 let mut path = Vec::new();
103 let mut current = root;
104
105 loop {
107 let page = pages
108 .get_page(¤t)
109 .ok_or(Error::PageOutOfBounds(current))?;
110 match page.page_type() {
111 Some(PageType::Leaf) => break,
112 Some(PageType::Branch) => {
113 let child = branch_node::get_child(page, 0);
114 path.push((current, 0));
115 current = child;
116 }
117 _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
118 }
119 }
120
121 let page = pages.get_page(¤t).unwrap();
122 let valid = page.num_cells() > 0;
123
124 Ok(Self {
125 path,
126 leaf: current,
127 cell_idx: 0,
128 valid,
129 })
130 }
131
132 pub fn last(pages: &impl PageMap, root: PageId) -> Result<Self> {
134 let mut path = Vec::new();
135 let mut current = root;
136
137 loop {
139 let page = pages
140 .get_page(¤t)
141 .ok_or(Error::PageOutOfBounds(current))?;
142 match page.page_type() {
143 Some(PageType::Leaf) => break,
144 Some(PageType::Branch) => {
145 let n = page.num_cells() as usize;
146 let child = page.right_child();
147 path.push((current, n));
148 current = child;
149 }
150 _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
151 }
152 }
153
154 let page = pages.get_page(¤t).unwrap();
155 let n = page.num_cells();
156 let valid = n > 0;
157 let cell_idx = if valid { n - 1 } else { 0 };
158
159 Ok(Self {
160 path,
161 leaf: current,
162 cell_idx,
163 valid,
164 })
165 }
166
167 pub fn is_valid(&self) -> bool {
169 self.valid
170 }
171
172 pub fn leaf_page_id(&self) -> PageId {
174 self.leaf
175 }
176
177 pub fn cell_index(&self) -> u16 {
179 self.cell_idx
180 }
181
182 pub fn set_leaf_page_id(&mut self, id: PageId) {
184 self.leaf = id;
185 }
186
187 pub fn current(&self, pages: &impl PageMap) -> Option<CursorEntry> {
188 if !self.valid {
189 return None;
190 }
191 let page = pages.get_page(&self.leaf)?;
192 let cell = leaf_node::read_cell(page, self.cell_idx);
193 Some(CursorEntry {
194 key: cell.key.to_vec(),
195 val_type: cell.val_type,
196 value: cell.value.to_vec(),
197 })
198 }
199
200 pub fn current_ref<'a, P: PageMap>(&self, pages: &'a P) -> Option<LeafCell<'a>> {
201 if !self.valid {
202 return None;
203 }
204 let page = pages.get_page(&self.leaf)?;
205 Some(leaf_node::read_cell(page, self.cell_idx))
206 }
207
208 pub fn next(&mut self, pages: &impl PageMap) -> Result<bool> {
210 if !self.valid {
211 return Ok(false);
212 }
213
214 let page = pages
215 .get_page(&self.leaf)
216 .ok_or(Error::PageOutOfBounds(self.leaf))?;
217
218 if self.cell_idx + 1 < page.num_cells() {
219 self.cell_idx += 1;
220 return Ok(true);
221 }
222
223 self.advance_leaf(pages)
225 }
226
227 pub fn prev(&mut self, pages: &impl PageMap) -> Result<bool> {
229 if !self.valid {
230 return Ok(false);
231 }
232
233 if self.cell_idx > 0 {
234 self.cell_idx -= 1;
235 return Ok(true);
236 }
237
238 self.retreat_leaf(pages)
240 }
241
242 fn advance_leaf(&mut self, pages: &impl PageMap) -> Result<bool> {
244 while let Some((parent_id, child_idx)) = self.path.pop() {
246 let parent = pages
247 .get_page(&parent_id)
248 .ok_or(Error::PageOutOfBounds(parent_id))?;
249 let n = parent.num_cells() as usize;
250
251 if child_idx < n {
252 let next_child_idx = child_idx + 1;
254 let next_child = branch_node::get_child(parent, next_child_idx);
255 self.path.push((parent_id, next_child_idx));
256
257 let mut current = next_child;
259 loop {
260 let page = pages
261 .get_page(¤t)
262 .ok_or(Error::PageOutOfBounds(current))?;
263 match page.page_type() {
264 Some(PageType::Leaf) => {
265 self.leaf = current;
266 self.cell_idx = 0;
267 self.valid = page.num_cells() > 0;
268 return Ok(self.valid);
269 }
270 Some(PageType::Branch) => {
271 let child = branch_node::get_child(page, 0);
272 self.path.push((current, 0));
273 current = child;
274 }
275 _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
276 }
277 }
278 }
279 }
281
282 self.valid = false;
284 Ok(false)
285 }
286
287 pub fn seek_lazy(pages: &mut impl PageLoader, root: PageId, key: &[u8]) -> Result<Self> {
291 let mut path = Vec::new();
292 let mut current = root;
293
294 loop {
295 pages.ensure_loaded(current)?;
296 let page = pages
297 .get_page(¤t)
298 .ok_or(Error::PageOutOfBounds(current))?;
299 match page.page_type() {
300 Some(PageType::Leaf) => break,
301 Some(PageType::Branch) => {
302 let child_idx = branch_node::search_child_index(page, key);
303 let child = branch_node::get_child(page, child_idx);
304 path.push((current, child_idx));
305 current = child;
306 }
307 _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
308 }
309 }
310
311 let page = pages.get_page(¤t).unwrap();
312 let cell_idx = match leaf_node::search(page, key) {
313 Ok(idx) => idx,
314 Err(idx) => idx,
315 };
316
317 let valid = cell_idx < page.num_cells();
318
319 let mut cursor = Self {
320 path,
321 leaf: current,
322 cell_idx,
323 valid,
324 };
325
326 if !valid && page.num_cells() > 0 {
327 cursor.advance_leaf_lazy(pages)?;
328 } else if page.num_cells() == 0 {
329 cursor.valid = false;
330 }
331
332 Ok(cursor)
333 }
334
335 pub fn current_ref_lazy<'a, P: PageLoader>(&self, pages: &'a mut P) -> Option<LeafCell<'a>> {
337 if !self.valid {
338 return None;
339 }
340 pages.ensure_loaded(self.leaf).ok()?;
341 let page = pages.get_page(&self.leaf)?;
342 Some(leaf_node::read_cell(page, self.cell_idx))
343 }
344
345 pub fn next_lazy(&mut self, pages: &mut impl PageLoader) -> Result<bool> {
347 if !self.valid {
348 return Ok(false);
349 }
350
351 pages.ensure_loaded(self.leaf)?;
352 let page = pages
353 .get_page(&self.leaf)
354 .ok_or(Error::PageOutOfBounds(self.leaf))?;
355
356 if self.cell_idx + 1 < page.num_cells() {
357 self.cell_idx += 1;
358 return Ok(true);
359 }
360
361 self.advance_leaf_lazy(pages)
362 }
363
364 fn advance_leaf_lazy(&mut self, pages: &mut impl PageLoader) -> Result<bool> {
366 while let Some((parent_id, child_idx)) = self.path.pop() {
367 let parent = pages
368 .get_page(&parent_id)
369 .ok_or(Error::PageOutOfBounds(parent_id))?;
370 let n = parent.num_cells() as usize;
371
372 if child_idx < n {
373 let next_child_idx = child_idx + 1;
374 let next_child = branch_node::get_child(parent, next_child_idx);
375 self.path.push((parent_id, next_child_idx));
376
377 let mut current = next_child;
378 loop {
379 pages.ensure_loaded(current)?;
380 let page = pages
381 .get_page(¤t)
382 .ok_or(Error::PageOutOfBounds(current))?;
383 match page.page_type() {
384 Some(PageType::Leaf) => {
385 self.leaf = current;
386 self.cell_idx = 0;
387 self.valid = page.num_cells() > 0;
388 return Ok(self.valid);
389 }
390 Some(PageType::Branch) => {
391 let child = branch_node::get_child(page, 0);
392 self.path.push((current, 0));
393 current = child;
394 }
395 _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
396 }
397 }
398 }
399 }
400
401 self.valid = false;
402 Ok(false)
403 }
404
405 fn retreat_leaf(&mut self, pages: &impl PageMap) -> Result<bool> {
407 while let Some((parent_id, child_idx)) = self.path.pop() {
409 if child_idx > 0 {
410 let prev_child_idx = child_idx - 1;
412 let parent = pages
413 .get_page(&parent_id)
414 .ok_or(Error::PageOutOfBounds(parent_id))?;
415 let prev_child = branch_node::get_child(parent, prev_child_idx);
416 self.path.push((parent_id, prev_child_idx));
417
418 let mut current = prev_child;
420 loop {
421 let page = pages
422 .get_page(¤t)
423 .ok_or(Error::PageOutOfBounds(current))?;
424 match page.page_type() {
425 Some(PageType::Leaf) => {
426 self.leaf = current;
427 let n = page.num_cells();
428 if n > 0 {
429 self.cell_idx = n - 1;
430 self.valid = true;
431 } else {
432 self.valid = false;
433 }
434 return Ok(self.valid);
435 }
436 Some(PageType::Branch) => {
437 let n = page.num_cells() as usize;
438 let child = page.right_child();
439 self.path.push((current, n));
440 current = child;
441 }
442 _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
443 }
444 }
445 }
446 }
448
449 self.valid = false;
451 Ok(false)
452 }
453}
454
455#[cfg(test)]
456mod tests {
457 use super::*;
458 use crate::allocator::PageAllocator;
459 use crate::btree::BTree;
460 use citadel_core::types::TxnId;
461
462 fn build_tree(keys: &[&[u8]]) -> (HashMap<PageId, Page>, BTree) {
463 let mut pages = HashMap::new();
464 let mut alloc = PageAllocator::new(0);
465 let mut tree = BTree::new(&mut pages, &mut alloc, TxnId(1));
466 for k in keys {
467 tree.insert(&mut pages, &mut alloc, TxnId(1), k, ValueType::Inline, k)
468 .unwrap();
469 }
470 (pages, tree)
471 }
472
473 #[test]
474 fn cursor_forward_iteration() {
475 let (pages, tree) = build_tree(&[b"c", b"a", b"e", b"b", b"d"]);
476 let mut cursor = Cursor::first(&pages, tree.root).unwrap();
477
478 let mut collected = Vec::new();
479 while cursor.is_valid() {
480 let entry = cursor.current(&pages).unwrap();
481 collected.push(entry.key.clone());
482 cursor.next(&pages).unwrap();
483 }
484
485 assert_eq!(collected, vec![b"a", b"b", b"c", b"d", b"e"]);
486 }
487
488 #[test]
489 fn cursor_backward_iteration() {
490 let (pages, tree) = build_tree(&[b"c", b"a", b"e", b"b", b"d"]);
491 let mut cursor = Cursor::last(&pages, tree.root).unwrap();
492
493 let mut collected = Vec::new();
494 while cursor.is_valid() {
495 let entry = cursor.current(&pages).unwrap();
496 collected.push(entry.key.clone());
497 cursor.prev(&pages).unwrap();
498 }
499
500 assert_eq!(collected, vec![b"e", b"d", b"c", b"b", b"a"]);
501 }
502
503 #[test]
504 fn cursor_seek() {
505 let (pages, tree) = build_tree(&[b"b", b"d", b"f", b"h"]);
506 let cursor = Cursor::seek(&pages, tree.root, b"c").unwrap();
508 assert!(cursor.is_valid());
509 let entry = cursor.current(&pages).unwrap();
510 assert_eq!(entry.key, b"d");
511 }
512
513 #[test]
514 fn cursor_seek_exact() {
515 let (pages, tree) = build_tree(&[b"b", b"d", b"f"]);
516 let cursor = Cursor::seek(&pages, tree.root, b"d").unwrap();
517 assert!(cursor.is_valid());
518 let entry = cursor.current(&pages).unwrap();
519 assert_eq!(entry.key, b"d");
520 }
521
522 #[test]
523 fn cursor_seek_past_end() {
524 let (pages, tree) = build_tree(&[b"a", b"b", b"c"]);
525 let cursor = Cursor::seek(&pages, tree.root, b"z").unwrap();
526 assert!(!cursor.is_valid());
527 }
528
529 #[test]
530 fn cursor_empty_tree() {
531 let mut pages = HashMap::new();
532 let mut alloc = PageAllocator::new(0);
533 let tree = BTree::new(&mut pages, &mut alloc, TxnId(1));
534
535 let cursor = Cursor::first(&pages, tree.root).unwrap();
536 assert!(!cursor.is_valid());
537 }
538
539 struct TrackingLoader {
541 pages: HashMap<PageId, Page>,
542 touched: std::collections::HashSet<PageId>,
543 }
544
545 impl TrackingLoader {
546 fn new(pages: HashMap<PageId, Page>) -> Self {
547 Self {
548 pages,
549 touched: std::collections::HashSet::new(),
550 }
551 }
552 fn unique_pages_touched(&self) -> usize {
553 self.touched.len()
554 }
555 }
556
557 impl PageMap for TrackingLoader {
558 fn get_page(&self, id: &PageId) -> Option<&Page> {
559 self.pages.get(id)
560 }
561 }
562
563 impl PageLoader for TrackingLoader {
564 fn ensure_loaded(&mut self, id: PageId) -> citadel_core::Result<()> {
565 if self.pages.contains_key(&id) {
566 self.touched.insert(id);
567 Ok(())
568 } else {
569 Err(citadel_core::Error::PageOutOfBounds(id))
570 }
571 }
572 }
573
574 #[test]
575 fn lazy_cursor_forward() {
576 let keys: Vec<Vec<u8>> = (0..2000u32)
577 .map(|i| format!("{i:06}").into_bytes())
578 .collect();
579 let key_refs: Vec<&[u8]> = keys.iter().map(|k| k.as_slice()).collect();
580 let (pages, tree) = build_tree(&key_refs);
581
582 let mut loader = TrackingLoader::new(pages);
583 let mut cursor = Cursor::seek_lazy(&mut loader, tree.root, b"").unwrap();
584 let mut count = 0u32;
585 while cursor.is_valid() {
586 let entry = cursor.current_ref_lazy(&mut loader);
587 assert!(entry.is_some());
588 count += 1;
589 cursor.next_lazy(&mut loader).unwrap();
590 }
591 assert_eq!(count, 2000);
592 }
593
594 #[test]
595 fn lazy_cursor_range_loads_fewer_pages() {
596 let keys: Vec<Vec<u8>> = (0..2000u32)
597 .map(|i| format!("{i:06}").into_bytes())
598 .collect();
599 let key_refs: Vec<&[u8]> = keys.iter().map(|k| k.as_slice()).collect();
600 let (pages, tree) = build_tree(&key_refs);
601 let total_pages = pages.len();
602
603 let mut loader = TrackingLoader::new(pages);
604 let mut cursor = Cursor::seek_lazy(&mut loader, tree.root, b"001000").unwrap();
605 let mut count = 0u32;
606 while cursor.is_valid() {
607 if let Some(entry) = cursor.current_ref_lazy(&mut loader) {
608 if entry.key > b"001009".as_slice() {
609 break;
610 }
611 count += 1;
612 }
613 cursor.next_lazy(&mut loader).unwrap();
614 }
615 assert_eq!(count, 10);
616 let touched = loader.unique_pages_touched();
618 assert!(
619 touched < total_pages,
620 "lazy touched {} unique pages but tree has {} total",
621 touched,
622 total_pages,
623 );
624 }
625
626 #[test]
627 fn cursor_large_tree_forward() {
628 let keys: Vec<Vec<u8>> = (0..2000u32)
629 .map(|i| format!("{i:06}").into_bytes())
630 .collect();
631 let key_refs: Vec<&[u8]> = keys.iter().map(|k| k.as_slice()).collect();
632 let (pages, tree) = build_tree(&key_refs);
633
634 let mut cursor = Cursor::first(&pages, tree.root).unwrap();
635 let mut count = 0u32;
636 let mut prev_key: Option<Vec<u8>> = None;
637 while cursor.is_valid() {
638 let entry = cursor.current(&pages).unwrap();
639 if let Some(ref pk) = prev_key {
640 assert!(entry.key > *pk, "keys should be in sorted order");
641 }
642 prev_key = Some(entry.key);
643 count += 1;
644 cursor.next(&pages).unwrap();
645 }
646 assert_eq!(count, 2000);
647 }
648}