1use crate::{FusedMultipartWrite, MultipartWrite};
2
3use futures_core::ready;
4use futures_core::stream::{FusedStream, Stream};
5use std::fmt::{self, Debug, Formatter};
6use std::pin::Pin;
7use std::task::{Context, Poll};
8
9pin_project_lite::pin_project! {
10 #[must_use = "futures do nothing unless polled"]
14 pub struct FeedMultipartWrite<St: Stream, Wr, F> {
15 #[pin]
16 stream: St,
17 #[pin]
18 writer: WriteBuf<Wr, St::Item, F>,
19 state: State,
20 stream_terminated: bool,
21 is_terminated: bool,
22 }
23}
24
/// The step that `FeedMultipartWrite::poll_next` performs next.
#[derive(Debug, Clone, Copy)]
enum State {
    /// Try to send the buffered item (if any) to the writer.
    Write,
    /// Pull the next item from the source stream.
    Next,
    /// Flush and complete the writer, then yield its output.
    Complete,
    /// The final output has been yielded; the next poll ends the stream.
    Shutdown,
}
32
33impl<St: Stream, Wr, F> FeedMultipartWrite<St, Wr, F> {
34 pub(super) fn new(stream: St, writer: Wr, f: F) -> Self {
35 Self {
36 stream,
37 writer: WriteBuf::new(writer, f),
38 state: State::Write,
39 stream_terminated: false,
40 is_terminated: false,
41 }
42 }
43}
44
45impl<St, Wr, F> FusedStream for FeedMultipartWrite<St, Wr, F>
46where
47 St: FusedStream,
48 Wr: FusedMultipartWrite<St::Item>,
49 F: FnMut(Wr::Ret) -> bool,
50{
51 fn is_terminated(&self) -> bool {
52 self.is_terminated
53 }
54}
55
56impl<St, Wr, F> Stream for FeedMultipartWrite<St, Wr, F>
57where
58 St: Stream,
59 Wr: MultipartWrite<St::Item>,
60 F: FnMut(Wr::Ret) -> bool,
61{
62 type Item = Result<Wr::Output, Wr::Error>;
63
64 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
65 let mut this = self.project();
66
67 loop {
68 let next_state = match *this.state {
69 State::Write => match ready!(this.writer.as_mut().poll_inner_send(cx)) {
77 Ok(None) => State::Next,
78 Ok(Some(b)) if b => State::Complete,
79 Ok(_) => State::Write,
80 Err(e) => {
81 *this.state = State::Write;
82 return Poll::Ready(Some(Err(e)));
83 }
84 },
85 State::Next => match ready!(this.stream.as_mut().poll_next(cx)) {
89 Some(next) => {
90 this.writer.as_mut().set_buffered(next);
91 State::Write
92 }
93 _ => {
94 *this.stream_terminated = true;
95 if this.writer.as_mut().is_empty() {
96 return Poll::Ready(None);
97 }
98 State::Complete
99 }
100 },
101 State::Complete => {
103 let output = ready!(this.writer.as_mut().poll_inner_complete(cx))?;
104 if *this.stream_terminated {
105 *this.state = State::Shutdown;
106 } else {
107 *this.state = State::Write;
108 }
109 return Poll::Ready(Some(Ok(output)));
110 }
111 State::Shutdown => {
112 *this.is_terminated = true;
113 return Poll::Ready(None);
114 }
115 };
116
117 *this.state = next_state;
118 }
119 }
120}
121
122impl<St, Wr, F> Debug for FeedMultipartWrite<St, Wr, F>
123where
124 St: Stream + Debug,
125 St::Item: Debug,
126 Wr: Debug,
127{
128 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
129 f.debug_struct("FeedMultipartWrite")
130 .field("stream", &self.stream)
131 .field("writer", &self.writer)
132 .field("state", &self.state)
133 .field("stream_terminated", &self.stream_terminated)
134 .field("is_terminated", &self.is_terminated)
135 .finish()
136 }
137}
138
pin_project_lite::pin_project! {
    // A writer paired with a one-item buffer and the predicate applied to the
    // value returned by each send.
    struct WriteBuf<Wr, T, F> {
        // The wrapped writer.
        #[pin]
        inner: Wr,
        // Item waiting to be sent to `inner`, if any.
        buffered: Option<T>,
        // Predicate applied to the value returned by `start_send`.
        f: F,
        // Set to `true` after a successful completion and to `false` after
        // each successful send.
        is_empty: bool,
    }
}
148
149impl<Wr, T, F> WriteBuf<Wr, T, F> {
150 fn new(inner: Wr, f: F) -> Self {
151 Self {
152 inner,
153 buffered: None,
154 f,
155 is_empty: false,
156 }
157 }
158
159 fn is_empty(&self) -> bool {
160 self.is_empty && self.buffered.is_none()
161 }
162
163 fn set_buffered(self: Pin<&mut Self>, t: T) {
164 *self.project().buffered = Some(t);
165 }
166
167 fn poll_inner_complete(
168 self: Pin<&mut Self>,
169 cx: &mut Context<'_>,
170 ) -> Poll<Result<Wr::Output, Wr::Error>>
171 where
172 Wr: MultipartWrite<T>,
173 F: FnMut(Wr::Ret) -> bool,
174 {
175 let mut this = self.project();
176 ready!(this.inner.as_mut().poll_flush(cx))?;
177 let output = ready!(this.inner.as_mut().poll_complete(cx))?;
178 *this.is_empty = true;
179 Poll::Ready(Ok(output))
180 }
181
182 fn poll_inner_send(
183 self: Pin<&mut Self>,
184 cx: &mut Context<'_>,
185 ) -> Poll<Result<Option<bool>, Wr::Error>>
186 where
187 Wr: MultipartWrite<T>,
188 F: FnMut(Wr::Ret) -> bool,
189 {
190 let mut this = self.project();
191
192 if this.buffered.is_some() {
193 ready!(this.inner.as_mut().poll_ready(cx))?;
194 let item = this.buffered.take().unwrap();
195 let ret = this.inner.as_mut().start_send(item).map(this.f)?;
196 *this.is_empty = false;
197 Poll::Ready(Ok(Some(ret)))
198 } else {
199 Poll::Ready(Ok(None))
200 }
201 }
202}
203
204impl<Wr: Debug, T: Debug, F> Debug for WriteBuf<Wr, T, F> {
205 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
206 f.debug_struct("WriteBuf")
207 .field("inner", &self.inner)
208 .field("buffered", &self.buffered)
209 .field("is_empty", &self.is_empty)
210 .finish()
211 }
212}