Skip to main content

trillium_http/h2/
acceptor.rs

//! HTTP/2 driver loop ([`H2Driver`]) — owns the per-connection TCP transport and runs the
//! poll-based state machine that demuxes frames, dispatches stream-opens to handler tasks, and
//! pumps responses back out.
//!
//! Created by [`H2Connection::run`]. The runtime adapter calls [`H2Driver::next`] in a
//! loop (or drives via the [`Stream`] impl, which has the same semantics); each yield either
//! returns the next opened request stream (a [`Conn`] for the runtime to spawn a handler
//! task against) or `None` when the connection is closed.
//!
//! The driver is a poll-based state machine, not an async fn. A single `drive` call is the
//! unit of forward progress: it picks up conn-task signals, advances any in-flight response
//! sends, drains pending outbound bytes, and advances the read cursor — parking with
//! cancel-safe partial state when no further progress can be made.
//!
//! # Module layout
//!
//! Driver impl is split across this file and child modules to keep each focused:
//!
//! - **`acceptor.rs`** (this file): struct definition, the [`H2Driver::drive`] orchestration
//!   loop, and the I/O read primitives (`poll_fill_to`, `poll_drain_peer`).
//! - **`acceptor::types`**: the supporting types ([`DriverState`], [`ReadPhase`],
//!   [`CloseOutcome`], [`Action`], [`StreamEntry`]) plus `AcceptorConfig` and the `Next` future.
//! - **`acceptor::constants`**: protocol limits and defaults used by the driver (e.g.
//!   `MAX_DATA_CHUNK_SIZE`, `MAX_FLOW_CONTROL_WINDOW`).
//! - **`acceptor::inflow`**: [`Inflow`], the receive-side flow-control window accounting.
//! - **`acceptor::closed_streams`**: bounded ledger of recently-closed streams + reasons, consulted
//!   to pick the right §5.1 error category for stale peer frames.
//! - **`acceptor::handler_signals`**: conn-task → driver work-pickup boundary. Owns the
//!   `needs_servicing` mailbox protocol — `service_handler_signals`, `pick_up_new_client_streams`,
//!   `has_pending_handler_signals`.
//! - **`acceptor::outbound`**: outbound write/flush plumbing and `queue_*` frame helpers.
//! - **`acceptor::recv`**: receive side — frame reader, dispatch, HEADERS+CONTINUATION
//!   accumulation, malformed-request `RST_STREAM`, DATA routing into per-stream recv rings.
//! - **`acceptor::send`**: send pump — picks up [`SendCursor`][send::SendCursor]s from the
//!   conn-task signal pickup, frames HEADERS / DATA / trailing-HEADERS, signals completion.
//!
//! [`H2Connection::run`]: super::H2Connection::run
//! [`Stream`]: futures_lite::stream::Stream
35
36mod closed_streams;
37mod constants;
38mod handler_signals;
39mod inflow;
40mod outbound;
41mod recv;
42mod send;
43#[cfg(test)]
44mod tests;
45mod types;
46
47use super::{
48    H2Error, H2ErrorCode, connection::H2Connection, frame::FRAME_HEADER_LEN, role::Role,
49    stream_state::StreamEvent, transport::H2Transport,
50};
51use crate::{
52    Conn, Priority,
53    headers::hpack::{HpackDecoder, HpackEncoder},
54};
55use closed_streams::{ClosedReason, ClosedStreams};
56use constants::{
57    INITIAL_CONNECTION_RECV_WINDOW, MAX_BUFFER_SIZE, MAX_DATA_CHUNK_SIZE, MAX_FLOW_CONTROL_WINDOW,
58};
59use futures_lite::io::{AsyncRead, AsyncWrite};
60use inflow::Inflow;
61use recv::PendingHeaders;
62use send::ScheduleEntry;
63use std::{
64    collections::{BTreeMap, HashMap},
65    future::Future,
66    io,
67    pin::Pin,
68    sync::Arc,
69    task::{Context, Poll, ready},
70};
71use swansong::ShuttingDown;
72use types::{
73    AcceptorConfig, Action, CloseOutcome, DriverState, Next, ReadPhase, StreamEntry, frame_slice,
74};
75
/// Owns the per-connection TCP transport and drives the HTTP/2 demux loop.
///
/// See the module docs for the high-level driver shape and how its impl is split across this
/// file and its child modules.
#[derive(Debug)]
pub struct H2Driver<T> {
    /// The shared [`H2Connection`] this driver was created from. Also the channel through
    /// which handler tasks wake the driver (see `park`).
    connection: Arc<H2Connection>,

