use std::{fmt, future::Future};
use futures_core::future::BoxFuture;
use futures_util::FutureExt as _;
use http::Response;
use hyper::body::Incoming;
use thiserror::Error;
pub use crate::client::pool::key::UriError;
use crate::client::pool::PoolableConnection;
/// Abstraction over a client-side HTTP connection.
///
/// An implementor can report whether it is ready, send a request and hand
/// back a future for the response, and state which HTTP version it uses.
pub trait Connection {
    /// Body type of the responses produced by this connection.
    type ResBody: http_body::Body + Send + 'static;

    /// Error type produced by readiness polling and by sent requests.
    type Error: std::error::Error + Send + Sync + 'static;

    /// Future returned by [`Connection::send_request`]; it resolves to the
    /// response or to an error.
    type Future: Future<Output = Result<::http::Response<Self::ResBody>, Self::Error>>
        + Send
        + 'static;

    /// The HTTP version used by this connection.
    fn version(&self) -> ::http::Version;

    /// Poll the connection's readiness.
    ///
    /// Yields `Poll::Ready(Ok(()))` once ready, `Poll::Ready(Err(_))` on
    /// failure, and `Poll::Pending` otherwise.
    fn poll_ready(
        &mut self,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), Self::Error>>;

    /// Send `request` over this connection and return the response future.
    fn send_request(&mut self, request: crate::body::Request) -> Self::Future;

    /// Return a boxed future that drives [`Connection::poll_ready`] and
    /// resolves with its first non-pending result.
    ///
    /// The future mutably borrows the connection for its whole lifetime.
    /// `Self: Send` is required because the boxed future must be `Send`
    /// and it holds `&mut Self`.
    fn when_ready(&mut self) -> BoxFuture<'_, Result<(), Self::Error>>
    where
        Self: Send,
    {
        let readiness = futures_util::future::poll_fn(move |cx| self.poll_ready(cx));
        Box::pin(readiness)
    }
}
/// A client HTTP connection backed by a hyper request sender.
///
/// The protocol-specific sender (HTTP/1 or HTTP/2) is kept private inside
/// `inner`, so callers handle one type regardless of protocol.
pub struct HttpConnection {
/// Protocol-specific hyper sender.
inner: InnerConnection,
}
impl HttpConnection {
    /// Build a connection around a hyper HTTP/1 request sender.
    pub(super) fn h1(conn: hyper::client::conn::http1::SendRequest<crate::body::Body>) -> Self {
        let inner = InnerConnection::H1(conn);
        Self { inner }
    }

    /// Build a connection around a hyper HTTP/2 request sender.
    pub(super) fn h2(conn: hyper::client::conn::http2::SendRequest<crate::body::Body>) -> Self {
        let inner = InnerConnection::H2(conn);
        Self { inner }
    }
}
/// Debug output shows only the HTTP version; the underlying hyper sender
/// is not printed.
impl fmt::Debug for HttpConnection {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let version = self.version();
        let mut repr = f.debug_struct("HttpConnection");
        repr.field("version", &version);
        repr.finish()
    }
}
/// Protocol-specific request sender held by `HttpConnection`.
enum InnerConnection {
/// hyper HTTP/2 request sender.
H2(hyper::client::conn::http2::SendRequest<crate::body::Body>),
/// hyper HTTP/1 request sender.
H1(hyper::client::conn::http1::SendRequest<crate::body::Body>),
}
impl Connection for HttpConnection {
    type ResBody = Incoming;
    type Error = hyper::Error;
    type Future = BoxFuture<'static, Result<Response<Incoming>, hyper::Error>>;

    /// Stamp `request` with this connection's HTTP version, then pass it to
    /// the underlying hyper sender and box the resulting future.
    fn send_request(&mut self, mut request: crate::body::Request) -> Self::Future {
        // `version()` yields HTTP/2 for the H2 sender and HTTP/1.1 for the
        // H1 sender, so the request always matches the wire protocol.
        *request.version_mut() = self.version();

        match &mut self.inner {
            InnerConnection::H2(sender) => sender.send_request(request).boxed(),
            InnerConnection::H1(sender) => sender.send_request(request).boxed(),
        }
    }

    /// Delegate readiness polling to the underlying hyper sender.
    fn poll_ready(
        &mut self,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), Self::Error>> {
        let Self { inner } = self;
        match inner {
            InnerConnection::H2(sender) => sender.poll_ready(cx),
            InnerConnection::H1(sender) => sender.poll_ready(cx),
        }
    }

    /// HTTP/2 for the H2 sender, HTTP/1.1 for the H1 sender.
    fn version(&self) -> http::Version {
        match self.inner {
            InnerConnection::H1(_) => http::Version::HTTP_11,
            InnerConnection::H2(_) => http::Version::HTTP_2,
        }
    }
}
impl PoolableConnection for HttpConnection {
    /// Report the underlying hyper sender's `is_ready()` state.
    fn is_open(&self) -> bool {
        match &self.inner {
            InnerConnection::H2(sender) => sender.is_ready(),
            InnerConnection::H1(sender) => sender.is_ready(),
        }
    }

    /// `true` for HTTP/2 connections, `false` for HTTP/1 connections.
    fn can_share(&self) -> bool {
        matches!(self.inner, InnerConnection::H2(_))
    }

    /// Produce another handle to this connection when it can be shared.
    ///
    /// For HTTP/2 this clones the sender and wraps it in a new
    /// `HttpConnection`; for HTTP/1 it returns `None`.
    fn reuse(&mut self) -> Option<Self> {
        if let InnerConnection::H2(sender) = &self.inner {
            Some(Self {
                inner: InnerConnection::H2(sender.clone()),
            })
        } else {
            None
        }
    }
}
/// Errors reported for a connection.
///
/// `Display` and `std::error::Error` are derived with `thiserror`; the
/// attributes on each variant set its message and its error source.
#[derive(Debug, Error)]
pub enum ConnectionError {
/// Error forwarded transparently: `Display` and `source()` are delegated
/// to the boxed inner error.
#[error(transparent)]
Connecting(Box<dyn std::error::Error + Send + Sync + 'static>),
/// Handshake error. The message is `handshake: ` followed by the inner
/// error's message, and the inner error is also exposed as the source.
// NOTE(review): the inner error appears both in the message and as the
// source, so chain-printing reporters may show it twice — confirm intended.
#[error("handshake: {0}")]
Handshake(#[source] Box<dyn std::error::Error + Send + Sync + 'static>),
/// Carries a `hyper::Error` as the source; displayed as
/// "connection cancelled".
#[error("connection cancelled")]
Canceled(#[source] hyper::Error),
/// Carries a `hyper::Error` as the source; displayed as
/// "connection closed".
#[error("connection closed")]
Closed(#[source] hyper::Error),
/// Timeout with no further detail attached.
#[error("connection timeout")]
Timeout,
/// Wraps a `UriError`; `#[from]` generates `From<UriError>` so `?`
/// converts it automatically.
#[error("invalid URI")]
InvalidUri(#[from] UriError),
}