trillium-http 1.3.0

the http implementation for the trillium toolkit
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
//! Wire-level tests for [`H2Driver`].
//!
//! These tests sit above [`StreamState`][super::super::transport::StreamState] — they
//! drive the public(-ish) acceptor surface and assert what bytes appear on the wire, not
//! which per-stream booleans flipped. The bag-of-atomics / lifecycle-enum refactor is
//! below this layer; a future reader who only sees the test diff should not be able to
//! tell which implementation is in effect.
//!
//! See [`h2-lifecycle-refactor-plan`] (memory) for the enumerated tests this module is
//! meant to grow.

use crate::{
    Body, Conn, Headers, HttpContext, Method, Status,
    h2::{
        H2Driver, H2Error, H2ErrorCode, H2Transport,
        acceptor::{
            recv::CLIENT_PREFACE,
            types::{CloseOutcome, DriverState},
        },
        connection::H2Connection,
        frame::{
            FRAME_HEADER_LEN, Frame, FrameHeader, data as data_frame, headers as headers_frame,
            settings,
        },
        settings::H2Settings,
    },
    headers::{
        header_observer::HeaderObserver,
        hpack::{FieldSection, HpackEncoder, PseudoHeaders},
    },
};
use std::{
    sync::Arc,
    task::{Context, Poll, Wake, Waker},
};
use trillium_testing::TestTransport;

/// Marker waker — the driver's `drive` calls `wake_by_ref` to ensure the executor
/// re-polls after the cooperative-yield bound. Tests poll synchronously, so we don't
/// need a real wake; we just observe whether a poll returned `Ready` or `Pending`.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}

    fn wake_by_ref(self: &Arc<Self>) {}
}

fn noop_waker() -> Waker {
    Waker::from(Arc::new(NoopWaker))
}

/// Paired-transport `H2Driver` test fixture. The driver runs over one half of a
/// `TestTransport` pair; the test code drives "the peer" through the other half — writing
/// frames synchronously into the driver's read side and pulling outbound bytes from the
/// driver's write side.
///
/// Each `tick` advances the driver one `drive` call (one full `copy_loops_per_yield`
/// budget). Outbound bytes are revealed incrementally via [`Self::next_outbound_bytes`]
/// so multi-step tests can isolate what each step emitted.
pub(super) struct DriverFixture {
    pub(super) driver: H2Driver<TestTransport>,
    pub(super) connection: Arc<H2Connection>,
    pub(super) peer: TestTransport,
    peer_read_cursor: usize,

    /// Peer-side HPACK encoder. Independent dynamic table from the driver's encoder so
    /// frames the test writes peer-to-driver are encoded against this state, while frames
    /// the driver writes back are encoded against its own. Configured with
    /// `local_preferred_size = 0` so every header line is emitted as a literal-
    /// without-indexing — the driver's decoder learns nothing from the lines, keeping the
    /// two dynamic tables in (trivial) sync without bookkeeping.
    peer_hpack: HpackEncoder,
}

impl DriverFixture {
    /// Construct a server-role fixture. The driver starts in `AwaitingPreface`; tests
    /// that need the steady state should follow up with [`Self::complete_handshake`].
    pub(super) fn new_server() -> Self {
        let (driver_transport, peer) = TestTransport::new();
        let context = Arc::new(HttpContext::new());
        let connection = H2Connection::new(context);
        let driver = connection.clone().run(driver_transport);
        let peer_hpack = HpackEncoder::new(Arc::new(HeaderObserver::default()), 0, 0);
        Self {
            driver,
            connection,
            peer,
            peer_read_cursor: 0,
            peer_hpack,
        }
    }

