join_me_maybe!

join_me_maybe! is an expanded version of the futures::join!/tokio::join! macro, with
several added features for cancellation, early exit, and mutable access to the enclosing scope.
Programs that need this sort of control flow often resort to "select! in a loop" and/or
"select! by reference", but those come with a notoriously long list of
footguns.[1][2][3] The goal of join_me_maybe!
is to be more convenient and less error-prone than select! in its most common applications.
The stretch goal is to make the case that select!-by-reference in particular isn't usually
necessary and should be considered harmful.
Features and examples
The basic use case works like join!, polling each of its arguments to completion and
returning their outputs in a tuple.
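use join_me_maybe::join_me_maybe;
use tokio::time::{sleep, Duration};
// Create a couple futures, one that's ready immediately, and another that takes some time.
let future1 = std::future::ready(1);
let future2 = async {
    sleep(Duration::from_millis(100)).await;
    2
};
// Run them concurrently and wait for both of them to finish.
let (a, b) = join_me_maybe!(future1, future2);
assert_eq!((a, b), (1, 2));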
(This is an example to get us started, but in practice I would just use join! here.)
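To make "polling each of its arguments to completion" concrete, here is a minimal sketch of join-style polling using only the standard library. It is not how join_me_maybe! is implemented. The names `join2`, `block_on`, `NoopWaker`, and `YieldOnce` are hypothetical helpers made up for this illustration.

```rust
use std::future::{poll_fn, Future};
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// A waker that does nothing. That's enough for a busy-polling demo executor.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical toy executor: polls one future in a loop until it's ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// A future that returns `Pending` once before completing, standing in for "takes some time".
struct YieldOnce(bool);
impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

// Hypothetical two-way join: polls each unfinished future on every wakeup, stores each
// output as soon as it's ready, and completes once both outputs are available.
async fn join2<A: Future, B: Future>(a: A, b: B) -> (A::Output, B::Output) {
    let mut a = pin!(a);
    let mut b = pin!(b);
    let mut out_a = None;
    let mut out_b = None;
    poll_fn(move |cx| {
        // Never poll a future again after it has returned `Ready`.
        if out_a.is_none() {
            if let Poll::Ready(v) = a.as_mut().poll(cx) {
                out_a = Some(v);
            }
        }
        if out_b.is_none() {
            if let Poll::Ready(v) = b.as_mut().poll(cx) {
                out_b = Some(v);
            }
        }
        if out_a.is_some() && out_b.is_some() {
            Poll::Ready((out_a.take().unwrap(), out_b.take().unwrap()))
        } else {
            Poll::Pending
        }
    })
    .await
}

fn main() {
    let future1 = std::future::ready(1);
    let future2 = async {
        YieldOnce(false).await;
        2
    };
    assert_eq!(block_on(join2(future1, future2)), (1, 2));
}
```

The important property is that neither future is left unpolled while the other makes progress, which is what the rest of this document means by not "snoozing" a future.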
maybe cancellation
If you don't want to wait for all of your futures to finish, you can use the maybe keyword. This
can be useful with infinite loops of background work that never actually exit. The outputs of
maybe futures are wrapped in Option.
let outputs = join_me_maybe!;
assert_eq!;
label: and .cancel()
You can also cancel futures by name if you label: them. The outputs of labeled futures are
wrapped in Option too.
let mutex = new;
let outputs = join_me_maybe!;
assert_eq!;
A .cancel()ed future won't be polled again, and it'll be dropped promptly, freeing any locks
or other resources that it might be holding. Note that if a future cancels itself, its
execution still continues as normal after .cancel() returns, up until the next .await
point. This can be useful in closure bodies or nested async blocks, where return or break
doesn't work.
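The "dropped promptly, freeing any locks" part can be modeled with the standard library alone. The sketch below is an assumption-laden illustration, not the crate's implementation: it treats cancellation as dropping a future out of an `Option` slot, uses `std::sync::Mutex` rather than an async mutex, and the names `NoopWaker` and `lock_free_before_and_after_cancel` are hypothetical.

```rust
use std::future::Future;
use std::sync::{Arc, Mutex};
use std::task::{Context, Wake, Waker};

// A waker that does nothing; we poll by hand in this demo.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Returns whether the lock was free (before, after) the future was cancelled.
fn lock_free_before_and_after_cancel() -> (bool, bool) {
    let mutex = Mutex::new(0);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);

    // A slot that owns the future. `None` stands for "finished or cancelled".
    let mut slot = Some(Box::pin(async {
        let _guard = mutex.lock().unwrap();
        // Suspend forever while holding the lock.
        std::future::pending::<()>().await;
    }));

    // First poll: the future takes the lock and then suspends at its `.await`.
    if let Some(fut) = slot.as_mut() {
        assert!(fut.as_mut().poll(&mut cx).is_pending());
    }
    let free_before = mutex.try_lock().is_ok();

    // "Cancel" by dropping the future. The guard it was holding is dropped with it.
    slot = None;
    let free_after = mutex.try_lock().is_ok();

    assert!(slot.is_none());
    (free_before, free_after)
}

fn main() {
    assert_eq!(lock_free_before_and_after_cancel(), (false, true));
}
```

A cancelled future that merely stopped being polled, without being dropped, would keep the lock held indefinitely, which is why the prompt drop matters.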
"finish expressions": <pattern> = <future> => <body>
One of the powerful features of select! is that its arm bodies (though not its "scrutinees")
get exclusive mutable access to the enclosing scope. join_me_maybe! supports an expanded =>
syntax that works similarly:
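let mut counter = 0;
join_me_maybe!(
    _ = sleep(Duration::from_millis(1)) => counter += 1,
    n = async {
        sleep(Duration::from_millis(1)).await;
        1
    } => counter += n,
);
assert_eq!(counter, 2);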
In order to give these "finish expressions" mutable access to the enclosing scope, without
"snoozing" any of the other concurrent futures, these expressions run in a synchronous
context (i.e. they cannot .await). This makes them similar to FutureExt::map (as opposed
to FutureExt::then). However, note that trying to accomplish the same thing with map (or
then) doesn't compile:
use futures::FutureExt;
use join_me_maybe::join_me_maybe;
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
    let mut counter = 0;
    join_me_maybe!(
        sleep(Duration::from_millis(1)).map(|_| counter += 1),
        //                                      ------- first mutable borrow
        async {
            sleep(Duration::from_millis(1)).await;
            1
        }.map(|n| counter += n),
        //        ------- second mutable borrow
    );
}
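By contrast, if the bodies are written inline in a single poll function, only one mutable borrow of `counter` is ever needed. The following std-only sketch hand-writes that shape for two arms. It illustrates the borrowing argument under stated assumptions and does not claim to match what join_me_maybe! actually expands to. The helpers `block_on`, `NoopWaker`, `YieldOnce`, and `run_both_arms` are hypothetical.

```rust
use std::future::{poll_fn, Future};
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// A waker that does nothing. That's enough for a busy-polling demo executor.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical toy executor: polls one future in a loop until it's ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// A future that returns `Pending` once before completing, standing in for a short sleep.
struct YieldOnce(bool);
impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

fn run_both_arms() -> i32 {
    let mut counter = 0;
    let mut a = pin!(YieldOnce(false));
    let mut b = pin!(async {
        YieldOnce(false).await;
        1
    });
    let (mut a_done, mut b_done) = (false, false);
    // One closure holds the only mutable borrow of `counter`. Each synchronous body runs
    // inline when its future finishes, so the other future is never left unpolled.
    block_on(poll_fn(|cx| {
        if !a_done && a.as_mut().poll(cx).is_ready() {
            a_done = true;
            counter += 1; // body of the first arm
        }
        if !b_done {
            if let Poll::Ready(n) = b.as_mut().poll(cx) {
                b_done = true;
                counter += n; // body of the second arm
            }
        }
        if a_done && b_done {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }));
    counter
}

fn main() {
    assert_eq!(run_both_arms(), 2);
}
```

Because the bodies cannot `.await`, each one runs to completion inside a single poll, and the borrow checker sees one closure rather than two competing ones.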
streams
Similar to the => syntax for futures above, you can also drive a stream, using <pattern> in <stream> instead of <pattern> = <future>. In this case the following expression executes for
each item in the stream. You can optionally follow that with the finally keyword and another
expression that executes after the stream is finished (if it's not cancelled). Both of these
expressions get mutable access to the environment (and cannot .await). Here's an example of
driving a stream, together with label:/.cancel(), which works with streams like it does
with futures:
use ;
let mut counter = 0;
join_me_maybe!;
assert_eq!;
mutable access to futures and streams
This feature is even more experimental than everything else above. In synchronous expressions
with mutable access to the calling scope (those after => and finally), label: cancellers
support an additional method: .as_pin_mut(). This returns an Option<Pin<&mut T>> pointing
to the corresponding future or stream. (Or None if it's already completed/cancelled.) You can
use this to mutate e.g. a FuturesUnordered or a StreamMap to add more work to it while
it's being polled. (Not literally while it's being polled, but while it's owned by
join_me_maybe! and guaranteed not to be "snoozed".) This is intended as an alternative to
patterns that await futures by reference, which tend to be prone to "snoozing" mistakes.
Unfortunately, streams that you can add work to dynamically are usually "poorly behaved" in the
sense that they often return Ready(None) for a while, until more work is eventually added and
they start returning Ready(Some(_)) again. This is at odds with the usual rule that you
shouldn't poll a stream again after it returns Ready(None), but it does work with
select!-in-a-loop. (In Tokio it requires an if guard, and with futures::select! it leans
on the "fused" requirement.) However, it does not naturally work with join_me_maybe!, which
interprets Ready(None) as "end of stream" and promptly drops the whole stream. (Like it's
supposed to!) For a stream to work well with this feature, it needs to do two
things that as far as I know none of the dynamic streams currently do:
- The stream should only ever return Ready(Some(_)) or Pending, until you somehow inform it that no more work is coming, using say a .close() method or something. After that the stream should probably drain its remaining work before returning Ready(None). (If the caller doesn't want to wait for remaining work, they can cancel the stream instead.)
- Because adding more work might unblock callers that previously received Pending, the stream should stash a Waker and invoke it whenever work is added.
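The two rules above can be sketched with the standard library alone. `WorkQueue` below is a hypothetical type invented for this illustration. It exposes a `poll_next` method with the same shape as `Stream::poll_next` (minus the `Pin`, since the type is `Unpin`) instead of implementing the real trait, so that the example has no dependencies. Waking on `close()` is an extra assumption, not something required by the rules as stated.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical dynamic work queue. A real one would implement `futures::Stream`.
struct WorkQueue<T> {
    items: VecDeque<T>,
    closed: bool,
    waker: Option<Waker>,
}

impl<T> WorkQueue<T> {
    fn new() -> Self {
        WorkQueue { items: VecDeque::new(), closed: false, waker: None }
    }

    // Rule 2: adding work invokes the stashed waker, if any.
    fn push(&mut self, item: T) {
        assert!(!self.closed, "push after close");
        self.items.push_back(item);
        if let Some(waker) = self.waker.take() {
            waker.wake();
        }
    }

    // Inform the queue that no more work is coming.
    // Assumption: this also wakes a pending caller so it can observe the end of the stream.
    fn close(&mut self) {
        self.closed = true;
        if let Some(waker) = self.waker.take() {
            waker.wake();
        }
    }

    // Rule 1: only `Ready(Some(_))` or `Pending` until closed, then drain, then `Ready(None)`.
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
        if let Some(item) = self.items.pop_front() {
            Poll::Ready(Some(item))
        } else if self.closed {
            Poll::Ready(None)
        } else {
            self.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

// Demo helper: a waker that counts how many times it has been invoked.
struct CountingWaker(AtomicUsize);
impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

fn main() {
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);
    let mut queue: WorkQueue<&str> = WorkQueue::new();

    // Empty but not closed: `Pending`, not `Ready(None)`.
    assert_eq!(queue.poll_next(&mut cx), Poll::Pending);
    // Adding work invokes the stashed waker.
    queue.push("a");
    assert_eq!(counter.0.load(Ordering::SeqCst), 1);
    assert_eq!(queue.poll_next(&mut cx), Poll::Ready(Some("a")));
    // Remaining work drains after `close()`, and only then does the stream end.
    queue.push("b");
    queue.close();
    assert_eq!(queue.poll_next(&mut cx), Poll::Ready(Some("b")));
    assert_eq!(queue.poll_next(&mut cx), Poll::Ready(None));
}
```

Building this behavior into the stream type itself is much simpler than wrapping an existing stream that returns Ready(None) whenever it happens to be empty.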
Adapting a stream that doesn't behave this way is complicated and not obviously a good idea.
See tests/test.rs for some examples. Manually tracking Wakers is exactly the
sort of error-prone business that this crate wants to discourage, and this whole feature will
need a lot of baking before I can recommend it.
no_std
join_me_maybe! doesn't heap allocate and is compatible with #![no_std].