#[cfg(any(
feature = "native-tls",
feature = "__rustls",
))]
use std::any::Any;
use std::convert::TryInto;
use std::fmt;
use std::future::Future;
use std::net::IpAddr;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use http::header::HeaderValue;
use log::{error, trace};
use tokio::sync::{mpsc, oneshot};
use super::request::{Request, RequestBuilder};
use super::response::Response;
use super::wait;
use crate::{async_impl, header, IntoUrl, Method, Proxy, redirect};
#[cfg(feature = "__tls")]
use crate::{Certificate, Identity};
/// A client for making HTTP requests through a synchronous API.
///
/// `Clone` is derived, so every clone carries a clone of the same
/// `ClientHandle` rather than building a new one.
#[derive(Clone)]
pub struct Client {
/// Handle that `execute` forwards requests to.
inner: ClientHandle,
}
/// Builder used to configure and construct a [`Client`].
///
/// Wraps an `async_impl::ClientBuilder` and additionally records the timeout
/// that the built client stores on its handle.
#[must_use]
pub struct ClientBuilder {
/// Underlying async builder; most setters delegate to it via `with_inner`.
inner: async_impl::ClientBuilder,
/// Timeout set through `timeout`; starts out as `Timeout::default()`.
timeout: Timeout,
}
impl Default for ClientBuilder {
fn default() -> Self {
Self::new()
}
}
impl ClientBuilder {
    /// Creates a builder wrapping a fresh `async_impl::ClientBuilder` together
    /// with the default [`Timeout`].
    pub fn new() -> ClientBuilder {
        let inner = async_impl::ClientBuilder::new();
        let timeout = Timeout::default();
        ClientBuilder { inner, timeout }
    }

    /// Consumes the builder and returns a [`Client`].
    ///
    /// # Errors
    ///
    /// Returns whatever error `ClientHandle::new` reports.
    pub fn build(self) -> crate::Result<Client> {
        let handle = ClientHandle::new(self)?;
        Ok(Client { inner: handle })
    }

    /// Forwards `value` to the async builder's `user_agent` setter.
    pub fn user_agent<V>(self, value: V) -> ClientBuilder
    where
        V: TryInto<HeaderValue>,
        V::Error: Into<http::Error>,
    {
        self.with_inner(move |b| b.user_agent(value))
    }

    /// Forwards `headers` to the async builder's `default_headers` setter.
    pub fn default_headers(self, headers: header::HeaderMap) -> ClientBuilder {
        self.with_inner(move |b| b.default_headers(headers))
    }

    /// Forwards `enable` to the async builder's `cookie_store` setter.
    ///
    /// Only compiled with the `cookies` feature.
    #[cfg(feature = "cookies")]
    pub fn cookie_store(self, enable: bool) -> ClientBuilder {
        self.with_inner(move |b| b.cookie_store(enable))
    }

    /// Forwards `enable` to the async builder's `gzip` setter.
    ///
    /// Only compiled with the `gzip` feature.
    #[cfg(feature = "gzip")]
    pub fn gzip(self, enable: bool) -> ClientBuilder {
        self.with_inner(move |b| b.gzip(enable))
    }

    /// Forwards `enable` to the async builder's `brotli` setter.
    ///
    /// Only compiled with the `brotli` feature.
    #[cfg(feature = "brotli")]
    pub fn brotli(self, enable: bool) -> ClientBuilder {
        self.with_inner(move |b| b.brotli(enable))
    }

    /// Calls the async builder's `no_gzip`; available regardless of features.
    pub fn no_gzip(self) -> ClientBuilder {
        self.with_inner(|b| b.no_gzip())
    }

    /// Calls the async builder's `no_brotli`; available regardless of features.
    pub fn no_brotli(self) -> ClientBuilder {
        self.with_inner(|b| b.no_brotli())
    }

    /// Forwards `policy` to the async builder's `redirect` setter.
    pub fn redirect(self, policy: redirect::Policy) -> ClientBuilder {
        self.with_inner(move |b| b.redirect(policy))
    }

    /// Forwards `enable` to the async builder's `referer` setter.
    pub fn referer(self, enable: bool) -> ClientBuilder {
        self.with_inner(move |b| b.referer(enable))
    }

    /// Forwards `proxy` to the async builder's `proxy` setter.
    pub fn proxy(self, proxy: Proxy) -> ClientBuilder {
        self.with_inner(move |b| b.proxy(proxy))
    }

