futures_concurrency/stream/chain/
array.rs

1use core::fmt;
2use core::pin::Pin;
3use core::task::{Context, Poll};
4
5use futures_core::Stream;
6use pin_project::pin_project;
7
8use crate::utils;
9
10use super::Chain as ChainTrait;
11
12/// A stream that chains multiple streams one after another.
13///
14/// This `struct` is created by the [`chain`] method on the [`Chain`] trait. See its
15/// documentation for more.
16///
17/// [`chain`]: trait.Chain.html#method.merge
18/// [`Chain`]: trait.Chain.html
19#[pin_project]
20pub struct Chain<S, const N: usize> {
21    #[pin]
22    streams: [S; N],
23    index: usize,
24    len: usize,
25    done: bool,
26}
27
28impl<S: Stream, const N: usize> Stream for Chain<S, N> {
29    type Item = S::Item;
30
31    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
32        let mut this = self.project();
33
34        assert!(!*this.done, "Stream should not be polled after completion");
35
36        loop {
37            if this.index == this.len {
38                *this.done = true;
39                return Poll::Ready(None);
40            }
41            let stream = utils::iter_pin_mut(this.streams.as_mut())
42                .nth(*this.index)
43                .unwrap();
44            match stream.poll_next(cx) {
45                Poll::Ready(Some(item)) => return Poll::Ready(Some(item)),
46                Poll::Ready(None) => {
47                    *this.index += 1;
48                    continue;
49                }
50                Poll::Pending => return Poll::Pending,
51            }
52        }
53    }
54}
55
56impl<S, const N: usize> fmt::Debug for Chain<S, N>
57where
58    S: Stream + fmt::Debug,
59{
60    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
61        f.debug_list().entries(self.streams.iter()).finish()
62    }
63}
64
65impl<S: Stream, const N: usize> ChainTrait for [S; N] {
66    type Item = S::Item;
67
68    type Stream = Chain<S, N>;
69
70    fn chain(self) -> Self::Stream {
71        Chain {
72            len: self.len(),
73            streams: self,
74            index: 0,
75            done: false,
76        }
77    }
78}
79
80#[cfg(test)]
81mod tests {
82    use super::*;
83    use futures_lite::future::block_on;
84    use futures_lite::prelude::*;
85    use futures_lite::stream;
86
87    #[test]
88    fn chain_3() {
89        block_on(async {
90            let a = stream::once(1);
91            let b = stream::once(2);
92            let c = stream::once(3);
93            let mut s = [a, b, c].chain();
94
95            assert_eq!(s.next().await, Some(1));
96            assert_eq!(s.next().await, Some(2));
97            assert_eq!(s.next().await, Some(3));
98            assert_eq!(s.next().await, None);
99        })
100    }
101}