medea_jason/peer/component/
mod.rs

1//! Implementation of a [`Component`].
2
3mod ice_candidates;
4mod local_sdp;
5mod tracks_repository;
6mod watchers;
7
8use std::{cell::Cell, collections::HashSet, rc::Rc};
9
10use futures::{StreamExt as _, TryFutureExt as _, future::LocalBoxFuture};
11pub use local_sdp::DESCRIPTION_APPROVE_TIMEOUT;
12use medea_client_api_proto::{
13    self as proto, IceCandidate, IceServer, NegotiationRole, PeerId as Id,
14    TrackId,
15};
16use medea_reactive::{AllProcessed, ObservableCell, ProgressableCell};
17use proto::{ConnectionMode, MemberId};
18use tracerr::Traced;
19
20use self::{
21    ice_candidates::IceCandidates, local_sdp::LocalSdp,
22    tracks_repository::TracksRepository,
23};
24use crate::{
25    media::LocalTracksConstraints,
26    peer::{
27        LocalStreamUpdateCriteria, PeerConnection, UpdateLocalStreamError,
28        media::{receiver, sender},
29    },
30    utils::{AsProtoState, SynchronizableState, Updatable, component},
31};
32
/// Possible synchronization phases of [`Component`]'s state.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SyncPhase {
    /// State is desynced and should be synced on RPC reconnection.
    ///
    /// Entered on [`Updatable::connection_lost()`].
    Desynced,

    /// State is being synced with a Media Server state.
    ///
    /// Entered on [`Updatable::connection_recovered()`].
    Syncing,

    /// State is synced.
    ///
    /// This is the initial phase of a newly created [`State`].
    Synced,
}
45
46/// Possible negotiation phases of a [`Component`].
47///
48/// ```ignore
49///           +--------+
50///           |        |
51/// +-------->+ Stable +<----------+
52/// |         |        |           |
53/// |         +---+----+           |
54/// |             |                |
55/// |             v                |
56/// |      +------+-------+        |
57/// |      |              |        |
58/// |      | WaitLocalSdp +<----+  |
59/// |      |              |     |  |
60/// |      +------+-------+     |  |
61/// |             |             |  |
62/// |             v             |  |
63/// |  +----------+----------+  |  |
64/// |  |                     |  |  |
65/// +--+ WaitLocalSdpApprove +--+  |
66///    |                     |     |
67///    +----------+----------+     |
68///               |                |
69///               v                |
70///       +-------+-------+        |
71///       |               |        |
72///       | WaitRemoteSdp |        |
73///       |               |        |
74///       +-------+-------+        |
75///               |                |
76///               |                |
77///               +----------------+
78/// ```
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum NegotiationPhase {
    /// [`Component`] is new or negotiation is completed.
    Stable,

    /// [`Component`] waits for a local SDP offer to be generated.
    WaitLocalSdp,

    /// [`Component`] waits for a local SDP to be approved by the server.
    WaitLocalSdpApprove,

    /// [`Component`] waits for a remote SDP offer.
    WaitRemoteSdp,
}
93
/// State of a [`Component`].
#[derive(Debug)]
pub struct State {
    /// ID of this [`Component`].
    id: Id,

    /// Indicator whether this `Peer` is working in a [P2P mesh] or [SFU] mode.
    ///
    /// [P2P mesh]: https://webrtcglossary.com/mesh
    /// [SFU]: https://webrtcglossary.com/sfu
    connection_mode: ConnectionMode,

    /// All [`sender::State`]s of this [`Component`].
    senders: TracksRepository<sender::State>,

    /// All [`receiver::State`]s of this [`Component`].
    receivers: TracksRepository<receiver::State>,

    /// Indicator whether this [`Component`] should relay all media through a
    /// TURN server forcibly.
    force_relay: bool,

    /// List of [`IceServer`]s which this [`Component`] should use.
    ice_servers: Vec<IceServer>,

    /// Current [`NegotiationRole`] of this [`Component`].
    ///
    /// [`State::set_negotiation_role()`] waits for this to become [`None`]
    /// before storing a new role.
    negotiation_role: ProgressableCell<Option<NegotiationRole>>,

