use crate::{
error::{IndexChangeError, PIRes},
id::RecRef,
index::{
config::{IndexOrd, IndexTypeInternal, ValueMode},
tree::PosRef,
},
};
use std::{
cmp::Ordering,
iter::{Peekable, Rev},
ops::Bound,
vec::IntoIter,
};
/// Reference to a tree node; an alias for `RecRef`, used for the child pointers held by `Nodes`.
pub type NodeRef = RecRef;
/// A single node of the index tree: either an inner node or a leaf.
#[derive(Clone)]
pub enum Node<K, V> {
/// Inner node: separator keys plus references to child nodes.
Node(Nodes<K>),
/// Leaf: the key/value entries themselves.
Leaf(Leaf<K, V>),
}
impl<K, V> Node<K, V> {
pub fn get_prev(&self) -> &Option<K> {
match self {
Node::Node(n) => &n.prev,
Node::Leaf(l) => &l.prev,
}
}
pub fn get_next(&self) -> &Option<K> {
match self {
Node::Node(n) => &n.next,
Node::Leaf(l) => &l.next,
}
}
pub fn len(&self) -> usize {
match self {
Node::Node(n) => n.len(),
Node::Leaf(l) => l.len(),
}
}
}
impl<K: IndexOrd + Clone, V> Node<K, V> {
    /// Merges `node`, the right sibling, into `self`.
    ///
    /// `k` is the separator key between the two; it is only used when both sides are
    /// inner nodes.
    ///
    /// # Panics
    ///
    /// Panics when one side is a leaf and the other an inner node.
    pub fn merge_right(&mut self, k: K, node: &mut Node<K, V>) {
        match (self, node) {
            (Node::Node(left), Node::Node(right)) => left.merge_right(k, right),
            (Node::Leaf(left), Node::Leaf(right)) => left.merge_right(right),
            (Node::Node(_), Node::Leaf(_)) => {
                panic!("impossible merge a leaf to node");
            }
            (Node::Leaf(_), Node::Node(_)) => {
                panic!("impossible merge a node to leaf");
            }
        }
    }

    /// Splits the wrapped node, returning the new right-hand siblings, each paired with
    /// the key that separates it from the part before it.
    pub fn split(&mut self, top_limit: usize) -> Vec<(K, Node<K, V>)> {
        match self {
            Node::Node(inner) => {
                let parts = inner.split(top_limit);
                parts
                    .into_iter()
                    .map(|(key, part)| (key, Node::Node(part)))
                    .collect()
            }
            Node::Leaf(leaf) => {
                let parts = leaf.split(top_limit);
                parts
                    .into_iter()
                    .map(|(key, part)| (key, Node::Leaf(part)))
                    .collect()
            }
        }
    }

    /// Tells whether `k` lies within the `prev`/`next` bounds of the wrapped node.
    pub fn check_range(&self, k: &K) -> bool {
        match self {
            Node::Node(inner) => inner.check_range(k),
            Node::Leaf(leaf) => leaf.check_range(k),
        }
    }
}
/// Orders two values of an `IndexOrd` type by calling `first.cmp(second)`.
///
/// Used as the comparison callback for every binary search and range check in this file.
pub(crate) fn compare<T: IndexOrd>(first: &T, second: &T) -> Ordering {
first.cmp(second)
}
/// Inner tree node: separator keys and the references to the children they delimit.
///
/// `new_from_split`, `add` and `remove` keep `pointers` one element longer than `keys`.
#[derive(Clone)]
pub struct Nodes<K> {
/// Separator keys; `find` binary-searches them, so they are expected to be sorted.
pub keys: Vec<K>,
/// Child references: `find` follows `pointers[i]` for keys below `keys[i]` and
/// `pointers[i + 1]` for keys equal to or above it.
pub pointers: Vec<NodeRef>,
/// Lower bound key of this node, maintained by `split`; `None` means no bound.
pub prev: Option<K>,
/// Upper bound key of this node, maintained by `split` and `merge_right`; `None` means no bound.
pub next: Option<K>,
}
impl<K> Nodes<K> {
    /// Number of child references held by this node.
    pub fn len(&self) -> usize {
        self.pointers.len()
    }

