timely 0.29.0

A low-latency data-parallel dataflow system in Rust.
Documentation: https://docs.rs/timely

The following example implements a streaming hash join between two input streams
of (key, value) pairs, partitioned by key across workers.
use std::collections::HashMap;

use timely::dataflow::*;
use timely::dataflow::operators::{Input, Probe};
use timely::dataflow::operators::generic::Operator;
use timely::dataflow::channels::pact::Exchange;

fn main() {

    // Command-line args: the number of distinct keys, the total number of
    // values to feed into each input, and how many values to send per batch.
    let keys: u64 = std::env::args().nth(1).expect("missing argument 1: keys").parse().expect("keys must be a u64");
    let vals: usize = std::env::args().nth(2).expect("missing argument 2: vals").parse().expect("vals must be a usize");
    let batch: usize = std::env::args().nth(3).expect("missing argument 3: batch").parse().expect("batch must be a usize");

    // Remaining arguments configure the timely runtime itself (e.g. `-w <workers>`).
    timely::execute_from_args(std::env::args().skip(4), move |worker| {

        let index = worker.index();
        let peers = worker.peers();

        // Handles used to introduce data into the dataflow and to observe progress.
        let mut input1 = InputHandle::new();
        let mut input2 = InputHandle::new();
        let probe = ProbeHandle::new();

        // Build a dataflow joining two streams of (key, val) pairs on key.
        worker.dataflow(|scope| {

            let stream1 = scope.input_from(&mut input1);
            let stream2 = scope.input_from(&mut input2);

            // Route records by key so that matching keys meet at the same worker.
            let exchange1 = Exchange::new(|x: &(u64, u64)| x.0);
            let exchange2 = Exchange::new(|x: &(u64, u64)| x.0);

            stream1
                .binary(stream2, exchange1, exchange2, "HashJoin", |_capability, _info| {

                    // Per-input state: all values observed so far, grouped by key.
                    let mut map1 = HashMap::<u64, Vec<u64>>::new();
                    let mut map2 = HashMap::<u64, Vec<u64>>::new();

                    move |input1, input2, output| {

                        // Drain first input, check second map, update first map.
                        input1.for_each_time(|time, data| {
                            let mut session = output.session(&time);
                            for (key, val1) in data.flat_map(|d| d.drain(..)) {
                                if let Some(values) = map2.get(&key) {
                                    for val2 in values.iter() {
                                        session.give((val1, *val2));
                                    }
                                }

                                map1.entry(key).or_default().push(val1);
                            }
                        });

                        // Drain second input, check first map, update second map.
                        input2.for_each_time(|time, data| {
                            let mut session = output.session(&time);
                            for (key, val2) in data.flat_map(|d| d.drain(..)) {
                                if let Some(values) = map1.get(&key) {
                                    for val1 in values.iter() {
                                        session.give((*val1, val2));
                                    }
                                }

                                map2.entry(key).or_default().push(val2);
                            }
                        });
                    }
                })
                .container::<Vec<_>>()
                .probe_with(&probe);
        });

        // Generate roughly random data; seeding with `index` keeps workers distinct.
        use std::hash::{BuildHasher, BuildHasherDefault, DefaultHasher};
        let hasher = BuildHasherDefault::<DefaultHasher>::new();
        let mut insert = (0..).map(move |i| (hasher.hash_one(&(i,index,0)) % keys,
                                             hasher.hash_one(&(i,index,1)) % keys,
                                             hasher.hash_one(&(i,index,2)) % keys,
                                             hasher.hash_one(&(i,index,3)) % keys));

        let timer = std::time::Instant::now();

        // Each worker introduces its share (`vals / peers`) of the total volume.
        let mut sent = 0;
        while sent < (vals / peers) {

            // Send some amount of data, no more than `batch`.
            let to_send = std::cmp::min(batch, vals/peers - sent);
            for (src0, dst0, src1, dst1) in (&mut insert).take(to_send) {
                input1.send((src0, dst0));
                input2.send((src1, dst1));
            }
            sent += to_send;

            // Advance both inputs to the next round, then run the worker until
            // the probe confirms the just-closed round has been fully processed.
            let next = input1.time() + 1;
            input1.advance_to(next);
            input2.advance_to(next);
            while probe.less_than(input1.time()) {
                worker.step();
            }

            println!("{:?}\tworker {} batch complete", timer.elapsed(), index);
        }

    }).unwrap(); // asserts error-free execution
}