// timely 0.9.0
//
// A low-latency data-parallel dataflow system in Rust.
// Documentation
extern crate timely;

use timely::dataflow::{InputHandle, ProbeHandle};
use timely::dataflow::operators::{Input, Exchange, Probe};

// use timely::dataflow::operators::capture::EventWriter;
// use timely::dataflow::ScopeParent;
use timely::logging::TimelyEvent;

/// Runs a small timely dataflow throughput experiment with logging enabled.
///
/// Positional command-line arguments:
/// 1. `batch`  - number of records each worker sends per round.
/// 2. `rounds` - number of rounds to run.
///
/// The full argument list is also passed to `timely::Configuration::from_args`.
///
/// Each worker builds two identical dataflows (input -> exchange -> probe)
/// sharing one input handle and one probe handle, registers a "timely" logger
/// before each of them and a user-level "input" logger, then feeds `rounds`
/// batches and prints the observed rate.
///
/// # Panics
///
/// Panics if the configuration cannot be built from the arguments, if either
/// positional argument is missing or is not a valid `usize`, or if the
/// computation fails to start.
fn main() {
    // initializes and runs a timely dataflow.
    let config = timely::Configuration::from_args(::std::env::args())
        .expect("failed to build timely configuration from arguments");
    timely::execute(config, |worker| {

        let batch = std::env::args()
            .nth(1)
            .expect("missing first argument: <batch> (records per round)")
            .parse::<usize>()
            .expect("<batch> must be a non-negative integer");
        let rounds = std::env::args()
            .nth(2)
            .expect("missing second argument: <rounds> (number of rounds)")
            .parse::<usize>()
            .expect("<rounds> must be a non-negative integer");
        let mut input = InputHandle::new();
        let mut probe = ProbeHandle::new();

        // Register timely worker logging; events are printed with a "LOG1" prefix.
        worker.log_register().insert::<TimelyEvent,_>("timely", |_time, data|
            data.iter().for_each(|x| println!("LOG1: {:?}", x))
        );

        // create a new input, exchange data, and inspect its output
        worker.dataflow(|scope| {
            scope
                .input_from(&mut input)
                .exchange(|&x| x as u64)
                .probe_with(&mut probe);
        });

        // Register timely worker logging again under the same name, now with a
        // "LOG2" prefix.
        // NOTE(review): presumably only loggers created after this point use the
        // new action - confirm against the timely logging registry docs.
        worker.log_register().insert::<TimelyEvent,_>("timely", |_time, data|
            data.iter().for_each(|x| println!("LOG2: {:?}", x))
        );

        // create a second dataflow attached to the same input and probe handles
        worker.dataflow(|scope| {
            scope
                .input_from(&mut input)
                .exchange(|&x| x as u64)
                .probe_with(&mut probe);
        });

        // Register user-level logging: events carry no payload (`()`), and the
        // first field of each logged tuple is printed per event.
        worker.log_register().insert::<(),_>("input", |_time, data|
            for element in data.iter() {
                println!("Round tick at: {:?}", element.0);
            }
        );

        let input_logger = worker.log_register().get::<()>("input").expect("Input logger absent");

        let timer = std::time::Instant::now();

        for round in 0 .. rounds {

            for i in 0 .. batch {
                input.send(i);
            }
            // Advance past the epoch just filled. Advancing to `round` itself
            // would be a no-op in round 0 (the input already starts at time 0),
            // so the wait below would return without processing anything.
            input.advance_to(round + 1);
            input_logger.log(());

            // Step the worker until the probe reports no outstanding work at
            // times earlier than the input's current time.
            while probe.less_than(input.time()) {
                worker.step();
            }

        }

        let volume = (rounds * batch) as f64;
        // Sample the clock once so the printed duration and the rate agree.
        let elapsed = timer.elapsed();
        let seconds = elapsed.as_secs() as f64 + (f64::from(elapsed.subsec_nanos())/1000000000.0);

        println!("{:?}\tworker {} complete; rate: {:?}", elapsed, worker.index(), volume / seconds);

    }).unwrap();
}