1use std::cell::RefCell;
2use std::collections::HashMap;
3use std::io::{Cursor, Write};
4use std::rc::Rc;
5use std::time::Duration;
6
7use crate::packet::Packet as TsPacket;
8use crate::pes::PES;
9use crate::pid::PID;
10use crate::result::Result;
11use crate::section::{WithHeader, WithSyntaxSection};
12use crate::subtable_id::{SubtableID, SubtableIDer};
13use crate::{EIT, PAT, PMT, SDT};
14
/// Growable in-memory byte buffer.
///
/// The cursor position doubles as the "bytes written since the last reset"
/// counter, which is what [`Buf::sz`] and `is_empty` report.
pub struct Buf(pub Cursor<Vec<u8>>);

impl Buf {
    /// Discards the stored bytes and rewinds the cursor to the start.
    #[inline(always)]
    fn reset(&mut self) {
        let cursor = &mut self.0;
        cursor.get_mut().clear();
        cursor.set_position(0);
    }

    /// Returns `true` while the cursor is still at position zero.
    #[inline(always)]
    fn is_empty(&self) -> bool {
        0 == self.0.position()
    }

    /// Current cursor position, i.e. the number of bytes written so far.
    #[inline(always)]
    pub fn sz(&self) -> usize {
        let written = self.0.position();
        written as usize
    }
}

