use crate::EventSink;
use crate::api::ErrorKind;
use crate::api::Options;
use crate::api::SctpImplementation;
use crate::api::SocketEvent;
use crate::api::SocketTime;
use crate::packet::chunk::Chunk;
use crate::packet::sctp_packet::SctpPacketBuilder;
use crate::socket::State;
use crate::socket::transmission_control_block::CurrentResetRequest;
use crate::timer::Timer;
use crate::tx::send_queue::SendQueue;
use std::cell::RefCell;
use std::cmp::min;
use std::rc::Rc;
use std::time::Duration;
/// Counts transmission errors against an optional limit.
///
/// With a limit of `Some(n)`, every call to `increment` advances the counter until it
/// exceeds `n`; from then on `is_exhausted` returns `true` and further increments are
/// ignored. With no limit (`None`), the counter never advances and is never exhausted.
pub(crate) struct TxErrorCounter {
    /// Number of errors recorded since construction or the last `reset`.
    error_counter: u32,
    /// Highest tolerated error count; `None` disables counting altogether.
    limit: Option<u32>,
}

impl TxErrorCounter {
    /// Creates a counter starting at zero with the given optional `limit`.
    pub fn new(limit: Option<u32>) -> Self {
        Self { error_counter: 0, limit }
    }

    /// Records one error, unless no limit is configured or the limit is already exceeded.
    pub fn increment(&mut self) {
        if matches!(self.limit, Some(limit) if self.error_counter <= limit) {
            // With `limit == u32::MAX` the guard is always true, so a plain `+= 1` would
            // overflow once the counter reaches `u32::MAX`. Saturate instead.
            self.error_counter = self.error_counter.saturating_add(1);
        }
    }

    /// Clears the recorded errors, setting the counter back to zero.
    pub fn reset(&mut self) {
        self.error_counter = 0;
    }

    /// Returns `true` when a limit is configured and the counter has exceeded it.
    pub fn is_exhausted(&self) -> bool {
        matches!(self.limit, Some(limit) if self.error_counter > limit)
    }

