# `join_me_maybe!` [crates.io](https://crates.io/crates/join_me_maybe) [docs.rs](https://docs.rs/join_me_maybe)
`join_me_maybe` provides an expanded version of the [`futures::join!`]/[`tokio::join!`] macro,
with added features for cancellation and working with streams. Programs that need this sort of
control flow often resort to "[`select!`] in a loop" and/or "`select!` by reference", but those
come with a notoriously long list of footguns.[\[1\]][cancelling_async][\[2\]][rfd400][\[3\]][rfd609]
The goal of `join_me_maybe` is to be more convenient and less error-prone than `select!` in its
most common applications. The stretch goal is to make the case that `select!`-by-reference in
particular isn't usually necessary and should be _considered harmful_.
## Features and examples
The basic use case works like other `join!` macros, polling each of its arguments to completion
and returning their outputs in a tuple.
```rust
use join_me_maybe::join;
use tokio::time::{sleep, Duration};
// Create a couple futures, one that's ready immediately, and another that takes some time.
let future1 = std::future::ready(1);
let future2 = async { sleep(Duration::from_millis(100)).await; 2 };
// Run them concurrently and wait for both of them to finish.
let (a, b) = join!(future1, future2);
assert_eq!((a, b), (1, 2));
```
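For intuition, "polling each of its arguments to completion" can be sketched with only the standard library. This is a minimal illustration, not the crate's actual implementation: `join2`, `block_on`, `YieldOnce`, and `join_demo` are hypothetical names, and the toy executor simply busy-polls instead of sleeping until woken.

```rust
use std::future::{poll_fn, ready, Future};
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// A waker that does nothing. That's enough for a busy-polling toy executor.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical toy executor: poll in a loop until the future is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// A future that returns `Pending` once before completing (a stand-in for `sleep`).
struct YieldOnce(bool);
impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

// Hypothetical two-way join: keep polling whichever futures are unfinished, and
// only return once both outputs are available.
async fn join2<A: Future, B: Future>(a: A, b: B) -> (A::Output, B::Output) {
    let mut a = pin!(a);
    let mut b = pin!(b);
    let (mut out_a, mut out_b) = (None, None);
    poll_fn(|cx| {
        if out_a.is_none() {
            if let Poll::Ready(v) = a.as_mut().poll(cx) {
                out_a = Some(v);
            }
        }
        if out_b.is_none() {
            if let Poll::Ready(v) = b.as_mut().poll(cx) {
                out_b = Some(v);
            }
        }
        if out_a.is_some() && out_b.is_some() {
            Poll::Ready((out_a.take().unwrap(), out_b.take().unwrap()))
        } else {
            Poll::Pending
        }
    })
    .await
}

pub fn join_demo() -> (i32, i32) {
    // One future is ready immediately, the other needs a second poll.
    block_on(join2(ready(1), async {
        YieldOnce(false).await;
        2
    }))
}
```

Calling `join_demo()` yields `(1, 2)`, mirroring the `join!` example above. Note that a finished future is never polled again, which is why the outputs are stashed in `Option`s.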
### `maybe` cancellation
If you don't want to wait for all of your futures to finish, you can use the `maybe` keyword. This
can be useful with infinite loops of background work that never actually exit. The outputs of
`maybe` futures are wrapped in `Option`.
```rust
let outputs = join!(
    // This future isn't `maybe`, so we'll definitely wait for it to finish.
    async { sleep(Duration::from_millis(100)).await; 1 },
    // Same here.
    async { sleep(Duration::from_millis(200)).await; 2 },
    // We won't necessarily wait for this `maybe` future, but in practice it'll finish before
    // the "definitely" futures above, and we'll get its output wrapped in `Some()`.
    maybe async { sleep(Duration::from_millis(10)).await; 3 },
    // This `maybe` future never finishes. We'll cancel it when the "definitely" work is done.
    maybe async {
        loop {
            // Some periodic work...
            sleep(Duration::from_millis(10)).await;
        }
    },
);
assert_eq!(outputs, (1, 2, Some(3), None));
```
### `label:` and `.cancel()`
You can also cancel futures by name if you `label:` them. The outputs of labeled futures are
wrapped in `Option` too.
```rust
let mutex = tokio::sync::Mutex::new(42);
let outputs = join!(
    // The `foo:` label here means that all future expressions (including this one) have a
    // `foo` object in scope, which provides a `.cancel()` method.
    foo: async {
        let mut guard = mutex.lock().await;
        *guard += 1;
        // Selfishly hold the lock for a long time.
        sleep(Duration::from_secs(1_000_000)).await;
    },
    async {
        // Give `foo` a little bit of time...
        sleep(Duration::from_millis(100)).await;
        if mutex.try_lock().is_err() {
            // Hmm, `foo` is taking way too long. Cancel it!
            foo.cancel();
        }
        // Cancelling `foo` drops it promptly, which releases the lock. Note that if it only
        // stopped polling `foo`, but didn't drop it, this would be a deadlock. This is a
        // common footgun with `select!`-in-a-loop.
        *mutex.lock().await
    },
);
assert_eq!(outputs, (None, 43));
```
A `.cancel()`ed future won't be polled again, and it'll be dropped promptly, freeing any locks
or other resources that it might be holding. Note that if a future cancels _itself_, its
execution still continues as normal after `.cancel()` returns, up until the next `.await`
point. This can be useful in closure bodies or nested `async` blocks, where `return` or `break`
doesn't work.
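
The difference between "no longer polled" and "dropped" can be seen without any async runtime. The following std-only sketch is an illustration under simplifying assumptions, not how `join!` is implemented: it uses `std::sync::Mutex` instead of the Tokio mutex from the example above, and `NoopWake` and `drop_releases_lock_demo` are hypothetical names. A future takes a lock, suspends at an `.await`, and the lock stays held until the future itself is dropped.

```rust
use std::future::{pending, Future};
use std::sync::{Arc, Mutex};
use std::task::{Context, Wake, Waker};

// A waker that does nothing. We poll by hand, so nobody needs to be woken.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

pub fn drop_releases_lock_demo() -> (bool, i32) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mutex = Mutex::new(42);

    // This future grabs the lock and then suspends forever while holding it.
    let mut fut = Box::pin(async {
        let mut guard = mutex.lock().unwrap();
        *guard += 1;
        pending::<()>().await;
    });

    // Poll it once so that it runs up to the `.await` and takes the lock.
    assert!(fut.as_mut().poll(&mut cx).is_pending());

    // The future is merely "snoozed" now (alive but not being polled), and the
    // guard inside it still holds the lock.
    let locked_while_snoozed = mutex.try_lock().is_err();

    // Dropping the future drops the guard it owns, which releases the lock.
    drop(fut);
    let value_after_drop = *mutex.lock().unwrap();

    (locked_while_snoozed, value_after_drop)
}
```

Here `drop_releases_lock_demo()` returns `(true, 43)`: the lock is unavailable while the future is snoozed, and available again (with the incremented value) once the future has been dropped.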
### arm bodies with `=>`
One of the powerful features of `select!` is that its arm bodies get exclusive mutable access
to the enclosing scope. `join_me_maybe!` supports an expanded `=>` syntax that works similarly:
```rust
let mut counter = 0;
let output = join!(
    n = std::future::ready(1) => {
        counter += n;
        42
    },
    m = async {
        sleep(Duration::from_millis(1)).await;
        1
    } => counter += m, // Mutate the same `counter` in both arms.
);
assert_eq!(counter, 2);
// When a `=>` body is present, the output is the value of the body.
assert_eq!(output, (42, ()));
```
Also like `select!`, it's possible to `.await` or `return` in an arm body. (However, `break` or
`continue` in a containing loop are not supported.) Note that a `return` short-circuits the
whole containing function, not just the `join!`. This is useful for error handling:
```rust
async fn foo() -> std::io::Result<()> {
    let _: ((), bool) = join!(
        _ = std::future::ready(1) => {
            sleep(Duration::from_millis(1)).await;
            return Ok(()); // Return from `foo` (the whole function, not just the `join!`).
        },
        _ = std::future::ready(2) => {
            std::fs::exists("fallible.txt")? // Error handling with `?` also works.
        },
    );
    unreachable!("`return` short-circuits above.");
}
```
Shared mutation from different arm bodies wouldn't be possible if they ran concurrently.
Instead, `join!` only runs one arm body at a time. This is a potential source of surprising
timing bugs, and it's best to avoid `.await`ing in arm bodies if you have a choice. However,
arm bodies and "scrutinees" (the futures to the left of the `=>`) run concurrently.
### streams
Similar to the `=>` syntax for futures above, you can also drive a stream, using `<pattern> in
<stream>` instead of `<pattern> = <future>`. In this case the following expression executes for
each item in the stream. As above, bodies get mutable access to the environment and can
`.await` or `return`:
```rust
use futures::stream;
let mut total = 0;
join!(
    n in stream::iter([1, 2, 3]) => total += n,
    m in stream::iter([4, 5, 6]) => total += m,
);
assert_eq!(total, 21);
```
You can optionally follow this syntax with the `finally` keyword and another expression that
executes after the stream is finished (if it's not cancelled). Streams have no return value by
default, but streams with a `finally` expression take the value of that expression:
```rust
let ret = join!(
    // This stream has no `finally` expression, so it returns `()`.
    _ in stream::iter([42]) => {},
    // This arm's `finally` expression is `1`, and the `maybe` means we get `Some(1)`.
    maybe _ in stream::iter([42]) => {} finally 1,
    // Same, but without `maybe` we get the unwrapped value.
    _ in stream::iter([42]) => {} finally 2,
    // All the streams above finish immediately, so this `maybe` stream gets cancelled and
    // returns `None` instead of evaluating its `finally` expression.
    maybe _ in stream::iter([42]) => {} finally 3,
);
assert_eq!(ret, ((), Some(1), 2, None));
```
Here's an example of driving a stream together with `label:`/`.cancel()`, which works with
streams like it does with futures:
```rust
use futures::stream::{self, StreamExt};
let mut counter = 0;
join!(
    my_stream: _ in stream::iter(0..5).then(async |_| {
        sleep(Duration::from_millis(10)).await
    }) => {
        // This stream gets cancelled below, so this only executes three times.
        counter += 1;
    } finally {
        // This stream gets cancelled below, so this will never execute.
        counter += 1_000_000;
    },
    async {
        // Wait long enough for the stream to yield three items, then cancel it.
        sleep(Duration::from_millis(35)).await;
        my_stream.cancel();
    },
);
assert_eq!(counter, 3);
```
### mutable access to futures and streams
This feature is even more experimental than everything else above. In arm bodies
(blocks/expressions after `=>` and `finally`), `label:` cancellers support an additional
method: `with_pin_mut`. This takes a closure and invokes it with an `Option<Pin<&mut T>>`
pointing to the corresponding future or stream. (`None` if that arm is already completed or
cancelled.) You can use this to mutate e.g. a [`FuturesUnordered`] or a [`StreamMap`] to add
more work to it while it's being polled. (Not literally while it's being polled -- everything
in a `join!` runs on one thread -- but while it's owned by `join!` and guaranteed not to be
"snoozed".) This is intended as an alternative to patterns that await futures *by reference*,
which tend to be prone to "snoozing" mistakes.

Unfortunately, streams that you can add work to dynamically are usually "poorly behaved" in the
sense that they often return `Ready(None)` for a while, until more work is eventually added and
they start returning `Ready(Some(_))` again. This is at odds with the [usual rule] that you
shouldn't poll a stream again after it returns `Ready(None)`, but it does work with
`select!`-in-a-loop. (In Tokio it requires an `if` guard, and with `futures::select!` it leans
on the "fused" requirement.) However, it does _not_ naturally work with `join_me_maybe`, which
interprets `Ready(None)` as "end of stream" and promptly drops the whole stream. ([Like it's
supposed to!][usual rule]) For a stream to work well with this feature, it needs to do two
things that as far as I know none of the dynamic streams currently do:
1. The stream should only ever return `Ready(Some(_))` or `Pending`, until you somehow inform
it that no more work is coming, using e.g. a `.close()` method or something like that. After
that the stream should probably drain its remaining work before returning `Ready(None)`. (If
the caller doesn't want to wait for remaining work, they can cancel the stream instead.)
2. Because adding more work might unblock callers that previously received `Pending`, the
stream should stash a `Waker` and invoke it whenever work is added.
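
The two requirements above can be sketched with only the standard library. Everything here is hypothetical: `WorkQueue`, its `push`/`close` methods, and `work_queue_demo` are made-up names, and `poll_next` is a plain inherent method with the same shape as `Stream::poll_next` (minus pinning) so that the sketch doesn't depend on the `futures` crate. The demo polls by hand with a counting waker to check that adding work wakes a caller that previously received `Pending`, and that `Ready(None)` only appears after `close()` and after the remaining work has drained.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical dynamic work queue that follows both rules.
pub struct WorkQueue<T> {
    items: VecDeque<T>,
    closed: bool,
    waker: Option<Waker>,
}

impl<T> WorkQueue<T> {
    pub fn new() -> Self {
        WorkQueue { items: VecDeque::new(), closed: false, waker: None }
    }

    /// Rule 2: adding work wakes whoever last received `Pending`.
    pub fn push(&mut self, item: T) {
        self.items.push_back(item);
        if let Some(waker) = self.waker.take() {
            waker.wake();
        }
    }

    /// Rule 1: the caller says explicitly that no more work is coming. This also
    /// wakes a pending caller, so that it can observe the end of the stream.
    pub fn close(&mut self) {
        self.closed = true;
        if let Some(waker) = self.waker.take() {
            waker.wake();
        }
    }

    /// Same shape as `Stream::poll_next`, but as a plain method.
    pub fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
        if let Some(item) = self.items.pop_front() {
            // Drain remaining work first, even after `close()`.
            Poll::Ready(Some(item))
        } else if self.closed {
            Poll::Ready(None)
        } else {
            // Empty but not closed: never `Ready(None)`. Stash the waker instead.
            self.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

// Counts wakeups so the demo can observe them.
struct CountingWake(AtomicUsize);
impl Wake for CountingWake {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

pub fn work_queue_demo() -> (Vec<Poll<Option<u32>>>, usize) {
    let counter = Arc::new(CountingWake(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);
    let mut queue = WorkQueue::new();
    let mut seen = Vec::new();

    seen.push(queue.poll_next(&mut cx)); // empty, not closed: `Pending`
    queue.push(7); // wakes the stashed waker
    seen.push(queue.poll_next(&mut cx)); // `Ready(Some(7))`
    seen.push(queue.poll_next(&mut cx)); // `Pending` again, not `Ready(None)`
    queue.push(8); // wakes the stashed waker a second time
    queue.close(); // no waker is stashed at this point, so no extra wakeup
    seen.push(queue.poll_next(&mut cx)); // remaining work drains: `Ready(Some(8))`
    seen.push(queue.poll_next(&mut cx)); // only now: `Ready(None)`

    (seen, counter.0.load(Ordering::SeqCst))
}
```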

Adapting a stream that doesn't behave this way is complicated and not obviously a good idea.
[See `tests/test.rs` for some examples.][adapter] Manually tracking `Waker`s is exactly the
sort of error-prone business that this crate wants to _discourage_, and this whole feature will
need a lot of baking before I can recommend it. However, this approach is necessary to solve
cases like Niko Matsakis' [case study of pub-sub in mini-redis][miniredis].

[miniredis]: https://smallcultfollowing.com/babysteps/blog/2022/06/13/async-cancellation-a-case-study-of-pub-sub-in-mini-redis/
### `no_std`
`join_me_maybe` doesn't heap allocate and is compatible with `#![no_std]`.

[`futures::join!`]: https://docs.rs/futures/latest/futures/macro.join.html
[`tokio::join!`]: https://docs.rs/tokio/latest/tokio/macro.join.html
[`select!`]: https://tokio.rs/tokio/tutorial/select
[cancelling_async]: https://sunshowers.io/posts/cancelling-async-rust/
[rfd400]: https://rfd.shared.oxide.computer/rfd/400
[rfd609]: https://rfd.shared.oxide.computer/rfd/609
[`FutureExt::map`]: https://docs.rs/futures/latest/futures/future/trait.FutureExt.html#method.map
[`FutureExt::then`]: https://docs.rs/futures/latest/futures/future/trait.FutureExt.html#method.then
[`AsyncIterator`]: https://doc.rust-lang.org/std/async_iter/trait.AsyncIterator.html
[`futures::stream`]: https://docs.rs/futures/latest/futures/stream/
[`FuturesUnordered`]: https://docs.rs/futures/latest/futures/stream/struct.FuturesUnordered.html
[`StreamMap`]: https://docs.rs/tokio-stream/latest/tokio_stream/struct.StreamMap.html
[usual rule]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html#tymethod.poll_next
[adapter]: https://github.com/oconnor663/join_me_maybe/blob/672c615cd586140e09052a83795ccc291c0a31c8/tests/test.rs#L180-L327
## Help needed! (from the compiler...)
As far as I know, there's no way for the `join!` macro to support something like this today
(this works in arm bodies, but not as here in the "scrutinee" position):
```rust
let mut x = 0;
join!(
    async { x += 1; },
    async { x += 1; }, // error: cannot borrow `x` as mutable more than once at a time
);
assert_eq!(x, 2);
```
The problem is that both futures want to capture `&mut x`, which violates the mutable aliasing
rule. However, that arguably borrows too much. Consider:
1. Neither of these futures tries to hold `&mut x` across an `.await` point. In other words,
the borrow checker would accept any interleaving of their "basic blocks" in a
single-threaded context.
2. They're hidden inside the join future, so we can't yeet one of them off to another thread to
create a data race.

Instead of each inner future capturing `&mut x`, the outer join future could capture it once,
and the inner futures could "reborrow" it in some sense when they're polled. My guess is that
there's no practical way for a macro to express this in Rust today (corrections welcome!), but
the Rust compiler could add a hypothetical syntax like this:
```rust
let mut x = 0;
let mut y = 0;
concurrent_bikeshed {
    {
        // `x` is not borrowed across the `.await`...
        x += 1;
        // ...but `y` is.
        let y_ref = &mut y;
        sleep(Duration::from_secs(1)).await;
        *y_ref += 1;
        x += 1;
    },
    {
        // Mutating `x` here does not conflict...
        x += 1;
        // ...but trying to mutate `y` here would conflict.
        // y += 1;
    },
}
```
Another big advantage of adding dedicated syntax for this is that it could support
`return`/`break`/`continue` as usual to diverge from inside any arm. That would be especially
helpful for error handling with `?`, which is awkward in a lot of concurrent contexts today.
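
To make the "capture once, reborrow on each poll" idea from earlier in this section concrete, here is a hand-written sketch using only the standard library. It is not something a macro can generate from ordinary `async` blocks, and it is not the proposed compiler feature. The `PollWith` trait, the `Join2` struct, the two state machines, and `reborrow_demo` are all hypothetical. The point is only that the borrow checker accepts this shape: the outer join holds the single `&mut x`, and each child gets a short reborrow for the duration of one poll.

```rust
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical trait: like `Future::poll`, except that the shared state is lent
// to the child at each poll instead of being captured by it.
trait PollWith<S> {
    fn poll_with(&mut self, shared: &mut S, cx: &mut Context<'_>) -> Poll<()>;
}

// Hand-written state machine for `x += 1; <yield once>; x += 1;`.
struct AddYieldAdd {
    resumed: bool,
}
impl PollWith<i32> for AddYieldAdd {
    fn poll_with(&mut self, x: &mut i32, cx: &mut Context<'_>) -> Poll<()> {
        // Both the first poll and the resumed poll increment `x` once.
        *x += 1;
        if self.resumed {
            Poll::Ready(())
        } else {
            self.resumed = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

// Hand-written state machine for `x += 1;`.
struct AddOnce;
impl PollWith<i32> for AddOnce {
    fn poll_with(&mut self, x: &mut i32, _cx: &mut Context<'_>) -> Poll<()> {
        *x += 1;
        Poll::Ready(())
    }
}

// The outer join owns the only `&mut S` and reborrows it for each child poll.
struct Join2<'s, S, A, B> {
    shared: &'s mut S,
    a: A,
    a_done: bool,
    b: B,
    b_done: bool,
}

impl<S, A: PollWith<S>, B: PollWith<S>> Join2<'_, S, A, B> {
    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<()> {
        if !self.a_done {
            self.a_done = self.a.poll_with(&mut *self.shared, cx).is_ready();
        }
        if !self.b_done {
            self.b_done = self.b.poll_with(&mut *self.shared, cx).is_ready();
        }
        if self.a_done && self.b_done {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }
}

pub fn reborrow_demo() -> i32 {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut x = 0;
    {
        let mut join = Join2 {
            shared: &mut x,
            a: AddYieldAdd { resumed: false },
            a_done: false,
            b: AddOnce,
            b_done: false,
        };
        // Busy-poll until both children are done.
        while join.poll(&mut cx).is_pending() {}
    }
    x
}
```

`reborrow_demo()` returns `3`: both children mutate the same `x`, interleaved across polls, with no `RefCell` or `Mutex`. The cost is that the children can't be ordinary futures, because the `Future::poll` signature has no way to pass the reborrow in.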