#[cfg(feature = "reqwest-client")]
use alloc::string::ToString;
use alloc::sync::Arc;
use alloc::vec::Vec;
use core::fmt::Display;
use core::future::Future;
#[cfg_attr(not(target_arch = "wasm32"), trait_variant::make(Send))]
/// Asynchronous HTTP transport that exchanges fully buffered bodies.
///
/// Both the request body and the response body are plain `Vec<u8>` buffers.
/// On every target except `wasm32` the trait is passed through
/// `trait_variant::make(Send)`.
pub trait HttpClient {
/// Error type returned when sending a request fails.
type Error: core::error::Error + Display + Send + Sync + 'static;
/// Sends `request` and resolves to the response, or to [`Self::Error`] on
/// failure.
fn send_http(
&self,
request: http::Request<Vec<u8>>,
) -> impl Future<Output = core::result::Result<http::Response<Vec<u8>>, Self::Error>>;
}
#[cfg(feature = "streaming")]
use crate::stream::{ByteStream, StreamError};
#[cfg(feature = "streaming")]
#[cfg_attr(not(target_arch = "wasm32"), trait_variant::make(Send))]
/// Streaming extension of [`HttpClient`], compiled only with the `streaming`
/// feature.
///
/// Responses carry a [`ByteStream`] body instead of a `Vec<u8>` buffer.
pub trait HttpClientExt: HttpClient {
/// Sends a request whose body is a `Vec<u8>` buffer and resolves to a
/// response whose body is a [`ByteStream`].
fn send_http_streaming(
&self,
request: http::Request<Vec<u8>>,
) -> impl Future<Output = Result<http::Response<ByteStream>, Self::Error>>;
/// Sends a request described by `parts` whose body is the stream `body`, and
/// resolves to a response whose body is a [`ByteStream`].
///
/// Variant for targets other than `wasm32`: `S` must be `Send`.
#[cfg(not(target_arch = "wasm32"))]
fn send_http_bidirectional<S>(
&self,
parts: http::request::Parts,
body: S,
) -> impl Future<Output = Result<http::Response<ByteStream>, Self::Error>>
where
S: n0_future::Stream<Item = Result<bytes::Bytes, StreamError>> + Send + 'static;
/// Sends a request described by `parts` whose body is the stream `body`, and
/// resolves to a response whose body is a [`ByteStream`].
///
/// Variant for `wasm32`: identical signature except that `S` is not
/// required to be `Send`.
#[cfg(target_arch = "wasm32")]
fn send_http_bidirectional<S>(
&self,
parts: http::request::Parts,
body: S,
) -> impl Future<Output = Result<http::Response<ByteStream>, Self::Error>>
where
S: n0_future::Stream<Item = Result<bytes::Bytes, StreamError>> + 'static;
}
#[cfg(feature = "reqwest-client")]
/// [`HttpClient`] implementation on top of `reqwest::Client`, available with
/// the `reqwest-client` feature.
impl HttpClient for reqwest::Client {
    type Error = reqwest::Error;

    /// Translates `request` into a reqwest request, sends it, and collects
    /// the whole response body into a `Vec<u8>`.
    ///
    /// Only the method, URI, headers and body of `request` are forwarded.
    ///
    /// # Errors
    ///
    /// Returns the `reqwest::Error` raised while sending the request or while
    /// reading the response body.
    ///
    /// # Panics
    ///
    /// Panics with "Failed to build response" if the `http::Response` cannot
    /// be assembled from the received status and headers.
    async fn send_http(
        &self,
        request: http::Request<Vec<u8>>,
    ) -> core::result::Result<http::Response<Vec<u8>>, Self::Error> {
        let (head, payload) = request.into_parts();

        // Start from method + stringified URI + body, then add each header by
        // its textual name and raw value bytes.
        let outgoing = head.headers.iter().fold(
            self.request(head.method, head.uri.to_string()).body(payload),
            |pending, (key, val)| pending.header(key.as_str(), val.as_bytes()),
        );
        let reply = outgoing.send().await?;

        // Mirror status and headers before the reply is consumed for its body.
        let response = reply.headers().iter().fold(
            http::Response::builder().status(reply.status()),
            |partial, (key, val)| partial.header(key.as_str(), val.as_bytes()),
        );

        let collected = reply.bytes().await?.to_vec();
        Ok(response.body(collected).expect("Failed to build response"))
    }
}
#[cfg(not(target_arch = "wasm32"))]
/// Lets an `Arc`-wrapped client be used wherever an [`HttpClient`] is
/// expected (targets other than `wasm32`).
impl<T: HttpClient + Sync> HttpClient for Arc<T> {
    type Error = T::Error;

