1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
use std::hash::Hash;
use timely::order::Product;
use timely::dataflow::*;
use ::{Collection, Data};
use ::operators::*;
use ::lattice::Lattice;
use ::operators::iterate::Variable;
/// Reports a distance for each requested `(source, target)` pair in `goals` whose
/// endpoints are connected through `edges`.
///
/// The search is bidirectional: within an iterative scope, one search expands along
/// `edges` from every goal source while another expands against `edges` from every
/// goal target. Whenever the two searches hold a common node, the sum of their
/// distances to that node is a candidate distance for the pair of roots, and the
/// first candidate presented by `reduce` is kept.
///
/// Only pairs present in `goals` are returned. A search from a root keeps expanding
/// only while that root participates in some goal that has not yet been reached.
///
/// # Arguments
///
/// * `edges` - directed edges `(from, to)`.
/// * `goals` - the `(source, target)` pairs to resolve.
///
/// # Returns
///
/// A collection of `((source, target), distance)` records, restricted to `goals`.
pub fn bidijkstra<G, N>(edges: &Collection<G, (N,N)>, goals: &Collection<G, (N,N)>) -> Collection<G, ((N,N), u32)>
where
    G: Scope,
    G::Timestamp: Lattice+Ord,
    N: Data+Hash,
{
    edges.scope().iterative::<u64,_,_>(|inner| {

        // Both variables hold `(node, (root, dist))` records, seeded with each goal
        // endpoint at distance zero from itself.
        let forward = Variable::new_from(goals.map(|(x,_)| (x.clone(),(x.clone(),0))).enter(inner), Product::new(Default::default(), 1));
        let reverse = Variable::new_from(goals.map(|(_,y)| (y.clone(),(y.clone(),0))).enter(inner), Product::new(Default::default(), 1));

        let goals = goals.enter(inner);
        let edges = edges.enter(inner);

        // Pair up forward and reverse records that meet at a common node, keeping one
        // distance per `(src, dst)`. The join matches every forward root against every
        // reverse root sharing a node, including combinations that were never asked
        // for, so the result is restricted to the requested goals before it is used
        // or returned.
        let reached =
        forward
            .join_map(&reverse, |_, (src,d1), (dst,d2)| ((src.clone(), dst.clone()), *d1 + *d2))
            .reduce(|_key, s, t| t.push((*s[0].0, 1)))
            .semijoin(&goals);

        // Goals that have not been reached yet: all goals minus the reached ones.
        let active =
        reached
            .negate()
            .map(|(srcdst,_)| srcdst)
            .concat(&goals)
            .consolidate();

        // Advance the forward search by one edge, but only for roots that still have
        // an unreached goal; previously found records are retained via `concat`.
        let forward_active = active.map(|(x,_y)| x).distinct();
        let forward_next =
        forward
            .map(|(med, (src, dist))| (src, (med, dist)))
            .semijoin(&forward_active)
            .map(|(src, (med, dist))| (med, (src, dist)))
            .join_map(&edges, |_med, (src, dist), next| (next.clone(), (src.clone(), *dist+1)))
            .concat(&forward)
            .map(|(next, (src, dist))| ((next, src), dist))
            .reduce(|_key, s, t| t.push((*s[0].0, 1)))
            .map(|((next, src), dist)| (next, (src, dist)));

        forward.set(&forward_next);

        // Advance the reverse search symmetrically, walking edges backwards.
        let reversed_edges = edges.map(|(x,y)| (y,x));
        let reverse_active = active.map(|(_x,y)| y).distinct();
        let reverse_next =
        reverse
            .map(|(med, (rev, dist))| (rev, (med, dist)))
            .semijoin(&reverse_active)
            .map(|(rev, (med, dist))| (med, (rev, dist)))
            .join_map(&reversed_edges, |_med, (rev, dist), next| (next.clone(), (rev.clone(), *dist+1)))
            .concat(&reverse)
            .map(|(next, (rev, dist))| ((next, rev), dist))
            .reduce(|_key, s, t| t.push((*s[0].0, 1)))
            .map(|((next,rev), dist)| (next, (rev, dist)));

        reverse.set(&reverse_next);

        reached.leave()
    })
}