msf_rtp/rtcp/
handler.rs

1use std::{
2    collections::VecDeque,
3    future::Future,
4    pin::Pin,
5    task::{Context, Poll},
6    time::{Duration, Instant},
7};
8
9use futures::{channel::mpsc, ready, FutureExt, Sink, SinkExt, Stream, StreamExt};
10use tokio::{
11    task::JoinHandle,
12    time::{Interval, MissedTickBehavior},
13};
14
15use crate::{
16    rtcp::{ByePacket, ReceiverReport, RtcpContextHandle, RtcpPacketType, SenderReport},
17    transceiver::RtpTransceiver,
18    utils::PacketMux,
19    CompoundRtcpPacket, InvalidInput, RtpPacket,
20};
21
/// RTCP handler options.
///
/// The options are built with the `with_*` methods starting from
/// [`RtcpHandlerOptions::new`] (or `Default::default()`).
#[derive(Debug, Copy, Clone)]
pub struct RtcpHandlerOptions {
    // Period between two consecutive RTCP reports. Never zero (see
    // `with_rtcp_report_interval`).
    rtcp_report_interval: Duration,
    // If true, incoming RTCP packets that fail to decode are dropped instead
    // of stopping the RTCP receiver.
    ignore_decoding_errors: bool,
}

impl RtcpHandlerOptions {
    /// Smallest accepted RTCP report interval.
    ///
    /// The interval is used as a timer period and a zero timer period is
    /// rejected by the Tokio interval timer with a panic.
    const MIN_RTCP_REPORT_INTERVAL: Duration = Duration::from_millis(1);

    /// Create new RTCP handler options with default values.
    ///
    /// The defaults are a 5 second report interval and ignoring of RTCP
    /// decoding errors.
    #[inline]
    #[must_use]
    pub const fn new() -> Self {
        Self {
            rtcp_report_interval: Duration::from_secs(5),
            ignore_decoding_errors: true,
        }
    }

    /// Get the RTCP report interval.
    #[inline]
    pub const fn rtcp_report_interval(&self) -> Duration {
        self.rtcp_report_interval
    }

    /// Set the RTCP report interval.
    ///
    /// RTCP reports will be generated once per `interval`. The default value
    /// is 5 seconds.
    ///
    /// A zero `interval` cannot be used as a timer period, so it is replaced
    /// by the smallest supported interval (1 millisecond).
    #[inline]
    #[must_use]
    pub const fn with_rtcp_report_interval(mut self, interval: Duration) -> Self {
        self.rtcp_report_interval = if interval.is_zero() {
            Self::MIN_RTCP_REPORT_INTERVAL
        } else {
            interval
        };

        self
    }

    /// Check if RTCP decoding errors should be ignored.
    #[inline]
    pub const fn ignore_decoding_errors(&self) -> bool {
        self.ignore_decoding_errors
    }

    /// Set whether RTCP decoding errors should be ignored.
    ///
    /// If true, decoding errors will be ignored and the invalid packets
    /// will be silently dropped. If false, the RTCP handler will stop
    /// processing incoming RTCP packets on the first decoding error. The
    /// default value is true.
    #[inline]
    #[must_use]
    pub const fn with_ignore_decoding_errors(mut self, ignore: bool) -> Self {
        self.ignore_decoding_errors = ignore;
        self
    }
}

