pub fn execute<T, F>(config: Config, func: F) -> Result<WorkerGuards<T>, String>
Expand description
Executes a timely dataflow from a configuration and per-communicator logic.
The execute method takes a Configuration and spins up some number of
worker threads, each of which executes the supplied closure to construct
and run a timely dataflow computation.
The closure may return a T: Send+'static. The execute method returns
immediately after initializing the timely computation with a result
containing a WorkerGuards<T> (or error information), which can be joined
to recover the result T values from the local workers.
Note: if the caller drops the result of execute, the drop code will
block awaiting the completion of the timely computation. If the result
of the method is not captured it will be dropped, which gives the experience
of execute blocking; to regain control after execute be sure to
capture the results and drop them only when the calling thread has no
other work to perform.
§Examples
use timely::dataflow::operators::{ToStream, Inspect};
// execute a timely dataflow using three worker threads.
timely::execute(timely::Config::process(3), |worker| {
worker.dataflow::<(),_,_>(|scope| {
(0..10).to_stream(scope)
.container::<Vec<_>>()
.inspect(|x| println!("seen: {:?}", x));
})
}).unwrap();
The following example demonstrates how one can extract data from a multi-worker execution. In a multi-process setting, each process will only receive those records present at workers in the process.
use std::sync::{Arc, Mutex};
use timely::dataflow::operators::{ToStream, Inspect, Capture};
use timely::dataflow::operators::capture::Extract;
// get send and recv endpoints, wrap send to share
let (send, recv) = ::std::sync::mpsc::channel();
let send = Arc::new(Mutex::new(send));
// execute a timely dataflow using three worker threads.
timely::execute(timely::Config::process(3), move |worker| {
let send = send.lock().unwrap().clone();
worker.dataflow::<(),_,_>(move |scope| {
(0..10).to_stream(scope)
.container::<Vec<_>>()
.inspect(|x| println!("seen: {:?}", x))
.capture_into(send);
});
}).unwrap();
// the extracted data should have data (0..10) thrice at timestamp 0.
assert_eq!(recv.extract()[0].1, (0..30).map(|x| x / 3).collect::<Vec<_>>());
Examples found in repository?
4 fn main() {
5 timely::execute(Config::thread(), |worker| {
6 let (mut input, mut cap) = worker.dataflow::<usize,_,_>(|scope| {
7 let (input, stream) = scope.new_unordered_input();
8 stream.container::<Vec<_>>().inspect_batch(|t, x| println!("{:?} -> {:?}", t, x));
9 input
10 });
11
12 for round in 0..10 {
13 input.activate().session(&cap).give(round);
14 cap = cap.delayed(&(round + 1));
15 worker.step();
16 }
17 }).unwrap();
18 }
More examples
6 fn main() {
7 timely::execute(Config::process(4), |worker| {
8
9 let timer = Instant::now();
10 let mut sequencer = Sequencer::new(worker, Instant::now());
11
12 for round in 0 .. {
13 // if worker.index() < 3 {
14 std::thread::sleep(Duration::from_secs(1 + worker.index() as u64));
15 sequencer.push(format!("worker {:?}, round {:?}", worker.index(), round));
16 // }
17 for element in &mut sequencer {
18 println!("{:?}:\tWorker {:?}:\t recv'd: {:?}", timer.elapsed(), worker.index(), element);
19 }
20 worker.step();
21 }
22
23 }).unwrap(); // asserts error-free execution;
24 }
21 fn main() {
22
23 type InnerContainer = <WordCount as columnar::Columnar>::Container;
24 type Container = Column<InnerContainer>;
25
26 use columnar::Len;
27
28 let config = timely::Config {
29 communication: timely::CommunicationConfig::ProcessBinary(3),
30 worker: timely::WorkerConfig::default(),
31 };
32
33 // initializes and runs a timely dataflow.
34 timely::execute(config, |worker| {
35 let mut input = <InputHandle<_, CapacityContainerBuilder<Container>>>::new();
36 let probe = ProbeHandle::new();
37
38 // create a new input, exchange data, and inspect its output
39 worker.dataflow::<usize, _, _>(|scope| {
40 input
41 .to_stream(scope)
42 .unary(
43 Pipeline,
44 "Split",
45 |_cap, _info| {
46 move |input, output| {
47 input.for_each_time(|time, data| {
48 let mut session = output.session(&time);
49 for data in data {
50 for wordcount in data.borrow().into_index_iter().flat_map(|wordcount| {
51 wordcount.text.split_whitespace().map(move |text| WordCountReference { text, diff: wordcount.diff })
52 }) {
53 session.give(wordcount);
54 }
55 }
56 });
57 }
58 },
59 )
60 .container::<Container>()
61 .unary_frontier(
62 ExchangeCore::<ColumnBuilder<InnerContainer>,_>::new_core(|x: &WordCountReference<&str,&i64>| x.text.len() as u64),
63 "WordCount",
64 |_capability, _info| {
65 let mut queues = HashMap::new();
66 let mut counts = HashMap::new();
67
68 move |(input, frontier), output| {
69 input.for_each_time(|time, data| {
70 queues
71 .entry(time.retain(output.output_index()))
72 .or_insert(Vec::new())
73 .extend(data.map(std::mem::take));
74
75 });
76
77 for (key, val) in queues.iter_mut() {
78 if !frontier.less_equal(key.time()) {
79 let mut session = output.session(key);
80 for batch in val.drain(..) {
81 for wordcount in batch.borrow().into_index_iter() {
82 let total =
83 if let Some(count) = counts.get_mut(wordcount.text) {
84 *count += wordcount.diff;
85 *count
86 }
87 else {
88 counts.insert(wordcount.text.to_string(), *wordcount.diff);
89 *wordcount.diff
90 };
91 session.give(WordCountReference { text: wordcount.text, diff: total });
92 }
93 }
94 }
95 }
96
97 queues.retain(|_key, val| !val.is_empty());
98 }
99 },
100 )
101 .container::<Container>()
102 .inspect_container(|x| {
103 match x {
104 Ok((time, data)) => {
105 println!("seen at: {:?}\t{:?} records", time, data.record_count());
106 for wc in data.borrow().into_index_iter() {
107 println!(" {}: {}", wc.text, wc.diff);
108 }
109 },
110 Err(frontier) => println!("frontier advanced to {:?}", frontier),
111 }
112 })
113 .probe_with(&probe);
114 });
115
116 // introduce data and watch!
117 for round in 0..10 {
118 input.send(WordCountReference { text: "flat container", diff: 1 });
119 input.advance_to(round + 1);
120 while probe.less_than(input.time()) {
121 worker.step();
122 }
123 }
124 })
125 .unwrap();
126 }