use super::{
DynChildPtr, DynTreeNode, DynTreeNodeUpdate, Prefix, RADIX_BITS, RadixTreeCursor,
RadixTreeFactories,
};
use crate::{
DBData, DynZWeight,
algebra::ZCursor,
dynamic::{ClonableTrait, DataTrait, DynDataTyped, DynOpt, DynVec},
operator::dynamic::aggregate::{AggCombineFunc, DynAggregator},
trace::cursor::CursorGroup,
};
use dyn_clone::clone_box;
use num::PrimInt;
use std::mem::size_of;
/// One entry of the updater's traversal stack.
///
/// A frame identifies a tree node by the position of its update record and
/// remembers which child slot of that node is currently selected.
#[derive(Clone)]
struct StackFrame {
    /// Index of the node's record in the updater's `updates` vector.
    update_index: usize,
    /// Index of the currently selected child slot inside the node.
    slot_index: usize,
}

impl StackFrame {
    /// Builds a frame for the update record at `update_index` with
    /// `slot_index` as the selected slot.
    fn new(update_index: usize, slot_index: usize) -> Self {
        StackFrame {
            update_index,
            slot_index,
        }
    }
}
/// State used while applying a batch of timestamp updates to a radix tree.
///
/// The updater keeps a stack of frames describing the path from the root to
/// the node currently being edited and records every touched node as an entry
/// in `updates`.
struct TreeUpdater<'a, TS, A, TC>
where
A: DataTrait + ?Sized,
TS: DBData + PrimInt,
{
/// Factories used to allocate default nodes, child pointers, optional
/// aggregates and node-update records.
factories: &'a RadixTreeFactories<TS, A>,
/// Cursor over the existing tree; used to look up nodes by prefix.
tree_cursor: TC,
/// Path of frames from the root to the node currently being edited.
stack: Vec<StackFrame>,
/// Output vector of node updates; expected to be empty on construction and
/// sorted by prefix in `finish`.
updates: &'a mut DynVec<DynTreeNodeUpdate<TS, A>>,
/// Combine function handed to `aggregate` when a node's aggregate is
/// recomputed.
combine: &'a dyn AggCombineFunc<A>,
/// Scratch update record that is filled in and then pushed to `updates`
/// whenever a node is put on the stack.
tree_node_update: Box<DynTreeNodeUpdate<TS, A>>,
}
impl<'a, TS, A, TC> TreeUpdater<'a, TS, A, TC>
where
TS: PrimInt + DBData,
A: DataTrait + ?Sized,
TC: RadixTreeCursor<TS, A>,
{
/// Creates an updater with the root node already on the stack.
///
/// `updates` is expected to be empty (checked in debug builds). When
/// `tree_cursor` points at a valid key, that entry is expected to be the root
/// (`Prefix::full_range()`, checked in debug builds) and a clone of its value
/// is pushed as an existing node. Otherwise a default node is pushed as a new
/// root.
fn new(
    factories: &'a RadixTreeFactories<TS, A>,
    tree_cursor: TC,
    updates: &'a mut DynVec<DynTreeNodeUpdate<TS, A>>,
    combine: &'a dyn AggCombineFunc<A>,
) -> Self {
    debug_assert!(updates.is_empty());

    // Number of bits in `TS` divided by the bits consumed per tree level.
    let stack_capacity = (size_of::<TS>() * 8) / RADIX_BITS as usize;

    let mut updater = Self {
        factories,
        tree_cursor,
        stack: Vec::with_capacity(stack_capacity),
        updates,
        combine,
        tree_node_update: factories.node_update_factory.default_box(),
    };

    if updater.tree_cursor.key_valid() {
        debug_assert!(updater.tree_cursor.val_valid() && **updater.tree_cursor.weight() != 0);
        debug_assert_eq!(**updater.tree_cursor.key(), Prefix::full_range());

        let mut existing_root = clone_box(updater.tree_cursor.val());
        updater.push_existing(Prefix::full_range(), &mut *existing_root, 0);
    } else {
        let mut empty_root = factories.node_factory.default_box();
        updater.push_new(Prefix::full_range(), &mut *empty_root, 0);
    }

    updater
}
/// Pops every remaining frame off the stack and then sorts the collected
/// updates by prefix.
fn finish(mut self) {
    // Allocate the scratch values once instead of once per popped frame.
    // `pop` writes to both before reading them (`clone_to` / `set_none`), and
    // `update_timestamp` reuses its scratch values across pops in the same way.
    let mut tmp_child_ptr = self.factories.child_ptr_factory.default_box();
    let mut tmp_agg = self.factories.opt_aggregate_factory.default_box();

    while !self.stack.is_empty() {
        self.pop(&mut *tmp_child_ptr, &mut *tmp_agg);
    }

    self.updates
        .sort_unstable_by(&|upd1, upd2| upd1.prefix().cmp(&upd2.prefix()));
}
/// Returns the frame at the top of the stack.
///
/// Panics if the stack is empty.
fn stack_top(&self) -> &StackFrame {
    let top = self.stack.last();
    top.unwrap()
}
/// Returns a mutable reference to the frame at the top of the stack.
///
/// Panics if the stack is empty.
fn stack_top_mut(&mut self) -> &mut StackFrame {
    let top = self.stack.last_mut();
    top.unwrap()
}
fn range(&self) -> Prefix<TS> {
let StackFrame {
update_index,
slot_index,
} = self.stack_top();
self.updates[*update_index].prefix().extend(*slot_index)
}
/// Returns the prefix of the node on top of the stack.
fn node_prefix(&self) -> Prefix<TS> {
    let update_index = self.stack_top().update_index;
    self.updates[update_index].prefix()
}
/// Returns the new contents of the node on top of the stack.
///
/// Panics if that node's new value is unset.
fn node(&self) -> &DynTreeNode<TS, A> {
    let top = self.stack_top();
    self.updates[top.update_index].new().get().unwrap()
}
fn remove_node(&mut self) {
let update_index = self.stack_top().update_index;
self.updates[update_index].new_mut().set_none();
}
fn slot(&self) -> &DynOpt<DynChildPtr<TS, A>> {
let StackFrame {
update_index,
slot_index,
} = self.stack_top();
self.updates[*update_index]
.new()
.get()
.unwrap()
.slot(*slot_index)
}
/// Returns a mutable reference to the currently selected slot of the node on
/// top of the stack.
///
/// Panics if the stack is empty or the node's new value is unset.
fn slot_mut(&mut self) -> &mut DynOpt<DynChildPtr<TS, A>> {
    // Copy both indices out so no borrow of the stack outlives this line.
    let StackFrame {
        update_index,
        slot_index,
    } = *self.stack_top();

    let node = self.updates[update_index].new_mut().get_mut().unwrap();
    node.slot_mut(slot_index)
}
/// Returns the child prefix stored in the currently selected slot, or `None`
/// when the slot is empty.
fn prefix(&self) -> Option<Prefix<TS>> {
    let child_ptr = self.slot().get()?;
    Some(child_ptr.child_prefix())
}
/// Removes the top frame from the stack and reflects the state of its node in
/// the parent's selected slot.
///
/// `tmp_child_ptr` and `tmp_agg` are caller-provided scratch values; both are
/// overwritten before being read.
///
/// Cases, checked in this order:
/// * node has no occupied slots: the node is removed and, if a parent
///   remains, the parent's slot is cleared;
/// * node is the only frame on the stack: it is simply popped;
/// * node has exactly one occupied slot: the node is removed and that single
///   child pointer is stored in the parent's slot;
/// * otherwise: the node's aggregate is recomputed and moved into the child
///   aggregate of the parent's slot.
fn pop(&mut self, tmp_child_ptr: &mut DynChildPtr<TS, A>, tmp_agg: &mut DynOpt<A>) {
    let occupied_slots = self.node().occupied_slots();
    let is_last_frame = self.stack.len() == 1;

    match (occupied_slots, is_last_frame) {
        (0, _) => {
            self.remove_node();
            self.stack.pop();
            if !self.stack.is_empty() {
                self.slot_mut().set_none();
            }
        }
        (_, true) => {
            self.stack.pop();
        }
        (1, false) => {
            // Save the only child before the node is removed.
            let only_child = self.node().first_occupied_slot().unwrap();
            only_child.clone_to(tmp_child_ptr);
            self.remove_node();
            self.stack.pop();
            self.slot_mut().from_val(tmp_child_ptr);
        }
        (_, false) => {
            tmp_agg.set_none();
            self.node().aggregate(self.combine, tmp_agg);
            self.stack.pop();

            let new_agg = tmp_agg.get_mut().unwrap();
            new_agg.move_to(self.slot_mut().get_mut().unwrap().child_agg_mut());
        }
    }
}
/// Records `node` as a new node with the given `prefix` and pushes a frame for
/// it with `slot` selected.
fn push_new(&mut self, prefix: Prefix<TS>, node: &mut DynTreeNode<TS, A>, slot: usize) {
    // The record about to be pushed will sit at the current end of `updates`.
    let update_index = self.updates.len();

    self.tree_node_update.from_new_node(prefix, node);
    self.updates.push_val(&mut *self.tree_node_update);

    self.stack.push(StackFrame::new(update_index, slot));
}
/// Records `node` as an existing node with the given `prefix` and pushes a
/// frame for it with `slot` selected.
fn push_existing(&mut self, prefix: Prefix<TS>, node: &mut DynTreeNode<TS, A>, slot: usize) {
    // The record about to be pushed will sit at the current end of `updates`.
    let update_index = self.updates.len();

    self.tree_node_update.from_existing_node(prefix, node);
    self.updates.push_val(&mut *self.tree_node_update);

    self.stack.push(StackFrame::new(update_index, slot));
}
/// Applies the new aggregate `agg` for timestamp `ts` to the tree.
///
/// An unset `agg` removes the leaf for `ts` if one exists; a set `agg` inserts
/// or overwrites it. `ts` must not be smaller than the key of the current
/// range (checked in debug builds).
///
/// Each loop iteration either moves the stack towards the slot that covers
/// `ts` (re-selecting a slot, popping a frame, or pushing an existing child
/// node) or performs the edit and returns.
fn update_timestamp(&mut self, ts: TS, agg: &mut DynOpt<A>) {
    debug_assert!(ts >= self.range().key);

    // Scratch values reused by every iteration of the loop below.
    let mut scratch_node = self.factories.node_factory.default_box();
    let mut scratch_ptr = self.factories.child_ptr_factory.default_box();
    let mut scratch_agg = self.factories.opt_aggregate_factory.default_box();

    loop {
        // The selected slot does not cover `ts`: select another slot of the
        // same node if the node covers `ts`, otherwise go up one level.
        if !self.range().contains(ts) {
            if self.node_prefix().contains(ts) {
                let slot_index = self.node_prefix().slot_of_timestamp(ts);
                self.stack_top_mut().slot_index = slot_index;
            } else {
                self.pop(&mut *scratch_ptr, &mut *scratch_agg);
            }
            continue;
        }

        let child_prefix = match self.prefix() {
            Some(child_prefix) => child_prefix,
            None => {
                // Empty slot: store a pointer for `ts` if there is a value;
                // with no value there is nothing to do.
                if let Some(new_agg) = agg.get_mut() {
                    scratch_ptr.from_timestamp(ts, new_agg);
                    self.slot_mut().from_val(&mut *scratch_ptr);
                }
                return;
            }
        };

        if !child_prefix.contains(ts) {
            // The slot's child does not cover `ts`. With no value there is
            // nothing to do.
            let new_agg = match agg.get_mut() {
                Some(new_agg) => new_agg,
                None => return,
            };

            // Build a node at the longest common prefix holding both the
            // pointer for `ts` and the slot's previous contents, then point
            // the slot at that node.
            let new_prefix = child_prefix.longest_common_prefix(ts);
            scratch_node.clear();
            let ts_slot = new_prefix.slot_of_timestamp(ts);
            scratch_ptr.from_timestamp(ts, new_agg);
            scratch_node.slot_mut(ts_slot).from_val(&mut *scratch_ptr);
            self.slot()
                .clone_to(scratch_node.slot_mut(new_prefix.slot_of(&child_prefix)));
            scratch_ptr.from_prefix(new_prefix.clone());
            self.slot_mut().from_val(&mut *scratch_ptr);
            self.push_new(new_prefix, &mut *scratch_node, ts_slot);
            return;
        }

        if child_prefix.is_leaf() {
            // The slot's child is a leaf covering `ts`: overwrite or clear it.
            if let Some(new_agg) = agg.get_mut() {
                scratch_ptr.from_timestamp(ts, new_agg);
                self.slot_mut().from_val(&mut *scratch_ptr);
            } else {
                self.slot_mut().set_none();
            }
            return;
        }

        // The slot's child is a non-leaf node covering `ts`: look it up in the
        // tree, push a copy onto the stack and keep going from there.
        self.tree_cursor.seek_key(&child_prefix);
        debug_assert!(self.tree_cursor.key_valid());
        debug_assert_eq!(**self.tree_cursor.key(), child_prefix);
        debug_assert!(self.tree_cursor.val_valid() && **self.tree_cursor.weight() != 0);

        let ts_slot = child_prefix.slot_of_timestamp(ts);
        self.tree_cursor.val().clone_to(&mut *scratch_node);
        self.push_existing(child_prefix, &mut *scratch_node, ts_slot);
    }
}
}
pub(in crate::operator) fn radix_tree_update<'a, TS, V, Acc, Out, UC, IC, TC>(
factories: &'a RadixTreeFactories<TS, Acc>,
mut input_delta: UC,
mut input: IC,
tree: TC,
aggregator: &'a dyn DynAggregator<V, (), DynZWeight, Accumulator = Acc, Output = Out>,
output_updates: &'a mut DynVec<DynTreeNodeUpdate<TS, Acc>>,
) where
TS: PrimInt + DBData,
V: DataTrait + ?Sized,
Acc: DataTrait + ?Sized,
Out: DataTrait + ?Sized,
UC: ZCursor<DynDataTyped<TS>, V, ()>,
IC: ZCursor<DynDataTyped<TS>, V, ()>,
TC: RadixTreeCursor<TS, Acc>,
{
let mut tree_updater =
<TreeUpdater<'a, TS, Acc, TC>>::new(factories, tree, output_updates, aggregator.combine());
let aggregator = clone_box(aggregator);
let mut agg = factories.opt_aggregate_factory.default_box();
while input_delta.key_valid() {
input.seek_key(input_delta.key());
agg.set_none();
if input.key_valid() && input.key() == input_delta.key() {
aggregator.aggregate(&mut CursorGroup::new(&mut input, ()), &mut *agg)
};
tree_updater.update_timestamp(**input_delta.key(), &mut *agg);
input_delta.step_key();
}
tree_updater.finish();
}