pub mod spine_fueled;
pub mod chainless_batcher;
pub mod chunker;
pub mod huffman_container;
pub mod merge_batcher;
pub mod ord_neu;
pub mod rhh;
pub use self::ord_neu::OrdKeyBatcher as KeyBatcher;
pub use self::ord_neu::OrdKeySpine as KeySpine;
pub use self::ord_neu::OrdValBatcher as ValBatcher;
pub use self::ord_neu::OrdValSpine as ValSpine;
pub use self::ord_neu::RcOrdKeyBuilder as KeyBuilder;
pub use self::ord_neu::RcOrdValBuilder as ValBuilder;
use std::convert::TryInto;
use columnation::Columnation;
use serde::{Deserialize, Serialize};
use timely::container::{DrainContainer, PushInto};
use timely::progress::Timestamp;
use crate::containers::TimelyStack;
use crate::difference::Semigroup;
use crate::lattice::Lattice;
pub trait Update {
type Key: Ord + Clone + 'static;
type Val: Ord + Clone + 'static;
type Time: Ord + Clone + Lattice + timely::progress::Timestamp;
type Diff: Ord + Semigroup + 'static;
}
impl<K, V, T, R> Update for ((K, V), T, R)
where
K: Ord + Clone + 'static,
V: Ord + Clone + 'static,
T: Ord + Clone + Lattice + timely::progress::Timestamp,
R: Ord + Semigroup + 'static,
{
type Key = K;
type Val = V;
type Time = T;
type Diff = R;
}
pub trait Layout {
type KeyContainer: BatchContainer;
type ValContainer: BatchContainer;
type TimeContainer: BatchContainer<Owned: Lattice + timely::progress::Timestamp>;
type DiffContainer: BatchContainer<Owned: Semigroup + 'static>;
type OffsetContainer: for<'a> BatchContainer<ReadItem<'a> = usize>;
}
/// Types that are associated with a particular [`Layout`].
pub trait WithLayout {
    /// The layout describing this type's containers.
    type Layout: Layout;
}
pub trait LayoutExt:
WithLayout<
Layout: Layout<
KeyContainer = Self::KeyContainer,
ValContainer = Self::ValContainer,
TimeContainer = Self::TimeContainer,
DiffContainer = Self::DiffContainer,
>,
>
{
type KeyOwn;
type Key<'a>: Copy + Ord;
type ValOwn: Clone + Ord;
type Val<'a>: Copy + Ord;
type Time: Lattice + timely::progress::Timestamp;
type TimeGat<'a>: Copy + Ord;
type Diff: Semigroup + 'static;
type DiffGat<'a>: Copy + Ord;
type KeyContainer: for<'a> BatchContainer<ReadItem<'a> = Self::Key<'a>, Owned = Self::KeyOwn>;
type ValContainer: for<'a> BatchContainer<ReadItem<'a> = Self::Val<'a>, Owned = Self::ValOwn>;
type TimeContainer: for<'a> BatchContainer<ReadItem<'a> = Self::TimeGat<'a>, Owned = Self::Time>;
type DiffContainer: for<'a> BatchContainer<ReadItem<'a> = Self::DiffGat<'a>, Owned = Self::Diff>;
fn owned_key(key: Self::Key<'_>) -> Self::KeyOwn;
fn owned_val(val: Self::Val<'_>) -> Self::ValOwn;
fn owned_time(time: Self::TimeGat<'_>) -> Self::Time;
fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff;
fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time);
}
/// Blanket implementation: every [`WithLayout`] type exposes its layout through `LayoutExt`.
impl<L> LayoutExt for L
where
    L: WithLayout,
{
    type KeyContainer = <L::Layout as Layout>::KeyContainer;
    type ValContainer = <L::Layout as Layout>::ValContainer;
    type TimeContainer = <L::Layout as Layout>::TimeContainer;
    type DiffContainer = <L::Layout as Layout>::DiffContainer;

    type KeyOwn = <<L::Layout as Layout>::KeyContainer as BatchContainer>::Owned;
    type Key<'a> = <<L::Layout as Layout>::KeyContainer as BatchContainer>::ReadItem<'a>;
    type ValOwn = <<L::Layout as Layout>::ValContainer as BatchContainer>::Owned;
    type Val<'a> = <<L::Layout as Layout>::ValContainer as BatchContainer>::ReadItem<'a>;
    type Time = <<L::Layout as Layout>::TimeContainer as BatchContainer>::Owned;
    type TimeGat<'a> = <<L::Layout as Layout>::TimeContainer as BatchContainer>::ReadItem<'a>;
    type Diff = <<L::Layout as Layout>::DiffContainer as BatchContainer>::Owned;
    type DiffGat<'a> = <<L::Layout as Layout>::DiffContainer as BatchContainer>::ReadItem<'a>;

    #[inline(always)]
    fn owned_key(key: Self::Key<'_>) -> Self::KeyOwn {
        <<L::Layout as Layout>::KeyContainer as BatchContainer>::into_owned(key)
    }
    #[inline(always)]
    fn owned_val(val: Self::Val<'_>) -> Self::ValOwn {
        <<L::Layout as Layout>::ValContainer as BatchContainer>::into_owned(val)
    }
    #[inline(always)]
    fn owned_time(time: Self::TimeGat<'_>) -> Self::Time {
        <<L::Layout as Layout>::TimeContainer as BatchContainer>::into_owned(time)
    }
    #[inline(always)]
    fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff {
        <<L::Layout as Layout>::DiffContainer as BatchContainer>::into_owned(diff)
    }
    #[inline(always)]
    fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time) {
        <<L::Layout as Layout>::TimeContainer as BatchContainer>::clone_onto(time, onto)
    }
}
impl<KC, VC, TC, DC, OC> Layout for (KC, VC, TC, DC, OC)
where
KC: BatchContainer,
VC: BatchContainer,
TC: BatchContainer<Owned: Lattice + timely::progress::Timestamp>,
DC: BatchContainer<Owned: Semigroup>,
OC: for<'a> BatchContainer<ReadItem<'a> = usize>,
{
type KeyContainer = KC;
type ValContainer = VC;
type TimeContainer = TC;
type DiffContainer = DC;
type OffsetContainer = OC;
}
pub mod layout {
    //! Shorthand aliases for the owned and borrowed item types of a [`Layout`]'s containers.
    use crate::trace::implementations::{BatchContainer, Layout};

    /// Owned key type of layout `L`.
    pub type Key<L> = <<L as Layout>::KeyContainer as BatchContainer>::Owned;
    /// Owned value type of layout `L`.
    pub type Val<L> = <<L as Layout>::ValContainer as BatchContainer>::Owned;
    /// Owned time type of layout `L`.
    pub type Time<L> = <<L as Layout>::TimeContainer as BatchContainer>::Owned;
    /// Owned diff type of layout `L`.
    pub type Diff<L> = <<L as Layout>::DiffContainer as BatchContainer>::Owned;

    /// Key type as read from layout `L`'s key container.
    pub type KeyRef<'a, L> = <<L as Layout>::KeyContainer as BatchContainer>::ReadItem<'a>;
    /// Value type as read from layout `L`'s value container.
    pub type ValRef<'a, L> = <<L as Layout>::ValContainer as BatchContainer>::ReadItem<'a>;
    /// Time type as read from layout `L`'s time container.
    pub type TimeRef<'a, L> = <<L as Layout>::TimeContainer as BatchContainer>::ReadItem<'a>;
    /// Diff type as read from layout `L`'s diff container.
    pub type DiffRef<'a, L> = <<L as Layout>::DiffContainer as BatchContainer>::ReadItem<'a>;
}
/// Type-level marker for a layout that stores each component of the update `U` in a `Vec`.
///
/// It carries no data and is only used as a type parameter.
pub struct Vector<U: Update> {
    /// Ties the marker to `U` without storing a value of it.
    phantom: std::marker::PhantomData<U>,
}
/// Keys, values, times, and diffs each live in a plain `Vec`; offsets use an [`OffsetList`].
impl<U> Layout for Vector<U>
where
    U: Update<Diff: Ord>,
{
    type KeyContainer = Vec<U::Key>;
    type ValContainer = Vec<U::Val>;
    type TimeContainer = Vec<U::Time>;
    type DiffContainer = Vec<U::Diff>;
    type OffsetContainer = OffsetList;
}
/// Type-level marker for a layout that stores each component of the update `U` in a
/// `TimelyStack`.
///
/// It carries no data and is only used as a type parameter.
pub struct TStack<U: Update> {
    /// Ties the marker to `U` without storing a value of it.
    phantom: std::marker::PhantomData<U>,
}
/// Keys, values, times, and diffs each live in a `TimelyStack`, which requires every component
/// to be `Columnation`; offsets use an [`OffsetList`].
impl<U> Layout for TStack<U>
where
    U: Update<
        Key: Columnation,
        Val: Columnation,
        Time: Columnation,
        Diff: Columnation + Ord,
    >,
{
    type KeyContainer = TimelyStack<U::Key>;
    type ValContainer = TimelyStack<U::Val>;
    type TimeContainer = TimelyStack<U::Time>;
    type DiffContainer = TimelyStack<U::Diff>;
    type OffsetContainer = OffsetList;
}
/// A compact list of `usize` offsets.
///
/// The offsets are split into three consecutive regions:
/// - a run of leading zeros, which is only counted;
/// - offsets that fit in a `u32`;
/// - every offset from the first one that does not fit in a `u32` onward, stored as a `u64`.
///
/// The derived `Ord` compares the three fields in declaration order, not the represented
/// offset sequences element by element.
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct OffsetList {
    /// Number of leading zero offsets.
    pub zero_prefix: usize,
    /// Offsets after the zero prefix, while each fits in a `u32`.
    pub smol: Vec<u32>,
    /// Remaining offsets, starting at the first that did not fit in a `u32`.
    pub chonk: Vec<u64>,
}
/// Formats the offsets the list represents as a plain list, hiding the internal regions.
impl std::fmt::Debug for OffsetList {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut list = f.debug_list();
        list.entries(self);
        list.finish()
    }
}
impl OffsetList {
    /// Creates an empty list, reserving room for `cap` offsets in the `u32` region.
    pub fn with_capacity(cap: usize) -> Self {
        Self {
            zero_prefix: 0,
            smol: Vec::with_capacity(cap),
            chonk: Vec::new(),
        }
    }

