use std::{
future::Future,
pin::Pin,
task::{ready, Context, Poll},
};
use bytes::Bytes;
use http::{Request, Response};
use http_body::Body;
use httparse::ParserConfig;
use tokio::io::{AsyncRead, AsyncWrite};
use crate::{
body::Incoming,
dispatch::{self, TrySendError},
error::BoxError,
proto::{
self,
http1::{self, conn::Conn, role::Client, Http1Options},
},
Error, Result,
};
/// Request-submitting half of an HTTP/1 client connection.
///
/// Created by [`Builder::handshake`] together with the [`Connection`] that
/// owns the other end of the dispatch channel.
pub struct SendRequest<B> {
// Channel over which `Request<B>` values are handed to the dispatcher and
// `Response<Incoming>` values are handed back.
dispatch: dispatch::Sender<Request<B>, Response<Incoming>>,
}
/// Pieces obtained by dismantling a [`Connection`] with
/// [`Connection::into_parts`].
///
/// Marked `#[non_exhaustive]`, so code outside this crate cannot construct it
/// or match on it exhaustively.
#[derive(Debug)]
#[non_exhaustive]
pub struct Parts<T> {
/// The I/O object recovered from the connection's dispatcher.
pub io: T,
/// The buffer returned by the dispatcher together with `io`.
///
/// NOTE(review): presumably bytes already read from `io` but not yet
/// consumed by the HTTP/1 parser — confirm against the dispatcher.
pub read_buf: Bytes,
}
/// Future that drives the HTTP/1 client dispatcher of a single connection
/// over the I/O object `T`, with request bodies of type `B`.
///
/// Like every future, it does nothing unless it is polled.
#[must_use = "futures do nothing unless polled"]
pub struct Connection<T, B>
where
T: AsyncRead + AsyncWrite,
B: Body + 'static,
{
// HTTP/1 dispatcher specialised for the client role; it owns the I/O
// object and the receiving end of the request channel.
inner: http1::dispatch::Dispatcher<http1::dispatch::Client<B>, B, T, Client>,
}
impl<T, B> Connection<T, B>
where
    T: AsyncRead + AsyncWrite + Unpin,
    B: Body + 'static,
    B::Error: Into<BoxError>,
{
    /// Dismantles the connection and returns its [`Parts`]: the I/O object and
    /// the read buffer handed back by the dispatcher.
    ///
    /// The third value returned by the dispatcher is discarded.
    #[inline]
    pub fn into_parts(self) -> Parts<T> {
        let (transport, buffered, _) = self.inner.into_inner();
        Parts {
            io: transport,
            read_buf: buffered,
        }
    }
}
/// Builder that configures an HTTP/1 client connection and creates it with
/// [`Builder::handshake`].
#[derive(Debug, Default, Clone)]
pub struct Builder {
// HTTP/1 options applied to the connection when `handshake` runs.
opts: Http1Options,
}
impl<B> SendRequest<B> {
    /// Polls the dispatch channel to find out whether it can accept a request.
    ///
    /// The result is whatever the underlying dispatch sender reports.
    #[inline]
    pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
        let sender = &mut self.dispatch;
        sender.poll_ready(cx)
    }

    /// Waits until the dispatch channel reports readiness (or an error).
    ///
    /// This is the `async` counterpart of [`SendRequest::poll_ready`].
    #[inline]
    pub async fn ready(&mut self) -> Result<()> {
        let until_ready = std::future::poll_fn(|cx| self.dispatch.poll_ready(cx));
        until_ready.await
    }

    /// Returns the dispatch channel's current readiness without polling it.
    #[inline]
    pub fn is_ready(&self) -> bool {
        let sender = &self.dispatch;
        sender.is_ready()
    }
}
impl<B> SendRequest<B>
where
    B: Body + 'static,
{
    /// Hands `req` to the dispatcher and returns a future resolving to the
    /// outcome.
    ///
    /// The request is offered to the dispatch channel immediately, before the
    /// returned future is polled.
    ///
    /// # Errors
    ///
    /// If the channel does not accept the request, the future resolves to a
    /// [`TrySendError`] holding a canceled error and the original request in
    /// `message`. Otherwise the future resolves to whatever the dispatcher
    /// sends back for this request.
    ///
    /// # Panics
    ///
    /// The returned future panics if the response channel is dropped without
    /// delivering a result.
    pub fn try_send_request(
        &mut self,
        req: Request<B>,
    ) -> impl Future<Output = Result<Response<Incoming>, TrySendError<Request<B>>>> {
        let attempt = self.dispatch.try_send(req);
        async move {
            // Guard: the channel refused the request, so give it back.
            let response_rx = match attempt {
                Ok(rx) => rx,
                Err(rejected) => {
                    debug!("connection was not ready");
                    return Err(TrySendError {
                        error: Error::new_canceled().with("connection was not ready"),
                        message: Some(rejected),
                    });
                }
            };

            match response_rx.await {
                Ok(outcome) => outcome,
                // The dispatcher is expected to always answer before dropping
                // its end of the channel.
                Err(_) => panic!("dispatch dropped without returning error"),
            }
        }
    }
}
impl<T, B> Connection<T, B>
where
    T: AsyncRead + AsyncWrite + Unpin + Send,
    B: Body + 'static,
    B::Error: Into<BoxError>,
{
    /// Wraps this connection in an [`upgrades::UpgradeableConnection`], whose
    /// future fulfills a pending upgrade with the connection's I/O object and
    /// read buffer instead of leaving it to the caller.
    #[inline]
    pub fn with_upgrades(self) -> upgrades::UpgradeableConnection<T, B> {
        let inner = Some(self);
        upgrades::UpgradeableConnection { inner }
    }

    /// Polls the dispatcher through its `poll_without_shutdown` entry point
    /// and returns its result unchanged.
    pub fn poll_without_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
        let dispatcher = &mut self.inner;
        dispatcher.poll_without_shutdown(cx)
    }

    /// Drives [`Connection::poll_without_shutdown`] to completion and then
    /// returns the connection's [`Parts`].
    ///
    /// # Errors
    ///
    /// Returns the error reported by the dispatcher, if any.
    pub async fn without_shutdown(self) -> crate::Result<Parts<T>> {
        // Kept in an `Option` so the connection can be moved out of the
        // `FnMut` closure once the dispatcher has finished.
        let mut slot = Some(self);
        let finished = std::future::poll_fn(move |cx| -> Poll<crate::Result<Parts<T>>> {
            let conn = slot.as_mut().unwrap();
            match conn.poll_without_shutdown(cx) {
                Poll::Pending => Poll::Pending,
                Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
                Poll::Ready(Ok(())) => {
                    let parts = slot.take().unwrap().into_parts();
                    Poll::Ready(Ok(parts))
                }
            }
        });
        finished.await
    }
}
impl<T, B> Future for Connection<T, B>
where
    T: AsyncRead + AsyncWrite + Unpin,
    B: Body + 'static,
    B::Data: Send,
    B::Error: Into<BoxError>,
{
    type Output = Result<()>;

    /// Polls the inner dispatcher until it finishes.
    ///
    /// Resolves to `Ok(())` both when the dispatcher shuts down and when it
    /// reports an upgrade; dispatcher errors are passed through.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let dispatched = match Pin::new(&mut self.inner).poll(cx) {
            Poll::Pending => return Poll::Pending,
            Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
            Poll::Ready(Ok(dispatched)) => dispatched,
        };

        match dispatched {
            proto::Dispatched::Shutdown => {}
            proto::Dispatched::Upgrade(pending) => {
                // This future does not hand the I/O object over itself.
                // NOTE(review): `manual()` presumably tells the waiting side
                // that the upgrade must be completed manually — confirm.
                pending.manual();
            }
        }
        Poll::Ready(Ok(()))
    }
}
impl Builder {
    /// Replaces the builder's HTTP/1 options with `opts` and returns the
    /// builder for chaining.
    #[inline]
    pub fn options(mut self, opts: Http1Options) -> Self {
        self.opts = opts;
        self
    }

