use std::{
marker::PhantomData,
pin::Pin,
task::{Context, Poll},
};
use futures::{ready, Sink, SinkExt};
use crate::RtpPacket;
/// Turns frames into RTP packets.
///
/// Frames are handed in with [`push`](Packetizer::push) and the resulting
/// packets are pulled out one at a time with [`take`](Packetizer::take).
pub trait Packetizer {
    /// Input accepted by [`push`](Packetizer::push).
    type Frame;
    /// Error returned by all fallible operations of the packetizer.
    type Error;

    /// Feeds one frame into the packetizer.
    ///
    /// # Errors
    /// Returns `Self::Error` if the implementation rejects the frame.
    fn push(&mut self, frame: Self::Frame) -> Result<(), Self::Error>;

    /// Flushes the packetizer.
    ///
    /// NOTE(review): the exact contract is not visible in this file;
    /// presumably this makes any internally buffered data available through
    /// [`take`](Packetizer::take) -- confirm against the implementors.
    ///
    /// # Errors
    /// Returns `Self::Error` if the implementation fails to flush.
    fn flush(&mut self) -> Result<(), Self::Error>;

    /// Removes and returns the next available RTP packet, or `Ok(None)` when
    /// there is currently nothing to take.
    ///
    /// # Errors
    /// Returns `Self::Error` if the implementation fails to produce a packet.
    fn take(&mut self) -> Result<Option<RtpPacket>, Self::Error>;

    /// Wraps this packetizer so that it accepts values of type `T`, converting
    /// each of them with `f` into `Self::Frame` before it is pushed.
    #[inline]
    fn with_frame<F, T>(self, f: F) -> WithFrame<Self, F, T>
    where
        F: FnMut(T) -> Self::Frame,
        Self: Sized,
    {
        WithFrame {
            packetizer: self,
            closure: f,
            // `T` only appears in the closure signature, hence the marker.
            _frame: PhantomData,
        }
    }