    /// Appends `offset` to the end of the list.
    ///
    /// Zeros pushed before any non-zero offset are only counted. Subsequent offsets are stored
    /// as `u32` until the first one that does not fit. From then on, every offset is stored as
    /// a `u64`.
    ///
    /// # Panics
    ///
    /// Panics if `offset` does not fit in a `u64`, which is only possible where `usize` is
    /// wider than 64 bits.
    pub fn push(&mut self, offset: usize) {
        if self.smol.is_empty() && self.chonk.is_empty() && offset == 0 {
            self.zero_prefix += 1;
        } else if self.chonk.is_empty() {
            if let Ok(smol) = offset.try_into() {
                self.smol.push(smol);
            } else {
                // The first offset too large for `u32`: all later offsets go to `chonk` too,
                // so the regions stay in push order.
                self.chonk.push(offset.try_into().unwrap())
            }
        } else {
            self.chonk.push(offset.try_into().unwrap())
        }
    }

    /// Returns the offset at position `index`.
    ///
    /// # Panics
    ///
    /// Panics if `index >= self.len()`. It also panics if a stored `u64` does not fit in a
    /// `usize`, which `push` never produces but the public fields could.
    pub fn index(&self, index: usize) -> usize {
        if index < self.zero_prefix {
            return 0;
        }
        let index = index - self.zero_prefix;
        if index < self.smol.len() {
            self.smol[index].try_into().unwrap()
        } else {
            self.chonk[index - self.smol.len()].try_into().unwrap()
        }
    }

