async_std/stream/stream/
chain.rs

1use std::pin::Pin;
2
3use pin_project_lite::pin_project;
4
5use super::fuse::Fuse;
6use crate::prelude::*;
7use crate::task::{Context, Poll};
8
9pin_project! {
10    /// A stream that chains two streams one after another.
11    ///
12    /// This `struct` is created by the [`chain`] method on [`Stream`]. See its
13    /// documentation for more.
14    ///
15    /// [`chain`]: trait.Stream.html#method.chain
16    /// [`Stream`]: trait.Stream.html
17    #[derive(Debug)]
18    pub struct Chain<S, U> {
19        #[pin]
20        first: Fuse<S>,
21        #[pin]
22        second: Fuse<U>,
23    }
24}
25
26impl<S: Stream, U: Stream> Chain<S, U> {
27    pub(super) fn new(first: S, second: U) -> Self {
28        Self {
29            first: first.fuse(),
30            second: second.fuse(),
31        }
32    }
33}
34
35impl<S: Stream, U: Stream<Item = S::Item>> Stream for Chain<S, U> {
36    type Item = S::Item;
37
38    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
39        let mut this = self.project();
40        if !this.first.done {
41            let next = futures_core::ready!(this.first.as_mut().poll_next(cx));
42            if let Some(next) = next {
43                return Poll::Ready(Some(next));
44            }
45        }
46
47        if !this.second.done {
48            let next = futures_core::ready!(this.second.as_mut().poll_next(cx));
49            if let Some(next) = next {
50                return Poll::Ready(Some(next));
51            }
52        }
53
54        if this.first.done && this.second.done {
55            return Poll::Ready(None);
56        }
57
58        Poll::Pending
59    }
60}