    /// Removes the child reference at `pos` together with its separator key.
    ///
    /// For `pos > 0` the separator removed is `keys[pos - 1]`, the key to the left of the
    /// child. For `pos == 0` there is no key to the left, so the first key (`keys[0]`,
    /// the separator to its right) is removed instead, when one exists.
    ///
    /// Returns the removed reference, or `None` when `pos` is out of range.
    pub fn remove(&mut self, pos: usize) -> Option<NodeRef> {
        if pos >= self.pointers.len() {
            return None;
        }
        if pos > 0 {
            self.keys.remove(pos - 1);
        } else if !self.keys.is_empty() {
            // `pos - 1` used to underflow for the first child and panic; drop the
            // separator on its right so keys and pointers stay in step.
            self.keys.remove(0);
        }
        Some(self.pointers.remove(pos))
    }
}
impl<K: IndexOrd + Clone> Nodes<K> {
/// Builds an unbounded node (`prev` and `next` are `None`) from the result of a split.
///
/// `left` becomes the first child; every `(key, child)` pair of `values` then adds
/// one separator key and the child to its right, in order.
pub fn new_from_split(left: NodeRef, values: &[(K, NodeRef)]) -> Nodes<K> {
    let mut keys = Vec::with_capacity(values.len());
    let mut pointers = Vec::with_capacity(values.len() + 1);
    pointers.push(left);
    for (key, child) in values {
        keys.push(key.clone());
        pointers.push(*child);
    }
    Nodes {
        keys,
        pointers,
        prev: None,
        next: None,
    }
}
/// Inserts separator `k` at key index `pos` and `node_ref` as the child to its right
/// (pointer index `pos + 1`).
pub fn add(&mut self, pos: usize, k: &K, node_ref: NodeRef) {
    let separator = k.clone();
    let child_slot = pos + 1;
    self.keys.insert(pos, separator);
    self.pointers.insert(child_slot, node_ref);
}
/// Locates the child to follow for `k`.
///
/// The returned `PosRef` carries the child's pointer index, its reference and a
/// neighbouring child: the one on the left when there is one, otherwise the one on
/// the right, otherwise `None`.
pub fn find(&self, k: &K) -> PosRef<K> {
    let (pos, sibling_pos) = match self.keys.binary_search_by(|probe| compare(probe, k)) {
        // Exact separator hit: descend to the right of the key, sibling on its left.
        Ok(hit) => (hit + 1, Some(hit)),
        // Leftmost child: the only possible sibling is on the right.
        Err(0) => (0, if self.pointers.len() > 1 { Some(1) } else { None }),
        Err(slot) => (slot, Some(slot - 1)),
    };
    let sibling = sibling_pos.map(|at| self.pointers[at]);
    PosRef::new(k, pos, self.pointers[pos], sibling)
}
/// Like `find`, but returns `None` when `k` lands on an edge slot and falls outside
/// the bounds recorded on this node:
///
/// * slot `0`: rejected when `k` is less than `prev`;
/// * slot equal to `pointers.len()`: rejected when `k` is not less than `next`.
///
/// NOTE(review): `find` indexes `pointers[pos]`, so it does not look like it can ever
/// return `pos == pointers.len()`; the upper-bound check may therefore never fire.
/// Confirm whether `pointers.len() - 1` was intended.
pub fn find_write(&self, k: &K) -> Option<PosRef<K>> {
    let found = self.find(k);
    let out_of_range = if found.pos == 0 {
        self.prev
            .as_ref()
            .map_or(false, |lower| compare(k, lower) == Ordering::Less)
    } else if found.pos == self.pointers.len() {
        self.next
            .as_ref()
            .map_or(false, |upper| compare(k, upper) != Ordering::Less)
    } else {
        false
    };
    if out_of_range {
        None
    } else {
        Some(found)
    }
}
/// Returns a clone of the separator key at index `pos`.
///
/// # Panics
///
/// Panics when `pos` is out of bounds for `keys`.
pub fn get_key(&self, pos: usize) -> K {
    let key = &self.keys[pos];
    key.clone()
}
/// Returns the child reference at index `pos`.
///
/// # Panics
///
/// Panics when `pos` is out of bounds for `pointers`.
pub fn get(&self, pos: usize) -> NodeRef {
    let child = &self.pointers[pos];
    *child
}
/// Inserts every `(key, child)` pair of `values`, keeping their order, so that the
/// keys start at key index `pos` and the children at pointer index `pos + 1`.
///
/// `values` is left reversed on return, exactly as the earlier insert-one-by-one
/// implementation left it, so callers that look at it afterwards see no change.
///
/// # Panics
///
/// Panics when `pos` is greater than `keys.len()` or `pos + 1` is greater than
/// `pointers.len()`.
pub fn insert_after(&mut self, pos: usize, values: &mut Vec<(K, NodeRef)>) {
    // One splice per vector shifts the tail once, instead of once per inserted pair.
    drop(
        self.keys
            .splice(pos..pos, values.iter().map(|(key, _)| key.clone())),
    );
    drop(
        self.pointers
            .splice(pos + 1..pos + 1, values.iter().map(|(_, node)| *node)),
    );
    // Preserve the observable side effect on the caller's vector.
    values.reverse();
}
/// Splits this node into pieces, keeping the first piece in `self` and returning the
/// others in order, each paired with the separator key that precedes it.
///
/// Each separator is removed from the key list (it is not kept inside any piece) and
/// is recorded as `next` of the piece before it and `prev` of the piece after it.
/// The last piece inherits the `next` bound `self` had before the split.
///
/// NOTE(review): `max` must be non-zero (`size / max`), and the code appears to assume
/// the node holds more than `max` keys; smaller nodes look like they would panic on
/// `others.remove(0)` — confirm against callers.
pub fn split(&mut self, max: usize) -> Vec<(K, Nodes<K>)> {
let mut split_result: Vec<(K, Nodes<K>)> = Vec::new();
let size = self.keys.len();
// Number of full `max`-sized groups; the keys are spread over `n_split + 1` pieces.
let n_split = size / max;
let split_offset = size / (n_split + 1) + 1;
// `self` keeps `split_offset - 1` keys and `split_offset` pointers.
let mut others = self.keys.split_off(split_offset - 1);
let mut other_pointers = self.pointers.split_off(split_offset);
let pre_next = self.next.clone();
while others.len() > max {
let new = others.split_off(split_offset);
let new_pointers = other_pointers.split_off(split_offset);
// The first remaining key becomes the separator for this piece.
let key = others.remove(0);
// Link the previous piece (or `self` for the first one) to this piece.
if let Some((_, ref mut x)) = split_result.last_mut() {
x.next = Some(key.clone());
} else {
self.next = Some(key.clone());
}
let leaf = Nodes {
keys: others,
pointers: other_pointers,
prev: Some(key.clone()),
next: None,
};
split_result.push((key, leaf));
others = new;
other_pointers = new_pointers;
}
// Last piece: same linking, but it takes over the original upper bound.
let key = others.remove(0);
if let Some((_, ref mut x)) = split_result.last_mut() {
x.next = Some(key.clone());
} else {
self.next = Some(key.clone());
}
let leaf = Nodes {
keys: others,
pointers: other_pointers,
prev: Some(key.clone()),
next: pre_next,
};
split_result.push((key, leaf));
split_result
}
/// Merges `nodes`, the left sibling, into `self`.
///
/// The resulting key order is: keys of `nodes`, then `owner` (the separator between
/// the two nodes), then the keys already in `self`. Pointers are concatenated the
/// same way. `nodes` is left with empty `keys` and `pointers`.
///
/// `self.prev` takes the lower bound of `nodes`, mirroring how `merge_right` takes
/// the upper bound of the right sibling; without this the merged node would keep a
/// lower bound that excludes the keys it just received.
// Only exercised by the unit tests at the moment.
#[allow(dead_code)]
pub fn merge_left(&mut self, owner: K, nodes: &mut Nodes<K>) {
    let mut keys = std::mem::take(&mut nodes.keys);
    let mut pointers = std::mem::take(&mut nodes.pointers);
    keys.push(owner);
    keys.append(&mut self.keys);
    pointers.append(&mut self.pointers);
    self.keys = keys;
    self.pointers = pointers;
    self.prev = nodes.prev.clone();
}
/// Merges `nodes`, the right sibling, into `self`.
///
/// `owner`, the separator between the two nodes, is appended first, followed by all
/// keys of `nodes`; pointers are appended likewise. `nodes` ends up with empty `keys`
/// and `pointers`, and `self` takes a copy of its `next` bound.
pub fn merge_right(&mut self, owner: K, nodes: &mut Nodes<K>) {
    self.keys.push(owner);
    self.keys.extend(nodes.keys.drain(..));
    self.pointers.extend(nodes.pointers.drain(..));
    self.next = nodes.next.clone();
}
/// Returns `true` when `k` is not below `prev` and not above `next`.
///
/// Both bounds are inclusive here, and a missing bound never rejects a key.
fn check_range(&self, k: &K) -> bool {
    let respects_lower = self
        .prev
        .as_ref()
        .map_or(true, |lower| compare(lower, k) != Ordering::Greater);
    respects_lower
        && self
            .next
            .as_ref()
            .map_or(true, |upper| compare(upper, k) != Ordering::Less)
}
}
/// Value(s) stored under one key of a leaf.
#[derive(Clone, PartialEq, Debug)]
pub enum Value<V> {
    /// Several values sharing the same key.
    Cluster(Vec<V>),
    /// Exactly one value.
    Single(V),
}

