use super::node::{Node, NodeId};
use super::tree::BPlusTree;
use super::version::Snapshot;
use std::fmt::Debug;
use std::sync::{RwLock, RwLockReadGuard};
/// Takes a shared read guard on `lock`, ignoring lock poisoning.
///
/// If a previous holder panicked and poisoned the lock, the guard carried inside the
/// poison error is returned instead, so this function never fails.
fn recover_read_guard<'a, T>(lock: &'a RwLock<T>) -> RwLockReadGuard<'a, T> {
    lock.read().unwrap_or_else(|poison| poison.into_inner())
}
/// Direction tag stored on a `Cursor`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CursorDirection {
/// Assigned by `Cursor::new` and `Cursor::first`.
Forward,
/// Assigned by `Cursor::reverse` and `Cursor::last`.
Backward,
}
/// Position of a `Cursor` relative to the entries it walks.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CursorState {
/// Not on an entry, logically ahead of the first one. Initial state of `Cursor::new`;
/// also set when backward movement finds no earlier entry to load.
BeforeFirst,
/// Positioned on an entry; `key()`, `value()` and `entry()` return it.
Valid,
/// Not on an entry, logically past the last one. Initial state of `Cursor::reverse`;
/// also set when forward movement runs out of leaves.
AfterLast,
/// Set when an indexed load is attempted while no current leaf is selected.
Invalid,
}
/// A positioned reader over the leaves of a `BPlusTree`, reading values through a fixed
/// `Snapshot`.
///
/// Forward movement serves entries from `leaf_cache`; backward movement loads a single
/// entry at `current_index` of `current_leaf`.
pub struct Cursor<'a, K, V>
where
K: Clone + Ord + Debug + Send + Sync,
V: Clone + Debug + Send + Sync,
{
// Tree whose nodes are looked up with `get_node`.
tree: &'a BPlusTree<K, V>,
// Passed to each leaf entry's `get` to obtain the value this cursor reads, if any.
snapshot: Snapshot,
// Leaf the cursor is currently in; `None` until the cursor has been positioned.
current_leaf: Option<NodeId>,
// Index into the current leaf's keys: the start of the forward cache fill, and the slot
// loaded by backward movement.
current_index: usize,
// Direction tag; written by the constructors and by `first`/`last`.
direction: CursorDirection,
// Where the cursor stands relative to the entries (see `CursorState`).
state: CursorState,
// Cloned key/value pair exposed by `key()`, `value()` and `entry()`.
current_entry: Option<(K, V)>,
// Entries cloned from the current leaf by `fill_leaf_cache`, starting at `current_index`
// and keeping only those for which `get(&snapshot)` returned a value.
leaf_cache: Vec<(K, V)>,
// Position inside `leaf_cache` of the entry last served by forward movement.
leaf_cache_idx: usize,
}
impl<'a, K, V> Cursor<'a, K, V>
where
K: Clone + Ord + Debug + Send + Sync,
V: Clone + Debug + Send + Sync,
{
/// Builds an unpositioned forward cursor over `tree` that reads through `snapshot`.
///
/// The cursor starts in `CursorState::BeforeFirst` with no leaf selected, no current
/// entry and an empty leaf cache.
pub fn new(tree: &'a BPlusTree<K, V>, snapshot: Snapshot) -> Self {
    let leaf_cache = Vec::new();
    Self {
        state: CursorState::BeforeFirst,
        direction: CursorDirection::Forward,
        current_entry: None,
        current_leaf: None,
        current_index: 0,
        leaf_cache_idx: 0,
        leaf_cache,
        snapshot,
        tree,
    }
}
/// Builds a cursor over `tree` and immediately seeks it to `key`.
///
/// The outcome of the seek is not returned; inspect `state()` or `is_valid()` on the
/// resulting cursor to learn whether it landed on an entry.
pub fn at_key(tree: &'a BPlusTree<K, V>, snapshot: Snapshot, key: &K) -> Self {
    let mut positioned = Self::new(tree, snapshot);
    let _landed = positioned.seek(key);
    positioned
}
pub fn reverse(tree: &'a BPlusTree<K, V>, snapshot: Snapshot) -> Self {
Self {
tree,
snapshot,
current_leaf: None,
current_index: 0,
direction: CursorDirection::Backward,
state: CursorState::AfterLast,
current_entry: None,
leaf_cache: Vec::new(),
leaf_cache_idx: 0,
}
}
/// Returns the cursor's current `CursorState`.
pub fn state(&self) -> CursorState {
self.state
}
/// Returns `true` exactly when the cursor state is `CursorState::Valid`.
pub fn is_valid(&self) -> bool {
    matches!(self.state, CursorState::Valid)
}
/// Borrows the key of the stored current entry, or `None` when no entry is stored.
pub fn key(&self) -> Option<&K> {
    let (key, _) = self.current_entry.as_ref()?;
    Some(key)
}
/// Borrows the value of the stored current entry, or `None` when no entry is stored.
pub fn value(&self) -> Option<&V> {
    let (_, value) = self.current_entry.as_ref()?;
    Some(value)
}
/// Borrows the stored current entry as a `(key, value)` pair, or `None` when no entry
/// is stored.
pub fn entry(&self) -> Option<(&K, &V)> {
    let (key, value) = self.current_entry.as_ref()?;
    Some((key, value))
}
/// Positions the cursor on the first entry that has a value under the snapshot.
///
/// Starts at the tree's `first_leaf`, tags the cursor `Forward`, resets the leaf cache
/// and loads from index 0 (moving on to later leaves if the first ones yield nothing).
///
/// Returns `true` when an entry was loaded. When the tree has no first leaf the state
/// becomes `AfterLast`, the current entry is cleared and `false` is returned.
pub fn first(&mut self) -> bool {
    // Copy the id out so the read guard is released before the cursor is mutated.
    let head = *recover_read_guard(&self.tree.first_leaf);
    let first_leaf = match head {
        Some(id) => id,
        None => {
            self.state = CursorState::AfterLast;
            // Drop any entry left over from an earlier position so the accessors do not
            // keep returning it while the cursor is not `Valid`.
            self.current_entry = None;
            return false;
        }
    };
    self.current_leaf = Some(first_leaf);
    self.current_index = 0;
    self.direction = CursorDirection::Forward;
    self.leaf_cache.clear();
    self.leaf_cache_idx = 0;
    self.load_current()
}
/// Positions the cursor at the end of the leaf chain.
///
/// Walks the `next` links from the tree's `first_leaf` to the final leaf, selects that
/// leaf's last slot, tags the cursor `Backward`, resets the leaf cache and loads the
/// entry at that slot.
///
/// Returns `true` when an entry was loaded. When the tree has no first leaf the state
/// becomes `BeforeFirst`, the current entry is cleared and `false` is returned.
pub fn last(&mut self) -> bool {
    // Copy the id out so the read guard is released before the cursor is mutated.
    let head = *recover_read_guard(&self.tree.first_leaf);
    let mut leaf_id = match head {
        Some(id) => id,
        None => {
            self.state = CursorState::BeforeFirst;
            // Drop any entry left over from an earlier position so the accessors do not
            // keep returning it while the cursor is not `Valid`.
            self.current_entry = None;
            return false;
        }
    };
    while let Some(node) = self.tree.get_node(leaf_id) {
        let node = recover_read_guard(&node);
        match &*node {
            Node::Leaf(leaf) => match leaf.next {
                Some(next_id) => leaf_id = next_id,
                None => {
                    // Final leaf reached: record its last slot while it is still locked
                    // instead of looking the node up a second time.
                    self.current_index = leaf.keys.len().saturating_sub(1);
                    break;
                }
            },
            Node::Internal(_) => break,
        }
    }
    self.current_leaf = Some(leaf_id);
    self.direction = CursorDirection::Backward;
    self.leaf_cache.clear();
    self.leaf_cache_idx = 0;
    self.load_current_at(self.current_index)
}
/// Positions the cursor at `key`, or at the next entry after where `key` would sit.
///
/// Descends from the root to the leaf responsible for `key`, binary-searches that leaf
/// and starts loading from the matching slot (exact hit) or the insertion slot (miss).
/// Loading moves forward, so a miss lands on the following entry that has a value
/// under the snapshot, possibly in a later leaf.
///
/// Returns `true` when an entry was loaded; note this does not imply an exact match.
/// When no leaf can be found the state becomes `AfterLast`, the current entry is
/// cleared and `false` is returned.
pub fn seek(&mut self, key: &K) -> bool {
    let leaf_id = match self.find_leaf(key) {
        Some(id) => id,
        None => {
            self.state = CursorState::AfterLast;
            // Drop any entry left over from an earlier position so the accessors do not
            // keep returning it while the cursor is not `Valid`.
            self.current_entry = None;
            return false;
        }
    };
    self.current_leaf = Some(leaf_id);
    if let Some(node) = self.tree.get_node(leaf_id) {
        let node = recover_read_guard(&node);
        if let Node::Leaf(leaf) = &*node {
            // Both outcomes of the search carry the slot to start from: the index of
            // the match, or the index where the key would be inserted.
            self.current_index = leaf
                .keys
                .binary_search(key)
                .unwrap_or_else(|insert_at| insert_at);
        }
    }
    self.leaf_cache.clear();
    self.leaf_cache_idx = 0;
    self.load_current()
}
/// Moves the cursor one entry forward.
///
/// * `BeforeFirst`: behaves like `first()`.
/// * `AfterLast` / `Invalid`: does nothing and returns `false`.
/// * `Valid`: serves the next cached entry of the current leaf, or moves on to the
///   next leaf when the cache is used up.
///
/// Returns `true` when the cursor ends up on an entry.
pub fn next(&mut self) -> bool {
    match self.state {
        CursorState::BeforeFirst => self.first(),
        CursorState::AfterLast | CursorState::Invalid => false,
        CursorState::Valid => {
            if self.leaf_cache.is_empty() {
                // The cursor was positioned by backward movement (`last`/`prev`), which
                // loads a single slot and leaves the forward cache empty. Resume from
                // the slot after the current one instead of jumping to the next leaf,
                // which would skip the remaining entries of this leaf.
                self.current_index += 1;
                return self.load_current();
            }
            self.leaf_cache_idx += 1;
            if let Some(entry) = self.leaf_cache.get(self.leaf_cache_idx) {
                self.current_entry = Some(entry.clone());
                return true;
            }
            self.move_to_next_leaf()
        }
    }
}
pub fn prev(&mut self) -> bool {
match self.state {
CursorState::AfterLast => self.last(),
CursorState::BeforeFirst | CursorState::Invalid => false,
CursorState::Valid => {
if self.current_index == 0 {
self.move_to_prev_leaf()
} else {
self.current_index -= 1;
self.leaf_cache.clear();
self.leaf_cache_idx = 0;
self.load_current_at(self.current_index)
}
}
}
}
/// Loads the entry at slot `index` of the current leaf, searching backward if needed.
///
/// Used by backward movement. Starting at `index` (clamped to the leaf's last slot) it
/// scans toward slot 0 for the first entry whose `get(&snapshot)` yields a value,
/// mirroring how forward movement skips entries without one. On success the entry is
/// stored, `current_index` is set to the slot found and the state becomes `Valid`.
///
/// If the leaf has no such entry at or before `index`, the search continues in the
/// previous leaf via `move_to_prev_leaf`.
///
/// Returns `false` with state `Invalid` when no leaf is selected, and `false` with
/// state `BeforeFirst` when the current leaf cannot be read as a leaf node or no
/// earlier entry exists.
fn load_current_at(&mut self, index: usize) -> bool {
    let leaf_id = match self.current_leaf {
        Some(id) => id,
        None => {
            self.state = CursorState::Invalid;
            self.current_entry = None;
            return false;
        }
    };
    let mut leaf_exhausted = false;
    if let Some(node) = self.tree.get_node(leaf_id) {
        let node = recover_read_guard(&node);
        if let Node::Leaf(leaf) = &*node {
            // Slots `0..upper` are the candidates: `index` itself and everything before
            // it, never past the end of the leaf.
            let upper = leaf.keys.len().min(index.saturating_add(1));
            for slot in (0..upper).rev() {
                if let Some(value) = leaf.entries[slot].get(&self.snapshot) {
                    self.current_entry = Some((leaf.keys[slot].clone(), value.clone()));
                    self.current_index = slot;
                    self.state = CursorState::Valid;
                    return true;
                }
            }
            leaf_exhausted = true;
        }
    }
    if leaf_exhausted {
        // Nothing readable in this leaf at or before `index`; keep going backward
        // rather than reporting the front of the tree prematurely.
        return self.move_to_prev_leaf();
    }
    self.state = CursorState::BeforeFirst;
    self.current_entry = None;
    false
}
/// Descends from the tree root to the leaf responsible for `key`.
///
/// Returns `None` when the tree has no root or the descent fails.
fn find_leaf(&self, key: &K) -> Option<NodeId> {
    // The guard is a temporary of this statement, so the root lock is released before
    // the descent starts.
    let root = *recover_read_guard(&self.tree.root);
    root.and_then(|root_id| self.find_leaf_from(root_id, key))
}
/// Descends from `node_id` to the leaf responsible for `key`.
///
/// At each internal node the child chosen by `get_child(key)` is followed; the id of
/// the first leaf reached is returned. Returns `None` if a node is missing or an
/// internal node yields no child.
fn find_leaf_from(&self, node_id: NodeId, key: &K) -> Option<NodeId> {
    let mut current = node_id;
    loop {
        let handle = self.tree.get_node(current)?;
        let guard = recover_read_guard(&handle);
        match &*guard {
            Node::Leaf(_) => return Some(current),
            // The guard goes out of scope at the end of this iteration, so the parent
            // is unlocked before the child is looked up.
            Node::Internal(internal) => current = internal.get_child(key)?,
        }
    }
}
/// Reports whether `current_index` is a valid slot of the current leaf.
///
/// Returns `false` when no leaf is selected, the node is missing, or it is not a leaf.
fn check_bounds(&self) -> bool {
    let leaf_id = match self.current_leaf {
        Some(id) => id,
        None => return false,
    };
    let handle = match self.tree.get_node(leaf_id) {
        Some(handle) => handle,
        None => return false,
    };
    let guard = recover_read_guard(&handle);
    match &*guard {
        Node::Leaf(leaf) => self.current_index < leaf.keys.len(),
        Node::Internal(_) => false,
    }
}
/// Rebuilds the forward cache from the current leaf.
///
/// Empties the cache, then clones every entry from slot `current_index` to the end of
/// the leaf for which `get(&snapshot)` yields a value. Returns `true` when at least one
/// entry was cached.
fn fill_leaf_cache(&mut self) -> bool {
    self.leaf_cache.clear();
    self.leaf_cache_idx = 0;
    let leaf_id = match self.current_leaf {
        Some(id) => id,
        None => return false,
    };
    let start = self.current_index;
    let snapshot = &self.snapshot;
    if let Some(handle) = self.tree.get_node(leaf_id) {
        let guard = recover_read_guard(&handle);
        if let Node::Leaf(leaf) = &*guard {
            let readable = (start..leaf.keys.len()).filter_map(|slot| {
                leaf.entries[slot]
                    .get(snapshot)
                    .map(|value| (leaf.keys[slot].clone(), value.clone()))
            });
            self.leaf_cache.extend(readable);
        }
    }
    !self.leaf_cache.is_empty()
}
/// Makes the entry at the forward position current.
///
/// Serves the cached entry at `leaf_cache_idx` when there is one; otherwise refills the
/// cache from the current leaf and serves its first entry; otherwise moves on to the
/// next leaf. Returns `true` when an entry was loaded (state `Valid`).
fn load_current(&mut self) -> bool {
    let cached = self.leaf_cache.get(self.leaf_cache_idx).cloned();
    let entry = match cached {
        Some(entry) => entry,
        None => {
            if !self.fill_leaf_cache() {
                return self.move_to_next_leaf();
            }
            self.leaf_cache[0].clone()
        }
    };
    self.current_entry = Some(entry);
    self.state = CursorState::Valid;
    true
}
/// Steps to the leaf after the current one and loads its first readable entry.
///
/// Follows the current leaf's `next` link, resets the slot and the cache, then defers
/// to `load_current`. When there is no following leaf the state becomes `AfterLast`,
/// the current entry is cleared and `false` is returned. When no leaf is selected at
/// all, only the state is set to `AfterLast`.
fn move_to_next_leaf(&mut self) -> bool {
    let leaf_id = match self.current_leaf {
        Some(id) => id,
        None => {
            self.state = CursorState::AfterLast;
            return false;
        }
    };
    let successor = self.tree.get_node(leaf_id).and_then(|handle| {
        let guard = recover_read_guard(&handle);
        let link = match &*guard {
            Node::Leaf(leaf) => leaf.next,
            Node::Internal(_) => None,
        };
        link
    });
    let next_id = match successor {
        Some(id) => id,
        None => {
            self.state = CursorState::AfterLast;
            self.current_entry = None;
            return false;
        }
    };
    self.current_leaf = Some(next_id);
    self.current_index = 0;
    self.leaf_cache.clear();
    self.leaf_cache_idx = 0;
    self.load_current()
}
/// Steps to the leaf before the current one and loads its last slot.
///
/// Follows the current leaf's `prev` link, points `current_index` at that leaf's last
/// slot, resets the cache, then defers to `load_current_at`. When there is no preceding
/// leaf the state becomes `BeforeFirst`, the current entry is cleared and `false` is
/// returned. When no leaf is selected at all, only the state is set to `BeforeFirst`.
fn move_to_prev_leaf(&mut self) -> bool {
    let leaf_id = match self.current_leaf {
        Some(id) => id,
        None => {
            self.state = CursorState::BeforeFirst;
            return false;
        }
    };
    let predecessor = self.tree.get_node(leaf_id).and_then(|handle| {
        let guard = recover_read_guard(&handle);
        let link = match &*guard {
            Node::Leaf(leaf) => leaf.prev,
            Node::Internal(_) => None,
        };
        link
    });
    let prev_id = match predecessor {
        Some(id) => id,
        None => {
            self.state = CursorState::BeforeFirst;
            self.current_entry = None;
            return false;
        }
    };
    self.current_leaf = Some(prev_id);
    // Leave `current_index` untouched when the node is missing or is not a leaf.
    let last_slot = self.tree.get_node(prev_id).and_then(|handle| {
        let guard = recover_read_guard(&handle);
        let slot = match &*guard {
            Node::Leaf(leaf) => Some(leaf.keys.len().saturating_sub(1)),
            Node::Internal(_) => None,
        };
        slot
    });
    if let Some(slot) = last_slot {
        self.current_index = slot;
    }
    self.leaf_cache.clear();
    self.leaf_cache_idx = 0;
    self.load_current_at(self.current_index)
}
}
impl<'a, K, V> Iterator for Cursor<'a, K, V>
where
K: Clone + Ord + Debug + Send + Sync,
V: Clone + Debug + Send + Sync,
{
type Item = (K, V);
fn next(&mut self) -> Option<Self::Item> {
if self.state == CursorState::BeforeFirst {
if !self.first() {
return None;
}
self.current_entry.clone()
} else if self.next() {
self.current_entry.clone()
} else {
None
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::btree::{BPlusTree, BTreeConfig};

    /// Builds an order-4 tree holding keys 1..=10, each mapped to `"v<key>"`.
    fn create_test_tree() -> BPlusTree<i32, String> {
        use crate::storage::primitives::ids::TxnId;
        let tree = BPlusTree::new(BTreeConfig::new().with_order(4));
        (1..=10).for_each(|key| {
            tree.insert(key, format!("v{}", key), TxnId(1));
        });
        tree
    }

    /// Collecting a fresh cursor yields all ten entries in key order.
    #[test]
    fn test_cursor_forward() {
        let tree = create_test_tree();
        let collected: Vec<_> = Cursor::new(&tree, tree.snapshot()).collect();
        assert_eq!(collected.len(), 10);
        assert_eq!(collected[0], (1, "v1".to_string()));
        assert_eq!(collected[9], (10, "v10".to_string()));
    }

    /// `first` lands on the smallest key and `last` on the largest.
    #[test]
    fn test_cursor_first_last() {
        let tree = create_test_tree();

        let mut head = Cursor::new(&tree, tree.snapshot());
        assert!(Cursor::first(&mut head));
        assert_eq!(head.key(), Some(&1));

        let mut tail = Cursor::new(&tree, tree.snapshot());
        assert!(Cursor::last(&mut tail));
        assert_eq!(tail.key(), Some(&10));
    }

    /// `seek` lands on existing keys, and the same cursor can seek repeatedly.
    #[test]
    fn test_cursor_seek() {
        let tree = create_test_tree();
        let mut cursor = Cursor::new(&tree, tree.snapshot());
        for target in [5, 7] {
            assert!(cursor.seek(&target));
            assert_eq!(cursor.key(), Some(&target));
        }
    }

    /// Stepping backward from the last entry visits keys in descending order.
    #[test]
    fn test_cursor_prev() {
        let tree = create_test_tree();
        let mut cursor = Cursor::new(&tree, tree.snapshot());
        Cursor::last(&mut cursor);
        assert_eq!(cursor.key(), Some(&10));
        for expected in [9, 8] {
            cursor.prev();
            assert_eq!(cursor.key(), Some(&expected));
        }
    }

    /// On an empty tree `first` fails and the cursor is not valid.
    #[test]
    fn test_cursor_empty_tree() {
        let tree: BPlusTree<i32, String> = BPlusTree::with_default_config();
        let mut cursor = Cursor::new(&tree, tree.snapshot());
        assert!(!cursor.first());
        assert!(!cursor.is_valid());
    }
}