1use crate::error::{Error, Result};
4use crate::headers::Headers;
5use crate::url::Url;
6use bytes::{Bytes, BytesMut};
7use http::StatusCode;
8use http_body::{Body as HttpBody, Frame, SizeHint};
9use std::fmt;
10use std::future::Future;
11use std::io::Read;
12use std::pin::Pin;
13use std::task::{Context, Poll};
14
/// An HTTP message body.
///
/// The body is either empty, fully buffered in memory, or backed by one of the
/// transport-specific streaming bodies (see the private `BodyInner` enum).
pub struct Body {
    /// Concrete representation; kept private so callers go through the methods on `Body`.
    inner: BodyInner,
}
29
/// Private representation behind [`Body`].
enum BodyInner {
    /// No payload at all.
    Empty,
    /// In-memory payload. The slot becomes `None` once the bytes have been
    /// handed out by `poll_frame` / `poll_chunk`.
    Buffered(Option<Bytes>),
    /// Streaming body backed by the HTTP/1 transport.
    H1(crate::transport::h1::H1Body),
    /// Streaming body backed by the HTTP/2 transport.
    H2(crate::transport::h2::H2Body),
    /// Streaming body backed by the HTTP/2 "direct" transport body.
    /// NOTE(review): boxed presumably to keep this enum small — confirm.
    H2Direct(Box<crate::transport::h2::H2DirectBody>),
    /// Streaming body backed by the HTTP/3 transport.
    H3(crate::transport::h3::H3Body),
}
38
/// Identifies which kind of body produced a [`BodyCapacity`] snapshot.
///
/// There is one variant per internal body representation.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum BodyCapacityProtocol {
    /// The body carries no payload.
    Empty,
    /// The body is fully buffered in memory.
    Buffered,
    /// The body streams from the HTTP/1 transport.
    H1,
    /// The body streams from the HTTP/2 transport.
    H2,
    /// The body streams from the HTTP/2 "direct" transport body.
    H2Direct,
    /// The body streams from the HTTP/3 transport.
    H3,
}
48
/// Snapshot of a body's buffering state, as returned by `Body::capacity`.
///
/// For `H2` and `H3` bodies every field is copied from the transport body's own
/// capacity report. `Empty`, `H1` and `H2Direct` bodies report zeroed counters.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct BodyCapacity {
    /// Which kind of body produced this snapshot.
    pub protocol: BodyCapacityProtocol,
    /// Size of the body's buffer.
    /// NOTE(review): presumably counted in chunks, not bytes — confirm against the transports.
    pub buffer_capacity: usize,
    /// Number of chunks currently buffered (0 or 1 for an in-memory body).
    pub buffered_chunks: usize,
    /// Number of free buffer slots.
    /// NOTE(review): in-memory bodies report 1 here once drained — confirm this is intended.
    pub available_slots: usize,
    /// Number of bytes currently buffered.
    pub buffered_bytes: usize,
    /// Transport-reported "closed" flag; always `false` for non-H2/H3 bodies.
    pub closed: bool,
    /// Transport-reported "ended" flag; `true` for empty and in-memory bodies,
    /// `false` for `H1` and `H2Direct`.
    pub ended: bool,
}
59
60impl Body {
61 pub fn empty() -> Self {
63 Self {
64 inner: BodyInner::Empty,
65 }
66 }
67
68 pub fn from_bytes(bytes: impl Into<Bytes>) -> Self {
71 let bytes = bytes.into();
72 if bytes.is_empty() {
73 Self::empty()
74 } else {
75 Self {
76 inner: BodyInner::Buffered(Some(bytes)),
77 }
78 }
79 }
80
81 pub(crate) fn from_h1(body: crate::transport::h1::H1Body) -> Self {
83 Self {
84 inner: BodyInner::H1(body),
85 }
86 }
87
88 pub(crate) fn from_h2(body: crate::transport::h2::H2Body) -> Self {
90 Self {
91 inner: BodyInner::H2(body),
92 }
93 }
94
95 pub(crate) fn from_h2_direct(body: crate::transport::h2::H2DirectBody) -> Self {
97 Self {
98 inner: BodyInner::H2Direct(Box::new(body)),
99 }
100 }
101
102 pub(crate) fn from_h3(body: crate::transport::h3::H3Body) -> Self {
104 Self {
105 inner: BodyInner::H3(body),
106 }
107 }
108
109 pub fn is_empty(&self) -> bool {
112 match &self.inner {
113 BodyInner::Empty => true,
114 BodyInner::Buffered(Some(b)) => b.is_empty(),
115 BodyInner::Buffered(None) => true,
116 BodyInner::H1(_) | BodyInner::H2(_) | BodyInner::H2Direct(_) | BodyInner::H3(_) => {
117 false
118 }
119 }
120 }
121
122 pub fn is_streaming(&self) -> bool {
124 matches!(
125 self.inner,
126 BodyInner::H1(_) | BodyInner::H2(_) | BodyInner::H2Direct(_) | BodyInner::H3(_)
127 )
128 }
129
130 pub fn as_bytes(&self) -> Option<&Bytes> {
133 match &self.inner {
134 BodyInner::Buffered(Some(b)) => Some(b),
135 _ => None,
136 }
137 }
138
139 pub fn buffered_len(&self) -> Option<usize> {
141 match &self.inner {
142 BodyInner::Empty => Some(0),
143 BodyInner::Buffered(Some(b)) => Some(b.len()),
144 BodyInner::Buffered(None) => Some(0),
145 BodyInner::H1(_) | BodyInner::H2(_) | BodyInner::H2Direct(_) | BodyInner::H3(_) => None,
146 }
147 }
148
149 pub fn h3_capacity(&self) -> Option<crate::transport::h3::H3BodyCapacity> {
152 match &self.inner {
153 BodyInner::H3(body) => Some(body.capacity()),
154 _ => None,
155 }
156 }
157
158 pub fn capacity(&self) -> BodyCapacity {
165 match &self.inner {
166 BodyInner::Empty => BodyCapacity {
167 protocol: BodyCapacityProtocol::Empty,
168 buffer_capacity: 0,
169 buffered_chunks: 0,
170 available_slots: 0,
171 buffered_bytes: 0,
172 closed: false,
173 ended: true,
174 },
175 BodyInner::Buffered(bytes) => {
176 let buffered_bytes = bytes.as_ref().map(Bytes::len).unwrap_or(0);
177 BodyCapacity {
178 protocol: BodyCapacityProtocol::Buffered,
179 buffer_capacity: usize::from(buffered_bytes > 0),
180 buffered_chunks: usize::from(buffered_bytes > 0),
181 available_slots: usize::from(buffered_bytes == 0),
182 buffered_bytes,
183 closed: false,
184 ended: true,
185 }
186 }
187 BodyInner::H1(_) => BodyCapacity {
188 protocol: BodyCapacityProtocol::H1,
189 buffer_capacity: 0,
190 buffered_chunks: 0,
191 available_slots: 0,
192 buffered_bytes: 0,
193 closed: false,
194 ended: false,
195 },
196 BodyInner::H2(body) => {
197 let capacity = body.capacity();
198 BodyCapacity {
199 protocol: BodyCapacityProtocol::H2,
200 buffer_capacity: capacity.buffer_capacity,
201 buffered_chunks: capacity.buffered_chunks,
202 available_slots: capacity.available_slots,
203 buffered_bytes: capacity.buffered_bytes,
204 closed: capacity.closed,
205 ended: capacity.ended,
206 }
207 }
208 BodyInner::H2Direct(_) => BodyCapacity {
209 protocol: BodyCapacityProtocol::H2Direct,
210 buffer_capacity: 0,
211 buffered_chunks: 0,
212 available_slots: 0,
213 buffered_bytes: 0,
214 closed: false,
215 ended: false,
216 },
217 BodyInner::H3(body) => {
218 let capacity = body.capacity();
219 BodyCapacity {
220 protocol: BodyCapacityProtocol::H3,
221 buffer_capacity: capacity.buffer_capacity,
222 buffered_chunks: capacity.buffered_chunks,
223 available_slots: capacity.available_slots,
224 buffered_bytes: capacity.buffered_bytes,
225 closed: capacity.closed,
226 ended: capacity.ended,
227 }
228 }
229 }
230 }
231
232 pub fn len(&self) -> usize {
236 self.buffered_len().unwrap_or(0)
237 }
238
239 pub fn frame(&mut self) -> FrameFuture<'_> {
241 FrameFuture { body: self }
242 }
243
244 #[inline(always)]
246 pub fn chunk(&mut self) -> ChunkFuture<'_> {
247 ChunkFuture { body: self }
248 }
249
250 pub async fn collect_to_bytes(&mut self) -> Result<Bytes> {
256 let mut buf = BytesMut::new();
257 while let Some(frame) = self.frame().await {
258 let frame = frame?;
259 if let Ok(data) = frame.into_data() {
260 buf.extend_from_slice(&data);
261 }
262 }
263 Ok(buf.freeze())
264 }
265}
266
267impl Default for Body {
268 fn default() -> Self {
269 Self::empty()
270 }
271}
272
273impl fmt::Debug for Body {
274 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
275 match &self.inner {
276 BodyInner::Empty => f.debug_struct("Body::Empty").finish(),
277 BodyInner::Buffered(Some(b)) => f
278 .debug_struct("Body::Buffered")
279 .field("len", &b.len())
280 .finish(),
281 BodyInner::Buffered(None) => f.debug_struct("Body::Buffered").field("len", &0).finish(),
282 BodyInner::H1(_) => f.debug_struct("Body::H1Streaming").finish(),
283 BodyInner::H2(_) => f.debug_struct("Body::H2Streaming").finish(),
284 BodyInner::H2Direct(_) => f.debug_struct("Body::H2DirectStreaming").finish(),
285 BodyInner::H3(_) => f.debug_struct("Body::H3Streaming").finish(),
286 }
287 }
288}
289
290impl Clone for Body {
291 fn clone(&self) -> Self {
292 match &self.inner {
293 BodyInner::Empty => Self::empty(),
294 BodyInner::Buffered(Some(b)) => Self {
295 inner: BodyInner::Buffered(Some(b.clone())),
296 },
297 BodyInner::Buffered(None) => Self {
298 inner: BodyInner::Buffered(None),
299 },
300 BodyInner::H1(_) | BodyInner::H2(_) | BodyInner::H2Direct(_) | BodyInner::H3(_) => {
301 panic!("specter::Body::clone is not supported for streaming bodies")
302 }
303 }
304 }
305}
306
307impl From<Bytes> for Body {
308 fn from(value: Bytes) -> Self {
309 Self::from_bytes(value)
310 }
311}
312
313impl HttpBody for Body {
314 type Data = Bytes;
315 type Error = Error;
316
317 fn poll_frame(
318 mut self: Pin<&mut Self>,
319 cx: &mut Context<'_>,
320 ) -> Poll<Option<std::result::Result<Frame<Self::Data>, Self::Error>>> {
321 match &mut self.inner {
322 BodyInner::Empty => Poll::Ready(None),
323 BodyInner::Buffered(slot) => match slot.take() {
324 Some(bytes) if !bytes.is_empty() => Poll::Ready(Some(Ok(Frame::data(bytes)))),
325 _ => Poll::Ready(None),
326 },
327 BodyInner::H1(body) => Pin::new(body).poll_frame(cx),
328 BodyInner::H2(body) => Pin::new(body).poll_frame(cx),
329 BodyInner::H2Direct(body) => Pin::new(body.as_mut()).poll_frame(cx),
330 BodyInner::H3(body) => Pin::new(body).poll_frame(cx),
331 }
332 }
333
334 fn is_end_stream(&self) -> bool {
335 match &self.inner {
336 BodyInner::Empty => true,
337 BodyInner::Buffered(None) => true,
338 BodyInner::Buffered(Some(b)) => b.is_empty(),
339 BodyInner::H1(body) => body.is_terminal(),
340 BodyInner::H2(body) => body.is_terminal(),
341 BodyInner::H2Direct(body) => body.is_terminal(),
342 BodyInner::H3(body) => body.is_terminal(),
343 }
344 }
345
346 fn size_hint(&self) -> SizeHint {
347 match &self.inner {
348 BodyInner::Empty => SizeHint::with_exact(0),
349 BodyInner::Buffered(Some(b)) => SizeHint::with_exact(b.len() as u64),
350 BodyInner::Buffered(None) => SizeHint::with_exact(0),
351 BodyInner::H1(body) => body.size_hint(),
352 BodyInner::H2(body) => body.size_hint(),
353 BodyInner::H2Direct(body) => body.size_hint(),
354 BodyInner::H3(body) => body.size_hint(),
355 }
356 }
357}
358
359impl Body {
360 #[inline(always)]
361 fn poll_chunk(
362 mut self: Pin<&mut Self>,
363 cx: &mut Context<'_>,
364 ) -> Poll<Option<std::result::Result<Bytes, Error>>> {
365 match &mut self.inner {
366 BodyInner::Empty => Poll::Ready(None),
367 BodyInner::Buffered(slot) => match slot.take() {
368 Some(bytes) if !bytes.is_empty() => Poll::Ready(Some(Ok(bytes))),
369 _ => Poll::Ready(None),
370 },
371 BodyInner::H2(body) => Pin::new(body).poll_data_coalesced(cx),
372 BodyInner::H2Direct(body) => Pin::new(body.as_mut()).poll_data(cx),
373 BodyInner::H1(body) => match Pin::new(body).poll_frame(cx) {
374 Poll::Ready(Some(Ok(frame))) => match frame.into_data() {
375 Ok(bytes) => Poll::Ready(Some(Ok(bytes))),
376 Err(_) => Poll::Pending,
377 },
378 Poll::Ready(Some(Err(error))) => Poll::Ready(Some(Err(error))),
379 Poll::Ready(None) => Poll::Ready(None),
380 Poll::Pending => Poll::Pending,
381 },
382 BodyInner::H3(body) => match Pin::new(body).poll_frame(cx) {
383 Poll::Ready(Some(Ok(frame))) => match frame.into_data() {
384 Ok(bytes) => Poll::Ready(Some(Ok(bytes))),
385 Err(_) => Poll::Pending,
386 },
387 Poll::Ready(Some(Err(error))) => Poll::Ready(Some(Err(error))),
388 Poll::Ready(None) => Poll::Ready(None),
389 Poll::Pending => Poll::Pending,
390 },
391 }
392 }
393}
394
/// Future returned by `Body::frame`; resolves to the next frame of the borrowed body.
pub struct FrameFuture<'a> {
    /// The body being read; borrowed mutably for the lifetime of the future.
    body: &'a mut Body,
}
399
400impl<'a> Future for FrameFuture<'a> {
401 type Output = Option<std::result::Result<Frame<Bytes>, Error>>;
402
403 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
404 let body = &mut *self.get_mut().body;
405 match Pin::new(body).poll_frame(cx) {
406 Poll::Pending => Poll::Pending,
407 Poll::Ready(value) => Poll::Ready(value),
408 }
409 }
410}
411
/// Future returned by `Body::chunk`; resolves to the next chunk of payload bytes
/// of the borrowed body.
pub struct ChunkFuture<'a> {
    /// The body being read; borrowed mutably for the lifetime of the future.
    body: &'a mut Body,
}
416
417impl<'a> Future for ChunkFuture<'a> {
418 type Output = Option<std::result::Result<Bytes, Error>>;
419
420 #[inline(always)]
421 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
422 let body = &mut *self.get_mut().body;
423 Pin::new(body).poll_chunk(cx)
424 }
425}
426
/// An HTTP response: status, headers, body, protocol version and, optionally,
/// the URL the response was finally served from.
///
/// The derived `Clone` clones the body, and `Body::clone` panics for streaming
/// bodies, so only clone responses whose body is empty or buffered.
#[derive(Debug, Clone)]
pub struct Response {
    /// Raw numeric status code exactly as supplied by the constructor.
    pub(crate) status: u16,
    /// Response headers.
    headers: Headers,
    /// Response body (empty, buffered or streaming).
    body: Body,
    /// HTTP version string as supplied by the constructor.
    http_version: String,
    /// URL recorded through `with_url`; `None` until then.
    effective_url: Option<Url>,
}
436
437impl Response {
438 pub fn new(status: u16, headers: Headers, body: Bytes, http_version: String) -> Self {
442 Self {
443 status,
444 headers,
445 body: Body::from_bytes(body),
446 http_version,
447 effective_url: None,
448 }
449 }
450
451 pub fn with_body(status: u16, headers: Headers, body: Body, http_version: String) -> Self {
454 Self {
455 status,
456 headers,
457 body,
458 http_version,
459 effective_url: None,
460 }
461 }
462
463 pub(crate) fn into_status_headers_version(self) -> (u16, Headers, String) {
464 (self.status, self.headers, self.http_version)
465 }
466
467 pub fn with_url(mut self, url: Url) -> Self {
469 self.effective_url = Some(url);
470 self
471 }
472
473 pub(crate) async fn into_buffered(mut self) -> Result<Self> {
474 if self.body.is_streaming() {
475 let bytes = self.body.collect_to_bytes().await?;
476 self.body = Body::from_bytes(bytes);
477 }
478 Ok(self)
479 }
480
481 pub fn http_version(&self) -> &str {
482 &self.http_version
483 }
484
485 pub fn status(&self) -> StatusCode {
486 StatusCode::from_u16(self.status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR)
487 }
488
489 pub fn status_code(&self) -> u16 {
490 self.status
491 }
492
493 pub fn headers(&self) -> &Headers {
494 &self.headers
495 }
496
497 pub fn url(&self) -> Option<&Url> {
498 self.effective_url.as_ref()
499 }
500
501 pub fn body(&self) -> &Body {
503 &self.body
504 }
505
506 pub fn body_mut(&mut self) -> &mut Body {
509 &mut self.body
510 }
511
512 pub fn into_body(self) -> Body {
514 self.body
515 }
516
517 pub fn buffered_bytes(&self) -> Option<&Bytes> {
521 self.body.as_bytes()
522 }
523
524 pub fn bytes_raw(&self) -> Result<Bytes> {
525 self.body
526 .as_bytes()
527 .cloned()
528 .ok_or_else(|| Error::HttpProtocol("response body is streaming, not buffered".into()))
529 }
530
531 pub fn bytes(&self) -> Result<Bytes> {
532 self.decoded_body()
533 }
534
535 pub fn is_success(&self) -> bool {
536 (200..300).contains(&self.status)
537 }
538 pub fn is_redirect(&self) -> bool {
539 (300..400).contains(&self.status)
540 }
541 pub fn redirect_url(&self) -> Option<&str> {
542 self.get_header("Location")
543 }
544
545 pub fn get_header(&self, name: &str) -> Option<&str> {
546 self.headers.get(name)
547 }
548
549 pub fn get_headers(&self, name: &str) -> Vec<&str> {
550 self.headers.get_all(name)
551 }
552
553 pub fn content_type(&self) -> Option<&str> {
554 self.get_header("Content-Type")
555 }
556 pub fn content_encoding(&self) -> Option<&str> {
557 self.get_header("Content-Encoding")
558 }
559
560 pub fn decoded_body(&self) -> Result<Bytes> {
565 let body = self.body.as_bytes().ok_or_else(|| {
566 Error::HttpProtocol("response body is streaming, not buffered".into())
567 })?;
568
569 let encodings: Vec<&str> = self
570 .content_encoding()
571 .map(|s| s.split(',').map(str::trim).collect())
572 .unwrap_or_default();
573
574 if !encodings.is_empty() {
575 let mut data = body.clone();
576 for encoding in encodings.iter().rev() {
577 data = match encoding.to_lowercase().as_str() {
578 "gzip" | "x-gzip" => decode_gzip(&data)?,
579 "deflate" => decode_deflate(&data)?,
580 "br" => decode_brotli(&data)?,
581 "zstd" => decode_zstd(&data)?,
582 "identity" => data,
583 _ => data,
584 };
585 }
586 return Ok(data);
587 }
588
589 if body.len() >= 4
590 && body[0] == 0x28
591 && body[1] == 0xB5
592 && body[2] == 0x2F
593 && body[3] == 0xFD
594 {
595 return decode_zstd(body);
596 }
597 if body.len() >= 2 && body[0] == 0x1f && body[1] == 0x8b {
598 return decode_gzip(body);
599 }
600
601 Ok(body.clone())
602 }
603
604 pub fn text(&self) -> Result<String> {
605 let decoded = self.decoded_body()?;
606 String::from_utf8(decoded.to_vec())
607 .map_err(|e| Error::Decompression(format!("UTF-8 decode error: {}", e)))
608 }
609
610 pub fn json<T: serde::de::DeserializeOwned>(&self) -> Result<T> {
611 let text = self.text()?;
612 serde_json::from_str(&text).map_err(Error::from)
613 }
614
615 pub fn error_for_status(self) -> Result<Self> {
616 if self.status().is_client_error() || self.status().is_server_error() {
617 let message = self
618 .status()
619 .canonical_reason()
620 .unwrap_or("HTTP error")
621 .to_string();
622 Err(Error::http_status(self.status, message))
623 } else {
624 Ok(self)
625 }
626 }
627
628 pub fn error_for_status_ref(&self) -> Result<&Self> {
629 if self.status().is_client_error() || self.status().is_server_error() {
630 let message = self
631 .status()
632 .canonical_reason()
633 .unwrap_or("HTTP error")
634 .to_string();
635 Err(Error::http_status(self.status, message))
636 } else {
637 Ok(self)
638 }
639 }
640}
641
642fn decode_gzip(data: &[u8]) -> Result<Bytes> {
643 let mut decoder = flate2::read::GzDecoder::new(data);
644 let mut decoded = Vec::new();
645 decoder
646 .read_to_end(&mut decoded)
647 .map_err(|e| Error::Decompression(format!("gzip: {}", e)))?;
648 Ok(Bytes::from(decoded))
649}
650
651fn decode_deflate(data: &[u8]) -> Result<Bytes> {
652 let mut decoded = Vec::new();
653 if flate2::read::ZlibDecoder::new(data)
654 .read_to_end(&mut decoded)
655 .is_ok()
656 {
657 return Ok(Bytes::from(decoded));
658 }
659 decoded.clear();
660 flate2::read::DeflateDecoder::new(data)
661 .read_to_end(&mut decoded)
662 .map_err(|e| Error::Decompression(format!("deflate: {}", e)))?;
663 Ok(Bytes::from(decoded))
664}
665
666fn decode_brotli(data: &[u8]) -> Result<Bytes> {
667 let mut decoder = brotli::Decompressor::new(data, 4096);
668 let mut decoded = Vec::new();
669 decoder
670 .read_to_end(&mut decoded)
671 .map_err(|e| Error::Decompression(format!("brotli: {}", e)))?;
672 Ok(Bytes::from(decoded))
673}
674
675fn decode_zstd(data: &[u8]) -> Result<Bytes> {
676 zstd::stream::decode_all(data)
677 .map(Bytes::from)
678 .map_err(|e| Error::Decompression(format!("zstd: {}", e)))
679}