futures_concurrency/stream/chain/
tuple.rs

1use core::fmt;
2use core::pin::Pin;
3use core::task::{Context, Poll};
4
5use futures_core::Stream;
6
7use super::Chain;
8
/// Generates a chained-stream struct for one tuple arity, together with its
/// `Stream`, `Debug` and `Chain` implementations.
///
/// Arguments, in order:
/// - `$mod_name`: name of a private helper module that holds one index
///   constant per tuple element plus the tuple length (`LEN`).
/// - `$StructName`: name of the generated stream struct.
/// - `$F ...`: one identifier per tuple element. Each identifier is used as a
///   generic type parameter, as the name of the field storing that stream, and
///   as the name of its index constant inside `$mod_name`.
macro_rules! impl_chain_for_tuple {
    ($mod_name: ident $StructName:ident $($F:ident)+) => {
        mod $mod_name {
            // Fieldless enum whose discriminants number the tuple elements
            // 0, 1, 2, ... in declaration order.
            #[repr(usize)]
            enum Indexes {
                $($F,)+
            }

            // Position of each element, usable as a pattern when matching on
            // the current index.
            $(
                pub(super) const $F: usize = Indexes::$F as usize;
            )+

            // Number of streams in the tuple.
            pub(super) const LEN: usize = [$(Indexes::$F,)+].len();
        }

        /// Stream that yields the items of each wrapped stream in tuple order,
        /// moving on to the next stream once the current one returns `None`.
        #[pin_project::pin_project]
        pub struct $StructName<$($F,)+> {
            // Position of the stream currently being polled.
            index: usize,
            // Set once the chain itself has returned `None`.
            done: bool,
            $( #[pin] $F: $F,)+
        }

        impl<T, $($F,)+> Stream for $StructName<$($F,)+>
        where
            $($F: Stream<Item = T>,)+
        {
            type Item = T;

            /// Polls the current stream, advancing to the next one each time a
            /// stream finishes.
            ///
            /// # Panics
            ///
            /// Panics if polled again after it has returned `Poll::Ready(None)`.
            fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
                let mut this = self.project();

                assert!(!*this.done, "Stream should not be polled after completion");

                while *this.index < $mod_name::LEN {
                    match *this.index {
                        $(
                            // The projected field is already a `Pin<&mut _>`,
                            // so it can be re-borrowed safely with `as_mut`;
                            // no `unsafe` re-pinning is required.
                            $mod_name::$F => match this.$F.as_mut().poll_next(cx) {
                                // Current stream is exhausted: fall through to
                                // the next one on the following iteration.
                                Poll::Ready(None) => *this.index += 1,
                                // `Pending` or an item: hand it to the caller.
                                other => return other,
                            },
                        )+
                        // `index` only ever grows by one and the loop stops at
                        // `LEN`, so every reachable value has an arm above.
                        _ => unreachable!(),
                    }
                }

                // Every stream has finished; remember that so a later poll
                // trips the assertion above.
                *this.done = true;
                Poll::Ready(None)
            }
        }

        impl<$($F,)+> fmt::Debug for $StructName<$($F,)+>
        where
            $($F: fmt::Debug,)+
        {
            /// Formats the chain as a `Chain(..)` tuple of its inner streams.
            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
                f.debug_tuple("Chain")
                    $(.field(&self.$F))+
                    .finish()
            }
        }

        impl<T, $($F,)+> Chain for ($($F,)+)
        where
            $($F: Stream<Item = T>,)+
        {
            type Item = T;

            type Stream = $StructName<$($F,)+>;

            /// Moves the tuple's streams into a new chain positioned at the
            /// first stream.
            fn chain(self) -> Self::Stream {
                // Destructure into bindings named after the fields so the
                // struct literal can use field-init shorthand.
                let ($($F,)+): ($($F,)+) = self;
                Self::Stream {
                    index: 0,
                    done: false,
                    $($F,)+
                }
            }
        }
    }
}
97
98impl_chain_for_tuple! { chain_1 Chain1 A }
99impl_chain_for_tuple! { chain_2 Chain2 A B }
100impl_chain_for_tuple! { chain_3 Chain3 A B C }
101impl_chain_for_tuple! { chain_4 Chain4 A B C D }
102impl_chain_for_tuple! { chain_5 Chain5 A B C D E }
103impl_chain_for_tuple! { chain_6 Chain6 A B C D E F }
104impl_chain_for_tuple! { chain_7 Chain7 A B C D E F G }
105impl_chain_for_tuple! { chain_8 Chain8 A B C D E F G H }
106impl_chain_for_tuple! { chain_9 Chain9 A B C D E F G H I }
107impl_chain_for_tuple! { chain_10 Chain10 A B C D E F G H I J }
108impl_chain_for_tuple! { chain_11 Chain11 A B C D E F G H I J K }
109impl_chain_for_tuple! { chain_12 Chain12 A B C D E F G H I J K L }
110
#[cfg(test)]
mod tests {
    use super::*;

    use futures_lite::future::block_on;
    use futures_lite::prelude::*;
    use futures_lite::stream;

    /// Chaining three single-item streams yields their items in tuple order,
    /// followed by `None`.
    #[test]
    fn chain_3() {
        block_on(async {
            let mut chained = (stream::once(1), stream::once(2), stream::once(3)).chain();

            // Walk the full expected output, including the terminating `None`.
            for expected in [Some(1), Some(2), Some(3), None] {
                assert_eq!(chained.next().await, expected);
            }
        })
    }
}