1use crate::body::common::length_from_headers;
2use futures::prelude::*;
3use std::cmp::min;
4use std::io;
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
8pub struct BodyEncode<IO: AsyncWrite + Unpin> {
9 transport: IO,
10 state: BodyEncodeState,
11}
12
13impl<IO: AsyncWrite + Unpin> BodyEncode<IO> {
14 pub fn new(transport: IO, length: Option<u64>) -> Self {
15 BodyEncodeState::new(length).into_async_write(transport)
16 }
17 pub fn checkpoint(self) -> (IO, BodyEncodeState) {
18 (self.transport, self.state)
19 }
20 pub fn from_headers(headers: &http::header::HeaderMap, transport: IO) -> anyhow::Result<Self> {
21 Ok(BodyEncodeState::from_headers(headers)?.into_async_write(transport))
22 }
23}
24
25impl<IO: AsyncWrite + Unpin> AsyncWrite for BodyEncode<IO> {
26 fn poll_write(
27 self: Pin<&mut Self>,
28 cx: &mut Context<'_>,
29 buf: &[u8],
30 ) -> Poll<io::Result<usize>> {
31 let this = self.get_mut();
32 this.state.poll_write(&mut this.transport, cx, buf)
33 }
34
35 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
36 let this = self.get_mut();
37 this.state.poll_flush(&mut this.transport, cx)
38 }
39
40 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
41 let this = self.get_mut();
42 this.state.poll_close(&mut this.transport, cx)
43 }
44}
45
46pub enum BodyEncodeState {
47 Fixed {
48 _compression_state: (),
49 remaining: u64,
50 },
51 Chunked(Chunked),
52 Failed,
53 Closed,
54}
55
/// Builds an already-completed poll result carrying an I/O error of `kind`.
fn err_kind<T>(kind: io::ErrorKind) -> Poll<io::Result<T>> {
    let error = io::Error::from(kind);
    Poll::Ready(Err(error))
}
59
60impl BodyEncodeState {
61 pub fn from_headers(headers: &http::header::HeaderMap) -> anyhow::Result<Self> {
62 Ok(Self::new(length_from_headers(headers)?))
63 }
64 pub fn new(length: Option<u64>) -> Self {
65 match length {
66 None => Self::Chunked(Chunked {
67 buffer: [0u8; 1300],
68 buffered: 0,
69 written: None,
70 closing: false,
71 }),
72 Some(remaining) => Self::Fixed {
73 _compression_state: (),
74 remaining,
75 },
76 }
77 }
78 pub fn into_async_write<IO: AsyncWrite + Unpin>(self, transport: IO) -> BodyEncode<IO> {
79 BodyEncode {
80 transport,
81 state: self,
82 }
83 }
84 pub fn poll_write<IO: AsyncWrite + Unpin>(
85 &mut self,
86 mut transport: IO,
87 cx: &mut Context<'_>,
88 buf: &[u8],
89 ) -> Poll<io::Result<usize>> {
90 match self {
91 BodyEncodeState::Fixed { remaining: 0, .. } => {
92 return match buf.len() {
93 0 => Poll::Ready(Ok(0)),
94 _ => err_kind(io::ErrorKind::InvalidData),
95 };
96 }
97 BodyEncodeState::Fixed { remaining, .. } => {
98 let max_len = match (buf.len() as u64) < *remaining {
99 true => buf.len(),
100 false => *remaining as usize,
101 };
102 return match Pin::new(&mut transport).poll_write(cx, &buf[0..max_len]) {
103 Poll::Ready(Err(err)) => {
104 *self = BodyEncodeState::Failed;
105 Poll::Ready(Err(err))
106 }
107 Poll::Ready(Ok(n)) => {
108 *remaining -= n as u64;
109 Poll::Ready(Ok(n))
110 }
111 Poll::Pending => Poll::Pending,
112 };
113 }
114 BodyEncodeState::Chunked(chunked) => match chunked.poll_write(transport, cx, buf) {
115 Poll::Ready(Err(err)) => {
116 *self = BodyEncodeState::Failed;
117 Poll::Ready(Err(err))
118 }
119 p => p,
120 },
121 BodyEncodeState::Failed => err_kind(io::ErrorKind::BrokenPipe),
122 BodyEncodeState::Closed => err_kind(io::ErrorKind::BrokenPipe),
123 }
124 }
125 fn poll_flush<IO: AsyncWrite + Unpin>(
126 &mut self,
127 mut transport: IO,
128 cx: &mut Context<'_>,
129 ) -> Poll<io::Result<()>> {
130 match self {
131 BodyEncodeState::Fixed { .. } => match Pin::new(&mut transport).poll_flush(cx) {
132 Poll::Ready(Err(err)) => {
133 *self = BodyEncodeState::Failed;
134 Poll::Ready(Err(err))
135 }
136 p => p,
137 },
138 BodyEncodeState::Chunked(chunked) => match chunked.poll_flush(transport, cx) {
139 Poll::Ready(Err(err)) => {
140 *self = BodyEncodeState::Failed;
141 Poll::Ready(Err(err))
142 }
143 p => p,
144 },
145 BodyEncodeState::Failed => err_kind(io::ErrorKind::BrokenPipe),
146 BodyEncodeState::Closed => err_kind(io::ErrorKind::BrokenPipe),
147 }
148 }
149 fn poll_close<IO: AsyncWrite + Unpin>(
150 &mut self,
151 mut transport: IO,
152 cx: &mut Context<'_>,
153 ) -> Poll<io::Result<()>> {
154 match self {
155 BodyEncodeState::Fixed { .. } => match Pin::new(&mut transport).poll_close(cx) {
156 Poll::Ready(Err(err)) => {
157 *self = BodyEncodeState::Failed;
158 Poll::Ready(Err(err))
159 }
160 Poll::Ready(Ok(())) => {
161 *self = BodyEncodeState::Closed;
162 Poll::Ready(Ok(()))
163 }
164 Poll::Pending => Poll::Pending,
165 },
166 BodyEncodeState::Chunked(chunked) => match chunked.poll_close(transport, cx) {
167 Poll::Ready(Err(err)) => {
168 *self = BodyEncodeState::Failed;
169 Poll::Ready(Err(err))
170 }
171 Poll::Ready(Ok(())) => {
172 *self = BodyEncodeState::Closed;
173 Poll::Ready(Ok(()))
174 }
175 Poll::Pending => Poll::Pending,
176 },
177 BodyEncodeState::Failed => err_kind(io::ErrorKind::BrokenPipe),
178 BodyEncodeState::Closed => Poll::Ready(Ok(())),
179 }
180 }
181}
182
/// Buffer and progress bookkeeping for chunked framing.
///
/// The buffer holds one chunk at a time: a few bytes reserved for the chunk
/// header, the payload, and a trailing line terminator.
pub struct Chunked {
    /// Backing storage for the chunk header, payload and trailer.
    buffer: [u8; 1300],
    /// Number of payload bytes currently stored after the reserved header area.
    buffered: usize,
    /// `Some(offset)` while a finalized chunk is being sent; `offset` is the
    /// index of the next buffer byte to hand to the transport. `None` while
    /// the buffer is still accepting payload.
    written: Option<usize>,
    /// Set once the terminating zero-length chunk has been prepared.
    closing: bool,
}
189
/// Bytes reserved at the front of the chunk buffer for the chunk header: the
/// hexadecimal size digits followed by `\r\n`.
const BUFFER_HEAD: usize = 5;
/// Bytes reserved after the payload for the `\r\n` that ends each chunk.
const BUFFER_TAIL: usize = 2;
192
193impl Chunked {
194 fn poll_write<IO: AsyncWrite + Unpin>(
195 &mut self,
196 mut transport: IO,
197 cx: &mut Context<'_>,
198 buf: &[u8],
199 ) -> Poll<io::Result<usize>> {
200 loop {
201 if self.closing && buf.len() > 0 {
202 return err_kind(io::ErrorKind::InvalidData);
203 }
204 let mut n = 0;
205 if self.written == None {
206 n += self.append(buf);
207 }
208 return match self.poll(&mut transport, cx) {
209 Poll::Pending => match n {
210 0 => Poll::Pending,
211 n => Poll::Ready(Ok(n)),
212 },
213 Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
214 Poll::Ready(Ok(())) => match n {
215 0 => continue,
216 n => Poll::Ready(Ok(n)),
217 },
218 };
219 }
220 }
221 fn poll_flush<IO: AsyncWrite + Unpin>(
222 &mut self,
223 mut transport: IO,
224 cx: &mut Context<'_>,
225 ) -> Poll<io::Result<()>> {
226 if self.buffered > 0 && self.written == None {
227 self.finalize_chunk();
228 }
229 while self.written != None {
230 match self.poll(Pin::new(&mut transport), cx) {
231 Poll::Ready(Ok(())) => {}
232 p => return p,
233 }
234 }
235 Pin::new(&mut transport).poll_flush(cx)
236 }
237 fn poll_close<IO: AsyncWrite + Unpin>(
238 &mut self,
239 mut transport: IO,
240 cx: &mut Context<'_>,
241 ) -> Poll<io::Result<()>> {
242 while self.written != None || !self.closing {
243 if self.written == None {
244 if self.buffered == 0 {
245 self.closing = true;
246 }
247 self.finalize_chunk();
248 }
249 match self.poll(Pin::new(&mut transport), cx) {
250 Poll::Ready(Ok(())) => {}
251 p => return p,
252 }
253 }
254 Pin::new(&mut transport).poll_close(cx)
255 }
256 fn append(&mut self, buf: &[u8]) -> usize {
257 let off = BUFFER_HEAD + self.buffered;
258 let n = min(buf.len(), self.buffer.len() - off - BUFFER_TAIL);
259 self.buffer[off..off + n].copy_from_slice(&buf[0..n]);
260 self.buffered += n;
261 if self.buffered + BUFFER_TAIL + BUFFER_HEAD == self.buffer.len() {
262 self.finalize_chunk();
263 }
264 n
265 }
266 fn finalize_chunk(&mut self) {
267 self.buffer[BUFFER_HEAD - 2..BUFFER_HEAD].copy_from_slice(b"\r\n");
268 let end = BUFFER_HEAD + self.buffered + BUFFER_TAIL;
269 self.buffer[end - 2..end].copy_from_slice(b"\r\n");
270 let mut len = self.buffered;
271 let mut start = BUFFER_HEAD - 2;
272 while len > 0 || start == BUFFER_HEAD - 2 {
273 let digit = len & 15;
274 len /= 16;
275 start -= 1;
276 self.buffer[start] = match digit {
277 0..=9 => b'0' + digit as u8,
278 10..=15 => b'A' - 10 + digit as u8,
279 _ => unreachable!(),
280 };
281 }
282 self.written = Some(start);
283 }
284 fn poll<IO: AsyncWrite + Unpin>(
285 &mut self,
286 mut transport: IO,
287 cx: &mut Context<'_>,
288 ) -> Poll<io::Result<()>> {
289 match self.written {
290 None => Poll::Ready(Ok(())),
291 Some(written) => {
292 let end = BUFFER_HEAD + self.buffered + BUFFER_TAIL;
293 match Pin::new(&mut transport).poll_write(cx, &self.buffer[written..end]) {
294 Poll::Ready(Ok(n)) => {
295 self.written = Some(written + n);
296 if self.written == Some(end) {
297 self.buffered = 0;
298 self.written = None;
299 }
300 Poll::Ready(Ok(()))
301 }
302 Poll::Pending => Poll::Pending,
303 Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
304 }
305 }
306 }
307 }
308}