extern crate timely;
extern crate graph_map;
extern crate differential_dataflow;
use timely::order::Product;
use timely::dataflow::{*, operators::Filter};
use differential_dataflow::Collection;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::{*, iterate::Variable};
use differential_dataflow::input::InputSession;
use differential_dataflow::AsCollection;
use graph_map::GraphMMap;
/// Identifier of a graph node.
type Node = u32;
/// A directed edge, stored as a `(source, destination)` pair of nodes.
type Edge = (Node, Node);
/// Timestamp type of the outer dataflow built in `main`.
type Time = u32;
/// Iteration counter: the timestamp coordinate added by the iterative scope in
/// `pagerank`, and the type of the iteration bound read from the command line.
type Iter = u32;
/// Difference (weight) type attached to every record in the collections.
type Diff = isize;
/// Loads a graph, runs `pagerank` over it, then retracts one edge per round.
///
/// Command-line arguments:
///
/// 1. the graph path handed to `GraphMMap::new`;
/// 2. the iteration bound handed to `pagerank` (a `u32`; `0` means no bound);
/// 3. optionally the literal `inspect`, which lets results through to be printed.
///
/// Every argument after the graph path is also forwarded to
/// `timely::execute_from_args`.
///
/// # Panics
///
/// Panics with a descriptive message when argument 1 or 2 is missing, when
/// argument 2 does not parse as a `u32`, or when `timely::execute_from_args`
/// returns an error.
fn main() {
    let filename = std::env::args()
        .nth(1)
        .expect("missing argument 1: path of the graph to load");
    let iterations: Iter = std::env::args()
        .nth(2)
        .expect("missing argument 2: iteration bound (0 for no bound)")
        .parse()
        .expect("argument 2 (iteration bound) must be an unsigned 32-bit integer");
    // Compare in place rather than allocating a `String` for the comparison.
    let inspect = std::env::args().nth(3).map_or(false, |arg| arg == "inspect");

    timely::execute_from_args(std::env::args().skip(2), move |worker| {
        let peers = worker.peers();
        let index = worker.index();
        let timer = worker.timer();

        let mut input = InputSession::new();
        let mut probe = ProbeHandle::new();

        worker.dataflow::<Time, _, _>(|scope| {
            let edges = input.to_collection(scope);
            pagerank(iterations, &edges)
                // Unless `inspect` was requested, drop every result so nothing is printed.
                .filter(move |_| inspect)
                .consolidate()
                .inspect(|x| println!("{:?}", x))
                .probe_with(&mut probe);
        });

        let graph = GraphMMap::new(&filename);

        // Each worker introduces the edges of the nodes congruent to its index
        // modulo the number of workers.
        for node in (index..graph.nodes()).step_by(peers) {
            for &edge in graph.edges(node) {
                input.update((node as Node, edge as Node), 1);
            }
        }

        // Close input time 0 and run the dataflow until the probe has caught up.
        input.advance_to(1);
        input.flush();
        while probe.less_than(input.time()) {
            worker.step();
        }

        println!("{:?}\tinitial compute complete", timer.elapsed());

        // One round per node, starting at node 1 (node 0 is never visited here):
        // the owning worker retracts the node's first edge, if it has one, and
        // every worker then advances to time `node + 1` and waits for the probe.
        for node in 1..graph.nodes() {
            if node % peers == index {
                if let Some(&first) = graph.edges(node).first() {
                    input.update((node as Node, first as Node), -1);
                }
            }
            input.advance_to((node + 1) as Time);
            input.flush();
            while probe.less_than(input.time()) {
                worker.step();
            }
            println!("{:?}\tround {} complete", timer.elapsed(), node);
        }
    })
    .expect("timely::execute_from_args returned an error");
}
/// Builds a PageRank-style iterative computation over `edges` and returns the
/// resulting weighted collection of nodes.
///
/// Every node that appears in `edges` starts with weight 6,000,000. In each
/// iteration a node's weight is rescaled to `(5 * weight) / (6 * out_degree)`
/// (integer arithmetic), that amount is sent along each of its outgoing edges,
/// and 1,000,000 is added to every node.
///
/// When `iters` is greater than zero, updates whose iteration coordinate is not
/// below `iters` are discarded before being fed back; when it is zero no such
/// filter is installed.
fn pagerank<G>(iters: Iter, edges: &Collection<G, Edge, Diff>) -> Collection<G, Node, Diff>
where
    G: Scope,
    G::Timestamp: Lattice,
{
    // Each endpoint of each edge, reduced to one record per node.
    let vertices = edges
        .flat_map(|(src, dst)| std::iter::once(src).chain(std::iter::once(dst)))
        .distinct();

    // Count of outgoing edges, keyed by source node.
    let out_degrees = edges.map(|(src, _dst)| src).count();

    edges.scope().iterative::<Iter, _, _>(|inner| {
        // Bring the outer collections into the iterative scope.
        let inner_edges = edges.enter(inner);
        let inner_vertices = vertices.enter(inner);
        let inner_degrees = out_degrees.enter(inner);

        // Starting weight, and the weight re-added to every node each iteration.
        let initial = inner_vertices.explode(|node| Some((node, 6_000_000)));
        let restart = inner_vertices.explode(|node| Some((node, 1_000_000)));

        // Loop variable seeded with `initial`; the step leaves the outer
        // coordinate at its default and advances the iteration coordinate by 1.
        let surfers = Variable::new_from(initial, Product::new(Default::default(), 1));

        // Attach each node's current weight to its degree record, rescale it,
        // and keep only the node.
        let outgoing = inner_degrees
            .semijoin(&surfers)
            .threshold(|(_node, degree), weight| (5 * weight) / (6 * degree))
            .map(|(node, _degree)| node);

        // Move the rescaled weight to edge destinations and blend in `restart`.
        let blended = inner_edges
            .semijoin(&outgoing)
            .map(|(_src, dst)| dst)
            .concat(&restart)
            .consolidate();

        // A positive `iters` bounds the loop by dropping later updates.
        let next = if iters > 0 {
            blended
                .inner
                .filter(move |(_data, time, _diff)| time.inner < iters)
                .as_collection()
        } else {
            blended
        };

        // Close the loop and hand the result back to the enclosing scope.
        surfers.set(&next);
        next.leave()
    })
}