extern crate rand;
extern crate getopts;
extern crate timely;
extern crate graph_map;
extern crate differential_dataflow;
use std::hash::Hash;
use timely::dataflow::*;
use timely::dataflow::operators::*;
use differential_dataflow::{Collection, Data};
use differential_dataflow::collection::LeastUpperBound;
use differential_dataflow::operators::*;
use differential_dataflow::operators::join::JoinUnsigned;
use differential_dataflow::operators::group::GroupUnsigned;
use differential_dataflow::collection::trace::CollectionIterator;
use differential_dataflow::collection::basic::DifferenceIterator;
use graph_map::GraphMMap;
/// Identifier of a graph node.
type Node = u32;
/// An edge, represented as a pair of node identifiers.
type Edge = (Node, Node);
/// A collection of `(node, value)` pairs in scope `G`.
type Nodes<G,V> = Collection<G, (Node, V)>;
/// A collection of edges in scope `G`.
type Edges<G> = Collection<G, Edge>;
/// Loads a graph, symmetrizes its edges, runs `_reach` over it and prints the
/// consolidated results.
///
/// Command-line arguments:
/// 1. path handed to `GraphMMap::new`,
/// 2. number of nodes whose edge lists are read (a `usize`).
///
/// The full argument list (minus the program name) is also forwarded to
/// `timely::execute_from_args`.
///
/// # Panics
///
/// Panics with a usage message if either positional argument is missing, if
/// the node count does not parse as `usize`, or if the timely execution
/// returns an error.
fn main() {
    let usage = "usage: <program> <graph-file> <node-count> [timely arguments]";
    let filename = std::env::args().nth(1).expect(usage);
    // Parse the node count once, up front, so a malformed argument is reported
    // before any worker is started rather than from inside each worker.
    let nodes = std::env::args().nth(2).expect(usage)
        .parse::<usize>()
        .expect("<node-count> must be a non-negative integer");
    timely::execute_from_args(std::env::args().skip(1), move |computation| {
        let peers = computation.peers();
        let index = computation.index();
        let graph = GraphMMap::new(&filename);
        // Each worker only loads the edge lists of nodes congruent to its own
        // index modulo the number of workers.
        let edges = (0..nodes).filter(move |node| node % peers == index)
            .flat_map(move |node| {
                let vec = graph.edges(node).to_vec();
                // NOTE(review): `node as u32` truncates if the node count exceeds u32::MAX.
                vec.into_iter().map(move |edge| ((node as u32, edge),1))
            });
        computation.scoped::<u64,_,_>(|scope| {
            let edges = Collection::new(edges.to_stream(scope));
            // Add the reversed copy of every edge so the graph is symmetric.
            let edges = edges.map(|(x,y)| (y,x)).concat(&edges);
            // Drop the node ids, consolidate the remaining values and print them.
            _reach(&edges).map(|(_x,c)| c).consolidate().inspect(|x| println!("{:?}", x));
        });
    }).unwrap();
}
/// Assigns a color to every node that appears as the first component of an edge,
/// by delegating to `sequence`.
///
/// Every such node starts with the placeholder state `u32::max_value()`. The
/// per-node rule starts from candidate color `1` and bumps the candidate each
/// time a neighbor's color equals it, returning the final candidate.
///
/// NOTE(review): the scan only yields the smallest unused color if the values
/// are presented in ascending order — confirm against `CollectionIterator`.
fn _color<G: Scope>(edges: &Edges<G>) -> Nodes<G,Option<u32>>
where G::Timestamp: LeastUpperBound+Hash {
    let initial = edges.map(|(src, _dst)| (src, u32::max_value()))
                       .consolidate_by(|record| record.0);
    sequence(&initial, edges, |_node, neighbors| {
        let mut smallest_free = 1;
        while let Some((seen, _weight)) = neighbors.next() {
            match *seen {
                // This neighbor already holds the candidate; try the next color.
                Some(color) if color == smallest_free => smallest_free += 1,
                Some(_) => {}
                // An undecided neighbor is not expected here; just report it.
                None => println!("whoa, None?"),
            }
        }
        smallest_free
    })
}
/// Computes a boolean "reached" flag per node by delegating to `sequence`.
///
/// Every node that appears as the first component of an edge starts with the
/// state `node < 10`. The per-node rule returns `true` as soon as any value it
/// is shown is `Some(true)`, and `false` otherwise.
fn _reach<G: Scope>(edges: &Edges<G>) -> Nodes<G,Option<bool>>
where G::Timestamp: LeastUpperBound+Hash {
    let seeds = edges.map(|(src, _dst)| (src, src < 10))
                     .consolidate_by(|record| record.0);
    sequence(&seeds, edges, |_node, incoming| {
        let mut any_reached = false;
        // Walk every value; `None` and `Some(false)` leave the flag untouched.
        while let Some((flag, _weight)) = incoming.next() {
            if let Some(true) = *flag {
                any_reached = true;
            }
        }
        any_reached
    })
}
/// Iteratively derives a new `Option<V>` state for each node from the states of
/// its neighbors.
///
/// * `state` — the initial `(node, value)` pairs.
/// * `edges` — the edges along which states are sent; an edge `(a, b)` delivers
///   a state held at `a` to `b`.
/// * `logic` — called with a node and the iterator over the values delivered to
///   it; returns that node's new value.
///
/// The iteration starts with every node mapped to `None`. In each round, the
/// iterated state is sent along edges whose first endpoint is smaller than the
/// second, and the original `state` (wrapped in `Some`) is sent along edges
/// whose first endpoint is larger. Edges with equal endpoints carry nothing.
/// For each receiving node, if the first delivered value is `Some`, the new
/// state is `Some(logic(..))`; otherwise it is `None`.
///
/// A per-batch count of the result stream is printed as a debugging aid.
///
/// NOTE(review): testing only the first value presumably relies on `None`
/// being presented before any `Some` — confirm against `CollectionIterator`.
fn sequence<G: Scope, V: Data, F>(state: &Nodes<G, V>, edges: &Edges<G>, logic: F) -> Nodes<G, Option<V>>
where G::Timestamp: LeastUpperBound+Hash,
      F: Fn(&Node, &mut CollectionIterator<DifferenceIterator<Option<V>>>)->V+'static {
    state.map(|(node, _state)| (node, None))
         .iterate(|current| {
             // Bring the outer collections into the iteration scope.
             let inner_edges = edges.enter(&current.scope());
             let original = state.enter(&current.scope())
                                 .map(|(node, value)| (node, Some(value)));

             // Split edges by the relative order of their endpoints.
             let ascending = inner_edges.filter(|&(src, dst)| src < dst);
             let descending = inner_edges.filter(|&(src, dst)| src > dst);

             // Iterated states travel along ascending edges, original states
             // along descending ones.
             let from_current = current.join_map_u(&ascending, |_src, value, dst| (*dst, value.clone()));
             let from_original = original.join_map_u(&descending, |_src, value, dst| (*dst, value.clone()));

             let updated = from_current.concat(&from_original)
                                       .group_u(move |node, values, output| {
                                           let decided = if values.peek().unwrap().0.is_some() {
                                               Some(logic(node, values))
                                           } else {
                                               None
                                           };
                                           output.push((decided, 1));
                                       });

             // Debug output: number of records in each batch of the result.
             updated.inner.count().inspect_batch(|t,xs| println!("count[{:?}]:\t{:?}", t, xs));
             updated
         })
}