use std::future::Future;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use std::time::Duration;
use http::Request;
use http::Response;
use http::Uri;
use super::{BoxFuture, ClientBody, ClientTransport};
use crate::error::ConnectError;
type BoxError = Box<dyn std::error::Error + Send + Sync>;
#[cfg(feature = "client-tls")]
use std::sync::Arc;
/// Marker trait bundling the I/O bounds required of a transport stream
/// (hyper `Read` + `Write`, `Send`, `Unpin`) so it can be used as `dyn H2Io`.
trait H2Io: hyper::rt::Read + hyper::rt::Write + Send + Unpin {}
// Blanket impl: every type that satisfies the bounds is an `H2Io`.
impl<T: hyper::rt::Read + hyper::rt::Write + Send + Unpin> H2Io for T {}
/// Type-erased, pinned I/O stream passed to the HTTP/2 handshake.
type BoxedIo = Pin<Box<dyn H2Io>>;
/// Type-erased connector: a boxed service mapping a `Uri` to a [`BoxedIo`].
type BoxedConnector = tower::util::BoxService<Uri, BoxedIo, BoxError>;
/// Erases the concrete type of a user-supplied connector.
///
/// The connector's I/O output is pinned and boxed into a [`BoxedIo`], its
/// error is converted into a [`BoxError`], and the resulting service is
/// wrapped in a `tower::util::BoxService`.
fn box_connector<C>(connector: C) -> BoxedConnector
where
    C: tower::Service<Uri> + Send + 'static,
    C::Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin + 'static,
    C::Error: Into<BoxError>,
    C::Future: Send + 'static,
{
    use tower::ServiceExt;

    // Step 1: unify the I/O type. Step 2: unify the error type.
    let with_boxed_io = connector.map_response(|stream| Box::pin(stream) as BoxedIo);
    let type_erased = with_boxed_io.map_err(Into::<BoxError>::into);
    tower::util::BoxService::new(type_erased)
}
/// Builds a connector service that dials the Unix domain socket at `path`.
///
/// The `Uri` passed to the service is ignored; every call connects to the
/// same socket path. Connection failures are reported as
/// `ConnectError::unavailable` with the socket path in the message.
#[cfg(unix)]
fn unix_connector(
    path: std::path::PathBuf,
) -> impl tower::Service<
    Uri,
    Response = hyper_util::rt::TokioIo<tokio::net::UnixStream>,
    Error = ConnectError,
    Future: Send + 'static,