    /// The underlying byte transport. All inbound reads (`poll_fill_to`, `poll_drain_peer`)
    /// go through this field.
    transport: T,

    /// Role this driver runs in — see [`Role`]. Consulted at role-asymmetric branch points
    /// (preface direction, HEADERS-on-unknown-id, HEADERS-on-known-id).
    role: Role,

    /// Overall lifecycle position of the driver. Starts at `DriverState::AwaitingPreface`;
    /// mutated only through `set_state`.
    state: DriverState,

    /// Future that resolves when the shared `Swansong` begins shutdown. Polled each
    /// `drive` tick while the driver is running; on resolution the driver queues a
    /// GOAWAY and transitions to `Closing`, after which the `Running`-only guard in `drive`
    /// skips it and we never poll this again on the same acceptor.
    shutting_down: ShuttingDown,

    /// Inbound byte cursor. Accumulates bytes from the transport across `drive` calls so
    /// a partial frame read can survive a return to `Poll::Pending`. Always contains
    /// exactly the bytes of the current frame being accumulated (header, then payload);
    /// reset after each complete frame is dispatched.
    read_buf: Vec<u8>,
    /// Number of bytes at the front of `read_buf` that have been filled from the transport
    /// so far (advanced by `poll_fill_to`).
    read_filled: usize,
    /// Position of the read cursor within the current frame; starts at
    /// [`ReadPhase::NeedHeader`]. NOTE(review): the remaining phases live in `types` — confirm
    /// there.
    read_phase: ReadPhase,

    /// Outbound byte cursor. The driver encodes control frames into `write_buf` and drains
    /// to the transport via `poll_flush_outbound`. `write_cursor` is the offset of the
    /// first byte not yet accepted by `poll_write`. After the buffer fully drains, both
    /// fields are reset and a flush is issued.
    write_buf: Vec<u8>,
    /// Offset into `write_buf` of the first byte not yet accepted by the transport.
    write_cursor: usize,
    /// NOTE(review): presumably records that a transport flush is still owed after
    /// `write_buf` drained — confirm against `outbound`.
    write_flush_pending: bool,

    /// HPACK decoder state, shared across all header blocks on this connection.
    hpack: HpackDecoder,

    /// HPACK encoder state. The driver is the sole owner — handlers / conn tasks
    /// no longer touch it, so this is a plain field with no synchronization.
    hpack_encoder: HpackEncoder,

    /// Per-stream state, keyed by stream id. Driver-only — handler tasks hold their own
    /// `Arc<StreamState>` via [`H2Transport`] and don't consult this table. The entry
    /// bundles the shared state with driver-private bookkeeping (e.g. "have we already
    /// advertised the recv window after seeing `is_reading`?").
    ///
    /// A `BTreeMap` (not a hash map) so the send pump iterates streams in ascending
    /// stream-id order. For the client role this is load-bearing: a client MUST send
    /// opening HEADERS in monotonically increasing stream-id order (RFC 9113 §5.1.1),
    /// and concurrent `open_stream` calls would otherwise let the pump frame a higher
    /// id before a lower one, drawing a `GOAWAY(PROTOCOL_ERROR)` from the peer. (See
    /// also the allocate-under-`streams_lock` ordering in `open_stream`.)
    streams: BTreeMap<u32, StreamEntry>,

    /// Highest peer-initiated stream id seen so far. Peer-initiated (client) stream ids
    /// must be odd and strictly increasing. Also the "last stream id" carried by the GOAWAY
    /// queued in `begin_close`.
    last_peer_stream_id: u32,

    /// Latest RFC 9218 priority signaled per stream via `PRIORITY_UPDATE`, keyed by stream
    /// id. Holds entries for streams not yet opened too (an update can precede its HEADERS),
    /// so it is bounded by [`recv::MAX_TRACKED_PRIORITIES`] as a `DoS` guard. Latest signal
    /// wins; the request's own `priority` header (on the [`StreamEntry`]) is the fallback when
    /// no entry is present. See [`Self::effective_priority`].
    ///
    /// Lookups are by stream id and nothing iterates this in order, so it's a hash map.
    stream_priorities: HashMap<u32, Priority>,

