futures_util/stream/
chain.rs

1use core::mem;
2
3use futures_core::{Stream, Async, Poll};
4use futures_core::task;
5
/// Internal progress marker for a chained stream.
#[derive(Debug)]
enum State<S1, S2> {
    /// The first stream is still being polled; the second is held until
    /// the first one finishes.
    First(S1, S2),
    /// The first stream is gone and only the second one is polled.
    Second(S2),
    /// Placeholder left in the slot while the state is moved out during
    /// the `First` -> `Second` hand-over.
    Temp,
}
16
17/// An adapter for chaining the output of two streams.
18///
19/// The resulting stream produces items from first stream and then
20/// from second stream.
21#[derive(Debug)]
22#[must_use = "streams do nothing unless polled"]
23pub struct Chain<S1, S2> {
24    state: State<S1, S2>
25}
26
27pub fn new<S1, S2>(s1: S1, s2: S2) -> Chain<S1, S2>
28    where S1: Stream, S2: Stream<Item=S1::Item, Error=S1::Error>,
29{
30    Chain { state: State::First(s1, s2) }
31}
32
33impl<S1, S2> Stream for Chain<S1, S2>
34    where S1: Stream, S2: Stream<Item=S1::Item, Error=S1::Error>,
35{
36    type Item = S1::Item;
37    type Error = S1::Error;
38
39    fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<Self::Item>, Self::Error> {
40        loop {
41            match self.state {
42                State::First(ref mut s1, ref _s2) => match s1.poll_next(cx) {
43                    Ok(Async::Ready(None)) => (), // roll
44                    x => return x,
45                },
46                State::Second(ref mut s2) => return s2.poll_next(cx),
47                State::Temp => unreachable!(),
48            }
49
50            self.state = match mem::replace(&mut self.state, State::Temp) {
51                State::First(_s1, s2) => State::Second(s2),
52                _ => unreachable!(),
53            };
54        }
55    }
56}