1use crate::body::common::length_from_headers;
2use futures::prelude::*;
3use std::borrow::BorrowMut;
4use std::io;
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
8pub struct BodyDecode<T: BorrowMut<BodyDecodeState> + Unpin, IO: AsyncRead + Unpin> {
9 transport: IO,
10 state: T,
11}
12
13impl<IO: AsyncRead + Unpin> BodyDecode<BodyDecodeState, IO> {
14 pub fn new(transport: IO, length: Option<u64>) -> Self {
15 BodyDecodeState::new(length).into_async_read(transport)
16 }
17 pub fn from_headers(headers: &http::header::HeaderMap, transport: IO) -> anyhow::Result<Self> {
18 Ok(BodyDecodeState::from_headers(headers)?.into_async_read(transport))
19 }
20}
21
22impl<T: BorrowMut<BodyDecodeState> + Unpin, IO: AsyncRead + Unpin> BodyDecode<T, IO> {
23 pub fn into_inner(self) -> (T, IO) {
24 (self.state, self.transport)
25 }
26}
27
28impl<T: BorrowMut<BodyDecodeState> + Unpin, IO: AsyncRead + Unpin> AsyncRead for BodyDecode<T, IO> {
29 fn poll_read(
30 self: Pin<&mut Self>,
31 cx: &mut Context<'_>,
32 buf: &mut [u8],
33 ) -> Poll<io::Result<usize>> {
34 let this = self.get_mut();
35 this.state
36 .borrow_mut()
37 .poll_read(&mut this.transport, cx, buf)
38 }
39}
40
41pub struct BodyDecodeState {
42 parser_state: Parser,
43 _compression_state: (),
44 remaining: u64,
45}
46
47#[derive(Copy, Clone, Debug, Eq, PartialEq)]
48enum Parser {
49 FixedLength,
50 Chunked(ChunkState),
51 Failed,
52 Done,
53}
54
/// Position inside the chunked framing.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum ChunkState {
    /// Reading the hexadecimal digits of a chunk size, up to the terminating `\r`.
    Size,
    /// The `\r` of the size line was seen; a `\n` must follow.
    SizeLF,
    /// Chunk payload bytes are being passed through to the caller.
    Content,
    /// The chunk payload is complete; a `\r` must follow.
    ContentCR,
    /// The `\r` after the payload was seen; a `\n` must follow.
    ContentLF,
    /// A zero-size chunk was read; the `\r` of the closing empty line must follow.
    EndCR,
    /// The closing `\r` was seen; the final `\n` must follow, which ends the body.
    EndLF,
}
65
/// Wraps an [`io::ErrorKind`] into an immediately-ready poll result carrying that error.
fn err_kind<T>(kind: io::ErrorKind) -> Poll<io::Result<T>> {
    let error = io::Error::from(kind);
    Poll::Ready(Err(error))
}
69
70impl BodyDecodeState {
71 pub fn from_headers(headers: &http::header::HeaderMap) -> anyhow::Result<Self> {
72 Ok(Self::new(length_from_headers(headers)?))
73 }
74 pub fn new(length: Option<u64>) -> Self {
75 let (parser_state, remaining) = match length {
76 Some(0) => (Parser::Done, 0),
77 Some(length) => (Parser::FixedLength, length),
78 None => (Parser::Chunked(ChunkState::Size), 0),
79 };
80 Self {
81 parser_state,
82 _compression_state: (),
83 remaining,
84 }
85 }
86 pub fn into_async_read<IO: AsyncRead + Unpin>(self, transport: IO) -> BodyDecode<Self, IO> {
87 BodyDecode {
88 transport,
89 state: self,
90 }
91 }
92 pub fn as_async_read<IO: AsyncRead + Unpin>(
93 &mut self,
94 transport: IO,
95 ) -> BodyDecode<&mut Self, IO> {
96 BodyDecode {
97 transport,
98 state: self,
99 }
100 }
101 pub fn poll_read<IO: AsyncRead + Unpin>(
102 &mut self,
103 transport: &mut IO,
104 cx: &mut Context<'_>,
105 buf: &mut [u8],
106 ) -> Poll<io::Result<usize>> {
107 loop {
108 let max_read_size = match self.parser_state {
109 Parser::Failed => return err_kind(io::ErrorKind::BrokenPipe),
110 Parser::Done => return Poll::Ready(Ok(0)),
111 Parser::FixedLength | Parser::Chunked(ChunkState::Content) => {
112 if buf.len() as u64 > self.remaining {
113 self.remaining as usize
114 } else {
115 buf.len()
116 }
117 }
118 Parser::Chunked(chunked_state) => {
119 let mut next = [0u8];
120 match Pin::new(&mut *transport).poll_read(cx, &mut next) {
121 Poll::Ready(Err(err)) => {
122 self.parser_state = Parser::Failed;
123 return Poll::Ready(Err(err));
124 }
125 Poll::Pending => return Poll::Pending,
126 Poll::Ready(Ok(0)) => {
127 self.parser_state = Parser::Failed;
128 return err_kind(io::ErrorKind::UnexpectedEof);
129 }
130 Poll::Ready(Ok(_)) => {
131 self.parser_state = match (chunked_state, next[0]) {
132 (ChunkState::Size, b'\r') => Parser::Chunked(ChunkState::SizeLF),
133 (ChunkState::Size, hex_digit) => {
134 self.remaining *= 16;
135 self.remaining += match hex_digit {
136 b'0'..=b'9' => 0 + hex_digit - b'0',
137 b'a'..=b'f' => 10 + hex_digit - b'a',
138 b'A'..=b'F' => 10 + hex_digit - b'A',
139 _ => {
140 self.parser_state = Parser::Failed;
141 return err_kind(io::ErrorKind::InvalidData);
142 }
143 } as u64;
144 Parser::Chunked(ChunkState::Size)
145 }
146 (ChunkState::SizeLF, b'\n') => match self.remaining {
147 0 => Parser::Chunked(ChunkState::EndCR),
148 _ => Parser::Chunked(ChunkState::Content),
149 },
150 (ChunkState::Content, _) => unreachable!(),
151 (ChunkState::ContentCR, b'\r') => {
152 Parser::Chunked(ChunkState::ContentLF)
153 }
154 (ChunkState::ContentLF, b'\n') => Parser::Chunked(ChunkState::Size),
155 (ChunkState::EndCR, b'\r') => Parser::Chunked(ChunkState::EndLF),
156 (ChunkState::EndLF, b'\n') => Parser::Done,
157 (_, _) => return err_kind(io::ErrorKind::InvalidData),
158 }
159 }
160 }
161 continue;
162 }
163 };
164 return match Pin::new(&mut *transport).poll_read(cx, &mut buf[0..max_read_size]) {
165 Poll::Ready(Err(err)) => {
166 self.parser_state = Parser::Failed;
167 Poll::Ready(Err(err))
168 }
169 Poll::Ready(Ok(0)) => {
170 self.parser_state = Parser::Failed;
171 err_kind(io::ErrorKind::UnexpectedEof)
172 }
173 Poll::Ready(Ok(n)) => {
174 self.remaining -= n as u64;
175 if self.remaining == 0 {
176 self.parser_state = match self.parser_state {
177 Parser::FixedLength => Parser::Done,
178 Parser::Chunked(ChunkState::Content) => {
179 Parser::Chunked(ChunkState::ContentCR)
180 }
181 _ => unreachable!(),
182 }
183 }
184 Poll::Ready(Ok(n))
185 }
186 Poll::Pending => Poll::Pending,
187 };
188 }
189 }
190}