1use alloc::collections::{BTreeMap, VecDeque};
80use alloc::vec::Vec;
81
82use bytes::Bytes;
83
84use crate::pid::Pid;
85use crate::ts::{SectionReassembler, TsPacket};
86
87const PAT_TABLE_ID: u8 = 0x00;
89const TOT_TABLE_ID: u8 = 0x73;
91const MIN_SECTION_LEN: usize = 3;
93const LONG_FORM_EXTRA: usize = 5;
95const CRC_LEN: usize = 4;
97
98#[derive(Debug, Clone)]
105pub struct SectionEvent {
106 pid: Pid,
107 bytes: Bytes,
108}
109
110impl SectionEvent {
111 #[must_use]
113 pub fn pid(&self) -> Pid {
114 self.pid
115 }
116
117 #[must_use]
119 pub fn bytes(&self) -> &Bytes {
120 &self.bytes
121 }
122
123 #[must_use]
126 pub fn table_id(&self) -> u8 {
127 self.bytes[0]
128 }
129
130 #[must_use]
133 fn is_long_form(&self) -> bool {
134 (self.bytes[1] & 0x80) != 0
135 }
136
137 #[must_use]
141 pub fn version(&self) -> Option<u8> {
142 if self.is_long_form() && self.bytes.len() > 5 {
143 Some((self.bytes[5] >> 1) & 0x1F)
144 } else {
145 None
146 }
147 }
148
149 #[must_use]
151 pub fn table_id_extension(&self) -> Option<u16> {
152 if self.is_long_form() && self.bytes.len() > 4 {
153 Some(((self.bytes[3] as u16) << 8) | self.bytes[4] as u16)
154 } else {
155 None
156 }
157 }
158
159 #[must_use]
161 pub fn section_number(&self) -> Option<u8> {
162 if self.is_long_form() && self.bytes.len() > 6 {
163 Some(self.bytes[6])
164 } else {
165 None
166 }
167 }
168
169 #[must_use]
172 pub fn crc_ok(&self) -> bool {
173 true
174 }
175
176 pub fn table_section(&self) -> crate::Result<crate::tables::AnyTableSection<'_>> {
181 crate::tables::AnyTableSection::parse(&self.bytes)
182 }
183
184 pub fn table_section_with(
193 &self,
194 registry: &crate::tables::registry::TableRegistry,
195 ) -> crate::Result<crate::tables::AnyTableSection<'_>> {
196 crate::tables::AnyTableSection::parse_with(registry, &self.bytes)
197 }
198
199 pub fn parse<'s, T: crate::traits::TableDef<'s>>(&'s self) -> crate::Result<T> {
204 <T as dvb_common::Parse>::parse(&self.bytes)
205 }
206}
207
208#[non_exhaustive]
210#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
211pub struct Stats {
212 pub packets: u64,
214 pub sections_completed: u64,
216 pub emitted: u64,
218 pub suppressed: u64,
220 pub crc_failures: u64,
224 pub malformed_packets: u64,
226 pub gate_evictions: u64,
228}
229
230#[derive(Clone, Copy, PartialEq, Eq)]
232struct GateEntry {
233 version: u8,
235 crc: u32,
238}
239
240struct Config {
242 follow_pat: bool,
243 emit_repeats: bool,
244 gate_capacity: usize,
245}
246
247pub struct SiDemuxBuilder {
252 follow_pat: bool,
253 dvb_si_pids: bool,
254 emit_repeats: bool,
255 gate_capacity: usize,
256 extra_pids: Vec<Pid>,
257}
258
259impl Default for SiDemuxBuilder {
260 fn default() -> Self {
261 Self {
262 follow_pat: true,
263 dvb_si_pids: true,
264 emit_repeats: false,
265 gate_capacity: 65_536,
266 extra_pids: Vec::new(),
267 }
268 }
269}
270
271impl SiDemuxBuilder {
272 #[must_use]
275 pub fn follow_pat(mut self, on: bool) -> Self {
276 self.follow_pat = on;
277 self
278 }
279
280 #[must_use]
283 pub fn dvb_si_pids(mut self, on: bool) -> Self {
284 self.dvb_si_pids = on;
285 self
286 }
287
288 #[must_use]
290 pub fn pid(mut self, pid: Pid) -> Self {
291 self.extra_pids.push(pid);
292 self
293 }
294
295 #[must_use]
298 pub fn emit_repeats(mut self, on: bool) -> Self {
299 self.emit_repeats = on;
300 self
301 }
302
303 #[must_use]
306 pub fn gate_capacity(mut self, cap: usize) -> Self {
307 self.gate_capacity = cap;
308 self
309 }
310
311 #[must_use]
313 pub fn build(self) -> SiDemux {
314 let mut pids: BTreeMap<Pid, SectionReassembler> = BTreeMap::new();
315 if self.dvb_si_pids {
316 use crate::pid::well_known as wk;
317 for pid in [
318 wk::PAT,
319 wk::CAT,
320 wk::NIT,
321 wk::SDT_BAT,
322 wk::EIT,
323 wk::RST,
324 wk::TDT_TOT,
325 wk::SAT,
326 ] {
327 pids.entry(pid).or_default();
328 }
329 }
330 for p in self.extra_pids {
331 pids.entry(p).or_default();
332 }
333 SiDemux {
334 pids,
335 gate: BTreeMap::new(),
336 gate_order: VecDeque::new(),
337 cfg: Config {
338 follow_pat: self.follow_pat,
339 emit_repeats: self.emit_repeats,
340 gate_capacity: self.gate_capacity,
341 },
342 stats: Stats::default(),
343 scratch: Vec::new(),
344 completed_scratch: Vec::new(),
345 }
346 }
347}
348
349pub struct SiDemux {
353 pids: BTreeMap<Pid, SectionReassembler>,
354 gate: BTreeMap<u64, GateEntry>,
358 gate_order: VecDeque<u64>,
359 cfg: Config,
360 stats: Stats,
361 scratch: Vec<SectionEvent>,
362 completed_scratch: Vec<Bytes>,
363}
364
365impl SiDemux {
366 #[must_use]
368 pub fn builder() -> SiDemuxBuilder {
369 SiDemuxBuilder::default()
370 }
371
372 #[must_use]
374 pub fn stats(&self) -> Stats {
375 self.stats
376 }
377
378 pub fn feed(&mut self, packet: &[u8]) -> impl Iterator<Item = SectionEvent> + '_ {
382 self.scratch.clear();
383 self.stats.packets += 1;
384
385 match TsPacket::parse(packet) {
386 Err(_) => self.stats.malformed_packets += 1,
387 Ok(ts) => {
388 let pid = Pid::new(ts.header.pid);
389 let payload = ts.payload.unwrap_or(&[]);
390 let mut completed = core::mem::take(&mut self.completed_scratch);
397 if let Some(reasm) = self.pids.get_mut(&pid) {
398 reasm.feed(payload, ts.header.pusi);
399 while let Some(section) = reasm.pop_section() {
400 completed.push(section);
401 }
402 }
403 self.stats.sections_completed += completed.len() as u64;
404 for section in completed.drain(..) {
405 self.consider(pid, section);
406 }
407 self.completed_scratch = completed;
408 }
409 }
410
411 self.scratch.drain(..)
412 }
413
414 fn consider(&mut self, pid: Pid, section: Bytes) {
416 if section.len() < MIN_SECTION_LEN {
421 self.stats.crc_failures += 1;
422 return;
423 }
424
425 let table_id = section[0];
426 let long_form = (section[1] & 0x80) != 0;
427 let has_crc = long_form || table_id == TOT_TABLE_ID;
429
430 if has_crc {
432 if section.len() < CRC_LEN {
433 self.stats.crc_failures += 1;
434 return;
435 }
436 let (covered, crc_bytes) = section.split_last_chunk::<CRC_LEN>().unwrap();
437 let declared = u32::from_be_bytes(*crc_bytes);
438 let computed = dvb_common::crc32_mpeg2::compute(covered);
439 if computed != declared {
440 self.stats.crc_failures += 1;
441 return;
442 }
443 }
444
445 let (ext, section_number, version, change_crc) =
447 if long_form && section.len() >= MIN_SECTION_LEN + LONG_FORM_EXTRA + CRC_LEN {
448 let ext = ((section[3] as u16) << 8) | section[4] as u16;
449 let version = (section[5] >> 1) & 0x1F;
450 let section_number = section[6];
451 let (_, crc_bytes) = section.split_last_chunk::<CRC_LEN>().unwrap();
454 let crc = u32::from_be_bytes(*crc_bytes);
455 (ext, section_number, version, crc)
456 } else {
457 (0u16, 0u8, 0u8, dvb_common::crc32_mpeg2::compute(§ion))
461 };
462
463 let key = (pid.value() as u64)
464 | ((table_id as u64) << 13)
465 | ((ext as u64) << 21)
466 | ((section_number as u64) << 37);
467
468 let entry = GateEntry {
469 version,
470 crc: change_crc,
471 };
472
473 let changed = match self.gate.get(&key) {
474 Some(prev) => *prev != entry,
475 None => true,
476 };
477
478 let is_new = !self.gate.contains_key(&key);
480 if is_new && self.gate.len() >= self.cfg.gate_capacity {
481 if let Some(old) = self.gate_order.pop_front() {
482 self.gate.remove(&old);
483 self.stats.gate_evictions += 1;
484 }
485 }
486 if is_new {
487 self.gate_order.push_back(key);
488 }
489 match self.gate.entry(key) {
490 alloc::collections::btree_map::Entry::Occupied(mut oe) => {
491 *oe.get_mut() = entry;
492 }
493 alloc::collections::btree_map::Entry::Vacant(ve) => {
494 ve.insert(entry);
495 }
496 }
497
498 if changed || self.cfg.emit_repeats {
499 let event = SectionEvent {
500 pid,
501 bytes: section,
502 };
503 if self.cfg.follow_pat && changed && table_id == PAT_TABLE_ID {
505 self.follow_pat(&event);
506 }
507 self.stats.emitted += 1;
508 self.scratch.push(event);
509 } else {
510 self.stats.suppressed += 1;
511 }
512 }
513
514 fn follow_pat(&mut self, event: &SectionEvent) {
518 use crate::tables::pat::PatSection;
519 use dvb_common::Parse;
520 if let Ok(pat) = PatSection::parse(&event.bytes) {
521 for entry in &pat.entries {
522 if entry.program_number != 0 {
523 self.pids.entry(Pid::new(entry.pid)).or_default();
524 }
525 }
526 }
527 }
528}
529
530#[cfg(test)]
531mod tests {
532 use super::*;
533 use crate::ts::{TsHeader, TS_PACKET_SIZE};
534
535 fn ts_packet(pid: u16, section: &[u8]) -> [u8; TS_PACKET_SIZE] {
538 let mut pkt = [0xFFu8; TS_PACKET_SIZE];
539 let header = TsHeader {
540 tei: false,
541 pusi: true,
542 pid,
543 scrambling: 0,
544 has_adaptation: false,
545 has_payload: true,
546 continuity_counter: 0,
547 };
548 header.serialize_into(&mut pkt).unwrap();
549 pkt[4] = 0x00; let start = 5;
551 assert!(start + section.len() <= TS_PACKET_SIZE, "section too big");
552 pkt[start..start + section.len()].copy_from_slice(section);
553 pkt
554 }
555
556 fn long_section(
558 table_id: u8,
559 ext: u16,
560 version: u8,
561 section_number: u8,
562 payload: &[u8],
563 ) -> Vec<u8> {
564 let section_length = (LONG_FORM_EXTRA + payload.len() + CRC_LEN) as u16;
565 let mut v = vec![
566 table_id,
567 0x80 | 0x30 | ((section_length >> 8) as u8 & 0x0F),
568 (section_length & 0xFF) as u8,
569 (ext >> 8) as u8,
570 (ext & 0xFF) as u8,
571 0xC0 | ((version & 0x1F) << 1) | 0x01,
572 section_number,
573 section_number, ];
575 v.extend_from_slice(payload);
576 let crc = dvb_common::crc32_mpeg2::compute(&v);
577 v.extend_from_slice(&crc.to_be_bytes());
578 v
579 }
580
581 fn pat_section(tsid: u16, version: u8, entries: &[(u16, u16)]) -> Vec<u8> {
583 let mut body = Vec::new();
584 for &(pn, pid) in entries {
585 body.extend_from_slice(&pn.to_be_bytes());
586 body.push(0xE0 | ((pid >> 8) as u8 & 0x1F));
587 body.push((pid & 0xFF) as u8);
588 }
589 long_section(0x00, tsid, version, 0, &body)
590 }
591
592 fn pmt_section(program_number: u16, version: u8, pcr_pid: u16) -> Vec<u8> {
594 let body = [
597 0xE0 | ((pcr_pid >> 8) as u8 & 0x1F),
598 (pcr_pid & 0xFF) as u8,
599 0xF0,
600 0x00,
601 0x02,
602 0xE0 | (((pcr_pid + 1) >> 8) as u8 & 0x1F),
603 ((pcr_pid + 1) & 0xFF) as u8,
604 0xF0,
605 0x00,
606 ];
607 long_section(0x02, program_number, version, 0, &body)
608 }
609
610 #[test]
611 fn pat_emits_once_suppresses_repeat_reemits_on_version_change() {
612 let mut demux = SiDemux::builder().build();
613
614 let pat_v0 = pat_section(0x0001, 0, &[(1, 0x0100)]);
615 let pat_v1 = pat_section(0x0001, 1, &[(1, 0x0100)]);
616
617 let pkt_v0 = ts_packet(0x0000, &pat_v0);
618 let pkt_v1 = ts_packet(0x0000, &pat_v1);
619
620 let n0: Vec<_> = demux.feed(&pkt_v0).collect();
621 assert_eq!(n0.len(), 1, "PAT v0 should emit one event");
622 assert_eq!(n0[0].table_id(), 0x00);
623 assert_eq!(n0[0].version(), Some(0));
624
625 let n1: Vec<_> = demux.feed(&pkt_v0).collect();
626 assert_eq!(n1.len(), 0, "repeat PAT should be suppressed");
627
628 let n2: Vec<_> = demux.feed(&pkt_v1).collect();
629 assert_eq!(n2.len(), 1, "PAT v1 should re-emit");
630 assert_eq!(n2[0].version(), Some(1));
631
632 let s = demux.stats();
633 assert_eq!(s.sections_completed, 3);
634 assert_eq!(s.emitted, 2);
635 assert_eq!(s.suppressed, 1);
636 assert_eq!(s.crc_failures, 0);
637 }
638
639 #[test]
640 fn follow_pat_registers_pmt_pid_and_emits_typed_pmt() {
641 use crate::tables::AnyTableSection;
642 let mut demux = SiDemux::builder().build();
643
644 let pat = pat_section(0x0001, 0, &[(1, 0x0100)]);
646 let pat_evts: Vec<_> = demux.feed(&ts_packet(0x0000, &pat)).collect();
647 assert_eq!(pat_evts.len(), 1);
648
649 let pmt = pmt_section(1, 0, 0x0100);
652 let pmt_evts: Vec<_> = demux.feed(&ts_packet(0x0100, &pmt)).collect();
653 assert_eq!(pmt_evts.len(), 1, "PMT on the followed PID should emit");
654 assert_eq!(pmt_evts[0].pid(), Pid::new(0x0100));
655 match pmt_evts[0].table_section().unwrap() {
656 AnyTableSection::PmtSection(p) => assert_eq!(p.program_number, 1),
657 other => panic!("expected PmtSection, got {other:?}"),
658 }
659 }
660
661 #[test]
662 fn corrupted_crc_sdt_dropped_and_counted() {
663 let mut demux = SiDemux::builder().build();
664 let mut sdt = long_section(0x42, 0x0001, 0, 0, &[0xDE, 0xAD, 0xBE, 0xEF]);
666 sdt[8] ^= 0xFF;
668 let evts: Vec<_> = demux.feed(&ts_packet(0x0011, &sdt)).collect();
669 assert_eq!(evts.len(), 0, "corrupted SDT must not emit");
670 let s = demux.stats();
671 assert_eq!(s.crc_failures, 1);
672 assert_eq!(s.emitted, 0);
673 assert_eq!(s.sections_completed, 1);
674 }
675
676 #[test]
677 fn gate_capacity_evicts_fifo_and_reemits() {
678 let mut demux = SiDemux::builder().gate_capacity(2).build();
679
680 let a = long_section(0x4E, 0x0001, 0, 0, &[0x01]);
683 let b = long_section(0x4E, 0x0002, 0, 0, &[0x02]);
684 let c = long_section(0x4E, 0x0003, 0, 0, &[0x03]);
685
686 assert_eq!(demux.feed(&ts_packet(0x0012, &a)).count(), 1);
687 assert_eq!(demux.feed(&ts_packet(0x0012, &b)).count(), 1);
688 assert_eq!(demux.feed(&ts_packet(0x0012, &c)).count(), 1);
690 assert_eq!(demux.stats().gate_evictions, 1);
691
692 assert_eq!(demux.feed(&ts_packet(0x0012, &a)).count(), 1);
694 }
695
696 #[test]
697 fn garbage_packet_counted_no_panic() {
698 let mut demux = SiDemux::builder().build();
699 let garbage = [0x00u8; TS_PACKET_SIZE]; let evts: Vec<_> = demux.feed(&garbage).collect();
701 assert_eq!(evts.len(), 0);
702 assert_eq!(demux.stats().malformed_packets, 1);
703 assert_eq!(demux.stats().packets, 1);
704 }
705
706 #[test]
707 fn emit_repeats_bypasses_suppression() {
708 let mut demux = SiDemux::builder().emit_repeats(true).build();
709 let pat = pat_section(0x0001, 0, &[(1, 0x0100)]);
710 let pkt = ts_packet(0x0000, &pat);
711 assert_eq!(demux.feed(&pkt).count(), 1);
712 assert_eq!(demux.feed(&pkt).count(), 1, "emit_repeats re-emits");
713 assert_eq!(demux.stats().suppressed, 0);
714 assert_eq!(demux.stats().emitted, 2);
715 }
716
717 #[test]
718 fn table_section_with_empty_registry_matches_table_section() {
719 use crate::tables::registry::TableRegistry;
720 use crate::tables::AnyTableSection;
721
722 let mut demux = SiDemux::builder().build();
723 let pat = pat_section(0x0001, 0, &[(1, 0x0100)]);
724 let evts: Vec<_> = demux.feed(&ts_packet(0x0000, &pat)).collect();
725 assert_eq!(evts.len(), 1);
726
727 let reg = TableRegistry::new();
728 let with_reg = evts[0].table_section_with(®).unwrap();
729 let without = evts[0].table_section().unwrap();
730 assert!(matches!(with_reg, AnyTableSection::PatSection(_)));
731 assert!(matches!(without, AnyTableSection::PatSection(_)));
732 }
733
734 #[test]
735 fn table_section_with_custom_registry_yields_other() {
736 use crate::tables::registry::TableRegistry;
737 use crate::tables::AnyTableSection;
738 use crate::traits::TableDef;
739 use dvb_common::Parse;
740
741 const PRIVATE_TID: u8 = 0x90;
742
743 #[derive(Debug)]
744 #[cfg_attr(feature = "serde", derive(serde::Serialize))]
745 struct PrivateTable {
746 table_id: u8,
747 }
748
749 impl<'a> Parse<'a> for PrivateTable {
750 type Error = crate::Error;
751 fn parse(bytes: &'a [u8]) -> crate::Result<Self> {
752 if bytes.is_empty() {
753 return Err(crate::Error::BufferTooShort {
754 need: 1,
755 have: 0,
756 what: "PrivateTable",
757 });
758 }
759 Ok(Self { table_id: bytes[0] })
760 }
761 }
762
763 impl<'a> TableDef<'a> for PrivateTable {
764 const TABLE_ID_RANGES: &'static [(u8, u8)] = &[(PRIVATE_TID, PRIVATE_TID)];
765 const NAME: &'static str = "PRIVATE_TABLE";
766 }
767
768 let mut reg = TableRegistry::new();
769 reg.register::<PrivateTable>();
770
771 let mut demux = SiDemux::builder()
772 .dvb_si_pids(false)
773 .pid(Pid::new(0x0200))
774 .build();
775
776 let section = long_section(PRIVATE_TID, 0x0001, 0, 0, &[0x42]);
777 let evts: Vec<_> = demux.feed(&ts_packet(0x0200, §ion)).collect();
778 assert_eq!(evts.len(), 1);
779
780 let result = evts[0].table_section_with(®).unwrap();
781 match result {
782 AnyTableSection::Other {
783 table_id,
784 ref value,
785 } => {
786 assert_eq!(table_id, PRIVATE_TID);
787 let pt = value.downcast_ref::<PrivateTable>().unwrap();
788 assert_eq!(pt.table_id, PRIVATE_TID);
789 }
790 other => panic!("expected Other, got {other:?}"),
791 }
792 }
793}