use librist_sys::*;
use std::ffi::{CString, c_void, CStr};
use std::os::raw::{c_char, c_int};
use bitflags::bitflags;
/// RIST profile selected when a receiver context is created.
///
/// Each variant converts to the matching `rist_profile_RIST_PROFILE_*`
/// constant through the `From<Profile> for rist_profile` impl.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum Profile {
    /// Converts to `rist_profile_RIST_PROFILE_ADVANCED`.
    Advanced,
    /// Converts to `rist_profile_RIST_PROFILE_MAIN`.
    Main,
    /// Converts to `rist_profile_RIST_PROFILE_SIMPLE`.
    Simple,
}
/// Errors reported by this wrapper.
///
/// Every variant names the operation that failed; no further detail is
/// carried because the underlying C calls only report a status code.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RistError {
    /// `rist_receiver_create` (or the data-callback registration done during creation) failed.
    ReceiverCreateError,
    /// `rist_logging_set` failed.
    LoggingSetError,
    /// A string could not be converted to a C string (interior NUL byte).
    InvalidCString,
    /// `rist_parse_address` failed.
    ParseAddressError,
    /// `rist_peer_create` failed.
    PeerCreateError,
    /// `rist_auth_handler_set` failed.
    AuthHandlerError,
    /// `rist_oob_callback_set` failed.
    OobCallbackError,
    /// `rist_receiver_data_read` reported an error.
    DataReadError,
    /// `rist_start` failed.
    StartError,
    /// The requested timeout, in milliseconds, does not fit in an `i32`.
    TimeoutTooLarge,
}

impl std::fmt::Display for RistError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            RistError::ReceiverCreateError => "could not create rist receiver context",
            RistError::LoggingSetError => "could not set up rist logging",
            RistError::InvalidCString => "string contains an interior NUL byte",
            RistError::ParseAddressError => "could not parse rist address",
            RistError::PeerCreateError => "could not create rist peer",
            RistError::AuthHandlerError => "could not set rist auth handler",
            RistError::OobCallbackError => "could not set rist out-of-band callback",
            RistError::DataReadError => "could not read data from rist receiver",
            RistError::StartError => "could not start rist context",
            RistError::TimeoutTooLarge => "timeout in milliseconds does not fit in an i32",
        };
        f.write_str(description)
    }
}

