use crate::async_impl::h3_client::dns::resolve;
use crate::dns::DynResolver;
use crate::error::BoxError;
use bytes::Bytes;
use h3::client::SendRequest;
use h3_quinn::{Connection, OpenStreams};
use http::Uri;
use hyper_util::client::legacy::connect::dns::Name;
use quinn::crypto::rustls::QuicClientConfig;
use quinn::{ClientConfig, Endpoint, TransportConfig};
use std::net::{IpAddr, SocketAddr};
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
type H3Connection = (
h3::client::Connection<Connection, Bytes>,
SendRequest<OpenStreams, Bytes>,
);
const HAPPY_EYEBALLS_DELAY: Duration = Duration::from_millis(250);
#[derive(Clone)]
pub(crate) struct H3ClientConfig {
pub(crate) max_field_section_size: Option<u64>,
pub(crate) send_grease: Option<bool>,
}
impl Default for H3ClientConfig {
fn default() -> Self {
Self {
max_field_section_size: None,
send_grease: None,
}
}
}
#[derive(Clone)]
pub(crate) struct H3Connector {
resolver: DynResolver,
endpoint: Endpoint,
client_config: H3ClientConfig,
local_addr: Option<IpAddr>,
}
impl H3Connector {
pub fn new(
resolver: DynResolver,
tls: rustls::ClientConfig,
local_addr: Option<IpAddr>,
transport_config: TransportConfig,
client_config: H3ClientConfig,
) -> Result<H3Connector, BoxError> {
let quic_client_config = Arc::new(QuicClientConfig::try_from(tls)?);
let mut config = ClientConfig::new(quic_client_config);
config.transport_config(Arc::new(transport_config));
let socket_addr = match local_addr {
Some(ip) => SocketAddr::new(ip, 0),
None => "[::]:0".parse::<SocketAddr>().unwrap(),
};
let mut endpoint = Endpoint::client(socket_addr)?;
endpoint.set_default_client_config(config);
Ok(Self {
resolver,
endpoint,
client_config,
local_addr,
})
}
pub async fn connect(&mut self, dest: Uri) -> Result<H3Connection, BoxError> {
let host = dest
.host()
.ok_or("destination must have a host")?
.trim_start_matches('[')
.trim_end_matches(']');
let port = dest.port_u16().unwrap_or(443);
let addrs = if let Some(addr) = IpAddr::from_str(host).ok() {
vec![SocketAddr::new(addr, port)]
} else {
let addrs = resolve(&mut self.resolver, Name::from_str(host)?).await?;
let addrs = addrs.map(|mut addr| {
addr.set_port(port);
addr
});
addrs.collect()
};
self.remote_connect(addrs, host).await
}
async fn remote_connect(
&mut self,
addrs: Vec<SocketAddr>,
server_name: &str,
) -> Result<H3Connection, BoxError> {
if addrs.is_empty() {
return Err("no addresses to connect to".into());
}
let (mut ipv6_addrs, mut ipv4_addrs): (Vec<SocketAddr>, Vec<SocketAddr>) =
addrs.into_iter().partition(|addr| addr.is_ipv6());
if let Some(local_ip) = self.local_addr {
if local_ip.is_ipv6() {
ipv4_addrs.clear();
} else {
ipv6_addrs.clear();
}
}
if ipv6_addrs.is_empty() {
return Self::try_addresses_static(
&self.endpoint,
&ipv4_addrs,
server_name,
&self.client_config,
)
.await;
}
if ipv4_addrs.is_empty() {
return Self::try_addresses_static(
&self.endpoint,
&ipv6_addrs,
server_name,
&self.client_config,
)
.await;
}
let endpoint = self.endpoint.clone();
let client_config = self.client_config.clone();
if self.local_addr.is_some() {
return match Self::try_addresses_static(
&endpoint,
&ipv6_addrs,
server_name,
&client_config,
)
.await
{
Ok(conn) => Ok(conn),
Err(_) => {
Self::try_addresses_static(&endpoint, &ipv4_addrs, server_name, &client_config)
.await
}
};
}
Self::try_addresses_happy_eyeballs(
&endpoint,
&ipv6_addrs,
&ipv4_addrs,
server_name,
&client_config,
)
.await
}
async fn try_addresses_static(
endpoint: &Endpoint,
addrs: &[SocketAddr],
server_name: &str,
client_config: &H3ClientConfig,
) -> Result<H3Connection, BoxError> {
let mut last_err: Option<BoxError> = None;
for addr in addrs {
match endpoint.connect(*addr, server_name) {
Ok(connecting) => match connecting.await {
Ok(new_conn) => {
let quinn_conn = Connection::new(new_conn);
let mut h3_client_builder = h3::client::builder();
if let Some(max_field_section_size) = client_config.max_field_section_size {
h3_client_builder.max_field_section_size(max_field_section_size);
}
if let Some(send_grease) = client_config.send_grease {
h3_client_builder.send_grease(send_grease);
}
return Ok(h3_client_builder.build(quinn_conn).await?);
}
Err(e) => {
last_err = Some(Box::new(e) as BoxError);
}
},
Err(e) => {
last_err = Some(Box::new(e) as BoxError);
}
}
}
Err(last_err.unwrap_or_else(|| "no addresses available".into()))
}
async fn try_addresses_happy_eyeballs(
endpoint: &Endpoint,
ipv6_addrs: &[SocketAddr],
ipv4_addrs: &[SocketAddr],
server_name: &str,
client_config: &H3ClientConfig,
) -> Result<H3Connection, BoxError> {
let ipv6_connect =
Self::try_addresses_static(endpoint, ipv6_addrs, server_name, client_config);
tokio::pin!(ipv6_connect);
let delay = tokio::time::sleep(HAPPY_EYEBALLS_DELAY);
tokio::pin!(delay);
tokio::select! {
result = &mut ipv6_connect => {
return match result {
Ok(conn) => Ok(conn),
Err(_) => {
Self::try_addresses_static(endpoint, ipv4_addrs, server_name, client_config).await
}
};
}
_ = &mut delay => {}
}
let ipv4_connect =
Self::try_addresses_static(endpoint, ipv4_addrs, server_name, client_config);
tokio::pin!(ipv4_connect);
let wait_for_ipv6 = tokio::select! {
result = &mut ipv6_connect => {
match result {
Ok(conn) => return Ok(conn),
Err(_) => false,
}
}
result = &mut ipv4_connect => {
match result {
Ok(conn) => return Ok(conn),
Err(_) => true,
}
}
};
if wait_for_ipv6 {
ipv6_connect.await
} else {
ipv4_connect.await
}
}
}