1use crate::error::{Error, Result};
4use crate::headers::Headers;
5use crate::url::Url;
6use bytes::{Bytes, BytesMut};
7use http::StatusCode;
8use http_body::{Body as HttpBody, Frame, SizeHint};
9use std::fmt;
10use std::future::Future;
11use std::io::Read;
12use std::pin::Pin;
13use std::task::{Context, Poll};
14
15pub struct Body {
27 inner: BodyInner,
28}
29
30enum BodyInner {
31 Empty,
32 Buffered(Option<Bytes>),
33 H1(crate::transport::h1::H1Body),
34 H2(crate::transport::h2::H2Body),
35 H2Direct(Box<crate::transport::h2::H2DirectBody>),
36 H3(crate::transport::h3::H3Body),
37}
38
/// Identifies which body representation a [`BodyCapacity`] snapshot was taken from.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BodyCapacityProtocol {
    /// The body carries no payload.
    Empty,
    /// The body is an in-memory buffer.
    Buffered,
    /// The body is produced by the HTTP/1 transport.
    H1,
    /// The body is produced by the HTTP/2 transport.
    H2,
    /// The body is produced by the HTTP/2 "direct" body type.
    H2Direct,
    /// The body is produced by the HTTP/3 transport.
    H3,
}
48
49#[derive(Clone, Copy, Debug, PartialEq, Eq)]
50pub struct BodyCapacity {
51 pub protocol: BodyCapacityProtocol,
52 pub buffer_capacity: usize,
53 pub buffered_chunks: usize,
54 pub available_slots: usize,
55 pub buffered_bytes: usize,
56 pub closed: bool,
57 pub ended: bool,
58}
59
60impl Body {
61 pub fn empty() -> Self {
63 Self {
64 inner: BodyInner::Empty,
65 }
66 }
67
68 pub fn from_bytes(bytes: impl Into<Bytes>) -> Self {
71 let bytes = bytes.into();
72 if bytes.is_empty() {
73 Self::empty()
74 } else {
75 Self {
76 inner: BodyInner::Buffered(Some(bytes)),
77 }
78 }
79 }
80
81 pub(crate) fn from_h1(body: crate::transport::h1::H1Body) -> Self {
83 Self {
84 inner: BodyInner::H1(body),
85 }
86 }
87
88 pub(crate) fn from_h2(body: crate::transport::h2::H2Body) -> Self {
90 Self {
91 inner: BodyInner::H2(body),
92 }
93 }
94
95 pub(crate) fn from_h2_direct(body: crate::transport::h2::H2DirectBody) -> Self {
97 Self {
98 inner: BodyInner::H2Direct(Box::new(body)),
99 }
100 }
101
102 pub(crate) fn from_h3(body: crate::transport::h3::H3Body) -> Self {
104 Self {
105 inner: BodyInner::H3(body),
106 }
107 }
108
109 pub fn is_empty(&self) -> bool {
112 match &self.inner {
113 BodyInner::Empty => true,
114 BodyInner::Buffered(Some(b)) => b.is_empty(),
115 BodyInner::Buffered(None) => true,
116 BodyInner::H1(_) | BodyInner::H2(_) | BodyInner::H2Direct(_) | BodyInner::H3(_) => {
117 false
118 }
119 }
120 }
121
122 pub fn is_streaming(&self) -> bool {
124 matches!(
125 self.inner,
126 BodyInner::H1(_) | BodyInner::H2(_) | BodyInner::H2Direct(_) | BodyInner::H3(_)
127 )
128 }
129
130 pub fn as_bytes(&self) -> Option<&Bytes> {
133 match &self.inner {
134 BodyInner::Buffered(Some(b)) => Some(b),
135 _ => None,
136 }
137 }
138
139 pub fn buffered_len(&self) -> Option<usize> {
141 match &self.inner {
142 BodyInner::Empty => Some(0),
143 BodyInner::Buffered(Some(b)) => Some(b.len()),
144 BodyInner::Buffered(None) => Some(0),
145 BodyInner::H1(_) | BodyInner::H2(_) | BodyInner::H2Direct(_) | BodyInner::H3(_) => None,
146 }
147 }
148
149 pub fn h3_capacity(&self) -> Option<crate::transport::h3::H3BodyCapacity> {
152 match &self.inner {
153 BodyInner::H3(body) => Some(body.capacity()),
154 _ => None,
155 }
156 }
157
158 pub fn capacity(&self) -> BodyCapacity {
165 match &self.inner {
166 BodyInner::Empty => BodyCapacity {
167 protocol: BodyCapacityProtocol::Empty,
168 buffer_capacity: 0,
169 buffered_chunks: 0,
170 available_slots: 0,
171 buffered_bytes: 0,
172 closed: false,
173 ended: true,
174 },
175 BodyInner::Buffered(bytes) => {
176 let buffered_bytes = bytes.as_ref().map(Bytes::len).unwrap_or(0);
177 BodyCapacity {
178 protocol: BodyCapacityProtocol::Buffered,
179 buffer_capacity: usize::from(buffered_bytes > 0),
180 buffered_chunks: usize::from(buffered_bytes > 0),
181 available_slots: usize::from(buffered_bytes == 0),
182 buffered_bytes,
183 closed: false,
184 ended: true,
185 }
186 }
187 BodyInner::H1(_) => BodyCapacity {
188 protocol: BodyCapacityProtocol::H1,
189 buffer_capacity: 0,
190 buffered_chunks: 0,
191 available_slots: 0,
192 buffered_bytes: 0,
193 closed: false,
194 ended: false,
195 },
196 BodyInner::H2(body) => {
197 let capacity = body.capacity();
198 BodyCapacity {
199 protocol: BodyCapacityProtocol::H2,
200 buffer_capacity: capacity.buffer_capacity,
201 buffered_chunks: capacity.buffered_chunks,
202 available_slots: capacity.available_slots,
203 buffered_bytes: capacity.buffered_bytes,
204 closed: capacity.closed,
205 ended: capacity.ended,
206 }
207 }
208 BodyInner::H2Direct(_) => BodyCapacity {
209 protocol: BodyCapacityProtocol::H2Direct,
210 buffer_capacity: 0,
211 buffered_chunks: 0,
212 available_slots: 0,
213 buffered_bytes: 0,
214 closed: false,
215 ended: false,
216 },
217 BodyInner::H3(body) => {
218 let capacity = body.capacity();
219 BodyCapacity {
220 protocol: BodyCapacityProtocol::H3,
221 buffer_capacity: capacity.buffer_capacity,
222 buffered_chunks: capacity.buffered_chunks,
223 available_slots: capacity.available_slots,
224 buffered_bytes: capacity.buffered_bytes,
225 closed: capacity.closed,
226 ended: capacity.ended,
227 }
228 }
229 }
230 }
231
232 pub fn len(&self) -> usize {
236 self.buffered_len().unwrap_or(0)
237 }
238
239 pub fn frame(&mut self) -> FrameFuture<'_> {
241 FrameFuture { body: self }
242 }
243
244 #[inline(always)]
246 pub fn chunk(&mut self) -> ChunkFuture<'_> {
247 ChunkFuture { body: self }
248 }
249
250 pub async fn collect_to_bytes(&mut self) -> Result<Bytes> {
256 let mut buf = BytesMut::new();
257 while let Some(frame) = self.frame().await {
258 let frame = frame?;
259 if let Ok(data) = frame.into_data() {
260 buf.extend_from_slice(&data);
261 }
262 }
263 Ok(buf.freeze())
264 }
265}
266
267impl Default for Body {
268 fn default() -> Self {
269 Self::empty()
270 }
271}
272
273impl fmt::Debug for Body {
274 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
275 match &self.inner {
276 BodyInner::Empty => f.debug_struct("Body::Empty").finish(),
277 BodyInner::Buffered(Some(b)) => f
278 .debug_struct("Body::Buffered")
279 .field("len", &b.len())
280 .finish(),
281 BodyInner::Buffered(None) => f.debug_struct("Body::Buffered").field("len", &0).finish(),
282 BodyInner::H1(_) => f.debug_struct("Body::H1Streaming").finish(),
283 BodyInner::H2(_) => f.debug_struct("Body::H2Streaming").finish(),
284 BodyInner::H2Direct(_) => f.debug_struct("Body::H2DirectStreaming").finish(),
285 BodyInner::H3(_) => f.debug_struct("Body::H3Streaming").finish(),
286 }
287 }
288}
289
290impl Clone for Body {
291 fn clone(&self) -> Self {
292 match &self.inner {
293 BodyInner::Empty => Self::empty(),
294 BodyInner::Buffered(Some(b)) => Self {
295 inner: BodyInner::Buffered(Some(b.clone())),
296 },
297 BodyInner::Buffered(None) => Self {
298 inner: BodyInner::Buffered(None),
299 },
300 BodyInner::H1(_) | BodyInner::H2(_) | BodyInner::H2Direct(_) | BodyInner::H3(_) => {
301 panic!("specter::Body::clone is not supported for streaming bodies")
302 }
303 }
304 }
305}
306
307impl From<Bytes> for Body {
308 fn from(value: Bytes) -> Self {
309 Self::from_bytes(value)
310 }
311}
312
313impl HttpBody for Body {
314 type Data = Bytes;
315 type Error = Error;
316
317 fn poll_frame(
318 mut self: Pin<&mut Self>,
319 cx: &mut Context<'_>,
320 ) -> Poll<Option<std::result::Result<Frame<Self::Data>, Self::Error>>> {
321 match &mut self.inner {
322 BodyInner::Empty => Poll::Ready(None),
323 BodyInner::Buffered(slot) => match slot.take() {
324 Some(bytes) if !bytes.is_empty() => Poll::Ready(Some(Ok(Frame::data(bytes)))),
325 _ => Poll::Ready(None),
326 },
327 BodyInner::H1(body) => Pin::new(body).poll_frame(cx),
328 BodyInner::H2(body) => Pin::new(body).poll_frame(cx),
329 BodyInner::H2Direct(body) => Pin::new(body.as_mut()).poll_frame(cx),
330 BodyInner::H3(body) => Pin::new(body).poll_frame(cx),
331 }
332 }
333
334 fn is_end_stream(&self) -> bool {
335 match &self.inner {
336 BodyInner::Empty => true,
337 BodyInner::Buffered(None) => true,
338 BodyInner::Buffered(Some(b)) => b.is_empty(),
339 BodyInner::H1(body) => body.is_terminal(),
340 BodyInner::H2(body) => body.is_terminal(),
341 BodyInner::H2Direct(body) => body.is_terminal(),
342 BodyInner::H3(body) => body.is_terminal(),
343 }
344 }
345
346 fn size_hint(&self) -> SizeHint {
347 match &self.inner {
348 BodyInner::Empty => SizeHint::with_exact(0),
349 BodyInner::Buffered(Some(b)) => SizeHint::with_exact(b.len() as u64),
350 BodyInner::Buffered(None) => SizeHint::with_exact(0),
351 BodyInner::H1(body) => body.size_hint(),
352 BodyInner::H2(body) => body.size_hint(),
353 BodyInner::H2Direct(body) => body.size_hint(),
354 BodyInner::H3(body) => body.size_hint(),
355 }
356 }
357}
358
359impl Body {
360 #[inline(always)]
361 fn poll_chunk(
362 mut self: Pin<&mut Self>,
363 cx: &mut Context<'_>,
364 ) -> Poll<Option<std::result::Result<Bytes, Error>>> {
365 match &mut self.inner {
366 BodyInner::Empty => Poll::Ready(None),
367 BodyInner::Buffered(slot) => match slot.take() {
368 Some(bytes) if !bytes.is_empty() => Poll::Ready(Some(Ok(bytes))),
369 _ => Poll::Ready(None),
370 },
371 BodyInner::H2(body) => Pin::new(body).poll_data_coalesced(cx),
372 BodyInner::H2Direct(body) => Pin::new(body.as_mut()).poll_data(cx),
373 BodyInner::H1(body) => match Pin::new(body).poll_frame(cx) {
374 Poll::Ready(Some(Ok(frame))) => match frame.into_data() {
375 Ok(bytes) => Poll::Ready(Some(Ok(bytes))),
376 Err(_) => Poll::Pending,
377 },
378 Poll::Ready(Some(Err(error))) => Poll::Ready(Some(Err(error))),
379 Poll::Ready(None) => Poll::Ready(None),
380 Poll::Pending => Poll::Pending,
381 },
382 BodyInner::H3(body) => match Pin::new(body).poll_frame(cx) {
383 Poll::Ready(Some(Ok(frame))) => match frame.into_data() {
384 Ok(bytes) => Poll::Ready(Some(Ok(bytes))),
385 Err(_) => Poll::Pending,
386 },
387 Poll::Ready(Some(Err(error))) => Poll::Ready(Some(Err(error))),
388 Poll::Ready(None) => Poll::Ready(None),
389 Poll::Pending => Poll::Pending,
390 },
391 }
392 }
393}
394
395pub struct FrameFuture<'a> {
397 body: &'a mut Body,
398}
399
400impl<'a> Future for FrameFuture<'a> {
401 type Output = Option<std::result::Result<Frame<Bytes>, Error>>;
402
403 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
404 let body = &mut *self.get_mut().body;
405 match Pin::new(body).poll_frame(cx) {
406 Poll::Pending => Poll::Pending,
407 Poll::Ready(value) => Poll::Ready(value),
408 }
409 }
410}
411
412pub struct ChunkFuture<'a> {
414 body: &'a mut Body,
415}
416
417impl<'a> Future for ChunkFuture<'a> {
418 type Output = Option<std::result::Result<Bytes, Error>>;
419
420 #[inline(always)]
421 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
422 let body = &mut *self.get_mut().body;
423 Pin::new(body).poll_chunk(cx)
424 }
425}
426
427#[derive(Debug, Clone)]
429pub struct Response {
430 pub(crate) status: u16,
431 headers: Headers,
432 body: Body,
433 http_version: String,
434 effective_url: Option<Url>,
435}
436
437impl Response {
438 pub fn new(status: u16, headers: Headers, body: Bytes, http_version: String) -> Self {
442 Self {
443 status,
444 headers,
445 body: Body::from_bytes(body),
446 http_version,
447 effective_url: None,
448 }
449 }
450
451 pub fn with_body(status: u16, headers: Headers, body: Body, http_version: String) -> Self {
454 Self {
455 status,
456 headers,
457 body,
458 http_version,
459 effective_url: None,
460 }
461 }
462
463 pub(crate) fn into_status_headers_version(self) -> (u16, Headers, String) {
464 (self.status, self.headers, self.http_version)
465 }
466
467 pub fn with_url(mut self, url: Url) -> Self {
469 self.effective_url = Some(url);
470 self
471 }
472
473 pub fn http_version(&self) -> &str {
474 &self.http_version
475 }
476
477 pub fn status(&self) -> StatusCode {
478 StatusCode::from_u16(self.status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR)
479 }
480
481 pub fn status_code(&self) -> u16 {
482 self.status
483 }
484
485 pub fn headers(&self) -> &Headers {
486 &self.headers
487 }
488
489 pub fn url(&self) -> Option<&Url> {
490 self.effective_url.as_ref()
491 }
492
493 pub fn body(&self) -> &Body {
495 &self.body
496 }
497
498 pub fn body_mut(&mut self) -> &mut Body {
501 &mut self.body
502 }
503
504 pub fn into_body(self) -> Body {
506 self.body
507 }
508
509 pub fn buffered_bytes(&self) -> Option<&Bytes> {
513 self.body.as_bytes()
514 }
515
516 pub fn bytes_raw(&self) -> Result<Bytes> {
517 self.body
518 .as_bytes()
519 .cloned()
520 .ok_or_else(|| Error::HttpProtocol("response body is streaming, not buffered".into()))
521 }
522
523 pub fn bytes(&self) -> Result<Bytes> {
524 self.decoded_body()
525 }
526
527 pub fn is_success(&self) -> bool {
528 (200..300).contains(&self.status)
529 }
530 pub fn is_redirect(&self) -> bool {
531 (300..400).contains(&self.status)
532 }
533 pub fn redirect_url(&self) -> Option<&str> {
534 self.get_header("Location")
535 }
536
537 pub fn get_header(&self, name: &str) -> Option<&str> {
538 self.headers.get(name)
539 }
540
541 pub fn get_headers(&self, name: &str) -> Vec<&str> {
542 self.headers.get_all(name)
543 }
544
545 pub fn content_type(&self) -> Option<&str> {
546 self.get_header("Content-Type")
547 }
548 pub fn content_encoding(&self) -> Option<&str> {
549 self.get_header("Content-Encoding")
550 }
551
552 pub fn decoded_body(&self) -> Result<Bytes> {
557 let body = self.body.as_bytes().ok_or_else(|| {
558 Error::HttpProtocol("response body is streaming, not buffered".into())
559 })?;
560
561 let encodings: Vec<&str> = self
562 .content_encoding()
563 .map(|s| s.split(',').map(str::trim).collect())
564 .unwrap_or_default();
565
566 if !encodings.is_empty() {
567 let mut data = body.clone();
568 for encoding in encodings.iter().rev() {
569 data = match encoding.to_lowercase().as_str() {
570 "gzip" | "x-gzip" => decode_gzip(&data)?,
571 "deflate" => decode_deflate(&data)?,
572 "br" => decode_brotli(&data)?,
573 "zstd" => decode_zstd(&data)?,
574 "identity" => data,
575 _ => data,
576 };
577 }
578 return Ok(data);
579 }
580
581 if body.len() >= 4
582 && body[0] == 0x28
583 && body[1] == 0xB5
584 && body[2] == 0x2F
585 && body[3] == 0xFD
586 {
587 return decode_zstd(body);
588 }
589 if body.len() >= 2 && body[0] == 0x1f && body[1] == 0x8b {
590 return decode_gzip(body);
591 }
592
593 Ok(body.clone())
594 }
595
596 pub fn text(&self) -> Result<String> {
597 let decoded = self.decoded_body()?;
598 String::from_utf8(decoded.to_vec())
599 .map_err(|e| Error::Decompression(format!("UTF-8 decode error: {}", e)))
600 }
601
602 pub fn json<T: serde::de::DeserializeOwned>(&self) -> Result<T> {
603 let text = self.text()?;
604 serde_json::from_str(&text).map_err(Error::from)
605 }
606
607 pub fn error_for_status(self) -> Result<Self> {
608 if self.status().is_client_error() || self.status().is_server_error() {
609 let message = self
610 .status()
611 .canonical_reason()
612 .unwrap_or("HTTP error")
613 .to_string();
614 Err(Error::http_status(self.status, message))
615 } else {
616 Ok(self)
617 }
618 }
619
620 pub fn error_for_status_ref(&self) -> Result<&Self> {
621 if self.status().is_client_error() || self.status().is_server_error() {
622 let message = self
623 .status()
624 .canonical_reason()
625 .unwrap_or("HTTP error")
626 .to_string();
627 Err(Error::http_status(self.status, message))
628 } else {
629 Ok(self)
630 }
631 }
632}
633
634fn decode_gzip(data: &[u8]) -> Result<Bytes> {
635 let mut decoder = flate2::read::GzDecoder::new(data);
636 let mut decoded = Vec::new();
637 decoder
638 .read_to_end(&mut decoded)
639 .map_err(|e| Error::Decompression(format!("gzip: {}", e)))?;
640 Ok(Bytes::from(decoded))
641}
642
643fn decode_deflate(data: &[u8]) -> Result<Bytes> {
644 let mut decoded = Vec::new();
645 if flate2::read::ZlibDecoder::new(data)
646 .read_to_end(&mut decoded)
647 .is_ok()
648 {
649 return Ok(Bytes::from(decoded));
650 }
651 decoded.clear();
652 flate2::read::DeflateDecoder::new(data)
653 .read_to_end(&mut decoded)
654 .map_err(|e| Error::Decompression(format!("deflate: {}", e)))?;
655 Ok(Bytes::from(decoded))
656}
657
658fn decode_brotli(data: &[u8]) -> Result<Bytes> {
659 let mut decoder = brotli::Decompressor::new(data, 4096);
660 let mut decoded = Vec::new();
661 decoder
662 .read_to_end(&mut decoded)
663 .map_err(|e| Error::Decompression(format!("brotli: {}", e)))?;
664 Ok(Bytes::from(decoded))
665}
666
667fn decode_zstd(data: &[u8]) -> Result<Bytes> {
668 zstd::stream::decode_all(data)
669 .map(Bytes::from)
670 .map_err(|e| Error::Decompression(format!("zstd: {}", e)))
671}