multipart_write/stream/
feed_multipart_write.rs

1use crate::{FusedMultipartWrite, MultipartWrite};
2
3use futures_core::ready;
4use futures_core::stream::{FusedStream, Stream};
5use std::fmt::{self, Debug, Formatter};
6use std::pin::Pin;
7use std::task::{Context, Poll};
8
9pin_project_lite::pin_project! {
10    /// Stream for  [`feed_multipart_write`].
11    ///
12    /// [`feed_multipart_write`]: super::MultipartStreamExt::feed_multipart_write
13    #[must_use = "futures do nothing unless polled"]
14    pub struct FeedMultipartWrite<St: Stream, Wr, F> {
15        #[pin]
16        stream: St,
17        #[pin]
18        writer: WriteBuf<Wr, St::Item, F>,
19        state: State,
20        stream_terminated: bool,
21        is_terminated: bool,
22    }
23}
24
25#[derive(Debug, Clone, Copy)]
26enum State {
27    Write,
28    Next,
29    Complete,
30    Shutdown,
31}
32
33impl<St: Stream, Wr, F> FeedMultipartWrite<St, Wr, F> {
34    pub(super) fn new(stream: St, writer: Wr, f: F) -> Self {
35        Self {
36            stream,
37            writer: WriteBuf::new(writer, f),
38            state: State::Write,
39            stream_terminated: false,
40            is_terminated: false,
41        }
42    }
43}
44
45impl<St, Wr, F> FusedStream for FeedMultipartWrite<St, Wr, F>
46where
47    St: Stream,
48    Wr: FusedMultipartWrite<St::Item>,
49    F: FnMut(Wr::Ret) -> bool,
50{
51    fn is_terminated(&self) -> bool {
52        self.is_terminated
53    }
54}
55
56impl<St, Wr, F> Stream for FeedMultipartWrite<St, Wr, F>
57where
58    St: Stream,
59    Wr: FusedMultipartWrite<St::Item>,
60    F: FnMut(Wr::Ret) -> bool,
61{
62    type Item = Result<Wr::Output, Wr::Error>;
63
64    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
65        let mut this = self.project();
66
67        loop {
68            // We can't make any more progress if this writer is fused so stop
69            // producing the stream.
70            if this.writer.inner.is_terminated() {
71                *this.is_terminated = true;
72                return Poll::Ready(None);
73            }
74
75            let next_state = match *this.state {
76                // Try to make the writer write its buffered item.
77                // Possibilities are:
78                // * Doesn't have a buffered input, return `State::Next` to poll
79                // the stream for the next item.
80                // * Returned `Some(true)`, which implies `poll_complete`.
81                // * Returned `Some(false)`, can write more items.
82                // * Error -- return the error.
83                State::Write => match ready!(this.writer.as_mut().poll_inner_send(cx)) {
84                    Ok(None) => State::Next,
85                    Ok(Some(b)) if b => State::Complete,
86                    Ok(_) => State::Write,
87                    Err(e) => {
88                        *this.state = State::Write;
89                        return Poll::Ready(Some(Err(e)));
90                    }
91                },
92                // Poll for the next upstream item.
93                // If there is none, then do the last `poll_complete` if the
94                // writer is not empty; end now otherwise.
95                State::Next => match ready!(this.stream.as_mut().poll_next(cx)) {
96                    Some(next) => {
97                        this.writer.as_mut().set_buffered(next);
98                        State::Write
99                    }
100                    _ => {
101                        *this.stream_terminated = true;
102                        if this.writer.as_mut().is_empty() {
103                            return Poll::Ready(None);
104                        }
105                        State::Complete
106                    }
107                },
108                // Produce the next item downstream.
109                State::Complete => {
110                    let output = ready!(this.writer.as_mut().poll_inner_complete(cx))?;
111                    if *this.stream_terminated {
112                        *this.state = State::Shutdown;
113                    } else {
114                        *this.state = State::Write;
115                    }
116                    return Poll::Ready(Some(Ok(output)));
117                }
118                State::Shutdown => {
119                    *this.is_terminated = true;
120                    return Poll::Ready(None);
121                }
122            };
123
124            *this.state = next_state;
125        }
126    }
127}
128
129impl<St, Wr, F> Debug for FeedMultipartWrite<St, Wr, F>
130where
131    St: Stream + Debug,
132    St::Item: Debug,
133    Wr: Debug,
134{
135    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
136        f.debug_struct("FeedMultipartWrite")
137            .field("stream", &self.stream)
138            .field("writer", &self.writer)
139            .field("state", &self.state)
140            .field("stream_terminated", &self.stream_terminated)
141            .field("is_terminated", &self.is_terminated)
142            .finish()
143    }
144}
145
146pin_project_lite::pin_project! {
147    struct WriteBuf<Wr, T, F> {
148        #[pin]
149        inner: Wr,
150        buffered: Option<T>,
151        f: F,
152        is_empty: bool,
153    }
154}
155
156impl<Wr, T, F> WriteBuf<Wr, T, F> {
157    fn new(inner: Wr, f: F) -> Self {
158        Self {
159            inner,
160            buffered: None,
161            f,
162            is_empty: false,
163        }
164    }
165
166    fn is_empty(&self) -> bool {
167        self.is_empty && self.buffered.is_none()
168    }
169
170    fn set_buffered(self: Pin<&mut Self>, t: T) {
171        *self.project().buffered = Some(t);
172    }
173
174    fn poll_inner_complete(
175        self: Pin<&mut Self>,
176        cx: &mut Context<'_>,
177    ) -> Poll<Result<Wr::Output, Wr::Error>>
178    where
179        Wr: MultipartWrite<T>,
180        F: FnMut(Wr::Ret) -> bool,
181    {
182        let mut this = self.project();
183        ready!(this.inner.as_mut().poll_flush(cx))?;
184        let output = ready!(this.inner.as_mut().poll_complete(cx))?;
185        *this.is_empty = true;
186        Poll::Ready(Ok(output))
187    }
188
189    fn poll_inner_send(
190        self: Pin<&mut Self>,
191        cx: &mut Context<'_>,
192    ) -> Poll<Result<Option<bool>, Wr::Error>>
193    where
194        Wr: MultipartWrite<T>,
195        F: FnMut(Wr::Ret) -> bool,
196    {
197        let mut this = self.project();
198
199        if this.buffered.is_some() {
200            ready!(this.inner.as_mut().poll_ready(cx))?;
201            let item = this.buffered.take().unwrap();
202            let ret = this.inner.as_mut().start_send(item).map(this.f)?;
203            *this.is_empty = false;
204            Poll::Ready(Ok(Some(ret)))
205        } else {
206            Poll::Ready(Ok(None))
207        }
208    }
209}
210
211impl<Wr: Debug, T: Debug, F> Debug for WriteBuf<Wr, T, F> {
212    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
213        f.debug_struct("WriteBuf")
214            .field("inner", &self.inner)
215            .field("buffered", &self.buffered)
216            .field("is_empty", &self.is_empty)
217            .finish()
218    }
219}