use std::collections::VecDeque;
use crate::bytes::Bytes;
use crate::util::det_hash::DetHashMap;
use super::error::{ErrorCode, H2Error};
use super::frame::PrioritySpec;
use super::settings::DEFAULT_INITIAL_WINDOW_SIZE;
const HEADER_FRAGMENT_MULTIPLIER: usize = 4;
const MAX_HEADER_FRAGMENT_SIZE: usize = 256 * 1024;
const MAX_STREAM_ID: u32 = 0x7FFF_FFFF;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamState {
Idle,
ReservedLocal,
ReservedRemote,
Open,
HalfClosedLocal,
HalfClosedRemote,
Closed,
}
impl StreamState {
#[must_use]
pub fn can_send(&self) -> bool {
matches!(
self,
Self::Open | Self::HalfClosedRemote | Self::ReservedLocal
)
}
#[must_use]
pub fn can_recv(&self) -> bool {
matches!(
self,
Self::Open | Self::HalfClosedLocal | Self::ReservedRemote
)
}
#[must_use]
pub fn is_closed(&self) -> bool {
matches!(self, Self::Closed)
}
#[must_use]
pub fn is_active(&self) -> bool {
matches!(
self,
Self::Open
| Self::HalfClosedLocal
| Self::HalfClosedRemote
| Self::ReservedLocal
| Self::ReservedRemote
)
}
#[must_use]
pub fn can_send_headers(&self) -> bool {
matches!(
self,
Self::Idle | Self::ReservedLocal | Self::Open | Self::HalfClosedRemote
)
}
#[must_use]
pub fn can_recv_headers(&self) -> bool {
matches!(
self,
Self::Idle | Self::ReservedRemote | Self::Open | Self::HalfClosedLocal
)
}
}
#[derive(Debug)]
pub struct Stream {
id: u32,
state: StreamState,
send_window: i32,
recv_window: i32,
initial_send_window: i32,
initial_recv_window: i32,
priority: PrioritySpec,
pending_data: VecDeque<PendingData>,
error_code: Option<ErrorCode>,
headers_complete: bool,
header_fragments: Vec<Bytes>,
max_header_list_size: u32,
}
#[derive(Debug)]
struct PendingData {
data: Bytes,
end_stream: bool,
}
impl Stream {
#[must_use]
pub fn new(id: u32, initial_window_size: u32, max_header_list_size: u32) -> Self {
let initial_send_window =
i32::try_from(initial_window_size).expect("initial window size exceeds i32");
let default_recv_window =
i32::try_from(DEFAULT_INITIAL_WINDOW_SIZE).expect("default window size exceeds i32");
Self {
id,
state: StreamState::Idle,
send_window: initial_send_window,
recv_window: default_recv_window,
initial_send_window,
initial_recv_window: default_recv_window,
priority: PrioritySpec {
exclusive: false,
dependency: 0,
weight: 16,
},
pending_data: VecDeque::new(),
error_code: None,
headers_complete: true,
header_fragments: Vec::new(),
max_header_list_size,
}
}
#[must_use]
pub fn new_reserved_remote(
id: u32,
initial_window_size: u32,
max_header_list_size: u32,
) -> Self {
let mut stream = Self::new(id, initial_window_size, max_header_list_size);
stream.state = StreamState::ReservedRemote;
stream
}
pub(crate) fn max_header_fragment_size_for(max_header_list_size: u32) -> usize {
let max_list_size = usize::try_from(max_header_list_size).unwrap_or(usize::MAX);
let calculated = max_list_size.saturating_mul(HEADER_FRAGMENT_MULTIPLIER);
calculated.min(MAX_HEADER_FRAGMENT_SIZE)
}
fn max_header_fragment_size(&self) -> usize {
Self::max_header_fragment_size_for(self.max_header_list_size)
}
#[must_use]
pub fn id(&self) -> u32 {
self.id
}
#[must_use]
pub fn state(&self) -> StreamState {
self.state
}
#[must_use]
pub fn send_window(&self) -> i32 {
self.send_window
}
#[must_use]
pub fn recv_window(&self) -> i32 {
self.recv_window
}
#[must_use]
pub fn priority(&self) -> &PrioritySpec {
&self.priority
}
#[must_use]
pub fn error_code(&self) -> Option<ErrorCode> {
self.error_code
}
#[must_use]
pub fn is_receiving_headers(&self) -> bool {
!self.headers_complete
}
#[must_use]
pub fn has_pending_data(&self) -> bool {
!self.pending_data.is_empty()
}
pub fn update_send_window(&mut self, delta: i32) -> Result<(), H2Error> {
let new_window = i64::from(self.send_window) + i64::from(delta);
let new_window = i32::try_from(new_window).map_err(|_| {
H2Error::stream(self.id, ErrorCode::FlowControlError, "window size overflow")
})?;
self.send_window = new_window;
Ok(())
}
pub fn update_recv_window(&mut self, delta: i32) -> Result<(), H2Error> {
let new_window = i64::from(self.recv_window) + i64::from(delta);
let new_window = i32::try_from(new_window).map_err(|_| {
H2Error::stream(self.id, ErrorCode::FlowControlError, "window size overflow")
})?;
self.recv_window = new_window;
Ok(())
}
pub fn consume_send_window(&mut self, amount: u32) {
let amount_i64 = i64::from(amount);
let new_window = i64::from(self.send_window) - amount_i64;
self.send_window =
i32::try_from(new_window.clamp(i64::from(i32::MIN), i64::from(i32::MAX)))
.unwrap_or(i32::MIN);
}
pub fn consume_recv_window(&mut self, amount: u32) {
let amount_i64 = i64::from(amount);
let new_window = i64::from(self.recv_window) - amount_i64;
self.recv_window =
i32::try_from(new_window.clamp(i64::from(i32::MIN), i64::from(i32::MAX)))
.unwrap_or(i32::MIN);
}
#[must_use]
pub fn auto_window_update_increment(&self) -> Option<u32> {
let low_watermark = self.initial_recv_window / 2;
if self.recv_window < low_watermark {
let increment = i64::from(self.initial_recv_window) - i64::from(self.recv_window);
u32::try_from(increment).ok().filter(|&inc| inc > 0)
} else {
None
}
}
pub fn set_priority(&mut self, priority: PrioritySpec) {
self.priority = priority;
}
pub fn update_initial_window_size(&mut self, new_size: u32) -> Result<(), H2Error> {
let new_size = i32::try_from(new_size)
.map_err(|_| H2Error::flow_control("initial window size too large"))?;
let delta = new_size - self.initial_send_window;
self.initial_send_window = new_size;
self.update_send_window(delta)
}
pub fn send_headers(&mut self, end_stream: bool) -> Result<(), H2Error> {
match self.state {
StreamState::Idle => {
self.state = if end_stream {
StreamState::HalfClosedLocal
} else {
StreamState::Open
};
Ok(())
}
StreamState::ReservedLocal => {
self.state = if end_stream {
StreamState::Closed
} else {
StreamState::HalfClosedRemote
};
Ok(())
}
StreamState::Open if end_stream => {
self.state = StreamState::HalfClosedLocal;
Ok(())
}
StreamState::HalfClosedRemote if end_stream => {
self.state = StreamState::Closed;
Ok(())
}
StreamState::Open | StreamState::HalfClosedRemote => Ok(()),
_ => Err(H2Error::stream(
self.id,
ErrorCode::StreamClosed,
"cannot send headers in current state",
)),
}
}
pub fn recv_headers(&mut self, end_stream: bool, end_headers: bool) -> Result<(), H2Error> {
match self.state {
StreamState::Idle => {
self.state = if end_stream {
StreamState::HalfClosedRemote
} else {
StreamState::Open
};
}
StreamState::ReservedRemote => {
self.state = if end_stream {
StreamState::Closed
} else {
StreamState::HalfClosedLocal
};
}
StreamState::Open if end_stream => {
self.state = StreamState::HalfClosedRemote;
}
StreamState::HalfClosedLocal if end_stream => {
self.state = StreamState::Closed;
}
StreamState::Open | StreamState::HalfClosedLocal => {}
_ => {
return Err(H2Error::stream(
self.id,
ErrorCode::StreamClosed,
"cannot receive headers in current state",
));
}
}
self.headers_complete = end_headers;
Ok(())
}
pub fn recv_continuation(
&mut self,
header_block: Bytes,
end_headers: bool,
) -> Result<(), H2Error> {
if self.state.is_closed() {
return Err(H2Error::stream(
self.id,
ErrorCode::StreamClosed,
"CONTINUATION on closed stream",
));
}
if self.headers_complete {
return Err(H2Error::stream(
self.id,
ErrorCode::ProtocolError,
"unexpected CONTINUATION frame",
));
}
let current_size: usize = self.header_fragments.iter().map(Bytes::len).sum();
if current_size.saturating_add(header_block.len()) > self.max_header_fragment_size() {
return Err(H2Error::stream(
self.id,
ErrorCode::EnhanceYourCalm,
"accumulated header fragments too large",
));
}
self.header_fragments.push(header_block);
self.headers_complete = end_headers;
Ok(())
}
pub fn take_header_fragments(&mut self) -> Vec<Bytes> {
std::mem::take(&mut self.header_fragments)
}
pub fn add_header_fragment(&mut self, fragment: Bytes) -> Result<(), H2Error> {
let current_size: usize = self.header_fragments.iter().map(Bytes::len).sum();
if current_size.saturating_add(fragment.len()) > self.max_header_fragment_size() {
return Err(H2Error::stream(
self.id,
ErrorCode::EnhanceYourCalm,
"accumulated header fragments too large",
));
}
self.header_fragments.push(fragment);
Ok(())
}
pub fn send_data(&mut self, end_stream: bool) -> Result<(), H2Error> {
if !self.state.can_send() || self.state == StreamState::ReservedLocal {
return Err(H2Error::stream(
self.id,
ErrorCode::StreamClosed,
"cannot send data in current state",
));
}
if end_stream {
match self.state {
StreamState::Open => self.state = StreamState::HalfClosedLocal,
StreamState::HalfClosedRemote => self.state = StreamState::Closed,
_ => {}
}
}
Ok(())
}
pub fn recv_data(&mut self, len: u32, end_stream: bool) -> Result<(), H2Error> {
if !self.state.can_recv() || self.state == StreamState::ReservedRemote {
return Err(H2Error::stream(
self.id,
ErrorCode::StreamClosed,
"cannot receive data in current state",
));
}
let len_i32 = i32::try_from(len).map_err(|_| {
H2Error::stream(
self.id,
ErrorCode::FlowControlError,
"data length too large",
)
})?;
if len_i32 > self.recv_window {
return Err(H2Error::stream(
self.id,
ErrorCode::FlowControlError,
"data exceeds flow control window",
));
}
self.consume_recv_window(len);
if end_stream {
match self.state {
StreamState::Open => self.state = StreamState::HalfClosedRemote,
StreamState::HalfClosedLocal => self.state = StreamState::Closed,
_ => {}
}
}
Ok(())
}
pub fn reset(&mut self, error_code: ErrorCode) {
self.state = StreamState::Closed;
self.error_code = Some(error_code);
self.header_fragments.clear();
self.pending_data.clear();
}
pub fn queue_data(&mut self, data: Bytes, end_stream: bool) {
self.pending_data
.push_back(PendingData { data, end_stream });
}
pub fn take_pending_data(&mut self, max_len: usize) -> Option<(Bytes, bool)> {
if max_len == 0 {
return None;
}
if let Some(front) = self.pending_data.front() {
if front.data.len() <= max_len {
let pending = self.pending_data.pop_front()?;
return Some((pending.data, pending.end_stream));
}
}
if let Some(front) = self.pending_data.front_mut() {
let data = front.data.slice(..max_len);
front.data = front.data.slice(max_len..);
return Some((data, false));
}
None
}
}
#[derive(Debug)]
pub struct StreamStore {
streams: DetHashMap<u32, Stream>,
next_client_stream_id: u32,
next_server_stream_id: u32,
max_concurrent_streams: u32,
initial_window_size: u32,
max_header_list_size: u32,
is_client: bool,
}
impl StreamStore {
#[must_use]
pub fn new(is_client: bool, initial_window_size: u32, max_header_list_size: u32) -> Self {
Self {
streams: DetHashMap::default(),
next_client_stream_id: 1,
next_server_stream_id: 2,
max_concurrent_streams: u32::MAX,
initial_window_size,
max_header_list_size,
is_client,
}
}
pub fn set_max_concurrent_streams(&mut self, max: u32) {
self.max_concurrent_streams = max;
}
pub fn set_initial_window_size(&mut self, size: u32) -> Result<(), H2Error> {
for stream in self.streams.values_mut() {
if !stream.state.is_closed() {
stream.update_initial_window_size(size)?;
}
}
self.initial_window_size = size;
Ok(())
}
#[must_use]
pub fn initial_window_size(&self) -> u32 {
self.initial_window_size
}
#[must_use]
pub fn get(&self, id: u32) -> Option<&Stream> {
self.streams.get(&id)
}
#[must_use]
pub fn get_mut(&mut self, id: u32) -> Option<&mut Stream> {
self.streams.get_mut(&id)
}
#[must_use]
pub fn is_idle_stream_id(&self, id: u32) -> bool {
if id == 0 || id > MAX_STREAM_ID {
return false;
}
if let Some(stream) = self.streams.get(&id) {
return stream.state() == StreamState::Idle;
}
if id % 2 == 1 {
id >= self.next_client_stream_id
} else {
id >= self.next_server_stream_id
}
}
pub fn get_or_create(&mut self, id: u32) -> Result<&mut Stream, H2Error> {
if !self.streams.contains_key(&id) {
if id == 0 {
return Err(H2Error::protocol("stream ID 0 is reserved"));
}
if id > MAX_STREAM_ID {
return Err(H2Error::protocol("stream ID exceeds maximum"));
}
let is_client_stream = id % 2 == 1;
if self.is_client && is_client_stream {
return Err(H2Error::protocol("invalid stream ID parity"));
}
if !self.is_client && !is_client_stream {
return Err(H2Error::protocol("invalid stream ID parity"));
}
if self.streams.len() >= self.max_concurrent_streams as usize {
let active = self
.streams
.values()
.filter(|s| s.state.is_active())
.count();
self.streams.retain(|_, s| !s.state.is_closed());
if active >= self.max_concurrent_streams as usize {
return Err(H2Error::stream(
id,
ErrorCode::RefusedStream,
"max concurrent streams exceeded",
));
}
}
if self.is_client && !is_client_stream {
if id < self.next_server_stream_id {
return Err(H2Error::protocol("stream ID already used"));
}
self.next_server_stream_id = id.saturating_add(2);
} else if !self.is_client && is_client_stream {
if id < self.next_client_stream_id {
return Err(H2Error::protocol("stream ID already used"));
}
self.next_client_stream_id = id.saturating_add(2);
}
let stream = Stream::new(id, self.initial_window_size, self.max_header_list_size);
self.streams.insert(id, stream);
}
self.streams.get_mut(&id).ok_or_else(|| {
H2Error::connection(ErrorCode::InternalError, "stream missing after insert")
})
}
pub fn reserve_remote_stream(&mut self, id: u32) -> Result<&mut Stream, H2Error> {
if id == 0 {
return Err(H2Error::protocol("stream ID 0 is reserved"));
}
if id > MAX_STREAM_ID {
return Err(H2Error::protocol("stream ID exceeds maximum"));
}
if self.streams.contains_key(&id) {
return Err(H2Error::protocol("stream ID already used"));
}
let is_client_stream = id % 2 == 1;
if self.is_client {
if is_client_stream {
return Err(H2Error::protocol("invalid promised stream ID"));
}
if id < self.next_server_stream_id {
return Err(H2Error::protocol("stream ID already used"));
}
self.next_server_stream_id = id.saturating_add(2);
} else {
if !is_client_stream {
return Err(H2Error::protocol("invalid promised stream ID"));
}
if id < self.next_client_stream_id {
return Err(H2Error::protocol("stream ID already used"));
}
self.next_client_stream_id = id.saturating_add(2);
}
let stream =
Stream::new_reserved_remote(id, self.initial_window_size, self.max_header_list_size);
self.streams.insert(id, stream);
self.streams.get_mut(&id).ok_or_else(|| {
H2Error::connection(
ErrorCode::InternalError,
"reserved stream missing after insert",
)
})
}
pub fn allocate_stream_id(&mut self) -> Result<u32, H2Error> {
if self.streams.len() >= self.max_concurrent_streams as usize {
let mut active_count = 0;
self.streams.retain(|_, s| {
if s.state.is_active() {
active_count += 1;
}
!s.state.is_closed()
});
if active_count >= self.max_concurrent_streams {
return Err(H2Error::protocol("max concurrent streams exceeded"));
}
}
let id = if self.is_client {
if self.next_client_stream_id > MAX_STREAM_ID {
return Err(H2Error::protocol("stream ID exhausted"));
}
let id = self.next_client_stream_id;
self.next_client_stream_id = id.saturating_add(2);
id
} else {
if self.next_server_stream_id > MAX_STREAM_ID {
return Err(H2Error::protocol("stream ID exhausted"));
}
let id = self.next_server_stream_id;
self.next_server_stream_id = id.saturating_add(2);
id
};
let stream = Stream::new(id, self.initial_window_size, self.max_header_list_size);
self.streams.insert(id, stream);
Ok(id)
}
#[must_use]
pub fn len(&self) -> usize {
self.streams.len()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.streams.is_empty()
}
pub fn prune_closed(&mut self) {
self.streams.retain(|_, stream| !stream.state.is_closed());
}
#[must_use]
pub fn active_stream_ids(&self) -> Vec<u32> {
self.streams
.iter()
.filter(|(_, s)| s.state.is_active())
.map(|(&id, _)| id)
.collect()
}
#[must_use]
pub fn active_count(&self) -> usize {
self.streams
.values()
.filter(|s| s.state.is_active())
.count()
}
}
#[cfg(test)]
mod tests {
use super::super::settings::DEFAULT_MAX_HEADER_LIST_SIZE;
use super::*;
#[test]
fn test_stream_state_transitions() {
let mut stream = Stream::new(1, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
assert_eq!(stream.state(), StreamState::Idle);
stream.send_headers(false).unwrap();
assert_eq!(stream.state(), StreamState::Open);
stream.recv_data(100, true).unwrap();
assert_eq!(stream.state(), StreamState::HalfClosedRemote);
stream.send_data(true).unwrap();
assert_eq!(stream.state(), StreamState::Closed);
}
#[test]
fn test_stream_flow_control() {
let mut stream = Stream::new(1, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
assert_eq!(stream.send_window(), 65535);
stream.consume_send_window(1000);
assert_eq!(stream.send_window(), 64535);
stream.update_send_window(500).unwrap();
assert_eq!(stream.send_window(), 65035);
}
#[test]
fn header_fragment_limit_respects_max_header_list_size() {
let max_list_size = 8;
let mut stream = Stream::new(1, 65535, max_list_size);
stream
.add_header_fragment(Bytes::from(vec![0; 16]))
.unwrap();
assert!(
stream
.add_header_fragment(Bytes::from(vec![0; 17]))
.is_err()
);
}
#[test]
fn test_stream_store_allocation() {
let mut store = StreamStore::new(true, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
assert!(store.is_empty());
let id1 = store.allocate_stream_id().unwrap();
assert_eq!(id1, 1);
let id2 = store.allocate_stream_id().unwrap();
assert_eq!(id2, 3);
let id3 = store.allocate_stream_id().unwrap();
assert_eq!(id3, 5);
assert!(!store.is_empty());
}
#[test]
fn test_stream_store_max_concurrent() {
let mut store = StreamStore::new(true, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
store.set_max_concurrent_streams(2);
let id1 = store.allocate_stream_id().unwrap();
store.get_mut(id1).unwrap().send_headers(false).unwrap();
let id2 = store.allocate_stream_id().unwrap();
store.get_mut(id2).unwrap().send_headers(false).unwrap();
assert!(store.allocate_stream_id().is_err());
store.get_mut(id1).unwrap().reset(ErrorCode::NoError);
store.prune_closed();
assert!(store.allocate_stream_id().is_ok());
}
#[test]
fn auto_window_update_not_needed_when_window_above_half() {
let mut stream = Stream::new(1, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
stream.send_headers(false).unwrap();
stream.recv_data(30_000, false).unwrap();
assert!(
stream.recv_window() >= stream.initial_recv_window / 2,
"window should still be above the low watermark"
);
assert!(stream.auto_window_update_increment().is_none());
}
#[test]
fn auto_window_update_triggered_when_window_below_half() {
let initial = DEFAULT_INITIAL_WINDOW_SIZE;
let initial_i32 = i32::try_from(initial).unwrap();
let mut stream = Stream::new(1, initial, DEFAULT_MAX_HEADER_LIST_SIZE);
stream.send_headers(false).unwrap();
let consume = u32::try_from(initial_i32 / 2 + 2).unwrap();
stream.recv_data(consume, false).unwrap();
let increment = stream
.auto_window_update_increment()
.expect("should need WINDOW_UPDATE");
assert_eq!(
i64::from(stream.recv_window()) + i64::from(increment),
i64::from(initial_i32)
);
}
#[test]
fn auto_window_update_returns_none_after_replenish() {
let initial = DEFAULT_INITIAL_WINDOW_SIZE;
let initial_i32 = i32::try_from(initial).unwrap();
let mut stream = Stream::new(1, initial, DEFAULT_MAX_HEADER_LIST_SIZE);
stream.send_headers(false).unwrap();
let consume = u32::try_from(initial_i32 / 2 + 2).unwrap();
stream.recv_data(consume, false).unwrap();
let increment = stream.auto_window_update_increment().unwrap();
stream
.update_recv_window(i32::try_from(increment).unwrap())
.unwrap();
assert!(stream.auto_window_update_increment().is_none());
}
#[test]
fn idle_to_open_via_send_headers() {
let mut stream = Stream::new(1, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
assert_eq!(stream.state(), StreamState::Idle);
stream.send_headers(false).unwrap();
assert_eq!(stream.state(), StreamState::Open);
}
#[test]
fn idle_to_half_closed_local_via_send_headers_with_end_stream() {
let mut stream = Stream::new(1, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
assert_eq!(stream.state(), StreamState::Idle);
stream.send_headers(true).unwrap();
assert_eq!(stream.state(), StreamState::HalfClosedLocal);
}
#[test]
fn idle_to_open_via_recv_headers() {
let mut stream = Stream::new(1, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
assert_eq!(stream.state(), StreamState::Idle);
stream.recv_headers(false, true).unwrap();
assert_eq!(stream.state(), StreamState::Open);
}
#[test]
fn idle_to_half_closed_remote_via_recv_headers_with_end_stream() {
let mut stream = Stream::new(1, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
assert_eq!(stream.state(), StreamState::Idle);
stream.recv_headers(true, true).unwrap();
assert_eq!(stream.state(), StreamState::HalfClosedRemote);
}
#[test]
fn open_to_half_closed_local_via_send_data() {
let mut stream = Stream::new(1, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
stream.send_headers(false).unwrap();
assert_eq!(stream.state(), StreamState::Open);
stream.send_data(true).unwrap();
assert_eq!(stream.state(), StreamState::HalfClosedLocal);
}
#[test]
fn open_to_half_closed_local_via_send_headers() {
let mut stream = Stream::new(1, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
stream.send_headers(false).unwrap();
assert_eq!(stream.state(), StreamState::Open);
stream.send_headers(true).unwrap();
assert_eq!(stream.state(), StreamState::HalfClosedLocal);
}
#[test]
fn open_to_half_closed_remote_via_recv_data() {
let mut stream = Stream::new(1, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
stream.send_headers(false).unwrap();
assert_eq!(stream.state(), StreamState::Open);
stream.recv_data(100, true).unwrap();
assert_eq!(stream.state(), StreamState::HalfClosedRemote);
}
#[test]
fn open_to_half_closed_remote_via_recv_headers() {
let mut stream = Stream::new(1, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
stream.send_headers(false).unwrap();
assert_eq!(stream.state(), StreamState::Open);
stream.recv_headers(true, true).unwrap();
assert_eq!(stream.state(), StreamState::HalfClosedRemote);
}
#[test]
fn half_closed_local_to_closed_via_recv_data() {
let mut stream = Stream::new(1, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
stream.send_headers(true).unwrap(); assert_eq!(stream.state(), StreamState::HalfClosedLocal);
stream.recv_data(100, true).unwrap();
assert_eq!(stream.state(), StreamState::Closed);
}
#[test]
fn half_closed_local_to_closed_via_recv_headers() {
let mut stream = Stream::new(1, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
stream.send_headers(true).unwrap();
assert_eq!(stream.state(), StreamState::HalfClosedLocal);
stream.recv_headers(true, true).unwrap();
assert_eq!(stream.state(), StreamState::Closed);
}
#[test]
fn half_closed_remote_to_closed_via_send_data() {
let mut stream = Stream::new(1, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
stream.send_headers(false).unwrap();
stream.recv_data(100, true).unwrap(); assert_eq!(stream.state(), StreamState::HalfClosedRemote);
stream.send_data(true).unwrap();
assert_eq!(stream.state(), StreamState::Closed);
}
#[test]
fn half_closed_remote_to_closed_via_send_headers() {
let mut stream = Stream::new(1, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
stream.send_headers(false).unwrap();
stream.recv_data(100, true).unwrap();
assert_eq!(stream.state(), StreamState::HalfClosedRemote);
stream.send_headers(true).unwrap();
assert_eq!(stream.state(), StreamState::Closed);
}
#[test]
fn send_headers_open_without_end_stream_stays_open() {
let mut stream = Stream::new(1, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
stream.send_headers(false).unwrap(); assert_eq!(stream.state(), StreamState::Open);
stream.send_headers(false).unwrap();
assert_eq!(stream.state(), StreamState::Open);
}
#[test]
fn send_headers_half_closed_remote_without_end_stream_stays() {
let mut stream = Stream::new(1, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
stream.send_headers(false).unwrap(); stream.recv_data(100, true).unwrap(); assert_eq!(stream.state(), StreamState::HalfClosedRemote);
stream.send_headers(false).unwrap();
assert_eq!(stream.state(), StreamState::HalfClosedRemote);
}
#[test]
fn recv_headers_open_without_end_stream_stays_open() {
let mut stream = Stream::new(1, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
stream.send_headers(false).unwrap(); assert_eq!(stream.state(), StreamState::Open);
stream.recv_headers(false, true).unwrap();
assert_eq!(stream.state(), StreamState::Open);
}
#[test]
fn recv_headers_half_closed_local_without_end_stream_stays() {
let mut stream = Stream::new(1, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
stream.send_headers(true).unwrap(); assert_eq!(stream.state(), StreamState::HalfClosedLocal);
stream.recv_headers(false, true).unwrap();
assert_eq!(stream.state(), StreamState::HalfClosedLocal);
}
#[test]
fn reserved_local_to_half_closed_remote_via_send_headers() {
let mut stream = Stream::new(2, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
stream.state = StreamState::ReservedLocal;
stream.send_headers(false).unwrap();
assert_eq!(stream.state(), StreamState::HalfClosedRemote);
}
#[test]
fn reserved_local_to_closed_via_send_headers_with_end_stream() {
let mut stream = Stream::new(2, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
stream.state = StreamState::ReservedLocal;
stream.send_headers(true).unwrap();
assert_eq!(stream.state(), StreamState::Closed);
}
#[test]
fn reserved_remote_to_half_closed_local_via_recv_headers() {
let mut stream = Stream::new(2, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
stream.state = StreamState::ReservedRemote;
stream.recv_headers(false, true).unwrap();
assert_eq!(stream.state(), StreamState::HalfClosedLocal);
}
#[test]
fn reserved_remote_to_closed_via_recv_headers_with_end_stream() {
let mut stream = Stream::new(2, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
stream.state = StreamState::ReservedRemote;
stream.recv_headers(true, true).unwrap();
assert_eq!(stream.state(), StreamState::Closed);
}
#[test]
fn reset_from_any_state_goes_to_closed() {
for initial_state in [
StreamState::Idle,
StreamState::Open,
StreamState::HalfClosedLocal,
StreamState::HalfClosedRemote,
StreamState::ReservedLocal,
StreamState::ReservedRemote,
] {
let mut stream = Stream::new(1, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
stream.state = initial_state;
stream.reset(ErrorCode::Cancel);
assert_eq!(stream.state(), StreamState::Closed);
assert_eq!(stream.error_code(), Some(ErrorCode::Cancel));
}
}
#[test]
fn reset_preserves_error_code() {
let mut stream = Stream::new(1, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
stream.send_headers(false).unwrap();
stream.reset(ErrorCode::InternalError);
assert_eq!(stream.error_code(), Some(ErrorCode::InternalError));
stream.reset(ErrorCode::StreamClosed);
assert_eq!(stream.error_code(), Some(ErrorCode::StreamClosed));
}
#[test]
fn cannot_send_headers_on_closed_stream() {
let mut stream = Stream::new(1, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
stream.reset(ErrorCode::Cancel);
assert_eq!(stream.state(), StreamState::Closed);
let result = stream.send_headers(false);
assert!(result.is_err());
}
#[test]
fn cannot_recv_headers_on_closed_stream() {
let mut stream = Stream::new(1, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
stream.reset(ErrorCode::Cancel);
let result = stream.recv_headers(false, true);
assert!(result.is_err());
}
#[test]
fn cannot_send_data_on_closed_stream() {
let mut stream = Stream::new(1, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
stream.reset(ErrorCode::Cancel);
let result = stream.send_data(false);
assert!(result.is_err());
}
#[test]
fn cannot_recv_data_on_closed_stream() {
let mut stream = Stream::new(1, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
stream.reset(ErrorCode::Cancel);
let result = stream.recv_data(100, false);
assert!(result.is_err());
}
#[test]
fn cannot_send_data_on_half_closed_local() {
let mut stream = Stream::new(1, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
stream.send_headers(true).unwrap();
assert_eq!(stream.state(), StreamState::HalfClosedLocal);
let result = stream.send_data(false);
assert!(result.is_err());
}
#[test]
fn cannot_recv_data_on_half_closed_remote() {
let mut stream = Stream::new(1, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
stream.send_headers(false).unwrap();
stream.recv_data(100, true).unwrap();
assert_eq!(stream.state(), StreamState::HalfClosedRemote);
let result = stream.recv_data(100, false);
assert!(result.is_err());
}
#[test]
fn cannot_send_headers_on_half_closed_local() {
let mut stream = Stream::new(1, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
stream.send_headers(true).unwrap();
assert_eq!(stream.state(), StreamState::HalfClosedLocal);
let result = stream.send_headers(false);
assert!(result.is_err());
}
#[test]
fn cannot_recv_headers_on_half_closed_remote() {
let mut stream = Stream::new(1, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
stream.send_headers(false).unwrap();
stream.recv_headers(true, true).unwrap();
assert_eq!(stream.state(), StreamState::HalfClosedRemote);
let result = stream.recv_headers(false, true);
assert!(result.is_err());
}
#[test]
fn cannot_send_data_on_idle() {
let mut stream = Stream::new(1, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
assert_eq!(stream.state(), StreamState::Idle);
let result = stream.send_data(false);
assert!(result.is_err());
}
#[test]
fn cannot_recv_data_on_idle() {
let mut stream = Stream::new(1, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
assert_eq!(stream.state(), StreamState::Idle);
let result = stream.recv_data(100, false);
assert!(result.is_err());
}
#[test]
fn recv_data_exceeding_window_returns_flow_control_error() {
let mut stream = Stream::new(1, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
stream.send_headers(false).unwrap();
stream.recv_data(65530, false).unwrap();
let result = stream.recv_data(100, false);
assert!(result.is_err());
let err = result.unwrap_err();
assert_eq!(err.code, ErrorCode::FlowControlError);
}
#[test]
fn window_update_overflow_returns_error() {
let mut stream = Stream::new(1, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
let result = stream.update_send_window(i32::MAX);
assert!(result.is_err());
}
#[test]
fn can_send_predicates_are_correct() {
assert!(!StreamState::Idle.can_send());
assert!(StreamState::Open.can_send());
assert!(!StreamState::HalfClosedLocal.can_send());
assert!(StreamState::HalfClosedRemote.can_send());
assert!(StreamState::ReservedLocal.can_send());
assert!(!StreamState::ReservedRemote.can_send());
assert!(!StreamState::Closed.can_send());
}
#[test]
fn can_recv_predicates_are_correct() {
assert!(!StreamState::Idle.can_recv());
assert!(StreamState::Open.can_recv());
assert!(StreamState::HalfClosedLocal.can_recv());
assert!(!StreamState::HalfClosedRemote.can_recv());
assert!(!StreamState::ReservedLocal.can_recv());
assert!(StreamState::ReservedRemote.can_recv());
assert!(!StreamState::Closed.can_recv());
}
#[test]
fn can_send_headers_predicates_are_correct() {
assert!(StreamState::Idle.can_send_headers());
assert!(StreamState::Open.can_send_headers());
assert!(!StreamState::HalfClosedLocal.can_send_headers());
assert!(StreamState::HalfClosedRemote.can_send_headers());
assert!(StreamState::ReservedLocal.can_send_headers());
assert!(!StreamState::ReservedRemote.can_send_headers());
assert!(!StreamState::Closed.can_send_headers());
}
#[test]
fn can_recv_headers_predicates_are_correct() {
assert!(StreamState::Idle.can_recv_headers());
assert!(StreamState::Open.can_recv_headers());
assert!(StreamState::HalfClosedLocal.can_recv_headers());
assert!(!StreamState::HalfClosedRemote.can_recv_headers());
assert!(!StreamState::ReservedLocal.can_recv_headers());
assert!(StreamState::ReservedRemote.can_recv_headers());
assert!(!StreamState::Closed.can_recv_headers());
}
#[test]
fn is_closed_predicate_is_correct() {
assert!(!StreamState::Idle.is_closed());
assert!(!StreamState::Open.is_closed());
assert!(!StreamState::HalfClosedLocal.is_closed());
assert!(!StreamState::HalfClosedRemote.is_closed());
assert!(!StreamState::ReservedLocal.is_closed());
assert!(!StreamState::ReservedRemote.is_closed());
assert!(StreamState::Closed.is_closed());
}
#[test]
fn continuation_without_headers_in_progress_is_error() {
let mut stream = Stream::new(1, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
stream.send_headers(false).unwrap();
let result = stream.recv_continuation(Bytes::from_static(b"test"), false);
assert!(result.is_err());
}
#[test]
fn continuation_accumulates_fragments() {
let mut stream = Stream::new(1, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
stream.recv_headers(false, false).unwrap();
assert!(stream.is_receiving_headers());
stream
.recv_continuation(Bytes::from_static(b"part1"), false)
.unwrap();
stream
.recv_continuation(Bytes::from_static(b"part2"), true)
.unwrap();
assert!(!stream.is_receiving_headers());
let fragments = stream.take_header_fragments();
assert_eq!(fragments.len(), 2);
}
#[test]
fn pending_data_queue_works() {
let mut stream = Stream::new(1, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
assert!(!stream.has_pending_data());
stream.queue_data(Bytes::from_static(b"hello"), false);
stream.queue_data(Bytes::from_static(b"world"), true);
assert!(stream.has_pending_data());
let (data1, end1) = stream.take_pending_data(100).unwrap();
assert_eq!(&data1[..], b"hello");
assert!(!end1);
let (data2, end2) = stream.take_pending_data(100).unwrap();
assert_eq!(&data2[..], b"world");
assert!(end2);
assert!(!stream.has_pending_data());
}
#[test]
fn pending_data_partial_take() {
let mut stream = Stream::new(1, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
stream.queue_data(Bytes::from_static(b"hello world"), true);
let (data1, end1) = stream.take_pending_data(5).unwrap();
assert_eq!(&data1[..], b"hello");
assert!(!end1);
let (data2, end2) = stream.take_pending_data(100).unwrap();
assert_eq!(&data2[..], b" world");
assert!(end2);
}
#[test]
fn pending_data_zero_window_returns_none() {
let mut stream = Stream::new(1, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
stream.queue_data(Bytes::from_static(b"hello"), true);
let taken = stream.take_pending_data(0);
assert!(taken.is_none());
assert!(stream.has_pending_data());
let (data, end) = stream.take_pending_data(5).unwrap();
assert_eq!(&data[..], b"hello");
assert!(end);
assert!(!stream.has_pending_data());
}
#[test]
fn stream_store_rejects_stream_id_zero() {
let mut store = StreamStore::new(true, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
let result = store.get_or_create(0);
assert!(result.is_err());
}
#[test]
fn stream_store_rejects_stream_id_over_max() {
let mut store = StreamStore::new(true, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
let result = store.get_or_create(MAX_STREAM_ID + 1);
assert!(result.is_err());
}
#[test]
fn stream_store_client_rejects_client_initiated_stream() {
let mut store = StreamStore::new(true, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
let result = store.get_or_create(1);
assert!(result.is_err());
}
#[test]
fn stream_store_server_rejects_server_initiated_stream() {
let mut store = StreamStore::new(false, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
let result = store.get_or_create(2);
assert!(result.is_err());
}
#[test]
fn stream_store_client_rejects_reused_server_stream_id() {
let mut store = StreamStore::new(true, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
store.get_or_create(2).unwrap();
store.get_or_create(4).unwrap();
assert!(store.get_or_create(2).is_ok());
}
#[test]
fn stream_store_server_advances_client_stream_ids() {
let mut store = StreamStore::new(false, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
store.get_or_create(1).unwrap();
store.get_or_create(5).unwrap();
let result = store.get_or_create(3);
assert!(result.is_err());
}
#[test]
fn stream_store_allocate_stream_id_exhausts_at_max() {
let mut store = StreamStore::new(true, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
store.next_client_stream_id = MAX_STREAM_ID;
let id = store.allocate_stream_id().unwrap();
assert_eq!(id, MAX_STREAM_ID);
assert!(store.allocate_stream_id().is_err());
}
#[test]
fn stream_store_prune_removes_closed_streams() {
let mut store = StreamStore::new(true, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
let id = store.allocate_stream_id().unwrap();
store.get_mut(id).unwrap().reset(ErrorCode::NoError);
assert_eq!(store.active_count(), 0);
store.prune_closed();
assert!(store.get(id).is_none());
}
#[test]
fn stream_store_active_stream_ids() {
let mut store = StreamStore::new(true, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
let id1 = store.allocate_stream_id().unwrap();
let id2 = store.allocate_stream_id().unwrap();
store.get_mut(id2).unwrap().send_headers(false).unwrap();
store.get_mut(id1).unwrap().reset(ErrorCode::NoError);
let active = store.active_stream_ids();
assert_eq!(active.len(), 1);
assert!(active.contains(&id2));
assert!(!active.contains(&id1));
}
#[test]
fn update_initial_window_size_adjusts_existing_streams() {
let mut store = StreamStore::new(true, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
let id = store.allocate_stream_id().unwrap();
assert_eq!(store.get(id).unwrap().send_window(), 65535);
store.set_initial_window_size(100_000).unwrap();
assert_eq!(store.get(id).unwrap().send_window(), 100_000);
store.set_initial_window_size(50_000).unwrap();
assert_eq!(store.get(id).unwrap().send_window(), 50_000);
}
#[test]
fn priority_can_be_set_and_retrieved() {
let mut stream = Stream::new(1, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
let new_priority = PrioritySpec {
exclusive: true,
dependency: 3,
weight: 255,
};
stream.set_priority(new_priority);
let priority = stream.priority();
assert!(priority.exclusive);
assert_eq!(priority.dependency, 3);
assert_eq!(priority.weight, 255);
}
#[test]
fn reset_then_recv_data_returns_error() {
let mut stream = Stream::new(1, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
stream.send_headers(false).unwrap();
assert_eq!(stream.state(), StreamState::Open);
stream.reset(ErrorCode::Cancel);
assert_eq!(stream.state(), StreamState::Closed);
assert_eq!(stream.error_code(), Some(ErrorCode::Cancel));
let result = stream.recv_data(100, false);
assert!(result.is_err());
let err = result.unwrap_err();
assert_eq!(err.code, ErrorCode::StreamClosed);
}
#[test]
fn reset_then_recv_headers_returns_error() {
let mut stream = Stream::new(1, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
stream.send_headers(false).unwrap();
stream.reset(ErrorCode::InternalError);
assert_eq!(stream.state(), StreamState::Closed);
let result = stream.recv_headers(true, true);
assert!(result.is_err());
}
#[test]
fn reset_during_header_accumulation() {
let mut stream = Stream::new(1, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
stream.recv_headers(false, false).unwrap();
assert!(stream.is_receiving_headers());
stream
.add_header_fragment(Bytes::from_static(b"partial_header"))
.unwrap();
stream.reset(ErrorCode::Cancel);
assert_eq!(stream.state(), StreamState::Closed);
assert!(stream.is_receiving_headers());
let result = stream.recv_data(100, false);
assert!(result.is_err());
let result = stream.recv_headers(false, true);
assert!(result.is_err());
}
#[test]
fn double_reset_is_safe() {
let mut stream = Stream::new(1, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
stream.send_headers(false).unwrap();
stream.reset(ErrorCode::Cancel);
assert_eq!(stream.state(), StreamState::Closed);
assert_eq!(stream.error_code(), Some(ErrorCode::Cancel));
stream.reset(ErrorCode::InternalError);
assert_eq!(stream.state(), StreamState::Closed);
assert_eq!(stream.error_code(), Some(ErrorCode::InternalError));
}
#[test]
fn no_send_after_end_stream() {
let mut stream = Stream::new(1, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
stream.send_headers(true).unwrap(); assert_eq!(stream.state(), StreamState::HalfClosedLocal);
assert!(stream.send_data(false).is_err());
assert!(stream.send_headers(false).is_err());
}
#[test]
fn trailers_transition_to_half_closed() {
let mut stream = Stream::new(1, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
stream.send_headers(false).unwrap();
assert_eq!(stream.state(), StreamState::Open);
stream.send_headers(true).unwrap();
assert_eq!(stream.state(), StreamState::HalfClosedLocal);
}
#[test]
fn recv_trailers_transition_to_half_closed() {
let mut stream = Stream::new(1, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
stream.send_headers(false).unwrap();
assert_eq!(stream.state(), StreamState::Open);
stream.recv_headers(true, true).unwrap();
assert_eq!(stream.state(), StreamState::HalfClosedRemote);
}
#[test]
fn window_can_go_negative_after_settings_change() {
let mut stream = Stream::new(1, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
stream.send_headers(false).unwrap();
stream.consume_send_window(60000);
assert_eq!(stream.send_window(), 5535);
stream.update_initial_window_size(1000).unwrap();
assert!(stream.send_window() < 0);
}
#[test]
fn reserved_remote_can_recv_data() {
let mut stream = Stream::new(2, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
stream.state = StreamState::ReservedRemote;
assert!(stream.state().can_recv());
stream.recv_headers(false, true).unwrap();
assert_eq!(stream.state(), StreamState::HalfClosedLocal);
let result = stream.recv_data(100, true);
assert!(result.is_ok());
assert_eq!(stream.state(), StreamState::Closed);
}
#[test]
fn reserved_local_rejects_send_data() {
let mut stream = Stream::new(2, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
stream.state = StreamState::ReservedLocal;
let result = stream.send_data(false);
assert!(result.is_err(), "DATA on reserved(local) must be rejected");
}
#[test]
fn reserved_remote_rejects_recv_data() {
let mut stream = Stream::new(2, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
stream.state = StreamState::ReservedRemote;
let result = stream.recv_data(100, false);
assert!(result.is_err(), "DATA on reserved(remote) must be rejected");
}
#[test]
fn reset_clears_header_fragments_and_pending_data() {
let mut stream = Stream::new(1, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
stream.recv_headers(false, false).unwrap();
stream
.add_header_fragment(Bytes::from(vec![0xAA; 64]))
.unwrap();
assert!(!stream.take_header_fragments().is_empty() || stream.is_receiving_headers());
stream
.add_header_fragment(Bytes::from(vec![0xBB; 64]))
.unwrap();
stream.queue_data(Bytes::from_static(b"buffered"), false);
assert!(stream.has_pending_data());
stream.reset(ErrorCode::Cancel);
assert_eq!(stream.state(), StreamState::Closed);
assert!(
stream.take_header_fragments().is_empty(),
"header_fragments should be cleared on reset"
);
assert!(
!stream.has_pending_data(),
"pending_data should be cleared on reset"
);
}
#[test]
fn set_initial_window_size_skips_closed_streams() {
let mut store = StreamStore::new(true, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
let id = store.allocate_stream_id().unwrap();
store.get_mut(id).unwrap().consume_send_window(65535);
store
.get_mut(id)
.unwrap()
.update_initial_window_size(1)
.unwrap();
assert!(store.get(id).unwrap().send_window() < 0);
store.get_mut(id).unwrap().reset(ErrorCode::NoError);
let result = store.set_initial_window_size(0x7fff_ffff);
assert!(
result.is_ok(),
"closed streams must not block SETTINGS update: {result:?}"
);
}
#[test]
fn stream_store_handles_rapid_churn() {
let mut store = StreamStore::new(true, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
store.set_max_concurrent_streams(10);
for round in 0..10 {
let mut ids = Vec::new();
for _ in 0..10 {
let id = store.allocate_stream_id().unwrap();
store.get_mut(id).unwrap().send_headers(false).unwrap();
ids.push(id);
}
let result = store.allocate_stream_id();
assert!(
result.is_err(),
"round {round}: should hit max_concurrent_streams limit"
);
for id in &ids {
store.get_mut(*id).unwrap().reset(ErrorCode::NoError);
}
store.prune_closed();
assert_eq!(
store.active_count(),
0,
"round {round}: all streams should be pruned"
);
}
let id = store.allocate_stream_id().unwrap();
assert!(id > 0);
}
#[test]
fn reserve_remote_validates_parity() {
let mut store = StreamStore::new(true, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
assert!(store.reserve_remote_stream(2).is_ok());
assert!(store.reserve_remote_stream(3).is_err());
let mut store = StreamStore::new(false, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
assert!(store.reserve_remote_stream(1).is_ok());
assert!(store.reserve_remote_stream(2).is_err());
}
#[test]
fn stream_id_must_be_monotonic() {
let mut store = StreamStore::new(true, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
let _ = store.allocate_stream_id().unwrap(); let _ = store.allocate_stream_id().unwrap();
store.reserve_remote_stream(2).unwrap();
store.reserve_remote_stream(4).unwrap();
assert!(store.reserve_remote_stream(2).is_err());
}
#[test]
fn pending_data_preserves_order() {
let mut stream = Stream::new(1, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
stream.queue_data(Bytes::from_static(b"first"), false);
stream.queue_data(Bytes::from_static(b"second"), false);
stream.queue_data(Bytes::from_static(b"third"), true);
let (d1, e1) = stream.take_pending_data(100).unwrap();
assert_eq!(&d1[..], b"first");
assert!(!e1);
let (d2, e2) = stream.take_pending_data(100).unwrap();
assert_eq!(&d2[..], b"second");
assert!(!e2);
let (d3, e3) = stream.take_pending_data(100).unwrap();
assert_eq!(&d3[..], b"third");
assert!(e3);
assert!(!stream.has_pending_data());
}
#[test]
fn recv_headers_on_closed_stream_does_not_corrupt_headers_complete() {
let mut stream = Stream::new(1, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
stream.send_headers(false).unwrap();
assert_eq!(stream.state(), StreamState::Open);
stream.reset(ErrorCode::Cancel);
assert_eq!(stream.state(), StreamState::Closed);
assert!(
!stream.is_receiving_headers(),
"headers_complete should be true before the rejected recv_headers"
);
let result = stream.recv_headers(false, false);
assert!(result.is_err(), "recv_headers on Closed must fail");
assert!(
!stream.is_receiving_headers(),
"headers_complete must not be corrupted by a rejected recv_headers"
);
}
#[test]
fn recv_continuation_rejects_closed_stream() {
let mut stream = Stream::new(1, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
stream.recv_headers(false, false).unwrap();
assert!(stream.is_receiving_headers());
stream.reset(ErrorCode::Cancel);
assert_eq!(stream.state(), StreamState::Closed);
let result = stream.recv_continuation(Bytes::from_static(b"fragment"), true);
assert!(
result.is_err(),
"recv_continuation must reject frames on a Closed stream"
);
assert_eq!(
result.unwrap_err().code,
ErrorCode::StreamClosed,
"error code should be StreamClosed"
);
}
#[test]
fn reset_then_rejected_headers_then_continuation_all_rejected() {
let mut stream = Stream::new(1, 65535, DEFAULT_MAX_HEADER_LIST_SIZE);
stream.send_headers(false).unwrap();
stream.reset(ErrorCode::Cancel);
assert!(stream.recv_headers(false, false).is_err());
assert!(
!stream.is_receiving_headers(),
"rejected recv_headers must not flip headers_complete"
);
stream.headers_complete = false;
let result = stream.recv_continuation(Bytes::from_static(b"payload"), true);
assert!(
result.is_err(),
"recv_continuation state check must catch closed stream"
);
}
}