> + Send
+ 'static {
    let dial = move |_uri: Uri| {
        // Each dial owns its own copy of the path so the future is 'static.
        let socket = path.clone();
        async move {
            match tokio::net::UnixStream::connect(&socket).await {
                Ok(stream) => Ok(hyper_util::rt::TokioIo::new(stream)),
                Err(e) => Err(ConnectError::unavailable(format!(
                    "unix socket connect to {} failed: {e}",
                    socket.display()
                ))),
            }
        }
    };
    tower::service_fn(dial)
}
/// Returns a copy of `config` whose ALPN protocol list is exactly `["h2"]`.
///
/// The caller's configuration is cloned, not mutated; any ALPN protocols it
/// previously listed are replaced.
#[cfg(feature = "client-tls")]
fn prepare_tls_for_h2(config: &Arc<rustls::ClientConfig>) -> Arc<rustls::ClientConfig> {
    // `&Arc<ClientConfig>` deref-coerces to `&ClientConfig` here, so this
    // clones the configuration itself rather than bumping the refcount.
    let mut h2_only = rustls::ClientConfig::clone(config);
    h2_only.alpn_protocols = vec![b"h2".to_vec()];
    Arc::new(h2_only)
}
/// Derives the TLS server name from the host component of `uri`.
///
/// A host wrapped in square brackets (IPv6 literal form) has the brackets
/// removed before parsing.
///
/// # Errors
///
/// Returns `ConnectError::invalid_argument` when the URI has no host or the
/// host cannot be parsed as a `ServerName`.
#[cfg(feature = "client-tls")]
fn server_name_from_uri(uri: &Uri) -> Result<rustls_pki_types::ServerName<'static>, ConnectError> {
    let Some(host) = uri.host() else {
        return Err(ConnectError::invalid_argument(
            "URI must have a host for TLS server name resolution",
        ));
    };
    // Only strip when BOTH brackets are present; otherwise keep the host as is.
    let bare = match host.strip_prefix('[') {
        Some(rest) => rest.strip_suffix(']').unwrap_or(host),
        None => host,
    };
    match rustls_pki_types::ServerName::try_from(bare.to_owned()) {
        Ok(name) => Ok(name),
        Err(e) => Err(ConnectError::invalid_argument(format!(
            "invalid TLS server name '{host}': {e}"
        ))),
    }
}
/// Checks that `uri` uses the `https` scheme.
///
/// # Errors
///
/// Returns `ConnectError::invalid_argument` for `http`, for a missing scheme,
/// and for any other scheme (the latter with the scheme named in the message).
#[cfg(feature = "client-tls")]
fn require_https_scheme(uri: &Uri) -> Result<(), ConnectError> {
    let scheme = uri.scheme_str();
    if scheme == Some("https") {
        return Ok(());
    }
    let rejection = match scheme {
        Some(other) if other != "http" => {
            ConnectError::invalid_argument(format!("unsupported URI scheme: {other}"))
        }
        // `http` or no scheme at all: point the caller at the plaintext API.
        _ => ConnectError::invalid_argument(
            "Http2Connection TLS constructors require https:// scheme; \
             use connect_plaintext/lazy_plaintext for http://",
        ),
    };
    Err(rejection)
}
/// An HTTP/2 client connection to a single URI.
///
/// All work is delegated to an internal `Reconnect` state machine that
/// (re)establishes the underlying connection from `poll_ready`.
pub struct Http2Connection {
/// Reconnecting wrapper around the service that creates HTTP/2 senders.
inner: Reconnect<MakeSendRequest>,
}
/// Debug output shows the target URI, the name of the current reconnect
/// state, and whether a connection has ever been established.
impl std::fmt::Debug for Http2Connection {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let reconnect = &self.inner;
        // Only the state's name is printed; its payload is not inspected.
        let phase = match &reconnect.state {
            ReconnectState::Connected(_) => "Connected",
            ReconnectState::Connecting(_) => "Connecting",
            ReconnectState::Idle => "Idle",
        };
        let mut out = f.debug_struct("Http2Connection");
        out.field("uri", &reconnect.uri);
        out.field("state", &phase);
        out.field("has_connected", &reconnect.has_connected);
        out.finish()
    }
}
/// Checks that `uri` is usable for a plaintext connection.
///
/// `http` and a missing scheme are both accepted.
///
/// # Errors
///
/// Returns `ConnectError::invalid_argument` for `https` (pointing at the TLS
/// constructors) and for any other scheme.
fn require_http_scheme(uri: &Uri) -> Result<(), ConnectError> {
    let Some(scheme) = uri.scheme_str() else {
        // A scheme-less URI is treated as plaintext.
        return Ok(());
    };
    if scheme == "http" {
        return Ok(());
    }
    let rejection = match scheme {
        "https" => ConnectError::invalid_argument(
            "Http2Connection plaintext constructors require http:// scheme; \
             use connect_tls/lazy_tls for https://",
        ),
        other => ConnectError::invalid_argument(format!("unsupported URI scheme: {other}")),
    };
    Err(rejection)
}
impl Http2Connection {
pub fn builder() -> Http2ConnectionBuilder {
Http2ConnectionBuilder::default()
}
#[must_use]
pub fn lazy_plaintext(uri: Uri) -> Self {
Self::builder().lazy_plaintext(uri)
}
pub async fn connect_plaintext(uri: Uri) -> Result<Self, ConnectError> {
Self::builder().connect_plaintext(uri).await
}
#[deprecated(
since = "0.8.0",
note = "use `Http2Connection::builder()` and configure via the proxied \
keep-alive/window setters or `h2_settings(|b| ...)`"
)]
#[must_use]
pub fn with_builder_plaintext(
uri: Uri,
mut builder: hyper::client::conn::http2::Builder<hyper_util::rt::TokioExecutor>,
) -> Self {
builder.timer(hyper_util::rt::TokioTimer::new());
let mut b = Self::builder();
b.h2_builder = builder;
b.lazy_plaintext(uri)
}
#[must_use]
pub fn lazy_with_connector<C>(connector: C, authority: Uri) -> Self
where
C: tower::Service<Uri> + Send + 'static,
C::Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin + 'static,
C::Error: Into<BoxError>,
C::Future: Send + 'static,
{
Self::builder().lazy_with_connector(connector, authority)
}
pub async fn connect_with_connector<C>(
connector: C,
authority: Uri,
) -> Result<Self, ConnectError>
where
C: tower::Service<Uri> + Send + 'static,
C::Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin + 'static,
C::Error: Into<BoxError>,
C::Future: Send + 'static,
{
Self::builder()
.connect_with_connector(connector, authority)
.await
}
#[cfg(unix)]
#[cfg_attr(docsrs, doc(cfg(unix)))]
#[must_use]
pub fn lazy_unix(path: impl Into<std::path::PathBuf>, authority: Uri) -> Self {
Self::builder().lazy_unix(path, authority)
}
#[cfg(unix)]
#[cfg_attr(docsrs, doc(cfg(unix)))]
pub async fn connect_unix(
path: impl Into<std::path::PathBuf>,
authority: Uri,
) -> Result<Self, ConnectError> {
Self::builder().connect_unix(path, authority).await
}
#[cfg(feature = "client-tls")]
#[cfg_attr(docsrs, doc(cfg(feature = "client-tls")))]
#[must_use]
pub fn lazy_tls(uri: Uri, tls_config: Arc<rustls::ClientConfig>) -> Self {
Self::builder().lazy_tls(uri, tls_config)
}
#[cfg(feature = "client-tls")]
#[cfg_attr(docsrs, doc(cfg(feature = "client-tls")))]
pub async fn connect_tls(
uri: Uri,
tls_config: Arc<rustls::ClientConfig>,
) -> Result<Self, ConnectError> {
Self::builder().connect_tls(uri, tls_config).await
}
#[deprecated(
since = "0.8.0",
note = "use `Http2Connection::builder()` and configure via the proxied \
keep-alive/window setters or `h2_settings(|b| ...)`"
)]
#[cfg(feature = "client-tls")]
#[cfg_attr(docsrs, doc(cfg(feature = "client-tls")))]
#[must_use]
pub fn with_builder_tls(
uri: Uri,
mut builder: hyper::client::conn::http2::Builder<hyper_util::rt::TokioExecutor>,
tls_config: Arc<rustls::ClientConfig>,
) -> Self {
builder.timer(hyper_util::rt::TokioTimer::new());
let mut b = Self::builder();
b.h2_builder = builder;
b.lazy_tls(uri, tls_config)
}
}
/// Configuration for an [`Http2Connection`]: connect/establishment timeouts
/// plus the hyper HTTP/2 client builder used for the handshake.
#[derive(Debug, Clone)]
#[must_use = "call a lazy_*/connect_* terminal to build the connection"]
pub struct Http2ConnectionBuilder {
/// Timeout passed to the built-in `HttpConnector`; `None` disables it.
tcp_connect_timeout: Option<Duration>,
/// Upper bound on the whole establishment future; `None` disables it.
establishment_timeout: Option<Duration>,
/// hyper HTTP/2 client builder cloned for every handshake.
h2_builder: hyper::client::conn::http2::Builder<hyper_util::rt::TokioExecutor>,
}
/// Establishment timeout used by `Http2ConnectionBuilder::default()`: 20 seconds.
pub const DEFAULT_ESTABLISHMENT_TIMEOUT: Duration = Duration::from_secs(20);
/// TCP connect timeout used by `Http2ConnectionBuilder::default()`: 5 seconds.
pub const DEFAULT_TCP_CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
/// Maps the sentinel `Duration::MAX` to `None` ("no timeout"); every other
/// duration is returned unchanged inside `Some`.
pub(super) fn finite(dur: Duration) -> Option<Duration> {
    if dur == Duration::MAX {
        None
    } else {
        Some(dur)
    }
}
/// Default configuration: both default timeouts enabled and a hyper HTTP/2
/// builder running on the Tokio executor with a Tokio timer installed.
impl Default for Http2ConnectionBuilder {
    fn default() -> Self {
        let executor = hyper_util::rt::TokioExecutor::new();
        let mut h2_builder = hyper::client::conn::http2::Builder::new(executor);
        h2_builder.timer(hyper_util::rt::TokioTimer::new());
        Self {
            h2_builder,
            establishment_timeout: Some(DEFAULT_ESTABLISHMENT_TIMEOUT),
            tcp_connect_timeout: Some(DEFAULT_TCP_CONNECT_TIMEOUT),
        }
    }
}
/// Configuration setters followed by the `lazy_*` / `connect_*` terminals
/// that consume the builder and produce an [`Http2Connection`].
impl Http2ConnectionBuilder {
    /// Sets the timeout passed to the built-in TCP connector.
    /// `Duration::MAX` means "no timeout".
    pub fn tcp_connect_timeout(self, dur: Duration) -> Self {
        Self {
            tcp_connect_timeout: finite(dur),
            ..self
        }
    }

