1use crate::{FusedMultipartWrite, MultipartWrite};
2
3use futures_core::ready;
4use futures_core::stream::{FusedStream, Stream};
5use std::fmt::{self, Debug, Formatter};
6use std::pin::Pin;
7use std::task::{Context, Poll};
8
9pin_project_lite::pin_project! {
10 #[must_use = "futures do nothing unless polled"]
14 pub struct FeedMultipartWrite<St: Stream, Wr, F> {
15 #[pin]
16 stream: St,
17 #[pin]
18 writer: WriteBuf<Wr, St::Item, F>,
19 state: State,
20 stream_terminated: bool,
21 is_terminated: bool,
22 }
23}
24
25#[derive(Debug, Clone, Copy)]
26enum State {
27 Write,
28 Next,
29 Complete,
30 Shutdown,
31}
32
33impl<St: Stream, Wr, F> FeedMultipartWrite<St, Wr, F> {
34 pub(super) fn new(stream: St, writer: Wr, f: F) -> Self {
35 Self {
36 stream,
37 writer: WriteBuf::new(writer, f),
38 state: State::Write,
39 stream_terminated: false,
40 is_terminated: false,
41 }
42 }
43}
44
45impl<St, Wr, F> FusedStream for FeedMultipartWrite<St, Wr, F>
46where
47 St: Stream,
48 Wr: FusedMultipartWrite<St::Item>,
49 F: FnMut(Wr::Ret) -> bool,
50{
51 fn is_terminated(&self) -> bool {
52 self.is_terminated
53 }
54}
55
56impl<St, Wr, F> Stream for FeedMultipartWrite<St, Wr, F>
57where
58 St: Stream,
59 Wr: FusedMultipartWrite<St::Item>,
60 F: FnMut(Wr::Ret) -> bool,
61{
62 type Item = Result<Wr::Output, Wr::Error>;
63
64 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
65 let mut this = self.project();
66
67 loop {
68 if this.writer.inner.is_terminated() {
71 *this.is_terminated = true;
72 return Poll::Ready(None);
73 }
74
75 let next_state = match *this.state {
76 State::Write => match ready!(this.writer.as_mut().poll_inner_send(cx)) {
84 Ok(None) => State::Next,
85 Ok(Some(b)) if b => State::Complete,
86 Ok(_) => State::Write,
87 Err(e) => {
88 *this.state = State::Write;
89 return Poll::Ready(Some(Err(e)));
90 }
91 },
92 State::Next => match ready!(this.stream.as_mut().poll_next(cx)) {
96 Some(next) => {
97 this.writer.as_mut().set_buffered(next);
98 State::Write
99 }
100 _ => {
101 *this.stream_terminated = true;
102 if this.writer.as_mut().is_empty() {
103 return Poll::Ready(None);
104 }
105 State::Complete
106 }
107 },
108 State::Complete => {
110 let output = ready!(this.writer.as_mut().poll_inner_complete(cx))?;
111 if *this.stream_terminated {
112 *this.state = State::Shutdown;
113 } else {
114 *this.state = State::Write;
115 }
116 return Poll::Ready(Some(Ok(output)));
117 }
118 State::Shutdown => {
119 *this.is_terminated = true;
120 return Poll::Ready(None);
121 }
122 };
123
124 *this.state = next_state;
125 }
126 }
127}
128
129impl<St, Wr, F> Debug for FeedMultipartWrite<St, Wr, F>
130where
131 St: Stream + Debug,
132 St::Item: Debug,
133 Wr: Debug,
134{
135 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
136 f.debug_struct("FeedMultipartWrite")
137 .field("stream", &self.stream)
138 .field("writer", &self.writer)
139 .field("state", &self.state)
140 .field("stream_terminated", &self.stream_terminated)
141 .field("is_terminated", &self.is_terminated)
142 .finish()
143 }
144}
145
146pin_project_lite::pin_project! {
147 struct WriteBuf<Wr, T, F> {
148 #[pin]
149 inner: Wr,
150 buffered: Option<T>,
151 f: F,
152 is_empty: bool,
153 }
154}
155
156impl<Wr, T, F> WriteBuf<Wr, T, F> {
157 fn new(inner: Wr, f: F) -> Self {
158 Self {
159 inner,
160 buffered: None,
161 f,
162 is_empty: false,
163 }
164 }
165
166 fn is_empty(&self) -> bool {
167 self.is_empty && self.buffered.is_none()
168 }
169
170 fn set_buffered(self: Pin<&mut Self>, t: T) {
171 *self.project().buffered = Some(t);
172 }
173
174 fn poll_inner_complete(
175 self: Pin<&mut Self>,
176 cx: &mut Context<'_>,
177 ) -> Poll<Result<Wr::Output, Wr::Error>>
178 where
179 Wr: MultipartWrite<T>,
180 F: FnMut(Wr::Ret) -> bool,
181 {
182 let mut this = self.project();
183 ready!(this.inner.as_mut().poll_flush(cx))?;
184 let output = ready!(this.inner.as_mut().poll_complete(cx))?;
185 *this.is_empty = true;
186 Poll::Ready(Ok(output))
187 }
188
189 fn poll_inner_send(
190 self: Pin<&mut Self>,
191 cx: &mut Context<'_>,
192 ) -> Poll<Result<Option<bool>, Wr::Error>>
193 where
194 Wr: MultipartWrite<T>,
195 F: FnMut(Wr::Ret) -> bool,
196 {
197 let mut this = self.project();
198
199 if this.buffered.is_some() {
200 ready!(this.inner.as_mut().poll_ready(cx))?;
201 let item = this.buffered.take().unwrap();
202 let ret = this.inner.as_mut().start_send(item).map(this.f)?;
203 *this.is_empty = false;
204 Poll::Ready(Ok(Some(ret)))
205 } else {
206 Poll::Ready(Ok(None))
207 }
208 }
209}
210
211impl<Wr: Debug, T: Debug, F> Debug for WriteBuf<Wr, T, F> {
212 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
213 f.debug_struct("WriteBuf")
214 .field("inner", &self.inner)
215 .field("buffered", &self.buffered)
216 .field("is_empty", &self.is_empty)
217 .finish()
218 }
219}