[][src]Macro tokio::select

macro_rules! select {
    (@ {
        // One `_` for each branch in the `select!` macro. Passing this to
        // `count!` converts $skip to an integer.
        ( $($count:tt)* )

        // Normalized select branches. `( $skip )` is a set of `_` characters.
        // There is one `_` for each select branch **before** this one. Given
        // that all input futures are stored in a tuple, $skip is useful for
        // generating a pattern to reference the future for the current branch.
        // $skip is also used as an argument to `count!`, returning the index of
        // the current select branch.
        $( ( $($skip:tt)* ) $bind:pat = $fut:expr, if $c:expr => $handle:expr, )+

        // Fallback expression used when all select branches have been disabled.
        ; $else:expr

    }) => { ... };
    (@ { $($t:tt)* } ) => { ... };
    (@ { $($t:tt)* } else => $else:expr $(,)?) => { ... };
    (@ { ( $($s:tt)* ) $($t:tt)* } $p:pat = $f:expr, if $c:expr => $h:block, $($r:tt)* ) => { ... };
    (@ { ( $($s:tt)* ) $($t:tt)* } $p:pat = $f:expr => $h:block, $($r:tt)* ) => { ... };
    (@ { ( $($s:tt)* ) $($t:tt)* } $p:pat = $f:expr, if $c:expr => $h:block $($r:tt)* ) => { ... };
    (@ { ( $($s:tt)* ) $($t:tt)* } $p:pat = $f:expr => $h:block $($r:tt)* ) => { ... };
    (@ { ( $($s:tt)* ) $($t:tt)* } $p:pat = $f:expr, if $c:expr => $h:expr ) => { ... };
    (@ { ( $($s:tt)* ) $($t:tt)* } $p:pat = $f:expr => $h:expr ) => { ... };
    (@ { ( $($s:tt)* ) $($t:tt)* } $p:pat = $f:expr, if $c:expr => $h:expr, $($r:tt)* ) => { ... };
    (@ { ( $($s:tt)* ) $($t:tt)* } $p:pat = $f:expr => $h:expr, $($r:tt)* ) => { ... };
    ( $p:pat = $($t:tt)* ) => { ... };
    () => { ... };
}
This is supported on feature="macros" only.

Wait on multiple concurrent branches, returning when the first branch completes, cancelling the remaining branches.

The select! macro must be used inside of async functions, closures, and blocks.

The select! macro accepts one or more branches with the following pattern:

<pattern> = <async expression> (, if <precondition>)? => <handler>,

Additionally, the select! macro may include a single, optional else branch, which evaluates if none of the other branches match their patterns:

else <expression>

The macro aggregates all <async expression> expressions and runs them concurrently on the current task. Once the first expression completes with a value that matches its <pattern>, the select! macro returns the result of evaluating the completed branch's <handler> expression.

Additionally, each branch may include an optional if precondition. This precondition is evaluated before the <async expression>. If the precondition returns false, the branch is entirely disabled. This capability is useful when using select! within a loop.

The complete lifecycle of a select! expression is as follows:

  1. Evaluate all provded <precondition> expressions. If the precondition returns false, disable the branch for the remainder of the current call to select!. Re-entering select! due to a loop clears the "disabled" state.
  2. Aggregate the <async expression>s from each branch, including the disabled ones. If the branch is disabled, <async expression> is still evaluated, but the resulting future is not polled.
  3. Concurrently await on the results for all remaining <async expression>s.
  4. Once an <async expression> returns a value, attempt to apply the value to the provided <pattern>, if the pattern matches, evaluate <handler> and return. If the pattern does not match, disable the current branch and for the remainder of the current call to select!. Continue from step 3.
  5. If all branches are disabled, evaluate the else expression. If none is provided, panic.

Notes

Runtime characteristics

By running all async expressions on the current task, the expressions are able to run concurrently but not in parallel. This means all expressions are run on the same thread and if one branch blocks the thread, all other expressions will be unable to continue. If parallelism is required, spawn each async expression using tokio::spawn and pass the join handle to select!.

Avoid racy if preconditions

Given that if preconditions are used to disable select! branches, some caution must be used to avoid missing values.

For example, here is incorrect usage of delay with if. The objective is to repeatedly run an asynchronous task for up to 50 milliseconds. However, there is a potential for the delay completion to be missed.

