for-streams
The for_streams! macro, for driving multiple async Streams concurrently. for_streams!
works well with Tokio, but it doesn't depend on Tokio.
The simplest case
use for_streams::for_streams;
for_streams!
That takes three milliseconds and prints 1 101 2 102 3 103. The behavior there is similar to
using StreamExt::for_each and futures::join! together like this:
join!;
However, importantly, using select! in a loop does not behave the same way:
let mut stream1 = iter.fuse;
let mut stream2 = iter.fuse;
loop
That approach takes six milliseconds, not three. select! is notorious for cancellation
footguns, but this is actually a different problem: the body of a select! arm doesn't run
concurrently with any other arms (neither their bodies nor their "scrutinees"). Using select!
in a loop to drive multiple streams is often a mistake: occasionally a deadlock, but more
frequently a silent performance bug.
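The three-versus-six arithmetic can be reproduced with a toy model that uses only the standard library. This is a sketch under stated assumptions, not how select!, join!, or for_streams! are implemented: Clock, Sleep, run_bodies, and simulate are hypothetical names invented here, time is a counter of simulated milliseconds, and each loop body is modeled as a single one-millisecond sleep.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical helpers for illustration only; none of this is for_streams API.
type Clock = Rc<Cell<u64>>; // virtual time in simulated milliseconds
type Task = Pin<Box<dyn Future<Output = ()>>>;

// Waker that does nothing; the toy executor below re-polls every task anyway.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Future that becomes ready once the virtual clock reaches `deadline`.
struct Sleep {
    clock: Clock,
    deadline: u64,
}

impl Future for Sleep {
    type Output = ();
    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.clock.get() >= self.deadline {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }
}

// Runs `items` loop bodies back to back, each sleeping one simulated millisecond.
async fn run_bodies(clock: Clock, items: u32) {
    for _ in 0..items {
        let deadline = clock.get() + 1;
        Sleep { clock: clock.clone(), deadline }.await;
    }
}

// Polls every task each round, dropping finished ones. When tasks remain
// pending, advances the clock by 1 ms. Returns the total simulated time.
fn simulate(clock: Clock, mut tasks: Vec<Task>) -> u64 {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        tasks.retain_mut(|task| task.as_mut().poll(&mut cx).is_pending());
        if tasks.is_empty() {
            return clock.get();
        }
        clock.set(clock.get() + 1);
    }
}

// Two streams of three items whose bodies overlap, as with join!.
fn concurrent_ms() -> u64 {
    let clock: Clock = Rc::new(Cell::new(0));
    let a: Task = Box::pin(run_bodies(clock.clone(), 3));
    let b: Task = Box::pin(run_bodies(clock.clone(), 3));
    simulate(clock, vec![a, b])
}

// The same six bodies run strictly one after another, as in a select! loop.
fn serialized_ms() -> u64 {
    let clock: Clock = Rc::new(Cell::new(0));
    let all: Task = Box::pin(run_bodies(clock.clone(), 6));
    simulate(clock, vec![all])
}
```

In this model concurrent_ms() returns 3 and serialized_ms() returns 6. The only difference between the two is whether one body's sleep is allowed to overlap with another's.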
And yet, select! in a loop gives us an appealing degree of control. Any of the bodies can
break the loop, for example, which is awkward to replicate with join!. This is what
for_streams! is about. It's like select! in a loop, but specifically for Streams, with
fewer footguns and several convenience features.
More interesting features
continue, break, and return are all supported. continue skips to the next element of
that stream, break stops reading from that stream, and return ends the whole macro (not the
calling function, similar to return in an async block). The only valid return type is ().
This example prints a2 b1 c1 a4 b2 c2 a6 c3 a8 and then exits:
for_streams!
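The comparison with return in an async block can be checked in isolation, without for_streams! at all. The sketch below uses only the standard library. block_on is a deliberately minimal executor written for this example and first_even is a hypothetical function. It shows ordinary async-block behavior, not the macro's expansion.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Waker that does nothing; enough for futures that never actually wait.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Minimal executor for this example: polls the future until it is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Hypothetical function: the async block yields Option<u32>, while the
// function itself returns a String.
fn first_even(values: &[u32]) -> String {
    let found = block_on(async {
        for &v in values {
            if v % 2 == 0 {
                // Ends the async block only; `first_even` keeps running.
                return Some(v);
            }
        }
        None
    });
    format!("block returned {found:?}, caller continued")
}
```

Because the block's return type (Option<u32>) differs from the function's (String), this would not compile if return exited first_even. Calling first_even(&[1, 3, 4, 6]) produces "block returned Some(4), caller continued".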
Sometimes you have a stream that's finite, like a channel that will eventually close, and
another stream that's infinite, like a timer that ticks forever. You can use in background
(in place of in) to tell for_streams! not to wait for some arms to finish:
use tokio::time::interval;
use tokio_stream::wrappers::IntervalStream;
let timer = new;
for_streams!
The move keyword is supported and has the same effect as it would on a closure or an async
block: the arm's block takes ownership of all the values it references. This can be
useful if you need a channel writer or a lock guard to drop promptly when one arm is done:
use tokio::sync::mpsc::channel;
use tokio_stream::wrappers::ReceiverStream;
// This is a bounded channel, so the sender will block quickly on the
// second message if the receiver isn't reading concurrently.
let = ;
let mut outputs = Vec::new();
for_streams!
assert_eq!;
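The ownership effect of move can also be seen with a plain async move block and a standard-library channel. This is a minimal sketch rather than a for_streams! example. block_on is a tiny executor written for illustration, send_then_collect is a hypothetical function, and std::sync::mpsc stands in for the Tokio channel used above.

```rust
use std::future::Future;
use std::sync::mpsc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Waker that does nothing; enough for futures that never actually wait.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Minimal executor for this example. The future is owned by this function,
// so it is dropped (along with everything it captured) when block_on returns.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Hypothetical function: sends three values, then drains the channel.
fn send_then_collect() -> Vec<u32> {
    let (sender, receiver) = mpsc::channel();
    // `move` transfers ownership of `sender` into the block, so the sender
    // is dropped as soon as the block's future is dropped.
    block_on(async move {
        for x in 1..=3 {
            sender.send(x).unwrap();
        }
    });
    // With the only sender gone the channel is closed, so this iteration ends.
    receiver.iter().collect()
}
```

Here send_then_collect() returns [1, 2, 3]. Without move, the block would only borrow sender, which would stay alive until the end of the function, and receiver.iter() would wait forever for more messages.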