use core::time::Duration;
#[cfg(feature = "async")]
use std::future::Future;
use std::io::{self, Read, Write};
use std::net::{TcpStream, ToSocketAddrs};
#[cfg(feature = "async")]
use std::pin::Pin;
#[cfg(feature = "async")]
use std::sync::atomic::{AtomicUsize, Ordering};
#[cfg(feature = "async")]
use std::sync::{Arc, Mutex};
#[cfg(feature = "async")]
use std::task::{Context, Poll};
use std::time::Instant;
#[cfg(all(feature = "async", feature = "proxy"))]
use tokio::io::AsyncReadExt;
#[cfg(feature = "async")]
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadHalf, WriteHalf};
#[cfg(feature = "async")]
use tokio::net::TcpStream as AsyncTcpStream;
#[cfg(feature = "async")]
use tokio::sync::Mutex as AsyncMutex;
use crate::request::{ConnectionParams, OwnedConnectionParams, ParsedRequest};
#[cfg(feature = "async")]
use crate::Response;
use crate::{Error, Method, ResponseLazy};
type UnsecuredStream = TcpStream;
#[cfg(feature = "rustls")]
mod rustls_stream;
#[cfg(feature = "rustls")]
type SecuredStream = rustls_stream::SecuredStream;
pub(crate) enum HttpStream {
Unsecured(UnsecuredStream, Option<Instant>),
#[cfg(feature = "rustls")]
Secured(Box<SecuredStream>, Option<Instant>),
#[cfg(feature = "async")]
Buffer(std::io::Cursor<Vec<u8>>),
}
impl HttpStream {
fn create_unsecured(reader: UnsecuredStream, timeout_at: Option<Instant>) -> HttpStream {
HttpStream::Unsecured(reader, timeout_at)
}
#[cfg(feature = "async")]
pub(crate) fn create_buffer(buffer: Vec<u8>) -> HttpStream {
HttpStream::Buffer(std::io::Cursor::new(buffer))
}
}
fn timeout_err() -> io::Error {
io::Error::new(io::ErrorKind::TimedOut, "the timeout of the request was reached")
}
fn timeout_at_to_duration(timeout_at: Option<Instant>) -> Result<Option<Duration>, io::Error> {
if let Some(timeout_at) = timeout_at {
if let Some(duration) = timeout_at.checked_duration_since(Instant::now()) {
Ok(Some(duration))
} else {
Err(timeout_err())
}
} else {
Ok(None)
}
}
impl Read for HttpStream {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let timeout = |tcp: &TcpStream, timeout_at: Option<Instant>| -> io::Result<()> {
let _ = tcp.set_read_timeout(timeout_at_to_duration(timeout_at)?);
Ok(())
};
let result = match self {
HttpStream::Unsecured(inner, timeout_at) => {
timeout(inner, *timeout_at)?;
inner.read(buf)
}
#[cfg(feature = "rustls")]
HttpStream::Secured(inner, timeout_at) => {
timeout(inner.get_ref(), *timeout_at)?;
inner.read(buf)
}
#[cfg(feature = "async")]
HttpStream::Buffer(cursor) => std::io::Read::read(cursor, buf),
};
match result {
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
Err(timeout_err())
}
r => r,
}
}
}
fn set_socket_write_timeout(tcp: &TcpStream, timeout_at: Option<Instant>) -> io::Result<()> {
tcp.set_write_timeout(timeout_at_to_duration(timeout_at)?)?;
Ok(())
}
impl Write for HttpStream {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let result = match self {
HttpStream::Unsecured(inner, timeout_at) => {
set_socket_write_timeout(inner, *timeout_at)?;
inner.write(buf)
}
#[cfg(feature = "rustls")]
HttpStream::Secured(inner, timeout_at) => {
set_socket_write_timeout(inner.get_ref(), *timeout_at)?;
inner.write(buf)
}
#[cfg(feature = "async")]
HttpStream::Buffer(_) => {
debug_assert!(false, "We shouldn't write to a pre-loaded stream");
Ok(buf.len())
}
};
match result {
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
Err(timeout_err())
}
r => r,
}
}
fn flush(&mut self) -> io::Result<()> {
let result = match self {
HttpStream::Unsecured(inner, timeout_at) => {
set_socket_write_timeout(inner, *timeout_at)?;
inner.flush()
}
#[cfg(feature = "rustls")]
HttpStream::Secured(inner, timeout_at) => {
set_socket_write_timeout(inner.get_ref(), *timeout_at)?;
inner.flush()
}
#[cfg(feature = "async")]
HttpStream::Buffer(_) => {
debug_assert!(false, "We shouldn't write to a pre-loaded stream");
Ok(())
}
};
match result {
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
Err(timeout_err())
}
r => r,
}
}
}
#[cfg(any(feature = "async-https-rustls", feature = "async-https-rustls-probe"))]
type AsyncSecuredStream = rustls_stream::AsyncSecuredStream;
#[cfg(feature = "async")]
pub(crate) enum AsyncHttpStream {
Unsecured(AsyncTcpStream),
#[cfg(any(feature = "async-https-rustls", feature = "async-https-rustls-probe"))]
Secured(Box<AsyncSecuredStream>),
}
#[cfg(feature = "async")]
impl AsyncRead for AsyncHttpStream {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> Poll<io::Result<()>> {
match &mut *self {
AsyncHttpStream::Unsecured(inner) => Pin::new(inner).poll_read(cx, buf),
#[cfg(any(feature = "async-https-rustls", feature = "async-https-rustls-probe"))]
AsyncHttpStream::Secured(inner) => Pin::new(inner).poll_read(cx, buf),
}
}
}
#[cfg(feature = "async")]
impl AsyncWrite for AsyncHttpStream {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
match &mut *self {
AsyncHttpStream::Unsecured(inner) => Pin::new(inner).poll_write(cx, buf),
#[cfg(any(feature = "async-https-rustls", feature = "async-https-rustls-probe"))]
AsyncHttpStream::Secured(inner) => Pin::new(inner).poll_write(cx, buf),
}
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
match &mut *self {
AsyncHttpStream::Unsecured(inner) => Pin::new(inner).poll_flush(cx),
#[cfg(any(feature = "async-https-rustls", feature = "async-https-rustls-probe"))]
AsyncHttpStream::Secured(inner) => Pin::new(inner).poll_flush(cx),
}
}
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
match &mut *self {
AsyncHttpStream::Unsecured(inner) => Pin::new(inner).poll_shutdown(cx),
#[cfg(any(feature = "async-https-rustls", feature = "async-https-rustls-probe"))]
AsyncHttpStream::Secured(inner) => Pin::new(inner).poll_shutdown(cx),
}
}
}
#[cfg(feature = "async")]
struct AsyncConnectionState {
write: AsyncMutex<WriteHalf<AsyncHttpStream>>,
read: AsyncMutex<ReadHalf<AsyncHttpStream>>,
next_request_id: AtomicUsize,
readable_request_id: AtomicUsize,
min_dropped_reader_id: AtomicUsize,
socket_new_requests_timeout: Mutex<Instant>,
}
#[cfg(feature = "async")]
struct PendingReader<'a> {
min_dropped_reader_id: &'a AtomicUsize,
id: Option<usize>,
}
#[cfg(feature = "async")]
impl<'a> Drop for PendingReader<'a> {
fn drop(&mut self) {
if let Some(reader_id) = self.id {
self.min_dropped_reader_id.fetch_min(reader_id, Ordering::AcqRel);
}
}
}
#[cfg(feature = "async")]
pub struct AsyncConnection(Mutex<Arc<AsyncConnectionState>>);
#[cfg(feature = "async")]
impl AsyncConnection {
pub(crate) async fn new(
params: ConnectionParams<'_>,
timeout_at: Option<Instant>,
) -> Result<AsyncConnection, Error> {
let future = async move {
let socket = Self::connect(params).await?;
if params.https {
#[cfg(not(any(
feature = "async-https-rustls",
feature = "async-https-rustls-probe"
)))]
return Err(Error::HttpsFeatureNotEnabled);
#[cfg(any(feature = "async-https-rustls", feature = "async-https-rustls-probe"))]
rustls_stream::wrap_async_stream(socket, params.host).await
} else {
Ok(AsyncHttpStream::Unsecured(socket))
}
};
let stream = if let Some(timeout_at) = timeout_at {
tokio::time::timeout_at(timeout_at.into(), future)
.await
.unwrap_or(Err(Error::IoError(timeout_err())))?
} else {
future.await?
};
let (read, write) = tokio::io::split(stream);
Ok(AsyncConnection(Mutex::new(Arc::new(AsyncConnectionState {
read: AsyncMutex::new(read),
write: AsyncMutex::new(write),
next_request_id: AtomicUsize::new(0),
readable_request_id: AtomicUsize::new(0),
min_dropped_reader_id: AtomicUsize::new(usize::MAX),
socket_new_requests_timeout: Mutex::new(Instant::now() + Duration::from_secs(60)),
}))))
}
async fn tcp_connect(host: &str, port: u16) -> Result<AsyncTcpStream, Error> {
#[cfg(feature = "log")]
log::trace!("Looking up host {host}");
let addrs = tokio::net::lookup_host((host, port)).await.map_err(Error::IoError)?;
let addrs: Vec<_> = addrs.collect();
let addrs_count = addrs.len();
if addrs.is_empty() {
return Err(Error::AddressNotFound);
}
for (i, addr) in addrs.iter().enumerate() {
#[cfg(feature = "log")]
log::trace!("Attempting to connect to {addr} for {host}");
match AsyncTcpStream::connect(addr).await {
Ok(s) => {
#[cfg(feature = "log")]
log::trace!("Connected to {addr} for {host}");
return Ok(s);
}
Err(e) =>
if i == addrs_count - 1 {
return Err(Error::IoError(e));
},
}
}
Err(Error::AddressNotFound)
}
async fn connect(params: ConnectionParams<'_>) -> Result<AsyncTcpStream, Error> {
#[cfg(feature = "proxy")]
match ¶ms.proxy {
Some(proxy) => {
let mut tcp = Self::tcp_connect(&proxy.server, proxy.port).await?;
let proxy_request = proxy.connect(params.host, params.port);
tcp.write_all(proxy_request.as_bytes()).await?;
tcp.flush().await?;
const MAX_PROXY_RESPONSE_SIZE: usize = 16 * 1024;
let mut proxy_response = Vec::new();
let mut buf = [0; 256];
loop {
let n = tcp.read(&mut buf).await?;
if n == 0 {
break;
}
proxy_response.extend_from_slice(&buf[..n]);
if proxy_response.len() > MAX_PROXY_RESPONSE_SIZE {
return Err(Error::ProxyConnect);
}
if n < buf.len() {
break;
}
}
crate::Proxy::verify_response(&proxy_response)?;
Ok(tcp)
}
None => Self::tcp_connect(params.host, params.port).await,
}
#[cfg(not(feature = "proxy"))]
Self::tcp_connect(¶ms.host, params.port).await
}
async fn timeout<O, F: Future<Output = O>>(timeout: Option<Instant>, f: F) -> Result<O, Error> {
if let Some(time) = timeout {
tokio::time::timeout_at(time.into(), f).await.map_err(|_| Error::IoError(timeout_err()))
} else {
Ok(f.await)
}
}
pub(crate) fn send<'a>(
&'a self,
request: ParsedRequest,
) -> Pin<Box<dyn Future<Output = Result<Response, Error>> + Send + 'a>> {
Box::pin(async move {
let conn = Arc::clone(&*self.0.lock().unwrap());
#[cfg(debug_assertions)]
{
let next_read = conn.readable_request_id.load(Ordering::Acquire);
let next_request = conn.next_request_id.load(Ordering::Acquire);
debug_assert!(
next_request >= next_read,
"At all times, the next_request_id should be higher than the readable id"
);
}
let mut read = None;
let mut write = None;
if !request.config.pipelining {
read = Some(Self::timeout(request.timeout_at, conn.read.lock()).await?);
write = Some(Self::timeout(request.timeout_at, conn.write.lock()).await?);
while {
let next_read = conn.readable_request_id.load(Ordering::Relaxed);
let next_request = conn.next_request_id.load(Ordering::Relaxed);
next_request != usize::MAX && next_read < next_request
} {
read.take();
write.take();
tokio::task::yield_now().await;
read = Some(Self::timeout(request.timeout_at, conn.read.lock()).await?);
write = Some(Self::timeout(request.timeout_at, conn.write.lock()).await?);
}
}
macro_rules! retry_new_connection {
(CONNECTION_STATE_UNDEFINED) => {
conn.next_request_id.store(usize::MAX, Ordering::Release);
conn.readable_request_id.store(usize::MAX, Ordering::Release);
retry_new_connection!(_internal);
};
(CONNECTION_STILL_READABLE, $write_lock: ident) => {
conn.next_request_id.store(usize::MAX, Ordering::Release);
core::mem::drop($write_lock);
retry_new_connection!(_internal);
};
(_internal) => {
let new_connection =
AsyncConnection::new(request.connection_params(), request.timeout_at)
.await?;
*self.0.lock().unwrap() = Arc::clone(&*new_connection.0.lock().unwrap());
core::mem::drop(read);
return self.send(request).await;
};
}
let request_id;
let mut this_request =
PendingReader { min_dropped_reader_id: &conn.min_dropped_reader_id, id: None };
{
let mut write = if let Some(write) = write {
write
} else {
Self::timeout(request.timeout_at, conn.write.lock()).await?
};
let socket_timeout = *conn.socket_new_requests_timeout.lock().unwrap();
let socket_timed_out = Instant::now() > socket_timeout;
request_id = conn.next_request_id.fetch_add(1, Ordering::Relaxed);
if request_id == usize::MAX || socket_timed_out {
retry_new_connection!(CONNECTION_STILL_READABLE, write);
}
#[cfg(feature = "log")]
log::trace!(
"Writing HTTP request id {request_id} on connection to {:?}.",
request.connection_params(),
);
this_request.id = Some(request_id);
let write_res =
Self::timeout(request.timeout_at, write.write_all(&request.as_bytes())).await;
match write_res {
Err(e) => {
conn.next_request_id.store(usize::MAX, Ordering::Release);
return Err(e);
}
Ok(Err(ioe)) => {
conn.next_request_id.store(usize::MAX, Ordering::Release);
return Err(Error::IoError(ioe));
}
Ok(Ok(())) => {}
}
}
let mut should_retry = false;
let response = Self::timeout(request.timeout_at, async {
if read.is_none() {
read = Some(Self::timeout(request.timeout_at, conn.read.lock()).await?);
}
while {
let readable = conn.readable_request_id.load(Ordering::Acquire);
if readable == usize::MAX {
debug_assert!(
request.config.pipelining,
"We should never need to re-send a non-pipelined request (as both locks were held and no other pending requests were in-flight)",
);
should_retry = true;
return Err(Error::Other("Retrying pipelining failure"));
}
readable < request_id
} {
debug_assert!(
request.config.pipelining,
"Non-pipelined requests should never need to wait as both locks were held and no other requests were in-filght"
);
read.take();
if conn.min_dropped_reader_id.load(Ordering::Acquire) < request_id {
should_retry = true;
return Err(Error::Other("Retrying pipelining failure"));
}
tokio::task::yield_now().await;
read = Some(conn.read.lock().await);
}
let mut read = read.take().unwrap();
#[cfg(feature = "log")]
log::trace!(
"Reading HTTP response for request id {request_id} on connection to {:?}.",
request.connection_params(),
);
let response = Response::create_async(
&mut *read,
request.config.method == Method::Head,
request.config.max_headers_size,
request.config.max_status_line_len,
request.config.max_body_size,
)
.await?;
let mut found_keep_alive = false;
if let Some(header) = response.headers.get("connection") {
if header.eq_ignore_ascii_case("keep-alive") {
found_keep_alive = true;
}
}
if !found_keep_alive {
conn.next_request_id.store(usize::MAX, Ordering::Release);
conn.readable_request_id.store(usize::MAX, Ordering::Release);
} else {
conn.readable_request_id.fetch_add(1, Ordering::Release);
}
if let Some(header) = response.headers.get("keep-alive") {
for param in header.split(',') {
if let Some((k, v)) = param.trim().split_once('=') {
if let Ok(v) = v.parse::<usize>() {
match k.trim() {
"timeout" => {
let timeout_secs = (v as u64).saturating_sub(1);
*conn.socket_new_requests_timeout.lock().unwrap() =
Instant::now()
.checked_add(Duration::from_secs(timeout_secs))
.unwrap_or(Instant::now());
}
"max" => {
conn.next_request_id.fetch_max(
usize::MAX.saturating_sub(v),
Ordering::AcqRel,
);
}
_ => {
conn.next_request_id.store(usize::MAX, Ordering::Release);
}
}
} else {
conn.next_request_id.store(usize::MAX, Ordering::Release);
}
} else {
conn.next_request_id.store(usize::MAX, Ordering::Release);
}
}
}
this_request.id = None;
Ok(response)
})
.await;
let response = match response {
Ok(Ok(response)) => response,
Err(e) | Ok(Err(e)) => {
if should_retry {
retry_new_connection!(CONNECTION_STATE_UNDEFINED);
} else {
conn.next_request_id.store(usize::MAX, Ordering::Release);
conn.readable_request_id.store(usize::MAX, Ordering::Relaxed);
return Err(e);
}
}
};
core::mem::drop(read);
async_handle_redirects(self, request, response).await
})
}
}
pub struct Connection {
stream: HttpStream,
}
impl Connection {
pub(crate) fn new(
params: ConnectionParams<'_>,
timeout_at: Option<Instant>,
) -> Result<Connection, Error> {
let socket = Self::connect(params, timeout_at)?;
let stream = if params.https {
#[cfg(not(feature = "rustls"))]
return Err(Error::HttpsFeatureNotEnabled);
#[cfg(feature = "rustls")]
{
let tls = rustls_stream::wrap_stream(socket, params.host)?;
HttpStream::Secured(Box::new(tls), timeout_at)
}
} else {
HttpStream::create_unsecured(socket, timeout_at)
};
Ok(Connection { stream })
}
fn tcp_connect(host: &str, port: u16, timeout_at: Option<Instant>) -> Result<TcpStream, Error> {
#[cfg(feature = "log")]
log::trace!("Looking up host {host}");
let addrs = (host, port).to_socket_addrs().map_err(Error::IoError)?;
let addrs_count = addrs.len();
for (i, addr) in addrs.enumerate() {
#[cfg(feature = "log")]
log::trace!("Attempting to connect to {addr} for {host}");
let stream = if let Some(timeout) = timeout_at_to_duration(timeout_at)? {
TcpStream::connect_timeout(&addr, timeout)
} else {
TcpStream::connect(addr)
};
match stream {
Ok(s) => {
#[cfg(feature = "log")]
log::trace!("Connected to {addr} for {host}");
return Ok(s);
}
Err(e) =>
if i == addrs_count - 1 {
return Err(Error::IoError(e));
},
}
}
Err(Error::AddressNotFound)
}
fn connect(
params: ConnectionParams<'_>,
timeout_at: Option<Instant>,
) -> Result<TcpStream, Error> {
#[cfg(feature = "proxy")]
match ¶ms.proxy {
Some(proxy) => {
let mut tcp = Self::tcp_connect(&proxy.server, proxy.port, timeout_at)?;
write!(tcp, "{}", proxy.connect(params.host, params.port))?;
tcp.flush()?;
const MAX_PROXY_RESPONSE_SIZE: usize = 16 * 1024;
let mut proxy_response = Vec::new();
let mut buf = [0; 256];
loop {
let n = tcp.read(&mut buf)?;
if n == 0 {
break;
}
proxy_response.extend_from_slice(&buf[..n]);
if proxy_response.len() > MAX_PROXY_RESPONSE_SIZE {
return Err(Error::ProxyConnect);
}
if n < buf.len() {
break;
}
}
crate::Proxy::verify_response(&proxy_response)?;
Ok(tcp)
}
None => Self::tcp_connect(params.host, params.port, timeout_at),
}
#[cfg(not(feature = "proxy"))]
Self::tcp_connect(params.host, params.port, timeout_at)
}
pub(crate) fn send(mut self, request: ParsedRequest) -> Result<ResponseLazy, Error> {
enforce_timeout(request.timeout_at, move || {
#[cfg(feature = "log")]
log::trace!("Writing HTTP request.");
self.stream.write_all(&request.as_bytes())?;
#[cfg(feature = "log")]
log::trace!("Reading HTTP response.");
let response = ResponseLazy::from_stream(
self.stream,
request.config.max_headers_size,
request.config.max_status_line_len,
request.config.max_body_size,
)?;
handle_redirects(request, response)
})
}
}
fn handle_redirects(
request: ParsedRequest,
mut response: ResponseLazy,
) -> Result<ResponseLazy, Error> {
let status_code = response.status_code;
let url = response.headers.get("location");
match get_redirect(request, status_code, url) {
NextHop::Redirect(request) => {
let (request, _) = request?;
Connection::new(request.connection_params(), request.timeout_at)?.send(request)
}
NextHop::Destination(request) => {
let dst_url = request.url;
dst_url.write_base_url_to(&mut response.url).unwrap();
dst_url.write_resource_to(&mut response.url).unwrap();
Ok(response)
}
}
}
#[cfg(feature = "async")]
async fn async_handle_redirects(
connection: &AsyncConnection,
request: ParsedRequest,
mut response: Response,
) -> Result<Response, Error> {
let status_code = response.status_code;
let url = response.headers.get("location");
match async_get_redirect(request, status_code, url) {
NextHopAsync::Redirect(request) => {
let (request, needs_new_connection) = request?;
let mut connection = connection;
let new_connection;
if needs_new_connection {
new_connection =
AsyncConnection::new(request.connection_params(), request.timeout_at).await?;
connection = &new_connection;
}
connection.send(request).await
}
NextHopAsync::Destination(request) => {
let dst_url = request.url;
dst_url.write_base_url_to(&mut response.url).unwrap();
dst_url.write_resource_to(&mut response.url).unwrap();
Ok(response)
}
}
}
macro_rules! redirect_utils {
($get_redirect: ident, $NextHop: ident, $Response: ident) => {
enum $NextHop {
Redirect(Result<(ParsedRequest, bool), Error>),
Destination(ParsedRequest),
}
fn $get_redirect(
mut request: ParsedRequest,
status_code: i32,
url: Option<&String>,
) -> $NextHop {
match status_code {
301 | 302 | 303 | 307 => {
let url = match url {
Some(url) => url,
None => return $NextHop::Redirect(Err(Error::RedirectLocationMissing)),
};
#[cfg(feature = "log")]
log::debug!("Redirecting ({}) to: {}", status_code, url);
let previous_params: OwnedConnectionParams = request.connection_params().into();
match request.redirect_to(url.as_str()) {
Ok(()) => {
if status_code == 303 {
match request.config.method {
Method::Post | Method::Put | Method::Delete => {
request.config.method = Method::Get;
}
_ => {}
}
}
let needs_new_conn = previous_params != request.connection_params();
$NextHop::Redirect(Ok((request, needs_new_conn)))
}
Err(err) => $NextHop::Redirect(Err(err)),
}
}
_ => $NextHop::Destination(request),
}
}
};
}
redirect_utils!(get_redirect, NextHop, ResponseLazy);
#[cfg(feature = "async")]
redirect_utils!(async_get_redirect, NextHopAsync, Response);
fn enforce_timeout<F, R>(timeout_at: Option<Instant>, f: F) -> Result<R, Error>
where
F: 'static + Send + FnOnce() -> Result<R, Error>,
R: 'static + Send,
{
use std::sync::mpsc::{channel, RecvTimeoutError};
match timeout_at {
Some(deadline) => {
let (sender, receiver) = channel();
let thread = std::thread::spawn(move || {
let result = f();
let _ = sender.send(());
result
});
if let Some(timeout_duration) = deadline.checked_duration_since(Instant::now()) {
match receiver.recv_timeout(timeout_duration) {
Ok(()) => thread.join().unwrap(),
Err(err) => match err {
RecvTimeoutError::Timeout => Err(Error::IoError(timeout_err())),
RecvTimeoutError::Disconnected =>
Err(Error::Other("request connection paniced")),
},
}
} else {
Err(Error::IoError(timeout_err()))
}
}
None => f(),
}
}