#[cfg(test)]
mod rtp_transceiver_test;
use crate::api::media_engine::MediaEngine;
use crate::error::{Error, Result};
use crate::rtp_transceiver::rtp_codec::*;
use crate::rtp_transceiver::rtp_receiver::{RTCRtpReceiver, RTPReceiverInternal};
use crate::rtp_transceiver::rtp_sender::RTCRtpSender;
use crate::rtp_transceiver::rtp_transceiver_direction::RTCRtpTransceiverDirection;
use crate::track::track_local::TrackLocal;
use interceptor::{
stream_info::{RTPHeaderExtension, StreamInfo},
Attributes,
};
use log::trace;
use serde::{Deserialize, Serialize};
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
use std::sync::Arc;
use tokio::sync::Mutex;
use util::Unmarshal;
pub(crate) mod fmtp;
pub mod rtp_codec;
pub mod rtp_receiver;
pub mod rtp_sender;
pub mod rtp_transceiver_direction;
pub(crate) mod srtp_writer_future;
#[allow(clippy::upper_case_acronyms)]
pub type SSRC = u32;
pub type PayloadType = u8;
pub const TYPE_RTCP_FB_TRANSPORT_CC: &str = "transport-cc";
pub const TYPE_RTCP_FB_GOOG_REMB: &str = "goog-remb";
pub const TYPE_RTCP_FB_ACK: &str = "ack";
pub const TYPE_RTCP_FB_CCM: &str = "ccm";
pub const TYPE_RTCP_FB_NACK: &str = "nack";
#[derive(Default, Debug, Clone, PartialEq, Eq)]
pub struct RTCPFeedback {
pub typ: String,
pub parameter: String,
}
#[derive(Default, Debug, Clone)]
pub struct RTCRtpCapabilities {
pub codecs: Vec<RTCRtpCodecCapability>,
pub header_extensions: Vec<RTCRtpHeaderExtensionCapability>,
}
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
pub struct RTCRtpRtxParameters {
pub ssrc: SSRC,
}
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
pub struct RTCRtpCodingParameters {
pub rid: String,
pub ssrc: SSRC,
pub payload_type: PayloadType,
pub rtx: RTCRtpRtxParameters,
}
pub type RTCRtpDecodingParameters = RTCRtpCodingParameters;
pub type RTCRtpEncodingParameters = RTCRtpCodingParameters;
#[derive(Debug)]
pub struct RTCRtpReceiveParameters {
pub encodings: Vec<RTCRtpDecodingParameters>,
}
#[derive(Debug)]
pub struct RTCRtpSendParameters {
pub rtp_parameters: RTCRtpParameters,
pub encodings: Vec<RTCRtpEncodingParameters>,
}
pub struct RTCRtpTransceiverInit {
pub direction: RTCRtpTransceiverDirection,
pub send_encodings: Vec<RTCRtpEncodingParameters>,
}
pub(crate) fn create_stream_info(
id: String,
ssrc: SSRC,
payload_type: PayloadType,
codec: RTCRtpCodecCapability,
webrtc_header_extensions: &[RTCRtpHeaderExtensionParameters],
) -> StreamInfo {
let mut header_extensions = vec![];
for h in webrtc_header_extensions {
header_extensions.push(RTPHeaderExtension {
id: h.id,
uri: h.uri.clone(),
});
}
let mut feedbacks = vec![];
for f in &codec.rtcp_feedback {
feedbacks.push(interceptor::stream_info::RTCPFeedback {
typ: f.typ.clone(),
parameter: f.parameter.clone(),
});
}
StreamInfo {
id,
attributes: Attributes::new(),
ssrc,
payload_type,
rtp_header_extensions: header_extensions,
mime_type: codec.mime_type,
clock_rate: codec.clock_rate,
channels: codec.channels,
sdp_fmtp_line: codec.sdp_fmtp_line,
rtcp_feedback: feedbacks,
}
}
pub type TriggerNegotiationNeededFnOption =
Option<Box<dyn Fn() -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> + Send + Sync>>;
pub struct RTCRtpTransceiver {
mid: Mutex<String>, sender: Mutex<Option<Arc<RTCRtpSender>>>, receiver: Mutex<Option<Arc<RTCRtpReceiver>>>,
direction: AtomicU8, current_direction: AtomicU8,
codecs: Arc<Mutex<Vec<RTCRtpCodecParameters>>>,
pub(crate) stopped: AtomicBool,
pub(crate) kind: RTPCodecType,
media_engine: Arc<MediaEngine>,
trigger_negotiation_needed: Mutex<TriggerNegotiationNeededFnOption>,
}
impl RTCRtpTransceiver {
pub(crate) async fn new(
receiver: Option<Arc<RTCRtpReceiver>>,
sender: Option<Arc<RTCRtpSender>>,
direction: RTCRtpTransceiverDirection,
kind: RTPCodecType,
codecs: Vec<RTCRtpCodecParameters>,
media_engine: Arc<MediaEngine>,
trigger_negotiation_needed: TriggerNegotiationNeededFnOption,
) -> Arc<Self> {
let t = Arc::new(RTCRtpTransceiver {
mid: Mutex::new(String::new()),
sender: Mutex::new(None),
receiver: Mutex::new(None),
direction: AtomicU8::new(direction as u8),
current_direction: AtomicU8::new(RTCRtpTransceiverDirection::Unspecified as u8),
codecs: Arc::new(Mutex::new(codecs)),
stopped: AtomicBool::new(false),
kind,
media_engine,
trigger_negotiation_needed: Mutex::new(trigger_negotiation_needed),
});
t.set_receiver(receiver).await;
t.set_sender(sender).await;
t
}
pub async fn set_codec_preferences(&self, codecs: Vec<RTCRtpCodecParameters>) -> Result<()> {
for codec in &codecs {
let media_engine_codecs = self.media_engine.get_codecs_by_kind(self.kind).await;
let (_, match_type) = codec_parameters_fuzzy_search(codec, &media_engine_codecs);
if match_type == CodecMatch::None {
return Err(Error::ErrRTPTransceiverCodecUnsupported);
}
}
{
let mut c = self.codecs.lock().await;
*c = codecs;
}
Ok(())
}
pub(crate) async fn get_codecs(&self) -> Vec<RTCRtpCodecParameters> {
let mut codecs = self.codecs.lock().await;
RTPReceiverInternal::get_codecs(&mut codecs, self.kind, &self.media_engine).await
}
pub async fn sender(&self) -> Option<Arc<RTCRtpSender>> {
let sender = self.sender.lock().await;
sender.clone()
}
pub async fn set_sender_track(
self: &Arc<Self>,
sender: Option<Arc<RTCRtpSender>>,
track: Option<Arc<dyn TrackLocal + Send + Sync>>,
) -> Result<()> {
self.set_sender(sender).await;
self.set_sending_track(track).await
}
pub async fn set_sender(self: &Arc<Self>, s: Option<Arc<RTCRtpSender>>) {
if let Some(sender) = &s {
sender.set_rtp_transceiver(Some(Arc::downgrade(self))).await;
}
if let Some(prev_sender) = self.sender().await {
prev_sender.set_rtp_transceiver(None).await;
}
{
let mut sender = self.sender.lock().await;
*sender = s;
}
}
pub async fn receiver(&self) -> Option<Arc<RTCRtpReceiver>> {
let receiver = self.receiver.lock().await;
receiver.clone()
}
pub(crate) async fn set_receiver(&self, r: Option<Arc<RTCRtpReceiver>>) {
if let Some(receiver) = &r {
receiver
.set_transceiver_codecs(Some(Arc::clone(&self.codecs)))
.await;
}
{
let mut receiver = self.receiver.lock().await;
if let Some(prev_receiver) = &*receiver {
prev_receiver.set_transceiver_codecs(None).await;
}
*receiver = r;
}
}
pub(crate) async fn set_mid(&self, mid: String) -> Result<()> {
let mut m = self.mid.lock().await;
if !m.is_empty() {
return Err(Error::ErrRTPTransceiverCannotChangeMid);
}
*m = mid;
Ok(())
}
pub async fn mid(&self) -> String {
let mid = self.mid.lock().await;
mid.clone()
}
pub fn kind(&self) -> RTPCodecType {
self.kind
}
pub fn direction(&self) -> RTCRtpTransceiverDirection {
self.direction.load(Ordering::SeqCst).into()
}
pub async fn set_direction(&self, d: RTCRtpTransceiverDirection) {
let changed = self.set_direction_internal(d);
if changed {
let lock = self.trigger_negotiation_needed.lock().await;
if let Some(trigger) = &*lock {
(trigger)().await;
}
}
}
pub(crate) fn set_direction_internal(&self, d: RTCRtpTransceiverDirection) -> bool {
let previous: RTCRtpTransceiverDirection =
self.direction.swap(d as u8, Ordering::SeqCst).into();
let changed = d != previous;
if changed {
trace!(
"Changing direction of transceiver from {} to {}",
previous,
d
);
}
changed
}
pub fn current_direction(&self) -> RTCRtpTransceiverDirection {
if self.stopped.load(Ordering::SeqCst) {
return RTCRtpTransceiverDirection::Unspecified;
}
self.current_direction.load(Ordering::SeqCst).into()
}
pub(crate) fn set_current_direction(&self, d: RTCRtpTransceiverDirection) {
let previous: RTCRtpTransceiverDirection = self
.current_direction
.swap(d as u8, Ordering::SeqCst)
.into();
if d != previous {
trace!(
"Changing current direction of transceiver from {} to {}",
previous,
d,
);
}
}
pub(crate) async fn process_new_current_direction(
&self,
previous_direction: RTCRtpTransceiverDirection,
) -> Result<()> {
if self.stopped.load(Ordering::SeqCst) {
return Ok(());
}
let current_direction = self.current_direction();
if previous_direction != current_direction {
let mid = self.mid().await;
trace!(
"Processing transceiver({}) direction change from {} to {}",
mid,
previous_direction,
current_direction
);
} else {
return Ok(());
}
if let Some(receiver) = &*self.receiver.lock().await {
let pause_receiver = !current_direction.has_recv();
if pause_receiver {
receiver.pause().await?;
} else {
receiver.resume().await?;
}
}
if let Some(sender) = &*self.sender.lock().await {
let pause_sender = !current_direction.has_send();
sender.set_paused(pause_sender);
}
Ok(())
}
pub async fn stop(&self) -> Result<()> {
if self.stopped.load(Ordering::SeqCst) {
return Ok(());
}
self.stopped.store(true, Ordering::SeqCst);
{
let s = self.sender.lock().await;
if let Some(sender) = &*s {
sender.stop().await?;
}
}
{
let r = self.receiver.lock().await;
if let Some(receiver) = &*r {
receiver.stop().await?;
}
}
self.set_direction_internal(RTCRtpTransceiverDirection::Inactive);
Ok(())
}
pub(crate) async fn set_sending_track(
&self,
track: Option<Arc<dyn TrackLocal + Send + Sync>>,
) -> Result<()> {
let track_is_none = track.is_none();
{
let mut s = self.sender.lock().await;
if let Some(sender) = &*s {
sender.replace_track(track).await?;
}
if track_is_none {
*s = None;
}
}
let direction = self.direction();
let should_send = !track_is_none;
let should_recv = direction.has_recv();
self.set_direction_internal(RTCRtpTransceiverDirection::from_send_recv(
should_send,
should_recv,
));
Ok(())
}
}
pub(crate) async fn find_by_mid(
mid: &str,
local_transceivers: &mut Vec<Arc<RTCRtpTransceiver>>,
) -> Option<Arc<RTCRtpTransceiver>> {
for (i, t) in local_transceivers.iter().enumerate() {
if t.mid().await == mid {
return Some(local_transceivers.remove(i));
}
}
None
}
pub(crate) async fn satisfy_type_and_direction(
remote_kind: RTPCodecType,
remote_direction: RTCRtpTransceiverDirection,
local_transceivers: &mut Vec<Arc<RTCRtpTransceiver>>,
) -> Option<Arc<RTCRtpTransceiver>> {
let get_preferred_directions = || -> Vec<RTCRtpTransceiverDirection> {
match remote_direction {
RTCRtpTransceiverDirection::Sendrecv => vec![
RTCRtpTransceiverDirection::Recvonly,
RTCRtpTransceiverDirection::Sendrecv,
],
RTCRtpTransceiverDirection::Sendonly => vec![RTCRtpTransceiverDirection::Recvonly],
RTCRtpTransceiverDirection::Recvonly => vec![
RTCRtpTransceiverDirection::Sendonly,
RTCRtpTransceiverDirection::Sendrecv,
],
_ => vec![],
}
};
for possible_direction in get_preferred_directions() {
for (i, t) in local_transceivers.iter().enumerate() {
if t.mid().await.is_empty()
&& t.kind == remote_kind
&& possible_direction == t.direction()
{
return Some(local_transceivers.remove(i));
}
}
}
None
}
pub(crate) fn handle_unknown_rtp_packet(
buf: &[u8],
mid_extension_id: u8,
sid_extension_id: u8,
rsid_extension_id: u8,
) -> Result<(String, String, String, PayloadType)> {
let mut reader = buf;
let rp = rtp::packet::Packet::unmarshal(&mut reader)?;
if !rp.header.extension {
return Ok((String::new(), String::new(), String::new(), 0));
}
let payload_type = rp.header.payload_type;
let mid = if let Some(payload) = rp.header.get_extension(mid_extension_id) {
String::from_utf8(payload.to_vec())?
} else {
String::new()
};
let rid = if let Some(payload) = rp.header.get_extension(sid_extension_id) {
String::from_utf8(payload.to_vec())?
} else {
String::new()
};
let srid = if let Some(payload) = rp.header.get_extension(rsid_extension_id) {
String::from_utf8(payload.to_vec())?
} else {
String::new()
};
Ok((mid, rid, srid, payload_type))
}