use crate::{
blob::Blob,
body::AsyncBody,
error::{Error, ErrorKind},
metrics::Metrics,
parsing::{parse_header, parse_status_line},
response::{LocalAddr, RemoteAddr},
trailer::TrailerWriter,
};
use async_channel::Sender;
use curl::easy::{InfoType, ReadError, SeekResult, WriteError};
use curl_sys::{CURL, CURLE_OK, CURLoption};
use futures_lite::io::{AsyncRead, AsyncWrite};
use http::Response;
use sluice::pipe;
use std::{
ascii,
collections::HashMap,
ffi::CStr,
fmt,
future::Future,
io, mem,
net::SocketAddr,
os::raw::{c_char, c_long},
pin::Pin,
ptr,
sync::{Arc, OnceLock},
task::{Context, Poll, Waker},
};
/// Wrapper used to hand the request body back to the caller as a response
/// extension.
///
/// `RequestHandler::build_response` moves the handler's request body into this
/// wrapper and attaches it to the response builder via `extension`.
pub(crate) struct RequestBody(pub(crate) AsyncBody);
/// `Clone` is implemented only so the type satisfies a `Clone` bound; actually
/// cloning a `RequestBody` is unsupported and panics.
// NOTE(review): presumably the bound comes from `http`'s extension API, which
// this type is passed to in `build_response` — confirm against the `http`
// version in use.
impl Clone for RequestBody {
fn clone(&self) -> Self {
panic!("RequestBody extension cannot be cloned");
}
}
/// State for a single request/response exchange driven by curl callbacks.
///
/// The handler implements `curl::easy::Handler`; as curl invokes the callbacks
/// it accumulates the status line, headers and body of the response and
/// eventually completes the response future returned by [`RequestHandler::new`].
pub(crate) struct RequestHandler {
/// Tracing span for this handler; its `id` field is recorded in `init` and
/// the per-callback spans use it as their parent.
span: tracing::Span,
/// State shared with the `ResponseBodyReader`; holds the final result of
/// the transfer.
shared: Arc<Shared>,
/// Channel used to deliver the response builder (or an error) to the
/// response future. Taken (set to `None`) once the response has been
/// delivered, which is also how `header` tells trailers from headers.
sender: Option<Sender<Result<http::response::Builder, Error>>>,
/// Request body that curl pulls from in the `read` callback. Moved into the
/// response as a `RequestBody` extension by `build_response`.
request_body: AsyncBody,
/// Waker used to build the task context when polling the request body.
/// `None` until `init` is called.
request_body_waker: Option<Waker>,
/// Status code parsed from the most recent status line.
response_status_code: Option<http::StatusCode>,
/// HTTP version parsed from the most recent status line.
response_version: Option<http::Version>,
/// Headers received so far; cleared whenever a new status line arrives.
response_headers: http::HeaderMap,
/// Writing half of the pipe that carries the response body; the reading
/// half is owned by the `ResponseBodyReader`.
response_body_writer: pipe::PipeWriter,
/// Waker used to build the task context when writing the response body.
/// `None` until `init` is called.
response_body_waker: Option<Waker>,
/// Collects header lines that arrive after the response has been delivered
/// (trailers); flushed in `set_result`.
response_trailer_writer: TrailerWriter,
/// Transfer metrics; created lazily on the first `progress` callback.
metrics: Option<Metrics>,
/// Clones of the blobs passed to `setopt_blob_nocopy`, keyed by option.
// NOTE(review): presumably kept so the blob data outlives curl's use of the
// raw pointer — confirm against `Blob`'s ownership semantics.
blobs: HashMap<CURLoption, Blob>,
/// Raw curl easy handle used for `curl_easy_getinfo` queries; null until
/// `init` is called.
handle: *mut CURL,
/// When `true`, suppresses the informational log emitted when a response
/// body is dropped before being fully consumed.
pub(crate) disable_connection_reuse_log: bool,
}
// The raw `handle: *mut CURL` field prevents `Send` from being derived
// automatically, so it is asserted manually here.
// NOTE(review): the invariant that makes this sound is not visible in this
// file — presumably the handle is only ever dereferenced by whichever thread
// currently owns the handler; confirm with the code that drives the handler.
unsafe impl Send for RequestHandler {}
/// Output of the response future returned by [`RequestHandler::new`]: either a
/// response whose body is streamed through a [`ResponseBodyReader`], or the
/// error that ended the request.
pub(crate) type ResponseResult = Result<Response<ResponseBodyReader>, Error>;
/// State shared between a [`RequestHandler`] and the [`ResponseBodyReader`] it
/// produces.
#[derive(Debug, Default)]
struct Shared {
/// Final outcome of the transfer. Written once by
/// `RequestHandler::set_result`; read by the body reader when it reaches
/// end-of-stream to decide between a clean EOF and an error.
result: OnceLock<Result<(), Error>>,
}
impl RequestHandler {
    /// Creates a handler for a request with the given body.
    ///
    /// Returns the handler together with a future that resolves once the
    /// handler delivers the response (or an error) through its internal
    /// channel.
    ///
    /// # Errors
    ///
    /// The returned future yields an error when:
    /// - the channel is closed without a message (`ErrorKind::Unknown`),
    /// - the handler sent an error instead of a response builder, or
    /// - the response builder fails to build (`ErrorKind::ProtocolViolation`).
    pub(crate) fn new(request_body: AsyncBody) -> (Self, impl Future<Output = ResponseResult>) {
        let (sender, receiver) = async_channel::bounded(1);
        let shared = Arc::new(Shared::default());
        let (response_body_reader, response_body_writer) = pipe::pipe();

        let handler = Self {
            span: tracing::debug_span!("handler", id = tracing::field::Empty),
            sender: Some(sender),
            shared: Arc::clone(&shared),
            request_body,
            request_body_waker: None,
            response_status_code: None,
            response_version: None,
            response_headers: http::HeaderMap::new(),
            response_body_writer,
            response_body_waker: None,
            response_trailer_writer: TrailerWriter::new(),
            metrics: None,
            blobs: Default::default(),
            handle: ptr::null_mut(),
            disable_connection_reuse_log: false,
        };

        let future = async move {
            // The first `?` handles a closed channel, the second the error the
            // handler may have sent in place of a builder.
            let builder = receiver
                .recv()
                .await
                .map_err(|e| Error::new(ErrorKind::Unknown, e))??;

            let reader = ResponseBodyReader {
                inner: response_body_reader,
                shared,
            };

            builder
                .body(reader)
                .map_err(|e| Error::new(ErrorKind::ProtocolViolation, e))
        };

        (handler, future)
    }

