1use std::{
4 cell::{Cell, RefCell},
5 rc::Rc,
6};
7
8use futures::{StreamExt as _, future::LocalBoxFuture};
9use medea_client_api_proto::{
10 self as proto, MediaDirection, MediaSourceKind, MediaType, MemberId,
11 TrackId, TrackPatchEvent,
12};
13use medea_macro::watchers;
14use medea_reactive::{AllProcessed, Guarded, ObservableCell, ProgressableCell};
15use proto::ConnectionMode;
16use tracerr::Traced;
17
18use super::Sender;
19use crate::{
20 media::{LocalTracksConstraints, MediaKind, TrackConstraints, VideoSource},
21 peer::{
22 MediaExchangeStateController, MediaState, MediaStateControllable,
23 MuteStateController, RtcPeerConnectionError, TransceiverSide,
24 UpdateLocalStreamError,
25 component::SyncPhase,
26 media::{
27 InTransition as _, MediaExchangeState, MuteState,
28 ProhibitedStateError, media_exchange_state, mute_state,
29 },
30 },
31 platform::transceiver::probe_target_codecs,
32 utils::{AsProtoState, SynchronizableState, Updatable, component},
33};
34
35#[derive(Clone, Debug)]
42enum LocalTrackState {
43 Stable,
47
48 NeedUpdate,
52
53 Failed(Traced<UpdateLocalStreamError>),
62}
63
64impl PartialEq for LocalTrackState {
65 fn eq(&self, other: &Self) -> bool {
66 match self {
67 Self::NeedUpdate => matches!(other, Self::NeedUpdate),
68 Self::Stable => matches!(other, Self::Stable),
69 Self::Failed(_) => matches!(other, Self::Failed(_)),
70 }
71 }
72}
73
74pub type Component = component::Component<State, Sender>;
77
78#[derive(Debug)]
80pub struct State {
81 id: TrackId,
85
86 mid: Option<String>,
91
92 media_type: MediaType,
96
97 receivers: RefCell<Vec<MemberId>>,
101
102 send_encodings: ProgressableCell<Vec<proto::EncodingParameters>>,
106
107 enabled_individual: Rc<MediaExchangeStateController>,
112
113 muted: Rc<MuteStateController>,
117
118 enabled_general: ProgressableCell<media_exchange_state::Stable>,
123
124 media_direction: Cell<MediaDirection>,
126
127 send_constraints: LocalTracksConstraints,
132
133 connection_mode: ConnectionMode,
139
140 local_track: ObservableCell<LocalTrackState>,
144
145 sync_phase: ObservableCell<SyncPhase>,
147}
148
149impl AsProtoState for State {
150 type Output = proto::state::Sender;
151
152 fn as_proto(&self) -> Self::Output {
153 Self::Output {
154 id: self.id,
155 connection_mode: self.connection_mode,
156 mid: self.mid.clone(),
157 media_type: self.media_type.clone(),
158 receivers: self.receivers.borrow().clone(),
159 media_direction: self.media_direction.get(),
160 muted: self.muted.muted(),
161 }
162 }
163}
164
165impl SynchronizableState for State {
166 type Input = proto::state::Sender;
167
168 fn from_proto(
169 input: Self::Input,
170 send_constraints: &LocalTracksConstraints,
171 ) -> Self {
172 Self {
173 id: input.id,
174 mid: input.mid,
175 media_type: input.media_type,
176 receivers: RefCell::new(input.receivers),
177 muted: MuteStateController::new(mute_state::Stable::from(
178 input.muted,
179 )),
180 enabled_individual: MediaExchangeStateController::new(
181 media_exchange_state::Stable::from(
182 input.media_direction.is_send_enabled(),
183 ),
184 ),
185 enabled_general: ProgressableCell::new(
186 media_exchange_state::Stable::from(
187 input.media_direction.is_enabled_general(),
188 ),
189 ),
190 media_direction: Cell::new(input.media_direction),
191 send_constraints: send_constraints.clone(),
192 connection_mode: input.connection_mode,
193 local_track: ObservableCell::new(LocalTrackState::Stable),
194 sync_phase: ObservableCell::new(SyncPhase::Synced),
195 send_encodings: ProgressableCell::new(Vec::new()),
196 }
197 }
198
199 fn apply(&self, input: Self::Input, _: &LocalTracksConstraints) {
200 let new_media_exchange_state = media_exchange_state::Stable::from(
201 input.media_direction.is_send_enabled(),
202 );
203 let current_media_exchange_state = match self.enabled_individual.state()
204 {
205 MediaExchangeState::Transition(transition) => {
206 transition.into_inner()
207 }
208 MediaExchangeState::Stable(stable) => stable,
209 };
210 if current_media_exchange_state != new_media_exchange_state {
211 self.enabled_individual.update(new_media_exchange_state);
212 }
213
214 let new_mute_state = mute_state::Stable::from(input.muted);
215 let current_mute_state = match self.muted.state() {
216 MuteState::Stable(stable) => stable,
217 MuteState::Transition(transition) => transition.into_inner(),
218 };
219 if current_mute_state != new_mute_state {
220 self.muted.update(new_mute_state);
221 }
222
223 let new_general_media_exchange_state =
224 media_exchange_state::Stable::from(
225 input.media_direction.is_enabled_general(),
226 );
227 self.enabled_general.set(new_general_media_exchange_state);
228
229 self.sync_phase.set(SyncPhase::Synced);
230 }
231}
232
233impl Updatable for State {
234 fn when_stabilized(&self) -> AllProcessed<'static> {
237 medea_reactive::when_all_processed(vec![
238 Rc::clone(&self.enabled_individual).when_stabilized().into(),
239 Rc::clone(&self.muted).when_stabilized().into(),
240 ])
241 }
242
243 fn when_updated(&self) -> AllProcessed<'static> {
246 medea_reactive::when_all_processed(vec![
247 self.enabled_individual.when_processed().into(),
248 self.muted.when_processed().into(),
249 self.enabled_general.when_all_processed().into(),
250 self.send_encodings.when_all_processed().into(),
251 ])
252 }
253
254 fn connection_lost(&self) {
256 self.sync_phase.set(SyncPhase::Desynced);
257 }
258
259 fn connection_recovered(&self) {
261 self.sync_phase.set(SyncPhase::Syncing);
262 }
263}
264
265impl From<&State> for proto::state::Sender {
266 fn from(state: &State) -> Self {
267 Self {
268 id: state.id,
269 connection_mode: state.connection_mode,
270 mid: state.mid.clone(),
271 media_type: state.media_type.clone(),
272 receivers: state.receivers.borrow().clone(),
273 media_direction: state.media_direction.get(),
274 muted: state.muted.muted(),
275 }
276 }
277}
278
279impl State {
280 #[expect(clippy::too_many_arguments, reason = "not a problem")]
282 #[must_use]
283 pub fn new(
284 id: TrackId,
285 mid: Option<String>,
286 media_type: MediaType,
287 media_direction: MediaDirection,
288 muted: bool,
289 receivers: Vec<MemberId>,
290 send_constraints: LocalTracksConstraints,
291 connection_mode: ConnectionMode,
292 ) -> Self {
293 Self {
294 id,
295 mid,
296 media_type,
297 receivers: RefCell::new(receivers),
298 send_encodings: ProgressableCell::new(Vec::new()),
299 enabled_individual: MediaExchangeStateController::new(
300 media_exchange_state::Stable::from(
301 media_direction.is_send_enabled(),
302 ),
303 ),
304 enabled_general: ProgressableCell::new(
305 media_exchange_state::Stable::from(
306 media_direction.is_enabled_general(),
307 ),
308 ),
309 media_direction: Cell::new(media_direction),
310 muted: MuteStateController::new(mute_state::Stable::from(muted)),
311 sync_phase: ObservableCell::new(SyncPhase::Synced),
312 send_constraints,
313 connection_mode,
314 local_track: ObservableCell::new(LocalTrackState::Stable),
315 }
316 }
317
318 #[must_use]
321 pub fn enabled(&self) -> bool {
322 self.enabled_individual.enabled()
323 }
324
325 #[must_use]
327 pub const fn id(&self) -> TrackId {
328 self.id
329 }
330
331 #[must_use]
333 pub fn mid(&self) -> Option<&str> {
334 self.mid.as_deref()
335 }
336
337 #[must_use]
339 pub const fn media_type(&self) -> &MediaType {
340 &self.media_type
341 }
342
343 #[must_use]
346 pub fn receivers(&self) -> Vec<MemberId> {
347 self.receivers.borrow().clone()
348 }
349
350 #[must_use]
352 pub fn is_enabled_individual(&self) -> bool {
353 self.enabled_individual.enabled()
354 }
355
356 #[must_use]
358 pub fn is_enabled_general(&self) -> bool {
359 self.enabled_general.get() == media_exchange_state::Stable::Enabled
360 }
361
362 #[must_use]
364 pub fn is_muted(&self) -> bool {
365 self.muted.muted()
366 }
367
368 pub fn local_stream_update_result(
375 &self,
376 ) -> LocalBoxFuture<'static, Result<(), Traced<UpdateLocalStreamError>>>
377 {
378 let mut local_track_state_rx = self.local_track.subscribe();
379 Box::pin(async move {
380 while let Some(s) = local_track_state_rx.next().await {
381 match s {
382 LocalTrackState::Stable => return Ok(()),
383 LocalTrackState::Failed(err) => {
384 return Err(tracerr::new!(err));
385 }
386 LocalTrackState::NeedUpdate => (),
387 }
388 }
389
390 Ok(())
391 })
392 }
393
394 pub fn update(&self, track_patch: TrackPatchEvent) {
396 if track_patch.id != self.id {
397 return;
398 }
399 if let Some(direction) = track_patch.media_direction {
400 self.media_direction.set(direction);
401 self.enabled_general.set((direction.is_enabled_general()).into());
402
403 self.enabled_individual.update(direction.is_send_enabled().into());
404 }
405 if let Some(muted) = track_patch.muted {
406 self.muted.update(mute_state::Stable::from(muted));
407 }
408 if let Some(receivers) = track_patch.receivers {
409 *self.receivers.borrow_mut() = receivers;
410 }
411 if let Some(enc_params) = track_patch.encoding_parameters {
412 drop(self.send_encodings.replace(enc_params));
413 }
414 }
415
416 #[must_use]
418 pub fn is_local_stream_update_needed(&self) -> bool {
419 matches!(self.local_track.get(), LocalTrackState::NeedUpdate)
420 }
421
422 pub fn failed_local_stream_update(
425 &self,
426 error: Traced<UpdateLocalStreamError>,
427 ) {
428 self.local_track.set(LocalTrackState::Failed(error));
429 }
430
431 pub fn local_stream_updated(&self) {
433 self.local_track.set(LocalTrackState::Stable);
434 }
435
436 #[must_use]
438 pub const fn media_kind(&self) -> MediaKind {
439 match &self.media_type {
440 MediaType::Audio(_) => MediaKind::Audio,
441 MediaType::Video(_) => MediaKind::Video,
442 }
443 }
444
445 #[must_use]
447 pub const fn media_source(&self) -> MediaSourceKind {
448 match &self.media_type {
449 MediaType::Audio(_) => MediaSourceKind::Device,
450 MediaType::Video(video) => video.source_kind,
451 }
452 }
453}
454
455#[watchers]
456impl Component {
457 #[watch(self.enabled_individual.subscribe_transition())]
465 fn enabled_individual_transition_started(
466 sender: &Sender,
467 _: &State,
468 new_state: media_exchange_state::Transition,
469 ) {
470 sender.send_media_exchange_state_intention(new_state);
471 }
472
473 #[watch(self.muted.subscribe_transition())]
480 fn mute_state_transition_watcher(
481 sender: &Sender,
482 _: &State,
483 new_state: mute_state::Transition,
484 ) {
485 sender.send_mute_state_intention(new_state);
486 }
487
488 #[watch(self.enabled_general.subscribe())]
496 async fn enabled_general_state_changed(
497 sender: Rc<Sender>,
498 state: Rc<State>,
499 new_state: Guarded<media_exchange_state::Stable>,
500 ) {
501 let (new_state, _guard) = new_state.into_parts();
502 sender
503 .enabled_general
504 .set(new_state == media_exchange_state::Stable::Enabled);
505
506 if state.connection_mode == ConnectionMode::Sfu {
507 sender.transceiver.set_send(true).await;
509 } else {
510 match new_state {
511 media_exchange_state::Stable::Enabled => {
512 if sender.enabled_in_cons() {
513 sender.transceiver.set_send(true).await;
514 }
515 }
516 media_exchange_state::Stable::Disabled => {
517 sender.transceiver.set_send(false).await;
518 }
519 }
520 }
521 }
522
523 #[watch(self.enabled_individual.subscribe_stable())]
534 async fn enabled_individual_stable_state_changed(
535 sender: Rc<Sender>,
536 state: Rc<State>,
537 new_state: media_exchange_state::Stable,
538 ) {
539 sender
540 .enabled_individual
541 .set(new_state == media_exchange_state::Stable::Enabled);
542 match new_state {
543 media_exchange_state::Stable::Enabled => {
544 state.local_track.set(LocalTrackState::NeedUpdate);
545 }
546 media_exchange_state::Stable::Disabled => {
547 sender.remove_track().await;
548 }
549 }
550 }
551
552 #[watch(self.muted.subscribe_stable())]
559 fn mute_state_stable_watcher(
560 sender: &Sender,
561 _: &State,
562 new_state: mute_state::Stable,
563 ) {
564 sender.muted.set(new_state == mute_state::Stable::Muted);
565 if let Some(track) = sender.track.borrow().as_ref() {
566 match new_state {
567 mute_state::Stable::Muted => {
568 track.set_enabled(false);
569 }
570 mute_state::Stable::Unmuted => {
571 track.set_enabled(true);
572 }
573 }
574 }
575 }
576
577 #[watch(self.sync_phase.subscribe().skip(1))]
582 fn sync_phase_watcher(
583 sender: &Sender,
584 state: &State,
585 sync_phase: SyncPhase,
586 ) {
587 match sync_phase {
588 SyncPhase::Synced => {
589 if let MediaExchangeState::Transition(transition) =
590 state.enabled_individual.state()
591 {
592 sender.send_media_exchange_state_intention(transition);
593 }
594 if let MuteState::Transition(transition) = state.muted.state() {
595 sender.send_mute_state_intention(transition);
596 }
597 state.enabled_individual.reset_transition_timeout();
598 state.muted.reset_transition_timeout();
599 }
600 SyncPhase::Desynced => {
601 state.enabled_individual.stop_transition_timeout();
602 state.muted.stop_transition_timeout();
603 }
604 SyncPhase::Syncing => (),
605 }
606 }
607
608 #[expect(clippy::needless_pass_by_value, reason = "required by macro")]
610 #[watch(self.local_track.subscribe())]
611 fn local_track_state_changed(
612 _: &Sender,
613 state: &State,
614 new_state: LocalTrackState,
615 ) -> Result<(), Traced<ProhibitedStateError>> {
616 if matches!(new_state, LocalTrackState::Failed(_)) {
617 state.media_state_transition_to(MediaState::MediaExchange(
618 media_exchange_state::Stable::Disabled,
619 ))?;
620 }
621 Ok(())
622 }
623
624 #[watch(self.send_encodings.subscribe().skip(1))]
626 async fn send_encodings_updated(
627 sender: Rc<Sender>,
628 _: Rc<State>,
629 enc_params: Guarded<Vec<proto::EncodingParameters>>,
630 ) -> Result<(), Traced<RtcPeerConnectionError>> {
631 let (enc_params, _guard) = enc_params.into_parts();
632
633 let target_codecs = probe_target_codecs(
634 enc_params.iter().filter_map(|e| e.codec.as_ref()),
635 )
636 .await;
637 if let Some(target_codecs) = target_codecs {
644 sender.transceiver.set_codec_preferences(target_codecs);
645 } else {
646 sender.transceiver.set_codec_preferences(Vec::new());
648 }
649
650 sender
651 .transceiver
652 .update_send_encodings(&enc_params)
653 .await
654 .map_err(RtcPeerConnectionError::UpdateSendEncodingsError)
655 .map_err(tracerr::wrap!())
656 }
657}
658
659impl TransceiverSide for State {
660 fn track_id(&self) -> TrackId {
661 self.id
662 }
663
664 fn kind(&self) -> MediaKind {
665 self.media_kind()
666 }
667
668 fn source_kind(&self) -> MediaSourceKind {
669 self.media_source()
670 }
671
672 fn is_transitable(&self) -> bool {
673 let caps = TrackConstraints::from(self.media_type.clone());
674 match &caps {
675 TrackConstraints::Video(VideoSource::Device(_)) => {
676 self.send_constraints.inner().get_device_video().is_some()
677 }
678 TrackConstraints::Video(VideoSource::Display(_)) => {
679 self.send_constraints.inner().get_display_video().is_some()
680 }
681 TrackConstraints::Audio(_) => true,
682 }
683 }
684}
685
686impl MediaStateControllable for State {
687 fn media_exchange_state_controller(
688 &self,
689 ) -> Rc<MediaExchangeStateController> {
690 Rc::clone(&self.enabled_individual)
691 }
692
693 fn mute_state_controller(&self) -> Rc<MuteStateController> {
694 Rc::clone(&self.muted)
695 }
696
697 fn media_state_transition_to(
698 &self,
699 desired_state: MediaState,
700 ) -> Result<(), Traced<ProhibitedStateError>> {
701 if self.media_type.required()
702 && matches!(
703 desired_state,
704 MediaState::Mute(mute_state::Stable::Muted)
705 | MediaState::MediaExchange(
706 media_exchange_state::Stable::Disabled
707 )
708 )
709 {
710 Err(tracerr::new!(
711 ProhibitedStateError::CannotDisableRequiredSender
712 ))
713 } else {
714 match desired_state {
715 MediaState::MediaExchange(desired_state) => {
716 self.media_exchange_state_controller()
717 .transition_to(desired_state);
718 }
719 MediaState::Mute(desired_state) => {
720 self.mute_state_controller().transition_to(desired_state);
721 }
722 }
723 Ok(())
724 }
725 }
726}
727
#[cfg(feature = "mockable")]
#[expect(clippy::allow_attributes, reason = "`#[expect]` is not considered")]
#[allow(clippy::multiple_inherent_impl, reason = "feature gated")]
impl State {
    /// Forces this [`State`] into [`SyncPhase::Synced`].
    ///
    /// Only available with the `mockable` feature.
    pub fn synced(&self) {
        let phase = SyncPhase::Synced;
        self.sync_phase.set(phase);
    }
}