    /// Forwards `request` unchanged to the client inside the `Arc`.
    fn send_http(
        &self,
        request: http::Request<Vec<u8>>,
    ) -> impl Future<Output = core::result::Result<http::Response<Vec<u8>>, Self::Error>> + Send
    {
        // `&**self` goes from `&Arc<T>` to `&T`.
        <T as HttpClient>::send_http(&**self, request)
    }
}
#[cfg(target_arch = "wasm32")]
/// Lets an `Arc`-wrapped client be used wherever an [`HttpClient`] is
/// expected (`wasm32` only; no `Sync` bound on `T` and no `Send` bound on the
/// returned future).
impl<T: HttpClient> HttpClient for Arc<T> {
    type Error = T::Error;

    /// Forwards `request` unchanged to the client inside the `Arc`.
    fn send_http(
        &self,
        request: http::Request<Vec<u8>>,
    ) -> impl Future<Output = core::result::Result<http::Response<Vec<u8>>, Self::Error>> {
        // `&**self` goes from `&Arc<T>` to `&T`.
        <T as HttpClient>::send_http(&**self, request)
    }
}
#[cfg(all(feature = "streaming", feature = "reqwest-client"))]
/// [`HttpClientExt`] implementation on top of `reqwest::Client`, available
/// when both the `streaming` and `reqwest-client` features are enabled.
impl HttpClientExt for reqwest::Client {
    /// Sends a buffered request and returns the response with its body exposed
    /// as a [`ByteStream`] over reqwest's byte stream.
    ///
    /// Errors yielded by the body stream are converted with
    /// `StreamError::transport`.
    ///
    /// # Errors
    ///
    /// Returns the `reqwest::Error` raised while sending the request.
    ///
    /// # Panics
    ///
    /// Panics with "Failed to build response" if the `http::Response` cannot
    /// be assembled from the received status and headers.
    async fn send_http_streaming(
        &self,
        request: http::Request<Vec<u8>>,
    ) -> Result<http::Response<ByteStream>, Self::Error> {
        use futures::StreamExt;

        let (head, payload) = request.into_parts();

        // Method + stringified URI + body first, then each header by textual
        // name and raw value bytes.
        let outgoing = head.headers.iter().fold(
            self.request(head.method, head.uri.to_string()).body(payload),
            |pending, (key, val)| pending.header(key.as_str(), val.as_bytes()),
        );
        let reply = outgoing.send().await?;

        // Mirror status and headers before the reply is turned into a stream.
        let response = reply.headers().iter().fold(
            http::Response::builder().status(reply.status()),
            |partial, (key, val)| partial.header(key.as_str(), val.as_bytes()),
        );

        let chunks = reply
            .bytes_stream()
            .map(|chunk| chunk.map_err(|err| StreamError::transport(err)));
        Ok(response
            .body(ByteStream::new(chunks))
            .expect("Failed to build response"))
    }

    /// Sends a request whose body is the stream `body` and returns the
    /// response with its body exposed as a [`ByteStream`].
    ///
    /// Only the method, URI and headers of `parts` are forwarded. Errors
    /// yielded by the response body stream are converted with
    /// `StreamError::transport`.
    ///
    /// # Errors
    ///
    /// Returns the `reqwest::Error` raised while sending the request.
    ///
    /// # Panics
    ///
    /// Panics with "Failed to build response" if the `http::Response` cannot
    /// be assembled from the received status and headers.
    #[cfg(not(target_arch = "wasm32"))]
    async fn send_http_bidirectional<S>(
        &self,
        parts: http::request::Parts,
        body: S,
    ) -> Result<http::Response<ByteStream>, Self::Error>
    where
        S: n0_future::Stream<Item = Result<bytes::Bytes, StreamError>> + Send + 'static,
    {
        use futures::StreamExt;

        // Hand the caller's stream to reqwest as the request body.
        let upload = reqwest::Body::wrap_stream(body);
        let outgoing = parts.headers.iter().fold(
            self.request(parts.method, parts.uri.to_string()).body(upload),
            |pending, (key, val)| pending.header(key.as_str(), val.as_bytes()),
        );
        let reply = outgoing.send().await?;

        // Mirror status and headers before the reply is turned into a stream.
        let response = reply.headers().iter().fold(
            http::Response::builder().status(reply.status()),
            |partial, (key, val)| partial.header(key.as_str(), val.as_bytes()),
        );

        let chunks = reply
            .bytes_stream()
            .map(|chunk| chunk.map_err(|err| StreamError::transport(err)));
        Ok(response
            .body(ByteStream::new(chunks))
            .expect("Failed to build response"))
    }

    /// Placeholder for `wasm32`, where this operation is not implemented.
    ///
    /// # Panics
    ///
    /// Always panics via `unimplemented!`.
    #[cfg(target_arch = "wasm32")]
    async fn send_http_bidirectional<S>(
        &self,
        _parts: http::request::Parts,
        _body: S,
    ) -> Result<http::Response<ByteStream>, Self::Error>
    where
        S: n0_future::Stream<Item = Result<bytes::Bytes, StreamError>> + 'static,
    {
        unimplemented!("Bidirectional streaming not yet supported on WASM")
    }
}