trillium_http/h2/acceptor.rs
1//! HTTP/2 driver loop ([`H2Driver`]) — owns the per-connection TCP transport and runs the
2//! poll-based state machine that demuxes frames, dispatches stream-opens to handler tasks, and
3//! pumps responses back out.
4//!
5//! Created by [`H2Connection::run`]. The runtime adapter calls [`H2Driver::next`] in a
6//! loop (or drives via the [`Stream`] impl, which has the same semantics); each yield either
7//! returns the next opened request stream (a [`Conn`] for the runtime to spawn a handler
8//! task against) or `None` when the connection is closed.
9//!
10//! The driver is a poll-based state machine, not an async fn. A single `drive` call is the
11//! unit of forward progress: it picks up conn-task signals, advances any in-flight response
12//! sends, drains pending outbound bytes, and advances the read cursor — parking with
13//! cancel-safe partial state when no further progress can be made.
14//!
15//! # Module layout
16//!
17//! Driver impl is split across this file and child modules to keep each focused:
18//!
//! - **`acceptor.rs`** (this file): struct definition, the [`H2Driver::drive`] orchestration
//!   loop, and the I/O read primitives (`poll_fill_to`, `poll_drain_peer`). The supporting types
//!   ([`DriverState`], [`ReadPhase`], [`CloseOutcome`], [`Action`], [`StreamEntry`]) are imported
//!   from the `types` child module.
22//! - **`acceptor::closed_streams`**: bounded ledger of recently-closed streams + reasons, consulted
23//! to pick the right §5.1 error category for stale peer frames.
24//! - **`acceptor::handler_signals`**: conn-task → driver work-pickup boundary. Owns the
25//! `needs_servicing` mailbox protocol — `service_handler_signals`, `pick_up_new_client_streams`,
26//! `has_pending_handler_signals`.
27//! - **`acceptor::outbound`**: outbound write/flush plumbing and `queue_*` frame helpers.
28//! - **`acceptor::recv`**: receive side — frame reader, dispatch, HEADERS+CONTINUATION
29//! accumulation, malformed-request `RST_STREAM`, DATA routing into per-stream recv rings.
30//! - **`acceptor::send`**: send pump — picks up [`SendCursor`][send::SendCursor]s from the
31//! conn-task signal pickup, frames HEADERS / DATA / trailing-HEADERS, signals completion.
32//!
33//! [`H2Connection::run`]: super::H2Connection::run
34//! [`Stream`]: futures_lite::stream::Stream
35
36mod closed_streams;
37mod constants;
38mod handler_signals;
39mod outbound;
40mod recv;
41mod send;
42#[cfg(test)]
43mod tests;
44mod types;
45
46use super::{
47 H2Error, H2ErrorCode, connection::H2Connection, frame::FRAME_HEADER_LEN, role::Role,
48 stream_state::StreamEvent, transport::H2Transport,
49};
50use crate::{
51 Conn,
52 headers::hpack::{HpackDecoder, HpackEncoder},
53};
54use closed_streams::{ClosedReason, ClosedStreams};
55use constants::{
56 INITIAL_CONNECTION_RECV_WINDOW, MAX_BUFFER_SIZE, MAX_DATA_CHUNK_SIZE, MAX_FLOW_CONTROL_WINDOW,
57};
58use futures_lite::io::{AsyncRead, AsyncWrite};
59use recv::PendingHeaders;
60use std::{
61 collections::BTreeMap,
62 future::Future,
63 io,
64 pin::Pin,
65 sync::Arc,
66 task::{Context, Poll, ready},
67};
68use swansong::ShuttingDown;
69use types::{
70 AcceptorConfig, Action, CloseOutcome, DriverState, Next, ReadPhase, StreamEntry, frame_slice,
71};
72
/// Owns the per-connection TCP transport and drives the HTTP/2 demux loop.
///
/// See the module docs for the high-level driver shape and how its impl is split across the
/// child modules.
#[derive(Debug)]
pub struct H2Driver<T> {
    /// Shared per-connection state. Handed back to callers via [`H2Driver::connection`] and
    /// used by the driver for the outbound waker, ping bookkeeping, and peer-settings waiters.
    connection: Arc<H2Connection>,

