ratmom 0.1.0

Sensible, async, curl-based HTTP client
//! The HTTP client implementation.

use crate::{
    agent::{self, AgentBuilder},
    body::{AsyncBody, Body},
    config::{
        client::ClientConfig,
        request::{RequestConfig, SetOpt, WithRequestConfig},
        *,
    },
    default_headers::DefaultHeadersInterceptor,
    error::{Error, ErrorKind},
    handler::{RequestHandler, ResponseBodyReader},
    headers::HasHeaders,
    interceptor::{self, Interceptor, InterceptorFuture, InterceptorObj},
    parsing::header_to_curl_string,
    util::future::FutureExt,
};
use futures_io::AsyncRead;
use futures_lite::future::try_zip;
use http::{
    header::{HeaderMap, HeaderName, HeaderValue},
    Request, Response,
};
use once_cell::sync::Lazy;
use ratcurl as curl;
use std::{
    convert::TryFrom,
    fmt,
    future::Future,
    io,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll},
    time::Duration,
};
use tracing_futures::Instrument;

static USER_AGENT: Lazy<String> = Lazy::new(|| {
    format!(
        "curl/{} ratmom/{}",
        curl::Version::get().version(),
        env!("CARGO_PKG_VERSION")
    )
});

/// An HTTP client builder, capable of creating custom [`HttpClient`] instances
/// with customized behavior.
///
/// Any option that can be configured per-request can also be configured on a
/// client builder as a default setting. Request configuration is provided by
/// the [`Configurable`] trait, which is also available in the
/// [`prelude`](crate::prelude) module.
///
/// # Examples
///
/// ```
/// use ratmom::{
///     config::{RedirectPolicy, VersionNegotiation},
///     prelude::*,
///     HttpClient,
/// };
/// use std::time::Duration;
///
/// let client = HttpClient::builder()
///     .timeout(Duration::from_secs(60))
///     .redirect_policy(RedirectPolicy::Limit(10))
///     .version_negotiation(VersionNegotiation::http2())
///     .build()?;
/// # Ok::<(), ratmom::Error>(())
/// ```
#[must_use = "builders have no effect if unused"]
pub struct HttpClientBuilder {
    agent_builder: AgentBuilder,
    client_config: ClientConfig,
    request_config: RequestConfig,
    interceptors: Vec<InterceptorObj>,
    default_headers: HeaderMap<HeaderValue>,
    error: Option<Error>,

    #[cfg(feature = "cookies")]
    cookie_jar: Option<crate::cookies::CookieJar>,
}

impl Default for HttpClientBuilder {
    fn default() -> Self {
        Self::new()
    }
}

impl HttpClientBuilder {
    /// Create a new builder for building a custom client. All configuration
    /// will start out with the default values.
    ///
    /// This is equivalent to the [`Default`] implementation.
    pub fn new() -> Self {
        Self {
            agent_builder: AgentBuilder::default(),
            client_config: ClientConfig::default(),
            request_config: RequestConfig::client_defaults(),
            interceptors: vec![
                // Add redirect support. Note that this is _always_ the first,
                // and thus the outermost, interceptor. Also note that this does
                // not enable redirect following, it just implements support for
                // it, if a request asks for it.
                crate::redirect::RedirectInterceptor.into(),
            ],
            default_headers: HeaderMap::new(),
            error: None,

            #[cfg(feature = "cookies")]
            cookie_jar: None,
        }
    }

