trillium_http/h3/connection.rs
1mod peer_settings_wait;
2
3use super::{
4 H3Error,
5 frame::{Frame, FrameDecodeError, UniStreamType},
6 quic_varint::{self, QuicVarIntError},
7 settings::H3Settings,
8};
9use crate::{
10 Buffer, Conn, HttpContext,
11 conn::H3FirstFrame,
12 h3::{H3ErrorCode, MAX_BUFFER_SIZE},
13 headers::qpack::{DecoderDynamicTable, EncoderDynamicTable, FieldSection},
14};
15use event_listener::Event;
16use futures_lite::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
17use std::{
18 future::Future,
19 io::{self, ErrorKind},
20 pin::Pin,
21 sync::{
22 Arc, OnceLock,
23 atomic::{AtomicBool, AtomicU64, Ordering},
24 },
25 task::{Context, Poll},
26};
27use swansong::{ShutdownCompletion, Swansong};
28
/// The result of processing an HTTP/3 bidirectional stream.
///
/// Returned by [`H3Connection::process_inbound_bidi_with_reset`] (and its deprecated
/// `process_inbound_bidi` wrapper) once the first frame on the stream has been classified.
#[derive(Debug)]
#[allow(clippy::large_enum_variant)] // Request is the hot path; boxing it would add an allocation per request
pub enum H3StreamResult<Transport> {
    /// The stream carried a normal HTTP/3 request. The contained [`Conn`] is the one the
    /// handler returned, after its response has been sent.
    Request(Conn<Transport>),

    /// The stream carries a WebTransport bidirectional data stream. The `session_id` identifies
    /// the associated WebTransport session.
    WebTransport {
        /// The WebTransport session ID (stream ID of the CONNECT request).
        session_id: u64,
        /// The underlying transport, ready for application data.
        transport: Transport,
        /// Any bytes buffered after the session ID during stream negotiation.
        buffer: Buffer,
    },
}
47
/// Inner-loop result of [`H3Connection::process_inbound_uni_with_close`] before the recv
/// stream is reattached. Decouples the inner async block (which only borrows the stream)
/// from the caller-visible [`UniStreamResult`] (which returns the stream by value on
/// non-`Handled` variants), so the function can keep ownership of `stream` long enough to
/// fire its close callback before `stream` drops.
///
/// Each variant mirrors the [`UniStreamResult`] variant of the same name, minus the stream.
enum UniInnerResult {
    /// The stream was one of the internally handled types (control, QPACK encoder/decoder)
    /// and has been run to completion.
    Handled,
    /// A WebTransport unidirectional stream: the decoded session ID plus any bytes that
    /// were read past it.
    WebTransport { session_id: u64, buffer: Buffer },
    /// A stream type this crate does not handle (unrecognized, or `Push`); carries the raw
    /// stream-type value.
    Unknown { stream_type: u64 },
}
58
/// The result of processing an HTTP/3 unidirectional stream.
///
/// Returned by [`H3Connection::process_inbound_uni_with_close`] (and its deprecated
/// `process_inbound_uni` wrapper). Variants other than `Handled` hand the receive stream
/// back to the caller by value.
#[derive(Debug)]
pub enum UniStreamResult<T> {
    /// The stream was a known internal type (control, QPACK encoder/decoder) and was handled
    /// automatically. Also returned when processing was interrupted by connection shutdown.
    Handled,

    /// A WebTransport unidirectional data stream. The `session_id` identifies the associated
    /// WebTransport session.
    WebTransport {
        /// The WebTransport session ID.
        session_id: u64,
        /// The receive stream, ready for application data.
        stream: T,
        /// Any bytes buffered after the session ID during stream negotiation.
        buffer: Buffer,
    },

    /// A stream whose type is recognized but unsupported (e.g. `Push`) or not recognized
    /// at all by this crate.
    ///
    /// The caller is responsible for disposing of the stream — the in-tree consumers
    /// (`trillium-server-common` for servers, `trillium-client` for clients) RST it with
    /// `H3_STREAM_CREATION_ERROR`. `process_inbound_uni` deliberately does *not* close
    /// the stream itself: handing it back gives a downstream extension the option to
    /// implement a stream type trillium-http doesn't yet know about (a future RFC, an
    /// experiment, etc.) without forking the codec.
    Unknown {
        /// The raw stream type value.
        stream_type: u64,
        /// The stream.
        stream: T,
    },
}
93
/// Shared state for a single HTTP/3 QUIC connection.
///
/// Call the appropriate methods on this type for each stream accepted from the QUIC connection.
///
/// # Driver shape (vs h2)
///
/// h2 multiplexes everything onto a single TCP byte stream, so a single
/// [`H2Driver`][crate::h2::H2Driver] task suffices. h3 instead has the QUIC layer hand us multiple
/// independent streams: an inbound and outbound control stream, an inbound and outbound QPACK
/// encoder stream, an inbound and outbound QPACK decoder stream, and one bidi stream per
/// request. There is no single "h3 driver" — each stream is driven by its own future returned from
/// `H3Connection`'s `run_*` / `process_*` methods, and the caller decides how those futures are
/// scheduled.
///
/// The trillium-http boundary is **runtime-free by design**: this crate hands out anonymous futures
/// and lets the caller pick the executor. The in-tree consumers (`trillium-server-common`,
/// `trillium-client`) follow a task-per-stream pattern — spawn each long-lived control / encoder /
/// decoder future on its own task at connection setup, then spawn one task per accepted request
/// stream. Nothing in this crate requires that pattern; a caller could in principle race all the
/// futures on one task instead, with different perf characteristics.
#[derive(Debug)]
pub struct H3Connection {
    /// Shared configuration for the entire server, including tcp-based listeners
    context: Arc<HttpContext>,

