1use std::collections::{HashMap, VecDeque};
80
81use bytes::Bytes;
82
83use crate::pid::Pid;
84use crate::ts::{SectionReassembler, TsPacket};
85
/// `table_id` of a PAT section; a changed section with this id triggers PAT following.
const PAT_TABLE_ID: u8 = 0x00;
/// `table_id` of the TOT: the one short-form table that is CRC-checked by the demux.
const TOT_TABLE_ID: u8 = 0x73;
/// Smallest section the demux will look at: `table_id` plus the two flag/length bytes.
const MIN_SECTION_LEN: usize = 3;
/// Number of long-form header bytes that follow the 3-byte common header
/// (table_id_extension, version byte, section_number, last_section_number).
const LONG_FORM_EXTRA: usize = 5;
/// Size in bytes of the CRC-32 that trails a CRC-protected section.
const CRC_LEN: usize = 4;
96
97#[derive(Debug, Clone)]
104pub struct SectionEvent {
105 pid: Pid,
106 bytes: Bytes,
107}
108
109impl SectionEvent {
110 #[must_use]
112 pub fn pid(&self) -> Pid {
113 self.pid
114 }
115
116 #[must_use]
118 pub fn bytes(&self) -> &Bytes {
119 &self.bytes
120 }
121
122 #[must_use]
125 pub fn table_id(&self) -> u8 {
126 self.bytes[0]
127 }
128
129 #[must_use]
132 fn is_long_form(&self) -> bool {
133 (self.bytes[1] & 0x80) != 0
134 }
135
136 #[must_use]
140 pub fn version(&self) -> Option<u8> {
141 if self.is_long_form() && self.bytes.len() > 5 {
142 Some((self.bytes[5] >> 1) & 0x1F)
143 } else {
144 None
145 }
146 }
147
148 #[must_use]
150 pub fn table_id_extension(&self) -> Option<u16> {
151 if self.is_long_form() && self.bytes.len() > 4 {
152 Some(((self.bytes[3] as u16) << 8) | self.bytes[4] as u16)
153 } else {
154 None
155 }
156 }
157
158 #[must_use]
160 pub fn section_number(&self) -> Option<u8> {
161 if self.is_long_form() && self.bytes.len() > 6 {
162 Some(self.bytes[6])
163 } else {
164 None
165 }
166 }
167
168 #[must_use]
171 pub fn crc_ok(&self) -> bool {
172 true
173 }
174
175 pub fn table_section(&self) -> crate::Result<crate::tables::AnyTableSection<'_>> {
180 crate::tables::AnyTableSection::parse(&self.bytes)
181 }
182
183 pub fn table_section_with(
192 &self,
193 registry: &crate::tables::registry::TableRegistry,
194 ) -> crate::Result<crate::tables::AnyTableSection<'_>> {
195 crate::tables::AnyTableSection::parse_with(registry, &self.bytes)
196 }
197
198 pub fn parse<'s, T: crate::traits::TableDef<'s>>(&'s self) -> crate::Result<T> {
203 <T as dvb_common::Parse>::parse(&self.bytes)
204 }
205}
206
/// Running counters maintained by [`SiDemux`]; read them with [`SiDemux::stats`].
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub struct Stats {
    /// Calls to `feed`, i.e. packets offered (well-formed or not).
    pub packets: u64,
    /// Sections popped from a reassembler, before any validation.
    pub sections_completed: u64,
    /// Sections turned into events.
    pub emitted: u64,
    /// Sections withheld because the gate had already seen an identical one.
    pub suppressed: u64,
    /// Sections dropped for being too short or for a CRC mismatch.
    pub crc_failures: u64,
    /// Packets the TS packet parser rejected.
    pub malformed_packets: u64,
    /// Gate entries discarded to make room for a new key.
    pub gate_evictions: u64,
}
228
/// What the change gate remembers about the last section seen under a key.
/// A section counts as "changed" when either field differs.
#[derive(Copy, Clone, Eq, PartialEq)]
struct GateEntry {
    /// `version_number` of the remembered section (0 when it has none).
    version: u8,
    /// CRC-32 used as a content fingerprint for the remembered section.
    crc: u32,
}
238
/// Settings frozen into a [`SiDemux`] when the builder finishes.
struct Config {
    /// Register the PIDs listed in a changed PAT so their sections are reassembled too.
    follow_pat: bool,
    /// Emit every valid section, even when the gate says it is unchanged.
    emit_repeats: bool,
    /// Gate size at which inserting a new key first evicts the oldest one.
    gate_capacity: usize,
}
245
246pub struct SiDemuxBuilder {
251 follow_pat: bool,
252 dvb_si_pids: bool,
253 emit_repeats: bool,
254 gate_capacity: usize,
255 extra_pids: Vec<Pid>,
256}
257
258impl Default for SiDemuxBuilder {
259 fn default() -> Self {
260 Self {
261 follow_pat: true,
262 dvb_si_pids: true,
263 emit_repeats: false,
264 gate_capacity: 65_536,
265 extra_pids: Vec::new(),
266 }
267 }
268}
269
270impl SiDemuxBuilder {
271 #[must_use]
274 pub fn follow_pat(mut self, on: bool) -> Self {
275 self.follow_pat = on;
276 self
277 }
278
279 #[must_use]
282 pub fn dvb_si_pids(mut self, on: bool) -> Self {
283 self.dvb_si_pids = on;
284 self
285 }
286
287 #[must_use]
289 pub fn pid(mut self, pid: Pid) -> Self {
290 self.extra_pids.push(pid);
291 self
292 }
293
294 #[must_use]
297 pub fn emit_repeats(mut self, on: bool) -> Self {
298 self.emit_repeats = on;
299 self
300 }
301
302 #[must_use]
305 pub fn gate_capacity(mut self, cap: usize) -> Self {
306 self.gate_capacity = cap;
307 self
308 }
309
310 #[must_use]
312 pub fn build(self) -> SiDemux {
313 let mut pids: HashMap<Pid, SectionReassembler> = HashMap::new();
314 if self.dvb_si_pids {
315 use crate::pid::well_known as wk;
316 for pid in [
317 wk::PAT,
318 wk::CAT,
319 wk::NIT,
320 wk::SDT_BAT,
321 wk::EIT,
322 wk::RST,
323 wk::TDT_TOT,
324 wk::SAT,
325 ] {
326 pids.entry(pid).or_default();
327 }
328 }
329 for p in self.extra_pids {
330 pids.entry(p).or_default();
331 }
332 SiDemux {
333 pids,
334 gate: HashMap::new(),
335 gate_order: VecDeque::new(),
336 cfg: Config {
337 follow_pat: self.follow_pat,
338 emit_repeats: self.emit_repeats,
339 gate_capacity: self.gate_capacity,
340 },
341 stats: Stats::default(),
342 scratch: Vec::new(),
343 }
344 }
345}
346
347pub struct SiDemux {
351 pids: HashMap<Pid, SectionReassembler>,
352 gate: HashMap<u64, GateEntry>,
356 gate_order: VecDeque<u64>,
357 cfg: Config,
358 stats: Stats,
359 scratch: Vec<SectionEvent>,
360}
361
362impl SiDemux {
363 #[must_use]
365 pub fn builder() -> SiDemuxBuilder {
366 SiDemuxBuilder::default()
367 }
368
369 #[must_use]
371 pub fn stats(&self) -> Stats {
372 self.stats
373 }
374
375 pub fn feed(&mut self, packet: &[u8]) -> impl Iterator<Item = SectionEvent> + '_ {
379 self.scratch.clear();
380 self.stats.packets += 1;
381
382 match TsPacket::parse(packet) {
383 Err(_) => self.stats.malformed_packets += 1,
384 Ok(ts) => {
385 let pid = Pid::new(ts.header.pid);
386 let payload = ts.payload.unwrap_or(&[]);
387 let mut completed: Vec<Bytes> = Vec::new();
394 if let Some(reasm) = self.pids.get_mut(&pid) {
395 reasm.feed(payload, ts.header.pusi);
396 while let Some(section) = reasm.pop_section() {
397 completed.push(section);
398 }
399 }
400 self.stats.sections_completed += completed.len() as u64;
401 for section in completed {
402 self.consider(pid, section);
403 }
404 }
405 }
406
407 self.scratch.drain(..)
408 }
409
410 fn consider(&mut self, pid: Pid, section: Bytes) {
412 if section.len() < MIN_SECTION_LEN {
417 self.stats.crc_failures += 1;
418 return;
419 }
420
421 let table_id = section[0];
422 let long_form = (section[1] & 0x80) != 0;
423 let has_crc = long_form || table_id == TOT_TABLE_ID;
425
426 if has_crc {
428 if section.len() < CRC_LEN {
429 self.stats.crc_failures += 1;
430 return;
431 }
432 let covered = §ion[..section.len() - CRC_LEN];
433 let declared = u32::from_be_bytes([
434 section[section.len() - 4],
435 section[section.len() - 3],
436 section[section.len() - 2],
437 section[section.len() - 1],
438 ]);
439 let computed = dvb_common::crc32_mpeg2::compute(covered);
440 if computed != declared {
441 self.stats.crc_failures += 1;
442 return;
443 }
444 }
445
446 let (ext, section_number, version, change_crc) =
448 if long_form && section.len() >= MIN_SECTION_LEN + LONG_FORM_EXTRA + CRC_LEN {
449 let ext = ((section[3] as u16) << 8) | section[4] as u16;
450 let version = (section[5] >> 1) & 0x1F;
451 let section_number = section[6];
452 let crc = u32::from_be_bytes([
455 section[section.len() - 4],
456 section[section.len() - 3],
457 section[section.len() - 2],
458 section[section.len() - 1],
459 ]);
460 (ext, section_number, version, crc)
461 } else {
462 (0u16, 0u8, 0u8, dvb_common::crc32_mpeg2::compute(§ion))
466 };
467
468 let key = (pid.value() as u64)
469 | ((table_id as u64) << 13)
470 | ((ext as u64) << 21)
471 | ((section_number as u64) << 37);
472
473 let entry = GateEntry {
474 version,
475 crc: change_crc,
476 };
477
478 let changed = match self.gate.get(&key) {
479 Some(prev) => *prev != entry,
480 None => true,
481 };
482
483 if !self.gate.contains_key(&key) {
485 if self.gate.len() >= self.cfg.gate_capacity {
486 if let Some(old) = self.gate_order.pop_front() {
487 self.gate.remove(&old);
488 self.stats.gate_evictions += 1;
489 }
490 }
491 self.gate_order.push_back(key);
492 }
493 self.gate.insert(key, entry);
494
495 if changed || self.cfg.emit_repeats {
496 let event = SectionEvent {
497 pid,
498 bytes: section,
499 };
500 if self.cfg.follow_pat && changed && table_id == PAT_TABLE_ID {
502 self.follow_pat(&event);
503 }
504 self.stats.emitted += 1;
505 self.scratch.push(event);
506 } else {
507 self.stats.suppressed += 1;
508 }
509 }
510
511 fn follow_pat(&mut self, event: &SectionEvent) {
515 use crate::tables::pat::PatSection;
516 use dvb_common::Parse;
517 if let Ok(pat) = PatSection::parse(&event.bytes) {
518 for entry in &pat.entries {
519 if entry.program_number != 0 {
520 self.pids.entry(Pid::new(entry.pid)).or_default();
521 }
522 }
523 }
524 }
525}
526
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ts::{TsHeader, TS_PACKET_SIZE};

    /// Wraps `section` in a single 0xFF-padded TS packet on `pid` with the
    /// payload-unit-start flag set and a zero pointer field.
    fn ts_packet(pid: u16, section: &[u8]) -> [u8; TS_PACKET_SIZE] {
        let mut pkt = [0xFFu8; TS_PACKET_SIZE];
        let header = TsHeader {
            tei: false,
            pusi: true,
            pid,
            scrambling: 0,
            has_adaptation: false,
            has_payload: true,
            continuity_counter: 0,
        };
        header.serialize_into(&mut pkt).unwrap();
        // pointer_field = 0: the section begins in the very next byte.
        pkt[4] = 0x00;
        let start = 5;
        assert!(start + section.len() <= TS_PACKET_SIZE, "section too big");
        pkt[start..start + section.len()].copy_from_slice(section);
        pkt
    }

    /// Builds a long-form section with a valid trailing CRC-32.
    fn long_section(
        table_id: u8,
        ext: u16,
        version: u8,
        section_number: u8,
        payload: &[u8],
    ) -> Vec<u8> {
        let section_length = (LONG_FORM_EXTRA + payload.len() + CRC_LEN) as u16;
        let mut v = vec![
            table_id,
            0x80 | 0x30 | ((section_length >> 8) as u8 & 0x0F),
            (section_length & 0xFF) as u8,
            (ext >> 8) as u8,
            (ext & 0xFF) as u8,
            0xC0 | ((version & 0x1F) << 1) | 0x01,
            section_number,
            // last_section_number: equal to section_number in these fixtures
            section_number,
        ];
        v.extend_from_slice(payload);
        let crc = dvb_common::crc32_mpeg2::compute(&v);
        v.extend_from_slice(&crc.to_be_bytes());
        v
    }

    /// Builds a PAT section from `(program_number, pid)` pairs.
    fn pat_section(tsid: u16, version: u8, entries: &[(u16, u16)]) -> Vec<u8> {
        let mut body = Vec::new();
        for &(pn, pid) in entries {
            body.extend_from_slice(&pn.to_be_bytes());
            body.push(0xE0 | ((pid >> 8) as u8 & 0x1F));
            body.push((pid & 0xFF) as u8);
        }
        long_section(0x00, tsid, version, 0, &body)
    }

    /// Builds a minimal PMT section: PCR PID, empty program info, and one
    /// elementary stream (type 0x02) on `pcr_pid + 1` with empty ES info.
    fn pmt_section(program_number: u16, version: u8, pcr_pid: u16) -> Vec<u8> {
        let body = [
            0xE0 | ((pcr_pid >> 8) as u8 & 0x1F),
            (pcr_pid & 0xFF) as u8,
            0xF0,
            0x00,
            0x02,
            0xE0 | (((pcr_pid + 1) >> 8) as u8 & 0x1F),
            ((pcr_pid + 1) & 0xFF) as u8,
            0xF0,
            0x00,
        ];
        long_section(0x02, program_number, version, 0, &body)
    }

    #[test]
    fn pat_emits_once_suppresses_repeat_reemits_on_version_change() {
        let mut demux = SiDemux::builder().build();

        let pat_v0 = pat_section(0x0001, 0, &[(1, 0x0100)]);
        let pat_v1 = pat_section(0x0001, 1, &[(1, 0x0100)]);

        let pkt_v0 = ts_packet(0x0000, &pat_v0);
        let pkt_v1 = ts_packet(0x0000, &pat_v1);

        let n0: Vec<_> = demux.feed(&pkt_v0).collect();
        assert_eq!(n0.len(), 1, "PAT v0 should emit one event");
        assert_eq!(n0[0].table_id(), 0x00);
        assert_eq!(n0[0].version(), Some(0));

        let n1: Vec<_> = demux.feed(&pkt_v0).collect();
        assert_eq!(n1.len(), 0, "repeat PAT should be suppressed");

        let n2: Vec<_> = demux.feed(&pkt_v1).collect();
        assert_eq!(n2.len(), 1, "PAT v1 should re-emit");
        assert_eq!(n2[0].version(), Some(1));

        let s = demux.stats();
        assert_eq!(s.sections_completed, 3);
        assert_eq!(s.emitted, 2);
        assert_eq!(s.suppressed, 1);
        assert_eq!(s.crc_failures, 0);
    }

    #[test]
    fn follow_pat_registers_pmt_pid_and_emits_typed_pmt() {
        use crate::tables::AnyTableSection;
        let mut demux = SiDemux::builder().build();

        // The PAT announces program 1 on PID 0x0100 ...
        let pat = pat_section(0x0001, 0, &[(1, 0x0100)]);
        let pat_evts: Vec<_> = demux.feed(&ts_packet(0x0000, &pat)).collect();
        assert_eq!(pat_evts.len(), 1);

        // ... so a PMT arriving on that PID must now be reassembled and emitted.
        let pmt = pmt_section(1, 0, 0x0100);
        let pmt_evts: Vec<_> = demux.feed(&ts_packet(0x0100, &pmt)).collect();
        assert_eq!(pmt_evts.len(), 1, "PMT on the followed PID should emit");
        assert_eq!(pmt_evts[0].pid(), Pid::new(0x0100));
        match pmt_evts[0].table_section().unwrap() {
            AnyTableSection::PmtSection(p) => assert_eq!(p.program_number, 1),
            other => panic!("expected PmtSection, got {other:?}"),
        }
    }

    #[test]
    fn corrupted_crc_sdt_dropped_and_counted() {
        let mut demux = SiDemux::builder().build();
        let mut sdt = long_section(0x42, 0x0001, 0, 0, &[0xDE, 0xAD, 0xBE, 0xEF]);
        // Flip the first payload byte so the stored CRC no longer matches.
        sdt[8] ^= 0xFF;
        let evts: Vec<_> = demux.feed(&ts_packet(0x0011, &sdt)).collect();
        assert_eq!(evts.len(), 0, "corrupted SDT must not emit");
        let s = demux.stats();
        assert_eq!(s.crc_failures, 1);
        assert_eq!(s.emitted, 0);
        assert_eq!(s.sections_completed, 1);
    }

    #[test]
    fn gate_capacity_evicts_fifo_and_reemits() {
        let mut demux = SiDemux::builder().gate_capacity(2).build();

        // Three sections that differ in table_id_extension, hence three gate keys.
        let a = long_section(0x4E, 0x0001, 0, 0, &[0x01]);
        let b = long_section(0x4E, 0x0002, 0, 0, &[0x02]);
        let c = long_section(0x4E, 0x0003, 0, 0, &[0x03]);

        assert_eq!(demux.feed(&ts_packet(0x0012, &a)).count(), 1);
        assert_eq!(demux.feed(&ts_packet(0x0012, &b)).count(), 1);
        assert_eq!(demux.feed(&ts_packet(0x0012, &c)).count(), 1);
        // Inserting `c` pushed out the oldest key (`a`).
        assert_eq!(demux.stats().gate_evictions, 1);

        // `a` is no longer remembered, so it is emitted again.
        assert_eq!(demux.feed(&ts_packet(0x0012, &a)).count(), 1);
    }

    #[test]
    fn garbage_packet_counted_no_panic() {
        let mut demux = SiDemux::builder().build();
        let garbage = [0x00u8; TS_PACKET_SIZE];
        let evts: Vec<_> = demux.feed(&garbage).collect();
        assert_eq!(evts.len(), 0);
        assert_eq!(demux.stats().malformed_packets, 1);
        assert_eq!(demux.stats().packets, 1);
    }

    #[test]
    fn emit_repeats_bypasses_suppression() {
        let mut demux = SiDemux::builder().emit_repeats(true).build();
        let pat = pat_section(0x0001, 0, &[(1, 0x0100)]);
        let pkt = ts_packet(0x0000, &pat);
        assert_eq!(demux.feed(&pkt).count(), 1);
        assert_eq!(demux.feed(&pkt).count(), 1, "emit_repeats re-emits");
        assert_eq!(demux.stats().suppressed, 0);
        assert_eq!(demux.stats().emitted, 2);
    }

    #[test]
    fn table_section_with_empty_registry_matches_table_section() {
        use crate::tables::registry::TableRegistry;
        use crate::tables::AnyTableSection;

        let mut demux = SiDemux::builder().build();
        let pat = pat_section(0x0001, 0, &[(1, 0x0100)]);
        let evts: Vec<_> = demux.feed(&ts_packet(0x0000, &pat)).collect();
        assert_eq!(evts.len(), 1);

        let reg = TableRegistry::new();
        let with_reg = evts[0].table_section_with(&reg).unwrap();
        let without = evts[0].table_section().unwrap();
        assert!(matches!(with_reg, AnyTableSection::PatSection(_)));
        assert!(matches!(without, AnyTableSection::PatSection(_)));
    }

    #[test]
    fn table_section_with_custom_registry_yields_other() {
        use crate::tables::registry::TableRegistry;
        use crate::tables::AnyTableSection;
        use crate::traits::TableDef;
        use dvb_common::Parse;

        const PRIVATE_TID: u8 = 0x90;

        #[derive(Debug)]
        #[cfg_attr(feature = "serde", derive(serde::Serialize))]
        struct PrivateTable {
            table_id: u8,
        }

        impl<'a> Parse<'a> for PrivateTable {
            type Error = crate::Error;
            fn parse(bytes: &'a [u8]) -> crate::Result<Self> {
                if bytes.is_empty() {
                    return Err(crate::Error::BufferTooShort {
                        need: 1,
                        have: 0,
                        what: "PrivateTable",
                    });
                }
                Ok(Self { table_id: bytes[0] })
            }
        }

        impl<'a> TableDef<'a> for PrivateTable {
            const TABLE_ID_RANGES: &'static [(u8, u8)] = &[(PRIVATE_TID, PRIVATE_TID)];
            const NAME: &'static str = "PRIVATE_TABLE";
        }

        let mut reg = TableRegistry::new();
        reg.register::<PrivateTable>();

        // Watch only the private PID so nothing else can produce events.
        let mut demux = SiDemux::builder()
            .dvb_si_pids(false)
            .pid(Pid::new(0x0200))
            .build();

        let section = long_section(PRIVATE_TID, 0x0001, 0, 0, &[0x42]);
        let evts: Vec<_> = demux.feed(&ts_packet(0x0200, &section)).collect();
        assert_eq!(evts.len(), 1);

        let result = evts[0].table_section_with(&reg).unwrap();
        match result {
            AnyTableSection::Other {
                table_id,
                ref value,
            } => {
                assert_eq!(table_id, PRIVATE_TID);
                let pt = value.downcast_ref::<PrivateTable>().unwrap();
                assert_eq!(pt.table_id, PRIVATE_TID);
            }
            other => panic!("expected Other, got {other:?}"),
        }
    }
}