pub fn initialize<T: Send + 'static, F: Fn(Generic) -> T + Send + Sync + 'static>(
config: Config,
func: F,
) -> Result<WorkerGuards<T>, String>
Expand description
Initializes communication and executes a distributed computation.
This method allocates an allocator::Generic
for each thread, spawns local worker threads,
and invokes the supplied function with the allocator.
The method returns a WorkerGuards<T>
which can be join
ed to retrieve the return values
(or errors) of the workers.
§Examples
use timely_communication::Allocate;
// configure for two threads, just one process.
let config = timely_communication::Config::Process(2);
// initializes communication, spawns workers
let guards = timely_communication::initialize(config, |mut allocator| {
println!("worker {} started", allocator.index());
// allocates a pair of senders list and one receiver.
let (mut senders, mut receiver) = allocator.allocate(0);
// send typed data along each channel
use timely_communication::Message;
senders[0].send(Message::from_typed(format!("hello, {}", 0)));
senders[1].send(Message::from_typed(format!("hello, {}", 1)));
// no support for termination notification,
// we have to count down ourselves.
let mut expecting = 2;
while expecting > 0 {
allocator.receive();
if let Some(message) = receiver.recv() {
use std::ops::Deref;
println!("worker {}: received: <{}>", allocator.index(), message.deref());
expecting -= 1;
}
allocator.release();
}
// optionally, return something
allocator.index()
});
// computation runs until guards are joined or dropped.
if let Ok(guards) = guards {
for guard in guards.join() {
println!("result: {:?}", guard);
}
}
else { println!("error in computation"); }
The should produce output like:
ⓘ
worker 0 started
worker 1 started
worker 0: received: <hello, 0>
worker 1: received: <hello, 1>
worker 0: received: <hello, 0>
worker 1: received: <hello, 1>
result: Ok(0)
result: Ok(1)
Examples found in repository?
examples/comm_hello.rs (lines 10-39)
6fn main() {
7
8 // extract the configuration from user-supplied arguments, initialize the computation.
9 let config = timely_communication::Config::from_args(std::env::args()).unwrap();
10 let guards = timely_communication::initialize(config, |mut allocator| {
11
12 println!("worker {} of {} started", allocator.index(), allocator.peers());
13
14 // allocates a pair of senders list and one receiver.
15 let (mut senders, mut receiver) = allocator.allocate(0);
16
17 // send typed data along each channel
18 for i in 0 .. allocator.peers() {
19 senders[i].send(Message::from_typed(format!("hello, {}", i)));
20 senders[i].done();
21 }
22
23 // no support for termination notification,
24 // we have to count down ourselves.
25 let mut received = 0;
26 while received < allocator.peers() {
27
28 allocator.receive();
29
30 if let Some(message) = receiver.recv() {
31 println!("worker {}: received: <{}>", allocator.index(), message.deref());
32 received += 1;
33 }
34
35 allocator.release();
36 }
37
38 allocator.index()
39 });
40
41 // computation runs until guards are joined or dropped.
42 if let Ok(guards) = guards {
43 for guard in guards.join() {
44 println!("result: {:?}", guard);
45 }
46 }
47 else { println!("error in computation"); }
48}