#[cfg(test)]
mod tests;
use crate::consumer::RtpStreamParams;
use crate::data_structures::{AppData, RtpPacketTraceInfo, SsrcTraceInfo, TraceEventDirection};
use crate::messages::{
ProducerCloseRequest, ProducerDumpRequest, ProducerEnableTraceEventRequest,
ProducerGetStatsRequest, ProducerPauseRequest, ProducerResumeRequest, ProducerSendNotification,
};
pub use crate::ortc::RtpMapping;
use crate::rtp_parameters::{MediaKind, MimeType, RtpParameters};
use crate::transport::Transport;
use crate::uuid_based_wrapper_type;
use crate::worker::{
Channel, NotificationError, PayloadChannel, RequestError, SubscriptionHandler,
};
use async_executor::Executor;
use event_listener_primitives::{Bag, BagOnce, HandlerId};
use hash_hasher::HashedMap;
use log::{debug, error};
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use serde_repr::{Deserialize_repr, Serialize_repr};
use std::fmt;
use std::fmt::Debug;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Weak};
// Declares the `ProducerId` identifier type through the crate's `uuid_based_wrapper_type!` macro.
// NOTE(review): presumably a UUID-backed newtype; see the macro definition to confirm the exact
// set of derives and conversions it generates.
uuid_based_wrapper_type!(
ProducerId
);
/// Options describing a producer to be created.
///
/// The struct is `#[non_exhaustive]`, so code outside of this crate has to start from
/// [`ProducerOptions::new`] or [`ProducerOptions::new_pipe_transport`] and then adjust the
/// public fields it cares about.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ProducerOptions {
/// Explicit producer ID. Left as `None` by [`ProducerOptions::new`]; only
/// [`ProducerOptions::new_pipe_transport`] fills it in.
pub(super) id: Option<ProducerId>,
/// Media kind of the producer.
pub kind: MediaKind,
/// RTP parameters of the producer.
pub rtp_parameters: RtpParameters,
/// Requested paused flag. Both constructors default it to `false`.
pub paused: bool,
/// Key frame request delay. Both constructors default it to `0`.
/// NOTE(review): unit is presumably milliseconds; confirm against the worker.
pub key_frame_request_delay: u32,
/// Custom application data. Both constructors default it to `AppData::default()`.
pub app_data: AppData,
}
impl ProducerOptions {
    /// Creates options that carry a caller-chosen `producer_id`.
    ///
    /// Apart from the explicit ID, every field receives exactly the defaults that
    /// [`ProducerOptions::new`] assigns.
    /// NOTE(review): judging by the name this is meant for producers created on pipe
    /// transports; confirm against the callers.
    #[must_use]
    pub fn new_pipe_transport(
        producer_id: ProducerId,
        kind: MediaKind,
        rtp_parameters: RtpParameters,
    ) -> Self {
        Self {
            id: Some(producer_id),
            ..Self::new(kind, rtp_parameters)
        }
    }

