Function timely::execute::execute

pub fn execute<T, F>(
    config: Configuration,
    func: F
) -> Result<WorkerGuards<T>, String> where
    T: Send + 'static,
    F: Fn(&mut Worker<Allocator>) -> T + Send + Sync + 'static, 

Executes a timely dataflow from a configuration and per-communicator logic.

The execute method takes a Configuration and spins up some number of worker threads, each of which executes the supplied closure to construct and run a timely dataflow computation.

The closure may return a value of type T: Send + 'static. The execute method returns immediately after initializing the timely computation, yielding a Result that contains either a WorkerGuards<T> or error information. Joining the WorkerGuards<T> recovers the T values returned by the local workers.

Note: if the caller drops the result of execute, the drop code will block awaiting the completion of the timely computation.
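The join and drop behavior described above can be modeled with the standard library alone. The following is a minimal sketch, not timely's implementation: `Guards` and `spawn_workers` are hypothetical stand-ins for `WorkerGuards<T>` and `execute`, and the closure receives a worker index rather than a `&mut Worker<Allocator>`. It is meant to show why the closure needs `Fn + Send + Sync + 'static` (one closure is shared by every thread) and how a guard type can block on drop.

```rust
use std::sync::Arc;
use std::thread::{self, JoinHandle};

/// Hypothetical stand-in for `WorkerGuards<T>`: owns one handle per worker thread.
pub struct Guards<T> {
    handles: Vec<JoinHandle<T>>,
}

impl<T> Guards<T> {
    /// Waits for every worker and returns one result per worker, in worker order.
    pub fn join(mut self) -> Vec<Result<T, String>> {
        self.handles
            .drain(..)
            .map(|h| h.join().map_err(|_| "worker panicked".to_string()))
            .collect()
    }
}

impl<T> Drop for Guards<T> {
    /// Dropping without calling `join` still blocks until the remaining workers finish.
    fn drop(&mut self) {
        for handle in self.handles.drain(..) {
            let _ = handle.join();
        }
    }
}

/// Hypothetical analogue of `execute`: runs `func` once on each of `workers` threads
/// and returns immediately with the guards.
pub fn spawn_workers<T, F>(workers: usize, func: F) -> Guards<T>
where
    T: Send + 'static,
    F: Fn(usize) -> T + Send + Sync + 'static,
{
    // A single closure is shared by all threads, hence the `Sync` bound and the `Arc`.
    let func = Arc::new(func);
    let handles = (0..workers)
        .map(|index| {
            let func = Arc::clone(&func);
            thread::spawn(move || (*func)(index))
        })
        .collect();
    Guards { handles }
}
```

In this sketch, calling `spawn_workers(3, |index| index * 10).join()` yields one `Ok` value per worker, while simply dropping the returned `Guards` waits for all threads before continuing, which mirrors the note about dropping the result of execute.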

Examples

use timely::dataflow::operators::{ToStream, Inspect};

// execute a timely dataflow using three worker threads.
timely::execute(timely::Configuration::Process(3), |worker| {
    worker.dataflow::<(),_,_>(|scope| {
        (0..10).to_stream(scope)
               .inspect(|x| println!("seen: {:?}", x));
    })
}).unwrap();

The following example demonstrates how one can extract data from a multi-worker execution. In a multi-process setting, each process will only receive those records present at workers in the process.

use std::sync::{Arc, Mutex};
use timely::dataflow::operators::{ToStream, Inspect, Capture};
use timely::dataflow::operators::capture::Extract;

// get send and recv endpoints, wrap send to share
let (send, recv) = ::std::sync::mpsc::channel();
let send = Arc::new(Mutex::new(send));

// execute a timely dataflow using three worker threads.
timely::execute(timely::Configuration::Process(3), move |worker| {
    let send = send.lock().unwrap().clone();
    worker.dataflow::<(),_,_>(move |scope| {
        (0..10).to_stream(scope)
               .inspect(|x| println!("seen: {:?}", x))
               .capture_into(send);
    });
}).unwrap();

// the extracted data should contain (0..10) three times at the single timestamp ().
assert_eq!(recv.extract()[0].1, (0..30).map(|x| x / 3).collect::<Vec<_>>());
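The sender-sharing pattern and the expected value in the final assertion can be checked without timely. The sketch below is a standard-library analogy under stated assumptions: `gather_sorted` is a hypothetical helper, plain threads stand in for workers, and the explicit `sort` models the assumption (implied by the assertion above) that the extracted records for a timestamp come back in sorted order. Because the closure passed to execute must be `Send + Sync`, the example above shares the sender behind an `Arc<Mutex<_>>` and lets each worker clone its own copy; the sketch does the same.

```rust
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical helper: each of `workers` threads sends `0..10` through its own
/// clone of one shared sender; the received values are returned in sorted order.
pub fn gather_sorted(workers: usize) -> Vec<u64> {
    let (send, recv) = channel::<u64>();
    // Wrapping the sender lets every thread obtain a private clone from shared state.
    let send: Arc<Mutex<Sender<u64>>> = Arc::new(Mutex::new(send));

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let shared = Arc::clone(&send);
            thread::spawn(move || {
                // Hold the lock only long enough to clone a sender for this thread.
                let local = shared.lock().unwrap().clone();
                drop(shared);
                for x in 0..10 {
                    local.send(x).unwrap();
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }
    // Drop the original sender so the receiving iterator terminates.
    drop(send);

    let mut data: Vec<u64> = recv.iter().collect();
    data.sort();
    data
}
```

With three workers, the sorted result is each value of `0..10` repeated three times, which is exactly what `(0..30).map(|x| x / 3)` produces.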