    /// Wraps this packetizer so that every error it returns is converted
    /// with `f`.
    #[inline]
    fn map_err<F, E>(self, f: F) -> MapErr<Self, F>
    where
        F: FnMut(Self::Error) -> E,
        Self: Sized,
    {
        MapErr {
            packetizer: self,
            closure: f,
        }
    }
}
/// Packetizer adapter created by `Packetizer::map_err`.
///
/// Forwards every operation to the wrapped packetizer and passes any error it
/// returns through the stored closure.
pub struct MapErr<P, F> {
// The wrapped packetizer that does the actual work.
packetizer: P,
// Error conversion applied to every `Err` returned by `packetizer`.
closure: F,
}
/// Delegates to the wrapped packetizer and maps its errors with the closure.
impl<P, F, E> Packetizer for MapErr<P, F>
where
    P: Packetizer,
    F: FnMut(P::Error) -> E,
{
    type Frame = P::Frame;
    type Error = E;

    /// Pushes `frame` into the inner packetizer, converting a failure.
    #[inline]
    fn push(&mut self, frame: Self::Frame) -> Result<(), Self::Error> {
        let Self { packetizer, closure } = self;

        packetizer.push(frame).map_err(closure)
    }

    /// Flushes the inner packetizer, converting a failure.
    #[inline]
    fn flush(&mut self) -> Result<(), Self::Error> {
        let Self { packetizer, closure } = self;

        packetizer.flush().map_err(closure)
    }

    /// Takes the next packet from the inner packetizer, converting a failure.
    #[inline]
    fn take(&mut self) -> Result<Option<RtpPacket>, Self::Error> {
        let Self { packetizer, closure } = self;

        packetizer.take().map_err(closure)
    }
}
/// Packetizer adapter created by `Packetizer::with_frame`.
///
/// Accepts values of type `T` and converts each of them with the stored
/// closure before pushing the result into the wrapped packetizer.
pub struct WithFrame<P, F, T> {
// The wrapped packetizer receiving the converted frames.
packetizer: P,
// Conversion from `T` into the frame type of `packetizer`.
closure: F,
// `T` is not stored in any other field; the marker keeps the type parameter
// in use.
_frame: PhantomData<T>,
}
/// Converts incoming values with the closure; everything else is forwarded
/// to the wrapped packetizer untouched.
impl<P, F, T> Packetizer for WithFrame<P, F, T>
where
    P: Packetizer,
    F: FnMut(T) -> P::Frame,
{
    type Frame = T;
    type Error = P::Error;

    /// Converts `frame` and pushes the result into the inner packetizer.
    #[inline]
    fn push(&mut self, frame: Self::Frame) -> Result<(), Self::Error> {
        let converted = (self.closure)(frame);

        self.packetizer.push(converted)
    }

    /// Flushes the inner packetizer.
    #[inline]
    fn flush(&mut self) -> Result<(), Self::Error> {
        let inner = &mut self.packetizer;

        inner.flush()
    }

    /// Takes the next packet from the inner packetizer.
    #[inline]
    fn take(&mut self) -> Result<Option<RtpPacket>, Self::Error> {
        let inner = &mut self.packetizer;

        inner.take()
    }
}
/// Lets a boxed packetizer (including an unsized one, because of `?Sized`)
/// be used wherever a `Packetizer` is expected by forwarding every call to
/// the boxed value.
impl<T> Packetizer for Box<T>
where
    T: Packetizer + ?Sized,
{
    type Frame = T::Frame;
    type Error = T::Error;

    /// Forwards to `T::push` on the boxed value.
    #[inline]
    fn push(&mut self, frame: Self::Frame) -> Result<(), Self::Error> {
        // The double dereference reaches the `T` inside the box, so the call
        // resolves to `T`'s implementation and not to this one.
        (**self).push(frame)
    }

    /// Forwards to `T::flush` on the boxed value.
    #[inline]
    fn flush(&mut self) -> Result<(), Self::Error> {
        (**self).flush()
    }

    /// Forwards to `T::take` on the boxed value.
    #[inline]
    fn take(&mut self) -> Result<Option<RtpPacket>, Self::Error> {
        (**self).take()
    }
}
/// Sink that accepts frames, runs them through a packetizer and forwards the
/// resulting RTP packets to an underlying RTP packet sink.
pub struct MediaSink<S, P> {
// Destination of the produced RTP packets.
rtp_sink: S,
// Converts the incoming frames into RTP packets.
packetizer: P,
// Packet already taken from `packetizer` that `rtp_sink` was not ready to
// accept yet; it is sent first on the next attempt.
pending: Option<RtpPacket>,
}
impl<S, P> MediaSink<S, P> {
#[inline]
pub const fn new(rtp_sink: S, packetizer: P) -> Self {
Self {
rtp_sink,
packetizer,
pending: None,
}
}
}
impl<S, P> MediaSink<S, P>
where
    S: Sink<RtpPacket> + Unpin,
    P: Packetizer,
    S::Error: From<P::Error>,
{
    /// Returns the next packet to be sent.
    ///
    /// A packet parked in `pending` takes precedence; only when there is none
    /// is the packetizer asked for a new packet.
    fn next_packet(&mut self) -> Result<Option<RtpPacket>, P::Error> {
        match self.pending.take() {
            Some(parked) => Ok(Some(parked)),
            None => self.packetizer.take(),
        }
    }

    /// Moves all currently available packets from the packetizer into the
    /// RTP sink.
    ///
    /// Returns `Poll::Ready(Ok(()))` once there is nothing left to send,
    /// `Poll::Pending` when the RTP sink is not ready (the packet in hand is
    /// parked in `pending`), and an error as soon as either the packetizer or
    /// the RTP sink fails.
    fn poll_flush_packetizer(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
        loop {
            let packet = match self.next_packet()? {
                Some(packet) => packet,
                None => return Poll::Ready(Ok(())),
            };

            match self.rtp_sink.poll_ready_unpin(cx) {
                Poll::Pending => {
                    // Keep the packet so that it is not lost; it will be the
                    // first one returned by `next_packet` on the next poll.
                    self.pending = Some(packet);

                    return Poll::Pending;
                }
                Poll::Ready(readiness) => {
                    readiness?;

                    self.rtp_sink.start_send_unpin(packet)?;
                }
            }
        }
    }
}
/// `Sink` implementation accepting the packetizer's frames.
///
/// Errors of the packetizer are converted into the error type of the RTP
/// sink, which is also the error type of this sink.
impl<S, P> Sink<P::Frame> for MediaSink<S, P>
where
    S: Sink<RtpPacket> + Unpin,
    P: Packetizer + Unpin,
    S::Error: From<P::Error>,
{
    type Error = S::Error;

    /// Becomes ready once every packet currently available from the
    /// packetizer has been handed over to the RTP sink.
    #[inline]
    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.poll_flush_packetizer(cx)
    }

    /// Pushes `frame` into the packetizer.
    ///
    /// The produced packets are forwarded by the subsequent `poll_ready`,
    /// `poll_flush` or `poll_close` call.
    #[inline]
    fn start_send(mut self: Pin<&mut Self>, frame: P::Frame) -> Result<(), Self::Error> {
        self.packetizer.push(frame)?;

        Ok(())
    }

    /// Forwards all packets currently available from the packetizer and then
    /// flushes the RTP sink.
    ///
    /// `Packetizer::flush` is not called here; it is reserved for
    /// `poll_close`.
    #[inline]
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        ready!(self.poll_flush_packetizer(cx))?;

        self.rtp_sink.poll_flush_unpin(cx)
    }

    /// Drains the packetizer, flushes it, drains it again and finally closes
    /// the RTP sink.
    #[inline]
    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // Forward everything that is already available so that the packetizer
        // is empty when it gets flushed.
        ready!(self.poll_flush_packetizer(cx))?;

        // Without this call nothing would ever flush the packetizer and any
        // data it still holds back would be lost on close.
        // NOTE(review): this runs again whenever `poll_close` is re-polled
        // after returning `Poll::Pending`; this assumes that flushing an
        // already flushed packetizer is harmless -- confirm against the
        // implementors.
        self.packetizer.flush()?;

        // Forward whatever the flush has made available.
        ready!(self.poll_flush_packetizer(cx))?;

        self.rtp_sink.poll_close_unpin(cx)
    }
}