1#[cfg(test)]
2mod rtp_transceiver_test;
3
4use std::fmt;
5use std::future::Future;
6use std::pin::Pin;
7use std::sync::atomic::Ordering;
8use std::sync::Arc;
9
10use interceptor::stream_info::{AssociatedStreamInfo, RTPHeaderExtension, StreamInfo};
11use interceptor::Attributes;
12use log::trace;
13use portable_atomic::{AtomicBool, AtomicU8};
14use serde::{Deserialize, Serialize};
15use smol_str::SmolStr;
16use tokio::sync::{Mutex, OnceCell};
17
18use crate::api::media_engine::MediaEngine;
19use crate::error::{Error, Result};
20use crate::rtp_transceiver::rtp_codec::*;
21use crate::rtp_transceiver::rtp_receiver::{RTCRtpReceiver, RTPReceiverInternal};
22use crate::rtp_transceiver::rtp_sender::RTCRtpSender;
23use crate::rtp_transceiver::rtp_transceiver_direction::RTCRtpTransceiverDirection;
24use crate::track::track_local::TrackLocal;
25
26pub(crate) mod fmtp;
27pub mod rtp_codec;
28pub mod rtp_receiver;
29pub mod rtp_sender;
30pub mod rtp_transceiver_direction;
31pub(crate) mod srtp_writer_future;
32
/// RTP synchronization source (SSRC) identifier, represented as a 32-bit unsigned integer.
#[allow(clippy::upper_case_acronyms)]
pub type SSRC = u32;

/// RTP payload type number, represented as an 8-bit unsigned integer.
pub type PayloadType = u8;
46
/// RTCP feedback type string `transport-cc`.
pub const TYPE_RTCP_FB_TRANSPORT_CC: &str = "transport-cc";

/// RTCP feedback type string `goog-remb`.
pub const TYPE_RTCP_FB_GOOG_REMB: &str = "goog-remb";

/// RTCP feedback type string `ack`.
pub const TYPE_RTCP_FB_ACK: &str = "ack";

/// RTCP feedback type string `ccm`.
pub const TYPE_RTCP_FB_CCM: &str = "ccm";

/// RTCP feedback type string `nack`.
pub const TYPE_RTCP_FB_NACK: &str = "nack";
61
/// A single RTCP feedback entry, made of a type string and a parameter string.
#[derive(Default, Debug, Clone, PartialEq, Eq)]
pub struct RTCPFeedback {
    /// Feedback type string; the `TYPE_RTCP_FB_*` constants in this module hold
    /// type strings such as `"nack"` or `"transport-cc"`.
    pub typ: String,

