use std::hash::Hash;
use timely::progress::Timestamp;
use crate::{VecCollection, ExchangeData};
use crate::operators::*;
use crate::lattice::Lattice;
pub fn bfs<'scope, T, N>(edges: VecCollection<'scope, T, (N,N)>, roots: VecCollection<'scope, T, N>) -> VecCollection<'scope, T, (N,u32)>
where
T: Timestamp + Lattice + Ord,
N: ExchangeData+Hash,
{
let edges = edges.arrange_by_key();
bfs_arranged(edges, roots)
}
use crate::trace::TraceReader;
use crate::operators::arrange::Arranged;
pub fn bfs_arranged<'scope, N, Tr>(edges: Arranged<'scope, Tr>, roots: VecCollection<'scope, Tr::Time, N>) -> VecCollection<'scope, Tr::Time, (N, u32)>
where
N: ExchangeData+Hash,
Tr: for<'a> TraceReader<Key<'a>=&'a N, Val<'a>=&'a N, Diff=isize>+Clone+'static,
{
let nodes = roots.map(|x| (x, 0));
nodes.clone().iterate(|scope, inner| {
let edges = edges.enter(scope);
let nodes = nodes.enter(scope);
inner.join_core(edges, |_k,l,d| Some((d.clone(), l+1)))
.concat(nodes)
.reduce(|_, s, t| t.push((*s[0].0, 1)))
})
}