use crate::enums::{IntoResult, OtcError, OtcResult};
use crate::stream::Stream;
use crate::video_capturer::VideoCapturer;
use crate::video_frame::VideoFrame;
use lazy_static::lazy_static;
use std::collections::HashMap;
use std::ffi::{CStr, CString};
use std::os::raw::{c_char, c_void};
use std::sync::atomic::{AtomicBool, AtomicPtr, Ordering};
use std::sync::{Arc, Mutex};
lazy_static! {
pub static ref INSTANCES: Arc<Mutex<HashMap<usize, Publisher>>> = Default::default();
}
pub enum PublisherError {
Internal,
SessionDisconnected,
TimedOut,
UnableToPublish,
WebRtcError,
__Unknown,
}
impl From<ffi::otc_publisher_error_code> for PublisherError {
fn from(type_: ffi::otc_publisher_error_code) -> PublisherError {
match type_ {
ffi::otc_publisher_error_code_OTC_PUBLISHER_INTERNAL_ERROR => PublisherError::Internal,
ffi::otc_publisher_error_code_OTC_PUBLISHER_SESSION_DISCONNECTED => {
PublisherError::SessionDisconnected
}
ffi::otc_publisher_error_code_OTC_PUBLISHER_UNABLE_TO_PUBLISH => {
PublisherError::UnableToPublish
}
ffi::otc_publisher_error_code_OTC_PUBLISHER_TIMED_OUT => PublisherError::TimedOut,
ffi::otc_publisher_error_code_OTC_PUBLISHER_WEBRTC_ERROR => PublisherError::WebRtcError,
_ => PublisherError::__Unknown,
}
}
}
enum VideoType {
Camera,
Screen,
}
struct MediaStats {
connection_id: String,
subscriber_id: String,
packets_lost: i64,
bytes_sent: i64,
audio_level: f32,
timestamp: f64,
start_time: f64,
}
enum PublisherStats {
Audio(MediaStats),
Video(MediaStats),
}
ffi_callback!(
on_stream_created,
*mut ffi::otc_publisher,
Publisher,
*const ffi::otc_stream
);
ffi_callback!(
on_stream_destroyed,
*mut ffi::otc_publisher,
Publisher,
*const ffi::otc_stream
);
ffi_callback!(
on_render_frame,
*mut ffi::otc_publisher,
Publisher,
*const ffi::otc_video_frame
);
ffi_callback!(
on_audio_level_updated,
*mut ffi::otc_publisher,
Publisher,
f32
);
ffi_callback!(
on_error,
*mut ffi::otc_publisher,
Publisher,
*const c_char,
ffi::otc_publisher_error_code
);
#[allow(clippy::type_complexity)]
pub struct PublisherCallbacks {
on_stream_created: Option<Box<dyn Fn(&Publisher, Stream) + Send + Sync + 'static>>,
on_stream_destroyed: Option<Box<dyn Fn(&Publisher, Stream) + Send + Sync + 'static>>,
on_render_frame: Option<Box<dyn Fn(&Publisher, VideoFrame) + Send + Sync + 'static>>,
on_audio_level_updated: Option<Box<dyn Fn(&Publisher, f32) + Send + Sync + 'static>>,
on_error: Option<Box<dyn Fn(&Publisher, &str, PublisherError) + Send + Sync + 'static>>,
}
impl PublisherCallbacks {
pub fn builder() -> PublisherCallbacksBuilder {
PublisherCallbacksBuilder::default()
}
callback!(on_stream_created, &Publisher, Stream);
callback!(on_stream_destroyed, &Publisher, Stream);
callback!(on_render_frame, &Publisher, VideoFrame);
callback!(on_audio_level_updated, &Publisher, f32);
callback!(on_error, &Publisher, &str, PublisherError);
}
#[derive(Default)]
#[allow(clippy::type_complexity)]
pub struct PublisherCallbacksBuilder {
on_stream_created: Option<Box<dyn Fn(&Publisher, Stream) + Send + Sync + 'static>>,
on_stream_destroyed: Option<Box<dyn Fn(&Publisher, Stream) + Send + Sync + 'static>>,
on_render_frame: Option<Box<dyn Fn(&Publisher, VideoFrame) + Send + Sync + 'static>>,
on_audio_level_updated: Option<Box<dyn Fn(&Publisher, f32) + Send + Sync + 'static>>,
on_error: Option<Box<dyn Fn(&Publisher, &str, PublisherError) + Send + Sync + 'static>>,
}
impl PublisherCallbacksBuilder {
callback_setter!(on_stream_created, &Publisher, Stream);
callback_setter!(on_stream_destroyed, &Publisher, Stream);
callback_setter!(on_render_frame, &Publisher, VideoFrame);
callback_setter!(on_audio_level_updated, &Publisher, f32);
callback_setter!(on_error, &Publisher, &str, PublisherError);
pub fn build(self) -> PublisherCallbacks {
PublisherCallbacks {
on_stream_created: self.on_stream_created,
on_stream_destroyed: self.on_stream_destroyed,
on_render_frame: self.on_render_frame,
on_audio_level_updated: self.on_audio_level_updated,
on_error: self.on_error,
}
}
}
#[derive(Clone)]
pub struct Publisher {
ptr: Arc<AtomicPtr<*const ffi::otc_publisher>>,
capturer: Option<VideoCapturer>,
callbacks: Arc<Mutex<PublisherCallbacks>>,
publishing: Arc<AtomicBool>,
}
unsafe impl Sync for Publisher {}
unsafe impl Send for Publisher {}
impl Publisher {
pub fn new(name: &str, capturer: Option<VideoCapturer>, callbacks: PublisherCallbacks) -> Self {
let name = CString::new(name).unwrap_or_default();
let capturer_callbacks = capturer.clone().map_or(std::ptr::null(), |mut capturer| {
&*capturer.callbacks().lock().unwrap() as *const ffi::otc_video_capturer_callbacks
});
let ffi_callbacks = ffi::otc_publisher_callbacks {
on_stream_created: Some(on_stream_created),
on_stream_destroyed: Some(on_stream_destroyed),
on_render_frame: Some(on_render_frame),
on_audio_level_updated: Some(on_audio_level_updated),
on_audio_stats: None,
on_video_stats: None,
on_error: Some(on_error),
user_data: std::ptr::null_mut(),
reserved: std::ptr::null_mut(),
};
let ptr =
unsafe { ffi::otc_publisher_new(name.as_ptr(), capturer_callbacks, &ffi_callbacks) };
let publisher = Self {
ptr: Arc::new(AtomicPtr::new(ptr as *mut _)),
capturer,
callbacks: Arc::new(Mutex::new(callbacks)),
publishing: Default::default(),
};
INSTANCES
.lock()
.unwrap()
.insert(ptr as usize, publisher.clone());
publisher
}
pub fn inner(&self) -> *const ffi::otc_publisher {
self.ptr.load(Ordering::Relaxed) as *const _
}
callback_call!(on_render_frame, *const ffi::otc_video_frame);
callback_call!(on_audio_level_updated, f32);
fn on_stream_created(&self, stream: *const ffi::otc_stream) {
self.publishing.store(true, Ordering::Relaxed);
if let Ok(callbacks) = self.callbacks.try_lock() {
callbacks.on_stream_created(self, stream.into());
}
}
fn on_stream_destroyed(&self, stream: *const ffi::otc_stream) {
self.publishing.store(false, Ordering::Relaxed);
if let Ok(callbacks) = self.callbacks.try_lock() {
callbacks.on_stream_destroyed(self, stream.into());
}
}
fn on_error(&self, error_string: *const c_char, error_code: ffi::otc_publisher_error_code) {
if error_string.is_null() {
return;
}
let error_string = unsafe { CStr::from_ptr(error_string) };
if let Ok(callbacks) = self.callbacks.try_lock() {
callbacks.on_error(
self,
error_string.to_str().unwrap_or_default(),
error_code.into(),
);
}
}
pub fn toggle_audio(&self, audio_enabled: bool) -> OtcResult {
if self.ptr.load(Ordering::Relaxed).is_null() {
return Err(OtcError::NullError);
}
unsafe {
ffi::otc_publisher_set_publish_audio(
self.ptr.load(Ordering::Relaxed) as *mut _,
audio_enabled.into(),
)
}
.into_result()
}
pub fn toggle_video(&self, video_enabled: bool) -> OtcResult {
if self.ptr.load(Ordering::Relaxed).is_null() {
return Err(OtcError::NullError);
}
unsafe {
ffi::otc_publisher_set_publish_video(
self.ptr.load(Ordering::Relaxed) as *mut _,
video_enabled.into(),
)
}
.into_result()
}
pub fn stream(&self) -> Option<Stream> {
if self.ptr.load(Ordering::Relaxed).is_null() {
return None;
}
let stream_ptr =
unsafe { ffi::otc_publisher_get_stream(self.ptr.load(Ordering::Relaxed) as *mut _) };
if stream_ptr.is_null() {
return None;
}
Some((stream_ptr as *const ffi::otc_stream).into())
}
pub fn unpublish(&self) -> OtcResult {
if !self.publishing.load(Ordering::Relaxed) {
return Ok(());
}
let ptr = self.ptr.load(Ordering::Relaxed);
if ptr.is_null() {
return Err(OtcError::NullError);
}
unsafe {
let session = ffi::otc_publisher_get_session(ptr as *const _);
if !session.is_null() {
ffi::otc_session_unpublish(session, ptr as *mut _)
} else {
return Ok(());
}
}
.into_result()
}
}
impl Drop for Publisher {
fn drop(&mut self) {
let ptr = self.ptr.load(Ordering::Relaxed);
if Arc::strong_count(&self.ptr) > 2 {
return;
}
if ptr.is_null() {
return;
}
let _ = self.unpublish();
self.ptr.store(std::ptr::null_mut(), Ordering::Relaxed);
unsafe {
ffi::otc_publisher_delete(ptr as *mut _);
}
if let Ok(ref mut instances) = INSTANCES.try_lock() {
instances.remove(&(ptr as usize));
}
}
}