librist-rust 0.6.3

Rust wrapper for librist
use librist_sys::*;
use std::ffi::{CString, c_void, CStr};
use std::os::raw::{c_char, c_int};
use bitflags::bitflags;

/// RIST profile selected when creating a receiver context.
///
/// Each variant converts into the matching `rist_profile` constant from `librist_sys`
/// via the `From<Profile>` implementation.
#[derive(Debug, Copy, Clone)]
pub enum Profile {
    /// Maps to `rist_profile_RIST_PROFILE_ADVANCED`.
    Advanced,
    /// Maps to `rist_profile_RIST_PROFILE_MAIN`.
    Main,
    /// Maps to `rist_profile_RIST_PROFILE_SIMPLE`.
    Simple,
}

/// Errors reported by this wrapper.
///
/// Most variants simply record which librist call reported a failure; librist does not hand
/// back any further detail through these calls, so the variants carry no payload.
#[derive(Debug)]
pub enum RistError {
    /// `rist_receiver_create` reported failure.
    ReceiverCreateError,
    /// Setting up librist logging failed.
    LoggingSetError,
    /// A rust string value was passed that could not be converted into a 'C' string for passing to
    /// librist (most likely because the rust string contained a NUL (`\0`) character).
    InvalidCString,
    /// `rist_parse_address` reported failure.
    ParseAddressError,
    /// `rist_peer_create` reported failure.
    PeerCreateError,
    /// `rist_auth_handler_set` reported failure.
    AuthHandlerError,
    /// `rist_oob_callback_set` reported failure.
    OobCallbackError,
    /// `rist_receiver_data_callback_set` reported failure.
    DataCallbackError,
    /// `rist_receiver_data_read` reported an error.
    DataReadError,
    /// `rist_start` reported failure.
    StartError,
    /// A `std::time::Duration` value provided to specify a timeout was too large in value to be
    /// passed to librist
    TimeoutTooLarge,
}

impl std::fmt::Display for RistError {
    /// Writes a short, human-readable description of the failure.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            RistError::ReceiverCreateError => "could not create librist receiver context",
            RistError::LoggingSetError => "could not configure librist logging",
            RistError::InvalidCString => "string could not be converted to a C string",
            RistError::ParseAddressError => "could not parse peer address",
            RistError::PeerCreateError => "could not create peer",
            RistError::AuthHandlerError => "could not set authentication handler",
            RistError::OobCallbackError => "could not set out-of-band callback",
            RistError::DataCallbackError => "could not set receiver data callback",
            RistError::DataReadError => "could not read data from receiver",
            RistError::StartError => "could not start librist context",
            RistError::TimeoutTooLarge => "timeout is too large to pass to librist",
        };
        f.write_str(description)
    }
}

// Lets callers use `?` into `Box<dyn std::error::Error>` and similar error containers.
impl std::error::Error for RistError {}

/// Converts the safe [`Profile`] enum into the raw `rist_profile` value librist expects.
impl From<Profile> for rist_profile {
    fn from(profile: Profile) -> Self {
        match profile {
            Profile::Simple => rist_profile_RIST_PROFILE_SIMPLE,
            Profile::Main => rist_profile_RIST_PROFILE_MAIN,
            Profile::Advanced => rist_profile_RIST_PROFILE_ADVANCED,
        }
    }
}
/// Log verbosity handed to librist when configuring logging.
///
/// Each variant converts into the matching `rist_log_level` constant from `librist_sys`
/// via the `From<LogLevel>` implementation. Like [`Profile`], this is a plain field-less
/// enum, so it is `Copy` and `Clone`.
#[derive(Debug, Copy, Clone)]
pub enum LogLevel {
    /// Maps to `rist_log_level_RIST_LOG_DEBUG`.
    Debug,
    /// Maps to `rist_log_level_RIST_LOG_DISABLE`.
    Disable,
    /// Maps to `rist_log_level_RIST_LOG_ERROR`.
    Error,
    /// Maps to `rist_log_level_RIST_LOG_INFO`.
    Info,
    /// Maps to `rist_log_level_RIST_LOG_NOTICE`.
    Notice,
    /// Maps to `rist_log_level_RIST_LOG_SIMULATE`.
    Simulate,
    /// Maps to `rist_log_level_RIST_LOG_WARN`.
    Warn,
}
/// Converts the safe [`LogLevel`] enum into the raw `rist_log_level` value librist expects.
impl From<LogLevel> for rist_log_level {
    fn from(level: LogLevel) -> Self {
        match level {
            LogLevel::Disable => rist_log_level_RIST_LOG_DISABLE,
            LogLevel::Error => rist_log_level_RIST_LOG_ERROR,
            LogLevel::Warn => rist_log_level_RIST_LOG_WARN,
            LogLevel::Notice => rist_log_level_RIST_LOG_NOTICE,
            LogLevel::Info => rist_log_level_RIST_LOG_INFO,
            LogLevel::Debug => rist_log_level_RIST_LOG_DEBUG,
            LogLevel::Simulate => rist_log_level_RIST_LOG_SIMULATE,
        }
    }
}

