use crate::consumer::{Consumer, ConsumerId, ConsumerOptions, ConsumerType};
use crate::data_consumer::{DataConsumer, DataConsumerId, DataConsumerOptions, DataConsumerType};
use crate::data_producer::{DataProducer, DataProducerId, DataProducerOptions, DataProducerType};
use crate::data_structures::{AppData, TraceEventDirection};
use crate::messages::{
ConsumerInternal, DataConsumerInternal, DataProducerInternal, ProducerInternal,
TransportConsumeData, TransportConsumeDataData, TransportConsumeDataRequest,
TransportConsumeRequest, TransportDumpRequest, TransportEnableTraceEventData,
TransportEnableTraceEventRequest, TransportGetStatsRequest, TransportInternal,
TransportProduceData, TransportProduceDataData, TransportProduceDataRequest,
TransportProduceRequest, TransportSetMaxIncomingBitrateData,
TransportSetMaxIncomingBitrateRequest,
};
pub use crate::ortc::{
ConsumerRtpParametersError, RtpCapabilitiesError, RtpParametersError, RtpParametersMappingError,
};
use crate::producer::{Producer, ProducerId, ProducerOptions};
use crate::router::{Router, RouterId};
use crate::rtp_parameters::RtpEncodingParameters;
use crate::worker::{Channel, PayloadChannel, RequestError};
use crate::{ortc, uuid_based_wrapper_type};
use async_executor::Executor;
use async_trait::async_trait;
use event_listener_primitives::HandlerId;
use log::*;
use parking_lot::Mutex;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use thiserror::Error;
use uuid::Uuid;
// Strongly-typed UUID wrapper uniquely identifying a transport instance
// (type generated by the `uuid_based_wrapper_type!` macro).
uuid_based_wrapper_type!(TransportId);
/// Payload of a transport "trace" event notified by the worker.
///
/// Serialized/deserialized with an internally tagged, lowercase `"type"`
/// field to match the worker's JSON representation — do not reorder or
/// rename variants/fields without checking the wire format.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(tag = "type", rename_all = "lowercase")]
pub enum TransportTraceEventData {
    /// "probation" trace event (semantics defined by the worker).
    Probation {
        /// Event timestamp, as reported by the worker.
        timestamp: u64,
        /// Whether the event refers to incoming or outgoing traffic.
        direction: TraceEventDirection,
        /// Additional worker-provided details; structure is opaque here.
        info: Value,
    },
    /// "bwe" (bandwidth estimation) trace event (semantics defined by the worker).
    Bwe {
        /// Event timestamp, as reported by the worker.
        timestamp: u64,
        /// Whether the event refers to incoming or outgoing traffic.
        direction: TraceEventDirection,
        /// Additional worker-provided details; structure is opaque here.
        info: Value,
    },
}
/// Types of transport trace events that can be enabled via
/// [`Transport::enable_trace_event`].
///
/// Serialized lowercase to match the tag values used by
/// [`TransportTraceEventData`]. Variant order is significant for the derived
/// `Ord`/`PartialOrd` implementations.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum TransportTraceEventType {
    /// Selects [`TransportTraceEventData::Probation`] events.
    Probation,
    /// Selects [`TransportTraceEventData::Bwe`] events.
    Bwe,
}
/// Snapshot of the worker's RTP listener tables, as found in transport dumps.
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
#[doc(hidden)]
pub struct RtpListener {
    /// MID value -> producer mapping.
    pub mid_table: HashMap<String, ProducerId>,
    /// RID value -> producer mapping.
    pub rid_table: HashMap<String, ProducerId>,
    /// SSRC -> producer mapping.
    // NOTE(review): SSRCs are numeric, but keys here are `String` —
    // presumably because they arrive as JSON object keys; confirm against
    // the worker's dump format.
    pub ssrc_table: HashMap<String, ProducerId>,
}
/// Receive-side RTP header extension ids in use on a transport, as found in
/// transport dumps. A `None` field means the corresponding extension is not
/// in use.
#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
#[doc(hidden)]
pub struct RecvRtpHeaderExtensions {
    /// Extension id for MID.
    mid: Option<u8>,
    /// Extension id for RID.
    rid: Option<u8>,
    /// Extension id for repaired RID (RRID).
    rrid: Option<u8>,
    /// Extension id for abs-send-time.
    abs_send_time: Option<u8>,
    /// Extension id for transport-wide congestion control (version 01).
    transport_wide_cc01: Option<u8>,
}
/// Snapshot of the worker's SCTP listener table, as found in transport dumps.
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
#[doc(hidden)]
pub struct SctpListener {
    /// SCTP stream id -> data producer mapping.
    // NOTE(review): stream ids are numeric but keyed as `String` —
    // presumably because they arrive as JSON object keys; confirm against
    // the worker's dump format.
    stream_id_table: HashMap<String, DataProducerId>,
}
/// Concrete kind of transport; used internally by the shared default
/// implementations to branch on behavior that differs between transport
/// implementations (e.g. CNAME handling, pipe-specific RTP parameters).
#[derive(Debug, Copy, Clone, PartialEq)]
pub(super) enum TransportType {
    /// Direct transport (in-process data exchange).
    Direct,
    /// Pipe transport (router-to-router piping).
    Pipe,
    /// Plain RTP transport.
    Plain,
    /// WebRTC transport.
    WebRtc,
}
/// Common, object-safe interface implemented by every transport kind.
///
/// Usable as `Box<dyn Transport>`; cloning of boxed trait objects is provided
/// through the [`CloneTransport`] supertrait.
#[async_trait(?Send)]
pub trait Transport: Debug + Send + Sync + CloneTransport {
    /// Transport id.
    fn id(&self) -> TransportId;

