futures_concurrency/stream/chain/
vec.rs1#[cfg(all(feature = "alloc", not(feature = "std")))]
2use alloc::vec::Vec;
3
4use core::fmt;
5use core::pin::Pin;
6use core::task::{Context, Poll};
7
8use futures_core::Stream;
9use pin_project::pin_project;
10
11use crate::utils;
12
13use super::Chain as ChainTrait;
14
15#[pin_project]
23pub struct Chain<S> {
24 #[pin]
25 streams: Vec<S>,
26 index: usize,
27 len: usize,
28 done: bool,
29}
30
31impl<S: Stream> Stream for Chain<S> {
32 type Item = S::Item;
33
34 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
35 let mut this = self.project();
36
37 assert!(!*this.done, "Stream should not be polled after completion");
38
39 loop {
40 if this.index == this.len {
41 *this.done = true;
42 return Poll::Ready(None);
43 }
44 let stream = utils::iter_pin_mut_vec(this.streams.as_mut())
45 .nth(*this.index)
46 .unwrap();
47 match stream.poll_next(cx) {
48 Poll::Ready(Some(item)) => return Poll::Ready(Some(item)),
49 Poll::Ready(None) => {
50 *this.index += 1;
51 continue;
52 }
53 Poll::Pending => return Poll::Pending,
54 }
55 }
56 }
57}
58
59impl<S> fmt::Debug for Chain<S>
60where
61 S: Stream + fmt::Debug,
62{
63 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
64 f.debug_list().entries(self.streams.iter()).finish()
65 }
66}
67
68impl<S: Stream> ChainTrait for Vec<S> {
69 type Item = S::Item;
70
71 type Stream = Chain<S>;
72
73 fn chain(self) -> Self::Stream {
74 Chain {
75 len: self.len(),
76 streams: self,
77 index: 0,
78 done: false,
79 }
80 }
81}
82
#[cfg(test)]
mod tests {
    use super::*;
    use alloc::vec;
    use futures_lite::future::block_on;
    use futures_lite::prelude::*;
    use futures_lite::stream;

    /// Chaining three single-item streams yields their items in order and
    /// then `None`.
    #[test]
    fn chain_3() {
        block_on(async {
            let mut chained = vec![stream::once(1), stream::once(2), stream::once(3)].chain();

            for expected in [Some(1), Some(2), Some(3), None] {
                assert_eq!(chained.next().await, expected);
            }
        })
    }
}