Skip to main content

msf_rtp/rtcp/
context.rs

1use std::{
2    collections::HashMap,
3    num::NonZeroUsize,
4    sync::{Arc, Mutex},
5    task::{Context, Poll, Waker},
6    time::Duration,
7};
8
9use lru::LruCache;
10
11use crate::{
12    rtcp::{
13        stats::{SSRCRxStats, SSRCTxStats},
14        ByePacket, CompoundRtcpPacket, ReceiverReport, ReportBlock, RtcpHeader, RtcpPacket,
15        SenderReport, SourceDescription, SourceDescriptionPacket,
16    },
17    rtp::{IncomingRtpPacket, OrderedRtpPacket, RtpPacket},
18    transceiver::{RtpTransceiverOptions, SSRCMode},
19};
20
21/// RTCP context.
22///
23/// This struct manages RTCP state, including sender and receiver statistics,
24/// source descriptions, and RTCP report generation. It is used for interaction
25/// between RTP channels and their corresponding RTCP channels.
26///
27/// The RTCP context is designed to be owned by an RTP channel/transceiver,
28/// while a handle to the context can be shared with the RTCP channel for
29/// processing incoming RTCP packets and generating RTCP reports.
30///
31/// The context will be closed automatically when dropped.
32pub struct RtcpContext {
33    inner: Arc<Mutex<InternalContext>>,
34}
35
36impl RtcpContext {
37    /// Create a new RTCP context.
38    pub fn new(options: RtpTransceiverOptions) -> Self {
39        Self {
40            inner: Arc::new(Mutex::new(InternalContext::new(options))),
41        }
42    }
43
44    /// Process a given outgoing RTP packet.
45    pub fn process_outgoing_rtp_packet(&self, packet: &RtpPacket) {
46        self.inner
47            .lock()
48            .unwrap()
49            .process_outgoing_rtp_packet(packet);
50    }
51
52    /// Process a given incoming RTP packet.
53    pub fn process_incoming_rtp_packet(&self, packet: &IncomingRtpPacket) {
54        self.inner
55            .lock()
56            .unwrap()
57            .process_incoming_rtp_packet(packet);
58    }
59
60    /// Process a given incoming RTP packet after reordering.
61    ///
62    /// Note that if the underlying RTP transport is ordered and no packet
63    /// reordering is needed, the method `process_incoming_rtp_packet` still
64    /// needs to be called for each packet before calling this method.
65    pub fn process_ordered_rtp_packet(&self, packet: &OrderedRtpPacket) {
66        self.inner
67            .lock()
68            .unwrap()
69            .process_ordered_rtp_packet(packet);
70    }
71
72    /// Close the RTCP context.
73    ///
74    /// This will generate BYE packets for all active sender SSRCs and stop
75    /// generating further RTCP reports. A sender SSRC is considered active if
76    /// we have sent at least one RTP packet with the SSRC.
77    pub fn close(&self) {
78        self.inner.lock().unwrap().close();
79    }
80
81    /// Check if the end of stream has been reached.
82    ///
83    /// The method checks the end-of-stream condition based on the configured
84    /// SSRC mode and the reception of BYE packets for the relevant SSRCs.
85    ///
86    /// * If the input SSRC mode is `Specific`, the method returns true if BYE
87    ///   packets have been received for all configured input SSRCs.
88    /// * If the input SSRC mode is `Ignore`, the method returns true if at
89    ///   least one BYE packet has been received.
90    /// * If the input SSRC mode is `Any`, the method returns true if BYE
91    ///   packets have been received for all currently tracked SSRCs on the
92    ///   receiver side and there is at least one such SSRC.
93    pub fn end_of_stream(&self) -> bool {
94        self.inner.lock().unwrap().end_of_stream()
95    }
96
97    /// Create a context handle that can be shared with the companion RTCP
98    /// channel.
99    pub fn handle(&self) -> RtcpContextHandle {
100        RtcpContextHandle {
101            inner: self.inner.clone(),
102        }
103    }
104}
105
106impl Drop for RtcpContext {
107    #[inline]
108    fn drop(&mut self) {
109        self.close();
110    }
111}
112
113/// RTCP context handle.
114///
115/// This handle can be shared with an RTCP channel for processing incoming
116/// RTCP packets and generating RTCP reports.
117#[derive(Clone)]
118pub struct RtcpContextHandle {
119    inner: Arc<Mutex<InternalContext>>,
120}
121
122impl RtcpContextHandle {
123    /// Process a given receiver report.
124    pub fn process_incoming_receiver_report(&self, report: &ReceiverReport) {
125        self.inner
126            .lock()
127            .unwrap()
128            .process_incoming_receiver_report(report);
129    }
130
131    /// Process a given sender report.
132    pub fn process_incoming_sender_report(&self, report: &SenderReport) {
133        self.inner
134            .lock()
135            .unwrap()
136            .process_incoming_sender_report(report);
137    }
138
139    /// Process a given BYE packet.
140    pub fn process_incoming_bye_packet(&self, packet: &ByePacket) {
141        self.inner
142            .lock()
143            .unwrap()
144            .process_incoming_bye_packet(packet);
145    }
146
147    /// Create RTCP reports.
148    ///
149    /// This method generates receiver and/or sender reports for all SSRCs that
150    /// appeared since the last call to this method. If no RTP packets have
151    /// been sent or received since the last call, empty receiver reports for
152    /// all active sender SSRCs will be generated. A sender SSRC is considered
153    /// active if we have sent at least one RTP packet with the SSRC.
154    ///
155    /// The method also generates BYE packets for all active sender SSRCs if
156    /// the context has been closed. The method will return an empty vector if
157    /// the context has already been closed and all corresponding BYE packets
158    /// have been generated.
159    pub fn create_rtcp_reports(&mut self) -> Vec<CompoundRtcpPacket> {
160        self.inner.lock().unwrap().create_rtcp_reports()
161    }
162
163    /// Close the RTCP context.
164    ///
165    /// This will generate BYE packets for all active sender SSRCs and stop
166    /// generating further RTCP reports. A sender SSRC is considered active if
167    /// we have sent at least one RTP packet with the SSRC.
168    pub fn close(&self) {
169        self.inner.lock().unwrap().close();
170    }
171
172    /// Poll the closed state of the RTCP context.
173    ///
174    /// The method returns `Poll::Ready(())` if the `close` method has been
175    /// called or the parent RTCP context has been dropped. Otherwise, it
176    /// returns `Poll::Pending`. It can be used to register a task waker that
177    /// will be notified when the context is closed.
178    ///
179    /// There can be only one task waker per the whole context. Only the last
180    /// registered waker will be notified when the context is closed.
181    pub fn poll_closed(&self, cx: &mut Context<'_>) -> Poll<()> {
182        self.inner.lock().unwrap().poll_closed(cx)
183    }
184
185    /// Check if the end of stream has been reached.
186    ///
187    /// The method checks the end-of-stream condition based on the configured
188    /// SSRC mode and the reception of BYE packets for the relevant SSRCs.
189    ///
190    /// * If the input SSRC mode is `Specific`, the method returns true if BYE
191    ///   packets have been received for all configured input SSRCs.
192    /// * If the input SSRC mode is `Ignore`, the method returns true if at
193    ///   least one BYE packet has been received.
194    /// * If the input SSRC mode is `Any`, the method returns true if BYE
195    ///   packets have been received for all currently tracked SSRCs on the
196    ///   receiver side and there is at least one such SSRC.
197    pub fn end_of_stream(&self) -> bool {
198        self.inner.lock().unwrap().end_of_stream()
199    }
200}
201
202/// Internal RTCP context state.
203#[derive(Debug, Copy, Clone, PartialEq, Eq)]
204enum ContextState {
205    Open,
206    Closing,
207    Closed,
208}
209
210/// Internal RTCP context.
211struct InternalContext {
212    options: RtpTransceiverOptions,
213    source_descriptions: SourceDescriptionCache,
214    rx_stats: LruCache<u32, SSRCRxStats>,
215    tx_stats: HashMap<u32, SSRCTxStats>,
216    last_ssrc: Option<u32>,
217    state: ContextState,
218    closed_waker: Option<Waker>,
219}
220
221impl InternalContext {
222    /// Create a new internal RTCP context.
223    fn new(options: RtpTransceiverOptions) -> Self {
224        let input_ssrc_mode = options.input_ssrc_mode();
225        let max_input_ssrcs = options.max_input_ssrcs();
226
227        let rx_stats = if let (SSRCMode::Any, Some(max)) = (input_ssrc_mode, max_input_ssrcs) {
228            let input_ssrcs = options.input_ssrcs();
229
230            let max = NonZeroUsize::new(max.max(input_ssrcs.len())).unwrap_or(NonZeroUsize::MIN);
231
232            LruCache::new(max)
233        } else {
234            LruCache::unbounded()
235        };
236
237        Self {
238            options,
239            source_descriptions: SourceDescriptionCache::new(),
240            rx_stats,
241            tx_stats: HashMap::new(),
242            last_ssrc: None,
243            state: ContextState::Open,
244            closed_waker: None,
245        }
246    }
247
248    /// Process a given outgoing RTP packet.
249    fn process_outgoing_rtp_packet(&mut self, packet: &RtpPacket) {
250        self.get_tx_stats_mut(packet.ssrc())
251            .process_outgoing_packet(packet);
252    }
253
254    /// Process a given incoming RTP packet.
255    fn process_incoming_rtp_packet(&mut self, packet: &IncomingRtpPacket) {
256        let mut ssrc = packet.ssrc();
257
258        self.last_ssrc = Some(ssrc);
259
260        let input_ssrcs = self.options.input_ssrcs();
261
262        match self.options.input_ssrc_mode() {
263            SSRCMode::Ignore => ssrc = 0,
264            SSRCMode::Specific if !input_ssrcs.contains(ssrc) => return,
265            _ => (),
266        }
267
268        self.get_rx_stats_mut(ssrc)
269            .process_incoming_rtp_packet(packet);
270    }
271
272    /// Process a given incoming RTP packet after reordering.
273    fn process_ordered_rtp_packet(&mut self, packet: &OrderedRtpPacket) {
274        let mut ssrc = packet.ssrc();
275
276        let input_ssrcs = self.options.input_ssrcs();
277
278        match self.options.input_ssrc_mode() {
279            SSRCMode::Ignore => ssrc = 0,
280            SSRCMode::Specific if !input_ssrcs.contains(ssrc) => return,
281            _ => (),
282        }
283
284        self.get_rx_stats_mut(ssrc)
285            .process_ordered_rtp_packet(packet);
286    }
287
288    /// Process a given sender report.
289    fn process_incoming_sender_report(&mut self, report: &SenderReport) {
290        if let Some(stats) = self.rx_stats.peek_mut(&report.sender_ssrc()) {
291            stats.process_incoming_sender_report(report);
292        }
293
294        self.process_incoming_reception_report_blocks(report.report_blocks());
295    }
296
297    /// Process a given receiver report.
298    fn process_incoming_receiver_report(&mut self, report: &ReceiverReport) {
299        self.process_incoming_reception_report_blocks(report.report_blocks());
300    }
301
302    /// Process given reception report blocks.
303    fn process_incoming_reception_report_blocks(&mut self, _: &[ReportBlock]) {
304        // we have no use for reception reports at the moment
305    }
306
307    /// Process a given BYE packet.
308    fn process_incoming_bye_packet(&mut self, packet: &ByePacket) {
309        let sources = if self.options.input_ssrc_mode() == SSRCMode::Ignore {
310            &[0]
311        } else {
312            packet.sources()
313        };
314
315        for &ssrc in sources {
316            self.get_rx_stats_mut(ssrc)
317                .process_incoming_bye_packet(packet);
318        }
319    }
320
321    /// Check if end of stream has been reached.
322    ///
323    /// The method checks the end-of-stream condition based on the configured
324    /// SSRC mode and the reception of BYE packets for the relevant SSRCs.
325    ///
326    /// * If the input SSRC mode is `Specific`, the method returns true if BYE
327    ///   packets have been received for all configured input SSRCs.
328    /// * If the input SSRC mode is `Ignore`, the method returns true if we
329    ///   have received at least one BYE packet.
330    /// * If the input SSRC mode is `Any`, the method returns true if BYE
331    ///   packets have been received for all currently tracked SSRCs on the
332    ///   receiver side and there is at least one such SSRC.
333    fn end_of_stream(&self) -> bool {
334        if self.options.input_ssrc_mode() == SSRCMode::Specific {
335            self.options.input_ssrcs().iter().all(|(ssrc, _)| {
336                self.rx_stats
337                    .peek(&ssrc)
338                    .map(|stats| stats.bye_received())
339                    .unwrap_or(false)
340            })
341        } else {
342            !self.rx_stats.is_empty() && self.rx_stats.iter().all(|(_, stats)| stats.bye_received())
343        }
344    }
345
346    /// Close the RTCP context.
347    ///
348    /// This will generate BYE packets for all active sender SSRCs and stop
349    /// generating further RTCP reports. A sender SSRC is considered active if
350    /// we have sent at least one RTP packet with the SSRC.
351    fn close(&mut self) {
352        if self.state != ContextState::Open {
353            return;
354        }
355
356        self.state = ContextState::Closing;
357
358        if let Some(waker) = self.closed_waker.take() {
359            waker.wake();
360        }
361    }
362
363    /// Poll the closed state of the RTCP context.
364    ///
365    /// The method returns `Poll::Ready(())` if the `close` method has been
366    /// called or the parent RTCP context has been dropped. Otherwise, it
367    /// returns `Poll::Pending`. It can be used to register a task waker that
368    /// will be notified when the context is closed.
369    ///
370    /// There can be only one task waker per the whole context. Only the last
371    /// registered waker will be notified when the context is closed.
372    fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()> {
373        if self.state == ContextState::Open {
374            let waker = cx.waker();
375
376            self.closed_waker = Some(waker.clone());
377
378            Poll::Pending
379        } else {
380            Poll::Ready(())
381        }
382    }
383
384    /// Get transmission statistics for a given SSRC.
385    fn get_tx_stats_mut(&mut self, ssrc: u32) -> &mut SSRCTxStats {
386        // helper function
387        fn create_tx_stats(ssrc: u32, options: &RtpTransceiverOptions) -> SSRCTxStats {
388            let clock_rate = options
389                .output_ssrcs()
390                .clock_rate(ssrc)
391                .unwrap_or(options.default_clock_rate());
392
393            SSRCTxStats::new(ssrc, clock_rate)
394        }
395
396        self.tx_stats
397            .entry(ssrc)
398            .or_insert_with(|| create_tx_stats(ssrc, &self.options))
399    }
400
401    /// Get reception statistics for a given SSRC.
402    fn get_rx_stats_mut(&mut self, ssrc: u32) -> &mut SSRCRxStats {
403        // helper function
404        fn create_rx_stats(ssrc: u32, options: &RtpTransceiverOptions) -> SSRCRxStats {
405            let clock_rate = options
406                .input_ssrcs()
407                .clock_rate(ssrc)
408                .unwrap_or(options.default_clock_rate());
409
410            SSRCRxStats::new(ssrc, clock_rate)
411        }
412
413        self.rx_stats
414            .get_or_insert_mut(ssrc, || create_rx_stats(ssrc, &self.options))
415    }
416
417    /// Generate reception report blocks.
418    ///
419    /// The method returns an iterator over reception report blocks for all
420    /// tracked SSRCs ordered by the time since the last report was generated.
421    /// Only SSRCs with packets received since the last report will be included
422    /// in the output.
423    ///
424    /// Consumer of the iterator should not call the `next` method if the
425    /// resulting report block would be dropped (e.g. due to size limits) as
426    /// that would update the last report generation instant for the
427    /// corresponding SSRC and thus affect the ordering of subsequent calls.
428    fn generate_reception_report_blocks(&mut self) -> impl Iterator<Item = ReportBlock> + use<'_> {
429        let mut report_order = self
430            .rx_stats
431            .iter()
432            .map(|(&ssrc, stats)| {
433                let duration_since_last_report = stats
434                    .last_reception_report_at()
435                    .map(|t| t.elapsed())
436                    .unwrap_or(Duration::MAX);
437
438                (ssrc, duration_since_last_report)
439            })
440            .collect::<Vec<_>>();
441
442        report_order.sort_unstable_by_key(|&(_, d)| d);
443        report_order.reverse();
444
445        report_order
446            .into_iter()
447            .filter_map(|(ssrc, _)| self.rx_stats.peek_mut(&ssrc)?.create_reception_report())
448            .map(|block| {
449                if self.options.input_ssrc_mode() == SSRCMode::Ignore {
450                    if let Some(ssrc) = self.last_ssrc {
451                        return block.with_ssrc(ssrc);
452                    }
453                }
454
455                block
456            })
457    }
458
459    /// Create the primary RTCP report.
460    ///
461    /// This report is created for the primary sender SSRC and may include
462    /// reception report blocks for the peer SSRCs.
463    fn create_primary_rtcp_report(&mut self) -> CompoundRtcpPacket {
464        let sender_ssrc = self.options.primary_sender_ssrc();
465
466        let report = self
467            .tx_stats
468            .get_mut(&sender_ssrc)
469            .and_then(|stats| stats.create_sender_report())
470            .map(RtcpReport::Sender)
471            .unwrap_or_else(|| RtcpReport::Receiver(ReceiverReport::new(sender_ssrc)));
472
473        let mut builder = RtcpReportBuilder::new(report);
474
475        let sdes = self.source_descriptions.get(sender_ssrc);
476
477        let mut min_packets = 2;
478        let mut encoded_size = sdes.raw_size();
479
480        let bye = if self.state != ContextState::Open {
481            let pkt = ByePacket::new([sender_ssrc]);
482
483            let encoded = pkt.encode();
484
485            encoded_size += encoded.raw_size();
486            min_packets += 1;
487
488            Some(encoded)
489        } else {
490            None
491        };
492
493        let mut packets = Vec::with_capacity(min_packets);
494
495        let max_encoded_size = self.options.max_rtcp_packet_size();
496
497        let mut report_blocks = self.generate_reception_report_blocks();
498
499        while (encoded_size + builder.size() + ReportBlock::RAW_SIZE) <= max_encoded_size {
500            if let Some(block) = report_blocks.next() {
501                if let Some(packet) = builder.add(block) {
502                    encoded_size += packet.raw_size();
503
504                    packets.push(packet);
505                }
506            } else {
507                break;
508            }
509        }
510
511        if !builder.is_empty() || packets.is_empty() {
512            packets.push(builder.build_and_encode());
513        }
514
515        packets.push(sdes);
516
517        if let Some(bye) = bye {
518            packets.push(bye);
519        }
520
521        CompoundRtcpPacket::new(packets)
522    }
523
524    /// Create RTCP reports.
525    ///
526    /// This method generates receiver and/or sender reports for all SSRCs that
527    /// appeared since the last call to this method. If no RTP packets have
528    /// been sent or received since the last call, empty receiver reports for
529    /// all active sender SSRCs will be generated. A sender SSRC is considered
530    /// active if we have sent at least one RTP packet with the SSRC.
531    ///
532    /// The method also generates BYE packets for all active sender SSRCs if
533    /// the context has been closed. The method will return an empty vector if
534    /// the context has already been closed and all corresponding BYE packets
535    /// have been generated.
536    fn create_rtcp_reports(&mut self) -> Vec<CompoundRtcpPacket> {
537        match self.state {
538            ContextState::Open => (),
539            ContextState::Closing => self.state = ContextState::Closed,
540            ContextState::Closed => return Vec::new(),
541        }
542
543        let mut reports = vec![self.create_primary_rtcp_report()];
544
545        let secondary_report_packets = if self.state == ContextState::Open {
546            2
547        } else {
548            3
549        };
550
551        // We consider each sender SSRC as an independent RTP participant, so
552        // we create separate sender reports for them.
553        let secondary = self
554            .tx_stats
555            .iter_mut()
556            .filter(|(&ssrc, _)| ssrc != self.options.primary_sender_ssrc())
557            .map(|(&ssrc, stats)| {
558                let mut packets = Vec::with_capacity(secondary_report_packets);
559
560                // if there are no sender stats to report, create an empty
561                // receiver report instead
562                let report = stats
563                    .create_sender_report()
564                    .map(RtcpReport::Sender)
565                    .unwrap_or_else(|| RtcpReport::Receiver(ReceiverReport::new(ssrc)));
566
567                packets.push(report.encode());
568                packets.push(self.source_descriptions.get(ssrc));
569
570                if self.state != ContextState::Open {
571                    let bye = ByePacket::new([ssrc]);
572
573                    packets.push(bye.encode());
574                }
575
576                CompoundRtcpPacket::new(packets)
577            });
578
579        reports.extend(secondary);
580        reports
581    }
582}
583
584/// Cache for source description packets.
585struct SourceDescriptionCache {
586    descriptions: HashMap<u32, RtcpPacket>,
587}
588
589impl SourceDescriptionCache {
590    /// Create a new source description packet cache.
591    fn new() -> Self {
592        Self {
593            descriptions: HashMap::new(),
594        }
595    }
596
597    /// Get a source description packet for a given SSRC.
598    fn get(&mut self, ssrc: u32) -> RtcpPacket {
599        self.descriptions
600            .entry(ssrc)
601            .or_insert_with(|| {
602                let cname = format!("{:016x}", rand::random::<u64>());
603
604                let desc = SourceDescription::new(ssrc, cname);
605
606                SourceDescriptionPacket::new()
607                    .with_source_descriptions([desc])
608                    .encode()
609            })
610            .clone()
611    }
612}
613
614/// RTCP report builder.
615struct RtcpReportBuilder {
616    report: RtcpReport,
617    blocks: Vec<ReportBlock>,
618    size: usize,
619}
620
621impl RtcpReportBuilder {
622    /// Create a new RTCP report builder.
623    ///
624    /// # Arguments
625    /// * `report` - the initial RTCP report (sender or receiver)
626    fn new<T>(report: T) -> Self
627    where
628        T: Into<RtcpReport>,
629    {
630        let report = report.into();
631
632        let size = RtcpHeader::RAW_SIZE + report.raw_size();
633
634        Self {
635            report,
636            blocks: Vec::new(),
637            size,
638        }
639    }
640
641    /// Get the current size of the report in bytes.
642    fn size(&self) -> usize {
643        self.size
644    }
645
646    /// Check if the report is empty (i.e. there are no report blocks).
647    fn is_empty(&self) -> bool {
648        self.blocks.is_empty()
649    }
650
651    /// Check if the report is full (i.e. contains the maximum allowed number
652    /// of report blocks).
653    fn is_full(&self) -> bool {
654        self.blocks.len() >= 31
655    }
656
657    /// Add a given report block to the report.
658    ///
659    /// The method will finalize the report and encode it into an RTCP packet
660    /// if the report becomes full after adding the block. In such case, the
661    /// method will return the encoded RTCP packet and a new empty receiver
662    /// report will be started.
663    ///
664    /// This keeps the invariant that the currently open report is never full
665    /// making the packet size limit calculations simpler.
666    fn add(&mut self, block: ReportBlock) -> Option<RtcpPacket> {
667        self.size += block.raw_size();
668
669        self.blocks.push(block);
670
671        if !self.is_full() {
672            return None;
673        }
674
675        let empty = ReceiverReport::new(self.report.sender_ssrc());
676
677        let full = std::mem::replace(self, Self::new(empty));
678
679        Some(full.build_and_encode())
680    }
681
682    /// Finalize the sender/receiver report and encode it into an RTCP packet.
683    fn build_and_encode(self) -> RtcpPacket {
684        self.report.with_report_blocks(self.blocks).encode()
685    }
686}
687
688/// RTCP report.
689enum RtcpReport {
690    Sender(SenderReport),
691    Receiver(ReceiverReport),
692}
693
694impl RtcpReport {
695    /// Get the sender SSRC.
696    fn sender_ssrc(&self) -> u32 {
697        match self {
698            Self::Sender(sr) => sr.sender_ssrc(),
699            Self::Receiver(rr) => rr.sender_ssrc(),
700        }
701    }
702
703    /// Get size of the encoded report.
704    fn raw_size(&self) -> usize {
705        match self {
706            Self::Sender(sr) => sr.raw_size(),
707            Self::Receiver(rr) => rr.raw_size(),
708        }
709    }
710
711    /// Set the reception report blocks.
712    fn with_report_blocks(self, blocks: Vec<ReportBlock>) -> Self {
713        match self {
714            Self::Sender(sr) => Self::Sender(sr.with_report_blocks(blocks)),
715            Self::Receiver(rr) => Self::Receiver(rr.with_report_blocks(blocks)),
716        }
717    }
718
719    /// Encode the report into an RTCP packet.
720    fn encode(&self) -> RtcpPacket {
721        match self {
722            Self::Sender(sr) => sr.encode(),
723            Self::Receiver(rr) => rr.encode(),
724        }
725    }
726}
727
728impl From<SenderReport> for RtcpReport {
729    fn from(sr: SenderReport) -> Self {
730        Self::Sender(sr)
731    }
732}
733
734impl From<ReceiverReport> for RtcpReport {
735    fn from(rr: ReceiverReport) -> Self {
736        Self::Receiver(rr)
737    }
738}
739
740#[cfg(test)]
741mod tests {
742    use std::time::Instant;
743
744    use super::InternalContext;
745
746    use crate::{
747        rtcp::{ByePacket, ReceiverReport, RtcpPacketType, SenderReport},
748        rtp::{IncomingRtpPacket, OrderedRtpPacket, RtpPacket},
749        transceiver::{RtpTransceiverOptions, SSRCMode},
750    };
751
752    fn make_rtp_packet(ssrc: u32, seq: u16, timestamp: u32) -> RtpPacket {
753        RtpPacket::new()
754            .with_ssrc(ssrc)
755            .with_sequence_number(seq)
756            .with_timestamp(timestamp)
757    }
758
759    fn make_ordered_rtp_packet(ssrc: u32, index: u64, timestamp: u32) -> OrderedRtpPacket {
760        let packet = make_rtp_packet(ssrc, index as u16, timestamp);
761
762        let incoming = IncomingRtpPacket::new(packet, Instant::now());
763
764        OrderedRtpPacket::new(incoming, index)
765    }
766
767    #[test]
768    fn test_input_ssrc_ignore_mode() {
769        let options = RtpTransceiverOptions::new()
770            .with_default_clock_rate(1000)
771            .with_primary_sender_ssrc(1)
772            .with_input_ssrc_mode(SSRCMode::Ignore);
773
774        let mut context = InternalContext::new(options);
775
776        let packets = vec![
777            make_ordered_rtp_packet(10, 1, 100),
778            make_ordered_rtp_packet(20, 1, 200),
779            make_ordered_rtp_packet(30, 1, 300),
780        ];
781
782        context.process_incoming_rtp_packet(&packets[0]);
783        context.process_ordered_rtp_packet(&packets[0]);
784        context.process_incoming_rtp_packet(&packets[1]);
785        context.process_ordered_rtp_packet(&packets[1]);
786
787        let ssrcs = context
788            .rx_stats
789            .iter()
790            .map(|(&ssrc, _)| ssrc)
791            .collect::<Vec<_>>();
792
793        assert_eq!(&ssrcs[..], &[0]);
794
795        context.process_incoming_rtp_packet(&packets[2]);
796        context.process_ordered_rtp_packet(&packets[2]);
797
798        let report = context.create_primary_rtcp_report();
799
800        assert_eq!(report.len(), 2);
801
802        let rr = &report[0];
803        let sdes = &report[1];
804
805        assert_eq!(rr.packet_type(), RtcpPacketType::RR);
806        assert_eq!(sdes.packet_type(), RtcpPacketType::SDES);
807
808        // we expect only one report block with the last SSRC used (i.e. 30)
809        let rr = ReceiverReport::decode(rr).unwrap();
810
811        let rbs = rr.report_blocks();
812
813        assert_eq!(rbs.len(), 1);
814
815        for rb in rbs {
816            assert_eq!(rb.ssrc(), 30);
817        }
818    }
819
820    #[test]
821    fn test_input_ssrc_specific_mode() {
822        let options = RtpTransceiverOptions::new()
823            .with_default_clock_rate(1000)
824            .with_primary_sender_ssrc(1)
825            .with_input_ssrc_mode(SSRCMode::Specific)
826            .with_input_ssrcs([(20, 1000)]);
827
828        let mut context = InternalContext::new(options);
829
830        let packets = vec![
831            make_ordered_rtp_packet(10, 1, 100),
832            make_ordered_rtp_packet(20, 1, 200),
833            make_ordered_rtp_packet(30, 1, 300),
834        ];
835
836        for packet in &packets {
837            context.process_incoming_rtp_packet(packet);
838            context.process_ordered_rtp_packet(packet);
839        }
840
841        let ssrcs = context
842            .rx_stats
843            .iter()
844            .map(|(&ssrc, _)| ssrc)
845            .collect::<Vec<_>>();
846
847        assert_eq!(&ssrcs[..], &[20]);
848
849        let report = context.create_primary_rtcp_report();
850
851        assert_eq!(report.len(), 2);
852
853        let rr = &report[0];
854        let sdes = &report[1];
855
856        assert_eq!(rr.packet_type(), RtcpPacketType::RR);
857        assert_eq!(sdes.packet_type(), RtcpPacketType::SDES);
858
859        // we expect only one report block for SSRC 20
860        let rr = ReceiverReport::decode(rr).unwrap();
861
862        let rbs = rr.report_blocks();
863
864        assert_eq!(rbs.len(), 1);
865
866        for rb in rbs {
867            assert_eq!(rb.ssrc(), 20);
868        }
869    }
870
871    #[test]
872    fn test_input_ssrc_any_mode() {
873        let options = RtpTransceiverOptions::new()
874            .with_default_clock_rate(1000)
875            .with_primary_sender_ssrc(1)
876            .with_input_ssrc_mode(SSRCMode::Any)
877            .with_max_input_ssrcs(Some(2));
878
879        let mut context = InternalContext::new(options);
880
881        let packets = vec![
882            make_ordered_rtp_packet(10, 1, 100),
883            make_ordered_rtp_packet(20, 1, 200),
884            make_ordered_rtp_packet(30, 1, 300),
885        ];
886
887        context.process_incoming_rtp_packet(&packets[0]);
888        context.process_ordered_rtp_packet(&packets[0]);
889        context.process_incoming_rtp_packet(&packets[1]);
890        context.process_ordered_rtp_packet(&packets[1]);
891
892        let mut ssrcs = context
893            .rx_stats
894            .iter()
895            .map(|(&ssrc, _)| ssrc)
896            .collect::<Vec<_>>();
897
898        ssrcs.sort_unstable();
899
900        assert_eq!(&ssrcs[..], &[10, 20]);
901
902        context.process_incoming_rtp_packet(&packets[2]);
903        context.process_ordered_rtp_packet(&packets[2]);
904
905        let mut ssrcs = context
906            .rx_stats
907            .iter()
908            .map(|(&ssrc, _)| ssrc)
909            .collect::<Vec<_>>();
910
911        ssrcs.sort_unstable();
912
913        // we expect the least recently updated SSRC stats to be dropped
914        assert_eq!(&ssrcs[..], &[20, 30]);
915
916        let report = context.create_primary_rtcp_report();
917
918        assert_eq!(report.len(), 2);
919
920        let rr = &report[0];
921        let sdes = &report[1];
922
923        assert_eq!(rr.packet_type(), RtcpPacketType::RR);
924        assert_eq!(sdes.packet_type(), RtcpPacketType::SDES);
925
926        // we expect there will be two report blocks - one for SSRC 20 and one
927        // for SSRC 30
928        let rr = ReceiverReport::decode(rr).unwrap();
929
930        let rbs = rr.report_blocks();
931
932        assert_eq!(rbs.len(), 2);
933
934        for rb in rbs {
935            assert!(rb.ssrc() == 20 || rb.ssrc() == 30);
936        }
937    }
938
939    #[test]
940    fn test_sender_report_generation() {
941        let options = RtpTransceiverOptions::new()
942            .with_default_clock_rate(1000)
943            .with_primary_sender_ssrc(10)
944            .with_input_ssrc_mode(SSRCMode::Ignore);
945
946        let mut context = InternalContext::new(options);
947
948        let packet = make_ordered_rtp_packet(10, 1, 100);
949
950        context.process_incoming_rtp_packet(&packet);
951        context.process_ordered_rtp_packet(&packet);
952        context.process_outgoing_rtp_packet(&packet);
953
954        let report = context.create_primary_rtcp_report();
955
956        assert_eq!(report.len(), 2);
957
958        let sr = &report[0];
959        let sdes = &report[1];
960
961        assert_eq!(sr.packet_type(), RtcpPacketType::SR);
962        assert_eq!(sdes.packet_type(), RtcpPacketType::SDES);
963
964        let sr = SenderReport::decode(sr).unwrap();
965
966        assert_eq!(sr.sender_ssrc(), 10);
967        assert_eq!(sr.octet_count(), 0);
968        assert_eq!(sr.packet_count(), 1);
969
970        let rbs = sr.report_blocks();
971
972        assert_eq!(rbs.len(), 1);
973
974        for rb in rbs {
975            assert_eq!(rb.ssrc(), 10);
976        }
977    }
978
979    #[test]
980    fn test_multiple_sender_ssrcs() {
981        let options = RtpTransceiverOptions::new()
982            .with_default_clock_rate(1000)
983            .with_primary_sender_ssrc(10);
984
985        let mut context = InternalContext::new(options);
986
987        context.process_outgoing_rtp_packet(&make_rtp_packet(10, 1, 100));
988        context.process_outgoing_rtp_packet(&make_rtp_packet(20, 1, 100));
989
990        let reports = context.create_rtcp_reports();
991
992        assert_eq!(reports.len(), 2);
993
994        // there should be two packets in each report: SR and SDES
995        for r in &reports {
996            assert_eq!(r.len(), 2);
997
998            let sr = &r[0];
999            let sdes = &r[1];
1000
1001            assert_eq!(sr.packet_type(), RtcpPacketType::SR);
1002            assert_eq!(sdes.packet_type(), RtcpPacketType::SDES);
1003        }
1004
1005        let primary = &reports[0][0];
1006        let secondary = &reports[1][0];
1007
1008        let sr = SenderReport::decode(primary).unwrap();
1009
1010        assert_eq!(sr.sender_ssrc(), 10);
1011
1012        let sr = SenderReport::decode(secondary).unwrap();
1013
1014        assert_eq!(sr.sender_ssrc(), 20);
1015    }
1016
1017    #[test]
1018    fn test_end_of_stream() {
1019        // first we test it with any/arbitrary number of input SSRCs
1020        let options = RtpTransceiverOptions::new()
1021            .with_default_clock_rate(1000)
1022            .with_primary_sender_ssrc(1)
1023            .with_input_ssrc_mode(SSRCMode::Any)
1024            .with_max_input_ssrcs(Some(3));
1025
1026        let mut context = InternalContext::new(options);
1027
1028        assert!(!context.end_of_stream());
1029
1030        let packets = vec![
1031            make_ordered_rtp_packet(10, 1, 100),
1032            make_ordered_rtp_packet(20, 1, 200),
1033            make_ordered_rtp_packet(30, 1, 300),
1034        ];
1035
1036        for packet in &packets {
1037            context.process_incoming_rtp_packet(packet);
1038            context.process_ordered_rtp_packet(packet);
1039        }
1040
1041        assert!(!context.end_of_stream());
1042
1043        context.process_incoming_bye_packet(&ByePacket::new([10]));
1044
1045        assert!(!context.end_of_stream());
1046
1047        context.process_incoming_bye_packet(&ByePacket::new([20, 30]));
1048
1049        assert!(context.end_of_stream());
1050
1051        // then we test it with specific input SSRCs
1052        let options = RtpTransceiverOptions::new()
1053            .with_default_clock_rate(1000)
1054            .with_primary_sender_ssrc(1)
1055            .with_input_ssrc_mode(SSRCMode::Specific)
1056            .with_input_ssrcs([(10, 1000), (20, 1000), (30, 1000)]);
1057
1058        let mut context = InternalContext::new(options);
1059
1060        assert!(!context.end_of_stream());
1061
1062        let packets = vec![
1063            make_ordered_rtp_packet(10, 1, 100),
1064            make_ordered_rtp_packet(20, 1, 200),
1065        ];
1066
1067        for packet in &packets {
1068            context.process_incoming_rtp_packet(packet);
1069            context.process_ordered_rtp_packet(packet);
1070        }
1071
1072        assert!(!context.end_of_stream());
1073
1074        context.process_incoming_bye_packet(&ByePacket::new([10]));
1075
1076        assert!(!context.end_of_stream());
1077
1078        context.process_incoming_bye_packet(&ByePacket::new([20]));
1079
1080        assert!(!context.end_of_stream());
1081
1082        context.process_incoming_bye_packet(&ByePacket::new([30]));
1083
1084        assert!(context.end_of_stream());
1085    }
1086
1087    #[test]
1088    fn test_multi_packet_receiver_report() {
1089        let options = RtpTransceiverOptions::new()
1090            .with_default_clock_rate(1000)
1091            .with_primary_sender_ssrc(0)
1092            .with_input_ssrc_mode(SSRCMode::Any)
1093            .with_max_input_ssrcs(None)
1094            .with_max_rtcp_packet_size(836);
1095
1096        let mut context = InternalContext::new(options);
1097
1098        for i in 0..33 {
1099            let packet = make_ordered_rtp_packet(0 + i, 1, 100);
1100
1101            context.process_incoming_rtp_packet(&packet);
1102            context.process_ordered_rtp_packet(&packet);
1103        }
1104
1105        let report = context.create_primary_rtcp_report();
1106
1107        assert_eq!(report.len(), 3);
1108
1109        let rr = &report[0];
1110
1111        assert_eq!(rr.packet_type(), RtcpPacketType::RR);
1112        assert_eq!(rr.raw_size(), 752);
1113
1114        let rr = ReceiverReport::decode(rr).unwrap();
1115
1116        let rbs = rr.report_blocks();
1117
1118        assert_eq!(rbs.len(), 31);
1119
1120        for (i, rb) in rbs.iter().enumerate() {
1121            assert_eq!(rb.ssrc(), i as u32);
1122        }
1123
1124        let rr = &report[1];
1125
1126        assert_eq!(rr.packet_type(), RtcpPacketType::RR);
1127        assert_eq!(rr.raw_size(), 56);
1128
1129        let rr = ReceiverReport::decode(rr).unwrap();
1130
1131        let rbs = rr.report_blocks();
1132
1133        assert_eq!(rbs.len(), 2);
1134
1135        for (i, rb) in rbs.iter().enumerate() {
1136            assert_eq!(rb.ssrc(), (i + 31) as u32);
1137        }
1138
1139        let sdes = &report[2];
1140
1141        assert_eq!(sdes.packet_type(), RtcpPacketType::SDES);
1142        assert_eq!(sdes.raw_size(), 28);
1143
1144        assert_eq!(report.raw_size(), 836);
1145
1146        // now we repeat the test with a smaller max RTCP packet size that
1147        // forces us to create two receiver report packets but with one less
1148        // report block
1149        let options = RtpTransceiverOptions::new()
1150            .with_default_clock_rate(1000)
1151            .with_primary_sender_ssrc(0)
1152            .with_input_ssrc_mode(SSRCMode::Any)
1153            .with_max_input_ssrcs(None)
1154            .with_max_rtcp_packet_size(835);
1155
1156        let mut context = InternalContext::new(options);
1157
1158        for i in 0..33 {
1159            let packet = make_ordered_rtp_packet(0 + i, 1, 100);
1160
1161            context.process_incoming_rtp_packet(&packet);
1162            context.process_ordered_rtp_packet(&packet);
1163        }
1164
1165        let report = context.create_primary_rtcp_report();
1166
1167        assert_eq!(report.len(), 3);
1168
1169        let rr = &report[0];
1170
1171        assert_eq!(rr.packet_type(), RtcpPacketType::RR);
1172        assert_eq!(rr.raw_size(), 752);
1173
1174        let rr = ReceiverReport::decode(rr).unwrap();
1175
1176        let rbs = rr.report_blocks();
1177
1178        assert_eq!(rbs.len(), 31);
1179
1180        for (i, rb) in rbs.iter().enumerate() {
1181            assert_eq!(rb.ssrc(), i as u32);
1182        }
1183
1184        let rr = &report[1];
1185
1186        assert_eq!(rr.packet_type(), RtcpPacketType::RR);
1187        assert_eq!(rr.raw_size(), 32);
1188
1189        let rr = ReceiverReport::decode(rr).unwrap();
1190
1191        let rbs = rr.report_blocks();
1192
1193        assert_eq!(rbs.len(), 1);
1194
1195        for (i, rb) in rbs.iter().enumerate() {
1196            assert_eq!(rb.ssrc(), (i + 31) as u32);
1197        }
1198
1199        let sdes = &report[2];
1200
1201        assert_eq!(sdes.packet_type(), RtcpPacketType::SDES);
1202        assert_eq!(sdes.raw_size(), 28);
1203
1204        assert_eq!(report.raw_size(), 812);
1205    }
1206
1207    #[test]
1208    fn test_context_closing() {
1209        let options = RtpTransceiverOptions::new()
1210            .with_default_clock_rate(1000)
1211            .with_primary_sender_ssrc(10);
1212
1213        let mut context = InternalContext::new(options);
1214
1215        let reports = context.create_rtcp_reports();
1216
1217        assert_eq!(reports.len(), 1);
1218
1219        for r in &reports {
1220            assert_eq!(r.len(), 2); // empty RR + SDES
1221        }
1222
1223        context.process_outgoing_rtp_packet(&make_rtp_packet(10, 1, 100));
1224        context.process_outgoing_rtp_packet(&make_rtp_packet(20, 1, 100));
1225
1226        let reports = context.create_rtcp_reports();
1227
1228        assert_eq!(reports.len(), 2);
1229
1230        for r in &reports {
1231            assert_eq!(r.len(), 2); // empty RR + SDES
1232        }
1233
1234        context.close();
1235
1236        let reports = context.create_rtcp_reports();
1237
1238        assert_eq!(reports.len(), 2);
1239
1240        for r in &reports {
1241            assert_eq!(r.len(), 3); // SR + SDES + BYE
1242
1243            let bye = &r[2];
1244
1245            assert_eq!(bye.packet_type(), RtcpPacketType::BYE);
1246        }
1247
1248        let reports = context.create_rtcp_reports();
1249
1250        assert!(reports.is_empty());
1251    }
1252}