impl Default for RtcpHandlerOptions {
    /// Equivalent to [`RtcpHandlerOptions::new`].
    #[inline]
    fn default() -> Self {
        Self::new()
    }
}
80
pin_project_lite::pin_project! {
    /// RTCP protocol handler.
    ///
    /// The handler consumes a given RTP-RTCP stream pair and handles all the
    /// necessary RTCP communication. The resulting object can be used as an RTP
    /// stream/sink while the corresponding RTCP communication is handled
    /// automatically by a background task.
    ///
    /// The `Stream` and `Sink` implementations simply delegate to the wrapped
    /// RTP object. Dropping the handler closes the RTCP context and aborts
    /// the RTCP receiver task.
    pub struct RtcpHandler<T> {
        // The wrapped RTP stream/sink; all stream/sink calls are forwarded
        // to it.
        #[pin]
        stream: T,
        // RTCP context handle together with the receiver task handle; its
        // `Drop` implementation performs the cleanup.
        context: RtcpHandlerContext,
    }
}
94
95impl<T> RtcpHandler<T> {
96    /// Create a new RTCP handler.
97    ///
98    /// The handler will use the RTCP context provided by the RTP transceiver.
99    pub fn new<U, E>(rtp: T, rtcp: U, options: RtcpHandlerOptions) -> Self
100    where
101        T: RtpTransceiver,
102        U: Send + 'static,
103        U: Stream<Item = Result<CompoundRtcpPacket, E>>,
104        U: Sink<CompoundRtcpPacket>,
105    {
106        let rtcp_context = rtp.rtcp_context();
107
108        Self::new_with_rtcp_context(rtp, rtcp, rtcp_context, options)
109    }
110
111    /// Create a new RTCP handler with a given RTCP context.
112    pub fn new_with_rtcp_context<U, E>(
113        rtp: T,
114        rtcp: U,
115        rtcp_context: RtcpContextHandle,
116        options: RtcpHandlerOptions,
117    ) -> Self
118    where
119        U: Send + 'static,
120        U: Stream<Item = Result<CompoundRtcpPacket, E>>,
121        U: Sink<CompoundRtcpPacket>,
122    {
123        let (rtcp_tx, rtcp_rx) = rtcp.split();
124
125        let sender = send_rtcp_reports(
126            rtcp_tx,
127            rtcp_context.clone(),
128            options.rtcp_report_interval(),
129        );
130
131        // NOTE: This task will run as long as the RtcpContext is generating
132        //   RTCP reports. It stops when the context is closed. Therefore, we
133        //   close the context when the handler is dropped.
134        tokio::spawn(async move {
135            let _ = sender.await;
136        });
137
138        let receiver = RtcpReceiver::new(
139            rtcp_rx,
140            rtcp_context.clone(),
141            options.ignore_decoding_errors(),
142        );
143
144        // NOTE: This task will be terminated when the handler is dropped.
145        let receiver = tokio::spawn(async move {
146            let _ = receiver.await;
147        });
148
149        Self {
150            stream: rtp,
151            context: RtcpHandlerContext {
152                context: rtcp_context,
153                receiver,
154            },
155        }
156    }
157}
158
159impl<T, P, E> Stream for RtcpHandler<T>
160where
161    T: Stream<Item = Result<P, E>>,
162{
163    type Item = Result<P, E>;
164
165    #[inline]
166    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
167        let this = self.project();
168
169        this.stream.poll_next(cx)
170    }
171}
172
173impl<T, P, E> Sink<P> for RtcpHandler<T>
174where
175    T: Sink<P, Error = E>,
176{
177    type Error = E;
178
179    #[inline]
180    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
181        let this = self.project();
182
183        this.stream.poll_ready(cx)
184    }
185
186    #[inline]
187    fn start_send(self: Pin<&mut Self>, packet: P) -> Result<(), Self::Error> {
188        let this = self.project();
189
190        this.stream.start_send(packet)
191    }
192
193    #[inline]
194    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
195        let this = self.project();
196
197        this.stream.poll_flush(cx)
198    }
199
200    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
201        let this = self.project();
202
203        ready!(this.stream.poll_close(cx))?;
204
205        // close the RTCP context here just in case the stream itself did not
206        // do it
207        this.context.context.close();
208
209        Poll::Ready(Ok(()))
210    }
211}
212
213/// RTCP handler context.
214struct RtcpHandlerContext {
215    context: RtcpContextHandle,
216    receiver: JoinHandle<()>,
217}
218
219impl Drop for RtcpHandlerContext {
220    fn drop(&mut self) {
221        // close the RTCP context here just in case the RTCP handler sink was
222        // not closed
223        self.context.close();
224
225        // stop the RTCP receiver
226        self.receiver.abort();
227    }
228}
229
/// Incoming half of the demultiplexed RTP channel.
///
/// This is the receiving end of the channel into which the reader task of
/// `MuxedRtcpHandler` pushes the RTP packets (and stream errors) taken from
/// the muxed stream.
type DemuxingRtpStream<P, E> = mpsc::Receiver<Result<P, E>>;

/// Outgoing half of the demultiplexed RTP channel.
///
/// Items sent into this sink are converted into `PacketMux` and pushed into
/// the channel consumed by the writer task of `MuxedRtcpHandler`.
type MuxingRtpSink = PacketMuxer<mpsc::Sender<PacketMux>>;

