multipart_write/stream/
feed_multipart_write.rs

1use crate::{FusedMultipartWrite, MultipartWrite};
2
3use futures::ready;
4use futures::stream::{FusedStream, Stream};
5use std::fmt::{self, Debug, Formatter};
6use std::pin::Pin;
7use std::task::{Context, Poll};
8
9/// Stream for  [`feed_multipart_write`].
10///
11/// [`feed_multipart_write`]: super::MultipartStreamExt::feed_multipart_write
12#[derive(Debug)]
13#[must_use = "futures do nothing unless polled"]
14#[pin_project::pin_project]
15pub struct FeedMultipartWrite<St: Stream, Wr, F> {
16    #[pin]
17    stream: St,
18    #[pin]
19    writer: WriteBuf<Wr, St::Item, F>,
20    state: State,
21    stream_terminated: bool,
22}
23
24#[derive(Debug, Clone, Copy)]
25enum State {
26    Write,
27    Next,
28    Complete,
29    Shutdown,
30}
31
32impl<St: Stream, Wr, F> FeedMultipartWrite<St, Wr, F> {
33    pub(super) fn new(stream: St, writer: Wr, f: F) -> Self {
34        Self {
35            stream,
36            writer: WriteBuf::new(writer, f),
37            state: State::Write,
38            stream_terminated: false,
39        }
40    }
41}
42
43impl<St, Wr, F> FusedStream for FeedMultipartWrite<St, Wr, F>
44where
45    St: Stream,
46    Wr: FusedMultipartWrite<St::Item>,
47    F: FnMut(Wr::Ret) -> bool,
48{
49    fn is_terminated(&self) -> bool {
50        self.stream_terminated || self.writer.inner.is_terminated()
51    }
52}
53
54impl<St, Wr, F> Stream for FeedMultipartWrite<St, Wr, F>
55where
56    St: Stream,
57    Wr: MultipartWrite<St::Item>,
58    F: FnMut(Wr::Ret) -> bool,
59{
60    type Item = Result<Wr::Output, Wr::Error>;
61
62    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
63        let mut this = self.project();
64
65        loop {
66            let next_state = match *this.state {
67                // Try to make the writer write its buffered item.
68                // Possibilities are:
69                // * Doesn't have a buffered input, return `State::Next` to poll
70                // the stream for the next item.
71                // * Returned `Some(true)`, which implies `poll_complete`.
72                // * Returned `Some(false)`, can write more items.
73                // * Error -- return the error.
74                State::Write => match ready!(this.writer.as_mut().poll_inner_send(cx)) {
75                    Ok(None) => State::Next,
76                    Ok(Some(b)) if b => State::Complete,
77                    Ok(_) => State::Write,
78                    Err(e) => {
79                        *this.state = State::Write;
80                        return Poll::Ready(Some(Err(e)));
81                    }
82                },
83                // Poll for the next upstream item.
84                // If there is none, then do the last `poll_complete` if the
85                // writer is not empty; end now otherwise.
86                State::Next => match ready!(this.stream.as_mut().poll_next(cx)) {
87                    Some(next) => {
88                        this.writer.as_mut().set_buffered(next);
89                        State::Write
90                    }
91                    _ => {
92                        *this.stream_terminated = true;
93                        if this.writer.as_mut().is_empty() {
94                            return Poll::Ready(None);
95                        }
96                        State::Complete
97                    }
98                },
99                // Produce the next item downstream.
100                State::Complete => {
101                    let output = ready!(this.writer.as_mut().poll_inner_complete(cx))?;
102                    if *this.stream_terminated {
103                        *this.state = State::Shutdown;
104                    } else {
105                        *this.state = State::Write;
106                    }
107                    return Poll::Ready(Some(Ok(output)));
108                }
109                State::Shutdown => return Poll::Ready(None),
110            };
111
112            *this.state = next_state;
113        }
114    }
115}
116
117#[pin_project::pin_project]
118struct WriteBuf<Wr, T, F> {
119    #[pin]
120    inner: Wr,
121    buffered: Option<T>,
122    f: F,
123    is_empty: bool,
124}
125
126impl<Wr, T, F> WriteBuf<Wr, T, F> {
127    fn new(inner: Wr, f: F) -> Self {
128        Self {
129            inner,
130            buffered: None,
131            f,
132            is_empty: false,
133        }
134    }
135
136    fn is_empty(&self) -> bool {
137        self.is_empty && self.buffered.is_none()
138    }
139
140    fn set_buffered(self: Pin<&mut Self>, t: T) {
141        *self.project().buffered = Some(t);
142    }
143
144    fn poll_inner_complete(
145        self: Pin<&mut Self>,
146        cx: &mut Context<'_>,
147    ) -> Poll<Result<Wr::Output, Wr::Error>>
148    where
149        Wr: MultipartWrite<T>,
150        F: FnMut(Wr::Ret) -> bool,
151    {
152        let mut this = self.project();
153        ready!(this.inner.as_mut().poll_flush(cx))?;
154        let output = ready!(this.inner.as_mut().poll_complete(cx))?;
155        *this.is_empty = true;
156        Poll::Ready(Ok(output))
157    }
158
159    fn poll_inner_send(
160        self: Pin<&mut Self>,
161        cx: &mut Context<'_>,
162    ) -> Poll<Result<Option<bool>, Wr::Error>>
163    where
164        Wr: MultipartWrite<T>,
165        F: FnMut(Wr::Ret) -> bool,
166    {
167        let mut this = self.project();
168
169        if this.buffered.is_some() {
170            ready!(this.inner.as_mut().poll_ready(cx))?;
171            let item = this.buffered.take().unwrap();
172            let ret = this.inner.as_mut().start_send(item).map(this.f)?;
173            *this.is_empty = false;
174            Poll::Ready(Ok(Some(ret)))
175        } else {
176            Poll::Ready(Ok(None))
177        }
178    }
179}
180
181impl<Wr: Debug, T: Debug, F> Debug for WriteBuf<Wr, T, F> {
182    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
183        f.debug_struct("WriteBuf")
184            .field("inner", &self.inner)
185            .field("buffered", &self.buffered)
186            .field("is_empty", &self.is_empty)
187            .finish()
188    }
189}