#[cfg(not(doc))]
pub mod audio_level_observer;
#[cfg(not(doc))]
pub mod consumer;
#[cfg(not(doc))]
pub mod data_consumer;
#[cfg(not(doc))]
pub mod data_producer;
#[cfg(not(doc))]
pub mod direct_transport;
#[cfg(not(doc))]
pub mod pipe_transport;
#[cfg(not(doc))]
pub mod plain_transport;
#[cfg(not(doc))]
pub mod producer;
#[cfg(not(doc))]
pub mod rtp_observer;
#[cfg(test)]
mod tests;
#[cfg(not(doc))]
pub mod transport;
#[cfg(not(doc))]
pub mod webrtc_transport;
use crate::audio_level_observer::{AudioLevelObserver, AudioLevelObserverOptions};
use crate::consumer::{Consumer, ConsumerId, ConsumerOptions};
use crate::data_consumer::{DataConsumer, DataConsumerId, DataConsumerOptions};
use crate::data_producer::{
DataProducer, DataProducerId, DataProducerOptions, NonClosingDataProducer, WeakDataProducer,
};
use crate::data_structures::{AppData, TransportListenIp, TransportTuple};
use crate::direct_transport::{DirectTransport, DirectTransportOptions};
use crate::messages::{
RouterCloseRequest, RouterCreateAudioLevelObserverData, RouterCreateAudioLevelObserverRequest,
RouterCreateDirectTransportData, RouterCreateDirectTransportRequest,
RouterCreatePipeTransportData, RouterCreatePipeTransportRequest,
RouterCreatePlainTransportData, RouterCreatePlainTransportRequest,
RouterCreateWebrtcTransportData, RouterCreateWebrtcTransportRequest, RouterDumpRequest,
RouterInternal, RtpObserverInternal, TransportInternal,
};
use crate::pipe_transport::{
PipeTransport, PipeTransportOptions, PipeTransportRemoteParameters, WeakPipeTransport,
};
use crate::plain_transport::{PlainTransport, PlainTransportOptions};
use crate::producer::{NonClosingProducer, Producer, ProducerId, ProducerOptions, WeakProducer};
use crate::rtp_observer::RtpObserverId;
use crate::rtp_parameters::{RtpCapabilities, RtpCapabilitiesFinalized, RtpCodecCapability};
use crate::sctp_parameters::NumSctpStreams;
use crate::transport::{
ConsumeDataError, ConsumeError, ProduceDataError, ProduceError, Transport, TransportGeneric,
TransportId,
};
use crate::webrtc_transport::{WebRtcTransport, WebRtcTransportOptions};
use crate::worker::{Channel, PayloadChannel, RequestError, Worker};
use crate::{ortc, uuid_based_wrapper_type};
use async_executor::Executor;
use async_lock::Mutex as AsyncMutex;
use event_listener_primitives::{Bag, BagOnce, HandlerId};
use futures_lite::future;
use log::*;
use parking_lot::{Mutex, RwLock};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::fmt;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Weak};
use thiserror::Error;
// Declares the `RouterId` identifier type through the crate-level
// `uuid_based_wrapper_type!` macro. Judging by the macro name it wraps a UUID; the macro
// definition lives outside this file, so confirm there which traits it derives.
uuid_based_wrapper_type!(
RouterId
);
/// Options describing a router: its media codecs and custom application data.
///
/// The struct is `#[non_exhaustive]`, so code outside this crate cannot build it with a
/// struct literal; use [`RouterOptions::new`] or [`RouterOptions::default`] and then adjust
/// the public fields.
#[derive(Debug)]
#[non_exhaustive]
pub struct RouterOptions {
/// Media codec capabilities for the router. Empty when built through `Default`.
pub media_codecs: Vec<RtpCodecCapability>,
/// Custom application data. [`RouterOptions::new`] sets it to `AppData::default()`.
pub app_data: AppData,
}
impl RouterOptions {
    /// Creates router options for the given media codecs.
    ///
    /// `app_data` is initialised with `AppData::default()`; overwrite the public field
    /// afterwards if custom data is needed.
    pub fn new(media_codecs: Vec<RtpCodecCapability>) -> Self {
        let app_data = AppData::default();

        Self {
            media_codecs,
            app_data,
        }
    }
}
impl Default for RouterOptions {
    /// Equivalent to [`RouterOptions::new`] with an empty list of media codecs.
    fn default() -> Self {
        let no_codecs = Vec::new();
        Self::new(no_codecs)
    }
}
/// Options for piping a producer or data producer into another router, see
/// [`Router::pipe_producer_to_router`] and [`Router::pipe_data_producer_to_router`].
///
/// Apart from `router`, the values are copied into the `PipeTransportOptions` used for the
/// pipe transport created on each of the two routers.
#[derive(Debug)]
pub struct PipeToRouterOptions {
/// Destination router. Piping fails with a `SameRouter` error when this is the router the
/// piping method is called on.
pub router: Router,
/// Listen IP used for both pipe transports. The field is private, so it always holds the
/// value chosen by [`PipeToRouterOptions::new`] (`127.0.0.1`, no announced IP).
listen_ip: TransportListenIp,
/// Forwarded as `enable_sctp` to the pipe transports. `new` sets it to `true`.
pub enable_sctp: bool,
/// Forwarded as `num_sctp_streams` to the pipe transports. `new` uses the type's default.
pub num_sctp_streams: NumSctpStreams,
/// Forwarded as `enable_rtx` to the pipe transports. `new` sets it to `false`.
pub enable_rtx: bool,
/// Forwarded as `enable_srtp` to the pipe transports. `new` sets it to `false`.
pub enable_srtp: bool,
}
impl PipeToRouterOptions {
pub fn new(router: Router) -> Self {
Self {
router,
listen_ip: TransportListenIp {
ip: "127.0.0.1".parse().unwrap(),
announced_ip: None,
},
enable_sctp: true,
num_sctp_streams: Default::default(),
enable_rtx: false,
enable_srtp: false,
}
}
}
/// Successful result of [`Router::pipe_producer_to_router`].
#[derive(Debug)]
pub struct PipeProducerToRouterPair {
/// Consumer created on the local pipe transport (the one belonging to the router the
/// method was called on); it consumes the original producer.
pub pipe_consumer: Consumer,
/// Producer created on the remote pipe transport (the one belonging to the destination
/// router), wrapped in `NonClosingProducer`.
pub pipe_producer: NonClosingProducer,
}
/// Error returned by [`Router::pipe_producer_to_router`].
#[derive(Debug, Error, Eq, PartialEq)]
pub enum PipeProducerToRouterError {
/// The destination router has the same ID as the source router.
#[error("Destination router must be different")]
SameRouter,
/// The source router has no live producer with this ID.
#[error("Producer with id \"{0}\" not found")]
ProducerNotFound(ProducerId),
/// A worker request issued while creating or connecting the pipe transport pair failed.
#[error("Failed to create or connect Pipe transport: \"{0}\"")]
TransportFailed(RequestError),
/// Consuming the producer on the local pipe transport failed.
#[error("Failed to consume: \"{0}\"")]
ConsumeFailed(ConsumeError),
/// Producing on the remote pipe transport failed.
#[error("Failed to produce: \"{0}\"")]
ProduceFailed(ProduceError),
}
impl From<RequestError> for PipeProducerToRouterError {
fn from(error: RequestError) -> Self {
PipeProducerToRouterError::TransportFailed(error)
}
}
impl From<ConsumeError> for PipeProducerToRouterError {
fn from(error: ConsumeError) -> Self {
PipeProducerToRouterError::ConsumeFailed(error)
}
}
impl From<ProduceError> for PipeProducerToRouterError {
fn from(error: ProduceError) -> Self {
PipeProducerToRouterError::ProduceFailed(error)
}
}
/// Successful result of [`Router::pipe_data_producer_to_router`].
#[derive(Debug)]
pub struct PipeDataProducerToRouterPair {
/// Data consumer created on the local pipe transport (the one belonging to the router the
/// method was called on); it consumes the original data producer.
pub pipe_data_consumer: DataConsumer,
/// Data producer created on the remote pipe transport (the one belonging to the
/// destination router), wrapped in `NonClosingDataProducer`.
pub pipe_data_producer: NonClosingDataProducer,
}
/// Error returned by [`Router::pipe_data_producer_to_router`].
#[derive(Debug, Error)]
pub enum PipeDataProducerToRouterError {
/// The destination router has the same ID as the source router.
#[error("Destination router must be different")]
SameRouter,
/// The source router has no live data producer with this ID.
#[error("Data producer with id \"{0}\" not found")]
DataProducerNotFound(DataProducerId),
/// A worker request issued while creating or connecting the pipe transport pair failed.
#[error("Failed to create or connect Pipe transport: \"{0}\"")]
TransportFailed(RequestError),
/// Consuming the data producer on the local pipe transport failed.
#[error("Failed to consume: \"{0}\"")]
ConsumeFailed(ConsumeDataError),
/// Producing data on the remote pipe transport failed.
#[error("Failed to produce: \"{0}\"")]
ProduceFailed(ProduceDataError),
}
impl From<RequestError> for PipeDataProducerToRouterError {
fn from(error: RequestError) -> Self {
PipeDataProducerToRouterError::TransportFailed(error)
}
}
impl From<ConsumeDataError> for PipeDataProducerToRouterError {
fn from(error: ConsumeDataError) -> Self {
PipeDataProducerToRouterError::ConsumeFailed(error)
}
}
impl From<ProduceDataError> for PipeDataProducerToRouterError {
fn from(error: ProduceDataError) -> Self {
PipeDataProducerToRouterError::ProduceFailed(error)
}
}
/// Response type of [`Router::dump`].
///
/// Field names are (de)serialized in camelCase. The struct is hidden from the generated
/// documentation and `#[non_exhaustive]`.
// NOTE(review): the per-field descriptions below are derived from field names and types
// only; the exact semantics are defined by the worker that produces the dump.
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
#[doc(hidden)]
#[non_exhaustive]
pub struct RouterDump {
/// ID of the dumped router.
pub id: RouterId,
/// Consumer ID -> producer ID.
pub map_consumer_id_producer_id: HashMap<ConsumerId, ProducerId>,
/// Data consumer ID -> data producer ID.
pub map_data_consumer_id_data_producer_id: HashMap<DataConsumerId, DataProducerId>,
/// Data producer ID -> set of data consumer IDs.
pub map_data_producer_id_data_consumer_ids: HashMap<DataProducerId, HashSet<DataConsumerId>>,
/// Producer ID -> set of consumer IDs.
pub map_producer_id_consumer_ids: HashMap<ProducerId, HashSet<ConsumerId>>,
/// Producer ID -> set of RTP observer IDs.
pub map_producer_id_observer_ids: HashMap<ProducerId, HashSet<RtpObserverId>>,
/// IDs of RTP observers.
pub rtp_observer_ids: HashSet<RtpObserverId>,
/// IDs of transports.
pub transport_ids: HashSet<TransportId>,
}
/// Argument passed to callbacks registered with [`Router::on_new_transport`]: a borrowed
/// reference to the transport that was just created, tagged with its concrete kind.
#[derive(Debug)]
pub enum NewTransport<'a> {
/// Emitted by [`Router::create_direct_transport`].
Direct(&'a DirectTransport),
/// Emitted by [`Router::create_pipe_transport`].
Pipe(&'a PipeTransport),
/// Emitted by [`Router::create_plain_transport`].
Plain(&'a PlainTransport),
/// Emitted by [`Router::create_webrtc_transport`].
WebRtc(&'a WebRtcTransport),
}
/// Argument passed to callbacks registered with [`Router::on_new_rtp_observer`]: a borrowed
/// reference to the RTP observer that was just created, tagged with its concrete kind.
#[derive(Debug)]
pub enum NewRtpObserver<'a> {
/// Emitted by [`Router::create_audio_level_observer`].
AudioLevel(&'a AudioLevelObserver),
}
// Pair of connected pipe transports used to pipe producers between two routers.
struct PipeTransportPair {
// Pipe transport created on the router that initiated the piping.
local: PipeTransport,
// Pipe transport created on the destination router.
remote: PipeTransport,
}
impl PipeTransportPair {
    /// Produces the weak counterpart of this pair by downgrading both transports.
    fn downgrade(&self) -> WeakPipeTransportPair {
        let local = self.local.downgrade();
        let remote = self.remote.downgrade();

        WeakPipeTransportPair { local, remote }
    }
}
// Weak counterpart of `PipeTransportPair`; this is what the router caches per destination
// router in `Inner::mapped_pipe_transports`.
#[derive(Debug)]
struct WeakPipeTransportPair {
// Weak handle to the local pipe transport.
local: WeakPipeTransport,
// Weak handle to the remote pipe transport.
remote: WeakPipeTransport,
}
impl WeakPipeTransportPair {
    /// Attempts to upgrade both weak handles.
    ///
    /// Returns `None` as soon as one of them can no longer be upgraded; the remote handle is
    /// only tried once the local one succeeded.
    fn upgrade(&self) -> Option<PipeTransportPair> {
        self.local.upgrade().and_then(|local| {
            self.remote
                .upgrade()
                .map(|remote| PipeTransportPair { local, remote })
        })
    }
}
// Event handler collections of a router.
#[derive(Default)]
struct Handlers {
// Called for every transport created through one of the `Router::create_*_transport`
// methods; may fire many times.
new_transport: Bag<Box<dyn Fn(NewTransport<'_>) + Send + Sync>>,
// Called for every RTP observer created through the router; may fire many times.
new_rtp_observer: Bag<Box<dyn Fn(NewRtpObserver<'_>) + Send + Sync>>,
// One-shot handlers invoked from the worker's `on_close` callback.
worker_close: BagOnce<Box<dyn FnOnce() + Send>>,
// One-shot handlers invoked when the router is closed, either explicitly/through drop
// (`Inner::close`) or because the worker closed.
close: BagOnce<Box<dyn FnOnce() + Send>>,
}
// Shared state behind a `Router` handle; all clones of a `Router` point at one `Inner`.
struct Inner {
// Router ID, also sent as `router_id` in worker requests.
id: RouterId,
// Executor used to spawn background tasks (e.g. the close request issued on drop).
executor: Arc<Executor<'static>>,
// Finalized RTP capabilities, exposed through `Router::rtp_capabilities()`.
rtp_capabilities: RtpCapabilitiesFinalized,
// Channel used for requests to the worker.
channel: Channel,
// Payload channel handed over to transports created by this router.
payload_channel: PayloadChannel,
// Event handlers, see `Handlers`.
handlers: Arc<Handlers>,
// Custom application data, exposed through `Router::app_data()`.
app_data: AppData,
// Live producers of this router's transports, keyed by ID; entries are inserted on
// `on_new_producer` and removed when the producer closes (see `after_transport_creation`).
producers: Arc<RwLock<HashMap<ProducerId, WeakProducer>>>,
// Same as `producers`, for data producers.
data_producers: Arc<RwLock<HashMap<DataProducerId, WeakDataProducer>>>,
// Cached pipe transport pair per destination router. The outer synchronous mutex protects
// the map; the per-entry async mutex serializes creation of the pair for one destination.
#[allow(clippy::type_complexity)]
mapped_pipe_transports:
Arc<Mutex<HashMap<RouterId, Arc<AsyncMutex<Option<WeakPipeTransportPair>>>>>>,
// Worker handle; a clone is moved into the close task and dropped only after the close
// request finished.
worker: Option<Worker>,
// Set once the router is closed (by `close()` or by the worker-close handler).
closed: AtomicBool,
// Handler ID returned by `worker.on_close(..)`; stored here for as long as `Inner` lives.
// NOTE(review): presumably dropping the `HandlerId` unregisters the callback - confirm in
// `event_listener_primitives`.
_on_worker_close_handler: Mutex<HandlerId>,
}
// Dropping the last `Router` handle drops `Inner`, which closes the router.
impl Drop for Inner {
fn drop(&mut self) {
debug!("drop()");
// `close()` checks the `closed` flag itself, so an already closed router is left alone.
self.close();
}
}
impl Inner {
    /// Closes the router once: invokes the `close` handlers and spawns a background task
    /// that sends `RouterCloseRequest` to the worker.
    ///
    /// Subsequent calls (or a call after the worker-close handler already marked the router
    /// as closed) do nothing.
    fn close(&self) {
        // `swap` returns the previous value, so only the first caller gets past this guard.
        let was_closed = self.closed.swap(true, Ordering::SeqCst);
        if was_closed {
            return;
        }

        self.handlers.close.call_simple();

        let close_request = RouterCloseRequest {
            internal: RouterInternal { router_id: self.id },
        };
        let channel = self.channel.clone();
        // The worker handle travels with the task and is released only after the request
        // completed (presumably to keep the worker alive until then).
        let worker = self.worker.clone();

        let close_task = async move {
            if let Err(error) = channel.request(close_request).await {
                error!("router closing failed on drop: {}", error);
            }

            drop(worker);
        };

        self.executor.spawn(close_task).detach();
    }
}
/// Handle to a router.
///
/// Cloning is cheap: all clones share the same reference-counted inner state. When the last
/// handle is dropped the inner state is dropped too, which closes the router.
#[derive(Clone)]
pub struct Router {
// Shared router state.
inner: Arc<Inner>,
}
impl fmt::Debug for Router {
    /// Formats a selection of the inner state: ID, RTP capabilities, producer and data
    /// producer maps, cached pipe transports, worker and the closed flag.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let inner = &*self.inner;

        let mut debug_struct = f.debug_struct("Router");
        debug_struct.field("id", &inner.id);
        debug_struct.field("rtp_capabilities", &inner.rtp_capabilities);
        debug_struct.field("producers", &inner.producers);
        debug_struct.field("data_producers", &inner.data_producers);
        debug_struct.field("mapped_pipe_transports", &inner.mapped_pipe_transports);
        debug_struct.field("worker", &inner.worker);
        debug_struct.field("closed", &inner.closed);
        debug_struct.finish()
    }
}
impl Router {
pub(super) fn new(
id: RouterId,
executor: Arc<Executor<'static>>,
channel: Channel,
payload_channel: PayloadChannel,
rtp_capabilities: RtpCapabilitiesFinalized,
app_data: AppData,
worker: Worker,
) -> Self {
debug!("new()");
let producers = Arc::<RwLock<HashMap<ProducerId, WeakProducer>>>::default();
let data_producers = Arc::<RwLock<HashMap<DataProducerId, WeakDataProducer>>>::default();
let mapped_pipe_transports = Arc::<
Mutex<HashMap<RouterId, Arc<AsyncMutex<Option<WeakPipeTransportPair>>>>>,
>::default();
let handlers = Arc::<Handlers>::default();
let inner_weak = Arc::<Mutex<Option<Weak<Inner>>>>::default();
let on_worker_close_handler = worker.on_close({
let inner_weak = Arc::clone(&inner_weak);
move || {
if let Some(inner) = inner_weak
.lock()
.as_ref()
.and_then(|weak_inner| weak_inner.upgrade())
{
inner.handlers.worker_close.call_simple();
if !inner.closed.swap(true, Ordering::SeqCst) {
inner.handlers.close.call_simple();
}
}
}
});
let inner = Arc::new(Inner {
id,
executor,
rtp_capabilities,
channel,
payload_channel,
handlers,
producers,
data_producers,
mapped_pipe_transports,
app_data,
worker: Some(worker),
closed: AtomicBool::new(false),
_on_worker_close_handler: Mutex::new(on_worker_close_handler),
});
inner_weak.lock().replace(Arc::downgrade(&inner));
Self { inner }
}
/// Returns the router ID.
pub fn id(&self) -> RouterId {
self.inner.id
}
/// Returns the custom application data the router was created with.
pub fn app_data(&self) -> &AppData {
&self.inner.app_data
}
/// Returns `true` once the router has been closed, either through its own close path or
/// because the worker it belongs to was closed.
pub fn closed(&self) -> bool {
self.inner.closed.load(Ordering::SeqCst)
}
/// Returns the finalized RTP capabilities the router was created with.
pub fn rtp_capabilities(&self) -> &RtpCapabilitiesFinalized {
&self.inner.rtp_capabilities
}
/// Requests a dump of the router's internal state from the worker.
///
/// # Errors
///
/// Returns the `RequestError` of the underlying channel request.
#[doc(hidden)]
pub async fn dump(&self) -> Result<RouterDump, RequestError> {
    debug!("dump()");

    let request = RouterDumpRequest {
        internal: RouterInternal {
            router_id: self.inner.id,
        },
    };

    self.inner.channel.request(request).await
}
/// Creates a new [`DirectTransport`] on this router.
///
/// A fresh transport ID is generated, the creation request is sent to the worker and, on
/// success, the transport handle is built, `on_new_transport` callbacks are invoked with
/// [`NewTransport::Direct`] and the router starts tracking producers and data producers
/// created on the transport.
///
/// # Errors
///
/// Returns the `RequestError` of the creation request; no callbacks run in that case.
pub async fn create_direct_transport(
    &self,
    direct_transport_options: DirectTransportOptions,
) -> Result<DirectTransport, RequestError> {
    debug!("create_direct_transport()");

    let inner = &self.inner;
    let transport_id = TransportId::new();
    // Guard is held until this function returns.
    // NOTE(review): presumably it buffers worker messages addressed to the new transport
    // until its handlers are in place - confirm in `Channel::buffer_messages_for`.
    let _buffer_guard = inner.channel.buffer_messages_for(transport_id.into());

    let request = RouterCreateDirectTransportRequest {
        internal: TransportInternal {
            router_id: inner.id,
            transport_id,
        },
        data: RouterCreateDirectTransportData::from_options(&direct_transport_options),
    };
    inner.channel.request(request).await?;

    let transport = DirectTransport::new(
        transport_id,
        Arc::clone(&inner.executor),
        inner.channel.clone(),
        inner.payload_channel.clone(),
        direct_transport_options.app_data,
        self.clone(),
    );

    inner.handlers.new_transport.call(|callback| {
        callback(NewTransport::Direct(&transport));
    });
    self.after_transport_creation(&transport);

    Ok(transport)
}
/// Creates a new [`WebRtcTransport`] on this router.
///
/// A fresh transport ID is generated, the creation request is sent to the worker and, on
/// success, the transport handle is built from the worker's response,
/// `on_new_transport` callbacks are invoked with [`NewTransport::WebRtc`] and the router
/// starts tracking producers and data producers created on the transport.
///
/// # Errors
///
/// Returns the `RequestError` of the creation request; no callbacks run in that case.
pub async fn create_webrtc_transport(
    &self,
    webrtc_transport_options: WebRtcTransportOptions,
) -> Result<WebRtcTransport, RequestError> {
    debug!("create_webrtc_transport()");

    let inner = &self.inner;
    let transport_id = TransportId::new();
    // Guard is held until this function returns.
    // NOTE(review): presumably it buffers worker messages addressed to the new transport
    // until its handlers are in place - confirm in `Channel::buffer_messages_for`.
    let _buffer_guard = inner.channel.buffer_messages_for(transport_id.into());

    let request = RouterCreateWebrtcTransportRequest {
        internal: TransportInternal {
            router_id: inner.id,
            transport_id,
        },
        data: RouterCreateWebrtcTransportData::from_options(&webrtc_transport_options),
    };
    let data = inner.channel.request(request).await?;

    let transport = WebRtcTransport::new(
        transport_id,
        Arc::clone(&inner.executor),
        inner.channel.clone(),
        inner.payload_channel.clone(),
        data,
        webrtc_transport_options.app_data,
        self.clone(),
    );

    inner.handlers.new_transport.call(|callback| {
        callback(NewTransport::WebRtc(&transport));
    });
    self.after_transport_creation(&transport);

    Ok(transport)
}
/// Creates a new [`PipeTransport`] on this router.
///
/// A fresh transport ID is generated, the creation request is sent to the worker and, on
/// success, the transport handle is built from the worker's response,
/// `on_new_transport` callbacks are invoked with [`NewTransport::Pipe`] and the router
/// starts tracking producers and data producers created on the transport.
///
/// # Errors
///
/// Returns the `RequestError` of the creation request; no callbacks run in that case.
pub async fn create_pipe_transport(
    &self,
    pipe_transport_options: PipeTransportOptions,
) -> Result<PipeTransport, RequestError> {
    debug!("create_pipe_transport()");

    let inner = &self.inner;
    let transport_id = TransportId::new();
    // Guard is held until this function returns.
    // NOTE(review): presumably it buffers worker messages addressed to the new transport
    // until its handlers are in place - confirm in `Channel::buffer_messages_for`.
    let _buffer_guard = inner.channel.buffer_messages_for(transport_id.into());

    let request = RouterCreatePipeTransportRequest {
        internal: TransportInternal {
            router_id: inner.id,
            transport_id,
        },
        data: RouterCreatePipeTransportData::from_options(&pipe_transport_options),
    };
    let data = inner.channel.request(request).await?;

    let transport = PipeTransport::new(
        transport_id,
        Arc::clone(&inner.executor),
        inner.channel.clone(),
        inner.payload_channel.clone(),
        data,
        pipe_transport_options.app_data,
        self.clone(),
    );

    inner.handlers.new_transport.call(|callback| {
        callback(NewTransport::Pipe(&transport));
    });
    self.after_transport_creation(&transport);

    Ok(transport)
}
/// Creates a new [`PlainTransport`] on this router.
///
/// A fresh transport ID is generated, the creation request is sent to the worker and, on
/// success, the transport handle is built from the worker's response,
/// `on_new_transport` callbacks are invoked with [`NewTransport::Plain`] and the router
/// starts tracking producers and data producers created on the transport.
///
/// # Errors
///
/// Returns the `RequestError` of the creation request; no callbacks run in that case.
pub async fn create_plain_transport(
    &self,
    plain_transport_options: PlainTransportOptions,
) -> Result<PlainTransport, RequestError> {
    debug!("create_plain_transport()");

    let inner = &self.inner;
    let transport_id = TransportId::new();
    // Guard is held until this function returns.
    // NOTE(review): presumably it buffers worker messages addressed to the new transport
    // until its handlers are in place - confirm in `Channel::buffer_messages_for`.
    let _buffer_guard = inner.channel.buffer_messages_for(transport_id.into());

    let request = RouterCreatePlainTransportRequest {
        internal: TransportInternal {
            router_id: inner.id,
            transport_id,
        },
        data: RouterCreatePlainTransportData::from_options(&plain_transport_options),
    };
    let data = inner.channel.request(request).await?;

    let transport = PlainTransport::new(
        transport_id,
        Arc::clone(&inner.executor),
        inner.channel.clone(),
        inner.payload_channel.clone(),
        data,
        plain_transport_options.app_data,
        self.clone(),
    );

    inner.handlers.new_transport.call(|callback| {
        callback(NewTransport::Plain(&transport));
    });
    self.after_transport_creation(&transport);

    Ok(transport)
}
/// Creates a new [`AudioLevelObserver`] on this router.
///
/// A fresh RTP observer ID is generated, the creation request is sent to the worker and,
/// on success, the observer handle is built and `on_new_rtp_observer` callbacks are
/// invoked with [`NewRtpObserver::AudioLevel`].
///
/// # Errors
///
/// Returns the `RequestError` of the creation request; no callbacks run in that case.
pub async fn create_audio_level_observer(
    &self,
    audio_level_observer_options: AudioLevelObserverOptions,
) -> Result<AudioLevelObserver, RequestError> {
    debug!("create_audio_level_observer()");

    let inner = &self.inner;
    let rtp_observer_id = RtpObserverId::new();
    // Guard is held until this function returns.
    // NOTE(review): presumably it buffers worker messages addressed to the new observer
    // until its handlers are in place - confirm in `Channel::buffer_messages_for`.
    let _buffer_guard = inner.channel.buffer_messages_for(rtp_observer_id.into());

    let request = RouterCreateAudioLevelObserverRequest {
        internal: RtpObserverInternal {
            router_id: inner.id,
            rtp_observer_id,
        },
        data: RouterCreateAudioLevelObserverData::from_options(&audio_level_observer_options),
    };
    inner.channel.request(request).await?;

    let audio_level_observer = AudioLevelObserver::new(
        rtp_observer_id,
        Arc::clone(&inner.executor),
        inner.channel.clone(),
        audio_level_observer_options.app_data,
        self.clone(),
    );

    inner.handlers.new_rtp_observer.call(|callback| {
        callback(NewRtpObserver::AudioLevel(&audio_level_observer));
    });

    Ok(audio_level_observer)
}
/// Pipes the producer with the given ID into another router.
///
/// A pipe transport pair between the two routers is obtained (created on first use, see
/// `get_pipe_transport_pair`). The producer is consumed on the local pipe transport and
/// the resulting consumer's kind, RTP parameters and paused state, together with the
/// original producer's app data, are used to produce on the remote pipe transport.
///
/// Afterwards the two pipe ends are linked:
/// * closing, pausing or resuming the pipe consumer closes, pauses or resumes the pipe
///   producer (pause/resume run as detached tasks on the executor);
/// * the pipe producer's close handler owns a clone of the pipe consumer and drops it when
///   the pipe producer closes.
///
/// # Errors
///
/// * `SameRouter` if the destination router is this router;
/// * `ProducerNotFound` if this router has no live producer with `producer_id`;
/// * `TransportFailed`, `ConsumeFailed` or `ProduceFailed` if the respective step fails.
pub async fn pipe_producer_to_router(
    &self,
    producer_id: ProducerId,
    pipe_to_router_options: PipeToRouterOptions,
) -> Result<PipeProducerToRouterPair, PipeProducerToRouterError> {
    debug!("pipe_producer_to_router()");

    if pipe_to_router_options.router.id() == self.id() {
        return Err(PipeProducerToRouterError::SameRouter);
    }

    let producer = self
        .get_producer(&producer_id)
        .ok_or(PipeProducerToRouterError::ProducerNotFound(producer_id))?;

    let transports = self.get_pipe_transport_pair(pipe_to_router_options).await?;

    // Local side: consume the original producer over the pipe transport.
    let pipe_consumer = transports
        .local
        .consume(ConsumerOptions::new(
            producer_id,
            RtpCapabilities::default(),
        ))
        .await?;

    // Remote side: produce what the pipe consumer delivers, mirroring its state.
    let mut producer_options = ProducerOptions::new_pipe_transport(
        producer_id,
        pipe_consumer.kind(),
        pipe_consumer.rtp_parameters().clone(),
    );
    producer_options.paused = pipe_consumer.producer_paused();
    producer_options.app_data = producer.app_data().clone();
    let pipe_producer = transports.remote.produce(producer_options).await?;

    // Pipe consumer -> pipe producer: close.
    {
        let pipe_producer_weak = pipe_producer.downgrade();
        pipe_consumer
            .on_close(move || {
                if let Some(pipe_producer) = pipe_producer_weak.upgrade() {
                    let _ = pipe_producer.close();
                }
            })
            .detach();
    }

    // Pipe consumer -> pipe producer: pause.
    {
        let executor = Arc::clone(&self.inner.executor);
        let pipe_producer_weak = pipe_producer.downgrade();
        pipe_consumer
            .on_pause(move || {
                if let Some(pipe_producer) = pipe_producer_weak.upgrade() {
                    executor
                        .spawn(async move {
                            let _ = pipe_producer.pause().await;
                        })
                        .detach();
                }
            })
            .detach();
    }

    // Pipe consumer -> pipe producer: resume.
    {
        let executor = Arc::clone(&self.inner.executor);
        let pipe_producer_weak = pipe_producer.downgrade();
        pipe_consumer
            .on_resume(move || {
                if let Some(pipe_producer) = pipe_producer_weak.upgrade() {
                    executor
                        .spawn(async move {
                            let _ = pipe_producer.resume().await;
                        })
                        .detach();
                }
            })
            .detach();
    }

    // Pipe producer -> pipe consumer: the handler owns a consumer clone until the pipe
    // producer closes.
    {
        let pipe_consumer = pipe_consumer.clone();
        pipe_producer
            .on_close(move || {
                drop(pipe_consumer);
            })
            .detach();
    }

    // The closure given to `NonClosingProducer` receives the pipe producer and, if the
    // original producer is still alive at that point, arranges for the pipe producer to be
    // closed together with the original producer.
    // NOTE(review): when exactly the closure is invoked is defined by `NonClosingProducer`.
    let pipe_producer = NonClosingProducer::new(pipe_producer, {
        let weak_producer = producer.downgrade();
        move |pipe_producer| {
            if let Some(producer) = weak_producer.upgrade() {
                producer
                    .on_close(move || {
                        pipe_producer.close();
                    })
                    .detach();
            }
        }
    });

    Ok(PipeProducerToRouterPair {
        pipe_consumer,
        pipe_producer,
    })
}
/// Pipes the data producer with the given ID into another router.
///
/// A pipe transport pair between the two routers is obtained (created on first use, see
/// `get_pipe_transport_pair`). The data producer is consumed on the local pipe transport
/// and the resulting data consumer's SCTP stream parameters, label and protocol, together
/// with the original data producer's app data, are used to produce data on the remote
/// pipe transport.
///
/// Afterwards the two pipe ends are linked: closing the pipe data consumer closes the pipe
/// data producer, and the pipe data producer's close handler owns a clone of the pipe data
/// consumer which it drops when the pipe data producer closes.
///
/// # Errors
///
/// * `SameRouter` if the destination router is this router;
/// * `DataProducerNotFound` if this router has no live data producer with
///   `data_producer_id`;
/// * `TransportFailed`, `ConsumeFailed` or `ProduceFailed` if the respective step fails.
///
/// # Panics
///
/// Panics if the pipe data consumer reports no SCTP stream parameters.
pub async fn pipe_data_producer_to_router(
    &self,
    data_producer_id: DataProducerId,
    pipe_to_router_options: PipeToRouterOptions,
) -> Result<PipeDataProducerToRouterPair, PipeDataProducerToRouterError> {
    debug!("pipe_data_producer_to_router()");

    if pipe_to_router_options.router.id() == self.id() {
        return Err(PipeDataProducerToRouterError::SameRouter);
    }

    let data_producer = self.get_data_producer(&data_producer_id).ok_or(
        PipeDataProducerToRouterError::DataProducerNotFound(data_producer_id),
    )?;

    let transports = self.get_pipe_transport_pair(pipe_to_router_options).await?;

    // Local side: consume the original data producer over the pipe transport.
    let pipe_data_consumer = transports
        .local
        .consume_data(DataConsumerOptions::new_sctp(data_producer_id))
        .await?;

    // Remote side: produce what the pipe data consumer delivers, mirroring its settings.
    let mut producer_options = DataProducerOptions::new_pipe_transport(
        data_producer_id,
        pipe_data_consumer.sctp_stream_parameters().unwrap(),
    );
    producer_options.label = pipe_data_consumer.label().clone();
    producer_options.protocol = pipe_data_consumer.protocol().clone();
    producer_options.app_data = data_producer.app_data().clone();
    let pipe_data_producer = transports.remote.produce_data(producer_options).await?;

    // Pipe data consumer -> pipe data producer: close.
    {
        let pipe_data_producer_weak = pipe_data_producer.downgrade();
        pipe_data_consumer
            .on_close(move || {
                if let Some(pipe_data_producer) = pipe_data_producer_weak.upgrade() {
                    let _ = pipe_data_producer.close();
                }
            })
            .detach();
    }

    // Pipe data producer -> pipe data consumer: the handler owns a consumer clone until
    // the pipe data producer closes.
    {
        let pipe_data_consumer = pipe_data_consumer.clone();
        pipe_data_producer
            .on_close(move || {
                drop(pipe_data_consumer);
            })
            .detach();
    }

    // The closure given to `NonClosingDataProducer` receives the pipe data producer and,
    // if the original data producer is still alive at that point, arranges for the pipe
    // data producer to be closed together with the original one.
    // NOTE(review): when exactly the closure is invoked is defined by
    // `NonClosingDataProducer`.
    let pipe_data_producer = NonClosingDataProducer::new(pipe_data_producer, {
        let weak_data_producer = data_producer.downgrade();
        move |pipe_data_producer| {
            if let Some(data_producer) = weak_data_producer.upgrade() {
                data_producer
                    .on_close(move || {
                        pipe_data_producer.close();
                    })
                    .detach();
            }
        }
    });

    Ok(PipeDataProducerToRouterPair {
        pipe_data_consumer,
        pipe_data_producer,
    })
}
/// Checks whether the producer with the given ID can be consumed with the given RTP
/// capabilities.
///
/// Returns `false` (and logs an error) when the producer is not known to this router or
/// when the capability check itself fails; otherwise returns the result of
/// `ortc::can_consume` for the producer's consumable RTP parameters.
pub fn can_consume(
    &self,
    producer_id: &ProducerId,
    rtp_capabilities: &RtpCapabilities,
) -> bool {
    let producer = match self.get_producer(producer_id) {
        Some(producer) => producer,
        None => {
            error!(
                "can_consume() | Producer with id \"{}\" not found",
                producer_id
            );
            return false;
        }
    };

    ortc::can_consume(producer.consumable_rtp_parameters(), rtp_capabilities).unwrap_or_else(
        |error| {
            error!("can_consume() | unexpected error: {}", error);
            false
        },
    )
}
/// Registers a callback that is invoked with every transport created through this router.
///
/// Returns the `HandlerId` of the registration.
pub fn on_new_transport<F: Fn(NewTransport<'_>) + Send + Sync + 'static>(
    &self,
    callback: F,
) -> HandlerId {
    let handler: Box<dyn Fn(NewTransport<'_>) + Send + Sync> = Box::new(callback);
    self.inner.handlers.new_transport.add(handler)
}
/// Registers a callback that is invoked with every RTP observer created through this
/// router.
///
/// Returns the `HandlerId` of the registration.
pub fn on_new_rtp_observer<F: Fn(NewRtpObserver<'_>) + Send + Sync + 'static>(
    &self,
    callback: F,
) -> HandlerId {
    let handler: Box<dyn Fn(NewRtpObserver<'_>) + Send + Sync> = Box::new(callback);
    self.inner.handlers.new_rtp_observer.add(handler)
}
/// Registers a one-shot callback that is invoked when the worker this router belongs to
/// closes.
///
/// Returns the `HandlerId` of the registration.
pub fn on_worker_close<F: FnOnce() + Send + 'static>(&self, callback: F) -> HandlerId {
    let handler: Box<dyn FnOnce() + Send> = Box::new(callback);
    self.inner.handlers.worker_close.add(handler)
}
/// Registers a one-shot callback that is invoked when the router closes.
///
/// If the router is already closed at registration time, the pending `close` handlers
/// (including the one just added) are invoked immediately.
///
/// Returns the `HandlerId` of the registration.
pub fn on_close<F: FnOnce() + Send + 'static>(&self, callback: F) -> HandlerId {
    let inner = &self.inner;

    // Register first, check afterwards, so the callback is picked up either by the close
    // path or by the check below.
    let handler_id = inner.handlers.close.add(Box::new(callback));
    let already_closed = inner.closed.load(Ordering::Relaxed);
    if already_closed {
        inner.handlers.close.call_simple();
    }

    handler_id
}
/// Returns the pipe transport pair connecting this router with the destination router,
/// creating and caching it (as weak references) if there is none yet or if the cached one
/// is no longer alive.
///
/// Only one pair should exist per destination router. Because creation is asynchronous,
/// two concurrent calls for the same destination could otherwise both create a pair; to
/// avoid that, every destination has its own async mutex (stored in
/// `mapped_pipe_transports`) that is held while the pair is looked up or created.
///
/// # Errors
///
/// Returns the `RequestError` of the failed worker request when the pair has to be
/// created and creating or connecting the transports fails.
async fn get_pipe_transport_pair(
    &self,
    pipe_to_router_options: PipeToRouterOptions,
) -> Result<PipeTransportPair, RequestError> {
    // The synchronous map lock must NOT be held across an `.await`: while this task waits
    // for the per-destination async mutex, everything else that needs the map (other
    // piping calls, the transports' `on_close` handlers) would block its thread, which can
    // deadlock the executor. The guard below is a temporary of this single statement and
    // is therefore released before the first await point.
    let pipe_transport_pair_mutex = self
        .inner
        .mapped_pipe_transports
        .lock()
        .entry(pipe_to_router_options.router.id())
        .or_default()
        .clone();

    let mut pipe_transport_pair_guard = pipe_transport_pair_mutex.lock().await;

    // Reuse the cached pair if both transports are still alive.
    if let Some(pipe_transport_pair) = pipe_transport_pair_guard
        .as_ref()
        .and_then(WeakPipeTransportPair::upgrade)
    {
        return Ok(pipe_transport_pair);
    }

    // Nothing usable cached: create a new pair while still holding the async mutex, so
    // concurrent callers for the same destination wait and then reuse it.
    let pipe_transport_pair = self
        .create_pipe_transport_pair(pipe_to_router_options)
        .await?;
    pipe_transport_pair_guard.replace(pipe_transport_pair.downgrade());

    Ok(pipe_transport_pair)
}
/// Creates one pipe transport on this router and one on the destination router, connects
/// them to each other and returns them as a pair.
///
/// Both transports are created concurrently with identical options derived from
/// `pipe_to_router_options`, then connected concurrently: each side is given the other
/// side's local IP, local port and SRTP parameters. When either transport closes, the
/// cache entry for the destination router is removed from `mapped_pipe_transports`.
///
/// # Errors
///
/// Returns the first `RequestError` produced while creating or connecting the transports.
async fn create_pipe_transport_pair(
    &self,
    pipe_to_router_options: PipeToRouterOptions,
) -> Result<PipeTransportPair, RequestError> {
    let PipeToRouterOptions {
        router,
        listen_ip,
        enable_sctp,
        num_sctp_streams,
        enable_rtx,
        enable_srtp,
    } = pipe_to_router_options;

    let remote_router_id = router.id();

    let transport_options = PipeTransportOptions {
        enable_sctp,
        num_sctp_streams,
        enable_rtx,
        enable_srtp,
        app_data: Default::default(),
        ..PipeTransportOptions::new(listen_ip)
    };

    // Create both ends concurrently.
    let (local_pipe_transport, remote_pipe_transport) = future::try_zip(
        self.create_pipe_transport(transport_options.clone()),
        router.create_pipe_transport(transport_options),
    )
    .await?;

    // The local end connects to the address the remote end listens on...
    let local_remote_parameters = {
        let (ip, port) = match remote_pipe_transport.tuple() {
            TransportTuple::LocalOnly {
                local_ip,
                local_port,
                ..
            }
            | TransportTuple::WithRemote {
                local_ip,
                local_port,
                ..
            } => (local_ip, local_port),
        };

        PipeTransportRemoteParameters {
            ip,
            port,
            srtp_parameters: remote_pipe_transport.srtp_parameters(),
        }
    };
    let local_connect_fut = local_pipe_transport.connect(local_remote_parameters);

    // ...and vice versa.
    let remote_remote_parameters = {
        let (ip, port) = match local_pipe_transport.tuple() {
            TransportTuple::LocalOnly {
                local_ip,
                local_port,
                ..
            }
            | TransportTuple::WithRemote {
                local_ip,
                local_port,
                ..
            } => (local_ip, local_port),
        };

        PipeTransportRemoteParameters {
            ip,
            port,
            srtp_parameters: local_pipe_transport.srtp_parameters(),
        }
    };
    let remote_connect_fut = remote_pipe_transport.connect(remote_remote_parameters);

    future::try_zip(local_connect_fut, remote_connect_fut).await?;

    // Forget the cached pair as soon as either end is closed.
    local_pipe_transport
        .on_close({
            let pipe_transports_cache = Arc::clone(&self.inner.mapped_pipe_transports);

            Box::new(move || {
                pipe_transports_cache.lock().remove(&remote_router_id);
            })
        })
        .detach();

    remote_pipe_transport
        .on_close({
            let pipe_transports_cache = Arc::clone(&self.inner.mapped_pipe_transports);

            Box::new(move || {
                pipe_transports_cache.lock().remove(&remote_router_id);
            })
        })
        .detach();

    Ok(PipeTransportPair {
        local: local_pipe_transport,
        remote: remote_pipe_transport,
    })
}
/// Hooks a freshly created transport up to the router's producer bookkeeping.
///
/// Every producer / data producer created on the transport is recorded (as a weak
/// reference) in the router's `producers` / `data_producers` map and removed from it again
/// when it closes. The callbacks only hold weak references to the maps, so they do not
/// keep the maps alive.
fn after_transport_creation(&self, transport: &impl TransportGeneric) {
    let producers_registry = Arc::downgrade(&self.inner.producers);
    transport
        .on_new_producer(Box::new(move |producer| {
            let producer_id = producer.id();

            if let Some(producers) = producers_registry.upgrade() {
                producers.write().insert(producer_id, producer.downgrade());
            }

            let producers_registry = producers_registry.clone();
            producer
                .on_close(move || {
                    if let Some(producers) = producers_registry.upgrade() {
                        producers.write().remove(&producer_id);
                    }
                })
                .detach();
        }))
        .detach();

    let data_producers_registry = Arc::downgrade(&self.inner.data_producers);
    transport
        .on_new_data_producer(Box::new(move |data_producer| {
            let data_producer_id = data_producer.id();

            if let Some(data_producers) = data_producers_registry.upgrade() {
                data_producers
                    .write()
                    .insert(data_producer_id, data_producer.downgrade());
            }

            let data_producers_registry = data_producers_registry.clone();
            data_producer
                .on_close(move || {
                    if let Some(data_producers) = data_producers_registry.upgrade() {
                        data_producers.write().remove(&data_producer_id);
                    }
                })
                .detach();
        }))
        .detach();
}
/// Returns `true` if the router has an entry for the given producer ID.
///
/// Only the presence of the map entry is checked, not whether the weak reference stored
/// in it can still be upgraded.
fn has_producer(&self, producer_id: &ProducerId) -> bool {
    let producers = self.inner.producers.read();
    producers.contains_key(producer_id)
}
fn get_producer(&self, producer_id: &ProducerId) -> Option<Producer> {
self.inner.producers.read().get(producer_id)?.upgrade()
}
/// Returns `true` if the router has an entry for the given data producer ID.
///
/// Only the presence of the map entry is checked, not whether the weak reference stored
/// in it can still be upgraded.
fn has_data_producer(&self, data_producer_id: &DataProducerId) -> bool {
    let data_producers = self.inner.data_producers.read();
    data_producers.contains_key(data_producer_id)
}
fn get_data_producer(&self, data_producer_id: &DataProducerId) -> Option<DataProducer> {
self.inner
.data_producers
.read()
.get(data_producer_id)?
.upgrade()
}
// Test-only helper: closes the router explicitly instead of waiting for the last handle to
// be dropped. Delegates to `Inner::close`, so calling it on a closed router does nothing.
#[cfg(test)]
fn close(&self) {
self.inner.close();
}
}