    /// Returns the number of offsets in the list.
    pub fn len(&self) -> usize {
        self.zero_prefix + self.smol.len() + self.chonk.len()
    }

    /// Returns `true` if the list holds no offsets.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
/// Iterates over the offsets of an [`OffsetList`] by value, from first to last.
impl<'a> IntoIterator for &'a OffsetList {
    type Item = usize;
    type IntoIter = OffsetListIter<'a>;
    fn into_iter(self) -> Self::IntoIter {
        OffsetListIter { index: 0, list: self }
    }
}
/// Iterator over the offsets of an [`OffsetList`], created by iterating over `&OffsetList`.
pub struct OffsetListIter<'a> {
    /// The list being iterated.
    list: &'a OffsetList,
    /// Position of the next offset to yield.
    index: usize,
}
/// Yields the list's offsets in order, then `None` forever.
impl<'a> Iterator for OffsetListIter<'a> {
    type Item = usize;
    fn next(&mut self) -> Option<Self::Item> {
        if self.index < self.list.len() {
            let res = Some(self.list.index(self.index));
            self.index += 1;
            res
        } else {
            None
        }
    }

    /// Reports the exact number of offsets still to be yielded, so consumers such as
    /// `collect` can preallocate.
    fn size_hint(&self) -> (usize, Option<usize>) {
        // `index` never exceeds `len()` here, but saturate rather than underflow regardless.
        let remaining = self.list.len().saturating_sub(self.index);
        (remaining, Some(remaining))
    }
}
impl PushInto<usize> for OffsetList {
fn push_into(&mut self, item: usize) {
self.push(item);
}
}
impl BatchContainer for OffsetList {
type Owned = usize;
type ReadItem<'a> = usize;
#[inline(always)]
fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned {
item
}
#[inline(always)]
fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> {
item
}
fn push_ref(&mut self, item: Self::ReadItem<'_>) {
self.push_into(item)
}
fn push_own(&mut self, item: &Self::Owned) {
self.push_into(*item)
}
fn clear(&mut self) {
self.zero_prefix = 0;
self.smol.clear();
self.chonk.clear();
}
fn with_capacity(size: usize) -> Self {
Self::with_capacity(size)
}
fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
Self::with_capacity(cont1.len() + cont2.len())
}
fn index(&self, index: usize) -> Self::ReadItem<'_> {
self.index(index)
}
fn len(&self) -> usize {
self.len()
}
}
/// Containers of updates that can be split into the `(key, val, time, diff)` parts a batch
/// builder consumes.
///
/// `K` and `V` are the batch containers whose read items the keys and values are compared
/// against (see [`BuilderInput::key_eq`] and [`BuilderInput::val_eq`]).
pub trait BuilderInput<K: BatchContainer, V: BatchContainer>: DrainContainer + Sized {
    /// Key type produced by [`BuilderInput::into_parts`].
    type Key<'a>: Ord;
    /// Value type produced by [`BuilderInput::into_parts`].
    type Val<'a>: Ord;
    /// Time type produced by [`BuilderInput::into_parts`].
    type Time;
    /// Diff type produced by [`BuilderInput::into_parts`].
    type Diff;

