1use std::{collections::HashMap, fmt::Debug, sync::Arc};
16
17use livekit_protocol as proto;
18use livekit_protocol::enum_dispatch;
19use parking_lot::{Mutex, RwLock};
20
21use crate::{prelude::*, rtc_engine::RtcEngine};
22
23mod local_participant;
24mod remote_participant;
25mod rpc;
26use crate::room::utils;
27
28pub use local_participant::*;
29pub use remote_participant::*;
30pub use rpc::*;
31
/// Connection quality level tracked for a participant.
///
/// A newly created participant starts at [`ConnectionQuality::Excellent`] (see `new_inner`);
/// the stored value is overwritten through `set_connection_quality`.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum ConnectionQuality {
    Excellent,
    Good,
    Poor,
    Lost,
}
39
/// Kind of participant, as stored in the participant's info.
///
/// `update_info` refreshes it by converting the kind carried in `proto::ParticipantInfo`.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum ParticipantKind {
    Standard,
    Ingress,
    Egress,
    Sip,
    Agent,
    Connector,
    Bridge,
}
50
/// Additional detail qualifying a participant's kind.
///
/// A participant carries a list of these (`kind_details`), which `update_info` rebuilds from
/// the protocol message via `utils::convert_kind_details`.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum ParticipantKindDetail {
    CloudAgent,
    Forwarded,
    ConnectorWhatsapp,
    ConnectorTwilio,
    BridgeRtsp,
}
59
/// Reason recorded for a participant's disconnection.
///
/// Starts as [`DisconnectReason::UnknownReason`] (see `new_inner`) and is replaced in
/// `update_info` with the value converted from the protocol's disconnect reason.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum DisconnectReason {
    UnknownReason,
    ClientInitiated,
    DuplicateIdentity,
    ServerShutdown,
    ParticipantRemoved,
    RoomDeleted,
    StateMismatch,
    JoinFailure,
    Migration,
    SignalClose,
    RoomClosed,
    UserUnavailable,
    UserRejected,
    SipTrunkFailure,
    ConnectionTimeout,
    MediaFailure,
}
79
/// A participant handle that is either a [`LocalParticipant`] or a [`RemoteParticipant`].
///
/// The methods in `impl Participant` forward to whichever variant is held.
#[derive(Debug, Clone)]
pub enum Participant {
    /// Wraps a `LocalParticipant`.
    Local(LocalParticipant),
    /// Wraps a `RemoteParticipant`.
    Remote(RemoteParticipant),
}
85
impl Participant {
    // Declares methods that are forwarded to the wrapped `LocalParticipant` /
    // `RemoteParticipant`.
    // NOTE(review): the expansion lives in `livekit_protocol::enum_dispatch` — presumably a
    // `match` over the variants named in the bracket list; confirm against the macro.
    enum_dispatch!(
        [Local, Remote];
        // Read-only accessors for the participant's current state.
        pub fn sid(self: &Self) -> ParticipantSid;
        pub fn identity(self: &Self) -> ParticipantIdentity;
        pub fn name(self: &Self) -> String;
        pub fn metadata(self: &Self) -> String;
        pub fn attributes(self: &Self) -> HashMap<String, String>;
        pub fn is_speaking(self: &Self) -> bool;
        pub fn audio_level(self: &Self) -> f32;
        pub fn connection_quality(self: &Self) -> ConnectionQuality;
        pub fn kind(self: &Self) -> ParticipantKind;
        pub fn kind_details(self: &Self) -> Vec<ParticipantKindDetail>;
        pub fn disconnect_reason(self: &Self) -> DisconnectReason;
        pub fn is_encrypted(self: &Self) -> bool;
        pub fn permission(self: &Self) -> Option<proto::ParticipantPermission>;

        // Crate-internal: applies a protocol-level participant update.
        pub(crate) fn update_info(self: &Self, info: proto::ParticipantInfo) -> ();

        // Crate-internal mutators for live state and the publication table.
        pub(crate) fn set_speaking(self: &Self, speaking: bool) -> ();
        pub(crate) fn set_audio_level(self: &Self, level: f32) -> ();
        pub(crate) fn set_connection_quality(self: &Self, quality: ConnectionQuality) -> ();
        pub(crate) fn add_publication(self: &Self, publication: TrackPublication) -> ();
        pub(crate) fn remove_publication(self: &Self, sid: &TrackSid) -> Option<TrackPublication>;
        pub(crate) fn update_data_encryption_status(self: &Self, is_encrypted: bool) -> ();
    );

