use std::fs::write;
use std::{fmt::Debug, sync::Arc};
use http::{header, HeaderName, HeaderValue, Response};
use http_body_util::BodyExt;
use http_body_util::combinators::BoxBody;
use hyper::{
body::{Bytes, Incoming},
upgrade::on,
};
use hyper_body_utils::HttpBody;
#[cfg(feature = "tokio-rt")]
use hyper_util::rt::TokioIo;
use log::error;
use serde::Deserialize;
#[cfg(feature = "smol-rt")]
use smol_hyper::rt::FuturesIo;
use crate::{Result, client::serde::ResponseBody, cookie::DeboaCookie, errors::{ConnectionError, DeboaError, IoError}};
use url::Url;
pub trait IntoBody {
fn into_body(self) -> HttpBody;
}
impl IntoBody for Incoming {
fn into_body(self) -> HttpBody {
HttpBody::Incoming(self)
}
}
impl IntoBody for &[u8] {
fn into_body(self) -> HttpBody {
HttpBody::from_bytes(self)
}
}
impl IntoBody for Vec<u8> {
fn into_body(self) -> HttpBody {
HttpBody::from_bytes(&self)
}
}
impl IntoBody for BoxBody<Bytes, std::io::Error> {
fn into_body(self) -> HttpBody {
HttpBody::Stream(self)
}
}
pub struct DeboaResponseBuilder {
url: Url,
inner: Response<HttpBody>,
}
impl DeboaResponseBuilder {
#[inline]
pub fn status(mut self, status: http::StatusCode) -> Self {
*self
.inner
.status_mut() = status;
self
}
#[inline]
pub fn headers(mut self, headers: http::HeaderMap) -> Self {
*self
.inner
.headers_mut() = headers;
self
}
#[inline]
pub fn header(mut self, name: HeaderName, value: &str) -> Self {
let header_value = HeaderValue::from_str(value);
if let Ok(header_value) = header_value {
self.inner
.headers_mut()
.insert(name, header_value);
}
self
}
#[inline]
pub fn body<B: IntoBody>(mut self, body: B) -> Self {
*self
.inner
.body_mut() = body.into_body();
self
}
#[inline]
pub fn build(self) -> DeboaResponse {
DeboaResponse { url: self.url.into(), inner: self.inner }
}
}
pub struct DeboaResponse {
url: Arc<Url>,
inner: Response<HttpBody>,
}
impl Debug for DeboaResponse {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("DeboaResponse")
.field("url", &self.url)
.field("status", &self.inner.status())
.field("headers", &self.inner.headers())
.finish()
}
}
impl AsRef<DeboaResponse> for DeboaResponse {
fn as_ref(&self) -> &DeboaResponse {
self
}
}
impl AsMut<DeboaResponse> for DeboaResponse {
fn as_mut(&mut self) -> &mut DeboaResponse {
self
}
}
impl DeboaResponse {
pub fn new(url: Arc<Url>, inner: Response<HttpBody>) -> Self {
Self { url, inner }
}
#[inline]
pub fn builder(url: Url) -> DeboaResponseBuilder {
DeboaResponseBuilder { url, inner: Response::new(HttpBody::from_bytes(&[])) }
}
#[inline]
pub fn url(&self) -> &Url {
&self.url
}
#[inline]
pub fn status(&self) -> http::StatusCode {
self.inner.status()
}
#[inline]
pub fn status_mut(&mut self) -> &mut http::StatusCode {
self.inner
.status_mut()
}
#[inline]
pub fn headers(&self) -> &http::HeaderMap {
self.inner.headers()
}
#[inline]
pub fn headers_mut(&mut self) -> &mut http::HeaderMap {
self.inner
.headers_mut()
}
#[inline]
fn header_value(&self, header: HeaderName) -> Result<String> {
let header_name = header.as_str();
let header_value = self
.headers()
.get(header_name);
if header_value.is_none() {
error!("Header {} is missing", header_name);
return Err(DeboaError::Header { message: "Header is missing".to_string() });
}
let header_value = header_value.unwrap();
let header_value = header_value.to_str();
if let Err(e) = header_value {
error!("Failed to read {}:: {}", header_name, e);
return Err(DeboaError::Header {
message: format!("Failed to read {}:: {}", header_name, e),
});
}
Ok(header_value
.unwrap()
.to_string())
}
#[inline]
pub fn content_length(&self) -> Result<u64> {
let header = self.header_value(header::CONTENT_LENGTH)?;
let header = header.parse::<u64>();
if let Err(e) = header {
error!("Failed to parse content-length: {}", e);
return Err(DeboaError::Header {
message: format!("Failed to parse content-length: {}", e),
});
}
Ok(header.unwrap())
}
#[inline]
pub fn content_type(&self) -> Result<String> {
let header = self.header_value(header::CONTENT_TYPE)?;
Ok(header)
}
#[inline]
pub fn cookies(&self) -> Result<Option<Vec<DeboaCookie>>> {
let view = self
.headers()
.get_all(header::SET_COOKIE);
let cookies = view
.into_iter()
.map(|cookie| {
let cookie = cookie.to_str();
if let Ok(cookie) = cookie {
DeboaCookie::parse_from_header(cookie)
} else {
error!("Invalid cookie header");
Err(DeboaError::Cookie { message: "Invalid cookie header".to_string() })
}
})
.collect::<Result<Vec<DeboaCookie>>>()
.unwrap();
if cookies.is_empty() {
Ok(None)
} else {
Ok(Some(cookies))
}
}
#[inline]
pub fn inner_body(self) -> HttpBody {
self.inner
.into_body()
}
#[inline]
pub async fn bytes(self) -> Vec<u8> {
let mut data = Vec::<u8>::new();
let bytes = self
.inner_body()
.collect()
.await;
match bytes {
Ok(bytes) => data.extend_from_slice(&bytes.to_bytes()),
Err(e) => {
error!("Failed to collect response body: {}", e);
}
}
data
}
#[inline]
pub fn stream(self) -> HttpBody {
self.inner
.into_body()
}
#[inline]
pub async fn body_as<T: ResponseBody, B: for<'a> Deserialize<'a>>(
self,
body_type: T,
) -> Result<B> {
let bytes = self.bytes().await;
let result = body_type.deserialize::<B>(bytes)?;
Ok(result)
}
#[inline]
pub async fn text(self) -> Result<String> {
let body = self.bytes().await;
Ok(String::from_utf8_lossy(&body).to_string())
}
#[inline]
pub async fn to_file(self, path: &str) -> Result<()> {
let body = self.bytes().await;
let result = write(path, body);
if let Err(e) = result {
error!("Failed to write file: {}", e);
return Err(DeboaError::Io(IoError::File { message: e.to_string() }));
}
Ok(())
}
pub fn into_parts(self) -> (http::response::Parts, HttpBody) {
let (parts, body) = self
.inner
.into_parts();
(parts, body)
}
#[cfg(feature = "tokio-rt")]
#[inline]
pub async fn upgrade(self) -> Result<hyper_util::rt::TokioIo<hyper::upgrade::Upgraded>> {
if self.inner.version() != http::Version::HTTP_11 {
error!("Upgrade is only supported for HTTP/1.1");
return Err(DeboaError::Connection(ConnectionError::Upgrade {
message: "Upgrade is only supported for HTTP/1.1".to_string(),
}));
}
let upgrade = on(self.inner).await;
if let Err(e) = upgrade {
error!("Failed to upgrade connection: {}", e);
return Err(DeboaError::Connection(ConnectionError::Upgrade {
message: e.to_string(),
}));
}
Ok(TokioIo::new(upgrade.unwrap()))
}
#[cfg(feature = "smol-rt")]
#[inline]
pub async fn upgrade(self) -> Result<FuturesIo<hyper::upgrade::Upgraded>> {
if self.inner.version() != http::Version::HTTP_11 {
error!("Upgrade is only supported for HTTP/1.1");
return Err(DeboaError::Connection(ConnectionError::Upgrade {
message: "Upgrade is only supported for HTTP/1.1".to_string(),
}));
}
let upgrade = on(self.inner).await;
if let Err(e) = upgrade {
error!("Failed to upgrade connection: {}", e);
return Err(DeboaError::Connection(ConnectionError::Upgrade {
message: e.to_string(),
}));
}
Ok(FuturesIo::new(upgrade.unwrap()))
}
}