for_streams/lib.rs
1//! **Deprecated:** I don't think this macro is the right approach anymore. See
2//! [`join_me_maybe!`](https://github.com/oconnor663/join_me_maybe/) instead.
3//!
4//! The `for_streams!` macro, for driving multiple async [`Stream`]s concurrently. The goal is to
5//! be more convenient and less error-prone than using `select!`-in-a-loop. The stretch goal is to
6//! make the case that most codebases should _ban_ `select!`-in-a-loop.
7//!
8//! `for_streams!` works well with [Tokio](https://tokio.rs/), but it doesn't depend on Tokio.
9//!
10//! # The simplest case
11//!
12//! Here's what it looks like to drive two streams concurrently:
13//!
14//! ```rust
15//! # use tokio::time::{sleep, Duration};
16//! # #[tokio::main]
17//! # async fn main() {
18//! use for_streams::for_streams;
19//!
20//! for_streams! {
21//! x in futures::stream::iter(1..=3) => {
22//! sleep(Duration::from_millis(1)).await;
23//! print!("{x} ");
24//! }
25//! y in futures::stream::iter(101..=103) => {
26//! sleep(Duration::from_millis(1)).await;
27//! print!("{y} ");
28//! }
29//! }
30//! # }
31//! ```
32//!
33//! That takes three milliseconds and prints `1 101 2 102 3 103`. The behavior above is similar to
34//! using [`StreamExt::for_each`][for_each] and [`futures::join!`][join] together like this:
35//!
36//! ```rust
37//! # use futures::StreamExt;
38//! # use tokio::time::{sleep, Duration};
39//! # #[tokio::main]
40//! # async fn main() {
41//! futures::join!(
42//! futures::stream::iter(1..=3).for_each(|x| async move {
43//! sleep(Duration::from_millis(1)).await;
44//! println!("{x}");
45//! }),
46//! futures::stream::iter(101..=103).for_each(|x| async move {
47//! sleep(Duration::from_millis(1)).await;
48//! println!("{x}");
49//! }),
50//! );
51//! # }
52//! ```
53//!
54//! However, importantly, using [`select!`][futures_select] in a loop does _not_ behave the same
55//! way:
56//!
57//! ```rust
58//! # use futures::StreamExt;
59//! # use tokio::time::{sleep, Duration};
60//! # #[tokio::main]
61//! # async fn main() {
62//! let mut stream1 = futures::stream::iter(1..=3).fuse();
63//! let mut stream2 = futures::stream::iter(101..=103).fuse();
64//! loop {
65//! futures::select! {
66//! x = stream1.next() => {
67//! if let Some(x) = x {
68//! sleep(Duration::from_millis(1)).await;
69//! println!("{x}");
70//! }
71//! }
72//! y = stream2.next() => {
73//! if let Some(y) = y {
74//! sleep(Duration::from_millis(1)).await;
75//! println!("{y}");
76//! }
77//! }
78//! complete => break,
79//! }
80//! }
81//! # }
82//! ```
83//!
84//! `select!`-in-a-loop takes _six_ milliseconds, not three. `select!` is
85//! [notorious][cancelling_async_rust] for cancellation footguns, but this is actually a different
86//! problem: the body of a `select!` arm doesn't run concurrently with any other arms (neither
87//! their futures nor their bodies). Using `select!`-in-a-loop is often a mistake, [occasionally a
88//! deadlock][deadlock] but frequently a silent performance bug.
89//!
90//! And yet, it does give us a lot of control. Any of the arms can `break` the loop, for example,
91//! or even `return` from the enclosing function. Replicating that sort of control flow with
92//! `join!` is awkward. This is what `for_streams!` is for. It's like `select!`-in-a-loop, but it's
93//! specifically for `Stream`s, with fewer footguns and several convenience features.
94//!
95//! # More interesting features
96//!
97//! ### Borrowing
98//!
99//! The bodies of `for_streams!` arms are free to borrow the enclosing scope. This doesn't usually
100//! work with [`for_each`][for_each], because of how its closure argument is structured (switching
101//! to [`AsyncFnMut`] closures might fix that eventually):
102//!
103//! ```rust
104//! # #[tokio::main]
105//! # async fn main() {
106//! # use for_streams::for_streams;
107//! let mut x = 0;
108//! let mut y = 0;
109//! for_streams! {
110//! _ in futures::stream::iter(0..10) => {
111//! x += 1;
112//! }
113//! _ in futures::stream::iter(0..20) => {
114//! y += 1;
115//! }
116//! }
117//! assert_eq!(x, 10);
118//! assert_eq!(y, 20);
119//! # }
120//! ```
121//!
122//! ### `continue`, `break`, and `return`
123//!
124//! `continue` skips to the next element of that stream, `break` stops reading from that stream,
125//! and `return` ends the whole macro immediately (not the calling function, similar to `return` in
126//! an `async` block). The only valid return type is `()`. This example prints `a1 b1 c1 a3 b2 c2
127//! a5 c3` and then exits:
128//!
129//! ```rust
130//! # use for_streams::for_streams;
131//! # use tokio::time::{sleep, Duration};
132//! # #[tokio::main]
133//! # async fn main() {
134//! for_streams! {
135//! a in futures::stream::iter(1..1_000_000_000) => {
136//! if a % 2 == 0 {
137//! continue; // Skip the even elements in this arm.
138//! }
139//! print!("a{a} ");
140//! sleep(Duration::from_millis(1)).await;
141//! }
142//! b in futures::stream::iter(1..1_000_000_000) => {
143//! print!("b{b} ");
144//! if b == 2 {
145//! break; // Stop this arm after two elements.
146//! }
147//! sleep(Duration::from_millis(1)).await;
148//! }
149//! c in futures::stream::iter(1..1_000_000_000) => {
150//! print!("c{c} ");
151//! if c == 3 {
152//! return; // Stop the whole loop after three elements.
153//! }
154//! sleep(Duration::from_millis(1)).await;
155//! }
156//! }
157//! # }
158//! ```
159//!
160//! TODO: Relatedly, `for_streams!` should guarantee that all the streams are polled in a
161//! round-robin fashion. Currently this example is a (nearly) infinite loop if we remove the
162//! sleeps, because control stays in the first arm until it's exhausted.
163//!
164//! ### `in background`
165//!
166//! Sometimes you have a stream that's finite, like a channel that will eventually close, and
167//! another stream that's infinite, like a timer that ticks forever. You can use `in background`
168//! (instead of `in`) to tell `for_streams!` not to wait for some arms to finish:
169//!
170//! ```rust
171//! # use for_streams::for_streams;
172//! # use tokio::time::{sleep, Duration};
173//! # #[tokio::main]
174//! # async fn main() {
175//! use tokio::time::interval;
176//! use tokio_stream::wrappers::IntervalStream;
177//!
178//! let timer = IntervalStream::new(interval(Duration::from_millis(1)));
179//! for_streams! {
180//! x in futures::stream::iter(1..10) => {
181//! sleep(Duration::from_millis(1)).await;
182//! println!("{x}");
183//! }
184//! // We'll never reach the end of this `timer` stream, but `in background`
185//! // means we'll exit when the first arm is done, instead of ticking forever.
186//! _ in background timer => {
187//! println!("tick");
188//! }
189//! }
190//! # }
191//! ```
192//!
193//! ### `move`
194//!
195//! The `move` keyword has the same effect as it would on a lambda or an `async move` block, making
196//! the block take ownership of all the values it references. This can be useful if you need a
197//! channel writer or a lock guard to drop promptly when one arm is done:
198//!
199//! ```rust
200//! # use for_streams::for_streams;
201//! # #[tokio::main]
202//! # async fn main() {
203//! use tokio::sync::mpsc::channel;
204//! use tokio_stream::wrappers::ReceiverStream;
205//!
206//! // This is a bounded channel, so the sender will block quickly on the
207//! // second message if the receiver isn't reading concurrently.
208//! let (sender, receiver) = channel::<i32>(1);
209//! let mut outputs = Vec::new();
210//! for_streams! {
211//! // The `move` keyword makes this arm take ownership of `sender`, which
212//! // means that `sender` drops as soon as this branch is finished. This
213//! // example would deadlock without it.
214//! val in tokio_stream::iter(1..=5) => move {
215//! sender.send(val).await.unwrap();
216//! }
217//! // This arm borrows `outputs` but can't take ownership of it, because
218//! // we use it again below in the assert.
219//! val in ReceiverStream::new(receiver) => {
220//! outputs.push(val);
221//! }
222//! }
223//! assert_eq!(outputs, vec![1, 2, 3, 4, 5]);
224//! # }
225//! ```
226//!
227//! # `select!` gotchas that `for_streams!` helps with
228//!
229//! Note that there are two common `select!` macros out there, the [Tokio version][tokio_select]
230//! and the [`futures` version][futures_select]. Unless noted otherwise, everything in this section
231//! applies to both.
232//!
233//! ### Concurrency
234//!
235//! We saw this in "The simplest case" above. `select!` polls a set of futures concurrently, and
236//! once one of them completes, it cancels the others and executes the body of the matching arm.
237//! That makes sense for racing futures against each other and picking a winner, but it's usually
238//! not what we want for driving multiple streams in a loop. An `.await` in one arm holds up the
239//! entire loop and stops the other streams from making progress.
240//!
241//! `for_streams!` always runs its arms concurrently. As with [`for_each`][for_each], each arm
242//! alternates between polling its stream and polling its body.
243//!
244//! TODO: `for_streams!` could support some sort of `buffered(N)` keyword, which would let us solve
245//! the [Barbara Battles Buffered Streams][barbara] problem directly?
246//!
247//! ### Cancellation
248//!
249//! The futures that come from [`StreamExt::next`] are generally ["cancel-safe"][cancel safety],
250//! but `select!` supports arbitrary futures, and this can lead to [confusing
251//! bugs][cancelling_async_rust]. `for_streams!` only works with streams, so for example you might
252//! need to use [`futures::stream::once`] to adapt a one-off future, but the upside is that by
253//! default you're not exposed to cancellation at all.
254//!
255//! `for_streams!` does support cancellation, using either `return` or the `background` keyword.
256//! The hope is that cancellations you ask for will be less confusing than ones that happen
257//! "randomly".
258//!
259//! TODO: `for_streams!` could guarantee that partially executed bodies are always allowed to
260//! complete before exiting?
261//!
262//! ### Fusing and pinning
263//!
//! We saw an example of [`futures::select!`][futures_select] in "The simplest case" above.
265//! Here's the same example using [`tokio::select!`][tokio_select] instead:
266//!
267//! ```rust
268//! # use futures::StreamExt;
269//! # use tokio::time::{Duration, sleep};
270//! # #[tokio::main]
271//! # async fn main() {
272//! let mut stream1 = futures::stream::iter(1..=3).fuse();
273//! let mut stream2 = futures::stream::iter(101..=103).fuse();
274//! loop {
275//! tokio::select! {
276//! x = stream1.next(), if !stream1.is_done() => {
277//! if let Some(x) = x {
278//! sleep(Duration::from_millis(1)).await;
279//! println!("{x}");
280//! }
281//! }
282//! y = stream2.next(), if !stream2.is_done() => {
283//! if let Some(y) = y {
284//! sleep(Duration::from_millis(1)).await;
285//! println!("{y}");
286//! }
287//! }
288//! else => break,
289//! }
290//! }
291//! # }
292//! ```
293//!
294//! Both versions use [`StreamExt::fuse`][fuse] to keep track of which streams have ended and
295//! shouldn't be polled again. Fusing is mandatory in the `futures` version, and that example won't
296//! compile without it. It's optional in the Tokio version, and in that case it also needs explicit
297//! `if` guards to avoid an infinite loop. In both versions, a common mistake is to call `.fuse()`
298//! _inside_ the loop instead of once at the top, which satisfies the type checker but doesn't work
299//! as intended. Both versions also require an explicit `complete`/`else` arm to `break` the loop,
300//! otherwise you get a panic at the end. Finally, depending on what sort of streams you're using,
//! both versions might require you to explicitly [`pin!`] them.
302//!
303//! `for_streams!` takes care of all of this for you. It usually takes ownership of the streams you
304//! give it, and all the bookkeeping and pinning is internal. However, you can also use
305//! `for_streams!` with a `&mut` reference to a stream, and if you do that you might need to
306//! `.fuse()` it and/or `pin!` it yourself.
307//!
308//! [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
309//! [tokio_select]: https://docs.rs/tokio/latest/tokio/macro.select.html
310//! [futures_select]: https://docs.rs/futures/latest/futures/macro.select.html
311//! [for_each]: https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html#method.for_each
312//! [join]: https://docs.rs/futures/latest/futures/macro.join.html
313//! [`select!`]: https://docs.rs/futures/latest/futures/macro.select.html
314//! [notorious]: https://sunshowers.io/posts/cancelling-async-rust/
315//! [deadlock]: https://rfd.shared.oxide.computer/rfd/0609
316//! [barbara]: https://without.boats/blog/poll-progress/
317//! [`StreamExt::next`]: https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html#method.next
318//! [cancel safety]: https://docs.rs/tokio/latest/tokio/macro.select.html#cancellation-safety
319//! [cancelling_async_rust]: https://sunshowers.io/posts/cancelling-async-rust/
320//! [`futures::stream::once`]: https://docs.rs/futures/latest/futures/stream/fn.once.html
321//! [fuse]: https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html#method.fuse
322//! [`pin!`]: https://doc.rust-lang.org/stable/std/pin/macro.pin.html
323//! [`AsyncFnMut`]: https://doc.rust-lang.org/std/ops/trait.AsyncFnMut.html
324
325use proc_macro2::{Span, TokenStream as TokenStream2};
326use quote::{ToTokens, format_ident, quote};
327use syn::{
328 Block, Expr, Ident, Pat, Token,
329 parse::{Parse, ParseStream},
330 parse_macro_input,
331};
332
/// Custom keywords recognized by the `for_streams!` arm parser.
mod kw {
    // `background` is parsed as a contextual keyword after `in`. The arm parser still accepts a
    // plain identifier named `background` as the stream expression when nothing parseable as an
    // expression follows it.
    syn::custom_keyword!(background);
}
336
337struct Arm {
338 pattern: Pat,
339 stream_expr: Expr,
340 body: Block,
341 is_background: bool,
342 is_move: bool,
343}
344
345impl Parse for Arm {
346 fn parse(input: ParseStream) -> syn::Result<Self> {
347 let pattern = Pat::parse_single(input)?;
348 _ = input.parse::<Token![in]>()?;
349 // Check whether we can parse a stream expression after `background`. If not, `background`
350 // itself could be the stream expression (i.e. a local variable name).
351 let fork = input.fork();
352 let is_background = fork.parse::<kw::background>().is_ok() && fork.parse::<Expr>().is_ok();
353 if is_background {
354 _ = input.parse::<kw::background>()?;
355 }
356 let stream_expr = input.parse()?;
357 _ = input.parse::<Token![=>]>()?;
358 let is_move = input.parse::<Token![move]>().is_ok();
359 let body = input.parse()?;
360 Ok(Self {
361 pattern,
362 stream_expr,
363 body,
364 is_background,
365 is_move,
366 })
367 }
368}
369
/// The fully parsed input of one `for_streams!` invocation.
struct ForStreams {
    /// The arms, in the order they were written. The expansion polls them in this order.
    arms: Vec<Arm>,
}
373
374impl Parse for ForStreams {
375 fn parse(input: ParseStream) -> syn::Result<Self> {
376 let mut arms = Vec::new();
377 while !input.is_empty() {
378 let arm = input.parse::<Arm>()?;
379 arms.push(arm);
380 }
381 Ok(Self { arms })
382 }
383}
384
385impl ToTokens for ForStreams {
386 fn to_tokens(&self, tokens: &mut TokenStream2) {
387 let mut initializers = TokenStream2::new();
388 let cancel_flag = format_ident!("cancel_flag", span = Span::mixed_site());
389 let arm_names: Vec<Ident> = (0..self.arms.len())
390 .map(|i| format_ident!("arm_{}", i, span = Span::mixed_site()))
391 .collect();
392 for i in 0..self.arms.len() {
393 let Arm {
394 pattern,
395 stream_expr,
396 body,
397 is_background: _,
398 is_move,
399 } = &self.arms[i];
400 let move_token = if *is_move {
401 quote! { move }
402 } else {
403 quote! {}
404 };
405 let returned_early = format_ident!("returned_early", span = Span::mixed_site());
406 let returned_early_ref = format_ident!("returned_early_ref", span = Span::mixed_site());
407 let stream = format_ident!("stream", span = Span::mixed_site());
408 let name = &arm_names[i];
409 initializers.extend(quote! {
410 let mut #name = ::std::pin::pin!(::futures::future::FutureExt::fuse({
411 async {
412 let mut #returned_early = true;
413 // For the `move` case, we need to explicitly take a reference to
414 // `returned_early`, so that we don't copy it.
415 let #returned_early_ref = &mut #returned_early;
416 let _: () = async #move_token {
417 let mut #stream = ::std::pin::pin!(#stream_expr);
418 while let Some(#pattern) = ::futures::stream::StreamExt::next(&mut #stream).await {
419 // NOTE: The #body may `continue`, `break`, or `return`.
420 #body
421 }
422 *#returned_early_ref = false;
423 }.await;
424 if #returned_early {
425 ::std::sync::atomic::AtomicBool::store(&#cancel_flag, true, ::std::sync::atomic::Ordering::Relaxed);
426 }
427 }
428 }));
429 });
430 }
431
432 let mut poll_calls = TokenStream2::new();
433 let foreground_finished = format_ident!("foreground_finished", span = Span::mixed_site());
434 let cx = format_ident!("cx", span = Span::mixed_site());
435 for i in 0..self.arms.len() {
436 let name = &arm_names[i];
437 poll_calls.extend(quote! {
438 // NOTE: These are fused, so we can poll them unconditionally.
439 _ = ::std::future::Future::poll(::std::pin::Pin::as_mut(&mut #name), #cx);
440 });
441 if !self.arms[i].is_background {
442 poll_calls.extend(quote! {
443 #foreground_finished &= ::futures::future::FusedFuture::is_terminated(&#name);
444 });
445 }
446 }
447
448 tokens.extend(quote! {
449 {
450 let mut #cancel_flag = ::std::sync::atomic::AtomicBool::new(false);
451 #initializers
452 ::std::future::poll_fn(|#cx| {
453 let mut #foreground_finished = true;
454 #poll_calls
455 if ::std::sync::atomic::AtomicBool::load(&#cancel_flag, ::std::sync::atomic::Ordering::Relaxed) {
456 return ::std::task::Poll::Ready(());
457 }
458 if #foreground_finished {
459 return ::std::task::Poll::Ready(());
460 }
461 ::std::task::Poll::Pending
462 }).await;
463 }
464 });
465 }
466}
467
468#[proc_macro]
469pub fn for_streams(input: proc_macro::TokenStream) -> proc_macro::TokenStream {
470 let c = parse_macro_input!(input as ForStreams);
471 quote! { #c }.into()
472}