stream_combinators/
stream_sequence.rs1use futures::{Poll, Async, Future};
2use futures::stream::{Stream, Chain, StreamFuture, Once, once};
3use core::mem;
4
5#[derive(Debug)]
19#[must_use = "streams do nothing unless polled"]
20pub struct Sequence<S, F, U>
21 where S: Stream
22{
23 state: SeqState<S, F, U>,
24}
25
26type OutputStream<S> = Chain<Once<<S as Stream>::Item, <S as Stream>::Error>, S>;
29
30#[derive(Debug)]
31enum SeqState<S, F, U>
32 where S: Stream
33{
34 Done,
36 Waiting((StreamFuture<S>, F)),
38 Streaming(U),
40}
41
42impl<S, F, U> Sequence<S, F, U>
43 where S: Stream,
44 F: FnOnce(OutputStream<S>) -> U,
45 U: Stream
46{
47 pub fn new(stream: S, f: F) -> Sequence<S, F, U> {
48 Sequence { state: SeqState::Waiting((stream.into_future(), f)) }
49 }
50
51 fn poll_stream(&mut self, mut stream: U) -> Poll<Option<U::Item>, U::Error> {
52 let result = stream.poll();
53 self.state = SeqState::Streaming(stream);
54 result
55 }
56}
57
58pub trait SequenceStream: Stream + Sized {
59 fn sequence<F, U>(self, f: F) -> Sequence<Self, F, U>
64 where F: FnOnce(OutputStream<Self>) -> U,
65 U: Stream
66 {
67 Sequence::new(self, f)
68 }
69}
70
71impl<S> SequenceStream for S where S: Stream {}
73
74impl<S, F, U> Stream for Sequence<S, F, U>
75 where S: Stream,
76 F: FnOnce(OutputStream<S>) -> U,
77 U: Stream
78{
79 type Item = U::Item;
80 type Error = U::Error;
81
82 fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
83 match mem::replace(&mut self.state, SeqState::Done) {
84 SeqState::Done => Ok(Async::Ready(None)),
85 SeqState::Waiting((mut stream_future, f)) => {
86 match stream_future.poll() {
87 Ok(Async::Ready((Some(val), stream))) => {
88 let stream = once(Ok(val)).chain(stream);
89 let stream = f(stream);
90 self.poll_stream(stream)
91 }
92 Err((err, stream)) => {
93 let stream = once(Err(err)).chain(stream);
94 let stream = f(stream);
95 self.poll_stream(stream)
96 }
97 Ok(Async::NotReady) => {
98 self.state = SeqState::Waiting((stream_future, f));
99 Ok(Async::NotReady)
100 }
101 Ok(Async::Ready((None, _stream))) => {
102 self.state = SeqState::Done;
103 Ok(Async::Ready(None))
104 }
105 }
106 }
107 SeqState::Streaming(stream) => self.poll_stream(stream),
108 }
109 }
110}