medea_jason/peer/media/
mod.rs

1//! [`PeerConnection`] media management.
2//!
3//! [`PeerConnection`]: crate::peer::PeerConnection
4
5pub mod receiver;
6pub mod sender;
7mod transitable_state;
8
9use std::{cell::RefCell, collections::HashMap, rc::Rc};
10
11use derive_more::with_trait::{Display, From};
12use futures::{
13    FutureExt as _, TryFutureExt as _, channel::mpsc, future,
14    future::LocalBoxFuture,
15};
16use medea_client_api_proto as proto;
17#[cfg(feature = "mockable")]
18use medea_client_api_proto::{ConnectionMode, MemberId};
19use proto::{MediaSourceKind, MediaType, TrackId};
20use tracerr::Traced;
21
22#[doc(inline)]
23pub use self::{
24    receiver::Receiver,
25    sender::Sender,
26    transitable_state::{
27        InStable, InTransition, MediaExchangeState,
28        MediaExchangeStateController, MediaState, MuteState,
29        MuteStateController, TransitableState, TransitableStateController,
30        media_exchange_state, mute_state,
31    },
32};
33use super::tracks_request::TracksRequest;
34#[cfg(feature = "mockable")]
35use crate::media::{LocalTracksConstraints, RecvConstraints};
36use crate::{
37    media::{MediaKind, track::local},
38    peer::{LocalStreamUpdateCriteria, PeerEvent},
39    platform,
40    platform::{
41        TransceiverInit, send_encoding_parameters::SendEncodingParameters,
42        transceiver::probe_target_codecs,
43    },
44    utils::{Caused, Component},
45};
46
47/// Transceiver's sending ([`Sender`]) or receiving ([`Receiver`]) side.
48pub trait TransceiverSide: MediaStateControllable {
49    /// Returns [`TrackId`] of this [`TransceiverSide`].
50    fn track_id(&self) -> TrackId;
51
52    /// Returns [`MediaKind`] of this [`TransceiverSide`].
53    fn kind(&self) -> MediaKind;
54
55    /// Returns [`MediaSourceKind`] of this [`TransceiverSide`].
56    fn source_kind(&self) -> MediaSourceKind;
57
58    /// Returns `true` if this [`TransceiverSide`] currently can be
59    /// disabled/enabled without [`LocalTracksConstraints`] updating.
60    ///
61    /// [`LocalTracksConstraints`]: super::LocalTracksConstraints
62    fn is_transitable(&self) -> bool;
63}
64
65/// Default functions for dealing with [`MediaExchangeStateController`] and
66/// [`MuteStateController`] for objects that use it.
67pub trait MediaStateControllable {
68    /// Returns reference to the [`MediaExchangeStateController`].
69    #[must_use]
70    fn media_exchange_state_controller(
71        &self,
72    ) -> Rc<MediaExchangeStateController>;
73
74    /// Returns a reference to the [`MuteStateController`].
75    #[must_use]
76    fn mute_state_controller(&self) -> Rc<MuteStateController>;
77
78    /// Returns [`MediaExchangeState`] of this [`MediaStateControllable`].
79    fn media_exchange_state(&self) -> MediaExchangeState {
80        self.media_exchange_state_controller().state()
81    }
82
83    /// Returns [`MuteState`] of this [`MediaStateControllable`].
84    #[must_use]
85    fn mute_state(&self) -> MuteState {
86        self.mute_state_controller().state()
87    }
88
89    /// Sets current [`MediaState`] to [`TransitableState::Transition`].
90    ///
91    /// # Errors
92    ///
93    /// Implementors might return [`ProhibitedStateError`] if a transition
94    /// cannot be made for some reason.
95    fn media_state_transition_to(
96        &self,
97        desired_state: MediaState,
98    ) -> Result<(), Traced<ProhibitedStateError>> {
99        match desired_state {
100            MediaState::MediaExchange(desired_state) => {
101                self.media_exchange_state_controller()
102                    .transition_to(desired_state);
103            }
104            MediaState::Mute(desired_state) => {
105                self.mute_state_controller().transition_to(desired_state);
106            }
107        }
108
109        Ok(())
110    }
111
112    /// Indicates whether [`Room`] should subscribe to the [`MediaState`] update
113    /// when updating [`MediaStateControllable`] to the provided [`MediaState`].
114    ///
115    /// [`Room`]: crate::room::Room
116    fn is_subscription_needed(&self, desired_state: MediaState) -> bool {
117        match desired_state {
118            MediaState::MediaExchange(media_exchange) => {
119                let current = self.media_exchange_state();
120                match current {
121                    MediaExchangeState::Transition(_) => true,
122                    MediaExchangeState::Stable(stable) => {
123                        stable != media_exchange
124                    }
125                }
126            }
127            MediaState::Mute(mute_state) => {
128                let current = self.mute_state();
129                match current {
130                    MuteState::Transition(_) => true,
131                    MuteState::Stable(stable) => stable != mute_state,
132                }
133            }
134        }
135    }
136
137    /// Indicates whether [`Room`] should send [`TrackPatchCommand`] to the
138    /// server when updating [`MediaStateControllable`] to the provided
139    /// [`MediaState`].
140    ///
141    /// [`TrackPatchCommand`]: medea_client_api_proto::TrackPatchCommand
142    /// [`Room`]: crate::room::Room
143    #[must_use]
144    fn is_track_patch_needed(&self, desired_state: MediaState) -> bool {
145        match desired_state {
146            MediaState::MediaExchange(media_exchange) => {
147                let current = self.media_exchange_state();
148                match current {
149                    MediaExchangeState::Stable(stable) => {
150                        stable != media_exchange
151                    }
152                    MediaExchangeState::Transition(transition) => {
153                        transition.intended() != media_exchange
154                    }
155                }
156            }
157            MediaState::Mute(mute_state) => {
158                let current = self.mute_state();
159                match current {
160                    MuteState::Stable(stable) => stable != mute_state,
161                    MuteState::Transition(transition) => {
162                        transition.intended() != mute_state
163                    }
164                }
165            }
166        }
167    }
168
169    /// Returns [`Future`] which will be resolved when [`MediaState`] of this
170    /// [`MediaStateControllable`] will be [`TransitableState::Stable`] or it's
171    /// dropped.
172    ///
173    /// # Errors
174    ///
175    /// With an approved stable [`MediaState`] if transition to the
176    /// `desired_state` cannot be made.
177    ///
178    /// [`MediaState`]: super::MediaState
179    fn when_media_state_stable(
180        &self,
181        desired_state: MediaState,
182    ) -> LocalBoxFuture<'static, Result<(), MediaState>> {
183        match desired_state {
184            MediaState::Mute(desired_state) => self
185                .mute_state_controller()
186                .when_media_state_stable(desired_state)
187                .map_err(MediaState::Mute)
188                .boxed_local(),
189            MediaState::MediaExchange(desired_state) => self
190                .media_exchange_state_controller()
191                .when_media_state_stable(desired_state)
192                .map_err(MediaState::MediaExchange)
193                .boxed_local(),
194        }
195    }
196}
197
198/// Direction of the `MediaTrack`.
199#[derive(Clone, Copy, Debug)]
200pub enum TrackDirection {
201    /// Sends media data.
202    Send,
203
204    /// Receives media data.
205    Recv,
206}
207
208/// Error occurring when media state transition is not allowed.
209#[derive(Clone, Copy, Debug, Display)]
210pub enum ProhibitedStateError {
211    /// [`Sender`] cannot be disabled because it's required.
212    #[display(
213        "`MediaExchangeState` of `Sender` can't transit to \
214         disabled state, because this `Sender` is required"
215    )]
216    CannotDisableRequiredSender,
217}
218
219/// Errors occurring in [`MediaConnections::insert_local_tracks()`] method.
220#[derive(Caused, Clone, Debug, Display, From)]
221#[cause(error = platform::Error)]
222pub enum InsertLocalTracksError {
223    /// [`local::Track`] doesn't satisfy [`Sender`]'s constraints.
224    #[display("Provided `Track` doesn't satisfy senders constraints")]
225    InvalidMediaTrack,
226
227    /// There are not enough [`local::Track`]s being inserted into [`Sender`]s.
228    #[display("Provided stream does not have all necessary `Track`s")]
229    NotEnoughTracks,
230
231    /// Insertion of a [`local::Track`] into a [`Sender`] fails.
232    CouldNotInsertLocalTrack(#[cause] sender::InsertTrackError),
233}
234
235/// Errors occurring in [`MediaConnections::get_mids()`] method.
236#[derive(Clone, Copy, Debug, Display)]
237pub enum GetMidsError {
238    /// Cannot get the `mid` from a [`Sender`].
239    #[display("Peer has senders without mid")]
240    SendersWithoutMid,
241
242    /// Cannot get the `mid` from a [`Receiver`].
243    #[display("Peer has receivers without mid")]
244    ReceiversWithoutMid,
245}
246
247/// Actual data of [`MediaConnections`] storage.
248#[derive(Debug)]
249struct InnerMediaConnections {
250    /// Reference to the parent [`platform::RtcPeerConnection`].
251    ///
252    /// Used to generate transceivers for [`Sender`]s and [`Receiver`]s.
253    peer: Rc<platform::RtcPeerConnection>,
254
255    /// [`PeerEvent`]s tx.
256    peer_events_sender: mpsc::UnboundedSender<PeerEvent>,
257
258    /// [`TrackId`] to its [`sender::Component`].
259    senders: HashMap<TrackId, sender::Component>,
260
261    /// [`TrackId`] to its [`receiver::Component`].
262    receivers: HashMap<TrackId, receiver::Component>,
263}
264
265impl InnerMediaConnections {
266    /// Returns [`Iterator`] over [`sender::Component`]s with provided
267    /// [`MediaKind`] and [`MediaSourceKind`].
268    fn iter_senders_with_kind_and_source_kind(
269        &self,
270        kind: MediaKind,
271        source_kind: Option<MediaSourceKind>,
272    ) -> impl Iterator<Item = &sender::Component> {
273        self.senders
274            .values()
275            .filter(move |sender| sender.state().kind() == kind)
276            .filter(move |sender| {
277                source_kind
278                    .is_none_or(|sk| sender.caps().media_source_kind() == sk)
279            })
280    }
281
282    /// Returns [`Iterator`] over [`receiver::Component`]s with provided
283    /// [`MediaKind`] and [`MediaSourceKind`].
284    fn iter_receivers_with_kind_and_source_kind(
285        &self,
286        kind: MediaKind,
287        source_kind: Option<MediaSourceKind>,
288    ) -> impl Iterator<Item = &receiver::Component> {
289        self.receivers
290            .values()
291            .filter(move |s| s.state().kind() == kind)
292            .filter(move |s| {
293                source_kind.is_none_or(|skind| s.state().source_kind() == skind)
294            })
295    }
296
297    /// Returns all [`TransceiverSide`]s by provided [`TrackDirection`],
298    /// [`MediaKind`] and [`MediaSourceKind`].
299    fn get_transceivers_by_direction_and_kind(
300        &self,
301        direction: TrackDirection,
302        kind: MediaKind,
303        source_kind: Option<MediaSourceKind>,
304    ) -> Vec<Rc<dyn TransceiverSide>> {
305        match direction {
306            TrackDirection::Send => self
307                .iter_senders_with_kind_and_source_kind(kind, source_kind)
308                .map(|tx| -> Rc<dyn TransceiverSide> { tx.state() })
309                .collect(),
310            TrackDirection::Recv => self
311                .iter_receivers_with_kind_and_source_kind(kind, source_kind)
312                .map(|rx| -> Rc<dyn TransceiverSide> { rx.state() })
313                .collect(),
314        }
315    }
316
317    /// Creates a [`platform::Transceiver`] and adds it to the
318    /// [`platform::RtcPeerConnection`].
319    ///
320    /// Handles both audio and video media types, including setting up
321    /// [`EncodingParameters`] and codec preferences for video.
322    fn add_transceiver(
323        &self,
324        media_type: MediaType,
325        direction: platform::TransceiverDirection,
326    ) -> impl Future<Output = platform::Transceiver> + 'static + use<> {
327        let peer = Rc::clone(&self.peer);
328
329        async move {
330            let kind = MediaKind::from(&media_type);
331
332            match media_type {
333                MediaType::Audio(_) => {
334                    peer.add_transceiver(kind, TransceiverInit::new(direction))
335                        .await
336                }
337                MediaType::Video(settings) => {
338                    let init = TransceiverInit::new(direction);
339
340                    init.set_send_encodings(
341                        settings
342                            .encoding_parameters
343                            .iter()
344                            .cloned()
345                            .map(SendEncodingParameters::from)
346                            .collect(),
347                    );
348
349                    let transceiver = peer.add_transceiver(kind, init).await;
350                    let target_codecs = probe_target_codecs(
351                        settings
352                            .encoding_parameters
353                            .iter()
354                            .filter_map(|e| e.codec.as_ref()),
355                    )
356                    .await;
357                    // TODO: Switch to negotiationless codec change via
358                    //       `RTCRtpSender.setParameters()` once
359                    //       `RTCRtpEncodingParameters.codec` is supported on
360                    //       all major UAs.
361                    //       Track for `parameters.encodings.codec` here:
362                    //       https://tinyurl.com/wytxmuss
363                    if let Some(target_codecs) = target_codecs {
364                        transceiver.set_codec_preferences(target_codecs);
365                    }
366                    transceiver
367                }
368            }
369        }
370    }
371
372    /// Lookups a [`platform::Transceiver`] by the provided [`mid`].
373    ///
374    /// [`mid`]: https://w3.org/TR/webrtc#dom-rtptransceiver-mid
375    fn get_transceiver_by_mid(
376        &self,
377        mid: String,
378    ) -> impl Future<Output = Option<platform::Transceiver>> + 'static + use<>
379    {
380        self.peer.get_transceiver_by_mid(mid)
381    }
382}
383
384/// Storage of [`platform::RtcPeerConnection`]'s [`sender::Component`] and
385/// [`receiver::Component`].
386#[derive(Debug)]
387pub struct MediaConnections(RefCell<InnerMediaConnections>);
388
389impl MediaConnections {
390    /// Instantiates a new [`MediaConnections`] storage for the given
391    /// [`platform::RtcPeerConnection`].
392    #[must_use]
393    pub fn new(
394        peer: Rc<platform::RtcPeerConnection>,
395        peer_events_sender: mpsc::UnboundedSender<PeerEvent>,
396    ) -> Self {
397        Self(RefCell::new(InnerMediaConnections {
398            peer,
399            peer_events_sender,
400            senders: HashMap::new(),
401            receivers: HashMap::new(),
402        }))
403    }
404
405    /// Returns all [`Sender`]s and [`Receiver`]s from this [`MediaConnections`]
406    /// with provided [`MediaKind`], [`TrackDirection`] and
407    /// [`MediaSourceKind`].
408    pub fn get_transceivers_sides(
409        &self,
410        kind: MediaKind,
411        direction: TrackDirection,
412        source_kind: Option<MediaSourceKind>,
413    ) -> Vec<Rc<dyn TransceiverSide>> {
414        self.0.borrow().get_transceivers_by_direction_and_kind(
415            direction,
416            kind,
417            source_kind,
418        )
419    }
420
421    /// Indicates whether all [`TransceiverSide`]s with provided [`MediaKind`],
422    /// [`TrackDirection`] and [`MediaSourceKind`] is in the provided
423    /// [`MediaExchangeState`].
424    #[must_use]
425    pub fn is_all_tracks_in_media_state(
426        &self,
427        kind: MediaKind,
428        direction: TrackDirection,
429        source_kind: Option<MediaSourceKind>,
430        state: MediaState,
431    ) -> bool {
432        let transceivers =
433            self.0.borrow().get_transceivers_by_direction_and_kind(
434                direction,
435                kind,
436                source_kind,
437            );
438        for transceiver in transceivers {
439            if !transceiver.is_transitable() {
440                continue;
441            }
442
443            let not_in_state = match state {
444                MediaState::Mute(mute_state) => {
445                    transceiver.mute_state() != mute_state.into()
446                }
447                MediaState::MediaExchange(media_exchange) => {
448                    transceiver.media_exchange_state() != media_exchange.into()
449                }
450            };
451            if not_in_state {
452                return false;
453            }
454        }
455
456        true
457    }
458
459    /// Returns mapping from a [`proto::Track`] ID to a `mid` of this track's
460    /// [`platform::Transceiver`].
461    ///
462    /// # Errors
463    ///
464    /// See [`GetMidsError`] for details.
465    pub fn get_mids(
466        &self,
467    ) -> Result<HashMap<TrackId, String>, Traced<GetMidsError>> {
468        let inner = self.0.borrow();
469        let mut mids =
470            HashMap::with_capacity(inner.senders.len() + inner.receivers.len());
471        #[expect(clippy::iter_over_hash_type, reason = "order doesn't matter")]
472        for (track_id, sender) in &inner.senders {
473            drop(
474                mids.insert(
475                    *track_id,
476                    sender
477                        .mid()
478                        .ok_or(GetMidsError::SendersWithoutMid)
479                        .map_err(tracerr::wrap!())?,
480                ),
481            );
482        }
483        #[expect(clippy::iter_over_hash_type, reason = "order doesn't matter")]
484        for (track_id, receiver) in &inner.receivers {
485            drop(
486                mids.insert(
487                    *track_id,
488                    receiver
489                        .mid()
490                        .ok_or(GetMidsError::ReceiversWithoutMid)
491                        .map_err(tracerr::wrap!())?,
492                ),
493            );
494        }
495        Ok(mids)
496    }
497
498    /// Returns activity statuses of the all the [`Sender`]s and [`Receiver`]s
499    /// from these [`MediaConnections`].
500    pub fn get_transceivers_statuses(
501        &self,
502    ) -> impl Future<Output = HashMap<TrackId, bool>> + 'static + use<> {
503        let inner = self.0.borrow();
504        let transceivers = inner
505            .senders
506            .iter()
507            .map(|(&track_id, sender)| {
508                let sender = sender.obj();
509                async move { (track_id, sender.is_publishing().await) }
510                    .boxed_local()
511            })
512            .chain(inner.receivers.iter().map(|(&track_id, receiver)| {
513                let receiver = receiver.obj();
514                async move { (track_id, receiver.is_receiving().await) }
515                    .boxed_local()
516            }))
517            .collect::<Vec<_>>();
518
519        future::join_all(transceivers).map(|r| r.into_iter().collect())
520    }
521
522    /// Returns [`Rc`] to [`TransceiverSide`] with a provided [`TrackId`].
523    ///
524    /// Returns `None` if [`TransceiverSide`] with a provided [`TrackId`]
525    /// doesn't exist in this [`MediaConnections`].
526    pub fn get_transceiver_side_by_id(
527        &self,
528        track_id: TrackId,
529    ) -> Option<Rc<dyn TransceiverSide>> {
530        let inner = self.0.borrow();
531        inner
532            .senders
533            .get(&track_id)
534            .map(|sndr| -> Rc<dyn TransceiverSide> { sndr.state() })
535            .or_else(|| {
536                inner
537                    .receivers
538                    .get(&track_id)
539                    .map(|rcvr| -> Rc<dyn TransceiverSide> { rcvr.state() })
540            })
541    }
542
543    /// Inserts new [`sender::Component`] into [`MediaConnections`].
544    pub fn insert_sender(&self, sender: sender::Component) {
545        drop(self.0.borrow_mut().senders.insert(sender.state().id(), sender));
546    }
547
548    /// Inserts new [`receiver::Component`] into [`MediaConnections`].
549    pub fn insert_receiver(&self, receiver: receiver::Component) {
550        drop(
551            self.0
552                .borrow_mut()
553                .receivers
554                .insert(receiver.state().id(), receiver),
555        );
556    }
557
558    /// Returns [`TracksRequest`] based on [`Sender`]s in this
559    /// [`MediaConnections`]. [`Sender`]s are chosen based on provided
560    /// [`LocalStreamUpdateCriteria`].
561    pub fn get_tracks_request(
562        &self,
563        kinds: LocalStreamUpdateCriteria,
564    ) -> Option<TracksRequest> {
565        let mut stream_request = None;
566        #[expect(clippy::iter_over_hash_type, reason = "order doesn't matter")]
567        for sender in self.0.borrow().senders.values() {
568            if kinds
569                .has(sender.state().media_kind(), sender.state().media_source())
570            {
571                stream_request
572                    .get_or_insert_with(TracksRequest::default)
573                    .add_track_request(
574                        sender.state().track_id(),
575                        sender.caps().clone(),
576                    );
577            }
578        }
579        stream_request
580    }
581
582    /// Inserts provided tracks into [`Sender`]s based on track IDs.
583    ///
584    /// [`local::Track`]s are inserted into [`Sender`]'s
585    /// [`platform::Transceiver`]s via a [`replaceTrack` method][1], changing
586    /// its direction to `sendonly`.
587    ///
588    /// Returns [`HashMap`] with [`media_exchange_state::Stable`]s updates for
589    /// the [`Sender`]s.
590    ///
591    /// # Errors
592    ///
593    /// See [`InsertLocalTracksError`] for details.
594    ///
595    /// [1]: https://w3.org/TR/webrtc#dom-rtcrtpsender-replacetrack
596    pub async fn insert_local_tracks(
597        &self,
598        tracks: &HashMap<TrackId, Rc<local::Track>>,
599    ) -> Result<
600        HashMap<TrackId, media_exchange_state::Stable>,
601        Traced<InsertLocalTracksError>,
602    > {
603        // Build sender to track pairs to catch errors before inserting.
604        let mut sender_and_track =
605            Vec::with_capacity(self.0.borrow().senders.len());
606        let mut media_exchange_state_updates = HashMap::new();
607        let senders = self
608            .0
609            .borrow()
610            .senders
611            .values()
612            .map(|c| (c.obj(), c.state()))
613            .collect::<Vec<_>>();
614        for (sender, state) in senders {
615            if let Some(track) = tracks.get(&state.id()).cloned() {
616                if sender.caps().satisfies(track.as_ref()).await {
617                    sender_and_track.push((sender, track));
618                } else {
619                    return Err(tracerr::new!(
620                        InsertLocalTracksError::InvalidMediaTrack
621                    ));
622                }
623            } else if sender.caps().required() {
624                return Err(tracerr::new!(
625                    InsertLocalTracksError::NotEnoughTracks
626                ));
627            } else {
628                _ = media_exchange_state_updates
629                    .insert(state.id(), media_exchange_state::Stable::Disabled);
630            }
631        }
632
633        future::try_join_all(sender_and_track.into_iter().map(
634            async |(sender, track)| {
635                Rc::clone(&sender).insert_track(track).await
636            },
637        ))
638        .await
639        .map(drop)
640        .map_err(tracerr::map_from_and_wrap!())?;
641
642        Ok(media_exchange_state_updates)
643    }
644
645    /// Adds a new track to the corresponding [`Receiver`].
646    ///
647    /// # Errors
648    ///
649    /// Errors with a transceivers `mid` if could not find [`Receiver`] by this
650    /// `mid`.
651    ///
652    /// # Panics
653    ///
654    /// If the provided [`platform::Transceiver`] doesn't have a [`mid`]. Not
655    /// supposed to happen, since [`platform::MediaStreamTrack`] is only fired
656    /// when a [`platform::Transceiver`] is negotiated, thus have a [`mid`].
657    ///
658    /// [`mid`]: https://w3.org/TR/webrtc#dom-rtptransceiver-mid
659    pub async fn add_remote_track(
660        &self,
661        track: platform::MediaStreamTrack,
662        transceiver: platform::Transceiver,
663    ) -> Result<(), String> {
664        // Cannot fail, since transceiver is guaranteed to be negotiated at this
665        // point.
666        let mid = transceiver.mid().ok_or("No Transceiver::mid found")?;
667        let receiver = self
668            .0
669            .borrow()
670            .receivers
671            .values()
672            .find(|rcvr| rcvr.mid().as_ref() == Some(&mid))
673            .map(Component::obj);
674
675        if let Some(rcvr) = receiver {
676            rcvr.set_remote_track(transceiver, track).await;
677            Ok(())
678        } else {
679            Err(mid)
680        }
681    }
682
683    /// Iterates over all [`Receiver`]s with [`mid`] and without
684    /// [`platform::Transceiver`], trying to find the corresponding
685    /// [`platform::Transceiver`] in the [`platform::RtcPeerConnection`] and to
686    /// insert it into the [`Receiver`].
687    ///
688    /// [`mid`]: https://w3.org/TR/webrtc#dom-rtptransceiver-mid
689    pub fn sync_receivers(&self) -> impl Future<Output = ()> + 'static + use<> {
690        future::join_all({
691            self.0
692                .borrow()
693                .receivers
694                .values()
695                .filter_map(|receiver| {
696                    #[expect(clippy::question_mark, reason = "more readable")]
697                    if receiver.transceiver().is_none() {
698                        return None;
699                    }
700                    receiver.mid().map(|mid| {
701                        let fut = {
702                            self.0.borrow().peer.get_transceiver_by_mid(mid)
703                        };
704                        let receiver = Component::obj(receiver);
705                        async move {
706                            if let Some(t) = fut.await {
707                                receiver.set_transceiver(t);
708                            }
709                        }
710                    })
711                })
712                .collect::<Vec<_>>()
713        })
714        .map(drop)
715    }
716
717    /// Returns all [`Sender`]s which are matches provided
718    /// [`LocalStreamUpdateCriteria`] and doesn't have [`local::Track`].
719    pub fn get_senders_without_tracks_ids(
720        &self,
721        kinds: LocalStreamUpdateCriteria,
722    ) -> Vec<TrackId> {
723        self.0
724            .borrow()
725            .senders
726            .values()
727            .filter_map(|s| {
728                (kinds.has(s.state().kind(), s.state().source_kind())
729                    && s.state().enabled()
730                    && !s.has_track())
731                .then_some(s.state().id())
732            })
733            .collect()
734    }
735
736    /// Drops [`local::Track`]s of all [`Sender`]s which are matches provided
737    /// [`LocalStreamUpdateCriteria`].
738    pub async fn drop_send_tracks(&self, kinds: LocalStreamUpdateCriteria) {
739        let remove_tracks_fut = future::join_all(
740            self.0
741                .borrow()
742                .senders
743                .values()
744                .filter(|&s| {
745                    kinds.has(s.state().kind(), s.state().source_kind())
746                })
747                .map(|s| {
748                    let sender = s.obj();
749                    async move {
750                        sender.remove_track().await;
751                    }
752                }),
753        );
754        drop(remove_tracks_fut.await);
755    }
756
757    /// Removes a [`sender::Component`] or a [`receiver::Component`] with the
758    /// provided [`TrackId`] from these [`MediaConnections`].
759    pub fn remove_track(&self, track_id: TrackId) {
760        let mut inner = self.0.borrow_mut();
761        if inner.receivers.remove(&track_id).is_none() {
762            drop(inner.senders.remove(&track_id));
763        }
764    }
765}
766
767#[cfg(feature = "mockable")]
768// TODO: Try remove on next Rust version upgrade.
769#[expect(clippy::allow_attributes, reason = "`#[expect]` is not considered")]
770#[allow(clippy::multiple_inherent_impl, reason = "feature gated")]
771impl MediaConnections {
772    /// Indicates whether all [`Receiver`]s with [`MediaKind::Video`] are
773    /// enabled.
774    #[must_use]
775    pub fn is_recv_video_enabled(&self) -> bool {
776        !self
777            .0
778            .borrow()
779            .iter_receivers_with_kind_and_source_kind(MediaKind::Video, None)
780            .any(|s| !s.state().enabled_individual())
781    }
782
783    /// Indicates whether if all [`Receiver`]s with [`MediaKind::Audio`] are
784    /// enabled.
785    #[must_use]
786    pub fn is_recv_audio_enabled(&self) -> bool {
787        !self
788            .0
789            .borrow()
790            .iter_receivers_with_kind_and_source_kind(MediaKind::Audio, None)
791            .any(|s| !s.state().enabled_individual())
792    }
793
794    /// Returns [`Receiver`] with the provided [`TrackId`].
795    #[must_use]
796    pub fn get_receiver_by_id(&self, id: TrackId) -> Option<Rc<Receiver>> {
797        self.0.borrow().receivers.get(&id).map(Component::obj)
798    }
799
800    /// Returns [`Sender`] with a provided [`TrackId`].
801    #[must_use]
802    pub fn get_sender_by_id(&self, id: TrackId) -> Option<Rc<Sender>> {
803        self.0.borrow().senders.get(&id).map(Component::obj)
804    }
805
806    /// Indicates whether all [`Sender`]s with [`MediaKind::Audio`] are enabled.
807    #[must_use]
808    pub fn is_send_audio_enabled(&self) -> bool {
809        self.0
810            .borrow()
811            .iter_senders_with_kind_and_source_kind(MediaKind::Audio, None)
812            .all(|s| s.state().enabled())
813    }
814
815    /// Indicates whether all [`Sender`]s with [`MediaKind::Video`] are enabled.
816    #[must_use]
817    pub fn is_send_video_enabled(
818        &self,
819        source_kind: Option<MediaSourceKind>,
820    ) -> bool {
821        self.0
822            .borrow()
823            .iter_senders_with_kind_and_source_kind(
824                MediaKind::Video,
825                source_kind,
826            )
827            .all(|s| s.state().enabled())
828    }
829
830    /// Indicates whether all [`Sender`]'s video tracks are unmuted.
831    #[must_use]
832    pub fn is_send_video_unmuted(
833        &self,
834        source_kind: Option<MediaSourceKind>,
835    ) -> bool {
836        !self
837            .0
838            .borrow()
839            .iter_senders_with_kind_and_source_kind(
840                MediaKind::Video,
841                source_kind,
842            )
843            .any(|s| s.muted())
844    }
845
846    /// Indicates whether all [`Sender`]'s audio tracks are unmuted.
847    #[must_use]
848    pub fn is_send_audio_unmuted(&self) -> bool {
849        !self
850            .0
851            .borrow()
852            .iter_senders_with_kind_and_source_kind(MediaKind::Audio, None)
853            .any(|s| s.muted())
854    }
855
856    /// Creates a new [`sender::Component`] with the provided data.
857    ///
858    /// # Errors
859    ///
860    /// See [`sender::CreateError`] for details.
861    #[expect(clippy::too_many_arguments, reason = "not a problem")]
862    pub async fn create_sender(
863        &self,
864        id: TrackId,
865        media_type: MediaType,
866        media_direction: proto::MediaDirection,
867        muted: bool,
868        mid: Option<String>,
869        receivers: Vec<MemberId>,
870        send_constraints: &LocalTracksConstraints,
871        connection_mode: ConnectionMode,
872    ) -> Result<sender::Component, Traced<sender::CreateError>> {
873        let sender_state = sender::State::new(
874            id,
875            mid,
876            media_type,
877            media_direction,
878            muted,
879            receivers,
880            send_constraints.clone(),
881            connection_mode,
882        );
883        let sender = Sender::new(
884            &sender_state,
885            self,
886            send_constraints.clone(),
887            mpsc::unbounded().0,
888        )
889        .await?;
890
891        Ok(sender::Component::new(sender, Rc::new(sender_state)))
892    }
893
894    /// Creates a new [`receiver::Component`] with the provided data.
895    #[expect(clippy::too_many_arguments, reason = "not a problem")]
896    pub async fn create_receiver(
897        &self,
898        id: TrackId,
899        media_type: MediaType,
900        media_direction: proto::MediaDirection,
901        muted: bool,
902        mid: Option<String>,
903        sender: MemberId,
904        recv_constraints: &RecvConstraints,
905        connection_mode: ConnectionMode,
906    ) -> receiver::Component {
907        let state = receiver::State::new(
908            id,
909            mid,
910            media_type,
911            media_direction,
912            muted,
913            sender,
914            connection_mode,
915        );
916        let receiver = Receiver::new(
917            &state,
918            self,
919            mpsc::unbounded().0,
920            recv_constraints,
921            connection_mode,
922        )
923        .await;
924
925        receiver::Component::new(Rc::new(receiver), Rc::new(state))
926    }
927
928    /// Creates a new [`sender::Component`]s/[`receiver::Component`]s from the
929    /// provided [`proto::Track`]s.
930    ///
931    /// # Errors
932    ///
933    /// See [`sender::CreateError`] for details.
934    pub async fn create_tracks(
935        &self,
936        tracks: Vec<proto::Track>,
937        send_constraints: &LocalTracksConstraints,
938        recv_constraints: &RecvConstraints,
939        connection_mode: ConnectionMode,
940    ) -> Result<(), Traced<sender::CreateError>> {
941        for track in tracks {
942            match track.direction {
943                proto::Direction::Send { mid, receivers } => {
944                    let is_muted = send_constraints.muted(&track.media_type);
945                    let component = self
946                        .create_sender(
947                            track.id,
948                            track.media_type,
949                            proto::MediaDirection::SendRecv,
950                            is_muted,
951                            mid,
952                            receivers,
953                            send_constraints,
954                            connection_mode,
955                        )
956                        .await?;
957                    drop(
958                        self.0.borrow_mut().senders.insert(track.id, component),
959                    );
960                }
961                proto::Direction::Recv { mid, sender } => {
962                    let component = self
963                        .create_receiver(
964                            track.id,
965                            track.media_type,
966                            proto::MediaDirection::SendRecv,
967                            false,
968                            mid,
969                            sender,
970                            recv_constraints,
971                            connection_mode,
972                        )
973                        .await;
974                    drop(
975                        self.0
976                            .borrow_mut()
977                            .receivers
978                            .insert(track.id, component),
979                    );
980                }
981            }
982        }
983        Ok(())
984    }
985
986    /// Returns all underlying [`Sender`]'s.
987    pub fn get_senders(&self) -> Vec<Rc<Sender>> {
988        self.0.borrow().senders.values().map(Component::obj).collect()
989    }
990
991    /// Returns [`sender::State`] with the provided [`TrackId`].
992    #[must_use]
993    pub fn get_sender_state_by_id(
994        &self,
995        id: TrackId,
996    ) -> Option<Rc<sender::State>> {
997        self.0.borrow().senders.get(&id).map(Component::state)
998    }
999}