1use crate::error::{Error, Result};
4use crate::headers::Headers;
5use crate::url::Url;
6use bytes::{Bytes, BytesMut};
7use http::StatusCode;
8use http_body::{Body as HttpBody, Frame, SizeHint};
9use std::fmt;
10use std::future::Future;
11use std::io::Read;
12use std::pin::Pin;
13use std::task::{Context, Poll};
14
15pub struct Body {
27 inner: BodyInner,
28}
29
30enum BodyInner {
31 Empty,
32 Buffered(Option<Bytes>),
33 H1(crate::transport::h1::H1Body),
34 H2(crate::transport::h2::H2Body),
35 H2Direct(Box<crate::transport::h2::H2DirectBody>),
36 H3(crate::transport::h3::H3Body),
37}
38
39#[derive(Clone, Copy, Debug, PartialEq, Eq)]
40pub enum BodyCapacityProtocol {
41 Empty,
42 Buffered,
43 H1,
44 H2,
45 H2Direct,
46 H3,
47}
48
49#[derive(Clone, Copy, Debug, PartialEq, Eq)]
50pub struct BodyCapacity {
51 pub protocol: BodyCapacityProtocol,
52 pub buffer_capacity: usize,
53 pub buffered_chunks: usize,
54 pub available_slots: usize,
55 pub buffered_bytes: usize,
56 pub closed: bool,
57 pub ended: bool,
58}
59
60impl Body {
61 pub fn empty() -> Self {
63 Self {
64 inner: BodyInner::Empty,
65 }
66 }
67
68 pub fn from_bytes(bytes: impl Into<Bytes>) -> Self {
71 let bytes = bytes.into();
72 if bytes.is_empty() {
73 Self::empty()
74 } else {
75 Self {
76 inner: BodyInner::Buffered(Some(bytes)),
77 }
78 }
79 }
80
81 pub(crate) fn from_h1(body: crate::transport::h1::H1Body) -> Self {
83 Self {
84 inner: BodyInner::H1(body),
85 }
86 }
87
88 pub(crate) fn from_h2(body: crate::transport::h2::H2Body) -> Self {
90 Self {
91 inner: BodyInner::H2(body),
92 }
93 }
94
95 pub(crate) fn from_h2_direct(body: crate::transport::h2::H2DirectBody) -> Self {
97 Self {
98 inner: BodyInner::H2Direct(Box::new(body)),
99 }
100 }
101
102 pub(crate) fn from_h3(body: crate::transport::h3::H3Body) -> Self {
104 Self {
105 inner: BodyInner::H3(body),
106 }
107 }
108
109 pub async fn trailers(&mut self) -> Result<Option<Headers>> {
120 match &mut self.inner {
121 BodyInner::H2(body) => body.trailers().await,
122 BodyInner::Empty
123 | BodyInner::Buffered(_)
124 | BodyInner::H1(_)
125 | BodyInner::H2Direct(_)
126 | BodyInner::H3(_) => Ok(None),
127 }
128 }
129
130 pub fn is_empty(&self) -> bool {
133 match &self.inner {
134 BodyInner::Empty => true,
135 BodyInner::Buffered(Some(b)) => b.is_empty(),
136 BodyInner::Buffered(None) => true,
137 BodyInner::H1(_) | BodyInner::H2(_) | BodyInner::H2Direct(_) | BodyInner::H3(_) => {
138 false
139 }
140 }
141 }
142
143 pub fn is_streaming(&self) -> bool {
145 matches!(
146 self.inner,
147 BodyInner::H1(_) | BodyInner::H2(_) | BodyInner::H2Direct(_) | BodyInner::H3(_)
148 )
149 }
150
151 pub fn as_bytes(&self) -> Option<&Bytes> {
154 match &self.inner {
155 BodyInner::Buffered(Some(b)) => Some(b),
156 _ => None,
157 }
158 }
159
160 pub fn buffered_len(&self) -> Option<usize> {
162 match &self.inner {
163 BodyInner::Empty => Some(0),
164 BodyInner::Buffered(Some(b)) => Some(b.len()),
165 BodyInner::Buffered(None) => Some(0),
166 BodyInner::H1(_) | BodyInner::H2(_) | BodyInner::H2Direct(_) | BodyInner::H3(_) => None,
167 }
168 }
169
170 pub fn h3_capacity(&self) -> Option<crate::transport::h3::H3BodyCapacity> {
173 match &self.inner {
174 BodyInner::H3(body) => Some(body.capacity()),
175 _ => None,
176 }
177 }
178
179 pub fn capacity(&self) -> BodyCapacity {
186 match &self.inner {
187 BodyInner::Empty => BodyCapacity {
188 protocol: BodyCapacityProtocol::Empty,
189 buffer_capacity: 0,
190 buffered_chunks: 0,
191 available_slots: 0,
192 buffered_bytes: 0,
193 closed: false,
194 ended: true,
195 },
196 BodyInner::Buffered(bytes) => {
197 let buffered_bytes = bytes.as_ref().map(Bytes::len).unwrap_or(0);
198 BodyCapacity {
199 protocol: BodyCapacityProtocol::Buffered,
200 buffer_capacity: usize::from(buffered_bytes > 0),
201 buffered_chunks: usize::from(buffered_bytes > 0),
202 available_slots: usize::from(buffered_bytes == 0),
203 buffered_bytes,
204 closed: false,
205 ended: true,
206 }
207 }
208 BodyInner::H1(_) => BodyCapacity {
209 protocol: BodyCapacityProtocol::H1,
210 buffer_capacity: 0,
211 buffered_chunks: 0,
212 available_slots: 0,
213 buffered_bytes: 0,
214 closed: false,
215 ended: false,
216 },
217 BodyInner::H2(body) => {
218 let capacity = body.capacity();
219 BodyCapacity {
220 protocol: BodyCapacityProtocol::H2,
221 buffer_capacity: capacity.buffer_capacity,
222 buffered_chunks: capacity.buffered_chunks,
223 available_slots: capacity.available_slots,
224 buffered_bytes: capacity.buffered_bytes,
225 closed: capacity.closed,
226 ended: capacity.ended,
227 }
228 }
229 BodyInner::H2Direct(_) => BodyCapacity {
230 protocol: BodyCapacityProtocol::H2Direct,
231 buffer_capacity: 0,
232 buffered_chunks: 0,
233 available_slots: 0,
234 buffered_bytes: 0,
235 closed: false,
236 ended: false,
237 },
238 BodyInner::H3(body) => {
239 let capacity = body.capacity();
240 BodyCapacity {
241 protocol: BodyCapacityProtocol::H3,
242 buffer_capacity: capacity.buffer_capacity,
243 buffered_chunks: capacity.buffered_chunks,
244 available_slots: capacity.available_slots,
245 buffered_bytes: capacity.buffered_bytes,
246 closed: capacity.closed,
247 ended: capacity.ended,
248 }
249 }
250 }
251 }
252
253 pub fn len(&self) -> usize {
257 self.buffered_len().unwrap_or(0)
258 }
259
260 pub fn frame(&mut self) -> FrameFuture<'_> {
262 FrameFuture { body: self }
263 }
264
265 #[inline(always)]
267 pub fn chunk(&mut self) -> ChunkFuture<'_> {
268 ChunkFuture { body: self }
269 }
270
271 pub async fn collect_to_bytes(&mut self) -> Result<Bytes> {
277 let mut buf = BytesMut::new();
278 while let Some(frame) = self.frame().await {
279 let frame = frame?;
280 if let Ok(data) = frame.into_data() {
281 buf.extend_from_slice(&data);
282 }
283 }
284 Ok(buf.freeze())
285 }
286}
287
288impl Default for Body {
289 fn default() -> Self {
290 Self::empty()
291 }
292}
293
294impl fmt::Debug for Body {
295 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
296 match &self.inner {
297 BodyInner::Empty => f.debug_struct("Body::Empty").finish(),
298 BodyInner::Buffered(Some(b)) => f
299 .debug_struct("Body::Buffered")
300 .field("len", &b.len())
301 .finish(),
302 BodyInner::Buffered(None) => f.debug_struct("Body::Buffered").field("len", &0).finish(),
303 BodyInner::H1(_) => f.debug_struct("Body::H1Streaming").finish(),
304 BodyInner::H2(_) => f.debug_struct("Body::H2Streaming").finish(),
305 BodyInner::H2Direct(_) => f.debug_struct("Body::H2DirectStreaming").finish(),
306 BodyInner::H3(_) => f.debug_struct("Body::H3Streaming").finish(),
307 }
308 }
309}
310
311impl Clone for Body {
312 fn clone(&self) -> Self {
313 match &self.inner {
314 BodyInner::Empty => Self::empty(),
315 BodyInner::Buffered(Some(b)) => Self {
316 inner: BodyInner::Buffered(Some(b.clone())),
317 },
318 BodyInner::Buffered(None) => Self {
319 inner: BodyInner::Buffered(None),
320 },
321 BodyInner::H1(_) | BodyInner::H2(_) | BodyInner::H2Direct(_) | BodyInner::H3(_) => {
322 panic!("specter::Body::clone is not supported for streaming bodies")
323 }
324 }
325 }
326}
327
328impl From<Bytes> for Body {
329 fn from(value: Bytes) -> Self {
330 Self::from_bytes(value)
331 }
332}
333
334impl HttpBody for Body {
335 type Data = Bytes;
336 type Error = Error;
337
338 fn poll_frame(
339 mut self: Pin<&mut Self>,
340 cx: &mut Context<'_>,
341 ) -> Poll<Option<std::result::Result<Frame<Self::Data>, Self::Error>>> {
342 match &mut self.inner {
343 BodyInner::Empty => Poll::Ready(None),
344 BodyInner::Buffered(slot) => match slot.take() {
345 Some(bytes) if !bytes.is_empty() => Poll::Ready(Some(Ok(Frame::data(bytes)))),
346 _ => Poll::Ready(None),
347 },
348 BodyInner::H1(body) => Pin::new(body).poll_frame(cx),
349 BodyInner::H2(body) => Pin::new(body).poll_frame(cx),
350 BodyInner::H2Direct(body) => Pin::new(body.as_mut()).poll_frame(cx),
351 BodyInner::H3(body) => Pin::new(body).poll_frame(cx),
352 }
353 }
354
355 fn is_end_stream(&self) -> bool {
356 match &self.inner {
357 BodyInner::Empty => true,
358 BodyInner::Buffered(None) => true,
359 BodyInner::Buffered(Some(b)) => b.is_empty(),
360 BodyInner::H1(body) => body.is_terminal(),
361 BodyInner::H2(body) => body.is_terminal(),
362 BodyInner::H2Direct(body) => body.is_terminal(),
363 BodyInner::H3(body) => body.is_terminal(),
364 }
365 }
366
367 fn size_hint(&self) -> SizeHint {
368 match &self.inner {
369 BodyInner::Empty => SizeHint::with_exact(0),
370 BodyInner::Buffered(Some(b)) => SizeHint::with_exact(b.len() as u64),
371 BodyInner::Buffered(None) => SizeHint::with_exact(0),
372 BodyInner::H1(body) => body.size_hint(),
373 BodyInner::H2(body) => body.size_hint(),
374 BodyInner::H2Direct(body) => body.size_hint(),
375 BodyInner::H3(body) => body.size_hint(),
376 }
377 }
378}
379
380impl Body {
381 #[inline(always)]
382 fn poll_chunk(
383 mut self: Pin<&mut Self>,
384 cx: &mut Context<'_>,
385 ) -> Poll<Option<std::result::Result<Bytes, Error>>> {
386 match &mut self.inner {
387 BodyInner::Empty => Poll::Ready(None),
388 BodyInner::Buffered(slot) => match slot.take() {
389 Some(bytes) if !bytes.is_empty() => Poll::Ready(Some(Ok(bytes))),
390 _ => Poll::Ready(None),
391 },
392 BodyInner::H2(body) => Pin::new(body).poll_data_coalesced(cx),
393 BodyInner::H2Direct(body) => Pin::new(body.as_mut()).poll_data(cx),
394 BodyInner::H1(body) => match Pin::new(body).poll_frame(cx) {
395 Poll::Ready(Some(Ok(frame))) => match frame.into_data() {
396 Ok(bytes) => Poll::Ready(Some(Ok(bytes))),
397 Err(_) => Poll::Pending,
398 },
399 Poll::Ready(Some(Err(error))) => Poll::Ready(Some(Err(error))),
400 Poll::Ready(None) => Poll::Ready(None),
401 Poll::Pending => Poll::Pending,
402 },
403 BodyInner::H3(body) => match Pin::new(body).poll_frame(cx) {
404 Poll::Ready(Some(Ok(frame))) => match frame.into_data() {
405 Ok(bytes) => Poll::Ready(Some(Ok(bytes))),
406 Err(_) => Poll::Pending,
407 },
408 Poll::Ready(Some(Err(error))) => Poll::Ready(Some(Err(error))),
409 Poll::Ready(None) => Poll::Ready(None),
410 Poll::Pending => Poll::Pending,
411 },
412 }
413 }
414}
415
416pub struct FrameFuture<'a> {
418 body: &'a mut Body,
419}
420
421impl<'a> Future for FrameFuture<'a> {
422 type Output = Option<std::result::Result<Frame<Bytes>, Error>>;
423
424 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
425 let body = &mut *self.get_mut().body;
426 match Pin::new(body).poll_frame(cx) {
427 Poll::Pending => Poll::Pending,
428 Poll::Ready(value) => Poll::Ready(value),
429 }
430 }
431}
432
433pub struct ChunkFuture<'a> {
435 body: &'a mut Body,
436}
437
438impl<'a> Future for ChunkFuture<'a> {
439 type Output = Option<std::result::Result<Bytes, Error>>;
440
441 #[inline(always)]
442 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
443 let body = &mut *self.get_mut().body;
444 Pin::new(body).poll_chunk(cx)
445 }
446}
447
448#[derive(Debug, Clone)]
450pub struct Response {
451 pub(crate) status: u16,
452 headers: Headers,
453 body: Body,
454 http_version: String,
455 effective_url: Option<Url>,
456}
457
458impl Response {
459 pub fn new(status: u16, headers: Headers, body: Bytes, http_version: String) -> Self {
463 Self {
464 status,
465 headers,
466 body: Body::from_bytes(body),
467 http_version,
468 effective_url: None,
469 }
470 }
471
472 pub fn with_body(status: u16, headers: Headers, body: Body, http_version: String) -> Self {
475 Self {
476 status,
477 headers,
478 body,
479 http_version,
480 effective_url: None,
481 }
482 }
483
484 pub(crate) fn into_status_headers_version(self) -> (u16, Headers, String) {
485 (self.status, self.headers, self.http_version)
486 }
487
488 pub fn with_url(mut self, url: Url) -> Self {
490 self.effective_url = Some(url);
491 self
492 }
493
494 pub(crate) async fn into_buffered(mut self) -> Result<Self> {
495 if self.body.is_streaming() {
496 let bytes = self.body.collect_to_bytes().await?;
497 self.body = Body::from_bytes(bytes);
498 }
499 Ok(self)
500 }
501
502 pub fn http_version(&self) -> &str {
503 &self.http_version
504 }
505
506 pub fn status(&self) -> StatusCode {
507 StatusCode::from_u16(self.status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR)
508 }
509
510 pub fn status_code(&self) -> u16 {
511 self.status
512 }
513
514 pub fn headers(&self) -> &Headers {
515 &self.headers
516 }
517
518 pub async fn trailers(&mut self) -> Result<Option<Headers>> {
531 self.body.trailers().await
532 }
533
534 pub fn url(&self) -> Option<&Url> {
535 self.effective_url.as_ref()
536 }
537
538 pub fn body(&self) -> &Body {
540 &self.body
541 }
542
543 pub fn body_mut(&mut self) -> &mut Body {
546 &mut self.body
547 }
548
549 pub fn into_body(self) -> Body {
551 self.body
552 }
553
554 pub fn buffered_bytes(&self) -> Option<&Bytes> {
558 self.body.as_bytes()
559 }
560
561 pub fn bytes_raw(&self) -> Result<Bytes> {
562 self.body
563 .as_bytes()
564 .cloned()
565 .ok_or_else(|| Error::HttpProtocol("response body is streaming, not buffered".into()))
566 }
567
568 pub fn bytes(&self) -> Result<Bytes> {
569 self.decoded_body()
570 }
571
572 pub fn is_success(&self) -> bool {
573 (200..300).contains(&self.status)
574 }
575 pub fn is_redirect(&self) -> bool {
576 (300..400).contains(&self.status)
577 }
578 pub fn redirect_url(&self) -> Option<&str> {
579 self.get_header("Location")
580 }
581
582 pub fn get_header(&self, name: &str) -> Option<&str> {
583 self.headers.get(name)
584 }
585
586 pub fn get_headers(&self, name: &str) -> Vec<&str> {
587 self.headers.get_all(name)
588 }
589
590 pub fn content_type(&self) -> Option<&str> {
591 self.get_header("Content-Type")
592 }
593 pub fn content_encoding(&self) -> Option<&str> {
594 self.get_header("Content-Encoding")
595 }
596
597 pub fn decoded_body(&self) -> Result<Bytes> {
602 let body = self.body.as_bytes().ok_or_else(|| {
603 Error::HttpProtocol("response body is streaming, not buffered".into())
604 })?;
605
606 let encodings: Vec<&str> = self
607 .content_encoding()
608 .map(|s| s.split(',').map(str::trim).collect())
609 .unwrap_or_default();
610
611 if !encodings.is_empty() {
612 let mut data = body.clone();
613 for encoding in encodings.iter().rev() {
614 data = match encoding.to_lowercase().as_str() {
615 "gzip" | "x-gzip" => decode_gzip(&data)?,
616 "deflate" => decode_deflate(&data)?,
617 "br" => decode_brotli(&data)?,
618 "zstd" => decode_zstd(&data)?,
619 "identity" => data,
620 _ => data,
621 };
622 }
623 return Ok(data);
624 }
625
626 if body.len() >= 4
627 && body[0] == 0x28
628 && body[1] == 0xB5
629 && body[2] == 0x2F
630 && body[3] == 0xFD
631 {
632 return decode_zstd(body);
633 }
634 if body.len() >= 2 && body[0] == 0x1f && body[1] == 0x8b {
635 return decode_gzip(body);
636 }
637
638 Ok(body.clone())
639 }
640
641 pub fn text(&self) -> Result<String> {
642 let decoded = self.decoded_body()?;
643 String::from_utf8(decoded.to_vec())
644 .map_err(|e| Error::Decompression(format!("UTF-8 decode error: {}", e)))
645 }
646
647 pub fn json<T: serde::de::DeserializeOwned>(&self) -> Result<T> {
648 let text = self.text()?;
649 serde_json::from_str(&text).map_err(Error::from)
650 }
651
652 pub fn error_for_status(self) -> Result<Self> {
653 if self.status().is_client_error() || self.status().is_server_error() {
654 let message = self
655 .status()
656 .canonical_reason()
657 .unwrap_or("HTTP error")
658 .to_string();
659 Err(Error::http_status(self.status, message))
660 } else {
661 Ok(self)
662 }
663 }
664
665 pub fn error_for_status_ref(&self) -> Result<&Self> {
666 if self.status().is_client_error() || self.status().is_server_error() {
667 let message = self
668 .status()
669 .canonical_reason()
670 .unwrap_or("HTTP error")
671 .to_string();
672 Err(Error::http_status(self.status, message))
673 } else {
674 Ok(self)
675 }
676 }
677}
678
679fn decode_gzip(data: &[u8]) -> Result<Bytes> {
680 let mut decoder = flate2::read::GzDecoder::new(data);
681 let mut decoded = Vec::new();
682 decoder
683 .read_to_end(&mut decoded)
684 .map_err(|e| Error::Decompression(format!("gzip: {}", e)))?;
685 Ok(Bytes::from(decoded))
686}
687
688fn decode_deflate(data: &[u8]) -> Result<Bytes> {
689 let mut decoded = Vec::new();
690 if flate2::read::ZlibDecoder::new(data)
691 .read_to_end(&mut decoded)
692 .is_ok()
693 {
694 return Ok(Bytes::from(decoded));
695 }
696 decoded.clear();
697 flate2::read::DeflateDecoder::new(data)
698 .read_to_end(&mut decoded)
699 .map_err(|e| Error::Decompression(format!("deflate: {}", e)))?;
700 Ok(Bytes::from(decoded))
701}
702
703fn decode_brotli(data: &[u8]) -> Result<Bytes> {
704 let mut decoder = brotli::Decompressor::new(data, 4096);
705 let mut decoded = Vec::new();
706 decoder
707 .read_to_end(&mut decoded)
708 .map_err(|e| Error::Decompression(format!("brotli: {}", e)))?;
709 Ok(Bytes::from(decoded))
710}
711
712fn decode_zstd(data: &[u8]) -> Result<Bytes> {
713 zstd::stream::decode_all(data)
714 .map(Bytes::from)
715 .map_err(|e| Error::Decompression(format!("zstd: {}", e)))
716}