medea_jason/peer/media/receiver/
component.rs1use std::{iter, rc::Rc};
4
5use futures::StreamExt as _;
6use medea_client_api_proto as proto;
7use medea_client_api_proto::{
8 MediaSourceKind, MediaType, MemberId, TrackId, TrackPatchEvent,
9};
10use medea_macro::watchers;
11use medea_reactive::{
12 AllProcessed, Guarded, ObservableCell, Processed, ProgressableCell,
13 when_all_processed,
14};
15use proto::ConnectionMode;
16
17use super::Receiver;
18use crate::{
19 media::{LocalTracksConstraints, MediaDirection, MediaKind},
20 peer::{
21 MediaExchangeState, MediaExchangeStateController,
22 MediaStateControllable, MuteStateController, TransceiverSide,
23 component::SyncPhase,
24 media::{InTransition as _, transitable_state::media_exchange_state},
25 },
26 utils::{AsProtoState, SynchronizableState, Updatable, component},
27};
28
29pub type Component = component::Component<State, Receiver>;
32
33#[derive(Debug)]
35pub struct State {
36 id: TrackId,
40
41 mid: Option<String>,
46
47 media_type: MediaType,
51
52 sender_id: MemberId,
56
57 enabled_individual: Rc<MediaExchangeStateController>,
62
63 enabled_general: ProgressableCell<media_exchange_state::Stable>,
68
69 media_direction: ObservableCell<MediaDirection>,
71
72 muted: ObservableCell<bool>,
76
77 connection_mode: ConnectionMode,
83
84 sync_phase: ObservableCell<SyncPhase>,
86}
87
88impl AsProtoState for State {
89 type Output = proto::state::Receiver;
90
91 fn as_proto(&self) -> Self::Output {
92 Self::Output {
93 id: self.id,
94 connection_mode: self.connection_mode,
95 mid: self.mid.clone(),
96 media_type: self.media_type.clone(),
97 sender_id: self.sender_id.clone(),
98 muted: false,
99 media_direction: self.media_direction().into(),
100 }
101 }
102}
103
104impl SynchronizableState for State {
105 type Input = proto::state::Receiver;
106
107 fn from_proto(input: Self::Input, _: &LocalTracksConstraints) -> Self {
108 Self {
109 id: input.id,
110 mid: input.mid,
111 media_type: input.media_type,
112 sender_id: input.sender_id,
113 enabled_individual: MediaExchangeStateController::new(
114 media_exchange_state::Stable::from(
115 input.media_direction.is_recv_enabled(),
116 ),
117 ),
118 enabled_general: ProgressableCell::new(
119 media_exchange_state::Stable::from(
120 input.media_direction.is_enabled_general(),
121 ),
122 ),
123 muted: ObservableCell::new(input.muted),
124 media_direction: ObservableCell::new(input.media_direction.into()),
125 connection_mode: input.connection_mode,
126 sync_phase: ObservableCell::new(SyncPhase::Synced),
127 }
128 }
129
130 fn apply(&self, input: Self::Input, _: &LocalTracksConstraints) {
131 let new_media_exchange_state = media_exchange_state::Stable::from(
132 input.media_direction.is_recv_enabled(),
133 );
134 let current_media_exchange_state = match self.enabled_individual.state()
135 {
136 MediaExchangeState::Transition(transition) => {
137 transition.into_inner()
138 }
139 MediaExchangeState::Stable(stable) => stable,
140 };
141 if current_media_exchange_state != new_media_exchange_state {
142 self.enabled_individual.update(new_media_exchange_state);
143 }
144
145 self.enabled_general.set(media_exchange_state::Stable::from(
146 input.media_direction.is_enabled_general(),
147 ));
148 self.media_direction.set(input.media_direction.into());
149
150 self.sync_phase.set(SyncPhase::Synced);
151 }
152}
153
154impl Updatable for State {
155 fn when_stabilized(&self) -> AllProcessed<'static> {
158 let controller = Rc::clone(&self.enabled_individual);
159 when_all_processed(iter::once(
160 Processed::new(Box::new(move || {
161 let controller = Rc::clone(&controller);
162 Box::pin(async move {
163 controller.when_stabilized().await;
164 })
165 }))
166 .into(),
167 ))
168 }
169
170 fn when_updated(&self) -> AllProcessed<'static> {
173 when_all_processed(vec![
174 self.enabled_individual.when_processed().into(),
175 self.enabled_general.when_all_processed().into(),
176 ])
177 }
178
179 fn connection_lost(&self) {
181 self.sync_phase.set(SyncPhase::Desynced);
182 }
183
184 fn connection_recovered(&self) {
186 self.sync_phase.set(SyncPhase::Syncing);
187 }
188}
189
190impl From<&State> for proto::state::Receiver {
191 fn from(from: &State) -> Self {
192 Self {
193 id: from.id,
194 connection_mode: from.connection_mode,
195 mid: from.mid.clone(),
196 media_type: from.media_type.clone(),
197 sender_id: from.sender_id.clone(),
198 media_direction: from.media_direction().into(),
199 muted: false,
200 }
201 }
202}
203
204impl State {
205 #[must_use]
207 pub fn new(
208 id: TrackId,
209 mid: Option<String>,
210 media_type: MediaType,
211 media_direction: medea_client_api_proto::MediaDirection,
212 muted: bool,
213 sender: MemberId,
214 connection_mode: ConnectionMode,
215 ) -> Self {
216 Self {
217 id,
218 mid,
219 media_type,
220 sender_id: sender,
221 enabled_individual: MediaExchangeStateController::new(
222 media_direction.is_recv_enabled().into(),
223 ),
224 enabled_general: ProgressableCell::new(
225 media_direction.is_enabled_general().into(),
226 ),
227 muted: ObservableCell::new(muted),
228 sync_phase: ObservableCell::new(SyncPhase::Synced),
229 connection_mode,
230 media_direction: ObservableCell::new(media_direction.into()),
231 }
232 }
233
234 #[must_use]
236 pub const fn id(&self) -> TrackId {
237 self.id
238 }
239
240 #[must_use]
242 pub fn mid(&self) -> Option<&str> {
243 self.mid.as_deref()
244 }
245
246 #[must_use]
248 pub const fn media_type(&self) -> &MediaType {
249 &self.media_type
250 }
251
252 #[must_use]
255 pub const fn sender_id(&self) -> &MemberId {
256 &self.sender_id
257 }
258
259 #[must_use]
261 pub fn enabled_individual(&self) -> bool {
262 self.enabled_individual.enabled()
263 }
264
265 #[must_use]
267 pub fn enabled_general(&self) -> bool {
268 self.enabled_general.get() == media_exchange_state::Stable::Enabled
269 }
270
271 #[must_use]
273 pub fn muted(&self) -> bool {
274 self.muted.get()
275 }
276
277 #[must_use]
279 pub fn media_direction(&self) -> MediaDirection {
280 self.media_direction.get()
281 }
282
283 pub fn update(&self, track_patch: &TrackPatchEvent) {
285 if self.id != track_patch.id {
286 return;
287 }
288 if let Some(direction) = track_patch.media_direction {
289 self.enabled_general.set(direction.is_enabled_general().into());
290
291 self.enabled_individual.update(direction.is_recv_enabled().into());
292 }
293 if let Some(muted) = track_patch.muted {
294 self.muted.set(muted);
295 }
296 if let Some(direction) = track_patch.media_direction {
297 self.media_direction.set(direction.into());
298 }
299 }
300}
301
302#[watchers]
303impl Component {
304 #[watch(self.enabled_general.subscribe())]
312 async fn general_media_exchange_state_changed(
313 receiver: Rc<Receiver>,
314 st: Rc<State>,
315 state: Guarded<media_exchange_state::Stable>,
316 ) {
317 let (state, _guard) = state.into_parts();
318 receiver
319 .enabled_general
320 .set(state == media_exchange_state::Stable::Enabled);
321 if (st.connection_mode, state)
322 == (ConnectionMode::Mesh, media_exchange_state::Stable::Disabled)
323 {
324 let sub_recv = {
325 receiver
326 .transceiver
327 .borrow()
328 .as_ref()
329 .map(|trnscvr| trnscvr.set_recv(false))
330 };
331 if let Some(fut) = sub_recv {
332 fut.await;
333 }
334 } else {
335 let add_recv = receiver
336 .transceiver
337 .borrow()
338 .as_ref()
339 .map(|trnscvr| trnscvr.set_recv(true));
340 if let Some(fut) = add_recv {
341 fut.await;
342 }
343 }
344 receiver.maybe_notify_track().await;
345 }
346
347 #[watch(self.enabled_individual.subscribe_stable())]
352 fn enabled_individual_stable_state_changed(
353 receiver: &Receiver,
354 _: &State,
355 state: media_exchange_state::Stable,
356 ) {
357 receiver
358 .enabled_individual
359 .set(state == media_exchange_state::Stable::Enabled);
360 }
361
362 #[watch(self.enabled_individual.subscribe_transition())]
370 fn enabled_individual_transition_started(
371 receiver: &Receiver,
372 _: &State,
373 state: media_exchange_state::Transition,
374 ) {
375 receiver.send_media_exchange_state_intention(state);
376 }
377
378 #[watch(self.muted.subscribe())]
383 fn mute_state_changed(receiver: &Receiver, _: &State, muted: bool) {
384 receiver.muted.set(muted);
385 if let Some(track) = receiver.track.borrow().as_ref() {
386 track.set_muted(muted);
387 }
388 }
389
390 #[watch(self.sync_phase.subscribe().skip(1))]
395 fn sync_phase_watcher(
396 receiver: &Receiver,
397 state: &State,
398 sync_phase: SyncPhase,
399 ) {
400 match sync_phase {
401 SyncPhase::Synced => {
402 if let MediaExchangeState::Transition(transition) =
403 state.enabled_individual.state()
404 {
405 receiver.send_media_exchange_state_intention(transition);
406 }
407 state.enabled_individual.reset_transition_timeout();
408 }
409 SyncPhase::Desynced => {
410 state.enabled_individual.stop_transition_timeout();
411 }
412 SyncPhase::Syncing => (),
413 }
414 }
415
416 #[watch(self.media_direction.subscribe())]
418 fn direction_watcher(
419 receiver: &Receiver,
420 _: &State,
421 direction: MediaDirection,
422 ) {
423 receiver.set_media_direction(direction);
424 }
425}
426
427impl MediaStateControllable for State {
428 fn media_exchange_state_controller(
429 &self,
430 ) -> Rc<MediaExchangeStateController> {
431 Rc::clone(&self.enabled_individual)
432 }
433
434 fn mute_state_controller(&self) -> Rc<MuteStateController> {
435 unreachable!("Receivers muting is not implemented");
447 }
448}
449
450impl TransceiverSide for State {
451 fn track_id(&self) -> TrackId {
452 self.id
453 }
454
455 fn kind(&self) -> MediaKind {
456 match &self.media_type {
457 MediaType::Audio(_) => MediaKind::Audio,
458 MediaType::Video(_) => MediaKind::Video,
459 }
460 }
461
462 fn source_kind(&self) -> MediaSourceKind {
463 match &self.media_type {
464 MediaType::Audio(_) => MediaSourceKind::Device,
465 MediaType::Video(video) => video.source_kind,
466 }
467 }
468
469 fn is_transitable(&self) -> bool {
470 true
471 }
472}
473
#[cfg(feature = "mockable")]
#[expect(clippy::allow_attributes, reason = "`#[expect]` is not considered")]
#[allow(clippy::multiple_inherent_impl, reason = "feature gated")]
impl State {
    /// Completes a pending transition of the individual media exchange
    /// state.
    ///
    /// If a transition is in progress, both the individual and the general
    /// states are set to the state intended by it. Otherwise nothing
    /// happens.
    pub fn stabilize(&self) {
        let MediaExchangeState::Transition(transition) =
            self.enabled_individual.state()
        else {
            return;
        };

        self.enabled_individual.update(transition.intended());
        self.enabled_general.set(transition.intended());
    }

    /// Marks this [`State`] as [`SyncPhase::Synced`].
    pub fn synced(&self) {
        self.sync_phase.set(SyncPhase::Synced);
    }
}