1use std::collections::{HashMap, VecDeque};
80
81use bytes::Bytes;
82
83use crate::pid::Pid;
84use crate::ts::{SectionReassembler, TsPacket};
85
/// `table_id` of the Program Association Table; a changed PAT triggers PMT PID following.
const PAT_TABLE_ID: u8 = 0x00;
/// `table_id` of the Time Offset Table, which is CRC-checked even though it is short-form.
const TOT_TABLE_ID: u8 = 0x73;
/// Smallest section the demux will look at: `table_id` plus the two flag/length bytes.
const MIN_SECTION_LEN: usize = 3;
/// Long-form header bytes that follow the first three: extension (2), version byte,
/// `section_number`, `last_section_number`.
const LONG_FORM_EXTRA: usize = 5;
/// Size in bytes of the trailing CRC-32.
const CRC_LEN: usize = 4;
96
/// A complete section handed out by [`SiDemux::feed`], tagged with the PID it arrived on.
#[derive(Debug, Clone)]
pub struct SectionEvent {
    /// PID of the transport packets the section was reassembled from.
    pid: Pid,
    /// Raw section bytes; index 0 is the `table_id`.
    bytes: Bytes,
}
108
109impl SectionEvent {
110 #[must_use]
112 pub fn pid(&self) -> Pid {
113 self.pid
114 }
115
116 #[must_use]
118 pub fn bytes(&self) -> &Bytes {
119 &self.bytes
120 }
121
122 #[must_use]
125 pub fn table_id(&self) -> u8 {
126 self.bytes[0]
127 }
128
129 #[must_use]
132 fn is_long_form(&self) -> bool {
133 (self.bytes[1] & 0x80) != 0
134 }
135
136 #[must_use]
140 pub fn version(&self) -> Option<u8> {
141 if self.is_long_form() && self.bytes.len() > 5 {
142 Some((self.bytes[5] >> 1) & 0x1F)
143 } else {
144 None
145 }
146 }
147
148 #[must_use]
150 pub fn table_id_extension(&self) -> Option<u16> {
151 if self.is_long_form() && self.bytes.len() > 4 {
152 Some(((self.bytes[3] as u16) << 8) | self.bytes[4] as u16)
153 } else {
154 None
155 }
156 }
157
158 #[must_use]
160 pub fn section_number(&self) -> Option<u8> {
161 if self.is_long_form() && self.bytes.len() > 6 {
162 Some(self.bytes[6])
163 } else {
164 None
165 }
166 }
167
168 #[must_use]
171 pub fn crc_ok(&self) -> bool {
172 true
173 }
174
175 pub fn table_section(&self) -> crate::Result<crate::tables::AnyTableSection<'_>> {
180 crate::tables::AnyTableSection::parse(&self.bytes)
181 }
182
183 pub fn table_section_with(
192 &self,
193 registry: &crate::tables::registry::TableRegistry,
194 ) -> crate::Result<crate::tables::AnyTableSection<'_>> {
195 crate::tables::AnyTableSection::parse_with(registry, &self.bytes)
196 }
197
198 pub fn parse<'s, T: crate::traits::TableDef<'s>>(&'s self) -> crate::Result<T> {
203 <T as dvb_common::Parse>::parse(&self.bytes)
204 }
205}
206
/// Running counters maintained by [`SiDemux`]; read them with [`SiDemux::stats`].
#[non_exhaustive]
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct Stats {
    /// Packets passed to `feed`, including ones that failed to parse.
    pub packets: u64,
    /// Sections popped from a reassembler, before CRC checking and gating.
    pub sections_completed: u64,
    /// Sections handed out as events.
    pub emitted: u64,
    /// Sections withheld because they matched the remembered version and CRC.
    pub suppressed: u64,
    /// Sections dropped for a CRC mismatch or for being too short to inspect.
    pub crc_failures: u64,
    /// Packets rejected by `TsPacket::parse`.
    pub malformed_packets: u64,
    /// Gate entries discarded to make room for a new key.
    pub gate_evictions: u64,
}
228
/// What the gate remembers about the last section seen for a key; a section counts
/// as changed when either field differs.
#[derive(Clone, Copy, PartialEq, Eq)]
struct GateEntry {
    /// Version number of the section (0 when no long-form header was read).
    version: u8,
    /// CRC-32 fingerprint of the section content.
    crc: u32,
}
238
/// Runtime settings copied out of the builder when the demux is built.
struct Config {
    /// Register PIDs listed in a changed PAT.
    follow_pat: bool,
    /// Emit sections even when the gate says they are unchanged.
    emit_repeats: bool,
    /// Gate size at which inserting a new key first evicts the oldest one.
    gate_capacity: usize,
}
245
/// Builder for [`SiDemux`]; start from [`SiDemux::builder`] or `Default::default()`.
pub struct SiDemuxBuilder {
    /// Register PIDs listed in a changed PAT.
    follow_pat: bool,
    /// Pre-register the well-known DVB SI PIDs when building.
    dvb_si_pids: bool,
    /// Emit sections even when they are unchanged.
    emit_repeats: bool,
    /// Gate size at which inserting a new key first evicts the oldest one.
    gate_capacity: usize,
    /// Extra PIDs added through [`SiDemuxBuilder::pid`].
    extra_pids: Vec<Pid>,
}
257
258impl Default for SiDemuxBuilder {
259 fn default() -> Self {
260 Self {
261 follow_pat: true,
262 dvb_si_pids: true,
263 emit_repeats: false,
264 gate_capacity: 65_536,
265 extra_pids: Vec::new(),
266 }
267 }
268}
269
270impl SiDemuxBuilder {
271 #[must_use]
274 pub fn follow_pat(mut self, on: bool) -> Self {
275 self.follow_pat = on;
276 self
277 }
278
279 #[must_use]
282 pub fn dvb_si_pids(mut self, on: bool) -> Self {
283 self.dvb_si_pids = on;
284 self
285 }
286
287 #[must_use]
289 pub fn pid(mut self, pid: Pid) -> Self {
290 self.extra_pids.push(pid);
291 self
292 }
293
294 #[must_use]
297 pub fn emit_repeats(mut self, on: bool) -> Self {
298 self.emit_repeats = on;
299 self
300 }
301
302 #[must_use]
305 pub fn gate_capacity(mut self, cap: usize) -> Self {
306 self.gate_capacity = cap;
307 self
308 }
309
310 #[must_use]
312 pub fn build(self) -> SiDemux {
313 let mut pids: HashMap<Pid, SectionReassembler> = HashMap::new();
314 if self.dvb_si_pids {
315 use crate::pid::well_known as wk;
316 for pid in [
317 wk::PAT,
318 wk::CAT,
319 wk::NIT,
320 wk::SDT_BAT,
321 wk::EIT,
322 wk::RST,
323 wk::TDT_TOT,
324 wk::SAT,
325 ] {
326 pids.entry(pid).or_default();
327 }
328 }
329 for p in self.extra_pids {
330 pids.entry(p).or_default();
331 }
332 SiDemux {
333 pids,
334 gate: HashMap::new(),
335 gate_order: VecDeque::new(),
336 cfg: Config {
337 follow_pat: self.follow_pat,
338 emit_repeats: self.emit_repeats,
339 gate_capacity: self.gate_capacity,
340 },
341 stats: Stats::default(),
342 scratch: Vec::new(),
343 completed_scratch: Vec::new(),
344 }
345 }
346}
347
/// Section demultiplexer: reassembles sections on the monitored PIDs, checks their
/// CRC and emits only those that are new or changed (unless repeats are enabled).
pub struct SiDemux {
    /// One reassembler per monitored PID; packets on other PIDs yield no sections.
    pids: HashMap<Pid, SectionReassembler>,
    /// Last seen version/CRC per packed (PID, table_id, extension, section_number) key.
    gate: HashMap<u64, GateEntry>,
    /// Gate keys in insertion order; the front is the next eviction candidate.
    gate_order: VecDeque<u64>,
    /// Settings fixed at build time.
    cfg: Config,
    /// Running counters.
    stats: Stats,
    /// Events produced by the current `feed` call; cleared at the start of each call.
    scratch: Vec<SectionEvent>,
    /// Reusable buffer for sections popped from a reassembler during `feed`.
    completed_scratch: Vec<Bytes>,
}
363
364impl SiDemux {
365 #[must_use]
367 pub fn builder() -> SiDemuxBuilder {
368 SiDemuxBuilder::default()
369 }
370
371 #[must_use]
373 pub fn stats(&self) -> Stats {
374 self.stats
375 }
376
377 pub fn feed(&mut self, packet: &[u8]) -> impl Iterator<Item = SectionEvent> + '_ {
381 self.scratch.clear();
382 self.stats.packets += 1;
383
384 match TsPacket::parse(packet) {
385 Err(_) => self.stats.malformed_packets += 1,
386 Ok(ts) => {
387 let pid = Pid::new(ts.header.pid);
388 let payload = ts.payload.unwrap_or(&[]);
389 let mut completed = std::mem::take(&mut self.completed_scratch);
396 if let Some(reasm) = self.pids.get_mut(&pid) {
397 reasm.feed(payload, ts.header.pusi);
398 while let Some(section) = reasm.pop_section() {
399 completed.push(section);
400 }
401 }
402 self.stats.sections_completed += completed.len() as u64;
403 for section in completed.drain(..) {
404 self.consider(pid, section);
405 }
406 self.completed_scratch = completed;
407 }
408 }
409
410 self.scratch.drain(..)
411 }
412
413 fn consider(&mut self, pid: Pid, section: Bytes) {
415 if section.len() < MIN_SECTION_LEN {
420 self.stats.crc_failures += 1;
421 return;
422 }
423
424 let table_id = section[0];
425 let long_form = (section[1] & 0x80) != 0;
426 let has_crc = long_form || table_id == TOT_TABLE_ID;
428
429 if has_crc {
431 if section.len() < CRC_LEN {
432 self.stats.crc_failures += 1;
433 return;
434 }
435 let covered = §ion[..section.len() - CRC_LEN];
436 let declared = u32::from_be_bytes([
437 section[section.len() - 4],
438 section[section.len() - 3],
439 section[section.len() - 2],
440 section[section.len() - 1],
441 ]);
442 let computed = dvb_common::crc32_mpeg2::compute(covered);
443 if computed != declared {
444 self.stats.crc_failures += 1;
445 return;
446 }
447 }
448
449 let (ext, section_number, version, change_crc) =
451 if long_form && section.len() >= MIN_SECTION_LEN + LONG_FORM_EXTRA + CRC_LEN {
452 let ext = ((section[3] as u16) << 8) | section[4] as u16;
453 let version = (section[5] >> 1) & 0x1F;
454 let section_number = section[6];
455 let crc = u32::from_be_bytes([
458 section[section.len() - 4],
459 section[section.len() - 3],
460 section[section.len() - 2],
461 section[section.len() - 1],
462 ]);
463 (ext, section_number, version, crc)
464 } else {
465 (0u16, 0u8, 0u8, dvb_common::crc32_mpeg2::compute(§ion))
469 };
470
471 let key = (pid.value() as u64)
472 | ((table_id as u64) << 13)
473 | ((ext as u64) << 21)
474 | ((section_number as u64) << 37);
475
476 let entry = GateEntry {
477 version,
478 crc: change_crc,
479 };
480
481 let changed = match self.gate.get(&key) {
482 Some(prev) => *prev != entry,
483 None => true,
484 };
485
486 let is_new = !self.gate.contains_key(&key);
488 if is_new && self.gate.len() >= self.cfg.gate_capacity {
489 if let Some(old) = self.gate_order.pop_front() {
490 self.gate.remove(&old);
491 self.stats.gate_evictions += 1;
492 }
493 }
494 if is_new {
495 self.gate_order.push_back(key);
496 }
497 match self.gate.entry(key) {
498 std::collections::hash_map::Entry::Occupied(mut oe) => {
499 *oe.get_mut() = entry;
500 }
501 std::collections::hash_map::Entry::Vacant(ve) => {
502 ve.insert(entry);
503 }
504 }
505
506 if changed || self.cfg.emit_repeats {
507 let event = SectionEvent {
508 pid,
509 bytes: section,
510 };
511 if self.cfg.follow_pat && changed && table_id == PAT_TABLE_ID {
513 self.follow_pat(&event);
514 }
515 self.stats.emitted += 1;
516 self.scratch.push(event);
517 } else {
518 self.stats.suppressed += 1;
519 }
520 }
521
522 fn follow_pat(&mut self, event: &SectionEvent) {
526 use crate::tables::pat::PatSection;
527 use dvb_common::Parse;
528 if let Ok(pat) = PatSection::parse(&event.bytes) {
529 for entry in &pat.entries {
530 if entry.program_number != 0 {
531 self.pids.entry(Pid::new(entry.pid)).or_default();
532 }
533 }
534 }
535 }
536}
537
538#[cfg(test)]
539mod tests {
540 use super::*;
541 use crate::ts::{TsHeader, TS_PACKET_SIZE};
542
543 fn ts_packet(pid: u16, section: &[u8]) -> [u8; TS_PACKET_SIZE] {
546 let mut pkt = [0xFFu8; TS_PACKET_SIZE];
547 let header = TsHeader {
548 tei: false,
549 pusi: true,
550 pid,
551 scrambling: 0,
552 has_adaptation: false,
553 has_payload: true,
554 continuity_counter: 0,
555 };
556 header.serialize_into(&mut pkt).unwrap();
557 pkt[4] = 0x00; let start = 5;
559 assert!(start + section.len() <= TS_PACKET_SIZE, "section too big");
560 pkt[start..start + section.len()].copy_from_slice(section);
561 pkt
562 }
563
564 fn long_section(
566 table_id: u8,
567 ext: u16,
568 version: u8,
569 section_number: u8,
570 payload: &[u8],
571 ) -> Vec<u8> {
572 let section_length = (LONG_FORM_EXTRA + payload.len() + CRC_LEN) as u16;
573 let mut v = vec![
574 table_id,
575 0x80 | 0x30 | ((section_length >> 8) as u8 & 0x0F),
576 (section_length & 0xFF) as u8,
577 (ext >> 8) as u8,
578 (ext & 0xFF) as u8,
579 0xC0 | ((version & 0x1F) << 1) | 0x01,
580 section_number,
581 section_number, ];
583 v.extend_from_slice(payload);
584 let crc = dvb_common::crc32_mpeg2::compute(&v);
585 v.extend_from_slice(&crc.to_be_bytes());
586 v
587 }
588
589 fn pat_section(tsid: u16, version: u8, entries: &[(u16, u16)]) -> Vec<u8> {
591 let mut body = Vec::new();
592 for &(pn, pid) in entries {
593 body.extend_from_slice(&pn.to_be_bytes());
594 body.push(0xE0 | ((pid >> 8) as u8 & 0x1F));
595 body.push((pid & 0xFF) as u8);
596 }
597 long_section(0x00, tsid, version, 0, &body)
598 }
599
600 fn pmt_section(program_number: u16, version: u8, pcr_pid: u16) -> Vec<u8> {
602 let body = [
605 0xE0 | ((pcr_pid >> 8) as u8 & 0x1F),
606 (pcr_pid & 0xFF) as u8,
607 0xF0,
608 0x00,
609 0x02,
610 0xE0 | (((pcr_pid + 1) >> 8) as u8 & 0x1F),
611 ((pcr_pid + 1) & 0xFF) as u8,
612 0xF0,
613 0x00,
614 ];
615 long_section(0x02, program_number, version, 0, &body)
616 }
617
618 #[test]
619 fn pat_emits_once_suppresses_repeat_reemits_on_version_change() {
620 let mut demux = SiDemux::builder().build();
621
622 let pat_v0 = pat_section(0x0001, 0, &[(1, 0x0100)]);
623 let pat_v1 = pat_section(0x0001, 1, &[(1, 0x0100)]);
624
625 let pkt_v0 = ts_packet(0x0000, &pat_v0);
626 let pkt_v1 = ts_packet(0x0000, &pat_v1);
627
628 let n0: Vec<_> = demux.feed(&pkt_v0).collect();
629 assert_eq!(n0.len(), 1, "PAT v0 should emit one event");
630 assert_eq!(n0[0].table_id(), 0x00);
631 assert_eq!(n0[0].version(), Some(0));
632
633 let n1: Vec<_> = demux.feed(&pkt_v0).collect();
634 assert_eq!(n1.len(), 0, "repeat PAT should be suppressed");
635
636 let n2: Vec<_> = demux.feed(&pkt_v1).collect();
637 assert_eq!(n2.len(), 1, "PAT v1 should re-emit");
638 assert_eq!(n2[0].version(), Some(1));
639
640 let s = demux.stats();
641 assert_eq!(s.sections_completed, 3);
642 assert_eq!(s.emitted, 2);
643 assert_eq!(s.suppressed, 1);
644 assert_eq!(s.crc_failures, 0);
645 }
646
647 #[test]
648 fn follow_pat_registers_pmt_pid_and_emits_typed_pmt() {
649 use crate::tables::AnyTableSection;
650 let mut demux = SiDemux::builder().build();
651
652 let pat = pat_section(0x0001, 0, &[(1, 0x0100)]);
654 let pat_evts: Vec<_> = demux.feed(&ts_packet(0x0000, &pat)).collect();
655 assert_eq!(pat_evts.len(), 1);
656
657 let pmt = pmt_section(1, 0, 0x0100);
660 let pmt_evts: Vec<_> = demux.feed(&ts_packet(0x0100, &pmt)).collect();
661 assert_eq!(pmt_evts.len(), 1, "PMT on the followed PID should emit");
662 assert_eq!(pmt_evts[0].pid(), Pid::new(0x0100));
663 match pmt_evts[0].table_section().unwrap() {
664 AnyTableSection::PmtSection(p) => assert_eq!(p.program_number, 1),
665 other => panic!("expected PmtSection, got {other:?}"),
666 }
667 }
668
669 #[test]
670 fn corrupted_crc_sdt_dropped_and_counted() {
671 let mut demux = SiDemux::builder().build();
672 let mut sdt = long_section(0x42, 0x0001, 0, 0, &[0xDE, 0xAD, 0xBE, 0xEF]);
674 sdt[8] ^= 0xFF;
676 let evts: Vec<_> = demux.feed(&ts_packet(0x0011, &sdt)).collect();
677 assert_eq!(evts.len(), 0, "corrupted SDT must not emit");
678 let s = demux.stats();
679 assert_eq!(s.crc_failures, 1);
680 assert_eq!(s.emitted, 0);
681 assert_eq!(s.sections_completed, 1);
682 }
683
684 #[test]
685 fn gate_capacity_evicts_fifo_and_reemits() {
686 let mut demux = SiDemux::builder().gate_capacity(2).build();
687
688 let a = long_section(0x4E, 0x0001, 0, 0, &[0x01]);
691 let b = long_section(0x4E, 0x0002, 0, 0, &[0x02]);
692 let c = long_section(0x4E, 0x0003, 0, 0, &[0x03]);
693
694 assert_eq!(demux.feed(&ts_packet(0x0012, &a)).count(), 1);
695 assert_eq!(demux.feed(&ts_packet(0x0012, &b)).count(), 1);
696 assert_eq!(demux.feed(&ts_packet(0x0012, &c)).count(), 1);
698 assert_eq!(demux.stats().gate_evictions, 1);
699
700 assert_eq!(demux.feed(&ts_packet(0x0012, &a)).count(), 1);
702 }
703
704 #[test]
705 fn garbage_packet_counted_no_panic() {
706 let mut demux = SiDemux::builder().build();
707 let garbage = [0x00u8; TS_PACKET_SIZE]; let evts: Vec<_> = demux.feed(&garbage).collect();
709 assert_eq!(evts.len(), 0);
710 assert_eq!(demux.stats().malformed_packets, 1);
711 assert_eq!(demux.stats().packets, 1);
712 }
713
714 #[test]
715 fn emit_repeats_bypasses_suppression() {
716 let mut demux = SiDemux::builder().emit_repeats(true).build();
717 let pat = pat_section(0x0001, 0, &[(1, 0x0100)]);
718 let pkt = ts_packet(0x0000, &pat);
719 assert_eq!(demux.feed(&pkt).count(), 1);
720 assert_eq!(demux.feed(&pkt).count(), 1, "emit_repeats re-emits");
721 assert_eq!(demux.stats().suppressed, 0);
722 assert_eq!(demux.stats().emitted, 2);
723 }
724
725 #[test]
726 fn table_section_with_empty_registry_matches_table_section() {
727 use crate::tables::registry::TableRegistry;
728 use crate::tables::AnyTableSection;
729
730 let mut demux = SiDemux::builder().build();
731 let pat = pat_section(0x0001, 0, &[(1, 0x0100)]);
732 let evts: Vec<_> = demux.feed(&ts_packet(0x0000, &pat)).collect();
733 assert_eq!(evts.len(), 1);
734
735 let reg = TableRegistry::new();
736 let with_reg = evts[0].table_section_with(®).unwrap();
737 let without = evts[0].table_section().unwrap();
738 assert!(matches!(with_reg, AnyTableSection::PatSection(_)));
739 assert!(matches!(without, AnyTableSection::PatSection(_)));
740 }
741
742 #[test]
743 fn table_section_with_custom_registry_yields_other() {
744 use crate::tables::registry::TableRegistry;
745 use crate::tables::AnyTableSection;
746 use crate::traits::TableDef;
747 use dvb_common::Parse;
748
749 const PRIVATE_TID: u8 = 0x90;
750
751 #[derive(Debug)]
752 #[cfg_attr(feature = "serde", derive(serde::Serialize))]
753 struct PrivateTable {
754 table_id: u8,
755 }
756
757 impl<'a> Parse<'a> for PrivateTable {
758 type Error = crate::Error;
759 fn parse(bytes: &'a [u8]) -> crate::Result<Self> {
760 if bytes.is_empty() {
761 return Err(crate::Error::BufferTooShort {
762 need: 1,
763 have: 0,
764 what: "PrivateTable",
765 });
766 }
767 Ok(Self { table_id: bytes[0] })
768 }
769 }
770
771 impl<'a> TableDef<'a> for PrivateTable {
772 const TABLE_ID_RANGES: &'static [(u8, u8)] = &[(PRIVATE_TID, PRIVATE_TID)];
773 const NAME: &'static str = "PRIVATE_TABLE";
774 }
775
776 let mut reg = TableRegistry::new();
777 reg.register::<PrivateTable>();
778
779 let mut demux = SiDemux::builder()
780 .dvb_si_pids(false)
781 .pid(Pid::new(0x0200))
782 .build();
783
784 let section = long_section(PRIVATE_TID, 0x0001, 0, 0, &[0x42]);
785 let evts: Vec<_> = demux.feed(&ts_packet(0x0200, §ion)).collect();
786 assert_eq!(evts.len(), 1);
787
788 let result = evts[0].table_section_with(®).unwrap();
789 match result {
790 AnyTableSection::Other {
791 table_id,
792 ref value,
793 } => {
794 assert_eq!(table_id, PRIVATE_TID);
795 let pt = value.downcast_ref::<PrivateTable>().unwrap();
796 assert_eq!(pt.table_id, PRIVATE_TID);
797 }
798 other => panic!("expected Other, got {other:?}"),
799 }
800 }
801}