pub mod spine_fueled;
pub mod merge_batcher;
pub mod chainless_batcher;
pub mod ord_neu;
pub mod rhh;
pub mod huffman_container;
pub mod chunker;
pub use self::ord_neu::OrdValSpine as ValSpine;
pub use self::ord_neu::OrdValBatcher as ValBatcher;
pub use self::ord_neu::RcOrdValBuilder as ValBuilder;
pub use self::ord_neu::OrdKeySpine as KeySpine;
pub use self::ord_neu::OrdKeyBatcher as KeyBatcher;
pub use self::ord_neu::RcOrdKeyBuilder as KeyBuilder;
use std::convert::TryInto;
use serde::{Deserialize, Serialize};
use timely::container::{DrainContainer, PushInto};
use timely::progress::Timestamp;
use crate::lattice::Lattice;
use crate::difference::Semigroup;
/// Names the four component types of an update: key, value, time, and difference.
///
/// The trait carries no methods; it only bundles the associated types and the
/// bounds each of them must satisfy.
pub trait Update {
/// Key type; must be totally ordered, cloneable, and `'static`.
type Key: Ord + Clone + 'static;
/// Value type; must be totally ordered, cloneable, and `'static`.
type Val: Ord + Clone + 'static;
/// Time type; must be ordered, cloneable, a `Lattice`, and a timely `Timestamp`.
type Time: Ord + Clone + Lattice + timely::progress::Timestamp;
/// Difference type; must be ordered, a `Semigroup`, and `'static`.
type Diff: Ord + Semigroup + 'static;
}
impl<K,V,T,R> Update for ((K, V), T, R)
where
K: Ord+Clone+'static,
V: Ord+Clone+'static,
T: Ord+Clone+Lattice+timely::progress::Timestamp,
R: Ord+Semigroup+'static,
{
type Key = K;
type Val = V;
type Time = T;
type Diff = R;
}
/// Names the container types used for each column of stored updates.
///
/// Every container is a [`BatchContainer`]; the time, diff, and offset
/// containers carry additional requirements on their item types.
pub trait Layout {
/// Container for keys.
type KeyContainer: BatchContainer;
/// Container for values.
type ValContainer: BatchContainer;
/// Container for times; its owned item must be a `Lattice` and a timely `Timestamp`.
type TimeContainer: BatchContainer<Owned: Lattice + timely::progress::Timestamp>;
/// Container for differences; its owned item must be a `'static` `Semigroup`.
type DiffContainer: BatchContainer<Owned: Semigroup + 'static>;
/// Container for offsets; reading from it must yield plain `usize` values.
type OffsetContainer: for<'a> BatchContainer<ReadItem<'a> = usize>;
}
/// A type that has an associated [`Layout`].
///
/// Implementing this trait is what makes the blanket [`LayoutExt`]
/// implementation in this module apply to a type.
pub trait WithLayout {
/// The layout associated with the implementing type.
type Layout: Layout;
}
/// Shorthand names for the item and container types reachable through a type's [`Layout`].
///
/// The supertrait bound pins the key, value, time, and diff containers of the
/// underlying `Layout` to the container types named here, so both spellings
/// refer to the same types.
pub trait LayoutExt : WithLayout<Layout: Layout<KeyContainer = Self::KeyContainer, ValContainer = Self::ValContainer, TimeContainer = Self::TimeContainer, DiffContainer = Self::DiffContainer>> {
/// Borrowed key, as read from `KeyContainer`.
type Key<'a>: Copy + Ord;
/// Owned value, the `Owned` type of `ValContainer`.
type ValOwn: Clone + Ord;
/// Borrowed value, as read from `ValContainer`.
type Val<'a>: Copy + Ord;
/// Owned time, the `Owned` type of `TimeContainer`.
type Time: Lattice + timely::progress::Timestamp;
/// Borrowed time, as read from `TimeContainer`.
type TimeGat<'a>: Copy + Ord;
/// Owned difference, the `Owned` type of `DiffContainer`.
type Diff: Semigroup + 'static;
/// Borrowed difference, as read from `DiffContainer`.
type DiffGat<'a>: Copy + Ord;
/// Container whose read items are `Key<'a>`.
type KeyContainer: for<'a> BatchContainer<ReadItem<'a> = Self::Key<'a>>;
/// Container whose read items are `Val<'a>` and whose owned items are `ValOwn`.
type ValContainer: for<'a> BatchContainer<ReadItem<'a> = Self::Val<'a>, Owned = Self::ValOwn>;
/// Container whose read items are `TimeGat<'a>` and whose owned items are `Time`.
type TimeContainer: for<'a> BatchContainer<ReadItem<'a> = Self::TimeGat<'a>, Owned = Self::Time>;
/// Container whose read items are `DiffGat<'a>` and whose owned items are `Diff`.
type DiffContainer: for<'a> BatchContainer<ReadItem<'a> = Self::DiffGat<'a>, Owned = Self::Diff>;
/// Converts a borrowed value into its owned form.
fn owned_val(val: Self::Val<'_>) -> Self::ValOwn;
/// Converts a borrowed time into its owned form.
fn owned_time(time: Self::TimeGat<'_>) -> Self::Time;
/// Converts a borrowed difference into its owned form.
fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff;
/// Overwrites `onto` with the owned form of `time`.
fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time);
}
/// Blanket implementation: any type with a [`Layout`] gets the [`LayoutExt`]
/// shorthands, each resolved directly from the layout's containers.
impl<L: WithLayout> LayoutExt for L {
    // Containers, taken verbatim from the layout.
    type KeyContainer = <L::Layout as Layout>::KeyContainer;
    type ValContainer = <L::Layout as Layout>::ValContainer;
    type TimeContainer = <L::Layout as Layout>::TimeContainer;
    type DiffContainer = <L::Layout as Layout>::DiffContainer;

