Skip to main content

differential_dataflow/algorithms/graphs/
bfs.rs

1//! Breadth-first distance labeling.
2
3use std::hash::Hash;
4
5use timely::progress::Timestamp;
6
7use crate::{VecCollection, ExchangeData};
8use crate::operators::*;
9use crate::lattice::Lattice;
10
11/// Returns pairs (node, dist) indicating distance of each node from a root.
12pub fn bfs<'scope, T, N>(edges: VecCollection<'scope, T, (N,N)>, roots: VecCollection<'scope, T, N>) -> VecCollection<'scope, T, (N,u32)>
13where
14    T: Timestamp + Lattice + Ord,
15    N: ExchangeData+Hash,
16{
17    let edges = edges.arrange_by_key();
18    bfs_arranged(edges, roots)
19}
20
21use crate::trace::TraceReader;
22use crate::operators::arrange::Arranged;
23
24/// Returns pairs (node, dist) indicating distance of each node from a root.
25pub fn bfs_arranged<'scope, N, Tr>(edges: Arranged<'scope, Tr>, roots: VecCollection<'scope, Tr::Time, N>) -> VecCollection<'scope, Tr::Time, (N, u32)>
26where
27    N: ExchangeData+Hash,
28    Tr: for<'a> TraceReader<Key<'a>=&'a N, Val<'a>=&'a N, Diff=isize>+Clone+'static,
29{
30    // initialize roots as reaching themselves at distance 0
31    let nodes = roots.map(|x| (x, 0));
32
33    // repeatedly update minimal distances each node can be reached from each root
34    nodes.clone().iterate(|scope, inner| {
35
36        let edges = edges.enter(scope);
37        let nodes = nodes.enter(scope);
38
39        inner.join_core(edges, |_k,l,d| Some((d.clone(), l+1)))
40             .concat(nodes)
41             .reduce(|_, s, t| t.push((*s[0].0, 1)))
42     })
43}