    /// Enable persistent cookie handling for all requests using this client
    /// using a shared cookie jar.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use ratmom::{prelude::*, HttpClient};
    ///
    /// // Create a client with a cookie jar.
    /// let client = HttpClient::builder()
    ///     .cookies()
    ///     .build()?;
    ///
    /// // Make a request that sets a cookie.
    /// let uri = "http://httpbin.org/cookies/set?foo=bar".parse()?;
    /// client.get(&uri)?;
    ///
    /// // Get the cookie from the cookie jar.
    /// let cookie = client.cookie_jar()
    ///     .unwrap()
    ///     .get_by_name(&uri, "foo")
    ///     .unwrap();
    /// assert_eq!(cookie, "bar");
    ///
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```
    ///
    /// # Availability
    ///
    /// This method is only available when the [`cookies`](index.html#cookies)
    /// feature is enabled.
    #[cfg(feature = "cookies")]
    pub fn cookies(self) -> Self {
        // Note: this method is now essentially the same as setting a default
        // cookie jar, but remains for backwards compatibility.
        self.cookie_jar(Default::default())
    }

    /// Add a request interceptor to the client.
    ///
    /// # Availability
    ///
    /// This method is only available when the
    /// [`unstable-interceptors`](index.html#unstable-interceptors) feature is
    /// enabled.
    #[cfg(feature = "unstable-interceptors")]
    #[inline]
    pub fn interceptor(self, interceptor: impl Interceptor + 'static) -> Self {
        self.interceptor_impl(interceptor)
    }

    #[allow(unused)]
    pub(crate) fn interceptor_impl(mut self, interceptor: impl Interceptor + 'static) -> Self {
        self.interceptors.push(interceptor.into());
        self
    }

    /// Set a maximum number of simultaneous connections that this client is
    /// allowed to keep open at one time.
    ///
    /// If set to a value greater than zero, no more than `max` connections will
    /// be opened at one time. If executing a new request would require opening
    /// a new connection, then the request will stay in a "pending" state until
    /// an existing connection can be used or an active request completes and
    /// can be closed, making room for a new connection.
    ///
    /// Setting this value to `0` disables the limit entirely.
    ///
    /// This is an effective way of limiting the number of sockets or file
    /// descriptors that this client will open, though note that the client may
    /// use file descriptors for purposes other than just HTTP connections.
    ///
    /// By default this value is `0` and no limit is enforced.
    ///
    /// To apply a limit per-host, see
    /// [`HttpClientBuilder::max_connections_per_host`].
    pub fn max_connections(mut self, max: usize) -> Self {
        self.agent_builder = self.agent_builder.max_connections(max);
        self
    }

    /// Set a maximum number of simultaneous connections that this client is
    /// allowed to keep open to individual hosts at one time.
    ///
    /// If set to a value greater than zero, no more than `max` connections will
    /// be opened to a single host at one time. If executing a new request would
    /// require opening a new connection, then the request will stay in a
    /// "pending" state until an existing connection can be used or an active
    /// request completes and can be closed, making room for a new connection.
    ///
    /// Setting this value to `0` disables the limit entirely. By default this
    /// value is `0` and no limit is enforced.
    ///
    /// To set a global limit across all hosts, see
    /// [`HttpClientBuilder::max_connections`].
    pub fn max_connections_per_host(mut self, max: usize) -> Self {
        self.agent_builder = self.agent_builder.max_connections_per_host(max);
        self
    }

    /// Set the size of the connection cache.
    ///
    /// After requests are completed, if the underlying connection is reusable,
    /// it is added to the connection cache to be reused to reduce latency for
    /// future requests.
    ///
    /// Setting the size to `0` disables connection caching for all requests
    /// using this client.
    ///
    /// By default this value is unspecified. A reasonable default size will be
    /// chosen.
    pub fn connection_cache_size(mut self, size: usize) -> Self {
        self.agent_builder = self.agent_builder.connection_cache_size(size);
        self.client_config.close_connections = size == 0;
        self
    }

    /// Set the maximum time-to-live (TTL) for connections to remain in the
    /// connection cache.
    ///
    /// After requests are completed, if the underlying connection is
    /// reusable, it is added to the connection cache to be reused to reduce
    /// latency for future requests. This option controls how long such
    /// connections should be still considered valid before being discarded.
    ///
    /// Old connections have a high risk of not working any more and thus
    /// attempting to use them wastes time if the server has disconnected.
    ///
    /// The default TTL is 118 seconds.
    pub fn connection_cache_ttl(mut self, ttl: Duration) -> Self {
        self.client_config.connection_cache_ttl = Some(ttl);
        self
    }

    /// Configure DNS caching.
    ///
    /// By default, DNS entries are cached by the client executing the request
    /// and are used until the entry expires. Calling this method allows you to
    /// change the entry timeout duration or disable caching completely.
    ///
    /// Note that DNS entry TTLs are not respected, regardless of this setting.
    ///
    /// By default caching is enabled with a 60 second timeout.
    ///
    /// # Examples
    ///
    /// ```
    /// use ratmom::{config::*, prelude::*, HttpClient};
    /// use std::time::Duration;
    ///
    /// let client = HttpClient::builder()
    ///     // Cache entries for 10 seconds.
    ///     .dns_cache(Duration::from_secs(10))
    ///     // Cache entries forever.
    ///     .dns_cache(DnsCache::Forever)
    ///     // Don't cache anything.
    ///     .dns_cache(DnsCache::Disable)
    ///     .build()?;
    /// # Ok::<(), ratmom::Error>(())
    /// ```
    pub fn dns_cache<C>(mut self, cache: C) -> Self
    where
        C: Into<DnsCache>,
    {
        // This option is technically supported per-request by curl, but we only
        // expose it on the client. Since the DNS cache is shared between all
        // requests, exposing this option per-request would actually cause the
        // timeout to alternate values for every request with a different
        // timeout, resulting in some confusing (but valid) behavior.
        self.client_config.dns_cache = Some(cache.into());
        self
    }

