trillium_http/h2/transport.rs
1//! Per-stream transport handed to handler tasks.
2//!
3//! [`H2Transport`] is the [`AsyncRead`] + [`AsyncWrite`] view of a single HTTP/2 stream. It is
4//! carried on the emitted [`Conn`][crate::Conn] returned from [`H2Driver::next`], and the
5//! runtime adapter spawns a handler task that consumes it. The transport never touches the
6//! underlying TCP connection directly — all I/O coordinates through shared per-stream state
7//! on the [`H2Connection`] driven by the driver task.
8//!
9//! Two paths reach the impls:
10//!
11//! - **Normal HTTP/2 request/response**: handlers usually don't touch [`H2Transport`] directly
12//! (same sharp edge h1 and h3 document). [`ReceivedBody`][crate::ReceivedBody] reads request body
13//! bytes through the transport's `AsyncRead` via
14//! [`ReceivedBody::handle_raw`][crate::ReceivedBody::handle_raw]. Response bytes flow through
15//! [`H2Connection::submit_send`][submit_send] to the driver's send pump, which frames HEADERS +
16//! DATA + trailing HEADERS onto the connection without ever touching this `AsyncWrite`.
17//!
18//! - **Extended-CONNECT upgrades** ([RFC 8441] WebSocket-over-h2, plus the in-progress
19//! `draft-ietf-webtrans-http2` for WebTransport-over-h2): after the handler responds 200 to a
20//! `CONNECT` request with a `:protocol` pseudo-header, [`Conn::send_h2`][crate::Conn::send_h2]
21//! routes through [`H2Connection::submit_upgrade`][submit_upgrade] which frames HEADERS without
22//! `END_STREAM`, signals send completion early, and leaves the stream open as a bidirectional
23//! byte channel. The runtime adapter then dispatches
24//! [`Handler::upgrade`][trillium::Handler::upgrade], which gets an [`Upgrade`][crate::Upgrade]
25//! wrapping this transport. `AsyncWrite::poll_write` appends to a per-stream outbound queue
26//! ([`SendState::outbound`]); the driver's send pump drains it into DATA frames bounded by the
//! per-stream and connection send windows. `AsyncWrite::poll_close` transitions the stream's
//! lifecycle from `UpgradeOpen` to `UpgradeClosing` so the driver eventually emits
//! `DATA(END_STREAM)` and tears the stream down.
30//!
31//! [`H2Driver::next`]: super::H2Driver::next
32//! [`H2Connection`]: super::H2Connection
33//! [`BoxedTransport`]: crate::transport::BoxedTransport
34//! [submit_send]: super::H2Connection::submit_send
35//! [submit_upgrade]: super::H2Connection::submit_upgrade
36//! [RFC 8441]: https://www.rfc-editor.org/rfc/rfc8441
37
38use super::{H2Connection, H2ErrorCode, lifecycle::StreamLifecycle};
39use crate::{Buffer, Headers, headers::hpack::FieldSection};
40use atomic_waker::AtomicWaker;
41use futures_lite::io::{AsyncRead, AsyncWrite};
42use std::{
43 fmt, io,
44 pin::Pin,
45 sync::{
46 Arc, Mutex, MutexGuard,
47 atomic::{AtomicBool, AtomicU64, Ordering},
48 },
49 task::{Context, Poll},
50};
51
/// A single HTTP/2 stream's transport handle.
///
/// Carries a backref to the shared [`H2Connection`], the stream id, and the per-stream
/// `Arc<StreamState>` shared with the driver. Normal HTTP/2 operation reads through
/// [`ReceivedBody`][crate::ReceivedBody] and writes through the connection's send queue;
/// the `AsyncRead` / `AsyncWrite` impls here are only reached by code that borrows the
/// transport directly (typically an upgrade handler after extended CONNECT).
///
/// Dropping the transport is the application-side release / cancel signal for the stream.
pub struct H2Transport {
    /// Shared connection. Supplies the stream table consulted on drop, the connection-level
    /// outbound waker used to nudge the driver, and the config that caps the outbound queue.
    connection: Arc<H2Connection>,
    /// Identifier of the HTTP/2 stream this handle is bound to.
    stream_id: u32,
    /// Per-stream state (lifecycle, recv buffer, outbound queue, wakers) read and written by
    /// the `AsyncRead` / `AsyncWrite` / `Drop` impls on this type.
    state: Arc<StreamState>,
}
64
65impl fmt::Debug for H2Transport {
66 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
67 f.debug_struct("H2Transport")
68 .field("stream_id", &self.stream_id)
69 .finish_non_exhaustive()
70 }
71}
72
73impl H2Transport {
74 /// Create a transport for a stream that has just been opened by the driver.
75 pub(super) fn new(
76 connection: Arc<H2Connection>,
77 stream_id: u32,
78 state: Arc<StreamState>,
79 ) -> Self {
80 Self {
81 connection,
82 stream_id,
83 state,
84 }
85 }
86
87 /// The stream identifier this transport is bound to.
88 pub fn stream_id(&self) -> u32 {
89 self.stream_id
90 }
91
92 /// The shared [`H2Connection`] backing this stream.
93 pub fn connection(&self) -> &Arc<H2Connection> {
94 &self.connection
95 }
96}
97
impl Drop for H2Transport {
    /// Application-side release / cancel signal. After a cheap "is the stream still in the
    /// shared map" pre-check, a single match on the lifecycle variant decides what to do:
    ///
    /// - `Reset` / `ResetRequested`: stream is already torn down or about to be; no-op.
    /// - `AwaitingRelease`: defensive no-op. This `Drop` is what moves a stream *into*
    ///   `AwaitingRelease` (see the last bullet), so observing it here would mean a second
    ///   drop of the same transport.
    /// - `UpgradeOpen`: extended-CONNECT lifecycle. Transition to `UpgradeClosing` (carrying
    ///   `recv_eof` over, no pending trailers) and signal the driver so it schedules a
    ///   graceful close — the variant itself records that this stream's drop semantics are
    ///   "graceful," no separate `graceful_drop` flag needed.
    /// - `UpgradeClosing`: graceful close already in flight; re-signal the driver in case it
    ///   parked. The wake is idempotent.
    /// - Anything else (`Idle` / `Submitted` / `Sending`): if the stream has already finished
    ///   on the wire (`send.completed` and `recv_eof` both set) transition to
    ///   `AwaitingRelease` and signal the driver; otherwise this is a mid-stream drop on a
    ///   normal request/response — request `RST_STREAM(Cancel)` so the peer learns we
    ///   abandoned it.
    ///
    /// Every signalling arm follows the same order: write the lifecycle, release the
    /// lifecycle lock, set `needs_servicing`, then wake.
    fn drop(&mut self) {
        // Cheap pre-check: if the stream is no longer in the shared map the driver has
        // already cleaned up; nothing to do.
        if !self.connection.streams_lock().contains_key(&self.stream_id) {
            log::trace!(
                "h2 stream {}: H2Transport dropped on already-released stream",
                self.stream_id,
            );
            return;
        }

        let mut lifecycle = self.state.lifecycle_lock();
        match &*lifecycle {
            StreamLifecycle::Reset(_) | StreamLifecycle::ResetRequested(_) => {
                log::trace!(
                    "h2 stream {}: H2Transport dropped on already-reset stream",
                    self.stream_id,
                );
            }
            StreamLifecycle::AwaitingRelease => {
                // Shouldn't ordinarily happen — the catch-all arm of this very `Drop` is
                // what *transitions* a stream into AwaitingRelease. Reaching it again means
                // a second Drop, which is structurally impossible (`H2Transport` is not
                // `Clone`); leaving it as a defensive no-op.
                log::trace!(
                    "h2 stream {}: H2Transport dropped while already AwaitingRelease",
                    self.stream_id,
                );
            }
            StreamLifecycle::UpgradeOpen { recv_eof } => {
                // Copy the flag out first so the shared borrow of the guard ends before the
                // lifecycle is overwritten.
                let recv_eof = *recv_eof;
                log::trace!(
                    "h2 stream {}: H2Transport dropped (upgrade) — scheduling graceful close",
                    self.stream_id,
                );
                *lifecycle = StreamLifecycle::UpgradeClosing {
                    recv_eof,
                    pending_trailers: None,
                };
                // Release the lifecycle lock before signalling so whoever wakes up can take
                // it immediately.
                drop(lifecycle);
                self.state.needs_servicing.store(true, Ordering::Release);
                // Wake both the upgrade body's reader and the connection-level driver waker.
                self.state.send.outbound_waker.wake();
                self.connection.outbound_waker().wake();
            }
            StreamLifecycle::UpgradeClosing { .. } => {
                log::trace!(
                    "h2 stream {}: H2Transport dropped — graceful close already in flight",
                    self.stream_id,
                );
                // Driver is already draining; just nudge in case it parked.
                drop(lifecycle);
                self.state.needs_servicing.store(true, Ordering::Release);
                self.state.send.outbound_waker.wake();
                self.connection.outbound_waker().wake();
            }
            _ => {
                // Idle / Submitted / Sending: mid-stream drop. Check the wire-closed
                // case first — recv_eof + send.completed means the application is done
                // with a stream that already finished on the wire, and we forward to
                // release (client-role keeps streams in the map past wire-close so
                // late-trailer / late-read access works). Otherwise RST_STREAM(Cancel).
                let send_done = self.state.send.completed.load(Ordering::Acquire);
                let recv_done = lifecycle.recv_eof();
                if send_done && recv_done {
                    log::trace!(
                        "h2 stream {}: H2Transport dropped on wire-closed stream — releasing",
                        self.stream_id,
                    );
                    *lifecycle = StreamLifecycle::AwaitingRelease;
                    drop(lifecycle);
                    self.state.needs_servicing.store(true, Ordering::Release);
                    self.connection.outbound_waker().wake();
                } else {
                    log::debug!(
                        "h2 stream {}: H2Transport dropped mid-stream — RST_STREAM(Cancel)",
                        self.stream_id,
                    );
                    *lifecycle = StreamLifecycle::ResetRequested(H2ErrorCode::Cancel);
                    drop(lifecycle);
                    self.state.needs_servicing.store(true, Ordering::Release);
                    self.connection.outbound_waker().wake();
                }
            }
        }
    }
}
197
198impl AsyncRead for H2Transport {
199 fn poll_read(
200 self: Pin<&mut Self>,
201 cx: &mut Context<'_>,
202 out: &mut [u8],
203 ) -> Poll<io::Result<usize>> {
204 if out.is_empty() {
205 return Poll::Ready(Ok(0));
206 }
207
208 // The first `poll_read` is the handler's declaration of intent to consume the request
209 // body — until this point, we've advertised a zero recv window and the peer has sent
210 // nothing beyond HEADERS. Tell the driver to top up our per-stream window now. Later
211 // calls CAS-fail silently and don't re-signal.
212 let recv_state = &self.state.recv;
213 let connection = &*self.connection;
214 if !recv_state.is_reading.swap(true, Ordering::AcqRel) {
215 self.state.needs_servicing.store(true, Ordering::Release);
216 connection.outbound_waker().wake();
217 }
218
219 let mut recv = recv_state.buf.lock().expect("recv buf mutex poisoned");
220
221 // Copy as many bytes as fit from the front of the ring into `out`, then advance the
222 // ring's virtual read cursor. `Buffer::ignore_front` truncates the underlying `Vec` to
223 // zero when we drain fully, so capacity stays bounded by peak in-flight bytes rather
224 // than cumulative traffic.
225 let take = out.len().min(recv.len());
226 if take > 0 {
227 out[..take].copy_from_slice(&recv[..take]);
228 recv.ignore_front(take);
229 // Drop the buf lock before the waker fire so the driver can grab it without
230 // contention when it wakes.
231 drop(recv);
232 // Tell the driver how many bytes the handler consumed so it can emit a matching
233 // `WINDOW_UPDATE` and keep the peer's stream + connection windows topped up.
234 // `fetch_add` accumulates across calls that happen before the driver's next
235 // service tick; the driver's `swap(0)` takes the whole batch at once.
236 recv_state
237 .bytes_consumed
238 .fetch_add(take as u64, Ordering::AcqRel);
239 self.state.needs_servicing.store(true, Ordering::Release);
240 connection.outbound_waker().wake();
241 return Poll::Ready(Ok(take));
242 }
243
244 // Buffer empty. Register the waker *before* releasing the buf lock so a driver
245 // push between this poll and the lifecycle-eof check is guaranteed to wake us:
246 // 1. We take buf lock (driver-push blocked).
247 // 2. We register waker.
248 // 3. We drop buf lock (driver-push may now proceed and fire waker).
249 // 4. We take lifecycle lock to check eof.
250 // 5. Return Pending or Ready(0); if a push raced through step 3, the waker is registered
251 // and a fresh poll will see the new bytes.
252 recv_state.waker.register(cx.waker());
253 drop(recv);
254 if self.state.lifecycle_lock().recv_eof() {
255 return Poll::Ready(Ok(0));
256 }
257 Poll::Pending
258 }
259}
260
261impl AsyncWrite for H2Transport {
262 fn poll_write(
263 self: Pin<&mut Self>,
264 cx: &mut Context<'_>,
265 buf: &[u8],
266 ) -> Poll<io::Result<usize>> {
267 // Append into the per-stream outbound queue used by the extended-CONNECT
268 // (RFC 8441) upgrade path. The driver's send pump drains the same queue (via
269 // the upgrade body's `AsyncRead::poll_read`) into DATA frames bounded by
270 // per-stream + connection send windows.
271 //
272 // Bounded by `config.response_buffer_max_len` — the same cap h1 and h3 response
273 // paths use for their transit buffers. If the peer's flow-control window stalls
274 // (slow or malicious reader) the driver can't drain `outbound`, the cap is hit,
275 // and we return `Pending` so the handler is throttled. The drain side
276 // (`H2OutboundReader::poll_read`) wakes `outbound_write_waker` after each take.
277 let send = &self.state.send;
278
279 // Reject writes once the upgrade has begun closing — either the user called
280 // `poll_close`, or `submit_trailers` staged trailers + close. Reject as well in
281 // terminal states. Anything else (normal upgrade flow) is `UpgradeOpen` and
282 // accepts writes.
283 // Only `UpgradeOpen` accepts writes. Anything else (upgrade past close, terminal
284 // states, or a non-upgrade lifecycle that never had an `H2OutboundReader` to
285 // drain) returns `BrokenPipe` — the `AsyncWrite` impl is structurally meaningful
286 // only during an active extended-CONNECT upgrade.
287 if !matches!(
288 &*self.state.lifecycle_lock(),
289 StreamLifecycle::UpgradeOpen { .. }
290 ) {
291 return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into()));
292 }
293
294 let cap = self.connection.context.config.response_buffer_max_len;
295 let mut outbound = send.outbound.lock().expect("outbound buf mutex poisoned");
296 if outbound.len() >= cap {
297 // Register first, then re-check under lock to close the race against the
298 // drain side (`H2OutboundReader::poll_read` takes the same lock to call
299 // `ignore_front` and then wakes us). If a drain landed between our length
300 // check and the register, the second check sees the freed space.
301 send.outbound_write_waker.register(cx.waker());
302 if outbound.len() >= cap {
303 return Poll::Pending;
304 }
305 }
306 let take = (cap - outbound.len()).min(buf.len());
307 log::trace!(
308 "h2 stream {}: H2Transport::poll_write appending {take}/{} bytes to outbound queue",
309 self.stream_id,
310 buf.len(),
311 );
312 outbound.extend_from_slice(&buf[..take]);
313 drop(outbound);
314
315 // Wake the driver task (if parked on the connection-level waker) and the
316 // upgrade body's poll_read (in case it's registered between driver ticks).
317 // Firing both is cheap and resolves the cross-task race where the driver
318 // happens to be parked on `connection.outbound_waker` rather than mid-body-poll.
319 send.outbound_waker.wake();
320 self.connection.outbound_waker().wake();
321 Poll::Ready(Ok(take))
322 }
323
324 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
325 // Best-effort: bytes appended via `poll_write` are already visible to the driver
326 // and will be framed on the next tick. There's no application-level "flushed"
327 // state below us to wait on.
328 Poll::Ready(Ok(()))
329 }
330
331 fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
332 // Mark the upgrade write-half closed by transitioning `UpgradeOpen` →
333 // `UpgradeClosing`. Once the driver drains the remaining outbound bytes, the
334 // upgrade body's `poll_read` will return `Ready(0)`, the send pump transitions
335 // through trailers (none) into `DATA(END_STREAM)`, and the stream tears down via
336 // the normal `complete_and_remove_stream` path. Idempotent on any non-`UpgradeOpen`
337 // state — `poll_close` is allowed to be called multiple times.
338 log::trace!(
339 "h2 stream {}: H2Transport::poll_close marking outbound closed",
340 self.stream_id,
341 );
342 let mut lifecycle = self.state.lifecycle_lock();
343 if let StreamLifecycle::UpgradeOpen { recv_eof } = &*lifecycle {
344 *lifecycle = StreamLifecycle::UpgradeClosing {
345 recv_eof: *recv_eof,
346 pending_trailers: None,
347 };
348 }
349 drop(lifecycle);
350 self.state.send.outbound_waker.wake();
351 self.connection.outbound_waker().wake();
352 Poll::Ready(Ok(()))
353 }
354}
355
/// Shared per-stream state. Owned by an [`Arc`] held jointly by the driver (via the connection's
/// stream table) and the handler task (via [`H2Transport`]; an upgraded stream's
/// [`H2OutboundReader`] holds a third reference).
///
/// The cross-task-visible state machine is the [`StreamLifecycle`] held in [`Self::lifecycle`];
/// the recv buffer / wakers / completion signal channel are independent data and stay as
/// sibling fields. See the [`lifecycle`][super::lifecycle] module docs for the rationale.
///
/// `Default` yields a fresh state with empty buffers and all flags cleared.
#[derive(Debug, Default)]
pub(super) struct StreamState {
    /// Cross-task-visible per-stream state machine. The variants encode the legal
    /// observable states of a stream; predicates ([`StreamLifecycle::is_in_flight`],
    /// [`StreamLifecycle::has_pending_recv`], [`StreamLifecycle::recv_eof`],
    /// [`StreamLifecycle::has_active_send`]) and code that needs to make decisions
    /// match on the variants directly. Lock it through [`Self::lifecycle_lock`].
    pub(super) lifecycle: Mutex<StreamLifecycle>,

