async_macros/join_stream.rs
1use std::pin::Pin;
2use std::task::{Context, Poll};
3
4use futures_core::Stream;
5
6/// A stream joining two or more streams.
7///
8/// This stream is returned by `join!`.
9#[derive(Debug)]
10pub struct JoinStream<L, R> {
11 left: L,
12 right: R,
13}
14
15impl<L, R> Unpin for JoinStream<L, R> {}
16
17impl<L, R> JoinStream<L, R> {
18 #[doc(hidden)]
19 pub fn new(left: L, right: R) -> Self {
20 Self { left, right }
21 }
22}
23
24impl<L, R, T> Stream for JoinStream<L, R>
25where
26 L: Stream<Item = T> + Unpin,
27 R: Stream<Item = T> + Unpin,
28{
29 type Item = T;
30
31 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
32 if let Poll::Ready(Some(item)) = Pin::new(&mut self.left).poll_next(cx) {
33 // The first stream made progress. The JoinStream needs to be polled
34 // again to check the progress of the second stream.
35 cx.waker().wake_by_ref();
36 Poll::Ready(Some(item))
37 } else {
38 Pin::new(&mut self.right).poll_next(cx)
39 }
40 }
41}
42
/// Combines two or more streams into a single stream of all their outputs.
///
/// The arguments are identifiers of streams that all yield the same item
/// type. The macro only builds the combined stream by nesting `JoinStream`
/// values from left to right; it does not await anything itself. A trailing
/// comma is accepted.
///
/// # Examples
///
/// ```
/// # futures::executor::block_on(async {
/// use async_macros::join_stream as join;
/// use futures::stream::{self, StreamExt};
/// use futures::future::ready;
///
/// let a = stream::once(ready(1u8));
/// let b = stream::once(ready(2u8));
/// let c = stream::once(ready(3u8));
///
/// let mut s = join!(a, b, c);
///
/// assert_eq!(s.next().await, Some(1u8));
/// assert_eq!(s.next().await, Some(2u8));
/// assert_eq!(s.next().await, Some(3u8));
/// assert_eq!(s.next().await, None);
/// # });
/// ```
#[macro_export]
macro_rules! join_stream {
    // The separator lives inside the repetition so that exactly two streams
    // match without requiring a trailing comma.
    ($stream1:ident, $stream2:ident $(, $stream:ident)* $(,)?) => {{
        let joined = $crate::JoinStream::new($stream1, $stream2);
        $(
            // Each further stream wraps the join built so far as its left side.
            let joined = $crate::JoinStream::new(joined, $stream);
        )*
        joined
    }};
}