use std::collections::HashMap;
use super::capsule::{Capsule, MAX_STREAMS_LIMIT};
use super::error::{Error, ErrorCode};
use super::stream::Stream;
/// Maximum number of entries `Session::buffer_incoming_stream` will hold;
/// once this many streams are buffered, further ones are rejected.
pub(crate) const MAX_BUFFERED_STREAMS: usize = 100;
/// Maximum number of datagrams `Session::buffer_datagram` will hold;
/// once this many datagrams are buffered, further ones are rejected.
pub(crate) const MAX_BUFFERED_DATAGRAMS: usize = 100;
/// Record of an incoming stream queued through `Session::buffer_incoming_stream`
/// and later handed back by `Session::take_buffered_streams`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BufferedStream {
/// Identifier of the buffered stream.
pub stream_id: u64,
/// `true` for a bidirectional stream, `false` for a unidirectional one.
pub is_bidirectional: bool,
}
/// Lifecycle stage of a session. New sessions start in [`SessionState::Pending`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum SessionState {
    /// Initial state; nothing has happened yet.
    #[default]
    Pending,
    /// Session establishment is in progress.
    Connecting,
    /// Session is fully usable.
    Established,
    /// Session is winding down but still usable for streams and data.
    Draining,
    /// Session is finished; no further activity is permitted.
    Closed,
}

impl SessionState {
    /// Shared predicate behind every `can_*` check: only `Established` and
    /// `Draining` sessions may create streams, send, or receive.
    fn is_active(self) -> bool {
        match self {
            Self::Established | Self::Draining => true,
            Self::Pending | Self::Connecting | Self::Closed => false,
        }
    }

    /// Returns `true` when new streams may be created in this state.
    pub fn can_create_stream(self) -> bool {
        self.is_active()
    }

    /// Returns `true` when data may be sent in this state.
    pub fn can_send(self) -> bool {
        self.is_active()
    }

    /// Returns `true` when data may be received in this state.
    pub fn can_receive(self) -> bool {
        self.is_active()
    }
}
/// A set of flow-control limits for one direction of a session.
#[derive(Debug, Clone, Copy, Default)]
pub struct FlowControlLimits {
    /// Maximum number of unidirectional streams.
    pub max_streams_uni: u64,
    /// Maximum number of bidirectional streams.
    pub max_streams_bidi: u64,
    /// Maximum cumulative number of data bytes.
    pub max_data: u64,
}

impl FlowControlLimits {
    /// Creates limits with every field set to zero (same as `Default`).
    pub fn new() -> Self {
        Self {
            max_streams_uni: 0,
            max_streams_bidi: 0,
            max_data: 0,
        }
    }
}
/// Running counters of what a session has sent and received so far.
#[derive(Debug, Clone, Copy, Default)]
pub struct FlowControlState {
    /// Unidirectional streams opened locally.
    pub streams_uni_opened: u64,
    /// Bidirectional streams opened locally.
    pub streams_bidi_opened: u64,
    /// Cumulative bytes accounted as sent.
    pub data_sent: u64,
    /// Unidirectional streams received from the peer.
    pub streams_uni_received: u64,
    /// Bidirectional streams received from the peer.
    pub streams_bidi_received: u64,
    /// Cumulative bytes accounted as received.
    pub data_received: u64,
    /// Number of datagrams received.
    pub datagrams_received: u64,
}

impl FlowControlState {
    /// Creates a state with every counter at zero (same as `Default`).
    pub fn new() -> Self {
        Self {
            streams_uni_opened: 0,
            streams_bidi_opened: 0,
            data_sent: 0,
            streams_uni_received: 0,
            streams_bidi_received: 0,
            data_received: 0,
            datagrams_received: 0,
        }
    }
}
/// Receive-side stream-count flow control for one stream direction.
///
/// Tracks how many streams the peer has opened and closed, and decides when a
/// larger stream maximum should be advertised.
#[derive(Debug, Clone)]
pub(crate) struct DirectionalStreamFlowControl {
    /// Size of the window re-opened on top of the closed-stream count.
    concurrent_limit: u64,
    /// Highest stream maximum advertised so far.
    advertised_max: u64,
    /// Number of peer streams reported closed.
    total_closed: u64,
    /// Number of peer streams received.
    total_received: u64,
}

impl DirectionalStreamFlowControl {
    /// Creates a controller whose initial advertised maximum equals
    /// `concurrent_limit`.
    pub(crate) fn new(concurrent_limit: u64) -> Self {
        Self {
            total_received: 0,
            total_closed: 0,
            advertised_max: concurrent_limit,
            concurrent_limit,
        }
    }

    /// Returns `true` while one more incoming stream still fits under the
    /// advertised maximum.
    pub(crate) fn check_received(&self) -> bool {
        self.advertised_max > self.total_received
    }

    /// Records one newly received stream.
    pub(crate) fn on_stream_received(&mut self) {
        self.total_received += 1;
    }

    /// Records one closed stream and returns `Some(new_max)` when a larger
    /// maximum should be advertised.
    ///
    /// The closed-stream counter is always bumped. A new maximum is produced
    /// only when the limit is non-zero, the remaining credit has dropped to
    /// half the limit or less, and the candidate (closed + limit, capped at
    /// `MAX_STREAMS_LIMIT`) actually exceeds the current maximum.
    pub(crate) fn on_stream_closed(&mut self) -> Option<u64> {
        self.total_closed += 1;

        let window = self.concurrent_limit;
        if window == 0 {
            return None;
        }

        let credit_left = self.advertised_max.saturating_sub(self.total_received);
        if credit_left > window / 2 {
            return None;
        }

        let candidate = MAX_STREAMS_LIMIT.min(self.total_closed.saturating_add(window));
        if candidate <= self.advertised_max {
            return None;
        }

        self.advertised_max = candidate;
        Some(candidate)
    }
}
/// Receive-side byte-count flow control for a session.
///
/// Tracks how many bytes the peer has sent and how many the application has
/// consumed, and decides when a larger data maximum should be advertised.
/// All counters use checked or saturating arithmetic so that extreme byte
/// counts can neither panic nor wrap around.
#[derive(Debug, Clone)]
pub(crate) struct DataFlowControl {
    /// Size of the window re-opened on top of the consumed byte count.
    initial_window: u64,
    /// Highest data maximum advertised so far.
    advertised_max: u64,
    /// Cumulative bytes consumed by the application.
    total_consumed: u64,
    /// Cumulative bytes received from the peer.
    total_received: u64,
}

impl DataFlowControl {
    /// Creates a controller whose initial advertised maximum equals
    /// `initial_window`.
    pub(crate) fn new(initial_window: u64) -> Self {
        Self {
            initial_window,
            advertised_max: initial_window,
            total_consumed: 0,
            total_received: 0,
        }
    }