    // Owned item types.
    type ValOwn = <<L::Layout as Layout>::ValContainer as BatchContainer>::Owned;
    type Time = <<L::Layout as Layout>::TimeContainer as BatchContainer>::Owned;
    type Diff = <<L::Layout as Layout>::DiffContainer as BatchContainer>::Owned;

    // Borrowed item types.
    type Key<'a> = <<L::Layout as Layout>::KeyContainer as BatchContainer>::ReadItem<'a>;
    type Val<'a> = <<L::Layout as Layout>::ValContainer as BatchContainer>::ReadItem<'a>;
    type TimeGat<'a> = <<L::Layout as Layout>::TimeContainer as BatchContainer>::ReadItem<'a>;
    type DiffGat<'a> = <<L::Layout as Layout>::DiffContainer as BatchContainer>::ReadItem<'a>;

    /// Delegates to the value container's `into_owned`.
    #[inline(always)]
    fn owned_val(val: Self::Val<'_>) -> Self::ValOwn {
        <<L::Layout as Layout>::ValContainer as BatchContainer>::into_owned(val)
    }

    /// Delegates to the time container's `into_owned`.
    #[inline(always)]
    fn owned_time(time: Self::TimeGat<'_>) -> Self::Time {
        <<L::Layout as Layout>::TimeContainer as BatchContainer>::into_owned(time)
    }

    /// Delegates to the diff container's `into_owned`.
    #[inline(always)]
    fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff {
        <<L::Layout as Layout>::DiffContainer as BatchContainer>::into_owned(diff)
    }

