use std::rc::Rc;
use std::cell::RefCell;
use std::default::Default;
use std::collections::HashMap;
use itertools::Itertools;
use linear_map::LinearMap;
use vec_map::VecMap;
use ::{Data, Collection, Delta};
use timely::dataflow::*;
use timely::dataflow::operators::Unary;
use timely::dataflow::channels::pact::Pipeline;
use timely_sort::Unsigned;
use collection::{LeastUpperBound, Lookup, Trace, BasicTrace, Offset};
use collection::trace::CollectionIterator;
use collection::basic::DifferenceIterator;
use collection::compact::Compact;
use iterators::coalesce::Coalesce;
use operators::arrange::{Arranged, ArrangeByKey};
/// Extension trait that applies user-supplied logic to the values of each key.
///
/// Implemented in this file for `Collection<G, (K, V)>`.
pub trait Group<G, K, V>
where
    G: Scope,
    G::Timestamp: LeastUpperBound,
    K: Data,
    V: Data
{
    /// Groups records by key and runs `logic` on each group.
    ///
    /// `logic` is handed a reference to the key, an iterator over the values
    /// currently associated with that key, and an output vector into which it
    /// pushes `(value, weight)` pairs. The result is a collection of
    /// `(key, value)` records.
    fn group<L, V2>(&self, logic: L) -> Collection<G, (K, V2)>
    where
        V2: Data,
        L: Fn(&K, &mut CollectionIterator<DifferenceIterator<V>>, &mut Vec<(V2, Delta)>) + 'static;
}
/// `Group` for keyed collections: arranges the data by key using a hash-based
/// index, then delegates to the arranged `group` operator.
impl<G, K, V> Group<G, K, V> for Collection<G, (K, V)>
where
    G: Scope,
    G::Timestamp: LeastUpperBound,
    K: Data + Default,
    V: Data + Default
{
    /// Arranges `self` by key (keys are routed and indexed via `hashed()`),
    /// applies `logic` per key, and returns the result as a plain collection.
    fn group<L, V2>(&self, logic: L) -> Collection<G, (K, V2)>
    where
        V2: Data,
        L: Fn(&K, &mut CollectionIterator<DifferenceIterator<V>>, &mut Vec<(V2, Delta)>) + 'static
    {
        // Step 1: build the keyed arrangement of the input.
        let arranged = self.arrange_by_key(|key| key.hashed(), |_| HashMap::new());
        // Step 2: run the user logic over the arrangement, with a fresh hash index for the output.
        let grouped = arranged.group(|key| key.hashed(), |_| HashMap::new(), logic);
        // Step 3: expose the arranged output as a collection again.
        grouped.as_collection()
    }
}
/// Extension trait that pairs each record with a `Delta` count.
pub trait Count<G, K>
where
    G: Scope,
    G::Timestamp: LeastUpperBound,
    K: Data
{
    /// Returns a collection of `(record, count)` pairs.
    fn count(&self) -> Collection<G, (K, Delta)>;
}
/// `Count` implemented on top of `Group::group`.
impl<G, K> Count<G, K> for Collection<G, K>
where
    G: Scope,
    G::Timestamp: LeastUpperBound,
    K: Data + Default
{
    /// Pairs every record with `()` so it can be grouped, then reports the
    /// weight of the first value of each group as that record's count.
    fn count(&self) -> Collection<G, (K, Delta)> {
        let keyed = self.map(|record| (record, ()));
        keyed.group(|_key, values, output| {
            // The arranged `group` operator in this file only calls the logic
            // when the group's iterator is non-empty, so `next()` yields an item.
            let first = values.next().unwrap();
            output.push((first.1, 1))
        })
    }
}
/// Variant of `Group` for collections keyed by unsigned integers.
pub trait GroupUnsigned<G, U, V>
where
    G: Scope,
    G::Timestamp: LeastUpperBound,
    U: Unsigned + Data + Default,
    V: Data
{
    /// Groups records by their unsigned key and runs `logic` on each group.
    ///
    /// `logic` is handed a reference to the key, an iterator over the values
    /// currently associated with that key, and an output vector into which it
    /// pushes `(value, weight)` pairs.
    fn group_u<L, V2>(&self, logic: L) -> Collection<G, (U, V2)>
    where
        V2: Data,
        L: Fn(&U, &mut CollectionIterator<DifferenceIterator<V>>, &mut Vec<(V2, i32)>) + 'static;
}
/// `GroupUnsigned` for keyed collections: arranges the data by key using a
/// `VecMap`-backed index, then delegates to the arranged `group` operator.
impl<G, U, V> GroupUnsigned<G, U, V> for Collection<G, (U, V)>
where
    G: Scope,
    G::Timestamp: LeastUpperBound,
    U: Unsigned + Data + Default,
    V: Data
{
    /// Arranges `self` by key (keys are routed and indexed via `as_u64()`),
    /// applies `logic` per key, and returns the result as a plain collection.
    fn group_u<L, V2>(&self, logic: L) -> Collection<G, (U, V2)>
    where
        V2: Data,
        L: Fn(&U, &mut CollectionIterator<DifferenceIterator<V>>, &mut Vec<(V2, i32)>) + 'static
    {
        // Step 1: build the keyed arrangement; the index pairs a `VecMap` with the supplied parameter.
        let arranged = self.arrange_by_key(|key| key.as_u64(), |param| (VecMap::new(), param));
        // Step 2: run the user logic, building the output index the same way.
        let grouped = arranged.group(|key| key.as_u64(), |param| (VecMap::new(), param), logic);
        // Step 3: expose the arranged output as a collection again.
        grouped.as_collection()
    }
}
/// Variant of `Count` for collections of unsigned integers.
pub trait CountUnsigned<G, U>
where
    G: Scope,
    G::Timestamp: LeastUpperBound,
    U: Unsigned + Data + Default
{
    /// Returns a collection of `(record, count)` pairs.
    fn count_u(&self) -> Collection<G, (U, Delta)>;
}
/// `CountUnsigned` implemented on top of `GroupUnsigned::group_u`.
impl<G, U> CountUnsigned<G, U> for Collection<G, U>
where
    G: Scope,
    G::Timestamp: LeastUpperBound,
    U: Unsigned + Data + Default
{
    /// Pairs every record with `()` so it can be grouped, then reports the
    /// weight of the first value of each group as that record's count.
    fn count_u(&self) -> Collection<G, (U, Delta)> {
        let keyed = self.map(|record| (record, ()));
        keyed.group_u(|_key, values, output| {
            // The arranged `group` operator in this file only calls the logic
            // when the group's iterator is non-empty, so `next()` yields an item.
            let first = values.next().unwrap();
            output.push((first.1, 1))
        })
    }
}
/// Grouping over data that has already been arranged by key.
///
/// Implemented in this file for `Arranged<G, BasicTrace<K, G::Timestamp, V, L>>`.
pub trait GroupArranged<G, K, V>
where
    G: Scope,
    K: Data,
    V: Data
{
    /// Applies `logic` to each key's values and returns the output, itself arranged.
    ///
    /// * `key_h` maps a key to an unsigned value, used when ordering keys.
    /// * `look` constructs the key lookup structure backing the output trace.
    /// * `logic` is handed a reference to the key, an iterator over the key's
    ///   values, and an output vector into which it pushes `(value, weight)` pairs.
    fn group<V2, U, KH, Look, LookG, Logic>(
        &self,
        key_h: KH,
        look: LookG,
        logic: Logic
    ) -> Arranged<G, BasicTrace<K, G::Timestamp, V2, Look>>
    where
        G::Timestamp: LeastUpperBound,
        V2: Data,
        U: Unsigned + Default,
        KH: Fn(&K) -> U + 'static,
        Look: Lookup<K, Offset> + 'static,
        LookG: Fn(u64) -> Look,
        Logic: Fn(&K, &mut CollectionIterator<DifferenceIterator<V>>, &mut Vec<(V2, i32)>) + 'static;
}
/// `GroupArranged` for an arrangement backed by a `BasicTrace`.
///
/// The operator reads the shared input trace, re-evaluates `logic` for keys at
/// the times they may change, and records the resulting output differences in a
/// new trace that is returned together with the output stream.
impl<G, K, V, L> GroupArranged<G, K, V> for Arranged<G, BasicTrace<K, G::Timestamp, V, L>>
where
G: Scope,
K: Data,
V: Data,
L: Lookup<K, Offset>+'static,
G::Timestamp: LeastUpperBound {
/// Builds the "GroupArranged" operator.
///
/// * `key_h` - maps a key to an unsigned value; used here only to order keys before processing.
/// * `look` - builds the lookup structure for the output trace; it is called once with
///   the base-2 logarithm (rounded down) of the number of peers.
/// * `logic` - per-key user function; called only when the key's input collection at the
///   current time is non-empty, and expected to push `(value, weight)` pairs into the buffer.
///
/// Returns the output stream paired with the shared output trace.
fn group<V2, U, KH, Look, LookG, Logic>(&self, key_h: KH, look: LookG, logic: Logic)
-> Arranged<G, BasicTrace<K,G::Timestamp,V2,Look>>
where
V2: Data,
U: Unsigned+Default,
KH: Fn(&K)->U+'static,
Look: Lookup<K, Offset>+'static,
LookG: Fn(u64)->Look,
Logic: Fn(&K, &mut CollectionIterator<DifferenceIterator<V>>, &mut Vec<(V2, i32)>)+'static {
// Compute the largest `log_peers` such that `1 << log_peers <= peers`.
let peers = self.stream.scope().peers();
let mut log_peers = 0;
while (1 << (log_peers + 1)) <= peers {
log_peers += 1;
}
// Shared handle to the input trace, and a freshly created trace for the output.
let source = self.trace.clone();
let result = Rc::new(RefCell::new(BasicTrace::new(look(log_peers))));
let target = result.clone();
// `inputs`: keys received, indexed by the time they arrived at.
// `to_do`:  keys that must be (re-)evaluated, indexed by the time to evaluate them at.
// `buffer`: scratch space that `logic` writes its output into; reused across keys.
let mut inputs = LinearMap::new();
let mut to_do = LinearMap::new();
let mut buffer = vec![];
let stream = self.stream.unary_notify(Pipeline, "GroupArranged", vec![], move |input, output, notificator| {
// Stash the keys of each incoming batch and ask to be notified for its time.
// NOTE(review): only the first field of the first element of `data` is kept, and only when
// no entry exists yet for this time — presumably upstream sends one batch per time; confirm.
input.for_each(|time, data| {
inputs.entry_or_insert(time.time(), || {
notificator.notify_at(time);
data.drain(..).next().unwrap().0
});
});
notificator.for_each(|capability, _count, notificator| {
let time = capability.time();
// Phase 1: for keys that arrived at `time`, schedule work at every time of interest.
if let Some(queue) = inputs.remove_key(&time) {
// Immutable borrow of the input trace, scoped to this block so that it is
// released before the mutable borrow taken in phase 2.
let source = source.borrow();
let mut stash = Vec::new();
for key in queue {
// Only keys that actually have a difference at `time` need attention.
if source.get_difference(&key, &time).is_some() {
// The current time itself, plus whatever times the trace reports as interesting.
stash.push(capability.time());
source.interesting_times(&key, &time, &mut stash);
for new_time in &stash {
// The first key scheduled at `new_time` also requests the notification for it.
to_do.entry_or_insert(new_time.clone(), || {
notificator.notify_at(capability.delayed(new_time));
Vec::new()
})
.push(key.clone());
}
stash.clear();
}
}
}
// Phase 2: evaluate `logic` for every key scheduled at `time`.
if let Some(mut keys) = to_do.remove_key(&time) {
// Order keys by (hash, key) and drop duplicates so each key is processed once.
keys.sort_by(|x,y| (key_h(&x), x).cmp(&(key_h(&y), y)));
keys.dedup();
// Collects the output differences produced at `time` across all keys.
let mut accumulation = Compact::new(0,0);
let mut source_borrow = source.borrow_mut();
for key in keys {
// Run the user logic only if the key has any input at this time;
// otherwise `buffer` stays empty.
let mut input = source_borrow.get_collection(&key, &time);
if input.peek().is_some() { logic(&key, &mut input, &mut buffer); }
// Sort by value so the buffer can be merged with the existing output below.
buffer.sort_by(|x,y| x.0.cmp(&y.0));
let mut compact = accumulation.session();
{
// Merge the negated existing output for `key` with the newly computed output;
// after coalescing, what is pushed is presumably the change needed to move
// from the old output to the new one — TODO confirm `Coalesce` semantics.
let mut borrow = target.borrow_mut();
let iter = borrow.get_collection(&key, &time)
.map(|(v, w)| (v,-w))
.merge_by(buffer.iter().map(|&(ref v, w)| (v, w)), |x,y| {
x.0 <= y.0
});
for (val, wgt) in Coalesce::coalesce(iter) {
compact.push(val.clone(), wgt);
}
}
compact.done(key);
buffer.clear();
}
// If anything changed, send the differences downstream and record them in the output trace.
if accumulation.vals.len() > 0 {
output.session(&capability).give((
accumulation.keys.clone(),
accumulation.cnts.clone(),
accumulation.vals.clone()
));
target.borrow_mut().set_difference(time, accumulation);
}
}
});
});
Arranged { stream: stream, trace: result }
}
}