Skip to main content

trillium_http/h3/
connection.rs

1mod peer_settings_wait;
2
3use super::{
4    H3Error,
5    frame::{Frame, FrameDecodeError, UniStreamType},
6    quic_varint::{self, QuicVarIntError},
7    settings::H3Settings,
8};
9use crate::{
10    Buffer, Conn, HttpContext,
11    h3::{H3ErrorCode, MAX_BUFFER_SIZE},
12    headers::qpack::{DecoderDynamicTable, EncoderDynamicTable, FieldSection},
13};
14use event_listener::Event;
15use futures_lite::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
16use std::{
17    future::Future,
18    io::{self, ErrorKind},
19    pin::Pin,
20    sync::{
21        Arc, OnceLock,
22        atomic::{AtomicBool, AtomicU64, Ordering},
23    },
24    task::{Context, Poll},
25};
26use swansong::{ShutdownCompletion, Swansong};
27
28/// The result of processing an HTTP/3 bidirectional stream.
29#[derive(Debug)]
30#[allow(clippy::large_enum_variant)] // Request is the hot path; boxing it would add an allocation per request
31pub enum H3StreamResult<Transport> {
32    /// The stream carried a normal HTTP/3 request.
33    Request(Conn<Transport>),
34
35    /// The stream carries a WebTransport bidirectional data stream. The `session_id` identifies
36    /// the associated WebTransport session.
37    WebTransport {
38        /// The WebTransport session ID (stream ID of the CONNECT request).
39        session_id: u64,
40        /// The underlying transport, ready for application data.
41        transport: Transport,
42        /// Any bytes buffered after the session ID during stream negotiation.
43        buffer: Buffer,
44    },
45}
46
47/// The result of processing an HTTP/3 unidirectional stream.
48#[derive(Debug)]
49pub enum UniStreamResult<T> {
50    /// The stream was a known internal type (control, QPACK encoder/decoder) and was handled
51    /// automatically.
52    Handled,
53
54    /// A WebTransport unidirectional data stream. The `session_id` identifies the associated
55    /// WebTransport session.
56    WebTransport {
57        /// The WebTransport session ID.
58        session_id: u64,
59        /// The receive stream, ready for application data.
60        stream: T,
61        /// Any bytes buffered after the session ID during stream negotiation.
62        buffer: Buffer,
63    },
64
65    /// A stream whose type is recognized but unsupported (e.g. `Push`) or not recognized
66    /// at all by this crate.
67    ///
68    /// The caller is responsible for disposing of the stream — the in-tree consumers
69    /// (`trillium-server-common` for servers, `trillium-client` for clients) RST it with
70    /// `H3_STREAM_CREATION_ERROR`. `process_inbound_uni` deliberately does *not* close
71    /// the stream itself: handing it back gives a downstream extension the option to
72    /// implement a stream type trillium-http doesn't yet know about (a future RFC, an
73    /// experiment, etc.) without forking the codec.
74    Unknown {
75        /// The raw stream type value.
76        stream_type: u64,
77        /// The stream.
78        stream: T,
79    },
80}
81
82/// Shared state for a single HTTP/3 QUIC connection.
83///
84/// Call the appropriate methods on this type for each stream accepted from the QUIC connection.
85///
86/// # Driver shape (vs h2)
87///
88/// h2 multiplexes everything onto a single TCP byte stream, so a single
89/// [`H2Driver`][crate::h2::H2Driver] task suffices. h3 instead has the QUIC layer hand us multiple
90/// independent streams: an inbound and outbound control stream, an inbound and outbound QPACK
91/// encoder stream, an inbound and outbound QPACK decoder stream, and one bidi stream per
92/// request. There is no single "h3 driver" — each stream is driven by its own future returned from
93/// `H3Connection`'s `run_*` / `process_*` methods, and the caller decides how those futures are
94/// scheduled.
95///
96/// The trillium-http boundary is **runtime-free by design**: this crate hands out anonymous futures
97/// and lets the caller pick the executor. The in-tree consumers (`trillium-server-common`,
98/// `trillium-client`) follow a task-per-stream pattern — spawn each long-lived control / encoder /
99/// decoder future on its own task at connection setup, then spawn one task per accepted request
100/// stream. Nothing in this crate requires that pattern; a caller could in principle race all the
101/// futures on one task instead, with different perf characteristics.
102#[derive(Debug)]
103pub struct H3Connection {
104    /// Shared configuration for the entire server, including tcp-based listeners
105    context: Arc<HttpContext>,
106
107    /// Connection-scoped shutdown signal. Shut down when we receive GOAWAY from the peer or when
108    /// the server-level Swansong shuts down.  Request stream tasks use this to interrupt
109    /// in-progress work.
110    swansong: Swansong,
111
112    /// The peer's H3 settings, received on their control stream.  Request streams may need to
113    /// consult these (e.g. max field section size).
114    pub(super) peer_settings: OnceLock<H3Settings>,
115
116    /// Multi-listener wake source for [`PeerSettings`]. Notified by `run_inbound_control` after
117    /// applying peer SETTINGS, and again on connection close, so any number of concurrently-
118    /// parked `PeerSettings` futures all unblock together.
119    pub(super) peer_settings_event: Event,
120
121    /// The highest bidirectional stream ID we have accepted.  Used to compute the GOAWAY value
122    /// (this + 4) to tell the peer which requests we saw. None until the first stream is accepted.
123    /// Updated by the runtime adapter's accept loop via [`record_accepted_stream`].
124    max_accepted_stream_id: AtomicU64,
125
126    /// Whether we have accepted any streams yet.
127    has_accepted_stream: AtomicBool,
128
129    /// The decoder-side QPACK dynamic table for this connection.
130    decoder_dynamic_table: DecoderDynamicTable,
131
132    /// The encoder-side QPACK dynamic table for this connection.
133    encoder_dynamic_table: EncoderDynamicTable,
134}
135
136impl H3Connection {
137    /// Construct a new `H3Connection` to manage HTTP/3 for a given peer.
138    pub fn new(context: Arc<HttpContext>) -> Arc<Self> {
139        let swansong = context.swansong.child();
140        let max_table_capacity = context.config.dynamic_table_capacity;
141        let blocked_streams = context.config.h3_blocked_streams;
142        let encoder_dynamic_table = EncoderDynamicTable::new(&context);
143        Arc::new(Self {
144            context,
145            swansong,
146            peer_settings: OnceLock::new(),
147            peer_settings_event: Event::new(),
148            max_accepted_stream_id: AtomicU64::new(0),
149            has_accepted_stream: AtomicBool::new(false),
150            decoder_dynamic_table: DecoderDynamicTable::new(max_table_capacity, blocked_streams),
151            encoder_dynamic_table,
152        })
153    }
154
155    /// Retrieve the [`Swansong`] shutdown handle for this HTTP/3 connection. See also
156    /// [`H3Connection::shut_down`]
157    pub fn swansong(&self) -> &Swansong {
158        &self.swansong
159    }
160
161    /// Attempt graceful shutdown of this HTTP/3 connection (all streams).
162    ///
163    /// The returned [`ShutdownCompletion`] type can
164    /// either be awaited in an async context or blocked on with [`ShutdownCompletion::block`] in a
165    /// blocking context
166    ///
167    /// Note that this will NOT shut down the server. To shut down the whole server, use
168    /// [`HttpContext::shut_down`]
169    pub fn shut_down(&self) -> ShutdownCompletion {
170        // Wake any in-flight `decode_field_section` calls parked on the decoder
171        // table's `ThresholdWait` (a non-I/O future awaiting dynamic-table inserts
172        // from the peer). The encoder table's writer loop is already swansong-
173        // aware, but we mark it failed too for symmetry: any future state
174        // mutations after shutdown are no longer wire-relevant.
175        self.decoder_dynamic_table.fail(H3ErrorCode::NoError);
176        self.encoder_dynamic_table.fail(H3ErrorCode::NoError);
177        self.wake_peer_settings_waiters();
178        self.swansong.shut_down()
179    }
180
181    /// Retrieve the [`HttpContext`] for this server.
182    pub fn context(&self) -> Arc<HttpContext> {
183        self.context.clone()
184    }
185
186    /// Returns the peer's HTTP/3 settings, available once the peer's control stream has been
187    /// processed.
188    pub fn peer_settings(&self) -> Option<&H3Settings> {
189        self.peer_settings.get()
190    }
191
192    /// Record that we accepted a bidirectional stream with this ID.
193    fn record_accepted_stream(&self, stream_id: u64) {
194        self.max_accepted_stream_id
195            .fetch_max(stream_id, Ordering::Relaxed);
196        self.has_accepted_stream.store(true, Ordering::Relaxed);
197    }
198
199    /// The stream ID to send in a GOAWAY frame: one past the highest stream we accepted, or 0 if we
200    /// haven't accepted any.
201    fn goaway_id(&self) -> u64 {
202        if self.has_accepted_stream.load(Ordering::Relaxed) {
203            self.max_accepted_stream_id.load(Ordering::Relaxed) + 4
204        } else {
205            0
206        }
207    }
208
209    /// Process a single HTTP/3 request-response cycle on a bidirectional stream.
210    ///
211    /// Call this once per accepted bidirectional stream. Returns
212    /// [`H3StreamResult::WebTransport`] if the stream opens a WebTransport session rather than
213    /// a standard HTTP/3 request.
214    ///
215    /// # Errors
216    ///
217    /// Returns an `H3Error` in case of io error or http/3 semantic error.
218    pub async fn process_inbound_bidi<Transport, Handler, Fut>(
219        self: Arc<Self>,
220        transport: Transport,
221        handler: Handler,
222        stream_id: u64,
223    ) -> Result<H3StreamResult<Transport>, H3Error>
224    where
225        Transport: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static,
226        Handler: FnOnce(Conn<Transport>) -> Fut,
227        Fut: Future<Output = Conn<Transport>>,
228    {
229        self.record_accepted_stream(stream_id);
230        let _guard = self.swansong.guard();
231        let buffer = Vec::with_capacity(self.context.config.request_buffer_initial_len).into();
232        match Conn::new_h3(self, transport, buffer, stream_id).await? {
233            H3StreamResult::Request(conn) => Ok(H3StreamResult::Request(
234                handler(conn).await.send_h3().await?,
235            )),
236            wt @ H3StreamResult::WebTransport { .. } => Ok(wt),
237        }
238    }
239
240    /// Decode a QPACK-encoded field section, consulting the dynamic table as needed.
241    ///
242    /// If the field section's Required Insert Count is greater than zero, waits until the
243    /// dynamic table has received enough entries. Returns an error on protocol violations or
244    /// if the encoder stream fails while waiting.
245    ///
246    /// Duplicate pseudo-headers are silently ignored (first value wins).
247    /// Unknown pseudo-headers are rejected per RFC 9114 §4.1.1.
248    ///
249    /// # Errors
250    ///
251    /// Returns an error if the encoded bytes cannot be parsed as a valid field section.
252    #[cfg(feature = "unstable")]
253    pub async fn decode_field_section(
254        &self,
255        encoded: &[u8],
256        stream_id: u64,
257    ) -> Result<FieldSection<'static>, H3Error> {
258        self.decoder_dynamic_table.decode(encoded, stream_id).await
259    }
260
261    #[cfg(not(feature = "unstable"))]
262    pub(crate) async fn decode_field_section(
263        &self,
264        encoded: &[u8],
265        stream_id: u64,
266    ) -> Result<FieldSection<'static>, H3Error> {
267        self.decoder_dynamic_table.decode(encoded, stream_id).await
268    }
269
270    /// Encode a QPACK field section from pseudo-headers and headers.
271    ///
272    /// This currently uses only the static table (no dynamic table).
273    /// Decode a QPACK-encoded field section, consulting the dynamic table as needed.
274    ///
275    /// # Errors
276    ///
277    /// Returns an `H3Error` in case of http/3 semantic error.
278    #[cfg(feature = "unstable")]
279    #[allow(clippy::unnecessary_wraps, reason = "future-proofing api")]
280    pub fn encode_field_section(
281        &self,
282        field_section: &FieldSection<'_>,
283        buf: &mut Vec<u8>,
284        stream_id: u64,
285    ) -> Result<(), H3Error> {
286        self.encoder_dynamic_table
287            .encode(field_section, buf, stream_id);
288        Ok(())
289    }
290
291    #[cfg(not(feature = "unstable"))]
292    #[allow(clippy::unnecessary_wraps, reason = "future-proofing api")]
293    pub(crate) fn encode_field_section(
294        &self,
295        field_section: &FieldSection<'_>,
296        buf: &mut Vec<u8>,
297        stream_id: u64,
298    ) -> Result<(), H3Error> {
299        self.encoder_dynamic_table
300            .encode(field_section, buf, stream_id);
301        Ok(())
302    }
303
304    /// Run this server's HTTP/3 outbound control stream.
305    ///
306    /// Sends the initial SETTINGS frame, then sends GOAWAY when the connection shuts down.
307    /// Returns after GOAWAY is sent; keep the stream open until the QUIC connection closes
308    /// (closing a control stream is a connection error per RFC 9114 §6.2.1).
309    ///
310    /// # Errors
311    ///
312    /// Returns an `H3Error` in case of io error or http/3 semantic error.
313    pub async fn run_outbound_control<T>(&self, mut stream: T) -> Result<(), H3Error>
314    where
315        T: AsyncWrite + Unpin + Send,
316    {
317        let mut buf = vec![0; 128];
318
319        // Stream type + SETTINGS frame
320        let settings = Frame::Settings(H3Settings::from(&self.context.config));
321        log::trace!(
322            "H3 outbound control: sending SETTINGS: {:?}",
323            H3Settings::from(&self.context.config)
324        );
325
326        write(&mut buf, &mut stream, |buf| {
327            let mut written = quic_varint::encode(UniStreamType::Control, buf)?;
328            written += settings.encode(&mut buf[written..])?;
329            Some(written)
330        })
331        .await?;
332        log::trace!("H3 outbound control: SETTINGS sent");
333
334        // Wait for shutdown
335        self.swansong.clone().await;
336
337        // Send GOAWAY
338        write(&mut buf, &mut stream, |buf| {
339            Frame::Goaway(self.goaway_id()).encode(buf)
340        })
341        .await?;
342
343        Ok(())
344    }
345
346    /// Run the outbound QPACK encoder stream for the duration of the connection.
347    ///
348    /// Writes the stream type byte, then drains encoder-stream instructions from the encoder
349    /// dynamic table as they are enqueued. Returns when the connection shuts down or the table is
350    /// marked failed.
351    ///
352    /// # Errors
353    ///
354    /// Returns an `H3Error` in case of io error.
355    pub async fn run_encoder<T>(&self, mut stream: T) -> Result<(), H3Error>
356    where
357        T: AsyncWrite + Unpin + Send,
358    {
359        self.encoder_dynamic_table
360            .run_writer(&mut stream, self.swansong.clone())
361            .await
362    }
363
364    /// Run the outbound QPACK decoder stream for the duration of the connection.
365    ///
366    /// Writes the stream type byte, then loops sending Section Acknowledgement and Insert
367    /// Count Increment instructions as they become needed. Returns when the connection
368    /// shuts down.
369    ///
370    /// # Errors
371    ///
372    /// Returns an `H3Error` in case of io error or http/3 semantic error.
373    pub async fn run_decoder<T>(&self, mut stream: T) -> Result<(), H3Error>
374    where
375        T: AsyncWrite + Unpin + Send,
376    {
377        self.decoder_dynamic_table
378            .run_writer(&mut stream, self.swansong.clone())
379            .await
380    }
381
382    /// Handle an inbound unidirectional HTTP/3 stream from the peer.
383    ///
384    /// Internal stream types (control, QPACK encoder/decoder) are handled automatically;
385    /// application streams are returned via [`UniStreamResult`] for the caller to process.
386    ///
387    /// # Errors
388    ///
389    /// Returns a `H3Error` in case of io error or http/3 semantic error.
390    pub async fn process_inbound_uni<T>(&self, mut stream: T) -> Result<UniStreamResult<T>, H3Error>
391    where
392        T: AsyncRead + Unpin + Send,
393    {
394        self.swansong
395            .interrupt(async move {
396                let mut buf = vec![0; 128];
397                let mut filled = 0;
398
399                // Read stream type varint (decode as raw u64 to handle unknown types)
400                let stream_type =
401                    read(
402                        &mut buf,
403                        &mut filled,
404                        &mut stream,
405                        |data| match quic_varint::decode(data) {
406                            Ok(ok) => Ok(Some(ok)),
407                            Err(QuicVarIntError::UnexpectedEnd) => Ok(None),
408                            // this branch is unreachable because u64 is always From<u64>
409                            Err(QuicVarIntError::UnknownValue { bytes, value }) => {
410                                Ok(Some((value, bytes)))
411                            }
412                        },
413                    )
414                    .await?;
415
416                match UniStreamType::try_from(stream_type) {
417                    Ok(UniStreamType::Control) => {
418                        log::trace!("H3 inbound uni: control stream");
419                        self.run_inbound_control(&mut buf, &mut filled, &mut stream)
420                            .await?;
421                        Ok(UniStreamResult::Handled)
422                    }
423
424                    Ok(UniStreamType::QpackEncoder) => {
425                        log::trace!(
426                            "H3 inbound uni: QPACK encoder stream ({filled} bytes pre-read)"
427                        );
428                        let mut reader = Prepended {
429                            head: &buf[..filled],
430                            tail: stream,
431                        };
432
433                        log::trace!("QPACK encoder stream: started");
434                        self.decoder_dynamic_table.run_reader(&mut reader).await?;
435
436                        Ok(UniStreamResult::Handled)
437                    }
438
439                    Ok(UniStreamType::QpackDecoder) => {
440                        log::trace!(
441                            "H3 inbound uni: QPACK decoder stream ({filled} bytes pre-read)"
442                        );
443                        let mut reader = Prepended {
444                            head: &buf[..filled],
445                            tail: stream,
446                        };
447                        self.encoder_dynamic_table.run_reader(&mut reader).await?;
448                        Ok(UniStreamResult::Handled)
449                    }
450
451                    Ok(UniStreamType::WebTransport) => {
452                        log::trace!("H3 inbound uni: WebTransport stream");
453                        let session_id = read(&mut buf, &mut filled, &mut stream, |data| {
454                            match quic_varint::decode(data) {
455                                Ok(ok) => Ok(Some(ok)),
456                                Err(QuicVarIntError::UnexpectedEnd) => Ok(None),
457                                Err(QuicVarIntError::UnknownValue { bytes, value }) => {
458                                    Ok(Some((value, bytes)))
459                                }
460                            }
461                        })
462                        .await?;
463
464                        buf.truncate(filled);
465
466                        Ok(UniStreamResult::WebTransport {
467                            session_id,
468                            stream,
469                            buffer: buf.into(),
470                        })
471                    }
472
473                    Ok(UniStreamType::Push) => {
474                        // Push streams are server→client per RFC 9114 §4.6. Trillium does
475                        // not support HTTP/3 push as initiator or recipient, so we hand
476                        // these back as `Unknown` for the caller to dispose of identically
477                        // to truly-unknown stream types — the explicit arm exists so trace
478                        // output names "push stream" rather than a bare type id.
479                        log::trace!("H3 inbound uni: push stream (push not supported)");
480                        Ok(UniStreamResult::Unknown {
481                            stream_type,
482                            stream,
483                        })
484                    }
485
486                    Err(_) => {
487                        log::trace!("H3 inbound uni: unknown stream type {stream_type:#x}");
488                        Ok(UniStreamResult::Unknown {
489                            stream_type,
490                            stream,
491                        })
492                    }
493                }
494            })
495            .await
496            .unwrap_or(Ok(UniStreamResult::Handled)) // interrupted
497    }
498
499    /// Handle the http/3 peer's inbound control stream.
500    ///
501    /// # Errors
502    ///
503    /// Returns a `H3Error` in case of io error or HTTP/3 semantic error.
504    // The first frame must be SETTINGS. After that, watches for
505    // GOAWAY to initiate connection shutdown.
506    async fn run_inbound_control<T>(
507        &self,
508        buf: &mut Vec<u8>,
509        filled: &mut usize,
510        stream: &mut T,
511    ) -> Result<(), H3Error>
512    where
513        T: AsyncRead + Unpin + Send,
514    {
515        // First frame must be SETTINGS (§6.2.1)
516        let settings = read(buf, filled, stream, |data| match Frame::decode(data) {
517            Ok((Frame::Settings(s), consumed)) => Ok(Some((s, consumed))),
518            Ok(_) => Err(H3ErrorCode::FrameUnexpected),
519            Err(FrameDecodeError::Incomplete) => Ok(None),
520            Err(FrameDecodeError::Error(code)) => Err(code),
521        })
522        .await?;
523
524        log::trace!("H3 peer settings: {settings:?}");
525
526        self.peer_settings
527            .set(settings)
528            .map_err(|_| H3ErrorCode::FrameUnexpected)?;
529        self.wake_peer_settings_waiters();
530
531        self.encoder_dynamic_table
532            .initialize_from_peer_settings(settings);
533
534        // Read subsequent frames, watching for GOAWAY
535        loop {
536            let frame = self
537                .swansong
538                .interrupt(read(buf, filled, stream, |data| {
539                    match Frame::decode(data) {
540                        Ok((frame, consumed)) => Ok(Some((frame, consumed))),
541                        Err(FrameDecodeError::Incomplete) => Ok(None),
542                        Err(FrameDecodeError::Error(code)) => Err(code),
543                    }
544                }))
545                .await
546                .transpose()?;
547
548            match frame {
549                None => {
550                    log::trace!("H3 control stream: interrupted by shutdown");
551                    return Ok(());
552                }
553
554                Some(Frame::Goaway(id)) => {
555                    log::trace!("H3 control stream: peer sent GOAWAY(stream_id={id})");
556                    self.swansong.shut_down();
557                    return Ok(());
558                }
559
560                Some(Frame::Settings(_)) => {
561                    return Err(H3ErrorCode::FrameUnexpected.into());
562                }
563
564                Some(Frame::Unknown(n)) => {
565                    // RFC 9114 §7.2.8: unknown frame types MUST be ignored.
566                    // We must also consume the payload bytes so the stream stays synchronized.
567                    log::trace!("H3 control stream: skipping unknown frame (payload {n} bytes)");
568                    let n = usize::try_from(n).unwrap_or(usize::MAX);
569                    let in_buf = n.min(*filled);
570                    buf.copy_within(in_buf..*filled, 0);
571                    *filled -= in_buf;
572                    let mut todo = n - in_buf;
573                    let mut scratch = [0u8; 256];
574                    while todo > 0 {
575                        let to_read = todo.min(scratch.len());
576                        let n = stream
577                            .read(&mut scratch[..to_read])
578                            .await
579                            .map_err(H3Error::Io)?;
580                        if n == 0 {
581                            return Err(H3ErrorCode::ClosedCriticalStream.into());
582                        }
583                        todo -= n;
584                    }
585                }
586                other => {
587                    log::trace!("H3 control stream: ignoring {other:?}");
588                }
589            }
590        }
591    }
592}
593
594async fn write(
595    buf: &mut Vec<u8>,
596    mut stream: impl AsyncWrite + Unpin + Send,
597    mut f: impl FnMut(&mut [u8]) -> Option<usize>,
598) -> io::Result<usize> {
599    let written = loop {
600        if let Some(w) = f(buf) {
601            break w;
602        }
603        if buf.len() >= MAX_BUFFER_SIZE {
604            return Err(io::Error::new(ErrorKind::OutOfMemory, "runaway allocation"));
605        }
606        buf.resize(buf.len() * 2, 0);
607    };
608
609    stream.write_all(&buf[..written]).await?;
610    stream.flush().await?;
611    Ok(written)
612}
613
/// An `AsyncRead` adapter that yields the bytes of `head` first and then reads from `tail`.
///
/// `process_inbound_uni` uses it to replay bytes that were read ahead while parsing the
/// stream-type varint, before handing a QPACK encoder or decoder stream to the matching
/// dynamic table's `run_reader`.
struct Prepended<'a, T> {
    /// Already-read bytes that still have to be delivered.
    head: &'a [u8],
    /// The underlying stream, consulted once `head` is exhausted.
    tail: T,
}
622
623impl<T: AsyncRead + Unpin> AsyncRead for Prepended<'_, T> {
624    fn poll_read(
625        self: Pin<&mut Self>,
626        cx: &mut Context<'_>,
627        out: &mut [u8],
628    ) -> Poll<io::Result<usize>> {
629        let this = self.get_mut();
630        if !this.head.is_empty() {
631            let n = this.head.len().min(out.len());
632            out[..n].copy_from_slice(&this.head[..n]);
633            this.head = &this.head[n..];
634            return Poll::Ready(Ok(n));
635        }
636        Pin::new(&mut this.tail).poll_read(cx, out)
637    }
638}
639
640/// Read from `stream` into `buf` until `f` can decode a value.
641///
642/// `f` receives the filled portion of the buffer and returns:
643/// - `Ok(Some((value, consumed)))` — success; consumed bytes are removed from the front
644/// - `Ok(None)` — need more data; reads more bytes and retries
645/// - `Err(e)` — unrecoverable error; propagated to caller
646async fn read<R>(
647    buf: &mut Vec<u8>,
648    filled: &mut usize,
649    stream: &mut (impl AsyncRead + Unpin + Send),
650    f: impl Fn(&[u8]) -> Result<Option<(R, usize)>, H3ErrorCode>,
651) -> Result<R, H3Error> {
652    loop {
653        if let Some((result, consumed)) = f(&buf[..*filled])? {
654            buf.copy_within(consumed..*filled, 0);
655            *filled -= consumed;
656            return Ok(result);
657        }
658
659        if *filled >= buf.len() {
660            if buf.len() >= MAX_BUFFER_SIZE {
661                return Err(io::Error::new(ErrorKind::OutOfMemory, "runaway allocation").into());
662            }
663            buf.resize(buf.len() * 2, 0);
664        }
665
666        let n = stream.read(&mut buf[*filled..]).await?;
667        if n == 0 {
668            return Err(io::Error::new(ErrorKind::UnexpectedEof, "stream closed").into());
669        }
670        *filled += n;
671    }
672}