1use crate::error::*;
2
3use crate::buffer::Buffered;
4use std::any::Any;
5use std::io::SeekFrom;
6use std::sync::Arc;
7
8use crate::common::*;
9
10use crate::data::packet::Packet;
11use crate::stream::Stream;
12
/// Events produced while demuxing a source.
#[non_exhaustive]
#[derive(Clone, Debug)]
pub enum Event {
    /// A packet was produced by the demuxer.
    ///
    /// `Context::read_event` fills in a missing packet timebase from the
    /// stream whose index matches the packet's `stream_index`, when such a
    /// stream is known.
    NewPacket(Packet),
    /// A stream was discovered by the demuxer.
    ///
    /// `Context::read_event` appends a copy of it to `Context::info.streams`.
    NewStream(Stream),
    /// The demuxer cannot proceed with the data currently buffered; the
    /// payload is a byte count.
    ///
    /// `Context` never hands this variant to its caller: it is converted into
    /// `Error::MoreDataNeeded` and used to refill the buffer and retry.
    MoreDataNeeded(usize),
    /// NOTE(review): not produced or interpreted by the code in this file;
    /// presumably tells the caller to simply ask for the next event — confirm
    /// against the demuxer implementations.
    Continue,
    /// No further data is available.
    ///
    /// `Context::read_event` returns this when refilling the buffer does not
    /// yield any additional data.
    Eof,
}
32
/// Format-specific parsing logic that a [`Context`] drives over a buffered
/// input.
pub trait Demuxer: Send + Sync {
    /// Parses the headers found in `buf`, recording what it learns in `info`.
    ///
    /// On success the returned [`SeekFrom`] is applied to the reader by the
    /// calling `Context`. Returning `Error::MoreDataNeeded(n)` makes
    /// `Context::read_headers` grow the buffer by `n`, refill it and call
    /// this method again.
    fn read_headers(&mut self, buf: &mut dyn Buffered, info: &mut GlobalInfo) -> Result<SeekFrom>;
    /// Parses the next event from `buf`.
    ///
    /// On success returns the seek to apply to the reader together with the
    /// event found. The calling `Context` performs the seek before it
    /// inspects the event.
    fn read_event(&mut self, buf: &mut dyn Buffered) -> Result<(SeekFrom, Event)>;
}
43
/// Pairs a [`Demuxer`] with the buffered reader it parses from and keeps the
/// global information collected while demuxing.
pub struct Context<D: Demuxer, R: Buffered> {
    // Format-specific parser; exposed read-only through `demuxer()`.
    demuxer: D,
    // Buffered input handed to the demuxer on every call.
    reader: R,
    /// Global information about the media; passed to the demuxer while
    /// reading headers, and extended with every stream reported through
    /// `Event::NewStream`.
    pub info: GlobalInfo,
    /// Optional caller-supplied data.
    ///
    /// Initialised to `None` by `Context::new` and otherwise untouched by the
    /// code in this file.
    pub user_private: Option<Arc<dyn Any + Send + Sync>>,
}
56
57impl<D: Demuxer, R: Buffered> Context<D, R> {
58 pub fn new(demuxer: D, reader: R) -> Self {
60 Context {
61 demuxer,
62 reader,
63 info: GlobalInfo {
64 duration: None,
65 timebase: None,
66 streams: Vec::with_capacity(2),
67 },
68 user_private: None,
69 }
70 }
71
72 pub fn demuxer(&self) -> &D {
74 &self.demuxer
75 }
76
77 fn read_headers_internal(&mut self) -> Result<()> {
78 let demux = &mut self.demuxer;
79
80 let res = demux.read_headers(&mut self.reader, &mut self.info);
81 match res {
82 Err(e) => Err(e),
83 Ok(seek) => {
84 let res = self.reader.seek(seek);
86 log::trace!("stream now at index: {:?}", res);
87 Ok(())
88 }
89 }
90 }
91
92 pub fn read_headers(&mut self) -> Result<()> {
94 loop {
95 self.reader.fill_buf()?;
97 match self.read_headers_internal() {
98 Err(e) => match e {
99 Error::MoreDataNeeded(needed) => {
100 self.reader.grow(needed);
101 }
102 _ => return Err(e),
103 },
104 Ok(_) => return Ok(()),
105 }
106 }
107 }
108
109 fn read_event_internal(&mut self) -> Result<Event> {
110 let demux = &mut self.demuxer;
111
112 let res = demux.read_event(&mut self.reader);
113 match res {
114 Err(e) => Err(e),
115 Ok((seek, mut event)) => {
116 let _ = self.reader.seek(seek)?;
118 if let Event::NewStream(ref st) = event {
119 self.info.streams.push(st.clone());
120 }
121 if let Event::MoreDataNeeded(size) = event {
122 return Err(Error::MoreDataNeeded(size));
123 }
124 if let Event::NewPacket(ref mut pkt) = event {
125 if pkt.t.timebase.is_none() {
126 if let Some(st) = self
127 .info
128 .streams
129 .iter()
130 .find(|s| s.index as isize == pkt.stream_index)
131 {
132 pkt.t.timebase = Some(st.timebase);
133 }
134 }
135 }
136 Ok(event)
137 }
138 }
139 }
140
141 pub fn read_event(&mut self) -> Result<Event> {
143 loop {
145 match self.read_event_internal() {
146 Err(e) => match e {
147 Error::MoreDataNeeded(needed) => {
148 let len = self.reader.data().len();
149
150 if len >= needed {
152 continue;
153 }
154 self.reader.grow(needed);
155 self.reader.fill_buf()?;
156 if self.reader.data().len() <= len {
157 return Ok(Event::Eof);
158 }
159 }
160 _ => return Err(e),
161 },
162 Ok(ev) => return Ok(ev),
163 }
164 }
165 }
166}
167
/// Static, human-readable description of a demuxer, as returned by
/// `Descriptor::describe`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Descr {
    /// Name of the format.
    pub name: &'static str,
    /// Name of the demuxer.
    // NOTE(review): how this differs from `name` is not visible in this file.
    pub demuxer: &'static str,
    /// Free-form description.
    pub description: &'static str,
    /// File extensions associated with the format, presumably without the
    /// leading dot (the test fixture uses `"dm"`) — confirm.
    pub extensions: &'static [&'static str],
    /// MIME types associated with the format.
    pub mime: &'static [&'static str],
}
184
/// Factory and metadata provider for one kind of [`Demuxer`].
pub trait Descriptor {
    /// The demuxer type built by `create`.
    type OutputDemuxer: Demuxer;

