Skip to main content

medea_jason/peer/
mod.rs

1//! Adapters to [RTCPeerConnection][1] and related objects.
2//!
3//! [1]: https://w3.org/TR/webrtc#rtcpeerconnection-interface
4
5mod component;
6pub mod media;
7pub mod repo;
8mod stream_update_criteria;
9mod tracks_request;
10
11use std::{
12    cell::{Cell, RefCell},
13    collections::{HashMap, hash_map::DefaultHasher},
14    hash::{Hash as _, Hasher as _},
15    rc::Rc,
16};
17
18use derive_more::with_trait::{Display, From};
19use futures::{StreamExt as _, channel::mpsc, future};
20use medea_client_api_proto::{
21    Command, ConnectionMode, IceConnectionState, MediaSourceKind, MemberId,
22    PeerConnectionState, PeerId as Id, PeerId, TrackId, TrackPatchCommand,
23    stats::StatId,
24};
25use medea_macro::dispatchable;
26use tracerr::Traced;
27
28#[doc(inline)]
29pub use self::{
30    component::{Component, DESCRIPTION_APPROVE_TIMEOUT, State},
31    media::{
32        GetMidsError, InsertLocalTracksError, MediaConnections,
33        MediaExchangeState, MediaExchangeStateController, MediaState,
34        MediaStateControllable, MuteState, MuteStateController,
35        ProhibitedStateError, TrackDirection, TransceiverSide,
36        TransitableState, TransitableStateController, media_exchange_state,
37        mute_state, receiver, sender,
38    },
39    platform::RtcPeerConnectionError,
40    stream_update_criteria::LocalStreamUpdateCriteria,
41    tracks_request::{SimpleTracksRequest, TracksRequest, TracksRequestError},
42};
43use crate::{
44    connection::Connections,
45    media::{
46        InitLocalTracksError, LocalTracksConstraints, MediaKind, MediaManager,
47        MediaStreamSettings, RecvConstraints,
48        track::{local, remote},
49    },
50    platform,
51    utils::Caused,
52};
53
54/// Errors occurring in [`PeerConnection::update_local_stream()`] method.
55#[derive(Caused, Clone, Debug, Display, From)]
56#[cause(error = platform::Error)]
57pub enum UpdateLocalStreamError {
58    /// Errors occurred when [`TracksRequest`] validation fails.
59    InvalidLocalTracks(TracksRequestError),
60
61    /// [`MediaManager`] failed to acquire [`local::Track`]s.
62    CouldNotGetLocalMedia(#[cause] InitLocalTracksError),
63
64    /// Errors occurred in [`MediaConnections::insert_local_tracks()`] method.
65    InsertLocalTracksError(#[cause] InsertLocalTracksError),
66}
67
68/// Events emitted from a [`Sender`] or a [`Receiver`].
69///
70/// [`Receiver`]: receiver::Receiver
71/// [`Sender`]: sender::Sender
72#[derive(Clone, Copy, Debug)]
73pub enum TrackEvent {
74    /// Intention of the `MediaTrack` to mute/unmute himself.
75    MuteUpdateIntention {
76        /// ID of the `MediaTrack` which sends this intention.
77        id: TrackId,
78
79        /// The muting intention itself.
80        muted: bool,
81    },
82
83    /// Intention of the `MediaTrack` to enabled/disable himself.
84    MediaExchangeIntention {
85        /// ID of the `MediaTrack` which sends this intention.
86        id: TrackId,
87
88        /// The enabling/disabling intention itself.
89        enabled: bool,
90    },
91}
92
93/// Local media update errors that [`PeerConnection`] reports in
94/// [`PeerEvent::FailedLocalMedia`] messages.
95#[derive(Caused, Clone, Debug, Display, From)]
96#[cause(error = platform::Error)]
97pub enum LocalMediaError {
98    /// Error occurred in [`PeerConnection::update_local_stream()`] method.
99    UpdateLocalStreamError(#[cause] UpdateLocalStreamError),
100
101    /// Error occurred when creating a new [`Sender`].
102    ///
103    /// [`Sender`]: sender::Sender
104    SenderCreateError(sender::CreateError),
105}
106
107/// Events emitted from [`platform::RtcPeerConnection`].
108#[dispatchable(self: &Self, async_trait(?Send))]
109#[derive(Clone, Debug)]
110pub enum PeerEvent {
111    /// [`platform::RtcPeerConnection`] discovered new ICE candidate.
112    ///
113    /// Wrapper around [RTCPeerConnectionIceEvent][1].
114    ///
115    /// [1]: https://w3.org/TR/webrtc#rtcpeerconnectioniceevent
116    IceCandidateDiscovered {
117        /// ID of the [`PeerConnection`] that discovered new ICE candidate.
118        peer_id: Id,
119
120        /// [`candidate` field][2] of the discovered [RTCIceCandidate][1].
121        ///
122        /// [1]: https://w3.org/TR/webrtc#dom-rtcicecandidate
123        /// [2]: https://w3.org/TR/webrtc#dom-rtcicecandidate-candidate
124        candidate: String,
125
126        /// [`sdpMLineIndex` field][2] of the discovered [RTCIceCandidate][1].
127        ///
128        /// [1]: https://w3.org/TR/webrtc#dom-rtcicecandidate
129        /// [2]: https://w3.org/TR/webrtc#dom-rtcicecandidate-sdpmlineindex
130        sdp_m_line_index: Option<u16>,
131
132        /// [`sdpMid` field][2] of the discovered [RTCIceCandidate][1].
133        ///
134        /// [1]: https://w3.org/TR/webrtc#dom-rtcicecandidate
135        /// [2]: https://w3.org/TR/webrtc#dom-rtcicecandidate-sdpmid
136        sdp_mid: Option<String>,
137    },
138
139    /// Error occurred with an [ICE] candidate from a [`PeerConnection`].
140    ///
141    /// [ICE]: https://webrtcglossary.com/ice
142    IceCandidateError {
143        /// ID of the [`PeerConnection`] that errored.
144        peer_id: Id,
145
146        /// Local IP address used to communicate with a [STUN]/[TURN]
147        /// server.
148        ///
149        /// [STUN]: https://webrtcglossary.com/stun
150        /// [TURN]: https://webrtcglossary.com/turn
151        address: Option<String>,
152
153        /// Port used to communicate with a [STUN]/[TURN] server.
154        ///
155        /// [STUN]: https://webrtcglossary.com/stun
156        /// [TURN]: https://webrtcglossary.com/turn
157        port: Option<u32>,
158
159        /// URL identifying the [STUN]/[TURN] server for which the failure
160        /// occurred.
161        ///
162        /// [STUN]: https://webrtcglossary.com/stun
163        /// [TURN]: https://webrtcglossary.com/turn
164        url: String,
165
166        /// Numeric [STUN] error code returned by the [STUN]/[TURN] server.
167        ///
168        /// If no host candidate can reach the server, this error code will be
169        /// set to the value `701`, which is outside the [STUN] error code
170        /// range. This error is only fired once per server URL while in the
171        /// `RTCIceGatheringState` of "gathering".
172        ///
173        /// [STUN]: https://webrtcglossary.com/stun
174        /// [TURN]: https://webrtcglossary.com/turn
175        error_code: i32,
176
177        /// [STUN] reason text returned by the [STUN]/[TURN] server.
178        ///
179        /// If the server could not be reached, this reason test will be set to
180        /// an implementation-specific value providing details about
181        /// the error.
182        ///
183        /// [STUN]: https://webrtcglossary.com/stun
184        /// [TURN]: https://webrtcglossary.com/turn
185        error_text: String,
186    },
187
188    /// [`platform::RtcPeerConnection`] received a new [`remote::Track`] from
189    /// a remote sender.
190    NewRemoteTrack {
191        /// Remote `Member` ID.
192        sender_id: MemberId,
193
194        /// Received [`remote::Track`].
195        track: remote::Track,
196    },
197
198    /// [`platform::RtcPeerConnection`] sent new local track to remote members.
199    NewLocalTrack {
200        /// Local [`local::Track`] that is sent to remote members.
201        local_track: Rc<local::Track>,
202    },
203
204    /// [`platform::RtcPeerConnection`]'s [ICE connection][1] state changed.
205    ///
206    /// [1]: https://w3.org/TR/webrtc#dfn-ice-connection-state
207    IceConnectionStateChanged {
208        /// ID of the [`PeerConnection`] that sends
209        /// [`iceconnectionstatechange`][1] event.
210        ///
211        /// [1]: https://w3.org/TR/webrtc#event-iceconnectionstatechange
212        peer_id: Id,
213
214        /// New [`IceConnectionState`].
215        ice_connection_state: IceConnectionState,
216    },
217
218    /// [`platform::RtcPeerConnection`]'s [connection][1] state changed.
219    ///
220    /// [1]: https://w3.org/TR/webrtc#dom-peerconnection-connection-state
221    PeerConnectionStateChanged {
222        /// ID of the [`PeerConnection`] that sends
223        /// [`connectionstatechange`][1] event.
224        ///
225        /// [1]: https://w3.org/TR/webrtc#event-connectionstatechange
226        peer_id: Id,
227
228        /// New [`PeerConnectionState`].
229        peer_connection_state: PeerConnectionState,
230    },
231
232    /// [`platform::RtcPeerConnection`]'s [iceGatheringState][1] property has
233    /// changed.
234    ///
235    /// [1]: https://w3.org/TR/webrtc#dom-peerconnection-ice-gathering-state
236    IceGatheringStateChanged {
237        /// ID of the [`PeerConnection`] that sends
238        /// [`icegatheringstatechange`][1] event.
239        ///
240        /// [1]: https://w3.org/TR/webrtc#event-icegatheringstatechange
241        peer_id: Id,
242
243        /// New [`platform::IceGatheringState`].
244        ice_gathering_state: platform::IceGatheringState,
245    },
246
247    /// [`platform::RtcPeerConnection`]'s [`platform::RtcStats`] update.
248    StatsUpdate {
249        /// ID of the [`PeerConnection`] for which [` platform::RtcStats`] was
250        /// sent.
251        peer_id: Id,
252
253        /// [` platform::RtcStats`] of this [`PeerConnection`].
254        stats: platform::RtcStats,
255    },
256
257    /// [`PeerConnection::update_local_stream`] was failed, so
258    /// `on_failed_local_stream` callback should be called.
259    FailedLocalMedia {
260        /// Reasons of local media updating fail.
261        error: Traced<LocalMediaError>,
262    },
263
264    /// [`Component`] generated a new SDP answer.
265    NewSdpAnswer {
266        /// ID of the [`PeerConnection`] for which SDP answer was generated.
267        peer_id: PeerId,
268
269        /// SDP Answer of the `Peer`.
270        sdp_answer: String,
271
272        /// Statuses of `Peer` transceivers.
273        transceivers_statuses: HashMap<TrackId, bool>,
274    },
275
276    /// [`Component`] generated a new SDP offer.
277    NewSdpOffer {
278        /// ID of the [`PeerConnection`] for which SDP offer was generated.
279        peer_id: PeerId,
280
281        /// SDP Offer of the [`PeerConnection`].
282        sdp_offer: String,
283
284        /// Associations between `Track` and transceiver's
285        /// [media description][1].
286        ///
287        /// `mid` is basically an ID of [`m=<media>` section][1] in SDP.
288        ///
289        /// [1]: https://tools.ietf.org/html/rfc4566#section-5.14
290        mids: HashMap<TrackId, String>,
291
292        /// Statuses of [`PeerConnection`] transceivers.
293        transceivers_statuses: HashMap<TrackId, bool>,
294    },
295
296    /// [`Component`] resends his intentions.
297    MediaUpdateCommand {
298        /// Actual intentions of the [`Component`].
299        command: Command,
300    },
301}
302
303/// High-level wrapper around a [`platform::RtcPeerConnection`].
304#[derive(Debug)]
305pub struct PeerConnection {
306    /// Unique ID of [`PeerConnection`].
307    id: Id,
308
309    /// Underlying [`platform::RtcPeerConnection`].
310    peer: Rc<platform::RtcPeerConnection>,
311
312    /// [`sender::Component`]s and [`receiver::Component`]s of this
313    /// [`platform::RtcPeerConnection`].
314    media_connections: Rc<MediaConnections>,
315
316    /// [`MediaManager`] that will be used to acquire [`local::Track`]s.
317    media_manager: Rc<MediaManager>,
318
319    /// [`PeerEvent`]s tx.
320    peer_events_sender: Rc<mpsc::UnboundedSender<PeerEvent>>,
321
322    /// Indicator whether the underlying [`platform::RtcPeerConnection`] has a
323    /// remote description.
324    has_remote_description: Cell<bool>,
325
326    /// Buffer of [`platform::IceCandidate`]s received before a remote
327    /// description for the underlying [`platform::RtcPeerConnection`].
328    ice_candidates_buffer: RefCell<Vec<platform::IceCandidate>>,
329
330    /// Last hashes of all the [`platform::RtcStats`] which were already sent
331    /// to a server, so we won't duplicate stats that were already sent.
332    ///
333    /// Stores precomputed hashes, since we don't need access to actual stats
334    /// values.
335    sent_stats_cache: RefCell<HashMap<StatId, u64>>,
336
337    /// Local media stream constraints used in this [`PeerConnection`].
338    send_constraints: LocalTracksConstraints,
339
340    /// Collection of [`Connection`]s with a remote `Member`s.
341    ///
342    /// [`Connection`]: crate::connection::Connection
343    connections: Rc<Connections>,
344
345    /// Sender for the [`TrackEvent`]s which should be processed by this
346    /// [`PeerConnection`].
347    track_events_sender: mpsc::UnboundedSender<TrackEvent>,
348
349    /// Constraints to the [`remote::Track`] from this [`PeerConnection`]. Used
350    /// to disable or enable media receiving.
351    recv_constraints: Rc<RecvConstraints>,
352}
353
354impl PeerConnection {
355    /// Creates a new [`PeerConnection`].
356    ///
357    /// Provided `peer_events_sender` will be used to emit [`PeerEvent`]s from
358    /// this peer.
359    ///
360    /// Provided `ice_servers` will be used by the created
361    /// [`platform::RtcPeerConnection`].
362    ///
363    /// # Errors
364    ///
365    /// Errors with an [`RtcPeerConnectionError::PeerCreationError`] if
366    /// [`platform::RtcPeerConnection`] creating fails.
367    pub async fn new(
368        state: &State,
369        peer_events_sender: mpsc::UnboundedSender<PeerEvent>,
370        media_manager: Rc<MediaManager>,
371        send_constraints: LocalTracksConstraints,
372        connections: Rc<Connections>,
373        recv_constraints: Rc<RecvConstraints>,
374    ) -> Result<Rc<Self>, Traced<RtcPeerConnectionError>> {
375        let peer = Rc::new(
376            platform::RtcPeerConnection::new(
377                state.ice_servers().clone(),
378                state.force_relay(),
379            )
380            .await
381            .map_err(tracerr::map_from_and_wrap!())?,
382        );
383        let (track_events_sender, mut track_events_rx) = mpsc::unbounded();
384        let media_connections = Rc::new(MediaConnections::new(
385            Rc::clone(&peer),
386            peer_events_sender.clone(),
387        ));
388
389        platform::spawn({
390            let peer_events_sender = peer_events_sender.clone();
391            let peer_id = state.id();
392
393            async move {
394                while let Some(e) = track_events_rx.next().await {
395                    Self::handle_track_event(peer_id, &peer_events_sender, e);
396                }
397            }
398        });
399
400        let peer = Self {
401            id: state.id(),
402            peer,
403            media_connections,
404            media_manager,
405            peer_events_sender: Rc::new(peer_events_sender),
406            sent_stats_cache: RefCell::new(HashMap::new()),
407            has_remote_description: Cell::new(false),
408            ice_candidates_buffer: RefCell::new(Vec::new()),
409            send_constraints,
410            connections,
411            track_events_sender,
412            recv_constraints,
413        };
414
415        peer.bind_event_listeners(state);
416
417        Ok(Rc::new(peer))
418    }
419
420    /// Binds all the necessary event listeners to this [`PeerConnection`].
421    fn bind_event_listeners(&self, state: &State) {
422        // Bind to `icecandidate` event.
423        {
424            let id = self.id;
425            let weak_sender = Rc::downgrade(&self.peer_events_sender);
426            self.peer.on_ice_candidate(Some(move |candidate| {
427                if let Some(sender) = weak_sender.upgrade() {
428                    Self::on_ice_candidate(id, &sender, candidate);
429                }
430            }));
431        }
432
433        // Bind to `icecandidateerror` event.
434        {
435            let id = self.id;
436            let weak_sender = Rc::downgrade(&self.peer_events_sender);
437            self.peer.on_ice_candidate_error(Some(move |error| {
438                if let Some(sender) = weak_sender.upgrade() {
439                    Self::on_ice_candidate_error(id, &sender, error);
440                }
441            }));
442        }
443
444        // Bind to `iceconnectionstatechange` event.
445        {
446            let id = self.id;
447            let weak_sender = Rc::downgrade(&self.peer_events_sender);
448            self.peer.on_ice_connection_state_change(Some(
449                move |ice_connection_state| {
450                    if let Some(sender) = weak_sender.upgrade() {
451                        Self::on_ice_connection_state_changed(
452                            id,
453                            &sender,
454                            ice_connection_state,
455                        );
456                    }
457                },
458            ));
459        }
460
461        // Bind to `connectionstatechange` event.
462        {
463            let id = self.id;
464            let weak_sender = Rc::downgrade(&self.peer_events_sender);
465            self.peer.on_connection_state_change(Some(
466                move |peer_connection_state| {
467                    if let Some(sender) = weak_sender.upgrade() {
468                        Self::on_connection_state_changed(
469                            id,
470                            &sender,
471                            peer_connection_state,
472                        );
473                    }
474                },
475            ));
476        }
477
478        // Bind to `track` event.
479        {
480            let media_conns = Rc::downgrade(&self.media_connections);
481            let connection_mode = state.connection_mode();
482            self.peer.on_track(Some(move |track, transceiver| {
483                if let Some(c) = media_conns.upgrade() {
484                    platform::spawn(async move {
485                        if let (Err(mid), ConnectionMode::Mesh) = (
486                            c.add_remote_track(track, transceiver).await,
487                            connection_mode,
488                        ) {
489                            log::error!(
490                                "Cannot add new remote track with mid={mid}",
491                            );
492                        }
493                    });
494                }
495            }));
496        }
497
498        {
499            let id = self.id;
500            let weak_sender = Rc::downgrade(&self.peer_events_sender);
501            self.peer.on_ice_gathering_state_change(Some(
502                move |ice_gathering_state_change| {
503                    if let Some(sender) = weak_sender.upgrade() {
504                        Self::on_ice_gathering_state_change(
505                            id,
506                            &sender,
507                            ice_gathering_state_change,
508                        );
509                    }
510                },
511            ));
512        }
513    }
514
515    /// Handles [`TrackEvent`]s emitted from a [`Sender`] or a [`Receiver`].
516    ///
517    /// Sends a [`PeerEvent::MediaUpdateCommand`] with a
518    /// [`Command::UpdateTracks`] on [`TrackEvent::MediaExchangeIntention`] and
519    /// [`TrackEvent::MuteUpdateIntention`].
520    ///
521    /// [`Sender`]: sender::Sender
522    /// [`Receiver`]: receiver::Receiver
523    fn handle_track_event(
524        peer_id: PeerId,
525        peer_events_sender: &mpsc::UnboundedSender<PeerEvent>,
526        event: TrackEvent,
527    ) {
528        let patch = match event {
529            TrackEvent::MediaExchangeIntention { id, enabled } => {
530                TrackPatchCommand { id, muted: None, enabled: Some(enabled) }
531            }
532            TrackEvent::MuteUpdateIntention { id, muted } => {
533                TrackPatchCommand { id, muted: Some(muted), enabled: None }
534            }
535        };
536
537        _ = peer_events_sender
538            .unbounded_send(PeerEvent::MediaUpdateCommand {
539                command: Command::UpdateTracks {
540                    peer_id,
541                    tracks_patches: vec![patch],
542                },
543            })
544            .ok();
545    }
546
547    /// Returns all [`TrackId`]s of [`Sender`]s that match the provided
548    /// [`LocalStreamUpdateCriteria`] and don't have [`local::Track`].
549    ///
550    /// [`Sender`]: sender::Sender
551    #[must_use]
552    pub fn get_senders_without_tracks_ids(
553        &self,
554        kinds: LocalStreamUpdateCriteria,
555    ) -> Vec<TrackId> {
556        self.media_connections.get_senders_without_tracks_ids(kinds)
557    }
558
559    /// Drops [`local::Track`]s of all [`Sender`]s which are matches provided
560    /// [`LocalStreamUpdateCriteria`].
561    ///
562    /// [`Sender`]: sender::Sender
563    pub async fn drop_send_tracks(&self, kinds: LocalStreamUpdateCriteria) {
564        self.media_connections.drop_send_tracks(kinds).await;
565    }
566
567    /// Filters out already sent stats, and send new stats from the provided
568    /// [`platform::RtcStats`].
569    pub fn send_peer_stats(&self, stats: platform::RtcStats) {
570        let mut stats_cache = self.sent_stats_cache.borrow_mut();
571        let stats = platform::RtcStats(
572            stats
573                .0
574                .into_iter()
575                .filter(|stat| {
576                    let mut hasher = DefaultHasher::new();
577                    stat.stats.hash(&mut hasher);
578                    let stat_hash = hasher.finish();
579
580                    #[expect( // false positive
581                        clippy::option_if_let_else,
582                        reason = "false positive: &mut"
583                    )]
584                    if let Some(last_hash) = stats_cache.get_mut(&stat.id) {
585                        if *last_hash == stat_hash {
586                            false
587                        } else {
588                            *last_hash = stat_hash;
589                            true
590                        }
591                    } else {
592                        _ = stats_cache.insert(stat.id.clone(), stat_hash);
593                        true
594                    }
595                })
596                .collect(),
597        );
598
599        if !stats.0.is_empty() {
600            drop(self.peer_events_sender.unbounded_send(
601                PeerEvent::StatsUpdate { peer_id: self.id, stats },
602            ));
603        }
604    }
605
606    /// Sends [`platform::RtcStats`] update of this [`PeerConnection`] to a
607    /// server.
608    pub async fn scrape_and_send_peer_stats(&self) {
609        match self.peer.get_stats().await {
610            Ok(stats) => self.send_peer_stats(stats),
611            Err(e) => log::error!("{e}"),
612        }
613    }
614
615    /// Indicates whether all [`TransceiverSide`]s with the provided
616    /// [`MediaKind`], [`TrackDirection`] and [`MediaSourceKind`] are in the
617    /// provided [`MediaState`].
618    #[must_use]
619    pub fn is_all_transceiver_sides_in_media_state(
620        &self,
621        kind: MediaKind,
622        direction: TrackDirection,
623        source_kind: Option<MediaSourceKind>,
624        state: MediaState,
625    ) -> bool {
626        self.media_connections.is_all_tracks_in_media_state(
627            kind,
628            direction,
629            source_kind,
630            state,
631        )
632    }
633
634    /// Returns the [`PeerId`] of this [`PeerConnection`].
635    pub const fn id(&self) -> PeerId {
636        self.id
637    }
638
639    /// Handle `icecandidate` event from the underlying peer emitting
640    /// [`PeerEvent::IceCandidateDiscovered`] event into this peer's
641    /// `peer_events_sender`.
642    fn on_ice_candidate(
643        id: Id,
644        sender: &mpsc::UnboundedSender<PeerEvent>,
645        candidate: platform::IceCandidate,
646    ) {
647        drop(sender.unbounded_send(PeerEvent::IceCandidateDiscovered {
648            peer_id: id,
649            candidate: candidate.candidate,
650            sdp_m_line_index: candidate.sdp_m_line_index,
651            sdp_mid: candidate.sdp_mid,
652        }));
653    }
654
655    /// Handle `icecandidateerror` event from the underlying peer emitting
656    /// [`PeerEvent::IceCandidateError`] event into this peer's
657    /// `peer_events_sender`.
658    fn on_ice_candidate_error(
659        id: Id,
660        sender: &mpsc::UnboundedSender<PeerEvent>,
661        error: platform::IceCandidateError,
662    ) {
663        drop(sender.unbounded_send(PeerEvent::IceCandidateError {
664            peer_id: id,
665            address: error.address,
666            port: error.port,
667            url: error.url,
668            error_code: error.error_code,
669            error_text: error.error_text,
670        }));
671    }
672
673    /// Handle `iceconnectionstatechange` event from the underlying peer
674    /// emitting [`PeerEvent::IceConnectionStateChanged`] event into this peer's
675    /// `peer_events_sender`.
676    fn on_ice_connection_state_changed(
677        peer_id: Id,
678        sender: &mpsc::UnboundedSender<PeerEvent>,
679        ice_connection_state: IceConnectionState,
680    ) {
681        drop(sender.unbounded_send(PeerEvent::IceConnectionStateChanged {
682            peer_id,
683            ice_connection_state,
684        }));
685    }
686
687    /// Handles `connectionstatechange` event from the underlying peer emitting
688    /// [`PeerEvent::PeerConnectionStateChanged`] event into this peer's
689    /// `peer_events_sender`.
690    fn on_connection_state_changed(
691        peer_id: Id,
692        sender: &mpsc::UnboundedSender<PeerEvent>,
693        peer_connection_state: PeerConnectionState,
694    ) {
695        drop(sender.unbounded_send(PeerEvent::PeerConnectionStateChanged {
696            peer_id,
697            peer_connection_state,
698        }));
699    }
700
701    /// Handles `icegatheringstatechange` event from the underlying peer
702    /// emitting [`PeerEvent::IceGatheringStateChanged`] event into this peer's
703    /// `peer_events_sender`.
704    fn on_ice_gathering_state_change(
705        peer_id: Id,
706        sender: &mpsc::UnboundedSender<PeerEvent>,
707        ice_gathering_state: platform::IceGatheringState,
708    ) {
709        drop(sender.unbounded_send(PeerEvent::IceGatheringStateChanged {
710            peer_id,
711            ice_gathering_state,
712        }));
713    }
714
715    /// Marks [`PeerConnection`] to trigger ICE restart.
716    ///
717    /// After this function returns, the generated offer is automatically
718    /// configured to trigger ICE restart.
719    fn restart_ice(&self) {
720        self.peer.restart_ice();
721    }
722
723    /// Returns all [`TransceiverSide`]s from this [`PeerConnection`] with
724    /// provided [`MediaKind`], [`TrackDirection`] and [`MediaSourceKind`].
725    pub fn get_transceivers_sides(
726        &self,
727        kind: MediaKind,
728        direction: TrackDirection,
729        source_kind: Option<MediaSourceKind>,
730    ) -> Vec<Rc<dyn TransceiverSide>> {
731        self.media_connections.get_transceivers_sides(
732            kind,
733            direction,
734            source_kind,
735        )
736    }
737
738    /// Track id to mid relations of all send tracks of this
739    /// [`platform::RtcPeerConnection`]. mid is id of [`m= section`][1]. mids
740    /// are received directly from registered [`RTCRtpTransceiver`][2]s, and
741    /// are being allocated on SDP update.
742    ///
743    /// # Errors
744    ///
745    /// Errors if finds transceiver without mid, so must be called after setting
746    /// local description if offerer, and remote if answerer.
747    ///
748    /// [1]: https://tools.ietf.org/html/rfc4566#section-5.14
749    /// [2]: https://w3.org/TR/webrtc#rtcrtptransceiver-interface
750    fn get_mids(
751        &self,
752    ) -> Result<HashMap<TrackId, String>, Traced<GetMidsError>> {
753        self.media_connections.get_mids().map_err(tracerr::wrap!())
754    }
755
756    /// Returns publishing statuses of the all [`Sender`]s from this
757    /// [`MediaConnections`].
758    ///
759    /// [`Sender`]: sender::Sender
760    async fn get_transceivers_statuses(&self) -> HashMap<TrackId, bool> {
761        self.media_connections.get_transceivers_statuses().await
762    }
763
764    /// Updates [`local::Track`]s being used in [`PeerConnection`]s [`Sender`]s.
765    /// [`Sender`]s are chosen based on the provided
766    /// [`LocalStreamUpdateCriteria`].
767    ///
768    /// First of all makes sure that [`PeerConnection`] [`Sender`]s are
769    /// up-to-date and synchronized with a real object state. If there are no
770    /// [`Sender`]s configured in this [`PeerConnection`], then this method is
771    /// no-op.
772    ///
773    /// Secondly, make sure that configured [`LocalTracksConstraints`] are up to
774    /// date.
775    ///
776    /// This function requests local stream from [`MediaManager`]. If stream
777    /// returned from [`MediaManager`] is considered new, then this function
778    /// will emit [`PeerEvent::NewLocalTrack`] events.
779    ///
780    /// Constraints being used when requesting stream from [`MediaManager`] are
781    /// a result of merging constraints received from this [`PeerConnection`]
782    /// [`Sender`]s, which are configured by server during signalling, and
783    /// [`LocalTracksConstraints`].
784    ///
785    /// Returns [`HashMap`] with [`media_exchange_state::Stable`]s updates for
786    /// the [`Sender`]s.
787    ///
788    /// # Errors
789    ///
790    /// With an [`UpdateLocalStreamError::InvalidLocalTracks`] if the current
791    /// state of the [`PeerConnection`]'s [`Sender`]s cannot be represented as
792    /// [`SimpleTracksRequest`] (max 1 audio [`Sender`] and max 2 video
793    /// [`Sender`]s), or the [`local::Track`]s requested from the
794    /// [`MediaManager`] doesn't satisfy [`Sender`]'s constraints.
795    ///
796    /// With an [`UpdateLocalStreamError::CouldNotGetLocalMedia`] if the
797    /// [`local::Track`]s cannot be obtained from the UA.
798    ///
799    /// With an [`UpdateLocalStreamError::InvalidLocalTracks`] if the
800    /// [`local::Track`]s cannot be inserted into [`PeerConnection`]s
801    /// [`Sender`]s.
802    ///
803    /// [`Sender`]: sender::Sender
804    /// [1]: https://w3.org/TR/mediacapture-streams#mediastream
805    /// [2]: https://w3.org/TR/webrtc#rtcpeerconnection-interface
806    pub async fn update_local_stream(
807        &self,
808        criteria: LocalStreamUpdateCriteria,
809    ) -> Result<
810        HashMap<TrackId, media_exchange_state::Stable>,
811        Traced<UpdateLocalStreamError>,
812    > {
813        self.inner_update_local_stream(criteria).await.inspect_err(|e| {
814            drop(self.peer_events_sender.unbounded_send(
815                PeerEvent::FailedLocalMedia {
816                    error: tracerr::map_from(e.clone()),
817                },
818            ));
819        })
820    }
821
822    /// Returns [`MediaStreamSettings`] for the provided [`MediaKind`] and
823    /// [`MediaSourceKind`].
824    ///
825    /// If [`MediaSourceKind`] is [`None`] then [`MediaStreamSettings`] for all
826    /// [`MediaSourceKind`]s will be provided.
827    ///
828    /// # Errors
829    ///
830    /// Errors with a [`TracksRequestError`] if failed to create or merge
831    /// [`SimpleTracksRequest`].
832    pub fn get_media_settings(
833        &self,
834        kind: MediaKind,
835        source_kind: Option<MediaSourceKind>,
836    ) -> Result<Option<MediaStreamSettings>, Traced<TracksRequestError>> {
837        let mut criteria = LocalStreamUpdateCriteria::empty();
838        if let Some(msk) = source_kind {
839            criteria.add(kind, msk);
840        } else {
841            criteria.add(kind, MediaSourceKind::Device);
842            criteria.add(kind, MediaSourceKind::Display);
843        }
844
845        self.get_simple_tracks_request(criteria)
846            .map_err(tracerr::map_from_and_wrap!())
847            .map(|opt| opt.map(|s| MediaStreamSettings::from(&s)))
848    }
849
850    /// Returns [`SimpleTracksRequest`] for the provided
851    /// [`LocalStreamUpdateCriteria`].
852    ///
853    /// # Errors
854    ///
855    /// Errors with a [`TracksRequestError`] if failed to create or merge
856    /// [`SimpleTracksRequest`].
857    fn get_simple_tracks_request(
858        &self,
859        criteria: LocalStreamUpdateCriteria,
860    ) -> Result<Option<SimpleTracksRequest>, Traced<TracksRequestError>> {
861        let Some(request) = self.media_connections.get_tracks_request(criteria)
862        else {
863            return Ok(None);
864        };
865        let mut required_caps = SimpleTracksRequest::try_from(request)
866            .map_err(tracerr::from_and_wrap!())?;
867        required_caps
868            .merge(self.send_constraints.inner())
869            .map_err(tracerr::map_from_and_wrap!())?;
870
871        Ok(Some(required_caps))
872    }
873
874    /// Implementation of the [`PeerConnection::update_local_stream`] method.
875    async fn inner_update_local_stream(
876        &self,
877        criteria: LocalStreamUpdateCriteria,
878    ) -> Result<
879        HashMap<TrackId, media_exchange_state::Stable>,
880        Traced<UpdateLocalStreamError>,
881    > {
882        if let Some(required_caps) = self
883            .get_simple_tracks_request(criteria)
884            .map_err(tracerr::map_from_and_wrap!())?
885        {
886            let used_caps = MediaStreamSettings::from(&required_caps);
887
888            let media_tracks = self
889                .media_manager
890                .get_tracks(used_caps)
891                .await
892                .map_err(tracerr::map_from_and_wrap!())?;
893            let peer_tracks = required_caps
894                .parse_tracks(
895                    media_tracks.iter().map(|(t, _)| t).cloned().collect(),
896                )
897                .await
898                .map_err(tracerr::map_from_and_wrap!())?;
899
900            let media_exchange_states_updates = self
901                .media_connections
902                .insert_local_tracks(&peer_tracks)
903                .await
904                .map_err(tracerr::map_from_and_wrap!())?;
905
906            for (local_track, is_new) in media_tracks {
907                if is_new {
908                    drop(self.peer_events_sender.unbounded_send(
909                        PeerEvent::NewLocalTrack { local_track },
910                    ));
911                }
912            }
913
914            Ok(media_exchange_states_updates)
915        } else {
916            Ok(HashMap::new())
917        }
918    }
919
920    /// Returns [`Rc`] to [`TransceiverSide`] with a provided [`TrackId`].
921    ///
922    /// Returns [`None`] if [`TransceiverSide`] with a provided [`TrackId`]
923    /// doesn't exist in this [`PeerConnection`].
924    pub fn get_transceiver_side_by_id(
925        &self,
926        track_id: TrackId,
927    ) -> Option<Rc<dyn TransceiverSide>> {
928        self.media_connections.get_transceiver_side_by_id(track_id)
929    }
930
931    /// Updates underlying [RTCPeerConnection][1]'s remote SDP from answer.
932    ///
933    /// # Errors
934    ///
935    /// With [`RtcPeerConnectionError::SetRemoteDescriptionFailed`][3] if
936    /// [RTCPeerConnection.setRemoteDescription()][2] fails.
937    ///
938    /// [1]: https://w3.org/TR/webrtc#rtcpeerconnection-interface
939    /// [2]: https://w3.org/TR/webrtc#dom-peerconnection-setremotedescription
940    /// [3]: platform::RtcPeerConnectionError::SetRemoteDescriptionFailed
941    async fn set_remote_answer(
942        &self,
943        answer: String,
944    ) -> Result<(), Traced<RtcPeerConnectionError>> {
945        self.set_remote_description(platform::SdpType::Answer(answer))
946            .await
947            .map_err(tracerr::wrap!())
948    }
949
950    /// Updates underlying [RTCPeerConnection][1]'s remote SDP from offer.
951    ///
952    /// # Errors
953    ///
954    /// With [`platform::RtcPeerConnectionError::SetRemoteDescriptionFailed`] if
955    /// [RTCPeerConnection.setRemoteDescription()][2] fails.
956    ///
957    /// [1]: https://w3.org/TR/webrtc#rtcpeerconnection-interface
958    /// [2]: https://w3.org/TR/webrtc#dom-peerconnection-setremotedescription
959    async fn set_remote_offer(
960        &self,
961        offer: String,
962    ) -> Result<(), Traced<RtcPeerConnectionError>> {
963        self.set_remote_description(platform::SdpType::Offer(offer))
964            .await
965            .map_err(tracerr::wrap!())
966    }
967
968    /// Updates underlying [RTCPeerConnection][1]'s remote SDP with given
969    /// description.
970    ///
971    /// # Errors
972    ///
973    /// With [`platform::RtcPeerConnectionError::SetRemoteDescriptionFailed`] if
974    /// [RTCPeerConnection.setRemoteDescription()][2] fails.
975    ///
976    /// With [`platform::RtcPeerConnectionError::AddIceCandidateFailed`] if
977    /// [RtcPeerConnection.addIceCandidate()][3] fails when adding buffered ICE
978    /// candidates.
979    ///
980    /// [1]: https://w3.org/TR/webrtc#rtcpeerconnection-interface
981    /// [2]: https://w3.org/TR/webrtc#dom-peerconnection-setremotedescription
982    /// [3]: https://w3.org/TR/webrtc#dom-peerconnection-addicecandidate
983    async fn set_remote_description(
984        &self,
985        desc: platform::SdpType,
986    ) -> Result<(), Traced<RtcPeerConnectionError>> {
987        self.peer
988            .set_remote_description(desc)
989            .await
990            .map_err(tracerr::map_from_and_wrap!())?;
991        self.has_remote_description.set(true);
992        self.media_connections.sync_receivers().await;
993
994        let ice_candidates_buffer_flush_fut = future::try_join_all(
995            self.ice_candidates_buffer.borrow_mut().drain(..).map(
996                |candidate| {
997                    let peer = Rc::clone(&self.peer);
998                    async move {
999                        peer.add_ice_candidate(
1000                            &candidate.candidate,
1001                            candidate.sdp_m_line_index,
1002                            &candidate.sdp_mid,
1003                        )
1004                        .await
1005                    }
1006                },
1007            ),
1008        );
1009        ice_candidates_buffer_flush_fut
1010            .await
1011            .map(drop)
1012            .map_err(tracerr::map_from_and_wrap!())?;
1013
1014        Ok(())
1015    }
1016
1017    /// Adds remote peers [ICE Candidate][1] to this peer.
1018    ///
1019    /// # Errors
1020    ///
1021    /// With [`RtcPeerConnectionError::AddIceCandidateFailed`] if
1022    /// [RtcPeerConnection.addIceCandidate()][3] fails to add buffered
1023    /// [ICE candidates][1].
1024    ///
1025    /// [1]: https://tools.ietf.org/html/rfc5245#section-2
1026    /// [3]: https://w3.org/TR/webrtc#dom-peerconnection-addicecandidate
1027    pub async fn add_ice_candidate(
1028        &self,
1029        candidate: String,
1030        sdp_m_line_index: Option<u16>,
1031        sdp_mid: Option<String>,
1032    ) -> Result<(), Traced<RtcPeerConnectionError>> {
1033        if self.has_remote_description.get() {
1034            self.peer
1035                .add_ice_candidate(&candidate, sdp_m_line_index, &sdp_mid)
1036                .await
1037                .map_err(tracerr::map_from_and_wrap!())?;
1038        } else {
1039            self.ice_candidates_buffer.borrow_mut().push(
1040                platform::IceCandidate { candidate, sdp_m_line_index, sdp_mid },
1041            );
1042        }
1043        Ok(())
1044    }
1045
1046    /// Removes a [`sender::Component`] and a [`receiver::Component`] with the
1047    /// provided [`TrackId`] from this [`PeerConnection`].
1048    pub fn remove_track(&self, track_id: TrackId) {
1049        self.media_connections.remove_track(track_id);
1050    }
1051
1052    /// Returns the [`PeerConnectionState`] of this [`PeerConnection`].
1053    pub fn connection_state(&self) -> PeerConnectionState {
1054        self.peer.connection_state()
1055    }
1056}
1057
1058#[cfg(feature = "mockable")]
1059#[expect(clippy::multiple_inherent_impl, reason = "feature gated")]
1060impl PeerConnection {
1061    /// Returns [`RtcStats`] of this [`PeerConnection`].
1062    ///
1063    /// # Errors
1064    ///
1065    /// Errors with [`PeerError::RtcPeerConnection`] if failed to get
1066    /// [`RtcStats`].
1067    pub async fn get_stats(
1068        &self,
1069    ) -> Result<platform::RtcStats, Traced<RtcPeerConnectionError>> {
1070        self.peer.get_stats().await
1071    }
1072
1073    /// Indicates whether all [`Receiver`]s audio tracks are enabled.
1074    #[must_use]
1075    pub fn is_recv_audio_enabled(&self) -> bool {
1076        self.media_connections.is_recv_audio_enabled()
1077    }
1078
1079    /// Indicates whether all [`Receiver`]s video tracks are enabled.
1080    #[must_use]
1081    pub fn is_recv_video_enabled(&self) -> bool {
1082        self.media_connections.is_recv_video_enabled()
1083    }
1084
1085    /// Returns inner [`IceCandidate`]'s buffer length. Used in tests.
1086    #[must_use]
1087    pub fn candidates_buffer_len(&self) -> usize {
1088        self.ice_candidates_buffer.borrow().len()
1089    }
1090
1091    /// Lookups [`Sender`] by provided [`TrackId`].
1092    #[must_use]
1093    pub fn get_sender_by_id(&self, id: TrackId) -> Option<Rc<media::Sender>> {
1094        self.media_connections.get_sender_by_id(id)
1095    }
1096
1097    /// Lookups [`sender::State`] by the provided [`TrackId`].
1098    #[must_use]
1099    pub fn get_sender_state_by_id(
1100        &self,
1101        id: TrackId,
1102    ) -> Option<Rc<sender::State>> {
1103        self.media_connections.get_sender_state_by_id(id)
1104    }
1105
1106    /// Indicates whether all [`Sender`]s audio tracks are enabled.
1107    #[must_use]
1108    pub fn is_send_audio_enabled(&self) -> bool {
1109        self.media_connections.is_send_audio_enabled()
1110    }
1111
1112    /// Indicates whether all [`Sender`]s video tracks are enabled.
1113    #[must_use]
1114    pub fn is_send_video_enabled(
1115        &self,
1116        source_kind: Option<MediaSourceKind>,
1117    ) -> bool {
1118        self.media_connections.is_send_video_enabled(source_kind)
1119    }
1120
1121    /// Indicates whether all [`Sender`]s video tracks are unmuted.
1122    #[must_use]
1123    pub fn is_send_video_unmuted(
1124        &self,
1125        source_kind: Option<MediaSourceKind>,
1126    ) -> bool {
1127        self.media_connections.is_send_video_unmuted(source_kind)
1128    }
1129
1130    /// Indicates whether all [`Sender`]s audio tracks are unmuted.
1131    #[must_use]
1132    pub fn is_send_audio_unmuted(&self) -> bool {
1133        self.media_connections.is_send_audio_unmuted()
1134    }
1135
1136    /// Returns all [`local::Track`]s from [`PeerConnection`]'s
1137    /// [`Transceiver`]s.
1138    #[must_use]
1139    pub fn get_send_tracks(&self) -> Vec<Rc<local::Track>> {
1140        self.media_connections
1141            .get_senders()
1142            .into_iter()
1143            .filter_map(|sndr| sndr.get_send_track())
1144            .collect()
1145    }
1146
1147    /// Returns [`Rc`] to the [`Receiver`] with the provided [`TrackId`].
1148    #[must_use]
1149    pub fn get_receiver_by_id(
1150        &self,
1151        id: TrackId,
1152    ) -> Option<Rc<receiver::Receiver>> {
1153        self.media_connections.get_receiver_by_id(id)
1154    }
1155}
1156
1157impl Drop for PeerConnection {
1158    /// Drops `on_track` and `on_ice_candidate` callbacks to prevent possible
1159    /// leaks.
1160    fn drop(&mut self) {
1161        self.peer.on_track::<Box<
1162            dyn FnMut(platform::MediaStreamTrack, platform::Transceiver),
1163        >>(None);
1164        self.peer
1165            .on_ice_candidate::<Box<dyn FnMut(platform::IceCandidate)>>(None);
1166        self.peer
1167            .on_ice_candidate_error::<Box<dyn FnMut(
1168                platform::IceCandidateError
1169            )>>(None);
1170    }
1171}