livekit-datatrack 0.1.1

Data track core for LiveKit
Documentation
// Copyright 2025 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::api::{DataTrack, DataTrackFrame, DataTrackInfo, DataTrackInner, InternalError};
use events::{InputEvent, SubscribeRequest};
use livekit_runtime::timeout;
use std::{
    marker::PhantomData,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll},
    time::Duration,
};
use thiserror::Error;
use tokio::sync::{mpsc, oneshot, watch};
use tokio_stream::{wrappers::BroadcastStream, Stream};

pub(crate) mod events;
pub(crate) mod manager;
pub(crate) mod proto;

mod depacketizer;
mod pipeline;

/// Data track published by a remote participant.
pub type RemoteDataTrack = DataTrack<Remote>;

/// Marker type indicating a [`DataTrack`] belongs to a remote participant.
///
/// See also: [`RemoteDataTrack`]
///
#[derive(Debug, Clone)]
pub struct Remote;

impl DataTrack<Remote> {
    pub(crate) fn new(info: Arc<DataTrackInfo>, inner: RemoteTrackInner) -> Self {
        Self { info, inner: Arc::new(inner.into()), _location: PhantomData }
    }

    fn inner(&self) -> &RemoteTrackInner {
        match &*self.inner {
            DataTrackInner::Remote(inner) => inner,
            DataTrackInner::Local(_) => unreachable!(), // Safe (type state)
        }
    }
}

impl DataTrack<Remote> {
    /// Subscribes to the data track.
    ///
    /// # Returns
    ///
    /// A stream that yields [`DataTrackFrame`]s as they arrive.
    ///
    /// # Options
    ///
    /// To set custom subscription options, see [`Self::subscribe_with_options`].
    ///
    /// # Multiple Subscriptions
    ///
    /// An application may call `subscribe` more than once to process frames in
    /// multiple places. For example, one async task might plot values on a graph
    /// while another writes them to a file.
    ///
    /// Internally, only the first call to `subscribe` communicates with the SFU and
    /// allocates the resources required to receive frames. Additional subscriptions
    /// reuse the same underlying pipeline and do not trigger additional signaling.
    ///
    /// Note that newly created subscriptions only receive frames published after
    /// the initial subscription is established.
    ///
    pub async fn subscribe(&self) -> Result<DataTrackSubscription, DataTrackSubscribeError> {
        self.subscribe_with_options(DataTrackSubscribeOptions::default()).await
    }

    /// Subscribes to the data track, specifying custom options.
    ///
    /// Same usage and return as [`Self::subscribe`] with an additional argument
    /// to specify options.
    ///
    pub async fn subscribe_with_options(
        &self,
        options: DataTrackSubscribeOptions,
    ) -> Result<DataTrackSubscription, DataTrackSubscribeError> {
        let (result_tx, result_rx) = oneshot::channel();
        let subscribe_event = SubscribeRequest { sid: self.info.sid(), options, result_tx };
        self.inner()
            .event_in_tx
            .upgrade()
            .ok_or(DataTrackSubscribeError::Disconnected)?
            .send(subscribe_event.into())
            .await
            .map_err(|_| DataTrackSubscribeError::Disconnected)?;

        // TODO: standardize timeout
        let frame_rx = timeout(Duration::from_secs(10), result_rx)
            .await
            .map_err(|_| DataTrackSubscribeError::Timeout)?
            .map_err(|_| DataTrackSubscribeError::Disconnected)??;

        Ok(DataTrackSubscription { inner: BroadcastStream::new(frame_rx) })
    }

    /// Identity of the participant who published the track.
    pub fn publisher_identity(&self) -> &str {
        &self.inner().publisher_identity
    }
}

/// A stream of [`DataTrackFrame`]s received from a [`RemoteDataTrack`].
pub struct DataTrackSubscription {
    inner: BroadcastStream<DataTrackFrame>,
}

impl Stream for DataTrackSubscription {
    type Item = DataTrackFrame;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        loop {
            match Pin::new(&mut this.inner).poll_next(cx) {
                Poll::Ready(Some(Ok(frame))) => return Poll::Ready(Some(frame)),
                Poll::Ready(Some(Err(_))) => continue,
                Poll::Ready(None) => return Poll::Ready(None),
                Poll::Pending => return Poll::Pending,
            }
        }
    }
}

#[derive(Debug, Clone)]
pub(crate) struct RemoteTrackInner {
    publisher_identity: Arc<str>,
    published_rx: watch::Receiver<bool>,
    event_in_tx: mpsc::WeakSender<InputEvent>,
}

impl RemoteTrackInner {
    pub(crate) fn is_published(&self) -> bool {
        *self.published_rx.borrow()
    }

    pub(crate) async fn wait_for_unpublish(&self) {
        let mut published_rx = self.published_rx.clone();
        _ = published_rx.wait_for(|is_published| !*is_published).await
    }
}

#[derive(Debug, Error)]
pub enum DataTrackSubscribeError {
    #[error("The track has been unpublished and is no longer available")]
    Unpublished,
    #[error("Request to subscribe to data track timed-out")]
    Timeout,
    #[error("Cannot subscribe to data track when disconnected")]
    Disconnected,
    #[error(transparent)]
    Internal(#[from] InternalError),
}

/// Options for subscribing to a data track.
///
/// # Examples
///
/// Specify a custom buffer size:
///
/// ```
/// # use livekit_datatrack::api::DataTrackSubscribeOptions;
/// let options = DataTrackSubscribeOptions::default()
///     .with_buffer_size(128); // Buffer 128 frames internally
///
/// # assert_eq!(options.buffer_size(), 128);
/// ```
///
#[derive(Debug, Clone)]
pub struct DataTrackSubscribeOptions {
    buffer_size: usize,
}

impl DataTrackSubscribeOptions {
    /// Creates subscribe options with default values.
    ///
    /// Equivalent to [`Self::default`].
    ///
    pub fn new() -> Self {
        Self { buffer_size: 16 }
    }

    /// Returns the maximum number of received frames buffered internally.
    pub fn buffer_size(&self) -> usize {
        self.buffer_size
    }

    /// Sets the maximum number of received frames buffered internally.
    ///
    /// Zero is not a valid buffer size; if a value of zero is provided, it will be clamped to one.
    ///
    /// Note: if there is already an active subscription for a given track, specifying a
    /// different buffer size when obtaining a new subscription will have no effect.
    ///
    pub fn with_buffer_size(mut self, mut frames: usize) -> Self {
        if frames == 0 {
            log::warn!("Zero is not a valid buffer size, using one");
            frames = 1;
        }
        self.buffer_size = frames;
        self
    }
}

impl Default for DataTrackSubscribeOptions {
    fn default() -> Self {
        Self::new()
    }
}