/// RTP stream/sink pair handed over to the inner `RtcpHandler` of
/// `MuxedRtcpHandler`.
type RtpComponent<P, E> = StreamSink<DemuxingRtpStream<P, E>, MuxingRtpSink>;
238
239/// RTCP protocol handler for muxed RTP-RTCP streams.
240///
241/// The handler consumes a given muxed RTP-RTCP stream and handles all the
242/// necessary RTCP communication. The resulting object can be used as an RTP
243/// stream/sink while the corresponding RTCP communication is handled
244/// automatically by a background task.
245pub struct MuxedRtcpHandler<P, E> {
246    inner: RtcpHandler<RtpComponent<P, E>>,
247    reader: JoinHandle<()>,
248    writer: JoinHandle<Result<(), E>>,
249    sink_error: bool,
250}
251
252impl<P, E> MuxedRtcpHandler<P, E> {
253    /// Create a new RTCP handler.
254    pub fn new<T>(stream: T, options: RtcpHandlerOptions) -> Self
255    where
256        T: Send + 'static,
257        T: Stream<Item = Result<PacketMux<P>, E>>,
258        T: Sink<PacketMux, Error = E>,
259        T: RtpTransceiver,
260        P: Send + 'static,
261        E: Send + 'static,
262    {
263        let rtcp_context = stream.rtcp_context();
264
265        let (muxed_tx, mut muxed_rx) = stream.split();
266
267        let (mut input_rtp_tx, input_rtp_rx) = mpsc::channel::<Result<_, E>>(4);
268        let (output_rtp_tx, output_rtp_rx) = mpsc::channel(4);
269        let (mut input_rtcp_tx, input_rtcp_rx) = mpsc::channel::<Result<_, E>>(4);
270        let (output_rtcp_tx, output_rtcp_rx) = mpsc::channel(4);
271
272        let output_rtp_tx = PacketMuxer::new(output_rtp_tx);
273        let output_rtcp_tx = PacketMuxer::new(output_rtcp_tx);
274
275        let rtp = StreamSink::new(input_rtp_rx, output_rtp_tx);
276        let rtcp = StreamSink::new(input_rtcp_rx, output_rtcp_tx);
277
278        // NOTE: This task will be terminated when the handler is dropped.
279        let reader = tokio::spawn(async move {
280            let mut run = true;
281
282            while run {
283                let next = muxed_rx.next().await;
284
285                run = matches!(next, Some(Ok(_)));
286
287                let _ = match next {
288                    Some(Ok(PacketMux::Rtp(packet))) => input_rtp_tx.send(Ok(packet)).await,
289                    Some(Ok(PacketMux::Rtcp(packet))) => input_rtcp_tx.send(Ok(packet)).await,
290                    Some(Err(err)) => input_rtp_tx.send(Err(err)).await,
291                    _ => Ok(()),
292                };
293            }
294        });
295
296        // NOTE: This task will run as long as the `output_rtp_rx` and
297        //   `output_rtcp_rx` are open. These channels will be closed when the
298        //   inner handler is dropped.
299        let writer = tokio::spawn(async move {
300            futures::stream::select(output_rtp_rx, output_rtcp_rx)
301                .map(Ok)
302                .forward(muxed_tx)
303                .await
304        });
305
306        Self {
307            inner: RtcpHandler::new_with_rtcp_context(rtp, rtcp, rtcp_context, options),
308            reader,
309            writer,
310            sink_error: false,
311        }
312    }
313
314    /// Poll the writer result.
315    fn poll_writer_result(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), E>> {
316        match ready!(self.writer.poll_unpin(cx)) {
317            Ok(Ok(_)) => Poll::Ready(Ok(())),
318            Ok(Err(err)) => Poll::Ready(Err(err)),
319            Err(_) => Poll::Ready(Ok(())),
320        }
321    }
322}
323
impl<P, E> Drop for MuxedRtcpHandler<P, E> {
    /// Stop the reader task.
    ///
    /// Only the reader is aborted here. The writer task is left running; it
    /// is expected to finish on its own once the channels feeding it are
    /// closed.
    #[inline]
    fn drop(&mut self) {
        self.reader.abort();
    }
}
330
331impl<P, E> Stream for MuxedRtcpHandler<P, E> {
332    type Item = Result<P, E>;
333
334    #[inline]
335    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
336        self.inner.poll_next_unpin(cx)
337    }
338}
339
340impl<P, E> Sink<RtpPacket> for MuxedRtcpHandler<P, E> {
341    type Error = E;
342
343    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
344        loop {
345            if self.sink_error {
346                return self.poll_writer_result(cx);
347            }
348
349            let res = ready!(SinkExt::<RtpPacket>::poll_ready_unpin(&mut self.inner, cx));
350
351            if res.is_ok() {
352                return Poll::Ready(Ok(()));
353            } else {
354                self.sink_error = true;
355            }
356        }
357    }
358
359    fn start_send(mut self: Pin<&mut Self>, item: RtpPacket) -> Result<(), Self::Error> {
360        let res = SinkExt::<RtpPacket>::start_send_unpin(&mut self.inner, item);
361
362        // we cannot get the actual error here, it needs to be polled out from
363        // the writer
364        if res.is_err() {
365            self.sink_error = true;
366        }
367
368        Ok(())
369    }
370
371    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
372        loop {
373            if self.sink_error {
374                return self.poll_writer_result(cx);
375            }
376
377            let res = ready!(SinkExt::<RtpPacket>::poll_flush_unpin(&mut self.inner, cx));
378
379            if res.is_ok() {
380                return Poll::Ready(Ok(()));
381            } else {
382                self.sink_error = true;
383            }
384        }
385    }
386
387    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
388        loop {
389            if self.sink_error {
390                return self.poll_writer_result(cx);
391            }
392
393            let res = ready!(SinkExt::<RtpPacket>::poll_close_unpin(&mut self.inner, cx));
394
395            if res.is_ok() {
396                return Poll::Ready(Ok(()));
397            } else {
398                self.sink_error = true;
399            }
400        }
401    }
402}
403
pin_project_lite::pin_project! {
    /// Helper struct combining an independent stream and sink into a single
    /// object.
    ///
    /// The `Stream` implementation delegates to `stream` and the `Sink`
    /// implementation delegates to `sink`.
    struct StreamSink<T, U> {
        #[pin]
        stream: T,
        #[pin]
        sink: U,
    }
}

