msf_rtp/
depacketizer.rs

1//! Common types for RTP to media framing.
2
3use std::{
4    pin::Pin,
5    task::{Context, Poll},
6};
7
8use futures::Stream;
9
10use crate::RtpPacket;
11
12/// Common trait for de-packetizers.
13///
14/// Depacketizers are responsible for converting RTP packets into media frames.
15///
16/// # Usage
17/// 1. Push an RTP packet into the depacketizer.
18/// 2. Take all media frames from the depacketizer.
19/// 3. Repeat from (1) if needed.
20/// 4. Flush the depacketizer.
21/// 5. Take all media frames from the depacketizer.
22pub trait Depacketizer {
23    type Frame;
24    type Error;
25
26    /// Process a given RTP packet.
27    ///
28    /// # Panics
29    /// The method may panic if calling the `take` method would not return
30    /// `None`.
31    fn push(&mut self, packet: RtpPacket) -> Result<(), Self::Error>;
32
33    /// Flush the depacketizer.
34    ///
35    /// # Panics
36    /// The method may panic if calling the `take` method would not return
37    /// `None`.
38    fn flush(&mut self) -> Result<(), Self::Error>;
39
40    /// Take the next available media frame.
41    ///
42    /// Note that only after this method returns `None`, it is allowed to call
43    /// the `push` method or the `flush` method again.
44    fn take(&mut self) -> Result<Option<Self::Frame>, Self::Error>;
45
46    /// Map the media frame into a different type.
47    #[inline]
48    fn map_frame<F, T>(self, f: F) -> MapFrame<Self, F>
49    where
50        F: FnMut(Self::Frame) -> T,
51        Self: Sized,
52    {
53        MapFrame {
54            depacketizer: self,
55            closure: f,
56        }
57    }
58
59    /// Map the depacketizer error into a different one.
60    #[inline]
61    fn map_err<F, E>(self, f: F) -> MapErr<Self, F>
62    where
63        F: FnMut(Self::Error) -> E,
64        Self: Sized,
65    {
66        MapErr {
67            depacketizer: self,
68            closure: f,
69        }
70    }
71}
72
/// Depacketizer with mapped error type.
///
/// The adaptor wraps another depacketizer and passes every error returned by
/// the wrapped depacketizer through a closure. It is constructed by the
/// `Depacketizer::map_err` method.
pub struct MapErr<D, F> {
    /// The wrapped depacketizer.
    depacketizer: D,
    /// Closure converting errors of the wrapped depacketizer.
    closure: F,
}
78
79impl<D, F, E> Depacketizer for MapErr<D, F>
80where
81    D: Depacketizer,
82    F: FnMut(D::Error) -> E,
83{
84    type Frame = D::Frame;
85    type Error = E;
86
87    #[inline]
88    fn push(&mut self, packet: RtpPacket) -> Result<(), Self::Error> {
89        self.depacketizer.push(packet).map_err(&mut self.closure)
90    }
91
92    #[inline]
93    fn flush(&mut self) -> Result<(), Self::Error> {
94        self.depacketizer.flush().map_err(&mut self.closure)
95    }
96
97    #[inline]
98    fn take(&mut self) -> Result<Option<Self::Frame>, Self::Error> {
99        self.depacketizer.take().map_err(&mut self.closure)
100    }
101}
102
/// Depacketizer with mapped media frame type.
///
/// The adaptor wraps another depacketizer and passes every media frame taken
/// from the wrapped depacketizer through a closure. It is constructed by the
/// `Depacketizer::map_frame` method.
pub struct MapFrame<D, F> {
    /// The wrapped depacketizer.
    depacketizer: D,
    /// Closure converting media frames of the wrapped depacketizer.
    closure: F,
}
108
109impl<D, F, T> Depacketizer for MapFrame<D, F>
110where
111    D: Depacketizer,
112    F: FnMut(D::Frame) -> T,
113{
114    type Frame = T;
115    type Error = D::Error;
116
117    #[inline]
118    fn push(&mut self, packet: RtpPacket) -> Result<(), Self::Error> {
119        self.depacketizer.push(packet)
120    }
121
122    #[inline]
123    fn flush(&mut self) -> Result<(), Self::Error> {
124        self.depacketizer.flush()
125    }
126
127    #[inline]
128    fn take(&mut self) -> Result<Option<Self::Frame>, Self::Error> {
129        if let Some(frame) = self.depacketizer.take()? {
130            Ok(Some((self.closure)(frame)))
131        } else {
132            Ok(None)
133        }
134    }
135}
136
137impl<T> Depacketizer for Box<T>
138where
139    T: Depacketizer + ?Sized,
140{
141    type Frame = T::Frame;
142    type Error = T::Error;
143
144    #[inline]
145    fn push(&mut self, packet: RtpPacket) -> Result<(), Self::Error> {
146        <T as Depacketizer>::push(self, packet)
147    }
148
149    #[inline]
150    fn flush(&mut self) -> Result<(), Self::Error> {
151        <T as Depacketizer>::flush(self)
152    }
153
154    #[inline]
155    fn take(&mut self) -> Result<Option<Self::Frame>, Self::Error> {
156        <T as Depacketizer>::take(self)
157    }
158}
159
pin_project_lite::pin_project! {
    /// Media stream that uses an underlying depacketizer to convert RTP packets
    /// from the underlying RTP stream into media frames.
    ///
    /// The stream yields `Result<D::Frame, E>` items where `E` is the error
    /// type of the underlying RTP stream.
    pub struct MediaStream<S, D> {
        // The underlying RTP stream. It is `Some` while the RTP stream is
        // still being polled and `None` once the media stream is done with
        // it.
        #[pin]
        rtp_stream: Option<S>,
        // Depacketizer converting the received RTP packets into media frames.
        depacketizer: D,
    }
}
169
170impl<S, D> MediaStream<S, D> {
171    /// Create a new media stream.
172    #[inline]
173    pub const fn new(rtp_stream: S, depacketizer: D) -> Self {
174        Self {
175            rtp_stream: Some(rtp_stream),
176            depacketizer,
177        }
178    }
179}
180
181impl<S, D, E> Stream for MediaStream<S, D>
182where
183    S: Stream<Item = Result<RtpPacket, E>>,
184    D: Depacketizer,
185    E: From<D::Error>,
186{
187    type Item = Result<D::Frame, E>;
188
189    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
190        let mut this = self.project();
191
192        loop {
193            let rtp_stream = this.rtp_stream.as_mut();
194
195            if let Some(frame) = this.depacketizer.take()? {
196                return Poll::Ready(Some(Ok(frame)));
197            } else if let Some(stream) = rtp_stream.as_pin_mut() {
198                if let Poll::Ready(ready) = stream.poll_next(cx) {
199                    if let Some(packet) = ready.transpose()? {
200                        this.depacketizer.push(packet)?;
201                    } else {
202                        this.depacketizer.flush()?;
203                        this.rtp_stream.set(None);
204                    }
205                } else {
206                    return Poll::Pending;
207                }
208            } else {
209                return Poll::Ready(None);
210            }
211        }
212    }
213}