/// Handle to a librist logging configuration.
///
/// Wraps the raw `rist_logging_settings` pointer filled in by `rist_logging_set`; the pointer
/// is later handed to `rist_receiver_create` when a receiver context is created.
pub struct LoggingSettings {
    // Raw settings pointer; starts out null and is populated by `rist_logging_set`.
    settings: *mut rist_logging_settings,
}
impl LoggingSettings {
    pub fn file<FD>(log_level: LogLevel, logfile: FD) -> Result<LoggingSettings, RistError>
        where
            FD: std::os::unix::io::AsRawFd
    {
        let stderr = unsafe {
            let mode = CString::new("w").unwrap();
            libc::fdopen(logfile.as_raw_fd(), mode.as_ptr())
        };

        let mut settings = LoggingSettings {
            settings: std::ptr::null_mut(),
        };

        let res = unsafe {
            let log_cb = None;
            let cb_arg = std::ptr::null_mut();
            let address = std::ptr::null_mut();
            rist_logging_set(
                &mut settings.settings,
                log_level.into(),
                log_cb,
                cb_arg,
                address,
                stderr,
            )
        };
        if res !=0 {
            return Err(RistError::LoggingSetError)
        }

        Ok(settings)
    }
}

/// A librist receiver context together with the closure that receives incoming data blocks.
///
/// The context is destroyed with `rist_destroy` when this value is dropped.
pub struct ReceiverContext<CB>
    where
        CB: FnMut(ReceiveDataBlock)
{
    // Raw librist context pointer produced by `rist_receiver_create`.
    ctx: *mut rist_ctx,
    // Boxed so the closure has a stable heap address; that address is passed to librist as the
    // data-callback argument and must stay valid while the context exists.
    callback: Box<CB>
}

impl<CB: Send> ReceiverContext<CB>
    where
        CB: FnMut(ReceiveDataBlock) + Send
{

    /// The `callback` will potentially be called from a thread within librist
    pub fn create(profile: Profile, logging_settings: LoggingSettings, callback: CB) -> Result<ReceiverContext<CB>, RistError> {
        let mut ctx = ReceiverContext {
            ctx: std::ptr::null_mut(),
            callback: Box::new(callback),
        };
        let res = unsafe {
            rist_receiver_create(
                &mut ctx.ctx,
                profile.into(),
                logging_settings.settings,
            )
        };
        if res !=0 {
            return Err(RistError::ReceiverCreateError)
        }
        //ctx.auth_handler_set()?;
        ctx.data_callback()?;
        Ok(ctx)
    }

    pub fn peer_create(&mut self, peer_config: PeerConfig) -> Result<(), RistError> {
        let mut peer = std::ptr::null_mut();
        let res = unsafe { rist_peer_create(self.ctx, &mut peer, peer_config.config) };
        if res != 0 {
            return Err(RistError::PeerCreateError);
        }
        Ok(())
    }

    pub fn start(&mut self) -> Result<(), RistError> {
        let res = unsafe { rist_start(self.ctx) };
        if res != 0 {
            return Err(RistError::StartError);
        }
        Ok(())
    }

    pub fn auth_handler_set(&mut self) -> Result<(), RistError> {
        let res = unsafe { rist_auth_handler_set(self.ctx, Some(cb_auth_connect), Some(cb_auth_disconnect), self.ctx as *mut c_void) };
        if res != 0 {
            return Err(RistError::AuthHandlerError);
        }
        Ok(())
    }

    pub fn oob_callback_set(&mut self) -> Result<(), RistError> {
        let res = unsafe { rist_oob_callback_set(self.ctx, Some(cb_recv_oob), self.ctx as *mut c_void) };
        if res != 0 {
            return Err(RistError::OobCallbackError);
        }
        Ok(())
    }

    fn data_callback<>(&mut self) -> Result<(), RistError> {
        extern "C" fn trampoline<CB: FnMut(ReceiveDataBlock)>(arg: *mut c_void, block: *const rist_data_block) -> i32 {
            let closure: &mut CB = unsafe { &mut *(arg as *mut CB) };
            (*closure)(ReceiveDataBlock { block });
            return 0;
        }
        let res = unsafe {
            rist_receiver_data_callback_set(
                self.ctx,
                Some(trampoline::<CB>),
                self.callback.as_ref() as *const _ as *mut c_void
            )
        };
        if res == 0 {
            Ok(())
        } else {
            Err(RistError::DataCallbackError)
        }
    }

    pub fn data_read(&mut self, timeout: std::time::Duration) -> Result<DataReadResponse, RistError> {
        let timeout = {
            let ms = timeout.as_millis();
            if ms > (i32::MAX as u128) {
                return Err(RistError::TimeoutTooLarge);
            }
            ms as i32
        };
        let mut block = ReceiveDataBlock {
            block: std::ptr::null_mut(),
        };
        let res = unsafe {
            rist_receiver_data_read(
                self.ctx,
                &mut block.block,
                timeout,
            )
        };
        if res == -1 {
            return Err(RistError::DataReadError);
        }
        if res == 0 {
            return Ok(DataReadResponse::NoData)
        }
        if block.block.is_null() {
            // this this should have been covered by the res==0 case above; but belt-and-braces,
            return Ok(DataReadResponse::NoData)
        }
        Ok(DataReadResponse::Data {
            block,
            queue_size: (res - 1) as usize,
        })
    }
}

