Skip to main content

differential_dataflow/algorithms/graphs/
bijkstra.rs

1//! Bi-directional Dijkstra distance labeling.
2
3use std::hash::Hash;
4
5use timely::order::Product;
6use timely::progress::Timestamp;
7
8use crate::{VecCollection, ExchangeData};
9use crate::lattice::Lattice;
10use crate::operators::iterate::Variable;
11
12/// Returns the subset of `goals` that can reach each other in `edges`, with distance.
13///
14/// This method performs bidirectional search, from both ends of each goal in forward
15/// and reverse direction, for the sources and targets respectively. Each search can
16/// examine a fraction of the graph before meeting, and multiple searches can be managed
17/// concurrently.
18///
19/// Goals that cannot reach from the source to the target are relatively expensive, as
20/// the entire graph must be explored to confirm this. A graph connectivity pre-filter
21/// could be good insurance here.
22pub fn bidijkstra<'scope, T, N>(edges: VecCollection<'scope, T, (N,N)>, goals: VecCollection<'scope, T, (N,N)>) -> VecCollection<'scope, T, ((N,N), u32)>
23where
24    T: Timestamp + Lattice + Ord,
25    N: ExchangeData+Hash,
26{
27    let forward = edges.clone().arrange_by_key();
28    let reverse = edges.map(|(x,y)| (y,x)).arrange_by_key();
29    bidijkstra_arranged(forward, reverse, goals)
30}
31
32use crate::trace::TraceReader;
33use crate::operators::arrange::Arranged;
34
35/// Bi-directional Dijkstra search using arranged forward and reverse edge collections.
36pub fn bidijkstra_arranged<'scope, N, Tr>(
37    forward: Arranged<'scope, Tr>,
38    reverse: Arranged<'scope, Tr>,
39    goals: VecCollection<'scope, Tr::Time, (N,N)>
40) -> VecCollection<'scope, Tr::Time, ((N,N), u32)>
41where
42    N: ExchangeData+Hash,
43    Tr: for<'a> TraceReader<Key<'a>=&'a N, Val<'a>=&'a N, Diff=isize>+Clone+'static,
44{
45    let outer = forward.stream.scope();
46    outer.iterative::<u64,_,_>(|inner| {
47
48            let forward_edges = forward.enter(inner);
49            let reverse_edges = reverse.enter(inner);
50
51        // Our plan is to start evolving distances from both sources and destinations.
52        // The evolution from a source or destination should continue as long as there
53        // is a corresponding destination or source that has not yet been reached.
54
55        // forward and reverse (node, (root, dist))
56        let (forward_bind, forward) = Variable::new_from(goals.clone().map(|(x,_)| (x.clone(),(x.clone(),0))).enter(inner), Product::new(Default::default(), 1));
57        let (reverse_bind, reverse) = Variable::new_from(goals.clone().map(|(_,y)| (y.clone(),(y.clone(),0))).enter(inner), Product::new(Default::default(), 1));
58
59        forward.clone().map(|_| ()).consolidate().inspect(|x| println!("forward: {:?}", x));
60        reverse.clone().map(|_| ()).consolidate().inspect(|x| println!("reverse: {:?}", x));
61
62        let goals = goals.enter(inner);
63        // let edges = edges.enter(inner);
64
65        // Let's determine which (src, dst) pairs are ready to return.
66        //
67        //   done(src, dst) := forward(src, med), reverse(dst, med), goal(src, dst).
68        //
69        // This is a cyclic join, which should scare us a bunch.
70        let reached =
71        forward
72            .clone()
73            .join_map(reverse.clone(), |_, (src,d1), (dst,d2)| ((src.clone(), dst.clone()), *d1 + *d2))
74            .reduce(|_key, s, t| t.push((*s[0].0, 1)))
75            .semijoin(goals.clone());
76
77        let active =
78        reached
79            .clone()
80            .negate()
81            .map(|(srcdst,_)| srcdst)
82            .concat(goals)
83            .consolidate();
84
85        // Let's expand out forward queries that are active.
86        let forward_active = active.clone().map(|(x,_y)| x).distinct();
87        let forward_next =
88        forward
89            .clone()
90            .map(|(med, (src, dist))| (src, (med, dist)))
91            .semijoin(forward_active)
92            .map(|(src, (med, dist))| (med, (src, dist)))
93            .join_core(forward_edges, |_med, (src, dist), next| Some((next.clone(), (src.clone(), *dist+1))))
94            .concat(forward)
95            .map(|(next, (src, dist))| ((next, src), dist))
96            .reduce(|_key, s, t| t.push((*s[0].0, 1)))
97            .map(|((next, src), dist)| (next, (src, dist)));
98
99        forward_next.clone().map(|_| ()).consolidate().inspect(|x| println!("forward_next: {:?}", x));
100
101        forward_bind.set(forward_next);
102
103        // Let's expand out reverse queries that are active.
104        let reverse_active = active.map(|(_x,y)| y).distinct();
105        let reverse_next =
106        reverse
107            .clone()
108            .map(|(med, (rev, dist))| (rev, (med, dist)))
109            .semijoin(reverse_active)
110            .map(|(rev, (med, dist))| (med, (rev, dist)))
111            .join_core(reverse_edges, |_med, (rev, dist), next| Some((next.clone(), (rev.clone(), *dist+1))))
112            .concat(reverse)
113            .map(|(next, (rev, dist))| ((next, rev), dist))
114            .reduce(|_key, s, t| t.push((*s[0].0, 1)))
115            .map(|((next,rev), dist)| (next, (rev, dist)));
116
117        reverse_next.clone().map(|_| ()).consolidate().inspect(|x| println!("reverse_next: {:?}", x));
118
119        reverse_bind.set(reverse_next);
120
121        reached.leave(outer)
122    })
123}