    /// Open a peer-initiated request stream by writing a HEADERS frame with the supplied
    /// pseudo-headers. Body-less requests (`end_stream = true`) terminate the stream's
    /// recv side at this frame; otherwise the caller is responsible for sending a
    /// terminating DATA frame with `end_stream = true` (or RST_STREAM) to complete it.
    ///
    /// HEADERS are framed with `end_headers = true` (no CONTINUATION continuation).
    pub(super) fn peer_open_stream(
        &mut self,
        stream_id: u32,
        method: Method,
        path: &str,
        end_stream: bool,
    ) {
        let pseudos = PseudoHeaders::default()
            .with_method(method)
            .with_path(path)
            .with_scheme("http")
            .with_authority("test");
        let headers = Headers::new();
        let field_section = FieldSection::new(pseudos, &headers);
        let mut block = Vec::new();
        self.peer_hpack.encode(&field_section, &mut block);

        let block_len = u32::try_from(block.len()).expect("block fits u32");
        let mut frame = vec![0u8; FRAME_HEADER_LEN + block.len()];
        headers_frame::encode_prefix(stream_id, end_stream, true, None, block_len, 0, &mut frame)
            .expect("encode HEADERS prefix");
        frame[FRAME_HEADER_LEN..].copy_from_slice(&block);
        self.peer.write_all(&frame);
    }

    /// Write a peer-side trailing HEADERS frame on an existing `stream_id`. RFC 9113 §8.1
    /// requires `END_STREAM` and no pseudo-headers on the trailer block; both invariants
    /// are baked in here so tests just supply the trailer fields.
    pub(super) fn peer_trailers(&mut self, stream_id: u32, trailers: &Headers) {
        let field_section = FieldSection::new(PseudoHeaders::default(), trailers);
        let mut block = Vec::new();
        self.peer_hpack.encode(&field_section, &mut block);
        let block_len = u32::try_from(block.len()).expect("block fits u32");

        let mut frame = vec![0u8; FRAME_HEADER_LEN + block.len()];
        headers_frame::encode_prefix(stream_id, true, true, None, block_len, 0, &mut frame)
            .expect("encode HEADERS prefix");
        frame[FRAME_HEADER_LEN..].copy_from_slice(&block);
        self.peer.write_all(&frame);
    }

    /// Write a peer-side DATA frame carrying `payload` bytes on `stream_id`, with the
    /// supplied `end_stream` flag. No padding.
    pub(super) fn peer_data(&mut self, stream_id: u32, payload: &[u8], end_stream: bool) {
        let payload_len = u32::try_from(payload.len()).expect("data fits u32");
        let mut frame = vec![0u8; FRAME_HEADER_LEN + payload.len()];
        data_frame::encode_prefix(stream_id, end_stream, payload_len, 0, &mut frame)
            .expect("encode DATA prefix");
        frame[FRAME_HEADER_LEN..].copy_from_slice(payload);
        self.peer.write_all(&frame);
    }

    /// Drive the connection through the standard server-role handshake: client preface
    /// in, initial SETTINGS + connection-level WINDOW_UPDATE out, peer SETTINGS in,
    /// SETTINGS_ACK out. Asserts the driver lands in `Running` and that the expected
    /// frames appeared on the wire.
    pub(super) fn complete_handshake(&mut self) {
        // Peer writes the 24-byte preface immediately; a real client would as well.
        self.peer.write_all(CLIENT_PREFACE);

        // Drive through preface read → server SETTINGS queue → running. One tick is
        // usually sufficient (drive's inner copy_loops_per_yield budget covers it),
        // but tick a second time defensively in case scheduling shifts.
        let _ = self.tick();
        if self.driver.state != DriverState::Running {
            let _ = self.tick();
        }
        assert_eq!(
            self.driver.state,
            DriverState::Running,
            "driver should reach Running after preface",
        );

        // Peer writes an empty SETTINGS so the driver has something to ACK and the
        // recv pump has parsed at least one peer frame — keeps the post-handshake
        // start point realistic.
        let empty_settings = H2Settings::default();
        let mut buf = vec![0u8; settings::encoded_len(&empty_settings)];
        settings::encode(&empty_settings, &mut buf).expect("encode settings");
        self.peer.write_all(&buf);
        let _ = self.tick();

        // Burn off handshake bytes so subsequent assertions see only test-relevant frames.
        let _ = self.next_outbound_bytes();
    }

