extern crate rand;
extern crate timely;
extern crate differential_dataflow;
use rand::{Rng, SeedableRng, StdRng};
use timely::dataflow::operators::*;
use timely::order::Product;
use timely::scheduling::Scheduler;
use differential_dataflow::input::Input;
use differential_dataflow::AsCollection;
use differential_dataflow::operators::arrange::ArrangeByKey;
use differential_dataflow::operators::reduce::Reduce;
use differential_dataflow::operators::join::JoinCore;
use differential_dataflow::operators::Iterate;
use differential_dataflow::operators::Consolidate;
fn main() {
let nodes: u32 = std::env::args().nth(1).unwrap().parse().unwrap();
let edges: usize = std::env::args().nth(2).unwrap().parse().unwrap();
let batch: usize = std::env::args().nth(3).unwrap().parse().unwrap();
let pre: usize = std::env::args().nth(4).unwrap().parse().unwrap();
let inspect: bool = std::env::args().nth(5).unwrap() == "inspect";
timely::execute_from_args(std::env::args().skip(6), move |worker| {
let timer = ::std::time::Instant::now();
let index = worker.index();
let peers = worker.peers();
let mut probe = timely::dataflow::operators::probe::Handle::new();
let mut graph = worker.dataflow::<Product<(),usize>,_,_>(|scope| {
timely::dataflow::operators::generic::source(scope, "RandomGraph", |mut capability, info| {
let activator = scope.activator_for(&info.address[..]);
let seed: &[_] = &[1, 2, 3, index];
let mut rng1: StdRng = SeedableRng::from_seed(seed); let mut rng2: StdRng = SeedableRng::from_seed(seed);
let mut additions = 0;
let mut deletions = 0;
let handle = probe.clone();
move |output| {
if !handle.less_than(capability.time()) {
let mut time = capability.time().clone();
{ let mut session = output.session(&capability);
for _ in 0 .. batch {
while additions < time.inner + edges {
if additions % peers == index {
let src = rng1.gen_range(0, nodes);
let dst = rng1.gen_range(0, nodes);
session.give(((src, dst), time, 1));
}
additions += 1;
}
while deletions < time.inner {
if deletions % peers == index {
let src = rng2.gen_range(0, nodes);
let dst = rng2.gen_range(0, nodes);
session.give(((src, dst), time, -1));
}
deletions += 1;
}
time.inner += 1;
}
}
capability.downgrade(&time);
}
activator.activate();
}
})
.probe_with(&mut probe)
.as_collection()
.arrange_by_key()
.trace
});
println!("{:?}:\tloading edges", timer.elapsed());
for _ in 0 .. pre {
worker.step();
}
println!("{:?}\tedges loaded; building query dataflows", timer.elapsed());
let mut roots = worker.dataflow(|scope| {
let edges = graph.import(scope);
let (input, roots) = scope.new_collection();
let roots = roots.map(|x| (x, 0));
roots.iterate(|dists| {
let edges = edges.enter(&dists.scope());
let roots = roots.enter(&dists.scope());
dists.arrange_by_key()
.join_core(&edges, |_k,l,d| Some((*d, l+1)))
.concat(&roots)
.reduce(|_, s, t| t.push((*s[0].0, 1)))
})
.map(|(_node, dist)| dist)
.consolidate()
.inspect(|x| println!("distance update: {:?}", x))
.probe_with(&mut probe);
input
});
let mut query = worker.dataflow(|scope| {
let edges = graph.import(scope);
let (input, query) = scope.new_collection();
query.map(|x| (x, x))
.join_core(&edges, |_n, &q, &d| Some((d, q)))
.join_core(&edges, |_n, &q, &d| Some((d, q)))
.join_core(&edges, |_n, &q, &d| Some((d, q)))
.filter(move |_| inspect)
.map(|x| x.1)
.consolidate()
.inspect(|x| println!("{:?}", x))
.probe_with(&mut probe);
input
});
println!("{:?}\tquery dataflows built; querying", timer.elapsed());
drop(graph);
if batch > 0 {
for round in 0 .. {
let mut time = query.time().clone();
if index == 0 {
query.insert(round % nodes);
}
time.inner += batch;
roots.advance_to(time); roots.flush();
query.advance_to(time); query.flush();
if index == 0 {
query.remove(round % nodes);
}
worker.step_while(|| probe.less_than(&time));
println!("{:?}\tquery round {:?} complete", timer.elapsed(), round);
}
}
}).unwrap();
}