    /// The underlying byte transport. All reads and writes performed by the driver go
    /// through this value.
    transport: T,

    /// Role this driver runs in — see [`Role`]. Consulted at role-asymmetric branch points
    /// (preface direction, HEADERS-on-unknown-id, HEADERS-on-known-id).
    role: Role,

    /// Overall lifecycle position of the driver.
    state: DriverState,

    /// Future that resolves when the shared `Swansong` begins shutdown. Polled each
    /// `drive` tick while the driver is in `DriverState::Running`; on resolution the driver
    /// queues a GOAWAY and transitions to `Closing`. The `state == Running` guard in `drive`
    /// then skips this poll for the rest of the driver's life.
    shutting_down: ShuttingDown,

    /// Inbound byte cursor. Accumulates bytes from the transport across `drive` calls so
    /// a partial frame read can survive a return to `Poll::Pending`. Always contains
    /// exactly the bytes of the current frame being accumulated (header, then payload);
    /// reset after each complete frame is dispatched.
    read_buf: Vec<u8>,

    /// Number of bytes at the front of `read_buf` that have already been filled from the
    /// transport. `poll_fill_to` reads into `read_buf[read_filled..target]` and advances this.
    read_filled: usize,

    /// Which part of the current frame the read cursor is accumulating. Starts at
    /// `ReadPhase::NeedHeader`.
    read_phase: ReadPhase,

    /// Outbound byte cursor. The driver encodes control frames into `write_buf` and drains
    /// to the transport via `poll_flush_outbound`. `write_cursor` is the offset of the
    /// first byte not yet accepted by `poll_write`. After the buffer fully drains, both
    /// fields are reset and a flush is issued.
    write_buf: Vec<u8>,

    /// Offset into `write_buf` of the first byte not yet accepted by the transport.
    write_cursor: usize,

    /// NOTE(review): presumably records that a transport flush is still owed after
    /// `write_buf` drained — confirm against the `outbound` child module.
    write_flush_pending: bool,

    /// HPACK decoder state, shared across all header blocks on this connection.
    hpack: HpackDecoder,

    /// HPACK encoder state. The driver is the sole owner — handlers / conn tasks
    /// no longer touch it, so this is a plain field with no synchronization.
    hpack_encoder: HpackEncoder,

    /// Per-stream state, keyed by stream id. Driver-only — handler tasks hold their own
    /// `Arc<StreamState>` via [`H2Transport`] and don't consult this table. The entry
    /// bundles the shared state with driver-private bookkeeping (e.g. "have we already
    /// advertised the recv window after seeing `is_reading`?").
    ///
    /// A `BTreeMap` (not a hash map) so the send pump iterates streams in ascending
    /// stream-id order. For the client role this is load-bearing: a client MUST send
    /// opening HEADERS in monotonically increasing stream-id order (RFC 9113 §5.1.1),
    /// and concurrent `open_stream` calls would otherwise let the pump frame a higher
    /// id before a lower one, drawing a `GOAWAY(PROTOCOL_ERROR)` from the peer. (See
    /// also the allocate-under-`streams_lock` ordering in `open_stream`.)
    streams: BTreeMap<u32, StreamEntry>,

    /// Highest peer-initiated stream id seen so far. Peer-initiated (client) stream ids
    /// must be odd and strictly increasing. Also the last-stream-id sent in our GOAWAY.
    last_peer_stream_id: u32,

    /// Accumulator for an in-progress HEADERS block that is waiting on further CONTINUATION
    /// frames. `None` outside a HEADERS block. The spec forbids any frame on any stream
    /// from interleaving while this is `Some`.
    pending_headers: Option<PendingHeaders>,

    /// Set once the driver decides to close: graceful (peer GOAWAY / server swansong / peer
    /// EOF) or erroring (protocol violation → GOAWAY with code, or I/O failure → no
    /// GOAWAY). `drive` completes (returns `None` or a final `Some(Err(...))`) once
    /// outbound drains to empty.
    close_outcome: Option<CloseOutcome>,

