use crate::api::{DataTrack, DataTrackFrame, DataTrackInfo, DataTrackInner, InternalError};
use events::{InputEvent, SubscribeRequest};
use livekit_runtime::timeout;
use std::{
marker::PhantomData,
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::Duration,
};
use thiserror::Error;
use tokio::sync::{mpsc, oneshot, watch};
use tokio_stream::{wrappers::BroadcastStream, Stream};
pub(crate) mod events;
pub(crate) mod manager;
pub(crate) mod proto;
mod depacketizer;
mod pipeline;
pub type RemoteDataTrack = DataTrack<Remote>;
#[derive(Debug, Clone)]
pub struct Remote;
impl DataTrack<Remote> {
pub(crate) fn new(info: Arc<DataTrackInfo>, inner: RemoteTrackInner) -> Self {
Self { info, inner: Arc::new(inner.into()), _location: PhantomData }
}
fn inner(&self) -> &RemoteTrackInner {
match &*self.inner {
DataTrackInner::Remote(inner) => inner,
DataTrackInner::Local(_) => unreachable!(), }
}
}
impl DataTrack<Remote> {
pub async fn subscribe(&self) -> Result<DataTrackSubscription, DataTrackSubscribeError> {
self.subscribe_with_options(DataTrackSubscribeOptions::default()).await
}
pub async fn subscribe_with_options(
&self,
options: DataTrackSubscribeOptions,
) -> Result<DataTrackSubscription, DataTrackSubscribeError> {
let (result_tx, result_rx) = oneshot::channel();
let subscribe_event = SubscribeRequest { sid: self.info.sid(), options, result_tx };
self.inner()
.event_in_tx
.upgrade()
.ok_or(DataTrackSubscribeError::Disconnected)?
.send(subscribe_event.into())
.await
.map_err(|_| DataTrackSubscribeError::Disconnected)?;
let frame_rx = timeout(Duration::from_secs(10), result_rx)
.await
.map_err(|_| DataTrackSubscribeError::Timeout)?
.map_err(|_| DataTrackSubscribeError::Disconnected)??;
Ok(DataTrackSubscription { inner: BroadcastStream::new(frame_rx) })
}
pub fn publisher_identity(&self) -> &str {
&self.inner().publisher_identity
}
}
pub struct DataTrackSubscription {
inner: BroadcastStream<DataTrackFrame>,
}
impl Stream for DataTrackSubscription {
type Item = DataTrackFrame;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
loop {
match Pin::new(&mut this.inner).poll_next(cx) {
Poll::Ready(Some(Ok(frame))) => return Poll::Ready(Some(frame)),
Poll::Ready(Some(Err(_))) => continue,
Poll::Ready(None) => return Poll::Ready(None),
Poll::Pending => return Poll::Pending,
}
}
}
}
#[derive(Debug, Clone)]
pub(crate) struct RemoteTrackInner {
publisher_identity: Arc<str>,
published_rx: watch::Receiver<bool>,
event_in_tx: mpsc::WeakSender<InputEvent>,
}
impl RemoteTrackInner {
pub(crate) fn is_published(&self) -> bool {
*self.published_rx.borrow()
}
pub(crate) async fn wait_for_unpublish(&self) {
let mut published_rx = self.published_rx.clone();
_ = published_rx.wait_for(|is_published| !*is_published).await
}
}
#[derive(Debug, Error)]
pub enum DataTrackSubscribeError {
#[error("The track has been unpublished and is no longer available")]
Unpublished,
#[error("Request to subscribe to data track timed-out")]
Timeout,
#[error("Cannot subscribe to data track when disconnected")]
Disconnected,
#[error(transparent)]
Internal(#[from] InternalError),
}
#[derive(Debug, Clone)]
pub struct DataTrackSubscribeOptions {
buffer_size: usize,
}
impl DataTrackSubscribeOptions {
pub fn new() -> Self {
Self { buffer_size: 16 }
}
pub fn buffer_size(&self) -> usize {
self.buffer_size
}
pub fn with_buffer_size(mut self, mut frames: usize) -> Self {
if frames == 0 {
log::warn!("Zero is not a valid buffer size, using one");
frames = 1;
}
self.buffer_size = frames;
self
}
}
impl Default for DataTrackSubscribeOptions {
fn default() -> Self {
Self::new()
}
}