extern crate timely;
extern crate differential_dataflow;
use differential_dataflow::input::Input;
use differential_dataflow::operators::Threshold;
/// Loads and then unloads a stream of integers through a `distinct` dataflow,
/// printing the elapsed time after each phase.
///
/// Usage: `<large> <small> <batch> <total> [timely options]`
///
/// * `large`, `small` - moduli applied to each generated value; both results are
///   inserted (and later removed). A zero modulus panics as soon as a worker
///   processes its first value.
/// * `batch` - number of logical timestamps between points where the input is
///   flushed and the worker waits for the dataflow to catch up.
/// * `total` - exclusive upper bound on the generated values; also the timestamp
///   offset at which the removal phase starts.
///
/// # Panics
///
/// Panics with a descriptive message if any of the four positional arguments is
/// missing or is not a valid `usize`.
fn main() {
    /// Reads the positional command-line argument at `index` and parses it as `usize`.
    ///
    /// Panics with a message naming the argument when it is absent or malformed.
    fn positional_arg(index: usize, name: &str) -> usize {
        let raw = std::env::args().nth(index).unwrap_or_else(|| {
            panic!(
                "missing argument {} <{}>; usage: <large> <small> <batch> <total> [timely options]",
                index, name
            )
        });
        raw.parse::<usize>().unwrap_or_else(|err| {
            panic!(
                "argument {} <{}> must be an unsigned integer, got {:?}: {}",
                index, name, raw, err
            )
        })
    }

    let large = positional_arg(1, "large");
    let small = positional_arg(2, "small");
    let batch = positional_arg(3, "batch");
    let total = positional_arg(4, "total");

    // `skip(3)` drops the program name and the first two positional arguments;
    // `batch`, `total` and everything after them are handed to timely's parser.
    timely::execute_from_args(std::env::args().skip(3), move |worker| {
        let timer = ::std::time::Instant::now();

        // Each worker handles the values `index, index + peers, index + 2 * peers, ...`.
        let index = worker.index();
        let peers = worker.peers();

        // Build the dataflow: an input collection, reduced with `distinct`, observed by `probe`.
        let mut probe = timely::dataflow::operators::probe::Handle::new();
        let mut handle = worker.dataflow(|scope| {
            let (handle, input) = scope.new_collection();
            input.distinct().probe_with(&mut probe);
            handle
        });

        println!("{:?}:\tloading edges", timer.elapsed());

        // Phase 1: insert `value % large` and `value % small` at logical time `value`.
        let mut next = batch;
        for value in (index..total).step_by(peers) {
            if value >= next {
                // Close out the current batch and wait until the probe has caught up
                // with the input's time before introducing more data.
                handle.advance_to(next);
                handle.flush();
                next += batch;
                while probe.less_than(handle.time()) {
                    worker.step();
                }
            }
            handle.advance_to(value);
            handle.insert(value % large);
            handle.insert(value % small);
        }

        // Seal the loading phase at time `total` and wait for it to be processed.
        handle.advance_to(total);
        handle.flush();
        while probe.less_than(handle.time()) {
            worker.step();
        }
        println!("{:?}\tdata loaded", timer.elapsed());

        // Phase 2: remove the same records, at logical times offset by `total`.
        let mut next = batch;
        for value in (index..total).step_by(peers) {
            if value >= next {
                handle.advance_to(total + next);
                handle.flush();
                next += batch;
                while probe.less_than(handle.time()) {
                    worker.step();
                }
            }
            handle.advance_to(total + value);
            handle.remove(value % large);
            handle.remove(value % small);
        }

        // Seal the unloading phase at time `2 * total` and wait for it to be processed.
        handle.advance_to(total + total);
        handle.flush();
        while probe.less_than(handle.time()) {
            worker.step();
        }
        println!("{:?}\tdata unloaded", timer.elapsed());

        // NOTE(review): `handle` is still alive here, so the input is not closed;
        // presumably this keeps the worker running after the unload so the process
        // can be inspected - confirm that not terminating is intended.
        while worker.step_or_park(None) {}
    })
    .unwrap();
}