rust_rcs_core/http/
decode.rs

1// Copyright 2023 宋昊文
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15extern crate futures;
16extern crate httparse;
17extern crate tokio;
18
19use std::fmt;
20use std::io;
21use std::pin::Pin;
22use std::task::{Context, Poll};
23
24use futures::future::Future;
25
26use httparse::{Response as ResponseParser, Status};
27
28use tokio::io::{AsyncRead, ReadBuf, ReadHalf};
29
30use crate::ffi::log::platform_log;
31
32use crate::io::network::stream::ClientStream;
33
34use super::response::Response;
35
// Tag passed to `platform_log` with every message emitted by the decoders in this file.
const LOG_TAG: &str = "http_decode";
37
/// Outcome of one `HeaderPartDecoder` poll that completed without an error.
pub enum HeaderPartDecodeStatus {
    /// A complete response head was parsed and converted into a `Response`;
    /// the decoder's `consumed` offset has been advanced past it.
    Success(Response),
    /// The head is still incomplete, but the read added bytes and the buffer
    /// has room left. Presumably the caller is expected to decode again.
    Again,
    /// The head is still incomplete and the buffer has no remaining capacity.
    BufferTooSmall,
    /// The head is still incomplete and the read added no bytes to the buffer.
    EOF,
}
44
/// Future that performs one read from the stream and then tries to parse an
/// HTTP response head out of the caller-owned buffer.
pub struct HeaderPartDecoder<'a, 'b> {
    /// Caller-owned read buffer; bytes read from `rh` are appended to it.
    buf: &'a mut ReadBuf<'b>,
    /// Offset into `buf.filled()` of the first byte not yet decoded.
    consumed: &'a mut usize,
    /// Read half of the client stream the response is read from.
    rh: &'a mut ReadHalf<ClientStream>,
}
50
51impl<'a, 'b> HeaderPartDecoder<'a, 'b> {
52    pub fn new(
53        buf: &'a mut ReadBuf<'b>,
54        consumed: &'a mut usize,
55        rh: &'a mut ReadHalf<ClientStream>,
56    ) -> HeaderPartDecoder<'a, 'b> {
57        HeaderPartDecoder { buf, consumed, rh }
58    }
59}
60
61impl Future for HeaderPartDecoder<'_, '_> {
62    type Output = Result<HeaderPartDecodeStatus>;
63    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
64        platform_log(LOG_TAG, "HeaderPartDecoder poll()");
65
66        let decoder = self.get_mut();
67        if decoder.buf.filled().len() == *decoder.consumed && *decoder.consumed != 0 {
68            platform_log(LOG_TAG, "making space for poll_read");
69            decoder.buf.clear();
70            *decoder.consumed = 0;
71        }
72
73        let before_read = decoder.buf.filled().len();
74
75        match Pin::new(&mut decoder.rh).poll_read(cx, &mut decoder.buf) {
76            Poll::Ready(Ok(())) => {
77                platform_log(
78                    LOG_TAG,
79                    format!(
80                        "poll_read success with new data range {}-{}",
81                        *decoder.consumed,
82                        decoder.buf.filled().len()
83                    ),
84                );
85
86                let after_read = decoder.buf.filled().len();
87
88                let pending_data = &decoder.buf.filled()[*decoder.consumed..];
89                let mut headers = [httparse::EMPTY_HEADER; 16];
90                let mut parser = ResponseParser::new(&mut headers);
91                if let Ok(status) = parser.parse(pending_data) {
92                    match status {
93                        Status::Partial => {
94                            platform_log(LOG_TAG, "on partial http header");
95
96                            if before_read == after_read {
97                                platform_log(LOG_TAG, "no more data");
98                                return Poll::Ready(Ok(HeaderPartDecodeStatus::EOF));
99                            }
100
101                            if decoder.buf.remaining() > 0 {
102                                Poll::Ready(Ok(HeaderPartDecodeStatus::Again))
103                            } else {
104                                Poll::Ready(Ok(HeaderPartDecodeStatus::BufferTooSmall))
105                            }
106                        }
107
108                        Status::Complete(size) => {
109                            platform_log(LOG_TAG, "on complete http header");
110                            if let Some(resp) = Response::from(&parser) {
111                                *decoder.consumed += size;
112
113                                Poll::Ready(Ok(HeaderPartDecodeStatus::Success(resp)))
114                            } else {
115                                Poll::Ready(Err(ErrorKind::Parse))
116                            }
117                        }
118                    }
119                } else {
120                    Poll::Ready(Err(ErrorKind::Parse))
121                }
122            }
123
124            Poll::Ready(Err(e)) => Poll::Ready(Err(ErrorKind::Io(e))),
125
126            Poll::Pending => Poll::Pending,
127        }
128    }
129}
130
/// Outcome of one `ChunkDecoder` poll that completed without an error.
pub enum ChunkDecodeResult {
    /// Payload bytes of one chunk, copied out of the read buffer.
    Part(Vec<u8>),
    /// The buffered bytes were not enough, and a read added more bytes to the
    /// buffer. Presumably the caller is expected to decode again.
    Again,
    /// More bytes are needed but the buffer has no remaining capacity.
    BufferTooSmall,
    /// Either a zero-size chunk was parsed, or a read added no bytes to the
    /// buffer while more data was still needed.
    EOF,
}
137
/// Future that decodes one chunk of a chunked HTTP body from the caller-owned
/// buffer, reading from the stream when the buffered bytes are not enough.
pub struct ChunkDecoder<'a, 'b> {
    /// Caller-owned read buffer; bytes read from `rh` are appended to it.
    buf: &'a mut ReadBuf<'b>,
    /// Offset into `buf.filled()` of the first byte not yet decoded.
    consumed: &'a mut usize,
    /// Read half of the client stream the body is read from.
    rh: &'a mut ReadHalf<ClientStream>,
}
143
144impl<'a, 'b> ChunkDecoder<'a, 'b> {
145    pub fn new(
146        buf: &'a mut ReadBuf<'b>,
147        consumed: &'a mut usize,
148        rh: &'a mut ReadHalf<ClientStream>,
149    ) -> ChunkDecoder<'a, 'b> {
150        ChunkDecoder { buf, consumed, rh }
151    }
152}
153
154impl Future for ChunkDecoder<'_, '_> {
155    type Output = Result<ChunkDecodeResult>;
156    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
157        platform_log(LOG_TAG, "ChunkDecoder poll()");
158
159        let decoder: &mut ChunkDecoder<'_, '_> = self.get_mut();
160        if decoder.buf.filled().len() == *decoder.consumed && *decoder.consumed != 0 {
161            platform_log(LOG_TAG, "making space for poll_read");
162            decoder.buf.clear();
163            *decoder.consumed = 0;
164        }
165
166        let pending_data = &decoder.buf.filled()[*decoder.consumed..];
167
168        match httparse::parse_chunk_size(pending_data) {
169            Ok(status) => match status {
170                Status::Complete((index, size)) => {
171                    platform_log(LOG_TAG, format!("on complete chunk of size {}", size));
172
173                    if size == 0 {
174                        *decoder.consumed += index;
175                        *decoder.consumed += 2;
176
177                        Poll::Ready(Ok(ChunkDecodeResult::EOF))
178                    } else {
179                        if let Ok(size) = size.try_into() {
180                            let pending_data = &decoder.buf.filled()[*decoder.consumed + index..];
181
182                            platform_log(
183                                LOG_TAG,
184                                format!("data currently available is {} bytes", pending_data.len()),
185                            );
186
187                            if pending_data.len() >= size {
188                                let data = pending_data[..size].to_vec();
189
190                                *decoder.consumed += index;
191                                *decoder.consumed += data.len();
192                                *decoder.consumed += 2;
193
194                                Poll::Ready(Ok(ChunkDecodeResult::Part(data)))
195                            } else {
196                                if decoder.buf.remaining() == 0 {
197                                    Poll::Ready(Ok(ChunkDecodeResult::BufferTooSmall))
198                                } else {
199                                    let before_read = decoder.buf.filled().len();
200
201                                    match Pin::new(&mut decoder.rh).poll_read(cx, &mut decoder.buf)
202                                    {
203                                        Poll::Ready(Ok(())) => {
204                                            platform_log(
205                                                LOG_TAG,
206                                                format!(
207                                                    "poll_read success with new data range {}-{}",
208                                                    *decoder.consumed,
209                                                    decoder.buf.filled().len()
210                                                ),
211                                            );
212
213                                            let after_read = decoder.buf.filled().len();
214
215                                            if before_read == after_read {
216                                                platform_log(LOG_TAG, "no more data");
217                                                Poll::Ready(Ok(ChunkDecodeResult::EOF))
218                                            } else {
219                                                Poll::Ready(Ok(ChunkDecodeResult::Again))
220                                            }
221                                        }
222
223                                        Poll::Ready(Err(e)) => Poll::Ready(Err(ErrorKind::Io(e))),
224
225                                        Poll::Pending => Poll::Pending,
226                                    }
227                                }
228                            }
229                        } else {
230                            Poll::Ready(Err(ErrorKind::Parse))
231                        }
232                    }
233                }
234
235                Status::Partial => {
236                    platform_log(LOG_TAG, "on partial chunk");
237
238                    if decoder.buf.remaining() == 0 {
239                        Poll::Ready(Ok(ChunkDecodeResult::BufferTooSmall))
240                    } else {
241                        let before_read = decoder.buf.filled().len();
242
243                        match Pin::new(&mut decoder.rh).poll_read(cx, &mut decoder.buf) {
244                            Poll::Ready(Ok(())) => {
245                                platform_log(
246                                    LOG_TAG,
247                                    format!(
248                                        "poll_read success with new data range {}-{}",
249                                        *decoder.consumed,
250                                        decoder.buf.filled().len()
251                                    ),
252                                );
253
254                                let after_read = decoder.buf.filled().len();
255
256                                if before_read == after_read {
257                                    platform_log(LOG_TAG, "no more data");
258                                    Poll::Ready(Ok(ChunkDecodeResult::EOF))
259                                } else {
260                                    Poll::Ready(Ok(ChunkDecodeResult::Again))
261                                }
262                            }
263
264                            Poll::Ready(Err(e)) => Poll::Ready(Err(ErrorKind::Io(e))),
265
266                            Poll::Pending => Poll::Pending,
267                        }
268                    }
269                }
270            },
271
272            Err(_) => Poll::Ready(Err(ErrorKind::Parse)),
273        }
274    }
275}
276
/// Errors produced while decoding an HTTP response.
pub enum ErrorKind {
    /// The underlying stream failed while being read.
    Io(io::Error),
    /// The received bytes could not be decoded.
    Parse,
}

