1use core::{
2 cmp::min,
3 ops::{Deref, DerefMut},
4 time::Duration,
5};
6use std::io::{BufReader, Error as IoError, ErrorKind as IoErrorKind};
7
8use async_sleep::{rw::AsyncReadWithTimeoutExt as _, Sleepble};
9use async_trait::async_trait;
10use futures_io::AsyncRead;
11use http::{Request, Response, Version};
12use http1_spec::{
13 body_framing::{BodyFraming, BodyFramingDetector},
14 body_parser::{BodyParseOutput, BodyParser},
15 content_length_body_parser::ContentLengthBodyParser,
16 head_parser::{HeadParseConfig, HeadParseOutput, HeadParser},
17 request_head_parser::RequestHeadParser,
18 response_head_parser::ResponseHeadParser,
19 ReasonPhrase,
20};
21
22use crate::{body::DecoderBody, stream::Http1StreamDecoder};
23
24pub struct Http1Decoder<HP>
28where
29 HP: HeadParser,
30{
31 head_parser: HP,
32 content_length_body_parser: ContentLengthBodyParser,
33 buf: Vec<u8>,
34 offset_read: usize,
35 offset_parsed: usize,
36 read_timeout: Duration,
37 state: State,
38 require_read: bool,
39}
40#[derive(Debug, PartialEq, Eq)]
41enum State {
42 Idle,
43 ReadingHead,
44 ReadBody(BodyFraming),
45}
46impl Default for State {
47 fn default() -> Self {
48 Self::Idle
49 }
50}
51impl<HP> Http1Decoder<HP>
52where
53 HP: HeadParser,
54{
55 fn new(buf_capacity: usize, config: Option<HeadParseConfig>) -> Self {
57 Self {
58 head_parser: HP::with_config(config.unwrap_or_default()),
59 content_length_body_parser: ContentLengthBodyParser::new(),
60 buf: vec![0u8; buf_capacity],
61 offset_read: 0,
62 offset_parsed: 0,
63 read_timeout: Duration::from_secs(5),
64 state: Default::default(),
65 require_read: true,
66 }
67 }
68
69 fn set_read_timeout(&mut self, dur: Duration) {
71 self.read_timeout = dur;
72 }
73 pub fn has_unparsed_bytes(&self) -> bool {
74 self.offset_read > self.offset_parsed
75 }
76
77 async fn read<S: AsyncRead + Unpin, SLEEP: Sleepble>(
79 &mut self,
80 stream: &mut S,
81 ) -> Result<(), IoError> {
82 if !self.require_read {
83 return Ok(());
84 }
85
86 if self.offset_read >= self.buf.len() {
88 return Err(IoError::new(IoErrorKind::InvalidInput, "override buf"));
89 }
90
91 let n_read = match stream
93 .read_with_timeout::<SLEEP>(&mut self.buf[self.offset_read..], self.read_timeout)
94 .await
95 {
96 Ok(n) if n == 0 => return Err(IoError::new(IoErrorKind::UnexpectedEof, "read 0")),
97 Ok(n) => n,
98 Err(err) => return Err(err),
99 };
100 self.offset_read += n_read;
101 Ok(())
102 }
103
104 fn rotate_offset(&mut self) {
105 let n = self.offset_parsed;
106 self.buf.rotate_left(n);
107 self.offset_read -= n;
108 self.offset_parsed = 0;
109 }
110
111 async fn read_head0<S: AsyncRead + Unpin, SLEEP: Sleepble>(
112 &mut self,
113 stream: &mut S,
114 ) -> Result<BodyFraming, IoError> {
115 if self.state == State::Idle {
116 self.rotate_offset();
117 }
118
119 let body_framing = loop {
120 self.read::<_, SLEEP>(stream).await?;
121
122 let mut buf_reader = BufReader::new(&self.buf[self.offset_parsed..self.offset_read]);
123
124 match self.head_parser.parse(&mut buf_reader) {
125 Ok(HeadParseOutput::Completed(n_parsed)) => {
126 self.offset_parsed += n_parsed;
127 if self.offset_parsed == self.offset_read {
128 self.require_read = true;
129 } else {
130 self.require_read = false;
131 }
132
133 let headers = self.head_parser.get_headers();
134 let version = self.head_parser.get_version();
135
136 let body_framing = (headers, version).detect()?;
137 match &body_framing {
138 BodyFraming::Neither => {
139 self.state = State::Idle;
140 }
141 BodyFraming::ContentLength(n) => {
142 if n == &0 {
143 self.state = State::Idle;
144 } else {
145 self.state = State::ReadBody(body_framing.clone());
146 }
147 }
148 BodyFraming::Chunked => {
149 if version != &Version::HTTP_11 {
150 return Err(IoError::new(
151 IoErrorKind::InvalidInput,
152 "Only valid in HTTP/1.1",
153 ));
154 }
155 return Err(IoError::new(
156 IoErrorKind::InvalidInput,
157 "unimplemented now",
158 ));
159 }
160 }
161
162 break body_framing;
163 }
164 Ok(HeadParseOutput::Partial(n_parsed)) => {
165 self.offset_parsed += n_parsed;
166 self.require_read = true;
167
168 self.state = State::ReadingHead;
169
170 continue;
171 }
172 Err(err) => return Err(err.into()),
173 }
174 };
175
176 Ok(body_framing)
177 }
178
179 async fn read_body0<S: AsyncRead + Unpin, SLEEP: Sleepble>(
180 &mut self,
181 stream: &mut S,
182 ) -> Result<DecoderBody, IoError> {
183 #[allow(clippy::single_match)]
184 match self.state {
185 State::ReadBody(_) => {
186 self.read::<_, SLEEP>(stream).await?;
187 }
188 _ => {}
189 }
190
191 match &mut self.state {
192 State::Idle => Ok(DecoderBody::Completed(Vec::<u8>::new())),
193 State::ReadingHead => Err(IoError::new(IoErrorKind::Other, "state should is ReadBody")),
194 State::ReadBody(body_framing) => match body_framing.clone() {
195 BodyFraming::Neither => unreachable!(),
196 BodyFraming::ContentLength(content_length) => {
197 debug_assert!(content_length > 0);
198
199 self.content_length_body_parser.set_length(content_length);
200 let mut buf_reader =
201 BufReader::new(&self.buf[self.offset_parsed..self.offset_read]);
202 let mut body_buf =
203 vec![0u8; min(self.offset_read - self.offset_parsed, content_length)];
204 match self
205 .content_length_body_parser
206 .parse(&mut buf_reader, &mut body_buf)
207 {
208 Ok(BodyParseOutput::Completed(n_parsed)) => {
209 self.offset_parsed += n_parsed;
210 if self.offset_parsed == self.offset_read {
211 self.require_read = true;
212 } else {
213 self.require_read = false;
214 }
215
216 self.state = State::Idle;
217
218 Ok(DecoderBody::Completed(body_buf))
219 }
220 Ok(BodyParseOutput::Partial(n_parsed)) => {
221 self.offset_parsed += n_parsed;
222 if self.offset_parsed == self.offset_read {
223 self.require_read = true;
224 } else {
225 self.require_read = false;
226 }
227
228 body_framing.update_content_length_value(content_length - n_parsed)?;
229
230 Ok(DecoderBody::Partial(body_buf))
231 }
232 Err(err) => Err(err.into()),
233 }
234 }
235 BodyFraming::Chunked => {
236 Err(IoError::new(IoErrorKind::InvalidInput, "unimplemented now"))
237 }
238 },
239 }
240 }
241}
242
243pub type Http1RequestDecoderInner = Http1Decoder<RequestHeadParser>;
247pub struct Http1RequestDecoder {
248 inner: Http1RequestDecoderInner,
249}
250impl Deref for Http1RequestDecoder {
251 type Target = Http1RequestDecoderInner;
252
253 fn deref(&self) -> &Http1RequestDecoderInner {
254 &self.inner
255 }
256}
257impl DerefMut for Http1RequestDecoder {
258 fn deref_mut(&mut self) -> &mut Http1RequestDecoderInner {
259 &mut self.inner
260 }
261}
262impl Http1RequestDecoder {
263 pub fn new(buf_capacity: usize, config: Option<HeadParseConfig>) -> Self {
264 Self {
265 inner: Http1RequestDecoderInner::new(buf_capacity, config),
266 }
267 }
268}
269
270#[async_trait]
271impl<S, SLEEP> Http1StreamDecoder<S, SLEEP, Request<()>> for Http1RequestDecoder
272where
273 S: AsyncRead + Unpin + Send,
274 SLEEP: Sleepble,
275{
276 async fn read_head(&mut self, stream: &mut S) -> Result<(Request<()>, BodyFraming), IoError> {
277 let body_framing = self.read_head0::<_, SLEEP>(stream).await?;
278
279 let mut request = Request::new(());
280 *request.method_mut() = self.inner.head_parser.method.to_owned();
281 *request.uri_mut() = self.inner.head_parser.uri.to_owned();
282 *request.version_mut() = self.inner.head_parser.http_version.to_owned();
283 *request.headers_mut() = self.inner.head_parser.headers.to_owned();
284
285 Ok((request, body_framing))
286 }
287 async fn read_body(&mut self, stream: &mut S) -> Result<DecoderBody, IoError> {
288 self.read_body0::<_, SLEEP>(stream).await
289 }
290
291 fn set_read_timeout(&mut self, dur: Duration) {
292 self.inner.set_read_timeout(dur)
293 }
294}
295
296pub type Http1ResponseDecoderInner = Http1Decoder<ResponseHeadParser>;
300pub struct Http1ResponseDecoder {
301 inner: Http1ResponseDecoderInner,
302}
303impl Deref for Http1ResponseDecoder {
304 type Target = Http1ResponseDecoderInner;
305
306 fn deref(&self) -> &Http1ResponseDecoderInner {
307 &self.inner
308 }
309}
310impl DerefMut for Http1ResponseDecoder {
311 fn deref_mut(&mut self) -> &mut Http1ResponseDecoderInner {
312 &mut self.inner
313 }
314}
315impl Http1ResponseDecoder {
316 pub fn new(buf_capacity: usize, config: Option<HeadParseConfig>) -> Self {
317 Self {
318 inner: Http1ResponseDecoderInner::new(buf_capacity, config),
319 }
320 }
321}
322
323#[async_trait]
324impl<S, SLEEP> Http1StreamDecoder<S, SLEEP, (Response<()>, ReasonPhrase)> for Http1ResponseDecoder
325where
326 S: AsyncRead + Unpin + Send,
327 SLEEP: Sleepble,
328{
329 async fn read_head(
330 &mut self,
331 stream: &mut S,
332 ) -> Result<((Response<()>, ReasonPhrase), BodyFraming), IoError> {
333 let body_framing = self.read_head0::<_, SLEEP>(stream).await?;
334
335 let mut response = Response::new(());
336 *response.version_mut() = self.inner.head_parser.http_version.to_owned();
337 *response.status_mut() = self.inner.head_parser.status_code.to_owned();
338 *response.headers_mut() = self.inner.head_parser.headers.to_owned();
339
340 let reason_phrase = self.inner.head_parser.reason_phrase.to_owned();
341
342 Ok(((response, reason_phrase), body_framing))
343 }
344 async fn read_body(&mut self, stream: &mut S) -> Result<DecoderBody, IoError> {
345 self.read_body0::<_, SLEEP>(stream).await
346 }
347
348 fn set_read_timeout(&mut self, dur: Duration) {
349 self.inner.set_read_timeout(dur)
350 }
351}