1use core::future::Future;
2use core::pin::Pin;
3use core::task::{Context, Poll};
4use futures::ready;
5use futures::Stream;
6use pin_utils::{unsafe_pinned, unsafe_unpinned};
7use core::fmt;
8
/// Future that collapses a stream into a single value.
///
/// The first item produced by the stream seeds the accumulator; every later
/// item is combined with the current accumulator by the stored closure, whose
/// returned future resolves to the next accumulator value.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Reducer<S, T, F, Fut> {
    /// Source of the items being reduced.
    stream: S,
    /// Combining closure, called with the accumulator and the next item.
    f: F,
    /// Current accumulator; `None` before the first item arrives and while a
    /// combining future is in flight.
    accum: Option<T>,
    /// In-flight combining future, if any.
    future: Option<Fut>,
}
17
18impl<S, T, F, Fut> Reducer<S, T, F, Fut>
19where
20 S: Stream,
21 F: FnMut(T, S::Item) -> Fut,
22 Fut: Future<Output = T>,
23{
24 unsafe_pinned!(stream: S);
25 unsafe_unpinned!(f: F);
26 unsafe_unpinned!(accum: Option<T>);
27 unsafe_pinned!(future: Option<Fut>);
28
29 pub(super) fn new(stream: S, f: F) -> Self {
30 Self {
31 stream,
32 f,
33 accum: None,
34 future: None,
35 }
36 }
37}
38
39impl<S, T, F, Fut> Future for Reducer<S, T, F, Fut>
40where
41 S: Stream<Item = T>,
42 F: FnMut(T, S::Item) -> Fut,
43 Fut: Future<Output = T>,
44{
45 type Output = Option<T>;
46
47 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
48 loop {
49 if self.accum.is_none() {
50 if self.future.is_none() {
51 let first = ready!(self.as_mut().stream().poll_next(cx));
52 if first.is_none() {
53 return Poll::Ready(None);
54 }
55 *self.as_mut().accum() = first;
56 } else {
57 let accum = ready!(self.as_mut().future().as_pin_mut().unwrap().poll(cx));
58 *self.as_mut().accum() = Some(accum);
59 self.as_mut().future().set(None);
60 }
61 }
62
63 let item = ready!(self.as_mut().stream().poll_next(cx));
64 let accum = self
65 .as_mut()
66 .accum()
67 .take()
68 .expect("Reducer polled after completion");
69
70 if let Some(e) = item {
71 let future = (self.as_mut().f())(accum, e);
72 self.as_mut().future().set(Some(future));
73 } else {
74 return Poll::Ready(Some(accum));
75 }
76 }
77 }
78}
79
80impl<S: Unpin, T, F, Fut: Unpin> Unpin for Reducer<S, T, F, Fut> {}
81
82impl<S, T, F, Fut> fmt::Debug for Reducer<S, T, F, Fut>
83where
84 S: fmt::Debug,
85 T: fmt::Debug,
86 Fut: fmt::Debug,
87{
88 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
89 f.debug_struct("Reducer")
90 .field("stream", &self.stream)
91 .field("accum", &self.accum)
92 .field("future", &self.future)
93 .finish()
94 }
95}