1use std::{
4 pin::Pin,
5 task::{Context, Poll},
6};
7
8use futures::Stream;
9
10use crate::RtpPacket;
11
12pub trait Depacketizer {
23 type Frame;
24 type Error;
25
26 fn push(&mut self, packet: RtpPacket) -> Result<(), Self::Error>;
32
33 fn flush(&mut self) -> Result<(), Self::Error>;
39
40 fn take(&mut self) -> Result<Option<Self::Frame>, Self::Error>;
45
46 #[inline]
48 fn map_frame<F, T>(self, f: F) -> MapFrame<Self, F>
49 where
50 F: FnMut(Self::Frame) -> T,
51 Self: Sized,
52 {
53 MapFrame {
54 depacketizer: self,
55 closure: f,
56 }
57 }
58
59 #[inline]
61 fn map_err<F, E>(self, f: F) -> MapErr<Self, F>
62 where
63 F: FnMut(Self::Error) -> E,
64 Self: Sized,
65 {
66 MapErr {
67 depacketizer: self,
68 closure: f,
69 }
70 }
71}
72
73pub struct MapErr<D, F> {
75 depacketizer: D,
76 closure: F,
77}
78
79impl<D, F, E> Depacketizer for MapErr<D, F>
80where
81 D: Depacketizer,
82 F: FnMut(D::Error) -> E,
83{
84 type Frame = D::Frame;
85 type Error = E;
86
87 #[inline]
88 fn push(&mut self, packet: RtpPacket) -> Result<(), Self::Error> {
89 self.depacketizer.push(packet).map_err(&mut self.closure)
90 }
91
92 #[inline]
93 fn flush(&mut self) -> Result<(), Self::Error> {
94 self.depacketizer.flush().map_err(&mut self.closure)
95 }
96
97 #[inline]
98 fn take(&mut self) -> Result<Option<Self::Frame>, Self::Error> {
99 self.depacketizer.take().map_err(&mut self.closure)
100 }
101}
102
103pub struct MapFrame<D, F> {
105 depacketizer: D,
106 closure: F,
107}
108
109impl<D, F, T> Depacketizer for MapFrame<D, F>
110where
111 D: Depacketizer,
112 F: FnMut(D::Frame) -> T,
113{
114 type Frame = T;
115 type Error = D::Error;
116
117 #[inline]
118 fn push(&mut self, packet: RtpPacket) -> Result<(), Self::Error> {
119 self.depacketizer.push(packet)
120 }
121
122 #[inline]
123 fn flush(&mut self) -> Result<(), Self::Error> {
124 self.depacketizer.flush()
125 }
126
127 #[inline]
128 fn take(&mut self) -> Result<Option<Self::Frame>, Self::Error> {
129 if let Some(frame) = self.depacketizer.take()? {
130 Ok(Some((self.closure)(frame)))
131 } else {
132 Ok(None)
133 }
134 }
135}
136
137impl<T> Depacketizer for Box<T>
138where
139 T: Depacketizer + ?Sized,
140{
141 type Frame = T::Frame;
142 type Error = T::Error;
143
144 #[inline]
145 fn push(&mut self, packet: RtpPacket) -> Result<(), Self::Error> {
146 <T as Depacketizer>::push(self, packet)
147 }
148
149 #[inline]
150 fn flush(&mut self) -> Result<(), Self::Error> {
151 <T as Depacketizer>::flush(self)
152 }
153
154 #[inline]
155 fn take(&mut self) -> Result<Option<Self::Frame>, Self::Error> {
156 <T as Depacketizer>::take(self)
157 }
158}
159
160pin_project_lite::pin_project! {
161 pub struct MediaStream<S, D> {
164 #[pin]
165 rtp_stream: Option<S>,
166 depacketizer: D,
167 }
168}
169
170impl<S, D> MediaStream<S, D> {
171 #[inline]
173 pub const fn new(rtp_stream: S, depacketizer: D) -> Self {
174 Self {
175 rtp_stream: Some(rtp_stream),
176 depacketizer,
177 }
178 }
179}
180
181impl<S, D, E> Stream for MediaStream<S, D>
182where
183 S: Stream<Item = Result<RtpPacket, E>>,
184 D: Depacketizer,
185 E: From<D::Error>,
186{
187 type Item = Result<D::Frame, E>;
188
189 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
190 let mut this = self.project();
191
192 loop {
193 let rtp_stream = this.rtp_stream.as_mut();
194
195 if let Some(frame) = this.depacketizer.take()? {
196 return Poll::Ready(Some(Ok(frame)));
197 } else if let Some(stream) = rtp_stream.as_pin_mut() {
198 if let Poll::Ready(ready) = stream.poll_next(cx) {
199 if let Some(packet) = ready.transpose()? {
200 this.depacketizer.push(packet)?;
201 } else {
202 this.depacketizer.flush()?;
203 this.rtp_stream.set(None);
204 }
205 } else {
206 return Poll::Pending;
207 }
208 } else {
209 return Poll::Ready(None);
210 }
211 }
212 }
213}