impl<T, U> StreamSink<T, U> {
    /// Create a new stream-sink from a given stream and sink.
    fn new(stream: T, sink: U) -> Self {
        Self { stream, sink }
    }
}
420
421impl<T, U> Stream for StreamSink<T, U>
422where
423    T: Stream,
424{
425    type Item = T::Item;
426
427    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
428        let this = self.project();
429
430        this.stream.poll_next(cx)
431    }
432}
433
434impl<T, U, I> Sink<I> for StreamSink<T, U>
435where
436    U: Sink<I>,
437{
438    type Error = U::Error;
439
440    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
441        let this = self.project();
442
443        this.sink.poll_ready(cx)
444    }
445
446    fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
447        let this = self.project();
448
449        this.sink.start_send(item)
450    }
451
452    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
453        let this = self.project();
454
455        this.sink.poll_flush(cx)
456    }
457
458    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
459        let this = self.project();
460
461        this.sink.poll_close(cx)
462    }
463}
464
pin_project_lite::pin_project! {
    /// Helper struct adapting a sink of `PacketMux` items.
    ///
    /// The resulting sink accepts any item convertible into `PacketMux`; the
    /// conversion is done when the item is sent.
    struct PacketMuxer<T> {
        #[pin]
        inner: T,
    }
}

impl<T> PacketMuxer<T> {
    /// Create a new packet muxer wrapping a given sink.
    fn new(sink: T) -> Self {
        Self { inner: sink }
    }
}
479
480impl<T, I> Sink<I> for PacketMuxer<T>
481where
482    T: Sink<PacketMux>,
483    I: Into<PacketMux>,
484{
485    type Error = T::Error;
486
487    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
488        let this = self.project();
489
490        this.inner.poll_ready(cx)
491    }
492
493    fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
494        let this = self.project();
495
496        this.inner.start_send(item.into())
497    }
498
499    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
500        let this = self.project();
501
502        this.inner.poll_flush(cx)
503    }
504
505    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
506        let this = self.project();
507
508        this.inner.poll_close(cx)
509    }
510}
511
pin_project_lite::pin_project! {
    /// Future that will read and process all incoming RTCP packets.
    ///
    /// The future completes when the underlying stream ends, when the stream
    /// yields an error or (unless decoding errors are ignored) when an
    /// incoming packet cannot be decoded.
    struct RtcpReceiver<T> {
        #[pin]
        stream: T,
        context: RtcpReceiverContext,
        ignore_decoding_errors: bool,
    }
}

