Skip to main content

gosuto_livekit/room/participant/
mod.rs

1// Copyright 2025 LiveKit, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{collections::HashMap, fmt::Debug, sync::Arc};
16
17use livekit_protocol as proto;
18use livekit_protocol::enum_dispatch;
19use parking_lot::{Mutex, RwLock};
20
21use crate::{prelude::*, rtc_engine::RtcEngine};
22
23mod local_participant;
24mod remote_participant;
25mod rpc;
26use crate::room::utils;
27
28pub use local_participant::*;
29pub use remote_participant::*;
30pub use rpc::*;
31
/// Connection quality levels that can be recorded for a participant.
///
/// The value is stored in the participant's info and overwritten through
/// `set_connection_quality`; participants created by `new_inner` start at
/// [`ConnectionQuality::Excellent`].
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum ConnectionQuality {
    Excellent,
    Good,
    Poor,
    Lost,
}
39
/// Kind of a participant.
///
/// `update_info` refreshes the stored kind by converting the value returned by the
/// protocol `ParticipantInfo::kind()`.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum ParticipantKind {
    Standard,
    Ingress,
    Egress,
    Sip,
    Agent,
    Connector,
    Bridge,
}
50
/// Additional detail entries stored alongside a participant's [`ParticipantKind`].
///
/// A participant carries a list of these; `update_info` rebuilds the list from the
/// protocol info via `utils::convert_kind_details`.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum ParticipantKindDetail {
    CloudAgent,
    Forwarded,
    ConnectorWhatsapp,
    ConnectorTwilio,
    BridgeRtsp,
}
59
/// Reason recorded for a participant's disconnection.
///
/// Participants created by `new_inner` start with [`DisconnectReason::UnknownReason`];
/// `update_info` overwrites it with the value converted from the protocol
/// `ParticipantInfo::disconnect_reason()`.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum DisconnectReason {
    UnknownReason,
    ClientInitiated,
    DuplicateIdentity,
    ServerShutdown,
    ParticipantRemoved,
    RoomDeleted,
    StateMismatch,
    JoinFailure,
    Migration,
    SignalClose,
    RoomClosed,
    UserUnavailable,
    UserRejected,
    SipTrunkFailure,
    ConnectionTimeout,
    MediaFailure,
}
79
/// A room participant: either the local one or a remote one.
///
/// Values of this type are cloned and handed to the event handlers registered through
/// the `on_*` functions of this module.
#[derive(Debug, Clone)]
pub enum Participant {
    /// Wraps a [`LocalParticipant`].
    Local(LocalParticipant),
    /// Wraps a [`RemoteParticipant`].
    Remote(RemoteParticipant),
}
85
impl Participant {
    // NOTE(review): `enum_dispatch!` is defined in `livekit_protocol`; presumably it
    // generates, for every signature listed, a method that forwards to the method of the
    // same name on the wrapped `Local`/`Remote` value — confirm against the macro.
    // Only `//` comments are used inside the invocation so that no extra tokens are
    // passed to the macro.
    enum_dispatch!(
        [Local, Remote];
        // Public read accessors.
        pub fn sid(self: &Self) -> ParticipantSid;
        pub fn identity(self: &Self) -> ParticipantIdentity;
        pub fn name(self: &Self) -> String;
        pub fn metadata(self: &Self) -> String;
        pub fn attributes(self: &Self) -> HashMap<String, String>;
        pub fn is_speaking(self: &Self) -> bool;
        pub fn audio_level(self: &Self) -> f32;
        pub fn connection_quality(self: &Self) -> ConnectionQuality;
        pub fn kind(self: &Self) -> ParticipantKind;
        pub fn kind_details(self: &Self) -> Vec<ParticipantKindDetail>;
        pub fn disconnect_reason(self: &Self) -> DisconnectReason;
        pub fn is_encrypted(self: &Self) -> bool;
        pub fn permission(self: &Self) -> Option<proto::ParticipantPermission>;

        // Crate-internal: takes a protocol `ParticipantInfo` by value.
        pub(crate) fn update_info(self: &Self, info: proto::ParticipantInfo) -> ();

        // Internal functions called by the Room when receiving the associated signal messages
        pub(crate) fn set_speaking(self: &Self, speaking: bool) -> ();
        pub(crate) fn set_audio_level(self: &Self, level: f32) -> ();
        pub(crate) fn set_connection_quality(self: &Self, quality: ConnectionQuality) -> ();
        pub(crate) fn add_publication(self: &Self, publication: TrackPublication) -> ();
        pub(crate) fn remove_publication(self: &Self, sid: &TrackSid) -> Option<TrackPublication>;
        pub(crate) fn update_data_encryption_status(self: &Self, is_encrypted: bool) -> ();
    );

