use crate::{
error::{IndexChangeError, PIRes},
id::RecRef,
index::{
config::{IndexOrd, IndexTypeInternal, ValueMode},
tree::PosRef,
},
};
use std::{
cmp::Ordering,
iter::{Peekable, Rev},
ops::Bound,
vec::IntoIter,
};
pub type NodeRef = RecRef;
#[derive(Clone)]
pub enum Node<K, V> {
Node(Nodes<K>),
Leaf(Leaf<K, V>),
}
impl<K, V> Node<K, V> {
pub fn get_prev(&self) -> &Option<K> {
match self {
Node::Node(n) => &n.prev,
Node::Leaf(l) => &l.prev,
}
}
pub fn get_next(&self) -> &Option<K> {
match self {
Node::Node(n) => &n.next,
Node::Leaf(l) => &l.next,
}
}
pub fn len(&self) -> usize {
match self {
Node::Node(n) => n.len(),
Node::Leaf(l) => l.len(),
}
}
}
impl<K: IndexOrd + Clone, V> Node<K, V> {
pub fn merge_right(&mut self, k: K, node: &mut Node<K, V>) {
match self {
Node::Node(n) => match node {
Node::Node(n1) => {
n.merge_right(k, n1);
}
Node::Leaf(_) => {
panic!("impossible merge a leaf to node");
}
},
Node::Leaf(l) => match node {
Node::Node(_) => {
panic!("impossible merge a node to leaf");
}
Node::Leaf(l1) => {
l.merge_right(l1);
}
},
}
}
pub fn split(&mut self, top_limit: usize) -> Vec<(K, Node<K, V>)> {
match self {
Node::Node(n) => n.split(top_limit).into_iter().map(|x| (x.0, Node::Node(x.1))).collect(),
Node::Leaf(l) => l.split(top_limit).into_iter().map(|x| (x.0, Node::Leaf(x.1))).collect(),
}
}
pub fn check_range(&self, k: &K) -> bool {
match self {
Node::Node(n) => n.check_range(k),
Node::Leaf(l) => l.check_range(k),
}
}
}
pub(crate) fn compare<T: IndexOrd>(first: &T, second: &T) -> Ordering {
first.cmp(second)
}
#[derive(Clone)]
pub struct Nodes<K> {
pub keys: Vec<K>,
pub pointers: Vec<NodeRef>,
pub prev: Option<K>,
pub next: Option<K>,
}
impl<K> Nodes<K> {
pub fn len(&self) -> usize {
self.pointers.len()
}
pub fn remove(&mut self, pos: usize) -> Option<NodeRef> {
if pos < self.pointers.len() {
self.keys.remove(pos - 1);
Some(self.pointers.remove(pos))
} else {
None
}
}
}
impl<K: IndexOrd + Clone> Nodes<K> {
pub fn new_from_split(left: NodeRef, values: &[(K, NodeRef)]) -> Nodes<K> {
let keys = values.iter().map(|z| z.0.clone()).collect();
let mut pointers: Vec<NodeRef> = values.iter().map(|z| z.1).collect();
pointers.insert(0, left);
Nodes {
keys,
pointers,
prev: None,
next: None,
}
}
pub fn add(&mut self, pos: usize, k: &K, node_ref: NodeRef) {
self.keys.insert(pos, k.clone());
self.pointers.insert(pos + 1, node_ref);
}
pub fn find(&self, k: &K) -> PosRef<K> {
match self.keys.binary_search_by(|x| compare(x, k)) {
Ok(index) => {
let sibling = Some(self.pointers[index]);
PosRef::new(k, index + 1, self.pointers[index + 1], sibling)
}
Err(index) => {
let sibling = if index > 0 {
Some(self.pointers[index - 1])
} else if self.pointers.len() > index + 1 {
Some(self.pointers[index + 1])
} else {
None
};
PosRef::new(k, index, self.pointers[index], sibling)
}
}
}
pub fn find_write(&self, k: &K) -> Option<PosRef<K>> {
let pos = self.find(k);
if pos.pos == 0 {
if let Some(pk) = &self.prev {
if compare(k, pk) == Ordering::Less {
return None;
}
}
} else if pos.pos == self.pointers.len() {
if let Some(nk) = &self.next {
if compare(k, nk) != Ordering::Less {
return None;
}
}
}
Some(pos)
}
pub fn get_key(&self, pos: usize) -> K {
self.keys[pos].clone()
}
pub fn get(&self, pos: usize) -> NodeRef {
self.pointers[pos]
}
pub fn insert_after(&mut self, pos: usize, values: &mut Vec<(K, NodeRef)>) {
values.reverse();
for (key, node) in values.iter() {
self.add(pos, key, *node);
}
}
pub fn split(&mut self, max: usize) -> Vec<(K, Nodes<K>)> {
let mut split_result: Vec<(K, Nodes<K>)> = Vec::new();
let size = self.keys.len();
let n_split = size / max;
let split_offset = size / (n_split + 1) + 1;
let mut others = self.keys.split_off(split_offset - 1);
let mut other_pointers = self.pointers.split_off(split_offset);
let pre_next = self.next.clone();
while others.len() > max {
let new = others.split_off(split_offset);
let new_pointers = other_pointers.split_off(split_offset);
let key = others.remove(0);
if let Some((_, ref mut x)) = split_result.last_mut() {
x.next = Some(key.clone());
} else {
self.next = Some(key.clone());
}
let leaf = Nodes {
keys: others,
pointers: other_pointers,
prev: Some(key.clone()),
next: None,
};
split_result.push((key, leaf));
others = new;
other_pointers = new_pointers;
}
let key = others.remove(0);
if let Some((_, ref mut x)) = split_result.last_mut() {
x.next = Some(key.clone());
} else {
self.next = Some(key.clone());
}
let leaf = Nodes {
keys: others,
pointers: other_pointers,
prev: Some(key.clone()),
next: pre_next,
};
split_result.push((key, leaf));
split_result
}
#[allow(dead_code)]
pub fn merge_left(&mut self, owner: K, nodes: &mut Nodes<K>) {
let mut keys = std::mem::take(&mut nodes.keys);
let mut pointers = std::mem::take(&mut nodes.pointers);
keys.push(owner);
keys.append(&mut self.keys);
pointers.append(&mut self.pointers);
self.keys = keys;
self.pointers = pointers;
}
pub fn merge_right(&mut self, owner: K, nodes: &mut Nodes<K>) {
self.keys.push(owner);
self.keys.append(&mut nodes.keys);
self.pointers.append(&mut nodes.pointers);
self.next = nodes.next.clone();
}
fn check_range(&self, k: &K) -> bool {
if let Some(x) = &self.prev {
if compare(x, k) == Ordering::Greater {
return false;
}
}
if let Some(x) = &self.next {
if compare(x, k) == Ordering::Less {
return false;
}
}
true
}
}
#[derive(Clone, PartialEq, Debug)]
pub enum Value<V> {
Cluster(Vec<V>),
Single(V),
}
impl<V> IntoIterator for Value<V> {
type Item = V;
type IntoIter = IntoIter<V>;
fn into_iter(self) -> IntoIter<V> {
match self {
Value::Single(v) => vec![v].into_iter(),
Value::Cluster(v) => v.into_iter(),
}
}
}
pub struct PageIter<K, V> {
pub iter: Peekable<IntoIter<LeafEntry<K, V>>>,
}
pub struct PageIterBack<K, V> {
pub iter: Peekable<Rev<IntoIter<LeafEntry<K, V>>>>,
}
#[derive(Clone)]
pub struct Leaf<K, V> {
pub entries: Vec<LeafEntry<K, V>>,
pub prev: Option<K>,
pub next: Option<K>,
}
#[derive(Clone)]
pub struct LeafEntry<K, V> {
pub key: K,
pub value: Value<V>,
}
impl<K, V> Leaf<K, V> {
pub fn new() -> Leaf<K, V> {
Leaf {
entries: Vec::new(),
prev: None,
next: None,
}
}
pub fn len(&self) -> usize {
self.entries.len()
}
#[allow(dead_code)]
pub fn merge_left(&mut self, leaf: &mut Leaf<K, V>) {
let mut entries = std::mem::take(&mut leaf.entries);
entries.append(&mut self.entries);
self.entries = entries;
}
}
impl<K: IndexOrd + Clone, V> Leaf<K, V> {
fn check_range(&self, k: &K) -> bool {
if let Some(x) = &self.prev {
if compare(x, k) == Ordering::Greater {
return false;
}
}
if let Some(x) = &self.next {
if compare(x, k) == Ordering::Less {
return false;
}
}
true
}
pub fn merge_right(&mut self, leaf: &mut Leaf<K, V>) {
self.entries.append(&mut leaf.entries);
self.next = leaf.next.clone();
}
pub fn split(&mut self, max: usize) -> Vec<(K, Leaf<K, V>)> {
let mut split_result: Vec<(K, Leaf<K, V>)> = Vec::new();
let size = self.entries.len();
let n_split = size / max;
let split_offset = size / (n_split + 1) + 1;
let mut others = self.entries.split_off(split_offset);
let pre_next = self.next.clone();
while others.len() > max {
let new = others.split_off(split_offset);
let key = others[0].key.clone();
if let Some((_, ref mut x)) = split_result.last_mut() {
x.next = Some(key.clone());
} else {
self.next = Some(key.clone());
}
let leaf = Leaf {
entries: others,
prev: Some(key.clone()),
next: None,
};
split_result.push((key, leaf));
others = new;
}
let key = others[0].key.clone();
if let Some((_, ref mut x)) = split_result.last_mut() {
x.next = Some(key.clone());
} else {
self.next = Some(key.clone());
}
let leaf = Leaf {
entries: others,
prev: Some(key.clone()),
next: pre_next,
};
split_result.push((key, leaf));
split_result
}
}
impl<K: IndexOrd + Clone, V: Clone> Leaf<K, V> {
pub fn find(&self, k: &K) -> Result<(K, Value<V>), usize> {
self.entries
.binary_search_by(|n| compare(&n.key, k))
.map(|index| (self.entries[index].key.clone(), self.entries[index].value.clone()))
}
pub fn add(&mut self, pos: usize, k: &K, v: &V, _value_mode: ValueMode) {
self.entries.insert(
pos,
LeafEntry {
key: k.clone(),
value: Value::Single(v.clone()),
},
);
}
pub fn iter_from(&self, bound: Bound<&K>) -> IntoIter<LeafEntry<K, V>> {
let index = match bound {
Bound::Included(k) => match self.entries.binary_search_by(|n| compare(&n.key, k)) {
Ok(index) => index,
Err(index) => index,
},
Bound::Excluded(k) => match self.entries.binary_search_by(|n| compare(&n.key, k)) {
Ok(index) => index + 1,
Err(index) => index,
},
Bound::Unbounded => 0,
};
self.entries[index..].to_vec().into_iter()
}
pub fn back_iter_from(&self, bound: Bound<&K>) -> Rev<IntoIter<LeafEntry<K, V>>> {
let index = match bound {
Bound::Included(k) => match self.entries.binary_search_by(|n| compare(&n.key, k)) {
Ok(index) => index + 1,
Err(index) => index,
},
Bound::Excluded(k) => match self.entries.binary_search_by(|n| compare(&n.key, k)) {
Ok(index) => index,
Err(index) => index,
},
Bound::Unbounded => self.len(),
};
self.entries[..index].to_vec().into_iter().rev()
}
}
impl<K: IndexTypeInternal, V: IndexOrd + Clone> Leaf<K, V> {
pub fn insert_or_update(&mut self, k: &K, v: &V, value_mode: ValueMode, index_name: &str) -> PIRes<()> {
match self.entries.binary_search_by(|n| compare(&n.key, k)) {
Ok(index) => {
let entry = &mut self.entries[index];
match value_mode {
ValueMode::Replace => {
entry.value = Value::Single(v.clone());
}
ValueMode::Exclusive => match entry.value {
Value::Single(ref ev) => {
if compare(ev, v) != Ordering::Equal {
return Err(IndexChangeError::IndexDuplicateKey(
index_name.to_string(),
format!("{}", k),
));
}
}
_ => unreachable!("Exclusive leafs never have cluster values"),
},
ValueMode::Cluster => {
let mut new_value = None;
match entry.value {
Value::Single(ref ev) => match compare(ev, v) {
Ordering::Equal => {}
Ordering::Less => {
new_value = Some(Value::Cluster(vec![ev.clone(), v.clone()]));
}
Ordering::Greater => {
new_value = Some(Value::Cluster(vec![v.clone(), ev.clone()]));
}
},
Value::Cluster(ref mut cl) => {
if let Err(index) = cl.binary_search_by(|x| compare(x, v)) {
cl.insert(index, v.clone());
}
}
}
if let Some(v) = new_value {
entry.value = v;
}
}
}
}
Err(index) => self.add(index, k, v, value_mode),
}
Ok(())
}
pub fn remove(&mut self, k: &K, v: &Option<V>) -> bool {
match self.entries.binary_search_by(|n| compare(&n.key, k)) {
Ok(index) => {
if let Some(rv) = v {
let mut removed = false;
let remove_entry = {
let mut new_value = None;
let entry = &mut self.entries[index];
let remove_entry = match &mut entry.value {
Value::Single(val) => {
if compare(val, rv) == Ordering::Equal {
removed = true;
true
} else {
false
}
}
Value::Cluster(ref mut cl) => {
if let Ok(index) = cl.binary_search_by(|x| compare(x, rv)) {
removed = true;
cl.remove(index);
}
if cl.len() == 1 {
new_value = Some(Value::Single(cl.pop().unwrap()));
false
} else {
cl.is_empty()
}
}
};
if let Some(new) = new_value {
entry.value = new;
}
remove_entry
};
if remove_entry {
self.entries.remove(index);
}
removed
} else {
self.entries.remove(index);
true
}
}
Err(_) => false,
}
}
}