use std::collections::HashMap;
use std::ops::Mul;
use std::time::Instant;
use timely::ContainerBuilder;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::{Scope, ScopeParent, Stream};
use timely::dataflow::channels::pact::{Pipeline, Exchange};
use timely::dataflow::operators::{Capability, Operator, generic::Session};
use timely::progress::Antichain;
use timely::progress::frontier::AntichainRef;
use differential_dataflow::{ExchangeData, VecCollection, AsCollection, Hashable};
use differential_dataflow::difference::{Monoid, Semigroup};
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::trace::{Cursor, TraceReader};
use differential_dataflow::consolidation::{consolidate, consolidate_updates};
use differential_dataflow::trace::implementations::BatchContainer;
/// A binary equijoin that is driven only by updates on `stream`.
///
/// Each element of `stream` has the form `((key, val1, time), initial, diff1)`. The work is
/// delegated to [`half_join_internal_unsafe`], which looks `key` up in `arrangement` and, for
/// every matching `val2`, supplies a buffer of `(time, diff2)` pairs. For each such pair this
/// function emits
///
/// ```text
/// ((output_func(key, val1, val2), time), initial, diff1 * diff2)
/// ```
///
/// # Arguments
///
/// * `stream` - the updates to look up, carrying both their own time and an `initial` time.
/// * `arrangement` - the indexed collection the keys are looked up in.
/// * `frontier_func` - forwarded unchanged; the internal operator uses it to derive the
///   logical compaction frontier it sets on the arrangement's trace.
/// * `comparison` - forwarded unchanged; the internal operator uses it to test arrangement
///   times against an update's `initial` time.
/// * `output_func` - combines `key`, `val1` and the matched `val2` into the output record.
///
/// This variant never yields control early: the yield function it passes always returns
/// `false`.
pub fn half_join<G, K, V, R, Tr, FF, CF, DOut, S>(
    stream: VecCollection<G, (K, V, G::Timestamp), R>,
    arrangement: Arranged<G, Tr>,
    frontier_func: FF,
    comparison: CF,
    mut output_func: S,
) -> VecCollection<G, (DOut, G::Timestamp), <R as Mul<Tr::Diff>>::Output>
where
    G: Scope<Timestamp = Tr::Time>,
    K: Hashable + ExchangeData,
    V: ExchangeData,
    R: ExchangeData + Monoid,
    Tr: TraceReader<KeyOwn = K, Time: std::hash::Hash>+Clone+'static,
    R: Mul<Tr::Diff, Output: Semigroup>,
    FF: Fn(&G::Timestamp, &mut Antichain<G::Timestamp>) + 'static,
    CF: Fn(Tr::TimeGat<'_>, &G::Timestamp) -> bool + 'static,
    DOut: Clone+'static,
    S: FnMut(&K, &V, Tr::Val<'_>)->DOut+'static,
{
    // Adapt the user's record-level `output_func` to the session-level callback expected by
    // `half_join_internal_unsafe`.
    let output_func = move |session: &mut SessionFor<G, _>, k: &K, v1: &V, v2: Tr::Val<'_>, initial: &G::Timestamp, diff1: &R, output: &mut Vec<(G::Timestamp, Tr::Diff)>| {
        // `drain(..)` yields each `(time, diff2)` by value, so both can be moved into the
        // output; only the borrowed `diff1` and `initial` need to be cloned per record.
        for (time, diff2) in output.drain(..) {
            let diff = diff1.clone() * diff2;
            let dout = (output_func(k, v1, v2), time);
            session.give((dout, initial.clone(), diff));
        }
    };
    half_join_internal_unsafe::<_, _, _, _, _, _,_,_,_, CapacityContainerBuilder<Vec<_>>>(stream, arrangement, frontier_func, comparison, |_timer, _count| false, output_func)
        .as_collection()
}
/// Shorthand for the output `Session` type handed to output callbacks in this file.
///
/// The session is parameterized by the scope's timestamp, the container builder `CB`, and a
/// `Capability` over that same timestamp.
type SessionFor<'a, 'b, G, CB> =
Session<'a, 'b,
<G as ScopeParent>::Timestamp,
CB,
Capability<<G as ScopeParent>::Timestamp>,
>;
/// Variant of `half_join` in which `output_func` writes results directly into the output session.
///
/// Updates from `stream` have the form `((key, val, time), initial, diff)`. They are exchanged
/// by the hash of `key`, stashed under a retained copy of the capability they arrived with, and
/// looked up against `arrangement` once the arrangement input's frontier is no longer less or
/// equal to that capability's time. Lookups are performed by `process_proposals`, which invokes
/// `output_func` once per matching value with a buffer of `(time, diff)` pairs.
///
/// * `frontier_func` is applied to each element of the first input's frontier and to each
///   stashed capability; the antichain it populates is installed as the trace's logical
///   compaction frontier.
/// * `comparison` is forwarded to `process_proposals`, where it tests arrangement times
///   against a proposal's `initial` time.
/// * `yield_function` receives the instant at which the current invocation started and a
///   running work count; once it returns `true`, no further proposals are processed in this
///   invocation and the operator re-activates itself.
/// * `output_func` is responsible for producing output through the supplied session.
///
/// NOTE(review): this is not an `unsafe fn`; the `unsafe` in the name presumably signals that
/// the correctness of emitted updates rests on the caller's `output_func` — confirm.
pub fn half_join_internal_unsafe<G, K, V, R, Tr, FF, CF, Y, S, CB>(
stream: VecCollection<G, (K, V, G::Timestamp), R>,
mut arrangement: Arranged<G, Tr>,
frontier_func: FF,
comparison: CF,
yield_function: Y,
mut output_func: S,
) -> Stream<G, CB::Container>
where
G: Scope<Timestamp = Tr::Time>,
K: Hashable + ExchangeData,
V: ExchangeData,
R: ExchangeData + Monoid,
Tr: for<'a> TraceReader<KeyOwn = K, Time: std::hash::Hash>+Clone+'static,
FF: Fn(&G::Timestamp, &mut Antichain<G::Timestamp>) + 'static,
CF: Fn(Tr::TimeGat<'_>, &Tr::Time) -> bool + 'static,
Y: Fn(std::time::Instant, usize) -> bool + 'static,
S: FnMut(&mut SessionFor<G, CB>, &K, &V, Tr::Val<'_>, &G::Timestamp, &R, &mut Vec<(G::Timestamp, Tr::Diff)>) + 'static,
CB: ContainerBuilder,
{
// Set the physical compaction frontier to the empty antichain; presumably this means the
// operator places no constraint on physical merging of the trace — confirm.
arrangement.trace.set_physical_compaction(Antichain::new().borrow());
// Held in an `Option` so the trace handle can be dropped once the operator is finished.
let mut arrangement_trace = Some(arrangement.trace);
let arrangement_stream = arrangement.stream;
// Proposals awaiting lookup, keyed by the capability under which they are stashed.
let mut stash = HashMap::new();
// Route each update by the hash of its key, the first component of the data triple.
let exchange = Exchange::new(move |update: &((K, V, G::Timestamp),G::Timestamp,R)| (update.0).0.hashed().into());
// Reusable buffer for `(time, diff)` pairs, lent to `process_proposals` on each call.
let mut output_buffer = Vec::new();
let scope = stream.scope();
stream.inner.binary_frontier(arrangement_stream, exchange, Pipeline, "HalfJoin", move |_,info| {
// Used to reschedule the operator when it stops early with work remaining.
let activator = scope.activator_for(info.address);
move |(input1, frontier1), (input2, frontier2), output| {
// Drain the first input, stashing its updates under a capability retained for output 0.
input1.for_each(|capability, data| {
stash.entry(capability.retain(0))
.or_insert(Vec::new())
.append(data)
});
// The second input's data is discarded here; only its frontier (`frontier2`) is consulted.
input2.for_each(|_, _| { });
// Early-exit state: once `yielded` is set, the remaining stash entries are left untouched.
let mut yielded = false;
let timer = std::time::Instant::now();
let mut work = 0;
// Entries destined for `stash`, collected separately because `stash` is being iterated.
let mut stash_additions = HashMap::new();
if let Some(ref mut trace) = arrangement_trace {
for (capability, proposals) in stash.iter_mut() {
// `yielded` only ever moves from false to true; once true, `yield_function` is not called again.
yielded = yielded || yield_function(timer, work);
// Only process capabilities whose time the arrangement input's frontier has moved past.
if !yielded && !frontier2.less_equal(capability.time()) {
let frontier = frontier2.frontier();
yielded = process_proposals::<G, _, _, _, _, _, _, _, _>(
&comparison,
&yield_function,
&mut output_func,
&mut output_buffer,
timer,
&mut work,
trace,
proposals,
output.session_with_builder(capability),
frontier
);
// Processed proposals had their diff set to zero; keep only the unprocessed ones.
proposals.retain(|ptd| !ptd.2.is_zero());
// Collect the `initial` times of the remaining proposals into an antichain.
let mut antichain = Antichain::new();
for (_, initial, _) in proposals.iter() {
antichain.insert(initial.clone());
}
// One antichain element that is not less or equal to the capability's time: move all
// remaining proposals under a capability delayed to that element. (If the element IS
// less or equal to the capability's time, the proposals stay where they are.)
if antichain.len() == 1 && !antichain.less_equal(capability.time()) {
stash_additions
.entry(capability.delayed(&antichain[0]))
.or_insert(Vec::new())
.append(proposals);
}
// Several antichain elements: split the remaining proposals between them, each under its
// own delayed capability.
else if antichain.len() > 1 {
let mut additions = vec![Vec::new(); antichain.len()];
for (data, initial, diff) in proposals.drain(..) {
use timely::PartialOrder;
// Every `initial` was offered to `antichain` above, so presumably some element is less or
// equal to it and this `unwrap` cannot fail — confirm against `Antichain::insert`.
let position = antichain.iter().position(|t| t.less_equal(&initial)).unwrap();
additions[position].push((data, initial, diff));
}
for (time, addition) in antichain.into_iter().zip(additions) {
stash_additions
.entry(capability.delayed(&time))
.or_insert(Vec::new())
.extend(addition);
}
}
}
}
}
// Work may remain after an early exit, so ask to be scheduled again.
if yielded {
activator.activate();
}
// Remove emptied entries, which also drops the capabilities they were keyed by.
stash.retain(|_,proposals| !proposals.is_empty());
for (capability, proposals) in stash_additions.into_iter() {
stash.entry(capability).or_insert(Vec::new()).extend(proposals);
}
// The logical compaction frontier is derived from both the first input's frontier and the
// capabilities still held in `stash`.
let mut frontier = timely::progress::frontier::Antichain::new();
for time in frontier1.frontier().iter() {
frontier_func(time, &mut frontier);
}
for time in stash.keys() {
frontier_func(time, &mut frontier);
}
arrangement_trace.as_mut().map(|trace| trace.set_logical_compaction(frontier.borrow()));
// With the first input closed and nothing stashed, the trace handle is released.
if frontier1.is_empty() && stash.is_empty() {
arrangement_trace = None;
}
}
})
}
/// Looks up the key of each proposal in `trace` and reports the matches through `output_func`.
///
/// `proposals` is consolidated first. Each proposal `((key, val1, time), initial, diff1)` is
/// then visited in turn:
///
/// * If `yield_function(timer, *work)` has returned `true`, this and all later proposals are
///   left untouched.
/// * If `comparison` holds between any element of `frontier` and `initial`, the proposal is
///   left untouched.
/// * Otherwise the cursor seeks `key`. For each value found, the arrangement times satisfying
///   `comparison(t, initial)` are joined with `time`, gathered with their diffs into
///   `output_buffer`, consolidated, counted into `work`, and passed to `output_func`. The
///   proposal's diff is then set to zero, whether or not the key was present.
///
/// Returns `true` if `yield_function` asked to stop, and `false` otherwise.
fn process_proposals<G, Tr, CF, Y, S, CB, K, V, R>(
    comparison: &CF,
    yield_function: &Y,
    output_func: &mut S,
    mut output_buffer: &mut Vec<(Tr::Time, Tr::Diff)>,
    timer: Instant,
    work: &mut usize,
    trace: &mut Tr,
    proposals: &mut Vec<((K, V, Tr::Time), Tr::Time, R)>,
    mut session: SessionFor<G, CB>,
    frontier: AntichainRef<Tr::Time>
) -> bool
where
    G: Scope<Timestamp = Tr::Time>,
    Tr: for<'a> TraceReader<KeyOwn = K>,
    CF: Fn(Tr::TimeGat<'_>, &Tr::Time) -> bool + 'static,
    Y: Fn(Instant, usize) -> bool + 'static,
    S: FnMut(&mut SessionFor<G, CB>, &K, &V, Tr::Val<'_>, &G::Timestamp, &R, &mut Vec<(G::Timestamp, Tr::Diff)>) + 'static,
    CB: ContainerBuilder,
    K: Ord,
    V: Ord,
    R: Monoid,
{
    // Consolidate the requests before walking the trace (this also orders them, presumably so
    // that the cursor seeks keys in sequence).
    consolidate_updates(proposals);

    let (mut cursor, storage) = trace.cursor();

    // One-slot container used to view each owned key in the form the cursor expects.
    let mut key_con = Tr::KeyContainer::with_capacity(1);

    // The frontier's times, copied into a container so `comparison` can be applied to them.
    let mut time_con = Tr::TimeContainer::with_capacity(1);
    for bound in frontier.iter() {
        time_con.push_own(bound);
    }

    let mut yielded = false;
    for ((ref key, ref val1, ref time), ref initial, ref mut diff1) in proposals.iter_mut() {
        // Once set, `yielded` stays set and `yield_function` is not consulted again.
        yielded = yielded || yield_function(timer, *work);
        if yielded {
            continue;
        }

        // Skip proposals for which some frontier time still satisfies `comparison`.
        let pending = (0..time_con.len()).any(|slot| comparison(time_con.index(slot), initial));
        if pending {
            continue;
        }

        key_con.clear();
        key_con.push_own(key);
        cursor.seek_key(&storage, key_con.index(0));

        let key_present = cursor.get_key(&storage) == key_con.get(0);
        if key_present {
            while let Some(val2) = cursor.get_val(&storage) {
                // Gather the qualifying arrangement times, each advanced to its join with the
                // proposal's own time.
                cursor.map_times(&storage, |trace_time, trace_diff| {
                    if comparison(trace_time, initial) {
                        let mut joined = Tr::owned_time(trace_time);
                        joined.join_assign(time);
                        output_buffer.push((joined, Tr::owned_diff(trace_diff)));
                    }
                });
                consolidate(&mut output_buffer);
                *work += output_buffer.len();
                output_func(&mut session, key, val1, val2, initial, diff1, &mut output_buffer);
                // Clear whatever `output_func` left behind so the next value starts empty.
                output_buffer.clear();
                cursor.step_val(&storage);
            }
            cursor.rewind_vals(&storage);
        }

        // A zero diff marks the proposal as handled.
        *diff1 = R::zero();
    }

    yielded
}