1use crate::{FusedMultipartWrite, MultipartWrite};
2
3use futures::ready;
4use futures::stream::{FusedStream, Stream};
5use std::fmt::{self, Debug, Formatter};
6use std::pin::Pin;
7use std::task::{Context, Poll};
8
9#[derive(Debug)]
13#[must_use = "futures do nothing unless polled"]
14#[pin_project::pin_project]
15pub struct FeedMultipartWrite<St: Stream, Wr, F> {
16 #[pin]
17 stream: St,
18 #[pin]
19 writer: WriteBuf<Wr, St::Item, F>,
20 state: State,
21 stream_terminated: bool,
22}
23
24#[derive(Debug, Clone, Copy)]
25enum State {
26 Write,
27 Next,
28 Complete,
29 Shutdown,
30}
31
32impl<St: Stream, Wr, F> FeedMultipartWrite<St, Wr, F> {
33 pub(super) fn new(stream: St, writer: Wr, f: F) -> Self {
34 Self {
35 stream,
36 writer: WriteBuf::new(writer, f),
37 state: State::Write,
38 stream_terminated: false,
39 }
40 }
41}
42
43impl<St, Wr, F> FusedStream for FeedMultipartWrite<St, Wr, F>
44where
45 St: Stream,
46 Wr: FusedMultipartWrite<St::Item>,
47 F: FnMut(Wr::Ret) -> bool,
48{
49 fn is_terminated(&self) -> bool {
50 self.stream_terminated || self.writer.inner.is_terminated()
51 }
52}
53
54impl<St, Wr, F> Stream for FeedMultipartWrite<St, Wr, F>
55where
56 St: Stream,
57 Wr: MultipartWrite<St::Item>,
58 F: FnMut(Wr::Ret) -> bool,
59{
60 type Item = Result<Wr::Output, Wr::Error>;
61
62 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
63 let mut this = self.project();
64
65 loop {
66 let next_state = match *this.state {
67 State::Write => match ready!(this.writer.as_mut().poll_inner_send(cx)) {
75 Ok(None) => State::Next,
76 Ok(Some(b)) if b => State::Complete,
77 Ok(_) => State::Write,
78 Err(e) => {
79 *this.state = State::Write;
80 return Poll::Ready(Some(Err(e)));
81 }
82 },
83 State::Next => match ready!(this.stream.as_mut().poll_next(cx)) {
87 Some(next) => {
88 this.writer.as_mut().set_buffered(next);
89 State::Write
90 }
91 _ => {
92 *this.stream_terminated = true;
93 if this.writer.as_mut().is_empty() {
94 return Poll::Ready(None);
95 }
96 State::Complete
97 }
98 },
99 State::Complete => {
101 let output = ready!(this.writer.as_mut().poll_inner_complete(cx))?;
102 if *this.stream_terminated {
103 *this.state = State::Shutdown;
104 } else {
105 *this.state = State::Write;
106 }
107 return Poll::Ready(Some(Ok(output)));
108 }
109 State::Shutdown => return Poll::Ready(None),
110 };
111
112 *this.state = next_state;
113 }
114 }
115}
116
117#[pin_project::pin_project]
118struct WriteBuf<Wr, T, F> {
119 #[pin]
120 inner: Wr,
121 buffered: Option<T>,
122 f: F,
123 is_empty: bool,
124}
125
126impl<Wr, T, F> WriteBuf<Wr, T, F> {
127 fn new(inner: Wr, f: F) -> Self {
128 Self {
129 inner,
130 buffered: None,
131 f,
132 is_empty: false,
133 }
134 }
135
136 fn is_empty(&self) -> bool {
137 self.is_empty && self.buffered.is_none()
138 }
139
140 fn set_buffered(self: Pin<&mut Self>, t: T) {
141 *self.project().buffered = Some(t);
142 }
143
144 fn poll_inner_complete(
145 self: Pin<&mut Self>,
146 cx: &mut Context<'_>,
147 ) -> Poll<Result<Wr::Output, Wr::Error>>
148 where
149 Wr: MultipartWrite<T>,
150 F: FnMut(Wr::Ret) -> bool,
151 {
152 let mut this = self.project();
153 ready!(this.inner.as_mut().poll_flush(cx))?;
154 let output = ready!(this.inner.as_mut().poll_complete(cx))?;
155 *this.is_empty = true;
156 Poll::Ready(Ok(output))
157 }
158
159 fn poll_inner_send(
160 self: Pin<&mut Self>,
161 cx: &mut Context<'_>,
162 ) -> Poll<Result<Option<bool>, Wr::Error>>
163 where
164 Wr: MultipartWrite<T>,
165 F: FnMut(Wr::Ret) -> bool,
166 {
167 let mut this = self.project();
168
169 if this.buffered.is_some() {
170 ready!(this.inner.as_mut().poll_ready(cx))?;
171 let item = this.buffered.take().unwrap();
172 let ret = this.inner.as_mut().start_send(item).map(this.f)?;
173 *this.is_empty = false;
174 Poll::Ready(Ok(Some(ret)))
175 } else {
176 Poll::Ready(Ok(None))
177 }
178 }
179}
180
181impl<Wr: Debug, T: Debug, F> Debug for WriteBuf<Wr, T, F> {
182 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
183 f.debug_struct("WriteBuf")
184 .field("inner", &self.inner)
185 .field("buffered", &self.buffered)
186 .field("is_empty", &self.is_empty)
187 .finish()
188 }
189}