Skip to main content

jstream_ext/
fold_mut.rs

1use crate::op_prelude::*;
2
3const POLL_AFTER_COMPLETED_MSG: &'static str = "call to poll after completed!";
4
5pin_project! {
6    /// Future for the [`try_fold_mut`](super::JTryStreamExt::try_fold_mut) method
7    #[must_use = "futures do nothing unless polled"]
8    pub struct TryFoldMut<S, T, F, Fut> {
9        #[pin]
10        upstream: S,
11        #[pin]
12        pending_future: Option<Fut>,
13        state: Option<T>,
14        handler: F,
15    }
16}
17
18impl<S, T, F, Fut> TryFoldMut<S, T, F, Fut>
19where
20    S: TryStream + FusedStream,
21    F: FnMut(&mut T, S::Ok) -> Fut,
22    Fut: TryFuture<Ok=(), Error=S::Error>,
23{
24    pub(crate) fn new(upstream: S, initial: T, handler: F) -> Self {
25        Self {
26            upstream,
27            pending_future: None,
28            state: Some(initial),
29            handler,
30        }
31    }
32}
33
34impl<S, T, F, Fut> Future for TryFoldMut<S, T, F, Fut>
35where
36    S: TryStream + FusedStream,
37    F: FnMut(&mut T, S::Ok) -> Fut,
38    Fut: TryFuture<Ok=(), Error=S::Error>,
39{
40    type Output = Result<T, S::Error>;
41
42    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
43        let mut this = self.project();
44        Poll::Ready(loop {
45            // poll future if we have one
46            if let Some(future) = this.pending_future.as_mut().as_pin_mut() {
47                let out = ready!(future.try_poll(cx));
48                this.pending_future.set(None);
49                if let Err(err) = out {
50                    this.state.take();
51                    break Err(err);
52                }
53            }
54
55            // poll upstream
56            match ready!(this.upstream.as_mut().try_poll_next(cx)) {
57                // got something, no error
58                Some(Ok(next)) => {
59                    let state = this.state.as_mut().expect(POLL_AFTER_COMPLETED_MSG);
60                    let future = (this.handler)(state, next);
61                    this.pending_future.set(Some(future));
62                }
63                // got error
64                Some(Err(err)) => {
65                    this.state.take();
66                    break Err(err);
67                },
68                // upstream done
69                None => {
70                    break Ok(this.state.take().expect(POLL_AFTER_COMPLETED_MSG));
71                }
72            }
73        })
74    }
75}
76
77#[cfg(feature = "sink")]
78impl<S, T, F, Fut, Item, E> Sink<Item> for TryFoldMut<S, T, F, Fut>
79where
80    S: Sink<Item, Error=E> + Stream + FusedStream,
81    F: FnMut(&mut T, S::Item) -> Fut,
82    Fut: Future<Output=()>,
83{
84    delegate_sink!(upstream, E, Item);
85}
86
87pin_project! {
88    /// Future for the [`fold_mut`](super::JStreamExt::fold_mut) method
89    #[must_use = "futures do nothing unless polled"]
90    pub struct FoldMut<S, T, F, Fut> {
91        #[pin]
92        upstream: S,
93        #[pin]
94        pending_future: Option<Fut>,
95        state: Option<T>,
96        handler: F,
97    }
98}
99
100impl<S, T, F, Fut> FoldMut<S, T, F, Fut>
101where
102    S: Stream + FusedStream,
103    F: FnMut(&mut T, S::Item) -> Fut,
104    Fut: Future<Output=()>,
105{
106    pub(crate) fn new(upstream: S, initial: T, handler: F) -> Self {
107        Self {
108            upstream,
109            pending_future: None,
110            state: Some(initial),
111            handler,
112        }
113    }
114}
115
116
117impl<S, T, F, Fut> Future for FoldMut<S, T, F, Fut>
118where
119    S: Stream + FusedStream,
120    F: FnMut(&mut T, S::Item) -> Fut,
121    Fut: Future<Output=()>,
122{
123    type Output = T;
124
125    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
126        let mut this = self.project();
127        Poll::Ready(loop {
128            // poll future if we have one
129            if let Some(future) = this.pending_future.as_mut().as_pin_mut() {
130                ready!(future.poll(cx));
131                this.pending_future.set(None);
132            }
133
134            // poll upstream
135            match ready!(this.upstream.as_mut().poll_next(cx)) {
136                // got next item
137                Some(next) => {
138                    let state = this.state.as_mut().expect(POLL_AFTER_COMPLETED_MSG);
139                    let future = (this.handler)(state, next);
140                    this.pending_future.set(Some(future));
141                }
142                // upstream done
143                None => {
144                    break this.state.take().expect(POLL_AFTER_COMPLETED_MSG)
145                }
146            }
147
148        })
149    }
150}
151
152#[cfg(feature = "sink")]
153impl<S, T, F, Fut, Item> Sink<Item> for FoldMut<S, T, F, Fut>
154where
155    S: Sink<Item> + Stream + FusedStream,
156    F: FnMut(&mut T, S::Item) -> Fut,
157    Fut: Future<Output=()>,
158{
159    delegate_sink!(upstream, S::Error, Item);
160}