differential_dataflow/algorithms/graphs/
bfs.rs1use std::hash::Hash;
4
5use timely::progress::Timestamp;
6
7use crate::{VecCollection, ExchangeData};
8use crate::operators::*;
9use crate::lattice::Lattice;
10
11pub fn bfs<'scope, T, N>(edges: VecCollection<'scope, T, (N,N)>, roots: VecCollection<'scope, T, N>) -> VecCollection<'scope, T, (N,u32)>
13where
14 T: Timestamp + Lattice + Ord,
15 N: ExchangeData+Hash,
16{
17 let edges = edges.arrange_by_key();
18 bfs_arranged(edges, roots)
19}
20
21use crate::trace::TraceReader;
22use crate::operators::arrange::Arranged;
23
24pub fn bfs_arranged<'scope, N, Tr>(edges: Arranged<'scope, Tr>, roots: VecCollection<'scope, Tr::Time, N>) -> VecCollection<'scope, Tr::Time, (N, u32)>
26where
27 N: ExchangeData+Hash,
28 Tr: for<'a> TraceReader<Key<'a>=&'a N, Val<'a>=&'a N, Diff=isize>+Clone+'static,
29{
30 let nodes = roots.map(|x| (x, 0));
32
33 nodes.clone().iterate(|scope, inner| {
35
36 let edges = edges.enter(scope);
37 let nodes = nodes.enter(scope);
38
39 inner.join_core(edges, |_k,l,d| Some((d.clone(), l+1)))
40 .concat(nodes)
41 .reduce(|_, s, t| t.push((*s[0].0, 1)))
42 })
43}