impl<V> IntoIterator for Value<V> {
    type Item = V;
    type IntoIter = IntoIter<V>;

    /// Iterates over the contained values: the single one, or every clustered one in order.
    fn into_iter(self) -> IntoIter<V> {
        let values = match self {
            Value::Cluster(all) => all,
            Value::Single(only) => vec![only],
        };
        values.into_iter()
    }
}

/// Forward iterator over a copy of a leaf's entries.
pub struct PageIter<K, V> {
    /// Peekable so the consumer can inspect the next entry without advancing.
    pub iter: Peekable<IntoIter<LeafEntry<K, V>>>,
}

/// Backward iterator over a copy of a leaf's entries.
pub struct PageIterBack<K, V> {
    /// Peekable so the consumer can inspect the next entry without advancing.
    pub iter: Peekable<Rev<IntoIter<LeafEntry<K, V>>>>,
}

/// Leaf of the tree: the key/value entries plus the key bounds of the leaf.
#[derive(Clone)]
pub struct Leaf<K, V> {
    /// Entries of the leaf; looked up by binary search on the key.
    pub entries: Vec<LeafEntry<K, V>>,
    /// Lower bound key of this leaf; `None` means no bound.
    pub prev: Option<K>,
    /// Upper bound key of this leaf; `None` means no bound.
    pub next: Option<K>,
}