    /// Set a mapping of DNS resolve overrides.
    ///
    /// Entries in the given map will be used first before using the default DNS
    /// resolver for host+port pairs.
    ///
    /// Note that DNS resolving is only performed when establishing a new
    ///
    /// # Examples
    ///
    /// ```
    /// use ratmom::{config::ResolveMap, prelude::*, HttpClient};
    /// use std::net::IpAddr;
    ///
    /// let client = HttpClient::builder()
    ///     .dns_resolve(ResolveMap::new()
    ///         // Send requests for example.org on port 80 to 127.0.0.1.
    ///         .add("example.org", 80, [127, 0, 0, 1]))
    ///     .build()?;
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```
    pub fn dns_resolve(mut self, map: ResolveMap) -> Self {
        // Similar to the dns_cache option, this operation actually affects all
        // requests in a multi handle so we do not expose it per-request to
        // avoid confusing behavior.
        self.client_config.dns_resolve = Some(map);
        self
    }

    /// Add a default header to be passed with every request.
    ///
    /// If a default header value is already defined for the given key, then a
    /// second header value will be appended to the list and multiple header
    /// values will be included in the request.
    ///
    /// If any values are defined for this header key on an outgoing request,
    /// they will override any default header values.
    ///
    /// If the header key or value are malformed, [`HttpClientBuilder::build`]
    /// will return an error.
    ///
    /// # Examples
    ///
    /// ```
    /// use ratmom::{prelude::*, HttpClient};
    ///
    /// let client = HttpClient::builder()
    ///     .default_header("some-header", "some-value")
    ///     .build()?;
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```
    pub fn default_header<K, V>(mut self, key: K, value: V) -> Self
    where
        HeaderName: TryFrom<K>,
        <HeaderName as TryFrom<K>>::Error: Into<http::Error>,
        HeaderValue: TryFrom<V>,
        <HeaderValue as TryFrom<V>>::Error: Into<http::Error>,
    {
        match HeaderName::try_from(key) {
            Ok(key) => match HeaderValue::try_from(value) {
                Ok(value) => {
                    self.default_headers.append(key, value);
                }
                Err(e) => {
                    self.error = Some(Error::new(ErrorKind::ClientInitialization, e.into()));
                }
            },
            Err(e) => {
                self.error = Some(Error::new(ErrorKind::ClientInitialization, e.into()));
            }
        }
        self
    }

    /// Set the default headers to include in every request, replacing any
    /// previously set default headers.
    ///
    /// Headers defined on an individual request always override headers in the
    /// default map.
    ///
    /// If any header keys or values are malformed, [`HttpClientBuilder::build`]
    /// will return an error.
    ///
    /// # Examples
    ///
    /// Set default headers from a slice:
    ///
    /// ```
    /// use ratmom::{prelude::*, HttpClient};
    ///
    /// let mut builder = HttpClient::builder()
    ///     .default_headers(&[
    ///         ("some-header", "value1"),
    ///         ("some-header", "value2"),
    ///         ("some-other-header", "some-other-value"),
    ///     ])
    ///     .build()?;
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```
    ///
    /// Using an existing header map:
    ///
    /// ```
    /// use ratmom::{prelude::*, HttpClient};
    ///
    /// let mut headers = http::HeaderMap::new();
    /// headers.append("some-header".parse::<http::header::HeaderName>()?, "some-value".parse()?);
    ///
    /// let mut builder = HttpClient::builder()
    ///     .default_headers(&headers)
    ///     .build()?;
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```
    ///
    /// Using a hashmap:
    ///
    /// ```
    /// use ratmom::{prelude::*, HttpClient};
    /// use std::collections::HashMap;
    ///
    /// let mut headers = HashMap::new();
    /// headers.insert("some-header", "some-value");
    ///
    /// let mut builder = HttpClient::builder()
    ///     .default_headers(headers)
    ///     .build()?;
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```
    pub fn default_headers<K, V, I, P>(mut self, headers: I) -> Self
    where
        HeaderName: TryFrom<K>,
        <HeaderName as TryFrom<K>>::Error: Into<http::Error>,
        HeaderValue: TryFrom<V>,
        <HeaderValue as TryFrom<V>>::Error: Into<http::Error>,
        I: IntoIterator<Item = P>,
        P: HeaderPair<K, V>,
    {
        self.default_headers.clear();

        for (key, value) in headers.into_iter().map(HeaderPair::pair) {
            self = self.default_header(key, value);
        }

        self
    }

    /// Build an [`HttpClient`] using the configured options.
    ///
    /// If the client fails to initialize, an error will be returned.
    #[allow(unused_mut)]
    pub fn build(mut self) -> Result<HttpClient, Error> {
        if let Some(err) = self.error {
            return Err(err);
        }

        // Add cookie interceptor if enabled.
        #[cfg(feature = "cookies")]
        {
            let jar = self.cookie_jar.clone();
            self = self.interceptor_impl(crate::cookies::interceptor::CookieInterceptor::new(jar));
        }

        // Add default header interceptor if any default headers were specified.
        if !self.default_headers.is_empty() {
            let default_headers = std::mem::take(&mut self.default_headers);
            self = self.interceptor_impl(DefaultHeadersInterceptor::from(default_headers));
        }

        #[cfg(not(feature = "cookies"))]
        let inner = Inner {
            agent: self
                .agent_builder
                .spawn()
                .map_err(|e| Error::new(ErrorKind::ClientInitialization, e))?,
            client_config: self.client_config,
            request_config: self.request_config,
            interceptors: self.interceptors,
        };

        #[cfg(feature = "cookies")]
        let inner = Inner {
            agent: self
                .agent_builder
                .spawn()
                .map_err(|e| Error::new(ErrorKind::ClientInitialization, e))?,
            client_config: self.client_config,
            request_config: self.request_config,
            interceptors: self.interceptors,
            cookie_jar: self.cookie_jar,
        };

        Ok(HttpClient {
            inner: Arc::new(inner),
        })
    }
}

impl Configurable for HttpClientBuilder {
    #[cfg(feature = "cookies")]
    fn cookie_jar(mut self, cookie_jar: crate::cookies::CookieJar) -> Self {
        self.cookie_jar = Some(cookie_jar);
        self
    }
}

impl WithRequestConfig for HttpClientBuilder {
    #[inline]
    fn with_config(mut self, f: impl FnOnce(&mut RequestConfig)) -> Self {
        f(&mut self.request_config);
        self
    }
}

impl fmt::Debug for HttpClientBuilder {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("HttpClientBuilder").finish()
    }
}

/// Helper trait for defining key-value pair types that can be dereferenced into
/// a tuple from a reference.
///
/// This trait is sealed and cannot be implemented for types outside of ratmom.
pub trait HeaderPair<K, V> {
    fn pair(self) -> (K, V);
}

impl<K, V> HeaderPair<K, V> for (K, V) {
    fn pair(self) -> (K, V) {
        self
    }
}

impl<'a, K: Copy, V: Copy> HeaderPair<K, V> for &'a (K, V) {
    fn pair(self) -> (K, V) {
        (self.0, self.1)
    }
}

/// An HTTP client for making requests.
///
/// An [`HttpClient`] instance acts as a session for executing one or more HTTP
/// requests, and also allows you to set common protocol settings that should be
/// applied to all requests made with the client.
///
/// [`HttpClient`] is entirely thread-safe, and implements both [`Send`] and
/// [`Sync`]. You are free to create clients outside the context of the "main"
/// thread, or move them between threads. You can even invoke many requests
/// simultaneously from multiple threads, since doing so doesn't need a mutable
/// reference to the client. This is fairly cheap to do as well, since
/// internally requests use lock-free message passing to get things going.
///
/// The client maintains a connection pool internally and is not cheap to
/// create, so we recommend creating a client once and re-using it throughout
/// your code. Creating a new client for every request would decrease
/// performance significantly, and might cause errors to occur under high
/// workloads, caused by creating too many system resources like sockets or
/// threads.
///
/// It is not universally true that you should use exactly one client instance
/// in an application. All HTTP requests made with the same client will share
/// any session-wide state, like cookies or persistent connections. It may be
/// the case that it is better to create separate clients for separate areas of
/// an application if they have separate concerns or are making calls to
/// different servers. If you are creating an API client library, that might be
/// a good place to maintain your own internal client.
///
/// # Examples
///
/// ```no_run
/// use ratmom::{prelude::*, HttpClient};
///
/// // Create a new client using reasonable defaults.
/// let client = HttpClient::new()?;
///
/// // Make some requests.
/// let mut response = client.get("https://example.org")?;
/// assert!(response.status().is_success());
///
/// println!("Response:\n{}", response.text()?);
/// # Ok::<(), ratmom::Error>(())
/// ```
///
/// Customizing the client configuration:
///
/// ```no_run
/// use ratmom::{
///     config::{RedirectPolicy, VersionNegotiation},
///     prelude::*,
///     HttpClient,
/// };
/// use std::time::Duration;
///
/// let client = HttpClient::builder()
///     .version_negotiation(VersionNegotiation::http11())
///     .redirect_policy(RedirectPolicy::Limit(10))
///     .timeout(Duration::from_secs(20))
///     // May return an error if there's something wrong with our configuration
///     // or if the client failed to start up.
///     .build()?;
///
/// let response = client.get("https://example.org")?;
/// assert!(response.status().is_success());
/// # Ok::<(), ratmom::Error>(())
/// ```
///
/// See the documentation on [`HttpClientBuilder`] for a comprehensive look at
/// what can be configured.
#[derive(Clone)]
pub struct HttpClient {
    inner: Arc<Inner>,
}

struct Inner {
    /// This is how we talk to our background agent thread.
    agent: agent::Handle,

