Macro crossbeam_channel::select_loop

macro_rules! select_loop {
    {$($method:ident($($args:tt)*) $(if $guard:expr)* => $body:expr$(,)*)*} => { ... };
    {@impl($state:ident) send($tx:expr, $val:ident) => $body:expr} => { ... };
    {@impl($state:ident) send($tx:expr, $val:ident) => $body:expr} => { ... };
    {@impl($state:ident) send($tx:expr, mut $val:expr) => $body:expr} => { ... };
    {@impl($state:ident) send($tx:expr, $val:expr) => $body:expr} => { ... };
    {@impl($state:ident) recv($rx:expr, _) => $body:expr} => { ... };
    {@impl($state:ident) recv($rx:expr, $val:ident) => $body:expr} => { ... };
    {@impl($state:ident) recv($rx:expr, mut $val:ident) => $body:expr} => { ... };
    {@impl($state:ident) disconnected() => $body:expr} => { ... };
    {@impl($state:ident) would_block() => $body:expr} => { ... };
    {@impl($state:ident) timed_out($_timeout:expr) => $body:expr} => { ... };
    {@prelude($state:ident) send($tx:expr, $val:ident) $(if $_guard:expr)*} => { ... };
    {@prelude($state:ident) timed_out($timeout:expr) $(if $guard:expr)*} => { ... };
    {@prelude($state:ident) $($tail:tt)*} => { ... };
    {@check_guard() [$($_ctx:tt)*]} => { ... };
    {@check_guard(if $_guard:expr) [$($_ctx:tt)*]} => { ... };
    {@check_guard($($_tt:tt)*) [$($ctx:tt)*]} => { ... };
}

The static selection macro.

It allows declaring an arbitrary static list of operations on channels, and waiting until exactly one of them fires. If you need to select over a dynamic list of operations, use Select instead. This macro is just a restricted and more user-friendly wrapper around it.

What is selection?

It is possible to declare a set of possible send and/or receive operations on channels, and then wait until exactly one of them fires (in other words, one of them is selected).

For example, we might want to receive a message from a set of two channels and block until a message is received from any of them. To do that, we would write:


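// Assumes the crate is imported under the name `channel`, e.g.
// `#[macro_use] extern crate crossbeam_channel as channel;`
use std::thread;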

let (tx1, rx1) = channel::unbounded();
let (tx2, rx2) = channel::unbounded();

thread::spawn(move || tx1.send("foo").unwrap());
thread::spawn(move || tx2.send("bar").unwrap());

select_loop! {
    recv(rx1, msg) => println!("A message was received from rx1: {:?}", msg),
    recv(rx2, msg) => println!("A message was received from rx2: {:?}", msg),
}

There are two selection cases: a receive on rx1 and a receive on rx2. The macro is in fact a loop that keeps probing both channels until one of the cases successfully receives a message. Then we print the message and the loop is broken.
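As a rough illustration of such a probing loop, here is a minimal sketch built on the standard library's std::sync::mpsc channels rather than on this crate. The names select_two and Fired are hypothetical and exist only for this example. The sketch merely yields between probes, so it should not be read as the way select_loop! is implemented.

```rust
use std::sync::mpsc::{Receiver, TryRecvError};
use std::thread;

/// Which of the two cases fired (hypothetical type for this sketch).
#[derive(Debug, PartialEq)]
enum Fired<T> {
    First(T),
    Second(T),
    Disconnected,
}

/// Hypothetical helper: probe both receivers until one yields a message,
/// or until both channels are disconnected.
fn select_two<T>(rx1: &Receiver<T>, rx2: &Receiver<T>) -> Fired<T> {
    loop {
        // Number of channels that are still connected in this iteration.
        let mut open = 2;

        match rx1.try_recv() {
            Ok(msg) => return Fired::First(msg),
            Err(TryRecvError::Disconnected) => open -= 1,
            Err(TryRecvError::Empty) => {}
        }
        match rx2.try_recv() {
            Ok(msg) => return Fired::Second(msg),
            Err(TryRecvError::Disconnected) => open -= 1,
            Err(TryRecvError::Empty) => {}
        }

        if open == 0 {
            return Fired::Disconnected;
        }

        // Assumption of this sketch: give other threads a chance to run
        // instead of blocking until a channel becomes ready.
        thread::yield_now();
    }
}
```

Calling select_two(&rx1, &rx2) returns as soon as either receiver has a message, which mirrors the two recv cases in the example above.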

The loop will automatically block the current thread if none of the operations can proceed and wake up as soon as any one of them becomes ready. However, there are a few rules that must be followed when declaring a set of cases in a loop.

Selection cases

There are five kinds of selection cases:

  1. A receive case, which fires when a message can be received from the channel.
  2. A send case, which fires when the message can be sent into the channel.
  3. A would block case, which fires when all receive and send operations in the loop would block.
  4. A disconnected case, which fires when all operations in the loop are working with disconnected channels.
  5. A timed out case, which fires when selection is blocked for longer than the specified timeout.

Additionally, every case may optionally be guarded by a condition, so that the case is enabled only if the condition is true. Note that such conditions must not change within the select_loop!.

Selection rules

Rules which must be respected in order for selection to work properly:

  1. No selection case may be repeated.
  2. No two cases may operate on the same end (receiving or sending) of the same channel.
  3. There must be at least one send or at least one recv case.
  4. If case conditions are used, they must not change within the select_loop!.

Violating any of these rules will result in a panic, deadlock, or livelock, possibly even in seemingly unrelated send or receive operations outside this particular selection loop.

Guarantees

  1. Exactly one case fires.
  2. If none of the cases can fire at the time, the current thread will be blocked.
  3. If blocked, the current thread will be woken up as soon as a message is pushed into a channel waited on by a receive case or popped from a channel waited on by a send case, or if all channels become disconnected.

Finally, if more than one send or receive case can fire at the same time, a pseudorandom case will be selected, but on a best-effort basis only. The mechanism makes no strict guarantees about fairness.
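One simple way to obtain this kind of best-effort behaviour is to start scanning the cases at a pseudorandom position. The sketch below is an assumption made for illustration and does not describe this crate's internals. XorShift and pick_ready are hypothetical names, and the resulting choice is not uniform.

```rust
/// Tiny xorshift generator, used so the sketch needs no external crate.
/// The seed must be non-zero.
struct XorShift(u32);

impl XorShift {
    fn next(&mut self) -> u32 {
        let mut x = self.0;
        x ^= x << 13;
        x ^= x >> 17;
        x ^= x << 5;
        self.0 = x;
        x
    }
}

/// Hypothetical helper: return the index of one ready case, scanning from a
/// pseudorandom starting position so that no case is always preferred.
fn pick_ready(ready: &[bool], rng: &mut XorShift) -> Option<usize> {
    if ready.is_empty() {
        return None;
    }
    let start = rng.next() as usize % ready.len();
    (0..ready.len())
        .map(|offset| (start + offset) % ready.len())
        .find(|&i| ready[i])
}
```

A case that directly follows a run of non-ready cases is picked more often than the others, which is one reason a scheme like this can only promise best-effort fairness.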

Syntax

The macro's syntax is similar to that of a match expression. It takes a list of cases, where each case is of the form operation(arguments) => expression. The individual cases are optionally separated by commas. Just like match, the whole macro invocation is an expression, which in the end evaluates to a single value.

Every case may also optionally have a guard expression at the end, in the form of operation(arguments) if guard_expression => expression. The guarded case will participate in selection only if guard_expression evaluates to true.
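The effect of a guard can be pictured with a small standard-library sketch: a disabled case is simply never probed, so it cannot consume a message. This uses std::sync::mpsc rather than this crate, and probe_if is a hypothetical helper introduced only for this example.

```rust
use std::sync::mpsc::Receiver;

/// Hypothetical helper: probe `rx` only when `enabled` is true, in the same
/// spirit as `recv(rx, msg) if enabled => ...`.
fn probe_if<T>(enabled: bool, rx: &Receiver<T>) -> Option<T> {
    if enabled {
        // The case participates: take a message if one is available.
        rx.try_recv().ok()
    } else {
        // The case does not participate: the channel is left untouched.
        None
    }
}
```

With the guard false the pending message stays in the channel, and a later call with the guard true receives it.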

The following code illustrates the various ways in which cases can be declared:

select_loop! {
    // Send `msg` into `tx1`.
    //
    // Behind the scenes, this form will actually rebind the variable in mutable form by
    // inserting the following line before the loop: `let mut msg = msg;`
    send(tx1, msg) => { ... }

    // Send the result of an expression as a message into `tx2`.
    //
    // Note that this form will evaluate the expression in each iteration of the loop. If the
    // evaluation is expensive, you should probably do it once before the loop and bind to a
    // variable, then send that variable as the message.
    send(tx2, x * 10 - y) => { ... }

    // Send `msg` into `tx3`, but regain ownership on each failure.
    //
    // If sending the message fails in an iteration of the loop, then ownership of the message
    // will be automatically regained from the error and bound back to the original variable.
    //
    // You should use this form if `msg` is not `Copy`.
    send(tx3, mut msg) => { ... }

    // Send `msg` into `tx4` but only if `enabled_tx4` is `true`.
    send(tx4, msg) if enabled_tx4 => { ... }

    // Receive `msg` from `rx1`.
    recv(rx1, msg) => { ... }

    // Receive `msg` from `rx2`, and make the variable mutable.
    recv(rx2, mut msg) => { ... }

    // Receive a message from `rx3`, but don't bind it to a variable.
    recv(rx3, _) => { ... }

    // Receive `msg` from the optional receiver, if it exists (`Option<Receiver<_>>`).
    recv(opt_rx4.as_ref().unwrap(), msg) if opt_rx4.is_some() => { ... }

    // This case fires if all declared send/receive operations are on disconnected channels.
    disconnected() => { ... }

    // This case fires if all declared send/receive operations would block.
    would_block() => { ... }

    // This case fires if selection waits for longer than `timeout`.
    //
    // The specified `timeout` must be of type `std::time::Duration`.
    timed_out(timeout) => { ... }
}

Examples

Receive a message of the same type from two channels


use std::thread;

let (tx1, rx1) = channel::unbounded();
let (tx2, rx2) = channel::unbounded();

thread::spawn(move || tx1.send("foo").unwrap());
thread::spawn(move || tx2.send("bar").unwrap());

let msg = select_loop! {
    recv(rx1, msg) => {
        println!("Received from rx1.");
        msg
    }
    recv(rx2, msg) => {
        println!("Received from rx2.");
        msg
    }
};

println!("Message: {:?}", msg);

Send a non-Copy message, regaining ownership on each failure


let (tx, rx) = channel::unbounded();

// The message we're going to send.
let mut msg = "Hello!".to_string();

select_loop! {
    // The variable is marked with `mut`, which indicates that ownership must be reacquired if
    // sending the variable fails.
    send(tx, mut msg) => println!("The message was sent!"),

    recv(rx, msg) => println!("A message was received: {}", msg),
}
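The idea of taking a message back out of a failed send can also be seen in the standard library, where try_send on a bounded channel returns the unsent value inside the error. The following is a sketch using std::sync::mpsc, not this crate, and send_with_retry is a hypothetical helper.

```rust
use std::sync::mpsc::{SyncSender, TrySendError};

/// Hypothetical helper: try to send `msg` up to `attempts` times, moving the
/// message back out of the error after every failed attempt.
/// Returns `Ok(())` on success, or gives the message back if it was not sent.
fn send_with_retry(
    tx: &SyncSender<String>,
    mut msg: String,
    attempts: usize,
) -> Result<(), String> {
    for _ in 0..attempts {
        match tx.try_send(msg) {
            Ok(()) => return Ok(()),
            // The channel is full: the error owns the message, so rebind it
            // to `msg` and try again in the next iteration.
            Err(TrySendError::Full(unsent)) => msg = unsent,
            // Nobody can receive any more: hand the message back.
            Err(TrySendError::Disconnected(unsent)) => return Err(unsent),
        }
    }
    Err(msg)
}
```

Because String is not Copy, the loop would not compile without moving the value back into msg after each failed attempt, which is the situation the mut form of the send case addresses.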

Stop if all channels are disconnected


let (tx, rx) = channel::unbounded();

// Disconnect the channel.
drop(rx);

select_loop! {
    // Won't happen. The channel is disconnected.
    send(tx, "message") => {
        println!("Sent the message.");
        panic!();
    }

    disconnected() => println!("All channels are disconnected! Stopping selection."),
}

Stop if all operations would block


let (tx, rx) = channel::unbounded::<i32>();

select_loop! {
    // Won't happen. The channel is empty.
    recv(rx, msg) => {
        println!("Received message: {:?}", msg);
        panic!();
    }

    would_block() => println!("All operations would block. Stopping selection."),
}

Selection with a timeout


use std::time::Duration;

let (tx, rx) = channel::unbounded::<i32>();

select_loop! {
    // Won't happen. The channel is empty.
    recv(rx, msg) => {
        println!("Received message: {:?}", msg);
        panic!();
    }

    timed_out(Duration::from_secs(1)) => println!("Timed out after 1 second."),
}
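For comparison, a single receive with a timeout can be written with the standard library's recv_timeout. This is only an analogue using std::sync::mpsc, not this crate. Outcome and recv_with_timeout are hypothetical names, and unlike select_loop! the sketch waits on one channel only.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Result of one receive attempt that is allowed to give up.
#[derive(Debug, PartialEq)]
enum Outcome {
    Received(i32),
    TimedOut,
    Disconnected,
}

/// Hypothetical helper: wait for a message for at most `timeout`.
fn recv_with_timeout(rx: &Receiver<i32>, timeout: Duration) -> Outcome {
    match rx.recv_timeout(timeout) {
        Ok(msg) => Outcome::Received(msg),
        // No message arrived in time, comparable to the timed_out case.
        Err(RecvTimeoutError::Timeout) => Outcome::TimedOut,
        // All senders are gone, comparable to the disconnected case.
        Err(RecvTimeoutError::Disconnected) => Outcome::Disconnected,
    }
}
```

The three outcomes correspond loosely to the recv, timed_out, and disconnected cases of the macro.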

One send and one receive operation on the same channel


use channel::{Sender, Receiver};
use std::thread;

// Either send my name into the channel or receive someone else's, whatever happens first.
fn seek<'a>(name: &'a str, tx: Sender<&'a str>, rx: Receiver<&'a str>) {
    select_loop! {
        recv(rx, peer) => println!("{} received a message from {}.", name, peer),
        send(tx, name) => {}
    }
}

let (tx, rx) = channel::bounded(1); // Make room for one unmatched send.

// Pair up five people by exchanging messages over the channel.
// Since there is an odd number of them, one person won't have a match.
["Anna", "Bob", "Cody", "Dave", "Eva"].iter()
    .map(|name| {
        let tx = tx.clone();
        let rx = rx.clone();
        thread::spawn(move || seek(name, tx, rx))
    })
    .collect::<Vec<_>>()
    .into_iter()
    .for_each(|t| t.join().unwrap());

// Receive the message left in the channel by the remaining person who doesn't have a match.
if let Ok(name) = rx.try_recv() {
    println!("No one received {}’s message.", name);
}