    /// Reports whether debug output should be enabled for this request.
    ///
    /// Returns `true` if the handler's span is not a "none" span, or (with the
    /// `log` feature) if the `log` crate is enabled at debug level.
    pub(crate) fn is_debug_enabled(&self) -> bool {
        // NOTE(review): `Span::is_none` looks like it is only true for spans
        // built with `Span::none()`, so for this macro-created span the check
        // may always pass; `Span::is_disabled` may be what was intended.
        // Behavior intentionally left unchanged — confirm against tracing docs.
        if !self.span.is_none() {
            return true;
        }

        #[cfg(feature = "log")]
        if log::log_enabled!(log::Level::Debug) {
            return true;
        }

        false
    }

    /// Returns `true` if the response has not been delivered yet and the
    /// receiving side of the response channel has been closed.
    fn is_future_canceled(&self) -> bool {
        self.sender.as_ref().is_some_and(Sender::is_closed)
    }

    /// Binds the handler to a curl handle and the wakers used for body I/O.
    ///
    /// Records `id` on the handler's span, stores the raw curl `handle` for
    /// later `curl_easy_getinfo` queries, and stores the wakers used when
    /// polling the request body (`request_waker`) and writing the response
    /// body (`response_waker`). In debug builds, asserts that no wakers were
    /// set before.
    pub(crate) fn init(
        &mut self,
        id: usize,
        handle: *mut CURL,
        request_waker: Waker,
        response_waker: Waker,
    ) {
        let _enter = self.span.enter();

        debug_assert!(self.request_body_waker.is_none());
        debug_assert!(self.response_body_waker.is_none());

        self.span.record("id", id);
        self.handle = handle;
        self.request_body_waker = Some(request_waker);
        self.response_body_waker = Some(response_waker);
    }