    /// Set after `drive` yields its terminal result. Subsequent calls return `None` without
    /// touching the transport.
    finished: bool,

    /// Reusable scratch the send pump reads body chunks into before framing as DATA.
    /// Sized at [`MAX_DATA_CHUNK_SIZE`] — even if the peer permits larger frames we cap our
    /// DATA emissions here to bound per-connection memory.
    body_scratch: Vec<u8>,

    /// Reusable scratch the HPACK encoder writes a HEADERS block into before it is copied
    /// into `write_buf` as HEADERS/CONTINUATION fragments. Retained across responses so the
    /// steady-state header encode allocates nothing.
    headers_scratch: Vec<u8>,

    /// Connection-level send flow-control window. Tracked as [`i64`] for symmetry with the
    /// per-stream windows, which a mid-connection `INITIAL_WINDOW_SIZE` reduction can drive
    /// temporarily negative; the connection window itself is *not* affected by
    /// `SETTINGS_INITIAL_WINDOW_SIZE`. Decremented as we emit DATA; incremented by peer
    /// `WINDOW_UPDATE(stream_id=0, inc)`. Overflow past [`MAX_FLOW_CONTROL_WINDOW`] is a
    /// connection-level `FLOW_CONTROL_ERROR`.
    ///
    /// Seeded in `new` from `INITIAL_CONNECTION_RECV_WINDOW`, the same constant that seeds
    /// the recv window.
    connection_send_window: i64,

    /// Connection-level recv flow-control window. Starts at the spec's baseline of 65535
    /// octets and is raised to the configured `h2_initial_connection_window_size` via an
    /// initial `WINDOW_UPDATE(0)` right after SETTINGS — the spec forbids SETTINGS from altering
    /// it, so WU is the only path. Decremented as peer DATA frames arrive (across all
    /// streams); incremented as the handler-task-side consumption signal is picked up and
    /// we emit `WINDOW_UPDATE(0, consumed)`. A negative value means the peer overran the
    /// window — connection-level `FLOW_CONTROL_ERROR`.
    connection_recv_window: i64,

    /// Bounded ledger of recently-closed streams and why they closed. Consulted by
    /// [`recv::H2Driver::finalize_headers`] when a HEADERS frame arrives on an id ≤
    /// `last_peer_stream_id` that's not in the active map, to distinguish `RST_STREAM`-
    /// closed (stream-level `STREAM_CLOSED`) from `END_STREAM`-closed or never-opened
    /// (connection-level). See [`ClosedStreams`] for the eviction policy.
    closed_streams: ClosedStreams,