    /// Recv side: inbound DATA payloads, handler waker, handler-intent signal, trailers.
    /// Independent of [`Self::lifecycle`] — these are data, not state.
    pub(super) recv: RecvState,

    /// Send side: outbound upgrade buffer + wakers + the driver→conn-task completion
    /// signal channel. Independent of [`Self::lifecycle`].
    pub(super) send: SendState,

    /// Mailbox flag for conn-task → driver work signaling. Set by conn-task code
    /// whenever it produces work the driver should service (lifecycle transition,
    /// [`RecvState::bytes_consumed`] increment, [`RecvState::is_reading`] transition).
    /// The driver's `service_handler_signals` consults this via `swap(false, AcqRel)` —
    /// only streams where it returns `true` pay for the lifecycle-lock pickup. Idle
    /// streams cost a single atomic RMW per tick.
    ///
    /// **Setter ordering rule**: write the underlying state first, then store `true`
    /// with `Release`, then call [`H2Connection::outbound_waker`][super::H2Connection]`.wake()`.
    /// Over-notification is harmless; under-notification would lose a signal.
    pub(super) needs_servicing: AtomicBool,
}
391
impl StreamState {
    /// Lock the per-stream lifecycle. Convenience wrapper around the inner mutex's
    /// `lock().expect(...)` — every call site treats poisoning as a programming error.
    ///
    /// The returned guard holds the lifecycle mutex until it is dropped; callers in this
    /// module release it before firing wakers.
    ///
    /// # Panics
    ///
    /// Panics with `"lifecycle mutex poisoned"` if another holder of the lock panicked.
    pub(super) fn lifecycle_lock(&self) -> MutexGuard<'_, StreamLifecycle> {
        self.lifecycle.lock().expect("lifecycle mutex poisoned")
    }
}
399
/// Receive-side per-stream state.
///
/// Note that recv-eof is *not* stored here: it is part of the [`StreamLifecycle`] and is
/// queried through [`StreamLifecycle::recv_eof`].
#[derive(Debug, Default)]
pub(super) struct RecvState {
    /// Inbound DATA body bytes awaiting handler read. A single persistent ring (append-at-tail,
    /// `ignore_front`-at-head): the driver appends via `extend_from_slice` when a DATA frame
    /// arrives; the handler reads from the front and virtually drops consumed bytes. When
    /// `ignore_front` catches up to the data end the `Buffer` truncates to zero, so the
    /// underlying `Vec` capacity stays bounded by peak in-flight bytes rather than cumulative
    /// traffic — zero amortized allocations per DATA frame.
    pub(super) buf: Mutex<Buffer>,

