// futures_util/future/flatten_stream.rs
use core::fmt;
2
3use futures_core::{Async, Future, Poll, Stream};
4use futures_core::task;
5
6#[must_use = "streams do nothing unless polled"]
11pub struct FlattenStream<F>
12 where F: Future,
13 <F as Future>::Item: Stream<Error=F::Error>,
14{
15 state: State<F>
16}
17
18impl<F> fmt::Debug for FlattenStream<F>
19 where F: Future + fmt::Debug,
20 <F as Future>::Item: Stream<Error=F::Error> + fmt::Debug,
21{
22 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
23 fmt.debug_struct("FlattenStream")
24 .field("state", &self.state)
25 .finish()
26 }
27}
28
29pub fn new<F>(f: F) -> FlattenStream<F>
30 where F: Future,
31 <F as Future>::Item: Stream<Error=F::Error>,
32{
33 FlattenStream {
34 state: State::Future(f)
35 }
36}
37
38#[derive(Debug)]
39enum State<F>
40 where F: Future,
41 <F as Future>::Item: Stream<Error=F::Error>,
42{
43 Future(F),
45 Stream(F::Item),
47 Eof,
49 Done,
51}
52
53impl<F> Stream for FlattenStream<F>
54 where F: Future,
55 <F as Future>::Item: Stream<Error=F::Error>,
56{
57 type Item = <F::Item as Stream>::Item;
58 type Error = <F::Item as Stream>::Error;
59
60 fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<Self::Item>, Self::Error> {
61 loop {
62 let (next_state, ret_opt) = match self.state {
63 State::Future(ref mut f) => {
64 match f.poll(cx) {
65 Ok(Async::Pending) => {
66 return Ok(Async::Pending)
68 },
69 Ok(Async::Ready(stream)) => {
70 (State::Stream(stream), None)
74 }
75 Err(e) => {
76 (State::Eof, Some(Err(e)))
77 }
78 }
79 }
80 State::Stream(ref mut s) => {
81 return s.poll_next(cx);
84 }
85 State::Eof => {
86 (State::Done, Some(Ok(Async::Ready(None))))
87 }
88 State::Done => {
89 panic!("poll called after eof");
90 }
91 };
92
93 self.state = next_state;
94 if let Some(ret) = ret_opt {
95 return ret;
96 }
97 }
98 }
99}
100