impl<T> RtcpReceiver<T> {
    /// Create a new RTCP receiver.
    ///
    /// # Arguments
    /// * `stream` - stream of incoming compound RTCP packets
    /// * `context` - RTCP context that will be fed with the incoming packets
    /// * `ignore_decoding_errors` - if true, packets that cannot be decoded
    ///   are skipped instead of terminating the receiver
    fn new(stream: T, context: RtcpContextHandle, ignore_decoding_errors: bool) -> Self {
        Self {
            stream,
            context: RtcpReceiverContext::new(context),
            ignore_decoding_errors,
        }
    }
}
532
533impl<T, E> Future for RtcpReceiver<T>
534where
535    T: Stream<Item = Result<CompoundRtcpPacket, E>>,
536{
537    type Output = Result<(), RtcpReceiverError<E>>;
538
539    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
540        let mut this = self.project();
541
542        loop {
543            let stream = this.stream.as_mut();
544
545            match ready!(stream.poll_next(cx)) {
546                Some(Ok(packet)) => {
547                    if let Err(err) = this.context.process_incoming_rtcp_packet(&packet) {
548                        if !*this.ignore_decoding_errors {
549                            return Poll::Ready(Err(err.into()));
550                        }
551                    }
552                }
553                Some(Err(err)) => return Poll::Ready(Err(RtcpReceiverError::Other(err))),
554                None => return Poll::Ready(Ok(())),
555            }
556        }
557    }
558}
559
560/// RTCP receiver context.
561struct RtcpReceiverContext {
562    context: RtcpContextHandle,
563}
564
565impl RtcpReceiverContext {
566    /// Create a new RTCP receiver context.
567    fn new(context: RtcpContextHandle) -> Self {
568        Self { context }
569    }
570
571    /// Process a given incoming RTCP packet.
572    fn process_incoming_rtcp_packet(
573        &mut self,
574        packet: &CompoundRtcpPacket,
575    ) -> Result<(), InvalidInput> {
576        for packet in packet.iter() {
577            match packet.packet_type() {
578                RtcpPacketType::SR => {
579                    self.context
580                        .process_incoming_sender_report(&SenderReport::decode(packet)?);
581                }
582                RtcpPacketType::RR => {
583                    self.context
584                        .process_incoming_receiver_report(&ReceiverReport::decode(packet)?);
585                }
586                RtcpPacketType::BYE => {
587                    self.context
588                        .process_incoming_bye_packet(&ByePacket::decode(packet)?);
589                }
590                _ => (),
591            }
592        }
593
594        Ok(())
595    }
596}
597
/// Internal RTCP receiver error.
///
/// This is the error type of the `RtcpReceiver` future.
enum RtcpReceiverError<E> {
    /// An incoming RTCP packet could not be decoded.
    InvalidInput,
    /// The underlying RTCP stream returned an error.
    Other(E),
}

