use std::rc::Rc;
use std::default::Default;
use std::hash::Hasher;
use std::ops::DerefMut;
use itertools::Itertools;
use ::{Collection, Data};
use timely::dataflow::*;
use timely::dataflow::operators::{Map, Binary};
use timely::dataflow::channels::pact::Exchange;
use timely_sort::{LSBRadixSorter, Unsigned};
use collection::{LeastUpperBound, Lookup, Trace, Offset};
use collection::trace::{CollectionIterator, DifferenceIterator, Traceable};
use iterators::coalesce::Coalesce;
use collection::compact::Compact;
/// Keyed co-grouping of two collections living in the same timely `Scope`.
///
/// An implementor pairs the `V1` values it holds for each key `K` with the `V2` values
/// that `other` holds for the same key. It hands both per-key inputs to a user closure
/// and publishes what that closure produces as a new collection.
pub trait CoGroupBy<G: Scope, K: Data, V1: Data>
where
    G::Timestamp: LeastUpperBound,
{
    /// Co-groups `self` with `other` by key and returns the resulting collection.
    ///
    /// * `other` - the second input, a collection of `(K, V2)` records.
    /// * `key_h` - maps a key to an unsigned value `U`. The `Collection` implementation
    ///   in this file uses it both to route records between workers and to radix-sort
    ///   them.
    /// * `reduc` - turns each `(key, V3)` produced by `logic` into an output record `D`.
    /// * `look` - builds a `Lookup<K, Offset>` from a `u64`. The `Collection`
    ///   implementation calls it with `0` for each trace it keeps.
    /// * `logic` - receives a key, an iterator over each input's values for that key,
    ///   and a `Vec` into which it pushes `(V3, i32)` output pairs. The i32 is treated
    ///   as a weight.
    ///
    /// NOTE(review): the `Collection` impl in this file declares `Logic` over
    /// `CollectionIterator<V1>` / `CollectionIterator<V2>`, not the `DifferenceIterator`
    /// form below. One side has to change for the impl to be accepted; confirm which
    /// against `Trace::get_collection`. That impl also currently panics unconditionally
    /// on its first notification.
    fn cogroup_by_inner<D, V2, V3, U, KH, Look, LookG, Logic, Reduc>(
        &self,
        other: &Collection<G, (K, V2)>,
        key_h: KH,
        reduc: Reduc,
        look: LookG,
        logic: Logic,
    ) -> Collection<G, D>
    where
        D: Data,
        V2: Data + Default,
        V3: Data + Default,
        U: Unsigned + Default,
        KH: Fn(&K) -> U + 'static,
        Look: Lookup<K, Offset> + 'static,
        LookG: Fn(u64) -> Look,
        Logic: Fn(
                &K,
                &mut CollectionIterator<DifferenceIterator<V1>>,
                &mut CollectionIterator<DifferenceIterator<V2>>,
                &mut Vec<(V3, i32)>,
            ) + 'static,
        Reduc: Fn(&K, &V3) -> D + 'static;
}
/// `CoGroupBy` for a keyed collection of `(K, V1)` records.
///
/// NOTE(review): this implementation is unfinished. The `panic!` at the top of the
/// notification loop fires on the first notified time, so the operator never produces
/// output. The code after the panic is unreachable, and rustc will warn
/// `unreachable_code`.
impl<G: Scope, K: Data, V1: Data> CoGroupBy<G, K, V1> for Collection<G, (K, V1)>
where G::Timestamp: LeastUpperBound {
/// Builds a `binary_notify` operator that co-groups `self` with `other` by key.
///
/// The operator works in four stages:
/// 1. Both inputs are exchanged between workers by `key_h`.
/// 2. Incoming batches are buffered per timestamp. When a timestamp is notified, its
///    batches are sorted by key hash and folded into one trace per input.
/// 3. For each key scheduled at the notified time, `logic` runs over both inputs'
///    collections for that key.
/// 4. The difference between `logic`'s output and the output previously recorded in
///    the result trace is emitted (mapped through `reduc`) and recorded in that trace.
fn cogroup_by_inner<
D: Data,
V2: Data+Default,
V3: Data+Default,
U: Unsigned+Default,
KH: Fn(&K)->U+'static,
Look: Lookup<K, Offset>+'static,
LookG: Fn(u64)->Look,
// NOTE(review): the trait declares this bound with
// `&mut CollectionIterator<DifferenceIterator<V1>>` and `...<DifferenceIterator<V2>>`.
// rustc rejects impl bounds that the trait's bounds do not imply (E0276), so one side
// must change. Check which form `Trace::get_collection` actually returns.
Logic: Fn(&K, &mut CollectionIterator<V1>, &mut CollectionIterator<V2>, &mut Vec<(V3, i32)>)+'static,
Reduc: Fn(&K, &V3)->D+'static,
>
(&self, other: &Collection<G, (K, V2)>, key_h: KH, reduc: Reduc, look: LookG, logic: Logic) -> Collection<G, D> {
// One trace of accumulated differences per input. `result` traces the output this
// operator has already emitted; it is read back and negated below to compute output
// differences.
let mut source1 = Trace::new(look(0));
let mut source2 = Trace::new(look(0));
let mut result = Trace::new(look(0));
// Batches received but not yet processed, keyed by timestamp (one list per input).
let mut inputs1 = Vec::new();
let mut inputs2 = Vec::new();
// Keys to re-evaluate, keyed by the time at which to re-evaluate them.
let mut to_do = Vec::new();
// Scratch space for the `(V3, i32)` output of `logic`; cleared after each key.
let mut buffer = vec![];
// `key_h` is used by both exchange pacts and by the operator closure, so it is shared
// through an `Rc`.
let key_h = Rc::new(key_h);
let key_1 = key_h.clone();
let key_2 = key_h.clone();
// Route each `((key, val), _)` record to a worker by the hash of its key, so all
// records for a key meet on the same worker.
let exch1 = Exchange::new(move |&((ref k, _),_)| key_1(k).as_u64());
let exch2 = Exchange::new(move |&((ref k, _),_)| key_2(k).as_u64());
// Reusable radix sorters, one per input, used when several batches share a timestamp.
let mut sorter1 = LSBRadixSorter::new();
let mut sorter2 = LSBRadixSorter::new();
Collection::new(self.inner.binary_notify(&other.inner, exch1, exch2, "CoGroupBy", vec![], move |input1, input2, output, notificator| {
// Stash each incoming batch under its timestamp, moving the data out with
// `mem::replace` rather than copying it. Then request a notification for that time.
while let Some((time, data)) = input1.next() {
inputs1.entry_or_insert(time.time(), || Vec::new())
.push(::std::mem::replace(data.deref_mut(), Vec::new()));
notificator.notify_at(time);
}
while let Some((time, data)) = input2.next() {
inputs2.entry_or_insert(time.time(), || Vec::new())
.push(::std::mem::replace(data.deref_mut(), Vec::new()));
notificator.notify_at(time);
}
// Handle each timestamp the notificator reports as complete.
while let Some((index, _count)) = notificator.next() {
// Scratch list of times at which a key's output may change; cleared after each key.
let mut stash = Vec::new();
// NOTE(review): this panic is unconditional. Everything below in this loop is
// unreachable until `interesting_times` is reworked as the message describes.
panic!("interesting times needs to do LUB of union of times for each key, input");
// Input 1: fold the batches received at `index` into `source1`.
if let Some(mut queue) = inputs1.remove_key(&index) {
// Sort the records by key hash. Several batches go through the radix sorter. An
// entry is created with one push (see above), so otherwise the queue holds exactly
// one batch, which is sorted directly.
let compact = if queue.len() > 1 {
for element in queue.into_iter() {
sorter1.extend(element.into_iter(), &|x| key_h(&(x.0).0));
}
let mut sorted = sorter1.finish(&|x| key_h(&(x.0).0));
let result = Compact::from_radix(&mut sorted, &|k| key_h(k));
// Shrinks the buffer list before recycling it, presumably to bound retained
// memory (TODO confirm).
sorted.truncate(256);
sorter1.recycle(sorted);
result
}
else {
let mut vec = queue.pop().unwrap();
// NOTE(review): drain + collect copies the batch into a fresh Vec; `vec` could
// likely be sorted in place instead.
let mut vec = vec.drain(..).collect::<Vec<_>>();
vec.sort_by(|x,y| key_h(&(x.0).0).cmp(&key_h((&(y.0).0))));
Compact::from_radix(&mut vec![vec], &|k| key_h(k))
};
// `from_radix` yields an `Option`, presumably `None` for an empty batch (TODO confirm).
if let Some(compact) = compact {
// Schedule every key in this batch for re-evaluation at `index` itself, plus at
// whatever times `interesting_times` places in `stash`.
for key in &compact.keys {
stash.push(index.clone());
source1.interesting_times(key, &index, &mut stash);
for time in &stash {
// The insertion closure (presumably run only when the entry is new) requests a
// notification for that time.
let mut queue = to_do.entry_or_insert((*time).clone(), || { notificator.notify_at(index.delayed(time)); Vec::new() });
queue.push((*key).clone());
}
stash.clear();
}
// Record this batch's differences in input 1's trace.
source1.set_difference(index.time(), compact);
}
}
// Input 2: same procedure as input 1, using `sorter2` and `source2`.
if let Some(mut queue) = inputs2.remove_key(&index) {
let compact = if queue.len() > 1 {
for element in queue.into_iter() {
sorter2.extend(element.into_iter(), &|x| key_h(&(x.0).0));
}
let mut sorted = sorter2.finish(&|x| key_h(&(x.0).0));
let result = Compact::from_radix(&mut sorted, &|k| key_h(k));
sorted.truncate(256);
sorter2.recycle(sorted);
result
}
else {
let mut vec = queue.pop().unwrap();
let mut vec = vec.drain(..).collect::<Vec<_>>();
vec.sort_by(|x,y| key_h(&(x.0).0).cmp(&key_h((&(y.0).0))));
Compact::from_radix(&mut vec![vec], &|k| key_h(k))
};
if let Some(compact) = compact {
for key in &compact.keys {
stash.push(index.clone());
source2.interesting_times(key, &index, &mut stash);
for time in &stash {
let mut queue = to_do.entry_or_insert((*time).clone(), || { notificator.notify_at(index.delayed(time)); Vec::new() });
queue.push((*key).clone());
}
stash.clear();
}
source2.set_difference(index.time(), compact);
}
}
let mut session = output.session(&index);
// Re-evaluate every key scheduled for this time.
if let Some(mut keys) = to_do.remove_key(&index) {
// Sort by (hash, key) so that equal keys are adjacent and `dedup` removes every
// duplicate.
keys.sort_by(|x,y| (key_h(&x), x).cmp(&(key_h(&y), y)));
keys.dedup();
// Output differences for this time, accumulated key by key.
let mut accumulation = Compact::new(0,0);
for key in keys {
let mut input1 = source1.get_collection(&key, &index);
let mut input2 = source2.get_collection(&key, &index);
// `logic` runs only when at least one input has data for this key. Otherwise
// `buffer` stays empty, and the loop below retracts any output previously
// recorded for the key.
if input1.peek().is_some() || input2.peek().is_some() { logic(&key, &mut input1, &mut input2, &mut buffer); }
// Sort the new output by value so it can be merged with the recorded output.
buffer.sort_by(|x,y| x.0.cmp(&y.0));
let mut compact = accumulation.session();
// Output difference = new output minus previously recorded output. Merge the
// negated recorded output with `buffer` by value and coalesce equal values.
// Assumes `result.get_collection` yields values in ascending order (TODO confirm).
for (val, wgt) in Coalesce::coalesce(result.get_collection(&key, &index)
.map(|(v, w)| (v,-w))
.merge_by(buffer.iter().map(|&(ref v, w)| (v, w)), |x,y| {
x.0 <= y.0
}))
{
// Emit the difference downstream and remember it for the result trace.
session.give((reduc(&key, val), wgt));
compact.push(val.clone(), wgt);
}
compact.done(key);
buffer.clear();
}
// Commit to the result trace only if at least one output difference was accumulated.
if accumulation.vals.len() > 0 {
result.set_difference(index.time(), accumulation);
}
}
}
}))
}
}