1use std::{
16 collections::HashMap,
17 fmt::Debug,
18 future::Future,
19 path::Path,
20 pin::Pin,
21 sync::{Arc, Weak},
22 time::Duration,
23};
24
25use super::{
26 ConnectionQuality, ParticipantInner, ParticipantKind, ParticipantKindDetail,
27 ParticipantTrackPermission,
28};
29use crate::{
30 data_stream::{
31 ByteStreamInfo, ByteStreamWriter, StreamByteOptions, StreamResult, StreamTextOptions,
32 TextStreamInfo, TextStreamWriter,
33 },
34 e2ee::EncryptionType,
35 options::{self, compute_video_encodings, video_layers_from_encodings, TrackPublishOptions},
36 prelude::*,
37 room::participant::rpc::{RpcError, RpcErrorCode, RpcInvocationData, MAX_PAYLOAD_BYTES},
38 rtc_engine::{EngineError, RtcEngine},
39 ChatMessage, DataPacket, RoomSession, RpcAck, RpcRequest, RpcResponse, SipDTMF, Transcription,
40};
41use chrono::Utc;
42use gosuto_libwebrtc::{native::create_random_uuid, rtp_parameters::RtpEncodingParameters};
43use livekit_api::signal_client::SignalError;
44use livekit_protocol as proto;
45use livekit_runtime::timeout;
46use parking_lot::{Mutex, RwLock};
47use proto::request_response::Reason;
48use semver::Version;
49use tokio::sync::oneshot;
50
/// Shareable, type-erased RPC method handler.
///
/// The handler receives the [`RpcInvocationData`] for one incoming call and returns a boxed
/// future that resolves to the response payload string or to an [`RpcError`].
type RpcHandler = Arc<
    dyn Fn(RpcInvocationData) -> Pin<Box<dyn Future<Output = Result<String, RpcError>> + Send>>
        + Send
        + Sync,
