webrtc/rtp_transceiver/
mod.rs

1#[cfg(test)]
2mod rtp_transceiver_test;
3
4use std::fmt;
5use std::future::Future;
6use std::pin::Pin;
7use std::sync::atomic::Ordering;
8use std::sync::Arc;
9
10use interceptor::stream_info::{AssociatedStreamInfo, RTPHeaderExtension, StreamInfo};
11use interceptor::Attributes;
12use log::trace;
13use portable_atomic::{AtomicBool, AtomicU8};
14use serde::{Deserialize, Serialize};
15use smol_str::SmolStr;
16use tokio::sync::{Mutex, OnceCell};
17
18use crate::api::media_engine::MediaEngine;
19use crate::error::{Error, Result};
20use crate::rtp_transceiver::rtp_codec::*;
21use crate::rtp_transceiver::rtp_receiver::{RTCRtpReceiver, RTPReceiverInternal};
22use crate::rtp_transceiver::rtp_sender::RTCRtpSender;
23use crate::rtp_transceiver::rtp_transceiver_direction::RTCRtpTransceiverDirection;
24use crate::track::track_local::TrackLocal;
25
26pub(crate) mod fmtp;
27pub mod rtp_codec;
28pub mod rtp_receiver;
29pub mod rtp_sender;
30pub mod rtp_transceiver_direction;
31pub(crate) mod srtp_writer_future;
32
/// SSRC represents a synchronization source.
///
/// A synchronization source is a randomly chosen value meant to be globally
/// unique within a particular RTP session. It is used to identify a single
/// stream of media.
/// <https://tools.ietf.org/html/rfc3550#section-3>
// The name mirrors the RFC term, so the all-caps acronym lint is silenced on purpose.
#[allow(clippy::upper_case_acronyms)]
pub type SSRC = u32;

/// PayloadType identifies the format of the RTP payload and determines
/// its interpretation by the application. Each codec in an RTP session
/// will have a different PayloadType.
/// <https://tools.ietf.org/html/rfc3550#section-3>
pub type PayloadType = u8;
46
/// TYPE_RTCP_FB_TRANSPORT_CC is the `transport-cc` value of [`RTCPFeedback::typ`].
pub const TYPE_RTCP_FB_TRANSPORT_CC: &str = "transport-cc";

/// TYPE_RTCP_FB_GOOG_REMB is the `goog-remb` value of [`RTCPFeedback::typ`].
pub const TYPE_RTCP_FB_GOOG_REMB: &str = "goog-remb";

/// TYPE_RTCP_FB_ACK is the `ack` value of [`RTCPFeedback::typ`].
pub const TYPE_RTCP_FB_ACK: &str = "ack";

/// TYPE_RTCP_FB_CCM is the `ccm` value of [`RTCPFeedback::typ`].
pub const TYPE_RTCP_FB_CCM: &str = "ccm";

/// TYPE_RTCP_FB_NACK is the `nack` value of [`RTCPFeedback::typ`].
pub const TYPE_RTCP_FB_NACK: &str = "nack";
61
/// RTCPFeedback signals the connection to use additional RTCP packet types.
/// <https://draft.ortc.org/#dom-rtcrtcpfeedback>
#[derive(Default, Debug, Clone, PartialEq, Eq)]
pub struct RTCPFeedback {
    /// Type is the type of feedback.
    /// see: <https://draft.ortc.org/#dom-rtcrtcpfeedback>
    /// valid: ack, ccm, nack, goog-remb, transport-cc
    pub typ: String,

