futures_util/future/
flatten_stream.rs

1use core::fmt;
2
3use futures_core::{Async, Future, Poll, Stream};
4use futures_core::task;
5
6/// Future for the `flatten_stream` combinator, flattening a
7/// future-of-a-stream to get just the result of the final stream as a stream.
8///
9/// This is created by the `Future::flatten_stream` method.
10#[must_use = "streams do nothing unless polled"]
11pub struct FlattenStream<F>
12    where F: Future,
13          <F as Future>::Item: Stream<Error=F::Error>,
14{
15    state: State<F>
16}
17
18impl<F> fmt::Debug for FlattenStream<F>
19    where F: Future + fmt::Debug,
20          <F as Future>::Item: Stream<Error=F::Error> + fmt::Debug,
21{
22    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
23        fmt.debug_struct("FlattenStream")
24            .field("state", &self.state)
25            .finish()
26    }
27}
28
29pub fn new<F>(f: F) -> FlattenStream<F>
30    where F: Future,
31          <F as Future>::Item: Stream<Error=F::Error>,
32{
33    FlattenStream {
34        state: State::Future(f)
35    }
36}
37
38#[derive(Debug)]
39enum State<F>
40    where F: Future,
41          <F as Future>::Item: Stream<Error=F::Error>,
42{
43    // future is not yet called or called and not ready
44    Future(F),
45    // future resolved to Stream
46    Stream(F::Item),
47    // EOF after future resolved to error
48    Eof,
49    // after EOF after future resolved to error
50    Done,
51}
52
53impl<F> Stream for FlattenStream<F>
54    where F: Future,
55          <F as Future>::Item: Stream<Error=F::Error>,
56{
57    type Item = <F::Item as Stream>::Item;
58    type Error = <F::Item as Stream>::Error;
59
60    fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<Self::Item>, Self::Error> {
61        loop {
62            let (next_state, ret_opt) = match self.state {
63                State::Future(ref mut f) => {
64                    match f.poll(cx) {
65                        Ok(Async::Pending) => {
66                            // State is not changed, early return.
67                            return Ok(Async::Pending)
68                        },
69                        Ok(Async::Ready(stream)) => {
70                            // Future resolved to stream.
71                            // We do not return, but poll that
72                            // stream in the next loop iteration.
73                            (State::Stream(stream), None)
74                        }
75                        Err(e) => {
76                            (State::Eof, Some(Err(e)))
77                        }
78                    }
79                }
80                State::Stream(ref mut s) => {
81                    // Just forward call to the stream,
82                    // do not track its state.
83                    return s.poll_next(cx);
84                }
85                State::Eof => {
86                    (State::Done, Some(Ok(Async::Ready(None))))
87                }
88                State::Done => {
89                    panic!("poll called after eof");
90                }
91            };
92
93            self.state = next_state;
94            if let Some(ret) = ret_opt {
95                return ret;
96            }
97        }
98    }
99}
100