1use crate::op_prelude::*;
2
3const POLL_AFTER_COMPLETED_MSG: &'static str = "call to poll after completed!";
4
5pin_project! {
6 #[must_use = "futures do nothing unless polled"]
8 pub struct TryFoldMut<S, T, F, Fut> {
9 #[pin]
10 upstream: S,
11 #[pin]
12 pending_future: Option<Fut>,
13 state: Option<T>,
14 handler: F,
15 }
16}
17
18impl<S, T, F, Fut> TryFoldMut<S, T, F, Fut>
19where
20 S: TryStream + FusedStream,
21 F: FnMut(&mut T, S::Ok) -> Fut,
22 Fut: TryFuture<Ok=(), Error=S::Error>,
23{
24 pub(crate) fn new(upstream: S, initial: T, handler: F) -> Self {
25 Self {
26 upstream,
27 pending_future: None,
28 state: Some(initial),
29 handler,
30 }
31 }
32}
33
34impl<S, T, F, Fut> Future for TryFoldMut<S, T, F, Fut>
35where
36 S: TryStream + FusedStream,
37 F: FnMut(&mut T, S::Ok) -> Fut,
38 Fut: TryFuture<Ok=(), Error=S::Error>,
39{
40 type Output = Result<T, S::Error>;
41
42 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
43 let mut this = self.project();
44 Poll::Ready(loop {
45 if let Some(future) = this.pending_future.as_mut().as_pin_mut() {
47 let out = ready!(future.try_poll(cx));
48 this.pending_future.set(None);
49 if let Err(err) = out {
50 this.state.take();
51 break Err(err);
52 }
53 }
54
55 match ready!(this.upstream.as_mut().try_poll_next(cx)) {
57 Some(Ok(next)) => {
59 let state = this.state.as_mut().expect(POLL_AFTER_COMPLETED_MSG);
60 let future = (this.handler)(state, next);
61 this.pending_future.set(Some(future));
62 }
63 Some(Err(err)) => {
65 this.state.take();
66 break Err(err);
67 },
68 None => {
70 break Ok(this.state.take().expect(POLL_AFTER_COMPLETED_MSG));
71 }
72 }
73 })
74 }
75}
76
77#[cfg(feature = "sink")]
78impl<S, T, F, Fut, Item, E> Sink<Item> for TryFoldMut<S, T, F, Fut>
79where
80 S: Sink<Item, Error=E> + Stream + FusedStream,
81 F: FnMut(&mut T, S::Item) -> Fut,
82 Fut: Future<Output=()>,
83{
84 delegate_sink!(upstream, E, Item);
85}
86
87pin_project! {
88 #[must_use = "futures do nothing unless polled"]
90 pub struct FoldMut<S, T, F, Fut> {
91 #[pin]
92 upstream: S,
93 #[pin]
94 pending_future: Option<Fut>,
95 state: Option<T>,
96 handler: F,
97 }
98}
99
100impl<S, T, F, Fut> FoldMut<S, T, F, Fut>
101where
102 S: Stream + FusedStream,
103 F: FnMut(&mut T, S::Item) -> Fut,
104 Fut: Future<Output=()>,
105{
106 pub(crate) fn new(upstream: S, initial: T, handler: F) -> Self {
107 Self {
108 upstream,
109 pending_future: None,
110 state: Some(initial),
111 handler,
112 }
113 }
114}
115
116
117impl<S, T, F, Fut> Future for FoldMut<S, T, F, Fut>
118where
119 S: Stream + FusedStream,
120 F: FnMut(&mut T, S::Item) -> Fut,
121 Fut: Future<Output=()>,
122{
123 type Output = T;
124
125 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
126 let mut this = self.project();
127 Poll::Ready(loop {
128 if let Some(future) = this.pending_future.as_mut().as_pin_mut() {
130 ready!(future.poll(cx));
131 this.pending_future.set(None);
132 }
133
134 match ready!(this.upstream.as_mut().poll_next(cx)) {
136 Some(next) => {
138 let state = this.state.as_mut().expect(POLL_AFTER_COMPLETED_MSG);
139 let future = (this.handler)(state, next);
140 this.pending_future.set(Some(future));
141 }
142 None => {
144 break this.state.take().expect(POLL_AFTER_COMPLETED_MSG)
145 }
146 }
147
148 })
149 }
150}
151
152#[cfg(feature = "sink")]
153impl<S, T, F, Fut, Item> Sink<Item> for FoldMut<S, T, F, Fut>
154where
155 S: Sink<Item> + Stream + FusedStream,
156 F: FnMut(&mut T, S::Item) -> Fut,
157 Fut: Future<Output=()>,
158{
159 delegate_sink!(upstream, S::Error, Item);
160}