1use std::{
2 collections::VecDeque,
3 future::Future,
4 pin::Pin,
5 task::{Context, Poll},
6 time::{Duration, Instant},
7};
8
9use futures::{channel::mpsc, ready, FutureExt, Sink, SinkExt, Stream, StreamExt};
10use tokio::{
11 task::JoinHandle,
12 time::{Interval, MissedTickBehavior},
13};
14
15use crate::{
16 rtcp::{ByePacket, ReceiverReport, RtcpContextHandle, RtcpPacketType, SenderReport},
17 transceiver::RtpTransceiver,
18 utils::PacketMux,
19 CompoundRtcpPacket, InvalidInput, RtpPacket,
20};
21
/// Configuration shared by the RTCP handlers in this module.
///
/// The type is a small `Copy` value that is built with the `with_*` methods:
///
/// * `rtcp_report_interval` - period of the outgoing RTCP reports (the first
///   report is scheduled after half of this period),
/// * `ignore_decoding_errors` - whether incoming RTCP packets that cannot be
///   decoded are skipped instead of stopping the RTCP receiver.
#[derive(Debug, Copy, Clone)]
pub struct RtcpHandlerOptions {
    // Period of the outgoing RTCP reports.
    rtcp_report_interval: Duration,
    // Skip undecodable incoming RTCP packets instead of stopping the receiver.
    ignore_decoding_errors: bool,
}

impl RtcpHandlerOptions {
    /// Create new options.
    ///
    /// The defaults are a report interval of 5 seconds and ignoring of RTCP
    /// decoding errors.
    #[inline]
    pub const fn new() -> Self {
        Self {
            rtcp_report_interval: Duration::from_secs(5),
            ignore_decoding_errors: true,
        }
    }

    /// Get the RTCP report interval.
    #[inline]
    pub const fn rtcp_report_interval(&self) -> Duration {
        self.rtcp_report_interval
    }

    /// Set the RTCP report interval.
    ///
    /// The value is used as the period of the report timer; the first report
    /// is scheduled after half of the interval.
    #[inline]
    #[must_use]
    pub const fn with_rtcp_report_interval(mut self, interval: Duration) -> Self {
        self.rtcp_report_interval = interval;
        self
    }

    /// Check if RTCP decoding errors are ignored.
    #[inline]
    pub const fn ignore_decoding_errors(&self) -> bool {
        self.ignore_decoding_errors
    }

    /// Set whether RTCP decoding errors should be ignored.
    ///
    /// When `true`, an incoming RTCP packet that fails to decode is skipped
    /// and the receiver keeps processing the following packets. When `false`,
    /// the first such packet terminates the RTCP receiver.
    #[inline]
    #[must_use]
    pub const fn with_ignore_decoding_errors(mut self, ignore: bool) -> Self {
        self.ignore_decoding_errors = ignore;
        self
    }
}