    /// One poll of the driver's `drive`. Returns `Ready(item)` if the driver yielded
    /// (new Conn or terminal result); `Pending` otherwise. Internally `drive` consumes
    /// up to `copy_loops_per_yield` of its inner work units per call.
    pub(super) fn tick(&mut self) -> Poll<Option<Result<Conn<H2Transport>, H2Error>>> {
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);
        self.driver.drive(&mut cx)
    }

    /// Bytes the driver has written to the wire since the last call to this method (or
    /// since construction). Empty if no new outbound bytes have been flushed.
    pub(super) fn next_outbound_bytes(&mut self) -> Vec<u8> {
        let all = self.peer.snapshot();
        if all.len() <= self.peer_read_cursor {
            return Vec::new();
        }
        let bytes = all[self.peer_read_cursor..].to_vec();
        self.peer_read_cursor = all.len();
        bytes
    }

    /// Drain the next outbound bytes and decode them into a flat list of frames. Panics
    /// if the buffer doesn't end on a frame boundary or if any frame fails to decode —
    /// the wire-format invariants the driver upholds should be unconditional.
    pub(super) fn next_outbound_frames(&mut self) -> Vec<Frame> {
        decode_frames(&self.next_outbound_bytes())
    }
}

/// Decode a sequence of complete h2 frames from `bytes`. Panics on incomplete or
/// malformed input — the caller is expected to pass a buffer the driver has flushed in
/// full, so partial frames are a fixture bug rather than something to recover from.
fn decode_frames(bytes: &[u8]) -> Vec<Frame> {
    let mut frames = Vec::new();
    let mut offset = 0;
    while offset < bytes.len() {
        let header = FrameHeader::decode(&bytes[offset..]).expect("incomplete frame header");
        let frame_len = FRAME_HEADER_LEN + header.length as usize;
        let frame_bytes = &bytes[offset..offset + frame_len];
        let (frame, _consumed) = Frame::decode(frame_bytes).expect("frame decode");
        frames.push(frame);
        offset += frame_len;
    }
    frames
}

/// Convenience predicate — fixture parsing surfaces every frame as a `Frame` enum, and
/// most assertions count occurrences by variant rather than caring about fields.
fn count_goaways(frames: &[Frame]) -> usize {
    frames
        .iter()
        .filter(|f| matches!(f, Frame::Goaway { .. }))
        .count()
}

/// Fixture sanity check — the standard server-role handshake should produce a SETTINGS
/// frame and an initial connection-level WINDOW_UPDATE on the wire. Validates the test
/// helper machinery before relying on it in the lifecycle tests below.
#[test]
fn fixture_handshake_emits_settings_and_window_update() {
    let mut fx = DriverFixture::new_server();
    fx.peer.write_all(CLIENT_PREFACE);
    let _ = fx.tick();
    let _ = fx.tick();

    let frames = fx.next_outbound_frames();
    let settings_count = frames
        .iter()
        .filter(|f| matches!(f, Frame::Settings(_)))
        .count();
    let wu_count = frames
        .iter()
        .filter(|f| matches!(f, Frame::WindowUpdate { .. }))
        .count();
    assert!(
        settings_count >= 1,
        "expected initial SETTINGS in handshake outbound, got frames: {frames:?}",
    );
    assert!(
        wu_count >= 1,
        "expected initial WINDOW_UPDATE in handshake outbound, got frames: {frames:?}",
    );
}

/// Driver yields a `Conn` for a well-formed peer HEADERS opening a new stream. Validates
/// `peer_open_stream` + the recv-pump → `Action::Emit` path end-to-end before lifecycle
/// tests rely on it for setup.
#[test]
fn peer_headers_opening_stream_yields_conn() {
    let mut fx = DriverFixture::new_server();
    fx.complete_handshake();

    fx.peer_open_stream(1, Method::Get, "/", true);
    let polled = fx.tick();
    match polled {
        Poll::Ready(Some(Ok(conn))) => {
            assert_eq!(conn.method(), Method::Get);
            assert_eq!(conn.path(), "/");
        }
        other => panic!("expected Ready(Some(Ok(conn))) yielding the new request, got {other:?}"),
    }
}