impl std::error::Error for RistError {}
/// Maps each [`Profile`] variant onto the librist constant of the same name.
impl From<Profile> for rist_profile {
    fn from(profile: Profile) -> rist_profile {
        match profile {
            Profile::Simple => rist_profile_RIST_PROFILE_SIMPLE,
            Profile::Main => rist_profile_RIST_PROFILE_MAIN,
            Profile::Advanced => rist_profile_RIST_PROFILE_ADVANCED,
        }
    }
}
/// Log verbosity passed to `rist_logging_set`.
///
/// Each variant converts to the matching `rist_log_level_RIST_LOG_*`
/// constant through the `From<LogLevel> for rist_log_level` impl.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum LogLevel {
    /// Converts to `rist_log_level_RIST_LOG_DEBUG`.
    Debug,
    /// Converts to `rist_log_level_RIST_LOG_DISABLE`.
    Disable,
    /// Converts to `rist_log_level_RIST_LOG_ERROR`.
    Error,
    /// Converts to `rist_log_level_RIST_LOG_INFO`.
    Info,
    /// Converts to `rist_log_level_RIST_LOG_NOTICE`.
    Notice,
    /// Converts to `rist_log_level_RIST_LOG_SIMULATE`.
    Simulate,
    /// Converts to `rist_log_level_RIST_LOG_WARN`.
    Warn,
}
/// Maps each [`LogLevel`] variant onto the librist constant of the same name.
impl From<LogLevel> for rist_log_level {
    fn from(level: LogLevel) -> rist_log_level {
        match level {
            LogLevel::Disable => rist_log_level_RIST_LOG_DISABLE,
            LogLevel::Error => rist_log_level_RIST_LOG_ERROR,
            LogLevel::Warn => rist_log_level_RIST_LOG_WARN,
            LogLevel::Notice => rist_log_level_RIST_LOG_NOTICE,
            LogLevel::Info => rist_log_level_RIST_LOG_INFO,
            LogLevel::Debug => rist_log_level_RIST_LOG_DEBUG,
            LogLevel::Simulate => rist_log_level_RIST_LOG_SIMULATE,
        }
    }
}
/// Wrapper around the `rist_logging_settings` pointer filled in by `rist_logging_set`.
///
/// The pointer is handed to `rist_receiver_create` when a receiver context is built.
/// NOTE(review): this type has no `Drop` impl, so the pointee is never released
/// here — confirm who is expected to free it.
pub struct LoggingSettings {
/// Raw settings pointer; starts out null and is written by `rist_logging_set`.
settings: *mut rist_logging_settings,
}
impl LoggingSettings {
    /// Builds logging settings that write to the file descriptor behind `logfile`.
    ///
    /// The descriptor is duplicated before it is wrapped in a C stream, so the
    /// stream stays valid after `logfile` is dropped at the end of this call
    /// (which closes the original descriptor for owning types such as
    /// `std::fs::File`).
    ///
    /// # Errors
    ///
    /// Returns [`RistError::LoggingSetError`] if the descriptor cannot be
    /// duplicated, if it cannot be opened as a C stream, or if
    /// `rist_logging_set` reports a non-zero status.
    pub fn file<FD>(log_level: LogLevel, logfile: FD) -> Result<LoggingSettings, RistError>
    where
        FD: std::os::unix::io::AsRawFd
    {
        // SAFETY: `dup` only inspects the descriptor number; an invalid one yields -1.
        let fd = unsafe { libc::dup(logfile.as_raw_fd()) };
        if fd < 0 {
            return Err(RistError::LoggingSetError);
        }

        // SAFETY: `fd` is a descriptor we own and the mode is a static,
        // NUL-terminated string.
        let stream = unsafe { libc::fdopen(fd, b"w\0".as_ptr().cast()) };
        if stream.is_null() {
            // SAFETY: `fdopen` failed, so ownership of `fd` is still ours.
            unsafe { libc::close(fd) };
            return Err(RistError::LoggingSetError);
        }

        let mut settings = LoggingSettings {
            settings: std::ptr::null_mut(),
        };
        // SAFETY: the out-pointer refers to a live field, `stream` is a valid
        // non-null stream, and the remaining arguments are deliberately unset
        // (no log callback, no callback argument, no address).
        let res = unsafe {
            rist_logging_set(
                &mut settings.settings,
                log_level.into(),
                None,
                std::ptr::null_mut(),
                std::ptr::null_mut(),
                stream,
            )
        };
        if res != 0 {
            // SAFETY: `stream` was opened above and is not used after this point;
            // closing it also closes the duplicated descriptor.
            unsafe { libc::fclose(stream) };
            return Err(RistError::LoggingSetError);
        }
        Ok(settings)
    }
}
/// A RIST receiver: the raw `rist_ctx` handle together with the boxed closure
/// that is registered as its data callback.
pub struct ReceiverContext<CB>
where
CB: FnMut(ReceiveDataBlock)
{
/// Raw context pointer written by `rist_receiver_create` and passed to `rist_destroy` on drop.
ctx: *mut rist_ctx,
/// Boxed closure; a pointer to the boxed value is given to librist as the data-callback argument.
callback: Box<CB>
}
impl<CB: Send> ReceiverContext<CB>
where
CB: FnMut(ReceiveDataBlock)
{
pub fn create(profile: Profile, logging_settings: LoggingSettings, callback: CB) -> Result<ReceiverContext<CB>, RistError> {
let mut ctx = ReceiverContext {
ctx: std::ptr::null_mut(),
callback: Box::new(callback),
};
let res = unsafe {
rist_receiver_create(
&mut ctx.ctx,
profile.into(),
logging_settings.settings,
)
};
ctx.data_callback();
if res !=0 {
return Err(RistError::ReceiverCreateError)
}
Ok(ctx)
}
pub fn peer_create(&mut self, peer_config: PeerConfig) -> Result<(), RistError> {
let mut peer = std::ptr::null_mut();
let res = unsafe { rist_peer_create(self.ctx, &mut peer, peer_config.config) };
if res != 0 {
return Err(RistError::PeerCreateError);
}
Ok(())
}
pub fn start(&mut self) -> Result<(), RistError> {
let res = unsafe { rist_start(self.ctx) };
if res != 0 {
return Err(RistError::StartError);
}
Ok(())
}
pub fn auth_handler_set(&mut self) -> Result<(), RistError> {
let res = unsafe { rist_auth_handler_set(self.ctx, Some(cb_auth_connect), Some(cb_auth_disconnect), self.ctx as *mut c_void) };
if res != 0 {
return Err(RistError::AuthHandlerError);
}
Ok(())
}
pub fn oob_callback_set(&mut self) -> Result<(), RistError> {
let res = unsafe { rist_oob_callback_set(self.ctx, Some(cb_recv_oob), self.ctx as *mut c_void) };
if res != 0 {
return Err(RistError::OobCallbackError);
}
Ok(())
}
fn data_callback<>(&mut self) {
extern "C" fn trampoline<CB: FnMut(ReceiveDataBlock)>(arg: *mut c_void, block: *const rist_data_block) -> i32 {
let closure: &mut CB = unsafe { &mut *(arg as *mut CB) };
(*closure)(ReceiveDataBlock { block });
return 0;
}
let _res = unsafe {
rist_receiver_data_callback_set(
self.ctx,
Some(trampoline::<CB>),
self.callback.as_ref() as *const _ as *mut c_void
)
};
}
pub fn data_read(&mut self, timeout: std::time::Duration) -> Result<DataReadResponse, RistError> {
let timeout = {
let ms = timeout.as_millis();
if ms > (i32::MAX as u128) {
return Err(RistError::TimeoutTooLarge);
}
ms as i32
};
let mut block = ReceiveDataBlock {
block: std::ptr::null_mut(),
};
let res = unsafe {
rist_receiver_data_read(
self.ctx,
&mut block.block,
timeout,
)
};
if res == -1 {
return Err(RistError::DataReadError);
}
if res == 0 {
return Ok(DataReadResponse::NoData)
}
if block.block.is_null() {
return Ok(DataReadResponse::NoData)
}
Ok(DataReadResponse::Data {
block,
queue_size: (res - 1) as usize,
})
}
}
/// Out-of-band receive callback: ignores both arguments and reports success (0).
unsafe extern "C" fn cb_recv_oob(_arg: *mut c_void, _oob_block: *const rist_oob_block) -> c_int {
    0
}
/// Auth "connect" callback: prints the connection details and sends them to
/// the connecting peer as an out-of-band message, then accepts by returning 0.
///
/// The message has the form `auth,<conn_ip>:<conn_port>,<local_ip>:<local_port>`.
/// A null address pointer is rendered as an empty string instead of being
/// dereferenced.
///
/// # Safety
///
/// `arg` must be the `rist_ctx` pointer that was registered with the auth
/// handler, and each non-null address pointer must reference a
/// NUL-terminated C string that stays valid for the duration of the call.
unsafe extern "C" fn cb_auth_connect(arg: *mut c_void, conn_ip: *const c_char, conn_port: u16, local_ip: *const c_char, local_port: u16, peer: *mut rist_peer) -> c_int {
    let ctx = arg as *mut rist_ctx;

    // `CStr::from_ptr` must never see a null pointer, so guard both addresses.
    let connecting_ip = if conn_ip.is_null() {
        std::borrow::Cow::Borrowed("")
    } else {
        CStr::from_ptr(conn_ip).to_string_lossy()
    };
    let local_ip = if local_ip.is_null() {
        std::borrow::Cow::Borrowed("")
    } else {
        CStr::from_ptr(local_ip).to_string_lossy()
    };

    let message = format!("auth,{}:{},{}:{}", connecting_ip, conn_port, local_ip, local_port);
    println!("message {:?}", message);

    // The payload borrows `message`, which outlives the write call below.
    let oob_block = rist_oob_block {
        payload: message.as_ptr() as *mut c_void,
        payload_len: message.len(),
        peer,
        ts_ntp: 0
    };
    // Best effort: the connection is accepted whether or not the write succeeds.
    let _ = rist_oob_write(ctx, &oob_block);
    0
}
/// Auth "disconnect" callback: ignores both arguments and reports success (0).
unsafe extern "C" fn cb_auth_disconnect(_arg: *mut c_void, _peer: *mut rist_peer) -> c_int {
    0
}
/// Outcome of `ReceiverContext::data_read`.
pub enum DataReadResponse {
/// The read returned 0 or left the block pointer null.
NoData,
/// A block was read.
Data {
/// The block that was read.
block: ReceiveDataBlock,
/// The read call's return value minus one.
/// NOTE(review): presumably the number of blocks still queued — confirm against the librist docs.
queue_size: usize,
}
}
/// Destroys the underlying context when the wrapper goes out of scope.
///
/// `drop` runs before the struct's fields are dropped, so the context is
/// destroyed while the boxed callback is still alive.
impl<CB> Drop for ReceiverContext<CB>
where
    CB: FnMut(ReceiveDataBlock)
{
    fn drop(&mut self) {
        // A context that was never created has nothing to destroy.
        if self.ctx.is_null() {
            return;
        }
        // SAFETY: `self.ctx` is non-null and is cleared right after, so it is
        // passed to `rist_destroy` at most once.
        let res = unsafe { rist_destroy(self.ctx) };
        self.ctx = std::ptr::null_mut();
        // Panicking while another panic is unwinding aborts the process, so
        // only enforce the check when no panic is already in flight.
        if !std::thread::panicking() {
            assert_eq!(res, 0, "Could not destroy rist receiver context");
        }
    }
}
/// Wrapper around the `rist_peer_config` pointer written by `rist_parse_address`.
///
/// NOTE(review): this type has no `Drop` impl, so the pointee is never released
/// here — confirm who is expected to free it.
pub struct PeerConfig {
/// Raw config pointer; starts out null and is written by `rist_parse_address`.
config: *const rist_peer_config,
}
impl PeerConfig {
    /// Parses `url` into a peer configuration via `rist_parse_address`.
    ///
    /// # Errors
    ///
    /// * [`RistError::InvalidCString`] if `url` contains an interior NUL byte.
    /// * [`RistError::ParseAddressError`] if `rist_parse_address` returns a non-zero status.
    pub fn parse_address(url: &str) -> Result<PeerConfig, RistError> {
        let c_url = match CString::new(url) {
            Ok(converted) => converted,
            Err(_) => return Err(RistError::InvalidCString),
        };

        let mut parsed = PeerConfig {
            config: std::ptr::null_mut(),
        };
        let status = unsafe { rist_parse_address(c_url.as_ptr(), &mut parsed.config) };
        match status {
            0 => Ok(parsed),
            _ => Err(RistError::ParseAddressError),
        }
    }
}
/// A received data block, wrapping the raw `rist_data_block` pointer supplied by librist.
///
/// NOTE(review): this type has no `Drop` impl, so the block is never released
/// here — confirm whether librist expects the receiver to free it.
pub struct ReceiveDataBlock {
/// Raw block pointer; dereferenced by every accessor on this type.
block: *const rist_data_block,
}
/// Read-only accessors over the wrapped `rist_data_block`.
///
/// Every accessor dereferences the raw block pointer, so the wrapped pointer
/// must be non-null and point to a live block.
impl ReceiveDataBlock {
    /// Returns the block's payload bytes.
    ///
    /// Yields an empty slice when the payload pointer is null or the length is
    /// zero, because a slice reference must never be built from a null pointer.
    pub fn payload(&self) -> &[u8] {
        // SAFETY: `self.block` points to a live block (see the impl-level note).
        let (data, len) = unsafe {
            ((*self.block).payload as *const u8, (*self.block).payload_len)
        };
        if data.is_null() || len == 0 {
            return &[];
        }
        // SAFETY: `data` is non-null and the block declares `len` bytes at that
        // address; the returned borrow is tied to `&self`.
        unsafe { std::slice::from_raw_parts(data, len) }
    }