impl Default for RtcpHandlerOptions {
    /// Equivalent to [`RtcpHandlerOptions::new`].
    #[inline]
    fn default() -> Self {
        Self::new()
    }
}
80
pin_project_lite::pin_project! {
    /// RTP channel wrapper that takes care of RTCP processing.
    ///
    /// The handler forwards all `Stream`/`Sink` operations to the wrapped RTP
    /// channel while two background tasks (spawned on construction) send the
    /// periodic RTCP reports and process the incoming RTCP packets.
    ///
    /// Closing the sink or dropping the handler closes the RTCP context and
    /// dropping the handler also aborts the RTCP receiver task.
    pub struct RtcpHandler<T> {
        // The wrapped RTP channel; every Stream/Sink call is delegated to it.
        #[pin]
        stream: T,
        // RTCP context handle together with the receiver task handle; its
        // `Drop` implementation performs the cleanup.
        context: RtcpHandlerContext,
    }
}
94
95impl<T> RtcpHandler<T> {
96 pub fn new<U, E>(rtp: T, rtcp: U, options: RtcpHandlerOptions) -> Self
100 where
101 T: RtpTransceiver,
102 U: Send + 'static,
103 U: Stream<Item = Result<CompoundRtcpPacket, E>>,
104 U: Sink<CompoundRtcpPacket>,
105 {
106 let rtcp_context = rtp.rtcp_context();
107
108 Self::new_with_rtcp_context(rtp, rtcp, rtcp_context, options)
109 }
110
111 pub fn new_with_rtcp_context<U, E>(
113 rtp: T,
114 rtcp: U,
115 rtcp_context: RtcpContextHandle,
116 options: RtcpHandlerOptions,
117 ) -> Self
118 where
119 U: Send + 'static,
120 U: Stream<Item = Result<CompoundRtcpPacket, E>>,
121 U: Sink<CompoundRtcpPacket>,
122 {
123 let (rtcp_tx, rtcp_rx) = rtcp.split();
124
125 let sender = send_rtcp_reports(
126 rtcp_tx,
127 rtcp_context.clone(),
128 options.rtcp_report_interval(),
129 );
130
131 tokio::spawn(async move {
135 let _ = sender.await;
136 });
137
138 let receiver = RtcpReceiver::new(
139 rtcp_rx,
140 rtcp_context.clone(),
141 options.ignore_decoding_errors(),
142 );
143
144 let receiver = tokio::spawn(async move {
146 let _ = receiver.await;
147 });
148
149 Self {
150 stream: rtp,
151 context: RtcpHandlerContext {
152 context: rtcp_context,
153 receiver,
154 },
155 }
156 }
157}
158
159impl<T, P, E> Stream for RtcpHandler<T>
160where
161 T: Stream<Item = Result<P, E>>,
162{
163 type Item = Result<P, E>;
164
165 #[inline]
166 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
167 let this = self.project();
168
169 this.stream.poll_next(cx)
170 }
171}
172
173impl<T, P, E> Sink<P> for RtcpHandler<T>
174where
175 T: Sink<P, Error = E>,
176{
177 type Error = E;
178
179 #[inline]
180 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
181 let this = self.project();
182
183 this.stream.poll_ready(cx)
184 }
185
186 #[inline]
187 fn start_send(self: Pin<&mut Self>, packet: P) -> Result<(), Self::Error> {
188 let this = self.project();
189
190 this.stream.start_send(packet)
191 }
192
193 #[inline]
194 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
195 let this = self.project();
196
197 this.stream.poll_flush(cx)
198 }
199
200 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
201 let this = self.project();
202
203 ready!(this.stream.poll_close(cx))?;
204
205 this.context.context.close();
208
209 Poll::Ready(Ok(()))
210 }
211}
212
/// Resources owned by an [`RtcpHandler`] that need cleanup on drop.
struct RtcpHandlerContext {
    // Handle of the RTCP context shared with the background tasks.
    context: RtcpContextHandle,
    // Handle of the spawned task processing incoming RTCP packets.
    receiver: JoinHandle<()>,
}

impl Drop for RtcpHandlerContext {
    /// Close the RTCP context and abort the RTCP receiver task.
    fn drop(&mut self) {
        // Closing the context is what the report stream observes via
        // `poll_closed`, so the (detached) sender task can wind down.
        self.context.close();

        // The receiver task would otherwise keep waiting for incoming RTCP
        // packets; abort it explicitly.
        self.receiver.abort();
    }
}
229
/// Receiving end of the in-memory channel carrying the demultiplexed incoming
/// RTP items (packets or errors from the muxed stream).
type DemuxingRtpStream<P, E> = mpsc::Receiver<Result<P, E>>;

/// Sending end of the in-memory channel for outgoing packets; every item is
/// converted into a `PacketMux` before being queued.
type MuxingRtpSink = PacketMuxer<mpsc::Sender<PacketMux>>;