impl Default for Buf {
    /// Creates an empty buffer with 2048 bytes of capacity reserved up front.
    fn default() -> Self {
        let storage = Vec::with_capacity(2048);
        Buf(Cursor::new(storage))
    }
}
40
41pub struct Section {
42 table_id: SubtableID,
44
45 number: u8,
47
48 sz: usize,
50
51 pub buf: Buf,
52}
53
54impl Section {
55 fn new(table_id: SubtableID, number: u8, sz: usize) -> Section {
56 Section {
57 table_id,
58 number,
59 sz,
60 buf: Default::default(),
61 }
62 }
63
64 #[inline(always)]
65 fn into_ref(self) -> SectionRef {
66 Rc::new(RefCell::new(Box::new(self)))
67 }
68
69 #[inline(always)]
71 fn done(&self) -> bool {
72 self.sz_need() == 0
73 }
74
75 #[inline(always)]
77 fn sz_need(&self) -> usize {
78 self.sz - self.buf.sz()
79 }
80}
81
82type SectionRef = Rc<RefCell<Box<Section>>>;
83
84pub struct Sections(pub Vec<SectionRef>);
85
86impl Sections {
87 #[inline(always)]
88 fn get_mut(&mut self, number: u8) -> Option<&mut SectionRef> {
89 self.0.iter_mut().find(|s| s.borrow().number == number)
90 }
91
92 #[inline(always)]
93 fn push(&mut self, s: SectionRef) {
94 self.0.push(s);
95 self.0
96 .sort_unstable_by(|a, b| a.borrow().number.cmp(&b.borrow().number));
97 }
98}
99
100impl Default for Sections {
101 fn default() -> Self {
102 Sections(Vec::with_capacity(1))
103 }
104}
105
106pub struct Table {
107 last_section_number: u8,
109 pub sections: Sections,
110}
111
112impl Table {
113 fn new(last_section_number: u8) -> Table {
114 Table {
115 last_section_number,
116 sections: Default::default(),
117 }
118 }
119
120 #[inline(always)]
121 fn done(&self) -> bool {
122 match self.sections.0.len() {
123 0 => false,
124 n => {
125 let last = (&self.sections.0[n - 1]).borrow();
126 let first = (&self.sections.0[0]).borrow();
127
128 first.number == 0
129 && last.number == self.last_section_number
130 && first.done()
131 && last.done()
132 }
133 }
134 }
135}
136
137struct Tables {
138 map: HashMap<SubtableID, Table>,
139 current: Option<SectionRef>,
141}
142
143impl Tables {}
144
145impl Default for Tables {
146 fn default() -> Self {
147 Tables {
148 map: HashMap::new(),
149 current: None,
150 }
151 }
152}
153
154pub struct Packet {
155 pub pid: PID,
156
157 pub offset: usize,
158
159 pub pts: Option<Duration>,
161
162 pub dts: Option<Duration>,
164
165 pub buf: Buf,
166
167 started: bool,
169}
170
171impl Packet {
172 fn new(pid: PID) -> Packet {
173 Packet {
174 pid,
175 offset: 0,
176 pts: None,
177 dts: None,
178 buf: Default::default(),
179 started: false,
180 }
181 }
182}
183
184#[derive(Default)]
185struct Packets(HashMap<PID, Packet>);
186
187#[derive(Debug)]
189struct PMTPids(Vec<(PID, bool)>);
190
191impl PMTPids {
192 #[inline(always)]
193 fn has(&self, pid: PID) -> bool {
194 self.0.iter().any(|p| (*p).0 == pid)
195 }
196
197 #[inline(always)]
198 fn push_uniq(&mut self, pid: PID) {
199 if !self.has(pid) {
200 self.0.push((pid, false))
201 }
202 }
203
204 #[inline(always)]
206 fn is_packet_builded(&self, pid: PID) -> Option<bool> {
207 self.0.iter().find(|p| p.0 == pid).map(|p| p.1)
208 }
209
210 #[inline(always)]
211 fn set_is_packet_builded(&mut self, pid: PID, v: bool) {
212 if let Some(p) = self.0.iter_mut().find(|p| p.0 == pid) {
213 p.1 = v;
214 }
215 }
216
217 #[inline(always)]
219 #[allow(dead_code)]
220 fn are_all_packets_builded(&self) -> bool {
221 !self.0.iter().any(|p| !(*p).1)
222 }
223}
224
225impl Default for PMTPids {
226 fn default() -> Self {
227 PMTPids(Vec::with_capacity(3))
228 }
229}
230
231pub trait DemuxerEvents {
232 fn on_table(&mut self, _: SubtableID, _: &Table) {}
233 fn on_packet(&mut self, _: &Packet) {}
234}
235
236pub struct Demuxer<T>
239where
240 T: DemuxerEvents,
241{
242 offset: usize,
243
244 pat: Tables,
245 pmt: Tables,
246 eit: Tables,
247 sdt: Tables,
248
249 #[allow(dead_code)]
250 nit: Tables,
251 #[allow(dead_code)]
252 cat: Tables,
253 #[allow(dead_code)]
254 bat: Tables,
255
256 packets: Packets,
257
258 pmt_pids: PMTPids,
261
262 events: T,
263}
264
265unsafe impl<T> Send for Demuxer<T> where T: DemuxerEvents + Send {}
266
267impl<T> Demuxer<T>
268where
269 T: DemuxerEvents,
270{
271 pub fn new(events: T) -> Demuxer<T> {
272 Demuxer {
273 offset: 0,
274
275 pat: Default::default(),
276 pmt: Default::default(),
277 eit: Default::default(),
278 sdt: Default::default(),
279 nit: Default::default(),
280 cat: Default::default(),
281 bat: Default::default(),
282
283 pmt_pids: Default::default(),
284
285 packets: Default::default(),
286
287 events,
288 }
289 }
290
291 #[inline(always)]
295 fn build_pmt_pids(&mut self) {
296 for (_, table) in self.pat.map.iter() {
297 for section_ref in table.sections.0.iter() {
298 let section = (*section_ref).borrow();
299 let raw = section.buf.0.get_ref().as_slice();
300 let pat = PAT::new(raw);
301
302 for pid in pat
304 .programs()
305 .filter_map(Result::ok)
306 .filter(|p| p.pid().is_program_map())
307 .map(|p| PID::from(p.pid()))
308 {
309 self.pmt_pids.push_uniq(pid)
310 }
311 }
312 }
313 }
314
315 #[inline(always)]
319 fn build_packets(&mut self) {
320 for (_, table) in self.pmt.map.iter() {
321 for section_ref in table.sections.0.iter() {
322 let section = (*section_ref).borrow();
323 let raw = section.buf.0.get_ref().as_slice();
324 let pmt = PMT::new(raw);
325
326 for pid in pmt
328 .streams()
329 .filter_map(Result::ok)
330 .map(|s| PID::from(s.pid()))
331 {
332 self.packets
333 .0
334 .entry(pid)
335 .or_insert_with(|| Packet::new(pid));
336 }
337 }
338 }
339 }
340
341 #[inline(always)]
343 fn demux_section(&mut self, pid_or_pmt: (PID, bool), pkt: &TsPacket) -> Result<()> {
344 let tables = match pid_or_pmt {
345 (PID::PAT, false) => &mut self.pat,
346 (PID::SDT, false) => &mut self.sdt,
347 (PID::EIT, false) => &mut self.eit,
348 (PID::NIT, false) => &mut self.nit,
349 (PID::CAT, false) => &mut self.cat,
350 (_, true) => &mut self.pmt,
351 _ => unreachable!(),
352 };
353
354 let buf = pkt.buf_payload_section()?;
355
356 if pkt.pusi() {
357 let (id, sz, section_number, last_section_number) = match pid_or_pmt {
358 (PID::PAT, false) => {
359 let s = PAT::try_new(buf)?;
360 (
361 s.subtable_id(),
362 s.sz(),
363 s.section_number(),
364 s.last_section_number(),
365 )
366 }
367 (PID::SDT, false) => {
368 let s = SDT::try_new(buf)?;
369 (
370 s.subtable_id(),
371 s.sz(),
372 s.section_number(),
373 s.last_section_number(),
374 )
375 }
376 (PID::EIT, false) => {
377 let s = EIT::try_new(buf)?;
378 (
379 s.subtable_id(),
380 s.sz(),
381 s.section_number(),
382 s.last_section_number(),
383 )
384 }
385 (_, true) => {
386 let s = PMT::try_new(buf)?;
387 (
388 s.subtable_id(),
389 s.sz(),
390 s.section_number(),
391 s.last_section_number(),
392 )
393 }
394 _ => unreachable!(),
395 };
396
397 let table = tables
398 .map
399 .entry(id)
400 .or_insert_with(|| Table::new(last_section_number));
401
402 let section_ref = match table.sections.get_mut(section_number) {
403 Some(section_ref) => {
404 let mut section = (*section_ref).borrow_mut();
405 section.buf.reset();
406 section.sz = sz;
407
408 section_ref.clone()
409 }
410 None => {
411 let section_ref = Section::new(id, section_number, sz).into_ref();
412 table.sections.push(section_ref.clone());
413 section_ref
414 }
415 };
416
417 tables.current = Some(section_ref);
418 }
419
420 if let Some(section_ref) = &tables.current {
421 {
422 let mut section = (*section_ref).borrow_mut();
423 let sz_need = section.sz_need();
424
425 let buf = if buf.len() > sz_need {
427 &buf[0..sz_need]
428 } else {
429 buf
430 };
431
432 section.buf.0.write_all(buf)?;
433 }
434
435 {
436 let section = (*section_ref).borrow();
437 if section.done() {
438 if let Some(table) = tables.map.get(§ion.table_id) {
439 if table.done() {
440 self.events.on_table(section.table_id, &table);
442 }
443 }
444 }
445 }
446 }
447
448 Ok(())
449 }
450
451 pub fn demux(&mut self, raw: &[u8]) -> Result<()> {
452 if self.demux_tables(raw)? {
453 return Ok(());
454 }
455
456 self.demux_packets(raw)
457 }
458
459 pub fn demux_tables(&mut self, raw: &[u8]) -> Result<(bool)> {
463 self.offset += raw.len();
464
465 let pkt = TsPacket::new(&raw)?;
466 let pid = pkt.pid();
467
468 if pid.is_null() {
469 return Ok(true);
471 }
472
473 match pid {
474 PID::PAT => {
475 self.demux_section((pid, false), &pkt)?;
476
477 if self.pmt_pids.0.is_empty() {
479 self.pmt_pids.0.clear();
480 self.packets.0.clear();
481 self.build_pmt_pids();
482 }
483 }
484 PID::SDT | PID::EIT =>
485 self.demux_section((pid, false), &pkt)?,
486
487 PID::Other(..) => {
488 if self.pmt_pids.0.is_empty() {
491 return Ok(true);
492 }
493
494 match self.pmt_pids.is_packet_builded(pid) {
495 Some(true) => { self.demux_section((pid, true), &pkt)?;
497 },
498 Some(false) => { self.demux_section((pid, true), &pkt)?;
500
501 self.build_packets();
502
503 self.pmt_pids.set_is_packet_builded(pid, true);
504 },
505 None => {return Ok(false); }
506 }
507 }
508 _ => {}
509 }
510
511 Ok(true)
512 }
513
514 pub fn demux_packets(&mut self, raw: &[u8]) -> Result<()> {
516 self.offset += raw.len();
517
518 let pkt = TsPacket::new(&raw)?;
519 let pid = pkt.pid();
520
521 if pid.is_null() && !pid.is_other() && !self.pmt_pids.0.is_empty()
526 {
527 return Ok(());
528 }
529
530 let mut packet = match self.packets.0.get_mut(&pid) {
531 Some(packet) => packet,
532 None => return Ok(()), };
534
535 let mut buf = pkt.buf_payload_pes()?;
536
537 if pkt.pusi() {
538 let pes = PES::new(buf);
539
540 if !packet.buf.is_empty() {
541 self.events.on_packet(packet);
543 }
544
545 packet.buf.reset();
546 packet.started = true;
547 packet.offset += self.offset + raw.len() - buf.len();
548 packet.pts = pes.pts().map(Duration::from);
549 packet.dts = pes.dts().map(Duration::from);
550
551 buf = pes.buf_seek_payload();
552 }
553
554 if packet.started {
555 packet.buf.0.write_all(buf)?;
556 }
557
558 Ok(())
559 }
560}
561
562#[cfg(test)]
563mod tests {}