    /// Disables the TCP connect timeout.
    pub fn no_tcp_connect_timeout(self) -> Self {
        Self {
            tcp_connect_timeout: None,
            ..self
        }
    }

    /// Sets the upper bound on the whole establishment future (dial, optional
    /// TLS handshake, HTTP/2 handshake). `Duration::MAX` means "no timeout".
    pub fn establishment_timeout(self, dur: Duration) -> Self {
        Self {
            establishment_timeout: finite(dur),
            ..self
        }
    }

    /// Disables the establishment timeout.
    pub fn no_establishment_timeout(self) -> Self {
        Self {
            establishment_timeout: None,
            ..self
        }
    }

    /// Forwards to hyper's HTTP/2 `Builder::keep_alive_interval`.
    pub fn keep_alive_interval(self, interval: Duration) -> Self {
        self.h2_settings(|h2| {
            h2.keep_alive_interval(interval);
        })
    }

    /// Forwards to hyper's HTTP/2 `Builder::keep_alive_timeout`.
    pub fn keep_alive_timeout(self, timeout: Duration) -> Self {
        self.h2_settings(|h2| {
            h2.keep_alive_timeout(timeout);
        })
    }

    /// Forwards to hyper's HTTP/2 `Builder::keep_alive_while_idle`.
    pub fn keep_alive_while_idle(self, enabled: bool) -> Self {
        self.h2_settings(|h2| {
            h2.keep_alive_while_idle(enabled);
        })
    }

    /// Forwards to hyper's HTTP/2 `Builder::initial_stream_window_size`.
    pub fn initial_stream_window_size(self, size: u32) -> Self {
        self.h2_settings(|h2| {
            h2.initial_stream_window_size(size);
        })
    }

    /// Forwards to hyper's HTTP/2 `Builder::initial_connection_window_size`.
    pub fn initial_connection_window_size(self, size: u32) -> Self {
        self.h2_settings(|h2| {
            h2.initial_connection_window_size(size);
        })
    }

    /// Forwards to hyper's HTTP/2 `Builder::adaptive_window`.
    pub fn adaptive_window(self, enabled: bool) -> Self {
        self.h2_settings(|h2| {
            h2.adaptive_window(enabled);
        })
    }

    /// Gives `f` mutable access to the underlying hyper HTTP/2 builder for
    /// settings that have no dedicated setter here.
    pub fn h2_settings(
        mut self,
        f: impl FnOnce(&mut hyper::client::conn::http2::Builder<hyper_util::rt::TokioExecutor>),
    ) -> Self {
        let h2 = &mut self.h2_builder;
        f(h2);
        self
    }

    /// Builds a plaintext connection without connecting. The scheme of `uri`
    /// is checked when a connection attempt is made, not here.
    #[must_use]
    pub fn lazy_plaintext(self, uri: Uri) -> Http2Connection {
        let make = self.make_plaintext();
        Http2Connection {
            inner: Reconnect::new(make, uri, true),
        }
    }

    /// Builds a plaintext connection and waits until it is established.
    ///
    /// # Errors
    ///
    /// Returns an invalid-argument error if `uri` is not acceptable for
    /// plaintext, or an unavailable error if establishment fails.
    pub async fn connect_plaintext(self, uri: Uri) -> Result<Http2Connection, ConnectError> {
        require_http_scheme(&uri)?;
        let make = self.make_plaintext();
        let mut connection = Http2Connection {
            inner: Reconnect::new(make, uri, false),
        };
        let outcome = drive_connect(&mut connection, "connect failed").await;
        outcome.map(|()| connection)
    }

    /// Builds a TLS connection without connecting. The scheme of `uri` is
    /// checked when a connection attempt is made, not here.
    #[cfg(feature = "client-tls")]
    #[cfg_attr(docsrs, doc(cfg(feature = "client-tls")))]
    #[must_use]
    pub fn lazy_tls(self, uri: Uri, tls_config: Arc<rustls::ClientConfig>) -> Http2Connection {
        let make = self.make_tls(tls_config);
        Http2Connection {
            inner: Reconnect::new(make, uri, true),
        }
    }

    /// Builds a TLS connection and waits until it is established.
    ///
    /// # Errors
    ///
    /// Returns an invalid-argument error if `uri` is not `https`, or an
    /// unavailable error if establishment fails.
    #[cfg(feature = "client-tls")]
    #[cfg_attr(docsrs, doc(cfg(feature = "client-tls")))]
    pub async fn connect_tls(
        self,
        uri: Uri,
        tls_config: Arc<rustls::ClientConfig>,
    ) -> Result<Http2Connection, ConnectError> {
        require_https_scheme(&uri)?;
        let make = self.make_tls(tls_config);
        let mut connection = Http2Connection {
            inner: Reconnect::new(make, uri, false),
        };
        let outcome = drive_connect(&mut connection, "TLS connect failed").await;
        outcome.map(|()| connection)
    }

    /// Builds a lazy connection whose byte stream is produced by `connector`.
    /// `authority` is the URI passed to the connector on every dial.
    #[must_use]
    pub fn lazy_with_connector<C>(self, connector: C, authority: Uri) -> Http2Connection
    where
        C: tower::Service<Uri> + Send + 'static,
        C::Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin + 'static,
        C::Error: Into<BoxError>,
        C::Future: Send + 'static,
    {
        let make = self.make_custom(box_connector(connector));
        Http2Connection {
            inner: Reconnect::new(make, authority, true),
        }
    }

    /// Like [`Self::lazy_with_connector`], but waits until the connection is
    /// established.
    ///
    /// # Errors
    ///
    /// Returns an unavailable error if the connector or the HTTP/2 handshake
    /// fails, or the establishment timeout elapses.
    pub async fn connect_with_connector<C>(
        self,
        connector: C,
        authority: Uri,
    ) -> Result<Http2Connection, ConnectError>
    where
        C: tower::Service<Uri> + Send + 'static,
        C::Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin + 'static,
        C::Error: Into<BoxError>,
        C::Future: Send + 'static,
    {
        let make = self.make_custom(box_connector(connector));
        let mut connection = Http2Connection {
            inner: Reconnect::new(make, authority, false),
        };
        let outcome = drive_connect(&mut connection, "connect failed").await;
        outcome.map(|()| connection)
    }