    /// Reused scratch for the send pump's priority schedule ([`ScheduleEntry`] per active stream),
    /// rebuilt and sorted each pump tick. Kept on the driver (rather than allocated per tick) so
    /// the per-tick cost on this hot path is a clear + extend, not a fresh allocation. Empty
    /// between ticks. See [`Self::advance_outbound_sends`].
    send_schedule: Vec<ScheduleEntry>,

    /// Accumulator for an in-progress HEADERS block that is waiting on further CONTINUATION
    /// frames. `None` outside a HEADERS block. The spec forbids any frame on any stream
    /// from interleaving while this is `Some`.
    pending_headers: Option<PendingHeaders>,

    /// Set once the driver decides to close: graceful (peer GOAWAY / server swansong / peer
    /// EOF) or erroring (protocol violation → GOAWAY with code, or I/O failure → no
    /// GOAWAY). `drive` completes (returns `None` or a final `Some(Err(...))`) once
    /// outbound drains to empty.
    close_outcome: Option<CloseOutcome>,

    /// Set after `drive` yields its terminal result. Subsequent calls return `None` without
    /// touching the transport.
    finished: bool,

    /// Reusable scratch the send pump reads body chunks into before framing as DATA.
    /// Sized at [`MAX_DATA_CHUNK_SIZE`] — even if the peer permits larger frames we cap our
    /// DATA emissions here to bound per-connection memory.
    body_scratch: Vec<u8>,

    /// Reusable scratch the HPACK encoder writes a HEADERS block into before it is copied
    /// into `write_buf` as HEADERS/CONTINUATION fragments. Retained across responses so the
    /// steady-state header encode allocates nothing.
    headers_scratch: Vec<u8>,

    /// Connection-level send flow-control window. Tracked as [`i64`] for symmetry with the
    /// per-stream windows, which a mid-connection `INITIAL_WINDOW_SIZE` reduction can drive
    /// temporarily negative; the connection window itself is *not* affected by
    /// `SETTINGS_INITIAL_WINDOW_SIZE`. Decremented as we emit DATA; incremented by peer
    /// `WINDOW_UPDATE(stream_id=0, inc)`. Overflow past [`MAX_FLOW_CONTROL_WINDOW`] is a
    /// connection-level `FLOW_CONTROL_ERROR`.
    connection_send_window: i64,

    /// Connection-level receive flow-control window. Starts at the spec's 65535-octet baseline
    /// (the spec forbids SETTINGS from altering it) and is promoted to the configured
    /// `h2_initial_connection_window_size` via an initial `WINDOW_UPDATE(0)` right after SETTINGS,
    /// then topped up as handlers drain across all streams. See [`Inflow`]. A peer that sends past
    /// the granted window earns a connection-level `FLOW_CONTROL_ERROR`.
    connection_inflow: Inflow,

    /// Bounded ledger of recently-closed streams and why they closed. Consulted by
    /// [`recv::H2Driver::finalize_headers`] when a HEADERS frame arrives on an id ≤
    /// `last_peer_stream_id` that's not in the active map, to distinguish `RST_STREAM`-
    /// closed (stream-level `STREAM_CLOSED`) from `END_STREAM`-closed or never-opened
    /// (connection-level). See [`ClosedStreams`] for the eviction policy.
    closed_streams: ClosedStreams,

