rasi_ext/net/http/
parse.rs

1use std::{io, time::Duration};
2
3use bytes::{Bytes, BytesMut};
4use futures::{AsyncRead, AsyncReadExt};
5use http::{
6    header::{InvalidHeaderName, InvalidHeaderValue},
7    method::InvalidMethod,
8    status::InvalidStatusCode,
9    uri::InvalidUri,
10    HeaderName, HeaderValue, Method, Request, Response, StatusCode, Uri, Version,
11};
12use rasi::{io::Cursor, time::TimeoutExt};
13
14use crate::utils::ReadBuf;
15
16#[derive(Debug, thiserror::Error)]
17pub enum ParseError {
18    #[error(transparent)]
19    HttpError(#[from] http::Error),
20
21    #[error("Http header parse buf overflow, max={0}")]
22    ParseBufOverflow(usize),
23
24    #[error(transparent)]
25    IoError(#[from] io::Error),
26
27    #[error("Unable to complete http parsing, reached the end of the stream.")]
28    Eof,
29
30    #[error("Miss method field.")]
31    Method,
32
33    #[error(transparent)]
34    InvalidMethod(#[from] InvalidMethod),
35
36    #[error("Miss uri field.")]
37    Uri,
38
39    #[error(transparent)]
40    InvalidUri(#[from] InvalidUri),
41
42    #[error("Invalid http version.")]
43    Version,
44
45    #[error(transparent)]
46    InvalidHeaderName(#[from] InvalidHeaderName),
47
48    #[error(transparent)]
49    InvalidHeaderValue(#[from] InvalidHeaderValue),
50
51    #[error(transparent)]
52    InvalidStatusCode(#[from] InvalidStatusCode),
53
54    #[error(transparent)]
55    SerdeJsonError(#[from] serde_json::Error),
56}
57
58impl From<ParseError> for io::Error {
59    fn from(value: ParseError) -> Self {
60        match value {
61            ParseError::IoError(err) => err,
62            _ => io::Error::new(io::ErrorKind::Other, value),
63        }
64    }
65}
66
67/// Type alias for parser result.
68pub type ParseResult<T> = Result<T, ParseError>;
69
/// Http packet parse config.
///
/// `Clone` is derived so one configuration can be reused for many connections, since each
/// parser takes its `Config` by value.
#[derive(Debug, Clone)]
pub struct Config {
    /// The max buf len for parsing http headers (request/status line plus header fields).
    pub parsing_headers_max_buf: usize,
}

impl Default for Config {
    /// Default limit: 2048 bytes for the whole header section.
    fn default() -> Self {
        Self {
            parsing_headers_max_buf: 2048,
        }
    }
}
84
85/// Http body reader created by http parsers, see mod [`parse`](crate::parse)  for more information.
86#[derive(Debug)]
87pub struct BodyReader<S> {
88    /// Unparsed data cached by the parser when parsing http headers
89    cached: Bytes,
90
91    /// The underlying transport layer data stream
92    stream: S,
93}
94
95impl<S> BodyReader<S> {
96    /// Create new `BodyReader` instance from `(Bytes,S)`
97    pub fn new(cached: Bytes, stream: S) -> Self {
98        Self { cached, stream }
99    }
100}
101
102impl<S> BodyReader<S>
103where
104    S: AsyncRead + Unpin,
105{
106    /// Consume `BodyReader` and create an [`AsyncRead`] instance.
107    pub fn into_read(self) -> impl AsyncRead {
108        Cursor::new(self.cached).chain(self.stream)
109    }
110
111    /// Consume `BodyReader` into [`BytesMut`].
112    ///
113    /// Use `max_body_len` to limit memory buf usage, which may be useful for server-side code.
114    pub async fn into_bytes(
115        self,
116        max_body_len: usize,
117        timeout: Option<Duration>,
118    ) -> ParseResult<BytesMut> {
119        let mut stream = self.into_read();
120
121        let mut read_buf = ReadBuf::with_capacity(max_body_len);
122
123        loop {
124            let chunk_mut = read_buf.chunk_mut();
125
126            // Checks if the parsing buf is overflowing.
127            if chunk_mut.len() == 0 {
128                return Err(ParseError::ParseBufOverflow(max_body_len));
129            }
130
131            let read_size = if let Some(timeout) = timeout {
132                match stream.read(chunk_mut).timeout(timeout).await {
133                    Some(r) => r?,
134                    None => {
135                        return Err(io::Error::new(
136                            io::ErrorKind::TimedOut,
137                            "read body content timeout",
138                        )
139                        .into())
140                    }
141                }
142            } else {
143                stream.read(chunk_mut).await?
144            };
145
146            if read_size == 0 {
147                break;
148            }
149
150            read_buf.advance_mut(read_size);
151        }
152
153        Ok(read_buf.into_bytes_mut(None))
154    }
155
156    /// Deserialize an instance of type `T` from http json format body.
157    ///
158    /// The maximum body length is limited to `4096` bytes,
159    /// use [`from_json_with`](Self::from_json_with) instead if you want to use other values.
160    pub async fn from_json<T>(self, timeout: Option<Duration>) -> ParseResult<T>
161    where
162        for<'a> T: serde::de::Deserialize<'a>,
163    {
164        self.from_json_with(4096, timeout).await
165    }
166
167    /// Deserialize an instance of type `T` from http json format body.
168    ///
169    /// Use `max_body_len` to limit memory buf usage, which may be useful for server-side code.
170    pub async fn from_json_with<T>(
171        self,
172        max_body_len: usize,
173        timeout: Option<Duration>,
174    ) -> ParseResult<T>
175    where
176        for<'a> T: serde::de::Deserialize<'a>,
177    {
178        let buf = self.into_bytes(max_body_len, timeout).await?;
179
180        Ok(serde_json::from_slice(&buf)?)
181    }
182}
183/// Http request packet parser.
184///
185/// In general, please do not create [`Requester`] directly but use
186/// [`parse_request`] or [`parse_request_with`] to parse the stream.
187pub struct Requester<S> {
188    /// http parser config.
189    config: Config,
190
191    /// parser statemachine.
192    state: RequestParseState,
193
194    /// The stream from which the request parser read bytes.
195    stream: S,
196
197    /// the http `request` object builder.
198    builder: Option<http::request::Builder>,
199}
200
201impl<S> Requester<S> {
202    /// Create new `Requester` with provided [`config`](Config)
203    pub fn new_with(stream: S, config: Config) -> Self {
204        Self {
205            config,
206            state: RequestParseState::Method,
207            stream,
208            builder: Some(http::request::Builder::new()),
209        }
210    }
211
212    /// Create new `Requester` with provided default config.
213    pub fn new(stream: S) -> Self {
214        Self::new_with(stream, Default::default())
215    }
216}
217
218impl<S> Requester<S>
219where
220    S: AsyncRead + Unpin,
221{
222    /// Try parse http request header parts and generate [`Request`] object.
223    pub async fn parse(mut self) -> ParseResult<Request<BodyReader<S>>> {
224        // create header parts parse buffer with capacity to `config.parsing_headers_max_buf`
225        let mut read_buf = ReadBuf::with_capacity(self.config.parsing_headers_max_buf);
226
227        'out: while self.state != RequestParseState::Finished {
228            let chunk_mut = read_buf.chunk_mut();
229
230            // Checks if the parsing buf is overflowing.
231            if chunk_mut.len() == 0 {
232                return Err(ParseError::ParseBufOverflow(
233                    self.config.parsing_headers_max_buf,
234                ));
235            }
236
237            let read_size = self.stream.read(chunk_mut).await?;
238
239            // EOF reached.
240            if read_size == 0 {
241                return Err(ParseError::Eof);
242            }
243
244            read_buf.advance_mut(read_size);
245
246            'inner: while read_buf.chunk().len() > 0 {
247                match self.state {
248                    RequestParseState::Method => {
249                        if !self.parse_method(&mut read_buf)? {
250                            break 'inner;
251                        }
252                    }
253                    RequestParseState::Uri => {
254                        if !self.parse_uri(&mut read_buf)? {
255                            break 'inner;
256                        }
257                    }
258                    RequestParseState::Version => {
259                        if !self.parse_version(&mut read_buf)? {
260                            break 'inner;
261                        }
262                    }
263                    RequestParseState::Headers => {
264                        if !self.parse_header(&mut read_buf)? {
265                            break 'inner;
266                        }
267                    }
268                    RequestParseState::Finished => break 'out,
269                }
270            }
271
272            if let RequestParseState::Finished = self.state {
273                break;
274            }
275        }
276
277        let cached = read_buf.into_bytes(None);
278
279        // construct [`Request`]
280        Ok(self
281            .builder
282            .unwrap()
283            .body(BodyReader::new(cached, self.stream))?)
284    }
285
286    #[inline]
287    fn skip_spaces(&mut self, read_buf: &mut ReadBuf) {
288        read_buf.split_to(skip_spaces(read_buf.chunk()));
289    }
290
291    #[inline]
292    fn parse_method(&mut self, read_buf: &mut ReadBuf) -> ParseResult<bool> {
293        self.skip_spaces(read_buf);
294
295        if let Some(len) = parse_token(read_buf.chunk()) {
296            if len == 0 {
297                return Err(ParseError::Method);
298            }
299
300            let buf = read_buf.split_to(len);
301
302            self.set_method(Method::from_bytes(&buf)?);
303
304            self.state.next();
305
306            Ok(true)
307        } else {
308            // Incomplete method token.
309            Ok(false)
310        }
311    }
312
313    #[inline]
314    fn parse_uri(&mut self, read_buf: &mut ReadBuf) -> ParseResult<bool> {
315        self.skip_spaces(read_buf);
316
317        if let Some(len) = parse_token(read_buf.chunk()) {
318            if len == 0 {
319                return Err(ParseError::Uri);
320            }
321
322            let buf = read_buf.split_to(len);
323
324            self.set_uri(Uri::from_maybe_shared(buf)?);
325
326            self.state.next();
327
328            Ok(true)
329        } else {
330            // Incomplete method token.
331            Ok(false)
332        }
333    }
334
335    #[inline]
336    fn parse_version(&mut self, read_buf: &mut ReadBuf) -> ParseResult<bool> {
337        self.skip_spaces(read_buf);
338
339        if let Some(version) = parse_version(read_buf.chunk())? {
340            // advance read cursor.
341            read_buf.split_to(8);
342
343            self.set_version(version);
344
345            self.state.next();
346
347            Ok(true)
348        } else {
349            Ok(false)
350        }
351    }
352
353    #[inline]
354    fn parse_header(&mut self, read_buf: &mut ReadBuf) -> ParseResult<bool> {
355        match skip_newlines(read_buf) {
356            SkipNewLine::Break(len) => {
357                read_buf.split_to(len);
358                self.state.next();
359                return Ok(false);
360            }
361            SkipNewLine::Incomplete => return Ok(false),
362            SkipNewLine::One(len) => {
363                if read_buf.remaining() == len {
364                    return Ok(false);
365                }
366
367                read_buf.split_to(len);
368            }
369            SkipNewLine::None => {}
370        }
371
372        match parse_header(read_buf)? {
373            Some((name, value)) => {
374                self.set_header(name, value);
375                Ok(true)
376            }
377            None => Ok(false),
378        }
379    }
380
381    #[inline]
382    fn set_method(&mut self, method: Method) {
383        self.builder = Some(self.builder.take().unwrap().method(method))
384    }
385
386    #[inline]
387    fn set_uri(&mut self, uri: Uri) {
388        self.builder = Some(self.builder.take().unwrap().uri(uri))
389    }
390
391    #[inline]
392    fn set_version(&mut self, version: Version) {
393        self.builder = Some(self.builder.take().unwrap().version(version))
394    }
395
396    #[inline]
397    fn set_header(&mut self, name: HeaderName, value: HeaderValue) {
398        self.builder = Some(self.builder.take().unwrap().header(name, value))
399    }
400}
401
402/// Helper function to help parsing stream into [`Request`] instance.
403///
404/// See [`new_with`](Requester::new) for more information.
405pub async fn parse_request<S>(stream: S) -> io::Result<Request<BodyReader<S>>>
406where
407    S: AsyncRead + Unpin,
408{
409    Ok(Requester::new(stream).parse().await?)
410}
411
412/// Helper function to help parsing stream into [`Request`] instance.
413///
414/// See [`new_with`](Requester::new_with) for more information.
415pub async fn parse_request_with<S>(stream: S, config: Config) -> io::Result<Request<BodyReader<S>>>
416where
417    S: AsyncRead + Unpin,
418{
419    Ok(Requester::new_with(stream, config).parse().await?)
420}
421
422/// Http response packet parser.
423///
424/// In general, please do not create [`Requester`] directly but use
425/// [`parse_response`] or [`parse_response_with`] to parse the stream.
426pub struct Responser<S> {
427    /// http parser config.
428    config: Config,
429
430    /// parser statemachine.
431    state: ResponseParseState,
432
433    /// The stream from which the request parser read bytes.
434    stream: S,
435
436    /// the http `response` object builder.
437    builder: Option<http::response::Builder>,
438
439    /// the http reason
440    reason: Option<Bytes>,
441}
442
443impl<S> Responser<S> {
444    /// Create new `Requester` with provided [`config`](Config)
445    pub fn new_with(stream: S, config: Config) -> Self {
446        Self {
447            config,
448            state: ResponseParseState::Version,
449            stream,
450            builder: Some(http::response::Builder::new()),
451            reason: Some(Bytes::from_static(b"")),
452        }
453    }
454
455    /// Create new `Requester` with provided default config.
456    pub fn new(stream: S) -> Self {
457        Self::new_with(stream, Default::default())
458    }
459}
460
461impl<S> Responser<S>
462where
463    S: AsyncRead + Unpin,
464{
465    /// Try parse http request header parts and generate [`Request`] object.
466    pub async fn parse(mut self) -> ParseResult<Response<BodyReader<S>>> {
467        // create header parts parse buffer with capacity to `config.parsing_headers_max_buf`
468        let mut read_buf = ReadBuf::with_capacity(self.config.parsing_headers_max_buf);
469
470        'out: while self.state != ResponseParseState::Finished {
471            let chunk_mut = read_buf.chunk_mut();
472
473            // Checks if the parsing buf is overflowing.
474            if chunk_mut.len() == 0 {
475                return Err(ParseError::ParseBufOverflow(
476                    self.config.parsing_headers_max_buf,
477                ));
478            }
479
480            let read_size = self.stream.read(chunk_mut).await?;
481
482            // EOF reached.
483            if read_size == 0 {
484                return Err(ParseError::Eof);
485            }
486
487            read_buf.advance_mut(read_size);
488
489            'inner: while read_buf.chunk().len() > 0 {
490                match self.state {
491                    ResponseParseState::Version => {
492                        if !self.parse_version(&mut read_buf)? {
493                            break 'inner;
494                        }
495                    }
496                    ResponseParseState::StatusCode => {
497                        if !self.parse_status_code(&mut read_buf)? {
498                            break 'inner;
499                        }
500                    }
501                    ResponseParseState::Reason => {
502                        if !self.parse_reason(&mut read_buf)? {
503                            break 'inner;
504                        }
505                    }
506                    ResponseParseState::Headers => {
507                        if !self.parse_header(&mut read_buf)? {
508                            break 'inner;
509                        }
510                    }
511                    ResponseParseState::Finished => break 'out,
512                }
513            }
514
515            if let ResponseParseState::Finished = self.state {
516                break;
517            }
518        }
519
520        let cached = read_buf.into_bytes(None);
521
522        // construct [`Request`]
523        Ok(self
524            .builder
525            .unwrap()
526            .body(BodyReader::new(cached, self.stream))?)
527    }
528
529    #[inline]
530    fn skip_spaces(&mut self, read_buf: &mut ReadBuf) {
531        read_buf.split_to(skip_spaces(read_buf.chunk()));
532    }
533
534    #[inline]
535    fn parse_status_code(&mut self, read_buf: &mut ReadBuf) -> ParseResult<bool> {
536        self.skip_spaces(read_buf);
537
538        match parse_code(read_buf.chunk())? {
539            Some(code) => {
540                self.set_code(code);
541
542                read_buf.split_to(3);
543
544                self.state.next();
545
546                Ok(true)
547            }
548            None => Ok(false),
549        }
550    }
551
552    #[inline]
553    fn parse_reason(&mut self, read_buf: &mut ReadBuf) -> ParseResult<bool> {
554        self.skip_spaces(read_buf);
555
556        match parse_reason(read_buf.chunk()) {
557            Some(len) => {
558                let buf = read_buf.split_to(len);
559
560                self.set_reason(buf.freeze());
561
562                self.state.next();
563
564                Ok(true)
565            }
566            None => Ok(false),
567        }
568    }
569
570    #[inline]
571    fn parse_version(&mut self, read_buf: &mut ReadBuf) -> ParseResult<bool> {
572        self.skip_spaces(read_buf);
573
574        if let Some(version) = parse_version(read_buf.chunk())? {
575            // advance read cursor.
576            read_buf.split_to(8);
577
578            self.set_version(version);
579
580            self.state.next();
581
582            Ok(true)
583        } else {
584            Ok(false)
585        }
586    }
587
588    #[inline]
589    fn parse_header(&mut self, read_buf: &mut ReadBuf) -> ParseResult<bool> {
590        match skip_newlines(read_buf) {
591            SkipNewLine::Break(len) => {
592                read_buf.split_to(len);
593                self.state.next();
594                return Ok(false);
595            }
596            SkipNewLine::Incomplete => return Ok(false),
597            SkipNewLine::One(len) => {
598                if read_buf.remaining() == len {
599                    return Ok(false);
600                }
601
602                read_buf.split_to(len);
603            }
604            SkipNewLine::None => {}
605        }
606
607        match parse_header(read_buf)? {
608            Some((name, value)) => {
609                self.set_header(name, value);
610                Ok(true)
611            }
612            None => Ok(false),
613        }
614    }
615
616    #[inline]
617    fn set_code(&mut self, code: StatusCode) {
618        self.builder = Some(self.builder.take().unwrap().status(code))
619    }
620
621    #[inline]
622    fn set_reason(&mut self, reason: Bytes) {
623        self.reason = Some(reason);
624    }
625
626    #[inline]
627    fn set_version(&mut self, version: Version) {
628        self.builder = Some(self.builder.take().unwrap().version(version))
629    }
630
631    #[inline]
632    fn set_header(&mut self, name: HeaderName, value: HeaderValue) {
633        self.builder = Some(self.builder.take().unwrap().header(name, value))
634    }
635}
636
637/// Helper function to help parsing stream into [`Response`] instance.
638///
639/// See [`new_with`](Response::new) for more information.
640pub async fn parse_response<S>(stream: S) -> io::Result<Response<BodyReader<S>>>
641where
642    S: AsyncRead + Unpin,
643{
644    Ok(Responser::new(stream).parse().await?)
645}
646
647/// Helper function to help parsing stream into [`Response`] instance.
648///
649/// See [`new_with`](Response::new_with) for more information.
650pub async fn parse_response_with<S>(
651    stream: S,
652    config: Config,
653) -> io::Result<Response<BodyReader<S>>>
654where
655    S: AsyncRead + Unpin,
656{
657    Ok(Responser::new_with(stream, config).parse().await?)
658}
659
/// The statemachine of [`Responser`].
///
/// Stages are visited in declaration order; `Finished` is terminal.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ResponseParseState {
    Version,
    StatusCode,
    Reason,
    Headers,
    Finished,
}

impl ResponseParseState {
    /// Advance to the next parsing stage; calling this on `Finished` is a no-op.
    fn next(&mut self) {
        // A plain match replaces the previous raw-pointer `u8` increment: no `unsafe`,
        // no reliance on discriminant layout, and the compiler checks exhaustiveness.
        *self = match *self {
            Self::Version => Self::StatusCode,
            Self::StatusCode => Self::Reason,
            Self::Reason => Self::Headers,
            Self::Headers | Self::Finished => Self::Finished,
        };
    }
}

/// The statemachine of [`Requester`].
///
/// Stages are visited in declaration order; `Finished` is terminal.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RequestParseState {
    Method,
    Uri,
    Version,
    Headers,
    Finished,
}

impl RequestParseState {
    /// Advance to the next parsing stage; calling this on `Finished` is a no-op.
    fn next(&mut self) {
        *self = match *self {
            Self::Method => Self::Uri,
            Self::Uri => Self::Version,
            Self::Version => Self::Headers,
            Self::Headers | Self::Finished => Self::Finished,
        };
    }
}
703
/// Counts the leading spaces and horizontal tabs in `buf`.
///
/// Returns `buf.len()` when `buf` is empty or made up entirely of such whitespace.
#[inline]
fn skip_spaces(buf: &[u8]) -> usize {
    buf.iter()
        .position(|&byte| !matches!(byte, b' ' | b'\t'))
        .unwrap_or(buf.len())
}
714
/// Locates the end of the token at the start of `buf`.
///
/// Returns the offset of the first space, tab, CR or LF. Returns `None` when no delimiter
/// has been buffered yet, which callers treat as an incomplete token.
#[inline]
fn parse_token(buf: &[u8]) -> Option<usize> {
    buf.iter()
        .position(|&byte| matches!(byte, b' ' | b'\t' | b'\r' | b'\n'))
}
725
/// Returns the offset of the first `:` in `buf` (the end of a header name), if any.
#[inline]
fn parse_header_name(buf: &[u8]) -> Option<usize> {
    buf.iter().position(|&byte| byte == b':')
}
736
/// Returns the offset of the first CR or LF in `buf` (the end of a header value), if any.
#[inline]
fn parse_header_value(buf: &[u8]) -> Option<usize> {
    buf.iter().position(|&byte| byte == b'\r' || byte == b'\n')
}
747
748#[inline]
749fn parse_version(buf: &[u8]) -> ParseResult<Option<Version>> {
750    if buf.len() >= 8 {
751        return match &buf[0..8] {
752            b"HTTP/0.9" => Ok(Some(Version::HTTP_09)),
753            b"HTTP/1.0" => Ok(Some(Version::HTTP_10)),
754            b"HTTP/1.1" => Ok(Some(Version::HTTP_11)),
755            b"HTTP/2.0" => Ok(Some(Version::HTTP_2)),
756            b"HTTP/3.0" => Ok(Some(Version::HTTP_3)),
757            _ => Err(ParseError::Version),
758        };
759    }
760
761    Ok(None)
762}
763
/// Result of [`_skip_newline`] / [`_skip_newlines`].
enum SkipNewLine {
    /// do nothing: the buffer does not start with a line break.
    None,
    /// skip one newline token of the given byte length.
    One(usize),
    /// This is a sequence of two line breaks, indicating that processing
    /// of the current paragraph has been completed. Carries the total byte length.
    Break(usize),
    /// newline token is incomplete; more data is needed to decide.
    Incomplete,
}

/// Classifies the line break (if any) at the very start of `buf`.
///
/// Recognizes `\r\n` and `\n` as one line break and `\n\n` as a blank line. A leading `\r`
/// that is not followed by `\n` is reported as `Incomplete`.
#[inline]
fn _skip_newline(buf: &[u8]) -> SkipNewLine {
    if buf.starts_with(b"\r\n") {
        return SkipNewLine::One(2);
    }

    if buf.starts_with(b"\n\n") {
        return SkipNewLine::Break(2);
    }

    match buf.first() {
        Some(b'\n') => SkipNewLine::One(1),
        Some(b'\r') => SkipNewLine::Incomplete,
        _ => SkipNewLine::None,
    }
}

/// Consumes a run of line breaks at the start of `buf`.
///
/// Returns `Break(n)` when the run contains two or more line breaks (end of the header
/// section), `One(n)` for exactly one line break, and `None` when `buf` does not start with
/// a line break. Returns `Incomplete` when exactly one line break is followed only by a lone
/// `\r`. That `\r` may be the first half of the `\r\n` that ends the header section, and
/// consuming the first line break now would make the parser later mistake the terminating
/// `\r\n` for a single line break. For example, `\r\n\r` followed by `\n` in a later read
/// would otherwise never complete.
#[inline]
fn _skip_newlines(buf: &[u8]) -> SkipNewLine {
    let mut offset = 0;
    let mut is_break = false;

    loop {
        match _skip_newline(&buf[offset..]) {
            SkipNewLine::Incomplete | SkipNewLine::None => {
                if is_break {
                    return SkipNewLine::Break(offset);
                }

                if offset == 0 {
                    return SkipNewLine::None;
                }

                // One line break seen and only a dangling `\r` remains: wait for more data.
                if &buf[offset..] == b"\r" {
                    return SkipNewLine::Incomplete;
                }

                return SkipNewLine::One(offset);
            }
            SkipNewLine::One(len) => {
                // A second line break right after the first one ends the paragraph.
                if offset > 0 {
                    is_break = true;
                }

                offset += len;
            }
            SkipNewLine::Break(len) => {
                is_break = true;
                offset += len;
            }
        }
    }
}
836
837#[inline]
838fn skip_newlines(read_buf: &mut ReadBuf) -> SkipNewLine {
839    let skip_new_line = _skip_newlines(read_buf.chunk());
840
841    skip_new_line
842}
843
844#[inline]
845fn trim_suffix_spaces(buf: &mut BytesMut) {
846    for (offset, c) in buf.iter().rev().cloned().enumerate() {
847        if c != b' ' && c != b'\t' {
848            if offset > 0 {
849                _ = buf.split_off(buf.len() - offset);
850            }
851
852            break;
853        }
854    }
855}
856
/// Returns the length of the reason phrase at the start of `buf`, i.e. the offset of the
/// first CR or LF, or `None` if the line is not terminated yet.
///
/// (The former signature declared an unused lifetime parameter `'a`; it has been removed.)
#[inline]
fn parse_reason(buf: &[u8]) -> Option<usize> {
    buf.iter().position(|&c| c == b'\r' || c == b'\n')
}
867
868#[inline]
869fn parse_code(buf: &[u8]) -> ParseResult<Option<StatusCode>> {
870    if buf.len() >= 3 {
871        Ok(Some(StatusCode::from_bytes(&buf[..3])?))
872    } else {
873        Ok(None)
874    }
875}
876
877fn parse_header(read_buf: &mut ReadBuf) -> ParseResult<Option<(HeaderName, HeaderValue)>> {
878    let chunk = read_buf.chunk();
879
880    let mut offset = skip_spaces(chunk);
881
882    let name_offset = offset;
883
884    let name_len = match parse_header_name(&chunk[offset..]) {
885        Some(name_len) => name_len,
886        None => return Ok(None),
887    };
888
889    // advance: name + ':'
890    offset += name_len + 1;
891
892    let value_offset = skip_spaces(&chunk[offset..]);
893
894    offset += value_offset;
895
896    let value_len = match parse_header_value(&chunk[offset..]) {
897        Some(value_len) => value_len,
898        None => return Ok(None),
899    };
900
901    read_buf.split_to(name_offset);
902
903    let mut buf = read_buf.split_to(name_len);
904
905    trim_suffix_spaces(&mut buf);
906
907    let header_name = HeaderName::from_bytes(&buf)?;
908
909    read_buf.split_to(value_offset + 1);
910
911    let mut buf = read_buf.split_to(value_len);
912
913    trim_suffix_spaces(&mut buf);
914
915    let header_value = HeaderValue::from_maybe_shared(buf)?;
916
917    Ok(Some((header_name, header_value)))
918}
919
920#[cfg(test)]
921mod tests {
922    use super::*;
923
924    #[test]
925    fn test_parse_state() {
926        let mut state = RequestParseState::Method;
927
928        state.next();
929
930        assert_eq!(state, RequestParseState::Uri);
931
932        state.next();
933
934        assert_eq!(state, RequestParseState::Version);
935
936        state.next();
937
938        assert_eq!(state, RequestParseState::Headers);
939
940        state.next();
941
942        assert_eq!(state, RequestParseState::Finished);
943
944        state.next();
945
946        assert_eq!(state, RequestParseState::Finished);
947    }
948
949    use futures::io::Cursor;
950
951    use http::{Method, Request, Version};
952    use serde::{Deserialize, Serialize};
953
954    async fn parse_request(buf: &[u8]) -> ParseResult<Request<BodyReader<Cursor<Vec<u8>>>>> {
955        Requester::new(Cursor::new(buf.to_vec())).parse().await
956    }
957
958    async fn parse_request_test<F>(buf: &[u8], f: F)
959    where
960        F: FnOnce(Request<BodyReader<Cursor<Vec<u8>>>>),
961    {
962        let request = parse_request(buf).await.expect("parse request failed.");
963
964        f(request)
965    }
966
967    async fn expect_request_partial_parse(buf: &[u8]) {
968        let error = parse_request(buf).await.expect_err("");
969        if let ParseError::Eof = error {
970        } else {
971            panic!("Expect eof, but got {:?}", error);
972        }
973    }
974
975    async fn expect_request_empty_method(buf: &[u8]) {
976        let error = parse_request(buf).await.expect_err("");
977        if let ParseError::InvalidMethod(_) = error {
978        } else {
979            panic!("Expect method error, but got {:?}", error);
980        }
981    }
982
983    async fn expect_request_empty_uri(buf: &[u8]) {
984        let error = parse_request(buf).await.expect_err("");
985        if let ParseError::InvalidUri(_) = error {
986        } else {
987            panic!("Expect uri error, but got {:?}", error);
988        }
989    }
990
991    async fn parse_response(buf: &[u8]) -> ParseResult<Response<BodyReader<Cursor<Vec<u8>>>>> {
992        Responser::new(Cursor::new(buf.to_vec())).parse().await
993    }
994
995    async fn parse_response_test<F>(buf: &[u8], f: F)
996    where
997        F: FnOnce(Response<BodyReader<Cursor<Vec<u8>>>>),
998    {
999        let request = parse_response(buf).await.expect("parse request failed.");
1000
1001        f(request)
1002    }
1003
1004    async fn expect_response_partial_parse(buf: &[u8]) {
1005        let error = parse_response(buf).await.expect_err("");
1006        if let ParseError::Eof = error {
1007        } else {
1008            panic!("Expect eof, but got {:?}", error);
1009        }
1010    }
1011
1012    async fn expect_response_version(buf: &[u8]) {
1013        let error = parse_response(buf).await.expect_err("");
1014        if let ParseError::Version = error {
1015        } else {
1016            panic!("Expect version, but got {:?}", error);
1017        }
1018    }
1019
1020    #[futures_test::test]
1021    async fn response_tests() {
1022        parse_response_test(b"HTTP/1.1 200 OK\r\n\r\n", |resp| {
1023            assert_eq!(resp.version(), Version::HTTP_11);
1024            assert_eq!(resp.status(), StatusCode::OK);
1025        })
1026        .await;
1027
1028        parse_response_test(b"HTTP/1.0 403 Forbidden\nServer: foo.bar\n\n", |resp| {
1029            assert_eq!(resp.version(), Version::HTTP_10);
1030            assert_eq!(resp.status(), StatusCode::FORBIDDEN);
1031        })
1032        .await;
1033
1034        parse_response_test(b"HTTP/1.1 200 \r\n\r\n", |resp| {
1035            assert_eq!(resp.version(), Version::HTTP_11);
1036            assert_eq!(resp.status(), StatusCode::OK);
1037        })
1038        .await;
1039
1040        parse_response_test(b"HTTP/1.1 200\r\n\r\n", |resp| {
1041            assert_eq!(resp.version(), Version::HTTP_11);
1042            assert_eq!(resp.status(), StatusCode::OK);
1043        })
1044        .await;
1045
1046        parse_response_test(b"HTTP/1.1 200\r\nFoo: bar\r\n\r\n", |resp| {
1047            assert_eq!(resp.version(), Version::HTTP_11);
1048            assert_eq!(resp.status(), StatusCode::OK);
1049            assert_eq!(resp.headers().len(), 1);
1050
1051            assert_eq!(resp.headers().get("Foo").unwrap().to_str().unwrap(), "bar");
1052        })
1053        .await;
1054
1055        parse_response_test(b"HTTP/1.1 200 X\xFFZ\r\n\r\n", |resp| {
1056            assert_eq!(resp.version(), Version::HTTP_11);
1057            assert_eq!(resp.status(), StatusCode::OK);
1058        })
1059        .await;
1060
1061        parse_response_test(b"HTTP/1.1 200 \x00\r\n\r\n", |resp| {
1062            assert_eq!(resp.version(), Version::HTTP_11);
1063            assert_eq!(resp.status(), StatusCode::OK);
1064        })
1065        .await;
1066
1067        parse_response_test(b"HTTP/1.0 200\nContent-type: text/html\n\n", |resp| {
1068            assert_eq!(resp.version(), Version::HTTP_10);
1069            assert_eq!(resp.status(), StatusCode::OK);
1070            assert_eq!(resp.headers().len(), 1);
1071            assert_eq!(
1072                resp.headers()
1073                    .get("Content-type")
1074                    .unwrap()
1075                    .to_str()
1076                    .unwrap(),
1077                "text/html"
1078            );
1079        })
1080        .await;
1081
1082        parse_response_test( b"HTTP/1.1 200 OK\r\nAccess-Control-Allow-Credentials : true\r\nBread: baguette\r\n\r\n", |resp| {
1083            assert_eq!(resp.version(), Version::HTTP_11);
1084            assert_eq!(resp.status(), StatusCode::OK);
1085            assert_eq!(resp.headers().len(), 2);
1086            assert_eq!(
1087                resp.headers()
1088                    .get("Access-Control-Allow-Credentials")
1089                    .unwrap()
1090                    .to_str()
1091                    .unwrap(),
1092                "true"
1093            );
1094
1095            assert_eq!(
1096                resp.headers()
1097                    .get("Bread")
1098                    .unwrap()
1099                    .to_str()
1100                    .unwrap(),
1101                "baguette"
1102            );
1103        })
1104        .await;
1105
1106        expect_response_partial_parse(b"HTTP/1.1").await;
1107
1108        expect_response_partial_parse(b"HTTP/1.1 200").await;
1109
1110        expect_response_partial_parse(b"HTTP/1.1 200 OK\r\nServer: yolo\r\n").await;
1111
1112        expect_response_version(b"\n\nHTTP/1.1 200 OK\n\n").await;
1113    }
1114
1115    #[futures_test::test]
1116    async fn request_tests() {
1117        parse_request_test(b"GET / HTTP/1.1\r\n\r\n", |req| {
1118            assert_eq!(req.method(), Method::GET);
1119            assert_eq!(req.uri().to_string(), "/");
1120            assert_eq!(req.version(), Version::HTTP_11);
1121            assert_eq!(req.headers().len(), 0);
1122        })
1123        .await;
1124
1125        parse_request_test(b"GET /thing?data=a HTTP/1.1\r\n\r\n", |req| {
1126            assert_eq!(req.method(), Method::GET);
1127            assert_eq!(req.uri().to_string(), "/thing?data=a");
1128            assert_eq!(req.version(), Version::HTTP_11);
1129            assert_eq!(req.headers().len(), 0);
1130        })
1131        .await;
1132
1133        parse_request_test(b"GET /thing?data=a^ HTTP/1.1\r\n\r\n", |req| {
1134            assert_eq!(req.method(), Method::GET);
1135            assert_eq!(req.uri().to_string(), "/thing?data=a^");
1136            assert_eq!(req.version(), Version::HTTP_11);
1137            assert_eq!(req.headers().len(), 0);
1138        })
1139        .await;
1140
1141        parse_request_test(
1142            b"GET / HTTP/1.1\r\nHost: foo.com\r\nCookie: \r\n\r\n",
1143            |req| {
1144                assert_eq!(req.method(), Method::GET);
1145                assert_eq!(req.uri().to_string(), "/");
1146                assert_eq!(req.version(), Version::HTTP_11);
1147                assert_eq!(req.headers().len(), 2);
1148                assert_eq!(
1149                    req.headers().get("Host").unwrap().to_str().unwrap(),
1150                    "foo.com"
1151                );
1152                assert_eq!(req.headers().get("Cookie").unwrap().to_str().unwrap(), "");
1153            },
1154        )
1155        .await;
1156
1157        parse_request_test(
1158            b"GET / HTTP/1.1\r\nHost: \tfoo.com\t \r\nCookie: \t \r\n\r\n",
1159            |req| {
1160                assert_eq!(req.method(), Method::GET);
1161                assert_eq!(req.uri().to_string(), "/");
1162                assert_eq!(req.version(), Version::HTTP_11);
1163                assert_eq!(req.headers().len(), 2);
1164                assert_eq!(
1165                    req.headers().get("Host").unwrap().to_str().unwrap(),
1166                    "foo.com"
1167                );
1168                assert_eq!(req.headers().get("Cookie").unwrap().to_str().unwrap(), "");
1169            },
1170        )
1171        .await;
1172
1173        parse_request_test(
1174            b"GET / HTTP/1.1\r\nUser-Agent: some\tagent\r\n\r\n",
1175            |req| {
1176                assert_eq!(req.method(), Method::GET);
1177                assert_eq!(req.uri().to_string(), "/");
1178                assert_eq!(req.version(), Version::HTTP_11);
1179                assert_eq!(req.headers().len(), 1);
1180                assert_eq!(
1181                    req.headers().get("User-Agent").unwrap().to_str().unwrap(),
1182                    "some\tagent"
1183                );
1184            },
1185        )
1186        .await;
1187
1188        parse_request_test(
1189            b"GET / HTTP/1.1\r\nUser-Agent: 1234567890some\tagent\r\n\r\n",
1190            |req| {
1191                assert_eq!(req.method(), Method::GET);
1192                assert_eq!(req.uri().to_string(), "/");
1193                assert_eq!(req.version(), Version::HTTP_11);
1194                assert_eq!(req.headers().len(), 1);
1195                assert_eq!(
1196                    req.headers().get("User-Agent").unwrap().to_str().unwrap(),
1197                    "1234567890some\tagent"
1198                );
1199            },
1200        )
1201        .await;
1202
1203        parse_request_test(
1204            b"GET / HTTP/1.1\r\nUser-Agent: 1234567890some\t1234567890agent1234567890\r\n\r\n",
1205            |req| {
1206                assert_eq!(req.method(), Method::GET);
1207                assert_eq!(req.uri().to_string(), "/");
1208                assert_eq!(req.version(), Version::HTTP_11);
1209                assert_eq!(req.headers().len(), 1);
1210                assert_eq!(
1211                    req.headers().get("User-Agent").unwrap().to_str().unwrap(),
1212                    "1234567890some\t1234567890agent1234567890"
1213                );
1214            },
1215        )
1216        .await;
1217
1218        parse_request_test(
1219            b"GET / HTTP/1.1\r\nHost: foo.com\r\nUser-Agent: \xe3\x81\xb2\xe3/1.0\r\n\r\n",
1220            |req| {
1221                assert_eq!(req.method(), Method::GET);
1222                assert_eq!(req.uri().to_string(), "/");
1223                assert_eq!(req.version(), Version::HTTP_11);
1224                assert_eq!(req.headers().len(), 2);
1225                assert_eq!(
1226                    req.headers().get("Host").unwrap().to_str().unwrap(),
1227                    "foo.com"
1228                );
1229                assert_eq!(
1230                    req.headers().get("User-Agent").unwrap().as_bytes(),
1231                    b"\xe3\x81\xb2\xe3/1.0"
1232                );
1233            },
1234        )
1235        .await;
1236
1237        parse_request_test(b"GET /\\?wayne\\=5 HTTP/1.1\r\n\r\n", |req| {
1238            assert_eq!(req.method(), Method::GET);
1239            assert_eq!(req.uri().to_string(), "/\\?wayne\\=5");
1240            assert_eq!(req.version(), Version::HTTP_11);
1241            assert_eq!(req.headers().len(), 0);
1242        })
1243        .await;
1244
1245        expect_request_partial_parse(b"GET / HTTP/1.1\r\n\r").await;
1246
1247        expect_request_partial_parse(b"GET / HTTP/1.1\r\nHost: yolo\r\n").await;
1248
1249        expect_request_empty_uri(b"GET  HTTP/1.1\r\n\r\n").await;
1250
1251        expect_request_empty_method(b"  HTTP/1.1\r\n\r\n").await;
1252
1253        expect_request_empty_method(b" / HTTP/1.1\r\n\r\n").await;
1254    }
1255
1256    #[derive(Serialize, Deserialize, PartialEq, Debug)]
1257    struct Mock {
1258        a: i32,
1259        b: String,
1260    }
1261
1262    #[futures_test::test]
1263    async fn test_from_json() {
1264        let mock = Mock {
1265            a: 10,
1266            b: "hello".to_string(),
1267        };
1268
1269        let mut json_data = Bytes::from(serde_json::to_string_pretty(&mock).unwrap());
1270
1271        let body_reader = BodyReader::new(
1272            json_data.split_to(json_data.len() / 2),
1273            Cursor::new(json_data),
1274        );
1275
1276        let mock2 = body_reader.from_json::<Mock>(None).await.unwrap();
1277
1278        assert_eq!(mock, mock2);
1279    }
1280}