use crate::{
DBData, ZWeight,
algebra::{HasOne, ZCursor},
dynamic::{DataTrait, DynOpt},
operator::dynamic::{aggregate::AggCombineFunc, time_series::Range},
};
use dyn_clone::clone_box;
use num::PrimInt;
use std::{
collections::{BTreeMap, BTreeSet},
fmt,
fmt::Write,
ops::Deref,
};
mod partitioned_tree_aggregate;
mod prefix;
mod tree_aggregate;
mod treenode;
mod updater;
use crate::dynamic::{ClonableTrait, DynVec, Erase, Factory, LeanVec, WithFactory};
pub use partitioned_tree_aggregate::{
FilePartitionedRadixTreeFactories, OrdPartitionedTreeAggregateFactories,
PartitionedRadixTreeBatch,
};
pub use prefix::{DynPrefix, Prefix};
pub use treenode::{
ChildPtr, DynChildPtr, DynTreeNode, DynTreeNodeUpdate, TreeNode, TreeNodeUpdate,
};
use updater::radix_tree_update;
/// Number of child slots per tree node: code in this module indexes node
/// slots in the range `0..RADIX`.
const RADIX: usize = 2;
/// Number of trailing zero bits in `RADIX`; since `RADIX` is a power of two
/// this equals `log2(RADIX)`.  `format_tree` divides a prefix length by this
/// value to derive the indentation of a node.
const RADIX_BITS: u32 = RADIX.trailing_zeros();
/// Bundle of `'static` factories for the dynamically typed values used by
/// radix tree code: optional aggregates, child pointers, tree nodes, single
/// node updates and vectors of node updates.
///
/// `TS` is the timestamp type and `A` the (possibly unsized) aggregate type.
pub struct RadixTreeFactories<TS, A>
where
    TS: 'static,
    A: DataTrait + ?Sized,
{
    /// Factory for `DynOpt<A>` values.
    opt_aggregate_factory: &'static dyn Factory<DynOpt<A>>,
    /// Factory for `DynChildPtr<TS, A>` values.
    child_ptr_factory: &'static dyn Factory<DynChildPtr<TS, A>>,
    /// Factory for `DynTreeNode<TS, A>` values.
    node_factory: &'static dyn Factory<DynTreeNode<TS, A>>,
    /// Factory for `DynTreeNodeUpdate<TS, A>` values.
    node_update_factory: &'static dyn Factory<DynTreeNodeUpdate<TS, A>>,
    /// Factory for vectors of `DynTreeNodeUpdate<TS, A>` values.
    node_updates_factory: &'static dyn Factory<DynVec<DynTreeNodeUpdate<TS, A>>>,
}
impl<TS, A> RadixTreeFactories<TS, A>
where
TS: DBData + PrimInt,
A: DataTrait + ?Sized,
{
pub fn new<AType>() -> Self
where
AType: DBData + Erase<A>,
{
Self {
opt_aggregate_factory: WithFactory::<Option<AType>>::FACTORY,
child_ptr_factory: WithFactory::<ChildPtr<TS, AType>>::FACTORY,
node_factory: WithFactory::<TreeNode<TS, AType>>::FACTORY,
node_update_factory: WithFactory::<TreeNodeUpdate<TS, AType>>::FACTORY,
node_updates_factory: WithFactory::<LeanVec<TreeNodeUpdate<TS, AType>>>::FACTORY,
}
}
}
impl<TS, A> Clone for RadixTreeFactories<TS, A>
where
    TS: 'static,
    A: DataTrait + ?Sized,
{
    /// Returns a bundle referring to the same `'static` factories.
    ///
    /// Implemented by hand: every field is a shared reference, so cloning
    /// merely copies the references and needs no `Clone` bound on `TS` or `A`.
    fn clone(&self) -> Self {
        // All fields are `Copy` references, so they can be copied out of `*self`.
        let Self {
            opt_aggregate_factory,
            child_ptr_factory,
            node_factory,
            node_update_factory,
            node_updates_factory,
        } = *self;

        Self {
            opt_aggregate_factory,
            child_ptr_factory,
            node_factory,
            node_update_factory,
            node_updates_factory,
        }
    }
}
/// Radix-tree operations available on any cursor whose keys are node prefixes
/// (`DynPrefix<TS>`) and whose values are the tree nodes stored under those
/// prefixes (`DynTreeNode<TS, A>`).
///
/// Every method has a default implementation built on the underlying
/// `ZCursor` navigation primitives.
pub trait RadixTreeCursor<TS, A>: ZCursor<DynPrefix<TS>, DynTreeNode<TS, A>, ()>
where
    A: DataTrait + ?Sized,
    TS: PrimInt + DBData,
{
    /// Computes the aggregate of all tree entries whose timestamps lie in
    /// `range` (both bounds inclusive) and stores it in `result`.
    ///
    /// The node under the cursor when this method is called is treated as the
    /// root of the tree, i.e., the node covering `Prefix::full_range()`.
    ///
    /// * `range` - time range to aggregate over.
    /// * `combine` - folds a further partial aggregate into the accumulator.
    /// * `result` - receives the aggregate.  It is reset to `None` first and
    ///   stays `None` if the cursor has no valid key, if nothing falls into
    ///   `range`, or if `range` is inverted (`from > to`).
    ///
    /// The cursor is repositioned (via `seek_key`) while descending the tree.
    fn aggregate_range(
        &mut self,
        range: &Range<TS>,
        combine: &dyn AggCombineFunc<A>,
        result: &mut DynOpt<A>,
    ) {
        result.set_none();

        // An inverted range selects nothing.  Bail out before descending:
        // the slot arithmetic in `aggregate_range_inner` assumes `from <= to`.
        if range.from > range.to {
            return;
        }

        // No valid key under the cursor: nothing to aggregate.
        if !self.key_valid() {
            return;
        }
        debug_assert!(self.val_valid() && **self.weight() != 0);

        // Clone the node so that it does not borrow the cursor, which is
        // moved around during the descent.
        let node = clone_box(self.val());
        self.aggregate_range_inner(&Prefix::full_range(), node.deref(), range, combine, result)
    }

    /// Recursive step of [`aggregate_range`](Self::aggregate_range): folds the
    /// part of `node` (whose prefix is `prefix`) that overlaps `range` into
    /// `agg`.
    ///
    /// Children whose prefix lies entirely within the range contribute their
    /// stored aggregate directly; children that contain one of the range
    /// bounds are looked up with `seek_key` and processed recursively.
    #[doc(hidden)]
    fn aggregate_range_inner(
        &mut self,
        prefix: &Prefix<TS>,
        node: &DynTreeNode<TS, A>,
        range: &Range<TS>,
        combine: &dyn AggCombineFunc<A>,
        agg: &mut DynOpt<A>,
    ) {
        // First slot of `node` that can overlap with `range`.
        let start = if range.from < prefix.key {
            // The range starts before this node: scan from the first slot.
            0
        } else if prefix.contains(range.from) {
            prefix.slot_of_timestamp(range.from)
        } else {
            // The range starts after this node: no slot qualifies.
            RADIX
        };

        // Number of slots to scan, starting at `start`.
        let len = if prefix.contains(range.to) {
            // Saturate rather than underflow if the last slot precedes
            // `start`, which can only happen for an inverted range.
            (prefix.slot_of_timestamp(range.to) + 1).saturating_sub(start)
        } else if prefix.key < range.to {
            // The range ends after this node: scan through the last slot.
            RADIX - start
        } else {
            // The range ends before this node.
            0
        };

        // Scratch node, allocated once and overwritten for each child that is
        // descended into.
        let mut child_node = clone_box(node);

        for idx in start..start + len {
            if let Some(child) = node.slot(idx).get() {
                let child_prefix = child.child_prefix();
                if child_prefix.in_range(range) {
                    // Child is fully covered by the range: use its stored
                    // aggregate as is.
                    if let Some(agg) = agg.get_mut() {
                        combine(agg, child.child_agg());
                    } else {
                        agg.from_ref(child.child_agg());
                    };
                } else if child_prefix.contains(range.from) || child_prefix.contains(range.to) {
                    // Child straddles a range bound: load its node and
                    // descend into it.
                    self.seek_key(child_prefix.erase());
                    debug_assert!(self.key_valid() && **self.weight() != 0);
                    debug_assert_eq!(**self.key(), child_prefix);
                    self.val().clone_to(&mut *child_node);
                    self.aggregate_range_inner(&child_prefix, &*child_node, range, combine, agg);
                }
            }
        }
    }

    /// Writes one line per remaining key to `writer`, in the form
    /// `[prefix] => node`, starting at the current cursor position.
    ///
    /// Each line is indented by `prefix_len / RADIX_BITS` spaces.  The cursor
    /// is advanced past its last key.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `writer`.
    #[allow(unused)]
    fn format_tree<W>(&mut self, writer: &mut W) -> Result<(), fmt::Error>
    where
        W: Write,
    {
        while self.key_valid() {
            debug_assert!(self.val_valid() && **self.weight() != 0);
            let indent = self.key().prefix_len as usize / RADIX_BITS as usize;
            writeln!(
                writer,
                "{:indent$}[{}] => {}",
                "",
                self.key().deref(),
                self.val()
            )?;
            self.step_key();
        }
        Ok(())
    }

    /// Checks the tree reachable from the current cursor position against
    /// `contents`, a map from timestamp to the aggregate expected at that
    /// timestamp; `combine` folds one aggregate into another.
    ///
    /// # Panics
    ///
    /// Panics if any check fails: a node that no parent points to, a weight
    /// other than one, a child stored in the wrong slot or outside of its
    /// parent's prefix, a stored aggregate that differs from the one
    /// recomputed from `contents`, more than one value under a key, or
    /// entries of `contents` that no leaf accounts for.
    #[allow(unused)]
    fn validate(&mut self, contents: &BTreeMap<TS, Box<A>>, combine: &dyn Fn(&mut A, &A)) {
        // Private copy of `contents`; entries are removed as the matching
        // leaves are encountered.
        let mut contents_clone = BTreeMap::new();
        for (k, v) in contents.iter() {
            contents_clone.insert(*k, clone_box(v.as_ref()));
        }
        let mut contents = contents_clone;

        // Prefixes of nodes that are referenced by a parent but have not been
        // visited yet.  Seeded with the root prefix.
        let mut expected_prefixes = BTreeSet::new();
        expected_prefixes.insert(Prefix::full_range());

        while self.key_valid() {
            debug_assert!(self.val_valid() && **self.weight() != 0);
            assert_eq!(**self.weight(), ZWeight::one());

            // Every node visited must have been announced by its parent.
            let node_prefix = self.key();
            assert!(expected_prefixes.remove(node_prefix));

            let node = clone_box(self.val());
            for child_idx in 0..RADIX {
                if let Some(child_ptr) = node.slot(child_idx).get() {
                    // The child must lie inside the node's prefix, be strictly
                    // more specific, and occupy the slot derived from its key.
                    assert!(node_prefix.contains(child_ptr.child_prefix().key));
                    assert!(node_prefix.prefix_len < child_ptr.child_prefix().prefix_len);
                    assert_eq!(
                        child_idx,
                        node_prefix.slot_of_timestamp(child_ptr.child_prefix().key)
                    );
                    if child_ptr.child_prefix().is_leaf() {
                        // Leaf: its aggregate must equal the expected entry,
                        // which is consumed.
                        let agg = contents.remove(&child_ptr.child_prefix().key).unwrap();
                        assert_eq!(&*agg, child_ptr.child_agg());
                    } else {
                        // Interior child: recompute its aggregate from all
                        // remaining entries covered by its prefix.
                        let mut accumulator: Option<Box<A>> = None;
                        for (_, key_agg) in contents
                            .iter()
                            .filter(|&(&k, _)| child_ptr.child_prefix().contains(k))
                        {
                            match &mut accumulator {
                                None => accumulator = Some(clone_box(key_agg)),
                                Some(x) => combine(x, key_agg),
                            }
                        }
                        let accumulator = accumulator.unwrap();
                        assert_eq!(&*accumulator, child_ptr.child_agg());
                        expected_prefixes.insert(child_ptr.child_prefix().clone());
                    }
                }
            }

            // There must be exactly one node per prefix.
            self.step_val();
            assert!(!self.val_valid());
            self.step_key();
        }

        // Every expected entry must have been matched by a leaf.
        assert!(contents.is_empty());
        // If no node was visited, the root prefix is still pending; that is
        // permitted.
        expected_prefixes.remove(&Prefix::full_range());
        assert!(expected_prefixes.is_empty());
    }
}
/// Blanket implementation: every `ZCursor` over `DynPrefix<TS>` keys and
/// `DynTreeNode<TS, A>` values gets the default `RadixTreeCursor` methods.
impl<TS, A, C> RadixTreeCursor<TS, A> for C
where
A: DataTrait + ?Sized,
TS: PrimInt + DBData,
C: ZCursor<DynPrefix<TS>, DynTreeNode<TS, A>, ()>,
{
}
#[cfg(test)]
pub(in crate::operator) mod test {
    use super::RadixTreeCursor;
    use crate::{
        DBData,
        algebra::Semigroup,
        dynamic::{DowncastTrait, DynData, Erase},
        operator::dynamic::time_series::Range,
    };
    use num::PrimInt;
    use std::{collections::BTreeMap, iter::once};

