livekit_datatrack/local/mod.rs
1// Copyright 2025 LiveKit, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::{
16 api::{DataTrack, DataTrackFrame, DataTrackInfo, InternalError},
17 track::DataTrackInner,
18};
19use std::{fmt, marker::PhantomData, sync::Arc};
20use thiserror::Error;
21use tokio::sync::{mpsc, watch};
22
23pub(crate) mod events;
24pub(crate) mod manager;
25pub(crate) mod proto;
26
27mod packetizer;
28mod pipeline;
29
30/// Data track published by the local participant.
31pub type LocalDataTrack = DataTrack<Local>;
32
/// Type-level marker selecting the local-participant flavor of [`DataTrack`].
///
/// The type carries no data; it is only used as the type parameter of
/// [`DataTrack`]. The alias [`LocalDataTrack`] names `DataTrack<Local>`.
#[derive(Clone, Debug)]
pub struct Local;
39
40impl DataTrack<Local> {
41 pub(crate) fn new(info: Arc<DataTrackInfo>, inner: LocalTrackInner) -> Self {
42 Self { info, inner: Arc::new(inner.into()), _location: PhantomData }
43 }
44
45 fn inner(&self) -> &LocalTrackInner {
46 match &*self.inner {
47 DataTrackInner::Local(track) => track,
48 DataTrackInner::Remote(_) => unreachable!(), // Safe (type state)
49 }
50 }
51}
52
53impl DataTrack<Local> {
54 /// Try pushing a frame to subscribers of the track.
55 ///
56 /// # Example
57 ///
58 /// ```
59 /// # use livekit_datatrack::api::{LocalDataTrack, DataTrackFrame, PushFrameError};
60 /// # fn example(track: LocalDataTrack) -> Result<(), PushFrameError> {
61 /// fn read_sensor() -> Vec<u8> {
62 /// // Read some sensor data...
63 /// vec![0xFA; 16]
64 /// }
65 ///
66 /// let frame = read_sensor().into(); // Convert to frame
67 /// track.try_push(frame)?;
68 ///
69 /// # Ok(())
70 /// # }
71 /// ```
72 ///
73 /// See [`DataTrackFrame`] for more ways to construct a frame and how to attach metadata.
74 ///
75 /// # Errors
76 ///
77 /// Pushing a frame can fail for several reasons:
78 ///
79 /// - The track has been unpublished by the local participant or SFU
80 /// - The room is no longer connected
81 /// - Frames are being pushed too fast
82 ///
83 pub fn try_push(&self, frame: DataTrackFrame) -> Result<(), PushFrameError> {
84 match self.inner().publish_state() {
85 manager::PublishState::Republishing => {
86 return Err(PushFrameError::new(frame, PushFrameErrorReason::QueueFull))?
87 }
88 manager::PublishState::Unpublished => {
89 return Err(PushFrameError::new(frame, PushFrameErrorReason::TrackUnpublished))?;
90 }
91 manager::PublishState::Published => {}
92 }
93 self.inner()
94 .frame_tx
95 .try_send(frame)
96 .map_err(|err| PushFrameError::new(err.into_inner(), PushFrameErrorReason::QueueFull))
97 }
98
99 /// Unpublishes the track.
100 pub fn unpublish(&self) {
101 self.inner().local_unpublish();
102 }
103}
104
105#[derive(Debug, Clone)]
106pub(crate) struct LocalTrackInner {
107 pub frame_tx: mpsc::Sender<DataTrackFrame>,
108 pub state_tx: watch::Sender<manager::PublishState>,
109}
110
111impl LocalTrackInner {
112 fn publish_state(&self) -> manager::PublishState {
113 *self.state_tx.borrow()
114 }
115
116 pub(crate) fn is_published(&self) -> bool {
117 // Note: a track which is internally in the "resubscribing" state
118 // is still considered published from the public API perspective.
119 self.publish_state() != manager::PublishState::Unpublished
120 }
121
122 pub(crate) async fn wait_for_unpublish(&self) {
123 _ = self
124 .state_tx
125 .subscribe()
126 .wait_for(|state| *state == manager::PublishState::Unpublished)
127 .await
128 }
129
130 fn local_unpublish(&self) {
131 _ = self.state_tx.send(manager::PublishState::Unpublished);
132 }
133}
134
135impl Drop for LocalTrackInner {
136 fn drop(&mut self) {
137 // Implicit unpublish when handle dropped.
138 self.local_unpublish();
139 }
140}
141
/// Settings used when publishing a data track.
///
/// # Examples
///
/// Build options for a track called "my_track":
///
/// ```
/// # use livekit_datatrack::api::DataTrackOptions;
/// let options = DataTrackOptions::new("my_track");
/// ```
///
#[derive(Debug, Clone)]
pub struct DataTrackOptions {
    /// Name the track is published under.
    pub(crate) name: String,
}
157
158impl DataTrackOptions {
159 /// Creates options with the given track name.
160 ///
161 /// The track name is used to identify the track to other participants.
162 ///
163 /// # Requirements
164 /// - Must not be empty
165 /// - Must be unique per publisher
166 ///
167 pub fn new(name: impl Into<String>) -> Self {
168 Self { name: name.into() }
169 }
170}
171
172impl From<String> for DataTrackOptions {
173 fn from(name: String) -> Self {
174 Self::new(name)
175 }
176}
177
178impl From<&str> for DataTrackOptions {
179 fn from(name: &str) -> Self {
180 Self::new(name.to_string())
181 }
182}
183
184/// An error that can occur when publishing a data track.
185#[derive(Debug, Error)]
186pub enum PublishError {
187 /// Local participant does not have permission to publish data tracks.
188 ///
189 /// Ensure the participant's token contains the `canPublishData` grant.
190 ///
191 #[error("Data track publishing unauthorized")]
192 NotAllowed,
193
194 /// A track with the same name is already published by the local participant.
195 #[error("Track name already taken")]
196 DuplicateName,
197
198 /// The track name is invalid.
199 ///
200 /// This occurs when the name is empty or exceeds the allowed maximum length.
201 ///
202 #[error("Track name invalid")]
203 InvalidName,
204
205 /// Request to publish the track took long to complete.
206 #[error("Publish data track timed-out")]
207 Timeout,
208
209 /// No additional data tracks can be published by the local participant.
210 #[error("Data track publication limit reached")]
211 LimitReached,
212
213 /// Cannot publish data track when the room is disconnected.
214 #[error("Room disconnected")]
215 Disconnected,
216
217 /// Internal error, please report on GitHub.
218 #[error(transparent)]
219 Internal(#[from] InternalError),
220}
221
222/// Frame could not be pushed to a data track.
223#[derive(Debug, Error)]
224#[error("Failed to publish frame: {reason}")]
225pub struct PushFrameError {
226 frame: DataTrackFrame,
227 reason: PushFrameErrorReason,
228}
229
230impl PushFrameError {
231 pub(crate) fn new(frame: DataTrackFrame, reason: PushFrameErrorReason) -> Self {
232 Self { frame, reason }
233 }
234
235 /// Returns the reason the frame could not be pushed.
236 pub fn reason(&self) -> PushFrameErrorReason {
237 self.reason
238 }
239
240 /// Consumes the error and returns the frame that couldn't be pushed.
241 ///
242 /// This may be useful for implementing application-specific retry logic.
243 ///
244 pub fn into_frame(self) -> DataTrackFrame {
245 self.frame
246 }
247}
248
/// Reason why a data track frame could not be pushed.
///
/// Values can be compared with `==`, e.g. to check whether a failed push was
/// caused by a full queue.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PushFrameErrorReason {
    /// Track is no longer published.
    TrackUnpublished,
    /// Frame was dropped due to the pipeline queue being full.
    QueueFull,
}
257
258impl fmt::Display for PushFrameErrorReason {
259 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
260 match self {
261 Self::TrackUnpublished => write!(f, "track unpublished"),
262 Self::QueueFull => write!(f, "queue full"),
263 }
264 }
265}