/// Out-of-band receive callback registered by `oob_callback_set`.
///
/// Ignores both arguments and always reports success (`0`) to librist.
unsafe extern "C" fn cb_recv_oob(_arg: *mut c_void, _oob_block: *const rist_oob_block) -> c_int {
    0
}

/// Connect-side authentication callback registered by `auth_handler_set`.
///
/// Formats the connecting and local endpoints into an `auth,<ip>:<port>,<ip>:<port>` message,
/// prints it to stdout, and sends it to `peer` as an out-of-band block via `rist_oob_write`.
/// `arg` is expected to be the `rist_ctx` pointer supplied at registration. Always returns `0`.
///
/// # Safety
///
/// `conn_ip` and `local_ip` must point to valid NUL-terminated C strings, and `arg` must be a
/// valid `rist_ctx` pointer.
unsafe extern "C" fn cb_auth_connect(arg: *mut c_void, conn_ip: *const c_char, conn_port: u16, local_ip: *const c_char, local_port: u16, peer: *mut rist_peer) -> c_int {
    let remote_addr = CStr::from_ptr(conn_ip).to_string_lossy();
    let local_addr = CStr::from_ptr(local_ip).to_string_lossy();
    let message = format!("auth,{}:{},{}:{}", remote_addr, conn_port, local_addr, local_port);
    println!("message {:?}", message);
    // The block borrows `message`'s bytes; `message` stays alive until after the write below.
    let reply = rist_oob_block {
        peer,
        payload: message.as_ptr() as *mut c_void,
        payload_len: message.len(),
        ts_ntp: 0,
    };
    rist_oob_write(arg as *mut rist_ctx, &reply);
    0
}
/// Disconnect-side authentication callback registered by `auth_handler_set`.
///
/// Ignores both arguments and always reports success (`0`) to librist.
unsafe extern "C" fn cb_auth_disconnect(_arg: *mut c_void, _peer: *mut rist_peer) -> c_int {
    0
}
/// Outcome of a successful `ReceiverContext::data_read` call.
pub enum DataReadResponse {
    /// No data block was available.
    NoData,
    /// A data block was read.
    Data {
        /// The received block; it is released back to librist when dropped.
        block: ReceiveDataBlock,
        /// The value returned by `rist_receiver_data_read` minus one.
        /// NOTE(review): presumably the number of blocks still queued — confirm against librist.
        queue_size: usize,
    }
}

/// Destroys the underlying librist context when the wrapper goes out of scope.
impl<CB> Drop for ReceiverContext<CB>
    where
        CB: FnMut(ReceiveDataBlock)
{
    /// Calls `rist_destroy` on the wrapped context.
    ///
    /// # Panics
    ///
    /// Panics if `rist_destroy` reports failure for a non-null context.
    fn drop(&mut self) {
        // A wrapper whose context was never created (pointer still null) owns nothing to
        // destroy; handing null to `rist_destroy` would only trip the assertion below and
        // turn a reported creation error into a panic.
        if self.ctx.is_null() {
            return;
        }
        let res = unsafe { rist_destroy(self.ctx) };
        assert_eq!(res, 0, "Could not destroy rist receiver context");
    }
}

