msf_rtp/
packetizer.rs

1//! Common types for media to RTP framing.
2
3use std::{
4    marker::PhantomData,
5    pin::Pin,
6    task::{Context, Poll},
7};
8
9use futures::{ready, Sink, SinkExt};
10
11use crate::RtpPacket;
12
13/// Common trait for packetizers.
14///
15/// Packetizers are responsible for converting media frames into RTP packets.
16///
17/// # Usage
18/// 1. Push a media frame into the packetizer.
19/// 2. Take all RTP packets from the packetizer.
20/// 3. Repeat from (1) if needed.
21/// 4. Flush the packetizer.
22/// 5. Take all RTP packets from the packetizer.
23pub trait Packetizer {
24    type Frame;
25    type Error;
26
27    /// Process a given media frame.
28    ///
29    /// # Panics
30    /// The method may panic if calling the `take` method would not return
31    /// `None`.
32    fn push(&mut self, frame: Self::Frame) -> Result<(), Self::Error>;
33
34    /// Flush the packetizer.
35    ///
36    /// # Panics
37    /// The method may panic if calling the `take` method would not return
38    /// `None`.
39    fn flush(&mut self) -> Result<(), Self::Error>;
40
41    /// Take the next available RTP packet.
42    ///
43    /// Note that only after this method returns `None`, it is allowed to call
44    /// the `push` method or the `flush` method again.
45    fn take(&mut self) -> Result<Option<RtpPacket>, Self::Error>;
46
47    /// Convert this packetizer into a new one accepting media frames of a
48    /// given type.
49    #[inline]
50    fn with_frame<F, T>(self, f: F) -> WithFrame<Self, F, T>
51    where
52        F: FnMut(T) -> Self::Frame,
53        Self: Sized,
54    {
55        WithFrame {
56            packetizer: self,
57            closure: f,
58            _frame: PhantomData,
59        }
60    }
61
62    /// Map the packetizer error into a different one.
63    #[inline]
64    fn map_err<F, E>(self, f: F) -> MapErr<Self, F>
65    where
66        F: FnMut(Self::Error) -> E,
67        Self: Sized,
68    {
69        MapErr {
70            packetizer: self,
71            closure: f,
72        }
73    }
74}
75
/// Packetizer adapter that converts the error type of an inner packetizer.
///
/// Instances are created by the `Packetizer::map_err` method.
pub struct MapErr<P, F> {
    /// The wrapped packetizer.
    packetizer: P,
    /// Closure applied to every error returned by the wrapped packetizer.
    closure: F,
}
81
82impl<P, F, E> Packetizer for MapErr<P, F>
83where
84    P: Packetizer,
85    F: FnMut(P::Error) -> E,
86{
87    type Frame = P::Frame;
88    type Error = E;
89
90    #[inline]
91    fn push(&mut self, frame: Self::Frame) -> Result<(), Self::Error> {
92        self.packetizer.push(frame).map_err(&mut self.closure)
93    }
94
95    #[inline]
96    fn flush(&mut self) -> Result<(), Self::Error> {
97        self.packetizer.flush().map_err(&mut self.closure)
98    }
99
100    #[inline]
101    fn take(&mut self) -> Result<Option<RtpPacket>, Self::Error> {
102        self.packetizer.take().map_err(&mut self.closure)
103    }
104}
105
/// Packetizer adapter that converts incoming media frames of type `T` into
/// the frame type of an inner packetizer.
///
/// Instances are created by the `Packetizer::with_frame` method.
pub struct WithFrame<P, F, T> {
    /// The wrapped packetizer.
    packetizer: P,
    /// Closure converting every incoming frame before it is pushed into the
    /// wrapped packetizer.
    closure: F,
    /// Marker tying the otherwise unused frame type `T` to the adapter.
    _frame: PhantomData<T>,
}
112
113impl<P, F, T> Packetizer for WithFrame<P, F, T>
114where
115    P: Packetizer,
116    F: FnMut(T) -> P::Frame,
117{
118    type Frame = T;
119    type Error = P::Error;
120
121    #[inline]
122    fn push(&mut self, frame: Self::Frame) -> Result<(), Self::Error> {
123        self.packetizer.push((self.closure)(frame))
124    }
125
126    #[inline]
127    fn flush(&mut self) -> Result<(), Self::Error> {
128        self.packetizer.flush()
129    }
130
131    #[inline]
132    fn take(&mut self) -> Result<Option<RtpPacket>, Self::Error> {
133        self.packetizer.take()
134    }
135}
136
137impl<T> Packetizer for Box<T>
138where
139    T: Packetizer + ?Sized,
140{
141    type Frame = T::Frame;
142    type Error = T::Error;
143
144    #[inline]
145    fn push(&mut self, frame: Self::Frame) -> Result<(), Self::Error> {
146        <T as Packetizer>::push(self, frame)
147    }
148
149    #[inline]
150    fn flush(&mut self) -> Result<(), Self::Error> {
151        <T as Packetizer>::flush(self)
152    }
153
154    #[inline]
155    fn take(&mut self) -> Result<Option<RtpPacket>, Self::Error> {
156        <T as Packetizer>::take(self)
157    }
158}
159
160/// Media sink that uses an underlying packetizer to convert media frames into
161/// RTP packets and forwards the RTP packets into an underlying RTP sink.
162pub struct MediaSink<S, P> {
163    rtp_sink: S,
164    packetizer: P,
165    pending: Option<RtpPacket>,
166}
167
168impl<S, P> MediaSink<S, P> {
169    /// Create a new media sink.
170    #[inline]
171    pub const fn new(rtp_sink: S, packetizer: P) -> Self {
172        Self {
173            rtp_sink,
174            packetizer,
175            pending: None,
176        }
177    }
178}
179
180impl<S, P> MediaSink<S, P>
181where
182    S: Sink<RtpPacket> + Unpin,
183    P: Packetizer,
184    S::Error: From<P::Error>,
185{
186    /// Get the next packet to be sent.
187    fn next_packet(&mut self) -> Result<Option<RtpPacket>, P::Error> {
188        if let Some(packet) = self.pending.take() {
189            Ok(Some(packet))
190        } else {
191            self.packetizer.take()
192        }
193    }
194
195    /// Flush the underlying packetizer.
196    fn poll_flush_packetizer(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
197        while let Some(packet) = self.next_packet()? {
198            match self.rtp_sink.poll_ready_unpin(cx) {
199                Poll::Ready(Ok(())) => self.rtp_sink.start_send_unpin(packet)?,
200                Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
201                Poll::Pending => {
202                    // we'll have to try it next time
203                    self.pending = Some(packet);
204
205                    return Poll::Pending;
206                }
207            }
208        }
209
210        Poll::Ready(Ok(()))
211    }
212}
213
214impl<S, P> Sink<P::Frame> for MediaSink<S, P>
215where
216    S: Sink<RtpPacket> + Unpin,
217    P: Packetizer + Unpin,
218    S::Error: From<P::Error>,
219{
220    type Error = S::Error;
221
222    #[inline]
223    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
224        self.poll_flush_packetizer(cx)
225    }
226
227    #[inline]
228    fn start_send(mut self: Pin<&mut Self>, frame: P::Frame) -> Result<(), Self::Error> {
229        self.packetizer.push(frame)?;
230
231        Ok(())
232    }
233
234    #[inline]
235    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
236        ready!(self.poll_flush_packetizer(cx))?;
237
238        self.rtp_sink.poll_flush_unpin(cx)
239    }
240
241    #[inline]
242    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
243        ready!(self.poll_flush_packetizer(cx))?;
244
245        self.rtp_sink.poll_close_unpin(cx)
246    }
247}