use crate::ClientOptions;
use crate::client::builder::{HttpRequestBuilder, RequestBuilderError};
use crate::client::{HttpRequest, HttpResponse, HttpResponseBody};
use async_trait::async_trait;
use http::{Method, Uri};
use http_body_util::BodyExt;
use std::error::Error;
use std::sync::Arc;
use tokio::runtime::Handle;
#[derive(Debug, thiserror::Error)]
#[error("HTTP error: {source}")]
pub struct HttpError {
kind: HttpErrorKind,
#[source]
source: Box<dyn Error + Send + Sync>,
}
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum HttpErrorKind {
Connect,
Request,
Timeout,
Interrupted,
Decode,
Unknown,
}
impl HttpError {
pub fn new<E>(kind: HttpErrorKind, e: E) -> Self
where
E: Error + Send + Sync + 'static,
{
Self {
kind,
source: Box::new(e),
}
}
pub(crate) fn reqwest(e: reqwest::Error) -> Self {
#[cfg(not(target_arch = "wasm32"))]
let is_connect = || e.is_connect();
#[cfg(target_arch = "wasm32")]
let is_connect = || false;
let mut kind = if e.is_timeout() {
HttpErrorKind::Timeout
} else if is_connect() {
HttpErrorKind::Connect
} else if e.is_decode() {
HttpErrorKind::Decode
} else {
HttpErrorKind::Unknown
};
let mut source = e.source();
while kind == HttpErrorKind::Unknown {
if let Some(e) = source {
if let Some(e) = e.downcast_ref::<hyper::Error>() {
if e.is_closed() || e.is_incomplete_message() || e.is_body_write_aborted() {
kind = HttpErrorKind::Request;
} else if e.is_timeout() {
kind = HttpErrorKind::Timeout;
}
}
if let Some(e) = e.downcast_ref::<std::io::Error>() {
match e.kind() {
std::io::ErrorKind::TimedOut => kind = HttpErrorKind::Timeout,
std::io::ErrorKind::ConnectionAborted
| std::io::ErrorKind::ConnectionReset
| std::io::ErrorKind::BrokenPipe
| std::io::ErrorKind::UnexpectedEof => kind = HttpErrorKind::Interrupted,
_ => {}
}
}
source = e.source();
} else {
break;
}
}
Self {
kind,
source: Box::new(e.without_url()),
}
}
pub fn kind(&self) -> HttpErrorKind {
self.kind
}
}
#[async_trait]
pub trait HttpService: std::fmt::Debug + Send + Sync + 'static {
async fn call(&self, req: HttpRequest) -> Result<HttpResponse, HttpError>;
}
#[derive(Debug, Clone)]
pub struct HttpClient(Arc<dyn HttpService>);
impl HttpClient {
pub fn new(service: impl HttpService + 'static) -> Self {
Self(Arc::new(service))
}
pub async fn execute(&self, request: HttpRequest) -> Result<HttpResponse, HttpError> {
self.0.call(request).await
}
#[allow(unused)]
pub(crate) fn get<U>(&self, url: U) -> HttpRequestBuilder
where
U: TryInto<Uri>,
U::Error: Into<RequestBuilderError>,
{
self.request(Method::GET, url)
}
#[allow(unused)]
pub(crate) fn post<U>(&self, url: U) -> HttpRequestBuilder
where
U: TryInto<Uri>,
U::Error: Into<RequestBuilderError>,
{
self.request(Method::POST, url)
}
#[allow(unused)]
pub(crate) fn put<U>(&self, url: U) -> HttpRequestBuilder
where
U: TryInto<Uri>,
U::Error: Into<RequestBuilderError>,
{
self.request(Method::PUT, url)
}
#[allow(unused)]
pub(crate) fn delete<U>(&self, url: U) -> HttpRequestBuilder
where
U: TryInto<Uri>,
U::Error: Into<RequestBuilderError>,
{
self.request(Method::DELETE, url)
}
pub(crate) fn request<U>(&self, method: Method, url: U) -> HttpRequestBuilder
where
U: TryInto<Uri>,
U::Error: Into<RequestBuilderError>,
{
HttpRequestBuilder::new(self.clone())
.uri(url)
.method(method)
}
}
#[async_trait]
#[cfg(not(target_arch = "wasm32"))]
impl HttpService for reqwest::Client {
async fn call(&self, req: HttpRequest) -> Result<HttpResponse, HttpError> {
let (parts, body) = req.into_parts();
let url = parts.uri.to_string().parse().unwrap();
let mut req = reqwest::Request::new(parts.method, url);
*req.headers_mut() = parts.headers;
*req.body_mut() = Some(body.into_reqwest());
let r = self.execute(req).await.map_err(HttpError::reqwest)?;
let res: http::Response<reqwest::Body> = r.into();
let (parts, body) = res.into_parts();
let body = HttpResponseBody::new(body.map_err(HttpError::reqwest));
Ok(HttpResponse::from_parts(parts, body))
}
}
#[async_trait]
#[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
impl HttpService for reqwest::Client {
async fn call(&self, req: HttpRequest) -> Result<HttpResponse, HttpError> {
use futures_channel::{mpsc, oneshot};
use futures_util::{SinkExt, StreamExt, TryStreamExt};
use http_body_util::{Empty, StreamBody};
use wasm_bindgen_futures::spawn_local;
let (parts, body) = req.into_parts();
let url = parts.uri.to_string().parse().unwrap();
let mut req = reqwest::Request::new(parts.method, url);
*req.headers_mut() = parts.headers;
*req.body_mut() = Some(body.into_reqwest());
let (mut tx, rx) = mpsc::channel(1);
let (tx_parts, rx_parts) = oneshot::channel();
let res_fut = self.execute(req);
spawn_local(async move {
match res_fut.await.map_err(HttpError::reqwest) {
Err(err) => {
let _ = tx_parts.send(Err(err));
drop(tx);
}
Ok(res) => {
let (mut parts, _) = http::Response::new(Empty::<()>::new()).into_parts();
parts.headers = res.headers().clone();
parts.status = res.status();
let _ = tx_parts.send(Ok(parts));
let mut stream = res.bytes_stream().map_err(HttpError::reqwest);
while let Some(chunk) = stream.next().await {
if let Err(_e) = tx.send(chunk).await {
break;
}
}
}
}
});
let parts = rx_parts.await.unwrap()?;
let safe_stream = rx.map(|chunk| {
let frame = hyper::body::Frame::data(chunk?);
Ok(frame)
});
let body = HttpResponseBody::new(StreamBody::new(safe_stream));
Ok(HttpResponse::from_parts(parts, body))
}
}
pub trait HttpConnector: std::fmt::Debug + Send + Sync + 'static {
fn connect(&self, options: &ClientOptions) -> crate::Result<HttpClient>;
}
#[derive(Debug, Default)]
#[allow(missing_copy_implementations)]
#[cfg(not(all(target_arch = "wasm32", target_os = "wasi")))]
pub struct ReqwestConnector {}
#[cfg(not(all(target_arch = "wasm32", target_os = "wasi")))]
impl HttpConnector for ReqwestConnector {
fn connect(&self, options: &ClientOptions) -> crate::Result<HttpClient> {
let client = options.client()?;
Ok(HttpClient::new(client))
}
}
#[derive(Debug)]
#[allow(missing_copy_implementations)]
#[cfg(not(target_arch = "wasm32"))]
pub struct SpawnedReqwestConnector {
runtime: Handle,
}
#[cfg(not(target_arch = "wasm32"))]
impl SpawnedReqwestConnector {
pub fn new(runtime: Handle) -> Self {
Self { runtime }
}
}
#[cfg(not(target_arch = "wasm32"))]
impl HttpConnector for SpawnedReqwestConnector {
fn connect(&self, options: &ClientOptions) -> crate::Result<HttpClient> {
let spawn_service = super::SpawnService::new(options.client()?, self.runtime.clone());
Ok(HttpClient::new(spawn_service))
}
}
#[cfg(all(target_arch = "wasm32", target_os = "wasi"))]
pub(crate) fn http_connector(
custom: Option<Arc<dyn HttpConnector>>,
) -> crate::Result<Arc<dyn HttpConnector>> {
match custom {
Some(x) => Ok(x),
None => Err(crate::Error::NotSupported {
source: "WASI architectures must provide an HTTPConnector"
.to_string()
.into(),
}),
}
}
#[cfg(not(all(target_arch = "wasm32", target_os = "wasi")))]
pub(crate) fn http_connector(
custom: Option<Arc<dyn HttpConnector>>,
) -> crate::Result<Arc<dyn HttpConnector>> {
match custom {
Some(x) => Ok(x),
None => Ok(Arc::new(ReqwestConnector {})),
}
}