use crate::{
agent::{self, AgentBuilder},
body::{AsyncBody, Body},
config::{
client::ClientConfig,
request::{RequestConfig, SetOpt, WithRequestConfig},
*,
},
default_headers::DefaultHeadersInterceptor,
error::{Error, ErrorKind},
handler::{RequestHandler, ResponseBodyReader},
headers::HasHeaders,
interceptor::{self, Interceptor, InterceptorObj},
parsing::header_to_curl_string,
};
use futures_lite::{
future::{block_on, try_zip},
io::AsyncRead,
};
use http::{
Request, Response,
header::{HeaderMap, HeaderName, HeaderValue},
};
use std::{
convert::TryFrom,
fmt,
future::Future,
io,
pin::Pin,
sync::{Arc, LazyLock},
task::{Context, Poll},
time::Duration,
};
use tracing_futures::Instrument;
/// Default `User-Agent` header value, computed lazily on first access.
///
/// Combines the version string reported by `curl::Version::get()` with this
/// crate's own version as recorded by Cargo at compile time.
static USER_AGENT: LazyLock<String> = LazyLock::new(|| {
    let curl_version = curl::Version::get();
    let crate_version = env!("CARGO_PKG_VERSION");

    format!("curl/{} isahc/{}", curl_version.version(), crate_version)
});
/// Builder used to configure and create an [`HttpClient`].
///
/// Options are accumulated through the chained setter methods and only take
/// effect once [`HttpClientBuilder::build`] is called.
#[must_use = "builders have no effect if unused"]
pub struct HttpClientBuilder {
/// Builder for the agent that `build` spawns.
agent_builder: AgentBuilder,
/// Client-wide settings (connection cache and DNS options are written here).
client_config: ClientConfig,
/// Request configuration stored in the client and merged into each request.
request_config: RequestConfig,
/// Interceptors registered so far, in registration order.
interceptors: Vec<InterceptorObj>,
/// Headers that `build` wraps in a default-headers interceptor when non-empty.
default_headers: HeaderMap<HeaderValue>,
/// Error recorded by a setter; `build` returns it instead of a client.
error: Option<Error>,
/// Cookie jar handed to the cookie interceptor and to the built client.
#[cfg(feature = "cookies")]
cookie_jar: Option<crate::cookies::CookieJar>,
}
impl Default for HttpClientBuilder {
fn default() -> Self {
Self::new()
}
}
impl HttpClientBuilder {
/// Creates a builder with default agent, client and request settings.
///
/// The interceptor list starts out holding only the redirect interceptor, so
/// every interceptor registered afterwards comes after it in the list. No
/// default headers, error or cookie jar are set.
pub fn new() -> Self {
    let agent_builder = AgentBuilder::default();
    let client_config = ClientConfig::default();
    let request_config = RequestConfig::client_defaults();
    let interceptors = vec![InterceptorObj::new(crate::redirect::RedirectInterceptor)];

    Self {
        agent_builder,
        client_config,
        request_config,
        interceptors,
        default_headers: HeaderMap::new(),
        error: None,
        #[cfg(feature = "cookies")]
        cookie_jar: None,
    }
}
/// Enables cookie handling backed by a freshly created default cookie jar.
///
/// Shorthand for passing `CookieJar::default()` to `cookie_jar`.
#[cfg(feature = "cookies")]
pub fn cookies(self) -> Self {
    let jar = crate::cookies::CookieJar::default();
    self.cookie_jar(jar)
}
/// Registers a custom interceptor on the client being built.
///
/// Only available with the `unstable-interceptors` feature; the work itself
/// is done by `interceptor_impl`.
#[cfg(feature = "unstable-interceptors")]
#[inline]
pub fn interceptor(self, interceptor: impl Interceptor + 'static) -> Self {
    Self::interceptor_impl(self, interceptor)
}
/// Appends `interceptor` to the end of the interceptor list.
///
/// Crate-internal entry point that is available regardless of the
/// `unstable-interceptors` feature.
#[allow(unused)]
pub(crate) fn interceptor_impl(mut self, interceptor: impl Interceptor + 'static) -> Self {
    let wrapped = InterceptorObj::new(interceptor);
    self.interceptors.push(wrapped);
    self
}
pub fn max_connections(mut self, max: usize) -> Self {
self.agent_builder = self.agent_builder.max_connections(max);
self
}
pub fn max_connections_per_host(mut self, max: usize) -> Self {
self.agent_builder = self.agent_builder.max_connections_per_host(max);
self
}
/// Sets the size of the connection cache.
///
/// The size is forwarded to the agent builder. In addition, the client
/// configuration's `close_connections` flag is set exactly when `size` is
/// zero and cleared otherwise.
pub fn connection_cache_size(mut self, size: usize) -> Self {
    let agent_builder = self.agent_builder.connection_cache_size(size);
    let cache_disabled = size == 0;

    self.agent_builder = agent_builder;
    self.client_config.close_connections = cache_disabled;
    self
}
/// Records `ttl` as the connection cache time-to-live in the client
/// configuration, replacing any previously set value.
pub fn connection_cache_ttl(mut self, ttl: Duration) -> Self {
    let ttl = Some(ttl);
    self.client_config.connection_cache_ttl = ttl;
    self
}
/// Stores the DNS cache setting in the client configuration.
///
/// Accepts any value convertible into a `DnsCache`; a previously stored
/// setting is replaced.
pub fn dns_cache<C>(mut self, cache: C) -> Self
where
    C: Into<DnsCache>,
{
    let cache: DnsCache = cache.into();
    self.client_config.dns_cache = Some(cache);
    self
}
/// Stores `map` as the DNS resolve map in the client configuration,
/// replacing any previously stored map.
pub fn dns_resolve(mut self, map: ResolveMap) -> Self {
    let overrides = Some(map);
    self.client_config.dns_resolve = overrides;
    self
}
/// Adds one default header to the builder.
///
/// The value is appended, so calling this repeatedly with the same name
/// keeps every value. If `key` or `value` cannot be converted into a header
/// name or header value, nothing is added; the conversion failure is stored
/// as a `ClientInitialization` error (overwriting any earlier stored error)
/// and is reported by `build`.
pub fn default_header<K, V>(mut self, key: K, value: V) -> Self
where
    HeaderName: TryFrom<K>,
    <HeaderName as TryFrom<K>>::Error: Into<http::Error>,
    HeaderValue: TryFrom<V>,
    <HeaderValue as TryFrom<V>>::Error: Into<http::Error>,
{
    // The name is validated first; a bad name means the value is never looked at.
    let name = match HeaderName::try_from(key) {
        Ok(name) => name,
        Err(e) => {
            self.error = Some(Error::new(ErrorKind::ClientInitialization, e.into()));
            return self;
        }
    };

    let value = match HeaderValue::try_from(value) {
        Ok(value) => value,
        Err(e) => {
            self.error = Some(Error::new(ErrorKind::ClientInitialization, e.into()));
            return self;
        }
    };

    self.default_headers.append(name, value);
    self
}
/// Replaces all default headers with the given collection.
///
/// Existing default headers are cleared first, then every pair produced by
/// `headers` is added through `default_header`, so invalid names or values
/// are recorded as an error in the same way.
pub fn default_headers<K, V, I, P>(mut self, headers: I) -> Self
where
    HeaderName: TryFrom<K>,
    <HeaderName as TryFrom<K>>::Error: Into<http::Error>,
    HeaderValue: TryFrom<V>,
    <HeaderValue as TryFrom<V>>::Error: Into<http::Error>,
    I: IntoIterator<Item = P>,
    P: HeaderPair<K, V>,
{
    self.default_headers.clear();

    headers
        .into_iter()
        .map(HeaderPair::pair)
        .fold(self, |builder, (key, value)| builder.default_header(key, value))
}
#[allow(unused_mut)]
pub fn build(mut self) -> Result<HttpClient, Error> {
if let Some(err) = self.error {
return Err(err);
}
#[cfg(feature = "cookies")]
{
let jar = self.cookie_jar.clone();
self = self.interceptor_impl(crate::cookies::interceptor::CookieInterceptor::new(jar));
}
if !self.default_headers.is_empty() {
let default_headers = std::mem::take(&mut self.default_headers);
self = self.interceptor_impl(DefaultHeadersInterceptor::from(default_headers));
}
#[cfg(not(feature = "cookies"))]
let inner = Inner {
agent: self
.agent_builder
.spawn()
.map_err(|e| Error::new(ErrorKind::ClientInitialization, e))?,
client_config: self.client_config,
request_config: self.request_config,
interceptors: self.interceptors,
};
#[cfg(feature = "cookies")]
let inner = Inner {
agent: self
.agent_builder
.spawn()
.map_err(|e| Error::new(ErrorKind::ClientInitialization, e))?,
client_config: self.client_config,
request_config: self.request_config,
interceptors: self.interceptors,
cookie_jar: self.cookie_jar,
};
Ok(HttpClient {
inner: Arc::new(inner),
})
}
}
/// Client-level configuration hooks for the builder.
impl Configurable for HttpClientBuilder {
    /// Stores `cookie_jar` as the jar used by the client being built,
    /// replacing any jar set earlier.
    #[cfg(feature = "cookies")]
    fn cookie_jar(self, cookie_jar: crate::cookies::CookieJar) -> Self {
        Self {
            cookie_jar: Some(cookie_jar),
            ..self
        }
    }
}
/// Gives request-configuration setters access to the builder's stored
/// `RequestConfig`.
impl WithRequestConfig for HttpClientBuilder {
    /// Runs `f` against the builder's request configuration and returns the
    /// builder.
    #[inline]
    fn with_config(mut self, f: impl FnOnce(&mut RequestConfig)) -> Self {
        let config = &mut self.request_config;
        f(config);
        self
    }
}
/// Opaque debug representation: only the type name is printed, no fields.
impl fmt::Debug for HttpClientBuilder {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut repr = f.debug_struct("HttpClientBuilder");
        repr.finish()
    }
}
/// Conversion of an item into a `(key, value)` header pair.
///
/// Lets header collections be supplied either as owned tuples or as
/// references to tuples of `Copy` types.
pub trait HeaderPair<K, V> {
    /// Consumes `self` and returns the key and value as a tuple.
    fn pair(self) -> (K, V);
}

