use std::{
pin::Pin,
task::{Context, Poll},
};
use bytes::{Bytes, BytesMut};
use futures::{ready, Sink, Stream};
use crate::{rtp::RtpPacket, InvalidInput};
pin_project_lite::pin_project! {
pub struct RtpChannel<T> {
#[pin]
inner: T,
output_buffer: BytesMut,
ignore_decoding_errors: bool,
}
}
impl<T> RtpChannel<T> {
#[inline]
pub fn new(inner: T, ignore_decoding_errors: bool) -> Self {
Self {
inner,
output_buffer: BytesMut::new(),
ignore_decoding_errors,
}
}
}
impl<T, E> Stream for RtpChannel<T>
where
T: Stream<Item = Result<Bytes, E>>,
E: From<InvalidInput>,
{
type Item = Result<RtpPacket, E>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
loop {
let inner = this.inner.as_mut();
let res = match ready!(inner.poll_next(cx)) {
Some(Ok(frame)) => match RtpPacket::decode(frame) {
Ok(packet) => Some(Ok(packet)),
Err(_) if *this.ignore_decoding_errors => continue,
Err(err) => Some(Err(err.into())),
},
Some(Err(err)) => Some(Err(err)),
None => None,
};
return Poll::Ready(res);
}
}
}
impl<T> Sink<RtpPacket> for RtpChannel<T>
where
T: Sink<Bytes>,
{
type Error = T::Error;
#[inline]
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let this = self.project();
this.inner.poll_ready(cx)
}
fn start_send(self: Pin<&mut Self>, item: RtpPacket) -> Result<(), Self::Error> {
let this = self.project();
item.encode(this.output_buffer);
let frame = this.output_buffer.split();
this.inner.start_send(frame.freeze())?;
Ok(())
}
#[inline]
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let this = self.project();
this.inner.poll_flush(cx)
}
#[inline]
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let this = self.project();
this.inner.poll_close(cx)
}
}