    /// Snapshot of the h2-relevant fields of [`HttpConfig`][crate::HttpConfig] taken at
    /// acceptor construction. Copied in because `HttpConfig` is per-server but an acceptor
    /// is per-connection — the config is effectively immutable over a connection's
    /// lifetime, and a local copy avoids reaching through [`H2Connection::context`] on
    /// every policy check.
    ///
    /// [`H2Connection::context`]: super::H2Connection::context
    pub(super) config: AcceptorConfig,
}
193
194impl<T> H2Driver<T>
195where
196 T: AsyncRead + AsyncWrite + Unpin + Send,
197{
198 pub(super) fn new(connection: Arc<H2Connection>, transport: T, role: Role) -> Self {
199 let shutting_down = connection.swansong().shutting_down();
200 let context = connection.context();
201 let config = AcceptorConfig::from_http_config(context.config());
202 let hpack_encoder = HpackEncoder::new(
203 context.observer.clone(),
204 context.config.dynamic_table_capacity(),
205 context.config.recent_pairs_size(),
206 );
207 Self {
208 connection,
209 transport,
210 role,
211 state: DriverState::AwaitingPreface,
212 shutting_down,
213 read_buf: vec![0u8; FRAME_HEADER_LEN],
214 read_filled: 0,
215 read_phase: ReadPhase::NeedHeader,
216 write_buf: Vec::new(),
217 write_cursor: 0,
218 write_flush_pending: false,
219 hpack: HpackDecoder::new(config.hpack_table_capacity()),
220 hpack_encoder,
221 streams: BTreeMap::new(),
222 last_peer_stream_id: 0,
223 pending_headers: None,
224 close_outcome: None,
225 finished: false,
226 body_scratch: vec![0u8; MAX_DATA_CHUNK_SIZE as usize],
227 headers_scratch: Vec::new(),
228 connection_send_window: INITIAL_CONNECTION_RECV_WINDOW,
229 connection_recv_window: INITIAL_CONNECTION_RECV_WINDOW,
230 closed_streams: ClosedStreams::default(),
231 config,
232 }
233 }
234
235 /// The shared [`H2Connection`] this acceptor was created from.
236 pub fn connection(&self) -> &Arc<H2Connection> {
237 &self.connection
238 }
239
240 /// Drive the connection until the next request stream opens, the connection ends, or a
241 /// fatal protocol or I/O error occurs.
242 ///
243 /// Returns `Ok(Some(conn))` for each new request stream — the runtime adapter is
244 /// expected to spawn a handler task that consumes the [`Conn`]. Malformed requests are
245 /// handled internally with a stream-level `RST_STREAM` and never surfaced. Returns
246 /// `Ok(None)` when the connection has been shut down cleanly (peer GOAWAY, our own
247 /// swansong shutdown, peer EOF at a frame boundary).
248 ///
249 /// # Errors
250 ///
251 /// The returned future resolves to an [`H2Error`] for any *connection-level* protocol
252 /// violation detected while decoding peer frames or for an unrecoverable transport I/O
253 /// error. A final GOAWAY is sent before a protocol error is returned (best-effort; I/O
254 /// errors skip it).
255 // Mirrors `StreamExt::next` (a `&mut self -> impl Future<Output = Option<T>>` adapter),
256 // not `Iterator::next`. The driver is also `Stream`, so callers can use either.
257 #[allow(clippy::should_implement_trait)]
258 pub fn next(&mut self) -> Next<'_, T> {
259 Next { driver: self }
260 }
261
262 /// Poll-based driver core. Shared by [`Next`]'s `Future` impl, the [`Stream`] impl on
263 /// [`H2Driver`], and [`H2Initiator`][super::H2Initiator]'s client-side Future impl.
264 ///
265 /// [`Stream`]: futures_lite::stream::Stream
266 #[allow(
267 clippy::too_many_lines,
268 reason = "state-machine orchestration; splitting muddies the read-as-a-recipe shape"
269 )]
270 pub(super) fn drive(
271 &mut self,
272 cx: &mut Context<'_>,
273 ) -> Poll<Option<Result<Conn<H2Transport>, H2Error>>> {
274 if self.finished {
275 return Poll::Ready(None);
276 }
277
278 for loop_number in 0..self.config.copy_loops_per_yield() {
279 log::trace!("h2 drive loop number: {loop_number}");
280 // 1. Conn-task signals. Picks up window-update intent (`is_reading`) and new
281 // `submit_send` submissions, moving them into driver-private state.
282 self.service_handler_signals();
283
284 // 2. Send pump. Turns picked-up SendCursors into HEADERS / DATA / trailing- HEADERS
285 // frame bytes in `write_buf`. Body reads that return Pending leave the cursor in
286 // place — the body's source will wake the driver task.
287 self.advance_outbound_sends(cx);
288
289 // 3. Flush any pending outbound — never re-poll reads when we still owe bytes to the
290 // peer, and never signal closure to the caller before the wire is clean.
291 match self.poll_flush_outbound(cx) {
292 Poll::Ready(Ok(())) => {}
293 Poll::Ready(Err(e)) => {
294 // Flush failure while closing: just take whatever outcome we had and
295 // shelve the fresh I/O error. While running, record and finish.
296 if self.close_outcome.is_none() {
297 self.close_outcome = Some(CloseOutcome::Io(e));
298 }
299 return Poll::Ready(self.finish_with_current_outcome());
300 }
301 Poll::Pending => return Poll::Pending,
302 }
303
304 // 4. If we were closing, outbound is now drained. For graceful (or protocol-error)
305 // shutdowns, transition to `Drained` and wait for the peer to close its write half —
306 // otherwise the peer sees our drop as a reset rather than a clean close. For
307 // I/O-error shutdowns the transport is already untrustworthy, so skip the drain.
308 // Defer the transition while in-flight streams still have outbound (an active
309 // SendCursor or queued parts), an open send half (a handler that hasn't submitted
310 // its response yet — half-closed-remote is *not* drained), OR inbound (recv half not
311 // yet closed) work. Without this, a handler that submits trailers *after* the
312 // cancellation race resolves gets stranded with bytes parked in mailboxes; a handler
313 // that hasn't responded yet when shutdown begins has its response `SubmitSend`
314 // orphaned by a driver that finished out from under it; and a client receiving
315 // GOAWAY mid-stream stops decoding incoming frames before the server's trailing
316 // HEADERS arrive. Falls through to step 6 so the recv pump (also gated on
317 // Running|Closing now) keeps running and parks on the transport read waker rather
318 // than the outbound-only `park` here.
319 if self.state == DriverState::Closing {
320 if matches!(self.close_outcome, Some(CloseOutcome::Io(_))) {
321 return Poll::Ready(self.finish_with_current_outcome());
322 }
323 if self.has_active_send_cursors()
324 || self.has_open_send_half()
325 || self.has_pending_recv()
326 {
327 self.log_closing_blockers();
328 } else {
329 self.set_state(
330 DriverState::Drained,
331 "outbound drained, no in-flight streams",
332 );
333 }
334 }
335
336 // 5. Server-initiated shutdown check. Only relevant while we're running — once we're
337 // past the Closing/Drained transition we've already committed to a close and
338 // re-observing the swansong here would re-enter begin_close in a loop. Post-shutdown
339 // re-polls of `ShuttingDown` are harmless themselves (event_listener-backed, not
340 // single-shot) but the re-entry isn't.
341 if self.state == DriverState::Running
342 && Pin::new(&mut self.shutting_down).poll(cx).is_ready()
343 {
344 self.begin_close(CloseOutcome::Graceful);
345 continue;
346 }
347
348 // 6. State-specific step.
349 match self.state {
350 DriverState::AwaitingPreface => {
351 // Role-asymmetric: server reads the 24-byte preface off the wire; client
352 // writes it to `write_buf` (the next drain tick flushes it, then our
353 // SETTINGS, then the peer's SETTINGS arrives as the first frame in Running).
354 let poll = match self.role {
355 Role::Server => self.poll_read_preface(cx),
356 Role::Client => {
357 self.queue_client_preface();
358 Poll::Ready(Ok(()))
359 }
360 };
361 match poll {
362 Poll::Ready(Ok(())) => {
363 self.set_state(DriverState::NeedsServerSettings, "preface complete");
364 }
365 Poll::Ready(Err(e)) => {
366 self.close_outcome = Some(e);
367 return Poll::Ready(self.finish_with_current_outcome());
368 }
369 Poll::Pending => {
370 if self.park(cx) {
371 return Poll::Pending;
372 }
373 }
374 }
375 }
376
377 DriverState::NeedsServerSettings => {
378 self.queue_settings();
379 // The spec forbids SETTINGS from altering the connection-level
380 // flow-control window — it stays at the 65535 baseline unless we raise
381 // it via `WINDOW_UPDATE(0)`. Do that immediately after SETTINGS so peer
382 // bulk uploads aren't capped at ~5 Mbit/s × RTT.
383 let raise = i64::from(self.config.initial_connection_window_size())
384 - INITIAL_CONNECTION_RECV_WINDOW;
385 if raise > 0 {
386 let raise = u32::try_from(raise).unwrap_or(u32::MAX);
387 self.queue_window_update(0, raise);
388 self.connection_recv_window += i64::from(raise);
389 }
390 self.set_state(DriverState::Running, "initial SETTINGS queued");
391 }
392
393 // Read pump runs in both Running and Closing so a Closing-side driver
394 // (we sent or received GOAWAY) keeps decoding inbound frames for streams
395 // that haven't reached recv-closed yet — e.g. trailing HEADERS for an
396 // in-flight server-stream the peer is about to send. New `Action::Emit`
397 // streams are ignored in Closing: post-GOAWAY the peer shouldn't be
398 // opening new ones (and we wouldn't want to dispatch handlers for them
399 // even if it did).
400 DriverState::Running | DriverState::Closing => match self.poll_advance_read(cx) {
401 Poll::Ready(Ok(Action::Continue)) => {}
402 Poll::Ready(Ok(Action::Emit(conn))) => {
403 if self.state == DriverState::Running {
404 return Poll::Ready(Some(Ok(*conn)));
405 }
406 // Closing — drop the conn; outer loop continues processing
407 // remaining in-flight streams until drained.
408 }
409 Poll::Ready(Ok(Action::Close(outcome))) => {
410 self.begin_close(outcome);
411 }
412 // Protocol errors need a GOAWAY on the wire before we terminate;
413 // `begin_close` queues that and transitions us to Closing so the next
414 // outer-loop iteration drains the frame. Io errors short-circuit:
415 // if we're already Closing, the transport is gone, so finish without
416 // looping forever waiting for in-flight streams (`has_pending_recv`
417 // can't decide on its own that the peer is never sending again).
418 Poll::Ready(Err(e)) => {
419 if self.state == DriverState::Closing {
420 self.close_outcome.get_or_insert(e);
421 return Poll::Ready(self.finish_with_current_outcome());
422 }
423 self.begin_close(e);
424 }
425 Poll::Pending => {
426 if self.park(cx) {
427 return Poll::Pending;
428 }
429 }
430 },
431
432 DriverState::Drained => match self.poll_drain_peer(cx) {
433 Poll::Ready(()) => {
434 return Poll::Ready(self.finish_with_current_outcome());
435 }
436 Poll::Pending => return Poll::Pending,
437 },
438 }
439 }
440
441 // Cooperative yield: we made `copy_loops_per_yield` rounds of progress without
442 // hitting an internal Pending. Re-arm immediately and let the runtime pick up
443 // anything else it has waiting before we resume.
444 cx.waker().wake_by_ref();
445 Poll::Pending
446 }
447
448 /// Register the driver's waker with the shared `outbound_waker` (so handler tasks can
449 /// wake the driver) and tell the caller whether it's safe to park. Returns `true` if
450 /// the driver should return `Poll::Pending`, or `false` if a handler produced work
451 /// between our last check and the registration — in which case the caller should loop
452 /// around to pick it up.
453 fn park(&mut self, cx: &mut Context<'_>) -> bool {
454 self.connection.outbound_waker().register(cx.waker());
455 !self.has_pending_handler_signals() && !self.has_pending_outbound_progress()
456 }
457
458 /// Convert the current `close_outcome` into the terminal return of [`Self::drive`]. Must
459 /// only be called after outbound bytes have been flushed. Graceful closes return `None`;
460 /// errors surface as a final `Some(Err(...))` before subsequent polls return `None`.
461 fn finish_with_current_outcome(&mut self) -> Option<Result<Conn<H2Transport>, H2Error>> {
462 self.finished = true;
463 // Complete every outstanding `H2Connection::send_ping` future with an error so
464 // awaiting callers don't block forever. Safe to call regardless of outcome —
465 // a no-op if no pings are in flight.
466 self.connection.fail_pending_pings(
467 io::ErrorKind::ConnectionAborted,
468 "h2 connection closed before PING ACK",
469 );
470 // Wake any `PeerSettings` waiters so a peer that disconnects without ever sending
471 // SETTINGS doesn't strand them. Their `poll` rechecks swansong state and returns
472 // Ready; the caller's follow-up operation surfaces the connection-closed error.
473 self.connection.wake_peer_settings_waiters();
474 // Resolve every still-live stream's recv-side waiters. A connection that dies with
475 // an in-flight stream (server GOAWAY + close, peer FIN, I/O error) leaves any task
476 // parked on the response — `response_headers`, a body `poll_read`, an upgrade
477 // `poll_write` — with no other wake source. Without this a client request hangs
478 // forever on a graceful server shutdown. Mirror the per-stream RST teardown:
479 // terminal `Reset` (recv reports eof → `ResponseHeaders` yields `ConnectionAborted`,
480 // reads return EOF, writes `BrokenPipe`) + the same waker fan-out.
481 let reset_code = match &self.close_outcome {
482 Some(CloseOutcome::Protocol(code)) => *code,
483 _ => H2ErrorCode::NoError,
484 };
485 for entry in self.streams.values() {
486 // Move each still-live stream to `Closed{Reset}` (a no-op on streams already closed, so
487 // an existing reason isn't clobbered), then fan out every recv/send waker so parked
488 // tasks observe the close instead of hanging.
489 let _ = entry.shared.apply_event(StreamEvent::RecvReset(reset_code));
490 entry.shared.recv.waker.wake();
491 entry.shared.recv.response_headers_waker.wake();
492 entry.shared.send.outbound_write_waker.wake();
493 // A handler already parked in `SubmitSend` (response staged, awaiting the driver to
494 // frame it) needs this wake to re-poll and observe the now-reset stream — the recv
495 // fan-out above doesn't reach the send-completion waiter.
496 entry.shared.send.completion_waker.wake();
497 }
498 match self.close_outcome.take() {
499 None | Some(CloseOutcome::Graceful) => None,
500 Some(CloseOutcome::Protocol(code)) => Some(Err(H2Error::Protocol(code))),
501 Some(CloseOutcome::Io(e)) => Some(Err(H2Error::Io(e))),
502 }
503 }
504
505 /// Enter the closing state: record the outcome and queue a GOAWAY (only for outcomes
506 /// that warrant one). The main loop will drain `write_buf` and then finish.
507 fn begin_close(&mut self, outcome: CloseOutcome) {
508 // Idempotent: with the recv pump now running in Closing (so we keep
509 // decoding inbound frames for in-flight streams across GOAWAY), a peer
510 // GOAWAY arriving after we've already begun closing would otherwise
511 // re-queue our own GOAWAY and re-enter Closing, ping-ponging forever
512 // with a peer that mirrors the behavior.
513 if self.state == DriverState::Closing || self.state == DriverState::Drained {
514 log::trace!(
515 "h2 driver: begin_close({outcome:?}) — already in {:?}, ignoring",
516 self.state,
517 );
518 return;
519 }
520 // Don't overwrite a prior outcome (e.g. if an error fires in the middle of a
521 // graceful shutdown, keep the error).
522 let code = match &outcome {
523 CloseOutcome::Graceful => Some(H2ErrorCode::NoError),
524 CloseOutcome::Protocol(code) => Some(*code),
525 CloseOutcome::Io(_) => None,
526 };
527 let reason = match &outcome {
528 CloseOutcome::Graceful => "graceful close",
529 CloseOutcome::Protocol(_) => "protocol error",
530 CloseOutcome::Io(_) => "i/o error",
531 };
532 if self.close_outcome.is_none() {
533 self.close_outcome = Some(outcome);
534 }
535 if let Some(code) = code {
536 self.queue_goaway(self.last_peer_stream_id, code);
537 }
538 self.set_state(DriverState::Closing, reason);
539 }
540
541 /// The sole mutator of `self.state`. Logs every transition so a trace log reads as
542 /// a sequence of named lifecycle events.
543 fn set_state(&mut self, new: DriverState, reason: &'static str) {
544 if self.state == new {
545 return;
546 }
547 log::trace!(
548 "h2 driver: state {old:?} → {new:?} ({reason})",
549 old = self.state,
550 );
551 self.state = new;
552 }
553
554 /// Log which in-flight streams are blocking the `Closing → Drained` transition.
555 /// Called from the closing-state check when at least one predicate (`has_active_send_cursors`
556 /// or `has_pending_recv`) is still true, so a trace log shows exactly which streams the
557 /// driver is waiting on.
558 fn log_closing_blockers(&self) {
559 if !log::log_enabled!(log::Level::Trace) {
560 return;
561 }
562 for (id, entry) in &self.streams {
563 let lifecycle = *entry.shared.lifecycle_lock();
564 let queued = !entry
565 .shared
566 .send
567 .queue
568 .lock()
569 .expect("send queue mutex poisoned")
570 .is_empty();
571 if entry.send.is_some() || queued || !lifecycle.recv_closed() {
572 log::trace!(
573 "h2 driver: Closing — stream {id} blocking drain (lifecycle={lifecycle:?}, \
574 cursor_present={}, queued={queued})",
575 entry.send.is_some(),
576 );
577 }
578 }
579 }
580
581 /// Read bytes from the transport into `read_buf[read_filled..target]` until
582 /// `read_filled >= target`. Cancel-safe: if the caller drops the Future, any bytes
583 /// already placed are preserved in the buffer.
584 ///
585 /// A 0-byte read is surfaced as `UnexpectedEof`. The caller maps this to a terminal
586 /// I/O error; we don't emit a GOAWAY on peer-initiated close.
587 fn poll_fill_to(&mut self, target: usize, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
588 if self.read_buf.len() < target {
589 self.read_buf.resize(target, 0);
590 }
591 while self.read_filled < target {
592 let n = ready!(
593 Pin::new(&mut self.transport)
594 .poll_read(cx, &mut self.read_buf[self.read_filled..target])
595 )?;
596 if n == 0 {
597 return Poll::Ready(Err(io::Error::from(io::ErrorKind::UnexpectedEof)));
598 }
599 self.read_filled += n;
600 }
601 Poll::Ready(Ok(()))
602 }
603
604 /// Post-GOAWAY, drain whatever inbound bytes are *immediately* available from the
605 /// peer so our Drop sends a clean FIN (no unread data → no TCP RST) while the peer
606 /// sees the GOAWAY we just emitted. Read loops internally: consume each Ready chunk,
607 /// discard it, ask for more. Exits as soon as the transport returns `Pending` (no
608 /// bytes available right now) OR `Ready(0)` (peer FIN already arrived) OR any error.
609 ///
610 /// Does **not** register the waker on `Pending` — we're actively closing, not
611 /// observing the peer. A peer that happens to send more bytes after our exit will
612 /// have those bytes dropped when the transport is closed; that's a race the peer
613 /// chose to lose by sending after receiving our GOAWAY.
614 ///
615 /// Returning `Ready(())` unconditionally (no `Pending` case) lets the caller finalize
616 /// immediately. The `Poll` wrapper is kept for symmetry with the rest of the driver's
617 /// poll-style methods.
618 fn poll_drain_peer(&mut self, cx: &mut Context<'_>) -> Poll<()> {
619 // A peer flooding us with bytes could keep this loop going a long time. Cap it
620 // so a pathological client can't pin our close-out forever.
621 const MAX_DISCARD_ITERATIONS: usize = 256;
622 // Lightweight scratch — we're throwing it away. 512 balances "drain in few
623 // iterations" against "don't hold a large buffer for a rare path."
624 let mut scratch = [0u8; 512];
625 for _ in 0..MAX_DISCARD_ITERATIONS {
626 // We pass `cx` through for the benefit of the transport's `poll_read` contract,
627 // but we *interpret* `Pending` as "done draining" rather than parking on it —
628 // we're actively closing, not observing. A peer that sends more bytes after
629 // our exit loses the race.
630 match Pin::new(&mut self.transport).poll_read(cx, &mut scratch) {
631 Poll::Ready(Ok(0) | Err(_)) | Poll::Pending => {
632 return Poll::Ready(());
633 }
634 Poll::Ready(Ok(_)) => {}
635 }
636 }
637 Poll::Ready(())
638 }
639
640 /// Look up why a stream is closed. `None` means either never-opened or evicted from the
641 /// bounded ledger — both fall through to the connection-level default.
642 pub(super) fn closed_reason(&self, stream_id: u32) -> Option<ClosedReason> {
643 self.closed_streams.reason(stream_id)
644 }
645}