stream_reduce/
reducer.rs

1use core::future::Future;
2use core::pin::Pin;
3use core::task::{Context, Poll};
4use futures::ready;
5use futures::Stream;
6use pin_utils::{unsafe_pinned, unsafe_unpinned};
7use core::fmt;
8
/// Future returned by the [`reduce`](super::Reduce::reduce) method.
///
/// It folds the items of a stream into one value: the first item seeds the
/// accumulator and every later item is combined with it through an
/// asynchronous closure. The future resolves to `None` when the stream ends
/// without yielding anything, and to `Some(accumulator)` otherwise.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Reducer<S, T, F, Fut> {
    /// Stream supplying the items to combine.
    stream: S,
    /// Combining closure; called with the accumulator and the next item, it
    /// returns a future that produces the new accumulator.
    f: F,
    /// Current accumulator. `None` before the first item has arrived and
    /// while a combining future is in flight.
    accum: Option<T>,
    /// Combining future that is currently being driven, if any.
    future: Option<Fut>,
}
17
18impl<S, T, F, Fut> Reducer<S, T, F, Fut>
19where
20    S: Stream,
21    F: FnMut(T, S::Item) -> Fut,
22    Fut: Future<Output = T>,
23{
24    unsafe_pinned!(stream: S);
25    unsafe_unpinned!(f: F);
26    unsafe_unpinned!(accum: Option<T>);
27    unsafe_pinned!(future: Option<Fut>);
28
29    pub(super) fn new(stream: S, f: F) -> Self {
30        Self {
31            stream,
32            f,
33            accum: None,
34            future: None,
35        }
36    }
37}
38
39impl<S, T, F, Fut> Future for Reducer<S, T, F, Fut>
40where
41    S: Stream<Item = T>,
42    F: FnMut(T, S::Item) -> Fut,
43    Fut: Future<Output = T>,
44{
45    type Output = Option<T>;
46
47    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
48        loop {
49            if self.accum.is_none() {
50                if self.future.is_none() {
51                    let first = ready!(self.as_mut().stream().poll_next(cx));
52                    if first.is_none() {
53                        return Poll::Ready(None);
54                    }
55                    *self.as_mut().accum() = first;
56                } else {
57                    let accum = ready!(self.as_mut().future().as_pin_mut().unwrap().poll(cx));
58                    *self.as_mut().accum() = Some(accum);
59                    self.as_mut().future().set(None);
60                }
61            }
62
63            let item = ready!(self.as_mut().stream().poll_next(cx));
64            let accum = self
65                .as_mut()
66                .accum()
67                .take()
68                .expect("Reducer polled after completion");
69
70            if let Some(e) = item {
71                let future = (self.as_mut().f())(accum, e);
72                self.as_mut().future().set(Some(future));
73            } else {
74                return Poll::Ready(Some(accum));
75            }
76        }
77    }
78}
79
80impl<S: Unpin, T, F, Fut: Unpin> Unpin for Reducer<S, T, F, Fut> {}
81
82impl<S, T, F, Fut> fmt::Debug for Reducer<S, T, F, Fut>
83where
84    S: fmt::Debug,
85    T: fmt::Debug,
86    Fut: fmt::Debug,
87{
88    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
89        f.debug_struct("Reducer")
90            .field("stream", &self.stream)
91            .field("accum", &self.accum)
92            .field("future", &self.future)
93            .finish()
94    }
95}