use std::cmp::Ordering;
use timely::{Accountable, ContainerBuilder};
use timely::container::PushInto;
use timely::order::PartialOrder;
use timely::progress::Timestamp;
use timely::dataflow::Stream;
use timely::dataflow::operators::generic::{Operator, OutputBuilderSession, Session};
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::Capability;
use crate::lattice::Lattice;
use crate::operators::arrange::Arranged;
use crate::trace::{BatchReader, Cursor};
use crate::operators::ValueHistory;
use crate::trace::TraceReader;
/// The output session type handed to join logic: a timely `Session` whose container
/// builder is wrapped in [`EffortBuilder`], so that produced records are tallied.
pub type JoinSession<'a, 'b, T, CB, CT> = Session<'a, 'b, T, EffortBuilder<CB>, CT>;
/// A container builder wrapper that keeps a running tally of produced records.
///
/// Field `0` is the tally: it grows by the `record_count()` of every container returned
/// from `extract` or `finish`, and `Deferred::work` reads and resets it with `Cell::take`.
/// Field `1` is the wrapped builder, which receives every pushed item unchanged.
#[derive(Default, Debug)]
pub struct EffortBuilder<CB>(pub std::cell::Cell<usize>, pub CB);
impl<CB: ContainerBuilder> timely::container::ContainerBuilder for EffortBuilder<CB> {
    type Container = CB::Container;

    /// Delegates to the wrapped builder's `extract` and credits the record count of
    /// whatever container it yields (zero when it yields none) to the tally.
    #[inline]
    fn extract(&mut self) -> Option<&mut Self::Container> {
        let container = self.1.extract();
        let produced = match &container {
            Some(c) => c.record_count() as usize,
            None => 0,
        };
        self.0.set(self.0.get() + produced);
        container
    }

