1use super::{
16 events::*,
17 pipeline::{Pipeline, PipelineOptions},
18 LocalTrackInner,
19};
20use crate::{
21 api::{DataTrackFrame, DataTrackInfo, DataTrackOptions, InternalError, PublishError},
22 e2ee::EncryptionProvider,
23 local::LocalDataTrack,
24 packet::{self, Handle},
25};
26use anyhow::{anyhow, Context};
27use futures_core::Stream;
28use std::{collections::HashMap, sync::Arc, time::Duration};
29use tokio::sync::{mpsc, oneshot, watch};
30use tokio_stream::wrappers::ReceiverStream;
31
32#[derive(Debug)]
34pub struct ManagerOptions {
35 pub encryption_provider: Option<Arc<dyn EncryptionProvider>>,
40}
41
42pub struct Manager {
44 encryption_provider: Option<Arc<dyn EncryptionProvider>>,
45 event_in_tx: mpsc::Sender<InputEvent>,
46 event_in_rx: mpsc::Receiver<InputEvent>,
47 event_out_tx: mpsc::Sender<OutputEvent>,
48 handle_allocator: packet::HandleAllocator,
49 descriptors: HashMap<Handle, Descriptor>,
50}
51
52impl Manager {
53 pub fn new(options: ManagerOptions) -> (Self, ManagerInput, impl Stream<Item = OutputEvent>) {
62 let (event_in_tx, event_in_rx) = mpsc::channel(Self::EVENT_BUFFER_COUNT);
63 let (event_out_tx, event_out_rx) = mpsc::channel(Self::EVENT_BUFFER_COUNT);
64
65 let event_in = ManagerInput::new(event_in_tx.clone());
66 let manager = Manager {
67 encryption_provider: options.encryption_provider,
68 event_in_tx,
69 event_in_rx,
70 event_out_tx,
71 handle_allocator: packet::HandleAllocator::default(),
72 descriptors: HashMap::new(),
73 };
74
75 let event_out = ReceiverStream::new(event_out_rx);
76 (manager, event_in, event_out)
77 }
78
79 pub async fn run(mut self) {
84 log::debug!("Task started");
85 while let Some(event) = self.event_in_rx.recv().await {
86 log::debug!("Input event: {:?}", event);
87 match event {
88 InputEvent::PublishRequest(event) => self.on_publish_request(event).await,
89 InputEvent::PublishCancelled(event) => self.on_publish_cancelled(event).await,
90 InputEvent::QueryPublished(event) => self.on_query_published(event).await,
91 InputEvent::UnpublishRequest(event) => self.on_unpublish_request(event).await,
92 InputEvent::SfuPublishResponse(event) => self.on_sfu_publish_response(event).await,
93 InputEvent::SfuUnpublishResponse(event) => {
94 self.on_sfu_unpublish_response(event).await
95 }
96 InputEvent::RepublishTracks => self.on_republish_tracks().await,
97 InputEvent::Shutdown => break,
98 }
99 }
100 self.shutdown().await;
101 log::debug!("Task ended");
102 }
103
104 async fn on_publish_request(&mut self, event: PublishRequest) {
105 let Some(handle) = self.handle_allocator.get() else {
106 _ = event.result_tx.send(Err(PublishError::LimitReached));
107 return;
108 };
109
110 if self.descriptors.contains_key(&handle) {
111 _ = event.result_tx.send(Err(PublishError::Internal(
112 anyhow!("Descriptor for handle already exists").into(),
113 )));
114 return;
115 }
116
117 let (result_tx, result_rx) = oneshot::channel();
118 self.descriptors.insert(handle, Descriptor::Pending(result_tx));
119
120 livekit_runtime::spawn(Self::forward_publish_result(
121 handle,
122 result_rx,
123 event.result_tx,
124 self.event_in_tx.downgrade(),
125 ));
126
127 let event = SfuPublishRequest {
128 handle,
129 name: event.options.name,
130 uses_e2ee: self.encryption_provider.is_some(),
131 };
132 _ = self.event_out_tx.send(event.into()).await;
133 }
134
135 async fn forward_publish_result(
141 handle: Handle,
142 result_rx: oneshot::Receiver<Result<LocalDataTrack, PublishError>>,
143 mut forward_result_tx: oneshot::Sender<Result<LocalDataTrack, PublishError>>,
144 event_in_tx: mpsc::WeakSender<InputEvent>,
145 ) {
146 tokio::select! {
147 biased;
148 Ok(result) = result_rx => {
149 _ = forward_result_tx.send(result);
150 }
151 _ = forward_result_tx.closed() => {
152 let Some(tx) = event_in_tx.upgrade() else { return };
153 let event = PublishCancelled { handle };
154 _ = tx.try_send(event.into());
155 }
156 }
157 }
158
159 async fn on_publish_cancelled(&mut self, event: PublishCancelled) {
160 if self.descriptors.remove(&event.handle).is_none() {
161 log::warn!("No descriptor for {}", event.handle);
162 }
163 }
164
165 async fn on_query_published(&self, event: QueryPublished) {
166 let published_info: Vec<_> = self
167 .descriptors
168 .iter()
169 .filter_map(|descriptor| {
170 let (_, Descriptor::Active { info, .. }) = descriptor else {
171 return None;
172 };
173 info.clone().into()
174 })
175 .collect();
176 _ = event.result_tx.send(published_info);
177 }
178
179 async fn on_unpublish_request(&mut self, event: UnpublishRequest) {
180 self.remove_descriptor(event.handle);
181
182 let event = SfuUnpublishRequest { handle: event.handle };
183 _ = self.event_out_tx.send(event.into()).await;
184 }
185
186 async fn on_sfu_publish_response(&mut self, event: SfuPublishResponse) {
187 let Some(descriptor) = self.descriptors.remove(&event.handle) else {
188 _ = self.event_out_tx.send(SfuUnpublishRequest { handle: event.handle }.into()).await;
191 return;
192 };
193 match descriptor {
194 Descriptor::Pending(result_tx) => {
195 if result_tx.is_closed() {
197 return;
198 }
199 let result = event.result.map(|track_info| self.create_local_track(track_info));
200 _ = result_tx.send(result);
201 return;
202 }
203 Descriptor::Active { ref state_tx, ref info, .. } => {
204 if *state_tx.borrow() != PublishState::Republishing {
205 log::warn!("Track {} already active", event.handle);
206 return;
207 }
208 let Ok(updated_info) = event.result else {
209 log::warn!("Republish failed for track {}", event.handle);
210 return;
211 };
212
213 log::debug!("Track {} republished", event.handle);
214 {
215 let mut sid = info.sid.write().unwrap();
216 *sid = updated_info.sid();
217 }
218 _ = state_tx.send(PublishState::Published);
219 self.descriptors.insert(event.handle, descriptor);
220 }
221 }
222 }
223
224 fn create_local_track(&mut self, info: DataTrackInfo) -> LocalDataTrack {
225 let info = Arc::new(info);
226 let encryption_provider =
227 if info.uses_e2ee() { self.encryption_provider.as_ref().map(Arc::clone) } else { None };
228
229 let pipeline_opts = PipelineOptions { info: info.clone(), encryption_provider };
230 let pipeline = Pipeline::new(pipeline_opts);
231
232 let (frame_tx, frame_rx) = mpsc::channel(Self::FRAME_BUFFER_COUNT);
233 let (state_tx, state_rx) = watch::channel(PublishState::Published);
234
235 let track_task = TrackTask {
236 info: info.clone(),
237 pipeline,
238 state_rx,
239 frame_rx,
240 event_in_tx: self.event_in_tx.clone(),
241 event_out_tx: self.event_out_tx.clone(),
242 };
243 let task_handle = livekit_runtime::spawn(track_task.run());
244
245 self.descriptors.insert(
246 info.pub_handle,
247 Descriptor::Active { info: info.clone(), state_tx: state_tx.clone(), task_handle },
248 );
249
250 let inner = LocalTrackInner { frame_tx, state_tx };
251 LocalDataTrack::new(info, inner)
252 }
253
254 async fn on_sfu_unpublish_response(&mut self, event: SfuUnpublishResponse) {
255 self.remove_descriptor(event.handle);
256 }
257
258 fn remove_descriptor(&mut self, handle: Handle) {
259 let Some(descriptor) = self.descriptors.remove(&handle) else {
260 return;
261 };
262 let Descriptor::Active { state_tx, .. } = descriptor else {
263 return;
264 };
265 if *state_tx.borrow() != PublishState::Unpublished {
266 _ = state_tx.send(PublishState::Unpublished);
267 }
268 }
269
270 async fn on_republish_tracks(&mut self) {
271 let descriptors = std::mem::take(&mut self.descriptors);
272 for (handle, descriptor) in descriptors {
273 match descriptor {
274 Descriptor::Pending(result_tx) => {
275 _ = result_tx.send(Err(PublishError::Disconnected));
277 }
278 Descriptor::Active { ref info, ref state_tx, .. } => {
279 let event = SfuPublishRequest {
280 handle: info.pub_handle,
281 name: info.name.clone(),
282 uses_e2ee: info.uses_e2ee,
283 };
284 _ = state_tx.send(PublishState::Republishing);
285 _ = self.event_out_tx.send(event.into()).await;
286 self.descriptors.insert(handle, descriptor);
287 }
288 }
289 }
290 }
291
292 async fn shutdown(self) {
294 for (_, descriptor) in self.descriptors {
295 match descriptor {
296 Descriptor::Pending(result_tx) => {
297 _ = result_tx.send(Err(PublishError::Disconnected))
298 }
299 Descriptor::Active { state_tx, task_handle, .. } => {
300 _ = state_tx.send(PublishState::Unpublished);
301 task_handle.await;
302 }
303 }
304 }
305 }
306
307 const FRAME_BUFFER_COUNT: usize = 16;
309
310 const EVENT_BUFFER_COUNT: usize = 16;
312}
313
314struct TrackTask {
316 info: Arc<DataTrackInfo>,
317 pipeline: Pipeline,
318 state_rx: watch::Receiver<PublishState>,
319 frame_rx: mpsc::Receiver<DataTrackFrame>,
320 event_in_tx: mpsc::Sender<InputEvent>,
321 event_out_tx: mpsc::Sender<OutputEvent>,
322}
323
324impl TrackTask {
325 async fn run(mut self) {
326 let sid = self.info.sid();
327 log::debug!("Track task started: sid={}", sid);
328
329 let mut state = *self.state_rx.borrow();
330 while state != PublishState::Unpublished {
331 tokio::select! {
332 _ = self.state_rx.changed() => {
333 state = *self.state_rx.borrow();
334 }
335 Some(frame) = self.frame_rx.recv() => {
336 if state == PublishState::Republishing {
337 continue;
339 }
340 self.process_and_send(frame);
341 }
342 }
343 }
344
345 let event = UnpublishRequest { handle: self.info.pub_handle };
346 _ = self.event_in_tx.send(event.into()).await;
347
348 log::debug!("Track task ended: sid={}", sid);
349 }
350
351 fn process_and_send(&mut self, frame: DataTrackFrame) {
352 let Ok(packets) = self
353 .pipeline
354 .process_frame(frame)
355 .inspect_err(|err| log::debug!("Process failed: {}", err))
356 else {
357 return;
358 };
359 let packets: Vec<_> = packets.into_iter().map(|packet| packet.serialize()).collect();
360 _ = self
361 .event_out_tx
362 .try_send(packets.into())
363 .inspect_err(|err| log::debug!("Cannot send packets to transport: {}", err));
364 }
365}
366
367#[derive(Debug)]
368enum Descriptor {
369 Pending(oneshot::Sender<Result<LocalDataTrack, PublishError>>),
375 Active {
380 info: Arc<DataTrackInfo>,
381 state_tx: watch::Sender<PublishState>,
382 task_handle: livekit_runtime::JoinHandle<()>,
383 },
384}
385
386#[derive(Debug, Clone, Copy, PartialEq, Eq)]
387pub(crate) enum PublishState {
388 Published,
390 Republishing,
392 Unpublished,
394}
395
396#[derive(Debug, Clone)]
398pub struct ManagerInput {
399 event_in_tx: mpsc::Sender<InputEvent>,
400 _drop_guard: Arc<DropGuard>,
401}
402
403#[derive(Debug)]
405struct DropGuard {
406 event_in_tx: mpsc::Sender<InputEvent>,
407}
408
409impl Drop for DropGuard {
410 fn drop(&mut self) {
411 _ = self.event_in_tx.try_send(InputEvent::Shutdown);
412 }
413}
414
415impl ManagerInput {
416 fn new(event_in_tx: mpsc::Sender<InputEvent>) -> Self {
417 Self { event_in_tx: event_in_tx.clone(), _drop_guard: DropGuard { event_in_tx }.into() }
418 }
419
420 pub fn send(&self, event: InputEvent) -> Result<(), InternalError> {
422 Ok(self.event_in_tx.try_send(event).context("Failed to handle input event")?)
423 }
424
425 pub async fn publish_track(
427 &self,
428 options: DataTrackOptions,
429 ) -> Result<LocalDataTrack, PublishError> {
430 let (result_tx, result_rx) = oneshot::channel();
431
432 let event = PublishRequest { options, result_tx };
433 self.event_in_tx.try_send(event.into()).map_err(|_| PublishError::Disconnected)?;
434
435 let track = tokio::time::timeout(Self::PUBLISH_TIMEOUT, result_rx)
436 .await
437 .map_err(|_| PublishError::Timeout)?
438 .map_err(|_| PublishError::Disconnected)??;
439
440 Ok(track)
441 }
442
443 pub async fn query_tracks(&self) -> Vec<Arc<DataTrackInfo>> {
448 let (result_tx, result_rx) = oneshot::channel();
449
450 let event = QueryPublished { result_tx };
451 if self.event_in_tx.send(event.into()).await.is_err() {
452 return vec![];
453 }
454
455 result_rx.await.unwrap_or_default()
456 }
457
458 const PUBLISH_TIMEOUT: Duration = Duration::from_secs(10);
460}
461
462#[cfg(test)]
463mod tests {
464 use super::*;
465 use crate::{
466 api::DataTrackSid,
467 e2ee::{EncryptedPayload, EncryptionError, EncryptionProvider},
468 packet::Packet,
469 utils::testing::expect_event,
470 };
471 use bytes::Bytes;
472 use fake::{Fake, Faker};
473 use futures_util::StreamExt;
474 use livekit_runtime::{sleep, timeout};
475 use std::sync::RwLock;
476
477 #[derive(Debug)]
478 struct PrefixingEncryptor;
479
480 impl EncryptionProvider for PrefixingEncryptor {
481 fn encrypt(&self, payload: Bytes) -> Result<EncryptedPayload, EncryptionError> {
482 let mut output = Vec::with_capacity(4 + payload.len());
483 output.extend_from_slice(&[0xDE, 0xAD, 0xBE, 0xEF]);
484 output.extend_from_slice(&payload);
485 Ok(EncryptedPayload { payload: output.into(), iv: [0; 12], key_index: 0 })
486 }
487 }
488
489 #[tokio::test]
490 async fn test_task_shutdown() {
491 let options = ManagerOptions { encryption_provider: None };
492 let (manager, input, _) = Manager::new(options);
493
494 let join_handle = livekit_runtime::spawn(manager.run());
495 _ = input.send(InputEvent::Shutdown);
496
497 timeout(Duration::from_secs(1), join_handle).await.unwrap();
498 }
499
500 #[tokio::test]
501 async fn test_publish() {
502 let payload_size = 256;
503 let packet_count = 10;
504
505 let track_name: String = Faker.fake();
506 let track_sid: DataTrackSid = Faker.fake();
507 let pub_handle: Handle = Faker.fake();
508
509 let options = ManagerOptions { encryption_provider: None };
510 let (manager, input, mut output) = Manager::new(options);
511 livekit_runtime::spawn(manager.run());
512
513 let track_name_clone = track_name.clone();
514 let handle_events = async {
515 let mut packets_sent = 0;
516 while let Some(event) = output.next().await {
517 match event {
518 OutputEvent::SfuPublishRequest(event) => {
519 assert!(!event.uses_e2ee);
520 assert_eq!(event.name, track_name_clone);
521
522 let info = DataTrackInfo {
524 sid: RwLock::new(track_sid.clone()).into(),
525 pub_handle,
526 name: event.name,
527 uses_e2ee: event.uses_e2ee,
528 };
529 let event = SfuPublishResponse { handle: event.handle, result: Ok(info) };
530 _ = input.send(event.into());
531 }
532 OutputEvent::PacketsAvailable(packets) => {
533 let packet = packets.into_iter().nth(0).unwrap();
534 let payload = Packet::deserialize(packet).unwrap().payload;
535 assert_eq!(payload.len(), payload_size);
536 packets_sent += 1;
537 }
538 OutputEvent::SfuUnpublishRequest(event) => {
539 assert_eq!(event.handle, pub_handle);
540 assert_eq!(packets_sent, packet_count);
541 break;
542 }
543 }
544 }
545 };
546 let publish_track = async {
547 let track_options = DataTrackOptions::new(track_name.clone());
548 let track = input.publish_track(track_options).await.unwrap();
549 assert!(!track.info().uses_e2ee());
550 assert_eq!(track.info().name(), track_name);
551 assert_eq!(track.info().sid(), track_sid);
552
553 for _ in 0..packet_count {
554 track.try_push(vec![0xFA; payload_size].into()).unwrap();
555 sleep(Duration::from_millis(10)).await;
556 }
557 };
559 timeout(Duration::from_secs(1), async { tokio::join!(publish_track, handle_events) })
560 .await
561 .unwrap();
562 }
563
564 #[tokio::test]
565 async fn test_publish_sfu_error() {
566 let options = ManagerOptions { encryption_provider: None };
567 let (manager, input, mut output) = Manager::new(options);
568 livekit_runtime::spawn(manager.run());
569
570 let (result_tx, result_rx) = oneshot::channel();
571 let event = PublishRequest { options: DataTrackOptions::new("test"), result_tx };
572 input.send(event.into()).unwrap();
573
574 let event = expect_event!(output, OutputEvent::SfuPublishRequest);
576 let event =
577 SfuPublishResponse { handle: event.handle, result: Err(PublishError::LimitReached) };
578 input.send(event.into()).unwrap();
579
580 assert!(result_rx.await.unwrap().is_err());
581 }
582
583 #[tokio::test]
584 async fn test_publish_cancelled() {
585 let options = ManagerOptions { encryption_provider: None };
586 let (manager, input, mut output) = Manager::new(options);
587 livekit_runtime::spawn(manager.run());
588
589 let (result_tx, result_rx) = oneshot::channel();
590 let event = PublishRequest { options: DataTrackOptions::new("test"), result_tx };
591 input.send(event.into()).unwrap();
592
593 let event = expect_event!(output, OutputEvent::SfuPublishRequest);
594 let handle = event.handle;
595
596 drop(result_rx);
598 sleep(Duration::from_millis(50)).await;
599
600 let track_sid: DataTrackSid = Faker.fake();
602 let info = DataTrackInfo {
603 sid: RwLock::new(track_sid).into(),
604 pub_handle: handle,
605 name: "test".into(),
606 uses_e2ee: false,
607 };
608 let event = SfuPublishResponse { handle, result: Ok(info) };
609 input.send(event.into()).unwrap();
610
611 let event = expect_event!(output, OutputEvent::SfuUnpublishRequest);
613 assert_eq!(event.handle, handle);
614 }
615
616 #[tokio::test]
617 async fn test_publish_with_e2ee() {
618 let options = ManagerOptions { encryption_provider: Some(Arc::new(PrefixingEncryptor)) };
619 let (manager, input, mut output) = Manager::new(options);
620 livekit_runtime::spawn(manager.run());
621
622 let (result_tx, result_rx) = oneshot::channel();
623 let event = PublishRequest { options: DataTrackOptions::new("secure"), result_tx };
624 input.send(event.into()).unwrap();
625
626 let event = expect_event!(output, OutputEvent::SfuPublishRequest);
628 assert!(event.uses_e2ee);
629
630 let track_sid: DataTrackSid = Faker.fake();
632 let info = DataTrackInfo {
633 sid: RwLock::new(track_sid).into(),
634 pub_handle: event.handle,
635 name: "secure".into(),
636 uses_e2ee: true,
637 };
638 let event = SfuPublishResponse { handle: event.handle, result: Ok(info) };
639 input.send(event.into()).unwrap();
640
641 let track = result_rx.await.unwrap().unwrap();
642 assert!(track.info().uses_e2ee());
643
644 track.try_push(vec![1, 2, 3, 4, 5].into()).unwrap();
646
647 let packets = expect_event!(output, OutputEvent::PacketsAvailable);
648 let packet = Packet::deserialize(packets.into_iter().next().unwrap()).unwrap();
649 assert_eq!(&packet.payload[..4], &[0xDE, 0xAD, 0xBE, 0xEF]);
650 assert_eq!(&packet.payload[4..], &[1, 2, 3, 4, 5]);
651 assert!(packet.header.extensions.e2ee.is_some());
652 }
653
654 #[tokio::test]
655 async fn test_republish_tracks() {
656 let options = ManagerOptions { encryption_provider: None };
657 let (manager, input, mut output) = Manager::new(options);
658 livekit_runtime::spawn(manager.run());
659
660 let track_name: String = Faker.fake();
662 let track_sid: DataTrackSid = Faker.fake();
663
664 let (result_tx, result_rx) = oneshot::channel();
665 let event =
666 PublishRequest { options: DataTrackOptions::new(track_name.clone()), result_tx };
667 input.send(event.into()).unwrap();
668
669 let event = expect_event!(output, OutputEvent::SfuPublishRequest);
670 let handle = event.handle;
671
672 let info = DataTrackInfo {
673 sid: RwLock::new(track_sid.clone()).into(),
674 pub_handle: handle,
675 name: track_name.clone(),
676 uses_e2ee: false,
677 };
678 let event = SfuPublishResponse { handle, result: Ok(info) };
679 input.send(event.into()).unwrap();
680
681 let track = result_rx.await.unwrap().unwrap();
682 assert_eq!(track.info().sid(), track_sid);
683
684 input.send(InputEvent::RepublishTracks).unwrap();
686 sleep(Duration::from_millis(50)).await;
687
688 assert!(track.try_push(vec![0xFF].into()).is_err());
690
691 let event = expect_event!(output, OutputEvent::SfuPublishRequest);
693 assert_eq!(event.handle, handle);
694 assert_eq!(event.name, track_name);
695
696 let new_sid: DataTrackSid = Faker.fake();
697 let info = DataTrackInfo {
698 sid: RwLock::new(new_sid.clone()).into(),
699 pub_handle: handle,
700 name: track_name.clone(),
701 uses_e2ee: false,
702 };
703 let event = SfuPublishResponse { handle, result: Ok(info) };
704 input.send(event.into()).unwrap();
705 sleep(Duration::from_millis(50)).await;
706
707 assert_eq!(track.info().sid(), new_sid);
709 assert!(track.try_push(vec![0xFF].into()).is_ok());
710 }
711
712 #[tokio::test]
713 async fn test_query_published() {
714 let options = ManagerOptions { encryption_provider: None };
715 let (manager, input, mut output) = Manager::new(options);
716 livekit_runtime::spawn(manager.run());
717
718 let mut tracks = Vec::new();
720 for name in ["track_a", "track_b"] {
721 let (result_tx, result_rx) = oneshot::channel();
722 let event = PublishRequest { options: DataTrackOptions::new(name), result_tx };
723 input.send(event.into()).unwrap();
724
725 let event = expect_event!(output, OutputEvent::SfuPublishRequest);
726 let info = DataTrackInfo {
727 sid: RwLock::new(Faker.fake()).into(),
728 pub_handle: event.handle,
729 name: name.into(),
730 uses_e2ee: false,
731 };
732 let event = SfuPublishResponse { handle: event.handle, result: Ok(info) };
733 input.send(event.into()).unwrap();
734
735 tracks.push(result_rx.await.unwrap().unwrap());
736 }
737
738 let published = input.query_tracks().await;
739 assert_eq!(published.len(), 2);
740
741 let names: Vec<&str> = published.iter().map(|i| i.name()).collect();
742 assert!(names.contains(&"track_a"));
743 assert!(names.contains(&"track_b"));
744 }
745
746 #[tokio::test]
747 async fn test_shutdown_with_pending_and_active() {
748 let options = ManagerOptions { encryption_provider: None };
749 let (manager, input, mut output) = Manager::new(options);
750 livekit_runtime::spawn(manager.run());
751
752 let (result_tx, pending_rx) = oneshot::channel();
754 let event = PublishRequest { options: DataTrackOptions::new("pending"), result_tx };
755 input.send(event.into()).unwrap();
756
757 expect_event!(output, OutputEvent::SfuPublishRequest);
758
759 let (result_tx, result_rx) = oneshot::channel();
761 let event = PublishRequest { options: DataTrackOptions::new("active"), result_tx };
762 input.send(event.into()).unwrap();
763
764 let event = expect_event!(output, OutputEvent::SfuPublishRequest);
765 let info = DataTrackInfo {
766 sid: RwLock::new(Faker.fake()).into(),
767 pub_handle: event.handle,
768 name: "active".into(),
769 uses_e2ee: false,
770 };
771 let event = SfuPublishResponse { handle: event.handle, result: Ok(info) };
772 input.send(event.into()).unwrap();
773
774 let active_track = result_rx.await.unwrap().unwrap();
775 assert!(active_track.is_published());
776
777 input.send(InputEvent::Shutdown).unwrap();
779 sleep(Duration::from_millis(50)).await;
780
781 let pending_result = pending_rx.await.unwrap();
783 assert!(pending_result.is_err());
784
785 assert!(!active_track.is_published());
787 }
788}