opendal-core 0.56.0

Apache OpenDAL™: One Layer, All Storage.
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use std::convert::Infallible;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::future;
use std::mem;
use std::ops::Deref;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::LazyLock;
use std::task::Context;
use std::task::Poll;

use bytes::Bytes;
use futures::Future;
use futures::TryStreamExt;
use http::Request;
use http::Response;
use http_body::Frame;
use http_body::SizeHint;
use raw::oio::Read;

use super::HttpBody;
use super::parse_content_encoding;
use super::parse_content_length;
use crate::raw::*;
use crate::*;

/// Shared [`reqwest::Client`] used across OpenDAL for loading credentials.
///
/// The client is created lazily with [`reqwest::Client::new`] the first time
/// the static is accessed.
///
/// This is merely a temporary solution because reqsign requires a reqwest
/// client to be passed. It will be removed after the next major version of
/// reqsign, which will enable users to provide their own client.
// NOTE(review): `dead_code` is presumably allowed because some build
// configurations never reference this static — confirm before removing.
#[allow(dead_code)]
pub static GLOBAL_REQWEST_CLIENT: LazyLock<reqwest::Client> = LazyLock::new(reqwest::Client::new);

/// HttpFetcher is a type-erased [`HttpFetch`].
///
/// Concretely it is an [`Arc`] around a [`HttpFetchDyn`] trait object.
pub type HttpFetcher = Arc<dyn HttpFetchDyn>;

/// An HTTP client instance for OpenDAL's services.
///
/// The client only holds an [`HttpFetcher`], so cloning it clones the
/// underlying `Arc` rather than the fetcher itself.
///
/// # Notes
///
/// * An HTTP client must support redirection, i.e. it must follow 3xx responses.
#[derive(Clone)]
pub struct HttpClient {
    /// Type-erased fetcher that requests are forwarded to.
    fetcher: HttpFetcher,
}

/// A reqsign `HttpSend` implementation that always forwards requests to the
/// current http client stored inside [`AccessorInfo`].
///
/// The client is obtained from the accessor info on every request instead of
/// being captured when this value is constructed.
#[derive(Clone)]
pub struct AccessorInfoHttpSend {
    /// Shared accessor info that the http client is read from.
    info: Arc<AccessorInfo>,
}

impl AccessorInfoHttpSend {
    /// Create a new [`AccessorInfoHttpSend`].
    pub fn new(info: Arc<AccessorInfo>) -> Self {
        Self { info }
    }
}

/// Opaque `Debug` output: we don't want users to know details about our clients.
impl Debug for HttpClient {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // Only the type name is printed; the wrapped fetcher stays hidden.
        f.write_str("HttpClient")
    }
}

/// Opaque `Debug` output that does not expose the wrapped accessor info.
impl Debug for AccessorInfoHttpSend {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // Only the type name is printed.
        f.write_str("AccessorInfoHttpSend")
    }
}

impl Default for HttpClient {
    fn default() -> Self {
        Self {
            fetcher: Arc::new(GLOBAL_REQWEST_CLIENT.clone()),
        }
    }
}

impl HttpClient {
    /// Create a new http client in async context.
    pub fn new() -> Result<Self> {
        Ok(Self::default())
    }

    /// Construct `Self` with given [`reqwest::Client`]
    pub fn with(client: impl HttpFetch) -> Self {
        let fetcher = Arc::new(client);
        Self { fetcher }
    }

    /// Get the inner http client.
    pub fn into_inner(self) -> HttpFetcher {
        self.fetcher
    }

    /// Send a request and consume response.
    pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
        let (parts, mut body) = self.fetch(req).await?.into_parts();
        let buffer = body.read_all().await?;
        Ok(Response::from_parts(parts, buffer))
    }

    /// Fetch a request and return a streamable [`HttpBody`].
    ///
    /// Services can use [`HttpBody`] as [`Access::Read`].
    pub async fn fetch(&self, req: Request<Buffer>) -> Result<Response<HttpBody>> {
        self.fetcher.fetch(req).await
    }
}

