1use std::{
16 collections::{HashMap, HashSet},
17 fmt::Debug,
18 sync::Arc,
19 time::Duration,
20};
21
22use gosuto_libwebrtc::prelude::*;
23use livekit_protocol as proto;
24use livekit_runtime::timeout;
25use parking_lot::Mutex;
26
27use super::{
28 ConnectionQuality, ParticipantInner, ParticipantKind, ParticipantKindDetail, TrackKind,
29};
30use crate::{
31 prelude::*,
32 rtc_engine::RtcEngine,
33 track::{TrackError, VideoQuality},
34};
35
/// Upper bound on how long `add_subscribed_media_track` keeps polling for the
/// publication that matches a newly received media track.
const ADD_TRACK_TIMEOUT: Duration = Duration::from_secs(5);

// Boxed callback signatures stored in `RemoteEvents`. Every handler receives a
// `RemoteParticipant` handle as its first argument.

/// Callback taking the participant and a publication; stored in `RemoteEvents::track_published`.
type TrackPublishedHandler = Box<dyn Fn(RemoteParticipant, RemoteTrackPublication) + Send>;
/// Callback taking the participant and a publication; stored in `RemoteEvents::track_unpublished`.
type TrackUnpublishedHandler = Box<dyn Fn(RemoteParticipant, RemoteTrackPublication) + Send>;
/// Callback taking the participant, a publication and its track; stored in
/// `RemoteEvents::track_subscribed`.
type TrackSubscribedHandler =
    Box<dyn Fn(RemoteParticipant, RemoteTrackPublication, RemoteTrack) + Send>;
/// Callback taking the participant, a publication and its track; stored in
/// `RemoteEvents::track_unsubscribed`.
type TrackUnsubscribedHandler =
    Box<dyn Fn(RemoteParticipant, RemoteTrackPublication, RemoteTrack) + Send>;
