futures_util/stream/
chain.rs1use core::mem;
2
3use futures_core::{Stream, Async, Poll};
4use futures_core::task;
5
6#[derive(Debug)]
8enum State<S1, S2> {
9 First(S1, S2),
11 Second(S2),
13 Temp,
15}
16
17#[derive(Debug)]
22#[must_use = "streams do nothing unless polled"]
23pub struct Chain<S1, S2> {
24 state: State<S1, S2>
25}
26
27pub fn new<S1, S2>(s1: S1, s2: S2) -> Chain<S1, S2>
28 where S1: Stream, S2: Stream<Item=S1::Item, Error=S1::Error>,
29{
30 Chain { state: State::First(s1, s2) }
31}
32
33impl<S1, S2> Stream for Chain<S1, S2>
34 where S1: Stream, S2: Stream<Item=S1::Item, Error=S1::Error>,
35{
36 type Item = S1::Item;
37 type Error = S1::Error;
38
39 fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<Self::Item>, Self::Error> {
40 loop {
41 match self.state {
42 State::First(ref mut s1, ref _s2) => match s1.poll_next(cx) {
43 Ok(Async::Ready(None)) => (), x => return x,
45 },
46 State::Second(ref mut s2) => return s2.poll_next(cx),
47 State::Temp => unreachable!(),
48 }
49
50 self.state = match mem::replace(&mut self.state, State::Temp) {
51 State::First(_s1, s2) => State::Second(s2),
52 _ => unreachable!(),
53 };
54 }
55 }
56}