/// Lets reqsign send its HTTP requests through an OpenDAL [`HttpClient`].
impl reqsign_core::HttpSend for HttpClient {
    /// Send `req` with [`HttpClient::send`] and convert the body back to [`Bytes`].
    ///
    /// A failure is wrapped in an "unexpected" reqsign error that keeps the
    /// original error as its source and is retryable when the original error
    /// reports itself as temporary.
    async fn http_send(&self, req: Request<Bytes>) -> reqsign_core::Result<Response<Bytes>> {
        match self.send(req.map(Buffer::from)).await {
            Ok(resp) => Ok(resp.map(|body| body.to_bytes())),
            Err(err) => {
                // Read the flag before `err` is moved into `with_source`.
                let retryable = err.is_temporary();
                let wrapped =
                    reqsign_core::Error::unexpected("send request via OpenDAL HttpClient")
                        .with_source(err)
                        .set_retryable(retryable);
                Err(wrapped)
            }
        }
    }
}

/// Lets reqsign send its HTTP requests through the client held by [`AccessorInfo`].
impl reqsign_core::HttpSend for AccessorInfoHttpSend {
    /// Obtain the http client from the accessor info and delegate `req` to it.
    async fn http_send(&self, req: Request<Bytes>) -> reqsign_core::Result<Response<Bytes>> {
        // The client is looked up on every call instead of being cached here.
        reqsign_core::HttpSend::http_send(&self.info.http_client(), req).await
    }
}

/// HttpFetch is the trait for fetching a request asynchronously.
///
/// Users should implement this trait to provide their own http client.
/// Implementors must be `Send + Sync + Unpin + 'static`.
pub trait HttpFetch: Send + Sync + Unpin + 'static {
    /// Fetch a request asynchronously.
    ///
    /// Takes the request with a [`Buffer`] body and resolves to a response
    /// whose body is an [`HttpBody`], or to an error.
    // NOTE(review): `MaybeSend` is presumably `Send` on targets where futures
    // can be sent across threads — confirm against its definition.
    fn fetch(
        &self,
        req: Request<Buffer>,
    ) -> impl Future<Output = Result<Response<HttpBody>>> + MaybeSend;
}

/// HttpFetchDyn is the dyn version of [`HttpFetch`],
/// which makes it possible to use it as `Arc<dyn HttpFetchDyn>`.
///
/// Users should never implement this trait directly; implement [`HttpFetch`]
/// instead.
pub trait HttpFetchDyn: Send + Sync + Unpin + 'static {
    /// The dyn version of [`HttpFetch::fetch`].
    ///
    /// This function returns a boxed future to make it object safe.
    fn fetch_dyn(&self, req: Request<Buffer>) -> BoxedFuture<'_, Result<Response<HttpBody>>>;
}

impl<T: HttpFetch + ?Sized> HttpFetchDyn for T {
    fn fetch_dyn(&self, req: Request<Buffer>) -> BoxedFuture<'_, Result<Response<HttpBody>>> {
        Box::pin(self.fetch(req))
    }
}

/// Lets an `Arc` around a (possibly type-erased) fetcher, such as
/// [`HttpFetcher`], be used wherever an [`HttpFetch`] is expected.
impl<T: HttpFetchDyn + ?Sized> HttpFetch for Arc<T> {
    async fn fetch(&self, req: Request<Buffer>) -> Result<Response<HttpBody>> {
        // Dereference explicitly so `fetch_dyn` is dispatched on the inner `T`.
        // `Arc<T>` also gets `HttpFetchDyn` from the blanket impl, so calling it
        // on `self` directly would resolve back to this method.
        let inner: &T = Deref::deref(self);
        inner.fetch_dyn(req).await
    }
}