    /// Id of the router this transport belongs to.
    fn router_id(&self) -> RouterId;

    /// Custom application data attached to the transport at creation time.
    fn app_data(&self) -> &AppData;

    /// Whether the transport has been closed.
    fn closed(&self) -> bool;

    /// Creates a [`Producer`] on this transport.
    async fn produce(&self, producer_options: ProducerOptions) -> Result<Producer, ProduceError>;

    /// Creates a [`Consumer`] on this transport.
    async fn consume(&self, consumer_options: ConsumerOptions) -> Result<Consumer, ConsumeError>;

    /// Creates a [`DataProducer`] on this transport.
    async fn produce_data(
        &self,
        data_producer_options: DataProducerOptions,
    ) -> Result<DataProducer, ProduceDataError>;

    /// Creates a [`DataConsumer`] on this transport.
    async fn consume_data(
        &self,
        data_consumer_options: DataConsumerOptions,
    ) -> Result<DataConsumer, ConsumeDataError>;

    /// Instructs the worker to emit "trace" events of the given types for
    /// this transport.
    async fn enable_trace_event(
        &self,
        types: Vec<TransportTraceEventType>,
    ) -> Result<(), RequestError>;

    /// Registers a callback invoked when a new producer is created on this
    /// transport; returns a handler id that unregisters it when dropped.
    fn on_new_producer(
        &self,
        callback: Box<dyn Fn(&Producer) + Send + Sync + 'static>,
    ) -> HandlerId;

    /// Registers a callback invoked when a new consumer is created on this
    /// transport.
    fn on_new_consumer(
        &self,
        callback: Box<dyn Fn(&Consumer) + Send + Sync + 'static>,
    ) -> HandlerId;

    /// Registers a callback invoked when a new data producer is created on
    /// this transport.
    fn on_new_data_producer(
        &self,
        callback: Box<dyn Fn(&DataProducer) + Send + Sync + 'static>,
    ) -> HandlerId;

    /// Registers a callback invoked when a new data consumer is created on
    /// this transport.
    fn on_new_data_consumer(
        &self,
        callback: Box<dyn Fn(&DataConsumer) + Send + Sync + 'static>,
    ) -> HandlerId;

