#[cfg(test)]
mod tests;
use crate::data_structures::{AppData, WebRtcMessage};
use crate::messages::{
DataProducerCloseRequest, DataProducerDumpRequest, DataProducerGetStatsRequest,
DataProducerSendNotification,
};
use crate::sctp_parameters::SctpStreamParameters;
use crate::transport::Transport;
use crate::uuid_based_wrapper_type;
use crate::worker::{Channel, NotificationError, PayloadChannel, RequestError};
use async_executor::Executor;
use event_listener_primitives::{BagOnce, HandlerId};
use log::{debug, error};
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::fmt::Debug;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Weak};
uuid_based_wrapper_type!(
DataProducerId
);
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct DataProducerOptions {
pub(super) id: Option<DataProducerId>,
pub(super) sctp_stream_parameters: Option<SctpStreamParameters>,
pub label: String,
pub protocol: String,
pub app_data: AppData,
}
impl DataProducerOptions {
#[must_use]
pub(super) fn new_pipe_transport(
data_producer_id: DataProducerId,
sctp_stream_parameters: SctpStreamParameters,
) -> Self {
Self {
id: Some(data_producer_id),
sctp_stream_parameters: Some(sctp_stream_parameters),
label: "".to_string(),
protocol: "".to_string(),
app_data: AppData::default(),
}
}
#[must_use]
pub fn new_sctp(sctp_stream_parameters: SctpStreamParameters) -> Self {
Self {
id: None,
sctp_stream_parameters: Some(sctp_stream_parameters),
label: "".to_string(),
protocol: "".to_string(),
app_data: AppData::default(),
}
}
#[must_use]
pub fn new_direct() -> Self {
Self {
id: None,
sctp_stream_parameters: None,
label: "".to_string(),
protocol: "".to_string(),
app_data: AppData::default(),
}
}
}
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum DataProducerType {
Sctp,
Direct,
}
#[derive(Debug, Clone, Eq, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
#[doc(hidden)]
#[non_exhaustive]
pub struct DataProducerDump {
pub id: DataProducerId,
pub r#type: DataProducerType,
pub label: String,
pub protocol: String,
pub sctp_stream_parameters: Option<SctpStreamParameters>,
}
#[derive(Debug, Clone, Eq, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
#[allow(missing_docs)]
pub struct DataProducerStat {
pub timestamp: u64,
pub label: String,
pub protocol: String,
pub messages_received: usize,
pub bytes_received: usize,
}
#[derive(Default)]
#[allow(clippy::type_complexity)]
struct Handlers {
transport_close: BagOnce<Box<dyn FnOnce() + Send>>,
close: BagOnce<Box<dyn FnOnce() + Send>>,
}
struct Inner {
id: DataProducerId,
r#type: DataProducerType,
sctp_stream_parameters: Option<SctpStreamParameters>,
label: String,
protocol: String,
direct: bool,
executor: Arc<Executor<'static>>,
channel: Channel,
payload_channel: PayloadChannel,
handlers: Arc<Handlers>,
app_data: AppData,
transport: Arc<dyn Transport>,
closed: AtomicBool,
_on_transport_close_handler: Mutex<HandlerId>,
}
impl Drop for Inner {
fn drop(&mut self) {
debug!("drop()");
self.close(true);
}
}
impl Inner {
fn close(&self, close_request: bool) {
if !self.closed.swap(true, Ordering::SeqCst) {
debug!("close()");
self.handlers.close.call_simple();
if close_request {
let channel = self.channel.clone();
let transport_id = self.transport.id();
let request = DataProducerCloseRequest {
data_producer_id: self.id,
};
self.executor
.spawn(async move {
if let Err(error) = channel.request(transport_id, request).await {
error!("data producer closing failed on drop: {}", error);
}
})
.detach();
}
}
}
}
#[derive(Clone)]
#[must_use = "Data producer will be closed on drop, make sure to keep it around for as long as needed"]
pub struct RegularDataProducer {
inner: Arc<Inner>,
}
impl fmt::Debug for RegularDataProducer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("RegularDataProducer")
.field("id", &self.inner.id)
.field("type", &self.inner.r#type)
.field("sctp_stream_parameters", &self.inner.sctp_stream_parameters)
.field("label", &self.inner.label)
.field("protocol", &self.inner.protocol)
.field("transport", &self.inner.transport)
.field("closed", &self.inner.closed)
.finish()
}
}
impl From<RegularDataProducer> for DataProducer {
fn from(producer: RegularDataProducer) -> Self {
DataProducer::Regular(producer)
}
}
#[derive(Clone)]
#[must_use = "Data producer will be closed on drop, make sure to keep it around for as long as needed"]
pub struct DirectDataProducer {
inner: Arc<Inner>,
}
impl fmt::Debug for DirectDataProducer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("DirectDataProducer")
.field("id", &self.inner.id)
.field("type", &self.inner.r#type)
.field("sctp_stream_parameters", &self.inner.sctp_stream_parameters)
.field("label", &self.inner.label)
.field("protocol", &self.inner.protocol)
.field("transport", &self.inner.transport)
.field("closed", &self.inner.closed)
.finish()
}
}
impl From<DirectDataProducer> for DataProducer {
fn from(producer: DirectDataProducer) -> Self {
DataProducer::Direct(producer)
}
}
#[derive(Clone)]
#[non_exhaustive]
#[must_use = "Data producer will be closed on drop, make sure to keep it around for as long as needed"]
pub enum DataProducer {
Regular(RegularDataProducer),
Direct(DirectDataProducer),
}
impl fmt::Debug for DataProducer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match &self {
DataProducer::Regular(producer) => f.debug_tuple("Regular").field(&producer).finish(),
DataProducer::Direct(producer) => f.debug_tuple("Direct").field(&producer).finish(),
}
}
}
impl DataProducer {
#[allow(clippy::too_many_arguments)]
pub(super) fn new(
id: DataProducerId,
r#type: DataProducerType,
sctp_stream_parameters: Option<SctpStreamParameters>,
label: String,
protocol: String,
executor: Arc<Executor<'static>>,
channel: Channel,
payload_channel: PayloadChannel,
app_data: AppData,
transport: Arc<dyn Transport>,
direct: bool,
) -> Self {
debug!("new()");
let handlers = Arc::<Handlers>::default();
let inner_weak = Arc::<Mutex<Option<Weak<Inner>>>>::default();
let on_transport_close_handler = transport.on_close({
let inner_weak = Arc::clone(&inner_weak);
Box::new(move || {
let maybe_inner = inner_weak.lock().as_ref().and_then(Weak::upgrade);
if let Some(inner) = maybe_inner {
inner.handlers.transport_close.call_simple();
inner.close(false);
}
})
});
let inner = Arc::new(Inner {
id,
r#type,
sctp_stream_parameters,
label,
protocol,
direct,
executor,
channel,
payload_channel,
handlers,
app_data,
transport,
closed: AtomicBool::new(false),
_on_transport_close_handler: Mutex::new(on_transport_close_handler),
});
inner_weak.lock().replace(Arc::downgrade(&inner));
if direct {
Self::Direct(DirectDataProducer { inner })
} else {
Self::Regular(RegularDataProducer { inner })
}
}
#[must_use]
pub fn id(&self) -> DataProducerId {
self.inner().id
}
pub fn transport(&self) -> &Arc<dyn Transport> {
&self.inner().transport
}
#[must_use]
pub fn r#type(&self) -> DataProducerType {
self.inner().r#type
}
#[must_use]
pub fn sctp_stream_parameters(&self) -> Option<SctpStreamParameters> {
self.inner().sctp_stream_parameters
}
#[must_use]
pub fn label(&self) -> &String {
&self.inner().label
}
#[must_use]
pub fn protocol(&self) -> &String {
&self.inner().protocol
}
#[must_use]
pub fn app_data(&self) -> &AppData {
&self.inner().app_data
}
#[must_use]
pub fn closed(&self) -> bool {
self.inner().closed.load(Ordering::SeqCst)
}
#[doc(hidden)]
pub async fn dump(&self) -> Result<DataProducerDump, RequestError> {
debug!("dump()");
self.inner()
.channel
.request(self.id(), DataProducerDumpRequest {})
.await
}
pub async fn get_stats(&self) -> Result<Vec<DataProducerStat>, RequestError> {
debug!("get_stats()");
self.inner()
.channel
.request(self.id(), DataProducerGetStatsRequest {})
.await
}
pub fn on_transport_close<F: FnOnce() + Send + 'static>(&self, callback: F) -> HandlerId {
self.inner()
.handlers
.transport_close
.add(Box::new(callback))
}
pub fn on_close<F: FnOnce() + Send + 'static>(&self, callback: F) -> HandlerId {
let handler_id = self.inner().handlers.close.add(Box::new(callback));
if self.inner().closed.load(Ordering::Relaxed) {
self.inner().handlers.close.call_simple();
}
handler_id
}
pub(super) fn close(&self) {
self.inner().close(true);
}
#[must_use]
pub fn downgrade(&self) -> WeakDataProducer {
WeakDataProducer {
inner: Arc::downgrade(self.inner()),
}
}
fn inner(&self) -> &Arc<Inner> {
match self {
DataProducer::Regular(data_producer) => &data_producer.inner,
DataProducer::Direct(data_producer) => &data_producer.inner,
}
}
}
impl DirectDataProducer {
pub fn send(&self, message: WebRtcMessage<'_>) -> Result<(), NotificationError> {
let (ppid, payload) = message.into_ppid_and_payload();
self.inner.payload_channel.notify(
self.inner.id,
DataProducerSendNotification { ppid },
payload.into_owned(),
)
}
}
pub struct NonClosingDataProducer {
data_producer: DataProducer,
on_drop: Option<Box<dyn FnOnce(DataProducer) + Send + 'static>>,
}
impl fmt::Debug for NonClosingDataProducer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("NonClosingDataProducer")
.field("data_producer", &self.data_producer)
.finish()
}
}
impl Drop for NonClosingDataProducer {
fn drop(&mut self) {
if let Some(on_drop) = self.on_drop.take() {
on_drop(self.data_producer.clone())
}
}
}
impl NonClosingDataProducer {
pub(crate) fn new<F: FnOnce(DataProducer) + Send + 'static>(
data_producer: DataProducer,
on_drop: F,
) -> Self {
Self {
data_producer,
on_drop: Some(Box::new(on_drop)),
}
}
pub fn into_inner(mut self) -> DataProducer {
self.on_drop.take();
self.data_producer.clone()
}
}
#[derive(Clone)]
pub struct WeakDataProducer {
inner: Weak<Inner>,
}
impl fmt::Debug for WeakDataProducer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("WeakDataProducer").finish()
}
}
impl WeakDataProducer {
#[must_use]
pub fn upgrade(&self) -> Option<DataProducer> {
let inner = self.inner.upgrade()?;
let data_producer = if inner.direct {
DataProducer::Direct(DirectDataProducer { inner })
} else {
DataProducer::Regular(RegularDataProducer { inner })
};
Some(data_producer)
}
}