futures_http/
reader.rs

1//! Utilities to parse http packets from a stream of bytes.
2//!
3use std::{future::Future, io};
4
5use crate::{
6    body::{BodyReader, BodyReaderError},
7    read_buf::ReadBuf,
8};
9use bytes::{Bytes, BytesMut};
10use futures::{io::Cursor, AsyncRead, AsyncReadExt};
11use http::{
12    header::{InvalidHeaderName, InvalidHeaderValue},
13    method::InvalidMethod,
14    response::Parts,
15    status::InvalidStatusCode,
16    uri::InvalidUri,
17    HeaderName, HeaderValue, Method, Request, Response, StatusCode, Uri, Version,
18};
19
20/// Variants of parse http packets.
21#[derive(Debug, thiserror::Error)]
22pub enum ParseError {
23    #[error(transparent)]
24    HttpError(#[from] http::Error),
25
26    #[error("Http header parse buf overflow, max={0}")]
27    ParseBufOverflow(usize),
28
29    #[error(transparent)]
30    IoError(#[from] io::Error),
31
32    #[error("Unable to complete http parsing, reached the end of the stream.")]
33    Eof,
34
35    #[error("Miss method field.")]
36    Method,
37
38    #[error(transparent)]
39    InvalidMethod(#[from] InvalidMethod),
40
41    #[error("Miss uri field.")]
42    Uri,
43
44    #[error(transparent)]
45    InvalidUri(#[from] InvalidUri),
46
47    #[error("Invalid http version.")]
48    Version,
49
50    #[error(transparent)]
51    InvalidHeaderName(#[from] InvalidHeaderName),
52
53    #[error(transparent)]
54    InvalidHeaderValue(#[from] InvalidHeaderValue),
55
56    #[error(transparent)]
57    InvalidStatusCode(#[from] InvalidStatusCode),
58
59    #[error(transparent)]
60    SerdeJsonError(#[from] serde_json::Error),
61
62    #[error(transparent)]
63    BodyReaderError(#[from] BodyReaderError),
64}
65
66/// Type alias for parser result.
67pub type ParseResult<T> = Result<T, ParseError>;
68
69impl From<ParseError> for io::Error {
70    fn from(value: ParseError) -> Self {
71        match value {
72            ParseError::IoError(err) => err,
73            _ => io::Error::new(io::ErrorKind::Other, value),
74        }
75    }
76}
77
78/// Http packet parse config.
79#[derive(Debug)]
80pub struct Config {
81    /// The max buf len for parsing http headers.
82    pub parsing_headers_max_buf: usize,
83}
84
85impl Default for Config {
86    fn default() -> Self {
87        Self {
88            parsing_headers_max_buf: 2048,
89        }
90    }
91}
92
93/// Http request packet parser.
94///
95/// In general, please do not create [`Requester`] directly but use
96/// [`parse_request`] or [`parse_request_with`] to parse the stream.
97pub struct Requester<S> {
98    /// http parser config.
99    config: Config,
100
101    /// parser statemachine.
102    state: RequestParseState,
103
104    /// The stream from which the request parser read bytes.
105    stream: S,
106
107    /// the http `request` object builder.
108    builder: Option<http::request::Builder>,
109}
110
111impl<S> Requester<S> {
112    /// Create new `Requester` with provided [`config`](Config)
113    pub fn new_with(stream: S, config: Config) -> Self {
114        Self {
115            config,
116            state: RequestParseState::Method,
117            stream,
118            builder: Some(http::request::Builder::new()),
119        }
120    }
121
122    /// Create new `Requester` with provided default config.
123    pub fn new(stream: S) -> Self {
124        Self::new_with(stream, Default::default())
125    }
126}
127
128impl<S> Requester<S>
129where
130    S: AsyncRead + Unpin + Send + 'static,
131{
132    pub async fn parse_parts(mut self) -> ParseResult<(http::request::Parts, Bytes, S)> {
133        // create header parts parse buffer with capacity to `config.parsing_headers_max_buf`
134        let mut read_buf = ReadBuf::with_capacity(self.config.parsing_headers_max_buf);
135
136        'out: while self.state != RequestParseState::Finished {
137            let chunk_mut = read_buf.chunk_mut();
138
139            // Checks if the parsing buf is overflowing.
140            if chunk_mut.len() == 0 {
141                return Err(ParseError::ParseBufOverflow(
142                    self.config.parsing_headers_max_buf,
143                ));
144            }
145
146            let read_size = self.stream.read(chunk_mut).await?;
147
148            // EOF reached.
149            if read_size == 0 {
150                return Err(ParseError::Eof);
151            }
152
153            read_buf.advance_mut(read_size);
154
155            'inner: while read_buf.chunk().len() > 0 {
156                match self.state {
157                    RequestParseState::Method => {
158                        if !self.parse_method(&mut read_buf)? {
159                            break 'inner;
160                        }
161                    }
162                    RequestParseState::Uri => {
163                        if !self.parse_uri(&mut read_buf)? {
164                            break 'inner;
165                        }
166                    }
167                    RequestParseState::Version => {
168                        if !self.parse_version(&mut read_buf)? {
169                            break 'inner;
170                        }
171                    }
172                    RequestParseState::Headers => {
173                        if !self.parse_header(&mut read_buf)? {
174                            break 'inner;
175                        }
176                    }
177                    RequestParseState::Finished => break 'out,
178                }
179            }
180
181            if let RequestParseState::Finished = self.state {
182                break;
183            }
184        }
185
186        let cached = read_buf.into_bytes(None);
187
188        let (parts, _) = self.builder.unwrap().body(())?.into_parts();
189
190        Ok((parts, cached, self.stream))
191    }
192
193    /// Try parse http request header parts and generate [`Request`] object.
194    pub async fn parse(self) -> ParseResult<Request<BodyReader>> {
195        let (parts, cached, stream) = self.parse_parts().await?;
196
197        let stream = Cursor::new(cached).chain(stream);
198
199        let body_reader = BodyReader::parse(&parts.headers, stream).await?;
200
201        // construct [`Request`]
202        Ok(Request::from_parts(parts, body_reader))
203    }
204
205    #[inline]
206    fn skip_spaces(&mut self, read_buf: &mut ReadBuf) {
207        read_buf.split_to(skip_spaces(read_buf.chunk()));
208    }
209
210    #[inline]
211    fn parse_method(&mut self, read_buf: &mut ReadBuf) -> ParseResult<bool> {
212        self.skip_spaces(read_buf);
213
214        if let Some(len) = parse_token(read_buf.chunk()) {
215            if len == 0 {
216                return Err(ParseError::Method);
217            }
218
219            let buf = read_buf.split_to(len);
220
221            self.set_method(Method::from_bytes(&buf)?);
222
223            self.state.next();
224
225            Ok(true)
226        } else {
227            // Incomplete method token.
228            Ok(false)
229        }
230    }
231
232    #[inline]
233    fn parse_uri(&mut self, read_buf: &mut ReadBuf) -> ParseResult<bool> {
234        self.skip_spaces(read_buf);
235
236        if let Some(len) = parse_token(read_buf.chunk()) {
237            if len == 0 {
238                return Err(ParseError::Uri);
239            }
240
241            let buf = read_buf.split_to(len);
242
243            self.set_uri(Uri::from_maybe_shared(buf)?);
244
245            self.state.next();
246
247            Ok(true)
248        } else {
249            // Incomplete method token.
250            Ok(false)
251        }
252    }
253
254    #[inline]
255    fn parse_version(&mut self, read_buf: &mut ReadBuf) -> ParseResult<bool> {
256        self.skip_spaces(read_buf);
257
258        if let Some(version) = parse_version(read_buf.chunk())? {
259            // advance read cursor.
260            read_buf.split_to(8);
261
262            self.set_version(version);
263
264            self.state.next();
265
266            Ok(true)
267        } else {
268            Ok(false)
269        }
270    }
271
272    #[inline]
273    fn parse_header(&mut self, read_buf: &mut ReadBuf) -> ParseResult<bool> {
274        match skip_newlines(read_buf) {
275            SkipNewLine::Break(len) => {
276                read_buf.split_to(len);
277                self.state.next();
278                return Ok(false);
279            }
280            SkipNewLine::Incomplete => return Ok(false),
281            SkipNewLine::One(len) => {
282                if read_buf.remaining() == len {
283                    return Ok(false);
284                }
285
286                read_buf.split_to(len);
287            }
288            SkipNewLine::None => {}
289        }
290
291        match parse_header(read_buf)? {
292            Some((name, value)) => {
293                self.set_header(name, value);
294                Ok(true)
295            }
296            None => Ok(false),
297        }
298    }
299
300    #[inline]
301    fn set_method(&mut self, method: Method) {
302        self.builder = Some(self.builder.take().unwrap().method(method))
303    }
304
305    #[inline]
306    fn set_uri(&mut self, uri: Uri) {
307        self.builder = Some(self.builder.take().unwrap().uri(uri))
308    }
309
310    #[inline]
311    fn set_version(&mut self, version: Version) {
312        self.builder = Some(self.builder.take().unwrap().version(version))
313    }
314
315    #[inline]
316    fn set_header(&mut self, name: HeaderName, value: HeaderValue) {
317        self.builder = Some(self.builder.take().unwrap().header(name, value))
318    }
319}
320
321/// Helper function to help parsing stream into [`Request`] instance.
322///
323/// See [`new_with`](Requester::new) for more information.
324pub async fn parse_request<S>(stream: S) -> ParseResult<Request<BodyReader>>
325where
326    S: AsyncRead + Unpin + Send + 'static,
327{
328    Requester::new(stream).parse().await
329}
330
331/// Helper function to help parsing stream into [`Request`] instance.
332///
333/// See [`new_with`](Requester::new_with) for more information.
334pub async fn parse_request_with<S>(stream: S, config: Config) -> ParseResult<Request<BodyReader>>
335where
336    S: AsyncRead + Send + Unpin + 'static,
337{
338    Requester::new_with(stream, config).parse().await
339}
340
341/// Http response packet parser.
342///
343/// In general, please do not create [`Requester`] directly but use
344/// [`parse_response`] or [`parse_response_with`] to parse the stream.
345pub struct Responser<S> {
346    /// http parser config.
347    config: Config,
348
349    /// parser statemachine.
350    state: ResponseParseState,
351
352    /// The stream from which the request parser read bytes.
353    stream: S,
354
355    /// the http `response` object builder.
356    builder: Option<http::response::Builder>,
357
358    /// the http reason
359    reason: Option<Bytes>,
360}
361
362impl<S> Responser<S> {
363    /// Create new `Requester` with provided [`config`](Config)
364    pub fn new_with(stream: S, config: Config) -> Self {
365        Self {
366            config,
367            state: ResponseParseState::Version,
368            stream,
369            builder: Some(http::response::Builder::new()),
370            reason: Some(Bytes::from_static(b"")),
371        }
372    }
373
374    /// Create new `Requester` with provided default config.
375    pub fn new(stream: S) -> Self {
376        Self::new_with(stream, Default::default())
377    }
378}
379
380impl<S> Responser<S>
381where
382    S: AsyncRead + Unpin + Send + 'static,
383{
384    pub async fn parse_parts(mut self) -> ParseResult<(Parts, Bytes, S)> {
385        // create header parts parse buffer with capacity to `config.parsing_headers_max_buf`
386        let mut read_buf = ReadBuf::with_capacity(self.config.parsing_headers_max_buf);
387
388        'out: while self.state != ResponseParseState::Finished {
389            let chunk_mut = read_buf.chunk_mut();
390
391            // Checks if the parsing buf is overflowing.
392            if chunk_mut.len() == 0 {
393                return Err(ParseError::ParseBufOverflow(
394                    self.config.parsing_headers_max_buf,
395                ));
396            }
397
398            let read_size = self.stream.read(chunk_mut).await?;
399
400            // EOF reached.
401            if read_size == 0 {
402                return Err(ParseError::Eof);
403            }
404
405            read_buf.advance_mut(read_size);
406
407            'inner: while read_buf.chunk().len() > 0 {
408                match self.state {
409                    ResponseParseState::Version => {
410                        if !self.parse_version(&mut read_buf)? {
411                            break 'inner;
412                        }
413                    }
414                    ResponseParseState::StatusCode => {
415                        if !self.parse_status_code(&mut read_buf)? {
416                            break 'inner;
417                        }
418                    }
419                    ResponseParseState::Reason => {
420                        if !self.parse_reason(&mut read_buf)? {
421                            break 'inner;
422                        }
423                    }
424                    ResponseParseState::Headers => {
425                        if !self.parse_header(&mut read_buf)? {
426                            break 'inner;
427                        }
428                    }
429                    ResponseParseState::Finished => break 'out,
430                }
431            }
432
433            if let ResponseParseState::Finished = self.state {
434                break;
435            }
436        }
437
438        let cached = read_buf.into_bytes(None);
439
440        let (parts, _) = self.builder.unwrap().body(())?.into_parts();
441
442        Ok((parts, cached, self.stream))
443    }
444
445    /// Try parse http request header parts and generate [`Request`] object.
446    pub async fn parse(self) -> ParseResult<Response<BodyReader>> {
447        let (parts, cached, stream) = self.parse_parts().await?;
448
449        let stream = Cursor::new(cached).chain(stream);
450
451        let body_reader = BodyReader::parse(&parts.headers, stream).await?;
452
453        Ok(Response::from_parts(parts, body_reader))
454    }
455
456    #[inline]
457    fn skip_spaces(&mut self, read_buf: &mut ReadBuf) {
458        read_buf.split_to(skip_spaces(read_buf.chunk()));
459    }
460
461    #[inline]
462    fn parse_status_code(&mut self, read_buf: &mut ReadBuf) -> ParseResult<bool> {
463        self.skip_spaces(read_buf);
464
465        match parse_code(read_buf.chunk())? {
466            Some(code) => {
467                self.set_code(code);
468
469                read_buf.split_to(3);
470
471                self.state.next();
472
473                Ok(true)
474            }
475            None => Ok(false),
476        }
477    }
478
479    #[inline]
480    fn parse_reason(&mut self, read_buf: &mut ReadBuf) -> ParseResult<bool> {
481        self.skip_spaces(read_buf);
482
483        match parse_reason(read_buf.chunk()) {
484            Some(len) => {
485                let buf = read_buf.split_to(len);
486
487                self.set_reason(buf.freeze());
488
489                self.state.next();
490
491                Ok(true)
492            }
493            None => Ok(false),
494        }
495    }
496
497    #[inline]
498    fn parse_version(&mut self, read_buf: &mut ReadBuf) -> ParseResult<bool> {
499        self.skip_spaces(read_buf);
500
501        if let Some(version) = parse_version(read_buf.chunk())? {
502            // advance read cursor.
503            read_buf.split_to(8);
504
505            self.set_version(version);
506
507            self.state.next();
508
509            Ok(true)
510        } else {
511            Ok(false)
512        }
513    }
514
515    #[inline]
516    fn parse_header(&mut self, read_buf: &mut ReadBuf) -> ParseResult<bool> {
517        match skip_newlines(read_buf) {
518            SkipNewLine::Break(len) => {
519                read_buf.split_to(len);
520                self.state.next();
521                return Ok(false);
522            }
523            SkipNewLine::Incomplete => return Ok(false),
524            SkipNewLine::One(len) => {
525                if read_buf.remaining() == len {
526                    return Ok(false);
527                }
528
529                read_buf.split_to(len);
530            }
531            SkipNewLine::None => {}
532        }
533
534        match parse_header(read_buf)? {
535            Some((name, value)) => {
536                self.set_header(name, value);
537                Ok(true)
538            }
539            None => Ok(false),
540        }
541    }
542
543    #[inline]
544    fn set_code(&mut self, code: StatusCode) {
545        self.builder = Some(self.builder.take().unwrap().status(code))
546    }
547
548    #[inline]
549    fn set_reason(&mut self, reason: Bytes) {
550        self.reason = Some(reason);
551    }
552
553    #[inline]
554    fn set_version(&mut self, version: Version) {
555        self.builder = Some(self.builder.take().unwrap().version(version))
556    }
557
558    #[inline]
559    fn set_header(&mut self, name: HeaderName, value: HeaderValue) {
560        self.builder = Some(self.builder.take().unwrap().header(name, value))
561    }
562}
563
564/// Helper function to help parsing stream into [`Response`] instance.
565///
566/// See [`new_with`](Response::new) for more information.
567pub async fn parse_response<S>(stream: S) -> ParseResult<Response<BodyReader>>
568where
569    S: AsyncRead + Send + Unpin + 'static,
570{
571    Responser::new(stream).parse().await
572}
573
574/// Helper function to help parsing stream into [`Response`] instance.
575///
576/// See [`new_with`](Responser::new_with) for more information.
577pub async fn parse_response_with<S>(stream: S, config: Config) -> ParseResult<Response<BodyReader>>
578where
579    S: AsyncRead + Send + Unpin + 'static,
580{
581    Responser::new_with(stream, config).parse().await
582}
583
584/// The statemachine of [`Requester`].
585#[repr(u8)]
586#[derive(Debug, Clone, Copy, PartialEq)]
587#[allow(unused)]
588enum ResponseParseState {
589    Version = 1,
590    StatusCode = 2,
591    Reason = 3,
592    Headers = 4,
593    Finished = 5,
594}
595
596impl ResponseParseState {
597    fn next(&mut self) {
598        if let ResponseParseState::Finished = self {
599            return;
600        }
601
602        unsafe { *(self as *mut Self as *mut u8) += 1 }
603    }
604}
605
606/// The statemachine of [`Requester`].
607#[repr(u8)]
608#[derive(Debug, Clone, Copy, PartialEq)]
609#[allow(unused)]
610enum RequestParseState {
611    Method = 1,
612    Uri = 2,
613    Version = 3,
614    Headers = 4,
615    Finished = 5,
616}
617
618impl RequestParseState {
619    fn next(&mut self) {
620        if let RequestParseState::Finished = self {
621            return;
622        }
623
624        unsafe { *(self as *mut Self as *mut u8) += 1 }
625    }
626}
627
628#[inline]
629fn skip_spaces(buf: &[u8]) -> usize {
630    for (offset, b) in buf.iter().cloned().enumerate() {
631        if b != b' ' && b != b'\t' {
632            return offset;
633        }
634    }
635
636    buf.len()
637}
638
639#[inline]
640fn parse_token(buf: &[u8]) -> Option<usize> {
641    for (offset, c) in buf.iter().cloned().enumerate() {
642        if c == b' ' || c == b'\t' || c == b'\r' || c == b'\n' {
643            return Some(offset);
644        }
645    }
646
647    None
648}
649
650#[inline]
651fn parse_header_name(buf: &[u8]) -> Option<usize> {
652    for (offset, c) in buf.iter().cloned().enumerate() {
653        if c == b':' {
654            return Some(offset);
655        }
656    }
657
658    None
659}
660
661#[inline]
662fn parse_header_value(buf: &[u8]) -> Option<usize> {
663    for (offset, c) in buf.iter().cloned().enumerate() {
664        if c == b'\r' || c == b'\n' {
665            return Some(offset);
666        }
667    }
668
669    None
670}
671
672#[inline]
673fn parse_version(buf: &[u8]) -> ParseResult<Option<Version>> {
674    if buf.len() >= 8 {
675        return match &buf[0..8] {
676            b"HTTP/0.9" => Ok(Some(Version::HTTP_09)),
677            b"HTTP/1.0" => Ok(Some(Version::HTTP_10)),
678            b"HTTP/1.1" => Ok(Some(Version::HTTP_11)),
679            b"HTTP/2.0" => Ok(Some(Version::HTTP_2)),
680            b"HTTP/3.0" => Ok(Some(Version::HTTP_3)),
681            _ => Err(ParseError::Version),
682        };
683    }
684
685    Ok(None)
686}
687
688/// result of [`skip_newline`]
689enum SkipNewLine {
690    /// do nothing
691    None,
692    /// skip one newline token.
693    One(usize),
694    /// This is a sequence of two line breaks, indicating that processing
695    /// of the current paragraph has been completed.
696    Break(usize),
697    /// newline token is incomplete.
698    Incomplete,
699}
700
701#[inline]
702fn _skip_newline(buf: &[u8]) -> SkipNewLine {
703    if buf.len() > 1 {
704        if b"\r\n" == &buf[..2] {
705            return SkipNewLine::One(2);
706        }
707
708        if b"\n\n" == &buf[..2] {
709            return SkipNewLine::Break(2);
710        }
711    }
712
713    if buf.len() > 0 {
714        match buf[0] {
715            b'\n' => {
716                return SkipNewLine::One(1);
717            }
718            b'\r' => {
719                return SkipNewLine::Incomplete;
720            }
721            _ => {}
722        }
723    }
724
725    SkipNewLine::None
726}
727
728#[inline]
729fn _skip_newlines(buf: &[u8]) -> SkipNewLine {
730    let mut offset = 0;
731    let mut is_break = false;
732
733    loop {
734        match _skip_newline(&buf[offset..]) {
735            SkipNewLine::Incomplete | SkipNewLine::None => {
736                if is_break {
737                    return SkipNewLine::Break(offset);
738                }
739
740                if offset > 0 {
741                    return SkipNewLine::One(offset);
742                }
743
744                return SkipNewLine::None;
745            }
746            SkipNewLine::One(len) => {
747                if offset > 0 {
748                    is_break = true;
749                }
750
751                offset += len;
752            }
753            SkipNewLine::Break(len) => {
754                is_break = true;
755                offset += len;
756            }
757        }
758    }
759}
760
761#[inline]
762fn skip_newlines(read_buf: &mut ReadBuf) -> SkipNewLine {
763    let skip_new_line = _skip_newlines(read_buf.chunk());
764
765    skip_new_line
766}
767
768#[inline]
769fn trim_suffix_spaces(buf: &mut BytesMut) {
770    for (offset, c) in buf.iter().rev().cloned().enumerate() {
771        if c != b' ' && c != b'\t' {
772            if offset > 0 {
773                _ = buf.split_off(buf.len() - offset);
774            }
775
776            break;
777        }
778    }
779}
780
781#[inline]
782fn parse_reason<'a>(buf: &[u8]) -> Option<usize> {
783    for (offset, c) in buf.iter().cloned().enumerate() {
784        if c == b'\r' || c == b'\n' {
785            return Some(offset);
786        }
787    }
788
789    None
790}
791
792#[inline]
793fn parse_code(buf: &[u8]) -> ParseResult<Option<StatusCode>> {
794    if buf.len() >= 3 {
795        Ok(Some(StatusCode::from_bytes(&buf[..3])?))
796    } else {
797        Ok(None)
798    }
799}
800
801fn parse_header(read_buf: &mut ReadBuf) -> ParseResult<Option<(HeaderName, HeaderValue)>> {
802    let chunk = read_buf.chunk();
803
804    let mut offset = skip_spaces(chunk);
805
806    let name_offset = offset;
807
808    let name_len = match parse_header_name(&chunk[offset..]) {
809        Some(name_len) => name_len,
810        None => return Ok(None),
811    };
812
813    // advance: name + ':'
814    offset += name_len + 1;
815
816    let value_offset = skip_spaces(&chunk[offset..]);
817
818    offset += value_offset;
819
820    let value_len = match parse_header_value(&chunk[offset..]) {
821        Some(value_len) => value_len,
822        None => return Ok(None),
823    };
824
825    read_buf.split_to(name_offset);
826
827    let mut buf = read_buf.split_to(name_len);
828
829    trim_suffix_spaces(&mut buf);
830
831    let header_name = HeaderName::from_bytes(&buf)?;
832
833    read_buf.split_to(value_offset + 1);
834
835    let mut buf = read_buf.split_to(value_len);
836
837    trim_suffix_spaces(&mut buf);
838
839    let header_value = HeaderValue::from_maybe_shared(buf)?;
840
841    Ok(Some((header_name, header_value)))
842}
843
844pub trait HttpReader: AsyncRead + Unpin + Send + 'static {
845    fn read_request(self) -> impl Future<Output = io::Result<Request<BodyReader>>>
846    where
847        Self: Sized,
848    {
849        async { Ok(Requester::new(self).parse().await?) }
850    }
851
852    fn read_response(self) -> impl Future<Output = io::Result<Response<BodyReader>>>
853    where
854        Self: Sized,
855    {
856        async { Ok(Responser::new(self).parse().await?) }
857    }
858}
859
860impl<T: AsyncRead + Send + Unpin + 'static> HttpReader for T {}
861
862#[cfg(test)]
863mod tests {
864    use super::*;
865
866    #[test]
867    fn test_parse_state() {
868        let mut state = RequestParseState::Method;
869
870        state.next();
871
872        assert_eq!(state, RequestParseState::Uri);
873
874        state.next();
875
876        assert_eq!(state, RequestParseState::Version);
877
878        state.next();
879
880        assert_eq!(state, RequestParseState::Headers);
881
882        state.next();
883
884        assert_eq!(state, RequestParseState::Finished);
885
886        state.next();
887
888        assert_eq!(state, RequestParseState::Finished);
889    }
890
891    use futures::io::Cursor;
892
893    use http::{Method, Request, Version};
894
895    async fn parse_request(buf: &[u8]) -> ParseResult<Request<()>> {
896        let (parts, _, _) = Requester::new(Cursor::new(buf.to_vec()))
897            .parse_parts()
898            .await?;
899
900        Ok(Request::from_parts(parts, ()))
901    }
902
903    async fn parse_request_test<F>(buf: &[u8], f: F)
904    where
905        F: FnOnce(Request<()>),
906    {
907        let request = parse_request(buf).await.expect("parse request failed.");
908
909        f(request)
910    }
911
912    async fn expect_request_partial_parse(buf: &[u8]) {
913        let error = parse_request(buf).await.expect_err("");
914        if let ParseError::Eof = error {
915        } else {
916            panic!("Expect eof, but got {:?}", error);
917        }
918    }
919
920    async fn expect_request_empty_method(buf: &[u8]) {
921        let error = parse_request(buf).await.expect_err("");
922        if let ParseError::InvalidMethod(_) = error {
923        } else {
924            panic!("Expect method error, but got {:?}", error);
925        }
926    }
927
928    async fn expect_request_empty_uri(buf: &[u8]) {
929        let error = parse_request(buf).await.expect_err("");
930        if let ParseError::InvalidUri(_) = error {
931        } else {
932            panic!("Expect uri error, but got {:?}", error);
933        }
934    }
935
936    async fn parse_response(buf: &[u8]) -> ParseResult<Response<()>> {
937        let (parts, _, _) = Responser::new(Cursor::new(buf.to_vec()))
938            .parse_parts()
939            .await?;
940
941        Ok(Response::from_parts(parts, ()))
942    }
943
944    async fn parse_response_test<F>(buf: &[u8], f: F)
945    where
946        F: FnOnce(Response<()>),
947    {
948        let request = parse_response(buf).await.expect("parse request failed.");
949
950        f(request)
951    }
952
953    async fn expect_response_partial_parse(buf: &[u8]) {
954        let error = parse_response(buf).await.expect_err("");
955        if let ParseError::Eof = error {
956        } else {
957            panic!("Expect eof, but got {:?}", error);
958        }
959    }
960
961    async fn expect_response_version(buf: &[u8]) {
962        let error = parse_response(buf).await.expect_err("");
963        if let ParseError::Version = error {
964        } else {
965            panic!("Expect version, but got {:?}", error);
966        }
967    }
968
969    #[futures_test::test]
970    async fn response_tests() {
971        parse_response_test(b"HTTP/1.1 200 OK\r\n\r\n", |resp| {
972            assert_eq!(resp.version(), Version::HTTP_11);
973            assert_eq!(resp.status(), StatusCode::OK);
974        })
975        .await;
976
977        parse_response_test(b"HTTP/1.0 403 Forbidden\nServer: foo.bar\n\n", |resp| {
978            assert_eq!(resp.version(), Version::HTTP_10);
979            assert_eq!(resp.status(), StatusCode::FORBIDDEN);
980        })
981        .await;
982
983        parse_response_test(b"HTTP/1.1 200 \r\n\r\n", |resp| {
984            assert_eq!(resp.version(), Version::HTTP_11);
985            assert_eq!(resp.status(), StatusCode::OK);
986        })
987        .await;
988
989        parse_response_test(b"HTTP/1.1 200\r\n\r\n", |resp| {
990            assert_eq!(resp.version(), Version::HTTP_11);
991            assert_eq!(resp.status(), StatusCode::OK);
992        })
993        .await;
994
995        parse_response_test(b"HTTP/1.1 200\r\nFoo: bar\r\n\r\n", |resp| {
996            assert_eq!(resp.version(), Version::HTTP_11);
997            assert_eq!(resp.status(), StatusCode::OK);
998            assert_eq!(resp.headers().len(), 1);
999
1000            assert_eq!(resp.headers().get("Foo").unwrap().to_str().unwrap(), "bar");
1001        })
1002        .await;
1003
1004        parse_response_test(b"HTTP/1.1 200 X\xFFZ\r\n\r\n", |resp| {
1005            assert_eq!(resp.version(), Version::HTTP_11);
1006            assert_eq!(resp.status(), StatusCode::OK);
1007        })
1008        .await;
1009
1010        parse_response_test(b"HTTP/1.1 200 \x00\r\n\r\n", |resp| {
1011            assert_eq!(resp.version(), Version::HTTP_11);
1012            assert_eq!(resp.status(), StatusCode::OK);
1013        })
1014        .await;
1015
1016        parse_response_test(b"HTTP/1.0 200\nContent-type: text/html\n\n", |resp| {
1017            assert_eq!(resp.version(), Version::HTTP_10);
1018            assert_eq!(resp.status(), StatusCode::OK);
1019            assert_eq!(resp.headers().len(), 1);
1020            assert_eq!(
1021                resp.headers()
1022                    .get("Content-type")
1023                    .unwrap()
1024                    .to_str()
1025                    .unwrap(),
1026                "text/html"
1027            );
1028        })
1029        .await;
1030
1031        parse_response_test( b"HTTP/1.1 200 OK\r\nAccess-Control-Allow-Credentials : true\r\nBread: baguette\r\n\r\n", |resp| {
1032            assert_eq!(resp.version(), Version::HTTP_11);
1033            assert_eq!(resp.status(), StatusCode::OK);
1034            assert_eq!(resp.headers().len(), 2);
1035            assert_eq!(
1036                resp.headers()
1037                    .get("Access-Control-Allow-Credentials")
1038                    .unwrap()
1039                    .to_str()
1040                    .unwrap(),
1041                "true"
1042            );
1043
1044            assert_eq!(
1045                resp.headers()
1046                    .get("Bread")
1047                    .unwrap()
1048                    .to_str()
1049                    .unwrap(),
1050                "baguette"
1051            );
1052        })
1053        .await;
1054
1055        expect_response_partial_parse(b"HTTP/1.1").await;
1056
1057        expect_response_partial_parse(b"HTTP/1.1 200").await;
1058
1059        expect_response_partial_parse(b"HTTP/1.1 200 OK\r\nServer: yolo\r\n").await;
1060
1061        expect_response_version(b"\n\nHTTP/1.1 200 OK\n\n").await;
1062    }
1063
1064    #[futures_test::test]
1065    async fn request_tests() {
1066        parse_request_test(b"GET / HTTP/1.1\r\n\r\n", |req| {
1067            assert_eq!(req.method(), Method::GET);
1068            assert_eq!(req.uri().to_string(), "/");
1069            assert_eq!(req.version(), Version::HTTP_11);
1070            assert_eq!(req.headers().len(), 0);
1071        })
1072        .await;
1073
1074        parse_request_test(b"GET /thing?data=a HTTP/1.1\r\n\r\n", |req| {
1075            assert_eq!(req.method(), Method::GET);
1076            assert_eq!(req.uri().to_string(), "/thing?data=a");
1077            assert_eq!(req.version(), Version::HTTP_11);
1078            assert_eq!(req.headers().len(), 0);
1079        })
1080        .await;
1081
1082        parse_request_test(b"GET /thing?data=a^ HTTP/1.1\r\n\r\n", |req| {
1083            assert_eq!(req.method(), Method::GET);
1084            assert_eq!(req.uri().to_string(), "/thing?data=a^");
1085            assert_eq!(req.version(), Version::HTTP_11);
1086            assert_eq!(req.headers().len(), 0);
1087        })
1088        .await;
1089
1090        parse_request_test(
1091            b"GET / HTTP/1.1\r\nHost: foo.com\r\nCookie: \r\n\r\n",
1092            |req| {
1093                assert_eq!(req.method(), Method::GET);
1094                assert_eq!(req.uri().to_string(), "/");
1095                assert_eq!(req.version(), Version::HTTP_11);
1096                assert_eq!(req.headers().len(), 2);
1097                assert_eq!(
1098                    req.headers().get("Host").unwrap().to_str().unwrap(),
1099                    "foo.com"
1100                );
1101                assert_eq!(req.headers().get("Cookie").unwrap().to_str().unwrap(), "");
1102            },
1103        )
1104        .await;
1105
1106        parse_request_test(
1107            b"GET / HTTP/1.1\r\nHost: \tfoo.com\t \r\nCookie: \t \r\n\r\n",
1108            |req| {
1109                assert_eq!(req.method(), Method::GET);
1110                assert_eq!(req.uri().to_string(), "/");
1111                assert_eq!(req.version(), Version::HTTP_11);
1112                assert_eq!(req.headers().len(), 2);
1113                assert_eq!(
1114                    req.headers().get("Host").unwrap().to_str().unwrap(),
1115                    "foo.com"
1116                );
1117                assert_eq!(req.headers().get("Cookie").unwrap().to_str().unwrap(), "");
1118            },
1119        )
1120        .await;
1121
1122        parse_request_test(
1123            b"GET / HTTP/1.1\r\nUser-Agent: some\tagent\r\n\r\n",
1124            |req| {
1125                assert_eq!(req.method(), Method::GET);
1126                assert_eq!(req.uri().to_string(), "/");
1127                assert_eq!(req.version(), Version::HTTP_11);
1128                assert_eq!(req.headers().len(), 1);
1129                assert_eq!(
1130                    req.headers().get("User-Agent").unwrap().to_str().unwrap(),
1131                    "some\tagent"
1132                );
1133            },
1134        )
1135        .await;
1136
1137        parse_request_test(
1138            b"GET / HTTP/1.1\r\nUser-Agent: 1234567890some\tagent\r\n\r\n",
1139            |req| {
1140                assert_eq!(req.method(), Method::GET);
1141                assert_eq!(req.uri().to_string(), "/");
1142                assert_eq!(req.version(), Version::HTTP_11);
1143                assert_eq!(req.headers().len(), 1);
1144                assert_eq!(
1145                    req.headers().get("User-Agent").unwrap().to_str().unwrap(),
1146                    "1234567890some\tagent"
1147                );
1148            },
1149        )
1150        .await;
1151
1152        parse_request_test(
1153            b"GET / HTTP/1.1\r\nUser-Agent: 1234567890some\t1234567890agent1234567890\r\n\r\n",
1154            |req| {
1155                assert_eq!(req.method(), Method::GET);
1156                assert_eq!(req.uri().to_string(), "/");
1157                assert_eq!(req.version(), Version::HTTP_11);
1158                assert_eq!(req.headers().len(), 1);
1159                assert_eq!(
1160                    req.headers().get("User-Agent").unwrap().to_str().unwrap(),
1161                    "1234567890some\t1234567890agent1234567890"
1162                );
1163            },
1164        )
1165        .await;
1166
1167        parse_request_test(
1168            b"GET / HTTP/1.1\r\nHost: foo.com\r\nUser-Agent: \xe3\x81\xb2\xe3/1.0\r\n\r\n",
1169            |req| {
1170                assert_eq!(req.method(), Method::GET);
1171                assert_eq!(req.uri().to_string(), "/");
1172                assert_eq!(req.version(), Version::HTTP_11);
1173                assert_eq!(req.headers().len(), 2);
1174                assert_eq!(
1175                    req.headers().get("Host").unwrap().to_str().unwrap(),
1176                    "foo.com"
1177                );
1178                assert_eq!(
1179                    req.headers().get("User-Agent").unwrap().as_bytes(),
1180                    b"\xe3\x81\xb2\xe3/1.0"
1181                );
1182            },
1183        )
1184        .await;
1185
1186        parse_request_test(b"GET /\\?wayne\\=5 HTTP/1.1\r\n\r\n", |req| {
1187            assert_eq!(req.method(), Method::GET);
1188            assert_eq!(req.uri().to_string(), "/\\?wayne\\=5");
1189            assert_eq!(req.version(), Version::HTTP_11);
1190            assert_eq!(req.headers().len(), 0);
1191        })
1192        .await;
1193
1194        expect_request_partial_parse(b"GET / HTTP/1.1\r\n\r").await;
1195
1196        expect_request_partial_parse(b"GET / HTTP/1.1\r\nHost: yolo\r\n").await;
1197
1198        expect_request_empty_uri(b"GET  HTTP/1.1\r\n\r\n").await;
1199
1200        expect_request_empty_method(b"  HTTP/1.1\r\n\r\n").await;
1201
1202        expect_request_empty_method(b" / HTTP/1.1\r\n\r\n").await;
1203    }
1204}