futures_util/stream/
fold.rs1use core::mem;
2
3use futures_core::{Future, Poll, IntoFuture, Async, Stream};
4use futures_core::task;
5
6#[derive(Debug)]
10#[must_use = "streams do nothing unless polled"]
11pub struct Fold<S, Fut, T, F> where Fut: IntoFuture {
12 stream: S,
13 f: F,
14 state: State<T, Fut::Future>,
15}
16
17#[derive(Debug)]
18enum State<T, F> where F: Future {
19 Empty,
21
22 Ready(T),
24
25 Processing(F),
27}
28
29pub fn new<S, Fut, T, F>(s: S, f: F, t: T) -> Fold<S, Fut, T, F>
30 where S: Stream,
31 F: FnMut(T, S::Item) -> Fut,
32 Fut: IntoFuture<Item = T, Error = S::Error>,
33{
34 Fold {
35 stream: s,
36 f: f,
37 state: State::Ready(t),
38 }
39}
40
41impl<S, Fut, T, F> Future for Fold<S, Fut, T, F>
42 where S: Stream,
43 F: FnMut(T, S::Item) -> Fut,
44 Fut: IntoFuture<Item = T, Error = S::Error>,
45{
46 type Item = T;
47 type Error = S::Error;
48
49 fn poll(&mut self, cx: &mut task::Context) -> Poll<T, S::Error> {
50 loop {
51 match mem::replace(&mut self.state, State::Empty) {
52 State::Empty => panic!("cannot poll Fold twice"),
53 State::Ready(state) => {
54 match self.stream.poll_next(cx)? {
55 Async::Ready(Some(e)) => {
56 let future = (self.f)(state, e);
57 let future = future.into_future();
58 self.state = State::Processing(future);
59 }
60 Async::Ready(None) => return Ok(Async::Ready(state)),
61 Async::Pending => {
62 self.state = State::Ready(state);
63 return Ok(Async::Pending)
64 }
65 }
66 }
67 State::Processing(mut fut) => {
68 match fut.poll(cx)? {
69 Async::Ready(state) => self.state = State::Ready(state),
70 Async::Pending => {
71 self.state = State::Processing(fut);
72 return Ok(Async::Pending)
73 }
74 }
75 }
76 }
77 }
78 }
79}