    /// Connection-scoped shutdown signal, created as a child of the context's `Swansong`.
    /// Shut down when we receive GOAWAY from the peer or when the server-level Swansong
    /// shuts down. Request stream tasks use this to interrupt in-progress work.
    swansong: Swansong,

    /// The peer's H3 settings, received on their control stream. Request streams may need to
    /// consult these (e.g. max field section size). Set at most once, by
    /// `run_inbound_control`.
    pub(super) peer_settings: OnceLock<H3Settings>,

    /// Multi-listener wake source for [`PeerSettings`]. Notified by `run_inbound_control` after
    /// applying peer SETTINGS, and again on connection close, so any number of concurrently-
    /// parked `PeerSettings` futures all unblock together.
    pub(super) peer_settings_event: Event,

    /// The highest bidirectional stream ID we have accepted. Used to compute the GOAWAY value
    /// (this + 4) to tell the peer which requests we saw. Starts at 0 and is only meaningful
    /// once `has_accepted_stream` is true. Updated through
    /// [`Self::record_accepted_stream`], which `process_inbound_bidi_with_reset` calls for
    /// every stream it is handed.
    max_accepted_stream_id: AtomicU64,

    /// Whether we have accepted any streams yet. Distinguishes "no streams" from "stream 0
    /// was accepted", since `max_accepted_stream_id` is 0 in both cases.
    has_accepted_stream: AtomicBool,

    /// The decoder-side QPACK dynamic table for this connection.
    decoder_dynamic_table: DecoderDynamicTable,

