Skip to main content

trillium_http/h2/
transport.rs

1//! Per-stream transport handed to handler tasks.
2//!
3//! [`H2Transport`] is the [`AsyncRead`] + [`AsyncWrite`] view of a single HTTP/2 stream. It is
4//! carried on the emitted [`Conn`][crate::Conn] returned from [`H2Driver::next`], and the
5//! runtime adapter spawns a handler task that consumes it. The transport never touches the
6//! underlying TCP connection directly — all I/O coordinates through shared per-stream state
7//! on the [`H2Connection`] driven by the driver task.
8//!
9//! Two paths reach the impls:
10//!
11//! - **Normal HTTP/2 request/response**: handlers usually don't touch [`H2Transport`] directly
12//!   (same sharp edge h1 and h3 document). [`ReceivedBody`][crate::ReceivedBody] reads request body
13//!   bytes through the transport's `AsyncRead` via
14//!   [`ReceivedBody::handle_raw`][crate::ReceivedBody::handle_raw]. Response bytes flow through
15//!   [`H2Connection::submit_send`][submit_send] to the driver's send pump, which frames HEADERS +
16//!   DATA + trailing HEADERS onto the connection without ever touching this `AsyncWrite`.
17//!
18//! - **Extended-CONNECT upgrades** ([RFC 8441] WebSocket-over-h2, plus the in-progress
19//!   `draft-ietf-webtrans-http2` for WebTransport-over-h2): after the handler responds 200 to a
20//!   `CONNECT` request with a `:protocol` pseudo-header, [`Conn::send_h2`][crate::Conn::send_h2]
21//!   routes through [`H2Connection::submit_upgrade`][submit_upgrade] which frames HEADERS without
22//!   `END_STREAM`, signals send completion early, and leaves the stream open as a bidirectional
23//!   byte channel. The runtime adapter then dispatches
24//!   [`Handler::upgrade`][trillium::Handler::upgrade], which gets an [`Upgrade`][crate::Upgrade]
25//!   wrapping this transport. `AsyncWrite::poll_write` appends to a per-stream outbound queue
26//!   ([`SendState::outbound`]); the driver's send pump drains it into DATA frames bounded by the
//!   per-stream and connection send windows. `AsyncWrite::poll_close` moves the stream's
//!   lifecycle from `UpgradeOpen` to `UpgradeClosing`, so once the queue has drained the driver
//!   emits `DATA(END_STREAM)` and tears the stream down.
30//!
31//! [`H2Driver::next`]: super::H2Driver::next
32//! [`H2Connection`]: super::H2Connection
33//! [`BoxedTransport`]: crate::transport::BoxedTransport
34//! [submit_send]: super::H2Connection::submit_send
35//! [submit_upgrade]: super::H2Connection::submit_upgrade
36//! [RFC 8441]: https://www.rfc-editor.org/rfc/rfc8441
37
38use super::{H2Connection, H2ErrorCode, lifecycle::StreamLifecycle};
39use crate::{Buffer, Headers, headers::hpack::FieldSection};
40use atomic_waker::AtomicWaker;
41use futures_lite::io::{AsyncRead, AsyncWrite};
42use std::{
43    fmt, io,
44    pin::Pin,
45    sync::{
46        Arc, Mutex, MutexGuard,
47        atomic::{AtomicBool, AtomicU64, Ordering},
48    },
49    task::{Context, Poll},
50};
51
52/// A single HTTP/2 stream's transport handle.
53///
54/// Carries a backref to the shared [`H2Connection`], the stream id, and the per-stream
55/// `Arc<StreamState>` used by the read side. Normal HTTP/2 operation reads through
56/// [`ReceivedBody`][crate::ReceivedBody] and writes through the connection's send queue;
57/// the `AsyncRead` / `AsyncWrite` impls here are only reached by code that borrows the
58/// transport directly (typically an upgrade handler after extended CONNECT).
59pub struct H2Transport {
60    connection: Arc<H2Connection>,
61    stream_id: u32,
62    state: Arc<StreamState>,
63}
64
65impl fmt::Debug for H2Transport {
66    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
67        f.debug_struct("H2Transport")
68            .field("stream_id", &self.stream_id)
69            .finish_non_exhaustive()
70    }
71}
72
73impl H2Transport {
74    /// Create a transport for a stream that has just been opened by the driver.
75    pub(super) fn new(
76        connection: Arc<H2Connection>,
77        stream_id: u32,
78        state: Arc<StreamState>,
79    ) -> Self {
80        Self {
81            connection,
82            stream_id,
83            state,
84        }
85    }
86
87    /// The stream identifier this transport is bound to.
88    pub fn stream_id(&self) -> u32 {
89        self.stream_id
90    }
91
92    /// The shared [`H2Connection`] backing this stream.
93    pub fn connection(&self) -> &Arc<H2Connection> {
94        &self.connection
95    }
96}
97
98impl Drop for H2Transport {
99    /// Application-side release / cancel signal. A single flat match on the lifecycle
100    /// variant — the variant *is* the answer, so there is no ordering between checks to
101    /// get wrong:
102    ///
103    /// - `Reset` / `ResetRequested`: stream is already torn down or about to be; no-op.
104    /// - `AwaitingRelease`: client-only wire-closed-but-held lifecycle. Signal release so the
105    ///   driver removes the entry from both maps.
106    /// - `UpgradeOpen` / `UpgradeClosing`: extended-CONNECT lifecycle. Schedule graceful close —
107    ///   the variant itself records that this stream's drop semantics are "graceful," no separate
108    ///   `graceful_drop` flag needed. Wake is idempotent if already in `UpgradeClosing`.
109    /// - Anything else (`Idle` / `Submitted` / `Sending`): mid-stream drop on a normal
110    ///   request/response — emit `RST_STREAM(Cancel)` so the peer learns we abandoned it.
111    fn drop(&mut self) {
112        // Cheap pre-check: if the stream is no longer in the shared map the driver has
113        // already cleaned up; nothing to do.
114        if !self.connection.streams_lock().contains_key(&self.stream_id) {
115            log::trace!(
116                "h2 stream {}: H2Transport dropped on already-released stream",
117                self.stream_id,
118            );
119            return;
120        }
121
122        let mut lifecycle = self.state.lifecycle_lock();
123        match &*lifecycle {
124            StreamLifecycle::Reset(_) | StreamLifecycle::ResetRequested(_) => {
125                log::trace!(
126                    "h2 stream {}: H2Transport dropped on already-reset stream",
127                    self.stream_id,
128                );
129            }
130            StreamLifecycle::AwaitingRelease => {
131                // Shouldn't ordinarily happen — this branch's Drop path is what
132                // *transitions* a stream into AwaitingRelease. Reaching it again means
133                // a second Drop, which is structurally impossible (`H2Transport` is not
134                // `Clone`); leaving it as a defensive no-op.
135                log::trace!(
136                    "h2 stream {}: H2Transport dropped while already AwaitingRelease",
137                    self.stream_id,
138                );
139            }
140            StreamLifecycle::UpgradeOpen { recv_eof } => {
141                let recv_eof = *recv_eof;
142                log::trace!(
143                    "h2 stream {}: H2Transport dropped (upgrade) — scheduling graceful close",
144                    self.stream_id,
145                );
146                *lifecycle = StreamLifecycle::UpgradeClosing {
147                    recv_eof,
148                    pending_trailers: None,
149                };
150                drop(lifecycle);
151                self.state.needs_servicing.store(true, Ordering::Release);
152                self.state.send.outbound_waker.wake();
153                self.connection.outbound_waker().wake();
154            }
155            StreamLifecycle::UpgradeClosing { .. } => {
156                log::trace!(
157                    "h2 stream {}: H2Transport dropped — graceful close already in flight",
158                    self.stream_id,
159                );
160                // Driver is already draining; just nudge in case it parked.
161                drop(lifecycle);
162                self.state.needs_servicing.store(true, Ordering::Release);
163                self.state.send.outbound_waker.wake();
164                self.connection.outbound_waker().wake();
165            }
166            _ => {
167                // Idle / Submitted / Sending: mid-stream drop. Check the wire-closed
168                // case first — recv_eof + send.completed means the application is done
169                // with a stream that already finished on the wire, and we forward to
170                // release (client-role keeps streams in the map past wire-close so
171                // late-trailer / late-read access works). Otherwise RST_STREAM(Cancel).
172                let send_done = self.state.send.completed.load(Ordering::Acquire);
173                let recv_done = lifecycle.recv_eof();
174                if send_done && recv_done {
175                    log::trace!(
176                        "h2 stream {}: H2Transport dropped on wire-closed stream — releasing",
177                        self.stream_id,
178                    );
179                    *lifecycle = StreamLifecycle::AwaitingRelease;
180                    drop(lifecycle);
181                    self.state.needs_servicing.store(true, Ordering::Release);
182                    self.connection.outbound_waker().wake();
183                } else {
184                    log::debug!(
185                        "h2 stream {}: H2Transport dropped mid-stream — RST_STREAM(Cancel)",
186                        self.stream_id,
187                    );
188                    *lifecycle = StreamLifecycle::ResetRequested(H2ErrorCode::Cancel);
189                    drop(lifecycle);
190                    self.state.needs_servicing.store(true, Ordering::Release);
191                    self.connection.outbound_waker().wake();
192                }
193            }
194        }
195    }
196}
197
198impl AsyncRead for H2Transport {
199    fn poll_read(
200        self: Pin<&mut Self>,
201        cx: &mut Context<'_>,
202        out: &mut [u8],
203    ) -> Poll<io::Result<usize>> {
204        if out.is_empty() {
205            return Poll::Ready(Ok(0));
206        }
207
208        // The first `poll_read` is the handler's declaration of intent to consume the request
209        // body — until this point, we've advertised a zero recv window and the peer has sent
210        // nothing beyond HEADERS. Tell the driver to top up our per-stream window now. Later
211        // calls CAS-fail silently and don't re-signal.
212        let recv_state = &self.state.recv;
213        let connection = &*self.connection;
214        if !recv_state.is_reading.swap(true, Ordering::AcqRel) {
215            self.state.needs_servicing.store(true, Ordering::Release);
216            connection.outbound_waker().wake();
217        }
218
219        let mut recv = recv_state.buf.lock().expect("recv buf mutex poisoned");
220
221        // Copy as many bytes as fit from the front of the ring into `out`, then advance the
222        // ring's virtual read cursor. `Buffer::ignore_front` truncates the underlying `Vec` to
223        // zero when we drain fully, so capacity stays bounded by peak in-flight bytes rather
224        // than cumulative traffic.
225        let take = out.len().min(recv.len());
226        if take > 0 {
227            out[..take].copy_from_slice(&recv[..take]);
228            recv.ignore_front(take);
229            // Drop the buf lock before the waker fire so the driver can grab it without
230            // contention when it wakes.
231            drop(recv);
232            // Tell the driver how many bytes the handler consumed so it can emit a matching
233            // `WINDOW_UPDATE` and keep the peer's stream + connection windows topped up.
234            // `fetch_add` accumulates across calls that happen before the driver's next
235            // service tick; the driver's `swap(0)` takes the whole batch at once.
236            recv_state
237                .bytes_consumed
238                .fetch_add(take as u64, Ordering::AcqRel);
239            self.state.needs_servicing.store(true, Ordering::Release);
240            connection.outbound_waker().wake();
241            return Poll::Ready(Ok(take));
242        }
243
244        // Buffer empty. Register the waker *before* releasing the buf lock so a driver
245        // push between this poll and the lifecycle-eof check is guaranteed to wake us:
246        //   1. We take buf lock (driver-push blocked).
247        //   2. We register waker.
248        //   3. We drop buf lock (driver-push may now proceed and fire waker).
249        //   4. We take lifecycle lock to check eof.
250        //   5. Return Pending or Ready(0); if a push raced through step 3, the waker is registered
251        //      and a fresh poll will see the new bytes.
252        recv_state.waker.register(cx.waker());
253        drop(recv);
254        if self.state.lifecycle_lock().recv_eof() {
255            return Poll::Ready(Ok(0));
256        }
257        Poll::Pending
258    }
259}
260
261impl AsyncWrite for H2Transport {
262    fn poll_write(
263        self: Pin<&mut Self>,
264        cx: &mut Context<'_>,
265        buf: &[u8],
266    ) -> Poll<io::Result<usize>> {
267        // Append into the per-stream outbound queue used by the extended-CONNECT
268        // (RFC 8441) upgrade path. The driver's send pump drains the same queue (via
269        // the upgrade body's `AsyncRead::poll_read`) into DATA frames bounded by
270        // per-stream + connection send windows.
271        //
272        // Bounded by `config.response_buffer_max_len` — the same cap h1 and h3 response
273        // paths use for their transit buffers. If the peer's flow-control window stalls
274        // (slow or malicious reader) the driver can't drain `outbound`, the cap is hit,
275        // and we return `Pending` so the handler is throttled. The drain side
276        // (`H2OutboundReader::poll_read`) wakes `outbound_write_waker` after each take.
277        let send = &self.state.send;
278
279        // Reject writes once the upgrade has begun closing — either the user called
280        // `poll_close`, or `submit_trailers` staged trailers + close. Reject as well in
281        // terminal states. Anything else (normal upgrade flow) is `UpgradeOpen` and
282        // accepts writes.
283        // Only `UpgradeOpen` accepts writes. Anything else (upgrade past close, terminal
284        // states, or a non-upgrade lifecycle that never had an `H2OutboundReader` to
285        // drain) returns `BrokenPipe` — the `AsyncWrite` impl is structurally meaningful
286        // only during an active extended-CONNECT upgrade.
287        if !matches!(
288            &*self.state.lifecycle_lock(),
289            StreamLifecycle::UpgradeOpen { .. }
290        ) {
291            return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into()));
292        }
293
294        let cap = self.connection.context.config.response_buffer_max_len;
295        let mut outbound = send.outbound.lock().expect("outbound buf mutex poisoned");
296        if outbound.len() >= cap {
297            // Register first, then re-check under lock to close the race against the
298            // drain side (`H2OutboundReader::poll_read` takes the same lock to call
299            // `ignore_front` and then wakes us). If a drain landed between our length
300            // check and the register, the second check sees the freed space.
301            send.outbound_write_waker.register(cx.waker());
302            if outbound.len() >= cap {
303                return Poll::Pending;
304            }
305        }
306        let take = (cap - outbound.len()).min(buf.len());
307        log::trace!(
308            "h2 stream {}: H2Transport::poll_write appending {take}/{} bytes to outbound queue",
309            self.stream_id,
310            buf.len(),
311        );
312        outbound.extend_from_slice(&buf[..take]);
313        drop(outbound);
314
315        // Wake the driver task (if parked on the connection-level waker) and the
316        // upgrade body's poll_read (in case it's registered between driver ticks).
317        // Firing both is cheap and resolves the cross-task race where the driver
318        // happens to be parked on `connection.outbound_waker` rather than mid-body-poll.
319        send.outbound_waker.wake();
320        self.connection.outbound_waker().wake();
321        Poll::Ready(Ok(take))
322    }
323
324    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
325        // Best-effort: bytes appended via `poll_write` are already visible to the driver
326        // and will be framed on the next tick. There's no application-level "flushed"
327        // state below us to wait on.
328        Poll::Ready(Ok(()))
329    }
330
331    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
332        // Mark the upgrade write-half closed by transitioning `UpgradeOpen` →
333        // `UpgradeClosing`. Once the driver drains the remaining outbound bytes, the
334        // upgrade body's `poll_read` will return `Ready(0)`, the send pump transitions
335        // through trailers (none) into `DATA(END_STREAM)`, and the stream tears down via
336        // the normal `complete_and_remove_stream` path. Idempotent on any non-`UpgradeOpen`
337        // state — `poll_close` is allowed to be called multiple times.
338        log::trace!(
339            "h2 stream {}: H2Transport::poll_close marking outbound closed",
340            self.stream_id,
341        );
342        let mut lifecycle = self.state.lifecycle_lock();
343        if let StreamLifecycle::UpgradeOpen { recv_eof } = &*lifecycle {
344            *lifecycle = StreamLifecycle::UpgradeClosing {
345                recv_eof: *recv_eof,
346                pending_trailers: None,
347            };
348        }
349        drop(lifecycle);
350        self.state.send.outbound_waker.wake();
351        self.connection.outbound_waker().wake();
352        Poll::Ready(Ok(()))
353    }
354}
355
356/// Shared per-stream state. Owned by an [`Arc`] held jointly by the driver (via the connection's
357/// stream table) and the handler task (via [`H2Transport`]).
358///
359/// The cross-task-visible state machine is the [`StreamLifecycle`] held in [`Self::lifecycle`];
360/// the recv buffer / wakers / completion signal channel are independent data and stay as
361/// sibling fields. See the [`lifecycle`][super::lifecycle] module docs for the rationale.
362#[derive(Debug, Default)]
363pub(super) struct StreamState {
364    /// Cross-task-visible per-stream state machine. The variants encode the legal
365    /// observable states of a stream; predicates ([`StreamLifecycle::is_in_flight`],
366    /// [`StreamLifecycle::has_pending_recv`], [`StreamLifecycle::recv_eof`],
367    /// [`StreamLifecycle::has_active_send`]) and code that needs to make decisions
368    /// match on the variants directly.
369    pub(super) lifecycle: Mutex<StreamLifecycle>,
370
371    /// Recv side: inbound DATA payloads, handler waker, handler-intent signal, trailers.
372    /// Independent of [`Self::lifecycle`] — these are data, not state.
373    pub(super) recv: RecvState,
374
375    /// Send side: outbound upgrade buffer + wakers + the driver→conn-task completion
376    /// signal channel. Independent of [`Self::lifecycle`].
377    pub(super) send: SendState,
378
379    /// Mailbox flag for conn-task → driver work signaling. Set by conn-task code
380    /// whenever it produces work the driver should service (lifecycle transition,
381    /// [`RecvState::bytes_consumed`] increment, [`RecvState::is_reading`] transition).
382    /// The driver's `service_handler_signals` consults this via `swap(false, AcqRel)` —
383    /// only streams where it returns `true` pay for the lifecycle-lock pickup. Idle
384    /// streams cost a single atomic RMW per tick.
385    ///
386    /// **Setter ordering rule**: write the underlying state first, then store `true`
387    /// with `Release`, then call [`H2Connection::outbound_waker`][super::H2Connection]`.wake()`.
388    /// Over-notification is harmless; under-notification would lose a signal.
389    pub(super) needs_servicing: AtomicBool,
390}
391
392impl StreamState {
393    /// Lock the per-stream lifecycle. Convenience wrapper around the inner mutex's
394    /// `lock().expect(...)` — every call site treats poisoning as a programming error.
395    pub(super) fn lifecycle_lock(&self) -> MutexGuard<'_, StreamLifecycle> {
396        self.lifecycle.lock().expect("lifecycle mutex poisoned")
397    }
398}
399
400/// Receive-side per-stream state.
401#[derive(Debug, Default)]
402pub(super) struct RecvState {
403    /// Inbound DATA body bytes awaiting handler read. A single persistent ring (append-at-tail,
404    /// `ignore_front`-at-head): the driver appends via `extend_from_slice` when a DATA frame
405    /// arrives; the handler reads from the front and virtually drops consumed bytes. When
406    /// `ignore_front` catches up to the data end the `Buffer` truncates to zero, so the
407    /// underlying `Vec` capacity stays bounded by peak in-flight bytes rather than cumulative
408    /// traffic — zero amortized allocations per DATA frame.
409    pub(super) buf: Mutex<Buffer>,
410
411    /// Handler-task waker, fired by the driver after pushing DATA into `buf` or after
412    /// the lifecycle transitions to recv-eof. Single-waiter: only one task ever polls a
413    /// given `H2Transport`.
414    pub(super) waker: AtomicWaker,
415
416    /// Set by the handler's first [`H2Transport::poll_read`] to declare intent to consume the
417    /// request body. The driver observes the transition and emits a `WINDOW_UPDATE` for this
418    /// stream, topping its recv window up from `SETTINGS_INITIAL_WINDOW_SIZE` (advertised as
419    /// `0`) to the per-stream maximum. Once set, stays set.
420    pub(super) is_reading: AtomicBool,
421
422    /// Bytes the handler has consumed from `buf` since the driver last sampled this counter.
423    /// Incremented by [`H2Transport::poll_read`] using `fetch_add` after each drain; the
424    /// driver reads it via `swap(0)` on each tick and emits stream-level + connection-level
425    /// `WINDOW_UPDATE` for the consumed total. Ensures a handler draining a body larger than
426    /// a single window doesn't stall the peer.
427    pub(super) bytes_consumed: AtomicU64,
428
429    /// Trailers, populated by the driver if a trailing HEADERS frame arrives for this stream.
430    /// Always written *before* `eof` is set, so once the handler observes `Ready(0)` on the
431    /// recv side, any trailers for this request are guaranteed to be in place.
432    ///
433    /// Taken out and moved into [`Conn::request_trailers`][crate::Conn] by the receiver-side
434    /// body state machine when it transitions to
435    /// [`ReceivedBodyState::End`][crate::received_body::ReceivedBodyState].
436    pub(super) trailers: Mutex<Option<Headers>>,
437
438    /// Client-role: response HEADERS field section, populated by the driver on the first
439    /// non-1xx HEADERS frame arrival for a client-initiated stream. Server role doesn't use
440    /// this slot (response HEADERS go *out* on the server, not in). Single-shot: the conn
441    /// task takes the `FieldSection` via [`H2Connection::response_headers`][super::H2Connection]
442    /// once; subsequent HEADERS arrivals on the same stream are interpreted as trailers and
443    /// routed to the [`Self::trailers`] slot. Interim 1xx HEADERS frames are discarded by
444    /// the driver without touching this slot or latching `first_response_headers_seen`.
445    pub(super) response_headers: Mutex<Option<FieldSection<'static>>>,
446
447    /// Client-role: latching flag for "first HEADERS arrived for this stream." Distinct from
448    /// `response_headers.is_some()` — the conn task drains that slot when it consumes
449    /// headers, so the driver can't use slot occupancy to distinguish "haven't seen
450    /// HEADERS yet" from "headers seen + already taken." Set inside `finalize_response_headers`
451    /// before that slot is populated; checked by `route_headers` on subsequent HEADERS to
452    /// route them as trailers. Never cleared.
453    pub(super) first_response_headers_seen: AtomicBool,
454
455    /// Client-role: waker the conn task registers via
456    /// [`H2Connection::response_headers`][super::H2Connection]; fired by the driver after
457    /// stashing the `FieldSection` in [`Self::response_headers`] *or* on stream removal (so
458    /// a parked conn task observing the stream gone surfaces `NotConnected` instead of
459    /// hanging).
460    pub(super) response_headers_waker: AtomicWaker,
461}
462
463/// Send-side per-stream state used to hand a response from the conn task to the driver,
464/// plus the outbound byte queue for extended-CONNECT upgraded streams.
465///
466/// **Normal response path**: the conn task fills `submission` once via
467/// [`H2Connection::submit_send`][submit] and waits on `completion_waker` for `completed` to
468/// flip. The driver picks up the submission on its next `drive` tick, frames it (HEADERS,
469/// DATA, optional trailing HEADERS) into the connection's outbound buffer as send-side flow
470/// control allows, and on completion stores the `completion_result`, sets `completed = true`,
471/// and wakes the conn task.
472///
473/// **Extended-CONNECT upgrade path** ([RFC 8441]): the conn task calls
474/// [`H2Connection::submit_upgrade`][submit_upgrade], which constructs an
475/// [`H2OutboundReader`] over `outbound` / `outbound_close_requested` /
476/// `outbound_waker` and submits it as the response body. The driver signals
477/// `completion_waker` as soon as the response HEADERS frame is on the wire (instead of
478/// waiting for the body to drain), so the conn task's `submit_upgrade().await` returns and
479/// the runtime adapter can dispatch [`Handler::upgrade`][trillium::Handler::upgrade]. The
480/// upgrade handler then writes through [`H2Transport`]'s `AsyncWrite`, which appends to
481/// `outbound`; the driver's send pump pulls those bytes out via the body's `AsyncRead`
482/// and frames them as DATA. Closing the transport sets `outbound_close_requested`, the
483/// reader returns `Ready(0)`, and the send pump terminates the stream with
484/// `DATA(END_STREAM)`.
485///
486/// [submit]: super::H2Connection::submit_send
487/// [submit_upgrade]: super::H2Connection::submit_upgrade
488/// [RFC 8441]: https://www.rfc-editor.org/rfc/rfc8441
489#[derive(Debug, Default)]
490pub(super) struct SendState {
491    /// Set to `true` by the driver once the response has been fully framed, flushed, or
492    /// errored. The conn task's `SubmitSend` future polls this atomic and registers on
493    /// `completion_waker`. Independent signal channel from the lifecycle — on the
494    /// extended-CONNECT upgrade path completion is signalled *early* (at `END_HEADERS`,
495    /// before the body is on the wire) so the runtime can dispatch `Handler::upgrade`;
496    /// the lifecycle stays in `UpgradeOpen` past that point.
497    pub(super) completed: AtomicBool,
498
499    /// The driver writes the final result here before flipping `completed`. The conn task
500    /// takes it once `completed` is observed true.
501    pub(super) completion_result: Mutex<Option<io::Result<()>>>,
502
503    /// The conn task's waker, registered by `SubmitSend::poll` and fired by the driver
504    /// after `completed` is set.
505    pub(super) completion_waker: AtomicWaker,
506
507    /// Outbound bytes for an extended-CONNECT (RFC 8441) upgraded stream.
508    /// Appended to by [`H2Transport`]'s `AsyncWrite::poll_write` and drained by the
509    /// upgrade body's `AsyncRead::poll_read` (the driver-task side of the send pump).
510    /// Empty for normal responses — the driver pumps the response [`Body`] directly.
511    pub(super) outbound: Mutex<Buffer>,
512
513    /// Waker for the upgrade body's `poll_read`. Fired by [`H2Transport::poll_write`]
514    /// after appending bytes and by [`H2Transport::poll_close`] after the lifecycle
515    /// transitions to `UpgradeClosing`. Registered by the body during its `poll_read`
516    /// when it observes an empty buffer and an `UpgradeOpen` lifecycle.
517    pub(super) outbound_waker: AtomicWaker,
518
519    /// Reverse-direction waker: registered by [`H2Transport::poll_write`] when `outbound`
520    /// has reached the configured cap, fired by [`H2OutboundReader::poll_read`] after it
521    /// drains bytes (i.e. after `ignore_front`) so a parked writer can resume. This is the
522    /// edge that surfaces peer flow-control backpressure to the upgrade handler — without
523    /// it, a slow or unresponsive peer's closed window would let `outbound` grow without
524    /// bound.
525    pub(super) outbound_write_waker: AtomicWaker,
526}
527
528/// `AsyncRead` source the driver uses as the response body for an extended-CONNECT upgrade.
529///
530/// Reads from [`SendState::outbound`] — the same per-stream queue [`H2Transport`]'s
531/// `AsyncWrite::poll_write` appends to. Returns `Ready(0)` once the queue is empty and
532/// [`SendState::outbound_close_requested`] has been set (handler dropped or called
533/// `poll_close` on the transport), at which point the driver's send pump transitions
534/// through trailers (none) into `DATA(END_STREAM)` and tears the stream down.
535///
536/// Constructed by [`H2Connection::submit_upgrade`][super::H2Connection::submit_upgrade];
537/// wrapped in [`Body::new_streaming`] so the existing send pump can pump it as if it were
538/// any other unknown-length response body.
539#[derive(Debug)]
540pub(super) struct H2OutboundReader {
541    state: Arc<StreamState>,
542    stream_id: u32,
543}
544
545impl H2OutboundReader {
546    pub(super) fn new(state: Arc<StreamState>, stream_id: u32) -> Self {
547        Self { state, stream_id }
548    }
549}
550
551impl AsyncRead for H2OutboundReader {
552    fn poll_read(
553        self: Pin<&mut Self>,
554        cx: &mut Context<'_>,
555        out: &mut [u8],
556    ) -> Poll<io::Result<usize>> {
557        if out.is_empty() {
558            return Poll::Ready(Ok(0));
559        }
560
561        let send = &self.state.send;
562        let mut outbound = send.outbound.lock().expect("outbound buf mutex poisoned");
563        let take = out.len().min(outbound.len());
564        if take > 0 {
565            out[..take].copy_from_slice(&outbound[..take]);
566            outbound.ignore_front(take);
567            log::trace!(
568                "h2 stream {}: H2OutboundReader::poll_read drained {take} bytes",
569                self.stream_id,
570            );
571            // Drop the lock before waking — the writer reacquires it on resume.
572            drop(outbound);
573            // Surface flow-control backpressure: wake any writer parked on
574            // `outbound_write_waker` because the cap was hit. Registered-but-still-full
575            // is harmless — the writer's recheck under lock observes the new len.
576            send.outbound_write_waker.wake();
577            return Poll::Ready(Ok(take));
578        }
579
580        // Queue empty. Register first, then re-check the lifecycle. This closes the
581        // register-then-check race against `poll_close` / `submit_trailers` (both of
582        // which transition the lifecycle but don't take the outbound buf lock).
583        send.outbound_waker.register(cx.waker());
584
585        // EOF when the lifecycle has moved past `UpgradeOpen` — `UpgradeClosing`
586        // (`poll_close`/`submit_trailers` fired), `Reset*` (peer reset / local error),
587        // or terminal. Anything that hasn't reached EOF stays `UpgradeOpen` and we
588        // park.
589        let lifecycle_says_eof = !matches!(
590            &*self.state.lifecycle_lock(),
591            StreamLifecycle::UpgradeOpen { .. }
592        );
593        if lifecycle_says_eof {
594            log::trace!(
595                "h2 stream {}: H2OutboundReader::poll_read EOF (lifecycle past UpgradeOpen, queue \
596                 empty)",
597                self.stream_id,
598            );
599            return Poll::Ready(Ok(0));
600        }
601        Poll::Pending
602    }
603}
604
605// `Submission` lives in [`super::lifecycle`] — it's the payload of
606// `StreamLifecycle::Submitted`.
607
608#[cfg(test)]
609mod tests {
610    use super::*;
611    use crate::HttpContext;
612    use futures_lite::{AsyncRead, AsyncWrite};
613    use std::{
614        sync::{
615            Arc,
616            atomic::{AtomicBool, Ordering},
617        },
618        task::{Context, Poll, Wake, Waker},
619    };
620
621    struct CountingWaker(AtomicBool);
622    impl Wake for CountingWaker {
623        fn wake(self: Arc<Self>) {
624            self.0.store(true, Ordering::Release);
625        }
626    }
627
628    /// Build a `(transport, reader)` pair against a fresh `H2Connection`, with the
629    /// stream's lifecycle pre-set to `UpgradeOpen` — `H2Transport::poll_write` only
630    /// accepts writes for upgrade-lifecycle streams.
631    fn pair_with_cap(cap: usize) -> (H2Transport, H2OutboundReader) {
632        let mut context = HttpContext::new();
633        context.config.response_buffer_max_len = cap;
634        let connection = H2Connection::new(Arc::new(context));
635        let state = Arc::new(StreamState::default());
636        *state.lifecycle_lock() = StreamLifecycle::UpgradeOpen { recv_eof: true };
637        let transport = H2Transport::new(connection.clone(), 1, state.clone());
638        let reader = H2OutboundReader::new(state, 1);
639        (transport, reader)
640    }
641
642    #[test]
643    fn poll_write_caps_at_response_buffer_max_len() {
644        // Cap of 16 bytes. Writing 32 bytes should accept exactly 16 (partial-write
645        // semantics; AsyncWriteExt::write_all retries the rest).
646        let (mut transport, _reader) = pair_with_cap(16);
647        let waker = Waker::from(Arc::new(CountingWaker(AtomicBool::new(false))));
648        let mut cx = Context::from_waker(&waker);
649
650        let buf = [0u8; 32];
651        match Pin::new(&mut transport).poll_write(&mut cx, &buf) {
652            Poll::Ready(Ok(n)) => assert_eq!(n, 16, "should accept exactly cap bytes"),
653            other => panic!("expected Ready(Ok(16)), got {other:?}"),
654        }
655    }
656
657    #[test]
658    fn poll_write_returns_pending_when_full_and_drain_wakes() {
659        let (mut transport, mut reader) = pair_with_cap(8);
660        let counting = Arc::new(CountingWaker(AtomicBool::new(false)));
661        let writer_waker = Waker::from(counting.clone());
662        let mut writer_cx = Context::from_waker(&writer_waker);
663
664        // Fill the buffer to the cap.
665        let buf = [0u8; 8];
666        match Pin::new(&mut transport).poll_write(&mut writer_cx, &buf) {
667            Poll::Ready(Ok(8)) => {}
668            other => panic!("expected Ready(Ok(8)), got {other:?}"),
669        }
670
671        // Next write must return Pending — buffer is at cap.
672        let extra = [0u8; 4];
673        match Pin::new(&mut transport).poll_write(&mut writer_cx, &extra) {
674            Poll::Pending => {}
675            other => panic!("expected Pending, got {other:?}"),
676        }
677        assert!(
678            !counting.0.load(Ordering::Acquire),
679            "writer waker should not have fired yet"
680        );
681
682        // Drain via the reader — this should wake the writer.
683        let reader_waker = Waker::noop().clone();
684        let mut reader_cx = Context::from_waker(&reader_waker);
685        let mut sink = [0u8; 4];
686        match Pin::new(&mut reader).poll_read(&mut reader_cx, &mut sink) {
687            Poll::Ready(Ok(4)) => {}
688            other => panic!("expected Ready(Ok(4)), got {other:?}"),
689        }
690        assert!(
691            counting.0.load(Ordering::Acquire),
692            "drain should have woken the writer"
693        );
694
695        // Re-poll the writer — there's now room for the 4 extra bytes.
696        match Pin::new(&mut transport).poll_write(&mut writer_cx, &extra) {
697            Poll::Ready(Ok(4)) => {}
698            other => panic!("expected Ready(Ok(4)), got {other:?}"),
699        }
700    }
701
702    // Drop / submit_trailers / mark_drop_graceful unit tests are gone — those behaviors
703    // are now covered end-to-end by the wire-level fixture in
704    // [`super::super::acceptor::tests`] and by the `h2c_*` integration tests in
705    // `http/tests/upgrade_matrix.rs`.
706}