    /// Checks `aggregate_range` against a brute-force reference.
    ///
    /// Range endpoints are drawn from the keys of `contents` plus the minimum
    /// and maximum values of `TS`.  For every pair `lower <= upper` the
    /// aggregate returned by the cursor must equal the result of combining
    /// the values of `contents` in `lower..=upper` with the semigroup `S`.
    pub(in crate::operator) fn test_aggregate_range<TS, A, C, S>(
        cursor: &mut C,
        contents: &BTreeMap<TS, Box<DynData>>,
    ) where
        C: RadixTreeCursor<TS, DynData>,
        TS: PrimInt + DBData,
        A: DBData,
        S: Semigroup<A>,
    {
        // Candidate endpoints in ascending order.
        let endpoints: Vec<TS> = once(TS::min_value())
            .chain(contents.keys().cloned())
            .chain(once(TS::max_value()))
            .collect();

        for (lower_idx, &lower) in endpoints.iter().enumerate() {
            for &upper in endpoints.iter().skip(lower_idx) {
                // Reference result: combine the values in the range one by one.
                let mut expected: Option<A> = None;
                for (_, boxed) in contents.range(lower..=upper) {
                    let value = boxed.downcast_checked::<A>();
                    expected = Some(match expected {
                        Some(partial) => S::combine(&partial, value),
                        None => value.clone(),
                    });
                }

                // Result computed by the tree, starting from the first key.
                cursor.rewind_keys();
                let mut actual: Option<A> = None;
                cursor.aggregate_range(
                    &Range::new(lower, upper),
                    &|acc, val| {
                        let combined = S::combine(
                            (acc as &DynData).downcast_checked::<A>(),
                            val.downcast_checked::<A>(),
                        );
                        *acc.downcast_mut_checked::<A>() = combined;
                    },
                    actual.erase_mut(),
                );

                assert_eq!(
                    actual, expected,
                    "Aggregating in range {:x?}..{:x?}",
                    lower, upper
                );
            }
        }
    }
}