    /// Returns the participant's track publications keyed by track SID.
    ///
    /// Written by hand rather than listed in the `enum_dispatch!` block: it delegates to
    /// `internal_track_publications` on the wrapped participant, a method whose name
    /// differs from this one.
    pub fn track_publications(&self) -> HashMap<TrackSid, TrackPublication> {
        match self {
            Participant::Local(p) => p.internal_track_publications(),
            Participant::Remote(p) => p.internal_track_publications(),
        }
    }
}
121
/// Mutable per-participant state, stored behind the `info` lock of [`ParticipantInner`].
///
/// Not to be confused with the protocol message `proto::ParticipantInfo`, from which
/// `update_info` refreshes most of these fields.
struct ParticipantInfo {
    // Refreshed by `update_info` from the protocol message.
    pub sid: ParticipantSid,
    // Refreshed by `update_info` from the protocol message.
    pub identity: ParticipantIdentity,
    // A change detected by `update_info` triggers the `name_changed` handler.
    pub name: String,
    // A change detected by `update_info` triggers the `metadata_changed` handler.
    pub metadata: String,
    // Changes detected by `update_info` trigger the `attributes_changed` handler.
    pub attributes: HashMap<String, String>,
    // Written by `set_speaking`; starts as `false`.
    pub speaking: bool,
    // Written by `set_audio_level`; starts at `0.0`.
    pub audio_level: f32,
    // Written by `set_connection_quality`; starts as `Excellent`.
    pub connection_quality: ConnectionQuality,
    // Refreshed by `update_info` from the protocol message.
    pub kind: ParticipantKind,
    // Refreshed by `update_info` via `utils::convert_kind_details`.
    pub kind_details: Vec<ParticipantKindDetail>,
    // Refreshed by `update_info`; starts as `UnknownReason`.
    pub disconnect_reason: DisconnectReason,
    // A change detected by `update_info` triggers the `permission_changed` handler.
    pub permission: Option<proto::ParticipantPermission>,
}
136
// Boxed callback types stored in `ParticipantEvents`. All of them must be `Send`.

/// Called with `(participant, publication)` from the mute hook installed by
/// `add_publication`.
type TrackMutedHandler = Box<dyn Fn(Participant, TrackPublication) + Send>;
/// Called with `(participant, publication)` from the unmute hook installed by
/// `add_publication`.
type TrackUnmutedHandler = Box<dyn Fn(Participant, TrackPublication) + Send>;
/// Called by `update_info` with `(participant, old_metadata, new_metadata)`.
type MetadataChangedHandler = Box<dyn Fn(Participant, String, String) + Send>;
/// Called by `update_info` with `(participant, changed_attributes)`.
type AttributesChangedHandler = Box<dyn Fn(Participant, HashMap<String, String>) + Send>;
/// Called by `update_info` with `(participant, old_name, new_name)`.
type NameChangedHandler = Box<dyn Fn(Participant, String, String) + Send>;
/// Called by `update_encryption_status` with `(participant, is_encrypted)`.
type EncryptionStatusChangedHandler = Box<dyn Fn(Participant, bool) + Send>;
/// Called by `update_info` with `(participant, new_permission)`.
type PermissionChangedHandler =
    Box<dyn Fn(Participant, Option<proto::ParticipantPermission>) + Send>;