    /// Delegates to the time container's `clone_onto`.
    #[inline(always)]
    fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time) {
        <<L::Layout as Layout>::TimeContainer as BatchContainer>::clone_onto(time, onto)
    }
}
impl<KC, VC, TC, DC, OC> Layout for (KC, VC, TC, DC, OC)
where
KC: BatchContainer,
VC: BatchContainer,
TC: BatchContainer<Owned: Lattice + timely::progress::Timestamp>,
DC: BatchContainer<Owned: Semigroup>,
OC: for<'a> BatchContainer<ReadItem<'a> = usize>,
{
type KeyContainer = KC;
type ValContainer = VC;
type TimeContainer = TC;
type DiffContainer = DC;
type OffsetContainer = OC;
}
/// Type aliases naming the owned and borrowed item types of each container in a [`Layout`].
pub mod layout {
use crate::trace::implementations::{BatchContainer, Layout};
/// Owned key type of layout `L`.
pub type Key<L> = <<L as Layout>::KeyContainer as BatchContainer>::Owned;
/// Borrowed key type of layout `L`.
pub type KeyRef<'a, L> = <<L as Layout>::KeyContainer as BatchContainer>::ReadItem<'a>;
/// Owned value type of layout `L`.
pub type Val<L> = <<L as Layout>::ValContainer as BatchContainer>::Owned;
/// Borrowed value type of layout `L`.
pub type ValRef<'a, L> = <<L as Layout>::ValContainer as BatchContainer>::ReadItem<'a>;
/// Owned time type of layout `L`.
pub type Time<L> = <<L as Layout>::TimeContainer as BatchContainer>::Owned;
/// Borrowed time type of layout `L`.
pub type TimeRef<'a, L> = <<L as Layout>::TimeContainer as BatchContainer>::ReadItem<'a>;
/// Owned difference type of layout `L`.
pub type Diff<L> = <<L as Layout>::DiffContainer as BatchContainer>::Owned;
/// Borrowed difference type of layout `L`.
pub type DiffRef<'a, L> = <<L as Layout>::DiffContainer as BatchContainer>::ReadItem<'a>;
}
/// A [`Layout`] marker parameterized by an [`Update`] type.
///
/// The struct stores no data; `U` is only recorded through `PhantomData`.
pub struct Vector<U: Update> {
// Zero-sized marker tying the struct to `U`.
phantom: std::marker::PhantomData<U>,
}
/// Stores every column of `U` in a plain `Vec`, with offsets kept in an
/// [`OffsetList`].
impl<U> Layout for Vector<U>
where
    U: Update<Diff: Ord>,
{
    type OffsetContainer = OffsetList;
    type DiffContainer = Vec<U::Diff>;
    type TimeContainer = Vec<U::Time>;
    type ValContainer = Vec<U::Val>;
    type KeyContainer = Vec<U::Key>;
}
/// A list of `usize` offsets stored in three tiers to save space.
///
/// Reading position `i` consults `zero_prefix` first, then `smol`, then `chonk`,
/// so the logical sequence is: `zero_prefix` zeros, followed by the contents of
/// `smol`, followed by the contents of `chonk`.
#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Serialize, Deserialize)]
pub struct OffsetList {
/// Number of leading zero offsets, which are counted rather than stored.
pub zero_prefix: usize,
/// Offsets that fit in a `u32`, pushed before any offset landed in `chonk`.
pub smol: Vec<u32>,
/// Offsets stored as `u64`: the first one that did not fit in `u32`, and every offset pushed after it.
pub chonk: Vec<u64>,
}
/// Formats the list as the sequence of offsets it represents, hiding the
/// tiered storage.
impl std::fmt::Debug for OffsetList {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut rendered = f.debug_list();
        for offset in self {
            rendered.entry(&offset);
        }
        rendered.finish()
    }
}
impl OffsetList {
    /// Creates an empty list whose `smol` tier has room for `cap` offsets.
    pub fn with_capacity(cap: usize) -> Self {
        let smol = Vec::with_capacity(cap);
        Self { zero_prefix: 0, smol, chonk: Vec::new() }
    }