    /// Builds a new demuxer instance.
    fn create(&self) -> Self::OutputDemuxer;
    /// Returns the static description of the demuxer.
    fn describe(&self) -> &Descr;

    /// Scores how well `data` matches the format handled by this descriptor.
    ///
    /// Higher means a better match. The `Probe` implementation for slices of
    /// descriptors only accepts a score strictly greater than
    /// `PROBE_SCORE_EXTENSION`.
    fn probe(&self, data: &[u8]) -> u8;
}
199
/// 4 KiB.
// NOTE(review): not referenced in this file; presumably the amount of data
// callers are expected to hand to `probe` — confirm against the callers.
pub const PROBE_DATA: usize = 4 * 1024;

/// Score threshold used when probing: the `Probe` implementation for slices
/// of descriptors returns a candidate only if its score is strictly greater
/// than this value.
pub const PROBE_SCORE_EXTENSION: u8 = 50;
206
/// Selects, among a set of descriptors, the one matching some input data.
pub trait Probe<T: Descriptor + ?Sized> {
    /// Returns the descriptor matching `data`, or `None` when no descriptor
    /// is considered a match.
    fn probe(&self, data: &[u8]) -> Option<&'static T>;
}
212
213impl<T: Descriptor + ?Sized> Probe<T> for [&'static T] {
214 fn probe(&self, data: &[u8]) -> Option<&'static T> {
215 let mut max = u8::MIN;
216 let mut candidate: Option<&'static T> = None;
217 for desc in self {
218 let score = desc.probe(data);
219
220 if score > max {
221 max = score;
222 candidate = Some(*desc);
223 }
224 }
225
226 if max > PROBE_SCORE_EXTENSION {
227 candidate
228 } else {
229 None
230 }
231 }
232}
233
#[cfg(test)]
mod test {
    use super::*;
    use crate::buffer::*;
    use crate::data::packet::Packet;
    use std::io::{Cursor, SeekFrom};