    /// Calls the async builder's `no_proxy`.
    pub fn no_proxy(self) -> ClientBuilder {
        self.with_inner(|b| b.no_proxy())
    }

    /// Stores the timeout on this builder (it is not forwarded to the async
    /// builder); the built client uses it as its default when a request does
    /// not carry its own timeout.
    ///
    /// Passing `None` stores "no timeout".
    pub fn timeout<T>(mut self, timeout: T) -> ClientBuilder
    where
        T: Into<Option<Duration>>,
    {
        let limit: Option<Duration> = timeout.into();
        self.timeout = Timeout(limit);
        self
    }

    /// Forwards a connect timeout to the async builder.
    ///
    /// Passing `None` leaves the builder untouched; it does not clear a value
    /// that was set earlier.
    pub fn connect_timeout<T>(self, timeout: T) -> ClientBuilder
    where
        T: Into<Option<Duration>>,
    {
        match timeout.into() {
            Some(dur) => self.with_inner(move |b| b.connect_timeout(dur)),
            None => self,
        }
    }

    /// Forwards `verbose` to the async builder's `connection_verbose` setter.
    pub fn connection_verbose(self, verbose: bool) -> ClientBuilder {
        self.with_inner(move |b| b.connection_verbose(verbose))
    }

    /// Forwards `val` to the async builder's `pool_idle_timeout` setter.
    pub fn pool_idle_timeout<D>(self, val: D) -> ClientBuilder
    where
        D: Into<Option<Duration>>,
    {
        self.with_inner(move |b| b.pool_idle_timeout(val))
    }

    /// Forwards `max` to the async builder's `pool_max_idle_per_host` setter.
    pub fn pool_max_idle_per_host(self, max: usize) -> ClientBuilder {
        self.with_inner(move |b| b.pool_max_idle_per_host(max))
    }

    /// Calls the async builder's `http1_title_case_headers`.
    pub fn http1_title_case_headers(self) -> ClientBuilder {
        self.with_inner(|b| b.http1_title_case_headers())
    }

    /// Calls the async builder's `http2_prior_knowledge`.
    pub fn http2_prior_knowledge(self) -> ClientBuilder {
        self.with_inner(|b| b.http2_prior_knowledge())
    }

    /// Forwards `sz` to the async builder's `http2_initial_stream_window_size`
    /// setter.
    pub fn http2_initial_stream_window_size(self, sz: impl Into<Option<u32>>) -> ClientBuilder {
        self.with_inner(move |b| b.http2_initial_stream_window_size(sz))
    }

    /// Forwards `sz` to the async builder's
    /// `http2_initial_connection_window_size` setter.
    pub fn http2_initial_connection_window_size(self, sz: impl Into<Option<u32>>) -> ClientBuilder {
        self.with_inner(move |b| b.http2_initial_connection_window_size(sz))
    }

    /// Forwards `enabled` to the async builder's `tcp_nodelay` setter.
    pub fn tcp_nodelay(self, enabled: bool) -> ClientBuilder {
        self.with_inner(move |b| b.tcp_nodelay(enabled))
    }

    /// Forwards `addr` to the async builder's `local_address` setter.
    pub fn local_address<T>(self, addr: T) -> ClientBuilder
    where
        T: Into<Option<IpAddr>>,
    {
        self.with_inner(move |b| b.local_address(addr))
    }

    /// Forwards `val` to the async builder's `tcp_keepalive` setter.
    pub fn tcp_keepalive<D>(self, val: D) -> ClientBuilder
    where
        D: Into<Option<Duration>>,
    {
        self.with_inner(move |b| b.tcp_keepalive(val))
    }

    /// Forwards `cert` to the async builder's `add_root_certificate` setter.
    ///
    /// Only compiled with the `__tls` feature.
    #[cfg(feature = "__tls")]
    pub fn add_root_certificate(self, cert: Certificate) -> ClientBuilder {
        self.with_inner(move |b| b.add_root_certificate(cert))
    }

    /// Forwards `identity` to the async builder's `identity` setter.
    ///
    /// Only compiled with the `__tls` feature.
    #[cfg(feature = "__tls")]
    pub fn identity(self, identity: Identity) -> ClientBuilder {
        self.with_inner(move |b| b.identity(identity))
    }

    /// Forwards the flag to the async builder's
    /// `danger_accept_invalid_hostnames` setter.
    ///
    /// Only compiled with the `native-tls` feature.
    #[cfg(feature = "native-tls")]
    pub fn danger_accept_invalid_hostnames(self, accept_invalid_hostname: bool) -> ClientBuilder {
        self.with_inner(move |b| b.danger_accept_invalid_hostnames(accept_invalid_hostname))
    }

