1extern crate futures;
16extern crate httparse;
17extern crate tokio;
18
19use std::fmt;
20use std::io;
21use std::pin::Pin;
22use std::task::{Context, Poll};
23
24use futures::future::Future;
25
26use httparse::{Response as ResponseParser, Status};
27
28use tokio::io::{AsyncRead, ReadBuf, ReadHalf};
29
30use crate::ffi::log::platform_log;
31
32use crate::io::network::stream::ClientStream;
33
34use super::response::Response;
35
36const LOG_TAG: &str = "http_decode";
37
38pub enum HeaderPartDecodeStatus {
39 Success(Response),
40 Again,
41 BufferTooSmall,
42 EOF,
43}
44
45pub struct HeaderPartDecoder<'a, 'b> {
46 buf: &'a mut ReadBuf<'b>,
47 consumed: &'a mut usize,
48 rh: &'a mut ReadHalf<ClientStream>,
49}
50
51impl<'a, 'b> HeaderPartDecoder<'a, 'b> {
52 pub fn new(
53 buf: &'a mut ReadBuf<'b>,
54 consumed: &'a mut usize,
55 rh: &'a mut ReadHalf<ClientStream>,
56 ) -> HeaderPartDecoder<'a, 'b> {
57 HeaderPartDecoder { buf, consumed, rh }
58 }
59}
60
61impl Future for HeaderPartDecoder<'_, '_> {
62 type Output = Result<HeaderPartDecodeStatus>;
63 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
64 platform_log(LOG_TAG, "HeaderPartDecoder poll()");
65
66 let decoder = self.get_mut();
67 if decoder.buf.filled().len() == *decoder.consumed && *decoder.consumed != 0 {
68 platform_log(LOG_TAG, "making space for poll_read");
69 decoder.buf.clear();
70 *decoder.consumed = 0;
71 }
72
73 let before_read = decoder.buf.filled().len();
74
75 match Pin::new(&mut decoder.rh).poll_read(cx, &mut decoder.buf) {
76 Poll::Ready(Ok(())) => {
77 platform_log(
78 LOG_TAG,
79 format!(
80 "poll_read success with new data range {}-{}",
81 *decoder.consumed,
82 decoder.buf.filled().len()
83 ),
84 );
85
86 let after_read = decoder.buf.filled().len();
87
88 let pending_data = &decoder.buf.filled()[*decoder.consumed..];
89 let mut headers = [httparse::EMPTY_HEADER; 16];
90 let mut parser = ResponseParser::new(&mut headers);
91 if let Ok(status) = parser.parse(pending_data) {
92 match status {
93 Status::Partial => {
94 platform_log(LOG_TAG, "on partial http header");
95
96 if before_read == after_read {
97 platform_log(LOG_TAG, "no more data");
98 return Poll::Ready(Ok(HeaderPartDecodeStatus::EOF));
99 }
100
101 if decoder.buf.remaining() > 0 {
102 Poll::Ready(Ok(HeaderPartDecodeStatus::Again))
103 } else {
104 Poll::Ready(Ok(HeaderPartDecodeStatus::BufferTooSmall))
105 }
106 }
107
108 Status::Complete(size) => {
109 platform_log(LOG_TAG, "on complete http header");
110 if let Some(resp) = Response::from(&parser) {
111 *decoder.consumed += size;
112
113 Poll::Ready(Ok(HeaderPartDecodeStatus::Success(resp)))
114 } else {
115 Poll::Ready(Err(ErrorKind::Parse))
116 }
117 }
118 }
119 } else {
120 Poll::Ready(Err(ErrorKind::Parse))
121 }
122 }
123
124 Poll::Ready(Err(e)) => Poll::Ready(Err(ErrorKind::Io(e))),
125
126 Poll::Pending => Poll::Pending,
127 }
128 }
129}
130
131pub enum ChunkDecodeResult {
132 Part(Vec<u8>),
133 Again,
134 BufferTooSmall,
135 EOF,
136}
137
138pub struct ChunkDecoder<'a, 'b> {
139 buf: &'a mut ReadBuf<'b>,
140 consumed: &'a mut usize,
141 rh: &'a mut ReadHalf<ClientStream>,
142}
143
144impl<'a, 'b> ChunkDecoder<'a, 'b> {
145 pub fn new(
146 buf: &'a mut ReadBuf<'b>,
147 consumed: &'a mut usize,
148 rh: &'a mut ReadHalf<ClientStream>,
149 ) -> ChunkDecoder<'a, 'b> {
150 ChunkDecoder { buf, consumed, rh }
151 }
152}
153
154impl Future for ChunkDecoder<'_, '_> {
155 type Output = Result<ChunkDecodeResult>;
156 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
157 platform_log(LOG_TAG, "ChunkDecoder poll()");
158
159 let decoder: &mut ChunkDecoder<'_, '_> = self.get_mut();
160 if decoder.buf.filled().len() == *decoder.consumed && *decoder.consumed != 0 {
161 platform_log(LOG_TAG, "making space for poll_read");
162 decoder.buf.clear();
163 *decoder.consumed = 0;
164 }
165
166 let pending_data = &decoder.buf.filled()[*decoder.consumed..];
167
168 match httparse::parse_chunk_size(pending_data) {
169 Ok(status) => match status {
170 Status::Complete((index, size)) => {
171 platform_log(LOG_TAG, format!("on complete chunk of size {}", size));
172
173 if size == 0 {
174 *decoder.consumed += index;
175 *decoder.consumed += 2;
176
177 Poll::Ready(Ok(ChunkDecodeResult::EOF))
178 } else {
179 if let Ok(size) = size.try_into() {
180 let pending_data = &decoder.buf.filled()[*decoder.consumed + index..];
181
182 platform_log(
183 LOG_TAG,
184 format!("data currently available is {} bytes", pending_data.len()),
185 );
186
187 if pending_data.len() >= size {
188 let data = pending_data[..size].to_vec();
189
190 *decoder.consumed += index;
191 *decoder.consumed += data.len();
192 *decoder.consumed += 2;
193
194 Poll::Ready(Ok(ChunkDecodeResult::Part(data)))
195 } else {
196 if decoder.buf.remaining() == 0 {
197 Poll::Ready(Ok(ChunkDecodeResult::BufferTooSmall))
198 } else {
199 let before_read = decoder.buf.filled().len();
200
201 match Pin::new(&mut decoder.rh).poll_read(cx, &mut decoder.buf)
202 {
203 Poll::Ready(Ok(())) => {
204 platform_log(
205 LOG_TAG,
206 format!(
207 "poll_read success with new data range {}-{}",
208 *decoder.consumed,
209 decoder.buf.filled().len()
210 ),
211 );
212
213 let after_read = decoder.buf.filled().len();
214
215 if before_read == after_read {
216 platform_log(LOG_TAG, "no more data");
217 Poll::Ready(Ok(ChunkDecodeResult::EOF))
218 } else {
219 Poll::Ready(Ok(ChunkDecodeResult::Again))
220 }
221 }
222
223 Poll::Ready(Err(e)) => Poll::Ready(Err(ErrorKind::Io(e))),
224
225 Poll::Pending => Poll::Pending,
226 }
227 }
228 }
229 } else {
230 Poll::Ready(Err(ErrorKind::Parse))
231 }
232 }
233 }
234
235 Status::Partial => {
236 platform_log(LOG_TAG, "on partial chunk");
237
238 if decoder.buf.remaining() == 0 {
239 Poll::Ready(Ok(ChunkDecodeResult::BufferTooSmall))
240 } else {
241 let before_read = decoder.buf.filled().len();
242
243 match Pin::new(&mut decoder.rh).poll_read(cx, &mut decoder.buf) {
244 Poll::Ready(Ok(())) => {
245 platform_log(
246 LOG_TAG,
247 format!(
248 "poll_read success with new data range {}-{}",
249 *decoder.consumed,
250 decoder.buf.filled().len()
251 ),
252 );
253
254 let after_read = decoder.buf.filled().len();
255
256 if before_read == after_read {
257 platform_log(LOG_TAG, "no more data");
258 Poll::Ready(Ok(ChunkDecodeResult::EOF))
259 } else {
260 Poll::Ready(Ok(ChunkDecodeResult::Again))
261 }
262 }
263
264 Poll::Ready(Err(e)) => Poll::Ready(Err(ErrorKind::Io(e))),
265
266 Poll::Pending => Poll::Pending,
267 }
268 }
269 }
270 },
271
272 Err(_) => Poll::Ready(Err(ErrorKind::Parse)),
273 }
274 }
275}
276
277pub enum ErrorKind {
278 Io(io::Error),
279 Parse,
280}
281
282impl Clone for ErrorKind {
283 fn clone(&self) -> ErrorKind {
284 match self {
285 ErrorKind::Io(e) => ErrorKind::Io(io::Error::from(e.kind())),
286
287 ErrorKind::Parse => ErrorKind::Parse,
288 }
289 }
290}
291
292impl fmt::Debug for ErrorKind {
293 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
294 match self {
295 ErrorKind::Io(e) => {
296 write!(f, "Io error {:?}", e)
297 }
298
299 ErrorKind::Parse => {
300 write!(f, "Parse")
301 }
302 }
303 }
304}
305
306pub type Result<T> = std::result::Result<T, ErrorKind>;