pub(super) mod active_speaker_observer;
pub(super) mod audio_level_observer;
pub(super) mod consumer;
pub(super) mod data_consumer;
pub(super) mod data_producer;
pub(super) mod direct_transport;
pub(super) mod pipe_transport;
pub(super) mod plain_transport;
pub(super) mod producer;
pub(super) mod rtp_observer;
#[cfg(test)]
mod tests;
pub(super) mod transport;
pub(super) mod webrtc_transport;
use crate::active_speaker_observer::{ActiveSpeakerObserver, ActiveSpeakerObserverOptions};
use crate::audio_level_observer::{AudioLevelObserver, AudioLevelObserverOptions};
use crate::consumer::{Consumer, ConsumerId, ConsumerOptions};
use crate::data_consumer::{DataConsumer, DataConsumerId, DataConsumerOptions};
use crate::data_producer::{
DataProducer, DataProducerId, DataProducerOptions, NonClosingDataProducer, WeakDataProducer,
};
use crate::direct_transport::{DirectTransport, DirectTransportOptions};
use crate::messages::{
RouterCloseRequest, RouterCreateActiveSpeakerObserverData,
RouterCreateActiveSpeakerObserverRequest, RouterCreateAudioLevelObserverData,
RouterCreateAudioLevelObserverRequest, RouterCreateDirectTransportData,
RouterCreateDirectTransportRequest, RouterCreatePipeTransportData,
RouterCreatePipeTransportRequest, RouterCreatePlainTransportData,
RouterCreatePlainTransportRequest, RouterCreateWebRtcTransportRequest,
RouterCreateWebRtcTransportWithServerRequest, RouterCreateWebrtcTransportData,
RouterDumpRequest,
};
use crate::pipe_transport::{
PipeTransport, PipeTransportOptions, PipeTransportRemoteParameters, WeakPipeTransport,
};
use crate::plain_transport::{PlainTransport, PlainTransportOptions};
use crate::producer::{PipedProducer, Producer, ProducerId, ProducerOptions, WeakProducer};
use crate::rtp_observer::{RtpObserver, RtpObserverId};
use crate::transport::{
ConsumeDataError, ConsumeError, ProduceDataError, ProduceError, Transport, TransportGeneric,
TransportId,
};
use crate::webrtc_transport::{WebRtcTransport, WebRtcTransportListen, WebRtcTransportOptions};
use crate::worker::{Channel, RequestError, Worker};
use crate::{ortc, uuid_based_wrapper_type};
use async_executor::Executor;
use async_lock::Mutex as AsyncMutex;
use event_listener_primitives::{Bag, BagOnce, HandlerId};
use futures_lite::future;
use hash_hasher::{HashedMap, HashedSet};
use log::{debug, error};
use mediasoup_types::data_structures::{AppData, ListenInfo, Protocol};
use mediasoup_types::rtp_parameters::{
RtpCapabilities, RtpCapabilitiesFinalized, RtpCodecCapability,
};
use mediasoup_types::sctp_parameters::NumSctpStreams;
use parking_lot::{Mutex, RwLock};
use serde::{Deserialize, Serialize};
use std::fmt;
use std::net::{IpAddr, Ipv4Addr};
use std::ops::Deref;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex as SyncMutex;
use std::sync::{Arc, Weak};
use thiserror::Error;
// Declares the `RouterId` identifier type through the crate-level
// `uuid_based_wrapper_type!` macro.
uuid_based_wrapper_type!(
RouterId
);
/// Router options: a list of media codecs plus custom application data.
#[derive(Debug)]
#[non_exhaustive]
pub struct RouterOptions {
/// Media codec capabilities supplied for the router.
pub media_codecs: Vec<RtpCodecCapability>,
/// Custom application data.
pub app_data: AppData,
}
impl RouterOptions {
    /// Builds options from the given `media_codecs`; `app_data` is set to
    /// `AppData::default()`.
    #[must_use]
    pub fn new(media_codecs: Vec<RtpCodecCapability>) -> Self {
        let app_data = AppData::default();

        Self {
            media_codecs,
            app_data,
        }
    }
}
impl Default for RouterOptions {
    /// Same as [`RouterOptions::new`] called with an empty codec list.
    fn default() -> Self {
        Self::new(Vec::new())
    }
}
/// Options for piping a producer or data producer into another [`Router`].
#[derive(Debug)]
pub struct PipeToRouterOptions {
/// Destination router.
pub router: Router,
/// When `true` the piped producer reuses the id of the original producer; when `false` a
/// fresh id is generated. Piping into the very same router is rejected only when this is
/// `true`.
pub keep_id: bool,
/// Listen information passed to `PipeTransportOptions::new` for both pipe transports.
/// Private field; `new` initializes it to UDP on IPv4 localhost.
listen_info: ListenInfo,
/// Forwarded to the `enable_sctp` option of the created pipe transports.
pub enable_sctp: bool,
/// Forwarded to the `num_sctp_streams` option of the created pipe transports.
pub num_sctp_streams: NumSctpStreams,
/// Forwarded to the `enable_rtx` option of the created pipe transports.
pub enable_rtx: bool,
/// Forwarded to the `enable_srtp` option of the created pipe transports.
pub enable_srtp: bool,
}
impl PipeToRouterOptions {
    /// Builds options targeting `router` with these defaults: `keep_id` and `enable_sctp` are
    /// `true`, `enable_rtx` and `enable_srtp` are `false`, `num_sctp_streams` is
    /// `NumSctpStreams::default()` and the listen info is UDP on IPv4 localhost with every
    /// optional setting left unset.
    #[must_use]
    pub fn new(router: Router) -> Self {
        let listen_info = ListenInfo {
            protocol: Protocol::Udp,
            ip: IpAddr::V4(Ipv4Addr::LOCALHOST),
            announced_address: None,
            expose_internal_ip: false,
            port: None,
            port_range: None,
            flags: None,
            send_buffer_size: None,
            recv_buffer_size: None,
        };
        let num_sctp_streams = NumSctpStreams::default();

        Self {
            router,
            keep_id: true,
            listen_info,
            enable_sctp: true,
            num_sctp_streams,
            enable_rtx: false,
            enable_srtp: false,
        }
    }
}
/// Result of `Router::pipe_producer_to_router`.
#[derive(Debug)]
pub struct PipeProducerToRouterPair {
/// Consumer created on the local pipe transport (in the source router).
pub pipe_consumer: Consumer,
/// Producer created on the remote pipe transport (in the destination router).
pub pipe_producer: PipedProducer,
}
/// Errors returned by `Router::pipe_producer_to_router`.
#[derive(Debug, Error)]
pub enum PipeProducerToRouterError {
/// The destination router is this router while `keep_id` is `true`.
#[error("Destination router must be different")]
SameRouter,
/// No live producer with the given id is registered in this router.
#[error("Producer with id \"{0}\" not found")]
ProducerNotFound(ProducerId),
/// Creating or connecting the pipe transport pair failed.
#[error("Failed to create or connect Pipe transport: \"{0}\"")]
TransportFailed(RequestError),
/// Consuming on the local pipe transport failed.
#[error("Failed to consume: \"{0}\"")]
ConsumeFailed(ConsumeError),
/// Producing on the remote pipe transport failed.
#[error("Failed to produce: \"{0}\"")]
ProduceFailed(ProduceError),
}
impl From<RequestError> for PipeProducerToRouterError {
fn from(error: RequestError) -> Self {
PipeProducerToRouterError::TransportFailed(error)
}
}
impl From<ConsumeError> for PipeProducerToRouterError {
fn from(error: ConsumeError) -> Self {
PipeProducerToRouterError::ConsumeFailed(error)
}
}
impl From<ProduceError> for PipeProducerToRouterError {
fn from(error: ProduceError) -> Self {
PipeProducerToRouterError::ProduceFailed(error)
}
}
/// Result of `Router::pipe_data_producer_to_router`.
#[derive(Debug)]
pub struct PipeDataProducerToRouterPair {
/// Data consumer created on the local pipe transport (in the source router).
pub pipe_data_consumer: DataConsumer,
/// Data producer created on the remote pipe transport (in the destination router).
pub pipe_data_producer: NonClosingDataProducer,
}
/// Errors returned by `Router::pipe_data_producer_to_router`.
#[derive(Debug, Error)]
pub enum PipeDataProducerToRouterError {
/// The destination router is this router while `keep_id` is `true`.
#[error("Destination router must be different")]
SameRouter,
/// No live data producer with the given id is registered in this router.
#[error("Data producer with id \"{0}\" not found")]
DataProducerNotFound(DataProducerId),
/// Creating or connecting the pipe transport pair failed.
#[error("Failed to create or connect Pipe transport: \"{0}\"")]
TransportFailed(RequestError),
/// Consuming data on the local pipe transport failed.
#[error("Failed to consume: \"{0}\"")]
ConsumeFailed(ConsumeDataError),
/// Producing data on the remote pipe transport failed.
#[error("Failed to produce: \"{0}\"")]
ProduceFailed(ProduceDataError),
}
impl From<RequestError> for PipeDataProducerToRouterError {
fn from(error: RequestError) -> Self {
PipeDataProducerToRouterError::TransportFailed(error)
}
}
impl From<ConsumeDataError> for PipeDataProducerToRouterError {
fn from(error: ConsumeDataError) -> Self {
PipeDataProducerToRouterError::ConsumeFailed(error)
}
}
impl From<ProduceDataError> for PipeDataProducerToRouterError {
fn from(error: ProduceDataError) -> Self {
PipeDataProducerToRouterError::ProduceFailed(error)
}
}
/// Errors returned by `Router::update_media_codecs`.
#[derive(Debug, Error)]
pub enum UpdateMediaCodecsError {
/// `ortc::generate_router_rtp_capabilities` rejected the supplied media codecs.
#[error("RTP capabilities generation error: {0}")]
FailedRtpCapabilitiesGeneration(ortc::RtpCapabilitiesError),
}
/// Response type of `Router::dump`; (de)serialized with camelCase field names.
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
#[doc(hidden)]
#[non_exhaustive]
pub struct RouterDump {
/// Router identifier.
pub id: RouterId,
/// Consumer id mapped to the id of its producer.
pub map_consumer_id_producer_id: HashedMap<ConsumerId, ProducerId>,
/// Data consumer id mapped to the id of its data producer.
pub map_data_consumer_id_data_producer_id: HashedMap<DataConsumerId, DataProducerId>,
/// Data producer id mapped to the set of its data consumer ids.
pub map_data_producer_id_data_consumer_ids:
HashedMap<DataProducerId, HashedSet<DataConsumerId>>,
/// Producer id mapped to the set of its consumer ids.
pub map_producer_id_consumer_ids: HashedMap<ProducerId, HashedSet<ConsumerId>>,
/// Producer id mapped to the set of RTP observer ids.
pub map_producer_id_observer_ids: HashedMap<ProducerId, HashedSet<RtpObserverId>>,
/// Ids of RTP observers.
pub rtp_observer_ids: HashedSet<RtpObserverId>,
/// Ids of transports.
pub transport_ids: HashedSet<TransportId>,
}
/// Borrowed reference to a freshly created transport, passed to `Router::on_new_transport`
/// callbacks. Dereferences to `dyn Transport`.
#[derive(Debug)]
pub enum NewTransport<'a> {
/// A direct transport.
Direct(&'a DirectTransport),
/// A pipe transport.
Pipe(&'a PipeTransport),
/// A plain transport.
Plain(&'a PlainTransport),
/// A WebRTC transport.
WebRtc(&'a WebRtcTransport),
}
impl<'a> Deref for NewTransport<'a> {
    type Target = dyn Transport;

    /// Exposes whichever concrete transport is wrapped as a `dyn Transport`.
    fn deref(&self) -> &Self::Target {
        // The payloads are shared references (`Copy`), so they can be taken by value.
        match *self {
            Self::Direct(direct) => direct as &Self::Target,
            Self::Pipe(pipe) => pipe as &Self::Target,
            Self::Plain(plain) => plain as &Self::Target,
            Self::WebRtc(webrtc) => webrtc as &Self::Target,
        }
    }
}
/// Borrowed reference to a freshly created RTP observer, passed to
/// `Router::on_new_rtp_observer` callbacks. Dereferences to `dyn RtpObserver`.
#[derive(Debug)]
pub enum NewRtpObserver<'a> {
/// An audio level observer.
AudioLevel(&'a AudioLevelObserver),
/// An active speaker observer.
ActiveSpeaker(&'a ActiveSpeakerObserver),
}
impl<'a> Deref for NewRtpObserver<'a> {
    type Target = dyn RtpObserver;

    /// Exposes whichever concrete observer is wrapped as a `dyn RtpObserver`.
    fn deref(&self) -> &Self::Target {
        // The payloads are shared references (`Copy`), so they can be taken by value.
        match *self {
            Self::AudioLevel(audio_level) => audio_level as &Self::Target,
            Self::ActiveSpeaker(active_speaker) => active_speaker as &Self::Target,
        }
    }
}
/// Two connected pipe transports: one created on this router, one on the destination router.
struct PipeTransportPair {
/// Pipe transport created on this router.
local: PipeTransport,
/// Pipe transport created on the destination router.
remote: PipeTransport,
}
impl PipeTransportPair {
    /// Produces the weak counterpart of this pair by downgrading both transports.
    fn downgrade(&self) -> WeakPipeTransportPair {
        let Self { local, remote } = self;

        WeakPipeTransportPair {
            local: local.downgrade(),
            remote: remote.downgrade(),
        }
    }
}
/// Weak counterpart of `PipeTransportPair`, as stored in the router's pipe transport map.
#[derive(Debug)]
struct WeakPipeTransportPair {
/// Weak handle to the pipe transport on this router.
local: WeakPipeTransport,
/// Weak handle to the pipe transport on the destination router.
remote: WeakPipeTransport,
}
impl WeakPipeTransportPair {
    /// Returns the strong pair if both transports can still be upgraded, `None` otherwise.
    fn upgrade(&self) -> Option<PipeTransportPair> {
        // `remote` is only upgraded once `local` is known to be alive.
        self.local.upgrade().and_then(|local| {
            self.remote
                .upgrade()
                .map(|remote| PipeTransportPair { local, remote })
        })
    }
}
/// Event handler collections of a router.
#[derive(Default)]
#[allow(clippy::type_complexity)]
struct Handlers {
/// Callbacks invoked after each transport creation.
new_transport: Bag<Arc<dyn Fn(NewTransport<'_>) + Send + Sync>>,
/// Callbacks invoked after each RTP observer creation.
new_rtp_observer: Bag<Arc<dyn Fn(NewRtpObserver<'_>) + Send + Sync>>,
/// One-shot callbacks invoked from the worker's close handler.
worker_close: BagOnce<Box<dyn FnOnce() + Send>>,
/// One-shot callbacks invoked when the router is closed.
close: BagOnce<Box<dyn FnOnce() + Send>>,
}
/// Shared state behind every clone of a `Router`.
struct Inner {
/// Router identifier.
id: RouterId,
/// Executor used to spawn background tasks (e.g. the close request).
executor: Arc<Executor<'static>>,
/// Current RTP capabilities; replaced by `Router::update_media_codecs`.
rtp_capabilities: SyncMutex<RtpCapabilitiesFinalized>,
/// Channel used to send requests for this router.
channel: Channel,
/// Registered event handlers.
handlers: Arc<Handlers>,
/// Custom application data.
app_data: AppData,
/// Producers announced by this router's transports, stored as weak handles.
producers: Arc<RwLock<HashedMap<ProducerId, WeakProducer>>>,
/// Data producers announced by this router's transports, stored as weak handles.
data_producers: Arc<RwLock<HashedMap<DataProducerId, WeakDataProducer>>>,
/// Pipe transport pairs keyed by destination router id. Each entry has its own async mutex
/// that is held while a pair for that destination is looked up or created.
#[allow(clippy::type_complexity)]
mapped_pipe_transports:
Arc<Mutex<HashedMap<RouterId, Arc<AsyncMutex<Option<WeakPipeTransportPair>>>>>>,
/// Worker this router was created with.
worker: Worker,
/// Set once the router has been closed.
closed: AtomicBool,
/// Handler id of the worker-close subscription, stored for as long as `Inner` lives.
_on_worker_close_handler: Mutex<HandlerId>,
}
impl Drop for Inner {
// Closes the router when `Inner` is dropped; `close()` does nothing if it already ran.
fn drop(&mut self) {
debug!("drop()");
self.close();
}
}
impl Inner {
fn close(&self) {
if !self.closed.swap(true, Ordering::SeqCst) {
self.handlers.close.call_simple();
{
let channel = self.channel.clone();
let request = RouterCloseRequest { router_id: self.id };
self.executor
.spawn(async move {
match channel.request("", request).await {
Err(RequestError::ChannelClosed) => {
debug!("router closing failed on drop: Channel already closed");
}
Err(error) => {
error!("router closing failed on drop: {}", error);
}
Ok(_) => {}
}
})
.detach();
}
}
}
}
/// Cloneable handle to a router; all clones share the same `Inner` state.
#[derive(Clone)]
#[must_use = "Router will be closed on drop, make sure to keep it around for as long as needed"]
pub struct Router {
/// Shared state; its `Drop` implementation closes the router.
inner: Arc<Inner>,
}
impl fmt::Debug for Router {
    /// Formats a subset of the router state: id, RTP capabilities, producer maps, mapped pipe
    /// transports, worker and the closed flag.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let inner = &self.inner;
        let mut debug_struct = f.debug_struct("Router");

        debug_struct.field("id", &inner.id);
        debug_struct.field("rtp_capabilities", &inner.rtp_capabilities);
        debug_struct.field("producers", &inner.producers);
        debug_struct.field("data_producers", &inner.data_producers);
        debug_struct.field("mapped_pipe_transports", &inner.mapped_pipe_transports);
        debug_struct.field("worker", &inner.worker);
        debug_struct.field("closed", &inner.closed);

        debug_struct.finish()
    }
}
impl Router {
pub(super) fn new(
id: RouterId,
executor: Arc<Executor<'static>>,
channel: Channel,
rtp_capabilities: RtpCapabilitiesFinalized,
app_data: AppData,
worker: Worker,
) -> Self {
debug!("new()");
let producers = Arc::<RwLock<HashedMap<ProducerId, WeakProducer>>>::default();
let data_producers = Arc::<RwLock<HashedMap<DataProducerId, WeakDataProducer>>>::default();
let mapped_pipe_transports = Arc::<
Mutex<HashedMap<RouterId, Arc<AsyncMutex<Option<WeakPipeTransportPair>>>>>,
>::default();
let handlers = Arc::<Handlers>::default();
let inner_weak = Arc::<Mutex<Option<Weak<Inner>>>>::default();
let on_worker_close_handler = worker.on_close({
let inner_weak = Arc::clone(&inner_weak);
move || {
let maybe_inner = inner_weak.lock().as_ref().and_then(Weak::upgrade);
if let Some(inner) = maybe_inner {
inner.handlers.worker_close.call_simple();
if !inner.closed.swap(true, Ordering::SeqCst) {
inner.handlers.close.call_simple();
}
}
}
});
let inner = Arc::new(Inner {
id,
executor,
rtp_capabilities: SyncMutex::new(rtp_capabilities),
channel,
handlers,
producers,
data_producers,
mapped_pipe_transports,
app_data,
worker,
closed: AtomicBool::new(false),
_on_worker_close_handler: Mutex::new(on_worker_close_handler),
});
inner_weak.lock().replace(Arc::downgrade(&inner));
Self { inner }
}
/// Router identifier.
#[must_use]
pub fn id(&self) -> RouterId {
self.inner.id
}
/// Worker this router was created with.
#[must_use]
pub fn worker(&self) -> &Worker {
    &self.inner.worker
}
/// Custom application data supplied when the router was constructed.
#[must_use]
pub fn app_data(&self) -> &AppData {
&self.inner.app_data
}
/// Whether the router has been closed.
#[must_use]
pub fn closed(&self) -> bool {
self.inner.closed.load(Ordering::SeqCst)
}
#[must_use]
pub fn rtp_capabilities(&self) -> RtpCapabilitiesFinalized {
self.inner.rtp_capabilities.lock().unwrap().clone()
}
/// Sends a `RouterDumpRequest` for this router and returns the response.
///
/// # Errors
///
/// Returns the `RequestError` produced by the channel request.
#[doc(hidden)]
pub async fn dump(&self) -> Result<RouterDump, RequestError> {
    debug!("dump()");

    let inner = &self.inner;

    inner.channel.request(inner.id, RouterDumpRequest {}).await
}
/// Creates a [`DirectTransport`] on this router.
///
/// Sends the creation request, builds the transport handle, notifies `on_new_transport`
/// callbacks and starts tracking producers and data producers created on the transport.
///
/// # Errors
///
/// Returns the `RequestError` produced by the creation request.
pub async fn create_direct_transport(
    &self,
    direct_transport_options: DirectTransportOptions,
) -> Result<DirectTransport, RequestError> {
    debug!("create_direct_transport()");

    let inner = &self.inner;
    let transport_id = TransportId::new();
    // Kept alive until this function returns.
    // NOTE(review): presumably buffers channel messages addressed to the new id until the
    // transport handle exists — confirm against `Channel::buffer_messages_for`.
    let _buffer_guard = inner.channel.buffer_messages_for(transport_id.into());

    let request = RouterCreateDirectTransportRequest {
        data: RouterCreateDirectTransportData::from_options(
            transport_id,
            &direct_transport_options,
        ),
    };
    inner.channel.request(inner.id, request).await?;

    let transport = DirectTransport::new(
        transport_id,
        Arc::clone(&inner.executor),
        inner.channel.clone(),
        direct_transport_options.app_data,
        self.clone(),
    );

    inner
        .handlers
        .new_transport
        .call(|callback| callback(NewTransport::Direct(&transport)));
    self.after_transport_creation(&transport);

    Ok(transport)
}
/// Creates a [`WebRtcTransport`] on this router.
///
/// The request type depends on `webrtc_transport_options.listen`: individual listen infos
/// use `RouterCreateWebRtcTransportRequest`, a WebRTC server uses
/// `RouterCreateWebRtcTransportWithServerRequest`. Afterwards `on_new_transport` callbacks
/// are notified and producer tracking is set up.
///
/// # Errors
///
/// Returns the `RequestError` produced by the creation request.
pub async fn create_webrtc_transport(
    &self,
    webrtc_transport_options: WebRtcTransportOptions,
) -> Result<WebRtcTransport, RequestError> {
    debug!("create_webrtc_transport()");

    let inner = &self.inner;
    let transport_id = TransportId::new();
    // Kept alive until this function returns.
    // NOTE(review): presumably buffers channel messages addressed to the new id until the
    // transport handle exists — confirm against `Channel::buffer_messages_for`.
    let _buffer_guard = inner.channel.buffer_messages_for(transport_id.into());

    // Both request flavours carry the same payload.
    let request_data =
        RouterCreateWebrtcTransportData::from_options(transport_id, &webrtc_transport_options);
    let data = match webrtc_transport_options.listen {
        WebRtcTransportListen::Individual { .. } => {
            inner
                .channel
                .request(
                    inner.id,
                    RouterCreateWebRtcTransportRequest { data: request_data },
                )
                .await?
        }
        WebRtcTransportListen::Server { .. } => {
            inner
                .channel
                .request(
                    inner.id,
                    RouterCreateWebRtcTransportWithServerRequest { data: request_data },
                )
                .await?
        }
    };

    // The transport receives the server handle when one was used.
    let webrtc_server = match webrtc_transport_options.listen {
        WebRtcTransportListen::Individual { .. } => None,
        WebRtcTransportListen::Server { webrtc_server } => Some(webrtc_server),
    };
    let transport = WebRtcTransport::new(
        transport_id,
        Arc::clone(&inner.executor),
        inner.channel.clone(),
        data,
        webrtc_transport_options.app_data,
        self.clone(),
        webrtc_server,
    );

    inner
        .handlers
        .new_transport
        .call(|callback| callback(NewTransport::WebRtc(&transport)));
    self.after_transport_creation(&transport);

    Ok(transport)
}
/// Creates a [`PipeTransport`] on this router.
///
/// Sends the creation request, builds the transport handle from the response, notifies
/// `on_new_transport` callbacks and sets up producer tracking.
///
/// # Errors
///
/// Returns the `RequestError` produced by the creation request.
pub async fn create_pipe_transport(
    &self,
    pipe_transport_options: PipeTransportOptions,
) -> Result<PipeTransport, RequestError> {
    debug!("create_pipe_transport()");

    let inner = &self.inner;
    let transport_id = TransportId::new();
    // Kept alive until this function returns.
    // NOTE(review): presumably buffers channel messages addressed to the new id until the
    // transport handle exists — confirm against `Channel::buffer_messages_for`.
    let _buffer_guard = inner.channel.buffer_messages_for(transport_id.into());

    let request = RouterCreatePipeTransportRequest {
        data: RouterCreatePipeTransportData::from_options(
            transport_id,
            &pipe_transport_options,
        ),
    };
    let data = inner.channel.request(inner.id, request).await?;

    let transport = PipeTransport::new(
        transport_id,
        Arc::clone(&inner.executor),
        inner.channel.clone(),
        data,
        pipe_transport_options.app_data,
        self.clone(),
    );

    inner
        .handlers
        .new_transport
        .call(|callback| callback(NewTransport::Pipe(&transport)));
    self.after_transport_creation(&transport);

    Ok(transport)
}
/// Creates a [`PlainTransport`] on this router.
///
/// Sends the creation request, builds the transport handle from the response, notifies
/// `on_new_transport` callbacks and sets up producer tracking.
///
/// # Errors
///
/// Returns the `RequestError` produced by the creation request.
pub async fn create_plain_transport(
    &self,
    plain_transport_options: PlainTransportOptions,
) -> Result<PlainTransport, RequestError> {
    debug!("create_plain_transport()");

    let inner = &self.inner;
    let transport_id = TransportId::new();
    // Kept alive until this function returns.
    // NOTE(review): presumably buffers channel messages addressed to the new id until the
    // transport handle exists — confirm against `Channel::buffer_messages_for`.
    let _buffer_guard = inner.channel.buffer_messages_for(transport_id.into());

    let request = RouterCreatePlainTransportRequest {
        data: RouterCreatePlainTransportData::from_options(
            transport_id,
            &plain_transport_options,
        ),
    };
    let data = inner.channel.request(inner.id, request).await?;

    let transport = PlainTransport::new(
        transport_id,
        Arc::clone(&inner.executor),
        inner.channel.clone(),
        data,
        plain_transport_options.app_data,
        self.clone(),
    );

    inner
        .handlers
        .new_transport
        .call(|callback| callback(NewTransport::Plain(&transport)));
    self.after_transport_creation(&transport);

    Ok(transport)
}
/// Creates an [`AudioLevelObserver`] on this router and notifies `on_new_rtp_observer`
/// callbacks about it.
///
/// # Errors
///
/// Returns the `RequestError` produced by the creation request.
pub async fn create_audio_level_observer(
    &self,
    audio_level_observer_options: AudioLevelObserverOptions,
) -> Result<AudioLevelObserver, RequestError> {
    debug!("create_audio_level_observer()");

    let inner = &self.inner;
    let rtp_observer_id = RtpObserverId::new();
    // Kept alive until this function returns.
    // NOTE(review): presumably buffers channel messages addressed to the new id until the
    // observer handle exists — confirm against `Channel::buffer_messages_for`.
    let _buffer_guard = inner.channel.buffer_messages_for(rtp_observer_id.into());

    let request = RouterCreateAudioLevelObserverRequest {
        data: RouterCreateAudioLevelObserverData::from_options(
            rtp_observer_id,
            &audio_level_observer_options,
        ),
    };
    inner.channel.request(inner.id, request).await?;

    let audio_level_observer = AudioLevelObserver::new(
        rtp_observer_id,
        Arc::clone(&inner.executor),
        inner.channel.clone(),
        audio_level_observer_options.app_data,
        self.clone(),
    );

    inner
        .handlers
        .new_rtp_observer
        .call(|callback| callback(NewRtpObserver::AudioLevel(&audio_level_observer)));

    Ok(audio_level_observer)
}
/// Creates an [`ActiveSpeakerObserver`] on this router and notifies `on_new_rtp_observer`
/// callbacks about it.
///
/// # Errors
///
/// Returns the `RequestError` produced by the creation request.
pub async fn create_active_speaker_observer(
    &self,
    active_speaker_observer_options: ActiveSpeakerObserverOptions,
) -> Result<ActiveSpeakerObserver, RequestError> {
    debug!("create_active_speaker_observer()");

    let inner = &self.inner;
    let rtp_observer_id = RtpObserverId::new();
    // Kept alive until this function returns.
    // NOTE(review): presumably buffers channel messages addressed to the new id until the
    // observer handle exists — confirm against `Channel::buffer_messages_for`.
    let _buffer_guard = inner.channel.buffer_messages_for(rtp_observer_id.into());

    let request = RouterCreateActiveSpeakerObserverRequest {
        data: RouterCreateActiveSpeakerObserverData::from_options(
            rtp_observer_id,
            &active_speaker_observer_options,
        ),
    };
    inner.channel.request(inner.id, request).await?;

    let active_speaker_observer = ActiveSpeakerObserver::new(
        rtp_observer_id,
        Arc::clone(&inner.executor),
        inner.channel.clone(),
        active_speaker_observer_options.app_data,
        self.clone(),
    );

    inner
        .handlers
        .new_rtp_observer
        .call(|callback| callback(NewRtpObserver::ActiveSpeaker(&active_speaker_observer)));

    Ok(active_speaker_observer)
}
/// Pipes the producer with id `producer_id` from this router into the router given in
/// `pipe_to_router_options`.
///
/// A pipe transport pair between the two routers is reused or created, the producer is
/// consumed on the local pipe transport and re-produced on the remote one. The new producer
/// keeps the original id when `keep_id` is `true`, otherwise it gets a fresh id.
///
/// Events are linked as follows: closing, pausing or resuming the pipe consumer closes,
/// pauses or resumes the pipe producer; the pipe producer's close handler holds a clone of
/// the pipe consumer and drops it when it runs; closing the original producer closes the
/// pipe producer.
///
/// # Errors
///
/// * `SameRouter` — `keep_id` is `true` and the destination is this router.
/// * `ProducerNotFound` — no live producer with that id is registered here.
/// * `TransportFailed` / `ConsumeFailed` / `ProduceFailed` — the corresponding step failed.
pub async fn pipe_producer_to_router(
    &self,
    producer_id: ProducerId,
    pipe_to_router_options: PipeToRouterOptions,
) -> Result<PipeProducerToRouterPair, PipeProducerToRouterError> {
    debug!("pipe_producer_to_router()");

    let keep_id = pipe_to_router_options.keep_id;
    if keep_id && pipe_to_router_options.router.id() == self.id() {
        return Err(PipeProducerToRouterError::SameRouter);
    }

    let producer = self
        .get_producer(&producer_id)
        .ok_or(PipeProducerToRouterError::ProducerNotFound(producer_id))?;

    let pipe_transport_pair = self
        .get_or_create_pipe_transport_pair(pipe_to_router_options)
        .await?;

    let consumer_options = ConsumerOptions::new(producer_id, RtpCapabilities::default());
    let pipe_consumer = pipe_transport_pair.local.consume(consumer_options).await?;

    let mut producer_options = ProducerOptions::new_pipe_transport(
        if keep_id {
            producer_id
        } else {
            ProducerId::new()
        },
        pipe_consumer.kind(),
        pipe_consumer.rtp_parameters().clone(),
    );
    producer_options.paused = pipe_consumer.producer_paused();
    producer_options.app_data = producer.app_data().clone();
    let pipe_producer = pipe_transport_pair.remote.produce(producer_options).await?;

    // Pipe consumer -> pipe producer: close.
    let close_pipe_producer = {
        let pipe_producer_weak = pipe_producer.downgrade();
        move || {
            if let Some(pipe_producer) = pipe_producer_weak.upgrade() {
                pipe_producer.close();
            }
        }
    };
    pipe_consumer.on_close(close_pipe_producer).detach();

    // Pipe consumer -> pipe producer: pause. The result of `pause()` is ignored.
    let pause_pipe_producer = {
        let executor = Arc::clone(&self.inner.executor);
        let pipe_producer_weak = pipe_producer.downgrade();
        move || {
            if let Some(pipe_producer) = pipe_producer_weak.upgrade() {
                executor
                    .spawn(async move {
                        let _ = pipe_producer.pause().await;
                    })
                    .detach();
            }
        }
    };
    pipe_consumer.on_pause(pause_pipe_producer).detach();

    // Pipe consumer -> pipe producer: resume. The result of `resume()` is ignored.
    let resume_pipe_producer = {
        let executor = Arc::clone(&self.inner.executor);
        let pipe_producer_weak = pipe_producer.downgrade();
        move || {
            if let Some(pipe_producer) = pipe_producer_weak.upgrade() {
                executor
                    .spawn(async move {
                        let _ = pipe_producer.resume().await;
                    })
                    .detach();
            }
        }
    };
    pipe_consumer.on_resume(resume_pipe_producer).detach();

    // The pipe producer's close handler owns a clone of the pipe consumer, released only when
    // the handler runs (or is itself dropped).
    let release_pipe_consumer = {
        let pipe_consumer = pipe_consumer.clone();
        move || {
            drop(pipe_consumer);
        }
    };
    pipe_producer.on_close(release_pipe_consumer).detach();

    // Original producer closed -> close the pipe producer handed to this callback.
    let piped_producer = PipedProducer::new(pipe_producer, {
        let weak_producer = producer.downgrade();
        move |pipe_producer| {
            if let Some(producer) = weak_producer.upgrade() {
                producer
                    .on_close(move || {
                        pipe_producer.close();
                    })
                    .detach();
            }
        }
    });

    Ok(PipeProducerToRouterPair {
        pipe_consumer,
        pipe_producer: piped_producer,
    })
}
/// Pipes the data producer with id `data_producer_id` from this router into the router given
/// in `pipe_to_router_options`.
///
/// A pipe transport pair between the two routers is reused or created, the data producer is
/// consumed (SCTP) on the local pipe transport and re-produced on the remote one with the
/// same label, protocol and app data. The new data producer keeps the original id when
/// `keep_id` is `true`, otherwise it gets a fresh id.
///
/// Closing the pipe data consumer closes the pipe data producer; the pipe data producer's
/// close handler holds a clone of the pipe data consumer and drops it when it runs; closing
/// the original data producer closes the pipe data producer.
///
/// # Errors
///
/// * `SameRouter` — `keep_id` is `true` and the destination is this router.
/// * `DataProducerNotFound` — no live data producer with that id is registered here.
/// * `TransportFailed` / `ConsumeFailed` / `ProduceFailed` — the corresponding step failed.
///
/// # Panics
///
/// Panics if the pipe data consumer reports no SCTP stream parameters.
pub async fn pipe_data_producer_to_router(
    &self,
    data_producer_id: DataProducerId,
    pipe_to_router_options: PipeToRouterOptions,
) -> Result<PipeDataProducerToRouterPair, PipeDataProducerToRouterError> {
    debug!("pipe_data_producer_to_router()");

    let keep_id = pipe_to_router_options.keep_id;
    if keep_id && pipe_to_router_options.router.id() == self.id() {
        return Err(PipeDataProducerToRouterError::SameRouter);
    }

    let data_producer = self.get_data_producer(&data_producer_id).ok_or(
        PipeDataProducerToRouterError::DataProducerNotFound(data_producer_id),
    )?;

    let pipe_transport_pair = self
        .get_or_create_pipe_transport_pair(pipe_to_router_options)
        .await?;

    let data_consumer_options = DataConsumerOptions::new_sctp(data_producer_id);
    let pipe_data_consumer = pipe_transport_pair
        .local
        .consume_data(data_consumer_options)
        .await?;

    let mut producer_options = DataProducerOptions::new_pipe_transport(
        if keep_id {
            data_producer_id
        } else {
            DataProducerId::new()
        },
        // The data consumer was created through `new_sctp` above, so SCTP stream parameters
        // are expected to be present.
        pipe_data_consumer.sctp_stream_parameters().unwrap(),
    );
    producer_options
        .label
        .clone_from(pipe_data_consumer.label());
    producer_options
        .protocol
        .clone_from(pipe_data_consumer.protocol());
    producer_options.app_data = data_producer.app_data().clone();
    let pipe_data_producer = pipe_transport_pair
        .remote
        .produce_data(producer_options)
        .await?;

    // Pipe data consumer closed -> close the pipe data producer.
    let close_pipe_data_producer = {
        let pipe_data_producer_weak = pipe_data_producer.downgrade();
        move || {
            if let Some(pipe_data_producer) = pipe_data_producer_weak.upgrade() {
                pipe_data_producer.close();
            }
        }
    };
    pipe_data_consumer
        .on_close(close_pipe_data_producer)
        .detach();

    // The pipe data producer's close handler owns a clone of the pipe data consumer, released
    // only when the handler runs (or is itself dropped).
    let release_pipe_data_consumer = {
        let pipe_data_consumer = pipe_data_consumer.clone();
        move || {
            drop(pipe_data_consumer);
        }
    };
    pipe_data_producer
        .on_close(release_pipe_data_consumer)
        .detach();

    // Original data producer closed -> close the pipe data producer handed to this callback.
    let non_closing_data_producer = NonClosingDataProducer::new(pipe_data_producer, {
        let weak_data_producer = data_producer.downgrade();
        move |pipe_data_producer| {
            if let Some(data_producer) = weak_data_producer.upgrade() {
                data_producer
                    .on_close(move || {
                        pipe_data_producer.close();
                    })
                    .detach();
            }
        }
    });

    Ok(PipeDataProducerToRouterPair {
        pipe_data_consumer,
        pipe_data_producer: non_closing_data_producer,
    })
}
/// Tells whether the producer with id `producer_id` can be consumed with the given
/// `rtp_capabilities`, as decided by `ortc::can_consume`.
///
/// Returns `false` (and logs an error) when the producer is not found or when
/// `ortc::can_consume` fails.
#[must_use]
pub fn can_consume(
    &self,
    producer_id: &ProducerId,
    rtp_capabilities: &RtpCapabilities,
) -> bool {
    let producer = match self.get_producer(producer_id) {
        Some(producer) => producer,
        None => {
            error!(
                "can_consume() | Producer with id \"{}\" not found",
                producer_id
            );
            return false;
        }
    };

    ortc::can_consume(producer.consumable_rtp_parameters(), rtp_capabilities).unwrap_or_else(
        |error| {
            error!("can_consume() | unexpected error: {}", error);
            false
        },
    )
}
/// Regenerates the router RTP capabilities from `media_codecs` and stores them in place of
/// the current ones.
///
/// # Errors
///
/// Returns `UpdateMediaCodecsError::FailedRtpCapabilitiesGeneration` when
/// `ortc::generate_router_rtp_capabilities` fails; the stored capabilities are then left
/// untouched.
///
/// # Panics
///
/// Panics if the capabilities mutex is poisoned.
pub fn update_media_codecs(
    &mut self,
    media_codecs: Vec<RtpCodecCapability>,
) -> Result<(), UpdateMediaCodecsError> {
    debug!("update_media_codecs()");

    let rtp_capabilities = match ortc::generate_router_rtp_capabilities(media_codecs) {
        Ok(rtp_capabilities) => rtp_capabilities,
        Err(error) => {
            return Err(UpdateMediaCodecsError::FailedRtpCapabilitiesGeneration(
                error,
            ));
        }
    };

    *self.inner.rtp_capabilities.lock().unwrap() = rtp_capabilities;

    Ok(())
}
/// Registers `callback` to be invoked for every transport created through this router.
///
/// Returns the `HandlerId` of the registration.
pub fn on_new_transport<F: Fn(NewTransport<'_>) + Send + Sync + 'static>(
    &self,
    callback: F,
) -> HandlerId {
    let new_transport_handlers = &self.inner.handlers.new_transport;

    new_transport_handlers.add(Arc::new(callback))
}
/// Registers `callback` to be invoked for every RTP observer created through this router.
///
/// Returns the `HandlerId` of the registration.
pub fn on_new_rtp_observer<F: Fn(NewRtpObserver<'_>) + Send + Sync + 'static>(
    &self,
    callback: F,
) -> HandlerId {
    let new_rtp_observer_handlers = &self.inner.handlers.new_rtp_observer;

    new_rtp_observer_handlers.add(Arc::new(callback))
}
/// Registers a one-shot `callback` that is invoked from the worker's close handler.
///
/// Returns the `HandlerId` of the registration.
pub fn on_worker_close<F: FnOnce() + Send + 'static>(&self, callback: F) -> HandlerId {
    let worker_close_handlers = &self.inner.handlers.worker_close;

    worker_close_handlers.add(Box::new(callback))
}
/// Registers a one-shot `callback` for router close.
///
/// If the router is already marked as closed at registration time, the `close` handlers are
/// triggered right away via `call_simple()`.
///
/// Returns the `HandlerId` of the registration.
pub fn on_close<F: FnOnce() + Send + 'static>(&self, callback: F) -> HandlerId {
    let inner = &self.inner;
    let handler_id = inner.handlers.close.add(Box::new(callback));

    let already_closed = inner.closed.load(Ordering::Relaxed);
    if already_closed {
        inner.handlers.close.call_simple();
    }

    handler_id
}
/// Returns the pipe transport pair connecting this router with the destination router,
/// creating and remembering a new pair when none is alive.
///
/// The map entry for the destination router has its own async mutex, which stays locked for
/// the whole lookup-or-create sequence, so concurrent calls for the same destination wait
/// for each other instead of creating a second pair.
///
/// # Errors
///
/// Returns the `RequestError` from creating the pair.
async fn get_or_create_pipe_transport_pair(
    &self,
    pipe_to_router_options: PipeToRouterOptions,
) -> Result<PipeTransportPair, RequestError> {
    let remote_router_id = pipe_to_router_options.router.id();

    // The synchronous map lock is released at the end of this block, before any `.await`.
    let pair_slot = {
        let mut mapped_pipe_transports = self.inner.mapped_pipe_transports.lock();
        mapped_pipe_transports
            .entry(remote_router_id)
            .or_default()
            .clone()
    };

    let mut pair_slot_guard = pair_slot.lock().await;

    if let Some(existing_pair) = pair_slot_guard
        .as_ref()
        .and_then(WeakPipeTransportPair::upgrade)
    {
        return Ok(existing_pair);
    }

    let created_pair = self
        .create_pipe_transport_pair(pipe_to_router_options)
        .await?;
    *pair_slot_guard = Some(created_pair.downgrade());

    Ok(created_pair)
}
async fn create_pipe_transport_pair(
&self,
pipe_to_router_options: PipeToRouterOptions,
) -> Result<PipeTransportPair, RequestError> {
let PipeToRouterOptions {
router,
keep_id: _,
listen_info,
enable_sctp,
num_sctp_streams,
enable_rtx,
enable_srtp,
} = pipe_to_router_options;
let remote_router_id = router.id();
let transport_options = PipeTransportOptions {
enable_sctp,
num_sctp_streams,
enable_rtx,
enable_srtp,
app_data: AppData::default(),
..PipeTransportOptions::new(listen_info)
};
let local_pipe_transport_fut = self.create_pipe_transport(transport_options.clone());
let remote_pipe_transport_fut = router.create_pipe_transport(transport_options);
let (local_pipe_transport, remote_pipe_transport) =
future::try_zip(local_pipe_transport_fut, remote_pipe_transport_fut).await?;
let local_connect_fut = local_pipe_transport.connect({
let tuple = remote_pipe_transport.tuple();
PipeTransportRemoteParameters {
ip: tuple.local_address().parse::<IpAddr>().unwrap(),
port: tuple.local_port(),
srtp_parameters: remote_pipe_transport.srtp_parameters(),
}
});
let remote_connect_fut = remote_pipe_transport.connect({
let tuple = local_pipe_transport.tuple();
PipeTransportRemoteParameters {
ip: tuple.local_address().parse::<IpAddr>().unwrap(),
port: tuple.local_port(),
srtp_parameters: local_pipe_transport.srtp_parameters(),
}
});
future::try_zip(local_connect_fut, remote_connect_fut).await?;
local_pipe_transport
.on_close({
let mapped_pipe_transports = Arc::clone(&self.inner.mapped_pipe_transports);
Box::new(move || {
mapped_pipe_transports.lock().remove(&remote_router_id);
})
})
.detach();
remote_pipe_transport
.on_close({
let mapped_pipe_transports = Arc::clone(&self.inner.mapped_pipe_transports);
Box::new(move || {
mapped_pipe_transports.lock().remove(&remote_router_id);
})
})
.detach();
Ok(PipeTransportPair {
local: local_pipe_transport,
remote: remote_pipe_transport,
})
}
/// Hooks a newly created transport into the router's producer bookkeeping.
///
/// Every producer / data producer announced by the transport is stored (as a weak handle) in
/// the router's map under its id and removed again when it closes. The callbacks only hold
/// weak references to the maps.
fn after_transport_creation(&self, transport: &impl TransportGeneric) {
    let producers_weak = Arc::downgrade(&self.inner.producers);
    transport
        .on_new_producer(Arc::new(move |producer| {
            let producer_id = producer.id();

            if let Some(producers) = producers_weak.upgrade() {
                producers.write().insert(producer_id, producer.downgrade());
            }

            let forget_producer = {
                let producers_weak = producers_weak.clone();
                move || {
                    if let Some(producers) = producers_weak.upgrade() {
                        producers.write().remove(&producer_id);
                    }
                }
            };
            producer.on_close(forget_producer).detach();
        }))
        .detach();

    let data_producers_weak = Arc::downgrade(&self.inner.data_producers);
    transport
        .on_new_data_producer(Arc::new(move |data_producer| {
            let data_producer_id = data_producer.id();

            if let Some(data_producers) = data_producers_weak.upgrade() {
                data_producers
                    .write()
                    .insert(data_producer_id, data_producer.downgrade());
            }

            let forget_data_producer = {
                let data_producers_weak = data_producers_weak.clone();
                move || {
                    if let Some(data_producers) = data_producers_weak.upgrade() {
                        data_producers.write().remove(&data_producer_id);
                    }
                }
            };
            data_producer.on_close(forget_data_producer).detach();
        }))
        .detach();
}
/// Whether the producer map has an entry for `producer_id` (the weak handle is not upgraded).
fn has_producer(&self, producer_id: &ProducerId) -> bool {
    let producers = self.inner.producers.read();

    producers.contains_key(producer_id)
}
fn get_producer(&self, producer_id: &ProducerId) -> Option<Producer> {
self.inner.producers.read().get(producer_id)?.upgrade()
}
/// Whether the data producer map has an entry for `data_producer_id` (the weak handle is not
/// upgraded).
fn has_data_producer(&self, data_producer_id: &DataProducerId) -> bool {
    let data_producers = self.inner.data_producers.read();

    data_producers.contains_key(data_producer_id)
}
fn get_data_producer(&self, data_producer_id: &DataProducerId) -> Option<DataProducer> {
self.inner
.data_producers
.read()
.get(data_producer_id)?
.upgrade()
}
/// Test-only helper that closes the router explicitly by delegating to `Inner::close`.
#[cfg(test)]
fn close(&self) {
self.inner.close();
}
}