    /// Records the final outcome of the transfer and completes the response.
    ///
    /// An error is first annotated with the local and remote socket addresses
    /// when curl can provide them. The result is stored in the shared state
    /// (only the first stored result is kept; later attempts are logged), the
    /// metrics are refreshed if any were collected, the trailer is flushed, and
    /// the response future is completed if it has not been already.
    pub(crate) fn set_result(&mut self, result: Result<(), Error>) {
        let result = result.map_err(|mut e| {
            if let Some(addr) = self.get_local_addr() {
                e = e.with_local_addr(addr);
            }

            if let Some(addr) = self.get_primary_addr() {
                e = e.with_remote_addr(addr);
            }

            e
        });

        if self.shared.result.set(result).is_err() {
            tracing::debug!("attempted to set error multiple times");
        }

        if let Some(metrics) = self.metrics.as_ref() {
            metrics.update_from_info_complete(self.handle);
        }

        self.response_trailer_writer.flush();
        self.complete_response_future();
    }

    /// Delivers the response (or the stored error) to the response future.
    ///
    /// Does nothing if the response was already delivered. A failed send is
    /// logged as the request having been canceled.
    fn complete_response_future(&mut self) {
        // Taking the sender guarantees the response is delivered at most once.
        let Some(sender) = self.sender.take() else {
            return;
        };

        let result = if let Some(Err(e)) = self.shared.result.get() {
            tracing::warn!("request completed with error: {}", e);
            Err(e.clone())
        } else {
            Ok(self.build_response())
        };

        if sender.try_send(result).is_err() {
            tracing::debug!("request canceled by user");
        }
    }

    /// Builds the response builder from the state accumulated so far.
    ///
    /// Status, version and headers are copied in when available. The local and
    /// remote addresses, the request body, the trailer handle and the metrics
    /// (when present) are attached as extensions. The accumulated headers and
    /// the request body are moved out of the handler.
    fn build_response(&mut self) -> http::response::Builder {
        let mut builder = http::Response::builder();

        if let Some(status) = self.response_status_code {
            builder = builder.status(status);
        }

        if let Some(version) = self.response_version {
            builder = builder.version(version);
        }

        if let Some(headers) = builder.headers_mut() {
            headers.extend(self.response_headers.drain());
        }

        if let Some(addr) = self.get_local_addr() {
            builder = builder.extension(LocalAddr(addr));
        }

        if let Some(addr) = self.get_primary_addr() {
            builder = builder.extension(RemoteAddr(addr));
        }

        builder = builder.extension(RequestBody(mem::take(&mut self.request_body)));
        builder = builder.extension(self.response_trailer_writer.trailer());

        if let Some(metrics) = self.metrics.clone() {
            builder = builder.extension(metrics);
        }

        builder
    }

    /// Returns the remote socket address reported by curl, or `None` if the IP
    /// or port is unavailable or unparsable.
    fn get_primary_addr(&mut self) -> Option<SocketAddr> {
        let ip = self.get_primary_ip()?.parse().ok()?;
        let port = self.get_primary_port()?;
        Some(SocketAddr::new(ip, port))
    }

    /// Queries `CURLINFO_PRIMARY_IP`.
    ///
    /// Returns `None` if the handle is null, the query fails, curl returns a
    /// null string, or the string is not valid UTF-8.
    fn get_primary_ip(&mut self) -> Option<&str> {
        if self.handle.is_null() {
            return None;
        }

        let mut ptr = ptr::null::<c_char>();

        // SAFETY: `self.handle` was checked to be non-null; it is assumed to be
        // the live easy handle passed to `init`. `ptr` is a valid out-location
        // for the string pointer curl writes.
        unsafe {
            if curl_sys::curl_easy_getinfo(self.handle, curl_sys::CURLINFO_PRIMARY_IP, &mut ptr)
                != curl_sys::CURLE_OK
            {
                return None;
            }
        }

        if ptr.is_null() {
            return None;
        }

        // SAFETY: `ptr` is non-null and was produced by a successful getinfo
        // call; it is assumed to point to a NUL-terminated string owned by the
        // curl handle.
        unsafe { CStr::from_ptr(ptr) }.to_str().ok()
    }

    /// Queries `CURLINFO_PRIMARY_PORT`.
    ///
    /// Returns `None` if the handle is null, the query fails, or the reported
    /// value does not fit in a `u16`.
    fn get_primary_port(&mut self) -> Option<u16> {
        if self.handle.is_null() {
            return None;
        }

        let mut port: c_long = 0;

        // SAFETY: `self.handle` was checked to be non-null; it is assumed to be
        // the live easy handle passed to `init`. `port` is a valid `long`
        // out-location.
        unsafe {
            if curl_sys::curl_easy_getinfo(self.handle, curl_sys::CURLINFO_PRIMARY_PORT, &mut port)
                != curl_sys::CURLE_OK
            {
                return None;
            }
        }

        // Checked conversion: an out-of-range or negative value is reported as
        // "unknown" instead of being silently wrapped into a bogus port.
        // Fully qualified so it does not rely on `TryFrom` being in the prelude.
        <u16 as std::convert::TryFrom<c_long>>::try_from(port).ok()
    }