    /// The encoder-side QPACK dynamic table for this connection.
    encoder_dynamic_table: EncoderDynamicTable,
}
147
148impl H3Connection {
149 /// Construct a new `H3Connection` to manage HTTP/3 for a given peer.
150 pub fn new(context: Arc<HttpContext>) -> Arc<Self> {
151 let swansong = context.swansong.child();
152 let max_table_capacity = context.config.dynamic_table_capacity;
153 let blocked_streams = context.config.h3_blocked_streams;
154 let encoder_dynamic_table = EncoderDynamicTable::new(&context);
155 Arc::new(Self {
156 context,
157 swansong,
158 peer_settings: OnceLock::new(),
159 peer_settings_event: Event::new(),
160 max_accepted_stream_id: AtomicU64::new(0),
161 has_accepted_stream: AtomicBool::new(false),
162 decoder_dynamic_table: DecoderDynamicTable::new(max_table_capacity, blocked_streams),
163 encoder_dynamic_table,
164 })
165 }
166
    /// Retrieve the [`Swansong`] shutdown handle for this HTTP/3 connection. See also
    /// [`H3Connection::shut_down`]
    ///
    /// This is the connection-scoped handle created in [`H3Connection::new`] as a child of
    /// the context's swansong, not the server-wide one.
    pub fn swansong(&self) -> &Swansong {
        &self.swansong
    }
172
    /// Attempt graceful shutdown of this HTTP/3 connection (all streams).
    ///
    /// Marks both QPACK dynamic tables as failed with `H3ErrorCode::NoError`, wakes any
    /// futures waiting on peer settings, and then shuts down the connection-scoped
    /// [`Swansong`].
    ///
    /// The returned [`ShutdownCompletion`] type can
    /// either be awaited in an async context or blocked on with [`ShutdownCompletion::block`] in a
    /// blocking context
    ///
    /// Note that this will NOT shut down the server. To shut down the whole server, use
    /// [`HttpContext::shut_down`]
    pub fn shut_down(&self) -> ShutdownCompletion {
        // Wake any in-flight `decode_field_section` calls parked on the decoder
        // table's `ThresholdWait` (a non-I/O future awaiting dynamic-table inserts
        // from the peer). The encoder table's writer loop is already swansong-
        // aware, but we mark it failed too for symmetry: any future state
        // mutations after shutdown are no longer wire-relevant.
        self.decoder_dynamic_table.fail(H3ErrorCode::NoError);
        self.encoder_dynamic_table.fail(H3ErrorCode::NoError);
        // Unblock waiters before the swansong itself is shut down.
        self.wake_peer_settings_waiters();
        self.swansong.shut_down()
    }
192
193 /// Retrieve the [`HttpContext`] for this server.
194 pub fn context(&self) -> Arc<HttpContext> {
195 self.context.clone()
196 }
197
    /// Returns the peer's HTTP/3 settings, available once the peer's control stream has been
    /// processed.
    ///
    /// Returns `None` until `run_inbound_control` has stored the peer's SETTINGS frame.
    /// This is a non-blocking snapshot; it never waits for the settings to arrive.
    pub fn peer_settings(&self) -> Option<&H3Settings> {
        self.peer_settings.get()
    }
203
    /// Record that we accepted a bidirectional stream with this ID.
    ///
    /// Raises `max_accepted_stream_id` to `stream_id` if it is larger than the current
    /// value, then sets `has_accepted_stream`. Both are read back by `goaway_id`.
    fn record_accepted_stream(&self, stream_id: u64) {
        // `fetch_max` keeps the high-water mark monotonic even when streams are recorded
        // out of order or from several tasks at once.
        self.max_accepted_stream_id
            .fetch_max(stream_id, Ordering::Relaxed);
        // Set the flag only after the ID is recorded (program order on this thread).
        self.has_accepted_stream.store(true, Ordering::Relaxed);
    }
210
211 /// The stream ID to send in a GOAWAY frame: one past the highest stream we accepted, or 0 if we
212 /// haven't accepted any.
213 fn goaway_id(&self) -> u64 {
214 if self.has_accepted_stream.load(Ordering::Relaxed) {
215 self.max_accepted_stream_id.load(Ordering::Relaxed) + 4
216 } else {
217 0
218 }
219 }
220
221 /// Process a single HTTP/3 request-response cycle on a bidirectional stream.
222 ///
223 /// Call this once per accepted bidirectional stream. Returns
224 /// [`H3StreamResult::WebTransport`] if the stream opens a WebTransport session rather than
225 /// a standard HTTP/3 request.
226 ///
227 /// On a stream-level protocol error (e.g. malformed pseudo-headers,
228 /// `H3_MESSAGE_ERROR`), this method drops the transport without resetting it. To honour
229 /// RFC 9114's stream-error MUSTs, callers should use [`process_inbound_bidi_with_reset`]
230 /// instead and pass a closure that issues a stream RST with the protocol error code.
231 ///
232 /// [`process_inbound_bidi_with_reset`]: Self::process_inbound_bidi_with_reset
233 ///
234 /// # Errors
235 ///
236 /// Returns an `H3Error` in case of io error or http/3 semantic error.
237 #[deprecated(
238 since = "1.2.0",
239 note = "use `process_inbound_bidi_with_reset` so stream-level protocol errors RST the \
240 stream as required by RFC 9114 §4.1.2"
241 )]
242 pub async fn process_inbound_bidi<Transport, Handler, Fut>(
243 self: Arc<Self>,
244 transport: Transport,
245 handler: Handler,
246 stream_id: u64,
247 ) -> Result<H3StreamResult<Transport>, H3Error>
248 where
249 Transport: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static,
250 Handler: FnOnce(Conn<Transport>) -> Fut,
251 Fut: Future<Output = Conn<Transport>>,
252 {
253 self.process_inbound_bidi_with_reset(transport, handler, stream_id, |_, _| {})
254 .await
255 }
256
257 /// Process a single HTTP/3 request-response cycle on a bidirectional stream, calling
258 /// `reset` to issue a stream RST when a stream-level protocol error occurs.
259 ///
260 /// Identical to [`process_inbound_bidi`][Self::process_inbound_bidi] except that on any
261 /// `H3Error::Protocol(code)` produced by first-frame processing (HEADERS decode,
262 /// pseudo-header validation, etc.), `reset` is invoked with the still-owned transport and
263 /// the error code before the error is returned. This lets callers RST both the recv and
264 /// send halves of the bidi stream — required by RFC 9114 §4.1.2 for stream errors like
265 /// `H3_MESSAGE_ERROR`. I/O errors and successful runs do not invoke `reset`.
266 ///
267 /// `reset` is a `FnOnce` taking `(&mut Transport, H3ErrorCode)`. trillium-http does not
268 /// itself depend on any reset capability of the transport; callers wire up the actual
269 /// stream-RST mechanism (e.g. quinn's `RecvStream::stop` + `SendStream::reset`) inside
270 /// the closure.
271 ///
272 /// # Errors
273 ///
274 /// Returns an `H3Error` in case of io error or http/3 semantic error.
275 pub async fn process_inbound_bidi_with_reset<Transport, Handler, Fut, Reset>(
276 self: Arc<Self>,
277 mut transport: Transport,
278 handler: Handler,
279 stream_id: u64,
280 reset: Reset,
281 ) -> Result<H3StreamResult<Transport>, H3Error>
282 where
283 Transport: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static,
284 Handler: FnOnce(Conn<Transport>) -> Fut,
285 Fut: Future<Output = Conn<Transport>>,
286 Reset: FnOnce(&mut Transport, H3ErrorCode),
287 {
288 self.record_accepted_stream(stream_id);
289 let _guard = self.swansong.guard();
290 let mut buffer: Buffer =
291 Vec::with_capacity(self.context.config.request_buffer_initial_len).into();
292
293 let outcome =
294 Conn::process_first_frame_h3(&self, &mut transport, &mut buffer, stream_id).await;
295
296 match outcome {
297 Ok(H3FirstFrame::Request {
298 validated,
299 start_time,
300 }) => {
301 let conn =
302 Conn::build_h3(self, transport, buffer, validated, start_time, stream_id);
303 Ok(H3StreamResult::Request(
304 handler(conn).await.send_h3().await?,
305 ))
306 }
307 Ok(H3FirstFrame::WebTransport { session_id }) => Ok(H3StreamResult::WebTransport {
308 session_id,
309 transport,
310 buffer,
311 }),
312 Err(error) => {
313 if let H3Error::Protocol(code) = &error {
314 reset(&mut transport, *code);
315 }
316 Err(error)
317 }
318 }
319 }
320
    /// Decode a QPACK-encoded field section, consulting the dynamic table as needed.
    ///
    /// Delegates to this connection's decoder-side dynamic table, passing `stream_id`
    /// through to it.
    ///
    /// If the field section's Required Insert Count is greater than zero, waits until the
    /// dynamic table has received enough entries. Returns an error on protocol violations or
    /// if the encoder stream fails while waiting.
    ///
    /// Duplicate pseudo-headers are silently ignored (first value wins).
    /// Unknown pseudo-headers are rejected per RFC 9114 §4.1.1.
    ///
    /// # Errors
    ///
    /// Returns an error if the encoded bytes cannot be parsed as a valid field section.
    #[cfg(feature = "unstable")]
    pub async fn decode_field_section(
        &self,
        encoded: &[u8],
        stream_id: u64,
    ) -> Result<FieldSection<'static>, H3Error> {
        self.decoder_dynamic_table.decode(encoded, stream_id).await
    }

    /// Crate-private twin of the `unstable`-gated `decode_field_section`: same signature
    /// and body, compiled when the `unstable` feature is off so the method is not part of
    /// the public API.
    #[cfg(not(feature = "unstable"))]
    pub(crate) async fn decode_field_section(
        &self,
        encoded: &[u8],
        stream_id: u64,
    ) -> Result<FieldSection<'static>, H3Error> {
        self.decoder_dynamic_table.decode(encoded, stream_id).await
    }
