use std::collections::HashMap;
use std::sync::Arc;
use citadel_core::types::{PageId, PageType, ValueType};
use citadel_core::{Error, Result};
use citadel_page::leaf_node::LeafCell;
use citadel_page::page::Page;
use citadel_page::{branch_node, leaf_node};
pub trait PageMap {
fn get_page(&self, id: &PageId) -> Option<&Page>;
}
pub trait PageLoader: PageMap {
fn ensure_loaded(&mut self, id: PageId) -> Result<()>;
}
impl PageMap for HashMap<PageId, Page> {
fn get_page(&self, id: &PageId) -> Option<&Page> {
self.get(id)
}
}
impl PageMap for HashMap<PageId, Arc<Page>> {
fn get_page(&self, id: &PageId) -> Option<&Page> {
self.get(id).map(|a| a.as_ref())
}
}
pub struct CursorEntry {
pub key: Vec<u8>,
pub val_type: ValueType,
pub value: Vec<u8>,
}
pub struct Cursor {
path: Vec<(PageId, usize)>,
leaf: PageId,
cell_idx: u16,
valid: bool,
}
impl Cursor {
pub fn seek(pages: &impl PageMap, root: PageId, key: &[u8]) -> Result<Self> {
let mut path = Vec::new();
let mut current = root;
loop {
let page = pages
.get_page(¤t)
.ok_or(Error::PageOutOfBounds(current))?;
match page.page_type() {
Some(PageType::Leaf) => break,
Some(PageType::Branch) => {
let child_idx = branch_node::search_child_index(page, key);
let child = branch_node::get_child(page, child_idx);
path.push((current, child_idx));
current = child;
}
_ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
}
}
let page = pages.get_page(¤t).unwrap();
let cell_idx = match leaf_node::search(page, key) {
Ok(idx) => idx,
Err(idx) => idx,
};
let valid = cell_idx < page.num_cells();
let mut cursor = Self {
path,
leaf: current,
cell_idx,
valid,
};
if !valid && page.num_cells() > 0 {
cursor.advance_leaf(pages)?;
} else if page.num_cells() == 0 {
cursor.valid = false;
}
Ok(cursor)
}
pub fn first(pages: &impl PageMap, root: PageId) -> Result<Self> {
let mut path = Vec::new();
let mut current = root;
loop {
let page = pages
.get_page(¤t)
.ok_or(Error::PageOutOfBounds(current))?;
match page.page_type() {
Some(PageType::Leaf) => break,
Some(PageType::Branch) => {
let child = branch_node::get_child(page, 0);
path.push((current, 0));
current = child;
}
_ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
}
}
let page = pages.get_page(¤t).unwrap();
let valid = page.num_cells() > 0;
Ok(Self {
path,
leaf: current,
cell_idx: 0,
valid,
})
}
pub fn last(pages: &impl PageMap, root: PageId) -> Result<Self> {
let mut path = Vec::new();
let mut current = root;
loop {
let page = pages
.get_page(¤t)
.ok_or(Error::PageOutOfBounds(current))?;
match page.page_type() {
Some(PageType::Leaf) => break,
Some(PageType::Branch) => {
let n = page.num_cells() as usize;
let child = page.right_child();
path.push((current, n));
current = child;
}
_ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
}
}
let page = pages.get_page(¤t).unwrap();
let n = page.num_cells();
let valid = n > 0;
let cell_idx = if valid { n - 1 } else { 0 };
Ok(Self {
path,
leaf: current,
cell_idx,
valid,
})
}
pub fn is_valid(&self) -> bool {
self.valid
}
pub fn leaf_page_id(&self) -> PageId {
self.leaf
}
pub fn cell_index(&self) -> u16 {
self.cell_idx
}
pub fn set_leaf_page_id(&mut self, id: PageId) {
self.leaf = id;
}
pub fn current(&self, pages: &impl PageMap) -> Option<CursorEntry> {
if !self.valid {
return None;
}
let page = pages.get_page(&self.leaf)?;
let cell = leaf_node::read_cell(page, self.cell_idx);
Some(CursorEntry {
key: cell.key.to_vec(),
val_type: cell.val_type,
value: cell.value.to_vec(),
})
}
pub fn current_ref<'a, P: PageMap>(&self, pages: &'a P) -> Option<LeafCell<'a>> {
if !self.valid {
return None;
}
let page = pages.get_page(&self.leaf)?;
Some(leaf_node::read_cell(page, self.cell_idx))
}
pub fn next(&mut self, pages: &impl PageMap) -> Result<bool> {
if !self.valid {
return Ok(false);
}
let page = pages
.get_page(&self.leaf)
.ok_or(Error::PageOutOfBounds(self.leaf))?;
if self.cell_idx + 1 < page.num_cells() {
self.cell_idx += 1;
return Ok(true);
}
self.advance_leaf(pages)
}
pub fn prev(&mut self, pages: &impl PageMap) -> Result<bool> {
if !self.valid {
return Ok(false);
}
if self.cell_idx > 0 {
self.cell_idx -= 1;
return Ok(true);
}
self.retreat_leaf(pages)
}
fn advance_leaf(&mut self, pages: &impl PageMap) -> Result<bool> {
while let Some((parent_id, child_idx)) = self.path.pop() {
let parent = pages
.get_page(&parent_id)
.ok_or(Error::PageOutOfBounds(parent_id))?;
let n = parent.num_cells() as usize;
if child_idx < n {
let next_child_idx = child_idx + 1;
let next_child = branch_node::get_child(parent, next_child_idx);
self.path.push((parent_id, next_child_idx));
let mut current = next_child;
loop {
let page = pages
.get_page(¤t)
.ok_or(Error::PageOutOfBounds(current))?;
match page.page_type() {
Some(PageType::Leaf) => {
self.leaf = current;
self.cell_idx = 0;
self.valid = page.num_cells() > 0;
return Ok(self.valid);
}
Some(PageType::Branch) => {
let child = branch_node::get_child(page, 0);
self.path.push((current, 0));
current = child;
}
_ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
}
}
}
}
self.valid = false;
Ok(false)
}
pub fn seek_lazy(pages: &mut impl PageLoader, root: PageId, key: &[u8]) -> Result<Self> {
let mut path = Vec::new();
let mut current = root;
loop {
pages.ensure_loaded(current)?;
let page = pages
.get_page(¤t)
.ok_or(Error::PageOutOfBounds(current))?;
match page.page_type() {
Some(PageType::Leaf) => break,
Some(PageType::Branch) => {
let child_idx = branch_node::search_child_index(page, key);
let child = branch_node::get_child(page, child_idx);
path.push((current, child_idx));
current = child;
}
_ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
}
}
let page = pages.get_page(¤t).unwrap();
let cell_idx = match leaf_node::search(page, key) {
Ok(idx) => idx,
Err(idx) => idx,
};
let valid = cell_idx < page.num_cells();
let mut cursor = Self {
path,
leaf: current,
cell_idx,
valid,
};
if !valid && page.num_cells() > 0 {
cursor.advance_leaf_lazy(pages)?;
} else if page.num_cells() == 0 {
cursor.valid = false;
}
Ok(cursor)
}
pub fn current_ref_lazy<'a, P: PageLoader>(&self, pages: &'a mut P) -> Option<LeafCell<'a>> {
if !self.valid {
return None;
}
pages.ensure_loaded(self.leaf).ok()?;
let page = pages.get_page(&self.leaf)?;
Some(leaf_node::read_cell(page, self.cell_idx))
}
pub fn next_lazy(&mut self, pages: &mut impl PageLoader) -> Result<bool> {
if !self.valid {
return Ok(false);
}
pages.ensure_loaded(self.leaf)?;
let page = pages
.get_page(&self.leaf)
.ok_or(Error::PageOutOfBounds(self.leaf))?;
if self.cell_idx + 1 < page.num_cells() {
self.cell_idx += 1;
return Ok(true);
}
self.advance_leaf_lazy(pages)
}
fn advance_leaf_lazy(&mut self, pages: &mut impl PageLoader) -> Result<bool> {
while let Some((parent_id, child_idx)) = self.path.pop() {
let parent = pages
.get_page(&parent_id)
.ok_or(Error::PageOutOfBounds(parent_id))?;
let n = parent.num_cells() as usize;
if child_idx < n {
let next_child_idx = child_idx + 1;
let next_child = branch_node::get_child(parent, next_child_idx);
self.path.push((parent_id, next_child_idx));
let mut current = next_child;
loop {
pages.ensure_loaded(current)?;
let page = pages
.get_page(¤t)
.ok_or(Error::PageOutOfBounds(current))?;
match page.page_type() {
Some(PageType::Leaf) => {
self.leaf = current;
self.cell_idx = 0;
self.valid = page.num_cells() > 0;
return Ok(self.valid);
}
Some(PageType::Branch) => {
let child = branch_node::get_child(page, 0);
self.path.push((current, 0));
current = child;
}
_ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
}
}
}
}
self.valid = false;
Ok(false)
}
fn retreat_leaf(&mut self, pages: &impl PageMap) -> Result<bool> {
while let Some((parent_id, child_idx)) = self.path.pop() {
if child_idx > 0 {
let prev_child_idx = child_idx - 1;
let parent = pages
.get_page(&parent_id)
.ok_or(Error::PageOutOfBounds(parent_id))?;
let prev_child = branch_node::get_child(parent, prev_child_idx);
self.path.push((parent_id, prev_child_idx));
let mut current = prev_child;
loop {
let page = pages
.get_page(¤t)
.ok_or(Error::PageOutOfBounds(current))?;
match page.page_type() {
Some(PageType::Leaf) => {
self.leaf = current;
let n = page.num_cells();
if n > 0 {
self.cell_idx = n - 1;
self.valid = true;
} else {
self.valid = false;
}
return Ok(self.valid);
}
Some(PageType::Branch) => {
let n = page.num_cells() as usize;
let child = page.right_child();
self.path.push((current, n));
current = child;
}
_ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
}
}
}
}
self.valid = false;
Ok(false)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::allocator::PageAllocator;
use crate::btree::BTree;
use citadel_core::types::TxnId;
fn build_tree(keys: &[&[u8]]) -> (HashMap<PageId, Page>, BTree) {
let mut pages = HashMap::new();
let mut alloc = PageAllocator::new(0);
let mut tree = BTree::new(&mut pages, &mut alloc, TxnId(1));
for k in keys {
tree.insert(&mut pages, &mut alloc, TxnId(1), k, ValueType::Inline, k)
.unwrap();
}
(pages, tree)
}
#[test]
fn cursor_forward_iteration() {
let (pages, tree) = build_tree(&[b"c", b"a", b"e", b"b", b"d"]);
let mut cursor = Cursor::first(&pages, tree.root).unwrap();
let mut collected = Vec::new();
while cursor.is_valid() {
let entry = cursor.current(&pages).unwrap();
collected.push(entry.key.clone());
cursor.next(&pages).unwrap();
}
assert_eq!(collected, vec![b"a", b"b", b"c", b"d", b"e"]);
}
#[test]
fn cursor_backward_iteration() {
let (pages, tree) = build_tree(&[b"c", b"a", b"e", b"b", b"d"]);
let mut cursor = Cursor::last(&pages, tree.root).unwrap();
let mut collected = Vec::new();
while cursor.is_valid() {
let entry = cursor.current(&pages).unwrap();
collected.push(entry.key.clone());
cursor.prev(&pages).unwrap();
}
assert_eq!(collected, vec![b"e", b"d", b"c", b"b", b"a"]);
}
#[test]
fn cursor_seek() {
let (pages, tree) = build_tree(&[b"b", b"d", b"f", b"h"]);
let cursor = Cursor::seek(&pages, tree.root, b"c").unwrap();
assert!(cursor.is_valid());
let entry = cursor.current(&pages).unwrap();
assert_eq!(entry.key, b"d");
}
#[test]
fn cursor_seek_exact() {
let (pages, tree) = build_tree(&[b"b", b"d", b"f"]);
let cursor = Cursor::seek(&pages, tree.root, b"d").unwrap();
assert!(cursor.is_valid());
let entry = cursor.current(&pages).unwrap();
assert_eq!(entry.key, b"d");
}
#[test]
fn cursor_seek_past_end() {
let (pages, tree) = build_tree(&[b"a", b"b", b"c"]);
let cursor = Cursor::seek(&pages, tree.root, b"z").unwrap();
assert!(!cursor.is_valid());
}
#[test]
fn cursor_empty_tree() {
let mut pages = HashMap::new();
let mut alloc = PageAllocator::new(0);
let tree = BTree::new(&mut pages, &mut alloc, TxnId(1));
let cursor = Cursor::first(&pages, tree.root).unwrap();
assert!(!cursor.is_valid());
}
struct TrackingLoader {
pages: HashMap<PageId, Page>,
touched: std::collections::HashSet<PageId>,
}
impl TrackingLoader {
fn new(pages: HashMap<PageId, Page>) -> Self {
Self {
pages,
touched: std::collections::HashSet::new(),
}
}
fn unique_pages_touched(&self) -> usize {
self.touched.len()
}
}
impl PageMap for TrackingLoader {
fn get_page(&self, id: &PageId) -> Option<&Page> {
self.pages.get(id)
}
}
impl PageLoader for TrackingLoader {
fn ensure_loaded(&mut self, id: PageId) -> citadel_core::Result<()> {
if self.pages.contains_key(&id) {
self.touched.insert(id);
Ok(())
} else {
Err(citadel_core::Error::PageOutOfBounds(id))
}
}
}
#[test]
fn lazy_cursor_forward() {
let keys: Vec<Vec<u8>> = (0..2000u32)
.map(|i| format!("{i:06}").into_bytes())
.collect();
let key_refs: Vec<&[u8]> = keys.iter().map(|k| k.as_slice()).collect();
let (pages, tree) = build_tree(&key_refs);
let mut loader = TrackingLoader::new(pages);
let mut cursor = Cursor::seek_lazy(&mut loader, tree.root, b"").unwrap();
let mut count = 0u32;
while cursor.is_valid() {
let entry = cursor.current_ref_lazy(&mut loader);
assert!(entry.is_some());
count += 1;
cursor.next_lazy(&mut loader).unwrap();
}
assert_eq!(count, 2000);
}
#[test]
fn lazy_cursor_range_loads_fewer_pages() {
let keys: Vec<Vec<u8>> = (0..2000u32)
.map(|i| format!("{i:06}").into_bytes())
.collect();
let key_refs: Vec<&[u8]> = keys.iter().map(|k| k.as_slice()).collect();
let (pages, tree) = build_tree(&key_refs);
let total_pages = pages.len();
let mut loader = TrackingLoader::new(pages);
let mut cursor = Cursor::seek_lazy(&mut loader, tree.root, b"001000").unwrap();
let mut count = 0u32;
while cursor.is_valid() {
if let Some(entry) = cursor.current_ref_lazy(&mut loader) {
if entry.key > b"001009".as_slice() {
break;
}
count += 1;
}
cursor.next_lazy(&mut loader).unwrap();
}
assert_eq!(count, 10);
let touched = loader.unique_pages_touched();
assert!(
touched < total_pages,
"lazy touched {} unique pages but tree has {} total",
touched,
total_pages,
);
}
#[test]
fn cursor_large_tree_forward() {
let keys: Vec<Vec<u8>> = (0..2000u32)
.map(|i| format!("{i:06}").into_bytes())
.collect();
let key_refs: Vec<&[u8]> = keys.iter().map(|k| k.as_slice()).collect();
let (pages, tree) = build_tree(&key_refs);
let mut cursor = Cursor::first(&pages, tree.root).unwrap();
let mut count = 0u32;
let mut prev_key: Option<Vec<u8>> = None;
while cursor.is_valid() {
let entry = cursor.current(&pages).unwrap();
if let Some(ref pk) = prev_key {
assert!(entry.key > *pk, "keys should be in sorted order");
}
prev_key = Some(entry.key);
count += 1;
cursor.next(&pages).unwrap();
}
assert_eq!(count, 2000);
}
}