1use std::{borrow::Cow, fmt::Debug, sync::Arc, time::Duration};
16
17use gosuto_libwebrtc::prelude::*;
18use livekit_api::signal_client::{SignalError, SignalOptions};
19use livekit_protocol as proto;
20use livekit_runtime::{interval, Interval, JoinHandle};
21use parking_lot::{RwLock, RwLockReadGuard};
22use thiserror::Error;
23use tokio::sync::{
24 mpsc, oneshot, Mutex as AsyncMutex, Notify, RwLock as AsyncRwLock,
25 RwLockReadGuard as AsyncRwLockReadGuard,
26};
27
28pub use self::rtc_session::{SessionStats, INITIAL_BUFFERED_AMOUNT_LOW_THRESHOLD};
29use crate::prelude::ParticipantIdentity;
30use crate::{
31 id::ParticipantSid,
32 options::TrackPublishOptions,
33 prelude::LocalTrack,
34 room::DisconnectReason,
35 rtc_engine::{
36 lk_runtime::LkRuntime,
37 rtc_session::{RtcSession, SessionEvent, SessionEvents},
38 },
39 DataPacketKind,
40};
41use crate::{ChatMessage, E2eeManager, TranscriptionSegment};
42
43pub mod lk_runtime;
44mod peer_transport;
45mod rtc_events;
46mod rtc_session;
47
/// Sending half of the unbounded channel on which the engine emits [`EngineEvent`]s.
pub(crate) type EngineEmitter = mpsc::UnboundedSender<EngineEvent>;
/// Receiving half of the unbounded channel on which the engine emits [`EngineEvent`]s.
pub(crate) type EngineEvents = mpsc::UnboundedReceiver<EngineEvent>;
/// Result alias whose error type is [`EngineError`].
pub(crate) type EngineResult<T> = Result<T, EngineError>;

