Skip to main content

trillium_http/h3/
connection.rs

1mod peer_settings_wait;
2
3use super::{
4    H3Error,
5    frame::{Frame, FrameDecodeError, UniStreamType},
6    quic_varint::{self, QuicVarIntError},
7    settings::H3Settings,
8};
9use crate::{
10    Buffer, Conn, HttpContext, KnownHeaderName, Priority,
11    conn::H3FirstFrame,
12    h3::{H3ErrorCode, MAX_BUFFER_SIZE},
13    headers::qpack::{DecoderDynamicTable, EncoderDynamicTable, FieldSection},
14};
15use event_listener::Event;
16use futures_lite::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
17use std::{
18    future::{Future, IntoFuture},
19    io::{self, ErrorKind},
20    pin::Pin,
21    sync::{
22        Arc, OnceLock,
23        atomic::{AtomicBool, AtomicU64, Ordering},
24    },
25    task::{Context, Poll},
26};
27use swansong::{ShutdownCompletion, Swansong};
28
29/// The result of processing an HTTP/3 bidirectional stream.
30#[derive(Debug)]
31#[allow(
32    clippy::large_enum_variant,
33    reason = "Request is the hot path; boxing it would add an allocation per request"
34)]
35pub enum H3StreamResult<Transport> {
36    /// The stream carried a normal HTTP/3 request.
37    Request(Conn<Transport>),
38
39    /// The stream carries a WebTransport bidirectional data stream. The `session_id` identifies
40    /// the associated WebTransport session.
41    WebTransport {
42        /// The WebTransport session ID (stream ID of the CONNECT request).
43        session_id: u64,
44        /// The underlying transport, ready for application data.
45        transport: Transport,
46        /// Any bytes buffered after the session ID during stream negotiation.
47        buffer: Buffer,
48    },
49}
50
51/// Inner-loop result of [`H3Connection::process_inbound_uni_with_close`] before the recv
52/// stream is reattached. Decouples the inner async block (which only borrows the stream)
53/// from the caller-visible [`UniStreamResult`] (which returns the stream by value on
54/// non-`Handled` variants), so the function can keep ownership of `stream` long enough to
55/// fire its close callback before `stream` drops.
56enum UniInnerResult {
57    Handled,
58    WebTransport { session_id: u64, buffer: Buffer },
59    Unknown { stream_type: u64 },
60}
61
62/// The result of processing an HTTP/3 unidirectional stream.
63#[derive(Debug)]
64pub enum UniStreamResult<T> {
65    /// The stream was a known internal type (control, QPACK encoder/decoder) and was handled
66    /// automatically.
67    Handled,
68
69    /// A WebTransport unidirectional data stream. The `session_id` identifies the associated
70    /// WebTransport session.
71    WebTransport {
72        /// The WebTransport session ID.
73        session_id: u64,
74        /// The receive stream, ready for application data.
75        stream: T,
76        /// Any bytes buffered after the session ID during stream negotiation.
77        buffer: Buffer,
78    },
79
80    /// A stream whose type is recognized but unsupported (e.g. `Push`) or not recognized
81    /// at all by this crate.
82    ///
83    /// The caller is responsible for disposing of the stream — the in-tree consumers RST
84    /// it with `H3_STREAM_CREATION_ERROR`. `process_inbound_uni` deliberately does *not*
85    /// close the stream itself: handing it back gives a downstream extension the option to
86    /// implement a stream type trillium-http doesn't know about (a future RFC, an
87    /// experiment, etc.) without forking the codec.
88    Unknown {
89        /// The raw stream type value.
90        stream_type: u64,
91        /// The stream.
92        stream: T,
93    },
94}
95
96/// Shared state for a single HTTP/3 QUIC connection.
97///
98/// Call the appropriate methods on this type for each stream accepted from the QUIC connection.
99///
100/// # Driver shape (vs h2)
101///
102/// h2 multiplexes everything onto a single TCP byte stream, so a single
103/// [`H2Driver`][crate::h2::H2Driver] task suffices. h3 instead has the QUIC layer hand us multiple
104/// independent streams: an inbound and outbound control stream, an inbound and outbound QPACK
105/// encoder stream, an inbound and outbound QPACK decoder stream, and one bidi stream per
106/// request. There is no single "h3 driver" — each stream is driven by its own future returned from
107/// `H3Connection`'s `run_*` / `process_*` methods, and the caller decides how those futures are
108/// scheduled.
109///
110/// The trillium-http boundary is **runtime-free by design**: this crate hands out anonymous futures
111/// and lets the caller pick the executor. The in-tree consumers (`trillium-server-common`,
112/// `trillium-client`) follow a task-per-stream pattern — spawn each long-lived control / encoder /
113/// decoder future on its own task at connection setup, then spawn one task per accepted request
114/// stream. Nothing in this crate requires that pattern; a caller could in principle race all the
115/// futures on one task instead, with different perf characteristics.
116#[derive(Debug)]
117pub struct H3Connection {
118    /// Shared configuration across all protocols.
119    context: Arc<HttpContext>,
120
121    /// Connection-scoped shutdown signal. Shut down when we receive GOAWAY from the peer or when
122    /// the server-level Swansong shuts down.  Request stream tasks use this to interrupt
123    /// in-progress work.
124    swansong: Swansong,
125
126    /// The peer's H3 settings, received on their control stream.  Request streams may need to
127    /// consult these (e.g. max field section size).
128    pub(super) peer_settings: OnceLock<H3Settings>,
129
130    /// Multi-listener wake source for
131    /// [`PeerSettingsReady`][peer_settings_wait::PeerSettingsReady]. Notified by
132    /// `run_inbound_control` after applying peer SETTINGS, and again on connection
133    /// close, so any number of concurrently-parked futures all unblock together.
134    pub(super) peer_settings_event: Event,
135
136    /// The highest bidirectional stream ID we have accepted.  Used to compute the GOAWAY value
137    /// (this + 4) to tell the peer which requests we saw. None until the first stream is accepted.
138    /// Updated by the runtime adapter's accept loop via [`record_accepted_stream`].
139    max_accepted_stream_id: AtomicU64,
140
141    /// Whether we have accepted any streams yet.
142    has_accepted_stream: AtomicBool,
143
144    /// The decoder-side QPACK dynamic table for this connection.
145    decoder_dynamic_table: DecoderDynamicTable,
146
147    /// The encoder-side QPACK dynamic table for this connection.
148    encoder_dynamic_table: EncoderDynamicTable,
149
150    /// Sink for RFC 9218 priority signals, set via
151    /// [`register_priority_callback`][Self::register_priority_callback]. Unset until the runtime
152    /// adapter that owns the QUIC streams registers it.
153    priority_callback: PriorityCallback,
154}
155
156/// Boxed sink for `(stream_id, priority, is_update)` signals.
157type PriorityCallbackFn = Box<dyn Fn(u64, Priority, bool) + Send + Sync>;
158
159/// A registered sink for `(stream_id, priority, is_update)` signals. Newtype so [`H3Connection`]
160/// can keep deriving `Debug` despite holding a boxed closure.
161#[derive(Default)]
162struct PriorityCallback(OnceLock<PriorityCallbackFn>);
163
164impl std::fmt::Debug for PriorityCallback {
165    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
166        f.debug_tuple("PriorityCallback")
167            .field(&self.0.get().map(|_| format_args!("..")))
168            .finish()
169    }
170}
171
172impl H3Connection {
173    /// Construct a new `H3Connection` to manage HTTP/3 for a given peer.
174    pub fn new(context: Arc<HttpContext>) -> Arc<Self> {
175        let swansong = context.swansong.child();
176        let max_table_capacity = context.config.dynamic_table_capacity;
177        let blocked_streams = context.config.h3_blocked_streams;
178        let encoder_dynamic_table = EncoderDynamicTable::new(&context);
179        Arc::new(Self {
180            context,
181            swansong,
182            peer_settings: OnceLock::new(),
183            peer_settings_event: Event::new(),
184            max_accepted_stream_id: AtomicU64::new(0),
185            has_accepted_stream: AtomicBool::new(false),
186            decoder_dynamic_table: DecoderDynamicTable::new(max_table_capacity, blocked_streams),
187            encoder_dynamic_table,
188            priority_callback: PriorityCallback::default(),
189        })
190    }
191
192    /// Register the sink for RFC 9218 priority signals on this connection.
193    ///
194    /// The callback is invoked with `(stream_id, priority, is_update)` once per request when its
195    /// initial `priority` header is parsed (`is_update = false`), and again for every
196    /// `PRIORITY_UPDATE` received afterward (`is_update = true`). `is_update` lets the receiver
197    /// honor RFC 9218 precedence: a `PRIORITY_UPDATE` outranks the request's initial header
198    /// priority regardless of arrival order, including when it arrives before the stream is
199    /// accepted.
200    ///
201    /// This crate keeps no priority state of its own and does no scheduling: it parses each
202    /// signal and hands the [`Priority`] off through this callback, leaving the receiver to apply
203    /// it to whatever owns send scheduling. Without a registered callback, priority is parsed but
204    /// never applied.
205    ///
206    /// Has no effect if a callback is already registered.
207    pub fn register_priority_callback(
208        &self,
209        callback: impl Fn(u64, Priority, bool) + Send + Sync + 'static,
210    ) {
211        let _ = self.priority_callback.0.set(Box::new(callback));
212    }
213
214    /// Emit a priority signal for a request stream to the registered callback, if any.
215    /// `is_update` distinguishes a received `PRIORITY_UPDATE` from the request's initial header
216    /// priority so the receiver can honor the precedence rule.
217    fn emit_priority(&self, stream_id: u64, priority: Priority, is_update: bool) {
218        let kind = if is_update {
219            "PRIORITY_UPDATE"
220        } else {
221            "initial"
222        };
223        match self.priority_callback.0.get() {
224            Some(callback) => {
225                log::trace!("H3 stream {stream_id}: emitting {kind} priority \"{priority}\"");
226                callback(stream_id, priority, is_update);
227            }
228            None => log::trace!(
229                "H3 stream {stream_id}: {kind} priority \"{priority}\" parsed but no callback \
230                 registered"
231            ),
232        }
233    }
234
235    /// Handle an RFC 9218 `PRIORITY_UPDATE` received on the peer's control stream. The
236    /// prioritized element id must name a client-initiated bidirectional (request) stream —
237    /// `id % 4 == 0` in QUIC — and other ids are ignored rather than erroring, since the signal
238    /// is advisory.
239    fn emit_priority_update(&self, prioritized_element_id: u64, priority: Priority) {
240        if prioritized_element_id.is_multiple_of(4) {
241            self.emit_priority(prioritized_element_id, priority, true);
242        } else {
243            log::trace!(
244                "H3: ignoring PRIORITY_UPDATE for non-request stream {prioritized_element_id}"
245            );
246        }
247    }
248
249    /// Retrieve the [`Swansong`] shutdown handle for this HTTP/3 connection. See also
250    /// [`H3Connection::shut_down`]
251    pub fn swansong(&self) -> &Swansong {
252        &self.swansong
253    }
254
255    /// Attempt graceful shutdown of this HTTP/3 connection (all streams).
256    ///
257    /// The returned [`ShutdownCompletion`] type can
258    /// either be awaited in an async context or blocked on with [`ShutdownCompletion::block`] in a
259    /// blocking context
260    ///
261    /// Note that this will NOT shut down the server. To shut down the whole server, use
262    /// [`HttpContext::shut_down`]
263    pub fn shut_down(&self) -> ShutdownCompletion {
264        // Wake any in-flight `decode_field_section` calls parked on the decoder
265        // table's `ThresholdWait` (a non-I/O future awaiting dynamic-table inserts
266        // from the peer). The encoder table's writer loop is already swansong-
267        // aware, but we mark it failed too for symmetry: any future state
268        // mutations after shutdown are no longer wire-relevant.
269        self.decoder_dynamic_table.fail(H3ErrorCode::NoError);
270        self.encoder_dynamic_table.fail(H3ErrorCode::NoError);
271        self.wake_peer_settings_waiters();
272        self.swansong.shut_down()
273    }
274
275    /// Retrieve the [`HttpContext`] for this server.
276    pub fn context(&self) -> Arc<HttpContext> {
277        self.context.clone()
278    }
279
280    /// Returns the peer's HTTP/3 settings, available once the peer's control stream has been
281    /// processed.
282    pub fn peer_settings(&self) -> Option<&H3Settings> {
283        self.peer_settings.get()
284    }
285
286    /// Record that we accepted a bidirectional stream with this ID.
287    fn record_accepted_stream(&self, stream_id: u64) {
288        self.max_accepted_stream_id
289            .fetch_max(stream_id, Ordering::Relaxed);
290        self.has_accepted_stream.store(true, Ordering::Relaxed);
291    }
292
293    /// The stream ID to send in a GOAWAY frame: one past the highest stream we accepted, or 0 if we
294    /// haven't accepted any.
295    fn goaway_id(&self) -> u64 {
296        if self.has_accepted_stream.load(Ordering::Relaxed) {
297            self.max_accepted_stream_id.load(Ordering::Relaxed) + 4
298        } else {
299            0
300        }
301    }
302
303    /// Begin processing a single HTTP/3 request-response cycle on an accepted bidirectional
304    /// stream.
305    ///
306    /// Returns a builder. Attach an optional reset hook with
307    /// [`with_reset`][H3BidiRequest::with_reset], then `.await` it to run one request/response
308    /// cycle. Awaiting resolves to [`H3StreamResult::WebTransport`] if the stream opens a
309    /// WebTransport session rather than a standard HTTP/3 request.
310    ///
311    /// Without a reset hook, a stream-level protocol error drops the transport without
312    /// resetting it; attach `with_reset` to issue the RST that RFC 9114 requires for stream
313    /// errors.
314    ///
315    /// RFC 9218 priority is delivered out of band via the callback registered with
316    /// [`register_priority_callback`][Self::register_priority_callback]: this method emits the
317    /// request's initial priority once the headers are parsed.
318    pub fn process_inbound_bidi<Transport, Handler>(
319        self: Arc<Self>,
320        transport: Transport,
321        handler: Handler,
322        stream_id: u64,
323    ) -> H3BidiRequest<Transport, Handler> {
324        H3BidiRequest {
325            h3: self,
326            transport,
327            handler,
328            stream_id,
329            reset: None,
330        }
331    }
332
333    /// Process a single HTTP/3 request-response cycle on a bidirectional stream, calling
334    /// `reset` to issue a stream RST when a stream-level protocol error occurs.
335    ///
336    /// On any `H3Error::Protocol(code)` produced by first-frame processing (HEADERS decode,
337    /// pseudo-header validation, etc.), `reset` is invoked with the still-owned transport and
338    /// the error code before the error is returned. This lets callers RST both the recv and
339    /// send halves of the bidi stream — required by RFC 9114 for stream errors like
340    /// `H3_MESSAGE_ERROR`. I/O errors and successful runs do not invoke `reset`.
341    ///
342    /// `reset` is a `FnOnce` taking `(&mut Transport, H3ErrorCode)`. trillium-http does not
343    /// itself depend on any reset capability of the transport; callers wire up the actual
344    /// stream-RST mechanism (e.g. quinn's `RecvStream::stop` + `SendStream::reset`) inside
345    /// the closure.
346    ///
347    /// # Errors
348    ///
349    /// Returns an `H3Error` in case of io error or http/3 semantic error.
350    // This is not deprecated yet because it didn't make sense to release a new version of
351    // trillium-client just to avoid this deprecation, but the intention is to deprecate
352    pub async fn process_inbound_bidi_with_reset<Transport, Handler, Fut, Reset>(
353        self: Arc<Self>,
354        mut transport: Transport,
355        handler: Handler,
356        stream_id: u64,
357        reset: Reset,
358    ) -> Result<H3StreamResult<Transport>, H3Error>
359    where
360        Transport: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static,
361        Handler: FnOnce(Conn<Transport>) -> Fut,
362        Fut: Future<Output = Conn<Transport>>,
363        Reset: FnOnce(&mut Transport, H3ErrorCode),
364    {
365        self.record_accepted_stream(stream_id);
366        let _guard = self.swansong.guard();
367        let mut buffer: Buffer =
368            Vec::with_capacity(self.context.config.request_buffer_initial_len).into();
369
370        let outcome =
371            Conn::process_first_frame_h3(&self, &mut transport, &mut buffer, stream_id).await;
372
373        match outcome {
374            Ok(H3FirstFrame::Request {
375                validated,
376                start_time,
377            }) => {
378                let conn =
379                    Conn::build_h3(self, transport, buffer, validated, start_time, stream_id);
380                Ok(H3StreamResult::Request(
381                    handler(conn).await.send_h3().await?,
382                ))
383            }
384            Ok(H3FirstFrame::WebTransport { session_id }) => Ok(H3StreamResult::WebTransport {
385                session_id,
386                transport,
387                buffer,
388            }),
389            Err(error) => {
390                if let H3Error::Protocol(code) = &error {
391                    reset(&mut transport, *code);
392                }
393                Err(error)
394            }
395        }
396    }
397
/// Decode a QPACK-encoded field section against this connection's decoder dynamic table.
///
/// A field section whose Required Insert Count is greater than zero makes this wait until
/// enough entries have arrived in the dynamic table. Protocol violations, or a failure of
/// the encoder stream during that wait, produce an error.
///
/// When a pseudo-header is repeated, the first value wins and the duplicates are silently
/// ignored. Unknown pseudo-headers are rejected.
///
/// # Errors
///
/// Returns an error if the encoded bytes cannot be parsed as a valid field section.
#[cfg(feature = "unstable")]
pub async fn decode_field_section(
    &self,
    encoded: &[u8],
    stream_id: u64,
) -> Result<FieldSection<'static>, H3Error> {
    let table = &self.decoder_dynamic_table;
    table.decode(encoded, stream_id).await
}
418
419    #[cfg(not(feature = "unstable"))]
420    pub(crate) async fn decode_field_section(
421        &self,
422        encoded: &[u8],
423        stream_id: u64,
424    ) -> Result<FieldSection<'static>, H3Error> {
425        self.decoder_dynamic_table.decode(encoded, stream_id).await
426    }
427
/// Encode `field_section` (pseudo-headers and headers) as a QPACK field section appended
/// to `buf`. The encoder dynamic table is consulted so that literal-with-name-reference or
/// indexed representations are emitted where the table's contents allow.
///
/// # Errors
///
/// Returns an `H3Error` in case of http/3 semantic error.
#[cfg(feature = "unstable")]
#[allow(clippy::unnecessary_wraps, reason = "future-proofing api")]
pub fn encode_field_section(
    &self,
    field_section: &FieldSection<'_>,
    buf: &mut Vec<u8>,
    stream_id: u64,
) -> Result<(), H3Error> {
    let table = &self.encoder_dynamic_table;
    table.encode(field_section, buf, stream_id);
    Ok(())
}
447
448    #[cfg(not(feature = "unstable"))]
449    #[allow(clippy::unnecessary_wraps, reason = "future-proofing api")]
450    pub(crate) fn encode_field_section(
451        &self,
452        field_section: &FieldSection<'_>,
453        buf: &mut Vec<u8>,
454        stream_id: u64,
455    ) -> Result<(), H3Error> {
456        self.encoder_dynamic_table
457            .encode(field_section, buf, stream_id);
458        Ok(())
459    }
460
461    /// Run this connection's HTTP/3 outbound control stream.
462    ///
463    /// Sends the initial SETTINGS frame, then sends GOAWAY when the connection shuts down.
464    /// Returns after GOAWAY is sent; keep the stream open until the QUIC connection closes
465    /// (closing a control stream is a connection error).
466    ///
467    /// # Errors
468    ///
469    /// Returns an `H3Error` in case of io error or http/3 semantic error.
470    pub async fn run_outbound_control<T>(&self, mut stream: T) -> Result<(), H3Error>
471    where
472        T: AsyncWrite + Unpin + Send,
473    {
474        let mut buf = vec![0; 128];
475
476        let settings = Frame::Settings(H3Settings::from(&self.context.config));
477        log::trace!(
478            "H3 outbound control: sending SETTINGS: {:?}",
479            H3Settings::from(&self.context.config)
480        );
481
482        write(&mut buf, &mut stream, |buf| {
483            let mut written = quic_varint::encode(UniStreamType::Control, buf)?;
484            written += settings.encode(&mut buf[written..])?;
485            Some(written)
486        })
487        .await?;
488        log::trace!("H3 outbound control: SETTINGS sent");
489
490        self.swansong.clone().await;
491
492        write(&mut buf, &mut stream, |buf| {
493            Frame::Goaway(self.goaway_id()).encode(buf)
494        })
495        .await?;
496
497        Ok(())
498    }
499
500    /// Run the outbound QPACK encoder stream for the duration of the connection.
501    ///
502    /// Writes the stream type byte, then drains encoder-stream instructions from the encoder
503    /// dynamic table as they are enqueued. Returns when the connection shuts down or the table is
504    /// marked failed.
505    ///
506    /// # Errors
507    ///
508    /// Returns an `H3Error` in case of io error.
509    pub async fn run_encoder<T>(&self, mut stream: T) -> Result<(), H3Error>
510    where
511        T: AsyncWrite + Unpin + Send,
512    {
513        self.encoder_dynamic_table
514            .run_writer(&mut stream, self.swansong.clone())
515            .await
516    }
517
518    /// Run the outbound QPACK decoder stream for the duration of the connection.
519    ///
520    /// Writes the stream type byte, then loops sending Section Acknowledgement and Insert
521    /// Count Increment instructions as they become needed. Returns when the connection
522    /// shuts down.
523    ///
524    /// # Errors
525    ///
526    /// Returns an `H3Error` in case of io error or http/3 semantic error.
527    pub async fn run_decoder<T>(&self, mut stream: T) -> Result<(), H3Error>
528    where
529        T: AsyncWrite + Unpin + Send,
530    {
531        self.decoder_dynamic_table
532            .run_writer(&mut stream, self.swansong.clone())
533            .await
534    }
535
536    /// Handle an inbound unidirectional HTTP/3 stream from the peer.
537    ///
538    /// Internal stream types (control, QPACK encoder/decoder) are handled automatically;
539    /// application streams are returned via [`UniStreamResult`] for the caller to process.
540    ///
541    /// On a connection-level protocol error, this method drops the recv stream before
542    /// the caller can react. Quinn's `RecvStream::drop` then sends `STOP_SENDING`, which
543    /// races against the caller's `connection.close` — if the peer responds with a
544    /// malformed `RESET_STREAM` (notably `final_offset = 0`) before our app close is
545    /// applied, the transport-level error overrides our app error code on the wire.
546    /// Use [`process_inbound_uni_with_close`] to thread the close call through the
547    /// function so it fires before the stream drops.
548    ///
549    /// [`process_inbound_uni_with_close`]: Self::process_inbound_uni_with_close
550    ///
551    /// # Errors
552    ///
553    /// Returns a `H3Error` in case of io error or http/3 semantic error.
554    #[deprecated(
555        since = "1.2.0",
556        note = "use `process_inbound_uni_with_close` so connection-level protocol errors close \
557                the QUIC connection before the recv stream drops, avoiding a `FINAL_SIZE_ERROR` \
558                race with the peer's response to STOP_SENDING"
559    )]
560    pub async fn process_inbound_uni<T>(&self, stream: T) -> Result<UniStreamResult<T>, H3Error>
561    where
562        T: AsyncRead + Unpin + Send,
563    {
564        self.process_inbound_uni_with_close(stream, |_| {}).await
565    }
566
567    /// Handle an inbound unidirectional HTTP/3 stream from the peer, calling `on_close` to
568    /// close the QUIC connection if a connection-level protocol error is detected.
569    ///
570    /// Identical to [`process_inbound_uni`][Self::process_inbound_uni] except that on
571    /// any `H3Error::Protocol(code)` whose code is a connection-level error (RFC 9114,
572    /// RFC 9204), `on_close` is invoked with that code while the recv stream is still alive. This
573    /// lets callers send a `CONNECTION_CLOSE` before the stream drops — if the close call sets
574    /// quinn's `conn.error`, quinn's `RecvStream::drop` skips `STOP_SENDING`, eliminating a
575    /// peer race that otherwise causes `FINAL_SIZE_ERROR` to override the app error code.
576    ///
577    /// `on_close` is a `FnOnce` taking `H3ErrorCode`. trillium-http does not itself
578    /// hold the QUIC connection; callers wire up the actual `connection.close()` call
579    /// inside the closure (e.g. quinn's `Connection::close`).
580    ///
581    /// # Errors
582    ///
583    /// Returns a `H3Error` in case of io error or http/3 semantic error.
584    pub async fn process_inbound_uni_with_close<T, OnClose>(
585        &self,
586        mut stream: T,
587        on_close: OnClose,
588    ) -> Result<UniStreamResult<T>, H3Error>
589    where
590        T: AsyncRead + Unpin + Send,
591        OnClose: FnOnce(H3ErrorCode),
592    {
593        let inner = self
594            .swansong
595            .interrupt(self.process_inbound_uni_inner(&mut stream))
596            .await
597            .unwrap_or(Ok(UniInnerResult::Handled)); // interrupted
598
599        match inner {
600            Ok(UniInnerResult::Handled) => Ok(UniStreamResult::Handled),
601            Ok(UniInnerResult::WebTransport { session_id, buffer }) => {
602                Ok(UniStreamResult::WebTransport {
603                    session_id,
604                    stream,
605                    buffer,
606                })
607            }
608            Ok(UniInnerResult::Unknown { stream_type }) => Ok(UniStreamResult::Unknown {
609                stream_type,
610                stream,
611            }),
612            Err(error) => {
613                // Fire `on_close` BEFORE returning so the caller's connection.close
614                // call sets quinn's `conn.error` while `stream` is still alive. When
615                // `stream` then drops at function return, quinn's `RecvStream::drop`
616                // skips STOP_SENDING — preventing the peer-RESET_STREAM race that
617                // otherwise replaces our app close code with FINAL_SIZE_ERROR.
618                if let H3Error::Protocol(code) = &error
619                    && code.is_connection_error()
620                {
621                    on_close(*code);
622                }
623                Err(error)
624            }
625        }
626    }
627
628    /// Inner-loop body of [`process_inbound_uni_with_close`][Self::process_inbound_uni_with_close].
629    /// Borrows `stream` so the outer function can keep ownership of it across the await,
630    /// which lets the caller's close callback fire before the recv stream drops.
631    async fn process_inbound_uni_inner<T>(&self, stream: &mut T) -> Result<UniInnerResult, H3Error>
632    where
633        T: AsyncRead + Unpin + Send,
634    {
635        let mut buf = vec![0; 128];
636        let mut filled = 0;
637
638        // Read stream type varint (decode as raw u64 to handle unknown types)
639        let stream_type = read(&mut buf, &mut filled, stream, |data| {
640            match quic_varint::decode(data) {
641                Ok(ok) => Ok(Some(ok)),
642                Err(QuicVarIntError::UnexpectedEnd) => Ok(None),
643                // this branch is unreachable because u64 is always From<u64>
644                Err(QuicVarIntError::UnknownValue { bytes, value }) => Ok(Some((value, bytes))),
645            }
646        })
647        .await?;
648
649        match UniStreamType::try_from(stream_type) {
650            Ok(UniStreamType::Control) => {
651                log::trace!("H3 inbound uni: control stream");
652                self.run_inbound_control(&mut buf, &mut filled, stream)
653                    .await?;
654                Ok(UniInnerResult::Handled)
655            }
656
657            Ok(UniStreamType::QpackEncoder) => {
658                log::trace!("H3 inbound uni: QPACK encoder stream ({filled} bytes pre-read)");
659                let mut reader = Prepended {
660                    head: &buf[..filled],
661                    tail: stream,
662                };
663
664                log::trace!("QPACK encoder stream: started");
665                self.decoder_dynamic_table.run_reader(&mut reader).await?;
666
667                Ok(UniInnerResult::Handled)
668            }
669
670            Ok(UniStreamType::QpackDecoder) => {
671                log::trace!("H3 inbound uni: QPACK decoder stream ({filled} bytes pre-read)");
672                let mut reader = Prepended {
673                    head: &buf[..filled],
674                    tail: stream,
675                };
676                self.encoder_dynamic_table.run_reader(&mut reader).await?;
677                Ok(UniInnerResult::Handled)
678            }
679
680            Ok(UniStreamType::WebTransport) => {
681                log::trace!("H3 inbound uni: WebTransport stream");
682                let session_id =
683                    read(
684                        &mut buf,
685                        &mut filled,
686                        stream,
687                        |data| match quic_varint::decode(data) {
688                            Ok(ok) => Ok(Some(ok)),
689                            Err(QuicVarIntError::UnexpectedEnd) => Ok(None),
690                            Err(QuicVarIntError::UnknownValue { bytes, value }) => {
691                                Ok(Some((value, bytes)))
692                            }
693                        },
694                    )
695                    .await?;
696
697                buf.truncate(filled);
698
699                Ok(UniInnerResult::WebTransport {
700                    session_id,
701                    buffer: buf.into(),
702                })
703            }
704
705            Ok(UniStreamType::Push) => {
706                // Trillium does not support HTTP/3 push, so we hand these back as `Unknown`
707                // identically to truly-unknown stream types — the explicit arm exists so
708                // trace output names "push stream" rather than a bare type id.
709                log::trace!("H3 inbound uni: push stream (push not supported)");
710                Ok(UniInnerResult::Unknown { stream_type })
711            }
712
713            Err(_) => {
714                log::trace!("H3 inbound uni: unknown stream type {stream_type:#x}");
715                Ok(UniInnerResult::Unknown { stream_type })
716            }
717        }
718    }
719
720    /// Handle the http/3 peer's inbound control stream.
721    ///
722    /// # Errors
723    ///
724    /// Returns a `H3Error` in case of io error or HTTP/3 semantic error.
725    async fn run_inbound_control<T>(
726        &self,
727        buf: &mut Vec<u8>,
728        filled: &mut usize,
729        stream: &mut T,
730    ) -> Result<(), H3Error>
731    where
732        T: AsyncRead + Unpin + Send,
733    {
734        // SettingsError takes priority: a SETTINGS frame whose payload is itself invalid
735        // (e.g. forbidden HTTP/2 setting IDs) is reported as SETTINGS_ERROR, not the
736        // MISSING_SETTINGS we report for everything else here.
737        let settings = read(buf, filled, stream, |data| match Frame::decode(data) {
738            Ok((Frame::Settings(s), consumed)) => Ok(Some((s, consumed))),
739            Err(FrameDecodeError::Incomplete) => Ok(None),
740            Err(FrameDecodeError::Error(H3ErrorCode::SettingsError)) => {
741                Err(H3ErrorCode::SettingsError)
742            }
743            Ok(_) | Err(FrameDecodeError::Error(_)) => Err(H3ErrorCode::MissingSettings),
744        })
745        .await
746        .map_err(map_critical_stream_eof)?;
747
748        log::trace!("H3 peer settings: {settings:?}");
749
750        self.peer_settings
751            .set(settings)
752            .map_err(|_| H3ErrorCode::FrameUnexpected)?;
753        self.wake_peer_settings_waiters();
754
755        self.encoder_dynamic_table
756            .initialize_from_peer_settings(settings);
757
758        loop {
759            let frame = self
760                .swansong
761                .interrupt(read(buf, filled, stream, |data| {
762                    match Frame::decode(data) {
763                        Ok((frame, consumed)) => Ok(Some((frame, consumed))),
764                        Err(FrameDecodeError::Incomplete) => Ok(None),
765                        Err(FrameDecodeError::Error(code)) => Err(code),
766                    }
767                }))
768                .await
769                .transpose()
770                .map_err(map_critical_stream_eof)?;
771
772            match frame {
773                None => {
774                    log::trace!("H3 control stream: interrupted by shutdown");
775                    return Ok(());
776                }
777
778                Some(Frame::Goaway(id)) => {
779                    log::trace!("H3 control stream: peer sent GOAWAY(stream_id={id})");
780                    self.swansong.shut_down();
781                    return Ok(());
782                }
783
784                Some(Frame::Unknown(n)) => {
785                    // Consume the payload bytes so the stream stays synchronized.
786                    log::trace!("H3 control stream: skipping unknown frame (payload {n} bytes)");
787                    let n = usize::try_from(n).unwrap_or(usize::MAX);
788                    let in_buf = n.min(*filled);
789                    buf.copy_within(in_buf..*filled, 0);
790                    *filled -= in_buf;
791                    let mut todo = n - in_buf;
792                    let mut scratch = [0u8; 256];
793                    while todo > 0 {
794                        let to_read = todo.min(scratch.len());
795                        let n = stream
796                            .read(&mut scratch[..to_read])
797                            .await
798                            .map_err(H3Error::Io)?;
799                        if n == 0 {
800                            return Err(H3ErrorCode::ClosedCriticalStream.into());
801                        }
802                        todo -= n;
803                    }
804                }
805
806                Some(
807                    Frame::Settings(_)
808                    | Frame::Data(_)
809                    | Frame::Headers(_)
810                    | Frame::PushPromise { .. }
811                    | Frame::WebTransport(_),
812                ) => {
813                    return Err(H3ErrorCode::FrameUnexpected.into());
814                }
815
816                Some(Frame::PriorityUpdate {
817                    prioritized_element_id,
818                    priority,
819                }) => {
820                    log::trace!(
821                        "H3 control stream: PRIORITY_UPDATE stream={prioritized_element_id} \
822                         priority=\"{priority}\""
823                    );
824                    self.emit_priority_update(prioritized_element_id, priority);
825                }
826
827                // Trillium doesn't implement push, so these are ignored rather than acted on.
828                Some(Frame::CancelPush(_) | Frame::MaxPushId(_)) => {
829                    log::trace!("H3 control stream: ignoring {frame:?}");
830                }
831            }
832        }
833    }
834}
835
/// A pending HTTP/3 request-response cycle on one bidirectional stream, with optional
/// per-stream hooks.
///
/// Built by [`H3Connection::process_inbound_bidi`]. Configure hooks with the `with_*`
/// methods and `.await` it to run the cycle. New per-stream extension points are added as
/// further `with_*` methods, so the entry point's required arguments never change.
pub struct H3BidiRequest<Transport, Handler> {
    /// Shared connection handle; supplies the shutdown guard, the initial buffer size and
    /// the priority events when the request runs.
    h3: Arc<H3Connection>,
    /// The stream's transport; moved into the resulting `Conn`, or handed back in
    /// [`H3StreamResult::WebTransport`].
    transport: Transport,
    /// Called with the built `Conn` when the first frame turns out to be a request.
    handler: Handler,
    /// Stream ID recorded as accepted and attached to the `Conn` and its priority events.
    stream_id: u64,
    /// Optional hook installed by `with_reset`; when `None`, a protocol error drops the
    /// transport without a reset.
    reset: Option<ResetHook<Transport>>,
}
849
/// Per-stream reset hook: RST both halves with the still-owned transport on a stream-level error.
///
/// Boxed so [`H3BidiRequest`] can store any caller-supplied closure. Being `FnOnce`, it runs
/// at most once, receiving the transport and the protocol error code.
type ResetHook<Transport> = Box<dyn FnOnce(&mut Transport, H3ErrorCode) + Send>;
852
853impl<Transport, Handler> std::fmt::Debug for H3BidiRequest<Transport, Handler> {
854    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
855        f.debug_struct("H3BidiRequest")
856            .field("stream_id", &self.stream_id)
857            .finish_non_exhaustive()
858    }
859}
860
861impl<Transport, Handler> H3BidiRequest<Transport, Handler> {
862    /// Issue a stream RST on a stream-level protocol error.
863    ///
864    /// On any `H3Error::Protocol(code)` from first-frame processing, `reset` is called with
865    /// the still-owned transport and the error code before the error is returned — letting the
866    /// caller RST both halves of the bidi stream as RFC 9114 requires. I/O errors and
867    /// successful runs do not invoke it. Without this hook, the transport is dropped without a
868    /// reset.
869    #[must_use]
870    pub fn with_reset<R>(mut self, reset: R) -> Self
871    where
872        R: FnOnce(&mut Transport, H3ErrorCode) + Send + 'static,
873    {
874        self.reset = Some(Box::new(reset));
875        self
876    }
877}
878
879impl<Transport, Handler, Fut> IntoFuture for H3BidiRequest<Transport, Handler>
880where
881    Transport: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static,
882    Handler: FnOnce(Conn<Transport>) -> Fut + Send + 'static,
883    Fut: Future<Output = Conn<Transport>> + Send + 'static,
884{
885    type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send>>;
886    type Output = Result<H3StreamResult<Transport>, H3Error>;
887
888    fn into_future(self) -> Self::IntoFuture {
889        Box::pin(async move {
890            let Self {
891                h3,
892                mut transport,
893                handler,
894                stream_id,
895                reset,
896            } = self;
897
898            h3.record_accepted_stream(stream_id);
899            let _guard = h3.swansong.guard();
900            let mut buffer: Buffer =
901                Vec::with_capacity(h3.context.config.request_buffer_initial_len).into();
902
903            let outcome =
904                Conn::process_first_frame_h3(&h3, &mut transport, &mut buffer, stream_id).await;
905
906            match outcome {
907                Ok(H3FirstFrame::Request {
908                    validated,
909                    start_time,
910                }) => {
911                    let initial_priority = validated
912                        .request_headers
913                        .get_str(KnownHeaderName::Priority)
914                        .map(Priority::parse)
915                        .unwrap_or_default();
916                    h3.emit_priority(stream_id, initial_priority, false);
917                    let conn =
918                        Conn::build_h3(h3, transport, buffer, validated, start_time, stream_id);
919                    Ok(H3StreamResult::Request(
920                        handler(conn).await.send_h3().await?,
921                    ))
922                }
923                Ok(H3FirstFrame::WebTransport { session_id }) => Ok(H3StreamResult::WebTransport {
924                    session_id,
925                    transport,
926                    buffer,
927                }),
928                Err(error) => {
929                    if let H3Error::Protocol(code) = &error
930                        && let Some(reset) = reset
931                    {
932                        reset(&mut transport, *code);
933                    }
934                    Err(error)
935                }
936            }
937        })
938    }
939}
940
941/// Map an `UnexpectedEof` I/O error (the `read` helper's "stream FIN'd" signal) to
942/// `H3_CLOSED_CRITICAL_STREAM`. Closure of the control stream or of either QPACK
943/// side-channel is a connection error. Other I/O errors and any protocol error are passed
944/// through unchanged.
945fn map_critical_stream_eof(error: H3Error) -> H3Error {
946    match error {
947        H3Error::Io(e) if e.kind() == ErrorKind::UnexpectedEof => {
948            H3ErrorCode::ClosedCriticalStream.into()
949        }
950        other => other,
951    }
952}
953
954async fn write(
955    buf: &mut Vec<u8>,
956    mut stream: impl AsyncWrite + Unpin + Send,
957    mut f: impl FnMut(&mut [u8]) -> Option<usize>,
958) -> io::Result<usize> {
959    let written = loop {
960        if let Some(w) = f(buf) {
961            break w;
962        }
963        if buf.len() >= MAX_BUFFER_SIZE {
964            return Err(io::Error::new(ErrorKind::OutOfMemory, "runaway allocation"));
965        }
966        buf.resize(buf.len() * 2, 0);
967    };
968
969    stream.write_all(&buf[..written]).await?;
970    stream.flush().await?;
971    Ok(written)
972}
973
/// An `AsyncRead` adapter that drains a byte slice before reading from an inner stream.
///
/// Used to replay bytes that were read ahead while parsing a stream-type varint, before
/// dispatching to the inner runner that consumes the rest of the stream.
struct Prepended<'a, T> {
    /// Bytes still to be replayed; shrinks from the front as reads consume it.
    head: &'a [u8],
    /// Inner stream, polled only once `head` is empty.
    tail: T,
}
982
983impl<T: AsyncRead + Unpin> AsyncRead for Prepended<'_, T> {
984    fn poll_read(
985        self: Pin<&mut Self>,
986        cx: &mut Context<'_>,
987        out: &mut [u8],
988    ) -> Poll<io::Result<usize>> {
989        let this = self.get_mut();
990        if !this.head.is_empty() {
991            let n = this.head.len().min(out.len());
992            out[..n].copy_from_slice(&this.head[..n]);
993            this.head = &this.head[n..];
994            return Poll::Ready(Ok(n));
995        }
996        Pin::new(&mut this.tail).poll_read(cx, out)
997    }
998}
999
1000/// Read from `stream` into `buf` until `f` can decode a value.
1001///
1002/// `f` receives the filled portion of the buffer and returns:
1003/// - `Ok(Some((value, consumed)))` — success; consumed bytes are removed from the front
1004/// - `Ok(None)` — need more data; reads more bytes and retries
1005/// - `Err(e)` — unrecoverable error; propagated to caller
1006async fn read<R>(
1007    buf: &mut Vec<u8>,
1008    filled: &mut usize,
1009    stream: &mut (impl AsyncRead + Unpin + Send),
1010    f: impl Fn(&[u8]) -> Result<Option<(R, usize)>, H3ErrorCode>,
1011) -> Result<R, H3Error> {
1012    loop {
1013        if let Some((result, consumed)) = f(&buf[..*filled])? {
1014            buf.copy_within(consumed..*filled, 0);
1015            *filled -= consumed;
1016            return Ok(result);
1017        }
1018
1019        if *filled >= buf.len() {
1020            if buf.len() >= MAX_BUFFER_SIZE {
1021                return Err(io::Error::new(ErrorKind::OutOfMemory, "runaway allocation").into());
1022            }
1023            buf.resize(buf.len() * 2, 0);
1024        }
1025
1026        let n = stream.read(&mut buf[*filled..]).await?;
1027        if n == 0 {
1028            return Err(io::Error::new(ErrorKind::UnexpectedEof, "stream closed").into());
1029        }
1030        *filled += n;
1031    }
1032}