use crate::{
api::{DataTrack, DataTrackFrame, DataTrackInfo, InternalError},
track::DataTrackInner,
};
use std::{fmt, marker::PhantomData, sync::Arc};
use thiserror::Error;
use tokio::sync::{mpsc, watch};
pub(crate) mod events;
pub(crate) mod manager;
pub(crate) mod proto;
mod packetizer;
mod pipeline;
pub type LocalDataTrack = DataTrack<Local>;
#[derive(Debug, Clone)]
pub struct Local;
impl DataTrack<Local> {
pub(crate) fn new(info: Arc<DataTrackInfo>, inner: LocalTrackInner) -> Self {
Self { info, inner: Arc::new(inner.into()), _location: PhantomData }
}
fn inner(&self) -> &LocalTrackInner {
match &*self.inner {
DataTrackInner::Local(track) => track,
DataTrackInner::Remote(_) => unreachable!(), }
}
}
impl DataTrack<Local> {
pub fn try_push(&self, frame: DataTrackFrame) -> Result<(), PushFrameError> {
match self.inner().publish_state() {
manager::PublishState::Republishing => {
return Err(PushFrameError::new(frame, PushFrameErrorReason::QueueFull))?
}
manager::PublishState::Unpublished => {
return Err(PushFrameError::new(frame, PushFrameErrorReason::TrackUnpublished))?;
}
manager::PublishState::Published => {}
}
self.inner()
.frame_tx
.try_send(frame)
.map_err(|err| PushFrameError::new(err.into_inner(), PushFrameErrorReason::QueueFull))
}
pub fn unpublish(&self) {
self.inner().local_unpublish();
}
}
#[derive(Debug, Clone)]
pub(crate) struct LocalTrackInner {
pub frame_tx: mpsc::Sender<DataTrackFrame>,
pub state_tx: watch::Sender<manager::PublishState>,
}
impl LocalTrackInner {
fn publish_state(&self) -> manager::PublishState {
*self.state_tx.borrow()
}
pub(crate) fn is_published(&self) -> bool {
self.publish_state() != manager::PublishState::Unpublished
}
pub(crate) async fn wait_for_unpublish(&self) {
_ = self
.state_tx
.subscribe()
.wait_for(|state| *state == manager::PublishState::Unpublished)
.await
}
fn local_unpublish(&self) {
_ = self.state_tx.send(manager::PublishState::Unpublished);
}
}
impl Drop for LocalTrackInner {
fn drop(&mut self) {
self.local_unpublish();
}
}
#[derive(Clone, Debug)]
pub struct DataTrackOptions {
pub(crate) name: String,
}
impl DataTrackOptions {
pub fn new(name: impl Into<String>) -> Self {
Self { name: name.into() }
}
}
impl From<String> for DataTrackOptions {
fn from(name: String) -> Self {
Self::new(name)
}
}
impl From<&str> for DataTrackOptions {
fn from(name: &str) -> Self {
Self::new(name.to_string())
}
}
#[derive(Debug, Error)]
pub enum PublishError {
#[error("Data track publishing unauthorized")]
NotAllowed,
#[error("Track name already taken")]
DuplicateName,
#[error("Track name invalid")]
InvalidName,
#[error("Publish data track timed-out")]
Timeout,
#[error("Data track publication limit reached")]
LimitReached,
#[error("Room disconnected")]
Disconnected,
#[error(transparent)]
Internal(#[from] InternalError),
}
#[derive(Debug, Error)]
#[error("Failed to publish frame: {reason}")]
pub struct PushFrameError {
frame: DataTrackFrame,
reason: PushFrameErrorReason,
}
impl PushFrameError {
pub(crate) fn new(frame: DataTrackFrame, reason: PushFrameErrorReason) -> Self {
Self { frame, reason }
}
pub fn reason(&self) -> PushFrameErrorReason {
self.reason
}
pub fn into_frame(self) -> DataTrackFrame {
self.frame
}
}
#[derive(Debug, Clone, Copy)]
pub enum PushFrameErrorReason {
TrackUnpublished,
QueueFull,
}
impl fmt::Display for PushFrameErrorReason {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::TrackUnpublished => write!(f, "track unpublished"),
Self::QueueFull => write!(f, "queue full"),
}
}
}