145
/// Registry of the optional event handlers of a participant.
///
/// Each slot holds at most one handler behind its own mutex; the `on_*` functions of
/// this module overwrite the slot, so registering a handler replaces the previous one.
/// All slots start empty (`Default`).
#[derive(Default)]
struct ParticipantEvents {
    track_muted: Mutex<Option<TrackMutedHandler>>,
    track_unmuted: Mutex<Option<TrackUnmutedHandler>>,
    metadata_changed: Mutex<Option<MetadataChangedHandler>>,
    attributes_changed: Mutex<Option<AttributesChangedHandler>>,
    name_changed: Mutex<Option<NameChangedHandler>>,
    encryption_status_changed: Mutex<Option<EncryptionStatusChangedHandler>>,
    permission_changed: Mutex<Option<PermissionChangedHandler>>,
}
156
/// Shared state of a participant, created by `new_inner` and handed around as
/// `Arc<ParticipantInner>`.
pub(super) struct ParticipantInner {
    // Used by the mute/unmute hooks of `add_publication` to send `MuteTrackRequest`s.
    rtc_engine: Arc<RtcEngine>,
    // Descriptive state (name, metadata, speaking, ...).
    info: RwLock<ParticipantInfo>,
    // Publications keyed by track SID; maintained by `add_publication` /
    // `remove_publication`.
    track_publications: RwLock<HashMap<TrackSid, TrackPublication>>,
    // Handler registry; behind an `Arc` so the publication hooks can hold a clone of it.
    events: Arc<ParticipantEvents>,
    // Overall encryption status as last computed by `update_encryption_status`.
    is_encrypted: RwLock<bool>,
    // Data-channel encryption status: `None` until `update_data_encryption_status` is
    // first called; once it is `Some(false)` it is never set back to `Some(true)`.
    is_data_encrypted: RwLock<Option<bool>>,
}
165
/// Track permission entry associated with one participant identity.
///
/// NOTE(review): not used anywhere in this file; presumably consumed by the local
/// participant when configuring which of its tracks another participant may access —
/// confirm against the call sites.
#[derive(Clone)]
pub struct ParticipantTrackPermission {
    /// Identity of the participant this entry refers to.
    pub participant_identity: ParticipantIdentity,
    /// NOTE(review): presumably, when `true`, every track is allowed and
    /// `allowed_track_sids` is ignored — confirm.
    pub allow_all: bool,
    /// SIDs of the individually allowed tracks.
    pub allowed_track_sids: Vec<TrackSid>,
}
172
173pub(super) fn new_inner(
174    rtc_engine: Arc<RtcEngine>,
175    sid: ParticipantSid,
176    identity: ParticipantIdentity,
177    name: String,
178    metadata: String,
179    attributes: HashMap<String, String>,
180    kind: ParticipantKind,
181    kind_details: Vec<ParticipantKindDetail>,
182    permission: Option<proto::ParticipantPermission>,
183) -> Arc<ParticipantInner> {
184    Arc::new(ParticipantInner {
185        rtc_engine,
186        info: RwLock::new(ParticipantInfo {
187            sid,
188            identity,
189            name,
190            metadata,
191            attributes,
192            kind,
193            kind_details,
194            speaking: false,
195            audio_level: 0.0,
196            connection_quality: ConnectionQuality::Excellent,
197            disconnect_reason: DisconnectReason::UnknownReason,
198            permission,
199        }),
200        track_publications: Default::default(),
201        events: Default::default(),
202        is_encrypted: RwLock::new(false),
203        is_data_encrypted: RwLock::new(None),
204    })
205}
206
207pub(super) fn update_info(
208    inner: &Arc<ParticipantInner>,
209    participant: &Participant,
210    new_info: proto::ParticipantInfo,
211) {
212    let mut info = inner.info.write();
213    info.disconnect_reason = new_info.disconnect_reason().into();
214    info.kind = new_info.kind().into();
215    info.kind_details = super::utils::convert_kind_details(&new_info.kind_details);
216    info.sid = new_info.sid.try_into().unwrap();
217    info.identity = new_info.identity.into();
218
219    let old_name = std::mem::replace(&mut info.name, new_info.name.clone());
220    if old_name != new_info.name {
221        if let Some(cb) = inner.events.name_changed.lock().as_ref() {
222            cb(participant.clone(), old_name, new_info.name);
223        }
224    }
225
226    let old_metadata = std::mem::replace(&mut info.metadata, new_info.metadata.clone());
227    if old_metadata != new_info.metadata {
228        if let Some(cb) = inner.events.metadata_changed.lock().as_ref() {
229            cb(participant.clone(), old_metadata, new_info.metadata);
230        }
231    }
232
233    let old_attributes = std::mem::replace(&mut info.attributes, new_info.attributes.clone());
234    let changed_attributes =
235        utils::calculate_changed_attributes(old_attributes, new_info.attributes.clone());
236    if changed_attributes.len() != 0 {
237        if let Some(cb) = inner.events.attributes_changed.lock().as_ref() {
238            cb(participant.clone(), changed_attributes);
239        }
240    }
241
242    let old_permission = std::mem::replace(&mut info.permission, new_info.permission.clone());
243    if old_permission != new_info.permission {
244        if let Some(cb) = inner.events.permission_changed.lock().as_ref() {
245            cb(participant.clone(), new_info.permission.clone());
246        }
247    }
248}
249
250pub(super) fn set_speaking(
251    inner: &Arc<ParticipantInner>,
252    _participant: &Participant,
253    speaking: bool,
254) {
255    inner.info.write().speaking = speaking;
256}
257
258pub(super) fn set_audio_level(
259    inner: &Arc<ParticipantInner>,
260    _participant: &Participant,
261    audio_level: f32,
262) {
263    inner.info.write().audio_level = audio_level;
264}
265
266pub(super) fn set_connection_quality(
267    inner: &Arc<ParticipantInner>,
268    _participant: &Participant,
269    quality: ConnectionQuality,
270) {
271    inner.info.write().connection_quality = quality;
272}
273
274pub(super) fn on_track_muted(
275    inner: &Arc<ParticipantInner>,
276    handler: impl Fn(Participant, TrackPublication) + Send + 'static,
277) {
278    *inner.events.track_muted.lock() = Some(Box::new(handler));
279}
280
281pub(super) fn on_track_unmuted(
282    inner: &Arc<ParticipantInner>,
283    handler: impl Fn(Participant, TrackPublication) + Send + 'static,
284) {
285    *inner.events.track_unmuted.lock() = Some(Box::new(handler));
286}
287
288pub(super) fn on_metadata_changed(
289    inner: &Arc<ParticipantInner>,
290    handler: impl Fn(Participant, String, String) + Send + 'static,
291) {
292    *inner.events.metadata_changed.lock() = Some(Box::new(handler));
293}
294
295pub(super) fn on_name_changed(
296    inner: &Arc<ParticipantInner>,
297    handler: impl Fn(Participant, String, String) + Send + 'static,
298) {
299    *inner.events.name_changed.lock() = Some(Box::new(handler));
300}
301
302pub(super) fn on_attributes_changed(
303    inner: &Arc<ParticipantInner>,
304    handler: impl Fn(Participant, HashMap<String, String>) + Send + 'static,
305) {
306    *inner.events.attributes_changed.lock() = Some(Box::new(handler));
307}
308
309pub(super) fn on_encryption_status_changed(
310    inner: &Arc<ParticipantInner>,
311    handler: impl Fn(Participant, bool) + Send + 'static,
312) {
313    *inner.events.encryption_status_changed.lock() = Some(Box::new(handler));
314}
315
316pub(super) fn on_permission_changed(
317    inner: &Arc<ParticipantInner>,
318    handler: impl Fn(Participant, Option<proto::ParticipantPermission>) + Send + 'static,
319) {
320    *inner.events.permission_changed.lock() = Some(Box::new(handler));
321}
322
323pub(super) fn update_encryption_status(inner: &Arc<ParticipantInner>, participant: &Participant) {
324    use crate::e2ee::EncryptionType;
325
326    let track_publications = inner.track_publications.read();
327    let data_encryption_status = inner.is_data_encrypted.read();
328
329    // Check if all track publications are encrypted
330    let tracks_encrypted = !track_publications.is_empty()
331        && track_publications.values().all(|pub_| pub_.encryption_type() != EncryptionType::None);
332
333    // Overall encryption status: both tracks and data must be encrypted (if data exists)
334    let is_encrypted = match *data_encryption_status {
335        Some(data_encrypted) => tracks_encrypted && data_encrypted,
336        None => tracks_encrypted, // No data messages yet, only consider tracks
337    };
338
339    let mut current_status = inner.is_encrypted.write();
340    if *current_status != is_encrypted {
341        *current_status = is_encrypted;
342        drop(current_status);
343        drop(track_publications);
344        drop(data_encryption_status);
345
346        if let Some(cb) = inner.events.encryption_status_changed.lock().as_ref() {
347            cb(participant.clone(), is_encrypted);
348        }
349    }
350}
351
352pub(super) fn update_data_encryption_status(
353    inner: &Arc<ParticipantInner>,
354    participant: &Participant,
355    is_encrypted: bool,
356) {
357    let mut data_encryption_status = inner.is_data_encrypted.write();
358    let previous_status = *data_encryption_status;
359
360    match previous_status {
361        Some(current) if current == is_encrypted => {
362            // No change needed
363            return;
364        }
365        Some(true) if !is_encrypted => {
366            // Data was encrypted, now unencrypted - update immediately
367            *data_encryption_status = Some(false);
368        }
369        Some(false) if is_encrypted => {
370            // Data was unencrypted, now encrypted - but we need to keep it false
371            // because once we've seen unencrypted data, participant is not fully encrypted
372            return;
373        }
374        None => {
375            // First data message - set the status
376            *data_encryption_status = Some(is_encrypted);
377        }
378        _ => return,
379    }
380
381    drop(data_encryption_status);
382
383    // Update overall encryption status
384    update_encryption_status(inner, participant);
385}
386
387pub(super) fn remove_publication(
388    inner: &Arc<ParticipantInner>,
389    participant: &Participant,
390    sid: &TrackSid,
391) -> Option<TrackPublication> {
392    let mut tracks = inner.track_publications.write();
393    let publication = tracks.remove(sid);
394    if let Some(publication) = publication.clone() {
395        // remove events
396        publication.on_muted(|_| {});
397        publication.on_unmuted(|_| {});
398    } else {
399        // shouldn't happen (internal)
400        log::warn!("could not find publication to remove: {:?}", sid);
401    }
402    drop(tracks);
403
404    // Update encryption status after removing publication
405    update_encryption_status(inner, participant);
406
407    publication
408}
409
410pub(super) fn add_publication(
411    inner: &Arc<ParticipantInner>,
412    participant: &Participant,
413    publication: TrackPublication,
414) {
415    let mut tracks = inner.track_publications.write();
416    tracks.insert(publication.sid(), publication.clone());
417
418    publication.on_muted({
419        let events = inner.events.clone();
420        let participant = participant.clone();
421        let rtc_engine = inner.rtc_engine.clone();
422        move |publication| {
423            if let Some(cb) = events.track_muted.lock().as_ref() {
424                if !publication.is_remote() {
425                    let rtc_engine = rtc_engine.clone();
426                    let publication_cloned = publication.clone();
427                    livekit_runtime::spawn(async move {
428                        let engine_request = rtc_engine
429                            .mute_track(proto::MuteTrackRequest {
430                                sid: publication_cloned.sid().to_string(),
431                                muted: true,
432                            })
433                            .await;
434                        if let Err(e) = engine_request {
435                            log::error!("could not mute track: {e:?}");
436                        }
437                    });
438                }
439                cb(participant.clone(), publication);
440            }
441        }
442    });
443
444    publication.on_unmuted({
445        let events = inner.events.clone();
446        let participant = participant.clone();
447        let rtc_engine = inner.rtc_engine.clone();
448        move |publication| {
449            if let Some(cb) = events.track_unmuted.lock().as_ref() {
450                if !publication.is_remote() {
451                    let rtc_engine = rtc_engine.clone();
452                    let publication_cloned = publication.clone();
453                    livekit_runtime::spawn(async move {
454                        let engine_request = rtc_engine
455                            .mute_track(proto::MuteTrackRequest {
456                                sid: publication_cloned.sid().to_string(),
457                                muted: false,
458                            })
459                            .await;
460                        if let Err(e) = engine_request {
461                            log::error!("could not unmute track: {e:?}");
462                        }
463                    });
464                }
465                cb(participant.clone(), publication);
466            }
467        }
468    });
469    drop(tracks);
470
471    // Update encryption status after adding publication
472    update_encryption_status(inner, participant);
473}