1use crate::body::Body;
2#[cfg(feature = "cookie")]
3use crate::cookies;
4use crate::errors::{new_io_error, Error, Result};
5use crate::record::{HTTPRecord, RedirectRecord};
6#[cfg(feature = "tls")]
7use crate::tls::PeerCertificate;
8use crate::{Request, COLON_SPACE, CR_LF, SPACE};
9use bytes::Bytes;
10#[cfg(feature = "charset")]
11use encoding_rs::{Encoding, UTF_8};
12#[cfg(feature = "gzip")]
13use flate2::read::MultiGzDecoder;
14use http::{Method, Response as HttpResponse};
15#[cfg(feature = "charset")]
16use mime::Mime;
17#[cfg(feature = "gzip")]
18use std::io::Read;
19use std::pin::Pin;
20use std::task::{Context, Poll};
21use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, BufReader, ReadBuf};
22use tokio::time::{timeout, Duration};
23
/// An HTTP response: protocol version, status code, headers, extensions and an
/// optional body, together with the URI of the request that produced it.
#[derive(Debug, Default, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
pub struct Response {
    /// The protocol version used in the HTTP response.
    #[cfg_attr(feature = "serde", serde(with = "http_serde::version"))]
    #[cfg_attr(
        feature = "schema",
        schemars(
            with = "String",
            title = "HTTP version",
            description = "The protocol version used in the HTTP response",
            example = "HTTP/1.1"
        )
    )]
    pub version: http::Version,
    /// The original request URI that generated this response.
    #[cfg_attr(feature = "serde", serde(with = "http_serde::uri"))]
    #[cfg_attr(
        feature = "schema",
        schemars(
            with = "String",
            title = "request URI",
            description = "The original request URI that generated this response",
            example = "https://example.com/api/v1/resource"
        )
    )]
    pub uri: http::Uri,
    /// The HTTP status code indicating the response status.
    #[cfg_attr(feature = "serde", serde(with = "http_serde::status_code"))]
    #[cfg_attr(
        feature = "schema",
        schemars(
            title = "status code",
            description = "The HTTP status code indicating the response status",
            example = 200,
            schema_with = "crate::serde_schema::status_code_schema"
        )
    )]
    pub status_code: http::StatusCode,
    /// The HTTP headers included in the response.
    #[cfg_attr(feature = "serde", serde(with = "http_serde::header_map"))]
    #[cfg_attr(
        feature = "schema",
        schemars(
            with = "std::collections::HashMap<String,String>",
            title = "response headers",
            description = "Key-value pairs of HTTP headers included in the response",
            example = r#"{"Content-Type": "application/json", "Cache-Control": "max-age=3600"}"#
        )
    )]
    pub headers: http::HeaderMap<http::HeaderValue>,
    /// Typed side-channel data attached to the response; skipped by serde.
    #[cfg_attr(feature = "serde", serde(skip))]
    pub extensions: http::Extensions,
    /// Optional body content received in the HTTP response.
    #[cfg_attr(
        feature = "schema",
        schemars(
            title = "response body",
            description = "Optional body content received in the HTTP response",
            example = r#"{"data": {"id": 123, "name": "example"}}"#,
        )
    )]
    pub body: Option<Body>,
}
91
92impl PartialEq for Response {
93 fn eq(&self, other: &Self) -> bool {
94 self.version == other.version
95 && self.status_code == other.status_code
96 && self.headers == other.headers
97 && self.body.eq(&self.body)
98 }
99}
100
101impl<T> From<HttpResponse<T>> for Response
102where
103 T: Into<Body>,
104{
105 fn from(value: HttpResponse<T>) -> Self {
106 let (parts, body) = value.into_parts();
107 let body = body.into();
108 Self {
109 version: parts.version,
110 uri: Default::default(),
111 status_code: parts.status,
112 headers: parts.headers,
113 extensions: parts.extensions,
114 body: if body.is_empty() { None } else { Some(body) },
115 }
116 }
117}
118
119impl Response {
120 pub(crate) fn to_raw(&self) -> Bytes {
121 let mut http_response = Vec::new();
122 http_response.extend(format!("{:?}", self.version).as_bytes());
123 http_response.extend(SPACE);
124 http_response.extend(format!("{}", self.status_code).as_bytes());
125 http_response.extend(CR_LF);
126 for (k, v) in self.headers.iter() {
127 http_response.extend(k.as_str().as_bytes());
128 http_response.extend(COLON_SPACE);
129 http_response.extend(v.as_bytes());
130 http_response.extend(CR_LF);
131 }
132 http_response.extend(CR_LF);
133 if let Some(b) = self.body() {
135 if !b.is_empty() {
136 http_response.extend(b.as_ref());
137 }
138 }
139 Bytes::from(http_response)
140 }
141 pub fn builder() -> http::response::Builder {
146 http::response::Builder::new()
147 }
148}
149
150impl Response {
151 #[cfg(feature = "cookie")]
159 #[cfg_attr(docsrs, doc(cfg(feature = "cookie")))]
160 pub fn cookies(&self) -> impl Iterator<Item = cookies::Cookie<'_>> {
161 cookies::extract_response_cookies(&self.headers).filter_map(|x| x.ok())
162 }
163
164 #[cfg(feature = "charset")]
166 pub fn text_with_charset(&self, default_encoding: &str) -> Result<String> {
167 let body = if let Some(b) = self.body() {
168 b
169 } else {
170 return Ok(String::new());
171 };
172 let content_type = self
173 .headers
174 .get(http::header::CONTENT_TYPE)
175 .and_then(|value| value.to_str().ok())
176 .and_then(|value| value.parse::<Mime>().ok());
177 let header_encoding = content_type
178 .as_ref()
179 .and_then(|mime| mime.get_param("charset").map(|charset| charset.as_str()))
180 .unwrap_or(default_encoding);
181 let mut decode_text = String::new();
182 for encoding_name in &[header_encoding, default_encoding] {
183 let encoding = Encoding::for_label(encoding_name.as_bytes()).unwrap_or(UTF_8);
184 let (text, _, is_errors) = encoding.decode(body);
185 if !is_errors {
186 decode_text = text.to_string();
187 break;
188 }
189 }
190 Ok(decode_text)
191 }
192 pub fn text(&self) -> Result<String> {
215 #[cfg(feature = "charset")]
216 {
217 let default_encoding = "utf-8";
218 self.text_with_charset(default_encoding)
219 }
220 #[cfg(not(feature = "charset"))]
221 Ok(String::from_utf8_lossy(&self.body().clone().unwrap_or_default()).to_string())
222 }
223 #[inline]
267 pub fn status_code(&self) -> http::StatusCode {
268 self.status_code
269 }
270 #[inline]
272 pub fn version(&self) -> http::Version {
273 self.version
274 }
275
276 #[inline]
299 pub fn headers(&self) -> &http::HeaderMap {
300 &self.headers
301 }
302 #[inline]
304 pub fn headers_mut(&mut self) -> &mut http::HeaderMap {
305 &mut self.headers
306 }
307 pub fn content_length(&self) -> Option<u64> {
315 self
316 .headers
317 .get(http::header::CONTENT_LENGTH)
318 .and_then(|x| x.to_str().ok()?.parse().ok())
319 }
320 #[inline]
332 pub fn uri(&self) -> &http::Uri {
333 &self.uri
334 }
335 #[inline]
336 pub(crate) fn url_mut(&mut self) -> &mut http::Uri {
337 &mut self.uri
338 }
339 pub fn body(&self) -> &Option<Body> {
352 &self.body
353 }
354 pub fn body_mut(&mut self) -> &mut Option<Body> {
356 &mut self.body
357 }
358 pub fn extensions(&self) -> &http::Extensions {
360 &self.extensions
361 }
362 pub fn extensions_mut(&mut self) -> &mut http::Extensions {
364 &mut self.extensions
365 }
366}
367
368impl Response {
370 #[cfg(feature = "tls")]
383 pub fn certificate(&self) -> Option<&Vec<PeerCertificate>> {
384 self.extensions().get::<Vec<PeerCertificate>>()
385 }
386 pub fn http_record(&self) -> Option<&Vec<HTTPRecord>> {
398 self.extensions().get::<Vec<HTTPRecord>>()
399 }
400 pub fn request(&self) -> Option<&Request> {
412 self.extensions().get::<Request>()
413 }
414 pub fn redirect_record(&self) -> Option<&RedirectRecord> {
426 self.extensions().get::<RedirectRecord>()
427 }
428}
429
/// A response whose status line and headers have been parsed while the body is
/// still unread on the underlying reader.
///
/// Built by `ResponseBuilder::build_streaming`; the body can be consumed
/// incrementally through the `read*` helpers or collected with `finish`.
#[derive(Debug)]
pub struct StreamingResponse<T: AsyncRead + AsyncReadExt + Unpin> {
    /// HTTP version parsed from the status line.
    pub version: http::Version,
    /// Status code parsed from the status line.
    pub status_code: http::StatusCode,
    /// Parsed response headers.
    pub headers: http::HeaderMap<http::HeaderValue>,
    /// Typed side-channel data attached to the response.
    pub extensions: http::Extensions,
    /// Buffered reader over the transport, positioned after the header section.
    reader: BufReader<T>,
    /// Read settings (request method, timeout, length handling) for the body.
    config: ResponseConfig,
}
482
483impl<T: AsyncRead + AsyncReadExt + Unpin + Sized> StreamingResponse<T> {
485 #[inline]
487 pub fn status_code(&self) -> http::StatusCode {
488 self.status_code
489 }
490
491 #[inline]
493 pub fn version(&self) -> http::Version {
494 self.version
495 }
496
497 #[inline]
499 pub fn headers(&self) -> &http::HeaderMap {
500 &self.headers
501 }
502
503 #[inline]
505 pub fn headers_mut(&mut self) -> &mut http::HeaderMap {
506 &mut self.headers
507 }
508
509 pub fn content_length(&self) -> Option<u64> {
511 self
512 .headers
513 .get(http::header::CONTENT_LENGTH)
514 .and_then(|x| x.to_str().ok()?.parse().ok())
515 }
516
517 pub fn extensions(&self) -> &http::Extensions {
519 &self.extensions
520 }
521
522 pub fn extensions_mut(&mut self) -> &mut http::Extensions {
524 &mut self.extensions
525 }
526
527 #[inline]
531 pub fn reader(&self) -> &BufReader<T> {
532 &self.reader
533 }
534
535 #[inline]
539 pub fn reader_mut(&mut self) -> &mut BufReader<T> {
540 &mut self.reader
541 }
542}
543
544impl<T: AsyncRead + AsyncReadExt + Unpin + Sized> StreamingResponse<T> {
546 pub async fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
558 if let Some(to) = self.config.timeout {
559 timeout(to, self.reader.read(buf))
560 .await
561 .map_err(|e| Error::IO(std::io::Error::new(std::io::ErrorKind::TimedOut, e)))?
562 .map_err(Error::IO)
563 } else {
564 self.reader.read(buf).await.map_err(Error::IO)
565 }
566 }
567
568 pub async fn read_exact(&mut self, buf: &mut [u8]) -> Result<usize> {
574 if let Some(to) = self.config.timeout {
575 timeout(to, self.reader.read_exact(buf))
576 .await
577 .map_err(|e| Error::IO(std::io::Error::new(std::io::ErrorKind::TimedOut, e)))?
578 .map_err(Error::IO)
579 } else {
580 self.reader.read_exact(buf).await.map_err(Error::IO)
581 }
582 }
583
584 pub async fn read_line(&mut self, buf: &mut String) -> Result<usize> {
588 if let Some(to) = self.config.timeout {
589 timeout(to, self.reader.read_line(buf))
590 .await
591 .map_err(|e| Error::IO(std::io::Error::new(std::io::ErrorKind::TimedOut, e)))?
592 .map_err(Error::IO)
593 } else {
594 self.reader.read_line(buf).await.map_err(Error::IO)
595 }
596 }
597
598 pub async fn read_until(&mut self, delimiter: u8, buf: &mut Vec<u8>) -> Result<usize> {
602 if let Some(to) = self.config.timeout {
603 timeout(to, self.reader.read_until(delimiter, buf))
604 .await
605 .map_err(|e| Error::IO(std::io::Error::new(std::io::ErrorKind::TimedOut, e)))?
606 .map_err(Error::IO)
607 } else {
608 self
609 .reader
610 .read_until(delimiter, buf)
611 .await
612 .map_err(Error::IO)
613 }
614 }
615
616 pub async fn read_to_end(&mut self, buf: &mut Vec<u8>) -> Result<usize> {
620 if let Some(to) = self.config.timeout {
621 timeout(to, self.reader.read_to_end(buf))
622 .await
623 .map_err(|e| Error::IO(std::io::Error::new(std::io::ErrorKind::TimedOut, e)))?
624 .map_err(Error::IO)
625 } else {
626 self.reader.read_to_end(buf).await.map_err(Error::IO)
627 }
628 }
629
630 pub async fn read_to_string(&mut self, buf: &mut String) -> Result<usize> {
634 if let Some(to) = self.config.timeout {
635 timeout(to, self.reader.read_to_string(buf))
636 .await
637 .map_err(|e| Error::IO(std::io::Error::new(std::io::ErrorKind::TimedOut, e)))?
638 .map_err(Error::IO)
639 } else {
640 self.reader.read_to_string(buf).await.map_err(Error::IO)
641 }
642 }
643}
644
645impl<T: AsyncRead + AsyncReadExt + Unpin + Sized> StreamingResponse<T> {
647 pub async fn finish(mut self) -> Result<(Response, T)>
665 where
666 T: Send,
667 {
668 let body = read_body(&mut self.reader, &self.headers, &self.config).await?;
669 let response = Response {
670 version: self.version,
671 uri: http::Uri::default(),
672 status_code: self.status_code,
673 headers: self.headers,
674 extensions: self.extensions,
675 body: if body.is_empty() {
676 None
677 } else {
678 Some(body.into())
679 },
680 };
681 let socket = self.reader.into_inner();
682 Ok((response, socket))
683 }
684}
685
686impl<T: AsyncRead + AsyncReadExt + Unpin + Sized> AsyncRead for StreamingResponse<T> {
688 fn poll_read(
689 mut self: Pin<&mut Self>,
690 cx: &mut Context<'_>,
691 buf: &mut ReadBuf<'_>,
692 ) -> Poll<std::io::Result<()>> {
693 Pin::new(&mut self.reader).poll_read(cx, buf)
694 }
695}
696
697impl<T: AsyncRead + AsyncReadExt + Unpin + Sized> AsyncBufRead for StreamingResponse<T> {
699 fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<&[u8]>> {
700 Pin::new(&mut self.get_mut().reader).poll_fill_buf(cx)
701 }
702
703 fn consume(mut self: Pin<&mut Self>, amt: usize) {
704 Pin::new(&mut self.reader).consume(amt)
705 }
706}
707
708async fn read_body<R: AsyncRead + AsyncBufRead + Unpin>(
715 reader: &mut R,
716 headers: &http::HeaderMap,
717 config: &ResponseConfig,
718) -> Result<Vec<u8>> {
719 let mut body = Vec::new();
720 if matches!(config.method, Method::HEAD) {
721 return Ok(body);
722 }
723 let mut content_length: Option<u64> = headers.get(http::header::CONTENT_LENGTH).and_then(|x| {
724 x.to_str()
725 .ok()?
726 .parse()
727 .ok()
728 .and_then(|l| if l == 0 { None } else { Some(l) })
729 });
730 if config.unsafe_response {
731 content_length = None;
732 }
733 if let Some(te) = headers.get(http::header::TRANSFER_ENCODING) {
734 if te == "chunked" {
735 body = read_chunked_body(reader).await?;
736 }
737 } else {
738 let limits = content_length.map(|x| {
739 if let Some(max) = config.max_read {
740 std::cmp::min(x, max)
741 } else {
742 x
743 }
744 });
745 let mut buffer = vec![0; 12];
746 let mut total_bytes_read = 0;
747 let timeout_duration = config.timeout;
748 loop {
749 let size = if let Some(to) = timeout_duration {
750 match tokio::time::timeout(to, reader.read(&mut buffer)).await {
751 Ok(size) => size,
752 Err(_) => break,
753 }
754 } else {
755 reader.read(&mut buffer).await
756 };
757 match size {
758 Ok(0) => break,
759 Ok(n) => {
760 body.extend_from_slice(&buffer[..n]);
761 total_bytes_read += n;
762 }
763 Err(ref err) if err.kind() == std::io::ErrorKind::WouldBlock => {
764 if total_bytes_read > 0 {
765 break;
766 }
767 }
768 Err(_err) => break,
769 }
770 if let Some(limit) = limits {
771 if total_bytes_read >= limit as usize {
772 break;
773 }
774 }
775 }
776 }
777 #[cfg(feature = "gzip")]
778 if let Some(ce) = headers.get(http::header::CONTENT_ENCODING) {
779 if ce == "gzip" {
780 let mut gzip_body = Vec::new();
781 let mut d = MultiGzDecoder::new(&body[..]);
782 d.read_to_end(&mut gzip_body)?;
783 body = gzip_body;
784 }
785 }
786 Ok(body)
787}
788
789async fn read_chunked_body<R: AsyncRead + Unpin>(reader: &mut R) -> Result<Vec<u8>> {
791 let mut body: Vec<u8> = Vec::new();
792 loop {
793 let mut chunk: String = String::new();
794 loop {
795 let mut one_byte = vec![0; 1];
796 reader.read_exact(&mut one_byte).await?;
797 if one_byte[0] != 10 && one_byte[0] != 13 {
798 chunk.push(one_byte[0] as char);
799 break;
800 }
801 }
802 loop {
803 let mut one_byte = vec![0; 1];
804 reader.read_exact(&mut one_byte).await?;
805 if one_byte[0] == 10 || one_byte[0] == 13 {
806 reader.read_exact(&mut one_byte).await?;
807 break;
808 } else {
809 chunk.push(one_byte[0] as char)
810 }
811 }
812 if chunk == "0" || chunk.is_empty() {
813 break;
814 }
815 let chunk = usize::from_str_radix(&chunk, 16)?;
816 let mut chunk_of_bytes = vec![0; chunk];
817 reader.read_exact(&mut chunk_of_bytes).await?;
818 body.append(&mut chunk_of_bytes);
819 }
820 Ok(body)
821}
822
/// Parses an HTTP response from a buffered reader, either fully (`build`) or
/// up to the end of the header section (`build_streaming`).
#[derive(Debug)]
pub struct ResponseBuilder<T: AsyncRead + AsyncReadExt> {
    /// Accumulates the parsed version, status and headers.
    builder: http::response::Builder,
    /// Buffered reader over the transport the response is read from.
    reader: BufReader<T>,
    /// Read settings applied while parsing.
    config: ResponseConfig,
}
832
/// Settings that control how a response is read from the transport.
#[derive(Debug, Default)]
pub struct ResponseConfig {
    /// Method of the originating request; no body is read for `HEAD`.
    method: Method,
    /// Timeout applied to individual reads; `None` disables it for body reads.
    timeout: Option<Duration>,
    /// When set, `Content-Length` is ignored while reading the body.
    unsafe_response: bool,
    /// Upper bound applied to `Content-Length` when limiting the body read.
    max_read: Option<u64>,
}
841
842impl ResponseConfig {
843 pub fn new(request: &Request, timeout: Option<Duration>) -> Self {
845 let method = request.method().clone();
846 let unsafe_response = request.is_unsafe();
847 ResponseConfig {
848 method,
849 timeout,
850 unsafe_response,
851 max_read: None,
852 }
853 }
854}
855
856impl<T: AsyncRead + Unpin + Sized> ResponseBuilder<T> {
857 pub fn new(reader: BufReader<T>, config: ResponseConfig) -> ResponseBuilder<T> {
859 ResponseBuilder {
860 builder: Default::default(),
861 reader,
862 config,
863 }
864 }
865 async fn parser_version(&mut self) -> Result<(http::Version, http::StatusCode)> {
866 let (mut vf, mut sf) = (false, false);
867 let mut lines = Vec::new();
868 if let Ok(_length) = timeout(
869 self.config.timeout.unwrap_or(Duration::from_secs(30)),
870 self.reader.read_until(b'\n', &mut lines),
871 )
872 .await
873 .map_err(|e| Error::IO(std::io::Error::new(std::io::ErrorKind::TimedOut, e)))?
874 {
875 let mut version = http::Version::default();
876 let mut sc = http::StatusCode::default();
877 for (index, vc) in lines.splitn(3, |b| b == &b' ').enumerate() {
878 if vc.is_empty() {
879 return Err(new_io_error(
880 std::io::ErrorKind::InvalidData,
881 "invalid http version and status_code data",
882 ));
883 }
884 match index {
885 0 => {
886 version = match vc {
887 b"HTTP/0.9" => http::Version::HTTP_09,
888 b"HTTP/1.0" => http::Version::HTTP_10,
889 b"HTTP/1.1" => http::Version::HTTP_11,
890 b"HTTP/2.0" => http::Version::HTTP_2,
891 b"HTTP/3.0" => http::Version::HTTP_3,
892 _ => {
893 return Err(new_io_error(
894 std::io::ErrorKind::InvalidData,
895 "invalid http version",
896 ));
897 }
898 };
899 vf = true;
900 }
901 1 => {
902 sc = http::StatusCode::try_from(vc).map_err(|x| Error::Http(http::Error::from(x)))?;
903 sf = true;
904 }
905 _ => {}
906 }
907 }
908 if !(vf && sf) {
909 return Err(new_io_error(
910 std::io::ErrorKind::InvalidData,
911 "invalid http version and status_code data",
912 ));
913 }
914 Ok((version, sc))
915 } else {
916 Err(new_io_error(
917 std::io::ErrorKind::InvalidData,
918 "invalid http version and status_code data",
919 ))
920 }
921 }
922 async fn read_headers(&mut self) -> Result<http::HeaderMap> {
923 let mut headers = http::HeaderMap::new();
925 let mut header_line = Vec::new();
926 while let Ok(length) = timeout(
927 self.config.timeout.unwrap_or(Duration::from_secs(30)),
928 self.reader.read_until(b'\n', &mut header_line),
929 )
930 .await
931 .map_err(|e| Error::IO(std::io::Error::new(std::io::ErrorKind::TimedOut, e)))?
932 {
933 if length == 0 || header_line == b"\r\n" {
934 break;
935 }
936 if let Ok((Some(k), Some(v))) = parser_headers(&header_line) {
937 if headers.contains_key(&k) {
938 headers.append(k, v);
939 } else {
940 headers.insert(k, v);
941 }
942 };
943 header_line.clear();
944 }
945 Ok(headers)
946 }
947
948 pub async fn build(mut self) -> Result<(Response, T)> {
951 let (v, c) = self.parser_version().await?;
952 self.builder = self.builder.version(v).status(c);
953 let headers = self.read_headers().await?;
954 let body = read_body(&mut self.reader, &headers, &self.config).await?;
956 if let Some(h) = self.builder.headers_mut() {
957 *h = headers;
958 }
959 let resp = self.builder.body(body)?;
960 let response = resp.into();
961 let socket = self.reader.into_inner();
962 Ok((response, socket))
963 }
964
965 pub async fn build_streaming(mut self) -> Result<StreamingResponse<T>> {
1006 let (version, status_code) = self.parser_version().await?;
1007 let headers = self.read_headers().await?;
1008 Ok(StreamingResponse {
1009 version,
1010 status_code,
1011 headers,
1012 extensions: http::Extensions::new(),
1013 reader: self.reader,
1014 config: self.config,
1015 })
1016 }
1017}
1018
1019pub(crate) fn parser_headers(
1020 buffer: &[u8],
1021) -> Result<(Option<http::HeaderName>, Option<http::HeaderValue>)> {
1022 let mut k = None;
1023 let mut v = None;
1024 let buffer = buffer.strip_suffix(CR_LF).unwrap_or(buffer);
1025 for (index, h) in buffer.splitn(2, |s| s == &58).enumerate() {
1026 let h = h.strip_prefix(SPACE).unwrap_or(h);
1027 match index {
1028 0 => match http::HeaderName::from_bytes(h) {
1029 Ok(hk) => k = Some(hk),
1030 Err(err) => {
1031 return Err(Error::Http(http::Error::from(err)));
1032 }
1033 },
1034 1 => match http::HeaderValue::from_bytes(h) {
1035 Ok(hv) => v = Some(hv),
1036 Err(err) => {
1037 return Err(Error::Http(http::Error::from(err)));
1038 }
1039 },
1040 _ => {}
1041 }
1042 }
1043 Ok((k, v))
1044}