#[cfg(test)]
mod tests;
use crate::consumer::{RtpStreamParams, RtxStreamParams};
use crate::data_structures::{
AppData, RtpPacketTraceInfo, SrTraceInfo, SsrcTraceInfo, TraceEventDirection,
};
use crate::messages::{
ProducerCloseRequest, ProducerDumpRequest, ProducerEnableTraceEventRequest,
ProducerGetStatsRequest, ProducerPauseRequest, ProducerResumeRequest, ProducerSendNotification,
};
pub use crate::ortc::RtpMapping;
use crate::rtp_parameters::{MediaKind, MimeType, RtpParameters};
use crate::transport::Transport;
use crate::uuid_based_wrapper_type;
use crate::worker::{
Channel, NotificationError, NotificationParseError, RequestError, SubscriptionHandler,
};
use async_executor::Executor;
use event_listener_primitives::{Bag, BagOnce, HandlerId};
use log::{debug, error};
use mediasoup_sys::fbs::{notification, producer, response, rtp_parameters, rtp_stream};
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use serde_repr::{Deserialize_repr, Serialize_repr};
use std::error::Error;
use std::fmt;
use std::fmt::Debug;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Weak};
uuid_based_wrapper_type!(
ProducerId
);
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ProducerOptions {
pub(super) id: Option<ProducerId>,
pub kind: MediaKind,
pub rtp_parameters: RtpParameters,
pub paused: bool,
pub key_frame_request_delay: u32,
pub app_data: AppData,
}
impl ProducerOptions {
#[must_use]
pub fn new_pipe_transport(
producer_id: ProducerId,
kind: MediaKind,
rtp_parameters: RtpParameters,
) -> Self {
Self {
id: Some(producer_id),
kind,
rtp_parameters,
paused: false,
key_frame_request_delay: 0,
app_data: AppData::default(),
}
}
#[must_use]
pub fn new(kind: MediaKind, rtp_parameters: RtpParameters) -> Self {
Self {
id: None,
kind,
rtp_parameters,
paused: false,
key_frame_request_delay: 0,
app_data: AppData::default(),
}
}
}
#[derive(Debug, Clone, Eq, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
#[doc(hidden)]
pub struct RtxStream {
pub params: RtxStreamParams,
}
#[derive(Debug, Clone, Eq, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
#[doc(hidden)]
pub struct RtpStreamRecv {
pub params: RtpStreamParams,
pub score: u8,
pub rtx_stream: Option<RtxStream>,
}
impl RtpStreamRecv {
pub(crate) fn from_fbs_ref(dump: rtp_stream::DumpRef<'_>) -> Result<Self, Box<dyn Error>> {
Ok(Self {
params: RtpStreamParams::from_fbs_ref(dump.params()?)?,
score: dump.score()?,
rtx_stream: if let Some(rtx_stream) = dump.rtx_stream()? {
Some(RtxStream {
params: RtxStreamParams::from_fbs_ref(rtx_stream.params()?)?,
})
} else {
None
},
})
}
}
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
#[doc(hidden)]
#[non_exhaustive]
pub struct ProducerDump {
pub id: ProducerId,
pub kind: MediaKind,
pub paused: bool,
pub rtp_mapping: RtpMapping,
pub rtp_parameters: RtpParameters,
pub rtp_streams: Vec<RtpStreamRecv>,
pub trace_event_types: Vec<ProducerTraceEventType>,
pub r#type: ProducerType,
}
impl ProducerDump {
pub(crate) fn from_fbs_ref(
dump: producer::DumpResponseRef<'_>,
) -> Result<Self, Box<dyn Error>> {
Ok(Self {
id: dump.id()?.parse()?,
kind: MediaKind::from_fbs(dump.kind()?),
paused: dump.paused()?,
rtp_mapping: RtpMapping::from_fbs_ref(dump.rtp_mapping()?)?,
rtp_parameters: RtpParameters::from_fbs_ref(dump.rtp_parameters()?)?,
rtp_streams: dump
.rtp_streams()?
.iter()
.map(|rtp_stream| RtpStreamRecv::from_fbs_ref(rtp_stream?))
.collect::<Result<_, Box<dyn Error>>>()?,
trace_event_types: dump
.trace_event_types()?
.iter()
.map(|trace_event_type| {
ProducerTraceEventType::from_fbs(&trace_event_type.unwrap())
})
.collect(),
r#type: ProducerType::from_fbs(dump.type_()?),
})
}
}
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum ProducerType {
Simple,
Simulcast,
Svc,
}
impl ProducerType {
pub(crate) fn from_fbs(producer_type: rtp_parameters::Type) -> Self {
match producer_type {
rtp_parameters::Type::Simple => ProducerType::Simple,
rtp_parameters::Type::Simulcast => ProducerType::Simulcast,
rtp_parameters::Type::Svc => ProducerType::Svc,
rtp_parameters::Type::Pipe => unimplemented!(),
}
}
}
#[derive(Debug, Clone, Eq, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ProducerScore {
pub encoding_idx: u32,
pub ssrc: u32,
pub rid: Option<String>,
pub score: u8,
}
impl ProducerScore {
pub(crate) fn from_fbs(producer_score: &producer::Score) -> Self {
Self {
encoding_idx: producer_score.encoding_idx,
ssrc: producer_score.ssrc,
rid: producer_score.rid.clone(),
score: producer_score.score,
}
}
}
#[derive(Debug, Copy, Clone, Eq, PartialEq, Deserialize_repr, Serialize_repr)]
#[repr(u16)]
pub enum Rotation {
None = 0,
Clockwise = 90,
Rotate180 = 180,
CounterClockwise = 270,
}
#[derive(Debug, Copy, Clone, Deserialize, Serialize)]
pub struct ProducerVideoOrientation {
pub camera: bool,
pub flip: bool,
pub rotation: Rotation,
}
impl ProducerVideoOrientation {
pub(crate) fn from_fbs(
video_orientation: producer::VideoOrientationChangeNotification,
) -> Self {
Self {
camera: video_orientation.camera,
flip: video_orientation.flip,
rotation: match video_orientation.rotation {
0 => Rotation::None,
90 => Rotation::Clockwise,
180 => Rotation::Rotate180,
270 => Rotation::CounterClockwise,
_ => Rotation::None,
},
}
}
}
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
#[allow(missing_docs)]
pub struct BitrateByLayer {
layer: String,
bitrate: u32,
}
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
#[allow(missing_docs)]
pub struct ProducerStat {
pub timestamp: u64,
pub ssrc: u32,
pub rtx_ssrc: Option<u32>,
pub rid: Option<String>,
pub kind: MediaKind,
pub mime_type: MimeType,
pub packets_lost: u64,
pub fraction_lost: u8,
pub packets_discarded: u64,
pub packets_retransmitted: u64,
pub packets_repaired: u64,
pub nack_count: u64,
pub nack_packet_count: u64,
pub pli_count: u64,
pub fir_count: u64,
pub score: u8,
pub packet_count: u64,
pub byte_count: u64,
pub bitrate: u32,
pub round_trip_time: Option<f32>,
pub rtx_packets_discarded: Option<u64>,
pub jitter: u32,
pub bitrate_by_layer: Vec<BitrateByLayer>,
}
impl ProducerStat {
pub(crate) fn from_fbs(stats: &rtp_stream::Stats) -> Self {
let rtp_stream::StatsData::RecvStats(ref stats) = stats.data else {
panic!("Wrong message from worker");
};
let rtp_stream::StatsData::BaseStats(ref base) = stats.base.data else {
panic!("Wrong message from worker");
};
Self {
timestamp: base.timestamp,
ssrc: base.ssrc,
rtx_ssrc: base.rtx_ssrc,
rid: base.rid.clone(),
kind: MediaKind::from_fbs(base.kind),
mime_type: base.mime_type.to_string().parse().unwrap(),
packets_lost: base.packets_lost,
fraction_lost: base.fraction_lost,
packets_discarded: base.packets_discarded,
packets_retransmitted: base.packets_retransmitted,
packets_repaired: base.packets_repaired,
nack_count: base.nack_count,
nack_packet_count: base.nack_packet_count,
pli_count: base.pli_count,
fir_count: base.fir_count,
score: base.score,
packet_count: stats.packet_count,
byte_count: stats.byte_count,
bitrate: stats.bitrate,
round_trip_time: Some(base.round_trip_time),
rtx_packets_discarded: Some(base.rtx_packets_discarded),
jitter: stats.jitter,
bitrate_by_layer: stats
.bitrate_by_layer
.iter()
.map(|bitrate_by_layer| BitrateByLayer {
layer: bitrate_by_layer.layer.to_string(),
bitrate: bitrate_by_layer.bitrate,
})
.collect(),
}
}
}
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(tag = "type", rename_all = "lowercase")]
pub enum ProducerTraceEventData {
Rtp {
timestamp: u64,
direction: TraceEventDirection,
info: RtpPacketTraceInfo,
},
KeyFrame {
timestamp: u64,
direction: TraceEventDirection,
info: RtpPacketTraceInfo,
},
Nack {
timestamp: u64,
direction: TraceEventDirection,
},
Pli {
timestamp: u64,
direction: TraceEventDirection,
info: SsrcTraceInfo,
},
Fir {
timestamp: u64,
direction: TraceEventDirection,
info: SsrcTraceInfo,
},
Sr {
timestamp: u64,
direction: TraceEventDirection,
info: SrTraceInfo,
},
}
impl ProducerTraceEventData {
pub(crate) fn from_fbs(data: producer::TraceNotification) -> Self {
match data.type_ {
producer::TraceEventType::Rtp => ProducerTraceEventData::Rtp {
timestamp: data.timestamp,
direction: TraceEventDirection::from_fbs(data.direction),
info: {
let Some(producer::TraceInfo::RtpTraceInfo(info)) = data.info else {
panic!("Wrong message from worker: {data:?}");
};
RtpPacketTraceInfo::from_fbs(*info.rtp_packet, info.is_rtx)
},
},
producer::TraceEventType::Keyframe => ProducerTraceEventData::KeyFrame {
timestamp: data.timestamp,
direction: TraceEventDirection::from_fbs(data.direction),
info: {
let Some(producer::TraceInfo::KeyFrameTraceInfo(info)) = data.info else {
panic!("Wrong message from worker: {data:?}");
};
RtpPacketTraceInfo::from_fbs(*info.rtp_packet, info.is_rtx)
},
},
producer::TraceEventType::Nack => ProducerTraceEventData::Nack {
timestamp: data.timestamp,
direction: TraceEventDirection::from_fbs(data.direction),
},
producer::TraceEventType::Pli => ProducerTraceEventData::Pli {
timestamp: data.timestamp,
direction: TraceEventDirection::from_fbs(data.direction),
info: {
let Some(producer::TraceInfo::PliTraceInfo(info)) = data.info else {
panic!("Wrong message from worker: {data:?}");
};
SsrcTraceInfo { ssrc: info.ssrc }
},
},
producer::TraceEventType::Fir => ProducerTraceEventData::Fir {
timestamp: data.timestamp,
direction: TraceEventDirection::from_fbs(data.direction),
info: {
let Some(producer::TraceInfo::FirTraceInfo(info)) = data.info else {
panic!("Wrong message from worker: {data:?}");
};
SsrcTraceInfo { ssrc: info.ssrc }
},
},
producer::TraceEventType::Sr => ProducerTraceEventData::Sr {
timestamp: data.timestamp,
direction: TraceEventDirection::from_fbs(data.direction),
info: {
let Some(producer::TraceInfo::SrTraceInfo(info)) = data.info else {
panic!("Wrong message from worker: {data:?}");
};
SrTraceInfo::from_fbs(*info)
},
},
}
}
}
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum ProducerTraceEventType {
Rtp,
KeyFrame,
Nack,
Pli,
Fir,
SR,
}
impl ProducerTraceEventType {
pub(crate) fn to_fbs(self) -> producer::TraceEventType {
match self {
ProducerTraceEventType::Rtp => producer::TraceEventType::Rtp,
ProducerTraceEventType::KeyFrame => producer::TraceEventType::Keyframe,
ProducerTraceEventType::Nack => producer::TraceEventType::Nack,
ProducerTraceEventType::Pli => producer::TraceEventType::Pli,
ProducerTraceEventType::Fir => producer::TraceEventType::Fir,
ProducerTraceEventType::SR => producer::TraceEventType::Sr,
}
}
pub(crate) fn from_fbs(event_type: &producer::TraceEventType) -> Self {
match event_type {
producer::TraceEventType::Rtp => ProducerTraceEventType::Rtp,
producer::TraceEventType::Keyframe => ProducerTraceEventType::KeyFrame,
producer::TraceEventType::Nack => ProducerTraceEventType::Nack,
producer::TraceEventType::Pli => ProducerTraceEventType::Pli,
producer::TraceEventType::Fir => ProducerTraceEventType::Fir,
producer::TraceEventType::Sr => ProducerTraceEventType::SR,
}
}
}
#[derive(Debug, Deserialize)]
#[serde(tag = "event", rename_all = "lowercase", content = "data")]
enum Notification {
Score(Vec<ProducerScore>),
VideoOrientationChange(ProducerVideoOrientation),
Trace(ProducerTraceEventData),
}
impl Notification {
pub(crate) fn from_fbs(
notification: notification::NotificationRef<'_>,
) -> Result<Self, NotificationParseError> {
match notification.event().unwrap() {
notification::Event::ProducerScore => {
let Ok(Some(notification::BodyRef::ProducerScoreNotification(body))) =
notification.body()
else {
panic!("Wrong message from worker: {notification:?}");
};
let scores_fbs: Vec<_> = body
.scores()
.unwrap()
.iter()
.map(|score| producer::Score::try_from(score.unwrap()).unwrap())
.collect();
let scores = scores_fbs.iter().map(ProducerScore::from_fbs).collect();
Ok(Notification::Score(scores))
}
notification::Event::ProducerVideoOrientationChange => {
let Ok(Some(notification::BodyRef::ProducerVideoOrientationChangeNotification(
body,
))) = notification.body()
else {
panic!("Wrong message from worker: {notification:?}");
};
let video_orientation_fbs =
producer::VideoOrientationChangeNotification::try_from(body).unwrap();
let video_orientation = ProducerVideoOrientation::from_fbs(video_orientation_fbs);
Ok(Notification::VideoOrientationChange(video_orientation))
}
notification::Event::ProducerTrace => {
let Ok(Some(notification::BodyRef::ProducerTraceNotification(body))) =
notification.body()
else {
panic!("Wrong message from worker: {notification:?}");
};
let trace_notification_fbs = producer::TraceNotification::try_from(body).unwrap();
let trace_notification = ProducerTraceEventData::from_fbs(trace_notification_fbs);
Ok(Notification::Trace(trace_notification))
}
_ => Err(NotificationParseError::InvalidEvent),
}
}
}
#[derive(Default)]
#[allow(clippy::type_complexity)]
struct Handlers {
score: Bag<Arc<dyn Fn(&[ProducerScore]) + Send + Sync>>,
video_orientation_change: Bag<Arc<dyn Fn(ProducerVideoOrientation) + Send + Sync>>,
pause: Bag<Arc<dyn Fn() + Send + Sync>>,
resume: Bag<Arc<dyn Fn() + Send + Sync>>,
trace: Bag<Arc<dyn Fn(&ProducerTraceEventData) + Send + Sync>, ProducerTraceEventData>,
transport_close: BagOnce<Box<dyn FnOnce() + Send>>,
close: BagOnce<Box<dyn FnOnce() + Send>>,
}
struct Inner {
id: ProducerId,
kind: MediaKind,
r#type: ProducerType,
rtp_parameters: RtpParameters,
consumable_rtp_parameters: RtpParameters,
direct: bool,
paused: AtomicBool,
score: Arc<Mutex<Vec<ProducerScore>>>,
executor: Arc<Executor<'static>>,
channel: Channel,
handlers: Arc<Handlers>,
app_data: AppData,
transport: Arc<dyn Transport>,
closed: AtomicBool,
_subscription_handler: Mutex<Option<SubscriptionHandler>>,
_on_transport_close_handler: Mutex<HandlerId>,
}
impl Drop for Inner {
fn drop(&mut self) {
debug!("drop()");
self.close(true);
}
}
impl Inner {
fn close(&self, close_request: bool) {
if !self.closed.swap(true, Ordering::SeqCst) {
debug!("close()");
self.handlers.close.call_simple();
if close_request {
let channel = self.channel.clone();
let transport_id = self.transport.id();
let request = ProducerCloseRequest {
producer_id: self.id,
};
self.executor
.spawn(async move {
if let Err(error) = channel.request(transport_id, request).await {
error!("producer closing failed on drop: {}", error);
}
})
.detach();
}
}
}
}
#[derive(Clone)]
#[must_use = "Producer will be closed on drop, make sure to keep it around for as long as needed"]
pub struct RegularProducer {
inner: Arc<Inner>,
}
impl fmt::Debug for RegularProducer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("RegularProducer")
.field("id", &self.inner.id)
.field("kind", &self.inner.kind)
.field("type", &self.inner.r#type)
.field("rtp_parameters", &self.inner.rtp_parameters)
.field(
"consumable_rtp_parameters",
&self.inner.consumable_rtp_parameters,
)
.field("paused", &self.inner.paused)
.field("score", &self.inner.score)
.field("transport", &self.inner.transport)
.field("closed", &self.inner.closed)
.finish()
}
}
impl From<RegularProducer> for Producer {
fn from(producer: RegularProducer) -> Self {
Producer::Regular(producer)
}
}
#[derive(Clone)]
#[must_use = "Producer will be closed on drop, make sure to keep it around for as long as needed"]
pub struct DirectProducer {
inner: Arc<Inner>,
}
impl fmt::Debug for DirectProducer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("DirectProducer")
.field("id", &self.inner.id)
.field("kind", &self.inner.kind)
.field("type", &self.inner.r#type)
.field("rtp_parameters", &self.inner.rtp_parameters)
.field(
"consumable_rtp_parameters",
&self.inner.consumable_rtp_parameters,
)
.field("paused", &self.inner.paused)
.field("score", &self.inner.score)
.field("transport", &self.inner.transport)
.field("closed", &self.inner.closed)
.finish()
}
}
impl From<DirectProducer> for Producer {
fn from(producer: DirectProducer) -> Self {
Producer::Direct(producer)
}
}
#[derive(Clone)]
#[non_exhaustive]
#[must_use = "Producer will be closed on drop, make sure to keep it around for as long as needed"]
pub enum Producer {
Regular(RegularProducer),
Direct(DirectProducer),
}
impl fmt::Debug for Producer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match &self {
Producer::Regular(producer) => f.debug_tuple("Regular").field(&producer).finish(),
Producer::Direct(producer) => f.debug_tuple("Direct").field(&producer).finish(),
}
}
}
impl Producer {
#[allow(clippy::too_many_arguments)]
pub(super) async fn new(
id: ProducerId,
kind: MediaKind,
r#type: ProducerType,
rtp_parameters: RtpParameters,
consumable_rtp_parameters: RtpParameters,
paused: bool,
executor: Arc<Executor<'static>>,
channel: Channel,
app_data: AppData,
transport: Arc<dyn Transport>,
direct: bool,
) -> Self {
debug!("new()");
let handlers = Arc::<Handlers>::default();
let score = Arc::<Mutex<Vec<ProducerScore>>>::default();
let subscription_handler = {
let handlers = Arc::clone(&handlers);
let score = Arc::clone(&score);
channel.subscribe_to_notifications(id.into(), move |notification| {
match Notification::from_fbs(notification) {
Ok(notification) => match notification {
Notification::Score(scores) => {
*score.lock() = scores.clone();
handlers.score.call(|callback| {
callback(&scores);
});
}
Notification::VideoOrientationChange(video_orientation) => {
handlers.video_orientation_change.call(|callback| {
callback(video_orientation);
});
}
Notification::Trace(trace_event_data) => {
handlers.trace.call_simple(&trace_event_data);
}
},
Err(error) => {
error!("Failed to parse notification: {}", error);
}
}
})
};
let inner_weak = Arc::<Mutex<Option<Weak<Inner>>>>::default();
let on_transport_close_handler = transport.on_close({
let inner_weak = Arc::clone(&inner_weak);
Box::new(move || {
let maybe_inner = inner_weak.lock().as_ref().and_then(Weak::upgrade);
if let Some(inner) = maybe_inner {
inner.handlers.transport_close.call_simple();
inner.close(false);
}
})
});
let inner = Arc::new(Inner {
id,
kind,
r#type,
rtp_parameters,
consumable_rtp_parameters,
direct,
paused: AtomicBool::new(paused),
score,
executor,
channel,
handlers,
app_data,
transport,
closed: AtomicBool::new(false),
_subscription_handler: Mutex::new(subscription_handler),
_on_transport_close_handler: Mutex::new(on_transport_close_handler),
});
inner_weak.lock().replace(Arc::downgrade(&inner));
if direct {
Self::Direct(DirectProducer { inner })
} else {
Self::Regular(RegularProducer { inner })
}
}
#[must_use]
pub fn id(&self) -> ProducerId {
self.inner().id
}
pub fn transport(&self) -> &Arc<dyn Transport> {
&self.inner().transport
}
#[must_use]
pub fn kind(&self) -> MediaKind {
self.inner().kind
}
#[must_use]
pub fn rtp_parameters(&self) -> &RtpParameters {
&self.inner().rtp_parameters
}
#[must_use]
pub fn r#type(&self) -> ProducerType {
self.inner().r#type
}
#[must_use]
pub fn paused(&self) -> bool {
self.inner().paused.load(Ordering::SeqCst)
}
#[must_use]
pub fn score(&self) -> Vec<ProducerScore> {
self.inner().score.lock().clone()
}
#[must_use]
pub fn app_data(&self) -> &AppData {
&self.inner().app_data
}
#[must_use]
pub fn closed(&self) -> bool {
self.inner().closed.load(Ordering::SeqCst)
}
#[doc(hidden)]
pub async fn dump(&self) -> Result<ProducerDump, RequestError> {
debug!("dump()");
self.inner()
.channel
.request(self.id(), ProducerDumpRequest {})
.await
}
pub async fn get_stats(&self) -> Result<Vec<ProducerStat>, RequestError> {
debug!("get_stats()");
let response = self
.inner()
.channel
.request(self.id(), ProducerGetStatsRequest {})
.await;
if let Ok(response::Body::ProducerGetStatsResponse(data)) = response {
Ok(data.stats.iter().map(ProducerStat::from_fbs).collect())
} else {
panic!("Wrong message from worker");
}
}
pub async fn pause(&self) -> Result<(), RequestError> {
debug!("pause()");
self.inner()
.channel
.request(self.id(), ProducerPauseRequest {})
.await?;
let was_paused = self.inner().paused.swap(true, Ordering::SeqCst);
if !was_paused {
self.inner().handlers.pause.call_simple();
}
Ok(())
}
pub async fn resume(&self) -> Result<(), RequestError> {
debug!("resume()");
self.inner()
.channel
.request(self.id(), ProducerResumeRequest {})
.await?;
let was_paused = self.inner().paused.swap(false, Ordering::SeqCst);
if was_paused {
self.inner().handlers.resume.call_simple();
}
Ok(())
}
pub async fn enable_trace_event(
&self,
types: Vec<ProducerTraceEventType>,
) -> Result<(), RequestError> {
debug!("enable_trace_event()");
self.inner()
.channel
.request(self.id(), ProducerEnableTraceEventRequest { types })
.await
}
pub fn on_score<F: Fn(&[ProducerScore]) + Send + Sync + 'static>(
&self,
callback: F,
) -> HandlerId {
self.inner().handlers.score.add(Arc::new(callback))
}
pub fn on_video_orientation_change<F: Fn(ProducerVideoOrientation) + Send + Sync + 'static>(
&self,
callback: F,
) -> HandlerId {
self.inner()
.handlers
.video_orientation_change
.add(Arc::new(callback))
}
pub fn on_pause<F: Fn() + Send + Sync + 'static>(&self, callback: F) -> HandlerId {
self.inner().handlers.pause.add(Arc::new(callback))
}
pub fn on_resume<F: Fn() + Send + Sync + 'static>(&self, callback: F) -> HandlerId {
self.inner().handlers.resume.add(Arc::new(callback))
}
pub fn on_trace<F: Fn(&ProducerTraceEventData) + Send + Sync + 'static>(
&self,
callback: F,
) -> HandlerId {
self.inner().handlers.trace.add(Arc::new(callback))
}
pub fn on_transport_close<F: FnOnce() + Send + 'static>(&self, callback: F) -> HandlerId {
self.inner()
.handlers
.transport_close
.add(Box::new(callback))
}
pub fn on_close<F: FnOnce() + Send + 'static>(&self, callback: F) -> HandlerId {
let handler_id = self.inner().handlers.close.add(Box::new(callback));
if self.inner().closed.load(Ordering::Relaxed) {
self.inner().handlers.close.call_simple();
}
handler_id
}
#[doc(hidden)]
#[must_use]
pub fn consumable_rtp_parameters(&self) -> &RtpParameters {
&self.inner().consumable_rtp_parameters
}
pub(super) fn close(&self) {
self.inner().close(true);
}
#[must_use]
pub fn downgrade(&self) -> WeakProducer {
WeakProducer {
inner: Arc::downgrade(self.inner()),
}
}
fn inner(&self) -> &Arc<Inner> {
match self {
Producer::Regular(producer) => &producer.inner,
Producer::Direct(producer) => &producer.inner,
}
}
}
impl DirectProducer {
pub fn send(&self, rtp_packet: Vec<u8>) -> Result<(), NotificationError> {
self.inner
.channel
.notify(self.inner.id, ProducerSendNotification { rtp_packet })
}
}
pub struct PipedProducer {
producer: Producer,
on_drop: Option<Box<dyn FnOnce(Producer) + Send + 'static>>,
}
impl fmt::Debug for PipedProducer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("PipedProducer")
.field("producer", &self.producer)
.finish()
}
}
impl Drop for PipedProducer {
fn drop(&mut self) {
if let Some(on_drop) = self.on_drop.take() {
on_drop(self.producer.clone())
}
}
}
impl PipedProducer {
pub(crate) fn new<F: FnOnce(Producer) + Send + 'static>(
producer: Producer,
on_drop: F,
) -> Self {
Self {
producer,
on_drop: Some(Box::new(on_drop)),
}
}
pub fn into_inner(mut self) -> Producer {
self.on_drop.take();
self.producer.clone()
}
}
#[derive(Clone)]
pub struct WeakProducer {
inner: Weak<Inner>,
}
impl fmt::Debug for WeakProducer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("WeakProducer").finish()
}
}
impl WeakProducer {
#[must_use]
pub fn upgrade(&self) -> Option<Producer> {
let inner = self.inner.upgrade()?;
let producer = if inner.direct {
Producer::Direct(DirectProducer { inner })
} else {
Producer::Regular(RegularProducer { inner })
};
Some(producer)
}
}