/// One key of a leaf with its value(s).
#[derive(Clone)]
pub struct LeafEntry<K, V> {
    pub key: K,
    pub value: Value<V>,
}

impl<K, V> Leaf<K, V> {
    /// Creates an empty leaf without bounds.
    pub fn new() -> Leaf<K, V> {
        Leaf {
            entries: Vec::new(),
            prev: None,
            next: None,
        }
    }

    /// Number of entries (distinct keys) in the leaf.
    pub fn len(&self) -> usize {
        self.entries.len()
    }

    /// Merges `leaf`, the left sibling, into `self`: its entries are placed before the
    /// entries already present and its lower bound becomes the bound of `self`.
    ///
    /// `leaf` is left without entries and without `prev` (the bound is moved rather
    /// than cloned because `K` is not required to be `Clone` here). Adopting the bound
    /// mirrors `merge_right`, which adopts the `next` bound of the right sibling; the
    /// old code kept a `prev` that excluded the entries just merged in.
    // Only exercised by the unit tests at the moment.
    #[allow(dead_code)]
    pub fn merge_left(&mut self, leaf: &mut Leaf<K, V>) {
        let mut entries = std::mem::take(&mut leaf.entries);
        entries.append(&mut self.entries);
        self.entries = entries;
        self.prev = leaf.prev.take();
    }
}
impl<K: IndexOrd + Clone, V> Leaf<K, V> {
/// Returns `true` when `k` is not below `prev` and not above `next`.
///
/// Both bounds are inclusive here, and a missing bound never rejects a key.
fn check_range(&self, k: &K) -> bool {
    let below_lower = match &self.prev {
        Some(lower) => compare(lower, k) == Ordering::Greater,
        None => false,
    };
    if below_lower {
        return false;
    }
    match &self.next {
        Some(upper) => compare(upper, k) != Ordering::Less,
        None => true,
    }
}
/// Merges `leaf`, the right sibling, into `self`: its entries are appended after the
/// existing ones (leaving `leaf` empty) and `self` takes a copy of its `next` bound.
pub fn merge_right(&mut self, leaf: &mut Leaf<K, V>) {
    self.entries.extend(leaf.entries.drain(..));
    self.next = leaf.next.clone();
}
/// Splits this leaf into pieces, keeping the first piece in `self` and returning the
/// others in order, each paired with the key of its first entry.
///
/// That key is recorded as `next` of the piece before it and `prev` of the piece
/// itself. The last piece inherits the `next` bound `self` had before the split.
///
/// NOTE(review): `max` must be non-zero (`size / max`), and the code appears to assume
/// the leaf holds more than `max` entries; smaller leaves look like they would panic
/// in `split_off` or on `others[0]` — confirm against callers.
pub fn split(&mut self, max: usize) -> Vec<(K, Leaf<K, V>)> {
let mut split_result: Vec<(K, Leaf<K, V>)> = Vec::new();
let size = self.entries.len();
// Number of full `max`-sized groups; the entries are spread over `n_split + 1` pieces.
let n_split = size / max;
let split_offset = size / (n_split + 1) + 1;
let mut others = self.entries.split_off(split_offset);
let pre_next = self.next.clone();
while others.len() > max {
let new = others.split_off(split_offset);
// Unlike inner nodes, the separator stays inside the piece it introduces.
let key = others[0].key.clone();
// Link the previous piece (or `self` for the first one) to this piece.
if let Some((_, ref mut x)) = split_result.last_mut() {
x.next = Some(key.clone());
} else {
self.next = Some(key.clone());
}
let leaf = Leaf {
entries: others,
prev: Some(key.clone()),
next: None,
};
split_result.push((key, leaf));
others = new;
}
// Last piece: same linking, but it takes over the original upper bound.
let key = others[0].key.clone();
if let Some((_, ref mut x)) = split_result.last_mut() {
x.next = Some(key.clone());
} else {
self.next = Some(key.clone());
}
let leaf = Leaf {
entries: others,
prev: Some(key.clone()),
next: pre_next,
};
split_result.push((key, leaf));
split_result
}
}
impl<K: IndexOrd + Clone, V: Clone> Leaf<K, V> {
/// Looks up `k` by binary search.
///
/// Returns clones of the stored key and value on a hit, or `Err(index)` with the
/// position where `k` would have to be inserted.
pub fn find(&self, k: &K) -> Result<(K, Value<V>), usize> {
    match self.entries.binary_search_by(|entry| compare(&entry.key, k)) {
        Ok(at) => {
            let hit = &self.entries[at];
            Ok((hit.key.clone(), hit.value.clone()))
        }
        Err(insert_at) => Err(insert_at),
    }
}
/// Inserts a new entry for `k` at index `pos`, holding `v` as a single value.
///
/// The value mode is accepted but not used: a fresh key always starts as `Value::Single`.
pub fn add(&mut self, pos: usize, k: &K, v: &V, _value_mode: ValueMode) {
    let entry = LeafEntry {
        key: k.clone(),
        value: Value::Single(v.clone()),
    };
    self.entries.insert(pos, entry);
}
/// Returns an iterator over clones of the entries from `bound` to the end of the leaf.
///
/// `Included(k)` starts at `k` (or the first greater key), `Excluded(k)` right after
/// `k`, and `Unbounded` at the first entry.
pub fn iter_from(&self, bound: Bound<&K>) -> IntoIter<LeafEntry<K, V>> {
    let locate = |key: &K| self.entries.binary_search_by(|entry| compare(&entry.key, key));
    let start = match bound {
        Bound::Unbounded => 0,
        Bound::Included(key) => match locate(key) {
            Ok(at) | Err(at) => at,
        },
        Bound::Excluded(key) => match locate(key) {
            Ok(hit) => hit + 1,
            Err(at) => at,
        },
    };
    let tail: Vec<LeafEntry<K, V>> = self.entries[start..].to_vec();
    tail.into_iter()
}
/// Returns a reversed iterator over clones of the entries from `bound` down to the
/// start of the leaf.
///
/// `Included(k)` starts at `k` (or the last smaller key), `Excluded(k)` right before
/// `k`, and `Unbounded` at the last entry.
pub fn back_iter_from(&self, bound: Bound<&K>) -> Rev<IntoIter<LeafEntry<K, V>>> {
    let locate = |key: &K| self.entries.binary_search_by(|entry| compare(&entry.key, key));
    let end = match bound {
        Bound::Unbounded => self.entries.len(),
        Bound::Included(key) => match locate(key) {
            Ok(hit) => hit + 1,
            Err(at) => at,
        },
        Bound::Excluded(key) => match locate(key) {
            Ok(at) | Err(at) => at,
        },
    };
    let head: Vec<LeafEntry<K, V>> = self.entries[..end].to_vec();
    head.into_iter().rev()
}
}
impl<K: IndexTypeInternal, V: IndexOrd + Clone> Leaf<K, V> {
/// Inserts `v` under `k`, or updates the existing entry according to `value_mode`.
///
/// * A missing key is always added as a single value.
/// * `Replace` overwrites the stored value.
/// * `Exclusive` accepts only a value equal to the stored one.
/// * `Cluster` adds the value to the sorted set under the key, ignoring duplicates.
///
/// # Errors
///
/// Returns `IndexChangeError::IndexDuplicateKey` (carrying `index_name` and the key)
/// when the mode is `Exclusive` and the key already holds a different value.
pub fn insert_or_update(&mut self, k: &K, v: &V, value_mode: ValueMode, index_name: &str) -> PIRes<()> {
    let index = match self.entries.binary_search_by(|n| compare(&n.key, k)) {
        Ok(index) => index,
        Err(insert_at) => {
            self.add(insert_at, k, v, value_mode);
            return Ok(());
        }
    };
    let entry = &mut self.entries[index];
    match value_mode {
        ValueMode::Replace => {
            entry.value = Value::Single(v.clone());
        }
        ValueMode::Exclusive => match entry.value {
            Value::Single(ref existing) => {
                if compare(existing, v) != Ordering::Equal {
                    return Err(IndexChangeError::IndexDuplicateKey(
                        index_name.to_string(),
                        format!("{}", k),
                    ));
                }
            }
            Value::Cluster(_) => unreachable!("Exclusive leafs never have cluster values"),
        },
        ValueMode::Cluster => {
            // A single value is promoted to a two-element sorted cluster; an existing
            // cluster is updated in place.
            let promoted = match entry.value {
                Value::Single(ref existing) => match compare(existing, v) {
                    Ordering::Equal => None,
                    Ordering::Less => Some(Value::Cluster(vec![existing.clone(), v.clone()])),
                    Ordering::Greater => Some(Value::Cluster(vec![v.clone(), existing.clone()])),
                },
                Value::Cluster(ref mut cluster) => {
                    if let Err(at) = cluster.binary_search_by(|x| compare(x, v)) {
                        cluster.insert(at, v.clone());
                    }
                    None
                }
            };
            if let Some(cluster) = promoted {
                entry.value = cluster;
            }
        }
    }
    Ok(())
}
/// Removes `k`, or only the value `v` stored under `k`, and reports whether anything
/// was removed.
///
/// * `v == None`: the whole entry is removed.
/// * `v == Some(value)` on a single value: the entry is removed only when the stored
///   value equals `value`.
/// * `v == Some(value)` on a cluster: `value` is taken out of the cluster; a cluster
///   left with one element is turned into a single value, and an empty one drops the
///   entry.
pub fn remove(&mut self, k: &K, v: &Option<V>) -> bool {
    let index = match self.entries.binary_search_by(|n| compare(&n.key, k)) {
        Ok(index) => index,
        Err(_) => return false,
    };
    let target = match v {
        Some(target) => target,
        None => {
            self.entries.remove(index);
            return true;
        }
    };
    let (removed, drop_entry, collapsed) = match &mut self.entries[index].value {
        Value::Single(current) => {
            let same = compare(current, target) == Ordering::Equal;
            (same, same, None)
        }
        Value::Cluster(cluster) => {
            let removed = match cluster.binary_search_by(|x| compare(x, target)) {
                Ok(at) => {
                    cluster.remove(at);
                    true
                }
                Err(_) => false,
            };
            if cluster.len() == 1 {
                (removed, false, cluster.pop().map(Value::Single))
            } else {
                (removed, cluster.is_empty(), None)
            }
        }
    };
    if let Some(single) = collapsed {
        self.entries[index].value = single;
    }
    if drop_entry {
        self.entries.remove(index);
    }
    removed
}
}
#[cfg(test)]
mod tests {
use super::{Leaf, NodeRef, Nodes, PosRef, Value, ValueMode};
use crate::id::RecRef;
use rand::random;
// Builds a node reference from random values, used as a distinguishable dummy child.
fn random_pointer() -> NodeRef {
RecRef::new(random::<u64>(), random::<u32>())
}
// Adds keys out of order to an inner node and checks that `find` reports the
// expected position and child for exact and in-between keys.
#[test]
fn single_node_add_test() {
let val1 = random_pointer();
let val2 = random_pointer();
let val3 = random_pointer();
let val4 = random_pointer();
let val5 = random_pointer();
let val6 = random_pointer();
let mut node = Nodes::new_from_split(val1, &[(0, val2)]);
let pos = node.find(&2).pos;
node.add(pos, &2, val3.clone());
let pos = node.find(&5).pos;
node.add(pos, &5, val4.clone());
let pos = node.find(&6).pos;
node.add(pos, &6, val5);
let pos = node.find(&4).pos;
node.add(pos, &4, val6.clone());
// Keys are now 0, 2, 4, 5, 6.
let found = node.find(&4);
assert_eq!(found.pos, 3);
assert_eq!(found.node_ref, val6);
let found = node.find(&5);
assert_eq!(found.pos, 4);
assert_eq!(found.node_ref, val4);
// 3 is not a separator: it falls into the child to the right of key 2.
let found = node.find(&3);
assert_eq!(found.pos, 2);
assert_eq!(found.node_ref, val3);
}
// Fills a leaf with 50 keys and checks a hit and a miss past the last key.
#[test]
fn single_leaf_insert_test() {
let mut leaf = Leaf::new();
for n in 0..50 {
leaf.insert_or_update(&n, &n, ValueMode::Replace, "aa")
.expect("insert is ok");
}
let res = leaf.find(&10);
assert_eq!(Ok((10, Value::Single(10))), res);
// A missing key reports the index where it would be inserted.
let res = leaf.find(&60);
assert_eq!(Err(50), res);
}
// Two different values under the same key in cluster mode end up in one sorted cluster.
#[test]
fn single_leaf_cluster_insert_test() {
let mut leaf = Leaf::new();
leaf.insert_or_update(&10, &1, ValueMode::Cluster, "aa")
.expect("insert is ok");
leaf.insert_or_update(&10, &2, ValueMode::Cluster, "aa")
.expect("insert is ok");
let res = leaf.find(&10);
assert_eq!(Ok((10, Value::Cluster(vec![1, 2]))), res);
}
// Removing one of two clustered values collapses the cluster back to a single value.
#[test]
fn leaf_cluster_remove_test() {
let mut leaf = Leaf::new();
leaf.insert_or_update(&10, &1, ValueMode::Cluster, "aa")
.expect("insert is ok");
leaf.insert_or_update(&10, &2, ValueMode::Cluster, "aa")
.expect("insert is ok");
assert!(leaf.remove(&10, &Some(2)));
let res = leaf.find(&10);
assert_eq!(Ok((10, Value::Single(1))), res);
}
// Removing a value that is not in the cluster reports `false` and changes nothing.
#[test]
fn leaf_cluster_remove_not_exist_value_test() {
let mut leaf = Leaf::new();
leaf.insert_or_update(&10, &1, ValueMode::Cluster, "aa")
.expect("insert is ok");
leaf.insert_or_update(&10, &2, ValueMode::Cluster, "aa")
.expect("insert is ok");
assert!(!leaf.remove(&10, &Some(10)));
let res = leaf.find(&10);
assert_eq!(Ok((10, Value::Cluster(vec![1, 2]))), res);
}
// Removing a value that differs from the stored single value leaves the entry intact.
#[test]
fn leaf_single_delete_not_exist_value_test() {
let mut leaf = Leaf::new();
leaf.insert_or_update(&10, &1, ValueMode::Exclusive, "aa")
.expect("insert is ok");
assert!(!leaf.remove(&10, &Some(10)));
let res = leaf.find(&10);
assert_eq!(Ok((10, Value::Single(1))), res);
}
// In exclusive mode a second, different value for an existing key is an error.
#[test]
fn leaf_duplicate_key_test() {
let mut leaf = Leaf::new();
leaf.insert_or_update(&10, &1, ValueMode::Exclusive, "aa")
.expect("insert is ok");
let res = leaf.insert_or_update(&10, &2, ValueMode::Exclusive, "aa");
assert!(res.is_err());
}
// Splitting 103 entries with a limit of 21 yields five pieces: 21, 21, 21, 21 and 19.
#[test]
fn test_leaf_split() {
let mut leaf = Leaf::new();
for n in 0..103 {
leaf.insert_or_update(&n, &n, ValueMode::Replace, "aa")
.expect("insert is ok");
}
let res = leaf.split(21);
// The first piece stays in the original leaf.
assert_eq!(leaf.len(), 21);
assert_eq!(res[0].1.len(), 21);
assert_eq!(res[1].1.len(), 21);
assert_eq!(res[2].1.len(), 21);
assert_eq!(res[3].1.len(), 19);
}
// Splitting an inner node with 103 keys and a limit of 21: every piece keeps one more
// pointer than keys, and the separators between pieces are removed from the key lists.
#[test]
fn test_node_split() {
let mut node = Nodes::new_from_split(random_pointer(), &[(0, random_pointer())]);
for n in 1..103 {
let pos = node.find(&n).pos;
node.add(pos, &n, random_pointer());
}
let res = node.split(21);
// The first piece stays in the original node.
assert_eq!(node.len(), 21);
assert_eq!(node.pointers.len(), 21);
assert_eq!(node.keys.len(), 20);
assert_eq!(res[0].1.len(), 21);
assert_eq!(res[0].1.pointers.len(), 21);
assert_eq!(res[0].1.keys.len(), 20);
assert_eq!(res[1].1.len(), 21);
assert_eq!(res[1].1.pointers.len(), 21);
assert_eq!(res[1].1.keys.len(), 20);
assert_eq!(res[2].1.len(), 21);
assert_eq!(res[2].1.pointers.len(), 21);
assert_eq!(res[2].1.keys.len(), 20);
assert_eq!(res[3].1.len(), 20);
assert_eq!(res[3].1.pointers.len(), 20);
assert_eq!(res[3].1.keys.len(), 19);
}
// Removing an existing key shrinks the leaf; removing a missing key reports `false`.
#[test]
fn test_remove_from_leaf() {
let mut leaf = Leaf::new();
for n in 0..50 {
leaf.insert_or_update(&n, &n, ValueMode::Replace, "aa")
.expect("insert is ok");
}
assert!(leaf.remove(&10, &Some(10)));
assert!(!leaf.remove(&100, &Some(100)));
assert_eq!(leaf.len(), 49);
// Key 10 is gone; its former index is now the insertion point.
let res = leaf.find(&10);
assert_eq!(Err(10), res);
}
// Removes the child found for key 10 from an inner node and checks that a lookup of
// 10 now lands on the child previously found for key 9, with key 8's child as sibling.
#[test]
fn test_remove_from_node() {
let mut node = Nodes::new_from_split(random_pointer(), &[(0, random_pointer())]);
let mut keep_pre = None;
let mut keep = None;
for n in 1..50 {
let pos = node.find(&n).pos;
let point = random_pointer();
if n == 8 {
keep_pre = Some(point.clone());
}
if n == 9 {
keep = Some(point.clone());
}
node.add(pos, &n, point);
}
let pos = node.find(&10).pos;
node.remove(pos);
// 51 pointers before the removal, 50 after.
assert_eq!(node.len(), 50);
let res = node.find(&10);
assert_eq!(PosRef::new(&10, 10, keep.unwrap(), keep_pre), res);
}
// Merges two leaves in both directions and checks that the receiving leaf holds all
// entries, the donor is empty and lookups still work.
#[test]
fn test_merge_leaf() {
// Right merge: keys 20..40 are appended to keys 0..20.
let mut leaf = Leaf::new();
let mut leaf2 = Leaf::new();
for n in 0..20 {
leaf.insert_or_update(&n, &n, ValueMode::Replace, "aa")
.expect("insert is ok");
}
for n in 20..40 {
leaf2
.insert_or_update(&n, &n, ValueMode::Replace, "aa")
.expect("insert is ok");
}
leaf.merge_right(&mut leaf2);
assert_eq!(leaf.len(), 40);
assert_eq!(leaf2.len(), 0);
let res = leaf.find(&35);
assert_eq!(res, Ok((35, Value::Single(35))));
// Left merge: keys 0..20 are placed before keys 20..40.
let mut leaf = Leaf::new();
let mut leaf2 = Leaf::new();
for n in 20..40 {
leaf.insert_or_update(&n, &n, ValueMode::Replace, "aa")
.expect("insert is ok");
}
for n in 0..20 {
leaf2
.insert_or_update(&n, &n, ValueMode::Replace, "aa")
.expect("insert is ok");
}
leaf.merge_left(&mut leaf2);
assert_eq!(leaf.len(), 40);
assert_eq!(leaf2.len(), 0);
let res = leaf.find(&35);
assert_eq!(res, Ok((35, Value::Single(35))));
}
// Merges two inner nodes in both directions around separator 20 and checks the
// resulting size and that a lookup of 26 finds the expected child and sibling.
#[test]
fn test_merge_nodes() {
// Right merge: node with keys 0..20 receives the node with keys 21..40.
let mut node = Nodes::new_from_split(random_pointer(), &[(0, random_pointer())]);
for n in 1..20 {
let pos = node.find(&n).pos;
let point = random_pointer();
node.add(pos, &n, point);
}
let mut node2 = Nodes::new_from_split(random_pointer(), &[(21, random_pointer())]);
let mut keep_pre = None;
let mut keep = None;
for n in 22..40 {
let pos = node2.find(&n).pos;
let point = random_pointer();
if n == 25 {
keep_pre = Some(point.clone());
}
if n == 26 {
keep = Some(point.clone());
}
node2.add(pos, &n, point);
}
node.merge_right(20, &mut node2);
// 21 + 20 pointers.
assert_eq!(node.len(), 41);
assert_eq!(node2.len(), 0);
let res = node.find(&26);
assert_eq!(PosRef::new(&26, 27, keep.unwrap(), keep_pre), res);
// Left merge: node with keys 21..40 receives the node with keys 0..20 in front.
let mut node = Nodes::new_from_split(random_pointer(), &[(21, random_pointer())]);
let mut keep_pre = None;
let mut keep = None;
for n in 22..40 {
let pos = node.find(&n).pos;
let point = random_pointer();
if n == 25 {
keep_pre = Some(point.clone());
}
if n == 26 {
keep = Some(point.clone());
}
node.add(pos, &n, point);
}
let mut node2 = Nodes::new_from_split(random_pointer(), &[(0, random_pointer())]);
for n in 1..20 {
let pos = node2.find(&n).pos;
let point = random_pointer();
node2.add(pos, &n, point);
}
node.merge_left(20, &mut node2);
assert_eq!(node.len(), 41);
assert_eq!(node2.len(), 0);
let res = node.find(&26);
assert_eq!(PosRef::new(&26, 27, keep.unwrap(), keep_pre), res);
}
}