use super::version::{Snapshot, Timestamp, TxnId, VersionChain};
use std::fmt::Debug;
use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
pub type NodeId = u64;
static NODE_ID_COUNTER: AtomicU64 = AtomicU64::new(1);
pub fn next_node_id() -> NodeId {
NODE_ID_COUNTER.fetch_add(1, AtomicOrdering::SeqCst)
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NodeType {
Internal,
Leaf,
}
#[derive(Debug)]
pub enum Node<K, V>
where
K: Clone + Ord + Debug,
V: Clone + Debug,
{
Internal(InternalNode<K, V>),
Leaf(LeafNode<K, V>),
}
impl<K, V> Node<K, V>
where
K: Clone + Ord + Debug,
V: Clone + Debug,
{
pub fn id(&self) -> NodeId {
match self {
Node::Internal(n) => n.id,
Node::Leaf(n) => n.id,
}
}
pub fn node_type(&self) -> NodeType {
match self {
Node::Internal(_) => NodeType::Internal,
Node::Leaf(_) => NodeType::Leaf,
}
}
pub fn is_leaf(&self) -> bool {
matches!(self, Node::Leaf(_))
}
pub fn len(&self) -> usize {
match self {
Node::Internal(n) => n.keys.len(),
Node::Leaf(n) => n.keys.len(),
}
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn is_full(&self, order: usize) -> bool {
self.len() >= order - 1
}
pub fn has_min_keys(&self, order: usize) -> bool {
let min = (order - 1) / 2;
self.len() >= min
}
pub fn min_key(&self) -> Option<&K> {
match self {
Node::Internal(n) => n.keys.first(),
Node::Leaf(n) => n.keys.first(),
}
}
pub fn max_key(&self) -> Option<&K> {
match self {
Node::Internal(n) => n.keys.last(),
Node::Leaf(n) => n.keys.last(),
}
}
pub fn as_internal(&self) -> Option<&InternalNode<K, V>> {
match self {
Node::Internal(n) => Some(n),
_ => None,
}
}
pub fn as_internal_mut(&mut self) -> Option<&mut InternalNode<K, V>> {
match self {
Node::Internal(n) => Some(n),
_ => None,
}
}
pub fn as_leaf(&self) -> Option<&LeafNode<K, V>> {
match self {
Node::Leaf(n) => Some(n),
_ => None,
}
}
pub fn as_leaf_mut(&mut self) -> Option<&mut LeafNode<K, V>> {
match self {
Node::Leaf(n) => Some(n),
_ => None,
}
}
}
#[derive(Debug)]
pub struct InternalNode<K, V>
where
K: Clone + Ord + Debug,
V: Clone + Debug,
{
pub id: NodeId,
pub keys: Vec<K>,
pub children: Vec<NodeId>,
_phantom: std::marker::PhantomData<V>,
}
impl<K, V> InternalNode<K, V>
where
K: Clone + Ord + Debug,
V: Clone + Debug,
{
pub fn new() -> Self {
Self {
id: next_node_id(),
keys: Vec::new(),
children: Vec::new(),
_phantom: std::marker::PhantomData,
}
}
pub fn with_child(child_id: NodeId) -> Self {
let mut node = Self::new();
node.children.push(child_id);
node
}
pub fn find_child_index(&self, key: &K) -> usize {
match self.keys.binary_search(key) {
Ok(i) => i + 1,
Err(i) => i,
}
}
pub fn get_child(&self, key: &K) -> Option<NodeId> {
let idx = self.find_child_index(key);
self.children.get(idx).copied()
}
pub fn insert_at(&mut self, idx: usize, key: K, right_child: NodeId) {
self.keys.insert(idx, key);
self.children.insert(idx + 1, right_child);
}
pub fn insert(&mut self, key: K, right_child: NodeId) {
let idx = match self.keys.binary_search(&key) {
Ok(i) | Err(i) => i,
};
self.insert_at(idx, key, right_child);
}
pub fn remove_at(&mut self, idx: usize) -> (K, NodeId) {
let key = self.keys.remove(idx);
let child = self.children.remove(idx + 1);
(key, child)
}
pub fn split(&mut self) -> (K, Self) {
let mid = self.keys.len() / 2;
let median_key = self.keys.remove(mid);
let right_keys: Vec<K> = self.keys.drain(mid..).collect();
let right_children: Vec<NodeId> = self.children.drain(mid + 1..).collect();
let mut right = Self::new();
right.keys = right_keys;
right.children = right_children;
(median_key, right)
}
pub fn merge(&mut self, separator: K, right: &mut Self) {
self.keys.push(separator);
self.keys.append(&mut right.keys);
self.children.append(&mut right.children);
}
pub fn borrow_from_left(&mut self, left: &mut Self, parent_key: &K) -> K {
let borrowed_key = left
.keys
.pop()
.expect("invariant: borrow_from_left requires left.keys non-empty");
let borrowed_child = left
.children
.pop()
.expect("invariant: internal node has children.len() == keys.len() + 1");
self.keys.insert(0, parent_key.clone());
self.children.insert(0, borrowed_child);
borrowed_key
}
pub fn borrow_from_right(&mut self, right: &mut Self, parent_key: &K) -> K {
let borrowed_key = right.keys.remove(0);
let borrowed_child = right.children.remove(0);
self.keys.push(parent_key.clone());
self.children.push(borrowed_child);
borrowed_key
}
}
impl<K, V> Default for InternalNode<K, V>
where
K: Clone + Ord + Debug,
V: Clone + Debug,
{
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Clone)]
pub struct LeafEntry<K, V>
where
K: Clone + Ord + Debug,
V: Clone + Debug,
{
pub key: K,
pub versions: VersionChain<V>,
}
impl<K, V> LeafEntry<K, V>
where
K: Clone + Ord + Debug,
V: Clone + Debug,
{
pub fn new(key: K, value: V, txn_id: TxnId, timestamp: Timestamp) -> Self {
Self {
key,
versions: VersionChain::with_value(value, txn_id, timestamp),
}
}
pub fn get(&self, snapshot: &Snapshot) -> Option<&V> {
self.versions.get(snapshot)
}
pub fn update(&mut self, value: V, txn_id: TxnId, timestamp: Timestamp) {
self.versions.update(value, txn_id, timestamp);
}
pub fn delete(&mut self, txn_id: TxnId, timestamp: Timestamp) {
self.versions.delete(txn_id, timestamp);
}
}
#[derive(Debug)]
pub struct LeafNode<K, V>
where
K: Clone + Ord + Debug,
V: Clone + Debug,
{
pub id: NodeId,
pub keys: Vec<K>,
pub entries: Vec<LeafEntry<K, V>>,
pub next: Option<NodeId>,
pub prev: Option<NodeId>,
}
impl<K, V> LeafNode<K, V>
where
K: Clone + Ord + Debug,
V: Clone + Debug,
{
pub fn new() -> Self {
Self {
id: next_node_id(),
keys: Vec::new(),
entries: Vec::new(),
next: None,
prev: None,
}
}
pub fn find_index(&self, key: &K) -> Result<usize, usize> {
self.keys.binary_search(key)
}
pub fn get(&self, key: &K, snapshot: &Snapshot) -> Option<&V> {
match self.find_index(key) {
Ok(idx) => self.entries[idx].get(snapshot),
Err(_) => None,
}
}
pub fn contains(&self, key: &K, snapshot: &Snapshot) -> bool {
self.get(key, snapshot).is_some()
}
pub fn insert(&mut self, key: K, value: V, txn_id: TxnId, timestamp: Timestamp) -> bool {
match self.find_index(&key) {
Ok(idx) => {
self.entries[idx].update(value, txn_id, timestamp);
false }
Err(idx) => {
let entry = LeafEntry::new(key.clone(), value, txn_id, timestamp);
self.keys.insert(idx, key);
self.entries.insert(idx, entry);
true }
}
}
pub fn delete(&mut self, key: &K, txn_id: TxnId, timestamp: Timestamp) -> bool {
match self.find_index(key) {
Ok(idx) => {
self.entries[idx].delete(txn_id, timestamp);
true
}
Err(_) => false,
}
}
pub fn split(&mut self) -> (K, Self) {
let mid = self.keys.len() / 2;
let right_keys: Vec<K> = self.keys.drain(mid..).collect();
let right_entries: Vec<LeafEntry<K, V>> = self.entries.drain(mid..).collect();
let mut right = Self::new();
right.keys = right_keys.clone();
right.entries = right_entries;
let separator = right.keys[0].clone();
right.next = self.next;
right.prev = Some(self.id);
self.next = Some(right.id);
(separator, right)
}
pub fn merge(&mut self, right: &mut Self) {
self.keys.append(&mut right.keys);
self.entries.append(&mut right.entries);
self.next = right.next;
}
pub fn borrow_from_left(&mut self, left: &mut Self) -> K {
let borrowed_key = left
.keys
.pop()
.expect("invariant: borrow_from_left requires left.keys non-empty");
let borrowed_entry = left
.entries
.pop()
.expect("invariant: leaf node has keys.len() == entries.len()");
self.keys.insert(0, borrowed_key.clone());
self.entries.insert(0, borrowed_entry);
self.keys[0].clone()
}
pub fn borrow_from_right(&mut self, right: &mut Self) -> K {
let borrowed_key = right.keys.remove(0);
let borrowed_entry = right.entries.remove(0);
self.keys.push(borrowed_key);
self.entries.push(borrowed_entry);
right.keys[0].clone()
}
pub fn range<'a>(
&'a self,
start: Option<&K>,
end: Option<&K>,
snapshot: &'a Snapshot,
) -> impl Iterator<Item = (&'a K, &'a V)> {
let start_idx = match start {
Some(k) => match self.find_index(k) {
Ok(i) => i,
Err(i) => i,
},
None => 0,
};
let end_idx = match end {
Some(k) => match self.find_index(k) {
Ok(i) => i + 1,
Err(i) => i,
},
None => self.keys.len(),
};
self.keys[start_idx..end_idx]
.iter()
.zip(self.entries[start_idx..end_idx].iter())
.filter_map(move |(key, entry)| entry.get(snapshot).map(|v| (key, v)))
}
pub fn gc(&mut self, watermark: Timestamp) -> usize {
let mut removed = 0;
for entry in &mut self.entries {
removed += entry.versions.gc(watermark);
}
removed
}
}
impl<K, V> Default for LeafNode<K, V>
where
K: Clone + Ord + Debug,
V: Clone + Debug,
{
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_internal_node_basic() {
let mut node: InternalNode<i32, String> = InternalNode::new();
assert!(node.keys.is_empty());
assert!(node.children.is_empty());
node.children.push(100);
node.insert(5, 101);
node.insert(10, 102);
node.insert(15, 103);
assert_eq!(node.keys, vec![5, 10, 15]);
assert_eq!(node.children, vec![100, 101, 102, 103]);
}
#[test]
fn test_internal_node_find_child() {
let mut node: InternalNode<i32, String> = InternalNode::new();
node.children.push(100);
node.insert(10, 101);
node.insert(20, 102);
node.insert(30, 103);
assert_eq!(node.find_child_index(&5), 0);
assert_eq!(node.find_child_index(&10), 1);
assert_eq!(node.find_child_index(&15), 1);
assert_eq!(node.find_child_index(&35), 3);
}
#[test]
fn test_internal_node_split() {
let mut node: InternalNode<i32, String> = InternalNode::new();
node.children.push(100);
for i in 1..=6 {
node.insert(i * 10, 100 + i as u64);
}
let (median, right) = node.split();
assert!(node.keys.len() < 6);
assert!(!right.keys.is_empty());
}
#[test]
fn test_leaf_node_basic() {
let mut node: LeafNode<i32, String> = LeafNode::new();
let snapshot = Snapshot::new(TxnId::ZERO, Timestamp(100));
assert!(node.keys.is_empty());
node.insert(10, "ten".to_string(), TxnId(1), Timestamp(1));
node.insert(20, "twenty".to_string(), TxnId(1), Timestamp(2));
node.insert(5, "five".to_string(), TxnId(1), Timestamp(3));
assert_eq!(node.keys, vec![5, 10, 20]);
assert_eq!(node.get(&10, &snapshot), Some(&"ten".to_string()));
assert_eq!(node.get(&15, &snapshot), None);
}
#[test]
fn test_leaf_node_update() {
let mut node: LeafNode<i32, String> = LeafNode::new();
let snapshot = Snapshot::new(TxnId::ZERO, Timestamp(100));
node.insert(10, "v1".to_string(), TxnId(1), Timestamp(1));
assert_eq!(node.get(&10, &snapshot), Some(&"v1".to_string()));
node.insert(10, "v2".to_string(), TxnId(2), Timestamp(2));
assert_eq!(node.get(&10, &snapshot), Some(&"v2".to_string()));
assert_eq!(node.keys.len(), 1);
}
#[test]
fn test_leaf_node_delete() {
let mut node: LeafNode<i32, String> = LeafNode::new();
let snapshot = Snapshot::new(TxnId::ZERO, Timestamp(100));
node.insert(10, "ten".to_string(), TxnId(1), Timestamp(1));
assert!(node.contains(&10, &snapshot));
node.delete(&10, TxnId(2), Timestamp(2));
assert!(!node.contains(&10, &snapshot));
}
#[test]
fn test_leaf_node_split() {
let mut node: LeafNode<i32, String> = LeafNode::new();
for i in 1..=6 {
node.insert(i * 10, format!("v{}", i), TxnId(1), Timestamp(i as u64));
}
let (separator, right) = node.split();
assert!(node.keys.len() < 6);
assert!(!right.keys.is_empty());
assert_eq!(separator, right.keys[0]);
assert_eq!(node.next, Some(right.id));
assert_eq!(right.prev, Some(node.id));
}
#[test]
fn test_leaf_node_range() {
let mut node: LeafNode<i32, String> = LeafNode::new();
let snapshot = Snapshot::new(TxnId::ZERO, Timestamp(100));
for i in 1..=10 {
node.insert(i * 10, format!("v{}", i), TxnId(1), Timestamp(i as u64));
}
let results: Vec<_> = node.range(Some(&25), Some(&75), &snapshot).collect();
assert_eq!(results.len(), 5); }
#[test]
fn test_node_enum() {
let leaf: Node<i32, String> = Node::Leaf(LeafNode::new());
assert!(leaf.is_leaf());
assert!(leaf.as_leaf().is_some());
assert!(leaf.as_internal().is_none());
let internal: Node<i32, String> = Node::Internal(InternalNode::new());
assert!(!internal.is_leaf());
assert!(internal.as_internal().is_some());
assert!(internal.as_leaf().is_none());
}
}