350
    /// Encode a QPACK field section (pseudo-headers and headers) into `buf`.
    ///
    /// Delegates to this connection's encoder-side dynamic table, passing `stream_id`
    /// through to it.
    ///
    /// NOTE(review): this doc previously stated that encoding "uses only the static table
    /// (no dynamic table)"; the call below goes through `encoder_dynamic_table`, so confirm
    /// which is current before relying on either statement.
    ///
    /// # Errors
    ///
    /// Currently always returns `Ok(())`. The `Result` return type is kept so that HTTP/3
    /// semantic errors can be reported in the future without an API break.
    #[cfg(feature = "unstable")]
    #[allow(clippy::unnecessary_wraps, reason = "future-proofing api")]
    pub fn encode_field_section(
        &self,
        field_section: &FieldSection<'_>,
        buf: &mut Vec<u8>,
        stream_id: u64,
    ) -> Result<(), H3Error> {
        self.encoder_dynamic_table
            .encode(field_section, buf, stream_id);
        Ok(())
    }

    /// Crate-private twin of the `unstable`-gated `encode_field_section`: same signature
    /// and body, compiled when the `unstable` feature is off so the method is not part of
    /// the public API.
    #[cfg(not(feature = "unstable"))]
    #[allow(clippy::unnecessary_wraps, reason = "future-proofing api")]
    pub(crate) fn encode_field_section(
        &self,
        field_section: &FieldSection<'_>,
        buf: &mut Vec<u8>,
        stream_id: u64,
    ) -> Result<(), H3Error> {
        self.encoder_dynamic_table
            .encode(field_section, buf, stream_id);
        Ok(())
    }
