use std::hash::Hash;
use timely::dataflow::*;
use timely::order::Product;
use crate::lattice::Lattice;
use crate::operators::iterate::Variable;
use crate::operators::*;
use crate::{ExchangeData, VecCollection};
/// Computes hop distances for the `goals` pairs that are connected through `edges`.
///
/// Every edge `(a, b)` is arranged twice: once as given (keyed by `a`) for the
/// search that walks outward from goal sources, and once flipped to `(b, a)`
/// (keyed by `b`) for the search that walks backward from goal targets. The
/// search itself is delegated to [`bidijkstra_arranged`].
///
/// The result contains `((source, target), distance)` records, where the
/// distance counts edges along the shortest connecting path that was found.
pub fn bidijkstra<G, N>(
    edges: &VecCollection<G, (N, N)>,
    goals: &VecCollection<G, (N, N)>,
) -> VecCollection<G, ((N, N), u32)>
where
    G: Scope<Timestamp: Lattice + Ord>,
    N: ExchangeData + Hash,
{
    use crate::operators::arrange::arrangement::ArrangeByKey;

    // Edges indexed by their origin node.
    let by_source = edges.arrange_by_key();
    // The same edges, flipped so that they are indexed by their destination node.
    let flipped = edges.map(|(from, to)| (to, from));
    let by_target = flipped.arrange_by_key();

    bidijkstra_arranged(&by_source, &by_target, goals)
}
use crate::operators::arrange::Arranged;
use crate::trace::TraceReader;
/// Bi-directional shortest-distance search over pre-arranged edge collections.
///
/// `forward` is joined against labels growing out of goal sources and
/// `reverse` against labels growing out of goal targets; [`bidijkstra`] passes
/// the edges keyed by source and by destination respectively.
///
/// Inside an iterative scope two label collections of shape
/// `(node, (root, hops))` are evolved, one rooted at each goal source and one
/// at each goal target. A goal `(src, dst)` counts as reached once some node
/// carries a label from `src` and a label from `dst`; the reported distance is
/// the smallest sum of the two hop counts. A search root only keeps expanding
/// while it still belongs to at least one goal that has not been reached.
///
/// Returns `((src, dst), distance)` for the goals that were reached.
pub fn bidijkstra_arranged<G, N, Tr>(
    forward: &Arranged<G, Tr>,
    reverse: &Arranged<G, Tr>,
    goals: &VecCollection<G, (N, N)>,
) -> VecCollection<G, ((N, N), u32)>
where
    G: Scope<Timestamp = Tr::Time>,
    N: ExchangeData + Hash,
    Tr: for<'a> TraceReader<Key<'a> = &'a N, Val<'a> = &'a N, Diff = isize> + Clone + 'static,
{
    forward.stream.scope().iterative::<u64, _, _>(|inner| {
        // Bring both edge arrangements into the iterative scope.
        let fwd_edges = forward.enter(inner);
        let rev_edges = reverse.enter(inner);

        // Loop variables holding `(node, (root, hops))`. Each root starts out
        // labelling itself at distance zero.
        let from_sources = Variable::new_from(
            goals.map(|(src, _)| (src.clone(), (src, 0))).enter(inner),
            Product::new(Default::default(), 1),
        );
        let from_targets = Variable::new_from(
            goals.map(|(_, dst)| (dst.clone(), (dst, 0))).enter(inner),
            Product::new(Default::default(), 1),
        );

        // NOTE(review): these print the consolidated label counts to stdout on
        // every change; they look like debugging aids and are kept as they were.
        from_sources
            .map(|_| ())
            .consolidate()
            .inspect(|update| println!("forward: {:?}", update));
        from_targets
            .map(|_| ())
            .consolidate()
            .inspect(|update| println!("reverse: {:?}", update));

        let inner_goals = goals.enter(inner);

        // Pair up labels that meet at the same node, keep one distance per
        // (source, target) pair, and retain only pairs that were asked for.
        // The reducer emits the first value it is handed, which is the minimum
        // given that differential presents reduce inputs in sorted order.
        let met = from_sources
            .join_map(&from_targets, |_node, (src, hops_a), (dst, hops_b)| {
                ((src.clone(), dst.clone()), *hops_a + *hops_b)
            })
            .reduce(|_key, input, output| output.push((*input[0].0, 1)))
            .semijoin(&inner_goals);

        // Goals minus the reached ones: the pairs that still need searching.
        let pending = met
            .negate()
            .map(|(pair, _)| pair)
            .concat(&inner_goals)
            .consolidate();

        // Forward step: only roots that appear as the source of a pending goal
        // are expanded along one more edge.
        let live_sources = pending.map(|(src, _dst)| src).distinct();
        let src_frontier = from_sources
            .map(|(node, (root, hops))| (root, (node, hops)))
            .semijoin(&live_sources)
            .map(|(root, (node, hops))| (node, (root, hops)));
        let from_sources_next = src_frontier
            .join_core(&fwd_edges, |_node, (root, hops), succ| {
                Some((succ.clone(), (root.clone(), *hops + 1)))
            })
            // Keep the existing labels and collapse to one distance per (node, root).
            .concat(&from_sources)
            .map(|(node, (root, hops))| ((node, root), hops))
            .reduce(|_key, input, output| output.push((*input[0].0, 1)))
            .map(|((node, root), hops)| (node, (root, hops)));

        from_sources_next
            .map(|_| ())
            .consolidate()
            .inspect(|update| println!("forward_next: {:?}", update));
        from_sources.set(&from_sources_next);

        // Reverse step: the mirror image, driven by targets of pending goals
        // and joined against the reverse edge arrangement.
        let live_targets = pending.map(|(_src, dst)| dst).distinct();
        let dst_frontier = from_targets
            .map(|(node, (root, hops))| (root, (node, hops)))
            .semijoin(&live_targets)
            .map(|(root, (node, hops))| (node, (root, hops)));
        let from_targets_next = dst_frontier
            .join_core(&rev_edges, |_node, (root, hops), pred| {
                Some((pred.clone(), (root.clone(), *hops + 1)))
            })
            .concat(&from_targets)
            .map(|(node, (root, hops))| ((node, root), hops))
            .reduce(|_key, input, output| output.push((*input[0].0, 1)))
            .map(|((node, root), hops)| (node, (root, hops)));

        from_targets_next
            .map(|_| ())
            .consolidate()
            .inspect(|update| println!("reverse_next: {:?}", update));
        from_targets.set(&from_targets_next);

        // Hand the reached goals, with their distances, back to the outer scope.
        met.leave()
    })
}