    /// [`NegotiationPhase`] of this [`Component`].
    ///
    /// Starts as [`NegotiationPhase::Stable`].
    negotiation_phase: ObservableCell<NegotiationPhase>,

    /// Local session description of this [`Component`].
    local_sdp: LocalSdp,

    /// Remote session description of this [`Component`].
    ///
    /// [`None`] until a remote SDP is set.
    remote_sdp: ProgressableCell<Option<String>>,

    /// Indicates whether ICE restart should be performed.
    ///
    /// Starts as `false`.
    restart_ice: Cell<bool>,

    /// All [`IceCandidate`]s of this [`Component`].
    ice_candidates: IceCandidates,

    /// Indicator whether [`State::update_local_stream`] method should be
    /// called if some [`sender`] wants to update a local stream.
    ///
    /// Raised by [`State::patch_track()`] after a [`sender::State`] is
    /// patched.
    maybe_update_local_stream: ObservableCell<bool>,

    /// Indicator whether there is some information about tracks to provide
    /// into [`Connections`].
    ///
    /// Holds the patched [`TrackId`] along with the set of its receivers'
    /// [`MemberId`]s, as filled in by [`State::patch_track()`].
    ///
    /// [`Connections`]: crate::connection::Connections
    maybe_update_connections:
        ObservableCell<Option<(TrackId, HashSet<MemberId>)>>,

