1use std::collections::HashMap;
4use std::hash::BuildHasher;
5use std::sync::Arc;
6
7use citadel_core::types::{PageId, PageType, ValueType};
8use citadel_core::{Error, Result};
9use citadel_page::leaf_node::LeafCell;
10use citadel_page::page::Page;
11use citadel_page::{branch_node, leaf_node};
12
13pub trait PageMap {
14 fn get_page(&self, id: &PageId) -> Option<&Page>;
15}
16
17pub trait PageLoader: PageMap {
19 fn ensure_loaded(&mut self, id: PageId) -> Result<()>;
20}
21
22impl<S: BuildHasher> PageMap for HashMap<PageId, Page, S> {
23 fn get_page(&self, id: &PageId) -> Option<&Page> {
24 self.get(id)
25 }
26}
27
28impl<S: BuildHasher> PageMap for HashMap<PageId, Arc<Page>, S> {
29 fn get_page(&self, id: &PageId) -> Option<&Page> {
30 self.get(id).map(|a| a.as_ref())
31 }
32}
33
34pub struct CursorEntry {
35 pub key: Vec<u8>,
36 pub val_type: ValueType,
37 pub value: Vec<u8>,
38}
39
40pub struct Cursor {
42 path: Vec<(PageId, usize)>,
44 leaf: PageId,
46 cell_idx: u16,
48 valid: bool,
50}
51
52impl Cursor {
53 pub fn seek(pages: &impl PageMap, root: PageId, key: &[u8]) -> Result<Self> {
55 let mut path = Vec::new();
56 let mut current = root;
57
58 loop {
59 let page = pages
60 .get_page(¤t)
61 .ok_or(Error::PageOutOfBounds(current))?;
62 match page.page_type() {
63 Some(PageType::Leaf) => break,
64 Some(PageType::Branch) => {
65 let child_idx = branch_node::search_child_index(page, key);
66 let child = branch_node::get_child(page, child_idx);
67 path.push((current, child_idx));
68 current = child;
69 }
70 _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
71 }
72 }
73
74 let page = pages.get_page(¤t).unwrap();
75 let cell_idx = match leaf_node::search(page, key) {
76 Ok(idx) => idx,
77 Err(idx) => idx,
78 };
79
80 let valid = cell_idx < page.num_cells();
81
82 let mut cursor = Self {
83 path,
84 leaf: current,
85 cell_idx,
86 valid,
87 };
88
89 if !valid && page.num_cells() > 0 {
91 cursor.advance_leaf(pages)?;
92 } else if page.num_cells() == 0 {
93 cursor.valid = false;
94 }
95
96 Ok(cursor)
97 }
98
99 pub fn first(pages: &impl PageMap, root: PageId) -> Result<Self> {
101 let mut path = Vec::new();
102 let mut current = root;
103
104 loop {
105 let page = pages
106 .get_page(¤t)
107 .ok_or(Error::PageOutOfBounds(current))?;
108 match page.page_type() {
109 Some(PageType::Leaf) => break,
110 Some(PageType::Branch) => {
111 let child = branch_node::get_child(page, 0);
112 path.push((current, 0));
113 current = child;
114 }
115 _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
116 }
117 }
118
119 let page = pages.get_page(¤t).unwrap();
120 let valid = page.num_cells() > 0;
121
122 Ok(Self {
123 path,
124 leaf: current,
125 cell_idx: 0,
126 valid,
127 })
128 }
129
130 pub fn last(pages: &impl PageMap, root: PageId) -> Result<Self> {
132 let mut path = Vec::new();
133 let mut current = root;
134
135 loop {
136 let page = pages
137 .get_page(¤t)
138 .ok_or(Error::PageOutOfBounds(current))?;
139 match page.page_type() {
140 Some(PageType::Leaf) => break,
141 Some(PageType::Branch) => {
142 let n = page.num_cells() as usize;
143 let child = page.right_child();
144 path.push((current, n));
145 current = child;
146 }
147 _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
148 }
149 }
150
151 let page = pages.get_page(¤t).unwrap();
152 let n = page.num_cells();
153 let valid = n > 0;
154 let cell_idx = if valid { n - 1 } else { 0 };
155
156 Ok(Self {
157 path,
158 leaf: current,
159 cell_idx,
160 valid,
161 })
162 }
163
164 pub fn is_valid(&self) -> bool {
166 self.valid
167 }
168
169 pub fn leaf_page_id(&self) -> PageId {
171 self.leaf
172 }
173
174 pub fn cell_index(&self) -> u16 {
176 self.cell_idx
177 }
178
179 pub fn set_leaf_page_id(&mut self, id: PageId) {
181 self.leaf = id;
182 }
183
184 pub fn current(&self, pages: &impl PageMap) -> Option<CursorEntry> {
185 if !self.valid {
186 return None;
187 }
188 let page = pages.get_page(&self.leaf)?;
189 let cell = leaf_node::read_cell(page, self.cell_idx);
190 Some(CursorEntry {
191 key: cell.key.to_vec(),
192 val_type: cell.val_type,
193 value: cell.value.to_vec(),
194 })
195 }
196
197 pub fn current_ref<'a, P: PageMap>(&self, pages: &'a P) -> Option<LeafCell<'a>> {
198 if !self.valid {
199 return None;
200 }
201 let page = pages.get_page(&self.leaf)?;
202 Some(leaf_node::read_cell(page, self.cell_idx))
203 }
204
205 pub fn next(&mut self, pages: &impl PageMap) -> Result<bool> {
207 if !self.valid {
208 return Ok(false);
209 }
210
211 let page = pages
212 .get_page(&self.leaf)
213 .ok_or(Error::PageOutOfBounds(self.leaf))?;
214
215 if self.cell_idx + 1 < page.num_cells() {
216 self.cell_idx += 1;
217 return Ok(true);
218 }
219
220 self.advance_leaf(pages)
221 }
222
223 pub fn prev(&mut self, pages: &impl PageMap) -> Result<bool> {
225 if !self.valid {
226 return Ok(false);
227 }
228
229 if self.cell_idx > 0 {
230 self.cell_idx -= 1;
231 return Ok(true);
232 }
233
234 self.retreat_leaf(pages)
235 }
236
237 fn advance_leaf(&mut self, pages: &impl PageMap) -> Result<bool> {
239 while let Some((parent_id, child_idx)) = self.path.pop() {
240 let parent = pages
241 .get_page(&parent_id)
242 .ok_or(Error::PageOutOfBounds(parent_id))?;
243 let n = parent.num_cells() as usize;
244
245 if child_idx < n {
246 let next_child_idx = child_idx + 1;
247 let next_child = branch_node::get_child(parent, next_child_idx);
248 self.path.push((parent_id, next_child_idx));
249
250 let mut current = next_child;
251 loop {
252 let page = pages
253 .get_page(¤t)
254 .ok_or(Error::PageOutOfBounds(current))?;
255 match page.page_type() {
256 Some(PageType::Leaf) => {
257 self.leaf = current;
258 self.cell_idx = 0;
259 self.valid = page.num_cells() > 0;
260 return Ok(self.valid);
261 }
262 Some(PageType::Branch) => {
263 let child = branch_node::get_child(page, 0);
264 self.path.push((current, 0));
265 current = child;
266 }
267 _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
268 }
269 }
270 }
271 }
273
274 self.valid = false;
276 Ok(false)
277 }
278
279 pub fn seek_lazy(pages: &mut impl PageLoader, root: PageId, key: &[u8]) -> Result<Self> {
281 let mut path = Vec::new();
282 let mut current = root;
283
284 loop {
285 pages.ensure_loaded(current)?;
286 let page = pages
287 .get_page(¤t)
288 .ok_or(Error::PageOutOfBounds(current))?;
289 match page.page_type() {
290 Some(PageType::Leaf) => break,
291 Some(PageType::Branch) => {
292 let child_idx = branch_node::search_child_index(page, key);
293 let child = branch_node::get_child(page, child_idx);
294 path.push((current, child_idx));
295 current = child;
296 }
297 _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
298 }
299 }
300
301 let page = pages.get_page(¤t).unwrap();
302 let cell_idx = match leaf_node::search(page, key) {
303 Ok(idx) => idx,
304 Err(idx) => idx,
305 };
306
307 let valid = cell_idx < page.num_cells();
308
309 let mut cursor = Self {
310 path,
311 leaf: current,
312 cell_idx,
313 valid,
314 };
315
316 if !valid && page.num_cells() > 0 {
317 cursor.advance_leaf_lazy(pages)?;
318 } else if page.num_cells() == 0 {
319 cursor.valid = false;
320 }
321
322 Ok(cursor)
323 }
324
325 pub fn current_ref_lazy<'a, P: PageLoader + ?Sized>(
327 &self,
328 pages: &'a mut P,
329 ) -> Option<LeafCell<'a>> {
330 if !self.valid {
331 return None;
332 }
333 pages.ensure_loaded(self.leaf).ok()?;
334 let page = pages.get_page(&self.leaf)?;
335 Some(leaf_node::read_cell(page, self.cell_idx))
336 }
337
338 pub fn next_lazy<P: PageLoader + ?Sized>(&mut self, pages: &mut P) -> Result<bool> {
340 if !self.valid {
341 return Ok(false);
342 }
343
344 pages.ensure_loaded(self.leaf)?;
345 let page = pages
346 .get_page(&self.leaf)
347 .ok_or(Error::PageOutOfBounds(self.leaf))?;
348
349 if self.cell_idx + 1 < page.num_cells() {
350 self.cell_idx += 1;
351 return Ok(true);
352 }
353
354 self.advance_leaf_lazy(pages)
355 }
356
357 fn advance_leaf_lazy<P: PageLoader + ?Sized>(&mut self, pages: &mut P) -> Result<bool> {
359 while let Some((parent_id, child_idx)) = self.path.pop() {
360 let parent = pages
361 .get_page(&parent_id)
362 .ok_or(Error::PageOutOfBounds(parent_id))?;
363 let n = parent.num_cells() as usize;
364
365 if child_idx < n {
366 let next_child_idx = child_idx + 1;
367 let next_child = branch_node::get_child(parent, next_child_idx);
368 self.path.push((parent_id, next_child_idx));
369
370 let mut current = next_child;
371 loop {
372 pages.ensure_loaded(current)?;
373 let page = pages
374 .get_page(¤t)
375 .ok_or(Error::PageOutOfBounds(current))?;
376 match page.page_type() {
377 Some(PageType::Leaf) => {
378 self.leaf = current;
379 self.cell_idx = 0;
380 self.valid = page.num_cells() > 0;
381 return Ok(self.valid);
382 }
383 Some(PageType::Branch) => {
384 let child = branch_node::get_child(page, 0);
385 self.path.push((current, 0));
386 current = child;
387 }
388 _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
389 }
390 }
391 }
392 }
393
394 self.valid = false;
395 Ok(false)
396 }
397
398 fn retreat_leaf(&mut self, pages: &impl PageMap) -> Result<bool> {
400 while let Some((parent_id, child_idx)) = self.path.pop() {
401 if child_idx > 0 {
402 let prev_child_idx = child_idx - 1;
403 let parent = pages
404 .get_page(&parent_id)
405 .ok_or(Error::PageOutOfBounds(parent_id))?;
406 let prev_child = branch_node::get_child(parent, prev_child_idx);
407 self.path.push((parent_id, prev_child_idx));
408
409 let mut current = prev_child;
410 loop {
411 let page = pages
412 .get_page(¤t)
413 .ok_or(Error::PageOutOfBounds(current))?;
414 match page.page_type() {
415 Some(PageType::Leaf) => {
416 self.leaf = current;
417 let n = page.num_cells();
418 if n > 0 {
419 self.cell_idx = n - 1;
420 self.valid = true;
421 } else {
422 self.valid = false;
423 }
424 return Ok(self.valid);
425 }
426 Some(PageType::Branch) => {
427 let n = page.num_cells() as usize;
428 let child = page.right_child();
429 self.path.push((current, n));
430 current = child;
431 }
432 _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
433 }
434 }
435 }
436 }
438
439 self.valid = false;
441 Ok(false)
442 }
443}
444
445#[cfg(test)]
446mod tests {
447 use super::*;
448 use crate::allocator::PageAllocator;
449 use crate::btree::BTree;
450 use citadel_core::types::TxnId;
451
452 fn build_tree(keys: &[&[u8]]) -> (rustc_hash::FxHashMap<PageId, Page>, BTree) {
453 let mut pages = rustc_hash::FxHashMap::default();
454 let mut alloc = PageAllocator::new(0);
455 let mut tree = BTree::new(&mut pages, &mut alloc, TxnId(1));
456 for k in keys {
457 tree.insert(&mut pages, &mut alloc, TxnId(1), k, ValueType::Inline, k)
458 .unwrap();
459 }
460 (pages, tree)
461 }
462
463 #[test]
464 fn cursor_forward_iteration() {
465 let (pages, tree) = build_tree(&[b"c", b"a", b"e", b"b", b"d"]);
466 let mut cursor = Cursor::first(&pages, tree.root).unwrap();
467
468 let mut collected = Vec::new();
469 while cursor.is_valid() {
470 let entry = cursor.current(&pages).unwrap();
471 collected.push(entry.key.clone());
472 cursor.next(&pages).unwrap();
473 }
474
475 assert_eq!(collected, vec![b"a", b"b", b"c", b"d", b"e"]);
476 }
477
478 #[test]
479 fn cursor_backward_iteration() {
480 let (pages, tree) = build_tree(&[b"c", b"a", b"e", b"b", b"d"]);
481 let mut cursor = Cursor::last(&pages, tree.root).unwrap();
482
483 let mut collected = Vec::new();
484 while cursor.is_valid() {
485 let entry = cursor.current(&pages).unwrap();
486 collected.push(entry.key.clone());
487 cursor.prev(&pages).unwrap();
488 }
489
490 assert_eq!(collected, vec![b"e", b"d", b"c", b"b", b"a"]);
491 }
492
493 #[test]
494 fn cursor_seek() {
495 let (pages, tree) = build_tree(&[b"b", b"d", b"f", b"h"]);
496 let cursor = Cursor::seek(&pages, tree.root, b"c").unwrap();
498 assert!(cursor.is_valid());
499 let entry = cursor.current(&pages).unwrap();
500 assert_eq!(entry.key, b"d");
501 }
502
503 #[test]
504 fn cursor_seek_exact() {
505 let (pages, tree) = build_tree(&[b"b", b"d", b"f"]);
506 let cursor = Cursor::seek(&pages, tree.root, b"d").unwrap();
507 assert!(cursor.is_valid());
508 let entry = cursor.current(&pages).unwrap();
509 assert_eq!(entry.key, b"d");
510 }
511
512 #[test]
513 fn cursor_seek_past_end() {
514 let (pages, tree) = build_tree(&[b"a", b"b", b"c"]);
515 let cursor = Cursor::seek(&pages, tree.root, b"z").unwrap();
516 assert!(!cursor.is_valid());
517 }
518
519 #[test]
520 fn cursor_empty_tree() {
521 let mut pages = rustc_hash::FxHashMap::default();
522 let mut alloc = PageAllocator::new(0);
523 let tree = BTree::new(&mut pages, &mut alloc, TxnId(1));
524
525 let cursor = Cursor::first(&pages, tree.root).unwrap();
526 assert!(!cursor.is_valid());
527 }
528
529 struct TrackingLoader {
531 pages: rustc_hash::FxHashMap<PageId, Page>,
532 touched: std::collections::HashSet<PageId>,
533 }
534
535 impl TrackingLoader {
536 fn new(pages: rustc_hash::FxHashMap<PageId, Page>) -> Self {
537 Self {
538 pages,
539 touched: std::collections::HashSet::new(),
540 }
541 }
542 fn unique_pages_touched(&self) -> usize {
543 self.touched.len()
544 }
545 }
546
547 impl PageMap for TrackingLoader {
548 fn get_page(&self, id: &PageId) -> Option<&Page> {
549 self.pages.get(id)
550 }
551 }
552
553 impl PageLoader for TrackingLoader {
554 fn ensure_loaded(&mut self, id: PageId) -> citadel_core::Result<()> {
555 if self.pages.contains_key(&id) {
556 self.touched.insert(id);
557 Ok(())
558 } else {
559 Err(citadel_core::Error::PageOutOfBounds(id))
560 }
561 }
562 }
563
564 #[test]
565 fn lazy_cursor_forward() {
566 let keys: Vec<Vec<u8>> = (0..2000u32)
567 .map(|i| format!("{i:06}").into_bytes())
568 .collect();
569 let key_refs: Vec<&[u8]> = keys.iter().map(|k| k.as_slice()).collect();
570 let (pages, tree) = build_tree(&key_refs);
571
572 let mut loader = TrackingLoader::new(pages);
573 let mut cursor = Cursor::seek_lazy(&mut loader, tree.root, b"").unwrap();
574 let mut count = 0u32;
575 while cursor.is_valid() {
576 let entry = cursor.current_ref_lazy(&mut loader);
577 assert!(entry.is_some());
578 count += 1;
579 cursor.next_lazy(&mut loader).unwrap();
580 }
581 assert_eq!(count, 2000);
582 }
583
584 #[test]
585 fn lazy_cursor_range_loads_fewer_pages() {
586 let keys: Vec<Vec<u8>> = (0..2000u32)
587 .map(|i| format!("{i:06}").into_bytes())
588 .collect();
589 let key_refs: Vec<&[u8]> = keys.iter().map(|k| k.as_slice()).collect();
590 let (pages, tree) = build_tree(&key_refs);
591 let total_pages = pages.len();
592
593 let mut loader = TrackingLoader::new(pages);
594 let mut cursor = Cursor::seek_lazy(&mut loader, tree.root, b"001000").unwrap();
595 let mut count = 0u32;
596 while cursor.is_valid() {
597 if let Some(entry) = cursor.current_ref_lazy(&mut loader) {
598 if entry.key > b"001009".as_slice() {
599 break;
600 }
601 count += 1;
602 }
603 cursor.next_lazy(&mut loader).unwrap();
604 }
605 assert_eq!(count, 10);
606 let touched = loader.unique_pages_touched();
608 assert!(
609 touched < total_pages,
610 "lazy touched {} unique pages but tree has {} total",
611 touched,
612 total_pages,
613 );
614 }
615
616 #[test]
617 fn cursor_large_tree_forward() {
618 let keys: Vec<Vec<u8>> = (0..2000u32)
619 .map(|i| format!("{i:06}").into_bytes())
620 .collect();
621 let key_refs: Vec<&[u8]> = keys.iter().map(|k| k.as_slice()).collect();
622 let (pages, tree) = build_tree(&key_refs);
623
624 let mut cursor = Cursor::first(&pages, tree.root).unwrap();
625 let mut count = 0u32;
626 let mut prev_key: Option<Vec<u8>> = None;
627 while cursor.is_valid() {
628 let entry = cursor.current(&pages).unwrap();
629 if let Some(ref pk) = prev_key {
630 assert!(entry.key > *pk, "keys should be in sorted order");
631 }
632 prev_key = Some(entry.key);
633 count += 1;
634 cursor.next(&pages).unwrap();
635 }
636 assert_eq!(count, 2000);
637 }
638}