extern crate timely;
extern crate graph_map;
extern crate differential_dataflow;
use std::hash::Hash;
use timely::dataflow::*;
use timely::dataflow::operators::*;
use differential_dataflow::{Collection, Data};
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::*;
use graph_map::GraphMMap;
type Node = u32;
type Edge = (Node, Node);
type Nodes<G,V> = Collection<G, (Node, V)>;
type Edges<G> = Collection<G, Edge>;
fn main() {
let filename = std::env::args().nth(1).unwrap();
timely::execute_from_args(std::env::args().skip(1), move |worker| {
let peers = worker.peers();
let index = worker.index();
let graph = GraphMMap::new(&filename);
let nodes = std::env::args().nth(2).unwrap().parse::<usize>().unwrap();
let edges = (0..nodes).filter(move |node| node % peers == index)
.flat_map(move |node| {
let vec = graph.edges(node).to_vec();
vec.into_iter().map(move |edge| ((node as u32, edge), Default::default(), 1))
});
worker.dataflow::<u64,_,_>(|scope| {
let edges = Collection::new(edges.to_stream(scope));
let edges = edges.map(|(x,y)| (y,x)).concat(&edges);
_reach(&edges).map(|(_x,c)| c).consolidate().inspect(|x| println!("{:?}", x));
});
}).unwrap();
}
fn _color<G: Scope>(edges: &Edges<G>) -> Nodes<G,Option<u32>>
where G::Timestamp: Lattice+Hash+Ord {
let start = edges.map(|(x,_y)| (x,u32::max_value()))
.consolidate();
sequence(&start, &edges, |_node, vals| {
(1u32 ..)
.filter(|&i| vals.get(i as usize - 1).map(|x| x.0) != Some(&Some(i)))
.next()
.unwrap()
})
}
fn _reach<G: Scope>(edges: &Edges<G>) -> Nodes<G,Option<bool>>
where G::Timestamp: Lattice+Hash+Ord {
let start = edges.map(|(x,_y)| (x, x < 10))
.consolidate();
sequence(&start, &edges, |_node, vals| {
let mut reached = false;
let mut iter = vals.iter();
while let Some(&(b, _)) = iter.next() {
if let &Some(b) = b {
if b { reached = true; }
}
}
reached
})
}
fn sequence<G: Scope, V: Data, F>(state: &Nodes<G, V>, edges: &Edges<G>, logic: F) -> Nodes<G, Option<V>>
where
G::Timestamp: Lattice+Hash+Ord,
F: Fn(&Node, &[(&Option<V>, isize)])->V+'static
{
state
.map(|(node, _state)| (node, None))
.iterate(|new_state| {
let edges = edges.enter(&new_state.scope());
let old_state = state.enter(&new_state.scope())
.map(|x| (x.0, Some(x.1)));
let forward = edges.filter(|edge| edge.0 < edge.1);
let reverse = edges.filter(|edge| edge.0 > edge.1);
let new_messages = new_state.join_map(&forward, |_k,v,d| (*d,v.clone()));
let old_messages = old_state.join_map(&reverse, |_k,v,d| (*d,v.clone()));
new_messages
.concat(&old_messages) .group(move |k, vs, t| t.push((vs[0].0.as_ref().map(|_| logic(k, vs)), 1)))
})
}