/// RTP channel (stream + sink) used inside of [`MuxedRtcpHandler`].
type RtpComponent<P, E> = StreamSink<DemuxingRtpStream<P, E>, MuxingRtpSink>;
238
/// RTCP handler for a single channel carrying both RTP and RTCP packets.
///
/// The muxed channel is split by two background tasks: a reader task that
/// demultiplexes the incoming packets into separate RTP and RTCP channels and
/// a writer task that merges the outgoing RTP and RTCP packets back into the
/// muxed sink. The resulting RTP/RTCP channels are then handled by a regular
/// [`RtcpHandler`].
pub struct MuxedRtcpHandler<P, E> {
    // Regular RTCP handler working on the demultiplexed in-memory channels.
    inner: RtcpHandler<RtpComponent<P, E>>,
    // Task demultiplexing the incoming packets; aborted on drop.
    reader: JoinHandle<()>,
    // Task forwarding the outgoing packets into the muxed sink; its result
    // carries the error of the underlying sink (if any).
    writer: JoinHandle<Result<(), E>>,
    // Set once the inner (channel-backed) sink failed; from then on the sink
    // methods report the result of the writer task instead.
    sink_error: bool,
}
251
252impl<P, E> MuxedRtcpHandler<P, E> {
253 pub fn new<T>(stream: T, options: RtcpHandlerOptions) -> Self
255 where
256 T: Send + 'static,
257 T: Stream<Item = Result<PacketMux<P>, E>>,
258 T: Sink<PacketMux, Error = E>,
259 T: RtpTransceiver,
260 P: Send + 'static,
261 E: Send + 'static,
262 {
263 let rtcp_context = stream.rtcp_context();
264
265 let (muxed_tx, mut muxed_rx) = stream.split();
266
267 let (mut input_rtp_tx, input_rtp_rx) = mpsc::channel::<Result<_, E>>(4);
268 let (output_rtp_tx, output_rtp_rx) = mpsc::channel(4);
269 let (mut input_rtcp_tx, input_rtcp_rx) = mpsc::channel::<Result<_, E>>(4);
270 let (output_rtcp_tx, output_rtcp_rx) = mpsc::channel(4);
271
272 let output_rtp_tx = PacketMuxer::new(output_rtp_tx);
273 let output_rtcp_tx = PacketMuxer::new(output_rtcp_tx);
274
275 let rtp = StreamSink::new(input_rtp_rx, output_rtp_tx);
276 let rtcp = StreamSink::new(input_rtcp_rx, output_rtcp_tx);
277
278 let reader = tokio::spawn(async move {
280 let mut run = true;
281
282 while run {
283 let next = muxed_rx.next().await;
284
285 run = matches!(next, Some(Ok(_)));
286
287 let _ = match next {
288 Some(Ok(PacketMux::Rtp(packet))) => input_rtp_tx.send(Ok(packet)).await,
289 Some(Ok(PacketMux::Rtcp(packet))) => input_rtcp_tx.send(Ok(packet)).await,
290 Some(Err(err)) => input_rtp_tx.send(Err(err)).await,
291 _ => Ok(()),
292 };
293 }
294 });
295
296 let writer = tokio::spawn(async move {
300 futures::stream::select(output_rtp_rx, output_rtcp_rx)
301 .map(Ok)
302 .forward(muxed_tx)
303 .await
304 });
305
306 Self {
307 inner: RtcpHandler::new_with_rtcp_context(rtp, rtcp, rtcp_context, options),
308 reader,
309 writer,
310 sink_error: false,
311 }
312 }
313
314 fn poll_writer_result(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), E>> {
316 match ready!(self.writer.poll_unpin(cx)) {
317 Ok(Ok(_)) => Poll::Ready(Ok(())),
318 Ok(Err(err)) => Poll::Ready(Err(err)),
319 Err(_) => Poll::Ready(Ok(())),
320 }
321 }
322}
323
impl<P, E> Drop for MuxedRtcpHandler<P, E> {
    /// Abort the reader task.
    ///
    /// Only the reader is aborted here; the writer task is left running so
    /// it can finish forwarding whatever is still queued in the outgoing
    /// channels.
    #[inline]
    fn drop(&mut self) {
        self.reader.abort();
    }
}
330
331impl<P, E> Stream for MuxedRtcpHandler<P, E> {
332 type Item = Result<P, E>;
333
334 #[inline]
335 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
336 self.inner.poll_next_unpin(cx)
337 }
338}
339
340impl<P, E> Sink<RtpPacket> for MuxedRtcpHandler<P, E> {
341 type Error = E;
342
343 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
344 loop {
345 if self.sink_error {
346 return self.poll_writer_result(cx);
347 }
348
349 let res = ready!(SinkExt::<RtpPacket>::poll_ready_unpin(&mut self.inner, cx));
350
351 if res.is_ok() {
352 return Poll::Ready(Ok(()));
353 } else {
354 self.sink_error = true;
355 }
356 }
357 }
358
359 fn start_send(mut self: Pin<&mut Self>, item: RtpPacket) -> Result<(), Self::Error> {
360 let res = SinkExt::<RtpPacket>::start_send_unpin(&mut self.inner, item);
361
362 if res.is_err() {
365 self.sink_error = true;
366 }
367
368 Ok(())
369 }
370
371 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
372 loop {
373 if self.sink_error {
374 return self.poll_writer_result(cx);
375 }
376
377 let res = ready!(SinkExt::<RtpPacket>::poll_flush_unpin(&mut self.inner, cx));
378
379 if res.is_ok() {
380 return Poll::Ready(Ok(()));
381 } else {
382 self.sink_error = true;
383 }
384 }
385 }
386
387 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
388 loop {
389 if self.sink_error {
390 return self.poll_writer_result(cx);
391 }
392
393 let res = ready!(SinkExt::<RtpPacket>::poll_close_unpin(&mut self.inner, cx));
394
395 if res.is_ok() {
396 return Poll::Ready(Ok(()));
397 } else {
398 self.sink_error = true;
399 }
400 }
401 }
402}
403
pin_project_lite::pin_project! {
    /// Combination of an independent stream and an independent sink into a
    /// single object implementing both `Stream` and `Sink`.
    struct StreamSink<T, U> {
        // Source of the items yielded by the `Stream` implementation.
        #[pin]
        stream: T,
        // Target of all `Sink` operations.
        #[pin]
        sink: U,
    }
}

