1use alloc::collections::{BTreeMap, VecDeque};
80use alloc::vec::Vec;
81
82use bytes::Bytes;
83
84use crate::pid::Pid;
85use crate::ts::{SectionReassembler, TsPacket};
86
87const PAT_TABLE_ID: u8 = 0x00;
89const TOT_TABLE_ID: u8 = 0x73;
91const MIN_SECTION_LEN: usize = 3;
93const LONG_FORM_EXTRA: usize = 5;
95const CRC_LEN: usize = 4;
97
98#[derive(Debug, Clone)]
105pub struct SectionEvent {
106 pid: Pid,
107 bytes: Bytes,
108}
109
110impl SectionEvent {
111 #[must_use]
113 pub fn pid(&self) -> Pid {
114 self.pid
115 }
116
117 #[must_use]
119 pub fn bytes(&self) -> &Bytes {
120 &self.bytes
121 }
122
123 #[must_use]
126 pub fn table_id(&self) -> u8 {
127 self.bytes[0]
128 }
129
130 #[must_use]
133 fn is_long_form(&self) -> bool {
134 (self.bytes[1] & 0x80) != 0
135 }
136
137 #[must_use]
141 pub fn version(&self) -> Option<u8> {
142 if self.is_long_form() && self.bytes.len() > 5 {
143 Some((self.bytes[5] >> 1) & 0x1F)
144 } else {
145 None
146 }
147 }
148
149 #[must_use]
151 pub fn table_id_extension(&self) -> Option<u16> {
152 if self.is_long_form() && self.bytes.len() > 4 {
153 Some(((self.bytes[3] as u16) << 8) | self.bytes[4] as u16)
154 } else {
155 None
156 }
157 }
158
159 #[must_use]
161 pub fn section_number(&self) -> Option<u8> {
162 if self.is_long_form() && self.bytes.len() > 6 {
163 Some(self.bytes[6])
164 } else {
165 None
166 }
167 }
168
169 #[must_use]
172 pub fn crc_ok(&self) -> bool {
173 true
174 }
175
176 pub fn table_section(&self) -> crate::Result<crate::tables::AnyTableSection<'_>> {
181 crate::tables::AnyTableSection::parse(&self.bytes)
182 }
183
184 pub fn table_section_with(
193 &self,
194 registry: &crate::tables::registry::TableRegistry,
195 ) -> crate::Result<crate::tables::AnyTableSection<'_>> {
196 crate::tables::AnyTableSection::parse_with(registry, &self.bytes)
197 }
198
199 pub fn parse<'s, T: crate::traits::TableDef<'s>>(&'s self) -> crate::Result<T> {
204 <T as dvb_common::Parse>::parse(&self.bytes)
205 }
206}
207
208#[non_exhaustive]
210#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
211pub struct Stats {
212 pub packets: u64,
214 pub sections_completed: u64,
216 pub emitted: u64,
218 pub suppressed: u64,
220 pub crc_failures: u64,
224 pub malformed_packets: u64,
226 pub gate_evictions: u64,
228}
229
230#[derive(Clone, Copy, PartialEq, Eq)]
232struct GateEntry {
233 version: u8,
235 crc: u32,
238}
239
240struct Config {
242 follow_pat: bool,
243 emit_repeats: bool,
244 gate_capacity: usize,
245}
246
247pub struct SiDemuxBuilder {
252 follow_pat: bool,
253 dvb_si_pids: bool,
254 emit_repeats: bool,
255 gate_capacity: usize,
256 extra_pids: Vec<Pid>,
257}
258
259impl Default for SiDemuxBuilder {
260 fn default() -> Self {
261 Self {
262 follow_pat: true,
263 dvb_si_pids: true,
264 emit_repeats: false,
265 gate_capacity: 65_536,
266 extra_pids: Vec::new(),
267 }
268 }
269}
270
271impl SiDemuxBuilder {
272 #[must_use]
275 pub fn follow_pat(mut self, on: bool) -> Self {
276 self.follow_pat = on;
277 self
278 }
279
280 #[must_use]
283 pub fn dvb_si_pids(mut self, on: bool) -> Self {
284 self.dvb_si_pids = on;
285 self
286 }
287
288 #[must_use]
290 pub fn pid(mut self, pid: Pid) -> Self {
291 self.extra_pids.push(pid);
292 self
293 }
294
295 #[must_use]
298 pub fn emit_repeats(mut self, on: bool) -> Self {
299 self.emit_repeats = on;
300 self
301 }
302
303 #[must_use]
306 pub fn gate_capacity(mut self, cap: usize) -> Self {
307 self.gate_capacity = cap;
308 self
309 }
310
311 #[must_use]
313 pub fn build(self) -> SiDemux {
314 let mut pids: BTreeMap<Pid, SectionReassembler> = BTreeMap::new();
315 if self.dvb_si_pids {
316 use crate::pid::well_known as wk;
317 for pid in [
318 wk::PAT,
319 wk::CAT,
320 wk::NIT,
321 wk::SDT_BAT,
322 wk::EIT,
323 wk::RST,
324 wk::TDT_TOT,
325 wk::SAT,
326 ] {
327 pids.entry(pid).or_default();
328 }
329 }
330 for p in self.extra_pids {
331 pids.entry(p).or_default();
332 }
333 SiDemux {
334 pids,
335 gate: BTreeMap::new(),
336 gate_order: VecDeque::new(),
337 cfg: Config {
338 follow_pat: self.follow_pat,
339 emit_repeats: self.emit_repeats,
340 gate_capacity: self.gate_capacity,
341 },
342 stats: Stats::default(),
343 scratch: Vec::new(),
344 completed_scratch: Vec::new(),
345 }
346 }
347}
348
349pub struct SiDemux {
353 pids: BTreeMap<Pid, SectionReassembler>,
354 gate: BTreeMap<u64, GateEntry>,
358 gate_order: VecDeque<u64>,
359 cfg: Config,
360 stats: Stats,
361 scratch: Vec<SectionEvent>,
362 completed_scratch: Vec<Bytes>,
363}
364
365impl SiDemux {
366 #[must_use]
368 pub fn builder() -> SiDemuxBuilder {
369 SiDemuxBuilder::default()
370 }
371
372 #[must_use]
374 pub fn stats(&self) -> Stats {
375 self.stats
376 }
377
378 pub fn feed(&mut self, packet: &[u8]) -> impl Iterator<Item = SectionEvent> + '_ {
382 self.scratch.clear();
383 self.stats.packets += 1;
384
385 match TsPacket::parse(packet) {
386 Err(_) => self.stats.malformed_packets += 1,
387 Ok(ts) => {
388 let pid = Pid::new(ts.header.pid);
389 let payload = ts.payload.unwrap_or(&[]);
390 let mut completed = core::mem::take(&mut self.completed_scratch);
397 if let Some(reasm) = self.pids.get_mut(&pid) {
398 reasm.feed(payload, ts.header.pusi);
399 while let Some(section) = reasm.pop_section() {
400 completed.push(section);
401 }
402 }
403 self.stats.sections_completed += completed.len() as u64;
404 for section in completed.drain(..) {
405 self.consider(pid, section);
406 }
407 self.completed_scratch = completed;
408 }
409 }
410
411 self.scratch.drain(..)
412 }
413
414 fn consider(&mut self, pid: Pid, section: Bytes) {
416 if section.len() < MIN_SECTION_LEN {
421 self.stats.crc_failures += 1;
422 return;
423 }
424
425 let table_id = section[0];
426 let long_form = (section[1] & 0x80) != 0;
427 let has_crc = long_form || table_id == TOT_TABLE_ID;
429
430 if has_crc {
432 if section.len() < CRC_LEN {
433 self.stats.crc_failures += 1;
434 return;
435 }
436 let covered = §ion[..section.len() - CRC_LEN];
437 let declared = u32::from_be_bytes([
438 section[section.len() - 4],
439 section[section.len() - 3],
440 section[section.len() - 2],
441 section[section.len() - 1],
442 ]);
443 let computed = dvb_common::crc32_mpeg2::compute(covered);
444 if computed != declared {
445 self.stats.crc_failures += 1;
446 return;
447 }
448 }
449
450 let (ext, section_number, version, change_crc) =
452 if long_form && section.len() >= MIN_SECTION_LEN + LONG_FORM_EXTRA + CRC_LEN {
453 let ext = ((section[3] as u16) << 8) | section[4] as u16;
454 let version = (section[5] >> 1) & 0x1F;
455 let section_number = section[6];
456 let crc = u32::from_be_bytes([
459 section[section.len() - 4],
460 section[section.len() - 3],
461 section[section.len() - 2],
462 section[section.len() - 1],
463 ]);
464 (ext, section_number, version, crc)
465 } else {
466 (0u16, 0u8, 0u8, dvb_common::crc32_mpeg2::compute(§ion))
470 };
471
472 let key = (pid.value() as u64)
473 | ((table_id as u64) << 13)
474 | ((ext as u64) << 21)
475 | ((section_number as u64) << 37);
476
477 let entry = GateEntry {
478 version,
479 crc: change_crc,
480 };
481
482 let changed = match self.gate.get(&key) {
483 Some(prev) => *prev != entry,
484 None => true,
485 };
486
487 let is_new = !self.gate.contains_key(&key);
489 if is_new && self.gate.len() >= self.cfg.gate_capacity {
490 if let Some(old) = self.gate_order.pop_front() {
491 self.gate.remove(&old);
492 self.stats.gate_evictions += 1;
493 }
494 }
495 if is_new {
496 self.gate_order.push_back(key);
497 }
498 match self.gate.entry(key) {
499 alloc::collections::btree_map::Entry::Occupied(mut oe) => {
500 *oe.get_mut() = entry;
501 }
502 alloc::collections::btree_map::Entry::Vacant(ve) => {
503 ve.insert(entry);
504 }
505 }
506
507 if changed || self.cfg.emit_repeats {
508 let event = SectionEvent {
509 pid,
510 bytes: section,
511 };
512 if self.cfg.follow_pat && changed && table_id == PAT_TABLE_ID {
514 self.follow_pat(&event);
515 }
516 self.stats.emitted += 1;
517 self.scratch.push(event);
518 } else {
519 self.stats.suppressed += 1;
520 }
521 }
522
523 fn follow_pat(&mut self, event: &SectionEvent) {
527 use crate::tables::pat::PatSection;
528 use dvb_common::Parse;
529 if let Ok(pat) = PatSection::parse(&event.bytes) {
530 for entry in &pat.entries {
531 if entry.program_number != 0 {
532 self.pids.entry(Pid::new(entry.pid)).or_default();
533 }
534 }
535 }
536 }
537}
538
539#[cfg(test)]
540mod tests {
541 use super::*;
542 use crate::ts::{TsHeader, TS_PACKET_SIZE};
543
544 fn ts_packet(pid: u16, section: &[u8]) -> [u8; TS_PACKET_SIZE] {
547 let mut pkt = [0xFFu8; TS_PACKET_SIZE];
548 let header = TsHeader {
549 tei: false,
550 pusi: true,
551 pid,
552 scrambling: 0,
553 has_adaptation: false,
554 has_payload: true,
555 continuity_counter: 0,
556 };
557 header.serialize_into(&mut pkt).unwrap();
558 pkt[4] = 0x00; let start = 5;
560 assert!(start + section.len() <= TS_PACKET_SIZE, "section too big");
561 pkt[start..start + section.len()].copy_from_slice(section);
562 pkt
563 }
564
565 fn long_section(
567 table_id: u8,
568 ext: u16,
569 version: u8,
570 section_number: u8,
571 payload: &[u8],
572 ) -> Vec<u8> {
573 let section_length = (LONG_FORM_EXTRA + payload.len() + CRC_LEN) as u16;
574 let mut v = vec![
575 table_id,
576 0x80 | 0x30 | ((section_length >> 8) as u8 & 0x0F),
577 (section_length & 0xFF) as u8,
578 (ext >> 8) as u8,
579 (ext & 0xFF) as u8,
580 0xC0 | ((version & 0x1F) << 1) | 0x01,
581 section_number,
582 section_number, ];
584 v.extend_from_slice(payload);
585 let crc = dvb_common::crc32_mpeg2::compute(&v);
586 v.extend_from_slice(&crc.to_be_bytes());
587 v
588 }
589
590 fn pat_section(tsid: u16, version: u8, entries: &[(u16, u16)]) -> Vec<u8> {
592 let mut body = Vec::new();
593 for &(pn, pid) in entries {
594 body.extend_from_slice(&pn.to_be_bytes());
595 body.push(0xE0 | ((pid >> 8) as u8 & 0x1F));
596 body.push((pid & 0xFF) as u8);
597 }
598 long_section(0x00, tsid, version, 0, &body)
599 }
600
601 fn pmt_section(program_number: u16, version: u8, pcr_pid: u16) -> Vec<u8> {
603 let body = [
606 0xE0 | ((pcr_pid >> 8) as u8 & 0x1F),
607 (pcr_pid & 0xFF) as u8,
608 0xF0,
609 0x00,
610 0x02,
611 0xE0 | (((pcr_pid + 1) >> 8) as u8 & 0x1F),
612 ((pcr_pid + 1) & 0xFF) as u8,
613 0xF0,
614 0x00,
615 ];
616 long_section(0x02, program_number, version, 0, &body)
617 }
618
619 #[test]
620 fn pat_emits_once_suppresses_repeat_reemits_on_version_change() {
621 let mut demux = SiDemux::builder().build();
622
623 let pat_v0 = pat_section(0x0001, 0, &[(1, 0x0100)]);
624 let pat_v1 = pat_section(0x0001, 1, &[(1, 0x0100)]);
625
626 let pkt_v0 = ts_packet(0x0000, &pat_v0);
627 let pkt_v1 = ts_packet(0x0000, &pat_v1);
628
629 let n0: Vec<_> = demux.feed(&pkt_v0).collect();
630 assert_eq!(n0.len(), 1, "PAT v0 should emit one event");
631 assert_eq!(n0[0].table_id(), 0x00);
632 assert_eq!(n0[0].version(), Some(0));
633
634 let n1: Vec<_> = demux.feed(&pkt_v0).collect();
635 assert_eq!(n1.len(), 0, "repeat PAT should be suppressed");
636
637 let n2: Vec<_> = demux.feed(&pkt_v1).collect();
638 assert_eq!(n2.len(), 1, "PAT v1 should re-emit");
639 assert_eq!(n2[0].version(), Some(1));
640
641 let s = demux.stats();
642 assert_eq!(s.sections_completed, 3);
643 assert_eq!(s.emitted, 2);
644 assert_eq!(s.suppressed, 1);
645 assert_eq!(s.crc_failures, 0);
646 }
647
648 #[test]
649 fn follow_pat_registers_pmt_pid_and_emits_typed_pmt() {
650 use crate::tables::AnyTableSection;
651 let mut demux = SiDemux::builder().build();
652
653 let pat = pat_section(0x0001, 0, &[(1, 0x0100)]);
655 let pat_evts: Vec<_> = demux.feed(&ts_packet(0x0000, &pat)).collect();
656 assert_eq!(pat_evts.len(), 1);
657
658 let pmt = pmt_section(1, 0, 0x0100);
661 let pmt_evts: Vec<_> = demux.feed(&ts_packet(0x0100, &pmt)).collect();
662 assert_eq!(pmt_evts.len(), 1, "PMT on the followed PID should emit");
663 assert_eq!(pmt_evts[0].pid(), Pid::new(0x0100));
664 match pmt_evts[0].table_section().unwrap() {
665 AnyTableSection::PmtSection(p) => assert_eq!(p.program_number, 1),
666 other => panic!("expected PmtSection, got {other:?}"),
667 }
668 }
669
670 #[test]
671 fn corrupted_crc_sdt_dropped_and_counted() {
672 let mut demux = SiDemux::builder().build();
673 let mut sdt = long_section(0x42, 0x0001, 0, 0, &[0xDE, 0xAD, 0xBE, 0xEF]);
675 sdt[8] ^= 0xFF;
677 let evts: Vec<_> = demux.feed(&ts_packet(0x0011, &sdt)).collect();
678 assert_eq!(evts.len(), 0, "corrupted SDT must not emit");
679 let s = demux.stats();
680 assert_eq!(s.crc_failures, 1);
681 assert_eq!(s.emitted, 0);
682 assert_eq!(s.sections_completed, 1);
683 }
684
685 #[test]
686 fn gate_capacity_evicts_fifo_and_reemits() {
687 let mut demux = SiDemux::builder().gate_capacity(2).build();
688
689 let a = long_section(0x4E, 0x0001, 0, 0, &[0x01]);
692 let b = long_section(0x4E, 0x0002, 0, 0, &[0x02]);
693 let c = long_section(0x4E, 0x0003, 0, 0, &[0x03]);
694
695 assert_eq!(demux.feed(&ts_packet(0x0012, &a)).count(), 1);
696 assert_eq!(demux.feed(&ts_packet(0x0012, &b)).count(), 1);
697 assert_eq!(demux.feed(&ts_packet(0x0012, &c)).count(), 1);
699 assert_eq!(demux.stats().gate_evictions, 1);
700
701 assert_eq!(demux.feed(&ts_packet(0x0012, &a)).count(), 1);
703 }
704
705 #[test]
706 fn garbage_packet_counted_no_panic() {
707 let mut demux = SiDemux::builder().build();
708 let garbage = [0x00u8; TS_PACKET_SIZE]; let evts: Vec<_> = demux.feed(&garbage).collect();
710 assert_eq!(evts.len(), 0);
711 assert_eq!(demux.stats().malformed_packets, 1);
712 assert_eq!(demux.stats().packets, 1);
713 }
714
715 #[test]
716 fn emit_repeats_bypasses_suppression() {
717 let mut demux = SiDemux::builder().emit_repeats(true).build();
718 let pat = pat_section(0x0001, 0, &[(1, 0x0100)]);
719 let pkt = ts_packet(0x0000, &pat);
720 assert_eq!(demux.feed(&pkt).count(), 1);
721 assert_eq!(demux.feed(&pkt).count(), 1, "emit_repeats re-emits");
722 assert_eq!(demux.stats().suppressed, 0);
723 assert_eq!(demux.stats().emitted, 2);
724 }
725
726 #[test]
727 fn table_section_with_empty_registry_matches_table_section() {
728 use crate::tables::registry::TableRegistry;
729 use crate::tables::AnyTableSection;
730
731 let mut demux = SiDemux::builder().build();
732 let pat = pat_section(0x0001, 0, &[(1, 0x0100)]);
733 let evts: Vec<_> = demux.feed(&ts_packet(0x0000, &pat)).collect();
734 assert_eq!(evts.len(), 1);
735
736 let reg = TableRegistry::new();
737 let with_reg = evts[0].table_section_with(®).unwrap();
738 let without = evts[0].table_section().unwrap();
739 assert!(matches!(with_reg, AnyTableSection::PatSection(_)));
740 assert!(matches!(without, AnyTableSection::PatSection(_)));
741 }
742
743 #[test]
744 fn table_section_with_custom_registry_yields_other() {
745 use crate::tables::registry::TableRegistry;
746 use crate::tables::AnyTableSection;
747 use crate::traits::TableDef;
748 use dvb_common::Parse;
749
750 const PRIVATE_TID: u8 = 0x90;
751
752 #[derive(Debug)]
753 #[cfg_attr(feature = "serde", derive(serde::Serialize))]
754 struct PrivateTable {
755 table_id: u8,
756 }
757
758 impl<'a> Parse<'a> for PrivateTable {
759 type Error = crate::Error;
760 fn parse(bytes: &'a [u8]) -> crate::Result<Self> {
761 if bytes.is_empty() {
762 return Err(crate::Error::BufferTooShort {
763 need: 1,
764 have: 0,
765 what: "PrivateTable",
766 });
767 }
768 Ok(Self { table_id: bytes[0] })
769 }
770 }
771
772 impl<'a> TableDef<'a> for PrivateTable {
773 const TABLE_ID_RANGES: &'static [(u8, u8)] = &[(PRIVATE_TID, PRIVATE_TID)];
774 const NAME: &'static str = "PRIVATE_TABLE";
775 }
776
777 let mut reg = TableRegistry::new();
778 reg.register::<PrivateTable>();
779
780 let mut demux = SiDemux::builder()
781 .dvb_si_pids(false)
782 .pid(Pid::new(0x0200))
783 .build();
784
785 let section = long_section(PRIVATE_TID, 0x0001, 0, 0, &[0x42]);
786 let evts: Vec<_> = demux.feed(&ts_packet(0x0200, §ion)).collect();
787 assert_eq!(evts.len(), 1);
788
789 let result = evts[0].table_section_with(®).unwrap();
790 match result {
791 AnyTableSection::Other {
792 table_id,
793 ref value,
794 } => {
795 assert_eq!(table_id, PRIVATE_TID);
796 let pt = value.downcast_ref::<PrivateTable>().unwrap();
797 assert_eq!(pt.table_id, PRIVATE_TID);
798 }
799 other => panic!("expected Other, got {other:?}"),
800 }
801 }
802}