extern crate timely;
extern crate differential_dataflow;
use std::io::{BufRead, BufReader};
use std::fs::File;
use timely::dataflow::*;
use timely::dataflow::operators::probe::Handle;
use differential_dataflow::input::InputSession;
use differential_dataflow::Collection;
use differential_dataflow::operators::*;
use differential_dataflow::lattice::Lattice;
type Time = usize;
type Node = u32;
type Edge = (Node, Node);
fn main() {
let mut args = std::env::args().skip(1);
let filename = args.next().expect("must specify a filename");
let root: Node = args.next().expect("must specify root node").parse().expect("root node must be an integer");
let batch: usize = args.next().expect("must specify batching").parse().expect("batch must be an integer");
let compression: Time = args.next().expect("must specify compression").parse().expect("compression must be an integer");
let inspect: bool = args.next().expect("must specify inspect bit").parse().expect("inspect must be boolean");
timely::execute_from_args(args, move |worker| {
let timer = ::std::time::Instant::now();
let index = worker.index();
let peers = worker.peers();
let mut roots = InputSession::new();
let mut graph = InputSession::new();
let mut probe = Handle::new();
worker.dataflow(|scope| {
let roots = roots.to_collection(scope);
let graph = graph.to_collection(scope);
bfs(&graph, &roots)
.filter(move |_| inspect)
.map(|(_,l)| l)
.consolidate()
.inspect(|x| println!("\t{:?}", x))
.probe_with(&mut probe);
});
let mut edges = Vec::new();
let file = BufReader::new(File::open(filename.clone()).unwrap());
for (count, readline) in file.lines().enumerate() {
if count % peers == index {
let line = readline.ok().expect("read error");
if !line.starts_with('#') {
let mut elts = line[..].split_whitespace();
let src: Node = elts.next().expect("line missing src field").parse().expect("malformed src");
let dst: Node = elts.next().expect("line missing dst field").parse().expect("malformed dst");
let sec: Time = elts.next().expect("line missing sec field").parse().expect("malformed sec");
edges.push((compression * ((sec / compression) + 1), src, dst));
}
}
}
roots.insert(root);
roots.close();
println!("{:?}\tData loaded.", timer.elapsed());
edges.sort();
println!("{:?}\tData sorted.", timer.elapsed());
let mut slice = &edges[..];
while !slice.is_empty() {
graph.advance_to(slice[0].0);
graph.flush();
while probe.less_than(graph.time()) {
worker.step();
}
println!("{:?}\tTime {:?} reached", timer.elapsed(), graph.time());
let limit = std::cmp::min(slice.len(), batch);
for &(time, src, dst) in slice[..limit].iter() {
graph.advance_to(time);
graph.insert((src, dst));
}
slice = &slice[limit..];
}
graph.close();
while worker.step() { }
println!("{:?}\tComputation complete.", timer.elapsed());
}).unwrap();
}
fn bfs<G: Scope>(edges: &Collection<G, Edge>, roots: &Collection<G, Node>) -> Collection<G, (Node, u32)>
where G::Timestamp: Lattice+Ord {
let nodes = roots.map(|x| (x, 0));
nodes.iterate(|inner| {
let edges = edges.enter(&inner.scope());
let nodes = nodes.enter(&inner.scope());
inner.join_map(&edges, |_k,l,d| (*d, l+1))
.concat(&nodes)
.reduce(|_, s, t| t.push((*s[0].0, 1)))
})
}