    /// Returns the block's `ts_ntp` field.
    pub fn ts_ntp(&self) -> u64 {
        // SAFETY: `self.block` points to a live block.
        unsafe { (*self.block).ts_ntp }
    }

    /// Returns the block's `virt_src_port` field.
    pub fn virt_src_port(&self) -> u16 {
        // SAFETY: `self.block` points to a live block.
        unsafe { (*self.block).virt_src_port }
    }

    /// Returns the block's `virt_dst_port` field.
    pub fn virt_dst_port(&self) -> u16 {
        // SAFETY: `self.block` points to a live block.
        unsafe { (*self.block).virt_dst_port }
    }

    /// Returns the block's `flow_id` field.
    pub fn flow_id(&self) -> u32 {
        // SAFETY: `self.block` points to a live block.
        unsafe { (*self.block).flow_id }
    }

    /// Returns the block's `seq` field.
    pub fn seq(&self) -> u64 {
        // SAFETY: `self.block` points to a live block.
        unsafe { (*self.block).seq }
    }

    /// Returns the block's flags, silently dropping any bits that
    /// [`ReceiveFlags`] does not define.
    pub fn flags(&self) -> ReceiveFlags {
        // SAFETY: `self.block` points to a live block.
        let raw = unsafe { (*self.block).flags };
        ReceiveFlags::from_bits_truncate(raw)
    }
}
// Flag bits decoded from a data block's `flags` field by `ReceiveDataBlock::flags`.
bitflags! {
pub struct ReceiveFlags: u32 {
// Takes its value from the librist constant of the same name.
const DISCONTINUITY = rist_data_block_receiver_flags_RIST_DATA_FLAGS_DISCONTINUITY;
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::stderr;

    /// Creates a receiver that logs to stderr and immediately tears it down.
    #[test]
    fn smoke() {
        let logging_settings = LoggingSettings::file(LogLevel::Info, stderr())
            .expect("LoggingSettings::file() failed");
        // `create` takes the data callback as its third argument; a no-op
        // closure is enough for a create/destroy round trip.
        let ctx = ReceiverContext::create(
            Profile::Simple,
            logging_settings,
            |_block: ReceiveDataBlock| {},
        )
        .expect("ReceiverContext::create() failed");
        drop(ctx);
    }
}