    /// Client-wide request configuration.
    client_config: ClientConfig,

    /// Default request configuration to use if not specified in a request.
    request_config: RequestConfig,

    /// Registered interceptors that requests should pass through.
    interceptors: Vec<InterceptorObj>,

    /// Configured cookie jar, if any.
    #[cfg(feature = "cookies")]
    cookie_jar: Option<crate::cookies::CookieJar>,
}

impl HttpClient {
    /// Create a new HTTP client using the default configuration.
    ///
    /// If the client fails to initialize, an error will be returned.
    pub fn new() -> Result<Self, Error> {
        HttpClientBuilder::default().build()
    }

    /// Get a reference to a global client instance.
    ///
    /// TODO: Stabilize.
    pub(crate) fn shared() -> &'static Self {
        static SHARED: Lazy<HttpClient> =
            Lazy::new(|| HttpClient::new().expect("shared client failed to initialize"));

        &SHARED
    }

    /// Create a new [`HttpClientBuilder`] for building a custom client.
    pub fn builder() -> HttpClientBuilder {
        HttpClientBuilder::default()
    }

    /// Get the configured cookie jar for this HTTP client, if any.
    ///
    /// # Availability
    ///
    /// This method is only available when the [`cookies`](index.html#cookies)
    /// feature is enabled.
    #[cfg(feature = "cookies")]
    pub fn cookie_jar(&self) -> Option<&crate::cookies::CookieJar> {
        self.inner.cookie_jar.as_ref()
    }

    /// Get the configured interceptors for this HTTP client.
    pub(crate) fn interceptors(&self) -> &[InterceptorObj] {
        &self.inner.interceptors
    }