    /// Registers a callback invoked for each received transport trace event
    /// (see [`Transport::enable_trace_event`]).
    fn on_trace(
        &self,
        callback: Box<dyn Fn(&TransportTraceEventData) + Send + Sync + 'static>,
    ) -> HandlerId;

    /// Registers a callback invoked when the parent router is closed.
    fn on_router_close(&self, callback: Box<dyn FnOnce() + Send + 'static>) -> HandlerId;

    /// Registers a callback invoked when the transport itself is closed.
    fn on_close(&self, callback: Box<dyn FnOnce() + Send + 'static>) -> HandlerId;
}
/// Helper trait making `Box<dyn Transport>` cloneable despite `Clone` itself
/// not being object-safe. Implemented automatically (via a blanket impl) for
/// every `Transport + Clone` type.
#[doc(hidden)]
pub trait CloneTransport {
    /// Returns a boxed clone of this transport as a trait object.
    #[doc(hidden)]
    fn clone_transport(&self) -> Box<dyn Transport>;
}
// Blanket implementation: any concrete transport type that is `Clone` can be
// duplicated behind a `Box<dyn Transport>` trait object.
impl<T: Transport + Clone + 'static> CloneTransport for T {
    fn clone_transport(&self) -> Box<dyn Transport> {
        let duplicate = self.clone();
        Box::new(duplicate)
    }
}
// `Clone` for boxed transport trait objects, delegating to the object-safe
// `CloneTransport` helper.
impl Clone for Box<dyn Transport> {
    fn clone(&self) -> Self {
        // Borrow the inner trait object explicitly and ask it for a boxed copy.
        self.as_ref().clone_transport()
    }
}
/// Extension of [`Transport`] with associated types that differ per concrete
/// transport kind; not object-safe (requires `Clone + 'static`).
#[async_trait(?Send)]
pub trait TransportGeneric: Transport + Clone + 'static {
    /// Structure returned by [`TransportGeneric::dump`].
    #[doc(hidden)]
    type Dump: Debug + DeserializeOwned + 'static;
    /// Statistics entry type returned by [`TransportGeneric::get_stats`].
    type Stat: Debug + DeserializeOwned + 'static;

    /// Dumps the worker-side state of the transport (debugging aid).
    async fn dump(&self) -> Result<Self::Dump, RequestError>;