    /// Returns the current error count.
    pub fn value(&self) -> u32 {
        self.error_counter
    }
}
/// Mutable state used by the socket's packet-sending and close paths (see `impl Context`).
pub(crate) struct Context {
/// Socket configuration; `max_burst` caps the number of packets emitted per send pass.
pub options: Options,
/// Sink that receives the `SocketEvent`s produced here (`SendPacket`, `OnClosed`,
/// `OnAborted`).
pub events: Rc<RefCell<dyn EventSink>>,
/// Queue of outgoing messages; it produces data for new chunks and reports streams that
/// are ready to be reset.
pub send_queue: SendQueue,
/// Earliest time at which another forward-TSN chunk may be added to a packet.
pub limit_forward_tsn_until: SocketTime,
/// Timer restarted whenever data chunks are sent and stopped when the socket closes.
pub heartbeat_interval: Timer,
/// Timer stopped when the socket closes.
/// NOTE(review): presumably bounds the wait for a heartbeat acknowledgement; it is not
/// started in this file, so confirm.
pub heartbeat_timeout: Timer,
// NOTE(review): the heartbeat counter and sent time are not read or written in this file;
// presumably they identify and time the outstanding heartbeat. Confirm against the users.
pub heartbeat_counter: u32,
pub heartbeat_sent_time: SocketTime,
// NOTE(review): not updated in this file; presumably the number of received packets.
pub rx_packets_count: usize,
/// Number of packets handed to `events` as `SocketEvent::SendPacket`.
pub tx_packets_count: usize,
// NOTE(review): not updated in this file; presumably the number of messages sent.
pub tx_messages_count: usize,
// NOTE(review): not used in this file; presumably the detected implementation of the peer.
pub peer_implementation: SctpImplementation,
/// Transmission error counter (see `TxErrorCounter`).
pub tx_error_counter: TxErrorCounter,
}
impl Context {
/// Sends whatever is currently buffered, starting from a fresh packet.
///
/// A new packet builder is obtained from the transmission control block in `state` and
/// handed to `send_buffered_packets_with`. When `state` has no transmission control block,
/// nothing is sent.
pub fn send_buffered_packets(&mut self, state: &mut State, now: SocketTime) {
    // Take the builder out first so that `state` is free to be passed on below.
    let fresh_builder = state.tcb_mut().map(|tcb| tcb.new_packet());
    if let Some(mut builder) = fresh_builder {
        self.send_buffered_packets_with(state, now, &mut builder);
    }
}
/// Fills and emits up to `options.max_burst` packets, starting from the supplied `builder`.
///
/// For the first packet of the burst only, control chunks are considered ahead of data: a
/// SACK when the data tracker asks for one, a forward-TSN chunk (throttled through
/// `limit_forward_tsn_until`), and a stream reset request when none is in progress and the
/// send queue has streams ready to be reset. Every packet is then topped up with the data
/// chunks returned by the retransmission queue.
///
/// When `state` has no transmission control block, nothing is added, but a `builder` that
/// the caller already filled is still sent.
///
/// Each emitted packet is delivered as `SocketEvent::SendPacket` and counted in
/// `tx_packets_count`. The burst ends early when the builder is empty, or after a single
/// packet while `state` is `State::CookieEchoed`.
pub fn send_buffered_packets_with(
&mut self,
state: &mut State,
now: SocketTime,
builder: &mut SctpPacketBuilder,
) {
// At most `max_burst` packets are produced per call.
for packet_idx in 0..self.options.max_burst {
if let Some(tcb) = state.tcb_mut() {
// Control chunks are only considered for the first packet of the burst.
if packet_idx == 0 {
// True when there is data that could go out in this pass.
// NOTE(review): presumably this lets an otherwise delayed SACK be bundled with that
// data - confirm against `should_send_ack`.
let also_if_delayed = self.send_queue.has_data_to_send()
|| tcb.retransmission_queue.can_send_data();
if tcb.data_tracker.should_send_ack(now, also_if_delayed) {
// The reassembly queue's remaining bytes are passed to the SACK (presumably as the
// advertised receiver window); note that the `as u32` cast truncates.
builder.add(
&Chunk::Sack(tcb.data_tracker.create_selective_ack(
tcb.reassembly_queue.remaining_bytes() as u32,
)),
);
}
// Forward-TSN chunks are throttled: once one is added, the next is allowed no sooner
// than min(200 ms, SRTT) from `now`.
if now >= self.limit_forward_tsn_until
&& tcb.retransmission_queue.should_send_forward_tsn(now)
{
builder.add(&tcb.retransmission_queue.create_forward_tsn());
self.limit_forward_tsn_until =
now + min(Duration::from_millis(200), tcb.rto.srtt());
}
// Start a stream reset only when no reset request is currently in progress.
if matches!(tcb.current_reset_request, CurrentResetRequest::None)
&& self.send_queue.has_streams_ready_to_be_reset()
{
tcb.start_ssn_reset_request(
now,
self.send_queue.get_streams_ready_to_reset(),
builder,
);
}
}
// Ask the retransmission queue for chunks that fit in the space left in the packet.
// The callback first discards from the send queue every (stream, message) pair it is
// handed, then produces the next piece of data of at most `max_size`.
let chunks = tcb.retransmission_queue.get_chunks_to_send(
now,
builder.bytes_remaining(),
|max_size, discard| {
for (stream_id, message_id) in discard {
self.send_queue.discard(*stream_id, *message_id);
}
self.send_queue.produce(now, max_size)
},
);
// Sending data restarts the heartbeat interval timer.
// NOTE(review): presumably because outgoing data makes a separate heartbeat unnecessary
// for now - confirm.
if !chunks.is_empty() {
self.heartbeat_interval.start(now);
}
for (tsn, data) in chunks {
builder.add(&tcb.make_data_chunk(tsn, data));
}
}
// Nothing was added to the packet: end the burst.
if builder.is_empty() {
break;
}
// NOTE(review): `build()` is presumably expected to leave the builder ready (empty) for
// the next iteration - confirm against `SctpPacketBuilder`.
self.events.borrow_mut().add(SocketEvent::SendPacket(builder.build()));
self.tx_packets_count += 1;
// While in `CookieEchoed`, stop after a single packet.
// NOTE(review): presumably only one data-carrying packet may be sent until the handshake
// completes - confirm against the SCTP specification.
if matches!(state, State::CookieEchoed(_)) {
return;
}
}
}
pub fn internal_close(&mut self, state: &mut State, error: ErrorKind, message: String) {
if !matches!(state, State::Closed) {
self.heartbeat_interval.stop();
self.heartbeat_timeout.stop();
if error == ErrorKind::NoError {
self.events.borrow_mut().add(SocketEvent::OnClosed());
} else {
self.events.borrow_mut().add(SocketEvent::OnAborted(error, message));
}
*state = State::Closed;
}
}
}