    /// Appends `offset` to the end of the list.
    ///
    /// Leading zeros are only counted. Later offsets go to `smol` while they
    /// fit in a `u32` and nothing has been stored in `chonk`; from the first
    /// offset that lands in `chonk` onward, everything goes to `chonk` so that
    /// the three tiers stay in insertion order.
    pub fn push(&mut self, offset: usize) {
        let nothing_stored = self.smol.is_empty() && self.chonk.is_empty();
        if nothing_stored && offset == 0 {
            self.zero_prefix += 1;
            return;
        }
        if self.chonk.is_empty() {
            let narrowed: Result<u32, _> = offset.try_into();
            if let Ok(small) = narrowed {
                self.smol.push(small);
                return;
            }
        }
        self.chonk.push(offset.try_into().unwrap());
    }

    /// Returns the offset at position `index`.
    ///
    /// # Panics
    ///
    /// Panics if `index` is not less than [`OffsetList::len`].
    pub fn index(&self, index: usize) -> usize {
        if index < self.zero_prefix {
            return 0;
        }
        let past_zeros = index - self.zero_prefix;
        match self.smol.get(past_zeros) {
            Some(&small) => small.try_into().unwrap(),
            None => self.chonk[past_zeros - self.smol.len()].try_into().unwrap(),
        }
    }

    /// Returns the number of offsets in the list, across all three tiers.
    pub fn len(&self) -> usize {
        let stored = self.smol.len() + self.chonk.len();
        self.zero_prefix + stored
    }
}
/// Iterating a borrowed [`OffsetList`] yields its offsets by value, front to back.
impl<'a> IntoIterator for &'a OffsetList {
    type Item = usize;
    type IntoIter = OffsetListIter<'a>;

    fn into_iter(self) -> Self::IntoIter {
        // Start the cursor at the first position of the borrowed list.
        let (list, index) = (self, 0);
        OffsetListIter { list, index }
    }
}
/// Iterator over the offsets of a borrowed [`OffsetList`].
pub struct OffsetListIter<'a> {
// The list being read.
list: &'a OffsetList,
// Position of the next offset to yield.
index: usize,
}
/// Yields the offsets of the underlying list in order, by position.
impl<'a> Iterator for OffsetListIter<'a> {
    type Item = usize;

    /// Returns the offset at the current position and advances, or `None`
    /// once every position has been visited.
    fn next(&mut self) -> Option<Self::Item> {
        if self.index >= self.list.len() {
            return None;
        }
        let offset = self.list.index(self.index);
        self.index += 1;
        Some(offset)
    }