    /// Forwards the flag to the async builder's `danger_accept_invalid_certs`
    /// setter.
    ///
    /// Only compiled with the `__tls` feature.
    #[cfg(feature = "__tls")]
    pub fn danger_accept_invalid_certs(self, accept_invalid_certs: bool) -> ClientBuilder {
        self.with_inner(move |b| b.danger_accept_invalid_certs(accept_invalid_certs))
    }

    /// Calls the async builder's `use_native_tls`.
    ///
    /// Only compiled with the `native-tls` feature.
    #[cfg(feature = "native-tls")]
    pub fn use_native_tls(self) -> ClientBuilder {
        self.with_inner(|b| b.use_native_tls())
    }

    /// Calls the async builder's `use_rustls_tls`.
    ///
    /// Only compiled with the `__rustls` feature.
    #[cfg(feature = "__rustls")]
    pub fn use_rustls_tls(self) -> ClientBuilder {
        self.with_inner(|b| b.use_rustls_tls())
    }

    /// Forwards `tls` to the async builder's `use_preconfigured_tls` setter.
    ///
    /// Only compiled when `native-tls` or `__rustls` is enabled.
    #[cfg(any(
        feature = "native-tls",
        feature = "__rustls",
    ))]
    pub fn use_preconfigured_tls(self, tls: impl Any) -> ClientBuilder {
        self.with_inner(move |b| b.use_preconfigured_tls(tls))
    }

    /// Forwards `enable` to the async builder's `trust_dns` setter.
    ///
    /// Only compiled with the `trust-dns` feature.
    #[cfg(feature = "trust-dns")]
    pub fn trust_dns(self, enable: bool) -> ClientBuilder {
        self.with_inner(move |b| b.trust_dns(enable))
    }

    /// Calls the async builder's `no_trust_dns`; available regardless of
    /// features.
    pub fn no_trust_dns(self) -> ClientBuilder {
        self.with_inner(|b| b.no_trust_dns())
    }

    /// Forwards `enabled` to the async builder's `https_only` setter.
    pub fn https_only(self, enabled: bool) -> ClientBuilder {
        self.with_inner(move |b| b.https_only(enabled))
    }

    /// Applies `func` to the wrapped async builder and returns a builder that
    /// holds the result, keeping the stored timeout as it was.
    fn with_inner<F>(self, func: F) -> ClientBuilder
    where
        F: FnOnce(async_impl::ClientBuilder) -> async_impl::ClientBuilder,
    {
        let ClientBuilder { inner, timeout } = self;
        ClientBuilder {
            inner: func(inner),
            timeout,
        }
    }
}
impl From<async_impl::ClientBuilder> for ClientBuilder {
fn from(builder: async_impl::ClientBuilder) -> Self {
Self {
inner: builder,
timeout: Timeout::default(),
}
}
}
impl Default for Client {
fn default() -> Self {
Self::new()
}
}
impl Client {
    /// Builds a client from a default [`ClientBuilder`].
    ///
    /// # Panics
    ///
    /// Panics with the message `Client::new()` if building the client fails.
    /// Use [`Client::builder`] to handle the error instead.
    pub fn new() -> Client {
        let built = ClientBuilder::new().build();
        built.expect("Client::new()")
    }

    /// Returns a fresh [`ClientBuilder`].
    pub fn builder() -> ClientBuilder {
        ClientBuilder::new()
    }

    /// Starts a `GET` request to `url`; shorthand for [`Client::request`].
    pub fn get<U: IntoUrl>(&self, url: U) -> RequestBuilder {
        self.request(Method::GET, url)
    }

    /// Starts a `POST` request to `url`; shorthand for [`Client::request`].
    pub fn post<U: IntoUrl>(&self, url: U) -> RequestBuilder {
        self.request(Method::POST, url)
    }

    /// Starts a `PUT` request to `url`; shorthand for [`Client::request`].
    pub fn put<U: IntoUrl>(&self, url: U) -> RequestBuilder {
        self.request(Method::PUT, url)
    }

    /// Starts a `PATCH` request to `url`; shorthand for [`Client::request`].
    pub fn patch<U: IntoUrl>(&self, url: U) -> RequestBuilder {
        self.request(Method::PATCH, url)
    }