    /// Creates options with the mandatory `kind` and `rtp_parameters`.
    ///
    /// Defaults: no explicit ID, not paused, a key frame request delay of `0` and default
    /// application data.
    #[must_use]
    pub fn new(kind: MediaKind, rtp_parameters: RtpParameters) -> Self {
        Self {
            kind,
            rtp_parameters,
            id: None,
            paused: false,
            key_frame_request_delay: 0,
            app_data: AppData::default(),
        }
    }
}
/// One entry of [`ProducerDump::rtp_streams`].
///
/// Field names are (de)serialized in camelCase (e.g. `packetCount`, `bitrateByLayer`).
#[derive(Debug, Clone, Eq, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
#[doc(hidden)]
pub struct RtpStreamRecv {
/// Parameters of the stream.
pub params: RtpStreamParams,
/// Score of the stream.
pub score: u8,
/// Jitter of the stream.
pub jitter: u32,
/// Packet counter of the stream.
pub packet_count: usize,
/// Byte counter of the stream.
pub byte_count: usize,
/// Bitrate of the stream.
pub bitrate: u32,
/// Optional bitrate breakdown keyed by a layer name.
pub bitrate_by_layer: Option<HashedMap<String, u32>>,
}
/// Response of [`Producer::dump`].
///
/// Field names are (de)serialized in camelCase (e.g. `rtpMapping`, `traceEventTypes`).
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
#[doc(hidden)]
#[non_exhaustive]
pub struct ProducerDump {
/// Producer ID.
pub id: ProducerId,
/// Media kind.
pub kind: MediaKind,
/// Paused flag.
pub paused: bool,
/// RTP mapping of the producer.
pub rtp_mapping: RtpMapping,
/// RTP parameters of the producer.
pub rtp_parameters: RtpParameters,
/// Per-stream information.
pub rtp_streams: Vec<RtpStreamRecv>,
/// Enabled trace event types, delivered as a single string.
/// NOTE(review): the separator format is not visible here; check the worker.
pub trace_event_types: String,
/// Producer type.
pub r#type: ProducerType,
}
/// Producer type.
///
/// Variants are (de)serialized as lowercase strings.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum ProducerType {
/// Serialized as `"simple"`.
Simple,
/// Serialized as `"simulcast"`.
Simulcast,
/// Serialized as `"svc"`.
Svc,
}
/// Score entry delivered with the producer's `score` notification and returned by
/// [`Producer::score`].
///
/// Field names are (de)serialized in camelCase (e.g. `encodingIdx`).
#[derive(Debug, Clone, Eq, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ProducerScore {
/// Index of the encoding this score refers to.
/// NOTE(review): presumably an index into the producer's RTP encodings; confirm.
pub encoding_idx: u32,
/// SSRC of the RTP stream.
pub ssrc: u32,
/// RID of the RTP stream, if any.
pub rid: Option<String>,
/// Score of the RTP stream.
pub score: u8,
}
/// Rotation value used in [`ProducerVideoOrientation`].
///
/// (De)serialized as its numeric `u16` discriminant (via `serde_repr`), i.e. `0`, `90`, `180`
/// or `270`; presumably degrees.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Deserialize_repr, Serialize_repr)]
#[repr(u16)]
pub enum Rotation {
/// Numeric value `0`.
None = 0,
/// Numeric value `90`.
Clockwise = 90,
/// Numeric value `180`.
Rotate180 = 180,
/// Numeric value `270`.
CounterClockwise = 270,
}
/// Payload of the video orientation change notification, handed by value to callbacks
/// registered with [`Producer::on_video_orientation_change`].
#[derive(Debug, Copy, Clone, Deserialize, Serialize)]
pub struct ProducerVideoOrientation {
/// Camera flag.
/// NOTE(review): presumably tells whether the source is a video camera; confirm.
pub camera: bool,
/// Flip flag.
/// NOTE(review): presumably tells whether the video is horizontally flipped; confirm.
pub flip: bool,
/// Rotation value.
pub rotation: Rotation,
}
/// One statistics entry returned by [`Producer::get_stats`].
///
/// Field names are (de)serialized in camelCase (e.g. `rtxSsrc`, `packetsLost`).
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
// Individual fields are intentionally left without doc comments, hence the lint exemption.
#[allow(missing_docs)]
pub struct ProducerStat {
pub timestamp: u64,
pub ssrc: u32,
pub rtx_ssrc: Option<u32>,
pub rid: Option<String>,
pub kind: String,
pub mime_type: MimeType,
pub packets_lost: u32,
pub fraction_lost: u8,
pub packets_discarded: usize,
pub packets_retransmitted: usize,
pub packets_repaired: usize,
pub nack_count: usize,
pub nack_packet_count: usize,
pub pli_count: usize,
pub fir_count: usize,
pub score: u8,
pub packet_count: usize,
pub byte_count: usize,
pub bitrate: u32,
pub round_trip_time: Option<f32>,
pub rtx_packets_discarded: Option<u32>,
// The two fields below have counterparts with the same name and type on `RtpStreamRecv`.
pub jitter: u32,
pub bitrate_by_layer: Option<HashedMap<String, u32>>,
}
/// Trace event payload handed to callbacks registered with [`Producer::on_trace`].
///
/// Internally tagged for serde: the variant is selected by a `type` field holding the
/// lowercase variant name (`"rtp"`, `"keyframe"`, `"nack"`, `"pli"`, `"fir"`).
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(tag = "type", rename_all = "lowercase")]
pub enum ProducerTraceEventData {
/// RTP packet event.
Rtp {
/// Event timestamp.
timestamp: u64,
/// Event direction.
direction: TraceEventDirection,
/// Information about the RTP packet.
info: RtpPacketTraceInfo,
},
/// Key frame event.
KeyFrame {
/// Event timestamp.
timestamp: u64,
/// Event direction.
direction: TraceEventDirection,
/// Information about the RTP packet.
info: RtpPacketTraceInfo,
},
/// NACK event; carries no `info` field.
Nack {
/// Event timestamp.
timestamp: u64,
/// Event direction.
direction: TraceEventDirection,
},
/// PLI event.
Pli {
/// Event timestamp.
timestamp: u64,
/// Event direction.
direction: TraceEventDirection,
/// SSRC information.
info: SsrcTraceInfo,
},
/// FIR event.
Fir {
/// Event timestamp.
timestamp: u64,
/// Event direction.
direction: TraceEventDirection,
/// SSRC information.
info: SsrcTraceInfo,
},
}
/// Trace event type accepted by [`Producer::enable_trace_event`].
///
/// Variants are (de)serialized as lowercase strings and mirror the variants of
/// [`ProducerTraceEventData`].
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum ProducerTraceEventType {
/// Serialized as `"rtp"`.
Rtp,
/// Serialized as `"keyframe"`.
KeyFrame,
/// Serialized as `"nack"`.
Nack,
/// Serialized as `"pli"`.
Pli,
/// Serialized as `"fir"`.
Fir,
}
// Notifications received for a producer over the channel subscription set up in `Producer::new`.
//
// Adjacently tagged for serde: the `event` field holds the lowercase variant name
// (`"score"`, `"videoorientationchange"`, `"trace"`) and the `data` field holds the payload.
#[derive(Debug, Deserialize)]
#[serde(tag = "event", rename_all = "lowercase", content = "data")]
enum Notification {
// New list of scores; cached and forwarded to `score` handlers.
Score(Vec<ProducerScore>),
// New video orientation; forwarded to `video_orientation_change` handlers.
VideoOrientationChange(ProducerVideoOrientation),
// Trace event; forwarded to `trace` handlers.
Trace(ProducerTraceEventData),
}
// Callback registries behind the `Producer::on_*` methods.
//
// `Bag` registries are invoked every time the event occurs; `BagOnce` registries hold `FnOnce`
// callbacks.
#[derive(Default)]
// The handler types are long trait-object signatures; aliasing each one would not add clarity.
#[allow(clippy::type_complexity)]
struct Handlers {
// Invoked with the new scores on every score notification.
score: Bag<Arc<dyn Fn(&[ProducerScore]) + Send + Sync>>,
// Invoked with the new orientation on every video orientation change notification.
video_orientation_change: Bag<Arc<dyn Fn(ProducerVideoOrientation) + Send + Sync>>,
// Invoked by `Producer::pause` when the state flips from not paused to paused.
pause: Bag<Arc<dyn Fn() + Send + Sync>>,
// Invoked by `Producer::resume` when the state flips from paused to not paused.
resume: Bag<Arc<dyn Fn() + Send + Sync>>,
// Invoked with the event data on every trace notification.
trace: Bag<Arc<dyn Fn(&ProducerTraceEventData) + Send + Sync>, ProducerTraceEventData>,
// Invoked when the owning transport reports its close, right before the producer is closed.
transport_close: BagOnce<Box<dyn FnOnce() + Send>>,
// Invoked by `Inner::close` the first time the producer is closed.
close: BagOnce<Box<dyn FnOnce() + Send>>,
}
// State shared (through `Arc`) by every clone of a producer handle.
// Dropping the last reference closes the producer (see `Drop for Inner`).
struct Inner {
id: ProducerId,
kind: MediaKind,
r#type: ProducerType,
rtp_parameters: RtpParameters,
consumable_rtp_parameters: RtpParameters,
// Selects which wrapper (`Producer::Direct` or `Producer::Regular`) is built around this state.
direct: bool,
// Updated by `Producer::pause` / `Producer::resume` after the request succeeded.
paused: AtomicBool,
// Latest scores received through the score notification.
score: Arc<Mutex<Vec<ProducerScore>>>,
// Used to spawn the detached close request in `Inner::close`.
executor: Arc<Executor<'static>>,
// Request channel used for all producer requests.
channel: Channel,
// Used by `DirectProducer::send` to deliver RTP packets.
payload_channel: PayloadChannel,
handlers: Arc<Handlers>,
app_data: AppData,
// Transport this producer belongs to; close requests are addressed to its ID.
transport: Arc<dyn Transport>,
// Set exactly once by `Inner::close`; makes closing idempotent.
closed: AtomicBool,
// Never read; stored only so the values live as long as `Inner`.
// NOTE(review): presumably dropping them unsubscribes from channel notifications and from the
// transport close event respectively; confirm against `SubscriptionHandler` / `HandlerId`.
_subscription_handler: Mutex<Option<SubscriptionHandler>>,
_on_transport_close_handler: Mutex<HandlerId>,
}
// Closes the producer once the last reference to the shared state is gone.
impl Drop for Inner {
fn drop(&mut self) {
debug!("drop()");
// `true`: also send the close request over the channel. `Inner::close` does nothing if the
// producer was already closed earlier.
self.close(true);
}
}
impl Inner {
    /// Marks the producer as closed and runs the `close` callbacks; only the first call has
    /// any effect.
    ///
    /// When `close_request` is `true`, a `ProducerCloseRequest` addressed to the owning
    /// transport is additionally sent from a detached task on the executor. A failure of that
    /// request is only logged.
    fn close(&self, close_request: bool) {
        // `swap` hands back the previous value, so exactly one caller observes `false`.
        let already_closed = self.closed.swap(true, Ordering::SeqCst);
        if already_closed {
            return;
        }

        debug!("close()");

        self.handlers.close.call_simple();

        if !close_request {
            return;
        }

        // Everything the task needs is captured by value, because it may outlive `self`.
        let channel = self.channel.clone();
        let transport_id = self.transport.id();
        let request = ProducerCloseRequest {
            producer_id: self.id,
        };

        let close_task = async move {
            if let Err(error) = channel.request(transport_id, request).await {
                error!("producer closing failed on drop: {}", error);
            }
        };

        self.executor.spawn(close_task).detach();
    }
}
/// Producer handle built when the producer was created with `direct == false`.
///
/// Cloning is cheap: clones share the same reference-counted state, and the producer is closed
/// when the last of them is dropped.
#[derive(Clone)]
#[must_use = "Producer will be closed on drop, make sure to keep it around for as long as needed"]
pub struct RegularProducer {
inner: Arc<Inner>,
}
impl fmt::Debug for RegularProducer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("RegularProducer")
.field("id", &self.inner.id)
.field("kind", &self.inner.kind)
.field("type", &self.inner.r#type)
.field("rtp_parameters", &self.inner.rtp_parameters)
.field(
"consumable_rtp_parameters",
&self.inner.consumable_rtp_parameters,
)
.field("paused", &self.inner.paused)
.field("score", &self.inner.score)
.field("transport", &self.inner.transport)
.field("closed", &self.inner.closed)
.finish()
}
}
impl From<RegularProducer> for Producer {
fn from(producer: RegularProducer) -> Self {
Producer::Regular(producer)
}
}
/// Producer handle built when the producer was created with `direct == true`.
///
/// In addition to the common [`Producer`] API it offers [`DirectProducer::send`].
/// Cloning is cheap: clones share the same reference-counted state, and the producer is closed
/// when the last of them is dropped.
#[derive(Clone)]
#[must_use = "Producer will be closed on drop, make sure to keep it around for as long as needed"]
pub struct DirectProducer {
inner: Arc<Inner>,
}
impl fmt::Debug for DirectProducer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("DirectProducer")
.field("id", &self.inner.id)
.field("kind", &self.inner.kind)
.field("type", &self.inner.r#type)
.field("rtp_parameters", &self.inner.rtp_parameters)
.field(
"consumable_rtp_parameters",
&self.inner.consumable_rtp_parameters,
)
.field("paused", &self.inner.paused)
.field("score", &self.inner.score)
.field("transport", &self.inner.transport)
.field("closed", &self.inner.closed)
.finish()
}
}
impl From<DirectProducer> for Producer {
fn from(producer: DirectProducer) -> Self {
Producer::Direct(producer)
}
}
/// Producer handle; either a [`RegularProducer`] or a [`DirectProducer`].
///
/// Both variants share the same kind of reference-counted state, so the common API is
/// implemented once on this enum. Cloning is cheap, and the producer is closed when the last
/// clone is dropped.
#[derive(Clone)]
#[non_exhaustive]
#[must_use = "Producer will be closed on drop, make sure to keep it around for as long as needed"]
pub enum Producer {
/// Producer created with `direct == false`.
Regular(RegularProducer),
/// Producer created with `direct == true`; additionally supports [`DirectProducer::send`].
Direct(DirectProducer),
}
impl fmt::Debug for Producer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match &self {
Producer::Regular(producer) => f.debug_tuple("Regular").field(&producer).finish(),
Producer::Direct(producer) => f.debug_tuple("Direct").field(&producer).finish(),
}
}
}
impl Producer {
    /// Builds the producer handle and connects it to its event sources.
    ///
    /// Two subscriptions are established before the shared state is assembled:
    /// * channel notifications addressed to `id` are parsed as JSON and dispatched to the
    ///   `score`, `video_orientation_change` and `trace` handlers (a score notification also
    ///   replaces the cached scores returned by [`Producer::score`]); notifications that fail
    ///   to parse are logged and dropped;
    /// * the transport's close event runs the `transport_close` handlers and then closes the
    ///   producer locally, without sending a close request.
    ///
    /// `direct` decides whether [`Producer::Direct`] or [`Producer::Regular`] is returned.
    #[allow(clippy::too_many_arguments)]
    pub(super) async fn new(
        id: ProducerId,
        kind: MediaKind,
        r#type: ProducerType,
        rtp_parameters: RtpParameters,
        consumable_rtp_parameters: RtpParameters,
        paused: bool,
        executor: Arc<Executor<'static>>,
        channel: Channel,
        payload_channel: PayloadChannel,
        app_data: AppData,
        transport: Arc<dyn Transport>,
        direct: bool,
    ) -> Self {
        debug!("new()");

        let handlers: Arc<Handlers> = Arc::default();
        let score: Arc<Mutex<Vec<ProducerScore>>> = Arc::default();

        let subscription_handler = {
            let handlers = Arc::clone(&handlers);
            let score = Arc::clone(&score);

            channel.subscribe_to_notifications(id.into(), move |notification| {
                let parsed = match serde_json::from_slice::<Notification>(notification) {
                    Ok(parsed) => parsed,
                    Err(error) => {
                        error!("Failed to parse notification: {}", error);
                        return;
                    }
                };

                match parsed {
                    Notification::Score(scores) => {
                        // Refresh the cache first so callbacks already see the new scores.
                        *score.lock() = scores.clone();
                        handlers.score.call(|callback| callback(&scores));
                    }
                    Notification::VideoOrientationChange(video_orientation) => {
                        handlers
                            .video_orientation_change
                            .call(|callback| callback(video_orientation));
                    }
                    Notification::Trace(trace_event_data) => {
                        handlers.trace.call_simple(&trace_event_data);
                    }
                }
            })
        };

        // The transport close callback has to reach `Inner`, which does not exist yet; this
        // slot is filled with a weak reference once `Inner` has been constructed.
        let inner_weak: Arc<Mutex<Option<Weak<Inner>>>> = Arc::default();
        let on_transport_close_handler = transport.on_close({
            let inner_weak = Arc::clone(&inner_weak);

            Box::new(move || {
                // Separate statement on purpose: the mutex guard is released before the
                // handlers run.
                let maybe_inner = inner_weak.lock().as_ref().and_then(Weak::upgrade);
                if let Some(inner) = maybe_inner {
                    inner.handlers.transport_close.call_simple();
                    // The transport is closing, so no close request is sent.
                    inner.close(false);
                }
            })
        });

        let inner = Arc::new(Inner {
            id,
            kind,
            r#type,
            rtp_parameters,
            consumable_rtp_parameters,
            direct,
            paused: AtomicBool::new(paused),
            score,
            executor,
            channel,
            payload_channel,
            handlers,
            app_data,
            transport,
            closed: AtomicBool::new(false),
            _subscription_handler: Mutex::new(subscription_handler),
            _on_transport_close_handler: Mutex::new(on_transport_close_handler),
        });

        inner_weak.lock().replace(Arc::downgrade(&inner));

        if direct {
            Self::Direct(DirectProducer { inner })
        } else {
            Self::Regular(RegularProducer { inner })
        }
    }

