trillium_http/h3/connection.rs
1mod peer_settings_wait;
2
3use super::{
4 H3Error,
5 frame::{Frame, FrameDecodeError, UniStreamType},
6 quic_varint::{self, QuicVarIntError},
7 settings::H3Settings,
8};
9use crate::{
10 Buffer, Conn, HttpContext,
11 h3::{H3ErrorCode, MAX_BUFFER_SIZE},
12 headers::qpack::{DecoderDynamicTable, EncoderDynamicTable, FieldSection},
13};
14use event_listener::Event;
15use futures_lite::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
16use std::{
17 future::Future,
18 io::{self, ErrorKind},
19 pin::Pin,
20 sync::{
21 Arc, OnceLock,
22 atomic::{AtomicBool, AtomicU64, Ordering},
23 },
24 task::{Context, Poll},
25};
26use swansong::{ShutdownCompletion, Swansong};
27
28/// The result of processing an HTTP/3 bidirectional stream.
29#[derive(Debug)]
30#[allow(clippy::large_enum_variant)] // Request is the hot path; boxing it would add an allocation per request
31pub enum H3StreamResult<Transport> {
32 /// The stream carried a normal HTTP/3 request.
33 Request(Conn<Transport>),
34
35 /// The stream carries a WebTransport bidirectional data stream. The `session_id` identifies
36 /// the associated WebTransport session.
37 WebTransport {
38 /// The WebTransport session ID (stream ID of the CONNECT request).
39 session_id: u64,
40 /// The underlying transport, ready for application data.
41 transport: Transport,
42 /// Any bytes buffered after the session ID during stream negotiation.
43 buffer: Buffer,
44 },
45}
46
47/// The result of processing an HTTP/3 unidirectional stream.
48#[derive(Debug)]
49pub enum UniStreamResult<T> {
50 /// The stream was a known internal type (control, QPACK encoder/decoder) and was handled
51 /// automatically.
52 Handled,
53
54 /// A WebTransport unidirectional data stream. The `session_id` identifies the associated
55 /// WebTransport session.
56 WebTransport {
57 /// The WebTransport session ID.
58 session_id: u64,
59 /// The receive stream, ready for application data.
60 stream: T,
61 /// Any bytes buffered after the session ID during stream negotiation.
62 buffer: Buffer,
63 },
64
65 /// A stream whose type is recognized but unsupported (e.g. `Push`) or not recognized
66 /// at all by this crate.
67 ///
68 /// The caller is responsible for disposing of the stream — the in-tree consumers
69 /// (`trillium-server-common` for servers, `trillium-client` for clients) RST it with
70 /// `H3_STREAM_CREATION_ERROR`. `process_inbound_uni` deliberately does *not* close
71 /// the stream itself: handing it back gives a downstream extension the option to
72 /// implement a stream type trillium-http doesn't yet know about (a future RFC, an
73 /// experiment, etc.) without forking the codec.
74 Unknown {
75 /// The raw stream type value.
76 stream_type: u64,
77 /// The stream.
78 stream: T,
79 },
80}
81
82/// Shared state for a single HTTP/3 QUIC connection.
83///
84/// Call the appropriate methods on this type for each stream accepted from the QUIC connection.
85///
86/// # Driver shape (vs h2)
87///
88/// h2 multiplexes everything onto a single TCP byte stream, so a single
89/// [`H2Driver`][crate::h2::H2Driver] task suffices. h3 instead has the QUIC layer hand us multiple
90/// independent streams: an inbound and outbound control stream, an inbound and outbound QPACK
91/// encoder stream, an inbound and outbound QPACK decoder stream, and one bidi stream per
92/// request. There is no single "h3 driver" — each stream is driven by its own future returned from
93/// `H3Connection`'s `run_*` / `process_*` methods, and the caller decides how those futures are
94/// scheduled.
95///
96/// The trillium-http boundary is **runtime-free by design**: this crate hands out anonymous futures
97/// and lets the caller pick the executor. The in-tree consumers (`trillium-server-common`,
98/// `trillium-client`) follow a task-per-stream pattern — spawn each long-lived control / encoder /
99/// decoder future on its own task at connection setup, then spawn one task per accepted request
100/// stream. Nothing in this crate requires that pattern; a caller could in principle race all the
101/// futures on one task instead, with different perf characteristics.
102#[derive(Debug)]
103pub struct H3Connection {
104 /// Shared configuration for the entire server, including tcp-based listeners
105 context: Arc<HttpContext>,
106
107 /// Connection-scoped shutdown signal. Shut down when we receive GOAWAY from the peer or when
108 /// the server-level Swansong shuts down. Request stream tasks use this to interrupt
109 /// in-progress work.
110 swansong: Swansong,
111
112 /// The peer's H3 settings, received on their control stream. Request streams may need to
113 /// consult these (e.g. max field section size).
114 pub(super) peer_settings: OnceLock<H3Settings>,
115
116 /// Multi-listener wake source for [`PeerSettings`]. Notified by `run_inbound_control` after
117 /// applying peer SETTINGS, and again on connection close, so any number of concurrently-
118 /// parked `PeerSettings` futures all unblock together.
119 pub(super) peer_settings_event: Event,
120
121 /// The highest bidirectional stream ID we have accepted. Used to compute the GOAWAY value
122 /// (this + 4) to tell the peer which requests we saw. None until the first stream is accepted.
123 /// Updated by the runtime adapter's accept loop via [`record_accepted_stream`].
124 max_accepted_stream_id: AtomicU64,
125
126 /// Whether we have accepted any streams yet.
127 has_accepted_stream: AtomicBool,
128
129 /// The decoder-side QPACK dynamic table for this connection.
130 decoder_dynamic_table: DecoderDynamicTable,
131
132 /// The encoder-side QPACK dynamic table for this connection.
133 encoder_dynamic_table: EncoderDynamicTable,
134}
135
136impl H3Connection {
137 /// Construct a new `H3Connection` to manage HTTP/3 for a given peer.
138 pub fn new(context: Arc<HttpContext>) -> Arc<Self> {
139 let swansong = context.swansong.child();
140 let max_table_capacity = context.config.dynamic_table_capacity;
141 let blocked_streams = context.config.h3_blocked_streams;
142 let encoder_dynamic_table = EncoderDynamicTable::new(&context);
143 Arc::new(Self {
144 context,
145 swansong,
146 peer_settings: OnceLock::new(),
147 peer_settings_event: Event::new(),
148 max_accepted_stream_id: AtomicU64::new(0),
149 has_accepted_stream: AtomicBool::new(false),
150 decoder_dynamic_table: DecoderDynamicTable::new(max_table_capacity, blocked_streams),
151 encoder_dynamic_table,
152 })
153 }
154
155 /// Retrieve the [`Swansong`] shutdown handle for this HTTP/3 connection. See also
156 /// [`H3Connection::shut_down`]
157 pub fn swansong(&self) -> &Swansong {
158 &self.swansong
159 }
160
161 /// Attempt graceful shutdown of this HTTP/3 connection (all streams).
162 ///
163 /// The returned [`ShutdownCompletion`] type can
164 /// either be awaited in an async context or blocked on with [`ShutdownCompletion::block`] in a
165 /// blocking context
166 ///
167 /// Note that this will NOT shut down the server. To shut down the whole server, use
168 /// [`HttpContext::shut_down`]
169 pub fn shut_down(&self) -> ShutdownCompletion {
170 // Wake any in-flight `decode_field_section` calls parked on the decoder
171 // table's `ThresholdWait` (a non-I/O future awaiting dynamic-table inserts
172 // from the peer). The encoder table's writer loop is already swansong-
173 // aware, but we mark it failed too for symmetry: any future state
174 // mutations after shutdown are no longer wire-relevant.
175 self.decoder_dynamic_table.fail(H3ErrorCode::NoError);
176 self.encoder_dynamic_table.fail(H3ErrorCode::NoError);
177 self.wake_peer_settings_waiters();
178 self.swansong.shut_down()
179 }
180
181 /// Retrieve the [`HttpContext`] for this server.
182 pub fn context(&self) -> Arc<HttpContext> {
183 self.context.clone()
184 }
185
186 /// Returns the peer's HTTP/3 settings, available once the peer's control stream has been
187 /// processed.
188 pub fn peer_settings(&self) -> Option<&H3Settings> {
189 self.peer_settings.get()
190 }
191
192 /// Record that we accepted a bidirectional stream with this ID.
193 fn record_accepted_stream(&self, stream_id: u64) {
194 self.max_accepted_stream_id
195 .fetch_max(stream_id, Ordering::Relaxed);
196 self.has_accepted_stream.store(true, Ordering::Relaxed);
197 }
198
199 /// The stream ID to send in a GOAWAY frame: one past the highest stream we accepted, or 0 if we
200 /// haven't accepted any.
201 fn goaway_id(&self) -> u64 {
202 if self.has_accepted_stream.load(Ordering::Relaxed) {
203 self.max_accepted_stream_id.load(Ordering::Relaxed) + 4
204 } else {
205 0
206 }
207 }
208
209 /// Process a single HTTP/3 request-response cycle on a bidirectional stream.
210 ///
211 /// Call this once per accepted bidirectional stream. Returns
212 /// [`H3StreamResult::WebTransport`] if the stream opens a WebTransport session rather than
213 /// a standard HTTP/3 request.
214 ///
215 /// # Errors
216 ///
217 /// Returns an `H3Error` in case of io error or http/3 semantic error.
218 pub async fn process_inbound_bidi<Transport, Handler, Fut>(
219 self: Arc<Self>,
220 transport: Transport,
221 handler: Handler,
222 stream_id: u64,
223 ) -> Result<H3StreamResult<Transport>, H3Error>
224 where
225 Transport: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static,
226 Handler: FnOnce(Conn<Transport>) -> Fut,
227 Fut: Future<Output = Conn<Transport>>,
228 {
229 self.record_accepted_stream(stream_id);
230 let _guard = self.swansong.guard();
231 let buffer = Vec::with_capacity(self.context.config.request_buffer_initial_len).into();
232 match Conn::new_h3(self, transport, buffer, stream_id).await? {
233 H3StreamResult::Request(conn) => Ok(H3StreamResult::Request(
234 handler(conn).await.send_h3().await?,
235 )),
236 wt @ H3StreamResult::WebTransport { .. } => Ok(wt),
237 }
238 }
239
240 /// Decode a QPACK-encoded field section, consulting the dynamic table as needed.
241 ///
242 /// If the field section's Required Insert Count is greater than zero, waits until the
243 /// dynamic table has received enough entries. Returns an error on protocol violations or
244 /// if the encoder stream fails while waiting.
245 ///
246 /// Duplicate pseudo-headers are silently ignored (first value wins).
247 /// Unknown pseudo-headers are rejected per RFC 9114 §4.1.1.
248 ///
249 /// # Errors
250 ///
251 /// Returns an error if the encoded bytes cannot be parsed as a valid field section.
252 #[cfg(feature = "unstable")]
253 pub async fn decode_field_section(
254 &self,
255 encoded: &[u8],
256 stream_id: u64,
257 ) -> Result<FieldSection<'static>, H3Error> {
258 self.decoder_dynamic_table.decode(encoded, stream_id).await
259 }
260
261 #[cfg(not(feature = "unstable"))]
262 pub(crate) async fn decode_field_section(
263 &self,
264 encoded: &[u8],
265 stream_id: u64,
266 ) -> Result<FieldSection<'static>, H3Error> {
267 self.decoder_dynamic_table.decode(encoded, stream_id).await
268 }
269
270 /// Encode a QPACK field section from pseudo-headers and headers.
271 ///
272 /// This currently uses only the static table (no dynamic table).
273 /// Decode a QPACK-encoded field section, consulting the dynamic table as needed.
274 ///
275 /// # Errors
276 ///
277 /// Returns an `H3Error` in case of http/3 semantic error.
278 #[cfg(feature = "unstable")]
279 #[allow(clippy::unnecessary_wraps, reason = "future-proofing api")]
280 pub fn encode_field_section(
281 &self,
282 field_section: &FieldSection<'_>,
283 buf: &mut Vec<u8>,
284 stream_id: u64,
285 ) -> Result<(), H3Error> {
286 self.encoder_dynamic_table
287 .encode(field_section, buf, stream_id);
288 Ok(())
289 }
290
291 #[cfg(not(feature = "unstable"))]
292 #[allow(clippy::unnecessary_wraps, reason = "future-proofing api")]
293 pub(crate) fn encode_field_section(
294 &self,
295 field_section: &FieldSection<'_>,
296 buf: &mut Vec<u8>,
297 stream_id: u64,
298 ) -> Result<(), H3Error> {
299 self.encoder_dynamic_table
300 .encode(field_section, buf, stream_id);
301 Ok(())
302 }
303
304 /// Run this server's HTTP/3 outbound control stream.
305 ///
306 /// Sends the initial SETTINGS frame, then sends GOAWAY when the connection shuts down.
307 /// Returns after GOAWAY is sent; keep the stream open until the QUIC connection closes
308 /// (closing a control stream is a connection error per RFC 9114 §6.2.1).
309 ///
310 /// # Errors
311 ///
312 /// Returns an `H3Error` in case of io error or http/3 semantic error.
313 pub async fn run_outbound_control<T>(&self, mut stream: T) -> Result<(), H3Error>
314 where
315 T: AsyncWrite + Unpin + Send,
316 {
317 let mut buf = vec![0; 128];
318
319 // Stream type + SETTINGS frame
320 let settings = Frame::Settings(H3Settings::from(&self.context.config));
321 log::trace!(
322 "H3 outbound control: sending SETTINGS: {:?}",
323 H3Settings::from(&self.context.config)
324 );
325
326 write(&mut buf, &mut stream, |buf| {
327 let mut written = quic_varint::encode(UniStreamType::Control, buf)?;
328 written += settings.encode(&mut buf[written..])?;
329 Some(written)
330 })
331 .await?;
332 log::trace!("H3 outbound control: SETTINGS sent");
333
334 // Wait for shutdown
335 self.swansong.clone().await;
336
337 // Send GOAWAY
338 write(&mut buf, &mut stream, |buf| {
339 Frame::Goaway(self.goaway_id()).encode(buf)
340 })
341 .await?;
342
343 Ok(())
344 }
345
346 /// Run the outbound QPACK encoder stream for the duration of the connection.
347 ///
348 /// Writes the stream type byte, then drains encoder-stream instructions from the encoder
349 /// dynamic table as they are enqueued. Returns when the connection shuts down or the table is
350 /// marked failed.
351 ///
352 /// # Errors
353 ///
354 /// Returns an `H3Error` in case of io error.
355 pub async fn run_encoder<T>(&self, mut stream: T) -> Result<(), H3Error>
356 where
357 T: AsyncWrite + Unpin + Send,
358 {
359 self.encoder_dynamic_table
360 .run_writer(&mut stream, self.swansong.clone())
361 .await
362 }
363
364 /// Run the outbound QPACK decoder stream for the duration of the connection.
365 ///
366 /// Writes the stream type byte, then loops sending Section Acknowledgement and Insert
367 /// Count Increment instructions as they become needed. Returns when the connection
368 /// shuts down.
369 ///
370 /// # Errors
371 ///
372 /// Returns an `H3Error` in case of io error or http/3 semantic error.
373 pub async fn run_decoder<T>(&self, mut stream: T) -> Result<(), H3Error>
374 where
375 T: AsyncWrite + Unpin + Send,
376 {
377 self.decoder_dynamic_table
378 .run_writer(&mut stream, self.swansong.clone())
379 .await
380 }
381
382 /// Handle an inbound unidirectional HTTP/3 stream from the peer.
383 ///
384 /// Internal stream types (control, QPACK encoder/decoder) are handled automatically;
385 /// application streams are returned via [`UniStreamResult`] for the caller to process.
386 ///
387 /// # Errors
388 ///
389 /// Returns a `H3Error` in case of io error or http/3 semantic error.
390 pub async fn process_inbound_uni<T>(&self, mut stream: T) -> Result<UniStreamResult<T>, H3Error>
391 where
392 T: AsyncRead + Unpin + Send,
393 {
394 self.swansong
395 .interrupt(async move {
396 let mut buf = vec![0; 128];
397 let mut filled = 0;
398
399 // Read stream type varint (decode as raw u64 to handle unknown types)
400 let stream_type =
401 read(
402 &mut buf,
403 &mut filled,
404 &mut stream,
405 |data| match quic_varint::decode(data) {
406 Ok(ok) => Ok(Some(ok)),
407 Err(QuicVarIntError::UnexpectedEnd) => Ok(None),
408 // this branch is unreachable because u64 is always From<u64>
409 Err(QuicVarIntError::UnknownValue { bytes, value }) => {
410 Ok(Some((value, bytes)))
411 }
412 },
413 )
414 .await?;
415
416 match UniStreamType::try_from(stream_type) {
417 Ok(UniStreamType::Control) => {
418 log::trace!("H3 inbound uni: control stream");
419 self.run_inbound_control(&mut buf, &mut filled, &mut stream)
420 .await?;
421 Ok(UniStreamResult::Handled)
422 }
423
424 Ok(UniStreamType::QpackEncoder) => {
425 log::trace!(
426 "H3 inbound uni: QPACK encoder stream ({filled} bytes pre-read)"
427 );
428 let mut reader = Prepended {
429 head: &buf[..filled],
430 tail: stream,
431 };
432
433 log::trace!("QPACK encoder stream: started");
434 self.decoder_dynamic_table.run_reader(&mut reader).await?;
435
436 Ok(UniStreamResult::Handled)
437 }
438
439 Ok(UniStreamType::QpackDecoder) => {
440 log::trace!(
441 "H3 inbound uni: QPACK decoder stream ({filled} bytes pre-read)"
442 );
443 let mut reader = Prepended {
444 head: &buf[..filled],
445 tail: stream,
446 };
447 self.encoder_dynamic_table.run_reader(&mut reader).await?;
448 Ok(UniStreamResult::Handled)
449 }
450
451 Ok(UniStreamType::WebTransport) => {
452 log::trace!("H3 inbound uni: WebTransport stream");
453 let session_id = read(&mut buf, &mut filled, &mut stream, |data| {
454 match quic_varint::decode(data) {
455 Ok(ok) => Ok(Some(ok)),
456 Err(QuicVarIntError::UnexpectedEnd) => Ok(None),
457 Err(QuicVarIntError::UnknownValue { bytes, value }) => {
458 Ok(Some((value, bytes)))
459 }
460 }
461 })
462 .await?;
463
464 buf.truncate(filled);
465
466 Ok(UniStreamResult::WebTransport {
467 session_id,
468 stream,
469 buffer: buf.into(),
470 })
471 }
472
473 Ok(UniStreamType::Push) => {
474 // Push streams are server→client per RFC 9114 §4.6. Trillium does
475 // not support HTTP/3 push as initiator or recipient, so we hand
476 // these back as `Unknown` for the caller to dispose of identically
477 // to truly-unknown stream types — the explicit arm exists so trace
478 // output names "push stream" rather than a bare type id.
479 log::trace!("H3 inbound uni: push stream (push not supported)");
480 Ok(UniStreamResult::Unknown {
481 stream_type,
482 stream,
483 })
484 }
485
486 Err(_) => {
487 log::trace!("H3 inbound uni: unknown stream type {stream_type:#x}");
488 Ok(UniStreamResult::Unknown {
489 stream_type,
490 stream,
491 })
492 }
493 }
494 })
495 .await
496 .unwrap_or(Ok(UniStreamResult::Handled)) // interrupted
497 }
498
499 /// Handle the http/3 peer's inbound control stream.
500 ///
501 /// # Errors
502 ///
503 /// Returns a `H3Error` in case of io error or HTTP/3 semantic error.
504 // The first frame must be SETTINGS. After that, watches for
505 // GOAWAY to initiate connection shutdown.
506 async fn run_inbound_control<T>(
507 &self,
508 buf: &mut Vec<u8>,
509 filled: &mut usize,
510 stream: &mut T,
511 ) -> Result<(), H3Error>
512 where
513 T: AsyncRead + Unpin + Send,
514 {
515 // First frame must be SETTINGS (§6.2.1)
516 let settings = read(buf, filled, stream, |data| match Frame::decode(data) {
517 Ok((Frame::Settings(s), consumed)) => Ok(Some((s, consumed))),
518 Ok(_) => Err(H3ErrorCode::FrameUnexpected),
519 Err(FrameDecodeError::Incomplete) => Ok(None),
520 Err(FrameDecodeError::Error(code)) => Err(code),
521 })
522 .await?;
523
524 log::trace!("H3 peer settings: {settings:?}");
525
526 self.peer_settings
527 .set(settings)
528 .map_err(|_| H3ErrorCode::FrameUnexpected)?;
529 self.wake_peer_settings_waiters();
530
531 self.encoder_dynamic_table
532 .initialize_from_peer_settings(settings);
533
534 // Read subsequent frames, watching for GOAWAY
535 loop {
536 let frame = self
537 .swansong
538 .interrupt(read(buf, filled, stream, |data| {
539 match Frame::decode(data) {
540 Ok((frame, consumed)) => Ok(Some((frame, consumed))),
541 Err(FrameDecodeError::Incomplete) => Ok(None),
542 Err(FrameDecodeError::Error(code)) => Err(code),
543 }
544 }))
545 .await
546 .transpose()?;
547
548 match frame {
549 None => {
550 log::trace!("H3 control stream: interrupted by shutdown");
551 return Ok(());
552 }
553
554 Some(Frame::Goaway(id)) => {
555 log::trace!("H3 control stream: peer sent GOAWAY(stream_id={id})");
556 self.swansong.shut_down();
557 return Ok(());
558 }
559
560 Some(Frame::Settings(_)) => {
561 return Err(H3ErrorCode::FrameUnexpected.into());
562 }
563
564 Some(Frame::Unknown(n)) => {
565 // RFC 9114 §7.2.8: unknown frame types MUST be ignored.
566 // We must also consume the payload bytes so the stream stays synchronized.
567 log::trace!("H3 control stream: skipping unknown frame (payload {n} bytes)");
568 let n = usize::try_from(n).unwrap_or(usize::MAX);
569 let in_buf = n.min(*filled);
570 buf.copy_within(in_buf..*filled, 0);
571 *filled -= in_buf;
572 let mut todo = n - in_buf;
573 let mut scratch = [0u8; 256];
574 while todo > 0 {
575 let to_read = todo.min(scratch.len());
576 let n = stream
577 .read(&mut scratch[..to_read])
578 .await
579 .map_err(H3Error::Io)?;
580 if n == 0 {
581 return Err(H3ErrorCode::ClosedCriticalStream.into());
582 }
583 todo -= n;
584 }
585 }
586 other => {
587 log::trace!("H3 control stream: ignoring {other:?}");
588 }
589 }
590 }
591 }
592}
593
594async fn write(
595 buf: &mut Vec<u8>,
596 mut stream: impl AsyncWrite + Unpin + Send,
597 mut f: impl FnMut(&mut [u8]) -> Option<usize>,
598) -> io::Result<usize> {
599 let written = loop {
600 if let Some(w) = f(buf) {
601 break w;
602 }
603 if buf.len() >= MAX_BUFFER_SIZE {
604 return Err(io::Error::new(ErrorKind::OutOfMemory, "runaway allocation"));
605 }
606 buf.resize(buf.len() * 2, 0);
607 };
608
609 stream.write_all(&buf[..written]).await?;
610 stream.flush().await?;
611 Ok(written)
612}
613
614/// An `AsyncRead` adapter that drains a byte slice before reading from an inner stream.
615///
616/// Used in `process_inbound_uni` to replay bytes that were read ahead while
617/// parsing the stream-type varint before dispatching to `run_inbound_encoder`.
618struct Prepended<'a, T> {
619 head: &'a [u8],
620 tail: T,
621}
622
623impl<T: AsyncRead + Unpin> AsyncRead for Prepended<'_, T> {
624 fn poll_read(
625 self: Pin<&mut Self>,
626 cx: &mut Context<'_>,
627 out: &mut [u8],
628 ) -> Poll<io::Result<usize>> {
629 let this = self.get_mut();
630 if !this.head.is_empty() {
631 let n = this.head.len().min(out.len());
632 out[..n].copy_from_slice(&this.head[..n]);
633 this.head = &this.head[n..];
634 return Poll::Ready(Ok(n));
635 }
636 Pin::new(&mut this.tail).poll_read(cx, out)
637 }
638}
639
640/// Read from `stream` into `buf` until `f` can decode a value.
641///
642/// `f` receives the filled portion of the buffer and returns:
643/// - `Ok(Some((value, consumed)))` — success; consumed bytes are removed from the front
644/// - `Ok(None)` — need more data; reads more bytes and retries
645/// - `Err(e)` — unrecoverable error; propagated to caller
646async fn read<R>(
647 buf: &mut Vec<u8>,
648 filled: &mut usize,
649 stream: &mut (impl AsyncRead + Unpin + Send),
650 f: impl Fn(&[u8]) -> Result<Option<(R, usize)>, H3ErrorCode>,
651) -> Result<R, H3Error> {
652 loop {
653 if let Some((result, consumed)) = f(&buf[..*filled])? {
654 buf.copy_within(consumed..*filled, 0);
655 *filled -= consumed;
656 return Ok(result);
657 }
658
659 if *filled >= buf.len() {
660 if buf.len() >= MAX_BUFFER_SIZE {
661 return Err(io::Error::new(ErrorKind::OutOfMemory, "runaway allocation").into());
662 }
663 buf.resize(buf.len() * 2, 0);
664 }
665
666 let n = stream.read(&mut buf[*filled..]).await?;
667 if n == 0 {
668 return Err(io::Error::new(ErrorKind::UnexpectedEof, "stream closed").into());
669 }
670 *filled += n;
671 }
672}