    /// Builds an HTTP/1 client connection over `io` using the configured
    /// options.
    ///
    /// Returns the [`SendRequest`] used to submit requests and the
    /// [`Connection`] future that owns the dispatcher. The body of this
    /// function contains no await points; it only assembles and configures
    /// the connection state.
    pub async fn handshake<T, B>(self, io: T) -> Result<(SendRequest<B>, Connection<T, B>)>
    where
        T: AsyncRead + AsyncWrite + Unpin,
        B: Body + 'static,
        B::Data: Send,
        B::Error: Into<BoxError>,
    {
        trace!("client handshake HTTP/1");

        let (request_tx, request_rx) = dispatch::channel();
        let mut conn = Conn::new(io);

        // Response-parsing leniency switches, copied from the options.
        let mut parser_config = ParserConfig::default();
        parser_config
            .ignore_invalid_headers_in_responses(self.opts.ignore_invalid_headers_in_responses);
        parser_config.allow_spaces_after_header_name_in_responses(
            self.opts.allow_spaces_after_header_name_in_responses,
        );
        parser_config.allow_obsolete_multiline_headers_in_responses(
            self.opts.allow_obsolete_multiline_headers_in_responses,
        );
        conn.set_h1_parser_config(parser_config);

        // An unset `h1_writev` leaves the connection's write strategy as is.
        match self.opts.h1_writev {
            Some(true) => conn.set_write_strategy_queue(),
            Some(false) => conn.set_write_strategy_flatten(),
            None => {}
        }

        if let Some(limit) = self.opts.h1_max_headers {
            conn.set_http1_max_headers(limit);
        }

        if self.opts.h09_responses {
            conn.set_h09_responses();
        }

        if let Some(exact) = self.opts.h1_read_buf_exact_size {
            conn.set_read_buf_exact_size(exact);
        }

        if let Some(limit) = self.opts.h1_max_buf_size {
            conn.set_max_buf_size(limit);
        }

        let client = http1::dispatch::Client::new(request_rx);
        let dispatcher = http1::dispatch::Dispatcher::new(client, conn);

        let sender = SendRequest {
            dispatch: request_tx,
        };
        let connection = Connection { inner: dispatcher };
        Ok((sender, connection))
    }
}
mod upgrades {
    use super::*;
    use crate::upgrade::Upgraded;

