use crate::api::ErrorKind;
use crate::api::ResetStreamsError;
use crate::api::SocketEvent;
use crate::api::SocketTime;
use crate::api::StreamId;
use crate::packet::chunk::Chunk;
use crate::packet::incoming_ssn_reset_request_parameter::IncomingSsnResetRequestParameter;
use crate::packet::outgoing_ssn_reset_request_parameter::OutgoingSsnResetRequestParameter;
use crate::packet::parameter::Parameter;
use crate::packet::re_config_chunk::ReConfigChunk;
use crate::packet::reconfiguration_response_parameter::ReconfigurationResponseParameter;
use crate::packet::reconfiguration_response_parameter::ReconfigurationResponseResult;
use crate::socket::context::Context;
use crate::socket::state::State;
use crate::socket::transmission_control_block::CurrentResetRequest;
use crate::socket::transmission_control_block::InflightResetRequest;
use crate::socket::transmission_control_block::TransmissionControlBlock;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ReqSeqNbrValidationResult {
Valid,
Retransmission,
BadSequenceNumber,
}
pub(crate) fn do_reset_streams(
state: &mut State,
ctx: &mut Context,
now: SocketTime,
outgoing_streams: &[StreamId],
) -> Result<(), ResetStreamsError> {
let Some(tcb) = state.tcb_mut() else {
return Err(ResetStreamsError::NotConnected);
};
if !tcb.capabilities.reconfig {
return Err(ResetStreamsError::NotSupported);
}
for stream_id in outgoing_streams {
ctx.send_queue.prepare_reset_stream(*stream_id);
}
ctx.send_buffered_packets(state, now);
Ok(())
}
pub(crate) fn handle_reconfig(
state: &mut State,
ctx: &mut Context,
now: SocketTime,
chunk: ReConfigChunk,
) {
let Some(tcb) = state.tcb_mut() else {
return;
};
if chunk.parameters.is_empty() {
ctx.events.borrow_mut().add(SocketEvent::OnError(
ErrorKind::ProtocolViolation,
"RE-CONFIG chunk must have at least one parameter".into(),
));
return;
}
let mut responses: Vec<Parameter> = Vec::new();
let mut has_seen_outgoing_reset_request = false;
for parameter in chunk.parameters {
match parameter {
Parameter::OutgoingSsnResetRequest(req) => {
if has_seen_outgoing_reset_request {
ctx.events.borrow_mut().add(SocketEvent::OnError(
ErrorKind::ProtocolViolation,
"RE-CONFIG chunk must not have multiple Outgoing SSN Reset Request parameters"
.into(),
));
return;
}
has_seen_outgoing_reset_request = true;
handle_outgoing_reset_request(tcb, ctx, req, &mut responses);
}
Parameter::IncomingSsnResetRequest(req) => {
handle_incoming_reset_request(tcb, req, &mut responses);
}
Parameter::ReconfigurationResponse(resp) => {
handle_reconfiguration_response(tcb, ctx, now, resp);
}
_ => {}
}
}
if !responses.is_empty() {
ctx.events.borrow_mut().add(SocketEvent::SendPacket(
tcb.new_packet().add(&Chunk::ReConfig(ReConfigChunk { parameters: responses })).build(),
));
ctx.tx_packets_count += 1;
}
ctx.send_buffered_packets(state, now);
}
fn handle_outgoing_reset_request(
tcb: &mut TransmissionControlBlock,
ctx: &mut Context,
req: OutgoingSsnResetRequestParameter,
responses: &mut Vec<Parameter>,
) {
let validation_result =
validate_req_seq_nbr(req.request_seq_nbr, tcb.last_processed_req_seq_nbr);
if validation_result == ReqSeqNbrValidationResult::BadSequenceNumber {
responses.push(Parameter::ReconfigurationResponse(ReconfigurationResponseParameter {
response_seq_nbr: req.request_seq_nbr,
result: ReconfigurationResponseResult::ErrorBadSequenceNumber,
sender_next_tsn: None,
receiver_next_tsn: None,
}));
return;
}
if validation_result == ReqSeqNbrValidationResult::Retransmission
&& tcb.last_processed_req_result != ReconfigurationResponseResult::InProgress
{
responses.push(Parameter::ReconfigurationResponse(ReconfigurationResponseParameter {
response_seq_nbr: req.request_seq_nbr,
result: tcb.last_processed_req_result,
sender_next_tsn: None,
receiver_next_tsn: None,
}));
return;
}
tcb.last_processed_req_seq_nbr = req.request_seq_nbr;
tcb.last_processed_req_result =
if req.sender_last_assigned_tsn > tcb.data_tracker.last_cumulative_acked_tsn() {
tcb.reassembly_queue.enter_deferred_reset(req.sender_last_assigned_tsn, &req.streams);
ReconfigurationResponseResult::InProgress
} else {
tcb.reassembly_queue.reset_streams_and_leave_deferred_reset(&req.streams);
ctx.events.borrow_mut().add(SocketEvent::OnIncomingStreamReset(req.streams));
ReconfigurationResponseResult::SuccessPerformed
};
responses.push(Parameter::ReconfigurationResponse(ReconfigurationResponseParameter {
response_seq_nbr: req.request_seq_nbr,
result: tcb.last_processed_req_result,
sender_next_tsn: None,
receiver_next_tsn: None,
}));
}
fn handle_incoming_reset_request(
tcb: &mut TransmissionControlBlock,
req: IncomingSsnResetRequestParameter,
responses: &mut Vec<Parameter>,
) {
let validation_result =
validate_req_seq_nbr(req.request_seq_nbr, tcb.last_processed_req_seq_nbr);
if validation_result == ReqSeqNbrValidationResult::Valid
|| validation_result == ReqSeqNbrValidationResult::Retransmission
{
responses.push(Parameter::ReconfigurationResponse(ReconfigurationResponseParameter {
response_seq_nbr: req.request_seq_nbr,
result: ReconfigurationResponseResult::SuccessNothingToDo,
sender_next_tsn: None,
receiver_next_tsn: None,
}));
tcb.last_processed_req_seq_nbr = req.request_seq_nbr;
tcb.last_processed_req_result = ReconfigurationResponseResult::SuccessNothingToDo;
} else {
responses.push(Parameter::ReconfigurationResponse(ReconfigurationResponseParameter {
response_seq_nbr: req.request_seq_nbr,
result: ReconfigurationResponseResult::ErrorBadSequenceNumber,
sender_next_tsn: None,
receiver_next_tsn: None,
}));
}
}
fn handle_reconfiguration_response(
tcb: &mut TransmissionControlBlock,
ctx: &mut Context,
now: SocketTime,
resp: ReconfigurationResponseParameter,
) {
if let CurrentResetRequest::Inflight(InflightResetRequest {
request_sequence_number,
request,
})
| CurrentResetRequest::Deferred(InflightResetRequest {
request_sequence_number,
request,
}) = &tcb.current_reset_request
{
if resp.response_seq_nbr == *request_sequence_number {
tcb.reconfig_timer.stop();
tcb.current_reset_request = match resp.result {
ReconfigurationResponseResult::SuccessNothingToDo
| ReconfigurationResponseResult::SuccessPerformed => {
ctx.events
.borrow_mut()
.add(SocketEvent::OnStreamsResetPerformed(request.streams.clone()));
ctx.send_queue.commit_reset_streams();
CurrentResetRequest::None
}
ReconfigurationResponseResult::InProgress => {
tcb.reconfig_timer.set_duration(tcb.rto.rto());
tcb.reconfig_timer.start(now);
CurrentResetRequest::Deferred(InflightResetRequest {
request_sequence_number: *request_sequence_number,
request: request.clone(),
})
}
ReconfigurationResponseResult::Denied
| ReconfigurationResponseResult::ErrorWrongSSN
| ReconfigurationResponseResult::ErrorRequestAlreadyInProgress
| ReconfigurationResponseResult::ErrorBadSequenceNumber => {
ctx.events
.borrow_mut()
.add(SocketEvent::OnStreamsResetFailed(request.streams.clone()));
ctx.send_queue.rollback_reset_streams();
CurrentResetRequest::None
}
}
}
}
}
pub(crate) fn handle_reconfig_timeout(
state: &mut State,
ctx: &mut Context,
now: SocketTime,
) -> bool {
let tcb = state.tcb_mut().unwrap();
if !tcb.reconfig_timer.expire(now) {
return false;
}
match tcb.current_reset_request {
CurrentResetRequest::None => unreachable!(),
CurrentResetRequest::Prepared(..) => {
}
CurrentResetRequest::Inflight(..) => {
ctx.tx_error_counter.increment();
}
CurrentResetRequest::Deferred(ref req) => {
tcb.current_reset_request = CurrentResetRequest::Inflight(InflightResetRequest {
request_sequence_number: req.request_sequence_number,
request: req.request.clone(),
});
}
}
if !ctx.tx_error_counter.is_exhausted() {
tcb.reconfig_timer.set_duration(tcb.rto.rto());
let mut builder = tcb.new_packet();
tcb.add_prepared_ssn_reset_request(&mut builder);
ctx.events.borrow_mut().add(SocketEvent::SendPacket(builder.build()));
ctx.tx_packets_count += 1;
}
true
}
fn validate_req_seq_nbr(
req_seq_nbr: u32,
last_processed_req_seq_nbr: u32,
) -> ReqSeqNbrValidationResult {
if req_seq_nbr == last_processed_req_seq_nbr {
ReqSeqNbrValidationResult::Retransmission
} else if req_seq_nbr != last_processed_req_seq_nbr.wrapping_add(1) {
ReqSeqNbrValidationResult::BadSequenceNumber
} else {
ReqSeqNbrValidationResult::Valid
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::EventSink;
use crate::api::Message;
use crate::api::Options;
use crate::api::PpId;
use crate::api::SctpImplementation;
use crate::api::SendOptions;
use crate::events::Events;
use crate::packet::SkippedStream;
use crate::packet::sctp_packet::SctpPacket;
use crate::socket::capabilities::Capabilities;
use crate::socket::context::TxErrorCounter;
use crate::socket::transmission_control_block::TransmissionControlBlock;
use crate::testing::data_sequencer::DataSequencer;
use crate::timer::BackoffAlgorithm;
use crate::timer::Timer;
use crate::tx::send_queue::SendQueue;
use crate::types::Ssn;
use crate::types::Tsn;
use std::cell::RefCell;
use std::rc::Rc;
use std::time::Duration;
fn create_test_objects(
my_initial_tsn: Tsn,
peer_initial_tsn: Tsn,
) -> (State, Context, Rc<RefCell<Events>>) {
let options = Options::default();
let capabilities = Capabilities { reconfig: true, ..Default::default() };
let events = Rc::new(RefCell::new(Events::new()));
let tcb = TransmissionControlBlock::new(
&options,
0, my_initial_tsn,
0, peer_initial_tsn,
0, 131072, capabilities,
Rc::clone(&events) as Rc<RefCell<dyn EventSink>>,
);
let state = State::Established(tcb);
let send_queue =
SendQueue::new(options.mtu, &options, Rc::clone(&events) as Rc<RefCell<dyn EventSink>>);
let context = Context {
options,
events: Rc::clone(&events) as Rc<RefCell<dyn EventSink>>,
send_queue,
limit_forward_tsn_until: SocketTime::zero(),
heartbeat_interval: Timer::new(
Duration::from_secs(30),
BackoffAlgorithm::Fixed,
None,
None,
),
heartbeat_timeout: Timer::new(
Duration::from_secs(1),
BackoffAlgorithm::Exponential,
Some(0),
None,
),
heartbeat_counter: 0,
heartbeat_sent_time: SocketTime::zero(),
rx_packets_count: 0,
tx_packets_count: 0,
tx_messages_count: 0,
peer_implementation: SctpImplementation::Unknown,
tx_error_counter: TxErrorCounter::new(Some(10)),
};
(state, context, events)
}
fn expect_sent_packet(events: &Rc<RefCell<Events>>, options: &Options) -> SctpPacket {
loop {
let event = events.borrow_mut().next_event().expect("expected event");
match event {
SocketEvent::SendPacket(packet) => {
return SctpPacket::from_bytes(&packet, options).expect("valid packet");
}
SocketEvent::OnBufferedAmountLow(_) => {
continue;
}
_ => {
panic!("Expected SendPacket, got {:?}", event);
}
}
}
}
fn expect_sent_reconfig_chunk(
events: &Rc<RefCell<Events>>,
options: &Options,
) -> ReConfigChunk {
let packet = expect_sent_packet(events, options);
packet
.chunks
.into_iter()
.find_map(|c| match c {
Chunk::ReConfig(r) => Some(r),
_ => None,
})
.expect("Expected ReConfig chunk")
}
fn expect_sent_reconfig_response(
events: &Rc<RefCell<Events>>,
options: &Options,
) -> ReconfigurationResponseParameter {
let chunk = expect_sent_reconfig_chunk(events, options);
chunk
.parameters
.into_iter()
.find_map(|p| match p {
Parameter::ReconfigurationResponse(r) => Some(r),
_ => None,
})
.expect("Expected ReconfigurationResponse")
}
fn expect_sent_reset_request(
events: &Rc<RefCell<Events>>,
options: &Options,
) -> OutgoingSsnResetRequestParameter {
let chunk = expect_sent_reconfig_chunk(events, options);
chunk
.parameters
.into_iter()
.find_map(|p| match p {
Parameter::OutgoingSsnResetRequest(r) => Some(r),
_ => None,
})
.expect("Expected OutgoingSsnResetRequest")
}
fn expect_incoming_stream_reset_event(
events: &Rc<RefCell<Events>>,
expected_streams: Vec<StreamId>,
) {
let event = events.borrow_mut().next_event().expect("expected event");
let SocketEvent::OnIncomingStreamReset(streams) = event else {
panic!("Expected OnIncomingStreamReset, got {:?}", event);
};
assert_eq!(streams, expected_streams);
}
#[test]
fn chunk_with_no_parameters_returns_error() {
let my_initial_tsn = Tsn(0);
let peer_initial_tsn = Tsn(10);
let (mut state, mut ctx, events) = create_test_objects(my_initial_tsn, peer_initial_tsn);
handle_reconfig(
&mut state,
&mut ctx,
SocketTime::zero(),
ReConfigChunk { parameters: vec![] },
);
let event = events.borrow_mut().next_event().expect("expected event");
let SocketEvent::OnError(kind, msg) = event else {
panic!("Expected OnError, got {:?}", event);
};
assert_eq!(kind, ErrorKind::ProtocolViolation);
assert_eq!(msg, "RE-CONFIG chunk must have at least one parameter");
}
#[test]
fn chunk_with_invalid_parameters_returns_error() {
let my_initial_tsn = Tsn(0);
let peer_initial_tsn = Tsn(10);
let (mut state, mut ctx, events) = create_test_objects(my_initial_tsn, peer_initial_tsn);
let param1 = Parameter::OutgoingSsnResetRequest(OutgoingSsnResetRequestParameter {
request_seq_nbr: 1,
response_seq_nbr: 10,
sender_last_assigned_tsn: Tsn(10),
streams: vec![StreamId(1)],
});
let param2 = Parameter::OutgoingSsnResetRequest(OutgoingSsnResetRequestParameter {
request_seq_nbr: 2,
response_seq_nbr: 10,
sender_last_assigned_tsn: Tsn(10),
streams: vec![StreamId(2)],
});
handle_reconfig(
&mut state,
&mut ctx,
SocketTime::zero(),
ReConfigChunk { parameters: vec![param1, param2] },
);
let event = events.borrow_mut().next_event().expect("expected event");
let SocketEvent::OnError(kind, msg) = event else {
panic!("Expected OnError, got {:?}", event);
};
assert_eq!(kind, ErrorKind::ProtocolViolation);
assert_eq!(
msg,
"RE-CONFIG chunk must not have multiple Outgoing SSN Reset Request parameters"
);
}
#[test]
fn fail_to_deliver_without_resetting_stream() {
let my_initial_tsn = Tsn(0);
let peer_initial_tsn = Tsn(10);
let (mut state, _, _) = create_test_objects(my_initial_tsn, peer_initial_tsn);
let mut seq = DataSequencer::new(StreamId(1));
let tcb = state.tcb_mut().unwrap();
tcb.data_tracker.observe(SocketTime::zero(), Tsn(10), false);
tcb.reassembly_queue.add(Tsn(10), seq.ordered("1234", "BE"));
tcb.data_tracker.observe(SocketTime::zero(), Tsn(11), false);
tcb.reassembly_queue.add(Tsn(11), seq.ordered("2345", "BE"));
assert_eq!(tcb.reassembly_queue.get_next_message().unwrap().payload, b"1234");
assert_eq!(tcb.reassembly_queue.get_next_message().unwrap().payload, b"2345");
assert!(tcb.reassembly_queue.get_next_message().is_none());
let mut seq = DataSequencer::new(StreamId(1));
tcb.data_tracker.observe(SocketTime::zero(), Tsn(12), false);
tcb.reassembly_queue.add(Tsn(12), seq.ordered("3456", "BE"));
assert!(tcb.reassembly_queue.get_next_message().is_none());
}
#[test]
fn reset_streams_not_deferred() {
let my_initial_tsn = Tsn(0);
let peer_initial_tsn = Tsn(10);
let (mut state, mut ctx, events) = create_test_objects(my_initial_tsn, peer_initial_tsn);
let mut seq = DataSequencer::new(StreamId(1));
let tcb = state.tcb_mut().unwrap();
tcb.data_tracker.observe(SocketTime::zero(), Tsn(10), false);
tcb.reassembly_queue.add(Tsn(10), seq.ordered("1234", "BE"));
tcb.data_tracker.observe(SocketTime::zero(), Tsn(11), false);
tcb.reassembly_queue.add(Tsn(11), seq.ordered("2345", "BE"));
assert_eq!(tcb.reassembly_queue.get_next_message().unwrap().payload, b"1234");
assert_eq!(tcb.reassembly_queue.get_next_message().unwrap().payload, b"2345");
assert!(tcb.reassembly_queue.get_next_message().is_none());
handle_reconfig(
&mut state,
&mut ctx,
SocketTime::zero(),
ReConfigChunk {
parameters: vec![Parameter::OutgoingSsnResetRequest(
OutgoingSsnResetRequestParameter {
request_seq_nbr: 10,
response_seq_nbr: 3,
sender_last_assigned_tsn: Tsn(11),
streams: vec![StreamId(1)],
},
)],
},
);
expect_incoming_stream_reset_event(&events, vec![StreamId(1)]);
let response = expect_sent_reconfig_response(&events, &ctx.options);
assert_eq!(response.result, ReconfigurationResponseResult::SuccessPerformed);
let mut seq = DataSequencer::new(StreamId(1));
let tcb = state.tcb_mut().unwrap();
tcb.data_tracker.observe(SocketTime::zero(), Tsn(12), false);
tcb.reassembly_queue.add(Tsn(12), seq.ordered("3456", "BE"));
assert_eq!(tcb.reassembly_queue.get_next_message().unwrap().payload, b"3456");
}
#[test]
fn reset_streams_deferred() {
let my_initial_tsn = Tsn(0);
let peer_initial_tsn = Tsn(10);
let (mut state, mut ctx, events) = create_test_objects(my_initial_tsn, peer_initial_tsn);
let mut seq = DataSequencer::new(StreamId(1));
let tcb = state.tcb_mut().unwrap();
tcb.data_tracker.observe(SocketTime::zero(), Tsn(10), false);
tcb.reassembly_queue.add(Tsn(10), seq.ordered("1234", "BE"));
handle_reconfig(
&mut state,
&mut ctx,
SocketTime::zero(),
ReConfigChunk {
parameters: vec![Parameter::OutgoingSsnResetRequest(
OutgoingSsnResetRequestParameter {
request_seq_nbr: 10,
response_seq_nbr: 3,
sender_last_assigned_tsn: Tsn(11),
streams: vec![StreamId(1)],
},
)],
},
);
let response = expect_sent_reconfig_response(&events, &ctx.options);
assert_eq!(response.result, ReconfigurationResponseResult::InProgress);
while let Some(event) = events.borrow_mut().next_event() {
if let SocketEvent::OnIncomingStreamReset(_) = event {
panic!("Unexpected OnIncomingStreamReset event: {:?}", event);
}
}
let tcb = state.tcb_mut().unwrap();
tcb.data_tracker.observe(SocketTime::zero(), Tsn(11), false);
tcb.reassembly_queue.add(Tsn(11), seq.ordered("2345", "BE"));
handle_reconfig(
&mut state,
&mut ctx,
SocketTime::zero(),
ReConfigChunk {
parameters: vec![Parameter::OutgoingSsnResetRequest(
OutgoingSsnResetRequestParameter {
request_seq_nbr: 10,
response_seq_nbr: 3,
sender_last_assigned_tsn: Tsn(11),
streams: vec![StreamId(1)],
},
)],
},
);
expect_incoming_stream_reset_event(&events, vec![StreamId(1)]);
let response = expect_sent_reconfig_response(&events, &ctx.options);
assert_eq!(response.result, ReconfigurationResponseResult::SuccessPerformed);
let mut seq = DataSequencer::new(StreamId(1));
let tcb = state.tcb_mut().unwrap();
tcb.data_tracker.observe(SocketTime::zero(), Tsn(12), false);
tcb.reassembly_queue.add(Tsn(12), seq.ordered("3456", "BE"));
assert_eq!(tcb.reassembly_queue.get_next_message().unwrap().payload, b"1234"); assert_eq!(tcb.reassembly_queue.get_next_message().unwrap().payload, b"2345"); assert_eq!(tcb.reassembly_queue.get_next_message().unwrap().payload, b"3456");
}
#[test]
fn reset_streams_deferred_only_selected_streams() {
let my_initial_tsn = Tsn(0);
let peer_initial_tsn = Tsn(10);
let (mut state, mut ctx, events) = create_test_objects(my_initial_tsn, peer_initial_tsn);
let mut seq1 = DataSequencer::new(StreamId(1));
let mut seq2 = DataSequencer::new(StreamId(2));
let mut seq3 = DataSequencer::new(StreamId(3));
handle_reconfig(
&mut state,
&mut ctx,
SocketTime::zero(),
ReConfigChunk {
parameters: vec![Parameter::OutgoingSsnResetRequest(
OutgoingSsnResetRequestParameter {
request_seq_nbr: 10,
response_seq_nbr: 3,
sender_last_assigned_tsn: Tsn(12),
streams: vec![StreamId(1), StreamId(2)],
},
)],
},
);
let response = expect_sent_reconfig_response(&events, &ctx.options);
assert_eq!(response.result, ReconfigurationResponseResult::InProgress);
let tcb = state.tcb_mut().unwrap();
tcb.data_tracker.observe(SocketTime::zero(), Tsn(10), false);
tcb.reassembly_queue.add(Tsn(10), seq1.ordered("1111", "BE"));
assert_eq!(tcb.reassembly_queue.get_next_message().unwrap().payload, b"1111");
tcb.data_tracker.observe(SocketTime::zero(), Tsn(11), false);
tcb.reassembly_queue.add(Tsn(11), seq2.ordered("2222", "BE"));
assert_eq!(tcb.reassembly_queue.get_next_message().unwrap().payload, b"2222");
tcb.data_tracker.observe(SocketTime::zero(), Tsn(12), false);
tcb.reassembly_queue.add(Tsn(12), seq3.ordered("3333", "BE"));
assert_eq!(tcb.reassembly_queue.get_next_message().unwrap().payload, b"3333");
let mut seq1 = DataSequencer::new(StreamId(1));
tcb.data_tracker.observe(SocketTime::zero(), Tsn(13), false);
tcb.reassembly_queue.add(Tsn(13), seq1.ordered("1-new", "BE"));
assert!(tcb.reassembly_queue.get_next_message().is_none());
let mut seq2 = DataSequencer::new(StreamId(2));
tcb.data_tracker.observe(SocketTime::zero(), Tsn(14), false);
tcb.reassembly_queue.add(Tsn(14), seq2.ordered("2-new", "BE"));
assert!(tcb.reassembly_queue.get_next_message().is_none());
tcb.data_tracker.observe(SocketTime::zero(), Tsn(15), false);
tcb.reassembly_queue.add(Tsn(15), seq3.ordered("4444", "BE"));
assert_eq!(tcb.reassembly_queue.get_next_message().unwrap().payload, b"4444");
handle_reconfig(
&mut state,
&mut ctx,
SocketTime::zero(),
ReConfigChunk {
parameters: vec![Parameter::OutgoingSsnResetRequest(
OutgoingSsnResetRequestParameter {
request_seq_nbr: 11,
response_seq_nbr: 3,
sender_last_assigned_tsn: Tsn(12),
streams: vec![StreamId(1), StreamId(2)],
},
)],
},
);
expect_incoming_stream_reset_event(&events, vec![StreamId(1), StreamId(2)]);
let response = expect_sent_reconfig_response(&events, &ctx.options);
assert_eq!(response.result, ReconfigurationResponseResult::SuccessPerformed);
let tcb = state.tcb_mut().unwrap();
assert_eq!(tcb.reassembly_queue.get_next_message().unwrap().payload, b"1-new");
assert_eq!(tcb.reassembly_queue.get_next_message().unwrap().payload, b"2-new");
assert!(tcb.reassembly_queue.get_next_message().is_none());
}
#[test]
fn reset_streams_defers_forward_tsn() {
let my_initial_tsn = Tsn(0);
let peer_initial_tsn = Tsn(10);
let (mut state, mut ctx, events) = create_test_objects(my_initial_tsn, peer_initial_tsn);
let mut seq = DataSequencer::new(StreamId(42));
let _tsn10 = seq.ordered("1234", "BE");
let _tsn11 = seq.ordered("2345", "BE");
let _tsn12 = seq.ordered("3456", "BE");
handle_reconfig(
&mut state,
&mut ctx,
SocketTime::zero(),
ReConfigChunk {
parameters: vec![Parameter::OutgoingSsnResetRequest(
OutgoingSsnResetRequestParameter {
request_seq_nbr: 10,
response_seq_nbr: 3,
sender_last_assigned_tsn: Tsn(12),
streams: vec![StreamId(42)],
},
)],
},
);
assert_eq!(
expect_sent_reconfig_response(&events, &ctx.options).result,
ReconfigurationResponseResult::InProgress
);
let mut seq = DataSequencer::new(StreamId(42));
let tcb = state.tcb_mut().unwrap();
let tsn13 = seq.ordered("part1", "B");
tcb.data_tracker.observe(SocketTime::zero(), Tsn(13), false);
tcb.reassembly_queue.add(Tsn(13), tsn13);
assert!(tcb.reassembly_queue.get_next_message().is_none());
let _tsn14 = seq.ordered("part2", "E");
let tsn15 = seq.ordered("next", "BE");
tcb.data_tracker.observe(SocketTime::zero(), Tsn(15), false);
tcb.reassembly_queue.add(Tsn(15), tsn15);
assert!(tcb.reassembly_queue.get_next_message().is_none());
tcb.data_tracker.handle_forward_tsn(SocketTime::zero(), Tsn(12));
tcb.reassembly_queue
.handle_forward_tsn(Tsn(12), vec![SkippedStream::ForwardTsn(StreamId(42), Ssn(2))]);
tcb.data_tracker.handle_forward_tsn(SocketTime::zero(), Tsn(14));
tcb.reassembly_queue
.handle_forward_tsn(Tsn(14), vec![SkippedStream::ForwardTsn(StreamId(42), Ssn(0))]);
handle_reconfig(
&mut state,
&mut ctx,
SocketTime::zero(),
ReConfigChunk {
parameters: vec![Parameter::OutgoingSsnResetRequest(
OutgoingSsnResetRequestParameter {
request_seq_nbr: 11,
response_seq_nbr: 3,
sender_last_assigned_tsn: Tsn(12),
streams: vec![StreamId(42)],
},
)],
},
);
expect_incoming_stream_reset_event(&events, vec![StreamId(42)]);
assert_eq!(
expect_sent_reconfig_response(&events, &ctx.options).result,
ReconfigurationResponseResult::SuccessPerformed
);
let tcb = state.tcb_mut().unwrap();
assert_eq!(tcb.reassembly_queue.get_next_message().unwrap().payload, b"next");
assert!(tcb.reassembly_queue.get_next_message().is_none());
}
#[test]
fn send_outgoing_request_directly() {
let my_initial_tsn = Tsn(0);
let peer_initial_tsn = Tsn(10);
let (mut state, mut ctx, events) = create_test_objects(my_initial_tsn, peer_initial_tsn);
do_reset_streams(&mut state, &mut ctx, SocketTime::zero(), &[StreamId(1)]).unwrap();
let req = expect_sent_reset_request(&events, &ctx.options);
assert_eq!(req.streams, vec![StreamId(1)]);
assert_eq!(req.request_seq_nbr, 0); }
#[test]
fn reset_multiple_streams_in_one_request() {
let my_initial_tsn = Tsn(0);
let peer_initial_tsn = Tsn(10);
let (mut state, mut ctx, events) = create_test_objects(my_initial_tsn, peer_initial_tsn);
do_reset_streams(&mut state, &mut ctx, SocketTime::zero(), &[StreamId(1), StreamId(3)])
.unwrap();
let req = expect_sent_reset_request(&events, &ctx.options);
let mut streams = req.streams.clone();
streams.sort();
assert_eq!(streams, vec![StreamId(1), StreamId(3)]);
}
#[test]
fn send_outgoing_request_deferred() {
let my_initial_tsn = Tsn(0);
let peer_initial_tsn = Tsn(10);
let (mut state, mut ctx, events) = create_test_objects(my_initial_tsn, peer_initial_tsn);
let large_payload = vec![0u8; 2000];
ctx.send_queue.add(
SocketTime::zero(),
Message::new(StreamId(42), PpId(53), large_payload),
&SendOptions::default(),
);
let chunk = ctx.send_queue.produce(SocketTime::zero(), 1000);
assert!(chunk.is_some());
do_reset_streams(&mut state, &mut ctx, SocketTime::zero(), &[StreamId(42)]).unwrap();
while let Some(event) = events.borrow_mut().next_event() {
if let SocketEvent::SendPacket(packet) = event {
let packet = SctpPacket::from_bytes(&packet, &ctx.options).unwrap();
if packet.chunks.iter().any(|c| matches!(c, Chunk::ReConfig(_))) {
panic!("Unexpected ReConfig chunk - should be deferred");
}
}
}
while ctx.send_queue.produce(SocketTime::zero(), 1000).is_some() {}
ctx.send_buffered_packets(&mut state, SocketTime::zero());
let req = expect_sent_reset_request(&events, &ctx.options);
assert_eq!(req.streams, vec![StreamId(42)]);
}
#[test]
fn send_outgoing_resetting_on_positive_response() {
let my_initial_tsn = Tsn(0);
let peer_initial_tsn = Tsn(10);
let (mut state, mut ctx, events) = create_test_objects(my_initial_tsn, peer_initial_tsn);
do_reset_streams(&mut state, &mut ctx, SocketTime::zero(), &[StreamId(1)]).unwrap();
let req = expect_sent_reset_request(&events, &ctx.options);
let req_seq_nbr = req.request_seq_nbr;
handle_reconfig(
&mut state,
&mut ctx,
SocketTime::zero(),
ReConfigChunk {
parameters: vec![Parameter::ReconfigurationResponse(
ReconfigurationResponseParameter {
response_seq_nbr: req_seq_nbr,
result: ReconfigurationResponseResult::SuccessPerformed,
sender_next_tsn: None,
receiver_next_tsn: None,
},
)],
},
);
let event = events.borrow_mut().next_event().expect("expected event");
let SocketEvent::OnStreamsResetPerformed(streams) = event else {
panic!("Unexpected event: {:?}", event);
};
assert_eq!(streams, vec![StreamId(1)]);
}
#[test]
fn send_outgoing_reset_rollback_on_error() {
let my_initial_tsn = Tsn(0);
let peer_initial_tsn = Tsn(10);
let (mut state, mut ctx, events) = create_test_objects(my_initial_tsn, peer_initial_tsn);
do_reset_streams(&mut state, &mut ctx, SocketTime::zero(), &[StreamId(1)]).unwrap();
let req = expect_sent_reset_request(&events, &ctx.options);
let req_seq_nbr = req.request_seq_nbr;
handle_reconfig(
&mut state,
&mut ctx,
SocketTime::zero(),
ReConfigChunk {
parameters: vec![Parameter::ReconfigurationResponse(
ReconfigurationResponseParameter {
response_seq_nbr: req_seq_nbr,
result: ReconfigurationResponseResult::ErrorBadSequenceNumber,
sender_next_tsn: None,
receiver_next_tsn: None,
},
)],
},
);
let event = events.borrow_mut().next_event().expect("expected event");
let SocketEvent::OnStreamsResetFailed(streams) = event else {
panic!("Unexpected event: {:?}", event);
};
assert_eq!(streams, vec![StreamId(1)]);
}
#[test]
fn send_outgoing_reset_retransmit_on_in_progress() {
let my_initial_tsn = Tsn(0);
let peer_initial_tsn = Tsn(10);
let (mut state, mut ctx, events) = create_test_objects(my_initial_tsn, peer_initial_tsn);
do_reset_streams(&mut state, &mut ctx, SocketTime::zero(), &[StreamId(1)]).unwrap();
let req = expect_sent_reset_request(&events, &ctx.options);
let req_seq_nbr = req.request_seq_nbr;
handle_reconfig(
&mut state,
&mut ctx,
SocketTime::zero(),
ReConfigChunk {
parameters: vec![Parameter::ReconfigurationResponse(
ReconfigurationResponseParameter {
response_seq_nbr: req_seq_nbr,
result: ReconfigurationResponseResult::InProgress,
sender_next_tsn: None,
receiver_next_tsn: None,
},
)],
},
);
while let Some(event) = events.borrow_mut().next_event() {
if let SocketEvent::SendPacket(packet) = event {
let packet = SctpPacket::from_bytes(&packet, &ctx.options).unwrap();
if packet.chunks.iter().any(|c| matches!(c, Chunk::ReConfig(_))) {
panic!("Unexpected ReConfig chunk");
}
}
}
let rto = state.tcb().unwrap().rto.rto();
let now = SocketTime::zero() + rto;
handle_reconfig_timeout(&mut state, &mut ctx, now);
let req = expect_sent_reset_request(&events, &ctx.options);
assert_eq!(req.streams, vec![StreamId(1)]);
assert_eq!(req.request_seq_nbr, req_seq_nbr);
}
#[test]
fn reset_while_request_is_sent_will_queue() {
let my_initial_tsn = Tsn(0);
let peer_initial_tsn = Tsn(10);
let (mut state, mut ctx, events) = create_test_objects(my_initial_tsn, peer_initial_tsn);
do_reset_streams(&mut state, &mut ctx, SocketTime::zero(), &[StreamId(1)]).unwrap();
let req = expect_sent_reset_request(&events, &ctx.options);
assert_eq!(req.streams, vec![StreamId(1)]);
let req_seq_nbr = req.request_seq_nbr;
do_reset_streams(&mut state, &mut ctx, SocketTime::zero(), &[StreamId(2), StreamId(3)])
.unwrap();
ctx.send_buffered_packets(&mut state, SocketTime::zero());
while let Some(event) = events.borrow_mut().next_event() {
if let SocketEvent::SendPacket(packet) = event {
let packet = SctpPacket::from_bytes(&packet, &ctx.options).unwrap();
if packet.chunks.iter().any(|c| matches!(c, Chunk::ReConfig(_))) {
panic!("Unexpected ReConfig chunk");
}
}
}
handle_reconfig(
&mut state,
&mut ctx,
SocketTime::zero(),
ReConfigChunk {
parameters: vec![Parameter::ReconfigurationResponse(
ReconfigurationResponseParameter {
response_seq_nbr: req_seq_nbr,
result: ReconfigurationResponseResult::SuccessPerformed,
sender_next_tsn: None,
receiver_next_tsn: None,
},
)],
},
);
let event = events.borrow_mut().next_event().expect("expected event");
let SocketEvent::OnStreamsResetPerformed(streams) = event else {
panic!("Unexpected event: {:?}", event);
};
assert_eq!(streams, vec![StreamId(1)]);
let req = expect_sent_reset_request(&events, &ctx.options);
let mut streams = req.streams.clone();
streams.sort();
assert_eq!(streams, vec![StreamId(2), StreamId(3)]);
assert_eq!(req.request_seq_nbr, req_seq_nbr.wrapping_add(1));
}
#[test]
fn send_incoming_reset_just_returns_nothing_performed() {
let my_initial_tsn = Tsn(0);
let peer_initial_tsn = Tsn(10);
let (mut state, mut ctx, events) = create_test_objects(my_initial_tsn, peer_initial_tsn);
handle_reconfig(
&mut state,
&mut ctx,
SocketTime::zero(),
ReConfigChunk {
parameters: vec![Parameter::IncomingSsnResetRequest(
IncomingSsnResetRequestParameter {
request_seq_nbr: 10,
streams: vec![StreamId(1)],
},
)],
},
);
let resp = expect_sent_reconfig_response(&events, &ctx.options);
assert_eq!(resp.response_seq_nbr, 10);
assert_eq!(resp.result, ReconfigurationResponseResult::SuccessNothingToDo);
}
#[test]
fn send_outgoing_reset_retransmit_on_in_progress_does_not_increment_error_counter() {
let my_initial_tsn = Tsn(0);
let peer_initial_tsn = Tsn(10);
let (mut state, mut ctx, events) = create_test_objects(my_initial_tsn, peer_initial_tsn);
do_reset_streams(&mut state, &mut ctx, SocketTime::zero(), &[StreamId(42)]).unwrap();
let req = expect_sent_reset_request(&events, &ctx.options);
let req_seq_nbr = req.request_seq_nbr;
handle_reconfig(
&mut state,
&mut ctx,
SocketTime::zero(),
ReConfigChunk {
parameters: vec![Parameter::ReconfigurationResponse(
ReconfigurationResponseParameter {
response_seq_nbr: req_seq_nbr,
result: ReconfigurationResponseResult::InProgress,
sender_next_tsn: None,
receiver_next_tsn: None,
},
)],
},
);
assert_eq!(ctx.tx_error_counter.value(), 0);
let rto = state.tcb().unwrap().rto.rto();
let now = SocketTime::zero() + rto;
handle_reconfig_timeout(&mut state, &mut ctx, now);
assert_eq!(ctx.tx_error_counter.value(), 0);
let req = expect_sent_reset_request(&events, &ctx.options);
assert_eq!(req.request_seq_nbr, req_seq_nbr);
let now = now + rto;
handle_reconfig_timeout(&mut state, &mut ctx, now);
assert_eq!(ctx.tx_error_counter.value(), 1);
}
#[test]
fn send_same_request_twice_is_idempotent() {
let my_initial_tsn = Tsn(0);
let peer_initial_tsn = Tsn(10);
let (mut state, mut ctx, events) = create_test_objects(my_initial_tsn, peer_initial_tsn);
for _ in 0..2 {
handle_reconfig(
&mut state,
&mut ctx,
SocketTime::zero(),
ReConfigChunk {
parameters: vec![Parameter::OutgoingSsnResetRequest(
OutgoingSsnResetRequestParameter {
request_seq_nbr: 10,
response_seq_nbr: 3,
sender_last_assigned_tsn: Tsn(11),
streams: vec![StreamId(1)],
},
)],
},
);
assert_eq!(
expect_sent_reconfig_response(&events, &ctx.options).result,
ReconfigurationResponseResult::InProgress
);
}
}
#[test]
fn perform_close_after_one_first_failing() {
let my_initial_tsn = Tsn(0);
let peer_initial_tsn = Tsn(10);
let (mut state, mut ctx, events) = create_test_objects(my_initial_tsn, peer_initial_tsn);
handle_reconfig(
&mut state,
&mut ctx,
SocketTime::zero(),
ReConfigChunk {
parameters: vec![Parameter::OutgoingSsnResetRequest(
OutgoingSsnResetRequestParameter {
request_seq_nbr: 10,
response_seq_nbr: 3,
sender_last_assigned_tsn: Tsn(10), streams: vec![StreamId(1)],
},
)],
},
);
let response = expect_sent_reconfig_response(&events, &ctx.options);
assert_eq!(response.result, ReconfigurationResponseResult::InProgress);
let mut seq = DataSequencer::new(StreamId(1));
let tcb = state.tcb_mut().unwrap();
tcb.reassembly_queue.add(Tsn(10), seq.ordered("1234", "BE"));
tcb.data_tracker.observe(SocketTime::zero(), Tsn(10), false);
handle_reconfig(
&mut state,
&mut ctx,
SocketTime::zero(),
ReConfigChunk {
parameters: vec![Parameter::OutgoingSsnResetRequest(
OutgoingSsnResetRequestParameter {
request_seq_nbr: 11,
response_seq_nbr: 3,
sender_last_assigned_tsn: Tsn(10),
streams: vec![StreamId(1)],
},
)],
},
);
expect_incoming_stream_reset_event(&events, vec![StreamId(1)]);
assert_eq!(
expect_sent_reconfig_response(&events, &ctx.options).result,
ReconfigurationResponseResult::SuccessPerformed
);
}
#[test]
fn reset_streams_deferred_retransmission_with_same_seq_num_success() {
let my_initial_tsn = Tsn(0);
let peer_initial_tsn = Tsn(10);
let (mut state, mut ctx, events) = create_test_objects(my_initial_tsn, peer_initial_tsn);
let mut seq = DataSequencer::new(StreamId(1));
let tcb = state.tcb_mut().unwrap();
tcb.data_tracker.observe(SocketTime::zero(), Tsn(10), false);
tcb.reassembly_queue.add(Tsn(10), seq.ordered("10", "BE"));
handle_reconfig(
&mut state,
&mut ctx,
SocketTime::zero(),
ReConfigChunk {
parameters: vec![Parameter::OutgoingSsnResetRequest(
OutgoingSsnResetRequestParameter {
request_seq_nbr: 10,
response_seq_nbr: 3,
sender_last_assigned_tsn: Tsn(12),
streams: vec![StreamId(1)],
},
)],
},
);
let response = expect_sent_reconfig_response(&events, &ctx.options);
assert_eq!(response.result, ReconfigurationResponseResult::InProgress);
let tcb = state.tcb_mut().unwrap();
tcb.data_tracker.observe(SocketTime::zero(), Tsn(11), false);
tcb.reassembly_queue.add(Tsn(11), seq.ordered("11", "BE"));
tcb.data_tracker.observe(SocketTime::zero(), Tsn(12), false);
tcb.reassembly_queue.add(Tsn(12), seq.ordered("12", "BE"));
while events.borrow_mut().next_event().is_some() {}
handle_reconfig(
&mut state,
&mut ctx,
SocketTime::zero(),
ReConfigChunk {
parameters: vec![Parameter::OutgoingSsnResetRequest(
OutgoingSsnResetRequestParameter {
request_seq_nbr: 10,
response_seq_nbr: 3,
sender_last_assigned_tsn: Tsn(12),
streams: vec![StreamId(1)],
},
)],
},
);
expect_incoming_stream_reset_event(&events, vec![StreamId(1)]);
let response = expect_sent_reconfig_response(&events, &ctx.options);
assert_eq!(response.result, ReconfigurationResponseResult::SuccessPerformed);
}
#[test]
fn reset_streams_deferred_with_new_seq_num_success() {
let my_initial_tsn = Tsn(0);
let peer_initial_tsn = Tsn(10);
let (mut state, mut ctx, events) = create_test_objects(my_initial_tsn, peer_initial_tsn);
let mut seq = DataSequencer::new(StreamId(1));
handle_reconfig(
&mut state,
&mut ctx,
SocketTime::zero(),
ReConfigChunk {
parameters: vec![Parameter::OutgoingSsnResetRequest(
OutgoingSsnResetRequestParameter {
request_seq_nbr: 10,
response_seq_nbr: 3,
sender_last_assigned_tsn: Tsn(11),
streams: vec![StreamId(1)],
},
)],
},
);
assert_eq!(
expect_sent_reconfig_response(&events, &ctx.options).result,
ReconfigurationResponseResult::InProgress
);
let tcb = state.tcb_mut().unwrap();
tcb.data_tracker.observe(SocketTime::zero(), Tsn(10), false);
tcb.reassembly_queue.add(Tsn(10), seq.ordered("10", "BE"));
tcb.data_tracker.observe(SocketTime::zero(), Tsn(11), false);
tcb.reassembly_queue.add(Tsn(11), seq.ordered("11", "BE"));
while events.borrow_mut().next_event().is_some() {}
handle_reconfig(
&mut state,
&mut ctx,
SocketTime::zero(),
ReConfigChunk {
parameters: vec![Parameter::OutgoingSsnResetRequest(
OutgoingSsnResetRequestParameter {
request_seq_nbr: 11,
response_seq_nbr: 3,
sender_last_assigned_tsn: Tsn(11),
streams: vec![StreamId(1)],
},
)],
},
);
expect_incoming_stream_reset_event(&events, vec![StreamId(1)]);
assert_eq!(
expect_sent_reconfig_response(&events, &ctx.options).result,
ReconfigurationResponseResult::SuccessPerformed
);
}
#[test]
fn reset_streams_deferred_retransmission_still_in_progress() {
let my_initial_tsn = Tsn(0);
let peer_initial_tsn = Tsn(10);
let (mut state, mut ctx, events) = create_test_objects(my_initial_tsn, peer_initial_tsn);
handle_reconfig(
&mut state,
&mut ctx,
SocketTime::zero(),
ReConfigChunk {
parameters: vec![Parameter::OutgoingSsnResetRequest(
OutgoingSsnResetRequestParameter {
request_seq_nbr: 10,
response_seq_nbr: 3,
sender_last_assigned_tsn: Tsn(11),
streams: vec![StreamId(1)],
},
)],
},
);
assert_eq!(
expect_sent_reconfig_response(&events, &ctx.options).result,
ReconfigurationResponseResult::InProgress
);
handle_reconfig(
&mut state,
&mut ctx,
SocketTime::zero(),
ReConfigChunk {
parameters: vec![Parameter::OutgoingSsnResetRequest(
OutgoingSsnResetRequestParameter {
request_seq_nbr: 10,
response_seq_nbr: 3,
sender_last_assigned_tsn: Tsn(11),
streams: vec![StreamId(1)],
},
)],
},
);
assert_eq!(
expect_sent_reconfig_response(&events, &ctx.options).result,
ReconfigurationResponseResult::InProgress
);
}
#[test]
fn reset_streams_success_idempotency() {
let my_initial_tsn = Tsn(0);
let peer_initial_tsn = Tsn(10);
let (mut state, mut ctx, events) = create_test_objects(my_initial_tsn, peer_initial_tsn);
let mut seq = DataSequencer::new(StreamId(1));
let tcb = state.tcb_mut().unwrap();
tcb.data_tracker.observe(SocketTime::zero(), Tsn(10), false);
tcb.reassembly_queue.add(Tsn(10), seq.ordered("10", "BE"));
tcb.data_tracker.observe(SocketTime::zero(), Tsn(11), false);
tcb.reassembly_queue.add(Tsn(11), seq.ordered("11", "BE"));
handle_reconfig(
&mut state,
&mut ctx,
SocketTime::zero(),
ReConfigChunk {
parameters: vec![Parameter::OutgoingSsnResetRequest(
OutgoingSsnResetRequestParameter {
request_seq_nbr: 10,
response_seq_nbr: 3,
sender_last_assigned_tsn: Tsn(11),
streams: vec![StreamId(1)],
},
)],
},
);
expect_incoming_stream_reset_event(&events, vec![StreamId(1)]);
assert_eq!(
expect_sent_reconfig_response(&events, &ctx.options).result,
ReconfigurationResponseResult::SuccessPerformed
);
while events.borrow_mut().next_event().is_some() {}
handle_reconfig(
&mut state,
&mut ctx,
SocketTime::zero(),
ReConfigChunk {
parameters: vec![Parameter::OutgoingSsnResetRequest(
OutgoingSsnResetRequestParameter {
request_seq_nbr: 10,
response_seq_nbr: 3,
sender_last_assigned_tsn: Tsn(11),
streams: vec![StreamId(1)],
},
)],
},
);
assert_eq!(
expect_sent_reconfig_response(&events, &ctx.options).result,
ReconfigurationResponseResult::SuccessPerformed
);
while let Some(event) = events.borrow_mut().next_event() {
if let SocketEvent::OnIncomingStreamReset(_) = event {
panic!("Unexpected OnIncomingStreamReset event: {:?}", event);
}
}
}
}