#![allow(non_snake_case)]
#![allow(non_camel_case_types)]
use crate::epoll::EpollFlags;
use crate::errors::{SRT_ERRNO, SRT_ERRNO::*};
use crate::socket::{CSrtSocket, SocketData};
use std::mem;
use std::{
cell::RefCell,
cmp::min,
collections::BTreeMap,
ffi::CString,
fmt::{Debug, Display},
io::Write,
mem::{size_of, take, MaybeUninit},
os::raw::{c_char, c_int},
ptr::{self, NonNull},
slice::{from_raw_parts, from_raw_parts_mut},
sync::{
atomic::{AtomicI32, Ordering},
Arc, Mutex, RwLock,
},
time::{Duration, Instant},
};
use srt_tokio::bind_socket;
use tokio::{
runtime::{self, Runtime},
time::timeout,
};
use bytes::Bytes;
use futures::StreamExt;
use lazy_static::lazy_static;
use log::error;
use os_socketaddr::OsSocketAddr;
use srt_protocol::options::SrtVersion;
use crate::epoll::SrtEpoll;
pub type SYSSOCKET = c_int;
pub type SRTSOCKET = CSrtSocket;
pub type srt_listen_callback_fn = extern "C" fn(
opaq: *mut (),
ns: SRTSOCKET,
c_int,
peeraddr: *const libc::sockaddr,
streamid: *const c_char,
) -> c_int;
pub struct SrtError {
errno: SRT_ERRNO,
context: String,
}
impl SrtError {
pub fn new(errno: SRT_ERRNO, context: impl Display) -> SrtError {
SrtError {
errno,
context: context.to_string(),
}
}
}
impl From<SRT_ERRNO> for SrtError {
fn from(e: SRT_ERRNO) -> Self {
SrtError::new(e, "")
}
}
#[repr(C)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SRT_SOCKOPT {
SRTO_MSS = 0, SRTO_SNDSYN = 1, SRTO_RCVSYN = 2, SRTO_ISN = 3, SRTO_FC = 4, SRTO_SNDBUF = 5, SRTO_RCVBUF = 6, SRTO_LINGER = 7, SRTO_UDP_SNDBUF = 8, SRTO_UDP_RCVBUF = 9, SRTO_RENDEZVOUS = 12, SRTO_SNDTIMEO = 13, SRTO_RCVTIMEO = 14, SRTO_REUSEADDR = 15, SRTO_MAXBW = 16, SRTO_STATE = 17, SRTO_EVENT = 18, SRTO_SNDDATA = 19, SRTO_RCVDATA = 20, SRTO_SENDER = 21, SRTO_TSBPDMODE = 22, SRTO_LATENCY = 23, SRTO_INPUTBW = 24, SRTO_OHEADBW, SRTO_PASSPHRASE = 26, SRTO_PBKEYLEN, SRTO_KMSTATE, SRTO_IPTTL = 29, SRTO_IPTOS, SRTO_TLPKTDROP = 31, SRTO_SNDDROPDELAY = 32, SRTO_NAKREPORT = 33, SRTO_VERSION = 34, SRTO_PEERVERSION, SRTO_CONNTIMEO = 36, SRTO_DRIFTTRACER = 37, SRTO_MININPUTBW = 38, SRTO_SNDKMSTATE = 40, SRTO_RCVKMSTATE, SRTO_LOSSMAXTTL, SRTO_RCVLATENCY, SRTO_PEERLATENCY, SRTO_MINVERSION, SRTO_STREAMID, SRTO_CONGESTION, SRTO_MESSAGEAPI, SRTO_PAYLOADSIZE, SRTO_TRANSTYPE = 50, SRTO_KMREFRESHRATE, SRTO_KMPREANNOUNCE, SRTO_ENFORCEDENCRYPTION, SRTO_IPV6ONLY, SRTO_PEERIDLETIMEO, SRTO_BINDTODEVICE, SRTO_PACKETFILTER = 60, SRTO_RETRANSMITALGO = 61,
SRTO_E_SIZE, }
#[repr(C)]
pub struct SRT_TRACEBSTATS {
msTimeStamp: i64, pktSentTotal: i64, pktRecvTotal: i64, pktSndLossTotal: c_int, pktRcvLossTotal: c_int, pktRetransTotal: c_int, pktSentACKTotal: c_int, pktRecvACKTotal: c_int, pktSentNAKTotal: c_int, pktRecvNAKTotal: c_int, usSndDurationTotal: i64, pktSndDropTotal: c_int, pktRcvDropTotal: c_int, pktRcvUndecryptTotal: c_int, byteSentTotal: u64, byteRecvTotal: u64, byteRcvLossTotal: u64, byteRetransTotal: u64, byteSndDropTotal: u64, byteRcvDropTotal: u64, byteRcvUndecryptTotal: u64,
pktSent: i64, pktRecv: i64, pktSndLoss: c_int, pktRcvLoss: c_int, pktRetrans: c_int, pktRcvRetrans: c_int, pktSentACK: c_int, pktRecvACK: c_int, pktSentNAK: c_int, pktRecvNAK: c_int, mbpsSendRate: f64, mbpsRecvRate: f64, usSndDuration: i64, pktReorderDistance: c_int, pktRcvAvgBelatedTime: f64, pktRcvBelated: i64, pktSndDrop: c_int, pktRcvDrop: c_int, pktRcvUndecrypt: c_int, byteSent: u64, byteRecv: u64, byteRcvLoss: u64, byteRetrans: u64, byteSndDrop: u64, byteRcvDrop: u64, byteRcvUndecrypt: u64,
usPktSndPeriod: f64, pktFlowWindow: c_int, pktCongestionWindow: c_int, pktFlightSize: c_int, msRTT: f64, mbpsBandwidth: f64, byteAvailSndBuf: c_int, byteAvailRcvBuf: c_int, mbpsMaxBW: f64, byteMSS: c_int,
pktSndBuf: c_int, byteSndBuf: c_int, msSndBuf: c_int, msSndTsbPdDelay: c_int,
pktRcvBuf: c_int, byteRcvBuf: c_int, msRcvBuf: c_int, msRcvTsbPdDelay: c_int,
pktSndFilterExtraTotal: c_int, pktRcvFilterExtraTotal: c_int, pktRcvFilterSupplyTotal: c_int, pktRcvFilterLossTotal: c_int,
pktSndFilterExtra: c_int, pktRcvFilterExtra: c_int, pktRcvFilterSupply: c_int, pktRcvFilterLoss: c_int, pktReorderTolerance: c_int,
pktSentUniqueTotal: i64, pktRecvUniqueTotal: i64, byteSentUniqueTotal: u64, byteRecvUniqueTotal: u64,
pktSentUnique: i64, pktRecvUnique: i64, byteSentUnique: u64, byteRecvUnique: u64, }
#[repr(C)]
#[repr(C)]
pub struct SRT_MSGCTRL {
flags: c_int,
msgttl: c_int,
inorder: c_int,
boundary: c_int,
srctime: i64,
pktseq: i32,
msgno: i32,
grpdata: *const (),
grpdata_size: usize,
}
unsafe impl Send for SRT_MSGCTRL {}
unsafe impl Sync for SRT_MSGCTRL {}
#[repr(C)]
pub enum SRT_TRANSTYPE {
SRTT_LIVE,
SRTT_FILE,
SRTT_INVALID,
}
pub const LOG_EMERG: c_int = 0;
pub const LOG_ALERT: c_int = 1;
pub const LOG_CRIT: c_int = 2;
pub const LOG_ERR: c_int = 3;
pub const LOG_WARNING: c_int = 4;
pub const LOG_NOTICE: c_int = 5;
pub const LOG_INFO: c_int = 6;
pub const LOG_DEBUG: c_int = 7;
const SRT_SUCCESS: c_int = 0;
pub const SRT_ERROR: c_int = -1;
pub const SRT_LIVE_DEF_PLSIZE: c_int = 1316;
pub const SRT_INVALID_SOCK: CSrtSocket = CSrtSocket::INVALID;
pub const SRT_SYNC_CLOCK_STDCXX_STEADY: c_int = 0;
pub const SRT_SYNC_CLOCK_GETTIME_MONOTONIC: c_int = 1;
pub const SRT_SYNC_CLOCK_WINQPC: c_int = 2;
pub const SRT_SYNC_CLOCK_MACH_ABSTIME: c_int = 3;
pub const SRT_SYNC_CLOCK_POSIX_GETTIMEOFDAY: c_int = 4;
pub const SRT_SYNC_CLOCK_AMD64_RDTSC: c_int = 5;
pub const SRT_SYNC_CLOCK_IA32_RDTSC: c_int = 6;
pub const SRT_SYNC_CLOCK_IA64_ITC: c_int = 7;
lazy_static! {
pub static ref TOKIO_RUNTIME: Runtime = {
let rt = runtime::Builder::new_multi_thread().worker_threads(3).enable_all().build().unwrap();
#[cfg(feature = "console-subscriber")]
console_subscriber::init();
rt
};
static ref SOCKETS: RwLock<BTreeMap<CSrtSocket, Arc<Mutex<SocketData>>>> =
RwLock::new(BTreeMap::new());
static ref BASE_TIME: Instant = Instant::now();
}
pub fn get_sock(sock: CSrtSocket) -> Option<Arc<Mutex<SocketData>>> {
SOCKETS.read().unwrap().get(&sock).cloned()
}
#[no_mangle]
pub extern "C" fn srt_startup() -> c_int {
lazy_static::initialize(&TOKIO_RUNTIME);
lazy_static::initialize(&SOCKETS);
lazy_static::initialize(&BASE_TIME);
let _ = pretty_env_logger::try_init();
SRT_SUCCESS
}
#[no_mangle]
pub extern "C" fn srt_cleanup() -> c_int {
SOCKETS.write().unwrap().clear();
SRT_SUCCESS
}
#[no_mangle]
pub extern "C" fn srt_getversion() -> u32 {
SrtVersion::CURRENT.to_u32()
}
#[no_mangle]
pub extern "C" fn srt_clock_type() -> c_int {
SRT_SYNC_CLOCK_GETTIME_MONOTONIC
}
#[no_mangle]
pub extern "C" fn srt_bind(
sock: SRTSOCKET,
name: Option<&libc::sockaddr>,
namelen: c_int,
) -> c_int {
let name = match name {
Some(name) => name,
None => return set_error(SrtError::new(SRT_EINVPARAM, "Invalid socket address")),
};
let name = unsafe { OsSocketAddr::copy_from_raw(name, namelen.try_into().unwrap()) };
let name = match name.into_addr() {
Some(name) => name,
None => return set_error(SRT_EINVPARAM.into()),
};
let sock = match get_sock(sock) {
None => return set_error(SRT_EINVSOCK.into()),
Some(sock) => sock,
};
let (mut opts, sid, aopts) = {
let mut l = sock.lock().unwrap();
match mem::replace(&mut *l, SocketData::InvalidIntermediateState) {
SocketData::Initialized(opts, sid, aopts) => (opts, sid, aopts),
other => {
*l = other;
return set_error(SRT_ECONNSOCK.into());
}
}
};
opts.connect.local = name;
let socket = TOKIO_RUNTIME.block_on(async {
bind_socket(&opts)
.await
.map_err(|e| SrtError::new(SRT_EBINDCONFLICT, format!("{}", e)))
});
let socket = match socket {
Ok(socket) => socket,
Err(e) => return set_error(e),
};
let mut l = sock.lock().unwrap();
*l = SocketData::Bound(opts, socket, sid, aopts);
SRT_SUCCESS
}
#[no_mangle]
pub extern "C" fn srt_listen(sock: SRTSOCKET, _backlog: c_int) -> c_int {
let sock = match get_sock(sock) {
None => return set_error(SRT_EINVSOCK.into()),
Some(sock) => sock,
};
let mut l = sock.lock().unwrap();
handle_result(l.listen(sock.clone()))
}
pub type SRT_EPOLL_OPT = c_int;
pub const SRT_EPOLL_OPT_NONE: c_int = 0x0;
pub const SRT_EPOLL_IN: c_int = 0x1;
pub const SRT_EPOLL_OUT: c_int = 0x4;
pub const SRT_EPOLL_ERR: c_int = 0x8;
pub const SRT_EPOLL_UPDATE: c_int = 0x10;
pub const SRT_EPOLL_ET: c_int = 1 << 31;
lazy_static! {
static ref EPOLLS: RwLock<BTreeMap<c_int, Arc<Mutex<SrtEpoll>>>> = RwLock::new(BTreeMap::new());
}
static NEXT_EPOLLID: AtomicI32 = AtomicI32::new(1);
fn get_epoll(id: c_int) -> Option<Arc<Mutex<SrtEpoll>>> {
EPOLLS.read().unwrap().get(&id).cloned()
}
#[no_mangle]
pub extern "C" fn srt_epoll_create() -> c_int {
let id = NEXT_EPOLLID.fetch_add(1, Ordering::SeqCst) as c_int;
EPOLLS
.write()
.unwrap()
.entry(id)
.or_insert_with(|| Arc::new(Mutex::new(SrtEpoll::default())));
id
}
#[no_mangle]
pub extern "C" fn srt_epoll_add_usock(
eid: c_int,
sock: SRTSOCKET,
events: Option<&c_int>,
) -> c_int {
let epoll = match get_epoll(eid) {
None => return set_error(SRT_EINVPOLLID.into()),
Some(sock) => sock,
};
let flags = match events.copied().and_then(EpollFlags::from_bits) {
Some(flags) => flags,
None => return set_error(SRT_EINVPARAM.into()),
};
let mut l = epoll.lock().unwrap();
l.add_srt(sock, flags);
SRT_SUCCESS
}
#[no_mangle]
pub extern "C" fn srt_epoll_add_ssock(eid: c_int, s: SYSSOCKET, events: Option<&c_int>) -> c_int {
let epoll = match get_epoll(eid) {
None => return set_error(SRT_EINVPOLLID.into()),
Some(sock) => sock,
};
let flags = match events.copied().and_then(EpollFlags::from_bits) {
Some(flags) => flags,
None => return set_error(SRT_EINVPARAM.into()),
};
let mut l = epoll.lock().unwrap();
l.add_sys(s, flags);
SRT_SUCCESS
}
#[no_mangle]
pub extern "C" fn srt_epoll_remove_usock(eid: c_int, sock: SRTSOCKET) -> c_int {
let epoll = match get_epoll(eid) {
None => return set_error(SRT_EINVPOLLID.into()),
Some(sock) => sock,
};
let mut l = epoll.lock().unwrap();
handle_result(l.remove_srt(sock))
}
#[no_mangle]
pub extern "C" fn srt_epoll_update_usock(
eid: c_int,
u: SRTSOCKET,
events: Option<&c_int>,
) -> c_int {
let epoll = match get_epoll(eid) {
None => return set_error(SRT_EINVPOLLID.into()),
Some(sock) => sock,
};
let flags = match events.copied().and_then(EpollFlags::from_bits) {
Some(flags) => flags,
None => return set_error(SRT_EINVPARAM.into()),
};
let mut l = epoll.lock().unwrap();
handle_result(l.update_srt(u, flags))
}
#[no_mangle]
pub extern "C" fn srt_epoll_release(eid: c_int) -> c_int {
if EPOLLS.write().unwrap().remove(&eid).is_none() {
return set_error(SRT_EINVPOLLID.into());
}
SRT_SUCCESS
}
#[no_mangle]
pub unsafe extern "C" fn srt_epoll_wait(
eid: c_int,
readfds: *mut SRTSOCKET,
rnum: Option<&mut c_int>,
writefds: *mut SRTSOCKET,
wnum: Option<&mut c_int>,
msTimeOut: i64,
lrfds: *mut SYSSOCKET,
lrnum: Option<&mut c_int>,
lwfds: *mut SYSSOCKET,
lwnum: Option<&mut c_int>,
) -> c_int {
let epoll = match get_epoll(eid) {
None => return set_error(SRT_EINVPOLLID.into()),
Some(sock) => sock,
};
let mut l = epoll.lock().unwrap();
let srt_read = if !readfds.is_null() && rnum.is_some() {
from_raw_parts_mut(
readfds as *mut MaybeUninit<CSrtSocket>,
*rnum.as_deref().unwrap() as usize,
)
} else {
&mut []
};
let srt_write = if !writefds.is_null() && wnum.is_some() {
from_raw_parts_mut(
writefds as *mut MaybeUninit<CSrtSocket>,
*wnum.as_deref().unwrap() as usize,
)
} else {
&mut []
};
let sys_read = if !lrfds.is_null() && lrnum.is_some() {
from_raw_parts_mut(
lrfds as *mut MaybeUninit<SYSSOCKET>,
*lrnum.as_deref().unwrap() as usize,
)
} else {
&mut []
};
let sys_write = if !lwfds.is_null() && lwnum.is_some() {
from_raw_parts_mut(
lwfds as *mut MaybeUninit<SYSSOCKET>,
*lwnum.as_deref().unwrap() as usize,
)
} else {
&mut []
};
let timeout = msTimeOut.try_into().map(Duration::from_millis).ok();
match l.wait(srt_read, srt_write, sys_read, sys_write, timeout) {
Ok((srt_rnum, srt_wnum, sys_rnum, sys_wnum)) => {
if let Some(rnum) = rnum {
*rnum = srt_rnum.try_into().unwrap();
}
if let Some(wnum) = wnum {
*wnum = srt_wnum.try_into().unwrap();
}
if let Some(lrnum) = lrnum {
*lrnum = sys_rnum.try_into().unwrap();
}
if let Some(lwnum) = lwnum {
*lwnum = sys_wnum.try_into().unwrap();
}
(srt_rnum + srt_wnum + sys_rnum + sys_wnum)
.try_into()
.unwrap()
}
Err(e) => set_error(e),
}
}
#[no_mangle]
pub extern "C" fn srt_epoll_uwait(
_eid: c_int,
_fdsSet: *mut SRT_EPOLL_EVENT,
_fdsSize: c_int,
_msTimeOut: i64,
) -> c_int {
todo!()
}
#[no_mangle]
pub extern "C" fn srt_connect(
sock: SRTSOCKET,
name: Option<&libc::sockaddr>,
namelen: c_int,
) -> c_int {
let name = match name {
Some(name) => name,
None => return set_error(SrtError::new(SRT_EINVPARAM, "Invalid socket address")),
};
let name = unsafe {
OsSocketAddr::copy_from_raw(
name,
min(
namelen.try_into().unwrap(),
size_of::<libc::sockaddr_in6>() as u32,
),
)
};
let name = match name.into_addr() {
Some(name) => name,
None => return set_error(SRT_EINVPARAM.into()),
};
let sock = match get_sock(sock) {
None => return set_error(SRT_EINVSOCK.into()),
Some(sock) => sock,
};
let l = sock.lock().unwrap();
handle_result(SocketData::connect(l, sock.clone(), name))
}
#[no_mangle]
pub extern "C" fn srt_accept(
sock: SRTSOCKET,
addr: Option<&mut libc::sockaddr>,
addrlen: Option<&mut c_int>,
) -> SRTSOCKET {
let addr = match (addr, addrlen) {
(None, None) | (None, Some(_)) => None,
(Some(addr), Some(addrlen)) => Some((addr, addrlen)),
(Some(_), None) => {
set_error(SRT_EINVPARAM.into());
return SRT_INVALID_SOCK;
}
};
let sock = match get_sock(sock) {
None => {
set_error(SRT_EINVSOCK.into());
return SRT_INVALID_SOCK;
}
Some(sock) => sock,
};
let l = sock.lock().unwrap();
match SocketData::accept(l, sock.clone()) {
Ok((sock, remote)) => {
if let Some((addr, len)) = addr {
let osa = OsSocketAddr::from(remote);
*addr = unsafe { *(osa.as_ptr()) };
*len = osa.len() as c_int;
}
sock
}
Err(e) => {
set_error(e);
SRT_INVALID_SOCK
}
}
}
fn set_error(err: SrtError) -> c_int {
LAST_ERROR_STR.with(|l| {
let mut m = l.borrow_mut();
let mut vec = take(&mut *m).into_bytes_with_nul();
vec.clear();
write!(&mut vec, "{:?}: {}", err.errno, err.context).unwrap();
vec.push(b'\0');
*m = CString::from_vec_with_nul(vec).unwrap();
});
LAST_ERROR.with(|l| *l.borrow_mut() = err.errno);
SRT_ERROR
}
fn handle_result(res: Result<(), SrtError>) -> c_int {
match res {
Ok(_) => SRT_SUCCESS,
Err(err) => set_error(err),
}
}
thread_local! {
pub static LAST_ERROR_STR: RefCell<CString> = RefCell::new(CString::new("(no error set on this thread)").unwrap());
pub static LAST_ERROR: RefCell<SRT_ERRNO> = const { RefCell::new(SRT_ERRNO::SRT_SUCCESS) };
}
#[no_mangle]
pub extern "C" fn srt_getlasterror(_errno_loc: *mut c_int) -> c_int {
LAST_ERROR.with(|l| *l.borrow()) as c_int
}
#[no_mangle]
pub extern "C" fn srt_getlasterror_str() -> *const c_char {
LAST_ERROR_STR.with(|f| f.borrow().as_c_str().as_ptr())
}
#[no_mangle]
pub extern "C" fn srt_send(sock: SRTSOCKET, buf: *const c_char, len: c_int) -> c_int {
srt_sendmsg2(sock, buf, len, None)
}
#[no_mangle]
pub extern "C" fn srt_sendmsg(
sock: SRTSOCKET,
buf: *const c_char,
len: c_int,
ttl: c_int,
inorder: c_int,
) -> c_int {
let ctrl = SRT_MSGCTRL {
msgttl: ttl,
inorder,
..srt_msgctrl_default
};
srt_sendmsg2(sock, buf, len, Some(&ctrl))
}
#[no_mangle]
pub extern "C" fn srt_sendmsg2(
sock: SRTSOCKET,
buf: *const c_char,
len: c_int,
_mctrl: Option<&SRT_MSGCTRL>,
) -> c_int {
let sock = match get_sock(sock) {
None => return set_error(SRT_EINVSOCK.into()),
Some(sock) => sock,
};
let mut l = sock.lock().unwrap();
match *l {
SocketData::Established(ref mut sock, _opts) => {
if sock
.try_send(
Instant::now(),
Bytes::copy_from_slice(unsafe {
from_raw_parts(buf as *const u8, len as usize)
}),
)
.is_err()
{
return set_error(SRT_ELARGEMSG.into());
}
}
_ => return set_error(SRT_ENOCONN.into()),
}
len
}
#[no_mangle]
pub extern "C" fn srt_recv(sock: SRTSOCKET, buf: *mut c_char, len: c_int) -> c_int {
let sock = match get_sock(sock) {
None => return set_error(SRT_EINVSOCK.into()),
Some(sock) => sock,
};
let bytes = unsafe { from_raw_parts_mut(buf as *mut u8, len as usize) };
let mut l = sock.lock().unwrap();
if let SocketData::Established(ref mut sock, opts) = *l {
TOKIO_RUNTIME.block_on(async {
let d = if opts.rcv_syn {
sock.next().await
} else {
match timeout(Duration::from_millis(10), sock.next()).await {
Err(_) => return set_error(SRT_EASYNCRCV.into()),
Ok(d) => d,
}
};
let (_, recvd) = match d {
Some(Ok(d)) => d,
Some(Err(e)) => return set_error(SrtError::new(SRT_ECONNLOST, e)), None => return set_error(SRT_ECONNLOST.into()),
};
if bytes.len() < recvd.len() {
error!("Receive buffer was not large enough, truncating...");
}
let bytes_to_write = min(bytes.len(), recvd.len());
bytes[..bytes_to_write].copy_from_slice(&recvd[..bytes_to_write]);
bytes_to_write as c_int
})
} else {
set_error(SRT_ENOCONN.into())
}
}
#[no_mangle]
pub extern "C" fn srt_recvmsg(sock: SRTSOCKET, buf: *mut c_char, len: c_int) -> c_int {
srt_recv(sock, buf, len)
}
#[no_mangle]
pub extern "C" fn srt_bstats(
_sock: SRTSOCKET,
_perf: &mut SRT_TRACEBSTATS,
_clear: c_int,
) -> c_int {
todo!()
}
pub fn insert_socket(data: SocketData) -> CSrtSocket {
let mut sockets = SOCKETS.write().unwrap();
let new_sockid = CSrtSocket::new_unused();
sockets.insert(new_sockid, Arc::new(Mutex::new(data)));
new_sockid
}
#[no_mangle]
pub static srt_msgctrl_default: SRT_MSGCTRL = SRT_MSGCTRL {
flags: 0,
msgttl: -1, inorder: 0, boundary: 0,
srctime: 0,
pktseq: -1,
msgno: -1,
grpdata: ptr::null(),
grpdata_size: 0,
};
#[no_mangle]
pub extern "C" fn srt_create_socket() -> SRTSOCKET {
insert_socket(SocketData::Initialized(
Default::default(),
None,
Default::default(),
))
}
#[no_mangle]
pub extern "C" fn srt_setloglevel(ll: c_int) {
let _level = match ll {
LOG_EMERG => log::Level::Error,
LOG_ALERT => log::Level::Error,
LOG_CRIT => log::Level::Error,
LOG_ERR => log::Level::Error,
LOG_WARNING => log::Level::Warn,
LOG_NOTICE => log::Level::Info,
LOG_INFO => log::Level::Info,
LOG_DEBUG => log::Level::Debug,
_ => return, };
}
#[no_mangle]
pub unsafe extern "C" fn srt_setsockopt(
sock: SRTSOCKET,
_level: c_int, optname: SRT_SOCKOPT,
optval: *const (),
optlen: c_int,
) -> c_int {
srt_setsockflag(sock, optname, optval, optlen)
}
#[no_mangle]
pub unsafe extern "C" fn srt_getsockopt(
sock: SRTSOCKET,
_level: c_int,
optname: SRT_SOCKOPT,
optval: *mut (),
optlen: Option<&mut c_int>,
) -> c_int {
srt_getsockflag(sock, optname, optval, optlen)
}
#[repr(C)]
pub enum SRT_SOCKSTATUS {
SRTS_INIT = 1,
SRTS_OPENED,
SRTS_LISTENING,
SRTS_CONNECTING,
SRTS_CONNECTED,
SRTS_BROKEN,
SRTS_CLOSING,
SRTS_CLOSED,
SRTS_NONEXIST,
}
#[repr(C)]
pub enum SRT_KM_STATE {
SRT_KM_S_UNSECURED = 0, SRT_KM_S_SECURING = 1, SRT_KM_S_SECURED = 2, SRT_KM_S_NOSECRET = 3, SRT_KM_S_BADSECRET = 4, }
#[no_mangle]
pub extern "C" fn srt_getsockstate(sock: SRTSOCKET) -> SRT_SOCKSTATUS {
use SRT_SOCKSTATUS::*;
let sock = match get_sock(sock) {
None => return SRTS_NONEXIST,
Some(sock) => sock,
};
let l = sock.lock().unwrap();
match *l {
SocketData::Initialized(_, _, _) => SRTS_INIT,
SocketData::Bound(_, _, _, _) => SRTS_INIT, SocketData::ConnectingNonBlocking(_, _) => SRTS_CONNECTING,
SocketData::Established(_, _) => SRTS_CONNECTED,
SocketData::Listening(_, _, _, _) => SRTS_LISTENING,
SocketData::ConnectFailed(_) => SRTS_BROKEN,
SocketData::Accepting(_) => SRTS_LISTENING,
SocketData::InvalidIntermediateState => SRTS_BROKEN,
SocketData::Closed => SRTS_CLOSED,
}
}
#[repr(C)]
pub struct SRT_EPOLL_EVENT {
fd: SRTSOCKET,
events: c_int,
}
#[no_mangle]
pub unsafe extern "C" fn srt_setsockflag(
sock: SRTSOCKET,
opt: SRT_SOCKOPT,
optval: *const (),
optlen: c_int,
) -> c_int {
let optval = NonNull::new(optval as *mut ());
let sock = match get_sock(sock) {
None => return set_error(SRT_EINVSOCK.into()),
Some(sock) => sock,
};
let mut sock = sock.lock().unwrap();
handle_result(sock.set_flag(opt, optval, optlen))
}
#[no_mangle]
pub unsafe extern "C" fn srt_getsockflag(
sock: SRTSOCKET,
opt: SRT_SOCKOPT,
optval: *mut (),
optlen: Option<&mut c_int>,
) -> c_int {
let optval = NonNull::new(optval);
let sock = match get_sock(sock) {
None => return set_error(SRT_EINVSOCK.into()),
Some(sock) => sock,
};
let optval_len = match optlen.as_ref().map(|&&mut i| i.try_into()) {
Some(Ok(len)) => len,
Some(Err(_)) => return set_error(SrtError::new(SRT_EINVPARAM, "optlen was negative")), None => 0,
};
let l = sock.lock().unwrap();
match l.get_flag(opt, optval, optval_len) {
Ok(len) => {
if let Some(ol) = optlen {
*ol = len.try_into().expect("returned >2GiB, strange");
}
SRT_SUCCESS
}
Err(e) => set_error(e),
}
}
#[no_mangle]
pub extern "C" fn srt_getsockname(
_sock: SRTSOCKET,
_name: *mut libc::sockaddr,
_namelen: *mut c_int,
) -> c_int {
todo!()
}
#[no_mangle]
pub extern "C" fn srt_getpeername(
_sock: SRTSOCKET,
_name: *mut libc::sockaddr,
_namelen: *mut c_int,
) -> c_int {
todo!()
}
#[no_mangle]
pub unsafe extern "C" fn srt_listen_callback(
sock: SRTSOCKET,
hook_fn: srt_listen_callback_fn,
hook_opaque: *mut (),
) -> c_int {
let sock = match get_sock(sock) {
None => return set_error(SRT_EINVSOCK.into()),
Some(sock) => sock,
};
let mut l = sock.lock().unwrap();
handle_result(l.listen_callback(hook_fn, hook_opaque))
}
#[no_mangle]
pub extern "C" fn srt_time_now() -> i64 {
(Instant::now() - *BASE_TIME)
.as_micros()
.try_into()
.expect("did not expect program to run for 2^63 us")
}
#[no_mangle]
pub extern "C" fn srt_close(socknum: SRTSOCKET) -> c_int {
let sock = match get_sock(socknum) {
None => return set_error(SRT_EINVSOCK.into()),
Some(sock) => sock,
};
let mut retcode = SRT_SUCCESS;
let mut l = sock.lock().unwrap();
match &mut *l {
SocketData::Established(ref mut s, _) => {
let res = TOKIO_RUNTIME.block_on(async move { s.close_and_finish().await });
if let Err(e) = res {
retcode = set_error(SrtError::new(
SRT_EINVOP,
format_args!("Failed to close socket: {e}"),
));
}
*l = SocketData::Closed
}
SocketData::Listening(ref mut listener, _, jh, _) => {
TOKIO_RUNTIME.block_on(async {
listener.close().await;
jh.await.unwrap();
});
*l = SocketData::Closed;
}
_ => (),
}
let mut sockets = SOCKETS.write().unwrap();
sockets.remove(&socknum);
retcode
}