Skip to main content

gosuto_livekit/room/participant/
remote_participant.rs

1// Copyright 2025 LiveKit, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    collections::{HashMap, HashSet},
17    fmt::Debug,
18    sync::Arc,
19    time::Duration,
20};
21
22use gosuto_libwebrtc::prelude::*;
23use livekit_protocol as proto;
24use livekit_runtime::timeout;
25use parking_lot::Mutex;
26
27use super::{
28    ConnectionQuality, ParticipantInner, ParticipantKind, ParticipantKindDetail, TrackKind,
29};
30use crate::{
31    prelude::*,
32    rtc_engine::RtcEngine,
33    track::{TrackError, VideoQuality},
34};
35
36const ADD_TRACK_TIMEOUT: Duration = Duration::from_secs(5);
37
38type TrackPublishedHandler = Box<dyn Fn(RemoteParticipant, RemoteTrackPublication) + Send>;
39type TrackUnpublishedHandler = Box<dyn Fn(RemoteParticipant, RemoteTrackPublication) + Send>;
40type TrackSubscribedHandler =
41    Box<dyn Fn(RemoteParticipant, RemoteTrackPublication, RemoteTrack) + Send>;
42type TrackUnsubscribedHandler =
43    Box<dyn Fn(RemoteParticipant, RemoteTrackPublication, RemoteTrack) + Send>;
44type TrackSubscriptionFailedHandler = Box<dyn Fn(RemoteParticipant, TrackSid, TrackError) + Send>;
45
46#[derive(Default)]
47struct RemoteEvents {
48    track_published: Mutex<Option<TrackPublishedHandler>>,
49    track_unpublished: Mutex<Option<TrackUnpublishedHandler>>,
50    track_subscribed: Mutex<Option<TrackSubscribedHandler>>,
51    track_unsubscribed: Mutex<Option<TrackUnsubscribedHandler>>,
52    track_subscription_failed: Mutex<Option<TrackSubscriptionFailedHandler>>,
53}
54
55struct RemoteInfo {
56    events: Arc<RemoteEvents>,
57    auto_subscribe: bool, // better way to access this from room?
58}
59
60#[derive(Clone)]
61pub struct RemoteParticipant {
62    inner: Arc<ParticipantInner>,
63    remote: Arc<RemoteInfo>,
64}
65
66impl Debug for RemoteParticipant {
67    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68        f.debug_struct("RemoteParticipant")
69            .field("sid", &self.sid())
70            .field("identity", &self.identity())
71            .field("name", &self.name())
72            .finish()
73    }
74}
75
76impl RemoteParticipant {
77    pub(crate) fn new(
78        rtc_engine: Arc<RtcEngine>,
79        kind: ParticipantKind,
80        kind_details: Vec<ParticipantKindDetail>,
81        sid: ParticipantSid,
82        identity: ParticipantIdentity,
83        name: String,
84        metadata: String,
85        attributes: HashMap<String, String>,
86        auto_subscribe: bool,
87        permission: Option<proto::ParticipantPermission>,
88    ) -> Self {
89        Self {
90            inner: super::new_inner(
91                rtc_engine,
92                sid,
93                identity,
94                name,
95                metadata,
96                attributes,
97                kind,
98                kind_details,
99                permission,
100            ),
101            remote: Arc::new(RemoteInfo { events: Default::default(), auto_subscribe }),
102        }
103    }
104
105    pub(crate) fn internal_track_publications(&self) -> HashMap<TrackSid, TrackPublication> {
106        self.inner.track_publications.read().clone()
107    }
108
109    pub(crate) async fn add_subscribed_media_track(
110        &self,
111        sid: TrackSid,
112        media_track: MediaStreamTrack,
113        transceiver: RtpTransceiver,
114    ) {
115        let wait_publication = {
116            let participant = self.clone();
117            let sid = sid.clone();
118            async move {
119                loop {
120                    let publication = participant.get_track_publication(&sid);
121                    if let Some(publication) = publication {
122                        return publication;
123                    }
124
125                    livekit_runtime::sleep(Duration::from_millis(50)).await;
126                }
127            }
128        };
129
130        if let Ok(remote_publication) = timeout(ADD_TRACK_TIMEOUT, wait_publication).await {
131            let track = match remote_publication.kind() {
132                TrackKind::Audio => {
133                    if let MediaStreamTrack::Audio(rtc_track) = media_track {
134                        let audio_track = RemoteAudioTrack::new(
135                            remote_publication.sid(),
136                            remote_publication.name(),
137                            rtc_track,
138                        );
139                        RemoteTrack::Audio(audio_track)
140                    } else {
141                        unreachable!();
142                    }
143                }
144                TrackKind::Video => {
145                    if let MediaStreamTrack::Video(rtc_track) = media_track {
146                        let video_track = RemoteVideoTrack::new(
147                            remote_publication.sid(),
148                            remote_publication.name(),
149                            rtc_track,
150                        );
151                        RemoteTrack::Video(video_track)
152                    } else {
153                        unreachable!()
154                    }
155                }
156            };
157
158            track.set_transceiver(Some(transceiver));
159
160            //track.set_muted(remote_publication.is_muted());
161            track.update_info(proto::TrackInfo {
162                sid: remote_publication.sid().to_string(),
163                name: remote_publication.name(),
164                r#type: proto::TrackType::from(remote_publication.kind()) as i32,
165                source: proto::TrackSource::from(remote_publication.source()) as i32,
166                muted: remote_publication.is_muted(),
167                ..Default::default()
168            });
169
170            self.add_publication(TrackPublication::Remote(remote_publication.clone()));
171            track.enable();
172
173            remote_publication.set_track(Some(track)); // This will fire TrackSubscribed on the
174                                                       // publication
175        } else {
176            log::error!("could not find published track with sid: {:?}", sid);
177
178            if let Some(track_subscription_failed) =
179                self.remote.events.track_subscription_failed.lock().as_ref()
180            {
181                track_subscription_failed(
182                    self.clone(),
183                    sid.clone(),
184                    TrackError::TrackNotFound(sid),
185                );
186            }
187        }
188    }
189
190    pub(crate) fn unpublish_track(&self, sid: &TrackSid) {
191        if let Some(publication) = self.get_track_publication(sid) {
192            // Unsubscribe to the track if needed
193            if let Some(track) = publication.track() {
194                track.disable();
195                publication.set_track(None); // This will fire TrackUnsubscribed on the publication
196            }
197
198            self.remove_publication(sid);
199
200            if let Some(track_unpublished) = self.remote.events.track_unpublished.lock().as_ref() {
201                track_unpublished(self.clone(), publication);
202            }
203        }
204    }
205
206    pub(crate) fn update_info(&self, info: proto::ParticipantInfo) {
207        super::update_info(&self.inner, &Participant::Remote(self.clone()), info.clone());
208
209        let mut valid_tracks = HashSet::<TrackSid>::new();
210        for track in info.tracks {
211            let track_sid = track.sid.clone().try_into().unwrap();
212            if let Some(publication) = self.get_track_publication(&track_sid) {
213                publication.update_info(track.clone());
214            } else {
215                let publication =
216                    RemoteTrackPublication::new(track.clone(), None, self.remote.auto_subscribe);
217
218                self.add_publication(TrackPublication::Remote(publication.clone()));
219
220                // This is a new track, dispatch publish event
221                if let Some(track_published) = self.remote.events.track_published.lock().as_ref() {
222                    track_published(self.clone(), publication);
223                }
224            }
225
226            valid_tracks.insert(track_sid);
227        }
228
229        // remove tracks that are no longer valid
230        let tracks = self.inner.track_publications.read().clone();
231        for sid in tracks.keys() {
232            if valid_tracks.contains(sid) {
233                continue;
234            }
235
236            self.unpublish_track(sid);
237        }
238    }
239
240    pub(crate) fn on_track_published(
241        &self,
242        track_published: impl Fn(RemoteParticipant, RemoteTrackPublication) + Send + 'static,
243    ) {
244        *self.remote.events.track_published.lock() = Some(Box::new(track_published));
245    }
246
247    pub(crate) fn on_track_unpublished(
248        &self,
249        track_unpublished: impl Fn(RemoteParticipant, RemoteTrackPublication) + Send + 'static,
250    ) {
251        *self.remote.events.track_unpublished.lock() = Some(Box::new(track_unpublished));
252    }
253
254    pub(crate) fn on_track_subscribed(
255        &self,
256        track_subscribed: impl Fn(RemoteParticipant, RemoteTrackPublication, RemoteTrack)
257            + Send
258            + 'static,
259    ) {
260        *self.remote.events.track_subscribed.lock() = Some(Box::new(track_subscribed));
261    }
262
263    pub(crate) fn on_track_unsubscribed(
264        &self,
265        track_unsubscribed: impl Fn(RemoteParticipant, RemoteTrackPublication, RemoteTrack)
266            + Send
267            + 'static,
268    ) {
269        *self.remote.events.track_unsubscribed.lock() = Some(Box::new(track_unsubscribed));
270    }
271
272    pub(crate) fn on_track_subscription_failed(
273        &self,
274        track_subscription_failed: impl Fn(RemoteParticipant, TrackSid, TrackError) + Send + 'static,
275    ) {
276        *self.remote.events.track_subscription_failed.lock() =
277            Some(Box::new(track_subscription_failed));
278    }
279
280    pub(crate) fn on_track_muted(
281        &self,
282        handler: impl Fn(Participant, TrackPublication) + Send + 'static,
283    ) {
284        super::on_track_muted(&self.inner, handler)
285    }
286
287    pub(crate) fn on_track_unmuted(
288        &self,
289        handler: impl Fn(Participant, TrackPublication) + Send + 'static,
290    ) {
291        super::on_track_unmuted(&self.inner, handler)
292    }
293
294    pub(crate) fn on_metadata_changed(
295        &self,
296        handler: impl Fn(Participant, String, String) + Send + 'static,
297    ) {
298        super::on_metadata_changed(&self.inner, handler)
299    }
300
301    pub(crate) fn on_name_changed(
302        &self,
303        handler: impl Fn(Participant, String, String) + Send + 'static,
304    ) {
305        super::on_name_changed(&self.inner, handler)
306    }
307
308    pub(crate) fn on_attributes_changed(
309        &self,
310        handler: impl Fn(Participant, HashMap<String, String>) + Send + 'static,
311    ) {
312        super::on_attributes_changed(&self.inner, handler)
313    }
314
315    pub(crate) fn on_permission_changed(
316        &self,
317        handler: impl Fn(Participant, Option<proto::ParticipantPermission>) + Send + 'static,
318    ) {
319        super::on_permission_changed(&self.inner, handler)
320    }
321
322    pub(crate) fn on_encryption_status_changed(
323        &self,
324        handler: impl Fn(Participant, bool) + Send + 'static,
325    ) {
326        super::on_encryption_status_changed(&self.inner, handler);
327    }
328
329    pub(crate) fn set_speaking(&self, speaking: bool) {
330        super::set_speaking(&self.inner, &Participant::Remote(self.clone()), speaking);
331    }
332
333    pub(crate) fn set_audio_level(&self, level: f32) {
334        super::set_audio_level(&self.inner, &Participant::Remote(self.clone()), level);
335    }
336
337    pub(crate) fn set_connection_quality(&self, quality: ConnectionQuality) {
338        super::set_connection_quality(&self.inner, &Participant::Remote(self.clone()), quality);
339    }
340
341    pub(crate) fn add_publication(&self, publication: TrackPublication) {
342        super::add_publication(
343            &self.inner,
344            &Participant::Remote(self.clone()),
345            publication.clone(),
346        );
347
348        let TrackPublication::Remote(publication) = publication else {
349            panic!("expected remote publication");
350        };
351
352        publication.on_subscription_update_needed({
353            let rtc_engine = self.inner.rtc_engine.clone();
354            let psid = self.sid();
355            move |publication, subscribed| {
356                let rtc_engine = rtc_engine.clone();
357                let psid = psid.clone();
358                livekit_runtime::spawn(async move {
359                    let tsid: String = publication.sid().into();
360                    let update_subscription = proto::UpdateSubscription {
361                        track_sids: vec![tsid.clone()],
362                        subscribe: subscribed,
363                        participant_tracks: vec![proto::ParticipantTracks {
364                            participant_sid: psid.into(),
365                            track_sids: vec![tsid],
366                        }],
367                    };
368
369                    let _ = rtc_engine
370                        .send_request(proto::signal_request::Message::Subscription(
371                            update_subscription,
372                        ))
373                        .await;
374                });
375            }
376        });
377
378        publication.on_subscribed({
379            let events = self.remote.events.clone();
380            let participant = self.clone();
381            move |publication, track| {
382                if let Some(track_subscribed) = events.track_subscribed.lock().as_ref() {
383                    track_subscribed(participant.clone(), publication, track);
384                }
385            }
386        });
387
388        publication.on_unsubscribed({
389            let events = self.remote.events.clone();
390            let participant = self.clone();
391            move |publication, track| {
392                if let Some(track_unsubscribed) = events.track_unsubscribed.lock().as_ref() {
393                    track_unsubscribed(participant.clone(), publication, track);
394                }
395            }
396        });
397
398        publication.on_enabled_status_changed({
399            let rtc_engine = self.inner.rtc_engine.clone();
400            move |publication, enabled| {
401                let rtc_engine = rtc_engine.clone();
402                livekit_runtime::spawn(async move {
403                    let tsid: String = publication.sid().into();
404                    let TrackDimension(width, height) = publication.dimension();
405                    let update_track_settings = proto::UpdateTrackSettings {
406                        track_sids: vec![tsid.clone()],
407                        disabled: !enabled,
408                        width,
409                        height,
410                        ..Default::default()
411                    };
412
413                    rtc_engine
414                        .send_request(proto::signal_request::Message::TrackSetting(
415                            update_track_settings,
416                        ))
417                        .await
418                });
419            }
420        });
421
422        publication.on_video_dimensions_changed({
423            let rtc_engine = self.inner.rtc_engine.clone();
424            move |publication, dimension| {
425                let rtc_engine = rtc_engine.clone();
426                livekit_runtime::spawn(async move {
427                    let tsid: String = publication.sid().into();
428                    let TrackDimension(width, height) = dimension;
429                    let enabled = publication.is_enabled();
430                    let update_track_settings = proto::UpdateTrackSettings {
431                        track_sids: vec![tsid.clone()],
432                        disabled: !enabled,
433                        width,
434                        height,
435                        ..Default::default()
436                    };
437
438                    rtc_engine
439                        .send_request(proto::signal_request::Message::TrackSetting(
440                            update_track_settings,
441                        ))
442                        .await
443                });
444            }
445        });
446
447        publication.on_video_quality_changed({
448            let rtc_engine = self.inner.rtc_engine.clone();
449            move |publication, quality| {
450                let rtc_engine = rtc_engine.clone();
451                livekit_runtime::spawn(async move {
452                    let tsid: String = publication.sid().into();
453                    let quality = match quality {
454                        VideoQuality::Low => proto::VideoQuality::Low,
455                        VideoQuality::Medium => proto::VideoQuality::Medium,
456                        VideoQuality::High => proto::VideoQuality::High,
457                    }
458                    .into();
459                    let update_track_settings = proto::UpdateTrackSettings {
460                        track_sids: vec![tsid.clone()],
461                        quality,
462                        ..Default::default()
463                    };
464
465                    rtc_engine
466                        .send_request(proto::signal_request::Message::TrackSetting(
467                            update_track_settings,
468                        ))
469                        .await
470                });
471            }
472        });
473    }
474
475    pub(crate) fn remove_publication(&self, sid: &TrackSid) -> Option<TrackPublication> {
476        let publication =
477            super::remove_publication(&self.inner, &Participant::Remote(self.clone()), sid);
478
479        if let Some(publication) = publication.clone() {
480            let TrackPublication::Remote(publication) = publication else {
481                panic!("expected remote publication");
482            };
483
484            publication.on_subscription_update_needed(|_, _| {});
485            publication.on_subscribed(|_, _| {});
486            publication.on_unsubscribed(|_, _| {});
487        }
488
489        publication
490    }
491
492    pub fn get_track_publication(&self, sid: &TrackSid) -> Option<RemoteTrackPublication> {
493        self.inner.track_publications.read().get(sid).map(|track| {
494            if let TrackPublication::Remote(remote) = track {
495                return remote.clone();
496            }
497            unreachable!()
498        })
499    }
500
501    pub fn sid(&self) -> ParticipantSid {
502        self.inner.info.read().sid.clone()
503    }
504
505    pub fn identity(&self) -> ParticipantIdentity {
506        self.inner.info.read().identity.clone()
507    }
508
509    pub fn name(&self) -> String {
510        self.inner.info.read().name.clone()
511    }
512
513    pub fn metadata(&self) -> String {
514        self.inner.info.read().metadata.clone()
515    }
516
517    pub fn attributes(&self) -> HashMap<String, String> {
518        self.inner.info.read().attributes.clone()
519    }
520
521    pub fn is_speaking(&self) -> bool {
522        self.inner.info.read().speaking
523    }
524
525    pub fn track_publications(&self) -> HashMap<TrackSid, RemoteTrackPublication> {
526        self.inner
527            .track_publications
528            .read()
529            .clone()
530            .into_iter()
531            .map(|(sid, track)| {
532                if let TrackPublication::Remote(remote) = track {
533                    return (sid, remote);
534                }
535                unreachable!()
536            })
537            .collect()
538    }
539
540    pub fn audio_level(&self) -> f32 {
541        self.inner.info.read().audio_level
542    }
543
544    pub fn connection_quality(&self) -> ConnectionQuality {
545        self.inner.info.read().connection_quality
546    }
547
548    pub fn kind(&self) -> ParticipantKind {
549        self.inner.info.read().kind
550    }
551
552    pub fn kind_details(&self) -> Vec<ParticipantKindDetail> {
553        self.inner.info.read().kind_details.clone()
554    }
555
556    pub fn disconnect_reason(&self) -> DisconnectReason {
557        self.inner.info.read().disconnect_reason
558    }
559
560    pub fn permission(&self) -> Option<proto::ParticipantPermission> {
561        self.inner.info.read().permission.clone()
562    }
563
564    pub fn is_encrypted(&self) -> bool {
565        *self.inner.is_encrypted.read()
566    }
567
568    #[doc(hidden)]
569    pub fn update_data_encryption_status(&self, is_encrypted: bool) {
570        super::update_data_encryption_status(
571            &self.inner,
572            &super::Participant::Remote(self.clone()),
573            is_encrypted,
574        );
575    }
576}