use std::hash::Hash;
use timely::dataflow::*;
use ::{Collection, Data};
use ::lattice::Lattice;
use ::operators::*;
use hashable::Hashable;
fn _color<G, N>(edges: &Collection<G, (N,N)>) -> Collection<G,(N,Option<u32>)>
where
G: Scope,
G::Timestamp: Lattice+Ord,
N: Data+Hash,
{
let start = edges.map(|(x,_y)| (x,u32::max_value()))
.distinct();
sequence(&start, &edges, |_node, vals| {
(1u32 ..)
.filter(|&i| vals.get(i as usize - 1).map(|x| *x.0) != Some(i))
.next()
.unwrap()
})
}
pub fn sequence<G, N, V, F>(
state: &Collection<G, (N,V)>,
edges: &Collection<G, (N,N)>,
logic: F) -> Collection<G, (N,Option<V>)>
where
G: Scope,
G::Timestamp: Lattice+Hash+Ord,
N: Data+Hashable,
V: Data,
F: Fn(&N, &[(&V, isize)])->V+'static
{
let timer = ::std::time::Instant::now();
state
.map(|(node, _state)| (node, None))
.iterate(|new_state| {
new_state.map(|x| x.1.is_some()).consolidate().inspect(move |x| println!("{:?}\t{:?}", timer.elapsed(), x));
let edges = edges.enter(&new_state.scope());
let old_state = state.enter(&new_state.scope());
let forward = edges.filter(|edge| edge.0 < edge.1);
let reverse = edges.filter(|edge| edge.0 > edge.1);
let new_messages = new_state.join_map(&forward, |_k,v,d| (d.clone(),v.clone()));
let incomplete = new_messages.filter(|x| x.1.is_none()).map(|x| x.0).distinct();
let new_messages = new_messages.filter(|x| x.1.is_some()).map(|x| (x.0, x.1.unwrap()));
let old_messages = old_state.join_map(&reverse, |_k,v,d| (d.clone(),v.clone()));
let messages = new_messages.concat(&old_messages).antijoin(&incomplete);
messages
.reduce(move |k, vs, t| t.push((Some(logic(k,vs)),1)))
.concat(&incomplete.map(|x| (x, None)))
})
}