multipart_write/stream/
feed_multipart_write.rs

1use crate::{FusedMultipartWrite, MultipartWrite};
2
3use futures_core::ready;
4use futures_core::stream::{FusedStream, Stream};
5use std::fmt::{self, Debug, Formatter};
6use std::pin::Pin;
7use std::task::{Context, Poll};
8
9pin_project_lite::pin_project! {
10    /// Stream for  [`feed_multipart_write`].
11    ///
12    /// [`feed_multipart_write`]: super::MultipartStreamExt::feed_multipart_write
13    #[must_use = "futures do nothing unless polled"]
14    pub struct FeedMultipartWrite<St: Stream, Wr, F> {
15        #[pin]
16        stream: St,
17        #[pin]
18        writer: WriteBuf<Wr, St::Item, F>,
19        state: State,
20        stream_terminated: bool,
21        is_terminated: bool,
22    }
23}
24
/// Phase of the polling loop that drives a `FeedMultipartWrite`.
#[derive(Debug, Clone, Copy)]
enum State {
    /// Send the buffered item, if there is one, to the writer.
    Write,
    /// Pull the next item from the upstream stream.
    Next,
    /// Flush and complete the writer, yielding its output downstream.
    Complete,
    /// Nothing is left to produce; the stream reports its end.
    Shutdown,
}
32
33impl<St: Stream, Wr, F> FeedMultipartWrite<St, Wr, F> {
34    pub(super) fn new(stream: St, writer: Wr, f: F) -> Self {
35        Self {
36            stream,
37            writer: WriteBuf::new(writer, f),
38            state: State::Write,
39            stream_terminated: false,
40            is_terminated: false,
41        }
42    }
43}
44
45impl<St, Wr, F> FusedStream for FeedMultipartWrite<St, Wr, F>
46where
47    St: FusedStream,
48    Wr: FusedMultipartWrite<St::Item>,
49    F: FnMut(Wr::Ret) -> bool,
50{
51    fn is_terminated(&self) -> bool {
52        self.is_terminated
53    }
54}
55
56impl<St, Wr, F> Stream for FeedMultipartWrite<St, Wr, F>
57where
58    St: Stream,
59    Wr: MultipartWrite<St::Item>,
60    F: FnMut(Wr::Ret) -> bool,
61{
62    type Item = Result<Wr::Output, Wr::Error>;
63
64    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
65        let mut this = self.project();
66
67        loop {
68            let next_state = match *this.state {
69                // Try to make the writer write its buffered item.
70                // Possibilities are:
71                // * Doesn't have a buffered input, return `State::Next` to poll
72                // the stream for the next item.
73                // * Returned `Some(true)`, which implies `poll_complete`.
74                // * Returned `Some(false)`, can write more items.
75                // * Error -- return the error.
76                State::Write => match ready!(this.writer.as_mut().poll_inner_send(cx)) {
77                    Ok(None) => State::Next,
78                    Ok(Some(b)) if b => State::Complete,
79                    Ok(_) => State::Write,
80                    Err(e) => {
81                        *this.state = State::Write;
82                        return Poll::Ready(Some(Err(e)));
83                    }
84                },
85                // Poll for the next upstream item.
86                // If there is none, then do the last `poll_complete` if the
87                // writer is not empty; end now otherwise.
88                State::Next => match ready!(this.stream.as_mut().poll_next(cx)) {
89                    Some(next) => {
90                        this.writer.as_mut().set_buffered(next);
91                        State::Write
92                    }
93                    _ => {
94                        *this.stream_terminated = true;
95                        if this.writer.as_mut().is_empty() {
96                            return Poll::Ready(None);
97                        }
98                        State::Complete
99                    }
100                },
101                // Produce the next item downstream.
102                State::Complete => {
103                    let output = ready!(this.writer.as_mut().poll_inner_complete(cx))?;
104                    if *this.stream_terminated {
105                        *this.state = State::Shutdown;
106                    } else {
107                        *this.state = State::Write;
108                    }
109                    return Poll::Ready(Some(Ok(output)));
110                }
111                State::Shutdown => {
112                    *this.is_terminated = true;
113                    return Poll::Ready(None);
114                }
115            };
116
117            *this.state = next_state;
118        }
119    }
120}
121
122impl<St, Wr, F> Debug for FeedMultipartWrite<St, Wr, F>
123where
124    St: Stream + Debug,
125    St::Item: Debug,
126    Wr: Debug,
127{
128    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
129        f.debug_struct("FeedMultipartWrite")
130            .field("stream", &self.stream)
131            .field("writer", &self.writer)
132            .field("state", &self.state)
133            .field("stream_terminated", &self.stream_terminated)
134            .field("is_terminated", &self.is_terminated)
135            .finish()
136    }
137}
138
139pin_project_lite::pin_project! {
140    struct WriteBuf<Wr, T, F> {
141        #[pin]
142        inner: Wr,
143        buffered: Option<T>,
144        f: F,
145        is_empty: bool,
146    }
147}
148
149impl<Wr, T, F> WriteBuf<Wr, T, F> {
150    fn new(inner: Wr, f: F) -> Self {
151        Self {
152            inner,
153            buffered: None,
154            f,
155            is_empty: false,
156        }
157    }
158
159    fn is_empty(&self) -> bool {
160        self.is_empty && self.buffered.is_none()
161    }
162
163    fn set_buffered(self: Pin<&mut Self>, t: T) {
164        *self.project().buffered = Some(t);
165    }
166
167    fn poll_inner_complete(
168        self: Pin<&mut Self>,
169        cx: &mut Context<'_>,
170    ) -> Poll<Result<Wr::Output, Wr::Error>>
171    where
172        Wr: MultipartWrite<T>,
173        F: FnMut(Wr::Ret) -> bool,
174    {
175        let mut this = self.project();
176        ready!(this.inner.as_mut().poll_flush(cx))?;
177        let output = ready!(this.inner.as_mut().poll_complete(cx))?;
178        *this.is_empty = true;
179        Poll::Ready(Ok(output))
180    }
181
182    fn poll_inner_send(
183        self: Pin<&mut Self>,
184        cx: &mut Context<'_>,
185    ) -> Poll<Result<Option<bool>, Wr::Error>>
186    where
187        Wr: MultipartWrite<T>,
188        F: FnMut(Wr::Ret) -> bool,
189    {
190        let mut this = self.project();
191
192        if this.buffered.is_some() {
193            ready!(this.inner.as_mut().poll_ready(cx))?;
194            let item = this.buffered.take().unwrap();
195            let ret = this.inner.as_mut().start_send(item).map(this.f)?;
196            *this.is_empty = false;
197            Poll::Ready(Ok(Some(ret)))
198        } else {
199            Poll::Ready(Ok(None))
200        }
201    }
202}
203
204impl<Wr: Debug, T: Debug, F> Debug for WriteBuf<Wr, T, F> {
205    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
206        f.debug_struct("WriteBuf")
207            .field("inner", &self.inner)
208            .field("buffered", &self.buffered)
209            .field("is_empty", &self.is_empty)
210            .finish()
211    }
212}