Skip to main content

livekit_datatrack/remote/
mod.rs

1// Copyright 2025 LiveKit, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::api::{DataTrack, DataTrackFrame, DataTrackInfo, DataTrackInner, InternalError};
16use events::{InputEvent, SubscribeRequest};
17use livekit_runtime::timeout;
18use std::{
19    marker::PhantomData,
20    pin::Pin,
21    sync::Arc,
22    task::{Context, Poll},
23    time::Duration,
24};
25use thiserror::Error;
26use tokio::sync::{mpsc, oneshot, watch};
27use tokio_stream::{wrappers::BroadcastStream, Stream};
28
29pub(crate) mod events;
30pub(crate) mod manager;
31pub(crate) mod proto;
32
33mod depacketizer;
34mod pipeline;
35
36/// Data track published by a remote participant.
37pub type RemoteDataTrack = DataTrack<Remote>;
38
39/// Marker type indicating a [`DataTrack`] belongs to a remote participant.
40///
41/// See also: [`RemoteDataTrack`]
42///
43#[derive(Debug, Clone)]
44pub struct Remote;
45
46impl DataTrack<Remote> {
47    pub(crate) fn new(info: Arc<DataTrackInfo>, inner: RemoteTrackInner) -> Self {
48        Self { info, inner: Arc::new(inner.into()), _location: PhantomData }
49    }
50
51    fn inner(&self) -> &RemoteTrackInner {
52        match &*self.inner {
53            DataTrackInner::Remote(inner) => inner,
54            DataTrackInner::Local(_) => unreachable!(), // Safe (type state)
55        }
56    }
57}
58
59impl DataTrack<Remote> {
60    /// Subscribes to the data track.
61    ///
62    /// # Returns
63    ///
64    /// A stream that yields [`DataTrackFrame`]s as they arrive.
65    ///
66    /// # Options
67    ///
68    /// To set custom subscription options, see [`Self::subscribe_with_options`].
69    ///
70    /// # Multiple Subscriptions
71    ///
72    /// An application may call `subscribe` more than once to process frames in
73    /// multiple places. For example, one async task might plot values on a graph
74    /// while another writes them to a file.
75    ///
76    /// Internally, only the first call to `subscribe` communicates with the SFU and
77    /// allocates the resources required to receive frames. Additional subscriptions
78    /// reuse the same underlying pipeline and do not trigger additional signaling.
79    ///
80    /// Note that newly created subscriptions only receive frames published after
81    /// the initial subscription is established.
82    ///
83    pub async fn subscribe(&self) -> Result<DataTrackStream, DataTrackSubscribeError> {
84        self.subscribe_with_options(DataTrackSubscribeOptions::default()).await
85    }
86
87    /// Subscribes to the data track, specifying custom options.
88    ///
89    /// Same usage and return as [`Self::subscribe`] with an additional argument
90    /// to specify options.
91    ///
92    pub async fn subscribe_with_options(
93        &self,
94        options: DataTrackSubscribeOptions,
95    ) -> Result<DataTrackStream, DataTrackSubscribeError> {
96        let (result_tx, result_rx) = oneshot::channel();
97        let subscribe_event = SubscribeRequest { sid: self.info.sid(), options, result_tx };
98        self.inner()
99            .event_in_tx
100            .upgrade()
101            .ok_or(DataTrackSubscribeError::Disconnected)?
102            .send(subscribe_event.into())
103            .await
104            .map_err(|_| DataTrackSubscribeError::Disconnected)?;
105
106        // TODO: standardize timeout
107        let frame_rx = timeout(Duration::from_secs(10), result_rx)
108            .await
109            .map_err(|_| DataTrackSubscribeError::Timeout)?
110            .map_err(|_| DataTrackSubscribeError::Disconnected)??;
111
112        Ok(DataTrackStream { inner: BroadcastStream::new(frame_rx) })
113    }
114
115    /// Identity of the participant who published the track.
116    pub fn publisher_identity(&self) -> &str {
117        &self.inner().publisher_identity
118    }
119}
120
121/// A stream of [`DataTrackFrame`]s received from a [`RemoteDataTrack`].
122pub struct DataTrackStream {
123    inner: BroadcastStream<DataTrackFrame>,
124}
125
126impl Stream for DataTrackStream {
127    type Item = DataTrackFrame;
128
129    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
130        let this = self.get_mut();
131        loop {
132            match Pin::new(&mut this.inner).poll_next(cx) {
133                Poll::Ready(Some(Ok(frame))) => return Poll::Ready(Some(frame)),
134                Poll::Ready(Some(Err(_))) => continue,
135                Poll::Ready(None) => return Poll::Ready(None),
136                Poll::Pending => return Poll::Pending,
137            }
138        }
139    }
140}
141
142#[derive(Debug, Clone)]
143pub(crate) struct RemoteTrackInner {
144    publisher_identity: Arc<str>,
145    published_rx: watch::Receiver<bool>,
146    event_in_tx: mpsc::WeakSender<InputEvent>,
147}
148
149impl RemoteTrackInner {
150    pub(crate) fn is_published(&self) -> bool {
151        *self.published_rx.borrow()
152    }
153
154    pub(crate) async fn wait_for_unpublish(&self) {
155        let mut published_rx = self.published_rx.clone();
156        _ = published_rx.wait_for(|is_published| !*is_published).await
157    }
158}
159
160#[derive(Debug, Error)]
161pub enum DataTrackSubscribeError {
162    #[error("The track has been unpublished and is no longer available")]
163    Unpublished,
164    #[error("Request to subscribe to data track timed-out")]
165    Timeout,
166    #[error("Cannot subscribe to data track when disconnected")]
167    Disconnected,
168    #[error(transparent)]
169    Internal(#[from] InternalError),
170}
171
172/// Options for subscribing to a data track.
173///
174/// # Examples
175///
176/// Specify a custom buffer size:
177///
178/// ```
179/// # use livekit_datatrack::api::DataTrackSubscribeOptions;
180/// let options = DataTrackSubscribeOptions::default()
181///     .with_buffer_size(128); // Buffer 128 frames internally
182///
183/// # assert_eq!(options.buffer_size(), 128);
184/// ```
185///
186#[derive(Debug, Clone)]
187pub struct DataTrackSubscribeOptions {
188    buffer_size: usize,
189}
190
191impl DataTrackSubscribeOptions {
192    /// Creates subscribe options with default values.
193    ///
194    /// Equivalent to [`Self::default`].
195    ///
196    pub fn new() -> Self {
197        Self { buffer_size: 16 }
198    }
199
200    /// Returns the maximum number of received frames buffered internally.
201    pub fn buffer_size(&self) -> usize {
202        self.buffer_size
203    }
204
205    /// Sets the maximum number of received frames buffered internally.
206    ///
207    /// Zero is not a valid buffer size; if a value of zero is provided, it will be clamped to one.
208    ///
209    /// Note: if there is already an active subscription for a given track, specifying a
210    /// different buffer size when obtaining a new subscription will have no effect.
211    ///
212    pub fn with_buffer_size(mut self, mut frames: usize) -> Self {
213        if frames == 0 {
214            log::warn!("Zero is not a valid buffer size, using one");
215            frames = 1;
216        }
217        self.buffer_size = frames;
218        self
219    }
220}
221
222impl Default for DataTrackSubscribeOptions {
223    fn default() -> Self {
224        Self::new()
225    }
226}