    /// Returns `true` when receiving `bytes` more bytes stays within the
    /// advertised maximum.
    ///
    /// A sum that would overflow `u64` is by definition over the limit and
    /// yields `false` (previously it panicked in debug builds and wrapped in
    /// release builds, which could accept an over-limit length).
    pub(crate) fn check_received(&self, bytes: u64) -> bool {
        matches!(
            self.total_received.checked_add(bytes),
            Some(total) if total <= self.advertised_max
        )
    }

    /// Records `bytes` newly received bytes; the counter saturates at
    /// `u64::MAX`.
    pub(crate) fn on_data_received(&mut self, bytes: u64) {
        self.total_received = self.total_received.saturating_add(bytes);
    }

    /// Records `bytes` consumed bytes and returns `Some(new_max)` when a
    /// larger maximum should be advertised.
    ///
    /// The consumed counter is always updated (saturating). A new maximum is
    /// produced only when the window is non-zero, the remaining credit has
    /// dropped to half the window or less, and the candidate
    /// (consumed + window) exceeds the current maximum.
    pub(crate) fn on_data_consumed(&mut self, bytes: u64) -> Option<u64> {
        self.total_consumed = self.total_consumed.saturating_add(bytes);
        if self.initial_window == 0 {
            return None;
        }
        let remaining = self.advertised_max.saturating_sub(self.total_received);
        let threshold = self.initial_window / 2;
        if remaining > threshold {
            return None;
        }
        let new_max = self.total_consumed.saturating_add(self.initial_window);
        if new_max <= self.advertised_max {
            return None;
        }
        self.advertised_max = new_max;
        Some(new_max)
    }
}
/// Remembers the limit value for which each kind of "blocked" capsule was last
/// queued, so the same capsule is not queued repeatedly for an unchanged limit.
/// Each field is reset to `None` when the matching limit capsule is processed.
#[derive(Debug, Clone, Default)]
struct SendBlockedState {
// Unidirectional stream limit last reported via `Capsule::StreamsBlocked`.
last_streams_blocked_uni: Option<u64>,
// Bidirectional stream limit last reported via `Capsule::StreamsBlocked`.
last_streams_blocked_bidi: Option<u64>,
// Data limit last reported via `Capsule::DataBlocked`.
last_data_blocked: Option<u64>,
}
/// Failure returned by `Session::process_capsule`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CapsuleProcessError {
/// Carries a session `Error`; produced for prohibited capsules and for
/// limit capsules that lower a previously announced maximum.
/// NOTE(review): presumably the caller closes only this session - confirm.
Session(Error),
/// Carries a numeric error code; produced when a `MaxStreams` capsule
/// exceeds `MAX_STREAMS_LIMIT`.
/// NOTE(review): presumably the caller closes the whole connection - confirm.
Connection(u64),
}
/// State for a single session: lifecycle, owned streams, flow-control
/// bookkeeping in both directions, and queues of outgoing capsules and
/// buffered incoming items.
#[derive(Debug)]
pub struct Session {
// Identifier supplied to `Session::new`.
session_id: u64,
// Current lifecycle stage.
state: SessionState,
// Streams registered with this session, keyed by stream id.
streams: HashMap<u64, Stream>,
// Limits announced by the peer; they bound what this side may open/send.
remote_limits: FlowControlLimits,
// Limits this side has announced to the peer.
local_limits: FlowControlLimits,
// Running counters of opened/received streams, bytes and datagrams.
flow_state: FlowControlState,
// When false, receive checks always pass and flow-control capsules are ignored.
flow_control_enabled: bool,
// Capsules queued for sending; drained by `take_pending_capsules`.
pending_capsules: Vec<Capsule>,
// Stream ids captured when the session closed; drained by `take_pending_stream_resets`.
pending_stream_resets: Vec<u64>,
// Error recorded by `close`, if any.
close_error: Option<Error>,
// Incoming streams buffered via `buffer_incoming_stream` (bounded by MAX_BUFFERED_STREAMS).
buffered_streams: Vec<BufferedStream>,
// Datagrams buffered via `buffer_datagram` (bounded by MAX_BUFFERED_DATAGRAMS).
buffered_datagrams: Vec<Vec<u8>>,
// Set by `handle_goaway`.
goaway_received: bool,
// Set when a CloseSession capsule is processed or the CONNECT stream closes.
close_session_received: bool,
// Set by `close_with_error` once a CloseSession capsule has been queued.
close_session_sent: bool,
// Receive-side stream-count accounting for unidirectional streams.
recv_stream_fc_uni: DirectionalStreamFlowControl,
// Receive-side stream-count accounting for bidirectional streams.
recv_stream_fc_bidi: DirectionalStreamFlowControl,
// Receive-side byte accounting.
recv_data_fc: DataFlowControl,
// De-duplication state for outgoing "blocked" capsules.
send_blocked: SendBlockedState,
}
impl Session {
/// Creates a session in the `Pending` state with flow control enabled.
///
/// All limits and counters start at zero and every queue starts empty; the
/// receive-side windows stay at zero until `initialize_local_limits` runs.
pub fn new(session_id: u64) -> Self {
    // Receive-side controllers all start with a zero window.
    let recv_stream_fc_uni = DirectionalStreamFlowControl::new(0);
    let recv_stream_fc_bidi = DirectionalStreamFlowControl::new(0);
    let recv_data_fc = DataFlowControl::new(0);

    Self {
        session_id,
        state: SessionState::default(),
        streams: HashMap::default(),
        remote_limits: FlowControlLimits::default(),
        local_limits: FlowControlLimits::default(),
        flow_state: FlowControlState::default(),
        flow_control_enabled: true,
        pending_capsules: Vec::default(),
        pending_stream_resets: Vec::default(),
        close_error: None,
        buffered_streams: Vec::default(),
        buffered_datagrams: Vec::default(),
        goaway_received: false,
        close_session_received: false,
        close_session_sent: false,
        recv_stream_fc_uni,
        recv_stream_fc_bidi,
        recv_data_fc,
        send_blocked: SendBlockedState::default(),
    }
}
/// Returns the identifier this session was created with.
pub fn session_id(&self) -> u64 {
self.session_id
}
/// Returns the current lifecycle state.
pub fn state(&self) -> SessionState {
self.state
}
/// Returns `true` only in the `Established` state (not while draining).
pub fn is_established(&self) -> bool {
self.state == SessionState::Established
}
/// Returns `true` once the session has reached the `Closed` state.
pub fn is_closed(&self) -> bool {
self.state == SessionState::Closed
}
/// Moves `Pending` to `Connecting`; any other state is left untouched.
pub fn set_connecting(&mut self) {
    if let SessionState::Pending = self.state {
        self.state = SessionState::Connecting;
    }
}

/// Moves `Pending` or `Connecting` to `Established`; any other state is
/// left untouched.
pub fn set_established(&mut self) {
    match self.state {
        SessionState::Pending | SessionState::Connecting => {
            self.state = SessionState::Established;
        }
        SessionState::Established | SessionState::Draining | SessionState::Closed => {}
    }
}

/// Moves `Established` to `Draining`; any other state is left untouched.
pub fn set_draining(&mut self) {
    if matches!(self.state, SessionState::Established) {
        self.state = SessionState::Draining;
    }
}
/// Closes the session and records why.
///
/// On the first call the ids of all currently registered streams are
/// captured for `take_pending_stream_resets`, the state becomes `Closed`,
/// and `error` is stored as the close reason.
///
/// The call is idempotent: once the session is closed, later calls are
/// ignored so the original close reason (including `None`) is never
/// overwritten. Previously a late second close - for example the CONNECT
/// stream closing after a local `close_with_error` - silently replaced the
/// recorded error.
pub fn close(&mut self, error: Option<Error>) {
    if self.state == SessionState::Closed {
        return;
    }
    self.pending_stream_resets = self.streams.keys().copied().collect();
    self.state = SessionState::Closed;
    self.close_error = error;
}
/// Returns the error recorded by `close`, if any.
pub fn close_error(&self) -> Option<&Error> {
self.close_error.as_ref()
}
/// Registers `stream`, discarding the result of `try_add_stream`; on a
/// closed session this is therefore a silent no-op.
pub fn add_stream(&mut self, stream: Stream) {
let _ = self.try_add_stream(stream);
}
/// Registers `stream` under its own stream id.
///
/// Returns `false` without inserting when the session is closed, `true`
/// otherwise. An existing entry with the same id is replaced.
pub fn try_add_stream(&mut self, stream: Stream) -> bool {
if self.is_closed() {
return false;
}
self.streams.insert(stream.stream_id(), stream);
true
}
/// Looks up a registered stream by id.
pub fn get_stream(&self, stream_id: u64) -> Option<&Stream> {
self.streams.get(&stream_id)
}
/// Looks up a registered stream by id for mutation.
pub fn get_stream_mut(&mut self, stream_id: u64) -> Option<&mut Stream> {
self.streams.get_mut(&stream_id)
}
/// Unregisters and returns the stream with the given id, if present.
pub fn remove_stream(&mut self, stream_id: u64) -> Option<Stream> {
self.streams.remove(&stream_id)
}
/// Returns the number of registered streams.
pub fn stream_count(&self) -> usize {
self.streams.len()
}
/// Iterates over all registered streams in unspecified (hash map) order.
pub fn streams(&self) -> impl Iterator<Item = &Stream> {
self.streams.values()
}
/// Returns the limits announced by the peer.
pub fn remote_limits(&self) -> &FlowControlLimits {
&self.remote_limits
}
/// Returns the peer-announced limits for direct mutation.
pub fn remote_limits_mut(&mut self) -> &mut FlowControlLimits {
&mut self.remote_limits
}
/// Returns the limits this side has announced.
pub fn local_limits(&self) -> &FlowControlLimits {
&self.local_limits
}
/// Returns the locally announced limits for direct mutation.
/// Note that this does not touch the receive-side flow controllers; use
/// `initialize_local_limits` to reset those as well.
pub fn local_limits_mut(&mut self) -> &mut FlowControlLimits {
&mut self.local_limits
}
/// Returns the running send/receive counters.
pub fn flow_state(&self) -> &FlowControlState {
&self.flow_state
}
/// Returns the running send/receive counters for direct mutation.
pub fn flow_state_mut(&mut self) -> &mut FlowControlState {
&mut self.flow_state
}
/// Turns flow-control enforcement and capsule handling on or off.
pub fn set_flow_control_enabled(&mut self, enabled: bool) {
self.flow_control_enabled = enabled;
}
/// Returns whether flow control is currently enabled.
pub fn is_flow_control_enabled(&self) -> bool {
self.flow_control_enabled
}
pub fn initialize_local_limits(&mut self, limits: FlowControlLimits) {
self.local_limits = limits;
self.recv_stream_fc_uni = DirectionalStreamFlowControl::new(limits.max_streams_uni);
self.recv_stream_fc_bidi = DirectionalStreamFlowControl::new(limits.max_streams_bidi);
self.recv_data_fc = DataFlowControl::new(limits.max_data);
}
/// Installs `limits` via `initialize_local_limits` and queues the capsules
/// announcing them.
///
/// Capsules are queued in the order bidirectional `MaxStreams`,
/// unidirectional `MaxStreams`, `MaxData`; a limit of zero queues nothing.
pub fn queue_initial_flow_control_capsules(&mut self, limits: FlowControlLimits) {
    self.initialize_local_limits(limits);

    let stream_limits = [
        (true, limits.max_streams_bidi),
        (false, limits.max_streams_uni),
    ];
    for &(bidirectional, maximum) in &stream_limits {
        if maximum == 0 {
            continue;
        }
        self.pending_capsules.push(Capsule::MaxStreams {
            bidirectional,
            maximum,
        });
    }

    let maximum = limits.max_data;
    if maximum != 0 {
        self.pending_capsules.push(Capsule::MaxData { maximum });
    }
}
/// Returns `true` when the state allows stream creation and fewer
/// unidirectional streams have been opened than the peer's limit permits.
/// Pure query: no counter is changed and no capsule is queued.
pub fn can_create_unidirectional_stream(&self) -> bool {
self.state.can_create_stream()
&& self.flow_state.streams_uni_opened < self.remote_limits.max_streams_uni
}
/// Returns `true` when the state allows stream creation and fewer
/// bidirectional streams have been opened than the peer's limit permits.
/// Pure query: no counter is changed and no capsule is queued.
pub fn can_create_bidirectional_stream(&self) -> bool {
self.state.can_create_stream()
&& self.flow_state.streams_bidi_opened < self.remote_limits.max_streams_bidi
}
/// Returns `true` when the state allows sending and `bytes` more bytes fit
/// within the peer's data limit.
///
/// Pure query: no counter is changed. A total that would overflow `u64`
/// is reported as not sendable (previously the addition panicked in debug
/// builds and wrapped in release builds, which could report `true`).
pub fn can_send_data(&self, bytes: u64) -> bool {
    self.state.can_send()
        && matches!(
            self.flow_state.data_sent.checked_add(bytes),
            Some(total) if total <= self.remote_limits.max_data
        )
}
/// Tries to account for opening one new local stream of the given kind.
///
/// Returns `true` and bumps the opened counter when the state allows
/// stream creation and the peer's limit has room. Otherwise returns
/// `false`; if the failure was due to the limit and flow control is
/// enabled, a `StreamsBlocked` capsule is queued once per distinct limit.
pub fn try_open_stream(&mut self, bidirectional: bool) -> bool {
    if !self.state.can_create_stream() {
        return false;
    }

    let maximum = if bidirectional {
        self.remote_limits.max_streams_bidi
    } else {
        self.remote_limits.max_streams_uni
    };
    let opened = if bidirectional {
        &mut self.flow_state.streams_bidi_opened
    } else {
        &mut self.flow_state.streams_uni_opened
    };
    if *opened < maximum {
        *opened += 1;
        return true;
    }

    if !self.flow_control_enabled {
        return false;
    }

    // Only report being blocked once for any given limit value.
    let last_blocked = if bidirectional {
        &mut self.send_blocked.last_streams_blocked_bidi
    } else {
        &mut self.send_blocked.last_streams_blocked_uni
    };
    if *last_blocked == Some(maximum) {
        return false;
    }
    *last_blocked = Some(maximum);
    self.pending_capsules.push(Capsule::StreamsBlocked {
        bidirectional,
        maximum,
    });
    false
}
/// Tries to account for sending `bytes` more bytes.
///
/// Returns `true` and advances the sent counter when the state allows
/// sending and the new total fits within the peer's data limit. Otherwise
/// returns `false`; if the failure was due to the limit and flow control
/// is enabled, a `DataBlocked` capsule is queued once per distinct limit.
///
/// A total that would overflow `u64` is treated as exceeding the limit
/// (previously the addition panicked in debug builds and wrapped in
/// release builds, which could wrongly accept the send).
pub fn try_send_data(&mut self, bytes: u64) -> bool {
    if !self.state.can_send() {
        return false;
    }

    let limit = self.remote_limits.max_data;
    match self.flow_state.data_sent.checked_add(bytes) {
        Some(total) if total <= limit => {
            self.flow_state.data_sent = total;
            return true;
        }
        // Over the limit, or the total does not even fit in a u64.
        _ => {}
    }

    // Only report being blocked once for any given limit value.
    if self.flow_control_enabled && self.send_blocked.last_data_blocked != Some(limit) {
        self.send_blocked.last_data_blocked = Some(limit);
        self.pending_capsules
            .push(Capsule::DataBlocked { maximum: limit });
    }
    false
}
/// Returns `true` when receiving `bytes` more bytes is acceptable: always
/// when flow control is disabled, otherwise when it fits the receive
/// window.
pub fn check_received_data(&self, bytes: u64) -> bool {
    !self.flow_control_enabled || self.recv_data_fc.check_received(bytes)
}

/// Records `bytes` received bytes in both the session counters and the
/// receive-side flow controller. No limit check is performed here.
pub fn add_received_data(&mut self, bytes: u64) {
    self.flow_state.data_received += bytes;
    self.recv_data_fc.on_data_received(bytes);
}

/// Returns `true` when one more incoming stream of the given kind is
/// acceptable: always when flow control is disabled, otherwise when it
/// fits under the advertised stream maximum.
pub fn check_received_stream(&self, bidirectional: bool) -> bool {
    if !self.flow_control_enabled {
        return true;
    }
    let controller = if bidirectional {
        &self.recv_stream_fc_bidi
    } else {
        &self.recv_stream_fc_uni
    };
    controller.check_received()
}

/// Records one received stream of the given kind in both the session
/// counters and the matching flow controller. No limit check is performed
/// here.
pub fn add_received_stream(&mut self, bidirectional: bool) {
    let (counter, controller) = if bidirectional {
        (
            &mut self.flow_state.streams_bidi_received,
            &mut self.recv_stream_fc_bidi,
        )
    } else {
        (
            &mut self.flow_state.streams_uni_received,
            &mut self.recv_stream_fc_uni,
        )
    };
    *counter += 1;
    controller.on_stream_received();
}
/// Notifies receive-side stream flow control that a peer stream of the
/// given kind has closed.
///
/// Does nothing when the session is closed or flow control is disabled.
/// When the controller decides a larger maximum is due, a `MaxStreams`
/// capsule is queued and the matching local limit is updated.
pub fn on_remote_stream_closed(&mut self, bidirectional: bool) {
    if self.is_closed() || !self.flow_control_enabled {
        return;
    }

    let (controller, local_max) = if bidirectional {
        (
            &mut self.recv_stream_fc_bidi,
            &mut self.local_limits.max_streams_bidi,
        )
    } else {
        (
            &mut self.recv_stream_fc_uni,
            &mut self.local_limits.max_streams_uni,
        )
    };

    if let Some(maximum) = controller.on_stream_closed() {
        *local_max = maximum;
        self.pending_capsules.push(Capsule::MaxStreams {
            bidirectional,
            maximum,
        });
    }
}

/// Notifies receive-side data flow control that the application consumed
/// `bytes` bytes.
///
/// Does nothing when the session is closed or flow control is disabled.
/// When the controller decides a larger maximum is due, a `MaxData`
/// capsule is queued and the local data limit is updated.
pub fn on_data_consumed(&mut self, bytes: u64) {
    if self.is_closed() || !self.flow_control_enabled {
        return;
    }

    if let Some(maximum) = self.recv_data_fc.on_data_consumed(bytes) {
        self.local_limits.max_data = maximum;
        self.pending_capsules.push(Capsule::MaxData { maximum });
    }
}
/// Increments the received-datagram counter by one.
pub fn add_received_datagram(&mut self) {
self.flow_state.datagrams_received += 1;
}
/// Appends `capsule` to the outgoing queue; silently dropped when the
/// session is already closed.
pub fn queue_capsule(&mut self, capsule: Capsule) {
if self.is_closed() {
return;
}
self.pending_capsules.push(capsule);
}
/// Returns the queued outgoing capsules without removing them.
pub fn pending_capsules(&self) -> &[Capsule] {
&self.pending_capsules
}
/// Discards all queued outgoing capsules.
pub fn clear_pending_capsules(&mut self) {
self.pending_capsules.clear();
}
/// Removes and returns all queued outgoing capsules, leaving the queue empty.
pub fn take_pending_capsules(&mut self) -> Vec<Capsule> {
std::mem::take(&mut self.pending_capsules)
}
pub fn close_with_error(&mut self, code: u32, message: impl Into<String>) {
if self.is_closed() {
return;
}
let mut message = message.into();
if message.len() > 1024 {
let mut end = 1024;
while end > 0 && !message.is_char_boundary(end) {
end -= 1;
}
message.truncate(end);
}
let application_error = Error::application(code, message.clone());
let capsule = Capsule::CloseSession {
error_code: code,
message,
};
self.queue_capsule(capsule);
self.close_session_sent = true;
self.close(Some(application_error));
}
/// Starts draining an `Established` session: queues a `DrainSession`
/// capsule and switches to `Draining`. Any other state is left untouched.
pub fn drain(&mut self) {
    if self.state != SessionState::Established {
        return;
    }
    self.queue_capsule(Capsule::DrainSession);
    self.set_draining();
}

/// Buffers an incoming stream. Returns `false` (buffering nothing) when
/// `MAX_BUFFERED_STREAMS` entries are already held.
pub fn buffer_incoming_stream(&mut self, stream_id: u64, is_bidirectional: bool) -> bool {
    let has_room = self.buffered_streams.len() < MAX_BUFFERED_STREAMS;
    if has_room {
        let entry = BufferedStream {
            stream_id,
            is_bidirectional,
        };
        self.buffered_streams.push(entry);
    }
    has_room
}

/// Removes and returns all buffered streams in arrival order.
pub fn take_buffered_streams(&mut self) -> Vec<BufferedStream> {
    let buffered = &mut self.buffered_streams;
    std::mem::take(buffered)
}

/// Buffers a datagram. Returns `false` (dropping `data`) when
/// `MAX_BUFFERED_DATAGRAMS` entries are already held.
pub fn buffer_datagram(&mut self, data: Vec<u8>) -> bool {
    let has_room = self.buffered_datagrams.len() < MAX_BUFFERED_DATAGRAMS;
    if has_room {
        self.buffered_datagrams.push(data);
    }
    has_room
}

/// Removes and returns all buffered datagrams in arrival order.
pub fn take_buffered_datagrams(&mut self) -> Vec<Vec<u8>> {
    let buffered = &mut self.buffered_datagrams;
    std::mem::take(buffered)
}
/// Records that a GOAWAY was received and attempts the transition to
/// `Draining` (which only takes effect from `Established`).
pub fn handle_goaway(&mut self) {
self.goaway_received = true;
self.set_draining();
}
/// Returns whether `handle_goaway` has been called.
pub fn is_goaway_received(&self) -> bool {
self.goaway_received
}
/// Returns whether the peer side of the close has been observed (a
/// `CloseSession` capsule was processed or the CONNECT stream closed).
pub fn is_close_session_received(&self) -> bool {
self.close_session_received
}
/// Returns whether `close_with_error` queued a `CloseSession` capsule.
pub fn is_close_session_sent(&self) -> bool {
self.close_session_sent
}
/// Returns the ids of all currently registered streams, in unspecified order.
pub fn stream_ids_to_reset(&self) -> Vec<u64> {
self.streams.keys().copied().collect()
}
/// Removes and returns the stream ids captured when the session closed.
pub fn take_pending_stream_resets(&mut self) -> Vec<u64> {
std::mem::take(&mut self.pending_stream_resets)
}
pub fn on_connect_stream_closed(&mut self) {
if !self.close_session_received {
self.close_session_received = true;
self.close(Some(Error::application(0, "")));
}
}
pub fn process_capsule(&mut self, capsule: &Capsule) -> Result<(), CapsuleProcessError> {
if capsule.is_prohibited_in_http3() {
return Err(CapsuleProcessError::Session(Error::Protocol(
ErrorCode::FlowControlError,
)));
}
if !self.flow_control_enabled && capsule.is_flow_control() {
return Ok(());
}
match capsule {
Capsule::CloseSession {
error_code,
message,
} => {
self.close_session_received = true;
self.close(Some(Error::application(*error_code, message.clone())));
}
Capsule::DrainSession => {
self.set_draining();
}
Capsule::MaxData { maximum } => {
if *maximum < self.remote_limits.max_data {
return Err(CapsuleProcessError::Session(Error::Protocol(
ErrorCode::FlowControlError,
)));
}
self.remote_limits.max_data = *maximum;
self.send_blocked.last_data_blocked = None;
}
Capsule::MaxStreams {
bidirectional,
maximum,
} => {
if *maximum > MAX_STREAMS_LIMIT {
return Err(CapsuleProcessError::Connection(
crate::webtransport::capsule::H3_DATAGRAM_ERROR,
));
}
if *bidirectional {
if *maximum < self.remote_limits.max_streams_bidi {
return Err(CapsuleProcessError::Session(Error::Protocol(
ErrorCode::FlowControlError,
)));
}
self.remote_limits.max_streams_bidi = *maximum;
self.send_blocked.last_streams_blocked_bidi = None;
} else {
if *maximum < self.remote_limits.max_streams_uni {
return Err(CapsuleProcessError::Session(Error::Protocol(
ErrorCode::FlowControlError,
)));
}
self.remote_limits.max_streams_uni = *maximum;
self.send_blocked.last_streams_blocked_uni = None;
}
}
Capsule::DataBlocked { .. } | Capsule::StreamsBlocked { .. } => {
}
Capsule::Unknown { .. } => {
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
// A fresh session keeps its id, starts Pending, and is neither established nor closed.
#[test]
fn test_session_creation() {
let session = Session::new(0);
assert_eq!(session.session_id(), 0);
assert_eq!(session.state(), SessionState::Pending);
assert!(!session.is_established());
assert!(!session.is_closed());
}
// Walks the full lifecycle: Pending -> Connecting -> Established -> Draining -> Closed.
#[test]
fn test_session_state_transitions() {
let mut session = Session::new(0);
session.set_connecting();
assert_eq!(session.state(), SessionState::Connecting);
session.set_established();
assert_eq!(session.state(), SessionState::Established);
assert!(session.is_established());
session.set_draining();
assert_eq!(session.state(), SessionState::Draining);
session.close(None);
assert_eq!(session.state(), SessionState::Closed);
assert!(session.is_closed());
}
// Only Established and Draining allow stream creation.
#[test]
fn test_session_state_can_create_stream() {
assert!(!SessionState::Pending.can_create_stream());
assert!(!SessionState::Connecting.can_create_stream());
assert!(SessionState::Established.can_create_stream());
assert!(SessionState::Draining.can_create_stream());
assert!(!SessionState::Closed.can_create_stream());
}
// Streams can be added, looked up and removed by id.
#[test]
fn test_session_stream_management() {
let mut session = Session::new(0);
let stream = Stream::new(4, 0, true);
session.add_stream(stream);
assert_eq!(session.stream_count(), 1);
assert!(session.get_stream(4).is_some());
let removed = session.remove_stream(4);
assert!(removed.is_some());
assert_eq!(session.stream_count(), 0);
}
// Send-side queries honour the peer limits; the data limit is inclusive.
#[test]
fn test_session_flow_control() {
let mut session = Session::new(0);
session.set_established();
session.remote_limits_mut().max_streams_uni = 10;
session.remote_limits_mut().max_streams_bidi = 5;
session.remote_limits_mut().max_data = 1024;
assert!(session.can_create_unidirectional_stream());
assert!(session.can_create_bidirectional_stream());
assert!(session.can_send_data(512));
assert!(session.can_send_data(1024));
assert!(!session.can_send_data(1025));
}
// Queued capsules are visible until taken, after which the queue is empty.
#[test]
fn test_session_capsule_queue() {
let mut session = Session::new(0);
session.queue_capsule(Capsule::DrainSession);
assert_eq!(session.pending_capsules().len(), 1);
let capsules = session.take_pending_capsules();
assert_eq!(capsules.len(), 1);
assert!(session.pending_capsules().is_empty());
}
// MaxData raises the peer data limit; a lower value afterwards is rejected.
#[test]
fn test_session_process_max_data() {
let mut session = Session::new(0);
session.set_flow_control_enabled(true);
session
.process_capsule(&Capsule::MaxData { maximum: 1000 })
.unwrap();
assert_eq!(session.remote_limits().max_data, 1000);
let result = session.process_capsule(&Capsule::MaxData { maximum: 500 });
assert!(result.is_err());
}
// MaxStreams updates the bidirectional and unidirectional limits independently.
#[test]
fn test_session_process_max_streams() {
let mut session = Session::new(0);
session.set_flow_control_enabled(true);
session
.process_capsule(&Capsule::MaxStreams {
bidirectional: true,
maximum: 100,
})
.unwrap();
assert_eq!(session.remote_limits().max_streams_bidi, 100);
session
.process_capsule(&Capsule::MaxStreams {
bidirectional: false,
maximum: 50,
})
.unwrap();
assert_eq!(session.remote_limits().max_streams_uni, 50);
}
// A local close queues exactly one capsule and closes the session.
#[test]
fn test_session_close_with_error() {
let mut session = Session::new(0);
session.set_established();
session.close_with_error(42, "test error");
assert!(session.is_closed());
assert_eq!(session.pending_capsules().len(), 1);
}
// close_with_error on an already (peer-)closed session queues nothing and
// does not mark a CloseSession as sent.
#[test]
fn test_session_close_with_error_on_closed_session() {
let mut session = Session::new(0);
session.set_established();
session
.process_capsule(&Capsule::CloseSession {
error_code: 1,
message: "peer closed".to_string(),
})
.expect("CloseSession capsule should be accepted");
assert!(session.is_closed());
assert!(!session.is_close_session_sent());
session.close_with_error(42, "local close");
assert!(!session.is_close_session_sent());
assert!(session.pending_capsules().is_empty());
}
// With flow control disabled, a MaxData capsule is accepted but has no effect.
#[test]
fn test_session_process_flow_control_capsule_ignored_when_disabled() {
let mut session = Session::new(0);
session.set_flow_control_enabled(false);
session
.process_capsule(&Capsule::MaxData { maximum: 1000 })
.unwrap();
assert_eq!(session.remote_limits().max_data, 0);
}
// An Unknown capsule with type 0x190B4D3E is expected to be classified as
// prohibited and rejected with a FlowControlError session error.
#[test]
fn test_session_process_prohibited_capsule_returns_error() {
let mut session = Session::new(0);
let result = session.process_capsule(&Capsule::Unknown {
capsule_type: 0x190B4D3E,
payload: vec![],
});
assert_eq!(
result,
Err(CapsuleProcessError::Session(Error::Protocol(
ErrorCode::FlowControlError
)))
);
}
// drain() on an established session switches to Draining and queues one capsule.
#[test]
fn test_session_drain() {
let mut session = Session::new(0);
session.set_established();
session.drain();
assert_eq!(session.state(), SessionState::Draining);
assert_eq!(session.pending_capsules().len(), 1);
}
// Buffered streams come back in insertion order and the buffer is emptied by take.
#[test]
fn test_session_buffer_incoming_stream() {
let mut session = Session::new(0);
assert!(session.buffer_incoming_stream(4, true));
assert!(session.buffer_incoming_stream(8, false));
let streams = session.take_buffered_streams();
assert_eq!(streams.len(), 2);
assert_eq!(streams[0].stream_id, 4);
assert!(streams[0].is_bidirectional);
assert_eq!(streams[1].stream_id, 8);
assert!(!streams[1].is_bidirectional);
let streams = session.take_buffered_streams();
assert!(streams.is_empty());
}
// The stream buffer accepts exactly MAX_BUFFERED_STREAMS entries, then rejects.
#[test]
fn test_session_buffer_stream_limit() {
let mut session = Session::new(0);
for i in 0..MAX_BUFFERED_STREAMS {
assert!(session.buffer_incoming_stream(i as u64 * 4, false));
}
assert!(!session.buffer_incoming_stream(99999, false));
}
// Buffered datagrams come back in insertion order and the buffer is emptied by take.
#[test]
fn test_session_buffer_datagram() {
let mut session = Session::new(0);
assert!(session.buffer_datagram(vec![1, 2, 3]));
assert!(session.buffer_datagram(vec![4, 5, 6]));
let datagrams = session.take_buffered_datagrams();
assert_eq!(datagrams.len(), 2);
assert_eq!(datagrams[0], vec![1, 2, 3]);
assert_eq!(datagrams[1], vec![4, 5, 6]);
assert!(session.take_buffered_datagrams().is_empty());
}
// The datagram buffer accepts exactly MAX_BUFFERED_DATAGRAMS entries, then rejects.
#[test]
fn test_session_buffer_datagram_limit() {
let mut session = Session::new(0);
for _ in 0..MAX_BUFFERED_DATAGRAMS {
assert!(session.buffer_datagram(vec![0]));
}
assert!(!session.buffer_datagram(vec![0xff]));
}
// GOAWAY sets the flag and moves an established session to Draining.
#[test]
fn test_session_handle_goaway() {
let mut session = Session::new(0);
session.set_established();
assert!(!session.is_goaway_received());
session.handle_goaway();
assert!(session.is_goaway_received());
assert_eq!(session.state(), SessionState::Draining);
}
// A local close marks CloseSession as sent but not as received.
#[test]
fn test_session_close_session_tracking() {
let mut session = Session::new(0);
session.set_established();
assert!(!session.is_close_session_sent());
assert!(!session.is_close_session_received());
session.close_with_error(42, "bye");
assert!(session.is_close_session_sent());
assert!(!session.is_close_session_received());
}
// A CloseSession capsule from the peer marks the close as received and closes the session.
#[test]
fn test_session_process_close_session_capsule() {
let mut session = Session::new(0);
session.set_established();
assert!(!session.is_close_session_received());
session
.process_capsule(&Capsule::CloseSession {
error_code: 0,
message: String::new(),
})
.unwrap();
assert!(session.is_close_session_received());
assert!(session.is_closed());
}
// The CONNECT stream closing is treated like a received close.
#[test]
fn test_session_on_connect_stream_closed() {
let mut session = Session::new(0);
session.set_established();
assert!(!session.is_close_session_received());
session.on_connect_stream_closed();
assert!(session.is_close_session_received());
assert!(session.is_closed());
}
// After a CloseSession capsule, the CONNECT stream closing changes nothing.
// NOTE(review): only the flag is asserted; the stored close_error is not checked.
#[test]
fn test_session_on_connect_stream_closed_idempotent() {
let mut session = Session::new(0);
session.set_established();
session
.process_capsule(&Capsule::CloseSession {
error_code: 1,
message: "error".to_string(),
})
.unwrap();
assert!(session.is_close_session_received());
session.on_connect_stream_closed();
assert!(session.is_close_session_received());
}
// stream_ids_to_reset lists every registered stream (order is unspecified, hence the sort).
#[test]
fn test_session_stream_ids_to_reset() {
let mut session = Session::new(0);
session.set_established();
session.add_stream(Stream::new(4, 0, true));
session.add_stream(Stream::new(8, 0, false));
let mut ids = session.stream_ids_to_reset();
ids.sort();
assert_eq!(ids, vec![4, 8]);
}
// Closing captures the registered stream ids once; a second take yields nothing.
#[test]
fn test_session_take_pending_stream_resets_on_close() {
let mut session = Session::new(0);
session.set_established();
session.add_stream(Stream::new(4, 0, true));
session.add_stream(Stream::new(8, 0, false));
session.close(None);
let mut ids = session.take_pending_stream_resets();
ids.sort();
assert_eq!(ids, vec![4, 8]);
assert!(session.take_pending_stream_resets().is_empty());
}
// A closed session refuses new streams.
#[test]
fn test_session_try_add_stream_rejects_after_close() {
let mut session = Session::new(0);
session.close(None);
assert!(!session.try_add_stream(Stream::new(4, 0, true)));
assert_eq!(session.stream_count(), 0);
}
// A MaxStreams value of 2^60 + 1 is reported as a connection-level error.
#[test]
fn test_process_capsule_max_streams_exceeds_limit() {
let mut session = Session::new(0);
session.set_established();
let result = session.process_capsule(&Capsule::MaxStreams {
bidirectional: true,
maximum: (1u64 << 60) + 1,
});
assert!(matches!(result, Err(CapsuleProcessError::Connection(_))));
}
// A MaxStreams value of exactly 2^60 is accepted.
#[test]
fn test_process_capsule_max_streams_at_limit() {
let mut session = Session::new(0);
session.set_established();
let result = session.process_capsule(&Capsule::MaxStreams {
bidirectional: true,
maximum: 1u64 << 60,
});
assert!(result.is_ok());
}
// Lowering a previously announced stream limit is a session-level error.
#[test]
fn test_process_capsule_max_streams_decreased() {
let mut session = Session::new(0);
session.set_established();
session
.process_capsule(&Capsule::MaxStreams {
bidirectional: false,
maximum: 10,
})
.unwrap();
let result = session.process_capsule(&Capsule::MaxStreams {
bidirectional: false,
maximum: 5,
});
assert!(matches!(result, Err(CapsuleProcessError::Session(_))));
}
// The receive data window is inclusive: 50 + 50 fits in 100, 50 + 51 does not.
#[test]
fn test_check_received_data() {
let mut session = Session::new(0);
session.set_established();
session.initialize_local_limits(FlowControlLimits {
max_data: 100,
..FlowControlLimits::default()
});
assert!(session.check_received_data(50));
session.add_received_data(50);
assert!(session.check_received_data(50));
assert!(!session.check_received_data(51));
}
// With a limit of 2, the third incoming unidirectional stream is refused.
#[test]
fn test_check_received_stream() {
let mut session = Session::new(0);
session.set_established();
session.initialize_local_limits(FlowControlLimits {
max_streams_uni: 2,
..FlowControlLimits::default()
});
assert!(session.check_received_stream(false));
session.add_received_stream(false);
assert!(session.check_received_stream(false));
session.add_received_stream(false);
assert!(!session.check_received_stream(false));
}
// Each add_received_datagram call bumps the counter by one.
#[test]
fn test_received_datagram_counter() {
let mut session = Session::new(0);
assert_eq!(session.flow_state().datagrams_received, 0);
session.add_received_datagram();
session.add_received_datagram();
assert_eq!(session.flow_state().datagrams_received, 2);
}
// initialize_local_limits stores the limits and opens the receive stream windows.
#[test]
fn test_initialize_local_limits() {
let mut session = Session::new(0);
session.initialize_local_limits(FlowControlLimits {
max_streams_uni: 100,
max_streams_bidi: 50,
max_data: 1024,
});
assert_eq!(session.local_limits().max_streams_uni, 100);
assert_eq!(session.local_limits().max_streams_bidi, 50);
assert_eq!(session.local_limits().max_data, 1024);
assert!(session.check_received_stream(false));
assert!(session.check_received_stream(true));
}
// Once the peer has used its whole stream window, closing streams queues
// MaxStreams capsules with a maximum above the initial limit.
#[test]
fn test_on_remote_stream_closed_generates_max_streams() {
let mut session = Session::new(0);
session.set_established();
session.initialize_local_limits(FlowControlLimits {
max_streams_uni: 4,
..FlowControlLimits::default()
});
for _ in 0..4 {
session.add_received_stream(false);
}
session.on_remote_stream_closed(false);
session.on_remote_stream_closed(false);
session.on_remote_stream_closed(false);
let capsules = session.take_pending_capsules();
let max_streams_capsules: Vec<_> = capsules
.iter()
.filter(|c| {
matches!(
c,
Capsule::MaxStreams {
bidirectional: false,
..
}
)
})
.collect();
assert!(
!max_streams_capsules.is_empty(),
"WT_MAX_STREAMS capsule should be generated"
);
for c in &max_streams_capsules {
if let Capsule::MaxStreams { maximum, .. } = c {
assert!(*maximum > 4, "new max should be greater than initial");
}
}
}
// With flow control disabled, stream closes queue no capsules.
#[test]
fn test_on_remote_stream_closed_no_op_when_disabled() {
let mut session = Session::new(0);
session.set_established();
session.set_flow_control_enabled(false);
session.initialize_local_limits(FlowControlLimits {
max_streams_uni: 4,
..FlowControlLimits::default()
});
for _ in 0..4 {
session.add_received_stream(false);
}
for _ in 0..4 {
session.on_remote_stream_closed(false);
}
assert!(session.take_pending_capsules().is_empty());
}
// On a closed session, stream closes queue no capsules.
#[test]
fn test_on_remote_stream_closed_no_op_when_closed() {
let mut session = Session::new(0);
session.set_established();
session.initialize_local_limits(FlowControlLimits {
max_streams_uni: 4,
..FlowControlLimits::default()
});
session.close(None);
for _ in 0..4 {
session.on_remote_stream_closed(false);
}
assert!(session.take_pending_capsules().is_empty());
}
// Exactly `max_streams_uni` streams can be opened; the next attempt fails
// without changing the counter.
#[test]
fn test_try_open_stream_success() {
let mut session = Session::new(0);
session.set_established();
session.remote_limits_mut().max_streams_uni = 3;
assert!(session.try_open_stream(false));
assert!(session.try_open_stream(false));
assert!(session.try_open_stream(false));
assert!(!session.try_open_stream(false));
assert_eq!(session.flow_state().streams_uni_opened, 3);
}
// Repeated blocked attempts at the same limit queue only one StreamsBlocked capsule.
#[test]
fn test_try_open_stream_blocked_dedup() {
let mut session = Session::new(0);
session.set_established();
session.remote_limits_mut().max_streams_uni = 0;
assert!(!session.try_open_stream(false));
assert!(!session.try_open_stream(false));
let capsules = session.take_pending_capsules();
let blocked_count = capsules
.iter()
.filter(|c| matches!(c, Capsule::StreamsBlocked { .. }))
.count();
assert_eq!(
blocked_count, 1,
"STREAMS_BLOCKED should be sent only once per maximum"
);
}
// After the peer raises the limit, hitting the new limit queues a second
// StreamsBlocked capsule.
#[test]
fn test_try_open_stream_blocked_reset_after_max_streams() {
let mut session = Session::new(0);
session.set_established();
session.remote_limits_mut().max_streams_bidi = 1;
assert!(session.try_open_stream(true));
assert!(!session.try_open_stream(true));
session
.process_capsule(&Capsule::MaxStreams {
bidirectional: true,
maximum: 2,
})
.unwrap();
assert!(session.try_open_stream(true));
assert!(!session.try_open_stream(true));
let capsules = session.take_pending_capsules();
let blocked_count = capsules
.iter()
.filter(|c| matches!(c, Capsule::StreamsBlocked { .. }))
.count();
assert_eq!(
blocked_count, 2,
"STREAMS_BLOCKED should be sent again after new MAX_STREAMS"
);
}
// Data can be sent up to and including the limit; one more byte is refused.
#[test]
fn test_try_send_data_success() {
let mut session = Session::new(0);
session.set_established();
session.remote_limits_mut().max_data = 100;
assert!(session.try_send_data(50));
assert!(session.try_send_data(50));
assert!(!session.try_send_data(1));
assert_eq!(session.flow_state().data_sent, 100);
}
// Repeated blocked sends at the same limit queue only one DataBlocked capsule.
#[test]
fn test_try_send_data_blocked_dedup() {
let mut session = Session::new(0);
session.set_established();
session.remote_limits_mut().max_data = 0;
assert!(!session.try_send_data(1));
assert!(!session.try_send_data(1));
let capsules = session.take_pending_capsules();
let blocked_count = capsules
.iter()
.filter(|c| matches!(c, Capsule::DataBlocked { .. }))
.count();
assert_eq!(
blocked_count, 1,
"DATA_BLOCKED should be sent only once per maximum"
);
}
// Consuming data after the window is exhausted queues a MaxData capsule
// with a maximum above the initial window.
#[test]
fn test_on_data_consumed_generates_max_data() {
let mut session = Session::new(0);
session.set_established();
session.initialize_local_limits(FlowControlLimits {
max_data: 100,
..FlowControlLimits::default()
});
session.add_received_data(100);
session.on_data_consumed(80);
let capsules = session.take_pending_capsules();
let max_data_capsules: Vec<_> = capsules
.iter()
.filter(|c| matches!(c, Capsule::MaxData { .. }))
.collect();
assert!(
!max_data_capsules.is_empty(),
"WT_MAX_DATA capsule should be generated"
);
for c in &max_data_capsules {
if let Capsule::MaxData { maximum } = c {
assert!(
*maximum > 100,
"new max_data should be greater than initial"
);
}
}
}
// With flow control disabled, consuming data queues no capsules.
#[test]
fn test_on_data_consumed_no_op_when_disabled() {
let mut session = Session::new(0);
session.set_established();
session.set_flow_control_enabled(false);
session.initialize_local_limits(FlowControlLimits {
max_data: 100,
..FlowControlLimits::default()
});
session.add_received_data(100);
session.on_data_consumed(80);
assert!(session.take_pending_capsules().is_empty());
}
// Any MaxStreams capsule that is queued must not exceed MAX_STREAMS_LIMIT.
// NOTE(review): with the limit at MAX_STREAMS_LIMIT and only 10 streams
// received, the remaining credit stays far above half the window, so no
// capsule appears to be queued and the loop below asserts nothing - confirm
// and consider a scenario that actually reaches the cap.
#[test]
fn test_advertised_max_does_not_exceed_limit() {
let mut session = Session::new(0);
session.set_established();
session.initialize_local_limits(FlowControlLimits {
max_streams_uni: MAX_STREAMS_LIMIT,
..FlowControlLimits::default()
});
for _ in 0..10 {
session.add_received_stream(false);
}
for _ in 0..10 {
session.on_remote_stream_closed(false);
}
for capsule in session.take_pending_capsules() {
if let Capsule::MaxStreams { maximum, .. } = capsule {
assert!(maximum <= MAX_STREAMS_LIMIT);
}
}
}
// Initial limits are stored locally and announced in the order
// bidirectional MaxStreams, unidirectional MaxStreams, MaxData.
#[test]
fn test_queue_initial_flow_control_capsules() {
let mut session = Session::new(0);
session.set_established();
let limits = FlowControlLimits {
max_streams_bidi: 100,
max_streams_uni: 50,
max_data: 8 * 1024 * 1024,
};
session.queue_initial_flow_control_capsules(limits);
assert_eq!(session.local_limits().max_streams_bidi, 100);
assert_eq!(session.local_limits().max_streams_uni, 50);
assert_eq!(session.local_limits().max_data, 8 * 1024 * 1024);
let capsules = session.take_pending_capsules();
assert_eq!(capsules.len(), 3);
assert_eq!(
capsules[0],
Capsule::MaxStreams {
bidirectional: true,
maximum: 100
}
);
assert_eq!(
capsules[1],
Capsule::MaxStreams {
bidirectional: false,
maximum: 50
}
);
assert_eq!(capsules[2], Capsule::MaxData { maximum: 8388608 });
}
// All-zero initial limits queue no capsules at all.
#[test]
fn test_queue_initial_flow_control_capsules_zero_values() {
let mut session = Session::new(0);
session.set_established();
let limits = FlowControlLimits {
max_streams_bidi: 0,
max_streams_uni: 0,
max_data: 0,
};
session.queue_initial_flow_control_capsules(limits);
assert!(session.take_pending_capsules().is_empty());
}
}