use tokio::time::{self, Duration};

async fn some_async_work() {
    // do work
}

#[tokio::main]
async fn main() {
    let mut delay = time::delay_for(Duration::from_millis(50));

    while !delay.is_elapsed() {
        tokio::select! {
            _ = &mut delay, if !delay.is_elapsed() => {
                println!("operation timed out");
            }
            _ = some_async_work() => {
                println!("operation completed");
            }
        }
    }
}

In the above example, delay.is_elapsed() may return true even if delay.poll() never returned Ready. This opens up a potential race condition where delay expires between the while !delay.is_elapsed() check and the call to select! resulting in the some_async_work() call to run uninterrupted despite the delay having elapsed.

One way to write the above example without the race would be:

use tokio::time::{self, Duration};

async fn some_async_work() {
    // do work
}

#[tokio::main]
async fn main() {
    let mut delay = time::delay_for(Duration::from_millis(50));

    loop {
        tokio::select! {
            _ = &mut delay => {
                println!("operation timed out");
                break;
            }
            _ = some_async_work() => {
                println!("operation completed");
            }
        }
    }
}

Fairness

select! randomly picks a branch to check first. This provides some level of fairness when calling select! in a loop with branches that are always ready.

Panics

select! panics if all branches are disabled and there is no provided else branch. A branch is disabled when the provided if precondition returns false or when the pattern does not match the result of `.

Examples

Basic select with two branches.

async fn do_stuff_async() {
    // async work
}

async fn more_async_work() {
    // more here
}

#[tokio::main]
async fn main() {
    tokio::select! {
        _ = do_stuff_async() => {
            println!("do_stuff_async() completed first")
        }
        _ = more_async_work() => {
            println!("more_async_work() completed first")
        }
    };
}

Basic stream selecting.

use tokio::stream::{self, StreamExt};

#[tokio::main]
async fn main() {
    let mut stream1 = stream::iter(vec![1, 2, 3]);
    let mut stream2 = stream::iter(vec![4, 5, 6]);

    let next = tokio::select! {
        v = stream1.next() => v.unwrap(),
        v = stream2.next() => v.unwrap(),
    };

    assert!(next == 1 || next == 4);
}

Collect the contents of two streams. In this example, we rely on pattern matching and the fact that stream::iter is "fused", i.e. once the stream is complete, all calls to next() return None.

use tokio::stream::{self, StreamExt};

#[tokio::main]
async fn main() {
    let mut stream1 = stream::iter(vec![1, 2, 3]);
    let mut stream2 = stream::iter(vec![4, 5, 6]);

    let mut values = vec![];

    loop {
        tokio::select! {
            Some(v) = stream1.next() => values.push(v),
            Some(v) = stream2.next() => values.push(v),
            else => break,
        }
    }

    values.sort();
    assert_eq!(&[1, 2, 3, 4, 5, 6], &values[..]);
}

Using the same future in multiple select! expressions can be done by passing a reference to the future. Doing so requires the future to be Unpin. A future can be made Unpin by either using Box::pin or stack pinning.

Here, a stream is consumed for at most 1 second.

use tokio::stream::{self, StreamExt};
use tokio::time::{self, Duration};

#[tokio::main]
async fn main() {
    let mut stream = stream::iter(vec![1, 2, 3]);
    let mut delay = time::delay_for(Duration::from_secs(1));

    loop {
        tokio::select! {
            maybe_v = stream.next() => {
                if let Some(v) = maybe_v {
                    println!("got = {}", v);
                } else {
                    break;
                }
            }
            _ = &mut delay => {
                println!("timeout");
                break;
            }
        }
    }
}

Joining two values using select!.

use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx1, mut rx1) = oneshot::channel();
    let (tx2, mut rx2) = oneshot::channel();

    tokio::spawn(async move {
        tx1.send("first").unwrap();
    });

    tokio::spawn(async move {
        tx2.send("second").unwrap();
    });

    let mut a = None;
    let mut b = None;

    while a.is_none() || b.is_none() {
        tokio::select! {
            v1 = (&mut rx1), if a.is_none() => a = Some(v1.unwrap()),
            v2 = (&mut rx2), if b.is_none() => b = Some(v2.unwrap()),
        }
    }

    let res = (a.unwrap(), b.unwrap());

    assert_eq!(res.0, "first");
    assert_eq!(res.1, "second");
}