1use std::{
2 collections::HashMap,
3 num::NonZeroUsize,
4 sync::{Arc, Mutex},
5 task::{Context, Poll, Waker},
6 time::Duration,
7};
8
9use lru::LruCache;
10
11use crate::{
12 rtcp::{
13 stats::{SSRCRxStats, SSRCTxStats},
14 ByePacket, CompoundRtcpPacket, ReceiverReport, ReportBlock, RtcpHeader, RtcpPacket,
15 SenderReport, SourceDescription, SourceDescriptionPacket,
16 },
17 rtp::{IncomingRtpPacket, OrderedRtpPacket, RtpPacket},
18 transceiver::{RtpTransceiverOptions, SSRCMode},
19};
20
/// Owning front-end of the shared RTCP bookkeeping state.
///
/// The actual state lives in an `InternalContext` behind an `Arc<Mutex<_>>` so that it can
/// be shared with any number of `RtcpContextHandle`s. Dropping this value closes the
/// context.
pub struct RtcpContext {
    inner: Arc<Mutex<InternalContext>>,
}
35
36impl RtcpContext {
37 pub fn new(options: RtpTransceiverOptions) -> Self {
39 Self {
40 inner: Arc::new(Mutex::new(InternalContext::new(options))),
41 }
42 }
43
44 pub fn process_outgoing_rtp_packet(&self, packet: &RtpPacket) {
46 self.inner
47 .lock()
48 .unwrap()
49 .process_outgoing_rtp_packet(packet);
50 }
51
52 pub fn process_incoming_rtp_packet(&self, packet: &IncomingRtpPacket) {
54 self.inner
55 .lock()
56 .unwrap()
57 .process_incoming_rtp_packet(packet);
58 }
59
60 pub fn process_ordered_rtp_packet(&self, packet: &OrderedRtpPacket) {
66 self.inner
67 .lock()
68 .unwrap()
69 .process_ordered_rtp_packet(packet);
70 }
71
72 pub fn close(&self) {
78 self.inner.lock().unwrap().close();
79 }
80
81 pub fn end_of_stream(&self) -> bool {
94 self.inner.lock().unwrap().end_of_stream()
95 }
96
97 pub fn handle(&self) -> RtcpContextHandle {
100 RtcpContextHandle {
101 inner: self.inner.clone(),
102 }
103 }
104}
105
106impl Drop for RtcpContext {
107 #[inline]
108 fn drop(&mut self) {
109 self.close();
110 }
111}
112
/// Cloneable handle to the state owned by an `RtcpContext`.
///
/// All clones (and the originating `RtcpContext`) refer to the same `InternalContext`
/// through a shared `Arc<Mutex<_>>`.
#[derive(Clone)]
pub struct RtcpContextHandle {
    inner: Arc<Mutex<InternalContext>>,
}
121
122impl RtcpContextHandle {
123 pub fn process_incoming_receiver_report(&self, report: &ReceiverReport) {
125 self.inner
126 .lock()
127 .unwrap()
128 .process_incoming_receiver_report(report);
129 }
130
131 pub fn process_incoming_sender_report(&self, report: &SenderReport) {
133 self.inner
134 .lock()
135 .unwrap()
136 .process_incoming_sender_report(report);
137 }
138
139 pub fn process_incoming_bye_packet(&self, packet: &ByePacket) {
141 self.inner
142 .lock()
143 .unwrap()
144 .process_incoming_bye_packet(packet);
145 }
146
147 pub fn create_rtcp_reports(&mut self) -> Vec<CompoundRtcpPacket> {
160 self.inner.lock().unwrap().create_rtcp_reports()
161 }
162
163 pub fn close(&self) {
169 self.inner.lock().unwrap().close();
170 }
171
172 pub fn poll_closed(&self, cx: &mut Context<'_>) -> Poll<()> {
182 self.inner.lock().unwrap().poll_closed(cx)
183 }
184
185 pub fn end_of_stream(&self) -> bool {
198 self.inner.lock().unwrap().end_of_stream()
199 }
200}
201
/// Lifecycle of the RTCP context.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum ContextState {
    /// Initial state; regular reports are generated.
    Open,
    /// `close` has been called; the next `create_rtcp_reports` call still produces reports
    /// (with BYE packets appended) and moves the context to `Closed`.
    Closing,
    /// Final state; `create_rtcp_reports` returns no more packets.
    Closed,
}
209
/// Mutable state shared between `RtcpContext` and its handles.
struct InternalContext {
    // Transceiver configuration (SSRC mode, clock rates, size limits, ...).
    options: RtpTransceiverOptions,
    // Encoded SDES packets, one per sender SSRC, created on first use.
    source_descriptions: SourceDescriptionCache,
    // Reception statistics keyed by SSRC. The cache is bounded only in `SSRCMode::Any`
    // with a configured maximum number of input SSRCs; otherwise it is unbounded.
    rx_stats: LruCache<u32, SSRCRxStats>,
    // Transmission statistics keyed by SSRC.
    tx_stats: HashMap<u32, SSRCTxStats>,
    // SSRC of the most recently processed incoming RTP packet (before any SSRC mapping).
    last_ssrc: Option<u32>,
    // Current lifecycle state.
    state: ContextState,
    // Waker registered by `poll_closed`; woken by `close`.
    closed_waker: Option<Waker>,
}
220
221impl InternalContext {
222 fn new(options: RtpTransceiverOptions) -> Self {
224 let input_ssrc_mode = options.input_ssrc_mode();
225 let max_input_ssrcs = options.max_input_ssrcs();
226
227 let rx_stats = if let (SSRCMode::Any, Some(max)) = (input_ssrc_mode, max_input_ssrcs) {
228 let input_ssrcs = options.input_ssrcs();
229
230 let max = NonZeroUsize::new(max.max(input_ssrcs.len())).unwrap_or(NonZeroUsize::MIN);
231
232 LruCache::new(max)
233 } else {
234 LruCache::unbounded()
235 };
236
237 Self {
238 options,
239 source_descriptions: SourceDescriptionCache::new(),
240 rx_stats,
241 tx_stats: HashMap::new(),
242 last_ssrc: None,
243 state: ContextState::Open,
244 closed_waker: None,
245 }
246 }
247
248 fn process_outgoing_rtp_packet(&mut self, packet: &RtpPacket) {
250 self.get_tx_stats_mut(packet.ssrc())
251 .process_outgoing_packet(packet);
252 }
253
254 fn process_incoming_rtp_packet(&mut self, packet: &IncomingRtpPacket) {
256 let mut ssrc = packet.ssrc();
257
258 self.last_ssrc = Some(ssrc);
259
260 let input_ssrcs = self.options.input_ssrcs();
261
262 match self.options.input_ssrc_mode() {
263 SSRCMode::Ignore => ssrc = 0,
264 SSRCMode::Specific if !input_ssrcs.contains(ssrc) => return,
265 _ => (),
266 }
267
268 self.get_rx_stats_mut(ssrc)
269 .process_incoming_rtp_packet(packet);
270 }
271
272 fn process_ordered_rtp_packet(&mut self, packet: &OrderedRtpPacket) {
274 let mut ssrc = packet.ssrc();
275
276 let input_ssrcs = self.options.input_ssrcs();
277
278 match self.options.input_ssrc_mode() {
279 SSRCMode::Ignore => ssrc = 0,
280 SSRCMode::Specific if !input_ssrcs.contains(ssrc) => return,
281 _ => (),
282 }
283
284 self.get_rx_stats_mut(ssrc)
285 .process_ordered_rtp_packet(packet);
286 }
287
288 fn process_incoming_sender_report(&mut self, report: &SenderReport) {
290 if let Some(stats) = self.rx_stats.peek_mut(&report.sender_ssrc()) {
291 stats.process_incoming_sender_report(report);
292 }
293
294 self.process_incoming_reception_report_blocks(report.report_blocks());
295 }
296
297 fn process_incoming_receiver_report(&mut self, report: &ReceiverReport) {
299 self.process_incoming_reception_report_blocks(report.report_blocks());
300 }
301
/// Process reception report blocks received from the remote peer.
///
/// Currently a no-op: the blocks are accepted and discarded.
fn process_incoming_reception_report_blocks(&mut self, _: &[ReportBlock]) {
    // nothing to do with the report blocks at the moment
}
306
307 fn process_incoming_bye_packet(&mut self, packet: &ByePacket) {
309 let sources = if self.options.input_ssrc_mode() == SSRCMode::Ignore {
310 &[0]
311 } else {
312 packet.sources()
313 };
314
315 for &ssrc in sources {
316 self.get_rx_stats_mut(ssrc)
317 .process_incoming_bye_packet(packet);
318 }
319 }
320
321 fn end_of_stream(&self) -> bool {
334 if self.options.input_ssrc_mode() == SSRCMode::Specific {
335 self.options.input_ssrcs().iter().all(|(ssrc, _)| {
336 self.rx_stats
337 .peek(&ssrc)
338 .map(|stats| stats.bye_received())
339 .unwrap_or(false)
340 })
341 } else {
342 !self.rx_stats.is_empty() && self.rx_stats.iter().all(|(_, stats)| stats.bye_received())
343 }
344 }
345
346 fn close(&mut self) {
352 if self.state != ContextState::Open {
353 return;
354 }
355
356 self.state = ContextState::Closing;
357
358 if let Some(waker) = self.closed_waker.take() {
359 waker.wake();
360 }
361 }
362
363 fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()> {
373 if self.state == ContextState::Open {
374 let waker = cx.waker();
375
376 self.closed_waker = Some(waker.clone());
377
378 Poll::Pending
379 } else {
380 Poll::Ready(())
381 }
382 }
383
384 fn get_tx_stats_mut(&mut self, ssrc: u32) -> &mut SSRCTxStats {
386 fn create_tx_stats(ssrc: u32, options: &RtpTransceiverOptions) -> SSRCTxStats {
388 let clock_rate = options
389 .output_ssrcs()
390 .clock_rate(ssrc)
391 .unwrap_or(options.default_clock_rate());
392
393 SSRCTxStats::new(ssrc, clock_rate)
394 }
395
396 self.tx_stats
397 .entry(ssrc)
398 .or_insert_with(|| create_tx_stats(ssrc, &self.options))
399 }
400
401 fn get_rx_stats_mut(&mut self, ssrc: u32) -> &mut SSRCRxStats {
403 fn create_rx_stats(ssrc: u32, options: &RtpTransceiverOptions) -> SSRCRxStats {
405 let clock_rate = options
406 .input_ssrcs()
407 .clock_rate(ssrc)
408 .unwrap_or(options.default_clock_rate());
409
410 SSRCRxStats::new(ssrc, clock_rate)
411 }
412
413 self.rx_stats
414 .get_or_insert_mut(ssrc, || create_rx_stats(ssrc, &self.options))
415 }
416
/// Create a lazy iterator over reception report blocks.
///
/// Sources are visited in descending order of the time elapsed since
/// `last_reception_report_at()`; sources without such a timestamp are treated as
/// `Duration::MAX` and therefore come first. Sources for which no report block can be
/// created are skipped. A block is created only when the iterator is advanced, so the
/// caller can stop early without touching the remaining sources.
///
/// In `Ignore` mode the SSRC of each block is replaced with the SSRC of the last incoming
/// RTP packet (if there was any).
fn generate_reception_report_blocks(&mut self) -> impl Iterator<Item = ReportBlock> + use<'_> {
    // Snapshot the SSRCs together with their sort key first; the statistics themselves are
    // looked up again (mutably) while iterating.
    let mut report_order = self
        .rx_stats
        .iter()
        .map(|(&ssrc, stats)| {
            let duration_since_last_report = stats
                .last_reception_report_at()
                .map(|t| t.elapsed())
                .unwrap_or(Duration::MAX);

            (ssrc, duration_since_last_report)
        })
        .collect::<Vec<_>>();

    // Ascending sort followed by a reversal, i.e. the longest waiting sources go first.
    report_order.sort_unstable_by_key(|&(_, d)| d);
    report_order.reverse();

    report_order
        .into_iter()
        // `peek_mut` does not change the LRU order of the cache.
        .filter_map(|(ssrc, _)| self.rx_stats.peek_mut(&ssrc)?.create_reception_report())
        .map(|block| {
            if self.options.input_ssrc_mode() == SSRCMode::Ignore {
                if let Some(ssrc) = self.last_ssrc {
                    return block.with_ssrc(ssrc);
                }
            }

            block
        })
}
458
459 fn create_primary_rtcp_report(&mut self) -> CompoundRtcpPacket {
464 let sender_ssrc = self.options.primary_sender_ssrc();
465
466 let report = self
467 .tx_stats
468 .get_mut(&sender_ssrc)
469 .and_then(|stats| stats.create_sender_report())
470 .map(RtcpReport::Sender)
471 .unwrap_or_else(|| RtcpReport::Receiver(ReceiverReport::new(sender_ssrc)));
472
473 let mut builder = RtcpReportBuilder::new(report);
474
475 let sdes = self.source_descriptions.get(sender_ssrc);
476
477 let mut min_packets = 2;
478 let mut encoded_size = sdes.raw_size();
479
480 let bye = if self.state != ContextState::Open {
481 let pkt = ByePacket::new([sender_ssrc]);
482
483 let encoded = pkt.encode();
484
485 encoded_size += encoded.raw_size();
486 min_packets += 1;
487
488 Some(encoded)
489 } else {
490 None
491 };
492
493 let mut packets = Vec::with_capacity(min_packets);
494
495 let max_encoded_size = self.options.max_rtcp_packet_size();
496
497 let mut report_blocks = self.generate_reception_report_blocks();
498
499 while (encoded_size + builder.size() + ReportBlock::RAW_SIZE) <= max_encoded_size {
500 if let Some(block) = report_blocks.next() {
501 if let Some(packet) = builder.add(block) {
502 encoded_size += packet.raw_size();
503
504 packets.push(packet);
505 }
506 } else {
507 break;
508 }
509 }
510
511 if !builder.is_empty() || packets.is_empty() {
512 packets.push(builder.build_and_encode());
513 }
514
515 packets.push(sdes);
516
517 if let Some(bye) = bye {
518 packets.push(bye);
519 }
520
521 CompoundRtcpPacket::new(packets)
522 }
523
524 fn create_rtcp_reports(&mut self) -> Vec<CompoundRtcpPacket> {
537 match self.state {
538 ContextState::Open => (),
539 ContextState::Closing => self.state = ContextState::Closed,
540 ContextState::Closed => return Vec::new(),
541 }
542
543 let mut reports = vec![self.create_primary_rtcp_report()];
544
545 let secondary_report_packets = if self.state == ContextState::Open {
546 2
547 } else {
548 3
549 };
550
551 let secondary = self
554 .tx_stats
555 .iter_mut()
556 .filter(|(&ssrc, _)| ssrc != self.options.primary_sender_ssrc())
557 .map(|(&ssrc, stats)| {
558 let mut packets = Vec::with_capacity(secondary_report_packets);
559
560 let report = stats
563 .create_sender_report()
564 .map(RtcpReport::Sender)
565 .unwrap_or_else(|| RtcpReport::Receiver(ReceiverReport::new(ssrc)));
566
567 packets.push(report.encode());
568 packets.push(self.source_descriptions.get(ssrc));
569
570 if self.state != ContextState::Open {
571 let bye = ByePacket::new([ssrc]);
572
573 packets.push(bye.encode());
574 }
575
576 CompoundRtcpPacket::new(packets)
577 });
578
579 reports.extend(secondary);
580 reports
581 }
582}
583
/// Cache of encoded source description (SDES) packets keyed by SSRC.
struct SourceDescriptionCache {
    descriptions: HashMap<u32, RtcpPacket>,
}
588
589impl SourceDescriptionCache {
590 fn new() -> Self {
592 Self {
593 descriptions: HashMap::new(),
594 }
595 }
596
597 fn get(&mut self, ssrc: u32) -> RtcpPacket {
599 self.descriptions
600 .entry(ssrc)
601 .or_insert_with(|| {
602 let cname = format!("{:016x}", rand::random::<u64>());
603
604 let desc = SourceDescription::new(ssrc, cname);
605
606 SourceDescriptionPacket::new()
607 .with_source_descriptions([desc])
608 .encode()
609 })
610 .clone()
611 }
612}
613
/// Incremental builder of a single sender/receiver report packet.
struct RtcpReportBuilder {
    // The report the collected blocks will be attached to.
    report: RtcpReport,
    // Report blocks collected so far.
    blocks: Vec<ReportBlock>,
    // Running size of the packet being built: RTCP header + report + collected blocks.
    size: usize,
}
620
621impl RtcpReportBuilder {
622 fn new<T>(report: T) -> Self
627 where
628 T: Into<RtcpReport>,
629 {
630 let report = report.into();
631
632 let size = RtcpHeader::RAW_SIZE + report.raw_size();
633
634 Self {
635 report,
636 blocks: Vec::new(),
637 size,
638 }
639 }
640
641 fn size(&self) -> usize {
643 self.size
644 }
645
646 fn is_empty(&self) -> bool {
648 self.blocks.is_empty()
649 }
650
651 fn is_full(&self) -> bool {
654 self.blocks.len() >= 31
655 }
656
657 fn add(&mut self, block: ReportBlock) -> Option<RtcpPacket> {
667 self.size += block.raw_size();
668
669 self.blocks.push(block);
670
671 if !self.is_full() {
672 return None;
673 }
674
675 let empty = ReceiverReport::new(self.report.sender_ssrc());
676
677 let full = std::mem::replace(self, Self::new(empty));
678
679 Some(full.build_and_encode())
680 }
681
682 fn build_and_encode(self) -> RtcpPacket {
684 self.report.with_report_blocks(self.blocks).encode()
685 }
686}
687
/// Either a sender report or a receiver report.
///
/// Lets the report building code treat both report kinds uniformly.
enum RtcpReport {
    Sender(SenderReport),
    Receiver(ReceiverReport),
}
693
694impl RtcpReport {
695 fn sender_ssrc(&self) -> u32 {
697 match self {
698 Self::Sender(sr) => sr.sender_ssrc(),
699 Self::Receiver(rr) => rr.sender_ssrc(),
700 }
701 }
702
703 fn raw_size(&self) -> usize {
705 match self {
706 Self::Sender(sr) => sr.raw_size(),
707 Self::Receiver(rr) => rr.raw_size(),
708 }
709 }
710
711 fn with_report_blocks(self, blocks: Vec<ReportBlock>) -> Self {
713 match self {
714 Self::Sender(sr) => Self::Sender(sr.with_report_blocks(blocks)),
715 Self::Receiver(rr) => Self::Receiver(rr.with_report_blocks(blocks)),
716 }
717 }
718
719 fn encode(&self) -> RtcpPacket {
721 match self {
722 Self::Sender(sr) => sr.encode(),
723 Self::Receiver(rr) => rr.encode(),
724 }
725 }
726}
727
728impl From<SenderReport> for RtcpReport {
729 fn from(sr: SenderReport) -> Self {
730 Self::Sender(sr)
731 }
732}
733
734impl From<ReceiverReport> for RtcpReport {
735 fn from(rr: ReceiverReport) -> Self {
736 Self::Receiver(rr)
737 }
738}
739
740#[cfg(test)]
741mod tests {
742 use std::time::Instant;
743
744 use super::InternalContext;
745
746 use crate::{
747 rtcp::{ByePacket, ReceiverReport, RtcpPacketType, SenderReport},
748 rtp::{IncomingRtpPacket, OrderedRtpPacket, RtpPacket},
749 transceiver::{RtpTransceiverOptions, SSRCMode},
750 };
751
752 fn make_rtp_packet(ssrc: u32, seq: u16, timestamp: u32) -> RtpPacket {
753 RtpPacket::new()
754 .with_ssrc(ssrc)
755 .with_sequence_number(seq)
756 .with_timestamp(timestamp)
757 }
758
759 fn make_ordered_rtp_packet(ssrc: u32, index: u64, timestamp: u32) -> OrderedRtpPacket {
760 let packet = make_rtp_packet(ssrc, index as u16, timestamp);
761
762 let incoming = IncomingRtpPacket::new(packet, Instant::now());
763
764 OrderedRtpPacket::new(incoming, index)
765 }
766
/// In `Ignore` mode all incoming packets share a single statistics entry (SSRC 0) and the
/// reception report uses the SSRC of the last incoming packet.
#[test]
fn test_input_ssrc_ignore_mode() {
    let mut context = InternalContext::new(
        RtpTransceiverOptions::new()
            .with_default_clock_rate(1000)
            .with_primary_sender_ssrc(1)
            .with_input_ssrc_mode(SSRCMode::Ignore),
    );

    let packets = [
        make_ordered_rtp_packet(10, 1, 100),
        make_ordered_rtp_packet(20, 1, 200),
        make_ordered_rtp_packet(30, 1, 300),
    ];

    let (first_two, last) = packets.split_at(2);

    for packet in first_two {
        context.process_incoming_rtp_packet(packet);
        context.process_ordered_rtp_packet(packet);
    }

    // both SSRCs must end up under the same key
    let tracked: Vec<u32> = context.rx_stats.iter().map(|(&ssrc, _)| ssrc).collect();

    assert_eq!(tracked, vec![0]);

    for packet in last {
        context.process_incoming_rtp_packet(packet);
        context.process_ordered_rtp_packet(packet);
    }

    let report = context.create_primary_rtcp_report();

    assert_eq!(report.len(), 2);
    assert_eq!(report[0].packet_type(), RtcpPacketType::RR);
    assert_eq!(report[1].packet_type(), RtcpPacketType::SDES);

    let rr = ReceiverReport::decode(&report[0]).unwrap();

    let blocks = rr.report_blocks();

    // the only block must carry the SSRC of the last packet
    assert_eq!(blocks.len(), 1);
    assert!(blocks.iter().all(|block| block.ssrc() == 30));
}
819
/// In `Specific` mode only packets from the configured input SSRCs are tracked and
/// reported.
#[test]
fn test_input_ssrc_specific_mode() {
    let mut context = InternalContext::new(
        RtpTransceiverOptions::new()
            .with_default_clock_rate(1000)
            .with_primary_sender_ssrc(1)
            .with_input_ssrc_mode(SSRCMode::Specific)
            .with_input_ssrcs([(20, 1000)]),
    );

    let packets = [
        make_ordered_rtp_packet(10, 1, 100),
        make_ordered_rtp_packet(20, 1, 200),
        make_ordered_rtp_packet(30, 1, 300),
    ];

    packets.iter().for_each(|packet| {
        context.process_incoming_rtp_packet(packet);
        context.process_ordered_rtp_packet(packet);
    });

    // packets from SSRC 10 and 30 must have been dropped
    let tracked: Vec<u32> = context.rx_stats.iter().map(|(&ssrc, _)| ssrc).collect();

    assert_eq!(tracked, vec![20]);

    let report = context.create_primary_rtcp_report();

    assert_eq!(report.len(), 2);
    assert_eq!(report[0].packet_type(), RtcpPacketType::RR);
    assert_eq!(report[1].packet_type(), RtcpPacketType::SDES);

    let rr = ReceiverReport::decode(&report[0]).unwrap();

    let blocks = rr.report_blocks();

    assert_eq!(blocks.len(), 1);
    assert!(blocks.iter().all(|block| block.ssrc() == 20));
}
870
/// In `Any` mode with a limit of two input SSRCs, the third SSRC evicts the least recently
/// used one and only the remaining two are reported.
#[test]
fn test_input_ssrc_any_mode() {
    /// Collect the tracked input SSRCs in ascending order.
    fn tracked_ssrcs(context: &InternalContext) -> Vec<u32> {
        let mut ssrcs: Vec<u32> = context.rx_stats.iter().map(|(&ssrc, _)| ssrc).collect();

        ssrcs.sort_unstable();
        ssrcs
    }

    let mut context = InternalContext::new(
        RtpTransceiverOptions::new()
            .with_default_clock_rate(1000)
            .with_primary_sender_ssrc(1)
            .with_input_ssrc_mode(SSRCMode::Any)
            .with_max_input_ssrcs(Some(2)),
    );

    let packets = [
        make_ordered_rtp_packet(10, 1, 100),
        make_ordered_rtp_packet(20, 1, 200),
        make_ordered_rtp_packet(30, 1, 300),
    ];

    let (first_two, last) = packets.split_at(2);

    for packet in first_two {
        context.process_incoming_rtp_packet(packet);
        context.process_ordered_rtp_packet(packet);
    }

    assert_eq!(tracked_ssrcs(&context), vec![10, 20]);

    for packet in last {
        context.process_incoming_rtp_packet(packet);
        context.process_ordered_rtp_packet(packet);
    }

    // SSRC 10 must have been evicted
    assert_eq!(tracked_ssrcs(&context), vec![20, 30]);

    let report = context.create_primary_rtcp_report();

    assert_eq!(report.len(), 2);
    assert_eq!(report[0].packet_type(), RtcpPacketType::RR);
    assert_eq!(report[1].packet_type(), RtcpPacketType::SDES);

    let rr = ReceiverReport::decode(&report[0]).unwrap();

    let blocks = rr.report_blocks();

    assert_eq!(blocks.len(), 2);
    assert!(blocks.iter().all(|block| matches!(block.ssrc(), 20 | 30)));
}
938
/// A sender report is generated for the primary sender SSRC once a packet has been sent,
/// and it carries the reception report block as well.
#[test]
fn test_sender_report_generation() {
    let mut context = InternalContext::new(
        RtpTransceiverOptions::new()
            .with_default_clock_rate(1000)
            .with_primary_sender_ssrc(10)
            .with_input_ssrc_mode(SSRCMode::Ignore),
    );

    // the same packet is used as both an incoming and an outgoing packet
    let packet = make_ordered_rtp_packet(10, 1, 100);

    context.process_incoming_rtp_packet(&packet);
    context.process_ordered_rtp_packet(&packet);
    context.process_outgoing_rtp_packet(&packet);

    let report = context.create_primary_rtcp_report();

    assert_eq!(report.len(), 2);
    assert_eq!(report[0].packet_type(), RtcpPacketType::SR);
    assert_eq!(report[1].packet_type(), RtcpPacketType::SDES);

    let sr = SenderReport::decode(&report[0]).unwrap();

    assert_eq!(sr.sender_ssrc(), 10);
    assert_eq!(sr.octet_count(), 0);
    assert_eq!(sr.packet_count(), 1);

    let blocks = sr.report_blocks();

    assert_eq!(blocks.len(), 1);
    assert!(blocks.iter().all(|block| block.ssrc() == 10));
}
978
/// Every sender SSRC gets its own compound packet; the primary one goes first.
#[test]
fn test_multiple_sender_ssrcs() {
    let mut context = InternalContext::new(
        RtpTransceiverOptions::new()
            .with_default_clock_rate(1000)
            .with_primary_sender_ssrc(10),
    );

    for ssrc in [10, 20] {
        context.process_outgoing_rtp_packet(&make_rtp_packet(ssrc, 1, 100));
    }

    let reports = context.create_rtcp_reports();

    assert_eq!(reports.len(), 2);

    // each compound packet consists of a sender report and a source description
    for compound in &reports {
        assert_eq!(compound.len(), 2);
        assert_eq!(compound[0].packet_type(), RtcpPacketType::SR);
        assert_eq!(compound[1].packet_type(), RtcpPacketType::SDES);
    }

    let primary = SenderReport::decode(&reports[0][0]).unwrap();

    assert_eq!(primary.sender_ssrc(), 10);

    let secondary = SenderReport::decode(&reports[1][0]).unwrap();

    assert_eq!(secondary.sender_ssrc(), 20);
}
1016
/// End of stream is reported only after BYE has been received for all tracked sources
/// (`Any` mode) or for all configured input SSRCs (`Specific` mode).
#[test]
fn test_end_of_stream() {
    // --- `Any` mode ---
    let options = RtpTransceiverOptions::new()
        .with_default_clock_rate(1000)
        .with_primary_sender_ssrc(1)
        .with_input_ssrc_mode(SSRCMode::Any)
        .with_max_input_ssrcs(Some(3));

    let mut context = InternalContext::new(options);

    // no sources are tracked yet
    assert!(!context.end_of_stream());

    let packets = vec![
        make_ordered_rtp_packet(10, 1, 100),
        make_ordered_rtp_packet(20, 1, 200),
        make_ordered_rtp_packet(30, 1, 300),
    ];

    for packet in &packets {
        context.process_incoming_rtp_packet(packet);
        context.process_ordered_rtp_packet(packet);
    }

    assert!(!context.end_of_stream());

    // BYE for only one of the three tracked sources
    context.process_incoming_bye_packet(&ByePacket::new([10]));

    assert!(!context.end_of_stream());

    // BYE for the remaining two sources
    context.process_incoming_bye_packet(&ByePacket::new([20, 30]));

    assert!(context.end_of_stream());

    // --- `Specific` mode ---
    let options = RtpTransceiverOptions::new()
        .with_default_clock_rate(1000)
        .with_primary_sender_ssrc(1)
        .with_input_ssrc_mode(SSRCMode::Specific)
        .with_input_ssrcs([(10, 1000), (20, 1000), (30, 1000)]);

    let mut context = InternalContext::new(options);

    assert!(!context.end_of_stream());

    // only two of the three configured SSRCs send any RTP packets
    let packets = vec![
        make_ordered_rtp_packet(10, 1, 100),
        make_ordered_rtp_packet(20, 1, 200),
    ];

    for packet in &packets {
        context.process_incoming_rtp_packet(packet);
        context.process_ordered_rtp_packet(packet);
    }

    assert!(!context.end_of_stream());

    context.process_incoming_bye_packet(&ByePacket::new([10]));

    assert!(!context.end_of_stream());

    context.process_incoming_bye_packet(&ByePacket::new([20]));

    // SSRC 30 is configured but BYE has not been received for it yet
    assert!(!context.end_of_stream());

    // a BYE counts even if the source has never sent any RTP packet
    context.process_incoming_bye_packet(&ByePacket::new([30]));

    assert!(context.end_of_stream());
}
1086
/// With 33 input SSRCs the report blocks are split into multiple receiver reports and the
/// number of blocks is limited by the maximum RTCP packet size.
#[test]
fn test_multi_packet_receiver_report() {
    // --- the limit of 836 bytes is exactly enough for all 33 report blocks ---
    let options = RtpTransceiverOptions::new()
        .with_default_clock_rate(1000)
        .with_primary_sender_ssrc(0)
        .with_input_ssrc_mode(SSRCMode::Any)
        .with_max_input_ssrcs(None)
        .with_max_rtcp_packet_size(836);

    let mut context = InternalContext::new(options);

    for i in 0..33 {
        let packet = make_ordered_rtp_packet(0 + i, 1, 100);

        context.process_incoming_rtp_packet(&packet);
        context.process_ordered_rtp_packet(&packet);
    }

    let report = context.create_primary_rtcp_report();

    // two receiver reports followed by a source description
    assert_eq!(report.len(), 3);

    let rr = &report[0];

    assert_eq!(rr.packet_type(), RtcpPacketType::RR);
    assert_eq!(rr.raw_size(), 752);

    let rr = ReceiverReport::decode(rr).unwrap();

    let rbs = rr.report_blocks();

    // the first report is full
    assert_eq!(rbs.len(), 31);

    for (i, rb) in rbs.iter().enumerate() {
        assert_eq!(rb.ssrc(), i as u32);
    }

    let rr = &report[1];

    assert_eq!(rr.packet_type(), RtcpPacketType::RR);
    assert_eq!(rr.raw_size(), 56);

    let rr = ReceiverReport::decode(rr).unwrap();

    let rbs = rr.report_blocks();

    // the remaining two blocks
    assert_eq!(rbs.len(), 2);

    for (i, rb) in rbs.iter().enumerate() {
        assert_eq!(rb.ssrc(), (i + 31) as u32);
    }

    let sdes = &report[2];

    assert_eq!(sdes.packet_type(), RtcpPacketType::SDES);
    assert_eq!(sdes.raw_size(), 28);

    assert_eq!(report.raw_size(), 836);

    // --- one byte less and the last report block does not fit anymore ---
    let options = RtpTransceiverOptions::new()
        .with_default_clock_rate(1000)
        .with_primary_sender_ssrc(0)
        .with_input_ssrc_mode(SSRCMode::Any)
        .with_max_input_ssrcs(None)
        .with_max_rtcp_packet_size(835);

    let mut context = InternalContext::new(options);

    for i in 0..33 {
        let packet = make_ordered_rtp_packet(0 + i, 1, 100);

        context.process_incoming_rtp_packet(&packet);
        context.process_ordered_rtp_packet(&packet);
    }

    let report = context.create_primary_rtcp_report();

    assert_eq!(report.len(), 3);

    let rr = &report[0];

    assert_eq!(rr.packet_type(), RtcpPacketType::RR);
    assert_eq!(rr.raw_size(), 752);

    let rr = ReceiverReport::decode(rr).unwrap();

    let rbs = rr.report_blocks();

    assert_eq!(rbs.len(), 31);

    for (i, rb) in rbs.iter().enumerate() {
        assert_eq!(rb.ssrc(), i as u32);
    }

    let rr = &report[1];

    assert_eq!(rr.packet_type(), RtcpPacketType::RR);
    assert_eq!(rr.raw_size(), 32);

    let rr = ReceiverReport::decode(rr).unwrap();

    let rbs = rr.report_blocks();

    // only one more block fits within the limit
    assert_eq!(rbs.len(), 1);

    for (i, rb) in rbs.iter().enumerate() {
        assert_eq!(rb.ssrc(), (i + 31) as u32);
    }

    let sdes = &report[2];

    assert_eq!(sdes.packet_type(), RtcpPacketType::SDES);
    assert_eq!(sdes.raw_size(), 28);

    assert_eq!(report.raw_size(), 812);
}
1206
/// The first report generation after closing appends BYE packets; nothing is generated
/// afterwards.
#[test]
fn test_context_closing() {
    let mut context = InternalContext::new(
        RtpTransceiverOptions::new()
            .with_default_clock_rate(1000)
            .with_primary_sender_ssrc(10),
    );

    // the primary report is generated even if nothing has been sent
    let reports = context.create_rtcp_reports();

    assert_eq!(reports.len(), 1);
    assert!(reports.iter().all(|compound| compound.len() == 2));

    for ssrc in [10, 20] {
        context.process_outgoing_rtp_packet(&make_rtp_packet(ssrc, 1, 100));
    }

    let reports = context.create_rtcp_reports();

    assert_eq!(reports.len(), 2);
    assert!(reports.iter().all(|compound| compound.len() == 2));

    context.close();

    let reports = context.create_rtcp_reports();

    assert_eq!(reports.len(), 2);

    // every compound packet must end with a BYE now
    for compound in &reports {
        assert_eq!(compound.len(), 3);
        assert_eq!(compound[2].packet_type(), RtcpPacketType::BYE);
    }

    let reports = context.create_rtcp_reports();

    assert!(reports.is_empty());
}
1252}