Skip to main content

livekit_datatrack/local/
manager.rs

1// Copyright 2025 LiveKit, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::{
16    events::*,
17    pipeline::{Pipeline, PipelineOptions},
18    LocalTrackInner,
19};
20use crate::{
21    api::{DataTrackFrame, DataTrackInfo, DataTrackOptions, InternalError, PublishError},
22    e2ee::EncryptionProvider,
23    local::LocalDataTrack,
24    packet::{self, Handle},
25};
26use anyhow::{anyhow, Context};
27use futures_core::Stream;
28use std::{collections::HashMap, sync::Arc, time::Duration};
29use tokio::sync::{mpsc, oneshot, watch};
30use tokio_stream::wrappers::ReceiverStream;
31
32/// Options for creating a [`Manager`].
33#[derive(Debug)]
34pub struct ManagerOptions {
35    /// Provider to use for encrypting outgoing frame payloads.
36    ///
37    /// If none, end-to-end encryption will be disabled for all published tracks.
38    ///
39    pub encryption_provider: Option<Arc<dyn EncryptionProvider>>,
40}
41
42/// System for managing data track publications.
43pub struct Manager {
44    encryption_provider: Option<Arc<dyn EncryptionProvider>>,
45    event_in_tx: mpsc::Sender<InputEvent>,
46    event_in_rx: mpsc::Receiver<InputEvent>,
47    event_out_tx: mpsc::Sender<OutputEvent>,
48    handle_allocator: packet::HandleAllocator,
49    descriptors: HashMap<Handle, Descriptor>,
50}
51
52impl Manager {
53    /// Creates a new manager.
54    ///
55    /// Returns a tuple containing the following:
56    ///
57    /// - The manager itself to be spawned by the caller (see [`Manager::run`]).
58    /// - Channel for sending [`InputEvent`]s to be processed by the manager.
59    /// - Stream for receiving [`OutputEvent`]s produced by the manager.
60    ///
61    pub fn new(options: ManagerOptions) -> (Self, ManagerInput, impl Stream<Item = OutputEvent>) {
62        let (event_in_tx, event_in_rx) = mpsc::channel(Self::EVENT_BUFFER_COUNT);
63        let (event_out_tx, event_out_rx) = mpsc::channel(Self::EVENT_BUFFER_COUNT);
64
65        let event_in = ManagerInput::new(event_in_tx.clone());
66        let manager = Manager {
67            encryption_provider: options.encryption_provider,
68            event_in_tx,
69            event_in_rx,
70            event_out_tx,
71            handle_allocator: packet::HandleAllocator::default(),
72            descriptors: HashMap::new(),
73        };
74
75        let event_out = ReceiverStream::new(event_out_rx);
76        (manager, event_in, event_out)
77    }
78
79    /// Run the manager task, consuming self.
80    ///
81    /// The manager will continue running until receiving [`InputEvent::Shutdown`].
82    ///
83    pub async fn run(mut self) {
84        log::debug!("Task started");
85        while let Some(event) = self.event_in_rx.recv().await {
86            log::debug!("Input event: {:?}", event);
87            match event {
88                InputEvent::PublishRequest(event) => self.on_publish_request(event).await,
89                InputEvent::PublishCancelled(event) => self.on_publish_cancelled(event).await,
90                InputEvent::QueryPublished(event) => self.on_query_published(event).await,
91                InputEvent::UnpublishRequest(event) => self.on_unpublish_request(event).await,
92                InputEvent::SfuPublishResponse(event) => self.on_sfu_publish_response(event).await,
93                InputEvent::SfuUnpublishResponse(event) => {
94                    self.on_sfu_unpublish_response(event).await
95                }
96                InputEvent::RepublishTracks => self.on_republish_tracks().await,
97                InputEvent::Shutdown => break,
98            }
99        }
100        self.shutdown().await;
101        log::debug!("Task ended");
102    }
103
104    async fn on_publish_request(&mut self, event: PublishRequest) {
105        let Some(handle) = self.handle_allocator.get() else {
106            _ = event.result_tx.send(Err(PublishError::LimitReached));
107            return;
108        };
109
110        if self.descriptors.contains_key(&handle) {
111            _ = event.result_tx.send(Err(PublishError::Internal(
112                anyhow!("Descriptor for handle already exists").into(),
113            )));
114            return;
115        }
116
117        let (result_tx, result_rx) = oneshot::channel();
118        self.descriptors.insert(handle, Descriptor::Pending(result_tx));
119
120        livekit_runtime::spawn(Self::forward_publish_result(
121            handle,
122            result_rx,
123            event.result_tx,
124            self.event_in_tx.downgrade(),
125        ));
126
127        let event = SfuPublishRequest {
128            handle,
129            name: event.options.name,
130            uses_e2ee: self.encryption_provider.is_some(),
131        };
132        _ = self.event_out_tx.send(event.into()).await;
133    }
134
135    /// Task that awaits a pending publish result.
136    ///
137    /// Forwards the result to the user, or notifies the manager if the receiver
138    /// is dropped (e.g., due to timeout) so it can remove the pending publication.
139    ///
140    async fn forward_publish_result(
141        handle: Handle,
142        result_rx: oneshot::Receiver<Result<LocalDataTrack, PublishError>>,
143        mut forward_result_tx: oneshot::Sender<Result<LocalDataTrack, PublishError>>,
144        event_in_tx: mpsc::WeakSender<InputEvent>,
145    ) {
146        tokio::select! {
147            biased;
148            Ok(result) = result_rx => {
149                _ = forward_result_tx.send(result);
150            }
151            _ = forward_result_tx.closed() => {
152                let Some(tx) = event_in_tx.upgrade() else { return };
153                let event = PublishCancelled { handle };
154                _ = tx.try_send(event.into());
155            }
156        }
157    }
158
159    async fn on_publish_cancelled(&mut self, event: PublishCancelled) {
160        if self.descriptors.remove(&event.handle).is_none() {
161            log::warn!("No descriptor for {}", event.handle);
162        }
163    }
164
165    async fn on_query_published(&self, event: QueryPublished) {
166        let published_info: Vec<_> = self
167            .descriptors
168            .iter()
169            .filter_map(|descriptor| {
170                let (_, Descriptor::Active { info, .. }) = descriptor else {
171                    return None;
172                };
173                info.clone().into()
174            })
175            .collect();
176        _ = event.result_tx.send(published_info);
177    }
178
179    async fn on_unpublish_request(&mut self, event: UnpublishRequest) {
180        self.remove_descriptor(event.handle);
181
182        let event = SfuUnpublishRequest { handle: event.handle };
183        _ = self.event_out_tx.send(event.into()).await;
184    }
185
186    async fn on_sfu_publish_response(&mut self, event: SfuPublishResponse) {
187        let Some(descriptor) = self.descriptors.remove(&event.handle) else {
188            // This can occur if a publish request is cancelled before the SFU responds,
189            // send an unpublish request to ensure consistent SFU state.
190            _ = self.event_out_tx.send(SfuUnpublishRequest { handle: event.handle }.into()).await;
191            return;
192        };
193        match descriptor {
194            Descriptor::Pending(result_tx) => {
195                // SFU accepted initial publication request
196                if result_tx.is_closed() {
197                    return;
198                }
199                let result = event.result.map(|track_info| self.create_local_track(track_info));
200                _ = result_tx.send(result);
201                return;
202            }
203            Descriptor::Active { ref state_tx, ref info, .. } => {
204                if *state_tx.borrow() != PublishState::Republishing {
205                    log::warn!("Track {} already active", event.handle);
206                    return;
207                }
208                let Ok(updated_info) = event.result else {
209                    log::warn!("Republish failed for track {}", event.handle);
210                    return;
211                };
212
213                log::debug!("Track {} republished", event.handle);
214                {
215                    let mut sid = info.sid.write().unwrap();
216                    *sid = updated_info.sid();
217                }
218                _ = state_tx.send(PublishState::Published);
219                self.descriptors.insert(event.handle, descriptor);
220            }
221        }
222    }
223
224    fn create_local_track(&mut self, info: DataTrackInfo) -> LocalDataTrack {
225        let info = Arc::new(info);
226        let encryption_provider =
227            if info.uses_e2ee() { self.encryption_provider.as_ref().map(Arc::clone) } else { None };
228
229        let pipeline_opts = PipelineOptions { info: info.clone(), encryption_provider };
230        let pipeline = Pipeline::new(pipeline_opts);
231
232        let (frame_tx, frame_rx) = mpsc::channel(Self::FRAME_BUFFER_COUNT);
233        let (state_tx, state_rx) = watch::channel(PublishState::Published);
234
235        let track_task = TrackTask {
236            info: info.clone(),
237            pipeline,
238            state_rx,
239            frame_rx,
240            event_in_tx: self.event_in_tx.clone(),
241            event_out_tx: self.event_out_tx.clone(),
242        };
243        let task_handle = livekit_runtime::spawn(track_task.run());
244
245        self.descriptors.insert(
246            info.pub_handle,
247            Descriptor::Active { info: info.clone(), state_tx: state_tx.clone(), task_handle },
248        );
249
250        let inner = LocalTrackInner { frame_tx, state_tx };
251        LocalDataTrack::new(info, inner)
252    }
253
254    async fn on_sfu_unpublish_response(&mut self, event: SfuUnpublishResponse) {
255        self.remove_descriptor(event.handle);
256    }
257
258    fn remove_descriptor(&mut self, handle: Handle) {
259        let Some(descriptor) = self.descriptors.remove(&handle) else {
260            return;
261        };
262        let Descriptor::Active { state_tx, .. } = descriptor else {
263            return;
264        };
265        if *state_tx.borrow() != PublishState::Unpublished {
266            _ = state_tx.send(PublishState::Unpublished);
267        }
268    }
269
270    async fn on_republish_tracks(&mut self) {
271        let descriptors = std::mem::take(&mut self.descriptors);
272        for (handle, descriptor) in descriptors {
273            match descriptor {
274                Descriptor::Pending(result_tx) => {
275                    // TODO: support republish for pending publications
276                    _ = result_tx.send(Err(PublishError::Disconnected));
277                }
278                Descriptor::Active { ref info, ref state_tx, .. } => {
279                    let event = SfuPublishRequest {
280                        handle: info.pub_handle,
281                        name: info.name.clone(),
282                        uses_e2ee: info.uses_e2ee,
283                    };
284                    _ = state_tx.send(PublishState::Republishing);
285                    _ = self.event_out_tx.send(event.into()).await;
286                    self.descriptors.insert(handle, descriptor);
287                }
288            }
289        }
290    }
291
292    /// Performs cleanup before the task ends.
293    async fn shutdown(self) {
294        for (_, descriptor) in self.descriptors {
295            match descriptor {
296                Descriptor::Pending(result_tx) => {
297                    _ = result_tx.send(Err(PublishError::Disconnected))
298                }
299                Descriptor::Active { state_tx, task_handle, .. } => {
300                    _ = state_tx.send(PublishState::Unpublished);
301                    task_handle.await;
302                }
303            }
304        }
305    }
306
307    /// Maximum number of outgoing frames to buffer per track.
308    const FRAME_BUFFER_COUNT: usize = 16;
309
310    /// Maximum number of input and output events to buffer.
311    const EVENT_BUFFER_COUNT: usize = 16;
312}
313
314/// Task for an individual published data track.
315struct TrackTask {
316    info: Arc<DataTrackInfo>,
317    pipeline: Pipeline,
318    state_rx: watch::Receiver<PublishState>,
319    frame_rx: mpsc::Receiver<DataTrackFrame>,
320    event_in_tx: mpsc::Sender<InputEvent>,
321    event_out_tx: mpsc::Sender<OutputEvent>,
322}
323
324impl TrackTask {
325    async fn run(mut self) {
326        let sid = self.info.sid();
327        log::debug!("Track task started: sid={}", sid);
328
329        let mut state = *self.state_rx.borrow();
330        while state != PublishState::Unpublished {
331            tokio::select! {
332                _ = self.state_rx.changed() => {
333                    state = *self.state_rx.borrow();
334                }
335                Some(frame) = self.frame_rx.recv() => {
336                    if state == PublishState::Republishing {
337                        // Drop frames while republishing.
338                        continue;
339                    }
340                    self.process_and_send(frame);
341                }
342            }
343        }
344
345        let event = UnpublishRequest { handle: self.info.pub_handle };
346        _ = self.event_in_tx.send(event.into()).await;
347
348        log::debug!("Track task ended: sid={}", sid);
349    }
350
351    fn process_and_send(&mut self, frame: DataTrackFrame) {
352        let Ok(packets) = self
353            .pipeline
354            .process_frame(frame)
355            .inspect_err(|err| log::debug!("Process failed: {}", err))
356        else {
357            return;
358        };
359        let packets: Vec<_> = packets.into_iter().map(|packet| packet.serialize()).collect();
360        _ = self
361            .event_out_tx
362            .try_send(packets.into())
363            .inspect_err(|err| log::debug!("Cannot send packets to transport: {}", err));
364    }
365}
366
367#[derive(Debug)]
368enum Descriptor {
369    /// Publication is awaiting SFU response.
370    ///
371    /// The associated channel is used to send a result to the user,
372    /// either the local track or a publish error.
373    ///
374    Pending(oneshot::Sender<Result<LocalDataTrack, PublishError>>),
375    /// Publication is active.
376    ///
377    /// The associated channel is used to end the track task.
378    ///
379    Active {
380        info: Arc<DataTrackInfo>,
381        state_tx: watch::Sender<PublishState>,
382        task_handle: livekit_runtime::JoinHandle<()>,
383    },
384}
385
386#[derive(Debug, Clone, Copy, PartialEq, Eq)]
387pub(crate) enum PublishState {
388    /// Track is published.
389    Published,
390    /// Track is being republished.
391    Republishing,
392    /// Track is no longer published.
393    Unpublished,
394}
395
396/// Channel for sending [`InputEvent`]s to [`Manager`].
397#[derive(Debug, Clone)]
398pub struct ManagerInput {
399    event_in_tx: mpsc::Sender<InputEvent>,
400    _drop_guard: Arc<DropGuard>,
401}
402
403/// Guard that sends shutdown event when the last reference is dropped.
404#[derive(Debug)]
405struct DropGuard {
406    event_in_tx: mpsc::Sender<InputEvent>,
407}
408
409impl Drop for DropGuard {
410    fn drop(&mut self) {
411        _ = self.event_in_tx.try_send(InputEvent::Shutdown);
412    }
413}
414
415impl ManagerInput {
416    fn new(event_in_tx: mpsc::Sender<InputEvent>) -> Self {
417        Self { event_in_tx: event_in_tx.clone(), _drop_guard: DropGuard { event_in_tx }.into() }
418    }
419
420    /// Sends an input event to the manager's task to be processed.
421    pub fn send(&self, event: InputEvent) -> Result<(), InternalError> {
422        Ok(self.event_in_tx.try_send(event).context("Failed to handle input event")?)
423    }
424
425    /// Publishes a data track with given options.
426    pub async fn publish_track(
427        &self,
428        options: DataTrackOptions,
429    ) -> Result<LocalDataTrack, PublishError> {
430        let (result_tx, result_rx) = oneshot::channel();
431
432        let event = PublishRequest { options, result_tx };
433        self.event_in_tx.try_send(event.into()).map_err(|_| PublishError::Disconnected)?;
434
435        let track = tokio::time::timeout(Self::PUBLISH_TIMEOUT, result_rx)
436            .await
437            .map_err(|_| PublishError::Timeout)?
438            .map_err(|_| PublishError::Disconnected)??;
439
440        Ok(track)
441    }
442
443    /// Get information about all currently published tracks.
444    ///
445    /// This does not include publications that are still pending.
446    ///
447    pub async fn query_tracks(&self) -> Vec<Arc<DataTrackInfo>> {
448        let (result_tx, result_rx) = oneshot::channel();
449
450        let event = QueryPublished { result_tx };
451        if self.event_in_tx.send(event.into()).await.is_err() {
452            return vec![];
453        }
454
455        result_rx.await.unwrap_or_default()
456    }
457
458    /// How long to wait for before timeout.
459    const PUBLISH_TIMEOUT: Duration = Duration::from_secs(10);
460}
461
462#[cfg(test)]
463mod tests {
464    use super::*;
465    use crate::{
466        api::DataTrackSid,
467        e2ee::{EncryptedPayload, EncryptionError, EncryptionProvider},
468        packet::Packet,
469        utils::testing::expect_event,
470    };
471    use bytes::Bytes;
472    use fake::{Fake, Faker};
473    use futures_util::StreamExt;
474    use livekit_runtime::{sleep, timeout};
475    use std::sync::RwLock;
476
477    #[derive(Debug)]
478    struct PrefixingEncryptor;
479
480    impl EncryptionProvider for PrefixingEncryptor {
481        fn encrypt(&self, payload: Bytes) -> Result<EncryptedPayload, EncryptionError> {
482            let mut output = Vec::with_capacity(4 + payload.len());
483            output.extend_from_slice(&[0xDE, 0xAD, 0xBE, 0xEF]);
484            output.extend_from_slice(&payload);
485            Ok(EncryptedPayload { payload: output.into(), iv: [0; 12], key_index: 0 })
486        }
487    }
488
489    #[tokio::test]
490    async fn test_task_shutdown() {
491        let options = ManagerOptions { encryption_provider: None };
492        let (manager, input, _) = Manager::new(options);
493
494        let join_handle = livekit_runtime::spawn(manager.run());
495        _ = input.send(InputEvent::Shutdown);
496
497        timeout(Duration::from_secs(1), join_handle).await.unwrap();
498    }
499
500    #[tokio::test]
501    async fn test_publish() {
502        let payload_size = 256;
503        let packet_count = 10;
504
505        let track_name: String = Faker.fake();
506        let track_sid: DataTrackSid = Faker.fake();
507        let pub_handle: Handle = Faker.fake();
508
509        let options = ManagerOptions { encryption_provider: None };
510        let (manager, input, mut output) = Manager::new(options);
511        livekit_runtime::spawn(manager.run());
512
513        let track_name_clone = track_name.clone();
514        let handle_events = async {
515            let mut packets_sent = 0;
516            while let Some(event) = output.next().await {
517                match event {
518                    OutputEvent::SfuPublishRequest(event) => {
519                        assert!(!event.uses_e2ee);
520                        assert_eq!(event.name, track_name_clone);
521
522                        // SFU accepts publication
523                        let info = DataTrackInfo {
524                            sid: RwLock::new(track_sid.clone()).into(),
525                            pub_handle,
526                            name: event.name,
527                            uses_e2ee: event.uses_e2ee,
528                        };
529                        let event = SfuPublishResponse { handle: event.handle, result: Ok(info) };
530                        _ = input.send(event.into());
531                    }
532                    OutputEvent::PacketsAvailable(packets) => {
533                        let packet = packets.into_iter().nth(0).unwrap();
534                        let payload = Packet::deserialize(packet).unwrap().payload;
535                        assert_eq!(payload.len(), payload_size);
536                        packets_sent += 1;
537                    }
538                    OutputEvent::SfuUnpublishRequest(event) => {
539                        assert_eq!(event.handle, pub_handle);
540                        assert_eq!(packets_sent, packet_count);
541                        break;
542                    }
543                }
544            }
545        };
546        let publish_track = async {
547            let track_options = DataTrackOptions::new(track_name.clone());
548            let track = input.publish_track(track_options).await.unwrap();
549            assert!(!track.info().uses_e2ee());
550            assert_eq!(track.info().name(), track_name);
551            assert_eq!(track.info().sid(), track_sid);
552
553            for _ in 0..packet_count {
554                track.try_push(vec![0xFA; payload_size].into()).unwrap();
555                sleep(Duration::from_millis(10)).await;
556            }
557            // Only reference to track dropped here (unpublish)
558        };
559        timeout(Duration::from_secs(1), async { tokio::join!(publish_track, handle_events) })
560            .await
561            .unwrap();
562    }
563
564    #[tokio::test]
565    async fn test_publish_sfu_error() {
566        let options = ManagerOptions { encryption_provider: None };
567        let (manager, input, mut output) = Manager::new(options);
568        livekit_runtime::spawn(manager.run());
569
570        let (result_tx, result_rx) = oneshot::channel();
571        let event = PublishRequest { options: DataTrackOptions::new("test"), result_tx };
572        input.send(event.into()).unwrap();
573
574        // SFU rejects publication
575        let event = expect_event!(output, OutputEvent::SfuPublishRequest);
576        let event =
577            SfuPublishResponse { handle: event.handle, result: Err(PublishError::LimitReached) };
578        input.send(event.into()).unwrap();
579
580        assert!(result_rx.await.unwrap().is_err());
581    }
582
583    #[tokio::test]
584    async fn test_publish_cancelled() {
585        let options = ManagerOptions { encryption_provider: None };
586        let (manager, input, mut output) = Manager::new(options);
587        livekit_runtime::spawn(manager.run());
588
589        let (result_tx, result_rx) = oneshot::channel();
590        let event = PublishRequest { options: DataTrackOptions::new("test"), result_tx };
591        input.send(event.into()).unwrap();
592
593        let event = expect_event!(output, OutputEvent::SfuPublishRequest);
594        let handle = event.handle;
595
596        // Caller drops receiver before SFU responds
597        drop(result_rx);
598        sleep(Duration::from_millis(50)).await;
599
600        // Late SFU response arrives after cancellation
601        let track_sid: DataTrackSid = Faker.fake();
602        let info = DataTrackInfo {
603            sid: RwLock::new(track_sid).into(),
604            pub_handle: handle,
605            name: "test".into(),
606            uses_e2ee: false,
607        };
608        let event = SfuPublishResponse { handle, result: Ok(info) };
609        input.send(event.into()).unwrap();
610
611        // Manager sends unpublish for the orphaned handle
612        let event = expect_event!(output, OutputEvent::SfuUnpublishRequest);
613        assert_eq!(event.handle, handle);
614    }
615
616    #[tokio::test]
617    async fn test_publish_with_e2ee() {
618        let options = ManagerOptions { encryption_provider: Some(Arc::new(PrefixingEncryptor)) };
619        let (manager, input, mut output) = Manager::new(options);
620        livekit_runtime::spawn(manager.run());
621
622        let (result_tx, result_rx) = oneshot::channel();
623        let event = PublishRequest { options: DataTrackOptions::new("secure"), result_tx };
624        input.send(event.into()).unwrap();
625
626        // SFU publish request should indicate e2ee
627        let event = expect_event!(output, OutputEvent::SfuPublishRequest);
628        assert!(event.uses_e2ee);
629
630        // SFU accepts publication with e2ee
631        let track_sid: DataTrackSid = Faker.fake();
632        let info = DataTrackInfo {
633            sid: RwLock::new(track_sid).into(),
634            pub_handle: event.handle,
635            name: "secure".into(),
636            uses_e2ee: true,
637        };
638        let event = SfuPublishResponse { handle: event.handle, result: Ok(info) };
639        input.send(event.into()).unwrap();
640
641        let track = result_rx.await.unwrap().unwrap();
642        assert!(track.info().uses_e2ee());
643
644        // Push a frame and verify encryption was applied
645        track.try_push(vec![1, 2, 3, 4, 5].into()).unwrap();
646
647        let packets = expect_event!(output, OutputEvent::PacketsAvailable);
648        let packet = Packet::deserialize(packets.into_iter().next().unwrap()).unwrap();
649        assert_eq!(&packet.payload[..4], &[0xDE, 0xAD, 0xBE, 0xEF]);
650        assert_eq!(&packet.payload[4..], &[1, 2, 3, 4, 5]);
651        assert!(packet.header.extensions.e2ee.is_some());
652    }
653
654    #[tokio::test]
655    async fn test_republish_tracks() {
656        let options = ManagerOptions { encryption_provider: None };
657        let (manager, input, mut output) = Manager::new(options);
658        livekit_runtime::spawn(manager.run());
659
660        // Publish a track through the full flow
661        let track_name: String = Faker.fake();
662        let track_sid: DataTrackSid = Faker.fake();
663
664        let (result_tx, result_rx) = oneshot::channel();
665        let event =
666            PublishRequest { options: DataTrackOptions::new(track_name.clone()), result_tx };
667        input.send(event.into()).unwrap();
668
669        let event = expect_event!(output, OutputEvent::SfuPublishRequest);
670        let handle = event.handle;
671
672        let info = DataTrackInfo {
673            sid: RwLock::new(track_sid.clone()).into(),
674            pub_handle: handle,
675            name: track_name.clone(),
676            uses_e2ee: false,
677        };
678        let event = SfuPublishResponse { handle, result: Ok(info) };
679        input.send(event.into()).unwrap();
680
681        let track = result_rx.await.unwrap().unwrap();
682        assert_eq!(track.info().sid(), track_sid);
683
684        // Simulate reconnect
685        input.send(InputEvent::RepublishTracks).unwrap();
686        sleep(Duration::from_millis(50)).await;
687
688        // try_push should fail while republishing
689        assert!(track.try_push(vec![0xFF].into()).is_err());
690
691        // SFU re-publishes with a new SID
692        let event = expect_event!(output, OutputEvent::SfuPublishRequest);
693        assert_eq!(event.handle, handle);
694        assert_eq!(event.name, track_name);
695
696        let new_sid: DataTrackSid = Faker.fake();
697        let info = DataTrackInfo {
698            sid: RwLock::new(new_sid.clone()).into(),
699            pub_handle: handle,
700            name: track_name.clone(),
701            uses_e2ee: false,
702        };
703        let event = SfuPublishResponse { handle, result: Ok(info) };
704        input.send(event.into()).unwrap();
705        sleep(Duration::from_millis(50)).await;
706
707        // SID updated in place, pushes succeed again
708        assert_eq!(track.info().sid(), new_sid);
709        assert!(track.try_push(vec![0xFF].into()).is_ok());
710    }
711
712    #[tokio::test]
713    async fn test_query_published() {
714        let options = ManagerOptions { encryption_provider: None };
715        let (manager, input, mut output) = Manager::new(options);
716        livekit_runtime::spawn(manager.run());
717
718        // Publish two tracks
719        let mut tracks = Vec::new();
720        for name in ["track_a", "track_b"] {
721            let (result_tx, result_rx) = oneshot::channel();
722            let event = PublishRequest { options: DataTrackOptions::new(name), result_tx };
723            input.send(event.into()).unwrap();
724
725            let event = expect_event!(output, OutputEvent::SfuPublishRequest);
726            let info = DataTrackInfo {
727                sid: RwLock::new(Faker.fake()).into(),
728                pub_handle: event.handle,
729                name: name.into(),
730                uses_e2ee: false,
731            };
732            let event = SfuPublishResponse { handle: event.handle, result: Ok(info) };
733            input.send(event.into()).unwrap();
734
735            tracks.push(result_rx.await.unwrap().unwrap());
736        }
737
738        let published = input.query_tracks().await;
739        assert_eq!(published.len(), 2);
740
741        let names: Vec<&str> = published.iter().map(|i| i.name()).collect();
742        assert!(names.contains(&"track_a"));
743        assert!(names.contains(&"track_b"));
744    }
745
746    #[tokio::test]
747    async fn test_shutdown_with_pending_and_active() {
748        let options = ManagerOptions { encryption_provider: None };
749        let (manager, input, mut output) = Manager::new(options);
750        livekit_runtime::spawn(manager.run());
751
752        // Pending publication (no SFU response sent)
753        let (result_tx, pending_rx) = oneshot::channel();
754        let event = PublishRequest { options: DataTrackOptions::new("pending"), result_tx };
755        input.send(event.into()).unwrap();
756
757        expect_event!(output, OutputEvent::SfuPublishRequest);
758
759        // Active publication (fully published)
760        let (result_tx, result_rx) = oneshot::channel();
761        let event = PublishRequest { options: DataTrackOptions::new("active"), result_tx };
762        input.send(event.into()).unwrap();
763
764        let event = expect_event!(output, OutputEvent::SfuPublishRequest);
765        let info = DataTrackInfo {
766            sid: RwLock::new(Faker.fake()).into(),
767            pub_handle: event.handle,
768            name: "active".into(),
769            uses_e2ee: false,
770        };
771        let event = SfuPublishResponse { handle: event.handle, result: Ok(info) };
772        input.send(event.into()).unwrap();
773
774        let active_track = result_rx.await.unwrap().unwrap();
775        assert!(active_track.is_published());
776
777        // Shutdown the manager
778        input.send(InputEvent::Shutdown).unwrap();
779        sleep(Duration::from_millis(50)).await;
780
781        // Pending publish receives disconnected error
782        let pending_result = pending_rx.await.unwrap();
783        assert!(pending_result.is_err());
784
785        // Active track is no longer published
786        assert!(!active_track.is_published());
787    }
788}