impl<T, U> StreamSink<T, U> {
    /// Combine a given stream and a given sink.
    fn new(stream: T, sink: U) -> Self {
        Self { stream, sink }
    }
}
420
421impl<T, U> Stream for StreamSink<T, U>
422where
423 T: Stream,
424{
425 type Item = T::Item;
426
427 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
428 let this = self.project();
429
430 this.stream.poll_next(cx)
431 }
432}
433
434impl<T, U, I> Sink<I> for StreamSink<T, U>
435where
436 U: Sink<I>,
437{
438 type Error = U::Error;
439
440 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
441 let this = self.project();
442
443 this.sink.poll_ready(cx)
444 }
445
446 fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
447 let this = self.project();
448
449 this.sink.start_send(item)
450 }
451
452 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
453 let this = self.project();
454
455 this.sink.poll_flush(cx)
456 }
457
458 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
459 let this = self.project();
460
461 this.sink.poll_close(cx)
462 }
463}
464
pin_project_lite::pin_project! {
    /// Sink adapter accepting anything convertible into `PacketMux` and
    /// passing the converted packets to an underlying `PacketMux` sink.
    struct PacketMuxer<T> {
        // The underlying sink of `PacketMux` items.
        #[pin]
        inner: T,
    }
}

impl<T> PacketMuxer<T> {
    /// Wrap a given sink.
    fn new(sink: T) -> Self {
        Self { inner: sink }
    }
}
479
480impl<T, I> Sink<I> for PacketMuxer<T>
481where
482 T: Sink<PacketMux>,
483 I: Into<PacketMux>,
484{
485 type Error = T::Error;
486
487 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
488 let this = self.project();
489
490 this.inner.poll_ready(cx)
491 }
492
493 fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
494 let this = self.project();
495
496 this.inner.start_send(item.into())
497 }
498
499 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
500 let this = self.project();
501
502 this.inner.poll_flush(cx)
503 }
504
505 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
506 let this = self.project();
507
508 this.inner.poll_close(cx)
509 }
510}
511
pin_project_lite::pin_project! {
    /// Future processing incoming compound RTCP packets.
    ///
    /// The future resolves when the RTCP stream ends, when the stream yields
    /// an error or (unless decoding errors are ignored) when an incoming
    /// packet cannot be decoded.
    struct RtcpReceiver<T> {
        // Stream of incoming compound RTCP packets.
        #[pin]
        stream: T,
        // Wrapper around the RTCP context that consumes the packets.
        context: RtcpReceiverContext,
        // Keep going after a packet that failed to decode.
        ignore_decoding_errors: bool,
    }
}