    /// Builds a lazy connection over the Unix domain socket at `path`.
    #[cfg(unix)]
    #[cfg_attr(docsrs, doc(cfg(unix)))]
    #[must_use]
    pub fn lazy_unix(self, path: impl Into<std::path::PathBuf>, authority: Uri) -> Http2Connection {
        let socket_path = path.into();
        self.lazy_with_connector(unix_connector(socket_path), authority)
    }

    /// Connects over the Unix domain socket at `path` and waits until the
    /// connection is established.
    ///
    /// # Errors
    ///
    /// Returns an unavailable error if the socket cannot be reached or the
    /// handshake fails.
    #[cfg(unix)]
    #[cfg_attr(docsrs, doc(cfg(unix)))]
    pub async fn connect_unix(
        self,
        path: impl Into<std::path::PathBuf>,
        authority: Uri,
    ) -> Result<Http2Connection, ConnectError> {
        let socket_path = path.into();
        self.connect_with_connector(unix_connector(socket_path), authority)
            .await
    }

    /// Creates the built-in TCP connector with `TCP_NODELAY` enabled and the
    /// configured connect timeout.
    fn http_connector(&self) -> hyper_util::client::legacy::connect::HttpConnector {
        use hyper_util::client::legacy::connect::HttpConnector;

        let mut tcp = HttpConnector::new();
        tcp.set_connect_timeout(self.tcp_connect_timeout);
        tcp.set_nodelay(true);
        tcp
    }

    /// Assembles the connection factory for plaintext TCP.
    fn make_plaintext(self) -> MakeSendRequest {
        let connector = self.http_connector();
        let Self {
            h2_builder,
            establishment_timeout,
            ..
        } = self;
        MakeSendRequest {
            connector,
            builder: h2_builder,
            #[cfg(feature = "client-tls")]
            tls: None,
            custom: None,
            establishment_timeout,
        }
    }

    /// Assembles the connection factory for TLS over TCP. The TLS config is
    /// re-targeted at HTTP/2 via `prepare_tls_for_h2`.
    #[cfg(feature = "client-tls")]
    fn make_tls(self, tls_config: Arc<rustls::ClientConfig>) -> MakeSendRequest {
        let mut connector = self.http_connector();
        // Let the TCP connector accept non-`http` (i.e. `https`) URIs.
        connector.enforce_http(false);
        let Self {
            h2_builder,
            establishment_timeout,
            ..
        } = self;
        MakeSendRequest {
            connector,
            builder: h2_builder,
            tls: Some(prepare_tls_for_h2(&tls_config)),
            custom: None,
            establishment_timeout,
        }
    }

    /// Assembles the connection factory around a caller-supplied connector.
    /// The built-in TCP connector is still constructed but is bypassed while
    /// `custom` is set.
    fn make_custom(self, conn: BoxedConnector) -> MakeSendRequest {
        let connector = self.http_connector();
        let Self {
            h2_builder,
            establishment_timeout,
            ..
        } = self;
        MakeSendRequest {
            connector,
            builder: h2_builder,
            #[cfg(feature = "client-tls")]
            tls: None,
            custom: Some(conn),
            establishment_timeout,
        }
    }
}
/// Polls `conn` until it reports readiness, i.e. until the first connection
/// attempt has finished.
///
/// # Errors
///
/// Any readiness error is reported as `ConnectError::unavailable`, with `ctx`
/// prefixed to the underlying error text.
async fn drive_connect(conn: &mut Http2Connection, ctx: &str) -> Result<(), ConnectError> {
    let readiness = std::future::poll_fn(|cx| conn.inner.poll_ready(cx)).await;
    match readiness {
        Ok(()) => Ok(()),
        Err(e) => Err(ConnectError::unavailable(format!("{ctx}: {e}"))),
    }
}
impl tower::Service<Request<ClientBody>> for Http2Connection {
type Response = Response<hyper::body::Incoming>;
type Error = BoxError;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}
fn call(&mut self, req: Request<ClientBody>) -> Self::Future {
self.inner.call(req)
}
}
/// Cloneable handle to an [`Http2Connection`] that sits behind a
/// `tower::buffer::Buffer`. Created by `Http2Connection::shared`.
#[derive(Clone)]
// The buffer's type parameters spell out the boxed response future, which
// trips clippy's type-complexity lint; the type is only named here.
#[allow(clippy::type_complexity)] pub struct SharedHttp2Connection {
/// Buffer handle that forwards requests to the worker owning the connection.
inner: tower::buffer::Buffer<
Request<ClientBody>,
BoxFuture<'static, Result<Response<hyper::body::Incoming>, BoxError>>,
>,
}
impl Http2Connection {
    /// Converts this connection into a cloneable [`SharedHttp2Connection`].
    ///
    /// The connection is moved into a `tower::buffer::Buffer` created with
    /// `bound`, and the buffer's worker is spawned with `tokio::spawn`, so
    /// this must be called from within a Tokio runtime.
    pub fn shared(self, bound: usize) -> SharedHttp2Connection {
        let (inner, background) = tower::buffer::Buffer::pair(self, bound);
        // The worker task is detached; its handle is intentionally dropped.
        tokio::spawn(background);
        SharedHttp2Connection { inner }
    }
}
/// `tower::Service` facade over the buffer handle.
impl tower::Service<Request<ClientBody>> for SharedHttp2Connection {
    type Response = Response<hyper::body::Incoming>;
    type Error = BoxError;
    type Future = BoxFuture<'static, Result<Response<hyper::body::Incoming>, Self::Error>>;

    /// Delegates readiness to the buffer handle.
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        tower::Service::<Request<ClientBody>>::poll_ready(&mut self.inner, cx)
    }

    /// Submits `req` through the buffer and boxes the buffer's response future.
    fn call(&mut self, req: Request<ClientBody>) -> Self::Future {
        let response = tower::Service::<Request<ClientBody>>::call(&mut self.inner, req);
        Box::pin(response)
    }
}
/// Lets a shared connection be used as a client transport.
impl ClientTransport for SharedHttp2Connection {
    type ResponseBody = hyper::body::Incoming;
    type Error = ConnectError;

    /// Sends `request` on a clone of this handle (wait for readiness, then
    /// call). Any failure is reported as `ConnectError::unavailable`.
    fn send(
        &self,
        request: Request<ClientBody>,
    ) -> BoxFuture<'static, Result<Response<Self::ResponseBody>, Self::Error>> {
        use tower::ServiceExt;