384
385 /// Run this server's HTTP/3 outbound control stream.
386 ///
387 /// Sends the initial SETTINGS frame, then sends GOAWAY when the connection shuts down.
388 /// Returns after GOAWAY is sent; keep the stream open until the QUIC connection closes
389 /// (closing a control stream is a connection error per RFC 9114 §6.2.1).
390 ///
391 /// # Errors
392 ///
393 /// Returns an `H3Error` in case of io error or http/3 semantic error.
394 pub async fn run_outbound_control<T>(&self, mut stream: T) -> Result<(), H3Error>
395 where
396 T: AsyncWrite + Unpin + Send,
397 {
398 let mut buf = vec![0; 128];
399
400 // Stream type + SETTINGS frame
401 let settings = Frame::Settings(H3Settings::from(&self.context.config));
402 log::trace!(
403 "H3 outbound control: sending SETTINGS: {:?}",
404 H3Settings::from(&self.context.config)
405 );
406
407 write(&mut buf, &mut stream, |buf| {
408 let mut written = quic_varint::encode(UniStreamType::Control, buf)?;
409 written += settings.encode(&mut buf[written..])?;
410 Some(written)
411 })
412 .await?;
413 log::trace!("H3 outbound control: SETTINGS sent");
414
415 // Wait for shutdown
416 self.swansong.clone().await;
417
418 // Send GOAWAY
419 write(&mut buf, &mut stream, |buf| {
420 Frame::Goaway(self.goaway_id()).encode(buf)
421 })
422 .await?;
423
424 Ok(())
425 }
426
427 /// Run the outbound QPACK encoder stream for the duration of the connection.
428 ///
429 /// Writes the stream type byte, then drains encoder-stream instructions from the encoder
430 /// dynamic table as they are enqueued. Returns when the connection shuts down or the table is
431 /// marked failed.
432 ///
433 /// # Errors
434 ///
435 /// Returns an `H3Error` in case of io error.
436 pub async fn run_encoder<T>(&self, mut stream: T) -> Result<(), H3Error>
437 where
438 T: AsyncWrite + Unpin + Send,
439 {
440 self.encoder_dynamic_table
441 .run_writer(&mut stream, self.swansong.clone())
442 .await
443 }
444
445 /// Run the outbound QPACK decoder stream for the duration of the connection.
446 ///
447 /// Writes the stream type byte, then loops sending Section Acknowledgement and Insert
448 /// Count Increment instructions as they become needed. Returns when the connection
449 /// shuts down.
450 ///
451 /// # Errors
452 ///
453 /// Returns an `H3Error` in case of io error or http/3 semantic error.
454 pub async fn run_decoder<T>(&self, mut stream: T) -> Result<(), H3Error>
455 where
456 T: AsyncWrite + Unpin + Send,
457 {
458 self.decoder_dynamic_table
459 .run_writer(&mut stream, self.swansong.clone())
460 .await
461 }
462
463 /// Handle an inbound unidirectional HTTP/3 stream from the peer.
464 ///
465 /// Internal stream types (control, QPACK encoder/decoder) are handled automatically;
466 /// application streams are returned via [`UniStreamResult`] for the caller to process.
467 ///
468 /// On a connection-level protocol error, this method drops the recv stream before
469 /// the caller can react. Quinn's `RecvStream::drop` then sends `STOP_SENDING`, which
470 /// races against the caller's `connection.close` — if the peer responds with a
471 /// malformed `RESET_STREAM` (notably `final_offset = 0`) before our app close is
472 /// applied, the transport-level error overrides our app error code on the wire.
473 /// Use [`process_inbound_uni_with_close`] to thread the close call through the
474 /// function so it fires before the stream drops.
475 ///
476 /// [`process_inbound_uni_with_close`]: Self::process_inbound_uni_with_close
477 ///
478 /// # Errors
479 ///
480 /// Returns a `H3Error` in case of io error or http/3 semantic error.
481 #[deprecated(
482 since = "1.2.0",
483 note = "use `process_inbound_uni_with_close` so connection-level protocol errors close \
484 the QUIC connection before the recv stream drops, avoiding a `FINAL_SIZE_ERROR` \
485 race with the peer's response to STOP_SENDING"
486 )]
487 pub async fn process_inbound_uni<T>(&self, stream: T) -> Result<UniStreamResult<T>, H3Error>
488 where
489 T: AsyncRead + Unpin + Send,
490 {
491 self.process_inbound_uni_with_close(stream, |_| {}).await
492 }
493
494 /// Handle an inbound unidirectional HTTP/3 stream from the peer, calling `on_close` to
495 /// close the QUIC connection if a connection-level protocol error is detected.
496 ///
497 /// Identical to [`process_inbound_uni`][Self::process_inbound_uni] except that on
498 /// any `H3Error::Protocol(code)` whose code is a connection-level error
499 /// (RFC 9114 §8.1, RFC 9204 §6), `on_close` is invoked with that code while the
500 /// recv stream is still alive. This lets callers send a `CONNECTION_CLOSE` before
501 /// the stream drops — if the close call sets quinn's `conn.error`, quinn's
502 /// `RecvStream::drop` skips `STOP_SENDING`, eliminating a peer race that otherwise
503 /// causes `FINAL_SIZE_ERROR` to override the app error code.
504 ///
505 /// `on_close` is a `FnOnce` taking `H3ErrorCode`. trillium-http does not itself
506 /// hold the QUIC connection; callers wire up the actual `connection.close()` call
507 /// inside the closure (e.g. quinn's `Connection::close`).
508 ///
509 /// # Errors
510 ///
511 /// Returns a `H3Error` in case of io error or http/3 semantic error.
512 pub async fn process_inbound_uni_with_close<T, OnClose>(
513 &self,
514 mut stream: T,
515 on_close: OnClose,
516 ) -> Result<UniStreamResult<T>, H3Error>
517 where
518 T: AsyncRead + Unpin + Send,
519 OnClose: FnOnce(H3ErrorCode),
520 {
521 let inner = self
522 .swansong
523 .interrupt(self.process_inbound_uni_inner(&mut stream))
524 .await
525 .unwrap_or(Ok(UniInnerResult::Handled)); // interrupted
526
527 match inner {
528 Ok(UniInnerResult::Handled) => Ok(UniStreamResult::Handled),
529 Ok(UniInnerResult::WebTransport { session_id, buffer }) => {
530 Ok(UniStreamResult::WebTransport {
531 session_id,
532 stream,
533 buffer,
534 })
535 }
536 Ok(UniInnerResult::Unknown { stream_type }) => Ok(UniStreamResult::Unknown {
537 stream_type,
538 stream,
539 }),
540 Err(error) => {
541 // Fire `on_close` BEFORE returning so the caller's connection.close
542 // call sets quinn's `conn.error` while `stream` is still alive. When
543 // `stream` then drops at function return, quinn's `RecvStream::drop`
544 // skips STOP_SENDING — preventing the peer-RESET_STREAM race that
545 // otherwise replaces our app close code with FINAL_SIZE_ERROR.
546 if let H3Error::Protocol(code) = &error
547 && code.is_connection_error()
548 {
549 on_close(*code);
550 }
551 Err(error)
552 }
553 }
554 }
555
556 /// Inner-loop body of [`process_inbound_uni_with_close`][Self::process_inbound_uni_with_close].
557 /// Borrows `stream` so the outer function can keep ownership of it across the await,
558 /// which lets the caller's close callback fire before the recv stream drops.
559 async fn process_inbound_uni_inner<T>(&self, stream: &mut T) -> Result<UniInnerResult, H3Error>
560 where
561 T: AsyncRead + Unpin + Send,
562 {
563 let mut buf = vec![0; 128];
564 let mut filled = 0;
565
566 // Read stream type varint (decode as raw u64 to handle unknown types)
567 let stream_type = read(&mut buf, &mut filled, stream, |data| {
568 match quic_varint::decode(data) {
569 Ok(ok) => Ok(Some(ok)),
570 Err(QuicVarIntError::UnexpectedEnd) => Ok(None),
571 // this branch is unreachable because u64 is always From<u64>
572 Err(QuicVarIntError::UnknownValue { bytes, value }) => Ok(Some((value, bytes))),
573 }
574 })
575 .await?;
576
577 match UniStreamType::try_from(stream_type) {
578 Ok(UniStreamType::Control) => {
579 log::trace!("H3 inbound uni: control stream");
580 self.run_inbound_control(&mut buf, &mut filled, stream)
581 .await?;
582 Ok(UniInnerResult::Handled)
583 }
584
585 Ok(UniStreamType::QpackEncoder) => {
586 log::trace!("H3 inbound uni: QPACK encoder stream ({filled} bytes pre-read)");
587 let mut reader = Prepended {
588 head: &buf[..filled],
589 tail: stream,
590 };
591
592 log::trace!("QPACK encoder stream: started");
593 self.decoder_dynamic_table.run_reader(&mut reader).await?;
594
595 Ok(UniInnerResult::Handled)
596 }
597
598 Ok(UniStreamType::QpackDecoder) => {
599 log::trace!("H3 inbound uni: QPACK decoder stream ({filled} bytes pre-read)");
600 let mut reader = Prepended {
601 head: &buf[..filled],
602 tail: stream,
603 };
604 self.encoder_dynamic_table.run_reader(&mut reader).await?;
605 Ok(UniInnerResult::Handled)
606 }
607
608 Ok(UniStreamType::WebTransport) => {
609 log::trace!("H3 inbound uni: WebTransport stream");
610 let session_id =
611 read(
612 &mut buf,
613 &mut filled,
614 stream,
615 |data| match quic_varint::decode(data) {
616 Ok(ok) => Ok(Some(ok)),
617 Err(QuicVarIntError::UnexpectedEnd) => Ok(None),
618 Err(QuicVarIntError::UnknownValue { bytes, value }) => {
619 Ok(Some((value, bytes)))
620 }
621 },
622 )
623 .await?;
624
625 buf.truncate(filled);
626
627 Ok(UniInnerResult::WebTransport {
628 session_id,
629 buffer: buf.into(),
630 })
631 }
632
633 Ok(UniStreamType::Push) => {
634 // Push streams are server→client per RFC 9114 §4.6. Trillium does
635 // not support HTTP/3 push as initiator or recipient, so we hand
636 // these back as `Unknown` for the caller to dispose of identically
637 // to truly-unknown stream types — the explicit arm exists so trace
638 // output names "push stream" rather than a bare type id.
639 log::trace!("H3 inbound uni: push stream (push not supported)");
640 Ok(UniInnerResult::Unknown { stream_type })
641 }
642
643 Err(_) => {
644 log::trace!("H3 inbound uni: unknown stream type {stream_type:#x}");
645 Ok(UniInnerResult::Unknown { stream_type })
646 }
647 }
648 }
649
    /// Handle the http/3 peer's inbound control stream.
    ///
    /// The first frame must be SETTINGS; it is stored in `peer_settings`, waiters are
    /// woken, and the encoder dynamic table is initialized from it. After that the stream
    /// is read frame by frame until GOAWAY arrives (which shuts down this connection's
    /// swansong), the swansong interrupts the read, or an error occurs.
    ///
    /// `buf` / `filled` are the caller's read buffer and its fill level; any bytes already
    /// read past the stream-type varint are consumed from there first.
    ///
    /// # Errors
    ///
    /// Returns a `H3Error` in case of io error or HTTP/3 semantic error. A stream that ends
    /// (EOF) is reported as `H3ErrorCode::ClosedCriticalStream`.
    // The first frame must be SETTINGS. After that, watches for
    // GOAWAY to initiate connection shutdown.
    async fn run_inbound_control<T>(
        &self,
        buf: &mut Vec<u8>,
        filled: &mut usize,
        stream: &mut T,
    ) -> Result<(), H3Error>
    where
        T: AsyncRead + Unpin + Send,
    {
        // First frame must be SETTINGS (§6.2.1). A non-SETTINGS first frame OR a malformed
        // first frame whose payload doesn't decode are both H3_MISSING_SETTINGS. *But* if
        // the first frame is a SETTINGS frame whose payload is itself invalid (e.g.
        // forbidden HTTP/2 setting IDs per §7.2.4.1), that's H3_SETTINGS_ERROR — preserve.
        let settings = read(buf, filled, stream, |data| match Frame::decode(data) {
            Ok((Frame::Settings(s), consumed)) => Ok(Some((s, consumed))),
            Err(FrameDecodeError::Incomplete) => Ok(None),
            Err(FrameDecodeError::Error(H3ErrorCode::SettingsError)) => {
                Err(H3ErrorCode::SettingsError)
            }
            Ok(_) | Err(FrameDecodeError::Error(_)) => Err(H3ErrorCode::MissingSettings),
        })
        .await
        .map_err(map_critical_stream_eof)?;

        log::trace!("H3 peer settings: {settings:?}");

        // `set` fails only if settings were already stored for this connection.
        // NOTE(review): within this file that can only happen when a second control
        // stream delivers SETTINGS; RFC 9114 §6.2.1 names H3_STREAM_CREATION_ERROR for a
        // second control stream — confirm H3_FRAME_UNEXPECTED is the intended code here.
        self.peer_settings
            .set(settings)
            .map_err(|_| H3ErrorCode::FrameUnexpected)?;
        self.wake_peer_settings_waiters();

        self.encoder_dynamic_table
            .initialize_from_peer_settings(settings);

        // Read subsequent frames, watching for GOAWAY
        loop {
            // `interrupt` yields `None` when the swansong shuts down mid-read; `transpose`
            // turns the `Option<Result<_>>` into a `Result<Option<_>>` so `?` can surface
            // read errors while keeping the "interrupted" case as `None`.
            let frame = self
                .swansong
                .interrupt(read(buf, filled, stream, |data| {
                    match Frame::decode(data) {
                        Ok((frame, consumed)) => Ok(Some((frame, consumed))),
                        Err(FrameDecodeError::Incomplete) => Ok(None),
                        Err(FrameDecodeError::Error(code)) => Err(code),
                    }
                }))
                .await
                .transpose()
                .map_err(map_critical_stream_eof)?;

            match frame {
                None => {
                    log::trace!("H3 control stream: interrupted by shutdown");
                    return Ok(());
                }

                Some(Frame::Goaway(id)) => {
                    log::trace!("H3 control stream: peer sent GOAWAY(stream_id={id})");
                    self.swansong.shut_down();
                    return Ok(());
                }

                Some(Frame::Unknown(n)) => {
                    // RFC 9114 §7.2.8: unknown frame types MUST be ignored.
                    // We must also consume the payload bytes so the stream stays synchronized.
                    log::trace!("H3 control stream: skipping unknown frame (payload {n} bytes)");
                    // A length that does not fit in `usize` is clamped to `usize::MAX`.
                    let n = usize::try_from(n).unwrap_or(usize::MAX);
                    // First discard the part of the payload that is already buffered,
                    // shifting any bytes after it to the front of `buf`.
                    let in_buf = n.min(*filled);
                    buf.copy_within(in_buf..*filled, 0);
                    *filled -= in_buf;
                    // Then drain the rest straight from the stream through a small scratch
                    // array so a large payload is never held in memory.
                    let mut todo = n - in_buf;
                    let mut scratch = [0u8; 256];
                    while todo > 0 {
                        let to_read = todo.min(scratch.len());
                        let n = stream
                            .read(&mut scratch[..to_read])
                            .await
                            .map_err(H3Error::Io)?;
                        // A zero-length read means the control stream ended mid-frame.
                        if n == 0 {
                            return Err(H3ErrorCode::ClosedCriticalStream.into());
                        }
                        todo -= n;
                    }
                }

                // RFC 9114 §7.2.4: a second SETTINGS frame is H3_FRAME_UNEXPECTED.
                // RFC 9114 §7.2.1 / §7.2.2 / §7.2.5: DATA, HEADERS, and PUSH_PROMISE are
                // not permitted on the control stream; same H3_FRAME_UNEXPECTED. The
                // WebTransport bidi-signal (0x41) similarly has no business here.
                Some(
                    Frame::Settings(_)
                    | Frame::Data(_)
                    | Frame::Headers(_)
                    | Frame::PushPromise { .. }
                    | Frame::WebTransport(_),
                ) => {
                    return Err(H3ErrorCode::FrameUnexpected.into());
                }

                // CANCEL_PUSH and MAX_PUSH_ID are valid control-stream frames; we don't push,
                // so we ignore them.
                Some(Frame::CancelPush(_) | Frame::MaxPushId(_)) => {
                    log::trace!("H3 control stream: ignoring {frame:?}");
                }
            }
        }
    }