impl<T> RtcpReceiver<T> {
    /// Create a new receiver for a given RTCP stream and RTCP context.
    fn new(stream: T, context: RtcpContextHandle, ignore_decoding_errors: bool) -> Self {
        Self {
            stream,
            context: RtcpReceiverContext::new(context),
            ignore_decoding_errors,
        }
    }
}
532
533impl<T, E> Future for RtcpReceiver<T>
534where
535 T: Stream<Item = Result<CompoundRtcpPacket, E>>,
536{
537 type Output = Result<(), RtcpReceiverError<E>>;
538
539 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
540 let mut this = self.project();
541
542 loop {
543 let stream = this.stream.as_mut();
544
545 match ready!(stream.poll_next(cx)) {
546 Some(Ok(packet)) => {
547 if let Err(err) = this.context.process_incoming_rtcp_packet(&packet) {
548 if !*this.ignore_decoding_errors {
549 return Poll::Ready(Err(err.into()));
550 }
551 }
552 }
553 Some(Err(err)) => return Poll::Ready(Err(RtcpReceiverError::Other(err))),
554 None => return Poll::Ready(Ok(())),
555 }
556 }
557 }
558}
559
560struct RtcpReceiverContext {
562 context: RtcpContextHandle,
563}
564
565impl RtcpReceiverContext {
566 fn new(context: RtcpContextHandle) -> Self {
568 Self { context }
569 }
570
571 fn process_incoming_rtcp_packet(
573 &mut self,
574 packet: &CompoundRtcpPacket,
575 ) -> Result<(), InvalidInput> {
576 for packet in packet.iter() {
577 match packet.packet_type() {
578 RtcpPacketType::SR => {
579 self.context
580 .process_incoming_sender_report(&SenderReport::decode(packet)?);
581 }
582 RtcpPacketType::RR => {
583 self.context
584 .process_incoming_receiver_report(&ReceiverReport::decode(packet)?);
585 }
586 RtcpPacketType::BYE => {
587 self.context
588 .process_incoming_bye_packet(&ByePacket::decode(packet)?);
589 }
590 _ => (),
591 }
592 }
593
594 Ok(())
595 }
596}
597
/// Reason why the RTCP receiver stopped with an error.
enum RtcpReceiverError<E> {
    // An incoming RTCP packet could not be decoded.
    InvalidInput,
    // The RTCP stream itself yielded an error.
    Other(E),
}

