msf_rtp/
transceiver.rs

1//! RTP transceiver.
2
3use std::{
4    collections::{HashMap, VecDeque},
5    convert::Infallible,
6    ops::{Deref, DerefMut},
7    pin::Pin,
8    sync::Arc,
9    task::{Context, Poll},
10    time::Instant,
11};
12
13use futures::{ready, Sink, Stream};
14
15use crate::{
16    rtcp::{RtcpContext, RtcpContextHandle},
17    rtp::{IncomingRtpPacket, OrderedRtpPacket, RtpPacket},
18    utils::reorder::{ReorderingError, ReorderingMultiBuffer},
19};
20
21/// RTP packet transceiver.
22pub trait RtpTransceiver {
23    /// Get the transceiver's RTCP context.
24    ///
25    /// The transceiver is responsible for feeding the RTCP context with
26    /// incoming and outgoing RTP packets. This happens internally. The RTCP
27    /// context can be used then to generate RTCP reports and process incoming
28    /// RTCP packets.
29    fn rtcp_context(&self) -> RtcpContextHandle;
30}
31
32/// SSRC handling mode.
33#[derive(Debug, Copy, Clone, PartialEq, Eq)]
34pub enum SSRCMode {
35    /// Ignore incoming SSRCs and treat all packets as belonging to a single
36    /// SSRC.
37    ///
38    /// This mode is useful when dealing with buggy peers that change SSRCs
39    /// unexpectedly. The receiver reports will use the last seen SSRC.
40    Ignore,
41
42    /// Accept packets with any SSRC.
43    Any,
44
45    /// Accept packets only from specific SSRCs.
46    Specific,
47}
48
49/// SSRC to clock rate mapping.
50#[derive(Clone)]
51pub struct SSRC2ClockRate {
52    inner: HashMap<u32, u32>,
53}
54
55impl SSRC2ClockRate {
56    /// Create an empty mapping.
57    fn new() -> Self {
58        Self {
59            inner: HashMap::new(),
60        }
61    }
62
63    /// Create a mapping from a given iterator of `(ssrc, clock_rate)` tuples.
64    fn from_iter<T>(items: T) -> Self
65    where
66        T: IntoIterator<Item = (u32, u32)>,
67    {
68        Self {
69            inner: HashMap::from_iter(items),
70        }
71    }
72
73    /// Get the number of SSRCs in the mapping.
74    #[inline]
75    pub fn len(&self) -> usize {
76        self.inner.len()
77    }
78
79    /// Check if the mapping is empty.
80    #[inline]
81    pub fn is_empty(&self) -> bool {
82        self.inner.is_empty()
83    }
84
85    /// Get the clock rate for a given SSRC.
86    #[inline]
87    pub fn clock_rate(&self, ssrc: u32) -> Option<u32> {
88        self.inner.get(&ssrc).copied()
89    }
90
91    /// Check if the mapping contains a given SSRC.
92    #[inline]
93    pub fn contains(&self, ssrc: u32) -> bool {
94        self.inner.contains_key(&ssrc)
95    }
96
97    /// Get an iterator over all `(ssrc, clock_rate)` pairs within the mapping.
98    pub fn iter(&self) -> impl Iterator<Item = (u32, u32)> + use<'_> {
99        self.inner
100            .iter()
101            .map(|(&ssrc, &clock_rate)| (ssrc, clock_rate))
102    }
103}
104
105/// RTP transceiver options.
106#[derive(Clone)]
107pub struct RtpTransceiverOptions {
108    inner: ArcInnerTransceiverOptions,
109}
110
111impl RtpTransceiverOptions {
112    /// Create new options.
113    pub fn new() -> Self {
114        Self {
115            inner: ArcInnerTransceiverOptions::new(InnerTransceiverOptions::new()),
116        }
117    }
118
119    /// Get the primary sender SSRC.
120    #[inline]
121    pub fn primary_sender_ssrc(&self) -> u32 {
122        self.inner.primary_sender_ssrc
123    }
124
125    /// Set the primary sender SSRC.
126    ///
127    /// This SSRC will be used as the sender SSRC for RTCP reception reports.
128    /// The default value is random.
129    pub fn with_primary_sender_ssrc(mut self, ssrc: u32) -> Self {
130        self.inner.primary_sender_ssrc = ssrc;
131        self
132    }
133
134    /// Get depth of the reordering buffer for incoming RTP packets.
135    #[inline]
136    pub fn reordering_buffer_depth(&self) -> usize {
137        self.inner.reordering_buffer_depth
138    }
139
140    /// Set depth of the reordering buffer for incoming RTP packets.
141    ///
142    /// The default value is 64.
143    pub fn with_reordering_buffer_depth(mut self, depth: usize) -> Self {
144        self.inner.reordering_buffer_depth = depth;
145        self
146    }
147
148    /// Get the default clock rate for SSRCs without an explicit clock rate.
149    #[inline]
150    pub fn default_clock_rate(&self) -> u32 {
151        self.inner.default_clock_rate
152    }
153
154    /// Set the default clock rate for SSRCs without an explicit clock rate.
155    ///
156    /// This clock rate will be used when creating sender and receiver reports
157    /// for SSRCs where the clock rate is not known. The default value is
158    /// 90000.
159    pub fn with_default_clock_rate(mut self, clock_rate: u32) -> Self {
160        self.inner.default_clock_rate = clock_rate;
161        self
162    }
163
164    /// Get the input SSRC handling mode.
165    #[inline]
166    pub fn input_ssrc_mode(&self) -> SSRCMode {
167        self.inner.input_ssrc_mode
168    }
169
170    /// Set the input SSRC handling mode.
171    ///
172    /// The default mode is `SSRCMode::Any`.
173    pub fn with_input_ssrc_mode(mut self, mode: SSRCMode) -> Self {
174        self.inner.input_ssrc_mode = mode;
175        self
176    }
177
178    /// Get the maximum number of input SSRCs to track.
179    #[inline]
180    pub fn max_input_ssrcs(&self) -> Option<usize> {
181        self.inner.max_input_ssrcs
182    }
183
184    /// Set the maximum number of input SSRCs to track.
185    ///
186    /// This option is valid only when `input_ssrc_mode` is set to
187    /// `SSRCMode::Any`. Setting this option to `None` will allow unlimited
188    /// number of SSRCs. This should be used with caution as it may lead to
189    /// excessive memory usage. The default limit is 64 SSRCs.
190    ///
191    /// If there are more SSRCs than the limit, the least recently used SSRCs
192    /// will be dropped first.
193    pub fn with_max_input_ssrcs(mut self, max: Option<usize>) -> Self {
194        self.inner.max_input_ssrcs = max;
195        self
196    }
197
198    /// Get the input SSRC to clock rate mapping.
199    #[inline]
200    pub fn input_ssrcs(&self) -> &SSRC2ClockRate {
201        &self.inner.input_ssrcs
202    }
203
204    /// Set the expected input SSRCs along with their clock rates.
205    ///
206    /// The clock rate is used for generating RTCP receiver reports. The method
207    /// accepts an iterator of `(ssrc, clock_rate)` tuples.
208    ///
209    /// Note that if the clock rate for a given SSRC is not specified here, the
210    /// default clock rate will be used instead when generating receiver
211    /// reports. This may lead to incorrect reports if the actual clock rate
212    /// differs from the default one.
213    pub fn with_input_ssrcs<T>(mut self, ssrcs: T) -> Self
214    where
215        T: IntoIterator<Item = (u32, u32)>,
216    {
217        self.inner.input_ssrcs = SSRC2ClockRate::from_iter(ssrcs);
218        self
219    }
220
221    /// Get the output SSRC to clock rate mapping.
222    #[inline]
223    pub fn output_ssrcs(&self) -> &SSRC2ClockRate {
224        &self.inner.output_ssrcs
225    }
226
227    /// Set the output SSRCs along with their clock rates.
228    ///
229    /// The clock rate is used for generating RTCP sender reports. The method
230    /// accepts an iterator of `(ssrc, clock_rate)` tuples.
231    ///
232    /// Note that if the clock rate for a given SSRC is not specified here, the
233    /// default clock rate will be used instead when generating sender reports.
234    /// This may lead to incorrect reports if the actual clock rate differs
235    /// from the default one.
236    pub fn with_output_ssrcs<T>(mut self, ssrcs: T) -> Self
237    where
238        T: IntoIterator<Item = (u32, u32)>,
239    {
240        self.inner.output_ssrcs = SSRC2ClockRate::from_iter(ssrcs);
241        self
242    }
243
244    /// Get the maximum RTCP packet size.
245    #[inline]
246    pub fn max_rtcp_packet_size(&self) -> usize {
247        self.inner.max_rtcp_packet_size
248    }
249
250    /// Set the maximum RTCP packet size.
251    ///
252    /// Limiting the maximum RTCP packet size helps avoid IP packet
253    /// fragmentation. The default limit is 1200 bytes. This should be safe for
254    /// UDP transport in IPv4/IPv6 networks with typical MTU sizes in the
255    /// Internet environment.
256    pub fn with_max_rtcp_packet_size(mut self, size: usize) -> Self {
257        self.inner.max_rtcp_packet_size = size;
258        self
259    }
260}
261
262impl Default for RtpTransceiverOptions {
263    #[inline]
264    fn default() -> Self {
265        Self::new()
266    }
267}
268
269/// Helper type for shared transceiver options.
270///
271/// It implements copy-on-write semantics.
272#[derive(Clone)]
273struct ArcInnerTransceiverOptions {
274    inner: Arc<InnerTransceiverOptions>,
275}
276
277impl ArcInnerTransceiverOptions {
278    /// Create shared transceiver options.
279    fn new(inner: InnerTransceiverOptions) -> Self {
280        Self {
281            inner: Arc::new(inner),
282        }
283    }
284}
285
286impl Deref for ArcInnerTransceiverOptions {
287    type Target = InnerTransceiverOptions;
288
289    #[inline]
290    fn deref(&self) -> &Self::Target {
291        &self.inner
292    }
293}
294
295impl DerefMut for ArcInnerTransceiverOptions {
296    fn deref_mut(&mut self) -> &mut Self::Target {
297        Arc::make_mut(&mut self.inner)
298    }
299}
300
301/// Inner transceiver options.
302#[derive(Clone)]
303struct InnerTransceiverOptions {
304    primary_sender_ssrc: u32,
305    reordering_buffer_depth: usize,
306    default_clock_rate: u32,
307    input_ssrc_mode: SSRCMode,
308    max_input_ssrcs: Option<usize>,
309    input_ssrcs: SSRC2ClockRate,
310    output_ssrcs: SSRC2ClockRate,
311    max_rtcp_packet_size: usize,
312}
313
314impl InnerTransceiverOptions {
315    /// Create new inner options.
316    fn new() -> Self {
317        Self {
318            primary_sender_ssrc: rand::random(),
319            reordering_buffer_depth: 64,
320            default_clock_rate: 90000,
321            input_ssrc_mode: SSRCMode::Any,
322            max_input_ssrcs: Some(64),
323            input_ssrcs: SSRC2ClockRate::new(),
324            output_ssrcs: SSRC2ClockRate::new(),
325            max_rtcp_packet_size: 1200,
326        }
327    }
328}
329
330pin_project_lite::pin_project! {
331    /// Default RTP transceiver implementation.
332    pub struct DefaultRtpTransceiver<T, E = Infallible> {
333        #[pin]
334        inner: T,
335        context: TransceiverContext,
336        error: Option<E>,
337        eof: bool,
338    }
339}
340
341impl<T, E> DefaultRtpTransceiver<T, E> {
342    /// Create a new RTP packet receiver.
343    pub fn new(stream: T, options: RtpTransceiverOptions) -> Self {
344        Self {
345            inner: stream,
346            context: TransceiverContext::new(options),
347            error: None,
348            eof: false,
349        }
350    }
351}
352
353impl<T, E> Stream for DefaultRtpTransceiver<T, E>
354where
355    T: Stream<Item = Result<RtpPacket, E>>,
356{
357    type Item = Result<OrderedRtpPacket, E>;
358
359    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
360        let mut this = self.project();
361
362        loop {
363            if let Some(packet) = this.context.poll_next_ordered_packet() {
364                return Poll::Ready(Some(Ok(packet)));
365            }
366
367            let inner = this.inner.as_mut();
368
369            if !*this.eof {
370                let ready = if this.context.end_of_stream() {
371                    None
372                } else {
373                    ready!(inner.poll_next(cx))
374                };
375
376                match ready {
377                    Some(Ok(packet)) => this.context.process_incoming_packet(packet),
378                    other => {
379                        if let Some(Err(err)) = other {
380                            *this.error = Some(err);
381                        }
382
383                        *this.eof = true;
384                    }
385                }
386            } else if let Some(packet) = this.context.take_next_ordered_packet() {
387                return Poll::Ready(Some(Ok(packet)));
388            } else if let Some(err) = this.error.take() {
389                return Poll::Ready(Some(Err(err)));
390            } else {
391                return Poll::Ready(None);
392            }
393        }
394    }
395}
396
397impl<T, E> Sink<RtpPacket> for DefaultRtpTransceiver<T, E>
398where
399    T: Sink<RtpPacket>,
400{
401    type Error = T::Error;
402
403    #[inline]
404    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
405        let this = self.project();
406
407        this.inner.poll_ready(cx)
408    }
409
410    fn start_send(self: Pin<&mut Self>, packet: RtpPacket) -> Result<(), Self::Error> {
411        let this = self.project();
412
413        this.context.process_outgoing_packet(&packet);
414
415        this.inner.start_send(packet)?;
416
417        Ok(())
418    }
419
420    #[inline]
421    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
422        let this = self.project();
423
424        this.inner.poll_flush(cx)
425    }
426
427    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
428        let this = self.project();
429
430        ready!(this.inner.poll_close(cx))?;
431
432        this.context.close();
433
434        Poll::Ready(Ok(()))
435    }
436}
437
438impl<T, E> RtpTransceiver for DefaultRtpTransceiver<T, E> {
439    #[inline]
440    fn rtcp_context(&self) -> RtcpContextHandle {
441        self.context.rtcp_context()
442    }
443}
444
445/// RTP transceiver context.
446struct TransceiverContext {
447    options: RtpTransceiverOptions,
448    rtcp: RtcpContext,
449    buffer: ReorderingMultiBuffer,
450    output: VecDeque<OrderedRtpPacket>,
451}
452
453impl TransceiverContext {
454    /// Create a new RTP receiver context.
455    fn new(options: RtpTransceiverOptions) -> Self {
456        let input_ssrcs = options.input_ssrcs();
457        let expected_ssrcs = input_ssrcs.len();
458
459        let max_input_ssrcs = options.max_input_ssrcs().map(|max| max.max(expected_ssrcs));
460
461        let max_ssrc_buffers = match options.input_ssrc_mode() {
462            SSRCMode::Ignore => Some(1),
463            SSRCMode::Any => max_input_ssrcs,
464            SSRCMode::Specific => Some(expected_ssrcs),
465        };
466
467        let reordering_buffer_depth = options.reordering_buffer_depth();
468
469        Self {
470            options: options.clone(),
471            rtcp: RtcpContext::new(options),
472            buffer: ReorderingMultiBuffer::new(reordering_buffer_depth, max_ssrc_buffers),
473            output: VecDeque::new(),
474        }
475    }
476
477    /// Process a given outgoing RTP packet.
478    fn process_outgoing_packet(&mut self, packet: &RtpPacket) {
479        self.rtcp.process_outgoing_rtp_packet(packet);
480    }
481
482    /// Process a given incoming RTP packet.
483    fn process_incoming_packet(&mut self, packet: RtpPacket) {
484        let ssrc = packet.ssrc();
485
486        let input_ssrcs = self.options.input_ssrcs();
487        let input_ssrc_mode = self.options.input_ssrc_mode();
488
489        if input_ssrc_mode == SSRCMode::Specific && !input_ssrcs.contains(ssrc) {
490            return;
491        }
492
493        let now = Instant::now();
494
495        let packet = IncomingRtpPacket::new(packet, now);
496
497        // update the statistics (we need to do this before modifying the SSRC)
498        self.rtcp.process_incoming_rtp_packet(&packet);
499
500        let mut packet = RtpPacket::from(packet);
501
502        // set SSRC to 0 if we are ignoring SSRCs
503        if input_ssrc_mode == SSRCMode::Ignore {
504            packet = packet.with_ssrc(0);
505        }
506
507        let mut packet = IncomingRtpPacket::new(packet, now);
508
509        // put the packet into the reordering buffer, skipping missing packets
510        // if necessary
511        while let Err(ReorderingError::BufferFull(tmp)) = self.buffer.push(packet) {
512            if let Some(p) = self.buffer.take() {
513                self.process_ordered_packet(p);
514            }
515
516            packet = tmp;
517        }
518
519        // take all in-order packets from the reordering buffer
520        while let Some(p) = self.buffer.next() {
521            self.process_ordered_packet(p);
522        }
523    }
524
525    /// Process a given incoming RTP packet after reordering.
526    fn process_ordered_packet(&mut self, packet: OrderedRtpPacket) {
527        self.rtcp.process_ordered_rtp_packet(&packet);
528        self.output.push_back(packet);
529    }
530
531    /// Take the next incoming packet RTP without skipping missing packets.
532    fn poll_next_ordered_packet(&mut self) -> Option<OrderedRtpPacket> {
533        self.output.pop_front()
534    }
535
536    /// Take the next incoming RTP packet from the reordering buffer.
537    ///
538    /// This method will skip missing packets if necessary.
539    fn take_next_ordered_packet(&mut self) -> Option<OrderedRtpPacket> {
540        while self.output.is_empty() {
541            if self.buffer.is_empty() {
542                break;
543            } else if let Some(packet) = self.buffer.take() {
544                self.process_ordered_packet(packet);
545            }
546        }
547
548        self.output.pop_front()
549    }
550
551    /// Check if the end of stream has been signaled by the other peer via the
552    /// RTCP channel.
553    fn end_of_stream(&self) -> bool {
554        self.rtcp.end_of_stream()
555    }
556
557    /// Signal the end of stream to the other peer via the RTCP channel.
558    fn close(&mut self) {
559        self.rtcp.close();
560    }
561
562    /// Get the transceiver's RTCP context handle.
563    #[inline]
564    fn rtcp_context(&self) -> RtcpContextHandle {
565        self.rtcp.handle()
566    }
567}