use std::collections::VecDeque;
use std::ops::Mul;
use timely::ContainerBuilder;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::{Scope, Stream};
use timely::dataflow::channels::pact::{Pipeline, Exchange};
use timely::dataflow::operators::Operator;
use timely::PartialOrder;
use timely::progress::{Antichain, ChangeBatch, Timestamp};
use timely::progress::frontier::MutableAntichain;
use differential_dataflow::{ExchangeData, VecCollection, AsCollection, Hashable};
use differential_dataflow::difference::{Monoid, Semigroup};
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::trace::{Cursor, TraceReader};
use differential_dataflow::consolidation::{consolidate, consolidate_updates};
use differential_dataflow::trace::implementations::BatchContainer;
use timely::dataflow::operators::CapabilitySet;
/// Half-join with a fixed output container and no yielding.
///
/// Wraps [`half_join_internal_unsafe`]: every matched `(key, val1, val2)` triple is
/// rendered through `output_func`, timestamped with the joined time, and weighted by
/// the product of the stream diff and the arrangement diff. The yield function used
/// here never requests a yield, so each activation drains all ready work.
pub fn half_join<G, K, V, R, Tr, FF, CF, DOut, S>(
    stream: VecCollection<G, (K, V, G::Timestamp), R>,
    arrangement: Arranged<G, Tr>,
    frontier_func: FF,
    comparison: CF,
    mut output_func: S,
) -> VecCollection<G, (DOut, G::Timestamp), <R as Mul<Tr::Diff>>::Output>
where
    G: Scope<Timestamp = Tr::Time>,
    K: Hashable + ExchangeData,
    V: ExchangeData,
    R: ExchangeData + Monoid,
    Tr: TraceReader<KeyOwn = K, Time: std::hash::Hash>+Clone+'static,
    R: Mul<Tr::Diff, Output: Semigroup>,
    FF: Fn(&G::Timestamp, &mut Antichain<G::Timestamp>) + 'static,
    CF: Fn(Tr::TimeGat<'_>, &G::Timestamp) -> bool + 'static,
    DOut: Clone+'static,
    S: FnMut(&K, &V, Tr::Val<'_>)->DOut+'static,
{
    use timely::container::PushInto;
    // Adapt the user's closure to the internal interface: drain the consolidated
    // `(time, diff2)` matches and push completed updates into the builder.
    let render = move |builder: &mut CapacityContainerBuilder<Vec<_>>, k: &K, v1: &V, v2: Tr::Val<'_>, initial: &G::Timestamp, diff1: &R, matches: &mut Vec<(G::Timestamp, Tr::Diff)>| {
        for (time, diff2) in matches.drain(..) {
            // Weight first, then render; the update is shipped at the stream time `initial`.
            let diff = diff1.clone() * diff2.clone();
            builder.push_into(((output_func(k, v1, v2), time.clone()), initial.clone(), diff));
        }
    };
    // `|_timer, _count| false`: never yield, process everything each activation.
    half_join_internal_unsafe::<_, _, _, _, _, _, _, _, _, CapacityContainerBuilder<Vec<_>>>(stream, arrangement, frontier_func, comparison, |_timer, _count| false, render)
        .as_collection()
}
/// Internal half-join variant with explicit container, yield, and output control.
///
/// Each record `(key, val, time)` arriving on `stream` at stream time `initial` is
/// stashed until the arrangement input's frontier can no longer produce times `t`
/// with `comparison(t, initial)`. It is then matched against the arrangement: for
/// each value and each historical `(t, d)` accepted by `comparison`, the pair
/// `(t join time, d)` is collected, consolidated, and handed to `output_func` to
/// render into the per-capability container builder.
///
/// "Unsafe" refers to the compaction contract: `frontier_func` must map each
/// in-flight time to a lower bound on all updates that may still be produced from
/// it, since its results drive `set_logical_compaction` on the shared trace.
///
/// * `yield_function` - consulted with (activation start instant, work performed);
///   returning `true` suspends processing and the operator re-activates itself.
/// * `output_func` - receives a builder, the matched key/values, the stream
///   record's time and diff, and the consolidated `(time, diff)` matches.
pub fn half_join_internal_unsafe<G, K, V, R, Tr, FF, CF, Y, S, CB>(
    stream: VecCollection<G, (K, V, G::Timestamp), R>,
    mut arrangement: Arranged<G, Tr>,
    frontier_func: FF,
    comparison: CF,
    yield_function: Y,
    mut output_func: S,
) -> Stream<G, CB::Container>
where
    G: Scope<Timestamp = Tr::Time>,
    K: Hashable + ExchangeData,
    V: ExchangeData,
    R: ExchangeData + Monoid,
    Tr: for<'a> TraceReader<KeyOwn = K, Time: std::hash::Hash>+Clone+'static,
    FF: Fn(&G::Timestamp, &mut Antichain<G::Timestamp>) + 'static,
    CF: Fn(Tr::TimeGat<'_>, &Tr::Time) -> bool + 'static,
    Y: Fn(std::time::Instant, usize) -> bool + 'static,
    S: FnMut(&mut CB, &K, &V, Tr::Val<'_>, &G::Timestamp, &R, &mut Vec<(G::Timestamp, Tr::Diff)>) + 'static,
    CB: ContainerBuilder,
{
    // Impose no physical compaction constraint; logical compaction (below) is what we manage.
    arrangement.trace.set_physical_compaction(Antichain::new().borrow());
    let mut arrangement_trace = Some(arrangement.trace);
    let arrangement_stream = arrangement.stream;

    // Route stream records by key hash so they land with the matching arrangement shard.
    let exchange = Exchange::new(move |update: &((K, V, G::Timestamp),G::Timestamp,R)| (update.0).0.hashed().into());

    // Scratch buffer reused across matches for `(time, diff)` pairs.
    let mut output_buffer = Vec::new();
    // Batches of stashed records, each holding its own capabilities.
    let mut blobs: Vec<Blob<(K, V, G::Timestamp), G::Timestamp, R>> = Vec::new();

    let scope = stream.scope();
    stream.inner.binary_frontier(arrangement_stream, exchange, Pipeline, "HalfJoin", move |_,info| {

        let activator = scope.activator_for(info.address);

        move |(input1, frontier1), (input2, frontier2), output| {

            // Ingest new stream records; retain a capability per input batch.
            let mut arriving: Vec<(G::Timestamp, (K, V, G::Timestamp), R)> = Vec::new();
            let mut caps = CapabilitySet::new();
            input1.for_each(|capability, data| {
                caps.insert(capability.retain(0));
                arriving.extend(data.drain(..).map(|(d, t, r)| (t, d, r)));
            });
            // The arrangement input matters only through its frontier.
            input2.for_each(|_, _| { });

            let mut yielded = false;
            let timer = std::time::Instant::now();
            let mut work = 0;

            if let Some(ref mut trace) = arrangement_trace {

                let frontier = frontier2.frontier();
                // Stage *every* frontier element for `eligible` to consult. The
                // `comparison` closure is arbitrary and timestamps may be only
                // partially ordered (Rust `Ord` need not agree with `PartialOrder`),
                // so no single frontier element suffices to decide eligibility.
                let mut time_con = Tr::TimeContainer::with_capacity(frontier.len());
                for time in frontier.iter() {
                    time_con.push_own(time);
                }
                // A record is ready once no frontier time can still satisfy `comparison`.
                let eligible = |initial: &G::Timestamp| -> bool {
                    !(0..time_con.len()).any(|i| comparison(time_con.index(i), initial))
                };

                // Sort and coalesce the new records (ordered by time first).
                consolidate_updates(&mut arriving);
                if !arriving.is_empty() {
                    // Track the lower bound of arriving times and hold capabilities for it.
                    let mut lower = MutableAntichain::new();
                    lower.update_iter(arriving.iter().map(|(t, _, _)| (t.clone(), 1)));
                    let mut blob_caps = CapabilitySet::new();
                    for time in lower.frontier().iter() {
                        blob_caps.insert(caps.delayed(time));
                    }
                    // Not-yet-eligible records form a suffix of the time-sorted batch.
                    let stuck_count = arriving.iter().rev()
                        .take_while(|(t, _, _)| !eligible(t))
                        .count();
                    let mut data: VecDeque<_> = arriving.into();
                    // Re-sort the ready prefix by (data, time, diff) for cursor locality.
                    let ready_len = data.len() - stuck_count;
                    if ready_len > 0 {
                        let slice = data.make_contiguous();
                        slice[..ready_len].sort_by(|(t1, d1, r1), (t2, d2, r2)| {
                            (d1, t1, r1).cmp(&(d2, t2, r2))
                        });
                    }
                    blobs.push(Blob {
                        caps: blob_caps,
                        lower,
                        data,
                        stuck_count,
                    });
                }

                // Revisit fully-stuck blobs: the advanced frontier may release a prefix.
                for blob in blobs.iter_mut().filter(|b| b.stuck_count == b.data.len()) {
                    let newly_ready = blob.data.iter().take_while(|(t, _, _)| eligible(t)).count();
                    if newly_ready > 0 {
                        blob.stuck_count -= newly_ready;
                        // Order the released prefix by (data, time, diff), as above.
                        let slice = blob.data.make_contiguous();
                        slice[..newly_ready].sort_by(|(t1, d1, r1), (t2, d2, r2)| {
                            (d1, t1, r1).cmp(&(d2, t2, r2))
                        });
                        // Rebuild the lower bound and downgrade held capabilities.
                        let mut new_lower = MutableAntichain::new();
                        new_lower.update_iter(blob.data.iter().map(|(t, _, _)| (t.clone(), 1)));
                        blob.lower = new_lower;
                        blob.caps.downgrade(&blob.lower.frontier());
                    }
                }

                // Process ready records blob by blob, until done or asked to yield.
                for blob in blobs.iter_mut().filter(|b| b.data.len() > b.stuck_count) {
                    if yielded { break; }
                    // One container builder per held capability.
                    let mut builders = (0..blob.caps.len()).map(|_| CB::default()).collect::<Vec<_>>();
                    let (mut cursor, storage) = trace.cursor();
                    let mut key_con = Tr::KeyContainer::with_capacity(1);
                    // Times drained from this blob, to be subtracted from `lower`.
                    let mut removals: ChangeBatch<G::Timestamp> = ChangeBatch::new();
                    while blob.data.len() > blob.stuck_count {
                        yielded = yielded || yield_function(timer, work);
                        if yielded { break; }
                        let (ref initial, (ref key, ref val1, ref time), ref diff1) = blob.data[0];
                        // Ship output through a capability no later than `initial`.
                        let builder_idx = blob.caps.iter().position(|c| c.time().less_equal(initial)).unwrap();
                        key_con.clear(); key_con.push_own(&key);
                        cursor.seek_key(&storage, key_con.index(0));
                        if cursor.get_key(&storage) == key_con.get(0) {
                            while let Some(val2) = cursor.get_val(&storage) {
                                cursor.map_times(&storage, |t, d| {
                                    // Accept historical times per `comparison`, advanced to the record's time.
                                    if comparison(t, initial) {
                                        let mut t = Tr::owned_time(t);
                                        t.join_assign(time);
                                        output_buffer.push((t, Tr::owned_diff(d)))
                                    }
                                });
                                consolidate(&mut output_buffer);
                                work += output_buffer.len();
                                output_func(&mut builders[builder_idx], key, val1, val2, initial, diff1, &mut output_buffer);
                                output_buffer.clear();
                                cursor.step_val(&storage);
                            }
                            cursor.rewind_vals(&storage);
                        }
                        // Eagerly ship any containers the builder has completed.
                        while let Some(container) = builders[builder_idx].extract() {
                            output.session(&blob.caps[builder_idx]).give_container(container);
                        }
                        let (initial, _, _) = blob.data.pop_front().unwrap();
                        removals.update(initial, -1);
                    }
                    // Flush all remaining output for this blob.
                    for builder_idx in 0 .. blob.caps.len() {
                        while let Some(container) = builders[builder_idx].finish() {
                            output.session(&blob.caps[builder_idx]).give_container(container);
                        }
                    }
                    if blob.data.is_empty() {
                        // Fully drained: drop capabilities and tracking state.
                        blob.lower = MutableAntichain::new();
                        blob.caps = CapabilitySet::new();
                        blob.data = VecDeque::default();
                    } else {
                        // Subtract processed times; downgrade capabilities accordingly.
                        blob.lower.update_iter(removals.drain());
                        blob.caps.downgrade(&blob.lower.frontier());
                    }
                }
                blobs.retain(|blob| !blob.data.is_empty());
            }

            // If we yielded with ready work remaining, reschedule ourselves.
            if blobs.iter().any(|b| b.data.len() > b.stuck_count) {
                activator.activate();
            }

            // Derive a logical compaction frontier from pending stream times and held capabilities.
            let mut frontier = Antichain::new();
            for time in frontier1.frontier().iter() {
                frontier_func(time, &mut frontier);
            }
            for blob in blobs.iter() {
                for cap in blob.caps.iter() {
                    frontier_func(cap.time(), &mut frontier);
                }
            }
            arrangement_trace.as_mut().map(|trace| trace.set_logical_compaction(frontier.borrow()));

            // Once the stream is complete and all stashed work is drained, release the trace.
            if frontier1.is_empty() && blobs.is_empty() {
                arrangement_trace = None;
            }
        }
    })
}
/// A batch of stashed stream records awaiting processing against the arrangement.
struct Blob<D, T: Timestamp, R> {
    /// Capabilities covering the times in `data`; downgraded as records drain.
    caps: CapabilitySet<T>,
    /// Multiset of the first-coordinate times in `data`, whose frontier drives `caps`.
    lower: MutableAntichain<T>,
    /// Pending `(time, record, diff)` triples; ready records are popped from the front.
    data: VecDeque<(T, D, R)>,
    /// Number of trailing records not yet ready to be processed.
    stuck_count: usize,
}