async_http1_lite/
decoder.rs

1use core::{
2    cmp::min,
3    ops::{Deref, DerefMut},
4    time::Duration,
5};
6use std::io::{BufReader, Error as IoError, ErrorKind as IoErrorKind};
7
8use async_sleep::{rw::AsyncReadWithTimeoutExt as _, Sleepble};
9use async_trait::async_trait;
10use futures_io::AsyncRead;
11use http::{Request, Response, Version};
12use http1_spec::{
13    body_framing::{BodyFraming, BodyFramingDetector},
14    body_parser::{BodyParseOutput, BodyParser},
15    content_length_body_parser::ContentLengthBodyParser,
16    head_parser::{HeadParseConfig, HeadParseOutput, HeadParser},
17    request_head_parser::RequestHeadParser,
18    response_head_parser::ResponseHeadParser,
19    ReasonPhrase,
20};
21
22use crate::{body::DecoderBody, stream::Http1StreamDecoder};
23
24//
25//
26//
27pub struct Http1Decoder<HP>
28where
29    HP: HeadParser,
30{
31    head_parser: HP,
32    content_length_body_parser: ContentLengthBodyParser,
33    buf: Vec<u8>,
34    offset_read: usize,
35    offset_parsed: usize,
36    read_timeout: Duration,
37    state: State,
38    require_read: bool,
39}
40#[derive(Debug, PartialEq, Eq)]
41enum State {
42    Idle,
43    ReadingHead,
44    ReadBody(BodyFraming),
45}
46impl Default for State {
47    fn default() -> Self {
48        Self::Idle
49    }
50}
51impl<HP> Http1Decoder<HP>
52where
53    HP: HeadParser,
54{
55    //
56    fn new(buf_capacity: usize, config: Option<HeadParseConfig>) -> Self {
57        Self {
58            head_parser: HP::with_config(config.unwrap_or_default()),
59            content_length_body_parser: ContentLengthBodyParser::new(),
60            buf: vec![0u8; buf_capacity],
61            offset_read: 0,
62            offset_parsed: 0,
63            read_timeout: Duration::from_secs(5),
64            state: Default::default(),
65            require_read: true,
66        }
67    }
68
69    //
70    fn set_read_timeout(&mut self, dur: Duration) {
71        self.read_timeout = dur;
72    }
73    pub fn has_unparsed_bytes(&self) -> bool {
74        self.offset_read > self.offset_parsed
75    }
76
77    //
78    async fn read<S: AsyncRead + Unpin, SLEEP: Sleepble>(
79        &mut self,
80        stream: &mut S,
81    ) -> Result<(), IoError> {
82        if !self.require_read {
83            return Ok(());
84        }
85
86        //
87        if self.offset_read >= self.buf.len() {
88            return Err(IoError::new(IoErrorKind::InvalidInput, "override buf"));
89        }
90
91        //
92        let n_read = match stream
93            .read_with_timeout::<SLEEP>(&mut self.buf[self.offset_read..], self.read_timeout)
94            .await
95        {
96            Ok(n) if n == 0 => return Err(IoError::new(IoErrorKind::UnexpectedEof, "read 0")),
97            Ok(n) => n,
98            Err(err) => return Err(err),
99        };
100        self.offset_read += n_read;
101        Ok(())
102    }
103
104    fn rotate_offset(&mut self) {
105        let n = self.offset_parsed;
106        self.buf.rotate_left(n);
107        self.offset_read -= n;
108        self.offset_parsed = 0;
109    }
110
111    async fn read_head0<S: AsyncRead + Unpin, SLEEP: Sleepble>(
112        &mut self,
113        stream: &mut S,
114    ) -> Result<BodyFraming, IoError> {
115        if self.state == State::Idle {
116            self.rotate_offset();
117        }
118
119        let body_framing = loop {
120            self.read::<_, SLEEP>(stream).await?;
121
122            let mut buf_reader = BufReader::new(&self.buf[self.offset_parsed..self.offset_read]);
123
124            match self.head_parser.parse(&mut buf_reader) {
125                Ok(HeadParseOutput::Completed(n_parsed)) => {
126                    self.offset_parsed += n_parsed;
127                    if self.offset_parsed == self.offset_read {
128                        self.require_read = true;
129                    } else {
130                        self.require_read = false;
131                    }
132
133                    let headers = self.head_parser.get_headers();
134                    let version = self.head_parser.get_version();
135
136                    let body_framing = (headers, version).detect()?;
137                    match &body_framing {
138                        BodyFraming::Neither => {
139                            self.state = State::Idle;
140                        }
141                        BodyFraming::ContentLength(n) => {
142                            if n == &0 {
143                                self.state = State::Idle;
144                            } else {
145                                self.state = State::ReadBody(body_framing.clone());
146                            }
147                        }
148                        BodyFraming::Chunked => {
149                            if version != &Version::HTTP_11 {
150                                return Err(IoError::new(
151                                    IoErrorKind::InvalidInput,
152                                    "Only valid in HTTP/1.1",
153                                ));
154                            }
155                            return Err(IoError::new(
156                                IoErrorKind::InvalidInput,
157                                "unimplemented now",
158                            ));
159                        }
160                    }
161
162                    break body_framing;
163                }
164                Ok(HeadParseOutput::Partial(n_parsed)) => {
165                    self.offset_parsed += n_parsed;
166                    self.require_read = true;
167
168                    self.state = State::ReadingHead;
169
170                    continue;
171                }
172                Err(err) => return Err(err.into()),
173            }
174        };
175
176        Ok(body_framing)
177    }
178
179    async fn read_body0<S: AsyncRead + Unpin, SLEEP: Sleepble>(
180        &mut self,
181        stream: &mut S,
182    ) -> Result<DecoderBody, IoError> {
183        #[allow(clippy::single_match)]
184        match self.state {
185            State::ReadBody(_) => {
186                self.read::<_, SLEEP>(stream).await?;
187            }
188            _ => {}
189        }
190
191        match &mut self.state {
192            State::Idle => Ok(DecoderBody::Completed(Vec::<u8>::new())),
193            State::ReadingHead => Err(IoError::new(IoErrorKind::Other, "state should is ReadBody")),
194            State::ReadBody(body_framing) => match body_framing.clone() {
195                BodyFraming::Neither => unreachable!(),
196                BodyFraming::ContentLength(content_length) => {
197                    debug_assert!(content_length > 0);
198
199                    self.content_length_body_parser.set_length(content_length);
200                    let mut buf_reader =
201                        BufReader::new(&self.buf[self.offset_parsed..self.offset_read]);
202                    let mut body_buf =
203                        vec![0u8; min(self.offset_read - self.offset_parsed, content_length)];
204                    match self
205                        .content_length_body_parser
206                        .parse(&mut buf_reader, &mut body_buf)
207                    {
208                        Ok(BodyParseOutput::Completed(n_parsed)) => {
209                            self.offset_parsed += n_parsed;
210                            if self.offset_parsed == self.offset_read {
211                                self.require_read = true;
212                            } else {
213                                self.require_read = false;
214                            }
215
216                            self.state = State::Idle;
217
218                            Ok(DecoderBody::Completed(body_buf))
219                        }
220                        Ok(BodyParseOutput::Partial(n_parsed)) => {
221                            self.offset_parsed += n_parsed;
222                            if self.offset_parsed == self.offset_read {
223                                self.require_read = true;
224                            } else {
225                                self.require_read = false;
226                            }
227
228                            body_framing.update_content_length_value(content_length - n_parsed)?;
229
230                            Ok(DecoderBody::Partial(body_buf))
231                        }
232                        Err(err) => Err(err.into()),
233                    }
234                }
235                BodyFraming::Chunked => {
236                    Err(IoError::new(IoErrorKind::InvalidInput, "unimplemented now"))
237                }
238            },
239        }
240    }
241}
242
243//
244//
245//
246pub type Http1RequestDecoderInner = Http1Decoder<RequestHeadParser>;
247pub struct Http1RequestDecoder {
248    inner: Http1RequestDecoderInner,
249}
250impl Deref for Http1RequestDecoder {
251    type Target = Http1RequestDecoderInner;
252
253    fn deref(&self) -> &Http1RequestDecoderInner {
254        &self.inner
255    }
256}
257impl DerefMut for Http1RequestDecoder {
258    fn deref_mut(&mut self) -> &mut Http1RequestDecoderInner {
259        &mut self.inner
260    }
261}
262impl Http1RequestDecoder {
263    pub fn new(buf_capacity: usize, config: Option<HeadParseConfig>) -> Self {
264        Self {
265            inner: Http1RequestDecoderInner::new(buf_capacity, config),
266        }
267    }
268}
269
270#[async_trait]
271impl<S, SLEEP> Http1StreamDecoder<S, SLEEP, Request<()>> for Http1RequestDecoder
272where
273    S: AsyncRead + Unpin + Send,
274    SLEEP: Sleepble,
275{
276    async fn read_head(&mut self, stream: &mut S) -> Result<(Request<()>, BodyFraming), IoError> {
277        let body_framing = self.read_head0::<_, SLEEP>(stream).await?;
278
279        let mut request = Request::new(());
280        *request.method_mut() = self.inner.head_parser.method.to_owned();
281        *request.uri_mut() = self.inner.head_parser.uri.to_owned();
282        *request.version_mut() = self.inner.head_parser.http_version.to_owned();
283        *request.headers_mut() = self.inner.head_parser.headers.to_owned();
284
285        Ok((request, body_framing))
286    }
287    async fn read_body(&mut self, stream: &mut S) -> Result<DecoderBody, IoError> {
288        self.read_body0::<_, SLEEP>(stream).await
289    }
290
291    fn set_read_timeout(&mut self, dur: Duration) {
292        self.inner.set_read_timeout(dur)
293    }
294}
295
296//
297//
298//
299pub type Http1ResponseDecoderInner = Http1Decoder<ResponseHeadParser>;
300pub struct Http1ResponseDecoder {
301    inner: Http1ResponseDecoderInner,
302}
303impl Deref for Http1ResponseDecoder {
304    type Target = Http1ResponseDecoderInner;
305
306    fn deref(&self) -> &Http1ResponseDecoderInner {
307        &self.inner
308    }
309}
310impl DerefMut for Http1ResponseDecoder {
311    fn deref_mut(&mut self) -> &mut Http1ResponseDecoderInner {
312        &mut self.inner
313    }
314}
315impl Http1ResponseDecoder {
316    pub fn new(buf_capacity: usize, config: Option<HeadParseConfig>) -> Self {
317        Self {
318            inner: Http1ResponseDecoderInner::new(buf_capacity, config),
319        }
320    }
321}
322
323#[async_trait]
324impl<S, SLEEP> Http1StreamDecoder<S, SLEEP, (Response<()>, ReasonPhrase)> for Http1ResponseDecoder
325where
326    S: AsyncRead + Unpin + Send,
327    SLEEP: Sleepble,
328{
329    async fn read_head(
330        &mut self,
331        stream: &mut S,
332    ) -> Result<((Response<()>, ReasonPhrase), BodyFraming), IoError> {
333        let body_framing = self.read_head0::<_, SLEEP>(stream).await?;
334
335        let mut response = Response::new(());
336        *response.version_mut() = self.inner.head_parser.http_version.to_owned();
337        *response.status_mut() = self.inner.head_parser.status_code.to_owned();
338        *response.headers_mut() = self.inner.head_parser.headers.to_owned();
339
340        let reason_phrase = self.inner.head_parser.reason_phrase.to_owned();
341
342        Ok(((response, reason_phrase), body_framing))
343    }
344    async fn read_body(&mut self, stream: &mut S) -> Result<DecoderBody, IoError> {
345        self.read_body0::<_, SLEEP>(stream).await
346    }
347
348    fn set_read_timeout(&mut self, dur: Duration) {
349        self.inner.set_read_timeout(dur)
350    }
351}