pub fn execute_from_args<I, T, F>(
iter: I,
func: F,
) -> Result<WorkerGuards<T>, String>
Expand description
Executes a timely dataflow from supplied arguments and per-communicator logic.
The execute method takes arguments (typically std::env::args()) and spins up some number of
worker threads, each of which executes the supplied closure to construct and run a timely
dataflow computation.
The closure may return a T: Send+'static. The execute_from_args method
returns immediately after initializing the timely computation with a result
containing a WorkerGuards<T> (or error information), which can be joined
to recover the result T values from the local workers.
Note: if the caller drops the result of execute_from_args, the drop code
will block awaiting the completion of the timely computation.
The arguments execute_from_args currently understands are:
-w, --threads: number of per-process worker threads.
-n, --processes: number of processes involved in the computation.
-p, --process: identity of this process; from 0 to n-1.
-h, --hostfile: a text file whose lines are “hostname:port” in order of process identity.
If not specified, localhost will be used, with port numbers increasing from 2101 (chosen
arbitrarily).
This method is only available if the getopts feature is enabled, which
it is by default.
§Examples
use timely::dataflow::operators::{ToStream, Inspect};
// execute a timely dataflow using command line parameters
timely::execute_from_args(std::env::args(), |worker| {
worker.dataflow::<(),_,_>(|scope| {
(0..10).to_stream(scope)
.container::<Vec<_>>()
.inspect(|x| println!("seen: {:?}", x));
})
}).unwrap();
host0% cargo run -- -w 2 -n 4 -h hosts.txt -p 0
host1% cargo run -- -w 2 -n 4 -h hosts.txt -p 1
host2% cargo run -- -w 2 -n 4 -h hosts.txt -p 2
host3% cargo run -- -w 2 -n 4 -h hosts.txt -p 3
% cat hosts.txt
host0:port
host1:port
host2:port
host3:port
Examples found in repository?
5fn main() {
6 timely::execute_from_args(std::env::args(), |worker| {
7
8 let addr = format!("127.0.0.1:{}", 8000 + worker.index());
9 let send = TcpStream::connect(addr).unwrap();
10
11 worker.dataflow::<u64,_,_>(|scope|
12 (0..10u64)
13 .to_stream(scope)
14 .container::<Vec<_>>()
15 .capture_into(EventWriter::new(send))
16 );
17 }).unwrap();
18}
More examples
3fn main() {
4
5 let iterations = std::env::args().nth(1).unwrap().parse::<u64>().unwrap();
6 let elements = std::env::args().nth(2).unwrap().parse::<u64>().unwrap();
7
8 // initializes and runs a timely dataflow
9 timely::execute_from_args(std::env::args().skip(3), move |worker| {
10 let index = worker.index();
11 let peers = worker.peers();
12 worker.dataflow::<u64,_,_>(move |scope| {
13 let (helper, cycle) = scope.feedback(1);
14 (0 .. elements)
15 .filter(move |&x| (x as usize) % peers == index)
16 .to_stream(scope)
17 .concat(cycle)
18 .exchange(|&x| x)
19 .map_in_place(|x| *x += 1)
20 .branch_when(move |t| t < &iterations).1
21 .connect_loop(helper);
22 });
23 }).unwrap();
24}
5fn main() {
6 timely::execute_from_args(std::env::args(), |worker| {
7
8 let source_peers = std::env::args().nth(1).unwrap().parse::<usize>().unwrap();
9
10 // create replayers from disjoint partition of source worker identifiers.
11 let replayers =
12 (0 .. source_peers)
13 .filter(|i| i % worker.peers() == worker.index())
14 .map(|i| TcpListener::bind(format!("127.0.0.1:{}", 8000 + i)).unwrap())
15 .collect::<Vec<_>>()
16 .into_iter()
17 .map(|l| l.incoming().next().unwrap().unwrap())
18 .map(EventReader::<_,Vec<u64>,_>::new)
19 .collect::<Vec<_>>();
20
21 worker.dataflow::<u64,_,_>(|scope| {
22 replayers
23 .replay_into(scope)
24 .inspect(|x| println!("replayed: {:?}", x));
25 })
26 }).unwrap(); // asserts error-free execution
27}
10fn main() {
11 // initializes and runs a timely dataflow.
12 timely::execute_from_args(std::env::args(), |worker| {
13 // create a new input, exchange data, and inspect its output
14 let index = worker.index();
15 let mut input = InputHandle::new();
16 let probe = ProbeHandle::new();
17 worker.dataflow(|scope| {
18 scope.input_from(&mut input)
19 .container::<Vec<_>>()
20 //.exchange(|x| *x) // <-- cannot exchange this; Rc is not Send.
21 .inspect(move |x| println!("worker {}:\thello {:?}", index, x))
22 .probe_with(&probe);
23 });
24
25 // introduce data and watch!
26 for round in 0..10 {
27 input.send(Test { _field: Rc::new(round) } );
28 input.advance_to(round + 1);
29 worker.step_while(|| probe.less_than(input.time()));
30 }
31 }).unwrap();
32}
4fn main() {
5 // initializes and runs a timely dataflow.
6 timely::execute_from_args(std::env::args(), |worker| {
7
8 let index = worker.index();
9 let mut input = InputHandle::new();
10 let probe = ProbeHandle::new();
11
12 // create a new input, exchange data, and inspect its output
13 worker.dataflow(|scope| {
14 scope.input_from(&mut input)
15 .container::<Vec<_>>()
16 .exchange(|x| *x)
17 .inspect(move |x| println!("worker {}:\thello {}", index, x))
18 .probe_with(&probe);
19 });
20
21 // introduce data and watch!
22 for round in 0..10 {
23 if index == 0 {
24 input.send(round);
25 }
26 input.advance_to(round + 1);
27 while probe.less_than(input.time()) {
28 worker.step();
29 }
30 }
31 }).unwrap();
32}
8fn main() {
9 timely::execute_from_args(std::env::args(), |worker| {
10
11 let source_peers = std::env::args().nth(1).unwrap().parse::<usize>().unwrap();
12
13 // create replayers from disjoint partition of source worker identifiers.
14 let replayers =
15 (0 .. source_peers)
16 .filter(|i| i % worker.peers() == worker.index())
17 .map(|i| TcpListener::bind(format!("127.0.0.1:{}", 8000 + i)).unwrap())
18 .collect::<Vec<_>>()
19 .into_iter()
20 .map(|l| l.incoming().next().unwrap().unwrap())
21 .map(EventReader::<Duration,Vec<(Duration,TimelySetup,TimelyEvent)>,_>::new)
22 .collect::<Vec<_>>();
23
24 worker.dataflow(|scope| {
25 replayers
26 .replay_into(scope)
27 .inspect(|x| println!("replayed: {:?}", x));
28 })
29 }).unwrap(); // asserts error-free execution
30}