// async_http_codec/body/encode.rs

1use crate::body::common::length_from_headers;
2use futures::prelude::*;
3use std::cmp::min;
4use std::io;
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
8pub struct BodyEncode<IO: AsyncWrite + Unpin> {
9    transport: IO,
10    state: BodyEncodeState,
11}
12
13impl<IO: AsyncWrite + Unpin> BodyEncode<IO> {
14    pub fn new(transport: IO, length: Option<u64>) -> Self {
15        BodyEncodeState::new(length).into_async_write(transport)
16    }
17    pub fn checkpoint(self) -> (IO, BodyEncodeState) {
18        (self.transport, self.state)
19    }
20    pub fn from_headers(headers: &http::header::HeaderMap, transport: IO) -> anyhow::Result<Self> {
21        Ok(BodyEncodeState::from_headers(headers)?.into_async_write(transport))
22    }
23}
24
25impl<IO: AsyncWrite + Unpin> AsyncWrite for BodyEncode<IO> {
26    fn poll_write(
27        self: Pin<&mut Self>,
28        cx: &mut Context<'_>,
29        buf: &[u8],
30    ) -> Poll<io::Result<usize>> {
31        let this = self.get_mut();
32        this.state.poll_write(&mut this.transport, cx, buf)
33    }
34
35    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
36        let this = self.get_mut();
37        this.state.poll_flush(&mut this.transport, cx)
38    }
39
40    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
41        let this = self.get_mut();
42        this.state.poll_close(&mut this.transport, cx)
43    }
44}
45
46pub enum BodyEncodeState {
47    Fixed {
48        _compression_state: (),
49        remaining: u64,
50    },
51    Chunked(Chunked),
52    Failed,
53    Closed,
54}
55
/// Builds an already-completed poll result carrying an I/O error of `kind`.
fn err_kind<T>(kind: io::ErrorKind) -> Poll<io::Result<T>> {
    let error = io::Error::from(kind);
    Poll::Ready(Err(error))
}
59
60impl BodyEncodeState {
61    pub fn from_headers(headers: &http::header::HeaderMap) -> anyhow::Result<Self> {
62        Ok(Self::new(length_from_headers(headers)?))
63    }
64    pub fn new(length: Option<u64>) -> Self {
65        match length {
66            None => Self::Chunked(Chunked {
67                buffer: [0u8; 1300],
68                buffered: 0,
69                written: None,
70                closing: false,
71            }),
72            Some(remaining) => Self::Fixed {
73                _compression_state: (),
74                remaining,
75            },
76        }
77    }
78    pub fn into_async_write<IO: AsyncWrite + Unpin>(self, transport: IO) -> BodyEncode<IO> {
79        BodyEncode {
80            transport,
81            state: self,
82        }
83    }
84    pub fn poll_write<IO: AsyncWrite + Unpin>(
85        &mut self,
86        mut transport: IO,
87        cx: &mut Context<'_>,
88        buf: &[u8],
89    ) -> Poll<io::Result<usize>> {
90        match self {
91            BodyEncodeState::Fixed { remaining: 0, .. } => {
92                return match buf.len() {
93                    0 => Poll::Ready(Ok(0)),
94                    _ => err_kind(io::ErrorKind::InvalidData),
95                };
96            }
97            BodyEncodeState::Fixed { remaining, .. } => {
98                let max_len = match (buf.len() as u64) < *remaining {
99                    true => buf.len(),
100                    false => *remaining as usize,
101                };
102                return match Pin::new(&mut transport).poll_write(cx, &buf[0..max_len]) {
103                    Poll::Ready(Err(err)) => {
104                        *self = BodyEncodeState::Failed;
105                        Poll::Ready(Err(err))
106                    }
107                    Poll::Ready(Ok(n)) => {
108                        *remaining -= n as u64;
109                        Poll::Ready(Ok(n))
110                    }
111                    Poll::Pending => Poll::Pending,
112                };
113            }
114            BodyEncodeState::Chunked(chunked) => match chunked.poll_write(transport, cx, buf) {
115                Poll::Ready(Err(err)) => {
116                    *self = BodyEncodeState::Failed;
117                    Poll::Ready(Err(err))
118                }
119                p => p,
120            },
121            BodyEncodeState::Failed => err_kind(io::ErrorKind::BrokenPipe),
122            BodyEncodeState::Closed => err_kind(io::ErrorKind::BrokenPipe),
123        }
124    }
125    fn poll_flush<IO: AsyncWrite + Unpin>(
126        &mut self,
127        mut transport: IO,
128        cx: &mut Context<'_>,
129    ) -> Poll<io::Result<()>> {
130        match self {
131            BodyEncodeState::Fixed { .. } => match Pin::new(&mut transport).poll_flush(cx) {
132                Poll::Ready(Err(err)) => {
133                    *self = BodyEncodeState::Failed;
134                    Poll::Ready(Err(err))
135                }
136                p => p,
137            },
138            BodyEncodeState::Chunked(chunked) => match chunked.poll_flush(transport, cx) {
139                Poll::Ready(Err(err)) => {
140                    *self = BodyEncodeState::Failed;
141                    Poll::Ready(Err(err))
142                }
143                p => p,
144            },
145            BodyEncodeState::Failed => err_kind(io::ErrorKind::BrokenPipe),
146            BodyEncodeState::Closed => err_kind(io::ErrorKind::BrokenPipe),
147        }
148    }
149    fn poll_close<IO: AsyncWrite + Unpin>(
150        &mut self,
151        mut transport: IO,
152        cx: &mut Context<'_>,
153    ) -> Poll<io::Result<()>> {
154        match self {
155            BodyEncodeState::Fixed { .. } => match Pin::new(&mut transport).poll_close(cx) {
156                Poll::Ready(Err(err)) => {
157                    *self = BodyEncodeState::Failed;
158                    Poll::Ready(Err(err))
159                }
160                Poll::Ready(Ok(())) => {
161                    *self = BodyEncodeState::Closed;
162                    Poll::Ready(Ok(()))
163                }
164                Poll::Pending => Poll::Pending,
165            },
166            BodyEncodeState::Chunked(chunked) => match chunked.poll_close(transport, cx) {
167                Poll::Ready(Err(err)) => {
168                    *self = BodyEncodeState::Failed;
169                    Poll::Ready(Err(err))
170                }
171                Poll::Ready(Ok(())) => {
172                    *self = BodyEncodeState::Closed;
173                    Poll::Ready(Ok(()))
174                }
175                Poll::Pending => Poll::Pending,
176            },
177            BodyEncodeState::Failed => err_kind(io::ErrorKind::BrokenPipe),
178            BodyEncodeState::Closed => Poll::Ready(Ok(())),
179        }
180    }
181}
182
/// Buffer and progress bookkeeping for chunked body encoding.
pub struct Chunked {
    /// Storage for one chunk: size line, payload and trailing CRLF.
    buffer: [u8; 1300],
    /// Number of payload bytes currently held in `buffer`.
    buffered: usize,
    /// `Some(offset)` while a finalized chunk is being sent, where `offset` is
    /// the index of the next byte of `buffer` to write; `None` while the
    /// buffer is still collecting payload.
    written: Option<usize>,
    /// Set once the terminating zero-length chunk has been prepared.
    closing: bool,
}
189
/// Bytes reserved in front of the payload for the hexadecimal chunk size and
/// the CRLF that follows it.
const BUFFER_HEAD: usize = 5;
/// Bytes reserved behind the payload for the CRLF that ends the chunk.
const BUFFER_TAIL: usize = 2;
192
193impl Chunked {
194    fn poll_write<IO: AsyncWrite + Unpin>(
195        &mut self,
196        mut transport: IO,
197        cx: &mut Context<'_>,
198        buf: &[u8],
199    ) -> Poll<io::Result<usize>> {
200        loop {
201            if self.closing && buf.len() > 0 {
202                return err_kind(io::ErrorKind::InvalidData);
203            }
204            let mut n = 0;
205            if self.written == None {
206                n += self.append(buf);
207            }
208            return match self.poll(&mut transport, cx) {
209                Poll::Pending => match n {
210                    0 => Poll::Pending,
211                    n => Poll::Ready(Ok(n)),
212                },
213                Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
214                Poll::Ready(Ok(())) => match n {
215                    0 => continue,
216                    n => Poll::Ready(Ok(n)),
217                },
218            };
219        }
220    }
221    fn poll_flush<IO: AsyncWrite + Unpin>(
222        &mut self,
223        mut transport: IO,
224        cx: &mut Context<'_>,
225    ) -> Poll<io::Result<()>> {
226        if self.buffered > 0 && self.written == None {
227            self.finalize_chunk();
228        }
229        while self.written != None {
230            match self.poll(Pin::new(&mut transport), cx) {
231                Poll::Ready(Ok(())) => {}
232                p => return p,
233            }
234        }
235        Pin::new(&mut transport).poll_flush(cx)
236    }
237    fn poll_close<IO: AsyncWrite + Unpin>(
238        &mut self,
239        mut transport: IO,
240        cx: &mut Context<'_>,
241    ) -> Poll<io::Result<()>> {
242        while self.written != None || !self.closing {
243            if self.written == None {
244                if self.buffered == 0 {
245                    self.closing = true;
246                }
247                self.finalize_chunk();
248            }
249            match self.poll(Pin::new(&mut transport), cx) {
250                Poll::Ready(Ok(())) => {}
251                p => return p,
252            }
253        }
254        Pin::new(&mut transport).poll_close(cx)
255    }
256    fn append(&mut self, buf: &[u8]) -> usize {
257        let off = BUFFER_HEAD + self.buffered;
258        let n = min(buf.len(), self.buffer.len() - off - BUFFER_TAIL);
259        self.buffer[off..off + n].copy_from_slice(&buf[0..n]);
260        self.buffered += n;
261        if self.buffered + BUFFER_TAIL + BUFFER_HEAD == self.buffer.len() {
262            self.finalize_chunk();
263        }
264        n
265    }
266    fn finalize_chunk(&mut self) {
267        self.buffer[BUFFER_HEAD - 2..BUFFER_HEAD].copy_from_slice(b"\r\n");
268        let end = BUFFER_HEAD + self.buffered + BUFFER_TAIL;
269        self.buffer[end - 2..end].copy_from_slice(b"\r\n");
270        let mut len = self.buffered;
271        let mut start = BUFFER_HEAD - 2;
272        while len > 0 || start == BUFFER_HEAD - 2 {
273            let digit = len & 15;
274            len /= 16;
275            start -= 1;
276            self.buffer[start] = match digit {
277                0..=9 => b'0' + digit as u8,
278                10..=15 => b'A' - 10 + digit as u8,
279                _ => unreachable!(),
280            };
281        }
282        self.written = Some(start);
283    }
284    fn poll<IO: AsyncWrite + Unpin>(
285        &mut self,
286        mut transport: IO,
287        cx: &mut Context<'_>,
288    ) -> Poll<io::Result<()>> {
289        match self.written {
290            None => Poll::Ready(Ok(())),
291            Some(written) => {
292                let end = BUFFER_HEAD + self.buffered + BUFFER_TAIL;
293                match Pin::new(&mut transport).poll_write(cx, &self.buffer[written..end]) {
294                    Poll::Ready(Ok(n)) => {
295                        self.written = Some(written + n);
296                        if self.written == Some(end) {
297                            self.buffered = 0;
298                            self.written = None;
299                        }
300                        Poll::Ready(Ok(()))
301                    }
302                    Poll::Pending => Poll::Pending,
303                    Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
304                }
305            }
306        }
307    }
308}