use crate::data_structures::{AppData, EventDirection, RtpType};
use crate::messages::{
ConsumerCloseRequest, ConsumerDumpRequest, ConsumerEnableTraceEventRequest,
ConsumerEnableTraceEventRequestData, ConsumerGetStatsRequest, ConsumerInternal,
ConsumerPauseRequest, ConsumerRequestKeyFrameRequest, ConsumerResumeRequest,
ConsumerSetPreferredLayersRequest, ConsumerSetPriorityRequest, ConsumerSetPriorityRequestData,
};
use crate::producer::{ProducerId, ProducerStat, ProducerType};
use crate::rtp_parameters::{MediaKind, MimeType, RtpCapabilities, RtpParameters};
use crate::transport::Transport;
use crate::uuid_based_wrapper_type;
use crate::worker::{Channel, RequestError, SubscriptionHandler};
use async_executor::Executor;
use log::*;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::mem;
use std::sync::{Arc, Mutex};
// Declares the `ConsumerId` identifier type through the crate's
// `uuid_based_wrapper_type!` macro (presumably a UUID-backed newtype, going by
// the macro's name; its definition is not in this file).
uuid_based_wrapper_type!(ConsumerId);
/// Spatial/temporal layer pair of a consumer.
///
/// Used in this file for the preferred layers (`ConsumerOptions`,
/// `Consumer::set_preferred_layers`) and for the current layers delivered by
/// `LayersChange` notifications. Serialized with camelCase keys
/// (`spatialLayer`, `temporalLayer`).
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ConsumerLayers {
/// Spatial layer value.
pub spatial_layer: u8,
/// Temporal layer value; may be absent.
pub temporal_layer: Option<u8>,
}
/// Score report of a consumer.
///
/// Returned by `Consumer::score` and passed to callbacks registered with
/// `Consumer::connect_score`. Serialized with camelCase keys (`score`,
/// `producerScore`, `producerScores`).
///
/// The fields are public so that code receiving a score can actually read it;
/// the type has no accessor methods.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ConsumerScore {
    /// Score reported for the consumer itself.
    pub score: u8,
    /// Score reported for the associated producer.
    pub producer_score: u8,
    /// List of producer scores as reported (one entry per producer stream,
    /// presumably — confirm against the worker documentation).
    pub producer_scores: Vec<u8>,
}
/// Options describing a consumer (the code that consumes these options lives
/// outside this file).
///
/// The struct is `#[non_exhaustive]`, so other crates cannot build it with a
/// struct literal: start from [`ConsumerOptions::new`] and then assign the
/// public fields that need non-default values.
#[derive(Debug)]
#[non_exhaustive]
pub struct ConsumerOptions {
/// Id of the producer this consumer refers to.
pub producer_id: ProducerId,
/// RTP capabilities supplied by the caller.
pub rtp_capabilities: RtpCapabilities,
/// Paused flag; `ConsumerOptions::new` sets it to `false`.
pub paused: bool,
/// Preferred layers; `ConsumerOptions::new` sets it to `None`.
pub preferred_layers: Option<ConsumerLayers>,
/// Application data; `ConsumerOptions::new` sets it to `AppData::default()`.
pub app_data: AppData,
}
impl ConsumerOptions {
    /// Builds options from the two required values.
    ///
    /// Every other field gets its default: not paused, no preferred layers,
    /// and default application data.
    pub fn new(producer_id: ProducerId, rtp_capabilities: RtpCapabilities) -> Self {
        let paused = false;
        let preferred_layers = None;
        let app_data = AppData::default();

        Self {
            producer_id,
            rtp_capabilities,
            paused,
            preferred_layers,
            app_data,
        }
    }
}
/// Parameters of the RTP stream embedded in [`RtpStream`].
///
/// Hidden from the public docs; (de)serialized with camelCase keys. The
/// meaning of individual fields is not established in this file — they mirror
/// the keys of the JSON this struct is deserialized from.
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
#[doc(hidden)]
pub struct RtpStreamParams {
clock_rate: u32,
cname: String,
encoding_idx: usize,
mime_type: MimeType,
payload_type: u8,
spatial_layers: u8,
ssrc: u32,
temporal_layers: u8,
use_dtx: bool,
use_in_band_fec: bool,
use_nack: bool,
use_pli: bool,
// The three fields below are optional in the incoming data.
rid: Option<String>,
rtc_ssrc: Option<u32>,
rtc_payload_type: Option<u8>,
}
/// RTP stream entry of a [`ConsumerDump`]: its parameters plus a score.
///
/// Hidden from the public docs; (de)serialized with camelCase keys.
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
#[doc(hidden)]
pub struct RtpStream {
params: RtpStreamParams,
score: u8,
}
/// RTX parameters nested in [`ConsumableRtpEncoding`]; only an optional SSRC.
///
/// Hidden from the public docs; (de)serialized with camelCase keys.
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
#[doc(hidden)]
pub struct RtpRtxParameters {
ssrc: Option<u32>,
}
/// One entry of `ConsumerDump::consumable_rtp_encodings`.
///
/// Every field is optional. Hidden from the public docs; (de)serialized with
/// camelCase keys (for example `codecPayloadType`, `maxBitrate`).
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
#[doc(hidden)]
pub struct ConsumableRtpEncoding {
ssrc: Option<u32>,
rid: Option<String>,
codec_payload_type: Option<u8>,
rtx: Option<RtpRtxParameters>,
max_bitrate: Option<u32>,
max_framerate: Option<f64>,
dtx: Option<bool>,
scalability_mode: Option<String>,
spatial_layers: Option<u8>,
temporal_layers: Option<u8>,
ksvc: Option<bool>,
}
/// Response type of `Consumer::dump`.
///
/// Hidden from the public docs; (de)serialized with camelCase keys.
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
#[doc(hidden)]
pub struct ConsumerDump {
pub id: ConsumerId,
pub kind: MediaKind,
pub paused: bool,
pub priority: u8,
pub producer_id: ProducerId,
pub producer_paused: bool,
pub rtp_parameters: RtpParameters,
pub supported_codec_payload_types: Vec<u8>,
// Kept as a raw string; its internal format is not parsed in this file.
pub trace_event_types: String,
pub r#type: ConsumerType,
pub consumable_rtp_encodings: Vec<ConsumableRtpEncoding>,
pub rtp_stream: RtpStream,
// Layer information is optional in the dump.
pub preferred_spatial_layer: Option<u8>,
pub target_spatial_layer: Option<u8>,
pub current_spatial_layer: Option<u8>,
pub preferred_temporal_layer: Option<u8>,
pub target_temporal_layer: Option<u8>,
pub current_temporal_layer: Option<u8>,
}
/// Kind of consumer.
///
/// Serialized as the lowercase variant name (`simple`, `simulcast`, `svc`,
/// `pipe`). The first three variants have a `ProducerType` counterpart (see
/// the `From<ProducerType>` impl); `Pipe` does not.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum ConsumerType {
Simple,
Simulcast,
SVC,
Pipe,
}
impl From<ProducerType> for ConsumerType {
    /// Maps each producer type to the consumer type of the same name.
    ///
    /// `ConsumerType::Pipe` has no producer counterpart and is never produced
    /// by this conversion.
    fn from(producer_type: ProducerType) -> Self {
        match producer_type {
            ProducerType::Simple => Self::Simple,
            ProducerType::Simulcast => Self::Simulcast,
            ProducerType::SVC => Self::SVC,
        }
    }
}
/// Statistics entry of a consumer, as carried by [`ConsumerStats`].
///
/// (De)serialized with camelCase keys (for example `rtxSsrc`, `packetsLost`).
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ConsumerStat {
pub r#type: RtpType,
// NOTE(review): trace events in this file carry `timestamp: u64`, while this
// one is `u32`; deserialization fails for values above `u32::MAX` — confirm
// the range the worker actually sends.
pub timestamp: u32,
pub ssrc: u32,
pub rtx_ssrc: Option<u32>,
// Plain string here, unlike `ConsumerDump::kind`, which is a `MediaKind`.
pub kind: String,
pub mime_type: MimeType,
pub packets_lost: u32,
pub fraction_lost: u8,
pub packets_discarded: usize,
pub packets_retransmitted: usize,
pub packets_repaired: usize,
pub nack_count: usize,
pub nack_packet_count: usize,
pub pli_count: usize,
pub fir_count: usize,
pub score: u8,
pub packet_count: usize,
pub byte_count: usize,
pub bitrate: u32,
// Optional in the incoming data.
pub round_trip_time: Option<u32>,
}
/// Response type of `Consumer::get_stats`.
///
/// The enum is untagged and both variants wrap a tuple, so the variant is
/// picked by the shape of the incoming sequence: one element yields
/// `JustConsumer`, two elements yield `WithProducer`. Variants are tried in
/// declaration order.
#[derive(Debug, Deserialize, Serialize)]
#[serde(untagged)]
pub enum ConsumerStats {
/// Only the consumer's own stat is present.
JustConsumer((ConsumerStat,)),
/// The consumer's stat followed by a producer stat.
WithProducer((ConsumerStat, ProducerStat)),
}
/// Payload of a `trace` notification, handed to callbacks registered with
/// `Consumer::connect_trace`.
///
/// Internally tagged: the `type` key holds the lowercase variant name (`rtp`,
/// `keyframe`, `nack`, `pli`, `fir`). All variants carry the same three
/// fields; `info` is kept as untyped JSON.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(tag = "type", rename_all = "lowercase")]
pub enum ConsumerTraceEventData {
RTP {
timestamp: u64,
direction: EventDirection,
info: Value,
},
KeyFrame {
timestamp: u64,
direction: EventDirection,
info: Value,
},
NACK {
timestamp: u64,
direction: EventDirection,
info: Value,
},
PLI {
timestamp: u64,
direction: EventDirection,
info: Value,
},
FIR {
timestamp: u64,
direction: EventDirection,
info: Value,
},
}
/// Trace event kinds accepted by `Consumer::enable_trace_event`.
///
/// Serialized as the lowercase variant name; the variants line up one-to-one
/// with those of [`ConsumerTraceEventData`].
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum ConsumerTraceEventType {
RTP,
KeyFrame,
NACK,
PLI,
FIR,
}
/// Notifications parsed by the subscription callback installed in
/// `Consumer::new`.
///
/// Adjacently tagged: the `event` key holds the lowercase variant name (for
/// example `producerclose`, `layerschange`) and the `data` key holds the
/// payload of variants that have one.
#[derive(Debug, Deserialize)]
#[serde(tag = "event", rename_all = "lowercase", content = "data")]
enum Notification {
// Currently handled as a no-op by `Consumer::new`.
ProducerClose,
ProducerPause,
ProducerResume,
Score(ConsumerScore),
LayersChange(ConsumerLayers),
Trace(ConsumerTraceEventData),
}
/// Callback lists of a consumer, one mutex-guarded list per event.
///
/// Lists are appended to by the `Consumer::connect_*` methods. The list's
/// mutex is held while its callbacks run, so a callback must not register
/// another callback for the same event.
#[derive(Default)]
struct Handlers {
// Run by `Consumer::pause` and by the `ProducerPause` notification.
pause: Mutex<Vec<Box<dyn Fn() + Send>>>,
// Run by `Consumer::resume` and by the `ProducerResume` notification.
resume: Mutex<Vec<Box<dyn Fn() + Send>>>,
// Run for every `Score` notification.
score: Mutex<Vec<Box<dyn Fn(&ConsumerScore) + Send>>>,
// Run for every `LayersChange` notification.
layers_change: Mutex<Vec<Box<dyn Fn(&ConsumerLayers) + Send>>>,
// Run for every `Trace` notification.
trace: Mutex<Vec<Box<dyn Fn(&ConsumerTraceEventData) + Send>>>,
// `FnOnce`: each callback is consumed when `Inner` is dropped.
closed: Mutex<Vec<Box<dyn FnOnce() + Send>>>,
}
/// Shared state behind every clone of a [`Consumer`].
///
/// Fields wrapped in `Arc` are also captured by the notification callback
/// installed in `Consumer::new`, which updates them as notifications arrive.
struct Inner {
id: ConsumerId,
producer_id: ProducerId,
kind: MediaKind,
r#type: ConsumerType,
rtp_parameters: RtpParameters,
// Set by `Consumer::pause` / `Consumer::resume`.
paused: Arc<Mutex<bool>>,
// Used by `Drop` to spawn the close request.
executor: Arc<Executor<'static>>,
channel: Channel,
// Stored but not used anywhere in this file.
payload_channel: Channel,
// Set by `ProducerPause` / `ProducerResume` notifications.
producer_paused: Arc<Mutex<bool>>,
// Starts at 1; replaced by the value returned from priority requests.
priority: Mutex<u8>,
// Replaced on every `Score` notification.
score: Arc<Mutex<ConsumerScore>>,
preferred_layers: Mutex<Option<ConsumerLayers>>,
// `None` until the first `LayersChange` notification.
current_layers: Arc<Mutex<Option<ConsumerLayers>>>,
handlers: Arc<Handlers>,
app_data: AppData,
// Supplies the router and transport ids used in requests.
transport: Box<dyn Transport>,
// Never read; held so the subscription lives as long as `Inner`
// (presumably dropping it unsubscribes — confirm in `worker`).
_subscription_handler: SubscriptionHandler,
}
impl Drop for Inner {
    /// Runs the `closed` callbacks and then asks the worker to close the
    /// consumer.
    ///
    /// The close request is sent from a detached task on the executor, so
    /// `drop` does not wait for it; a failed request is only logged.
    fn drop(&mut self) {
        debug!("drop()");

        // Take the callbacks out of the list so the mutex is released before
        // any of them runs.
        let closed_callbacks: Vec<_> = {
            let mut closed = self.handlers.closed.lock().unwrap();
            let taken = mem::take(&mut *closed);
            taken
        };
        closed_callbacks.into_iter().for_each(|callback| callback());

        let internal = ConsumerInternal {
            router_id: self.transport.router_id(),
            transport_id: self.transport.id(),
            consumer_id: self.id,
            producer_id: self.producer_id,
        };
        let close_request = ConsumerCloseRequest { internal };
        let channel = self.channel.clone();

        let task = self.executor.spawn(async move {
            if let Err(error) = channel.request(close_request).await {
                error!("consumer closing failed on drop: {}", error);
            }
        });
        task.detach();
    }
}
/// Handle to a consumer.
///
/// Cloning is cheap: clones share one reference-counted `Inner`. When the last
/// clone is dropped, `Inner`'s `Drop` impl runs the `closed` callbacks and
/// sends the close request.
#[derive(Clone)]
pub struct Consumer {
inner: Arc<Inner>,
}
impl Consumer {
/// Creates a consumer handle and subscribes it to the notifications addressed
/// to `id` on `channel`.
///
/// The subscription callback keeps the shared state up to date:
/// `ProducerPause` / `ProducerResume` update the producer-paused flag and run
/// the pause/resume callbacks on an effective state change, `Score` replaces
/// the stored score, `LayersChange` replaces the current layers, and `Trace`
/// is only forwarded to callbacks. Priority starts at 1 and current layers at
/// `None`.
///
/// # Panics
///
/// Panics if `subscribe_to_notifications` fails, because its result is
/// unwrapped.
pub(super) async fn new(
    id: ConsumerId,
    producer_id: ProducerId,
    kind: MediaKind,
    r#type: ConsumerType,
    rtp_parameters: RtpParameters,
    paused: bool,
    executor: Arc<Executor<'static>>,
    channel: Channel,
    payload_channel: Channel,
    producer_paused: bool,
    score: ConsumerScore,
    preferred_layers: Option<ConsumerLayers>,
    app_data: AppData,
    transport: Box<dyn Transport>,
) -> Self {
    debug!("new()");

    let handlers = Arc::<Handlers>::default();
    let score = Arc::new(Mutex::new(score));
    let paused = Arc::new(Mutex::new(paused));
    let producer_paused = Arc::new(Mutex::new(producer_paused));
    let current_layers = Arc::<Mutex<Option<ConsumerLayers>>>::default();

    let subscription_handler = {
        // Clones moved into the notification callback; the originals end up
        // in `Inner` below.
        let handlers = Arc::clone(&handlers);
        let paused = Arc::clone(&paused);
        let producer_paused = Arc::clone(&producer_paused);
        let score = Arc::clone(&score);
        let current_layers = Arc::clone(&current_layers);

        channel
            .subscribe_to_notifications(id.to_string(), move |notification| {
                match serde_json::from_value::<Notification>(notification) {
                    Ok(notification) => match notification {
                        Notification::ProducerClose => {
                            // No state is updated and no callback is run for
                            // this event.
                        }
                        Notification::ProducerPause => {
                            // The state locks (`producer_paused`, then
                            // `paused`) are released before callbacks run, so
                            // a callback may query the consumer's state.
                            let was_paused = {
                                let mut producer_paused = producer_paused.lock().unwrap();
                                let was_paused = *paused.lock().unwrap() || *producer_paused;
                                *producer_paused = true;
                                was_paused
                            };

                            if !was_paused {
                                for callback in handlers.pause.lock().unwrap().iter() {
                                    callback();
                                }
                            }
                        }
                        Notification::ProducerResume => {
                            // Only notify when the consumer was paused and is
                            // not being kept paused by its own flag.
                            let resumed = {
                                let mut producer_paused = producer_paused.lock().unwrap();
                                let paused = *paused.lock().unwrap();
                                let was_paused = paused || *producer_paused;
                                *producer_paused = false;
                                was_paused && !paused
                            };

                            if resumed {
                                for callback in handlers.resume.lock().unwrap().iter() {
                                    callback();
                                }
                            }
                        }
                        Notification::Score(consumer_score) => {
                            *score.lock().unwrap() = consumer_score.clone();

                            for callback in handlers.score.lock().unwrap().iter() {
                                callback(&consumer_score);
                            }
                        }
                        Notification::LayersChange(consumer_layers) => {
                            // `ConsumerLayers` is `Copy`; no clone needed.
                            *current_layers.lock().unwrap() = Some(consumer_layers);

                            for callback in handlers.layers_change.lock().unwrap().iter() {
                                callback(&consumer_layers);
                            }
                        }
                        Notification::Trace(trace_event_data) => {
                            for callback in handlers.trace.lock().unwrap().iter() {
                                callback(&trace_event_data);
                            }
                        }
                    },
                    Err(error) => {
                        error!("Failed to parse notification: {}", error);
                    }
                }
            })
            .await
            .unwrap()
    };

    let inner = Arc::new(Inner {
        id,
        producer_id,
        kind,
        r#type,
        rtp_parameters,
        paused,
        producer_paused,
        priority: Mutex::new(1u8),
        score,
        preferred_layers: Mutex::new(preferred_layers),
        current_layers,
        executor,
        channel,
        payload_channel,
        handlers,
        app_data,
        transport,
        _subscription_handler: subscription_handler,
    });

    Self { inner }
}
/// Consumer id.
pub fn id(&self) -> ConsumerId {
    let inner = &*self.inner;
    inner.id
}
/// Id of the producer associated with this consumer.
pub fn producer_id(&self) -> ProducerId {
    let inner = &*self.inner;
    inner.producer_id
}
/// Media kind of this consumer.
pub fn kind(&self) -> MediaKind {
    let inner = &*self.inner;
    inner.kind
}
pub fn rtp_parameters(&self) -> &RtpParameters {
&self.inner.rtp_parameters
}
/// Consumer type.
pub fn r#type(&self) -> ConsumerType {
    let inner = &*self.inner;
    inner.r#type
}
/// Whether the consumer's own paused flag is set (initial value or the last
/// successful `pause()` / `resume()`), regardless of the producer's state.
///
/// # Panics
///
/// Panics if the state mutex is poisoned.
pub fn paused(&self) -> bool {
    let paused = self.inner.paused.lock().unwrap();
    *paused
}
/// Whether the associated producer is paused, according to the initial value
/// and the producer pause/resume notifications received so far.
///
/// # Panics
///
/// Panics if the state mutex is poisoned.
pub fn producer_paused(&self) -> bool {
    let producer_paused = self.inner.producer_paused.lock().unwrap();
    *producer_paused
}
/// Current priority: 1 initially, afterwards the value returned by the last
/// successful `set_priority()` / `unset_priority()`.
///
/// # Panics
///
/// Panics if the state mutex is poisoned.
pub fn priority(&self) -> u8 {
    let priority = self.inner.priority.lock().unwrap();
    *priority
}
/// Copy of the most recently stored score (the initial one, or the one from
/// the last `Score` notification).
///
/// # Panics
///
/// Panics if the state mutex is poisoned.
pub fn score(&self) -> ConsumerScore {
    let score = self.inner.score.lock().unwrap();
    score.clone()
}
/// Preferred layers: the value given at creation, or the one returned by the
/// last successful `set_preferred_layers()`.
///
/// # Panics
///
/// Panics if the state mutex is poisoned.
pub fn preferred_layers(&self) -> Option<ConsumerLayers> {
    // `Option<ConsumerLayers>` is `Copy`, so dereference instead of cloning.
    *self.inner.preferred_layers.lock().unwrap()
}
/// Current layers: `None` until a `LayersChange` notification arrives, then
/// the layers from the most recent one.
///
/// # Panics
///
/// Panics if the state mutex is poisoned.
pub fn current_layers(&self) -> Option<ConsumerLayers> {
    // `Option<ConsumerLayers>` is `Copy`, so dereference instead of cloning.
    *self.inner.current_layers.lock().unwrap()
}
pub fn app_data(&self) -> &AppData {
&self.inner.app_data
}
/// Requests a dump of the consumer over the channel.
///
/// # Errors
///
/// Returns the `RequestError` produced by the channel request.
#[doc(hidden)]
pub async fn dump(&self) -> Result<ConsumerDump, RequestError> {
    debug!("dump()");

    let request = ConsumerDumpRequest {
        internal: self.get_internal(),
    };
    self.inner.channel.request(request).await
}
/// Requests the consumer's statistics over the channel.
///
/// # Errors
///
/// Returns the `RequestError` produced by the channel request.
pub async fn get_stats(&self) -> Result<ConsumerStats, RequestError> {
    debug!("get_stats()");

    let request = ConsumerGetStatsRequest {
        internal: self.get_internal(),
    };
    self.inner.channel.request(request).await
}
/// Pauses the consumer.
///
/// Sends the pause request and, once it succeeds, sets the consumer's own
/// paused flag. The `pause` callbacks run only if the consumer was not already
/// paused, either by itself or through its producer.
///
/// # Errors
///
/// Returns the `RequestError` produced by the channel request; the paused flag
/// is left untouched in that case.
///
/// # Panics
///
/// Panics if a state mutex is poisoned.
pub async fn pause(&self) -> Result<(), RequestError> {
    debug!("pause()");

    self.inner
        .channel
        .request(ConsumerPauseRequest {
            internal: self.get_internal(),
        })
        .await?;

    let was_paused = {
        // Take `producer_paused` before `paused`, the same order the
        // notification handler uses, so the two cannot deadlock each other.
        let producer_paused = self.inner.producer_paused.lock().unwrap();
        let mut paused = self.inner.paused.lock().unwrap();
        let was_paused = *paused || *producer_paused;
        *paused = true;
        was_paused
    };

    // Both state locks are released here, so a callback may query
    // `paused()` / `producer_paused()` without blocking on itself.
    if !was_paused {
        for callback in self.inner.handlers.pause.lock().unwrap().iter() {
            callback();
        }
    }

    Ok(())
}
/// Resumes the consumer.
///
/// Sends the resume request and, once it succeeds, clears the consumer's own
/// paused flag. The `resume` callbacks run only if the consumer was paused
/// before and the producer is not paused; while the producer stays paused the
/// consumer is still effectively paused, and the callbacks run later when the
/// `ProducerResume` notification arrives.
///
/// # Errors
///
/// Returns the `RequestError` produced by the channel request; the paused flag
/// is left untouched in that case.
///
/// # Panics
///
/// Panics if a state mutex is poisoned.
pub async fn resume(&self) -> Result<(), RequestError> {
    debug!("resume()");

    self.inner
        .channel
        .request(ConsumerResumeRequest {
            internal: self.get_internal(),
        })
        .await?;

    let resumed = {
        // Take `producer_paused` before `paused`, the same order the
        // notification handler uses, so the two cannot deadlock each other.
        let producer_paused = self.inner.producer_paused.lock().unwrap();
        let mut paused = self.inner.paused.lock().unwrap();
        let was_paused = *paused || *producer_paused;
        *paused = false;
        // Mirrors the `ProducerResume` handler, which requires `!paused`.
        was_paused && !*producer_paused
    };

    // Both state locks are released here, so a callback may query
    // `paused()` / `producer_paused()` without blocking on itself.
    if resumed {
        for callback in self.inner.handlers.resume.lock().unwrap().iter() {
            callback();
        }
    }

    Ok(())
}
/// Asks the worker to use `consumer_layers` as the preferred layers.
///
/// On success the locally stored preferred layers are replaced by the value
/// the worker returned, which is what `preferred_layers()` reports afterwards.
///
/// # Errors
///
/// Returns the `RequestError` produced by the channel request; the stored
/// value is left untouched in that case.
pub async fn set_preferred_layers(
    &self,
    consumer_layers: ConsumerLayers,
) -> Result<(), RequestError> {
    debug!("set_preferred_layers()");

    let request = ConsumerSetPreferredLayersRequest {
        internal: self.get_internal(),
        data: consumer_layers,
    };
    let applied_layers = self.inner.channel.request(request).await?;

    let mut preferred_layers = self.inner.preferred_layers.lock().unwrap();
    *preferred_layers = applied_layers;

    Ok(())
}
/// Asks the worker to set the consumer's priority.
///
/// On success the locally stored priority is replaced by the value the worker
/// returned, which is what `priority()` reports afterwards.
///
/// # Errors
///
/// Returns the `RequestError` produced by the channel request; the stored
/// priority is left untouched in that case.
pub async fn set_priority(&self, priority: u8) -> Result<(), RequestError> {
    // The log line previously named `set_preferred_layers()` by mistake.
    debug!("set_priority()");

    let result = self
        .inner
        .channel
        .request(ConsumerSetPriorityRequest {
            internal: self.get_internal(),
            data: ConsumerSetPriorityRequestData { priority },
        })
        .await?;

    *self.inner.priority.lock().unwrap() = result.priority;

    Ok(())
}
/// Resets the priority by requesting priority 1, the value a consumer starts
/// with.
///
/// On success the locally stored priority is replaced by the value the worker
/// returned.
///
/// # Errors
///
/// Returns the `RequestError` produced by the channel request; the stored
/// priority is left untouched in that case.
pub async fn unset_priority(&self) -> Result<(), RequestError> {
    debug!("unset_priority()");

    let request = ConsumerSetPriorityRequest {
        internal: self.get_internal(),
        data: ConsumerSetPriorityRequestData { priority: 1 },
    };
    let response = self.inner.channel.request(request).await?;

    let mut stored_priority = self.inner.priority.lock().unwrap();
    *stored_priority = response.priority;

    Ok(())
}
/// Sends a key frame request for this consumer over the channel.
///
/// # Errors
///
/// Returns the `RequestError` produced by the channel request.
pub async fn request_key_frame(&self) -> Result<(), RequestError> {
    debug!("request_key_frame()");

    let request = ConsumerRequestKeyFrameRequest {
        internal: self.get_internal(),
    };
    self.inner.channel.request(request).await
}
/// Sends the list of trace event types to enable for this consumer.
///
/// Trace events arrive as `Trace` notifications and are delivered to
/// callbacks registered with `connect_trace`.
///
/// # Errors
///
/// Returns the `RequestError` produced by the channel request.
pub async fn enable_trace_event(
    &self,
    types: Vec<ConsumerTraceEventType>,
) -> Result<(), RequestError> {
    debug!("enable_trace_event()");

    let internal = self.get_internal();
    let data = ConsumerEnableTraceEventRequestData { types };
    let request = ConsumerEnableTraceEventRequest { internal, data };

    self.inner.channel.request(request).await
}
/// Registers a callback for the pause event (raised by `pause()` and by
/// producer-pause notifications).
///
/// Callbacks cannot be removed and run in registration order.
pub fn connect_pause<F: Fn() + Send + 'static>(&self, callback: F) {
    let mut pause_handlers = self.inner.handlers.pause.lock().unwrap();
    pause_handlers.push(Box::new(callback));
}
/// Registers a callback for the resume event (raised by `resume()` and by
/// producer-resume notifications).
///
/// Callbacks cannot be removed and run in registration order.
pub fn connect_resume<F: Fn() + Send + 'static>(&self, callback: F) {
    let mut resume_handlers = self.inner.handlers.resume.lock().unwrap();
    resume_handlers.push(Box::new(callback));
}
/// Registers a callback that receives the new score on every `Score`
/// notification.
///
/// Callbacks cannot be removed and run in registration order.
pub fn connect_score<F: Fn(&ConsumerScore) + Send + 'static>(&self, callback: F) {
    let mut score_handlers = self.inner.handlers.score.lock().unwrap();
    score_handlers.push(Box::new(callback));
}
/// Registers a callback that receives the new layers on every `LayersChange`
/// notification.
///
/// Callbacks cannot be removed and run in registration order.
pub fn connect_layers_change<F: Fn(&ConsumerLayers) + Send + 'static>(&self, callback: F) {
    let mut layers_change_handlers = self.inner.handlers.layers_change.lock().unwrap();
    layers_change_handlers.push(Box::new(callback));
}
/// Registers a callback that receives the event data of every `Trace`
/// notification.
///
/// Callbacks cannot be removed and run in registration order.
pub fn connect_trace<F: Fn(&ConsumerTraceEventData) + Send + 'static>(&self, callback: F) {
    let mut trace_handlers = self.inner.handlers.trace.lock().unwrap();
    trace_handlers.push(Box::new(callback));
}
/// Registers a callback that runs once when the shared consumer state is
/// dropped, i.e. after the last clone of this `Consumer` goes away.
///
/// Callbacks cannot be removed and run in registration order.
pub fn connect_closed<F: FnOnce() + Send + 'static>(&self, callback: F) {
    let mut closed_handlers = self.inner.handlers.closed.lock().unwrap();
    closed_handlers.push(Box::new(callback));
}
/// Builds the routing ids (router, transport, consumer, producer) attached to
/// every request sent for this consumer.
fn get_internal(&self) -> ConsumerInternal {
    let inner = &*self.inner;
    let transport = &inner.transport;

    ConsumerInternal {
        router_id: transport.router_id(),
        transport_id: transport.id(),
        consumer_id: inner.id,
        producer_id: inner.producer_id,
    }
}
}