    /// [`SyncPhase`] of this [`Component`].
    ///
    /// Starts as [`SyncPhase::Synced`].
    sync_phase: ObservableCell<SyncPhase>,
}
151
152impl State {
153    /// Creates a new [`State`] with the provided data.
154    #[must_use]
155    pub fn new(
156        id: Id,
157        ice_servers: Vec<IceServer>,
158        force_relay: bool,
159        negotiation_role: Option<NegotiationRole>,
160        connection_mode: ConnectionMode,
161    ) -> Self {
162        Self {
163            id,
164            connection_mode,
165            senders: TracksRepository::new(),
166            receivers: TracksRepository::new(),
167            ice_servers,
168            force_relay,
169            remote_sdp: ProgressableCell::new(None),
170            local_sdp: LocalSdp::new(),
171            negotiation_role: ProgressableCell::new(negotiation_role),
172            negotiation_phase: ObservableCell::new(NegotiationPhase::Stable),
173            restart_ice: Cell::new(false),
174            ice_candidates: IceCandidates::new(),
175            maybe_update_local_stream: ObservableCell::new(false),
176            maybe_update_connections: ObservableCell::new(None),
177            sync_phase: ObservableCell::new(SyncPhase::Synced),
178        }
179    }
180
    /// Returns the [`ConnectionMode`] this [`State`] was created with.
    #[must_use]
    pub const fn connection_mode(&self) -> ConnectionMode {
        self.connection_mode
    }
186
    /// Returns the [`Id`] this [`State`] was created with.
    #[must_use]
    pub const fn id(&self) -> Id {
        self.id
    }
192
    /// Returns all the [`IceServer`]s this [`State`] was created with.
    #[must_use]
    pub const fn ice_servers(&self) -> &Vec<IceServer> {
        &self.ice_servers
    }
198
    /// Indicates whether all media of the [`PeerConnection`] should be
    /// forcibly relayed through a TURN server.
    #[must_use]
    pub const fn force_relay(&self) -> bool {
        self.force_relay
    }
204
    /// Inserts a new [`sender::State`] into this [`State`] under the provided
    /// [`TrackId`].
    pub fn insert_sender(&self, track_id: TrackId, sender: Rc<sender::State>) {
        self.senders.insert(track_id, sender);
    }
209
    /// Inserts a new [`receiver::State`] into this [`State`] under the
    /// provided [`TrackId`].
    pub fn insert_receiver(
        &self,
        track_id: TrackId,
        receiver: Rc<receiver::State>,
    ) {
        self.receivers.insert(track_id, receiver);
    }
218
    /// Returns [`Rc`] to the [`sender::State`] with the provided [`TrackId`].
    ///
    /// Only [`State::senders`] are looked up, so [`None`] is returned for a
    /// [`TrackId`] of a receiver.
    #[must_use]
    pub fn get_sender(&self, track_id: TrackId) -> Option<Rc<sender::State>> {
        self.senders.get(track_id)
    }
224
    /// Returns [`Rc`] to the [`receiver::State`] with the provided [`TrackId`].
    ///
    /// Only [`State::receivers`] are looked up, so [`None`] is returned for a
    /// [`TrackId`] of a sender.
    #[must_use]
    pub fn get_receiver(
        &self,
        track_id: TrackId,
    ) -> Option<Rc<receiver::State>> {
        self.receivers.get(track_id)
    }
233
    /// Returns [`TrackId`]s of all the [`sender::State`]s of this [`State`].
    pub fn get_send_tracks(&self) -> Vec<TrackId> {
        self.senders.ids()
    }
238
    /// Returns [`TrackId`]s of all the [`receiver::State`]s of this [`State`].
    pub fn get_recv_tracks(&self) -> Vec<TrackId> {
        self.receivers.ids()
    }
243
244    /// Returns all the [`TrackId`]s (sand and receiver) of the peer.
245    pub fn get_tracks(&self) -> Vec<TrackId> {
246        self.get_send_tracks()
247            .into_iter()
248            .chain(self.get_recv_tracks())
249            .collect()
250    }
251
    /// Sets [`NegotiationRole`] of this [`State`] to the provided one.
    ///
    /// Before setting, waits until the [`State::negotiation_role`]
    /// subscription yields [`None`], so a role that is still set is not
    /// overwritten.
    pub async fn set_negotiation_role(
        &self,
        negotiation_role: NegotiationRole,
    ) {
        // The outcome of the wait is intentionally ignored: the new role is
        // stored once the wait finishes, whatever its result.
        _ = self
            .negotiation_role
            .subscribe()
            .any(async |val| val.is_none())
            .await;
        self.negotiation_role.set(Some(negotiation_role));
    }
264
    /// Sets [`State::restart_ice`] to `true`, indicating that ICE restart
    /// should be performed.
    pub fn restart_ice(&self) {
        self.restart_ice.set(true);
    }
269
270    /// Removes [`sender::State`] or [`receiver::State`] with the provided
271    /// [`TrackId`].
272    pub fn remove_track(&self, track_id: TrackId) {
273        if !self.receivers.remove(track_id) {
274            _ = self.senders.remove(track_id);
275        }
276    }
277
    /// Sets the remote session description of this [`State`] to the provided
    /// value.
    pub fn set_remote_sdp(&self, sdp: String) {
        self.remote_sdp.set(Some(sdp));
    }
282
    /// Adds the provided [`IceCandidate`] to the [`State::ice_candidates`] of
    /// this [`State`].
    pub fn add_ice_candidate(&self, ice_candidate: IceCandidate) {
        self.ice_candidates.add(ice_candidate);
    }
287
    /// Marks the provided local SDP as approved by the server.
    pub fn apply_local_sdp(&self, sdp: String) {
        self.local_sdp.approved_set(sdp);
    }
292
    /// Stops all timeouts of the [`State`].
    ///
    /// Currently, this only stops the local SDP rollback timeout.
    pub fn stop_timeouts(&self) {
        self.local_sdp.stop_timeout();
    }
299
    /// Resumes all timeouts of the [`State`].
    ///
    /// Currently, this only resumes the local SDP rollback timeout.
    pub fn resume_timeouts(&self) {
        self.local_sdp.resume_timeout();
    }
306
307    /// Returns [`Future`] resolving once
308    /// [getUserMedia()][1]/[getDisplayMedia()][2] request for the provided
309    /// [`TrackId`]s is resolved.
310    ///
311    /// [`Result`] returned by this [`Future`] will be the same as the result of
312    /// the [getUserMedia()][1]/[getDisplayMedia()][2] request.
313    ///
314    /// Returns last known [getUserMedia()][1]/[getDisplayMedia()][2] request's
315    /// [`Result`], if currently no such requests are running for the provided
316    /// [`TrackId`]s.
317    ///
318    /// [1]: https://tinyurl.com/w3-streams#dom-mediadevices-getusermedia
319    /// [2]: https://w3.org/TR/screen-capture/#dom-mediadevices-getdisplaymedia
320    pub fn local_stream_update_result(
321        &self,
322        tracks_ids: HashSet<TrackId>,
323    ) -> LocalBoxFuture<'static, Result<(), Traced<UpdateLocalStreamError>>>
324    {
325        Box::pin(
326            self.senders
327                .local_stream_update_result(tracks_ids)
328                .map_err(tracerr::map_from_and_wrap!()),
329        )
330    }
331
332    /// Returns [`Future`] resolving when all [`sender::State`]'s and
333    /// [`receiver::State`]'s updates will be applied.
334    pub fn when_all_updated(&self) -> AllProcessed<'static> {
335        medea_reactive::when_all_processed(vec![
336            self.senders.when_updated().into(),
337            self.receivers.when_updated().into(),
338        ])
339    }
340
341    /// Updates a local `MediaStream` based on a
342    /// [`sender::State::is_local_stream_update_needed`].
343    ///
344    /// Resets a [`sender::State`] local stream update when it's updated.
345    async fn update_local_stream(
346        &self,
347        peer: &Rc<PeerConnection>,
348    ) -> Result<(), Traced<UpdateLocalStreamError>> {
349        let mut criteria = LocalStreamUpdateCriteria::empty();
350        let senders = self.senders.get_outdated();
351        for s in &senders {
352            criteria.add(s.media_kind(), s.media_source());
353        }
354        let res = peer
355            .update_local_stream(criteria)
356            .await
357            .map_err(tracerr::map_from_and_wrap!())
358            .map(drop);
359        for s in senders {
360            if let Err(err) = res.clone() {
361                s.failed_local_stream_update(err);
362            } else {
363                s.local_stream_updated();
364            }
365        }
366        res
367    }
368
369    /// Inserts the provided [`proto::Track`] to this [`State`].
370    pub fn insert_track(
371        &self,
372        track: &proto::Track,
373        send_constraints: LocalTracksConstraints,
374    ) {
375        match &track.direction {
376            proto::Direction::Send { receivers, mid } => {
377                self.senders.insert(
378                    track.id,
379                    Rc::new(sender::State::new(
380                        track.id,
381                        mid.clone(),
382                        track.media_type.clone(),
383                        track.media_direction,
384                        track.muted,
385                        receivers.clone(),
386                        send_constraints,
387                        self.connection_mode,
388                    )),
389                );
390            }
391            proto::Direction::Recv { sender, mid } => {
392                self.receivers.insert(
393                    track.id,
394                    Rc::new(receiver::State::new(
395                        track.id,
396                        mid.clone(),
397                        track.media_type.clone(),
398                        track.media_direction,
399                        track.muted,
400                        sender.clone(),
401                        self.connection_mode,
402                    )),
403                );
404            }
405        }
406    }
407
    /// Returns [`Future`] resolving once all [`State::senders`]' inserts and
    /// removes are processed.
    pub fn when_all_senders_processed(&self) -> AllProcessed<'static> {
        self.senders.when_all_processed()
    }