>;
56
/// Maximum time to wait for the response to a signal request (used by the metadata, name and
/// attributes setters).
const REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
58
/// Boxed callback invoked with the participant and the new publication once a local track has
/// been published.
type LocalTrackPublishedHandler = Box<dyn Fn(LocalParticipant, LocalTrackPublication) + Send>;
/// Boxed callback invoked with the participant and the removed publication when a local track
/// is unpublished.
type LocalTrackUnpublishedHandler = Box<dyn Fn(LocalParticipant, LocalTrackPublication) + Send>;
61
/// Optional callbacks specific to the local participant.
///
/// Each slot holds at most one handler; registering a new handler replaces the previous one.
#[derive(Default)]
struct LocalEvents {
    /// Called at the end of a successful `publish_track`.
    local_track_published: Mutex<Option<LocalTrackPublishedHandler>>,
    /// Called from `unpublish_track` after the track has been removed from the engine.
    local_track_unpublished: Mutex<Option<LocalTrackUnpublishedHandler>>,
}
67
/// Bookkeeping for in-flight outgoing RPC requests and registered incoming RPC handlers.
struct RpcState {
    /// ACK notifiers for outgoing requests, keyed by request id.
    pending_acks: HashMap<String, oneshot::Sender<()>>,
    /// Response notifiers for outgoing requests, keyed by request id.
    pending_responses: HashMap<String, oneshot::Sender<Result<String, RpcError>>>,
    /// Registered handlers for incoming requests, keyed by method name.
    handlers: HashMap<String, RpcHandler>,
}
73
74impl RpcState {
75 fn new() -> Self {
76 Self {
77 pending_acks: HashMap::new(),
78 pending_responses: HashMap::new(),
79 handlers: HashMap::new(),
80 }
81 }
82}
/// State that only the local participant carries, shared between clones of
/// [`LocalParticipant`].
struct LocalInfo {
    /// Local-only event callbacks.
    events: LocalEvents,
    /// Encryption type advertised when publishing tracks.
    encryption_type: EncryptionType,
    /// Pending outgoing RPC requests and registered RPC handlers.
    rpc_state: Mutex<RpcState>,
    /// Last value passed to `set_track_subscription_permissions` (initially `true`).
    all_participants_allowed: Mutex<bool>,
    /// Last per-participant permissions passed to `set_track_subscription_permissions`.
    track_permissions: Mutex<Vec<ParticipantTrackPermission>>,
    /// Weak handle to the owning room session; `None` until `set_session` is called.
    session: RwLock<Option<Weak<RoomSession>>>,
}
91
/// Handle to the local participant of a room.
///
/// Cloning is cheap: both fields are reference-counted, so all clones share the same state.
#[derive(Clone)]
pub struct LocalParticipant {
    /// State shared with the generic participant helpers in the parent module.
    inner: Arc<ParticipantInner>,
    /// State that exists only for the local participant.
    local: Arc<LocalInfo>,
}
97
98impl Debug for LocalParticipant {
99 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100 f.debug_struct("LocalParticipant")
101 .field("sid", &self.sid())
102 .field("identity", &self.identity())
103 .field("name", &self.name())
104 .finish()
105 }
106}
107
108impl LocalParticipant {
109 pub(crate) fn new(
110 rtc_engine: Arc<RtcEngine>,
111 kind: ParticipantKind,
112 kind_details: Vec<ParticipantKindDetail>,
113 sid: ParticipantSid,
114 identity: ParticipantIdentity,
115 name: String,
116 metadata: String,
117 attributes: HashMap<String, String>,
118 encryption_type: EncryptionType,
119 permission: Option<proto::ParticipantPermission>,
120 ) -> Self {
121 Self {
122 inner: super::new_inner(
123 rtc_engine,
124 sid,
125 identity,
126 name,
127 metadata,
128 attributes,
129 kind,
130 kind_details,
131 permission,
132 ),
133 local: Arc::new(LocalInfo {
134 events: LocalEvents::default(),
135 encryption_type,
136 rpc_state: Mutex::new(RpcState::new()),
137 all_participants_allowed: Mutex::new(true),
138 track_permissions: Mutex::new(vec![]),
139 session: Default::default(),
140 }),
141 }
142 }
143
144 pub(crate) fn set_session(&self, session: Weak<RoomSession>) {
145 *self.local.session.write() = Some(session);
146 }
147
148 pub(crate) fn session(&self) -> Option<Arc<RoomSession>> {
149 self.local.session.read().as_ref().and_then(|s| s.upgrade())
150 }
151
152 pub(crate) fn internal_track_publications(&self) -> HashMap<TrackSid, TrackPublication> {
153 self.inner.track_publications.read().clone()
154 }
155
156 pub(crate) fn update_info(&self, info: proto::ParticipantInfo) {
157 super::update_info(&self.inner, &Participant::Local(self.clone()), info);
158 }
159
160 pub(crate) fn set_speaking(&self, speaking: bool) {
161 super::set_speaking(&self.inner, &Participant::Local(self.clone()), speaking);
162 }
163
164 pub(crate) fn set_audio_level(&self, level: f32) {
165 super::set_audio_level(&self.inner, &Participant::Local(self.clone()), level);
166 }
167
168 pub(crate) fn set_connection_quality(&self, quality: ConnectionQuality) {
169 super::set_connection_quality(&self.inner, &Participant::Local(self.clone()), quality);
170 }
171
172 pub(crate) fn on_local_track_published(
173 &self,
174 handler: impl Fn(LocalParticipant, LocalTrackPublication) + Send + 'static,
175 ) {
176 *self.local.events.local_track_published.lock() = Some(Box::new(handler));
177 }
178
179 pub(crate) fn on_local_track_unpublished(
180 &self,
181 handler: impl Fn(LocalParticipant, LocalTrackPublication) + Send + 'static,
182 ) {
183 *self.local.events.local_track_unpublished.lock() = Some(Box::new(handler));
184 }
185
186 pub(crate) fn on_track_muted(
187 &self,
188 handler: impl Fn(Participant, TrackPublication) + Send + 'static,
189 ) {
190 super::on_track_muted(&self.inner, handler)
191 }
192
193 pub(crate) fn on_track_unmuted(
194 &self,
195 handler: impl Fn(Participant, TrackPublication) + Send + 'static,
196 ) {
197 super::on_track_unmuted(&self.inner, handler)
198 }
199
200 pub(crate) fn on_metadata_changed(
201 &self,
202 handler: impl Fn(Participant, String, String) + Send + 'static,
203 ) {
204 super::on_metadata_changed(&self.inner, handler)
205 }
206
207 pub(crate) fn on_name_changed(
208 &self,
209 handler: impl Fn(Participant, String, String) + Send + 'static,
210 ) {
211 super::on_name_changed(&self.inner, handler)
212 }
213
214 pub(crate) fn on_attributes_changed(
215 &self,
216 handler: impl Fn(Participant, HashMap<String, String>) + Send + 'static,
217 ) {
218 super::on_attributes_changed(&self.inner, handler)
219 }
220
221 pub(crate) fn on_permission_changed(
222 &self,
223 handler: impl Fn(Participant, Option<proto::ParticipantPermission>) + Send + 'static,
224 ) {
225 super::on_permission_changed(&self.inner, handler)
226 }
227
228 pub(crate) fn add_publication(&self, publication: TrackPublication) {
229 super::add_publication(&self.inner, &Participant::Local(self.clone()), publication);
230 }
231
232 pub(crate) fn remove_publication(&self, sid: &TrackSid) -> Option<TrackPublication> {
233 super::remove_publication(&self.inner, &Participant::Local(self.clone()), sid)
234 }
235
236 pub(crate) fn published_tracks_info(&self) -> Vec<proto::TrackPublishedResponse> {
237 let tracks = self.track_publications();
238 let mut vec = Vec::with_capacity(tracks.len());
239
240 for p in tracks.values() {
241 if let Some(track) = p.track() {
242 vec.push(proto::TrackPublishedResponse {
243 cid: track.rtc_track().id(),
244 track: Some(p.proto_info()),
245 });
246 }
247 }
248
249 vec
250 }
251
252 pub async fn publish_track(
253 &self,
254 track: LocalTrack,
255 options: TrackPublishOptions,
256 ) -> RoomResult<LocalTrackPublication> {
257 let disable_red = self.local.encryption_type != EncryptionType::None || !options.red;
258
259 let mut req = proto::AddTrackRequest {
260 cid: track.rtc_track().id(),
261 name: track.name(),
262 r#type: proto::TrackType::from(track.kind()) as i32,
263 muted: track.is_muted(),
264 source: proto::TrackSource::from(options.source) as i32,
265 disable_dtx: !options.dtx,
266 disable_red,
267 encryption: proto::encryption::Type::from(self.local.encryption_type) as i32,
268 stream: options.stream.clone(),
269 ..Default::default()
270 };
271
272 if options.preconnect_buffer {
273 req.audio_features.push(proto::AudioTrackFeature::TfPreconnectBuffer as i32);
274 }
275
276 let mut encodings = Vec::default();
277 match &track {
278 LocalTrack::Video(video_track) => {
279 let resolution = video_track.rtc_source().video_resolution();
282 req.width = resolution.width;
283 req.height = resolution.height;
284
285 encodings = compute_video_encodings(req.width, req.height, &options);
286 req.layers = video_layers_from_encodings(req.width, req.height, &encodings);
287
288 if options.simulcast && encodings.len() > 1 {
290 req.simulcast_codecs = vec![proto::SimulcastCodec {
291 codec: options.video_codec.as_str().to_string(),
292 cid: track.rtc_track().id(),
293 layers: req.layers.clone(),
294 ..Default::default()
295 }];
296 }
297 }
298 LocalTrack::Audio(_audio_track) => {
299 let audio_encoding =
301 options.audio_encoding.as_ref().unwrap_or(&options::audio::MUSIC.encoding);
302
303 encodings.push(RtpEncodingParameters {
304 max_bitrate: Some(audio_encoding.max_bitrate),
305 ..Default::default()
306 });
307 }
308 }
309 let track_info = self.inner.rtc_engine.add_track(req).await?;
310 let publication = LocalTrackPublication::new(track_info.clone(), track.clone());
311 track.update_info(track_info); publication.set_track(Some(track.clone().into()));
315
316 let transceiver =
317 self.inner.rtc_engine.create_sender(track.clone(), options.clone(), encodings).await?;
318
319 track.set_transceiver(Some(transceiver));
320
321 self.inner.rtc_engine.publisher_negotiation_needed();
322
323 publication.update_publish_options(options);
324 self.add_publication(TrackPublication::Local(publication.clone()));
325
326 if let Some(local_track_published) = self.local.events.local_track_published.lock().as_ref()
327 {
328 local_track_published(self.clone(), publication.clone());
329 }
330 track.enable();
331
332 Ok(publication)
333 }
334
335 pub async fn set_metadata(&self, metadata: String) -> RoomResult<()> {
336 if let Ok(response) = timeout(REQUEST_TIMEOUT, {
337 let request_id = self.inner.rtc_engine.session().signal_client().next_request_id();
338 self.inner
339 .rtc_engine
340 .send_request(proto::signal_request::Message::UpdateMetadata(
341 proto::UpdateParticipantMetadata {
342 metadata,
343 name: self.name(),
344 attributes: Default::default(),
345 request_id,
346 ..Default::default()
347 },
348 ))
349 .await;
350 self.inner.rtc_engine.get_response(request_id)
351 })
352 .await
353 {
354 match response.reason() {
355 Reason::Ok => Ok(()),
356 reason => Err(RoomError::Request { reason, message: response.message }),
357 }
358 } else {
359 Err(RoomError::Engine(EngineError::Signal(SignalError::Timeout(
360 "request timeout".into(),
361 ))))
362 }
363 }
364
365 pub async fn set_attributes(&self, attributes: HashMap<String, String>) -> RoomResult<()> {
366 if let Ok(response) = timeout(REQUEST_TIMEOUT, {
367 let request_id = self.inner.rtc_engine.session().signal_client().next_request_id();
368 self.inner
369 .rtc_engine
370 .send_request(proto::signal_request::Message::UpdateMetadata(
371 proto::UpdateParticipantMetadata {
372 attributes,
373 metadata: self.metadata(),
374 name: self.name(),
375 request_id,
376 ..Default::default()
377 },
378 ))
379 .await;
380 self.inner.rtc_engine.get_response(request_id)
381 })
382 .await
383 {
384 match response.reason() {
385 Reason::Ok => Ok(()),
386 reason => Err(RoomError::Request { reason, message: response.message }),
387 }
388 } else {
389 Err(RoomError::Engine(EngineError::Signal(SignalError::Timeout(
390 "request timeout".into(),
391 ))))
392 }
393 }
394
395 pub async fn set_name(&self, name: String) -> RoomResult<()> {
396 if let Ok(response) = timeout(REQUEST_TIMEOUT, {
397 let request_id = self.inner.rtc_engine.session().signal_client().next_request_id();
398 self.inner
399 .rtc_engine
400 .send_request(proto::signal_request::Message::UpdateMetadata(
401 proto::UpdateParticipantMetadata {
402 name,
403 metadata: self.metadata(),
404 attributes: Default::default(),
405 request_id,
406 ..Default::default()
407 },
408 ))
409 .await;
410 self.inner.rtc_engine.get_response(request_id)
411 })
412 .await
413 {
414 match response.reason() {
415 Reason::Ok => Ok(()),
416 reason => Err(RoomError::Request { reason, message: response.message }),
417 }
418 } else {
419 Err(RoomError::Engine(EngineError::Signal(SignalError::Timeout(
420 "request timeout".into(),
421 ))))
422 }
423 }
424
425 pub async fn send_chat_message(
426 &self,
427 text: String,
428 destination_identities: Option<Vec<String>>,
429 sender_identity: Option<String>,
430 ) -> RoomResult<ChatMessage> {
431 let chat_message = proto::ChatMessage {
432 id: create_random_uuid(),
433 timestamp: Utc::now().timestamp_millis(),
434 message: text,
435 ..Default::default()
436 };
437
438 let data = proto::DataPacket {
439 value: Some(proto::data_packet::Value::ChatMessage(chat_message.clone())),
440 participant_identity: sender_identity.unwrap_or_default(),
441 destination_identities: destination_identities.unwrap_or_default(),
442 ..Default::default()
443 };
444
445 match self.inner.rtc_engine.publish_data(data, DataPacketKind::Reliable, false).await {
446 Ok(_) => Ok(ChatMessage::from(chat_message)),
447 Err(e) => Err(Into::into(e)),
448 }
449 }
450
451 pub async fn edit_chat_message(
452 &self,
453 edit_text: String,
454 original_message: ChatMessage,
455 destination_identities: Option<Vec<String>>,
456 sender_identity: Option<String>,
457 ) -> RoomResult<ChatMessage> {
458 let edited_message = ChatMessage {
459 message: edit_text,
460 edit_timestamp: Utc::now().timestamp_millis().into(),
461 ..original_message
462 };
463 let proto_msg = proto::ChatMessage::from(edited_message);
464 let data = proto::DataPacket {
465 value: Some(proto::data_packet::Value::ChatMessage(proto_msg.clone())),
466 participant_identity: sender_identity.unwrap_or_default(),
467 destination_identities: destination_identities.unwrap_or_default(),
468 ..Default::default()
469 };
470
471 match self.inner.rtc_engine.publish_data(data, DataPacketKind::Reliable, false).await {
472 Ok(_) => Ok(ChatMessage::from(proto_msg)),
473 Err(e) => Err(Into::into(e)),
474 }
475 }
476
477 pub async fn unpublish_track(
478 &self,
479 sid: &TrackSid,
480 ) -> RoomResult<LocalTrackPublication> {
482 let publication = self.remove_publication(sid);
483 if let Some(TrackPublication::Local(publication)) = publication {
484 let track = publication.track().unwrap();
485 let sender = track.transceiver().unwrap().sender();
486
487 self.inner.rtc_engine.remove_track(sender)?;
488 track.set_transceiver(None);
489
490 if let Some(local_track_unpublished) =
491 self.local.events.local_track_unpublished.lock().as_ref()
492 {
493 local_track_unpublished(self.clone(), publication.clone());
494 }
495
496 publication.set_track(None);
497 self.inner.rtc_engine.publisher_negotiation_needed();
498
499 Ok(publication)
500 } else {
501 Err(RoomError::Internal("track not found".to_string()))
502 }
503 }
504
505 pub async fn publish_raw_data(
507 self,
508 packet: proto::DataPacket,
509 reliable: bool,
510 ) -> RoomResult<()> {
511 let kind = match reliable {
512 true => DataPacketKind::Reliable,
513 false => DataPacketKind::Lossy,
514 };
515 self.inner.rtc_engine.publish_data(packet, kind, true).await.map_err(Into::into)
516 }
517
518 pub async fn publish_data(&self, packet: DataPacket) -> RoomResult<()> {
519 let kind = match packet.reliable {
520 true => DataPacketKind::Reliable,
521 false => DataPacketKind::Lossy,
522 };
523 let destination_identities: Vec<String> =
524 packet.destination_identities.into_iter().map(Into::into).collect();
525 let data = proto::DataPacket {
526 kind: kind as i32,
527 destination_identities: destination_identities.clone(),
528 value: Some(proto::data_packet::Value::User(proto::UserPacket {
529 payload: packet.payload,
530 topic: packet.topic,
531 ..Default::default()
532 })),
533 ..Default::default()
534 };
535
536 self.inner.rtc_engine.publish_data(data, kind, false).await.map_err(Into::into)
537 }
538
539 pub fn set_data_channel_buffered_amount_low_threshold(
540 &self,
541 threshold: u64,
542 kind: DataPacketKind,
543 ) -> RoomResult<()> {
544 self.inner
545 .rtc_engine
546 .session()
547 .set_data_channel_buffered_amount_low_threshold(threshold, kind);
548 Ok(())
549 }
550
551 pub fn data_channel_buffered_amount_low_threshold(
552 &self,
553 kind: DataPacketKind,
554 ) -> RoomResult<u64> {
555 Ok(self.inner.rtc_engine.session().data_channel_buffered_amount_low_threshold(kind))
556 }
557
558 pub async fn set_track_subscription_permissions(
559 &self,
560 all_participants_allowed: bool,
561 permissions: Vec<ParticipantTrackPermission>,
562 ) -> RoomResult<()> {
563 *self.local.track_permissions.lock() = permissions;
564 *self.local.all_participants_allowed.lock() = all_participants_allowed;
565 self.update_track_subscription_permissions().await;
566 Ok(())
567 }
568
569 pub async fn publish_transcription(&self, packet: Transcription) -> RoomResult<()> {
570 let segments: Vec<proto::TranscriptionSegment> = packet
571 .segments
572 .into_iter()
573 .map(|segment| proto::TranscriptionSegment {
574 id: segment.id,
575 start_time: segment.start_time,
576 end_time: segment.end_time,
577 text: segment.text,
578 r#final: segment.r#final,
579 language: segment.language,
580 })
581 .collect();
582 let transcription_packet = proto::Transcription {
583 transcribed_participant_identity: packet.participant_identity,
584 segments: segments,
585 track_id: packet.track_id,
586 };
587 let data = proto::DataPacket {
588 value: Some(proto::data_packet::Value::Transcription(transcription_packet)),
589 ..Default::default()
590 };
591 self.inner
592 .rtc_engine
593 .publish_data(data, DataPacketKind::Reliable, false)
594 .await
595 .map_err(Into::into)
596 }
597
598 pub async fn publish_dtmf(&self, dtmf: SipDTMF) -> RoomResult<()> {
599 let destination_identities: Vec<String> =
600 dtmf.destination_identities.into_iter().map(Into::into).collect();
601 let dtmf_message = proto::SipDtmf { code: dtmf.code, digit: dtmf.digit };
602
603 let data = proto::DataPacket {
604 value: Some(proto::data_packet::Value::SipDtmf(dtmf_message)),
605 destination_identities: destination_identities.clone(),
606 ..Default::default()
607 };
608
609 self.inner
610 .rtc_engine
611 .publish_data(data, DataPacketKind::Reliable, false)
612 .await
613 .map_err(Into::into)
614 }
615
616 async fn publish_rpc_request(&self, rpc_request: RpcRequest) -> RoomResult<()> {
617 let destination_identities = vec![rpc_request.destination_identity];
618 let rpc_request_message = proto::RpcRequest {
619 id: rpc_request.id,
620 method: rpc_request.method,
621 payload: rpc_request.payload,
622 response_timeout_ms: rpc_request.response_timeout.as_millis() as u32,
623 version: rpc_request.version,
624 ..Default::default()
625 };
626
627 let data = proto::DataPacket {
628 value: Some(proto::data_packet::Value::RpcRequest(rpc_request_message)),
629 destination_identities,
630 ..Default::default()
631 };
632
633 self.inner
634 .rtc_engine
635 .publish_data(data, DataPacketKind::Reliable, false)
636 .await
637 .map_err(Into::into)
638 }
639
640 async fn publish_rpc_response(&self, rpc_response: RpcResponse) -> RoomResult<()> {
641 let destination_identities = vec![rpc_response.destination_identity];
642 let rpc_response_message = proto::RpcResponse {
643 request_id: rpc_response.request_id,
644 value: Some(match rpc_response.error {
645 Some(error) => proto::rpc_response::Value::Error(proto::RpcError {
646 code: error.code,
647 message: error.message,
648 data: error.data,
649 }),
650 None => proto::rpc_response::Value::Payload(rpc_response.payload.unwrap()),
651 }),
652 ..Default::default()
653 };
654
655 let data = proto::DataPacket {
656 value: Some(proto::data_packet::Value::RpcResponse(rpc_response_message)),
657 destination_identities: destination_identities.clone(),
658 ..Default::default()
659 };
660
661 self.inner
662 .rtc_engine
663 .publish_data(data, DataPacketKind::Reliable, false)
664 .await
665 .map_err(Into::into)
666 }
667
668 async fn publish_rpc_ack(&self, rpc_ack: RpcAck) -> RoomResult<()> {
669 let destination_identities = vec![rpc_ack.destination_identity];
670 let rpc_ack_message =
671 proto::RpcAck { request_id: rpc_ack.request_id, ..Default::default() };
672
673 let data = proto::DataPacket {
674 value: Some(proto::data_packet::Value::RpcAck(rpc_ack_message)),
675 destination_identities: destination_identities.clone(),
676 ..Default::default()
677 };
678
679 self.inner
680 .rtc_engine
681 .publish_data(data, DataPacketKind::Reliable, false)
682 .await
683 .map_err(Into::into)
684 }
685
686 pub(crate) async fn update_track_subscription_permissions(&self) {
687 let all_participants_allowed = *self.local.all_participants_allowed.lock();
688 let track_permissions = self
689 .local
690 .track_permissions
691 .lock()
692 .iter()
693 .map(|p| proto::TrackPermission::from(p.clone()))
694 .collect();
695
696 self.inner
697 .rtc_engine
698 .send_request(proto::signal_request::Message::SubscriptionPermission(
699 proto::SubscriptionPermission {
700 all_participants: all_participants_allowed,
701 track_permissions,
702 },
703 ))
704 .await;
705 }
706
707 pub fn get_track_publication(&self, sid: &TrackSid) -> Option<LocalTrackPublication> {
708 self.inner.track_publications.read().get(sid).map(|track| {
709 if let TrackPublication::Local(local) = track {
710 return local.clone();
711 }
712
713 unreachable!()
714 })
715 }
716
717 pub fn sid(&self) -> ParticipantSid {
718 self.inner.info.read().sid.clone()
719 }
720
721 pub fn identity(&self) -> ParticipantIdentity {
722 self.inner.info.read().identity.clone()
723 }
724
725 pub fn name(&self) -> String {
726 self.inner.info.read().name.clone()
727 }
728
729 pub fn metadata(&self) -> String {
730 self.inner.info.read().metadata.clone()
731 }
732
733 pub fn attributes(&self) -> HashMap<String, String> {
734 self.inner.info.read().attributes.clone()
735 }
736
737 pub fn is_speaking(&self) -> bool {
738 self.inner.info.read().speaking
739 }
740
741 pub fn track_publications(&self) -> HashMap<TrackSid, LocalTrackPublication> {
742 self.inner
743 .track_publications
744 .read()
745 .clone()
746 .into_iter()
747 .map(|(sid, track)| {
748 if let TrackPublication::Local(local) = track {
749 return (sid, local);
750 }
751
752 unreachable!()
753 })
754 .collect()
755 }
756
757 pub fn audio_level(&self) -> f32 {
758 self.inner.info.read().audio_level
759 }
760
761 pub fn connection_quality(&self) -> ConnectionQuality {
762 self.inner.info.read().connection_quality
763 }
764
765 pub fn kind(&self) -> ParticipantKind {
766 self.inner.info.read().kind
767 }
768
769 pub fn kind_details(&self) -> Vec<ParticipantKindDetail> {
770 self.inner.info.read().kind_details.clone()
771 }
772
773 pub fn disconnect_reason(&self) -> DisconnectReason {
774 self.inner.info.read().disconnect_reason
775 }
776
777 pub fn permission(&self) -> Option<proto::ParticipantPermission> {
778 self.inner.info.read().permission.clone()
779 }
780
781 pub async fn perform_rpc(&self, data: PerformRpcData) -> Result<String, RpcError> {
782 let max_round_trip_latency = Duration::from_millis(7000);
786 let min_effective_timeout = Duration::from_millis(1000);
787
788 if data.payload.len() > MAX_PAYLOAD_BYTES {
789 return Err(RpcError::built_in(RpcErrorCode::RequestPayloadTooLarge, None));
790 }
791
792 if let Some(server_info) =
793 self.inner.rtc_engine.session().signal_client().join_response().server_info
794 {
795 if !server_info.version.is_empty() {
796 let server_version = Version::parse(&server_info.version).unwrap();
797 let min_required_version = Version::parse("1.8.0").unwrap();
798 if server_version < min_required_version {
799 return Err(RpcError::built_in(RpcErrorCode::UnsupportedServer, None));
800 }
801 }
802 }
803
804 let id = create_random_uuid();
805 let (ack_tx, ack_rx) = oneshot::channel();
806 let (response_tx, response_rx) = oneshot::channel();
807 let effective_timeout = std::cmp::max(
808 data.response_timeout.saturating_sub(max_round_trip_latency),
809 min_effective_timeout,
810 );
811
812 {
815 let mut rpc_state = self.local.rpc_state.lock();
816 rpc_state.pending_acks.insert(id.clone(), ack_tx);
817 rpc_state.pending_responses.insert(id.clone(), response_tx);
818 }
819
820 if let Err(e) = self
821 .publish_rpc_request(RpcRequest {
822 destination_identity: data.destination_identity.clone(),
823 id: id.clone(),
824 method: data.method.clone(),
825 payload: data.payload.clone(),
826 response_timeout: effective_timeout,
827 version: 1,
828 })
829 .await
830 {
831 let mut rpc_state = self.local.rpc_state.lock();
833 rpc_state.pending_acks.remove(&id);
834 rpc_state.pending_responses.remove(&id);
835 log::error!("Failed to publish RPC request: {}", e);
836 return Err(RpcError::built_in(RpcErrorCode::SendFailed, Some(e.to_string())));
837 }
838
839 match tokio::time::timeout(max_round_trip_latency, ack_rx).await {
841 Err(_) => {
842 let mut rpc_state = self.local.rpc_state.lock();
843 rpc_state.pending_acks.remove(&id);
844 rpc_state.pending_responses.remove(&id);
845 return Err(RpcError::built_in(RpcErrorCode::ConnectionTimeout, None));
846 }
847 Ok(_) => {
848 }
850 }
851
852 let response = match tokio::time::timeout(data.response_timeout, response_rx).await {
854 Err(_) => {
855 self.local.rpc_state.lock().pending_responses.remove(&id);
856 return Err(RpcError::built_in(RpcErrorCode::ResponseTimeout, None));
857 }
858 Ok(result) => result,
859 };
860
861 match response {
862 Err(_) => {
863 Err(RpcError::built_in(RpcErrorCode::RecipientDisconnected, None))
865 }
866 Ok(Err(e)) => {
867 Err(e)
869 }
870 Ok(Ok(payload)) => {
871 Ok(payload)
873 }
874 }
875 }
876
877 pub fn register_rpc_method(
878 &self,
879 method: String,
880 handler: impl Fn(RpcInvocationData) -> Pin<Box<dyn Future<Output = Result<String, RpcError>> + Send>>
881 + Send
882 + Sync
883 + 'static,
884 ) {
885 self.local.rpc_state.lock().handlers.insert(method, Arc::new(handler));
886
887 self.inner.rtc_engine.publisher_negotiation_needed();
891 }
892
893 pub fn unregister_rpc_method(&self, method: String) {
894 self.local.rpc_state.lock().handlers.remove(&method);
895 }
896
897 pub(crate) fn handle_incoming_rpc_ack(&self, request_id: String) {
898 let mut rpc_state = self.local.rpc_state.lock();
899 if let Some(tx) = rpc_state.pending_acks.remove(&request_id) {
900 let _ = tx.send(());
901 } else {
902 log::error!("Ack received for unexpected RPC request: {}", request_id);
903 }
904 }
905
906 pub(crate) fn handle_incoming_rpc_response(
907 &self,
908 request_id: String,
909 payload: Option<String>,
910 error: Option<proto::RpcError>,
911 ) {
912 let mut rpc_state = self.local.rpc_state.lock();
913 if let Some(tx) = rpc_state.pending_responses.remove(&request_id) {
914 let _ = tx.send(match error {
915 Some(e) => Err(RpcError::from_proto(e)),
916 None => Ok(payload.unwrap_or_default()),
917 });
918 } else {
919 log::error!("Response received for unexpected RPC request: {}", request_id);
920 }
921 }
922
923 pub(crate) async fn handle_incoming_rpc_request(
924 &self,
925 caller_identity: ParticipantIdentity,
926 request_id: String,
927 method: String,
928 payload: String,
929 response_timeout: Duration,
930 version: u32,
931 ) {
932 if let Err(e) = self
933 .publish_rpc_ack(RpcAck {
934 destination_identity: caller_identity.to_string(),
935 request_id: request_id.clone(),
936 })
937 .await
938 {
939 log::error!("Failed to publish RPC ACK: {:?}", e);
940 }
941
942 let caller_identity_2 = caller_identity.clone();
943 let request_id_2 = request_id.clone();
944
945 let response = if version != 1 {
946 Err(RpcError::built_in(RpcErrorCode::UnsupportedVersion, None))
947 } else {
948 let handler = self.local.rpc_state.lock().handlers.get(&method).cloned();
949
950 match handler {
951 Some(handler) => {
952 match tokio::task::spawn(async move {
953 handler(RpcInvocationData {
954 request_id: request_id.clone(),
955 caller_identity: caller_identity.clone(),
956 payload: payload.clone(),
957 response_timeout,
958 })
959 .await
960 })
961 .await
962 {
963 Ok(result) => result,
964 Err(e) => {
965 log::error!("RPC method handler returned an error: {:?}", e);
966 Err(RpcError::built_in(RpcErrorCode::ApplicationError, None))
967 }
968 }
969 }
970 None => Err(RpcError::built_in(RpcErrorCode::UnsupportedMethod, None)),
971 }
972 };
973
974 let (payload, error) = match response {
975 Ok(response_payload) if response_payload.len() <= MAX_PAYLOAD_BYTES => {
976 (Some(response_payload), None)
977 }
978 Ok(_) => (None, Some(RpcError::built_in(RpcErrorCode::ResponsePayloadTooLarge, None))),
979 Err(e) => (None, Some(e.into())),
980 };
981
982 if let Err(e) = self
983 .publish_rpc_response(RpcResponse {
984 destination_identity: caller_identity_2.to_string(),
985 request_id: request_id_2,
986 payload,
987 error: error.map(|e| e.to_proto()),
988 })
989 .await
990 {
991 log::error!("Failed to publish RPC response: {:?}", e);
992 }
993 }
994
995 pub async fn send_text(
1008 &self,
1009 text: &str,
1010 options: StreamTextOptions,
1011 ) -> StreamResult<TextStreamInfo> {
1012 self.session().unwrap().outgoing_stream_manager.send_text(text, options).await
1013 }
1014
1015 pub async fn send_file(
1028 &self,
1029 path: impl AsRef<Path>,
1030 options: StreamByteOptions,
1031 ) -> StreamResult<ByteStreamInfo> {
1032 self.session().unwrap().outgoing_stream_manager.send_file(path, options).await
1033 }
1034
1035 pub async fn send_bytes(
1045 &self,
1046 data: impl AsRef<[u8]>,
1047 options: StreamByteOptions,
1048 ) -> StreamResult<ByteStreamInfo> {
1049 self.session().unwrap().outgoing_stream_manager.send_bytes(data, options).await
1050 }
1051
1052 pub async fn stream_text(&self, options: StreamTextOptions) -> StreamResult<TextStreamWriter> {
1064 self.session().unwrap().outgoing_stream_manager.stream_text(options).await
1065 }
1066
1067 pub async fn stream_bytes(&self, options: StreamByteOptions) -> StreamResult<ByteStreamWriter> {
1079 self.session().unwrap().outgoing_stream_manager.stream_bytes(options).await
1080 }
1081
1082 pub fn is_encrypted(&self) -> bool {
1083 *self.inner.is_encrypted.read()
1084 }
1085
    /// Currently a no-op for the local participant: the argument is ignored and no state is
    /// changed.
    // NOTE(review): presumably kept so local and remote participants expose the same method;
    // confirm against the remote participant implementation.
    #[doc(hidden)]
    pub fn update_data_encryption_status(&self, _is_encrypted: bool) {
    }
1090}