        // Clone so the returned future owns its handle and is 'static.
        let handle = self.clone();
        Box::pin(async move {
            match handle.oneshot(request).await {
                Ok(response) => Ok(response),
                Err(e) => Err(ConnectError::unavailable(format!("h2 send failed: {e}"))),
            }
        })
    }
}
/// Thin wrapper that adapts hyper's HTTP/2 `SendRequest` handle to
/// `tower::Service`.
struct SendRequest {
/// hyper handle used to issue requests on one established connection.
inner: hyper::client::conn::http2::SendRequest<ClientBody>,
}
impl tower::Service<Request<ClientBody>> for SendRequest {
type Response = Response<hyper::body::Incoming>;
type Error = BoxError;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx).map_err(Into::into)
}
fn call(&mut self, req: Request<ClientBody>) -> Self::Future {
let fut = self.inner.send_request(req);
Box::pin(async move { fut.await.map_err(Into::into) })
}
}
/// Connection factory: a `tower::Service<Uri>` that dials the target,
/// optionally performs TLS, runs the HTTP/2 handshake and yields a
/// [`SendRequest`].
struct MakeSendRequest {
/// Built-in TCP connector; not called while `custom` is set.
connector: hyper_util::client::legacy::connect::HttpConnector,
/// hyper HTTP/2 builder, cloned for each handshake.
builder: hyper::client::conn::http2::Builder<hyper_util::rt::TokioExecutor>,
/// TLS configuration; `Some` selects the TLS path and the https scheme check.
#[cfg(feature = "client-tls")]
tls: Option<Arc<rustls::ClientConfig>>,
/// Caller-supplied connector; when `Some` it replaces the TCP/TLS path.
custom: Option<BoxedConnector>,
/// Upper bound on the whole establishment future; `None` means unbounded.
establishment_timeout: Option<Duration>,
}
/// Establishes one HTTP/2 connection per `call` and returns its request
/// sender; the connection itself is driven by a spawned Tokio task.
impl tower::Service<Uri> for MakeSendRequest {
type Response = SendRequest;
type Error = BoxError;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
/// Reports readiness of whichever connector will be used: the custom one
/// when present, otherwise the built-in TCP connector.
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if let Some(c) = &mut self.custom {
return c.poll_ready(cx);
}
<_ as tower::Service<Uri>>::poll_ready(&mut self.connector, cx).map_err(Into::into)
}
/// Starts a connection attempt to `uri`.
///
/// The returned future dials, optionally performs the TLS handshake and
/// ALPN check, then runs the HTTP/2 handshake, all bounded by
/// `establishment_timeout`. URI validation errors are returned as an
/// already-failed future without dialing.
fn call(&mut self, uri: Uri) -> Self::Future {
// Custom connector path: no scheme check and no TLS; the connector's
// output is used directly as the HTTP/2 transport.
if let Some(c) = &mut self.custom {
let io_fut = c.call(uri);
let builder = self.builder.clone();
let establishment_timeout = self.establishment_timeout;
return Box::pin(async move {
// Dial + handshake form one future so the timeout covers both.
let establish = async move {
let io = io_fut.await?;
builder.handshake(io).await.map_err(BoxError::from)
};
let (send_request, conn) =
run_establishment(establish, establishment_timeout).await?;
// The connection future must be polled for requests to make progress;
// run it on a detached task and only log its terminal error.
tokio::spawn(async move {
if let Err(e) = conn.await {
tracing::debug!("h2 connection task exited with error: {e}");
}
});
Ok(SendRequest {
inner: send_request,
})
});
}
// Built-in path: the scheme must match the configured mode (https when a
// TLS config is present, http or no scheme otherwise).
#[cfg(feature = "client-tls")]
let scheme_check = if self.tls.is_some() {
require_https_scheme(&uri)
} else {
require_http_scheme(&uri)
};
#[cfg(not(feature = "client-tls"))]
let scheme_check = require_http_scheme(&uri);
if let Err(e) = scheme_check {
return Box::pin(async move { Err(e.into()) });
}
#[cfg(feature = "client-tls")]
let tls = self.tls.clone();
// Resolve the TLS server name up front so an invalid host fails before
// any network activity.
#[cfg(feature = "client-tls")]
let server_name = match self.tls.is_some() {
true => Some(match server_name_from_uri(&uri) {
Ok(sn) => sn,
Err(e) => return Box::pin(async move { Err(e.into()) }),
}),
false => None,
};
let connect_fut = <_ as tower::Service<Uri>>::call(&mut self.connector, uri);
let builder = self.builder.clone();
let establishment_timeout = self.establishment_timeout;
Box::pin(async move {
// TCP connect, TLS and HTTP/2 handshake form one future so the
// establishment timeout bounds all of them together.
let establish = async move {
let io = connect_fut.await.map_err(Into::<BoxError>::into)?;
#[cfg(feature = "client-tls")]
let io: BoxedIo = if let (Some(tls), Some(server_name)) = (tls, server_name) {
// Unwrap the raw TCP stream so rustls can wrap it.
let tcp = io.into_inner();
let connector = tokio_rustls::TlsConnector::from(tls);
let tls_stream = connector.connect(server_name, tcp).await.map_err(|e| {
BoxError::from(ConnectError::unavailable(format!(
"TLS handshake failed: {e}"
)))
})?;
// Refuse to continue unless the server negotiated "h2" via ALPN.
let (_, session) = tls_stream.get_ref();
if session.alpn_protocol() != Some(b"h2") {
return Err(BoxError::from(ConnectError::unavailable(
"TLS handshake succeeded but server did not negotiate \
HTTP/2 via ALPN (is the server h2-capable?)",
)));
}
Box::pin(hyper_util::rt::TokioIo::new(tls_stream))
} else {
Box::pin(io)
};
#[cfg(not(feature = "client-tls"))]
let io: BoxedIo = Box::pin(io);
builder.handshake(io).await.map_err(BoxError::from)
};
let (send_request, conn) = run_establishment(establish, establishment_timeout).await?;
// Drive the connection on a detached task; only log its terminal error.
tokio::spawn(async move {
if let Err(e) = conn.await {
tracing::debug!("h2 connection task exited with error: {e}");
}
});
Ok(SendRequest {
inner: send_request,
})
})
}
}
/// Awaits `fut`, optionally bounded by `timeout`.
///
/// With `None` the future is awaited without a deadline.
///
/// # Errors
///
/// Returns the future's own error converted into a `BoxError`, or a
/// `ConnectError::unavailable` naming the deadline when the timeout elapses
/// first.
pub(super) async fn run_establishment<F, T, E>(
    fut: F,
    timeout: Option<Duration>,
) -> Result<T, BoxError>
where
    F: Future<Output = Result<T, E>>,
    E: Into<BoxError>,
{
    let Some(dur) = timeout else {
        // Unbounded: just convert the error type.
        return match fut.await {
            Ok(value) => Ok(value),
            Err(cause) => Err(cause.into()),
        };
    };
    match tokio::time::timeout(dur, fut).await {
        Ok(Ok(value)) => Ok(value),
        Ok(Err(cause)) => Err(cause.into()),
        Err(_elapsed) => Err(ConnectError::unavailable(format!(
            "connection establishment did not complete within {dur:?}"
        ))
        .into()),
    }
}
/// State machine that keeps one service produced by `make` alive, creating
/// a new one from `poll_ready` whenever none is connected.
struct Reconnect<M>
where
M: tower::Service<Uri>,
{
/// Factory that produces a connected service for a URI.
make: M,
/// Target passed to `make` on every connection attempt.
uri: Uri,
/// Current connection state.
state: ReconnectState<M::Future, M::Response>,
/// Connect error held back from `poll_ready`, returned by the next `call`.
deferred_error: Option<BoxError>,
/// Set once any connection attempt has succeeded.
has_connected: bool,
/// When true, even the first connect failure is deferred to `call`.
lazy: bool,
}
/// Connection state tracked by [`Reconnect`].
enum ReconnectState<F, S> {
/// No connection and no attempt in flight.
Idle,
/// A connection attempt is in flight; holds the factory's future.
Connecting(Pin<Box<F>>),
/// A connection attempt succeeded; holds the resulting service.
Connected(S),
}
impl<M> Reconnect<M>
where
    M: tower::Service<Uri>,
{
    /// Creates a reconnecting wrapper in the `Idle` state.
    ///
    /// Nothing is dialed here. `lazy` selects whether a failure of the very
    /// first connection attempt is deferred to `call` (`true`) or returned
    /// from `poll_ready` (`false`).
    fn new(make: M, uri: Uri, lazy: bool) -> Self {
        let state = ReconnectState::Idle;
        Self {
            lazy,
            has_connected: false,
            deferred_error: None,
            state,
            uri,
            make,
        }
    }
}
impl<M, S> Reconnect<M>
where
    M: tower::Service<Uri, Response = S>,
    M::Error: Into<BoxError>,
    S: tower::Service<Request<ClientBody>>,
    S::Error: Into<BoxError>,
    S::Future: Send + 'static,
{
    /// Drives the state machine until a connection is ready, an attempt is
    /// pending, or an error must be reported.
    ///
    /// * `Idle`: waits for the factory, then starts a connection attempt.
    /// * `Connecting`: polls the attempt. On failure the state returns to
    ///   `Idle`; the error is deferred to the next `call` when this wrapper
    ///   is lazy or has connected before, otherwise it is returned here.
    /// * `Connected`: forwards the inner service's readiness. If the inner
    ///   service reports an error, the connection is dropped and a new
    ///   attempt is started in the same poll.
    ///
    /// While a deferred error is waiting, this returns `Ready(Ok(()))` so the
    /// caller proceeds to `call` and receives it.
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), BoxError>> {
        if self.deferred_error.is_some() {
            return Poll::Ready(Ok(()));
        }
        loop {
            match &mut self.state {
                ReconnectState::Idle => {
                    if let Err(e) = futures::ready!(self.make.poll_ready(cx)) {
                        return Poll::Ready(Err(e.into()));
                    }
                    let fut = self.make.call(self.uri.clone());
                    self.state = ReconnectState::Connecting(Box::pin(fut));
                }
                ReconnectState::Connecting(fut) => match fut.as_mut().poll(cx) {
                    Poll::Pending => return Poll::Pending,
                    Poll::Ready(Ok(svc)) => {
                        self.state = ReconnectState::Connected(svc);
                        self.has_connected = true;
                    }
                    Poll::Ready(Err(e)) => {
                        let e: BoxError = e.into();
                        self.state = ReconnectState::Idle;
                        if self.has_connected || self.lazy {
                            // Report readiness so the error reaches the
                            // caller through `call`; the next `poll_ready`
                            // starts a fresh attempt.
                            tracing::debug!("h2 reconnect failed (will retry): {e}");
                            self.deferred_error = Some(e);
                            return Poll::Ready(Ok(()));
                        } else {
                            return Poll::Ready(Err(e));
                        }
                    }
                },
                ReconnectState::Connected(svc) => match svc.poll_ready(cx) {
                    Poll::Ready(Ok(())) => return Poll::Ready(Ok(())),
                    Poll::Pending => return Poll::Pending,
                    Poll::Ready(Err(e)) => {
                        // Include the cause in the log instead of discarding
                        // it, so a lost connection can be diagnosed.
                        let e: BoxError = e.into();
                        tracing::debug!("h2 connection lost; reconnecting: {e}");
                        self.state = ReconnectState::Idle;
                    }
                },
            }
        }
    }

    /// Sends `req` on the current connection.
    ///
    /// A deferred connect error, if any, is consumed and returned instead of
    /// sending. Calling this while not connected (that is, without a prior
    /// ready `poll_ready`) yields an error future.
    fn call(
        &mut self,
        req: Request<ClientBody>,
    ) -> BoxFuture<'static, Result<S::Response, BoxError>> {
        if let Some(e) = self.deferred_error.take() {
            return Box::pin(async move { Err(e) });
        }
        match &mut self.state {
            ReconnectState::Connected(svc) => {
                let fut = svc.call(req);
                Box::pin(async move { fut.await.map_err(Into::into) })
            }
            _ => {
                Box::pin(async {
                    Err("Http2Connection::call before poll_ready returned Ready"
                        .to_string()
                        .into())
                })
            }
        }
    }
}
// Unit tests for the constructors, scheme/server-name helpers, builder
// settings and the connect/establishment timeouts.
#[cfg(test)]
mod tests {
use super::*;
// Smoke test: building a lazy plaintext connection in a plain `#[test]`
// completes without panicking. Nothing about the state is asserted.
#[test]
fn lazy_plaintext_starts_idle() {
let conn = Http2Connection::lazy_plaintext("http://localhost:0".parse().unwrap());
let _ = conn;
}
// An eager plaintext connect to 127.0.0.1:1 is expected to return an error.
#[tokio::test]
async fn connect_plaintext_to_nonexistent_fails() {
let err = Http2Connection::connect_plaintext("http://127.0.0.1:1".parse().unwrap()).await;
assert!(err.is_err(), "expected connect to port 1 to fail");
}
// The plaintext constructor rejects an https URI as an invalid argument and
// the message mentions `http://`.
#[tokio::test]
async fn connect_plaintext_rejects_https() {
let err = Http2Connection::connect_plaintext("https://localhost:8080".parse().unwrap())
.await
.unwrap_err();
assert_eq!(err.code, crate::error::ErrorCode::InvalidArgument);
assert!(err.message.as_deref().unwrap().contains("http://"));
}
// `require_http_scheme` accepts http and scheme-less URIs and rejects https.
#[test]
fn require_http_scheme_cases() {
assert!(require_http_scheme(&"http://foo".parse().unwrap()).is_ok());
assert!(require_http_scheme(&"/path".parse().unwrap()).is_ok());
assert!(require_http_scheme(&"https://foo".parse().unwrap()).is_err());
}
// `require_https_scheme` accepts only https; http and scheme-less are rejected.
#[cfg(feature = "client-tls")]
#[test]
fn require_https_scheme_cases() {
assert!(require_https_scheme(&"https://foo".parse().unwrap()).is_ok());
assert!(require_https_scheme(&"http://foo".parse().unwrap()).is_err());
assert!(require_https_scheme(&"/path".parse().unwrap()).is_err());
}
// The prepared TLS config advertises exactly the "h2" ALPN protocol.
#[cfg(feature = "client-tls")]
#[test]
fn prepare_tls_for_h2_sets_alpn() {
let cfg = Arc::new(
rustls::ClientConfig::builder()
.with_root_certificates(rustls::RootCertStore::empty())
.with_no_client_auth(),
);
let prepared = prepare_tls_for_h2(&cfg);
assert_eq!(prepared.alpn_protocols, vec![b"h2".to_vec()]);
}
// Cloning the config for h2 keeps pointing at the same client-cert resolver.
#[cfg(feature = "client-tls")]
#[test]
fn prepare_tls_for_h2_shares_cert_resolver() {
let cfg = Arc::new(
rustls::ClientConfig::builder()
.with_root_certificates(rustls::RootCertStore::empty())
.with_no_client_auth(),
);
let prepared = prepare_tls_for_h2(&cfg);
assert!(Arc::ptr_eq(
&cfg.client_auth_cert_resolver,
&prepared.client_auth_cert_resolver
));
}
// The server name is taken from the host only; port and path are ignored.
#[cfg(feature = "client-tls")]
#[test]
fn server_name_from_uri_extracts_host() {
let name = server_name_from_uri(&"https://example.com:8080/path".parse().unwrap()).unwrap();
assert_eq!(format!("{name:?}"), "DnsName(\"example.com\")");
}
// An IPv4 literal host yields an IP-address server name.
#[cfg(feature = "client-tls")]
#[test]
fn server_name_from_uri_ipv4() {
let name = server_name_from_uri(&"https://10.0.0.1:8443".parse().unwrap()).unwrap();
assert!(matches!(name, rustls_pki_types::ServerName::IpAddress(_)));
}
// A bracketed IPv6 literal host parses as an IP address once the brackets
// are stripped.
#[cfg(feature = "client-tls")]
#[test]
fn server_name_from_uri_ipv6_strips_brackets() {
let name = server_name_from_uri(&"https://[::1]:8443".parse().unwrap()).unwrap();
assert!(matches!(name, rustls_pki_types::ServerName::IpAddress(_)));
}
// The TLS constructor rejects an http URI as an invalid argument.
#[cfg(feature = "client-tls")]
#[tokio::test]
async fn connect_tls_rejects_http_scheme() {
let cfg = Arc::new(
rustls::ClientConfig::builder()
.with_root_certificates(rustls::RootCertStore::empty())
.with_no_client_auth(),
);
let result =
Http2Connection::connect_tls("http://localhost:8080".parse().unwrap(), cfg).await;
let err = match result {
Err(e) => e,
Ok(_) => panic!("expected http:// to be rejected"),
};
assert_eq!(err.code, crate::error::ErrorCode::InvalidArgument);
}
// Smoke test: lazy construction with an always-failing custom connector
// completes without panicking.
#[test]
fn lazy_with_connector_starts_idle() {
let conn = Http2Connection::lazy_with_connector(
tower::service_fn(|_uri: Uri| async {
Err::<hyper_util::rt::TokioIo<tokio::net::TcpStream>, _>(std::io::Error::other(
"unreachable",
))
}),
"http://localhost".parse().unwrap(),
);
let _ = conn;
}
// An eager connect surfaces the custom connector's error text with the
// Unavailable code.
#[tokio::test]
async fn connect_with_connector_propagates_error() {
let err = Http2Connection::connect_with_connector(
tower::service_fn(|_uri: Uri| async {
Err::<hyper_util::rt::TokioIo<tokio::net::TcpStream>, _>(std::io::Error::other(
"dial refused",
))
}),
"http://localhost".parse().unwrap(),
)
.await
.unwrap_err();
assert_eq!(err.code, crate::error::ErrorCode::Unavailable);
assert!(
err.message.as_deref().unwrap().contains("dial refused"),
"error should propagate connector message, got: {err:?}"
);
}
// Smoke test: lazy construction over a non-existent Unix socket path
// completes without panicking.
#[cfg(unix)]
#[test]
fn lazy_unix_starts_idle() {
let conn = Http2Connection::lazy_unix(
"/nonexistent/test.sock",
"http://localhost".parse().unwrap(),
);
let _ = conn;
}
// An eager Unix connect to a missing socket fails as Unavailable and the
// message includes the socket path.
#[cfg(unix)]
#[tokio::test]
async fn connect_unix_nonexistent_fails() {
let path = "/nonexistent/buffa-test.sock";
let err = Http2Connection::connect_unix(path, "http://localhost".parse().unwrap())
.await
.unwrap_err();
assert_eq!(err.code, crate::error::ErrorCode::Unavailable);
assert!(
err.message.as_deref().unwrap().contains(path),
"error should include socket path, got: {err:?}"
);
}
// A fresh builder carries both default timeouts.
#[test]
fn builder_defaults_are_finite() {
let builder = Http2Connection::builder();
assert_eq!(
builder.tcp_connect_timeout,
Some(DEFAULT_TCP_CONNECT_TIMEOUT)
);
assert_eq!(
builder.establishment_timeout,
Some(DEFAULT_ESTABLISHMENT_TIMEOUT)
);
}
// Setters store the given durations; the `no_*` setters and `Duration::MAX`
// both clear the timeout.
#[test]
fn builder_setters_record_durations() {
let builder = Http2Connection::builder()
.tcp_connect_timeout(Duration::from_millis(10))
.establishment_timeout(Duration::from_millis(20));
assert_eq!(builder.tcp_connect_timeout, Some(Duration::from_millis(10)));
assert_eq!(
builder.establishment_timeout,
Some(Duration::from_millis(20))
);
let unbounded = Http2Connection::builder()
.no_tcp_connect_timeout()
.no_establishment_timeout();
assert_eq!(unbounded.tcp_connect_timeout, None);
assert_eq!(unbounded.establishment_timeout, None);
let max = Http2Connection::builder()
.tcp_connect_timeout(Duration::MAX)
.establishment_timeout(Duration::MAX);
assert_eq!(max.tcp_connect_timeout, None);
assert_eq!(max.establishment_timeout, None);
}
// A 100ms TCP connect timeout must abort a connect to a TEST-NET-1 address
// within 2 seconds. The test returns early (skips) if the connect succeeds
// or is rejected in under 90ms, since the timeout was not exercised then.
#[tokio::test]
async fn builder_tcp_connect_timeout_bounds_tcp_connect() {
use std::time::Instant;
let start = Instant::now();
let result = Http2Connection::builder()
.tcp_connect_timeout(Duration::from_millis(100))
.connect_plaintext("http://192.0.2.1:9".parse().unwrap())
.await;
let elapsed = start.elapsed();
let err = match result {
Err(e) => e,
Ok(_) => {
eprintln!("skipping: TEST-NET-1 connect succeeded (proxy?) in {elapsed:?}");
return;
}
};
assert_eq!(err.code, crate::error::ErrorCode::Unavailable);
if elapsed < Duration::from_millis(90) {
eprintln!(
"skipping lower-bound check: host rejected TEST-NET-1 \
in {elapsed:?} ({err:?})"
);
return;
}
assert!(
elapsed < Duration::from_secs(2),
"tcp_connect_timeout(100ms) should abort within ~2s, took {elapsed:?}: {err:?}"
);
}
// The server accepts TCP connections but never speaks TLS, so the TLS
// handshake stalls; the 150ms establishment timeout must fire.
#[cfg(feature = "client-tls")]
#[tokio::test]
async fn establishment_timeout_fires_when_tls_server_stalls_after_accept() {
use std::time::Instant;
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let server = tokio::spawn(async move {
// Keep accepted sockets alive so the client sees an open, silent peer.
let mut held = Vec::new();
while let Ok((stream, _)) = listener.accept().await {
held.push(stream);
}
});
let tls_config = Arc::new(
rustls::ClientConfig::builder()
.with_root_certificates(rustls::RootCertStore::empty())
.with_no_client_auth(),
);
let uri: Uri = format!("https://{addr}").parse().unwrap();
let start = Instant::now();
let err = Http2Connection::builder()
.establishment_timeout(Duration::from_millis(150))
.connect_tls(uri, tls_config)
.await
.unwrap_err();
let elapsed = start.elapsed();
assert_eq!(err.code, crate::error::ErrorCode::Unavailable);
assert!(
err.message
.as_deref()
.unwrap()
.contains("establishment did not complete"),
"expected a handshake-timeout message, got: {err:?}"
);
assert!(
elapsed < Duration::from_secs(2),
"establishment_timeout(150ms) should fire within ~2s, took {elapsed:?}"
);
server.abort();
}
// Same stalled-server scenario, with keep-alive and custom h2 settings
// configured first: the establishment timeout must still apply.
#[cfg(feature = "client-tls")]
#[tokio::test]
async fn establishment_timeout_applies_with_custom_h2_settings() {
use std::time::Instant;
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let server = tokio::spawn(async move {
// Keep accepted sockets alive so the client sees an open, silent peer.
let mut held = Vec::new();
while let Ok((stream, _)) = listener.accept().await {
held.push(stream);
}
});
let tls_config = Arc::new(
rustls::ClientConfig::builder()
.with_root_certificates(rustls::RootCertStore::empty())
.with_no_client_auth(),
);
let uri: Uri = format!("https://{addr}").parse().unwrap();
let start = Instant::now();
let err = Http2Connection::builder()
.keep_alive_interval(Duration::from_secs(30))
.keep_alive_while_idle(true)
.h2_settings(|b| {
b.max_frame_size(1 << 14);
})
.establishment_timeout(Duration::from_millis(150))
.connect_tls(uri, tls_config)
.await
.unwrap_err();
let elapsed = start.elapsed();
assert_eq!(err.code, crate::error::ErrorCode::Unavailable);
assert!(
err.message
.as_deref()
.unwrap()
.contains("establishment did not complete"),
"expected a handshake-timeout message, got: {err:?}"
);
assert!(
elapsed < Duration::from_secs(2),
"establishment_timeout(150ms) should fire within ~2s, took {elapsed:?}"
);
server.abort();
}
// A custom connector whose dial never completes is also bounded by the
// establishment timeout.
#[tokio::test]
async fn establishment_timeout_bounds_custom_connector_dial() {
use std::time::Instant;
// The `Ok(...)` after the pending await is never reached; it only fixes
// the connector's output type.
let never = tower::service_fn(|_uri: Uri| async move {
std::future::pending::<()>().await;
Ok::<_, std::io::Error>(hyper_util::rt::TokioIo::new(tokio::io::duplex(1).0))
});
let start = Instant::now();
let err = Http2Connection::builder()
.establishment_timeout(Duration::from_millis(150))
.connect_with_connector(never, "http://localhost".parse().unwrap())
.await
.unwrap_err();
let elapsed = start.elapsed();
assert_eq!(err.code, crate::error::ErrorCode::Unavailable);
assert!(
err.message
.as_deref()
.unwrap()
.contains("establishment did not complete"),
"expected a handshake-timeout message, got: {err:?}"
);
assert!(
elapsed < Duration::from_secs(2),
"establishment_timeout(150ms) should fire within ~2s, took {elapsed:?}"
);
}
// Positive case: against a local hyper HTTP/2 server, an eager plaintext
// connect succeeds when both timeouts are set to 5 seconds.
#[cfg(feature = "server")]
#[tokio::test]
async fn handshake_succeeds_within_generous_bound() {
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let server = tokio::spawn(async move {
while let Ok((stream, _)) = listener.accept().await {
tokio::spawn(async move {
let io = hyper_util::rt::TokioIo::new(stream);
let service =
hyper::service::service_fn(|_req: Request<hyper::body::Incoming>| async {
Ok::<_, std::convert::Infallible>(Response::new(
http_body_util::Full::new(bytes::Bytes::from_static(b"ok")),
))
});
let _ = hyper::server::conn::http2::Builder::new(
hyper_util::rt::TokioExecutor::new(),
)
.serve_connection(io, service)
.await;
});
}
});
let uri: Uri = format!("http://{addr}").parse().unwrap();
let conn = Http2Connection::builder()
.tcp_connect_timeout(Duration::from_secs(5))
.establishment_timeout(Duration::from_secs(5))
.connect_plaintext(uri)
.await
.expect("establishment should succeed within a generous bound");
let _ = conn;
server.abort();
}
}