    /// Returns the participant's track publications keyed by track sid.
    ///
    /// Delegates to `internal_track_publications` on the wrapped local or remote participant.
    pub fn track_publications(&self) -> HashMap<TrackSid, TrackPublication> {
        match self {
            Participant::Local(p) => p.internal_track_publications(),
            Participant::Remote(p) => p.internal_track_publications(),
        }
    }
}
121
/// Mutable per-participant state, stored behind the `info` lock of `ParticipantInner`.
///
/// Not to be confused with `proto::ParticipantInfo`, the protocol message that `update_info`
/// copies into this struct.
struct ParticipantInfo {
    pub sid: ParticipantSid,
    pub identity: ParticipantIdentity,
    pub name: String,
    pub metadata: String,
    pub attributes: HashMap<String, String>,
    /// Written by `set_speaking`; starts as `false`.
    pub speaking: bool,
    /// Written by `set_audio_level`; starts as `0.0`.
    pub audio_level: f32,
    /// Written by `set_connection_quality`; starts as `ConnectionQuality::Excellent`.
    pub connection_quality: ConnectionQuality,
    pub kind: ParticipantKind,
    pub kind_details: Vec<ParticipantKindDetail>,
    /// Starts as `DisconnectReason::UnknownReason`; refreshed by `update_info`.
    pub disconnect_reason: DisconnectReason,
    pub permission: Option<proto::ParticipantPermission>,
}
136
// Boxed callback types stored in `ParticipantEvents`. Each slot holds at most one handler.

/// Called with the participant and the publication whose muted handler fired.
type TrackMutedHandler = Box<dyn Fn(Participant, TrackPublication) + Send>;
/// Called with the participant and the publication whose unmuted handler fired.
type TrackUnmutedHandler = Box<dyn Fn(Participant, TrackPublication) + Send>;
/// Called with `(participant, old_metadata, new_metadata)` from `update_info`.
type MetadataChangedHandler = Box<dyn Fn(Participant, String, String) + Send>;
/// Called with the participant and the map returned by `utils::calculate_changed_attributes`.
type AttributesChangedHandler = Box<dyn Fn(Participant, HashMap<String, String>) + Send>;
/// Called with `(participant, old_name, new_name)` from `update_info`.
type NameChangedHandler = Box<dyn Fn(Participant, String, String) + Send>;
/// Called with the participant and the new aggregated encryption flag.
type EncryptionStatusChangedHandler = Box<dyn Fn(Participant, bool) + Send>;
/// Called with the participant and the new permission value from `update_info`.
type PermissionChangedHandler =
    Box<dyn Fn(Participant, Option<proto::ParticipantPermission>) + Send>;