impl<E> From<InvalidInput> for RtcpReceiverError<E> {
    /// Map a decoding error to [`RtcpReceiverError::InvalidInput`]; the
    /// original error value is discarded.
    fn from(_: InvalidInput) -> Self {
        Self::InvalidInput
    }
}
609
610async fn send_rtcp_reports<T>(
615 sink: T,
616 context: RtcpContextHandle,
617 rtcp_report_interval: Duration,
618) -> Result<(), T::Error>
619where
620 T: Sink<CompoundRtcpPacket>,
621{
622 RtcpOutputStream::new(context, rtcp_report_interval)
623 .map(Ok)
624 .forward(sink)
625 .await
626}
627
628struct RtcpOutputStream {
630 interval: Interval,
631 context: RtcpContextHandle,
632 output: VecDeque<CompoundRtcpPacket>,
633}
634
635impl RtcpOutputStream {
636 fn new(context: RtcpContextHandle, rtcp_report_interval: Duration) -> Self {
639 let start = Instant::now() + (rtcp_report_interval / 2);
640
641 let mut interval = tokio::time::interval_at(start.into(), rtcp_report_interval);
642
643 interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
644
645 Self {
646 interval,
647 context,
648 output: VecDeque::new(),
649 }
650 }
651}
652
653impl Stream for RtcpOutputStream {
654 type Item = CompoundRtcpPacket;
655
656 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
657 loop {
658 if let Some(packet) = self.output.pop_front() {
659 return Poll::Ready(Some(packet));
660 }
661
662 let closed = self.context.poll_closed(cx);
663
664 if closed.is_pending() {
665 ready!(self.interval.poll_tick(cx));
666 }
667
668 let packets = self.context.create_rtcp_reports();
669
670 if packets.is_empty() {
671 return Poll::Ready(None);
672 }
673
674 self.output.extend(packets);
675 }
676 }
677}
678
679#[cfg(test)]
680mod tests {
681 use std::{
682 collections::VecDeque,
683 convert::Infallible,
684 pin::Pin,
685 sync::{Arc, Mutex},
686 task::{Context, Poll},
687 time::{Duration, Instant},
688 };
689
690 use futures::{channel::mpsc, Sink, SinkExt, Stream, StreamExt};
691
692 use super::{MuxedRtcpHandler, RtcpHandler, RtcpHandlerOptions, StreamSink};
693
694 use crate::{
695 rtcp::{RtcpContext, RtcpPacketType},
696 rtp::{IncomingRtpPacket, OrderedRtpPacket, RtpPacket},
697 transceiver::{DefaultRtpTransceiver, RtpTransceiver, RtpTransceiverOptions, SSRCMode},
698 utils::PacketMux,
699 };
700
701 fn make_rtp_packet(ssrc: u32, seq: u16, timestamp: u32) -> RtpPacket {
702 RtpPacket::new()
703 .with_ssrc(ssrc)
704 .with_sequence_number(seq)
705 .with_timestamp(timestamp)
706 }
707
/// Cloneable in-memory RTCP channel used by the tests.
///
/// All clones share the same state, so a test can keep one clone and inspect
/// what the handler wrote into the other one.
#[derive(Clone)]
struct RtcpTestChannel<I, O> {
    // Shared channel state (pending input, recorded output, closed flag).
    inner: Arc<Mutex<InnerRtcpTestChannel<I, O>>>,
}

impl<I, O> RtcpTestChannel<I, O> {
    /// Create a new channel that will yield given input items.
    fn new<T>(input: T) -> Self
    where
        T: IntoIterator<Item = I>,
    {
        Self {
            inner: Arc::new(Mutex::new(InnerRtcpTestChannel::new(input))),
        }
    }
}
725
726 impl<I, O> Stream for RtcpTestChannel<I, O> {
727 type Item = Result<I, Infallible>;
728
729 fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
730 let mut inner = self.inner.lock().unwrap();
731
732 if let Some(packet) = inner.input.pop_front() {
733 Poll::Ready(Some(Ok(packet)))
734 } else {
735 Poll::Pending
736 }
737 }
738 }
739
740 impl<I, O> Sink<O> for RtcpTestChannel<I, O> {
741 type Error = Infallible;
742
743 fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
744 Poll::Ready(Ok(()))
745 }
746
747 fn start_send(self: Pin<&mut Self>, packet: O) -> Result<(), Self::Error> {
748 let mut inner = self.inner.lock().unwrap();
749 inner.output.push(packet);
750
751 Ok(())
752 }
753
754 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
755 Poll::Ready(Ok(()))
756 }
757
758 fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
759 self.inner.lock().unwrap().closed = true;
760
761 Poll::Ready(Ok(()))
762 }
763 }
764
/// Shared state of the test channels.
struct InnerRtcpTestChannel<I, O> {
    // Items that are still to be yielded by the stream half.
    input: VecDeque<I>,
    // Items received by the sink half (in the order of arrival).
    output: Vec<O>,
    // Set when the sink half has been closed.
    closed: bool,
}

