slinger/
response.rs

1use crate::body::Body;
2#[cfg(feature = "cookie")]
3use crate::cookies;
4use crate::errors::{new_io_error, Error, Result};
5use crate::record::{HTTPRecord, RedirectRecord};
6#[cfg(feature = "tls")]
7use crate::tls::PeerCertificate;
8use crate::{Request, COLON_SPACE, CR_LF, SPACE};
9use bytes::Bytes;
10#[cfg(feature = "charset")]
11use encoding_rs::{Encoding, UTF_8};
12#[cfg(feature = "gzip")]
13use flate2::read::MultiGzDecoder;
14use http::{Method, Response as HttpResponse};
15#[cfg(feature = "charset")]
16use mime::Mime;
17#[cfg(feature = "gzip")]
18use std::io::Read;
19use std::pin::Pin;
20use std::task::{Context, Poll};
21use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, BufReader, ReadBuf};
22use tokio::time::{timeout, Duration};
23
/// An HTTP response received for a submitted `Request`.
///
/// The struct stores the parsed status line (`version`, `status_code`), the
/// header map, an optional body and an `http::Extensions` bag.
#[derive(Debug, Default, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
pub struct Response {
  #[cfg_attr(feature = "serde", serde(with = "http_serde::version"))]
  #[cfg_attr(
    feature = "schema",
    schemars(
      with = "String",
      title = "HTTP version",
      description = "The protocol version used in the HTTP response",
      example = "HTTP/1.1"
    )
  )]
  /// The HTTP protocol version taken from the response status line.
  pub version: http::Version,
  #[cfg_attr(feature = "serde", serde(with = "http_serde::uri"))]
  #[cfg_attr(
    feature = "schema",
    schemars(
      with = "String",
      title = "request URI",
      description = "The original request URI that generated this response",
      example = "https://example.com/api/v1/resource"
    )
  )]
  /// The URI associated with this response (exposed through `Response::uri`).
  pub uri: http::Uri,
  #[cfg_attr(feature = "serde", serde(with = "http_serde::status_code"))]
  #[cfg_attr(
    feature = "schema",
    schemars(
      title = "status code",
      description = "The HTTP status code indicating the response status",
      example = 200,
      schema_with = "crate::serde_schema::status_code_schema"
    )
  )]
  /// The status code of the response.
  pub status_code: http::StatusCode,
  #[cfg_attr(feature = "serde", serde(with = "http_serde::header_map"))]
  #[cfg_attr(
    feature = "schema",
    schemars(
      with = "std::collections::HashMap<String,String>",
      title = "response headers",
      description = "Key-value pairs of HTTP headers included in the response",
      example = r#"{"Content-Type": "application/json", "Cache-Control": "max-age=3600"}"#
    )
  )]
  /// The headers of the response.
  pub headers: http::HeaderMap<http::HeaderValue>,
  #[cfg_attr(feature = "serde", serde(skip))]
  /// The extensions associated with the response.
  ///
  /// Skipped during (de)serialization when the `serde` feature is enabled.
  pub extensions: http::Extensions,
  #[cfg_attr(
    feature = "schema",
    schemars(
      title = "response body",
      description = "Optional body content received in the HTTP response",
      example = r#"{"data": {"id": 123, "name": "example"}}"#,
    )
  )]
  /// The body of the response, or `None` when no body is stored.
  pub body: Option<Body>,
}
91
92impl PartialEq for Response {
93  fn eq(&self, other: &Self) -> bool {
94    self.version == other.version
95      && self.status_code == other.status_code
96      && self.headers == other.headers
97      && self.body.eq(&self.body)
98  }
99}
100
101impl<T> From<HttpResponse<T>> for Response
102where
103  T: Into<Body>,
104{
105  fn from(value: HttpResponse<T>) -> Self {
106    let (parts, body) = value.into_parts();
107    let body = body.into();
108    Self {
109      version: parts.version,
110      uri: Default::default(),
111      status_code: parts.status,
112      headers: parts.headers,
113      extensions: parts.extensions,
114      body: if body.is_empty() { None } else { Some(body) },
115    }
116  }
117}
118
119impl Response {
120  pub(crate) fn to_raw(&self) -> Bytes {
121    let mut http_response = Vec::new();
122    http_response.extend(format!("{:?}", self.version).as_bytes());
123    http_response.extend(SPACE);
124    http_response.extend(format!("{}", self.status_code).as_bytes());
125    http_response.extend(CR_LF);
126    for (k, v) in self.headers.iter() {
127      http_response.extend(k.as_str().as_bytes());
128      http_response.extend(COLON_SPACE);
129      http_response.extend(v.as_bytes());
130      http_response.extend(CR_LF);
131    }
132    http_response.extend(CR_LF);
133    // 添加body
134    if let Some(b) = self.body() {
135      if !b.is_empty() {
136        http_response.extend(b.as_ref());
137      }
138    }
139    Bytes::from(http_response)
140  }
141  /// An HTTP response builder
142  ///
143  /// This type can be used to construct an instance of `Response` through a
144  /// builder-like pattern.
145  pub fn builder() -> http::response::Builder {
146    http::response::Builder::new()
147  }
148}
149
150impl Response {
151  /// Retrieve the cookies contained in the response.
152  ///
153  /// Note that invalid 'Set-Cookie' headers will be ignored.
154  ///
155  /// # Optional
156  ///
157  /// This requires the optional `cookie` feature to be enabled.
158  #[cfg(feature = "cookie")]
159  #[cfg_attr(docsrs, doc(cfg(feature = "cookie")))]
160  pub fn cookies(&self) -> impl Iterator<Item = cookies::Cookie<'_>> {
161    cookies::extract_response_cookies(&self.headers).filter_map(|x| x.ok())
162  }
163
164  /// 获取编码并且尝试解码
165  #[cfg(feature = "charset")]
166  pub fn text_with_charset(&self, default_encoding: &str) -> Result<String> {
167    let body = if let Some(b) = self.body() {
168      b
169    } else {
170      return Ok(String::new());
171    };
172    let content_type = self
173      .headers
174      .get(http::header::CONTENT_TYPE)
175      .and_then(|value| value.to_str().ok())
176      .and_then(|value| value.parse::<Mime>().ok());
177    let header_encoding = content_type
178      .as_ref()
179      .and_then(|mime| mime.get_param("charset").map(|charset| charset.as_str()))
180      .unwrap_or(default_encoding);
181    let mut decode_text = String::new();
182    for encoding_name in &[header_encoding, default_encoding] {
183      let encoding = Encoding::for_label(encoding_name.as_bytes()).unwrap_or(UTF_8);
184      let (text, _, is_errors) = encoding.decode(body);
185      if !is_errors {
186        decode_text = text.to_string();
187        break;
188      }
189    }
190    Ok(decode_text)
191  }
192  /// Get the response text.
193  ///
194  /// This method decodes the response body with BOM sniffing
195  /// and with malformed sequences replaced with the REPLACEMENT CHARACTER.
196  /// Encoding is determined from the `charset` parameter of `Content-Type` header,
197  /// and defaults to `utf-8` if not presented.
198  ///
199  /// # Note
200  ///
201  /// If the `charset` feature is disabled the method will only attempt to decode the
202  /// response as UTF-8, regardless of the given `Content-Type`
203  ///
204  /// # Example
205  ///
206  /// ```rust
207  /// # extern crate slinger;
208  /// # async fn run() -> Result<(), Box<dyn std::error::Error>> {
209  /// let content = slinger::get("http://httpbin.org/range/26").await?.text()?;
210  /// # Ok(())
211  /// # }
212  /// ```
213  ///
214  pub fn text(&self) -> Result<String> {
215    #[cfg(feature = "charset")]
216    {
217      let default_encoding = "utf-8";
218      self.text_with_charset(default_encoding)
219    }
220    #[cfg(not(feature = "charset"))]
221    Ok(String::from_utf8_lossy(&self.body().clone().unwrap_or_default()).to_string())
222  }
223  /// Get the `StatusCode` of this `Response`.
224  ///
225  /// # Examples
226  ///
227  /// Checking for general status class:
228  ///
229  /// ```rust
230  /// # #[cfg(feature = "json")]
231  /// # fn run() -> Result<(), Box<dyn std::error::Error>> {
232  /// let resp = slinger::get("http://httpbin.org/get")?;
233  /// if resp.status().is_success() {
234  ///     println!("success!");
235  /// } else if resp.status().is_server_error() {
236  ///     println!("server error!");
237  /// } else {
238  ///     println!("Something else happened. Status: {:?}", resp.status());
239  /// }
240  /// # Ok(())
241  /// # }
242  /// ```
243  ///
244  /// Checking for specific status codes:
245  ///
246  /// ```rust
247  /// use slinger::Client;
248  /// use slinger::http::StatusCode;
249  /// # async fn run() -> Result<(), Box<dyn std::error::Error>> {
250  /// let client = Client::new();
251  ///
252  /// let resp = client.post("http://httpbin.org/post")
253  ///     .body("possibly too large")
254  ///     .send().await?;
255  ///
256  /// match resp.status_code() {
257  ///     StatusCode::OK => println!("success!"),
258  ///     StatusCode::PAYLOAD_TOO_LARGE => {
259  ///         println!("Request payload is too large!");
260  ///     }
261  ///     s => println!("Received response status: {s:?}"),
262  /// };
263  /// # Ok(())
264  /// # }
265  /// ```
266  #[inline]
267  pub fn status_code(&self) -> http::StatusCode {
268    self.status_code
269  }
270  /// Get the HTTP `Version` of this `Response`.
271  #[inline]
272  pub fn version(&self) -> http::Version {
273    self.version
274  }
275
276  /// Get the `Headers` of this `Response`.
277  ///
278  /// # Example
279  ///
280  /// Saving an etag when caching a file:
281  ///
282  /// ```
283  /// use slinger::Client;
284  /// use slinger::http::header::ETAG;
285  ///
286  /// # async fn run() -> Result<(), Box<dyn std::error::Error>> {
287  /// let client = Client::new();
288  ///
289  /// let mut resp = client.get("http://httpbin.org/cache").send().await?;
290  /// if resp.status_code().is_success() {
291  ///     if let Some(etag) = resp.headers().get(ETAG) {
292  ///         std::fs::write("etag", etag.as_bytes())?;
293  ///     }
294  /// }
295  /// # Ok(())
296  /// # }
297  /// ```
298  #[inline]
299  pub fn headers(&self) -> &http::HeaderMap {
300    &self.headers
301  }
302  /// Get a mutable reference to the `Headers` of this `Response`.
303  #[inline]
304  pub fn headers_mut(&mut self) -> &mut http::HeaderMap {
305    &mut self.headers
306  }
307  /// Get the content-length of the response, if it is known.
308  ///
309  /// Reasons it may not be known:
310  ///
311  /// - The server didn't send a `content-length` header.
312  /// - The response is gzipped and automatically decoded (thus changing
313  ///   the actual decoded length).
314  pub fn content_length(&self) -> Option<u64> {
315    self
316      .headers
317      .get(http::header::CONTENT_LENGTH)
318      .and_then(|x| x.to_str().ok()?.parse().ok())
319  }
320  /// Get the final `http::Uri` of this `Response`.
321  ///
322  /// # Example
323  ///
324  /// ```rust
325  /// # async fn run() -> Result<(), Box<dyn std::error::Error>> {
326  /// let resp = slinger::get("http://httpbin.org/redirect/1").await?;
327  /// assert_eq!(resp.uri().to_string().as_str(), "http://httpbin.org/get");
328  /// # Ok(())
329  /// # }
330  /// ```
331  #[inline]
332  pub fn uri(&self) -> &http::Uri {
333    &self.uri
334  }
335  #[inline]
336  pub(crate) fn url_mut(&mut self) -> &mut http::Uri {
337    &mut self.uri
338  }
339  /// Get the full response body as `Bytes`.
340  ///
341  /// # Example
342  ///
343  /// ```
344  /// # async fn run() -> Result<(), Box<dyn std::error::Error>> {
345  /// let resp = slinger::get("http://httpbin.org/ip").await?;
346  /// let body = resp.body();
347  /// println!("bytes: {body:?}");
348  /// # Ok(())
349  /// # }
350  /// ```
351  pub fn body(&self) -> &Option<Body> {
352    &self.body
353  }
354  /// private
355  pub fn body_mut(&mut self) -> &mut Option<Body> {
356    &mut self.body
357  }
358  /// Returns a reference to the associated extensions.
359  pub fn extensions(&self) -> &http::Extensions {
360    &self.extensions
361  }
362  /// Returns a mutable reference to the associated extensions.
363  pub fn extensions_mut(&mut self) -> &mut http::Extensions {
364    &mut self.extensions
365  }
366}
367
368/// 放一些响应中间过程记录,存起来方便获取
369impl Response {
370  /// Get the certificate to get this `Response`.
371  ///
372  /// # Example
373  ///
374  /// ```rust
375  /// # async fn run() -> Result<(), Box<dyn std::error::Error>> {
376  /// let resp = slinger::get("https://httpbin.org/").await?;
377  /// println!("httpbin.org certificate: {:?}", resp.certificate());
378  /// # Ok(())
379  /// # }
380  /// ```
381  ///
382  #[cfg(feature = "tls")]
383  pub fn certificate(&self) -> Option<&Vec<PeerCertificate>> {
384    self.extensions().get::<Vec<PeerCertificate>>()
385  }
386  /// Get the http record used to get this `Response`.
387  ///
388  /// # Example
389  ///
390  /// ```rust
391  /// # async fn run() -> Result<(), Box<dyn std::error::Error>> {
392  /// let resp = slinger::get("http://httpbin.org/redirect/1").await?;
393  /// println!("httpbin.org http: {:?}", resp.http_record());
394  /// # Ok(())
395  /// # }
396  /// ```
397  pub fn http_record(&self) -> Option<&Vec<HTTPRecord>> {
398    self.extensions().get::<Vec<HTTPRecord>>()
399  }
400  /// Get the request used to get this `Response`.
401  ///
402  /// # Example
403  ///
404  /// ```rust
405  /// # async fn run() -> Result<(), Box<dyn std::error::Error>> {
406  /// let resp = slinger::get("http://httpbin.org/redirect/1").await?;
407  /// println!("httpbin.org request: {:?}", resp.request());
408  /// # Ok(())
409  /// # }
410  /// ```
411  pub fn request(&self) -> Option<&Request> {
412    self.extensions().get::<Request>()
413  }
414  /// Get the redirect record used to get this `Response`.
415  ///
416  /// # Example
417  ///
418  /// ```rust
419  /// # async fn run() -> Result<(), Box<dyn std::error::Error>> {
420  /// let resp = slinger::get("http://httpbin.org/redirect-to?url=http://www.example.com/").await?;
421  /// println!("httpbin.org redirect: {:?}", resp.redirect_record());
422  /// # Ok(())
423  /// # }
424  /// ```
425  pub fn redirect_record(&self) -> Option<&RedirectRecord> {
426    self.extensions().get::<RedirectRecord>()
427  }
428}
429
/// A streaming HTTP response that allows the body to be read like a BufReader.
///
/// This struct is returned by [`ResponseBuilder::build_streaming()`] and provides
/// access to the HTTP response headers and status code while allowing the body
/// to be read in a streaming fashion. This is useful for:
///
/// - Reading large response bodies without loading them entirely into memory
/// - Processing response data incrementally
/// - Having more control over how and when the body is read
///
/// The status line and headers have already been consumed from the reader
/// when a value of this type is handed out; the body has not been read.
///
/// # Example
///
/// ```rust,ignore
/// use slinger::{Client, ResponseBuilder, ResponseConfig};
/// use tokio::io::AsyncBufReadExt;
///
/// async fn streaming_example() -> Result<(), Box<dyn std::error::Error>> {
///     // After getting a streaming response...
///     let mut streaming = response_builder.build_streaming().await?;
///     
///     // Access headers and status without reading body
///     println!("Status: {}", streaming.status_code());
///     println!("Headers: {:?}", streaming.headers());
///     
///     // Read body line by line
///     let mut line = String::new();
///     while streaming.read_line(&mut line).await? > 0 {
///         println!("Line: {}", line);
///         line.clear();
///     }
///     
///     // Or consume the response and get remaining socket
///     let (response, socket) = streaming.finish().await?;
///     
///     Ok(())
/// }
/// ```
#[derive(Debug)]
pub struct StreamingResponse<T: AsyncRead + AsyncReadExt + Unpin> {
  /// The HTTP version of the response.
  pub version: http::Version,
  /// The status code of the response.
  pub status_code: http::StatusCode,
  /// The headers of the response.
  pub headers: http::HeaderMap<http::HeaderValue>,
  /// The extensions associated with the response.
  pub extensions: http::Extensions,
  /// The buffered reader for streaming body access.
  reader: BufReader<T>,
  /// Configuration for reading the response (request method, read timeout,
  /// and body-length handling flags).
  config: ResponseConfig,
}
482
483/// Accessor methods for StreamingResponse
484impl<T: AsyncRead + AsyncReadExt + Unpin + Sized> StreamingResponse<T> {
485  /// Get the `StatusCode` of this `StreamingResponse`.
486  #[inline]
487  pub fn status_code(&self) -> http::StatusCode {
488    self.status_code
489  }
490
491  /// Get the HTTP `Version` of this `StreamingResponse`.
492  #[inline]
493  pub fn version(&self) -> http::Version {
494    self.version
495  }
496
497  /// Get the `Headers` of this `StreamingResponse`.
498  #[inline]
499  pub fn headers(&self) -> &http::HeaderMap {
500    &self.headers
501  }
502
503  /// Get a mutable reference to the `Headers` of this `StreamingResponse`.
504  #[inline]
505  pub fn headers_mut(&mut self) -> &mut http::HeaderMap {
506    &mut self.headers
507  }
508
509  /// Get the content-length of the response, if it is known.
510  pub fn content_length(&self) -> Option<u64> {
511    self
512      .headers
513      .get(http::header::CONTENT_LENGTH)
514      .and_then(|x| x.to_str().ok()?.parse().ok())
515  }
516
517  /// Returns a reference to the associated extensions.
518  pub fn extensions(&self) -> &http::Extensions {
519    &self.extensions
520  }
521
522  /// Returns a mutable reference to the associated extensions.
523  pub fn extensions_mut(&mut self) -> &mut http::Extensions {
524    &mut self.extensions
525  }
526
527  /// Get a reference to the underlying buffered reader.
528  ///
529  /// This allows direct access to the reader for advanced use cases.
530  #[inline]
531  pub fn reader(&self) -> &BufReader<T> {
532    &self.reader
533  }
534
535  /// Get a mutable reference to the underlying buffered reader.
536  ///
537  /// This allows direct access to the reader for advanced use cases.
538  #[inline]
539  pub fn reader_mut(&mut self) -> &mut BufReader<T> {
540    &mut self.reader
541  }
542}
543
544/// BufReader-like streaming read methods
545impl<T: AsyncRead + AsyncReadExt + Unpin + Sized> StreamingResponse<T> {
546  /// Read data from the response body into the provided buffer.
547  ///
548  /// Returns the number of bytes read, or 0 if EOF has been reached.
549  ///
550  /// # Example
551  ///
552  /// ```rust,ignore
553  /// let mut buf = [0u8; 1024];
554  /// let n = streaming.read(&mut buf).await?;
555  /// println!("Read {} bytes", n);
556  /// ```
557  pub async fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
558    if let Some(to) = self.config.timeout {
559      timeout(to, self.reader.read(buf))
560        .await
561        .map_err(|e| Error::IO(std::io::Error::new(std::io::ErrorKind::TimedOut, e)))?
562        .map_err(Error::IO)
563    } else {
564      self.reader.read(buf).await.map_err(Error::IO)
565    }
566  }
567
568  /// Read the exact number of bytes required to fill the buffer.
569  ///
570  /// # Errors
571  ///
572  /// Returns an error if EOF is reached before the buffer is filled.
573  pub async fn read_exact(&mut self, buf: &mut [u8]) -> Result<usize> {
574    if let Some(to) = self.config.timeout {
575      timeout(to, self.reader.read_exact(buf))
576        .await
577        .map_err(|e| Error::IO(std::io::Error::new(std::io::ErrorKind::TimedOut, e)))?
578        .map_err(Error::IO)
579    } else {
580      self.reader.read_exact(buf).await.map_err(Error::IO)
581    }
582  }
583
584  /// Read all bytes until a newline (0xA) is reached, and append them to the provided buffer.
585  ///
586  /// Returns the number of bytes read, including the newline.
587  pub async fn read_line(&mut self, buf: &mut String) -> Result<usize> {
588    if let Some(to) = self.config.timeout {
589      timeout(to, self.reader.read_line(buf))
590        .await
591        .map_err(|e| Error::IO(std::io::Error::new(std::io::ErrorKind::TimedOut, e)))?
592        .map_err(Error::IO)
593    } else {
594      self.reader.read_line(buf).await.map_err(Error::IO)
595    }
596  }
597
598  /// Read all bytes until the delimiter is reached, and append them to the provided buffer.
599  ///
600  /// Returns the number of bytes read, including the delimiter.
601  pub async fn read_until(&mut self, delimiter: u8, buf: &mut Vec<u8>) -> Result<usize> {
602    if let Some(to) = self.config.timeout {
603      timeout(to, self.reader.read_until(delimiter, buf))
604        .await
605        .map_err(|e| Error::IO(std::io::Error::new(std::io::ErrorKind::TimedOut, e)))?
606        .map_err(Error::IO)
607    } else {
608      self
609        .reader
610        .read_until(delimiter, buf)
611        .await
612        .map_err(Error::IO)
613    }
614  }
615
616  /// Read all bytes until EOF, placing them into the provided buffer.
617  ///
618  /// Returns the number of bytes read.
619  pub async fn read_to_end(&mut self, buf: &mut Vec<u8>) -> Result<usize> {
620    if let Some(to) = self.config.timeout {
621      timeout(to, self.reader.read_to_end(buf))
622        .await
623        .map_err(|e| Error::IO(std::io::Error::new(std::io::ErrorKind::TimedOut, e)))?
624        .map_err(Error::IO)
625    } else {
626      self.reader.read_to_end(buf).await.map_err(Error::IO)
627    }
628  }
629
630  /// Read all bytes until EOF, placing them into the provided string buffer.
631  ///
632  /// Returns the number of bytes read.
633  pub async fn read_to_string(&mut self, buf: &mut String) -> Result<usize> {
634    if let Some(to) = self.config.timeout {
635      timeout(to, self.reader.read_to_string(buf))
636        .await
637        .map_err(|e| Error::IO(std::io::Error::new(std::io::ErrorKind::TimedOut, e)))?
638        .map_err(Error::IO)
639    } else {
640      self.reader.read_to_string(buf).await.map_err(Error::IO)
641    }
642  }
643}
644
645/// Response conversion methods
646impl<T: AsyncRead + AsyncReadExt + Unpin + Sized> StreamingResponse<T> {
647  /// Consume the streaming response and read the remaining body, returning a complete `Response`.
648  ///
649  /// This is useful when you want to convert a streaming response to a regular response
650  /// after inspecting the headers.
651  ///
652  /// # Example
653  ///
654  /// ```rust,ignore
655  /// let mut streaming = response_builder.build_streaming().await?;
656  ///
657  /// // Check status code first
658  /// if streaming.status_code().is_success() {
659  ///     // Only read the body if successful
660  ///     let (response, socket) = streaming.finish().await?;
661  ///     println!("Body: {:?}", response.body());
662  /// }
663  /// ```
664  pub async fn finish(mut self) -> Result<(Response, T)>
665  where
666    T: Send,
667  {
668    let body = read_body(&mut self.reader, &self.headers, &self.config).await?;
669    let response = Response {
670      version: self.version,
671      uri: http::Uri::default(),
672      status_code: self.status_code,
673      headers: self.headers,
674      extensions: self.extensions,
675      body: if body.is_empty() {
676        None
677      } else {
678        Some(body.into())
679      },
680    };
681    let socket = self.reader.into_inner();
682    Ok((response, socket))
683  }
684}
685
686/// Implement `AsyncRead` for `StreamingResponse` to allow using it directly with async readers.
687impl<T: AsyncRead + AsyncReadExt + Unpin + Sized> AsyncRead for StreamingResponse<T> {
688  fn poll_read(
689    mut self: Pin<&mut Self>,
690    cx: &mut Context<'_>,
691    buf: &mut ReadBuf<'_>,
692  ) -> Poll<std::io::Result<()>> {
693    Pin::new(&mut self.reader).poll_read(cx, buf)
694  }
695}
696
697/// Implement `AsyncBufRead` for `StreamingResponse` to allow buffered reading operations.
698impl<T: AsyncRead + AsyncReadExt + Unpin + Sized> AsyncBufRead for StreamingResponse<T> {
699  fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<&[u8]>> {
700    Pin::new(&mut self.get_mut().reader).poll_fill_buf(cx)
701  }
702
703  fn consume(mut self: Pin<&mut Self>, amt: usize) {
704    Pin::new(&mut self.reader).consume(amt)
705  }
706}
707
708// ============================================================================
709// Shared helper functions for body reading
710// ============================================================================
711
712/// Read HTTP body based on headers and configuration.
713/// This is used by both `StreamingResponse::finish()` and `ResponseBuilder::build()`.
714async fn read_body<R: AsyncRead + AsyncBufRead + Unpin>(
715  reader: &mut R,
716  headers: &http::HeaderMap,
717  config: &ResponseConfig,
718) -> Result<Vec<u8>> {
719  let mut body = Vec::new();
720  if matches!(config.method, Method::HEAD) {
721    return Ok(body);
722  }
723  let mut content_length: Option<u64> = headers.get(http::header::CONTENT_LENGTH).and_then(|x| {
724    x.to_str()
725      .ok()?
726      .parse()
727      .ok()
728      .and_then(|l| if l == 0 { None } else { Some(l) })
729  });
730  if config.unsafe_response {
731    content_length = None;
732  }
733  if let Some(te) = headers.get(http::header::TRANSFER_ENCODING) {
734    if te == "chunked" {
735      body = read_chunked_body(reader).await?;
736    }
737  } else {
738    let limits = content_length.map(|x| {
739      if let Some(max) = config.max_read {
740        std::cmp::min(x, max)
741      } else {
742        x
743      }
744    });
745    let mut buffer = vec![0; 12];
746    let mut total_bytes_read = 0;
747    let timeout_duration = config.timeout;
748    loop {
749      let size = if let Some(to) = timeout_duration {
750        match tokio::time::timeout(to, reader.read(&mut buffer)).await {
751          Ok(size) => size,
752          Err(_) => break,
753        }
754      } else {
755        reader.read(&mut buffer).await
756      };
757      match size {
758        Ok(0) => break,
759        Ok(n) => {
760          body.extend_from_slice(&buffer[..n]);
761          total_bytes_read += n;
762        }
763        Err(ref err) if err.kind() == std::io::ErrorKind::WouldBlock => {
764          if total_bytes_read > 0 {
765            break;
766          }
767        }
768        Err(_err) => break,
769      }
770      if let Some(limit) = limits {
771        if total_bytes_read >= limit as usize {
772          break;
773        }
774      }
775    }
776  }
777  #[cfg(feature = "gzip")]
778  if let Some(ce) = headers.get(http::header::CONTENT_ENCODING) {
779    if ce == "gzip" {
780      let mut gzip_body = Vec::new();
781      let mut d = MultiGzDecoder::new(&body[..]);
782      d.read_to_end(&mut gzip_body)?;
783      body = gzip_body;
784    }
785  }
786  Ok(body)
787}
788
789/// Read chunked transfer encoding body.
790async fn read_chunked_body<R: AsyncRead + Unpin>(reader: &mut R) -> Result<Vec<u8>> {
791  let mut body: Vec<u8> = Vec::new();
792  loop {
793    let mut chunk: String = String::new();
794    loop {
795      let mut one_byte = vec![0; 1];
796      reader.read_exact(&mut one_byte).await?;
797      if one_byte[0] != 10 && one_byte[0] != 13 {
798        chunk.push(one_byte[0] as char);
799        break;
800      }
801    }
802    loop {
803      let mut one_byte = vec![0; 1];
804      reader.read_exact(&mut one_byte).await?;
805      if one_byte[0] == 10 || one_byte[0] == 13 {
806        reader.read_exact(&mut one_byte).await?;
807        break;
808      } else {
809        chunk.push(one_byte[0] as char)
810      }
811    }
812    if chunk == "0" || chunk.is_empty() {
813      break;
814    }
815    let chunk = usize::from_str_radix(&chunk, 16)?;
816    let mut chunk_of_bytes = vec![0; chunk];
817    reader.read_exact(&mut chunk_of_bytes).await?;
818    body.append(&mut chunk_of_bytes);
819  }
820  Ok(body)
821}
822
/// A builder to construct the properties of a `Response`.
///
/// The builder parses a raw HTTP response from a buffered reader, using the
/// accompanying `ResponseConfig` to decide how the response is read.
///
/// To construct a `ResponseBuilder`, refer to the `Client` documentation.
#[derive(Debug)]
pub struct ResponseBuilder<T: AsyncRead + AsyncReadExt> {
  // Accumulates version, status and headers before the final `Response` is built.
  builder: http::response::Builder,
  // Buffered source of the raw response bytes.
  reader: BufReader<T>,
  // Read settings (request method, timeout, body-length handling).
  config: ResponseConfig,
}
832
/// Configuration that controls how a response is read from the wire.
#[derive(Debug, Default)]
pub struct ResponseConfig {
  // Method of the originating request; a `HEAD` request skips body reading.
  method: Method,
  // Timeout applied to individual read operations; `None` means no timeout
  // for body reads (status line and header reads fall back to 30 seconds).
  timeout: Option<Duration>,
  // When true, the `Content-Length` header is ignored while reading the body.
  unsafe_response: bool,
  // Optional upper bound applied to the `Content-Length` value when reading the body.
  max_read: Option<u64>,
}
841
842impl ResponseConfig {
843  /// new a response config
844  pub fn new(request: &Request, timeout: Option<Duration>) -> Self {
845    let method = request.method().clone();
846    let unsafe_response = request.is_unsafe();
847    ResponseConfig {
848      method,
849      timeout,
850      unsafe_response,
851      max_read: None,
852    }
853  }
854}
855
856impl<T: AsyncRead + Unpin + Sized> ResponseBuilder<T> {
857  /// Constructs a new response.
858  pub fn new(reader: BufReader<T>, config: ResponseConfig) -> ResponseBuilder<T> {
859    ResponseBuilder {
860      builder: Default::default(),
861      reader,
862      config,
863    }
864  }
865  async fn parser_version(&mut self) -> Result<(http::Version, http::StatusCode)> {
866    let (mut vf, mut sf) = (false, false);
867    let mut lines = Vec::new();
868    if let Ok(_length) = timeout(
869      self.config.timeout.unwrap_or(Duration::from_secs(30)),
870      self.reader.read_until(b'\n', &mut lines),
871    )
872    .await
873    .map_err(|e| Error::IO(std::io::Error::new(std::io::ErrorKind::TimedOut, e)))?
874    {
875      let mut version = http::Version::default();
876      let mut sc = http::StatusCode::default();
877      for (index, vc) in lines.splitn(3, |b| b == &b' ').enumerate() {
878        if vc.is_empty() {
879          return Err(new_io_error(
880            std::io::ErrorKind::InvalidData,
881            "invalid http version and status_code data",
882          ));
883        }
884        match index {
885          0 => {
886            version = match vc {
887              b"HTTP/0.9" => http::Version::HTTP_09,
888              b"HTTP/1.0" => http::Version::HTTP_10,
889              b"HTTP/1.1" => http::Version::HTTP_11,
890              b"HTTP/2.0" => http::Version::HTTP_2,
891              b"HTTP/3.0" => http::Version::HTTP_3,
892              _ => {
893                return Err(new_io_error(
894                  std::io::ErrorKind::InvalidData,
895                  "invalid http version",
896                ));
897              }
898            };
899            vf = true;
900          }
901          1 => {
902            sc = http::StatusCode::try_from(vc).map_err(|x| Error::Http(http::Error::from(x)))?;
903            sf = true;
904          }
905          _ => {}
906        }
907      }
908      if !(vf && sf) {
909        return Err(new_io_error(
910          std::io::ErrorKind::InvalidData,
911          "invalid http version and status_code data",
912        ));
913      }
914      Ok((version, sc))
915    } else {
916      Err(new_io_error(
917        std::io::ErrorKind::InvalidData,
918        "invalid http version and status_code data",
919      ))
920    }
921  }
922  async fn read_headers(&mut self) -> Result<http::HeaderMap> {
923    // 读取请求头
924    let mut headers = http::HeaderMap::new();
925    let mut header_line = Vec::new();
926    while let Ok(length) = timeout(
927      self.config.timeout.unwrap_or(Duration::from_secs(30)),
928      self.reader.read_until(b'\n', &mut header_line),
929    )
930    .await
931    .map_err(|e| Error::IO(std::io::Error::new(std::io::ErrorKind::TimedOut, e)))?
932    {
933      if length == 0 || header_line == b"\r\n" {
934        break;
935      }
936      if let Ok((Some(k), Some(v))) = parser_headers(&header_line) {
937        if headers.contains_key(&k) {
938          headers.append(k, v);
939        } else {
940          headers.insert(k, v);
941        }
942      };
943      header_line.clear();
944    }
945    Ok(headers)
946  }
947
948  /// Build a `Response`, which can be inspected, modified and executed with
949  /// `Client::execute()`.
950  pub async fn build(mut self) -> Result<(Response, T)> {
951    let (v, c) = self.parser_version().await?;
952    self.builder = self.builder.version(v).status(c);
953    let headers = self.read_headers().await?;
954    // 读取body - 使用共享的body读取函数
955    let body = read_body(&mut self.reader, &headers, &self.config).await?;
956    if let Some(h) = self.builder.headers_mut() {
957      *h = headers;
958    }
959    let resp = self.builder.body(body)?;
960    let response = resp.into();
961    let socket = self.reader.into_inner();
962    Ok((response, socket))
963  }
964
965  /// Build a `StreamingResponse` that allows streaming body reads.
966  ///
967  /// Unlike [`build()`](Self::build), this method returns immediately after parsing
968  /// the status line and headers, without reading the body. This gives the caller
969  /// full control over how and when to read the body data.
970  ///
971  /// # Use Cases
972  ///
973  /// - Reading large response bodies without loading them entirely into memory
974  /// - Processing response data incrementally as it arrives
975  /// - Deciding whether to read the body based on status code or headers
976  /// - Implementing custom body handling logic
977  ///
978  /// # Example
979  ///
980  /// ```rust,ignore
981  /// use slinger::{ResponseBuilder, ResponseConfig};
982  /// use tokio::io::AsyncBufReadExt;
983  ///
984  /// async fn stream_body() -> Result<(), Box<dyn std::error::Error>> {
985  ///     // Assuming you have a reader and config...
986  ///     let builder = ResponseBuilder::new(reader, config);
987  ///     
988  ///     // Get streaming response - body not read yet
989  ///     let mut streaming = builder.build_streaming().await?;
990  ///     
991  ///     // Check headers first
992  ///     if streaming.status_code().is_success() {
993  ///         // Read body line by line
994  ///         let mut line = String::new();
995  ///         while streaming.read_line(&mut line).await? > 0 {
996  ///             process_line(&line);
997  ///             line.clear();
998  ///         }
999  ///     }
1000  ///     
1001  ///     Ok(())
1002  /// }
1003  /// # fn process_line(_: &str) {}
1004  /// ```
1005  pub async fn build_streaming(mut self) -> Result<StreamingResponse<T>> {
1006    let (version, status_code) = self.parser_version().await?;
1007    let headers = self.read_headers().await?;
1008    Ok(StreamingResponse {
1009      version,
1010      status_code,
1011      headers,
1012      extensions: http::Extensions::new(),
1013      reader: self.reader,
1014      config: self.config,
1015    })
1016  }
1017}
1018
1019pub(crate) fn parser_headers(
1020  buffer: &[u8],
1021) -> Result<(Option<http::HeaderName>, Option<http::HeaderValue>)> {
1022  let mut k = None;
1023  let mut v = None;
1024  let buffer = buffer.strip_suffix(CR_LF).unwrap_or(buffer);
1025  for (index, h) in buffer.splitn(2, |s| s == &58).enumerate() {
1026    let h = h.strip_prefix(SPACE).unwrap_or(h);
1027    match index {
1028      0 => match http::HeaderName::from_bytes(h) {
1029        Ok(hk) => k = Some(hk),
1030        Err(err) => {
1031          return Err(Error::Http(http::Error::from(err)));
1032        }
1033      },
1034      1 => match http::HeaderValue::from_bytes(h) {
1035        Ok(hv) => v = Some(hv),
1036        Err(err) => {
1037          return Err(Error::Http(http::Error::from(err)));
1038        }
1039      },
1040      _ => {}
1041    }
1042  }
1043  Ok((k, v))
1044}