use std::{
fmt,
future::Future,
marker::PhantomData,
pin::Pin,
task::{Context, Poll, ready},
};
use http::{Request, Response};
use http_body::Body;
use tokio::io::{AsyncRead, AsyncWrite};
use crate::{
client::core::{
Result,
body::Incoming as IncomingBody,
dispatch::{self, TrySendError},
error::{BoxError, Error},
proto::{self, h2::ping},
rt::{ArcTimer, Time, Timer, bounds::Http2ClientConnExec},
},
http2::Http2Options,
};
pub struct SendRequest<B> {
dispatch: dispatch::UnboundedSender<Request<B>, Response<IncomingBody>>,
}
impl<B> Clone for SendRequest<B> {
fn clone(&self) -> SendRequest<B> {
SendRequest {
dispatch: self.dispatch.clone(),
}
}
}
#[must_use = "futures do nothing unless polled"]
pub struct Connection<T, B, E>
where
T: AsyncRead + AsyncWrite + Unpin,
B: Body + 'static,
E: Http2ClientConnExec<B, T> + Unpin,
B::Error: Into<BoxError>,
{
inner: (PhantomData<T>, proto::h2::ClientTask<B, E, T>),
}
#[derive(Clone)]
pub struct Builder<Ex> {
exec: Ex,
timer: Time,
opts: Http2Options,
}
impl<B> SendRequest<B> {
pub fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<()>> {
if self.is_closed() {
Poll::Ready(Err(Error::new_closed()))
} else {
Poll::Ready(Ok(()))
}
}
pub async fn ready(&mut self) -> Result<()> {
std::future::poll_fn(|cx| self.poll_ready(cx)).await
}
pub fn is_ready(&self) -> bool {
self.dispatch.is_ready()
}
pub fn is_closed(&self) -> bool {
self.dispatch.is_closed()
}
}
impl<B> SendRequest<B>
where
B: Body + 'static,
{
pub fn try_send_request(
&mut self,
req: Request<B>,
) -> impl Future<Output = std::result::Result<Response<IncomingBody>, TrySendError<Request<B>>>>
{
let sent = self.dispatch.try_send(req);
async move {
match sent {
Ok(rx) => match rx.await {
Ok(Ok(res)) => Ok(res),
Ok(Err(err)) => Err(err),
Err(_) => panic!("dispatch dropped without returning error"),
},
Err(req) => {
debug!("connection was not ready");
let error = Error::new_canceled().with("connection was not ready");
Err(TrySendError {
error,
message: Some(req),
})
}
}
}
}
}
impl<B> fmt::Debug for SendRequest<B> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("SendRequest").finish()
}
}
impl<T, B, E> fmt::Debug for Connection<T, B, E>
where
T: AsyncRead + AsyncWrite + fmt::Debug + 'static + Unpin,
B: Body + 'static,
E: Http2ClientConnExec<B, T> + Unpin,
B::Error: Into<BoxError>,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Connection").finish()
}
}
impl<T, B, E> Future for Connection<T, B, E>
where
T: AsyncRead + AsyncWrite + Unpin + 'static,
B: Body + 'static + Unpin,
B::Data: Send,
E: Unpin,
B::Error: Into<BoxError>,
E: Http2ClientConnExec<B, T> + Unpin,
{
type Output = Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match ready!(Pin::new(&mut self.inner.1).poll(cx))? {
proto::Dispatched::Shutdown => Poll::Ready(Ok(())),
proto::Dispatched::Upgrade(_pending) => unreachable!("http2 cannot upgrade"),
}
}
}
impl<Ex> Builder<Ex>
where
Ex: Clone,
{
#[inline]
pub fn new(exec: Ex) -> Builder<Ex> {
Builder {
exec,
timer: Time::Empty,
opts: Default::default(),
}
}
#[inline]
pub fn timer<M>(&mut self, timer: M)
where
M: Timer + Send + Sync + 'static,
{
self.timer = Time::Timer(ArcTimer::new(timer));
}
#[inline]
pub fn options(&mut self, opts: Http2Options) {
self.opts = opts;
}
pub async fn handshake<T, B>(self, io: T) -> Result<(SendRequest<B>, Connection<T, B, Ex>)>
where
T: AsyncRead + AsyncWrite + Unpin,
B: Body + 'static,
B::Data: Send,
B::Error: Into<BoxError>,
Ex: Http2ClientConnExec<B, T> + Unpin,
{
trace!("client handshake HTTP/2");
let builder = {
let mut builder = http2::client::Builder::default();
builder
.initial_max_send_streams(self.opts.initial_max_send_streams)
.initial_window_size(self.opts.initial_window_size)
.initial_connection_window_size(self.opts.initial_conn_window_size)
.max_send_buffer_size(self.opts.max_send_buffer_size);
if let Some(id) = self.opts.initial_stream_id {
builder.initial_stream_id(id);
}
if let Some(max) = self.opts.max_pending_accept_reset_streams {
builder.max_pending_accept_reset_streams(max);
}
if let Some(max) = self.opts.max_concurrent_reset_streams {
builder.max_concurrent_reset_streams(max);
}
if let Some(max) = self.opts.max_concurrent_streams {
builder.max_concurrent_streams(max);
}
if let Some(max) = self.opts.max_header_list_size {
builder.max_header_list_size(max);
}
if let Some(opt) = self.opts.enable_push {
builder.enable_push(opt);
}
if let Some(max) = self.opts.max_frame_size {
builder.max_frame_size(max);
}
if let Some(max) = self.opts.header_table_size {
builder.header_table_size(max);
}
if let Some(v) = self.opts.enable_connect_protocol {
builder.enable_connect_protocol(v);
}
if let Some(v) = self.opts.no_rfc7540_priorities {
builder.no_rfc7540_priorities(v);
}
if let Some(order) = self.opts.settings_order {
builder.settings_order(order);
}
if let Some(experimental_settings) = self.opts.experimental_settings {
builder.experimental_settings(experimental_settings);
}
if let Some(stream_dependency) = self.opts.headers_stream_dependency {
builder.headers_stream_dependency(stream_dependency);
}
if let Some(order) = self.opts.headers_pseudo_order {
builder.headers_pseudo_order(order);
}
if let Some(priority) = self.opts.priorities {
builder.priorities(priority);
}
builder
};
let ping_config = ping::Config::new(
self.opts.adaptive_window,
self.opts.initial_window_size,
self.opts.keep_alive_interval,
self.opts.keep_alive_timeout,
self.opts.keep_alive_while_idle,
);
let (tx, rx) = dispatch::channel();
let h2 = proto::h2::client::handshake(io, rx, builder, ping_config, self.exec, self.timer)
.await?;
Ok((
SendRequest {
dispatch: tx.unbound(),
},
Connection {
inner: (PhantomData, h2),
},
))
}
}