    /// Returns the local socket address reported by curl, or `None` if the IP
    /// or port is unavailable or unparsable.
    fn get_local_addr(&mut self) -> Option<SocketAddr> {
        let ip = self.get_local_ip()?.parse().ok()?;
        let port = self.get_local_port()?;
        Some(SocketAddr::new(ip, port))
    }

    /// Queries `CURLINFO_LOCAL_IP`.
    ///
    /// Returns `None` if the handle is null, the query fails, curl returns a
    /// null string, or the string is not valid UTF-8.
    fn get_local_ip(&mut self) -> Option<&str> {
        if self.handle.is_null() {
            return None;
        }

        let mut ptr = ptr::null::<c_char>();

        // SAFETY: `self.handle` was checked to be non-null; it is assumed to be
        // the live easy handle passed to `init`. `ptr` is a valid out-location
        // for the string pointer curl writes.
        unsafe {
            if curl_sys::curl_easy_getinfo(self.handle, curl_sys::CURLINFO_LOCAL_IP, &mut ptr)
                != curl_sys::CURLE_OK
            {
                return None;
            }
        }

        if ptr.is_null() {
            return None;
        }

        // SAFETY: `ptr` is non-null and was produced by a successful getinfo
        // call; it is assumed to point to a NUL-terminated string owned by the
        // curl handle.
        unsafe { CStr::from_ptr(ptr) }.to_str().ok()
    }