/// Closing → Drained is gated on the in-flight stream predicate: while any stream has an
/// active send cursor or unfinished recv side, the driver stays in Closing — only once
/// both clear can it transition to Drained. Validates the behavior the wip-commit
/// docstring promises:
///
/// > Defer the transition while in-flight streams still have outbound (SendCursor not yet
/// > Complete) OR inbound (`recv.eof` not yet set) work.
///
/// Wire-level assertion: after begin_close with an in-flight stream open, no FIN-style
/// close happens (state stays Closing); after peer ends the stream's recv side, the
/// transition fires.
#[test]
fn closing_to_drained_waits_for_in_flight_stream() {
    let mut fx = DriverFixture::new_server();
    fx.complete_handshake();

    // Open stream 1 with `end_stream=false` so the recv side stays in-flight after the
    // request HEADERS — `has_pending_recv` will be true until peer END_STREAM lands.
    fx.peer_open_stream(1, Method::Post, "/", false);
    let conn = match fx.tick() {
        Poll::Ready(Some(Ok(conn))) => conn,
        other => panic!("expected Conn yielded for stream 1, got {other:?}"),
    };
    // Hold the conn for the duration of the test — dropping it would tear down the
    // H2Transport and let the stream complete via a different path than the one we're
    // exercising here.
    let _conn_guard = conn;

    fx.driver.begin_close(CloseOutcome::Graceful);
    let _ = fx.tick();
    assert_eq!(
        fx.driver.state,
        DriverState::Closing,
        "in-flight stream's open recv side should hold the driver in Closing",
    );

    // Peer closes its half of stream 1. Driver's recv pump (still running in Closing per
    // the wip commit) picks up END_STREAM, recv.eof flips, predicate clears, transition
    // fires.
    fx.peer_data(1, &[], true);
    let _ = fx.tick();
    assert_eq!(
        fx.driver.state,
        DriverState::Drained,
        "with the last in-flight stream's recv side closed, Closing should advance to Drained",
    );
}

/// Trailers staged via `submit_trailers` while a `SendCursor` is parked in `Body` phase
/// (waiting on the upgrade outbound buffer to fill) must still reach the wire as a
/// trailing HEADERS frame on the next driver tick. This is the trailers-stranding
/// regression that motivated the recent `transition_to_trailers` fallback in
/// [`send`][super::send]: previously, by the time the cursor reached `Body` EOF, the
/// only pickup site for `pending_trailers` had already run, and the trailers were lost.
#[test]
fn submit_trailers_lands_on_wire_after_body_parked() {
    let mut fx = DriverFixture::new_server();
    fx.complete_handshake();

    fx.peer_open_stream(1, Method::Get, "/", true);
    let _conn = match fx.tick() {
        Poll::Ready(Some(Ok(conn))) => conn,
        other => panic!("expected Conn yielded for stream 1, got {other:?}"),
    };

    // submit_upgrade installs an `H2OutboundReader` as the body, signals submission
    // completion at END_HEADERS, and leaves the cursor parked in Body until either bytes
    // appear in the outbound queue or `outbound_close_requested` flips.
    let pseudos = PseudoHeaders::default().with_status(Status::Ok);
    let _submit = fx.connection.submit_upgrade(1, pseudos, Headers::new());

    // Tick: HEADERS go out, cursor parks in Body (empty outbound, close not requested).
    let _ = fx.tick();
    let headers_round = fx.next_outbound_frames();
    assert!(
        headers_round.iter().any(|f| matches!(
            f,
            Frame::Headers {
                stream_id: 1,
                end_stream: false,
                ..
            }
        )),
        "response HEADERS (without END_STREAM) should be on the wire after first tick; got \
         {headers_round:?}",
    );

    // Outside the driver task: stage trailers + request close. The driver's send pump
    // must pick this up on its next tick despite the cursor being parked.
    let mut trailers = Headers::new();
    trailers.insert("grpc-status", "0");
    fx.connection
        .submit_trailers(1, trailers)
        .expect("submit_trailers on a live stream");

    let _ = fx.tick();
    let trailing = fx.next_outbound_frames();
    let trailing_headers = trailing
        .iter()
        .filter(|f| {
            matches!(
                f,
                Frame::Headers {
                    stream_id: 1,
                    end_stream: true,
                    ..
                }
            )
        })
        .count();
    assert_eq!(
        trailing_headers, 1,
        "exactly one trailing HEADERS with END_STREAM should land on the wire after \
         submit_trailers; got {trailing:?}",
    );
}