impl Clone for ErrorKind {
    /// Clones the error while keeping as much of the I/O detail as
    /// `io::Error` allows (it is not `Clone` itself).
    fn clone(&self) -> ErrorKind {
        match self {
            ErrorKind::Io(e) => {
                let copy = match (e.raw_os_error(), e.get_ref()) {
                    // OS errors are fully described by their code.
                    (Some(code), _) => io::Error::from_raw_os_error(code),
                    // Custom errors: keep the kind and the inner message
                    // instead of collapsing to the bare kind.
                    (None, Some(inner)) => io::Error::new(e.kind(), inner.to_string()),
                    // No OS code and no inner error: the kind is all we can keep.
                    (None, None) => io::Error::from(e.kind()),
                };

                ErrorKind::Io(copy)
            }

            ErrorKind::Parse => ErrorKind::Parse,
        }
    }
}

impl fmt::Debug for ErrorKind {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ErrorKind::Io(e) => write!(f, "Io error {:?}", e),

            ErrorKind::Parse => write!(f, "Parse"),
        }
    }
}

impl fmt::Display for ErrorKind {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ErrorKind::Io(e) => write!(f, "I/O error: {}", e),

            ErrorKind::Parse => f.write_str("parse error"),
        }
    }
}

impl std::error::Error for ErrorKind {
    /// Exposes the wrapped `io::Error` so callers can walk the error chain.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            ErrorKind::Io(e) => Some(e),

            ErrorKind::Parse => None,
        }
    }
}

/// Result type used by the decoders in this file.
pub type Result<T> = std::result::Result<T, ErrorKind>;