medea_jason/peer/media/receiver/
component.rs

1//! [`Component`] for `MediaTrack` with a `Recv` direction.
2
3use std::{iter, rc::Rc};
4
5use futures::StreamExt as _;
6use medea_client_api_proto as proto;
7use medea_client_api_proto::{
8    MediaSourceKind, MediaType, MemberId, TrackId, TrackPatchEvent,
9};
10use medea_macro::watchers;
11use medea_reactive::{
12    AllProcessed, Guarded, ObservableCell, Processed, ProgressableCell,
13    when_all_processed,
14};
15use proto::ConnectionMode;
16
17use super::Receiver;
18use crate::{
19    media::{LocalTracksConstraints, MediaDirection, MediaKind},
20    peer::{
21        MediaExchangeState, MediaExchangeStateController,
22        MediaStateControllable, MuteStateController, TransceiverSide,
23        component::SyncPhase,
24        media::{InTransition as _, transitable_state::media_exchange_state},
25    },
26    utils::{AsProtoState, SynchronizableState, Updatable, component},
27};
28
29/// Component responsible for the [`Receiver`] enabling/disabling and
30/// muting/unmuting.
31pub type Component = component::Component<State, Receiver>;
32
33/// State of the [`Component`].
34#[derive(Debug)]
35pub struct State {
36    /// ID of the [`Receiver`]'s [`remote::Track`].
37    ///
38    /// [`remote::Track`]: crate::media::track::remote::Track
39    id: TrackId,
40
41    /// [MID] of the [`Receiver`]'s [`Transceiver`].
42    ///
43    /// [`Transceiver`]: crate::platform::Transceiver
44    /// [MID]: https://w3.org/TR/webrtc#dom-rtptransceiver-mid
45    mid: Option<String>,
46
47    /// [`MediaType`] of the [`Receiver`]'s [`remote::Track`].
48    ///
49    /// [`remote::Track`]: crate::media::track::remote::Track
50    media_type: MediaType,
51
52    /// ID of the member sending the [`Receiver`]'s [`remote::Track`].
53    ///
54    /// [`remote::Track`]: crate::media::track::remote::Track
55    sender_id: MemberId,
56
57    /// Indicator whether the [`Receiver`]'s [`remote::Track`] is enabled
58    /// individually.
59    ///
60    /// [`remote::Track`]: crate::media::track::remote::Track
61    enabled_individual: Rc<MediaExchangeStateController>,
62
63    /// Indicator whether the [`Receiver`]'s [`remote::Track`] is enabled
64    /// generally.
65    ///
66    /// [`remote::Track`]: crate::media::track::remote::Track
67    enabled_general: ProgressableCell<media_exchange_state::Stable>,
68
69    /// Current general [`MediaDirection`] of this [`Receiver`].
70    media_direction: ObservableCell<MediaDirection>,
71
72    /// Indicator whether the [`Receiver`]'s [`remote::Track`] is muted.
73    ///
74    /// [`remote::Track`]: crate::media::track::remote::Track
75    muted: ObservableCell<bool>,
76
77    /// Indicator whether this [`Receiver`] is working in a [P2P mesh] or [SFU]
78    /// mode.
79    ///
80    /// [P2P mesh]: https://webrtcglossary.com/mesh
81    /// [SFU]: https://webrtcglossary.com/sfu
82    connection_mode: ConnectionMode,
83
84    /// [`SyncPhase`] of the [`Component`].
85    sync_phase: ObservableCell<SyncPhase>,
86}
87
88impl AsProtoState for State {
89    type Output = proto::state::Receiver;
90
91    fn as_proto(&self) -> Self::Output {
92        Self::Output {
93            id: self.id,
94            connection_mode: self.connection_mode,
95            mid: self.mid.clone(),
96            media_type: self.media_type.clone(),
97            sender_id: self.sender_id.clone(),
98            muted: false,
99            media_direction: self.media_direction().into(),
100        }
101    }
102}
103
104impl SynchronizableState for State {
105    type Input = proto::state::Receiver;
106
107    fn from_proto(input: Self::Input, _: &LocalTracksConstraints) -> Self {
108        Self {
109            id: input.id,
110            mid: input.mid,
111            media_type: input.media_type,
112            sender_id: input.sender_id,
113            enabled_individual: MediaExchangeStateController::new(
114                media_exchange_state::Stable::from(
115                    input.media_direction.is_recv_enabled(),
116                ),
117            ),
118            enabled_general: ProgressableCell::new(
119                media_exchange_state::Stable::from(
120                    input.media_direction.is_enabled_general(),
121                ),
122            ),
123            muted: ObservableCell::new(input.muted),
124            media_direction: ObservableCell::new(input.media_direction.into()),
125            connection_mode: input.connection_mode,
126            sync_phase: ObservableCell::new(SyncPhase::Synced),
127        }
128    }
129
130    fn apply(&self, input: Self::Input, _: &LocalTracksConstraints) {
131        let new_media_exchange_state = media_exchange_state::Stable::from(
132            input.media_direction.is_recv_enabled(),
133        );
134        let current_media_exchange_state = match self.enabled_individual.state()
135        {
136            MediaExchangeState::Transition(transition) => {
137                transition.into_inner()
138            }
139            MediaExchangeState::Stable(stable) => stable,
140        };
141        if current_media_exchange_state != new_media_exchange_state {
142            self.enabled_individual.update(new_media_exchange_state);
143        }
144
145        self.enabled_general.set(media_exchange_state::Stable::from(
146            input.media_direction.is_enabled_general(),
147        ));
148        self.media_direction.set(input.media_direction.into());
149
150        self.sync_phase.set(SyncPhase::Synced);
151    }
152}
153
154impl Updatable for State {
155    /// Returns [`Future`] resolving once [`media_exchange_state`] is
156    /// stabilized.
157    fn when_stabilized(&self) -> AllProcessed<'static> {
158        let controller = Rc::clone(&self.enabled_individual);
159        when_all_processed(iter::once(
160            Processed::new(Box::new(move || {
161                let controller = Rc::clone(&controller);
162                Box::pin(async move {
163                    controller.when_stabilized().await;
164                })
165            }))
166            .into(),
167        ))
168    }
169
170    /// Returns [`Future`] resolving once [`State`] update will be applied onto
171    /// the [`Receiver`].
172    fn when_updated(&self) -> AllProcessed<'static> {
173        when_all_processed(vec![
174            self.enabled_individual.when_processed().into(),
175            self.enabled_general.when_all_processed().into(),
176        ])
177    }
178
179    /// Notifies [`State`] about a RPC connection loss.
180    fn connection_lost(&self) {
181        self.sync_phase.set(SyncPhase::Desynced);
182    }
183
184    /// Notifies [`State`] about a RPC connection restore.
185    fn connection_recovered(&self) {
186        self.sync_phase.set(SyncPhase::Syncing);
187    }
188}
189
190impl From<&State> for proto::state::Receiver {
191    fn from(from: &State) -> Self {
192        Self {
193            id: from.id,
194            connection_mode: from.connection_mode,
195            mid: from.mid.clone(),
196            media_type: from.media_type.clone(),
197            sender_id: from.sender_id.clone(),
198            media_direction: from.media_direction().into(),
199            muted: false,
200        }
201    }
202}
203
204impl State {
205    /// Returns [`State`] with a provided data.
206    #[must_use]
207    pub fn new(
208        id: TrackId,
209        mid: Option<String>,
210        media_type: MediaType,
211        media_direction: medea_client_api_proto::MediaDirection,
212        muted: bool,
213        sender: MemberId,
214        connection_mode: ConnectionMode,
215    ) -> Self {
216        Self {
217            id,
218            mid,
219            media_type,
220            sender_id: sender,
221            enabled_individual: MediaExchangeStateController::new(
222                media_direction.is_recv_enabled().into(),
223            ),
224            enabled_general: ProgressableCell::new(
225                media_direction.is_enabled_general().into(),
226            ),
227            muted: ObservableCell::new(muted),
228            sync_phase: ObservableCell::new(SyncPhase::Synced),
229            connection_mode,
230            media_direction: ObservableCell::new(media_direction.into()),
231        }
232    }
233
234    /// Returns [`TrackId`] of this [`State`].
235    #[must_use]
236    pub const fn id(&self) -> TrackId {
237        self.id
238    }
239
240    /// Returns current `mid` of this [`State`].
241    #[must_use]
242    pub fn mid(&self) -> Option<&str> {
243        self.mid.as_deref()
244    }
245
246    /// Returns current [`MediaType`] of this [`State`].
247    #[must_use]
248    pub const fn media_type(&self) -> &MediaType {
249        &self.media_type
250    }
251
252    /// Returns current [`MemberId`] of the `Member` from which this
253    /// [`State`] should receive media data.
254    #[must_use]
255    pub const fn sender_id(&self) -> &MemberId {
256        &self.sender_id
257    }
258
259    /// Returns current individual media exchange state of this [`State`].
260    #[must_use]
261    pub fn enabled_individual(&self) -> bool {
262        self.enabled_individual.enabled()
263    }
264
265    /// Returns current general media exchange state of this [`State`].
266    #[must_use]
267    pub fn enabled_general(&self) -> bool {
268        self.enabled_general.get() == media_exchange_state::Stable::Enabled
269    }
270
271    /// Returns current mute state of this [`State`].
272    #[must_use]
273    pub fn muted(&self) -> bool {
274        self.muted.get()
275    }
276
277    /// Returns the current general [`MediaDirection`] of this [`State`].
278    #[must_use]
279    pub fn media_direction(&self) -> MediaDirection {
280        self.media_direction.get()
281    }
282
283    /// Updates this [`State`] with the provided [`TrackPatchEvent`].
284    pub fn update(&self, track_patch: &TrackPatchEvent) {
285        if self.id != track_patch.id {
286            return;
287        }
288        if let Some(direction) = track_patch.media_direction {
289            self.enabled_general.set(direction.is_enabled_general().into());
290
291            self.enabled_individual.update(direction.is_recv_enabled().into());
292        }
293        if let Some(muted) = track_patch.muted {
294            self.muted.set(muted);
295        }
296        if let Some(direction) = track_patch.media_direction {
297            self.media_direction.set(direction.into());
298        }
299    }
300}
301
302#[watchers]
303impl Component {
304    /// Watcher for the [`State::enabled_general`] updates.
305    ///
306    /// Updates [`Receiver`]'s general media exchange state. Adds or removes
307    /// [`RECV`] direction from the [`Transceiver`] of the [`Receiver`].
308    ///
309    /// [`RECV`]: crate::platform::TransceiverDirection::RECV
310    /// [`Transceiver`]: crate::platform::Transceiver
311    #[watch(self.enabled_general.subscribe())]
312    async fn general_media_exchange_state_changed(
313        receiver: Rc<Receiver>,
314        st: Rc<State>,
315        state: Guarded<media_exchange_state::Stable>,
316    ) {
317        let (state, _guard) = state.into_parts();
318        receiver
319            .enabled_general
320            .set(state == media_exchange_state::Stable::Enabled);
321        if (st.connection_mode, state)
322            == (ConnectionMode::Mesh, media_exchange_state::Stable::Disabled)
323        {
324            let sub_recv = {
325                receiver
326                    .transceiver
327                    .borrow()
328                    .as_ref()
329                    .map(|trnscvr| trnscvr.set_recv(false))
330            };
331            if let Some(fut) = sub_recv {
332                fut.await;
333            }
334        } else {
335            let add_recv = receiver
336                .transceiver
337                .borrow()
338                .as_ref()
339                .map(|trnscvr| trnscvr.set_recv(true));
340            if let Some(fut) = add_recv {
341                fut.await;
342            }
343        }
344        receiver.maybe_notify_track().await;
345    }
346
347    /// Watcher for [`media_exchange_state::Stable`] media exchange state
348    /// updates.
349    ///
350    /// Updates [`Receiver::enabled_individual`] to the new state.
351    #[watch(self.enabled_individual.subscribe_stable())]
352    fn enabled_individual_stable_state_changed(
353        receiver: &Receiver,
354        _: &State,
355        state: media_exchange_state::Stable,
356    ) {
357        receiver
358            .enabled_individual
359            .set(state == media_exchange_state::Stable::Enabled);
360    }
361
362    /// Watcher for media exchange state [`media_exchange_state::Transition`]
363    /// updates.
364    ///
365    /// Sends [`TrackEvent::MediaExchangeIntention`][1] with the provided
366    /// [`media_exchange_state`].
367    ///
368    /// [1]: crate::peer::TrackEvent::MediaExchangeIntention
369    #[watch(self.enabled_individual.subscribe_transition())]
370    fn enabled_individual_transition_started(
371        receiver: &Receiver,
372        _: &State,
373        state: media_exchange_state::Transition,
374    ) {
375        receiver.send_media_exchange_state_intention(state);
376    }
377
378    /// Watcher for the mute state updates.
379    ///
380    /// Propagates command to the associated [`Receiver`] and updates its media
381    /// track (if any).
382    #[watch(self.muted.subscribe())]
383    fn mute_state_changed(receiver: &Receiver, _: &State, muted: bool) {
384        receiver.muted.set(muted);
385        if let Some(track) = receiver.track.borrow().as_ref() {
386            track.set_muted(muted);
387        }
388    }
389
390    /// Stops transition timeouts on [`SyncPhase::Desynced`].
391    ///
392    /// Sends media state intentions and resets transition timeouts on
393    /// [`SyncPhase::Synced`].
394    #[watch(self.sync_phase.subscribe().skip(1))]
395    fn sync_phase_watcher(
396        receiver: &Receiver,
397        state: &State,
398        sync_phase: SyncPhase,
399    ) {
400        match sync_phase {
401            SyncPhase::Synced => {
402                if let MediaExchangeState::Transition(transition) =
403                    state.enabled_individual.state()
404                {
405                    receiver.send_media_exchange_state_intention(transition);
406                }
407                state.enabled_individual.reset_transition_timeout();
408            }
409            SyncPhase::Desynced => {
410                state.enabled_individual.stop_transition_timeout();
411            }
412            SyncPhase::Syncing => (),
413        }
414    }
415
416    /// Updates [`MediaDirection`] of the provided [`Receiver`].
417    #[watch(self.media_direction.subscribe())]
418    fn direction_watcher(
419        receiver: &Receiver,
420        _: &State,
421        direction: MediaDirection,
422    ) {
423        receiver.set_media_direction(direction);
424    }
425}
426
427impl MediaStateControllable for State {
428    fn media_exchange_state_controller(
429        &self,
430    ) -> Rc<MediaExchangeStateController> {
431        Rc::clone(&self.enabled_individual)
432    }
433
434    fn mute_state_controller(&self) -> Rc<MuteStateController> {
435        // Receivers can be muted, but currently they are muted directly by
436        // server events.
437        //
438        // There is no point to provide an external API for muting receivers,
439        // since the muting is pipelined after demuxing and decoding, so it
440        // won't reduce incoming traffic or CPU usage. Therefore receivers
441        // muting don't require `MuteStateController`'s state management.
442        //
443        // Removing this `unreachable!()` would require abstracting
444        // `MuteStateController` to some trait and creating some dummy
445        // implementation. Not worth it atm.
446        unreachable!("Receivers muting is not implemented");
447    }
448}
449
450impl TransceiverSide for State {
451    fn track_id(&self) -> TrackId {
452        self.id
453    }
454
455    fn kind(&self) -> MediaKind {
456        match &self.media_type {
457            MediaType::Audio(_) => MediaKind::Audio,
458            MediaType::Video(_) => MediaKind::Video,
459        }
460    }
461
462    fn source_kind(&self) -> MediaSourceKind {
463        match &self.media_type {
464            MediaType::Audio(_) => MediaSourceKind::Device,
465            MediaType::Video(video) => video.source_kind,
466        }
467    }
468
469    fn is_transitable(&self) -> bool {
470        true
471    }
472}
473
474#[cfg(feature = "mockable")]
475// TODO: Try remove on next Rust version upgrade.
476#[expect(clippy::allow_attributes, reason = "`#[expect]` is not considered")]
477#[allow(clippy::multiple_inherent_impl, reason = "feature gated")]
478impl State {
479    /// Stabilizes the [`MediaExchangeState`] of this [`State`].
480    pub fn stabilize(&self) {
481        if let MediaExchangeState::Transition(transition) =
482            self.enabled_individual.state()
483        {
484            self.enabled_individual.update(transition.intended());
485            self.enabled_general.set(transition.intended());
486        }
487    }
488
489    /// Sets the [`State::sync_phase`] to a [`SyncPhase::Synced`].
490    pub fn synced(&self) {
491        self.sync_phase.set(SyncPhase::Synced);
492    }
493}