    /// Handler-task waker, fired by the driver after pushing DATA into `buf` or after
    /// the lifecycle transitions to recv-eof. Single-waiter: only one task ever polls a
    /// given `H2Transport`.
    pub(super) waker: AtomicWaker,

    /// Set by the handler's first [`H2Transport::poll_read`] to declare intent to consume the
    /// request body. The driver observes the transition and emits a `WINDOW_UPDATE` for this
    /// stream, topping its recv window up from `SETTINGS_INITIAL_WINDOW_SIZE` (advertised as
    /// `0`) to the per-stream maximum. Once set, stays set.
    pub(super) is_reading: AtomicBool,

    /// Bytes the handler has consumed from `buf` since the driver last sampled this counter.
    /// Incremented by [`H2Transport::poll_read`] using `fetch_add` after each drain; the
    /// driver reads it via `swap(0)` on each tick and emits stream-level + connection-level
    /// `WINDOW_UPDATE` for the consumed total. Ensures a handler draining a body larger than
    /// a single window doesn't stall the peer.
    pub(super) bytes_consumed: AtomicU64,

    /// Trailers, populated by the driver if a trailing HEADERS frame arrives for this stream.
    /// Always written *before* the lifecycle's recv-eof is set, so once the handler observes
    /// `Ready(0)` on the recv side, any trailers for this request are guaranteed to be in
    /// place.
    ///
    /// Taken out and moved into [`Conn::request_trailers`][crate::Conn] by the receiver-side
    /// body state machine when it transitions to
    /// [`ReceivedBodyState::End`][crate::received_body::ReceivedBodyState].
    pub(super) trailers: Mutex<Option<Headers>>,

