1use std::{
2 pin::Pin,
3 task::{Context, Poll},
4};
5
6use bytes::{Bytes, BytesMut};
7use futures::{ready, Sink, Stream};
8
9use crate::{rtp::RtpPacket, InvalidInput};
10
11pin_project_lite::pin_project! {
12 pub struct RtpChannel<T> {
17 #[pin]
18 inner: T,
19 output_buffer: BytesMut,
20 ignore_decoding_errors: bool,
21 }
22}
23
24impl<T> RtpChannel<T> {
25 #[inline]
33 pub fn new(inner: T, ignore_decoding_errors: bool) -> Self {
34 Self {
35 inner,
36 output_buffer: BytesMut::new(),
37 ignore_decoding_errors,
38 }
39 }
40}
41
42impl<T, E> Stream for RtpChannel<T>
43where
44 T: Stream<Item = Result<Bytes, E>>,
45 E: From<InvalidInput>,
46{
47 type Item = Result<RtpPacket, E>;
48
49 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
50 let mut this = self.project();
51
52 loop {
53 let inner = this.inner.as_mut();
54
55 let res = match ready!(inner.poll_next(cx)) {
56 Some(Ok(frame)) => match RtpPacket::decode(frame) {
57 Ok(packet) => Some(Ok(packet)),
58 Err(_) if *this.ignore_decoding_errors => continue,
59 Err(err) => Some(Err(err.into())),
60 },
61 Some(Err(err)) => Some(Err(err)),
62 None => None,
63 };
64
65 return Poll::Ready(res);
66 }
67 }
68}
69
70impl<T> Sink<RtpPacket> for RtpChannel<T>
71where
72 T: Sink<Bytes>,
73{
74 type Error = T::Error;
75
76 #[inline]
77 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
78 let this = self.project();
79
80 this.inner.poll_ready(cx)
81 }
82
83 fn start_send(self: Pin<&mut Self>, item: RtpPacket) -> Result<(), Self::Error> {
84 let this = self.project();
85
86 item.encode(this.output_buffer);
87
88 let frame = this.output_buffer.split();
89
90 this.inner.start_send(frame.freeze())?;
91
92 Ok(())
93 }
94
95 #[inline]
96 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
97 let this = self.project();
98
99 this.inner.poll_flush(cx)
100 }
101
102 #[inline]
103 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
104 let this = self.project();
105
106 this.inner.poll_close(cx)
107 }
108}