/// An owned tuple already is the pair.
impl<K, V> HeaderPair<K, V> for (K, V) {
    fn pair(self) -> (K, V) {
        let (key, value) = self;
        (key, value)
    }
}

/// A borrowed tuple yields copies of both elements.
impl<K, V> HeaderPair<K, V> for &(K, V)
where
    K: Copy,
    V: Copy,
{
    fn pair(self) -> (K, V) {
        *self
    }
}
/// Handle to an HTTP client.
///
/// All client state lives in a reference-counted `Inner`, so the derived
/// `Clone` yields another handle to the same state rather than a new client.
#[derive(Clone)]
pub struct HttpClient {
/// Shared client state.
inner: Arc<Inner>,
}
/// State shared by every clone of an `HttpClient`.
struct Inner {
/// Handle to the agent that requests are submitted to.
agent: agent::Handle,
/// Client-wide options applied to every curl handle that is created.
client_config: ClientConfig,
/// Request configuration merged into (or inserted for) each outgoing request.
request_config: RequestConfig,
/// Interceptors handed to the interceptor context for each request.
interceptors: Vec<InterceptorObj>,
/// Cookie jar configured on the builder, if any.
#[cfg(feature = "cookies")]
cookie_jar: Option<crate::cookies::CookieJar>,
}
impl HttpClient {
pub fn new() -> Result<Self, Error> {
HttpClientBuilder::default().build()
}
/// Returns the process-wide shared client, creating it on first use.
///
/// # Panics
///
/// Panics with "shared client failed to initialize" if the client cannot be
/// created when it is first needed.
pub(crate) fn shared() -> &'static Self {
    fn create() -> HttpClient {
        HttpClient::new().expect("shared client failed to initialize")
    }