    /// Client-role: response HEADERS field section, populated by the driver on the first
    /// non-1xx HEADERS frame arrival for a client-initiated stream. Server role doesn't use
    /// this slot (response HEADERS go *out* on the server, not in). Single-shot: the conn
    /// task takes the `FieldSection` via [`H2Connection::response_headers`][super::H2Connection]
    /// once; subsequent HEADERS arrivals on the same stream are interpreted as trailers and
    /// routed to the [`Self::trailers`] slot. Interim 1xx HEADERS frames are discarded by
    /// the driver without touching this slot or latching `first_response_headers_seen`.
    pub(super) response_headers: Mutex<Option<FieldSection<'static>>>,

    /// Client-role: latching flag for "first HEADERS arrived for this stream." Distinct from
    /// `response_headers.is_some()` — the conn task drains that slot when it consumes
    /// headers, so the driver can't use slot occupancy to distinguish "haven't seen
    /// HEADERS yet" from "headers seen + already taken." Set inside `finalize_response_headers`
    /// before that slot is populated; checked by `route_headers` on subsequent HEADERS to
    /// route them as trailers. Never cleared.
    pub(super) first_response_headers_seen: AtomicBool,

    /// Client-role: waker the conn task registers via
    /// [`H2Connection::response_headers`][super::H2Connection]; fired by the driver after
    /// stashing the `FieldSection` in [`Self::response_headers`] *or* on stream removal (so
    /// a parked conn task observing the stream gone surfaces `NotConnected` instead of
    /// hanging).
    pub(super) response_headers_waker: AtomicWaker,
}
462
/// Send-side per-stream state used to hand a response from the conn task to the driver,
/// plus the outbound byte queue for extended-CONNECT upgraded streams.
///
/// **Normal response path**: the conn task submits the response once via
/// [`H2Connection::submit_send`][submit] (the submission itself is carried by
/// `StreamLifecycle::Submitted`, not by a field here) and waits on `completion_waker` for
/// `completed` to flip. The driver picks up the submission on its next `drive` tick, frames
/// it (HEADERS, DATA, optional trailing HEADERS) into the connection's outbound buffer as
/// send-side flow control allows, and on completion stores the `completion_result`, sets
/// `completed = true`, and wakes the conn task.
///
/// **Extended-CONNECT upgrade path** ([RFC 8441]): the conn task calls
/// [`H2Connection::submit_upgrade`][submit_upgrade], which constructs an
/// [`H2OutboundReader`] over this stream's shared state (it reads `outbound`, parks on
/// `outbound_waker`, and consults the lifecycle for end-of-stream) and submits it as the
/// response body. The driver signals `completion_waker` as soon as the response HEADERS
/// frame is on the wire (instead of waiting for the body to drain), so the conn task's
/// `submit_upgrade().await` returns and the runtime adapter can dispatch
/// [`Handler::upgrade`][trillium::Handler::upgrade]. The upgrade handler then writes through
/// [`H2Transport`]'s `AsyncWrite`, which appends to `outbound`; the driver's send pump pulls
/// those bytes out via the body's `AsyncRead` and frames them as DATA. Closing or dropping
/// the transport moves the lifecycle from `UpgradeOpen` to `UpgradeClosing`; once `outbound`
/// is empty the reader returns `Ready(0)`, and the send pump terminates the stream with
/// `DATA(END_STREAM)`.
///
/// [submit]: super::H2Connection::submit_send
/// [submit_upgrade]: super::H2Connection::submit_upgrade
/// [RFC 8441]: https://www.rfc-editor.org/rfc/rfc8441
#[derive(Debug, Default)]
pub(super) struct SendState {
    /// Set to `true` by the driver once the response has been fully framed, flushed, or
    /// errored. The conn task's `SubmitSend` future polls this atomic and registers on
    /// `completion_waker`. Independent signal channel from the lifecycle — on the
    /// extended-CONNECT upgrade path completion is signalled *early* (at `END_HEADERS`,
    /// before the body is on the wire) so the runtime can dispatch `Handler::upgrade`;
    /// the lifecycle stays in `UpgradeOpen` past that point.
    pub(super) completed: AtomicBool,