    /// Queries `CURLINFO_LOCAL_PORT`.
    ///
    /// Returns `None` if the handle is null, the query fails, or the reported
    /// value does not fit in a `u16`.
    fn get_local_port(&mut self) -> Option<u16> {
        if self.handle.is_null() {
            return None;
        }

        let mut port: c_long = 0;

        // SAFETY: `self.handle` was checked to be non-null; it is assumed to be
        // the live easy handle passed to `init`. `port` is a valid `long`
        // out-location.
        unsafe {
            if curl_sys::curl_easy_getinfo(self.handle, curl_sys::CURLINFO_LOCAL_PORT, &mut port)
                != curl_sys::CURLE_OK
            {
                return None;
            }
        }

        // Checked conversion: an out-of-range or negative value is reported as
        // "unknown" instead of being silently wrapped into a bogus port.
        // Fully qualified so it does not rely on `TryFrom` being in the prelude.
        <u16 as std::convert::TryFrom<c_long>>::try_from(port).ok()
    }
}
impl curl::easy::Handler for RequestHandler {
fn header(&mut self, data: &[u8]) -> bool {
if self.is_future_canceled() {
return false;
}
let span = tracing::trace_span!(parent: &self.span, "header");
let _enter = span.enter();
if self.sender.is_none() {
if let Some(trailer_headers) = self.response_trailer_writer.get_mut() {
if let Some((name, value)) = parse_header(data) {
trailer_headers.append(name, value);
return true;
}
}
}
if let Some((version, status)) = parse_status_line(data) {
self.response_version = Some(version);
self.response_status_code = Some(status);
self.response_headers.clear();
return true;
}
if let Some((name, value)) = parse_header(data) {
self.response_headers.append(name, value);
return true;
}
if data == b"\r\n" {
return true;
}
false
}
fn read(&mut self, data: &mut [u8]) -> Result<usize, ReadError> {
if self.is_future_canceled() {
return Err(ReadError::Abort);
}
let span = tracing::trace_span!(parent: &self.span, "read");
let _enter = span.enter();
if let Some(waker) = self.request_body_waker.as_ref() {
let mut context = Context::from_waker(waker);
match Pin::new(&mut self.request_body).poll_read(&mut context, data) {
Poll::Pending => Err(ReadError::Pause),
Poll::Ready(Ok(len)) => Ok(len),
Poll::Ready(Err(e)) => {
tracing::error!("error reading request body: {}", e);
self.set_result(Err(e.into()));
Err(ReadError::Abort)
}
}
} else {
tracing::error!("request has not been initialized!");
Err(ReadError::Abort)
}
}
fn seek(&mut self, whence: io::SeekFrom) -> SeekResult {
let span = tracing::trace_span!(parent: &self.span, "seek", whence = ?whence);
let _enter = span.enter();
if whence == io::SeekFrom::Start(0) && self.request_body.reset() {
SeekResult::Ok
} else {
tracing::warn!("seek requested for request body, but it is not supported");
SeekResult::CantSeek
}
}
fn write(&mut self, data: &[u8]) -> Result<usize, WriteError> {
let span = tracing::trace_span!(parent: &self.span, "write");
let _enter = span.enter();
tracing::trace!("received {} bytes of data", data.len());
self.complete_response_future();
if let Some(waker) = self.response_body_waker.as_ref() {
let mut context = Context::from_waker(waker);
match Pin::new(&mut self.response_body_writer).poll_write(&mut context, data) {
Poll::Pending => Err(WriteError::Pause),
Poll::Ready(Ok(len)) => Ok(len),
Poll::Ready(Err(e)) => {
if e.kind() == io::ErrorKind::BrokenPipe {
if !self.disable_connection_reuse_log
&& self.response_version < Some(http::Version::HTTP_2)
{
tracing::info!(
"\
response dropped without fully consuming the response body, connection won't be reused\n\
Aborting a response without fully consuming the response body can result in sub-optimal \
performance. See https://github.com/sagebind/isahc/wiki/Connection-Reuse#closing-connections-early."
);
}
} else {
tracing::error!("error writing response body to buffer: {}", e);
}
Ok(0)
}
}
} else {
tracing::error!("request has not been initialized!");
Ok(0)
}
}
fn progress(&mut self, dltotal: f64, dlnow: f64, ultotal: f64, ulnow: f64) -> bool {
let metrics = self.metrics.get_or_insert_with(Metrics::new);
metrics.update_from_progress(self.handle, dltotal, dlnow, ultotal, ulnow);
true
}
fn debug(&mut self, kind: InfoType, data: &[u8]) {
let _enter = self.span.enter();
struct FormatAscii<T>(T);
impl<T: AsRef<[u8]>> fmt::Display for FormatAscii<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
for &byte in self.0.as_ref() {
ascii::escape_default(byte).fmt(f)?;
}
Ok(())
}
}
match kind {
InfoType::Text => {
tracing::debug!("{}", String::from_utf8_lossy(data).trim_end())
}
InfoType::HeaderIn | InfoType::DataIn => {
tracing::trace!(target: "isahc::wire", "<< {}", FormatAscii(data))
}
InfoType::HeaderOut | InfoType::DataOut => {
tracing::trace!(target: "isahc::wire", ">> {}", FormatAscii(data))
}
_ => (),
}
}
}
/// Extension trait for setting a curl option directly from a [`Blob`].
pub(crate) trait BlobOptions {
/// Sets `option` on the curl handle using the raw pointer of `data`.
///
/// # Safety
///
/// NOTE(review): the exact contract is not spelled out in this file. The
/// call hands a raw pointer to `curl_easy_setopt`, so presumably `option`
/// must be an option that accepts a blob pointer — confirm against the
/// libcurl documentation for the options used by callers.
unsafe fn setopt_blob_nocopy(
&mut self,
option: CURLoption,
data: &Blob,
) -> Result<(), curl::Error>;
}
impl BlobOptions for curl::easy::Easy2<RequestHandler> {
unsafe fn setopt_blob_nocopy(
&mut self,
option: CURLoption,
blob: &Blob,
) -> Result<(), curl::Error> {
let code = unsafe { curl_sys::curl_easy_setopt(self.raw(), option, blob.as_raw_ptr()) };
if code == CURLE_OK {
self.get_mut().blobs.insert(option, blob.clone());
Ok(())
} else {
let mut err = curl::Error::new(code);
if let Some(msg) = self.take_error_buf() {
err.set_extra(msg);
}
Err(err)
}
}
}
impl Drop for RequestHandler {
    /// Warns if the handler is dropped while the response is still
    /// undelivered, then clears the stored blobs.
    fn drop(&mut self) {
        // The sender is only taken once the response has been delivered.
        let response_pending = self.sender.is_some();
        if response_pending {
            tracing::warn!("fixme: request handler dropped before response was completed");
        }

        self.blobs.clear();
    }
}
impl fmt::Debug for RequestHandler {
    /// Prints only the type name; none of the handler's fields are shown.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("RequestHandler")
    }
}
/// Reader for the response body produced by a [`RequestHandler`].
///
/// Wraps the reading half of the body pipe and consults the shared transfer
/// result when the pipe reports end-of-stream.
pub(crate) struct ResponseBodyReader {
/// Reading half of the pipe the handler writes response body bytes into.
inner: pipe::PipeReader,
/// State shared with the handler; holds the final transfer result.
shared: Arc<Shared>,
}
impl AsyncRead for ResponseBodyReader {
    /// Reads body bytes from the pipe.
    ///
    /// Any result other than a zero-length read is passed through unchanged.
    /// A zero-length read is reported as a clean EOF only if the transfer
    /// finished successfully; otherwise it is turned into the transfer's error,
    /// or `ConnectionAborted` if no result was ever recorded.
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        let polled = Pin::new(&mut self.inner).poll_read(cx, buf);

