timely 0.29.0

A low-latency data-parallel dataflow system in Rust
Documentation
use std::collections::HashMap;

use timely::dataflow::{InputHandle, ProbeHandle};
use timely::dataflow::operators::{Operator, Inspect, Probe};
use timely::dataflow::operators::vec::Map;
use timely::dataflow::channels::pact::Exchange;

/// Runs a streaming word-count example on timely dataflow.
///
/// The dataflow built for each worker splits incoming `(text, diff)` records
/// into `(word, diff)` pairs, routes them through an exchange, and keeps a
/// running total per word inside a "WordCount" operator. Every
/// `(word, total)` update the operator emits is printed as `seen: ...`.
///
/// After building the dataflow, the driver sends the text `"round"` with a
/// diff of `1` once per round for ten rounds, advancing the input time after
/// each send and stepping the worker until the probe has caught up.
///
/// # Panics
///
/// Panics if `timely::execute_from_args` returns an error, because the result
/// is unwrapped.
fn main() {
    // initializes and runs a timely dataflow.
    timely::execute_from_args(std::env::args(), |worker| {

        let mut input = InputHandle::new();
        let probe = ProbeHandle::new();

        // define a distribution function for strings.
        // NOTE(review): the routing key is the word's byte length, so identical words
        // always map to the same key, but so do all distinct words of equal length.
        // Presumably acceptable for a demo; confirm before reusing on skewed data.
        let exchange = Exchange::new(|x: &(String, i64)| (x.0).len() as u64);

        // create a new input, exchange data, and inspect its output
        worker.dataflow::<usize,_,_>(|scope| {
            input.to_stream(scope)
                 // Split each line into words, carrying the line's diff along with every word.
                 .flat_map(|(text, diff): (String, i64)|
                    text.split_whitespace()
                        .map(move |word| (word.to_owned(), diff))
                        .collect::<Vec<_>>()
                 )
                 .unary_frontier(exchange, "WordCount", |_capability, _info| {

                    // Batches received per time, keyed by the capability retained for that time.
                    let mut queues = HashMap::new();
                    // Running total per word across all times processed so far.
                    let mut counts = HashMap::new();

                    move |(input, frontier), output| {
                        // Stash incoming batches under their time; `mem::take` moves each
                        // batch out and leaves an empty default in its place.
                        input.for_each_time(|time, data| {
                            queues.entry(time.retain(output.output_index()))
                                  .or_insert(Vec::new())
                                  .extend(data.map(std::mem::take));
                        });

                        // Pick the times the frontier no longer covers and visit them in
                        // ascending time order. `HashMap` iteration order is unspecified, and
                        // the running totals in `counts` depend on the order in which times
                        // are folded in, so iterating the map directly would make the emitted
                        // totals non-deterministic whenever several times are ready at once.
                        let mut ready: Vec<_> = queues
                            .iter_mut()
                            .filter(|(key, _)| !frontier.less_equal(key.time()))
                            .collect();
                        ready.sort_unstable_by_key(|(key, _)| *key.time());

                        for (key, val) in ready {
                            let mut session = output.session(key);
                            for mut batch in val.drain(..) {
                                for (word, diff) in batch.drain(..) {
                                    // The clone is needed: the map keeps one copy as its key
                                    // and the other is sent downstream with the new total.
                                    let entry = counts.entry(word.clone()).or_insert(0i64);
                                    *entry += diff;
                                    session.give((word, *entry));
                                }
                            }
                        }

                        // Drop the entries that were just drained, which also drops their
                        // retained capabilities.
                        queues.retain(|_key, val| !val.is_empty());
                    }})
                 .container::<Vec<_>>()
                 .inspect(|x| println!("seen: {:?}", x))
                 .probe_with(&probe);
        });

        // introduce data and watch!
        for round in 0..10 {
            input.send(("round".to_owned(), 1));
            input.advance_to(round + 1);
            // Keep scheduling the worker until the probe is no longer behind the input time.
            while probe.less_than(input.time()) {
                worker.step();
            }
        }
    }).unwrap();
}