Skip to main content

execute

Function execute 

Source
pub fn execute<T, F>(config: Config, func: F) -> Result<WorkerGuards<T>, String>
where T: Send + 'static, F: Fn(&mut Worker<Allocator>) -> T + Send + Sync + 'static,
Expand description

Executes a timely dataflow from a configuration and per-communicator logic.

The `execute` method takes a `Config` and spins up some number of worker threads, each of which executes the supplied closure to construct and run a timely dataflow computation.

The closure may return a `T: Send + 'static`. The `execute` method returns immediately after initializing the timely computation, with a result containing a `WorkerGuards<T>` (or error information). The guards can be joined to recover the `T` values returned by the local workers.

Note: if the caller drops the result of `execute`, the drop code blocks until the timely computation completes. If the result is not captured, it is dropped immediately, which makes `execute` appear to block. To regain control after calling `execute`, capture the result and drop it only when the calling thread has no other work to perform.

Examples

use timely::dataflow::operators::{ToStream, Inspect};

// execute a timely dataflow using three worker threads.
timely::execute(timely::Config::process(3), |worker| {
    worker.dataflow::<(),_,_>(|scope| {
        (0..10).to_stream(scope)
               .container::<Vec<_>>()
               .inspect(|x| println!("seen: {:?}", x));
    })
}).unwrap();

The following example demonstrates how one can extract data from a multi-worker execution. In a multi-process setting, each process will only receive those records present at workers in the process.

use std::sync::{Arc, Mutex};
use timely::dataflow::operators::{ToStream, Inspect, Capture};
use timely::dataflow::operators::capture::Extract;

// get send and recv endpoints, wrap send to share
let (send, recv) = ::std::sync::mpsc::channel();
let send = Arc::new(Mutex::new(send));

// execute a timely dataflow using three worker threads.
timely::execute(timely::Config::process(3), move |worker| {
    let send = send.lock().unwrap().clone();
    worker.dataflow::<(),_,_>(move |scope| {
        (0..10).to_stream(scope)
               .container::<Vec<_>>()
               .inspect(|x| println!("seen: {:?}", x))
               .capture_into(send);
    });
}).unwrap();

// the extracted data should have data (0..10) thrice at timestamp 0.
assert_eq!(recv.extract()[0].1, (0..30).map(|x| x / 3).collect::<Vec<_>>());
Examples found in repository?
examples/unordered_input.rs (lines 5-17)
4fn main() {
5    timely::execute(Config::thread(), |worker| {
6        let (mut input, mut cap) = worker.dataflow::<usize,_,_>(|scope| {
7            let (input, stream) = scope.new_unordered_input();
8            stream.container::<Vec<_>>().inspect_batch(|t, x| println!("{:?} -> {:?}", t, x));
9            input
10        });
11
12        for round in 0..10 {
13            input.activate().session(&cap).give(round);
14            cap = cap.delayed(&(round + 1));
15            worker.step();
16        }
17    }).unwrap();
18}
More examples
Hide additional examples
examples/sequence.rs (lines 7-23)
6fn main() {
7    timely::execute(Config::process(4), |worker| {
8
9        let timer = Instant::now();
10        let mut sequencer = Sequencer::new(worker, Instant::now());
11
12        for round in 0 .. {
13            // if worker.index() < 3 {
14                std::thread::sleep(Duration::from_secs(1 + worker.index() as u64));
15                sequencer.push(format!("worker {:?}, round {:?}", worker.index(), round));
16            // }
17            for element in &mut sequencer {
18                println!("{:?}:\tWorker {:?}:\t recv'd: {:?}", timer.elapsed(), worker.index(), element);
19            }
20            worker.step();
21        }
22
23    }).unwrap(); // asserts error-free execution;
24}
examples/columnar.rs (lines 34-124)
21fn main() {
22
23    type InnerContainer = <WordCount as columnar::Columnar>::Container;
24    type Container = Column<InnerContainer>;
25
26    use columnar::Len;
27
28    let config = timely::Config {
29        communication: timely::CommunicationConfig::ProcessBinary(3),
30        worker: timely::WorkerConfig::default(),
31    };
32
33    // initializes and runs a timely dataflow.
34    timely::execute(config, |worker| {
35        let mut input = <InputHandle<_, CapacityContainerBuilder<Container>>>::new();
36        let probe = ProbeHandle::new();
37
38        // create a new input, exchange data, and inspect its output
39        worker.dataflow::<usize, _, _>(|scope| {
40            input
41                .to_stream(scope)
42                .unary(
43                    Pipeline,
44                    "Split",
45                    |_cap, _info| {
46                        move |input, output| {
47                            input.for_each_time(|time, data| {
48                                let mut session = output.session(&time);
49                                for data in data {
50                                    for wordcount in data.borrow().into_index_iter().flat_map(|wordcount| {
51                                        wordcount.text.split_whitespace().map(move |text| WordCountReference { text, diff: wordcount.diff })
52                                    }) {
53                                        session.give(wordcount);
54                                    }
55                                }
56                            });
57                        }
58                    },
59                )
60                .container::<Container>()
61                .unary_frontier(
62                    ExchangeCore::<ColumnBuilder<InnerContainer>,_>::new_core(|x: &WordCountReference<&str,&i64>| x.text.len() as u64),
63                    "WordCount",
64                    |_capability, _info| {
65                        let mut queues = HashMap::new();
66                        let mut counts = HashMap::new();
67
68                        move |(input, frontier), output| {
69                            input.for_each_time(|time, data| {
70                                queues
71                                    .entry(time.retain(output.output_index()))
72                                    .or_insert(Vec::new())
73                                    .extend(data.map(std::mem::take));
74
75                            });
76
77                            for (key, val) in queues.iter_mut() {
78                                if !frontier.less_equal(key.time()) {
79                                    let mut session = output.session(key);
80                                    for batch in val.drain(..) {
81                                        for wordcount in batch.borrow().into_index_iter() {
82                                            let total =
83                                            if let Some(count) = counts.get_mut(wordcount.text) {
84                                                *count += wordcount.diff;
85                                                *count
86                                            }
87                                            else {
88                                                counts.insert(wordcount.text.to_string(), *wordcount.diff);
89                                                *wordcount.diff
90                                            };
91                                            session.give(WordCountReference { text: wordcount.text, diff: total });
92                                        }
93                                    }
94                                }
95                            }
96
97                            queues.retain(|_key, val| !val.is_empty());
98                        }
99                    },
100                )
101                .container::<Container>()
102                .inspect_container(|x| {
103                    match x {
104                        Ok((time, data)) => {
105                            println!("seen at: {:?}\t{:?} records", time, data.record_count());
106                            for wc in data.borrow().into_index_iter() {
107                                println!("  {}: {}", wc.text, wc.diff);
108                            }
109                        },
110                        Err(frontier) => println!("frontier advanced to {:?}", frontier),
111                    }
112                })
113                .probe_with(&probe);
114        });
115
116        // introduce data and watch!
117        for round in 0..10 {
118            input.send(WordCountReference { text: "flat container", diff: 1 });
119            input.advance_to(round + 1);
120            while probe.less_than(input.time()) {
121                worker.step();
122            }
123        }
124    })
125    .unwrap();
126}