    /// Retrieves current transport statistics from the worker.
    async fn get_stats(&self) -> Result<Vec<Self::Stat>, RequestError>;
}
/// Error produced by [`Transport::produce`].
#[derive(Debug, Error, Eq, PartialEq)]
pub enum ProduceError {
    /// The caller supplied an explicit producer id that is already in use on
    /// this router.
    #[error("Producer with the same id \"{0}\" already exists")]
    AlreadyExists(ProducerId),
    /// The supplied RTP parameters failed validation.
    #[error("Incorrect RTP parameters: {0}")]
    IncorrectRtpParameters(RtpParametersError),
    /// The RTP parameters could not be mapped onto the router's capabilities.
    #[error("RTP mapping error: {0}")]
    FailedRtpParametersMapping(RtpParametersMappingError),
    /// The underlying request to the worker failed.
    #[error("Request to worker failed: {0}")]
    Request(RequestError),
}
/// Error produced by [`Transport::consume`].
#[derive(Debug, Error, Eq, PartialEq)]
pub enum ConsumeError {
    /// No producer with the given id exists on this router.
    #[error("Producer with id \"{0}\" not found")]
    ProducerNotFound(ProducerId),
    /// The supplied RTP capabilities failed validation.
    #[error("RTP capabilities error: {0}")]
    FailedRtpCapabilitiesValidation(RtpCapabilitiesError),
    /// Consumer RTP parameters could not be derived from the producer's
    /// consumable parameters and the supplied capabilities.
    #[error("Bad consumer RTP parameters: {0}")]
    BadConsumerRtpParameters(ConsumerRtpParametersError),
    /// The underlying request to the worker failed.
    #[error("Request to worker failed: {0}")]
    Request(RequestError),
}
/// Error produced by [`Transport::produce_data`].
#[derive(Debug, Error, Eq, PartialEq)]
pub enum ProduceDataError {
    /// The caller supplied an explicit data producer id that is already in
    /// use on this router.
    #[error("Data producer with the same id \"{0}\" already exists")]
    AlreadyExists(DataProducerId),
    /// SCTP-based data producers must supply SCTP stream parameters.
    #[error("SCTP stream parameters are required for this transport")]
    SctpStreamParametersRequired,
    /// The underlying request to the worker failed.
    #[error("Request to worker failed: {0}")]
    Request(RequestError),
}
/// Error produced by [`Transport::consume_data`].
#[derive(Debug, Error, Eq, PartialEq)]
pub enum ConsumeDataError {
    /// No data producer with the given id exists on this router.
    #[error("Data producer with id \"{0}\" not found")]
    DataProducerNotFound(DataProducerId),
    /// All SCTP stream ids on this transport are already in use.
    #[error("No free sctp_stream_id available in transport")]
    NoSctpStreamId,
    /// The underlying request to the worker failed.
    #[error("Request to worker failed: {0}")]
    Request(RequestError),
}
/// Internal backbone shared by all concrete transport implementations.
///
/// Concrete transports implement the accessor methods; the provided default
/// methods implement the worker request/response logic common to every
/// transport kind (producing/consuming media and data, dump, stats, bitrate
/// and trace-event control).
#[async_trait(?Send)]
pub(super) trait TransportImpl: TransportGeneric {
    /// Router this transport belongs to.
    fn router(&self) -> &Router;

    /// Channel used to send control requests to the worker.
    fn channel(&self) -> &Channel;

    /// Channel used for payload (binary) communication with the worker.
    fn payload_channel(&self) -> &PayloadChannel;

    /// Executor on which async tasks tied to this transport are spawned.
    fn executor(&self) -> &Arc<Executor<'static>>;

    /// Monotonic counter used to derive unique `mid` values for consumers
    /// created on this transport.
    fn next_mid_for_consumers(&self) -> &AtomicUsize;

    /// SCTP stream id -> "in use" flag for data consumers created on this
    /// transport.
    fn used_sctp_stream_ids(&self) -> &Mutex<HashMap<u16, bool>>;

    /// RTCP CNAME shared by all producers on this transport, once decided
    /// (see `produce_impl` for how it is chosen).
    fn cname_for_producers(&self) -> &Mutex<Option<String>>;

    /// Claims a free SCTP stream id, marks it used and returns it; `None`
    /// when every id is already taken.
    ///
    /// NOTE(review): `HashMap` iteration order is unspecified, so the id
    /// picked is arbitrary among the free ones — any free id is acceptable
    /// here.
    fn allocate_sctp_stream_id(&self) -> Option<u16> {
        let mut used_sctp_stream_ids = self.used_sctp_stream_ids().lock();
        for (index, used) in used_sctp_stream_ids.iter_mut() {
            if !*used {
                *used = true;
                return Some(*index);
            }
        }
        None
    }

    /// Returns a previously allocated SCTP stream id to the free pool.
    /// Ids not present in the map are silently ignored.
    fn deallocate_sctp_stream_id(&self, sctp_stream_id: u16) {
        let used_sctp_stream_ids = self.used_sctp_stream_ids();
        if let Some(used) = used_sctp_stream_ids.lock().get_mut(&sctp_stream_id) {
            *used = false;
        }
    }

    /// Shared implementation of [`TransportGeneric::dump`]: requests a dump
    /// of the worker-side transport state.
    async fn dump_impl(&self) -> Result<Self::Dump, RequestError> {
        self.channel()
            .request(TransportDumpRequest {
                internal: TransportInternal {
                    router_id: self.router().id(),
                    transport_id: self.id(),
                },
                // Carries the concrete `Dump` type so the response can be
                // deserialized into it.
                phantom_data: PhantomData {},
            })
            .await
    }

