use std::{
collections::VecDeque,
future::Future,
pin::Pin,
task::{Context, Poll},
time::{Duration, Instant},
};
use futures::{channel::mpsc, ready, FutureExt, Sink, SinkExt, Stream, StreamExt};
use tokio::{
task::JoinHandle,
time::{Interval, MissedTickBehavior},
};
use crate::{
rtcp::{ByePacket, ReceiverReport, RtcpContextHandle, RtcpPacketType, SenderReport},
transceiver::RtpTransceiver,
utils::PacketMux,
CompoundRtcpPacket, InvalidInput, RtpPacket,
};
#[derive(Copy, Clone)]
pub struct RtcpHandlerOptions {
rtcp_report_interval: Duration,
ignore_decoding_errors: bool,
}
impl RtcpHandlerOptions {
#[inline]
pub const fn new() -> Self {
Self {
rtcp_report_interval: Duration::from_secs(5),
ignore_decoding_errors: true,
}
}
#[inline]
pub const fn rtcp_report_interval(&self) -> Duration {
self.rtcp_report_interval
}
#[inline]
pub const fn with_rtcp_report_interval(mut self, interval: Duration) -> Self {
self.rtcp_report_interval = interval;
self
}
#[inline]
pub const fn ignore_decoding_errors(&self) -> bool {
self.ignore_decoding_errors
}
#[inline]
pub const fn with_ignore_decoding_errors(mut self, ignore: bool) -> Self {
self.ignore_decoding_errors = ignore;
self
}
}
impl Default for RtcpHandlerOptions {
#[inline]
fn default() -> Self {
Self::new()
}
}
pin_project_lite::pin_project! {
pub struct RtcpHandler<T> {
#[pin]
stream: T,
context: RtcpHandlerContext,
}
}
impl<T> RtcpHandler<T> {
pub fn new<U, E>(rtp: T, rtcp: U, options: RtcpHandlerOptions) -> Self
where
T: RtpTransceiver,
U: Send + 'static,
U: Stream<Item = Result<CompoundRtcpPacket, E>>,
U: Sink<CompoundRtcpPacket>,
{
let rtcp_context = rtp.rtcp_context();
Self::new_with_rtcp_context(rtp, rtcp, rtcp_context, options)
}
pub fn new_with_rtcp_context<U, E>(
rtp: T,
rtcp: U,
rtcp_context: RtcpContextHandle,
options: RtcpHandlerOptions,
) -> Self
where
U: Send + 'static,
U: Stream<Item = Result<CompoundRtcpPacket, E>>,
U: Sink<CompoundRtcpPacket>,
{
let (rtcp_tx, rtcp_rx) = rtcp.split();
let sender = send_rtcp_reports(
rtcp_tx,
rtcp_context.clone(),
options.rtcp_report_interval(),
);
tokio::spawn(async move {
let _ = sender.await;
});
let receiver = RtcpReceiver::new(
rtcp_rx,
rtcp_context.clone(),
options.ignore_decoding_errors(),
);
let receiver = tokio::spawn(async move {
let _ = receiver.await;
});
Self {
stream: rtp,
context: RtcpHandlerContext {
context: rtcp_context,
receiver,
},
}
}
}
impl<T, P, E> Stream for RtcpHandler<T>
where
T: Stream<Item = Result<P, E>>,
{
type Item = Result<P, E>;
#[inline]
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
this.stream.poll_next(cx)
}
}
impl<T, P, E> Sink<P> for RtcpHandler<T>
where
T: Sink<P, Error = E>,
{
type Error = E;
#[inline]
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let this = self.project();
this.stream.poll_ready(cx)
}
#[inline]
fn start_send(self: Pin<&mut Self>, packet: P) -> Result<(), Self::Error> {
let this = self.project();
this.stream.start_send(packet)
}
#[inline]
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let this = self.project();
this.stream.poll_flush(cx)
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let this = self.project();
ready!(this.stream.poll_close(cx))?;
this.context.context.close();
Poll::Ready(Ok(()))
}
}
struct RtcpHandlerContext {
context: RtcpContextHandle,
receiver: JoinHandle<()>,
}
impl Drop for RtcpHandlerContext {
fn drop(&mut self) {
self.context.close();
self.receiver.abort();
}
}
type DemuxingRtpStream<P, E> = mpsc::Receiver<Result<P, E>>;
type MuxingRtpSink = PacketMuxer<mpsc::Sender<PacketMux>>;
type RtpComponent<P, E> = StreamSink<DemuxingRtpStream<P, E>, MuxingRtpSink>;
pub struct MuxedRtcpHandler<P, E> {
inner: RtcpHandler<RtpComponent<P, E>>,
reader: JoinHandle<()>,
writer: JoinHandle<Result<(), E>>,
sink_error: bool,
}
impl<P, E> MuxedRtcpHandler<P, E> {
pub fn new<T>(stream: T, options: RtcpHandlerOptions) -> Self
where
T: Send + 'static,
T: Stream<Item = Result<PacketMux<P>, E>>,
T: Sink<PacketMux, Error = E>,
T: RtpTransceiver,
P: Send + 'static,
E: Send + 'static,
{
let rtcp_context = stream.rtcp_context();
let (muxed_tx, mut muxed_rx) = stream.split();
let (mut input_rtp_tx, input_rtp_rx) = mpsc::channel::<Result<_, E>>(4);
let (output_rtp_tx, output_rtp_rx) = mpsc::channel(4);
let (mut input_rtcp_tx, input_rtcp_rx) = mpsc::channel::<Result<_, E>>(4);
let (output_rtcp_tx, output_rtcp_rx) = mpsc::channel(4);
let output_rtp_tx = PacketMuxer::new(output_rtp_tx);
let output_rtcp_tx = PacketMuxer::new(output_rtcp_tx);
let rtp = StreamSink::new(input_rtp_rx, output_rtp_tx);
let rtcp = StreamSink::new(input_rtcp_rx, output_rtcp_tx);
let reader = tokio::spawn(async move {
let mut run = true;
while run {
let next = muxed_rx.next().await;
run = matches!(next, Some(Ok(_)));
let _ = match next {
Some(Ok(PacketMux::Rtp(packet))) => input_rtp_tx.send(Ok(packet)).await,
Some(Ok(PacketMux::Rtcp(packet))) => input_rtcp_tx.send(Ok(packet)).await,
Some(Err(err)) => input_rtp_tx.send(Err(err)).await,
_ => Ok(()),
};
}
});
let writer = tokio::spawn(async move {
futures::stream::select(output_rtp_rx, output_rtcp_rx)
.map(Ok)
.forward(muxed_tx)
.await
});
Self {
inner: RtcpHandler::new_with_rtcp_context(rtp, rtcp, rtcp_context, options),
reader,
writer,
sink_error: false,
}
}
fn poll_writer_result(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), E>> {
match ready!(self.writer.poll_unpin(cx)) {
Ok(Ok(_)) => Poll::Ready(Ok(())),
Ok(Err(err)) => Poll::Ready(Err(err)),
Err(_) => Poll::Ready(Ok(())),
}
}
}
impl<P, E> Drop for MuxedRtcpHandler<P, E> {
#[inline]
fn drop(&mut self) {
self.reader.abort();
}
}
impl<P, E> Stream for MuxedRtcpHandler<P, E> {
type Item = Result<P, E>;
#[inline]
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.inner.poll_next_unpin(cx)
}
}
impl<P, E> Sink<RtpPacket> for MuxedRtcpHandler<P, E> {
type Error = E;
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
loop {
if self.sink_error {
return self.poll_writer_result(cx);
}
let res = ready!(SinkExt::<RtpPacket>::poll_ready_unpin(&mut self.inner, cx));
if res.is_ok() {
return Poll::Ready(Ok(()));
} else {
self.sink_error = true;
}
}
}
fn start_send(mut self: Pin<&mut Self>, item: RtpPacket) -> Result<(), Self::Error> {
let res = SinkExt::<RtpPacket>::start_send_unpin(&mut self.inner, item);
if res.is_err() {
self.sink_error = true;
}
Ok(())
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
loop {
if self.sink_error {
return self.poll_writer_result(cx);
}
let res = ready!(SinkExt::<RtpPacket>::poll_flush_unpin(&mut self.inner, cx));
if res.is_ok() {
return Poll::Ready(Ok(()));
} else {
self.sink_error = true;
}
}
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
loop {
if self.sink_error {
return self.poll_writer_result(cx);
}
let res = ready!(SinkExt::<RtpPacket>::poll_close_unpin(&mut self.inner, cx));
if res.is_ok() {
return Poll::Ready(Ok(()));
} else {
self.sink_error = true;
}
}
}
}
pin_project_lite::pin_project! {
struct StreamSink<T, U> {
#[pin]
stream: T,
#[pin]
sink: U,
}
}
impl<T, U> StreamSink<T, U> {
fn new(stream: T, sink: U) -> Self {
Self { stream, sink }
}
}
impl<T, U> Stream for StreamSink<T, U>
where
T: Stream,
{
type Item = T::Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
this.stream.poll_next(cx)
}
}
impl<T, U, I> Sink<I> for StreamSink<T, U>
where
U: Sink<I>,
{
type Error = U::Error;
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let this = self.project();
this.sink.poll_ready(cx)
}
fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
let this = self.project();
this.sink.start_send(item)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let this = self.project();
this.sink.poll_flush(cx)
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let this = self.project();
this.sink.poll_close(cx)
}
}
pin_project_lite::pin_project! {
struct PacketMuxer<T> {
#[pin]
inner: T,
}
}
impl<T> PacketMuxer<T> {
fn new(sink: T) -> Self {
Self { inner: sink }
}
}
impl<T, I> Sink<I> for PacketMuxer<T>
where
T: Sink<PacketMux>,
I: Into<PacketMux>,
{
type Error = T::Error;
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let this = self.project();
this.inner.poll_ready(cx)
}
fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
let this = self.project();
this.inner.start_send(item.into())
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let this = self.project();
this.inner.poll_flush(cx)
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let this = self.project();
this.inner.poll_close(cx)
}
}
pin_project_lite::pin_project! {
struct RtcpReceiver<T> {
#[pin]
stream: T,
context: RtcpReceiverContext,
ignore_decoding_errors: bool,
}
}
impl<T> RtcpReceiver<T> {
fn new(stream: T, context: RtcpContextHandle, ignore_decoding_errors: bool) -> Self {
Self {
stream,
context: RtcpReceiverContext::new(context),
ignore_decoding_errors,
}
}
}
impl<T, E> Future for RtcpReceiver<T>
where
T: Stream<Item = Result<CompoundRtcpPacket, E>>,
{
type Output = Result<(), RtcpReceiverError<E>>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
loop {
let stream = this.stream.as_mut();
match ready!(stream.poll_next(cx)) {
Some(Ok(packet)) => {
if let Err(err) = this.context.process_incoming_rtcp_packet(&packet) {
if !*this.ignore_decoding_errors {
return Poll::Ready(Err(err.into()));
}
}
}
Some(Err(err)) => return Poll::Ready(Err(RtcpReceiverError::Other(err))),
None => return Poll::Ready(Ok(())),
}
}
}
}
struct RtcpReceiverContext {
context: RtcpContextHandle,
}
impl RtcpReceiverContext {
fn new(context: RtcpContextHandle) -> Self {
Self { context }
}
fn process_incoming_rtcp_packet(
&mut self,
packet: &CompoundRtcpPacket,
) -> Result<(), InvalidInput> {
for packet in packet.iter() {
match packet.packet_type() {
RtcpPacketType::SR => {
self.context
.process_incoming_sender_report(&SenderReport::decode(packet)?);
}
RtcpPacketType::RR => {
self.context
.process_incoming_receiver_report(&ReceiverReport::decode(packet)?);
}
RtcpPacketType::BYE => {
self.context
.process_incoming_bye_packet(&ByePacket::decode(packet)?);
}
_ => (),
}
}
Ok(())
}
}
enum RtcpReceiverError<E> {
InvalidInput,
Other(E),
}
impl<E> From<InvalidInput> for RtcpReceiverError<E> {
fn from(_: InvalidInput) -> Self {
Self::InvalidInput
}
}
async fn send_rtcp_reports<T>(
sink: T,
context: RtcpContextHandle,
rtcp_report_interval: Duration,
) -> Result<(), T::Error>
where
T: Sink<CompoundRtcpPacket>,
{
RtcpOutputStream::new(context, rtcp_report_interval)
.map(Ok)
.forward(sink)
.await
}
struct RtcpOutputStream {
interval: Interval,
context: RtcpContextHandle,
output: VecDeque<CompoundRtcpPacket>,
}
impl RtcpOutputStream {
fn new(context: RtcpContextHandle, rtcp_report_interval: Duration) -> Self {
let start = Instant::now() + (rtcp_report_interval / 2);
let mut interval = tokio::time::interval_at(start.into(), rtcp_report_interval);
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
Self {
interval,
context,
output: VecDeque::new(),
}
}
}
impl Stream for RtcpOutputStream {
type Item = CompoundRtcpPacket;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
if let Some(packet) = self.output.pop_front() {
return Poll::Ready(Some(packet));
}
let closed = self.context.poll_closed(cx);
if closed.is_pending() {
ready!(self.interval.poll_tick(cx));
}
let packets = self.context.create_rtcp_reports();
if packets.is_empty() {
return Poll::Ready(None);
}
self.output.extend(packets);
}
}
}
#[cfg(test)]
mod tests {
use std::{
collections::VecDeque,
convert::Infallible,
pin::Pin,
sync::{Arc, Mutex},
task::{Context, Poll},
time::{Duration, Instant},
};
use futures::{channel::mpsc, Sink, SinkExt, Stream, StreamExt};
use super::{MuxedRtcpHandler, RtcpHandler, RtcpHandlerOptions, StreamSink};
use crate::{
rtcp::{RtcpContext, RtcpPacketType},
rtp::{IncomingRtpPacket, OrderedRtpPacket, RtpPacket},
transceiver::{DefaultRtpTransceiver, RtpTransceiver, RtpTransceiverOptions, SSRCMode},
utils::PacketMux,
};
fn make_rtp_packet(ssrc: u32, seq: u16, timestamp: u32) -> RtpPacket {
RtpPacket::new()
.with_ssrc(ssrc)
.with_sequence_number(seq)
.with_timestamp(timestamp)
}
#[derive(Clone)]
struct RtcpTestChannel<I, O> {
inner: Arc<Mutex<InnerRtcpTestChannel<I, O>>>,
}
impl<I, O> RtcpTestChannel<I, O> {
fn new<T>(input: T) -> Self
where
T: IntoIterator<Item = I>,
{
Self {
inner: Arc::new(Mutex::new(InnerRtcpTestChannel::new(input))),
}
}
}
impl<I, O> Stream for RtcpTestChannel<I, O> {
type Item = Result<I, Infallible>;
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut inner = self.inner.lock().unwrap();
if let Some(packet) = inner.input.pop_front() {
Poll::Ready(Some(Ok(packet)))
} else {
Poll::Pending
}
}
}
impl<I, O> Sink<O> for RtcpTestChannel<I, O> {
type Error = Infallible;
fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn start_send(self: Pin<&mut Self>, packet: O) -> Result<(), Self::Error> {
let mut inner = self.inner.lock().unwrap();
inner.output.push(packet);
Ok(())
}
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.lock().unwrap().closed = true;
Poll::Ready(Ok(()))
}
}
struct InnerRtcpTestChannel<I, O> {
input: VecDeque<I>,
output: Vec<O>,
closed: bool,
}
impl<I, O> InnerRtcpTestChannel<I, O> {
fn new<T>(input: T) -> Self
where
T: IntoIterator<Item = I>,
{
Self {
input: VecDeque::from_iter(input),
output: Vec::new(),
closed: false,
}
}
}
#[derive(Clone)]
struct MuxedTestTransceiver {
inner: Arc<Mutex<InnerMuxedTestTransceiver>>,
}
impl MuxedTestTransceiver {
fn new<T>(input: T, options: RtpTransceiverOptions) -> Self
where
T: IntoIterator<Item = PacketMux>,
{
let inner = InnerMuxedTestTransceiver::new(input, options);
Self {
inner: Arc::new(Mutex::new(inner)),
}
}
}
impl Stream for MuxedTestTransceiver {
type Item = Result<PacketMux, Infallible>;
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut inner = self.inner.lock().unwrap();
if let Some(packet) = inner.inner.input.pop_front() {
let packet = if let PacketMux::Rtp(packet) = packet {
let index = packet.sequence_number() as u64;
let now = Instant::now();
let incoming = IncomingRtpPacket::new(packet, now);
let ordered = OrderedRtpPacket::new(incoming, index);
inner.context.process_incoming_rtp_packet(&ordered);
inner.context.process_ordered_rtp_packet(&ordered);
PacketMux::Rtp(ordered.into())
} else {
packet
};
Poll::Ready(Some(Ok(packet)))
} else {
Poll::Ready(None)
}
}
}
impl Sink<PacketMux> for MuxedTestTransceiver {
type Error = Infallible;
fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn start_send(self: Pin<&mut Self>, packet: PacketMux) -> Result<(), Self::Error> {
let mut inner = self.inner.lock().unwrap();
if let PacketMux::Rtp(packet) = &packet {
inner.context.process_outgoing_rtp_packet(packet);
}
inner.inner.output.push(packet);
Ok(())
}
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let mut inner = self.inner.lock().unwrap();
inner.inner.closed = true;
Poll::Ready(Ok(()))
}
}
impl RtpTransceiver for MuxedTestTransceiver {
fn rtcp_context(&self) -> crate::rtcp::RtcpContextHandle {
let inner = self.inner.lock().unwrap();
inner.context.handle()
}
}
struct InnerMuxedTestTransceiver {
inner: InnerRtcpTestChannel<PacketMux, PacketMux>,
context: RtcpContext,
}
impl InnerMuxedTestTransceiver {
fn new<T>(input: T, options: RtpTransceiverOptions) -> Self
where
T: IntoIterator<Item = PacketMux>,
{
Self {
inner: InnerRtcpTestChannel::new(input),
context: RtcpContext::new(options),
}
}
}
#[tokio::test]
async fn test_handler_task_termination() {
let (mut incoming_rtp_tx, incoming_rtp_rx) =
mpsc::unbounded::<Result<RtpPacket, Infallible>>();
let (outgoing_rtp_tx, outgoing_rtp_rx) = mpsc::unbounded::<RtpPacket>();
let rtp = StreamSink::new(incoming_rtp_rx, outgoing_rtp_tx);
let options = RtpTransceiverOptions::new()
.with_default_clock_rate(1000)
.with_primary_sender_ssrc(0)
.with_input_ssrc_mode(SSRCMode::Any);
let rtp = DefaultRtpTransceiver::<_, Infallible>::new(rtp, options);
let rtcp = RtcpTestChannel::new([]);
let options = RtcpHandlerOptions::new()
.with_ignore_decoding_errors(true)
.with_rtcp_report_interval(Duration::from_millis(100));
let handler = RtcpHandler::new(rtp, rtcp.clone(), options);
let handler = tokio::spawn(async move { handler.collect::<Vec<_>>().await });
incoming_rtp_tx
.send(Ok(make_rtp_packet(1, 1, 100)))
.await
.unwrap();
incoming_rtp_tx.close().await.unwrap();
let incoming_rtp_packets = handler.await.unwrap();
std::mem::drop(outgoing_rtp_rx);
assert_eq!(incoming_rtp_packets.len(), 1);
let packet = incoming_rtp_packets.into_iter().next().unwrap().unwrap();
assert_eq!(packet.ssrc(), 1);
assert_eq!(packet.sequence_number(), 1);
assert_eq!(packet.timestamp(), 100);
let wait_for_close = async {
while Arc::strong_count(&rtcp.inner) > 1 {
tokio::time::sleep(Duration::from_millis(100)).await;
}
};
tokio::time::timeout(Duration::from_secs(1), wait_for_close)
.await
.expect("RTCP handler tasks have not terminated");
let rtcp = Arc::try_unwrap(rtcp.inner)
.ok()
.unwrap()
.into_inner()
.ok()
.unwrap();
assert!(rtcp.closed);
assert_eq!(rtcp.output.len(), 1);
let report = &rtcp.output[0];
assert_eq!(report.len(), 3);
let rr = &report[0];
let sdes = &report[1];
let bye = &report[2];
assert_eq!(rr.packet_type(), RtcpPacketType::RR);
assert_eq!(sdes.packet_type(), RtcpPacketType::SDES);
assert_eq!(bye.packet_type(), RtcpPacketType::BYE);
}
#[tokio::test]
async fn test_muxed_handler_task_termination() {
let options = RtpTransceiverOptions::new()
.with_default_clock_rate(1000)
.with_primary_sender_ssrc(0)
.with_input_ssrc_mode(SSRCMode::Any);
let packet = PacketMux::Rtp(make_rtp_packet(1, 1, 100));
let muxed = MuxedTestTransceiver::new([packet], options);
let options = RtcpHandlerOptions::new()
.with_ignore_decoding_errors(true)
.with_rtcp_report_interval(Duration::from_millis(100));
let handler = MuxedRtcpHandler::new(muxed.clone(), options);
let handler = tokio::spawn(async move { handler.collect::<Vec<_>>().await });
let incoming_rtp_packets = handler.await.unwrap();
assert_eq!(incoming_rtp_packets.len(), 1);
let packet = incoming_rtp_packets.into_iter().next().unwrap().unwrap();
assert_eq!(packet.ssrc(), 1);
assert_eq!(packet.sequence_number(), 1);
assert_eq!(packet.timestamp(), 100);
let wait_for_close = async {
while Arc::strong_count(&muxed.inner) > 1 {
tokio::time::sleep(Duration::from_millis(100)).await;
}
};
tokio::time::timeout(Duration::from_secs(1), wait_for_close)
.await
.expect("RTCP handler tasks have not terminated");
let muxed = Arc::try_unwrap(muxed.inner)
.ok()
.unwrap()
.into_inner()
.ok()
.unwrap();
assert!(muxed.inner.closed);
assert_eq!(muxed.inner.output.len(), 1);
let PacketMux::Rtcp(report) = &muxed.inner.output[0] else {
panic!("expected RTCP packet");
};
assert_eq!(report.len(), 3);
let rr = &report[0];
let sdes = &report[1];
let bye = &report[2];
assert_eq!(rr.packet_type(), RtcpPacketType::RR);
assert_eq!(sdes.packet_type(), RtcpPacketType::SDES);
assert_eq!(bye.packet_type(), RtcpPacketType::BYE);
}
}