futures_util/stream/
fold.rs

1use core::mem;
2
3use futures_core::{Future, Poll, IntoFuture, Async, Stream};
4use futures_core::task;
5
6/// A future used to collect all the results of a stream into one generic type.
7///
8/// This future is returned by the `Stream::fold` method.
9#[derive(Debug)]
10#[must_use = "streams do nothing unless polled"]
11pub struct Fold<S, Fut, T, F> where Fut: IntoFuture {
12    stream: S,
13    f: F,
14    state: State<T, Fut::Future>,
15}
16
17#[derive(Debug)]
18enum State<T, F> where F: Future {
19    /// Placeholder state when doing work
20    Empty,
21
22    /// Ready to process the next stream item; current accumulator is the `T`
23    Ready(T),
24
25    /// Working on a future the process the previous stream item
26    Processing(F),
27}
28
29pub fn new<S, Fut, T, F>(s: S, f: F, t: T) -> Fold<S, Fut, T, F>
30    where S: Stream,
31          F: FnMut(T, S::Item) -> Fut,
32          Fut: IntoFuture<Item = T, Error = S::Error>,
33{
34    Fold {
35        stream: s,
36        f: f,
37        state: State::Ready(t),
38    }
39}
40
41impl<S, Fut, T, F> Future for Fold<S, Fut, T, F>
42    where S: Stream,
43          F: FnMut(T, S::Item) -> Fut,
44          Fut: IntoFuture<Item = T, Error = S::Error>,
45{
46    type Item = T;
47    type Error = S::Error;
48
49    fn poll(&mut self, cx: &mut task::Context) -> Poll<T, S::Error> {
50        loop {
51            match mem::replace(&mut self.state, State::Empty) {
52                State::Empty => panic!("cannot poll Fold twice"),
53                State::Ready(state) => {
54                    match self.stream.poll_next(cx)? {
55                        Async::Ready(Some(e)) => {
56                            let future = (self.f)(state, e);
57                            let future = future.into_future();
58                            self.state = State::Processing(future);
59                        }
60                        Async::Ready(None) => return Ok(Async::Ready(state)),
61                        Async::Pending => {
62                            self.state = State::Ready(state);
63                            return Ok(Async::Pending)
64                        }
65                    }
66                }
67                State::Processing(mut fut) => {
68                    match fut.poll(cx)? {
69                        Async::Ready(state) => self.state = State::Ready(state),
70                        Async::Pending => {
71                            self.state = State::Processing(fut);
72                            return Ok(Async::Pending)
73                        }
74                    }
75                }
76            }
77        }
78    }
79}