#![allow(dead_code, unused_imports)]
pub use layout::{ColumnarLayout, ColumnarUpdate};
pub mod layout {
// Trait plumbing that adapts `columnar` containers to differential
// dataflow's `Layout` machinery.
use std::fmt::Debug;
use columnar::Columnar;
use differential_dataflow::trace::implementations::{Layout, OffsetList};
use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use timely::progress::Timestamp;
// Zero-sized marker tying a `Layout` implementation to an update tuple `U`;
// never instantiated with data.
pub struct ColumnarLayout<U: ColumnarUpdate> {
phantom: std::marker::PhantomData<U>,
}
// Blanket implementation: any (key, val, time, diff) tuple whose components
// meet the per-component bounds is a usable update type.
impl<K, V, T, R> ColumnarUpdate for (K, V, T, R)
where
K: Columnar<Container: OrdContainer + Debug + Default> + Debug + Ord + Clone + 'static,
V: Columnar<Container: OrdContainer + Debug + Default> + Debug + Ord + Clone + 'static,
T: Columnar<Container: OrdContainer + Debug + Default> + Debug + Ord + Default + Clone + Lattice + Timestamp,
R: Columnar<Container: OrdContainer + Debug + Default> + Debug + Ord + Default + Semigroup + 'static,
{
type Key = K;
type Val = V;
type Time = T;
type Diff = R;
}
use crate::arrangement::Coltainer;
// Each logical column is stored in a `Coltainer` (a columnar container
// wrapper); offsets use DD's standard `OffsetList`.
impl<U: ColumnarUpdate> Layout for ColumnarLayout<U> {
type KeyContainer = Coltainer<U::Key>;
type ValContainer = Coltainer<U::Val>;
type TimeContainer = Coltainer<U::Time>;
type DiffContainer = Coltainer<U::Diff>;
type OffsetContainer = OffsetList;
}
// The four component types of an update, each columnar-encodable with an
// orderable borrowed form (needed for sorting/merging by reference).
pub trait ColumnarUpdate : Debug + 'static {
type Key: Columnar<Container: OrdContainer + Debug + Default> + Debug + Ord + Clone + 'static;
type Val: Columnar<Container: OrdContainer + Debug + Default> + Debug + Ord + Clone + 'static;
type Time: Columnar<Container: OrdContainer + Debug + Default> + Debug + Ord + Default + Clone + Lattice + Timestamp;
type Diff: Columnar<Container: OrdContainer + Debug + Default> + Debug + Ord + Default + Semigroup + 'static;
}
// Shorthand for "columnar container whose borrowed items are `Ord`",
// implemented blanket-style for every qualifying container.
pub trait OrdContainer : for<'a> columnar::Container<Ref<'a> : Ord> { }
impl<C: for<'a> columnar::Container<Ref<'a> : Ord>> OrdContainer for C { }
}
pub use updates::Updates;
// A batch of columnar updates plus accounting metadata used by timely's
// progress machinery and the merge batcher.
pub struct RecordedUpdates<U: layout::ColumnarUpdate> {
// The trie-structured (key, val, time, diff) columns.
pub updates: Updates<U>,
// Logical record count reported via `timely::Accountable`; may differ from
// the number of tuples currently held (see the distributor's accounting).
pub records: usize,
// True when `updates` is sorted with equal tuples' diffs combined.
pub consolidated: bool,
}
impl<U: layout::ColumnarUpdate> Default for RecordedUpdates<U> {
fn default() -> Self { Self { updates: Default::default(), records: 0, consolidated: true } }
}
impl<U: layout::ColumnarUpdate> Clone for RecordedUpdates<U> {
    /// Deep-copies the columnar payload and carries the bookkeeping along.
    fn clone(&self) -> Self {
        let updates = self.updates.clone();
        Self {
            updates,
            records: self.records,
            consolidated: self.consolidated,
        }
    }
}
impl<U: layout::ColumnarUpdate> timely::Accountable for RecordedUpdates<U> {
    /// Reports the logical record count tracked alongside the columnar data.
    #[inline]
    fn record_count(&self) -> i64 {
        self.records as i64
    }
}
// Wire-format hooks for byte-level exchange of `RecordedUpdates` between
// processes. Serialization is not implemented; these panic if a
// multi-process transport attempts to serialize a batch.
impl<U: layout::ColumnarUpdate> timely::dataflow::channels::ContainerBytes for RecordedUpdates<U> {
fn from_bytes(_bytes: timely::bytes::arc::Bytes) -> Self { unimplemented!() }
fn length_in_bytes(&self) -> usize { unimplemented!() }
fn into_bytes<W: std::io::Write>(&self, _writer: &mut W) { unimplemented!() }
}
mod container_impls {
use columnar::{Borrow, Columnar, Index, Len, Push};
use timely::progress::{Timestamp, timestamp::Refines};
use differential_dataflow::difference::Abelian;
use differential_dataflow::collection::containers::{Negate, Enter, Leave, ResultsIn};
use crate::layout::ColumnarUpdate as Update;
use crate::{RecordedUpdates, Updates};
impl<U: Update<Diff: Abelian>> Negate for RecordedUpdates<U> {
fn negate(mut self) -> Self {
let len = self.updates.diffs.values.len();
let mut new_diffs = <<U::Diff as Columnar>::Container as Default>::default();
let mut owned = U::Diff::default();
for i in 0..len {
columnar::Columnar::copy_from(&mut owned, self.updates.diffs.values.borrow().get(i));
owned.negate();
new_diffs.push(&owned);
}
self.updates.diffs.values = new_diffs;
self
}
}
impl<K, V, T1, T2, R> Enter<T1, T2> for RecordedUpdates<(K, V, T1, R)>
where
    (K, V, T1, R): Update<Key=K, Val=V, Time=T1, Diff=R>,
    (K, V, T2, R): Update<Key=K, Val=V, Time=T2, Diff=R>,
    T1: Timestamp + Columnar + Default + Clone,
    T2: Refines<T1> + Columnar + Default + Clone,
    K: Columnar, V: Columnar, R: Columnar,
{
    type InnerContainer = RecordedUpdates<(K, V, T2, R)>;
    /// Lifts every timestamp from `T1` into the refining `T2` via `to_inner`,
    /// reusing the key/val/diff columns and the time list bounds unchanged.
    fn enter(self) -> Self::InnerContainer {
        let mut lifted = <<T2 as Columnar>::Container as Default>::default();
        let mut outer_time = T1::default();
        let count = self.updates.times.values.len();
        for index in 0..count {
            Columnar::copy_from(&mut outer_time, self.updates.times.values.borrow().get(index));
            lifted.push(&T2::to_inner(outer_time.clone()));
        }
        let Updates { keys, vals, times, diffs } = self.updates;
        RecordedUpdates {
            updates: Updates {
                keys,
                vals,
                times: crate::updates::Lists { values: lifted, bounds: times.bounds },
                diffs,
            },
            records: self.records,
            consolidated: self.consolidated,
        }
    }
}
impl<K, V, T1, T2, R> Leave<T1, T2> for RecordedUpdates<(K, V, T1, R)>
where
    (K, V, T1, R): Update<Key=K, Val=V, Time=T1, Diff=R>,
    (K, V, T2, R): Update<Key=K, Val=V, Time=T2, Diff=R>,
    T1: Refines<T2> + Columnar + Default + Clone,
    T2: Timestamp + Columnar + Default + Clone,
    K: Columnar, V: Columnar, R: Columnar,
{
    type OuterContainer = RecordedUpdates<(K, V, T2, R)>;
    /// Projects timestamps back out of the refinement via `to_outer`, then
    /// re-consolidates, since distinct inner times may collapse onto the
    /// same outer time.
    fn leave(self) -> Self::OuterContainer {
        let mut rebuilt = Updates::<(K, V, T2, R)>::default();
        let mut inner_time = T1::default();
        for (key, val, time, diff) in self.updates.iter() {
            Columnar::copy_from(&mut inner_time, time);
            let outer: T2 = inner_time.clone().to_outer();
            rebuilt.push((key, val, &outer, diff));
        }
        RecordedUpdates {
            updates: rebuilt.consolidate(),
            records: self.records,
            consolidated: true,
        }
    }
}
impl<U: Update> ResultsIn<<U::Time as Timestamp>::Summary> for RecordedUpdates<U> {
    /// Advances every timestamp by `step`, dropping tuples whose time has no
    /// image under the summary. The result is marked unconsolidated because
    /// advancing can make previously distinct times collide.
    ///
    /// NOTE(review): `records` is carried over unchanged even when tuples are
    /// dropped -- confirm the accounting intent with the channel layer.
    fn results_in(self, step: &<U::Time as Timestamp>::Summary) -> Self {
        use timely::progress::PathSummary;
        let mut advanced = Updates::<U>::default();
        let mut scratch_time = U::Time::default();
        for (key, val, time, diff) in self.updates.iter() {
            Columnar::copy_from(&mut scratch_time, time);
            if let Some(stepped) = step.results_in(&scratch_time) {
                advanced.push((key, val, &stepped, diff));
            }
        }
        RecordedUpdates {
            updates: advanced,
            records: self.records,
            consolidated: false,
        }
    }
}
}
pub use column_builder::ValBuilder as ValColBuilder;
mod column_builder {
use std::collections::VecDeque;
use columnar::{Columnar, Clear, Len, Push};
use crate::layout::ColumnarUpdate as Update;
use crate::{Updates, RecordedUpdates};
// The columnar container for a full (key, val, time, diff) update tuple.
type TupleContainer<U> = <(<U as Update>::Key, <U as Update>::Val, <U as Update>::Time, <U as Update>::Diff) as Columnar>::Container;
// Container builder that stages raw tuples, then sorts and forms them into
// trie-shaped `RecordedUpdates` chunks.
pub struct ValBuilder<U: Update> {
// Unsorted staging area for pushed tuples.
current: TupleContainer<U>,
// Slot holding the most recently popped chunk, so `extract`/`finish` can
// hand out `&mut` references into it.
empty: Option<RecordedUpdates<U>>,
// Finished chunks awaiting extraction.
pending: VecDeque<RecordedUpdates<U>>,
}
use timely::container::PushInto;
impl<T, U: Update> PushInto<T> for ValBuilder<U> where TupleContainer<U> : Push<T> {
    /// Appends one tuple; once the staging container exceeds the threshold it
    /// is sorted, formed into a trie-shaped chunk, and queued.
    #[inline]
    fn push_into(&mut self, item: T) {
        // Flush when the staging area grows past this many tuples.
        const THRESHOLD: usize = 1024 * 1024;
        self.current.push(item);
        if self.current.len() > THRESHOLD {
            use columnar::{Borrow, Index};
            let record_count = self.current.len();
            let mut sorted: Vec<_> = self.current.borrow().into_index_iter().collect();
            sorted.sort();
            self.pending.push_back(RecordedUpdates {
                updates: Updates::form(sorted.into_iter()),
                records: record_count,
                consolidated: true,
            });
            self.current.clear();
        }
    }
}
impl<U: Update> Default for ValBuilder<U> {
    /// An empty builder: nothing staged, nothing pending, no cached slot.
    fn default() -> Self {
        Self {
            current: Default::default(),
            empty: None,
            pending: Default::default(),
        }
    }
}
use timely::container::{ContainerBuilder, LengthPreservingContainerBuilder};
impl<U: Update> ContainerBuilder for ValBuilder<U> {
    type Container = RecordedUpdates<U>;
    /// Hands out the next finished chunk, if any, parking it in `self.empty`
    /// so a mutable reference can be returned.
    #[inline]
    fn extract(&mut self) -> Option<&mut Self::Container> {
        let next = self.pending.pop_front()?;
        self.empty = Some(next);
        self.empty.as_mut()
    }
    /// Seals whatever remains in the staging container (sort, form, queue),
    /// then drains the queue one chunk at a time.
    #[inline]
    fn finish(&mut self) -> Option<&mut Self::Container> {
        if !self.current.is_empty() {
            use columnar::{Borrow, Index};
            let record_count = self.current.len();
            let mut sorted: Vec<_> = self.current.borrow().into_index_iter().collect();
            sorted.sort();
            self.pending.push_back(RecordedUpdates {
                updates: Updates::form(sorted.into_iter()),
                records: record_count,
                consolidated: true,
            });
            self.current.clear();
        }
        self.empty = self.pending.pop_front();
        self.empty.as_mut()
    }
}
impl<U: Update> LengthPreservingContainerBuilder for ValBuilder<U> { }
}
pub use distributor::ValPact;
mod distributor {
use std::rc::Rc;
use columnar::{Borrow, Index, Len};
use timely::logging::TimelyLogger;
use timely::dataflow::channels::pushers::{Exchange, exchange::Distributor};
use timely::dataflow::channels::Message;
use timely::dataflow::channels::pact::{LogPuller, LogPusher, ParallelizationContract};
use timely::progress::Timestamp;
use timely::worker::Worker;
use crate::layout::ColumnarUpdate as Update;
use crate::{Updates, RecordedUpdates};
// Routes `RecordedUpdates` across workers by hashing their keys.
pub struct ValDistributor<U: Update, H> {
marker: std::marker::PhantomData<U>,
// Maps a borrowed key to a 64-bit hash used for routing.
hashfunc: H,
// Scratch: per-output key counts captured before distributing one outer
// key list; kept as a field to reuse the allocation across calls.
pre_lens: Vec<usize>,
}
impl<U: Update, H: for<'a> FnMut(columnar::Ref<'a, U::Key>)->u64> Distributor<RecordedUpdates<U>> for ValDistributor<U, H> {
    /// Partitions `container` across `pushers`, routing each key (with its
    /// values, times, and diffs) to pusher `hash(key) % pushers.len()`.
    ///
    /// Keys are walked list-by-list so the trie structure survives: a bounds
    /// entry is appended to an output's key column only for outer lists that
    /// contributed at least one key to that output.
    ///
    /// Record accounting: the aggregate record count is preserved by giving
    /// the first non-empty output `records - (non_empty - 1)` and every
    /// subsequent non-empty output exactly 1.
    fn partition<T: Clone, P: timely::communication::Push<Message<T, RecordedUpdates<U>>>>(&mut self, container: &mut RecordedUpdates<U>, time: &T, pushers: &mut [P]) {
        use crate::updates::child_range;
        let keys_b = container.updates.keys.borrow();
        let mut outputs: Vec<Updates<U>> = (0..pushers.len()).map(|_| Updates::default()).collect();
        for outer in 0..Len::len(&keys_b) {
            // Snapshot each output's key count so we can tell, after routing
            // this outer list, which outputs actually grew.
            self.pre_lens.clear();
            self.pre_lens.extend(outputs.iter().map(|o| o.keys.values.len()));
            for k in child_range(keys_b.bounds, outer) {
                let key = keys_b.values.get(k);
                let idx = ((self.hashfunc)(key) as usize) % pushers.len();
                outputs[idx].extend_from_keys(&container.updates, k..k+1);
            }
            // Close the outer list in every output that received keys.
            for (output, &pre) in outputs.iter_mut().zip(self.pre_lens.iter()) {
                if output.keys.values.len() > pre {
                    output.keys.bounds.push(output.keys.values.len() as u64);
                }
            }
        }
        let total_records = container.records;
        let non_empty: usize = outputs.iter().filter(|o| !o.keys.values.is_empty()).count();
        let mut first_records = total_records.saturating_sub(non_empty.saturating_sub(1));
        for (pusher, output) in pushers.iter_mut().zip(outputs) {
            if !output.keys.values.is_empty() {
                // Bind mutably up front (the original constructed an immutable
                // binding and then shadow-rebound it as `mut`).
                let mut recorded = RecordedUpdates { updates: output, records: first_records, consolidated: container.consolidated };
                first_records = 1;
                Message::push_at(&mut recorded, time.clone(), pusher);
            }
        }
    }
    /// `partition` buffers nothing across calls, so there is nothing to flush.
    fn flush<T: Clone, P: timely::communication::Push<Message<T, RecordedUpdates<U>>>>(&mut self, _time: &T, _pushers: &mut [P]) { }
    /// No internal state to relax.
    fn relax(&mut self) { }
}
pub struct ValPact<H> { pub hashfunc: H }
impl<T, U, H> ParallelizationContract<T, RecordedUpdates<U>> for ValPact<H>
where
    T: Timestamp,
    U: Update,
    H: for<'a> FnMut(columnar::Ref<'a, U::Key>)->u64 + 'static,
{
    type Pusher = Exchange<
        T,
        LogPusher<Box<dyn timely::communication::Push<Message<T, RecordedUpdates<U>>>>>,
        ValDistributor<U, H>
    >;
    type Puller = LogPuller<Box<dyn timely::communication::Pull<Message<T, RecordedUpdates<U>>>>>;
    /// Allocates a channel for `RecordedUpdates`, wraps every sender and the
    /// receiver in logging adapters, and pairs the senders with a fresh
    /// `ValDistributor` driving the exchange.
    fn connect(self, worker: &Worker, identifier: usize, address: Rc<[usize]>, logging: Option<TimelyLogger>) -> (Self::Pusher, Self::Puller) {
        let (raw_senders, raw_receiver) = worker.allocate::<Message<T, RecordedUpdates<U>>>(identifier, address);
        let my_index = worker.index();
        let logged_senders: Vec<_> = raw_senders
            .into_iter()
            .enumerate()
            .map(|(target, sender)| LogPusher::new(sender, my_index, target, identifier, logging.clone()))
            .collect();
        let distributor = ValDistributor {
            marker: std::marker::PhantomData,
            hashfunc: self.hashfunc,
            pre_lens: Vec::new(),
        };
        let pusher = Exchange::new(logged_senders, distributor);
        let puller = LogPuller::new(raw_receiver, my_index, identifier, logging.clone());
        (pusher, puller)
    }
}
}
pub use arrangement::{ValBatcher, ValBuilder, ValSpine};
pub mod arrangement {
use std::rc::Rc;
use differential_dataflow::trace::implementations::ord_neu::OrdValBatch;
use differential_dataflow::trace::rc_blanket_impls::RcBuilder;
use differential_dataflow::trace::implementations::spine_fueled::Spine;
use crate::layout::ColumnarLayout;
// Trace (spine) of reference-counted columnar ordered-value batches.
pub type ValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<ColumnarLayout<(K,V,T,R)>>>>;
// Batcher backed by the trie chunker and trie merger defined below.
pub type ValBatcher<K, V, T, R> = ValBatcher2<(K,V,T,R)>;
// Batch builder, wrapped in `RcBuilder` to produce `Rc`-wrapped batches.
pub type ValBuilder<K, V, T, R> = RcBuilder<ValMirror<(K,V,T,R)>>;
pub use batch_container::Coltainer;
pub mod batch_container {
// `BatchContainer` adapter around a columnar container, so columnar data can
// back differential dataflow's ordered batch implementations.
use columnar::{Borrow, Columnar, Container, Clear, Push, Index, Len};
use differential_dataflow::trace::implementations::BatchContainer;
// Thin wrapper over `C`'s columnar container.
pub struct Coltainer<C: Columnar> {
pub container: C::Container,
}
impl<C: Columnar> Default for Coltainer<C> {
fn default() -> Self { Self { container: Default::default() } }
}
// Items are read as borrowed columnar references; all operations delegate to
// the inner container.
impl<C: Columnar + Ord + Clone> BatchContainer for Coltainer<C> where for<'a> columnar::Ref<'a, C> : Ord {
type ReadItem<'a> = columnar::Ref<'a, C>;
type Owned = C;
#[inline(always)] fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned { C::into_owned(item) }
#[inline(always)] fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) { other.copy_from(item) }
#[inline(always)] fn push_ref(&mut self, item: Self::ReadItem<'_>) { self.container.push(item) }
#[inline(always)] fn push_own(&mut self, item: &Self::Owned) { self.container.push(item) }
fn clear(&mut self) { self.container.clear() }
// Size hint ignored; the columnar container grows on demand.
fn with_capacity(_size: usize) -> Self { Self::default() }
// Pre-sizes for the combined contents of two containers about to merge.
fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
Self {
container: <C as Columnar>::Container::with_capacity_for([cont1.container.borrow(), cont2.container.borrow()].into_iter()),
}
}
#[inline(always)] fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { columnar::ContainerOf::<C>::reborrow_ref(item) }
#[inline(always)] fn index(&self, index: usize) -> Self::ReadItem<'_> { self.container.borrow().get(index) }
#[inline(always)] fn len(&self) -> usize { self.container.len() }
}
}
use crate::{Updates, RecordedUpdates};
use differential_dataflow::trace::implementations::merge_batcher::MergeBatcher;
// Merge batcher wiring: input chunked by `TrieChunker`, merged by `TrieMerger`.
type ValBatcher2<U> = MergeBatcher<RecordedUpdates<U>, TrieChunker<U>, trie_merger::TrieMerger<U>>;
// Converts incoming `RecordedUpdates` into consolidated `Updates` chunks.
pub struct TrieChunker<U: crate::layout::ColumnarUpdate> {
// Consolidated chunks waiting to be extracted.
ready: std::collections::VecDeque<Updates<U>>,
// Slot for the chunk currently handed out by `extract`/`finish`.
empty: Option<Updates<U>>,
}
impl<U: crate::layout::ColumnarUpdate> Default for TrieChunker<U> {
fn default() -> Self { Self { ready: Default::default(), empty: None } }
}
impl<'a, U: crate::layout::ColumnarUpdate> timely::container::PushInto<&'a mut RecordedUpdates<U>> for TrieChunker<U> {
    /// Absorbs a batch (leaving the caller's container empty), consolidating
    /// it first when the producer did not; empty results are dropped.
    fn push_into(&mut self, container: &'a mut RecordedUpdates<U>) {
        let taken = std::mem::take(&mut container.updates);
        let updates = if container.consolidated { taken } else { taken.consolidate() };
        if updates.len() > 0 {
            self.ready.push_back(updates);
        }
    }
}
impl<U: crate::layout::ColumnarUpdate> timely::container::ContainerBuilder for TrieChunker<U> {
    type Container = Updates<U>;
    /// Pops the next consolidated chunk, parking it in `self.empty` so a
    /// mutable reference can be returned.
    fn extract(&mut self) -> Option<&mut Self::Container> {
        let chunk = self.ready.pop_front()?;
        self.empty = Some(chunk);
        self.empty.as_mut()
    }
    /// Same draining behavior as `extract`: chunks are complete as soon as
    /// they are queued, so finishing adds nothing.
    fn finish(&mut self) -> Option<&mut Self::Container> {
        self.empty = self.ready.pop_front();
        self.empty.as_mut()
    }
}
pub mod batcher {
// Integration of `Updates` with the merge batcher's container traits.
// Several methods are deliberately unimplemented; presumably the configured
// merge path (`trie_merger::TrieMerger`) never calls them -- TODO confirm.
use columnar::{Borrow, Columnar, Index, Len, Push};
use differential_dataflow::difference::{Semigroup, IsZero};
use timely::progress::frontier::{Antichain, AntichainRef};
use differential_dataflow::trace::implementations::merge_batcher::container::InternalMerge;
use crate::ColumnarUpdate as Update;
use crate::Updates;
impl<U: Update> timely::container::SizableContainer for Updates<U> {
// A chunk is "full" at 64Ki tuples, measured by the innermost (diff) column.
fn at_capacity(&self) -> bool { self.diffs.values.len() >= 64 * 1024 }
// No pre-allocation; the columnar containers grow on demand.
fn ensure_capacity(&mut self, _stash: &mut Option<Self>) { }
}
impl<U: Update> InternalMerge for Updates<U> {
type TimeOwned = U::Time;
// Unimplemented: not expected to be called on this path.
fn len(&self) -> usize { unimplemented!() }
// Clears all four columns (values and list bounds alike).
fn clear(&mut self) {
use columnar::Clear;
self.keys.clear();
self.vals.clear();
self.times.clear();
self.diffs.clear();
}
// Unimplemented: merging is performed by `trie_merger::TrieMerger` instead.
fn merge_from(&mut self, _others: &mut [Self], _positions: &mut [usize]) { unimplemented!() }
// Unimplemented: extraction is performed by `TrieMerger::extract` instead.
fn extract(&mut self,
_position: &mut usize,
_upper: AntichainRef<U::Time>,
_frontier: &mut Antichain<U::Time>,
_keep: &mut Self,
_ship: &mut Self,
) { unimplemented!() }
}
}
pub mod trie_merger {
use columnar::{Columnar, Len};
use timely::PartialOrder;
use timely::progress::frontier::{Antichain, AntichainRef};
use differential_dataflow::trace::implementations::merge_batcher::Merger;
use crate::ColumnarUpdate as Update;
use crate::Updates;
// Stateless merger of chains of trie-shaped `Updates` batches.
pub struct TrieMerger<U: Update> {
_marker: std::marker::PhantomData<U>,
}
impl<U: Update> Default for TrieMerger<U> {
fn default() -> Self { Self { _marker: std::marker::PhantomData } }
}
// Two-way merge iterator over (key, val, time, diff) streams, ordered by
// (key, val, time); ties are taken from `iter1` first.
struct Merging<I1: Iterator, I2: Iterator> {
iter1: std::iter::Peekable<I1>,
iter2: std::iter::Peekable<I2>,
}
impl<K, V, T, D, I1, I2> Iterator for Merging<I1, I2>
where
    K: Copy + Ord,
    V: Copy + Ord,
    T: Copy + Ord,
    I1: Iterator<Item = (K, V, T, D)>,
    I2: Iterator<Item = (K, V, T, D)>,
{
    type Item = (K, V, T, D);
    /// One merge step over two (key, val, time)-sorted streams. Diffs do not
    /// participate in the ordering, and ties favor the first stream.
    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        let take_first = match (self.iter1.peek(), self.iter2.peek()) {
            (Some(lhs), Some(rhs)) => (lhs.0, lhs.1, lhs.2) <= (rhs.0, rhs.1, rhs.2),
            (Some(_), None) => true,
            (None, Some(_)) => false,
            (None, None) => return None,
        };
        if take_first { self.iter1.next() } else { self.iter2.next() }
    }
}
/// Drains a sorted tuple stream into trie-shaped chunks of at most 64Ki
/// tuples each, appending only non-empty chunks to `output`.
fn form_chunks<'a, U: Update>(
    sorted: impl Iterator<Item = columnar::Ref<'a, crate::updates::Tuple<U>>>,
    output: &mut Vec<Updates<U>>,
) {
    const CHUNK: usize = 64 * 1024;
    let mut remaining = sorted.peekable();
    loop {
        if remaining.peek().is_none() {
            break;
        }
        let formed = Updates::<U>::form((&mut remaining).take(CHUNK));
        if formed.len() > 0 {
            output.push(formed);
        }
    }
}
impl<U: Update> Merger for TrieMerger<U>
where
U::Time: 'static,
{
type Chunk = Updates<U>;
type Time = U::Time;
// Delegates to the cursor-based batch merge below.
fn merge(
&mut self,
list1: Vec<Updates<U>>,
list2: Vec<Updates<U>>,
output: &mut Vec<Updates<U>>,
_stash: &mut Vec<Updates<U>>,
) {
Self::merge_batches(list1, list2, output, _stash);
}
// Splits `merged` by the frontier `upper`: tuples whose time is greater or
// equal to `upper` are kept (and folded into `frontier`); the rest ship.
fn extract(
&mut self,
merged: Vec<Self::Chunk>,
upper: AntichainRef<Self::Time>,
frontier: &mut Antichain<Self::Time>,
ship: &mut Vec<Self::Chunk>,
kept: &mut Vec<Self::Chunk>,
_stash: &mut Vec<Self::Chunk>,
) {
let all = merged.iter().flat_map(|chunk| chunk.iter());
let mut time_owned = U::Time::default();
// Partition borrowed tuples first, then re-form each side into chunks,
// preserving the original (sorted) order within each side.
let mut keep_vec = Vec::new();
let mut ship_vec = Vec::new();
for (k, v, t, d) in all {
Columnar::copy_from(&mut time_owned, t);
if upper.less_equal(&time_owned) {
frontier.insert_ref(&time_owned);
keep_vec.push((k, v, t, d));
} else {
ship_vec.push((k, v, t, d));
}
}
form_chunks::<U>(keep_vec.into_iter(), kept);
form_chunks::<U>(ship_vec.into_iter(), ship);
}
// Only record counts are reported; size/capacity/allocation stats are not
// tracked for columnar chunks.
fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize) {
use timely::Accountable;
(chunk.record_count() as usize, 0, 0, 0)
}
}
impl<U: Update> TrieMerger<U>
where
U::Time: 'static,
{
// Reference implementation: a simple tuple-at-a-time two-way merge. Kept
// for comparison/debugging against the survey-based path below.
#[allow(dead_code)]
fn merge_iterator(
list1: &[Updates<U>],
list2: &[Updates<U>],
output: &mut Vec<Updates<U>>,
) {
let iter1 = list1.iter().flat_map(|chunk| chunk.iter());
let iter2 = list2.iter().flat_map(|chunk| chunk.iter());
let merged = Merging {
iter1: iter1.peekable(),
iter2: iter2.peekable(),
};
form_chunks::<U>(merged, output);
}
// Merges two chains of batches pairwise. A batch that is only partially
// consumed by one merge step is carried forward as a cursor
// `((k, v, t), batch)` of resume offsets into its key/val/time columns.
#[inline(never)]
fn merge_batches(
list1: Vec<Updates<U>>,
list2: Vec<Updates<U>>,
output: &mut Vec<Updates<U>>,
stash: &mut Vec<Updates<U>>,
) {
let mut builder = ChainBuilder::default();
let mut queue1: std::collections::VecDeque<_> = list1.into();
let mut queue2: std::collections::VecDeque<_> = list2.into();
let mut cursor1 = queue1.pop_front().map(|b| ((0,0,0), b));
let mut cursor2 = queue2.pop_front().map(|b| ((0,0,0), b));
// Each `merge_batch` call exhausts at least one side; refill that side
// from its queue and continue until either chain runs dry.
while cursor1.is_some() && cursor2.is_some() {
Self::merge_batch(&mut cursor1, &mut cursor2, &mut builder, stash);
if cursor1.is_none() { cursor1 = queue1.pop_front().map(|b| ((0,0,0), b)); }
if cursor2.is_none() { cursor2 = queue2.pop_front().map(|b| ((0,0,0), b)); }
}
// Flush a leftover partially-consumed batch by writing its suffix (from
// the cursor offsets onward) through the survey writer; the whole batch
// is presented as a single `This`/`That` run at every layer.
if let Some(((k,v,t),batch)) = cursor1 {
let mut out_batch = stash.pop().unwrap_or_default();
let empty: Updates<U> = Default::default();
write_from_surveys(
&batch,
&empty,
&[Report::This(0, 1)],
&[Report::This(k, batch.keys.values.len())],
&[Report::This(v, batch.vals.values.len())],
&[Report::This(t, batch.times.values.len())],
&mut out_batch,
);
builder.push(out_batch);
}
if let Some(((k,v,t),batch)) = cursor2 {
let mut out_batch = stash.pop().unwrap_or_default();
let empty: Updates<U> = Default::default();
write_from_surveys(
&empty,
&batch,
&[Report::That(0, 1)],
&[Report::That(k, batch.keys.values.len())],
&[Report::That(v, batch.vals.values.len())],
&[Report::That(t, batch.times.values.len())],
&mut out_batch,
);
builder.push(out_batch);
}
// Remaining whole batches pass straight through the chain builder.
builder.extend(queue1);
builder.extend(queue2);
*output = builder.done();
}
// Merges the remainders of two batches (from their cursor offsets onward).
// At least one input is fully consumed; the other, if it has a tail that
// sorts entirely after the consumed input, is reinstalled as a cursor.
#[inline(never)]
fn merge_batch(
batch1: &mut Option<((usize, usize, usize), Updates<U>)>,
batch2: &mut Option<((usize, usize, usize), Updates<U>)>,
builder: &mut ChainBuilder<U>,
stash: &mut Vec<Updates<U>>,
) {
let ((k0_idx, v0_idx, t0_idx), updates0) = batch1.take().unwrap();
let ((k1_idx, v1_idx, t1_idx), updates1) = batch2.take().unwrap();
use columnar::Borrow;
let keys0 = updates0.keys.borrow();
let keys1 = updates1.keys.borrow();
let vals0 = updates0.vals.borrow();
let vals1 = updates1.vals.borrow();
let times0 = updates0.times.borrow();
let times1 = updates1.times.borrow();
// Survey each layer top-down: the key survey scopes the value survey,
// which scopes the time survey. The root pairs the two whole batches.
let mut key_survey = survey::<columnar::ContainerOf<U::Key>>(keys0, keys1, &[Report::Both(0,0)]);
let mut val_survey = survey::<columnar::ContainerOf<U::Val>>(vals0, vals1, &key_survey);
let mut time_survey = survey::<columnar::ContainerOf<U::Time>>(times0, times1, &val_survey);
// Trim away survey prefixes already consumed by a previous merge step:
// drop leading single-source reports that end at or before the resume
// offset, and clamp the lower bound of the first straddling report.
if (k0_idx, v0_idx, t0_idx) != (0,0,0) {
let mut done = false; while !done { if let Report::This(l,u) = &mut key_survey[0] { if *u <= k0_idx { key_survey.remove(0); } else { *l = k0_idx; done = true; } } else { done = true; } }
let mut done = false; while !done { if let Report::This(l,u) = &mut val_survey[0] { if *u <= v0_idx { val_survey.remove(0); } else { *l = v0_idx; done = true; } } else { done = true; } }
let mut done = false; while !done { if let Report::This(l,u) = &mut time_survey[0] { if *u <= t0_idx { time_survey.remove(0); } else { *l = t0_idx; done = true; } } else { done = true; } }
}
if (k1_idx, v1_idx, t1_idx) != (0,0,0) {
let mut done = false; while !done { if let Report::That(l,u) = &mut key_survey[0] { if *u <= k1_idx { key_survey.remove(0); } else { *l = k1_idx; done = true; } } else { done = true; } }
let mut done = false; while !done { if let Report::That(l,u) = &mut val_survey[0] { if *u <= v1_idx { val_survey.remove(0); } else { *l = v1_idx; done = true; } } else { done = true; } }
let mut done = false; while !done { if let Report::That(l,u) = &mut time_survey[0] { if *u <= t1_idx { time_survey.remove(0); } else { *l = t1_idx; done = true; } } else { done = true; } }
}
// If the deepest (time) survey ends in a run of single-source reports,
// that tail belongs to only one input: defer it by popping the trailing
// reports (at every layer) to recover resume offsets (k, v, t). The
// `v`/`k` offsets are backed up by one when the tail does not begin on a
// list boundary. Note: `times0.len()` counts time-lists (one per value),
// so comparing `v` against it is consistent with `Lists` semantics.
let next_cursor = match time_survey.last().unwrap() {
Report::This(_,_) => {
let mut t = times0.values.len();
while let Some(Report::This(l,_)) = time_survey.last() { t = *l; time_survey.pop(); }
let mut v = vals0.values.len();
while let Some(Report::This(l,_)) = val_survey.last() { v = *l; val_survey.pop(); }
let mut k = keys0.values.len();
while let Some(Report::This(l,_)) = key_survey.last() { k = *l; key_survey.pop(); }
if v == times0.len() || times0.bounds.bounds(v).0 > t { v -= 1; }
if k == vals0.len() || vals0.bounds.bounds(k).0 > v { k -= 1; }
Some(Ok((k,v,t)))
}
// A `Both` at the end means both inputs are fully consumed.
Report::Both(_,_) => { None }
Report::That(_,_) => {
let mut t = times1.values.len();
while let Some(Report::That(l,_)) = time_survey.last() { t = *l; time_survey.pop(); }
let mut v = vals1.values.len();
while let Some(Report::That(l,_)) = val_survey.last() { v = *l; val_survey.pop(); }
let mut k = keys1.values.len();
while let Some(Report::That(l,_)) = key_survey.last() { k = *l; key_survey.pop(); }
if v == times1.len() || times1.bounds.bounds(v).0 > t { v -= 1; }
if k == vals1.len() || vals1.bounds.bounds(k).0 > v { k -= 1; }
Some(Err((k,v,t)))
}
};
// Write everything the (possibly truncated) surveys still cover as the
// merged result, then reinstall the deferred tail, if any, as a cursor.
let mut out_batch = stash.pop().unwrap_or_default();
write_from_surveys(&updates0, &updates1, &[Report::Both(0,0)], &key_survey, &val_survey, &time_survey, &mut out_batch);
builder.push(out_batch);
match next_cursor {
Some(Ok(kvt)) => { *batch1 = Some((kvt, updates0)); }
Some(Err(kvt)) => {*batch2 = Some((kvt, updates1)); }
None => { }
}
}
}
// Writes one merged batch from the per-layer surveys. Each layer's survey is
// the "list" survey for the layer below: key reports delimit value runs,
// value reports delimit time runs, and time reports drive the diff merge.
#[inline(never)]
fn write_from_surveys<U: Update>(
updates0: &Updates<U>,
updates1: &Updates<U>,
root_survey: &[Report],
key_survey: &[Report],
val_survey: &[Report],
time_survey: &[Report],
output: &mut Updates<U>,
) {
use columnar::Borrow;
write_layer(updates0.keys.borrow(), updates1.keys.borrow(), root_survey, key_survey, &mut output.keys);
write_layer(updates0.vals.borrow(), updates1.vals.borrow(), key_survey, val_survey, &mut output.vals);
write_layer(updates0.times.borrow(), updates1.times.borrow(), val_survey, time_survey, &mut output.times);
write_diffs::<U>(updates0.diffs.borrow(), updates1.diffs.borrow(), time_survey, &mut output.diffs);
}
// Computes a survey of one layer from its parent's reports. Parent
// `This`/`That` runs translate directly through the bounds (no comparisons).
// For each parent `Both`, the two child lists are merged by comparison:
// equal elements yield `Both(i, j)` index pairs, and maximal single-source
// runs (extended with `gallop`) yield `This`/`That` half-open ranges.
#[inline(never)]
pub fn survey<'a, C: columnar::Container<Ref<'a>: Ord>>(
lists0: <crate::updates::Lists<C> as columnar::Borrow>::Borrowed<'a>,
lists1: <crate::updates::Lists<C> as columnar::Borrow>::Borrowed<'a>,
reports: &[Report],
) -> Vec<Report> {
use columnar::Index;
let mut output = Vec::with_capacity(reports.len()); for report in reports.iter() {
match report {
Report::This(lower0, upper0) => {
// A run of whole lists maps to the run of their elements: from
// the start of the first list to the end of the last.
let (new_lower, _) = lists0.bounds.bounds(*lower0);
let (_, new_upper) = lists0.bounds.bounds(*upper0-1);
output.push(Report::This(new_lower, new_upper));
}
Report::Both(index0, index1) => {
let (mut lower0, upper0) = lists0.bounds.bounds(*index0);
let (mut lower1, upper1) = lists1.bounds.bounds(*index1);
while lower0 < upper0 && lower1 < upper1 {
let val0 = lists0.values.get(lower0);
let val1 = lists1.values.get(lower1);
match val0.cmp(&val1) {
std::cmp::Ordering::Less => {
// Extend the run of elements strictly below `val1`.
let start = lower0;
lower0 += 1;
gallop(lists0.values, &mut lower0, upper0, |x| x < val1);
output.push(Report::This(start, lower0));
},
std::cmp::Ordering::Equal => {
output.push(Report::Both(lower0, lower1));
lower0 += 1;
lower1 += 1;
},
std::cmp::Ordering::Greater => {
// Symmetric: run of second-list elements below `val0`.
let start = lower1;
lower1 += 1;
gallop(lists1.values, &mut lower1, upper1, |x| x < val0);
output.push(Report::That(start, lower1));
},
}
}
// At most one side still has elements; emit its tail as one run.
if lower0 < upper0 { output.push(Report::This(lower0, upper0)); }
if lower1 < upper1 { output.push(Report::That(lower1, upper1)); }
}
Report::That(lower1, upper1) => {
let (new_lower, _) = lists1.bounds.bounds(*lower1);
let (_, new_upper) = lists1.bounds.bounds(*upper1-1);
output.push(Report::That(new_lower, new_upper));
}
}
}
output
}
// Rebuilds one trie layer of the output from its parent (`list_survey`) and
// child (`item_survey`) surveys, appending values and list bounds. Item
// reports are consumed in order, one (or, for `Both`, several) per parent
// report.
#[inline(never)]
pub fn write_layer<'a, C: columnar::Container<Ref<'a>: Ord>>(
lists0: <crate::updates::Lists<C> as columnar::Borrow>::Borrowed<'a>,
lists1: <crate::updates::Lists<C> as columnar::Borrow>::Borrowed<'a>,
list_survey: &[Report],
item_survey: &[Report],
output: &mut crate::updates::Lists<C>,
) {
use columnar::{Container, Index, Len, Push};
let mut item_idx = 0;
for (pos, list_report) in list_survey.iter().enumerate() {
// Only the first and last parent reports can have been trimmed by a
// resume cursor (see `merge_batch`), so only they need bounds
// recomputed from the clamped item range; interior single-source runs
// can be copied wholesale, bounds included.
let is_first = pos == 0;
let is_last = pos == list_survey.len() - 1;
let may_be_pruned = is_first || is_last;
match list_report {
Report::This(lo, hi) => {
let Report::This(item_lo, item_hi) = item_survey[item_idx] else { unreachable!("Expected This in item survey for This list") };
item_idx += 1;
if may_be_pruned {
let base = output.values.len();
output.values.extend_from_self(lists0.values, item_lo..item_hi);
// Re-derive each list's bound by clamping its natural end
// into the surveyed item range, rebased onto the output.
for i in *lo..*hi {
let (_, nat_hi) = lists0.bounds.bounds(i);
output.bounds.push((base + nat_hi.min(item_hi) - item_lo) as u64);
}
} else {
output.extend_from_self(lists0, *lo..*hi);
}
}
Report::That(lo, hi) => {
let Report::That(item_lo, item_hi) = item_survey[item_idx] else { unreachable!("Expected That in item survey for That list") };
item_idx += 1;
if may_be_pruned {
let base = output.values.len();
output.values.extend_from_self(lists1.values, item_lo..item_hi);
for i in *lo..*hi {
let (_, nat_hi) = lists1.bounds.bounds(i);
output.bounds.push((base + nat_hi.min(item_hi) - item_lo) as u64);
}
} else {
output.extend_from_self(lists1, *lo..*hi);
}
}
Report::Both(i0, i1) => {
// Two matched parent lists merge into one output list: consume
// item reports (which interleave elements of both children)
// until a report falls outside either child's range, then close
// the merged list with a single bound.
let (mut c0, end0) = lists0.bounds.bounds(*i0);
let (mut c1, end1) = lists1.bounds.bounds(*i1);
while (c0 < end0 || c1 < end1) && item_idx < item_survey.len() {
match item_survey[item_idx] {
Report::This(lo, hi) => {
if lo >= end0 { break; }
output.values.extend_from_self(lists0.values, lo..hi);
c0 = hi;
}
Report::That(lo, hi) => {
if lo >= end1 { break; }
output.values.extend_from_self(lists1.values, lo..hi);
c1 = hi;
}
Report::Both(v0, v1) => {
if v0 >= end0 && v1 >= end1 { break; }
// Matched elements are equal; emit the first copy.
output.values.push(lists0.values.get(v0));
c0 = v0 + 1;
c1 = v1 + 1;
}
}
item_idx += 1;
}
output.bounds.push(output.values.len() as u64);
}
}
}
}
// Writes the diff layer from the time survey. Single-source time runs copy
// their diff lists verbatim (bounds included); a matched time pair sums its
// two diffs, drops the sum when it is zero, and closes the (possibly empty)
// output list.
#[inline(never)]
pub fn write_diffs<U: crate::layout::ColumnarUpdate>(
diffs0: <crate::updates::Lists<columnar::ContainerOf<U::Diff>> as columnar::Borrow>::Borrowed<'_>,
diffs1: <crate::updates::Lists<columnar::ContainerOf<U::Diff>> as columnar::Borrow>::Borrowed<'_>,
time_survey: &[Report],
output: &mut crate::updates::Lists<columnar::ContainerOf<U::Diff>>,
) {
use columnar::{Columnar, Container, Index, Len, Push};
use differential_dataflow::difference::{Semigroup, IsZero};
for report in time_survey.iter() {
match report {
Report::This(lo, hi) => { output.extend_from_self(diffs0, *lo..*hi); }
Report::That(lo, hi) => { output.extend_from_self(diffs1, *lo..*hi); }
Report::Both(t0, t1) => {
let (d0_lo, d0_hi) = diffs0.bounds.bounds(*t0);
let (d1_lo, d1_hi) = diffs1.bounds.bounds(*t1);
// Consolidated inputs carry exactly one diff per time.
assert_eq!(d0_hi - d0_lo, 1, "Expected singleton diff list at t0={t0}");
assert_eq!(d1_hi - d1_lo, 1, "Expected singleton diff list at t1={t1}");
let mut diff: U::Diff = Columnar::into_owned(diffs0.values.get(d0_lo));
diff.plus_equals(&Columnar::into_owned(diffs1.values.get(d1_lo)));
if !diff.is_zero() { output.values.push(&diff); }
output.bounds.push(output.values.len() as u64);
}
}
}
}
// Advances `*lower` to the first index in `*lower..upper` at which `cmp`
// fails (or to `upper` if none), assuming `cmp` is monotone (true then
// false) over the range. Exponential probing followed by binary back-off;
// a no-op unless `cmp` holds at the starting index.
#[inline(always)]
pub(crate) fn gallop<C: columnar::Index>(input: C, lower: &mut usize, upper: usize, mut cmp: impl FnMut(<C as columnar::Index>::Ref) -> bool) {
if *lower < upper && cmp(input.get(*lower)) {
// Double the step while it stays in bounds and `cmp` still holds.
let mut step = 1;
while *lower + step < upper && cmp(input.get(*lower + step)) {
*lower += step;
step <<= 1;
}
// Halving phase: afterwards `*lower` is the last index satisfying `cmp`.
step >>= 1;
while step > 0 {
if *lower + step < upper && cmp(input.get(*lower + step)) {
*lower += step;
}
step >>= 1;
}
// Step past the last satisfying index.
*lower += 1;
}
}
// Survey report for one region of two sorted layers: a half-open run taken
// wholly from the first input (`This`), wholly from the second (`That`), or
// a matched pair of indices, one into each input (`Both`). At the root,
// `Both(0, 0)` pairs the two whole batches.
#[derive(Copy, Clone, Columnar, Debug)]
pub enum Report {
This(usize, usize),
That(usize, usize),
Both(usize, usize),
}
// Accumulates merged batches into a chain, melding small neighbors together
// to keep the chain short.
pub struct ChainBuilder<U: crate::layout::ColumnarUpdate> {
updates: Vec<Updates<U>>,
}
impl<U: crate::layout::ColumnarUpdate> Default for ChainBuilder<U> { fn default() -> Self { Self { updates: Default::default() } } }
impl<U: crate::layout::ColumnarUpdate> ChainBuilder<U> {
    /// Appends a batch to the chain after dropping zero diffs; when the
    /// combined length with the current tail stays below the limit, the two
    /// are melded into a single batch instead of growing the chain.
    fn push(&mut self, link: Updates<U>) {
        const MELD_LIMIT: usize = 2 * 64 * 1024;
        let link = link.filter_zero();
        if link.len() == 0 {
            return;
        }
        match self.updates.last_mut() {
            Some(tail) if tail.len() + link.len() < MELD_LIMIT => {
                let mut melder = crate::updates::UpdatesBuilder::new_from(std::mem::take(tail));
                melder.meld(&link);
                *tail = melder.done();
            }
            _ => self.updates.push(link),
        }
    }
    /// Routes every batch from `iter` through the melding `push` above.
    fn extend(&mut self, iter: impl IntoIterator<Item = Updates<U>>) {
        for link in iter {
            self.push(link);
        }
    }
    /// Yields the finished chain.
    fn done(self) -> Vec<Updates<U>> {
        self.updates
    }
}
}
use builder::ValMirror;
pub mod builder {
use differential_dataflow::trace::implementations::ord_neu::{Vals, Upds};
use differential_dataflow::trace::implementations::ord_neu::val_batch::{OrdValBatch, OrdValStorage};
use differential_dataflow::trace::Description;
use crate::Updates;
use crate::layout::ColumnarUpdate as Update;
use crate::layout::ColumnarLayout as Layout;
use crate::arrangement::Coltainer;
use columnar::{Borrow, IndexAs};
use columnar::primitive::offsets::Strides;
use differential_dataflow::trace::implementations::OffsetList;
/// Converts the first `count` stride bounds into DD's `OffsetList`
/// representation, prefixed with a leading zero offset.
fn strides_to_offset_list(bounds: &Strides, count: usize) -> OffsetList {
    let mut offsets = OffsetList::with_capacity(count);
    offsets.push(0);
    let borrowed = bounds.borrow();
    (0..count).for_each(|i| offsets.push(borrowed.index_as(i) as usize));
    offsets
}
// Batch builder that accumulates `Updates` chunks and seals them into an
// `OrdValBatch` by mirroring the columnar storage into batch containers.
pub struct ValMirror<U: Update> {
chunks: Vec<Updates<U>>,
}
impl<U: Update> differential_dataflow::trace::Builder for ValMirror<U> {
type Time = U::Time;
type Input = Updates<U>;
type Output = OrdValBatch<Layout<U>>;
// Capacity hints are ignored; chunks are stored as pushed.
fn with_capacity(_keys: usize, _vals: usize, _upds: usize) -> Self {
Self { chunks: Vec::new() }
}
// Takes ownership of a non-empty chunk, leaving the caller's empty.
fn push(&mut self, chunk: &mut Self::Input) {
if chunk.len() > 0 {
self.chunks.push(std::mem::take(chunk));
}
}
fn done(self, description: Description<Self::Time>) -> Self::Output {
let mut chain = self.chunks;
Self::seal(&mut chain, description)
}
// Melds all chunks into a single `Updates`, then moves its columns into
// `OrdValBatch` storage, converting stride bounds into offset lists.
fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output {
use columnar::Len;
use columnar::{Borrow, Container};
// Pre-size each column for the combined chunks before melding.
let mut updates = Updates::<U>::default();
updates.keys.reserve_for(chain.iter().map(|c| c.keys.borrow()));
updates.vals.reserve_for(chain.iter().map(|c| c.vals.borrow()));
updates.times.reserve_for(chain.iter().map(|c| c.times.borrow()));
updates.diffs.reserve_for(chain.iter().map(|c| c.diffs.borrow()));
let mut builder = crate::updates::UpdatesBuilder::new_from(updates);
for chunk in chain.iter() {
builder.meld(chunk);
}
let merged = builder.done();
chain.clear();
// Total update tuples, read off the innermost (diff) column.
let updates = Len::len(&merged.diffs.values);
if updates == 0 {
// Empty batch: default (empty) storage.
let storage = OrdValStorage {
keys: Default::default(),
vals: Default::default(),
upds: Default::default(),
};
OrdValBatch { storage, description, updates: 0 }
} else {
// Offsets: vals are partitioned per key, times/diffs per value.
// NOTE(review): the key-level bounds (`merged.keys.bounds`) are
// dropped here -- presumably a sealed batch forms a single key
// list; confirm against `UpdatesBuilder::done`.
let val_offs = strides_to_offset_list(&merged.vals.bounds, Len::len(&merged.keys.values));
let time_offs = strides_to_offset_list(&merged.times.bounds, Len::len(&merged.vals.values));
let storage = OrdValStorage {
keys: Coltainer { container: merged.keys.values },
vals: Vals {
offs: val_offs,
vals: Coltainer { container: merged.vals.values },
},
upds: Upds {
offs: time_offs,
times: Coltainer { container: merged.times.values },
diffs: Coltainer { container: merged.diffs.values },
},
};
OrdValBatch { storage, description, updates }
}
}
}
}
}
pub mod updates {
use columnar::{Columnar, Container, ContainerOf, Vecs, Borrow, Index, IndexAs, Len, Push};
use columnar::primitive::offsets::Strides;
use differential_dataflow::difference::{Semigroup, IsZero};
use crate::layout::ColumnarUpdate as Update;
pub type Lists<C> = Vecs<C, Strides>;
/// Columnar storage for a sorted batch of `(key, val, time, diff)` updates.
///
/// Each field is a list-of-lists (`Vecs` with `Strides` bounds). `keys` holds
/// runs of keys; `vals` holds one list of values per key; `times` one list of
/// times per value; and `diffs` one list of diffs per time (`form` produces
/// singleton diff lists).
pub struct Updates<U: Update> {
// Runs of keys; `form` produces a single run, `Push` one run per tuple.
pub keys: Lists<ContainerOf<U::Key>>,
// For each key, its values.
pub vals: Lists<ContainerOf<U::Val>>,
// For each value, its times.
pub times: Lists<ContainerOf<U::Time>>,
// For each time, its accumulated diff(s).
pub diffs: Lists<ContainerOf<U::Diff>>,
}
impl<U: Update> Default for Updates<U> {
fn default() -> Self {
Self {
keys: Default::default(),
vals: Default::default(),
times: Default::default(),
diffs: Default::default(),
}
}
}
/// Opaque `Debug` rendering; the columnar contents are omitted.
impl<U: Update> std::fmt::Debug for Updates<U> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Identical output to `debug_struct("Updates").finish()` for a
        // field-less struct: just the name.
        f.write_str("Updates")
    }
}
/// Deep-copies all four columns.
impl<U: Update> Clone for Updates<U> {
    fn clone(&self) -> Self {
        let keys = self.keys.clone();
        let vals = self.vals.clone();
        let times = self.times.clone();
        let diffs = self.diffs.clone();
        Self { keys, vals, times, diffs }
    }
}
pub type Tuple<U> = (<U as Update>::Key, <U as Update>::Val, <U as Update>::Time, <U as Update>::Diff);
/// The sub-range of the next layer owned by item `i`, where `bounds` records
/// the cumulative end offset of each item.
#[inline]
pub fn child_range<B: IndexAs<u64>>(bounds: B, i: usize) -> std::ops::Range<usize> {
    let upper = bounds.index_as(i) as usize;
    match i {
        // The first item starts at offset zero.
        0 => 0..upper,
        // Later items start where their predecessor ended.
        _ => (bounds.index_as(i - 1) as usize)..upper,
    }
}
/// Iterator adapter that sums the diffs of consecutive equal
/// `(key, val, time)` triples and suppresses triples whose accumulated diff
/// is zero.
///
/// Equal triples must be adjacent in the underlying iterator (i.e. the input
/// should be sorted) for consolidation to be complete.
pub struct Consolidating<I: Iterator, D> {
// Peekable so we can see whether the next tuple continues the current run.
iter: std::iter::Peekable<I>,
// Accumulator for the current run's diff.
diff: D,
}
impl<K, V, T, D, I> Consolidating<I, D>
where
    K: Copy + Eq,
    V: Copy + Eq,
    T: Copy + Eq,
    D: Semigroup + IsZero + Default,
    I: Iterator<Item = (K, V, T, D)>,
{
    /// Wraps `iter` in a consolidating adapter; equal `(key, val, time)`
    /// triples should be adjacent in `iter` for complete consolidation.
    pub fn new(iter: I) -> Self {
        let iter = iter.peekable();
        let diff = D::default();
        Self { iter, diff }
    }
}
impl<K, V, T, D, I> Iterator for Consolidating<I, D>
where
K: Copy + Eq,
V: Copy + Eq,
T: Copy + Eq,
D: Semigroup + IsZero + Default + Clone,
I: Iterator<Item = (K, V, T, D)>,
{
type Item = (K, V, T, D);
// Yields the next `(key, val, time, diff)` whose accumulated diff is non-zero.
fn next(&mut self) -> Option<Self::Item> {
loop {
// Begin a new run with the next tuple; an exhausted input ends iteration.
let (k, v, t, d) = self.iter.next()?;
self.diff = d;
// Fold in every immediately following tuple with the same (key, val, time).
while let Some(&(k2, v2, t2, _)) = self.iter.peek() {
if k2 == k && v2 == v && t2 == t {
let (_, _, _, d2) = self.iter.next().unwrap();
self.diff.plus_equals(&d2);
} else {
break;
}
}
// Runs that cancel to zero are dropped; loop around to the next run.
if !self.diff.is_zero() {
return Some((k, v, t, self.diff.clone()));
}
}
}
}
impl<U: Update> Updates<U> {
pub fn vals_bounds(&self, key_range: std::ops::Range<usize>) -> std::ops::Range<usize> {
if !key_range.is_empty() {
let bounds = self.vals.bounds.borrow();
let lower = if key_range.start == 0 { 0 } else { bounds.index_as(key_range.start - 1) as usize };
let upper = bounds.index_as(key_range.end - 1) as usize;
lower..upper
} else { key_range }
}
pub fn times_bounds(&self, val_range: std::ops::Range<usize>) -> std::ops::Range<usize> {
if !val_range.is_empty() {
let bounds = self.times.bounds.borrow();
let lower = if val_range.start == 0 { 0 } else { bounds.index_as(val_range.start - 1) as usize };
let upper = bounds.index_as(val_range.end - 1) as usize;
lower..upper
} else { val_range }
}
pub fn diffs_bounds(&self, time_range: std::ops::Range<usize>) -> std::ops::Range<usize> {
if !time_range.is_empty() {
let bounds = self.diffs.bounds.borrow();
let lower = if time_range.start == 0 { 0 } else { bounds.index_as(time_range.start - 1) as usize };
let upper = bounds.index_as(time_range.end - 1) as usize;
lower..upper
} else { time_range }
}
pub fn extend_from_keys(&mut self, other: &Self, key_range: std::ops::Range<usize>) {
self.keys.values.extend_from_self(other.keys.values.borrow(), key_range.clone());
self.vals.extend_from_self(other.vals.borrow(), key_range.clone());
let val_range = other.vals_bounds(key_range);
self.times.extend_from_self(other.times.borrow(), val_range.clone());
let time_range = other.times_bounds(val_range);
self.diffs.extend_from_self(other.diffs.borrow(), time_range);
}
pub fn extend_from_vals(&mut self, other: &Self, val_range: std::ops::Range<usize>) {
self.vals.values.extend_from_self(other.vals.values.borrow(), val_range.clone());
self.times.extend_from_self(other.times.borrow(), val_range.clone());
let time_range = other.times_bounds(val_range);
self.diffs.extend_from_self(other.diffs.borrow(), time_range);
}
pub fn form_unsorted<'a>(unsorted: impl Iterator<Item = columnar::Ref<'a, Tuple<U>>>) -> Self {
let mut data = unsorted.collect::<Vec<_>>();
data.sort();
Self::form(data.into_iter())
}
pub fn form<'a>(sorted: impl Iterator<Item = columnar::Ref<'a, Tuple<U>>>) -> Self {
let consolidated = Consolidating::new(
sorted.map(|(k, v, t, d)| (k, v, t, <U::Diff as Columnar>::into_owned(d)))
);
let mut output = Self::default();
let mut updates = consolidated;
if let Some((key, val, time, diff)) = updates.next() {
let mut prev = (key, val, time);
output.keys.values.push(key);
output.vals.values.push(val);
output.times.values.push(time);
output.diffs.values.push(&diff);
output.diffs.bounds.push(output.diffs.values.len() as u64);
for (key, val, time, diff) in updates {
if key != prev.0 {
output.vals.bounds.push(output.vals.values.len() as u64);
output.times.bounds.push(output.times.values.len() as u64);
output.keys.values.push(key);
output.vals.values.push(val);
}
else if val != prev.1 {
output.times.bounds.push(output.times.values.len() as u64);
output.vals.values.push(val);
}
else {
assert!(time != prev.2);
}
output.times.values.push(time);
output.diffs.values.push(&diff);
output.diffs.bounds.push(output.diffs.values.len() as u64);
prev = (key, val, time);
}
output.keys.bounds.push(output.keys.values.len() as u64);
output.vals.bounds.push(output.vals.values.len() as u64);
output.times.bounds.push(output.times.values.len() as u64);
}
output
}
pub fn consolidate(self) -> Self { Self::form_unsorted(self.iter()) }
pub fn filter_zero(self) -> Self { Self::form(self.iter()) }
pub fn len(&self) -> usize { self.diffs.values.len() }
}
/// Pushes a single update tuple, closing a bound after every value so each
/// pushed tuple forms its own singleton entry in every layer.
impl<KP, VP, TP, DP, U: Update> Push<(KP, VP, TP, DP)> for Updates<U>
where
    ContainerOf<U::Key>: Push<KP>,
    ContainerOf<U::Val>: Push<VP>,
    ContainerOf<U::Time>: Push<TP>,
    ContainerOf<U::Diff>: Push<DP>,
{
    fn push(&mut self, (key, val, time, diff): (KP, VP, TP, DP)) {
        let Self { keys, vals, times, diffs } = self;
        keys.values.push(key);
        keys.bounds.push(keys.values.len() as u64);
        vals.values.push(val);
        vals.bounds.push(vals.values.len() as u64);
        times.values.push(time);
        times.bounds.push(times.values.len() as u64);
        diffs.values.push(diff);
        diffs.bounds.push(diffs.values.len() as u64);
    }
}
/// Accepts `((key, val), time, diff)` tuples as produced by differential
/// operators, forwarding them by reference to the columnar `Push` impl.
impl<U: Update> timely::container::PushInto<((U::Key, U::Val), U::Time, U::Diff)> for Updates<U> {
    fn push_into(&mut self, update: ((U::Key, U::Val), U::Time, U::Diff)) {
        let ((key, val), time, diff) = update;
        self.push((&key, &val, &time, &diff));
    }
}
impl<U: Update> Updates<U> {
/// Enumerates all `(key, val, time, diff)` tuples as borrowed references in
/// storage order, by walking the nested key -> val -> time -> diff ranges.
pub fn iter(&self) -> impl Iterator<Item = (
columnar::Ref<'_, U::Key>,
columnar::Ref<'_, U::Val>,
columnar::Ref<'_, U::Time>,
columnar::Ref<'_, U::Diff>,
)> {
// Borrowed views of each column (captured by the `move` closures below,
// which requires them to be `Copy`).
let keys_b = self.keys.borrow();
let vals_b = self.vals.borrow();
let times_b = self.times.borrow();
let diffs_b = self.diffs.borrow();
// Outer key runs -> key indices -> value indices -> time indices -> diffs.
(0..Len::len(&keys_b))
.flat_map(move |outer| child_range(keys_b.bounds, outer))
.flat_map(move |k| {
let key = keys_b.values.get(k);
child_range(vals_b.bounds, k).map(move |v| (key, v))
})
.flat_map(move |(key, v)| {
let val = vals_b.values.get(v);
child_range(times_b.bounds, v).map(move |t| (key, val, t))
})
.flat_map(move |(key, val, t)| {
let time = times_b.values.get(t);
child_range(diffs_b.bounds, t).map(move |d| (key, val, time, diffs_b.values.get(d)))
})
}
}
/// Progress accounting for timely: the record count is the number of stored
/// update tuples.
impl<U: Update> timely::Accountable for Updates<U> {
    #[inline]
    fn record_count(&self) -> i64 {
        // `len()` is the diff count, identical to `Len::len(&self.diffs.values)`.
        self.len() as i64
    }
}
/// Byte-serialization hooks for timely communication channels.
///
/// NOTE(review): all three methods are unimplemented and panic if invoked, so
/// these containers presumably only ever move within a process — confirm
/// before routing them over serializing (network) channels.
impl<U: Update> timely::dataflow::channels::ContainerBytes for Updates<U> {
fn from_bytes(_bytes: timely::bytes::arc::Bytes) -> Self { unimplemented!() }
fn length_in_bytes(&self) -> usize { unimplemented!() }
fn into_bytes<W: std::io::Write>(&self, _writer: &mut W) { unimplemented!() }
}
/// Incrementally melds sorted `Updates` chunks into one `Updates`.
///
/// While building, the trailing key/val/time bounds are kept "open" (popped
/// by `new_from`, and by `meld` for an adopted first chunk) so a later chunk
/// can extend the final groups; `done` pushes them back.
pub struct UpdatesBuilder<U: Update> {
// The accumulated updates, with trailing bounds absent until `done`.
updates: Updates<U>,
}
impl<U: Update> UpdatesBuilder<U> {
// Re-opens a finished `Updates` for melding by popping the trailing key, val,
// and time bounds (which `form` pushed); `done` will restore them.
pub fn new_from(mut updates: Updates<U>) -> Self {
use columnar::Len;
if Len::len(&updates.keys.values) > 0 {
updates.keys.bounds.pop();
updates.vals.bounds.pop();
updates.times.bounds.pop();
}
Self { updates }
}
// Appends `chunk`, fusing its leading key (and value) group with the open
// trailing group of the accumulated updates when the boundary key (and value)
// are equal. `chunk` should sort at or after the accumulated contents; equal
// boundary (key, val) pairs must still carry distinct times (debug-asserted).
pub fn meld(&mut self, chunk: &Updates<U>) {
use columnar::{Borrow, Index, Len};
if chunk.len() == 0 { return; }
// First contribution: adopt the chunk wholesale and re-open its bounds.
if Len::len(&self.updates.keys.values) == 0 {
self.updates = chunk.clone();
self.updates.keys.bounds.pop();
self.updates.vals.bounds.pop();
self.updates.times.bounds.pop();
return;
}
// Does the chunk's first key continue our trailing key?
let keys_match = {
let skb = self.updates.keys.values.borrow();
let ckb = chunk.keys.values.borrow();
skb.get(Len::len(&skb) - 1) == ckb.get(0)
};
// ... and if so, does its first value continue our trailing value?
let vals_match = keys_match && {
let svb = self.updates.vals.values.borrow();
let cvb = chunk.vals.values.borrow();
svb.get(Len::len(&svb) - 1) == cvb.get(0)
};
let chunk_num_keys = Len::len(&chunk.keys.values);
let chunk_num_vals = Len::len(&chunk.vals.values);
let chunk_num_times = Len::len(&chunk.times.values);
// Child ranges for the chunk's first key (its values) and first value (its times).
let first_key_vals = child_range(chunk.vals.borrow().bounds, 0);
let first_val_times = child_range(chunk.times.borrow().bounds, 0);
// `differ` becomes true once our trailing group and the chunk's current
// group are known distinct; from then on whole sublists can be copied.
let mut differ = false;
if keys_match {
// Skip the duplicated boundary key.
if chunk_num_keys > 1 {
self.updates.keys.values.extend_from_self(chunk.keys.values.borrow(), 1..chunk_num_keys);
}
} else {
self.updates.keys.values.extend_from_self(chunk.keys.values.borrow(), 0..chunk_num_keys);
differ = true;
}
if differ {
// Distinct keys: temporarily close our open trailing value list so the
// per-key sublists copy whole, then re-open the last copied list.
self.updates.vals.bounds.push(Len::len(&self.updates.vals.values) as u64);
self.updates.vals.extend_from_self(chunk.vals.borrow(), 0..chunk_num_keys);
self.updates.vals.bounds.pop();
} else {
if vals_match {
// Skip the duplicated boundary value.
if first_key_vals.len() > 1 {
self.updates.vals.values.extend_from_self(
chunk.vals.values.borrow(),
(first_key_vals.start + 1)..first_key_vals.end,
);
}
} else {
self.updates.vals.values.extend_from_self(
chunk.vals.values.borrow(),
first_key_vals.clone(),
);
differ = true;
}
// Remaining keys' value lists copy whole (close, copy, re-open).
if chunk_num_keys > 1 {
self.updates.vals.bounds.push(Len::len(&self.updates.vals.values) as u64);
self.updates.vals.extend_from_self(chunk.vals.borrow(), 1..chunk_num_keys);
self.updates.vals.bounds.pop();
}
}
if differ {
// Distinct (key, val): all per-value time lists copy whole.
self.updates.times.bounds.push(Len::len(&self.updates.times.values) as u64);
self.updates.times.extend_from_self(chunk.times.borrow(), 0..chunk_num_vals);
self.updates.times.bounds.pop();
} else {
// Same (key, val) across the boundary: times extend the open list, and
// must not repeat the trailing time.
debug_assert!({
let stb = self.updates.times.values.borrow();
let ctb = chunk.times.values.borrow();
stb.get(Len::len(&stb) - 1) != ctb.get(0)
}, "meld: duplicate time within same (key, val)");
self.updates.times.values.extend_from_self(
chunk.times.values.borrow(),
first_val_times.clone(),
);
differ = true;
if chunk_num_vals > 1 {
self.updates.times.bounds.push(Len::len(&self.updates.times.values) as u64);
self.updates.times.extend_from_self(chunk.times.borrow(), 1..chunk_num_vals);
self.updates.times.bounds.pop();
}
}
debug_assert!(differ);
// Diff lists are per-time; all copy unconditionally.
self.updates.diffs.extend_from_self(chunk.diffs.borrow(), 0..chunk_num_times);
}
// Closes the trailing time, val, and key groups and returns the updates.
pub fn done(mut self) -> Updates<U> {
use columnar::Len;
if Len::len(&self.updates.keys.values) > 0 {
self.updates.times.bounds.push(Len::len(&self.updates.times.values) as u64);
self.updates.vals.bounds.push(Len::len(&self.updates.vals.values) as u64);
self.updates.keys.bounds.push(Len::len(&self.updates.keys.values) as u64);
}
self.updates
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use columnar::Push;
    type TestUpdate = (u64, u64, u64, i64);
    /// Builds an `Updates` by pushing each tuple in the given order.
    fn build(tuples: &[(u64, u64, u64, i64)]) -> Updates<TestUpdate> {
        let mut updates = Updates::<TestUpdate>::default();
        for (k, v, t, d) in tuples {
            updates.push((k, v, t, d));
        }
        updates
    }
    /// Flattens an `Updates` back into owned tuples for comparison.
    fn collect(updates: &Updates<TestUpdate>) -> Vec<(u64, u64, u64, i64)> {
        updates.iter().map(|(k, v, t, d)| (*k, *v, *t, *d)).collect()
    }
    #[test]
    fn test_push_and_consolidate_basic() {
        let updates = build(&[(1, 10, 100, 1), (1, 10, 100, 2), (2, 20, 200, 5)]);
        assert_eq!(updates.len(), 3);
        assert_eq!(collect(&updates.consolidate()), vec![(1, 10, 100, 3), (2, 20, 200, 5)]);
    }
    #[test]
    fn test_cancellation() {
        let updates = build(&[(1, 10, 100, 3), (1, 10, 100, -3), (2, 20, 200, 1)]);
        assert_eq!(collect(&updates.consolidate()), vec![(2, 20, 200, 1)]);
    }
    #[test]
    fn test_multiple_vals_and_times() {
        let updates = build(&[(1, 10, 100, 1), (1, 10, 200, 2), (1, 20, 100, 3), (1, 20, 100, 4)]);
        assert_eq!(collect(&updates.consolidate()), vec![(1, 10, 100, 1), (1, 10, 200, 2), (1, 20, 100, 7)]);
    }
    #[test]
    fn test_val_cancellation_propagates() {
        let updates = build(&[(1, 10, 100, 5), (1, 10, 100, -5), (1, 20, 100, 1)]);
        assert_eq!(collect(&updates.consolidate()), vec![(1, 20, 100, 1)]);
    }
    #[test]
    fn test_empty() {
        let updates = build(&[]);
        assert_eq!(collect(&updates.consolidate()), vec![]);
    }
    #[test]
    fn test_total_cancellation() {
        let updates = build(&[(1, 10, 100, 1), (1, 10, 100, -1)]);
        assert_eq!(collect(&updates.consolidate()), vec![]);
    }
    #[test]
    fn test_unsorted_input() {
        let updates = build(&[(3, 30, 300, 1), (1, 10, 100, 2), (2, 20, 200, 3)]);
        assert_eq!(collect(&updates.consolidate()), vec![(1, 10, 100, 2), (2, 20, 200, 3), (3, 30, 300, 1)]);
    }
    #[test]
    fn test_first_key_cancels() {
        let updates = build(&[(1, 10, 100, 5), (1, 10, 100, -5), (2, 20, 200, 3)]);
        assert_eq!(collect(&updates.consolidate()), vec![(2, 20, 200, 3)]);
    }
    #[test]
    fn test_middle_time_cancels() {
        let updates = build(&[(1, 10, 100, 1), (1, 10, 200, 2), (1, 10, 200, -2), (1, 10, 300, 3)]);
        assert_eq!(collect(&updates.consolidate()), vec![(1, 10, 100, 1), (1, 10, 300, 3)]);
    }
    #[test]
    fn test_first_val_cancels() {
        let updates = build(&[(1, 10, 100, 1), (1, 10, 100, -1), (1, 20, 100, 5)]);
        assert_eq!(collect(&updates.consolidate()), vec![(1, 20, 100, 5)]);
    }
    #[test]
    fn test_interleaved_cancellations() {
        let updates = build(&[(1, 10, 100, 1), (1, 10, 100, -1), (2, 20, 200, 7), (3, 30, 300, 4), (3, 30, 300, -4)]);
        assert_eq!(collect(&updates.consolidate()), vec![(2, 20, 200, 7)]);
    }
}
}
/// Applies `logic` to every recorded update, producing a new recorded
/// collection with join semantics: for each `(key, val, time, diff)` that
/// `logic` yields, the emitted time is `time.join(input_time)` and the
/// emitted diff is `diff * input_diff`.
pub fn join_function<U, I, L>(
input: differential_dataflow::Collection<U::Time, RecordedUpdates<U>>,
mut logic: L,
) -> differential_dataflow::Collection<U::Time, RecordedUpdates<U>>
where
U::Time: differential_dataflow::lattice::Lattice,
U: layout::ColumnarUpdate<Diff: differential_dataflow::difference::Multiply<U::Diff, Output = U::Diff>>,
I: IntoIterator<Item = (U::Key, U::Val, U::Time, U::Diff)>,
L: FnMut(
columnar::Ref<'_, U::Key>,
columnar::Ref<'_, U::Val>,
columnar::Ref<'_, U::Time>,
columnar::Ref<'_, U::Diff>,
) -> I + 'static,
{
use timely::dataflow::operators::generic::Operator;
use timely::dataflow::channels::pact::Pipeline;
use differential_dataflow::AsCollection;
use differential_dataflow::difference::Multiply;
use differential_dataflow::lattice::Lattice;
use columnar::Columnar;
input
.inner
.unary::<ValColBuilder<U>, _, _, _>(Pipeline, "JoinFunction", move |_, _| {
move |input, output| {
// Owned scratch values for the borrowed input time and diff.
let mut t1o = U::Time::default();
let mut d1o = U::Diff::default();
input.for_each(|time, data| {
let mut session = output.session_with_builder(&time);
for (k1, v1, t1, d1) in data.updates.iter() {
// Materialize owned copies so we can join/multiply against them below.
Columnar::copy_from(&mut t1o, t1);
Columnar::copy_from(&mut d1o, d1);
for (k2, v2, t2, d2) in logic(k1, v1, t1, d1) {
let t3 = t2.join(&t1o);
let d3 = d2.multiply(&d1o);
session.give((&k2, &v2, &t3, &d3));
}
}
});
}
})
.as_collection()
}
type DynTime = timely::order::Product<u64, differential_dataflow::dynamic::pointstamp::PointStamp<u64>>;
/// "Leaves" one level of dynamic scope by truncating the inner `PointStamp`
/// coordinate of every update (and every capability) to its first
/// `level - 1` entries.
///
/// NOTE(review): assumes `level >= 1`; `level - 1` underflows for `level == 0`.
pub fn leave_dynamic<K, V, R>(
input: differential_dataflow::Collection<DynTime, RecordedUpdates<(K, V, DynTime, R)>>,
level: usize,
) -> differential_dataflow::Collection<DynTime, RecordedUpdates<(K, V, DynTime, R)>>
where
K: columnar::Columnar,
V: columnar::Columnar,
R: columnar::Columnar,
(K, V, DynTime, R): layout::ColumnarUpdate<Key = K, Val = V, Time = DynTime, Diff = R>,
{
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use timely::dataflow::operators::generic::OutputBuilder;
use timely::order::Product;
use timely::progress::Antichain;
use timely::container::{ContainerBuilder, PushInto};
use differential_dataflow::AsCollection;
use differential_dataflow::dynamic::pointstamp::{PointStamp, PointStampSummary};
use columnar::Columnar;
let mut builder = OperatorBuilder::new("LeaveDynamic".to_string(), input.inner.scope());
let (output, stream) = builder.new_output();
let mut output = OutputBuilder::from(output);
// Internal summary from input to output: presumably declares that output
// times keep only the first `level - 1` inner coordinates, keeping progress
// tracking sound under the truncation below — confirm against
// `PointStampSummary` semantics.
let mut op_input = builder.new_input_connection(
input.inner,
Pipeline,
[(
0,
Antichain::from_elem(Product {
outer: Default::default(),
inner: PointStampSummary {
retain: Some(level - 1),
actions: Vec::new(),
},
}),
)],
);
builder.build(move |_capability| {
let mut col_builder = ValColBuilder::<(K, V, DynTime, R)>::default();
// Scratch timestamp reused across updates.
let mut time = DynTime::default();
move |_frontier| {
let mut output = output.activate();
op_input.for_each(|cap, data| {
// Truncate the capability's inner pointstamp and delay to the result
// (the `0` is presumably the output port index — confirm).
let mut new_time = cap.time().clone();
let mut vec = std::mem::take(&mut new_time.inner).into_inner();
vec.truncate(level - 1);
new_time.inner = PointStamp::new(vec);
let new_cap = cap.delayed(&new_time, 0);
// Truncate each update's timestamp the same way and re-record it.
for (k, v, t, d) in data.updates.iter() {
Columnar::copy_from(&mut time, t);
let mut inner_vec = std::mem::take(&mut time.inner).into_inner();
inner_vec.truncate(level - 1);
time.inner = PointStamp::new(inner_vec);
col_builder.push_into((k, v, &time, d));
}
// Ship every container the builder has finished under the new capability.
let mut session = output.session(&new_cap);
while let Some(container) = col_builder.finish() {
session.give_container(container);
}
});
}
});
stream.as_collection()
}
/// Replays the batches of an arranged collection as a stream of
/// `RecordedUpdates`, emitting one tuple per `(key, val, time, diff)`.
pub fn as_recorded_updates<U>(
    arranged: differential_dataflow::operators::arrange::Arranged<
        differential_dataflow::operators::arrange::TraceAgent<ValSpine<U::Key, U::Val, U::Time, U::Diff>>,
    >,
) -> differential_dataflow::Collection<U::Time, RecordedUpdates<U>>
where
    U: layout::ColumnarUpdate,
{
    use timely::dataflow::operators::generic::Operator;
    use timely::dataflow::channels::pact::Pipeline;
    use differential_dataflow::trace::{BatchReader, Cursor};
    use differential_dataflow::AsCollection;
    arranged.stream
        .unary::<ValColBuilder<U>, _, _, _>(Pipeline, "AsRecordedUpdates", |_, _| {
            move |input, output| {
                input.for_each(|cap, batches| {
                    let mut session = output.session_with_builder(&cap);
                    for batch in batches.drain(..) {
                        // Walk the batch: keys, then values, then (time, diff) pairs.
                        let mut cursor = batch.cursor();
                        while cursor.key_valid(&batch) {
                            let key = cursor.key(&batch);
                            while cursor.val_valid(&batch) {
                                let val = cursor.val(&batch);
                                cursor.map_times(&batch, |t, r| {
                                    session.give((key, val, t, r));
                                });
                                cursor.step_val(&batch);
                            }
                            cursor.step_key(&batch);
                        }
                    }
                });
            }
        })
        .as_collection()
}