    /// Splits one item of the container (its `DrainContainer::Item`) into key, value, time,
    /// and diff.
    fn into_parts<'a>(
        item: Self::Item<'a>,
    ) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff);

    /// Returns `true` if `this` equals the key `other` read from a `K` container.
    fn key_eq(this: &Self::Key<'_>, other: K::ReadItem<'_>) -> bool;
    /// Returns `true` if `this` equals the value `other` read from a `V` container.
    fn val_eq(this: &Self::Val<'_>, other: V::ReadItem<'_>) -> bool;

    /// Returns `(keys, vals, updates)` counts over the concatenation of `chain`.
    ///
    /// NOTE(review): the implementations in this file count a key (or value) each time it
    /// differs from the previous update's. That equals the number of distinct keys (values)
    /// only for sorted input, which callers presumably guarantee; confirm.
    fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize);
}
/// A `Vec` of owned `((key, val), time, diff)` updates hands out owned keys and values.
impl<K, KBC, V, VBC, T, R> BuilderInput<KBC, VBC> for Vec<((K, V), T, R)>
where
    K: Ord + Clone + 'static,
    KBC: for<'a> BatchContainer<ReadItem<'a>: PartialEq<&'a K>>,
    V: Ord + Clone + 'static,
    VBC: for<'a> BatchContainer<ReadItem<'a>: PartialEq<&'a V>>,
    T: Timestamp + Lattice + Clone + 'static,
    R: Ord + Semigroup + 'static,
{
    type Key<'a> = K;
    type Val<'a> = V;
    type Time = T;
    type Diff = R;

    fn into_parts<'a>(
        item: Self::Item<'a>,
    ) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) {
        let ((key, val), time, diff) = item;
        (key, val, time, diff)
    }

    fn key_eq(this: &K, other: KBC::ReadItem<'_>) -> bool {
        KBC::reborrow(other) == this
    }

    fn val_eq(this: &V, other: VBC::ReadItem<'_>) -> bool {
        VBC::reborrow(other) == this
    }

    fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize) {
        let (mut keys, mut vals, mut upds) = (0, 0, 0);
        let mut previous = None;
        for ((key, val), _, _) in chain.iter().flat_map(|link| link.iter()) {
            match previous {
                // A new key also starts a new value.
                Some((prev_key, _)) if prev_key != key => {
                    keys += 1;
                    vals += 1;
                }
                // Same key, new value.
                Some((_, prev_val)) if prev_val != val => {
                    vals += 1;
                }
                Some(_) => {}
                // The very first update starts both a key and a value.
                None => {
                    keys += 1;
                    vals += 1;
                }
            }
            upds += 1;
            previous = Some((key, val));
        }
        (keys, vals, upds)
    }
}
/// A `TimelyStack` of `((key, val), time, diff)` updates hands out references to its keys and
/// values, and clones of its times and diffs.
impl<K, V, T, R> BuilderInput<K, V> for TimelyStack<((K::Owned, V::Owned), T, R)>
where
    K: for<'a> BatchContainer<
        ReadItem<'a>: PartialEq<&'a K::Owned>,
        Owned: Ord + Columnation + Clone + 'static,
    >,
    V: for<'a> BatchContainer<
        ReadItem<'a>: PartialEq<&'a V::Owned>,
        Owned: Ord + Columnation + Clone + 'static,
    >,
    T: Timestamp + Lattice + Columnation + Clone + 'static,
    R: Ord + Clone + Semigroup + Columnation + 'static,
{
    type Key<'a> = &'a K::Owned;
    type Val<'a> = &'a V::Owned;
    type Time = T;
    type Diff = R;

    fn into_parts<'a>(
        item: Self::Item<'a>,
    ) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) {
        let ((key, val), time, diff) = item;
        (key, val, time.clone(), diff.clone())
    }

    fn key_eq(this: &&K::Owned, other: K::ReadItem<'_>) -> bool {
        K::reborrow(other) == *this
    }

    fn val_eq(this: &&V::Owned, other: V::ReadItem<'_>) -> bool {
        V::reborrow(other) == *this
    }

    fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize) {
        let (mut keys, mut vals, mut upds) = (0, 0, 0);
        let mut previous = None;
        for ((key, val), _, _) in chain.iter().flat_map(|link| link.iter()) {
            match previous {
                // A new key also starts a new value.
                Some((prev_key, _)) if prev_key != key => {
                    keys += 1;
                    vals += 1;
                }
                // Same key, new value.
                Some((_, prev_val)) if prev_val != val => {
                    vals += 1;
                }
                Some(_) => {}
                // The very first update starts both a key and a value.
                None => {
                    keys += 1;
                    vals += 1;
                }
            }
            upds += 1;
            previous = Some((key, val));
        }
        (keys, vals, upds)
    }
}
pub use self::containers::{BatchContainer, SliceContainer};
pub mod containers {
    //! Containers that can hold batch components, and the [`BatchContainer`] trait through
    //! which batches push and read them.
    use columnation::Columnation;
    use timely::container::PushInto;
    use crate::containers::TimelyStack;

