Skip to main content

trillium_http/h3/
connection.rs

1mod peer_settings_wait;
2
3use super::{
4    H3Error,
5    frame::{Frame, FrameDecodeError, UniStreamType},
6    quic_varint::{self, QuicVarIntError},
7    settings::H3Settings,
8};
9use crate::{
10    Buffer, Conn, HttpContext,
11    conn::H3FirstFrame,
12    h3::{H3ErrorCode, MAX_BUFFER_SIZE},
13    headers::qpack::{DecoderDynamicTable, EncoderDynamicTable, FieldSection},
14};
15use event_listener::Event;
16use futures_lite::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
17use std::{
18    future::Future,
19    io::{self, ErrorKind},
20    pin::Pin,
21    sync::{
22        Arc, OnceLock,
23        atomic::{AtomicBool, AtomicU64, Ordering},
24    },
25    task::{Context, Poll},
26};
27use swansong::{ShutdownCompletion, Swansong};
28
29/// The result of processing an HTTP/3 bidirectional stream.
30#[derive(Debug)]
31#[allow(clippy::large_enum_variant)] // Request is the hot path; boxing it would add an allocation per request
32pub enum H3StreamResult<Transport> {
33    /// The stream carried a normal HTTP/3 request.
34    Request(Conn<Transport>),
35
36    /// The stream carries a WebTransport bidirectional data stream. The `session_id` identifies
37    /// the associated WebTransport session.
38    WebTransport {
39        /// The WebTransport session ID (stream ID of the CONNECT request).
40        session_id: u64,
41        /// The underlying transport, ready for application data.
42        transport: Transport,
43        /// Any bytes buffered after the session ID during stream negotiation.
44        buffer: Buffer,
45    },
46}
47
48/// Inner-loop result of [`H3Connection::process_inbound_uni_with_close`] before the recv
49/// stream is reattached. Decouples the inner async block (which only borrows the stream)
50/// from the caller-visible [`UniStreamResult`] (which returns the stream by value on
51/// non-`Handled` variants), so the function can keep ownership of `stream` long enough to
52/// fire its close callback before `stream` drops.
53enum UniInnerResult {
54    Handled,
55    WebTransport { session_id: u64, buffer: Buffer },
56    Unknown { stream_type: u64 },
57}
58
59/// The result of processing an HTTP/3 unidirectional stream.
60#[derive(Debug)]
61pub enum UniStreamResult<T> {
62    /// The stream was a known internal type (control, QPACK encoder/decoder) and was handled
63    /// automatically.
64    Handled,
65
66    /// A WebTransport unidirectional data stream. The `session_id` identifies the associated
67    /// WebTransport session.
68    WebTransport {
69        /// The WebTransport session ID.
70        session_id: u64,
71        /// The receive stream, ready for application data.
72        stream: T,
73        /// Any bytes buffered after the session ID during stream negotiation.
74        buffer: Buffer,
75    },
76
77    /// A stream whose type is recognized but unsupported (e.g. `Push`) or not recognized
78    /// at all by this crate.
79    ///
80    /// The caller is responsible for disposing of the stream — the in-tree consumers
81    /// (`trillium-server-common` for servers, `trillium-client` for clients) RST it with
82    /// `H3_STREAM_CREATION_ERROR`. `process_inbound_uni` deliberately does *not* close
83    /// the stream itself: handing it back gives a downstream extension the option to
84    /// implement a stream type trillium-http doesn't yet know about (a future RFC, an
85    /// experiment, etc.) without forking the codec.
86    Unknown {
87        /// The raw stream type value.
88        stream_type: u64,
89        /// The stream.
90        stream: T,
91    },
92}
93
94/// Shared state for a single HTTP/3 QUIC connection.
95///
96/// Call the appropriate methods on this type for each stream accepted from the QUIC connection.
97///
98/// # Driver shape (vs h2)
99///
100/// h2 multiplexes everything onto a single TCP byte stream, so a single
101/// [`H2Driver`][crate::h2::H2Driver] task suffices. h3 instead has the QUIC layer hand us multiple
102/// independent streams: an inbound and outbound control stream, an inbound and outbound QPACK
103/// encoder stream, an inbound and outbound QPACK decoder stream, and one bidi stream per
104/// request. There is no single "h3 driver" — each stream is driven by its own future returned from
105/// `H3Connection`'s `run_*` / `process_*` methods, and the caller decides how those futures are
106/// scheduled.
107///
108/// The trillium-http boundary is **runtime-free by design**: this crate hands out anonymous futures
109/// and lets the caller pick the executor. The in-tree consumers (`trillium-server-common`,
110/// `trillium-client`) follow a task-per-stream pattern — spawn each long-lived control / encoder /
111/// decoder future on its own task at connection setup, then spawn one task per accepted request
112/// stream. Nothing in this crate requires that pattern; a caller could in principle race all the
113/// futures on one task instead, with different perf characteristics.
114#[derive(Debug)]
115pub struct H3Connection {
116    /// Shared configuration for the entire server, including tcp-based listeners
117    context: Arc<HttpContext>,
118
119    /// Connection-scoped shutdown signal. Shut down when we receive GOAWAY from the peer or when
120    /// the server-level Swansong shuts down.  Request stream tasks use this to interrupt
121    /// in-progress work.
122    swansong: Swansong,
123
124    /// The peer's H3 settings, received on their control stream.  Request streams may need to
125    /// consult these (e.g. max field section size).
126    pub(super) peer_settings: OnceLock<H3Settings>,
127
128    /// Multi-listener wake source for [`PeerSettings`]. Notified by `run_inbound_control` after
129    /// applying peer SETTINGS, and again on connection close, so any number of concurrently-
130    /// parked `PeerSettings` futures all unblock together.
131    pub(super) peer_settings_event: Event,
132
133    /// The highest bidirectional stream ID we have accepted.  Used to compute the GOAWAY value
134    /// (this + 4) to tell the peer which requests we saw. None until the first stream is accepted.
135    /// Updated by the runtime adapter's accept loop via [`record_accepted_stream`].
136    max_accepted_stream_id: AtomicU64,
137
138    /// Whether we have accepted any streams yet.
139    has_accepted_stream: AtomicBool,
140
141    /// The decoder-side QPACK dynamic table for this connection.
142    decoder_dynamic_table: DecoderDynamicTable,
143
144    /// The encoder-side QPACK dynamic table for this connection.
145    encoder_dynamic_table: EncoderDynamicTable,
146}
147
148impl H3Connection {
149    /// Construct a new `H3Connection` to manage HTTP/3 for a given peer.
150    pub fn new(context: Arc<HttpContext>) -> Arc<Self> {
151        let swansong = context.swansong.child();
152        let max_table_capacity = context.config.dynamic_table_capacity;
153        let blocked_streams = context.config.h3_blocked_streams;
154        let encoder_dynamic_table = EncoderDynamicTable::new(&context);
155        Arc::new(Self {
156            context,
157            swansong,
158            peer_settings: OnceLock::new(),
159            peer_settings_event: Event::new(),
160            max_accepted_stream_id: AtomicU64::new(0),
161            has_accepted_stream: AtomicBool::new(false),
162            decoder_dynamic_table: DecoderDynamicTable::new(max_table_capacity, blocked_streams),
163            encoder_dynamic_table,
164        })
165    }
166
167    /// Retrieve the [`Swansong`] shutdown handle for this HTTP/3 connection. See also
168    /// [`H3Connection::shut_down`]
169    pub fn swansong(&self) -> &Swansong {
170        &self.swansong
171    }
172
173    /// Attempt graceful shutdown of this HTTP/3 connection (all streams).
174    ///
175    /// The returned [`ShutdownCompletion`] type can
176    /// either be awaited in an async context or blocked on with [`ShutdownCompletion::block`] in a
177    /// blocking context
178    ///
179    /// Note that this will NOT shut down the server. To shut down the whole server, use
180    /// [`HttpContext::shut_down`]
181    pub fn shut_down(&self) -> ShutdownCompletion {
182        // Wake any in-flight `decode_field_section` calls parked on the decoder
183        // table's `ThresholdWait` (a non-I/O future awaiting dynamic-table inserts
184        // from the peer). The encoder table's writer loop is already swansong-
185        // aware, but we mark it failed too for symmetry: any future state
186        // mutations after shutdown are no longer wire-relevant.
187        self.decoder_dynamic_table.fail(H3ErrorCode::NoError);
188        self.encoder_dynamic_table.fail(H3ErrorCode::NoError);
189        self.wake_peer_settings_waiters();
190        self.swansong.shut_down()
191    }
192
193    /// Retrieve the [`HttpContext`] for this server.
194    pub fn context(&self) -> Arc<HttpContext> {
195        self.context.clone()
196    }
197
198    /// Returns the peer's HTTP/3 settings, available once the peer's control stream has been
199    /// processed.
200    pub fn peer_settings(&self) -> Option<&H3Settings> {
201        self.peer_settings.get()
202    }
203
204    /// Record that we accepted a bidirectional stream with this ID.
205    fn record_accepted_stream(&self, stream_id: u64) {
206        self.max_accepted_stream_id
207            .fetch_max(stream_id, Ordering::Relaxed);
208        self.has_accepted_stream.store(true, Ordering::Relaxed);
209    }
210
211    /// The stream ID to send in a GOAWAY frame: one past the highest stream we accepted, or 0 if we
212    /// haven't accepted any.
213    fn goaway_id(&self) -> u64 {
214        if self.has_accepted_stream.load(Ordering::Relaxed) {
215            self.max_accepted_stream_id.load(Ordering::Relaxed) + 4
216        } else {
217            0
218        }
219    }
220
221    /// Process a single HTTP/3 request-response cycle on a bidirectional stream.
222    ///
223    /// Call this once per accepted bidirectional stream. Returns
224    /// [`H3StreamResult::WebTransport`] if the stream opens a WebTransport session rather than
225    /// a standard HTTP/3 request.
226    ///
227    /// On a stream-level protocol error (e.g. malformed pseudo-headers,
228    /// `H3_MESSAGE_ERROR`), this method drops the transport without resetting it. To honour
229    /// RFC 9114's stream-error MUSTs, callers should use [`process_inbound_bidi_with_reset`]
230    /// instead and pass a closure that issues a stream RST with the protocol error code.
231    ///
232    /// [`process_inbound_bidi_with_reset`]: Self::process_inbound_bidi_with_reset
233    ///
234    /// # Errors
235    ///
236    /// Returns an `H3Error` in case of io error or http/3 semantic error.
237    #[deprecated(
238        since = "1.2.0",
239        note = "use `process_inbound_bidi_with_reset` so stream-level protocol errors RST the \
240                stream as required by RFC 9114 §4.1.2"
241    )]
242    pub async fn process_inbound_bidi<Transport, Handler, Fut>(
243        self: Arc<Self>,
244        transport: Transport,
245        handler: Handler,
246        stream_id: u64,
247    ) -> Result<H3StreamResult<Transport>, H3Error>
248    where
249        Transport: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static,
250        Handler: FnOnce(Conn<Transport>) -> Fut,
251        Fut: Future<Output = Conn<Transport>>,
252    {
253        self.process_inbound_bidi_with_reset(transport, handler, stream_id, |_, _| {})
254            .await
255    }
256
257    /// Process a single HTTP/3 request-response cycle on a bidirectional stream, calling
258    /// `reset` to issue a stream RST when a stream-level protocol error occurs.
259    ///
260    /// Identical to [`process_inbound_bidi`][Self::process_inbound_bidi] except that on any
261    /// `H3Error::Protocol(code)` produced by first-frame processing (HEADERS decode,
262    /// pseudo-header validation, etc.), `reset` is invoked with the still-owned transport and
263    /// the error code before the error is returned. This lets callers RST both the recv and
264    /// send halves of the bidi stream — required by RFC 9114 §4.1.2 for stream errors like
265    /// `H3_MESSAGE_ERROR`. I/O errors and successful runs do not invoke `reset`.
266    ///
267    /// `reset` is a `FnOnce` taking `(&mut Transport, H3ErrorCode)`. trillium-http does not
268    /// itself depend on any reset capability of the transport; callers wire up the actual
269    /// stream-RST mechanism (e.g. quinn's `RecvStream::stop` + `SendStream::reset`) inside
270    /// the closure.
271    ///
272    /// # Errors
273    ///
274    /// Returns an `H3Error` in case of io error or http/3 semantic error.
275    pub async fn process_inbound_bidi_with_reset<Transport, Handler, Fut, Reset>(
276        self: Arc<Self>,
277        mut transport: Transport,
278        handler: Handler,
279        stream_id: u64,
280        reset: Reset,
281    ) -> Result<H3StreamResult<Transport>, H3Error>
282    where
283        Transport: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static,
284        Handler: FnOnce(Conn<Transport>) -> Fut,
285        Fut: Future<Output = Conn<Transport>>,
286        Reset: FnOnce(&mut Transport, H3ErrorCode),
287    {
288        self.record_accepted_stream(stream_id);
289        let _guard = self.swansong.guard();
290        let mut buffer: Buffer =
291            Vec::with_capacity(self.context.config.request_buffer_initial_len).into();
292
293        let outcome =
294            Conn::process_first_frame_h3(&self, &mut transport, &mut buffer, stream_id).await;
295
296        match outcome {
297            Ok(H3FirstFrame::Request {
298                validated,
299                start_time,
300            }) => {
301                let conn =
302                    Conn::build_h3(self, transport, buffer, validated, start_time, stream_id);
303                Ok(H3StreamResult::Request(
304                    handler(conn).await.send_h3().await?,
305                ))
306            }
307            Ok(H3FirstFrame::WebTransport { session_id }) => Ok(H3StreamResult::WebTransport {
308                session_id,
309                transport,
310                buffer,
311            }),
312            Err(error) => {
313                if let H3Error::Protocol(code) = &error {
314                    reset(&mut transport, *code);
315                }
316                Err(error)
317            }
318        }
319    }
320
321    /// Decode a QPACK-encoded field section, consulting the dynamic table as needed.
322    ///
323    /// If the field section's Required Insert Count is greater than zero, waits until the
324    /// dynamic table has received enough entries. Returns an error on protocol violations or
325    /// if the encoder stream fails while waiting.
326    ///
327    /// Duplicate pseudo-headers are silently ignored (first value wins).
328    /// Unknown pseudo-headers are rejected per RFC 9114 §4.1.1.
329    ///
330    /// # Errors
331    ///
332    /// Returns an error if the encoded bytes cannot be parsed as a valid field section.
333    #[cfg(feature = "unstable")]
334    pub async fn decode_field_section(
335        &self,
336        encoded: &[u8],
337        stream_id: u64,
338    ) -> Result<FieldSection<'static>, H3Error> {
339        self.decoder_dynamic_table.decode(encoded, stream_id).await
340    }
341
342    #[cfg(not(feature = "unstable"))]
343    pub(crate) async fn decode_field_section(
344        &self,
345        encoded: &[u8],
346        stream_id: u64,
347    ) -> Result<FieldSection<'static>, H3Error> {
348        self.decoder_dynamic_table.decode(encoded, stream_id).await
349    }
350
351    /// Encode a QPACK field section from pseudo-headers and headers.
352    ///
353    /// This currently uses only the static table (no dynamic table).
354    /// Decode a QPACK-encoded field section, consulting the dynamic table as needed.
355    ///
356    /// # Errors
357    ///
358    /// Returns an `H3Error` in case of http/3 semantic error.
359    #[cfg(feature = "unstable")]
360    #[allow(clippy::unnecessary_wraps, reason = "future-proofing api")]
361    pub fn encode_field_section(
362        &self,
363        field_section: &FieldSection<'_>,
364        buf: &mut Vec<u8>,
365        stream_id: u64,
366    ) -> Result<(), H3Error> {
367        self.encoder_dynamic_table
368            .encode(field_section, buf, stream_id);
369        Ok(())
370    }
371
372    #[cfg(not(feature = "unstable"))]
373    #[allow(clippy::unnecessary_wraps, reason = "future-proofing api")]
374    pub(crate) fn encode_field_section(
375        &self,
376        field_section: &FieldSection<'_>,
377        buf: &mut Vec<u8>,
378        stream_id: u64,
379    ) -> Result<(), H3Error> {
380        self.encoder_dynamic_table
381            .encode(field_section, buf, stream_id);
382        Ok(())
383    }
384
385    /// Run this server's HTTP/3 outbound control stream.
386    ///
387    /// Sends the initial SETTINGS frame, then sends GOAWAY when the connection shuts down.
388    /// Returns after GOAWAY is sent; keep the stream open until the QUIC connection closes
389    /// (closing a control stream is a connection error per RFC 9114 §6.2.1).
390    ///
391    /// # Errors
392    ///
393    /// Returns an `H3Error` in case of io error or http/3 semantic error.
394    pub async fn run_outbound_control<T>(&self, mut stream: T) -> Result<(), H3Error>
395    where
396        T: AsyncWrite + Unpin + Send,
397    {
398        let mut buf = vec![0; 128];
399
400        // Stream type + SETTINGS frame
401        let settings = Frame::Settings(H3Settings::from(&self.context.config));
402        log::trace!(
403            "H3 outbound control: sending SETTINGS: {:?}",
404            H3Settings::from(&self.context.config)
405        );
406
407        write(&mut buf, &mut stream, |buf| {
408            let mut written = quic_varint::encode(UniStreamType::Control, buf)?;
409            written += settings.encode(&mut buf[written..])?;
410            Some(written)
411        })
412        .await?;
413        log::trace!("H3 outbound control: SETTINGS sent");
414
415        // Wait for shutdown
416        self.swansong.clone().await;
417
418        // Send GOAWAY
419        write(&mut buf, &mut stream, |buf| {
420            Frame::Goaway(self.goaway_id()).encode(buf)
421        })
422        .await?;
423
424        Ok(())
425    }
426
427    /// Run the outbound QPACK encoder stream for the duration of the connection.
428    ///
429    /// Writes the stream type byte, then drains encoder-stream instructions from the encoder
430    /// dynamic table as they are enqueued. Returns when the connection shuts down or the table is
431    /// marked failed.
432    ///
433    /// # Errors
434    ///
435    /// Returns an `H3Error` in case of io error.
436    pub async fn run_encoder<T>(&self, mut stream: T) -> Result<(), H3Error>
437    where
438        T: AsyncWrite + Unpin + Send,
439    {
440        self.encoder_dynamic_table
441            .run_writer(&mut stream, self.swansong.clone())
442            .await
443    }
444
445    /// Run the outbound QPACK decoder stream for the duration of the connection.
446    ///
447    /// Writes the stream type byte, then loops sending Section Acknowledgement and Insert
448    /// Count Increment instructions as they become needed. Returns when the connection
449    /// shuts down.
450    ///
451    /// # Errors
452    ///
453    /// Returns an `H3Error` in case of io error or http/3 semantic error.
454    pub async fn run_decoder<T>(&self, mut stream: T) -> Result<(), H3Error>
455    where
456        T: AsyncWrite + Unpin + Send,
457    {
458        self.decoder_dynamic_table
459            .run_writer(&mut stream, self.swansong.clone())
460            .await
461    }
462
463    /// Handle an inbound unidirectional HTTP/3 stream from the peer.
464    ///
465    /// Internal stream types (control, QPACK encoder/decoder) are handled automatically;
466    /// application streams are returned via [`UniStreamResult`] for the caller to process.
467    ///
468    /// On a connection-level protocol error, this method drops the recv stream before
469    /// the caller can react. Quinn's `RecvStream::drop` then sends `STOP_SENDING`, which
470    /// races against the caller's `connection.close` — if the peer responds with a
471    /// malformed `RESET_STREAM` (notably `final_offset = 0`) before our app close is
472    /// applied, the transport-level error overrides our app error code on the wire.
473    /// Use [`process_inbound_uni_with_close`] to thread the close call through the
474    /// function so it fires before the stream drops.
475    ///
476    /// [`process_inbound_uni_with_close`]: Self::process_inbound_uni_with_close
477    ///
478    /// # Errors
479    ///
480    /// Returns a `H3Error` in case of io error or http/3 semantic error.
481    #[deprecated(
482        since = "1.2.0",
483        note = "use `process_inbound_uni_with_close` so connection-level protocol errors close \
484                the QUIC connection before the recv stream drops, avoiding a `FINAL_SIZE_ERROR` \
485                race with the peer's response to STOP_SENDING"
486    )]
487    pub async fn process_inbound_uni<T>(&self, stream: T) -> Result<UniStreamResult<T>, H3Error>
488    where
489        T: AsyncRead + Unpin + Send,
490    {
491        self.process_inbound_uni_with_close(stream, |_| {}).await
492    }
493
494    /// Handle an inbound unidirectional HTTP/3 stream from the peer, calling `on_close` to
495    /// close the QUIC connection if a connection-level protocol error is detected.
496    ///
497    /// Identical to [`process_inbound_uni`][Self::process_inbound_uni] except that on
498    /// any `H3Error::Protocol(code)` whose code is a connection-level error
499    /// (RFC 9114 §8.1, RFC 9204 §6), `on_close` is invoked with that code while the
500    /// recv stream is still alive. This lets callers send a `CONNECTION_CLOSE` before
501    /// the stream drops — if the close call sets quinn's `conn.error`, quinn's
502    /// `RecvStream::drop` skips `STOP_SENDING`, eliminating a peer race that otherwise
503    /// causes `FINAL_SIZE_ERROR` to override the app error code.
504    ///
505    /// `on_close` is a `FnOnce` taking `H3ErrorCode`. trillium-http does not itself
506    /// hold the QUIC connection; callers wire up the actual `connection.close()` call
507    /// inside the closure (e.g. quinn's `Connection::close`).
508    ///
509    /// # Errors
510    ///
511    /// Returns a `H3Error` in case of io error or http/3 semantic error.
512    pub async fn process_inbound_uni_with_close<T, OnClose>(
513        &self,
514        mut stream: T,
515        on_close: OnClose,
516    ) -> Result<UniStreamResult<T>, H3Error>
517    where
518        T: AsyncRead + Unpin + Send,
519        OnClose: FnOnce(H3ErrorCode),
520    {
521        let inner = self
522            .swansong
523            .interrupt(self.process_inbound_uni_inner(&mut stream))
524            .await
525            .unwrap_or(Ok(UniInnerResult::Handled)); // interrupted
526
527        match inner {
528            Ok(UniInnerResult::Handled) => Ok(UniStreamResult::Handled),
529            Ok(UniInnerResult::WebTransport { session_id, buffer }) => {
530                Ok(UniStreamResult::WebTransport {
531                    session_id,
532                    stream,
533                    buffer,
534                })
535            }
536            Ok(UniInnerResult::Unknown { stream_type }) => Ok(UniStreamResult::Unknown {
537                stream_type,
538                stream,
539            }),
540            Err(error) => {
541                // Fire `on_close` BEFORE returning so the caller's connection.close
542                // call sets quinn's `conn.error` while `stream` is still alive. When
543                // `stream` then drops at function return, quinn's `RecvStream::drop`
544                // skips STOP_SENDING — preventing the peer-RESET_STREAM race that
545                // otherwise replaces our app close code with FINAL_SIZE_ERROR.
546                if let H3Error::Protocol(code) = &error
547                    && code.is_connection_error()
548                {
549                    on_close(*code);
550                }
551                Err(error)
552            }
553        }
554    }
555
556    /// Inner-loop body of [`process_inbound_uni_with_close`][Self::process_inbound_uni_with_close].
557    /// Borrows `stream` so the outer function can keep ownership of it across the await,
558    /// which lets the caller's close callback fire before the recv stream drops.
559    async fn process_inbound_uni_inner<T>(&self, stream: &mut T) -> Result<UniInnerResult, H3Error>
560    where
561        T: AsyncRead + Unpin + Send,
562    {
563        let mut buf = vec![0; 128];
564        let mut filled = 0;
565
566        // Read stream type varint (decode as raw u64 to handle unknown types)
567        let stream_type = read(&mut buf, &mut filled, stream, |data| {
568            match quic_varint::decode(data) {
569                Ok(ok) => Ok(Some(ok)),
570                Err(QuicVarIntError::UnexpectedEnd) => Ok(None),
571                // this branch is unreachable because u64 is always From<u64>
572                Err(QuicVarIntError::UnknownValue { bytes, value }) => Ok(Some((value, bytes))),
573            }
574        })
575        .await?;
576
577        match UniStreamType::try_from(stream_type) {
578            Ok(UniStreamType::Control) => {
579                log::trace!("H3 inbound uni: control stream");
580                self.run_inbound_control(&mut buf, &mut filled, stream)
581                    .await?;
582                Ok(UniInnerResult::Handled)
583            }
584
585            Ok(UniStreamType::QpackEncoder) => {
586                log::trace!("H3 inbound uni: QPACK encoder stream ({filled} bytes pre-read)");
587                let mut reader = Prepended {
588                    head: &buf[..filled],
589                    tail: stream,
590                };
591
592                log::trace!("QPACK encoder stream: started");
593                self.decoder_dynamic_table.run_reader(&mut reader).await?;
594
595                Ok(UniInnerResult::Handled)
596            }
597
598            Ok(UniStreamType::QpackDecoder) => {
599                log::trace!("H3 inbound uni: QPACK decoder stream ({filled} bytes pre-read)");
600                let mut reader = Prepended {
601                    head: &buf[..filled],
602                    tail: stream,
603                };
604                self.encoder_dynamic_table.run_reader(&mut reader).await?;
605                Ok(UniInnerResult::Handled)
606            }
607
608            Ok(UniStreamType::WebTransport) => {
609                log::trace!("H3 inbound uni: WebTransport stream");
610                let session_id =
611                    read(
612                        &mut buf,
613                        &mut filled,
614                        stream,
615                        |data| match quic_varint::decode(data) {
616                            Ok(ok) => Ok(Some(ok)),
617                            Err(QuicVarIntError::UnexpectedEnd) => Ok(None),
618                            Err(QuicVarIntError::UnknownValue { bytes, value }) => {
619                                Ok(Some((value, bytes)))
620                            }
621                        },
622                    )
623                    .await?;
624
625                buf.truncate(filled);
626
627                Ok(UniInnerResult::WebTransport {
628                    session_id,
629                    buffer: buf.into(),
630                })
631            }
632
633            Ok(UniStreamType::Push) => {
634                // Push streams are server→client per RFC 9114 §4.6. Trillium does
635                // not support HTTP/3 push as initiator or recipient, so we hand
636                // these back as `Unknown` for the caller to dispose of identically
637                // to truly-unknown stream types — the explicit arm exists so trace
638                // output names "push stream" rather than a bare type id.
639                log::trace!("H3 inbound uni: push stream (push not supported)");
640                Ok(UniInnerResult::Unknown { stream_type })
641            }
642
643            Err(_) => {
644                log::trace!("H3 inbound uni: unknown stream type {stream_type:#x}");
645                Ok(UniInnerResult::Unknown { stream_type })
646            }
647        }
648    }
649
650    /// Handle the http/3 peer's inbound control stream.
651    ///
652    /// # Errors
653    ///
654    /// Returns a `H3Error` in case of io error or HTTP/3 semantic error.
655    // The first frame must be SETTINGS. After that, watches for
656    // GOAWAY to initiate connection shutdown.
657    async fn run_inbound_control<T>(
658        &self,
659        buf: &mut Vec<u8>,
660        filled: &mut usize,
661        stream: &mut T,
662    ) -> Result<(), H3Error>
663    where
664        T: AsyncRead + Unpin + Send,
665    {
666        // First frame must be SETTINGS (§6.2.1). A non-SETTINGS first frame OR a malformed
667        // first frame whose payload doesn't decode are both H3_MISSING_SETTINGS. *But* if
668        // the first frame is a SETTINGS frame whose payload is itself invalid (e.g.
669        // forbidden HTTP/2 setting IDs per §7.2.4.1), that's H3_SETTINGS_ERROR — preserve.
670        let settings = read(buf, filled, stream, |data| match Frame::decode(data) {
671            Ok((Frame::Settings(s), consumed)) => Ok(Some((s, consumed))),
672            Err(FrameDecodeError::Incomplete) => Ok(None),
673            Err(FrameDecodeError::Error(H3ErrorCode::SettingsError)) => {
674                Err(H3ErrorCode::SettingsError)
675            }
676            Ok(_) | Err(FrameDecodeError::Error(_)) => Err(H3ErrorCode::MissingSettings),
677        })
678        .await
679        .map_err(map_critical_stream_eof)?;
680
681        log::trace!("H3 peer settings: {settings:?}");
682
683        self.peer_settings
684            .set(settings)
685            .map_err(|_| H3ErrorCode::FrameUnexpected)?;
686        self.wake_peer_settings_waiters();
687
688        self.encoder_dynamic_table
689            .initialize_from_peer_settings(settings);
690
691        // Read subsequent frames, watching for GOAWAY
692        loop {
693            let frame = self
694                .swansong
695                .interrupt(read(buf, filled, stream, |data| {
696                    match Frame::decode(data) {
697                        Ok((frame, consumed)) => Ok(Some((frame, consumed))),
698                        Err(FrameDecodeError::Incomplete) => Ok(None),
699                        Err(FrameDecodeError::Error(code)) => Err(code),
700                    }
701                }))
702                .await
703                .transpose()
704                .map_err(map_critical_stream_eof)?;
705
706            match frame {
707                None => {
708                    log::trace!("H3 control stream: interrupted by shutdown");
709                    return Ok(());
710                }
711
712                Some(Frame::Goaway(id)) => {
713                    log::trace!("H3 control stream: peer sent GOAWAY(stream_id={id})");
714                    self.swansong.shut_down();
715                    return Ok(());
716                }
717
718                Some(Frame::Unknown(n)) => {
719                    // RFC 9114 §7.2.8: unknown frame types MUST be ignored.
720                    // We must also consume the payload bytes so the stream stays synchronized.
721                    log::trace!("H3 control stream: skipping unknown frame (payload {n} bytes)");
722                    let n = usize::try_from(n).unwrap_or(usize::MAX);
723                    let in_buf = n.min(*filled);
724                    buf.copy_within(in_buf..*filled, 0);
725                    *filled -= in_buf;
726                    let mut todo = n - in_buf;
727                    let mut scratch = [0u8; 256];
728                    while todo > 0 {
729                        let to_read = todo.min(scratch.len());
730                        let n = stream
731                            .read(&mut scratch[..to_read])
732                            .await
733                            .map_err(H3Error::Io)?;
734                        if n == 0 {
735                            return Err(H3ErrorCode::ClosedCriticalStream.into());
736                        }
737                        todo -= n;
738                    }
739                }
740
741                // RFC 9114 §7.2.4: a second SETTINGS frame is H3_FRAME_UNEXPECTED.
742                // RFC 9114 §7.2.1 / §7.2.2 / §7.2.5: DATA, HEADERS, and PUSH_PROMISE are
743                // not permitted on the control stream; same H3_FRAME_UNEXPECTED. The
744                // WebTransport bidi-signal (0x41) similarly has no business here.
745                Some(
746                    Frame::Settings(_)
747                    | Frame::Data(_)
748                    | Frame::Headers(_)
749                    | Frame::PushPromise { .. }
750                    | Frame::WebTransport(_),
751                ) => {
752                    return Err(H3ErrorCode::FrameUnexpected.into());
753                }
754
755                // CANCEL_PUSH and MAX_PUSH_ID are valid control-stream frames; we don't push,
756                // so we ignore them.
757                Some(Frame::CancelPush(_) | Frame::MaxPushId(_)) => {
758                    log::trace!("H3 control stream: ignoring {frame:?}");
759                }
760            }
761        }
762    }
763}
764
765/// Map an `UnexpectedEof` I/O error (the `read` helper's "stream FIN'd" signal) to
766/// `H3_CLOSED_CRITICAL_STREAM`. RFC 9114 §6.2.1 forbids closure of the control stream;
767/// closure of either QPACK side-channel is the same connection error per RFC 9204 §4.2.
768/// Other I/O errors and any protocol error are passed through unchanged.
769fn map_critical_stream_eof(error: H3Error) -> H3Error {
770    match error {
771        H3Error::Io(e) if e.kind() == ErrorKind::UnexpectedEof => {
772            H3ErrorCode::ClosedCriticalStream.into()
773        }
774        other => other,
775    }
776}
777
778async fn write(
779    buf: &mut Vec<u8>,
780    mut stream: impl AsyncWrite + Unpin + Send,
781    mut f: impl FnMut(&mut [u8]) -> Option<usize>,
782) -> io::Result<usize> {
783    let written = loop {
784        if let Some(w) = f(buf) {
785            break w;
786        }
787        if buf.len() >= MAX_BUFFER_SIZE {
788            return Err(io::Error::new(ErrorKind::OutOfMemory, "runaway allocation"));
789        }
790        buf.resize(buf.len() * 2, 0);
791    };
792
793    stream.write_all(&buf[..written]).await?;
794    stream.flush().await?;
795    Ok(written)
796}
797
798/// An `AsyncRead` adapter that drains a byte slice before reading from an inner stream.
799///
800/// Used in `process_inbound_uni` to replay bytes that were read ahead while
801/// parsing the stream-type varint before dispatching to `run_inbound_encoder`.
802struct Prepended<'a, T> {
803    head: &'a [u8],
804    tail: T,
805}
806
807impl<T: AsyncRead + Unpin> AsyncRead for Prepended<'_, T> {
808    fn poll_read(
809        self: Pin<&mut Self>,
810        cx: &mut Context<'_>,
811        out: &mut [u8],
812    ) -> Poll<io::Result<usize>> {
813        let this = self.get_mut();
814        if !this.head.is_empty() {
815            let n = this.head.len().min(out.len());
816            out[..n].copy_from_slice(&this.head[..n]);
817            this.head = &this.head[n..];
818            return Poll::Ready(Ok(n));
819        }
820        Pin::new(&mut this.tail).poll_read(cx, out)
821    }
822}
823
824/// Read from `stream` into `buf` until `f` can decode a value.
825///
826/// `f` receives the filled portion of the buffer and returns:
827/// - `Ok(Some((value, consumed)))` — success; consumed bytes are removed from the front
828/// - `Ok(None)` — need more data; reads more bytes and retries
829/// - `Err(e)` — unrecoverable error; propagated to caller
830async fn read<R>(
831    buf: &mut Vec<u8>,
832    filled: &mut usize,
833    stream: &mut (impl AsyncRead + Unpin + Send),
834    f: impl Fn(&[u8]) -> Result<Option<(R, usize)>, H3ErrorCode>,
835) -> Result<R, H3Error> {
836    loop {
837        if let Some((result, consumed)) = f(&buf[..*filled])? {
838            buf.copy_within(consumed..*filled, 0);
839            *filled -= consumed;
840            return Ok(result);
841        }
842
843        if *filled >= buf.len() {
844            if buf.len() >= MAX_BUFFER_SIZE {
845                return Err(io::Error::new(ErrorKind::OutOfMemory, "runaway allocation").into());
846            }
847            buf.resize(buf.len() * 2, 0);
848        }
849
850        let n = stream.read(&mut buf[*filled..]).await?;
851        if n == 0 {
852            return Err(io::Error::new(ErrorKind::UnexpectedEof, "stream closed").into());
853        }
854        *filled += n;
855    }
856}