1use std::{
4 marker::PhantomData,
5 pin::Pin,
6 task::{Context, Poll},
7};
8
9use futures::{ready, Sink};
10
11use crate::RtpPacket;
12
13pub trait Packetizer {
24 type Frame;
25 type Error;
26
27 fn push(&mut self, frame: Self::Frame) -> Result<(), Self::Error>;
33
34 fn flush(&mut self) -> Result<(), Self::Error>;
40
41 fn take(&mut self) -> Result<Option<RtpPacket>, Self::Error>;
46
47 #[inline]
50 fn with_frame<F, T>(self, f: F) -> WithFrame<Self, F, T>
51 where
52 F: FnMut(T) -> Self::Frame,
53 Self: Sized,
54 {
55 WithFrame {
56 packetizer: self,
57 closure: f,
58 _frame: PhantomData,
59 }
60 }
61
62 #[inline]
64 fn map_err<F, E>(self, f: F) -> MapErr<Self, F>
65 where
66 F: FnMut(Self::Error) -> E,
67 Self: Sized,
68 {
69 MapErr {
70 packetizer: self,
71 closure: f,
72 }
73 }
74}
75
76pub struct MapErr<P, F> {
78 packetizer: P,
79 closure: F,
80}
81
82impl<P, F, E> Packetizer for MapErr<P, F>
83where
84 P: Packetizer,
85 F: FnMut(P::Error) -> E,
86{
87 type Frame = P::Frame;
88 type Error = E;
89
90 #[inline]
91 fn push(&mut self, frame: Self::Frame) -> Result<(), Self::Error> {
92 self.packetizer.push(frame).map_err(&mut self.closure)
93 }
94
95 #[inline]
96 fn flush(&mut self) -> Result<(), Self::Error> {
97 self.packetizer.flush().map_err(&mut self.closure)
98 }
99
100 #[inline]
101 fn take(&mut self) -> Result<Option<RtpPacket>, Self::Error> {
102 self.packetizer.take().map_err(&mut self.closure)
103 }
104}
105
106pub struct WithFrame<P, F, T> {
108 packetizer: P,
109 closure: F,
110 _frame: PhantomData<T>,
111}
112
113impl<P, F, T> Packetizer for WithFrame<P, F, T>
114where
115 P: Packetizer,
116 F: FnMut(T) -> P::Frame,
117{
118 type Frame = T;
119 type Error = P::Error;
120
121 #[inline]
122 fn push(&mut self, frame: Self::Frame) -> Result<(), Self::Error> {
123 self.packetizer.push((self.closure)(frame))
124 }
125
126 #[inline]
127 fn flush(&mut self) -> Result<(), Self::Error> {
128 self.packetizer.flush()
129 }
130
131 #[inline]
132 fn take(&mut self) -> Result<Option<RtpPacket>, Self::Error> {
133 self.packetizer.take()
134 }
135}
136
137impl<T> Packetizer for Box<T>
138where
139 T: Packetizer + ?Sized,
140{
141 type Frame = T::Frame;
142 type Error = T::Error;
143
144 #[inline]
145 fn push(&mut self, frame: Self::Frame) -> Result<(), Self::Error> {
146 <T as Packetizer>::push(self, frame)
147 }
148
149 #[inline]
150 fn flush(&mut self) -> Result<(), Self::Error> {
151 <T as Packetizer>::flush(self)
152 }
153
154 #[inline]
155 fn take(&mut self) -> Result<Option<RtpPacket>, Self::Error> {
156 <T as Packetizer>::take(self)
157 }
158}
159
160pin_project_lite::pin_project! {
161 pub struct MediaSink<S, P> {
165 #[pin]
166 rtp_sink: S,
167 context: MediaSinkContext<P>,
168 }
169}
170
171impl<S, P> MediaSink<S, P> {
172 #[inline]
174 pub const fn new(rtp_sink: S, packetizer: P) -> Self {
175 Self {
176 rtp_sink,
177 context: MediaSinkContext::new(packetizer),
178 }
179 }
180}
181
182impl<S, P> MediaSink<S, P>
183where
184 S: Sink<RtpPacket>,
185 P: Packetizer,
186 S::Error: From<P::Error>,
187{
188 fn poll_flush_packetizer(
190 self: Pin<&mut Self>,
191 cx: &mut Context<'_>,
192 ) -> Poll<Result<(), S::Error>> {
193 let mut this = self.project();
194
195 while let Some(packet) = this.context.next_packet()? {
196 match this.rtp_sink.as_mut().poll_ready(cx) {
197 Poll::Ready(Ok(())) => this.rtp_sink.as_mut().start_send(packet)?,
198 Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
199 Poll::Pending => {
200 this.context.push_back_packet(packet);
202
203 return Poll::Pending;
204 }
205 }
206 }
207
208 Poll::Ready(Ok(()))
209 }
210}
211
212impl<S, P> Sink<P::Frame> for MediaSink<S, P>
213where
214 S: Sink<RtpPacket>,
215 P: Packetizer,
216 S::Error: From<P::Error>,
217{
218 type Error = S::Error;
219
220 #[inline]
221 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
222 self.poll_flush_packetizer(cx)
223 }
224
225 #[inline]
226 fn start_send(self: Pin<&mut Self>, frame: P::Frame) -> Result<(), Self::Error> {
227 let this = self.project();
228
229 this.context.push_frame(frame)?;
230
231 Ok(())
232 }
233
234 #[inline]
235 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
236 let this = self.as_mut();
237
238 ready!(this.poll_flush_packetizer(cx))?;
239
240 let this = self.project();
241
242 this.rtp_sink.poll_flush(cx)
243 }
244
245 #[inline]
246 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
247 let this = self.as_mut();
248
249 ready!(this.poll_flush_packetizer(cx))?;
250
251 let this = self.project();
252
253 this.rtp_sink.poll_close(cx)
254 }
255}
256
257struct MediaSinkContext<P> {
259 packetizer: P,
260 pending: Option<RtpPacket>,
261}
262
263impl<P> MediaSinkContext<P> {
264 #[inline]
266 const fn new(packetizer: P) -> Self {
267 Self {
268 packetizer,
269 pending: None,
270 }
271 }
272
273 fn push_back_packet(&mut self, packet: RtpPacket) {
278 self.pending = Some(packet);
279 }
280}
281
282impl<P> MediaSinkContext<P>
283where
284 P: Packetizer,
285{
286 fn push_frame(&mut self, frame: P::Frame) -> Result<(), P::Error> {
288 self.packetizer.push(frame)
289 }
290
291 fn next_packet(&mut self) -> Result<Option<RtpPacket>, P::Error> {
293 if let Some(packet) = self.pending.take() {
294 Ok(Some(packet))
295 } else {
296 self.packetizer.take()
297 }
298 }
299}