differential_dataflow/algorithms/graphs/
bfs.rs1use std::hash::Hash;
4
5use timely::dataflow::*;
6
7use crate::{VecCollection, ExchangeData};
8use crate::operators::*;
9use crate::lattice::Lattice;
10
11pub fn bfs<G, N>(edges: VecCollection<G, (N,N)>, roots: VecCollection<G, N>) -> VecCollection<G, (N,u32)>
13where
14 G: Scope<Timestamp: Lattice+Ord>,
15 N: ExchangeData+Hash,
16{
17 let edges = edges.arrange_by_key();
18 bfs_arranged(edges, roots)
19}
20
21use crate::trace::TraceReader;
22use crate::operators::arrange::Arranged;
23
24pub fn bfs_arranged<G, N, Tr>(edges: Arranged<G, Tr>, roots: VecCollection<G, N>) -> VecCollection<G, (N, u32)>
26where
27 G: Scope<Timestamp=Tr::Time>,
28 N: ExchangeData+Hash,
29 Tr: for<'a> TraceReader<Key<'a>=&'a N, Val<'a>=&'a N, Diff=isize>+Clone+'static,
30{
31 let nodes = roots.map(|x| (x, 0));
33
34 nodes.clone().iterate(|scope, inner| {
36
37 let edges = edges.enter(&scope);
38 let nodes = nodes.enter(&scope);
39
40 inner.join_core(edges, |_k,l,d| Some((d.clone(), l+1)))
41 .concat(nodes)
42 .reduce(|_, s, t| t.push((*s[0].0, 1)))
43 })
44}