    /// The parameter value depends on the type.
    /// For example, type="nack" parameter="pli" will send Picture Loss Indicator packets.
    pub parameter: String,
}
75
76/// RTPCapabilities represents the capabilities of a transceiver
77/// <https://w3c.github.io/webrtc-pc/#rtcrtpcapabilities>
78#[derive(Default, Debug, Clone)]
79pub struct RTCRtpCapabilities {
80    pub codecs: Vec<RTCRtpCodecCapability>,
81    pub header_extensions: Vec<RTCRtpHeaderExtensionCapability>,
82}
83
84/// RTPRtxParameters dictionary contains information relating to retransmission (RTX) settings.
85/// <https://draft.ortc.org/#dom-rtcrtprtxparameters>
86#[derive(Default, Debug, Clone, Serialize, Deserialize)]
87pub struct RTCRtpRtxParameters {
88    pub ssrc: SSRC,
89}
90
91/// RTPCodingParameters provides information relating to both encoding and decoding.
92/// This is a subset of the RFC since Pion WebRTC doesn't implement encoding/decoding itself
93/// <http://draft.ortc.org/#dom-rtcrtpcodingparameters>
94#[derive(Default, Debug, Clone, Serialize, Deserialize)]
95pub struct RTCRtpCodingParameters {
96    pub rid: SmolStr,
97    pub ssrc: SSRC,
98    pub payload_type: PayloadType,
99    pub rtx: RTCRtpRtxParameters,
100}
101
102/// RTPDecodingParameters provides information relating to both encoding and decoding.
103/// This is a subset of the RFC since Pion WebRTC doesn't implement decoding itself
104/// <http://draft.ortc.org/#dom-rtcrtpdecodingparameters>
105pub type RTCRtpDecodingParameters = RTCRtpCodingParameters;
106
107/// RTPEncodingParameters provides information relating to both encoding and decoding.
108/// This is a subset of the RFC since Pion WebRTC doesn't implement encoding itself
109/// <http://draft.ortc.org/#dom-rtcrtpencodingparameters>
110pub type RTCRtpEncodingParameters = RTCRtpCodingParameters;
111
112/// RTPReceiveParameters contains the RTP stack settings used by receivers
113#[derive(Debug)]
114pub struct RTCRtpReceiveParameters {
115    pub encodings: Vec<RTCRtpDecodingParameters>,
116}
117
118/// RTPSendParameters contains the RTP stack settings used by receivers
119#[derive(Debug)]
120pub struct RTCRtpSendParameters {
121    pub rtp_parameters: RTCRtpParameters,
122    pub encodings: Vec<RTCRtpEncodingParameters>,
123}
124
125/// RTPTransceiverInit dictionary is used when calling the WebRTC function addTransceiver() to provide configuration options for the new transceiver.
126pub struct RTCRtpTransceiverInit {
127    pub direction: RTCRtpTransceiverDirection,
128    pub send_encodings: Vec<RTCRtpEncodingParameters>,
129    // Streams       []*Track
130}
131
132pub(crate) fn create_stream_info(
133    id: String,
134    ssrc: SSRC,
135    payload_type: PayloadType,
136    codec: RTCRtpCodecCapability,
137    webrtc_header_extensions: &[RTCRtpHeaderExtensionParameters],
138    associated_stream: Option<AssociatedStreamInfo>,
139) -> StreamInfo {
140    let header_extensions: Vec<RTPHeaderExtension> = webrtc_header_extensions
141        .iter()
142        .map(|h| RTPHeaderExtension {
143            id: h.id,
144            uri: h.uri.clone(),
145        })
146        .collect();
147
148    let feedbacks: Vec<_> = codec
149        .rtcp_feedback
150        .iter()
151        .map(|f| interceptor::stream_info::RTCPFeedback {
152            typ: f.typ.clone(),
153            parameter: f.parameter.clone(),
154        })
155        .collect();
156
157    StreamInfo {
158        id,
159        attributes: Attributes::new(),
160        ssrc,
161        payload_type,
162        rtp_header_extensions: header_extensions,
163        mime_type: codec.mime_type,
164        clock_rate: codec.clock_rate,
165        channels: codec.channels,
166        sdp_fmtp_line: codec.sdp_fmtp_line,
167        rtcp_feedback: feedbacks,
168        associated_stream,
169    }
170}
171
/// Optional callback that produces a boxed future when invoked.
///
/// [`RTCRtpTransceiver::set_direction`] calls it and awaits the returned future
/// whenever the desired direction actually changes; `None` disables the callback.
pub type TriggerNegotiationNeededFnOption =
    Option<Box<dyn Fn() -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> + Send + Sync>>;