    /// Reports the exact number of offsets left.
    ///
    /// The remaining length is always known from the list length and the
    /// cursor, so consumers such as `collect` can size their buffers up front
    /// instead of relying on the default `(0, None)` hint.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.list.len().saturating_sub(self.index);
        (remaining, Some(remaining))
    }
}
impl PushInto<usize> for OffsetList {
fn push_into(&mut self, item: usize) {
self.push(item);
}
}
impl BatchContainer for OffsetList {
type Owned = usize;
type ReadItem<'a> = usize;
#[inline(always)]
fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned { item }
#[inline(always)]
fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }
fn push_ref(&mut self, item: Self::ReadItem<'_>) { self.push_into(item) }
fn push_own(&mut self, item: &Self::Owned) { self.push_into(*item) }
fn clear(&mut self) { self.zero_prefix = 0; self.smol.clear(); self.chonk.clear(); }
fn with_capacity(size: usize) -> Self {
Self::with_capacity(size)
}
fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
Self::with_capacity(cont1.len() + cont2.len())
}
fn index(&self, index: usize) -> Self::ReadItem<'_> {
self.index(index)
}
fn len(&self) -> usize {
self.len()
}
}
/// A drainable container whose items can be split into key, value, time, and diff parts,
/// and compared against the items of key container `K` and value container `V`.
pub trait BuilderInput<K: BatchContainer, V: BatchContainer>: DrainContainer + Sized {
/// Key part of an item.
type Key<'a>: Ord;
/// Value part of an item.
type Val<'a>: Ord;
/// Time part of an item.
type Time;
/// Difference part of an item.
type Diff;
/// Splits one drained item into its `(key, val, time, diff)` parts.
fn into_parts<'a>(item: Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff);
/// Tests whether the key `this` equals `other`, an item read from `K`.
fn key_eq(this: &Self::Key<'_>, other: K::ReadItem<'_>) -> bool;
/// Tests whether the value `this` equals `other`, an item read from `V`.
fn val_eq(this: &Self::Val<'_>, other: V::ReadItem<'_>) -> bool;
/// Returns three counts for `chain`, in the order keys, values, updates.
fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize);
}
/// [`BuilderInput`] for vectors of owned `((key, val), time, diff)` tuples.
impl<K, KBC, V, VBC, T, R> BuilderInput<KBC, VBC> for Vec<((K, V), T, R)>
where
    K: Ord + Clone + 'static,
    KBC: for<'a> BatchContainer<ReadItem<'a>: PartialEq<&'a K>>,
    V: Ord + Clone + 'static,
    VBC: for<'a> BatchContainer<ReadItem<'a>: PartialEq<&'a V>>,
    T: Timestamp + Lattice + Clone + 'static,
    R: Ord + Semigroup + 'static,
{
    type Key<'a> = K;
    type Val<'a> = V;
    type Time = T;
    type Diff = R;

    /// Flattens the nested tuple into `(key, val, time, diff)`.
    fn into_parts<'a>(item: Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) {
        let ((key, val), time, diff) = item;
        (key, val, time, diff)
    }

    /// Compares an owned key against an item read from the key container.
    fn key_eq(this: &K, other: KBC::ReadItem<'_>) -> bool {
        let other = KBC::reborrow(other);
        other == this
    }

    /// Compares an owned value against an item read from the value container.
    fn val_eq(this: &V, other: VBC::ReadItem<'_>) -> bool {
        let other = VBC::reborrow(other);
        other == this
    }

    /// Walks every tuple of `chain` in order and counts runs.
    ///
    /// A key is counted whenever it differs from the key of the previous
    /// tuple, a value whenever the `(key, val)` pair differs from the previous
    /// one, and an update for every tuple. Comparison is only against the
    /// immediately preceding tuple, across link boundaries.
    fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize) {
        let (mut keys, mut vals, mut upds) = (0usize, 0usize, 0usize);
        let mut previous: Option<(&K, &V)> = None;
        for ((key, val), _, _) in chain.iter().flatten() {
            let new_key = previous.map_or(true, |(last_key, _)| last_key != key);
            let new_val = new_key || previous.map_or(true, |(_, last_val)| last_val != val);
            if new_key {
                keys += 1;
            }
            if new_val {
                vals += 1;
            }
            upds += 1;
            previous = Some((key, val));
        }
        (keys, vals, upds)
    }
}
pub use self::containers::{BatchContainer, SliceContainer};
pub mod containers {
use timely::container::PushInto;
/// An indexable, append-only container of items with distinct owned and borrowed forms.
pub trait BatchContainer: 'static {
    /// Owned form of an item.
    type Owned: Clone + Ord;
    /// Borrowed form of an item, as returned when reading from the container.
    type ReadItem<'a>: Copy + Ord;

    /// Converts a borrowed item into an owned one.
    #[must_use]
    fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned;

