futures_util/stream/
flatten.rs

1use futures_core::{Poll, Async, Stream};
2use futures_core::task;
3use futures_sink::{Sink};
4
5/// A combinator used to flatten a stream-of-streams into one long stream of
6/// elements.
7///
8/// This combinator is created by the `Stream::flatten` method.
9#[derive(Debug)]
10#[must_use = "streams do nothing unless polled"]
11pub struct Flatten<S>
12    where S: Stream,
13{
14    stream: S,
15    next: Option<S::Item>,
16}
17
18pub fn new<S>(s: S) -> Flatten<S>
19    where S: Stream,
20          S::Item: Stream<Error = S::Error>,
21{
22    Flatten {
23        stream: s,
24        next: None,
25    }
26}
27
28impl<S: Stream> Flatten<S> {
29    /// Acquires a reference to the underlying stream that this combinator is
30    /// pulling from.
31    pub fn get_ref(&self) -> &S {
32        &self.stream
33    }
34
35    /// Acquires a mutable reference to the underlying stream that this
36    /// combinator is pulling from.
37    ///
38    /// Note that care must be taken to avoid tampering with the state of the
39    /// stream which may otherwise confuse this combinator.
40    pub fn get_mut(&mut self) -> &mut S {
41        &mut self.stream
42    }
43
44    /// Consumes this combinator, returning the underlying stream.
45    ///
46    /// Note that this may discard intermediate state of this combinator, so
47    /// care should be taken to avoid losing resources when this is called.
48    pub fn into_inner(self) -> S {
49        self.stream
50    }
51}
52
53// Forwarding impl of Sink from the underlying stream
54impl<S> Sink for Flatten<S>
55    where S: Sink + Stream
56{
57    type SinkItem = S::SinkItem;
58    type SinkError = S::SinkError;
59
60    delegate_sink!(stream);
61}
62
63impl<S> Stream for Flatten<S>
64    where S: Stream,
65          S::Item: Stream,
66          <S::Item as Stream>::Error: From<S::Error>,
67{
68    type Item = <S::Item as Stream>::Item;
69    type Error = <S::Item as Stream>::Error;
70
71    fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<Self::Item>, Self::Error> {
72        loop {
73            if self.next.is_none() {
74                match try_ready!(self.stream.poll_next(cx)) {
75                    Some(e) => self.next = Some(e),
76                    None => return Ok(Async::Ready(None)),
77                }
78            }
79            assert!(self.next.is_some());
80            match self.next.as_mut().unwrap().poll_next(cx) {
81                Ok(Async::Ready(None)) => self.next = None,
82                other => return other,
83            }
84        }
85    }
86}