174
175/// RTPTransceiver represents a combination of an RTPSender and an RTPReceiver that share a common mid.
176pub struct RTCRtpTransceiver {
177    mid: OnceCell<SmolStr>,               //atomic.Value
178    sender: Mutex<Arc<RTCRtpSender>>,     //atomic.Value
179    receiver: Mutex<Arc<RTCRtpReceiver>>, //atomic.Value
180
181    direction: AtomicU8,         //RTPTransceiverDirection
182    current_direction: AtomicU8, //RTPTransceiverDirection
183
184    codecs: Arc<Mutex<Vec<RTCRtpCodecParameters>>>, // User provided codecs via set_codec_preferences
185
186    pub(crate) stopped: AtomicBool,
187    pub(crate) kind: RTPCodecType,
188
189    media_engine: Arc<MediaEngine>,
190
191    trigger_negotiation_needed: Mutex<TriggerNegotiationNeededFnOption>,
192}
193
194impl RTCRtpTransceiver {
195    pub async fn new(
196        receiver: Arc<RTCRtpReceiver>,
197        sender: Arc<RTCRtpSender>,
198        direction: RTCRtpTransceiverDirection,
199        kind: RTPCodecType,
200        codecs: Vec<RTCRtpCodecParameters>,
201        media_engine: Arc<MediaEngine>,
202        trigger_negotiation_needed: TriggerNegotiationNeededFnOption,
203    ) -> Arc<Self> {
204        let codecs = Arc::new(Mutex::new(codecs));
205        receiver.set_transceiver_codecs(Some(Arc::clone(&codecs)));
206
207        let t = Arc::new(RTCRtpTransceiver {
208            mid: OnceCell::new(),
209            sender: Mutex::new(sender),
210            receiver: Mutex::new(receiver),
211
212            direction: AtomicU8::new(direction as u8),
213            current_direction: AtomicU8::new(RTCRtpTransceiverDirection::Unspecified as u8),
214
215            codecs,
216            stopped: AtomicBool::new(false),
217            kind,
218            media_engine,
219            trigger_negotiation_needed: Mutex::new(trigger_negotiation_needed),
220        });
221        t.sender()
222            .await
223            .set_rtp_transceiver(Some(Arc::downgrade(&t)));
224
225        t
226    }
227
228    /// set_codec_preferences sets preferred list of supported codecs
229    /// if codecs is empty or nil we reset to default from MediaEngine
230    pub async fn set_codec_preferences(&self, codecs: Vec<RTCRtpCodecParameters>) -> Result<()> {
231        for codec in &codecs {
232            let media_engine_codecs = self.media_engine.get_codecs_by_kind(self.kind);
233            let (_, match_type) = codec_parameters_fuzzy_search(codec, &media_engine_codecs);
234            if match_type == CodecMatch::None {
235                return Err(Error::ErrRTPTransceiverCodecUnsupported);
236            }
237        }
238
239        {
240            let mut c = self.codecs.lock().await;
241            *c = codecs;
242        }
243        Ok(())
244    }
245
246    /// Codecs returns list of supported codecs
247    pub(crate) async fn get_codecs(&self) -> Vec<RTCRtpCodecParameters> {
248        let mut codecs = self.codecs.lock().await;
249        RTPReceiverInternal::get_codecs(&mut codecs, self.kind, &self.media_engine)
250    }
251
252    /// sender returns the RTPTransceiver's RTPSender if it has one
253    pub async fn sender(&self) -> Arc<RTCRtpSender> {
254        let sender = self.sender.lock().await;
255        sender.clone()
256    }
257
258    /// set_sender_track sets the RTPSender and Track to current transceiver
259    pub async fn set_sender_track(
260        self: &Arc<Self>,
261        sender: Arc<RTCRtpSender>,
262        track: Option<Arc<dyn TrackLocal + Send + Sync>>,
263    ) -> Result<()> {
264        self.set_sender(sender).await;
265        self.set_sending_track(track).await
266    }
267
268    pub async fn set_sender(self: &Arc<Self>, s: Arc<RTCRtpSender>) {
269        s.set_rtp_transceiver(Some(Arc::downgrade(self)));
270
271        let prev_sender = self.sender().await;
272        prev_sender.set_rtp_transceiver(None);
273
274        {
275            let mut sender = self.sender.lock().await;
276            *sender = s;
277        }
278    }
279
280    /// receiver returns the RTPTransceiver's RTPReceiver if it has one
281    pub async fn receiver(&self) -> Arc<RTCRtpReceiver> {
282        let receiver = self.receiver.lock().await;
283        receiver.clone()
284    }
285
286    pub(crate) async fn set_receiver(&self, r: Arc<RTCRtpReceiver>) {
287        r.set_transceiver_codecs(Some(Arc::clone(&self.codecs)));
288
289        {
290            let mut receiver = self.receiver.lock().await;
291            (*receiver).set_transceiver_codecs(None);
292
293            *receiver = r;
294        }
295    }
296
297    /// set_mid sets the RTPTransceiver's mid. If it was already set, will return an error.
298    pub(crate) fn set_mid(&self, mid: SmolStr) -> Result<()> {
299        self.mid
300            .set(mid)
301            .map_err(|_| Error::ErrRTPTransceiverCannotChangeMid)
302    }
303
304    /// mid gets the Transceiver's mid value. When not already set, this value will be set in CreateOffer or create_answer.
305    pub fn mid(&self) -> Option<SmolStr> {
306        self.mid.get().cloned()
307    }
308
309    /// kind returns RTPTransceiver's kind.
310    pub fn kind(&self) -> RTPCodecType {
311        self.kind
312    }
313
314    /// direction returns the RTPTransceiver's desired direction.
315    pub fn direction(&self) -> RTCRtpTransceiverDirection {
316        self.direction.load(Ordering::SeqCst).into()
317    }
318
319    /// Set the direction of this transceiver. This might trigger a renegotiation.
320    pub async fn set_direction(&self, d: RTCRtpTransceiverDirection) {
321        let changed = self.set_direction_internal(d);
322
323        if changed {
324            let lock = self.trigger_negotiation_needed.lock().await;
325            if let Some(trigger) = &*lock {
326                (trigger)().await;
327            }
328        }
329    }
330
331    pub(crate) fn set_direction_internal(&self, d: RTCRtpTransceiverDirection) -> bool {
332        let previous: RTCRtpTransceiverDirection =
333            self.direction.swap(d as u8, Ordering::SeqCst).into();
334
335        let changed = d != previous;
336
337        if changed {
338            trace!(
339                "Changing direction of transceiver from {} to {}",
340                previous,
341                d
342            );
343        }
344
345        changed
346    }
347
348    /// current_direction returns the RTPTransceiver's current direction as negotiated.
349    ///
350    /// If this transceiver has never been negotiated or if it's stopped this returns [`RTCRtpTransceiverDirection::Unspecified`].
351    pub fn current_direction(&self) -> RTCRtpTransceiverDirection {
352        if self.stopped.load(Ordering::SeqCst) {
353            return RTCRtpTransceiverDirection::Unspecified;
354        }
355
356        self.current_direction.load(Ordering::SeqCst).into()
357    }
358
359    pub(crate) fn set_current_direction(&self, d: RTCRtpTransceiverDirection) {
360        let previous: RTCRtpTransceiverDirection = self
361            .current_direction
362            .swap(d as u8, Ordering::SeqCst)
363            .into();
364
365        if d != previous {
366            trace!(
367                "Changing current direction of transceiver from {} to {}",
368                previous,
369                d,
370            );
371        }
372    }
373
374    /// Perform any subsequent actions after altering the transceiver's direction.
375    ///
376    /// After changing the transceiver's direction this method should be called to perform any
377    /// side-effects that results from the new direction, such as pausing/resuming the RTP receiver.
378    pub(crate) async fn process_new_current_direction(
379        &self,
380        previous_direction: RTCRtpTransceiverDirection,
381    ) -> Result<()> {
382        if self.stopped.load(Ordering::SeqCst) {
383            return Ok(());
384        }
385
386        let current_direction = self.current_direction();
387        if previous_direction != current_direction {
388            let mid = self.mid();
389            trace!(
390                "Processing transceiver({:?}) direction change from {} to {}",
391                mid,
392                previous_direction,
393                current_direction
394            );
395        } else {
396            // no change.
397            return Ok(());
398        }
399
400        {
401            let receiver = self.receiver.lock().await;
402            let pause_receiver = !current_direction.has_recv();
403
404            if pause_receiver {
405                receiver.pause().await?;
406            } else {
407                receiver.resume().await?;
408            }
409        }
410
411        let pause_sender = !current_direction.has_send();
412        {
413            let sender = &*self.sender.lock().await;
414            sender.set_paused(pause_sender);
415        }
416
417        Ok(())
418    }
419
420    /// stop irreversibly stops the RTPTransceiver
421    pub async fn stop(&self) -> Result<()> {
422        if self.stopped.load(Ordering::SeqCst) {
423            return Ok(());
424        }
425
426        self.stopped.store(true, Ordering::SeqCst);
427
428        {
429            let sender = self.sender.lock().await;
430            sender.stop().await?;
431        }
432        {
433            let r = self.receiver.lock().await;
434            r.stop().await?;
435        }
436
437        self.set_direction_internal(RTCRtpTransceiverDirection::Inactive);
438
439        Ok(())
440    }
441
442    pub(crate) async fn set_sending_track(
443        &self,
444        track: Option<Arc<dyn TrackLocal + Send + Sync>>,
445    ) -> Result<()> {
446        let track_is_none = track.is_none();
447        {
448            let sender = self.sender.lock().await;
449            sender.replace_track(track).await?;
450        }
451
452        let direction = self.direction();
453        let should_send = !track_is_none;
454        let should_recv = direction.has_recv();
455        self.set_direction_internal(RTCRtpTransceiverDirection::from_send_recv(
456            should_send,
457            should_recv,
458        ));
459
460        Ok(())
461    }
462}
463
464impl fmt::Debug for RTCRtpTransceiver {
465    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
466        f.debug_struct("RTCRtpTransceiver")
467            .field("mid", &self.mid)
468            .field("sender", &self.sender)
469            .field("receiver", &self.receiver)
470            .field("direction", &self.direction)
471            .field("current_direction", &self.current_direction)
472            .field("codecs", &self.codecs)
473            .field("stopped", &self.stopped)
474            .field("kind", &self.kind)
475            .finish()
476    }
477}
478
479pub(crate) async fn find_by_mid(
480    mid: &str,
481    local_transceivers: &mut Vec<Arc<RTCRtpTransceiver>>,
482) -> Option<Arc<RTCRtpTransceiver>> {
483    for (i, t) in local_transceivers.iter().enumerate() {
484        if t.mid() == Some(SmolStr::from(mid)) {
485            return Some(local_transceivers.remove(i));
486        }
487    }
488
489    None
490}
491
492/// Given a direction+type pluck a transceiver from the passed list
493/// if no entry satisfies the requested type+direction return a inactive Transceiver
494pub(crate) async fn satisfy_type_and_direction(
495    remote_kind: RTPCodecType,
496    remote_direction: RTCRtpTransceiverDirection,
497    local_transceivers: &mut Vec<Arc<RTCRtpTransceiver>>,
498) -> Option<Arc<RTCRtpTransceiver>> {
499    // Get direction order from most preferred to least
500    let get_preferred_directions = || -> Vec<RTCRtpTransceiverDirection> {
501        match remote_direction {
502            RTCRtpTransceiverDirection::Sendrecv => vec![
503                RTCRtpTransceiverDirection::Recvonly,
504                RTCRtpTransceiverDirection::Sendrecv,
505            ],
506            RTCRtpTransceiverDirection::Sendonly => vec![RTCRtpTransceiverDirection::Recvonly],
507            RTCRtpTransceiverDirection::Recvonly => vec![
508                RTCRtpTransceiverDirection::Sendonly,
509                RTCRtpTransceiverDirection::Sendrecv,
510            ],
511            _ => vec![],
512        }
513    };
514
515    for possible_direction in get_preferred_directions() {
516        for (i, t) in local_transceivers.iter().enumerate() {
517            if t.mid().is_none() && t.kind == remote_kind && possible_direction == t.direction() {
518                return Some(local_transceivers.remove(i));
519            }
520        }
521    }
522
523    None
524}