join_me_maybe 0.4.2

an async `join!` macro with `select!`-like features
# `join_me_maybe!` [![crates.io](https://img.shields.io/crates/v/join_me_maybe.svg)](https://crates.io/crates/join_me_maybe) [![docs.rs](https://docs.rs/join_me_maybe/badge.svg)](https://docs.rs/join_me_maybe)

`join_me_maybe` provides an expanded version of the [`futures::join!`]/[`tokio::join!`] macro,
with added features for cancellation and working with streams. Programs that need this sort of
control flow often resort to "[`select!`] in a loop" and/or "`select!` by reference", but those
come with a notoriously long list of footguns.[\[1\]][cancelling_async][\[2\]][rfd400][\[3\]][rfd609]
The goal of `join_me_maybe` is to be more convenient and less error-prone than `select!` in its
most common applications. The stretch goal is to make the case that `select!`-by-reference in
particular isn't usually necessary and should be _considered harmful_.

## Features and examples

The basic use case works like other `join!` macros, polling each of its arguments to completion
and returning their outputs in a tuple.

```rust
use join_me_maybe::join;
use tokio::time::{sleep, Duration};

// Create a couple futures, one that's ready immediately, and another that takes some time.
let future1 = std::future::ready(1);
let future2 = async { sleep(Duration::from_millis(100)).await; 2 };

// Run them concurrently and wait for both of them to finish.
let (a, b) = join!(future1, future2);
assert_eq!((a, b), (1, 2));
```

### `maybe` cancellation

If you don't want to wait for all of your futures to finish, you can use the `maybe` keyword. This
can be useful with infinite loops of background work that never actually exit. The outputs of
`maybe` futures are wrapped in `Option`.

```rust
let outputs = join!(
    // This future isn't `maybe`, so we'll definitely wait for it to finish.
    async { sleep(Duration::from_millis(100)).await; 1 },
    // Same here.
    async { sleep(Duration::from_millis(200)).await; 2 },
    // We won't necessarily wait for this `maybe` future, but in practice it'll finish before
    // the "definitely" futures above, and we'll get its output wrapped in `Some()`.
    maybe async { sleep(Duration::from_millis(10)).await; 3 },
    // This `maybe` future never finishes. We'll cancel it when the "definitely" work is done.
    maybe async {
        loop {
            // Some periodic work...
            sleep(Duration::from_millis(10)).await;
        }
    },
);
assert_eq!(outputs, (1, 2, Some(3), None));
```

### `label:` and `.cancel()`

You can also cancel futures by name if you `label:` them. The outputs of labeled futures are
wrapped in `Option` too.

```rust
let mutex = tokio::sync::Mutex::new(42);
let outputs = join!(
    // The `foo:` label here means that all future expressions (including this one) have a
    // `foo` object in scope, which provides a `.cancel()` method.
    foo: async {
        let mut guard = mutex.lock().await;
        *guard += 1;
        // Selfishly hold the lock for a long time.
        sleep(Duration::from_secs(1_000_000)).await;
    },
    async {
        // Give `foo` a little bit of time...
        sleep(Duration::from_millis(100)).await;
        if mutex.try_lock().is_err() {
            // Hmm, `foo` is taking way too long. Cancel it!
            foo.cancel();
        }
        // Cancelling `foo` drops it promptly, which releases the lock. Note that if it only
        // stopped polling `foo`, but didn't drop it, this would be a deadlock. This is a
        // common footgun with `select!`-in-a-loop.
        *mutex.lock().await
    },
);
assert_eq!(outputs, (None, 43));
```

A `.cancel()`ed future won't be polled again, and it'll be dropped promptly, freeing any locks
or other resources that it might be holding. Note that if a future cancels _itself_, its
execution still continues as normal after `.cancel()` returns, up until the next `.await`
point. This can be useful in closure bodies or nested `async` blocks, where `return` or `break`
doesn't work.
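
The difference between dropping a cancelled future and merely not polling it again can be seen without any runtime. The following is a minimal sketch using only the standard library. It polls by hand instead of using `join!`, and `NoopWaker` and `snoozed_vs_dropped` are hypothetical names chosen for illustration:

```rust
use std::future::{pending, Future};
use std::sync::{Arc, Mutex};
use std::task::{Context, Wake, Waker};

/// A waker that does nothing, which is enough for polling by hand.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Returns whether the mutex is still locked (a) while the future is alive
/// but no longer polled, and (b) after the future has been dropped.
fn snoozed_vs_dropped() -> (bool, bool) {
    let mutex = Mutex::new(42);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);

    // Take the lock, then suspend forever while holding the guard.
    let mut fut = Box::pin(async {
        let _guard = mutex.lock().unwrap();
        pending::<()>().await;
    });

    // Poll once so the future runs up to its first `.await`.
    assert!(fut.as_mut().poll(&mut cx).is_pending());

    // Not polling again is not enough: the guard is still alive inside `fut`.
    let locked_while_snoozed = mutex.try_lock().is_err();

    // Dropping the future drops the guard and releases the lock.
    drop(fut);
    let locked_after_drop = mutex.try_lock().is_err();

    (locked_while_snoozed, locked_after_drop)
}
```

Calling `snoozed_vs_dropped()` reports that the lock is held while the future is merely left unpolled and free once the future is dropped, which is why prompt dropping matters.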

### arm bodies with `=>`

One of the powerful features of `select!` is that its arm bodies get exclusive mutable access
to the enclosing scope. `join_me_maybe!` supports an expanded `=>` syntax that works similarly:

```rust
let mut counter = 0;
let output = join!(
    n = std::future::ready(1) => {
        counter += n;
        42
    },
    m = async {
        sleep(Duration::from_millis(1)).await;
        1
    } => counter += m, // Mutate the same `counter` in both arms.
);
assert_eq!(counter, 2);
// When a `=>` body is present, the output is the value of the body.
assert_eq!(output, (42, ()));
```

Also like `select!`, it's possible to `.await` or `return` in an arm body. (However, `break` or
`continue` in a containing loop are not supported.) Note that a `return` short-circuits the
whole containing function, not just the `join!`. This is useful for error handling:

```rust
async fn foo() -> std::io::Result<()> {
    let _: ((), bool) = join!(
        _ = std::future::ready(1) => {
            sleep(Duration::from_millis(1)).await;
            return Ok(()); // Return from `foo` (the whole function, not just the `join!`).
        },
        _ = std::future::ready(2) => {
            std::fs::exists("fallible.txt")? // Error handling with `?` also works.
        },
    );
    unreachable!("`return` short-circuits above.");
}
```

Shared mutation from different arm bodies wouldn't be possible if they ran concurrently.
Instead, `join!` only runs one arm body at a time. This is a potential source of surprising
timing bugs, and it's best to avoid `.await`ing in arm bodies if you have a choice. However,
arm bodies and "scrutinees" (the futures to the left of the `=>`) run concurrently.
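
One way to picture this is a driver loop that polls every scrutinee but runs each body itself, in between polls, so only the driver ever holds `&mut counter`. The following is a conceptual sketch using only the standard library, not the crate's actual implementation. The names `NoopWaker`, `yield_once`, and `drive_two_arms` are hypothetical, and the bodies are synchronous for simplicity:

```rust
use std::future::{poll_fn, Future};
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A waker that does nothing, which is enough for a busy-polling demo.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Returns `Pending` once and then `Ready`, standing in for a short sleep.
async fn yield_once() {
    let mut yielded = false;
    poll_fn(|cx| {
        if yielded {
            Poll::Ready(())
        } else {
            yielded = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    })
    .await
}

/// Drives two "arms" and returns the final counter plus the order in which
/// the bodies ran.
fn drive_two_arms() -> (i32, Vec<&'static str>) {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);

    let mut counter = 0;
    let mut order = Vec::new();

    // The scrutinees never touch `counter`, so they can be polled in any
    // interleaving.
    let mut first = pin!(async {
        yield_once().await;
        1_i32
    });
    let mut second = pin!(async { 2_i32 });
    let (mut first_done, mut second_done) = (false, false);

    while !(first_done && second_done) {
        if !first_done {
            if let Poll::Ready(n) = first.as_mut().poll(&mut cx) {
                // Body of the first arm, run by the driver.
                counter += n;
                order.push("first");
                first_done = true;
            }
        }
        if !second_done {
            if let Poll::Ready(m) = second.as_mut().poll(&mut cx) {
                // Body of the second arm, never concurrent with the first.
                counter += m;
                order.push("second");
                second_done = true;
            }
        }
    }
    (counter, order)
}
```

Because the bodies live in the driver rather than inside the futures, the borrow checker sees a single owner of `&mut counter`. The cost is the one described above: a body that takes a long time delays every other body.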

### streams

Similar to the `=>` syntax for futures above, you can also drive a stream, using `<pattern> in
<stream>` instead of `<pattern> = <future>`. In this case the following expression executes for
each item in the stream. As above, bodies get mutable access to the environment and can
`.await` or `return`:

```rust
use futures::stream;

let mut total = 0;
join!(
    n in stream::iter([1, 2, 3]) => total += n,
    m in stream::iter([4, 5, 6]) => total += m,
);
assert_eq!(total, 21);
```

You can optionally follow this syntax with the `finally` keyword and another expression that
executes after the stream is finished (if it's not cancelled). Streams have no return value by
default, but streams with a `finally` expression take the value of that expression:

```rust
let ret = join!(
    // This stream has no `finally` expression, so it returns `()`.
    _ in stream::iter([42]) => {},
    // This arm's `finally` expression is `1`, and the `maybe` means we get `Some(1)`.
    maybe _ in stream::iter([42]) => {} finally 1,
    // Same, but without `maybe` we get the unwrapped value.
    _ in stream::iter([42]) => {} finally 2,
    // All the streams above finish immediately, so this `maybe` stream gets cancelled and
    // returns `None` instead of evaluating its `finally` expression.
    maybe _ in stream::iter([42]) => {} finally 3,
);
assert_eq!(ret, ((), Some(1), 2, None));
```

Here's an example of driving a stream together with `label:`/`.cancel()`, which works with
streams like it does with futures:

```rust
use futures::stream::{self, StreamExt};

let mut counter = 0;
join!(
    my_stream: _ in stream::iter(0..5).then(async |_| {
        sleep(Duration::from_millis(10)).await
    }) => {
        // This stream gets cancelled below, so this only executes three times.
        counter += 1;
    } finally {
        // This stream gets cancelled below, so this will never execute.
        counter += 1_000_000;
    },
    async {
        // Wait long enough for the stream to yield three items, then cancel it.
        sleep(Duration::from_millis(35)).await;
        my_stream.cancel();
    },
);
assert_eq!(counter, 3);
```

### mutable access to futures and streams

This feature is even more experimental than everything else above. In arm bodies
(blocks/expressions after `=>` and `finally`), `label:` cancellers support an additional
method: `with_pin_mut`. This takes a closure and invokes it with an `Option<Pin<&mut T>>`
pointing to the corresponding future or stream. (`None` if that arm is already completed or
cancelled.) You can use this to mutate e.g. a [`FuturesUnordered`] or a [`StreamMap`] to add
more work to it while it's being polled. (Not literally while it's being polled -- everything
in a `join!` runs on one thread -- but while it's owned by `join!` and guaranteed not to be
"snoozed".) This is intended as an alternative to patterns that await futures *by reference*,
which tend to be prone to "snoozing" mistakes.

Unfortunately, streams that you can add work to dynamically are usually "poorly behaved" in the
sense that they often return `Ready(None)` for a while, until more work is eventually added and
they start returning `Ready(Some(_))` again. This is at odds with the [usual rule] that you
shouldn't poll a stream again after it returns `Ready(None)`, but it does work with
`select!`-in-a-loop. (In Tokio it requires an `if` guard, and with `futures::select!` it leans
on the "fused" requirement.) However, it does _not_ naturally work with `join_me_maybe`, which
interprets `Ready(None)` as "end of stream" and promptly drops the whole stream. ([Like it's
supposed to!][usual rule]) For a stream to work well with this feature, it needs to do two
things that as far as I know none of the dynamic streams currently do:

1. The stream should only ever return `Ready(Some(_))` or `Pending`, until you somehow inform
   it that no more work is coming, using e.g. a `.close()` method or something like that. After
   that the stream should probably drain its remaining work before returning `Ready(None)`. (If
   the caller doesn't want to wait for remaining work, they can cancel the stream instead.)
2. Because adding more work might unblock callers that previously received `Pending`, the
   stream should stash a `Waker` and invoke it whenever work is added.
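
The two rules above can be sketched with a small queue type. This is a hypothetical illustration using only the standard library, not an existing stream from `futures` or `tokio-stream`. `WorkQueue`, `CountingWaker`, and `work_queue_demo` are made-up names, `poll_next` is an inherent method that only mirrors the shape of `Stream::poll_next` (without `Pin`), and pushing after `close()` is not handled:

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical dynamic work source that follows both rules.
struct WorkQueue<T> {
    items: VecDeque<T>,
    closed: bool,
    waker: Option<Waker>,
}

impl<T> WorkQueue<T> {
    fn new() -> Self {
        Self { items: VecDeque::new(), closed: false, waker: None }
    }

    /// Add work and wake whoever last received `Pending` (rule 2).
    fn push(&mut self, item: T) {
        self.items.push_back(item);
        self.wake();
    }

    /// Announce that no more work is coming (rule 1).
    fn close(&mut self) {
        self.closed = true;
        self.wake();
    }

    fn wake(&mut self) {
        if let Some(waker) = self.waker.take() {
            waker.wake();
        }
    }

    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
        if let Some(item) = self.items.pop_front() {
            // Drain remaining work first, even after `close()`.
            return Poll::Ready(Some(item));
        }
        if self.closed {
            // Only a closed and empty queue reports end of stream.
            return Poll::Ready(None);
        }
        // Empty but still open: stash the waker and report `Pending`.
        self.waker = Some(cx.waker().clone());
        Poll::Pending
    }
}

/// Counts wake-ups so the demo can observe them.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Returns the sequence of poll results and the number of wake-ups.
fn work_queue_demo() -> (Vec<Poll<Option<u32>>>, usize) {
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);

    let mut queue = WorkQueue::new();
    let mut polls = Vec::new();
    polls.push(queue.poll_next(&mut cx)); // empty but open
    queue.push(7); // wakes the stashed waker
    polls.push(queue.poll_next(&mut cx));
    polls.push(queue.poll_next(&mut cx)); // empty again, still open
    queue.push(8); // wakes the stashed waker again
    queue.close(); // waker already taken, so no extra wake-up
    polls.push(queue.poll_next(&mut cx)); // drains the last item
    polls.push(queue.poll_next(&mut cx)); // closed and empty
    (polls, counter.0.load(Ordering::SeqCst))
}
```

In this sketch an empty queue reports `Pending` rather than `Ready(None)` until `close()` is called, so a consumer that treats `Ready(None)` as the end of the stream never drops it early.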

Adapting a stream that doesn't behave this way is complicated and not obviously a good idea.
[See `tests/test.rs` for some examples.][adapter] Manually tracking `Waker`s is exactly the
sort of error-prone business that this crate wants to _discourage_, and this whole feature will
need a lot of baking before I can recommend it. However, this approach is necessary to solve
cases like Niko Matsakis' [case study of pub-sub in mini-redis][miniredis].

[miniredis]: https://smallcultfollowing.com/babysteps/blog/2022/06/13/async-cancellation-a-case-study-of-pub-sub-in-mini-redis/

### `no_std`

`join_me_maybe` doesn't heap allocate and is compatible with `#![no_std]`.

[`futures::join!`]: https://docs.rs/futures/latest/futures/macro.join.html
[`tokio::join!`]: https://docs.rs/tokio/latest/tokio/macro.join.html
[`select!`]: https://tokio.rs/tokio/tutorial/select
[cancelling_async]: https://sunshowers.io/posts/cancelling-async-rust/
[rfd400]: https://rfd.shared.oxide.computer/rfd/400
[rfd609]: https://rfd.shared.oxide.computer/rfd/609
[`FutureExt::map`]: https://docs.rs/futures/latest/futures/future/trait.FutureExt.html#method.map
[`FutureExt::then`]: https://docs.rs/futures/latest/futures/future/trait.FutureExt.html#method.then
[`AsyncIterator`]: https://doc.rust-lang.org/std/async_iter/trait.AsyncIterator.html
[`futures::stream`]: https://docs.rs/futures/latest/futures/stream/
[`FuturesUnordered`]: https://docs.rs/futures/latest/futures/stream/struct.FuturesUnordered.html
[`StreamMap`]: https://docs.rs/tokio-stream/latest/tokio_stream/struct.StreamMap.html
[usual rule]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html#tymethod.poll_next
[adapter]: https://github.com/oconnor663/join_me_maybe/blob/672c615cd586140e09052a83795ccc291c0a31c8/tests/test.rs#L180-L327

## Help needed! (from the compiler...)

As far as I know, there's no way for the `join!` macro to support something like this today
(it works in arm bodies, but not in the "scrutinee" position as shown here):

```rust
let mut x = 0;
join!(
    async { x += 1; },
    async { x += 1; }, // error: cannot borrow `x` as mutable more than once at a time
);
assert_eq!(x, 2);
```

The problem is that both futures want to capture `&mut x`, which violates the mutable aliasing
rule. However, that arguably borrows too much. Consider:

1. Neither of these futures tries to hold `&mut x` across an `.await` point. In other words,
   the borrow checker would accept any interleaving of their "basic blocks" in a
   single-threaded context.
2. They're hidden inside the join future, so we can't yeet one of them off to another thread to
   create a data race.

Instead of each inner future capturing `&mut x`, the outer join future could capture it once,
and the inner futures could "reborrow" it in some sense when they're polled. My guess is that
there's no practical way for a macro to express this in Rust today (corrections welcome!), but
the Rust compiler could add a hypothetical syntax like this:

```rust
let mut x = 0;
let mut y = 0;
concurrent_bikeshed {
    {
        // `x` is not borrowed across the `.await`...
        x += 1;
        // ...but `y` is.
        let y_ref = &mut y;
        sleep(Duration::from_secs(1)).await;
        *y_ref += 1;
        x += 1;
    },
    {
        // Mutating `x` here does not conflict...
        x += 1;
        // ...but trying to mutate `y` here would conflict.
        // y += 1;
    },
}
```

Another big advantage of adding dedicated syntax for this is that it could support
`return`/`break`/`continue` as usual to diverge from inside any arm. That would be especially
helpful for error handling with `?`, which is awkward in a lot of concurrent contexts today.
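
Until something like that exists, one workaround available today for the `x += 1` example is interior mutability: both futures capture a shared `&Cell<i32>` instead of `&mut x`. The following is a minimal sketch using only the standard library, polled by hand rather than through `join!`. `NoopWaker` and `shared_counter` are hypothetical names:

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Wake, Waker};

/// A waker that does nothing, which is enough for polling by hand.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

fn shared_counter() -> i32 {
    let x = Cell::new(0);

    // Both futures capture `&x` (a shared borrow), so they can coexist.
    let mut first = pin!(async { x.set(x.get() + 1) });
    let mut second = pin!(async { x.set(x.get() + 1) });

    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);

    // Neither future awaits anything, so each finishes on its first poll.
    assert!(first.as_mut().poll(&mut cx).is_ready());
    assert!(second.as_mut().poll(&mut cx).is_ready());

    x.get()
}
```

This compiles because `Cell` moves the aliasing check from the borrow checker to the type's API, at the cost of changing how every access to `x` is written. The hypothetical syntax above would keep plain `x += 1`.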