1use std::collections::HashMap;
7use std::sync::Arc;
8
9use citadel_core::types::{PageId, PageType, ValueType};
10use citadel_core::{Error, Result};
11use citadel_page::leaf_node::LeafCell;
12use citadel_page::page::Page;
13use citadel_page::{branch_node, leaf_node};
14
15pub trait PageMap {
16 fn get_page(&self, id: &PageId) -> Option<&Page>;
17}
18
19impl PageMap for HashMap<PageId, Page> {
20 fn get_page(&self, id: &PageId) -> Option<&Page> {
21 self.get(id)
22 }
23}
24
25impl PageMap for HashMap<PageId, Arc<Page>> {
26 fn get_page(&self, id: &PageId) -> Option<&Page> {
27 self.get(id).map(|a| a.as_ref())
28 }
29}
30
31pub struct CursorEntry {
32 pub key: Vec<u8>,
33 pub val_type: ValueType,
34 pub value: Vec<u8>,
35}
36
37pub struct Cursor {
40 path: Vec<(PageId, usize)>,
42 leaf: PageId,
44 cell_idx: u16,
46 valid: bool,
48}
49
50impl Cursor {
51 pub fn seek(pages: &impl PageMap, root: PageId, key: &[u8]) -> Result<Self> {
54 let mut path = Vec::new();
55 let mut current = root;
56
57 loop {
59 let page = pages
60 .get_page(¤t)
61 .ok_or(Error::PageOutOfBounds(current))?;
62 match page.page_type() {
63 Some(PageType::Leaf) => break,
64 Some(PageType::Branch) => {
65 let child_idx = branch_node::search_child_index(page, key);
66 let child = branch_node::get_child(page, child_idx);
67 path.push((current, child_idx));
68 current = child;
69 }
70 _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
71 }
72 }
73
74 let page = pages.get_page(¤t).unwrap();
76 let cell_idx = match leaf_node::search(page, key) {
77 Ok(idx) => idx,
78 Err(idx) => idx,
79 };
80
81 let valid = cell_idx < page.num_cells();
82
83 let mut cursor = Self {
84 path,
85 leaf: current,
86 cell_idx,
87 valid,
88 };
89
90 if !valid && page.num_cells() > 0 {
92 cursor.advance_leaf(pages)?;
93 } else if page.num_cells() == 0 {
94 cursor.valid = false;
95 }
96
97 Ok(cursor)
98 }
99
100 pub fn first(pages: &impl PageMap, root: PageId) -> Result<Self> {
102 let mut path = Vec::new();
103 let mut current = root;
104
105 loop {
107 let page = pages
108 .get_page(¤t)
109 .ok_or(Error::PageOutOfBounds(current))?;
110 match page.page_type() {
111 Some(PageType::Leaf) => break,
112 Some(PageType::Branch) => {
113 let child = branch_node::get_child(page, 0);
114 path.push((current, 0));
115 current = child;
116 }
117 _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
118 }
119 }
120
121 let page = pages.get_page(¤t).unwrap();
122 let valid = page.num_cells() > 0;
123
124 Ok(Self {
125 path,
126 leaf: current,
127 cell_idx: 0,
128 valid,
129 })
130 }
131
132 pub fn last(pages: &impl PageMap, root: PageId) -> Result<Self> {
134 let mut path = Vec::new();
135 let mut current = root;
136
137 loop {
139 let page = pages
140 .get_page(¤t)
141 .ok_or(Error::PageOutOfBounds(current))?;
142 match page.page_type() {
143 Some(PageType::Leaf) => break,
144 Some(PageType::Branch) => {
145 let n = page.num_cells() as usize;
146 let child = page.right_child();
147 path.push((current, n));
148 current = child;
149 }
150 _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
151 }
152 }
153
154 let page = pages.get_page(¤t).unwrap();
155 let n = page.num_cells();
156 let valid = n > 0;
157 let cell_idx = if valid { n - 1 } else { 0 };
158
159 Ok(Self {
160 path,
161 leaf: current,
162 cell_idx,
163 valid,
164 })
165 }
166
167 pub fn is_valid(&self) -> bool {
169 self.valid
170 }
171
172 pub fn current(&self, pages: &impl PageMap) -> Option<CursorEntry> {
173 if !self.valid {
174 return None;
175 }
176 let page = pages.get_page(&self.leaf)?;
177 let cell = leaf_node::read_cell(page, self.cell_idx);
178 Some(CursorEntry {
179 key: cell.key.to_vec(),
180 val_type: cell.val_type,
181 value: cell.value.to_vec(),
182 })
183 }
184
185 pub fn current_ref<'a, P: PageMap>(&self, pages: &'a P) -> Option<LeafCell<'a>> {
186 if !self.valid {
187 return None;
188 }
189 let page = pages.get_page(&self.leaf)?;
190 Some(leaf_node::read_cell(page, self.cell_idx))
191 }
192
193 pub fn next(&mut self, pages: &impl PageMap) -> Result<bool> {
195 if !self.valid {
196 return Ok(false);
197 }
198
199 let page = pages
200 .get_page(&self.leaf)
201 .ok_or(Error::PageOutOfBounds(self.leaf))?;
202
203 if self.cell_idx + 1 < page.num_cells() {
204 self.cell_idx += 1;
205 return Ok(true);
206 }
207
208 self.advance_leaf(pages)
210 }
211
212 pub fn prev(&mut self, pages: &impl PageMap) -> Result<bool> {
214 if !self.valid {
215 return Ok(false);
216 }
217
218 if self.cell_idx > 0 {
219 self.cell_idx -= 1;
220 return Ok(true);
221 }
222
223 self.retreat_leaf(pages)
225 }
226
227 fn advance_leaf(&mut self, pages: &impl PageMap) -> Result<bool> {
229 while let Some((parent_id, child_idx)) = self.path.pop() {
231 let parent = pages
232 .get_page(&parent_id)
233 .ok_or(Error::PageOutOfBounds(parent_id))?;
234 let n = parent.num_cells() as usize;
235
236 if child_idx < n {
237 let next_child_idx = child_idx + 1;
239 let next_child = branch_node::get_child(parent, next_child_idx);
240 self.path.push((parent_id, next_child_idx));
241
242 let mut current = next_child;
244 loop {
245 let page = pages
246 .get_page(¤t)
247 .ok_or(Error::PageOutOfBounds(current))?;
248 match page.page_type() {
249 Some(PageType::Leaf) => {
250 self.leaf = current;
251 self.cell_idx = 0;
252 self.valid = page.num_cells() > 0;
253 return Ok(self.valid);
254 }
255 Some(PageType::Branch) => {
256 let child = branch_node::get_child(page, 0);
257 self.path.push((current, 0));
258 current = child;
259 }
260 _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
261 }
262 }
263 }
264 }
266
267 self.valid = false;
269 Ok(false)
270 }
271
272 fn retreat_leaf(&mut self, pages: &impl PageMap) -> Result<bool> {
274 while let Some((parent_id, child_idx)) = self.path.pop() {
276 if child_idx > 0 {
277 let prev_child_idx = child_idx - 1;
279 let parent = pages
280 .get_page(&parent_id)
281 .ok_or(Error::PageOutOfBounds(parent_id))?;
282 let prev_child = branch_node::get_child(parent, prev_child_idx);
283 self.path.push((parent_id, prev_child_idx));
284
285 let mut current = prev_child;
287 loop {
288 let page = pages
289 .get_page(¤t)
290 .ok_or(Error::PageOutOfBounds(current))?;
291 match page.page_type() {
292 Some(PageType::Leaf) => {
293 self.leaf = current;
294 let n = page.num_cells();
295 if n > 0 {
296 self.cell_idx = n - 1;
297 self.valid = true;
298 } else {
299 self.valid = false;
300 }
301 return Ok(self.valid);
302 }
303 Some(PageType::Branch) => {
304 let n = page.num_cells() as usize;
305 let child = page.right_child();
306 self.path.push((current, n));
307 current = child;
308 }
309 _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
310 }
311 }
312 }
313 }
315
316 self.valid = false;
318 Ok(false)
319 }
320}
321
#[cfg(test)]
mod tests {
    use super::*;
    use crate::allocator::PageAllocator;
    use crate::btree::BTree;
    use citadel_core::types::TxnId;

