timely 0.29.0

A low-latency data-parallel dataflow system in Rust
Documentation
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::Input;
use timely::dataflow::InputHandle;
use timely::Config;

#[test] fn operator_scaling_1() { operator_scaling(1); }
#[test] fn operator_scaling_10() { operator_scaling(10); }
#[test] fn operator_scaling_100() { operator_scaling(100); }
#[test] #[cfg_attr(miri, ignore)] fn operator_scaling_1000() { operator_scaling(1000); }
#[test] #[cfg_attr(miri, ignore)] fn operator_scaling_10000() { operator_scaling(10000); }
#[test] #[cfg_attr(miri, ignore)] fn operator_scaling_100000() { operator_scaling(100000); }

fn operator_scaling(scale: u64) {
    timely::execute(Config::thread(), move |worker| {
        let mut input = InputHandle::new();
        worker.dataflow::<u64, _, _>(|scope| {
            use timely::dataflow::operators::vec::Partition;
            let parts =
            scope
                .input_from(&mut input)
                .partition(scale, |()| (0, ()));

            use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
            let mut builder = OperatorBuilder::new("OpScaling".to_owned(), scope.clone());
            let mut handles = Vec::with_capacity(parts.len());
            let mut outputs = Vec::with_capacity(parts.len());
            for (index, part) in parts.into_iter().enumerate() {
                let (output, stream) = builder.new_output_connection::<Vec<()>,_>([]);
                use timely::progress::Antichain;
                let connectivity = [(index, Antichain::from_elem(Default::default()))];
                handles.push((builder.new_input_connection(part, Pipeline, connectivity), output));
                outputs.push(stream);
            }

            builder.build(move |_| {
                move |_frontiers| {
                    for (input, output) in handles.iter_mut() {
                        let mut output = output.activate();
                        input.for_each(|time, data| {
                            output.give(&time, data);
                        });
                    }
                }
            });
        });
    })
    .unwrap();
}

#[test] fn subgraph_scaling_1() { subgraph_scaling(1); }
#[test] fn subgraph_scaling_10() { subgraph_scaling(10); }
#[test] fn subgraph_scaling_100() { subgraph_scaling(100); }
#[test] #[cfg_attr(miri, ignore)] fn subgraph_scaling_1000() { subgraph_scaling(1000); }
#[test] #[cfg_attr(miri, ignore)] fn subgraph_scaling_10000() { subgraph_scaling(10000); }
#[test] #[cfg_attr(miri, ignore)] fn subgraph_scaling_100000() { subgraph_scaling(100000); }

fn subgraph_scaling(scale: u64) {
    timely::execute(Config::thread(), move |worker| {
        let mut input = InputHandle::new();
        worker.dataflow::<u64, _, _>(|scope| {
            use timely::dataflow::operators::vec::Partition;
            let parts =
            scope
                .input_from(&mut input)
                .partition(scale, |()| (0, ()));

            let _outputs = scope.region(|inner| {
                use timely::dataflow::operators::{Enter, Leave};
                parts.into_iter().map(|part| part.enter(inner).leave(scope)).collect::<Vec<_>>()
            });
        });
    })
    .unwrap();
}