1use bmrng::unbounded::UnboundedRequestReceiver;
16use gosuto_libwebrtc::{
17 native::frame_cryptor::EncryptionState,
18 prelude::{
19 ContinualGatheringPolicy, IceTransportsType, MediaStream, MediaStreamTrack,
20 RtcConfiguration,
21 },
22 rtp_transceiver::RtpTransceiver,
23 RtcError,
24};
25use livekit_api::signal_client::{SignalOptions, SignalSdkOptions, SIGNAL_CONNECT_TIMEOUT};
26use livekit_protocol::observer::Dispatcher;
27use livekit_protocol::{self as proto, encryption};
28use livekit_runtime::JoinHandle;
29use parking_lot::RwLock;
30pub use proto::DisconnectReason;
31use proto::{promise::Promise, SignalTarget};
32use std::{collections::HashMap, fmt::Debug, sync::Arc, time::Duration};
33use thiserror::Error;
34use tokio::sync::{
35 broadcast,
36 mpsc::{self, UnboundedReceiver},
37 oneshot, Mutex as AsyncMutex,
38};
39pub use utils::take_cell::TakeCell;
40
41pub use self::{
42 data_stream::*,
43 e2ee::{manager::E2eeManager, E2eeOptions},
44 participant::{ParticipantKind, ParticipantKindDetail},
45};
46pub use crate::rtc_engine::SimulateScenario;
47use crate::{
48 participant::ConnectionQuality,
49 prelude::*,
50 registered_audio_filter_plugins,
51 rtc_engine::{
52 EngineError, EngineEvent, EngineEvents, EngineOptions, EngineResult, RtcEngine,
53 SessionStats, INITIAL_BUFFERED_AMOUNT_LOW_THRESHOLD,
54 },
55};
56
57pub mod data_stream;
58pub mod e2ee;
59pub mod id;
60pub mod options;
61pub mod participant;
62pub mod publication;
63pub mod track;
64pub(crate) mod utils;
65
66pub const SDK_VERSION: &str = env!("CARGO_PKG_VERSION");
67
68pub type RoomResult<T> = Result<T, RoomError>;
69
70#[derive(Error, Debug)]
71pub enum RoomError {
72 #[error("engine: {0}")]
73 Engine(#[from] EngineError),
74 #[error("room failure: {0}")]
75 Internal(String),
76 #[error("rtc: {0}")]
77 Rtc(#[from] RtcError),
78 #[error("this track or a track of the same source is already published")]
79 TrackAlreadyPublished,
80 #[error("already closed")]
81 AlreadyClosed,
82 #[error("request error: {reason:?} - {message}")]
83 Request { reason: proto::request_response::Reason, message: String },
84}
85
86#[derive(Clone, Debug)]
87#[non_exhaustive]
88pub enum RoomEvent {
89 ParticipantConnected(RemoteParticipant),
90 ParticipantDisconnected(RemoteParticipant),
91 LocalTrackPublished {
92 publication: LocalTrackPublication,
93 track: LocalTrack,
94 participant: LocalParticipant,
95 },
96 LocalTrackUnpublished {
97 publication: LocalTrackPublication,
98 participant: LocalParticipant,
99 },
100 LocalTrackSubscribed {
101 track: LocalTrack,
102 },
103 TrackSubscribed {
104 track: RemoteTrack,
105 publication: RemoteTrackPublication,
106 participant: RemoteParticipant,
107 },
108 TrackUnsubscribed {
109 track: RemoteTrack,
110 publication: RemoteTrackPublication,
111 participant: RemoteParticipant,
112 },
113 TrackSubscriptionFailed {
114 participant: RemoteParticipant,
115 error: track::TrackError,
116 track_sid: TrackSid,
117 },
118 TrackPublished {
119 publication: RemoteTrackPublication,
120 participant: RemoteParticipant,
121 },
122 TrackUnpublished {
123 publication: RemoteTrackPublication,
124 participant: RemoteParticipant,
125 },
126 TrackMuted {
127 participant: Participant,
128 publication: TrackPublication,
129 },
130 TrackUnmuted {
131 participant: Participant,
132 publication: TrackPublication,
133 },
134 RoomMetadataChanged {
135 old_metadata: String,
136 metadata: String,
137 },
138 ParticipantMetadataChanged {
139 participant: Participant,
140 old_metadata: String,
141 metadata: String,
142 },
143 ParticipantNameChanged {
144 participant: Participant,
145 old_name: String,
146 name: String,
147 },
148 ParticipantAttributesChanged {
149 participant: Participant,
150 changed_attributes: HashMap<String, String>,
151 },
152 ParticipantEncryptionStatusChanged {
153 participant: Participant,
154 is_encrypted: bool,
155 },
156 ParticipantPermissionChanged {
157 participant: Participant,
158 permission: Option<proto::ParticipantPermission>,
159 },
160 ActiveSpeakersChanged {
161 speakers: Vec<Participant>,
162 },
163 ConnectionQualityChanged {
164 quality: ConnectionQuality,
165 participant: Participant,
166 },
167 DataReceived {
168 payload: Arc<Vec<u8>>,
169 topic: Option<String>,
170 kind: DataPacketKind,
171 participant: Option<RemoteParticipant>,
172 },
173 TranscriptionReceived {
174 participant: Option<Participant>,
175 track_publication: Option<TrackPublication>,
176 segments: Vec<TranscriptionSegment>,
177 },
178 SipDTMFReceived {
179 code: u32,
180 digit: Option<String>,
181 participant: Option<RemoteParticipant>,
182 },
183 ChatMessage {
184 message: ChatMessage,
185 participant: Option<RemoteParticipant>,
186 },
187 ByteStreamOpened {
188 reader: TakeCell<ByteStreamReader>,
189 topic: String,
190 participant_identity: ParticipantIdentity,
191 },
192 TextStreamOpened {
193 reader: TakeCell<TextStreamReader>,
194 topic: String,
195 participant_identity: ParticipantIdentity,
196 },
197 #[deprecated(note = "Use high-level data streams API instead.")]
198 StreamHeaderReceived {
199 header: proto::data_stream::Header,
200 participant_identity: String,
201 },
202 #[deprecated(note = "Use high-level data streams API instead.")]
203 StreamChunkReceived {
204 chunk: proto::data_stream::Chunk,
205 participant_identity: String,
206 },
207 #[deprecated(note = "Use high-level data streams API instead.")]
208 StreamTrailerReceived {
209 trailer: proto::data_stream::Trailer,
210 participant_identity: String,
211 },
212 E2eeStateChanged {
213 participant: Participant,
214 state: EncryptionState,
215 },
216 ConnectionStateChanged(ConnectionState),
217 Connected {
218 participants_with_tracks: Vec<(RemoteParticipant, Vec<RemoteTrackPublication>)>,
222 },
223 Disconnected {
224 reason: DisconnectReason,
225 },
226 Reconnecting,
227 Reconnected,
228 DataChannelBufferedAmountLowThresholdChanged {
229 kind: DataPacketKind,
230 threshold: u64,
231 },
232 RoomUpdated {
233 room: RoomInfo,
234 },
235 Moved {
236 room: RoomInfo,
237 },
238 ParticipantsUpdated {
239 participants: Vec<Participant>,
240 },
241 TokenRefreshed {
242 token: String,
243 },
244}
245
246#[derive(Debug, Clone, Copy, Eq, PartialEq)]
247pub enum ConnectionState {
248 Disconnected,
249 Connected,
250 Reconnecting,
251}
252
253#[derive(Debug, Clone, Copy, Eq, PartialEq)]
254pub enum DataPacketKind {
255 Lossy,
256 Reliable,
257}
258
259#[derive(Debug, Clone)]
260pub struct DataPacket {
261 pub payload: Vec<u8>,
262 pub topic: Option<String>,
263 pub reliable: bool,
264 pub destination_identities: Vec<ParticipantIdentity>,
265}
266
267impl Default for DataPacket {
268 fn default() -> Self {
269 Self {
270 payload: Vec::new(),
271 topic: None,
272 reliable: false,
273 destination_identities: Vec::new(),
274 }
275 }
276}
277
278#[derive(Default, Debug, Clone)]
279pub struct Transcription {
280 pub participant_identity: String,
281 pub track_id: String,
282 pub segments: Vec<TranscriptionSegment>,
283}
284
285#[derive(Default, Debug, Clone)]
286pub struct TranscriptionSegment {
287 pub id: String,
288 pub text: String,
289 pub start_time: u64,
290 pub end_time: u64,
291 pub r#final: bool,
292 pub language: String,
293}
294
295#[derive(Default, Debug, Clone)]
296pub struct SipDTMF {
297 pub code: u32,
298 pub digit: String,
299 pub destination_identities: Vec<ParticipantIdentity>,
300}
301
302#[derive(Default, Debug, Clone)]
303pub struct ChatMessage {
304 pub id: String,
305 pub message: String,
306 pub timestamp: i64,
307 pub edit_timestamp: Option<i64>,
308 pub deleted: Option<bool>,
309 pub generated: Option<bool>,
310}
311
312#[derive(Debug, Clone)]
313pub struct RpcRequest {
314 pub destination_identity: String,
315 pub id: String,
316 pub method: String,
317 pub payload: String,
318 pub response_timeout: Duration,
319 pub version: u32,
320}
321
322#[derive(Debug, Clone)]
323pub struct RpcResponse {
324 destination_identity: String,
325 request_id: String,
326 payload: Option<String>,
327 error: Option<proto::RpcError>,
328}
329
330#[derive(Debug, Clone)]
331pub struct RpcAck {
332 destination_identity: String,
333 request_id: String,
334}
335
336#[derive(Debug, Clone)]
337#[non_exhaustive]
338pub struct RoomSdkOptions {
339 pub sdk: String,
340 pub sdk_version: String,
341}
342
343impl Default for RoomSdkOptions {
344 fn default() -> Self {
345 Self { sdk: "rust".to_string(), sdk_version: SDK_VERSION.to_string() }
346 }
347}
348
349impl From<RoomSdkOptions> for SignalSdkOptions {
350 fn from(options: RoomSdkOptions) -> Self {
351 let mut sdk_options = SignalSdkOptions::default();
352 sdk_options.sdk = options.sdk;
353 sdk_options.sdk_version = Some(options.sdk_version);
354 sdk_options
355 }
356}
357
358#[derive(Debug, Clone)]
359#[non_exhaustive]
360pub struct RoomOptions {
361 pub auto_subscribe: bool,
362 pub adaptive_stream: bool,
363 pub dynacast: bool,
364 #[deprecated(note = "Use `encryption` field instead")]
366 pub e2ee: Option<E2eeOptions>,
367 pub encryption: Option<E2eeOptions>,
368 pub rtc_config: RtcConfiguration,
369 pub join_retries: u32,
370 pub sdk_options: RoomSdkOptions,
371 pub single_peer_connection: bool,
375 pub connect_timeout: Duration,
377}
378
379impl Default for RoomOptions {
380 fn default() -> Self {
381 Self {
382 auto_subscribe: true,
383 adaptive_stream: false,
384 dynacast: false,
385 e2ee: None,
386 encryption: None,
387
388 rtc_config: RtcConfiguration {
390 ice_servers: vec![], continual_gathering_policy: ContinualGatheringPolicy::GatherContinually,
393 ice_transport_type: IceTransportsType::All,
394 },
395 join_retries: 3,
396 sdk_options: RoomSdkOptions::default(),
397 single_peer_connection: false,
398 connect_timeout: SIGNAL_CONNECT_TIMEOUT,
399 }
400 }
401}
402
403pub struct Room {
404 inner: Arc<RoomSession>,
405}
406
407impl Debug for Room {
408 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
409 f.debug_struct("Room")
410 .field("sid", &self.maybe_sid())
411 .field("name", &self.name())
412 .field("connection_state", &self.connection_state())
413 .finish()
414 }
415}
416
417#[derive(Clone, Debug)]
418pub struct RoomInfo {
419 pub sid: Option<RoomSid>,
420 pub name: String,
421 pub metadata: String,
422 pub state: ConnectionState,
423 pub lossy_dc_options: DataChannelOptions,
424 pub reliable_dc_options: DataChannelOptions,
425 pub empty_timeout: u32,
426 pub departure_timeout: u32,
427 pub max_participants: u32,
428 pub creation_time: i64,
429 pub num_publishers: u32,
430 pub num_participants: u32,
431 pub active_recording: bool,
432}
433
434#[derive(Clone, Debug)]
435pub struct DataChannelOptions {
436 pub buffered_amount_low_threshold: u64,
437}
438
439impl Default for DataChannelOptions {
440 fn default() -> Self {
441 Self { buffered_amount_low_threshold: INITIAL_BUFFERED_AMOUNT_LOW_THRESHOLD }
442 }
443}
444
445pub(crate) struct RoomSession {
446 rtc_engine: Arc<RtcEngine>,
447 sid_promise: Promise<RoomSid>,
448 info: RwLock<RoomInfo>,
449 dispatcher: Dispatcher<RoomEvent>,
450 options: RoomOptions,
451 active_speakers: RwLock<Vec<Participant>>,
452 local_participant: LocalParticipant,
453 remote_participants: RwLock<HashMap<ParticipantIdentity, RemoteParticipant>>,
454 e2ee_manager: E2eeManager,
455 incoming_stream_manager: IncomingStreamManager,
456 outgoing_stream_manager: OutgoingStreamManager,
457 handle: AsyncMutex<Option<Handle>>,
458}
459
460struct Handle {
461 room_handle: JoinHandle<()>,
462 incoming_stream_handle: JoinHandle<()>,
463 outgoing_stream_handle: JoinHandle<()>,
464 close_tx: broadcast::Sender<()>,
465}
466
467impl Debug for RoomSession {
468 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
469 let info = self.info.read();
470 f.debug_struct("SessionInner")
471 .field("sid", &info.sid)
472 .field("name", &info.name)
473 .field("rtc_engine", &self.rtc_engine)
474 .finish()
475 }
476}
477
478impl Room {
479 pub async fn connect(
480 url: &str,
481 token: &str,
482 mut options: RoomOptions,
483 ) -> RoomResult<(Self, mpsc::UnboundedReceiver<RoomEvent>)> {
484 let with_dc_encryption = options.encryption.is_some();
486 let encryption_options = options.encryption.take().or(options.e2ee.take());
487 let e2ee_manager = E2eeManager::new(encryption_options, with_dc_encryption);
488 let mut signal_options = SignalOptions::default();
489 signal_options.sdk_options = options.sdk_options.clone().into();
490 signal_options.auto_subscribe = options.auto_subscribe;
491 signal_options.adaptive_stream = options.adaptive_stream;
492 signal_options.single_peer_connection = options.single_peer_connection;
493 signal_options.connect_timeout = options.connect_timeout;
494 let (rtc_engine, join_response, engine_events) = RtcEngine::connect(
495 url,
496 token,
497 EngineOptions {
498 rtc_config: options.rtc_config.clone(),
499 signal_options,
500 join_retries: options.join_retries,
501 single_peer_connection: options.single_peer_connection,
502 },
503 Some(e2ee_manager.clone()),
504 )
505 .await?;
506 let rtc_engine = Arc::new(rtc_engine);
507
508 if let Some(key_provider) = e2ee_manager.key_provider() {
509 key_provider.set_sif_trailer(join_response.sif_trailer);
510 }
511
512 let pi = join_response.participant.unwrap();
513 let local_participant = LocalParticipant::new(
514 rtc_engine.clone(),
515 pi.kind().into(),
516 utils::convert_kind_details(&pi.kind_details),
517 pi.sid.try_into().unwrap(),
518 pi.identity.into(),
519 pi.name,
520 pi.metadata,
521 pi.attributes,
522 e2ee_manager.encryption_type(),
523 pi.permission,
524 );
525
526 let dispatcher = Dispatcher::<RoomEvent>::default();
527 local_participant.on_local_track_published({
528 let dispatcher = dispatcher.clone();
529 let e2ee_manager = e2ee_manager.clone();
530 move |participant, publication| {
531 log::debug!("local track published: {}", publication.sid());
532 let track = publication.track().unwrap();
533 let event = RoomEvent::LocalTrackPublished {
534 participant: participant.clone(),
535 publication: publication.clone(),
536 track: track.clone(),
537 };
538 e2ee_manager.on_local_track_published(track, publication, participant);
539 dispatcher.dispatch(&event);
540 }
541 });
542
543 local_participant.on_local_track_unpublished({
544 let dispatcher = dispatcher.clone();
545 let e2ee_manager = e2ee_manager.clone();
546 move |participant, publication| {
547 log::debug!("local track unpublished: {}", publication.sid());
548 let event = RoomEvent::LocalTrackUnpublished {
549 participant: participant.clone(),
550 publication: publication.clone(),
551 };
552 e2ee_manager.on_local_track_unpublished(publication, participant);
553 dispatcher.dispatch(&event);
554 }
555 });
556
557 local_participant.on_track_muted({
558 let dispatcher = dispatcher.clone();
559 move |participant, publication| {
560 let event = RoomEvent::TrackMuted { participant, publication };
561 dispatcher.dispatch(&event);
562 }
563 });
564
565 local_participant.on_track_unmuted({
566 let dispatcher = dispatcher.clone();
567 move |participant, publication| {
568 let event = RoomEvent::TrackUnmuted { participant, publication };
569 dispatcher.dispatch(&event);
570 }
571 });
572
573 local_participant.on_metadata_changed({
574 let dispatcher = dispatcher.clone();
575 move |participant, old_metadata, metadata| {
576 let event =
577 RoomEvent::ParticipantMetadataChanged { participant, old_metadata, metadata };
578 dispatcher.dispatch(&event);
579 }
580 });
581
582 local_participant.on_name_changed({
583 let dispatcher = dispatcher.clone();
584 move |participant, old_name, name| {
585 let event = RoomEvent::ParticipantNameChanged { participant, old_name, name };
586 dispatcher.dispatch(&event);
587 }
588 });
589
590 local_participant.on_attributes_changed({
591 let dispatcher = dispatcher.clone();
592 move |participant, changed_attributes| {
593 let event =
594 RoomEvent::ParticipantAttributesChanged { participant, changed_attributes };
595 dispatcher.dispatch(&event);
596 }
597 });
598
599 local_participant.on_permission_changed({
600 let dispatcher = dispatcher.clone();
601 move |participant, permission| {
602 let event = RoomEvent::ParticipantPermissionChanged { participant, permission };
603 dispatcher.dispatch(&event);
604 }
605 });
606
607 let (incoming_stream_manager, open_rx) = IncomingStreamManager::new();
608 let (outgoing_stream_manager, packet_rx) = OutgoingStreamManager::new();
609
610 let room_info = join_response.room.unwrap();
611 let inner = Arc::new(RoomSession {
612 sid_promise: Promise::new(),
613 info: RwLock::new(RoomInfo {
614 sid: room_info.sid.try_into().ok(),
615 name: room_info.name,
616 metadata: room_info.metadata,
617 empty_timeout: room_info.empty_timeout,
618 departure_timeout: room_info.departure_timeout,
619 max_participants: room_info.max_participants,
620 creation_time: room_info.creation_time_ms,
621 num_publishers: room_info.num_publishers,
622 num_participants: room_info.num_participants,
623 active_recording: room_info.active_recording,
624 state: ConnectionState::Disconnected,
625 lossy_dc_options: Default::default(),
626 reliable_dc_options: Default::default(),
627 }),
628 remote_participants: Default::default(),
629 active_speakers: Default::default(),
630 options: options.clone(),
631 rtc_engine: rtc_engine.clone(),
632 local_participant,
633 dispatcher: dispatcher.clone(),
634 e2ee_manager: e2ee_manager.clone(),
635 incoming_stream_manager,
636 outgoing_stream_manager,
637 handle: Default::default(),
638 });
639 inner.local_participant.set_session(Arc::downgrade(&inner));
640
641 e2ee_manager.on_state_changed({
642 let dispatcher = dispatcher.clone();
643 let inner = inner.clone();
644 move |participant_identity, state| {
645 let participant = if participant_identity.as_str()
649 == inner.local_participant.identity().as_str()
650 {
651 Participant::Local(inner.local_participant.clone())
652 } else if let Some(participant) =
653 inner.remote_participants.read().get(&participant_identity)
654 {
655 Participant::Remote(participant.clone())
656 } else {
657 return;
659 };
660
661 dispatcher.dispatch(&RoomEvent::E2eeStateChanged { participant, state });
662 }
663 });
664
665 for pi in join_response.other_participants {
666 let participant = {
667 let pi = pi.clone();
668 inner.create_participant(
669 pi.kind().into(),
670 utils::convert_kind_details(&pi.kind_details),
671 pi.sid.try_into().unwrap(),
672 pi.identity.into(),
673 pi.name,
674 pi.metadata,
675 pi.attributes,
676 pi.permission,
677 )
678 };
679 participant.update_info(pi.clone());
680 }
681
682 let participants = inner.remote_participants.read().clone();
686 let participants_with_tracks = participants
687 .into_values()
688 .map(|p| (p.clone(), p.track_publications().into_values().collect()))
689 .collect();
690
691 let events = inner.dispatcher.register();
692 inner.dispatcher.dispatch(&RoomEvent::Connected { participants_with_tracks });
693 inner.update_connection_state(ConnectionState::Connected);
694
695 let (close_tx, close_rx) = broadcast::channel(1);
696
697 let incoming_stream_handle = livekit_runtime::spawn(incoming_data_stream_task(
698 open_rx,
699 dispatcher.clone(),
700 close_rx.resubscribe(),
701 ));
702 let outgoing_stream_handle = livekit_runtime::spawn(outgoing_data_stream_task(
703 packet_rx,
704 rtc_engine.clone(),
705 close_rx.resubscribe(),
706 ));
707
708 let room_handle = livekit_runtime::spawn(inner.clone().room_task(engine_events, close_rx));
709
710 let handle =
711 Handle { room_handle, incoming_stream_handle, outgoing_stream_handle, close_tx };
712 inner.handle.lock().await.replace(handle);
713
714 Ok((Self { inner }, events))
715 }
716
717 pub async fn close(&self) -> RoomResult<()> {
718 self.inner.close(DisconnectReason::ClientInitiated).await
719 }
720
721 pub async fn simulate_scenario(&self, scenario: SimulateScenario) -> EngineResult<()> {
722 self.inner.rtc_engine.simulate_scenario(scenario).await
723 }
724
725 pub async fn get_stats(&self) -> EngineResult<SessionStats> {
726 self.inner.rtc_engine.get_stats().await
727 }
728
729 pub fn subscribe(&self) -> mpsc::UnboundedReceiver<RoomEvent> {
730 self.inner.dispatcher.register()
731 }
732
733 pub async fn sid(&self) -> RoomSid {
734 let sid = self.inner.info.read().sid.clone();
736 if sid.is_none() {
737 return self.inner.sid_promise.result().await;
738 }
739 sid.unwrap()
740 }
741
742 pub fn maybe_sid(&self) -> Option<RoomSid> {
743 self.inner.info.read().sid.clone()
744 }
745
746 pub fn name(&self) -> String {
747 self.inner.info.read().name.clone()
748 }
749
750 pub fn metadata(&self) -> String {
751 self.inner.info.read().metadata.clone()
752 }
753
754 pub fn local_participant(&self) -> LocalParticipant {
755 self.inner.local_participant.clone()
756 }
757
758 pub fn connection_state(&self) -> ConnectionState {
759 self.inner.info.read().state
760 }
761
762 pub fn is_single_peer_connection_active(&self) -> bool {
765 self.inner.rtc_engine.session().is_single_pc_mode()
766 }
767
768 pub fn remote_participants(&self) -> HashMap<ParticipantIdentity, RemoteParticipant> {
769 self.inner.remote_participants.read().clone()
770 }
771
772 pub fn e2ee_manager(&self) -> &E2eeManager {
773 &self.inner.e2ee_manager
774 }
775
776 pub fn data_channel_options(&self, kind: DataPacketKind) -> DataChannelOptions {
777 match kind {
778 DataPacketKind::Lossy => self.inner.info.read().lossy_dc_options.clone(),
779 DataPacketKind::Reliable => self.inner.info.read().reliable_dc_options.clone(),
780 }
781 }
782
783 pub fn empty_timeout(&self) -> u32 {
784 self.inner.info.read().empty_timeout
785 }
786
787 pub fn departure_timeout(&self) -> u32 {
788 self.inner.info.read().departure_timeout
789 }
790
791 pub fn max_participants(&self) -> u32 {
792 self.inner.info.read().max_participants
793 }
794 pub fn creation_time(&self) -> i64 {
796 self.inner.info.read().creation_time
797 }
798
799 pub fn num_participants(&self) -> u32 {
800 self.inner.info.read().num_participants
801 }
802
803 pub fn num_publishers(&self) -> u32 {
804 self.inner.info.read().num_publishers
805 }
806
807 pub fn active_recording(&self) -> bool {
808 self.inner.info.read().active_recording
809 }
810}
811
812impl RoomSession {
813 async fn room_task(
814 self: Arc<Self>,
815 mut engine_events: EngineEvents,
816 mut close_rx: broadcast::Receiver<()>,
817 ) {
818 loop {
819 tokio::select! {
820 Some(event) = engine_events.recv() => {
821 let debug = format!("{:?}", event);
822 let inner = self.clone();
823 let (tx, rx) = oneshot::channel();
824 let task = livekit_runtime::spawn(async move {
825 if let Err(err) = inner.on_engine_event(event).await {
826 log::error!("failed to handle engine event: {:?}", err);
827 }
828 let _ = tx.send(());
829 });
830
831 tokio::select! {
833 _ = rx => {},
834 _ = livekit_runtime::sleep(Duration::from_secs(10)) => {
835 log::error!("engine_event is taking too much time: {}", debug);
836 }
837 }
838
839 task.await;
840 },
841 _ = close_rx.recv() => {
842 break;
843 }
844 }
845 }
846
847 log::debug!("room_task closed");
848 }
849
850 async fn on_engine_event(self: &Arc<Self>, event: EngineEvent) -> RoomResult<()> {
851 match event {
852 EngineEvent::ParticipantUpdate { updates } => self.handle_participant_update(updates),
853 EngineEvent::MediaTrack { track, stream, transceiver } => {
854 self.handle_media_track(track, stream, transceiver)
855 }
856 EngineEvent::RoomUpdate { room } => self.handle_room_update(room),
857 EngineEvent::RoomMoved { moved } => self.handle_room_moved(moved),
858 EngineEvent::Resuming(tx) => self.handle_resuming(tx),
859 EngineEvent::Resumed(tx) => self.handle_resumed(tx),
860 EngineEvent::SignalResumed { reconnect_response, tx } => {
861 self.handle_signal_resumed(reconnect_response, tx)
862 }
863 EngineEvent::Restarting(tx) => self.handle_restarting(tx),
864 EngineEvent::Restarted(tx) => self.handle_restarted(tx),
865 EngineEvent::SignalRestarted { join_response, tx } => {
866 self.handle_signal_restarted(join_response, tx)
867 }
868 EngineEvent::Disconnected { reason } => self.handle_disconnected(reason),
869 EngineEvent::Data {
870 payload,
871 topic,
872 kind,
873 participant_sid,
874 participant_identity,
875 encryption_type,
876 } => {
877 self.handle_data(
878 payload,
879 topic,
880 kind,
881 participant_sid,
882 participant_identity,
883 encryption_type,
884 );
885 }
886 EngineEvent::ChatMessage { participant_identity, message } => {
887 self.handle_chat_message(participant_identity, message);
888 }
889 EngineEvent::Transcription { participant_identity, track_sid, segments } => {
890 self.handle_transcription(participant_identity, track_sid, segments);
891 }
892 EngineEvent::SipDTMF { code, digit, participant_identity } => {
893 self.handle_dtmf(code, digit, participant_identity);
894 }
895 EngineEvent::RpcRequest {
896 caller_identity,
897 request_id,
898 method,
899 payload,
900 response_timeout,
901 version,
902 } => {
903 if caller_identity.is_none() {
904 log::warn!("Received RPC request with null caller identity");
905 return Ok(());
906 }
907 let local_participant = self.local_participant.clone();
908 livekit_runtime::spawn(async move {
909 local_participant
910 .handle_incoming_rpc_request(
911 caller_identity.unwrap(),
912 request_id,
913 method,
914 payload,
915 response_timeout,
916 version,
917 )
918 .await;
919 });
920 }
921 EngineEvent::RpcResponse { request_id, payload, error } => {
922 self.local_participant.handle_incoming_rpc_response(request_id, payload, error);
923 }
924 EngineEvent::RpcAck { request_id } => {
925 self.local_participant.handle_incoming_rpc_ack(request_id);
926 }
927 EngineEvent::SpeakersChanged { speakers } => self.handle_speakers_changed(speakers),
928 EngineEvent::ConnectionQuality { updates } => {
929 self.handle_connection_quality_update(updates)
930 }
931 EngineEvent::LocalTrackSubscribed { track_sid } => {
932 self.handle_track_subscribed(track_sid)
933 }
934 EngineEvent::DataStreamHeader { header, participant_identity, encryption_type } => {
935 self.handle_data_stream_header(header, participant_identity, encryption_type);
936 }
937 EngineEvent::DataStreamChunk { chunk, participant_identity, encryption_type } => {
938 self.handle_data_stream_chunk(chunk, participant_identity, encryption_type);
939 }
940 EngineEvent::DataStreamTrailer { trailer, participant_identity } => {
941 self.handle_data_stream_trailer(trailer, participant_identity);
942 }
943 EngineEvent::DataChannelBufferedAmountLowThresholdChanged { kind, threshold } => {
944 self.handle_data_channel_buffered_low_threshold_change(kind, threshold);
945 }
946 EngineEvent::RefreshToken { url, token } => {
947 self.handle_refresh_token(url, token);
948 }
949 EngineEvent::TrackMuted { sid, muted } => {
950 self.handle_server_initiated_mute_track(sid, muted);
951 }
952 _ => {}
953 }
954
955 Ok(())
956 }
957
958 async fn close(&self, reason: DisconnectReason) -> RoomResult<()> {
959 let Some(handle) = self.handle.lock().await.take() else { Err(RoomError::AlreadyClosed)? };
960
961 for (sid, _) in self.local_participant.track_publications().iter() {
963 let _ = self.local_participant.unpublish_track(sid).await;
964 }
965
966 self.rtc_engine.close(reason).await;
967 self.e2ee_manager.cleanup();
968
969 let _ = handle.close_tx.send(());
970 let _ = handle.incoming_stream_handle.await;
971 let _ = handle.outgoing_stream_handle.await;
972 let _ = handle.room_handle.await;
973
974 self.dispatcher.clear();
975 Ok(())
976 }
977
978 fn update_connection_state(&self, state: ConnectionState) -> bool {
982 let mut info = self.info.write();
983 if info.state == state {
984 return false;
985 }
986
987 info.state = state;
988 self.dispatcher.dispatch(&RoomEvent::ConnectionStateChanged(state));
989 true
990 }
991
992 fn handle_participant_update(self: &Arc<Self>, updates: Vec<proto::ParticipantInfo>) {
996 let mut participants: Vec<Participant> = Vec::new();
998 for pi in updates {
999 let participant_sid = pi.sid.clone().try_into().unwrap();
1000 let participant_identity: ParticipantIdentity = pi.identity.clone().into();
1001
1002 if participant_sid == self.local_participant.sid()
1003 || participant_identity == self.local_participant.identity()
1004 {
1005 self.local_participant.clone().update_info(pi);
1006 participants.push(Participant::Local(self.local_participant.clone()));
1007 continue;
1008 }
1009
1010 if let Some(remote_participant) =
1012 self.get_participant_by_identity(&participant_identity)
1013 {
1014 if remote_participant.sid() != participant_sid {
1015 self.clone().handle_participant_disconnect(remote_participant);
1017 }
1018 }
1019
1020 let remote_participant = self.get_participant_by_sid(&participant_sid);
1021 if pi.state == proto::participant_info::State::Disconnected as i32 {
1022 if let Some(remote_participant) = remote_participant {
1023 remote_participant.update_info(pi.clone());
1025 self.clone().handle_participant_disconnect(remote_participant)
1026 } else {
1027 }
1030 } else if let Some(remote_participant) = remote_participant {
1031 remote_participant.update_info(pi.clone());
1032 participants.push(Participant::Remote(remote_participant));
1033 } else {
1034 let remote_participant = {
1036 let pi = pi.clone();
1037 self.create_participant(
1038 pi.kind().into(),
1039 utils::convert_kind_details(&pi.kind_details),
1040 pi.sid.try_into().unwrap(),
1041 pi.identity.into(),
1042 pi.name,
1043 pi.metadata,
1044 pi.attributes,
1045 pi.permission,
1046 )
1047 };
1048
1049 self.dispatcher
1050 .dispatch(&RoomEvent::ParticipantConnected(remote_participant.clone()));
1051
1052 remote_participant.update_info(pi.clone()); }
1054 }
1055 if !participants.is_empty() {
1056 self.dispatcher.dispatch(&RoomEvent::ParticipantsUpdated { participants });
1057 }
1058 }
1059
1060 fn handle_media_track(
1061 &self,
1062 track: MediaStreamTrack,
1063 stream: MediaStream,
1064 transceiver: RtpTransceiver,
1065 ) {
1066 let stream_id = stream.id();
1067 let lk_stream_id = unpack_stream_id(&stream_id);
1068 if lk_stream_id.is_none() {
1069 log::error!("received track with an invalid track_id: {:?}", &stream_id);
1070 return;
1071 }
1072
1073 let (participant_sid, stream_id) = lk_stream_id.unwrap();
1074 let mut track_id = track.id();
1075
1076 let session = self.rtc_engine.session();
1078 if session.is_single_pc_mode() {
1079 if let Some(mid) = transceiver.mid() {
1081 if let Some(resolved_track_id) = session.get_track_id_for_mid(&mid) {
1082 log::debug!(
1083 "resolved track_id from mid: mid={}, track_id={}",
1084 mid,
1085 resolved_track_id
1086 );
1087 track_id = resolved_track_id.into();
1088 } else {
1089 log::warn!(
1090 "could not resolve track_id for mid={}, using track.id()={}",
1091 mid,
1092 track_id
1093 );
1094 }
1095 }
1096 } else if stream_id.starts_with("TR") {
1097 track_id = stream_id.into();
1099 }
1100
1101 if !track_id.starts_with("TR") {
1102 log::warn!(
1103 "track_id does not start with TR after resolution: track_id={}, stream_id={}",
1104 track_id,
1105 stream_id
1106 );
1107 }
1108
1109 let participant_sid: ParticipantSid = participant_sid.to_owned().try_into().unwrap();
1110 let track_id: TrackSid = match track_id.to_owned().try_into() {
1111 Ok(track_id) => track_id,
1112 Err(err) => {
1113 log::error!(
1114 "dropping remote track due to invalid TrackSid: track_id={}, stream_id={}, err={:?}",
1115 track_id,
1116 stream_id,
1117 err
1118 );
1119 return;
1120 }
1121 };
1122
1123 let remote_participant = self
1124 .remote_participants
1125 .read()
1126 .values()
1127 .find(|x| &x.sid() == &participant_sid)
1128 .cloned();
1129
1130 if let Some(remote_participant) = remote_participant {
1131 livekit_runtime::spawn(async move {
1132 remote_participant.add_subscribed_media_track(track_id, track, transceiver).await;
1133 });
1134 } else {
1135 log::error!("received track from an unknown participant: {:?}", participant_sid);
1138 }
1139 }
1140
1141 fn handle_speakers_changed(&self, speakers_info: Vec<proto::SpeakerInfo>) {
1144 let mut speakers = Vec::new();
1145
1146 for speaker in speakers_info {
1147 let sid: ParticipantSid = speaker.sid.try_into().unwrap();
1148 let participant = {
1149 if sid == self.local_participant.sid() {
1150 Participant::Local(self.local_participant.clone())
1151 } else if let Some(participant) = self.get_participant_by_sid(&sid) {
1152 Participant::Remote(participant)
1153 } else {
1154 continue;
1155 }
1156 };
1157
1158 participant.set_speaking(speaker.active);
1159 participant.set_audio_level(speaker.level);
1160
1161 if speaker.active {
1162 speakers.push(participant);
1163 }
1164 }
1165
1166 speakers.sort_by(|a, b| a.audio_level().partial_cmp(&b.audio_level()).unwrap());
1167 *self.active_speakers.write() = speakers.clone();
1168
1169 self.dispatcher.dispatch(&RoomEvent::ActiveSpeakersChanged { speakers });
1170 }
1171
1172 fn handle_connection_quality_update(&self, updates: Vec<proto::ConnectionQualityInfo>) {
1175 for update in updates {
1176 let quality: ConnectionQuality = update.quality().into();
1177 let sid: ParticipantSid = update.participant_sid.try_into().unwrap();
1178 let participant = {
1179 if sid == self.local_participant.sid() {
1180 Participant::Local(self.local_participant.clone())
1181 } else if let Some(participant) = self.get_participant_by_sid(&sid) {
1182 Participant::Remote(participant)
1183 } else {
1184 continue;
1185 }
1186 };
1187
1188 participant.set_connection_quality(quality);
1189 self.dispatcher.dispatch(&RoomEvent::ConnectionQualityChanged { participant, quality });
1190 }
1191 }
1192
1193 fn handle_track_subscribed(&self, track_sid: String) {
1196 let publications = self.local_participant.track_publications().clone();
1197 let publication = publications.get(&track_sid.to_owned().try_into().unwrap());
1198 if let Some(publication) = publication {
1199 self.dispatcher
1200 .dispatch(&RoomEvent::LocalTrackSubscribed { track: publication.track().unwrap() });
1201 }
1202 }
1203
1204 async fn send_sync_state(self: &Arc<Self>) {
1205 let auto_subscribe = self.options.auto_subscribe;
1206 let session = self.rtc_engine.session();
1207 let single_pc_mode = session.is_single_pc_mode();
1208
1209 let (offer, answer) = if single_pc_mode {
1212 let pub_pc = session.publisher().peer_connection();
1213 let Some(local_desc) = pub_pc.current_local_description() else {
1214 log::warn!("skipping sendSyncState, no publisher offer");
1215 return;
1216 };
1217 let remote_desc = pub_pc.current_remote_description();
1218 (local_desc, remote_desc)
1220 } else {
1221 let Some(sub_pc) = session.subscriber() else {
1222 log::warn!("skipping sendSyncState, no subscriber");
1223 return;
1224 };
1225 let sub_pc = sub_pc.peer_connection();
1226 let Some(local_desc) = sub_pc.current_local_description() else {
1227 log::warn!("skipping sendSyncState, no subscriber answer");
1228 return;
1229 };
1230 let Some(remote_desc) = sub_pc.current_remote_description() else {
1231 log::warn!("skipping sendSyncState, no subscriber offer");
1232 return;
1233 };
1234 (remote_desc, Some(local_desc))
1236 };
1237
1238 let mut track_sids = Vec::new();
1239 for (_, participant) in self.remote_participants.read().clone() {
1240 for (track_sid, track) in participant.track_publications() {
1241 if track.is_desired() != auto_subscribe {
1242 track_sids.push(track_sid.to_string());
1243 }
1244 }
1245 }
1246
1247 let mut dcs = Vec::with_capacity(4);
1248 if session.has_published() {
1249 let lossy_dc =
1250 session.data_channel(SignalTarget::Publisher, DataPacketKind::Lossy).unwrap();
1251 let reliable_dc =
1252 session.data_channel(SignalTarget::Publisher, DataPacketKind::Reliable).unwrap();
1253
1254 dcs.push(proto::DataChannelInfo {
1255 label: lossy_dc.label(),
1256 id: lossy_dc.id() as u32,
1257 target: proto::SignalTarget::Publisher as i32,
1258 });
1259
1260 dcs.push(proto::DataChannelInfo {
1261 label: reliable_dc.label(),
1262 id: reliable_dc.id() as u32,
1263 target: proto::SignalTarget::Publisher as i32,
1264 });
1265 }
1266
1267 if let Some(lossy_dc) =
1268 session.data_channel(SignalTarget::Subscriber, DataPacketKind::Lossy)
1269 {
1270 dcs.push(proto::DataChannelInfo {
1271 label: lossy_dc.label(),
1272 id: lossy_dc.id() as u32,
1273 target: proto::SignalTarget::Subscriber as i32,
1274 });
1275 }
1276
1277 if let Some(reliable_dc) =
1278 session.data_channel(SignalTarget::Subscriber, DataPacketKind::Reliable)
1279 {
1280 dcs.push(proto::DataChannelInfo {
1281 label: reliable_dc.label(),
1282 id: reliable_dc.id() as u32,
1283 target: proto::SignalTarget::Subscriber as i32,
1284 });
1285 }
1286
1287 let sync_state = proto::SyncState {
1288 answer: answer.map(|a| proto::SessionDescription {
1289 sdp: a.to_string(),
1290 r#type: a.sdp_type().to_string(),
1291 id: 0,
1292 mid_to_track_id: Default::default(),
1293 }),
1294 offer: Some(proto::SessionDescription {
1295 sdp: offer.to_string(),
1296 r#type: offer.sdp_type().to_string(),
1297 id: 0,
1298 mid_to_track_id: Default::default(),
1299 }),
1300 track_sids_disabled: Vec::default(), subscription: Some(proto::UpdateSubscription {
1302 track_sids,
1303 subscribe: !auto_subscribe,
1304 participant_tracks: Vec::new(),
1305 }),
1306 publish_tracks: self.local_participant.published_tracks_info(),
1307 data_channels: dcs,
1308 datachannel_receive_states: session.data_channel_receive_states(),
1309 publish_data_tracks: Default::default(),
1310 };
1311
1312 log::debug!("sending sync state {:?}", sync_state);
1313 self.rtc_engine.send_request(proto::signal_request::Message::SyncState(sync_state)).await;
1314 }
1315
1316 fn handle_room_update(self: &Arc<Self>, room: proto::Room) {
1317 let mut info = self.info.write();
1318 let old_metadata = std::mem::replace(&mut info.metadata, room.metadata.clone());
1319 let mut updated = false;
1320 if old_metadata != room.metadata {
1321 updated = true;
1322 self.dispatcher.dispatch(&RoomEvent::RoomMetadataChanged {
1323 old_metadata,
1324 metadata: info.metadata.clone(),
1325 });
1326 }
1327 if !room.sid.is_empty() {
1328 let sid = room.sid.try_into().ok();
1329 info.sid = sid.clone();
1330 if let Some(sid) = sid {
1331 let _ = self.sid_promise.resolve(sid);
1332 }
1333 }
1334 if info.name != room.name {
1335 updated = true;
1336 info.name = room.name;
1337 }
1338 if info.empty_timeout != room.empty_timeout {
1339 updated = true;
1340 info.empty_timeout = room.empty_timeout;
1341 }
1342 if info.departure_timeout != room.departure_timeout {
1343 updated = true;
1344 info.departure_timeout = room.departure_timeout;
1345 }
1346 if info.max_participants != room.max_participants {
1347 updated = true;
1348 info.max_participants = room.max_participants;
1349 }
1350 if info.num_participants != room.num_participants {
1351 updated = true;
1352 info.num_participants = room.num_participants;
1353 }
1354 if info.num_publishers != room.num_publishers {
1355 updated = true;
1356 info.num_publishers = room.num_publishers;
1357 }
1358 if info.active_recording != room.active_recording {
1359 updated = true;
1360 info.active_recording = room.active_recording;
1361 }
1362 info.creation_time = room.creation_time_ms;
1363 if updated {
1364 self.dispatcher.dispatch(&RoomEvent::RoomUpdated { room: info.clone() });
1365 }
1366 }
1367
1368 fn handle_room_moved(self: &Arc<Self>, moved: proto::RoomMovedResponse) {
1369 self.handle_refresh_token(self.rtc_engine.session().signal_client().url(), moved.token);
1370 if let Some(local_participant) = moved.participant {
1371 self.local_participant.update_info(local_participant);
1372 self.dispatcher.dispatch(&RoomEvent::ParticipantsUpdated {
1373 participants: vec![Participant::Local(self.local_participant.clone())],
1374 });
1375 }
1376 self.handle_participant_update(moved.other_participants);
1377 if let Some(room) = moved.room {
1378 self.handle_room_update(room);
1379 }
1380 let info = self.info.read();
1381 self.dispatcher.dispatch(&RoomEvent::Moved { room: info.clone() });
1382 }
1383
1384 fn handle_resuming(self: &Arc<Self>, tx: oneshot::Sender<()>) {
1385 if self.update_connection_state(ConnectionState::Reconnecting) {
1386 self.dispatcher.dispatch(&RoomEvent::Reconnecting);
1387 }
1388
1389 let _ = tx.send(());
1390 }
1391
1392 fn handle_resumed(self: &Arc<Self>, tx: oneshot::Sender<()>) {
1393 self.update_connection_state(ConnectionState::Connected);
1394 self.dispatcher.dispatch(&RoomEvent::Reconnected);
1395
1396 let _ = tx.send(());
1397
1398 let local_participant = self.local_participant.clone();
1399 livekit_runtime::spawn(async move {
1400 local_participant.update_track_subscription_permissions().await;
1401 });
1402 }
1403
1404 fn handle_signal_resumed(
1405 self: &Arc<Self>,
1406 _reconnect_repsonse: proto::ReconnectResponse,
1407 tx: oneshot::Sender<()>,
1408 ) {
1409 livekit_runtime::spawn({
1410 let session = self.clone();
1411 async move {
1412 session.send_sync_state().await;
1413
1414 let _ = tx.send(());
1417 }
1418 });
1419 }
1420
1421 fn handle_restarting(self: &Arc<Self>, tx: oneshot::Sender<()>) {
1422 let participants = self.remote_participants.read().clone();
1424 for (_, participant) in participants.iter() {
1425 self.clone().handle_participant_disconnect(participant.clone());
1426 }
1427
1428 if self.update_connection_state(ConnectionState::Reconnecting) {
1429 self.dispatcher.dispatch(&RoomEvent::Reconnecting);
1430 }
1431
1432 let _ = tx.send(());
1433 }
1434
1435 fn handle_restarted(self: &Arc<Self>, tx: oneshot::Sender<()>) {
1436 let _ = tx.send(());
1437
1438 let published_tracks = self.local_participant.track_publications();
1441
1442 let local_participant = self.local_participant.clone();
1444
1445 livekit_runtime::spawn({
1448 let session = self.clone();
1449 async move {
1450 let mut set = tokio::task::JoinSet::new();
1451
1452 for (_, publication) in published_tracks {
1453 let track = publication.track().unwrap();
1454
1455 let lp = session.local_participant.clone();
1456 let republish = async move {
1457 let _ = lp.unpublish_track(&publication.sid()).await;
1463 if let Err(err) =
1464 lp.publish_track(track.clone(), publication.publish_options()).await
1465 {
1466 log::error!(
1467 "failed to republish track {} after rtc_engine restarted: {}",
1468 track.name(),
1469 err
1470 )
1471 }
1472 };
1473
1474 set.spawn(republish);
1475 }
1476
1477 while set.join_next().await.is_some() {}
1479
1480 local_participant.update_track_subscription_permissions().await;
1481
1482 session.update_connection_state(ConnectionState::Connected);
1483 session.dispatcher.dispatch(&RoomEvent::Reconnected);
1484 }
1485 });
1486 }
1487
1488 fn handle_signal_restarted(
1489 self: &Arc<Self>,
1490 join_response: proto::JoinResponse,
1491 tx: oneshot::Sender<()>,
1492 ) {
1493 self.local_participant.update_info(join_response.participant.unwrap()); self.handle_participant_update(join_response.other_participants);
1496 self.handle_room_update(join_response.room.unwrap());
1497
1498 let _ = tx.send(());
1499 }
1500
1501 fn handle_disconnected(self: &Arc<Self>, reason: DisconnectReason) {
1502 if self.update_connection_state(ConnectionState::Disconnected) {
1503 self.dispatcher.dispatch(&RoomEvent::Disconnected { reason });
1504 }
1505
1506 log::info!("disconnected from room with reason: {:?}", reason);
1507 if reason != DisconnectReason::ClientInitiated {
1508 livekit_runtime::spawn({
1509 let inner = self.clone();
1510 async move {
1511 let _ = inner.close(reason).await;
1512 }
1513 });
1514 }
1515 }
1516
1517 fn handle_data(
1518 &self,
1519 payload: Vec<u8>,
1520 topic: Option<String>,
1521 kind: DataPacketKind,
1522 participant_sid: Option<ParticipantSid>,
1523 participant_identity: Option<ParticipantIdentity>,
1524 encryption_type: proto::encryption::Type,
1525 ) {
1526 let mut participant = participant_identity
1527 .as_ref()
1528 .map(|identity| self.get_participant_by_identity(identity))
1529 .unwrap_or(None);
1530
1531 if participant.is_none() {
1532 participant = participant_sid
1533 .as_ref()
1534 .map(|sid| self.get_participant_by_sid(sid))
1535 .unwrap_or(None);
1536 }
1537
1538 if let Some(ref p) = participant {
1540 use crate::e2ee::EncryptionType;
1541 let is_encrypted = EncryptionType::from(encryption_type) != EncryptionType::None;
1542 p.update_data_encryption_status(is_encrypted);
1543 }
1544
1545 self.dispatcher.dispatch(&RoomEvent::DataReceived {
1546 payload: Arc::new(payload),
1547 topic,
1548 kind,
1549 participant,
1550 });
1551 }
1552
1553 fn handle_chat_message(
1554 &self,
1555 participant_identity: ParticipantIdentity,
1556 chat_message: ChatMessage,
1557 ) {
1558 let participant = self.get_participant_by_identity(&participant_identity);
1559
1560 if participant.is_none() {
1561 return;
1563 }
1564
1565 self.dispatcher.dispatch(&RoomEvent::ChatMessage { message: chat_message, participant });
1566 }
1567
1568 fn handle_dtmf(
1569 &self,
1570 code: u32,
1571 digit: Option<String>,
1572 participant_identity: Option<ParticipantIdentity>,
1573 ) {
1574 let participant = participant_identity
1575 .as_ref()
1576 .map(|identity| self.get_participant_by_identity(identity))
1577 .unwrap_or(None);
1578
1579 if participant.is_none() && participant_identity.is_some() {
1580 return;
1582 }
1583
1584 self.dispatcher.dispatch(&RoomEvent::SipDTMFReceived { code, digit, participant });
1585 }
1586
1587 fn handle_transcription(
1588 &self,
1589 participant_identity: ParticipantIdentity,
1590 track_sid: String,
1591 segments: Vec<TranscriptionSegment>,
1592 ) {
1593 let participant = self.get_local_or_remote_participant(&participant_identity);
1594 let track_sid: TrackSid = track_sid.to_owned().try_into().unwrap();
1595 let track_publication: Option<TrackPublication> = match &participant {
1596 Some(Participant::Local(ref participant)) => {
1597 participant.get_track_publication(&track_sid).map(TrackPublication::Local)
1598 }
1599 Some(Participant::Remote(ref participant)) => {
1600 participant.get_track_publication(&track_sid).map(TrackPublication::Remote)
1601 }
1602 None => None,
1603 };
1604
1605 self.dispatcher.dispatch(&RoomEvent::TranscriptionReceived {
1606 participant,
1607 track_publication,
1608 segments,
1609 });
1610 }
1611
1612 fn handle_data_stream_header(
1613 &self,
1614 header: proto::data_stream::Header,
1615 participant_identity: String,
1616 encryption_type: proto::encryption::Type,
1617 ) {
1618 self.incoming_stream_manager.handle_header(
1619 header.clone(),
1620 participant_identity.clone(),
1621 encryption_type,
1622 );
1623
1624 if let Some(participant) =
1626 self.remote_participants.read().get(&participant_identity.clone().into()).cloned()
1627 {
1628 use crate::e2ee::EncryptionType;
1629 let is_encrypted = EncryptionType::from(encryption_type) != EncryptionType::None;
1630 participant.update_data_encryption_status(is_encrypted);
1631 }
1632
1633 let event = RoomEvent::StreamHeaderReceived { header, participant_identity };
1635 self.dispatcher.dispatch(&event);
1636 }
1637
1638 fn handle_data_stream_chunk(
1639 &self,
1640 chunk: proto::data_stream::Chunk,
1641 participant_identity: String,
1642 encryption_type: proto::encryption::Type,
1643 ) {
1644 self.incoming_stream_manager.handle_chunk(chunk.clone(), encryption_type);
1645
1646 let event = RoomEvent::StreamChunkReceived { chunk, participant_identity };
1648 self.dispatcher.dispatch(&event);
1649 }
1650
1651 fn handle_data_stream_trailer(
1652 &self,
1653 trailer: proto::data_stream::Trailer,
1654 participant_identity: String,
1655 ) {
1656 self.incoming_stream_manager.handle_trailer(trailer.clone());
1657
1658 let event = RoomEvent::StreamTrailerReceived { trailer, participant_identity };
1660 self.dispatcher.dispatch(&event);
1661 }
1662
1663 fn handle_data_channel_buffered_low_threshold_change(
1664 &self,
1665 kind: DataPacketKind,
1666 threshold: u64,
1667 ) {
1668 let mut info = self.info.write();
1669 match kind {
1670 DataPacketKind::Lossy => {
1671 info.lossy_dc_options.buffered_amount_low_threshold = threshold;
1672 }
1673 DataPacketKind::Reliable => {
1674 info.reliable_dc_options.buffered_amount_low_threshold = threshold;
1675 }
1676 }
1677 let event = RoomEvent::DataChannelBufferedAmountLowThresholdChanged { kind, threshold };
1678 self.dispatcher.dispatch(&event);
1679 }
1680
1681 fn handle_server_initiated_mute_track(&self, sid: String, muted: bool) {
1682 let sid_for_log = sid.clone();
1683 let track_sid = match sid.try_into() {
1684 Ok(sid) => sid,
1685 Err(_) => {
1686 log::warn!("Invalid track sid in mute request: {}", sid_for_log);
1687 return;
1688 }
1689 };
1690
1691 if let Some(publication) = self.local_participant.get_track_publication(&track_sid) {
1692 if muted {
1693 publication.mute();
1694 } else {
1695 publication.unmute();
1696 }
1697 return;
1698 }
1699
1700 log::warn!("Track not found in mute request: {}", sid_for_log);
1701 }
1702
1703 fn create_participant(
1706 self: &Arc<Self>,
1707 kind: ParticipantKind,
1708 kind_details: Vec<ParticipantKindDetail>,
1709 sid: ParticipantSid,
1710 identity: ParticipantIdentity,
1711 name: String,
1712 metadata: String,
1713 attributes: HashMap<String, String>,
1714 permission: Option<proto::ParticipantPermission>,
1715 ) -> RemoteParticipant {
1716 let participant = RemoteParticipant::new(
1717 self.rtc_engine.clone(),
1718 kind,
1719 kind_details,
1720 sid.clone(),
1721 identity.clone(),
1722 name,
1723 metadata,
1724 attributes,
1725 self.options.auto_subscribe,
1726 permission,
1727 );
1728
1729 participant.on_track_published({
1730 let dispatcher = self.dispatcher.clone();
1731 move |participant, publication| {
1732 dispatcher.dispatch(&RoomEvent::TrackPublished { participant, publication });
1733 }
1734 });
1735
1736 participant.on_track_unpublished({
1737 let dispatcher = self.dispatcher.clone();
1738 move |participant, publication| {
1739 dispatcher.dispatch(&RoomEvent::TrackUnpublished { participant, publication });
1740 }
1741 });
1742
1743 participant.on_track_subscribed({
1744 let dispatcher = self.dispatcher.clone();
1745 let e2ee_manager = self.e2ee_manager.clone();
1746 move |participant, publication, track| {
1747 let event = RoomEvent::TrackSubscribed {
1748 participant: participant.clone(),
1749 track: track.clone(),
1750 publication: publication.clone(),
1751 };
1752 e2ee_manager.on_track_subscribed(track, publication, participant);
1753 dispatcher.dispatch(&event);
1754 }
1755 });
1756
1757 participant.on_track_unsubscribed({
1758 let dispatcher = self.dispatcher.clone();
1759 let e2ee_manager = self.e2ee_manager.clone();
1760 move |participant, publication, track| {
1761 let event = RoomEvent::TrackUnsubscribed {
1762 participant: participant.clone(),
1763 track: track.clone(),
1764 publication: publication.clone(),
1765 };
1766 e2ee_manager.on_track_unsubscribed(track, publication, participant);
1767 dispatcher.dispatch(&event);
1768 }
1769 });
1770
1771 participant.on_track_subscription_failed({
1772 let dispatcher = self.dispatcher.clone();
1773 move |participant, track_sid, error| {
1774 dispatcher.dispatch(&RoomEvent::TrackSubscriptionFailed {
1775 participant,
1776 track_sid,
1777 error,
1778 });
1779 }
1780 });
1781
1782 participant.on_track_muted({
1783 let dispatcher = self.dispatcher.clone();
1784 move |participant, publication| {
1785 let event = RoomEvent::TrackMuted { participant, publication };
1786 dispatcher.dispatch(&event);
1787 }
1788 });
1789
1790 participant.on_track_unmuted({
1791 let dispatcher = self.dispatcher.clone();
1792 move |participant, publication| {
1793 let event = RoomEvent::TrackUnmuted { participant, publication };
1794 dispatcher.dispatch(&event);
1795 }
1796 });
1797
1798 participant.on_metadata_changed({
1799 let dispatcher = self.dispatcher.clone();
1800 move |participant, old_metadata, metadata| {
1801 let event =
1802 RoomEvent::ParticipantMetadataChanged { participant, old_metadata, metadata };
1803 dispatcher.dispatch(&event);
1804 }
1805 });
1806
1807 participant.on_name_changed({
1808 let dispatcher = self.dispatcher.clone();
1809 move |participant, old_name, name| {
1810 let event = RoomEvent::ParticipantNameChanged { participant, old_name, name };
1811 dispatcher.dispatch(&event);
1812 }
1813 });
1814
1815 participant.on_attributes_changed({
1816 let dispatcher = self.dispatcher.clone();
1817 move |participant, changed_attributes| {
1818 let event =
1819 RoomEvent::ParticipantAttributesChanged { participant, changed_attributes };
1820 dispatcher.dispatch(&event);
1821 }
1822 });
1823
1824 participant.on_permission_changed({
1825 let dispatcher = self.dispatcher.clone();
1826 move |participant, permission| {
1827 let event = RoomEvent::ParticipantPermissionChanged { participant, permission };
1828 dispatcher.dispatch(&event);
1829 }
1830 });
1831
1832 participant.on_encryption_status_changed({
1833 let dispatcher = self.dispatcher.clone();
1834 move |participant, is_encrypted| {
1835 let event =
1836 RoomEvent::ParticipantEncryptionStatusChanged { participant, is_encrypted };
1837 dispatcher.dispatch(&event);
1838 }
1839 });
1840
1841 let mut participants = self.remote_participants.write();
1842 participants.insert(identity, participant.clone());
1843 participant
1844 }
1845
1846 fn handle_participant_disconnect(self: Arc<Self>, remote_participant: RemoteParticipant) {
1849 for (sid, _) in remote_participant.track_publications() {
1850 remote_participant.unpublish_track(&sid);
1851 }
1852
1853 let mut participants = self.remote_participants.write();
1854 participants.remove(&remote_participant.identity());
1855 self.dispatcher.dispatch(&RoomEvent::ParticipantDisconnected(remote_participant));
1856 }
1857
1858 fn get_participant_by_sid(&self, sid: &ParticipantSid) -> Option<RemoteParticipant> {
1859 self.remote_participants.read().values().find(|x| &x.sid() == sid).cloned()
1860 }
1861
1862 fn get_participant_by_identity(
1863 &self,
1864 identity: &ParticipantIdentity,
1865 ) -> Option<RemoteParticipant> {
1866 self.remote_participants.read().get(identity).cloned()
1867 }
1868
1869 fn get_local_or_remote_participant(
1870 &self,
1871 identity: &ParticipantIdentity,
1872 ) -> Option<Participant> {
1873 if identity == &self.local_participant.identity() {
1874 return Some(Participant::Local(self.local_participant.clone()));
1875 }
1876 return self.get_participant_by_identity(identity).map(Participant::Remote);
1877 }
1878
1879 fn handle_refresh_token(self: &Arc<Self>, url: String, token: String) {
1880 for filter in registered_audio_filter_plugins().into_iter() {
1882 filter.update_token(url.clone(), token.clone());
1883 }
1884 let event = RoomEvent::TokenRefreshed { token };
1885 self.dispatcher.dispatch(&event);
1886 }
1887}
1888
1889async fn incoming_data_stream_task(
1891 mut open_rx: UnboundedReceiver<(AnyStreamReader, String)>,
1892 dispatcher: Dispatcher<RoomEvent>,
1893 mut close_rx: broadcast::Receiver<()>,
1894) {
1895 loop {
1896 tokio::select! {
1897 Some((reader, identity)) = open_rx.recv() => {
1898 match reader {
1899 AnyStreamReader::Byte(reader) => dispatcher.dispatch(&RoomEvent::ByteStreamOpened {
1900 topic: reader.info().topic.clone(),
1901 reader: TakeCell::new(reader),
1902 participant_identity: ParticipantIdentity(identity)
1903 }),
1904 AnyStreamReader::Text(reader) => dispatcher.dispatch(&RoomEvent::TextStreamOpened {
1905 topic: reader.info().topic.clone(),
1906 reader: TakeCell::new(reader),
1907 participant_identity: ParticipantIdentity(identity)
1908 }),
1909 }
1910 },
1911 _ = close_rx.recv() => {
1912 break;
1913 }
1914 }
1915 }
1916}
1917
1918async fn outgoing_data_stream_task(
1920 mut packet_rx: UnboundedRequestReceiver<proto::DataPacket, Result<(), EngineError>>,
1921 engine: Arc<RtcEngine>,
1922 mut close_rx: broadcast::Receiver<()>,
1923) {
1924 loop {
1925 tokio::select! {
1926 Ok((packet, responder)) = packet_rx.recv() => {
1927 let result = engine.publish_data(packet, DataPacketKind::Reliable, false).await;
1928 let _ = responder.respond(result);
1929 },
1930 _ = close_rx.recv() => {
1931 break;
1932 }
1933 }
1934 }
1935}
1936
1937fn unpack_stream_id(stream_id: &str) -> Option<(&str, &str)> {
1938 let split: Vec<&str> = stream_id.split('|').collect();
1939 if split.len() == 2 {
1940 let participant_sid = split.first().unwrap();
1941 let track_sid = split.get(1).unwrap();
1942 Some((participant_sid, track_sid))
1943 } else {
1944 None
1945 }
1946}