/// Callback taking the participant, the affected track sid and the error; stored in
/// `RemoteEvents::track_subscription_failed`.
type TrackSubscriptionFailedHandler = Box<dyn Fn(RemoteParticipant, TrackSid, TrackError) + Send>;
45
/// Optional callbacks for track lifecycle events of a remote participant.
///
/// Every slot starts out as `None` (via `Default`) and is overwritten by the
/// matching `RemoteParticipant::on_track_*` setter.
#[derive(Default)]
struct RemoteEvents {
    /// Invoked from `update_info` when a track sid without an existing publication appears.
    track_published: Mutex<Option<TrackPublishedHandler>>,
    /// Invoked from `unpublish_track` after the publication has been removed.
    track_unpublished: Mutex<Option<TrackUnpublishedHandler>>,
    /// Invoked from the `on_subscribed` closure installed by `add_publication`.
    track_subscribed: Mutex<Option<TrackSubscribedHandler>>,
    /// Invoked from the `on_unsubscribed` closure installed by `add_publication`.
    track_unsubscribed: Mutex<Option<TrackUnsubscribedHandler>>,
    /// Invoked from `add_subscribed_media_track` when no publication shows up before the timeout.
    track_subscription_failed: Mutex<Option<TrackSubscriptionFailedHandler>>,
}
54
/// State that only remote participants carry; shared between all clones of a
/// `RemoteParticipant` through an `Arc`.
struct RemoteInfo {
    /// Event callbacks. Held in an `Arc` so the closures installed on publications
    /// can keep their own handle to it.
    events: Arc<RemoteEvents>,
    /// Forwarded to `RemoteTrackPublication::new` for every publication created in `update_info`.
    auto_subscribe: bool,
}
59
/// Handle to a remote participant.
///
/// Both fields are `Arc`s, so cloning yields another handle to the same
/// underlying state rather than a copy of it.
#[derive(Clone)]
pub struct RemoteParticipant {
    /// Participant state built by `super::new_inner` and passed to the `super::*` helpers.
    inner: Arc<ParticipantInner>,
    /// Remote-only state: event callbacks and the auto-subscribe flag.
    remote: Arc<RemoteInfo>,
}
65
66impl Debug for RemoteParticipant {
67 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68 f.debug_struct("RemoteParticipant")
69 .field("sid", &self.sid())
70 .field("identity", &self.identity())
71 .field("name", &self.name())
72 .finish()
73 }
74}
75
76impl RemoteParticipant {
77 pub(crate) fn new(
78 rtc_engine: Arc<RtcEngine>,
79 kind: ParticipantKind,
80 kind_details: Vec<ParticipantKindDetail>,
81 sid: ParticipantSid,
82 identity: ParticipantIdentity,
83 name: String,
84 metadata: String,
85 attributes: HashMap<String, String>,
86 auto_subscribe: bool,
87 permission: Option<proto::ParticipantPermission>,
88 ) -> Self {
89 Self {
90 inner: super::new_inner(
91 rtc_engine,
92 sid,
93 identity,
94 name,
95 metadata,
96 attributes,
97 kind,
98 kind_details,
99 permission,
100 ),
101 remote: Arc::new(RemoteInfo { events: Default::default(), auto_subscribe }),
102 }
103 }
104
105 pub(crate) fn internal_track_publications(&self) -> HashMap<TrackSid, TrackPublication> {
106 self.inner.track_publications.read().clone()
107 }
108
109 pub(crate) async fn add_subscribed_media_track(
110 &self,
111 sid: TrackSid,
112 media_track: MediaStreamTrack,
113 transceiver: RtpTransceiver,
114 ) {
115 let wait_publication = {
116 let participant = self.clone();
117 let sid = sid.clone();
118 async move {
119 loop {
120 let publication = participant.get_track_publication(&sid);
121 if let Some(publication) = publication {
122 return publication;
123 }
124
125 livekit_runtime::sleep(Duration::from_millis(50)).await;
126 }
127 }
128 };
129
130 if let Ok(remote_publication) = timeout(ADD_TRACK_TIMEOUT, wait_publication).await {
131 let track = match remote_publication.kind() {
132 TrackKind::Audio => {
133 if let MediaStreamTrack::Audio(rtc_track) = media_track {
134 let audio_track = RemoteAudioTrack::new(
135 remote_publication.sid(),
136 remote_publication.name(),
137 rtc_track,
138 );
139 RemoteTrack::Audio(audio_track)
140 } else {
141 unreachable!();
142 }
143 }
144 TrackKind::Video => {
145 if let MediaStreamTrack::Video(rtc_track) = media_track {
146 let video_track = RemoteVideoTrack::new(
147 remote_publication.sid(),
148 remote_publication.name(),
149 rtc_track,
150 );
151 RemoteTrack::Video(video_track)
152 } else {
153 unreachable!()
154 }
155 }
156 };
157
158 track.set_transceiver(Some(transceiver));
159
160 track.update_info(proto::TrackInfo {
162 sid: remote_publication.sid().to_string(),
163 name: remote_publication.name(),
164 r#type: proto::TrackType::from(remote_publication.kind()) as i32,
165 source: proto::TrackSource::from(remote_publication.source()) as i32,
166 muted: remote_publication.is_muted(),
167 ..Default::default()
168 });
169
170 self.add_publication(TrackPublication::Remote(remote_publication.clone()));
171 track.enable();
172
173 remote_publication.set_track(Some(track)); } else {
176 log::error!("could not find published track with sid: {:?}", sid);
177
178 if let Some(track_subscription_failed) =
179 self.remote.events.track_subscription_failed.lock().as_ref()
180 {
181 track_subscription_failed(
182 self.clone(),
183 sid.clone(),
184 TrackError::TrackNotFound(sid),
185 );
186 }
187 }
188 }
189
190 pub(crate) fn unpublish_track(&self, sid: &TrackSid) {
191 if let Some(publication) = self.get_track_publication(sid) {
192 if let Some(track) = publication.track() {
194 track.disable();
195 publication.set_track(None); }
197
198 self.remove_publication(sid);
199
200 if let Some(track_unpublished) = self.remote.events.track_unpublished.lock().as_ref() {
201 track_unpublished(self.clone(), publication);
202 }
203 }
204 }
205
206 pub(crate) fn update_info(&self, info: proto::ParticipantInfo) {
207 super::update_info(&self.inner, &Participant::Remote(self.clone()), info.clone());
208
209 let mut valid_tracks = HashSet::<TrackSid>::new();
210 for track in info.tracks {
211 let track_sid = track.sid.clone().try_into().unwrap();
212 if let Some(publication) = self.get_track_publication(&track_sid) {
213 publication.update_info(track.clone());
214 } else {
215 let publication =
216 RemoteTrackPublication::new(track.clone(), None, self.remote.auto_subscribe);
217
218 self.add_publication(TrackPublication::Remote(publication.clone()));
219
220 if let Some(track_published) = self.remote.events.track_published.lock().as_ref() {
222 track_published(self.clone(), publication);
223 }
224 }
225
226 valid_tracks.insert(track_sid);
227 }
228
229 let tracks = self.inner.track_publications.read().clone();
231 for sid in tracks.keys() {
232 if valid_tracks.contains(sid) {
233 continue;
234 }
235
236 self.unpublish_track(sid);
237 }
238 }
239
240 pub(crate) fn on_track_published(
241 &self,
242 track_published: impl Fn(RemoteParticipant, RemoteTrackPublication) + Send + 'static,
243 ) {
244 *self.remote.events.track_published.lock() = Some(Box::new(track_published));
245 }
246
247 pub(crate) fn on_track_unpublished(
248 &self,
249 track_unpublished: impl Fn(RemoteParticipant, RemoteTrackPublication) + Send + 'static,
250 ) {
251 *self.remote.events.track_unpublished.lock() = Some(Box::new(track_unpublished));
252 }
253
254 pub(crate) fn on_track_subscribed(
255 &self,
256 track_subscribed: impl Fn(RemoteParticipant, RemoteTrackPublication, RemoteTrack)
257 + Send
258 + 'static,
259 ) {
260 *self.remote.events.track_subscribed.lock() = Some(Box::new(track_subscribed));
261 }
262
263 pub(crate) fn on_track_unsubscribed(
264 &self,
265 track_unsubscribed: impl Fn(RemoteParticipant, RemoteTrackPublication, RemoteTrack)
266 + Send
267 + 'static,
268 ) {
269 *self.remote.events.track_unsubscribed.lock() = Some(Box::new(track_unsubscribed));
270 }
271
272 pub(crate) fn on_track_subscription_failed(
273 &self,
274 track_subscription_failed: impl Fn(RemoteParticipant, TrackSid, TrackError) + Send + 'static,
275 ) {
276 *self.remote.events.track_subscription_failed.lock() =
277 Some(Box::new(track_subscription_failed));
278 }
279
280 pub(crate) fn on_track_muted(
281 &self,
282 handler: impl Fn(Participant, TrackPublication) + Send + 'static,
283 ) {
284 super::on_track_muted(&self.inner, handler)
285 }
286
287 pub(crate) fn on_track_unmuted(
288 &self,
289 handler: impl Fn(Participant, TrackPublication) + Send + 'static,
290 ) {
291 super::on_track_unmuted(&self.inner, handler)
292 }
293
294 pub(crate) fn on_metadata_changed(
295 &self,
296 handler: impl Fn(Participant, String, String) + Send + 'static,
297 ) {
298 super::on_metadata_changed(&self.inner, handler)
299 }
300
301 pub(crate) fn on_name_changed(
302 &self,
303 handler: impl Fn(Participant, String, String) + Send + 'static,
304 ) {
305 super::on_name_changed(&self.inner, handler)
306 }
307
308 pub(crate) fn on_attributes_changed(
309 &self,
310 handler: impl Fn(Participant, HashMap<String, String>) + Send + 'static,
311 ) {
312 super::on_attributes_changed(&self.inner, handler)
313 }
314
315 pub(crate) fn on_permission_changed(
316 &self,
317 handler: impl Fn(Participant, Option<proto::ParticipantPermission>) + Send + 'static,
318 ) {
319 super::on_permission_changed(&self.inner, handler)
320 }
321
322 pub(crate) fn on_encryption_status_changed(
323 &self,
324 handler: impl Fn(Participant, bool) + Send + 'static,
325 ) {
326 super::on_encryption_status_changed(&self.inner, handler);
327 }
328
329 pub(crate) fn set_speaking(&self, speaking: bool) {
330 super::set_speaking(&self.inner, &Participant::Remote(self.clone()), speaking);
331 }
332
333 pub(crate) fn set_audio_level(&self, level: f32) {
334 super::set_audio_level(&self.inner, &Participant::Remote(self.clone()), level);
335 }
336
337 pub(crate) fn set_connection_quality(&self, quality: ConnectionQuality) {
338 super::set_connection_quality(&self.inner, &Participant::Remote(self.clone()), quality);
339 }
340
341 pub(crate) fn add_publication(&self, publication: TrackPublication) {
342 super::add_publication(
343 &self.inner,
344 &Participant::Remote(self.clone()),
345 publication.clone(),
346 );
347
348 let TrackPublication::Remote(publication) = publication else {
349 panic!("expected remote publication");
350 };
351
352 publication.on_subscription_update_needed({
353 let rtc_engine = self.inner.rtc_engine.clone();
354 let psid = self.sid();
355 move |publication, subscribed| {
356 let rtc_engine = rtc_engine.clone();
357 let psid = psid.clone();
358 livekit_runtime::spawn(async move {
359 let tsid: String = publication.sid().into();
360 let update_subscription = proto::UpdateSubscription {
361 track_sids: vec![tsid.clone()],
362 subscribe: subscribed,
363 participant_tracks: vec![proto::ParticipantTracks {
364 participant_sid: psid.into(),
365 track_sids: vec![tsid],
366 }],
367 };
368
369 let _ = rtc_engine
370 .send_request(proto::signal_request::Message::Subscription(
371 update_subscription,
372 ))
373 .await;
374 });
375 }
376 });
377
378 publication.on_subscribed({
379 let events = self.remote.events.clone();
380 let participant = self.clone();
381 move |publication, track| {
382 if let Some(track_subscribed) = events.track_subscribed.lock().as_ref() {
383 track_subscribed(participant.clone(), publication, track);
384 }
385 }
386 });
387
388 publication.on_unsubscribed({
389 let events = self.remote.events.clone();
390 let participant = self.clone();
391 move |publication, track| {
392 if let Some(track_unsubscribed) = events.track_unsubscribed.lock().as_ref() {
393 track_unsubscribed(participant.clone(), publication, track);
394 }
395 }
396 });
397
398 publication.on_enabled_status_changed({
399 let rtc_engine = self.inner.rtc_engine.clone();
400 move |publication, enabled| {
401 let rtc_engine = rtc_engine.clone();
402 livekit_runtime::spawn(async move {
403 let tsid: String = publication.sid().into();
404 let TrackDimension(width, height) = publication.dimension();
405 let update_track_settings = proto::UpdateTrackSettings {
406 track_sids: vec![tsid.clone()],
407 disabled: !enabled,
408 width,
409 height,
410 ..Default::default()
411 };
412
413 rtc_engine
414 .send_request(proto::signal_request::Message::TrackSetting(
415 update_track_settings,
416 ))
417 .await
418 });
419 }
420 });
421
422 publication.on_video_dimensions_changed({
423 let rtc_engine = self.inner.rtc_engine.clone();
424 move |publication, dimension| {
425 let rtc_engine = rtc_engine.clone();
426 livekit_runtime::spawn(async move {
427 let tsid: String = publication.sid().into();
428 let TrackDimension(width, height) = dimension;
429 let enabled = publication.is_enabled();
430 let update_track_settings = proto::UpdateTrackSettings {
431 track_sids: vec![tsid.clone()],
432 disabled: !enabled,
433 width,
434 height,
435 ..Default::default()
436 };
437
438 rtc_engine
439 .send_request(proto::signal_request::Message::TrackSetting(
440 update_track_settings,
441 ))
442 .await
443 });
444 }
445 });
446
447 publication.on_video_quality_changed({
448 let rtc_engine = self.inner.rtc_engine.clone();
449 move |publication, quality| {
450 let rtc_engine = rtc_engine.clone();
451 livekit_runtime::spawn(async move {
452 let tsid: String = publication.sid().into();
453 let quality = match quality {
454 VideoQuality::Low => proto::VideoQuality::Low,
455 VideoQuality::Medium => proto::VideoQuality::Medium,
456 VideoQuality::High => proto::VideoQuality::High,
457 }
458 .into();
459 let update_track_settings = proto::UpdateTrackSettings {
460 track_sids: vec![tsid.clone()],
461 quality,
462 ..Default::default()
463 };
464
465 rtc_engine
466 .send_request(proto::signal_request::Message::TrackSetting(
467 update_track_settings,
468 ))
469 .await
470 });
471 }
472 });
473 }
474
475 pub(crate) fn remove_publication(&self, sid: &TrackSid) -> Option<TrackPublication> {
476 let publication =
477 super::remove_publication(&self.inner, &Participant::Remote(self.clone()), sid);
478
479 if let Some(publication) = publication.clone() {
480 let TrackPublication::Remote(publication) = publication else {
481 panic!("expected remote publication");
482 };
483
484 publication.on_subscription_update_needed(|_, _| {});
485 publication.on_subscribed(|_, _| {});
486 publication.on_unsubscribed(|_, _| {});
487 }
488
489 publication
490 }
491
492 pub fn get_track_publication(&self, sid: &TrackSid) -> Option<RemoteTrackPublication> {
493 self.inner.track_publications.read().get(sid).map(|track| {
494 if let TrackPublication::Remote(remote) = track {
495 return remote.clone();
496 }
497 unreachable!()
498 })
499 }
500
501 pub fn sid(&self) -> ParticipantSid {
502 self.inner.info.read().sid.clone()
503 }
504
505 pub fn identity(&self) -> ParticipantIdentity {
506 self.inner.info.read().identity.clone()
507 }
508
509 pub fn name(&self) -> String {
510 self.inner.info.read().name.clone()
511 }
512
513 pub fn metadata(&self) -> String {
514 self.inner.info.read().metadata.clone()
515 }
516
517 pub fn attributes(&self) -> HashMap<String, String> {
518 self.inner.info.read().attributes.clone()
519 }
520
521 pub fn is_speaking(&self) -> bool {
522 self.inner.info.read().speaking
523 }
524
525 pub fn track_publications(&self) -> HashMap<TrackSid, RemoteTrackPublication> {
526 self.inner
527 .track_publications
528 .read()
529 .clone()
530 .into_iter()
531 .map(|(sid, track)| {
532 if let TrackPublication::Remote(remote) = track {
533 return (sid, remote);
534 }
535 unreachable!()
536 })
537 .collect()
538 }
539
540 pub fn audio_level(&self) -> f32 {
541 self.inner.info.read().audio_level
542 }
543
544 pub fn connection_quality(&self) -> ConnectionQuality {
545 self.inner.info.read().connection_quality
546 }
547
548 pub fn kind(&self) -> ParticipantKind {
549 self.inner.info.read().kind
550 }
551
552 pub fn kind_details(&self) -> Vec<ParticipantKindDetail> {
553 self.inner.info.read().kind_details.clone()
554 }
555
556 pub fn disconnect_reason(&self) -> DisconnectReason {
557 self.inner.info.read().disconnect_reason
558 }
559
560 pub fn permission(&self) -> Option<proto::ParticipantPermission> {
561 self.inner.info.read().permission.clone()
562 }
563
564 pub fn is_encrypted(&self) -> bool {
565 *self.inner.is_encrypted.read()
566 }
567
568 #[doc(hidden)]
569 pub fn update_data_encryption_status(&self, is_encrypted: bool) {
570 super::update_data_encryption_status(
571 &self.inner,
572 &super::Participant::Remote(self.clone()),
573 is_encrypted,
574 );
575 }
576}