    /// Starts a `DELETE` request to `url`; shorthand for [`Client::request`].
    pub fn delete<U: IntoUrl>(&self, url: U) -> RequestBuilder {
        self.request(Method::DELETE, url)
    }

    /// Starts a `HEAD` request to `url`; shorthand for [`Client::request`].
    pub fn head<U: IntoUrl>(&self, url: U) -> RequestBuilder {
        self.request(Method::HEAD, url)
    }

    /// Starts a request with the given `method` and `url`.
    ///
    /// A URL conversion failure does not surface here: the error is handed to
    /// the returned `RequestBuilder` in place of a request.
    pub fn request<U: IntoUrl>(&self, method: Method, url: U) -> RequestBuilder {
        let prepared = match url.into_url() {
            Ok(target) => Ok(Request::new(method, target)),
            Err(err) => Err(err),
        };
        RequestBuilder::new(self.clone(), prepared)
    }

    /// Executes `request` and blocks until a response or an error is
    /// available.
    ///
    /// # Errors
    ///
    /// Returns whatever error `ClientHandle::execute_request` reports.
    pub fn execute(&self, request: Request) -> crate::Result<Response> {
        let handle = &self.inner;
        handle.execute_request(request)
    }
}
impl fmt::Debug for Client {
    /// Formats as a `Client` struct with no fields exposed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut repr = f.debug_struct("Client");
        repr.finish()
    }
}
impl fmt::Debug for ClientBuilder {
    /// Delegates to the `fmt` of the wrapped async builder; the stored
    /// timeout is not part of the output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.inner.fmt(f)
    }
}
/// Cloneable handle a `Client` uses to submit requests to the runtime thread.
#[derive(Clone)]
struct ClientHandle {
/// Client-level timeout; `execute_request` falls back to it when the request
/// does not carry its own.
timeout: Timeout,
/// Shared channel/thread state. `execute_request` also hands an `Arc` clone
/// to each `Response` through `KeepCoreThreadAlive`.
inner: Arc<InnerClientHandle>,
}
// Sender half on which the runtime thread delivers the outcome of one request.
type OneshotResponse = oneshot::Sender<crate::Result<async_impl::Response>>;
// Sender used to hand `(request, reply sender)` pairs to the runtime thread.
type ThreadSender = mpsc::UnboundedSender<(async_impl::Request, OneshotResponse)>;
/// State shared (behind an `Arc`) by every clone of a `ClientHandle`.
///
/// Both fields are `Option`s so that the `Drop` impl can `take()` them.
struct InnerClientHandle {
/// Request channel to the runtime thread; taken (and thereby dropped) first
/// in `Drop`.
tx: Option<ThreadSender>,
/// Join handle of the runtime thread; taken and joined in `Drop`.
thread: Option<thread::JoinHandle<()>>,
}
impl Drop for InnerClientHandle {
    /// Drops the request sender, then blocks until the runtime thread has
    /// been joined.
    ///
    /// # Panics
    ///
    /// Panics with `thread not dropped yet` if the join handle is already
    /// gone.
    fn drop(&mut self) {
        let id = self
            .thread
            .as_ref()
            .expect("thread not dropped yet")
            .thread()
            .id();

        trace!("closing runtime thread ({:?})", id);
        // Dropping the sender is the close signal for the runtime thread.
        drop(self.tx.take());
        trace!("signaled close for runtime thread ({:?})", id);
        if let Some(join_handle) = self.thread.take() {
            // The join result is deliberately discarded.
            let _ = join_handle.join();
        }
        trace!("closed runtime thread ({:?})", id);
    }
}
impl ClientHandle {
fn new(builder: ClientBuilder) -> crate::Result<ClientHandle> {
let timeout = builder.timeout;
let builder = builder.inner;
let (tx, rx) = mpsc::unbounded_channel::<(async_impl::Request, OneshotResponse)>();
let (spawn_tx, spawn_rx) = oneshot::channel::<crate::Result<()>>();
let handle = thread::Builder::new()
.name("reqwest-internal-sync-runtime".into())
.spawn(move || {
use tokio::runtime;
let rt = match runtime::Builder::new_current_thread().enable_all().build().map_err(crate::error::builder) {
Err(e) => {
if let Err(e) = spawn_tx.send(Err(e)) {
error!("Failed to communicate runtime creation failure: {:?}", e);
}
return;
}
Ok(v) => v,
};
let f = async move {
let client = match builder.build() {
Err(e) => {
if let Err(e) = spawn_tx.send(Err(e)) {
error!("Failed to communicate client creation failure: {:?}", e);
}
return;
}
Ok(v) => v,
};
if let Err(e) = spawn_tx.send(Ok(())) {
error!("Failed to communicate successful startup: {:?}", e);
return;
}
let mut rx = rx;
while let Some((req, req_tx)) = rx.recv().await {
let req_fut = client.execute(req);
tokio::spawn(forward(req_fut, req_tx));
}
trace!("({:?}) Receiver is shutdown", thread::current().id());
};
trace!("({:?}) start runtime::block_on", thread::current().id());
rt.block_on(f);
trace!("({:?}) end runtime::block_on", thread::current().id());
drop(rt);
trace!("({:?}) finished", thread::current().id());
})
.map_err(crate::error::builder)?;
match wait::timeout(spawn_rx, None) {
Ok(Ok(())) => (),
Ok(Err(err)) => return Err(err),
Err(_canceled) => event_loop_panicked(),
}
let inner_handle = Arc::new(InnerClientHandle {
tx: Some(tx),
thread: Some(handle),
});
Ok(ClientHandle {
timeout,
inner: inner_handle,
})
}
fn execute_request(&self, req: Request) -> crate::Result<Response> {
let (tx, rx) = oneshot::channel();
let (req, body) = req.into_async();
let url = req.url().clone();
let timeout = req.timeout().copied().or(self.timeout.0);
self.inner
.tx
.as_ref()
.expect("core thread exited early")
.send((req, tx))
.expect("core thread panicked");
let result: Result<crate::Result<async_impl::Response>, wait::Waited<crate::Error>> =
if let Some(body) = body {
let f = async move {
body.send().await?;
rx.await.map_err(|_canceled| event_loop_panicked())
};
wait::timeout(f, timeout)
} else {
let f = async move {
rx.await.map_err(|_canceled| event_loop_panicked())
};
wait::timeout(f, timeout)
};
match result {
Ok(Err(err)) => Err(err.with_url(url)),
Ok(Ok(res)) => Ok(Response::new(
res,
self.timeout.0,
KeepCoreThreadAlive(Some(self.inner.clone())),
)),
Err(wait::Waited::TimedOut(e)) => Err(crate::error::request(e).with_url(url)),
Err(wait::Waited::Inner(err)) => Err(err.with_url(url)),
}
}
}
/// Drives `fut` to completion and sends its output on `tx`, stopping early if
/// the receiving side of `tx` is closed first.
///
/// On every poll the request future is polled first; only while it is still
/// pending is `tx` checked for closure. When the receiver has gone away the
/// future is abandoned and nothing is sent.
async fn forward<F>(fut: F, mut tx: OneshotResponse)
where
    F: Future<Output = crate::Result<async_impl::Response>>,
{
    use std::task::Poll;

    futures_util::pin_mut!(fut);

    // `Some(output)` when the request finished, `None` when the caller stopped
    // listening before it did.
    let outcome = futures_util::future::poll_fn(|cx| {
        if let Poll::Ready(val) = fut.as_mut().poll(cx) {
            return Poll::Ready(Some(val));
        }
        tx.poll_closed(cx).map(|()| None)
    })
    .await;

    if let Some(output) = outcome {
        // A failed send is ignored on purpose.
        let _ = tx.send(output);
    }
}
/// Optional timeout duration; `None` means no timeout is set.
#[derive(Clone, Copy)]
struct Timeout(Option<Duration>);

impl Default for Timeout {
    /// Returns a timeout of thirty seconds.
    fn default() -> Timeout {
        const DEFAULT_SECS: u64 = 30;
        Self(Some(Duration::from_secs(DEFAULT_SECS)))
    }
}
/// Guard holding an optional shared reference to the client's inner handle.
///
/// While a value holds `Some(arc)`, the `InnerClientHandle` behind the `Arc`
/// cannot be dropped, so its `Drop` impl (which shuts the runtime thread
/// down) does not run.
pub(crate) struct KeepCoreThreadAlive(Option<Arc<InnerClientHandle>>);

impl KeepCoreThreadAlive {
    /// Creates a guard that holds no handle.
    pub(crate) fn empty() -> KeepCoreThreadAlive {
        Self(None)
    }
}
/// Panics with a fixed message; called when a channel to the runtime thread
/// is closed without an answer.
///
/// Marked `#[cold]` and `#[inline(never)]` to keep the panic path out of
/// callers.
#[cold]
#[inline(never)]
fn event_loop_panicked() -> ! {
    panic!("event loop thread panicked")
}