differential-dataflow 0.15.2

An incremental data-parallel dataflow platform
Documentation
use rand::{Rng, SeedableRng, StdRng};

use differential_dataflow::input::Input;
// use differential_dataflow::operators::Count;
use differential_dataflow::operators::count::CountTotal;

/// Positional-argument summary appended to argument-parsing panic messages.
const USAGE: &str = "expected arguments: <nodes> <edges> <batch> <inspect> [timely args]";

/// Pulls the next command-line argument and parses it as `T`.
///
/// # Panics
///
/// Panics with a message naming the argument if it is missing or fails to parse.
fn parse_arg<T>(args: &mut impl Iterator<Item = String>, name: &str) -> T
where
    T: std::str::FromStr,
    T::Err: std::fmt::Debug,
{
    let raw = args
        .next()
        .unwrap_or_else(|| panic!("missing <{}>; {}", name, USAGE));
    raw.parse()
        .unwrap_or_else(|err| panic!("invalid <{}> {:?} ({:?}); {}", name, raw, err, USAGE))
}

/// Maintains the out-degree distribution of a random graph under edge changes.
///
/// Positional arguments (all required, in this order):
///
/// 1. `nodes`   - node identifiers are drawn from `0 .. nodes`.
/// 2. `edges`   - number of random edges loaded before the first round.
/// 3. `batch`   - number of update rounds between synchronization points;
///                `0` disables the update phase entirely.
/// 4. `inspect` - the literal string `inspect` prints every output change;
///                any other value suppresses the output.
///
/// Any further arguments are handed to timely.
///
/// # Panics
///
/// Panics if a positional argument is missing or cannot be parsed, or if the
/// timely computation fails to start or complete.
fn main() {

    let mut args = std::env::args();
    // Skip the program name.
    args.next();

    let nodes: u32 = parse_arg(&mut args, "nodes");
    let edges: usize = parse_arg(&mut args, "edges");
    let batch: u32 = parse_arg(&mut args, "batch");
    let inspect: bool = args
        .next()
        .unwrap_or_else(|| panic!("missing <inspect>; {}", USAGE))
        == "inspect";

    // Start the timely workers; the program name and the four positional
    // arguments consumed above are skipped, the remainder configures timely.
    timely::execute_from_args(std::env::args().skip(5), move |worker| {

        let timer = ::std::time::Instant::now();

        let index = worker.index();
        let peers = worker.peers();

        // create a degree counting differential dataflow
        let (mut input, probe) = worker.dataflow::<u32,_,_>(|scope| {

            // create edge input, count a few ways.
            let (input, edges) = scope.new_collection::<_,i32>();

            // First count: edges per source node (its out-degree).
            // Second count: number of source nodes having each out-degree.
            let out_degr_distr =
            edges
                .map(|(src, _dst)| src)
                .count_total()
                .map(|(_src, cnt)| cnt as usize)
                .count_total();

            // show us something about the collection, notice when done.
            // The filter drops every record unless `inspect` was requested,
            // so the probe still tracks progress while nothing is printed.
            let probe =
            out_degr_distr
                .filter(move |_| inspect)
                .inspect(|x| println!("observed: {:?}", x))
                .probe();

            (input, probe)
        });

        // Both generators are seeded identically (per worker), so `rng2`
        // replays the edge sequence that `rng1` produced earlier.
        let seed: &[_] = &[1, 2, 3, index];
        let mut rng1: StdRng = SeedableRng::from_seed(seed);    // rng for edge additions
        let mut rng2: StdRng = SeedableRng::from_seed(seed);    // rng for edge deletions

        // Load up graph data. Round-robin among workers: each worker takes
        // `edges / peers` edges, and the first `edges % peers` workers one more.
        for _ in 0 .. (edges / peers) + if index < (edges % peers) { 1 } else { 0 } {
            input.update((rng1.gen_range(0, nodes), rng1.gen_range(0, nodes)), 1)
        }

        // Close time 0 and run until its results have been produced.
        input.advance_to(1);
        input.flush();
        worker.step_while(|| probe.less_than(input.time()));

        if index == 0 {
            println!("round 0 finished after {:?} (loading)", timer.elapsed());
        }

        if batch > 0 {
            // Just have worker zero drive input production.
            if index == 0 {

                let mut next = batch;
                // Bounded at `u32::MAX`: an open-ended `1 ..` range overflows
                // the `u32` round counter (panic in debug, wrap in release).
                for round in 1 ..= u32::MAX {

                    // Each round adds one fresh edge and retracts one old edge.
                    input.advance_to(round);
                    input.update((rng1.gen_range(0, nodes), rng1.gen_range(0, nodes)), 1);
                    input.update((rng2.gen_range(0, nodes), rng2.gen_range(0, nodes)), -1);

                    // Once the input has moved past round `next`, wait until
                    // every time before the current one has been processed.
                    if round > next {
                        let timer = ::std::time::Instant::now();
                        input.flush();
                        while probe.less_than(input.time()) {
                            worker.step();
                        }
                        println!("round {} finished after {:?}", next, timer.elapsed());
                        // Saturate instead of overflowing near `u32::MAX`.
                        next = next.saturating_add(batch);
                    }
                }
            }
        }
    }).unwrap();
}