async_std/stream/stream/
flatten.rs

1use std::fmt;
2use std::pin::Pin;
3
4use pin_project_lite::pin_project;
5
6use crate::stream::{IntoStream, Stream};
7use crate::task::{Context, Poll};
8
9pin_project! {
10    /// A stream that flattens one level of nesting in a stream of things that can be turned into
11    /// streams.
12    ///
13    /// This `struct` is created by the [`flatten`] method on [`Stream`]. See its
14    /// documentation for more.
15    ///
16    /// [`flatten`]: trait.Stream.html#method.flatten
17    /// [`Stream`]: trait.Stream.html
18    pub struct Flatten<S>
19    where
20        S: Stream,
21        S::Item: IntoStream,
22    {
23        #[pin]
24        stream: S, // outer stream; each item it yields is turned into an inner stream
25        #[pin]
26        inner_stream: Option<<S::Item as IntoStream>::IntoStream>, // active inner stream, if any
27    }
28}
29
30impl<S> Flatten<S>
31where
32    S: Stream,
33    S::Item: IntoStream,
34{
35    pub(super) fn new(stream: S) -> Self {
36        Self {
37            stream,
38            inner_stream: None,
39        }
40    }
41}
42
43impl<S, U> Stream for Flatten<S>
44where
45    S: Stream,
46    S::Item: IntoStream<IntoStream = U, Item = U::Item>,
47    U: Stream,
48{
49    type Item = U::Item;
50
51    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
52        let mut this = self.project();
53        loop {
54            if let Some(inner) = this.inner_stream.as_mut().as_pin_mut() {
55                if let item @ Some(_) = futures_core::ready!(inner.poll_next(cx)) {
56                    return Poll::Ready(item);
57                }
58            }
59
60            match futures_core::ready!(this.stream.as_mut().poll_next(cx)) {
61                None => return Poll::Ready(None),
62                Some(inner) => this.inner_stream.set(Some(inner.into_stream())),
63            }
64        }
65    }
66}
67
68impl<S, U> fmt::Debug for Flatten<S>
69where
70    S: fmt::Debug + Stream,
71    S::Item: IntoStream<IntoStream = U, Item = U::Item>,
72    U: fmt::Debug + Stream,
73{
74    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
75        f.debug_struct("Flatten")
76            .field("inner", &self.stream)
77            .finish()
78    }
79}