1use std::{
4 pin::Pin,
5 task::{Context, Poll},
6};
7
8use futures::{Stream, StreamExt};
9
10use crate::RtpPacket;
11
12pub trait Depacketizer {
23 type Frame;
24 type Error;
25
26 fn push(&mut self, packet: RtpPacket) -> Result<(), Self::Error>;
32
33 fn flush(&mut self) -> Result<(), Self::Error>;
39
40 fn take(&mut self) -> Result<Option<Self::Frame>, Self::Error>;
45
46 #[inline]
48 fn map_frame<F, T>(self, f: F) -> MapFrame<Self, F>
49 where
50 F: FnMut(Self::Frame) -> T,
51 Self: Sized,
52 {
53 MapFrame {
54 depacketizer: self,
55 closure: f,
56 }
57 }
58
59 #[inline]
61 fn map_err<F, E>(self, f: F) -> MapErr<Self, F>
62 where
63 F: FnMut(Self::Error) -> E,
64 Self: Sized,
65 {
66 MapErr {
67 depacketizer: self,
68 closure: f,
69 }
70 }
71}
72
73pub struct MapErr<D, F> {
75 depacketizer: D,
76 closure: F,
77}
78
79impl<D, F, E> Depacketizer for MapErr<D, F>
80where
81 D: Depacketizer,
82 F: FnMut(D::Error) -> E,
83{
84 type Frame = D::Frame;
85 type Error = E;
86
87 #[inline]
88 fn push(&mut self, packet: RtpPacket) -> Result<(), Self::Error> {
89 self.depacketizer.push(packet).map_err(&mut self.closure)
90 }
91
92 #[inline]
93 fn flush(&mut self) -> Result<(), Self::Error> {
94 self.depacketizer.flush().map_err(&mut self.closure)
95 }
96
97 #[inline]
98 fn take(&mut self) -> Result<Option<Self::Frame>, Self::Error> {
99 self.depacketizer.take().map_err(&mut self.closure)
100 }
101}
102
103pub struct MapFrame<D, F> {
105 depacketizer: D,
106 closure: F,
107}
108
109impl<D, F, T> Depacketizer for MapFrame<D, F>
110where
111 D: Depacketizer,
112 F: FnMut(D::Frame) -> T,
113{
114 type Frame = T;
115 type Error = D::Error;
116
117 #[inline]
118 fn push(&mut self, packet: RtpPacket) -> Result<(), Self::Error> {
119 self.depacketizer.push(packet)
120 }
121
122 #[inline]
123 fn flush(&mut self) -> Result<(), Self::Error> {
124 self.depacketizer.flush()
125 }
126
127 #[inline]
128 fn take(&mut self) -> Result<Option<Self::Frame>, Self::Error> {
129 if let Some(frame) = self.depacketizer.take()? {
130 Ok(Some((self.closure)(frame)))
131 } else {
132 Ok(None)
133 }
134 }
135}
136
137impl<T> Depacketizer for Box<T>
138where
139 T: Depacketizer + ?Sized,
140{
141 type Frame = T::Frame;
142 type Error = T::Error;
143
144 #[inline]
145 fn push(&mut self, packet: RtpPacket) -> Result<(), Self::Error> {
146 <T as Depacketizer>::push(self, packet)
147 }
148
149 #[inline]
150 fn flush(&mut self) -> Result<(), Self::Error> {
151 <T as Depacketizer>::flush(self)
152 }
153
154 #[inline]
155 fn take(&mut self) -> Result<Option<Self::Frame>, Self::Error> {
156 <T as Depacketizer>::take(self)
157 }
158}
159
160pub struct MediaStream<S, D> {
163 rtp_stream: Option<S>,
164 depacketizer: D,
165}
166
167impl<S, D> MediaStream<S, D> {
168 #[inline]
170 pub const fn new(rtp_stream: S, depacketizer: D) -> Self {
171 Self {
172 rtp_stream: Some(rtp_stream),
173 depacketizer,
174 }
175 }
176}
177
178impl<S, D, E> Stream for MediaStream<S, D>
179where
180 S: Stream<Item = Result<RtpPacket, E>> + Unpin,
181 D: Depacketizer + Unpin,
182 E: From<D::Error>,
183{
184 type Item = Result<D::Frame, E>;
185
186 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
187 loop {
188 if let Some(frame) = self.depacketizer.take()? {
189 return Poll::Ready(Some(Ok(frame)));
190 } else if let Some(stream) = self.rtp_stream.as_mut() {
191 if let Poll::Ready(ready) = stream.poll_next_unpin(cx) {
192 if let Some(packet) = ready.transpose()? {
193 self.depacketizer.push(packet)?;
194 } else {
195 self.depacketizer.flush()?;
196 self.rtp_stream = None;
197 }
198 } else {
199 return Poll::Pending;
200 }
201 } else {
202 return Poll::Ready(None);
203 }
204 }
205 }
206}