#[cfg(test)]
pub(crate) mod peer_connection_test;
pub mod certificate;
pub mod configuration;
pub mod offer_answer_options;
pub(crate) mod operation;
mod peer_connection_internal;
pub mod peer_connection_state;
pub mod policy;
pub mod sdp;
pub mod signaling_state;
use crate::api::media_engine::MediaEngine;
use crate::api::setting_engine::SettingEngine;
use crate::api::API;
use crate::data_channel::data_channel_init::RTCDataChannelInit;
use crate::data_channel::data_channel_parameters::DataChannelParameters;
use crate::data_channel::data_channel_state::RTCDataChannelState;
use crate::data_channel::RTCDataChannel;
use crate::dtls_transport::dtls_fingerprint::RTCDtlsFingerprint;
use crate::dtls_transport::dtls_parameters::DTLSParameters;
use crate::dtls_transport::dtls_role::{
DTLSRole, DEFAULT_DTLS_ROLE_ANSWER, DEFAULT_DTLS_ROLE_OFFER,
};
use crate::dtls_transport::dtls_transport_state::RTCDtlsTransportState;
use crate::dtls_transport::RTCDtlsTransport;
use crate::error::{flatten_errs, Error, Result};
use crate::ice_transport::ice_candidate::{RTCIceCandidate, RTCIceCandidateInit};
use crate::ice_transport::ice_connection_state::RTCIceConnectionState;
use crate::ice_transport::ice_gatherer::RTCIceGatherOptions;
use crate::ice_transport::ice_gatherer::{
OnGatheringCompleteHdlrFn, OnICEGathererStateChangeHdlrFn, OnLocalCandidateHdlrFn,
RTCIceGatherer,
};
use crate::ice_transport::ice_gatherer_state::RTCIceGathererState;
use crate::ice_transport::ice_gathering_state::RTCIceGatheringState;
use crate::ice_transport::ice_parameters::RTCIceParameters;
use crate::ice_transport::ice_role::RTCIceRole;
use crate::ice_transport::ice_transport_state::RTCIceTransportState;
use crate::ice_transport::RTCIceTransport;
use crate::peer_connection::certificate::RTCCertificate;
use crate::peer_connection::configuration::RTCConfiguration;
use crate::peer_connection::offer_answer_options::{RTCAnswerOptions, RTCOfferOptions};
use crate::peer_connection::operation::{Operation, Operations};
use crate::peer_connection::peer_connection_state::{
NegotiationNeededState, RTCPeerConnectionState,
};
use crate::peer_connection::sdp::sdp_type::RTCSdpType;
use crate::peer_connection::sdp::session_description::RTCSessionDescription;
use crate::peer_connection::sdp::*;
use crate::peer_connection::signaling_state::{
check_next_signaling_state, RTCSignalingState, StateChangeOp,
};
use crate::rtp_transceiver::rtp_codec::{RTCRtpHeaderExtensionCapability, RTPCodecType};
use crate::rtp_transceiver::rtp_receiver::RTCRtpReceiver;
use crate::rtp_transceiver::rtp_sender::RTCRtpSender;
use crate::rtp_transceiver::rtp_transceiver_direction::RTCRtpTransceiverDirection;
use crate::rtp_transceiver::{
find_by_mid, handle_unknown_rtp_packet, satisfy_type_and_direction, RTCRtpTransceiver,
};
use crate::rtp_transceiver::{RTCRtpTransceiverInit, SSRC};
use crate::sctp_transport::sctp_transport_capabilities::SCTPTransportCapabilities;
use crate::sctp_transport::sctp_transport_state::RTCSctpTransportState;
use crate::sctp_transport::RTCSctpTransport;
use crate::stats::StatsReport;
use crate::track::track_local::track_local_static_sample::TrackLocalStaticSample;
use crate::track::track_local::TrackLocal;
use crate::track::track_remote::TrackRemote;
use ::ice::candidate::candidate_base::unmarshal_candidate;
use ::ice::candidate::Candidate;
use ::sdp::description::session::*;
use ::sdp::util::ConnectionRole;
use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use interceptor::{stats, Attributes, Interceptor, RTCPWriter};
use peer_connection_internal::*;
use rand::{thread_rng, Rng};
use rcgen::KeyPair;
use srtp::stream::Stream;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicU8, Ordering};
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::sync::{mpsc, Mutex};
// NOTE(review): the two simulcast limits below are not referenced in this part of the
// file; presumably they bound simulcast probing elsewhere in the crate — confirm at the use sites.
pub(crate) const SIMULCAST_PROBE_COUNT: usize = 10;
pub(crate) const SIMULCAST_MAX_PROBE_ROUTINES: u64 = 25;
/// Media name of an SDP media section that is skipped when media sections are
/// matched to RTP transceivers (see `set_local_description`).
pub(crate) const MEDIA_SECTION_APPLICATION: &str = "application";
/// Alphabet sampled by `math_rand_alpha`: the 26 lowercase followed by the 26 uppercase ASCII letters.
const RUNES_ALPHA: &[u8] = b"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
/// Builds a random string of exactly `n` ASCII letters.
///
/// Every character is picked independently from `RUNES_ALPHA` using the
/// generator returned by `thread_rng()`.
pub fn math_rand_alpha(n: usize) -> String {
    let mut rng = thread_rng();
    let mut out = String::with_capacity(n);
    for _ in 0..n {
        let pick = rng.gen_range(0..RUNES_ALPHA.len());
        out.push(char::from(RUNES_ALPHA[pick]));
    }
    out
}
/// Boxed callback taking the new [`RTCSignalingState`] and returning a pinned,
/// boxed future with no output. Registered through `on_signaling_state_change`.
pub type OnSignalingStateChangeHdlrFn = Box<
dyn (FnMut(RTCSignalingState) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>)
+ Send
+ Sync,
>;
/// Boxed callback taking the new [`RTCIceConnectionState`] and returning a pinned,
/// boxed future with no output. Registered through `on_ice_connection_state_change`.
pub type OnICEConnectionStateChangeHdlrFn = Box<
dyn (FnMut(RTCIceConnectionState) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>)
+ Send
+ Sync,
>;
/// Boxed callback taking the new [`RTCPeerConnectionState`] and returning a pinned,
/// boxed future with no output. Registered through `on_peer_connection_state_change`.
pub type OnPeerConnectionStateChangeHdlrFn = Box<
dyn (FnMut(RTCPeerConnectionState) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>)
+ Send
+ Sync,
>;
/// Boxed callback taking a shared [`RTCDataChannel`] and returning a pinned,
/// boxed future with no output. Registered through `on_data_channel`.
pub type OnDataChannelHdlrFn = Box<
dyn (FnMut(Arc<RTCDataChannel>) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>)
+ Send
+ Sync,
>;
/// Boxed callback taking an optional remote track and an optional receiver and
/// returning a pinned, boxed future with no output. Registered through `on_track`;
/// `do_track` only invokes it when the track argument is `Some`.
pub type OnTrackHdlrFn = Box<
dyn (FnMut(
Option<Arc<TrackRemote>>,
Option<Arc<RTCRtpReceiver>>,
) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>)
+ Send
+ Sync,
>;
/// Boxed argument-less callback returning a pinned, boxed future with no output.
/// Registered through `on_negotiation_needed`.
pub type OnNegotiationNeededHdlrFn =
Box<dyn (FnMut() -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>) + Send + Sync>;
/// Cloneable bundle of shared handles to the ICE/DTLS transports and the
/// connection-state bookkeeping.
// NOTE(review): not referenced in the visible part of this file; presumably passed to the
// transport start-up path — confirm. Its handler slot is `Arc<Mutex<Option<..>>>`, while
// `update_connection_state` takes an `Arc<ArcSwapOption<Mutex<..>>>`.
#[derive(Clone)]
struct StartTransportsParams {
ice_transport: Arc<RTCIceTransport>,
dtls_transport: Arc<RTCDtlsTransport>,
on_peer_connection_state_change_handler: Arc<Mutex<Option<OnPeerConnectionStateChangeHdlrFn>>>,
is_closed: Arc<AtomicBool>,
// Both states are stored as the `u8` value of their enum.
peer_connection_state: Arc<AtomicU8>,
ice_connection_state: Arc<AtomicU8>,
}
/// Shared state read by `RTCPeerConnection::check_negotiation_needed`.
#[derive(Clone)]
struct CheckNegotiationNeededParams {
// Its `data_channels` list is counted to detect a data channel missing from the local description.
sctp_transport: Arc<RTCSctpTransport>,
// Transceivers compared against the media sections of the current descriptions.
rtp_transceivers: Arc<Mutex<Vec<Arc<RTCRtpTransceiver>>>>,
// `None` here makes `check_negotiation_needed` return `true` immediately.
current_local_description: Arc<Mutex<Option<RTCSessionDescription>>>,
current_remote_description: Arc<Mutex<Option<RTCSessionDescription>>>,
}
/// Shared state consumed by the negotiation-needed functions
/// (`do_negotiation_needed`, `negotiation_needed_op`, `after_negotiation_needed_op`).
#[derive(Clone)]
struct NegotiationNeededParams {
// User callback; `negotiation_needed_op` returns early when none is registered.
on_negotiation_needed_handler: Arc<ArcSwapOption<Mutex<OnNegotiationNeededHdlrFn>>>,
is_closed: Arc<AtomicBool>,
// Operations queue on which `negotiation_needed_op` is enqueued.
ops: Arc<Operations>,
// `NegotiationNeededState` stored as `u8` (Empty / Run / Queue).
negotiation_needed_state: Arc<AtomicU8>,
// Set to true right before the callback is invoked; cleared when no negotiation is needed.
is_negotiation_needed: Arc<AtomicBool>,
// `RTCSignalingState` stored as `u8`.
signaling_state: Arc<AtomicU8>,
check_negotiation_needed_params: CheckNegotiationNeededParams,
}
/// A peer connection: configuration, interceptor chain and the shared
/// `PeerConnectionInternal` state that the methods of this type operate on.
pub struct RTCPeerConnection {
// Built in `new` as "PeerConnection-<nanoseconds since the Unix epoch>".
stats_id: String,
// Always `None` when built by `new`; when `Some`, `create_offer` and `create_answer`
// fail with `ErrIdentityProviderNotImplemented`.
idp_login_url: Option<String>,
// Configuration returned by `PeerConnectionInternal::new`.
configuration: RTCConfiguration,
// RTCP writer obtained by binding the internal state to the interceptor chain.
interceptor_rtcp_writer: Arc<dyn RTCPWriter + Send + Sync>,
// Interceptor chain built from the API registry plus a stats interceptor.
interceptor: Arc<dyn Interceptor + Send + Sync>,
pub(crate) internal: Arc<PeerConnectionInternal>,
}
impl std::fmt::Debug for RTCPeerConnection {
    /// Writes a compact debug view: the stats id, the identity-provider URL and
    /// the current signaling and ICE connection states.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("RTCPeerConnection");
        out.field("stats_id", &self.stats_id);
        out.field("idp_login_url", &self.idp_login_url);
        out.field("signaling_state", &self.signaling_state());
        out.field("ice_connection_state", &self.ice_connection_state());
        out.finish()
    }
}
impl std::fmt::Display for RTCPeerConnection {
    /// Renders the connection as `(RTCPeerConnection <stats id>)`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_fmt(format_args!("(RTCPeerConnection {})", self.stats_id))
    }
}
impl RTCPeerConnection {
/// Creates a peer connection from `api` and `configuration`.
///
/// Steps, in order:
/// 1. validate/complete the configuration via `init_configuration`;
/// 2. build the interceptor chain from the API registry and append a stats interceptor;
/// 3. create the shared `PeerConnectionInternal`, handing it a weak reference to the chain;
/// 4. bind the internal state as RTCP writer of the interceptor chain.
///
/// # Errors
/// Propagates failures from `init_configuration`, from building the interceptor
/// chain and from `PeerConnectionInternal::new`.
pub(crate) async fn new(api: &API, mut configuration: RTCConfiguration) -> Result<Self> {
    RTCPeerConnection::init_configuration(&mut configuration)?;

    let (interceptor, stats_interceptor): (Arc<dyn Interceptor + Send + Sync>, _) = {
        let mut chain = api.interceptor_registry.build_chain("")?;
        let stats_interceptor = stats::make_stats_interceptor("");
        chain.add(stats_interceptor.clone());
        (Arc::new(chain), stats_interceptor)
    };

    // The internal state only gets a weak handle to the chain; the strong one is
    // owned by the returned RTCPeerConnection.
    let weak_interceptor = Arc::downgrade(&interceptor);
    let (internal, configuration) =
        PeerConnectionInternal::new(api, weak_interceptor, stats_interceptor, configuration)
            .await?;

    let internal_rtcp_writer = Arc::clone(&internal) as Arc<dyn RTCPWriter + Send + Sync>;
    let interceptor_rtcp_writer = interceptor.bind_rtcp_writer(internal_rtcp_writer).await;

    // A system clock set before the Unix epoch must not panic here: the value is
    // only used to label the connection, so fall back to 0 in that case.
    let created_at_nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_nanos())
        .unwrap_or_default();

