use std::hash::Hash;
use timely::dataflow::*;
use crate::hashable::Hashable;
use crate::lattice::Lattice;
use crate::operators::*;
use crate::{ExchangeData, VecCollection};
/// Picks a `u32` label for each node by running [`sequence`] over `edges`.
///
/// Every node that appears as the first component of an edge is seeded with
/// the placeholder value `u32::MAX`. The per-node logic then returns the
/// first positive integer `i` (starting at 1) for which the `(i - 1)`-th
/// entry of the supplied values is missing or differs from `i`.
///
/// NOTE(review): this yields "the smallest positive integer not among the
/// neighbor values" only if `vals` arrives sorted ascending with distinct
/// values; that is presumably guaranteed by the `reduce` used inside
/// `sequence` — confirm against the operator's documentation.
fn _color<G, N>(edges: &VecCollection<G, (N, N)>) -> VecCollection<G, (N, Option<u32>)>
where
    G: Scope<Timestamp: Lattice + Ord + Hash>,
    N: ExchangeData + Hash,
{
    // Placeholder initial state: one `(node, u32::MAX)` record per edge source.
    let start = edges.map(|(src, _dst)| (src, u32::MAX)).distinct();

    sequence(&start, edges, |_node, vals| {
        (1u32..)
            .find(|&candidate| {
                // Candidates are 1-based, slots in `vals` are 0-based.
                let slot = candidate as usize - 1;
                vals.get(slot).map_or(true, |entry| *entry.0 != candidate)
            })
            .unwrap()
    })
}
/// Applies `logic` to nodes one after another, ordered by node identifier.
///
/// The computation is an iteration that starts with every node of `state`
/// mapped to `None`. In each round:
///
/// * edges whose source is smaller than their destination carry the
///   *current* (possibly still `None`) value of the source to the destination;
/// * edges whose source is larger than their destination carry the *original*
///   value from `state` to the destination;
/// * edges whose endpoints compare equal match neither filter and carry
///   nothing.
///
/// A destination that receives at least one `None` along a smaller-to-larger
/// edge is considered incomplete: all of its messages are suppressed and it
/// is reported as `(node, None)`. Every other destination that received
/// messages is reported as `(node, Some(logic(node, values)))`, where `values`
/// is the slice handed to the `reduce` closure for that node.
///
/// # Parameters
///
/// * `state` - initial `(node, value)` pairs.
/// * `edges` - `(source, destination)` pairs along which values travel.
/// * `logic` - computes a node's new value from the values it received.
///
/// # Returns
///
/// The collection of `(node, Option<value>)` pairs produced by the iteration.
pub fn sequence<G, N, V, F>(
    state: &VecCollection<G, (N, V)>,
    edges: &VecCollection<G, (N, N)>,
    logic: F,
) -> VecCollection<G, (N, Option<V>)>
where
    G: Scope<Timestamp: Lattice + Hash + Ord>,
    N: ExchangeData + Hashable,
    V: ExchangeData,
    F: Fn(&N, &[(&V, isize)]) -> V + 'static,
{
    // Nobody has a computed value at the start of the iteration.
    let unset = state.map(|(node, _state)| (node, None));

    unset.iterate(|new_state| {
        // Bring the loop-invariant inputs into the iteration scope.
        let inner = new_state.scope();
        let links = edges.enter(&inner);
        let original = state.enter(&inner);

        // Split edges by direction relative to the node ordering.
        let ascending = links.filter(|(src, dst)| src < dst);
        let descending = links.filter(|(src, dst)| src > dst);

        // Current values travel from smaller to larger identifiers.
        let forwarded =
            new_state.join_map(&ascending, |_src, value, dst| (dst.clone(), value.clone()));

        // Destinations still waiting on at least one smaller neighbor.
        let blocked = forwarded
            .filter(|(_dst, value)| value.is_none())
            .map(|(dst, _value)| dst)
            .distinct();

        // Values that have actually been computed, with the `Option` removed.
        let settled = forwarded
            .filter(|(_dst, value)| value.is_some())
            .map(|(dst, value)| (dst, value.unwrap()));

        // Original values travel from larger to smaller identifiers.
        let inherited =
            original.join_map(&descending, |_src, value, dst| (dst.clone(), value.clone()));

        // Drop every message addressed to a node that is not yet ready.
        let inbox = settled.concat(&inherited).antijoin(&blocked);

        // Ready nodes get a fresh value; blocked nodes stay at `None`.
        let fired = inbox.reduce(move |node, received, output| {
            output.push((Some(logic(node, received)), 1));
        });
        let waiting = blocked.map(|node| (node, None));

        fired.concat(&waiting)
    })
}