    static SHARED: LazyLock<HttpClient> = LazyLock::new(create);

    LazyLock::force(&SHARED)
}
/// Returns a fresh builder with default settings for configuring a client.
pub fn builder() -> HttpClientBuilder {
    HttpClientBuilder::new()
}
/// Returns the cookie jar this client was built with, or `None` if no jar
/// was configured.
#[cfg(feature = "cookies")]
pub fn cookie_jar(&self) -> Option<&crate::cookies::CookieJar> {
    let state: &Inner = &self.inner;
    state.cookie_jar.as_ref()
}
#[inline]
pub fn get<U>(&self, uri: U) -> Result<Response<Body>, Error>
where
http::Uri: TryFrom<U>,
<http::Uri as TryFrom<U>>::Error: Into<http::Error>,
{
match http::Request::get(uri).body(()) {
Ok(request) => self.send(request),
Err(e) => Err(Error::from_any(e)),
}
}
pub fn get_async<U>(&self, uri: U) -> ResponseFuture<'_>
where
http::Uri: TryFrom<U>,
<http::Uri as TryFrom<U>>::Error: Into<http::Error>,
{
match http::Request::get(uri).body(()) {
Ok(request) => self.send_async(request),
Err(e) => ResponseFuture::error(Error::from_any(e)),
}
}
#[inline]
pub fn head<U>(&self, uri: U) -> Result<Response<Body>, Error>
where
http::Uri: TryFrom<U>,
<http::Uri as TryFrom<U>>::Error: Into<http::Error>,
{
match http::Request::head(uri).body(()) {
Ok(request) => self.send(request),
Err(e) => Err(Error::from_any(e)),
}
}
pub fn head_async<U>(&self, uri: U) -> ResponseFuture<'_>
where
http::Uri: TryFrom<U>,
<http::Uri as TryFrom<U>>::Error: Into<http::Error>,
{
match http::Request::head(uri).body(()) {
Ok(request) => self.send_async(request),
Err(e) => ResponseFuture::error(Error::from_any(e)),
}
}
#[inline]
pub fn post<U, B>(&self, uri: U, body: B) -> Result<Response<Body>, Error>
where
http::Uri: TryFrom<U>,
<http::Uri as TryFrom<U>>::Error: Into<http::Error>,
B: Into<Body>,
{
match http::Request::post(uri).body(body) {
Ok(request) => self.send(request),
Err(e) => Err(Error::from_any(e)),
}
}
pub fn post_async<U, B>(&self, uri: U, body: B) -> ResponseFuture<'_>
where
http::Uri: TryFrom<U>,
<http::Uri as TryFrom<U>>::Error: Into<http::Error>,
B: Into<AsyncBody>,
{
match http::Request::post(uri).body(body) {
Ok(request) => self.send_async(request),
Err(e) => ResponseFuture::error(Error::from_any(e)),
}
}
#[inline]
pub fn put<U, B>(&self, uri: U, body: B) -> Result<Response<Body>, Error>
where
http::Uri: TryFrom<U>,
<http::Uri as TryFrom<U>>::Error: Into<http::Error>,
B: Into<Body>,
{
match http::Request::put(uri).body(body) {
Ok(request) => self.send(request),
Err(e) => Err(Error::from_any(e)),
}
}
pub fn put_async<U, B>(&self, uri: U, body: B) -> ResponseFuture<'_>
where
http::Uri: TryFrom<U>,
<http::Uri as TryFrom<U>>::Error: Into<http::Error>,
B: Into<AsyncBody>,
{
match http::Request::put(uri).body(body) {
Ok(request) => self.send_async(request),
Err(e) => ResponseFuture::error(Error::from_any(e)),
}
}
#[inline]
pub fn delete<U>(&self, uri: U) -> Result<Response<Body>, Error>
where
http::Uri: TryFrom<U>,
<http::Uri as TryFrom<U>>::Error: Into<http::Error>,
{
match http::Request::delete(uri).body(()) {
Ok(request) => self.send(request),
Err(e) => Err(Error::from_any(e)),
}
}
pub fn delete_async<U>(&self, uri: U) -> ResponseFuture<'_>
where
http::Uri: TryFrom<U>,
<http::Uri as TryFrom<U>>::Error: Into<http::Error>,
{
match http::Request::delete(uri).body(()) {
Ok(request) => self.send_async(request),
Err(e) => ResponseFuture::error(Error::from_any(e)),
}
}
/// Sends a request synchronously and returns the response with a blocking body.
///
/// The request body is converted with `into_async`. When that conversion
/// yields a writer, the writer is driven on the calling thread concurrently
/// with the request itself. The whole exchange runs inside `block_on`, under
/// a `send` tracing span that records the request method and URI.
///
/// # Errors
///
/// Returns an error from the request pipeline, or an error from writing the
/// request body (converted with `Error::from`).
pub fn send<B>(&self, request: Request<B>) -> Result<Response<Body>, Error>
where
B: Into<Body>,
{
let span = tracing::debug_span!(
"send",
method = ?request.method(),
uri = ?request.uri(),
);
// Filled in by the closure below when the body conversion produces a writer.
let mut writer_maybe = None;
let request = request.map(|body| {
let (async_body, writer) = body.into().into_async();
writer_maybe = writer;
async_body
});
let response = block_on(
async move {
// With a writer, run the request and the body writer together on this thread.
// NOTE(review): the request future is passed to `try_zip` first, presumably so
// it is polled before the writer starts — confirm against `try_zip`'s polling order.
if let Some(mut writer) = writer_maybe {
let (response, _) = try_zip(self.send_async_inner(request), async move {
writer.write().await.map_err(Error::from)
})
.await?;
Ok(response)
} else {
// No writer: only the request itself needs to be awaited.
self.send_async_inner(request).await
}
}
.instrument(span),
)?;
// Convert the asynchronous response body into its synchronous form.
Ok(response.map(|body| body.into_sync()))
}
/// Sends a request asynchronously, returning a future that resolves to the
/// response.
///
/// The body is converted into an `AsyncBody`, and the returned future is
/// instrumented with a `send_async` tracing span that records the request
/// method and URI.
#[inline]
pub fn send_async<B>(&self, request: Request<B>) -> ResponseFuture<'_>
where
    B: Into<AsyncBody>,
{
    let span = tracing::debug_span!(
        "send_async",
        method = ?request.method(),
        uri = ?request.uri(),
    );

    let request: Request<AsyncBody> = request.map(Into::into);
    let pending = self.send_async_inner(request).instrument(span);

    ResponseFuture::new(pending)
}
/// Shared implementation behind `send` and `send_async`.
///
/// Makes sure the request carries a `RequestConfig` extension — merging the
/// client's configuration into an existing one, or inserting a clone of the
/// client's configuration when there is none — and then runs the request
/// through the interceptor list with this client as the invoker.
async fn send_async_inner(
    &self,
    mut request: Request<AsyncBody>,
) -> Result<Response<AsyncBody>, Error> {
    let client_defaults = &self.inner.request_config;

    match request.extensions_mut().get_mut::<RequestConfig>() {
        Some(config) => {
            config.merge(client_defaults);
        }
        None => {
            request.extensions_mut().insert(client_defaults.clone());
        }
    }

    let ctx = interceptor::Context {
        invoker: Arc::new(self),
        interceptors: &self.inner.interceptors,
    };

    ctx.send(request).await
}
/// Builds a curl easy handle for `request`.
///
/// Returns the configured handle together with the future produced by
/// `RequestHandler::new`, which resolves to the response.
///
/// # Errors
///
/// Returns the `curl::Error` of the first curl option that fails to apply.
///
/// # Panics
///
/// Panics if the request carries no `RequestConfig` extension;
/// `send_async_inner` inserts one before requests reach this point.
fn create_easy_handle(
&self,
mut request: Request<AsyncBody>,
) -> Result<
(
curl::easy::Easy2<RequestHandler>,
impl Future<Output = Result<Response<ResponseBodyReader>, Error>>,
),
curl::Error,
> {
// Move the body out of the request (leaving a default body behind) and
// capture its emptiness and length before the handler takes ownership.
let body = std::mem::take(request.body_mut());
let has_body = !body.is_empty();
let body_length = body.len();
let (handler, future) = RequestHandler::new(body);
let mut easy = curl::easy::Easy2::new(handler);
// Enable curl's verbose output only when the handler reports debug logging is on.
easy.verbose(easy.get_ref().is_debug_enabled())?;
// When the client closes connections, flag the handler's
// `disable_connection_reuse_log` setting.
if self.inner.client_config.close_connections {
easy.get_mut().disable_connection_reuse_log = true;
}
easy.signal(false)?;
// Apply the per-request options first, then the client-wide ones.
let request_config = request.extensions().get::<RequestConfig>().unwrap();
request_config.set_opt(&mut easy)?;
self.inner.client_config.set_opt(&mut easy)?;
// True only when an `expect_continue` setting exists and reports itself disabled.
let disable_expect_header = request_config
.expect_continue
.as_ref()
.map(|x| x.is_disabled())
.unwrap_or_default();
// Select the curl request mode from the method and whether a body is present.
// NOTE(review): POST and PUT without a body fall through to the generic arm
// (`upload(false)` plus a custom request method) — confirm this is intended.
match (request.method(), has_body) {
(&http::Method::GET, false) => {
easy.get(true)?;
}
(&http::Method::HEAD, false) => {
easy.nobody(true)?;
}
(&http::Method::POST, true) => {
easy.post(true)?;
}
(&http::Method::PUT, true) => {
easy.upload(true)?;
}
// Everything else: upload mode follows `has_body`, with the method name passed verbatim.
(method, has_body) => {
easy.upload(has_body)?;
easy.custom_request(method.as_str())?;
}
}
easy.url(&uri_to_string(request.uri()))?;
if has_body {
// Prefer a parseable `Content-Length` header; otherwise use the length
// reported by the body itself (which may be unknown).
let body_length = request
.headers()
.get("Content-Length")
.and_then(|value| value.to_str().ok())
.and_then(|value| value.parse().ok())
.or(body_length);
if let Some(len) = body_length {
// Known length: POST uses the post-field size, every other method the input file size.
if request.method() == http::Method::POST {
easy.post_field_size(len)?;
} else {
easy.in_filesize(len)?;
}
} else {
// Unknown length: set `Transfer-Encoding: chunked` on the request instead.
request.headers_mut().insert(
"Transfer-Encoding",
http::header::HeaderValue::from_static("chunked"),
);
}
}
// Convert the request headers into curl's list form, honouring the
// `title_case_headers` setting (treated as off when unset).
let mut headers = curl::easy::List::new();
let title_case = request
.extensions()
.get::<RequestConfig>()
.unwrap()
.title_case_headers
.unwrap_or(false);
for (name, value) in request.headers().iter() {
headers.append(&header_to_curl_string(name, value, title_case))?;
}
// An `Expect:` entry with no value is appended when expect-continue is disabled;
// presumably this stops curl from adding its own `Expect` header — confirm.
if disable_expect_header {
headers.append("Expect:")?;
}
easy.http_headers(headers)?;
Ok((easy, future))
}
}
/// Lets a client reference act as the invoker of an interceptor context: it
/// turns the request into a curl easy handle, submits it to the agent and
/// wraps the resulting response.
impl crate::interceptor::Invoke for &HttpClient {
    /// Performs the request and resolves to the response.
    ///
    /// A default `User-Agent` header is added when the request has none. The
    /// response body is empty for `HEAD` requests; otherwise it is a reader
    /// that holds a clone of the client, sized when a usable
    /// `Content-Length` is available.
    ///
    /// # Errors
    ///
    /// Resolves to an error if the easy handle cannot be configured, the
    /// request cannot be submitted to the agent, or the transfer fails.
    ///
    /// # Panics
    ///
    /// Panics if the request carries no `RequestConfig` extension.
    fn invoke(
        &self,
        mut request: Request<AsyncBody>,
    ) -> crate::interceptor::InterceptorFuture<'_, Error> {
        Box::pin(async move {
            let is_head_request = request.method() == http::Method::HEAD;

            // Build the default header lazily so that requests which already
            // carry a `User-Agent` do not pay for parsing and allocating it.
            request
                .headers_mut()
                .entry(http::header::USER_AGENT)
                .or_insert_with(|| {
                    USER_AGENT
                        .parse()
                        .expect("default user agent must be a valid header value")
                });

            // Read before the request is consumed by `create_easy_handle`.
            let is_automatic_decompression = request
                .extensions()
                .get::<RequestConfig>()
                .unwrap()
                .automatic_decompression
                .unwrap_or(false);

            let (easy, future) = self.create_easy_handle(request).map_err(Error::from_any)?;
            self.inner.agent.submit_request(easy)?;
            let response = future.await?;

            // With automatic decompression enabled and a `Content-Encoding`
            // other than `identity`, the reported content length is
            // discarded and the body is treated as unsized.
            let body_len = response.content_length().filter(|_| {
                if is_automatic_decompression {
                    if let Some(value) = response.headers().get(http::header::CONTENT_ENCODING) {
                        if value != "identity" {
                            return false;
                        }
                    }
                }
                true
            });

            Ok(response.map(|reader| {
                if is_head_request {
                    AsyncBody::empty()
                } else {
                    let body = ResponseBody {
                        inner: reader,
                        _client: (*self).clone(),
                    };
                    if let Some(len) = body_len {
                        AsyncBody::from_reader_sized(body, len)
                    } else {
                        AsyncBody::from_reader(body)
                    }
                }
            }))
        })
    }
}
/// Opaque debug representation: only the type name is printed, no fields.
impl fmt::Debug for HttpClient {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut repr = f.debug_struct("HttpClient");
        repr.finish()
    }
}
/// Future returned by the asynchronous request methods; resolves to the
/// response or an error.
///
/// Wraps a boxed, pinned, `Send` future that may borrow for the lifetime `'c`.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ResponseFuture<'c>(Pin<Box<dyn Future<Output = <Self as Future>::Output> + 'c + Send>>);
impl<'c> ResponseFuture<'c> {
fn new<F>(future: F) -> Self
where
F: Future<Output = <Self as Future>::Output> + Send + 'c,
{
ResponseFuture(Box::pin(future))
}
fn error(error: Error) -> Self {
Self::new(async move { Err(error) })
}
}
/// Polling is delegated to the boxed inner future.
impl Future for ResponseFuture<'_> {
    type Output = Result<Response<AsyncBody>, Error>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // The wrapper only holds a `Pin<Box<_>>`, so it is `Unpin` and can be
        // borrowed mutably out of its own pin.
        let this = self.get_mut();
        this.0.as_mut().poll(cx)
    }
}
/// Opaque debug representation: only the type name is printed.
impl fmt::Debug for ResponseFuture<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut repr = f.debug_struct("ResponseFuture");
        repr.finish()
    }
}
/// Response body reader that carries a clone of the client alongside the
/// underlying reader.
struct ResponseBody {
/// Reader that all reads are forwarded to.
inner: ResponseBodyReader,
/// Never read in this file; presumably held so the client stays alive for as
/// long as the body exists — confirm.
_client: HttpClient,
}
/// Reads are forwarded unchanged to the wrapped `ResponseBodyReader`.
impl AsyncRead for ResponseBody {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        let this = self.get_mut();
        Pin::new(&mut this.inner).poll_read(cx, buf)
    }
}
/// Renders `uri` as a string made of its scheme (followed by `://`),
/// authority, path, and query (preceded by `?`).
///
/// Scheme, authority and query are each skipped when absent; the path is
/// always written.
fn uri_to_string(uri: &http::Uri) -> String {
    let mut rendered = String::new();

    if let Some(scheme) = uri.scheme() {
        rendered += scheme.as_str();
        rendered += "://";
    }

    rendered += uri.authority().map_or("", |authority| authority.as_str());
    rendered += uri.path();

    if let Some(query) = uri.query() {
        rendered.push('?');
        rendered += query;
    }

    rendered
}
#[cfg(test)]
mod tests {
    use super::*;

    // Compile-time checks that the client can be shared across threads and
    // that the builder can be moved to another thread.
    static_assertions::assert_impl_all!(HttpClient: Send, Sync);
    static_assertions::assert_impl_all!(HttpClientBuilder: Send);

    /// A builder holding one valid default header must build successfully.
    #[test]
    fn test_default_header() {
        let client = HttpClientBuilder::new()
            .default_header("some-key", "some-value")
            .build();

        // Assert on the result itself so a failure reports the build error,
        // rather than asserting on a constant.
        assert!(
            client.is_ok(),
            "client with a valid default header failed to build: {:?}",
            client.err()
        );
    }

    /// `default_header` appends: repeated names keep every value.
    #[test]
    fn test_default_headers_mut() {
        let builder = HttpClientBuilder::new().default_header("some-key", "some-value");
        assert_eq!(builder.default_headers.len(), 1);

        let builder = HttpClientBuilder::new()
            .default_header("some-key", "some-value1")
            .default_header("some-key", "some-value2");
        assert_eq!(builder.default_headers.len(), 2);

        let builder = HttpClientBuilder::new();
        assert!(builder.default_headers.is_empty());
    }
}