#[allow(unused_variables)]
extern crate fnv;
extern crate rand;
extern crate time;
extern crate timely;
extern crate differential_dataflow;
use std::io::{BufReader, BufRead};
use std::fs::File;
use timely::dataflow::*;
use timely::dataflow::operators::*;
use differential_dataflow::{Data, Collection};
use differential_dataflow::operators::*;
use differential_dataflow::operators::join::JoinUnsigned;
use differential_dataflow::operators::group::{GroupUnsigned};
macro_rules! join {
($source:ident : $help:ident : $(($other:ident, $key:expr, $recons:expr, $index:expr)),*) => {{
let mut counts = $source.map(|(x,w)| ((x, (1 << 30, 1 << 30)), w));
$( // for each other relation, determine the count of its extensions, then join against the counts so far.
let temp = counts.map(|(p,ci)| ($key(p).0, ($key(p).1,ci)));
counts = $other.map(|(p,_e)| (p,())).group_u(|_k,s,t| t.push(((s.next().unwrap().1, $index),1)))
.join_u(&temp).map(|(k,c1,(r,c2))| ($recons(k,r), if c1.0 < c2.0 { c1 } else { c2 }));
)*
let mut proposals = $help;
$(
proposals = counts.filter(|&((_p,(_c,i)),_w)| i == $index)
.map(|(p,_)| $key(p))
.join_u(&$other)
.map(|(k,r,e)| ($recons(k,r),e))
.concat(&proposals);
)*
$(
proposals = proposals.map(|(p,e)| (($key(p).0,e),$key(p).1))
.semijoin(&$other)
.map(|((k,e), r)| ($recons(k,r),e));
)*
proposals
}}
}
fn for_each_line<F: FnMut(String)>(filename: &str, mut logic: F) {
let file = BufReader::new(File::open(filename).unwrap());
for readline in file.lines() {
let line = readline.ok().expect("read error");
logic(line);
}
}
fn for_each_pair_in<F: FnMut((u32, u32))>(filename: &str, mut logic: F) {
for_each_line(filename, |string| {
if !string.starts_with('#') {
let mut fields = string[..].split(',');
let a: u32 = fields.next().unwrap().parse().ok().expect("parse error");
let b: u32 = fields.next().unwrap().parse().ok().expect("parse error");
logic((a, b))
}
});
}
fn for_each_trip_in<F: FnMut((u32, u32, u32))>(filename: &str, mut logic: F) {
for_each_line(filename, |string| {
let mut fields = string[..].split(",");
let a: u32 = fields.next().unwrap().parse().ok().expect("parse error");
let b: u32 = fields.next().unwrap().parse().ok().expect("parse error");
let c: u32 = fields.next().unwrap().parse().ok().expect("parse error");
logic((a, b, c))
});
}
fn main() {
timely::execute_from_args(std::env::args(), |root| {
let (mut c, mut p) = root.scoped::<u64, _, _>(move |outer| {
let (c_input, c) = outer.new_input();
let c = Collection::new(c);
let (p_input, p) = outer.new_input();
let p = Collection::new(p);
let c2 = c.map(|(x,y,_z)| (x,y));
let ce = c.map(|(x,y,z)| ((x,y),z)).filter(|_| false);
let trips = join!(p : ce :
(p, |(x,y)| (x,y), |x, y| (x,y), 0u32),
(c2, |(x,y)| (y,x), |y, x| (x,y), 1u32)
);
(c_input, p_input)
});
if root.index() == 0 {
for_each_trip_in("/Users/mcsherry/Projects/Datasets/galen/c.txt", |x| c.send((x,1)));
for_each_pair_in("/Users/mcsherry/Projects/Datasets/galen/p-final.txt", |x| p.send((x,1)));
}
});
}