use crate::api::handover::HandoverReadiness;
use crate::api::handover::SocketHandoverState;
use std::fmt;
use std::num::NonZeroU64;
use std::ops::Add;
use std::ops::Sub;
use std::time::Duration;
use thiserror::Error;
pub mod handover;
pub use crate::socket::Socket;
/// A point in time on the socket's timeline, stored as a `Duration` offset
/// from `SocketTime::zero()`.
///
/// All arithmetic on this type saturates: results are clamped to the range
/// `SocketTime::zero()..=SocketTime::infinite_future()` and never panic.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct SocketTime(Duration);

impl SocketTime {
    /// Returns the earliest representable time (an offset of `Duration::ZERO`).
    pub const fn zero() -> SocketTime {
        SocketTime(Duration::ZERO)
    }

    /// Returns the latest representable time (an offset of `Duration::MAX`).
    pub const fn infinite_future() -> SocketTime {
        SocketTime(Duration::MAX)
    }
}

impl Add<Duration> for SocketTime {
    type Output = SocketTime;

    /// Moves the time forward by `rhs`, saturating at
    /// `SocketTime::infinite_future()`.
    fn add(self, rhs: Duration) -> SocketTime {
        // `Duration + Duration` panics on overflow, which would make
        // `SocketTime::infinite_future() + d` panic for every non-zero `d`.
        // Saturate instead, matching the behaviour of both `Sub` impls.
        SocketTime(self.0.saturating_add(rhs))
    }
}

impl Sub<Duration> for SocketTime {
    type Output = SocketTime;

    /// Moves the time backward by `rhs`, saturating at `SocketTime::zero()`.
    fn sub(self, rhs: Duration) -> SocketTime {
        SocketTime(self.0.saturating_sub(rhs))
    }
}

impl Sub<SocketTime> for SocketTime {
    type Output = Duration;

    /// Returns the time elapsed from `rhs` to `self`, or `Duration::ZERO` when
    /// `rhs` is later than `self`.
    fn sub(self, rhs: SocketTime) -> Duration {
        self.0.saturating_sub(rhs.0)
    }
}
/// A non-zero 64-bit identifier, used by `SendOptions::lifecycle_id` and the
/// `SocketEvent::OnLifecycle*` events.
///
/// Both `Display` and `Debug` print the bare number.
#[derive(Clone, PartialEq, Eq)]
pub struct LifecycleId(NonZeroU64);

impl LifecycleId {
    /// Wraps `n`, returning `None` when `n` is zero.
    pub fn new(n: u64) -> Option<LifecycleId> {
        let non_zero = NonZeroU64::new(n)?;
        Some(LifecycleId(non_zero))
    }

    /// Wraps `n` without returning an `Option`.
    ///
    /// # Panics
    ///
    /// Panics when `n` is zero.
    pub fn from(n: u64) -> LifecycleId {
        debug_assert!(n != 0);
        LifecycleId::new(n).unwrap()
    }

    /// Returns the identifier as a plain `u64`.
    pub fn value(&self) -> u64 {
        self.0.get()
    }
}

impl fmt::Display for LifecycleId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.0.get())
    }
}

impl fmt::Debug for LifecycleId {
    /// Delegates to the `Display` implementation.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        <Self as fmt::Display>::fmt(self, f)
    }
}
/// Stream identifier: a thin wrapper around the raw `u16` value.
///
/// Both `Display` and `Debug` print the bare number.
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct StreamId(pub u16);

impl fmt::Display for StreamId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let StreamId(raw) = *self;
        write!(f, "{}", raw)
    }
}

impl fmt::Debug for StreamId {
    /// Delegates to the `Display` implementation.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        <Self as fmt::Display>::fmt(self, f)
    }
}
/// Identifier carried in `Message::ppid`: a thin wrapper around the raw `u32`
/// value.
///
/// Both `Display` and `Debug` print the bare number.
#[derive(Copy, Clone, PartialEq, Eq, Hash)]
pub struct PpId(pub u32);

impl fmt::Display for PpId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let PpId(raw) = *self;
        write!(f, "{}", raw)
    }
}

