Skip to main content

gosuto_livekit/room/
mod.rs

// Copyright 2025 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use bmrng::unbounded::UnboundedRequestReceiver;
16use gosuto_libwebrtc::{
17    native::frame_cryptor::EncryptionState,
18    prelude::{
19        ContinualGatheringPolicy, IceTransportsType, MediaStream, MediaStreamTrack,
20        RtcConfiguration,
21    },
22    rtp_transceiver::RtpTransceiver,
23    RtcError,
24};
25use livekit_api::signal_client::{SignalOptions, SignalSdkOptions, SIGNAL_CONNECT_TIMEOUT};
26use livekit_protocol::observer::Dispatcher;
27use livekit_protocol::{self as proto, encryption};
28use livekit_runtime::JoinHandle;
29use parking_lot::RwLock;
30pub use proto::DisconnectReason;
31use proto::{promise::Promise, SignalTarget};
32use std::{collections::HashMap, fmt::Debug, sync::Arc, time::Duration};
33use thiserror::Error;
34use tokio::sync::{
35    broadcast,
36    mpsc::{self, UnboundedReceiver},
37    oneshot, Mutex as AsyncMutex,
38};
39pub use utils::take_cell::TakeCell;
40
41pub use self::{
42    data_stream::*,
43    e2ee::{manager::E2eeManager, E2eeOptions},
44    participant::{ParticipantKind, ParticipantKindDetail},
45};
46pub use crate::rtc_engine::SimulateScenario;
47use crate::{
48    participant::ConnectionQuality,
49    prelude::*,
50    registered_audio_filter_plugins,
51    rtc_engine::{
52        EngineError, EngineEvent, EngineEvents, EngineOptions, EngineResult, RtcEngine,
53        SessionStats, INITIAL_BUFFERED_AMOUNT_LOW_THRESHOLD,
54    },
55};
56
57pub mod data_stream;
58pub mod e2ee;
59pub mod id;
60pub mod options;
61pub mod participant;
62pub mod publication;
63pub mod track;
64pub(crate) mod utils;
65
66pub const SDK_VERSION: &str = env!("CARGO_PKG_VERSION");
67
68pub type RoomResult<T> = Result<T, RoomError>;
69
70#[derive(Error, Debug)]
71pub enum RoomError {
72    #[error("engine: {0}")]
73    Engine(#[from] EngineError),
74    #[error("room failure: {0}")]
75    Internal(String),
76    #[error("rtc: {0}")]
77    Rtc(#[from] RtcError),
78    #[error("this track or a track of the same source is already published")]
79    TrackAlreadyPublished,
80    #[error("already closed")]
81    AlreadyClosed,
82    #[error("request error: {reason:?} - {message}")]
83    Request { reason: proto::request_response::Reason, message: String },
84}
85
86#[derive(Clone, Debug)]
87#[non_exhaustive]
88pub enum RoomEvent {
89    ParticipantConnected(RemoteParticipant),
90    ParticipantDisconnected(RemoteParticipant),
91    LocalTrackPublished {
92        publication: LocalTrackPublication,
93        track: LocalTrack,
94        participant: LocalParticipant,
95    },
96    LocalTrackUnpublished {
97        publication: LocalTrackPublication,
98        participant: LocalParticipant,
99    },
100    LocalTrackSubscribed {
101        track: LocalTrack,
102    },
103    TrackSubscribed {
104        track: RemoteTrack,
105        publication: RemoteTrackPublication,
106        participant: RemoteParticipant,
107    },
108    TrackUnsubscribed {
109        track: RemoteTrack,
110        publication: RemoteTrackPublication,
111        participant: RemoteParticipant,
112    },
113    TrackSubscriptionFailed {
114        participant: RemoteParticipant,
115        error: track::TrackError,
116        track_sid: TrackSid,
117    },
118    TrackPublished {
119        publication: RemoteTrackPublication,
120        participant: RemoteParticipant,
121    },
122    TrackUnpublished {
123        publication: RemoteTrackPublication,
124        participant: RemoteParticipant,
125    },
126    TrackMuted {
127        participant: Participant,
128        publication: TrackPublication,
129    },
130    TrackUnmuted {
131        participant: Participant,
132        publication: TrackPublication,
133    },
134    RoomMetadataChanged {
135        old_metadata: String,
136        metadata: String,
137    },
138    ParticipantMetadataChanged {
139        participant: Participant,
140        old_metadata: String,
141        metadata: String,
142    },
143    ParticipantNameChanged {
144        participant: Participant,
145        old_name: String,
146        name: String,
147    },
148    ParticipantAttributesChanged {
149        participant: Participant,
150        changed_attributes: HashMap<String, String>,
151    },
152    ParticipantEncryptionStatusChanged {
153        participant: Participant,
154        is_encrypted: bool,
155    },
156    ParticipantPermissionChanged {
157        participant: Participant,
158        permission: Option<proto::ParticipantPermission>,
159    },
160    ActiveSpeakersChanged {
161        speakers: Vec<Participant>,
162    },
163    ConnectionQualityChanged {
164        quality: ConnectionQuality,
165        participant: Participant,
166    },
167    DataReceived {
168        payload: Arc<Vec<u8>>,
169        topic: Option<String>,
170        kind: DataPacketKind,
171        participant: Option<RemoteParticipant>,
172    },
173    TranscriptionReceived {
174        participant: Option<Participant>,
175        track_publication: Option<TrackPublication>,
176        segments: Vec<TranscriptionSegment>,
177    },
178    SipDTMFReceived {
179        code: u32,
180        digit: Option<String>,
181        participant: Option<RemoteParticipant>,
182    },
183    ChatMessage {
184        message: ChatMessage,
185        participant: Option<RemoteParticipant>,
186    },
187    ByteStreamOpened {
188        reader: TakeCell<ByteStreamReader>,
189        topic: String,
190        participant_identity: ParticipantIdentity,
191    },
192    TextStreamOpened {
193        reader: TakeCell<TextStreamReader>,
194        topic: String,
195        participant_identity: ParticipantIdentity,
196    },
197    #[deprecated(note = "Use high-level data streams API instead.")]
198    StreamHeaderReceived {
199        header: proto::data_stream::Header,
200        participant_identity: String,
201    },
202    #[deprecated(note = "Use high-level data streams API instead.")]
203    StreamChunkReceived {
204        chunk: proto::data_stream::Chunk,
205        participant_identity: String,
206    },
207    #[deprecated(note = "Use high-level data streams API instead.")]
208    StreamTrailerReceived {
209        trailer: proto::data_stream::Trailer,
210        participant_identity: String,
211    },
212    E2eeStateChanged {
213        participant: Participant,
214        state: EncryptionState,
215    },
216    ConnectionStateChanged(ConnectionState),
217    Connected {
218        /// Initial participants & their tracks prior to joining the room
219        /// We're not returning this directly inside Room::connect because it is unlikely to be
220        /// used
221        participants_with_tracks: Vec<(RemoteParticipant, Vec<RemoteTrackPublication>)>,
222    },
223    Disconnected {
224        reason: DisconnectReason,
225    },
226    Reconnecting,
227    Reconnected,
228    DataChannelBufferedAmountLowThresholdChanged {
229        kind: DataPacketKind,
230        threshold: u64,
231    },
232    RoomUpdated {
233        room: RoomInfo,
234    },
235    Moved {
236        room: RoomInfo,
237    },
238    ParticipantsUpdated {
239        participants: Vec<Participant>,
240    },
241    TokenRefreshed {
242        token: String,
243    },
244}
245
/// Connection state of a room, as stored in [`RoomInfo::state`].
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum ConnectionState {
    /// Not connected. This is the state a room session is created with.
    Disconnected,
    /// Connected to the room.
    Connected,
    /// A reconnection is in progress.
    Reconnecting,
}
252
/// Selects one of the two data-channel flavours used by the room.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum DataPacketKind {
    /// The lossy data channel.
    Lossy,
    /// The reliable data channel.
    Reliable,
}
258
259#[derive(Debug, Clone)]
260pub struct DataPacket {
261    pub payload: Vec<u8>,
262    pub topic: Option<String>,
263    pub reliable: bool,
264    pub destination_identities: Vec<ParticipantIdentity>,
265}
266
267impl Default for DataPacket {
268    fn default() -> Self {
269        Self {
270            payload: Vec::new(),
271            topic: None,
272            reliable: false,
273            destination_identities: Vec::new(),
274        }
275    }
276}
277
278#[derive(Default, Debug, Clone)]
279pub struct Transcription {
280    pub participant_identity: String,
281    pub track_id: String,
282    pub segments: Vec<TranscriptionSegment>,
283}
284
/// One segment of a transcription.
#[derive(Default, Debug, Clone)]
pub struct TranscriptionSegment {
    /// Identifier of the segment.
    pub id: String,
    /// Transcribed text.
    pub text: String,
    /// Start of the segment.
    // NOTE(review): unit and time base are not visible in this file - confirm.
    pub start_time: u64,
    /// End of the segment, in the same unit as `start_time`.
    pub end_time: u64,
    /// Whether the segment is final. `final` is a reserved word, hence the raw identifier.
    pub r#final: bool,
    /// Language of the segment.
    pub language: String,
}
294
295#[derive(Default, Debug, Clone)]
296pub struct SipDTMF {
297    pub code: u32,
298    pub digit: String,
299    pub destination_identities: Vec<ParticipantIdentity>,
300}
301
/// A chat message, as carried by `RoomEvent::ChatMessage`.
#[derive(Default, Debug, Clone)]
pub struct ChatMessage {
    /// Identifier of the message.
    pub id: String,
    /// Text of the message.
    pub message: String,
    /// Timestamp of the message.
    // NOTE(review): unit and epoch are not visible in this file - confirm.
    pub timestamp: i64,
    /// Timestamp of the last edit, if any.
    pub edit_timestamp: Option<i64>,
    /// Deletion flag, when provided.
    pub deleted: Option<bool>,
    /// "Generated" flag, when provided.
    pub generated: Option<bool>,
}
311
/// Parameters of an RPC request addressed to another participant.
#[derive(Debug, Clone)]
pub struct RpcRequest {
    /// Identity of the participant the request is addressed to.
    pub destination_identity: String,
    /// Identifier of the request.
    pub id: String,
    /// Name of the method to invoke.
    pub method: String,
    /// Payload passed to the method.
    pub payload: String,
    /// Timeout for the response.
    pub response_timeout: Duration,
    /// RPC protocol version.
    pub version: u32,
}
321
322#[derive(Debug, Clone)]
323pub struct RpcResponse {
324    destination_identity: String,
325    request_id: String,
326    payload: Option<String>,
327    error: Option<proto::RpcError>,
328}
329
/// Acknowledgement of an RPC request. All fields are private to this module.
#[derive(Debug, Clone)]
pub struct RpcAck {
    /// Identity of the participant the acknowledgement is addressed to.
    destination_identity: String,
    /// Identifier of the request being acknowledged.
    request_id: String,
}
335
/// SDK name and version reported to the server when connecting.
///
/// The struct is `#[non_exhaustive]`: outside this crate it cannot be built with a struct
/// literal; start from `Default::default()` and assign the fields.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct RoomSdkOptions {
    /// Name of the SDK.
    pub sdk: String,
    /// Version of the SDK.
    pub sdk_version: String,
}
342
343impl Default for RoomSdkOptions {
344    fn default() -> Self {
345        Self { sdk: "rust".to_string(), sdk_version: SDK_VERSION.to_string() }
346    }
347}
348
349impl From<RoomSdkOptions> for SignalSdkOptions {
350    fn from(options: RoomSdkOptions) -> Self {
351        let mut sdk_options = SignalSdkOptions::default();
352        sdk_options.sdk = options.sdk;
353        sdk_options.sdk_version = Some(options.sdk_version);
354        sdk_options
355    }
356}
357
358#[derive(Debug, Clone)]
359#[non_exhaustive]
360pub struct RoomOptions {
361    pub auto_subscribe: bool,
362    pub adaptive_stream: bool,
363    pub dynacast: bool,
364    // TODO: link to encryption docs in deprecation notice once available
365    #[deprecated(note = "Use `encryption` field instead")]
366    pub e2ee: Option<E2eeOptions>,
367    pub encryption: Option<E2eeOptions>,
368    pub rtc_config: RtcConfiguration,
369    pub join_retries: u32,
370    pub sdk_options: RoomSdkOptions,
371    /// Enable single peer connection mode. When true, uses one RTCPeerConnection
372    /// for both publishing and subscribing instead of two separate connections.
373    /// Falls back to dual peer connection if the server doesn't support single PC.
374    pub single_peer_connection: bool,
375    /// Timeout for each individual signal connection attempt
376    pub connect_timeout: Duration,
377}
378
379impl Default for RoomOptions {
380    fn default() -> Self {
381        Self {
382            auto_subscribe: true,
383            adaptive_stream: false,
384            dynacast: false,
385            e2ee: None,
386            encryption: None,
387
388            // Explicitly set the default values
389            rtc_config: RtcConfiguration {
390                ice_servers: vec![], /* When empty, this will automatically be filled by the
391                                      * JoinResponse */
392                continual_gathering_policy: ContinualGatheringPolicy::GatherContinually,
393                ice_transport_type: IceTransportsType::All,
394            },
395            join_retries: 3,
396            sdk_options: RoomSdkOptions::default(),
397            single_peer_connection: false,
398            connect_timeout: SIGNAL_CONNECT_TIMEOUT,
399        }
400    }
401}
402
403pub struct Room {
404    inner: Arc<RoomSession>,
405}
406
407impl Debug for Room {
408    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
409        f.debug_struct("Room")
410            .field("sid", &self.maybe_sid())
411            .field("name", &self.name())
412            .field("connection_state", &self.connection_state())
413            .finish()
414    }
415}
416
417#[derive(Clone, Debug)]
418pub struct RoomInfo {
419    pub sid: Option<RoomSid>,
420    pub name: String,
421    pub metadata: String,
422    pub state: ConnectionState,
423    pub lossy_dc_options: DataChannelOptions,
424    pub reliable_dc_options: DataChannelOptions,
425    pub empty_timeout: u32,
426    pub departure_timeout: u32,
427    pub max_participants: u32,
428    pub creation_time: i64,
429    pub num_publishers: u32,
430    pub num_participants: u32,
431    pub active_recording: bool,
432}
433
/// Per-data-channel settings.
#[derive(Clone, Debug)]
pub struct DataChannelOptions {
    /// Buffered-amount-low threshold of the data channel.
    pub buffered_amount_low_threshold: u64,
}
438
439impl Default for DataChannelOptions {
440    fn default() -> Self {
441        Self { buffered_amount_low_threshold: INITIAL_BUFFERED_AMOUNT_LOW_THRESHOLD }
442    }
443}
444
445pub(crate) struct RoomSession {
446    rtc_engine: Arc<RtcEngine>,
447    sid_promise: Promise<RoomSid>,
448    info: RwLock<RoomInfo>,
449    dispatcher: Dispatcher<RoomEvent>,
450    options: RoomOptions,
451    active_speakers: RwLock<Vec<Participant>>,
452    local_participant: LocalParticipant,
453    remote_participants: RwLock<HashMap<ParticipantIdentity, RemoteParticipant>>,
454    e2ee_manager: E2eeManager,
455    incoming_stream_manager: IncomingStreamManager,
456    outgoing_stream_manager: OutgoingStreamManager,
457    handle: AsyncMutex<Option<Handle>>,
458}
459
460struct Handle {
461    room_handle: JoinHandle<()>,
462    incoming_stream_handle: JoinHandle<()>,
463    outgoing_stream_handle: JoinHandle<()>,
464    close_tx: broadcast::Sender<()>,
465}
466
467impl Debug for RoomSession {
468    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
469        let info = self.info.read();
470        f.debug_struct("SessionInner")
471            .field("sid", &info.sid)
472            .field("name", &info.name)
473            .field("rtc_engine", &self.rtc_engine)
474            .finish()
475    }
476}
477
478impl Room {
479    pub async fn connect(
480        url: &str,
481        token: &str,
482        mut options: RoomOptions,
483    ) -> RoomResult<(Self, mpsc::UnboundedReceiver<RoomEvent>)> {
484        // TODO(theomonnom): move connection logic to the RoomSession
485        let with_dc_encryption = options.encryption.is_some();
486        let encryption_options = options.encryption.take().or(options.e2ee.take());
487        let e2ee_manager = E2eeManager::new(encryption_options, with_dc_encryption);
488        let mut signal_options = SignalOptions::default();
489        signal_options.sdk_options = options.sdk_options.clone().into();
490        signal_options.auto_subscribe = options.auto_subscribe;
491        signal_options.adaptive_stream = options.adaptive_stream;
492        signal_options.single_peer_connection = options.single_peer_connection;
493        signal_options.connect_timeout = options.connect_timeout;
494        let (rtc_engine, join_response, engine_events) = RtcEngine::connect(
495            url,
496            token,
497            EngineOptions {
498                rtc_config: options.rtc_config.clone(),
499                signal_options,
500                join_retries: options.join_retries,
501                single_peer_connection: options.single_peer_connection,
502            },
503            Some(e2ee_manager.clone()),
504        )
505        .await?;
506        let rtc_engine = Arc::new(rtc_engine);
507
508        if let Some(key_provider) = e2ee_manager.key_provider() {
509            key_provider.set_sif_trailer(join_response.sif_trailer);
510        }
511
512        let pi = join_response.participant.unwrap();
513        let local_participant = LocalParticipant::new(
514            rtc_engine.clone(),
515            pi.kind().into(),
516            utils::convert_kind_details(&pi.kind_details),
517            pi.sid.try_into().unwrap(),
518            pi.identity.into(),
519            pi.name,
520            pi.metadata,
521            pi.attributes,
522            e2ee_manager.encryption_type(),
523            pi.permission,
524        );
525
526        let dispatcher = Dispatcher::<RoomEvent>::default();
527        local_participant.on_local_track_published({
528            let dispatcher = dispatcher.clone();
529            let e2ee_manager = e2ee_manager.clone();
530            move |participant, publication| {
531                log::debug!("local track published: {}", publication.sid());
532                let track = publication.track().unwrap();
533                let event = RoomEvent::LocalTrackPublished {
534                    participant: participant.clone(),
535                    publication: publication.clone(),
536                    track: track.clone(),
537                };
538                e2ee_manager.on_local_track_published(track, publication, participant);
539                dispatcher.dispatch(&event);
540            }
541        });
542
543        local_participant.on_local_track_unpublished({
544            let dispatcher = dispatcher.clone();
545            let e2ee_manager = e2ee_manager.clone();
546            move |participant, publication| {
547                log::debug!("local track unpublished: {}", publication.sid());
548                let event = RoomEvent::LocalTrackUnpublished {
549                    participant: participant.clone(),
550                    publication: publication.clone(),
551                };
552                e2ee_manager.on_local_track_unpublished(publication, participant);
553                dispatcher.dispatch(&event);
554            }
555        });
556
557        local_participant.on_track_muted({
558            let dispatcher = dispatcher.clone();
559            move |participant, publication| {
560                let event = RoomEvent::TrackMuted { participant, publication };
561                dispatcher.dispatch(&event);
562            }
563        });
564
565        local_participant.on_track_unmuted({
566            let dispatcher = dispatcher.clone();
567            move |participant, publication| {
568                let event = RoomEvent::TrackUnmuted { participant, publication };
569                dispatcher.dispatch(&event);
570            }
571        });
572
573        local_participant.on_metadata_changed({
574            let dispatcher = dispatcher.clone();
575            move |participant, old_metadata, metadata| {
576                let event =
577                    RoomEvent::ParticipantMetadataChanged { participant, old_metadata, metadata };
578                dispatcher.dispatch(&event);
579            }
580        });
581
582        local_participant.on_name_changed({
583            let dispatcher = dispatcher.clone();
584            move |participant, old_name, name| {
585                let event = RoomEvent::ParticipantNameChanged { participant, old_name, name };
586                dispatcher.dispatch(&event);
587            }
588        });
589
590        local_participant.on_attributes_changed({
591            let dispatcher = dispatcher.clone();
592            move |participant, changed_attributes| {
593                let event =
594                    RoomEvent::ParticipantAttributesChanged { participant, changed_attributes };
595                dispatcher.dispatch(&event);
596            }
597        });
598
599        local_participant.on_permission_changed({
600            let dispatcher = dispatcher.clone();
601            move |participant, permission| {
602                let event = RoomEvent::ParticipantPermissionChanged { participant, permission };
603                dispatcher.dispatch(&event);
604            }
605        });
606
607        let (incoming_stream_manager, open_rx) = IncomingStreamManager::new();
608        let (outgoing_stream_manager, packet_rx) = OutgoingStreamManager::new();
609
610        let room_info = join_response.room.unwrap();
611        let inner = Arc::new(RoomSession {
612            sid_promise: Promise::new(),
613            info: RwLock::new(RoomInfo {
614                sid: room_info.sid.try_into().ok(),
615                name: room_info.name,
616                metadata: room_info.metadata,
617                empty_timeout: room_info.empty_timeout,
618                departure_timeout: room_info.departure_timeout,
619                max_participants: room_info.max_participants,
620                creation_time: room_info.creation_time_ms,
621                num_publishers: room_info.num_publishers,
622                num_participants: room_info.num_participants,
623                active_recording: room_info.active_recording,
624                state: ConnectionState::Disconnected,
625                lossy_dc_options: Default::default(),
626                reliable_dc_options: Default::default(),
627            }),
628            remote_participants: Default::default(),
629            active_speakers: Default::default(),
630            options: options.clone(),
631            rtc_engine: rtc_engine.clone(),
632            local_participant,
633            dispatcher: dispatcher.clone(),
634            e2ee_manager: e2ee_manager.clone(),
635            incoming_stream_manager,
636            outgoing_stream_manager,
637            handle: Default::default(),
638        });
639        inner.local_participant.set_session(Arc::downgrade(&inner));
640
641        e2ee_manager.on_state_changed({
642            let dispatcher = dispatcher.clone();
643            let inner = inner.clone();
644            move |participant_identity, state| {
645                // Forward e2ee events to the room
646                // (Ignore if the participant is not in the room anymore)
647
648                let participant = if participant_identity.as_str()
649                    == inner.local_participant.identity().as_str()
650                {
651                    Participant::Local(inner.local_participant.clone())
652                } else if let Some(participant) =
653                    inner.remote_participants.read().get(&participant_identity)
654                {
655                    Participant::Remote(participant.clone())
656                } else {
657                    // Ignore if the participant is disconnected (can happens on bad timing)
658                    return;
659                };
660
661                dispatcher.dispatch(&RoomEvent::E2eeStateChanged { participant, state });
662            }
663        });
664
665        for pi in join_response.other_participants {
666            let participant = {
667                let pi = pi.clone();
668                inner.create_participant(
669                    pi.kind().into(),
670                    utils::convert_kind_details(&pi.kind_details),
671                    pi.sid.try_into().unwrap(),
672                    pi.identity.into(),
673                    pi.name,
674                    pi.metadata,
675                    pi.attributes,
676                    pi.permission,
677                )
678            };
679            participant.update_info(pi.clone());
680        }
681
682        // Get the initial states (Can be useful on some usecases, like the FfiServer)
683        // Getting them here ensure nothing happening before (Like a new participant joining)
684        // because the room task is not started yet
685        let participants = inner.remote_participants.read().clone();
686        let participants_with_tracks = participants
687            .into_values()
688            .map(|p| (p.clone(), p.track_publications().into_values().collect()))
689            .collect();
690
691        let events = inner.dispatcher.register();
692        inner.dispatcher.dispatch(&RoomEvent::Connected { participants_with_tracks });
693        inner.update_connection_state(ConnectionState::Connected);
694
695        let (close_tx, close_rx) = broadcast::channel(1);
696
697        let incoming_stream_handle = livekit_runtime::spawn(incoming_data_stream_task(
698            open_rx,
699            dispatcher.clone(),
700            close_rx.resubscribe(),
701        ));
702        let outgoing_stream_handle = livekit_runtime::spawn(outgoing_data_stream_task(
703            packet_rx,
704            rtc_engine.clone(),
705            close_rx.resubscribe(),
706        ));
707
708        let room_handle = livekit_runtime::spawn(inner.clone().room_task(engine_events, close_rx));
709
710        let handle =
711            Handle { room_handle, incoming_stream_handle, outgoing_stream_handle, close_tx };
712        inner.handle.lock().await.replace(handle);
713
714        Ok((Self { inner }, events))
715    }
716
717    pub async fn close(&self) -> RoomResult<()> {
718        self.inner.close(DisconnectReason::ClientInitiated).await
719    }
720
721    pub async fn simulate_scenario(&self, scenario: SimulateScenario) -> EngineResult<()> {
722        self.inner.rtc_engine.simulate_scenario(scenario).await
723    }
724
725    pub async fn get_stats(&self) -> EngineResult<SessionStats> {
726        self.inner.rtc_engine.get_stats().await
727    }
728
729    pub fn subscribe(&self) -> mpsc::UnboundedReceiver<RoomEvent> {
730        self.inner.dispatcher.register()
731    }
732
733    pub async fn sid(&self) -> RoomSid {
734        // sid could have been updated due to room move
735        let sid = self.inner.info.read().sid.clone();
736        if sid.is_none() {
737            return self.inner.sid_promise.result().await;
738        }
739        sid.unwrap()
740    }
741
742    pub fn maybe_sid(&self) -> Option<RoomSid> {
743        self.inner.info.read().sid.clone()
744    }
745
746    pub fn name(&self) -> String {
747        self.inner.info.read().name.clone()
748    }
749
750    pub fn metadata(&self) -> String {
751        self.inner.info.read().metadata.clone()
752    }
753
754    pub fn local_participant(&self) -> LocalParticipant {
755        self.inner.local_participant.clone()
756    }
757
758    pub fn connection_state(&self) -> ConnectionState {
759        self.inner.info.read().state
760    }
761
762    /// Returns whether the room is currently using single peer connection signaling.
763    /// If requested but not supported by server, this will be false after v0 fallback.
764    pub fn is_single_peer_connection_active(&self) -> bool {
765        self.inner.rtc_engine.session().is_single_pc_mode()
766    }
767
768    pub fn remote_participants(&self) -> HashMap<ParticipantIdentity, RemoteParticipant> {
769        self.inner.remote_participants.read().clone()
770    }
771
772    pub fn e2ee_manager(&self) -> &E2eeManager {
773        &self.inner.e2ee_manager
774    }
775
776    pub fn data_channel_options(&self, kind: DataPacketKind) -> DataChannelOptions {
777        match kind {
778            DataPacketKind::Lossy => self.inner.info.read().lossy_dc_options.clone(),
779            DataPacketKind::Reliable => self.inner.info.read().reliable_dc_options.clone(),
780        }
781    }
782
783    pub fn empty_timeout(&self) -> u32 {
784        self.inner.info.read().empty_timeout
785    }
786
787    pub fn departure_timeout(&self) -> u32 {
788        self.inner.info.read().departure_timeout
789    }
790
791    pub fn max_participants(&self) -> u32 {
792        self.inner.info.read().max_participants
793    }
794    /// Returns the room creation time in milliseconds since Unix epoch.
795    pub fn creation_time(&self) -> i64 {
796        self.inner.info.read().creation_time
797    }
798
799    pub fn num_participants(&self) -> u32 {
800        self.inner.info.read().num_participants
801    }
802
803    pub fn num_publishers(&self) -> u32 {
804        self.inner.info.read().num_publishers
805    }
806
807    pub fn active_recording(&self) -> bool {
808        self.inner.info.read().active_recording
809    }
810}
811
812impl RoomSession {
813    async fn room_task(
814        self: Arc<Self>,
815        mut engine_events: EngineEvents,
816        mut close_rx: broadcast::Receiver<()>,
817    ) {
818        loop {
819            tokio::select! {
820                Some(event) = engine_events.recv() => {
821                    let debug = format!("{:?}", event);
822                    let inner = self.clone();
823                    let (tx, rx) = oneshot::channel();
824                    let task = livekit_runtime::spawn(async move {
825                        if let Err(err) = inner.on_engine_event(event).await {
826                            log::error!("failed to handle engine event: {:?}", err);
827                        }
828                        let _ = tx.send(());
829                    });
830
831                    // Monitor sync/async blockings
832                    tokio::select! {
833                        _ = rx => {},
834                        _ = livekit_runtime::sleep(Duration::from_secs(10)) => {
835                            log::error!("engine_event is taking too much time: {}", debug);
836                        }
837                    }
838
839                    task.await;
840                },
841                _ = close_rx.recv() => {
842                    break;
843                }
844            }
845        }
846
847        log::debug!("room_task closed");
848    }
849
850    async fn on_engine_event(self: &Arc<Self>, event: EngineEvent) -> RoomResult<()> {
851        match event {
852            EngineEvent::ParticipantUpdate { updates } => self.handle_participant_update(updates),
853            EngineEvent::MediaTrack { track, stream, transceiver } => {
854                self.handle_media_track(track, stream, transceiver)
855            }
856            EngineEvent::RoomUpdate { room } => self.handle_room_update(room),
857            EngineEvent::RoomMoved { moved } => self.handle_room_moved(moved),
858            EngineEvent::Resuming(tx) => self.handle_resuming(tx),
859            EngineEvent::Resumed(tx) => self.handle_resumed(tx),
860            EngineEvent::SignalResumed { reconnect_response, tx } => {
861                self.handle_signal_resumed(reconnect_response, tx)
862            }
863            EngineEvent::Restarting(tx) => self.handle_restarting(tx),
864            EngineEvent::Restarted(tx) => self.handle_restarted(tx),
865            EngineEvent::SignalRestarted { join_response, tx } => {
866                self.handle_signal_restarted(join_response, tx)
867            }
868            EngineEvent::Disconnected { reason } => self.handle_disconnected(reason),
869            EngineEvent::Data {
870                payload,
871                topic,
872                kind,
873                participant_sid,
874                participant_identity,
875                encryption_type,
876            } => {
877                self.handle_data(
878                    payload,
879                    topic,
880                    kind,
881                    participant_sid,
882                    participant_identity,
883                    encryption_type,
884                );
885            }
886            EngineEvent::ChatMessage { participant_identity, message } => {
887                self.handle_chat_message(participant_identity, message);
888            }
889            EngineEvent::Transcription { participant_identity, track_sid, segments } => {
890                self.handle_transcription(participant_identity, track_sid, segments);
891            }
892            EngineEvent::SipDTMF { code, digit, participant_identity } => {
893                self.handle_dtmf(code, digit, participant_identity);
894            }
895            EngineEvent::RpcRequest {
896                caller_identity,
897                request_id,
898                method,
899                payload,
900                response_timeout,
901                version,
902            } => {
903                if caller_identity.is_none() {
904                    log::warn!("Received RPC request with null caller identity");
905                    return Ok(());
906                }
907                let local_participant = self.local_participant.clone();
908                livekit_runtime::spawn(async move {
909                    local_participant
910                        .handle_incoming_rpc_request(
911                            caller_identity.unwrap(),
912                            request_id,
913                            method,
914                            payload,
915                            response_timeout,
916                            version,
917                        )
918                        .await;
919                });
920            }
921            EngineEvent::RpcResponse { request_id, payload, error } => {
922                self.local_participant.handle_incoming_rpc_response(request_id, payload, error);
923            }
924            EngineEvent::RpcAck { request_id } => {
925                self.local_participant.handle_incoming_rpc_ack(request_id);
926            }
927            EngineEvent::SpeakersChanged { speakers } => self.handle_speakers_changed(speakers),
928            EngineEvent::ConnectionQuality { updates } => {
929                self.handle_connection_quality_update(updates)
930            }
931            EngineEvent::LocalTrackSubscribed { track_sid } => {
932                self.handle_track_subscribed(track_sid)
933            }
934            EngineEvent::DataStreamHeader { header, participant_identity, encryption_type } => {
935                self.handle_data_stream_header(header, participant_identity, encryption_type);
936            }
937            EngineEvent::DataStreamChunk { chunk, participant_identity, encryption_type } => {
938                self.handle_data_stream_chunk(chunk, participant_identity, encryption_type);
939            }
940            EngineEvent::DataStreamTrailer { trailer, participant_identity } => {
941                self.handle_data_stream_trailer(trailer, participant_identity);
942            }
943            EngineEvent::DataChannelBufferedAmountLowThresholdChanged { kind, threshold } => {
944                self.handle_data_channel_buffered_low_threshold_change(kind, threshold);
945            }
946            EngineEvent::RefreshToken { url, token } => {
947                self.handle_refresh_token(url, token);
948            }
949            EngineEvent::TrackMuted { sid, muted } => {
950                self.handle_server_initiated_mute_track(sid, muted);
951            }
952            _ => {}
953        }
954
955        Ok(())
956    }
957
958    async fn close(&self, reason: DisconnectReason) -> RoomResult<()> {
959        let Some(handle) = self.handle.lock().await.take() else { Err(RoomError::AlreadyClosed)? };
960
961        // remove published tracks
962        for (sid, _) in self.local_participant.track_publications().iter() {
963            let _ = self.local_participant.unpublish_track(sid).await;
964        }
965
966        self.rtc_engine.close(reason).await;
967        self.e2ee_manager.cleanup();
968
969        let _ = handle.close_tx.send(());
970        let _ = handle.incoming_stream_handle.await;
971        let _ = handle.outgoing_stream_handle.await;
972        let _ = handle.room_handle.await;
973
974        self.dispatcher.clear();
975        Ok(())
976    }
977
978    /// Change the connection state and emit an event
979    /// Does nothing if the state is already the same
980    /// Returns true if the state changed
981    fn update_connection_state(&self, state: ConnectionState) -> bool {
982        let mut info = self.info.write();
983        if info.state == state {
984            return false;
985        }
986
987        info.state = state;
988        self.dispatcher.dispatch(&RoomEvent::ConnectionStateChanged(state));
989        true
990    }
991
992    /// Update the participants inside a Room.
993    /// It'll create, update or remove a participant
994    /// It also update the participant tracks.
995    fn handle_participant_update(self: &Arc<Self>, updates: Vec<proto::ParticipantInfo>) {
996        // only update non-disconnected participants to refresh info
997        let mut participants: Vec<Participant> = Vec::new();
998        for pi in updates {
999            let participant_sid = pi.sid.clone().try_into().unwrap();
1000            let participant_identity: ParticipantIdentity = pi.identity.clone().into();
1001
1002            if participant_sid == self.local_participant.sid()
1003                || participant_identity == self.local_participant.identity()
1004            {
1005                self.local_participant.clone().update_info(pi);
1006                participants.push(Participant::Local(self.local_participant.clone()));
1007                continue;
1008            }
1009
1010            // The remote participant sid could have changed (due to a new initial connection)
1011            if let Some(remote_participant) =
1012                self.get_participant_by_identity(&participant_identity)
1013            {
1014                if remote_participant.sid() != participant_sid {
1015                    // Same identity but different sid, disconnect, remove the old participant
1016                    self.clone().handle_participant_disconnect(remote_participant);
1017                }
1018            }
1019
1020            let remote_participant = self.get_participant_by_sid(&participant_sid);
1021            if pi.state == proto::participant_info::State::Disconnected as i32 {
1022                if let Some(remote_participant) = remote_participant {
1023                    // need to update to get the correct disconnect reason
1024                    remote_participant.update_info(pi.clone());
1025                    self.clone().handle_participant_disconnect(remote_participant)
1026                } else {
1027                    // Ignore, just received the ParticipantInfo but the participant is already
1028                    // disconnected
1029                }
1030            } else if let Some(remote_participant) = remote_participant {
1031                remote_participant.update_info(pi.clone());
1032                participants.push(Participant::Remote(remote_participant));
1033            } else {
1034                // Create a new participant
1035                let remote_participant = {
1036                    let pi = pi.clone();
1037                    self.create_participant(
1038                        pi.kind().into(),
1039                        utils::convert_kind_details(&pi.kind_details),
1040                        pi.sid.try_into().unwrap(),
1041                        pi.identity.into(),
1042                        pi.name,
1043                        pi.metadata,
1044                        pi.attributes,
1045                        pi.permission,
1046                    )
1047                };
1048
1049                self.dispatcher
1050                    .dispatch(&RoomEvent::ParticipantConnected(remote_participant.clone()));
1051
1052                remote_participant.update_info(pi.clone()); // Add tracks
1053            }
1054        }
1055        if !participants.is_empty() {
1056            self.dispatcher.dispatch(&RoomEvent::ParticipantsUpdated { participants });
1057        }
1058    }
1059
1060    fn handle_media_track(
1061        &self,
1062        track: MediaStreamTrack,
1063        stream: MediaStream,
1064        transceiver: RtpTransceiver,
1065    ) {
1066        let stream_id = stream.id();
1067        let lk_stream_id = unpack_stream_id(&stream_id);
1068        if lk_stream_id.is_none() {
1069            log::error!("received track with an invalid track_id: {:?}", &stream_id);
1070            return;
1071        }
1072
1073        let (participant_sid, stream_id) = lk_stream_id.unwrap();
1074        let mut track_id = track.id();
1075
1076        // Resolve track ID based on signaling mode
1077        let session = self.rtc_engine.session();
1078        if session.is_single_pc_mode() {
1079            // In single PC mode, resolve track ID from mid_to_track_id mapping
1080            if let Some(mid) = transceiver.mid() {
1081                if let Some(resolved_track_id) = session.get_track_id_for_mid(&mid) {
1082                    log::debug!(
1083                        "resolved track_id from mid: mid={}, track_id={}",
1084                        mid,
1085                        resolved_track_id
1086                    );
1087                    track_id = resolved_track_id.into();
1088                } else {
1089                    log::warn!(
1090                        "could not resolve track_id for mid={}, using track.id()={}",
1091                        mid,
1092                        track_id
1093                    );
1094                }
1095            }
1096        } else if stream_id.starts_with("TR") {
1097            // In dual PC mode, use stream_id if it's a valid track ID
1098            track_id = stream_id.into();
1099        }
1100
1101        if !track_id.starts_with("TR") {
1102            log::warn!(
1103                "track_id does not start with TR after resolution: track_id={}, stream_id={}",
1104                track_id,
1105                stream_id
1106            );
1107        }
1108
1109        let participant_sid: ParticipantSid = participant_sid.to_owned().try_into().unwrap();
1110        let track_id: TrackSid = match track_id.to_owned().try_into() {
1111            Ok(track_id) => track_id,
1112            Err(err) => {
1113                log::error!(
1114                    "dropping remote track due to invalid TrackSid: track_id={}, stream_id={}, err={:?}",
1115                    track_id,
1116                    stream_id,
1117                    err
1118                );
1119                return;
1120            }
1121        };
1122
1123        let remote_participant = self
1124            .remote_participants
1125            .read()
1126            .values()
1127            .find(|x| &x.sid() == &participant_sid)
1128            .cloned();
1129
1130        if let Some(remote_participant) = remote_participant {
1131            livekit_runtime::spawn(async move {
1132                remote_participant.add_subscribed_media_track(track_id, track, transceiver).await;
1133            });
1134        } else {
1135            // The server should send participant updates before sending a new offer, this should
1136            // happen
1137            log::error!("received track from an unknown participant: {:?}", participant_sid);
1138        }
1139    }
1140
1141    /// Active speakers changed
1142    /// Update the participants & sort the active_speakers by audio_level
1143    fn handle_speakers_changed(&self, speakers_info: Vec<proto::SpeakerInfo>) {
1144        let mut speakers = Vec::new();
1145
1146        for speaker in speakers_info {
1147            let sid: ParticipantSid = speaker.sid.try_into().unwrap();
1148            let participant = {
1149                if sid == self.local_participant.sid() {
1150                    Participant::Local(self.local_participant.clone())
1151                } else if let Some(participant) = self.get_participant_by_sid(&sid) {
1152                    Participant::Remote(participant)
1153                } else {
1154                    continue;
1155                }
1156            };
1157
1158            participant.set_speaking(speaker.active);
1159            participant.set_audio_level(speaker.level);
1160
1161            if speaker.active {
1162                speakers.push(participant);
1163            }
1164        }
1165
1166        speakers.sort_by(|a, b| a.audio_level().partial_cmp(&b.audio_level()).unwrap());
1167        *self.active_speakers.write() = speakers.clone();
1168
1169        self.dispatcher.dispatch(&RoomEvent::ActiveSpeakersChanged { speakers });
1170    }
1171
1172    /// Handle a connection quality update
1173    /// Emit ConnectionQualityChanged event for the concerned participants
1174    fn handle_connection_quality_update(&self, updates: Vec<proto::ConnectionQualityInfo>) {
1175        for update in updates {
1176            let quality: ConnectionQuality = update.quality().into();
1177            let sid: ParticipantSid = update.participant_sid.try_into().unwrap();
1178            let participant = {
1179                if sid == self.local_participant.sid() {
1180                    Participant::Local(self.local_participant.clone())
1181                } else if let Some(participant) = self.get_participant_by_sid(&sid) {
1182                    Participant::Remote(participant)
1183                } else {
1184                    continue;
1185                }
1186            };
1187
1188            participant.set_connection_quality(quality);
1189            self.dispatcher.dispatch(&RoomEvent::ConnectionQualityChanged { participant, quality });
1190        }
1191    }
1192
1193    /// Handle the first time a participant subscribes to a track
1194    /// Pass this event forward
1195    fn handle_track_subscribed(&self, track_sid: String) {
1196        let publications = self.local_participant.track_publications().clone();
1197        let publication = publications.get(&track_sid.to_owned().try_into().unwrap());
1198        if let Some(publication) = publication {
1199            self.dispatcher
1200                .dispatch(&RoomEvent::LocalTrackSubscribed { track: publication.track().unwrap() });
1201        }
1202    }
1203
1204    async fn send_sync_state(self: &Arc<Self>) {
1205        let auto_subscribe = self.options.auto_subscribe;
1206        let session = self.rtc_engine.session();
1207        let single_pc_mode = session.is_single_pc_mode();
1208
1209        // In single PC mode, use publisher's offer/answer
1210        // In dual PC mode, use subscriber's offer/answer
1211        let (offer, answer) = if single_pc_mode {
1212            let pub_pc = session.publisher().peer_connection();
1213            let Some(local_desc) = pub_pc.current_local_description() else {
1214                log::warn!("skipping sendSyncState, no publisher offer");
1215                return;
1216            };
1217            let remote_desc = pub_pc.current_remote_description();
1218            // In single PC mode: offer is local (publisher initiates), answer is remote
1219            (local_desc, remote_desc)
1220        } else {
1221            let Some(sub_pc) = session.subscriber() else {
1222                log::warn!("skipping sendSyncState, no subscriber");
1223                return;
1224            };
1225            let sub_pc = sub_pc.peer_connection();
1226            let Some(local_desc) = sub_pc.current_local_description() else {
1227                log::warn!("skipping sendSyncState, no subscriber answer");
1228                return;
1229            };
1230            let Some(remote_desc) = sub_pc.current_remote_description() else {
1231                log::warn!("skipping sendSyncState, no subscriber offer");
1232                return;
1233            };
1234            // In dual PC mode: answer is local, offer is remote
1235            (remote_desc, Some(local_desc))
1236        };
1237
1238        let mut track_sids = Vec::new();
1239        for (_, participant) in self.remote_participants.read().clone() {
1240            for (track_sid, track) in participant.track_publications() {
1241                if track.is_desired() != auto_subscribe {
1242                    track_sids.push(track_sid.to_string());
1243                }
1244            }
1245        }
1246
1247        let mut dcs = Vec::with_capacity(4);
1248        if session.has_published() {
1249            let lossy_dc =
1250                session.data_channel(SignalTarget::Publisher, DataPacketKind::Lossy).unwrap();
1251            let reliable_dc =
1252                session.data_channel(SignalTarget::Publisher, DataPacketKind::Reliable).unwrap();
1253
1254            dcs.push(proto::DataChannelInfo {
1255                label: lossy_dc.label(),
1256                id: lossy_dc.id() as u32,
1257                target: proto::SignalTarget::Publisher as i32,
1258            });
1259
1260            dcs.push(proto::DataChannelInfo {
1261                label: reliable_dc.label(),
1262                id: reliable_dc.id() as u32,
1263                target: proto::SignalTarget::Publisher as i32,
1264            });
1265        }
1266
1267        if let Some(lossy_dc) =
1268            session.data_channel(SignalTarget::Subscriber, DataPacketKind::Lossy)
1269        {
1270            dcs.push(proto::DataChannelInfo {
1271                label: lossy_dc.label(),
1272                id: lossy_dc.id() as u32,
1273                target: proto::SignalTarget::Subscriber as i32,
1274            });
1275        }
1276
1277        if let Some(reliable_dc) =
1278            session.data_channel(SignalTarget::Subscriber, DataPacketKind::Reliable)
1279        {
1280            dcs.push(proto::DataChannelInfo {
1281                label: reliable_dc.label(),
1282                id: reliable_dc.id() as u32,
1283                target: proto::SignalTarget::Subscriber as i32,
1284            });
1285        }
1286
1287        let sync_state = proto::SyncState {
1288            answer: answer.map(|a| proto::SessionDescription {
1289                sdp: a.to_string(),
1290                r#type: a.sdp_type().to_string(),
1291                id: 0,
1292                mid_to_track_id: Default::default(),
1293            }),
1294            offer: Some(proto::SessionDescription {
1295                sdp: offer.to_string(),
1296                r#type: offer.sdp_type().to_string(),
1297                id: 0,
1298                mid_to_track_id: Default::default(),
1299            }),
1300            track_sids_disabled: Vec::default(), // TODO: New protocol version
1301            subscription: Some(proto::UpdateSubscription {
1302                track_sids,
1303                subscribe: !auto_subscribe,
1304                participant_tracks: Vec::new(),
1305            }),
1306            publish_tracks: self.local_participant.published_tracks_info(),
1307            data_channels: dcs,
1308            datachannel_receive_states: session.data_channel_receive_states(),
1309            publish_data_tracks: Default::default(),
1310        };
1311
1312        log::debug!("sending sync state {:?}", sync_state);
1313        self.rtc_engine.send_request(proto::signal_request::Message::SyncState(sync_state)).await;
1314    }
1315
1316    fn handle_room_update(self: &Arc<Self>, room: proto::Room) {
1317        let mut info = self.info.write();
1318        let old_metadata = std::mem::replace(&mut info.metadata, room.metadata.clone());
1319        let mut updated = false;
1320        if old_metadata != room.metadata {
1321            updated = true;
1322            self.dispatcher.dispatch(&RoomEvent::RoomMetadataChanged {
1323                old_metadata,
1324                metadata: info.metadata.clone(),
1325            });
1326        }
1327        if !room.sid.is_empty() {
1328            let sid = room.sid.try_into().ok();
1329            info.sid = sid.clone();
1330            if let Some(sid) = sid {
1331                let _ = self.sid_promise.resolve(sid);
1332            }
1333        }
1334        if info.name != room.name {
1335            updated = true;
1336            info.name = room.name;
1337        }
1338        if info.empty_timeout != room.empty_timeout {
1339            updated = true;
1340            info.empty_timeout = room.empty_timeout;
1341        }
1342        if info.departure_timeout != room.departure_timeout {
1343            updated = true;
1344            info.departure_timeout = room.departure_timeout;
1345        }
1346        if info.max_participants != room.max_participants {
1347            updated = true;
1348            info.max_participants = room.max_participants;
1349        }
1350        if info.num_participants != room.num_participants {
1351            updated = true;
1352            info.num_participants = room.num_participants;
1353        }
1354        if info.num_publishers != room.num_publishers {
1355            updated = true;
1356            info.num_publishers = room.num_publishers;
1357        }
1358        if info.active_recording != room.active_recording {
1359            updated = true;
1360            info.active_recording = room.active_recording;
1361        }
1362        info.creation_time = room.creation_time_ms;
1363        if updated {
1364            self.dispatcher.dispatch(&RoomEvent::RoomUpdated { room: info.clone() });
1365        }
1366    }
1367
1368    fn handle_room_moved(self: &Arc<Self>, moved: proto::RoomMovedResponse) {
1369        self.handle_refresh_token(self.rtc_engine.session().signal_client().url(), moved.token);
1370        if let Some(local_participant) = moved.participant {
1371            self.local_participant.update_info(local_participant);
1372            self.dispatcher.dispatch(&RoomEvent::ParticipantsUpdated {
1373                participants: vec![Participant::Local(self.local_participant.clone())],
1374            });
1375        }
1376        self.handle_participant_update(moved.other_participants);
1377        if let Some(room) = moved.room {
1378            self.handle_room_update(room);
1379        }
1380        let info = self.info.read();
1381        self.dispatcher.dispatch(&RoomEvent::Moved { room: info.clone() });
1382    }
1383
1384    fn handle_resuming(self: &Arc<Self>, tx: oneshot::Sender<()>) {
1385        if self.update_connection_state(ConnectionState::Reconnecting) {
1386            self.dispatcher.dispatch(&RoomEvent::Reconnecting);
1387        }
1388
1389        let _ = tx.send(());
1390    }
1391
1392    fn handle_resumed(self: &Arc<Self>, tx: oneshot::Sender<()>) {
1393        self.update_connection_state(ConnectionState::Connected);
1394        self.dispatcher.dispatch(&RoomEvent::Reconnected);
1395
1396        let _ = tx.send(());
1397
1398        let local_participant = self.local_participant.clone();
1399        livekit_runtime::spawn(async move {
1400            local_participant.update_track_subscription_permissions().await;
1401        });
1402    }
1403
1404    fn handle_signal_resumed(
1405        self: &Arc<Self>,
1406        _reconnect_repsonse: proto::ReconnectResponse,
1407        tx: oneshot::Sender<()>,
1408    ) {
1409        livekit_runtime::spawn({
1410            let session = self.clone();
1411            async move {
1412                session.send_sync_state().await;
1413
1414                // Always send the sync state before continuing the reconnection (e.g: publisher
1415                // offer)
1416                let _ = tx.send(());
1417            }
1418        });
1419    }
1420
1421    fn handle_restarting(self: &Arc<Self>, tx: oneshot::Sender<()>) {
1422        // Remove existing participants/subscriptions on full reconnect
1423        let participants = self.remote_participants.read().clone();
1424        for (_, participant) in participants.iter() {
1425            self.clone().handle_participant_disconnect(participant.clone());
1426        }
1427
1428        if self.update_connection_state(ConnectionState::Reconnecting) {
1429            self.dispatcher.dispatch(&RoomEvent::Reconnecting);
1430        }
1431
1432        let _ = tx.send(());
1433    }
1434
1435    fn handle_restarted(self: &Arc<Self>, tx: oneshot::Sender<()>) {
1436        let _ = tx.send(());
1437
1438        // Unpublish and republish every track
1439        // At this time we know that the RtcSession is successfully restarted
1440        let published_tracks = self.local_participant.track_publications();
1441
1442        // we need to update the track subscription permissions after reconnection
1443        let local_participant = self.local_participant.clone();
1444
1445        // Spawining a new task because we need to wait for the RtcEngine to close the reconnection
1446        // lock.
1447        livekit_runtime::spawn({
1448            let session = self.clone();
1449            async move {
1450                let mut set = tokio::task::JoinSet::new();
1451
1452                for (_, publication) in published_tracks {
1453                    let track = publication.track().unwrap();
1454
1455                    let lp = session.local_participant.clone();
1456                    let republish = async move {
1457                        // Only "really" used to send LocalTrackUnpublished event (Since we don't
1458                        // really need to remove the RtpSender since we know
1459                        // we are using a new RtcSession,
1460                        // so new PeerConnetions)
1461
1462                        let _ = lp.unpublish_track(&publication.sid()).await;
1463                        if let Err(err) =
1464                            lp.publish_track(track.clone(), publication.publish_options()).await
1465                        {
1466                            log::error!(
1467                                "failed to republish track {} after rtc_engine restarted: {}",
1468                                track.name(),
1469                                err
1470                            )
1471                        }
1472                    };
1473
1474                    set.spawn(republish);
1475                }
1476
1477                // Wait for the tracks to be republished before sending the Connect event
1478                while set.join_next().await.is_some() {}
1479
1480                local_participant.update_track_subscription_permissions().await;
1481
1482                session.update_connection_state(ConnectionState::Connected);
1483                session.dispatcher.dispatch(&RoomEvent::Reconnected);
1484            }
1485        });
1486    }
1487
1488    fn handle_signal_restarted(
1489        self: &Arc<Self>,
1490        join_response: proto::JoinResponse,
1491        tx: oneshot::Sender<()>,
1492    ) {
1493        self.local_participant.update_info(join_response.participant.unwrap()); // The sid may have changed
1494
1495        self.handle_participant_update(join_response.other_participants);
1496        self.handle_room_update(join_response.room.unwrap());
1497
1498        let _ = tx.send(());
1499    }
1500
1501    fn handle_disconnected(self: &Arc<Self>, reason: DisconnectReason) {
1502        if self.update_connection_state(ConnectionState::Disconnected) {
1503            self.dispatcher.dispatch(&RoomEvent::Disconnected { reason });
1504        }
1505
1506        log::info!("disconnected from room with reason: {:?}", reason);
1507        if reason != DisconnectReason::ClientInitiated {
1508            livekit_runtime::spawn({
1509                let inner = self.clone();
1510                async move {
1511                    let _ = inner.close(reason).await;
1512                }
1513            });
1514        }
1515    }
1516
1517    fn handle_data(
1518        &self,
1519        payload: Vec<u8>,
1520        topic: Option<String>,
1521        kind: DataPacketKind,
1522        participant_sid: Option<ParticipantSid>,
1523        participant_identity: Option<ParticipantIdentity>,
1524        encryption_type: proto::encryption::Type,
1525    ) {
1526        let mut participant = participant_identity
1527            .as_ref()
1528            .map(|identity| self.get_participant_by_identity(identity))
1529            .unwrap_or(None);
1530
1531        if participant.is_none() {
1532            participant = participant_sid
1533                .as_ref()
1534                .map(|sid| self.get_participant_by_sid(sid))
1535                .unwrap_or(None);
1536        }
1537
1538        // Update participant's data encryption status for regular data messages
1539        if let Some(ref p) = participant {
1540            use crate::e2ee::EncryptionType;
1541            let is_encrypted = EncryptionType::from(encryption_type) != EncryptionType::None;
1542            p.update_data_encryption_status(is_encrypted);
1543        }
1544
1545        self.dispatcher.dispatch(&RoomEvent::DataReceived {
1546            payload: Arc::new(payload),
1547            topic,
1548            kind,
1549            participant,
1550        });
1551    }
1552
1553    fn handle_chat_message(
1554        &self,
1555        participant_identity: ParticipantIdentity,
1556        chat_message: ChatMessage,
1557    ) {
1558        let participant = self.get_participant_by_identity(&participant_identity);
1559
1560        if participant.is_none() {
1561            // We received a data packet from a participant that is not in the participants list
1562            return;
1563        }
1564
1565        self.dispatcher.dispatch(&RoomEvent::ChatMessage { message: chat_message, participant });
1566    }
1567
1568    fn handle_dtmf(
1569        &self,
1570        code: u32,
1571        digit: Option<String>,
1572        participant_identity: Option<ParticipantIdentity>,
1573    ) {
1574        let participant = participant_identity
1575            .as_ref()
1576            .map(|identity| self.get_participant_by_identity(identity))
1577            .unwrap_or(None);
1578
1579        if participant.is_none() && participant_identity.is_some() {
1580            // We received a DTMF from a participant that is not in the participants list
1581            return;
1582        }
1583
1584        self.dispatcher.dispatch(&RoomEvent::SipDTMFReceived { code, digit, participant });
1585    }
1586
1587    fn handle_transcription(
1588        &self,
1589        participant_identity: ParticipantIdentity,
1590        track_sid: String,
1591        segments: Vec<TranscriptionSegment>,
1592    ) {
1593        let participant = self.get_local_or_remote_participant(&participant_identity);
1594        let track_sid: TrackSid = track_sid.to_owned().try_into().unwrap();
1595        let track_publication: Option<TrackPublication> = match &participant {
1596            Some(Participant::Local(ref participant)) => {
1597                participant.get_track_publication(&track_sid).map(TrackPublication::Local)
1598            }
1599            Some(Participant::Remote(ref participant)) => {
1600                participant.get_track_publication(&track_sid).map(TrackPublication::Remote)
1601            }
1602            None => None,
1603        };
1604
1605        self.dispatcher.dispatch(&RoomEvent::TranscriptionReceived {
1606            participant,
1607            track_publication,
1608            segments,
1609        });
1610    }
1611
1612    fn handle_data_stream_header(
1613        &self,
1614        header: proto::data_stream::Header,
1615        participant_identity: String,
1616        encryption_type: proto::encryption::Type,
1617    ) {
1618        self.incoming_stream_manager.handle_header(
1619            header.clone(),
1620            participant_identity.clone(),
1621            encryption_type,
1622        );
1623
1624        // Update participant's data encryption status
1625        if let Some(participant) =
1626            self.remote_participants.read().get(&participant_identity.clone().into()).cloned()
1627        {
1628            use crate::e2ee::EncryptionType;
1629            let is_encrypted = EncryptionType::from(encryption_type) != EncryptionType::None;
1630            participant.update_data_encryption_status(is_encrypted);
1631        }
1632
1633        // For backwards compatibly
1634        let event = RoomEvent::StreamHeaderReceived { header, participant_identity };
1635        self.dispatcher.dispatch(&event);
1636    }
1637
1638    fn handle_data_stream_chunk(
1639        &self,
1640        chunk: proto::data_stream::Chunk,
1641        participant_identity: String,
1642        encryption_type: proto::encryption::Type,
1643    ) {
1644        self.incoming_stream_manager.handle_chunk(chunk.clone(), encryption_type);
1645
1646        // For backwards compatibly
1647        let event = RoomEvent::StreamChunkReceived { chunk, participant_identity };
1648        self.dispatcher.dispatch(&event);
1649    }
1650
1651    fn handle_data_stream_trailer(
1652        &self,
1653        trailer: proto::data_stream::Trailer,
1654        participant_identity: String,
1655    ) {
1656        self.incoming_stream_manager.handle_trailer(trailer.clone());
1657
1658        // For backwards compatibly
1659        let event = RoomEvent::StreamTrailerReceived { trailer, participant_identity };
1660        self.dispatcher.dispatch(&event);
1661    }
1662
1663    fn handle_data_channel_buffered_low_threshold_change(
1664        &self,
1665        kind: DataPacketKind,
1666        threshold: u64,
1667    ) {
1668        let mut info = self.info.write();
1669        match kind {
1670            DataPacketKind::Lossy => {
1671                info.lossy_dc_options.buffered_amount_low_threshold = threshold;
1672            }
1673            DataPacketKind::Reliable => {
1674                info.reliable_dc_options.buffered_amount_low_threshold = threshold;
1675            }
1676        }
1677        let event = RoomEvent::DataChannelBufferedAmountLowThresholdChanged { kind, threshold };
1678        self.dispatcher.dispatch(&event);
1679    }
1680
1681    fn handle_server_initiated_mute_track(&self, sid: String, muted: bool) {
1682        let sid_for_log = sid.clone();
1683        let track_sid = match sid.try_into() {
1684            Ok(sid) => sid,
1685            Err(_) => {
1686                log::warn!("Invalid track sid in mute request: {}", sid_for_log);
1687                return;
1688            }
1689        };
1690
1691        if let Some(publication) = self.local_participant.get_track_publication(&track_sid) {
1692            if muted {
1693                publication.mute();
1694            } else {
1695                publication.unmute();
1696            }
1697            return;
1698        }
1699
1700        log::warn!("Track not found in mute request: {}", sid_for_log);
1701    }
1702
1703    /// Create a new participant
1704    /// Also add it to the participants list
1705    fn create_participant(
1706        self: &Arc<Self>,
1707        kind: ParticipantKind,
1708        kind_details: Vec<ParticipantKindDetail>,
1709        sid: ParticipantSid,
1710        identity: ParticipantIdentity,
1711        name: String,
1712        metadata: String,
1713        attributes: HashMap<String, String>,
1714        permission: Option<proto::ParticipantPermission>,
1715    ) -> RemoteParticipant {
1716        let participant = RemoteParticipant::new(
1717            self.rtc_engine.clone(),
1718            kind,
1719            kind_details,
1720            sid.clone(),
1721            identity.clone(),
1722            name,
1723            metadata,
1724            attributes,
1725            self.options.auto_subscribe,
1726            permission,
1727        );
1728
1729        participant.on_track_published({
1730            let dispatcher = self.dispatcher.clone();
1731            move |participant, publication| {
1732                dispatcher.dispatch(&RoomEvent::TrackPublished { participant, publication });
1733            }
1734        });
1735
1736        participant.on_track_unpublished({
1737            let dispatcher = self.dispatcher.clone();
1738            move |participant, publication| {
1739                dispatcher.dispatch(&RoomEvent::TrackUnpublished { participant, publication });
1740            }
1741        });
1742
1743        participant.on_track_subscribed({
1744            let dispatcher = self.dispatcher.clone();
1745            let e2ee_manager = self.e2ee_manager.clone();
1746            move |participant, publication, track| {
1747                let event = RoomEvent::TrackSubscribed {
1748                    participant: participant.clone(),
1749                    track: track.clone(),
1750                    publication: publication.clone(),
1751                };
1752                e2ee_manager.on_track_subscribed(track, publication, participant);
1753                dispatcher.dispatch(&event);
1754            }
1755        });
1756
1757        participant.on_track_unsubscribed({
1758            let dispatcher = self.dispatcher.clone();
1759            let e2ee_manager = self.e2ee_manager.clone();
1760            move |participant, publication, track| {
1761                let event = RoomEvent::TrackUnsubscribed {
1762                    participant: participant.clone(),
1763                    track: track.clone(),
1764                    publication: publication.clone(),
1765                };
1766                e2ee_manager.on_track_unsubscribed(track, publication, participant);
1767                dispatcher.dispatch(&event);
1768            }
1769        });
1770
1771        participant.on_track_subscription_failed({
1772            let dispatcher = self.dispatcher.clone();
1773            move |participant, track_sid, error| {
1774                dispatcher.dispatch(&RoomEvent::TrackSubscriptionFailed {
1775                    participant,
1776                    track_sid,
1777                    error,
1778                });
1779            }
1780        });
1781
1782        participant.on_track_muted({
1783            let dispatcher = self.dispatcher.clone();
1784            move |participant, publication| {
1785                let event = RoomEvent::TrackMuted { participant, publication };
1786                dispatcher.dispatch(&event);
1787            }
1788        });
1789
1790        participant.on_track_unmuted({
1791            let dispatcher = self.dispatcher.clone();
1792            move |participant, publication| {
1793                let event = RoomEvent::TrackUnmuted { participant, publication };
1794                dispatcher.dispatch(&event);
1795            }
1796        });
1797
1798        participant.on_metadata_changed({
1799            let dispatcher = self.dispatcher.clone();
1800            move |participant, old_metadata, metadata| {
1801                let event =
1802                    RoomEvent::ParticipantMetadataChanged { participant, old_metadata, metadata };
1803                dispatcher.dispatch(&event);
1804            }
1805        });
1806
1807        participant.on_name_changed({
1808            let dispatcher = self.dispatcher.clone();
1809            move |participant, old_name, name| {
1810                let event = RoomEvent::ParticipantNameChanged { participant, old_name, name };
1811                dispatcher.dispatch(&event);
1812            }
1813        });
1814
1815        participant.on_attributes_changed({
1816            let dispatcher = self.dispatcher.clone();
1817            move |participant, changed_attributes| {
1818                let event =
1819                    RoomEvent::ParticipantAttributesChanged { participant, changed_attributes };
1820                dispatcher.dispatch(&event);
1821            }
1822        });
1823
1824        participant.on_permission_changed({
1825            let dispatcher = self.dispatcher.clone();
1826            move |participant, permission| {
1827                let event = RoomEvent::ParticipantPermissionChanged { participant, permission };
1828                dispatcher.dispatch(&event);
1829            }
1830        });
1831
1832        participant.on_encryption_status_changed({
1833            let dispatcher = self.dispatcher.clone();
1834            move |participant, is_encrypted| {
1835                let event =
1836                    RoomEvent::ParticipantEncryptionStatusChanged { participant, is_encrypted };
1837                dispatcher.dispatch(&event);
1838            }
1839        });
1840
1841        let mut participants = self.remote_participants.write();
1842        participants.insert(identity, participant.clone());
1843        participant
1844    }
1845
1846    /// A participant has disconnected
1847    /// Cleanup the participant and emit an event
1848    fn handle_participant_disconnect(self: Arc<Self>, remote_participant: RemoteParticipant) {
1849        for (sid, _) in remote_participant.track_publications() {
1850            remote_participant.unpublish_track(&sid);
1851        }
1852
1853        let mut participants = self.remote_participants.write();
1854        participants.remove(&remote_participant.identity());
1855        self.dispatcher.dispatch(&RoomEvent::ParticipantDisconnected(remote_participant));
1856    }
1857
1858    fn get_participant_by_sid(&self, sid: &ParticipantSid) -> Option<RemoteParticipant> {
1859        self.remote_participants.read().values().find(|x| &x.sid() == sid).cloned()
1860    }
1861
1862    fn get_participant_by_identity(
1863        &self,
1864        identity: &ParticipantIdentity,
1865    ) -> Option<RemoteParticipant> {
1866        self.remote_participants.read().get(identity).cloned()
1867    }
1868
1869    fn get_local_or_remote_participant(
1870        &self,
1871        identity: &ParticipantIdentity,
1872    ) -> Option<Participant> {
1873        if identity == &self.local_participant.identity() {
1874            return Some(Participant::Local(self.local_participant.clone()));
1875        }
1876        return self.get_participant_by_identity(identity).map(Participant::Remote);
1877    }
1878
1879    fn handle_refresh_token(self: &Arc<Self>, url: String, token: String) {
1880        // notify refreshed token to registered audio filters
1881        for filter in registered_audio_filter_plugins().into_iter() {
1882            filter.update_token(url.clone(), token.clone());
1883        }
1884        let event = RoomEvent::TokenRefreshed { token };
1885        self.dispatcher.dispatch(&event);
1886    }
1887}
1888
1889/// Receives stream readers for newly-opened streams and dispatches room events.
1890async fn incoming_data_stream_task(
1891    mut open_rx: UnboundedReceiver<(AnyStreamReader, String)>,
1892    dispatcher: Dispatcher<RoomEvent>,
1893    mut close_rx: broadcast::Receiver<()>,
1894) {
1895    loop {
1896        tokio::select! {
1897            Some((reader, identity)) = open_rx.recv() => {
1898                match reader {
1899                    AnyStreamReader::Byte(reader) => dispatcher.dispatch(&RoomEvent::ByteStreamOpened {
1900                        topic: reader.info().topic.clone(),
1901                        reader: TakeCell::new(reader),
1902                        participant_identity: ParticipantIdentity(identity)
1903                    }),
1904                    AnyStreamReader::Text(reader) => dispatcher.dispatch(&RoomEvent::TextStreamOpened {
1905                        topic: reader.info().topic.clone(),
1906                        reader: TakeCell::new(reader),
1907                        participant_identity: ParticipantIdentity(identity)
1908                    }),
1909                }
1910            },
1911            _ = close_rx.recv() => {
1912                break;
1913            }
1914        }
1915    }
1916}
1917
1918/// Receives packets from the outgoing stream manager and send them.
1919async fn outgoing_data_stream_task(
1920    mut packet_rx: UnboundedRequestReceiver<proto::DataPacket, Result<(), EngineError>>,
1921    engine: Arc<RtcEngine>,
1922    mut close_rx: broadcast::Receiver<()>,
1923) {
1924    loop {
1925        tokio::select! {
1926            Ok((packet, responder)) = packet_rx.recv() => {
1927                let result = engine.publish_data(packet, DataPacketKind::Reliable, false).await;
1928                let _ = responder.respond(result);
1929            },
1930            _ = close_rx.recv() => {
1931                break;
1932            }
1933        }
1934    }
1935}
1936
/// Splits a packed stream id of the form `"<participant_sid>|<track_sid>"`.
///
/// Returns `Some((participant_sid, track_sid))` only when the id consists of exactly two
/// `'|'`-separated parts (either of which may be empty); any other shape yields `None`.
fn unpack_stream_id(stream_id: &str) -> Option<(&str, &str)> {
    // Pull at most three parts straight from the iterator: no intermediate `Vec`, no
    // `unwrap`, and a third part immediately disqualifies the id.
    let mut parts = stream_id.split('|');
    match (parts.next(), parts.next(), parts.next()) {
        (Some(participant_sid), Some(track_sid), None) => Some((participant_sid, track_sid)),
        _ => None,
    }
}