    /// Parameter string accompanying `typ`; the default value is the empty string.
    pub parameter: String,
}
75
76#[derive(Default, Debug, Clone)]
79pub struct RTCRtpCapabilities {
80 pub codecs: Vec<RTCRtpCodecCapability>,
81 pub header_extensions: Vec<RTCRtpHeaderExtensionCapability>,
82}
83
84#[derive(Default, Debug, Clone, Serialize, Deserialize)]
87pub struct RTCRtpRtxParameters {
88 pub ssrc: SSRC,
89}
90
91#[derive(Default, Debug, Clone, Serialize, Deserialize)]
95pub struct RTCRtpCodingParameters {
96 pub rid: SmolStr,
97 pub ssrc: SSRC,
98 pub payload_type: PayloadType,
99 pub rtx: RTCRtpRtxParameters,
100}
101
102pub type RTCRtpDecodingParameters = RTCRtpCodingParameters;
106
107pub type RTCRtpEncodingParameters = RTCRtpCodingParameters;
111
112#[derive(Debug)]
114pub struct RTCRtpReceiveParameters {
115 pub encodings: Vec<RTCRtpDecodingParameters>,
116}
117
118#[derive(Debug)]
120pub struct RTCRtpSendParameters {
121 pub rtp_parameters: RTCRtpParameters,
122 pub encodings: Vec<RTCRtpEncodingParameters>,
123}
124
125pub struct RTCRtpTransceiverInit {
127 pub direction: RTCRtpTransceiverDirection,
128 pub send_encodings: Vec<RTCRtpEncodingParameters>,
129 }
131
132pub(crate) fn create_stream_info(
133 id: String,
134 ssrc: SSRC,
135 payload_type: PayloadType,
136 codec: RTCRtpCodecCapability,
137 webrtc_header_extensions: &[RTCRtpHeaderExtensionParameters],
138 associated_stream: Option<AssociatedStreamInfo>,
139) -> StreamInfo {
140 let header_extensions: Vec<RTPHeaderExtension> = webrtc_header_extensions
141 .iter()
142 .map(|h| RTPHeaderExtension {
143 id: h.id,
144 uri: h.uri.clone(),
145 })
146 .collect();
147
148 let feedbacks: Vec<_> = codec
149 .rtcp_feedback
150 .iter()
151 .map(|f| interceptor::stream_info::RTCPFeedback {
152 typ: f.typ.clone(),
153 parameter: f.parameter.clone(),
154 })
155 .collect();
156
157 StreamInfo {
158 id,
159 attributes: Attributes::new(),
160 ssrc,
161 payload_type,
162 rtp_header_extensions: header_extensions,
163 mime_type: codec.mime_type,
164 clock_rate: codec.clock_rate,
165 channels: codec.channels,
166 sdp_fmtp_line: codec.sdp_fmtp_line,
167 rtcp_feedback: feedbacks,
168 associated_stream,
169 }
170}
171
/// Optional callback used to signal that renegotiation is needed.
///
/// When present, the callback takes no arguments and returns a boxed, pinned future that
/// resolves to `()`. Both the callback and the future it returns must be `Send + Sync`.
pub type TriggerNegotiationNeededFnOption = Option<
    Box<
        dyn Fn() -> Pin<Box<dyn Future<Output = ()> + Send + Sync>>
            + Send
            + Sync,
    >,