    /// Send a GET request to the given URI.
    ///
    /// To customize the request further, see [`HttpClient::send`]. To execute
    /// the request asynchronously, see [`HttpClient::get_async`].
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use ratmom::{prelude::*, HttpClient};
    ///
    /// let client = HttpClient::new()?;
    /// let mut response = client.get("https://example.org")?;
    /// println!("{}", response.text()?);
    /// # Ok::<(), ratmom::Error>(())
    /// ```
    #[inline]
    pub fn get<U>(&self, uri: U) -> Result<Response<Body>, Error>
    where
        http::Uri: TryFrom<U>,
        <http::Uri as TryFrom<U>>::Error: Into<http::Error>,
    {
        match http::Request::get(uri).body(()) {
            Ok(request) => self.send(request),
            Err(e) => Err(Error::from_any(e)),
        }
    }

    /// Send a GET request to the given URI asynchronously.
    ///
    /// To customize the request further, see [`HttpClient::send_async`]. To
    /// execute the request synchronously, see [`HttpClient::get`].
    pub fn get_async<U>(&self, uri: U) -> ResponseFuture
    where
        http::Uri: TryFrom<U>,
        <http::Uri as TryFrom<U>>::Error: Into<http::Error>,
    {
        match http::Request::get(uri).body(()) {
            Ok(request) => self.send_async(request),
            Err(e) => ResponseFuture::error(Error::from_any(e)),
        }
    }

    /// Send a HEAD request to the given URI.
    ///
    /// To customize the request further, see [`HttpClient::send`]. To execute
    /// the request asynchronously, see [`HttpClient::head_async`].
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use ratmom::{prelude::*, HttpClient};
    ///
    /// let client = HttpClient::new()?;
    /// let response = client.head("https://example.org")?;
    /// println!("Page size: {:?}", response.headers()["content-length"]);
    /// # Ok::<(), ratmom::Error>(())
    /// ```
    #[inline]
    pub fn head<U>(&self, uri: U) -> Result<Response<Body>, Error>
    where
        http::Uri: TryFrom<U>,
        <http::Uri as TryFrom<U>>::Error: Into<http::Error>,
    {
        match http::Request::head(uri).body(()) {
            Ok(request) => self.send(request),
            Err(e) => Err(Error::from_any(e)),
        }
    }

    /// Send a HEAD request to the given URI asynchronously.
    ///
    /// To customize the request further, see [`HttpClient::send_async`]. To
    /// execute the request synchronously, see [`HttpClient::head`].
    pub fn head_async<U>(&self, uri: U) -> ResponseFuture
    where
        http::Uri: TryFrom<U>,
        <http::Uri as TryFrom<U>>::Error: Into<http::Error>,
    {
        match http::Request::head(uri).body(()) {
            Ok(request) => self.send_async(request),
            Err(e) => ResponseFuture::error(Error::from_any(e)),
        }
    }

    /// Send a POST request to the given URI with a given request body.
    ///
    /// To customize the request further, see [`HttpClient::send`]. To execute
    /// the request asynchronously, see [`HttpClient::post_async`].
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use ratmom::{prelude::*, HttpClient};
    ///
    /// let client = HttpClient::new()?;
    ///
    /// let response = client.post("https://httpbin.org/post", r#"{
    ///     "speed": "fast",
    ///     "cool_name": true
    /// }"#)?;
    /// # Ok::<(), ratmom::Error>(())
    #[inline]
    pub fn post<U, B>(&self, uri: U, body: B) -> Result<Response<Body>, Error>
    where
        http::Uri: TryFrom<U>,
        <http::Uri as TryFrom<U>>::Error: Into<http::Error>,
        B: Into<Body>,
    {
        match http::Request::post(uri).body(body) {
            Ok(request) => self.send(request),
            Err(e) => Err(Error::from_any(e)),
        }
    }

    /// Send a POST request to the given URI asynchronously with a given request
    /// body.
    ///
    /// To customize the request further, see [`HttpClient::send_async`]. To
    /// execute the request synchronously, see [`HttpClient::post`].
    pub fn post_async<U, B>(&self, uri: U, body: B) -> ResponseFuture
    where
        http::Uri: TryFrom<U>,
        <http::Uri as TryFrom<U>>::Error: Into<http::Error>,
        B: Into<AsyncBody>,
    {
        match http::Request::post(uri).body(body) {
            Ok(request) => self.send_async(request),
            Err(e) => ResponseFuture::error(Error::from_any(e)),
        }
    }

    /// Send a PUT request to the given URI with a given request body.
    ///
    /// To customize the request further, see [`HttpClient::send`]. To execute
    /// the request asynchronously, see [`HttpClient::put_async`].
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use ratmom::{prelude::*, HttpClient};
    ///
    /// let client = HttpClient::new()?;
    ///
    /// let response = client.put("https://httpbin.org/put", r#"{
    ///     "speed": "fast",
    ///     "cool_name": true
    /// }"#)?;
    /// # Ok::<(), ratmom::Error>(())
    /// ```
    #[inline]
    pub fn put<U, B>(&self, uri: U, body: B) -> Result<Response<Body>, Error>
    where
        http::Uri: TryFrom<U>,
        <http::Uri as TryFrom<U>>::Error: Into<http::Error>,
        B: Into<Body>,
    {
        match http::Request::put(uri).body(body) {
            Ok(request) => self.send(request),
            Err(e) => Err(Error::from_any(e)),
        }
    }

