msf_rtp/rtcp/
channel.rs

1use std::{
2    pin::Pin,
3    task::{Context, Poll},
4};
5
6use bytes::{Bytes, BytesMut};
7use futures::{ready, Sink, Stream};
8
9use crate::{rtcp::CompoundRtcpPacket, InvalidInput};
10
11pin_project_lite::pin_project! {
12    /// RTCP channel decoding/encoding RTCP packets from/to byte frames.
13    pub struct RtcpChannel<T> {
14        #[pin]
15        inner: T,
16        output_buffer: BytesMut,
17        ignore_decoding_errors: bool,
18    }
19}
20
21impl<T> RtcpChannel<T> {
22    /// Create a new RTCP channel from a given byte frame channel.
23    ///
24    /// # Arguments
25    /// * `inner` - the underlying byte frame channel
26    /// * `ignore_decoding_errors` - if true, decoding errors will be ignored
27    ///   and the invalid packets will be silently dropped (the underlying
28    ///   stream errors will still be propagated)
29    #[inline]
30    pub fn new(inner: T, ignore_decoding_errors: bool) -> Self {
31        Self {
32            inner,
33            output_buffer: BytesMut::new(),
34            ignore_decoding_errors,
35        }
36    }
37}
38
39impl<T, E> Stream for RtcpChannel<T>
40where
41    T: Stream<Item = Result<Bytes, E>>,
42    E: From<InvalidInput>,
43{
44    type Item = Result<CompoundRtcpPacket, E>;
45
46    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
47        let mut this = self.project();
48
49        loop {
50            let inner = this.inner.as_mut();
51
52            let res = match ready!(inner.poll_next(cx)) {
53                Some(Ok(frame)) => match CompoundRtcpPacket::decode(frame) {
54                    Ok(packet) => Some(Ok(packet)),
55                    Err(_) if *this.ignore_decoding_errors => continue,
56                    Err(err) => Some(Err(err.into())),
57                },
58                Some(Err(err)) => Some(Err(err)),
59                None => None,
60            };
61
62            return Poll::Ready(res);
63        }
64    }
65}
66
67impl<T> Sink<CompoundRtcpPacket> for RtcpChannel<T>
68where
69    T: Sink<Bytes>,
70{
71    type Error = T::Error;
72
73    #[inline]
74    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
75        let this = self.project();
76
77        this.inner.poll_ready(cx)
78    }
79
80    fn start_send(self: Pin<&mut Self>, item: CompoundRtcpPacket) -> Result<(), Self::Error> {
81        let this = self.project();
82
83        item.encode(this.output_buffer);
84
85        let frame = this.output_buffer.split();
86
87        this.inner.start_send(frame.freeze())?;
88
89        Ok(())
90    }
91
92    #[inline]
93    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
94        let this = self.project();
95
96        this.inner.poll_flush(cx)
97    }
98
99    #[inline]
100    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
101        let this = self.project();
102
103        this.inner.poll_close(cx)
104    }
105}