Skip to main content

gosuto_livekit/room/participant/
local_participant.rs

1// Copyright 2025 LiveKit, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    collections::HashMap,
17    fmt::Debug,
18    future::Future,
19    path::Path,
20    pin::Pin,
21    sync::{Arc, Weak},
22    time::Duration,
23};
24
25use super::{
26    ConnectionQuality, ParticipantInner, ParticipantKind, ParticipantKindDetail,
27    ParticipantTrackPermission,
28};
29use crate::{
30    data_stream::{
31        ByteStreamInfo, ByteStreamWriter, StreamByteOptions, StreamResult, StreamTextOptions,
32        TextStreamInfo, TextStreamWriter,
33    },
34    e2ee::EncryptionType,
35    options::{self, compute_video_encodings, video_layers_from_encodings, TrackPublishOptions},
36    prelude::*,
37    room::participant::rpc::{RpcError, RpcErrorCode, RpcInvocationData, MAX_PAYLOAD_BYTES},
38    rtc_engine::{EngineError, RtcEngine},
39    ChatMessage, DataPacket, RoomSession, RpcAck, RpcRequest, RpcResponse, SipDTMF, Transcription,
40};
41use chrono::Utc;
42use gosuto_libwebrtc::{native::create_random_uuid, rtp_parameters::RtpEncodingParameters};
43use livekit_api::signal_client::SignalError;
44use livekit_protocol as proto;
45use livekit_runtime::timeout;
46use parking_lot::{Mutex, RwLock};
47use proto::request_response::Reason;
48use semver::Version;
49use tokio::sync::oneshot;
50
51type RpcHandler = Arc<
52    dyn Fn(RpcInvocationData) -> Pin<Box<dyn Future<Output = Result<String, RpcError>> + Send>>
53        + Send
54        + Sync,
55>;
56
57const REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
58
59type LocalTrackPublishedHandler = Box<dyn Fn(LocalParticipant, LocalTrackPublication) + Send>;
60type LocalTrackUnpublishedHandler = Box<dyn Fn(LocalParticipant, LocalTrackPublication) + Send>;
61
62#[derive(Default)]
63struct LocalEvents {
64    local_track_published: Mutex<Option<LocalTrackPublishedHandler>>,
65    local_track_unpublished: Mutex<Option<LocalTrackUnpublishedHandler>>,
66}
67
68struct RpcState {
69    pending_acks: HashMap<String, oneshot::Sender<()>>,
70    pending_responses: HashMap<String, oneshot::Sender<Result<String, RpcError>>>,
71    handlers: HashMap<String, RpcHandler>,
72}
73
74impl RpcState {
75    fn new() -> Self {
76        Self {
77            pending_acks: HashMap::new(),
78            pending_responses: HashMap::new(),
79            handlers: HashMap::new(),
80        }
81    }
82}
83struct LocalInfo {
84    events: LocalEvents,
85    encryption_type: EncryptionType,
86    rpc_state: Mutex<RpcState>,
87    all_participants_allowed: Mutex<bool>,
88    track_permissions: Mutex<Vec<ParticipantTrackPermission>>,
89    session: RwLock<Option<Weak<RoomSession>>>,
90}
91
92#[derive(Clone)]
93pub struct LocalParticipant {
94    inner: Arc<ParticipantInner>,
95    local: Arc<LocalInfo>,
96}
97
98impl Debug for LocalParticipant {
99    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100        f.debug_struct("LocalParticipant")
101            .field("sid", &self.sid())
102            .field("identity", &self.identity())
103            .field("name", &self.name())
104            .finish()
105    }
106}
107
108impl LocalParticipant {
109    pub(crate) fn new(
110        rtc_engine: Arc<RtcEngine>,
111        kind: ParticipantKind,
112        kind_details: Vec<ParticipantKindDetail>,
113        sid: ParticipantSid,
114        identity: ParticipantIdentity,
115        name: String,
116        metadata: String,
117        attributes: HashMap<String, String>,
118        encryption_type: EncryptionType,
119        permission: Option<proto::ParticipantPermission>,
120    ) -> Self {
121        Self {
122            inner: super::new_inner(
123                rtc_engine,
124                sid,
125                identity,
126                name,
127                metadata,
128                attributes,
129                kind,
130                kind_details,
131                permission,
132            ),
133            local: Arc::new(LocalInfo {
134                events: LocalEvents::default(),
135                encryption_type,
136                rpc_state: Mutex::new(RpcState::new()),
137                all_participants_allowed: Mutex::new(true),
138                track_permissions: Mutex::new(vec![]),
139                session: Default::default(),
140            }),
141        }
142    }
143
144    pub(crate) fn set_session(&self, session: Weak<RoomSession>) {
145        *self.local.session.write() = Some(session);
146    }
147
148    pub(crate) fn session(&self) -> Option<Arc<RoomSession>> {
149        self.local.session.read().as_ref().and_then(|s| s.upgrade())
150    }
151
152    pub(crate) fn internal_track_publications(&self) -> HashMap<TrackSid, TrackPublication> {
153        self.inner.track_publications.read().clone()
154    }
155
156    pub(crate) fn update_info(&self, info: proto::ParticipantInfo) {
157        super::update_info(&self.inner, &Participant::Local(self.clone()), info);
158    }
159
160    pub(crate) fn set_speaking(&self, speaking: bool) {
161        super::set_speaking(&self.inner, &Participant::Local(self.clone()), speaking);
162    }
163
164    pub(crate) fn set_audio_level(&self, level: f32) {
165        super::set_audio_level(&self.inner, &Participant::Local(self.clone()), level);
166    }
167
168    pub(crate) fn set_connection_quality(&self, quality: ConnectionQuality) {
169        super::set_connection_quality(&self.inner, &Participant::Local(self.clone()), quality);
170    }
171
172    pub(crate) fn on_local_track_published(
173        &self,
174        handler: impl Fn(LocalParticipant, LocalTrackPublication) + Send + 'static,
175    ) {
176        *self.local.events.local_track_published.lock() = Some(Box::new(handler));
177    }
178
179    pub(crate) fn on_local_track_unpublished(
180        &self,
181        handler: impl Fn(LocalParticipant, LocalTrackPublication) + Send + 'static,
182    ) {
183        *self.local.events.local_track_unpublished.lock() = Some(Box::new(handler));
184    }
185
186    pub(crate) fn on_track_muted(
187        &self,
188        handler: impl Fn(Participant, TrackPublication) + Send + 'static,
189    ) {
190        super::on_track_muted(&self.inner, handler)
191    }
192
193    pub(crate) fn on_track_unmuted(
194        &self,
195        handler: impl Fn(Participant, TrackPublication) + Send + 'static,
196    ) {
197        super::on_track_unmuted(&self.inner, handler)
198    }
199
200    pub(crate) fn on_metadata_changed(
201        &self,
202        handler: impl Fn(Participant, String, String) + Send + 'static,
203    ) {
204        super::on_metadata_changed(&self.inner, handler)
205    }
206
207    pub(crate) fn on_name_changed(
208        &self,
209        handler: impl Fn(Participant, String, String) + Send + 'static,
210    ) {
211        super::on_name_changed(&self.inner, handler)
212    }
213
214    pub(crate) fn on_attributes_changed(
215        &self,
216        handler: impl Fn(Participant, HashMap<String, String>) + Send + 'static,
217    ) {
218        super::on_attributes_changed(&self.inner, handler)
219    }
220
221    pub(crate) fn on_permission_changed(
222        &self,
223        handler: impl Fn(Participant, Option<proto::ParticipantPermission>) + Send + 'static,
224    ) {
225        super::on_permission_changed(&self.inner, handler)
226    }
227
228    pub(crate) fn add_publication(&self, publication: TrackPublication) {
229        super::add_publication(&self.inner, &Participant::Local(self.clone()), publication);
230    }
231
232    pub(crate) fn remove_publication(&self, sid: &TrackSid) -> Option<TrackPublication> {
233        super::remove_publication(&self.inner, &Participant::Local(self.clone()), sid)
234    }
235
236    pub(crate) fn published_tracks_info(&self) -> Vec<proto::TrackPublishedResponse> {
237        let tracks = self.track_publications();
238        let mut vec = Vec::with_capacity(tracks.len());
239
240        for p in tracks.values() {
241            if let Some(track) = p.track() {
242                vec.push(proto::TrackPublishedResponse {
243                    cid: track.rtc_track().id(),
244                    track: Some(p.proto_info()),
245                });
246            }
247        }
248
249        vec
250    }
251
252    pub async fn publish_track(
253        &self,
254        track: LocalTrack,
255        options: TrackPublishOptions,
256    ) -> RoomResult<LocalTrackPublication> {
257        let disable_red = self.local.encryption_type != EncryptionType::None || !options.red;
258
259        let mut req = proto::AddTrackRequest {
260            cid: track.rtc_track().id(),
261            name: track.name(),
262            r#type: proto::TrackType::from(track.kind()) as i32,
263            muted: track.is_muted(),
264            source: proto::TrackSource::from(options.source) as i32,
265            disable_dtx: !options.dtx,
266            disable_red,
267            encryption: proto::encryption::Type::from(self.local.encryption_type) as i32,
268            stream: options.stream.clone(),
269            ..Default::default()
270        };
271
272        if options.preconnect_buffer {
273            req.audio_features.push(proto::AudioTrackFeature::TfPreconnectBuffer as i32);
274        }
275
276        let mut encodings = Vec::default();
277        match &track {
278            LocalTrack::Video(video_track) => {
279                // Get the video dimension
280                // TODO(theomonnom): Use MediaStreamTrack::getSettings() on web
281                let resolution = video_track.rtc_source().video_resolution();
282                req.width = resolution.width;
283                req.height = resolution.height;
284
285                encodings = compute_video_encodings(req.width, req.height, &options);
286                req.layers = video_layers_from_encodings(req.width, req.height, &encodings);
287
288                // Populate simulcast_codecs so the server knows this track is simulcasted
289                if options.simulcast && encodings.len() > 1 {
290                    req.simulcast_codecs = vec![proto::SimulcastCodec {
291                        codec: options.video_codec.as_str().to_string(),
292                        cid: track.rtc_track().id(),
293                        layers: req.layers.clone(),
294                        ..Default::default()
295                    }];
296                }
297            }
298            LocalTrack::Audio(_audio_track) => {
299                // Setup audio encoding
300                let audio_encoding =
301                    options.audio_encoding.as_ref().unwrap_or(&options::audio::MUSIC.encoding);
302
303                encodings.push(RtpEncodingParameters {
304                    max_bitrate: Some(audio_encoding.max_bitrate),
305                    ..Default::default()
306                });
307            }
308        }
309        let track_info = self.inner.rtc_engine.add_track(req).await?;
310        let publication = LocalTrackPublication::new(track_info.clone(), track.clone());
311        track.update_info(track_info); // Update sid + source
312
313        // set track for publication to listen mute/unmute events
314        publication.set_track(Some(track.clone().into()));
315
316        let transceiver =
317            self.inner.rtc_engine.create_sender(track.clone(), options.clone(), encodings).await?;
318
319        track.set_transceiver(Some(transceiver));
320
321        self.inner.rtc_engine.publisher_negotiation_needed();
322
323        publication.update_publish_options(options);
324        self.add_publication(TrackPublication::Local(publication.clone()));
325
326        if let Some(local_track_published) = self.local.events.local_track_published.lock().as_ref()
327        {
328            local_track_published(self.clone(), publication.clone());
329        }
330        track.enable();
331
332        Ok(publication)
333    }
334
335    pub async fn set_metadata(&self, metadata: String) -> RoomResult<()> {
336        if let Ok(response) = timeout(REQUEST_TIMEOUT, {
337            let request_id = self.inner.rtc_engine.session().signal_client().next_request_id();
338            self.inner
339                .rtc_engine
340                .send_request(proto::signal_request::Message::UpdateMetadata(
341                    proto::UpdateParticipantMetadata {
342                        metadata,
343                        name: self.name(),
344                        attributes: Default::default(),
345                        request_id,
346                        ..Default::default()
347                    },
348                ))
349                .await;
350            self.inner.rtc_engine.get_response(request_id)
351        })
352        .await
353        {
354            match response.reason() {
355                Reason::Ok => Ok(()),
356                reason => Err(RoomError::Request { reason, message: response.message }),
357            }
358        } else {
359            Err(RoomError::Engine(EngineError::Signal(SignalError::Timeout(
360                "request timeout".into(),
361            ))))
362        }
363    }
364
365    pub async fn set_attributes(&self, attributes: HashMap<String, String>) -> RoomResult<()> {
366        if let Ok(response) = timeout(REQUEST_TIMEOUT, {
367            let request_id = self.inner.rtc_engine.session().signal_client().next_request_id();
368            self.inner
369                .rtc_engine
370                .send_request(proto::signal_request::Message::UpdateMetadata(
371                    proto::UpdateParticipantMetadata {
372                        attributes,
373                        metadata: self.metadata(),
374                        name: self.name(),
375                        request_id,
376                        ..Default::default()
377                    },
378                ))
379                .await;
380            self.inner.rtc_engine.get_response(request_id)
381        })
382        .await
383        {
384            match response.reason() {
385                Reason::Ok => Ok(()),
386                reason => Err(RoomError::Request { reason, message: response.message }),
387            }
388        } else {
389            Err(RoomError::Engine(EngineError::Signal(SignalError::Timeout(
390                "request timeout".into(),
391            ))))
392        }
393    }
394
395    pub async fn set_name(&self, name: String) -> RoomResult<()> {
396        if let Ok(response) = timeout(REQUEST_TIMEOUT, {
397            let request_id = self.inner.rtc_engine.session().signal_client().next_request_id();
398            self.inner
399                .rtc_engine
400                .send_request(proto::signal_request::Message::UpdateMetadata(
401                    proto::UpdateParticipantMetadata {
402                        name,
403                        metadata: self.metadata(),
404                        attributes: Default::default(),
405                        request_id,
406                        ..Default::default()
407                    },
408                ))
409                .await;
410            self.inner.rtc_engine.get_response(request_id)
411        })
412        .await
413        {
414            match response.reason() {
415                Reason::Ok => Ok(()),
416                reason => Err(RoomError::Request { reason, message: response.message }),
417            }
418        } else {
419            Err(RoomError::Engine(EngineError::Signal(SignalError::Timeout(
420                "request timeout".into(),
421            ))))
422        }
423    }
424
425    pub async fn send_chat_message(
426        &self,
427        text: String,
428        destination_identities: Option<Vec<String>>,
429        sender_identity: Option<String>,
430    ) -> RoomResult<ChatMessage> {
431        let chat_message = proto::ChatMessage {
432            id: create_random_uuid(),
433            timestamp: Utc::now().timestamp_millis(),
434            message: text,
435            ..Default::default()
436        };
437
438        let data = proto::DataPacket {
439            value: Some(proto::data_packet::Value::ChatMessage(chat_message.clone())),
440            participant_identity: sender_identity.unwrap_or_default(),
441            destination_identities: destination_identities.unwrap_or_default(),
442            ..Default::default()
443        };
444
445        match self.inner.rtc_engine.publish_data(data, DataPacketKind::Reliable, false).await {
446            Ok(_) => Ok(ChatMessage::from(chat_message)),
447            Err(e) => Err(Into::into(e)),
448        }
449    }
450
451    pub async fn edit_chat_message(
452        &self,
453        edit_text: String,
454        original_message: ChatMessage,
455        destination_identities: Option<Vec<String>>,
456        sender_identity: Option<String>,
457    ) -> RoomResult<ChatMessage> {
458        let edited_message = ChatMessage {
459            message: edit_text,
460            edit_timestamp: Utc::now().timestamp_millis().into(),
461            ..original_message
462        };
463        let proto_msg = proto::ChatMessage::from(edited_message);
464        let data = proto::DataPacket {
465            value: Some(proto::data_packet::Value::ChatMessage(proto_msg.clone())),
466            participant_identity: sender_identity.unwrap_or_default(),
467            destination_identities: destination_identities.unwrap_or_default(),
468            ..Default::default()
469        };
470
471        match self.inner.rtc_engine.publish_data(data, DataPacketKind::Reliable, false).await {
472            Ok(_) => Ok(ChatMessage::from(proto_msg)),
473            Err(e) => Err(Into::into(e)),
474        }
475    }
476
477    pub async fn unpublish_track(
478        &self,
479        sid: &TrackSid,
480        // _stop_on_unpublish: bool,
481    ) -> RoomResult<LocalTrackPublication> {
482        let publication = self.remove_publication(sid);
483        if let Some(TrackPublication::Local(publication)) = publication {
484            let track = publication.track().unwrap();
485            let sender = track.transceiver().unwrap().sender();
486
487            self.inner.rtc_engine.remove_track(sender)?;
488            track.set_transceiver(None);
489
490            if let Some(local_track_unpublished) =
491                self.local.events.local_track_unpublished.lock().as_ref()
492            {
493                local_track_unpublished(self.clone(), publication.clone());
494            }
495
496            publication.set_track(None);
497            self.inner.rtc_engine.publisher_negotiation_needed();
498
499            Ok(publication)
500        } else {
501            Err(RoomError::Internal("track not found".to_string()))
502        }
503    }
504
505    /** internal */
506    pub async fn publish_raw_data(
507        self,
508        packet: proto::DataPacket,
509        reliable: bool,
510    ) -> RoomResult<()> {
511        let kind = match reliable {
512            true => DataPacketKind::Reliable,
513            false => DataPacketKind::Lossy,
514        };
515        self.inner.rtc_engine.publish_data(packet, kind, true).await.map_err(Into::into)
516    }
517
518    pub async fn publish_data(&self, packet: DataPacket) -> RoomResult<()> {
519        let kind = match packet.reliable {
520            true => DataPacketKind::Reliable,
521            false => DataPacketKind::Lossy,
522        };
523        let destination_identities: Vec<String> =
524            packet.destination_identities.into_iter().map(Into::into).collect();
525        let data = proto::DataPacket {
526            kind: kind as i32,
527            destination_identities: destination_identities.clone(),
528            value: Some(proto::data_packet::Value::User(proto::UserPacket {
529                payload: packet.payload,
530                topic: packet.topic,
531                ..Default::default()
532            })),
533            ..Default::default()
534        };
535
536        self.inner.rtc_engine.publish_data(data, kind, false).await.map_err(Into::into)
537    }
538
539    pub fn set_data_channel_buffered_amount_low_threshold(
540        &self,
541        threshold: u64,
542        kind: DataPacketKind,
543    ) -> RoomResult<()> {
544        self.inner
545            .rtc_engine
546            .session()
547            .set_data_channel_buffered_amount_low_threshold(threshold, kind);
548        Ok(())
549    }
550
551    pub fn data_channel_buffered_amount_low_threshold(
552        &self,
553        kind: DataPacketKind,
554    ) -> RoomResult<u64> {
555        Ok(self.inner.rtc_engine.session().data_channel_buffered_amount_low_threshold(kind))
556    }
557
558    pub async fn set_track_subscription_permissions(
559        &self,
560        all_participants_allowed: bool,
561        permissions: Vec<ParticipantTrackPermission>,
562    ) -> RoomResult<()> {
563        *self.local.track_permissions.lock() = permissions;
564        *self.local.all_participants_allowed.lock() = all_participants_allowed;
565        self.update_track_subscription_permissions().await;
566        Ok(())
567    }
568
569    pub async fn publish_transcription(&self, packet: Transcription) -> RoomResult<()> {
570        let segments: Vec<proto::TranscriptionSegment> = packet
571            .segments
572            .into_iter()
573            .map(|segment| proto::TranscriptionSegment {
574                id: segment.id,
575                start_time: segment.start_time,
576                end_time: segment.end_time,
577                text: segment.text,
578                r#final: segment.r#final,
579                language: segment.language,
580            })
581            .collect();
582        let transcription_packet = proto::Transcription {
583            transcribed_participant_identity: packet.participant_identity,
584            segments: segments,
585            track_id: packet.track_id,
586        };
587        let data = proto::DataPacket {
588            value: Some(proto::data_packet::Value::Transcription(transcription_packet)),
589            ..Default::default()
590        };
591        self.inner
592            .rtc_engine
593            .publish_data(data, DataPacketKind::Reliable, false)
594            .await
595            .map_err(Into::into)
596    }
597
598    pub async fn publish_dtmf(&self, dtmf: SipDTMF) -> RoomResult<()> {
599        let destination_identities: Vec<String> =
600            dtmf.destination_identities.into_iter().map(Into::into).collect();
601        let dtmf_message = proto::SipDtmf { code: dtmf.code, digit: dtmf.digit };
602
603        let data = proto::DataPacket {
604            value: Some(proto::data_packet::Value::SipDtmf(dtmf_message)),
605            destination_identities: destination_identities.clone(),
606            ..Default::default()
607        };
608
609        self.inner
610            .rtc_engine
611            .publish_data(data, DataPacketKind::Reliable, false)
612            .await
613            .map_err(Into::into)
614    }
615
616    async fn publish_rpc_request(&self, rpc_request: RpcRequest) -> RoomResult<()> {
617        let destination_identities = vec![rpc_request.destination_identity];
618        let rpc_request_message = proto::RpcRequest {
619            id: rpc_request.id,
620            method: rpc_request.method,
621            payload: rpc_request.payload,
622            response_timeout_ms: rpc_request.response_timeout.as_millis() as u32,
623            version: rpc_request.version,
624            ..Default::default()
625        };
626
627        let data = proto::DataPacket {
628            value: Some(proto::data_packet::Value::RpcRequest(rpc_request_message)),
629            destination_identities,
630            ..Default::default()
631        };
632
633        self.inner
634            .rtc_engine
635            .publish_data(data, DataPacketKind::Reliable, false)
636            .await
637            .map_err(Into::into)
638    }
639
640    async fn publish_rpc_response(&self, rpc_response: RpcResponse) -> RoomResult<()> {
641        let destination_identities = vec![rpc_response.destination_identity];
642        let rpc_response_message = proto::RpcResponse {
643            request_id: rpc_response.request_id,
644            value: Some(match rpc_response.error {
645                Some(error) => proto::rpc_response::Value::Error(proto::RpcError {
646                    code: error.code,
647                    message: error.message,
648                    data: error.data,
649                }),
650                None => proto::rpc_response::Value::Payload(rpc_response.payload.unwrap()),
651            }),
652            ..Default::default()
653        };
654
655        let data = proto::DataPacket {
656            value: Some(proto::data_packet::Value::RpcResponse(rpc_response_message)),
657            destination_identities: destination_identities.clone(),
658            ..Default::default()
659        };
660
661        self.inner
662            .rtc_engine
663            .publish_data(data, DataPacketKind::Reliable, false)
664            .await
665            .map_err(Into::into)
666    }
667
668    async fn publish_rpc_ack(&self, rpc_ack: RpcAck) -> RoomResult<()> {
669        let destination_identities = vec![rpc_ack.destination_identity];
670        let rpc_ack_message =
671            proto::RpcAck { request_id: rpc_ack.request_id, ..Default::default() };
672
673        let data = proto::DataPacket {
674            value: Some(proto::data_packet::Value::RpcAck(rpc_ack_message)),
675            destination_identities: destination_identities.clone(),
676            ..Default::default()
677        };
678
679        self.inner
680            .rtc_engine
681            .publish_data(data, DataPacketKind::Reliable, false)
682            .await
683            .map_err(Into::into)
684    }
685
686    pub(crate) async fn update_track_subscription_permissions(&self) {
687        let all_participants_allowed = *self.local.all_participants_allowed.lock();
688        let track_permissions = self
689            .local
690            .track_permissions
691            .lock()
692            .iter()
693            .map(|p| proto::TrackPermission::from(p.clone()))
694            .collect();
695
696        self.inner
697            .rtc_engine
698            .send_request(proto::signal_request::Message::SubscriptionPermission(
699                proto::SubscriptionPermission {
700                    all_participants: all_participants_allowed,
701                    track_permissions,
702                },
703            ))
704            .await;
705    }
706
707    pub fn get_track_publication(&self, sid: &TrackSid) -> Option<LocalTrackPublication> {
708        self.inner.track_publications.read().get(sid).map(|track| {
709            if let TrackPublication::Local(local) = track {
710                return local.clone();
711            }
712
713            unreachable!()
714        })
715    }
716
717    pub fn sid(&self) -> ParticipantSid {
718        self.inner.info.read().sid.clone()
719    }
720
721    pub fn identity(&self) -> ParticipantIdentity {
722        self.inner.info.read().identity.clone()
723    }
724
725    pub fn name(&self) -> String {
726        self.inner.info.read().name.clone()
727    }
728
729    pub fn metadata(&self) -> String {
730        self.inner.info.read().metadata.clone()
731    }
732
733    pub fn attributes(&self) -> HashMap<String, String> {
734        self.inner.info.read().attributes.clone()
735    }
736
737    pub fn is_speaking(&self) -> bool {
738        self.inner.info.read().speaking
739    }
740
741    pub fn track_publications(&self) -> HashMap<TrackSid, LocalTrackPublication> {
742        self.inner
743            .track_publications
744            .read()
745            .clone()
746            .into_iter()
747            .map(|(sid, track)| {
748                if let TrackPublication::Local(local) = track {
749                    return (sid, local);
750                }
751
752                unreachable!()
753            })
754            .collect()
755    }
756
757    pub fn audio_level(&self) -> f32 {
758        self.inner.info.read().audio_level
759    }
760
761    pub fn connection_quality(&self) -> ConnectionQuality {
762        self.inner.info.read().connection_quality
763    }
764
765    pub fn kind(&self) -> ParticipantKind {
766        self.inner.info.read().kind
767    }
768
769    pub fn kind_details(&self) -> Vec<ParticipantKindDetail> {
770        self.inner.info.read().kind_details.clone()
771    }
772
773    pub fn disconnect_reason(&self) -> DisconnectReason {
774        self.inner.info.read().disconnect_reason
775    }
776
777    pub fn permission(&self) -> Option<proto::ParticipantPermission> {
778        self.inner.info.read().permission.clone()
779    }
780
781    pub async fn perform_rpc(&self, data: PerformRpcData) -> Result<String, RpcError> {
782        // Maximum amount of time it should ever take for an RPC request to reach the destination, and the ACK to come back
783        // This is set to 7 seconds to account for various relay timeouts and retries in LiveKit Cloud that occur in rare cases
784
785        let max_round_trip_latency = Duration::from_millis(7000);
786        let min_effective_timeout = Duration::from_millis(1000);
787
788        if data.payload.len() > MAX_PAYLOAD_BYTES {
789            return Err(RpcError::built_in(RpcErrorCode::RequestPayloadTooLarge, None));
790        }
791
792        if let Some(server_info) =
793            self.inner.rtc_engine.session().signal_client().join_response().server_info
794        {
795            if !server_info.version.is_empty() {
796                let server_version = Version::parse(&server_info.version).unwrap();
797                let min_required_version = Version::parse("1.8.0").unwrap();
798                if server_version < min_required_version {
799                    return Err(RpcError::built_in(RpcErrorCode::UnsupportedServer, None));
800                }
801            }
802        }
803
804        let id = create_random_uuid();
805        let (ack_tx, ack_rx) = oneshot::channel();
806        let (response_tx, response_rx) = oneshot::channel();
807        let effective_timeout = std::cmp::max(
808            data.response_timeout.saturating_sub(max_round_trip_latency),
809            min_effective_timeout,
810        );
811
812        // Register channels BEFORE sending the request to avoid race condition
813        // where the response arrives before we've registered the handlers
814        {
815            let mut rpc_state = self.local.rpc_state.lock();
816            rpc_state.pending_acks.insert(id.clone(), ack_tx);
817            rpc_state.pending_responses.insert(id.clone(), response_tx);
818        }
819
820        if let Err(e) = self
821            .publish_rpc_request(RpcRequest {
822                destination_identity: data.destination_identity.clone(),
823                id: id.clone(),
824                method: data.method.clone(),
825                payload: data.payload.clone(),
826                response_timeout: effective_timeout,
827                version: 1,
828            })
829            .await
830        {
831            // Clean up on failure
832            let mut rpc_state = self.local.rpc_state.lock();
833            rpc_state.pending_acks.remove(&id);
834            rpc_state.pending_responses.remove(&id);
835            log::error!("Failed to publish RPC request: {}", e);
836            return Err(RpcError::built_in(RpcErrorCode::SendFailed, Some(e.to_string())));
837        }
838
839        // Wait for ack timeout
840        match tokio::time::timeout(max_round_trip_latency, ack_rx).await {
841            Err(_) => {
842                let mut rpc_state = self.local.rpc_state.lock();
843                rpc_state.pending_acks.remove(&id);
844                rpc_state.pending_responses.remove(&id);
845                return Err(RpcError::built_in(RpcErrorCode::ConnectionTimeout, None));
846            }
847            Ok(_) => {
848                // Ack received, continue to wait for response
849            }
850        }
851
852        // Wait for response timout
853        let response = match tokio::time::timeout(data.response_timeout, response_rx).await {
854            Err(_) => {
855                self.local.rpc_state.lock().pending_responses.remove(&id);
856                return Err(RpcError::built_in(RpcErrorCode::ResponseTimeout, None));
857            }
858            Ok(result) => result,
859        };
860
861        match response {
862            Err(_) => {
863                // Something went wrong locally
864                Err(RpcError::built_in(RpcErrorCode::RecipientDisconnected, None))
865            }
866            Ok(Err(e)) => {
867                // RPC error from remote, forward it
868                Err(e)
869            }
870            Ok(Ok(payload)) => {
871                // Successful response
872                Ok(payload)
873            }
874        }
875    }
876
877    pub fn register_rpc_method(
878        &self,
879        method: String,
880        handler: impl Fn(RpcInvocationData) -> Pin<Box<dyn Future<Output = Result<String, RpcError>> + Send>>
881            + Send
882            + Sync
883            + 'static,
884    ) {
885        self.local.rpc_state.lock().handlers.insert(method, Arc::new(handler));
886
887        // Pre-connect the publisher PC so ACKs can be sent immediately when requests arrive.
888        // Without this, the first RPC request would trigger publisher negotiation, causing
889        // a ~300-500ms delay before the ACK can be sent (ICE negotiation time).
890        self.inner.rtc_engine.publisher_negotiation_needed();
891    }
892
893    pub fn unregister_rpc_method(&self, method: String) {
894        self.local.rpc_state.lock().handlers.remove(&method);
895    }
896
897    pub(crate) fn handle_incoming_rpc_ack(&self, request_id: String) {
898        let mut rpc_state = self.local.rpc_state.lock();
899        if let Some(tx) = rpc_state.pending_acks.remove(&request_id) {
900            let _ = tx.send(());
901        } else {
902            log::error!("Ack received for unexpected RPC request: {}", request_id);
903        }
904    }
905
906    pub(crate) fn handle_incoming_rpc_response(
907        &self,
908        request_id: String,
909        payload: Option<String>,
910        error: Option<proto::RpcError>,
911    ) {
912        let mut rpc_state = self.local.rpc_state.lock();
913        if let Some(tx) = rpc_state.pending_responses.remove(&request_id) {
914            let _ = tx.send(match error {
915                Some(e) => Err(RpcError::from_proto(e)),
916                None => Ok(payload.unwrap_or_default()),
917            });
918        } else {
919            log::error!("Response received for unexpected RPC request: {}", request_id);
920        }
921    }
922
923    pub(crate) async fn handle_incoming_rpc_request(
924        &self,
925        caller_identity: ParticipantIdentity,
926        request_id: String,
927        method: String,
928        payload: String,
929        response_timeout: Duration,
930        version: u32,
931    ) {
932        if let Err(e) = self
933            .publish_rpc_ack(RpcAck {
934                destination_identity: caller_identity.to_string(),
935                request_id: request_id.clone(),
936            })
937            .await
938        {
939            log::error!("Failed to publish RPC ACK: {:?}", e);
940        }
941
942        let caller_identity_2 = caller_identity.clone();
943        let request_id_2 = request_id.clone();
944
945        let response = if version != 1 {
946            Err(RpcError::built_in(RpcErrorCode::UnsupportedVersion, None))
947        } else {
948            let handler = self.local.rpc_state.lock().handlers.get(&method).cloned();
949
950            match handler {
951                Some(handler) => {
952                    match tokio::task::spawn(async move {
953                        handler(RpcInvocationData {
954                            request_id: request_id.clone(),
955                            caller_identity: caller_identity.clone(),
956                            payload: payload.clone(),
957                            response_timeout,
958                        })
959                        .await
960                    })
961                    .await
962                    {
963                        Ok(result) => result,
964                        Err(e) => {
965                            log::error!("RPC method handler returned an error: {:?}", e);
966                            Err(RpcError::built_in(RpcErrorCode::ApplicationError, None))
967                        }
968                    }
969                }
970                None => Err(RpcError::built_in(RpcErrorCode::UnsupportedMethod, None)),
971            }
972        };
973
974        let (payload, error) = match response {
975            Ok(response_payload) if response_payload.len() <= MAX_PAYLOAD_BYTES => {
976                (Some(response_payload), None)
977            }
978            Ok(_) => (None, Some(RpcError::built_in(RpcErrorCode::ResponsePayloadTooLarge, None))),
979            Err(e) => (None, Some(e.into())),
980        };
981
982        if let Err(e) = self
983            .publish_rpc_response(RpcResponse {
984                destination_identity: caller_identity_2.to_string(),
985                request_id: request_id_2,
986                payload,
987                error: error.map(|e| e.to_proto()),
988            })
989            .await
990        {
991            log::error!("Failed to publish RPC response: {:?}", e);
992        }
993    }
994
995    /// Send text to participants in the room.
996    ///
997    /// This method sends a complete text string to participants in the room as a text stream.
998    /// The text is sent in a single operation, and the method returns information about the
999    /// stream used to send the text.
1000    ///
1001    /// # Arguments
1002    ///
1003    /// * `text` - The text content to send.
1004    /// * `options` - Configuration options for the text stream, including topic and
1005    ///   destination participants.
1006    ///
1007    pub async fn send_text(
1008        &self,
1009        text: &str,
1010        options: StreamTextOptions,
1011    ) -> StreamResult<TextStreamInfo> {
1012        self.session().unwrap().outgoing_stream_manager.send_text(text, options).await
1013    }
1014
1015    /// Send a file on disk to participants in the room.
1016    ///
1017    /// This method reads a file from the specified path and sends its contents
1018    /// to participants in the room as a byte stream, and the method returns information
1019    /// the stream used to send the file.
1020    ///
1021    /// # Arguments
1022    ///
1023    /// * `path` - Path to the file to be sent.
1024    /// * `options` - Configuration options for the byte stream, including topic and
1025    ///   destination participants.
1026    ///
1027    pub async fn send_file(
1028        &self,
1029        path: impl AsRef<Path>,
1030        options: StreamByteOptions,
1031    ) -> StreamResult<ByteStreamInfo> {
1032        self.session().unwrap().outgoing_stream_manager.send_file(path, options).await
1033    }
1034
1035    /// Send an in-memory blob of bytes to participants in the room.
1036    ///
1037    /// This method sends a provided byte slice as a byte stream.
1038    ///
1039    /// # Arguments
1040    ///
1041    /// * `data` - The bytes to send.
1042    /// * `options` - Configuration options for the byte stream, including topic and
1043    ///   destination participants.
1044    pub async fn send_bytes(
1045        &self,
1046        data: impl AsRef<[u8]>,
1047        options: StreamByteOptions,
1048    ) -> StreamResult<ByteStreamInfo> {
1049        self.session().unwrap().outgoing_stream_manager.send_bytes(data, options).await
1050    }
1051
1052    /// Stream text incrementally to participants in the room.
1053    ///
1054    /// This method allows sending text data in chunks as it becomes available.
1055    /// Unlike `send_text`, which sends the entire text at once, this method returns
1056    /// a writer that can be used to send text incrementally.
1057    ///
1058    /// # Arguments
1059    ///
1060    /// * `options` - Configuration options for the text stream, including topic and
1061    ///   destination participants.
1062    ///
1063    pub async fn stream_text(&self, options: StreamTextOptions) -> StreamResult<TextStreamWriter> {
1064        self.session().unwrap().outgoing_stream_manager.stream_text(options).await
1065    }
1066
1067    /// Stream bytes incrementally to participants in the room.
1068    ///
1069    /// This method allows sending binary data in chunks as it becomes available.
1070    /// Unlike `send_file`, which sends the entire file at once, this method returns
1071    /// a writer that can be used to send binary data incrementally.
1072    ///
1073    /// # Arguments
1074    ///
1075    /// * `options` - Configuration options for the byte stream, including topic and
1076    ///   destination participants.
1077    ///
1078    pub async fn stream_bytes(&self, options: StreamByteOptions) -> StreamResult<ByteStreamWriter> {
1079        self.session().unwrap().outgoing_stream_manager.stream_bytes(options).await
1080    }
1081
1082    pub fn is_encrypted(&self) -> bool {
1083        *self.inner.is_encrypted.read()
1084    }
1085
1086    #[doc(hidden)]
1087    pub fn update_data_encryption_status(&self, _is_encrypted: bool) {
1088        // Local participants don't receive data messages, so this is a no-op
1089    }
1090}