stream_combinators/
stream_sequence.rs

1use futures::{Poll, Async, Future};
2use futures::stream::{Stream, Chain, StreamFuture, Once, once};
3use core::mem;
4
5/// Drives a stream until just before it produces a value,
6/// then performs a transformation on the stream and returns
7/// the transformed stream. If the driven stream reaches its
8/// end without producing a value, the transformation function
9/// is not called and the returned stream also ends.
10///
11/// This combinator is conceptually equivalent to calling
12/// `.into_future().and_then()` on a stream and properly passing
13/// both the returned value and the stream through a stream
14/// transformation function.
15
16/// A stream that wraps and transforms another stream
17/// once it has produced a value
18#[derive(Debug)]
19#[must_use = "streams do nothing unless polled"]
20pub struct Sequence<S, F, U>
21    where S: Stream
22{
23    state: SeqState<S, F, U>,
24}
25
26// The type of the stream passed to the stream transformation function
27// (eventually we'll get rid of this...)
28type OutputStream<S> = Chain<Once<<S as Stream>::Item, <S as Stream>::Error>, S>;
29
30#[derive(Debug)]
31enum SeqState<S, F, U>
32    where S: Stream
33{
34    /// Wrapped stream is done
35    Done,
36    /// Waiting for the wrapped stream to produce a value
37    Waiting((StreamFuture<S>, F)),
38    /// Streaming transformed stream values
39    Streaming(U),
40}
41
42impl<S, F, U> Sequence<S, F, U>
43    where S: Stream,
44          F: FnOnce(OutputStream<S>) -> U,
45          U: Stream
46{
47    pub fn new(stream: S, f: F) -> Sequence<S, F, U> {
48        Sequence { state: SeqState::Waiting((stream.into_future(), f)) }
49    }
50
51    fn poll_stream(&mut self, mut stream: U) -> Poll<Option<U::Item>, U::Error> {
52        let result = stream.poll();
53        self.state = SeqState::Streaming(stream);
54        result
55    }
56}
57
58pub trait SequenceStream: Stream + Sized {
59    /// Create a `Sequence` stream from a stream.
60    ///
61    /// Takes a transformation function which will be applied to
62    /// the stream immediately before it produces a value.
63    fn sequence<F, U>(self, f: F) -> Sequence<Self, F, U>
64        where F: FnOnce(OutputStream<Self>) -> U,
65              U: Stream
66    {
67        Sequence::new(self, f)
68    }
69}
70
71/// Implement `sequence` for all `Stream`s
72impl<S> SequenceStream for S where S: Stream {}
73
74impl<S, F, U> Stream for Sequence<S, F, U>
75    where S: Stream,
76          F: FnOnce(OutputStream<S>) -> U,
77          U: Stream
78{
79    type Item = U::Item;
80    type Error = U::Error;
81
82    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
83        match mem::replace(&mut self.state, SeqState::Done) {
84            SeqState::Done => Ok(Async::Ready(None)),
85            SeqState::Waiting((mut stream_future, f)) => {
86                match stream_future.poll() {
87                    Ok(Async::Ready((Some(val), stream))) => {
88                        let stream = once(Ok(val)).chain(stream);
89                        let stream = f(stream);
90                        self.poll_stream(stream)
91                    }
92                    Err((err, stream)) => {
93                        let stream = once(Err(err)).chain(stream);
94                        let stream = f(stream);
95                        self.poll_stream(stream)
96                    }
97                    Ok(Async::NotReady) => {
98                        self.state = SeqState::Waiting((stream_future, f));
99                        Ok(Async::NotReady)
100                    }
101                    Ok(Async::Ready((None, _stream))) => {
102                        self.state = SeqState::Done;
103                        Ok(Async::Ready(None))
104                    }
105                }
106            }
107            SeqState::Streaming(stream) => self.poll_stream(stream),
108        }
109    }
110}