use super::fixture::*;
use crate::{
Headers, Method, Status,
h2::{H2ErrorCode, frame::Frame},
};
use futures_lite::AsyncWrite;
use std::{
pin::Pin,
sync::Arc,
task::{Context, Poll, Wake, Waker},
};
/// Verifies that trailers submitted on an open upgrade stream reach the wire.
///
/// The peer opens stream 1 and the server answers through `submit_upgrade` with no body. The
/// first tick must emit response HEADERS without END_STREAM. After `submit_trailers`, the next
/// tick must emit exactly one HEADERS frame on stream 1 carrying END_STREAM.
#[test]
fn submit_trailers_lands_on_wire_after_body_parked() {
    // `Frame` is already in scope from the file-level imports, so only `PseudoHeaders` needs
    // to be brought in here.
    use crate::headers::hpack::PseudoHeaders;

    let mut fx = DriverFixture::new_server();
    fx.complete_handshake();
    fx.peer_open_stream(1, Method::Get, "/", true);
    // Keep the yielded conn bound until the end of the test.
    let _conn = match fx.tick() {
        Poll::Ready(Some(Ok(conn))) => conn,
        other => panic!("expected Conn yielded for stream 1, got {other:?}"),
    };

    // Submit the response head with no body and let one tick frame it.
    let pseudos = PseudoHeaders::default().with_status(Status::Ok);
    let _submit = fx
        .connection
        .submit_upgrade(1, pseudos, Headers::new(), None);
    let _ = fx.tick();
    let headers_round = fx.next_outbound_frames();
    assert!(
        headers_round.iter().any(|f| matches!(
            f,
            Frame::Headers {
                stream_id: 1,
                end_stream: false,
                ..
            }
        )),
        "response HEADERS (without END_STREAM) should be on the wire after first tick; got \
         {headers_round:?}",
    );

    // Now hand the driver trailers and expect a single terminating HEADERS frame.
    let mut trailers = Headers::new();
    trailers.insert("grpc-status", "0");
    fx.connection
        .submit_trailers(1, trailers)
        .expect("submit_trailers on a live stream");
    let _ = fx.tick();
    let trailing = fx.next_outbound_frames();
    let trailing_headers = trailing
        .iter()
        .filter(|f| {
            matches!(
                f,
                Frame::Headers {
                    stream_id: 1,
                    end_stream: true,
                    ..
                }
            )
        })
        .count();
    assert_eq!(
        trailing_headers, 1,
        "exactly one trailing HEADERS with END_STREAM should land on the wire after \
         submit_trailers; got {trailing:?}",
    );
}
/// Checks that an idle upgrade stream leaves the driver parked instead of waking itself.
///
/// Once the upgrade response has been ticked out and the outbound bytes drained, the driver is
/// polled a single time with a waker that counts every wake. The poll must come back
/// `Poll::Pending` and the counter must still read zero.
#[test]
fn idle_upgrade_open_stream_parks_without_self_waking() {
    use crate::headers::hpack::PseudoHeaders;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Waker whose only job is to tally how many times it was woken.
    struct CountingWaker(AtomicUsize);

    impl Wake for CountingWaker {
        fn wake(self: Arc<Self>) {
            // Both entry points bump the same counter by one.
            self.wake_by_ref();
        }

        fn wake_by_ref(self: &Arc<Self>) {
            self.0.fetch_add(1, Ordering::SeqCst);
        }
    }

    let mut fx = DriverFixture::new_server();
    fx.complete_handshake();
    fx.peer_open_stream(1, Method::Get, "/", true);
    let first_tick = fx.tick();
    let _conn = match first_tick {
        Poll::Ready(Some(Ok(conn))) => conn,
        other => panic!("expected Conn yielded for stream 1, got {other:?}"),
    };

    // Flush the upgrade response so nothing is left pending on the write side.
    let _submit = fx.connection.submit_upgrade(
        1,
        PseudoHeaders::default().with_status(Status::Ok),
        Headers::new(),
        None,
    );
    let _ = fx.tick();
    let _ = fx.next_outbound_bytes();

    // Poll the driver directly with the counting waker.
    let wake_counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let counting_waker = Waker::from(Arc::clone(&wake_counter));
    let mut cx = Context::from_waker(&counting_waker);
    let polled = fx.driver.drive(&mut cx);
    assert!(
        matches!(polled, Poll::Pending),
        "idle upgrade-open driver should park, got {polled:?}",
    );

    let wakes_seen = wake_counter.0.load(Ordering::SeqCst);
    assert_eq!(
        wakes_seen,
        0,
        "driver self-woke instead of parking — busy-spin on an idle bidi/upgrade tunnel",
    );
}
/// Ensures a peer END_STREAM that arrives after the server's trailers is not answered with
/// RST_STREAM.
///
/// Sequence: upgrade response on stream 1, `submit_trailers`, confirm a HEADERS frame with
/// END_STREAM went out, then feed an empty peer DATA frame with the end flag set and check that
/// no `RstStream` for stream 1 shows up.
#[test]
fn peer_end_stream_after_server_trailers_is_not_reset() {
    use crate::headers::hpack::PseudoHeaders;

    let mut fx = DriverFixture::new_server();
    fx.complete_handshake();
    fx.peer_open_stream(1, Method::Post, "/", false);
    let first_tick = fx.tick();
    let _conn = match first_tick {
        Poll::Ready(Some(Ok(conn))) => conn,
        other => panic!("expected Conn yielded for stream 1, got {other:?}"),
    };

    // Send the upgrade response and discard whatever that tick framed.
    let _submit = fx.connection.submit_upgrade(
        1,
        PseudoHeaders::default().with_status(Status::Ok),
        Headers::new(),
        None,
    );
    let _ = fx.tick();
    let _ = fx.next_outbound_frames();

    // Close the server's send half with trailers.
    let trailers = {
        let mut headers = Headers::new();
        headers.insert("grpc-status", "0");
        headers
    };
    fx.connection
        .submit_trailers(1, trailers)
        .expect("submit_trailers on a live stream");
    let _ = fx.tick();
    let trailing = fx.next_outbound_frames();
    let saw_trailing_headers = trailing.iter().any(|f| {
        matches!(
            f,
            Frame::Headers {
                stream_id: 1,
                end_stream: true,
                ..
            }
        )
    });
    assert!(
        saw_trailing_headers,
        "server's trailing HEADERS with END_STREAM should be on the wire; got {trailing:?}",
    );

    // The peer now finishes its own half; that must not provoke a reset.
    fx.peer_data(1, &[], true);
    let _ = fx.tick();
    let after = fx.next_outbound_frames();
    let no_reset = after
        .iter()
        .all(|f| !matches!(f, Frame::RstStream { stream_id: 1, .. }));
    assert!(
        no_reset,
        "peer's END_STREAM on a half-closed-local stream must close cleanly, not earn a \
         RST_STREAM; got {after:?}",
    );
}
/// Confirms that the peer half-closing an open upgrade stream leaves the stream usable.
///
/// After the upgrade response is flushed, the peer sends an empty DATA frame with the end flag
/// set. Stream 1 must still be present in the connection's stream map, and a subsequent write
/// through the conn's transport must be framed as DATA on stream 1.
#[test]
fn peer_half_close_does_not_tear_down_open_upgrade() {
    use crate::headers::hpack::PseudoHeaders;

    let mut fx = DriverFixture::new_server();
    fx.complete_handshake();
    fx.peer_open_stream(1, Method::Post, "/", false);
    let first_tick = fx.tick();
    let mut conn = match first_tick {
        Poll::Ready(Some(Ok(conn))) => conn,
        other => panic!("expected Conn yielded for stream 1, got {other:?}"),
    };

    // Open the upgrade and drain the frames produced by the response.
    let _submit = fx.connection.submit_upgrade(
        1,
        PseudoHeaders::default().with_status(Status::Ok),
        Headers::new(),
        None,
    );
    let _ = fx.tick();
    let _ = fx.next_outbound_frames();

    // Peer half-closes.
    fx.peer_data(1, &[], true);
    let _ = fx.tick();
    let still_tracked = fx.connection.streams_lock().contains_key(&1);
    assert!(
        still_tracked,
        "peer half-close on an open upgrade stream tore the whole stream down; the server's send \
         half is still live and the handler is still writing",
    );

    // The handler keeps writing; the write result itself is deliberately ignored.
    let noop = noop_waker();
    let mut cx = Context::from_waker(&noop);
    let _ = Pin::new(&mut conn.transport).poll_write(&mut cx, b"hello bidi");
    let _ = fx.tick();
    let frames = fx.next_outbound_frames();
    let saw_data = frames
        .iter()
        .any(|f| matches!(f, Frame::Data { stream_id: 1, .. }));
    assert!(
        saw_data,
        "handler's post-half-close write should be framed as DATA on stream 1; got {frames:?}",
    );
}
/// Walks an upgrade stream to completion when the peer closes its half before the server does.
///
/// The peer half-closes first, and the stream must survive that. The handler then writes a
/// final chunk and submits trailers. Across up to four ticks a HEADERS frame with END_STREAM
/// must appear, and afterwards stream 1 must be gone from the stream map.
#[test]
fn upgrade_completes_after_peer_half_closes_first() {
    use crate::headers::hpack::PseudoHeaders;

    let mut fx = DriverFixture::new_server();
    fx.complete_handshake();
    fx.peer_open_stream(1, Method::Post, "/", false);
    let first_tick = fx.tick();
    let mut conn = match first_tick {
        Poll::Ready(Some(Ok(conn))) => conn,
        other => panic!("expected Conn yielded for stream 1, got {other:?}"),
    };

    // Open the upgrade and drain the response frames.
    let _submit = fx.connection.submit_upgrade(
        1,
        PseudoHeaders::default().with_status(Status::Ok),
        Headers::new(),
        None,
    );
    let _ = fx.tick();
    let _ = fx.next_outbound_frames();

    // Peer half-closes before the server has finished.
    fx.peer_data(1, &[], true);
    let _ = fx.tick();
    let survived_half_close = fx.connection.streams_lock().contains_key(&1);
    assert!(
        survived_half_close,
        "stream must survive the peer's half-close while the handler is still open",
    );

    // Handler writes once more (result ignored), then terminates with trailers.
    let noop = noop_waker();
    let mut cx = Context::from_waker(&noop);
    let _ = Pin::new(&mut conn.transport).poll_write(&mut cx, b"final");
    let _ = fx.tick();
    let trailers = {
        let mut headers = Headers::new();
        headers.insert("grpc-status", "0");
        headers
    };
    fx.connection
        .submit_trailers(1, trailers)
        .expect("submit_trailers on a live stream");

    // Collect everything emitted over four ticks.
    let mut frames = Vec::new();
    for _round in 0..4 {
        let _ = fx.tick();
        frames.extend(fx.next_outbound_frames());
    }
    let saw_terminator = frames.iter().any(|f| {
        matches!(
            f,
            Frame::Headers {
                stream_id: 1,
                end_stream: true,
                ..
            }
        )
    });
    assert!(
        saw_terminator,
        "handler completion should emit trailing HEADERS(END_STREAM); got {frames:?}",
    );

    let still_tracked = fx.connection.streams_lock().contains_key(&1);
    assert!(
        !still_tracked,
        "with the send terminator framed and the peer already half-closed, the server should \
         remove the stream",
    );
}
/// Checks that a peer RST_STREAM on an open upgrade removes the stream and fails later writes.
///
/// After the reset is processed, stream 1 must be absent from the stream map and a write
/// through the conn's transport must resolve immediately with `ErrorKind::BrokenPipe`.
#[test]
fn peer_rst_during_open_upgrade_rejects_further_writes() {
    use crate::headers::hpack::PseudoHeaders;
    use std::io::ErrorKind;

    let mut fx = DriverFixture::new_server();
    fx.complete_handshake();
    fx.peer_open_stream(1, Method::Post, "/", false);
    let first_tick = fx.tick();
    let mut conn = match first_tick {
        Poll::Ready(Some(Ok(conn))) => conn,
        other => panic!("expected Conn yielded for stream 1, got {other:?}"),
    };

    // Open the upgrade and drain the response frames.
    let _submit = fx.connection.submit_upgrade(
        1,
        PseudoHeaders::default().with_status(Status::Ok),
        Headers::new(),
        None,
    );
    let _ = fx.tick();
    let _ = fx.next_outbound_frames();

    // Peer cancels the stream.
    fx.peer_rst_stream(1, H2ErrorCode::Cancel);
    let _ = fx.tick();
    let still_tracked = fx.connection.streams_lock().contains_key(&1);
    assert!(
        !still_tracked,
        "peer RST_STREAM should remove the stream from the map",
    );

    // Any write after the reset has to fail with BrokenPipe.
    let noop = noop_waker();
    let mut cx = Context::from_waker(&noop);
    let write_outcome = Pin::new(&mut conn.transport).poll_write(&mut cx, b"after reset");
    match write_outcome {
        Poll::Ready(Err(e)) => assert_eq!(
            e.kind(),
            ErrorKind::BrokenPipe,
            "post-RST write should report BrokenPipe",
        ),
        other => panic!("post-RST write should fail with BrokenPipe, got {other:?}"),
    }
}
/// Verifies that DATA sent by the peer after its own END_STREAM is answered with
/// RST_STREAM(STREAM_CLOSED).
///
/// The peer sends `hello`, then an empty DATA frame with the end flag set, then `extra`. The
/// frames emitted after that last DATA must include a `RstStream` on stream 1 with
/// `H2ErrorCode::StreamClosed`.
#[test]
fn peer_data_after_its_own_end_stream_is_reset() {
    let mut fx = DriverFixture::new_server();
    fx.complete_handshake();
    fx.peer_open_stream(1, Method::Post, "/", false);
    let first_tick = fx.tick();
    let _conn = match first_tick {
        Poll::Ready(Some(Ok(conn))) => conn,
        other => panic!("expected Conn yielded for stream 1, got {other:?}"),
    };

    // Regular body chunk.
    fx.peer_data(1, b"hello", false);
    let _ = fx.tick();

    // Peer ends its half; discard anything emitted up to this point.
    fx.peer_data(1, &[], true);
    let _ = fx.tick();
    let _ = fx.next_outbound_frames();

    // Illegal extra DATA after the peer's END_STREAM.
    fx.peer_data(1, b"extra", false);
    let _ = fx.tick();
    let frames = fx.next_outbound_frames();
    let saw_stream_closed_reset = frames.iter().any(|f| {
        matches!(
            f,
            Frame::RstStream {
                stream_id: 1,
                error_code: H2ErrorCode::StreamClosed,
            }
        )
    });
    assert!(
        saw_stream_closed_reset,
        "DATA after the peer's own END_STREAM must earn RST_STREAM(STREAM_CLOSED); got {frames:?}",
    );
}
/// Confirms that an inbound GOAWAY leaves an in-flight upgrade stream intact.
///
/// After the peer sends `GOAWAY(0, NoError)`, stream 1 must still be in the stream map and a
/// subsequent write through the conn's transport must be framed as DATA on stream 1.
#[test]
fn inbound_goaway_does_not_tear_down_open_upgrade() {
    use crate::headers::hpack::PseudoHeaders;

    let mut fx = DriverFixture::new_server();
    fx.complete_handshake();
    fx.peer_open_stream(1, Method::Post, "/", false);
    let first_tick = fx.tick();
    let mut conn = match first_tick {
        Poll::Ready(Some(Ok(conn))) => conn,
        other => panic!("expected Conn yielded for stream 1, got {other:?}"),
    };

    // Open the upgrade and drain the response frames.
    let _submit = fx.connection.submit_upgrade(
        1,
        PseudoHeaders::default().with_status(Status::Ok),
        Headers::new(),
        None,
    );
    let _ = fx.tick();
    let _ = fx.next_outbound_frames();

    // Peer announces GOAWAY.
    fx.peer_goaway(0, H2ErrorCode::NoError);
    let _ = fx.tick();
    let still_tracked = fx.connection.streams_lock().contains_key(&1);
    assert!(
        still_tracked,
        "inbound GOAWAY tore down an in-flight upgrade stream mid-bidi",
    );

    // The handler writes afterwards; the write result itself is deliberately ignored.
    let noop = noop_waker();
    let mut cx = Context::from_waker(&noop);
    let _ = Pin::new(&mut conn.transport).poll_write(&mut cx, b"post-goaway");
    let _ = fx.tick();
    let frames = fx.next_outbound_frames();
    let saw_data = frames
        .iter()
        .any(|f| matches!(f, Frame::Data { stream_id: 1, .. }));
    assert!(
        saw_data,
        "handler's post-GOAWAY write should be framed as DATA on stream 1; got {frames:?}",
    );
}
/// Checks that `submit_send` on a stream the peer already reset resolves with an error.
///
/// The peer resets stream 1 before any response is submitted. The stream must be removed from
/// the map, and polling a later `submit_send` for it once must yield
/// `ErrorKind::NotConnected`.
#[test]
fn peer_rst_on_in_flight_request_errors_later_submit() {
    use crate::{Body, headers::hpack::PseudoHeaders};
    use std::{future::Future, io::ErrorKind};

    let mut fx = DriverFixture::new_server();
    fx.complete_handshake();
    fx.peer_open_stream(1, Method::Post, "/", false);
    let first_tick = fx.tick();
    let _conn = match first_tick {
        Poll::Ready(Some(Ok(conn))) => conn,
        other => panic!("expected Conn yielded for stream 1, got {other:?}"),
    };

    // Peer cancels while the request is still in flight.
    fx.peer_rst_stream(1, H2ErrorCode::Cancel);
    let _ = fx.tick();
    let still_tracked = fx.connection.streams_lock().contains_key(&1);
    assert!(
        !still_tracked,
        "peer RST_STREAM should remove the in-flight stream from the map",
    );

    // A response submitted afterwards has nowhere to go.
    let response_pseudos = PseudoHeaders::default().with_status(Status::Ok);
    let response_body = Body::new_static(b"hi" as &[u8]);
    let mut submit =
        fx.connection
            .submit_send(1, response_pseudos, Headers::new(), Some(response_body));

    let noop = noop_waker();
    let mut cx = Context::from_waker(&noop);
    let submit_outcome = Pin::new(&mut submit).poll(&mut cx);
    match submit_outcome {
        Poll::Ready(Err(e)) => assert_eq!(
            e.kind(),
            ErrorKind::NotConnected,
            "submit_send on a reset/removed stream should resolve NotConnected",
        ),
        other => panic!("submit_send after RST should resolve NotConnected, got {other:?}"),
    }
}
/// Covers the case where the peer's END_STREAM and the server's response terminator are
/// handled within the same tick.
///
/// The peer's empty end-flagged DATA is queued and a response with a static body is submitted
/// before a single tick runs. That tick must emit DATA with END_STREAM, must not emit any
/// `RstStream` for stream 1, and must leave stream 1 removed from the stream map.
#[test]
fn same_tick_peer_end_stream_and_response_terminator_closes_cleanly() {
    use crate::{Body, headers::hpack::PseudoHeaders};

    let mut fx = DriverFixture::new_server();
    fx.complete_handshake();
    fx.peer_open_stream(1, Method::Post, "/", false);
    let first_tick = fx.tick();
    let _conn = match first_tick {
        Poll::Ready(Some(Ok(conn))) => conn,
        other => panic!("expected Conn yielded for stream 1, got {other:?}"),
    };

    // Queue both the peer's half-close and the full response, then tick once.
    fx.peer_data(1, &[], true);
    let response_pseudos = PseudoHeaders::default().with_status(Status::Ok);
    let response_body = Body::new_static(b"hi" as &[u8]);
    let _submit =
        fx.connection
            .submit_send(1, response_pseudos, Headers::new(), Some(response_body));
    let _ = fx.tick();
    let frames = fx.next_outbound_frames();

    let saw_final_data = frames.iter().any(|f| {
        matches!(
            f,
            Frame::Data {
                stream_id: 1,
                end_stream: true,
                ..
            }
        )
    });
    assert!(
        saw_final_data,
        "response should terminate with DATA(END_STREAM); got {frames:?}",
    );

    let no_reset = frames
        .iter()
        .all(|f| !matches!(f, Frame::RstStream { stream_id: 1, .. }));
    assert!(
        no_reset,
        "coalesced both-halves close must not emit a spurious RST_STREAM; got {frames:?}",
    );

    let still_tracked = fx.connection.streams_lock().contains_key(&1);
    assert!(
        !still_tracked,
        "with both halves closed in one tick, the server should remove the stream",
    );
}
/// Verifies that a second HEADERS frame from the peer lacking END_STREAM is rejected.
///
/// After the request is open, the peer sends HEADERS with default pseudo-headers, a
/// `grpc-status` field, and the end flag cleared. The driver must respond with
/// `RstStream(ProtocolError)` on stream 1 and drop the stream from the map.
#[test]
fn trailing_headers_without_end_stream_is_reset() {
    use crate::headers::hpack::PseudoHeaders;

    let mut fx = DriverFixture::new_server();
    fx.complete_handshake();
    fx.peer_open_stream(1, Method::Post, "/", false);
    let first_tick = fx.tick();
    let _conn = match first_tick {
        Poll::Ready(Some(Ok(conn))) => conn,
        other => panic!("expected Conn yielded for stream 1, got {other:?}"),
    };
    let _ = fx.next_outbound_frames();

    // Malformed trailers: end flag is false.
    let trailers = {
        let mut headers = Headers::new();
        headers.insert("grpc-status", "0");
        headers
    };
    fx.peer_headers(1, PseudoHeaders::default(), &trailers, false);
    let _ = fx.tick();
    let frames = fx.next_outbound_frames();

    let saw_protocol_reset = frames.iter().any(|f| {
        matches!(
            f,
            Frame::RstStream {
                stream_id: 1,
                error_code: H2ErrorCode::ProtocolError,
            }
        )
    });
    assert!(
        saw_protocol_reset,
        "trailing HEADERS without END_STREAM must earn RST_STREAM(PROTOCOL_ERROR); got {frames:?}",
    );

    let still_tracked = fx.connection.streams_lock().contains_key(&1);
    assert!(
        !still_tracked,
        "the malformed-trailer stream should be removed",
    );
}
/// Verifies that peer trailers carrying a pseudo-header are rejected.
///
/// After the request is open, the peer sends an end-flagged HEADERS frame whose pseudo-headers
/// include a status and whose regular header set is empty. The driver must respond with
/// `RstStream(ProtocolError)` on stream 1 and drop the stream from the map.
#[test]
fn trailing_headers_with_pseudo_header_is_reset() {
    use crate::headers::hpack::PseudoHeaders;

    let mut fx = DriverFixture::new_server();
    fx.complete_handshake();
    fx.peer_open_stream(1, Method::Post, "/", false);
    let first_tick = fx.tick();
    let _conn = match first_tick {
        Poll::Ready(Some(Ok(conn))) => conn,
        other => panic!("expected Conn yielded for stream 1, got {other:?}"),
    };
    let _ = fx.next_outbound_frames();

    // Malformed trailers: a pseudo-header is present.
    let bad_pseudos = PseudoHeaders::default().with_status(Status::Ok);
    fx.peer_headers(1, bad_pseudos, &Headers::new(), true);
    let _ = fx.tick();
    let frames = fx.next_outbound_frames();

    let saw_protocol_reset = frames.iter().any(|f| {
        matches!(
            f,
            Frame::RstStream {
                stream_id: 1,
                error_code: H2ErrorCode::ProtocolError,
            }
        )
    });
    assert!(
        saw_protocol_reset,
        "trailing HEADERS with a pseudo-header must earn RST_STREAM(PROTOCOL_ERROR); got \
         {frames:?}",
    );

    let still_tracked = fx.connection.streams_lock().contains_key(&1);
    assert!(
        !still_tracked,
        "the malformed-trailer stream should be removed",
    );
}
/// Checks that a peer RST_STREAM arriving while an upgrade's prelude body is pending removes
/// the stream and fails later writes.
///
/// The handshake uses peer settings with an initial window size of 2 and the upgrade carries
/// the static body `prelude-bytes`. NOTE(review): presumably the small window keeps the prelude
/// from being fully sent before the reset arrives; confirm against the driver. After the reset,
/// stream 1 must be gone and a transport write must resolve with `ErrorKind::BrokenPipe`.
#[test]
fn peer_rst_during_prelude_body_phase_rejects_writes() {
    use crate::{Body, h2::settings::H2Settings, headers::hpack::PseudoHeaders};
    use std::io::ErrorKind;

    let mut fx = DriverFixture::new_server();
    let peer_settings = H2Settings::default().with_initial_window_size(2);
    fx.complete_handshake_with_peer_settings(peer_settings);
    fx.peer_open_stream(1, Method::Post, "/", false);
    let first_tick = fx.tick();
    let mut conn = match first_tick {
        Poll::Ready(Some(Ok(conn))) => conn,
        other => panic!("expected Conn yielded for stream 1, got {other:?}"),
    };

    // Upgrade with a prelude body, then drain whatever the tick managed to frame.
    let response_pseudos = PseudoHeaders::default().with_status(Status::Ok);
    let prelude = Body::new_static(b"prelude-bytes" as &[u8]);
    let _submit =
        fx.connection
            .submit_upgrade(1, response_pseudos, Headers::new(), Some(prelude));
    let _ = fx.tick();
    let _ = fx.next_outbound_frames();

    // Peer cancels the stream.
    fx.peer_rst_stream(1, H2ErrorCode::Cancel);
    let _ = fx.tick();
    let still_tracked = fx.connection.streams_lock().contains_key(&1);
    assert!(
        !still_tracked,
        "peer RST mid-prelude should remove the stream",
    );

    // Any write after the reset has to fail with BrokenPipe.
    let noop = noop_waker();
    let mut cx = Context::from_waker(&noop);
    let write_outcome = Pin::new(&mut conn.transport).poll_write(&mut cx, b"after reset");
    match write_outcome {
        Poll::Ready(Err(e)) => assert_eq!(
            e.kind(),
            ErrorKind::BrokenPipe,
            "post-RST write should report BrokenPipe",
        ),
        other => panic!("post-RST write should fail with BrokenPipe, got {other:?}"),
    }
}