    /// The driver writes the final result here before flipping `completed`. The conn task
    /// takes it once `completed` is observed true.
    pub(super) completion_result: Mutex<Option<io::Result<()>>>,

    /// The conn task's waker, registered by `SubmitSend::poll` and fired by the driver
    /// after `completed` is set.
    pub(super) completion_waker: AtomicWaker,

    /// Outbound bytes for an extended-CONNECT (RFC 8441) upgraded stream.
    /// Appended to by [`H2Transport`]'s `AsyncWrite::poll_write` and drained by the
    /// upgrade body's `AsyncRead::poll_read` (the driver-task side of the send pump).
    /// Empty for normal responses — the driver pumps the response `Body` directly.
    pub(super) outbound: Mutex<Buffer>,

    /// Waker for the upgrade body's `poll_read`. Fired by [`H2Transport::poll_write`]
    /// after appending bytes, by [`H2Transport::poll_close`] after the lifecycle
    /// transitions to `UpgradeClosing`, and by `H2Transport`'s `Drop` on an upgraded
    /// stream. Registered by the body during its `poll_read` when it observes an empty
    /// buffer, before it checks the lifecycle.
    pub(super) outbound_waker: AtomicWaker,

    /// Reverse-direction waker: registered by [`H2Transport::poll_write`] when `outbound`
    /// has reached the configured cap, fired by [`H2OutboundReader::poll_read`] after it
    /// drains bytes (i.e. after `ignore_front`) so a parked writer can resume. This is the
    /// edge that surfaces peer flow-control backpressure to the upgrade handler — without
    /// it, a slow or unresponsive peer's closed window would let `outbound` grow without
    /// bound.
    pub(super) outbound_write_waker: AtomicWaker,
}
527
/// `AsyncRead` source the driver uses as the response body for an extended-CONNECT upgrade.
///
/// Reads from [`SendState::outbound`] — the same per-stream queue [`H2Transport`]'s
/// `AsyncWrite::poll_write` appends to. Returns `Ready(0)` once the queue is empty and the
/// stream lifecycle is no longer `UpgradeOpen` (handler dropped or called `poll_close` on
/// the transport, or the stream was reset), at which point the driver's send pump
/// transitions through trailers (none) into `DATA(END_STREAM)` and tears the stream down.
///
/// Constructed by [`H2Connection::submit_upgrade`][super::H2Connection::submit_upgrade];
/// wrapped in `Body::new_streaming` so the existing send pump can pump it as if it were
/// any other unknown-length response body.
#[derive(Debug)]
pub(super) struct H2OutboundReader {
    /// Per-stream state shared with the [`H2Transport`] writer; supplies the outbound
    /// queue, both outbound wakers, and the lifecycle consulted for end-of-stream.
    state: Arc<StreamState>,
    /// Stream identifier, used only for trace logging.
    stream_id: u32,
}
544
545impl H2OutboundReader {
546 pub(super) fn new(state: Arc<StreamState>, stream_id: u32) -> Self {
547 Self { state, stream_id }
548 }
549}
550
551impl AsyncRead for H2OutboundReader {
552 fn poll_read(
553 self: Pin<&mut Self>,
554 cx: &mut Context<'_>,
555 out: &mut [u8],
556 ) -> Poll<io::Result<usize>> {
557 if out.is_empty() {
558 return Poll::Ready(Ok(0));
559 }
560
561 let send = &self.state.send;
562 let mut outbound = send.outbound.lock().expect("outbound buf mutex poisoned");
563 let take = out.len().min(outbound.len());
564 if take > 0 {
565 out[..take].copy_from_slice(&outbound[..take]);
566 outbound.ignore_front(take);
567 log::trace!(
568 "h2 stream {}: H2OutboundReader::poll_read drained {take} bytes",
569 self.stream_id,
570 );
571 // Drop the lock before waking — the writer reacquires it on resume.
572 drop(outbound);
573 // Surface flow-control backpressure: wake any writer parked on
574 // `outbound_write_waker` because the cap was hit. Registered-but-still-full
575 // is harmless — the writer's recheck under lock observes the new len.
576 send.outbound_write_waker.wake();
577 return Poll::Ready(Ok(take));
578 }
579
580 // Queue empty. Register first, then re-check the lifecycle. This closes the
581 // register-then-check race against `poll_close` / `submit_trailers` (both of
582 // which transition the lifecycle but don't take the outbound buf lock).
583 send.outbound_waker.register(cx.waker());
584
585 // EOF when the lifecycle has moved past `UpgradeOpen` — `UpgradeClosing`
586 // (`poll_close`/`submit_trailers` fired), `Reset*` (peer reset / local error),
587 // or terminal. Anything that hasn't reached EOF stays `UpgradeOpen` and we
588 // park.
589 let lifecycle_says_eof = !matches!(
590 &*self.state.lifecycle_lock(),
591 StreamLifecycle::UpgradeOpen { .. }
592 );
593 if lifecycle_says_eof {
594 log::trace!(
595 "h2 stream {}: H2OutboundReader::poll_read EOF (lifecycle past UpgradeOpen, queue \
596 empty)",
597 self.stream_id,
598 );
599 return Poll::Ready(Ok(0));
600 }
601 Poll::Pending
602 }
603}
604
605// `Submission` lives in [`super::lifecycle`] — it's the payload of
606// `StreamLifecycle::Submitted`.
607
#[cfg(test)]
mod tests {
    use super::*;
    use crate::HttpContext;
    use futures_lite::{AsyncRead, AsyncWrite};
    use std::{
        sync::{
            Arc,
            atomic::{AtomicBool, Ordering},
        },
        task::{Context, Poll, Wake, Waker},
    };

