merge_streams/merge/
tuple.rs

1use crate::stream::IntoStream;
2use crate::utils;
3use crate::MergeStreams;
4
5use futures_core::Stream;
6use std::pin::Pin;
7use std::task::{Context, Poll};
8
9macro_rules! poll_in_order {
10    ($cx:expr, $stream:expr) => { $stream.poll_next($cx) };
11    ($cx:expr, $stream:expr, $($next:tt),*) => {{
12        let mut pending = false;
13        match $stream.poll_next($cx) {
14            Poll::Ready(Some(item)) => return Poll::Ready(Some(item)),
15            Poll::Pending => { pending = true; }
16            Poll::Ready(None) => {},
17        }
18        match poll_in_order!($cx, $($next),*) {
19            Poll::Ready(None) if pending => Poll::Pending,
20            other => other,
21        }
22    }};
23}
24
25impl<T, S0, S1> MergeStreams for (S0, S1)
26where
27    S0: IntoStream<Item = T>,
28    S1: IntoStream<Item = T>,
29{
30    type Item = T;
31    type Stream = Merge2<T, S0::IntoStream, S1::IntoStream>;
32
33    fn merge(self) -> Self::Stream {
34        Merge2::new((self.0.into_stream(), self.1.into_stream()))
35    }
36}
37
38/// A stream that merges multiple streams into a single stream.
39///
40/// This `struct` is created by the [`merge`] method on [`Stream`]. See its
41/// documentation for more.
42///
43/// [`merge`]: trait.Stream.html#method.merge
44/// [`Stream`]: trait.Stream.html
45#[derive(Debug)]
46#[pin_project::pin_project]
47pub struct Merge2<T, S0, S1>
48where
49    S0: Stream<Item = T>,
50    S1: Stream<Item = T>,
51{
52    streams: (S0, S1),
53}
54
55impl<T, S0, S1> Merge2<T, S0, S1>
56where
57    S0: Stream<Item = T>,
58    S1: Stream<Item = T>,
59{
60    pub(crate) fn new(streams: (S0, S1)) -> Self {
61        Self { streams }
62    }
63}
64
65impl<T, S0, S1> Stream for Merge2<T, S0, S1>
66where
67    S0: Stream<Item = T>,
68    S1: Stream<Item = T>,
69{
70    type Item = T;
71
72    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
73        let this = self.project();
74        // SAFETY: we're manually projecting the tuple fields here.
75        let s0 = unsafe { Pin::new_unchecked(&mut this.streams.0) };
76        let s1 = unsafe { Pin::new_unchecked(&mut this.streams.1) };
77        match utils::random(2) {
78            0 => poll_in_order!(cx, s0, s1),
79            1 => poll_in_order!(cx, s1, s0),
80            _ => unreachable!(),
81        }
82    }
83}
84
85// TODO: automate this!
86
87impl<T, S0, S1, S2> MergeStreams for (S0, S1, S2)
88where
89    S0: IntoStream<Item = T>,
90    S1: IntoStream<Item = T>,
91    S2: IntoStream<Item = T>,
92{
93    type Item = T;
94    type Stream = Merge3<T, S0::IntoStream, S1::IntoStream, S2::IntoStream>;
95
96    fn merge(self) -> Self::Stream {
97        Merge3::new((
98            self.0.into_stream(),
99            self.1.into_stream(),
100            self.2.into_stream(),
101        ))
102    }
103}
104
105/// A stream that merges multiple streams into a single stream.
106///
107/// This `struct` is created by the [`merge`] method on [`Stream`]. See its
108/// documentation for more.
109///
110/// [`merge`]: trait.Stream.html#method.merge
111/// [`Stream`]: trait.Stream.html
112#[derive(Debug)]
113#[pin_project::pin_project]
114pub struct Merge3<T, S0, S1, S2>
115where
116    S0: Stream<Item = T>,
117    S1: Stream<Item = T>,
118    S2: Stream<Item = T>,
119{
120    streams: (S0, S1, S2),
121}
122
123impl<T, S0, S1, S2> Merge3<T, S0, S1, S2>
124where
125    S0: Stream<Item = T>,
126    S1: Stream<Item = T>,
127    S2: Stream<Item = T>,
128{
129    pub(crate) fn new(streams: (S0, S1, S2)) -> Self {
130        Self { streams }
131    }
132}
133
134impl<T, S0, S1, S2> Stream for Merge3<T, S0, S1, S2>
135where
136    S0: Stream<Item = T>,
137    S1: Stream<Item = T>,
138    S2: Stream<Item = T>,
139{
140    type Item = T;
141
142    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
143        let this = self.project();
144        // SAFETY: we're manually projecting the tuple fields here.
145        let s0 = unsafe { Pin::new_unchecked(&mut this.streams.0) };
146        let s1 = unsafe { Pin::new_unchecked(&mut this.streams.1) };
147        let s2 = unsafe { Pin::new_unchecked(&mut this.streams.2) };
148        match utils::random(6) {
149            0 => poll_in_order!(cx, s0, s1, s2),
150            1 => poll_in_order!(cx, s0, s2, s1),
151            2 => poll_in_order!(cx, s1, s0, s2),
152            3 => poll_in_order!(cx, s1, s2, s0),
153            4 => poll_in_order!(cx, s2, s0, s1),
154            5 => poll_in_order!(cx, s2, s1, s0),
155            _ => unreachable!(),
156        }
157    }
158}
159impl<T, S0, S1, S2, S3> MergeStreams for (S0, S1, S2, S3)
160where
161    S0: IntoStream<Item = T>,
162    S1: IntoStream<Item = T>,
163    S2: IntoStream<Item = T>,
164    S3: IntoStream<Item = T>,
165{
166    type Item = T;
167    type Stream = Merge4<T, S0::IntoStream, S1::IntoStream, S2::IntoStream, S3::IntoStream>;
168
169    fn merge(self) -> Self::Stream {
170        Merge4::new((
171            self.0.into_stream(),
172            self.1.into_stream(),
173            self.2.into_stream(),
174            self.3.into_stream(),
175        ))
176    }
177}
178
179/// A stream that merges multiple streams into a single stream.
180///
181/// This `struct` is created by the [`merge`] method on [`Stream`]. See its
182/// documentation for more.
183///
184/// [`merge`]: trait.Stream.html#method.merge
185/// [`Stream`]: trait.Stream.html
186#[derive(Debug)]
187#[pin_project::pin_project]
188pub struct Merge4<T, S0, S1, S2, S3>
189where
190    S0: Stream<Item = T>,
191    S1: Stream<Item = T>,
192    S2: Stream<Item = T>,
193    S3: Stream<Item = T>,
194{
195    streams: (S0, S1, S2, S3),
196}
197
198impl<T, S0, S1, S2, S3> Merge4<T, S0, S1, S2, S3>
199where
200    S0: Stream<Item = T>,
201    S1: Stream<Item = T>,
202    S2: Stream<Item = T>,
203    S3: Stream<Item = T>,
204{
205    pub(crate) fn new(streams: (S0, S1, S2, S3)) -> Self {
206        Self { streams }
207    }
208}
209
210impl<T, S0, S1, S2, S3> Stream for Merge4<T, S0, S1, S2, S3>
211where
212    S0: Stream<Item = T>,
213    S1: Stream<Item = T>,
214    S2: Stream<Item = T>,
215    S3: Stream<Item = T>,
216{
217    type Item = T;
218
219    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
220        let this = self.project();
221        // SAFETY: we're manually projecting the tuple fields here.
222        let s0 = unsafe { Pin::new_unchecked(&mut this.streams.0) };
223        let s1 = unsafe { Pin::new_unchecked(&mut this.streams.1) };
224        let s2 = unsafe { Pin::new_unchecked(&mut this.streams.2) };
225        let s3 = unsafe { Pin::new_unchecked(&mut this.streams.3) };
226        match utils::random(24) {
227            // s0 first
228            0 => poll_in_order!(cx, s0, s1, s2, s3),
229            1 => poll_in_order!(cx, s0, s1, s3, s2),
230            2 => poll_in_order!(cx, s0, s2, s1, s3),
231            3 => poll_in_order!(cx, s0, s2, s3, s1),
232            4 => poll_in_order!(cx, s0, s3, s1, s2),
233            5 => poll_in_order!(cx, s0, s3, s2, s1),
234            // s1 first
235            6 => poll_in_order!(cx, s1, s0, s2, s3),
236            7 => poll_in_order!(cx, s1, s0, s3, s2),
237            8 => poll_in_order!(cx, s1, s2, s0, s3),
238            9 => poll_in_order!(cx, s1, s2, s3, s0),
239            10 => poll_in_order!(cx, s1, s3, s0, s2),
240            11 => poll_in_order!(cx, s1, s3, s2, s0),
241            // s2 first
242            12 => poll_in_order!(cx, s2, s0, s1, s3),
243            13 => poll_in_order!(cx, s2, s0, s3, s1),
244            14 => poll_in_order!(cx, s2, s1, s0, s3),
245            15 => poll_in_order!(cx, s2, s1, s3, s0),
246            16 => poll_in_order!(cx, s2, s3, s0, s1),
247            17 => poll_in_order!(cx, s2, s3, s1, s0),
248            // s3 first
249            18 => poll_in_order!(cx, s3, s0, s1, s2),
250            19 => poll_in_order!(cx, s3, s0, s2, s1),
251            20 => poll_in_order!(cx, s3, s1, s0, s2),
252            21 => poll_in_order!(cx, s3, s1, s2, s0),
253            22 => poll_in_order!(cx, s3, s2, s0, s1),
254            23 => poll_in_order!(cx, s3, s2, s1, s0),
255            _ => unreachable!(),
256        }
257    }
258}
259
260#[cfg(test)]
261mod tests {
262    use super::*;
263
264    #[test]
265    fn merge_tuple_4() {
266        use futures_lite::future::block_on;
267        use futures_lite::{stream, StreamExt};
268
269        block_on(async {
270            let a = stream::once(1);
271            let b = stream::once(2);
272            let c = stream::once(3);
273            let d = stream::once(4);
274            let s = (a, b, c, d).merge();
275
276            let mut counter = 0;
277            s.for_each(|n| counter += n).await;
278            assert_eq!(counter, 10);
279        })
280    }
281}