    /// Send a PUT request to the given URI asynchronously with a given request
    /// body.
    ///
    /// To customize the request further, see [`HttpClient::send_async`]. To
    /// execute the request synchronously, see [`HttpClient::put`].
    pub fn put_async<U, B>(&self, uri: U, body: B) -> ResponseFuture
    where
        http::Uri: TryFrom<U>,
        <http::Uri as TryFrom<U>>::Error: Into<http::Error>,
        B: Into<AsyncBody>,
    {
        match http::Request::put(uri).body(body) {
            Ok(request) => self.send_async(request),
            Err(e) => ResponseFuture::error(Error::from_any(e)),
        }
    }

    /// Send a DELETE request to the given URI.
    ///
    /// To customize the request further, see [`HttpClient::send`]. To execute
    /// the request asynchronously, see [`HttpClient::delete_async`].
    #[inline]
    pub fn delete<U>(&self, uri: U) -> Result<Response<Body>, Error>
    where
        http::Uri: TryFrom<U>,
        <http::Uri as TryFrom<U>>::Error: Into<http::Error>,
    {
        match http::Request::delete(uri).body(()) {
            Ok(request) => self.send(request),
            Err(e) => Err(Error::from_any(e)),
        }
    }

    /// Send a DELETE request to the given URI asynchronously.
    ///
    /// To customize the request further, see [`HttpClient::send_async`]. To
    /// execute the request synchronously, see [`HttpClient::delete`].
    pub fn delete_async<U>(&self, uri: U) -> ResponseFuture
    where
        http::Uri: TryFrom<U>,
        <http::Uri as TryFrom<U>>::Error: Into<http::Error>,
    {
        match http::Request::delete(uri).body(()) {
            Ok(request) => self.send_async(request),
            Err(e) => ResponseFuture::error(Error::from_any(e)),
        }
    }

    /// Send an HTTP request and return the HTTP response.
    ///
    /// Upon success, will return a [`Response`] containing the status code,
    /// response headers, and response body from the server. The [`Response`] is
    /// returned as soon as the HTTP response headers are received; the
    /// connection will remain open to stream the response body in real time.
    /// Dropping the response body without fully consuming it will close the
    /// connection early without downloading the rest of the response body.
    ///
    /// The response body is provided as a stream that may only be consumed
    /// once. If you need to inspect the response body more than once, you will
    /// have to either read it into memory or write it to a file.
    ///
    /// The response body is not a direct stream from the server, but uses its
    /// own buffering mechanisms internally for performance. It is therefore
    /// undesirable to wrap the body in additional buffering readers.
    ///
    /// _Note that the actual underlying socket connection isn't necessarily
    /// closed on drop. It may remain open to be reused if pipelining is being
    /// used, the connection is configured as `keep-alive`, and so on._
    ///
    /// This client's configuration can be overridden for this request by
    /// configuring the request using methods provided by the [`Configurable`]
    /// trait.
    ///
    /// To execute a request asynchronously, see [`HttpClient::send_async`].
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use ratmom::{prelude::*, HttpClient, Request};
    ///
    /// let client = HttpClient::new()?;
    ///
    /// let request = Request::post("https://httpbin.org/post")
    ///     .header("Content-Type", "application/json")
    ///     .body(r#"{
    ///         "speed": "fast",
    ///         "cool_name": true
    ///     }"#)?;
    ///
    /// let response = client.send(request)?;
    /// assert!(response.status().is_success());
    /// # Ok::<(), ratmom::Error>(())
    /// ```
    pub fn send<B>(&self, request: Request<B>) -> Result<Response<Body>, Error>
    where
        B: Into<Body>,
    {
        let span = tracing::debug_span!(
            "send",
            method = ?request.method(),
            uri = ?request.uri(),
        );

        let mut writer_maybe = None;

        let request = request.map(|body| {
            let (async_body, writer) = body.into().into_async();
            writer_maybe = writer;
            async_body
        });

        let response = async move {
            // Instead of simply blocking the current thread until the response
            // is received, we can use the current thread to read from the
            // request body synchronously while concurrently waiting for the
            // response.
            if let Some(mut writer) = writer_maybe {
                // Note that the `send_async` future is given first; this
                // ensures that it is polled first and thus the request is
                // initiated before we attempt to write the request body.
                let (response, _) = try_zip(self.send_async_inner(request), async move {
                    writer.write().await.map_err(Error::from)
                })
                .await?;

                Ok(response)
            } else {
                self.send_async_inner(request).await
            }
        }
        .instrument(span)
        .wait()?;

        Ok(response.map(|body| body.into_sync()))
    }