    /// Waker that latches a flag when woken, so a test can assert whether a wake happened.
    struct FlagWaker(AtomicBool);

    impl Wake for FlagWaker {
        fn wake(self: Arc<Self>) {
            self.0.store(true, Ordering::Release);
        }
    }

    /// Build a `(transport, reader)` pair over one shared `StreamState` on a fresh
    /// `H2Connection` whose `response_buffer_max_len` is `cap`. The lifecycle starts in
    /// `UpgradeOpen` because `H2Transport::poll_write` rejects every other lifecycle.
    fn pair_with_cap(cap: usize) -> (H2Transport, H2OutboundReader) {
        let mut context = HttpContext::new();
        context.config.response_buffer_max_len = cap;
        let connection = H2Connection::new(Arc::new(context));

        let shared = Arc::new(StreamState::default());
        *shared.lifecycle_lock() = StreamLifecycle::UpgradeOpen { recv_eof: true };

        let reader = H2OutboundReader::new(Arc::clone(&shared), 1);
        let transport = H2Transport::new(connection, 1, shared);
        (transport, reader)
    }

    #[test]
    fn poll_write_caps_at_response_buffer_max_len() {
        // With a 16-byte cap, a 32-byte write is accepted only up to the cap (partial-write
        // semantics; `AsyncWriteExt::write_all` would retry the remainder).
        let (mut transport, _reader) = pair_with_cap(16);
        let waker = Waker::from(Arc::new(FlagWaker(AtomicBool::new(false))));
        let mut cx = Context::from_waker(&waker);

        let payload = [0u8; 32];
        let outcome = Pin::new(&mut transport).poll_write(&mut cx, &payload);
        match outcome {
            Poll::Ready(Ok(n)) => assert_eq!(n, 16, "should accept exactly cap bytes"),
            other => panic!("expected Ready(Ok(16)), got {other:?}"),
        }
    }