    /// Shared implementation of [`TransportGeneric::get_stats`]: requests
    /// current transport statistics from the worker.
    async fn get_stats_impl(&self) -> Result<Vec<Self::Stat>, RequestError> {
        self.channel()
            .request(TransportGetStatsRequest {
                internal: TransportInternal {
                    router_id: self.router().id(),
                    transport_id: self.id(),
                },
                // Carries the concrete `Stat` type for response deserialization.
                phantom_data: PhantomData {},
            })
            .await
    }

    /// Asks the worker to set the maximum incoming bitrate for this
    /// transport (the value is forwarded to the worker as-is).
    async fn set_max_incoming_bitrate_impl(&self, bitrate: u32) -> Result<(), RequestError> {
        self.channel()
            .request(TransportSetMaxIncomingBitrateRequest {
                internal: TransportInternal {
                    router_id: self.router().id(),
                    transport_id: self.id(),
                },
                data: TransportSetMaxIncomingBitrateData { bitrate },
            })
            .await
    }

    /// Shared implementation of [`Transport::enable_trace_event`]: forwards
    /// the requested set of trace event types to the worker.
    async fn enable_trace_event_impl(
        &self,
        types: Vec<TransportTraceEventType>,
    ) -> Result<(), RequestError> {
        self.channel()
            .request(TransportEnableTraceEventRequest {
                internal: TransportInternal {
                    router_id: self.router().id(),
                    transport_id: self.id(),
                },
                data: TransportEnableTraceEventData { types },
            })
            .await
    }

    /// Shared implementation of [`Transport::produce`].
    ///
    /// Validates and normalizes the RTP parameters, maps them onto the
    /// router's capabilities, registers the producer with the worker and
    /// constructs the [`Producer`] handle.
    ///
    /// # Errors
    ///
    /// See [`ProduceError`] for the failure cases.
    async fn produce_impl(
        &self,
        producer_options: ProducerOptions,
        transport_type: TransportType,
    ) -> Result<Producer, ProduceError> {
        // Reject an explicitly requested id that is already taken on this
        // router.
        if let Some(id) = &producer_options.id {
            if self.router().has_producer(id) {
                return Err(ProduceError::AlreadyExists(*id));
            }
        }
        let ProducerOptions {
            id,
            kind,
            mut rtp_parameters,
            paused,
            key_frame_request_delay,
            app_data,
        } = producer_options;
        ortc::validate_rtp_parameters(&rtp_parameters)
            .map_err(ProduceError::IncorrectRtpParameters)?;
        // A producer must have at least one encoding; fall back to defaults.
        if rtp_parameters.encodings.is_empty() {
            rtp_parameters
                .encodings
                .push(RtpEncodingParameters::default());
        }
        // All producers on a transport share a single RTCP CNAME (pipe
        // transports are exempt): reuse the stored one if already decided,
        // otherwise adopt this producer's CNAME, otherwise generate a random
        // UUID-based one. The lock guard is dropped at the end of this block,
        // before any `.await` below.
        if transport_type != TransportType::Pipe {
            let mut cname_for_producers = self.cname_for_producers().lock();
            if let Some(cname_for_producers) = cname_for_producers.as_ref() {
                rtp_parameters.rtcp.cname = Some(cname_for_producers.clone());
            } else if let Some(cname) = rtp_parameters.rtcp.cname.as_ref() {
                cname_for_producers.replace(cname.clone());
            } else {
                let cname = Uuid::new_v4().to_string();
                cname_for_producers.replace(cname.clone());
                rtp_parameters.rtcp.cname = Some(cname);
            }
        }
        // Map the producer's RTP parameters onto the router's capabilities
        // and derive the parameters consumers of this producer will use.
        let router_rtp_capabilities = self.router().rtp_capabilities();
        let rtp_mapping =
            ortc::get_producer_rtp_parameters_mapping(&rtp_parameters, router_rtp_capabilities)
                .map_err(ProduceError::FailedRtpParametersMapping)?;
        let consumable_rtp_parameters = ortc::get_consumable_rtp_parameters(
            kind,
            &rtp_parameters,
            router_rtp_capabilities,
            &rtp_mapping,
        );
        let producer_id = id.unwrap_or_else(ProducerId::new);
        // Buffer worker notifications addressed to this producer until its
        // handlers are registered (guard dropped at end of function).
        let _buffer_guard = self.channel().buffer_messages_for(producer_id.into());
        let response = self
            .channel()
            .request(TransportProduceRequest {
                internal: ProducerInternal {
                    router_id: self.router().id(),
                    transport_id: self.id(),
                    producer_id,
                },
                data: TransportProduceData {
                    kind,
                    rtp_parameters: rtp_parameters.clone(),
                    rtp_mapping,
                    key_frame_request_delay,
                    paused,
                },
            })
            .await
            .map_err(ProduceError::Request)?;
        // `Producer::new` is async; construct the future and await it.
        let producer_fut = Producer::new(
            producer_id,
            kind,
            response.r#type,
            rtp_parameters,
            consumable_rtp_parameters,
            paused,
            Arc::clone(self.executor()),
            self.channel().clone(),
            self.payload_channel().clone(),
            app_data,
            Box::new(self.clone()),
            // Direct transports get special (direct) producer behavior.
            transport_type == TransportType::Direct,
        );
        Ok(producer_fut.await)
    }

