futures_concurrency/stream/chain/
tuple.rs1use core::fmt;
2use core::pin::Pin;
3use core::task::{Context, Poll};
4
5use futures_core::Stream;
6
7use super::Chain;
8
/// Generates a chained-stream type and the matching `Chain` impl for one tuple arity.
///
/// Invocation shape: `impl_chain_for_tuple! { mod_name StructName A B C ... }` where
///
/// - `mod_name` names a private helper module that holds one `usize` index constant per
///   tuple element, plus the element count `LEN`;
/// - `StructName` names the generated stream struct;
/// - the remaining identifiers are used as the generic parameter names, as the struct
///   field names, and as the names of the index constants.
macro_rules! impl_chain_for_tuple {
    ($mod_name: ident $StructName:ident $($F:ident)+) => {
        mod $mod_name {
            // A field-less enum is used purely to number the tuple elements 0, 1, 2, ...
            // in declaration order.
            #[repr(usize)]
            enum Indexes {
                $($F,)+
            }

            // One index constant per tuple element, matched against in `poll_next`.
            $(
                pub(super) const $F: usize = Indexes::$F as usize;
            )+

            // Number of tuple elements; an index equal to `LEN` means every stream is done.
            pub(super) const LEN: usize = [$(Indexes::$F,)+].len();
        }

        /// A stream that yields every item of each inner stream, one inner stream after
        /// another, in tuple order.
        ///
        /// Values of this type are produced by calling `chain` (from the `Chain` trait)
        /// on a tuple of streams that share the same `Item` type.
        #[pin_project::pin_project]
        pub struct $StructName<$($F,)+> {
            // Position of the inner stream that is currently being drained.
            index: usize,
            // Set once `Poll::Ready(None)` has been returned from `poll_next`.
            done: bool,
            $( #[pin] $F: $F,)+
        }

        impl<T, $($F,)+> Stream for $StructName<$($F,)+>
        where
            $($F: Stream<Item = T>,)+
        {
            type Item = T;

            /// Polls the current inner stream, moving on to the next one each time an
            /// inner stream reports that it is exhausted.
            ///
            /// Returns `Poll::Ready(None)` once the last inner stream is exhausted.
            ///
            /// # Panics
            ///
            /// Panics if called again after it has returned `Poll::Ready(None)`.
            fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
                let mut this = self.project();

                assert!(!*this.done, "Stream should not be polled after completion");

                loop {
                    if *this.index == $mod_name::LEN {
                        *this.done = true;
                        return Poll::Ready(None);
                    }

                    match *this.index {
                        $(
                            $mod_name::$F => {
                                // The projection already hands out `Pin<&mut $F>`, so a
                                // safe reborrow is all that is needed to poll it.
                                match this.$F.as_mut().poll_next(cx) {
                                    Poll::Ready(None) => {
                                        // This inner stream is finished: advance and poll
                                        // the next one within the same call.
                                        *this.index += 1;
                                        continue;
                                    }
                                    v @ (Poll::Pending | Poll::Ready(Some(_))) => return v,
                                }
                            },
                        )+
                        // `index` starts at 0, only ever grows by one, and the `LEN`
                        // case is handled before this match.
                        _ => unreachable!(),
                    }
                }
            }
        }

        impl<$($F,)+> fmt::Debug for $StructName<$($F,)+>
        where
            $($F: fmt::Debug,)+
        {
            /// Formats the inner streams as a tuple struct named `Chain`.
            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
                f.debug_tuple("Chain")
                    $(.field(&self.$F))+
                    .finish()
            }
        }

        impl<T, $($F,)+> Chain for ($($F,)+)
        where
            $($F: Stream<Item = T>,)+
        {
            type Item = T;

            type Stream = $StructName<$($F,)+>;

            /// Consumes the tuple and returns a stream positioned at its first element.
            fn chain(self) -> Self::Stream {
                // Each tuple element is bound to a local named after its type parameter
                // so that field-init shorthand can be used below.
                let ($($F,)*): ($($F,)*) = self;
                Self::Stream {
                    done: false,
                    index: 0,
                    $($F,)+
                }
            }
        }
    }
}
97
98impl_chain_for_tuple! { chain_1 Chain1 A }
99impl_chain_for_tuple! { chain_2 Chain2 A B }
100impl_chain_for_tuple! { chain_3 Chain3 A B C }
101impl_chain_for_tuple! { chain_4 Chain4 A B C D }
102impl_chain_for_tuple! { chain_5 Chain5 A B C D E }
103impl_chain_for_tuple! { chain_6 Chain6 A B C D E F }
104impl_chain_for_tuple! { chain_7 Chain7 A B C D E F G }
105impl_chain_for_tuple! { chain_8 Chain8 A B C D E F G H }
106impl_chain_for_tuple! { chain_9 Chain9 A B C D E F G H I }
107impl_chain_for_tuple! { chain_10 Chain10 A B C D E F G H I J }
108impl_chain_for_tuple! { chain_11 Chain11 A B C D E F G H I J K }
109impl_chain_for_tuple! { chain_12 Chain12 A B C D E F G H I J K L }
110
#[cfg(test)]
mod tests {
    use super::*;

    use futures_lite::future::block_on;
    use futures_lite::prelude::*;
    use futures_lite::stream;

    /// Chaining three single-item streams yields the items in tuple order, then `None`.
    #[test]
    fn chain_3() {
        block_on(async {
            let first = stream::once(1);
            let second = stream::once(2);
            let third = stream::once(3);
            let mut chained = (first, second, third).chain();

            // One entry per expected `next()` result, including the terminating `None`.
            for expected in [Some(1), Some(2), Some(3), None] {
                assert_eq!(chained.next().await, expected);
            }
        })
    }
}
133}