extern crate rand;
extern crate time;
extern crate timely;
extern crate differential_dataflow;
use rand::{Rng, SeedableRng, StdRng};
use timely::dataflow::*;
use timely::dataflow::operators::*;
use differential_dataflow::Collection;
use differential_dataflow::collection::LeastUpperBound;
use differential_dataflow::operators::*;
use differential_dataflow::operators::join::JoinBy;
use differential_dataflow::operators::group::GroupBy;
fn main() {
timely::execute_from_args(std::env::args(), |root| {
let start = time::precise_time_s();
let (mut roots, mut graph) = root.scoped::<u64,_,_>(|scope| {
let (edge_input, graph) = scope.new_input();
let (node_input, roots) = scope.new_input();
let graph = Collection::new(graph);
let roots = Collection::new(roots);
let edges = graph.map(|(x,y)| (y,x)).concat(&graph);
let dists = bc(&edges, &roots);
dists.consolidate()
.inspect_batch(move |t,b| {
println!("epoch: {:?}, length: {}, processing: {}",
t, b.len(), (time::precise_time_s() - start) - (t.inner as f64));
});
(node_input, edge_input)
});
let nodes = 1u32; let edges = 0;
let seed: &[_] = &[1, 2, 3, 4];
let mut rng1: StdRng = SeedableRng::from_seed(seed);
println!("performing BFS on {} nodes, {} edges:", nodes, edges);
if root.index() == 0 {
for _ in 0..edges {
graph.send(((rng1.gen_range(0, nodes), rng1.gen_range(0, nodes)), 1));
}
roots.send((0,1));
}
roots.close();
graph.close(); while root.step() { } println!("done!");
});
}
fn bc<G: Scope>(edges: &Collection<G, (u32, u32)>,
roots: &Collection<G, u32>)
-> Collection<G, (u32, u32, u32, u32)>
where G::Timestamp: LeastUpperBound {
let nodes = roots.map(|x| (x, x, x, 0));
let dists = nodes.iterate(|dists| {
let edges = edges.enter(&dists.scope());
let nodes = nodes.enter(&dists.scope());
dists.join_by_u(&edges, |(n,r,_,s)| (n, (r,s)), |e| e, |&n, &(r,s), &d| (d, r, n, s+1))
.concat(&nodes)
.group_by(|(n,r,b,s)| ((n,r),(s,b)), |&(n,r,_,_)| (n + r) as u64, |&(n,r)| (n + r) as u64, |&(n,r), &(b,s)| (n,r,b,s), |&(_n,_r), mut s, t| { let ref_s: &(u32, u32) = s.peek().unwrap().0;
let min_s = ref_s.0;
t.extend(s.take_while(|x| (x.0).0 == min_s).map(|(&(s,b),w)| ((b,s), w)));
})
.inspect_batch(|t,b| println!("iteration: {:?}, length: {}", t, b.len()))
});
dists
}