use std::rc::Rc;
use std::cell::RefCell;
use std::default::Default;
use std::ops::DerefMut;
use linear_map::LinearMap;
use timely::dataflow::*;
use timely::dataflow::operators::{Map, Unary};
use timely::dataflow::channels::pact::Exchange;
use timely_sort::{LSBRadixSorter, Unsigned};
use ::{Data, Collection};
use collection::{LeastUpperBound, Lookup, Trace, TraceRef};
use collection::basic::{BasicTrace, Offset};
use collection::count::Count;
use collection::compact::Compact;
/// A keyed arrangement of a collection: a stream of compacted batches plus a shared trace.
///
/// Each record on `stream` is a triple of parallel vectors `(keys, cnts, vals)`. As consumed
/// by `as_collection`, `cnts[i]` is the number of consecutive entries of `vals` that belong
/// to `keys[i]`, and every entry of `vals` is a `(value, i32)` pair whose `i32` is carried
/// through as the record's weight.
pub struct Arranged<G, T>
where
    G: Scope,
    T: Trace<Index=G::Timestamp>,
    T::Key: Data,
    T::Value: Data,
    G::Timestamp: LeastUpperBound,
    for<'a> &'a T: TraceRef<'a, T::Key, T::Index, T::Value>,
{
    /// Stream of `(keys, cnts, vals)` batches.
    pub stream: Stream<G, (Vec<T::Key>, Vec<u32>, Vec<(T::Value, i32)>)>,
    /// Shared, interior-mutable handle to the trace behind this arrangement.
    pub trace: Rc<RefCell<T>>,
}
impl<G, T> Arranged<G, T>
where
    G: Scope,
    T: Trace<Index=G::Timestamp>,
    T::Key: Data,
    T::Value: Data,
    G::Timestamp: LeastUpperBound,
    for<'a> &'a T: TraceRef<'a, T::Key, T::Index, T::Value>,
{
    /// Flattens the arranged batches back into a collection of `(key, value)` records.
    ///
    /// Every batch `(keys, cnts, vals)` is expanded by repeating `keys[i]` exactly `cnts[i]`
    /// times and pairing the repeated keys, in order, with the `(value, weight)` entries of
    /// `vals`. Each resulting record is `((key, value), weight)`.
    pub fn as_collection(&self) -> Collection<G, (T::Key, T::Value)> {
        let records = self.stream.flat_map(|(keys, cnts, vals)| {
            // One clone of the key per value recorded for it.
            let repeated_keys = keys
                .into_iter()
                .zip(cnts)
                .flat_map(|(key, count)| (0..count).map(move |_| key.clone()));

            repeated_keys
                .zip(vals)
                .map(|(key, (val, weight))| ((key, val), weight))
        });

        Collection::new(records)
    }
}
/// Arranges a collection of `(key, value)` records by key.
pub trait ArrangeByKey<G: Scope, K: Data, V: Data> where G::Timestamp: LeastUpperBound {
    /// Builds an `Arranged` view of the collection backed by a `BasicTrace`.
    ///
    /// * `key_h` maps a key to an unsigned value. The implementation in this file uses it
    ///   both to exchange records (via `as_u64`) and to order them within a batch.
    /// * `look` constructs the `Lookup` handed to the trace. The implementation in this file
    ///   calls it once with the base-2 logarithm of the peer count, rounded down.
    fn arrange_by_key<U, KH, Look, LookG>(&self, key_h: KH, look: LookG)
        -> Arranged<G, BasicTrace<K, G::Timestamp, V, Look>>
    where
        U: Unsigned + Default,
        KH: Fn(&K) -> U + 'static,
        Look: Lookup<K, Offset> + 'static,
        LookG: Fn(u64) -> Look;
}
impl<G: Scope, K: Data, V: Data> ArrangeByKey<G, K, V> for Collection<G, (K, V)> where G::Timestamp: LeastUpperBound {
fn arrange_by_key<
U: Unsigned+Default,
KH: Fn(&K)->U+'static,
Look: Lookup<K, Offset>+'static,
LookG: Fn(u64)->Look,
>
(&self, key_h: KH, look: LookG) -> Arranged<G, BasicTrace<K,G::Timestamp,V,Look>> {
let peers = self.scope().peers();
let mut log_peers = 0;
while (1 << (log_peers + 1)) <= peers {
log_peers += 1;
}
let trace = Rc::new(RefCell::new(BasicTrace::new(look(log_peers))));
let source = Rc::downgrade(&trace);
let mut inputs = LinearMap::new();
let part1 = Rc::new(key_h);
let part2 = part1.clone();
let exch = Exchange::new(move |&((ref k, _),_): &((K,V),i32)| part1(k).as_u64());
let mut sorter = LSBRadixSorter::new();
let stream = self.inner.unary_notify(exch, "ArrangeByKey", vec![], move |input, output, notificator| {
input.for_each(|time, data| {
inputs.entry_or_insert(time.time(), || { notificator.notify_at(time); Vec::new() })
.push(::std::mem::replace(data.deref_mut(), Vec::new()));
});
notificator.for_each(|index, _count, _notificator| {
if let Some(mut queue) = inputs.remove_key(&index) {
let compact = if queue.len() > 1 {
for element in queue.into_iter() {
sorter.push_batch(element, &|x| part2(&(x.0).0));
}
let mut sorted = sorter.finish(&|x| part2(&(x.0).0));
let result = Compact::from_radix(&mut sorted, &|k| part2(k));
sorted.truncate(256);
sorter.recycle(sorted);
result
}
else {
let mut vec = queue.pop().unwrap();
vec.sort_by(|x,y| part2(&(x.0).0).cmp(&part2((&(y.0).0))));
Compact::from_radix(&mut vec![vec], &|k| part2(k))
};
if let Some(compact) = compact {
output.session(&index).give((compact.keys.clone(), compact.cnts.clone(), compact.vals.clone()));
if let Some(trace) = source.upgrade() {
trace.borrow_mut().set_difference(index.time(), compact);
}
}
}
});
});
Arranged { stream: stream, trace: trace }
}
}
/// Arranges a collection of bare records, using each record as its own key.
pub trait ArrangeBySelf<G: Scope, K: Data> {
    /// Builds an `Arranged` view of the collection backed by a `Count` trace.
    ///
    /// * `key_h` maps a record to an unsigned value. The implementation in this file uses it
    ///   both to exchange records (via `as_u64`) and to order them within a batch.
    /// * `look` constructs the `Lookup` handed to the trace. The implementation in this file
    ///   calls it once with the base-2 logarithm of the peer count, rounded down.
    fn arrange_by_self<U, KH, Look, LookG>(&self, key_h: KH, look: LookG)
        -> Arranged<G, Count<K, G::Timestamp, Look>>
    where
        U: Unsigned + Default,
        KH: Fn(&K) -> U + 'static,
        Look: Lookup<K, ::collection::count::Offset> + 'static,
        LookG: Fn(u64) -> Look,
        G::Timestamp: LeastUpperBound;
}
impl<G: Scope, K: Data> ArrangeBySelf<G, K> for Collection<G, K> where G::Timestamp: LeastUpperBound {
fn arrange_by_self<
U: Unsigned+Default,
KH: Fn(&K)->U+'static,
Look: Lookup<K, ::collection::count::Offset>+'static,
LookG: Fn(u64)->Look,
>
(&self, key_h: KH, look: LookG) -> Arranged<G, Count<K, G::Timestamp, Look>>
where G::Timestamp: LeastUpperBound {
let peers = self.scope().peers();
let mut log_peers = 0;
while (1 << (log_peers + 1)) <= peers {
log_peers += 1;
}
let trace = Rc::new(RefCell::new(Count::new(look(log_peers))));
let source = Rc::downgrade(&trace);
let mut inputs = LinearMap::new();
let part1 = Rc::new(key_h);
let part2 = part1.clone();
let exch = Exchange::new(move |&(ref k,_): &(K,i32)| part1(k).as_u64());
let mut sorter = LSBRadixSorter::new();
let stream = self.inner.unary_notify(exch, "ArrangeBySelf", vec![], move |input, output, notificator| {
while let Some((time, data)) = input.next() {
inputs.entry_or_insert(time.time(), || { notificator.notify_at(time); Vec::new() })
.push(::std::mem::replace(data.deref_mut(), Vec::new()));
}
while let Some((index, _count)) = notificator.next() {
if let Some(mut queue) = inputs.remove_key(&index) {
let compact = if queue.len() > 1 {
for element in queue.into_iter() {
sorter.extend(element.into_iter().map(|(d,w)| ((d,()),w)), &|x| part2(&(x.0).0));
}
let mut sorted = sorter.finish(&|x| part2(&(x.0).0));
let result = Compact::from_radix(&mut sorted, &|k| part2(k));
sorted.truncate(256);
sorter.recycle(sorted);
result
}
else {
let mut vec = queue.pop().unwrap();
let mut vec = vec.drain(..).map(|(d,w)| ((d,()),w)).collect::<Vec<_>>();
vec.sort_by(|x,y| part2(&(x.0).0).cmp(&part2((&(y.0).0))));
Compact::from_radix(&mut vec![vec], &|k| part2(k))
};
if let Some(compact) = compact {
output.session(&index).give((compact.keys.clone(), compact.cnts.clone(), compact.vals.clone()));
if let Some(trace) = source.upgrade() {
trace.borrow_mut().set_difference(index.time(), compact);
}
}
}
}
});
Arranged { stream: stream, trace: trace }
}
}