Skip to main content

livekit_datatrack/local/
mod.rs

1// Copyright 2025 LiveKit, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::{
16    api::{DataTrack, DataTrackFrame, DataTrackInfo, InternalError},
17    track::DataTrackInner,
18};
19use std::{fmt, marker::PhantomData, sync::Arc};
20use thiserror::Error;
21use tokio::sync::{mpsc, watch};
22
23pub(crate) mod events;
24pub(crate) mod manager;
25pub(crate) mod proto;
26
27mod packetizer;
28mod pipeline;
29
30/// Data track published by the local participant.
31pub type LocalDataTrack = DataTrack<Local>;
32
/// Type-state marker: a [`DataTrack`] tagged with this type belongs to the
/// local participant.
///
/// See also: [`LocalDataTrack`]
///
#[derive(Clone, Debug)]
pub struct Local;
39
40impl DataTrack<Local> {
41    pub(crate) fn new(info: Arc<DataTrackInfo>, inner: LocalTrackInner) -> Self {
42        Self { info, inner: Arc::new(inner.into()), _location: PhantomData }
43    }
44
45    fn inner(&self) -> &LocalTrackInner {
46        match &*self.inner {
47            DataTrackInner::Local(track) => track,
48            DataTrackInner::Remote(_) => unreachable!(), // Safe (type state)
49        }
50    }
51}
52
53impl DataTrack<Local> {
54    /// Try pushing a frame to subscribers of the track.
55    ///
56    /// # Example
57    ///
58    /// ```
59    /// # use livekit_datatrack::api::{LocalDataTrack, DataTrackFrame, PushFrameError};
60    /// # fn example(track: LocalDataTrack) -> Result<(), PushFrameError> {
61    /// fn read_sensor() -> Vec<u8> {
62    ///     // Read some sensor data...
63    ///     vec![0xFA; 16]
64    /// }
65    ///
66    /// let frame = read_sensor().into(); // Convert to frame
67    /// track.try_push(frame)?;
68    ///
69    /// # Ok(())
70    /// # }
71    /// ```
72    ///
73    /// See [`DataTrackFrame`] for more ways to construct a frame and how to attach metadata.
74    ///
75    /// # Errors
76    ///
77    /// Pushing a frame can fail for several reasons:
78    ///
79    /// - The track has been unpublished by the local participant or SFU
80    /// - The room is no longer connected
81    /// - Frames are being pushed too fast
82    ///
83    pub fn try_push(&self, frame: DataTrackFrame) -> Result<(), PushFrameError> {
84        match self.inner().publish_state() {
85            manager::PublishState::Republishing => {
86                return Err(PushFrameError::new(frame, PushFrameErrorReason::QueueFull))?
87            }
88            manager::PublishState::Unpublished => {
89                return Err(PushFrameError::new(frame, PushFrameErrorReason::TrackUnpublished))?;
90            }
91            manager::PublishState::Published => {}
92        }
93        self.inner()
94            .frame_tx
95            .try_send(frame)
96            .map_err(|err| PushFrameError::new(err.into_inner(), PushFrameErrorReason::QueueFull))
97    }
98
99    /// Unpublishes the track.
100    pub fn unpublish(&self) {
101        self.inner().local_unpublish();
102    }
103}
104
105#[derive(Debug, Clone)]
106pub(crate) struct LocalTrackInner {
107    pub frame_tx: mpsc::Sender<DataTrackFrame>,
108    pub state_tx: watch::Sender<manager::PublishState>,
109}
110
111impl LocalTrackInner {
112    fn publish_state(&self) -> manager::PublishState {
113        *self.state_tx.borrow()
114    }
115
116    pub(crate) fn is_published(&self) -> bool {
117        // Note: a track which is internally in the "resubscribing" state
118        // is still considered published from the public API perspective.
119        self.publish_state() != manager::PublishState::Unpublished
120    }
121
122    pub(crate) async fn wait_for_unpublish(&self) {
123        _ = self
124            .state_tx
125            .subscribe()
126            .wait_for(|state| *state == manager::PublishState::Unpublished)
127            .await
128    }
129
130    fn local_unpublish(&self) {
131        _ = self.state_tx.send(manager::PublishState::Unpublished);
132    }
133}
134
135impl Drop for LocalTrackInner {
136    fn drop(&mut self) {
137        // Implicit unpublish when handle dropped.
138        self.local_unpublish();
139    }
140}
141
/// Settings supplied when publishing a data track.
///
/// # Examples
///
/// Build options for a track named "my_track":
///
/// ```
/// # use livekit_datatrack::api::DataTrackOptions;
/// let options = DataTrackOptions::new("my_track");
/// ```
///
#[derive(Debug, Clone)]
pub struct DataTrackOptions {
    /// Name the track is published under.
    pub(crate) name: String,
}
157
158impl DataTrackOptions {
159    /// Creates options with the given track name.
160    ///
161    /// The track name is used to identify the track to other participants.
162    ///
163    /// # Requirements
164    /// - Must not be empty
165    /// - Must be unique per publisher
166    ///
167    pub fn new(name: impl Into<String>) -> Self {
168        Self { name: name.into() }
169    }
170}
171
172impl From<String> for DataTrackOptions {
173    fn from(name: String) -> Self {
174        Self::new(name)
175    }
176}
177
178impl From<&str> for DataTrackOptions {
179    fn from(name: &str) -> Self {
180        Self::new(name.to_string())
181    }
182}
183
184/// An error that can occur when publishing a data track.
185#[derive(Debug, Error)]
186pub enum PublishError {
187    /// Local participant does not have permission to publish data tracks.
188    ///
189    /// Ensure the participant's token contains the `canPublishData` grant.
190    ///
191    #[error("Data track publishing unauthorized")]
192    NotAllowed,
193
194    /// A track with the same name is already published by the local participant.
195    #[error("Track name already taken")]
196    DuplicateName,
197
198    /// The track name is invalid.
199    ///
200    /// This occurs when the name is empty or exceeds the allowed maximum length.
201    ///
202    #[error("Track name invalid")]
203    InvalidName,
204
205    /// Request to publish the track took long to complete.
206    #[error("Publish data track timed-out")]
207    Timeout,
208
209    /// No additional data tracks can be published by the local participant.
210    #[error("Data track publication limit reached")]
211    LimitReached,
212
213    /// Cannot publish data track when the room is disconnected.
214    #[error("Room disconnected")]
215    Disconnected,
216
217    /// Internal error, please report on GitHub.
218    #[error(transparent)]
219    Internal(#[from] InternalError),
220}
221
222/// Frame could not be pushed to a data track.
223#[derive(Debug, Error)]
224#[error("Failed to publish frame: {reason}")]
225pub struct PushFrameError {
226    frame: DataTrackFrame,
227    reason: PushFrameErrorReason,
228}
229
230impl PushFrameError {
231    pub(crate) fn new(frame: DataTrackFrame, reason: PushFrameErrorReason) -> Self {
232        Self { frame, reason }
233    }
234
235    /// Returns the reason the frame could not be pushed.
236    pub fn reason(&self) -> PushFrameErrorReason {
237        self.reason
238    }
239
240    /// Consumes the error and returns the frame that couldn't be pushed.
241    ///
242    /// This may be useful for implementing application-specific retry logic.
243    ///
244    pub fn into_frame(self) -> DataTrackFrame {
245        self.frame
246    }
247}
248
/// Reason why a data track frame could not be pushed.
///
/// Reasons can be compared with `==`, e.g. to decide whether a retry makes
/// sense.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PushFrameErrorReason {
    /// Track is no longer published.
    TrackUnpublished,
    /// Frame was dropped due to the pipeline queue being full.
    QueueFull,
}
257
258impl fmt::Display for PushFrameErrorReason {
259    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
260        match self {
261            Self::TrackUnpublished => write!(f, "track unpublished"),
262            Self::QueueFull => write!(f, "queue full"),
263        }
264    }
265}