1pub mod receiver;
6pub mod sender;
7mod transitable_state;
8
9use std::{cell::RefCell, collections::HashMap, rc::Rc};
10
11use derive_more::with_trait::{Display, From};
12use futures::{
13 FutureExt as _, TryFutureExt as _, channel::mpsc, future,
14 future::LocalBoxFuture,
15};
16use medea_client_api_proto as proto;
17#[cfg(feature = "mockable")]
18use medea_client_api_proto::{ConnectionMode, MemberId};
19use proto::{MediaSourceKind, MediaType, TrackId};
20use tracerr::Traced;
21
22#[doc(inline)]
23pub use self::{
24 receiver::Receiver,
25 sender::Sender,
26 transitable_state::{
27 InStable, InTransition, MediaExchangeState,
28 MediaExchangeStateController, MediaState, MuteState,
29 MuteStateController, TransitableState, TransitableStateController,
30 media_exchange_state, mute_state,
31 },
32};
33use super::tracks_request::TracksRequest;
34#[cfg(feature = "mockable")]
35use crate::media::{LocalTracksConstraints, RecvConstraints};
36use crate::{
37 media::{MediaKind, track::local},
38 peer::{LocalStreamUpdateCriteria, PeerEvent},
39 platform,
40 platform::{
41 TransceiverInit, send_encoding_parameters::SendEncodingParameters,
42 transceiver::probe_target_codecs,
43 },
44 utils::{Caused, Component},
45};
46
47pub trait TransceiverSide: MediaStateControllable {
49 fn track_id(&self) -> TrackId;
51
52 fn kind(&self) -> MediaKind;
54
55 fn source_kind(&self) -> MediaSourceKind;
57
58 fn is_transitable(&self) -> bool;
63}
64
65pub trait MediaStateControllable {
68 #[must_use]
70 fn media_exchange_state_controller(
71 &self,
72 ) -> Rc<MediaExchangeStateController>;
73
74 #[must_use]
76 fn mute_state_controller(&self) -> Rc<MuteStateController>;
77
78 fn media_exchange_state(&self) -> MediaExchangeState {
80 self.media_exchange_state_controller().state()
81 }
82
83 #[must_use]
85 fn mute_state(&self) -> MuteState {
86 self.mute_state_controller().state()
87 }
88
89 fn media_state_transition_to(
96 &self,
97 desired_state: MediaState,
98 ) -> Result<(), Traced<ProhibitedStateError>> {
99 match desired_state {
100 MediaState::MediaExchange(desired_state) => {
101 self.media_exchange_state_controller()
102 .transition_to(desired_state);
103 }
104 MediaState::Mute(desired_state) => {
105 self.mute_state_controller().transition_to(desired_state);
106 }
107 }
108
109 Ok(())
110 }
111
112 fn is_subscription_needed(&self, desired_state: MediaState) -> bool {
117 match desired_state {
118 MediaState::MediaExchange(media_exchange) => {
119 let current = self.media_exchange_state();
120 match current {
121 MediaExchangeState::Transition(_) => true,
122 MediaExchangeState::Stable(stable) => {
123 stable != media_exchange
124 }
125 }
126 }
127 MediaState::Mute(mute_state) => {
128 let current = self.mute_state();
129 match current {
130 MuteState::Transition(_) => true,
131 MuteState::Stable(stable) => stable != mute_state,
132 }
133 }
134 }
135 }
136
137 #[must_use]
144 fn is_track_patch_needed(&self, desired_state: MediaState) -> bool {
145 match desired_state {
146 MediaState::MediaExchange(media_exchange) => {
147 let current = self.media_exchange_state();
148 match current {
149 MediaExchangeState::Stable(stable) => {
150 stable != media_exchange
151 }
152 MediaExchangeState::Transition(transition) => {
153 transition.intended() != media_exchange
154 }
155 }
156 }
157 MediaState::Mute(mute_state) => {
158 let current = self.mute_state();
159 match current {
160 MuteState::Stable(stable) => stable != mute_state,
161 MuteState::Transition(transition) => {
162 transition.intended() != mute_state
163 }
164 }
165 }
166 }
167 }
168
169 fn when_media_state_stable(
180 &self,
181 desired_state: MediaState,
182 ) -> LocalBoxFuture<'static, Result<(), MediaState>> {
183 match desired_state {
184 MediaState::Mute(desired_state) => self
185 .mute_state_controller()
186 .when_media_state_stable(desired_state)
187 .map_err(MediaState::Mute)
188 .boxed_local(),
189 MediaState::MediaExchange(desired_state) => self
190 .media_exchange_state_controller()
191 .when_media_state_stable(desired_state)
192 .map_err(MediaState::MediaExchange)
193 .boxed_local(),
194 }
195 }
196}
197
198#[derive(Clone, Copy, Debug)]
200pub enum TrackDirection {
201 Send,
203
204 Recv,
206}
207
208#[derive(Clone, Copy, Debug, Display)]
210pub enum ProhibitedStateError {
211 #[display(
213 "`MediaExchangeState` of `Sender` can't transit to \
214 disabled state, because this `Sender` is required"
215 )]
216 CannotDisableRequiredSender,
217}
218
219#[derive(Caused, Clone, Debug, Display, From)]
221#[cause(error = platform::Error)]
222pub enum InsertLocalTracksError {
223 #[display("Provided `Track` doesn't satisfy senders constraints")]
225 InvalidMediaTrack,
226
227 #[display("Provided stream does not have all necessary `Track`s")]
229 NotEnoughTracks,
230
231 CouldNotInsertLocalTrack(#[cause] sender::InsertTrackError),
233}
234
235#[derive(Clone, Copy, Debug, Display)]
237pub enum GetMidsError {
238 #[display("Peer has senders without mid")]
240 SendersWithoutMid,
241
242 #[display("Peer has receivers without mid")]
244 ReceiversWithoutMid,
245}
246
247#[derive(Debug)]
249struct InnerMediaConnections {
250 peer: Rc<platform::RtcPeerConnection>,
254
255 peer_events_sender: mpsc::UnboundedSender<PeerEvent>,
257
258 senders: HashMap<TrackId, sender::Component>,
260
261 receivers: HashMap<TrackId, receiver::Component>,
263}
264
265impl InnerMediaConnections {
266 fn iter_senders_with_kind_and_source_kind(
269 &self,
270 kind: MediaKind,
271 source_kind: Option<MediaSourceKind>,
272 ) -> impl Iterator<Item = &sender::Component> {
273 self.senders
274 .values()
275 .filter(move |sender| sender.state().kind() == kind)
276 .filter(move |sender| {
277 source_kind
278 .is_none_or(|sk| sender.caps().media_source_kind() == sk)
279 })
280 }
281
282 fn iter_receivers_with_kind_and_source_kind(
285 &self,
286 kind: MediaKind,
287 source_kind: Option<MediaSourceKind>,
288 ) -> impl Iterator<Item = &receiver::Component> {
289 self.receivers
290 .values()
291 .filter(move |s| s.state().kind() == kind)
292 .filter(move |s| {
293 source_kind.is_none_or(|skind| s.state().source_kind() == skind)
294 })
295 }
296
297 fn get_transceivers_by_direction_and_kind(
300 &self,
301 direction: TrackDirection,
302 kind: MediaKind,
303 source_kind: Option<MediaSourceKind>,
304 ) -> Vec<Rc<dyn TransceiverSide>> {
305 match direction {
306 TrackDirection::Send => self
307 .iter_senders_with_kind_and_source_kind(kind, source_kind)
308 .map(|tx| -> Rc<dyn TransceiverSide> { tx.state() })
309 .collect(),
310 TrackDirection::Recv => self
311 .iter_receivers_with_kind_and_source_kind(kind, source_kind)
312 .map(|rx| -> Rc<dyn TransceiverSide> { rx.state() })
313 .collect(),
314 }
315 }
316
317 fn add_transceiver(
323 &self,
324 media_type: MediaType,
325 direction: platform::TransceiverDirection,
326 ) -> impl Future<Output = platform::Transceiver> + 'static + use<> {
327 let peer = Rc::clone(&self.peer);
328
329 async move {
330 let kind = MediaKind::from(&media_type);
331
332 match media_type {
333 MediaType::Audio(_) => {
334 peer.add_transceiver(kind, TransceiverInit::new(direction))
335 .await
336 }
337 MediaType::Video(settings) => {
338 let init = TransceiverInit::new(direction);
339
340 init.set_send_encodings(
341 settings
342 .encoding_parameters
343 .iter()
344 .cloned()
345 .map(SendEncodingParameters::from)
346 .collect(),
347 );
348
349 let transceiver = peer.add_transceiver(kind, init).await;
350 let target_codecs = probe_target_codecs(
351 settings
352 .encoding_parameters
353 .iter()
354 .filter_map(|e| e.codec.as_ref()),
355 )
356 .await;
357 if let Some(target_codecs) = target_codecs {
364 transceiver.set_codec_preferences(target_codecs);
365 }
366 transceiver
367 }
368 }
369 }
370 }
371
372 fn get_transceiver_by_mid(
376 &self,
377 mid: String,
378 ) -> impl Future<Output = Option<platform::Transceiver>> + 'static + use<>
379 {
380 self.peer.get_transceiver_by_mid(mid)
381 }
382}
383
384#[derive(Debug)]
387pub struct MediaConnections(RefCell<InnerMediaConnections>);
388
389impl MediaConnections {
390 #[must_use]
393 pub fn new(
394 peer: Rc<platform::RtcPeerConnection>,
395 peer_events_sender: mpsc::UnboundedSender<PeerEvent>,
396 ) -> Self {
397 Self(RefCell::new(InnerMediaConnections {
398 peer,
399 peer_events_sender,
400 senders: HashMap::new(),
401 receivers: HashMap::new(),
402 }))
403 }
404
405 pub fn get_transceivers_sides(
409 &self,
410 kind: MediaKind,
411 direction: TrackDirection,
412 source_kind: Option<MediaSourceKind>,
413 ) -> Vec<Rc<dyn TransceiverSide>> {
414 self.0.borrow().get_transceivers_by_direction_and_kind(
415 direction,
416 kind,
417 source_kind,
418 )
419 }
420
421 #[must_use]
425 pub fn is_all_tracks_in_media_state(
426 &self,
427 kind: MediaKind,
428 direction: TrackDirection,
429 source_kind: Option<MediaSourceKind>,
430 state: MediaState,
431 ) -> bool {
432 let transceivers =
433 self.0.borrow().get_transceivers_by_direction_and_kind(
434 direction,
435 kind,
436 source_kind,
437 );
438 for transceiver in transceivers {
439 if !transceiver.is_transitable() {
440 continue;
441 }
442
443 let not_in_state = match state {
444 MediaState::Mute(mute_state) => {
445 transceiver.mute_state() != mute_state.into()
446 }
447 MediaState::MediaExchange(media_exchange) => {
448 transceiver.media_exchange_state() != media_exchange.into()
449 }
450 };
451 if not_in_state {
452 return false;
453 }
454 }
455
456 true
457 }
458
459 pub fn get_mids(
466 &self,
467 ) -> Result<HashMap<TrackId, String>, Traced<GetMidsError>> {
468 let inner = self.0.borrow();
469 let mut mids =
470 HashMap::with_capacity(inner.senders.len() + inner.receivers.len());
471 #[expect(clippy::iter_over_hash_type, reason = "order doesn't matter")]
472 for (track_id, sender) in &inner.senders {
473 drop(
474 mids.insert(
475 *track_id,
476 sender
477 .mid()
478 .ok_or(GetMidsError::SendersWithoutMid)
479 .map_err(tracerr::wrap!())?,
480 ),
481 );
482 }
483 #[expect(clippy::iter_over_hash_type, reason = "order doesn't matter")]
484 for (track_id, receiver) in &inner.receivers {
485 drop(
486 mids.insert(
487 *track_id,
488 receiver
489 .mid()
490 .ok_or(GetMidsError::ReceiversWithoutMid)
491 .map_err(tracerr::wrap!())?,
492 ),
493 );
494 }
495 Ok(mids)
496 }
497
498 pub fn get_transceivers_statuses(
501 &self,
502 ) -> impl Future<Output = HashMap<TrackId, bool>> + 'static + use<> {
503 let inner = self.0.borrow();
504 let transceivers = inner
505 .senders
506 .iter()
507 .map(|(&track_id, sender)| {
508 let sender = sender.obj();
509 async move { (track_id, sender.is_publishing().await) }
510 .boxed_local()
511 })
512 .chain(inner.receivers.iter().map(|(&track_id, receiver)| {
513 let receiver = receiver.obj();
514 async move { (track_id, receiver.is_receiving().await) }
515 .boxed_local()
516 }))
517 .collect::<Vec<_>>();
518
519 future::join_all(transceivers).map(|r| r.into_iter().collect())
520 }
521
522 pub fn get_transceiver_side_by_id(
527 &self,
528 track_id: TrackId,
529 ) -> Option<Rc<dyn TransceiverSide>> {
530 let inner = self.0.borrow();
531 inner
532 .senders
533 .get(&track_id)
534 .map(|sndr| -> Rc<dyn TransceiverSide> { sndr.state() })
535 .or_else(|| {
536 inner
537 .receivers
538 .get(&track_id)
539 .map(|rcvr| -> Rc<dyn TransceiverSide> { rcvr.state() })
540 })
541 }
542
543 pub fn insert_sender(&self, sender: sender::Component) {
545 drop(self.0.borrow_mut().senders.insert(sender.state().id(), sender));
546 }
547
548 pub fn insert_receiver(&self, receiver: receiver::Component) {
550 drop(
551 self.0
552 .borrow_mut()
553 .receivers
554 .insert(receiver.state().id(), receiver),
555 );
556 }
557
558 pub fn get_tracks_request(
562 &self,
563 kinds: LocalStreamUpdateCriteria,
564 ) -> Option<TracksRequest> {
565 let mut stream_request = None;
566 #[expect(clippy::iter_over_hash_type, reason = "order doesn't matter")]
567 for sender in self.0.borrow().senders.values() {
568 if kinds
569 .has(sender.state().media_kind(), sender.state().media_source())
570 {
571 stream_request
572 .get_or_insert_with(TracksRequest::default)
573 .add_track_request(
574 sender.state().track_id(),
575 sender.caps().clone(),
576 );
577 }
578 }
579 stream_request
580 }
581
582 pub async fn insert_local_tracks(
597 &self,
598 tracks: &HashMap<TrackId, Rc<local::Track>>,
599 ) -> Result<
600 HashMap<TrackId, media_exchange_state::Stable>,
601 Traced<InsertLocalTracksError>,
602 > {
603 let mut sender_and_track =
605 Vec::with_capacity(self.0.borrow().senders.len());
606 let mut media_exchange_state_updates = HashMap::new();
607 let senders = self
608 .0
609 .borrow()
610 .senders
611 .values()
612 .map(|c| (c.obj(), c.state()))
613 .collect::<Vec<_>>();
614 for (sender, state) in senders {
615 if let Some(track) = tracks.get(&state.id()).cloned() {
616 if sender.caps().satisfies(track.as_ref()).await {
617 sender_and_track.push((sender, track));
618 } else {
619 return Err(tracerr::new!(
620 InsertLocalTracksError::InvalidMediaTrack
621 ));
622 }
623 } else if sender.caps().required() {
624 return Err(tracerr::new!(
625 InsertLocalTracksError::NotEnoughTracks
626 ));
627 } else {
628 _ = media_exchange_state_updates
629 .insert(state.id(), media_exchange_state::Stable::Disabled);
630 }
631 }
632
633 future::try_join_all(sender_and_track.into_iter().map(
634 async |(sender, track)| {
635 Rc::clone(&sender).insert_track(track).await
636 },
637 ))
638 .await
639 .map(drop)
640 .map_err(tracerr::map_from_and_wrap!())?;
641
642 Ok(media_exchange_state_updates)
643 }
644
645 pub async fn add_remote_track(
660 &self,
661 track: platform::MediaStreamTrack,
662 transceiver: platform::Transceiver,
663 ) -> Result<(), String> {
664 let mid = transceiver.mid().ok_or("No Transceiver::mid found")?;
667 let receiver = self
668 .0
669 .borrow()
670 .receivers
671 .values()
672 .find(|rcvr| rcvr.mid().as_ref() == Some(&mid))
673 .map(Component::obj);
674
675 if let Some(rcvr) = receiver {
676 rcvr.set_remote_track(transceiver, track).await;
677 Ok(())
678 } else {
679 Err(mid)
680 }
681 }
682
683 pub fn sync_receivers(&self) -> impl Future<Output = ()> + 'static + use<> {
690 future::join_all({
691 self.0
692 .borrow()
693 .receivers
694 .values()
695 .filter_map(|receiver| {
696 #[expect(clippy::question_mark, reason = "more readable")]
697 if receiver.transceiver().is_none() {
698 return None;
699 }
700 receiver.mid().map(|mid| {
701 let fut = {
702 self.0.borrow().peer.get_transceiver_by_mid(mid)
703 };
704 let receiver = Component::obj(receiver);
705 async move {
706 if let Some(t) = fut.await {
707 receiver.set_transceiver(t);
708 }
709 }
710 })
711 })
712 .collect::<Vec<_>>()
713 })
714 .map(drop)
715 }
716
717 pub fn get_senders_without_tracks_ids(
720 &self,
721 kinds: LocalStreamUpdateCriteria,
722 ) -> Vec<TrackId> {
723 self.0
724 .borrow()
725 .senders
726 .values()
727 .filter_map(|s| {
728 (kinds.has(s.state().kind(), s.state().source_kind())
729 && s.state().enabled()
730 && !s.has_track())
731 .then_some(s.state().id())
732 })
733 .collect()
734 }
735
736 pub async fn drop_send_tracks(&self, kinds: LocalStreamUpdateCriteria) {
739 let remove_tracks_fut = future::join_all(
740 self.0
741 .borrow()
742 .senders
743 .values()
744 .filter(|&s| {
745 kinds.has(s.state().kind(), s.state().source_kind())
746 })
747 .map(|s| {
748 let sender = s.obj();
749 async move {
750 sender.remove_track().await;
751 }
752 }),
753 );
754 drop(remove_tracks_fut.await);
755 }
756
757 pub fn remove_track(&self, track_id: TrackId) {
760 let mut inner = self.0.borrow_mut();
761 if inner.receivers.remove(&track_id).is_none() {
762 drop(inner.senders.remove(&track_id));
763 }
764 }
765}
766
767#[cfg(feature = "mockable")]
768#[expect(clippy::allow_attributes, reason = "`#[expect]` is not considered")]
770#[allow(clippy::multiple_inherent_impl, reason = "feature gated")]
771impl MediaConnections {
772 #[must_use]
775 pub fn is_recv_video_enabled(&self) -> bool {
776 !self
777 .0
778 .borrow()
779 .iter_receivers_with_kind_and_source_kind(MediaKind::Video, None)
780 .any(|s| !s.state().enabled_individual())
781 }
782
783 #[must_use]
786 pub fn is_recv_audio_enabled(&self) -> bool {
787 !self
788 .0
789 .borrow()
790 .iter_receivers_with_kind_and_source_kind(MediaKind::Audio, None)
791 .any(|s| !s.state().enabled_individual())
792 }
793
794 #[must_use]
796 pub fn get_receiver_by_id(&self, id: TrackId) -> Option<Rc<Receiver>> {
797 self.0.borrow().receivers.get(&id).map(Component::obj)
798 }
799
800 #[must_use]
802 pub fn get_sender_by_id(&self, id: TrackId) -> Option<Rc<Sender>> {
803 self.0.borrow().senders.get(&id).map(Component::obj)
804 }
805
806 #[must_use]
808 pub fn is_send_audio_enabled(&self) -> bool {
809 self.0
810 .borrow()
811 .iter_senders_with_kind_and_source_kind(MediaKind::Audio, None)
812 .all(|s| s.state().enabled())
813 }
814
815 #[must_use]
817 pub fn is_send_video_enabled(
818 &self,
819 source_kind: Option<MediaSourceKind>,
820 ) -> bool {
821 self.0
822 .borrow()
823 .iter_senders_with_kind_and_source_kind(
824 MediaKind::Video,
825 source_kind,
826 )
827 .all(|s| s.state().enabled())
828 }
829
830 #[must_use]
832 pub fn is_send_video_unmuted(
833 &self,
834 source_kind: Option<MediaSourceKind>,
835 ) -> bool {
836 !self
837 .0
838 .borrow()
839 .iter_senders_with_kind_and_source_kind(
840 MediaKind::Video,
841 source_kind,
842 )
843 .any(|s| s.muted())
844 }
845
846 #[must_use]
848 pub fn is_send_audio_unmuted(&self) -> bool {
849 !self
850 .0
851 .borrow()
852 .iter_senders_with_kind_and_source_kind(MediaKind::Audio, None)
853 .any(|s| s.muted())
854 }
855
856 #[expect(clippy::too_many_arguments, reason = "not a problem")]
862 pub async fn create_sender(
863 &self,
864 id: TrackId,
865 media_type: MediaType,
866 media_direction: proto::MediaDirection,
867 muted: bool,
868 mid: Option<String>,
869 receivers: Vec<MemberId>,
870 send_constraints: &LocalTracksConstraints,
871 connection_mode: ConnectionMode,
872 ) -> Result<sender::Component, Traced<sender::CreateError>> {
873 let sender_state = sender::State::new(
874 id,
875 mid,
876 media_type,
877 media_direction,
878 muted,
879 receivers,
880 send_constraints.clone(),
881 connection_mode,
882 );
883 let sender = Sender::new(
884 &sender_state,
885 self,
886 send_constraints.clone(),
887 mpsc::unbounded().0,
888 )
889 .await?;
890
891 Ok(sender::Component::new(sender, Rc::new(sender_state)))
892 }
893
894 #[expect(clippy::too_many_arguments, reason = "not a problem")]
896 pub async fn create_receiver(
897 &self,
898 id: TrackId,
899 media_type: MediaType,
900 media_direction: proto::MediaDirection,
901 muted: bool,
902 mid: Option<String>,
903 sender: MemberId,
904 recv_constraints: &RecvConstraints,
905 connection_mode: ConnectionMode,
906 ) -> receiver::Component {
907 let state = receiver::State::new(
908 id,
909 mid,
910 media_type,
911 media_direction,
912 muted,
913 sender,
914 connection_mode,
915 );
916 let receiver = Receiver::new(
917 &state,
918 self,
919 mpsc::unbounded().0,
920 recv_constraints,
921 connection_mode,
922 )
923 .await;
924
925 receiver::Component::new(Rc::new(receiver), Rc::new(state))
926 }
927
928 pub async fn create_tracks(
935 &self,
936 tracks: Vec<proto::Track>,
937 send_constraints: &LocalTracksConstraints,
938 recv_constraints: &RecvConstraints,
939 connection_mode: ConnectionMode,
940 ) -> Result<(), Traced<sender::CreateError>> {
941 for track in tracks {
942 match track.direction {
943 proto::Direction::Send { mid, receivers } => {
944 let is_muted = send_constraints.muted(&track.media_type);
945 let component = self
946 .create_sender(
947 track.id,
948 track.media_type,
949 proto::MediaDirection::SendRecv,
950 is_muted,
951 mid,
952 receivers,
953 send_constraints,
954 connection_mode,
955 )
956 .await?;
957 drop(
958 self.0.borrow_mut().senders.insert(track.id, component),
959 );
960 }
961 proto::Direction::Recv { mid, sender } => {
962 let component = self
963 .create_receiver(
964 track.id,
965 track.media_type,
966 proto::MediaDirection::SendRecv,
967 false,
968 mid,
969 sender,
970 recv_constraints,
971 connection_mode,
972 )
973 .await;
974 drop(
975 self.0
976 .borrow_mut()
977 .receivers
978 .insert(track.id, component),
979 );
980 }
981 }
982 }
983 Ok(())
984 }
985
986 pub fn get_senders(&self) -> Vec<Rc<Sender>> {
988 self.0.borrow().senders.values().map(Component::obj).collect()
989 }
990
991 #[must_use]
993 pub fn get_sender_state_by_id(
994 &self,
995 id: TrackId,
996 ) -> Option<Rc<sender::State>> {
997 self.0.borrow().senders.get(&id).map(Component::state)
998 }
999}