763}
764
765/// Map an `UnexpectedEof` I/O error (the `read` helper's "stream FIN'd" signal) to
766/// `H3_CLOSED_CRITICAL_STREAM`. RFC 9114 §6.2.1 forbids closure of the control stream;
767/// closure of either QPACK side-channel is the same connection error per RFC 9204 §4.2.
768/// Other I/O errors and any protocol error are passed through unchanged.
769fn map_critical_stream_eof(error: H3Error) -> H3Error {
770 match error {
771 H3Error::Io(e) if e.kind() == ErrorKind::UnexpectedEof => {
772 H3ErrorCode::ClosedCriticalStream.into()
773 }
774 other => other,
775 }
776}
777
778async fn write(
779 buf: &mut Vec<u8>,
780 mut stream: impl AsyncWrite + Unpin + Send,
781 mut f: impl FnMut(&mut [u8]) -> Option<usize>,
782) -> io::Result<usize> {
783 let written = loop {
784 if let Some(w) = f(buf) {
785 break w;
786 }
787 if buf.len() >= MAX_BUFFER_SIZE {
788 return Err(io::Error::new(ErrorKind::OutOfMemory, "runaway allocation"));
789 }
790 buf.resize(buf.len() * 2, 0);
791 };
792
793 stream.write_all(&buf[..written]).await?;
794 stream.flush().await?;
795 Ok(written)
796}
797
/// An `AsyncRead` adapter that drains a byte slice before reading from an inner stream.
///
/// Used in `process_inbound_uni_inner` to replay bytes that were read ahead while
/// parsing the stream-type varint before handing the stream to a QPACK dynamic table's
/// `run_reader`.
struct Prepended<'a, T> {
    /// Bytes that were already read from the stream and must be yielded first. The slice
    /// is advanced as it is consumed.
    head: &'a [u8],
    /// The underlying stream, read only once `head` is empty.
    tail: T,
}
806
807impl<T: AsyncRead + Unpin> AsyncRead for Prepended<'_, T> {
808 fn poll_read(
809 self: Pin<&mut Self>,
810 cx: &mut Context<'_>,
811 out: &mut [u8],
812 ) -> Poll<io::Result<usize>> {
813 let this = self.get_mut();
814 if !this.head.is_empty() {
815 let n = this.head.len().min(out.len());
816 out[..n].copy_from_slice(&this.head[..n]);
817 this.head = &this.head[n..];
818 return Poll::Ready(Ok(n));
819 }
820 Pin::new(&mut this.tail).poll_read(cx, out)
821 }
822}
823
824/// Read from `stream` into `buf` until `f` can decode a value.
825///
826/// `f` receives the filled portion of the buffer and returns:
827/// - `Ok(Some((value, consumed)))` — success; consumed bytes are removed from the front
828/// - `Ok(None)` — need more data; reads more bytes and retries
829/// - `Err(e)` — unrecoverable error; propagated to caller
830async fn read<R>(
831 buf: &mut Vec<u8>,
832 filled: &mut usize,
833 stream: &mut (impl AsyncRead + Unpin + Send),
834 f: impl Fn(&[u8]) -> Result<Option<(R, usize)>, H3ErrorCode>,
835) -> Result<R, H3Error> {
836 loop {
837 if let Some((result, consumed)) = f(&buf[..*filled])? {
838 buf.copy_within(consumed..*filled, 0);
839 *filled -= consumed;
840 return Ok(result);
841 }
842
843 if *filled >= buf.len() {
844 if buf.len() >= MAX_BUFFER_SIZE {
845 return Err(io::Error::new(ErrorKind::OutOfMemory, "runaway allocation").into());
846 }
847 buf.resize(buf.len() * 2, 0);
848 }
849
850 let n = stream.read(&mut buf[*filled..]).await?;
851 if n == 0 {
852 return Err(io::Error::new(ErrorKind::UnexpectedEof, "stream closed").into());
853 }
854 *filled += n;
855 }
856}