trillium_http/h2/acceptor.rs
1//! HTTP/2 driver loop ([`H2Driver`]) — owns the per-connection TCP transport and runs the
2//! poll-based state machine that demuxes frames, dispatches stream-opens to handler tasks, and
3//! pumps responses back out.
4//!
5//! Created by [`H2Connection::run`]. The runtime adapter calls [`H2Driver::next`] in a
6//! loop (or drives via the [`Stream`] impl, which has the same semantics); each yield either
7//! returns the next opened request stream (a [`Conn`] for the runtime to spawn a handler
8//! task against) or `None` when the connection is closed.
9//!
10//! The driver is a poll-based state machine, not an async fn. A single `drive` call is the
11//! unit of forward progress: it picks up conn-task signals, advances any in-flight response
12//! sends, drains pending outbound bytes, and advances the read cursor — parking with
13//! cancel-safe partial state when no further progress can be made.
14//!
15//! # Module layout
16//!
17//! Driver impl is split across this file and child modules to keep each focused:
18//!
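//! - **`acceptor.rs`** (this file): struct definition, the [`H2Driver::drive`] orchestration
//!   loop, and the I/O read primitives (`poll_fill_to`, `poll_drain_peer`).
//! - **`acceptor::types`**: the supporting types this file imports ([`DriverState`],
//!   [`ReadPhase`], [`CloseOutcome`], [`Action`], [`StreamEntry`], plus `AcceptorConfig` and
//!   `Next`).
//! - **`acceptor::constants`**: shared buffer-size and flow-control-window constants.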
22//! - **`acceptor::closed_streams`**: bounded ledger of recently-closed streams + reasons, consulted
23//! to pick the right §5.1 error category for stale peer frames.
24//! - **`acceptor::handler_signals`**: conn-task → driver work-pickup boundary. Owns the
25//! `needs_servicing` mailbox protocol — `service_handler_signals`, `pick_up_new_client_streams`,
26//! `has_pending_handler_signals`.
27//! - **`acceptor::outbound`**: outbound write/flush plumbing and `queue_*` frame helpers.
28//! - **`acceptor::recv`**: receive side — frame reader, dispatch, HEADERS+CONTINUATION
29//! accumulation, malformed-request `RST_STREAM`, DATA routing into per-stream recv rings.
30//! - **`acceptor::send`**: send pump — picks up [`SendCursor`][send::SendCursor]s from the
31//! conn-task signal pickup, frames HEADERS / DATA / trailing-HEADERS, signals completion.
32//!
33//! [`H2Connection::run`]: super::H2Connection::run
34//! [`Stream`]: futures_lite::stream::Stream
35
36mod closed_streams;
37mod constants;
38mod handler_signals;
39mod outbound;
40mod recv;
41mod send;
42#[cfg(test)]
43mod tests;
44mod types;
45
46use super::{
47 H2Error, H2ErrorCode, connection::H2Connection, frame::FRAME_HEADER_LEN, role::Role,
48 transport::H2Transport,
49};
50use crate::{
51 Conn,
52 headers::hpack::{HpackDecoder, HpackEncoder},
53};
54use closed_streams::{ClosedReason, ClosedStreams};
55use constants::{
56 INITIAL_CONNECTION_RECV_WINDOW, MAX_BUFFER_SIZE, MAX_DATA_CHUNK_SIZE, MAX_FLOW_CONTROL_WINDOW,
57};
58use futures_lite::io::{AsyncRead, AsyncWrite};
59use hashbrown::HashMap;
60use recv::PendingHeaders;
61use std::{
62 future::Future,
63 io,
64 pin::Pin,
65 sync::Arc,
66 task::{Context, Poll, ready},
67};
68use swansong::ShuttingDown;
69use types::{
70 AcceptorConfig, Action, CloseOutcome, DriverState, Next, ReadPhase, StreamEntry, frame_slice,
71};
72
/// Owns the per-connection TCP transport and drives the HTTP/2 demux loop.
///
/// See the module docs for the high-level driver shape and how its impl is split across the
/// `recv` and `send` child modules.
///
/// All fields are driver-private bookkeeping except [`Self::config`], which sibling modules
/// under `h2` may read.
#[derive(Debug)]
pub struct H2Driver<T> {
    /// Shared connection handle. Used by the driver for the outbound waker, the swansong
    /// shutdown signal, pending-ping failure, and peer-settings waiter wakeups.
    connection: Arc<H2Connection>,

    /// The underlying byte transport. Read via `poll_fill_to` / `poll_drain_peer`; written by
    /// the outbound plumbing.
    transport: T,

    /// Role this driver runs in — see [`Role`]. Consulted at role-asymmetric branch points
    /// (preface direction, HEADERS-on-unknown-id, HEADERS-on-known-id).
    role: Role,

    /// Overall lifecycle position of the driver. Mutated only through `set_state`, which
    /// logs every transition.
    state: DriverState,

    /// Future that resolves when the shared `Swansong` begins shutdown. Polled each
    /// `drive` tick while the driver is running; on resolution the driver queues a
    /// GOAWAY and transitions to `Closing`, after which the top-of-loop guard returns
    /// early and we never poll this again on the same acceptor.
    shutting_down: ShuttingDown,

    /// Inbound byte cursor. Accumulates bytes from the transport across `drive` calls so
    /// a partial frame read can survive a return to `Poll::Pending`. Always contains
    /// exactly the bytes of the current frame being accumulated (header, then payload);
    /// reset after each complete frame is dispatched.
    read_buf: Vec<u8>,

    /// Number of bytes at the front of `read_buf` that have been filled from the transport
    /// so far. Advanced by `poll_fill_to` after every successful read.
    read_filled: usize,

    /// Which part of the current frame the read cursor is accumulating. Starts at
    /// [`ReadPhase::NeedHeader`].
    read_phase: ReadPhase,

    /// Outbound byte cursor. The driver encodes control frames into `write_buf` and drains
    /// to the transport via `poll_flush_outbound`. `write_cursor` is the offset of the
    /// first byte not yet accepted by `poll_write`. After the buffer fully drains, both
    /// fields are reset and a flush is issued.
    write_buf: Vec<u8>,

    /// Offset into `write_buf` of the first byte the transport has not yet accepted.
    write_cursor: usize,

    /// NOTE(review): presumably records that a transport flush is still owed after
    /// `write_buf` drained — managed by the `outbound` module; confirm there.
    write_flush_pending: bool,

    /// HPACK decoder state, shared across all header blocks on this connection.
    hpack: HpackDecoder,

    /// HPACK encoder state. The driver is the sole owner — handlers / conn tasks
    /// no longer touch it, so this is a plain field with no synchronization.
    hpack_encoder: HpackEncoder,

    /// Per-stream state, keyed by stream id. Driver-only — handler tasks hold their own
    /// `Arc<StreamState>` via [`H2Transport`] and don't consult this table. The entry
    /// bundles the shared state with driver-private bookkeeping (e.g. "have we already
    /// advertised the recv window after seeing `is_reading`?").
    streams: HashMap<u32, StreamEntry>,

    /// Highest peer-initiated stream id seen so far. Peer-initiated (client) stream ids
    /// must be odd and strictly increasing. Also used as the last-stream-id carried by the
    /// GOAWAY that `begin_close` queues.
    last_peer_stream_id: u32,

    /// Accumulator for an in-progress HEADERS block that is waiting on further CONTINUATION
    /// frames. `None` outside a HEADERS block. The spec forbids any frame on any stream
    /// from interleaving while this is `Some`.
    pending_headers: Option<PendingHeaders>,

    /// Set once the driver decides to close: graceful (peer GOAWAY / server swansong / peer
    /// EOF) or erroring (protocol violation → GOAWAY with code, or I/O failure → no
    /// GOAWAY). `drive` completes (returns `None` or a final `Some(Err(...))`) once
    /// outbound drains to empty.
    close_outcome: Option<CloseOutcome>,

    /// Set after `drive` yields its terminal result. Subsequent calls return `None` without
    /// touching the transport.
    finished: bool,

    /// Reusable scratch the send pump reads body chunks into before framing as DATA.
    /// Sized at [`MAX_DATA_CHUNK_SIZE`] — even if the peer permits larger frames we cap our
    /// DATA emissions here to bound per-connection memory.
    body_scratch: Vec<u8>,

    /// Connection-level send flow-control window. Tracked as [`i64`] so mid-connection
    /// `INITIAL_WINDOW_SIZE` reductions can drive per-stream windows temporarily negative
    /// — kept here to the connection window for symmetry though the connection window
    /// itself is *not* affected by `SETTINGS_INITIAL_WINDOW_SIZE`. Decremented as we emit
    /// DATA; incremented by peer `WINDOW_UPDATE(stream_id=0, inc)`. Overflow past
    /// [`MAX_FLOW_CONTROL_WINDOW`] is a connection-level `FLOW_CONTROL_ERROR`.
    connection_send_window: i64,

    /// Connection-level recv flow-control window. Starts at the spec's baseline of 65535
    /// octets ([`INITIAL_CONNECTION_RECV_WINDOW`]) and is raised to the configured initial
    /// connection window size via an initial `WINDOW_UPDATE(0)` right after SETTINGS — the
    /// spec forbids SETTINGS from altering it, so WU is the only path. Decremented as peer
    /// DATA frames arrive (across all streams); incremented as the handler-task-side
    /// consumption signal is picked up and we emit `WINDOW_UPDATE(0, consumed)`. A negative
    /// value means the peer overran the window — connection-level `FLOW_CONTROL_ERROR`.
    connection_recv_window: i64,

    /// Bounded ledger of recently-closed streams and why they closed. Consulted by
    /// [`recv::H2Driver::finalize_headers`] when a HEADERS frame arrives on an id ≤
    /// `last_peer_stream_id` that's not in the active map, to distinguish `RST_STREAM`-
    /// closed (stream-level `STREAM_CLOSED`) from `END_STREAM`-closed or never-opened
    /// (connection-level). See [`ClosedStreams`] for the eviction policy.
    closed_streams: ClosedStreams,

    /// Snapshot of the h2-relevant fields of [`HttpConfig`][crate::HttpConfig] taken at
    /// acceptor construction. Copied in because `HttpConfig` is per-server but an acceptor
    /// is per-connection — the config is effectively immutable over a connection's
    /// lifetime, and a local copy avoids reaching through [`H2Connection::context`] on
    /// every policy check.
    ///
    /// [`H2Connection::context`]: super::H2Connection::context
    pub(super) config: AcceptorConfig,
}
181
182impl<T> H2Driver<T>
183where
184 T: AsyncRead + AsyncWrite + Unpin + Send,
185{
186 pub(super) fn new(connection: Arc<H2Connection>, transport: T, role: Role) -> Self {
187 let shutting_down = connection.swansong().shutting_down();
188 let context = connection.context();
189 let config = AcceptorConfig::from_http_config(context.config());
190 let hpack_encoder = HpackEncoder::new(
191 context.observer.clone(),
192 context.config.dynamic_table_capacity(),
193 context.config.recent_pairs_size(),
194 );
195 Self {
196 connection,
197 transport,
198 role,
199 state: DriverState::AwaitingPreface,
200 shutting_down,
201 read_buf: vec![0u8; FRAME_HEADER_LEN],
202 read_filled: 0,
203 read_phase: ReadPhase::NeedHeader,
204 write_buf: Vec::new(),
205 write_cursor: 0,
206 write_flush_pending: false,
207 hpack: HpackDecoder::new(config.hpack_table_capacity()),
208 hpack_encoder,
209 streams: HashMap::new(),
210 last_peer_stream_id: 0,
211 pending_headers: None,
212 close_outcome: None,
213 finished: false,
214 body_scratch: vec![0u8; MAX_DATA_CHUNK_SIZE as usize],
215 connection_send_window: INITIAL_CONNECTION_RECV_WINDOW,
216 connection_recv_window: INITIAL_CONNECTION_RECV_WINDOW,
217 closed_streams: ClosedStreams::default(),
218 config,
219 }
220 }
221
    /// The shared [`H2Connection`] this acceptor was created from.
    ///
    /// Returns a borrow of the driver's own `Arc`; clone it if the handle needs to outlive
    /// the borrow of the driver.
    pub fn connection(&self) -> &Arc<H2Connection> {
        &self.connection
    }
226
    /// Drive the connection until the next request stream opens, the connection ends, or a
    /// fatal protocol or I/O error occurs.
    ///
    /// This only constructs the [`Next`] adapter borrowing the driver; the work happens when
    /// that future is polled.
    ///
    /// Following the shape of [`Self::drive`] (`Option<Result<Conn, H2Error>>`): yields
    /// `Some(Ok(conn))` for each new request stream — the runtime adapter is expected to
    /// spawn a handler task that consumes the [`Conn`]. Malformed requests are handled
    /// internally with a stream-level `RST_STREAM` and never surfaced. Yields `None` when
    /// the connection has been shut down cleanly (peer GOAWAY, our own swansong shutdown,
    /// peer EOF at a frame boundary).
    ///
    /// NOTE(review): `Next`'s `Future` impl lives in the `types` module and is not visible
    /// here — confirm its `Output` matches the shape described above.
    ///
    /// # Errors
    ///
    /// The returned future resolves to an [`H2Error`] for any *connection-level* protocol
    /// violation detected while decoding peer frames or for an unrecoverable transport I/O
    /// error. A final GOAWAY is sent before a protocol error is returned (best-effort; I/O
    /// errors skip it).
    // Mirrors `StreamExt::next` (a `&mut self -> impl Future<Output = Option<T>>` adapter),
    // not `Iterator::next`. The driver is also `Stream`, so callers can use either.
    #[allow(clippy::should_implement_trait)]
    pub fn next(&mut self) -> Next<'_, T> {
        Next { driver: self }
    }
248
249 /// Poll-based driver core. Shared by [`Next`]'s `Future` impl, the [`Stream`] impl on
250 /// [`H2Driver`], and [`H2Initiator`][super::H2Initiator]'s client-side Future impl.
251 ///
252 /// [`Stream`]: futures_lite::stream::Stream
253 #[allow(
254 clippy::too_many_lines,
255 reason = "state-machine orchestration; splitting muddies the read-as-a-recipe shape"
256 )]
257 pub(super) fn drive(
258 &mut self,
259 cx: &mut Context<'_>,
260 ) -> Poll<Option<Result<Conn<H2Transport>, H2Error>>> {
261 if self.finished {
262 return Poll::Ready(None);
263 }
264
265 for loop_number in 0..self.config.copy_loops_per_yield() {
266 log::trace!("h2 drive loop number: {loop_number}");
267 // 1. Conn-task signals. Picks up window-update intent (`is_reading`) and new
268 // `submit_send` submissions, moving them into driver-private state.
269 self.service_handler_signals();
270
271 // 2. Send pump. Turns picked-up SendCursors into HEADERS / DATA / trailing- HEADERS
272 // frame bytes in `write_buf`. Body reads that return Pending leave the cursor in
273 // place — the body's source will wake the driver task.
274 self.advance_outbound_sends(cx);
275
276 // 3. Flush any pending outbound — never re-poll reads when we still owe bytes to the
277 // peer, and never signal closure to the caller before the wire is clean.
278 match self.poll_flush_outbound(cx) {
279 Poll::Ready(Ok(())) => {}
280 Poll::Ready(Err(e)) => {
281 // Flush failure while closing: just take whatever outcome we had and
282 // shelve the fresh I/O error. While running, record and finish.
283 if self.close_outcome.is_none() {
284 self.close_outcome = Some(CloseOutcome::Io(e));
285 }
286 return Poll::Ready(self.finish_with_current_outcome());
287 }
288 Poll::Pending => return Poll::Pending,
289 }
290
291 // 4. If we were closing, outbound is now drained. For graceful (or protocol-error)
292 // shutdowns, transition to `Drained` and wait for the peer to close its write half —
293 // otherwise the peer sees our drop as a reset rather than a clean close. For
294 // I/O-error shutdowns the transport is already untrustworthy, so skip the drain.
295 //
296 // Defer the transition while in-flight streams still have outbound (SendCursor
297 // not yet `Complete`) OR inbound (`recv.eof` not yet set) work. Without this, a
298 // handler that submits trailers *after* the cancellation race resolves (gRPC
299 // `Cancellation::race`) gets stranded with bytes parked in mailboxes, and a
300 // client receiving GOAWAY mid-stream stops decoding incoming frames before the
301 // server's trailing HEADERS arrive. Falls through to step 6 so the recv pump
302 // (also gated on Running|Closing now) keeps running and parks on the transport
303 // read waker rather than the outbound-only `park` here.
304 if self.state == DriverState::Closing {
305 if matches!(self.close_outcome, Some(CloseOutcome::Io(_))) {
306 return Poll::Ready(self.finish_with_current_outcome());
307 }
308 if self.has_active_send_cursors() || self.has_pending_recv() {
309 self.log_closing_blockers();
310 } else {
311 self.set_state(
312 DriverState::Drained,
313 "outbound drained, no in-flight streams",
314 );
315 }
316 }
317
318 // 5. Server-initiated shutdown check. Only relevant while we're running — once we're
319 // past the Closing/Drained transition we've already committed to a close and
320 // re-observing the swansong here would re-enter begin_close in a loop. Post-shutdown
321 // re-polls of `ShuttingDown` are harmless themselves (event_listener-backed, not
322 // single-shot) but the re-entry isn't.
323 if self.state == DriverState::Running
324 && Pin::new(&mut self.shutting_down).poll(cx).is_ready()
325 {
326 self.begin_close(CloseOutcome::Graceful);
327 continue;
328 }
329
330 // 6. State-specific step.
331 match self.state {
332 DriverState::AwaitingPreface => {
333 // Role-asymmetric: server reads the 24-byte preface off the wire; client
334 // writes it to `write_buf` (the next drain tick flushes it, then our
335 // SETTINGS, then the peer's SETTINGS arrives as the first frame in Running).
336 let poll = match self.role {
337 Role::Server => self.poll_read_preface(cx),
338 Role::Client => {
339 self.queue_client_preface();
340 Poll::Ready(Ok(()))
341 }
342 };
343 match poll {
344 Poll::Ready(Ok(())) => {
345 self.set_state(DriverState::NeedsServerSettings, "preface complete");
346 }
347 Poll::Ready(Err(e)) => {
348 self.close_outcome = Some(e);
349 return Poll::Ready(self.finish_with_current_outcome());
350 }
351 Poll::Pending => {
352 if self.park(cx) {
353 return Poll::Pending;
354 }
355 }
356 }
357 }
358
359 DriverState::NeedsServerSettings => {
360 self.queue_settings();
361 // The spec forbids SETTINGS from altering the connection-level
362 // flow-control window — it stays at the 65535 baseline unless we raise
363 // it via `WINDOW_UPDATE(0)`. Do that immediately after SETTINGS so peer
364 // bulk uploads aren't capped at ~5 Mbit/s × RTT.
365 let raise = i64::from(self.config.initial_connection_window_size())
366 - INITIAL_CONNECTION_RECV_WINDOW;
367 if raise > 0 {
368 let raise = u32::try_from(raise).unwrap_or(u32::MAX);
369 self.queue_window_update(0, raise);
370 self.connection_recv_window += i64::from(raise);
371 }
372 self.set_state(DriverState::Running, "initial SETTINGS queued");
373 }
374
375 // Read pump runs in both Running and Closing so a Closing-side driver
376 // (we sent or received GOAWAY) keeps decoding inbound frames for streams
377 // that haven't reached `recv.eof` yet — e.g. trailing HEADERS for an
378 // in-flight server-stream the peer is about to send. New `Action::Emit`
379 // streams are ignored in Closing: post-GOAWAY the peer shouldn't be
380 // opening new ones (and we wouldn't want to dispatch handlers for them
381 // even if it did).
382 DriverState::Running | DriverState::Closing => match self.poll_advance_read(cx) {
383 Poll::Ready(Ok(Action::Continue)) => {}
384 Poll::Ready(Ok(Action::Emit(conn))) => {
385 if self.state == DriverState::Running {
386 return Poll::Ready(Some(Ok(*conn)));
387 }
388 // Closing — drop the conn; outer loop continues processing
389 // remaining in-flight streams until drained.
390 }
391 Poll::Ready(Ok(Action::Close(outcome))) => {
392 self.begin_close(outcome);
393 }
394 // Protocol errors need a GOAWAY on the wire before we terminate;
395 // `begin_close` queues that and transitions us to Closing so the next
396 // outer-loop iteration drains the frame. Io errors short-circuit:
397 // if we're already Closing, the transport is gone, so finish without
398 // looping forever waiting for in-flight streams (`has_pending_recv`
399 // can't decide on its own that the peer is never sending again).
400 Poll::Ready(Err(e)) => {
401 if self.state == DriverState::Closing {
402 self.close_outcome.get_or_insert(e);
403 return Poll::Ready(self.finish_with_current_outcome());
404 }
405 self.begin_close(e);
406 }
407 Poll::Pending => {
408 if self.park(cx) {
409 return Poll::Pending;
410 }
411 }
412 },
413
414 DriverState::Drained => match self.poll_drain_peer(cx) {
415 Poll::Ready(()) => {
416 return Poll::Ready(self.finish_with_current_outcome());
417 }
418 Poll::Pending => return Poll::Pending,
419 },
420 }
421 }
422
423 // Cooperative yield: we made `copy_loops_per_yield` rounds of progress without
424 // hitting an internal Pending. Re-arm immediately and let the runtime pick up
425 // anything else it has waiting before we resume.
426 cx.waker().wake_by_ref();
427 Poll::Pending
428 }
429
430 /// Register the driver's waker with the shared `outbound_waker` (so handler tasks can
431 /// wake the driver) and tell the caller whether it's safe to park. Returns `true` if
432 /// the driver should return `Poll::Pending`, or `false` if a handler produced work
433 /// between our last check and the registration — in which case the caller should loop
434 /// around to pick it up.
435 fn park(&mut self, cx: &mut Context<'_>) -> bool {
436 self.connection.outbound_waker().register(cx.waker());
437 !self.has_pending_handler_signals() && !self.has_pending_outbound_progress()
438 }
439
440 /// Convert the current `close_outcome` into the terminal return of [`Self::drive`]. Must
441 /// only be called after outbound bytes have been flushed. Graceful closes return `None`;
442 /// errors surface as a final `Some(Err(...))` before subsequent polls return `None`.
443 fn finish_with_current_outcome(&mut self) -> Option<Result<Conn<H2Transport>, H2Error>> {
444 self.finished = true;
445 // Complete every outstanding `H2Connection::send_ping` future with an error so
446 // awaiting callers don't block forever. Safe to call regardless of outcome —
447 // a no-op if no pings are in flight.
448 self.connection.fail_pending_pings(
449 io::ErrorKind::ConnectionAborted,
450 "h2 connection closed before PING ACK",
451 );
452 // Wake any `PeerSettings` waiters so a peer that disconnects without ever sending
453 // SETTINGS doesn't strand them. Their `poll` rechecks swansong state and returns
454 // Ready; the caller's follow-up operation surfaces the connection-closed error.
455 self.connection.wake_peer_settings_waiters();
456 match self.close_outcome.take() {
457 None | Some(CloseOutcome::Graceful) => None,
458 Some(CloseOutcome::Protocol(code)) => Some(Err(H2Error::Protocol(code))),
459 Some(CloseOutcome::Io(e)) => Some(Err(H2Error::Io(e))),
460 }
461 }
462
463 /// Enter the closing state: record the outcome and queue a GOAWAY (only for outcomes
464 /// that warrant one). The main loop will drain `write_buf` and then finish.
465 fn begin_close(&mut self, outcome: CloseOutcome) {
466 // Idempotent: with the recv pump now running in Closing (so we keep
467 // decoding inbound frames for in-flight streams across GOAWAY), a peer
468 // GOAWAY arriving after we've already begun closing would otherwise
469 // re-queue our own GOAWAY and re-enter Closing, ping-ponging forever
470 // with a peer that mirrors the behavior.
471 if self.state == DriverState::Closing || self.state == DriverState::Drained {
472 log::trace!(
473 "h2 driver: begin_close({outcome:?}) — already in {:?}, ignoring",
474 self.state,
475 );
476 return;
477 }
478 // Don't overwrite a prior outcome (e.g. if an error fires in the middle of a
479 // graceful shutdown, keep the error).
480 let code = match &outcome {
481 CloseOutcome::Graceful => Some(H2ErrorCode::NoError),
482 CloseOutcome::Protocol(code) => Some(*code),
483 CloseOutcome::Io(_) => None,
484 };
485 let reason = match &outcome {
486 CloseOutcome::Graceful => "graceful close",
487 CloseOutcome::Protocol(_) => "protocol error",
488 CloseOutcome::Io(_) => "i/o error",
489 };
490 if self.close_outcome.is_none() {
491 self.close_outcome = Some(outcome);
492 }
493 if let Some(code) = code {
494 self.queue_goaway(self.last_peer_stream_id, code);
495 }
496 self.set_state(DriverState::Closing, reason);
497 }
498
499 /// The sole mutator of `self.state`. Logs every transition so a trace log reads as
500 /// a sequence of named lifecycle events.
501 fn set_state(&mut self, new: DriverState, reason: &'static str) {
502 if self.state == new {
503 return;
504 }
505 log::trace!(
506 "h2 driver: state {old:?} → {new:?} ({reason})",
507 old = self.state,
508 );
509 self.state = new;
510 }
511
512 /// Log which in-flight streams are blocking the `Closing → Drained` transition.
513 /// Called from the closing-state check when at least one predicate (`has_active_send_cursors`
514 /// or `has_pending_recv`) is still true, so a trace log shows exactly which streams the
515 /// driver is waiting on.
516 fn log_closing_blockers(&self) {
517 if !log::log_enabled!(log::Level::Trace) {
518 return;
519 }
520 for (id, entry) in &self.streams {
521 let lifecycle = entry.shared.lifecycle_lock();
522 if entry.send.is_some() || lifecycle.has_active_send() || lifecycle.has_pending_recv() {
523 log::trace!(
524 "h2 driver: Closing — stream {id} blocking drain (lifecycle={lifecycle:?}, \
525 cursor_present={})",
526 entry.send.is_some(),
527 );
528 }
529 }
530 }
531
532 /// Read bytes from the transport into `read_buf[read_filled..target]` until
533 /// `read_filled >= target`. Cancel-safe: if the caller drops the Future, any bytes
534 /// already placed are preserved in the buffer.
535 ///
536 /// A 0-byte read is surfaced as `UnexpectedEof`. The caller maps this to a terminal
537 /// I/O error; we don't emit a GOAWAY on peer-initiated close.
538 fn poll_fill_to(&mut self, target: usize, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
539 if self.read_buf.len() < target {
540 self.read_buf.resize(target, 0);
541 }
542 while self.read_filled < target {
543 let n = ready!(
544 Pin::new(&mut self.transport)
545 .poll_read(cx, &mut self.read_buf[self.read_filled..target])
546 )?;
547 if n == 0 {
548 return Poll::Ready(Err(io::Error::from(io::ErrorKind::UnexpectedEof)));
549 }
550 self.read_filled += n;
551 }
552 Poll::Ready(Ok(()))
553 }
554
555 /// Post-GOAWAY, drain whatever inbound bytes are *immediately* available from the
556 /// peer so our Drop sends a clean FIN (no unread data → no TCP RST) while the peer
557 /// sees the GOAWAY we just emitted. Read loops internally: consume each Ready chunk,
558 /// discard it, ask for more. Exits as soon as the transport returns `Pending` (no
559 /// bytes available right now) OR `Ready(0)` (peer FIN already arrived) OR any error.
560 ///
561 /// Does **not** register the waker on `Pending` — we're actively closing, not
562 /// observing the peer. A peer that happens to send more bytes after our exit will
563 /// have those bytes dropped when the transport is closed; that's a race the peer
564 /// chose to lose by sending after receiving our GOAWAY.
565 ///
566 /// Returning `Ready(())` unconditionally (no `Pending` case) lets the caller finalize
567 /// immediately. The `Poll` wrapper is kept for symmetry with the rest of the driver's
568 /// poll-style methods.
569 fn poll_drain_peer(&mut self, cx: &mut Context<'_>) -> Poll<()> {
570 // A peer flooding us with bytes could keep this loop going a long time. Cap it
571 // so a pathological client can't pin our close-out forever.
572 const MAX_DISCARD_ITERATIONS: usize = 256;
573 // Lightweight scratch — we're throwing it away. 512 balances "drain in few
574 // iterations" against "don't hold a large buffer for a rare path."
575 let mut scratch = [0u8; 512];
576 for _ in 0..MAX_DISCARD_ITERATIONS {
577 // We pass `cx` through for the benefit of the transport's `poll_read` contract,
578 // but we *interpret* `Pending` as "done draining" rather than parking on it —
579 // we're actively closing, not observing. A peer that sends more bytes after
580 // our exit loses the race.
581 match Pin::new(&mut self.transport).poll_read(cx, &mut scratch) {
582 Poll::Ready(Ok(0) | Err(_)) | Poll::Pending => {
583 return Poll::Ready(());
584 }
585 Poll::Ready(Ok(_)) => {}
586 }
587 }
588 Poll::Ready(())
589 }
590
    /// Look up why a stream is closed. `None` means either never-opened or evicted from the
    /// bounded ledger — both fall through to the connection-level default.
    ///
    /// Thin delegate to the `closed_streams` ledger's `reason` lookup; it does not consult
    /// the active `streams` map.
    pub(super) fn closed_reason(&self, stream_id: u32) -> Option<ClosedReason> {
        self.closed_streams.reason(stream_id)
    }
596}