    /// Producer ID.
    #[must_use]
    pub fn id(&self) -> ProducerId {
        self.inner().id
    }

    /// Transport this producer belongs to.
    pub fn transport(&self) -> &Arc<dyn Transport> {
        &self.inner().transport
    }

    /// Media kind.
    #[must_use]
    pub fn kind(&self) -> MediaKind {
        self.inner().kind
    }

    /// RTP parameters of the producer.
    #[must_use]
    pub fn rtp_parameters(&self) -> &RtpParameters {
        &self.inner().rtp_parameters
    }

    /// Producer type.
    #[must_use]
    pub fn r#type(&self) -> ProducerType {
        self.inner().r#type
    }

    /// Whether the producer is currently paused, as tracked locally by
    /// [`Producer::pause`] / [`Producer::resume`].
    #[must_use]
    pub fn paused(&self) -> bool {
        self.inner().paused.load(Ordering::SeqCst)
    }

    /// Copy of the scores received with the most recent score notification; empty until the
    /// first one arrives.
    #[must_use]
    pub fn score(&self) -> Vec<ProducerScore> {
        let cached_scores = self.inner().score.lock();
        cached_scores.clone()
    }

    /// Custom application data.
    #[must_use]
    pub fn app_data(&self) -> &AppData {
        &self.inner().app_data
    }

    /// Whether the producer has been closed.
    #[must_use]
    pub fn closed(&self) -> bool {
        self.inner().closed.load(Ordering::SeqCst)
    }

    /// Sends a `ProducerDumpRequest` for this producer and returns the response.
    ///
    /// # Errors
    ///
    /// Returns the [`RequestError`] reported by the channel.
    #[doc(hidden)]
    pub async fn dump(&self) -> Result<ProducerDump, RequestError> {
        debug!("dump()");

        let request = ProducerDumpRequest {};
        self.inner().channel.request(self.id(), request).await
    }

    /// Sends a `ProducerGetStatsRequest` for this producer and returns the statistics entries.
    ///
    /// # Errors
    ///
    /// Returns the [`RequestError`] reported by the channel.
    pub async fn get_stats(&self) -> Result<Vec<ProducerStat>, RequestError> {
        debug!("get_stats()");

        let request = ProducerGetStatsRequest {};
        self.inner().channel.request(self.id(), request).await
    }

    /// Sends a `ProducerPauseRequest` and, on success, marks the producer as paused.
    ///
    /// Callbacks registered with [`Producer::on_pause`] run only if the producer was not
    /// already marked as paused.
    ///
    /// # Errors
    ///
    /// Returns the [`RequestError`] reported by the channel; the local state is left untouched
    /// in that case.
    pub async fn pause(&self) -> Result<(), RequestError> {
        debug!("pause()");

        let request = ProducerPauseRequest {};
        self.inner().channel.request(self.id(), request).await?;

        // `swap` returns the previous state, which tells whether this call changed anything.
        let previously_paused = self.inner().paused.swap(true, Ordering::SeqCst);
        if !previously_paused {
            self.inner().handlers.pause.call_simple();
        }

        Ok(())
    }

    /// Sends a `ProducerResumeRequest` and, on success, marks the producer as not paused.
    ///
    /// Callbacks registered with [`Producer::on_resume`] run only if the producer was marked
    /// as paused before.
    ///
    /// # Errors
    ///
    /// Returns the [`RequestError`] reported by the channel; the local state is left untouched
    /// in that case.
    pub async fn resume(&self) -> Result<(), RequestError> {
        debug!("resume()");

        let request = ProducerResumeRequest {};
        self.inner().channel.request(self.id(), request).await?;

        // `swap` returns the previous state, which tells whether this call changed anything.
        let previously_paused = self.inner().paused.swap(false, Ordering::SeqCst);
        if previously_paused {
            self.inner().handlers.resume.call_simple();
        }

        Ok(())
    }

    /// Sends a `ProducerEnableTraceEventRequest` carrying `types`.
    ///
    /// # Errors
    ///
    /// Returns the [`RequestError`] reported by the channel.
    pub async fn enable_trace_event(
        &self,
        types: Vec<ProducerTraceEventType>,
    ) -> Result<(), RequestError> {
        debug!("enable_trace_event()");

        let request = ProducerEnableTraceEventRequest { types };
        self.inner().channel.request(self.id(), request).await
    }

    /// Registers a callback invoked with the new scores on every score notification.
    pub fn on_score<F: Fn(&[ProducerScore]) + Send + Sync + 'static>(
        &self,
        callback: F,
    ) -> HandlerId {
        let handlers = &self.inner().handlers;
        handlers.score.add(Arc::new(callback))
    }

    /// Registers a callback invoked with the new orientation on every video orientation
    /// change notification.
    pub fn on_video_orientation_change<F: Fn(ProducerVideoOrientation) + Send + Sync + 'static>(
        &self,
        callback: F,
    ) -> HandlerId {
        let handlers = &self.inner().handlers;
        handlers.video_orientation_change.add(Arc::new(callback))
    }

    /// Registers a callback invoked when [`Producer::pause`] switches the producer to paused.
    pub fn on_pause<F: Fn() + Send + Sync + 'static>(&self, callback: F) -> HandlerId {
        let handlers = &self.inner().handlers;
        handlers.pause.add(Arc::new(callback))
    }

    /// Registers a callback invoked when [`Producer::resume`] switches the producer back from
    /// paused.
    pub fn on_resume<F: Fn() + Send + Sync + 'static>(&self, callback: F) -> HandlerId {
        let handlers = &self.inner().handlers;
        handlers.resume.add(Arc::new(callback))
    }

    /// Registers a callback invoked with the event data on every trace notification.
    pub fn on_trace<F: Fn(&ProducerTraceEventData) + Send + Sync + 'static>(
        &self,
        callback: F,
    ) -> HandlerId {
        let handlers = &self.inner().handlers;
        handlers.trace.add(Arc::new(callback))
    }

    /// Registers a one-shot callback invoked when the owning transport reports its close.
    pub fn on_transport_close<F: FnOnce() + Send + 'static>(&self, callback: F) -> HandlerId {
        let handlers = &self.inner().handlers;
        handlers.transport_close.add(Box::new(callback))
    }

    /// Registers a one-shot callback invoked when the producer is closed.
    ///
    /// If the producer is already closed at registration time, the pending close callbacks
    /// (including the one just added) are run right away.
    pub fn on_close<F: FnOnce() + Send + 'static>(&self, callback: F) -> HandlerId {
        let inner = self.inner();
        let handler_id = inner.handlers.close.add(Box::new(callback));

        if inner.closed.load(Ordering::Relaxed) {
            inner.handlers.close.call_simple();
        }

        handler_id
    }

    /// Consumable RTP parameters of the producer.
    #[doc(hidden)]
    #[must_use]
    pub fn consumable_rtp_parameters(&self) -> &RtpParameters {
        &self.inner().consumable_rtp_parameters
    }

    /// Closes the producer and sends the close request over the channel.
    pub(super) fn close(&self) {
        let send_close_request = true;
        self.inner().close(send_close_request);
    }

    /// Creates a [`WeakProducer`], which does not keep the producer alive.
    #[must_use]
    pub fn downgrade(&self) -> WeakProducer {
        let inner = Arc::downgrade(self.inner());
        WeakProducer { inner }
    }

    /// Shared state, regardless of the variant.
    fn inner(&self) -> &Arc<Inner> {
        match self {
            Self::Regular(RegularProducer { inner }) | Self::Direct(DirectProducer { inner }) => {
                inner
            }
        }
    }
}
impl DirectProducer {
    /// Hands `rtp_packet` to the payload channel as the payload of a
    /// `ProducerSendNotification` addressed to this producer.
    ///
    /// # Errors
    ///
    /// Returns the [`NotificationError`] reported by the payload channel.
    pub fn send(&self, rtp_packet: Vec<u8>) -> Result<(), NotificationError> {
        let inner = &self.inner;
        let notification = ProducerSendNotification {};

        inner
            .payload_channel
            .notify(inner.id, notification, rtp_packet)
    }
}
/// Wrapper around a [`Producer`] that runs a callback with (a clone of) the producer when the
/// wrapper is dropped, unless the producer was extracted with [`PipedProducer::into_inner`].
pub struct PipedProducer {
producer: Producer,
// `None` once the callback has been consumed (by `drop`) or discarded (by `into_inner`).
on_drop: Option<Box<dyn FnOnce(Producer) + Send + 'static>>,
}
impl fmt::Debug for PipedProducer {
    /// Prints the wrapped producer only; the drop callback is not printed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("PipedProducer");
        builder.field("producer", &self.producer);
        builder.finish()
    }
}
impl Drop for PipedProducer {
    /// Runs the drop callback with a clone of the producer, if the callback is still present.
    fn drop(&mut self) {
        // `take` leaves `None` behind, so the callback can run at most once.
        if let Some(callback) = self.on_drop.take() {
            let producer = self.producer.clone();
            callback(producer);
        }
    }
}
impl PipedProducer {
    /// Wraps `producer`; `on_drop` is called with a clone of it when the wrapper is dropped
    /// without [`PipedProducer::into_inner`] having been called.
    pub(crate) fn new<F: FnOnce(Producer) + Send + 'static>(
        producer: Producer,
        on_drop: F,
    ) -> Self {
        let on_drop: Box<dyn FnOnce(Producer) + Send + 'static> = Box::new(on_drop);

        Self {
            producer,
            on_drop: Some(on_drop),
        }
    }

    /// Extracts the producer and discards the drop callback without calling it.
    pub fn into_inner(mut self) -> Producer {
        // Clearing the slot first makes the `Drop` impl, which still runs for `self`, a no-op.
        self.on_drop = None;
        // `self` implements `Drop`, so the field cannot be moved out; a clone is returned.
        self.producer.clone()
    }
}
/// Weak counterpart of [`Producer`], created with [`Producer::downgrade`].
///
/// Holds only a weak reference to the shared state, so it does not keep the producer alive;
/// use [`WeakProducer::upgrade`] to get a [`Producer`] back while one still exists.
#[derive(Clone)]
pub struct WeakProducer {
inner: Weak<Inner>,
}
impl fmt::Debug for WeakProducer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// Only the type name is printed; the weak reference is not upgraded to inspect the producer.
f.debug_struct("WeakProducer").finish()
}
}
impl WeakProducer {
    /// Attempts to obtain a [`Producer`] again.
    ///
    /// Returns `None` when the shared state has already been dropped. Otherwise the variant is
    /// chosen from the `direct` flag stored in the shared state, matching what
    /// `Producer::new` returned originally.
    #[must_use]
    pub fn upgrade(&self) -> Option<Producer> {
        self.inner.upgrade().map(|inner| {
            if inner.direct {
                Producer::Direct(DirectProducer { inner })
            } else {
                Producer::Regular(RegularProducer { inner })
            }
        })
    }
}