/// The send pump runs in `Closing` (not just `Running`): once we've begun closing, any
/// stream with a staged submission must still be framed and put on the wire — gRPC and
/// other late-trailer patterns submit the response right around the same time the
/// shutdown decision fires, and dropping the in-flight response would be a regression.
/// The wip-commit changed the send-pump's run condition from `Running` to
/// `Running | Closing` for this reason; this test pins it.
#[test]
fn send_pump_emits_response_in_closing() {
    let mut fx = DriverFixture::new_server();
    fx.complete_handshake();

    fx.peer_open_stream(1, Method::Get, "/", true);
    let _conn = match fx.tick() {
        Poll::Ready(Some(Ok(conn))) => conn,
        other => panic!("expected Conn yielded for stream 1, got {other:?}"),
    };

    // Stage a small response submission, then immediately begin_close — the send pump
    // hasn't picked it up yet, so the question is whether the pump runs in Closing.
    let pseudos = PseudoHeaders::default().with_status(Status::Ok);
    let body = Body::new_static(b"hi" as &[u8]);
    let _submit = fx
        .connection
        .submit_send(1, pseudos, Headers::new(), Some(body));
    fx.driver.begin_close(CloseOutcome::Graceful);
    let _ = fx.tick();

    let frames = fx.next_outbound_frames();
    let response_headers = frames
        .iter()
        .filter(|f| matches!(f, Frame::Headers { stream_id: 1, .. }))
        .count();
    let data_frames = frames
        .iter()
        .filter(|f| matches!(f, Frame::Data { stream_id: 1, .. }))
        .count();
    assert!(
        response_headers >= 1,
        "send pump should emit response HEADERS for stream 1 while Closing; got {frames:?}",
    );
    assert!(
        data_frames >= 1,
        "send pump should emit DATA for stream 1 while Closing; got {frames:?}",
    );
    let end_stream_data = frames.iter().any(|f| {
        matches!(
            f,
            Frame::Data {
                stream_id: 1,
                end_stream: true,
                ..
            }
        )
    });
    assert!(
        end_stream_data,
        "send pump should terminate stream 1 with END_STREAM; got {frames:?}",
    );
}

/// The recv pump runs in `Closing` (not just `Running`): trailing HEADERS the peer sends
/// after the driver has begun closing must still be decoded and stashed on the in-flight
/// stream's `recv.trailers` slot — otherwise gRPC trailers can vanish under shutdown
/// pressure. The wip-commit changed the read-side pump's run condition from
/// `Running` to `Running | Closing` for precisely this reason; this test pins the
/// behavior so the lifecycle refactor preserves it.
#[test]
fn recv_pump_decodes_trailing_headers_in_closing() {
    let mut fx = DriverFixture::new_server();
    fx.complete_handshake();

    // POST with end_stream=false leaves the request body open — we'll send trailing
    // HEADERS as the terminator instead of DATA(END_STREAM).
    fx.peer_open_stream(1, Method::Post, "/", false);
    let _conn = match fx.tick() {
        Poll::Ready(Some(Ok(conn))) => conn,
        other => panic!("expected Conn yielded for stream 1, got {other:?}"),
    };
    let state = fx
        .connection
        .streams_lock()
        .get(&1)
        .cloned()
        .expect("stream 1 registered");

    fx.driver.begin_close(CloseOutcome::Graceful);
    let _ = fx.tick();
    assert_eq!(fx.driver.state, DriverState::Closing);

    // Trailing HEADERS arrive *after* our GOAWAY went out. The recv-pump-in-Closing rule
    // says we keep decoding for streams already in flight.
    let mut trailers_in = Headers::new();
    trailers_in.insert("grpc-status", "0");
    trailers_in.insert("grpc-message", "ok");
    fx.peer_trailers(1, &trailers_in);
    let _ = fx.tick();

    let stashed = state
        .recv
        .trailers
        .lock()
        .expect("recv.trailers mutex poisoned")
        .clone()
        .expect("driver should have stashed trailers from the post-GOAWAY frame");
    assert_eq!(stashed.get_str("grpc-status"), Some("0"));
    assert_eq!(stashed.get_str("grpc-message"), Some("ok"));
}

