medea_jason/peer/mod.rs
1//! Adapters to [RTCPeerConnection][1] and related objects.
2//!
3//! [1]: https://w3.org/TR/webrtc#rtcpeerconnection-interface
4
5mod component;
6pub mod media;
7pub mod repo;
8mod stream_update_criteria;
9mod tracks_request;
10
11use std::{
12 cell::{Cell, RefCell},
13 collections::{HashMap, hash_map::DefaultHasher},
14 hash::{Hash as _, Hasher as _},
15 rc::Rc,
16};
17
18use derive_more::with_trait::{Display, From};
19use futures::{StreamExt as _, channel::mpsc, future};
20use medea_client_api_proto::{
21 Command, ConnectionMode, IceConnectionState, MediaSourceKind, MemberId,
22 PeerConnectionState, PeerId as Id, PeerId, TrackId, TrackPatchCommand,
23 stats::StatId,
24};
25use medea_macro::dispatchable;
26use tracerr::Traced;
27
28#[doc(inline)]
29pub use self::{
30 component::{Component, DESCRIPTION_APPROVE_TIMEOUT, State},
31 media::{
32 GetMidsError, InsertLocalTracksError, MediaConnections,
33 MediaExchangeState, MediaExchangeStateController, MediaState,
34 MediaStateControllable, MuteState, MuteStateController,
35 ProhibitedStateError, TrackDirection, TransceiverSide,
36 TransitableState, TransitableStateController, media_exchange_state,
37 mute_state, receiver, sender,
38 },
39 platform::RtcPeerConnectionError,
40 stream_update_criteria::LocalStreamUpdateCriteria,
41 tracks_request::{SimpleTracksRequest, TracksRequest, TracksRequestError},
42};
43use crate::{
44 connection::Connections,
45 media::{
46 InitLocalTracksError, LocalTracksConstraints, MediaKind, MediaManager,
47 MediaStreamSettings, RecvConstraints,
48 track::{local, remote},
49 },
50 platform,
51 utils::Caused,
52};
53
54/// Errors occurring in [`PeerConnection::update_local_stream()`] method.
55#[derive(Caused, Clone, Debug, Display, From)]
56#[cause(error = platform::Error)]
57pub enum UpdateLocalStreamError {
58 /// Errors occurred when [`TracksRequest`] validation fails.
59 InvalidLocalTracks(TracksRequestError),
60
61 /// [`MediaManager`] failed to acquire [`local::Track`]s.
62 CouldNotGetLocalMedia(#[cause] InitLocalTracksError),
63
64 /// Errors occurred in [`MediaConnections::insert_local_tracks()`] method.
65 InsertLocalTracksError(#[cause] InsertLocalTracksError),
66}
67
68/// Events emitted from a [`Sender`] or a [`Receiver`].
69///
70/// [`Receiver`]: receiver::Receiver
71/// [`Sender`]: sender::Sender
72#[derive(Clone, Copy, Debug)]
73pub enum TrackEvent {
74 /// Intention of the `MediaTrack` to mute/unmute himself.
75 MuteUpdateIntention {
76 /// ID of the `MediaTrack` which sends this intention.
77 id: TrackId,
78
79 /// The muting intention itself.
80 muted: bool,
81 },
82
83 /// Intention of the `MediaTrack` to enabled/disable himself.
84 MediaExchangeIntention {
85 /// ID of the `MediaTrack` which sends this intention.
86 id: TrackId,
87
88 /// The enabling/disabling intention itself.
89 enabled: bool,
90 },
91}
92
93/// Local media update errors that [`PeerConnection`] reports in
94/// [`PeerEvent::FailedLocalMedia`] messages.
95#[derive(Caused, Clone, Debug, Display, From)]
96#[cause(error = platform::Error)]
97pub enum LocalMediaError {
98 /// Error occurred in [`PeerConnection::update_local_stream()`] method.
99 UpdateLocalStreamError(#[cause] UpdateLocalStreamError),
100
101 /// Error occurred when creating a new [`Sender`].
102 ///
103 /// [`Sender`]: sender::Sender
104 SenderCreateError(sender::CreateError),
105}
106
107/// Events emitted from [`platform::RtcPeerConnection`].
108#[dispatchable(self: &Self, async_trait(?Send))]
109#[derive(Clone, Debug)]
110pub enum PeerEvent {
111 /// [`platform::RtcPeerConnection`] discovered new ICE candidate.
112 ///
113 /// Wrapper around [RTCPeerConnectionIceEvent][1].
114 ///
115 /// [1]: https://w3.org/TR/webrtc#rtcpeerconnectioniceevent
116 IceCandidateDiscovered {
117 /// ID of the [`PeerConnection`] that discovered new ICE candidate.
118 peer_id: Id,
119
120 /// [`candidate` field][2] of the discovered [RTCIceCandidate][1].
121 ///
122 /// [1]: https://w3.org/TR/webrtc#dom-rtcicecandidate
123 /// [2]: https://w3.org/TR/webrtc#dom-rtcicecandidate-candidate
124 candidate: String,
125
126 /// [`sdpMLineIndex` field][2] of the discovered [RTCIceCandidate][1].
127 ///
128 /// [1]: https://w3.org/TR/webrtc#dom-rtcicecandidate
129 /// [2]: https://w3.org/TR/webrtc#dom-rtcicecandidate-sdpmlineindex
130 sdp_m_line_index: Option<u16>,
131
132 /// [`sdpMid` field][2] of the discovered [RTCIceCandidate][1].
133 ///
134 /// [1]: https://w3.org/TR/webrtc#dom-rtcicecandidate
135 /// [2]: https://w3.org/TR/webrtc#dom-rtcicecandidate-sdpmid
136 sdp_mid: Option<String>,
137 },
138
139 /// Error occurred with an [ICE] candidate from a [`PeerConnection`].
140 ///
141 /// [ICE]: https://webrtcglossary.com/ice
142 IceCandidateError {
143 /// ID of the [`PeerConnection`] that errored.
144 peer_id: Id,
145
146 /// Local IP address used to communicate with a [STUN]/[TURN]
147 /// server.
148 ///
149 /// [STUN]: https://webrtcglossary.com/stun
150 /// [TURN]: https://webrtcglossary.com/turn
151 address: Option<String>,
152
153 /// Port used to communicate with a [STUN]/[TURN] server.
154 ///
155 /// [STUN]: https://webrtcglossary.com/stun
156 /// [TURN]: https://webrtcglossary.com/turn
157 port: Option<u32>,
158
159 /// URL identifying the [STUN]/[TURN] server for which the failure
160 /// occurred.
161 ///
162 /// [STUN]: https://webrtcglossary.com/stun
163 /// [TURN]: https://webrtcglossary.com/turn
164 url: String,
165
166 /// Numeric [STUN] error code returned by the [STUN]/[TURN] server.
167 ///
168 /// If no host candidate can reach the server, this error code will be
169 /// set to the value `701`, which is outside the [STUN] error code
170 /// range. This error is only fired once per server URL while in the
171 /// `RTCIceGatheringState` of "gathering".
172 ///
173 /// [STUN]: https://webrtcglossary.com/stun
174 /// [TURN]: https://webrtcglossary.com/turn
175 error_code: i32,
176
177 /// [STUN] reason text returned by the [STUN]/[TURN] server.
178 ///
179 /// If the server could not be reached, this reason test will be set to
180 /// an implementation-specific value providing details about
181 /// the error.
182 ///
183 /// [STUN]: https://webrtcglossary.com/stun
184 /// [TURN]: https://webrtcglossary.com/turn
185 error_text: String,
186 },
187
188 /// [`platform::RtcPeerConnection`] received a new [`remote::Track`] from
189 /// a remote sender.
190 NewRemoteTrack {
191 /// Remote `Member` ID.
192 sender_id: MemberId,
193
194 /// Received [`remote::Track`].
195 track: remote::Track,
196 },
197
198 /// [`platform::RtcPeerConnection`] sent new local track to remote members.
199 NewLocalTrack {
200 /// Local [`local::Track`] that is sent to remote members.
201 local_track: Rc<local::Track>,
202 },
203
204 /// [`platform::RtcPeerConnection`]'s [ICE connection][1] state changed.
205 ///
206 /// [1]: https://w3.org/TR/webrtc#dfn-ice-connection-state
207 IceConnectionStateChanged {
208 /// ID of the [`PeerConnection`] that sends
209 /// [`iceconnectionstatechange`][1] event.
210 ///
211 /// [1]: https://w3.org/TR/webrtc#event-iceconnectionstatechange
212 peer_id: Id,
213
214 /// New [`IceConnectionState`].
215 ice_connection_state: IceConnectionState,
216 },
217
218 /// [`platform::RtcPeerConnection`]'s [connection][1] state changed.
219 ///
220 /// [1]: https://w3.org/TR/webrtc#dom-peerconnection-connection-state
221 PeerConnectionStateChanged {
222 /// ID of the [`PeerConnection`] that sends
223 /// [`connectionstatechange`][1] event.
224 ///
225 /// [1]: https://w3.org/TR/webrtc#event-connectionstatechange
226 peer_id: Id,
227
228 /// New [`PeerConnectionState`].
229 peer_connection_state: PeerConnectionState,
230 },
231
232 /// [`platform::RtcPeerConnection`]'s [iceGatheringState][1] property has
233 /// changed.
234 ///
235 /// [1]: https://w3.org/TR/webrtc#dom-peerconnection-ice-gathering-state
236 IceGatheringStateChanged {
237 /// ID of the [`PeerConnection`] that sends
238 /// [`icegatheringstatechange`][1] event.
239 ///
240 /// [1]: https://w3.org/TR/webrtc#event-icegatheringstatechange
241 peer_id: Id,
242
243 /// New [`platform::IceGatheringState`].
244 ice_gathering_state: platform::IceGatheringState,
245 },
246
247 /// [`platform::RtcPeerConnection`]'s [`platform::RtcStats`] update.
248 StatsUpdate {
249 /// ID of the [`PeerConnection`] for which [` platform::RtcStats`] was
250 /// sent.
251 peer_id: Id,
252
253 /// [` platform::RtcStats`] of this [`PeerConnection`].
254 stats: platform::RtcStats,
255 },
256
257 /// [`PeerConnection::update_local_stream`] was failed, so
258 /// `on_failed_local_stream` callback should be called.
259 FailedLocalMedia {
260 /// Reasons of local media updating fail.
261 error: Traced<LocalMediaError>,
262 },
263
264 /// [`Component`] generated a new SDP answer.
265 NewSdpAnswer {
266 /// ID of the [`PeerConnection`] for which SDP answer was generated.
267 peer_id: PeerId,
268
269 /// SDP Answer of the `Peer`.
270 sdp_answer: String,
271
272 /// Statuses of `Peer` transceivers.
273 transceivers_statuses: HashMap<TrackId, bool>,
274 },
275
276 /// [`Component`] generated a new SDP offer.
277 NewSdpOffer {
278 /// ID of the [`PeerConnection`] for which SDP offer was generated.
279 peer_id: PeerId,
280
281 /// SDP Offer of the [`PeerConnection`].
282 sdp_offer: String,
283
284 /// Associations between `Track` and transceiver's
285 /// [media description][1].
286 ///
287 /// `mid` is basically an ID of [`m=<media>` section][1] in SDP.
288 ///
289 /// [1]: https://tools.ietf.org/html/rfc4566#section-5.14
290 mids: HashMap<TrackId, String>,
291
292 /// Statuses of [`PeerConnection`] transceivers.
293 transceivers_statuses: HashMap<TrackId, bool>,
294 },
295
296 /// [`Component`] resends his intentions.
297 MediaUpdateCommand {
298 /// Actual intentions of the [`Component`].
299 command: Command,
300 },
301}
302
303/// High-level wrapper around a [`platform::RtcPeerConnection`].
304#[derive(Debug)]
305pub struct PeerConnection {
306 /// Unique ID of [`PeerConnection`].
307 id: Id,
308
309 /// Underlying [`platform::RtcPeerConnection`].
310 peer: Rc<platform::RtcPeerConnection>,
311
312 /// [`sender::Component`]s and [`receiver::Component`]s of this
313 /// [`platform::RtcPeerConnection`].
314 media_connections: Rc<MediaConnections>,
315
316 /// [`MediaManager`] that will be used to acquire [`local::Track`]s.
317 media_manager: Rc<MediaManager>,
318
319 /// [`PeerEvent`]s tx.
320 peer_events_sender: Rc<mpsc::UnboundedSender<PeerEvent>>,
321
322 /// Indicator whether the underlying [`platform::RtcPeerConnection`] has a
323 /// remote description.
324 has_remote_description: Cell<bool>,
325
326 /// Buffer of [`platform::IceCandidate`]s received before a remote
327 /// description for the underlying [`platform::RtcPeerConnection`].
328 ice_candidates_buffer: RefCell<Vec<platform::IceCandidate>>,
329
330 /// Last hashes of all the [`platform::RtcStats`] which were already sent
331 /// to a server, so we won't duplicate stats that were already sent.
332 ///
333 /// Stores precomputed hashes, since we don't need access to actual stats
334 /// values.
335 sent_stats_cache: RefCell<HashMap<StatId, u64>>,
336
337 /// Local media stream constraints used in this [`PeerConnection`].
338 send_constraints: LocalTracksConstraints,
339
340 /// Collection of [`Connection`]s with a remote `Member`s.
341 ///
342 /// [`Connection`]: crate::connection::Connection
343 connections: Rc<Connections>,
344
345 /// Sender for the [`TrackEvent`]s which should be processed by this
346 /// [`PeerConnection`].
347 track_events_sender: mpsc::UnboundedSender<TrackEvent>,
348
349 /// Constraints to the [`remote::Track`] from this [`PeerConnection`]. Used
350 /// to disable or enable media receiving.
351 recv_constraints: Rc<RecvConstraints>,
352}
353
354impl PeerConnection {
355 /// Creates a new [`PeerConnection`].
356 ///
357 /// Provided `peer_events_sender` will be used to emit [`PeerEvent`]s from
358 /// this peer.
359 ///
360 /// Provided `ice_servers` will be used by the created
361 /// [`platform::RtcPeerConnection`].
362 ///
363 /// # Errors
364 ///
365 /// Errors with an [`RtcPeerConnectionError::PeerCreationError`] if
366 /// [`platform::RtcPeerConnection`] creating fails.
367 pub async fn new(
368 state: &State,
369 peer_events_sender: mpsc::UnboundedSender<PeerEvent>,
370 media_manager: Rc<MediaManager>,
371 send_constraints: LocalTracksConstraints,
372 connections: Rc<Connections>,
373 recv_constraints: Rc<RecvConstraints>,
374 ) -> Result<Rc<Self>, Traced<RtcPeerConnectionError>> {
375 let peer = Rc::new(
376 platform::RtcPeerConnection::new(
377 state.ice_servers().clone(),
378 state.force_relay(),
379 )
380 .await
381 .map_err(tracerr::map_from_and_wrap!())?,
382 );
383 let (track_events_sender, mut track_events_rx) = mpsc::unbounded();
384 let media_connections = Rc::new(MediaConnections::new(
385 Rc::clone(&peer),
386 peer_events_sender.clone(),
387 ));
388
389 platform::spawn({
390 let peer_events_sender = peer_events_sender.clone();
391 let peer_id = state.id();
392
393 async move {
394 while let Some(e) = track_events_rx.next().await {
395 Self::handle_track_event(peer_id, &peer_events_sender, e);
396 }
397 }
398 });
399
400 let peer = Self {
401 id: state.id(),
402 peer,
403 media_connections,
404 media_manager,
405 peer_events_sender: Rc::new(peer_events_sender),
406 sent_stats_cache: RefCell::new(HashMap::new()),
407 has_remote_description: Cell::new(false),
408 ice_candidates_buffer: RefCell::new(Vec::new()),
409 send_constraints,
410 connections,
411 track_events_sender,
412 recv_constraints,
413 };
414
415 peer.bind_event_listeners(state);
416
417 Ok(Rc::new(peer))
418 }
419
420 /// Binds all the necessary event listeners to this [`PeerConnection`].
421 fn bind_event_listeners(&self, state: &State) {
422 // Bind to `icecandidate` event.
423 {
424 let id = self.id;
425 let weak_sender = Rc::downgrade(&self.peer_events_sender);
426 self.peer.on_ice_candidate(Some(move |candidate| {
427 if let Some(sender) = weak_sender.upgrade() {
428 Self::on_ice_candidate(id, &sender, candidate);
429 }
430 }));
431 }
432
433 // Bind to `icecandidateerror` event.
434 {
435 let id = self.id;
436 let weak_sender = Rc::downgrade(&self.peer_events_sender);
437 self.peer.on_ice_candidate_error(Some(move |error| {
438 if let Some(sender) = weak_sender.upgrade() {
439 Self::on_ice_candidate_error(id, &sender, error);
440 }
441 }));
442 }
443
444 // Bind to `iceconnectionstatechange` event.
445 {
446 let id = self.id;
447 let weak_sender = Rc::downgrade(&self.peer_events_sender);
448 self.peer.on_ice_connection_state_change(Some(
449 move |ice_connection_state| {
450 if let Some(sender) = weak_sender.upgrade() {
451 Self::on_ice_connection_state_changed(
452 id,
453 &sender,
454 ice_connection_state,
455 );
456 }
457 },
458 ));
459 }
460
461 // Bind to `connectionstatechange` event.
462 {
463 let id = self.id;
464 let weak_sender = Rc::downgrade(&self.peer_events_sender);
465 self.peer.on_connection_state_change(Some(
466 move |peer_connection_state| {
467 if let Some(sender) = weak_sender.upgrade() {
468 Self::on_connection_state_changed(
469 id,
470 &sender,
471 peer_connection_state,
472 );
473 }
474 },
475 ));
476 }
477
478 // Bind to `track` event.
479 {
480 let media_conns = Rc::downgrade(&self.media_connections);
481 let connection_mode = state.connection_mode();
482 self.peer.on_track(Some(move |track, transceiver| {
483 if let Some(c) = media_conns.upgrade() {
484 platform::spawn(async move {
485 if let (Err(mid), ConnectionMode::Mesh) = (
486 c.add_remote_track(track, transceiver).await,
487 connection_mode,
488 ) {
489 log::error!(
490 "Cannot add new remote track with mid={mid}",
491 );
492 }
493 });
494 }
495 }));
496 }
497
498 {
499 let id = self.id;
500 let weak_sender = Rc::downgrade(&self.peer_events_sender);
501 self.peer.on_ice_gathering_state_change(Some(
502 move |ice_gathering_state_change| {
503 if let Some(sender) = weak_sender.upgrade() {
504 Self::on_ice_gathering_state_change(
505 id,
506 &sender,
507 ice_gathering_state_change,
508 );
509 }
510 },
511 ));
512 }
513 }
514
515 /// Handles [`TrackEvent`]s emitted from a [`Sender`] or a [`Receiver`].
516 ///
517 /// Sends a [`PeerEvent::MediaUpdateCommand`] with a
518 /// [`Command::UpdateTracks`] on [`TrackEvent::MediaExchangeIntention`] and
519 /// [`TrackEvent::MuteUpdateIntention`].
520 ///
521 /// [`Sender`]: sender::Sender
522 /// [`Receiver`]: receiver::Receiver
523 fn handle_track_event(
524 peer_id: PeerId,
525 peer_events_sender: &mpsc::UnboundedSender<PeerEvent>,
526 event: TrackEvent,
527 ) {
528 let patch = match event {
529 TrackEvent::MediaExchangeIntention { id, enabled } => {
530 TrackPatchCommand { id, muted: None, enabled: Some(enabled) }
531 }
532 TrackEvent::MuteUpdateIntention { id, muted } => {
533 TrackPatchCommand { id, muted: Some(muted), enabled: None }
534 }
535 };
536
537 _ = peer_events_sender
538 .unbounded_send(PeerEvent::MediaUpdateCommand {
539 command: Command::UpdateTracks {
540 peer_id,
541 tracks_patches: vec![patch],
542 },
543 })
544 .ok();
545 }
546
547 /// Returns all [`TrackId`]s of [`Sender`]s that match the provided
548 /// [`LocalStreamUpdateCriteria`] and don't have [`local::Track`].
549 ///
550 /// [`Sender`]: sender::Sender
551 #[must_use]
552 pub fn get_senders_without_tracks_ids(
553 &self,
554 kinds: LocalStreamUpdateCriteria,
555 ) -> Vec<TrackId> {
556 self.media_connections.get_senders_without_tracks_ids(kinds)
557 }
558
559 /// Drops [`local::Track`]s of all [`Sender`]s which are matches provided
560 /// [`LocalStreamUpdateCriteria`].
561 ///
562 /// [`Sender`]: sender::Sender
563 pub async fn drop_send_tracks(&self, kinds: LocalStreamUpdateCriteria) {
564 self.media_connections.drop_send_tracks(kinds).await;
565 }
566
567 /// Filters out already sent stats, and send new stats from the provided
568 /// [`platform::RtcStats`].
569 pub fn send_peer_stats(&self, stats: platform::RtcStats) {
570 let mut stats_cache = self.sent_stats_cache.borrow_mut();
571 let stats = platform::RtcStats(
572 stats
573 .0
574 .into_iter()
575 .filter(|stat| {
576 let mut hasher = DefaultHasher::new();
577 stat.stats.hash(&mut hasher);
578 let stat_hash = hasher.finish();
579
580 #[expect( // false positive
581 clippy::option_if_let_else,
582 reason = "false positive: &mut"
583 )]
584 if let Some(last_hash) = stats_cache.get_mut(&stat.id) {
585 if *last_hash == stat_hash {
586 false
587 } else {
588 *last_hash = stat_hash;
589 true
590 }
591 } else {
592 _ = stats_cache.insert(stat.id.clone(), stat_hash);
593 true
594 }
595 })
596 .collect(),
597 );
598
599 if !stats.0.is_empty() {
600 drop(self.peer_events_sender.unbounded_send(
601 PeerEvent::StatsUpdate { peer_id: self.id, stats },
602 ));
603 }
604 }
605
606 /// Sends [`platform::RtcStats`] update of this [`PeerConnection`] to a
607 /// server.
608 pub async fn scrape_and_send_peer_stats(&self) {
609 match self.peer.get_stats().await {
610 Ok(stats) => self.send_peer_stats(stats),
611 Err(e) => log::error!("{e}"),
612 }
613 }
614
615 /// Indicates whether all [`TransceiverSide`]s with the provided
616 /// [`MediaKind`], [`TrackDirection`] and [`MediaSourceKind`] are in the
617 /// provided [`MediaState`].
618 #[must_use]
619 pub fn is_all_transceiver_sides_in_media_state(
620 &self,
621 kind: MediaKind,
622 direction: TrackDirection,
623 source_kind: Option<MediaSourceKind>,
624 state: MediaState,
625 ) -> bool {
626 self.media_connections.is_all_tracks_in_media_state(
627 kind,
628 direction,
629 source_kind,
630 state,
631 )
632 }
633
634 /// Returns the [`PeerId`] of this [`PeerConnection`].
635 pub const fn id(&self) -> PeerId {
636 self.id
637 }
638
639 /// Handle `icecandidate` event from the underlying peer emitting
640 /// [`PeerEvent::IceCandidateDiscovered`] event into this peer's
641 /// `peer_events_sender`.
642 fn on_ice_candidate(
643 id: Id,
644 sender: &mpsc::UnboundedSender<PeerEvent>,
645 candidate: platform::IceCandidate,
646 ) {
647 drop(sender.unbounded_send(PeerEvent::IceCandidateDiscovered {
648 peer_id: id,
649 candidate: candidate.candidate,
650 sdp_m_line_index: candidate.sdp_m_line_index,
651 sdp_mid: candidate.sdp_mid,
652 }));
653 }
654
655 /// Handle `icecandidateerror` event from the underlying peer emitting
656 /// [`PeerEvent::IceCandidateError`] event into this peer's
657 /// `peer_events_sender`.
658 fn on_ice_candidate_error(
659 id: Id,
660 sender: &mpsc::UnboundedSender<PeerEvent>,
661 error: platform::IceCandidateError,
662 ) {
663 drop(sender.unbounded_send(PeerEvent::IceCandidateError {
664 peer_id: id,
665 address: error.address,
666 port: error.port,
667 url: error.url,
668 error_code: error.error_code,
669 error_text: error.error_text,
670 }));
671 }
672
673 /// Handle `iceconnectionstatechange` event from the underlying peer
674 /// emitting [`PeerEvent::IceConnectionStateChanged`] event into this peer's
675 /// `peer_events_sender`.
676 fn on_ice_connection_state_changed(
677 peer_id: Id,
678 sender: &mpsc::UnboundedSender<PeerEvent>,
679 ice_connection_state: IceConnectionState,
680 ) {
681 drop(sender.unbounded_send(PeerEvent::IceConnectionStateChanged {
682 peer_id,
683 ice_connection_state,
684 }));
685 }
686
687 /// Handles `connectionstatechange` event from the underlying peer emitting
688 /// [`PeerEvent::PeerConnectionStateChanged`] event into this peer's
689 /// `peer_events_sender`.
690 fn on_connection_state_changed(
691 peer_id: Id,
692 sender: &mpsc::UnboundedSender<PeerEvent>,
693 peer_connection_state: PeerConnectionState,
694 ) {
695 drop(sender.unbounded_send(PeerEvent::PeerConnectionStateChanged {
696 peer_id,
697 peer_connection_state,
698 }));
699 }
700
701 /// Handles `icegatheringstatechange` event from the underlying peer
702 /// emitting [`PeerEvent::IceGatheringStateChanged`] event into this peer's
703 /// `peer_events_sender`.
704 fn on_ice_gathering_state_change(
705 peer_id: Id,
706 sender: &mpsc::UnboundedSender<PeerEvent>,
707 ice_gathering_state: platform::IceGatheringState,
708 ) {
709 drop(sender.unbounded_send(PeerEvent::IceGatheringStateChanged {
710 peer_id,
711 ice_gathering_state,
712 }));
713 }
714
715 /// Marks [`PeerConnection`] to trigger ICE restart.
716 ///
717 /// After this function returns, the generated offer is automatically
718 /// configured to trigger ICE restart.
719 fn restart_ice(&self) {
720 self.peer.restart_ice();
721 }
722
723 /// Returns all [`TransceiverSide`]s from this [`PeerConnection`] with
724 /// provided [`MediaKind`], [`TrackDirection`] and [`MediaSourceKind`].
725 pub fn get_transceivers_sides(
726 &self,
727 kind: MediaKind,
728 direction: TrackDirection,
729 source_kind: Option<MediaSourceKind>,
730 ) -> Vec<Rc<dyn TransceiverSide>> {
731 self.media_connections.get_transceivers_sides(
732 kind,
733 direction,
734 source_kind,
735 )
736 }
737
738 /// Track id to mid relations of all send tracks of this
739 /// [`platform::RtcPeerConnection`]. mid is id of [`m= section`][1]. mids
740 /// are received directly from registered [`RTCRtpTransceiver`][2]s, and
741 /// are being allocated on SDP update.
742 ///
743 /// # Errors
744 ///
745 /// Errors if finds transceiver without mid, so must be called after setting
746 /// local description if offerer, and remote if answerer.
747 ///
748 /// [1]: https://tools.ietf.org/html/rfc4566#section-5.14
749 /// [2]: https://w3.org/TR/webrtc#rtcrtptransceiver-interface
750 fn get_mids(
751 &self,
752 ) -> Result<HashMap<TrackId, String>, Traced<GetMidsError>> {
753 self.media_connections.get_mids().map_err(tracerr::wrap!())
754 }
755
756 /// Returns publishing statuses of the all [`Sender`]s from this
757 /// [`MediaConnections`].
758 ///
759 /// [`Sender`]: sender::Sender
760 async fn get_transceivers_statuses(&self) -> HashMap<TrackId, bool> {
761 self.media_connections.get_transceivers_statuses().await
762 }
763
764 /// Updates [`local::Track`]s being used in [`PeerConnection`]s [`Sender`]s.
765 /// [`Sender`]s are chosen based on the provided
766 /// [`LocalStreamUpdateCriteria`].
767 ///
768 /// First of all makes sure that [`PeerConnection`] [`Sender`]s are
769 /// up-to-date and synchronized with a real object state. If there are no
770 /// [`Sender`]s configured in this [`PeerConnection`], then this method is
771 /// no-op.
772 ///
773 /// Secondly, make sure that configured [`LocalTracksConstraints`] are up to
774 /// date.
775 ///
776 /// This function requests local stream from [`MediaManager`]. If stream
777 /// returned from [`MediaManager`] is considered new, then this function
778 /// will emit [`PeerEvent::NewLocalTrack`] events.
779 ///
780 /// Constraints being used when requesting stream from [`MediaManager`] are
781 /// a result of merging constraints received from this [`PeerConnection`]
782 /// [`Sender`]s, which are configured by server during signalling, and
783 /// [`LocalTracksConstraints`].
784 ///
785 /// Returns [`HashMap`] with [`media_exchange_state::Stable`]s updates for
786 /// the [`Sender`]s.
787 ///
788 /// # Errors
789 ///
790 /// With an [`UpdateLocalStreamError::InvalidLocalTracks`] if the current
791 /// state of the [`PeerConnection`]'s [`Sender`]s cannot be represented as
792 /// [`SimpleTracksRequest`] (max 1 audio [`Sender`] and max 2 video
793 /// [`Sender`]s), or the [`local::Track`]s requested from the
794 /// [`MediaManager`] doesn't satisfy [`Sender`]'s constraints.
795 ///
796 /// With an [`UpdateLocalStreamError::CouldNotGetLocalMedia`] if the
797 /// [`local::Track`]s cannot be obtained from the UA.
798 ///
799 /// With an [`UpdateLocalStreamError::InvalidLocalTracks`] if the
800 /// [`local::Track`]s cannot be inserted into [`PeerConnection`]s
801 /// [`Sender`]s.
802 ///
803 /// [`Sender`]: sender::Sender
804 /// [1]: https://w3.org/TR/mediacapture-streams#mediastream
805 /// [2]: https://w3.org/TR/webrtc#rtcpeerconnection-interface
806 pub async fn update_local_stream(
807 &self,
808 criteria: LocalStreamUpdateCriteria,
809 ) -> Result<
810 HashMap<TrackId, media_exchange_state::Stable>,
811 Traced<UpdateLocalStreamError>,
812 > {
813 self.inner_update_local_stream(criteria).await.inspect_err(|e| {
814 drop(self.peer_events_sender.unbounded_send(
815 PeerEvent::FailedLocalMedia {
816 error: tracerr::map_from(e.clone()),
817 },
818 ));
819 })
820 }
821
822 /// Returns [`MediaStreamSettings`] for the provided [`MediaKind`] and
823 /// [`MediaSourceKind`].
824 ///
825 /// If [`MediaSourceKind`] is [`None`] then [`MediaStreamSettings`] for all
826 /// [`MediaSourceKind`]s will be provided.
827 ///
828 /// # Errors
829 ///
830 /// Errors with a [`TracksRequestError`] if failed to create or merge
831 /// [`SimpleTracksRequest`].
832 pub fn get_media_settings(
833 &self,
834 kind: MediaKind,
835 source_kind: Option<MediaSourceKind>,
836 ) -> Result<Option<MediaStreamSettings>, Traced<TracksRequestError>> {
837 let mut criteria = LocalStreamUpdateCriteria::empty();
838 if let Some(msk) = source_kind {
839 criteria.add(kind, msk);
840 } else {
841 criteria.add(kind, MediaSourceKind::Device);
842 criteria.add(kind, MediaSourceKind::Display);
843 }
844
845 self.get_simple_tracks_request(criteria)
846 .map_err(tracerr::map_from_and_wrap!())
847 .map(|opt| opt.map(|s| MediaStreamSettings::from(&s)))
848 }
849
850 /// Returns [`SimpleTracksRequest`] for the provided
851 /// [`LocalStreamUpdateCriteria`].
852 ///
853 /// # Errors
854 ///
855 /// Errors with a [`TracksRequestError`] if failed to create or merge
856 /// [`SimpleTracksRequest`].
857 fn get_simple_tracks_request(
858 &self,
859 criteria: LocalStreamUpdateCriteria,
860 ) -> Result<Option<SimpleTracksRequest>, Traced<TracksRequestError>> {
861 let Some(request) = self.media_connections.get_tracks_request(criteria)
862 else {
863 return Ok(None);
864 };
865 let mut required_caps = SimpleTracksRequest::try_from(request)
866 .map_err(tracerr::from_and_wrap!())?;
867 required_caps
868 .merge(self.send_constraints.inner())
869 .map_err(tracerr::map_from_and_wrap!())?;
870
871 Ok(Some(required_caps))
872 }
873
874 /// Implementation of the [`PeerConnection::update_local_stream`] method.
875 async fn inner_update_local_stream(
876 &self,
877 criteria: LocalStreamUpdateCriteria,
878 ) -> Result<
879 HashMap<TrackId, media_exchange_state::Stable>,
880 Traced<UpdateLocalStreamError>,
881 > {
882 if let Some(required_caps) = self
883 .get_simple_tracks_request(criteria)
884 .map_err(tracerr::map_from_and_wrap!())?
885 {
886 let used_caps = MediaStreamSettings::from(&required_caps);
887
888 let media_tracks = self
889 .media_manager
890 .get_tracks(used_caps)
891 .await
892 .map_err(tracerr::map_from_and_wrap!())?;
893 let peer_tracks = required_caps
894 .parse_tracks(
895 media_tracks.iter().map(|(t, _)| t).cloned().collect(),
896 )
897 .await
898 .map_err(tracerr::map_from_and_wrap!())?;
899
900 let media_exchange_states_updates = self
901 .media_connections
902 .insert_local_tracks(&peer_tracks)
903 .await
904 .map_err(tracerr::map_from_and_wrap!())?;
905
906 for (local_track, is_new) in media_tracks {
907 if is_new {
908 drop(self.peer_events_sender.unbounded_send(
909 PeerEvent::NewLocalTrack { local_track },
910 ));
911 }
912 }
913
914 Ok(media_exchange_states_updates)
915 } else {
916 Ok(HashMap::new())
917 }
918 }
919
920 /// Returns [`Rc`] to [`TransceiverSide`] with a provided [`TrackId`].
921 ///
922 /// Returns [`None`] if [`TransceiverSide`] with a provided [`TrackId`]
923 /// doesn't exist in this [`PeerConnection`].
924 pub fn get_transceiver_side_by_id(
925 &self,
926 track_id: TrackId,
927 ) -> Option<Rc<dyn TransceiverSide>> {
928 self.media_connections.get_transceiver_side_by_id(track_id)
929 }
930
931 /// Updates underlying [RTCPeerConnection][1]'s remote SDP from answer.
932 ///
933 /// # Errors
934 ///
935 /// With [`RtcPeerConnectionError::SetRemoteDescriptionFailed`][3] if
936 /// [RTCPeerConnection.setRemoteDescription()][2] fails.
937 ///
938 /// [1]: https://w3.org/TR/webrtc#rtcpeerconnection-interface
939 /// [2]: https://w3.org/TR/webrtc#dom-peerconnection-setremotedescription
940 /// [3]: platform::RtcPeerConnectionError::SetRemoteDescriptionFailed
941 async fn set_remote_answer(
942 &self,
943 answer: String,
944 ) -> Result<(), Traced<RtcPeerConnectionError>> {
945 self.set_remote_description(platform::SdpType::Answer(answer))
946 .await
947 .map_err(tracerr::wrap!())
948 }
949
950 /// Updates underlying [RTCPeerConnection][1]'s remote SDP from offer.
951 ///
952 /// # Errors
953 ///
954 /// With [`platform::RtcPeerConnectionError::SetRemoteDescriptionFailed`] if
955 /// [RTCPeerConnection.setRemoteDescription()][2] fails.
956 ///
957 /// [1]: https://w3.org/TR/webrtc#rtcpeerconnection-interface
958 /// [2]: https://w3.org/TR/webrtc#dom-peerconnection-setremotedescription
959 async fn set_remote_offer(
960 &self,
961 offer: String,
962 ) -> Result<(), Traced<RtcPeerConnectionError>> {
963 self.set_remote_description(platform::SdpType::Offer(offer))
964 .await
965 .map_err(tracerr::wrap!())
966 }
967
968 /// Updates underlying [RTCPeerConnection][1]'s remote SDP with given
969 /// description.
970 ///
971 /// # Errors
972 ///
973 /// With [`platform::RtcPeerConnectionError::SetRemoteDescriptionFailed`] if
974 /// [RTCPeerConnection.setRemoteDescription()][2] fails.
975 ///
976 /// With [`platform::RtcPeerConnectionError::AddIceCandidateFailed`] if
977 /// [RtcPeerConnection.addIceCandidate()][3] fails when adding buffered ICE
978 /// candidates.
979 ///
980 /// [1]: https://w3.org/TR/webrtc#rtcpeerconnection-interface
981 /// [2]: https://w3.org/TR/webrtc#dom-peerconnection-setremotedescription
982 /// [3]: https://w3.org/TR/webrtc#dom-peerconnection-addicecandidate
983 async fn set_remote_description(
984 &self,
985 desc: platform::SdpType,
986 ) -> Result<(), Traced<RtcPeerConnectionError>> {
987 self.peer
988 .set_remote_description(desc)
989 .await
990 .map_err(tracerr::map_from_and_wrap!())?;
991 self.has_remote_description.set(true);
992 self.media_connections.sync_receivers().await;
993
994 let ice_candidates_buffer_flush_fut = future::try_join_all(
995 self.ice_candidates_buffer.borrow_mut().drain(..).map(
996 |candidate| {
997 let peer = Rc::clone(&self.peer);
998 async move {
999 peer.add_ice_candidate(
1000 &candidate.candidate,
1001 candidate.sdp_m_line_index,
1002 &candidate.sdp_mid,
1003 )
1004 .await
1005 }
1006 },
1007 ),
1008 );
1009 ice_candidates_buffer_flush_fut
1010 .await
1011 .map(drop)
1012 .map_err(tracerr::map_from_and_wrap!())?;
1013
1014 Ok(())
1015 }
1016
1017 /// Adds remote peers [ICE Candidate][1] to this peer.
1018 ///
1019 /// # Errors
1020 ///
1021 /// With [`RtcPeerConnectionError::AddIceCandidateFailed`] if
1022 /// [RtcPeerConnection.addIceCandidate()][3] fails to add buffered
1023 /// [ICE candidates][1].
1024 ///
1025 /// [1]: https://tools.ietf.org/html/rfc5245#section-2
1026 /// [3]: https://w3.org/TR/webrtc#dom-peerconnection-addicecandidate
1027 pub async fn add_ice_candidate(
1028 &self,
1029 candidate: String,
1030 sdp_m_line_index: Option<u16>,
1031 sdp_mid: Option<String>,
1032 ) -> Result<(), Traced<RtcPeerConnectionError>> {
1033 if self.has_remote_description.get() {
1034 self.peer
1035 .add_ice_candidate(&candidate, sdp_m_line_index, &sdp_mid)
1036 .await
1037 .map_err(tracerr::map_from_and_wrap!())?;
1038 } else {
1039 self.ice_candidates_buffer.borrow_mut().push(
1040 platform::IceCandidate { candidate, sdp_m_line_index, sdp_mid },
1041 );
1042 }
1043 Ok(())
1044 }
1045
1046 /// Removes a [`sender::Component`] and a [`receiver::Component`] with the
1047 /// provided [`TrackId`] from this [`PeerConnection`].
1048 pub fn remove_track(&self, track_id: TrackId) {
1049 self.media_connections.remove_track(track_id);
1050 }
1051
1052 /// Returns the [`PeerConnectionState`] of this [`PeerConnection`].
1053 pub fn connection_state(&self) -> PeerConnectionState {
1054 self.peer.connection_state()
1055 }
1056}
1057
1058#[cfg(feature = "mockable")]
1059#[expect(clippy::multiple_inherent_impl, reason = "feature gated")]
1060impl PeerConnection {
1061 /// Returns [`RtcStats`] of this [`PeerConnection`].
1062 ///
1063 /// # Errors
1064 ///
1065 /// Errors with [`PeerError::RtcPeerConnection`] if failed to get
1066 /// [`RtcStats`].
1067 pub async fn get_stats(
1068 &self,
1069 ) -> Result<platform::RtcStats, Traced<RtcPeerConnectionError>> {
1070 self.peer.get_stats().await
1071 }
1072
1073 /// Indicates whether all [`Receiver`]s audio tracks are enabled.
1074 #[must_use]
1075 pub fn is_recv_audio_enabled(&self) -> bool {
1076 self.media_connections.is_recv_audio_enabled()
1077 }
1078
1079 /// Indicates whether all [`Receiver`]s video tracks are enabled.
1080 #[must_use]
1081 pub fn is_recv_video_enabled(&self) -> bool {
1082 self.media_connections.is_recv_video_enabled()
1083 }
1084
1085 /// Returns inner [`IceCandidate`]'s buffer length. Used in tests.
1086 #[must_use]
1087 pub fn candidates_buffer_len(&self) -> usize {
1088 self.ice_candidates_buffer.borrow().len()
1089 }
1090
1091 /// Lookups [`Sender`] by provided [`TrackId`].
1092 #[must_use]
1093 pub fn get_sender_by_id(&self, id: TrackId) -> Option<Rc<media::Sender>> {
1094 self.media_connections.get_sender_by_id(id)
1095 }
1096
1097 /// Lookups [`sender::State`] by the provided [`TrackId`].
1098 #[must_use]
1099 pub fn get_sender_state_by_id(
1100 &self,
1101 id: TrackId,
1102 ) -> Option<Rc<sender::State>> {
1103 self.media_connections.get_sender_state_by_id(id)
1104 }
1105
1106 /// Indicates whether all [`Sender`]s audio tracks are enabled.
1107 #[must_use]
1108 pub fn is_send_audio_enabled(&self) -> bool {
1109 self.media_connections.is_send_audio_enabled()
1110 }
1111
1112 /// Indicates whether all [`Sender`]s video tracks are enabled.
1113 #[must_use]
1114 pub fn is_send_video_enabled(
1115 &self,
1116 source_kind: Option<MediaSourceKind>,
1117 ) -> bool {
1118 self.media_connections.is_send_video_enabled(source_kind)
1119 }
1120
1121 /// Indicates whether all [`Sender`]s video tracks are unmuted.
1122 #[must_use]
1123 pub fn is_send_video_unmuted(
1124 &self,
1125 source_kind: Option<MediaSourceKind>,
1126 ) -> bool {
1127 self.media_connections.is_send_video_unmuted(source_kind)
1128 }
1129
1130 /// Indicates whether all [`Sender`]s audio tracks are unmuted.
1131 #[must_use]
1132 pub fn is_send_audio_unmuted(&self) -> bool {
1133 self.media_connections.is_send_audio_unmuted()
1134 }
1135
1136 /// Returns all [`local::Track`]s from [`PeerConnection`]'s
1137 /// [`Transceiver`]s.
1138 #[must_use]
1139 pub fn get_send_tracks(&self) -> Vec<Rc<local::Track>> {
1140 self.media_connections
1141 .get_senders()
1142 .into_iter()
1143 .filter_map(|sndr| sndr.get_send_track())
1144 .collect()
1145 }
1146
1147 /// Returns [`Rc`] to the [`Receiver`] with the provided [`TrackId`].
1148 #[must_use]
1149 pub fn get_receiver_by_id(
1150 &self,
1151 id: TrackId,
1152 ) -> Option<Rc<receiver::Receiver>> {
1153 self.media_connections.get_receiver_by_id(id)
1154 }
1155}
1156
1157impl Drop for PeerConnection {
1158 /// Drops `on_track` and `on_ice_candidate` callbacks to prevent possible
1159 /// leaks.
1160 fn drop(&mut self) {
1161 self.peer.on_track::<Box<
1162 dyn FnMut(platform::MediaStreamTrack, platform::Transceiver),
1163 >>(None);
1164 self.peer
1165 .on_ice_candidate::<Box<dyn FnMut(platform::IceCandidate)>>(None);
1166 self.peer
1167 .on_ice_candidate_error::<Box<dyn FnMut(
1168 platform::IceCandidateError
1169 )>>(None);
1170 }
1171}