    /// A [`Connection`] wrapper whose future completes pending upgrades
    /// itself, created by [`Connection::with_upgrades`].
    ///
    /// Like every future, it does nothing unless it is polled.
    #[must_use = "futures do nothing unless polled"]
    pub struct UpgradeableConnection<T, B>
    where
        T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
        B: Body + 'static,
        B::Error: Into<BoxError>,
    {
        // `Some` until an upgrade consumes the connection for its parts.
        pub(super) inner: Option<Connection<T, B>>,
    }

    impl<I, B> Future for UpgradeableConnection<I, B>
    where
        I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
        B: Body + 'static,
        B::Data: Send,
        B::Error: Into<BoxError>,
    {
        type Output = Result<()>;

        /// Polls the wrapped connection's dispatcher until it finishes.
        ///
        /// When the dispatcher reports an upgrade, the connection is taken
        /// apart and the pending upgrade is fulfilled with an [`Upgraded`]
        /// built from its I/O object and read buffer. Dispatcher errors are
        /// passed through.
        ///
        /// # Panics
        ///
        /// Panics if polled after the connection has been consumed by an
        /// upgrade.
        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let conn = self.inner.as_mut().unwrap();
            let dispatched = ready!(Pin::new(&mut conn.inner).poll(cx))?;

            match dispatched {
                proto::Dispatched::Shutdown => {}
                proto::Dispatched::Upgrade(pending) => {
                    let Parts { io, read_buf } = self.inner.take().unwrap().into_parts();
                    pending.fulfill(Upgraded::new(io, read_buf));
                }
            }
            Poll::Ready(Ok(()))
        }
    }
}