impl<I, O> InnerRtcpTestChannel<I, O> {
    /// Create a new state with given pending input, no recorded output and
    /// the closed flag unset.
    fn new<T>(input: T) -> Self
    where
        T: IntoIterator<Item = I>,
    {
        let pending = input.into_iter().collect::<VecDeque<_>>();

        Self {
            input: pending,
            output: Vec::new(),
            closed: false,
        }
    }
}
785
786 #[derive(Clone)]
788 struct MuxedTestTransceiver {
789 inner: Arc<Mutex<InnerMuxedTestTransceiver>>,
790 }
791
792 impl MuxedTestTransceiver {
793 fn new<T>(input: T, options: RtpTransceiverOptions) -> Self
795 where
796 T: IntoIterator<Item = PacketMux>,
797 {
798 let inner = InnerMuxedTestTransceiver::new(input, options);
799
800 Self {
801 inner: Arc::new(Mutex::new(inner)),
802 }
803 }
804 }
805
806 impl Stream for MuxedTestTransceiver {
807 type Item = Result<PacketMux, Infallible>;
808
809 fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
810 let mut inner = self.inner.lock().unwrap();
811
812 if let Some(packet) = inner.inner.input.pop_front() {
813 let packet = if let PacketMux::Rtp(packet) = packet {
814 let index = packet.sequence_number() as u64;
815
816 let now = Instant::now();
817 let incoming = IncomingRtpPacket::new(packet, now);
818 let ordered = OrderedRtpPacket::new(incoming, index);
819
820 inner.context.process_incoming_rtp_packet(&ordered);
821 inner.context.process_ordered_rtp_packet(&ordered);
822
823 PacketMux::Rtp(ordered.into())
824 } else {
825 packet
826 };
827
828 Poll::Ready(Some(Ok(packet)))
829 } else {
830 Poll::Ready(None)
831 }
832 }
833 }
834
835 impl Sink<PacketMux> for MuxedTestTransceiver {
836 type Error = Infallible;
837
838 fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
839 Poll::Ready(Ok(()))
840 }
841
842 fn start_send(self: Pin<&mut Self>, packet: PacketMux) -> Result<(), Self::Error> {
843 let mut inner = self.inner.lock().unwrap();
844
845 if let PacketMux::Rtp(packet) = &packet {
846 inner.context.process_outgoing_rtp_packet(packet);
847 }
848
849 inner.inner.output.push(packet);
850
851 Ok(())
852 }
853
854 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
855 Poll::Ready(Ok(()))
856 }
857
858 fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
859 let mut inner = self.inner.lock().unwrap();
860
861 inner.inner.closed = true;
862
863 Poll::Ready(Ok(()))
864 }
865 }
866
867 impl RtpTransceiver for MuxedTestTransceiver {
868 fn rtcp_context(&self) -> crate::rtcp::RtcpContextHandle {
869 let inner = self.inner.lock().unwrap();
870
871 inner.context.handle()
872 }
873 }
874
/// Shared state of the muxed test transceiver.
struct InnerMuxedTestTransceiver {
    // Pending input packets, recorded output packets and the closed flag.
    inner: InnerRtcpTestChannel<PacketMux, PacketMux>,
    // RTCP context tracking the packets passing through the transceiver.
    context: RtcpContext,
}

