use std::hash::Hash;
use timely::order::Product;
use timely::progress::Timestamp;
use crate::{VecCollection, ExchangeData};
use crate::lattice::Lattice;
use crate::operators::iterate::Variable;
pub fn bidijkstra<'scope, T, N>(edges: VecCollection<'scope, T, (N,N)>, goals: VecCollection<'scope, T, (N,N)>) -> VecCollection<'scope, T, ((N,N), u32)>
where
T: Timestamp + Lattice + Ord,
N: ExchangeData+Hash,
{
let forward = edges.clone().arrange_by_key();
let reverse = edges.map(|(x,y)| (y,x)).arrange_by_key();
bidijkstra_arranged(forward, reverse, goals)
}
use crate::trace::TraceReader;
use crate::operators::arrange::Arranged;
pub fn bidijkstra_arranged<'scope, N, Tr>(
forward: Arranged<'scope, Tr>,
reverse: Arranged<'scope, Tr>,
goals: VecCollection<'scope, Tr::Time, (N,N)>
) -> VecCollection<'scope, Tr::Time, ((N,N), u32)>
where
N: ExchangeData+Hash,
Tr: for<'a> TraceReader<Key<'a>=&'a N, Val<'a>=&'a N, Diff=isize>+Clone+'static,
{
let outer = forward.stream.scope();
outer.iterative::<u64,_,_>(|inner| {
let forward_edges = forward.enter(inner);
let reverse_edges = reverse.enter(inner);
let (forward_bind, forward) = Variable::new_from(goals.clone().map(|(x,_)| (x.clone(),(x.clone(),0))).enter(inner), Product::new(Default::default(), 1));
let (reverse_bind, reverse) = Variable::new_from(goals.clone().map(|(_,y)| (y.clone(),(y.clone(),0))).enter(inner), Product::new(Default::default(), 1));
forward.clone().map(|_| ()).consolidate().inspect(|x| println!("forward: {:?}", x));
reverse.clone().map(|_| ()).consolidate().inspect(|x| println!("reverse: {:?}", x));
let goals = goals.enter(inner);
let reached =
forward
.clone()
.join_map(reverse.clone(), |_, (src,d1), (dst,d2)| ((src.clone(), dst.clone()), *d1 + *d2))
.reduce(|_key, s, t| t.push((*s[0].0, 1)))
.semijoin(goals.clone());
let active =
reached
.clone()
.negate()
.map(|(srcdst,_)| srcdst)
.concat(goals)
.consolidate();
let forward_active = active.clone().map(|(x,_y)| x).distinct();
let forward_next =
forward
.clone()
.map(|(med, (src, dist))| (src, (med, dist)))
.semijoin(forward_active)
.map(|(src, (med, dist))| (med, (src, dist)))
.join_core(forward_edges, |_med, (src, dist), next| Some((next.clone(), (src.clone(), *dist+1))))
.concat(forward)
.map(|(next, (src, dist))| ((next, src), dist))
.reduce(|_key, s, t| t.push((*s[0].0, 1)))
.map(|((next, src), dist)| (next, (src, dist)));
forward_next.clone().map(|_| ()).consolidate().inspect(|x| println!("forward_next: {:?}", x));
forward_bind.set(forward_next);
let reverse_active = active.map(|(_x,y)| y).distinct();
let reverse_next =
reverse
.clone()
.map(|(med, (rev, dist))| (rev, (med, dist)))
.semijoin(reverse_active)
.map(|(rev, (med, dist))| (med, (rev, dist)))
.join_core(reverse_edges, |_med, (rev, dist), next| Some((next.clone(), (rev.clone(), *dist+1))))
.concat(reverse)
.map(|(next, (rev, dist))| ((next, rev), dist))
.reduce(|_key, s, t| t.push((*s[0].0, 1)))
.map(|((next,rev), dist)| (next, (rev, dist)));
reverse_next.clone().map(|_| ()).consolidate().inspect(|x| println!("reverse_next: {:?}", x));
reverse_bind.set(reverse_next);
reached.leave(outer)
})
}