    /// Shared implementation of [`Transport::consume`].
    ///
    /// Derives consumer RTP parameters from the producer's consumable
    /// parameters, registers the consumer with the worker and constructs the
    /// [`Consumer`] handle. `rtx` is only consulted on pipe transports.
    ///
    /// # Errors
    ///
    /// See [`ConsumeError`] for the failure cases.
    async fn consume_impl(
        &self,
        consumer_options: ConsumerOptions,
        transport_type: TransportType,
        rtx: bool,
    ) -> Result<Consumer, ConsumeError> {
        let ConsumerOptions {
            producer_id,
            rtp_capabilities,
            paused,
            preferred_layers,
            pipe,
            app_data,
        } = consumer_options;
        ortc::validate_rtp_capabilities(&rtp_capabilities)
            .map_err(ConsumeError::FailedRtpCapabilitiesValidation)?;
        // The producer being consumed must exist on this router.
        let producer = match self.router().get_producer(&producer_id) {
            Some(producer) => producer,
            None => {
                return Err(ConsumeError::ProducerNotFound(producer_id));
            }
        };
        // Pipe transports consume the producer's consumable parameters
        // directly; other transports intersect them with the caller's
        // capabilities.
        let rtp_parameters = if transport_type == TransportType::Pipe {
            ortc::get_pipe_consumer_rtp_parameters(producer.consumable_rtp_parameters(), rtx)
        } else {
            let mut rtp_parameters = ortc::get_consumer_rtp_parameters(
                producer.consumable_rtp_parameters(),
                rtp_capabilities,
                pipe,
            )
            .map_err(ConsumeError::BadConsumerRtpParameters)?;
            if !pipe {
                // Assign a transport-unique `mid`, wrapped so it never
                // exceeds 8 digits.
                let next_mid_for_consumers = self
                    .next_mid_for_consumers()
                    .fetch_add(1, Ordering::Relaxed);
                let mid = next_mid_for_consumers % 100_000_000;
                rtp_parameters.mid = Some(format!("{}", mid));
            }
            rtp_parameters
        };
        let consumer_id = ConsumerId::new();
        // Pipe transports (or explicitly pipe-flagged consumers) are always
        // of `Pipe` type; otherwise the type follows the producer's.
        let r#type = if transport_type == TransportType::Pipe || pipe {
            ConsumerType::Pipe
        } else {
            producer.r#type().into()
        };
        // Buffer worker notifications addressed to this consumer until its
        // handlers are registered (guard dropped at end of function).
        let _buffer_guard = self.channel().buffer_messages_for(consumer_id.into());
        let response = self
            .channel()
            .request(TransportConsumeRequest {
                internal: ConsumerInternal {
                    router_id: self.router().id(),
                    transport_id: self.id(),
                    consumer_id,
                    producer_id: producer.id(),
                },
                data: TransportConsumeData {
                    kind: producer.kind(),
                    rtp_parameters: rtp_parameters.clone(),
                    r#type,
                    consumable_rtp_encodings: producer
                        .consumable_rtp_parameters()
                        .encodings
                        .clone(),
                    paused,
                    preferred_layers,
                },
            })
            .await
            .map_err(ConsumeError::Request)?;
        Ok(Consumer::new(
            consumer_id,
            producer.id(),
            producer.kind(),
            r#type,
            rtp_parameters,
            // Paused/score/layer state comes back from the worker.
            response.paused,
            Arc::clone(self.executor()),
            self.channel().clone(),
            self.payload_channel().clone(),
            response.producer_paused,
            response.score,
            response.preferred_layers,
            app_data,
            Box::new(self.clone()),
        ))
    }

    /// Shared implementation of [`Transport::produce_data`].
    ///
    /// `r#type` selects SCTP vs direct data production; SCTP producers must
    /// supply stream parameters, direct producers must not (they are ignored
    /// with a warning).
    ///
    /// # Errors
    ///
    /// See [`ProduceDataError`] for the failure cases.
    async fn produce_data_impl(
        &self,
        r#type: DataProducerType,
        data_producer_options: DataProducerOptions,
        transport_type: TransportType,
    ) -> Result<DataProducer, ProduceDataError> {
        // Reject an explicitly requested id that is already taken on this
        // router.
        if let Some(id) = &data_producer_options.id {
            if self.router().has_data_producer(id) {
                return Err(ProduceDataError::AlreadyExists(*id));
            }
        }
        match r#type {
            DataProducerType::Sctp => {
                if data_producer_options.sctp_stream_parameters.is_none() {
                    return Err(ProduceDataError::SctpStreamParametersRequired);
                }
            }
            DataProducerType::Direct => {
                if data_producer_options.sctp_stream_parameters.is_some() {
                    warn!(
                        "sctp_stream_parameters are ignored when producing data on a DirectTransport",
                    );
                }
            }
        }
        let DataProducerOptions {
            id,
            sctp_stream_parameters,
            label,
            protocol,
            app_data,
        } = data_producer_options;
        let data_producer_id = id.unwrap_or_else(DataProducerId::new);
        // Buffer worker notifications addressed to this data producer until
        // its handlers are registered (guard dropped at end of function).
        let _buffer_guard = self.channel().buffer_messages_for(data_producer_id.into());
        let response = self
            .channel()
            .request(TransportProduceDataRequest {
                internal: DataProducerInternal {
                    router_id: self.router().id(),
                    transport_id: self.id(),
                    data_producer_id,
                },
                data: TransportProduceDataData {
                    r#type,
                    sctp_stream_parameters,
                    label,
                    protocol,
                },
            })
            .await
            .map_err(ProduceDataError::Request)?;
        Ok(DataProducer::new(
            data_producer_id,
            // The worker's response is authoritative for these fields.
            response.r#type,
            response.sctp_stream_parameters,
            response.label,
            response.protocol,
            Arc::clone(self.executor()),
            self.channel().clone(),
            self.payload_channel().clone(),
            app_data,
            Box::new(self.clone()),
            // Direct transports get special (direct) data producer behavior.
            transport_type == TransportType::Direct,
        ))
    }

    /// Shared implementation of [`Transport::consume_data`].
    ///
    /// For SCTP consumers, allocates a stream id on this transport and
    /// applies the caller's reliability overrides; the id is released again
    /// when the data consumer closes. For direct consumers, reliability
    /// options are ignored with a warning.
    ///
    /// # Errors
    ///
    /// See [`ConsumeDataError`] for the failure cases.
    async fn consume_data_impl(
        &self,
        r#type: DataConsumerType,
        data_consumer_options: DataConsumerOptions,
        transport_type: TransportType,
    ) -> Result<DataConsumer, ConsumeDataError> {
        let DataConsumerOptions {
            data_producer_id,
            ordered,
            max_packet_life_time,
            max_retransmits,
            app_data,
        } = data_consumer_options;
        // The data producer being consumed must exist on this router.
        let data_producer = match self.router().get_data_producer(&data_producer_id) {
            Some(data_producer) => data_producer,
            None => {
                return Err(ConsumeDataError::DataProducerNotFound(data_producer_id));
            }
        };
        let sctp_stream_parameters = match r#type {
            DataConsumerType::Sctp => {
                // Start from the producer's SCTP parameters (if any), claim a
                // stream id on this transport and apply caller overrides.
                let mut sctp_stream_parameters = data_producer.sctp_stream_parameters();
                if let Some(sctp_stream_parameters) = &mut sctp_stream_parameters {
                    if let Some(stream_id) = self.allocate_sctp_stream_id() {
                        sctp_stream_parameters.stream_id = stream_id;
                    } else {
                        return Err(ConsumeDataError::NoSctpStreamId);
                    }
                    if let Some(ordered) = ordered {
                        sctp_stream_parameters.ordered = ordered;
                    }
                    if let Some(max_packet_life_time) = max_packet_life_time {
                        sctp_stream_parameters.max_packet_life_time = Some(max_packet_life_time);
                    }
                    if let Some(max_retransmits) = max_retransmits {
                        sctp_stream_parameters.max_retransmits = Some(max_retransmits);
                    }
                }
                sctp_stream_parameters
            }
            DataConsumerType::Direct => {
                if ordered.is_some() || max_packet_life_time.is_some() || max_retransmits.is_some()
                {
                    warn!("ordered, max_packet_life_time and max_retransmits are ignored when consuming data on a DirectTransport");
                }
                None
            }
        };
        let data_consumer_id = DataConsumerId::new();
        // Buffer worker notifications addressed to this data consumer until
        // its handlers are registered (guard dropped at end of function).
        let _buffer_guard = self.channel().buffer_messages_for(data_consumer_id.into());
        let response = self
            .channel()
            .request(TransportConsumeDataRequest {
                internal: DataConsumerInternal {
                    router_id: self.router().id(),
                    transport_id: self.id(),
                    data_producer_id: data_producer.id(),
                    data_consumer_id,
                },
                data: TransportConsumeDataData {
                    r#type,
                    sctp_stream_parameters,
                    label: data_producer.label().clone(),
                    protocol: data_producer.protocol().clone(),
                },
            })
            .await
            .map_err(ConsumeDataError::Request)?;
        let data_consumer = DataConsumer::new(
            data_consumer_id,
            // The worker's response is authoritative for these fields.
            response.r#type,
            response.sctp_stream_parameters,
            response.label,
            response.protocol,
            data_producer.id(),
            Arc::clone(self.executor()),
            self.channel().clone(),
            self.payload_channel().clone(),
            app_data,
            Box::new(self.clone()),
            // Direct transports get special (direct) data consumer behavior.
            transport_type == TransportType::Direct,
        );
        // Release the allocated SCTP stream id back to the pool when the
        // data consumer closes.
        if let Some(sctp_stream_parameters) = data_consumer.sctp_stream_parameters() {
            let stream_id = sctp_stream_parameters.stream_id;
            let transport = self.clone();
            data_consumer
                .on_close(move || {
                    transport.deallocate_sctp_stream_id(stream_id);
                })
                .detach();
        }
        Ok(data_consumer)
    }
}