    /// Delegates to the wrapped builder's `finish` and credits the record count of
    /// whatever container it yields (zero when it yields none) to the tally.
    #[inline]
    fn finish(&mut self) -> Option<&mut Self::Container> {
        let container = self.1.finish();
        let produced = match &container {
            Some(c) => c.record_count() as usize,
            None => 0,
        };
        self.0.set(self.0.get() + produced);
        container
    }
}
impl<CB, D> PushInto<D> for EffortBuilder<CB>
where
    CB: PushInto<D>,
{
    /// Hands `item` straight to the wrapped builder; the tally is not touched here and is
    /// only updated when containers are extracted or finished.
    #[inline]
    fn push_into(&mut self, item: D) {
        <CB as PushInto<D>>::push_into(&mut self.1, item);
    }
}
/// Joins two arrangements key-by-key, handing every matching pair of updates to `result`.
///
/// A binary operator named "Join" is built over the two arrangements' batch streams. Each
/// non-empty batch arriving on one input is paired with a cursor over the other input's trace
/// and queued as deferred work. On every scheduling each queue is processed under a fuel
/// budget of 1,000,000, charged by the record counts that `EffortBuilder` tallies, and the
/// operator re-activates itself while work remains.
///
/// `result` is called as `result(key, val1, val2, time, diff1, diff2, session)`, with the
/// value and diff of the first arrangement always first, and may push any number of outputs
/// into `session`.
///
/// # Panics
///
/// Panics if either trace's physical compaction frontier is not `less_equal` to the frontier
/// acknowledged at construction, if `cursor_through` returns `None`, or if batches arrive on
/// one input after the other input's trace handle has already been dropped.
pub fn join_traces<'scope, Tr1, Tr2, L, CB>(arranged1: Arranged<'scope, Tr1>, arranged2: Arranged<'scope, Tr2>, mut result: L) -> Stream<'scope, Tr1::Time, CB::Container>
where
Tr1: TraceReader+Clone+'static,
Tr2: for<'a> TraceReader<Key<'a>=Tr1::Key<'a>, Time = Tr1::Time>+Clone+'static,
L: FnMut(Tr1::Key<'_>,Tr1::Val<'_>,Tr2::Val<'_>,&Tr1::Time,&Tr1::Diff,&Tr2::Diff,&mut JoinSession<Tr1::Time, CB, Capability<Tr1::Time>>)+'static,
CB: ContainerBuilder,
{
// Own handles to both traces; they are moved into the operator closure below.
let mut trace1 = arranged1.trace.clone();
let mut trace2 = arranged2.trace.clone();
let scope = arranged1.stream.scope();
arranged1.stream.binary_frontier(arranged2.stream, Pipeline, Pipeline, "Join", move |capability, info| {
// Activator used to reschedule this operator when deferred work is left over.
use timely::scheduling::Activator;
let activations = scope.activations().clone();
let activator = Activator::new(info.address, activations);
use timely::progress::frontier::Antichain;
// Per-input frontier of batches already accounted for; starts at the minimum time.
let mut acknowledged1 = Antichain::from_elem(Tr1::Time::minimum());
let mut acknowledged2 = Antichain::from_elem(Tr1::Time::minimum());
// Queues of deferred work: `todo1` holds input-1 batches paired with trace2 cursors,
// `todo2` holds input-2 batches paired with trace1 cursors.
let mut todo1 = std::collections::VecDeque::new();
let mut todo2 = std::collections::VecDeque::new();
// Walk the batches already in trace1, remembering the upper bound of the last one.
// No work is queued for them here; the trace1 cursors created below go through
// `acknowledged1`, so these batches are presumably covered there.
trace1.map_batches(|batch1| {
acknowledged1.clone_from(batch1.upper());
});
assert!(PartialOrder::less_equal(&trace1.get_physical_compaction(), &acknowledged1.borrow()));
// Collect a cursor and a clone of every batch already in trace2.
// NOTE(review): work is created in a separate loop afterwards, presumably so that
// `trace1` is not accessed from inside `trace2.map_batches` — confirm the reason.
let mut batch2_cursors = Vec::new();
trace2.map_batches(|batch2| {
acknowledged2.clone_from(batch2.upper());
batch2_cursors.push((batch2.cursor(), batch2.clone()));
});
assert!(PartialOrder::less_equal(&trace2.get_physical_compaction(), &acknowledged2.borrow()));
// Queue each pre-existing trace2 batch against a trace1 cursor, under the initial capability.
// NOTE(review): `batch2` is already owned in this loop, so the `.clone()` below looks redundant.
for (batch2_cursor, batch2) in batch2_cursors.into_iter() {
let (trace1_cursor, trace1_storage) = trace1.cursor_through(acknowledged1.borrow()).unwrap();
todo2.push_back(Deferred::new(trace1_cursor, trace1_storage, batch2_cursor, batch2.clone(), capability.clone()));
}
// Wrapped in `Option` so each handle can be dropped once the other input is exhausted.
let mut trace1_option = Some(trace1);
let mut trace2_option = Some(trace2);
move |(input1, frontier1), (input2, frontier2), output| {
// Drain input 1: each new batch is joined against trace2.
input1.for_each(|capability, data| {
if let Some(ref mut trace2) = trace2_option {
// Keep an owned capability for the queued work (the `0` presumably names the output port).
let capability = capability.retain(0);
for batch1 in data.drain(..) {
// Skip batches whose lower bound is not at or beyond what has been acknowledged.
if PartialOrder::less_equal(&acknowledged1, batch1.lower()) {
// Empty batches create no work but still advance the acknowledged frontier.
if !batch1.is_empty() {
let (trace2_cursor, trace2_storage) = trace2.cursor_through(acknowledged2.borrow()).unwrap();
let batch1_cursor = batch1.cursor();
todo1.push_back(Deferred::new(trace2_cursor, trace2_storage, batch1_cursor, batch1.clone(), capability.clone()));
}
debug_assert!(PartialOrder::less_equal(&acknowledged1, batch1.upper()));
acknowledged1.clone_from(batch1.upper());
}
}
}
else { panic!("`trace2_option` dropped before `input1` emptied!"); }
});
// Drain input 2: the mirror image of the above, joining each new batch against trace1.
input2.for_each(|capability, data| {
if let Some(ref mut trace1) = trace1_option {
let capability = capability.retain(0);
for batch2 in data.drain(..) {
if PartialOrder::less_equal(&acknowledged2, batch2.lower()) {
if !batch2.is_empty() {
let (trace1_cursor, trace1_storage) = trace1.cursor_through(acknowledged1.borrow()).unwrap();
let batch2_cursor = batch2.cursor();
todo2.push_back(Deferred::new(trace1_cursor, trace1_storage, batch2_cursor, batch2.clone(), capability.clone()));
}
debug_assert!(PartialOrder::less_equal(&acknowledged2, batch2.upper()));
acknowledged2.clone_from(batch2.upper());
}
}
}
else { panic!("`trace1_option` dropped before `input2` emptied!"); }
});
// Give each live trace the chance to move its acknowledged frontier forward
// (exact semantics of `advance_upper` are defined by the trace implementation).
if let Some(trace1) = trace1_option.as_mut() {
trace1.advance_upper(&mut acknowledged1);
}
if let Some(trace2) = trace2_option.as_mut() {
trace2.advance_upper(&mut acknowledged2);
}
// Work through the input-1 queue until it is empty or the fuel is used up.
let mut fuel = 1_000_000;
while !todo1.is_empty() && fuel > 0 {
todo1.front_mut().unwrap().work(
output,
// Here the trace side is input 2 and the batch side is input 1, so the
// arguments are swapped to present input 1 first to `result`.
|k,v2,v1,t,r2,r1,c| result(k,v1,v2,t,r1,r2,c),
&mut fuel
);
if !todo1.front().unwrap().work_remains() { todo1.pop_front(); }
}
// The input-2 queue gets its own, fresh fuel budget.
let mut fuel = 1_000_000;
while !todo2.is_empty() && fuel > 0 {
todo2.front_mut().unwrap().work(
output,
// Here the trace side is input 1 and the batch side is input 2: already in order.
|k,v1,v2,t,r1,r2,c| result(k,v1,v2,t,r1,r2,c),
&mut fuel
);
if !todo2.front().unwrap().work_remains() { todo2.pop_front(); }
}
// Ask to be scheduled again if either queue still holds work.
if !todo1.is_empty() || !todo2.is_empty() {
activator.activate();
}
// Trace maintenance: drop trace1 once input 2's frontier is empty; otherwise set its
// logical compaction from input 2's frontier and its physical compaction from
// `acknowledged1`.
if let Some(trace1) = trace1_option.as_mut() {
if frontier2.is_empty() { trace1_option = None; }
else {
trace1.set_logical_compaction(frontier2.frontier());
trace1.set_physical_compaction(acknowledged1.borrow());
}
}
// Symmetric maintenance for trace2, driven by input 1's frontier.
if let Some(trace2) = trace2_option.as_mut() {
if frontier1.is_empty() { trace2_option = None;}
else {
trace2.set_logical_compaction(frontier1.frontier());
trace2.set_physical_compaction(acknowledged2.borrow());
}
}
}
})
}
/// A unit of pending join work: one batch cursor to be matched against one trace cursor.
///
/// `join_traces` creates one of these per batch and processes it incrementally through
/// `work`, which may stop early when its fuel runs out and resume on a later call.
struct Deferred<T, C1, C2>
where
T: Timestamp+Lattice+Ord,
C1: Cursor<Time=T>,
C2: for<'a> Cursor<Key<'a>=C1::Key<'a>, Time=T>,
{
// Cursor over the trace side of the join, and the storage it navigates.
trace: C1,
trace_storage: C1::Storage,
// Cursor over the batch side of the join, and the storage it navigates.
batch: C2,
batch_storage: C2::Storage,
// Capability used to open the output session; its time is joined into every trace time.
capability: Capability<T>,
// Set by `work` once either cursor no longer has a valid key.
done: bool,
}
impl<T, C1, C2> Deferred<T, C1, C2>
where
    T: Timestamp + Lattice + Ord,
    C1: Cursor<Time = T>,
    C2: for<'a> Cursor<Key<'a> = C1::Key<'a>, Time = T>,
{
    /// Bundles a trace cursor and a batch cursor (each with its storage) together with the
    /// capability under which their join results will be emitted. No work is done yet.
    fn new(trace: C1, trace_storage: C1::Storage, batch: C2, batch_storage: C2::Storage, capability: Capability<T>) -> Self {
        Self {
            done: false,
            capability,
            trace,
            trace_storage,
            batch,
            batch_storage,
        }
    }

    /// Reports whether a further call to `work` could still produce output.
    fn work_remains(&self) -> bool {
        !self.done
    }

    /// Advances both cursors in key order, invoking `logic` for every pair of updates that
    /// share a key, until a cursor runs out of keys or the recorded effort reaches `*fuel`.
    ///
    /// Effort is the number of records tallied by the session's `EffortBuilder`; the budget
    /// is only checked between keys. On return `*fuel` has been reduced by the effort spent,
    /// stopping at zero.
    #[inline(never)]
    fn work<L, CB: ContainerBuilder>(&mut self, output: &mut OutputBuilderSession<T, EffortBuilder<CB>>, mut logic: L, fuel: &mut usize)
    where
        L: for<'a> FnMut(C1::Key<'a>, C1::Val<'a>, C2::Val<'a>, &T, &C1::Diff, &C2::Diff, &mut JoinSession<T, CB, Capability<T>>),
    {
        // Every trace time is joined with the capability's time before use.
        let floor = self.capability.time();

        let mut spent: usize = 0;
        let mut session = output.session_with_builder(&self.capability);

        let trace_storage = &self.trace_storage;
        let batch_storage = &self.batch_storage;
        let trace = &mut self.trace;
        let batch = &mut self.batch;

        let mut joiner = JoinThinker::new();

        while let (Some(batch_key), Some(trace_key), true) = (batch.get_key(batch_storage), trace.get_key(trace_storage), spent < *fuel) {
            match trace_key.cmp(&batch_key) {
                Ordering::Equal => {
                    // Load this key's updates from both sides.
                    joiner.history1.edits.load(trace, trace_storage, |stamp| {
                        let mut lifted = C1::owned_time(stamp);
                        lifted.join_assign(floor);
                        lifted
                    });
                    joiner.history2.edits.load(batch, batch_storage, |stamp| C2::owned_time(stamp));

                    // Produce every pairing of the loaded updates.
                    joiner.think(|v1, v2, t, r1, r2| {
                        logic(batch_key, v1, v2, &t, r1, r2, &mut session);
                    });

                    // Collect (and reset) the record tally accumulated by the builder.
                    spent += session.builder().0.take();

                    batch.step_key(batch_storage);
                    trace.step_key(trace_storage);

                    joiner.history1.clear();
                    joiner.history2.clear();
                }
                // Keys differ: move the lagging cursor forward to the other's key.
                Ordering::Less => trace.seek_key(trace_storage, batch_key),
                Ordering::Greater => batch.seek_key(batch_storage, trace_key),
            }
        }

        // Finished as soon as either side has no key left to match.
        self.done = !(batch.key_valid(batch_storage) && trace.key_valid(trace_storage));

        *fuel = (*fuel).saturating_sub(spent);
    }
}
/// Scratch state for joining the updates of a single key: one value history per side.
///
/// `Deferred::work` loads both histories for a matching key, calls `think` to enumerate
/// the pairings, and then clears the histories before moving to the next key.
struct JoinThinker<'a, C1, C2>
where
C1: Cursor,
C2: Cursor<Time = C1::Time>,
{
// Updates for the current key from the first cursor.
pub history1: ValueHistory<'a, C1>,
// Updates for the current key from the second cursor.
pub history2: ValueHistory<'a, C2>,
}
impl<'a, C1, C2> JoinThinker<'a, C1, C2>
where
    C1: Cursor,
    C2: Cursor<Time = C1::Time>,
{
    /// Creates a thinker with two fresh value histories.
    fn new() -> Self {
        let history1 = ValueHistory::new();
        let history2 = ValueHistory::new();
        Self { history1, history2 }
    }

    /// Calls `results` with `(val1, val2, time1.join(time2), diff1, diff2)` for pairings of
    /// the updates held in the two histories.
    ///
    /// When either history has fewer than ten edits, all pairs are enumerated with a plain
    /// nested traversal. Otherwise both histories are replayed in time order, and each edit
    /// is paired with the other side's buffer after that buffer has been advanced.
    fn think<F: FnMut(C1::Val<'a>,C2::Val<'a>,C1::Time,&C1::Diff,&C2::Diff)>(&mut self, mut results: F) {
        let both_large = self.history1.edits.len() >= 10 && self.history2.edits.len() >= 10;

        if !both_large {
            // Small case: every edit of side one against every edit of side two.
            self.history1.edits.map(|v1, t1, d1| {
                self.history2.edits.map(|v2, t2, d2| {
                    results(v1, v2, t1.join(t2), d1, d2);
                });
            });
            return;
        }

        let mut left = self.history1.replay();
        let mut right = self.history2.replay();

        // Both replays have edits left: consume whichever has the earlier time.
        while !(left.is_done() || right.is_done()) {
            let left_is_earlier = left.time().unwrap().cmp(right.time().unwrap()).is_lt();
            if left_is_earlier {
                right.advance_buffer_by(left.meet().unwrap());
                for ((val2, time2), diff2) in right.buffer().iter() {
                    let (val1, time1, diff1) = left.edit().unwrap();
                    results(val1, *val2, time1.join(time2), diff1, diff2);
                }
                left.step();
            } else {
                left.advance_buffer_by(right.meet().unwrap());
                for ((val1, time1), diff1) in left.buffer().iter() {
                    let (val2, time2, diff2) = right.edit().unwrap();
                    results(*val1, val2, time1.join(time2), diff1, diff2);
                }
                right.step();
            }
        }

        // Whatever is left on side one is paired with side two's buffer.
        while !left.is_done() {
            right.advance_buffer_by(left.meet().unwrap());
            for ((val2, time2), diff2) in right.buffer().iter() {
                let (val1, time1, diff1) = left.edit().unwrap();
                results(val1, *val2, time1.join(time2), diff1, diff2);
            }
            left.step();
        }

        // Whatever is left on side two is paired with side one's buffer.
        while !right.is_done() {
            left.advance_buffer_by(right.meet().unwrap());
            for ((val1, time1), diff1) in left.buffer().iter() {
                let (val2, time2, diff2) = right.edit().unwrap();
                results(*val1, val2, time1.join(time2), diff1, diff2);
            }
            right.step();
        }
    }
}