va_ts/
demuxer.rs

1use std::cell::RefCell;
2use std::collections::HashMap;
3use std::io::{Cursor, Write};
4use std::rc::Rc;
5use std::time::Duration;
6
7use crate::packet::Packet as TsPacket;
8use crate::pes::PES;
9use crate::pid::PID;
10use crate::result::Result;
11use crate::section::{WithHeader, WithSyntaxSection};
12use crate::subtable_id::{SubtableID, SubtableIDer};
13use crate::{EIT, PAT, PMT, SDT};
14
/// Byte buffer backed by a [`Cursor`] over a `Vec<u8>`.
///
/// The cursor position doubles as the "bytes written" counter: it advances
/// as data is written through the cursor and is rewound by `reset`.
pub struct Buf(pub Cursor<Vec<u8>>);

impl Buf {
    /// Drops every buffered byte and rewinds the cursor to the start.
    #[inline(always)]
    fn reset(&mut self) {
        let Buf(cursor) = self;
        cursor.get_mut().clear();
        cursor.set_position(0);
    }

    /// `true` while the cursor still sits at position zero.
    #[inline(always)]
    fn is_empty(&self) -> bool {
        let Buf(cursor) = self;
        cursor.position() == 0
    }

    /// Current cursor position as a `usize`.
    #[inline(always)]
    pub fn sz(&self) -> usize {
        let Buf(cursor) = self;
        cursor.position() as usize
    }
}

