msf_rtp/
packetizer.rs

1//! Common types for media to RTP framing.
2
3use std::{
4    marker::PhantomData,
5    pin::Pin,
6    task::{Context, Poll},
7};
8
9use futures::{ready, Sink};
10
11use crate::RtpPacket;
12
13/// Common trait for packetizers.
14///
15/// Packetizers are responsible for converting media frames into RTP packets.
16///
17/// # Usage
18/// 1. Push a media frame into the packetizer.
19/// 2. Take all RTP packets from the packetizer.
20/// 3. Repeat from (1) if needed.
21/// 4. Flush the packetizer.
22/// 5. Take all RTP packets from the packetizer.
23pub trait Packetizer {
24    type Frame;
25    type Error;
26
27    /// Process a given media frame.
28    ///
29    /// # Panics
30    /// The method may panic if calling the `take` method would not return
31    /// `None`.
32    fn push(&mut self, frame: Self::Frame) -> Result<(), Self::Error>;
33
34    /// Flush the packetizer.
35    ///
36    /// # Panics
37    /// The method may panic if calling the `take` method would not return
38    /// `None`.
39    fn flush(&mut self) -> Result<(), Self::Error>;
40
41    /// Take the next available RTP packet.
42    ///
43    /// Note that only after this method returns `None`, it is allowed to call
44    /// the `push` method or the `flush` method again.
45    fn take(&mut self) -> Result<Option<RtpPacket>, Self::Error>;
46
47    /// Convert this packetizer into a new one accepting media frames of a
48    /// given type.
49    #[inline]
50    fn with_frame<F, T>(self, f: F) -> WithFrame<Self, F, T>
51    where
52        F: FnMut(T) -> Self::Frame,
53        Self: Sized,
54    {
55        WithFrame {
56            packetizer: self,
57            closure: f,
58            _frame: PhantomData,
59        }
60    }
61
62    /// Map the packetizer error into a different one.
63    #[inline]
64    fn map_err<F, E>(self, f: F) -> MapErr<Self, F>
65    where
66        F: FnMut(Self::Error) -> E,
67        Self: Sized,
68    {
69        MapErr {
70            packetizer: self,
71            closure: f,
72        }
73    }
74}
75
76/// Packetizer with mapped error type.
77pub struct MapErr<P, F> {
78    packetizer: P,
79    closure: F,
80}
81
82impl<P, F, E> Packetizer for MapErr<P, F>
83where
84    P: Packetizer,
85    F: FnMut(P::Error) -> E,
86{
87    type Frame = P::Frame;
88    type Error = E;
89
90    #[inline]
91    fn push(&mut self, frame: Self::Frame) -> Result<(), Self::Error> {
92        self.packetizer.push(frame).map_err(&mut self.closure)
93    }
94
95    #[inline]
96    fn flush(&mut self) -> Result<(), Self::Error> {
97        self.packetizer.flush().map_err(&mut self.closure)
98    }
99
100    #[inline]
101    fn take(&mut self) -> Result<Option<RtpPacket>, Self::Error> {
102        self.packetizer.take().map_err(&mut self.closure)
103    }
104}
105
106/// Packetizer with mapped media frame type.
107pub struct WithFrame<P, F, T> {
108    packetizer: P,
109    closure: F,
110    _frame: PhantomData<T>,
111}
112
113impl<P, F, T> Packetizer for WithFrame<P, F, T>
114where
115    P: Packetizer,
116    F: FnMut(T) -> P::Frame,
117{
118    type Frame = T;
119    type Error = P::Error;
120
121    #[inline]
122    fn push(&mut self, frame: Self::Frame) -> Result<(), Self::Error> {
123        self.packetizer.push((self.closure)(frame))
124    }
125
126    #[inline]
127    fn flush(&mut self) -> Result<(), Self::Error> {
128        self.packetizer.flush()
129    }
130
131    #[inline]
132    fn take(&mut self) -> Result<Option<RtpPacket>, Self::Error> {
133        self.packetizer.take()
134    }
135}
136
137impl<T> Packetizer for Box<T>
138where
139    T: Packetizer + ?Sized,
140{
141    type Frame = T::Frame;
142    type Error = T::Error;
143
144    #[inline]
145    fn push(&mut self, frame: Self::Frame) -> Result<(), Self::Error> {
146        <T as Packetizer>::push(self, frame)
147    }
148
149    #[inline]
150    fn flush(&mut self) -> Result<(), Self::Error> {
151        <T as Packetizer>::flush(self)
152    }
153
154    #[inline]
155    fn take(&mut self) -> Result<Option<RtpPacket>, Self::Error> {
156        <T as Packetizer>::take(self)
157    }
158}
159
160pin_project_lite::pin_project! {
161    /// Media sink that uses an underlying packetizer to convert media frames
162    /// into RTP packets and forwards the RTP packets into an underlying RTP
163    /// sink.
164    pub struct MediaSink<S, P> {
165        #[pin]
166        rtp_sink: S,
167        context: MediaSinkContext<P>,
168    }
169}
170
171impl<S, P> MediaSink<S, P> {
172    /// Create a new media sink.
173    #[inline]
174    pub const fn new(rtp_sink: S, packetizer: P) -> Self {
175        Self {
176            rtp_sink,
177            context: MediaSinkContext::new(packetizer),
178        }
179    }
180}
181
182impl<S, P> MediaSink<S, P>
183where
184    S: Sink<RtpPacket>,
185    P: Packetizer,
186    S::Error: From<P::Error>,
187{
188    /// Flush the underlying packetizer.
189    fn poll_flush_packetizer(
190        self: Pin<&mut Self>,
191        cx: &mut Context<'_>,
192    ) -> Poll<Result<(), S::Error>> {
193        let mut this = self.project();
194
195        while let Some(packet) = this.context.next_packet()? {
196            match this.rtp_sink.as_mut().poll_ready(cx) {
197                Poll::Ready(Ok(())) => this.rtp_sink.as_mut().start_send(packet)?,
198                Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
199                Poll::Pending => {
200                    // we'll have to try it next time
201                    this.context.push_back_packet(packet);
202
203                    return Poll::Pending;
204                }
205            }
206        }
207
208        Poll::Ready(Ok(()))
209    }
210}
211
212impl<S, P> Sink<P::Frame> for MediaSink<S, P>
213where
214    S: Sink<RtpPacket>,
215    P: Packetizer,
216    S::Error: From<P::Error>,
217{
218    type Error = S::Error;
219
220    #[inline]
221    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
222        self.poll_flush_packetizer(cx)
223    }
224
225    #[inline]
226    fn start_send(self: Pin<&mut Self>, frame: P::Frame) -> Result<(), Self::Error> {
227        let this = self.project();
228
229        this.context.push_frame(frame)?;
230
231        Ok(())
232    }
233
234    #[inline]
235    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
236        let this = self.as_mut();
237
238        ready!(this.poll_flush_packetizer(cx))?;
239
240        let this = self.project();
241
242        this.rtp_sink.poll_flush(cx)
243    }
244
245    #[inline]
246    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
247        let this = self.as_mut();
248
249        ready!(this.poll_flush_packetizer(cx))?;
250
251        let this = self.project();
252
253        this.rtp_sink.poll_close(cx)
254    }
255}
256
257/// Media sink context.
258struct MediaSinkContext<P> {
259    packetizer: P,
260    pending: Option<RtpPacket>,
261}
262
263impl<P> MediaSinkContext<P> {
264    /// Create a new media sink context.
265    #[inline]
266    const fn new(packetizer: P) -> Self {
267        Self {
268            packetizer,
269            pending: None,
270        }
271    }
272
273    /// Push back a given packet that was not sent.
274    ///
275    /// The packet will be returned next time the `next_packet` method is
276    /// called.
277    fn push_back_packet(&mut self, packet: RtpPacket) {
278        self.pending = Some(packet);
279    }
280}
281
282impl<P> MediaSinkContext<P>
283where
284    P: Packetizer,
285{
286    /// Push a given media frame into the underlying packetizer.
287    fn push_frame(&mut self, frame: P::Frame) -> Result<(), P::Error> {
288        self.packetizer.push(frame)
289    }
290
291    /// Get the next packet to be sent.
292    fn next_packet(&mut self) -> Result<Option<RtpPacket>, P::Error> {
293        if let Some(packet) = self.pending.take() {
294            Ok(Some(packet))
295        } else {
296            self.packetizer.take()
297        }
298    }
299}