impl fmt::Debug for PpId {
    /// Delegates to the `Display` implementation.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        <Self as fmt::Display>::fmt(self, f)
    }
}
/// Per-message options, passed by reference to `DcSctpSocket::send` and
/// `DcSctpSocket::send_many`.
///
/// The derived `Default` leaves `unordered` as `false` and every optional
/// field as `None`. `Debug` and `Clone` are derived so options can be logged
/// and reused across calls.
#[derive(Clone, Debug, Default)]
pub struct SendOptions {
    /// NOTE(review): presumably requests unordered delivery when `true` —
    /// confirm against the socket implementation.
    pub unordered: bool,
    /// Optional lifetime of the message.
    /// NOTE(review): presumably bounds how long the message may be kept for
    /// (re)transmission — confirm.
    pub lifetime: Option<Duration>,
    /// Optional retransmission limit for the message.
    /// NOTE(review): the meaning of `None` (presumably "no limit") is not
    /// visible here — confirm.
    pub max_retransmissions: Option<u16>,
    /// Optional identifier attached to the message.
    /// NOTE(review): presumably reported back through the
    /// `SocketEvent::OnLifecycle*` events — confirm.
    pub lifecycle_id: Option<LifecycleId>,
}
/// A message: payload bytes together with the stream and ppid they are
/// associated with.
#[derive(Debug)]
pub struct Message {
    /// Stream the message is associated with.
    pub stream_id: StreamId,
    /// Ppid value associated with the payload.
    pub ppid: PpId,
    /// The message bytes.
    pub payload: Vec<u8>,
}

