use std::{
ffi::CStr,
mem::MaybeUninit,
os::raw::{c_char, c_int, c_void},
sync::Arc,
};
use ntgcalls::{
ntg_add_incoming_video, ntg_async_struct, ntg_auth_params_struct, ntg_calls, ntg_connect,
ntg_connect_p2p, ntg_cpu_usage, ntg_create, ntg_create_p2p, ntg_destroy, ntg_enable_g_lib_loop,
ntg_exchange_keys, ntg_frame_struct, ntg_get_connection_mode, ntg_get_media_devices,
ntg_get_protocol, ntg_get_state, ntg_get_version, ntg_init, ntg_init_exchange,
ntg_init_presentation, ntg_media_devices_struct, ntg_media_state_struct,
ntg_media_state_struct as NtgMediaState, ntg_mute, ntg_network_info_struct,
ntg_on_connection_change, ntg_on_stream_end, ntg_on_upgrade, ntg_pause, ntg_protocol_struct,
ntg_remote_source_struct, ntg_remove_incoming_video, ntg_resume,
ntg_segment_part_request_struct, ntg_send_broadcast_part, ntg_send_broadcast_timestamp,
ntg_send_external_frame, ntg_send_signaling_data, ntg_set_stream_sources, ntg_skip_exchange,
ntg_stop, ntg_stop_presentation, ntg_stream_device_enum, ntg_stream_mode_enum,
ntg_stream_type_enum, ntg_time, ntg_unmute, uintptr_t,
};
pub mod enums;
pub mod errors;
mod logger;
pub mod structures;
pub mod utils;
pub use enums::{
ConnectionKind, ConnectionMode, ConnectionState, MediaSegmentQuality, MediaSegmentStatus,
MediaSource, StreamDevice, StreamMode, StreamStatus, StreamType,
};
pub use errors::{CallError, Result};
pub use structures::{
AudioDescription, AuthParams, CallInfo, DeviceInfo, DhConfig, FrameData, MediaDescription,
MediaDevices, MediaState, NetworkInfo, RemoteSource, RtcServer, SegmentPartRequest, SsrcGroup,
VideoDescription,
};
use structures::{FfiDhConfig, FfiMediaDesc, FfiRtcServer, FfiSsrcGroup};
pub use utils::IntoCString;
/// C callback signature accepted by [`TgCalls::on_stream_end`].
///
/// Receives a `uintptr_t`, an `i64`, the stream type, the stream device and a
/// `*mut c_void`.
/// NOTE(review): the first two are presumably the instance handle and the chat
/// id, and the last the registered `user_data` — confirm against the NTgCalls
/// headers. The same layout assumption applies to the aliases below.
pub type StreamCallback = Option<
unsafe extern "C" fn(uintptr_t, i64, ntg_stream_type_enum, ntg_stream_device_enum, *mut c_void),
>;
/// C callback signature accepted by [`TgCalls::on_upgrade`]; carries an
/// `ntg_media_state_struct` by value.
pub type UpgradeCallback = Option<unsafe extern "C" fn(uintptr_t, i64, NtgMediaState, *mut c_void)>;
/// C callback signature accepted by [`TgCalls::on_connection_change`]; carries
/// an `ntg_network_info_struct` by value.
pub type ConnectionCallback =
Option<unsafe extern "C" fn(uintptr_t, i64, ntg_network_info_struct, *mut c_void)>;
/// C callback signature accepted by [`TgCalls::on_signaling_data`].
///
/// NOTE(review): the `*mut u8` / `c_int` pair is presumably a byte buffer and
/// its length — confirm.
pub type SignalingCallback =
Option<unsafe extern "C" fn(uintptr_t, i64, *mut u8, c_int, *mut c_void)>;
/// C callback signature accepted by [`TgCalls::on_frames`].
///
/// NOTE(review): the `*mut ntg_frame_struct` / `u64` pair is presumably an
/// array of frames and its element count — confirm.
pub type FrameCallback = Option<
unsafe extern "C" fn(
uintptr_t,
i64,
ntg_stream_mode_enum,
ntg_stream_device_enum,
*mut ntg_frame_struct,
u64,
*mut c_void,
),
>;
/// C callback signature accepted by [`TgCalls::on_remote_source_change`];
/// carries an `ntg_remote_source_struct` by value.
pub type RemoteSourceCallback =
Option<unsafe extern "C" fn(uintptr_t, i64, ntg_remote_source_struct, *mut c_void)>;
/// C callback signature accepted by
/// [`TgCalls::on_request_broadcast_timestamp`].
pub type BroadcastTimestampCallback = Option<unsafe extern "C" fn(uintptr_t, i64, *mut c_void)>;
/// C callback signature accepted by [`TgCalls::on_request_broadcast_part`];
/// carries an `ntg_segment_part_request_struct` by value.
pub type BroadcastPartCallback =
Option<unsafe extern "C" fn(uintptr_t, i64, ntg_segment_part_request_struct, *mut c_void)>;
/// Heap-allocated state shared between `call_async` and `async_promise` for
/// the duration of one native asynchronous call.
struct AsyncCtx {
/// Signalled by `async_promise`; `call_async` blocks on the receiving end.
tx: std::sync::mpsc::SyncSender<()>,
/// Slot whose address is handed to the native side as `errorCode`.
error_code: c_int,
/// Slot whose address is handed to the native side as `errorMessage`.
/// It is never read back on the Rust side.
/// NOTE(review): if the library allocates a message here, nothing frees it —
/// confirm ownership rules.
_error_message: *mut c_char,
}
// The raw pointer field makes the struct `!Send` by default; this opts back in.
// NOTE(review): presumably needed because the native side may invoke the
// promise from another thread — confirm.
unsafe impl Send for AsyncCtx {}
/// Completion hook installed as `ntg_async_struct::promise` by `call_async`.
///
/// Treats `user_data` as a pointer to an `AsyncCtx` and sends a unit value on
/// its channel. A failed send is deliberately ignored.
unsafe extern "C" fn async_promise(user_data: *mut c_void) {
    // SAFETY: `call_async` stores a pointer obtained from `Box::into_raw` in
    // `userData` and only frees it after this hook has signalled.
    let state = unsafe { &*user_data.cast::<AsyncCtx>() };
    let _ = state.tx.send(());
}
/// Runs one native asynchronous call and blocks until it has completed.
///
/// `f` receives a fully populated `ntg_async_struct` and must return the
/// dispatch status of the native function it invokes.
///
/// # Errors
/// Returns `CallError::from(code)` when either the dispatch status or the
/// error code written by the native side is negative.
///
/// # Panics
/// Panics with "NTgCalls async promise never called" if receiving on the
/// completion channel fails.
fn call_async<F>(f: F) -> Result<()>
where
    F: FnOnce(ntg_async_struct) -> c_int,
{
    let (sender, receiver) = std::sync::mpsc::sync_channel::<()>(1);
    // The context lives on the heap so its field addresses stay stable while
    // the native side holds them.
    let state = Box::into_raw(Box::new(AsyncCtx {
        tx: sender,
        error_code: 0,
        _error_message: std::ptr::null_mut(),
    }));
    let request = ntg_async_struct {
        userData: state as *mut _,
        // SAFETY: `state` comes from `Box::into_raw` just above and is not
        // freed until the end of this function.
        errorCode: unsafe { &mut (*state).error_code as *mut _ },
        errorMessage: unsafe { &mut (*state)._error_message as *mut _ },
        promise: Some(async_promise),
    };

    let dispatch_rc = f(request);
    // A negative dispatch status is reported directly, without waiting for
    // the promise; otherwise wait and pick up the code the native side wrote.
    let outcome = if dispatch_rc < 0 {
        dispatch_rc
    } else {
        receiver.recv().expect("NTgCalls async promise never called");
        unsafe { (*state).error_code }
    };

    // SAFETY: `state` was produced by `Box::into_raw` and is released exactly once.
    unsafe { drop(Box::from_raw(state)) };

    if outcome < 0 {
        Err(CallError::from(outcome))
    } else {
        Ok(())
    }
}
/// Owner of the native instance handle returned by `ntg_init`.
struct Inner(uintptr_t);

