use super::fixture::*;
use crate::{
Body, Conn, Headers, Method, Priority, Status,
h2::{H2Error, H2Transport, frame::Frame},
headers::hpack::PseudoHeaders,
};
use std::task::Poll;
fn data_bytes(frames: &[Frame], stream_id: u32) -> u32 {
frames
.iter()
.filter_map(|f| match f {
Frame::Data {
stream_id: id,
data_length,
..
} if *id == stream_id => Some(*data_length),
_ => None,
})
.sum()
}
fn data_stream_order(frames: &[Frame]) -> Vec<u32> {
frames
.iter()
.filter_map(|f| match f {
Frame::Data { stream_id, .. } => Some(*stream_id),
_ => None,
})
.collect()
}
fn has_headers(frames: &[Frame], stream_id: u32) -> bool {
frames
.iter()
.any(|f| matches!(f, Frame::Headers { stream_id: id, .. } if *id == stream_id))
}
fn expect_conn(polled: Poll<Option<Result<Conn<H2Transport>, H2Error>>>) -> Conn<H2Transport> {
match polled {
Poll::Ready(Some(Ok(conn))) => conn,
other => panic!("expected a yielded Conn, got {other:?}"),
}
}
fn ok() -> PseudoHeaders<'static> {
PseudoHeaders::default().with_status(Status::Ok)
}
#[test]
fn higher_urgency_drains_before_lower() {
let mut fx = DriverFixture::new_server();
fx.complete_handshake();
fx.peer_open_stream_with_priority(1, "/low", Priority::new(5), true);
let _c1 = expect_conn(fx.tick());
fx.peer_open_stream_with_priority(3, "/high", Priority::new(1), true);
let _c3 = expect_conn(fx.tick());
let _ = fx.connection.submit_send(
1,
ok(),
Headers::new(),
Some(Body::new_static(vec![b'a'; 70_000])),
);
let _ = fx.connection.submit_send(
3,
ok(),
Headers::new(),
Some(Body::new_static(vec![b'b'; 70_000])),
);
let _ = fx.tick();
let frames = fx.next_outbound_frames();
assert_eq!(
data_bytes(&frames, 3),
65_535,
"the more-urgent stream 3 should consume the entire connection window; got {frames:?}",
);
assert_eq!(
data_bytes(&frames, 1),
0,
"the less-urgent stream 1 should get no bandwidth until the urgent stream stalls; got \
{frames:?}",
);
assert_eq!(
data_stream_order(&frames).first(),
Some(&3),
"the first DATA frame must belong to the more-urgent stream; got {frames:?}",
);
}
#[test]
fn same_urgency_non_incremental_is_sequential() {
let mut fx = DriverFixture::new_server();
fx.complete_handshake();
fx.peer_open_stream_with_priority(1, "/a", Priority::new(3), true);
let _c1 = expect_conn(fx.tick());
fx.peer_open_stream_with_priority(3, "/b", Priority::new(3), true);
let _c3 = expect_conn(fx.tick());
let _ = fx.connection.submit_send(
1,
ok(),
Headers::new(),
Some(Body::new_static(vec![b'a'; 20_000])),
);
let _ = fx.connection.submit_send(
3,
ok(),
Headers::new(),
Some(Body::new_static(vec![b'b'; 70_000])),
);
let _ = fx.tick();
let frames = fx.next_outbound_frames();
assert_eq!(
data_bytes(&frames, 1),
20_000,
"stream 1 should drain its whole body before stream 3 starts; got {frames:?}",
);
assert!(
frames.iter().any(|f| matches!(
f,
Frame::Data {
stream_id: 1,
end_stream: true,
..
}
)),
"stream 1 should finish (END_STREAM) before stream 3 sends; got {frames:?}",
);
assert_eq!(
data_bytes(&frames, 3),
45_535,
"stream 3 should get the remainder of the window after stream 1 completes; got {frames:?}",
);
let order = data_stream_order(&frames);
let first_three = order
.iter()
.position(|&id| id == 3)
.expect("stream 3 sent DATA");
assert!(
order[..first_three].iter().all(|&id| id == 1),
"all of stream 1's DATA must precede any of stream 3's (sequential, not interleaved); got \
order {order:?}",
);
}
#[test]
fn same_urgency_incremental_round_robins() {
let mut fx = DriverFixture::new_server();
fx.complete_handshake();
let incremental = Priority::new(3).with_incremental(true);
fx.peer_open_stream_with_priority(1, "/a", incremental, true);
let _c1 = expect_conn(fx.tick());
fx.peer_open_stream_with_priority(3, "/b", incremental, true);
let _c3 = expect_conn(fx.tick());
let _ = fx.connection.submit_send(
1,
ok(),
Headers::new(),
Some(Body::new_static(vec![b'a'; 40_000])),
);
let _ = fx.connection.submit_send(
3,
ok(),
Headers::new(),
Some(Body::new_static(vec![b'b'; 40_000])),
);
let _ = fx.tick();
let frames = fx.next_outbound_frames();
assert_eq!(
data_stream_order(&frames),
vec![1, 3, 1, 3],
"equal-urgency incremental streams should round-robin one frame each; got {frames:?}",
);
assert_eq!(
data_bytes(&frames, 1),
32_768,
"stream 1 share; got {frames:?}"
);
assert_eq!(
data_bytes(&frames, 3),
32_767,
"stream 3 share; got {frames:?}"
);
}
#[test]
fn per_stream_window_stall_falls_through_to_lower_priority() {
let mut fx = DriverFixture::new_server();
fx.complete_handshake_with_peer_settings(
crate::h2::settings::H2Settings::default().with_initial_window_size(16_384),
);
fx.peer_open_stream_with_priority(1, "/high", Priority::new(1), true);
let _c1 = expect_conn(fx.tick());
fx.peer_open_stream_with_priority(3, "/low", Priority::new(5), true);
let _c3 = expect_conn(fx.tick());
let _ = fx.connection.submit_send(
1,
ok(),
Headers::new(),
Some(Body::new_static(vec![b'a'; 40_000])),
);
let _ = fx.connection.submit_send(
3,
ok(),
Headers::new(),
Some(Body::new_static(vec![b'b'; 40_000])),
);
let _ = fx.tick();
let frames = fx.next_outbound_frames();
assert_eq!(
data_bytes(&frames, 1),
16_384,
"the urgent stream should send exactly its per-stream window, then stall; got {frames:?}",
);
assert_eq!(
data_bytes(&frames, 3),
16_384,
"the lower-priority stream should still send (fall-through past the stalled urgent \
stream) rather than the connection sitting idle; got {frames:?}",
);
}
#[test]
fn headers_emit_ahead_of_a_monopolizing_body() {
let mut fx = DriverFixture::new_server();
fx.complete_handshake();
fx.peer_open_stream_with_priority(1, "/low", Priority::new(7), true);
let _c1 = expect_conn(fx.tick());
fx.peer_open_stream_with_priority(3, "/high", Priority::new(0), true);
let _c3 = expect_conn(fx.tick());
let _ = fx.connection.submit_send(
1,
ok(),
Headers::new(),
Some(Body::new_static(vec![b'a'; 70_000])),
);
let _ = fx.connection.submit_send(
3,
ok(),
Headers::new(),
Some(Body::new_static(vec![b'b'; 70_000])),
);
let _ = fx.tick();
let frames = fx.next_outbound_frames();
assert!(
has_headers(&frames, 1) && has_headers(&frames, 3),
"both responses' HEADERS should be framed regardless of body priority; got {frames:?}",
);
assert_eq!(
data_bytes(&frames, 1),
0,
"stream 1's body is still starved behind the urgent stream — only its HEADERS went out; \
got {frames:?}",
);
assert_eq!(
data_bytes(&frames, 3),
65_535,
"stream 3 still consumes the whole window for its body; got {frames:?}",
);
}
#[test]
fn priority_update_reshuffles_scheduling() {
let mut fx = DriverFixture::new_server();
fx.complete_handshake_with_peer_settings(
crate::h2::settings::H2Settings::default().with_initial_window_size(200_000),
);
fx.peer_open_stream(1, Method::Get, "/a", true);
let _c1 = expect_conn(fx.tick());
fx.peer_open_stream(3, Method::Get, "/b", true);
let _c3 = expect_conn(fx.tick());
let _ = fx.connection.submit_send(
1,
ok(),
Headers::new(),
Some(Body::new_static(vec![b'a'; 70_000])),
);
let _ = fx.connection.submit_send(
3,
ok(),
Headers::new(),
Some(Body::new_static(vec![b'b'; 70_000])),
);
let _ = fx.tick();
let first = fx.next_outbound_frames();
assert_eq!(
data_bytes(&first, 1),
65_535,
"before reprioritization the lower-id stream drains first; got {first:?}",
);
assert_eq!(
data_bytes(&first, 3),
0,
"stream 3 waits its turn; got {first:?}"
);
fx.peer_priority_update(3, Priority::new(0));
let _ = fx.tick();
assert!(
fx.next_outbound_frames().is_empty(),
"no bandwidth to use yet — the connection window is still exhausted",
);
fx.peer_window_update(0, 200_000);
let _ = fx.tick();
let second = fx.next_outbound_frames();
let order = data_stream_order(&second);
let first_one = order.iter().position(|&id| id == 1).unwrap_or(order.len());
assert!(
!order.is_empty() && order[..first_one].iter().all(|&id| id == 3),
"after the PRIORITY_UPDATE, the now-urgent stream 3 should drain ahead of stream 1's \
remainder; got order {order:?}",
);
assert_eq!(
data_bytes(&second, 3),
70_000,
"stream 3 should finish first now that it is the most urgent; got {second:?}",
);
}