    /// Send an HTTP request and return the HTTP response asynchronously.
    ///
    /// Upon success, will return a [`Response`] containing the status code,
    /// response headers, and response body from the server. The [`Response`] is
    /// returned as soon as the HTTP response headers are received; the
    /// connection will remain open to stream the response body in real time.
    /// Dropping the response body without fully consuming it will close the
    /// connection early without downloading the rest of the response body.
    ///
    /// The response body is provided as a stream that may only be consumed
    /// once. If you need to inspect the response body more than once, you will
    /// have to either read it into memory or write it to a file.
    ///
    /// The response body is not a direct stream from the server, but uses its
    /// own buffering mechanisms internally for performance. It is therefore
    /// undesirable to wrap the body in additional buffering readers.
    ///
    /// _Note that the actual underlying socket connection isn't necessarily
    /// closed on drop. It may remain open to be reused if pipelining is being
    /// used, the connection is configured as `keep-alive`, and so on._
    ///
    /// This client's configuration can be overridden for this request by
    /// configuring the request using methods provided by the [`Configurable`]
    /// trait.
    ///
    /// To execute a request synchronously, see [`HttpClient::send`].
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # async fn run() -> Result<(), ratmom::Error> {
    /// use ratmom::{prelude::*, HttpClient, Request};
    ///
    /// let client = HttpClient::new()?;
    ///
    /// let request = Request::post("https://httpbin.org/post")
    ///     .header("Content-Type", "application/json")
    ///     .body(r#"{
    ///         "speed": "fast",
    ///         "cool_name": true
    ///     }"#)?;
    ///
    /// let response = client.send_async(request).await?;
    /// assert!(response.status().is_success());
    /// # Ok(()) }
    /// ```
    #[inline]
    pub fn send_async<B>(&self, request: Request<B>) -> ResponseFuture
    where
        B: Into<AsyncBody>,
    {
        let span = tracing::debug_span!(
            "send_async",
            method = ?request.method(),
            uri = ?request.uri(),
        );

        ResponseFuture::new(
            self.send_async_inner(request.map(Into::into))
                .instrument(span),
        )
    }

    /// Actually send the request. All the public methods go through here.
    fn send_async_inner(&self, mut request: Request<AsyncBody>) -> InterceptorFuture<Error> {
        // Populate request config, creating if necessary.
        if let Some(config) = request.extensions_mut().get_mut::<RequestConfig>() {
            // Merge request configuration with defaults.
            config.merge(&self.inner.request_config);
        } else {
            request
                .extensions_mut()
                .insert(self.inner.request_config.clone());
        }

        let ctx = interceptor::Context {
            client: self.clone(),
            interceptor_offset: 0,
        };

        ctx.send(request)
    }

    pub(crate) fn invoke(&self, mut request: Request<AsyncBody>) -> InterceptorFuture<Error> {
        let client = self.clone();

        Box::pin(async move {
            let is_head_request = request.method() == http::Method::HEAD;

            // Set default user agent if not specified.
            request
                .headers_mut()
                .entry(http::header::USER_AGENT)
                .or_insert(USER_AGENT.parse().unwrap());

            // Check if automatic decompression is enabled; we'll need to know
            // this later after the response is sent.
            let is_automatic_decompression = request
                .extensions()
                .get::<RequestConfig>()
                .unwrap()
                .automatic_decompression
                .unwrap_or(false);

            // Create and configure a curl easy handle to fulfil the request.
            let (easy, future) = client
                .create_easy_handle(request)
                .map_err(Error::from_any)?;

            // Send the request to the agent to be executed.
            client.inner.agent.submit_request(easy)?;

            // Await for the response headers.
            let response = future.await?;

            // If a Content-Length header is present, include that information in
            // the body as well.
            let body_len = response.content_length().filter(|_| {
                // If automatic decompression is enabled, and will likely be
                // selected, then the value of Content-Length does not indicate
                // the uncompressed body length and merely the compressed data
                // length. If it looks like we are in this scenario then we
                // ignore the Content-Length, since it can only cause confusion
                // when included with the body.
                if is_automatic_decompression {
                    if let Some(value) = response.headers().get(http::header::CONTENT_ENCODING) {
                        if value != "identity" {
                            return false;
                        }
                    }
                }

                true
            });

            // Convert the reader into an opaque Body.
            Ok(response.map(|reader| {
                if is_head_request {
                    AsyncBody::empty()
                } else {
                    let body = ResponseBody {
                        inner: reader,
                        // Extend the lifetime of the agent by including a reference
                        // to its handle in the response body.
                        _client: client,
                    };

                    if let Some(len) = body_len {
                        AsyncBody::from_reader_sized(body, len)
                    } else {
                        AsyncBody::from_reader(body)
                    }
                }
            }))
        })
    }