/// [`HttpFetch`] implementation backed by a [`reqwest::Client`].
impl HttpFetch for reqwest::Client {
    /// Send `req` through reqwest and return the response with a streaming
    /// [`HttpBody`].
    ///
    /// The request uri is stored in the response extensions so that it can be
    /// retrieved later.
    ///
    /// # Errors
    ///
    /// Returns an `ErrorKind::Unexpected` error when the request uri cannot be
    /// parsed as a `reqwest::Url` or when sending the request fails; send
    /// failures are marked temporary according to `is_temporary_error`.
    /// Errors returned by `parse_content_encoding` and `parse_content_length`
    /// are propagated unchanged.
    async fn fetch(&self, req: Request<Buffer>) -> Result<Response<HttpBody>> {
        // Uri stores all string-like data in `Bytes`, which means
        // the clone here is cheap.
        let uri = req.uri().clone();
        // Remember the method now; `req` is consumed by `into_parts` below.
        let is_head = req.method() == http::Method::HEAD;

        let (parts, body) = req.into_parts();

        // Convert the `http` uri into a reqwest url by going through its string form.
        let url = reqwest::Url::from_str(&uri.to_string()).map_err(|err| {
            Error::new(ErrorKind::Unexpected, "request url is invalid")
                .with_operation("http_util::Client::send::fetch")
                .with_context("url", uri.to_string())
                .set_source(err)
        })?;

        let mut req_builder = self.request(parts.method, url).headers(parts.headers);

        // Client under wasm doesn't support setting the version.
        #[cfg(not(target_arch = "wasm32"))]
        {
            req_builder = req_builder.version(parts.version);
        }

        // Don't set body if body is empty.
        if !body.is_empty() {
            // Outside wasm the buffer is wrapped as an `http_body::Body`.
            #[cfg(not(target_arch = "wasm32"))]
            {
                req_builder = req_builder.body(reqwest::Body::wrap(HttpBufferBody(body)))
            }
            // Under wasm the buffer is converted to a single `Bytes` value instead.
            #[cfg(target_arch = "wasm32")]
            {
                req_builder = req_builder.body(reqwest::Body::from(body.to_bytes()))
            }
        }

        let mut resp = req_builder.send().await.map_err(|err| {
            Error::new(ErrorKind::Unexpected, "send http request")
                .with_operation("http_util::Client::send")
                .with_context("url", uri.to_string())
                // Classify the error before it is moved into `set_source`.
                .with_temporary(is_temporary_error(&err))
                .set_source(err)
        })?;

        // Get content length from header so that we can check it.
        //
        // - If the request method is HEAD, we will ignore content length.
        // - If response contains content_encoding, we should omit its content length.
        //
        // The `||` short-circuits, so the content encoding is not parsed for HEAD requests.
        let content_length = if is_head || parse_content_encoding(resp.headers())?.is_some() {
            None
        } else {
            parse_content_length(resp.headers())?
        };

        let mut hr = Response::builder()
            .status(resp.status())
            // Insert uri into response extension so that we can fetch
            // it later.
            .extension(uri.clone());

        // Response builder under wasm doesn't support setting the version.
        #[cfg(not(target_arch = "wasm32"))]
        {
            hr = hr.version(resp.version());
        }

        // Swap headers directly instead of copying the entire map.
        // This must happen before `bytes_stream` consumes `resp` below.
        mem::swap(hr.headers_mut().unwrap(), resp.headers_mut());

        let bs = HttpBody::new(
            resp.bytes_stream()
                // Skip empty chunks.
                .try_filter(|v| future::ready(!v.is_empty()))
                .map_ok(Buffer::from)
                // `uri` is moved into this closure to label stream errors.
                .map_err(move |err| {
                    Error::new(ErrorKind::Unexpected, "read data from http response")
                        .with_operation("http_util::Client::send")
                        .with_context("url", uri.to_string())
                        .with_temporary(is_temporary_error(&err))
                        .set_source(err)
                }),
            content_length,
        );

        let resp = hr.body(bs).expect("response must build succeed");
        Ok(resp)
    }
}

/// Decide whether a [`reqwest::Error`] should be treated as temporary.
///
/// Returns `true` when the error is reported as a request error, a body
/// error, or a decode error, and `false` otherwise.
#[inline]
fn is_temporary_error(err: &reqwest::Error) -> bool {
    // error sending request
    if err.is_request() {
        return true;
    }
    // request or response body error
    if err.is_body() {
        return true;
    }
    // error decoding response body, for example, connection reset.
    err.is_decode()
}

/// Adapter that exposes a [`Buffer`] through the [`http_body::Body`] trait so
/// it can be wrapped as a request body.
struct HttpBufferBody(Buffer);

impl http_body::Body for HttpBufferBody {
    type Data = Bytes;
    type Error = Infallible;

    /// Yield the next chunk of the wrapped buffer as a data frame.
    ///
    /// Always returns `Poll::Ready`: `Some(Ok(frame))` while the buffer still
    /// produces chunks and `None` once it is drained.
    fn poll_frame(
        mut self: Pin<&mut Self>,
        _: &mut Context<'_>,
    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
        Poll::Ready(self.0.next().map(|chunk| Ok(Frame::data(chunk))))
    }

    /// The stream has ended once the wrapped buffer is empty.
    fn is_end_stream(&self) -> bool {
        self.0.is_empty()
    }

    /// Report the current length of the wrapped buffer as an exact size.
    fn size_hint(&self) -> SizeHint {
        let remaining = self.0.len() as u64;
        SizeHint::with_exact(remaining)
    }
}