use std::convert::Infallible;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::future;
use std::mem;
use std::ops::Deref;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::LazyLock;
use std::task::Context;
use std::task::Poll;
use bytes::Bytes;
use futures::Future;
use futures::TryStreamExt;
use http::Request;
use http::Response;
use http_body::Frame;
use http_body::SizeHint;
use raw::oio::Read;
use super::HttpBody;
use super::parse_content_encoding;
use super::parse_content_length;
use crate::raw::*;
use crate::*;
/// Process-wide [`reqwest::Client`], constructed with [`reqwest::Client::new`] on first access.
///
/// [`HttpClient::default`] clones this client to build its fetcher.
// NOTE(review): the reason for `allow(dead_code)` is not visible in this file —
// presumably some build configurations never reference the static; confirm before removing.
#[allow(dead_code)]
pub static GLOBAL_REQWEST_CLIENT: LazyLock<reqwest::Client> = LazyLock::new(reqwest::Client::new);
/// Shared, type-erased handle to any [`HttpFetchDyn`] implementation.
pub type HttpFetcher = Arc<dyn HttpFetchDyn>;
/// HTTP client that forwards every request to an [`HttpFetcher`].
///
/// The fetcher is held behind an `Arc`, so cloning an `HttpClient` clones the
/// handle rather than the underlying fetcher.
#[derive(Clone)]
pub struct HttpClient {
// Type-erased backend that actually performs the requests.
fetcher: HttpFetcher,
}
/// Adapter that holds a shared [`AccessorInfo`] and implements
/// `reqsign_core::HttpSend` by delegating to the HTTP client obtained from it.
#[derive(Clone)]
pub struct AccessorInfoHttpSend {
// Source of the HTTP client; `http_client()` is called on it for every request.
info: Arc<AccessorInfo>,
}
impl AccessorInfoHttpSend {
pub fn new(info: Arc<AccessorInfo>) -> Self {
Self { info }
}
}
impl Debug for HttpClient {
    /// Writes only the struct name; the fetcher itself is not printed.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("HttpClient");
        repr.finish()
    }
}
impl Debug for AccessorInfoHttpSend {
    /// Writes only the struct name; the wrapped accessor info is not printed.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("AccessorInfoHttpSend");
        repr.finish()
    }
}
impl Default for HttpClient {
fn default() -> Self {
Self {
fetcher: Arc::new(GLOBAL_REQWEST_CLIENT.clone()),
}
}
}
impl HttpClient {
    /// Creates a client equivalent to [`HttpClient::default`].
    ///
    /// The current implementation always returns `Ok`.
    pub fn new() -> Result<Self> {
        let client = Self::default();
        Ok(client)
    }

    /// Builds a client on top of any [`HttpFetch`] implementation.
    pub fn with(client: impl HttpFetch) -> Self {
        Self {
            fetcher: Arc::new(client),
        }
    }

    /// Consumes the client and hands back the underlying [`HttpFetcher`].
    pub fn into_inner(self) -> HttpFetcher {
        let Self { fetcher } = self;
        fetcher
    }

