extern crate timely;
use timely::dataflow::{InputHandle, ProbeHandle};
use timely::dataflow::operators::{Input, Exchange, Probe};
use timely::logging::TimelyEvent;
/// Parses one positional command-line argument as a `usize`.
///
/// `arg` is the raw argument text (`None` when it was not supplied) and
/// `name` is the label used in error messages.
///
/// # Errors
///
/// Returns a human-readable message when the argument is missing or cannot
/// be parsed as a `usize`.
fn parse_count(arg: Option<String>, name: &str) -> Result<usize, String> {
    let raw = arg.ok_or_else(|| format!("missing <{}> argument", name))?;
    raw.parse::<usize>()
        .map_err(|err| format!("invalid <{}> argument {:?}: {}", name, raw, err))
}

/// Runs a timely computation that repeatedly sends a batch of integers through
/// two exchange dataflows while printing timely's log events and a
/// user-defined "input" log event per round.
///
/// Command line: the first positional argument is the number of records sent
/// per round (`batch`), the second is the number of rounds (`rounds`). The
/// full argument list is also handed to `timely::Configuration::from_args`.
///
/// On completion each worker prints the elapsed time, its index, and the
/// number of records it sent per second.
fn main() {
    let config = timely::Configuration::from_args(::std::env::args()).unwrap_or_else(|err| {
        eprintln!("error: invalid timely arguments: {:?}", err);
        std::process::exit(1)
    });

    // Parse the workload parameters once, before any worker starts, so that bad
    // input produces a readable message instead of a panic inside a worker.
    let (batch, rounds) = match (
        parse_count(std::env::args().nth(1), "batch"),
        parse_count(std::env::args().nth(2), "rounds"),
    ) {
        (Ok(batch), Ok(rounds)) => (batch, rounds),
        (Err(message), _) | (_, Err(message)) => {
            eprintln!("error: {}", message);
            eprintln!("usage: <program> <batch> <rounds> [timely options]");
            std::process::exit(1);
        }
    };

    timely::execute(config, move |worker| {
        let mut input = InputHandle::new();
        let mut probe = ProbeHandle::new();

        // Register a "timely" logger that prints every event with a LOG1 prefix.
        worker.log_register().insert::<TimelyEvent,_>("timely", |_time, data|
            data.iter().for_each(|x| println!("LOG1: {:?}", x))
        );

        // First dataflow: read from `input`, exchange records keyed by their
        // value, and attach `probe` to the result.
        worker.dataflow(|scope| {
            scope
                .input_from(&mut input)
                .exchange(|&x| x as u64)
                .probe_with(&mut probe);
        });

        // Register a logger under the same "timely" name, now with a LOG2 prefix.
        // NOTE(review): presumably this replaces the LOG1 logger for anything
        // built afterwards — confirm against the timely logging registry docs.
        worker.log_register().insert::<TimelyEvent,_>("timely", |_time, data|
            data.iter().for_each(|x| println!("LOG2: {:?}", x))
        );

        // Second dataflow: same shape, sharing the same `input` and `probe`.
        worker.dataflow(|scope| {
            scope
                .input_from(&mut input)
                .exchange(|&x| x as u64)
                .probe_with(&mut probe);
        });

        // Register a user-level "input" logger carrying `()` payloads; the
        // callback prints the first field of every logged element.
        worker.log_register().insert::<(),_>("input", |_time, data|
            for element in data.iter() {
                println!("Round tick at: {:?}", element.0);
            }
        );

        let input_logger = worker.log_register().get::<()>("input").expect("Input logger absent");

        let timer = std::time::Instant::now();

        for round in 0 .. rounds {
            for i in 0 .. batch {
                input.send(i);
            }
            // NOTE(review): on round 0 this presumably leaves the input time
            // unchanged, so the wait loop below would not block for that round;
            // other timely examples advance to `round + 1` — confirm intent.
            input.advance_to(round);
            input_logger.log(());

            // Run the worker until the probe is no longer behind the input time.
            while probe.less_than(input.time()) {
                worker.step();
            }
        }

        let volume = (rounds * batch) as f64;
        // Sample the clock exactly once so the printed duration and the printed
        // rate are derived from the same measurement.
        let elapsed = timer.elapsed();
        let seconds = elapsed.as_secs() as f64 + (f64::from(elapsed.subsec_nanos())/1000000000.0);
        println!("{:?}\tworker {} complete; rate: {:?}", elapsed, worker.index(), volume / seconds);
    }).unwrap();
}