/// Peer configuration produced by parsing a peer address.
///
/// Wraps the raw `rist_peer_config` pointer filled in by `rist_parse_address`; it is consumed
/// by `ReceiverContext::peer_create`.
pub struct PeerConfig {
    // Raw configuration pointer populated by `rist_parse_address`.
    config: *const rist_peer_config,
}
impl PeerConfig {
    pub fn parse_address(url: &str) -> Result<PeerConfig, RistError> {
        let url = CString::new(url).map_err(|_| RistError::InvalidCString)?;
        let mut conf = PeerConfig {
            config: std::ptr::null_mut(),
        };
        let res = unsafe { rist_parse_address(url.as_ptr(), &mut conf.config) };
        if res != 0 {
            return Err(RistError::ParseAddressError);
        }
        Ok(conf)
    }
}

/// A data block received from librist.
///
/// Wraps a raw `rist_data_block` pointer and exposes its fields through accessor methods; the
/// block is handed to `rist_receiver_data_block_free` when this value is dropped.
pub struct ReceiveDataBlock {
    // Raw block pointer obtained from librist (data callback or `rist_receiver_data_read`).
    block: *const rist_data_block,
}
impl ReceiveDataBlock {
    /// Returns the payload bytes of this block.
    ///
    /// Yields an empty slice when the block carries no payload (null payload pointer or a
    /// payload length of zero).
    pub fn payload(&self) -> &[u8] {
        // SAFETY: `self.block` points to a block owned by this wrapper until drop. A slice
        // reference must never be built from a null pointer, not even with length zero, so
        // that case is answered with an empty slice instead.
        unsafe {
            let data = (*self.block).payload as *const u8;
            let len = (*self.block).payload_len;
            if data.is_null() || len == 0 {
                return &[];
            }
            std::slice::from_raw_parts(data, len)
        }
    }
    /// Returns the block's `ts_ntp` field.
    pub fn ts_ntp(&self) -> u64 {
        unsafe { (*self.block).ts_ntp }
    }
    /// Returns the block's `virt_src_port` field.
    pub fn virt_src_port(&self) -> u16 {
        unsafe { (*self.block).virt_src_port }
    }
    /// Returns the block's `virt_dst_port` field.
    pub fn virt_dst_port(&self) -> u16 {
        unsafe { (*self.block).virt_dst_port }
    }
    /// Returns the block's `flow_id` field.
    pub fn flow_id(&self) -> u32 {
        unsafe { (*self.block).flow_id }
    }
    /// Returns the block's `seq` field.
    pub fn seq(&self) -> u64 {
        unsafe { (*self.block).seq }
    }
    /// Returns the block's flags, discarding any bits not known to [`ReceiveFlags`].
    pub fn flags(&self) -> ReceiveFlags {
        ReceiveFlags::from_bits_truncate(unsafe { (*self.block).flags })
    }
}

/// Releases the wrapped block back to librist when the wrapper goes out of scope.
impl Drop for ReceiveDataBlock {
    fn drop(&mut self) {
        // `data_read` builds a wrapper around a null pointer before librist fills it in and
        // drops it unchanged when no data arrives; there is nothing to free in that case.
        if self.block.is_null() {
            return;
        }
        // The free function takes a pointer to the block pointer, so pass a temporary copy.
        let mut raw = self.block as *mut rist_data_block;
        unsafe {
            rist_receiver_data_block_free(&mut raw);
        }
    }
}

bitflags! {
    /// Flags attached to a received data block, as returned by `ReceiveDataBlock::flags`.
    pub struct ReceiveFlags: u32 {
        /// Mirrors librist's `RIST_DATA_FLAGS_DISCONTINUITY` receiver flag.
        const DISCONTINUITY = rist_data_block_receiver_flags_RIST_DATA_FLAGS_DISCONTINUITY;
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Smoke test: configure logging to stderr, attempt to create a receiver with a no-op
    /// data callback, then drop whatever `create` returned.
    #[test]
    fn smoke() {
        let log_to_stderr = LoggingSettings::file(LogLevel::Info, std::io::stderr())
            .expect("LoggingSettings::file() failed");
        let receiver = ReceiverContext::create(Profile::Simple, log_to_stderr, |_data_block| {});
        drop(receiver);
    }
}