impl Message {
    /// Assembles a `Message` from its three parts, taking ownership of
    /// `payload`.
    pub fn new(stream_id: StreamId, ppid: PpId, payload: Vec<u8>) -> Self {
        Self {
            payload,
            ppid,
            stream_id,
        }
    }
}
/// Identifier of an alternate error detection method, stored as a raw `u32`.
///
/// Configured through `Options::zero_checksum_alternate_error_detection_method`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ZeroChecksumAlternateErrorDetectionMethod(pub u32);
/// Method value `0`; this is the value used by `Options::default()`.
pub const ZERO_CHECKSUM_ALTERNATE_ERROR_DETECTION_METHOD_NONE:
ZeroChecksumAlternateErrorDetectionMethod = ZeroChecksumAlternateErrorDetectionMethod(0);
/// Method value `1`.
/// NOTE(review): the name suggests error detection is delegated to a DTLS
/// lower layer — confirm against the protocol specification.
pub const ZERO_CHECKSUM_ALTERNATE_ERROR_DETECTION_METHOD_LOWER_LAYER_DTLS:
ZeroChecksumAlternateErrorDetectionMethod = ZeroChecksumAlternateErrorDetectionMethod(1);
/// Identifies an SCTP implementation; reported for the peer through
/// `Metrics::peer_implementation`.
///
/// Equality on this fieldless enum is total, so `Eq` is derived alongside
/// `PartialEq`.
///
/// NOTE(review): how a peer is classified into these variants is not visible
/// in this file — confirm against the socket implementation.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SctpImplementation {
    Unknown,
    DcsctpRs,
    DcsctpCc,
    UsrSctp,
    Other,
}
/// Category of an error, carried as the first field of
/// `SocketEvent::OnAborted` and `SocketEvent::OnError`.
///
/// Equality on this fieldless enum is total, so `Eq` is derived alongside
/// `PartialEq`.
///
/// NOTE(review): the conditions that produce each variant are decided by the
/// socket implementation, which is not part of this file.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ErrorKind {
    NoError,
    TooManyRetries,
    NotConnected,
    ParseFailed,
    WrongSequence,
    PeerReported,
    ProtocolViolation,
    ResourceExhaustion,
    UnsupportedOperation,
}
/// Socket configuration; `DcSctpSocket::options` returns it by value.
///
/// Start from `Options::default()` and override individual fields. The
/// "Default" noted on each field is the value assigned by `Options::default()`.
/// `Debug` is derived so a configuration can be logged.
///
/// NOTE(review): the precise effect of each setting is defined by the socket
/// implementation, which is not part of this file.
#[derive(Clone, Debug)]
pub struct Options {
    /// Default: `5000`.
    pub local_port: u16,
    /// Default: `5000`.
    pub remote_port: u16,
    /// Default: `u16::MAX`.
    pub announced_maximum_incoming_streams: u16,
    /// Default: `u16::MAX`.
    pub announced_maximum_outgoing_streams: u16,
    /// Default: `1191`.
    pub mtu: usize,
    /// Default: `256 * 1024`. Can be changed on a live socket through
    /// `DcSctpSocket::set_max_message_size`.
    pub max_message_size: usize,
    /// Default: `256`.
    pub default_stream_priority: u16,
    /// Default: `5 * 1024 * 1024`.
    pub max_receiver_window_buffer_size: usize,
    /// Default: `2_000_000`.
    pub max_send_buffer_size: usize,
    /// Default: `2_000_000`.
    pub per_stream_send_queue_limit: usize,
    /// Default: `1_800_000`.
    pub total_buffered_amount_low_threshold: usize,
    /// Default: `0`.
    pub default_stream_buffered_amount_low_threshold: usize,
    /// Default: 60 seconds.
    pub rtt_max: Duration,
    /// Default: 500 milliseconds.
    pub rto_initial: Duration,
    /// Default: 60 seconds.
    pub rto_max: Duration,
    /// Default: 400 milliseconds.
    pub rto_min: Duration,
    /// Default: 1 second.
    pub t1_init_timeout: Duration,
    /// Default: 1 second.
    pub t1_cookie_timeout: Duration,
    /// Default: 1 second.
    pub t2_shutdown_timeout: Duration,
    /// Default: `None`.
    pub max_timer_backoff_duration: Option<Duration>,
    /// Default: 30 seconds.
    pub heartbeat_interval: Duration,
    /// Default: 200 milliseconds.
    pub delayed_ack_max_timeout: Duration,
    /// Default: 220 milliseconds.
    pub min_rtt_variance: Duration,
    /// Default: `10`.
    pub cwnd_mtus_initial: usize,
    /// Default: `4`.
    pub cwnd_mtus_min: usize,
    /// Default: `6`.
    pub avoid_fragmentation_cwnd_mtus: usize,
    /// Default: `4`.
    pub max_burst: i32,
    /// Default: `Some(10)`.
    pub max_retransmissions: Option<u32>,
    /// Default: `Some(8)`.
    pub max_init_retransmits: Option<u32>,
    /// Default: `true`.
    pub enable_partial_reliability: bool,
    /// Default: `false`.
    pub enable_message_interleaving: bool,
    /// Default: `true`.
    pub heartbeat_interval_include_rtt: bool,
    /// Default: `ZERO_CHECKSUM_ALTERNATE_ERROR_DETECTION_METHOD_NONE`.
    pub zero_checksum_alternate_error_detection_method: ZeroChecksumAlternateErrorDetectionMethod,
    /// Default: `false`.
    pub disable_checksum_verification: bool,
}
impl Default for Options {
    /// Returns the stock configuration; fields are listed in declaration order.
    fn default() -> Self {
        const KIB: usize = 1024;
        const MIB: usize = 1024 * KIB;
        let secs = Duration::from_secs;
        let millis = Duration::from_millis;

        Self {
            // Ports and stream limits.
            local_port: 5000,
            remote_port: 5000,
            announced_maximum_incoming_streams: u16::MAX,
            announced_maximum_outgoing_streams: u16::MAX,

            // Sizes and buffering.
            mtu: 1191,
            max_message_size: 256 * KIB,
            default_stream_priority: 256,
            max_receiver_window_buffer_size: 5 * MIB,
            max_send_buffer_size: 2_000_000,
            per_stream_send_queue_limit: 2_000_000,
            total_buffered_amount_low_threshold: 1_800_000,
            default_stream_buffered_amount_low_threshold: 0,

            // Timers.
            rtt_max: secs(60),
            rto_initial: millis(500),
            rto_max: secs(60),
            rto_min: millis(400),
            t1_init_timeout: secs(1),
            t1_cookie_timeout: secs(1),
            t2_shutdown_timeout: secs(1),
            max_timer_backoff_duration: None,
            heartbeat_interval: secs(30),
            delayed_ack_max_timeout: millis(200),
            min_rtt_variance: millis(220),

            // Congestion window and retransmission limits.
            cwnd_mtus_initial: 10,
            cwnd_mtus_min: 4,
            avoid_fragmentation_cwnd_mtus: 6,
            max_burst: 4,
            max_retransmissions: Some(10),
            max_init_retransmits: Some(8),

            // Feature switches.
            enable_partial_reliability: true,
            enable_message_interleaving: false,
            heartbeat_interval_include_rtt: true,
            zero_checksum_alternate_error_detection_method:
                ZERO_CHECKSUM_ALTERNATE_ERROR_DETECTION_METHOD_NONE,
            disable_checksum_verification: false,
        }
    }
}
/// Events handed to the caller; `DcSctpSocket::poll_event` returns them one at
/// a time.
///
/// NOTE(review): when each event is emitted is decided by the socket
/// implementation, which is not part of this file; the notes below describe
/// only the payload types.
#[derive(Debug)]
pub enum SocketEvent {
/// Carries a byte buffer.
/// NOTE(review): presumably a serialized packet the caller must transmit to
/// the peer — confirm.
SendPacket(Vec<u8>),
OnConnected(),
OnClosed(),
OnConnectionRestarted(),
/// Carries an `ErrorKind` and a `String` (presumably a human-readable
/// reason — confirm).
OnAborted(ErrorKind, String),
/// Carries an `ErrorKind` and a `String` (presumably a human-readable
/// reason — confirm).
OnError(ErrorKind, String),
/// Carries the `StreamId` the event refers to.
OnBufferedAmountLow(StreamId),
OnTotalBufferedAmountLow(),
/// Carries the list of `StreamId`s the event refers to.
OnStreamsResetFailed(Vec<StreamId>),
/// Carries the list of `StreamId`s the event refers to.
OnStreamsResetPerformed(Vec<StreamId>),
/// Carries the list of `StreamId`s the event refers to.
OnIncomingStreamReset(Vec<StreamId>),
/// Carries the `LifecycleId` of the message the event refers to.
OnLifecycleMessageFullySent(LifecycleId),
/// Carries the `LifecycleId` of the message the event refers to.
OnLifecycleMessageMaybeExpired(LifecycleId),
/// Carries the `LifecycleId` of the message the event refers to.
OnLifecycleMessageExpired(LifecycleId),
/// Carries the `LifecycleId` of the message the event refers to.
OnLifecycleMessageDelivered(LifecycleId),
/// Carries the `LifecycleId` the event refers to.
OnLifecycleEnd(LifecycleId),
}
/// Connection state of a socket, returned by value from `DcSctpSocket::state`.
///
/// The enum is fieldless, so it derives `Clone`, `Copy` and `Eq`. Callers can
/// keep a snapshot of the state and compare it later without moving it.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SocketState {
    Closed,
    Connecting,
    Connected,
    ShuttingDown,
}
/// Error returned by `DcSctpSocket::send`; also the per-message error stored
/// in `BatchSendError`.
#[derive(Debug, Error, PartialEq)]
pub enum SendError {
/// The message payload was empty.
#[error("message payload cannot be empty")]
EmptyPayload,
/// The message size `len` exceeds the configured `max_message_size`, given
/// as `limit`.
#[error("message size ({len}) exceeds configured max_message_size ({limit})")]
MessageTooLarge { len: usize, limit: usize },
/// The send queue is full.
#[error("send queue is full")]
ResourceExhaustion,
/// The socket is shutting down or closed.
#[error("socket is shutting down or closed")]
ShuttingDown,
}
/// Error returned by `DcSctpSocket::send_many`: a list of
/// `(usize, SendError)` pairs, printed with their `Debug` representation.
///
/// NOTE(review): the `usize` is presumably the index of the failing message
/// within the batch — confirm against the socket implementation.
#[derive(Debug, Error)]
#[error("batch send failed with errors: {0:?}")]
pub struct BatchSendError(pub Vec<(usize, SendError)>);
/// Error returned by `DcSctpSocket::reset_streams`.
#[derive(Debug, Error, PartialEq)]
pub enum ResetStreamsError {
/// The socket is not connected.
#[error("socket is not connected")]
NotConnected,
/// The peer does not support stream resetting.
#[error("peer does not support stream resetting")]
NotSupported,
}
/// Error returned by `DcSctpSocket::get_handover_state_and_close`.
#[derive(Debug, Error, PartialEq)]
pub enum HandoverError {
/// The socket is not in a ready state for handover. The variant carries a
/// `HandoverReadiness` value; the `Display` message does not include it, so
/// inspect the payload (or use `Debug`) for details.
#[error("socket is not in a ready state for handover")]
NotReady(HandoverReadiness),
}
/// Error returned by `DcSctpSocket::restore_from_state`.
#[derive(Debug, Error, PartialEq)]
pub enum RestoreError {
/// State cannot be restored because the socket is not closed.
#[error("cannot restore state: socket is not closed")]
SocketNotClosed,
}
/// Socket counters and negotiated parameters, returned by
/// `DcSctpSocket::get_metrics`.
///
/// `Debug` and `Clone` are derived so a snapshot can be logged and retained.
///
/// NOTE(review): the `tx_` / `rx_` / `rtx_` prefixes presumably stand for
/// transmitted / received / retransmitted. The exact meaning of each field is
/// defined by the socket implementation, which is not part of this file —
/// confirm there.
#[derive(Clone, Debug)]
pub struct Metrics {
    pub tx_packets_count: usize,
    pub tx_messages_count: usize,
    pub rtx_packets_count: usize,
    pub rtx_bytes_count: u64,
    pub cwnd_bytes: usize,
    pub srtt: Duration,
    pub unack_data_count: usize,
    pub rx_packets_count: usize,
    pub rx_messages_count: usize,
    pub peer_rwnd_bytes: u32,
    /// Which SCTP implementation the peer was identified as.
    pub peer_implementation: SctpImplementation,
    pub uses_message_interleaving: bool,
    pub uses_zero_checksum: bool,
    pub negotiated_maximum_incoming_streams: u16,
    pub negotiated_maximum_outgoing_streams: u16,
}
/// Interface of a socket that the caller drives explicitly: packets and the
/// current time are pushed in, events and messages are polled out.
///
/// NOTE(review): this file contains only the signatures. Notes marked
/// "presumably" below are inferred from names and types — confirm them against
/// the implementing type.
pub trait DcSctpSocket {
/// Returns the next `SocketEvent`, or `None` (presumably when no event is
/// pending).
fn poll_event(&mut self) -> Option<SocketEvent>;
/// Returns the next `Message`, or `None` (presumably when no received
/// message is ready).
fn get_next_message(&mut self) -> Option<Message>;
/// Feeds the bytes of `packet` into the socket (presumably a packet
/// received from the peer).
fn handle_input(&mut self, packet: &[u8]);
/// Informs the socket that the current time is `now`.
fn advance_time(&mut self, now: SocketTime);
/// Returns a `SocketTime` (presumably the next time at which
/// `advance_time` should be called).
fn poll_timeout(&self) -> SocketTime;
/// Presumably starts connecting to the peer.
fn connect(&mut self);
/// Restores the socket from `state`; fails with `RestoreError` (its only
/// variant reports that the socket is not closed).
fn restore_from_state(&mut self, state: &SocketHandoverState) -> Result<(), RestoreError>;
/// Presumably starts a graceful shutdown.
fn shutdown(&mut self);
/// Presumably closes the socket immediately.
fn close(&mut self);
/// Returns the current `SocketState`.
fn state(&self) -> SocketState;
/// Returns a message count (presumably the number of messages available
/// through `get_next_message`).
fn messages_ready_count(&self) -> usize;
/// Returns the socket's `Options` by value.
fn options(&self) -> Options;
/// Sets the maximum message size to `max_message_size`.
fn set_max_message_size(&mut self, max_message_size: usize);
/// Sets the priority of stream `stream_id` to `priority`.
fn set_stream_priority(&mut self, stream_id: StreamId, priority: u16);
/// Returns the priority of stream `stream_id`.
fn get_stream_priority(&self, stream_id: StreamId) -> u16;
/// Sends `message` using `send_options`; fails with a `SendError`.
fn send(&mut self, message: Message, send_options: &SendOptions) -> Result<(), SendError>;
/// Sends every message in `messages` using the same `send_options`; fails
/// with a `BatchSendError` listing `(usize, SendError)` pairs.
fn send_many(
&mut self,
messages: Vec<Message>,
send_options: &SendOptions,
) -> Result<(), BatchSendError>;
/// Requests a reset of the given outgoing streams; fails with
/// `ResetStreamsError`.
fn reset_streams(&mut self, outgoing_streams: &[StreamId]) -> Result<(), ResetStreamsError>;
/// Returns the buffered amount for stream `stream_id`.
fn buffered_amount(&self, stream_id: StreamId) -> usize;
/// Returns the buffered-amount-low threshold of stream `stream_id`.
fn buffered_amount_low_threshold(&self, stream_id: StreamId) -> usize;
/// Sets the buffered-amount-low threshold of stream `stream_id` to `bytes`.
fn set_buffered_amount_low_threshold(&mut self, stream_id: StreamId, bytes: usize);
/// Returns the current `Metrics`, or `None` (presumably when no metrics
/// are available, e.g. without an established connection).
fn get_metrics(&self) -> Option<Metrics>;
/// Returns the socket's `HandoverReadiness`.
fn get_handover_readiness(&self) -> HandoverReadiness;
/// Returns the `SocketHandoverState`, or `HandoverError::NotReady` with the
/// readiness value. Presumably closes the socket on success, as the name
/// indicates.
fn get_handover_state_and_close(&mut self) -> Result<SocketHandoverState, HandoverError>;
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration uses port 5000 on both the local and the
    /// remote side.
    #[test]
    fn default_options() {
        let Options {
            local_port,
            remote_port,
            ..
        } = Options::default();

        assert_eq!(local_port, 5000);
        assert_eq!(remote_port, 5000);
    }
}