1use std::{
4 marker::PhantomData,
5 pin::Pin,
6 task::{Context, Poll},
7};
8
9use futures::{ready, Sink, SinkExt};
10
11use crate::RtpPacket;
12
13pub trait Packetizer {
24 type Frame;
25 type Error;
26
27 fn push(&mut self, frame: Self::Frame) -> Result<(), Self::Error>;
33
34 fn flush(&mut self) -> Result<(), Self::Error>;
40
41 fn take(&mut self) -> Result<Option<RtpPacket>, Self::Error>;
46
47 #[inline]
50 fn with_frame<F, T>(self, f: F) -> WithFrame<Self, F, T>
51 where
52 F: FnMut(T) -> Self::Frame,
53 Self: Sized,
54 {
55 WithFrame {
56 packetizer: self,
57 closure: f,
58 _frame: PhantomData,
59 }
60 }
61
62 #[inline]
64 fn map_err<F, E>(self, f: F) -> MapErr<Self, F>
65 where
66 F: FnMut(Self::Error) -> E,
67 Self: Sized,
68 {
69 MapErr {
70 packetizer: self,
71 closure: f,
72 }
73 }
74}
75
/// Packetizer adapter that converts the errors of an inner packetizer.
///
/// Created by [`Packetizer::map_err`].
pub struct MapErr<P, F> {
    // The wrapped packetizer; all operations are delegated to it.
    packetizer: P,
    // Applied to every error returned by the wrapped packetizer.
    closure: F,
}
81
82impl<P, F, E> Packetizer for MapErr<P, F>
83where
84 P: Packetizer,
85 F: FnMut(P::Error) -> E,
86{
87 type Frame = P::Frame;
88 type Error = E;
89
90 #[inline]
91 fn push(&mut self, frame: Self::Frame) -> Result<(), Self::Error> {
92 self.packetizer.push(frame).map_err(&mut self.closure)
93 }
94
95 #[inline]
96 fn flush(&mut self) -> Result<(), Self::Error> {
97 self.packetizer.flush().map_err(&mut self.closure)
98 }
99
100 #[inline]
101 fn take(&mut self) -> Result<Option<RtpPacket>, Self::Error> {
102 self.packetizer.take().map_err(&mut self.closure)
103 }
104}
105
/// Packetizer adapter that converts incoming frames of type `T` before they
/// are pushed into an inner packetizer.
///
/// Created by [`Packetizer::with_frame`].
pub struct WithFrame<P, F, T> {
    // The wrapped packetizer; it receives the converted frames.
    packetizer: P,
    // Converts a `T` into the frame type of the wrapped packetizer.
    closure: F,
    // The adapter never stores a `T`, it only needs to name the type. A
    // function-pointer marker keeps `Send`, `Sync` and `Unpin` independent of
    // `T` (a plain `PhantomData<T>` would make the adapter inherit the auto
    // traits of the frame type), while the return position keeps `T`
    // covariant exactly as it would be with `PhantomData<T>`.
    _frame: PhantomData<fn() -> T>,
}
112
113impl<P, F, T> Packetizer for WithFrame<P, F, T>
114where
115 P: Packetizer,
116 F: FnMut(T) -> P::Frame,
117{
118 type Frame = T;
119 type Error = P::Error;
120
121 #[inline]
122 fn push(&mut self, frame: Self::Frame) -> Result<(), Self::Error> {
123 self.packetizer.push((self.closure)(frame))
124 }
125
126 #[inline]
127 fn flush(&mut self) -> Result<(), Self::Error> {
128 self.packetizer.flush()
129 }
130
131 #[inline]
132 fn take(&mut self) -> Result<Option<RtpPacket>, Self::Error> {
133 self.packetizer.take()
134 }
135}
136
137impl<T> Packetizer for Box<T>
138where
139 T: Packetizer + ?Sized,
140{
141 type Frame = T::Frame;
142 type Error = T::Error;
143
144 #[inline]
145 fn push(&mut self, frame: Self::Frame) -> Result<(), Self::Error> {
146 <T as Packetizer>::push(self, frame)
147 }
148
149 #[inline]
150 fn flush(&mut self) -> Result<(), Self::Error> {
151 <T as Packetizer>::flush(self)
152 }
153
154 #[inline]
155 fn take(&mut self) -> Result<Option<RtpPacket>, Self::Error> {
156 <T as Packetizer>::take(self)
157 }
158}
159
160pub struct MediaSink<S, P> {
163 rtp_sink: S,
164 packetizer: P,
165 pending: Option<RtpPacket>,
166}
167
168impl<S, P> MediaSink<S, P> {
169 #[inline]
171 pub const fn new(rtp_sink: S, packetizer: P) -> Self {
172 Self {
173 rtp_sink,
174 packetizer,
175 pending: None,
176 }
177 }
178}
179
180impl<S, P> MediaSink<S, P>
181where
182 S: Sink<RtpPacket> + Unpin,
183 P: Packetizer,
184 S::Error: From<P::Error>,
185{
186 fn next_packet(&mut self) -> Result<Option<RtpPacket>, P::Error> {
188 if let Some(packet) = self.pending.take() {
189 Ok(Some(packet))
190 } else {
191 self.packetizer.take()
192 }
193 }
194
195 fn poll_flush_packetizer(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
197 while let Some(packet) = self.next_packet()? {
198 match self.rtp_sink.poll_ready_unpin(cx) {
199 Poll::Ready(Ok(())) => self.rtp_sink.start_send_unpin(packet)?,
200 Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
201 Poll::Pending => {
202 self.pending = Some(packet);
204
205 return Poll::Pending;
206 }
207 }
208 }
209
210 Poll::Ready(Ok(()))
211 }
212}
213
214impl<S, P> Sink<P::Frame> for MediaSink<S, P>
215where
216 S: Sink<RtpPacket> + Unpin,
217 P: Packetizer + Unpin,
218 S::Error: From<P::Error>,
219{
220 type Error = S::Error;
221
222 #[inline]
223 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
224 self.poll_flush_packetizer(cx)
225 }
226
227 #[inline]
228 fn start_send(mut self: Pin<&mut Self>, frame: P::Frame) -> Result<(), Self::Error> {
229 self.packetizer.push(frame)?;
230
231 Ok(())
232 }
233
234 #[inline]
235 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
236 ready!(self.poll_flush_packetizer(cx))?;
237
238 self.rtp_sink.poll_flush_unpin(cx)
239 }
240
241 #[inline]
242 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
243 ready!(self.poll_flush_packetizer(cx))?;
244
245 self.rtp_sink.poll_close_unpin(cx)
246 }
247}