    /// A container of items that can be appended to and read back by position.
    ///
    /// Items are read out as the `Copy` type `ReadItem` and can be converted to the `Owned`
    /// type with [`BatchContainer::into_owned`].
    pub trait BatchContainer: 'static {
        /// The owned form of an item.
        type Owned: Clone + Ord;
        /// The form in which items are read out of the container.
        type ReadItem<'a>: Copy + Ord;

        /// Converts a read item into its owned form.
        #[must_use]
        fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned;

        /// Replaces `other` with the owned form of `item`.
        ///
        /// The default assigns the result of [`BatchContainer::into_owned`]. Implementations
        /// may override it to reuse `other`'s allocations.
        #[inline(always)]
        fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) {
            *other = Self::into_owned(item);
        }

        /// Appends a copy of the read item `item`.
        fn push_ref(&mut self, item: Self::ReadItem<'_>);
        /// Appends a copy of the owned item `item`.
        fn push_own(&mut self, item: &Self::Owned);
        /// Removes all items.
        fn clear(&mut self);
        /// Creates an empty container, using `size` as a capacity hint.
        fn with_capacity(size: usize) -> Self;
        /// Creates an empty container whose capacity is sized from `cont1` and `cont2`
        /// combined.
        fn merge_capacity(cont1: &Self, cont2: &Self) -> Self;
        /// Shortens the lifetime of a read item.
        fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b>;
        /// Returns the item at position `index`.
        fn index(&self, index: usize) -> Self::ReadItem<'_>;

        /// Returns the item at position `index`, or `None` if `index >= self.len()`.
        fn get(&self, index: usize) -> Option<Self::ReadItem<'_>> {
            if index < self.len() {
                Some(self.index(index))
            } else {
                None
            }
        }

        /// Returns the number of items.
        fn len(&self) -> usize;

        /// Returns the last item, or `None` if the container is empty.
        fn last(&self) -> Option<Self::ReadItem<'_>> {
            if self.len() > 0 {
                Some(self.index(self.len() - 1))
            } else {
                None
            }
        }

        /// Returns `true` if the container holds no items.
        fn is_empty(&self) -> bool {
            self.len() == 0
        }

        /// Returns the length of the prefix of positions `start..end` on which `function`
        /// holds.
        ///
        /// `function` must be monotone over the range: true on some prefix and false after
        /// it. Otherwise the result need not be meaningful.
        ///
        /// The first 8 positions are checked linearly. Past that, a galloping (exponential)
        /// search is followed by a binary refinement, so the number of probes grows with the
        /// logarithm of the answer.
        fn advance<F: for<'a> Fn(Self::ReadItem<'a>) -> bool>(
            &self,
            start: usize,
            end: usize,
            function: F,
        ) -> usize {
            let small_limit = 8;
            // Only search exponentially if the answer lies beyond the first `small_limit` items.
            if end > start + small_limit && function(self.index(start + small_limit)) {
                // `function` holds at offset `small_limit`, so (by monotonicity) for all of
                // offsets `0..=small_limit`; `index` counts those items.
                let mut index = small_limit + 1;
                if start + index < end && function(self.index(start + index)) {
                    // Gallop: double the step while `function` still holds further on.
                    let mut step = 1;
                    while start + index + step < end && function(self.index(start + index + step)) {
                        index += step;
                        step <<= 1;
                    }
                    // Refine with halving steps. `index` always remains on an offset where
                    // `function` holds.
                    step >>= 1;
                    while step > 0 {
                        if start + index + step < end && function(self.index(start + index + step))
                        {
                            index += step;
                        }
                        step >>= 1;
                    }
                    // Turn the last satisfying offset into a count.
                    index += 1;
                }
                index
            } else {
                let limit = std::cmp::min(end, start + small_limit);
                (start..limit).filter(|x| function(self.index(*x))).count()
            }
        }
    }

    /// A `Vec<T>` stores owned items and reads them back as `&T`.
    impl<T: Ord + Clone + 'static> BatchContainer for Vec<T> {
        type Owned = T;
        type ReadItem<'a> = &'a T;

        #[inline(always)]
        fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned {
            item.clone()
        }
        #[inline(always)]
        fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) {
            // `clone_from` may reuse `other`'s resources instead of allocating afresh.
            other.clone_from(item);
        }
        fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> {
            item
        }
        fn push_ref(&mut self, item: Self::ReadItem<'_>) {
            self.push_into(item)
        }
        fn push_own(&mut self, item: &Self::Owned) {
            self.push_into(item.clone())
        }
        fn clear(&mut self) {
            self.clear()
        }
        fn with_capacity(size: usize) -> Self {
            Vec::with_capacity(size)
        }
        fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
            Vec::with_capacity(cont1.len() + cont2.len())
        }
        fn index(&self, index: usize) -> Self::ReadItem<'_> {
            &self[index]
        }
        fn get(&self, index: usize) -> Option<Self::ReadItem<'_>> {
            self.as_slice().get(index)
        }
        fn len(&self) -> usize {
            self[..].len()
        }
    }

    /// A `TimelyStack<T>` stores `Columnation` items and reads them back as `&T`.
    impl<T: Clone + Ord + Columnation + 'static> BatchContainer for TimelyStack<T> {
        type Owned = T;
        type ReadItem<'a> = &'a T;

        #[inline(always)]
        fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned {
            item.clone()
        }
        #[inline(always)]
        fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) {
            other.clone_from(item);
        }
        fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> {
            item
        }
        fn push_ref(&mut self, item: Self::ReadItem<'_>) {
            self.push_into(item)
        }
        fn push_own(&mut self, item: &Self::Owned) {
            self.push_into(item)
        }
        fn clear(&mut self) {
            self.clear()
        }
        fn with_capacity(size: usize) -> Self {
            Self::with_capacity(size)
        }
        fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
            // Reserve region space for the contents of both inputs.
            let mut new = Self::default();
            new.reserve_regions(std::iter::once(cont1).chain(std::iter::once(cont2)));
            new
        }
        fn index(&self, index: usize) -> Self::ReadItem<'_> {
            &self[index]
        }
        fn len(&self) -> usize {
            self[..].len()
        }
    }

    /// A container of slices, stored back to back in a single `Vec<B>`.
    ///
    /// Slice `i` occupies `inner[offsets[i]..offsets[i + 1]]`. Every constructor and `clear`
    /// leave `offsets` starting with `0`, so it holds one more entry than there are slices.
    pub struct SliceContainer<B> {
        /// Slice boundaries within `inner`, starting with `0`.
        offsets: Vec<usize>,
        /// The concatenated elements of all slices.
        inner: Vec<B>,
    }

    /// Appends a copy of `item` as a new slice.
    impl<B: Ord + Clone + 'static> PushInto<&[B]> for SliceContainer<B> {
        fn push_into(&mut self, item: &[B]) {
            self.inner.extend_from_slice(item);
            self.offsets.push(self.inner.len());
        }
    }

    /// Appends a copy of the vector's contents as a new slice.
    impl<B: Ord + Clone + 'static> PushInto<&Vec<B>> for SliceContainer<B> {
        fn push_into(&mut self, item: &Vec<B>) {
            self.push_into(&item[..]);
        }
    }

    /// A `SliceContainer<B>` reads items back as `&[B]` and owns them as `Vec<B>`.
    impl<B> BatchContainer for SliceContainer<B>
    where
        B: Ord + Clone + Sized + 'static,
    {
        type Owned = Vec<B>;
        type ReadItem<'a> = &'a [B];

        #[inline(always)]
        fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned {
            item.to_vec()
        }
        #[inline(always)]
        fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) {
            // `clone_from_slice` would panic whenever the lengths differ. `clone_into`
            // resizes `other` while reusing its allocation and existing elements.
            <[B] as ToOwned>::clone_into(item, other);
        }
        fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> {
            item
        }
        fn push_ref(&mut self, item: Self::ReadItem<'_>) {
            self.push_into(item)
        }
        fn push_own(&mut self, item: &Self::Owned) {
            self.push_into(item)
        }
        fn clear(&mut self) {
            self.offsets.clear();
            self.offsets.push(0);
            self.inner.clear();
        }
        fn with_capacity(size: usize) -> Self {
            // `size` is used as the hint for both the number of slices and of elements.
            let mut offsets = Vec::with_capacity(size + 1);
            offsets.push(0);
            Self {
                offsets,
                inner: Vec::with_capacity(size),
            }
        }
        fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
            // One offset per slice of either input, plus the leading `0`; the element
            // storage is sized by the inputs' element counts.
            let mut offsets = Vec::with_capacity(cont1.len() + cont2.len() + 1);
            offsets.push(0);
            Self {
                offsets,
                inner: Vec::with_capacity(cont1.inner.len() + cont2.inner.len()),
            }
        }
        fn index(&self, index: usize) -> Self::ReadItem<'_> {
            let lower = self.offsets[index];
            let upper = self.offsets[index + 1];
            &self.inner[lower..upper]
        }
        fn len(&self) -> usize {
            self.offsets.len() - 1
        }
    }

    /// An empty container: no slices, with `offsets` holding just the leading `0`.
    impl<B> Default for SliceContainer<B> {
        fn default() -> Self {
            Self {
                offsets: vec![0],
                inner: Default::default(),
            }
        }
    }
}