/// Maximum number of attempts made during one reconnection cycle.
pub const RECONNECT_ATTEMPTS: u32 = 10;
/// Period of the interval used to pace consecutive reconnection attempts.
pub const RECONNECT_INTERVAL: Duration = Duration::from_secs(5);
54
/// Scenario selector accepted by [`RtcEngine::simulate_scenario`], which forwards it to the
/// current session.
///
/// NOTE(review): the effect of each variant is implemented by the session, which is not visible
/// in this file; the names suggest server/transport conditions to simulate — confirm there.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum SimulateScenario {
    SignalReconnect,
    Speaker,
    NodeFailure,
    ServerLeave,
    Migration,
    ForceTcp,
    ForceTls,
}
65
/// Errors returned by the engine.
#[derive(Error, Debug)]
pub enum EngineError {
    /// Error coming from the signal client; created automatically from a [`SignalError`].
    #[error("signal failure: {0}")]
    Signal(#[from] SignalError),
    /// Error coming from the WebRTC layer; created automatically from an `RtcError`.
    /// The display text does not include the inner error.
    #[error("internal webrtc failure")]
    Rtc(#[from] RtcError),
    /// Connection-level failure described by a message (used, for example, when the engine is
    /// closed or when reconnection gives up).
    #[error("connection error: {0}")]
    Connection(Cow<'static, str>),
    /// Internal failure described by a message.
    #[error("internal error: {0}")]
    Internal(Cow<'static, str>),
}
77
/// Options used when establishing (and re-establishing) the engine's session.
#[derive(Default, Debug, Clone)]
pub struct EngineOptions {
    /// WebRTC configuration handed to the session.
    pub rtc_config: RtcConfiguration,
    /// Options handed to the signal client.
    pub signal_options: SignalOptions,
    /// Number of additional connection attempts made by the initial connect after the first
    /// one fails (so up to `join_retries + 1` attempts in total).
    pub join_retries: u32,
    /// NOTE(review): presumably selects a single peer connection instead of separate
    /// publisher/subscriber connections; it is consumed outside this file — confirm.
    pub single_peer_connection: bool,
}
86
/// Events emitted by the engine on its [`EngineEmitter`] channel.
///
/// Most variants are forwarded field-for-field from the session's `SessionEvent` of the same
/// name. The reconnection variants (`Resuming`, `Resumed`, `SignalResumed`, `Restarting`,
/// `Restarted`, `SignalRestarted`) carry a `oneshot::Sender<()>`: the engine waits on the
/// matching receiver before continuing, so the consumer must send on it (or drop it) once it has
/// finished handling the event.
#[derive(Debug)]
pub enum EngineEvent {
    ParticipantUpdate {
        updates: Vec<proto::ParticipantInfo>,
    },
    MediaTrack {
        track: MediaStreamTrack,
        stream: MediaStream,
        transceiver: RtpTransceiver,
    },
    Data {
        participant_sid: Option<ParticipantSid>,
        participant_identity: Option<ParticipantIdentity>,
        payload: Vec<u8>,
        topic: Option<String>,
        kind: DataPacketKind,
        encryption_type: proto::encryption::Type,
    },
    ChatMessage {
        participant_identity: ParticipantIdentity,
        message: ChatMessage,
    },
    Transcription {
        participant_identity: ParticipantIdentity,
        track_sid: String,
        segments: Vec<TranscriptionSegment>,
    },
    SipDTMF {
        participant_identity: Option<ParticipantIdentity>,
        code: u32,
        digit: Option<String>,
    },
    RpcRequest {
        caller_identity: Option<ParticipantIdentity>,
        request_id: String,
        method: String,
        payload: String,
        response_timeout: Duration,
        version: u32,
    },
    RpcResponse {
        request_id: String,
        payload: Option<String>,
        error: Option<proto::RpcError>,
    },
    RpcAck {
        request_id: String,
    },
    SpeakersChanged {
        speakers: Vec<proto::SpeakerInfo>,
    },
    ConnectionQuality {
        updates: Vec<proto::ConnectionQualityInfo>,
    },
    RoomUpdate {
        room: proto::Room,
    },
    RoomMoved {
        moved: proto::RoomMovedResponse,
    },
    /// Emitted before the first attempt of a reconnection cycle when that attempt is a resume.
    Resuming(oneshot::Sender<()>),
    /// Emitted after a resume attempt succeeded.
    Resumed(oneshot::Sender<()>),
    /// Emitted during a resume attempt, once the session restart returned its reconnect
    /// response and before the publisher is restarted.
    SignalResumed {
        reconnect_response: proto::ReconnectResponse,
        tx: oneshot::Sender<()>,
    },
    /// Emitted before the first attempt of a reconnection cycle when that attempt is a full
    /// restart.
    Restarting(oneshot::Sender<()>),
    /// Emitted after a full restart attempt succeeded.
    Restarted(oneshot::Sender<()>),
    /// Emitted during a full restart, once the new session obtained its join response and
    /// before its peer connection is awaited.
    SignalRestarted {
        join_response: proto::JoinResponse,
        tx: oneshot::Sender<()>,
    },
    /// Emitted when the engine is closed.
    Disconnected {
        reason: DisconnectReason,
    },
    LocalTrackSubscribed {
        track_sid: String,
    },
    DataStreamHeader {
        header: proto::data_stream::Header,
        participant_identity: String,
        encryption_type: proto::encryption::Type,
    },
    DataStreamChunk {
        chunk: proto::data_stream::Chunk,
        participant_identity: String,
        encryption_type: proto::encryption::Type,
    },
    DataStreamTrailer {
        trailer: proto::data_stream::Trailer,
        participant_identity: String,
    },
    DataChannelBufferedAmountLowThresholdChanged {
        kind: DataPacketKind,
        threshold: u64,
    },
    RefreshToken {
        url: String,
        token: String,
    },
    TrackMuted {
        sid: String,
        muted: bool,
    },
}
195
/// Mutable engine state, stored behind `EngineInner::running_handle`.
#[derive(Debug)]
struct EngineHandle {
    /// Session currently in use; replaced when a full restart succeeds.
    session: Arc<RtcSession>,
    /// Set once the engine has been closed; operations that wait for reconnection fail after
    /// this and reconnection attempts stop.
    closed: bool,
    /// True while a reconnection task is running.
    reconnecting: bool,
    /// Cleared when the session reports a close with the `Disconnect` action; while false,
    /// reconnection requests are ignored.
    can_reconnect: bool,

    /// Whether the next reconnection attempt must be a full restart instead of a resume.
    full_reconnect: bool,
    /// Join handle of the task that processes session events, paired with the sender used to
    /// ask that task to stop. `None` while no such task is installed.
    engine_task: Option<(JoinHandle<()>, oneshot::Sender<()>)>,
}
210
/// Shared implementation behind [`RtcEngine`]; also owned by the tasks the engine spawns.
struct EngineInner {
    /// Not read anywhere in this file (hence the `allow`).
    /// NOTE(review): presumably held only to keep the runtime alive for the engine's lifetime —
    /// confirm.
    #[allow(dead_code)]
    lk_runtime: Arc<LkRuntime>,
    /// Channel on which [`EngineEvent`]s are emitted.
    engine_tx: EngineEmitter,
    /// Options used for the initial connection and reused for full restarts.
    options: EngineOptions,

    /// Awaited by the reconnection task; a notification cancels the reconnection in progress.
    close_notifier: Arc<Notify>,
    /// Current session and connection state flags.
    running_handle: RwLock<EngineHandle>,

    /// Write-locked for the duration of a reconnection; operations that must wait for the
    /// reconnection to finish take the read side.
    reconnecting_lock: AsyncRwLock<()>,
    /// Paces reconnection attempts; reset when an immediate retry is requested.
    reconnecting_interval: AsyncMutex<Interval>,
}
228
/// Public handle to the engine; every operation delegates to the shared [`EngineInner`].
pub struct RtcEngine {
    inner: Arc<EngineInner>,
}
232
233impl Debug for RtcEngine {
234 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
235 f.debug_struct("RtcEngine").finish()
236 }
237}
238
239impl RtcEngine {
240 pub async fn connect(
241 url: &str,
242 token: &str,
243 options: EngineOptions,
244 e2ee_manager: Option<E2eeManager>,
245 ) -> EngineResult<(Self, proto::JoinResponse, EngineEvents)> {
246 let (inner, join_response, engine_events) =
247 EngineInner::connect(url, token, options, e2ee_manager).await?;
248 Ok((Self { inner }, join_response, engine_events))
249 }
250
251 pub async fn close(&self, reason: DisconnectReason) {
252 self.inner.close(reason).await
253 }
254
255 pub async fn publish_data(
256 &self,
257 data: proto::DataPacket,
258 kind: DataPacketKind,
259 is_raw_packet: bool,
260 ) -> EngineResult<()> {
261 let (session, _r_lock) = {
262 let (handle, _r_lock) = self.inner.wait_reconnection().await?;
263 (handle.session.clone(), _r_lock)
264 };
265 session.publish_data(data, kind, is_raw_packet).await
266 }
267
268 pub async fn simulate_scenario(&self, scenario: SimulateScenario) -> EngineResult<()> {
269 let (session, _r_lock) = {
270 let (handle, _r_lock) = self.inner.wait_reconnection().await?;
271 (handle.session.clone(), _r_lock)
272 };
273 session.simulate_scenario(scenario).await
274 }
275
276 pub async fn add_track(&self, req: proto::AddTrackRequest) -> EngineResult<proto::TrackInfo> {
277 let (session, _r_lock) = {
278 let (handle, _r_lock) = self.inner.wait_reconnection().await?;
279 (handle.session.clone(), _r_lock)
280 };
281 session.add_track(req).await
282 }
283
284 pub fn remove_track(&self, sender: RtpSender) -> EngineResult<()> {
285 let session = self.inner.running_handle.read().session.clone();
287 session.remove_track(sender) }
291
292 pub async fn mute_track(&self, req: proto::MuteTrackRequest) -> EngineResult<()> {
293 let (session, _r_lock) = {
294 let (handle, _r_lock) = self.inner.wait_reconnection().await?;
295 (handle.session.clone(), _r_lock)
296 };
297 session.mute_track(req).await
298 }
299
300 pub async fn create_sender(
301 &self,
302 track: LocalTrack,
303 options: TrackPublishOptions,
304 encodings: Vec<RtpEncodingParameters>,
305 ) -> EngineResult<RtpTransceiver> {
306 let (session, _r_lock) = {
308 let (handle, _r_lock) = self.inner.wait_reconnection().await?;
309 (handle.session.clone(), _r_lock)
310 };
311
312 session.create_sender(track, options, encodings).await
313 }
314
315 pub fn publisher_negotiation_needed(&self) {
316 let inner = self.inner.clone();
317 livekit_runtime::spawn(async move {
318 if let Ok((handle, _)) = inner.wait_reconnection().await {
319 handle.session.publisher_negotiation_needed()
320 }
321 });
322 }
323
324 pub async fn send_request(&self, msg: proto::signal_request::Message) {
325 let session = self.inner.running_handle.read().session.clone();
329 session.signal_client().send(msg).await }
332
333 pub async fn get_response(&self, request_id: u32) -> proto::RequestResponse {
334 let session = self.inner.running_handle.read().session.clone();
335 session.get_response(request_id).await
336 }
337
338 pub async fn get_stats(&self) -> EngineResult<SessionStats> {
339 let session = self.inner.running_handle.read().session.clone();
340 session.get_stats().await
341 }
342
343 pub fn session(&self) -> Arc<RtcSession> {
344 self.inner.running_handle.read().session.clone()
345 }
346}
347
348impl EngineInner {
349 async fn connect(
350 url: &str,
351 token: &str,
352 options: EngineOptions,
353 e2ee_manager: Option<E2eeManager>,
354 ) -> EngineResult<(Arc<Self>, proto::JoinResponse, EngineEvents)> {
355 let lk_runtime = LkRuntime::instance();
356 let max_retries = options.join_retries;
357
358 let try_connect = {
359 move || {
360 let options = options.clone();
361 let lk_runtime = lk_runtime.clone();
362 let e2ee_manager = e2ee_manager.clone();
363 async move {
364 let (session, join_response, session_events) =
365 RtcSession::connect(url, token, options.clone(), e2ee_manager).await?;
366 session.wait_pc_connection().await?;
367
368 let (engine_tx, engine_rx) = mpsc::unbounded_channel();
369 let inner = Arc::new(Self {
370 lk_runtime,
371 engine_tx,
372 close_notifier: Arc::new(Notify::new()),
373 running_handle: RwLock::new(EngineHandle {
374 session: Arc::new(session),
375 closed: false,
376 reconnecting: false,
377 can_reconnect: true,
378 full_reconnect: false,
379 engine_task: None,
380 }),
381 options,
382 reconnecting_lock: AsyncRwLock::default(),
383 reconnecting_interval: AsyncMutex::new(interval(RECONNECT_INTERVAL)),
384 });
385
386 let (close_tx, close_rx) = oneshot::channel();
388 let session_task = livekit_runtime::spawn(Self::engine_task(
389 inner.clone(),
390 session_events,
391 close_rx,
392 ));
393 inner.running_handle.write().engine_task = Some((session_task, close_tx));
394
395 Ok((inner, join_response, engine_rx))
396 }
397 }
398 };
399
400 let mut last_error = None;
401 for i in 0..(max_retries + 1) {
402 match try_connect().await {
403 Ok(res) => return Ok(res),
404 Err(e) => {
405 let attempt_i = i + 1;
406 if i < max_retries {
407 log::warn!(
408 "failed to connect: {:?}, retrying... ({}/{})",
409 e,
410 attempt_i,
411 max_retries
412 );
413 }
414 last_error = Some(e)
415 }
416 }
417 }
418
419 Err(last_error.unwrap())
420 }
421
422 async fn engine_task(
423 self: Arc<Self>,
424 mut session_events: SessionEvents,
425 mut close_rx: oneshot::Receiver<()>,
426 ) {
427 loop {
428 tokio::select! {
429 Some(event) = session_events.recv() => {
430 let debug = format!("{:?}", event);
431 let inner = self.clone();
432 let (tx, rx) = oneshot::channel();
433 let task = livekit_runtime::spawn(async move {
434 if let Err(err) = inner.on_session_event(event).await {
435 log::error!("failed to handle session event: {:?}", err);
436 }
437 let _ = tx.send(());
438 });
439
440 tokio::select! {
442 _ = rx => {},
443 _ = livekit_runtime::sleep(Duration::from_secs(10)) => {
444 log::error!("session_event is taking too much time: {}", debug);
445 }
446 }
447
448 task.await;
449 },
450 _ = &mut close_rx => {
451 break;
452 }
453 }
454 }
455
456 log::debug!("engine task closed");
457 }
458
459 async fn on_session_event(self: &Arc<Self>, event: SessionEvent) -> EngineResult<()> {
460 match event {
461 SessionEvent::Close { source, reason, action, retry_now } => {
462 match action {
463 proto::leave_request::Action::Resume
464 | proto::leave_request::Action::Reconnect => {
465 {
466 let running_handle = self.running_handle.read();
467
468 if !running_handle.can_reconnect {
471 return Ok(());
472 }
473 }
475
476 log::warn!(
477 "received session close: {:?} {:?} {:?}",
478 source,
479 reason,
480 action
481 );
482 self.reconnection_needed(
483 retry_now,
484 action == proto::leave_request::Action::Reconnect,
485 );
486 }
487 proto::leave_request::Action::Disconnect => {
488 let mut running_handle = self.running_handle.write();
490 running_handle.can_reconnect = false;
491
492 livekit_runtime::spawn({
495 let inner = self.clone();
496 async move {
497 inner.close(reason).await;
498 }
499 });
500 }
501 }
502 }
503 SessionEvent::Data {
504 participant_sid,
505 participant_identity,
506 payload,
507 topic,
508 kind,
509 encryption_type,
510 } => {
511 let _ = self.engine_tx.send(EngineEvent::Data {
512 participant_sid,
513 participant_identity,
514 payload,
515 topic,
516 kind,
517 encryption_type,
518 });
519 }
520 SessionEvent::ChatMessage { participant_identity, message } => {
521 let _ =
522 self.engine_tx.send(EngineEvent::ChatMessage { participant_identity, message });
523 }
524 SessionEvent::SipDTMF { participant_identity, code, digit } => {
525 let _ =
526 self.engine_tx.send(EngineEvent::SipDTMF { participant_identity, code, digit });
527 }
528 SessionEvent::Transcription { participant_identity, track_sid, segments } => {
529 let _ = self.engine_tx.send(EngineEvent::Transcription {
530 participant_identity,
531 track_sid,
532 segments,
533 });
534 }
535 SessionEvent::SipDTMF { participant_identity, code, digit } => {
536 let _ =
537 self.engine_tx.send(EngineEvent::SipDTMF { participant_identity, code, digit });
538 }
539 SessionEvent::RpcRequest {
540 caller_identity,
541 request_id,
542 method,
543 payload,
544 response_timeout,
545 version,
546 } => {
547 let _ = self.engine_tx.send(EngineEvent::RpcRequest {
548 caller_identity,
549 request_id,
550 method,
551 payload,
552 response_timeout,
553 version,
554 });
555 }
556 SessionEvent::RpcResponse { request_id, payload, error } => {
557 let _ =
558 self.engine_tx.send(EngineEvent::RpcResponse { request_id, payload, error });
559 }
560 SessionEvent::RpcAck { request_id } => {
561 let _ = self.engine_tx.send(EngineEvent::RpcAck { request_id });
562 }
563 SessionEvent::MediaTrack { track, stream, transceiver } => {
564 let _ = self.engine_tx.send(EngineEvent::MediaTrack { track, stream, transceiver });
565 }
566 SessionEvent::ParticipantUpdate { updates } => {
567 let _ = self.engine_tx.send(EngineEvent::ParticipantUpdate { updates });
568 }
569 SessionEvent::SpeakersChanged { speakers } => {
570 let _ = self.engine_tx.send(EngineEvent::SpeakersChanged { speakers });
571 }
572 SessionEvent::ConnectionQuality { updates } => {
573 let _ = self.engine_tx.send(EngineEvent::ConnectionQuality { updates });
574 }
575 SessionEvent::RoomUpdate { room } => {
576 let _ = self.engine_tx.send(EngineEvent::RoomUpdate { room });
577 }
578 SessionEvent::RoomMoved { moved } => {
579 let _ = self.engine_tx.send(EngineEvent::RoomMoved { moved });
580 }
581 SessionEvent::LocalTrackSubscribed { track_sid } => {
582 let _ = self.engine_tx.send(EngineEvent::LocalTrackSubscribed { track_sid });
583 }
584 SessionEvent::DataStreamHeader { header, participant_identity, encryption_type } => {
585 let _ = self.engine_tx.send(EngineEvent::DataStreamHeader {
586 header,
587 participant_identity,
588 encryption_type,
589 });
590 }
591 SessionEvent::DataStreamChunk { chunk, participant_identity, encryption_type } => {
592 let _ = self.engine_tx.send(EngineEvent::DataStreamChunk {
593 chunk,
594 participant_identity,
595 encryption_type,
596 });
597 }
598 SessionEvent::DataStreamTrailer { trailer, participant_identity } => {
599 let _ = self
600 .engine_tx
601 .send(EngineEvent::DataStreamTrailer { trailer, participant_identity });
602 }
603 SessionEvent::DataChannelBufferedAmountLowThresholdChanged { kind, threshold } => {
604 let _ = self.engine_tx.send(
605 EngineEvent::DataChannelBufferedAmountLowThresholdChanged { kind, threshold },
606 );
607 }
608 SessionEvent::RefreshToken { url, token } => {
609 let _ = self.engine_tx.send(EngineEvent::RefreshToken { url, token });
610 }
611 SessionEvent::TrackMuted { sid, muted } => {
612 let _ = self.engine_tx.send(EngineEvent::TrackMuted { sid, muted });
613 }
614 }
615 Ok(())
616 }
617
618 async fn close(&self, reason: DisconnectReason) {
621 let (session, engine_task) = {
622 let mut running_handle = self.running_handle.write();
623 running_handle.closed = true;
624
625 let session = running_handle.session.clone();
626 let engine_task = running_handle.engine_task.take();
627 (session, engine_task)
628 };
629
630 if let Some((engine_task, close_tx)) = engine_task {
631 session.close().await;
632 let _ = close_tx.send(());
633 let _ = engine_task.await;
634 let _ = self.engine_tx.send(EngineEvent::Disconnected { reason });
635 }
636 }
637
638 async fn wait_reconnection(
640 &self,
641 ) -> EngineResult<(RwLockReadGuard<EngineHandle>, AsyncRwLockReadGuard<()>)> {
642 let r_lock = self.reconnecting_lock.read().await;
643 let running_handle = self.running_handle.read();
644
645 if running_handle.closed {
646 return Err(EngineError::Connection("engine is closed".into()));
649 }
650
651 Ok((running_handle, r_lock))
652 }
653
654 fn reconnection_needed(self: &Arc<Self>, retry_now: bool, full_reconnect: bool) {
658 let mut running_handle = self.running_handle.write();
659
660 if !running_handle.can_reconnect {
661 return;
662 }
663
664 if running_handle.reconnecting {
665 if full_reconnect {
672 running_handle.full_reconnect = true;
673 }
674
675 if retry_now {
676 let inner = self.clone();
677 livekit_runtime::spawn(async move {
678 inner.reconnecting_interval.lock().await.reset();
679 });
680 }
681
682 return;
683 }
684
685 running_handle.reconnecting = true;
686 running_handle.full_reconnect = full_reconnect;
687
688 livekit_runtime::spawn({
689 let inner = self.clone();
690 async move {
691 let _r_lock = inner.reconnecting_lock.write().await;
693 let close_notifier = inner.close_notifier.clone();
696 let close_receiver = close_notifier.notified();
697 tokio::pin!(close_receiver);
698
699 tokio::select! {
700 _ = &mut close_receiver => {
701 log::debug!("reconnection cancelled");
702 return;
703 }
704 res = inner.reconnect_task() => {
705 if res.is_err() {
706 log::error!("failed to reconnect to the livekit room");
707 inner.close(DisconnectReason::UnknownReason).await;
708 } else {
709 log::info!("RtcEngine successfully recovered")
710 }
711 }
712 }
713
714 let mut running_handle = inner.running_handle.write();
715 running_handle.reconnecting = false;
716
717 }
719 });
720 }
721
722 async fn reconnect_task(self: &Arc<Self>) -> EngineResult<()> {
726 let (url, token, e2ee_manager) = {
729 let running_handle = self.running_handle.read();
730 let signal_client = running_handle.session.signal_client();
731 let e2ee_manager = running_handle.session.e2ee_manager();
732 (
733 signal_client.url(),
734 signal_client.token(), e2ee_manager.clone(),
736 )
737 };
738
739 for i in 0..RECONNECT_ATTEMPTS {
740 let (is_closed, full_reconnect) = {
741 let running_handle = self.running_handle.read();
742 (running_handle.closed, running_handle.full_reconnect)
743 };
744
745 if is_closed {
746 return Err(EngineError::Connection("attempt canncelled, engine is closed".into()));
747 }
748
749 if full_reconnect {
750 if i == 0 {
751 let (tx, rx) = oneshot::channel();
752 let _ = self.engine_tx.send(EngineEvent::Restarting(tx));
753 let _ = rx.await;
754 }
755
756 log::error!("restarting connection... attempt: {}", i);
757 if let Err(err) = self
758 .try_restart_connection(
759 &url,
760 &token,
761 self.options.clone(),
762 e2ee_manager.clone(),
763 )
764 .await
765 {
766 log::error!("restarting connection failed: {}", err);
767 } else {
768 let (tx, rx) = oneshot::channel();
769 let _ = self.engine_tx.send(EngineEvent::Restarted(tx));
770 let _ = rx.await;
771 return Ok(());
772 }
773 } else {
774 if i == 0 {
775 let (tx, rx) = oneshot::channel();
776 let _ = self.engine_tx.send(EngineEvent::Resuming(tx));
777 let _ = rx.await;
778 }
779
780 log::error!("resuming connection... attempt: {}", i);
781 if let Err(err) = self.try_resume_connection().await {
782 log::error!("resuming connection failed: {}", err);
783 let mut running_handle = self.running_handle.write();
784 running_handle.full_reconnect = true;
785 } else {
786 let (tx, rx) = oneshot::channel();
787 let _ = self.engine_tx.send(EngineEvent::Resumed(tx));
788 let _ = rx.await;
789 return Ok(());
790 }
791 }
792
793 self.reconnecting_interval.lock().await.tick().await;
794 }
795
796 Err(EngineError::Connection(
797 format!("failed to reconnect after {}", RECONNECT_ATTEMPTS).into(),
798 ))
799 }
800
801 async fn try_restart_connection(
805 self: &Arc<Self>,
806 url: &str,
807 token: &str,
808 options: EngineOptions,
809 e2ee_manager: Option<E2eeManager>,
810 ) -> EngineResult<()> {
811 let (session, engine_task) = {
813 let mut running_handle = self.running_handle.write();
814 let session = running_handle.session.clone();
815 let engine_task = running_handle.engine_task.take();
816 (session, engine_task)
817 };
818
819 if let Some((engine_task, close_tx)) = engine_task {
820 session.close().await;
821 let _ = close_tx.send(());
822 let _ = engine_task.await;
823 }
824
825 let (new_session, join_response, session_events) =
826 RtcSession::connect(url, token, options, e2ee_manager).await?;
827
828 let (tx, rx) = oneshot::channel();
831 let _ = self.engine_tx.send(EngineEvent::SignalRestarted { join_response, tx });
832 let _ = rx.await;
833
834 new_session.wait_pc_connection().await?;
835
836 let mut handle = self.running_handle.write();
842 handle.session = Arc::new(new_session);
843
844 let (close_tx, close_rx) = oneshot::channel();
845 let task = livekit_runtime::spawn(self.clone().engine_task(session_events, close_rx));
846 handle.engine_task = Some((task, close_tx));
847
848 Ok(())
849 }
850
851 async fn try_resume_connection(&self) -> EngineResult<()> {
853 let session = self.running_handle.read().session.clone();
854 let reconnect_response = session.restart().await?;
855
856 let (tx, rx) = oneshot::channel();
857 let _ = self.engine_tx.send(EngineEvent::SignalResumed { reconnect_response, tx });
858
859 let _ = rx.await;
861
862 session.restart_publisher().await?;
864 session.wait_pc_connection().await
865 }
866}