    /// Sends `req` and reads the whole response body into a [`Buffer`].
    ///
    /// # Errors
    ///
    /// Propagates any error from [`HttpClient::fetch`] or from reading the body.
    pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
        let streamed = self.fetch(req).await?;
        let (head, mut stream) = streamed.into_parts();
        // Drain the body completely before reassembling the response.
        let collected = stream.read_all().await?;
        Ok(Response::from_parts(head, collected))
    }

    /// Sends `req` and returns the response with its body as an [`HttpBody`],
    /// without reading the body here.
    ///
    /// # Errors
    ///
    /// Propagates whatever error the underlying fetcher reports.
    pub async fn fetch(&self, req: Request<Buffer>) -> Result<Response<HttpBody>> {
        let pending = self.fetcher.fetch(req);
        pending.await
    }
}
impl reqsign_core::HttpSend for HttpClient {
    /// Sends a `Bytes`-bodied request through [`HttpClient::send`] and returns the
    /// response with its body converted back to `Bytes`.
    ///
    /// # Errors
    ///
    /// A failed send is reported as a `reqsign_core` "unexpected" error that carries
    /// the original error as its source and is flagged retryable exactly when the
    /// original error reports itself as temporary.
    async fn http_send(&self, req: Request<Bytes>) -> reqsign_core::Result<Response<Bytes>> {
        let outgoing = req.map(Buffer::from);
        let received = self.send(outgoing).await.map_err(|cause| {
            // Read the flag before `cause` is moved into the new error.
            let retryable = cause.is_temporary();
            reqsign_core::Error::unexpected("send request via OpenDAL HttpClient")
                .with_source(cause)
                .set_retryable(retryable)
        })?;
        Ok(received.map(|body| body.to_bytes()))
    }
}
impl reqsign_core::HttpSend for AccessorInfoHttpSend {
    /// Obtains the HTTP client from the wrapped accessor info and forwards `req` to
    /// that client's `HttpSend` implementation.
    async fn http_send(&self, req: Request<Bytes>) -> reqsign_core::Result<Response<Bytes>> {
        let http = self.info.http_client();
        let pending = reqsign_core::HttpSend::http_send(&http, req);
        pending.await
    }
}
/// Backend abstraction for performing one HTTP request.
///
/// Implementors take a request whose body is a [`Buffer`] and asynchronously
/// produce a response whose body is an [`HttpBody`]. Any implementor can be
/// plugged into [`HttpClient::with`].
pub trait HttpFetch: Send + Sync + Unpin + 'static {
/// Performs `req` and resolves to the response, or to an error if it failed.
// NOTE(review): `MaybeSend` is defined elsewhere in the crate — presumably it
// relaxes the `Send` requirement on some targets; confirm.
fn fetch(
&self,
req: Request<Buffer>,
) -> impl Future<Output = Result<Response<HttpBody>>> + MaybeSend;
}
/// Counterpart of [`HttpFetch`] that returns a boxed future, which lets it be used
/// as a trait object (see [`HttpFetcher`]).
///
/// Every [`HttpFetch`] implementor gets this trait through a blanket impl.
pub trait HttpFetchDyn: Send + Sync + Unpin + 'static {
/// Same contract as [`HttpFetch::fetch`], with the future boxed.
fn fetch_dyn(&self, req: Request<Buffer>) -> BoxedFuture<'_, Result<Response<HttpBody>>>;
}
impl<T: HttpFetch + ?Sized> HttpFetchDyn for T {
fn fetch_dyn(&self, req: Request<Buffer>) -> BoxedFuture<'_, Result<Response<HttpBody>>> {
Box::pin(self.fetch(req))
}
}
/// Lets an `Arc` around any [`HttpFetchDyn`] (including `dyn HttpFetchDyn`) act as an
/// [`HttpFetch`].
impl<T: HttpFetchDyn + ?Sized> HttpFetch for Arc<T> {
    /// Forwards the request to the `fetch_dyn` of the value behind the `Arc`.
    async fn fetch(&self, req: Request<Buffer>) -> Result<Response<HttpBody>> {
        // Borrow the `T` behind the `Arc` explicitly and call its `fetch_dyn`.
        let inner: &T = Deref::deref(self);
        inner.fetch_dyn(req).await
    }
}
/// [`HttpFetch`] implementation backed by [`reqwest::Client`].
impl HttpFetch for reqwest::Client {
/// Sends `req` through reqwest and returns the response with its body left as a stream.
///
/// The request URI is stored in the response extensions, and the response headers
/// are moved (not copied) from the reqwest response.
///
/// # Errors
///
/// Returns an `ErrorKind::Unexpected` error when the URI cannot be parsed as a
/// `reqwest::Url` or when sending the request fails. Errors yielded later by the
/// body stream are wrapped the same way. Send and read failures are marked
/// temporary according to `is_temporary_error`. Errors from
/// `parse_content_encoding` and `parse_content_length` are propagated unchanged.
async fn fetch(&self, req: Request<Buffer>) -> Result<Response<HttpBody>> {
// Keep a copy of the URI: it is reused for error context and stored in the
// response extensions after `req` has been taken apart.
let uri = req.uri().clone();
// Remember the method now; `parts.method` is moved into the request builder below.
let is_head = req.method() == http::Method::HEAD;
let (parts, body) = req.into_parts();
// reqwest works with its own `Url` type, so the URI is re-parsed from its string form.
let url = reqwest::Url::from_str(&uri.to_string()).map_err(|err| {
Error::new(ErrorKind::Unexpected, "request url is invalid")
.with_operation("http_util::Client::send::fetch")
.with_context("url", uri.to_string())
.set_source(err)
})?;
let mut req_builder = self.request(parts.method, url).headers(parts.headers);
// The HTTP version is only forwarded on non-wasm targets.
#[cfg(not(target_arch = "wasm32"))]
{
req_builder = req_builder.version(parts.version);
}
// Attach a body only when there is something to send.
if !body.is_empty() {
// Non-wasm: hand the buffer over through the `HttpBufferBody` adapter.
#[cfg(not(target_arch = "wasm32"))]
{
req_builder = req_builder.body(reqwest::Body::wrap(HttpBufferBody(body)))
}
// wasm: convert the buffer into a single `Bytes` value for the body.
#[cfg(target_arch = "wasm32")]
{
req_builder = req_builder.body(reqwest::Body::from(body.to_bytes()))
}
}
let mut resp = req_builder.send().await.map_err(|err| {
Error::new(ErrorKind::Unexpected, "send http request")
.with_operation("http_util::Client::send")
.with_context("url", uri.to_string())
.with_temporary(is_temporary_error(&err))
.set_source(err)
})?;
// No expected length is recorded for HEAD requests or when
// `parse_content_encoding` yields a value; otherwise it comes from the
// response headers. This must run before the headers are swapped out below.
// NOTE(review): presumably the advertised length would not match the bytes
// actually read in those two cases — confirm.
let content_length = if is_head || parse_content_encoding(resp.headers())?.is_some() {
None
} else {
parse_content_length(resp.headers())?
};
let mut hr = Response::builder()
.status(resp.status())
// Stash the request URI in the response extensions.
.extension(uri.clone());
// The response version is only copied on non-wasm targets.
#[cfg(not(target_arch = "wasm32"))]
{
hr = hr.version(resp.version());
}
// Move the header map out of the reqwest response instead of copying it.
// NOTE(review): the `unwrap` relies on the builder holding no error at this
// point (`headers_mut` returns `None` otherwise) — confirm.
mem::swap(hr.headers_mut().unwrap(), resp.headers_mut());
// Wrap the byte stream: skip empty chunks, convert each chunk to `Buffer`, and
// translate stream errors. `uri` is moved into the error-mapping closure.
let bs = HttpBody::new(
resp.bytes_stream()
.try_filter(|v| future::ready(!v.is_empty()))
.map_ok(Buffer::from)
.map_err(move |err| {
Error::new(ErrorKind::Unexpected, "read data from http response")
.with_operation("http_util::Client::send")
.with_context("url", uri.to_string())
.with_temporary(is_temporary_error(&err))
.set_source(err)
}),
content_length,
);
let resp = hr.body(bs).expect("response must build succeed");
Ok(resp)
}
}
/// Reports whether a reqwest error should be treated as temporary.
///
/// An error counts as temporary when reqwest classifies it as a request error,
/// a body error, or a decode error.
#[inline]
fn is_temporary_error(err: &reqwest::Error) -> bool {
    if err.is_request() {
        return true;
    }
    if err.is_body() {
        return true;
    }
    err.is_decode()
}
/// Adapter that exposes a [`Buffer`] as an `http_body::Body`, used to build the
/// reqwest request body on non-wasm targets.
struct HttpBufferBody(Buffer);
impl http_body::Body for HttpBufferBody {
    type Data = Bytes;
    type Error = Infallible;

    /// Yields the next chunk of the wrapped buffer as a data frame, or `None` once
    /// the buffer's iterator is exhausted. Never returns `Poll::Pending`.
    fn poll_frame(
        mut self: Pin<&mut Self>,
        _: &mut Context<'_>,
    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
        Poll::Ready(self.0.next().map(|chunk| Ok(Frame::data(chunk))))
    }

    /// The stream is finished when the wrapped buffer is empty.
    fn is_end_stream(&self) -> bool {
        let Self(pending) = self;
        pending.is_empty()
    }

    /// Reports the wrapped buffer's current length as an exact size.
    fn size_hint(&self) -> SizeHint {
        let remaining = self.0.len() as u64;
        SizeHint::with_exact(remaining)
    }
}