    Ok(RTCPeerConnection {
        stats_id: format!("PeerConnection-{}", created_at_nanos),
        interceptor,
        interceptor_rtcp_writer,
        internal,
        configuration,
        idp_login_url: None,
    })
}
fn init_configuration(configuration: &mut RTCConfiguration) -> Result<()> {
let sanitized_ice_servers = configuration.get_ice_servers();
if !sanitized_ice_servers.is_empty() {
for server in &sanitized_ice_servers {
server.validate()?;
}
}
if !configuration.certificates.is_empty() {
let now = SystemTime::now();
for cert in &configuration.certificates {
cert.expires
.duration_since(now)
.map_err(|_| Error::ErrCertificateExpired)?;
}
} else {
let kp = KeyPair::generate(&rcgen::PKCS_ECDSA_P256_SHA256)?;
let cert = RTCCertificate::from_key_pair(kp)?;
configuration.certificates = vec![cert];
};
Ok(())
}
/// Stores `f` as the handler that `do_signaling_state_change` invokes with each
/// new signaling state.
pub fn on_signaling_state_change(&self, f: OnSignalingStateChangeHdlrFn) {
    let handler = Arc::new(Mutex::new(f));
    self.internal
        .on_signaling_state_change_handler
        .store(Some(handler))
}
/// Logs the new signaling state and, when a handler is registered, awaits it
/// with `new_state`.
async fn do_signaling_state_change(&self, new_state: RTCSignalingState) {
    log::info!("signaling state changed to {}", new_state);
    let registered = self.internal.on_signaling_state_change_handler.load();
    if let Some(callback) = &*registered {
        let mut invoke = callback.lock().await;
        invoke(new_state).await;
    }
}
/// Stores `f` in the internal data-channel handler slot.
pub fn on_data_channel(&self, f: OnDataChannelHdlrFn) {
    let handler = Arc::new(Mutex::new(f));
    self.internal.on_data_channel_handler.store(Some(handler));
}
/// Stores `f` as the handler that `negotiation_needed_op` invokes when it
/// decides that negotiation is needed.
pub fn on_negotiation_needed(&self, f: OnNegotiationNeededHdlrFn) {
    let handler = Arc::new(Mutex::new(f));
    self.internal
        .on_negotiation_needed_handler
        .store(Some(handler));
}
/// Advances the negotiation-needed state machine and reports whether the caller
/// should enqueue a negotiation-needed operation.
///
/// * `Run`   -> becomes `Queue`, returns `false`;
/// * `Queue` -> unchanged, returns `false`;
/// * any other state -> becomes `Run`, returns `true`.
fn do_negotiation_needed_inner(params: &NegotiationNeededParams) -> bool {
    let slot = &params.negotiation_needed_state;
    let current: NegotiationNeededState = slot.load(Ordering::SeqCst).into();
    match current {
        NegotiationNeededState::Run => {
            slot.store(NegotiationNeededState::Queue as u8, Ordering::SeqCst);
            false
        }
        NegotiationNeededState::Queue => false,
        _ => {
            slot.store(NegotiationNeededState::Run as u8, Ordering::SeqCst);
            true
        }
    }
}
/// Requests a negotiation-needed check.
///
/// Does nothing when `do_negotiation_needed_inner` reports that no new operation
/// has to be scheduled. Otherwise enqueues `negotiation_needed_op` on the
/// operations queue.
async fn do_negotiation_needed(params: NegotiationNeededParams) {
    if !RTCPeerConnection::do_negotiation_needed_inner(&params) {
        return;
    }

    // `params` is still needed for `ops` below, so the closure gets its own copy.
    let params2 = params.clone();
    // Best effort: a failure to enqueue is deliberately ignored, as this function
    // has no error channel.
    let _ = params
        .ops
        .enqueue(Operation::new(
            move || {
                let params3 = params2.clone();
                Box::pin(async move { RTCPeerConnection::negotiation_needed_op(params3).await })
            },
            "do_negotiation_needed",
        ))
        .await;
}
/// Finishes a negotiation-needed operation.
///
/// Resets the negotiation-needed state to `Empty`. If another request was queued
/// while the operation ran (previous state `Queue`), the state machine is
/// advanced again and its result is returned; otherwise returns `false`.
async fn after_negotiation_needed_op(params: NegotiationNeededParams) -> bool {
    // Read and reset in one atomic step so a request queued between the read and
    // the reset cannot be lost.
    let previous = params
        .negotiation_needed_state
        .swap(NegotiationNeededState::Empty as u8, Ordering::SeqCst);

    if previous == NegotiationNeededState::Queue as u8 {
        RTCPeerConnection::do_negotiation_needed_inner(&params)
    } else {
        false
    }
}
/// Body of the queued negotiation-needed operation.
///
/// Returns `false` right away when no handler is registered or the connection
/// is closed, and `true` when the operations queue is not empty. In every other
/// case the result comes from `after_negotiation_needed_op`:
/// * signaling state is not `Stable` -> nothing else is done;
/// * `check_negotiation_needed` says no -> `is_negotiation_needed` is cleared;
/// * `is_negotiation_needed` is already set -> the handler is not called again;
/// * otherwise the flag is set and the handler is awaited.
async fn negotiation_needed_op(params: NegotiationNeededParams) -> bool {
    let handler = &*params.on_negotiation_needed_handler.load();
    if handler.is_none() {
        return false;
    }

    if params.is_closed.load(Ordering::SeqCst) {
        return false;
    }

    // Other operations are still pending: report `true` without touching the state.
    if !params.ops.is_empty().await {
        return true;
    }

    if params.signaling_state.load(Ordering::SeqCst) != RTCSignalingState::Stable as u8 {
        return RTCPeerConnection::after_negotiation_needed_op(params).await;
    }

    if !RTCPeerConnection::check_negotiation_needed(&params.check_negotiation_needed_params)
        .await
    {
        params.is_negotiation_needed.store(false, Ordering::SeqCst);
        return RTCPeerConnection::after_negotiation_needed_op(params).await;
    }

    // Already flagged: the handler has been notified before.
    if params.is_negotiation_needed.load(Ordering::SeqCst) {
        return RTCPeerConnection::after_negotiation_needed_op(params).await;
    }

    params.is_negotiation_needed.store(true, Ordering::SeqCst);

    if let Some(handler) = handler {
        let mut f = handler.lock().await;
        f().await;
    }

    RTCPeerConnection::after_negotiation_needed_op(params).await
}
/// Decides whether the current local/remote descriptions no longer reflect the
/// state of the data channels and transceivers.
///
/// Returns `true` when:
/// * there is no current local description;
/// * data channels exist but the local description has no data-channel section;
/// * a non-stopped transceiver has no media section for its mid in the local description;
/// * a sending transceiver's media section lacks an msid attribute, the sender has
///   no associated stream ids, or the first msid token differs from the first stream id;
/// * for a local offer: the remote description lacks the mid, or neither the local
///   nor the (reversed) remote direction matches the transceiver direction;
/// * for a local answer: there is no remote description, or the local section's
///   direction differs from the transceiver direction intersected with the reversed
///   offered direction;
/// * a stopped transceiver with a non-empty mid still has a media section in the
///   local or remote description (only checked when a remote description exists).
///
/// Both descriptions are snapshotted once at the start, and each transceiver's mid
/// and stopped flag are read once per iteration so all checks see the same values.
async fn check_negotiation_needed(params: &CheckNegotiationNeededParams) -> bool {
    let current_local_description = params.current_local_description.lock().await.clone();
    let current_remote_description = params.current_remote_description.lock().await.clone();

    let local_desc = match &current_local_description {
        Some(desc) => desc,
        None => return true,
    };

    let len_data_channel = params.sctp_transport.data_channels.lock().await.len();
    if len_data_channel != 0 && have_data_channel(local_desc).is_none() {
        return true;
    }

    let transceivers = params.rtp_transceivers.lock().await;
    for t in &*transceivers {
        let mid = t.mid().await;
        let stopped = t.stopped.load(Ordering::SeqCst);
        let local_media = get_by_mid(mid.as_str(), local_desc);

        if !stopped {
            // A live transceiver must be represented in the local description.
            let m = match local_media {
                Some(m) => m,
                None => return true,
            };

            if t.direction().has_send() {
                let dmsid = match m.attribute(ATTR_KEY_MSID).and_then(|o| o) {
                    Some(m) => m,
                    None => return true,
                };

                let sender = match t.sender().await {
                    Some(s) => s.clone(),
                    None => {
                        log::warn!(
                            "RtpSender missing for transeceiver with sending direction {} for mid {}",
                            t.direction(),
                            mid
                        );
                        continue;
                    }
                };

                let stream_ids = sender.associated_media_stream_ids();
                if stream_ids.is_empty() {
                    return true;
                }
                // Only the first msid token (the stream id) is compared.
                if dmsid.split_whitespace().next() != Some(&stream_ids[0]) {
                    return true;
                }
            }

            match local_desc.sdp_type {
                RTCSdpType::Offer => {
                    if let Some(remote_desc) = &current_remote_description {
                        if let Some(rm) = get_by_mid(mid.as_str(), remote_desc) {
                            if get_peer_direction(m) != t.direction()
                                && get_peer_direction(rm) != t.direction().reverse()
                            {
                                return true;
                            }
                        } else {
                            return true;
                        }
                    }
                }
                RTCSdpType::Answer => {
                    let remote_desc = match &current_remote_description {
                        Some(d) => d,
                        None => return true,
                    };
                    // A missing or unspecified offered direction counts as inactive.
                    let offered_direction = match get_by_mid(mid.as_str(), remote_desc) {
                        Some(d) => {
                            let dir = get_peer_direction(d);
                            if dir == RTCRtpTransceiverDirection::Unspecified {
                                RTCRtpTransceiverDirection::Inactive
                            } else {
                                dir
                            }
                        }
                        None => RTCRtpTransceiverDirection::Inactive,
                    };

                    let current_direction = get_peer_direction(m);
                    if current_direction != t.direction().intersect(offered_direction.reverse())
                    {
                        return true;
                    }
                }
                _ => {}
            };
        } else if !mid.is_empty() {
            // Stopped transceiver that was negotiated before: its section must be
            // gone from both descriptions. The snapshot is used here so no
            // description lock is taken while the transceiver list is locked.
            if let Some(remote_desc) = &current_remote_description {
                if local_media.is_some() || get_by_mid(mid.as_str(), remote_desc).is_some() {
                    return true;
                }
            }
        }
    }

    false
}
/// Forwards `f` to the ICE gatherer's `on_local_candidate`.
pub fn on_ice_candidate(&self, f: OnLocalCandidateHdlrFn) {
    let gatherer = &self.internal.ice_gatherer;
    gatherer.on_local_candidate(f)
}
/// Forwards `f` to the ICE gatherer's `on_state_change`.
pub fn on_ice_gathering_state_change(&self, f: OnICEGathererStateChangeHdlrFn) {
    let gatherer = &self.internal.ice_gatherer;
    gatherer.on_state_change(f)
}
/// Stores `f` in the internal track handler slot (the slot type `do_track` reads from).
pub fn on_track(&self, f: OnTrackHdlrFn) {
    let handler = Arc::new(Mutex::new(f));
    self.internal.on_track_handler.store(Some(handler));
}
/// Dispatches a newly received track to the registered track handler.
///
/// The track is always logged. When `t` is `None` nothing else happens.
/// Otherwise a task is spawned that awaits the handler with `(t, r)`, or logs
/// a warning when no handler is registered.
async fn do_track(
    on_track_handler: Arc<ArcSwapOption<Mutex<OnTrackHdlrFn>>>,
    t: Option<Arc<TrackRemote>>,
    r: Option<Arc<RTCRtpReceiver>>,
) {
    log::debug!("got new track: {:?}", t);

    if t.is_none() {
        return;
    }

    tokio::spawn(async move {
        let registered = on_track_handler.load();
        match &*registered {
            Some(callback) => {
                let mut invoke = callback.lock().await;
                invoke(t, r).await;
            }
            None => {
                log::warn!("on_track unset, unable to handle incoming media streams");
            }
        }
    });
}
/// Stores `f` in the internal ICE-connection-state handler slot.
pub fn on_ice_connection_state_change(&self, f: OnICEConnectionStateChangeHdlrFn) {
    let handler = Arc::new(Mutex::new(f));
    self.internal
        .on_ice_connection_state_change_handler
        .store(Some(handler));
}
/// Records the new ICE connection state and notifies the handler.
///
/// `cs` is written to `ice_connection_state` as its `u8` value, the change is
/// logged, and the registered handler (if any) is awaited with `cs`.
async fn do_ice_connection_state_change(
    handler: &Arc<ArcSwapOption<Mutex<OnICEConnectionStateChangeHdlrFn>>>,
    ice_connection_state: &Arc<AtomicU8>,
    cs: RTCIceConnectionState,
) {
    ice_connection_state.store(cs as u8, Ordering::SeqCst);
    log::info!("ICE connection state changed: {}", cs);

    let registered = handler.load();
    if let Some(callback) = &*registered {
        let mut invoke = callback.lock().await;
        invoke(cs).await;
    }
}
/// Stores `f` in the internal peer-connection-state handler slot.
pub fn on_peer_connection_state_change(&self, f: OnPeerConnectionStateChangeHdlrFn) {
    let handler = Arc::new(Mutex::new(f));
    self.internal
        .on_peer_connection_state_change_handler
        .store(Some(handler));
}
/// Awaits the registered peer-connection-state handler with `cs`; does nothing
/// when no handler is registered.
async fn do_peer_connection_state_change(
    handler: &Arc<ArcSwapOption<Mutex<OnPeerConnectionStateChangeHdlrFn>>>,
    cs: RTCPeerConnectionState,
) {
    let registered = handler.load();
    if let Some(callback) = &*registered {
        let mut invoke = callback.lock().await;
        invoke(cs).await;
    }
}
pub fn get_configuration(&self) -> &RTCConfiguration {
&self.configuration
}
/// Borrows the stats identifier assigned when the connection was created.
pub fn get_stats_id(&self) -> &str {
    &self.stats_id
}
/// Generates an SDP offer describing the current transceivers.
///
/// When `options.ice_restart` is set, the ICE transport is restarted first.
/// The offer is then generated in a loop: each pass snapshots the transceivers,
/// raises `greater_mid` to the largest numeric mid in the current remote
/// description, assigns mids to transceivers that have none (through the
/// configured mid generator when there is one, otherwise by incrementing
/// `greater_mid`), and builds the SDP. The loop ends as soon as
/// `has_local_description_changed` reports no change for the generated offer.
/// The resulting SDP is remembered as the last offer.
///
/// # Errors
/// * `ErrIdentityProviderNotImplemented` when an identity provider URL is set;
/// * `ErrConnectionClosed` when the connection is closed;
/// * `ErrExcessiveRetries` after 128 passes without a stable result;
/// * any error from the ICE restart, mid assignment or SDP generation.
pub async fn create_offer(
    &self,
    options: Option<RTCOfferOptions>,
) -> Result<RTCSessionDescription> {
    let use_identity = self.idp_login_url.is_some();
    if use_identity {
        return Err(Error::ErrIdentityProviderNotImplemented);
    } else if self.internal.is_closed.load(Ordering::SeqCst) {
        return Err(Error::ErrConnectionClosed);
    }

    if let Some(options) = options {
        if options.ice_restart {
            self.internal.ice_transport.restart().await?;
        }
    }

    let mut count = 0;
    let mut offer;
    loop {
        // Work on a snapshot so the transceiver lock is not held while generating SDP.
        let current_transceivers = {
            let rtp_transceivers = self.internal.rtp_transceivers.lock().await;
            rtp_transceivers.clone()
        };

        // Never hand out a mid the remote side already uses: lift `greater_mid`
        // to the largest numeric mid of the current remote description.
        // Non-numeric (and empty) mids are ignored.
        {
            let current_remote_description =
                self.internal.current_remote_description.lock().await;
            if let Some(d) = &*current_remote_description {
                if let Some(parsed) = &d.parsed {
                    for media in &parsed.media_descriptions {
                        let numeric_mid =
                            get_mid_value(media).and_then(|mid| mid.parse::<isize>().ok());
                        if let Some(numeric_mid) = numeric_mid {
                            // Atomic max: cannot lower a value raised concurrently.
                            self.internal
                                .greater_mid
                                .fetch_max(numeric_mid, Ordering::SeqCst);
                        }
                    }
                }
            }
        }

        for t in &current_transceivers {
            if !t.mid().await.is_empty() {
                continue;
            }

            if let Some(mid_generator) = &self.internal.setting_engine.mid_generator {
                let current_greatest = self.internal.greater_mid.load(Ordering::SeqCst);
                let mid = (mid_generator)(current_greatest);

                // Keep `greater_mid` in step when the generator yields a numeric mid.
                if let Ok(numeric_mid) = mid.parse::<isize>() {
                    self.internal
                        .greater_mid
                        .fetch_max(numeric_mid, Ordering::SeqCst);
                }

                t.set_mid(mid).await?;
            } else {
                // `fetch_add` returns the previous value; the new mid is one above it.
                let greater_mid = self.internal.greater_mid.fetch_add(1, Ordering::SeqCst);
                t.set_mid((greater_mid + 1).to_string()).await?;
            }
        }

        let current_remote_description_is_none = {
            let current_remote_description =
                self.internal.current_remote_description.lock().await;
            current_remote_description.is_none()
        };

        let mut d = if current_remote_description_is_none {
            self.internal
                .generate_unmatched_sdp(current_transceivers, use_identity)
                .await?
        } else {
            self.internal
                .generate_matched_sdp(
                    current_transceivers,
                    use_identity,
                    true,
                    DEFAULT_DTLS_ROLE_OFFER.to_connection_role(),
                )
                .await?
        };

        {
            let mut sdp_origin = self.internal.sdp_origin.lock().await;
            update_sdp_origin(&mut sdp_origin, &mut d);
        }

        let sdp = d.marshal();
        offer = RTCSessionDescription {
            sdp_type: RTCSdpType::Offer,
            sdp,
            parsed: Some(d),
        };

        // Regenerate if the local state changed while the offer was being built.
        if !self.internal.has_local_description_changed(&offer).await {
            break;
        }
        count += 1;
        if count >= 128 {
            return Err(Error::ErrExcessiveRetries);
        }
    }

    {
        let mut last_offer = self.internal.last_offer.lock().await;
        *last_offer = offer.sdp.clone();
    }

    Ok(offer)
}
async fn update_connection_state(
on_peer_connection_state_change_handler: &Arc<
ArcSwapOption<Mutex<OnPeerConnectionStateChangeHdlrFn>>,
>,
is_closed: &Arc<AtomicBool>,
peer_connection_state: &Arc<AtomicU8>,
ice_connection_state: RTCIceConnectionState,
dtls_transport_state: RTCDtlsTransportState,
) {
let connection_state =
if is_closed.load(Ordering::SeqCst) {
RTCPeerConnectionState::Closed
} else if ice_connection_state == RTCIceConnectionState::Failed || dtls_transport_state == RTCDtlsTransportState::Failed {
RTCPeerConnectionState::Failed
} else if ice_connection_state == RTCIceConnectionState::Disconnected {
RTCPeerConnectionState::Disconnected
} else if ice_connection_state == RTCIceConnectionState::Connected && dtls_transport_state == RTCDtlsTransportState::Connected {
RTCPeerConnectionState::Connected
} else if ice_connection_state == RTCIceConnectionState::Checking && dtls_transport_state == RTCDtlsTransportState::Connecting {
RTCPeerConnectionState::Connecting
} else {
RTCPeerConnectionState::New
};
if peer_connection_state.load(Ordering::SeqCst) == connection_state as u8 {
return;
}
log::info!("peer connection state changed: {}", connection_state);
peer_connection_state.store(connection_state as u8, Ordering::SeqCst);
RTCPeerConnection::do_peer_connection_state_change(
on_peer_connection_state_change_handler,
connection_state,
)
.await;
}
/// Generates an SDP answer to the current remote description.
///
/// The DTLS connection role comes from the setting engine's answering role,
/// falling back to `DEFAULT_DTLS_ROLE_ANSWER` when that is unspecified. The
/// generated SDP is remembered as the last answer.
///
/// # Errors
/// Checked in this order:
/// * `ErrNoRemoteDescription` when no remote description is set;
/// * `ErrIdentityProviderNotImplemented` when an identity provider URL is set;
/// * `ErrConnectionClosed` when the connection is closed;
/// * `ErrIncorrectSignalingState` unless the signaling state is
///   `HaveRemoteOffer` or `HaveLocalPranswer`;
/// * any error from SDP generation.
pub async fn create_answer(
    &self,
    _options: Option<RTCAnswerOptions>,
) -> Result<RTCSessionDescription> {
    let use_identity = self.idp_login_url.is_some();

    if self.remote_description().await.is_none() {
        return Err(Error::ErrNoRemoteDescription);
    }
    if use_identity {
        return Err(Error::ErrIdentityProviderNotImplemented);
    }
    if self.internal.is_closed.load(Ordering::SeqCst) {
        return Err(Error::ErrConnectionClosed);
    }
    let state = self.signaling_state();
    if state != RTCSignalingState::HaveRemoteOffer
        && state != RTCSignalingState::HaveLocalPranswer
    {
        return Err(Error::ErrIncorrectSignalingState);
    }

    let configured_role = self
        .internal
        .setting_engine
        .answering_dtls_role
        .to_connection_role();
    let connection_role = match configured_role {
        ConnectionRole::Unspecified => DEFAULT_DTLS_ROLE_ANSWER.to_connection_role(),
        role => role,
    };

    let transceivers = self.get_transceivers().await;
    let mut description = self
        .internal
        .generate_matched_sdp(transceivers, use_identity, false, connection_role)
        .await?;

    {
        let mut origin = self.internal.sdp_origin.lock().await;
        update_sdp_origin(&mut origin, &mut description);
    }

    let answer = RTCSessionDescription {
        sdp_type: RTCSdpType::Answer,
        sdp: description.marshal(),
        parsed: Some(description),
    };

    {
        let mut remembered = self.internal.last_answer.lock().await;
        *remembered = answer.sdp.clone();
    }

    Ok(answer)
}
/// Applies a session description to the signaling state machine.
///
/// `op` selects whether `sd` is applied as the local or the remote description.
/// For every supported SDP type the next signaling state is validated with
/// `check_next_signaling_state`; only when that succeeds are the pending/current
/// description slots updated. On success the new signaling state is stored, the
/// negotiation-needed flag is cleared and re-evaluated if the state became
/// `Stable`, and the signaling-state handler is notified.
///
/// # Errors
/// * `ErrConnectionClosed` when the connection is closed;
/// * `ErrPeerConnSDPTypeInvalidValue` when `sd.sdp_type` is `Unspecified`;
/// * `ErrSDPDoesNotMatchOffer` / `ErrSDPDoesNotMatchAnswer` when a local
///   offer / (pr)answer differs from the last generated one;
/// * `ErrPeerConnStateChangeInvalid` for any other SDP type;
/// * whatever `check_next_signaling_state` returns for an invalid transition.
pub(crate) async fn set_description(
&self,
sd: &RTCSessionDescription,
op: StateChangeOp,
) -> Result<()> {
if self.internal.is_closed.load(Ordering::SeqCst) {
return Err(Error::ErrConnectionClosed);
} else if sd.sdp_type == RTCSdpType::Unspecified {
return Err(Error::ErrPeerConnSDPTypeInvalidValue);
}
let next_state = {
// Signaling state at entry; every transition below is validated against it.
let cur = self.signaling_state();
let new_sdpdoes_not_match_offer = Error::ErrSDPDoesNotMatchOffer;
let new_sdpdoes_not_match_answer = Error::ErrSDPDoesNotMatchAnswer;
match op {
StateChangeOp::SetLocal => {
match sd.sdp_type {
// Local offer: must equal the last generated offer; becomes the pending local description.
RTCSdpType::Offer => {
let check = {
let last_offer = self.internal.last_offer.lock().await;
sd.sdp != *last_offer
};
if check {
Err(new_sdpdoes_not_match_offer)
} else {
let next_state = check_next_signaling_state(
cur,
RTCSignalingState::HaveLocalOffer,
StateChangeOp::SetLocal,
sd.sdp_type,
);
if next_state.is_ok() {
let mut pending_local_description =
self.internal.pending_local_description.lock().await;
*pending_local_description = Some(sd.clone());
}
next_state
}
}
// Local answer: must equal the last generated answer. It becomes the current local
// description, the pending remote description is promoted to current, and both
// pending slots are emptied.
RTCSdpType::Answer => {
let check = {
let last_answer = self.internal.last_answer.lock().await;
sd.sdp != *last_answer
};
if check {
Err(new_sdpdoes_not_match_answer)
} else {
let next_state = check_next_signaling_state(
cur,
RTCSignalingState::Stable,
StateChangeOp::SetLocal,
sd.sdp_type,
);
if next_state.is_ok() {
let pending_remote_description = {
let mut pending_remote_description =
self.internal.pending_remote_description.lock().await;
pending_remote_description.take()
};
// Taken only to clear the slot; the value itself is discarded.
let _pending_local_description = {
let mut pending_local_description =
self.internal.pending_local_description.lock().await;
pending_local_description.take()
};
{
let mut current_local_description =
self.internal.current_local_description.lock().await;
*current_local_description = Some(sd.clone());
}
{
let mut current_remote_description =
self.internal.current_remote_description.lock().await;
*current_remote_description = pending_remote_description;
}
}
next_state
}
}
// Local rollback: drops the pending local description.
RTCSdpType::Rollback => {
let next_state = check_next_signaling_state(
cur,
RTCSignalingState::Stable,
StateChangeOp::SetLocal,
sd.sdp_type,
);
if next_state.is_ok() {
let mut pending_local_description =
self.internal.pending_local_description.lock().await;
*pending_local_description = None;
}
next_state
}
// Local provisional answer: must equal the last generated answer; becomes the pending local description.
RTCSdpType::Pranswer => {
let check = {
let last_answer = self.internal.last_answer.lock().await;
sd.sdp != *last_answer
};
if check {
Err(new_sdpdoes_not_match_answer)
} else {
let next_state = check_next_signaling_state(
cur,
RTCSignalingState::HaveLocalPranswer,
StateChangeOp::SetLocal,
sd.sdp_type,
);
if next_state.is_ok() {
let mut pending_local_description =
self.internal.pending_local_description.lock().await;
*pending_local_description = Some(sd.clone());
}
next_state
}
}
_ => Err(Error::ErrPeerConnStateChangeInvalid),
}
}
StateChangeOp::SetRemote => {
match sd.sdp_type {
// Remote offer: becomes the pending remote description.
RTCSdpType::Offer => {
let next_state = check_next_signaling_state(
cur,
RTCSignalingState::HaveRemoteOffer,
StateChangeOp::SetRemote,
sd.sdp_type,
);
if next_state.is_ok() {
let mut pending_remote_description =
self.internal.pending_remote_description.lock().await;
*pending_remote_description = Some(sd.clone());
}
next_state
}
// Remote answer: becomes the current remote description, the pending local
// description is promoted to current, and both pending slots are emptied.
RTCSdpType::Answer => {
let next_state = check_next_signaling_state(
cur,
RTCSignalingState::Stable,
StateChangeOp::SetRemote,
sd.sdp_type,
);
if next_state.is_ok() {
let pending_local_description = {
let mut pending_local_description =
self.internal.pending_local_description.lock().await;
pending_local_description.take()
};
// Taken only to clear the slot; the value itself is discarded.
let _pending_remote_description = {
let mut pending_remote_description =
self.internal.pending_remote_description.lock().await;
pending_remote_description.take()
};
{
let mut current_remote_description =
self.internal.current_remote_description.lock().await;
*current_remote_description = Some(sd.clone());
}
{
let mut current_local_description =
self.internal.current_local_description.lock().await;
*current_local_description = pending_local_description;
}
}
next_state
}
// Remote rollback: drops the pending remote description.
RTCSdpType::Rollback => {
let next_state = check_next_signaling_state(
cur,
RTCSignalingState::Stable,
StateChangeOp::SetRemote,
sd.sdp_type,
);
if next_state.is_ok() {
let mut pending_remote_description =
self.internal.pending_remote_description.lock().await;
*pending_remote_description = None;
}
next_state
}
// Remote provisional answer: becomes the pending remote description.
RTCSdpType::Pranswer => {
let next_state = check_next_signaling_state(
cur,
RTCSignalingState::HaveRemotePranswer,
StateChangeOp::SetRemote,
sd.sdp_type,
);
if next_state.is_ok() {
let mut pending_remote_description =
self.internal.pending_remote_description.lock().await;
*pending_remote_description = Some(sd.clone());
}
next_state
}
_ => Err(Error::ErrPeerConnStateChangeInvalid),
}
} }
};
match next_state {
Ok(next_state) => {
self.internal
.signaling_state
.store(next_state as u8, Ordering::SeqCst);
// Back in the stable state: reset the negotiation-needed flag and re-run the check.
if self.signaling_state() == RTCSignalingState::Stable {
self.internal
.is_negotiation_needed
.store(false, Ordering::SeqCst);
self.internal.trigger_negotiation_needed().await;
}
self.do_signaling_state_change(next_state).await;
Ok(())
}
Err(err) => Err(err),
}
}
/// Sets the local session description.
///
/// An empty `desc.sdp` is replaced by the last generated answer (for `Answer`
/// and `Pranswer`) or the last generated offer (for `Offer`). The description
/// is parsed and applied through `set_description`. When it is an answer, the
/// current direction of every transceiver matched by mid is updated from the
/// media sections, and, if a remote description exists, the RTP senders are
/// started and an operation starting RTP is enqueued. Finally ICE gathering is
/// started if the gatherer is still in its `New` state.
///
/// # Errors
/// * `ErrConnectionClosed` when the connection is closed;
/// * `ErrPeerConnSDPTypeInvalidValueSetLocalDescription` for an empty SDP of any
///   other type;
/// * any error from parsing, `set_description`, direction processing, starting
///   the senders, enqueueing or gathering.
pub async fn set_local_description(&self, mut desc: RTCSessionDescription) -> Result<()> {
    if self.internal.is_closed.load(Ordering::SeqCst) {
        return Err(Error::ErrConnectionClosed);
    }

    // Sampled before the new description is applied; forwarded to `start_rtp` below.
    let have_local_description = self
        .internal
        .current_local_description
        .lock()
        .await
        .is_some();

    if desc.sdp.is_empty() {
        desc.sdp = match desc.sdp_type {
            RTCSdpType::Answer | RTCSdpType::Pranswer => {
                self.internal.last_answer.lock().await.clone()
            }
            RTCSdpType::Offer => self.internal.last_offer.lock().await.clone(),
            _ => return Err(Error::ErrPeerConnSDPTypeInvalidValueSetLocalDescription),
        };
    }

    desc.parsed = Some(desc.unmarshal()?);
    self.set_description(&desc, StateChangeOp::SetLocal).await?;

    let remote_description = self.remote_description().await;
    let mut local_transceivers = self.get_transceivers().await;

    if desc.sdp_type == RTCSdpType::Answer {
        if let Some(parsed) = desc.parsed {
            for media in &parsed.media_descriptions {
                let media_name = media.media_name.media.as_str();
                if media_name == MEDIA_SECTION_APPLICATION {
                    continue;
                }

                let kind = RTPCodecType::from(media_name);
                let direction = get_peer_direction(media);
                if kind == RTPCodecType::Unspecified
                    || direction == RTCRtpTransceiverDirection::Unspecified
                {
                    continue;
                }

                // Sections without a usable mid cannot be matched to a transceiver.
                let mid_value = match get_mid_value(media) {
                    Some(mid) if !mid.is_empty() => mid,
                    _ => continue,
                };

                if let Some(t) = find_by_mid(mid_value, &mut local_transceivers).await {
                    let previous_direction = t.current_direction();
                    t.set_current_direction(direction);
                    t.process_new_current_direction(previous_direction).await?;
                }
            }
        }

        if let Some(remote_desc) = remote_description {
            self.start_rtp_senders().await?;

            let internal = Arc::clone(&self.internal);
            let shared_remote = Arc::new(remote_desc);
            let start_rtp_op = Operation::new(
                move || {
                    let pc = Arc::clone(&internal);
                    let rd = Arc::clone(&shared_remote);
                    Box::pin(async move {
                        let _ = pc.start_rtp(have_local_description, rd).await;
                        false
                    })
                },
                "set_local_description",
            );
            self.internal.ops.enqueue(start_rtp_op).await?;
        }
    }

    if self.internal.ice_gatherer.state() != RTCIceGathererState::New {
        return Ok(());
    }
    self.internal.ice_gatherer.gather().await
}
/// Returns the pending local description when there is one, otherwise the
/// current local description.
pub async fn local_description(&self) -> Option<RTCSessionDescription> {
    match self.pending_local_description().await {
        Some(pending) => Some(pending),
        None => self.current_local_description().await,
    }
}
pub async fn set_remote_description(&self, mut desc: RTCSessionDescription) -> Result<()> {
if self.internal.is_closed.load(Ordering::SeqCst) {
return Err(Error::ErrConnectionClosed);
}
let is_renegotation = {
let current_remote_description = self.internal.current_remote_description.lock().await;
current_remote_description.is_some()
};
desc.parsed = Some(desc.unmarshal()?);
self.set_description(&desc, StateChangeOp::SetRemote)
.await?;
if let Some(parsed) = &desc.parsed {
self.internal
.media_engine
.update_from_remote_description(parsed)
.await?;
let mut local_transceivers = self.get_transceivers().await;
let remote_description = self.remote_description().await;
let we_offer = desc.sdp_type == RTCSdpType::Answer;
if !we_offer {
if let Some(parsed) = remote_description.as_ref().and_then(|r| r.parsed.as_ref()) {
for media in &parsed.media_descriptions {
let mid_value = match get_mid_value(media) {
Some(m) => {
if m.is_empty() {
return Err(Error::ErrPeerConnRemoteDescriptionWithoutMidValue);
} else {
m
}
}
None => continue,
};
if media.media_name.media == MEDIA_SECTION_APPLICATION {
continue;
}
let kind = RTPCodecType::from(media.media_name.media.as_str());
let direction = get_peer_direction(media);
if kind == RTPCodecType::Unspecified
|| direction == RTCRtpTransceiverDirection::Unspecified
{
continue;
}
let t = if let Some(t) =
find_by_mid(mid_value, &mut local_transceivers).await
{
Some(t)
} else {
satisfy_type_and_direction(kind, direction, &mut local_transceivers)
.await
};
if let Some(t) = t {
if t.mid().await.is_empty() {
t.set_mid(mid_value.to_owned()).await?;
}
} else {
let receiver = Arc::new(RTCRtpReceiver::new(
self.internal.setting_engine.get_receive_mtu(),
kind,
Arc::clone(&self.internal.dtls_transport),
Arc::clone(&self.internal.media_engine),
Arc::clone(&self.interceptor),
));
let local_direction =
if direction == RTCRtpTransceiverDirection::Recvonly {
RTCRtpTransceiverDirection::Sendonly
} else {
RTCRtpTransceiverDirection::Recvonly
};
let t = RTCRtpTransceiver::new(
Some(receiver),
None,
local_direction,
kind,
vec![],
Arc::clone(&self.internal.media_engine),
Some(Box::new(self.internal.make_negotiation_needed_trigger())),
)
.await;
self.internal.add_rtp_transceiver(Arc::clone(&t)).await;
if t.mid().await.is_empty() {
t.set_mid(mid_value.to_owned()).await?;
}
}
}
}
}
if we_offer {
if let Some(parsed) = remote_description.as_ref().and_then(|r| r.parsed.as_ref()) {
for media in &parsed.media_descriptions {
let mid_value = match get_mid_value(media) {
Some(m) => {
if m.is_empty() {
return Err(Error::ErrPeerConnRemoteDescriptionWithoutMidValue);
} else {
m
}
}
None => continue,
};
if media.media_name.media == MEDIA_SECTION_APPLICATION {
continue;
}
let kind = RTPCodecType::from(media.media_name.media.as_str());
let direction = get_peer_direction(media);
if kind == RTPCodecType::Unspecified
|| direction == RTCRtpTransceiverDirection::Unspecified
{
continue;
}
if let Some(t) = find_by_mid(mid_value, &mut local_transceivers).await {
let previous_direction = t.current_direction();
let reversed_direction = direction.reverse();
t.set_current_direction(reversed_direction);
t.process_new_current_direction(previous_direction).await?;
}
}
}
}
let (remote_ufrag, remote_pwd, candidates) = extract_ice_details(parsed).await?;
if is_renegotation
&& self
.internal
.ice_transport
.have_remote_credentials_change(&remote_ufrag, &remote_pwd)
.await
{
if !we_offer {
self.internal.ice_transport.restart().await?;
}
self.internal
.ice_transport
.set_remote_credentials(remote_ufrag.clone(), remote_pwd.clone())
.await?;
}
for candidate in candidates {
self.internal
.ice_transport
.add_remote_candidate(Some(candidate))
.await?;
}
if is_renegotation {
if we_offer {
self.start_rtp_senders().await?;
let pci = Arc::clone(&self.internal);
let remote_desc = Arc::new(desc);
self.internal
.ops
.enqueue(Operation::new(
move || {
let pc = Arc::clone(&pci);
let rd = Arc::clone(&remote_desc);
Box::pin(async move {
let _ = pc.start_rtp(true, rd).await;
false
})
},
"set_remote_description renegotiation",
))
.await?;
}
return Ok(());
}
let mut remote_is_lite = false;
for a in &parsed.attributes {
if a.key.trim() == ATTR_KEY_ICELITE {
remote_is_lite = true;
break;
}
}
let (fingerprint, fingerprint_hash) = extract_fingerprint(parsed)?;
let ice_role = if (we_offer
&& remote_is_lite == self.internal.setting_engine.candidates.ice_lite)
|| (remote_is_lite && !self.internal.setting_engine.candidates.ice_lite)
{
RTCIceRole::Controlling
} else {
RTCIceRole::Controlled
};
if we_offer {
self.start_rtp_senders().await?;
}
let pci = Arc::clone(&self.internal);
let dtls_role = DTLSRole::from(parsed);
let remote_desc = Arc::new(desc);
self.internal
.ops
.enqueue(Operation::new(
move || {
let pc = Arc::clone(&pci);
let rd = Arc::clone(&remote_desc);
let ru = remote_ufrag.clone();
let rp = remote_pwd.clone();
let fp = fingerprint.clone();
let fp_hash = fingerprint_hash.clone();
Box::pin(async move {
log::trace!(
"start_transports: ice_role={}, dtls_role={}",
ice_role,
dtls_role,
);
pc.start_transports(ice_role, dtls_role, ru, rp, fp, fp_hash)
.await;
if we_offer {
let _ = pc.start_rtp(false, rd).await;
}
false
})
},
"set_remote_description",
))
.await?;
}
Ok(())
}
/// Starts every negotiated RTP sender that has not sent yet.
///
/// The transceiver list lock is held for the whole walk. Transceivers
/// without a sender, senders that are not negotiated, and senders that have
/// already sent are skipped.
///
/// # Errors
///
/// Returns the first error produced by a sender's `send` call; transceivers
/// later in the list are not visited in that case.
pub(crate) async fn start_rtp_senders(&self) -> Result<()> {
    let transceivers = self.internal.rtp_transceivers.lock().await;
    for t in transceivers.iter() {
        let sender = match t.sender().await {
            Some(sender) => sender,
            None => continue,
        };
        // `||` short-circuits exactly like the `&&` form: `has_sent` is only
        // awaited for negotiated senders.
        if !sender.is_negotiated() || sender.has_sent().await {
            continue;
        }
        let parameters = sender.get_parameters().await;
        sender.send(&parameters).await?;
    }
    Ok(())
}
/// Returns the remote session description reported by the internal
/// peer-connection state, or `None` when it has none to report.
///
/// This is a direct delegation; no processing happens at this level.
pub async fn remote_description(&self) -> Option<RTCSessionDescription> {
self.internal.remote_description().await
}
/// Hands a remote ICE candidate to the ICE transport.
///
/// A leading `candidate:` prefix on `candidate.candidate` is optional and is
/// removed before parsing. An empty candidate string is not parsed; it is
/// forwarded to the transport as `None`.
///
/// # Errors
///
/// * `Error::ErrNoRemoteDescription` if no remote description is set yet.
/// * Any error from parsing the candidate string.
/// * Any error returned by the ICE transport when adding the candidate.
pub async fn add_ice_candidate(&self, candidate: RTCIceCandidateInit) -> Result<()> {
    if self.remote_description().await.is_none() {
        return Err(Error::ErrNoRemoteDescription);
    }

    let raw = candidate.candidate.as_str();
    let candidate_value = raw.strip_prefix("candidate:").unwrap_or(raw);

    let ice_candidate = if candidate_value.is_empty() {
        None
    } else {
        let parsed: Arc<dyn Candidate + Send + Sync> =
            Arc::new(unmarshal_candidate(candidate_value)?);
        Some(RTCIceCandidate::from(&parsed))
    };

    self.internal
        .ice_transport
        .add_remote_candidate(ice_candidate)
        .await
}
pub fn ice_connection_state(&self) -> RTCIceConnectionState {
self.internal
.ice_connection_state
.load(Ordering::SeqCst)
.into()
}
/// Collects the RTP senders of all transceivers on this connection.
///
/// Transceivers are visited in list order while the transceiver list lock is
/// held; those without a sender contribute nothing to the result.
pub async fn get_senders(&self) -> Vec<Arc<RTCRtpSender>> {
    let transceivers = self.internal.rtp_transceivers.lock().await;
    let mut collected = Vec::new();
    for t in transceivers.iter() {
        match t.sender().await {
            Some(sender) => collected.push(sender),
            None => {}
        }
    }
    collected
}
/// Collects the RTP receivers of all transceivers on this connection.
///
/// Transceivers are visited in list order while the transceiver list lock is
/// held; those without a receiver contribute nothing to the result.
pub async fn get_receivers(&self) -> Vec<Arc<RTCRtpReceiver>> {
    let transceivers = self.internal.rtp_transceivers.lock().await;
    let mut collected = Vec::new();
    for t in transceivers.iter() {
        match t.receiver().await {
            Some(receiver) => collected.push(receiver),
            None => {}
        }
    }
    collected
}
/// Returns a snapshot of the transceiver list.
///
/// The list is cloned under its lock, so the returned vector holds new
/// `Arc` handles to the same transceivers and does not track later changes.
pub async fn get_transceivers(&self) -> Vec<Arc<RTCRtpTransceiver>> {
    self.internal.rtp_transceivers.lock().await.clone()
}
pub async fn add_track(
&self,
track: Arc<dyn TrackLocal + Send + Sync>,
) -> Result<Arc<RTCRtpSender>> {
if self.internal.is_closed.load(Ordering::SeqCst) {
return Err(Error::ErrConnectionClosed);
}
{
let rtp_transceivers = self.internal.rtp_transceivers.lock().await;
for t in &*rtp_transceivers {
if !t.stopped.load(Ordering::SeqCst)
&& t.kind == track.kind()
&& t.sender().await.is_none()
{
let sender = Arc::new(
RTCRtpSender::new(
self.internal.setting_engine.get_receive_mtu(),
Arc::clone(&track),
Arc::clone(&self.internal.dtls_transport),
Arc::clone(&self.internal.media_engine),
Arc::clone(&self.interceptor),
false, )
.await,
);
if let Err(err) = t
.set_sender_track(Some(Arc::clone(&sender)), Some(Arc::clone(&track)))
.await
{
let _ = sender.stop().await;
t.set_sender(None).await;
return Err(err);
}
self.internal.trigger_negotiation_needed().await;
return Ok(sender);
}
}
}
let transceiver = self
.internal
.new_transceiver_from_track(RTCRtpTransceiverDirection::Sendrecv, track)
.await?;
self.internal
.add_rtp_transceiver(Arc::clone(&transceiver))
.await;
match transceiver.sender().await {
Some(sender) => Ok(sender),
None => Err(Error::ErrRTPSenderNil),
}
}
pub async fn remove_track(&self, sender: &Arc<RTCRtpSender>) -> Result<()> {
if self.internal.is_closed.load(Ordering::SeqCst) {
return Err(Error::ErrConnectionClosed);
}
let mut transceiver = None;
{
let rtp_transceivers = self.internal.rtp_transceivers.lock().await;
for t in &*rtp_transceivers {
if let Some(s) = t.sender().await {
if s.id == sender.id {
transceiver = Some(t.clone());
break;
}
}
}
}
let t = transceiver.ok_or(Error::ErrSenderNotCreatedByConnection)?;
t.set_direction_internal(RTCRtpTransceiverDirection::from_send_recv(
false,
t.direction().has_recv(),
));
let sender_result = sender.stop().await;
let sending_track_result = t.set_sending_track(None).await;
if sender_result.is_ok() && sending_track_result.is_ok() {
self.internal.trigger_negotiation_needed().await;
}
Ok(())
}
/// Creates a transceiver for the given media `kind`.
///
/// This is a direct delegation to the internal peer connection's
/// `add_transceiver_from_kind`, passing `kind` and `init` through unchanged;
/// the transceiver or error it produces is returned as-is.
pub async fn add_transceiver_from_kind(
&self,
kind: RTPCodecType,
init: &[RTCRtpTransceiverInit],
) -> Result<Arc<RTCRtpTransceiver>> {
self.internal.add_transceiver_from_kind(kind, init).await
}
/// Creates a transceiver for `track` and registers it on this connection.
///
/// `init` may hold at most one entry. With no entry the direction defaults
/// to `Sendrecv`; with one entry that entry's direction is used.
///
/// # Errors
///
/// * `Error::ErrConnectionClosed` if the connection is closed.
/// * `Error::ErrPeerConnAddTransceiverFromTrackOnlyAcceptsOne` if `init`
///   has more than one entry.
/// * Any error from creating the transceiver.
pub async fn add_transceiver_from_track<'a>(
    &'a self,
    track: Arc<dyn TrackLocal + Send + Sync>,
    init: &'a [RTCRtpTransceiverInit],
) -> Result<Arc<RTCRtpTransceiver>> {
    if self.internal.is_closed.load(Ordering::SeqCst) {
        return Err(Error::ErrConnectionClosed);
    }

    let direction = match init {
        [] => RTCRtpTransceiverDirection::Sendrecv,
        [only] => only.direction,
        _ => return Err(Error::ErrPeerConnAddTransceiverFromTrackOnlyAcceptsOne),
    };

    let transceiver = self
        .internal
        .new_transceiver_from_track(direction, track)
        .await?;
    self.internal
        .add_rtp_transceiver(Arc::clone(&transceiver))
        .await;
    Ok(transceiver)
}
/// Creates a data channel named `label`.
///
/// The channel is ordered by default. When `options` is given, each field
/// that is set overrides the corresponding parameter and `negotiated` is
/// copied over as-is. The new channel is appended to the SCTP transport's
/// channel list and counted as requested; it is opened immediately if the
/// SCTP transport is already connected. Negotiation is triggered before
/// returning.
///
/// # Errors
///
/// * `Error::ErrConnectionClosed` if the connection is closed.
/// * `Error::ErrProtocolTooLarge` if options were supplied and the protocol
///   string is longer than 65535 bytes.
/// * `Error::ErrRetransmitsOrPacketLifeTime` if both the packet lifetime and
///   the retransmit limit are non-zero.
/// * Any error from opening the channel on a connected SCTP transport.
pub async fn create_data_channel(
    &self,
    label: &str,
    options: Option<RTCDataChannelInit>,
) -> Result<Arc<RTCDataChannel>> {
    if self.internal.is_closed.load(Ordering::SeqCst) {
        return Err(Error::ErrConnectionClosed);
    }

    let mut params = DataChannelParameters {
        label: label.to_owned(),
        ordered: true,
        ..Default::default()
    };

    if let Some(init) = options {
        if let Some(value) = init.ordered {
            params.ordered = value;
        }
        if let Some(value) = init.max_packet_life_time {
            params.max_packet_life_time = value;
        }
        if let Some(value) = init.max_retransmits {
            params.max_retransmits = value;
        }
        if let Some(value) = init.protocol {
            params.protocol = value;
        }
        if params.protocol.len() > 65535 {
            return Err(Error::ErrProtocolTooLarge);
        }
        params.negotiated = init.negotiated;
    }

    let channel = Arc::new(RTCDataChannel::new(
        params,
        Arc::clone(&self.internal.setting_engine),
    ));

    // The two reliability limits are mutually exclusive.
    if channel.max_packet_lifetime != 0 && channel.max_retransmits != 0 {
        return Err(Error::ErrRetransmitsOrPacketLifeTime);
    }

    let sctp = &self.internal.sctp_transport;
    sctp.data_channels.lock().await.push(Arc::clone(&channel));
    sctp.data_channels_requested.fetch_add(1, Ordering::SeqCst);

    if sctp.state() == RTCSctpTransportState::Connected {
        channel.open(Arc::clone(sctp)).await?;
    }

    self.internal.trigger_negotiation_needed().await;
    Ok(channel)
}
/// Placeholder for identity-provider configuration; not implemented.
///
/// `_provider` is ignored.
///
/// # Errors
///
/// Always returns `Error::ErrPeerConnSetIdentityProviderNotImplemented`.
pub fn set_identity_provider(&self, _provider: &str) -> Result<()> {
Err(Error::ErrPeerConnSetIdentityProviderNotImplemented)
}
/// Writes the given RTCP packets through the interceptor RTCP writer.
///
/// The packets are written with a freshly created, empty attribute set.
/// On success the count reported by the writer is returned.
///
/// # Errors
///
/// Returns the writer's error, converted into this crate's error type.
pub async fn write_rtcp(
    &self,
    pkts: &[Box<dyn rtcp::packet::Packet + Send + Sync>],
) -> Result<usize> {
    let attributes = Attributes::new();
    let written = self
        .interceptor_rtcp_writer
        .write(pkts, &attributes)
        .await?;
    Ok(written)
}
pub async fn close(&self) -> Result<()> {
if self.internal.is_closed.load(Ordering::SeqCst) {
return Ok(());
}
self.internal.is_closed.store(true, Ordering::SeqCst);
self.internal
.signaling_state
.store(RTCSignalingState::Closed as u8, Ordering::SeqCst);
let mut close_errs = vec![];
if let Err(err) = self.interceptor.close().await {
close_errs.push(Error::new(format!("interceptor: {}", err)));
}
{
let mut rtp_transceivers = self.internal.rtp_transceivers.lock().await;
for t in &*rtp_transceivers {
if let Err(err) = t.stop().await {
close_errs.push(Error::new(format!("rtp_transceivers: {}", err)));
}
}
rtp_transceivers.clear();
}
{
let mut data_channels = self.internal.sctp_transport.data_channels.lock().await;
for d in &*data_channels {
if let Err(err) = d.close().await {
close_errs.push(Error::new(format!("data_channels: {}", err)));
}
}
data_channels.clear();
}
if let Err(err) = self.internal.sctp_transport.stop().await {
close_errs.push(Error::new(format!("sctp_transport: {}", err)));
}
if let Err(err) = self.internal.dtls_transport.stop().await {
close_errs.push(Error::new(format!("dtls_transport: {}", err)));
}
if let Err(err) = self.internal.ice_transport.stop().await {
close_errs.push(Error::new(format!("dtls_transport: {}", err)));
}
RTCPeerConnection::update_connection_state(
&self.internal.on_peer_connection_state_change_handler,
&self.internal.is_closed,
&self.internal.peer_connection_state,
self.ice_connection_state(),
self.internal.dtls_transport.state(),
)
.await;
if let Err(err) = self.internal.ops.close().await {
close_errs.push(Error::new(format!("ops: {}", err)));
}
flatten_errs(close_errs)
}
/// Returns the current local description, if any.
///
/// A clone of the stored description is taken under its lock and then passed
/// through `populate_local_candidates` together with the ICE gatherer and
/// the current ICE gathering state; that function's result is returned.
pub async fn current_local_description(&self) -> Option<RTCSessionDescription> {
    // The lock guard is a temporary and is released at the end of this statement.
    let description = self.internal.current_local_description.lock().await.clone();
    populate_local_candidates(
        description.as_ref(),
        Some(&self.internal.ice_gatherer),
        self.ice_gathering_state(),
    )
    .await
}
/// Returns the pending local description, if any.
///
/// A clone of the stored description is taken under its lock and then passed
/// through `populate_local_candidates` together with the ICE gatherer and
/// the current ICE gathering state; that function's result is returned.
pub async fn pending_local_description(&self) -> Option<RTCSessionDescription> {
    // The lock guard is a temporary and is released at the end of this statement.
    let description = self.internal.pending_local_description.lock().await.clone();
    populate_local_candidates(
        description.as_ref(),
        Some(&self.internal.ice_gatherer),
        self.ice_gathering_state(),
    )
    .await
}
/// Returns a clone of the stored current remote description, if any.
///
/// The value is read under its lock.
pub async fn current_remote_description(&self) -> Option<RTCSessionDescription> {
    self.internal.current_remote_description.lock().await.clone()
}
/// Returns a clone of the stored pending remote description, if any.
///
/// The value is read under its lock.
pub async fn pending_remote_description(&self) -> Option<RTCSessionDescription> {
    self.internal.pending_remote_description.lock().await.clone()
}
pub fn signaling_state(&self) -> RTCSignalingState {
self.internal.signaling_state.load(Ordering::SeqCst).into()
}
/// Returns the ICE gathering state as reported by the internal
/// peer-connection state.
///
/// This is a direct delegation; no processing happens at this level.
pub fn ice_gathering_state(&self) -> RTCIceGatheringState {
self.internal.ice_gathering_state()
}
pub fn connection_state(&self) -> RTCPeerConnectionState {
self.internal
.peer_connection_state
.load(Ordering::SeqCst)
.into()
}
/// Gathers statistics for this connection and returns them as a report.
///
/// The internal peer connection collects the statistics under this
/// connection's stats id; the collected value is then converted into a
/// `StatsReport`.
pub async fn get_stats(&self) -> StatsReport {
    let stats_id = self.get_stats_id().to_owned();
    let collected = self.internal.get_stats(stats_id).await;
    collected.into()
}
/// Returns a new shared handle to this connection's SCTP transport.
///
/// Only the reference count is bumped; the transport itself is not copied.
pub fn sctp(&self) -> Arc<RTCSctpTransport> {
    let transport = &self.internal.sctp_transport;
    Arc::clone(transport)
}
/// Returns a receiver that is signalled when ICE gathering completes.
///
/// Nothing is ever sent on the channel. Completion is signalled by dropping
/// the sending half, which is parked in a shared slot and taken out either
/// by the gather-complete handler installed here or immediately if gathering
/// is already complete when this method is called.
///
/// Calling this installs a new gather-complete handler on the internal peer
/// connection.
pub async fn gathering_complete_promise(&self) -> mpsc::Receiver<()> {
    let (tx, rx) = mpsc::channel(1);
    let sender_slot = Arc::new(Mutex::new(Some(tx)));

    let handler_slot = Arc::clone(&sender_slot);
    self.internal.set_gather_complete_handler(Box::new(move || {
        log::trace!("setGatherCompleteHandler");
        let slot = Arc::clone(&handler_slot);
        Box::pin(async move {
            // Taking the sender out of the slot drops it.
            slot.lock().await.take();
        })
    }));

    // Checked after the handler is installed, so a completion that happened
    // before this call is still reported.
    if self.ice_gathering_state() == RTCIceGatheringState::Complete {
        log::trace!("ICEGatheringState::Complete");
        sender_slot.lock().await.take();
    }

    rx
}
}