413
    /// Returns [`Future`] resolving once all [`State::receivers`]' inserts and
    /// removes are processed.
    ///
    /// Counterpart of [`State::when_all_senders_processed()`] for receivers.
    fn when_all_receivers_processed(&self) -> AllProcessed<'static> {
        self.receivers.when_all_processed()
    }
419
420    /// Patches [`sender::State`] or [`receiver::State`] with the provided
421    /// [`proto::TrackPatchEvent`].
422    ///
423    /// Schedules a local stream update.
424    pub async fn patch_track(&self, patch: proto::TrackPatchEvent) {
425        if let Some(receivers) = &patch.receivers {
426            _ = self.maybe_update_connections.when_eq(None).await;
427            self.maybe_update_connections
428                .set(Some((patch.id, receivers.clone().into_iter().collect())));
429        }
430
431        if let Some(sender) = self.get_sender(patch.id) {
432            sender.update(patch);
433            _ = self.maybe_update_local_stream.when_eq(false).await;
434            self.maybe_update_local_stream.set(true);
435        } else if let Some(receiver) = self.get_receiver(patch.id) {
436            receiver.update(&patch);
437        } else {
438            log::warn!("Cannot apply patch to `Track`: {}", patch.id.0);
439        }
440    }
441
    /// Returns the current SDP offer of this [`State`], as reported by its
    /// [`LocalSdp`].
    #[must_use]
    pub fn current_sdp_offer(&self) -> Option<String> {
        self.local_sdp.current()
    }
