1use std::{io, time::Duration};
2
3use bytes::{Bytes, BytesMut};
4use futures::{AsyncRead, AsyncReadExt};
5use http::{
6 header::{InvalidHeaderName, InvalidHeaderValue},
7 method::InvalidMethod,
8 status::InvalidStatusCode,
9 uri::InvalidUri,
10 HeaderName, HeaderValue, Method, Request, Response, StatusCode, Uri, Version,
11};
12use rasi::{io::Cursor, time::TimeoutExt};
13
14use crate::utils::ReadBuf;
15
16#[derive(Debug, thiserror::Error)]
17pub enum ParseError {
18 #[error(transparent)]
19 HttpError(#[from] http::Error),
20
21 #[error("Http header parse buf overflow, max={0}")]
22 ParseBufOverflow(usize),
23
24 #[error(transparent)]
25 IoError(#[from] io::Error),
26
27 #[error("Unable to complete http parsing, reached the end of the stream.")]
28 Eof,
29
30 #[error("Miss method field.")]
31 Method,
32
33 #[error(transparent)]
34 InvalidMethod(#[from] InvalidMethod),
35
36 #[error("Miss uri field.")]
37 Uri,
38
39 #[error(transparent)]
40 InvalidUri(#[from] InvalidUri),
41
42 #[error("Invalid http version.")]
43 Version,
44
45 #[error(transparent)]
46 InvalidHeaderName(#[from] InvalidHeaderName),
47
48 #[error(transparent)]
49 InvalidHeaderValue(#[from] InvalidHeaderValue),
50
51 #[error(transparent)]
52 InvalidStatusCode(#[from] InvalidStatusCode),
53
54 #[error(transparent)]
55 SerdeJsonError(#[from] serde_json::Error),
56}
57
58impl From<ParseError> for io::Error {
59 fn from(value: ParseError) -> Self {
60 match value {
61 ParseError::IoError(err) => err,
62 _ => io::Error::new(io::ErrorKind::Other, value),
63 }
64 }
65}
66
67pub type ParseResult<T> = Result<T, ParseError>;
69
/// Tunables for the request/response head parsers.
#[derive(Debug)]
pub struct Config {
    /// Capacity, in bytes, of the buffer the parsers read the head into.
    /// Parsing fails with [`ParseError::ParseBufOverflow`] once it is full.
    pub parsing_headers_max_buf: usize,
}
76
77impl Default for Config {
78 fn default() -> Self {
79 Self {
80 parsing_headers_max_buf: 2048,
81 }
82 }
83}
84
85#[derive(Debug)]
87pub struct BodyReader<S> {
88 cached: Bytes,
90
91 stream: S,
93}
94
95impl<S> BodyReader<S> {
96 pub fn new(cached: Bytes, stream: S) -> Self {
98 Self { cached, stream }
99 }
100}
101
102impl<S> BodyReader<S>
103where
104 S: AsyncRead + Unpin,
105{
106 pub fn into_read(self) -> impl AsyncRead {
108 Cursor::new(self.cached).chain(self.stream)
109 }
110
111 pub async fn into_bytes(
115 self,
116 max_body_len: usize,
117 timeout: Option<Duration>,
118 ) -> ParseResult<BytesMut> {
119 let mut stream = self.into_read();
120
121 let mut read_buf = ReadBuf::with_capacity(max_body_len);
122
123 loop {
124 let chunk_mut = read_buf.chunk_mut();
125
126 if chunk_mut.len() == 0 {
128 return Err(ParseError::ParseBufOverflow(max_body_len));
129 }
130
131 let read_size = if let Some(timeout) = timeout {
132 match stream.read(chunk_mut).timeout(timeout).await {
133 Some(r) => r?,
134 None => {
135 return Err(io::Error::new(
136 io::ErrorKind::TimedOut,
137 "read body content timeout",
138 )
139 .into())
140 }
141 }
142 } else {
143 stream.read(chunk_mut).await?
144 };
145
146 if read_size == 0 {
147 break;
148 }
149
150 read_buf.advance_mut(read_size);
151 }
152
153 Ok(read_buf.into_bytes_mut(None))
154 }
155
156 pub async fn from_json<T>(self, timeout: Option<Duration>) -> ParseResult<T>
161 where
162 for<'a> T: serde::de::Deserialize<'a>,
163 {
164 self.from_json_with(4096, timeout).await
165 }
166
167 pub async fn from_json_with<T>(
171 self,
172 max_body_len: usize,
173 timeout: Option<Duration>,
174 ) -> ParseResult<T>
175 where
176 for<'a> T: serde::de::Deserialize<'a>,
177 {
178 let buf = self.into_bytes(max_body_len, timeout).await?;
179
180 Ok(serde_json::from_slice(&buf)?)
181 }
182}
183pub struct Requester<S> {
188 config: Config,
190
191 state: RequestParseState,
193
194 stream: S,
196
197 builder: Option<http::request::Builder>,
199}
200
201impl<S> Requester<S> {
202 pub fn new_with(stream: S, config: Config) -> Self {
204 Self {
205 config,
206 state: RequestParseState::Method,
207 stream,
208 builder: Some(http::request::Builder::new()),
209 }
210 }
211
212 pub fn new(stream: S) -> Self {
214 Self::new_with(stream, Default::default())
215 }
216}
217
218impl<S> Requester<S>
219where
220 S: AsyncRead + Unpin,
221{
222 pub async fn parse(mut self) -> ParseResult<Request<BodyReader<S>>> {
224 let mut read_buf = ReadBuf::with_capacity(self.config.parsing_headers_max_buf);
226
227 'out: while self.state != RequestParseState::Finished {
228 let chunk_mut = read_buf.chunk_mut();
229
230 if chunk_mut.len() == 0 {
232 return Err(ParseError::ParseBufOverflow(
233 self.config.parsing_headers_max_buf,
234 ));
235 }
236
237 let read_size = self.stream.read(chunk_mut).await?;
238
239 if read_size == 0 {
241 return Err(ParseError::Eof);
242 }
243
244 read_buf.advance_mut(read_size);
245
246 'inner: while read_buf.chunk().len() > 0 {
247 match self.state {
248 RequestParseState::Method => {
249 if !self.parse_method(&mut read_buf)? {
250 break 'inner;
251 }
252 }
253 RequestParseState::Uri => {
254 if !self.parse_uri(&mut read_buf)? {
255 break 'inner;
256 }
257 }
258 RequestParseState::Version => {
259 if !self.parse_version(&mut read_buf)? {
260 break 'inner;
261 }
262 }
263 RequestParseState::Headers => {
264 if !self.parse_header(&mut read_buf)? {
265 break 'inner;
266 }
267 }
268 RequestParseState::Finished => break 'out,
269 }
270 }
271
272 if let RequestParseState::Finished = self.state {
273 break;
274 }
275 }
276
277 let cached = read_buf.into_bytes(None);
278
279 Ok(self
281 .builder
282 .unwrap()
283 .body(BodyReader::new(cached, self.stream))?)
284 }
285
286 #[inline]
287 fn skip_spaces(&mut self, read_buf: &mut ReadBuf) {
288 read_buf.split_to(skip_spaces(read_buf.chunk()));
289 }
290
291 #[inline]
292 fn parse_method(&mut self, read_buf: &mut ReadBuf) -> ParseResult<bool> {
293 self.skip_spaces(read_buf);
294
295 if let Some(len) = parse_token(read_buf.chunk()) {
296 if len == 0 {
297 return Err(ParseError::Method);
298 }
299
300 let buf = read_buf.split_to(len);
301
302 self.set_method(Method::from_bytes(&buf)?);
303
304 self.state.next();
305
306 Ok(true)
307 } else {
308 Ok(false)
310 }
311 }
312
313 #[inline]
314 fn parse_uri(&mut self, read_buf: &mut ReadBuf) -> ParseResult<bool> {
315 self.skip_spaces(read_buf);
316
317 if let Some(len) = parse_token(read_buf.chunk()) {
318 if len == 0 {
319 return Err(ParseError::Uri);
320 }
321
322 let buf = read_buf.split_to(len);
323
324 self.set_uri(Uri::from_maybe_shared(buf)?);
325
326 self.state.next();
327
328 Ok(true)
329 } else {
330 Ok(false)
332 }
333 }
334
335 #[inline]
336 fn parse_version(&mut self, read_buf: &mut ReadBuf) -> ParseResult<bool> {
337 self.skip_spaces(read_buf);
338
339 if let Some(version) = parse_version(read_buf.chunk())? {
340 read_buf.split_to(8);
342
343 self.set_version(version);
344
345 self.state.next();
346
347 Ok(true)
348 } else {
349 Ok(false)
350 }
351 }
352
353 #[inline]
354 fn parse_header(&mut self, read_buf: &mut ReadBuf) -> ParseResult<bool> {
355 match skip_newlines(read_buf) {
356 SkipNewLine::Break(len) => {
357 read_buf.split_to(len);
358 self.state.next();
359 return Ok(false);
360 }
361 SkipNewLine::Incomplete => return Ok(false),
362 SkipNewLine::One(len) => {
363 if read_buf.remaining() == len {
364 return Ok(false);
365 }
366
367 read_buf.split_to(len);
368 }
369 SkipNewLine::None => {}
370 }
371
372 match parse_header(read_buf)? {
373 Some((name, value)) => {
374 self.set_header(name, value);
375 Ok(true)
376 }
377 None => Ok(false),
378 }
379 }
380
381 #[inline]
382 fn set_method(&mut self, method: Method) {
383 self.builder = Some(self.builder.take().unwrap().method(method))
384 }
385
386 #[inline]
387 fn set_uri(&mut self, uri: Uri) {
388 self.builder = Some(self.builder.take().unwrap().uri(uri))
389 }
390
391 #[inline]
392 fn set_version(&mut self, version: Version) {
393 self.builder = Some(self.builder.take().unwrap().version(version))
394 }
395
396 #[inline]
397 fn set_header(&mut self, name: HeaderName, value: HeaderValue) {
398 self.builder = Some(self.builder.take().unwrap().header(name, value))
399 }
400}
401
402pub async fn parse_request<S>(stream: S) -> io::Result<Request<BodyReader<S>>>
406where
407 S: AsyncRead + Unpin,
408{
409 Ok(Requester::new(stream).parse().await?)
410}
411
412pub async fn parse_request_with<S>(stream: S, config: Config) -> io::Result<Request<BodyReader<S>>>
416where
417 S: AsyncRead + Unpin,
418{
419 Ok(Requester::new_with(stream, config).parse().await?)
420}
421
422pub struct Responser<S> {
427 config: Config,
429
430 state: ResponseParseState,
432
433 stream: S,
435
436 builder: Option<http::response::Builder>,
438
439 reason: Option<Bytes>,
441}
442
443impl<S> Responser<S> {
444 pub fn new_with(stream: S, config: Config) -> Self {
446 Self {
447 config,
448 state: ResponseParseState::Version,
449 stream,
450 builder: Some(http::response::Builder::new()),
451 reason: Some(Bytes::from_static(b"")),
452 }
453 }
454
455 pub fn new(stream: S) -> Self {
457 Self::new_with(stream, Default::default())
458 }
459}
460
461impl<S> Responser<S>
462where
463 S: AsyncRead + Unpin,
464{
465 pub async fn parse(mut self) -> ParseResult<Response<BodyReader<S>>> {
467 let mut read_buf = ReadBuf::with_capacity(self.config.parsing_headers_max_buf);
469
470 'out: while self.state != ResponseParseState::Finished {
471 let chunk_mut = read_buf.chunk_mut();
472
473 if chunk_mut.len() == 0 {
475 return Err(ParseError::ParseBufOverflow(
476 self.config.parsing_headers_max_buf,
477 ));
478 }
479
480 let read_size = self.stream.read(chunk_mut).await?;
481
482 if read_size == 0 {
484 return Err(ParseError::Eof);
485 }
486
487 read_buf.advance_mut(read_size);
488
489 'inner: while read_buf.chunk().len() > 0 {
490 match self.state {
491 ResponseParseState::Version => {
492 if !self.parse_version(&mut read_buf)? {
493 break 'inner;
494 }
495 }
496 ResponseParseState::StatusCode => {
497 if !self.parse_status_code(&mut read_buf)? {
498 break 'inner;
499 }
500 }
501 ResponseParseState::Reason => {
502 if !self.parse_reason(&mut read_buf)? {
503 break 'inner;
504 }
505 }
506 ResponseParseState::Headers => {
507 if !self.parse_header(&mut read_buf)? {
508 break 'inner;
509 }
510 }
511 ResponseParseState::Finished => break 'out,
512 }
513 }
514
515 if let ResponseParseState::Finished = self.state {
516 break;
517 }
518 }
519
520 let cached = read_buf.into_bytes(None);
521
522 Ok(self
524 .builder
525 .unwrap()
526 .body(BodyReader::new(cached, self.stream))?)
527 }
528
529 #[inline]
530 fn skip_spaces(&mut self, read_buf: &mut ReadBuf) {
531 read_buf.split_to(skip_spaces(read_buf.chunk()));
532 }
533
534 #[inline]
535 fn parse_status_code(&mut self, read_buf: &mut ReadBuf) -> ParseResult<bool> {
536 self.skip_spaces(read_buf);
537
538 match parse_code(read_buf.chunk())? {
539 Some(code) => {
540 self.set_code(code);
541
542 read_buf.split_to(3);
543
544 self.state.next();
545
546 Ok(true)
547 }
548 None => Ok(false),
549 }
550 }
551
552 #[inline]
553 fn parse_reason(&mut self, read_buf: &mut ReadBuf) -> ParseResult<bool> {
554 self.skip_spaces(read_buf);
555
556 match parse_reason(read_buf.chunk()) {
557 Some(len) => {
558 let buf = read_buf.split_to(len);
559
560 self.set_reason(buf.freeze());
561
562 self.state.next();
563
564 Ok(true)
565 }
566 None => Ok(false),
567 }
568 }
569
570 #[inline]
571 fn parse_version(&mut self, read_buf: &mut ReadBuf) -> ParseResult<bool> {
572 self.skip_spaces(read_buf);
573
574 if let Some(version) = parse_version(read_buf.chunk())? {
575 read_buf.split_to(8);
577
578 self.set_version(version);
579
580 self.state.next();
581
582 Ok(true)
583 } else {
584 Ok(false)
585 }
586 }
587
588 #[inline]
589 fn parse_header(&mut self, read_buf: &mut ReadBuf) -> ParseResult<bool> {
590 match skip_newlines(read_buf) {
591 SkipNewLine::Break(len) => {
592 read_buf.split_to(len);
593 self.state.next();
594 return Ok(false);
595 }
596 SkipNewLine::Incomplete => return Ok(false),
597 SkipNewLine::One(len) => {
598 if read_buf.remaining() == len {
599 return Ok(false);
600 }
601
602 read_buf.split_to(len);
603 }
604 SkipNewLine::None => {}
605 }
606
607 match parse_header(read_buf)? {
608 Some((name, value)) => {
609 self.set_header(name, value);
610 Ok(true)
611 }
612 None => Ok(false),
613 }
614 }
615
616 #[inline]
617 fn set_code(&mut self, code: StatusCode) {
618 self.builder = Some(self.builder.take().unwrap().status(code))
619 }
620
621 #[inline]
622 fn set_reason(&mut self, reason: Bytes) {
623 self.reason = Some(reason);
624 }
625
626 #[inline]
627 fn set_version(&mut self, version: Version) {
628 self.builder = Some(self.builder.take().unwrap().version(version))
629 }
630
631 #[inline]
632 fn set_header(&mut self, name: HeaderName, value: HeaderValue) {
633 self.builder = Some(self.builder.take().unwrap().header(name, value))
634 }
635}
636
637pub async fn parse_response<S>(stream: S) -> io::Result<Response<BodyReader<S>>>
641where
642 S: AsyncRead + Unpin,
643{
644 Ok(Responser::new(stream).parse().await?)
645}
646
647pub async fn parse_response_with<S>(
651 stream: S,
652 config: Config,
653) -> io::Result<Response<BodyReader<S>>>
654where
655 S: AsyncRead + Unpin,
656{
657 Ok(Responser::new_with(stream, config).parse().await?)
658}
659
/// Stages of response-head parsing, in the order they are visited.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ResponseParseState {
    /// Expecting the `HTTP/x.y` version tag.
    Version = 1,
    /// Expecting the three-digit status code.
    StatusCode = 2,
    /// Expecting the (possibly empty) reason phrase.
    Reason = 3,
    /// Expecting header lines or the blank line that ends the head.
    Headers = 4,
    /// The head has been fully parsed.
    Finished = 5,
}

impl ResponseParseState {
    /// Advances to the following stage; `Finished` stays `Finished`.
    ///
    /// The transition is spelled out as a `match` so it needs no `unsafe`
    /// and the compiler flags this function if a variant is ever added.
    fn next(&mut self) {
        *self = match *self {
            Self::Version => Self::StatusCode,
            Self::StatusCode => Self::Reason,
            Self::Reason => Self::Headers,
            Self::Headers | Self::Finished => Self::Finished,
        };
    }
}
681
/// Stages of request-head parsing, in the order they are visited.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RequestParseState {
    /// Expecting the method token.
    Method = 1,
    /// Expecting the uri token.
    Uri = 2,
    /// Expecting the `HTTP/x.y` version tag.
    Version = 3,
    /// Expecting header lines or the blank line that ends the head.
    Headers = 4,
    /// The head has been fully parsed.
    Finished = 5,
}

impl RequestParseState {
    /// Advances to the following stage; `Finished` stays `Finished`.
    ///
    /// The transition is spelled out as a `match` so it needs no `unsafe`
    /// and the compiler flags this function if a variant is ever added.
    fn next(&mut self) {
        *self = match *self {
            Self::Method => Self::Uri,
            Self::Uri => Self::Version,
            Self::Version => Self::Headers,
            Self::Headers | Self::Finished => Self::Finished,
        };
    }
}
703
/// Returns the number of leading space/tab bytes in `buf`
/// (`buf.len()` when it holds nothing else).
#[inline]
fn skip_spaces(buf: &[u8]) -> usize {
    buf.iter()
        .position(|&byte| !(byte == b' ' || byte == b'\t'))
        .unwrap_or(buf.len())
}
714
/// Returns the length of the token at the start of `buf`, i.e. the offset of
/// the first space, tab, CR or LF, or `None` when no such delimiter is
/// present yet.
#[inline]
fn parse_token(buf: &[u8]) -> Option<usize> {
    buf.iter()
        .position(|&byte| matches!(byte, b' ' | b'\t' | b'\r' | b'\n'))
}
725
/// Returns the offset of the first `:` in `buf`, which is the length of the
/// header name in front of it, or `None` when no colon is present yet.
#[inline]
fn parse_header_name(buf: &[u8]) -> Option<usize> {
    buf.iter().position(|&byte| byte == b':')
}
736
/// Returns the length of the header value at the start of `buf`, i.e. the
/// offset of the first CR or LF, or `None` when the line end is not present
/// yet.
#[inline]
fn parse_header_value(buf: &[u8]) -> Option<usize> {
    buf.iter()
        .position(|&byte| matches!(byte, b'\r' | b'\n'))
}
747
748#[inline]
749fn parse_version(buf: &[u8]) -> ParseResult<Option<Version>> {
750 if buf.len() >= 8 {
751 return match &buf[0..8] {
752 b"HTTP/0.9" => Ok(Some(Version::HTTP_09)),
753 b"HTTP/1.0" => Ok(Some(Version::HTTP_10)),
754 b"HTTP/1.1" => Ok(Some(Version::HTTP_11)),
755 b"HTTP/2.0" => Ok(Some(Version::HTTP_2)),
756 b"HTTP/3.0" => Ok(Some(Version::HTTP_3)),
757 _ => Err(ParseError::Version),
758 };
759 }
760
761 Ok(None)
762}
763
/// Outcome of scanning for line breaks at the start of a buffer.
enum SkipNewLine {
    /// The buffer does not start with a line break.
    None,
    /// A single line break of the given byte length was found.
    One(usize),
    /// A blank line (end of the header section) was found; the payload is the
    /// number of bytes up to and including it.
    Break(usize),
    /// The buffer ends in a `\r` whose `\n` has not arrived yet.
    Incomplete,
}

/// Classifies the first line break (if any) at the start of `buf`.
///
/// Recognises `\r\n` and `\n` as one line break and `\n\n` as a blank line.
/// A leading `\r` that is not followed by `\n` yields `Incomplete`.
#[inline]
fn _skip_newline(buf: &[u8]) -> SkipNewLine {
    if buf.starts_with(b"\r\n") {
        return SkipNewLine::One(2);
    }

    if buf.starts_with(b"\n\n") {
        return SkipNewLine::Break(2);
    }

    match buf.first() {
        Some(b'\n') => SkipNewLine::One(1),
        Some(b'\r') => SkipNewLine::Incomplete,
        _ => SkipNewLine::None,
    }
}

/// Scans the line break(s) at the start of `buf`.
///
/// * `None` – `buf` does not start with a line break.
/// * `One(n)` – exactly one line break (`n` bytes) followed by other data.
/// * `Break(n)` – two consecutive line breaks, i.e. the blank line that ends
///   the header section; `n` covers exactly those two line breaks.
/// * `Incomplete` – `buf` ends in a lone `\r`, so more input is needed before
///   anything can be consumed.
///
/// Scanning stops at the blank line: bytes after it belong to the message
/// body and are never counted, even when they are line breaks themselves.
#[inline]
fn _skip_newlines(buf: &[u8]) -> SkipNewLine {
    let mut offset = 0;

    loop {
        let rest = &buf[offset..];

        match _skip_newline(rest) {
            SkipNewLine::One(len) => {
                // A second consecutive line break is the blank line.
                if offset > 0 {
                    return SkipNewLine::Break(offset + len);
                }

                offset = len;
            }
            SkipNewLine::Break(len) => {
                // After an earlier line break the first `\n` already completes
                // the blank line; the second `\n` is body data.
                let consumed = if offset > 0 { 1 } else { len };

                return SkipNewLine::Break(offset + consumed);
            }
            // A trailing `\r` may be the first half of a CRLF. Report it so
            // the caller waits instead of consuming the preceding line break
            // and losing track of it.
            SkipNewLine::Incomplete if rest.len() == 1 => return SkipNewLine::Incomplete,
            SkipNewLine::Incomplete | SkipNewLine::None => {
                return if offset > 0 {
                    SkipNewLine::One(offset)
                } else {
                    SkipNewLine::None
                };
            }
        }
    }
}
836
837#[inline]
838fn skip_newlines(read_buf: &mut ReadBuf) -> SkipNewLine {
839 let skip_new_line = _skip_newlines(read_buf.chunk());
840
841 skip_new_line
842}
843
844#[inline]
845fn trim_suffix_spaces(buf: &mut BytesMut) {
846 for (offset, c) in buf.iter().rev().cloned().enumerate() {
847 if c != b' ' && c != b'\t' {
848 if offset > 0 {
849 _ = buf.split_off(buf.len() - offset);
850 }
851
852 break;
853 }
854 }
855}
856
/// Returns the length of the reason phrase at the start of `buf`, i.e. the
/// offset of the first CR or LF, or `None` when the end of the status line is
/// not present yet.
#[inline]
fn parse_reason(buf: &[u8]) -> Option<usize> {
    buf.iter()
        .position(|&byte| byte == b'\r' || byte == b'\n')
}
867
868#[inline]
869fn parse_code(buf: &[u8]) -> ParseResult<Option<StatusCode>> {
870 if buf.len() >= 3 {
871 Ok(Some(StatusCode::from_bytes(&buf[..3])?))
872 } else {
873 Ok(None)
874 }
875}
876
877fn parse_header(read_buf: &mut ReadBuf) -> ParseResult<Option<(HeaderName, HeaderValue)>> {
878 let chunk = read_buf.chunk();
879
880 let mut offset = skip_spaces(chunk);
881
882 let name_offset = offset;
883
884 let name_len = match parse_header_name(&chunk[offset..]) {
885 Some(name_len) => name_len,
886 None => return Ok(None),
887 };
888
889 offset += name_len + 1;
891
892 let value_offset = skip_spaces(&chunk[offset..]);
893
894 offset += value_offset;
895
896 let value_len = match parse_header_value(&chunk[offset..]) {
897 Some(value_len) => value_len,
898 None => return Ok(None),
899 };
900
901 read_buf.split_to(name_offset);
902
903 let mut buf = read_buf.split_to(name_len);
904
905 trim_suffix_spaces(&mut buf);
906
907 let header_name = HeaderName::from_bytes(&buf)?;
908
909 read_buf.split_to(value_offset + 1);
910
911 let mut buf = read_buf.split_to(value_len);
912
913 trim_suffix_spaces(&mut buf);
914
915 let header_value = HeaderValue::from_maybe_shared(buf)?;
916
917 Ok(Some((header_name, header_value)))
918}
919
#[cfg(test)]
mod tests {
    use super::*;

    use futures::io::Cursor;

    use http::{Method, Request, Version};
    use serde::{Deserialize, Serialize};

    /// In-memory stream every parser test reads from.
    type TestStream = Cursor<Vec<u8>>;

    #[test]
    fn test_parse_state() {
        let mut state = RequestParseState::Method;

        // `Finished` is listed twice: advancing from it must stay on it.
        let expected = [
            RequestParseState::Uri,
            RequestParseState::Version,
            RequestParseState::Headers,
            RequestParseState::Finished,
            RequestParseState::Finished,
        ];

        for want in expected.iter() {
            state.next();

            assert_eq!(state, *want);
        }
    }

    /// Runs the request parser over `buf`.
    async fn parse_request(buf: &[u8]) -> ParseResult<Request<BodyReader<TestStream>>> {
        Requester::new(Cursor::new(buf.to_vec())).parse().await
    }

    /// Runs the response parser over `buf`.
    async fn parse_response(buf: &[u8]) -> ParseResult<Response<BodyReader<TestStream>>> {
        Responser::new(Cursor::new(buf.to_vec())).parse().await
    }

    /// Returns the named header as text, panicking when it is missing or not
    /// representable as a string.
    fn header_text<'m>(headers: &'m http::HeaderMap, name: &str) -> &'m str {
        headers.get(name).unwrap().to_str().unwrap()
    }

    /// Asserts that `headers` holds exactly the `wanted` name/value pairs.
    fn expect_text_headers(headers: &http::HeaderMap, wanted: &[(&str, &str)]) {
        assert_eq!(headers.len(), wanted.len());

        for (name, value) in wanted {
            assert_eq!(header_text(headers, name), *value);
        }
    }

    /// Parses `buf` and asserts it is a `GET <uri> HTTP/1.1` request.
    async fn expect_get(buf: &[u8], uri: &str) -> Request<BodyReader<TestStream>> {
        let req = parse_request(buf).await.expect("parse request failed.");

        assert_eq!(req.method(), Method::GET);
        assert_eq!(req.uri().to_string(), uri);
        assert_eq!(req.version(), Version::HTTP_11);

        req
    }

    /// Parses `buf` and asserts the version and status of the response.
    async fn expect_status(
        buf: &[u8],
        version: Version,
        status: StatusCode,
    ) -> Response<BodyReader<TestStream>> {
        let resp = parse_response(buf).await.expect("parse request failed.");

        assert_eq!(resp.version(), version);
        assert_eq!(resp.status(), status);

        resp
    }

    async fn expect_request_partial_parse(buf: &[u8]) {
        let error = parse_request(buf).await.expect_err("");

        assert!(
            matches!(error, ParseError::Eof),
            "Expect eof, but got {:?}",
            error
        );
    }

    async fn expect_request_empty_method(buf: &[u8]) {
        let error = parse_request(buf).await.expect_err("");

        assert!(
            matches!(error, ParseError::InvalidMethod(_)),
            "Expect method error, but got {:?}",
            error
        );
    }

    async fn expect_request_empty_uri(buf: &[u8]) {
        let error = parse_request(buf).await.expect_err("");

        assert!(
            matches!(error, ParseError::InvalidUri(_)),
            "Expect uri error, but got {:?}",
            error
        );
    }

    async fn expect_response_partial_parse(buf: &[u8]) {
        let error = parse_response(buf).await.expect_err("");

        assert!(
            matches!(error, ParseError::Eof),
            "Expect eof, but got {:?}",
            error
        );
    }

    async fn expect_response_version(buf: &[u8]) {
        let error = parse_response(buf).await.expect_err("");

        assert!(
            matches!(error, ParseError::Version),
            "Expect version, but got {:?}",
            error
        );
    }

    #[futures_test::test]
    async fn response_tests() {
        expect_status(b"HTTP/1.1 200 OK\r\n\r\n", Version::HTTP_11, StatusCode::OK).await;

        // LF-only line endings.
        expect_status(
            b"HTTP/1.0 403 Forbidden\nServer: foo.bar\n\n",
            Version::HTTP_10,
            StatusCode::FORBIDDEN,
        )
        .await;

        // Empty reason phrase, with and without the separating space.
        expect_status(b"HTTP/1.1 200 \r\n\r\n", Version::HTTP_11, StatusCode::OK).await;

        expect_status(b"HTTP/1.1 200\r\n\r\n", Version::HTTP_11, StatusCode::OK).await;

        let resp = expect_status(
            b"HTTP/1.1 200\r\nFoo: bar\r\n\r\n",
            Version::HTTP_11,
            StatusCode::OK,
        )
        .await;
        expect_text_headers(resp.headers(), &[("Foo", "bar")]);

        // Non-ASCII and NUL bytes in the reason phrase.
        expect_status(
            b"HTTP/1.1 200 X\xFFZ\r\n\r\n",
            Version::HTTP_11,
            StatusCode::OK,
        )
        .await;

        expect_status(
            b"HTTP/1.1 200 \x00\r\n\r\n",
            Version::HTTP_11,
            StatusCode::OK,
        )
        .await;

        let resp = expect_status(
            b"HTTP/1.0 200\nContent-type: text/html\n\n",
            Version::HTTP_10,
            StatusCode::OK,
        )
        .await;
        expect_text_headers(resp.headers(), &[("Content-type", "text/html")]);

        // Blank between the header name and the colon.
        let resp = expect_status(
            b"HTTP/1.1 200 OK\r\nAccess-Control-Allow-Credentials : true\r\nBread: baguette\r\n\r\n",
            Version::HTTP_11,
            StatusCode::OK,
        )
        .await;
        expect_text_headers(
            resp.headers(),
            &[
                ("Access-Control-Allow-Credentials", "true"),
                ("Bread", "baguette"),
            ],
        );

        expect_response_partial_parse(b"HTTP/1.1").await;

        expect_response_partial_parse(b"HTTP/1.1 200").await;

        expect_response_partial_parse(b"HTTP/1.1 200 OK\r\nServer: yolo\r\n").await;

        expect_response_version(b"\n\nHTTP/1.1 200 OK\n\n").await;
    }

    #[futures_test::test]
    async fn request_tests() {
        let req = expect_get(b"GET / HTTP/1.1\r\n\r\n", "/").await;
        expect_text_headers(req.headers(), &[]);

        let req = expect_get(b"GET /thing?data=a HTTP/1.1\r\n\r\n", "/thing?data=a").await;
        expect_text_headers(req.headers(), &[]);

        let req = expect_get(b"GET /thing?data=a^ HTTP/1.1\r\n\r\n", "/thing?data=a^").await;
        expect_text_headers(req.headers(), &[]);

        // Empty header value.
        let req = expect_get(
            b"GET / HTTP/1.1\r\nHost: foo.com\r\nCookie: \r\n\r\n",
            "/",
        )
        .await;
        expect_text_headers(req.headers(), &[("Host", "foo.com"), ("Cookie", "")]);

        // Blanks around header values.
        let req = expect_get(
            b"GET / HTTP/1.1\r\nHost: \tfoo.com\t \r\nCookie: \t \r\n\r\n",
            "/",
        )
        .await;
        expect_text_headers(req.headers(), &[("Host", "foo.com"), ("Cookie", "")]);

        // Tabs inside header values.
        let req = expect_get(
            b"GET / HTTP/1.1\r\nUser-Agent: some\tagent\r\n\r\n",
            "/",
        )
        .await;
        expect_text_headers(req.headers(), &[("User-Agent", "some\tagent")]);

        let req = expect_get(
            b"GET / HTTP/1.1\r\nUser-Agent: 1234567890some\tagent\r\n\r\n",
            "/",
        )
        .await;
        expect_text_headers(req.headers(), &[("User-Agent", "1234567890some\tagent")]);

        let req = expect_get(
            b"GET / HTTP/1.1\r\nUser-Agent: 1234567890some\t1234567890agent1234567890\r\n\r\n",
            "/",
        )
        .await;
        expect_text_headers(
            req.headers(),
            &[("User-Agent", "1234567890some\t1234567890agent1234567890")],
        );

        // Non-UTF-8 header value: compared as raw bytes.
        let req = expect_get(
            b"GET / HTTP/1.1\r\nHost: foo.com\r\nUser-Agent: \xe3\x81\xb2\xe3/1.0\r\n\r\n",
            "/",
        )
        .await;
        assert_eq!(req.headers().len(), 2);
        assert_eq!(header_text(req.headers(), "Host"), "foo.com");
        assert_eq!(
            req.headers().get("User-Agent").unwrap().as_bytes(),
            b"\xe3\x81\xb2\xe3/1.0"
        );

        let req = expect_get(b"GET /\\?wayne\\=5 HTTP/1.1\r\n\r\n", "/\\?wayne\\=5").await;
        expect_text_headers(req.headers(), &[]);

        expect_request_partial_parse(b"GET / HTTP/1.1\r\n\r").await;

        expect_request_partial_parse(b"GET / HTTP/1.1\r\nHost: yolo\r\n").await;

        expect_request_empty_uri(b"GET HTTP/1.1\r\n\r\n").await;

        expect_request_empty_method(b" HTTP/1.1\r\n\r\n").await;

        expect_request_empty_method(b" / HTTP/1.1\r\n\r\n").await;
    }

    #[derive(Serialize, Deserialize, PartialEq, Debug)]
    struct Mock {
        a: i32,
        b: String,
    }

    #[futures_test::test]
    async fn test_from_json() {
        let mock = Mock {
            a: 10,
            b: "hello".to_string(),
        };

        let mut json_data = Bytes::from(serde_json::to_string_pretty(&mock).unwrap());

        // The first half acts as the cached prefix, the second half as the stream.
        let cached = json_data.split_to(json_data.len() / 2);
        let body_reader = BodyReader::new(cached, Cursor::new(json_data));

        let mock2 = body_reader.from_json::<Mock>(None).await.unwrap();

        assert_eq!(mock, mock2);
    }
}