futures_concurrency/stream/chain/
array.rs1use core::fmt;
2use core::pin::Pin;
3use core::task::{Context, Poll};
4
5use futures_core::Stream;
6use pin_project::pin_project;
7
8use crate::utils;
9
10use super::Chain as ChainTrait;
11
12#[pin_project]
20pub struct Chain<S, const N: usize> {
21 #[pin]
22 streams: [S; N],
23 index: usize,
24 len: usize,
25 done: bool,
26}
27
28impl<S: Stream, const N: usize> Stream for Chain<S, N> {
29 type Item = S::Item;
30
31 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
32 let mut this = self.project();
33
34 assert!(!*this.done, "Stream should not be polled after completion");
35
36 loop {
37 if this.index == this.len {
38 *this.done = true;
39 return Poll::Ready(None);
40 }
41 let stream = utils::iter_pin_mut(this.streams.as_mut())
42 .nth(*this.index)
43 .unwrap();
44 match stream.poll_next(cx) {
45 Poll::Ready(Some(item)) => return Poll::Ready(Some(item)),
46 Poll::Ready(None) => {
47 *this.index += 1;
48 continue;
49 }
50 Poll::Pending => return Poll::Pending,
51 }
52 }
53 }
54}
55
56impl<S, const N: usize> fmt::Debug for Chain<S, N>
57where
58 S: Stream + fmt::Debug,
59{
60 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
61 f.debug_list().entries(self.streams.iter()).finish()
62 }
63}
64
65impl<S: Stream, const N: usize> ChainTrait for [S; N] {
66 type Item = S::Item;
67
68 type Stream = Chain<S, N>;
69
70 fn chain(self) -> Self::Stream {
71 Chain {
72 len: self.len(),
73 streams: self,
74 index: 0,
75 done: false,
76 }
77 }
78}
79
#[cfg(test)]
mod tests {
    use super::*;
    use futures_lite::future::block_on;
    use futures_lite::prelude::*;
    use futures_lite::stream;

    /// Chaining three single-item streams yields their items in array order,
    /// followed by the end of the stream.
    #[test]
    fn chain_3() {
        block_on(async {
            let mut chained = [stream::once(1), stream::once(2), stream::once(3)].chain();

            for expected in [Some(1), Some(2), Some(3), None] {
                assert_eq!(chained.next().await, expected);
            }
        })
    }
}