async_http_codec/body/
decode.rs

1use crate::body::common::length_from_headers;
2use futures::prelude::*;
3use std::borrow::BorrowMut;
4use std::io;
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
8pub struct BodyDecode<T: BorrowMut<BodyDecodeState> + Unpin, IO: AsyncRead + Unpin> {
9    transport: IO,
10    state: T,
11}
12
13impl<IO: AsyncRead + Unpin> BodyDecode<BodyDecodeState, IO> {
14    pub fn new(transport: IO, length: Option<u64>) -> Self {
15        BodyDecodeState::new(length).into_async_read(transport)
16    }
17    pub fn from_headers(headers: &http::header::HeaderMap, transport: IO) -> anyhow::Result<Self> {
18        Ok(BodyDecodeState::from_headers(headers)?.into_async_read(transport))
19    }
20}
21
22impl<T: BorrowMut<BodyDecodeState> + Unpin, IO: AsyncRead + Unpin> BodyDecode<T, IO> {
23    pub fn into_inner(self) -> (T, IO) {
24        (self.state, self.transport)
25    }
26}
27
28impl<T: BorrowMut<BodyDecodeState> + Unpin, IO: AsyncRead + Unpin> AsyncRead for BodyDecode<T, IO> {
29    fn poll_read(
30        self: Pin<&mut Self>,
31        cx: &mut Context<'_>,
32        buf: &mut [u8],
33    ) -> Poll<io::Result<usize>> {
34        let this = self.get_mut();
35        this.state
36            .borrow_mut()
37            .poll_read(&mut this.transport, cx, buf)
38    }
39}
40
41pub struct BodyDecodeState {
42    parser_state: Parser,
43    _compression_state: (),
44    remaining: u64,
45}
46
47#[derive(Copy, Clone, Debug, Eq, PartialEq)]
48enum Parser {
49    FixedLength,
50    Chunked(ChunkState),
51    Failed,
52    Done,
53}
54
55#[derive(Copy, Clone, Debug, Eq, PartialEq)]
56enum ChunkState {
57    Size,
58    SizeLF,
59    Content,
60    ContentCR,
61    ContentLF,
62    EndCR,
63    EndLF,
64}
65
66fn err_kind<T>(kind: io::ErrorKind) -> Poll<io::Result<T>> {
67    Poll::Ready(Err(kind.into()))
68}
69
70impl BodyDecodeState {
71    pub fn from_headers(headers: &http::header::HeaderMap) -> anyhow::Result<Self> {
72        Ok(Self::new(length_from_headers(headers)?))
73    }
74    pub fn new(length: Option<u64>) -> Self {
75        let (parser_state, remaining) = match length {
76            Some(0) => (Parser::Done, 0),
77            Some(length) => (Parser::FixedLength, length),
78            None => (Parser::Chunked(ChunkState::Size), 0),
79        };
80        Self {
81            parser_state,
82            _compression_state: (),
83            remaining,
84        }
85    }
86    pub fn into_async_read<IO: AsyncRead + Unpin>(self, transport: IO) -> BodyDecode<Self, IO> {
87        BodyDecode {
88            transport,
89            state: self,
90        }
91    }
92    pub fn as_async_read<IO: AsyncRead + Unpin>(
93        &mut self,
94        transport: IO,
95    ) -> BodyDecode<&mut Self, IO> {
96        BodyDecode {
97            transport,
98            state: self,
99        }
100    }
101    pub fn poll_read<IO: AsyncRead + Unpin>(
102        &mut self,
103        transport: &mut IO,
104        cx: &mut Context<'_>,
105        buf: &mut [u8],
106    ) -> Poll<io::Result<usize>> {
107        loop {
108            let max_read_size = match self.parser_state {
109                Parser::Failed => return err_kind(io::ErrorKind::BrokenPipe),
110                Parser::Done => return Poll::Ready(Ok(0)),
111                Parser::FixedLength | Parser::Chunked(ChunkState::Content) => {
112                    if buf.len() as u64 > self.remaining {
113                        self.remaining as usize
114                    } else {
115                        buf.len()
116                    }
117                }
118                Parser::Chunked(chunked_state) => {
119                    let mut next = [0u8];
120                    match Pin::new(&mut *transport).poll_read(cx, &mut next) {
121                        Poll::Ready(Err(err)) => {
122                            self.parser_state = Parser::Failed;
123                            return Poll::Ready(Err(err));
124                        }
125                        Poll::Pending => return Poll::Pending,
126                        Poll::Ready(Ok(0)) => {
127                            self.parser_state = Parser::Failed;
128                            return err_kind(io::ErrorKind::UnexpectedEof);
129                        }
130                        Poll::Ready(Ok(_)) => {
131                            self.parser_state = match (chunked_state, next[0]) {
132                                (ChunkState::Size, b'\r') => Parser::Chunked(ChunkState::SizeLF),
133                                (ChunkState::Size, hex_digit) => {
134                                    self.remaining *= 16;
135                                    self.remaining += match hex_digit {
136                                        b'0'..=b'9' => 0 + hex_digit - b'0',
137                                        b'a'..=b'f' => 10 + hex_digit - b'a',
138                                        b'A'..=b'F' => 10 + hex_digit - b'A',
139                                        _ => {
140                                            self.parser_state = Parser::Failed;
141                                            return err_kind(io::ErrorKind::InvalidData);
142                                        }
143                                    } as u64;
144                                    Parser::Chunked(ChunkState::Size)
145                                }
146                                (ChunkState::SizeLF, b'\n') => match self.remaining {
147                                    0 => Parser::Chunked(ChunkState::EndCR),
148                                    _ => Parser::Chunked(ChunkState::Content),
149                                },
150                                (ChunkState::Content, _) => unreachable!(),
151                                (ChunkState::ContentCR, b'\r') => {
152                                    Parser::Chunked(ChunkState::ContentLF)
153                                }
154                                (ChunkState::ContentLF, b'\n') => Parser::Chunked(ChunkState::Size),
155                                (ChunkState::EndCR, b'\r') => Parser::Chunked(ChunkState::EndLF),
156                                (ChunkState::EndLF, b'\n') => Parser::Done,
157                                (_, _) => return err_kind(io::ErrorKind::InvalidData),
158                            }
159                        }
160                    }
161                    continue;
162                }
163            };
164            return match Pin::new(&mut *transport).poll_read(cx, &mut buf[0..max_read_size]) {
165                Poll::Ready(Err(err)) => {
166                    self.parser_state = Parser::Failed;
167                    Poll::Ready(Err(err))
168                }
169                Poll::Ready(Ok(0)) => {
170                    self.parser_state = Parser::Failed;
171                    err_kind(io::ErrorKind::UnexpectedEof)
172                }
173                Poll::Ready(Ok(n)) => {
174                    self.remaining -= n as u64;
175                    if self.remaining == 0 {
176                        self.parser_state = match self.parser_state {
177                            Parser::FixedLength => Parser::Done,
178                            Parser::Chunked(ChunkState::Content) => {
179                                Parser::Chunked(ChunkState::ContentCR)
180                            }
181                            _ => unreachable!(),
182                        }
183                    }
184                    Poll::Ready(Ok(n))
185                }
186                Poll::Pending => Poll::Pending,
187            };
188        }
189    }
190}