1use std::{
2 pin::Pin,
3 task::{Context, Poll},
4};
5
6use bytes::{Bytes, BytesMut};
7use futures::{ready, Sink, Stream};
8
9use crate::{rtcp::CompoundRtcpPacket, InvalidInput};
10
11pin_project_lite::pin_project! {
12 pub struct RtcpChannel<T> {
14 #[pin]
15 inner: T,
16 output_buffer: BytesMut,
17 ignore_decoding_errors: bool,
18 }
19}
20
21impl<T> RtcpChannel<T> {
22 #[inline]
30 pub fn new(inner: T, ignore_decoding_errors: bool) -> Self {
31 Self {
32 inner,
33 output_buffer: BytesMut::new(),
34 ignore_decoding_errors,
35 }
36 }
37}
38
39impl<T, E> Stream for RtcpChannel<T>
40where
41 T: Stream<Item = Result<Bytes, E>>,
42 E: From<InvalidInput>,
43{
44 type Item = Result<CompoundRtcpPacket, E>;
45
46 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
47 let mut this = self.project();
48
49 loop {
50 let inner = this.inner.as_mut();
51
52 let res = match ready!(inner.poll_next(cx)) {
53 Some(Ok(frame)) => match CompoundRtcpPacket::decode(frame) {
54 Ok(packet) => Some(Ok(packet)),
55 Err(_) if *this.ignore_decoding_errors => continue,
56 Err(err) => Some(Err(err.into())),
57 },
58 Some(Err(err)) => Some(Err(err)),
59 None => None,
60 };
61
62 return Poll::Ready(res);
63 }
64 }
65}
66
67impl<T> Sink<CompoundRtcpPacket> for RtcpChannel<T>
68where
69 T: Sink<Bytes>,
70{
71 type Error = T::Error;
72
73 #[inline]
74 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
75 let this = self.project();
76
77 this.inner.poll_ready(cx)
78 }
79
80 fn start_send(self: Pin<&mut Self>, item: CompoundRtcpPacket) -> Result<(), Self::Error> {
81 let this = self.project();
82
83 item.encode(this.output_buffer);
84
85 let frame = this.output_buffer.split();
86
87 this.inner.start_send(frame.freeze())?;
88
89 Ok(())
90 }
91
92 #[inline]
93 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
94 let this = self.project();
95
96 this.inner.poll_flush(cx)
97 }
98
99 #[inline]
100 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
101 let this = self.project();
102
103 this.inner.poll_close(cx)
104 }
105}