impl<E> From<InvalidInput> for RtcpReceiverError<E> {
    /// Map a decoding error to the `InvalidInput` variant; the original
    /// error value is discarded.
    fn from(_: InvalidInput) -> Self {
        Self::InvalidInput
    }
}
609
610/// Generate and send RTCP reports at regular intervals.
611///
612/// The returned future completes once there are no more RTCP reports to send
613/// and the sink has been flushed and closed.
614async fn send_rtcp_reports<T>(
615    sink: T,
616    context: RtcpContextHandle,
617    rtcp_report_interval: Duration,
618) -> Result<(), T::Error>
619where
620    T: Sink<CompoundRtcpPacket>,
621{
622    RtcpOutputStream::new(context, rtcp_report_interval)
623        .map(Ok)
624        .forward(sink)
625        .await
626}
627
628/// Stream of outgoing RTCP packets.
629struct RtcpOutputStream {
630    interval: Interval,
631    context: RtcpContextHandle,
632    output: VecDeque<CompoundRtcpPacket>,
633}
634
635impl RtcpOutputStream {
636    /// Create a new stream that will generate RTCP reports at regular
637    /// intervals.
638    fn new(context: RtcpContextHandle, rtcp_report_interval: Duration) -> Self {
639        let start = Instant::now() + (rtcp_report_interval / 2);
640
641        let mut interval = tokio::time::interval_at(start.into(), rtcp_report_interval);
642
643        interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
644
645        Self {
646            interval,
647            context,
648            output: VecDeque::new(),
649        }
650    }
651}
652
653impl Stream for RtcpOutputStream {
654    type Item = CompoundRtcpPacket;
655
656    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
657        loop {
658            if let Some(packet) = self.output.pop_front() {
659                return Poll::Ready(Some(packet));
660            }
661
662            let closed = self.context.poll_closed(cx);
663
664            if closed.is_pending() {
665                ready!(self.interval.poll_tick(cx));
666            }
667
668            let packets = self.context.create_rtcp_reports();
669
670            if packets.is_empty() {
671                return Poll::Ready(None);
672            }
673
674            self.output.extend(packets);
675        }
676    }
677}
678
679#[cfg(test)]
680mod tests {
681    use std::{
682        collections::VecDeque,
683        convert::Infallible,
684        pin::Pin,
685        sync::{Arc, Mutex},
686        task::{Context, Poll},
687        time::{Duration, Instant},
688    };
689
690    use futures::{channel::mpsc, Sink, SinkExt, Stream, StreamExt};
691
692    use super::{MuxedRtcpHandler, RtcpHandler, RtcpHandlerOptions, StreamSink};
693
694    use crate::{
695        rtcp::{RtcpContext, RtcpPacketType},
696        rtp::{IncomingRtpPacket, OrderedRtpPacket, RtpPacket},
697        transceiver::{DefaultRtpTransceiver, RtpTransceiver, RtpTransceiverOptions, SSRCMode},
698        utils::PacketMux,
699    };
700
701    fn make_rtp_packet(ssrc: u32, seq: u16, timestamp: u32) -> RtpPacket {
702        RtpPacket::new()
703            .with_ssrc(ssrc)
704            .with_sequence_number(seq)
705            .with_timestamp(timestamp)
706    }
707
    /// Helper stream-sink for testing.
    ///
    /// The stream side yields the prepared input items and the sink side
    /// records everything that is sent into it. The state is shared between
    /// all clones, so a test can keep one clone and inspect the recorded
    /// output later.
    #[derive(Clone)]
    struct RtcpTestChannel<I, O> {
        inner: Arc<Mutex<InnerRtcpTestChannel<I, O>>>,
    }

    impl<I, O> RtcpTestChannel<I, O> {
        /// Create a new RTCP test channel that will yield a given input.
        fn new<T>(input: T) -> Self
        where
            T: IntoIterator<Item = I>,
        {
            Self {
                inner: Arc::new(Mutex::new(InnerRtcpTestChannel::new(input))),
            }
        }
    }
725
726    impl<I, O> Stream for RtcpTestChannel<I, O> {
727        type Item = Result<I, Infallible>;
728
729        fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
730            let mut inner = self.inner.lock().unwrap();
731
732            if let Some(packet) = inner.input.pop_front() {
733                Poll::Ready(Some(Ok(packet)))
734            } else {
735                Poll::Pending
736            }
737        }
738    }
739
740    impl<I, O> Sink<O> for RtcpTestChannel<I, O> {
741        type Error = Infallible;
742
743        fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
744            Poll::Ready(Ok(()))
745        }
746
747        fn start_send(self: Pin<&mut Self>, packet: O) -> Result<(), Self::Error> {
748            let mut inner = self.inner.lock().unwrap();
749            inner.output.push(packet);
750
751            Ok(())
752        }
753
754        fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
755            Poll::Ready(Ok(()))
756        }
757
758        fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
759            self.inner.lock().unwrap().closed = true;
760
761            Poll::Ready(Ok(()))
762        }
763    }
764
765    /// Inner RTCP test channel.
766    struct InnerRtcpTestChannel<I, O> {
767        input: VecDeque<I>,
768        output: Vec<O>,
769        closed: bool,
770    }
771
772    impl<I, O> InnerRtcpTestChannel<I, O> {
773        /// Create a new inner RTCP test channel.
774        fn new<T>(input: T) -> Self
775        where
776            T: IntoIterator<Item = I>,
777        {
778            Self {
779                input: VecDeque::from_iter(input),
780                output: Vec::new(),
781                closed: false,
782            }
783        }
784    }
785
786    /// Test transceiver for muxed RTP-RTCP streams.
787    #[derive(Clone)]
788    struct MuxedTestTransceiver {
789        inner: Arc<Mutex<InnerMuxedTestTransceiver>>,
790    }
791
792    impl MuxedTestTransceiver {
793        /// Create a new muxed RTP-RTCP test transceiver.
794        fn new<T>(input: T, options: RtpTransceiverOptions) -> Self
795        where
796            T: IntoIterator<Item = PacketMux>,
797        {
798            let inner = InnerMuxedTestTransceiver::new(input, options);
799
800            Self {
801                inner: Arc::new(Mutex::new(inner)),
802            }
803        }
804    }
805
806    impl Stream for MuxedTestTransceiver {
807        type Item = Result<PacketMux, Infallible>;
808
809        fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
810            let mut inner = self.inner.lock().unwrap();
811
812            if let Some(packet) = inner.inner.input.pop_front() {
813                let packet = if let PacketMux::Rtp(packet) = packet {
814                    let index = packet.sequence_number() as u64;
815
816                    let now = Instant::now();
817                    let incoming = IncomingRtpPacket::new(packet, now);
818                    let ordered = OrderedRtpPacket::new(incoming, index);
819
820                    inner.context.process_incoming_rtp_packet(&ordered);
821                    inner.context.process_ordered_rtp_packet(&ordered);
822
823                    PacketMux::Rtp(ordered.into())
824                } else {
825                    packet
826                };
827
828                Poll::Ready(Some(Ok(packet)))
829            } else {
830                Poll::Ready(None)
831            }
832        }
833    }
834
835    impl Sink<PacketMux> for MuxedTestTransceiver {
836        type Error = Infallible;
837
838        fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
839            Poll::Ready(Ok(()))
840        }
841
842        fn start_send(self: Pin<&mut Self>, packet: PacketMux) -> Result<(), Self::Error> {
843            let mut inner = self.inner.lock().unwrap();
844
845            if let PacketMux::Rtp(packet) = &packet {
846                inner.context.process_outgoing_rtp_packet(packet);
847            }
848
849            inner.inner.output.push(packet);
850
851            Ok(())
852        }
853
854        fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
855            Poll::Ready(Ok(()))
856        }
857
858        fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
859            let mut inner = self.inner.lock().unwrap();
860
861            inner.inner.closed = true;
862
863            Poll::Ready(Ok(()))
864        }
865    }
866
867    impl RtpTransceiver for MuxedTestTransceiver {
868        fn rtcp_context(&self) -> crate::rtcp::RtcpContextHandle {
869            let inner = self.inner.lock().unwrap();
870
871            inner.context.handle()
872        }
873    }
874
875    /// Inner muxed RTP-RTCP test transceiver.
876    struct InnerMuxedTestTransceiver {
877        inner: InnerRtcpTestChannel<PacketMux, PacketMux>,
878        context: RtcpContext,
879    }
880
881    impl InnerMuxedTestTransceiver {
882        /// Create a new inner muxed RTP-RTCP test transceiver.
883        fn new<T>(input: T, options: RtpTransceiverOptions) -> Self
884        where
885            T: IntoIterator<Item = PacketMux>,
886        {
887            Self {
888                inner: InnerRtcpTestChannel::new(input),
889                context: RtcpContext::new(options),
890            }
891        }
892    }
893
    /// Check that the background tasks of `RtcpHandler` terminate after the
    /// handler is dropped and that the final RTCP report is sent and the RTCP
    /// sink closed before that.
    #[tokio::test]
    async fn test_handler_task_termination() {
        let (mut incoming_rtp_tx, incoming_rtp_rx) =
            mpsc::unbounded::<Result<RtpPacket, Infallible>>();
        let (outgoing_rtp_tx, outgoing_rtp_rx) = mpsc::unbounded::<RtpPacket>();

        let rtp = StreamSink::new(incoming_rtp_rx, outgoing_rtp_tx);

        let options = RtpTransceiverOptions::new()
            .with_default_clock_rate(1000)
            .with_primary_sender_ssrc(0)
            .with_input_ssrc_mode(SSRCMode::Any);

        let rtp = DefaultRtpTransceiver::<_, Infallible>::new(rtp, options);

        // no incoming RTCP packets; the channel only records the output
        let rtcp = RtcpTestChannel::new([]);

        let options = RtcpHandlerOptions::new()
            .with_ignore_decoding_errors(true)
            .with_rtcp_report_interval(Duration::from_millis(100));

        // the handler gets a clone of the channel; we keep the original to
        // be able to inspect the recorded output
        let handler = RtcpHandler::new(rtp, rtcp.clone(), options);

        // the handler is consumed by `collect`, so it gets dropped as soon
        // as the incoming RTP stream ends
        let handler = tokio::spawn(async move { handler.collect::<Vec<_>>().await });

        incoming_rtp_tx
            .send(Ok(make_rtp_packet(1, 1, 100)))
            .await
            .unwrap();
        incoming_rtp_tx.close().await.unwrap();

        let incoming_rtp_packets = handler.await.unwrap();

        std::mem::drop(outgoing_rtp_rx);

        assert_eq!(incoming_rtp_packets.len(), 1);

        let packet = incoming_rtp_packets.into_iter().next().unwrap().unwrap();

        assert_eq!(packet.ssrc(), 1);
        assert_eq!(packet.sequence_number(), 1);
        assert_eq!(packet.timestamp(), 100);

        // the background tasks hold the remaining clones of the channel; we
        // are the only owner once they are gone
        let wait_for_close = async {
            while Arc::strong_count(&rtcp.inner) > 1 {
                tokio::time::sleep(Duration::from_millis(100)).await;
            }
        };

        tokio::time::timeout(Duration::from_secs(1), wait_for_close)
            .await
            .expect("RTCP handler tasks have not terminated");

        // once we reach here, both RTCP handler tasks are already terminated
        let rtcp = Arc::try_unwrap(rtcp.inner)
            .ok()
            .unwrap()
            .into_inner()
            .ok()
            .unwrap();

        assert!(rtcp.closed);

        // exactly one compound RTCP packet is expected
        assert_eq!(rtcp.output.len(), 1);

        let report = &rtcp.output[0];

        assert_eq!(report.len(), 3);

        let rr = &report[0];
        let sdes = &report[1];
        let bye = &report[2];

        assert_eq!(rr.packet_type(), RtcpPacketType::RR);
        assert_eq!(sdes.packet_type(), RtcpPacketType::SDES);
        assert_eq!(bye.packet_type(), RtcpPacketType::BYE);
    }
