msf_rtp/
channel.rs

1use std::{
2    pin::Pin,
3    task::{Context, Poll},
4};
5
6use bytes::{Bytes, BytesMut};
7use futures::{ready, Sink, Stream};
8
9use crate::{rtp::RtpPacket, InvalidInput};
10
11pin_project_lite::pin_project! {
12    /// RTP channel decoding/encoding RTP packets from/to byte frames.
13    ///
14    /// The channel does not perform any reordering or additional validation of
15    /// the incoming RTP packets.
16    pub struct RtpChannel<T> {
17        #[pin]
18        inner: T,
19        output_buffer: BytesMut,
20        ignore_decoding_errors: bool,
21    }
22}
23
24impl<T> RtpChannel<T> {
25    /// Create a new RTP channel from a given byte frame channel.
26    ///
27    /// # Arguments
28    /// * `inner` - the underlying byte frame channel
29    /// * `ignore_decoding_errors` - if true, decoding errors will be ignored
30    ///   and the invalid packets will be silently dropped (the underlying
31    ///   stream errors will still be propagated)
32    #[inline]
33    pub fn new(inner: T, ignore_decoding_errors: bool) -> Self {
34        Self {
35            inner,
36            output_buffer: BytesMut::new(),
37            ignore_decoding_errors,
38        }
39    }
40}
41
42impl<T, E> Stream for RtpChannel<T>
43where
44    T: Stream<Item = Result<Bytes, E>>,
45    E: From<InvalidInput>,
46{
47    type Item = Result<RtpPacket, E>;
48
49    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
50        let mut this = self.project();
51
52        loop {
53            let inner = this.inner.as_mut();
54
55            let res = match ready!(inner.poll_next(cx)) {
56                Some(Ok(frame)) => match RtpPacket::decode(frame) {
57                    Ok(packet) => Some(Ok(packet)),
58                    Err(_) if *this.ignore_decoding_errors => continue,
59                    Err(err) => Some(Err(err.into())),
60                },
61                Some(Err(err)) => Some(Err(err)),
62                None => None,
63            };
64
65            return Poll::Ready(res);
66        }
67    }
68}
69
70impl<T> Sink<RtpPacket> for RtpChannel<T>
71where
72    T: Sink<Bytes>,
73{
74    type Error = T::Error;
75
76    #[inline]
77    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
78        let this = self.project();
79
80        this.inner.poll_ready(cx)
81    }
82
83    fn start_send(self: Pin<&mut Self>, item: RtpPacket) -> Result<(), Self::Error> {
84        let this = self.project();
85
86        item.encode(this.output_buffer);
87
88        let frame = this.output_buffer.split();
89
90        this.inner.start_send(frame.freeze())?;
91
92        Ok(())
93    }
94
95    #[inline]
96    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
97        let this = self.project();
98
99        this.inner.poll_flush(cx)
100    }
101
102    #[inline]
103    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
104        let this = self.project();
105
106        this.inner.poll_close(cx)
107    }
108}