        if !matches!(polled, Poll::Ready(Ok(0))) {
            return polled;
        }

        // The pipe signalled end-of-stream; decide what that means from the
        // recorded transfer result.
        let outcome = match self.shared.result.get() {
            Some(Ok(())) => Ok(0),
            Some(Err(e)) => Err(io::Error::from(e.clone())),
            None => Err(io::ErrorKind::ConnectionAborted.into()),
        };

        Poll::Ready(outcome)
    }
}
impl Metrics {
    /// Refreshes the metrics from curl's final transfer totals.
    ///
    /// Does nothing for a null handle. Otherwise queries the uploaded and
    /// downloaded sizes and records them as both the total and the progress
    /// values (the return codes of the queries are not inspected).
    fn update_from_info_complete(&self, handle: *mut CURL) {
        if handle.is_null() {
            return;
        }

        let (mut upload_total, mut download_total) = (0.0_f64, 0.0_f64);

        unsafe {
            curl_sys::curl_easy_getinfo(handle, curl_sys::CURLINFO_SIZE_UPLOAD, &mut upload_total);
            curl_sys::curl_easy_getinfo(
                handle,
                curl_sys::CURLINFO_SIZE_DOWNLOAD,
                &mut download_total,
            );
        }

        self.update_from_progress(
            handle,
            download_total,
            download_total,
            upload_total,
            upload_total,
        );
    }

    /// Stores the given progress values and, for a non-null handle, asks curl
    /// to write the current speed and timing figures directly into the
    /// corresponding metric slots.
    fn update_from_progress(
        &self,
        handle: *mut CURL,
        dltotal: f64,
        dlnow: f64,
        ultotal: f64,
        ulnow: f64,
    ) {
        let inner = &self.inner;

        inner.upload_total.store(ultotal);
        inner.upload_progress.store(ulnow);
        inner.download_total.store(dltotal);
        inner.download_progress.store(dlnow);

        if handle.is_null() {
            return;
        }

        // Each entry pairs a curl info code with the slot curl writes into.
        // The queries run in the order listed.
        let queries = [
            (curl_sys::CURLINFO_SPEED_UPLOAD, inner.upload_speed.as_ptr()),
            (
                curl_sys::CURLINFO_SPEED_DOWNLOAD,
                inner.download_speed.as_ptr(),
            ),
            (
                curl_sys::CURLINFO_NAMELOOKUP_TIME,
                inner.namelookup_time.as_ptr(),
            ),
            (curl_sys::CURLINFO_CONNECT_TIME, inner.connect_time.as_ptr()),
            (
                curl_sys::CURLINFO_APPCONNECT_TIME,
                inner.appconnect_time.as_ptr(),
            ),
            (
                curl_sys::CURLINFO_PRETRANSFER_TIME,
                inner.pretransfer_time.as_ptr(),
            ),
            (
                curl_sys::CURLINFO_STARTTRANSFER_TIME,
                inner.starttransfer_time.as_ptr(),
            ),
            (curl_sys::CURLINFO_TOTAL_TIME, inner.total_time.as_ptr()),
            (
                curl_sys::CURLINFO_REDIRECT_TIME,
                inner.redirect_time.as_ptr(),
            ),
        ];

        for (info, slot) in queries {
            unsafe {
                curl_sys::curl_easy_getinfo(handle, info, slot);
            }
        }
    }
}