971
    /// Check that the background tasks of `MuxedRtcpHandler` terminate after
    /// the handler is dropped and that the final RTCP report is sent and the
    /// muxed sink closed before that.
    #[tokio::test]
    async fn test_muxed_handler_task_termination() {
        let options = RtpTransceiverOptions::new()
            .with_default_clock_rate(1000)
            .with_primary_sender_ssrc(0)
            .with_input_ssrc_mode(SSRCMode::Any);

        let packet = PacketMux::Rtp(make_rtp_packet(1, 1, 100));

        // the muxed input consists of a single RTP packet; the stream ends
        // right after it
        let muxed = MuxedTestTransceiver::new([packet], options);

        let options = RtcpHandlerOptions::new()
            .with_ignore_decoding_errors(true)
            .with_rtcp_report_interval(Duration::from_millis(100));

        // the handler gets a clone of the transceiver; we keep the original
        // to be able to inspect the recorded output
        let handler = MuxedRtcpHandler::new(muxed.clone(), options);

        // the handler is consumed by `collect`, so it gets dropped as soon
        // as the incoming RTP stream ends
        let handler = tokio::spawn(async move { handler.collect::<Vec<_>>().await });

        let incoming_rtp_packets = handler.await.unwrap();

        assert_eq!(incoming_rtp_packets.len(), 1);

        let packet = incoming_rtp_packets.into_iter().next().unwrap().unwrap();

        assert_eq!(packet.ssrc(), 1);
        assert_eq!(packet.sequence_number(), 1);
        assert_eq!(packet.timestamp(), 100);

        // the background tasks hold the remaining clones of the transceiver;
        // we are the only owner once they are gone
        let wait_for_close = async {
            while Arc::strong_count(&muxed.inner) > 1 {
                tokio::time::sleep(Duration::from_millis(100)).await;
            }
        };

        tokio::time::timeout(Duration::from_secs(1), wait_for_close)
            .await
            .expect("RTCP handler tasks have not terminated");

        // once we reach here, all RTCP handler tasks are already terminated
        let muxed = Arc::try_unwrap(muxed.inner)
            .ok()
            .unwrap()
            .into_inner()
            .ok()
            .unwrap();

        assert!(muxed.inner.closed);

        // exactly one outgoing packet is expected and it must be an RTCP
        // packet
        assert_eq!(muxed.inner.output.len(), 1);

        let PacketMux::Rtcp(report) = &muxed.inner.output[0] else {
            panic!("expected RTCP packet");
        };

        assert_eq!(report.len(), 3);

        let rr = &report[0];
        let sdes = &report[1];
        let bye = &report[2];

        assert_eq!(rr.packet_type(), RtcpPacketType::RR);
        assert_eq!(sdes.packet_type(), RtcpPacketType::SDES);
        assert_eq!(bye.packet_type(), RtcpPacketType::BYE);
    }
1037}