use std::{
fmt,
future::Future,
pin::Pin,
task::{Context, Poll, ready},
};
use bytes::Bytes;
use http::{Request, Response};
use http_body::Body;
use httparse::ParserConfig;
use tokio::io::{AsyncRead, AsyncWrite};
use crate::client::core::{
Error, Result,
body::Incoming as IncomingBody,
dispatch::{self, TrySendError},
error::BoxError,
http1::Http1Options,
proto,
};
type Dispatcher<T, B> =
proto::dispatch::Dispatcher<proto::dispatch::Client<B>, B, T, proto::h1::ClientTransaction>;
pub struct SendRequest<B> {
dispatch: dispatch::Sender<Request<B>, Response<IncomingBody>>,
}
#[derive(Debug)]
#[non_exhaustive]
pub struct Parts<T> {
pub io: T,
pub read_buf: Bytes,
}
#[must_use = "futures do nothing unless polled"]
pub struct Connection<T, B>
where
T: AsyncRead + AsyncWrite,
B: Body + 'static,
{
inner: Dispatcher<T, B>,
}
impl<T, B> Connection<T, B>
where
T: AsyncRead + AsyncWrite + Unpin,
B: Body + 'static,
B::Error: Into<BoxError>,
{
pub fn into_parts(self) -> Parts<T> {
let (io, read_buf, _) = self.inner.into_inner();
Parts { io, read_buf }
}
}
#[derive(Clone, Debug)]
pub struct Builder {
opts: Http1Options,
}
impl<B> SendRequest<B> {
pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
self.dispatch.poll_ready(cx)
}
pub async fn ready(&mut self) -> Result<()> {
std::future::poll_fn(|cx| self.poll_ready(cx)).await
}
pub fn is_ready(&self) -> bool {
self.dispatch.is_ready()
}
}
impl<B> SendRequest<B>
where
B: Body + 'static,
{
pub fn try_send_request(
&mut self,
req: Request<B>,
) -> impl Future<Output = std::result::Result<Response<IncomingBody>, TrySendError<Request<B>>>>
{
let sent = self.dispatch.try_send(req);
async move {
match sent {
Ok(rx) => match rx.await {
Ok(Ok(res)) => Ok(res),
Ok(Err(err)) => Err(err),
Err(_) => panic!("dispatch dropped without returning error"),
},
Err(req) => {
debug!("connection was not ready");
let error = Error::new_canceled().with("connection was not ready");
Err(TrySendError {
error,
message: Some(req),
})
}
}
}
}
}
impl<B> fmt::Debug for SendRequest<B> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("SendRequest").finish()
}
}
impl<T, B> Connection<T, B>
where
T: AsyncRead + AsyncWrite + Unpin + Send,
B: Body + 'static,
B::Error: Into<BoxError>,
{
pub fn with_upgrades(self) -> upgrades::UpgradeableConnection<T, B> {
upgrades::UpgradeableConnection { inner: Some(self) }
}
}
impl<T, B> fmt::Debug for Connection<T, B>
where
T: AsyncRead + AsyncWrite + fmt::Debug,
B: Body + 'static,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Connection").finish()
}
}
impl<T, B> Future for Connection<T, B>
where
T: AsyncRead + AsyncWrite + Unpin,
B: Body + 'static,
B::Data: Send,
B::Error: Into<BoxError>,
{
type Output = Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match ready!(Pin::new(&mut self.inner).poll(cx))? {
proto::Dispatched::Shutdown => Poll::Ready(Ok(())),
proto::Dispatched::Upgrade(pending) => {
pending.manual();
Poll::Ready(Ok(()))
}
}
}
}
impl Default for Builder {
fn default() -> Self {
Self::new()
}
}
impl Builder {
#[inline]
pub fn new() -> Builder {
Builder {
opts: Default::default(),
}
}
#[inline]
pub fn options(&mut self, opts: Http1Options) {
self.opts = opts;
}
pub async fn handshake<T, B>(self, io: T) -> Result<(SendRequest<B>, Connection<T, B>)>
where
T: AsyncRead + AsyncWrite + Unpin,
B: Body + 'static,
B::Data: Send,
B::Error: Into<BoxError>,
{
trace!("client handshake HTTP/1");
let (tx, rx) = dispatch::channel();
let mut conn = proto::Conn::new(io);
let h1_parser_config = {
let mut h1_parser_config = ParserConfig::default();
h1_parser_config
.ignore_invalid_headers_in_responses(self.opts.ignore_invalid_headers_in_responses)
.allow_spaces_after_header_name_in_responses(
self.opts.allow_spaces_after_header_name_in_responses,
)
.allow_obsolete_multiline_headers_in_responses(
self.opts.allow_obsolete_multiline_headers_in_responses,
);
h1_parser_config
};
conn.set_h1_parser_config(h1_parser_config);
if let Some(writev) = self.opts.h1_writev {
if writev {
conn.set_write_strategy_queue();
} else {
conn.set_write_strategy_flatten();
}
}
if let Some(max_headers) = self.opts.h1_max_headers {
conn.set_http1_max_headers(max_headers);
}
if self.opts.h09_responses {
conn.set_h09_responses();
}
if let Some(sz) = self.opts.h1_read_buf_exact_size {
conn.set_read_buf_exact_size(sz);
}
if let Some(max) = self.opts.h1_max_buf_size {
conn.set_max_buf_size(max);
}
let cd = proto::h1::dispatch::Client::new(rx);
let proto = proto::h1::Dispatcher::new_with_config(
cd,
conn,
proto::h1::dispatch::PollConfig {
max_iterations: self
.opts
.h1_max_poll_iterations
.unwrap_or(proto::h1::dispatch::DEFAULT_MAX_POLL_ITERATIONS),
},
);
Ok((SendRequest { dispatch: tx }, Connection { inner: proto }))
}
}
mod upgrades {
use super::*;
use crate::client::core::upgrade::Upgraded;
#[must_use = "futures do nothing unless polled"]
pub struct UpgradeableConnection<T, B>
where
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
B: Body + 'static,
B::Error: Into<BoxError>,
{
pub(super) inner: Option<Connection<T, B>>,
}
impl<I, B> Future for UpgradeableConnection<I, B>
where
I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
B: Body + 'static,
B::Data: Send,
B::Error: Into<BoxError>,
{
type Output = Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match ready!(Pin::new(&mut self.inner.as_mut().unwrap().inner).poll(cx)) {
Ok(proto::Dispatched::Shutdown) => Poll::Ready(Ok(())),
Ok(proto::Dispatched::Upgrade(pending)) => {
let Parts { io, read_buf } = self.inner.take().unwrap().into_parts();
pending.fulfill(Upgraded::new(io, read_buf));
Poll::Ready(Ok(()))
}
Err(e) => Poll::Ready(Err(e)),
}
}
}
}