/// A peer HEADERS opening a *new* stream while the driver is in `Closing` must not be
/// yielded as a `Conn` — once we've sent GOAWAY, the peer shouldn't be opening new
/// streams, and even if it does we mustn't dispatch a handler for one we're about to tear
/// down. Pairs with [`closing_to_drained_waits_for_in_flight_stream`] above (which keeps
/// the driver in Closing long enough to observe this branch).
#[test]
fn closing_discards_new_stream_headers() {
    let mut fx = DriverFixture::new_server();
    fx.complete_handshake();

    // Keep the driver in Closing by holding an in-flight stream with an open recv side.
    fx.peer_open_stream(1, Method::Post, "/", false);
    let stream_one = match fx.tick() {
        Poll::Ready(Some(Ok(conn))) => conn,
        other => panic!("expected Conn yielded for stream 1, got {other:?}"),
    };

    fx.driver.begin_close(CloseOutcome::Graceful);
    let _ = fx.tick();
    assert_eq!(fx.driver.state, DriverState::Closing);
    let _ = fx.next_outbound_bytes();

    // Peer (misbehaving) opens a new stream past the GOAWAY.
    fx.peer_open_stream(3, Method::Get, "/late", true);
    let polled = fx.tick();
    assert!(
        !matches!(polled, Poll::Ready(Some(Ok(_)))),
        "post-GOAWAY HEADERS opening a new stream must not yield a Conn; got {polled:?}",
    );

    // Cleanup: drop the held stream-1 conn so its Drop doesn't outlive the fixture and
    // accidentally interleave assertions in a later test (unimportant for correctness;
    // makes the test scope explicit).
    drop(stream_one);
}

/// `begin_close` is idempotent: a second call once the driver is already `Closing` (or
/// `Drained`) does not queue another GOAWAY and does not overwrite the prior close
/// outcome. The peer-mirror case in the wild — peer GOAWAY arrives after we've already
/// begun closing — would otherwise ping-pong, each side re-arming on the other's frame.
///
/// Asserts at the wire level (count of GOAWAY frames in outbound bytes) so the
/// future lifecycle-enum refactor doesn't change what this test exercises.
#[test]
fn begin_close_is_idempotent() {
    let mut fx = DriverFixture::new_server();
    fx.complete_handshake();
    assert_eq!(fx.driver.state, DriverState::Running);

    // First close: graceful. Drains outbound to put the GOAWAY on the wire.
    fx.driver.begin_close(CloseOutcome::Graceful);
    let _ = fx.tick();
    assert_eq!(fx.driver.state, DriverState::Drained);
    let first_round = fx.next_outbound_frames();
    assert_eq!(
        count_goaways(&first_round),
        1,
        "graceful begin_close should emit exactly one GOAWAY; got {first_round:?}",
    );
    let first_goaway_code = first_round.iter().find_map(|f| match f {
        Frame::Goaway { error_code, .. } => Some(*error_code),
        _ => None,
    });
    assert_eq!(
        first_goaway_code,
        Some(H2ErrorCode::NoError),
        "graceful close should queue NoError, got {first_goaway_code:?}",
    );

    // Second close: protocol error. Must be a no-op — no fresh GOAWAY, state unchanged.
    fx.driver
        .begin_close(CloseOutcome::Protocol(H2ErrorCode::InternalError));
    let _ = fx.tick();
    let second_round = fx.next_outbound_frames();
    assert_eq!(
        count_goaways(&second_round),
        0,
        "second begin_close after Closing/Drained must not re-queue GOAWAY; got {second_round:?}",
    );
}