>;
174
175pub struct RTCRtpTransceiver {
177 mid: OnceCell<SmolStr>, sender: Mutex<Arc<RTCRtpSender>>, receiver: Mutex<Arc<RTCRtpReceiver>>, direction: AtomicU8, current_direction: AtomicU8, codecs: Arc<Mutex<Vec<RTCRtpCodecParameters>>>, pub(crate) stopped: AtomicBool,
187 pub(crate) kind: RTPCodecType,
188
189 media_engine: Arc<MediaEngine>,
190
191 trigger_negotiation_needed: Mutex<TriggerNegotiationNeededFnOption>,
192}
193
194impl RTCRtpTransceiver {
195 pub async fn new(
196 receiver: Arc<RTCRtpReceiver>,
197 sender: Arc<RTCRtpSender>,
198 direction: RTCRtpTransceiverDirection,
199 kind: RTPCodecType,
200 codecs: Vec<RTCRtpCodecParameters>,
201 media_engine: Arc<MediaEngine>,
202 trigger_negotiation_needed: TriggerNegotiationNeededFnOption,
203 ) -> Arc<Self> {
204 let codecs = Arc::new(Mutex::new(codecs));
205 receiver.set_transceiver_codecs(Some(Arc::clone(&codecs)));
206
207 let t = Arc::new(RTCRtpTransceiver {
208 mid: OnceCell::new(),
209 sender: Mutex::new(sender),
210 receiver: Mutex::new(receiver),
211
212 direction: AtomicU8::new(direction as u8),
213 current_direction: AtomicU8::new(RTCRtpTransceiverDirection::Unspecified as u8),
214
215 codecs,
216 stopped: AtomicBool::new(false),
217 kind,
218 media_engine,
219 trigger_negotiation_needed: Mutex::new(trigger_negotiation_needed),
220 });
221 t.sender()
222 .await
223 .set_rtp_transceiver(Some(Arc::downgrade(&t)));
224
225 t
226 }
227
228 pub async fn set_codec_preferences(&self, codecs: Vec<RTCRtpCodecParameters>) -> Result<()> {
231 for codec in &codecs {
232 let media_engine_codecs = self.media_engine.get_codecs_by_kind(self.kind);
233 let (_, match_type) = codec_parameters_fuzzy_search(codec, &media_engine_codecs);
234 if match_type == CodecMatch::None {
235 return Err(Error::ErrRTPTransceiverCodecUnsupported);
236 }
237 }
238
239 {
240 let mut c = self.codecs.lock().await;
241 *c = codecs;
242 }
243 Ok(())
244 }
245
246 pub(crate) async fn get_codecs(&self) -> Vec<RTCRtpCodecParameters> {
248 let mut codecs = self.codecs.lock().await;
249 RTPReceiverInternal::get_codecs(&mut codecs, self.kind, &self.media_engine)
250 }
251
252 pub async fn sender(&self) -> Arc<RTCRtpSender> {
254 let sender = self.sender.lock().await;
255 sender.clone()
256 }
257
258 pub async fn set_sender_track(
260 self: &Arc<Self>,
261 sender: Arc<RTCRtpSender>,
262 track: Option<Arc<dyn TrackLocal + Send + Sync>>,
263 ) -> Result<()> {
264 self.set_sender(sender).await;
265 self.set_sending_track(track).await
266 }
267
268 pub async fn set_sender(self: &Arc<Self>, s: Arc<RTCRtpSender>) {
269 s.set_rtp_transceiver(Some(Arc::downgrade(self)));
270
271 let prev_sender = self.sender().await;
272 prev_sender.set_rtp_transceiver(None);
273
274 {
275 let mut sender = self.sender.lock().await;
276 *sender = s;
277 }
278 }
279
280 pub async fn receiver(&self) -> Arc<RTCRtpReceiver> {
282 let receiver = self.receiver.lock().await;
283 receiver.clone()
284 }
285
286 pub(crate) async fn set_receiver(&self, r: Arc<RTCRtpReceiver>) {
287 r.set_transceiver_codecs(Some(Arc::clone(&self.codecs)));
288
289 {
290 let mut receiver = self.receiver.lock().await;
291 (*receiver).set_transceiver_codecs(None);
292
293 *receiver = r;
294 }
295 }
296
297 pub(crate) fn set_mid(&self, mid: SmolStr) -> Result<()> {
299 self.mid
300 .set(mid)
301 .map_err(|_| Error::ErrRTPTransceiverCannotChangeMid)
302 }
303
304 pub fn mid(&self) -> Option<SmolStr> {
306 self.mid.get().cloned()
307 }
308
309 pub fn kind(&self) -> RTPCodecType {
311 self.kind
312 }
313
314 pub fn direction(&self) -> RTCRtpTransceiverDirection {
316 self.direction.load(Ordering::SeqCst).into()
317 }
318
319 pub async fn set_direction(&self, d: RTCRtpTransceiverDirection) {
321 let changed = self.set_direction_internal(d);
322
323 if changed {
324 let lock = self.trigger_negotiation_needed.lock().await;
325 if let Some(trigger) = &*lock {
326 (trigger)().await;
327 }
328 }
329 }
330
331 pub(crate) fn set_direction_internal(&self, d: RTCRtpTransceiverDirection) -> bool {
332 let previous: RTCRtpTransceiverDirection =
333 self.direction.swap(d as u8, Ordering::SeqCst).into();
334
335 let changed = d != previous;
336
337 if changed {
338 trace!(
339 "Changing direction of transceiver from {} to {}",
340 previous,
341 d
342 );
343 }
344
345 changed
346 }
347
348 pub fn current_direction(&self) -> RTCRtpTransceiverDirection {
352 if self.stopped.load(Ordering::SeqCst) {
353 return RTCRtpTransceiverDirection::Unspecified;
354 }
355
356 self.current_direction.load(Ordering::SeqCst).into()
357 }
358
359 pub(crate) fn set_current_direction(&self, d: RTCRtpTransceiverDirection) {
360 let previous: RTCRtpTransceiverDirection = self
361 .current_direction
362 .swap(d as u8, Ordering::SeqCst)
363 .into();
364
365 if d != previous {
366 trace!(
367 "Changing current direction of transceiver from {} to {}",
368 previous,
369 d,
370 );
371 }
372 }
373
374 pub(crate) async fn process_new_current_direction(
379 &self,
380 previous_direction: RTCRtpTransceiverDirection,
381 ) -> Result<()> {
382 if self.stopped.load(Ordering::SeqCst) {
383 return Ok(());
384 }
385
386 let current_direction = self.current_direction();
387 if previous_direction != current_direction {
388 let mid = self.mid();
389 trace!(
390 "Processing transceiver({:?}) direction change from {} to {}",
391 mid,
392 previous_direction,
393 current_direction
394 );
395 } else {
396 return Ok(());
398 }
399
400 {
401 let receiver = self.receiver.lock().await;
402 let pause_receiver = !current_direction.has_recv();
403
404 if pause_receiver {
405 receiver.pause().await?;
406 } else {
407 receiver.resume().await?;
408 }
409 }
410
411 let pause_sender = !current_direction.has_send();
412 {
413 let sender = &*self.sender.lock().await;
414 sender.set_paused(pause_sender);
415 }
416
417 Ok(())
418 }
419
420 pub async fn stop(&self) -> Result<()> {
422 if self.stopped.load(Ordering::SeqCst) {
423 return Ok(());
424 }
425
426 self.stopped.store(true, Ordering::SeqCst);
427
428 {
429 let sender = self.sender.lock().await;
430 sender.stop().await?;
431 }
432 {
433 let r = self.receiver.lock().await;
434 r.stop().await?;
435 }
436
437 self.set_direction_internal(RTCRtpTransceiverDirection::Inactive);
438
439 Ok(())
440 }
441
442 pub(crate) async fn set_sending_track(
443 &self,
444 track: Option<Arc<dyn TrackLocal + Send + Sync>>,
445 ) -> Result<()> {
446 let track_is_none = track.is_none();
447 {
448 let sender = self.sender.lock().await;
449 sender.replace_track(track).await?;
450 }
451
452 let direction = self.direction();
453 let should_send = !track_is_none;
454 let should_recv = direction.has_recv();
455 self.set_direction_internal(RTCRtpTransceiverDirection::from_send_recv(
456 should_send,
457 should_recv,
458 ));
459
460 Ok(())
461 }
462}
463
464impl fmt::Debug for RTCRtpTransceiver {
465 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
466 f.debug_struct("RTCRtpTransceiver")
467 .field("mid", &self.mid)
468 .field("sender", &self.sender)
469 .field("receiver", &self.receiver)
470 .field("direction", &self.direction)
471 .field("current_direction", &self.current_direction)
472 .field("codecs", &self.codecs)
473 .field("stopped", &self.stopped)
474 .field("kind", &self.kind)
475 .finish()
476 }
477}
478
479pub(crate) async fn find_by_mid(
480 mid: &str,
481 local_transceivers: &mut Vec<Arc<RTCRtpTransceiver>>,
482) -> Option<Arc<RTCRtpTransceiver>> {
483 for (i, t) in local_transceivers.iter().enumerate() {
484 if t.mid() == Some(SmolStr::from(mid)) {
485 return Some(local_transceivers.remove(i));
486 }
487 }
488
489 None
490}
491
492pub(crate) async fn satisfy_type_and_direction(
495 remote_kind: RTPCodecType,
496 remote_direction: RTCRtpTransceiverDirection,
497 local_transceivers: &mut Vec<Arc<RTCRtpTransceiver>>,
498) -> Option<Arc<RTCRtpTransceiver>> {
499 let get_preferred_directions = || -> Vec<RTCRtpTransceiverDirection> {
501 match remote_direction {
502 RTCRtpTransceiverDirection::Sendrecv => vec![
503 RTCRtpTransceiverDirection::Recvonly,
504 RTCRtpTransceiverDirection::Sendrecv,
505 ],
506 RTCRtpTransceiverDirection::Sendonly => vec![RTCRtpTransceiverDirection::Recvonly],
507 RTCRtpTransceiverDirection::Recvonly => vec![
508 RTCRtpTransceiverDirection::Sendonly,
509 RTCRtpTransceiverDirection::Sendrecv,
510 ],
511 _ => vec![],
512 }
513 };
514
515 for possible_direction in get_preferred_directions() {
516 for (i, t) in local_transceivers.iter().enumerate() {
517 if t.mid().is_none() && t.kind == remote_kind && possible_direction == t.direction() {
518 return Some(local_transceivers.remove(i));
519 }
520 }
521 }
522
523 None
524}