145
/// Table of optional event handlers for a participant.
///
/// Every slot defaults to `None` and is filled by the matching `on_*` function, which
/// overwrites any handler stored previously.
#[derive(Default)]
struct ParticipantEvents {
    track_muted: Mutex<Option<TrackMutedHandler>>,
    track_unmuted: Mutex<Option<TrackUnmutedHandler>>,
    metadata_changed: Mutex<Option<MetadataChangedHandler>>,
    attributes_changed: Mutex<Option<AttributesChangedHandler>>,
    name_changed: Mutex<Option<NameChangedHandler>>,
    encryption_status_changed: Mutex<Option<EncryptionStatusChangedHandler>>,
    permission_changed: Mutex<Option<PermissionChangedHandler>>,
}
156
/// Shared state behind a participant, created by `new_inner` and handed out as an `Arc`.
pub(super) struct ParticipantInner {
    /// Engine used by `add_publication` to send mute/unmute requests for non-remote tracks.
    rtc_engine: Arc<RtcEngine>,
    /// Identity, metadata and live state of the participant.
    info: RwLock<ParticipantInfo>,
    /// Publications currently attached to the participant, keyed by track sid.
    track_publications: RwLock<HashMap<TrackSid, TrackPublication>>,
    /// Handler table; held in an `Arc` because the closures installed on publications clone it.
    events: Arc<ParticipantEvents>,
    /// Aggregated encryption flag maintained by `update_encryption_status`.
    is_encrypted: RwLock<bool>,
    /// Data-encryption status; `None` until `update_data_encryption_status` records a value.
    is_data_encrypted: RwLock<Option<bool>>,
}
165
/// Track permission entry associated with one participant identity.
///
/// NOTE(review): not used in the visible part of this file; presumably describes which tracks
/// `participant_identity` is allowed to access, with `allow_all` overriding
/// `allowed_track_sids` — confirm at the call site.
#[derive(Clone)]
pub struct ParticipantTrackPermission {
    pub participant_identity: ParticipantIdentity,
    pub allow_all: bool,
    pub allowed_track_sids: Vec<TrackSid>,
}
172
173pub(super) fn new_inner(
174 rtc_engine: Arc<RtcEngine>,
175 sid: ParticipantSid,
176 identity: ParticipantIdentity,
177 name: String,
178 metadata: String,
179 attributes: HashMap<String, String>,
180 kind: ParticipantKind,
181 kind_details: Vec<ParticipantKindDetail>,
182 permission: Option<proto::ParticipantPermission>,
183) -> Arc<ParticipantInner> {
184 Arc::new(ParticipantInner {
185 rtc_engine,
186 info: RwLock::new(ParticipantInfo {
187 sid,
188 identity,
189 name,
190 metadata,
191 attributes,
192 kind,
193 kind_details,
194 speaking: false,
195 audio_level: 0.0,
196 connection_quality: ConnectionQuality::Excellent,
197 disconnect_reason: DisconnectReason::UnknownReason,
198 permission,
199 }),
200 track_publications: Default::default(),
201 events: Default::default(),
202 is_encrypted: RwLock::new(false),
203 is_data_encrypted: RwLock::new(None),
204 })
205}
206
207pub(super) fn update_info(
208 inner: &Arc<ParticipantInner>,
209 participant: &Participant,
210 new_info: proto::ParticipantInfo,
211) {
212 let mut info = inner.info.write();
213 info.disconnect_reason = new_info.disconnect_reason().into();
214 info.kind = new_info.kind().into();
215 info.kind_details = super::utils::convert_kind_details(&new_info.kind_details);
216 info.sid = new_info.sid.try_into().unwrap();
217 info.identity = new_info.identity.into();
218
219 let old_name = std::mem::replace(&mut info.name, new_info.name.clone());
220 if old_name != new_info.name {
221 if let Some(cb) = inner.events.name_changed.lock().as_ref() {
222 cb(participant.clone(), old_name, new_info.name);
223 }
224 }
225
226 let old_metadata = std::mem::replace(&mut info.metadata, new_info.metadata.clone());
227 if old_metadata != new_info.metadata {
228 if let Some(cb) = inner.events.metadata_changed.lock().as_ref() {
229 cb(participant.clone(), old_metadata, new_info.metadata);
230 }
231 }
232
233 let old_attributes = std::mem::replace(&mut info.attributes, new_info.attributes.clone());
234 let changed_attributes =
235 utils::calculate_changed_attributes(old_attributes, new_info.attributes.clone());
236 if changed_attributes.len() != 0 {
237 if let Some(cb) = inner.events.attributes_changed.lock().as_ref() {
238 cb(participant.clone(), changed_attributes);
239 }
240 }
241
242 let old_permission = std::mem::replace(&mut info.permission, new_info.permission.clone());
243 if old_permission != new_info.permission {
244 if let Some(cb) = inner.events.permission_changed.lock().as_ref() {
245 cb(participant.clone(), new_info.permission.clone());
246 }
247 }
248}
249
250pub(super) fn set_speaking(
251 inner: &Arc<ParticipantInner>,
252 _participant: &Participant,
253 speaking: bool,
254) {
255 inner.info.write().speaking = speaking;
256}
257
258pub(super) fn set_audio_level(
259 inner: &Arc<ParticipantInner>,
260 _participant: &Participant,
261 audio_level: f32,
262) {
263 inner.info.write().audio_level = audio_level;
264}
265
266pub(super) fn set_connection_quality(
267 inner: &Arc<ParticipantInner>,
268 _participant: &Participant,
269 quality: ConnectionQuality,
270) {
271 inner.info.write().connection_quality = quality;
272}
273
274pub(super) fn on_track_muted(
275 inner: &Arc<ParticipantInner>,
276 handler: impl Fn(Participant, TrackPublication) + Send + 'static,
277) {
278 *inner.events.track_muted.lock() = Some(Box::new(handler));
279}
280
281pub(super) fn on_track_unmuted(
282 inner: &Arc<ParticipantInner>,
283 handler: impl Fn(Participant, TrackPublication) + Send + 'static,
284) {
285 *inner.events.track_unmuted.lock() = Some(Box::new(handler));
286}
287
288pub(super) fn on_metadata_changed(
289 inner: &Arc<ParticipantInner>,
290 handler: impl Fn(Participant, String, String) + Send + 'static,
291) {
292 *inner.events.metadata_changed.lock() = Some(Box::new(handler));
293}
294
295pub(super) fn on_name_changed(
296 inner: &Arc<ParticipantInner>,
297 handler: impl Fn(Participant, String, String) + Send + 'static,
298) {
299 *inner.events.name_changed.lock() = Some(Box::new(handler));
300}
301
302pub(super) fn on_attributes_changed(
303 inner: &Arc<ParticipantInner>,
304 handler: impl Fn(Participant, HashMap<String, String>) + Send + 'static,
305) {
306 *inner.events.attributes_changed.lock() = Some(Box::new(handler));
307}
308
309pub(super) fn on_encryption_status_changed(
310 inner: &Arc<ParticipantInner>,
311 handler: impl Fn(Participant, bool) + Send + 'static,
312) {
313 *inner.events.encryption_status_changed.lock() = Some(Box::new(handler));
314}
315
316pub(super) fn on_permission_changed(
317 inner: &Arc<ParticipantInner>,
318 handler: impl Fn(Participant, Option<proto::ParticipantPermission>) + Send + 'static,
319) {
320 *inner.events.permission_changed.lock() = Some(Box::new(handler));
321}
322
323pub(super) fn update_encryption_status(inner: &Arc<ParticipantInner>, participant: &Participant) {
324 use crate::e2ee::EncryptionType;
325
326 let track_publications = inner.track_publications.read();
327 let data_encryption_status = inner.is_data_encrypted.read();
328
329 let tracks_encrypted = !track_publications.is_empty()
331 && track_publications.values().all(|pub_| pub_.encryption_type() != EncryptionType::None);
332
333 let is_encrypted = match *data_encryption_status {
335 Some(data_encrypted) => tracks_encrypted && data_encrypted,
336 None => tracks_encrypted, };
338
339 let mut current_status = inner.is_encrypted.write();
340 if *current_status != is_encrypted {
341 *current_status = is_encrypted;
342 drop(current_status);
343 drop(track_publications);
344 drop(data_encryption_status);
345
346 if let Some(cb) = inner.events.encryption_status_changed.lock().as_ref() {
347 cb(participant.clone(), is_encrypted);
348 }
349 }
350}
351
352pub(super) fn update_data_encryption_status(
353 inner: &Arc<ParticipantInner>,
354 participant: &Participant,
355 is_encrypted: bool,
356) {
357 let mut data_encryption_status = inner.is_data_encrypted.write();
358 let previous_status = *data_encryption_status;
359
360 match previous_status {
361 Some(current) if current == is_encrypted => {
362 return;
364 }
365 Some(true) if !is_encrypted => {
366 *data_encryption_status = Some(false);
368 }
369 Some(false) if is_encrypted => {
370 return;
373 }
374 None => {
375 *data_encryption_status = Some(is_encrypted);
377 }
378 _ => return,
379 }
380
381 drop(data_encryption_status);
382
383 update_encryption_status(inner, participant);
385}
386
387pub(super) fn remove_publication(
388 inner: &Arc<ParticipantInner>,
389 participant: &Participant,
390 sid: &TrackSid,
391) -> Option<TrackPublication> {
392 let mut tracks = inner.track_publications.write();
393 let publication = tracks.remove(sid);
394 if let Some(publication) = publication.clone() {
395 publication.on_muted(|_| {});
397 publication.on_unmuted(|_| {});
398 } else {
399 log::warn!("could not find publication to remove: {:?}", sid);
401 }
402 drop(tracks);
403
404 update_encryption_status(inner, participant);
406
407 publication
408}
409
410pub(super) fn add_publication(
411 inner: &Arc<ParticipantInner>,
412 participant: &Participant,
413 publication: TrackPublication,
414) {
415 let mut tracks = inner.track_publications.write();
416 tracks.insert(publication.sid(), publication.clone());
417
418 publication.on_muted({
419 let events = inner.events.clone();
420 let participant = participant.clone();
421 let rtc_engine = inner.rtc_engine.clone();
422 move |publication| {
423 if let Some(cb) = events.track_muted.lock().as_ref() {
424 if !publication.is_remote() {
425 let rtc_engine = rtc_engine.clone();
426 let publication_cloned = publication.clone();
427 livekit_runtime::spawn(async move {
428 let engine_request = rtc_engine
429 .mute_track(proto::MuteTrackRequest {
430 sid: publication_cloned.sid().to_string(),
431 muted: true,
432 })
433 .await;
434 if let Err(e) = engine_request {
435 log::error!("could not mute track: {e:?}");
436 }
437 });
438 }
439 cb(participant.clone(), publication);
440 }
441 }
442 });
443
444 publication.on_unmuted({
445 let events = inner.events.clone();
446 let participant = participant.clone();
447 let rtc_engine = inner.rtc_engine.clone();
448 move |publication| {
449 if let Some(cb) = events.track_unmuted.lock().as_ref() {
450 if !publication.is_remote() {
451 let rtc_engine = rtc_engine.clone();
452 let publication_cloned = publication.clone();
453 livekit_runtime::spawn(async move {
454 let engine_request = rtc_engine
455 .mute_track(proto::MuteTrackRequest {
456 sid: publication_cloned.sid().to_string(),
457 muted: false,
458 })
459 .await;
460 if let Err(e) = engine_request {
461 log::error!("could not unmute track: {e:?}");
462 }
463 });
464 }
465 cb(participant.clone(), publication);
466 }
467 }
468 });
469 drop(tracks);
470
471 update_encryption_status(inner, participant);
473}