use std::hash::Hash;
use timely::progress::Timestamp;
use crate::{VecCollection, ExchangeData};
use crate::lattice::Lattice;
use crate::operators::*;
use crate::hashable::Hashable;
fn _color<'scope, T, N>(edges: VecCollection<'scope, T, (N,N)>) -> VecCollection<'scope, T,(N,Option<u32>)>
where
T: Timestamp + Lattice + Ord + Hash,
N: ExchangeData+Hash,
{
let start = edges.clone()
.map(|(x,_y)| (x,u32::max_value()))
.distinct();
sequence(start, edges, |_node, vals| {
(1u32 ..)
.find(|&i| vals.get(i as usize - 1).map(|x| *x.0) != Some(i))
.unwrap()
})
}
pub fn sequence<'scope, T, N, V, F>(
state: VecCollection<'scope, T, (N,V)>,
edges: VecCollection<'scope, T, (N,N)>,
logic: F) -> VecCollection<'scope, T, (N,Option<V>)>
where
T: Timestamp + Lattice + Hash + Ord,
N: ExchangeData+Hashable,
V: ExchangeData,
F: Fn(&N, &[(&V, isize)])->V+'static
{
state
.clone()
.map(|(node, _state)| (node, None))
.iterate(|scope, new_state| {
let edges = edges.enter(scope);
let old_state = state.enter(scope);
let forward = edges.clone().filter(|edge| edge.0 < edge.1);
let reverse = edges.filter(|edge| edge.0 > edge.1);
let new_messages = new_state.join_map(forward, |_k,v,d| (d.clone(),v.clone()));
let incomplete = new_messages.clone().filter(|x| x.1.is_none()).map(|x| x.0).distinct();
let new_messages = new_messages.filter(|x| x.1.is_some()).map(|x| (x.0, x.1.unwrap()));
let old_messages = old_state.join_map(reverse, |_k,v,d| (d.clone(),v.clone()));
let messages = new_messages.concat(old_messages).antijoin(incomplete.clone());
messages
.reduce(move |k, vs, t| t.push((Some(logic(k,vs)),1)))
.concat(incomplete.map(|x| (x, None)))
})
}