Function timely::execute::execute
pub fn execute<T, F>(
    config: Configuration,
    func: F
) -> Result<WorkerGuards<T>, String>
where
    T: Send + 'static,
    F: Fn(&mut Worker<Allocator>) -> T + Send + Sync + 'static,
Executes a timely dataflow from a configuration and per-communicator logic.
The execute method takes a Configuration and spins up some number of worker threads, each of which executes the supplied closure to construct and run a timely dataflow computation.

The closure may return a T: Send+'static, and execute returns a result containing a WorkerGuards<T> (or error information), which can be joined to recover the result T values from the local workers.
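The spawn-then-join shape described above can be illustrated with the standard library alone. The following is a minimal sketch, not timely's implementation: `execute_sketch` and `Guards` are hypothetical names, the closure receives only a worker index rather than a `&mut Worker<Allocator>`, and no communication between workers is set up.

```rust
use std::sync::Arc;
use std::thread::{self, JoinHandle};

/// Hypothetical stand-in for `WorkerGuards<T>`: one join handle per worker thread.
pub struct Guards<T> {
    handles: Vec<JoinHandle<T>>,
}

impl<T> Guards<T> {
    /// Waits for every worker, returning each result in worker order,
    /// or an error message for a worker that panicked.
    pub fn join(self) -> Vec<Result<T, String>> {
        self.handles
            .into_iter()
            .map(|handle| handle.join().map_err(|_| "worker panicked".to_string()))
            .collect()
    }
}

/// Spawns `workers` threads; each one runs the same shared closure with its index.
/// The bounds on `T` and `F` mirror those in the signature above (assumption:
/// a plain `usize` index stands in for the worker argument).
pub fn execute_sketch<T, F>(workers: usize, func: F) -> Result<Guards<T>, String>
where
    T: Send + 'static,
    F: Fn(usize) -> T + Send + Sync + 'static,
{
    // The closure is shared, not copied, which is why it must be `Sync`.
    let func = Arc::new(func);
    let mut handles = Vec::with_capacity(workers);
    for index in 0..workers {
        let func = Arc::clone(&func);
        let handle = thread::Builder::new()
            .name(format!("worker-{}", index))
            .spawn(move || (*func)(index))
            .map_err(|e| e.to_string())?;
        handles.push(handle);
    }
    Ok(Guards { handles })
}
```

Calling `execute_sketch(3, |index| index * 10)` and then `join()` on the returned guards yields one `Result` per worker, in worker order. The `Send + Sync + 'static` bounds on the closure follow directly from sharing one closure across several threads that may outlive the caller's stack frame.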
Examples
use timely::dataflow::operators::{ToStream, Inspect};

// execute a timely dataflow using three worker threads.
timely::execute(timely::Configuration::Process(3), |worker| {
    worker.dataflow::<(),_,_>(|scope| {
        (0..10).to_stream(scope)
               .inspect(|x| println!("seen: {:?}", x));
    })
}).unwrap();
The following example demonstrates how one can extract data from a multi-worker execution. In a multi-process setting, each process will only receive those records present at workers in the process.
use std::sync::{Arc, Mutex};
use timely::dataflow::operators::{ToStream, Inspect, Capture};
use timely::dataflow::operators::capture::Extract;

// get send and recv endpoints, wrap send to share
let (send, recv) = ::std::sync::mpsc::channel();
let send = Arc::new(Mutex::new(send));

// execute a timely dataflow using three worker threads.
timely::execute(timely::Configuration::Process(3), move |worker| {
    let send = send.lock().unwrap().clone();
    worker.dataflow::<(),_,_>(move |scope| {
        (0..10).to_stream(scope)
               .inspect(|x| println!("seen: {:?}", x))
               .capture_into(send);
    });
}).unwrap();

// the extracted data should have data (0..10) thrice at timestamp 0.
assert_eq!(recv.extract()[0].1, (0..30).map(|x| x / 3).collect::<Vec<_>>());
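The sender-sharing pattern in the example above, and the reason the expected vector is (0..30).map(|x| x / 3), can be seen in isolation with a standard-library sketch. It assumes plain threads in place of timely workers and raw u64 values in place of captured events; `collect_from_workers` is a hypothetical helper, and the result is sorted explicitly so that it does not depend on thread scheduling.

```rust
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, Mutex};
use std::thread;

/// Runs `workers` threads that each send `0..per_worker` through a private clone
/// of one shared sender, then returns everything received, sorted.
pub fn collect_from_workers(workers: usize, per_worker: u64) -> Vec<u64> {
    let (send, recv) = channel();
    // Wrap the sender so that every worker can reach it and take a clone.
    let shared: Arc<Mutex<Sender<u64>>> = Arc::new(Mutex::new(send));

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let shared = Arc::clone(&shared);
            thread::spawn(move || {
                // Lock only long enough to clone; sending needs no lock afterwards.
                let send = shared.lock().unwrap().clone();
                for x in 0..per_worker {
                    send.send(x).unwrap();
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }
    // Drop the last remaining sender so the receiving iterator terminates.
    drop(shared);

    let mut seen: Vec<u64> = recv.iter().collect();
    seen.sort();
    seen
}
```

With three workers each sending 0..10, every value appears three times, so the sorted result is 0, 0, 0, 1, 1, 1, and so on up to 9, 9, 9, which is exactly what (0..30).map(|x| x / 3) produces.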