trillium_http/h3/connection.rs
1mod peer_settings_wait;
2
3use super::{
4 H3Error,
5 frame::{Frame, FrameDecodeError, UniStreamType},
6 quic_varint::{self, QuicVarIntError},
7 settings::H3Settings,
8};
9use crate::{
10 Buffer, Conn, HttpContext, KnownHeaderName, Priority,
11 conn::H3FirstFrame,
12 h3::{H3ErrorCode, MAX_BUFFER_SIZE},
13 headers::qpack::{DecoderDynamicTable, EncoderDynamicTable, FieldSection},
14};
15use event_listener::Event;
16use futures_lite::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
17use std::{
18 future::{Future, IntoFuture},
19 io::{self, ErrorKind},
20 pin::Pin,
21 sync::{
22 Arc, OnceLock,
23 atomic::{AtomicBool, AtomicU64, Ordering},
24 },
25 task::{Context, Poll},
26};
27use swansong::{ShutdownCompletion, Swansong};
28
29/// The result of processing an HTTP/3 bidirectional stream.
30#[derive(Debug)]
31#[allow(
32 clippy::large_enum_variant,
33 reason = "Request is the hot path; boxing it would add an allocation per request"
34)]
35pub enum H3StreamResult<Transport> {
36 /// The stream carried a normal HTTP/3 request.
37 Request(Conn<Transport>),
38
39 /// The stream carries a WebTransport bidirectional data stream. The `session_id` identifies
40 /// the associated WebTransport session.
41 WebTransport {
42 /// The WebTransport session ID (stream ID of the CONNECT request).
43 session_id: u64,
44 /// The underlying transport, ready for application data.
45 transport: Transport,
46 /// Any bytes buffered after the session ID during stream negotiation.
47 buffer: Buffer,
48 },
49}
50
51/// Inner-loop result of [`H3Connection::process_inbound_uni_with_close`] before the recv
52/// stream is reattached. Decouples the inner async block (which only borrows the stream)
53/// from the caller-visible [`UniStreamResult`] (which returns the stream by value on
54/// non-`Handled` variants), so the function can keep ownership of `stream` long enough to
55/// fire its close callback before `stream` drops.
56enum UniInnerResult {
57 Handled,
58 WebTransport { session_id: u64, buffer: Buffer },
59 Unknown { stream_type: u64 },
60}
61
62/// The result of processing an HTTP/3 unidirectional stream.
63#[derive(Debug)]
64pub enum UniStreamResult<T> {
65 /// The stream was a known internal type (control, QPACK encoder/decoder) and was handled
66 /// automatically.
67 Handled,
68
69 /// A WebTransport unidirectional data stream. The `session_id` identifies the associated
70 /// WebTransport session.
71 WebTransport {
72 /// The WebTransport session ID.
73 session_id: u64,
74 /// The receive stream, ready for application data.
75 stream: T,
76 /// Any bytes buffered after the session ID during stream negotiation.
77 buffer: Buffer,
78 },
79
80 /// A stream whose type is recognized but unsupported (e.g. `Push`) or not recognized
81 /// at all by this crate.
82 ///
83 /// The caller is responsible for disposing of the stream — the in-tree consumers RST
84 /// it with `H3_STREAM_CREATION_ERROR`. `process_inbound_uni` deliberately does *not*
85 /// close the stream itself: handing it back gives a downstream extension the option to
86 /// implement a stream type trillium-http doesn't know about (a future RFC, an
87 /// experiment, etc.) without forking the codec.
88 Unknown {
89 /// The raw stream type value.
90 stream_type: u64,
91 /// The stream.
92 stream: T,
93 },
94}
95
96/// Shared state for a single HTTP/3 QUIC connection.
97///
98/// Call the appropriate methods on this type for each stream accepted from the QUIC connection.
99///
100/// # Driver shape (vs h2)
101///
102/// h2 multiplexes everything onto a single TCP byte stream, so a single
103/// [`H2Driver`][crate::h2::H2Driver] task suffices. h3 instead has the QUIC layer hand us multiple
104/// independent streams: an inbound and outbound control stream, an inbound and outbound QPACK
105/// encoder stream, an inbound and outbound QPACK decoder stream, and one bidi stream per
106/// request. There is no single "h3 driver" — each stream is driven by its own future returned from
107/// `H3Connection`'s `run_*` / `process_*` methods, and the caller decides how those futures are
108/// scheduled.
109///
110/// The trillium-http boundary is **runtime-free by design**: this crate hands out anonymous futures
111/// and lets the caller pick the executor. The in-tree consumers (`trillium-server-common`,
112/// `trillium-client`) follow a task-per-stream pattern — spawn each long-lived control / encoder /
113/// decoder future on its own task at connection setup, then spawn one task per accepted request
114/// stream. Nothing in this crate requires that pattern; a caller could in principle race all the
115/// futures on one task instead, with different perf characteristics.
116#[derive(Debug)]
117pub struct H3Connection {
118 /// Shared configuration across all protocols.
119 context: Arc<HttpContext>,
120
121 /// Connection-scoped shutdown signal. Shut down when we receive GOAWAY from the peer or when
122 /// the server-level Swansong shuts down. Request stream tasks use this to interrupt
123 /// in-progress work.
124 swansong: Swansong,
125
126 /// The peer's H3 settings, received on their control stream. Request streams may need to
127 /// consult these (e.g. max field section size).
128 pub(super) peer_settings: OnceLock<H3Settings>,
129
130 /// Multi-listener wake source for
131 /// [`PeerSettingsReady`][peer_settings_wait::PeerSettingsReady]. Notified by
132 /// `run_inbound_control` after applying peer SETTINGS, and again on connection
133 /// close, so any number of concurrently-parked futures all unblock together.
134 pub(super) peer_settings_event: Event,
135
136 /// The highest bidirectional stream ID we have accepted. Used to compute the GOAWAY value
137 /// (this + 4) to tell the peer which requests we saw. None until the first stream is accepted.
138 /// Updated by the runtime adapter's accept loop via [`record_accepted_stream`].
139 max_accepted_stream_id: AtomicU64,
140
141 /// Whether we have accepted any streams yet.
142 has_accepted_stream: AtomicBool,
143
144 /// The decoder-side QPACK dynamic table for this connection.
145 decoder_dynamic_table: DecoderDynamicTable,
146
147 /// The encoder-side QPACK dynamic table for this connection.
148 encoder_dynamic_table: EncoderDynamicTable,
149
150 /// Sink for RFC 9218 priority signals, set via
151 /// [`register_priority_callback`][Self::register_priority_callback]. Unset until the runtime
152 /// adapter that owns the QUIC streams registers it.
153 priority_callback: PriorityCallback,
154}
155
156/// Boxed sink for `(stream_id, priority, is_update)` signals.
157type PriorityCallbackFn = Box<dyn Fn(u64, Priority, bool) + Send + Sync>;
158
159/// A registered sink for `(stream_id, priority, is_update)` signals. Newtype so [`H3Connection`]
160/// can keep deriving `Debug` despite holding a boxed closure.
161#[derive(Default)]
162struct PriorityCallback(OnceLock<PriorityCallbackFn>);
163
164impl std::fmt::Debug for PriorityCallback {
165 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
166 f.debug_tuple("PriorityCallback")
167 .field(&self.0.get().map(|_| format_args!("..")))
168 .finish()
169 }
170}
171
172impl H3Connection {
173 /// Construct a new `H3Connection` to manage HTTP/3 for a given peer.
174 pub fn new(context: Arc<HttpContext>) -> Arc<Self> {
175 let swansong = context.swansong.child();
176 let max_table_capacity = context.config.dynamic_table_capacity;
177 let blocked_streams = context.config.h3_blocked_streams;
178 let encoder_dynamic_table = EncoderDynamicTable::new(&context);
179 Arc::new(Self {
180 context,
181 swansong,
182 peer_settings: OnceLock::new(),
183 peer_settings_event: Event::new(),
184 max_accepted_stream_id: AtomicU64::new(0),
185 has_accepted_stream: AtomicBool::new(false),
186 decoder_dynamic_table: DecoderDynamicTable::new(max_table_capacity, blocked_streams),
187 encoder_dynamic_table,
188 priority_callback: PriorityCallback::default(),
189 })
190 }
191
192 /// Register the sink for RFC 9218 priority signals on this connection.
193 ///
194 /// The callback is invoked with `(stream_id, priority, is_update)` once per request when its
195 /// initial `priority` header is parsed (`is_update = false`), and again for every
196 /// `PRIORITY_UPDATE` received afterward (`is_update = true`). `is_update` lets the receiver
197 /// honor RFC 9218 precedence: a `PRIORITY_UPDATE` outranks the request's initial header
198 /// priority regardless of arrival order, including when it arrives before the stream is
199 /// accepted.
200 ///
201 /// This crate keeps no priority state of its own and does no scheduling: it parses each
202 /// signal and hands the [`Priority`] off through this callback, leaving the receiver to apply
203 /// it to whatever owns send scheduling. Without a registered callback, priority is parsed but
204 /// never applied.
205 ///
206 /// Has no effect if a callback is already registered.
207 pub fn register_priority_callback(
208 &self,
209 callback: impl Fn(u64, Priority, bool) + Send + Sync + 'static,
210 ) {
211 let _ = self.priority_callback.0.set(Box::new(callback));
212 }
213
214 /// Emit a priority signal for a request stream to the registered callback, if any.
215 /// `is_update` distinguishes a received `PRIORITY_UPDATE` from the request's initial header
216 /// priority so the receiver can honor the precedence rule.
217 fn emit_priority(&self, stream_id: u64, priority: Priority, is_update: bool) {
218 let kind = if is_update {
219 "PRIORITY_UPDATE"
220 } else {
221 "initial"
222 };
223 match self.priority_callback.0.get() {
224 Some(callback) => {
225 log::trace!("H3 stream {stream_id}: emitting {kind} priority \"{priority}\"");
226 callback(stream_id, priority, is_update);
227 }
228 None => log::trace!(
229 "H3 stream {stream_id}: {kind} priority \"{priority}\" parsed but no callback \
230 registered"
231 ),
232 }
233 }
234
235 /// Handle an RFC 9218 `PRIORITY_UPDATE` received on the peer's control stream. The
236 /// prioritized element id must name a client-initiated bidirectional (request) stream —
237 /// `id % 4 == 0` in QUIC — and other ids are ignored rather than erroring, since the signal
238 /// is advisory.
239 fn emit_priority_update(&self, prioritized_element_id: u64, priority: Priority) {
240 if prioritized_element_id.is_multiple_of(4) {
241 self.emit_priority(prioritized_element_id, priority, true);
242 } else {
243 log::trace!(
244 "H3: ignoring PRIORITY_UPDATE for non-request stream {prioritized_element_id}"
245 );
246 }
247 }
248
249 /// Retrieve the [`Swansong`] shutdown handle for this HTTP/3 connection. See also
250 /// [`H3Connection::shut_down`]
251 pub fn swansong(&self) -> &Swansong {
252 &self.swansong
253 }
254
255 /// Attempt graceful shutdown of this HTTP/3 connection (all streams).
256 ///
257 /// The returned [`ShutdownCompletion`] type can
258 /// either be awaited in an async context or blocked on with [`ShutdownCompletion::block`] in a
259 /// blocking context
260 ///
261 /// Note that this will NOT shut down the server. To shut down the whole server, use
262 /// [`HttpContext::shut_down`]
263 pub fn shut_down(&self) -> ShutdownCompletion {
264 // Wake any in-flight `decode_field_section` calls parked on the decoder
265 // table's `ThresholdWait` (a non-I/O future awaiting dynamic-table inserts
266 // from the peer). The encoder table's writer loop is already swansong-
267 // aware, but we mark it failed too for symmetry: any future state
268 // mutations after shutdown are no longer wire-relevant.
269 self.decoder_dynamic_table.fail(H3ErrorCode::NoError);
270 self.encoder_dynamic_table.fail(H3ErrorCode::NoError);
271 self.wake_peer_settings_waiters();
272 self.swansong.shut_down()
273 }
274
275 /// Retrieve the [`HttpContext`] for this server.
276 pub fn context(&self) -> Arc<HttpContext> {
277 self.context.clone()
278 }
279
280 /// Returns the peer's HTTP/3 settings, available once the peer's control stream has been
281 /// processed.
282 pub fn peer_settings(&self) -> Option<&H3Settings> {
283 self.peer_settings.get()
284 }
285
286 /// Record that we accepted a bidirectional stream with this ID.
287 fn record_accepted_stream(&self, stream_id: u64) {
288 self.max_accepted_stream_id
289 .fetch_max(stream_id, Ordering::Relaxed);
290 self.has_accepted_stream.store(true, Ordering::Relaxed);
291 }
292
293 /// The stream ID to send in a GOAWAY frame: one past the highest stream we accepted, or 0 if we
294 /// haven't accepted any.
295 fn goaway_id(&self) -> u64 {
296 if self.has_accepted_stream.load(Ordering::Relaxed) {
297 self.max_accepted_stream_id.load(Ordering::Relaxed) + 4
298 } else {
299 0
300 }
301 }
302
303 /// Begin processing a single HTTP/3 request-response cycle on an accepted bidirectional
304 /// stream.
305 ///
306 /// Returns a builder. Attach an optional reset hook with
307 /// [`with_reset`][H3BidiRequest::with_reset], then `.await` it to run one request/response
308 /// cycle. Awaiting resolves to [`H3StreamResult::WebTransport`] if the stream opens a
309 /// WebTransport session rather than a standard HTTP/3 request.
310 ///
311 /// Without a reset hook, a stream-level protocol error drops the transport without
312 /// resetting it; attach `with_reset` to issue the RST that RFC 9114 requires for stream
313 /// errors.
314 ///
315 /// RFC 9218 priority is delivered out of band via the callback registered with
316 /// [`register_priority_callback`][Self::register_priority_callback]: this method emits the
317 /// request's initial priority once the headers are parsed.
318 pub fn process_inbound_bidi<Transport, Handler>(
319 self: Arc<Self>,
320 transport: Transport,
321 handler: Handler,
322 stream_id: u64,
323 ) -> H3BidiRequest<Transport, Handler> {
324 H3BidiRequest {
325 h3: self,
326 transport,
327 handler,
328 stream_id,
329 reset: None,
330 }
331 }
332
333 /// Process a single HTTP/3 request-response cycle on a bidirectional stream, calling
334 /// `reset` to issue a stream RST when a stream-level protocol error occurs.
335 ///
336 /// On any `H3Error::Protocol(code)` produced by first-frame processing (HEADERS decode,
337 /// pseudo-header validation, etc.), `reset` is invoked with the still-owned transport and
338 /// the error code before the error is returned. This lets callers RST both the recv and
339 /// send halves of the bidi stream — required by RFC 9114 for stream errors like
340 /// `H3_MESSAGE_ERROR`. I/O errors and successful runs do not invoke `reset`.
341 ///
342 /// `reset` is a `FnOnce` taking `(&mut Transport, H3ErrorCode)`. trillium-http does not
343 /// itself depend on any reset capability of the transport; callers wire up the actual
344 /// stream-RST mechanism (e.g. quinn's `RecvStream::stop` + `SendStream::reset`) inside
345 /// the closure.
346 ///
347 /// # Errors
348 ///
349 /// Returns an `H3Error` in case of io error or http/3 semantic error.
350 // This is not deprecated yet because it didn't make sense to release a new version of
351 // trillium-client just to avoid this deprecation, but the intention is to deprecate
352 pub async fn process_inbound_bidi_with_reset<Transport, Handler, Fut, Reset>(
353 self: Arc<Self>,
354 mut transport: Transport,
355 handler: Handler,
356 stream_id: u64,
357 reset: Reset,
358 ) -> Result<H3StreamResult<Transport>, H3Error>
359 where
360 Transport: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static,
361 Handler: FnOnce(Conn<Transport>) -> Fut,
362 Fut: Future<Output = Conn<Transport>>,
363 Reset: FnOnce(&mut Transport, H3ErrorCode),
364 {
365 self.record_accepted_stream(stream_id);
366 let _guard = self.swansong.guard();
367 let mut buffer: Buffer =
368 Vec::with_capacity(self.context.config.request_buffer_initial_len).into();
369
370 let outcome =
371 Conn::process_first_frame_h3(&self, &mut transport, &mut buffer, stream_id).await;
372
373 match outcome {
374 Ok(H3FirstFrame::Request {
375 validated,
376 start_time,
377 }) => {
378 let conn =
379 Conn::build_h3(self, transport, buffer, validated, start_time, stream_id);
380 Ok(H3StreamResult::Request(
381 handler(conn).await.send_h3().await?,
382 ))
383 }
384 Ok(H3FirstFrame::WebTransport { session_id }) => Ok(H3StreamResult::WebTransport {
385 session_id,
386 transport,
387 buffer,
388 }),
389 Err(error) => {
390 if let H3Error::Protocol(code) = &error {
391 reset(&mut transport, *code);
392 }
393 Err(error)
394 }
395 }
396 }
397
398 /// Decode a QPACK-encoded field section, consulting the dynamic table as needed.
399 ///
400 /// If the field section's Required Insert Count is greater than zero, waits until the
401 /// dynamic table has received enough entries. Returns an error on protocol violations or
402 /// if the encoder stream fails while waiting.
403 ///
404 /// Duplicate pseudo-headers are silently ignored (first value wins). Unknown
405 /// pseudo-headers are rejected.
406 ///
407 /// # Errors
408 ///
409 /// Returns an error if the encoded bytes cannot be parsed as a valid field section.
410 #[cfg(feature = "unstable")]
411 pub async fn decode_field_section(
412 &self,
413 encoded: &[u8],
414 stream_id: u64,
415 ) -> Result<FieldSection<'static>, H3Error> {
416 self.decoder_dynamic_table.decode(encoded, stream_id).await
417 }
418
419 #[cfg(not(feature = "unstable"))]
420 pub(crate) async fn decode_field_section(
421 &self,
422 encoded: &[u8],
423 stream_id: u64,
424 ) -> Result<FieldSection<'static>, H3Error> {
425 self.decoder_dynamic_table.decode(encoded, stream_id).await
426 }
427
428 /// Encode a QPACK field section from pseudo-headers and headers, consulting the encoder
429 /// dynamic table to emit literal-with-name-reference or indexed representations as the
430 /// table's contents allow.
431 ///
432 /// # Errors
433 ///
434 /// Returns an `H3Error` in case of http/3 semantic error.
435 #[cfg(feature = "unstable")]
436 #[allow(clippy::unnecessary_wraps, reason = "future-proofing api")]
437 pub fn encode_field_section(
438 &self,
439 field_section: &FieldSection<'_>,
440 buf: &mut Vec<u8>,
441 stream_id: u64,
442 ) -> Result<(), H3Error> {
443 self.encoder_dynamic_table
444 .encode(field_section, buf, stream_id);
445 Ok(())
446 }
447
448 #[cfg(not(feature = "unstable"))]
449 #[allow(clippy::unnecessary_wraps, reason = "future-proofing api")]
450 pub(crate) fn encode_field_section(
451 &self,
452 field_section: &FieldSection<'_>,
453 buf: &mut Vec<u8>,
454 stream_id: u64,
455 ) -> Result<(), H3Error> {
456 self.encoder_dynamic_table
457 .encode(field_section, buf, stream_id);
458 Ok(())
459 }
460
461 /// Run this connection's HTTP/3 outbound control stream.
462 ///
463 /// Sends the initial SETTINGS frame, then sends GOAWAY when the connection shuts down.
464 /// Returns after GOAWAY is sent; keep the stream open until the QUIC connection closes
465 /// (closing a control stream is a connection error).
466 ///
467 /// # Errors
468 ///
469 /// Returns an `H3Error` in case of io error or http/3 semantic error.
470 pub async fn run_outbound_control<T>(&self, mut stream: T) -> Result<(), H3Error>
471 where
472 T: AsyncWrite + Unpin + Send,
473 {
474 let mut buf = vec![0; 128];
475
476 let settings = Frame::Settings(H3Settings::from(&self.context.config));
477 log::trace!(
478 "H3 outbound control: sending SETTINGS: {:?}",
479 H3Settings::from(&self.context.config)
480 );
481
482 write(&mut buf, &mut stream, |buf| {
483 let mut written = quic_varint::encode(UniStreamType::Control, buf)?;
484 written += settings.encode(&mut buf[written..])?;
485 Some(written)
486 })
487 .await?;
488 log::trace!("H3 outbound control: SETTINGS sent");
489
490 self.swansong.clone().await;
491
492 write(&mut buf, &mut stream, |buf| {
493 Frame::Goaway(self.goaway_id()).encode(buf)
494 })
495 .await?;
496
497 Ok(())
498 }
499
500 /// Run the outbound QPACK encoder stream for the duration of the connection.
501 ///
502 /// Writes the stream type byte, then drains encoder-stream instructions from the encoder
503 /// dynamic table as they are enqueued. Returns when the connection shuts down or the table is
504 /// marked failed.
505 ///
506 /// # Errors
507 ///
508 /// Returns an `H3Error` in case of io error.
509 pub async fn run_encoder<T>(&self, mut stream: T) -> Result<(), H3Error>
510 where
511 T: AsyncWrite + Unpin + Send,
512 {
513 self.encoder_dynamic_table
514 .run_writer(&mut stream, self.swansong.clone())
515 .await
516 }
517
518 /// Run the outbound QPACK decoder stream for the duration of the connection.
519 ///
520 /// Writes the stream type byte, then loops sending Section Acknowledgement and Insert
521 /// Count Increment instructions as they become needed. Returns when the connection
522 /// shuts down.
523 ///
524 /// # Errors
525 ///
526 /// Returns an `H3Error` in case of io error or http/3 semantic error.
527 pub async fn run_decoder<T>(&self, mut stream: T) -> Result<(), H3Error>
528 where
529 T: AsyncWrite + Unpin + Send,
530 {
531 self.decoder_dynamic_table
532 .run_writer(&mut stream, self.swansong.clone())
533 .await
534 }
535
536 /// Handle an inbound unidirectional HTTP/3 stream from the peer.
537 ///
538 /// Internal stream types (control, QPACK encoder/decoder) are handled automatically;
539 /// application streams are returned via [`UniStreamResult`] for the caller to process.
540 ///
541 /// On a connection-level protocol error, this method drops the recv stream before
542 /// the caller can react. Quinn's `RecvStream::drop` then sends `STOP_SENDING`, which
543 /// races against the caller's `connection.close` — if the peer responds with a
544 /// malformed `RESET_STREAM` (notably `final_offset = 0`) before our app close is
545 /// applied, the transport-level error overrides our app error code on the wire.
546 /// Use [`process_inbound_uni_with_close`] to thread the close call through the
547 /// function so it fires before the stream drops.
548 ///
549 /// [`process_inbound_uni_with_close`]: Self::process_inbound_uni_with_close
550 ///
551 /// # Errors
552 ///
553 /// Returns a `H3Error` in case of io error or http/3 semantic error.
554 #[deprecated(
555 since = "1.2.0",
556 note = "use `process_inbound_uni_with_close` so connection-level protocol errors close \
557 the QUIC connection before the recv stream drops, avoiding a `FINAL_SIZE_ERROR` \
558 race with the peer's response to STOP_SENDING"
559 )]
560 pub async fn process_inbound_uni<T>(&self, stream: T) -> Result<UniStreamResult<T>, H3Error>
561 where
562 T: AsyncRead + Unpin + Send,
563 {
564 self.process_inbound_uni_with_close(stream, |_| {}).await
565 }
566
567 /// Handle an inbound unidirectional HTTP/3 stream from the peer, calling `on_close` to
568 /// close the QUIC connection if a connection-level protocol error is detected.
569 ///
570 /// Identical to [`process_inbound_uni`][Self::process_inbound_uni] except that on
571 /// any `H3Error::Protocol(code)` whose code is a connection-level error (RFC 9114,
572 /// RFC 9204), `on_close` is invoked with that code while the recv stream is still alive. This
573 /// lets callers send a `CONNECTION_CLOSE` before the stream drops — if the close call sets
574 /// quinn's `conn.error`, quinn's `RecvStream::drop` skips `STOP_SENDING`, eliminating a
575 /// peer race that otherwise causes `FINAL_SIZE_ERROR` to override the app error code.
576 ///
577 /// `on_close` is a `FnOnce` taking `H3ErrorCode`. trillium-http does not itself
578 /// hold the QUIC connection; callers wire up the actual `connection.close()` call
579 /// inside the closure (e.g. quinn's `Connection::close`).
580 ///
581 /// # Errors
582 ///
583 /// Returns a `H3Error` in case of io error or http/3 semantic error.
584 pub async fn process_inbound_uni_with_close<T, OnClose>(
585 &self,
586 mut stream: T,
587 on_close: OnClose,
588 ) -> Result<UniStreamResult<T>, H3Error>
589 where
590 T: AsyncRead + Unpin + Send,
591 OnClose: FnOnce(H3ErrorCode),
592 {
593 let inner = self
594 .swansong
595 .interrupt(self.process_inbound_uni_inner(&mut stream))
596 .await
597 .unwrap_or(Ok(UniInnerResult::Handled)); // interrupted
598
599 match inner {
600 Ok(UniInnerResult::Handled) => Ok(UniStreamResult::Handled),
601 Ok(UniInnerResult::WebTransport { session_id, buffer }) => {
602 Ok(UniStreamResult::WebTransport {
603 session_id,
604 stream,
605 buffer,
606 })
607 }
608 Ok(UniInnerResult::Unknown { stream_type }) => Ok(UniStreamResult::Unknown {
609 stream_type,
610 stream,
611 }),
612 Err(error) => {
613 // Fire `on_close` BEFORE returning so the caller's connection.close
614 // call sets quinn's `conn.error` while `stream` is still alive. When
615 // `stream` then drops at function return, quinn's `RecvStream::drop`
616 // skips STOP_SENDING — preventing the peer-RESET_STREAM race that
617 // otherwise replaces our app close code with FINAL_SIZE_ERROR.
618 if let H3Error::Protocol(code) = &error
619 && code.is_connection_error()
620 {
621 on_close(*code);
622 }
623 Err(error)
624 }
625 }
626 }
627
628 /// Inner-loop body of [`process_inbound_uni_with_close`][Self::process_inbound_uni_with_close].
629 /// Borrows `stream` so the outer function can keep ownership of it across the await,
630 /// which lets the caller's close callback fire before the recv stream drops.
631 async fn process_inbound_uni_inner<T>(&self, stream: &mut T) -> Result<UniInnerResult, H3Error>
632 where
633 T: AsyncRead + Unpin + Send,
634 {
635 let mut buf = vec![0; 128];
636 let mut filled = 0;
637
638 // Read stream type varint (decode as raw u64 to handle unknown types)
639 let stream_type = read(&mut buf, &mut filled, stream, |data| {
640 match quic_varint::decode(data) {
641 Ok(ok) => Ok(Some(ok)),
642 Err(QuicVarIntError::UnexpectedEnd) => Ok(None),
643 // this branch is unreachable because u64 is always From<u64>
644 Err(QuicVarIntError::UnknownValue { bytes, value }) => Ok(Some((value, bytes))),
645 }
646 })
647 .await?;
648
649 match UniStreamType::try_from(stream_type) {
650 Ok(UniStreamType::Control) => {
651 log::trace!("H3 inbound uni: control stream");
652 self.run_inbound_control(&mut buf, &mut filled, stream)
653 .await?;
654 Ok(UniInnerResult::Handled)
655 }
656
657 Ok(UniStreamType::QpackEncoder) => {
658 log::trace!("H3 inbound uni: QPACK encoder stream ({filled} bytes pre-read)");
659 let mut reader = Prepended {
660 head: &buf[..filled],
661 tail: stream,
662 };
663
664 log::trace!("QPACK encoder stream: started");
665 self.decoder_dynamic_table.run_reader(&mut reader).await?;
666
667 Ok(UniInnerResult::Handled)
668 }
669
670 Ok(UniStreamType::QpackDecoder) => {
671 log::trace!("H3 inbound uni: QPACK decoder stream ({filled} bytes pre-read)");
672 let mut reader = Prepended {
673 head: &buf[..filled],
674 tail: stream,
675 };
676 self.encoder_dynamic_table.run_reader(&mut reader).await?;
677 Ok(UniInnerResult::Handled)
678 }
679
680 Ok(UniStreamType::WebTransport) => {
681 log::trace!("H3 inbound uni: WebTransport stream");
682 let session_id =
683 read(
684 &mut buf,
685 &mut filled,
686 stream,
687 |data| match quic_varint::decode(data) {
688 Ok(ok) => Ok(Some(ok)),
689 Err(QuicVarIntError::UnexpectedEnd) => Ok(None),
690 Err(QuicVarIntError::UnknownValue { bytes, value }) => {
691 Ok(Some((value, bytes)))
692 }
693 },
694 )
695 .await?;
696
697 buf.truncate(filled);
698
699 Ok(UniInnerResult::WebTransport {
700 session_id,
701 buffer: buf.into(),
702 })
703 }
704
705 Ok(UniStreamType::Push) => {
706 // Trillium does not support HTTP/3 push, so we hand these back as `Unknown`
707 // identically to truly-unknown stream types — the explicit arm exists so
708 // trace output names "push stream" rather than a bare type id.
709 log::trace!("H3 inbound uni: push stream (push not supported)");
710 Ok(UniInnerResult::Unknown { stream_type })
711 }
712
713 Err(_) => {
714 log::trace!("H3 inbound uni: unknown stream type {stream_type:#x}");
715 Ok(UniInnerResult::Unknown { stream_type })
716 }
717 }
718 }
719
720 /// Handle the http/3 peer's inbound control stream.
721 ///
722 /// # Errors
723 ///
724 /// Returns a `H3Error` in case of io error or HTTP/3 semantic error.
725 async fn run_inbound_control<T>(
726 &self,
727 buf: &mut Vec<u8>,
728 filled: &mut usize,
729 stream: &mut T,
730 ) -> Result<(), H3Error>
731 where
732 T: AsyncRead + Unpin + Send,
733 {
734 // SettingsError takes priority: a SETTINGS frame whose payload is itself invalid
735 // (e.g. forbidden HTTP/2 setting IDs) is reported as SETTINGS_ERROR, not the
736 // MISSING_SETTINGS we report for everything else here.
737 let settings = read(buf, filled, stream, |data| match Frame::decode(data) {
738 Ok((Frame::Settings(s), consumed)) => Ok(Some((s, consumed))),
739 Err(FrameDecodeError::Incomplete) => Ok(None),
740 Err(FrameDecodeError::Error(H3ErrorCode::SettingsError)) => {
741 Err(H3ErrorCode::SettingsError)
742 }
743 Ok(_) | Err(FrameDecodeError::Error(_)) => Err(H3ErrorCode::MissingSettings),
744 })
745 .await
746 .map_err(map_critical_stream_eof)?;
747
748 log::trace!("H3 peer settings: {settings:?}");
749
750 self.peer_settings
751 .set(settings)
752 .map_err(|_| H3ErrorCode::FrameUnexpected)?;
753 self.wake_peer_settings_waiters();
754
755 self.encoder_dynamic_table
756 .initialize_from_peer_settings(settings);
757
758 loop {
759 let frame = self
760 .swansong
761 .interrupt(read(buf, filled, stream, |data| {
762 match Frame::decode(data) {
763 Ok((frame, consumed)) => Ok(Some((frame, consumed))),
764 Err(FrameDecodeError::Incomplete) => Ok(None),
765 Err(FrameDecodeError::Error(code)) => Err(code),
766 }
767 }))
768 .await
769 .transpose()
770 .map_err(map_critical_stream_eof)?;
771
772 match frame {
773 None => {
774 log::trace!("H3 control stream: interrupted by shutdown");
775 return Ok(());
776 }
777
778 Some(Frame::Goaway(id)) => {
779 log::trace!("H3 control stream: peer sent GOAWAY(stream_id={id})");
780 self.swansong.shut_down();
781 return Ok(());
782 }
783
784 Some(Frame::Unknown(n)) => {
785 // Consume the payload bytes so the stream stays synchronized.
786 log::trace!("H3 control stream: skipping unknown frame (payload {n} bytes)");
787 let n = usize::try_from(n).unwrap_or(usize::MAX);
788 let in_buf = n.min(*filled);
789 buf.copy_within(in_buf..*filled, 0);
790 *filled -= in_buf;
791 let mut todo = n - in_buf;
792 let mut scratch = [0u8; 256];
793 while todo > 0 {
794 let to_read = todo.min(scratch.len());
795 let n = stream
796 .read(&mut scratch[..to_read])
797 .await
798 .map_err(H3Error::Io)?;
799 if n == 0 {
800 return Err(H3ErrorCode::ClosedCriticalStream.into());
801 }
802 todo -= n;
803 }
804 }
805
806 Some(
807 Frame::Settings(_)
808 | Frame::Data(_)
809 | Frame::Headers(_)
810 | Frame::PushPromise { .. }
811 | Frame::WebTransport(_),
812 ) => {
813 return Err(H3ErrorCode::FrameUnexpected.into());
814 }
815
816 Some(Frame::PriorityUpdate {
817 prioritized_element_id,
818 priority,
819 }) => {
820 log::trace!(
821 "H3 control stream: PRIORITY_UPDATE stream={prioritized_element_id} \
822 priority=\"{priority}\""
823 );
824 self.emit_priority_update(prioritized_element_id, priority);
825 }
826
827 // Trillium doesn't implement push, so these are ignored rather than acted on.
828 Some(Frame::CancelPush(_) | Frame::MaxPushId(_)) => {
829 log::trace!("H3 control stream: ignoring {frame:?}");
830 }
831 }
832 }
833 }
834}
835
836/// A pending HTTP/3 request-response cycle on one bidirectional stream, with optional
837/// per-stream hooks.
838///
839/// Built by [`H3Connection::process_inbound_bidi`]. Configure hooks with the `with_*`
840/// methods and `.await` it to run the cycle. New per-stream extension points are added as
841/// further `with_*` methods, so the entry point's required arguments never change.
842pub struct H3BidiRequest<Transport, Handler> {
843 h3: Arc<H3Connection>,
844 transport: Transport,
845 handler: Handler,
846 stream_id: u64,
847 reset: Option<ResetHook<Transport>>,
848}
849
850/// Per-stream reset hook: RST both halves with the still-owned transport on a stream-level error.
851type ResetHook<Transport> = Box<dyn FnOnce(&mut Transport, H3ErrorCode) + Send>;
852
853impl<Transport, Handler> std::fmt::Debug for H3BidiRequest<Transport, Handler> {
854 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
855 f.debug_struct("H3BidiRequest")
856 .field("stream_id", &self.stream_id)
857 .finish_non_exhaustive()
858 }
859}
860
861impl<Transport, Handler> H3BidiRequest<Transport, Handler> {
862 /// Issue a stream RST on a stream-level protocol error.
863 ///
864 /// On any `H3Error::Protocol(code)` from first-frame processing, `reset` is called with
865 /// the still-owned transport and the error code before the error is returned — letting the
866 /// caller RST both halves of the bidi stream as RFC 9114 requires. I/O errors and
867 /// successful runs do not invoke it. Without this hook, the transport is dropped without a
868 /// reset.
869 #[must_use]
870 pub fn with_reset<R>(mut self, reset: R) -> Self
871 where
872 R: FnOnce(&mut Transport, H3ErrorCode) + Send + 'static,
873 {
874 self.reset = Some(Box::new(reset));
875 self
876 }
877}
878
879impl<Transport, Handler, Fut> IntoFuture for H3BidiRequest<Transport, Handler>
880where
881 Transport: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static,
882 Handler: FnOnce(Conn<Transport>) -> Fut + Send + 'static,
883 Fut: Future<Output = Conn<Transport>> + Send + 'static,
884{
885 type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send>>;
886 type Output = Result<H3StreamResult<Transport>, H3Error>;
887
888 fn into_future(self) -> Self::IntoFuture {
889 Box::pin(async move {
890 let Self {
891 h3,
892 mut transport,
893 handler,
894 stream_id,
895 reset,
896 } = self;
897
898 h3.record_accepted_stream(stream_id);
899 let _guard = h3.swansong.guard();
900 let mut buffer: Buffer =
901 Vec::with_capacity(h3.context.config.request_buffer_initial_len).into();
902
903 let outcome =
904 Conn::process_first_frame_h3(&h3, &mut transport, &mut buffer, stream_id).await;
905
906 match outcome {
907 Ok(H3FirstFrame::Request {
908 validated,
909 start_time,
910 }) => {
911 let initial_priority = validated
912 .request_headers
913 .get_str(KnownHeaderName::Priority)
914 .map(Priority::parse)
915 .unwrap_or_default();
916 h3.emit_priority(stream_id, initial_priority, false);
917 let conn =
918 Conn::build_h3(h3, transport, buffer, validated, start_time, stream_id);
919 Ok(H3StreamResult::Request(
920 handler(conn).await.send_h3().await?,
921 ))
922 }
923 Ok(H3FirstFrame::WebTransport { session_id }) => Ok(H3StreamResult::WebTransport {
924 session_id,
925 transport,
926 buffer,
927 }),
928 Err(error) => {
929 if let H3Error::Protocol(code) = &error
930 && let Some(reset) = reset
931 {
932 reset(&mut transport, *code);
933 }
934 Err(error)
935 }
936 }
937 })
938 }
939}
940
941/// Map an `UnexpectedEof` I/O error (the `read` helper's "stream FIN'd" signal) to
942/// `H3_CLOSED_CRITICAL_STREAM`. Closure of the control stream or of either QPACK
943/// side-channel is a connection error. Other I/O errors and any protocol error are passed
944/// through unchanged.
945fn map_critical_stream_eof(error: H3Error) -> H3Error {
946 match error {
947 H3Error::Io(e) if e.kind() == ErrorKind::UnexpectedEof => {
948 H3ErrorCode::ClosedCriticalStream.into()
949 }
950 other => other,
951 }
952}
953
954async fn write(
955 buf: &mut Vec<u8>,
956 mut stream: impl AsyncWrite + Unpin + Send,
957 mut f: impl FnMut(&mut [u8]) -> Option<usize>,
958) -> io::Result<usize> {
959 let written = loop {
960 if let Some(w) = f(buf) {
961 break w;
962 }
963 if buf.len() >= MAX_BUFFER_SIZE {
964 return Err(io::Error::new(ErrorKind::OutOfMemory, "runaway allocation"));
965 }
966 buf.resize(buf.len() * 2, 0);
967 };
968
969 stream.write_all(&buf[..written]).await?;
970 stream.flush().await?;
971 Ok(written)
972}
973
974/// An `AsyncRead` adapter that drains a byte slice before reading from an inner stream.
975///
976/// Used to replay bytes that were read ahead while parsing a stream-type varint, before
977/// dispatching to the inner runner that consumes the rest of the stream.
978struct Prepended<'a, T> {
979 head: &'a [u8],
980 tail: T,
981}
982
983impl<T: AsyncRead + Unpin> AsyncRead for Prepended<'_, T> {
984 fn poll_read(
985 self: Pin<&mut Self>,
986 cx: &mut Context<'_>,
987 out: &mut [u8],
988 ) -> Poll<io::Result<usize>> {
989 let this = self.get_mut();
990 if !this.head.is_empty() {
991 let n = this.head.len().min(out.len());
992 out[..n].copy_from_slice(&this.head[..n]);
993 this.head = &this.head[n..];
994 return Poll::Ready(Ok(n));
995 }
996 Pin::new(&mut this.tail).poll_read(cx, out)
997 }
998}
999
1000/// Read from `stream` into `buf` until `f` can decode a value.
1001///
1002/// `f` receives the filled portion of the buffer and returns:
1003/// - `Ok(Some((value, consumed)))` — success; consumed bytes are removed from the front
1004/// - `Ok(None)` — need more data; reads more bytes and retries
1005/// - `Err(e)` — unrecoverable error; propagated to caller
1006async fn read<R>(
1007 buf: &mut Vec<u8>,
1008 filled: &mut usize,
1009 stream: &mut (impl AsyncRead + Unpin + Send),
1010 f: impl Fn(&[u8]) -> Result<Option<(R, usize)>, H3ErrorCode>,
1011) -> Result<R, H3Error> {
1012 loop {
1013 if let Some((result, consumed)) = f(&buf[..*filled])? {
1014 buf.copy_within(consumed..*filled, 0);
1015 *filled -= consumed;
1016 return Ok(result);
1017 }
1018
1019 if *filled >= buf.len() {
1020 if buf.len() >= MAX_BUFFER_SIZE {
1021 return Err(io::Error::new(ErrorKind::OutOfMemory, "runaway allocation").into());
1022 }
1023 buf.resize(buf.len() * 2, 0);
1024 }
1025
1026 let n = stream.read(&mut buf[*filled..]).await?;
1027 if n == 0 {
1028 return Err(io::Error::new(ErrorKind::UnexpectedEof, "stream closed").into());
1029 }
1030 *filled += n;
1031 }
1032}