    /// Builds a tree holding every key in `keys`, each stored inline with the
    /// key bytes doubling as the value.
    fn build_tree(keys: &[&[u8]]) -> (HashMap<PageId, Page>, BTree) {
        let mut pages = HashMap::new();
        let mut alloc = PageAllocator::new(0);
        let mut tree = BTree::new(&mut pages, &mut alloc, TxnId(1));
        for &key in keys {
            tree.insert(&mut pages, &mut alloc, TxnId(1), key, ValueType::Inline, key)
                .unwrap();
        }
        (pages, tree)
    }

    /// Collects the key under `cursor`, then applies `step`, until the cursor
    /// turns invalid.
    fn collect_keys(
        mut cursor: Cursor,
        pages: &HashMap<PageId, Page>,
        mut step: impl FnMut(&mut Cursor) -> Result<bool>,
    ) -> Vec<Vec<u8>> {
        let mut keys = Vec::new();
        while cursor.is_valid() {
            keys.push(cursor.current(pages).unwrap().key);
            step(&mut cursor).unwrap();
        }
        keys
    }

    #[test]
    fn cursor_forward_iteration() {
        let (pages, tree) = build_tree(&[b"c", b"a", b"e", b"b", b"d"]);
        let start = Cursor::first(&pages, tree.root).unwrap();

        let collected = collect_keys(start, &pages, |cursor| cursor.next(&pages));

        assert_eq!(collected, vec![b"a", b"b", b"c", b"d", b"e"]);
    }

