differential_dataflow/algorithms/graphs/
bijkstra.rs1use std::hash::Hash;
4
5use timely::order::Product;
6use timely::dataflow::*;
7
8use crate::{VecCollection, ExchangeData};
9use crate::lattice::Lattice;
10use crate::operators::iterate::Variable;
11
12pub fn bidijkstra<G, N>(edges: VecCollection<G, (N,N)>, goals: VecCollection<G, (N,N)>) -> VecCollection<G, ((N,N), u32)>
23where
24 G: Scope<Timestamp: Lattice+Ord>,
25 N: ExchangeData+Hash,
26{
27 let forward = edges.clone().arrange_by_key();
28 let reverse = edges.map(|(x,y)| (y,x)).arrange_by_key();
29 bidijkstra_arranged(forward, reverse, goals)
30}
31
32use crate::trace::TraceReader;
33use crate::operators::arrange::Arranged;
34
35pub fn bidijkstra_arranged<G, N, Tr>(
37 forward: Arranged<G, Tr>,
38 reverse: Arranged<G, Tr>,
39 goals: VecCollection<G, (N,N)>
40) -> VecCollection<G, ((N,N), u32)>
41where
42 G: Scope<Timestamp=Tr::Time>,
43 N: ExchangeData+Hash,
44 Tr: for<'a> TraceReader<Key<'a>=&'a N, Val<'a>=&'a N, Diff=isize>+Clone+'static,
45{
46 forward
47 .stream
48 .scope().iterative::<u64,_,_>(|inner| {
49
50 let forward_edges = forward.enter(inner);
51 let reverse_edges = reverse.enter(inner);
52
53 let (forward_bind, forward) = Variable::new_from(goals.clone().map(|(x,_)| (x.clone(),(x.clone(),0))).enter(inner), Product::new(Default::default(), 1));
59 let (reverse_bind, reverse) = Variable::new_from(goals.clone().map(|(_,y)| (y.clone(),(y.clone(),0))).enter(inner), Product::new(Default::default(), 1));
60
61 forward.clone().map(|_| ()).consolidate().inspect(|x| println!("forward: {:?}", x));
62 reverse.clone().map(|_| ()).consolidate().inspect(|x| println!("reverse: {:?}", x));
63
64 let goals = goals.enter(inner);
65 let reached =
73 forward
74 .clone()
75 .join_map(reverse.clone(), |_, (src,d1), (dst,d2)| ((src.clone(), dst.clone()), *d1 + *d2))
76 .reduce(|_key, s, t| t.push((*s[0].0, 1)))
77 .semijoin(goals.clone());
78
79 let active =
80 reached
81 .clone()
82 .negate()
83 .map(|(srcdst,_)| srcdst)
84 .concat(goals)
85 .consolidate();
86
87 let forward_active = active.clone().map(|(x,_y)| x).distinct();
89 let forward_next =
90 forward
91 .clone()
92 .map(|(med, (src, dist))| (src, (med, dist)))
93 .semijoin(forward_active)
94 .map(|(src, (med, dist))| (med, (src, dist)))
95 .join_core(forward_edges, |_med, (src, dist), next| Some((next.clone(), (src.clone(), *dist+1))))
96 .concat(forward)
97 .map(|(next, (src, dist))| ((next, src), dist))
98 .reduce(|_key, s, t| t.push((*s[0].0, 1)))
99 .map(|((next, src), dist)| (next, (src, dist)));
100
101 forward_next.clone().map(|_| ()).consolidate().inspect(|x| println!("forward_next: {:?}", x));
102
103 forward_bind.set(forward_next);
104
105 let reverse_active = active.map(|(_x,y)| y).distinct();
107 let reverse_next =
108 reverse
109 .clone()
110 .map(|(med, (rev, dist))| (rev, (med, dist)))
111 .semijoin(reverse_active)
112 .map(|(rev, (med, dist))| (med, (rev, dist)))
113 .join_core(reverse_edges, |_med, (rev, dist), next| Some((next.clone(), (rev.clone(), *dist+1))))
114 .concat(reverse)
115 .map(|(next, (rev, dist))| ((next, rev), dist))
116 .reduce(|_key, s, t| t.push((*s[0].0, 1)))
117 .map(|((next,rev), dist)| (next, (rev, dist)));
118
119 reverse_next.clone().map(|_| ()).consolidate().inspect(|x| println!("reverse_next: {:?}", x));
120
121 reverse_bind.set(reverse_next);
122
123 reached.leave()
124 })
125}