    #[test]
    fn poll_write_returns_pending_when_full_and_drain_wakes() {
        let (mut transport, mut reader) = pair_with_cap(8);
        let woken = Arc::new(FlagWaker(AtomicBool::new(false)));
        let writer_waker = Waker::from(Arc::clone(&woken));
        let mut writer_cx = Context::from_waker(&writer_waker);

        // Step 1: fill the queue exactly to the cap.
        let fill = [0u8; 8];
        let filled = Pin::new(&mut transport).poll_write(&mut writer_cx, &fill);
        match filled {
            Poll::Ready(Ok(8)) => {}
            other => panic!("expected Ready(Ok(8)), got {other:?}"),
        }

        // Step 2: with the queue at cap, a further write has to park.
        let extra = [0u8; 4];
        let parked = Pin::new(&mut transport).poll_write(&mut writer_cx, &extra);
        match parked {
            Poll::Pending => {}
            other => panic!("expected Pending, got {other:?}"),
        }
        assert!(
            !woken.0.load(Ordering::Acquire),
            "writer waker should not have fired yet"
        );

        // Step 3: draining through the reader must wake the parked writer.
        let reader_waker = Waker::noop().clone();
        let mut reader_cx = Context::from_waker(&reader_waker);
        let mut sink = [0u8; 4];
        let drained = Pin::new(&mut reader).poll_read(&mut reader_cx, &mut sink);
        match drained {
            Poll::Ready(Ok(4)) => {}
            other => panic!("expected Ready(Ok(4)), got {other:?}"),
        }
        assert!(
            woken.0.load(Ordering::Acquire),
            "drain should have woken the writer"
        );

        // Step 4: the freed space now admits the 4 extra bytes.
        let resumed = Pin::new(&mut transport).poll_write(&mut writer_cx, &extra);
        match resumed {
            Poll::Ready(Ok(4)) => {}
            other => panic!("expected Ready(Ok(4)), got {other:?}"),
        }
    }

    // Drop / submit_trailers / mark_drop_graceful unit tests are gone — those behaviors
    // are now covered end-to-end by the wire-level fixture in
    // [`super::super::acceptor::tests`] and by the `h2c_*` integration tests in
    // `http/tests/upgrade_matrix.rs`.
}