    /// Overwrites `other` with the owned form of `item`.
    ///
    /// The default builds a fresh owned value and assigns it.
    #[inline(always)]
    fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) {
        *other = Self::into_owned(item);
    }

    /// Appends a borrowed item.
    fn push_ref(&mut self, item: Self::ReadItem<'_>);
    /// Appends a reference to an owned item.
    fn push_own(&mut self, item: &Self::Owned);

    /// Removes all items.
    fn clear(&mut self);

    /// Creates an empty container sized for `size` items.
    fn with_capacity(size: usize) -> Self;
    /// Creates an empty container sized to hold the contents of both inputs.
    fn merge_capacity(cont1: &Self, cont2: &Self) -> Self;

    /// Shortens the lifetime of a borrowed item.
    fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b>;

    /// Reads the item at `index`.
    fn index(&self, index: usize) -> Self::ReadItem<'_>;

    /// Reads the item at `index`, or `None` if `index` is out of range.
    fn get(&self, index: usize) -> Option<Self::ReadItem<'_>> {
        (index < self.len()).then(|| self.index(index))
    }

    /// Number of items in the container.
    fn len(&self) -> usize;

    /// Reads the final item, or `None` if the container is empty.
    fn last(&self) -> Option<Self::ReadItem<'_>> {
        self.len().checked_sub(1).map(|final_index| self.index(final_index))
    }

    /// Whether the container holds no items.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Counts the items in `[start, end)`, beginning at `start`, that satisfy `function`.
    ///
    /// The search assumes that once `function` returns `false` for an item it
    /// returns `false` for every later item in the range. Under that
    /// assumption the result is the length of the satisfying prefix; short
    /// prefixes are found by a linear scan and longer ones by doubling and
    /// then halving a stride.
    fn advance<F: for<'a> Fn(Self::ReadItem<'a>) -> bool>(&self, start: usize, end: usize, function: F) -> usize {
        const SMALL_LIMIT: usize = 8;

        // Tests the item `offset` positions past `start`.
        let holds = |offset: usize| function(self.index(start + offset));

        if end > start + SMALL_LIMIT && holds(SMALL_LIMIT) {
            // Offsets `0 ..= SMALL_LIMIT` are taken to satisfy the predicate.
            let mut count = SMALL_LIMIT + 1;
            if start + count < end && holds(count) {
                // From here `count` is an offset known to satisfy the predicate.
                // Grow the stride while the probe keeps succeeding.
                let mut stride = 1;
                while start + count + stride < end && holds(count + stride) {
                    count += stride;
                    stride <<= 1;
                }

                // Shrink the stride to pin down the last satisfying offset.
                stride >>= 1;
                while stride > 0 {
                    if start + count + stride < end && holds(count + stride) {
                        count += stride;
                    }
                    stride >>= 1;
                }

                // Turn the last satisfying offset into a count.
                count += 1;
            }
            count
        } else {
            let limit = end.min(start + SMALL_LIMIT);
            (start..limit).filter(|&position| function(self.index(position))).count()
        }
    }
}
/// [`BatchContainer`] for a plain `Vec<T>`: items are owned as `T` and read as `&T`.
impl<T: Ord + Clone + 'static> BatchContainer for Vec<T> {
    type Owned = T;
    type ReadItem<'a> = &'a T;

    #[inline(always)]
    fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned {
        T::clone(item)
    }

    /// Uses `clone_from` so the target can reuse its existing resources.
    #[inline(always)]
    fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) {
        T::clone_from(other, item);
    }

    fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> {
        item
    }

    fn push_ref(&mut self, item: Self::ReadItem<'_>) {
        self.push_into(item)
    }

    fn push_own(&mut self, item: &Self::Owned) {
        self.push_into(T::clone(item))
    }

    fn clear(&mut self) {
        // The inherent `Vec::clear`, not this trait method.
        Vec::clear(self)
    }

    fn with_capacity(size: usize) -> Self {
        Vec::<T>::with_capacity(size)
    }

    fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
        let combined = cont1.as_slice().len() + cont2.as_slice().len();
        Vec::<T>::with_capacity(combined)
    }

    fn index(&self, index: usize) -> Self::ReadItem<'_> {
        &self.as_slice()[index]
    }

    fn get(&self, index: usize) -> Option<Self::ReadItem<'_>> {
        self.as_slice().get(index)
    }

    fn len(&self) -> usize {
        self.as_slice().len()
    }
}
/// A container of slices of `B`, stored back to back in one allocation.
pub struct SliceContainer<B> {
// Boundaries into `inner`: slice `i` spans `offsets[i] .. offsets[i + 1]`.
// Every constructor and `clear` in this module seeds this with a single `0`.
offsets: Vec<usize>,
// The elements of all slices, concatenated in insertion order.
inner: Vec<B>,
}
/// Appends a slice as one new entry of the container.
impl<B: Ord + Clone + 'static> PushInto<&[B]> for SliceContainer<B> {
    fn push_into(&mut self, item: &[B]) {
        // Copy the elements onto the end of the shared buffer.
        let inner = &mut self.inner;
        item.iter().for_each(|element| inner.push_into(element));
        // Record where this entry ends.
        let end = self.inner.len();
        self.offsets.push(end);
    }
}
/// Appends the contents of a `Vec` as one new entry, by way of the slice implementation.
impl<B: Ord + Clone + 'static> PushInto<&Vec<B>> for SliceContainer<B> {
    fn push_into(&mut self, item: &Vec<B>) {
        let slice: &[B] = item.as_slice();
        self.push_into(slice);
    }
}
/// [`BatchContainer`] whose items are slices: owned as `Vec<B>` and read as `&[B]`.
///
/// All elements live contiguously in `inner`; `offsets[i] .. offsets[i + 1]`
/// delimits the `i`-th slice, and `offsets` always begins with a `0`.
impl<B> BatchContainer for SliceContainer<B>
where
    B: Ord + Clone + Sized + 'static,
{
    type Owned = Vec<B>;
    type ReadItem<'a> = &'a [B];

    #[inline(always)]
    fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned {
        item.to_vec()
    }

    /// Overwrites `other` with a copy of `item`, reusing `other`'s allocation.
    ///
    /// Works for any combination of lengths. `clone_from_slice` is not usable
    /// here because it panics unless source and target have equal length.
    #[inline(always)]
    fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) {
        other.clear();
        other.extend_from_slice(item);
    }

    fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> {
        item
    }

    fn push_ref(&mut self, item: Self::ReadItem<'_>) {
        self.push_into(item)
    }

    fn push_own(&mut self, item: &Self::Owned) {
        self.push_into(item)
    }

    /// Removes all slices, restoring the leading `0` offset.
    fn clear(&mut self) {
        self.offsets.clear();
        self.offsets.push(0);
        self.inner.clear();
    }

    /// Creates an empty container with room for `size` offsets and `size` elements.
    fn with_capacity(size: usize) -> Self {
        let mut offsets = Vec::with_capacity(size + 1);
        offsets.push(0);
        Self {
            offsets,
            inner: Vec::with_capacity(size),
        }
    }

    /// Creates an empty container sized to hold every slice of both inputs.
    fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
        // One offset per slice plus the leading `0`; sizing by element count
        // would under-allocate when the inputs hold many empty slices.
        let slices = cont1.len() + cont2.len();
        let elements = cont1.inner.len() + cont2.inner.len();
        let mut offsets = Vec::with_capacity(slices + 1);
        offsets.push(0);
        Self {
            offsets,
            inner: Vec::with_capacity(elements),
        }
    }

    /// Reads the `index`-th slice.
    ///
    /// # Panics
    ///
    /// Panics if `index` is not less than `len()`.
    fn index(&self, index: usize) -> Self::ReadItem<'_> {
        let lower = self.offsets[index];
        let upper = self.offsets[index + 1];
        &self.inner[lower..upper]
    }

    /// Number of slices stored; the leading `0` offset is not counted.
    fn len(&self) -> usize {
        self.offsets.len() - 1
    }
}
/// The default container holds no slices; `offsets` still starts with its `0` entry.
impl<B> Default for SliceContainer<B> {
    fn default() -> Self {
        let offsets = vec![0];
        let inner = Vec::new();
        Self { offsets, inner }
    }
}
}