futures_concurrency/stream/chain/
vec.rs

1#[cfg(all(feature = "alloc", not(feature = "std")))]
2use alloc::vec::Vec;
3
4use core::fmt;
5use core::pin::Pin;
6use core::task::{Context, Poll};
7
8use futures_core::Stream;
9use pin_project::pin_project;
10
11use crate::utils;
12
13use super::Chain as ChainTrait;
14
15/// A stream that chains multiple streams one after another.
16///
17/// This `struct` is created by the [`chain`] method on the [`Chain`] trait. See its
18/// documentation for more.
19///
20/// [`chain`]: trait.Chain.html#method.merge
21/// [`Chain`]: trait.Chain.html
22#[pin_project]
23pub struct Chain<S> {
24    #[pin]
25    streams: Vec<S>,
26    index: usize,
27    len: usize,
28    done: bool,
29}
30
31impl<S: Stream> Stream for Chain<S> {
32    type Item = S::Item;
33
34    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
35        let mut this = self.project();
36
37        assert!(!*this.done, "Stream should not be polled after completion");
38
39        loop {
40            if this.index == this.len {
41                *this.done = true;
42                return Poll::Ready(None);
43            }
44            let stream = utils::iter_pin_mut_vec(this.streams.as_mut())
45                .nth(*this.index)
46                .unwrap();
47            match stream.poll_next(cx) {
48                Poll::Ready(Some(item)) => return Poll::Ready(Some(item)),
49                Poll::Ready(None) => {
50                    *this.index += 1;
51                    continue;
52                }
53                Poll::Pending => return Poll::Pending,
54            }
55        }
56    }
57}
58
59impl<S> fmt::Debug for Chain<S>
60where
61    S: Stream + fmt::Debug,
62{
63    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
64        f.debug_list().entries(self.streams.iter()).finish()
65    }
66}
67
68impl<S: Stream> ChainTrait for Vec<S> {
69    type Item = S::Item;
70
71    type Stream = Chain<S>;
72
73    fn chain(self) -> Self::Stream {
74        Chain {
75            len: self.len(),
76            streams: self,
77            index: 0,
78            done: false,
79        }
80    }
81}
82
#[cfg(test)]
mod tests {
    use super::*;
    use alloc::vec;
    use futures_lite::future::block_on;
    use futures_lite::prelude::*;
    use futures_lite::stream;

    /// Chaining three single-item streams yields their items in vector order
    /// and then ends.
    #[test]
    fn chain_3() {
        block_on(async {
            let mut chained = vec![stream::once(1), stream::once(2), stream::once(3)].chain();

            for expected in [Some(1), Some(2), Some(3), None] {
                assert_eq!(chained.next().await, expected);
            }
        })
    }
}