447}
448
449/// Component responsible for a [`PeerConnection`] updating.
450pub type Component = component::Component<State, PeerConnection>;
451
452impl AsProtoState for State {
453    type Output = proto::state::Peer;
454
455    fn as_proto(&self) -> Self::Output {
456        Self::Output {
457            id: self.id,
458            connection_mode: self.connection_mode,
459            senders: self.senders.as_proto(),
460            receivers: self.receivers.as_proto(),
461            ice_candidates: self.ice_candidates.as_proto(),
462            force_relay: self.force_relay,
463            ice_servers: self.ice_servers.clone(),
464            negotiation_role: self.negotiation_role.get(),
465            local_sdp: self.local_sdp.current(),
466            remote_sdp: self.remote_sdp.get(),
467            restart_ice: self.restart_ice.get(),
468        }
469    }
470}
471
472impl SynchronizableState for State {
473    type Input = proto::state::Peer;
474
    /// Builds a new [`State`] out of the provided [`proto::state::Peer`].
    ///
    /// Only the senders, receivers and ICE candidates are transferred on top
    /// of what [`State::new()`] accepts. SDPs and the ICE restart flag of the
    /// `input` are not read here.
    fn from_proto(
        input: Self::Input,
        send_constraints: &LocalTracksConstraints,
    ) -> Self {
        let state = Self::new(
            input.id,
            input.ice_servers,
            input.force_relay,
            input.negotiation_role,
            input.connection_mode,
        );

        #[expect(clippy::iter_over_hash_type, reason = "order doesn't matter")]
        for (id, sender) in input.senders {
            // Senders without any receivers are not inserted.
            if !sender.receivers.is_empty() {
                state.senders.insert(
                    id,
                    Rc::new(sender::State::from_proto(
                        sender,
                        send_constraints,
                    )),
                );
            }
        }
        #[expect(clippy::iter_over_hash_type, reason = "order doesn't matter")]
        for (id, receiver) in input.receivers {
            state.receivers.insert(
                id,
                Rc::new(receiver::State::from_proto(
                    receiver,
                    send_constraints,
                )),
            );
        }
        #[expect(clippy::iter_over_hash_type, reason = "order doesn't matter")]
        for ice_candidate in input.ice_candidates {
            state.ice_candidates.add(ice_candidate);
        }

        state
    }
516
    /// Applies the provided [`proto::state::Peer`] to this [`State`] and
    /// marks it as [`SyncPhase::Synced`].
    fn apply(&self, input: Self::Input, send_cons: &LocalTracksConstraints) {
        // An absent role in the `input` doesn't reset the current one.
        if input.negotiation_role.is_some() {
            self.negotiation_role.set(input.negotiation_role);
        }
        // The flag is only ever raised here, never cleared.
        if input.restart_ice {
            self.restart_ice.set(true);
        }
        if let Some(sdp_offer) = input.local_sdp {
            self.local_sdp.approved_set(sdp_offer);
        } else {
            // NOTE(review): presumably the server knows no local SDP of this
            //               peer, so it must be generated again — confirm.
            self.negotiation_phase.set(NegotiationPhase::WaitLocalSdp);
        }
        self.remote_sdp.set(input.remote_sdp);
        self.ice_candidates.apply(input.ice_candidates, send_cons);
        self.senders.apply(input.senders, send_cons);
        self.receivers.apply(input.receivers, send_cons);

        self.sync_phase.set(SyncPhase::Synced);
    }
536}
537
impl Updatable for State {
    /// Returns [`Future`] resolving once all the [`State::senders`] and
    /// [`State::receivers`] are stabilized.
    fn when_stabilized(&self) -> AllProcessed<'static> {
        medea_reactive::when_all_processed(vec![
            self.senders.when_stabilized().into(),
            self.receivers.when_stabilized().into(),
        ])
    }

    /// Returns [`Future`] resolving once updates of all the
    /// [`State::receivers`] and [`State::senders`] are applied.
    fn when_updated(&self) -> AllProcessed<'static> {
        medea_reactive::when_all_processed(vec![
            self.receivers.when_updated().into(),
            self.senders.when_updated().into(),
        ])
    }

    /// Marks this [`State`] as [`SyncPhase::Desynced`] and notifies all the
    /// [`State::senders`] and [`State::receivers`] about the connection loss.
    fn connection_lost(&self) {
        self.sync_phase.set(SyncPhase::Desynced);
        self.senders.connection_lost();
        self.receivers.connection_lost();
    }

    /// Marks this [`State`] as [`SyncPhase::Syncing`] and notifies all the
    /// [`State::senders`] and [`State::receivers`] about the connection
    /// recovery.
    fn connection_recovered(&self) {
        self.sync_phase.set(SyncPhase::Syncing);
        self.senders.connection_recovered();
        self.receivers.connection_recovered();
    }
}
565
#[cfg(feature = "mockable")]
// TODO: Try remove on next Rust version upgrade.
#[expect(clippy::allow_attributes, reason = "`#[expect]` is not considered")]
#[allow(clippy::multiple_inherent_impl, reason = "feature gated")]
impl State {
    /// Waits for a [`State::remote_sdp`] change to be applied.
    pub async fn when_remote_sdp_processed(&self) {
        self.remote_sdp.when_all_processed().await;
    }

