use crate::*;
use std::sync::{Arc, Mutex, Weak};
use tx5_go_pion_sys::API;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(crate = "tx5_core::deps::serde", rename_all = "camelCase")]
pub struct IceServer {
pub urls: Vec<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub username: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub credential: Option<String>,
}
#[derive(Default, Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(crate = "tx5_core::deps::serde", rename_all = "camelCase")]
pub struct PeerConnectionConfig {
pub ice_servers: Vec<IceServer>,
}
impl From<PeerConnectionConfig> for GoBufRef<'static> {
fn from(p: PeerConnectionConfig) -> Self {
GoBufRef::json(p)
}
}
impl From<&PeerConnectionConfig> for GoBufRef<'static> {
fn from(p: &PeerConnectionConfig) -> Self {
GoBufRef::json(p)
}
}
#[derive(Default, Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(crate = "tx5_core::deps::serde", rename_all = "camelCase")]
pub struct DataChannelConfig {
#[serde(skip_serializing_if = "Option::is_none")]
pub label: Option<String>,
}
impl From<DataChannelConfig> for GoBufRef<'static> {
fn from(p: DataChannelConfig) -> Self {
GoBufRef::json(p)
}
}
impl From<&DataChannelConfig> for GoBufRef<'static> {
fn from(p: &DataChannelConfig) -> Self {
GoBufRef::json(p)
}
}
#[derive(Default, Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(crate = "tx5_core::deps::serde", rename_all = "camelCase")]
pub struct OfferConfig {}
impl From<OfferConfig> for GoBufRef<'static> {
fn from(p: OfferConfig) -> Self {
GoBufRef::json(p)
}
}
impl From<&OfferConfig> for GoBufRef<'static> {
fn from(p: &OfferConfig) -> Self {
GoBufRef::json(p)
}
}
#[derive(Default, Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(crate = "tx5_core::deps::serde", rename_all = "camelCase")]
pub struct AnswerConfig {}
impl From<AnswerConfig> for GoBufRef<'static> {
fn from(p: AnswerConfig) -> Self {
GoBufRef::json(p)
}
}
impl From<&AnswerConfig> for GoBufRef<'static> {
fn from(p: &AnswerConfig) -> Self {
GoBufRef::json(p)
}
}
pub(crate) struct PeerConCore {
peer_con_id: usize,
con_state: PeerConnectionState,
evt_send: tokio::sync::mpsc::UnboundedSender<PeerConnectionEvent>,
drop_err: Error,
}
impl Drop for PeerConCore {
fn drop(&mut self) {
let _ = self
.evt_send
.send(PeerConnectionEvent::Error(self.drop_err.clone()));
unregister_peer_con(self.peer_con_id);
unsafe {
API.peer_con_free(self.peer_con_id);
}
}
}
impl PeerConCore {
pub fn new(
peer_con_id: usize,
evt_send: tokio::sync::mpsc::UnboundedSender<PeerConnectionEvent>,
) -> Self {
Self {
peer_con_id,
con_state: PeerConnectionState::New,
evt_send,
drop_err: Error::id("PeerConnectionDropped").into(),
}
}
pub fn close(&mut self, err: Error) {
self.drop_err = err;
}
}
#[derive(Clone)]
pub(crate) struct WeakPeerCon(
pub(crate) Weak<Mutex<std::result::Result<PeerConCore, Error>>>,
);
macro_rules! peer_con_strong_core {
($inner:expr, $ident:ident, $block:block) => {
match &mut *$inner.lock().unwrap() {
Ok($ident) => $block,
Err(err) => Result::Err(err.clone().into()),
}
};
}
macro_rules! peer_con_weak_core {
($inner:expr, $ident:ident, $block:block) => {
match $inner.upgrade() {
Some(strong) => peer_con_strong_core!(strong, $ident, $block),
None => Result::Err(Error::id("PeerConnectionClosed")),
}
};
}
impl WeakPeerCon {
pub fn send_evt(&self, evt: PeerConnectionEvent) -> Result<()> {
peer_con_weak_core!(self.0, core, {
core.evt_send
.send(evt)
.map_err(|_| Error::id("PeerConnectionClosed"))
})
}
}
pub struct PeerConnection(Arc<Mutex<std::result::Result<PeerConCore, Error>>>);
impl PeerConnection {
pub async fn new<'a, B>(
config: B,
) -> Result<(
Self,
tokio::sync::mpsc::UnboundedReceiver<PeerConnectionEvent>,
)>
where
B: Into<GoBufRef<'a>>,
{
tx5_init().await.map_err(Error::err)?;
init_evt_manager();
r2id!(config);
tokio::task::spawn_blocking(move || unsafe {
let peer_con_id = API.peer_con_alloc(config)?;
let (evt_send, evt_recv) = tokio::sync::mpsc::unbounded_channel();
let strong = Arc::new(Mutex::new(Ok(PeerConCore::new(
peer_con_id,
evt_send,
))));
let weak = WeakPeerCon(Arc::downgrade(&strong));
register_peer_con(peer_con_id, weak);
Ok((Self(strong), evt_recv))
})
.await?
}
pub fn set_con_state(&self, con_state: PeerConnectionState) {
let mut lock = self.0.lock().unwrap();
if let Ok(core) = &mut *lock {
core.con_state = con_state;
} else {
tracing::warn!(
?con_state,
"Unable to set peer connection state: {:?}",
self.get_peer_con_id()
);
}
}
pub fn get_con_state(&self) -> Result<PeerConnectionState> {
peer_con_strong_core!(self.0, core, { Ok(core.con_state) })
}
pub fn close<E: Into<Error>>(&self, err: E) {
let err = err.into();
let mut tmp = Err(err.clone());
{
let mut lock = self.0.lock().unwrap();
let mut do_swap = false;
if let Ok(core) = &mut *lock {
core.close(err.clone());
do_swap = true;
}
if do_swap {
std::mem::swap(&mut *lock, &mut tmp);
}
}
drop(tmp);
}
fn get_peer_con_id(&self) -> Result<usize> {
peer_con_strong_core!(self.0, core, { Ok(core.peer_con_id) })
}
pub async fn stats(&self) -> Result<GoBuf> {
let peer_con_id = self.get_peer_con_id()?;
tokio::task::spawn_blocking(move || unsafe {
API.peer_con_stats(peer_con_id).map(GoBuf)
})
.await?
}
pub async fn create_offer<'a, B>(&self, config: B) -> Result<GoBuf>
where
B: Into<GoBufRef<'a>>,
{
let peer_con_id = self.get_peer_con_id()?;
r2id!(config);
tokio::task::spawn_blocking(move || unsafe {
API.peer_con_create_offer(peer_con_id, config).map(GoBuf)
})
.await?
}
pub async fn create_answer<'a, B>(&self, config: B) -> Result<GoBuf>
where
B: Into<GoBufRef<'a>>,
{
let peer_con_id = self.get_peer_con_id()?;
r2id!(config);
tokio::task::spawn_blocking(move || unsafe {
API.peer_con_create_answer(peer_con_id, config).map(GoBuf)
})
.await?
}
pub async fn set_local_description<'a, B>(&self, desc: B) -> Result<()>
where
B: Into<GoBufRef<'a>>,
{
let peer_con_id = self.get_peer_con_id()?;
r2id!(desc);
tokio::task::spawn_blocking(move || unsafe {
API.peer_con_set_local_desc(peer_con_id, desc)
})
.await?
}
pub async fn set_remote_description<'a, B>(&self, desc: B) -> Result<()>
where
B: Into<GoBufRef<'a>>,
{
let peer_con_id = self.get_peer_con_id()?;
r2id!(desc);
tokio::task::spawn_blocking(move || unsafe {
API.peer_con_set_rem_desc(peer_con_id, desc)
})
.await?
}
pub async fn add_ice_candidate<'a, B>(&self, ice: B) -> Result<()>
where
B: Into<GoBufRef<'a>>,
{
let peer_con_id = self.get_peer_con_id()?;
r2id!(ice);
tokio::task::spawn_blocking(move || unsafe {
API.peer_con_add_ice_candidate(peer_con_id, ice)
})
.await?
}
pub async fn create_data_channel<'a, B>(
&self,
config: B,
) -> Result<(
DataChannel,
tokio::sync::mpsc::UnboundedReceiver<DataChannelEvent>,
)>
where
B: Into<GoBufRef<'a>>,
{
let peer_con_id =
peer_con_strong_core!(self.0, core, { Ok(core.peer_con_id) })?;
r2id!(config);
tokio::task::spawn_blocking(move || unsafe {
let data_chan_id =
API.peer_con_create_data_chan(peer_con_id, config)?;
Ok(DataChannel::new(data_chan_id))
})
.await?
}
}