impl Default for Buf {
    /// Empty buffer whose backing vector is pre-allocated for 2048 bytes.
    fn default() -> Self {
        let storage: Vec<u8> = Vec::with_capacity(2048);
        Buf(Cursor::new(storage))
    }
}
40
41pub struct Section {
42    /// parent table-id
43    table_id: SubtableID,
44
45    /// number inside table sections
46    number: u8,
47
48    /// full section size with header, data, CRC
49    sz: usize,
50
51    pub buf: Buf,
52}
53
54impl Section {
55    fn new(table_id: SubtableID, number: u8, sz: usize) -> Section {
56        Section {
57            table_id,
58            number,
59            sz,
60            buf: Default::default(),
61        }
62    }
63
64    #[inline(always)]
65    fn into_ref(self) -> SectionRef {
66        Rc::new(RefCell::new(Box::new(self)))
67    }
68
69    /// section consumed all data
70    #[inline(always)]
71    fn done(&self) -> bool {
72        self.sz_need() == 0
73    }
74
75    /// sz need to read
76    #[inline(always)]
77    fn sz_need(&self) -> usize {
78        self.sz - self.buf.sz()
79    }
80}
81
82type SectionRef = Rc<RefCell<Box<Section>>>;
83
84pub struct Sections(pub Vec<SectionRef>);
85
86impl Sections {
87    #[inline(always)]
88    fn get_mut(&mut self, number: u8) -> Option<&mut SectionRef> {
89        self.0.iter_mut().find(|s| s.borrow().number == number)
90    }
91
92    #[inline(always)]
93    fn push(&mut self, s: SectionRef) {
94        self.0.push(s);
95        self.0
96            .sort_unstable_by(|a, b| a.borrow().number.cmp(&b.borrow().number));
97    }
98}
99
100impl Default for Sections {
101    fn default() -> Self {
102        Sections(Vec::with_capacity(1))
103    }
104}
105
106pub struct Table {
107    /// mpeg-ts last-section-number
108    last_section_number: u8,
109    pub sections: Sections,
110}
111
112impl Table {
113    fn new(last_section_number: u8) -> Table {
114        Table {
115            last_section_number,
116            sections: Default::default(),
117        }
118    }
119
120    #[inline(always)]
121    fn done(&self) -> bool {
122        match self.sections.0.len() {
123            0 => false,
124            n => {
125                let last = (&self.sections.0[n - 1]).borrow();
126                let first = (&self.sections.0[0]).borrow();
127
128                first.number == 0
129                    && last.number == self.last_section_number
130                    && first.done()
131                    && last.done()
132            }
133        }
134    }
135}
136
137struct Tables {
138    map: HashMap<SubtableID, Table>,
139    /// current demuxing section
140    current: Option<SectionRef>,
141}
142
143impl Tables {}
144
145impl Default for Tables {
146    fn default() -> Self {
147        Tables {
148            map: HashMap::new(),
149            current: None,
150        }
151    }
152}
153
154pub struct Packet {
155    pub pid: PID,
156
157    pub offset: usize,
158
159    /// presentation time stamp
160    pub pts: Option<Duration>,
161
162    /// decode time stamp
163    pub dts: Option<Duration>,
164
165    pub buf: Buf,
166
167    /// got ts PUSI
168    started: bool,
169}
170
171impl Packet {
172    fn new(pid: PID) -> Packet {
173        Packet {
174            pid,
175            offset: 0,
176            pts: None,
177            dts: None,
178            buf: Default::default(),
179            started: false,
180        }
181    }
182}
183
184#[derive(Default)]
185struct Packets(HashMap<PID, Packet>);
186
187/// pid, packet-constructed
188#[derive(Debug)]
189struct PMTPids(Vec<(PID, bool)>);
190
191impl PMTPids {
192    #[inline(always)]
193    fn has(&self, pid: PID) -> bool {
194        self.0.iter().any(|p| (*p).0 == pid)
195    }
196
197    #[inline(always)]
198    fn push_uniq(&mut self, pid: PID) {
199        if !self.has(pid) {
200            self.0.push((pid, false))
201        }
202    }
203
204    /// got pid? and pid is parsed and packet builded
205    #[inline(always)]
206    fn is_packet_builded(&self, pid: PID) -> Option<bool> {
207        self.0.iter().find(|p| p.0 == pid).map(|p| p.1)
208    }
209
210    #[inline(always)]
211    fn set_is_packet_builded(&mut self, pid: PID, v: bool) {
212        if let Some(p) = self.0.iter_mut().find(|p| p.0 == pid) {
213            p.1 = v;
214        }
215    }
216
217    /// all pids are parsed?
218    #[inline(always)]
219    #[allow(dead_code)]
220    fn are_all_packets_builded(&self) -> bool {
221        !self.0.iter().any(|p| !(*p).1)
222    }
223}
224
225impl Default for PMTPids {
226    fn default() -> Self {
227        PMTPids(Vec::with_capacity(3))
228    }
229}
230
231pub trait DemuxerEvents {
232    fn on_table(&mut self, _: SubtableID, _: &Table) {}
233    fn on_packet(&mut self, _: &Packet) {}
234}
235
236/// TODO: use tree, redix tree here
237/// TODO: add benches
238pub struct Demuxer<T>
239where
240    T: DemuxerEvents,
241{
242    offset: usize,
243
244    pat: Tables,
245    pmt: Tables,
246    eit: Tables,
247    sdt: Tables,
248
249    #[allow(dead_code)]
250    nit: Tables,
251    #[allow(dead_code)]
252    cat: Tables,
253    #[allow(dead_code)]
254    bat: Tables,
255
256    packets: Packets,
257
258    // TODO: add PID with state(is-parsed or not)
259    //       for multiple PMTs
260    pmt_pids: PMTPids,
261
262    events: T,
263}
264
265unsafe impl<T> Send for Demuxer<T> where T: DemuxerEvents + Send {}
266
267impl<T> Demuxer<T>
268where
269    T: DemuxerEvents,
270{
271    pub fn new(events: T) -> Demuxer<T> {
272        Demuxer {
273            offset: 0,
274
275            pat: Default::default(),
276            pmt: Default::default(),
277            eit: Default::default(),
278            sdt: Default::default(),
279            nit: Default::default(),
280            cat: Default::default(),
281            bat: Default::default(),
282
283            pmt_pids: Default::default(),
284
285            packets: Default::default(),
286
287            events,
288        }
289    }
290
291    /// cache pmt pids
292    // TODO: also do via iterator
293    // TODO: .iter().collect() for lazy collection
294    #[inline(always)]
295    fn build_pmt_pids(&mut self) {
296        for (_, table) in self.pat.map.iter() {
297            for section_ref in table.sections.0.iter() {
298                let section = (*section_ref).borrow();
299                let raw = section.buf.0.get_ref().as_slice();
300                let pat = PAT::new(raw);
301
302                // TODO: refactor via iter/to-iter
303                for pid in pat
304                    .programs()
305                    .filter_map(Result::ok)
306                    .filter(|p| p.pid().is_program_map())
307                    .map(|p| PID::from(p.pid()))
308                {
309                    self.pmt_pids.push_uniq(pid)
310                }
311            }
312        }
313    }
314
315    /// build packets cache
316    // TODO: also do via iterator
317    // TODO: .iter().collect() for lazy collection
318    #[inline(always)]
319    fn build_packets(&mut self) {
320        for (_, table) in self.pmt.map.iter() {
321            for section_ref in table.sections.0.iter() {
322                let section = (*section_ref).borrow();
323                let raw = section.buf.0.get_ref().as_slice();
324                let pmt = PMT::new(raw);
325
326                // TODO: refactor via iter/to-iter
327                for pid in pmt
328                    .streams()
329                    .filter_map(Result::ok)
330                    .map(|s| PID::from(s.pid()))
331                {
332                    self.packets
333                        .0
334                        .entry(pid)
335                        .or_insert_with(|| Packet::new(pid));
336                }
337            }
338        }
339    }
340
341    // TODO: move to macros?
342    #[inline(always)]
343    fn demux_section(&mut self, pid_or_pmt: (PID, bool), pkt: &TsPacket) -> Result<()> {
344        let tables = match pid_or_pmt {
345            (PID::PAT, false) => &mut self.pat,
346            (PID::SDT, false) => &mut self.sdt,
347            (PID::EIT, false) => &mut self.eit,
348            (PID::NIT, false) => &mut self.nit,
349            (PID::CAT, false) => &mut self.cat,
350            (_, true) => &mut self.pmt,
351            _ => unreachable!(),
352        };
353
354        let buf = pkt.buf_payload_section()?;
355
356        if pkt.pusi() {
357            let (id, sz, section_number, last_section_number) = match pid_or_pmt {
358                (PID::PAT, false) => {
359                    let s = PAT::try_new(buf)?;
360                    (
361                        s.subtable_id(),
362                        s.sz(),
363                        s.section_number(),
364                        s.last_section_number(),
365                    )
366                }
367                (PID::SDT, false) => {
368                    let s = SDT::try_new(buf)?;
369                    (
370                        s.subtable_id(),
371                        s.sz(),
372                        s.section_number(),
373                        s.last_section_number(),
374                    )
375                }
376                (PID::EIT, false) => {
377                    let s = EIT::try_new(buf)?;
378                    (
379                        s.subtable_id(),
380                        s.sz(),
381                        s.section_number(),
382                        s.last_section_number(),
383                    )
384                }
385                (_, true) => {
386                    let s = PMT::try_new(buf)?;
387                    (
388                        s.subtable_id(),
389                        s.sz(),
390                        s.section_number(),
391                        s.last_section_number(),
392                    )
393                }
394                _ => unreachable!(),
395            };
396
397            let table = tables
398                .map
399                .entry(id)
400                .or_insert_with(|| Table::new(last_section_number));
401
402            let section_ref = match table.sections.get_mut(section_number) {
403                Some(section_ref) => {
404                    let mut section = (*section_ref).borrow_mut();
405                    section.buf.reset();
406                    section.sz = sz;
407
408                    section_ref.clone()
409                }
410                None => {
411                    let section_ref = Section::new(id, section_number, sz).into_ref();
412                    table.sections.push(section_ref.clone());
413                    section_ref
414                }
415            };
416
417            tables.current = Some(section_ref);
418        }
419
420        if let Some(section_ref) = &tables.current {
421            {
422                let mut section = (*section_ref).borrow_mut();
423                let sz_need = section.sz_need();
424
425                // remove null/padding bytes
426                let buf = if buf.len() > sz_need {
427                    &buf[0..sz_need]
428                } else {
429                    buf
430                };
431
432                section.buf.0.write_all(buf)?;
433            }
434
435            {
436                let section = (*section_ref).borrow();
437                if section.done() {
438                    if let Some(table) = tables.map.get(&section.table_id) {
439                        if table.done() {
440                            // emit
441                            self.events.on_table(section.table_id, &table);
442                        }
443                    }
444                }
445            }
446        }
447
448        Ok(())
449    }
450
451    pub fn demux(&mut self, raw: &[u8]) -> Result<()> {
452        if self.demux_tables(raw)? {
453            return Ok(());
454        }
455
456        self.demux_packets(raw)
457    }
458
459    /// ffmpeg::avformat_open_input analog
460    /// probe input
461    /// return: is pid handled?
462    pub fn demux_tables(&mut self, raw: &[u8]) -> Result<(bool)> {
463        self.offset += raw.len();
464
465        let pkt = TsPacket::new(&raw)?;
466        let pid = pkt.pid();
467
468        if pid.is_null() {
469            // null packet PID
470            return Ok(true);
471        }
472
473        match pid {
474            PID::PAT => {
475                self.demux_section((pid, false), &pkt)?;
476
477                // extract pids from PAT
478                if self.pmt_pids.0.is_empty() {
479                    self.pmt_pids.0.clear();
480                    self.packets.0.clear();
481                    self.build_pmt_pids();
482                }
483            }
484            PID::SDT | PID::EIT /* | PID::NIT | PID::CAT | PID::BAT */ =>
485                self.demux_section((pid, false), &pkt)?,
486
487            PID::Other(..) => {
488                // PAT not ready yet
489                // wait for PAT
490                if self.pmt_pids.0.is_empty() {
491                    return Ok(true);
492                }
493
494                match self.pmt_pids.is_packet_builded(pid) {
495                    Some(true) => { // got PMT and already builded
496                        self.demux_section((pid, true), &pkt)?;
497                    },
498                    Some(false) => { // got PMT and not builded
499                        self.demux_section((pid, true), &pkt)?;
500
501                        self.build_packets();
502
503                        self.pmt_pids.set_is_packet_builded(pid, true);
504                    },
505                    None => {return Ok(false); }
506                }
507            }
508            _ => {}
509        }
510
511        Ok(true)
512    }
513
514    /// ffmpeg::av_read_frame analog
515    pub fn demux_packets(&mut self, raw: &[u8]) -> Result<()> {
516        self.offset += raw.len();
517
518        let pkt = TsPacket::new(&raw)?;
519        let pid = pkt.pid();
520
521        if pid.is_null() // null packet PID
522        && !pid.is_other() // PID is section/table PID
523        // PAT not ready yet
524        // wait for pat
525        && !self.pmt_pids.0.is_empty()
526        {
527            return Ok(());
528        }
529
530        let mut packet = match self.packets.0.get_mut(&pid) {
531            Some(packet) => packet,
532            None => return Ok(()), // packet is not builder - wait fot PMT
533        };
534
535        let mut buf = pkt.buf_payload_pes()?;
536
537        if pkt.pusi() {
538            let pes = PES::new(buf);
539
540            if !packet.buf.is_empty() {
541                // emit
542                self.events.on_packet(packet);
543            }
544
545            packet.buf.reset();
546            packet.started = true;
547            packet.offset += self.offset + raw.len() - buf.len();
548            packet.pts = pes.pts().map(Duration::from);
549            packet.dts = pes.dts().map(Duration::from);
550
551            buf = pes.buf_seek_payload();
552        }
553
554        if packet.started {
555            packet.buf.0.write_all(buf)?;
556        }
557
558        Ok(())
559    }
560}
561
// Placeholder: no unit tests have been written for this module yet.
#[cfg(test)]
mod tests {}