Function initialize

Source
pub fn initialize<T: Send + 'static, F: Fn(Generic) -> T + Send + Sync + 'static>(
    config: Config,
    func: F,
) -> Result<WorkerGuards<T>, String>
Expand description

Initializes communication and executes a distributed computation.

This method allocates an allocator::Generic for each thread, spawns local worker threads, and invokes the supplied function with the allocator. The method returns a WorkerGuards<T> which can be joined to retrieve the return values (or errors) of the workers.

§Examples

use timely_communication::Allocate;

// configure for two threads, just one process.
let config = timely_communication::Config::Process(2);

// initializes communication, spawns workers
let guards = timely_communication::initialize(config, |mut allocator| {
    println!("worker {} started", allocator.index());

    // allocates a list of senders and one receiver.
    let (mut senders, mut receiver) = allocator.allocate(0);

    // send typed data along each channel
    use timely_communication::Message;
    senders[0].send(Message::from_typed(format!("hello, {}", 0)));
    senders[1].send(Message::from_typed(format!("hello, {}", 1)));

    // no support for termination notification,
    // we have to count down ourselves.
    let mut expecting = 2;
    while expecting > 0 {
        allocator.receive();
        if let Some(message) = receiver.recv() {
            use std::ops::Deref;
            println!("worker {}: received: <{}>", allocator.index(), message.deref());
            expecting -= 1;
        }
        allocator.release();
    }

    // optionally, return something
    allocator.index()
});

// computation runs until guards are joined or dropped.
if let Ok(guards) = guards {
    for guard in guards.join() {
        println!("result: {:?}", guard);
    }
}
else { println!("error in computation"); }

This should produce output like:

worker 0 started
worker 1 started
worker 0: received: <hello, 0>
worker 1: received: <hello, 1>
worker 0: received: <hello, 0>
worker 1: received: <hello, 1>
result: Ok(0)
result: Ok(1)
Examples found in repository?
examples/comm_hello.rs (lines 10-39)
6fn main() {
7
8    // extract the configuration from user-supplied arguments, initialize the computation.
9    let config = timely_communication::Config::from_args(std::env::args()).unwrap();
10    let guards = timely_communication::initialize(config, |mut allocator| {
11
12        println!("worker {} of {} started", allocator.index(), allocator.peers());
13
14        // allocates a pair of senders list and one receiver.
15        let (mut senders, mut receiver) = allocator.allocate(0);
16
17        // send typed data along each channel
18        for i in 0 .. allocator.peers() {
19            senders[i].send(Message::from_typed(format!("hello, {}", i)));
20            senders[i].done();
21        }
22
23        // no support for termination notification,
24        // we have to count down ourselves.
25        let mut received = 0;
26        while received < allocator.peers() {
27
28            allocator.receive();
29
30            if let Some(message) = receiver.recv() {
31                println!("worker {}: received: <{}>", allocator.index(), message.deref());
32                received += 1;
33            }
34
35            allocator.release();
36        }
37
38        allocator.index()
39    });
40
41    // computation runs until guards are joined or dropped.
42    if let Ok(guards) = guards {
43        for guard in guards.join() {
44            println!("result: {:?}", guard);
45        }
46    }
47    else { println!("error in computation"); }
48}