1use crate::stream::IntoStream;
2use crate::utils;
3use crate::MergeStreams;
4
5use futures_core::Stream;
6use std::pin::Pin;
7use std::task::{Context, Poll};
8
/// Polls the given streams one after another, in the order they are listed.
///
/// * If any stream yields an item, the *enclosing function* returns
///   `Poll::Ready(Some(item))` immediately (note the `return`).
/// * Otherwise the macro evaluates to `Poll::Pending` if at least one stream
///   was pending, and to `Poll::Ready(None)` only if every stream reported
///   `Poll::Ready(None)`.
///
/// Every `$stream` must expose a by-value `poll_next($cx)` call that evaluates
/// to a `Poll<Option<_>>`. Streams after the first are matched as single token
/// trees, so they have to be plain identifiers.
macro_rules! poll_in_order {
    // Base case: the last stream's result is passed through untouched.
    ($cx:expr, $stream:expr) => {
        $stream.poll_next($cx)
    };
    // Recursive case: poll the head, then fold in the result of the tail.
    ($cx:expr, $stream:expr, $($next:tt),*) => {{
        let head_pending = match $stream.poll_next($cx) {
            Poll::Ready(Some(item)) => return Poll::Ready(Some(item)),
            Poll::Ready(None) => false,
            Poll::Pending => true,
        };
        let rest = poll_in_order!($cx, $($next),*);
        // The tail being exhausted does not end the merge while the head may
        // still produce items later.
        if head_pending && matches!(rest, Poll::Ready(None)) {
            Poll::Pending
        } else {
            rest
        }
    }};
}
24
25impl<T, S0, S1> MergeStreams for (S0, S1)
26where
27 S0: IntoStream<Item = T>,
28 S1: IntoStream<Item = T>,
29{
30 type Item = T;
31 type Stream = Merge2<T, S0::IntoStream, S1::IntoStream>;
32
33 fn merge(self) -> Self::Stream {
34 Merge2::new((self.0.into_stream(), self.1.into_stream()))
35 }
36}
37
38#[derive(Debug)]
46#[pin_project::pin_project]
47pub struct Merge2<T, S0, S1>
48where
49 S0: Stream<Item = T>,
50 S1: Stream<Item = T>,
51{
52 streams: (S0, S1),
53}
54
55impl<T, S0, S1> Merge2<T, S0, S1>
56where
57 S0: Stream<Item = T>,
58 S1: Stream<Item = T>,
59{
60 pub(crate) fn new(streams: (S0, S1)) -> Self {
61 Self { streams }
62 }
63}
64
65impl<T, S0, S1> Stream for Merge2<T, S0, S1>
66where
67 S0: Stream<Item = T>,
68 S1: Stream<Item = T>,
69{
70 type Item = T;
71
72 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
73 let this = self.project();
74 let s0 = unsafe { Pin::new_unchecked(&mut this.streams.0) };
76 let s1 = unsafe { Pin::new_unchecked(&mut this.streams.1) };
77 match utils::random(2) {
78 0 => poll_in_order!(cx, s0, s1),
79 1 => poll_in_order!(cx, s1, s0),
80 _ => unreachable!(),
81 }
82 }
83}
84
85impl<T, S0, S1, S2> MergeStreams for (S0, S1, S2)
88where
89 S0: IntoStream<Item = T>,
90 S1: IntoStream<Item = T>,
91 S2: IntoStream<Item = T>,
92{
93 type Item = T;
94 type Stream = Merge3<T, S0::IntoStream, S1::IntoStream, S2::IntoStream>;
95
96 fn merge(self) -> Self::Stream {
97 Merge3::new((
98 self.0.into_stream(),
99 self.1.into_stream(),
100 self.2.into_stream(),
101 ))
102 }
103}
104
105#[derive(Debug)]
113#[pin_project::pin_project]
114pub struct Merge3<T, S0, S1, S2>
115where
116 S0: Stream<Item = T>,
117 S1: Stream<Item = T>,
118 S2: Stream<Item = T>,
119{
120 streams: (S0, S1, S2),
121}
122
123impl<T, S0, S1, S2> Merge3<T, S0, S1, S2>
124where
125 S0: Stream<Item = T>,
126 S1: Stream<Item = T>,
127 S2: Stream<Item = T>,
128{
129 pub(crate) fn new(streams: (S0, S1, S2)) -> Self {
130 Self { streams }
131 }
132}
133
134impl<T, S0, S1, S2> Stream for Merge3<T, S0, S1, S2>
135where
136 S0: Stream<Item = T>,
137 S1: Stream<Item = T>,
138 S2: Stream<Item = T>,
139{
140 type Item = T;
141
142 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
143 let this = self.project();
144 let s0 = unsafe { Pin::new_unchecked(&mut this.streams.0) };
146 let s1 = unsafe { Pin::new_unchecked(&mut this.streams.1) };
147 let s2 = unsafe { Pin::new_unchecked(&mut this.streams.2) };
148 match utils::random(6) {
149 0 => poll_in_order!(cx, s0, s1, s2),
150 1 => poll_in_order!(cx, s0, s2, s1),
151 2 => poll_in_order!(cx, s1, s0, s2),
152 3 => poll_in_order!(cx, s1, s2, s0),
153 4 => poll_in_order!(cx, s2, s0, s1),
154 5 => poll_in_order!(cx, s2, s1, s0),
155 _ => unreachable!(),
156 }
157 }
158}
159impl<T, S0, S1, S2, S3> MergeStreams for (S0, S1, S2, S3)
160where
161 S0: IntoStream<Item = T>,
162 S1: IntoStream<Item = T>,
163 S2: IntoStream<Item = T>,
164 S3: IntoStream<Item = T>,
165{
166 type Item = T;
167 type Stream = Merge4<T, S0::IntoStream, S1::IntoStream, S2::IntoStream, S3::IntoStream>;
168
169 fn merge(self) -> Self::Stream {
170 Merge4::new((
171 self.0.into_stream(),
172 self.1.into_stream(),
173 self.2.into_stream(),
174 self.3.into_stream(),
175 ))
176 }
177}
178
179#[derive(Debug)]
187#[pin_project::pin_project]
188pub struct Merge4<T, S0, S1, S2, S3>
189where
190 S0: Stream<Item = T>,
191 S1: Stream<Item = T>,
192 S2: Stream<Item = T>,
193 S3: Stream<Item = T>,
194{
195 streams: (S0, S1, S2, S3),
196}
197
198impl<T, S0, S1, S2, S3> Merge4<T, S0, S1, S2, S3>
199where
200 S0: Stream<Item = T>,
201 S1: Stream<Item = T>,
202 S2: Stream<Item = T>,
203 S3: Stream<Item = T>,
204{
205 pub(crate) fn new(streams: (S0, S1, S2, S3)) -> Self {
206 Self { streams }
207 }
208}
209
210impl<T, S0, S1, S2, S3> Stream for Merge4<T, S0, S1, S2, S3>
211where
212 S0: Stream<Item = T>,
213 S1: Stream<Item = T>,
214 S2: Stream<Item = T>,
215 S3: Stream<Item = T>,
216{
217 type Item = T;
218
219 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
220 let this = self.project();
221 let s0 = unsafe { Pin::new_unchecked(&mut this.streams.0) };
223 let s1 = unsafe { Pin::new_unchecked(&mut this.streams.1) };
224 let s2 = unsafe { Pin::new_unchecked(&mut this.streams.2) };
225 let s3 = unsafe { Pin::new_unchecked(&mut this.streams.3) };
226 match utils::random(24) {
227 0 => poll_in_order!(cx, s0, s1, s2, s3),
229 1 => poll_in_order!(cx, s0, s1, s3, s2),
230 2 => poll_in_order!(cx, s0, s2, s1, s3),
231 3 => poll_in_order!(cx, s0, s2, s3, s1),
232 4 => poll_in_order!(cx, s0, s3, s1, s2),
233 5 => poll_in_order!(cx, s0, s3, s2, s1),
234 6 => poll_in_order!(cx, s1, s0, s2, s3),
236 7 => poll_in_order!(cx, s1, s0, s3, s2),
237 8 => poll_in_order!(cx, s1, s2, s0, s3),
238 9 => poll_in_order!(cx, s1, s2, s3, s0),
239 10 => poll_in_order!(cx, s1, s3, s0, s2),
240 11 => poll_in_order!(cx, s1, s3, s2, s0),
241 12 => poll_in_order!(cx, s2, s0, s1, s3),
243 13 => poll_in_order!(cx, s2, s0, s3, s1),
244 14 => poll_in_order!(cx, s2, s1, s0, s3),
245 15 => poll_in_order!(cx, s2, s1, s3, s0),
246 16 => poll_in_order!(cx, s2, s3, s0, s1),
247 17 => poll_in_order!(cx, s2, s3, s1, s0),
248 18 => poll_in_order!(cx, s3, s0, s1, s2),
250 19 => poll_in_order!(cx, s3, s0, s2, s1),
251 20 => poll_in_order!(cx, s3, s1, s0, s2),
252 21 => poll_in_order!(cx, s3, s1, s2, s0),
253 22 => poll_in_order!(cx, s3, s2, s0, s1),
254 23 => poll_in_order!(cx, s3, s2, s1, s0),
255 _ => unreachable!(),
256 }
257 }
258}
259
#[cfg(test)]
mod tests {
    use super::*;

    /// Merging four single-item streams (1, 2, 3, 4) must yield items whose
    /// sum is 10, whatever order they arrive in.
    #[test]
    fn merge_tuple_4() {
        use futures_lite::future::block_on;
        use futures_lite::{stream, StreamExt};

        let total = block_on(async {
            let merged = (
                stream::once(1),
                stream::once(2),
                stream::once(3),
                stream::once(4),
            )
                .merge();

            let mut total = 0;
            merged.for_each(|item| total += item).await;
            total
        });
        assert_eq!(total, 10);
    }
}