livekit_datatrack/remote/
mod.rs1use crate::api::{DataTrack, DataTrackFrame, DataTrackInfo, DataTrackInner, InternalError};
16use events::{InputEvent, SubscribeRequest};
17use livekit_runtime::timeout;
18use std::{
19 marker::PhantomData,
20 pin::Pin,
21 sync::Arc,
22 task::{Context, Poll},
23 time::Duration,
24};
25use thiserror::Error;
26use tokio::sync::{mpsc, oneshot, watch};
27use tokio_stream::{wrappers::BroadcastStream, Stream};
28
29pub(crate) mod events;
30pub(crate) mod manager;
31pub(crate) mod proto;
32
33mod depacketizer;
34mod pipeline;
35
36pub type RemoteDataTrack = DataTrack<Remote>;
38
/// Zero-sized marker used as the type parameter of [`DataTrack`] for remote
/// tracks (see the [`RemoteDataTrack`] alias).
#[derive(Clone, Debug)]
pub struct Remote;
45
46impl DataTrack<Remote> {
47 pub(crate) fn new(info: Arc<DataTrackInfo>, inner: RemoteTrackInner) -> Self {
48 Self { info, inner: Arc::new(inner.into()), _location: PhantomData }
49 }
50
51 fn inner(&self) -> &RemoteTrackInner {
52 match &*self.inner {
53 DataTrackInner::Remote(inner) => inner,
54 DataTrackInner::Local(_) => unreachable!(), }
56 }
57}
58
59impl DataTrack<Remote> {
60 pub async fn subscribe(&self) -> Result<DataTrackStream, DataTrackSubscribeError> {
84 self.subscribe_with_options(DataTrackSubscribeOptions::default()).await
85 }
86
87 pub async fn subscribe_with_options(
93 &self,
94 options: DataTrackSubscribeOptions,
95 ) -> Result<DataTrackStream, DataTrackSubscribeError> {
96 let (result_tx, result_rx) = oneshot::channel();
97 let subscribe_event = SubscribeRequest { sid: self.info.sid(), options, result_tx };
98 self.inner()
99 .event_in_tx
100 .upgrade()
101 .ok_or(DataTrackSubscribeError::Disconnected)?
102 .send(subscribe_event.into())
103 .await
104 .map_err(|_| DataTrackSubscribeError::Disconnected)?;
105
106 let frame_rx = timeout(Duration::from_secs(10), result_rx)
108 .await
109 .map_err(|_| DataTrackSubscribeError::Timeout)?
110 .map_err(|_| DataTrackSubscribeError::Disconnected)??;
111
112 Ok(DataTrackStream { inner: BroadcastStream::new(frame_rx) })
113 }
114
115 pub fn publisher_identity(&self) -> &str {
117 &self.inner().publisher_identity
118 }
119}
120
121pub struct DataTrackStream {
123 inner: BroadcastStream<DataTrackFrame>,
124}
125
126impl Stream for DataTrackStream {
127 type Item = DataTrackFrame;
128
129 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
130 let this = self.get_mut();
131 loop {
132 match Pin::new(&mut this.inner).poll_next(cx) {
133 Poll::Ready(Some(Ok(frame))) => return Poll::Ready(Some(frame)),
134 Poll::Ready(Some(Err(_))) => continue,
135 Poll::Ready(None) => return Poll::Ready(None),
136 Poll::Pending => return Poll::Pending,
137 }
138 }
139 }
140}
141
142#[derive(Debug, Clone)]
143pub(crate) struct RemoteTrackInner {
144 publisher_identity: Arc<str>,
145 published_rx: watch::Receiver<bool>,
146 event_in_tx: mpsc::WeakSender<InputEvent>,
147}
148
149impl RemoteTrackInner {
150 pub(crate) fn is_published(&self) -> bool {
151 *self.published_rx.borrow()
152 }
153
154 pub(crate) async fn wait_for_unpublish(&self) {
155 let mut published_rx = self.published_rx.clone();
156 _ = published_rx.wait_for(|is_published| !*is_published).await
157 }
158}
159
160#[derive(Debug, Error)]
161pub enum DataTrackSubscribeError {
162 #[error("The track has been unpublished and is no longer available")]
163 Unpublished,
164 #[error("Request to subscribe to data track timed-out")]
165 Timeout,
166 #[error("Cannot subscribe to data track when disconnected")]
167 Disconnected,
168 #[error(transparent)]
169 Internal(#[from] InternalError),
170}
171
/// Options that control how a subscription to a remote data track is set up.
#[derive(Clone, Debug)]
pub struct DataTrackSubscribeOptions {
    /// Requested buffer size, in frames.
    buffer_size: usize,
}
190
191impl DataTrackSubscribeOptions {
192 pub fn new() -> Self {
197 Self { buffer_size: 16 }
198 }
199
200 pub fn buffer_size(&self) -> usize {
202 self.buffer_size
203 }
204
205 pub fn with_buffer_size(mut self, mut frames: usize) -> Self {
213 if frames == 0 {
214 log::warn!("Zero is not a valid buffer size, using one");
215 frames = 1;
216 }
217 self.buffer_size = frames;
218 self
219 }
220}
221
222impl Default for DataTrackSubscribeOptions {
223 fn default() -> Self {
224 Self::new()
225 }
226}