    /// Descriptor of the dummy format used by the tests below.
    struct DummyDes {
        d: Descr,
    }

    /// Demuxer for a toy format: a 9-byte header followed by 3-byte records
    /// that start with either `p1` or `e1`.
    struct DummyDemuxer {}

    impl Demuxer for DummyDemuxer {
        /// Consumes 9 bytes of header, asking for the missing bytes when
        /// fewer are buffered.
        fn read_headers(
            &mut self,
            buf: &mut dyn Buffered,
            _info: &mut GlobalInfo,
        ) -> Result<SeekFrom> {
            let len = buf.data().len();
            if 9 > len {
                let needed = 9 - len;
                Err(Error::MoreDataNeeded(needed))
            } else {
                Ok(SeekFrom::Current(9))
            }
        }

        /// Looks at the first two buffered bytes: `p1` yields an empty
        /// packet, `e1` asks to be called again, anything else is invalid.
        fn read_event(&mut self, buf: &mut dyn Buffered) -> Result<(SeekFrom, Event)> {
            let size = 2;
            let len = buf.data().len();
            if size > len {
                Err(Error::MoreDataNeeded(size - len))
            } else {
                log::debug!("{:?}", buf.data());
                match &buf.data()[..2] {
                    b"p1" => Ok((SeekFrom::Current(3), Event::NewPacket(Packet::new()))),
                    b"e1" => Ok((SeekFrom::Current(3), Event::MoreDataNeeded(0))),
                    _ => Err(Error::InvalidData),
                }
            }
        }
    }

    impl Descriptor for DummyDes {
        type OutputDemuxer = DummyDemuxer;

        fn create(&self) -> Self::OutputDemuxer {
            DummyDemuxer {}
        }

        fn describe(&self) -> &Descr {
            &self.d
        }

        /// Full score for exactly `b"dummy"`, zero for everything else.
        fn probe(&self, data: &[u8]) -> u8 {
            match data {
                b"dummy" => 100,
                _ => 0,
            }
        }
    }

    const DUMMY_DES: &dyn Descriptor<OutputDemuxer = DummyDemuxer> = &DummyDes {
        d: Descr {
            name: "dummy",
            demuxer: "dummy",
            description: "Dummy dem",
            extensions: &["dm", "dum"],
            mime: &["application/dummy"],
        },
    };

    #[test]
    fn probe() {
        let demuxers: &[&'static dyn Descriptor<OutputDemuxer = DummyDemuxer>] = &[DUMMY_DES];

        // Matching data scores 100, which is above the acceptance threshold.
        assert!(demuxers.probe(b"dummy").is_some());
        // Anything else scores 0 and must be rejected.
        assert!(demuxers.probe(b"not dummy").is_none());
    }

    #[test]
    fn read_headers() {
        let buf = b"dummy header";
        // The initial capacity is smaller than the header so that the
        // grow-and-refill path is exercised.
        let r = AccReader::with_capacity(4, Cursor::new(buf));
        let d = DUMMY_DES.create();
        let mut c = Context::new(d, r);

        c.read_headers().unwrap();
    }

    #[test]
    fn read_event() {
        let buf = b"dummy header p1 e1 p1 ";

        let r = AccReader::with_capacity(4, Cursor::new(buf));
        let d = DUMMY_DES.create();
        let mut c = Context::new(d, r);

        c.read_headers().unwrap();

        // Smoke test: the results are only printed, not asserted.
        println!("{:?}", c.read_event());
        println!("{:?}", c.read_event());
        println!("{:?}", c.read_event());
        println!("{:?}", c.read_event());
    }
}