differential_dataflow/algorithms/graphs/
bijkstra.rs

1//! Bi-directional Dijkstra distance labeling.
2
3use std::hash::Hash;
4
5use timely::order::Product;
6use timely::dataflow::*;
7
8use crate::{Collection, ExchangeData};
9use crate::operators::*;
10use crate::lattice::Lattice;
11use crate::operators::iterate::Variable;
12
13/// Returns the subset of `goals` that can reach each other in `edges`, with distance.
14///
15/// This method performs bidirectional search, from both ends of each goal in forward
16/// and reverse direction, for the sources and targets respectively. Each search can
17/// examine a fraction of the graph before meeting, and multiple searches can be managed
18/// concurrently.
19///
20/// Goals that cannot reach from the source to the target are relatively expensive, as
21/// the entire graph must be explored to confirm this. A graph connectivity pre-filter
22/// could be good insurance here.
23pub fn bidijkstra<G, N>(edges: &Collection<G, (N,N)>, goals: &Collection<G, (N,N)>) -> Collection<G, ((N,N), u32)>
24where
25    G: Scope,
26    G::Timestamp: Lattice+Ord,
27    N: ExchangeData+Hash,
28{
29    use crate::operators::arrange::arrangement::ArrangeByKey;
30    let forward = edges.arrange_by_key();
31    let reverse = edges.map(|(x,y)| (y,x)).arrange_by_key();
32    bidijkstra_arranged(&forward, &reverse, goals)
33}
34
35use crate::trace::TraceReader;
36use crate::operators::arrange::Arranged;
37
38/// Bi-directional Dijkstra search using arranged forward and reverse edge collections.
39pub fn bidijkstra_arranged<G, N, Tr>(
40    forward: &Arranged<G, Tr>,
41    reverse: &Arranged<G, Tr>,
42    goals: &Collection<G, (N,N)>
43) -> Collection<G, ((N,N), u32)>
44where
45    G: Scope,
46    G::Timestamp: Lattice+Ord,
47    N: ExchangeData+Hash,
48    Tr: for<'a> TraceReader<Key<'a>=&'a N, Val<'a>=&'a N, Time=G::Timestamp, Diff=isize>+Clone+'static,
49{
50    forward
51        .stream
52        .scope().iterative::<u64,_,_>(|inner| {
53
54            let forward_edges = forward.enter(inner);
55            let reverse_edges = reverse.enter(inner);
56
57        // Our plan is to start evolving distances from both sources and destinations.
58        // The evolution from a source or destination should continue as long as there
59        // is a corresponding destination or source that has not yet been reached.
60
61        // forward and reverse (node, (root, dist))
62        let forward = Variable::new_from(goals.map(|(x,_)| (x.clone(),(x.clone(),0))).enter(inner), Product::new(Default::default(), 1));
63        let reverse = Variable::new_from(goals.map(|(_,y)| (y.clone(),(y.clone(),0))).enter(inner), Product::new(Default::default(), 1));
64
65        forward.map(|_| ()).consolidate().inspect(|x| println!("forward: {:?}", x));
66        reverse.map(|_| ()).consolidate().inspect(|x| println!("reverse: {:?}", x));
67
68        let goals = goals.enter(inner);
69        // let edges = edges.enter(inner);
70
71        // Let's determine which (src, dst) pairs are ready to return.
72        //
73        //   done(src, dst) := forward(src, med), reverse(dst, med), goal(src, dst).
74        //
75        // This is a cyclic join, which should scare us a bunch.
76        let reached =
77        forward
78            .join_map(&reverse, |_, (src,d1), (dst,d2)| ((src.clone(), dst.clone()), *d1 + *d2))
79            .reduce(|_key, s, t| t.push((*s[0].0, 1)))
80            .semijoin(&goals);
81
82        let active =
83        reached
84            .negate()
85            .map(|(srcdst,_)| srcdst)
86            .concat(&goals)
87            .consolidate();
88
89        // Let's expand out forward queries that are active.
90        let forward_active = active.map(|(x,_y)| x).distinct();
91        let forward_next =
92        forward
93            .map(|(med, (src, dist))| (src, (med, dist)))
94            .semijoin(&forward_active)
95            .map(|(src, (med, dist))| (med, (src, dist)))
96            .join_core(&forward_edges, |_med, (src, dist), next| Some((next.clone(), (src.clone(), *dist+1))))
97            .concat(&forward)
98            .map(|(next, (src, dist))| ((next, src), dist))
99            .reduce(|_key, s, t| t.push((*s[0].0, 1)))
100            .map(|((next, src), dist)| (next, (src, dist)));
101
102        forward_next.map(|_| ()).consolidate().inspect(|x| println!("forward_next: {:?}", x));
103
104        forward.set(&forward_next);
105
106        // Let's expand out reverse queries that are active.
107        let reverse_active = active.map(|(_x,y)| y).distinct();
108        let reverse_next =
109        reverse
110            .map(|(med, (rev, dist))| (rev, (med, dist)))
111            .semijoin(&reverse_active)
112            .map(|(rev, (med, dist))| (med, (rev, dist)))
113            .join_core(&reverse_edges, |_med, (rev, dist), next| Some((next.clone(), (rev.clone(), *dist+1))))
114            .concat(&reverse)
115            .map(|(next, (rev, dist))| ((next, rev), dist))
116            .reduce(|_key, s, t| t.push((*s[0].0, 1)))
117            .map(|((next,rev), dist)| (next, (rev, dist)));
118
119        reverse_next.map(|_| ()).consolidate().inspect(|x| println!("reverse_next: {:?}", x));
120
121        reverse.set(&reverse_next);
122
123        reached.leave()
124    })
125}