use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Weak};
use arc_swap::ArcSwapOption;
use interceptor::{Attributes, Interceptor};
use portable_atomic::{AtomicU32, AtomicU8, AtomicUsize};
use smol_str::SmolStr;
use tokio::sync::Mutex;
use util::sync::Mutex as SyncMutex;
use crate::api::media_engine::MediaEngine;
use crate::error::{Error, Result};
use crate::rtp_transceiver::rtp_codec::{RTCRtpCodecParameters, RTCRtpParameters, RTPCodecType};
use crate::rtp_transceiver::rtp_receiver::RTPReceiverInternal;
use crate::rtp_transceiver::{PayloadType, SSRC};
lazy_static! {
static ref TRACK_REMOTE_UNIQUE_ID: AtomicUsize = AtomicUsize::new(0);
}
pub type OnMuteHdlrFn = Box<
dyn (FnMut() -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>) + Send + Sync + 'static,
>;
#[derive(Default)]
struct Handlers {
on_mute: ArcSwapOption<Mutex<OnMuteHdlrFn>>,
on_unmute: ArcSwapOption<Mutex<OnMuteHdlrFn>>,
}
#[derive(Default)]
struct TrackRemoteInternal {
peeked: VecDeque<(rtp::packet::Packet, Attributes)>,
}
pub struct TrackRemote {
tid: usize,
id: SyncMutex<String>,
stream_id: SyncMutex<String>,
receive_mtu: usize,
payload_type: AtomicU8, kind: AtomicU8, ssrc: AtomicU32, codec: SyncMutex<RTCRtpCodecParameters>,
pub(crate) params: SyncMutex<RTCRtpParameters>,
rid: SmolStr,
media_engine: Arc<MediaEngine>,
interceptor: Arc<dyn Interceptor + Send + Sync>,
handlers: Arc<Handlers>,
receiver: Option<Weak<RTPReceiverInternal>>,
internal: Mutex<TrackRemoteInternal>,
}
impl std::fmt::Debug for TrackRemote {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TrackRemote")
.field("id", &self.id)
.field("stream_id", &self.stream_id)
.field("payload_type", &self.payload_type)
.field("kind", &self.kind)
.field("ssrc", &self.ssrc)
.field("codec", &self.codec)
.field("params", &self.params)
.field("rid", &self.rid)
.finish()
}
}
impl TrackRemote {
pub(crate) fn new(
receive_mtu: usize,
kind: RTPCodecType,
ssrc: SSRC,
rid: SmolStr,
receiver: Weak<RTPReceiverInternal>,
media_engine: Arc<MediaEngine>,
interceptor: Arc<dyn Interceptor + Send + Sync>,
) -> Self {
TrackRemote {
tid: TRACK_REMOTE_UNIQUE_ID.fetch_add(1, Ordering::SeqCst),
id: Default::default(),
stream_id: Default::default(),
receive_mtu,
payload_type: Default::default(),
kind: AtomicU8::new(kind as u8),
ssrc: AtomicU32::new(ssrc),
codec: Default::default(),
params: Default::default(),
rid,
receiver: Some(receiver),
media_engine,
interceptor,
handlers: Default::default(),
internal: Default::default(),
}
}
pub fn tid(&self) -> usize {
self.tid
}
pub fn id(&self) -> String {
let id = self.id.lock();
id.clone()
}
pub fn set_id(&self, s: String) {
let mut id = self.id.lock();
*id = s;
}
pub fn stream_id(&self) -> String {
let stream_id = self.stream_id.lock();
stream_id.clone()
}
pub fn set_stream_id(&self, s: String) {
let mut stream_id = self.stream_id.lock();
*stream_id = s;
}
pub fn rid(&self) -> &str {
self.rid.as_str()
}
pub fn payload_type(&self) -> PayloadType {
self.payload_type.load(Ordering::SeqCst)
}
pub fn set_payload_type(&self, payload_type: PayloadType) {
self.payload_type.store(payload_type, Ordering::SeqCst);
}
pub fn kind(&self) -> RTPCodecType {
self.kind.load(Ordering::SeqCst).into()
}
pub fn set_kind(&self, kind: RTPCodecType) {
self.kind.store(kind as u8, Ordering::SeqCst);
}
pub fn ssrc(&self) -> SSRC {
self.ssrc.load(Ordering::SeqCst)
}
pub fn set_ssrc(&self, ssrc: SSRC) {
self.ssrc.store(ssrc, Ordering::SeqCst);
}
pub fn msid(&self) -> String {
format!("{} {}", self.stream_id(), self.id())
}
pub fn codec(&self) -> RTCRtpCodecParameters {
let codec = self.codec.lock();
codec.clone()
}
pub fn set_codec(&self, codec: RTCRtpCodecParameters) {
let mut c = self.codec.lock();
*c = codec;
}
pub fn params(&self) -> RTCRtpParameters {
let p = self.params.lock();
p.clone()
}
pub fn set_params(&self, params: RTCRtpParameters) {
let mut p = self.params.lock();
*p = params;
}
pub fn onmute<F>(&self, handler: F)
where
F: FnMut() -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> + Send + 'static + Sync,
{
self.handlers
.on_mute
.store(Some(Arc::new(Mutex::new(Box::new(handler)))));
}
pub fn onunmute<F>(&self, handler: F)
where
F: FnMut() -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> + Send + 'static + Sync,
{
self.handlers
.on_unmute
.store(Some(Arc::new(Mutex::new(Box::new(handler)))));
}
pub async fn read(&self, b: &mut [u8]) -> Result<(rtp::packet::Packet, Attributes)> {
{
let mut internal = self.internal.lock().await;
if let Some((pkt, attributes)) = internal.peeked.pop_front() {
self.check_and_update_track(&pkt).await?;
return Ok((pkt, attributes));
}
};
let receiver = match self.receiver.as_ref().and_then(|r| r.upgrade()) {
Some(r) => r,
None => return Err(Error::ErrRTPReceiverNil),
};
let (pkt, attributes) = receiver.read_rtp(b, self.tid).await?;
self.check_and_update_track(&pkt).await?;
Ok((pkt, attributes))
}
pub(crate) async fn check_and_update_track(&self, pkt: &rtp::packet::Packet) -> Result<()> {
let payload_type = pkt.header.payload_type;
if payload_type != self.payload_type() {
let p = self
.media_engine
.get_rtp_parameters_by_payload_type(payload_type)
.await?;
if let Some(receiver) = &self.receiver {
if let Some(receiver) = receiver.upgrade() {
self.kind.store(receiver.kind as u8, Ordering::SeqCst);
}
}
self.payload_type.store(payload_type, Ordering::SeqCst);
{
let mut codec = self.codec.lock();
*codec = if let Some(codec) = p.codecs.first() {
codec.clone()
} else {
return Err(Error::ErrCodecNotFound);
};
}
{
let mut params = self.params.lock();
*params = p;
}
}
Ok(())
}
pub async fn read_rtp(&self) -> Result<(rtp::packet::Packet, Attributes)> {
let mut b = vec![0u8; self.receive_mtu];
let (pkt, attributes) = self.read(&mut b).await?;
Ok((pkt, attributes))
}
pub(crate) async fn peek(&self, b: &mut [u8]) -> Result<(rtp::packet::Packet, Attributes)> {
let (pkt, a) = self.read(b).await?;
{
let mut internal = self.internal.lock().await;
internal.peeked.push_back((pkt.clone(), a.clone()));
}
Ok((pkt, a))
}
pub(crate) async fn prepopulate_peeked_data(
&self,
data: VecDeque<(rtp::packet::Packet, Attributes)>,
) {
let mut internal = self.internal.lock().await;
internal.peeked = data;
}
pub(crate) async fn fire_onmute(&self) {
let on_mute = self.handlers.on_mute.load();
if let Some(f) = on_mute.as_ref() {
(f.lock().await)().await
};
}
pub(crate) async fn fire_onunmute(&self) {
let on_unmute = self.handlers.on_unmute.load();
if let Some(f) = on_unmute.as_ref() {
(f.lock().await)().await
};
}
}