pub mod nodes;
use crate::{
error::{PERes, PIRes},
index::{
config::{IndexOrd, ValueMode},
keeper::{IndexKeeper, IndexModify},
tree::nodes::{Leaf, Node, NodeRef, Nodes, PageIter, PageIterBack, Value},
},
};
use std::{cmp::Ordering, ops::Bound, rc::Rc};
use super::config::IndexTypeInternal;
pub enum ValueChange<V> {
Add(V),
Remove(Option<V>),
}
pub struct KeyChanges<K, V> {
pub k: K,
changes: Vec<ValueChange<V>>,
}
impl<K: IndexTypeInternal, V: IndexTypeInternal> KeyChanges<K, V> {
#[allow(dead_code)]
pub fn single_add(k: K, v: V) -> KeyChanges<K, V> {
KeyChanges {
k,
changes: vec![ValueChange::Add(v)],
}
}
#[allow(dead_code)]
fn single_delete(k: K, v: Option<V>) -> KeyChanges<K, V> {
KeyChanges {
k,
changes: vec![ValueChange::Remove(v)],
}
}
pub fn new(k: K, changes: Vec<ValueChange<V>>) -> KeyChanges<K, V> {
KeyChanges { k, changes }
}
fn apply(&self, leaf: &mut Leaf<K, V>, value_mode: ValueMode, index: &str) -> PIRes<bool> {
let mut update = false;
for vc in &self.changes {
update |= match vc {
ValueChange::Add(v) => {
if leaf.len() > 0 {
leaf.insert_or_update(&self.k, v, value_mode.clone(), index)?;
} else {
leaf.add(0, &self.k, v, value_mode.clone());
}
true
}
ValueChange::Remove(r) => leaf.remove(&self.k, r),
};
}
Ok(update)
}
}
pub trait Index<K, V> {
fn get(&mut self, k: &K) -> PERes<Option<Value<V>>>;
fn iter_from(&mut self, first: Bound<&K>) -> PERes<PageIter<K, V>>;
fn back_iter_from(&mut self, first: Bound<&K>) -> PERes<PageIterBack<K, V>>;
}
pub trait IndexApply<K, V>: Index<K, V> {
fn apply(&mut self, adds: &[KeyChanges<K, V>]) -> PIRes<()>;
}
struct Updated<K> {
path: Vec<PosRef<K>>,
}
impl<K> Updated<K> {
fn new() -> Self {
Self { path: Vec::new() }
}
fn push(&mut self, p: PosRef<K>) {
self.path.push(p)
}
fn clear(&mut self) {
self.path.clear();
}
fn last(&self) -> Option<&PosRef<K>> {
self.path.last()
}
}
#[derive(PartialEq, Clone, Debug)]
pub struct PosRef<K> {
pub k: K,
pub pos: usize,
pub node_ref: NodeRef,
pub sibling: Option<NodeRef>,
pub version: u16,
}
impl<K: Clone> PosRef<K> {
pub(crate) fn new(k: &K, pos: usize, node_ref: NodeRef, sibling: Option<NodeRef>) -> PosRef<K> {
PosRef {
k: k.clone(),
pos,
node_ref,
sibling,
version: 0,
}
}
}
struct ParentNodeChanged<K> {
path: Updated<K>,
children: Vec<PosRef<K>>,
}
impl<K> ParentNodeChanged<K> {
fn node_id(&self) -> NodeRef {
self.path.last().unwrap().node_ref
}
fn is_root(&self) -> bool {
self.path.path.is_empty()
}
}
fn split_link_save<K: IndexOrd + Clone, V, I: IndexModify<K, V>>(
index: &mut I,
mut prev_id: NodeRef,
mut first: Node<K, V>,
cur_version: u16,
) -> PIRes<Vec<(K, NodeRef)>> {
let new_nodes = first.split(index.top_limit());
let mut ids = Vec::new();
for (k, new_node) in new_nodes {
let saved_id = index.insert(new_node)?;
ids.push((k, saved_id));
}
if let Node::Leaf(leaf) = first {
let mut prev_leaf = leaf;
let mut prev_version = cur_version;
for (_, id) in &ids {
let (sibling_node, version) = index.load_modify(id)?.unwrap();
if let Node::Leaf(sibling) = index.owned(id, sibling_node) {
index.update(&prev_id, Node::Leaf(prev_leaf), prev_version)?;
prev_id = *id;
prev_leaf = sibling;
prev_version = version;
}
}
index.update(&prev_id, Node::Leaf(prev_leaf), prev_version)?;
} else {
index.update(&prev_id, first, cur_version)?;
}
Ok(ids)
}
fn group_by_parent<K>(updates: Vec<Updated<K>>) -> Vec<ParentNodeChanged<K>> {
let mut parent_updates = Vec::new();
let mut parent_node: Option<NodeRef> = None;
let mut new_update: Option<ParentNodeChanged<K>> = None;
for mut update in updates {
if let Some(last) = update.path.pop() {
if parent_node == update.path.last().map(|x| x.node_ref) {
if let Some(p) = &mut new_update {
p.children.push(last);
}
} else {
if let Some(p) = new_update {
parent_updates.push(p);
}
parent_node = update.path.last().map(|x| x.node_ref);
let children = vec![last];
new_update = Some(ParentNodeChanged { path: update, children });
}
}
}
if let Some(p) = new_update {
parent_updates.push(p);
}
parent_updates
}
fn lock_parents<K: IndexOrd + Clone, V, I: IndexModify<K, V>>(
index: &mut I,
mut updates: Vec<Updated<K>>,
) -> PIRes<Vec<Updated<K>>> {
for update in &mut updates {
loop {
debug_assert!(
update.path.len() >= 2,
"never lock a path that shorter than 2 because that would be the root",
);
let locked = {
let update = &update;
let len = update.path.len();
let rec_ref = &update.path[len - 2];
if let Some((node, version)) = index.load_modify(&rec_ref.node_ref)? {
lock_logic(index, rec_ref, node, version)?.is_some()
} else {
false
}
};
if locked {
break;
} else {
let last = update.path.last().unwrap().clone();
if let Some(ref node) = index.get_root_refresh()? {
let mut path = Vec::new();
let mut cur_node = PosRef::new(&last.k, 0, *node, None);
while cur_node.node_ref != last.node_ref {
let mut restart = true;
if let Some((node_modify, version)) = index.load_modify(&cur_node.node_ref)? {
if let Node::Node(n) = &*node_modify {
cur_node.version = version;
path.push(cur_node.clone());
if let Some(x) = n.find_write(&last.k) {
cur_node = x;
restart = false;
}
}
}
if restart {
if let Some(ref node) = index.get_root_refresh()? {
cur_node = PosRef::new(&last.k, 0, *node, None);
path.clear();
} else {
panic!("restart node finding but not root present");
}
}
}
path.push(last);
update.path = path;
}
}
}
}
Ok(updates)
}
fn merge_and_save<K: IndexOrd + Clone, V, I: IndexModify<K, V>>(
index: &mut I,
parent: &mut Nodes<K>,
pos: usize,
mut cur: Node<K, V>,
version: u16,
) -> PIRes<bool> {
Ok(if pos > 0 {
let node_ref = parent.get(pos - 1);
let (dest_node, dest_version) = index.load_modify(&node_ref)?.unwrap();
let mut dest_merge = index.owned(&node_ref, dest_node);
dest_merge.merge_right(parent.get_key(pos - 1), &mut cur);
index.update(&node_ref, dest_merge, dest_version)?;
index.delete(&parent.remove(pos).unwrap(), version)?;
false
} else {
let node_ref = parent.get(pos + 1);
let (source_node, source_version) = index.load_modify(&node_ref)?.unwrap();
let mut source_merge = index.owned(&node_ref, source_node);
cur.merge_right(parent.get_key(pos), &mut source_merge);
index.delete(&parent.remove(pos + 1).unwrap(), source_version)?;
index.update(&parent.get(pos), cur, version)?;
true
})
}
fn lock_logic<K: IndexOrd + Clone, V, I: IndexModify<K, V>>(
index: &mut I,
cur_node: &PosRef<K>,
node: Rc<Node<K, V>>,
node_version: u16,
) -> PIRes<Option<(Rc<Node<K, V>>, u16)>> {
if let Some(sibling) = &cur_node.sibling {
let sibling_load = if let Some(load) = index.load_modify(sibling)? {
load
} else {
return Ok(None);
};
let ((first, second), (f, s)) = if cur_node.pos > 0 {
((sibling, &cur_node.node_ref), (sibling_load, (node, node_version)))
} else {
((&cur_node.node_ref, sibling), ((node, node_version), sibling_load))
};
let (first_node, first_version) = f;
let (second_node, second_version) = s;
if let (Some(f), Some(s)) = (first_node.get_next(), second_node.get_prev()) {
if f.cmp(s) != Ordering::Equal {
return Ok(None);
}
} else {
return Ok(None);
}
if cur_node.pos == 0 && !first_node.check_range(&cur_node.k) {
return Ok(None);
}
if cur_node.pos > 0 && !second_node.check_range(&cur_node.k) {
return Ok(None);
}
if !index.lock(first, first_version)? {
return Ok(None);
}
if !index.lock(second, second_version)? {
index.unlock(first)?;
return Ok(None);
}
if cur_node.pos > 0 {
Ok(Some((second_node, second_version)))
} else {
Ok(Some((first_node, first_version)))
}
} else if !node.check_range(&cur_node.k) {
Ok(None)
} else if index.lock(&cur_node.node_ref, node_version)? {
Ok(Some((node, node_version)))
} else {
Ok(None)
}
}
fn find_and_change_leaf<K: IndexTypeInternal, V: IndexTypeInternal, I: IndexModify<K, V>>(
add: &KeyChanges<K, V>,
updates: &mut Vec<Updated<K>>,
index: &mut I,
mut prev_leaf_id: Option<NodeRef>,
) -> PIRes<Option<NodeRef>> {
let mut next: Option<PosRef<K>> = None;
let mut path = Updated::new();
loop {
next = if let Some(cur_node) = &mut next {
let mut new_next = None;
if let Some((node_modify, version)) = index.load_modify(&cur_node.node_ref)? {
if let Node::Node(n) = &*node_modify {
cur_node.version = version;
path.push(cur_node.clone());
new_next = n.find_write(&add.k);
} else if let Node::Leaf(ref _none) = &*node_modify {
cur_node.version = version;
if let Some((leaf_modify, version)) = lock_logic(index, cur_node, node_modify, version)? {
path.push(cur_node.clone());
if let Node::Leaf(mut leaf) = index.owned(&cur_node.node_ref, leaf_modify) {
let node_ref = &cur_node.node_ref;
if add.apply(&mut leaf, index.value_mode(), index.index_name())? {
index.update(node_ref, Node::Leaf(leaf), version)?;
if Some(*node_ref) != prev_leaf_id {
updates.push(path);
}
prev_leaf_id = Some(*node_ref)
}
break;
}
}
}
}
new_next
} else if let Some(r) = index.get_root_refresh()? {
path.clear();
Some(PosRef::new(&add.k, 0, r, None))
} else {
index.lock_config()?;
if let Some(r) = index.get_root_refresh()? {
path.clear();
Some(PosRef::new(&add.k, 0, r, None))
} else {
let mut leaf = Leaf::new();
add.apply(&mut leaf, index.value_mode(), index.index_name())?;
let leaf_ref = index.insert(Node::Leaf(leaf))?;
index.set_root(Some(leaf_ref))?;
break;
}
}
}
Ok(prev_leaf_id)
}
fn merge_and_split_children<K: IndexOrd + Clone, V: Clone, I: IndexModify<K, V>>(
mut n: Nodes<K>,
children: Vec<PosRef<K>>,
index: &mut I,
) -> PIRes<Option<Nodes<K>>> {
let mut save = false;
let mut changed_flags = vec![false; n.len()];
for ch in children {
let pos = n.find(&ch.k);
changed_flags[pos.pos] = true;
if n.len() > 1 {
let cur_id = pos.node_ref;
let (cur, version) = index.load_modify(&cur_id)?.unwrap();
if cur.len() < index.bottom_limit() {
let mut new_pos = pos.pos;
let to_modify = index.owned(&cur_id, cur);
if merge_and_save(index, &mut n, pos.pos, to_modify, version)? {
new_pos += 1;
}
changed_flags.remove(new_pos);
changed_flags[new_pos - 1] = true;
save = true;
}
}
}
for pos in 0..n.len() {
if changed_flags[pos] {
let cur_id = n.get(pos);
let (cur, cur_version) = index.load_modify(&cur_id)?.unwrap();
if cur.len() > index.top_limit() {
let to_modify = index.owned(&cur_id, cur);
let mut ids = split_link_save(index, n.get(pos), to_modify, cur_version)?;
for _ in 0..ids.len() {
changed_flags.insert(pos, false);
}
n.insert_after(pos, &mut ids);
save = true;
}
}
}
if save {
Ok(Some(n))
} else {
Ok(None)
}
}
impl<K: IndexTypeInternal, V: IndexTypeInternal, T> IndexApply<K, V> for T
where
T: IndexModify<K, V>,
T: Index<K, V>,
{
fn apply(&mut self, adds: &[KeyChanges<K, V>]) -> PIRes<()> {
let mut updates = Vec::new();
let mut prev_leaf_id = None;
for add in adds {
prev_leaf_id = find_and_change_leaf(add, &mut updates, self, prev_leaf_id)?;
}
while updates.len() > 1 || (updates.len() == 1 && updates[0].path.len() > 1) {
updates = lock_parents(self, updates)?;
let parent_updates = group_by_parent(updates);
updates = Vec::new();
for update in parent_updates {
let parent_id = update.node_id();
let (read_node, n_version) = self.load_modify(&parent_id)?.unwrap();
if let Node::Node(n) = self.owned(&parent_id, read_node) {
let is_root = update.is_root();
if let Some(n) = merge_and_split_children(n, update.children, self)? {
self.update(&parent_id, Node::Node(n), n_version)?;
if !is_root {
updates.push(update.path);
} else {
break;
}
}
}
}
}
if updates.len() == 1 && updates[0].path.len() == 1 {
self.lock_config()?;
while let Some(r) = self.get_root_refresh()? {
if let Some((n, n_version)) = self.load_modify(&r)? {
if n.len() > self.top_limit() {
let to_modify = self.owned(&r, n);
let ids = split_link_save(self, r, to_modify, n_version)?;
let node = Node::Node(Nodes::new_from_split(r, &ids));
let cur_id = self.insert(node)?;
self.set_root(Some(cur_id))?;
} else if n.len() == 1 {
if let Node::Node(cn) = &*n {
self.delete(&r, n_version)?;
self.set_root(Some(cn.get(0)))?;
} else {
break;
}
} else if n.len() == 0 {
self.set_root(None)?;
} else {
break;
}
}
}
}
Ok(())
}
}
impl<K: IndexTypeInternal, V: IndexTypeInternal, T> Index<K, V> for T
where
T: IndexKeeper<K, V>,
{
fn get(&mut self, k: &K) -> PERes<Option<Value<V>>> {
Ok(if let Some(node) = self.get_root()? {
let mut cur_node = node;
loop {
match self.load(&cur_node)? {
Node::Node(n) => {
cur_node = n.find(k).node_ref;
}
Node::Leaf(leaf) => {
break leaf.find(k).map(|el| el.1).ok();
}
}
}
} else {
None
})
}
fn iter_from(&mut self, first: Bound<&K>) -> PERes<PageIter<K, V>> {
let mut path = Vec::new();
let mut iter = if let Some(mut cur_node) = self.get_root()? {
path.push((0, cur_node));
loop {
match self.load(&cur_node)? {
Node::Node(n) => {
let value = match first {
Bound::Included(f) => {
let found = n.find(f);
(found.pos, found.node_ref)
}
Bound::Excluded(f) => {
let found = n.find(f);
(found.pos, found.node_ref)
}
Bound::Unbounded => (0, n.get(0)),
};
cur_node = value.1;
path.push(value);
}
Node::Leaf(leaf) => {
break PageIter {
iter: leaf.iter_from(first).peekable(),
};
}
}
}
} else {
PageIter {
iter: Vec::new().into_iter().peekable(),
}
};
while iter.iter.peek().is_none() {
let prev = path.pop();
if let Some((_, node)) = path.last() {
let (pos, _) = prev.unwrap();
match self.load(node)? {
Node::Node(n) => {
if n.len() > pos + 1 {
let mut cur_node = n.get(pos + 1);
loop {
match self.load(&cur_node)? {
Node::Node(n) => {
cur_node = n.get(0);
}
Node::Leaf(leaf) => {
iter = PageIter {
iter: leaf.iter_from(first).peekable(),
};
break;
}
}
}
}
}
Node::Leaf(_leaf) => {
panic!("can't happen");
}
}
} else {
break;
}
}
Ok(iter)
}
fn back_iter_from(&mut self, last: Bound<&K>) -> PERes<PageIterBack<K, V>> {
let mut path = Vec::new();
let mut iter = if let Some(mut cur_node) = self.get_root()? {
path.push((0, cur_node));
loop {
match self.load(&cur_node)? {
Node::Node(n) => {
let value = match last {
Bound::Included(f) => {
let found = n.find(f);
(found.pos, found.node_ref)
}
Bound::Excluded(f) => {
let found = n.find(f);
(found.pos, found.node_ref)
}
Bound::Unbounded => {
let pos = n.len() - 1;
(pos, n.get(pos))
}
};
cur_node = value.1;
path.push(value);
}
Node::Leaf(leaf) => {
break PageIterBack {
iter: leaf.back_iter_from(last).peekable(),
};
}
}
}
} else {
PageIterBack {
iter: Vec::new().into_iter().rev().peekable(),
}
};
while iter.iter.peek().is_none() {
let prev = path.pop();
if let Some((_, node)) = path.last() {
let (pos, _) = prev.unwrap();
match self.load(node)? {
Node::Node(n) => {
if pos > 0 {
let mut cur_node = n.get(pos - 1);
loop {
match self.load(&cur_node)? {
Node::Node(n) => {
cur_node = n.get(n.len() - 1);
}
Node::Leaf(leaf) => {
iter = PageIterBack {
iter: leaf.back_iter_from(last).peekable(),
};
break;
}
}
}
}
}
Node::Leaf(_leaf) => {
panic!("can't happen");
}
}
} else {
break;
}
}
Ok(iter)
}
}
#[cfg(test)]
mod tests;