    fn create_easy_handle(
        &self,
        mut request: Request<AsyncBody>,
    ) -> Result<
        (
            curl::easy::Easy2<RequestHandler>,
            impl Future<Output = Result<Response<ResponseBodyReader>, Error>>,
        ),
        curl::Error,
    > {
        // Prepare the request plumbing.
        let body = std::mem::take(request.body_mut());
        let has_body = !body.is_empty();
        let body_length = body.len();
        let (mut easy, future) = RequestHandler::new(body);

        // Set whether curl should generate verbose debug data for us to log.
        easy.verbose(easy.get_ref().is_debug_enabled())?;

        // Disable connection reuse logs if connection cache is disabled.
        if self.inner.client_config.close_connections {
            easy.get_mut().disable_connection_reuse_log = true;
        }

        easy.signal(false)?;

        let request_config = request.extensions().get::<RequestConfig>().unwrap();

        request_config.set_opt(&mut easy)?;
        self.inner.client_config.set_opt(&mut easy)?;

        // Check if we need to disable the Expect header.
        let disable_expect_header = request_config
            .expect_continue
            .as_ref()
            .map(|x| x.is_disabled())
            .unwrap_or_default();

        // Set the HTTP method to use. Curl ties in behavior with the request
        // method, so we need to configure this carefully.
        #[allow(indirect_structural_match)]
        match (request.method(), has_body) {
            // Normal GET request.
            (&http::Method::GET, false) => {
                easy.get(true)?;
            }
            // Normal HEAD request.
            (&http::Method::HEAD, false) => {
                easy.nobody(true)?;
            }
            // POST requests have special redirect behavior.
            (&http::Method::POST, _) => {
                easy.post(true)?;
            }
            // Normal PUT request.
            (&http::Method::PUT, _) => {
                easy.upload(true)?;
            }
            // Default case is to either treat request like a GET or PUT.
            (method, has_body) => {
                easy.upload(has_body)?;
                easy.custom_request(method.as_str())?;
            }
        }

        easy.url(&uri_to_string(request.uri()))?;

        // If the request has a body, then we either need to tell curl how large
        // the body is if we know it, or tell curl to use chunked encoding. If
        // we do neither, curl will simply not send the body without warning.
        if has_body {
            // Use length given in Content-Length header, or the size defined by
            // the body itself.
            let body_length = request
                .headers()
                .get("Content-Length")
                .and_then(|value| value.to_str().ok())
                .and_then(|value| value.parse().ok())
                .or(body_length);

            if let Some(len) = body_length {
                if request.method() == http::Method::POST {
                    easy.post_field_size(len)?;
                } else {
                    easy.in_filesize(len)?;
                }
            } else {
                // Set the Transfer-Encoding header to instruct curl to use
                // chunked encoding. Replaces any existing values that may be
                // incorrect.
                request.headers_mut().insert(
                    "Transfer-Encoding",
                    http::header::HeaderValue::from_static("chunked"),
                );
            }
        }

        // Generate a header list for curl.
        let mut headers = curl::easy::List::new();

        let title_case = request
            .extensions()
            .get::<RequestConfig>()
            .unwrap()
            .title_case_headers
            .unwrap_or(false);

        for (name, value) in request.headers().iter() {
            headers.append(&header_to_curl_string(name, value, title_case))?;
        }

        if disable_expect_header {
            headers.append("Expect:")?;
        }

        easy.http_headers(headers)?;

        Ok((easy, future))
    }
}

impl fmt::Debug for HttpClient {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("HttpClient").finish()
    }
}

/// A future for a request being executed.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ResponseFuture(Pin<Box<dyn Future<Output = <Self as Future>::Output> + 'static + Send>>);

impl ResponseFuture {
    fn new<F>(future: F) -> Self
    where
        F: Future<Output = <Self as Future>::Output> + Send + 'static,
    {
        ResponseFuture(Box::pin(future))
    }

    fn error(error: Error) -> Self {
        Self::new(async move { Err(error) })
    }
}

impl Future for ResponseFuture {
    type Output = Result<Response<AsyncBody>, Error>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.0.as_mut().poll(cx)
    }
}

impl fmt::Debug for ResponseFuture {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ResponseFuture").finish()
    }
}

/// Response body stream. Holds a reference to the agent to ensure it is kept
/// alive until at least this transfer is complete.
struct ResponseBody {
    inner: ResponseBodyReader,
    _client: HttpClient,
}

impl AsyncRead for ResponseBody {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        let inner = Pin::new(&mut self.inner);
        inner.poll_read(cx, buf)
    }
}

/// Convert a URI to a string. This implementation is a bit faster than the
/// `Display` implementation that avoids the `std::fmt` machinery.
fn uri_to_string(uri: &http::Uri) -> String {
    let mut s = String::new();

    if let Some(scheme) = uri.scheme() {
        s.push_str(scheme.as_str());
        s.push_str("://");
    }

    if let Some(authority) = uri.authority() {
        s.push_str(authority.as_str());
    }

    s.push_str(uri.path());

    if let Some(query) = uri.query() {
        s.push('?');
        s.push_str(query);
    }

    s
}

#[cfg(test)]
mod tests {
    use super::*;

    static_assertions::assert_impl_all!(HttpClient: Send, Sync);
    static_assertions::assert_impl_all!(HttpClientBuilder: Send);

    #[test]
    fn test_default_header() {
        HttpClientBuilder::new()
            .default_header("some-key", "some-value")
            .build()
            .expect("build client succeed");
    }

    #[test]
    fn test_default_headers_mut() {
        let mut builder = HttpClientBuilder::new().default_header("some-key", "some-value");
        let headers_map = &mut builder.default_headers;
        assert!(headers_map.len() == 1);

        let mut builder = HttpClientBuilder::new()
            .default_header("some-key", "some-value1")
            .default_header("some-key", "some-value2");
        let headers_map = &mut builder.default_headers;

        assert!(headers_map.len() == 2);

        let mut builder = HttpClientBuilder::new();
        let header_map = &mut builder.default_headers;
        assert!(header_map.is_empty())
    }
}