impl Drop for Inner {
    /// Passes the handle to `ntg_destroy`; the returned value is ignored.
    fn drop(&mut self) {
        let handle = self.0;
        unsafe {
            let _ = ntg_destroy(handle);
        }
    }
}
/// Handle to a native NTgCalls instance.
///
/// Clones share the same `Inner` through an `Arc`, so `Inner`'s `Drop`
/// (which calls `ntg_destroy`) runs once the last clone is dropped.
#[derive(Clone)]
pub struct TgCalls {
// Shared owner of the native handle.
inner: Arc<Inner>,
}
impl TgCalls {
/// Initialises logging and creates a new native instance via `ntg_init`.
///
/// # Errors
/// With the `bundled` feature enabled and `NTGCALLS_BUNDLED_VERSION` not
/// equal to `"local"`, returns `CallError::VersionMismatch` when the
/// version reported by the loaded library differs from the compiled one.
pub fn try_new() -> Result<Self> {
    logger::init();
    #[cfg(feature = "bundled")]
    {
        const BUNDLED: &str = env!("NTGCALLS_BUNDLED_VERSION");
        // "local" builds skip the check entirely, including the version query.
        if BUNDLED != "local" {
            let loaded = Self::version_raw();
            if loaded != BUNDLED {
                return Err(CallError::VersionMismatch {
                    compiled: String::from(BUNDLED),
                    loaded,
                });
            }
        }
    }
    let handle = unsafe { ntg_init() };
    Ok(Self {
        inner: Arc::new(Inner(handle)),
    })
}
/// Queries `ntg_get_version` and copies the result into an owned `String`.
///
/// Returns an empty string when the native call leaves the pointer null.
/// Invalid UTF-8 is replaced lossily.
/// NOTE(review): the native buffer is never released here — confirm whether
/// the library expects the caller to free it.
fn version_raw() -> String {
    let mut raw: *mut c_char = std::ptr::null_mut();
    unsafe {
        ntg_get_version(&mut raw);
    }
    if raw.is_null() {
        String::new()
    } else {
        // SAFETY: non-null was checked above; the pointer is assumed to
        // reference a NUL-terminated string written by the library.
        unsafe { CStr::from_ptr(raw) }.to_string_lossy().into_owned()
    }
}
/// Returns the version string reported by the loaded native library
/// (empty when the library provides none); delegates to `version_raw`.
pub fn version() -> String {
Self::version_raw()
}
/// Fetches the protocol description from `ntg_get_protocol`.
///
/// The library version strings are copied into owned `String`s (lossy
/// UTF-8). A null version array, a non-positive reported count, or null
/// individual entries are tolerated: they contribute no entries instead
/// of being dereferenced.
///
/// # Errors
/// Returns `CallError::from(rc)` when the native call reports a negative
/// status.
pub fn protocol() -> Result<Protocol> {
    let mut proto: MaybeUninit<ntg_protocol_struct> = MaybeUninit::uninit();
    let rc = unsafe { ntg_get_protocol(proto.as_mut_ptr()) };
    if rc < 0 {
        return Err(CallError::from(rc));
    }
    // SAFETY: a non-negative status is taken to mean the struct was filled in.
    let p = unsafe { proto.assume_init() };

    // Casting a negative count with `as usize` would wrap to a huge value,
    // so clamp it, and never walk a null array.
    let count = if p.libraryVersions.is_null() || !(p.libraryVersionsSize > 0) {
        0
    } else {
        p.libraryVersionsSize as usize
    };
    let versions = (0..count)
        .filter_map(|i| {
            // SAFETY: `i < count` and the array pointer is non-null.
            let entry = unsafe { *p.libraryVersions.add(i) };
            if entry.is_null() {
                // `CStr::from_ptr` on null is undefined behaviour; skip it.
                None
            } else {
                Some(
                    unsafe { CStr::from_ptr(entry) }
                        .to_string_lossy()
                        .into_owned(),
                )
            }
        })
        .collect();

    Ok(Protocol {
        min_layer: p.minLayer,
        max_layer: p.maxLayer,
        udp_p2p: p.udpP2P,
        udp_reflector: p.udpReflector,
        library_versions: versions,
    })
}
/// Forwards `enable` to `ntg_enable_g_lib_loop`.
///
/// # Errors
/// Returns `CallError::from(code)` when the native status is negative.
pub fn enable_g_lib_loop(enable: bool) -> Result<()> {
    match unsafe { ntg_enable_g_lib_loop(enable) } {
        code if code < 0 => Err(CallError::from(code)),
        _ => Ok(()),
    }
}
/// Queries `ntg_get_media_devices` and converts the result with
/// `MediaDevices::from_ffi`.
///
/// # Errors
/// Returns `CallError::from(status)` when the native status is negative.
pub fn media_devices() -> Result<MediaDevices> {
    let mut slot = MaybeUninit::<ntg_media_devices_struct>::uninit();
    let status = unsafe { ntg_get_media_devices(slot.as_mut_ptr()) };
    if status >= 0 {
        // SAFETY: a non-negative status is taken to mean the struct was filled in.
        Ok(unsafe { MediaDevices::from_ffi(slot.assume_init()) })
    } else {
        Err(CallError::from(status))
    }
}
}
impl TgCalls {
/// Runs `ntg_create` for `chat_id` and returns the string it produces.
///
/// # Errors
/// Propagates the error from `call_async`; returns
/// `CallError::InvalidParams` when the output pointer is still null
/// after a successful call.
pub fn create(&self, chat_id: i64) -> Result<String> {
    let mut out: *mut c_char = std::ptr::null_mut();
    let out_slot: *mut *mut c_char = &mut out;
    let handle = self.inner.0;
    call_async(|future| unsafe { ntg_create(handle, chat_id, out_slot, future) })?;
    if out.is_null() {
        Err(CallError::InvalidParams)
    } else {
        // SAFETY: non-null checked; assumed to be a NUL-terminated string.
        Ok(unsafe { CStr::from_ptr(out) }.to_string_lossy().into_owned())
    }
}
/// Runs `ntg_connect` for `chat_id` with `params` converted to a C string.
///
/// `is_presentation` is forwarded unchanged.
///
/// # Errors
/// Propagates the error from `call_async`.
pub fn connect<S: IntoCString>(
    &self,
    chat_id: i64,
    params: S,
    is_presentation: bool,
) -> Result<()> {
    // Kept alive in this frame until `call_async` has returned.
    let params_cs = params.into_c_string();
    let handle = self.inner.0;
    call_async(|future| unsafe {
        ntg_connect(handle, chat_id, params_cs.as_ptr() as *mut _, is_presentation, future)
    })
}
/// Runs `ntg_set_stream_sources` for `chat_id` with the FFI form of `desc`.
///
/// # Errors
/// Propagates the error from `call_async`.
pub fn set_stream_sources(
    &self,
    chat_id: i64,
    stream_mode: StreamMode,
    desc: MediaDescription,
) -> Result<()> {
    // `backing` owns whatever storage the FFI view points into, so it must
    // outlive the native call.
    let mut backing = FfiMediaDesc::new(&desc);
    let native_desc = backing.as_ffi();
    let handle = self.inner.0;
    call_async(|future| unsafe {
        ntg_set_stream_sources(handle, chat_id, stream_mode as i32, native_desc, future)
    })
}
/// Runs `ntg_stop` for `chat_id` and waits for it through `call_async`.
///
/// # Errors
/// Propagates the error from `call_async`.
pub fn stop(&self, chat_id: i64) -> Result<()> {
    let handle = self.inner.0;
    call_async(|done| unsafe { ntg_stop(handle, chat_id, done) })
}
/// Runs `ntg_pause` for `chat_id` and waits for it through `call_async`.
///
/// # Errors
/// Propagates the error from `call_async`.
pub fn pause(&self, chat_id: i64) -> Result<()> {
    let handle = self.inner.0;
    call_async(|done| unsafe { ntg_pause(handle, chat_id, done) })
}
/// Runs `ntg_resume` for `chat_id` and waits for it through `call_async`.
///
/// # Errors
/// Propagates the error from `call_async`.
pub fn resume(&self, chat_id: i64) -> Result<()> {
    let handle = self.inner.0;
    call_async(|done| unsafe { ntg_resume(handle, chat_id, done) })
}
/// Runs `ntg_mute` for `chat_id` and waits for it through `call_async`.
///
/// # Errors
/// Propagates the error from `call_async`.
pub fn mute(&self, chat_id: i64) -> Result<()> {
    let handle = self.inner.0;
    call_async(|done| unsafe { ntg_mute(handle, chat_id, done) })
}
/// Runs `ntg_unmute` for `chat_id` and waits for it through `call_async`.
///
/// # Errors
/// Propagates the error from `call_async`.
pub fn unmute(&self, chat_id: i64) -> Result<()> {
    let handle = self.inner.0;
    call_async(|done| unsafe { ntg_unmute(handle, chat_id, done) })
}
/// Runs `ntg_time` for `chat_id` / `stream_mode` and returns the `i64`
/// value the native side writes.
///
/// # Errors
/// Propagates the error from `call_async`.
pub fn played_time(&self, chat_id: i64, stream_mode: StreamMode) -> Result<i64> {
    let handle = self.inner.0;
    let mut elapsed: i64 = 0;
    call_async(|done| unsafe {
        ntg_time(handle, chat_id, stream_mode as i32, &mut elapsed, done)
    })?;
    Ok(elapsed)
}
/// Runs `ntg_get_state` for `chat_id` and copies the native flags into a
/// `MediaState`.
///
/// # Errors
/// Propagates the error from `call_async`.
pub fn media_state(&self, chat_id: i64) -> Result<MediaState> {
    let handle = self.inner.0;
    let mut slot = MaybeUninit::<ntg_media_state_struct>::uninit();
    call_async(|done| unsafe { ntg_get_state(handle, chat_id, slot.as_mut_ptr(), done) })?;
    // SAFETY: a successful call is taken to mean the struct was filled in.
    let native = unsafe { slot.assume_init() };
    let state = MediaState {
        muted: native.muted,
        video_paused: native.videoPaused,
        video_stopped: native.videoStopped,
        presentation_paused: native.presentationPaused,
    };
    Ok(state)
}
/// Runs `ntg_get_connection_mode` for `chat_id` and reinterprets the
/// returned `i32` as a `ConnectionMode`.
///
/// # Errors
/// Propagates the error from `call_async`.
pub fn connection_mode(&self, chat_id: i64) -> Result<ConnectionMode> {
    let handle = self.inner.0;
    let mut raw_mode: i32 = 0;
    call_async(|done| unsafe {
        ntg_get_connection_mode(handle, chat_id, &mut raw_mode, done)
    })?;
    // NOTE(review): the value is not validated before the transmute; a
    // discriminant the enum does not declare would be undefined behaviour.
    // Consider a checked conversion once the variant list is confirmed.
    let mode = unsafe { std::mem::transmute::<i32, enums::ConnectionMode>(raw_mode) };
    Ok(mode)
}
/// Runs `ntg_calls` and converts every returned entry into a `CallInfo`.
///
/// Returns an empty vector when the native side leaves the buffer null
/// or reports a non-positive element count.
///
/// # Errors
/// Propagates the error from `call_async`.
pub fn calls(&self) -> Result<Vec<CallInfo>> {
    let mut buf: *mut ntgcalls::ntg_call_info_struct = std::ptr::null_mut();
    let mut size: c_int = 0;
    let ptr = self.inner.0;
    call_async(|future| unsafe { ntg_calls(ptr, &mut buf, &mut size, future) })?;
    // `slice::from_raw_parts` requires a non-null pointer even for length
    // zero, and a negative count cast to `usize` would be enormous.
    if buf.is_null() || size <= 0 {
        return Ok(Vec::new());
    }
    // SAFETY: `buf` is non-null and `size` is positive; the native side is
    // assumed to have written `size` consecutive entries.
    let entries = unsafe { std::slice::from_raw_parts(buf, size as usize) };
    Ok(entries.iter().map(|s| CallInfo::from(*s)).collect())
}
/// Runs `ntg_cpu_usage` and returns the `f64` value the native side writes.
///
/// # Errors
/// Propagates the error from `call_async`.
pub fn cpu_usage(&self) -> Result<f64> {
    let handle = self.inner.0;
    let mut reading: f64 = 0.0;
    call_async(|done| unsafe { ntg_cpu_usage(handle, &mut reading, done) })?;
    Ok(reading)
}
/// Runs `ntg_init_presentation` for `chat_id` and returns the string it
/// produces.
///
/// # Errors
/// Propagates the error from `call_async`; returns
/// `CallError::InvalidParams` when the output pointer is still null
/// after a successful call.
pub fn init_presentation(&self, chat_id: i64) -> Result<String> {
    let mut out: *mut c_char = std::ptr::null_mut();
    let out_slot: *mut *mut c_char = &mut out;
    let handle = self.inner.0;
    call_async(|done| unsafe { ntg_init_presentation(handle, chat_id, out_slot, done) })?;
    if out.is_null() {
        Err(CallError::InvalidParams)
    } else {
        // SAFETY: non-null checked; assumed to be a NUL-terminated string.
        Ok(unsafe { CStr::from_ptr(out) }.to_string_lossy().into_owned())
    }
}
/// Runs `ntg_stop_presentation` for `chat_id` and waits for it through
/// `call_async`.
///
/// # Errors
/// Propagates the error from `call_async`.
pub fn stop_presentation(&self, chat_id: i64) -> Result<()> {
    let handle = self.inner.0;
    call_async(|done| unsafe { ntg_stop_presentation(handle, chat_id, done) })
}
/// Runs `ntg_add_incoming_video` for `chat_id` and returns the `u32`
/// the native side writes.
///
/// `endpoint` is converted to a C string and `ssrc_groups` to their FFI
/// form; an empty group list is passed as a null pointer with length 0.
///
/// # Errors
/// Propagates the error from `call_async`.
pub fn add_incoming_video(
    &self,
    chat_id: i64,
    endpoint: impl IntoCString,
    ssrc_groups: &[SsrcGroup],
) -> Result<u32> {
    let endpoint_cs = endpoint.into_c_string();
    // `owners` keeps the per-group backing storage alive; `natives` is the
    // contiguous array actually handed to the library.
    let mut owners: Vec<FfiSsrcGroup> = ssrc_groups.iter().map(FfiSsrcGroup::new).collect();
    let mut natives: Vec<ntgcalls::ntg_ssrc_group_struct> =
        owners.iter_mut().map(|g| g.ffi.clone()).collect();
    let native_len = natives.len() as c_int;
    let native_ptr = if natives.is_empty() {
        std::ptr::null_mut()
    } else {
        natives.as_mut_ptr()
    };
    let mut ssrc: u32 = 0;
    let handle = self.inner.0;
    call_async(|done| unsafe {
        ntg_add_incoming_video(
            handle,
            chat_id,
            endpoint_cs.as_ptr() as *mut _,
            native_ptr,
            native_len,
            &mut ssrc,
            done,
        )
    })?;
    Ok(ssrc)
}
/// Runs `ntg_remove_incoming_video` for `chat_id` with `endpoint`
/// converted to a C string.
///
/// # Errors
/// Propagates the error from `call_async`.
pub fn remove_incoming_video(&self, chat_id: i64, endpoint: impl IntoCString) -> Result<()> {
    // Kept alive in this frame until `call_async` has returned.
    let endpoint_cs = endpoint.into_c_string();
    let handle = self.inner.0;
    call_async(|done| unsafe {
        ntg_remove_incoming_video(handle, chat_id, endpoint_cs.as_ptr() as *mut _, done)
    })
}
/// Runs `ntg_send_external_frame`, passing `frame` as a pointer/length
/// pair together with the FFI form of `frame_data`.
///
/// The length is converted with an `as c_int` cast.
///
/// # Errors
/// Propagates the error from `call_async`.
pub fn send_external_frame(
    &self,
    chat_id: i64,
    device: StreamDevice,
    frame: &mut [u8],
    frame_data: FrameData,
) -> Result<()> {
    let handle = self.inner.0;
    let bytes = frame.as_mut_ptr();
    let byte_count = frame.len() as c_int;
    call_async(|done| unsafe {
        ntg_send_external_frame(
            handle,
            chat_id,
            device as i32,
            bytes,
            byte_count,
            frame_data.to_ffi(),
            done,
        )
    })
}
/// Runs `ntg_send_broadcast_timestamp` with `chat_id` and `timestamp`.
///
/// # Errors
/// Propagates the error from `call_async`.
pub fn send_broadcast_timestamp(&self, chat_id: i64, timestamp: i64) -> Result<()> {
    let handle = self.inner.0;
    call_async(|done| unsafe { ntg_send_broadcast_timestamp(handle, chat_id, timestamp, done) })
}
/// Runs `ntg_send_broadcast_part`, passing `frame` as a pointer/length
/// pair and forwarding the remaining arguments unchanged.
///
/// The length is converted with an `as c_int` cast.
///
/// # Errors
/// Propagates the error from `call_async`.
pub fn send_broadcast_part(
    &self,
    chat_id: i64,
    segment_id: i64,
    part_id: i32,
    status: MediaSegmentStatus,
    quality_update: bool,
    frame: &[u8],
) -> Result<()> {
    let handle = self.inner.0;
    let bytes = frame.as_ptr();
    let byte_count = frame.len() as c_int;
    call_async(|done| unsafe {
        ntg_send_broadcast_part(
            handle,
            chat_id,
            segment_id,
            part_id,
            status as i32,
            quality_update,
            bytes,
            byte_count,
            done,
        )
    })
}
/// Runs `ntg_create_p2p` for `user_id` and waits for it through
/// `call_async`.
///
/// # Errors
/// Propagates the error from `call_async`.
pub fn create_p2p(&self, user_id: i64) -> Result<()> {
    let handle = self.inner.0;
    call_async(|done| unsafe { ntg_create_p2p(handle, user_id, done) })
}
/// Runs `ntg_connect_p2p` for `user_id` with the FFI forms of `servers`
/// and `versions`.
///
/// Empty lists are passed as a null pointer with length 0.
///
/// # Errors
/// Propagates the error from `call_async`.
pub fn connect_p2p(
    &self,
    user_id: i64,
    servers: &[RtcServer],
    versions: &[String],
    p2p_allowed: bool,
) -> Result<()> {
    // Owners keep the backing storage alive; the `*_natives` vectors are the
    // contiguous arrays handed to the library.
    let server_owners: Vec<FfiRtcServer> = servers.iter().map(FfiRtcServer::new).collect();
    let mut server_natives: Vec<ntgcalls::ntg_rtc_server_struct> =
        server_owners.iter().map(|s| s.ffi.clone()).collect();

    // NOTE(review): a version containing an interior NUL silently becomes an
    // empty C string here — confirm this best-effort behaviour is intended.
    let version_owners: Vec<std::ffi::CString> = versions
        .iter()
        .map(|v| std::ffi::CString::new(v.as_str()).unwrap_or_default())
        .collect();
    let mut version_natives: Vec<*mut c_char> = version_owners
        .iter()
        .map(|cs| cs.as_ptr() as *mut _)
        .collect();

    let server_len = server_natives.len() as c_int;
    let server_ptr = if server_natives.is_empty() {
        std::ptr::null_mut()
    } else {
        server_natives.as_mut_ptr()
    };
    let version_len = version_natives.len() as c_int;
    let version_ptr = if version_natives.is_empty() {
        std::ptr::null_mut()
    } else {
        version_natives.as_mut_ptr()
    };

    let handle = self.inner.0;
    call_async(|done| unsafe {
        ntg_connect_p2p(
            handle,
            user_id,
            server_ptr,
            server_len,
            version_ptr,
            version_len,
            p2p_allowed,
            done,
        )
    })
}
/// Runs `ntg_send_signaling_data` for `user_id`, passing `data` as a
/// pointer/length pair (length converted with an `as c_int` cast).
///
/// # Errors
/// Propagates the error from `call_async`.
pub fn send_signaling_data(&self, user_id: i64, data: &mut [u8]) -> Result<()> {
    let handle = self.inner.0;
    let bytes = data.as_mut_ptr();
    let byte_count = data.len() as c_int;
    call_async(|done| unsafe {
        ntg_send_signaling_data(handle, user_id, bytes, byte_count, done)
    })
}
/// Runs `ntg_init_exchange` for `user_id` and returns a copy of the bytes
/// the native side produces.
///
/// Returns an empty vector when the output buffer is null or its reported
/// size is not positive.
///
/// # Errors
/// Propagates the error from `call_async`.
pub fn init_exchange(
    &self,
    user_id: i64,
    dh_config: &DhConfig,
    g_a_hash: &[u8],
) -> Result<Vec<u8>> {
    let mut native_dh = FfiDhConfig::new(dh_config);
    let mut produced: *mut u8 = std::ptr::null_mut();
    let mut produced_len: c_int = 0;
    let handle = self.inner.0;
    call_async(|done| unsafe {
        ntg_init_exchange(
            handle,
            user_id,
            &mut native_dh.ffi,
            g_a_hash.as_ptr(),
            g_a_hash.len() as c_int,
            &mut produced,
            &mut produced_len,
            done,
        )
    })?;
    let bytes = if !produced.is_null() && produced_len > 0 {
        // SAFETY: pointer is non-null and the length positive; the native
        // side is assumed to have written that many bytes.
        unsafe { std::slice::from_raw_parts(produced, produced_len as usize) }.to_vec()
    } else {
        Vec::new()
    };
    Ok(bytes)
}
/// Runs `ntg_exchange_keys` for `user_id` and converts the result with
/// `AuthParams::from_ffi`.
///
/// `g_a_or_b` is passed as a pointer/length pair; `fingerprint` is
/// forwarded unchanged.
///
/// # Errors
/// Propagates the error from `call_async`.
pub fn exchange_keys(
    &self,
    user_id: i64,
    g_a_or_b: &[u8],
    fingerprint: i64,
) -> Result<AuthParams> {
    let handle = self.inner.0;
    let mut slot = MaybeUninit::<ntg_auth_params_struct>::uninit();
    call_async(|done| unsafe {
        ntg_exchange_keys(
            handle,
            user_id,
            g_a_or_b.as_ptr(),
            g_a_or_b.len() as c_int,
            fingerprint,
            slot.as_mut_ptr(),
            done,
        )
    })?;
    // SAFETY: a successful call is taken to mean the struct was filled in.
    let native = unsafe { slot.assume_init() };
    Ok(AuthParams::from_ffi(native))
}
/// Runs `ntg_skip_exchange` for `user_id`, passing `encryption_key` as a
/// pointer/length pair and forwarding `is_outgoing` unchanged.
///
/// # Errors
/// Propagates the error from `call_async`.
pub fn skip_exchange(
    &self,
    user_id: i64,
    encryption_key: &[u8],
    is_outgoing: bool,
) -> Result<()> {
    let handle = self.inner.0;
    let key_ptr = encryption_key.as_ptr();
    let key_len = encryption_key.len() as c_int;
    call_async(|done| unsafe {
        ntg_skip_exchange(handle, user_id, key_ptr, key_len, is_outgoing, done)
    })
}
/// Registers `callback` and `user_data` through `ntg_on_stream_end`.
///
/// # Safety
/// Both arguments are handed to the native library unchanged.
/// NOTE(review): presumably `user_data` must stay valid, and `callback`
/// callable from the library's threads, for as long as it stays
/// registered — confirm against the NTgCalls documentation.
///
/// # Errors
/// Returns `CallError::from(code)` when the native status is negative.
pub unsafe fn on_stream_end(
    &self,
    callback: StreamCallback,
    user_data: *mut c_void,
) -> Result<()> {
    match unsafe { ntg_on_stream_end(self.inner.0, callback, user_data) } {
        code if code < 0 => Err(CallError::from(code)),
        _ => Ok(()),
    }
}
/// Registers `callback` and `user_data` through `ntg_on_upgrade`.
///
/// # Safety
/// Both arguments are handed to the native library unchanged.
/// NOTE(review): presumably `user_data` must stay valid, and `callback`
/// callable from the library's threads, for as long as it stays
/// registered — confirm against the NTgCalls documentation.
///
/// # Errors
/// Returns `CallError::from(code)` when the native status is negative.
pub unsafe fn on_upgrade(
    &self,
    callback: UpgradeCallback,
    user_data: *mut c_void,
) -> Result<()> {
    match unsafe { ntg_on_upgrade(self.inner.0, callback, user_data) } {
        code if code < 0 => Err(CallError::from(code)),
        _ => Ok(()),
    }
}
/// Registers `callback` and `user_data` through `ntg_on_connection_change`.
///
/// # Safety
/// Both arguments are handed to the native library unchanged.
/// NOTE(review): presumably `user_data` must stay valid, and `callback`
/// callable from the library's threads, for as long as it stays
/// registered — confirm against the NTgCalls documentation.
///
/// # Errors
/// Returns `CallError::from(code)` when the native status is negative.
pub unsafe fn on_connection_change(
    &self,
    callback: ConnectionCallback,
    user_data: *mut c_void,
) -> Result<()> {
    match unsafe { ntg_on_connection_change(self.inner.0, callback, user_data) } {
        code if code < 0 => Err(CallError::from(code)),
        _ => Ok(()),
    }
}
/// Registers `callback` and `user_data` through `ntg_on_signaling_data`.
///
/// # Safety
/// Both arguments are handed to the native library unchanged.
/// NOTE(review): presumably `user_data` must stay valid, and `callback`
/// callable from the library's threads, for as long as it stays
/// registered — confirm against the NTgCalls documentation.
///
/// # Errors
/// Returns `CallError::from(code)` when the native status is negative.
pub unsafe fn on_signaling_data(
    &self,
    callback: SignalingCallback,
    user_data: *mut c_void,
) -> Result<()> {
    match unsafe { ntgcalls::ntg_on_signaling_data(self.inner.0, callback, user_data) } {
        code if code < 0 => Err(CallError::from(code)),
        _ => Ok(()),
    }
}
/// Registers `callback` and `user_data` through `ntg_on_frames`.
///
/// # Safety
/// Both arguments are handed to the native library unchanged.
/// NOTE(review): presumably `user_data` must stay valid, and `callback`
/// callable from the library's threads, for as long as it stays
/// registered — confirm against the NTgCalls documentation.
///
/// # Errors
/// Returns `CallError::from(code)` when the native status is negative.
pub unsafe fn on_frames(&self, callback: FrameCallback, user_data: *mut c_void) -> Result<()> {
    match unsafe { ntgcalls::ntg_on_frames(self.inner.0, callback, user_data) } {
        code if code < 0 => Err(CallError::from(code)),
        _ => Ok(()),
    }
}
/// Registers `callback` and `user_data` through
/// `ntg_on_remote_source_change`.
///
/// # Safety
/// Both arguments are handed to the native library unchanged.
/// NOTE(review): presumably `user_data` must stay valid, and `callback`
/// callable from the library's threads, for as long as it stays
/// registered — confirm against the NTgCalls documentation.
///
/// # Errors
/// Returns `CallError::from(code)` when the native status is negative.
pub unsafe fn on_remote_source_change(
    &self,
    callback: RemoteSourceCallback,
    user_data: *mut c_void,
) -> Result<()> {
    match unsafe { ntgcalls::ntg_on_remote_source_change(self.inner.0, callback, user_data) } {
        code if code < 0 => Err(CallError::from(code)),
        _ => Ok(()),
    }
}
/// Registers `callback` and `user_data` through
/// `ntg_on_request_broadcast_timestamp`.
///
/// # Safety
/// Both arguments are handed to the native library unchanged.
/// NOTE(review): presumably `user_data` must stay valid, and `callback`
/// callable from the library's threads, for as long as it stays
/// registered — confirm against the NTgCalls documentation.
///
/// # Errors
/// Returns `CallError::from(code)` when the native status is negative.
pub unsafe fn on_request_broadcast_timestamp(
    &self,
    callback: BroadcastTimestampCallback,
    user_data: *mut c_void,
) -> Result<()> {
    match unsafe {
        ntgcalls::ntg_on_request_broadcast_timestamp(self.inner.0, callback, user_data)
    } {
        code if code < 0 => Err(CallError::from(code)),
        _ => Ok(()),
    }
}
/// Registers `callback` and `user_data` through
/// `ntg_on_request_broadcast_part`.
///
/// # Safety
/// Both arguments are handed to the native library unchanged.
/// NOTE(review): presumably `user_data` must stay valid, and `callback`
/// callable from the library's threads, for as long as it stays
/// registered — confirm against the NTgCalls documentation.
///
/// # Errors
/// Returns `CallError::from(code)` when the native status is negative.
pub unsafe fn on_request_broadcast_part(
    &self,
    callback: BroadcastPartCallback,
    user_data: *mut c_void,
) -> Result<()> {
    match unsafe { ntgcalls::ntg_on_request_broadcast_part(self.inner.0, callback, user_data) } {
        code if code < 0 => Err(CallError::from(code)),
        _ => Ok(()),
    }
}
}
/// Owned copy of the native protocol description returned by
/// `TgCalls::protocol`.
#[derive(Debug, Clone)]
pub struct Protocol {
/// Copied from the native `minLayer` field.
pub min_layer: i32,
/// Copied from the native `maxLayer` field.
pub max_layer: i32,
/// Copied from the native `udpP2P` field.
pub udp_p2p: bool,
/// Copied from the native `udpReflector` field.
pub udp_reflector: bool,
/// Library version strings copied (lossy UTF-8) from the native
/// `libraryVersions` array.
pub library_versions: Vec<String>,
}
#[cfg(test)]
mod tests {
use super::*;
// Smoke test: the version string reported by the native library must not
// be empty.
#[test]
fn version_is_non_empty() {
assert!(!TgCalls::version().is_empty());
}
}