impl InnerMuxedTestTransceiver {
    /// Create a new state with given input packets and a fresh RTCP context
    /// created from given transceiver options.
    fn new<T>(input: T, options: RtpTransceiverOptions) -> Self
    where
        T: IntoIterator<Item = PacketMux>,
    {
        Self {
            inner: InnerRtcpTestChannel::new(input),
            context: RtcpContext::new(options),
        }
    }
}
893
/// Check that the background tasks of `RtcpHandler` terminate after the
/// handler has been dropped and that exactly one final RTCP report
/// (RR + SDES + BYE) is written into the RTCP channel before it is closed.
#[tokio::test]
async fn test_handler_task_termination() {
    let (mut incoming_rtp_tx, incoming_rtp_rx) =
        mpsc::unbounded::<Result<RtpPacket, Infallible>>();
    let (outgoing_rtp_tx, outgoing_rtp_rx) = mpsc::unbounded::<RtpPacket>();

    let rtp = StreamSink::new(incoming_rtp_rx, outgoing_rtp_tx);

    let options = RtpTransceiverOptions::new()
        .with_default_clock_rate(1000)
        .with_primary_sender_ssrc(0)
        .with_input_ssrc_mode(SSRCMode::Any);

    let rtp = DefaultRtpTransceiver::<_, Infallible>::new(rtp, options);

    // No incoming RTCP packets; the channel only records the outgoing ones.
    let rtcp = RtcpTestChannel::new([]);

    let options = RtcpHandlerOptions::new()
        .with_ignore_decoding_errors(true)
        .with_rtcp_report_interval(Duration::from_millis(100));

    let handler = RtcpHandler::new(rtp, rtcp.clone(), options);

    // The handler is moved into the task and dropped when the task ends.
    let handler = tokio::spawn(async move { handler.collect::<Vec<_>>().await });

    incoming_rtp_tx
        .send(Ok(make_rtp_packet(1, 1, 100)))
        .await
        .unwrap();
    incoming_rtp_tx.close().await.unwrap();

    let incoming_rtp_packets = handler.await.unwrap();

    std::mem::drop(outgoing_rtp_rx);

    assert_eq!(incoming_rtp_packets.len(), 1);

    let packet = incoming_rtp_packets.into_iter().next().unwrap().unwrap();

    assert_eq!(packet.ssrc(), 1);
    assert_eq!(packet.sequence_number(), 1);
    assert_eq!(packet.timestamp(), 100);

    // The background tasks hold clones of the RTCP channel; once they are
    // gone, the local clone is the only remaining reference.
    let wait_for_close = async {
        while Arc::strong_count(&rtcp.inner) > 1 {
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
    };

    tokio::time::timeout(Duration::from_secs(1), wait_for_close)
        .await
        .expect("RTCP handler tasks have not terminated");

    // Take the shared state out of the Arc/Mutex; this cannot fail because
    // there are no other references left.
    let rtcp = Arc::try_unwrap(rtcp.inner)
        .ok()
        .unwrap()
        .into_inner()
        .ok()
        .unwrap();

    assert!(rtcp.closed);

    assert_eq!(rtcp.output.len(), 1);

    let report = &rtcp.output[0];

    assert_eq!(report.len(), 3);

    let rr = &report[0];
    let sdes = &report[1];
    let bye = &report[2];

    assert_eq!(rr.packet_type(), RtcpPacketType::RR);
    assert_eq!(sdes.packet_type(), RtcpPacketType::SDES);
    assert_eq!(bye.packet_type(), RtcpPacketType::BYE);
}
971
/// Check that the background tasks of `MuxedRtcpHandler` terminate after the
/// handler has been dropped and that exactly one final RTCP report
/// (RR + SDES + BYE) is written into the muxed channel before it is closed.
#[tokio::test]
async fn test_muxed_handler_task_termination() {
    let options = RtpTransceiverOptions::new()
        .with_default_clock_rate(1000)
        .with_primary_sender_ssrc(0)
        .with_input_ssrc_mode(SSRCMode::Any);

    let packet = PacketMux::Rtp(make_rtp_packet(1, 1, 100));

    // The transceiver yields the single RTP packet and then ends.
    let muxed = MuxedTestTransceiver::new([packet], options);

    let options = RtcpHandlerOptions::new()
        .with_ignore_decoding_errors(true)
        .with_rtcp_report_interval(Duration::from_millis(100));

    let handler = MuxedRtcpHandler::new(muxed.clone(), options);

    // The handler is moved into the task and dropped when the task ends.
    let handler = tokio::spawn(async move { handler.collect::<Vec<_>>().await });

    let incoming_rtp_packets = handler.await.unwrap();

    assert_eq!(incoming_rtp_packets.len(), 1);

    let packet = incoming_rtp_packets.into_iter().next().unwrap().unwrap();

    assert_eq!(packet.ssrc(), 1);
    assert_eq!(packet.sequence_number(), 1);
    assert_eq!(packet.timestamp(), 100);

    // The background tasks hold clones of the transceiver; once they are
    // gone, the local clone is the only remaining reference.
    let wait_for_close = async {
        while Arc::strong_count(&muxed.inner) > 1 {
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
    };

    tokio::time::timeout(Duration::from_secs(1), wait_for_close)
        .await
        .expect("RTCP handler tasks have not terminated");

    // Take the shared state out of the Arc/Mutex; this cannot fail because
    // there are no other references left.
    let muxed = Arc::try_unwrap(muxed.inner)
        .ok()
        .unwrap()
        .into_inner()
        .ok()
        .unwrap();

    assert!(muxed.inner.closed);

    assert_eq!(muxed.inner.output.len(), 1);

    let PacketMux::Rtcp(report) = &muxed.inner.output[0] else {
        panic!("expected RTCP packet");
    };

    assert_eq!(report.len(), 3);

    let rr = &report[0];
    let sdes = &report[1];
    let bye = &report[2];

    assert_eq!(rr.packet_type(), RtcpPacketType::RR);
    assert_eq!(sdes.packet_type(), RtcpPacketType::SDES);
    assert_eq!(bye.packet_type(), RtcpPacketType::BYE);
}
1037}