    /// Snapshot of the h2-relevant fields of [`HttpConfig`][crate::HttpConfig] taken at
    /// acceptor construction. Copied in because `HttpConfig` is per-server but an acceptor
    /// is per-connection — the config is effectively immutable over a connection's
    /// lifetime, and a local copy avoids reaching through [`H2Connection::context`] on
    /// every policy check.
    ///
    /// [`H2Connection::context`]: super::H2Connection::context
    pub(super) config: AcceptorConfig,
}
209
210impl<T> H2Driver<T>
211where
212    T: AsyncRead + AsyncWrite + Unpin + Send,
213{
214    pub(super) fn new(connection: Arc<H2Connection>, transport: T, role: Role) -> Self {
215        let shutting_down = connection.swansong().shutting_down();
216        let context = connection.context();
217        let config = AcceptorConfig::from_http_config(context.config());
218        let hpack_encoder = HpackEncoder::new(
219            context.observer.clone(),
220            context.config.dynamic_table_capacity(),
221            context.config.recent_pairs_size(),
222        );
223        Self {
224            connection,
225            transport,
226            role,
227            state: DriverState::AwaitingPreface,
228            shutting_down,
229            read_buf: vec![0u8; FRAME_HEADER_LEN],
230            read_filled: 0,
231            read_phase: ReadPhase::NeedHeader,
232            write_buf: Vec::new(),
233            write_cursor: 0,
234            write_flush_pending: false,
235            hpack: HpackDecoder::new(config.hpack_table_capacity()),
236            hpack_encoder,
237            streams: BTreeMap::new(),
238            last_peer_stream_id: 0,
239            stream_priorities: HashMap::new(),
240            send_schedule: Vec::new(),
241            pending_headers: None,
242            close_outcome: None,
243            finished: false,
244            body_scratch: vec![0u8; MAX_DATA_CHUNK_SIZE as usize],
245            headers_scratch: Vec::new(),
246            connection_send_window: INITIAL_CONNECTION_RECV_WINDOW,
247            connection_inflow: Inflow::new(INITIAL_CONNECTION_RECV_WINDOW),
248            closed_streams: ClosedStreams::default(),
249            config,
250        }
251    }
252
253    /// The shared [`H2Connection`] this acceptor was created from.
254    pub fn connection(&self) -> &Arc<H2Connection> {
255        &self.connection
256    }
257
258    /// Drive the connection until the next request stream opens, the connection ends, or a
259    /// fatal protocol or I/O error occurs.
260    ///
261    /// Returns `Ok(Some(conn))` for each new request stream — the runtime adapter is
262    /// expected to spawn a handler task that consumes the [`Conn`]. Malformed requests are
263    /// handled internally with a stream-level `RST_STREAM` and never surfaced. Returns
264    /// `Ok(None)` when the connection has been shut down cleanly (peer GOAWAY, our own
265    /// swansong shutdown, peer EOF at a frame boundary).
266    ///
267    /// # Errors
268    ///
269    /// The returned future resolves to an [`H2Error`] for any *connection-level* protocol
270    /// violation detected while decoding peer frames or for an unrecoverable transport I/O
271    /// error. A final GOAWAY is sent before a protocol error is returned (best-effort; I/O
272    /// errors skip it).
273    // Mirrors `StreamExt::next` (a `&mut self -> impl Future<Output = Option<T>>` adapter),
274    // not `Iterator::next`. The driver is also `Stream`, so callers can use either.
275    #[allow(clippy::should_implement_trait)]
276    pub fn next(&mut self) -> Next<'_, T> {
277        Next { driver: self }
278    }
279
    /// Poll-based driver core. Shared by [`Next`]'s `Future` impl, the [`Stream`] impl on
    /// [`H2Driver`], and [`H2Initiator`][super::H2Initiator]'s client-side Future impl.
    ///
    /// Runs at most `config.copy_loops_per_yield()` iterations of the numbered recipe below
    /// per call. Resolves as follows:
    ///
    /// - `Ready(Some(Ok(conn)))` — the read pump produced a new request stream while `Running`.
    /// - `Ready(None)` / `Ready(Some(Err(_)))` — the terminal result produced by
    ///   `finish_with_current_outcome`; every later call returns `Ready(None)` immediately.
    /// - `Pending` — either a sub-step parked (outbound flush, read pump, preface read), or
    ///   the iteration budget ran out, in which case the waker is re-armed before returning.
    ///
    /// [`Stream`]: futures_lite::stream::Stream
    #[allow(
        clippy::too_many_lines,
        reason = "state-machine orchestration; splitting muddies the read-as-a-recipe shape"
    )]
    pub(super) fn drive(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Conn<H2Transport>, H2Error>>> {
        // Terminal result already yielded: never touch the transport again.
        if self.finished {
            return Poll::Ready(None);
        }

        for loop_number in 0..self.config.copy_loops_per_yield() {
            log::trace!("h2 drive loop number: {loop_number}");
            // 1. Conn-task signals. Picks up window-update intent (`is_reading`) and new
            //    `submit_send` submissions, moving them into driver-private state.
            self.service_handler_signals();

            // 2. Send pump. Turns picked-up SendCursors into HEADERS / DATA / trailing-HEADERS
            //    frame bytes in `write_buf`. Body reads that return Pending leave the cursor in
            //    place — the body's source will wake the driver task.
            self.advance_outbound_sends(cx);

            // 3. Flush any pending outbound — never re-poll reads when we still owe bytes to the
            //    peer, and never signal closure to the caller before the wire is clean.
            match self.poll_flush_outbound(cx) {
                Poll::Ready(Ok(())) => {}
                Poll::Ready(Err(e)) => {
                    // Flush failure while closing: keep whatever outcome was already recorded
                    // and drop the fresh I/O error. With no prior outcome, record the I/O
                    // error. Either way the driver finishes now.
                    if self.close_outcome.is_none() {
                        self.close_outcome = Some(CloseOutcome::Io(e));
                    }
                    return Poll::Ready(self.finish_with_current_outcome());
                }
                Poll::Pending => return Poll::Pending,
            }

            // 4. If we were closing, outbound is now drained. For graceful (or protocol-error)
            //    shutdowns, transition to `Drained` and wait for the peer to close its write half —
            //    otherwise the peer sees our drop as a reset rather than a clean close. For
            //    I/O-error shutdowns the transport is already untrustworthy, so skip the drain.
            //    Defer the transition while in-flight streams still have outbound (an active
            //    SendCursor or queued parts), an open send half (a handler that hasn't submitted
            //    its response yet — half-closed-remote is *not* drained), OR inbound (recv half not
            //    yet closed) work. Without this, a handler that submits trailers *after* the
            //    cancellation race resolves gets stranded with bytes parked in mailboxes; a handler
            //    that hasn't responded yet when shutdown begins has its response `SubmitSend`
            //    orphaned by a driver that finished out from under it; and a client receiving
            //    GOAWAY mid-stream stops decoding incoming frames before the server's trailing
            //    HEADERS arrive. While blocked, control falls through to step 6 so the recv pump
            //    (which runs in both Running and Closing) keeps running and parks on the
            //    transport read waker.
            if self.state == DriverState::Closing {
                if matches!(self.close_outcome, Some(CloseOutcome::Io(_))) {
                    return Poll::Ready(self.finish_with_current_outcome());
                }
                if self.has_active_send_cursors()
                    || self.has_open_send_half()
                    || self.has_pending_recv()
                {
                    self.log_closing_blockers();
                } else {
                    self.set_state(
                        DriverState::Drained,
                        "outbound drained, no in-flight streams",
                    );
                }
            }

            // 5. Server-initiated shutdown check. Only relevant while we're running — once we're
            //    past the Closing/Drained transition we've already committed to a close and
            //    re-observing the swansong here would re-enter begin_close in a loop. Post-shutdown
            //    re-polls of `ShuttingDown` are harmless themselves (event_listener-backed, not
            //    single-shot) but the re-entry isn't. `continue` restarts the recipe so the
            //    GOAWAY queued by `begin_close` is flushed in step 3 of the next iteration.
            if self.state == DriverState::Running
                && Pin::new(&mut self.shutting_down).poll(cx).is_ready()
            {
                self.begin_close(CloseOutcome::Graceful);
                continue;
            }

            // 6. State-specific step.
            match self.state {
                DriverState::AwaitingPreface => {
                    // Role-asymmetric: server reads the 24-byte preface off the wire; client
                    // writes it to `write_buf` (the next drain tick flushes it, then our
                    // SETTINGS, then the peer's SETTINGS arrives as the first frame in Running).
                    let poll = match self.role {
                        Role::Server => self.poll_read_preface(cx),
                        Role::Client => {
                            self.queue_client_preface();
                            Poll::Ready(Ok(()))
                        }
                    };
                    match poll {
                        Poll::Ready(Ok(())) => {
                            self.set_state(DriverState::NeedsServerSettings, "preface complete");
                        }
                        // A failed preface finishes immediately with the reported outcome; no
                        // GOAWAY is queued on this path.
                        Poll::Ready(Err(e)) => {
                            self.close_outcome = Some(e);
                            return Poll::Ready(self.finish_with_current_outcome());
                        }
                        // `park` returns false when a handler produced work in the meantime;
                        // in that case loop around instead of sleeping.
                        Poll::Pending => {
                            if self.park(cx) {
                                return Poll::Pending;
                            }
                        }
                    }
                }

                DriverState::NeedsServerSettings => {
                    self.queue_settings();
                    // The spec forbids SETTINGS from altering the connection-level
                    // flow-control window — it stays at the 65535 baseline unless we raise
                    // it via `WINDOW_UPDATE(0)`. Do that immediately after SETTINGS so peer
                    // bulk uploads aren't capped at ~5 Mbit/s × RTT.
                    let raise = self
                        .connection_inflow
                        .raise_target(i64::from(self.config.initial_connection_window_size()));
                    if raise > 0 {
                        // Saturate rather than fail if the raise doesn't fit in a `u32`.
                        self.queue_window_update(0, u32::try_from(raise).unwrap_or(u32::MAX));
                    }
                    self.set_state(DriverState::Running, "initial SETTINGS queued");
                }

                // Read pump runs in both Running and Closing so a Closing-side driver
                // (we sent or received GOAWAY) keeps decoding inbound frames for streams
                // that haven't reached recv-closed yet — e.g. trailing HEADERS for an
                // in-flight server-stream the peer is about to send. New `Action::Emit`
                // streams are ignored in Closing: post-GOAWAY the peer shouldn't be
                // opening new ones (and we wouldn't want to dispatch handlers for them
                // even if it did).
                DriverState::Running | DriverState::Closing => match self.poll_advance_read(cx) {
                    Poll::Ready(Ok(Action::Continue)) => {}
                    Poll::Ready(Ok(Action::Emit(conn))) => {
                        if self.state == DriverState::Running {
                            return Poll::Ready(Some(Ok(*conn)));
                        }
                        // Closing — drop the conn; outer loop continues processing
                        // remaining in-flight streams until drained.
                    }
                    Poll::Ready(Ok(Action::Close(outcome))) => {
                        self.begin_close(outcome);
                    }
                    // While Running, an error goes through `begin_close`, which queues a
                    // GOAWAY where the outcome warrants one and moves us to Closing so the
                    // next outer-loop iteration drains the frame. While already Closing,
                    // `begin_close` would ignore the error (it is idempotent once closing),
                    // so finish right away instead: the first recorded outcome wins, and we
                    // avoid looping forever waiting for in-flight streams (`has_pending_recv`
                    // can't decide on its own that the peer is never sending again).
                    Poll::Ready(Err(e)) => {
                        if self.state == DriverState::Closing {
                            self.close_outcome.get_or_insert(e);
                            return Poll::Ready(self.finish_with_current_outcome());
                        }
                        self.begin_close(e);
                    }
                    Poll::Pending => {
                        if self.park(cx) {
                            return Poll::Pending;
                        }
                    }
                },

                DriverState::Drained => match self.poll_drain_peer(cx) {
                    Poll::Ready(()) => {
                        return Poll::Ready(self.finish_with_current_outcome());
                    }
                    // NOTE(review): `poll_drain_peer` as written always returns `Ready`, so
                    // this arm is currently unreachable; it exists for match exhaustiveness.
                    Poll::Pending => return Poll::Pending,
                },
            }
        }

        // Cooperative yield: we made `copy_loops_per_yield` rounds of progress without
        // hitting an internal Pending. Re-arm immediately and let the runtime pick up
        // anything else it has waiting before we resume.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
464
465    /// Register the driver's waker with the shared `outbound_waker` (so handler tasks can
466    /// wake the driver) and tell the caller whether it's safe to park. Returns `true` if
467    /// the driver should return `Poll::Pending`, or `false` if a handler produced work
468    /// between our last check and the registration — in which case the caller should loop
469    /// around to pick it up.
470    fn park(&mut self, cx: &mut Context<'_>) -> bool {
471        self.connection.outbound_waker().register(cx.waker());
472        !self.has_pending_handler_signals() && !self.has_pending_outbound_progress()
473    }
474
475    /// Convert the current `close_outcome` into the terminal return of [`Self::drive`]. Must
476    /// only be called after outbound bytes have been flushed. Graceful closes return `None`;
477    /// errors surface as a final `Some(Err(...))` before subsequent polls return `None`.
478    fn finish_with_current_outcome(&mut self) -> Option<Result<Conn<H2Transport>, H2Error>> {
479        self.finished = true;
480        // Complete every outstanding `H2Connection::send_ping` future with an error so
481        // awaiting callers don't block forever. Safe to call regardless of outcome —
482        // a no-op if no pings are in flight.
483        self.connection.fail_pending_pings(
484            io::ErrorKind::ConnectionAborted,
485            "h2 connection closed before PING ACK",
486        );
487        // Wake any `PeerSettings` waiters so a peer that disconnects without ever sending
488        // SETTINGS doesn't strand them. Their `poll` rechecks swansong state and returns
489        // Ready; the caller's follow-up operation surfaces the connection-closed error.
490        self.connection.wake_peer_settings_waiters();
491        // Resolve every still-live stream's recv-side waiters. A connection that dies with
492        // an in-flight stream (server GOAWAY + close, peer FIN, I/O error) leaves any task
493        // parked on the response — `response_headers`, a body `poll_read`, an upgrade
494        // `poll_write` — with no other wake source. Without this a client request hangs
495        // forever on a graceful server shutdown. Mirror the per-stream RST teardown:
496        // terminal `Reset` (recv reports eof → `ResponseHeaders` yields `ConnectionAborted`,
497        // reads return EOF, writes `BrokenPipe`) + the same waker fan-out.
498        let reset_code = match &self.close_outcome {
499            Some(CloseOutcome::Protocol(code)) => *code,
500            _ => H2ErrorCode::NoError,
501        };
502        for entry in self.streams.values() {
503            // Move each still-live stream to `Closed{Reset}` (a no-op on streams already closed, so
504            // an existing reason isn't clobbered), then fan out every recv/send waker so parked
505            // tasks observe the close instead of hanging.
506            let _ = entry.shared.apply_event(StreamEvent::RecvReset(reset_code));
507            entry.shared.recv.waker.wake();
508            entry.shared.recv.response_headers_waker.wake();
509            entry.shared.send.outbound_write_waker.wake();
510            // A handler already parked in `SubmitSend` (response staged, awaiting the driver to
511            // frame it) needs this wake to re-poll and observe the now-reset stream — the recv
512            // fan-out above doesn't reach the send-completion waiter.
513            entry.shared.send.completion_waker.wake();
514        }
515        match self.close_outcome.take() {
516            None | Some(CloseOutcome::Graceful) => None,
517            Some(CloseOutcome::Protocol(code)) => Some(Err(H2Error::Protocol(code))),
518            Some(CloseOutcome::Io(e)) => Some(Err(H2Error::Io(e))),
519        }
520    }
521
522    /// Enter the closing state: record the outcome and queue a GOAWAY (only for outcomes
523    /// that warrant one). The main loop will drain `write_buf` and then finish.
524    fn begin_close(&mut self, outcome: CloseOutcome) {
525        // Idempotent: with the recv pump now running in Closing (so we keep
526        // decoding inbound frames for in-flight streams across GOAWAY), a peer
527        // GOAWAY arriving after we've already begun closing would otherwise
528        // re-queue our own GOAWAY and re-enter Closing, ping-ponging forever
529        // with a peer that mirrors the behavior.
530        if self.state == DriverState::Closing || self.state == DriverState::Drained {
531            log::trace!(
532                "h2 driver: begin_close({outcome:?}) — already in {:?}, ignoring",
533                self.state,
534            );
535            return;
536        }
537        // Don't overwrite a prior outcome (e.g. if an error fires in the middle of a
538        // graceful shutdown, keep the error).
539        let code = match &outcome {
540            CloseOutcome::Graceful => Some(H2ErrorCode::NoError),
541            CloseOutcome::Protocol(code) => Some(*code),
542            CloseOutcome::Io(_) => None,
543        };
544        let reason = match &outcome {
545            CloseOutcome::Graceful => "graceful close",
546            CloseOutcome::Protocol(_) => "protocol error",
547            CloseOutcome::Io(_) => "i/o error",
548        };
549        if self.close_outcome.is_none() {
550            self.close_outcome = Some(outcome);
551        }
552        if let Some(code) = code {
553            self.queue_goaway(self.last_peer_stream_id, code);
554        }
555        self.set_state(DriverState::Closing, reason);
556    }
557
558    /// The sole mutator of `self.state`. Logs every transition so a trace log reads as
559    /// a sequence of named lifecycle events.
560    fn set_state(&mut self, new: DriverState, reason: &'static str) {
561        if self.state == new {
562            return;
563        }
564        log::trace!(
565            "h2 driver: state {old:?} → {new:?} ({reason})",
566            old = self.state,
567        );
568        self.state = new;
569    }
570
571    /// Log which in-flight streams are blocking the `Closing → Drained` transition.
572    /// Called from the closing-state check when at least one predicate (`has_active_send_cursors`
573    /// or `has_pending_recv`) is still true, so a trace log shows exactly which streams the
574    /// driver is waiting on.
575    fn log_closing_blockers(&self) {
576        if !log::log_enabled!(log::Level::Trace) {
577            return;
578        }
579        for (id, entry) in &self.streams {
580            let lifecycle = *entry.shared.lifecycle_lock();
581            let queued = !entry
582                .shared
583                .send
584                .queue
585                .lock()
586                .expect("send queue mutex poisoned")
587                .is_empty();
588            if entry.send.is_some() || queued || !lifecycle.recv_closed() {
589                log::trace!(
590                    "h2 driver: Closing — stream {id} blocking drain (lifecycle={lifecycle:?}, \
591                     cursor_present={}, queued={queued})",
592                    entry.send.is_some(),
593                );
594            }
595        }
596    }
597
598    /// Read bytes from the transport into `read_buf[read_filled..target]` until
599    /// `read_filled >= target`. Cancel-safe: if the caller drops the Future, any bytes
600    /// already placed are preserved in the buffer.
601    ///
602    /// A 0-byte read is surfaced as `UnexpectedEof`. The caller maps this to a terminal
603    /// I/O error; we don't emit a GOAWAY on peer-initiated close.
604    fn poll_fill_to(&mut self, target: usize, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
605        if self.read_buf.len() < target {
606            self.read_buf.resize(target, 0);
607        }
608        while self.read_filled < target {
609            let n = ready!(
610                Pin::new(&mut self.transport)
611                    .poll_read(cx, &mut self.read_buf[self.read_filled..target])
612            )?;
613            if n == 0 {
614                return Poll::Ready(Err(io::Error::from(io::ErrorKind::UnexpectedEof)));
615            }
616            self.read_filled += n;
617        }
618        Poll::Ready(Ok(()))
619    }
620
621    /// Post-GOAWAY, drain whatever inbound bytes are *immediately* available from the
622    /// peer so our Drop sends a clean FIN (no unread data → no TCP RST) while the peer
623    /// sees the GOAWAY we just emitted. Read loops internally: consume each Ready chunk,
624    /// discard it, ask for more. Exits as soon as the transport returns `Pending` (no
625    /// bytes available right now) OR `Ready(0)` (peer FIN already arrived) OR any error.
626    ///
627    /// Does **not** register the waker on `Pending` — we're actively closing, not
628    /// observing the peer. A peer that happens to send more bytes after our exit will
629    /// have those bytes dropped when the transport is closed; that's a race the peer
630    /// chose to lose by sending after receiving our GOAWAY.
631    ///
632    /// Returning `Ready(())` unconditionally (no `Pending` case) lets the caller finalize
633    /// immediately. The `Poll` wrapper is kept for symmetry with the rest of the driver's
634    /// poll-style methods.
635    fn poll_drain_peer(&mut self, cx: &mut Context<'_>) -> Poll<()> {
636        // A peer flooding us with bytes could keep this loop going a long time. Cap it
637        // so a pathological client can't pin our close-out forever.
638        const MAX_DISCARD_ITERATIONS: usize = 256;
639        // Lightweight scratch — we're throwing it away. 512 balances "drain in few
640        // iterations" against "don't hold a large buffer for a rare path."
641        let mut scratch = [0u8; 512];
642        for _ in 0..MAX_DISCARD_ITERATIONS {
643            // We pass `cx` through for the benefit of the transport's `poll_read` contract,
644            // but we *interpret* `Pending` as "done draining" rather than parking on it —
645            // we're actively closing, not observing. A peer that sends more bytes after
646            // our exit loses the race.
647            match Pin::new(&mut self.transport).poll_read(cx, &mut scratch) {
648                Poll::Ready(Ok(0) | Err(_)) | Poll::Pending => {
649                    return Poll::Ready(());
650                }
651                Poll::Ready(Ok(_)) => {}
652            }
653        }
654        Poll::Ready(())
655    }
656
657    /// Look up why a stream is closed. `None` means either never-opened or evicted from the
658    /// bounded ledger — both fall through to the connection-level default.
659    pub(super) fn closed_reason(&self, stream_id: u32) -> Option<ClosedReason> {
660        self.closed_streams.reason(stream_id)
661    }
662}