use crate::error::{Error, ErrorKind, Result};
use crate::pool::PoolConfig;
use crate::request::Request;
use crate::response::Response;
#[cfg(feature = "h3")]
use async_net::UdpSocket;
#[cfg(feature = "h3")]
use crate::connection::quic::{bind_quic_socket, build_quiche_config, resolve_quic_peer_addr};
#[cfg(feature = "h3")]
use async_channel::{Receiver, Sender, TryRecvError};
#[cfg(feature = "h3")]
use bytes::Bytes;
#[cfg(feature = "h3")]
use futures_lite::{StreamExt, future};
#[cfg(feature = "h3")]
use quiche::h3::NameValue;
#[cfg(feature = "h3")]
use std::collections::{HashMap, VecDeque};
#[cfg(feature = "h3")]
use std::net::{IpAddr, SocketAddr};
#[cfg(feature = "h3")]
use std::sync::OnceLock;
#[cfg(feature = "h3")]
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
#[cfg(feature = "h3")]
use std::time::{Duration, Instant};
#[cfg(feature = "h3")]
use crate::body::{Body, BodyData};
#[cfg(feature = "h3")]
use crate::browser_emulation::BrowserProfile;
#[cfg(feature = "h3")]
use crate::decode::CompressionMode;
#[cfg(feature = "h3")]
use crate::decode::maybe_decode_response_body;
#[cfg(feature = "h3")]
use crate::dns::{DnsCache, DnsConfig};
#[cfg(feature = "h3")]
use crate::header::HeaderMap;
#[cfg(feature = "h3")]
use crate::metrics::Metrics;
#[cfg(feature = "h3")]
use crate::pool::PoolKey;
#[cfg(feature = "h3")]
use crate::progress::{ProgressConfig, ProgressPhase, ProgressReporter};
#[cfg(feature = "h3")]
use crate::request::{Method, ProgressCallback, TimeoutConfig};
#[cfg(feature = "h3")]
use crate::response::{StatusCode, TrailerState, Version};
#[cfg(feature = "h3")]
use crate::retry::should_retry_stale_connection;
#[cfg(feature = "h3")]
use crate::tls::{TlsConfig, verify_pinned_certificate};
#[cfg(feature = "h3")]
use crate::url::Url;
#[cfg(feature = "h3")]
use crate::util::{build_httpx_regular_headers, parse_content_length, response_body_allowed};
// Conservative QUIC datagram payload size; keeps packets under typical path MTUs.
#[cfg(feature = "h3")]
const MAX_DATAGRAM_SIZE: usize = 1350;
// HTTP/3 H3_REQUEST_CANCELLED application error code (0x10c), used when
// shutting down streams for cancelled or failed requests.
#[cfg(feature = "h3")]
const H3_REQUEST_CANCELLED: u64 = 0x10c;
// Monotonic counter for connection ids — presumably consumed by
// `next_connection_id` (defined elsewhere in this file); verify there.
#[cfg(feature = "h3")]
static NEXT_H3_CONN_ID: AtomicU64 = AtomicU64::new(1);
/// Pool of reusable HTTP/3 client connections.
///
/// Connections are grouped by `PoolKey` (built from the URL, TLS
/// configuration, and emulation profile — see `PoolKey::for_h3` usage below).
/// Without the `h3` feature the pool carries no state.
#[derive(Default)]
pub(crate) struct ConnectionPool {
    #[cfg(feature = "h3")]
    connections: HashMap<PoolKey, Vec<H3ClientConnection>>,
}
impl ConnectionPool {
    /// Borrows the least-loaded usable connection for `key`, evicting stale
    /// entries first. Returns `None` when no usable connection remains.
    #[cfg(feature = "h3")]
    fn acquire(&mut self, key: &PoolKey, pool_config: PoolConfig) -> Option<H3ClientConnection> {
        let now = Instant::now();
        let connections = self.connections.get_mut(key)?;
        // Drop connections that are closed, draining, or idle past the timeout.
        connections.retain(|connection| !connection.should_evict(now, pool_config));
        // Spread load: pick the candidate with the fewest active requests.
        let selected = connections
            .iter()
            .filter(|connection| connection.can_accept_new_request())
            .min_by_key(|connection| connection.load())
            .cloned();
        if connections.is_empty() {
            self.connections.remove(key);
        }
        selected
    }
    /// Offers `connection` to the pool. Returns `true` when the connection is
    /// (or already was) pooled; `false` when the pool declined it — the
    /// caller is then responsible for closing it.
    #[cfg(feature = "h3")]
    fn insert(
        &mut self,
        key: PoolKey,
        connection: H3ClientConnection,
        pool_config: PoolConfig,
    ) -> bool {
        if pool_config.max_idle_per_host == 0 {
            return false;
        }
        let now = Instant::now();
        let connections = self.connections.entry(key).or_default();
        connections.retain(|existing| !existing.should_evict(now, pool_config));
        // Handle-level dedup: the same shared connection may be offered twice
        // (e.g. once per clone); report it as pooled rather than rejecting.
        if connections
            .iter()
            .any(|existing| existing.ptr_eq(&connection))
        {
            return true;
        }
        if connections.len() >= pool_config.max_idle_per_host {
            return false;
        }
        connections.push(connection);
        true
    }
    /// Closes every pooled connection and empties the pool.
    pub(crate) fn clear(&mut self) {
        #[cfg(feature = "h3")]
        {
            for connections in self.connections.values() {
                for connection in connections {
                    connection.close();
                }
            }
            self.connections.clear();
        }
    }
}
/// Whether an HTTP/3 attempt is possible for this request: the scheme must be
/// https and the TLS configuration must pass its h3 ALPN validation.
/// Always `false` when the crate is built without the `h3` feature.
pub(crate) fn can_attempt_h3(request: &Request) -> bool {
    #[cfg(feature = "h3")]
    {
        request.url().scheme() == "https" && request.tls_config().validate_h3_alpn().is_ok()
    }
    #[cfg(not(feature = "h3"))]
    {
        let _ = request;
        false
    }
}
/// Builds an HTTP/3 variant of `request` without touching the original.
///
/// Fails with `BodyAlreadyConsumed` when the body is a stream that cannot be
/// duplicated. When the emulation profile pins ALPN protocols, the clone's
/// TLS config is narrowed to the "h3" entries so the QUIC handshake
/// advertises only HTTP/3.
pub(crate) fn clone_request_for_h3(request: &Request) -> Result<Request> {
    let body = request.body().try_clone().ok_or_else(|| {
        Error::new(
            ErrorKind::BodyAlreadyConsumed,
            "request body cannot be cloned for http3",
        )
    })?;
    let mut tls_config = request.tls_config().clone();
    let fingerprint = request
        .emulation_profile()
        .and_then(|profile| profile.tls_fingerprint());
    if let Some(fp) = fingerprint {
        // Keep only the "h3" ALPN entries; leave the config untouched when
        // the fingerprint pins none of them.
        let h3_protocols: Vec<_> = fp
            .alpn_protocols
            .iter()
            .filter(|protocol| protocol.as_str() == "h3")
            .cloned()
            .collect();
        if !h3_protocols.is_empty() {
            tls_config = tls_config.alpn_protocols(h3_protocols);
        }
    }
    Ok(Request::new(
        request.method(),
        request.url().clone(),
        request.headers().clone(),
        request.cookies().to_vec(),
        request.timeout_config(),
        request.protocol_policy(),
        request.retry_policy(),
        request.prior_knowledge_h2c(),
        request.progress_callback().cloned(),
        request.progress_config(),
        request.h2_keepalive_config(),
        tls_config,
        request.proxy_cloned(),
        request.compression_mode(),
        body,
        request.emulation_profile().cloned(),
    ))
}
#[cfg(all(test, feature = "h3", feature = "emulation"))]
mod tests {
    use super::clone_request_for_h3;
    use crate::Emulation;
    use crate::client::shared_http1_transport;
    use crate::request::{Method, ProtocolPolicy, RequestBuilder};
    // The h3 clone must narrow ALPN to "h3" on the clone only, leaving the
    // original request's TLS config usable for HTTP/1.
    #[test]
    fn clone_request_for_h3_applies_alpn_only_to_clone() {
        let original = RequestBuilder::new(shared_http1_transport(), Method::Get, "https://x.test")
            .emulation(Emulation::Chrome136)
            .build_request()
            .unwrap();
        // Original still negotiates HTTP/1 under the Auto policy.
        assert!(
            original
                .tls_config()
                .validate_http1_alpn(ProtocolPolicy::Auto)
                .is_ok()
        );
        let cloned = clone_request_for_h3(&original).unwrap();
        // Clone advertises exactly "h3" ...
        assert_eq!(
            cloned.tls_config().validate_h3_alpn().unwrap(),
            vec!["h3".to_owned()]
        );
        // ... and therefore can no longer satisfy an HTTP/1 ALPN check.
        assert!(
            cloned
                .tls_config()
                .validate_http1_alpn(ProtocolPolicy::Auto)
                .is_err()
        );
    }
}
/// Executes `request` over HTTP/3.
///
/// A healthy pooled connection is preferred; otherwise a fresh QUIC
/// connection is established and offered to the pool. When the first attempt
/// fails with a stale-connection error (and the body was clonable up front),
/// the request is retried exactly once on a new connection. Connections the
/// pool declines are told to close once idle; unpooled connections are closed
/// eagerly on failure.
#[cfg(feature = "h3")]
pub(crate) async fn execute(
    request: Request,
    pool: Arc<Mutex<ConnectionPool>>,
    dns_cache: Arc<DnsCache>,
    dns_config: DnsConfig,
    local_addr: Option<SocketAddr>,
    pool_config: PoolConfig,
) -> Result<Response> {
    let prepared = PreparedRequest::from_request(request).await?;
    // Streaming bodies cannot be cloned, so `retry_prepared` is `None` for
    // them and the stale-connection retry below is skipped.
    let retry_prepared = prepared.try_clone();
    let pool_key = PoolKey::for_h3(
        &prepared.url,
        &prepared.tls_config,
        prepared.emulation_profile.as_ref(),
    )?;
    let method = prepared.method;
    let pooled = pool
        .lock()
        .unwrap_or_else(|err| err.into_inner())
        .acquire(&pool_key, pool_config);
    let reused_connection = pooled.is_some();
    let (connection, pooled_connection) = match pooled {
        Some(connection) => (connection, true),
        None => {
            let connection = H3ClientConnection::connect(
                &prepared.url,
                prepared.timeout_config,
                &prepared.tls_config,
                Arc::clone(&dns_cache),
                dns_config,
                local_addr,
            )
            .await?;
            let pooled_connection = pool.lock().unwrap_or_else(|err| err.into_inner()).insert(
                pool_key.clone(),
                connection.clone(),
                pool_config,
            );
            if !pooled_connection {
                // Pool declined the connection: let it finish this request
                // and then shut down instead of lingering.
                connection.close_when_idle();
            }
            (connection, pooled_connection)
        }
    };
    match connection.execute(prepared).await {
        Ok(response) => Ok(annotate_metrics(response, reused_connection)),
        Err(err) if should_retry_stale_connection(method, &err) => {
            let Some(prepared) = retry_prepared else {
                if !pooled_connection {
                    connection.close();
                }
                return Err(err);
            };
            connection.close();
            let connection = H3ClientConnection::connect(
                &prepared.url,
                prepared.timeout_config,
                &prepared.tls_config,
                Arc::clone(&dns_cache),
                dns_config,
                local_addr,
            )
            .await?;
            // BUGFIX: use the poison-tolerant lock form here, matching every
            // other lock in this module — a plain `.unwrap()` would panic if
            // another thread had previously poisoned the pool mutex.
            let pooled_retry = pool
                .lock()
                .unwrap_or_else(|err| err.into_inner())
                .insert(pool_key, connection.clone(), pool_config);
            if !pooled_retry {
                connection.close_when_idle();
            }
            // A retried exchange never counts as a reused connection.
            let result = connection
                .execute(prepared)
                .await
                .map(|response| annotate_metrics(response, false));
            if result.is_err() && !pooled_retry {
                connection.close();
            }
            result
        }
        Err(err) => {
            if !pooled_connection {
                connection.close();
            }
            Err(err)
        }
    }
}
/// Stamps the response with metrics: the negotiated protocol version and
/// whether a pooled connection was reused for this exchange.
#[cfg(feature = "h3")]
fn annotate_metrics(response: Response, reused_connection: bool) -> Response {
    let metrics = Metrics::default()
        .with_protocol(response.version())
        .with_reused_connection(reused_connection);
    response.with_metrics(metrics)
}
#[cfg(not(feature = "h3"))]
pub(crate) async fn execute(
request: Request,
_pool: Arc<Mutex<ConnectionPool>>,
_dns_cache: Arc<crate::dns::DnsCache>,
_dns_config: crate::dns::DnsConfig,
_local_addr: Option<std::net::SocketAddr>,
_pool_config: PoolConfig,
) -> Result<Response> {
if request.url().scheme() != "https" {
return Err(Error::new(
ErrorKind::Transport,
"http3-only requested, but HTTP/3 requires https",
));
}
Err(Error::new(
ErrorKind::Transport,
"http3-only requested, but the h3 feature is disabled",
))
}
/// Request body as captured before handing the request to a connection task.
#[cfg(feature = "h3")]
enum PreparedRequestBody {
    /// Fully buffered body: clonable, with a known length.
    Bytes(Bytes),
    /// Streaming body pumped by a dedicated upload thread.
    Stream {
        /// Chunks from the upload thread; `Ok(None)` marks end of body.
        receiver: Receiver<Result<Option<Bytes>>>,
        /// Slot where the connection task's command channel is installed so
        /// fresh chunks can wake the task (filled via `attach_waker` once the
        /// body becomes active).
        wake_sender: Arc<Mutex<Option<Sender<ConnectionCommand>>>>,
        /// Signals the upload thread to stop producing chunks.
        abort_sender: Sender<()>,
    },
}
#[cfg(feature = "h3")]
impl PreparedRequestBody {
    /// Duplicates the body when possible; streaming bodies are not replayable.
    fn try_clone(&self) -> Option<Self> {
        if let Self::Bytes(bytes) = self {
            Some(Self::Bytes(bytes.clone()))
        } else {
            None
        }
    }
    /// Exact body length in bytes, when known up front.
    fn content_length(&self) -> Option<u64> {
        self.upload_total().map(|total| total as u64)
    }
    /// Total upload size for progress reporting, when known up front.
    fn upload_total(&self) -> Option<usize> {
        if let Self::Bytes(bytes) = self {
            Some(bytes.len())
        } else {
            None
        }
    }
    /// Converts this prepared body into the task-side representation,
    /// initializing the stream bookkeeping (empty buffer, zero offset).
    fn into_active(self) -> ActiveRequestBody {
        match self {
            Self::Bytes(data) => ActiveRequestBody::Bytes { data },
            Self::Stream {
                receiver,
                wake_sender,
                abort_sender,
            } => ActiveRequestBody::Stream {
                receiver,
                wake_sender,
                abort_sender,
                buffer: Bytes::new(),
                offset: 0,
                finished: false,
            },
        }
    }
    /// Tells the upload thread to stop; a no-op for buffered bodies.
    fn abort_upload(&self) {
        match self {
            Self::Stream { abort_sender, .. } => {
                let _ = abort_sender.try_send(());
            }
            Self::Bytes(_) => {}
        }
    }
}
/// Request fields extracted into the form the connection task consumes, with
/// the body already converted into a `PreparedRequestBody`.
#[cfg(feature = "h3")]
struct PreparedRequest {
    method: Method,
    url: Url,
    headers: HeaderMap,
    // Cookie name/value pairs forwarded with the request.
    cookies: Vec<(String, String)>,
    timeout_config: TimeoutConfig,
    progress_callback: Option<ProgressCallback>,
    progress_config: ProgressConfig,
    tls_config: TlsConfig,
    compression_mode: CompressionMode,
    body: PreparedRequestBody,
    emulation_profile: Option<BrowserProfile>,
}
#[cfg(feature = "h3")]
impl PreparedRequest {
    /// Dismantles `request` into the fields HTTP/3 needs, rejecting non-https
    /// URLs and spawning a dedicated thread to drive streaming bodies.
    async fn from_request(request: Request) -> Result<Self> {
        let (
            method,
            url,
            headers,
            cookies,
            timeout_config,
            _protocol_policy,
            _retry_policy,
            _prior_knowledge_h2c,
            progress_callback,
            progress_config,
            _h2_keepalive_config,
            tls_config,
            _proxy,
            compression_mode,
            body,
            browser_profile,
        ) = request.into_parts();
        if url.scheme() != "https" {
            return Err(Error::new(
                ErrorKind::Transport,
                "http3-only requested, but HTTP/3 requires https",
            ));
        }
        let body = match body.into_data()? {
            BodyData::Bytes(bytes) => PreparedRequestBody::Bytes(bytes),
            BodyData::Stream(mut stream_body) => {
                // Chunks flow to the connection task through `sender`;
                // `abort_sender` lets the consumer stop the upload early.
                let (sender, receiver) = async_channel::unbounded();
                let (abort_sender, abort_receiver) = async_channel::bounded::<()>(1);
                let wake_sender = Arc::new(Mutex::new(None::<Sender<ConnectionCommand>>));
                let task_wake_sender = Arc::clone(&wake_sender);
                std::thread::Builder::new()
                    .name("request-h3-upload".to_owned())
                    .spawn(move || {
                        async_io::block_on(async move {
                            loop {
                                enum UploadEvent {
                                    Chunk(Option<Result<Bytes>>),
                                    Abort,
                                }
                                // Race the next body chunk against an abort
                                // signal so a cancelled request stops pulling
                                // from the stream promptly.
                                let event = future::or(
                                    async { UploadEvent::Chunk(stream_body.next().await) },
                                    async {
                                        let _ = abort_receiver.recv().await;
                                        UploadEvent::Abort
                                    },
                                )
                                .await;
                                match event {
                                    UploadEvent::Chunk(Some(chunk)) => {
                                        if sender.send(chunk.map(Some)).await.is_err() {
                                            return;
                                        }
                                        // Nudge the connection task so it
                                        // flushes the freshly queued chunk.
                                        if let Some(connection_sender) = task_wake_sender
                                            .lock()
                                            .unwrap_or_else(|err| err.into_inner())
                                            .clone()
                                        {
                                            let _ = connection_sender
                                                .try_send(ConnectionCommand::Drive);
                                        }
                                    }
                                    UploadEvent::Chunk(None) => {
                                        // `Ok(None)` is the end-of-body marker;
                                        // wake the task one last time so it can
                                        // finish the stream.
                                        if sender.send(Ok(None)).await.is_ok() {
                                            if let Some(connection_sender) = task_wake_sender
                                                .lock()
                                                .unwrap_or_else(|err| err.into_inner())
                                                .clone()
                                            {
                                                let _ = connection_sender
                                                    .try_send(ConnectionCommand::Drive);
                                            }
                                        }
                                        return;
                                    }
                                    UploadEvent::Abort => return,
                                }
                            }
                        });
                    })
                    .map_err(|err| {
                        Error::with_source(
                            ErrorKind::Transport,
                            "failed to spawn http3 upload task",
                            err,
                        )
                    })?;
                PreparedRequestBody::Stream {
                    receiver,
                    wake_sender,
                    abort_sender,
                }
            }
        };
        Ok(Self {
            method,
            url,
            headers,
            cookies,
            timeout_config,
            progress_callback,
            progress_config,
            tls_config,
            compression_mode,
            body,
            emulation_profile: browser_profile,
        })
    }
    /// Duplicates the prepared request for a one-shot retry; `None` when the
    /// body is a stream and cannot be replayed.
    fn try_clone(&self) -> Option<Self> {
        Some(Self {
            method: self.method,
            url: self.url.clone(),
            headers: self.headers.clone(),
            cookies: self.cookies.clone(),
            timeout_config: self.timeout_config,
            progress_callback: self.progress_callback.clone(),
            progress_config: self.progress_config,
            tls_config: self.tls_config.clone(),
            compression_mode: self.compression_mode,
            body: self.body.try_clone()?,
            emulation_profile: self.emulation_profile.clone(),
        })
    }
}
/// Cheap-to-clone handle to a background HTTP/3 connection task.
///
/// All clones share one `SharedConnection`, so load accounting and shutdown
/// flags are visible to the pool and to every in-flight request.
#[cfg(feature = "h3")]
#[derive(Clone)]
struct H3ClientConnection {
    shared: Arc<SharedConnection>,
}
#[cfg(feature = "h3")]
impl H3ClientConnection {
    /// Establishes a new QUIC connection and spawns the dedicated thread that
    /// drives it; the returned handle talks to that thread over a channel.
    async fn connect(
        url: &Url,
        timeout_config: TimeoutConfig,
        tls_config: &TlsConfig,
        dns_cache: Arc<DnsCache>,
        dns_config: DnsConfig,
        local_addr: Option<SocketAddr>,
    ) -> Result<Self> {
        let core = ConnectionCore::connect(
            url,
            timeout_config,
            tls_config,
            dns_cache,
            dns_config,
            local_addr,
        )
        .await?;
        let (sender, receiver) = async_channel::unbounded();
        let shared = Arc::new(SharedConnection::new(sender));
        let task_shared = Arc::clone(&shared);
        let _ = std::thread::Builder::new()
            .name(format!(
                "request-h3-{}:{}",
                url.host(),
                url.effective_port()
            ))
            .spawn(move || {
                async_io::block_on(async move {
                    run_connection_task(core, receiver, task_shared).await;
                });
            })
            .map_err(|err| {
                Error::with_source(ErrorKind::Transport, "failed to spawn h3 task", err)
            })?;
        Ok(Self { shared })
    }
    /// Submits `request` to the connection task and awaits the response.
    ///
    /// The active-request counter is incremented for the exchange; if the
    /// submit itself fails it is decremented here, otherwise the task side
    /// releases the slot when the stream finishes or fails.
    async fn execute(&self, request: PreparedRequest) -> Result<Response> {
        if self.is_closed() {
            return Err(Error::new(
                ErrorKind::StaleConnection,
                "http3 connection is closed",
            ));
        }
        self.shared.active_requests.fetch_add(1, Ordering::SeqCst);
        self.touch();
        let (response_tx, response_rx) = async_channel::bounded(1);
        self.shared
            .sender
            .send(ConnectionCommand::Execute(ExecuteCommand {
                request,
                response_tx,
            }))
            .await
            .map_err(|_| {
                self.shared.active_requests.fetch_sub(1, Ordering::SeqCst);
                self.touch();
                Error::new(
                    ErrorKind::StaleConnection,
                    "failed to submit request to http3 connection",
                )
            })?;
        let result = response_rx.recv().await.map_err(|_| {
            self.touch();
            Error::new(
                ErrorKind::StaleConnection,
                "http3 connection task stopped before completing the request",
            )
        })?;
        self.touch();
        result
    }
    /// Number of requests currently active on this connection.
    fn load(&self) -> usize {
        self.shared.active_requests.load(Ordering::SeqCst)
    }
    /// Whether two handles refer to the same underlying connection.
    fn ptr_eq(&self, other: &Self) -> bool {
        Arc::ptr_eq(&self.shared, &other.shared)
    }
    /// Open, still accepting requests, and with stream capacity left.
    fn can_accept_new_request(&self) -> bool {
        !self.is_closed()
            && self.shared.accepting_new_requests.load(Ordering::SeqCst)
            && self.shared.available_request_streams.load(Ordering::SeqCst) > 0
    }
    /// Pool eviction policy: evict when closed, or when idle and either
    /// draining (no longer accepting requests) or past the idle timeout.
    /// A connection with active requests is never evicted.
    fn should_evict(&self, now: Instant, pool_config: PoolConfig) -> bool {
        if self.is_closed() {
            return true;
        }
        if self.load() > 0 {
            return false;
        }
        if !self.shared.accepting_new_requests.load(Ordering::SeqCst) {
            return true;
        }
        match pool_config.idle_timeout {
            Some(timeout) => {
                now.duration_since(
                    *self
                        .shared
                        .last_used
                        .lock()
                        .unwrap_or_else(|err| err.into_inner()),
                ) > timeout
            }
            None => false,
        }
    }
    /// Closed flag set, or the command channel to the task is gone.
    fn is_closed(&self) -> bool {
        self.shared.closed.load(Ordering::SeqCst) || self.shared.sender.is_closed()
    }
    /// Marks the connection closed and severs the command channel.
    fn close(&self) {
        self.shared.closed.store(true, Ordering::SeqCst);
        self.shared.sender.close();
    }
    /// Asks the task to exit once no work remains (used for unpooled
    /// connections).
    fn close_when_idle(&self) {
        self.shared.close_when_idle.store(true, Ordering::SeqCst);
    }
    /// Refreshes the last-used timestamp for idle-timeout accounting.
    fn touch(&self) {
        *self
            .shared
            .last_used
            .lock()
            .unwrap_or_else(|err| err.into_inner()) = Instant::now();
    }
}
/// State shared between connection handles and the background task.
#[cfg(feature = "h3")]
struct SharedConnection {
    /// Command channel into the connection task.
    sender: Sender<ConnectionCommand>,
    /// Requests submitted and not yet released by the task.
    active_requests: AtomicUsize,
    /// Set once the connection must not be used again.
    closed: AtomicBool,
    /// Asks the task to exit once no work remains.
    close_when_idle: AtomicBool,
    /// Cleared on GOAWAY or peer close: no new requests may start.
    accepting_new_requests: AtomicBool,
    /// Peer-advertised bidirectional stream budget (`usize::MAX` until the
    /// task learns the real limit).
    available_request_streams: AtomicUsize,
    /// Timestamp of last activity, for idle-timeout eviction.
    last_used: Mutex<Instant>,
}
#[cfg(feature = "h3")]
impl SharedConnection {
    /// Shared state for a freshly established connection: open, idle,
    /// accepting requests, and with effectively unlimited stream capacity
    /// until the task learns the peer's real limits.
    fn new(sender: Sender<ConnectionCommand>) -> Self {
        let created_at = Instant::now();
        Self {
            active_requests: AtomicUsize::new(0),
            available_request_streams: AtomicUsize::new(usize::MAX),
            accepting_new_requests: AtomicBool::new(true),
            close_when_idle: AtomicBool::new(false),
            closed: AtomicBool::new(false),
            last_used: Mutex::new(created_at),
            sender,
        }
    }
}
/// Messages a handle (or the upload thread) can send into the connection task.
#[cfg(feature = "h3")]
enum ConnectionCommand {
    /// Start a new request/response exchange.
    Execute(ExecuteCommand),
    /// Cancel the in-flight exchange on `stream_id`.
    CancelStream { stream_id: u64 },
    /// Wake the task loop so it re-polls queued work (e.g. new body chunks).
    Drive,
}
/// A prepared request paired with the one-shot (bounded-1) channel used to
/// deliver its result back to the awaiting caller.
#[cfg(feature = "h3")]
struct ExecuteCommand {
    request: PreparedRequest,
    response_tx: Sender<Result<Response>>,
}
/// Raw QUIC/HTTP3 state driven by the connection task: the UDP socket, the
/// quiche transport connection, and (once established) the h3 layer.
#[cfg(feature = "h3")]
struct ConnectionCore {
    socket: UdpSocket,
    local_addr: SocketAddr,
    peer_addr: SocketAddr,
    conn: quiche::Connection,
    h3_config: quiche::h3::Config,
    /// `None` until the QUIC handshake completes (see `ensure_h3_ready`).
    h3_conn: Option<quiche::h3::Connection>,
    url: Url,
    tls_config: TlsConfig,
    /// Whether certificate pinning has been checked post-handshake.
    pin_verified: bool,
    timeout_config: TimeoutConfig,
    // Reused buffers for datagram receive/send.
    recv_buffer: Vec<u8>,
    send_buffer: Vec<u8>,
}
#[cfg(feature = "h3")]
impl ConnectionCore {
    /// Resolves the peer address, binds a UDP socket, and initiates (but does
    /// not complete) the QUIC handshake via `quiche::connect`.
    async fn connect(
        url: &Url,
        timeout_config: TimeoutConfig,
        tls_config: &TlsConfig,
        dns_cache: Arc<DnsCache>,
        dns_config: DnsConfig,
        local_addr: Option<SocketAddr>,
    ) -> Result<Self> {
        let peer_addr = resolve_quic_peer_addr(url, &dns_cache, dns_config)?;
        let (socket, local_addr) = bind_quic_socket(peer_addr, local_addr).await?;
        let mut transport_config = build_quiche_config(tls_config)?;
        // Fresh source connection id per connection.
        let scid_bytes = next_connection_id();
        let scid = quiche::ConnectionId::from_ref(&scid_bytes);
        let server_name = server_name(url);
        let conn = quiche::connect(
            server_name.as_deref(),
            &scid,
            local_addr,
            peer_addr,
            &mut transport_config,
        )
        .map_err(|err| {
            Error::with_source(
                ErrorKind::Transport,
                "failed to initialize http3 connection",
                err,
            )
        })?;
        Ok(Self {
            socket,
            local_addr,
            peer_addr,
            conn,
            h3_config: build_h3_config()?,
            h3_conn: None,
            url: url.clone(),
            tls_config: tls_config.clone(),
            pin_verified: false,
            timeout_config,
            recv_buffer: vec![0_u8; 64 * 1024],
            send_buffer: vec![0_u8; MAX_DATAGRAM_SIZE],
        })
    }
    /// Once the QUIC handshake is established: verifies certificate pinning
    /// (at most once) and lazily creates the h3 application layer.
    fn ensure_h3_ready(&mut self) -> Result<()> {
        if self.conn.is_established() && !self.pin_verified {
            // NOTE(review): when quiche exposes no peer certificate, the pin
            // check is skipped but `pin_verified` is still set — confirm this
            // is the intended behavior for pinned configs.
            if let Some(cert) = self.conn.peer_cert() {
                verify_pinned_certificate(
                    &self.tls_config.pinned_certificates,
                    self.url.host(),
                    cert,
                )?;
            }
            self.pin_verified = true;
        }
        if self.conn.is_established() && self.h3_conn.is_none() {
            self.h3_conn = Some(
                quiche::h3::Connection::with_transport(&mut self.conn, &self.h3_config).map_err(
                    |err| {
                        Error::with_source(
                            ErrorKind::Transport,
                            "failed to initialize http3 application layer",
                            err,
                        )
                    },
                )?,
            );
        }
        Ok(())
    }
}
/// Event-loop state for one HTTP/3 connection: queued commands plus requests
/// in each lifecycle stage (pending -> opening -> in flight by stream id).
#[cfg(feature = "h3")]
struct ConnectionTask {
    core: ConnectionCore,
    receiver: Receiver<ConnectionCommand>,
    shared: Arc<SharedConnection>,
    /// Accepted commands not yet turned into `ActiveRequest`s.
    pending: VecDeque<ExecuteCommand>,
    /// Requests waiting for stream capacity to send HEADERS.
    opening: VecDeque<ActiveRequest>,
    /// Requests with an open h3 stream, keyed by stream id.
    in_flight: HashMap<u64, ActiveRequest>,
}
#[cfg(feature = "h3")]
impl ConnectionTask {
fn new(
core: ConnectionCore,
receiver: Receiver<ConnectionCommand>,
shared: Arc<SharedConnection>,
) -> Self {
Self {
core,
receiver,
shared,
pending: VecDeque::new(),
opening: VecDeque::new(),
in_flight: HashMap::new(),
}
}
async fn run(&mut self) -> Result<()> {
loop {
self.drain_commands();
self.core.ensure_h3_ready()?;
self.refresh_shared_capacity();
self.prepare_pending_requests();
self.open_prepared_requests()?;
self.refresh_shared_capacity();
self.flush_outbound_data().await?;
flush_quic_packets(
&self.core.socket,
&mut self.core.conn,
&mut self.core.send_buffer,
self.core.timeout_config.write,
)
.await?;
let completed_stream = self.process_events().await?;
self.refresh_shared_capacity();
if completed_stream && !self.opening.is_empty() {
continue;
}
if self.core.conn.is_closed() {
let err = self.handle_connection_closed_by_peer();
return Err(err);
}
if self.shared.close_when_idle.load(Ordering::SeqCst)
&& self.pending.is_empty()
&& self.opening.is_empty()
&& self.in_flight.is_empty()
{
return Ok(());
}
let upload_pending = !self.opening.is_empty()
|| self
.in_flight
.values()
.any(|stream| stream.upload_pending());
let timeout =
wait_kind(&self.core.conn, upload_pending, self.core.timeout_config).duration();
let event = future::or(
async { LoopEvent::Command(self.receiver.recv().await) },
async {
LoopEvent::Recv(
recv_with_timeout(&self.core.socket, &mut self.core.recv_buffer, timeout)
.await,
)
},
)
.await;
match event {
LoopEvent::Command(Ok(command)) => self.enqueue_command(command),
LoopEvent::Command(Err(_)) => {
if self.pending.is_empty()
&& self.opening.is_empty()
&& self.in_flight.is_empty()
{
return Ok(());
}
}
LoopEvent::Recv(Ok(Some((read, from)))) => {
if from != self.core.peer_addr {
continue;
}
self.core
.conn
.recv(
&mut self.core.recv_buffer[..read],
quiche::RecvInfo {
from,
to: self.core.local_addr,
},
)
.map_err(|err| {
Error::with_source(
ErrorKind::Transport,
"failed to receive http3 packet",
err,
)
})?;
}
LoopEvent::Recv(Ok(None)) => {
self.core.conn.on_timeout();
}
LoopEvent::Recv(Err(err)) => return Err(err),
}
}
}
fn enqueue_command(&mut self, command: ConnectionCommand) {
match command {
ConnectionCommand::Execute(command) => self.pending.push_back(command),
ConnectionCommand::CancelStream { stream_id } => self.cancel_stream(stream_id),
ConnectionCommand::Drive => {}
}
}
fn drain_commands(&mut self) {
loop {
match self.receiver.try_recv() {
Ok(command) => self.enqueue_command(command),
Err(TryRecvError::Empty) | Err(TryRecvError::Closed) => break,
}
}
}
fn cancel_stream(&mut self, stream_id: u64) {
let Some(mut stream) = self.in_flight.remove(&stream_id) else {
return;
};
stream.abort_request_body();
let _ =
self.core
.conn
.stream_shutdown(stream_id, quiche::Shutdown::Read, H3_REQUEST_CANCELLED);
if !stream.end_stream_sent {
let _ = self.core.conn.stream_shutdown(
stream_id,
quiche::Shutdown::Write,
H3_REQUEST_CANCELLED,
);
}
stream.body_tx = None;
stream.deferred_trailers = None;
self.release_request_slot();
}
fn fail_request_stream(&mut self, stream_id: u64, err: Error) {
let Some(stream) = self.in_flight.remove(&stream_id) else {
return;
};
let _ =
self.core
.conn
.stream_shutdown(stream_id, quiche::Shutdown::Read, H3_REQUEST_CANCELLED);
if !stream.end_stream_sent {
let _ = self.core.conn.stream_shutdown(
stream_id,
quiche::Shutdown::Write,
H3_REQUEST_CANCELLED,
);
}
self.fail_active_request(stream, err);
}
fn prepare_pending_requests(&mut self) {
if !self.shared.accepting_new_requests.load(Ordering::SeqCst) {
let err = Error::new(
ErrorKind::Transport,
"http3 connection is no longer accepting new requests",
);
while let Some(command) = self.pending.pop_front() {
self.fail_pending_command(command, &err);
}
return;
}
while let Some(command) = self.pending.pop_front() {
let ExecuteCommand {
request,
response_tx,
} = command;
let request = ActiveRequest::new(request, response_tx, self.shared.sender.clone());
request
.request_body
.attach_waker(self.shared.sender.clone());
self.opening.push_back(request);
}
}
fn open_prepared_requests(&mut self) -> Result<()> {
if self.core.h3_conn.is_none() || !self.shared.accepting_new_requests.load(Ordering::SeqCst)
{
return Ok(());
}
while let Some(mut request) = self.opening.pop_front() {
let end_stream = request.request_body.ends_stream_with_headers();
let headers = to_h3_headers(&request.request_headers);
let result = {
let h3_conn = self.core.h3_conn.as_mut().expect("h3 connection");
h3_conn.send_request(&mut self.core.conn, &headers, end_stream)
};
match result {
Ok(stream_id) => {
request.mark_headers_sent(stream_id, end_stream);
self.in_flight.insert(stream_id, request);
}
Err(quiche::h3::Error::Done) | Err(quiche::h3::Error::StreamBlocked) => {
self.opening.push_front(request);
break;
}
Err(err) => {
let err = Error::with_source(
ErrorKind::Transport,
"failed to send http3 request headers",
err,
);
self.fail_active_request(request, connection_error(&err));
return Err(err);
}
}
}
Ok(())
}
async fn flush_outbound_data(&mut self) -> Result<()> {
if self.core.h3_conn.is_none() {
return Ok(());
}
let stream_ids = self.in_flight.keys().copied().collect::<Vec<_>>();
for stream_id in stream_ids {
loop {
let (chunk, fin) = {
let Some(stream) = self.in_flight.get_mut(&stream_id) else {
break;
};
let next_chunk = match stream.next_outbound_chunk() {
Ok(next_chunk) => next_chunk,
Err(err) => {
self.fail_request_stream(stream_id, err);
break;
}
};
let Some((chunk, fin)) = next_chunk else {
break;
};
(chunk, fin)
};
let result = {
let h3_conn = self.core.h3_conn.as_mut().expect("h3 connection");
h3_conn.send_body(&mut self.core.conn, stream_id, &chunk, fin)
};
match result {
Ok(written) => {
if let Some(stream) = self.in_flight.get_mut(&stream_id) {
stream.record_sent(written, fin && written == chunk.len())?;
}
if written == 0 || (fin && written == chunk.len()) {
break;
}
}
Err(quiche::h3::Error::Done) | Err(quiche::h3::Error::StreamBlocked) => break,
Err(err) => {
let err = Error::with_source(
ErrorKind::Transport,
"failed to send http3 request body",
err,
);
if let Some(stream) = self.in_flight.remove(&stream_id) {
self.fail_active_request(stream, connection_error(&err));
}
return Err(err);
}
}
}
}
Ok(())
}
async fn process_events(&mut self) -> Result<bool> {
if self.core.h3_conn.is_none() {
return Ok(false);
}
let mut completed_stream = false;
loop {
let event = {
let h3_conn = self.core.h3_conn.as_mut().expect("h3 connection");
h3_conn.poll(&mut self.core.conn)
};
match event {
Ok((stream_id, quiche::h3::Event::Headers { list, more_frames })) => {
let Some(stream) = self.in_flight.get_mut(&stream_id) else {
continue;
};
let block = stream.response_state.apply_headers(list)?;
if matches!(block, HeaderBlockKind::Final)
|| matches!(block, HeaderBlockKind::Trailers)
{
stream
.maybe_send_streaming_response(stream_id, self.shared.sender.clone())?;
}
if !more_frames {
stream.response_state.mark_end_stream();
}
completed_stream |= self.finish_stream_if_complete(stream_id).await?;
}
Ok((stream_id, quiche::h3::Event::Data)) => {
if !self.in_flight.contains_key(&stream_id) {
let mut discard = [0_u8; 4096];
loop {
let result = {
let h3_conn = self.core.h3_conn.as_mut().expect("h3 connection");
h3_conn.recv_body(&mut self.core.conn, stream_id, &mut discard)
};
match result {
Ok(0) | Err(quiche::h3::Error::Done) => break,
Ok(_) => {}
Err(err) => {
return Err(Error::with_source(
ErrorKind::Transport,
"failed to discard unexpected http3 body data",
err,
));
}
}
}
continue;
}
let mut chunks = Vec::new();
loop {
let mut chunk = [0_u8; 4096];
let result = {
let h3_conn = self.core.h3_conn.as_mut().expect("h3 connection");
h3_conn.recv_body(&mut self.core.conn, stream_id, &mut chunk)
};
match result {
Ok(0) | Err(quiche::h3::Error::Done) => break,
Ok(read) => chunks.push(Bytes::copy_from_slice(&chunk[..read])),
Err(err) => {
return Err(Error::with_source(
ErrorKind::Transport,
"failed to read http3 response body",
err,
));
}
}
}
let mut cancel_read = false;
{
let stream = self
.in_flight
.get_mut(&stream_id)
.expect("stream must exist");
stream
.maybe_send_streaming_response(stream_id, self.shared.sender.clone())?;
for chunk in chunks {
if stream.push_response_body_chunk(chunk)? {
cancel_read = true;
}
}
}
if cancel_read {
let _ = self.core.conn.stream_shutdown(
stream_id,
quiche::Shutdown::Read,
H3_REQUEST_CANCELLED,
);
}
completed_stream |= self.finish_stream_if_complete(stream_id).await?;
}
Ok((stream_id, quiche::h3::Event::Finished)) => {
if let Some(stream) = self.in_flight.get_mut(&stream_id) {
stream.response_state.mark_end_stream();
}
completed_stream |= self.finish_stream_if_complete(stream_id).await?;
}
Ok((stream_id, quiche::h3::Event::Reset(error_code))) => {
if let Some(stream) = self.in_flight.remove(&stream_id) {
self.fail_active_request(
stream,
Error::new(
ErrorKind::Transport,
format!(
"http3 response stream reset by peer: error_code={error_code}"
),
),
);
completed_stream = true;
}
}
Ok((goaway_id, quiche::h3::Event::GoAway)) => {
self.shared
.accepting_new_requests
.store(false, Ordering::SeqCst);
self.shared.close_when_idle.store(true, Ordering::SeqCst);
let pending_error = pending_goaway_error(goaway_id);
while let Some(command) = self.pending.pop_front() {
self.fail_pending_command(command, &pending_error);
}
while let Some(stream) = self.opening.pop_front() {
self.fail_active_request(stream, connection_error(&pending_error));
}
let rejected_streams = self
.in_flight
.keys()
.copied()
.filter(|stream_id| stream_rejected_by_goaway(*stream_id, goaway_id))
.collect::<Vec<_>>();
for stream_id in rejected_streams {
if let Some(stream) = self.in_flight.remove(&stream_id) {
self.fail_active_request(
stream,
rejected_stream_goaway_error(stream_id, goaway_id),
);
completed_stream = true;
}
}
}
Ok((_priority_id, quiche::h3::Event::PriorityUpdate)) => {}
Err(quiche::h3::Error::Done) => break,
Err(err) => {
return Err(Error::with_source(
ErrorKind::Transport,
"failed to process http3 response events",
err,
));
}
}
}
Ok(completed_stream)
}
async fn finish_stream_if_complete(&mut self, stream_id: u64) -> Result<bool> {
let complete = self
.in_flight
.get(&stream_id)
.map(|stream| stream.response_state.is_complete())
.unwrap_or(false);
if !complete {
return Ok(false);
}
let mut stream = self
.in_flight
.remove(&stream_id)
.expect("stream must exist");
let cancel_unfinished_request = !stream.end_stream_sent;
if cancel_unfinished_request {
stream.abort_request_body();
let _ = self.core.conn.stream_shutdown(
stream_id,
quiche::Shutdown::Write,
H3_REQUEST_CANCELLED,
);
}
if let Some(reporter) = stream.upload_progress.as_mut() {
reporter.finish();
}
if let Some(reporter) = stream.download_progress.as_mut() {
reporter.finish();
}
if stream.response_sent {
if let Err(err) = stream.response_state.validate_received_body() {
if let Some(body_tx) = stream.body_tx.take() {
let _ = body_tx.try_send(Err(err));
}
self.release_request_slot();
return Ok(true);
}
if let Some(trailers) = stream.response_state.trailers.clone() {
if let Some(deferred_trailers) = stream.deferred_trailers.take() {
let _ = deferred_trailers.set(trailers);
}
}
drop(stream.body_tx.take());
self.release_request_slot();
return Ok(true);
}
let response = stream.response_state.into_response(stream.url.clone())?;
let _ = stream.response_tx.try_send(Ok(response));
self.release_request_slot();
Ok(true)
}
fn fail_pending_command(&self, command: ExecuteCommand, err: &Error) {
command.request.body.abort_upload();
let _ = command.response_tx.try_send(Err(connection_error(err)));
self.release_request_slot();
}
fn fail_active_request(&self, mut stream: ActiveRequest, err: Error) {
stream.abort_request_body();
if let Some(reporter) = stream.upload_progress.as_mut() {
reporter.finish();
}
if let Some(reporter) = stream.download_progress.as_mut() {
reporter.finish();
}
if stream.response_sent {
if let Some(body_tx) = stream.body_tx.take() {
let _ = body_tx.try_send(Err(connection_error(&err)));
}
} else {
let _ = stream.response_tx.try_send(Err(connection_error(&err)));
}
self.release_request_slot();
}
fn fail_queued_commands(&mut self, err: &Error) {
loop {
match self.receiver.try_recv() {
Ok(ConnectionCommand::Execute(command)) => self.fail_pending_command(command, err),
Ok(ConnectionCommand::CancelStream { .. }) | Ok(ConnectionCommand::Drive) => {}
Err(TryRecvError::Empty) | Err(TryRecvError::Closed) => break,
}
}
}
fn refresh_shared_capacity(&self) {
if self.core.h3_conn.is_none() {
return;
}
let available =
usize::try_from(self.core.conn.peer_streams_left_bidi()).unwrap_or(usize::MAX);
self.shared
.available_request_streams
.store(available, Ordering::SeqCst);
}
fn release_request_slot(&self) {
self.shared.active_requests.fetch_sub(1, Ordering::SeqCst);
*self
.shared
.last_used
.lock()
.unwrap_or_else(|err| err.into_inner()) = Instant::now();
}
fn handle_connection_closed_by_peer(&mut self) -> Error {
let reason = connection_closed_by_peer_reason(&self.core.conn);
self.shared
.accepting_new_requests
.store(false, Ordering::SeqCst);
self.shared
.available_request_streams
.store(0, Ordering::SeqCst);
self.shared.closed.store(true, Ordering::SeqCst);
self.shared.sender.close();
let pending_error = pending_peer_close_error(&reason);
while let Some(command) = self.pending.pop_front() {
self.fail_pending_command(command, &pending_error);
}
while let Some(stream) = self.opening.pop_front() {
self.fail_active_request(stream, connection_error(&pending_error));
}
let stale_streams = self
.in_flight
.iter()
.filter_map(|(stream_id, stream)| {
(!stream.response_state.received_final_headers).then_some(*stream_id)
})
.collect::<Vec<_>>();
for stream_id in stale_streams {
if let Some(stream) = self.in_flight.remove(&stream_id) {
self.fail_active_request(
stream,
peer_closed_before_response_error(stream_id, &reason),
);
}
}
Error::new(ErrorKind::Transport, reason)
}
/// Fails every request known to this task, in submission order: queued
/// commands, streams still opening, streams in flight, then anything still
/// sitting in the command channel.
fn fail_all(&mut self, err: &Error) {
    loop {
        let Some(command) = self.pending.pop_front() else {
            break;
        };
        self.fail_pending_command(command, err);
    }
    loop {
        let Some(stream) = self.opening.pop_front() else {
            break;
        };
        self.fail_active_request(stream, connection_error(err));
    }
    for stream in std::mem::take(&mut self.in_flight).into_values() {
        self.fail_active_request(stream, connection_error(err));
    }
    self.fail_queued_commands(err);
}
}
#[cfg(feature = "h3")]
/// Drives a single HTTP/3 connection until it finishes or fails.
///
/// On error the shared handle is marked closed and the command channel is
/// shut *before* `fail_all`, so no new command can be queued while pending
/// work is drained; the close is then repeated unconditionally to cover
/// the clean-exit path as well.
async fn run_connection_task(
    core: ConnectionCore,
    receiver: Receiver<ConnectionCommand>,
    shared: Arc<SharedConnection>,
) {
    let mut task = ConnectionTask::new(core, receiver, Arc::clone(&shared));
    let result = task.run().await;
    if let Err(err) = result {
        shared.closed.store(true, Ordering::SeqCst);
        shared.sender.close();
        task.fail_all(&err);
    }
    shared.closed.store(true, Ordering::SeqCst);
    shared.sender.close();
}
#[cfg(feature = "h3")]
/// One wakeup of the connection driver loop: either a command from the
/// handle side or the outcome of a UDP receive (`Ok(None)` indicating the
/// read timed out).
enum LoopEvent {
    Command(Result<ConnectionCommand, async_channel::RecvError>),
    Recv(Result<Option<(usize, SocketAddr)>>),
}
#[cfg(feature = "h3")]
/// The outbound body of an in-flight request.
enum ActiveRequestBody {
    /// A fully buffered body, sent from one contiguous buffer.
    Bytes {
        data: Bytes,
    },
    /// A caller-driven streaming body pulled chunk by chunk.
    Stream {
        // Chunks produced by the caller; `Ok(None)` marks end of body.
        receiver: Receiver<Result<Option<Bytes>>>,
        // Slot for the connection's command sender so newly produced chunks
        // can wake the driver loop.
        wake_sender: Arc<Mutex<Option<Sender<ConnectionCommand>>>>,
        // Signals the producer to stop when the request is aborted.
        abort_sender: Sender<()>,
        // Chunk currently being written, plus how much of it was sent.
        buffer: Bytes,
        offset: usize,
        // Set once the producer reported end of body (or went away).
        finished: bool,
    },
}
#[cfg(feature = "h3")]
impl ActiveRequestBody {
    /// Registers `sender` as the waker for a streaming body and immediately
    /// schedules a drive pass so already-buffered chunks get picked up.
    /// No-op for buffered bodies.
    fn attach_waker(&self, sender: Sender<ConnectionCommand>) {
        match self {
            Self::Stream { wake_sender, .. } => {
                let mut slot = wake_sender.lock().unwrap_or_else(|err| err.into_inner());
                *slot = Some(sender.clone());
                drop(slot);
                let _ = sender.try_send(ConnectionCommand::Drive);
            }
            Self::Bytes { .. } => {}
        }
    }
    /// Tells a streaming producer to stop; buffered bodies have nothing to
    /// abort.
    fn abort_upload(&self) {
        let Self::Stream { abort_sender, .. } = self else {
            return;
        };
        let _ = abort_sender.try_send(());
    }
    /// True when the HEADERS frame alone can close the request stream
    /// (i.e. the body is buffered and empty).
    fn ends_stream_with_headers(&self) -> bool {
        match self {
            Self::Bytes { data } => data.is_empty(),
            Self::Stream { .. } => false,
        }
    }
}
#[cfg(feature = "h3")]
/// Per-request state tracked by the connection task from submission until
/// the response (and its body) has been delivered.
struct ActiveRequest {
    url: Url,
    /// Pseudo headers followed by regular headers, ready for encoding.
    request_headers: Vec<(String, String)>,
    request_body: ActiveRequestBody,
    /// Total request-body bytes handed to quiche so far.
    request_body_sent: usize,
    /// Assigned once request headers were written to a stream.
    stream_id: Option<u64>,
    /// The FIN flag has been sent; nothing more to upload.
    end_stream_sent: bool,
    upload_progress: Option<ProgressReporter>,
    download_progress: Option<ProgressReporter>,
    response_state: H3ResponseState,
    /// Channel delivering the final `Response` (or error) to the caller.
    response_tx: Sender<Result<Response>>,
    response_sent: bool,
    /// Present while a streaming response body is being forwarded.
    body_tx: Option<Sender<Result<Bytes>>>,
    /// Slot filled with trailers after a streaming response was handed out.
    deferred_trailers: Option<Arc<OnceLock<HeaderMap>>>,
}
#[cfg(feature = "h3")]
impl ActiveRequest {
    /// Builds per-request state from a prepared request: the final header
    /// list, per-direction progress reporters, and an empty response
    /// accumulator.
    fn new(
        request: PreparedRequest,
        response_tx: Sender<Result<Response>>,
        _command_tx: Sender<ConnectionCommand>,
    ) -> Self {
        let PreparedRequest {
            method,
            url,
            headers,
            cookies,
            progress_callback,
            progress_config,
            compression_mode,
            body,
            ..
        } = request;
        let upload_total = body.upload_total();
        let request_headers = build_request_headers(
            method,
            &url,
            &headers,
            &cookies,
            compression_mode,
            body.content_length(),
        );
        // One reporter per direction; both share the same user callback.
        let upload_progress = progress_callback.clone().map(|callback| {
            ProgressReporter::new(
                callback,
                ProgressPhase::Upload,
                upload_total,
                progress_config,
            )
        });
        let download_progress = progress_callback.map(|callback| {
            ProgressReporter::new(callback, ProgressPhase::Download, None, progress_config)
        });
        Self {
            url,
            request_headers,
            request_body: body.into_active(),
            request_body_sent: 0,
            stream_id: None,
            end_stream_sent: false,
            upload_progress,
            download_progress,
            response_state: H3ResponseState::new(method, compression_mode),
            response_tx,
            response_sent: false,
            body_tx: None,
            deferred_trailers: None,
        }
    }
    /// Tells a streaming body producer to stop (no-op for buffered bodies).
    fn abort_request_body(&self) {
        self.request_body.abort_upload();
    }
    /// Records that request headers went out on `stream_id`; when the
    /// headers also closed the stream, the upload is complete.
    fn mark_headers_sent(&mut self, stream_id: u64, end_stream: bool) {
        self.stream_id = Some(stream_id);
        if end_stream {
            self.end_stream_sent = true;
            if let Some(reporter) = self.upload_progress.as_mut() {
                reporter.finish();
            }
        }
    }
    /// True while anything remains to send: headers not yet written, or the
    /// body not yet finished.
    fn upload_pending(&self) -> bool {
        self.stream_id.is_none() || !self.end_stream_sent
    }
    /// Returns the next body chunk to write plus whether it ends the
    /// stream, or `Ok(None)` when nothing is currently sendable.
    fn next_outbound_chunk(&mut self) -> Result<Option<(Bytes, bool)>> {
        if self.end_stream_sent {
            return Ok(None);
        }
        match &mut self.request_body {
            ActiveRequestBody::Bytes { data } => {
                if self.request_body_sent >= data.len() {
                    return Ok(None);
                }
                // A buffered body always offers its remainder with FIN set.
                Ok(Some((data.slice(self.request_body_sent..), true)))
            }
            ActiveRequestBody::Stream {
                receiver,
                buffer,
                offset,
                finished,
                ..
            } => {
                // Refill the working buffer from the producer without
                // blocking; empty chunks are skipped.
                while *offset >= buffer.len() && !*finished {
                    match receiver.try_recv() {
                        Ok(Ok(Some(chunk))) if chunk.is_empty() => continue,
                        Ok(Ok(Some(chunk))) => {
                            *buffer = chunk;
                            *offset = 0;
                            break;
                        }
                        // End of body, or the producer went away.
                        Ok(Ok(None)) | Err(TryRecvError::Closed) => {
                            *finished = true;
                            break;
                        }
                        Ok(Err(err)) => return Err(err),
                        // No chunk ready yet; retry on the next drive pass.
                        Err(TryRecvError::Empty) => break,
                    }
                }
                if *offset < buffer.len() {
                    Ok(Some((
                        buffer.slice(*offset..),
                        // FIN may accompany the chunk only when this is the
                        // final chunk and it is being offered in full.
                        *finished && *offset == 0 && buffer.len() > 0,
                    )))
                } else if *finished {
                    // Nothing buffered: close the stream with an empty write.
                    Ok(Some((Bytes::new(), true)))
                } else {
                    Ok(None)
                }
            }
        }
    }
    /// Accounts for `size` bytes accepted by quiche, advancing the stream
    /// buffer and progress reporting; `end_stream` marks the FIN as sent.
    fn record_sent(&mut self, size: usize, end_stream: bool) -> Result<()> {
        self.request_body_sent = self.request_body_sent.saturating_add(size);
        if let ActiveRequestBody::Stream { buffer, offset, .. } = &mut self.request_body {
            if size > 0 {
                *offset = offset.saturating_add(size);
                if *offset >= buffer.len() {
                    // Chunk fully consumed; release it.
                    *buffer = Bytes::new();
                    *offset = 0;
                }
            }
        }
        if let Some(reporter) = self.upload_progress.as_mut() {
            reporter.record(size);
            if end_stream {
                reporter.finish();
            }
        }
        if end_stream {
            self.end_stream_sent = true;
        }
        Ok(())
    }
    /// Delivers the response to the caller as soon as final headers arrive,
    /// unless the body must first be aggregated for decoding.
    ///
    /// For a still-open stream the body is exposed as a channel-backed
    /// stream whose early drop cancels `stream_id`, and trailers are
    /// published later through a deferred slot.
    fn maybe_send_streaming_response(
        &mut self,
        stream_id: u64,
        command_tx: Sender<ConnectionCommand>,
    ) -> Result<()> {
        if self.response_sent
            || !self.response_state.received_final_headers
            || self.response_state.requires_aggregated_body()
        {
            return Ok(());
        }
        let status = self.response_state.status.ok_or_else(|| {
            Error::new(
                ErrorKind::Transport,
                "http3 response completed without final headers",
            )
        })?;
        let headers = self.response_state.headers.clone();
        let body = if self.response_state.end_stream {
            Body::empty()
        } else {
            let (body_tx, body_rx) = async_channel::unbounded();
            self.body_tx = Some(body_tx);
            Body::from_stream(Box::pin(async_stream::stream! {
                // Cancel the stream if the consumer drops the body early.
                let mut cancel = CancelStreamOnDrop::new(command_tx, stream_id);
                while let Ok(item) = body_rx.recv().await {
                    yield item;
                }
                cancel.disarm();
            }))
        };
        let trailers = if self.response_state.end_stream {
            TrailerState::Ready(None)
        } else {
            let deferred = Arc::new(OnceLock::new());
            self.deferred_trailers = Some(Arc::clone(&deferred));
            TrailerState::Deferred(deferred)
        };
        self.response_sent = true;
        let response = Response::new_with_trailer_state(
            status,
            Version::Http3,
            self.url.clone(),
            headers,
            trailers,
            body,
        );
        if self.response_tx.try_send(Ok(response)).is_err() {
            // The caller is gone; drop the channels so body data and
            // trailers are discarded from here on.
            self.body_tx = None;
            self.deferred_trailers = None;
        }
        Ok(())
    }
    /// Routes one DATA chunk: aggregated for later decoding, forwarded to
    /// the streaming body channel, or rejected when a body is not allowed.
    ///
    /// Returns `Ok(true)` when no consumer remains for further body data.
    fn push_response_body_chunk(&mut self, chunk: Bytes) -> Result<bool> {
        self.response_state.record_body_bytes(chunk.len());
        if !self.response_state.received_final_headers {
            return Err(Error::new(
                ErrorKind::Transport,
                "http3 DATA arrived before final response headers",
            ));
        }
        if !self.response_state.allows_response_body() {
            // Tolerate empty DATA frames even where no body is allowed.
            if !chunk.is_empty() {
                return Err(Error::new(
                    ErrorKind::Transport,
                    "http3 response body is not allowed for this request or status",
                ));
            }
            return Ok(false);
        }
        if self.response_state.requires_aggregated_body() {
            self.response_state.push_body(&chunk);
            if let Some(reporter) = self.download_progress.as_mut() {
                reporter.record(chunk.len());
            }
            return Ok(false);
        }
        if let Some(reporter) = self.download_progress.as_mut() {
            reporter.record(chunk.len());
        }
        if let Some(body_tx) = &self.body_tx {
            if body_tx.try_send(Ok(chunk)).is_err() {
                // Body consumer dropped mid-stream; stop forwarding.
                self.body_tx = None;
                self.deferred_trailers = None;
                return Ok(true);
            }
            return Ok(false);
        }
        Ok(true)
    }
}
#[cfg(feature = "h3")]
/// Drop guard that asks the connection task to cancel a response stream
/// unless it is explicitly disarmed first.
struct CancelStreamOnDrop {
    sender: Sender<ConnectionCommand>,
    stream_id: u64,
    /// Cleared by `disarm`; only an armed guard cancels on drop.
    armed: bool,
}
#[cfg(feature = "h3")]
impl CancelStreamOnDrop {
    /// Creates a guard that is armed from the start: dropping it without a
    /// prior `disarm` cancels `stream_id`.
    fn new(sender: Sender<ConnectionCommand>, stream_id: u64) -> Self {
        let armed = true;
        Self {
            sender,
            stream_id,
            armed,
        }
    }
    /// Defuses the guard so dropping it becomes a no-op.
    fn disarm(&mut self) {
        self.armed = false;
    }
}
#[cfg(feature = "h3")]
impl Drop for CancelStreamOnDrop {
    fn drop(&mut self) {
        if !self.armed {
            return;
        }
        // Best effort only: the connection task may already be gone, in
        // which case the send fails and is ignored.
        let command = ConnectionCommand::CancelStream {
            stream_id: self.stream_id,
        };
        let _ = self.sender.try_send(command);
    }
}
#[cfg(feature = "h3")]
fn connection_error(err: &Error) -> Error {
Error::new(err.kind().clone(), err.to_string())
}
#[cfg(feature = "h3")]
/// Renders a human-readable reason for a peer-initiated close, including
/// the peer's error details when quiche has them.
fn connection_closed_by_peer_reason(conn: &quiche::Connection) -> String {
    conn.peer_error().map_or_else(
        || "http3 connection closed by peer".to_owned(),
        |err| format!("http3 connection closed by peer: {err:?}"),
    )
}
#[cfg(feature = "h3")]
fn pending_goaway_error(goaway_id: u64) -> Error {
Error::new(
ErrorKind::StaleConnection,
format!("http3 connection sent GOAWAY before request started: id={goaway_id}"),
)
}
#[cfg(feature = "h3")]
fn pending_peer_close_error(reason: &str) -> Error {
Error::new(
ErrorKind::StaleConnection,
format!("{reason} before request started"),
)
}
#[cfg(feature = "h3")]
/// A GOAWAY carrying `goaway_id` rejects every request stream whose id is
/// greater than or equal to that identifier.
fn stream_rejected_by_goaway(stream_id: u64, goaway_id: u64) -> bool {
    goaway_id <= stream_id
}
#[cfg(feature = "h3")]
fn peer_closed_before_response_error(stream_id: u64, reason: &str) -> Error {
Error::new(
ErrorKind::StaleConnection,
format!("{reason} before stream {stream_id} received response headers"),
)
}
#[cfg(feature = "h3")]
fn rejected_stream_goaway_error(stream_id: u64, goaway_id: u64) -> Error {
Error::new(
ErrorKind::StaleConnection,
format!("http3 connection sent GOAWAY before stream {stream_id} completed: id={goaway_id}"),
)
}
#[cfg(feature = "h3")]
fn build_h3_config() -> Result<quiche::h3::Config> {
quiche::h3::Config::new().map_err(|err| {
Error::with_source(
ErrorKind::Transport,
"failed to initialize http3 config",
err,
)
})
}
#[cfg(feature = "h3")]
/// Assembles the full header list for an HTTP/3 request: the mandatory
/// pseudo headers first, followed by the shared regular header set.
fn build_request_headers(
    method: Method,
    url: &Url,
    headers: &HeaderMap,
    cookies: &[(String, String)],
    compression_mode: CompressionMode,
    body_len: Option<u64>,
) -> Vec<(String, String)> {
    let mut header_list = Vec::new();
    header_list.push((":method".to_owned(), method.as_str().to_owned()));
    header_list.push((":scheme".to_owned(), url.scheme().to_owned()));
    header_list.push((":authority".to_owned(), url.authority().to_owned()));
    header_list.push((":path".to_owned(), url.path_and_query().to_owned()));
    header_list.extend(build_httpx_regular_headers(
        headers,
        cookies,
        compression_mode,
        body_len,
    ));
    header_list
}
#[cfg(feature = "h3")]
/// Converts name/value string pairs into quiche header objects.
fn to_h3_headers(headers: &[(String, String)]) -> Vec<quiche::h3::Header> {
    let mut list = Vec::with_capacity(headers.len());
    for (name, value) in headers {
        list.push(quiche::h3::Header::new(name.as_bytes(), value.as_bytes()));
    }
    list
}
#[cfg(feature = "h3")]
/// Produces a 16-byte QUIC source connection id: a process-wide counter in
/// the first half and the current UNIX-epoch nanosecond clock in the
/// second, so ids differ across both calls and process restarts.
fn next_connection_id() -> [u8; 16] {
    let counter = NEXT_H3_CONN_ID.fetch_add(1, Ordering::Relaxed);
    let now = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default()
        .as_nanos();
    let mut id = [0_u8; 16];
    let (head, tail) = id.split_at_mut(8);
    head.copy_from_slice(&counter.to_be_bytes());
    // Deliberate truncation: only the low 64 bits of the clock are kept.
    tail.copy_from_slice(&(now as u64).to_be_bytes());
    id
}
#[cfg(feature = "h3")]
/// Returns the TLS server name for `url`, or `None` when the host is a
/// literal IP address (presumably because SNI must not carry IP literals —
/// confirm against the TLS layer).
fn server_name(url: &Url) -> Option<String> {
    let host = url.host();
    host.parse::<IpAddr>().is_err().then(|| host.to_owned())
}
#[cfg(feature = "h3")]
/// Sends every packet quiche has queued on the connection until it reports
/// `Done`, applying `timeout` to each individual UDP write.
///
/// # Errors
/// Fails on a write timeout, on a short UDP write (the datagram must go
/// out in one piece), or on a quiche send error.
async fn flush_quic_packets(
    socket: &UdpSocket,
    conn: &mut quiche::Connection,
    buffer: &mut [u8],
    timeout: Option<Duration>,
) -> Result<()> {
    loop {
        match conn.send(buffer) {
            Ok((written, send_info)) => {
                let sent = with_timeout_io(
                    timeout,
                    socket.send_to(&buffer[..written], send_info.to),
                    "write timed out",
                )
                .await?;
                if sent != written {
                    return Err(Error::new(
                        ErrorKind::Transport,
                        "short UDP write while sending http3 packet",
                    ));
                }
            }
            // `Done` means quiche has nothing more to flush right now.
            Err(quiche::Error::Done) => return Ok(()),
            Err(err) => {
                return Err(Error::with_source(
                    ErrorKind::Transport,
                    "failed to flush http3 packets",
                    err,
                ));
            }
        }
    }
}
#[cfg(feature = "h3")]
/// Receives one UDP datagram into `buffer`, returning `Ok(None)` when
/// `timeout` elapses first. The timer and the receive are raced; whichever
/// completes first wins.
async fn recv_with_timeout(
    socket: &UdpSocket,
    buffer: &mut [u8],
    timeout: Option<Duration>,
) -> Result<Option<(usize, SocketAddr)>> {
    match timeout {
        Some(duration) => {
            future::or(
                async move {
                    async_io::Timer::after(duration).await;
                    Ok(None)
                },
                async move {
                    socket.recv_from(buffer).await.map(Some).map_err(|err| {
                        Error::with_source(ErrorKind::Transport, "failed to read http3 packet", err)
                    })
                },
            )
            .await
        }
        // No timeout configured: block until a datagram arrives.
        None => socket.recv_from(buffer).await.map(Some).map_err(|err| {
            Error::with_source(ErrorKind::Transport, "failed to read http3 packet", err)
        }),
    }
}
#[cfg(feature = "h3")]
/// Runs an I/O future with an optional deadline.
///
/// On timeout this yields `Err(ErrorKind::Timeout)` carrying
/// `timeout_message` (unlike `recv_with_timeout`, which signals a timeout
/// as `Ok(None)`); I/O failures are wrapped as transport errors.
async fn with_timeout_io<F, T>(
    timeout: Option<Duration>,
    future: F,
    timeout_message: &'static str,
) -> Result<T>
where
    F: std::future::Future<Output = std::io::Result<T>>,
{
    match timeout {
        Some(duration) => {
            // Race the timer against the I/O future; first to finish wins.
            future::or(
                async move {
                    async_io::Timer::after(duration).await;
                    Err(Error::new(ErrorKind::Timeout, timeout_message))
                },
                async move {
                    future.await.map_err(|err| {
                        Error::with_source(ErrorKind::Transport, "io operation failed", err)
                    })
                },
            )
            .await
        }
        None => future
            .await
            .map_err(|err| Error::with_source(ErrorKind::Transport, "io operation failed", err)),
    }
}
#[cfg(feature = "h3")]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
/// Which timeout governs the driver loop's next wait.
enum WaitKind {
    /// QUIC handshake not yet established: the connect timeout applies.
    Connect(Option<Duration>),
    /// Request body still uploading: the write timeout applies.
    Write(Option<Duration>),
    /// Waiting for response data: the read timeout applies.
    Read(Option<Duration>),
    /// quiche's internal timer fires before any user-facing timeout.
    Quic(Option<Duration>),
}
#[cfg(feature = "h3")]
impl WaitKind {
    /// Extracts the wrapped duration regardless of which timeout applies.
    fn duration(self) -> Option<Duration> {
        match self {
            Self::Connect(d) => d,
            Self::Write(d) => d,
            Self::Read(d) => d,
            Self::Quic(d) => d,
        }
    }
}
#[cfg(feature = "h3")]
/// Chooses the timeout class for the driver loop's next wait.
///
/// Picks the user-facing timeout for the current phase (connect until the
/// QUIC handshake completes, write while a body upload is pending, read
/// otherwise), then lets quiche's internal timer take precedence whenever
/// it would fire sooner.
fn wait_kind(
    conn: &quiche::Connection,
    upload_pending: bool,
    timeout_config: TimeoutConfig,
) -> WaitKind {
    let user = if !conn.is_established() {
        WaitKind::Connect(timeout_config.connect)
    } else if upload_pending {
        WaitKind::Write(timeout_config.write)
    } else {
        WaitKind::Read(timeout_config.read)
    };
    match (user.duration(), conn.timeout()) {
        // quiche's timer fires before the user deadline.
        (Some(user_duration), Some(quic_duration)) if quic_duration < user_duration => {
            WaitKind::Quic(Some(quic_duration))
        }
        (Some(_), _) => user,
        // No user deadline at all: fall back to quiche's timer if present.
        (None, Some(quic_duration)) => WaitKind::Quic(Some(quic_duration)),
        (None, None) => user,
    }
}
#[cfg(feature = "h3")]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
/// Classification of a received HEADERS block.
enum HeaderBlockKind {
    /// A 1xx interim response; more headers will follow.
    Informational,
    /// The final response headers.
    Final,
    /// A trailer block after the body.
    Trailers,
}
#[cfg(feature = "h3")]
/// Accumulated state of an HTTP/3 response as frames arrive on a stream.
struct H3ResponseState {
    /// Method of the originating request (decides whether a body is legal).
    request_method: Method,
    compression_mode: CompressionMode,
    /// Set when the final (non-1xx) headers arrive.
    status: Option<StatusCode>,
    headers: HeaderMap,
    trailers: Option<HeaderMap>,
    /// Body bytes buffered for aggregated (content-encoded) responses.
    body: Vec<u8>,
    /// Total body bytes seen, whether buffered or streamed onward.
    received_body_bytes: usize,
    received_final_headers: bool,
    received_trailers: bool,
    /// The peer finished its side of the stream.
    end_stream: bool,
    /// Declared `content-length`, when present and parseable.
    expected_content_length: Option<usize>,
}
#[cfg(feature = "h3")]
impl H3ResponseState {
fn new(request_method: Method, compression_mode: CompressionMode) -> Self {
Self {
request_method,
compression_mode,
status: None,
headers: HeaderMap::new(),
trailers: None,
body: Vec::new(),
received_body_bytes: 0,
received_final_headers: false,
received_trailers: false,
end_stream: false,
expected_content_length: None,
}
}
fn is_complete(&self) -> bool {
self.end_stream
}
fn mark_end_stream(&mut self) {
self.end_stream = true;
}
fn allows_response_body(&self) -> bool {
self.status
.map(|status| response_body_allowed(self.request_method, status))
.unwrap_or(false)
}
fn requires_aggregated_body(&self) -> bool {
self.allows_response_body() && self.headers.get("content-encoding").is_some()
}
fn record_body_bytes(&mut self, size: usize) {
self.received_body_bytes = self.received_body_bytes.saturating_add(size);
}
fn push_body(&mut self, chunk: &[u8]) {
self.body.extend_from_slice(chunk);
}
fn validate_received_body(&self) -> Result<()> {
if !self.allows_response_body() {
return Ok(());
}
if let Some(expected) = self.expected_content_length {
if self.received_body_bytes != expected {
return Err(Error::new(
ErrorKind::Transport,
format!(
"http3 response body length mismatch: expected {expected}, got {}",
self.received_body_bytes
),
));
}
}
Ok(())
}
fn apply_headers(&mut self, list: Vec<quiche::h3::Header>) -> Result<HeaderBlockKind> {
let parsed = ParsedHeaders::parse(list, !self.received_final_headers)?;
if let Some(status) = parsed.status {
if status.as_u16() < 200 {
return Ok(HeaderBlockKind::Informational);
}
if self.received_final_headers {
return Err(Error::new(
ErrorKind::Transport,
"http3 final response headers received more than once",
));
}
self.expected_content_length = parse_content_length(&parsed.headers, "http3")?;
for (name, value) in parsed.headers.iter() {
self.headers.append(name.as_str(), value.as_str())?;
}
self.status = Some(status);
self.received_final_headers = true;
return Ok(HeaderBlockKind::Final);
}
if !self.received_final_headers {
return Err(Error::new(
ErrorKind::Transport,
"http3 response headers missing :status",
));
}
if self.received_trailers {
return Err(Error::new(
ErrorKind::Transport,
"http3 trailers received more than once",
));
}
self.trailers = Some(parsed.headers);
self.received_trailers = true;
Ok(HeaderBlockKind::Trailers)
}
fn into_response(self, url: Url) -> Result<Response> {
let status = self.status.ok_or_else(|| {
Error::new(
ErrorKind::Transport,
"http3 response completed without final headers",
)
})?;
if response_body_allowed(self.request_method, status) {
if let Some(expected) = self.expected_content_length {
if self.received_body_bytes != expected {
return Err(Error::new(
ErrorKind::Transport,
format!(
"http3 response body length mismatch: expected {expected}, got {}",
self.received_body_bytes
),
));
}
}
}
let mut headers = self.headers;
let body = if response_body_allowed(self.request_method, status) {
maybe_decode_response_body(&mut headers, self.body, self.compression_mode)?
} else {
Body::from(self.body)
};
Ok(Response::new_with_trailer_state(
status,
Version::Http3,
url,
headers,
TrailerState::Ready(self.trailers),
body,
))
}
}
#[cfg(feature = "h3")]
/// A decoded header block: the optional `:status` pseudo header plus the
/// regular headers.
struct ParsedHeaders {
    status: Option<StatusCode>,
    headers: HeaderMap,
}
#[cfg(feature = "h3")]
impl ParsedHeaders {
    /// Decodes a raw header list, separating `:status` from the regular
    /// headers. `allow_status` is false for trailer blocks, where any
    /// pseudo header is illegal.
    ///
    /// # Errors
    /// Rejects non-UTF-8 names or values, pseudo headers after regular
    /// ones, pseudo headers other than `:status`, a repeated `:status`,
    /// and non-numeric status values.
    fn parse(list: Vec<quiche::h3::Header>, allow_status: bool) -> Result<Self> {
        let mut status = None;
        let mut regular_seen = false;
        let mut headers = HeaderMap::new();
        for header in list {
            let name = String::from_utf8(header.name().to_vec()).map_err(|err| {
                Error::with_source(ErrorKind::Transport, "invalid http3 header name", err)
            })?;
            let value = String::from_utf8(header.value().to_vec()).map_err(|err| {
                Error::with_source(ErrorKind::Transport, "invalid http3 header value", err)
            })?;
            if let Some(pseudo_name) = name.strip_prefix(':') {
                // Pseudo headers must form a prefix of the block.
                if regular_seen {
                    return Err(Error::new(
                        ErrorKind::Transport,
                        "http3 pseudo headers must appear before regular headers",
                    ));
                }
                if !allow_status || pseudo_name != "status" {
                    return Err(Error::new(
                        ErrorKind::Transport,
                        format!("unexpected http3 pseudo header: {name}"),
                    ));
                }
                if status.is_some() {
                    return Err(Error::new(
                        ErrorKind::Transport,
                        "http3 response repeated :status pseudo header",
                    ));
                }
                let value = value.parse::<u16>().map_err(|err| {
                    Error::with_source(ErrorKind::Transport, "invalid http3 :status value", err)
                })?;
                status = Some(StatusCode::new(value));
                continue;
            }
            regular_seen = true;
            headers.append(name, value)?;
        }
        Ok(Self { status, headers })
    }
}
#[cfg(all(test, feature = "h3"))]
/// Unit coverage for the pure helpers in this module: config builders,
/// GOAWAY stream classification, and the staleness of close-related errors.
mod tests {
    use super::{
        build_h3_config, peer_closed_before_response_error, pending_goaway_error,
        pending_peer_close_error, rejected_stream_goaway_error, stream_rejected_by_goaway,
    };
    use crate::tls::TlsConfig;
    #[test]
    fn quiche_config_initializes() {
        let _config = crate::connection::quic::build_quiche_config(&TlsConfig::default())
            .expect("quiche transport config should build");
    }
    #[test]
    fn http3_config_initializes() {
        let _config = build_h3_config().expect("http3 config should build");
    }
    // The GOAWAY boundary is inclusive: the identified stream itself is
    // rejected along with everything above it.
    #[test]
    fn goaway_rejects_stream_ids_at_or_above_identifier() {
        assert!(!stream_rejected_by_goaway(0, 4));
        assert!(!stream_rejected_by_goaway(3, 4));
        assert!(stream_rejected_by_goaway(4, 4));
        assert!(stream_rejected_by_goaway(8, 4));
    }
    // Stale-connection kinds make these failures eligible for retry.
    #[test]
    fn goaway_unsent_request_errors_are_stale() {
        let err = pending_goaway_error(8);
        assert_eq!(err.kind(), &crate::ErrorKind::StaleConnection);
        assert!(err.to_string().contains("GOAWAY"));
        assert!(err.to_string().contains("before request started"));
    }
    #[test]
    fn goaway_rejected_stream_errors_are_stale() {
        let err = rejected_stream_goaway_error(12, 8);
        assert_eq!(err.kind(), &crate::ErrorKind::StaleConnection);
        assert!(err.to_string().contains("stream 12"));
        assert!(err.to_string().contains("GOAWAY"));
    }
    #[test]
    fn peer_close_unsent_request_errors_are_stale() {
        let err = pending_peer_close_error("http3 connection closed by peer");
        assert_eq!(err.kind(), &crate::ErrorKind::StaleConnection);
        assert!(err.to_string().contains("before request started"));
    }
    #[test]
    fn peer_close_before_response_errors_are_stale() {
        let err = peer_closed_before_response_error(4, "http3 connection closed by peer");
        assert_eq!(err.kind(), &crate::ErrorKind::StaleConnection);
        assert!(err.to_string().contains("stream 4"));
        assert!(err.to_string().contains("response headers"));
    }
}