Skip to main content

gosuto_livekit/rtc_engine/
mod.rs

1// Copyright 2025 LiveKit, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{borrow::Cow, fmt::Debug, sync::Arc, time::Duration};
16
17use gosuto_libwebrtc::prelude::*;
18use livekit_api::signal_client::{SignalError, SignalOptions};
19use livekit_protocol as proto;
20use livekit_runtime::{interval, Interval, JoinHandle};
21use parking_lot::{RwLock, RwLockReadGuard};
22use thiserror::Error;
23use tokio::sync::{
24    mpsc, oneshot, Mutex as AsyncMutex, Notify, RwLock as AsyncRwLock,
25    RwLockReadGuard as AsyncRwLockReadGuard,
26};
27
28pub use self::rtc_session::{SessionStats, INITIAL_BUFFERED_AMOUNT_LOW_THRESHOLD};
29use crate::prelude::ParticipantIdentity;
30use crate::{
31    id::ParticipantSid,
32    options::TrackPublishOptions,
33    prelude::LocalTrack,
34    room::DisconnectReason,
35    rtc_engine::{
36        lk_runtime::LkRuntime,
37        rtc_session::{RtcSession, SessionEvent, SessionEvents},
38    },
39    DataPacketKind,
40};
41use crate::{ChatMessage, E2eeManager, TranscriptionSegment};
42
43pub mod lk_runtime;
44mod peer_transport;
45mod rtc_events;
46mod rtc_session;
47
48pub(crate) type EngineEmitter = mpsc::UnboundedSender<EngineEvent>;
49pub(crate) type EngineEvents = mpsc::UnboundedReceiver<EngineEvent>;
50pub(crate) type EngineResult<T> = Result<T, EngineError>;
51
/// Upper bound on the number of attempts made by one reconnection cycle before it gives up.
pub const RECONNECT_ATTEMPTS: u32 = 10;
/// Period of the interval that is awaited between two consecutive reconnection attempts.
pub const RECONNECT_INTERVAL: Duration = Duration::from_secs(5);
54
/// Scenario selector accepted by [`RtcEngine::simulate_scenario`], which hands it to the
/// current session unchanged.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SimulateScenario {
    SignalReconnect,
    Speaker,
    NodeFailure,
    ServerLeave,
    Migration,
    ForceTcp,
    ForceTls,
}
65
66#[derive(Error, Debug)]
67pub enum EngineError {
68    #[error("signal failure: {0}")]
69    Signal(#[from] SignalError),
70    #[error("internal webrtc failure")]
71    Rtc(#[from] RtcError),
72    #[error("connection error: {0}")]
73    Connection(Cow<'static, str>), // Connectivity issues (Failed to connect/reconnect)
74    #[error("internal error: {0}")]
75    Internal(Cow<'static, str>), // Unexpected error, generally we can't recover
76}
77
78#[derive(Default, Debug, Clone)]
79pub struct EngineOptions {
80    pub rtc_config: RtcConfiguration,
81    pub signal_options: SignalOptions,
82    pub join_retries: u32,
83    /// Enable single peer connection mode
84    pub single_peer_connection: bool,
85}
86
87#[derive(Debug)]
88pub enum EngineEvent {
89    ParticipantUpdate {
90        updates: Vec<proto::ParticipantInfo>,
91    },
92    MediaTrack {
93        track: MediaStreamTrack,
94        stream: MediaStream,
95        transceiver: RtpTransceiver,
96    },
97    Data {
98        participant_sid: Option<ParticipantSid>,
99        participant_identity: Option<ParticipantIdentity>,
100        payload: Vec<u8>,
101        topic: Option<String>,
102        kind: DataPacketKind,
103        encryption_type: proto::encryption::Type,
104    },
105    ChatMessage {
106        participant_identity: ParticipantIdentity,
107        message: ChatMessage,
108    },
109    Transcription {
110        participant_identity: ParticipantIdentity,
111        track_sid: String,
112        segments: Vec<TranscriptionSegment>,
113    },
114    SipDTMF {
115        participant_identity: Option<ParticipantIdentity>,
116        code: u32,
117        digit: Option<String>,
118    },
119    RpcRequest {
120        caller_identity: Option<ParticipantIdentity>,
121        request_id: String,
122        method: String,
123        payload: String,
124        response_timeout: Duration,
125        version: u32,
126    },
127    RpcResponse {
128        request_id: String,
129        payload: Option<String>,
130        error: Option<proto::RpcError>,
131    },
132    RpcAck {
133        request_id: String,
134    },
135    SpeakersChanged {
136        speakers: Vec<proto::SpeakerInfo>,
137    },
138    ConnectionQuality {
139        updates: Vec<proto::ConnectionQualityInfo>,
140    },
141    RoomUpdate {
142        room: proto::Room,
143    },
144    RoomMoved {
145        moved: proto::RoomMovedResponse,
146    },
147    /// The following events are used to notify the room about the reconnection state
148    /// Since the room needs to also sync state in a good timing with the server.
149    /// We synchronize the state with a one-shot channel.
150    Resuming(oneshot::Sender<()>),
151    Resumed(oneshot::Sender<()>),
152    SignalResumed {
153        reconnect_response: proto::ReconnectResponse,
154        tx: oneshot::Sender<()>,
155    },
156    Restarting(oneshot::Sender<()>),
157    Restarted(oneshot::Sender<()>),
158    SignalRestarted {
159        join_response: proto::JoinResponse,
160        tx: oneshot::Sender<()>,
161    },
162    Disconnected {
163        reason: DisconnectReason,
164    },
165    LocalTrackSubscribed {
166        track_sid: String,
167    },
168    DataStreamHeader {
169        header: proto::data_stream::Header,
170        participant_identity: String,
171        encryption_type: proto::encryption::Type,
172    },
173    DataStreamChunk {
174        chunk: proto::data_stream::Chunk,
175        participant_identity: String,
176        encryption_type: proto::encryption::Type,
177    },
178    DataStreamTrailer {
179        trailer: proto::data_stream::Trailer,
180        participant_identity: String,
181    },
182    DataChannelBufferedAmountLowThresholdChanged {
183        kind: DataPacketKind,
184        threshold: u64,
185    },
186    RefreshToken {
187        url: String,
188        token: String,
189    },
190    TrackMuted {
191        sid: String,
192        muted: bool,
193    },
194}
195
196/// Represents a running RtcSession with the ability to close the session
197/// and the engine_task
198#[derive(Debug)]
199struct EngineHandle {
200    session: Arc<RtcSession>,
201    closed: bool,
202    reconnecting: bool,
203    can_reconnect: bool,
204
205    // If full_reconnect is true, the next attempt will not try to resume
206    // and will instead do a full reconnect
207    full_reconnect: bool,
208    engine_task: Option<(JoinHandle<()>, oneshot::Sender<()>)>,
209}
210
211struct EngineInner {
212    // Keep a strong reference to LkRuntime to avoid creating a new RtcRuntime or PeerConnection
213    // factory accross multiple Rtc sessions
214    #[allow(dead_code)]
215    lk_runtime: Arc<LkRuntime>,
216    engine_tx: EngineEmitter,
217    options: EngineOptions,
218
219    close_notifier: Arc<Notify>,
220    running_handle: RwLock<EngineHandle>,
221
222    // The lock is write guarded for the whole reconnection time.
223    // We can simply wait for reconnection by trying to acquire a read lock.
224    // (This also prevents new reconnection to happens if a read guard is still held)
225    reconnecting_lock: AsyncRwLock<()>,
226    reconnecting_interval: AsyncMutex<Interval>,
227}
228
229pub struct RtcEngine {
230    inner: Arc<EngineInner>,
231}
232
233impl Debug for RtcEngine {
234    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
235        f.debug_struct("RtcEngine").finish()
236    }
237}
238
239impl RtcEngine {
240    pub async fn connect(
241        url: &str,
242        token: &str,
243        options: EngineOptions,
244        e2ee_manager: Option<E2eeManager>,
245    ) -> EngineResult<(Self, proto::JoinResponse, EngineEvents)> {
246        let (inner, join_response, engine_events) =
247            EngineInner::connect(url, token, options, e2ee_manager).await?;
248        Ok((Self { inner }, join_response, engine_events))
249    }
250
251    pub async fn close(&self, reason: DisconnectReason) {
252        self.inner.close(reason).await
253    }
254
255    pub async fn publish_data(
256        &self,
257        data: proto::DataPacket,
258        kind: DataPacketKind,
259        is_raw_packet: bool,
260    ) -> EngineResult<()> {
261        let (session, _r_lock) = {
262            let (handle, _r_lock) = self.inner.wait_reconnection().await?;
263            (handle.session.clone(), _r_lock)
264        };
265        session.publish_data(data, kind, is_raw_packet).await
266    }
267
268    pub async fn simulate_scenario(&self, scenario: SimulateScenario) -> EngineResult<()> {
269        let (session, _r_lock) = {
270            let (handle, _r_lock) = self.inner.wait_reconnection().await?;
271            (handle.session.clone(), _r_lock)
272        };
273        session.simulate_scenario(scenario).await
274    }
275
276    pub async fn add_track(&self, req: proto::AddTrackRequest) -> EngineResult<proto::TrackInfo> {
277        let (session, _r_lock) = {
278            let (handle, _r_lock) = self.inner.wait_reconnection().await?;
279            (handle.session.clone(), _r_lock)
280        };
281        session.add_track(req).await
282    }
283
284    pub fn remove_track(&self, sender: RtpSender) -> EngineResult<()> {
285        // We don't need to wait for the reconnection
286        let session = self.inner.running_handle.read().session.clone();
287        session.remove_track(sender) // TODO(theomonnom): Ignore errors where this
288                                     // RtpSender is bound to the old session. (Can
289                                     // happen on bad timing and it is safe to ignore)
290    }
291
292    pub async fn mute_track(&self, req: proto::MuteTrackRequest) -> EngineResult<()> {
293        let (session, _r_lock) = {
294            let (handle, _r_lock) = self.inner.wait_reconnection().await?;
295            (handle.session.clone(), _r_lock)
296        };
297        session.mute_track(req).await
298    }
299
300    pub async fn create_sender(
301        &self,
302        track: LocalTrack,
303        options: TrackPublishOptions,
304        encodings: Vec<RtpEncodingParameters>,
305    ) -> EngineResult<RtpTransceiver> {
306        // When creating a new RtpSender, make sure we're always using the latest session
307        let (session, _r_lock) = {
308            let (handle, _r_lock) = self.inner.wait_reconnection().await?;
309            (handle.session.clone(), _r_lock)
310        };
311
312        session.create_sender(track, options, encodings).await
313    }
314
315    pub fn publisher_negotiation_needed(&self) {
316        let inner = self.inner.clone();
317        livekit_runtime::spawn(async move {
318            if let Ok((handle, _)) = inner.wait_reconnection().await {
319                handle.session.publisher_negotiation_needed()
320            }
321        });
322    }
323
324    pub async fn send_request(&self, msg: proto::signal_request::Message) {
325        // Getting the current session is OK to do without waiting for reconnection
326        // SignalClient will attempt to queue the message if the session is not connected
327        // Also on full_reconnect, every message is OK to ignore (Since this is another RtcSession)
328        let session = self.inner.running_handle.read().session.clone();
329        session.signal_client().send(msg).await // Returns () and automatically queues the message
330                                                // on fail
331    }
332
333    pub async fn get_response(&self, request_id: u32) -> proto::RequestResponse {
334        let session = self.inner.running_handle.read().session.clone();
335        session.get_response(request_id).await
336    }
337
338    pub async fn get_stats(&self) -> EngineResult<SessionStats> {
339        let session = self.inner.running_handle.read().session.clone();
340        session.get_stats().await
341    }
342
343    pub fn session(&self) -> Arc<RtcSession> {
344        self.inner.running_handle.read().session.clone()
345    }
346}
347
348impl EngineInner {
349    async fn connect(
350        url: &str,
351        token: &str,
352        options: EngineOptions,
353        e2ee_manager: Option<E2eeManager>,
354    ) -> EngineResult<(Arc<Self>, proto::JoinResponse, EngineEvents)> {
355        let lk_runtime = LkRuntime::instance();
356        let max_retries = options.join_retries;
357
358        let try_connect = {
359            move || {
360                let options = options.clone();
361                let lk_runtime = lk_runtime.clone();
362                let e2ee_manager = e2ee_manager.clone();
363                async move {
364                    let (session, join_response, session_events) =
365                        RtcSession::connect(url, token, options.clone(), e2ee_manager).await?;
366                    session.wait_pc_connection().await?;
367
368                    let (engine_tx, engine_rx) = mpsc::unbounded_channel();
369                    let inner = Arc::new(Self {
370                        lk_runtime,
371                        engine_tx,
372                        close_notifier: Arc::new(Notify::new()),
373                        running_handle: RwLock::new(EngineHandle {
374                            session: Arc::new(session),
375                            closed: false,
376                            reconnecting: false,
377                            can_reconnect: true,
378                            full_reconnect: false,
379                            engine_task: None,
380                        }),
381                        options,
382                        reconnecting_lock: AsyncRwLock::default(),
383                        reconnecting_interval: AsyncMutex::new(interval(RECONNECT_INTERVAL)),
384                    });
385
386                    // Start initial tasks
387                    let (close_tx, close_rx) = oneshot::channel();
388                    let session_task = livekit_runtime::spawn(Self::engine_task(
389                        inner.clone(),
390                        session_events,
391                        close_rx,
392                    ));
393                    inner.running_handle.write().engine_task = Some((session_task, close_tx));
394
395                    Ok((inner, join_response, engine_rx))
396                }
397            }
398        };
399
400        let mut last_error = None;
401        for i in 0..(max_retries + 1) {
402            match try_connect().await {
403                Ok(res) => return Ok(res),
404                Err(e) => {
405                    let attempt_i = i + 1;
406                    if i < max_retries {
407                        log::warn!(
408                            "failed to connect: {:?}, retrying... ({}/{})",
409                            e,
410                            attempt_i,
411                            max_retries
412                        );
413                    }
414                    last_error = Some(e)
415                }
416            }
417        }
418
419        Err(last_error.unwrap())
420    }
421
422    async fn engine_task(
423        self: Arc<Self>,
424        mut session_events: SessionEvents,
425        mut close_rx: oneshot::Receiver<()>,
426    ) {
427        loop {
428            tokio::select! {
429                Some(event) = session_events.recv() => {
430                    let debug = format!("{:?}", event);
431                    let inner = self.clone();
432                    let (tx, rx) = oneshot::channel();
433                    let task = livekit_runtime::spawn(async move {
434                        if let Err(err) = inner.on_session_event(event).await {
435                            log::error!("failed to handle session event: {:?}", err);
436                        }
437                        let _ = tx.send(());
438                    });
439
440                    // Monitor sync/async blockings
441                    tokio::select! {
442                        _ = rx => {},
443                        _ = livekit_runtime::sleep(Duration::from_secs(10)) => {
444                            log::error!("session_event is taking too much time: {}", debug);
445                        }
446                    }
447
448                    task.await;
449                },
450                 _ = &mut close_rx => {
451                    break;
452                }
453            }
454        }
455
456        log::debug!("engine task closed");
457    }
458
459    async fn on_session_event(self: &Arc<Self>, event: SessionEvent) -> EngineResult<()> {
460        match event {
461            SessionEvent::Close { source, reason, action, retry_now } => {
462                match action {
463                    proto::leave_request::Action::Resume
464                    | proto::leave_request::Action::Reconnect => {
465                        {
466                            let running_handle = self.running_handle.read();
467
468                            // server could have sent a leave & disconnected signal client
469                            // we don't want to start another resume cycle
470                            if !running_handle.can_reconnect {
471                                return Ok(());
472                            }
473                            // ensure we release the lock from this scope, it'll be used again in reconnection_needed
474                        }
475
476                        log::warn!(
477                            "received session close: {:?} {:?} {:?}",
478                            source,
479                            reason,
480                            action
481                        );
482                        self.reconnection_needed(
483                            retry_now,
484                            action == proto::leave_request::Action::Reconnect,
485                        );
486                    }
487                    proto::leave_request::Action::Disconnect => {
488                        // Disallow reconnection to avoid races
489                        let mut running_handle = self.running_handle.write();
490                        running_handle.can_reconnect = false;
491
492                        // Spawning a new task because the close function wait for the engine_task to
493                        // finish. (So it doesn't make sense to await it here)
494                        livekit_runtime::spawn({
495                            let inner = self.clone();
496                            async move {
497                                inner.close(reason).await;
498                            }
499                        });
500                    }
501                }
502            }
503            SessionEvent::Data {
504                participant_sid,
505                participant_identity,
506                payload,
507                topic,
508                kind,
509                encryption_type,
510            } => {
511                let _ = self.engine_tx.send(EngineEvent::Data {
512                    participant_sid,
513                    participant_identity,
514                    payload,
515                    topic,
516                    kind,
517                    encryption_type,
518                });
519            }
520            SessionEvent::ChatMessage { participant_identity, message } => {
521                let _ =
522                    self.engine_tx.send(EngineEvent::ChatMessage { participant_identity, message });
523            }
524            SessionEvent::SipDTMF { participant_identity, code, digit } => {
525                let _ =
526                    self.engine_tx.send(EngineEvent::SipDTMF { participant_identity, code, digit });
527            }
528            SessionEvent::Transcription { participant_identity, track_sid, segments } => {
529                let _ = self.engine_tx.send(EngineEvent::Transcription {
530                    participant_identity,
531                    track_sid,
532                    segments,
533                });
534            }
535            SessionEvent::SipDTMF { participant_identity, code, digit } => {
536                let _ =
537                    self.engine_tx.send(EngineEvent::SipDTMF { participant_identity, code, digit });
538            }
539            SessionEvent::RpcRequest {
540                caller_identity,
541                request_id,
542                method,
543                payload,
544                response_timeout,
545                version,
546            } => {
547                let _ = self.engine_tx.send(EngineEvent::RpcRequest {
548                    caller_identity,
549                    request_id,
550                    method,
551                    payload,
552                    response_timeout,
553                    version,
554                });
555            }
556            SessionEvent::RpcResponse { request_id, payload, error } => {
557                let _ =
558                    self.engine_tx.send(EngineEvent::RpcResponse { request_id, payload, error });
559            }
560            SessionEvent::RpcAck { request_id } => {
561                let _ = self.engine_tx.send(EngineEvent::RpcAck { request_id });
562            }
563            SessionEvent::MediaTrack { track, stream, transceiver } => {
564                let _ = self.engine_tx.send(EngineEvent::MediaTrack { track, stream, transceiver });
565            }
566            SessionEvent::ParticipantUpdate { updates } => {
567                let _ = self.engine_tx.send(EngineEvent::ParticipantUpdate { updates });
568            }
569            SessionEvent::SpeakersChanged { speakers } => {
570                let _ = self.engine_tx.send(EngineEvent::SpeakersChanged { speakers });
571            }
572            SessionEvent::ConnectionQuality { updates } => {
573                let _ = self.engine_tx.send(EngineEvent::ConnectionQuality { updates });
574            }
575            SessionEvent::RoomUpdate { room } => {
576                let _ = self.engine_tx.send(EngineEvent::RoomUpdate { room });
577            }
578            SessionEvent::RoomMoved { moved } => {
579                let _ = self.engine_tx.send(EngineEvent::RoomMoved { moved });
580            }
581            SessionEvent::LocalTrackSubscribed { track_sid } => {
582                let _ = self.engine_tx.send(EngineEvent::LocalTrackSubscribed { track_sid });
583            }
584            SessionEvent::DataStreamHeader { header, participant_identity, encryption_type } => {
585                let _ = self.engine_tx.send(EngineEvent::DataStreamHeader {
586                    header,
587                    participant_identity,
588                    encryption_type,
589                });
590            }
591            SessionEvent::DataStreamChunk { chunk, participant_identity, encryption_type } => {
592                let _ = self.engine_tx.send(EngineEvent::DataStreamChunk {
593                    chunk,
594                    participant_identity,
595                    encryption_type,
596                });
597            }
598            SessionEvent::DataStreamTrailer { trailer, participant_identity } => {
599                let _ = self
600                    .engine_tx
601                    .send(EngineEvent::DataStreamTrailer { trailer, participant_identity });
602            }
603            SessionEvent::DataChannelBufferedAmountLowThresholdChanged { kind, threshold } => {
604                let _ = self.engine_tx.send(
605                    EngineEvent::DataChannelBufferedAmountLowThresholdChanged { kind, threshold },
606                );
607            }
608            SessionEvent::RefreshToken { url, token } => {
609                let _ = self.engine_tx.send(EngineEvent::RefreshToken { url, token });
610            }
611            SessionEvent::TrackMuted { sid, muted } => {
612                let _ = self.engine_tx.send(EngineEvent::TrackMuted { sid, muted });
613            }
614        }
615        Ok(())
616    }
617
618    /// Close the engine
619    /// the RtcSession is not removed so we can still access stats for e.g
620    async fn close(&self, reason: DisconnectReason) {
621        let (session, engine_task) = {
622            let mut running_handle = self.running_handle.write();
623            running_handle.closed = true;
624
625            let session = running_handle.session.clone();
626            let engine_task = running_handle.engine_task.take();
627            (session, engine_task)
628        };
629
630        if let Some((engine_task, close_tx)) = engine_task {
631            session.close().await;
632            let _ = close_tx.send(());
633            let _ = engine_task.await;
634            let _ = self.engine_tx.send(EngineEvent::Disconnected { reason });
635        }
636    }
637
638    /// When waiting for reconnection, it ensures we're always using the latest session.
639    async fn wait_reconnection(
640        &self,
641    ) -> EngineResult<(RwLockReadGuard<EngineHandle>, AsyncRwLockReadGuard<()>)> {
642        let r_lock = self.reconnecting_lock.read().await;
643        let running_handle = self.running_handle.read();
644
645        if running_handle.closed {
646            // Reconnection may have failed
647            // TODO(theomonnom): More precise error?
648            return Err(EngineError::Connection("engine is closed".into()));
649        }
650
651        Ok((running_handle, r_lock))
652    }
653
654    /// Start the reconnect task if not already started
655    /// Ask to retry directly if `retry_now` is true
656    /// Ask for a full reconnect if `full_reconnect` is true
657    fn reconnection_needed(self: &Arc<Self>, retry_now: bool, full_reconnect: bool) {
658        let mut running_handle = self.running_handle.write();
659
660        if !running_handle.can_reconnect {
661            return;
662        }
663
664        if running_handle.reconnecting {
665            // If we're already reconnecting just update the interval to restart a new attempt
666            // ASAP
667
668            // Only escalate to full reconnect, never downgrade. Stale signal-close
669            // events (which request resume) must not override a full reconnect decision
670            // made by the reconnect loop after a failed resume attempt.
671            if full_reconnect {
672                running_handle.full_reconnect = true;
673            }
674
675            if retry_now {
676                let inner = self.clone();
677                livekit_runtime::spawn(async move {
678                    inner.reconnecting_interval.lock().await.reset();
679                });
680            }
681
682            return;
683        }
684
685        running_handle.reconnecting = true;
686        running_handle.full_reconnect = full_reconnect;
687
688        livekit_runtime::spawn({
689            let inner = self.clone();
690            async move {
691                // Hold the reconnection lock for the whole reconnection time
692                let _r_lock = inner.reconnecting_lock.write().await;
693                // The close function can send a signal to cancel the reconnection
694
695                let close_notifier = inner.close_notifier.clone();
696                let close_receiver = close_notifier.notified();
697                tokio::pin!(close_receiver);
698
699                tokio::select! {
700                    _ = &mut close_receiver => {
701                        log::debug!("reconnection cancelled");
702                        return;
703                    }
704                    res = inner.reconnect_task() => {
705                        if res.is_err() {
706                            log::error!("failed to reconnect to the livekit room");
707                            inner.close(DisconnectReason::UnknownReason).await;
708                        } else {
709                            log::info!("RtcEngine successfully recovered")
710                        }
711                    }
712                }
713
714                let mut running_handle = inner.running_handle.write();
715                running_handle.reconnecting = false;
716
717                // r_lock is now dropped
718            }
719        });
720    }
721
722    /// Runned every time the PeerConnection or the SignalClient is closed
723    /// We first try to resume the connection, if it fails, we start a full reconnect.
724    /// NOTE: The reconnect_task must be canncellation safe
725    async fn reconnect_task(self: &Arc<Self>) -> EngineResult<()> {
726        // Get the latest connection info from the signal_client (including the refreshed token
727        // because the initial join token may have expired)
728        let (url, token, e2ee_manager) = {
729            let running_handle = self.running_handle.read();
730            let signal_client = running_handle.session.signal_client();
731            let e2ee_manager = running_handle.session.e2ee_manager();
732            (
733                signal_client.url(),
734                signal_client.token(), // Refreshed token
735                e2ee_manager.clone(),
736            )
737        };
738
739        for i in 0..RECONNECT_ATTEMPTS {
740            let (is_closed, full_reconnect) = {
741                let running_handle = self.running_handle.read();
742                (running_handle.closed, running_handle.full_reconnect)
743            };
744
745            if is_closed {
746                return Err(EngineError::Connection("attempt canncelled, engine is closed".into()));
747            }
748
749            if full_reconnect {
750                if i == 0 {
751                    let (tx, rx) = oneshot::channel();
752                    let _ = self.engine_tx.send(EngineEvent::Restarting(tx));
753                    let _ = rx.await;
754                }
755
756                log::error!("restarting connection... attempt: {}", i);
757                if let Err(err) = self
758                    .try_restart_connection(
759                        &url,
760                        &token,
761                        self.options.clone(),
762                        e2ee_manager.clone(),
763                    )
764                    .await
765                {
766                    log::error!("restarting connection failed: {}", err);
767                } else {
768                    let (tx, rx) = oneshot::channel();
769                    let _ = self.engine_tx.send(EngineEvent::Restarted(tx));
770                    let _ = rx.await;
771                    return Ok(());
772                }
773            } else {
774                if i == 0 {
775                    let (tx, rx) = oneshot::channel();
776                    let _ = self.engine_tx.send(EngineEvent::Resuming(tx));
777                    let _ = rx.await;
778                }
779
780                log::error!("resuming connection... attempt: {}", i);
781                if let Err(err) = self.try_resume_connection().await {
782                    log::error!("resuming connection failed: {}", err);
783                    let mut running_handle = self.running_handle.write();
784                    running_handle.full_reconnect = true;
785                } else {
786                    let (tx, rx) = oneshot::channel();
787                    let _ = self.engine_tx.send(EngineEvent::Resumed(tx));
788                    let _ = rx.await;
789                    return Ok(());
790                }
791            }
792
793            self.reconnecting_interval.lock().await.tick().await;
794        }
795
796        Err(EngineError::Connection(
797            format!("failed to reconnect after {}", RECONNECT_ATTEMPTS).into(),
798        ))
799    }
800
801    /// Try to recover the connection by doing a full reconnect.
802    /// It recreates a new RtcSession (new peer connection, new signal client, new data channels,
803    /// etc...)
804    async fn try_restart_connection(
805        self: &Arc<Self>,
806        url: &str,
807        token: &str,
808        options: EngineOptions,
809        e2ee_manager: Option<E2eeManager>,
810    ) -> EngineResult<()> {
811        // Close the current RtcSession and the current tasks
812        let (session, engine_task) = {
813            let mut running_handle = self.running_handle.write();
814            let session = running_handle.session.clone();
815            let engine_task = running_handle.engine_task.take();
816            (session, engine_task)
817        };
818
819        if let Some((engine_task, close_tx)) = engine_task {
820            session.close().await;
821            let _ = close_tx.send(());
822            let _ = engine_task.await;
823        }
824
825        let (new_session, join_response, session_events) =
826            RtcSession::connect(url, token, options, e2ee_manager).await?;
827
828        // On SignalRestarted, the room will try to unpublish the local tracks
829        // NOTE: Doing operations that use rtc_session will not use the new one
830        let (tx, rx) = oneshot::channel();
831        let _ = self.engine_tx.send(EngineEvent::SignalRestarted { join_response, tx });
832        let _ = rx.await;
833
834        new_session.wait_pc_connection().await?;
835
836        // Only replace the current session if the new one succeed
837        // This is important so we can still use the old session if the new one failed
838        // (for example, this is important if we still want to get the stats of the old session)
839        // This has the drawback to not being able to use the new session on the SignalRestarted
840        // event.
841        let mut handle = self.running_handle.write();
842        handle.session = Arc::new(new_session);
843
844        let (close_tx, close_rx) = oneshot::channel();
845        let task = livekit_runtime::spawn(self.clone().engine_task(session_events, close_rx));
846        handle.engine_task = Some((task, close_tx));
847
848        Ok(())
849    }
850
851    /// Try to restart the current session
852    async fn try_resume_connection(&self) -> EngineResult<()> {
853        let session = self.running_handle.read().session.clone();
854        let reconnect_response = session.restart().await?;
855
856        let (tx, rx) = oneshot::channel();
857        let _ = self.engine_tx.send(EngineEvent::SignalResumed { reconnect_response, tx });
858
859        // With SignalResumed, the room will send a SyncState message to the server
860        let _ = rx.await;
861
862        // The publisher offer must be sent AFTER the SyncState message
863        session.restart_publisher().await?;
864        session.wait_pc_connection().await
865    }
866}