extern crate rand;
extern crate timely;
extern crate graph_map;
extern crate differential_dataflow;
use std::time::Instant;
use timely::dataflow::*;
use differential_dataflow::input::Input;
use differential_dataflow::operators::*;
use differential_dataflow::trace::implementations::ord::OrdValSpine as DefaultValTrace;
use differential_dataflow::operators::arrange::TraceAgent;
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::difference::DiffPair;
type Arrange<G: Scope, K, V, R> = Arranged<G, K, V, R, TraceAgent<K, V, G::Timestamp, R, DefaultValTrace<K, V, G::Timestamp, R>>>;
fn main() {
let filename = std::env::args().nth(1).unwrap();
timely::execute_from_args(std::env::args().skip(1), move |worker| {
let peers = worker.peers();
let index = worker.index();
let mut input = worker.dataflow::<(),_,_>(|scope| {
let (input, graph) = scope.new_collection();
let organizers = graph.explode(|(x,y)| Some((x, DiffPair::new(1,0))).into_iter().chain(Some((y, DiffPair::new(0,1))).into_iter()))
.threshold_total(|w| if w.element2 == 0 { 1 } else { 0 });
organizers
.iterate(|attend| {
graph.enter(&attend.scope())
.semijoin(attend)
.map(|(_,y)| y)
.threshold_total(|w| if w >= 3 { 1 } else { 0 })
.concat(&organizers.enter(&attend.scope()))
.consolidate()
})
.map(|_| ())
.consolidate()
.inspect(|x| println!("{:?}", x));
input
});
let timer = Instant::now();
use std::io::{BufReader, BufRead};
use std::fs::File;
let file = BufReader::new(File::open(filename.clone()).unwrap());
for (count, readline) in file.lines().enumerate() {
let line = readline.ok().expect("read error");
if count % peers == index && !line.starts_with('#') {
let mut elts = line[..].split_whitespace();
let src: u32 = elts.next().unwrap().parse().ok().expect("malformed src");
let dst: u32 = elts.next().unwrap().parse().ok().expect("malformed dst");
input.insert((src, dst));
}
}
println!("{:?}\tData ingested", timer.elapsed());
}).unwrap();
}