    /// Resets a [`NegotiationRole`] of this [`State`] to [`None`].
    ///
    /// Also resets the [`State::negotiation_phase`] to the
    /// [`NegotiationPhase::Stable`].
    pub fn reset_negotiation_role(&self) {
        self.negotiation_phase.set(NegotiationPhase::Stable);
        self.negotiation_role.set(None);
    }

    /// Returns the current [`NegotiationRole`] of this [`State`].
    #[must_use]
    pub fn negotiation_role(&self) -> Option<NegotiationRole> {
        self.negotiation_role.get()
    }

    /// Returns a [`Future`] resolving once local SDP approve is needed.
    ///
    /// That is, once the [`State::negotiation_phase`] becomes
    /// [`NegotiationPhase::WaitLocalSdpApprove`].
    pub fn when_local_sdp_approve_needed(
        &self,
    ) -> impl Future<Output = ()> + use<> {
        use futures::FutureExt as _;

        self.negotiation_phase
            .when_eq(NegotiationPhase::WaitLocalSdpApprove)
            .map(drop)
    }

    /// Stabilizes all [`receiver::State`]s of this [`State`].
    ///
    /// [`sender::State`]s are not touched.
    pub fn stabilize_all(&self) {
        self.receivers.stabilize_all();
    }

    /// Waits until a [`State::local_sdp`] is resolved and returns its new
    /// value.
    pub async fn when_local_sdp_updated(&self) -> Option<String> {
        use futures::StreamExt as _;

        // NOTE(review): the first item of the subscription is skipped,
        //               presumably because it's the current value rather than
        //               an update — confirm.
        self.local_sdp.subscribe().skip(1).next().await.flatten()
    }

    /// Waits until all [`State::senders`]' and [`State::receivers`]' inserts
    /// are processed.
    pub async fn when_all_tracks_created(&self) {
        medea_reactive::when_all_processed(vec![
            self.senders.when_insert_processed().into(),
            self.receivers.when_insert_processed().into(),
        ])
        .await;
    }

    /// Sets [`State::sync_phase`] to the [`SyncPhase::Synced`].
    ///
    /// All the [`State::senders`] and [`State::receivers`] are marked as
    /// synced beforehand.
    pub fn synced(&self) {
        self.senders.synced();
        self.receivers.synced();
        self.sync_phase.set(SyncPhase::Synced);
    }
}