    #[test]
    fn cursor_backward_iteration() {
        let (pages, tree) = build_tree(&[b"c", b"a", b"e", b"b", b"d"]);
        let start = Cursor::last(&pages, tree.root).unwrap();

        let collected = collect_keys(start, &pages, |cursor| cursor.prev(&pages));

        assert_eq!(collected, vec![b"e", b"d", b"c", b"b", b"a"]);
    }

    #[test]
    fn cursor_seek() {
        let (pages, tree) = build_tree(&[b"b", b"d", b"f", b"h"]);

        // "c" is absent, so the cursor must land on the next key, "d".
        let cursor = Cursor::seek(&pages, tree.root, b"c").unwrap();

        assert!(cursor.is_valid());
        let found = cursor.current(&pages).unwrap();
        assert_eq!(found.key, b"d");
    }

    #[test]
    fn cursor_seek_exact() {
        let (pages, tree) = build_tree(&[b"b", b"d", b"f"]);

        let cursor = Cursor::seek(&pages, tree.root, b"d").unwrap();

        assert!(cursor.is_valid());
        let found = cursor.current(&pages).unwrap();
        assert_eq!(found.key, b"d");
    }

    #[test]
    fn cursor_seek_past_end() {
        let (pages, tree) = build_tree(&[b"a", b"b", b"c"]);

        let cursor = Cursor::seek(&pages, tree.root, b"z").unwrap();

        assert!(!cursor.is_valid());
    }

    #[test]
    fn cursor_empty_tree() {
        let mut pages = HashMap::new();
        let mut alloc = PageAllocator::new(0);
        let tree = BTree::new(&mut pages, &mut alloc, TxnId(1));

        let cursor = Cursor::first(&pages, tree.root).unwrap();

        assert!(!cursor.is_valid());
    }

    #[test]
    fn cursor_large_tree_forward() {
        // Zero-padded decimal keys so byte order equals numeric order.
        let keys: Vec<Vec<u8>> = (0..2000u32)
            .map(|i| format!("{i:06}").into_bytes())
            .collect();
        let key_refs: Vec<&[u8]> = keys.iter().map(Vec::as_slice).collect();
        let (pages, tree) = build_tree(&key_refs);

        let mut cursor = Cursor::first(&pages, tree.root).unwrap();
        let mut seen = 0u32;
        let mut previous: Option<Vec<u8>> = None;
        while cursor.is_valid() {
            let key = cursor.current(&pages).unwrap().key;
            if let Some(earlier) = previous.as_ref() {
                assert!(key > *earlier, "keys should be in sorted order");
            }
            previous = Some(key);
            seen += 1;
            cursor.next(&pages).unwrap();
        }
        assert_eq!(seen, 2000);
    }
}
428}