use std::collections::{HashMap, VecDeque};
use bytes::Bytes;
use crate::pid::Pid;
use crate::ts::{SectionReassembler, TsPacket};
const PAT_TABLE_ID: u8 = 0x00;
const TOT_TABLE_ID: u8 = 0x73;
const MIN_SECTION_LEN: usize = 3;
const LONG_FORM_EXTRA: usize = 5;
const CRC_LEN: usize = 4;
#[derive(Debug, Clone)]
pub struct SectionEvent {
pid: Pid,
bytes: Bytes,
}
impl SectionEvent {
#[must_use]
pub fn pid(&self) -> Pid {
self.pid
}
#[must_use]
pub fn bytes(&self) -> &Bytes {
&self.bytes
}
#[must_use]
pub fn table_id(&self) -> u8 {
self.bytes[0]
}
#[must_use]
fn is_long_form(&self) -> bool {
(self.bytes[1] & 0x80) != 0
}
#[must_use]
pub fn version(&self) -> Option<u8> {
if self.is_long_form() && self.bytes.len() > 5 {
Some((self.bytes[5] >> 1) & 0x1F)
} else {
None
}
}
#[must_use]
pub fn table_id_extension(&self) -> Option<u16> {
if self.is_long_form() && self.bytes.len() > 4 {
Some(((self.bytes[3] as u16) << 8) | self.bytes[4] as u16)
} else {
None
}
}
#[must_use]
pub fn section_number(&self) -> Option<u8> {
if self.is_long_form() && self.bytes.len() > 6 {
Some(self.bytes[6])
} else {
None
}
}
#[must_use]
pub fn crc_ok(&self) -> bool {
true
}
pub fn table(&self) -> crate::Result<crate::tables::AnyTable<'_>> {
crate::tables::AnyTable::parse(&self.bytes)
}
pub fn parse<'s, T: crate::traits::TableDef<'s>>(&'s self) -> crate::Result<T> {
<T as dvb_common::Parse>::parse(&self.bytes)
}
}
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct Stats {
pub packets: u64,
pub sections_completed: u64,
pub emitted: u64,
pub suppressed: u64,
pub crc_failures: u64,
pub malformed_packets: u64,
pub gate_evictions: u64,
}
#[derive(Clone, Copy, PartialEq, Eq)]
struct GateEntry {
version: u8,
crc: u32,
}
struct Config {
follow_pat: bool,
emit_repeats: bool,
gate_capacity: usize,
}
pub struct SiDemuxBuilder {
follow_pat: bool,
dvb_si_pids: bool,
emit_repeats: bool,
gate_capacity: usize,
extra_pids: Vec<Pid>,
}
impl Default for SiDemuxBuilder {
fn default() -> Self {
Self {
follow_pat: true,
dvb_si_pids: true,
emit_repeats: false,
gate_capacity: 65_536,
extra_pids: Vec::new(),
}
}
}
impl SiDemuxBuilder {
#[must_use]
pub fn follow_pat(mut self, on: bool) -> Self {
self.follow_pat = on;
self
}
#[must_use]
pub fn dvb_si_pids(mut self, on: bool) -> Self {
self.dvb_si_pids = on;
self
}
#[must_use]
pub fn pid(mut self, pid: Pid) -> Self {
self.extra_pids.push(pid);
self
}
#[must_use]
pub fn emit_repeats(mut self, on: bool) -> Self {
self.emit_repeats = on;
self
}
#[must_use]
pub fn gate_capacity(mut self, cap: usize) -> Self {
self.gate_capacity = cap;
self
}
#[must_use]
pub fn build(self) -> SiDemux {
let mut pids: HashMap<Pid, SectionReassembler> = HashMap::new();
if self.dvb_si_pids {
use crate::pid::well_known as wk;
for pid in [
wk::PAT,
wk::CAT,
wk::NIT,
wk::SDT_BAT,
wk::EIT,
wk::RST,
wk::TDT_TOT,
wk::SAT,
] {
pids.entry(pid).or_default();
}
}
for p in self.extra_pids {
pids.entry(p).or_default();
}
SiDemux {
pids,
gate: HashMap::new(),
gate_order: VecDeque::new(),
cfg: Config {
follow_pat: self.follow_pat,
emit_repeats: self.emit_repeats,
gate_capacity: self.gate_capacity,
},
stats: Stats::default(),
scratch: Vec::new(),
}
}
}
pub struct SiDemux {
pids: HashMap<Pid, SectionReassembler>,
gate: HashMap<u64, GateEntry>,
gate_order: VecDeque<u64>,
cfg: Config,
stats: Stats,
scratch: Vec<SectionEvent>,
}
impl SiDemux {
#[must_use]
pub fn builder() -> SiDemuxBuilder {
SiDemuxBuilder::default()
}
#[must_use]
pub fn stats(&self) -> Stats {
self.stats
}
pub fn feed(&mut self, packet: &[u8]) -> impl Iterator<Item = SectionEvent> + '_ {
self.scratch.clear();
self.stats.packets += 1;
match TsPacket::parse(packet) {
Err(_) => self.stats.malformed_packets += 1,
Ok(ts) => {
let pid = Pid::new(ts.header.pid);
if self.pids.contains_key(&pid) {
let payload = ts.payload.unwrap_or(&[]);
self.pids
.get_mut(&pid)
.expect("checked above")
.feed(payload, ts.header.pusi);
while let Some(section) = self
.pids
.get_mut(&pid)
.and_then(SectionReassembler::pop_section)
{
self.stats.sections_completed += 1;
self.consider(pid, section);
}
}
}
}
self.scratch.drain(..)
}
fn consider(&mut self, pid: Pid, section: Bytes) {
if section.len() < MIN_SECTION_LEN {
self.stats.crc_failures += 1;
return;
}
let table_id = section[0];
let long_form = (section[1] & 0x80) != 0;
let has_crc = long_form || table_id == TOT_TABLE_ID;
if has_crc {
if section.len() < CRC_LEN {
self.stats.crc_failures += 1;
return;
}
let covered = §ion[..section.len() - CRC_LEN];
let declared = u32::from_be_bytes([
section[section.len() - 4],
section[section.len() - 3],
section[section.len() - 2],
section[section.len() - 1],
]);
let computed = dvb_common::crc32_mpeg2::compute(covered);
if computed != declared {
self.stats.crc_failures += 1;
return;
}
}
let (ext, section_number, version, change_crc) =
if long_form && section.len() >= MIN_SECTION_LEN + LONG_FORM_EXTRA + CRC_LEN {
let ext = ((section[3] as u16) << 8) | section[4] as u16;
let version = (section[5] >> 1) & 0x1F;
let section_number = section[6];
let crc = u32::from_be_bytes([
section[section.len() - 4],
section[section.len() - 3],
section[section.len() - 2],
section[section.len() - 1],
]);
(ext, section_number, version, crc)
} else {
(0u16, 0u8, 0u8, dvb_common::crc32_mpeg2::compute(§ion))
};
let key = (pid.value() as u64)
| ((table_id as u64) << 13)
| ((ext as u64) << 21)
| ((section_number as u64) << 37);
let entry = GateEntry {
version,
crc: change_crc,
};
let changed = match self.gate.get(&key) {
Some(prev) => *prev != entry,
None => true,
};
if !self.gate.contains_key(&key) {
if self.gate.len() >= self.cfg.gate_capacity {
if let Some(old) = self.gate_order.pop_front() {
self.gate.remove(&old);
self.stats.gate_evictions += 1;
}
}
self.gate_order.push_back(key);
}
self.gate.insert(key, entry);
if changed || self.cfg.emit_repeats {
let event = SectionEvent {
pid,
bytes: section,
};
if self.cfg.follow_pat && changed && table_id == PAT_TABLE_ID {
self.follow_pat(&event);
}
self.stats.emitted += 1;
self.scratch.push(event);
} else {
self.stats.suppressed += 1;
}
}
fn follow_pat(&mut self, event: &SectionEvent) {
use crate::tables::pat::Pat;
use dvb_common::Parse;
if let Ok(pat) = Pat::parse(&event.bytes) {
for entry in &pat.entries {
if entry.program_number != 0 {
self.pids.entry(Pid::new(entry.pid)).or_default();
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::ts::{TsHeader, TS_PACKET_SIZE};
fn ts_packet(pid: u16, section: &[u8]) -> [u8; TS_PACKET_SIZE] {
let mut pkt = [0xFFu8; TS_PACKET_SIZE];
let header = TsHeader {
tei: false,
pusi: true,
pid,
scrambling: 0,
has_adaptation: false,
has_payload: true,
continuity_counter: 0,
};
header.serialize_into(&mut pkt);
pkt[4] = 0x00; let start = 5;
assert!(start + section.len() <= TS_PACKET_SIZE, "section too big");
pkt[start..start + section.len()].copy_from_slice(section);
pkt
}
fn long_section(
table_id: u8,
ext: u16,
version: u8,
section_number: u8,
payload: &[u8],
) -> Vec<u8> {
let section_length = (LONG_FORM_EXTRA + payload.len() + CRC_LEN) as u16;
let mut v = vec![
table_id,
0x80 | 0x30 | ((section_length >> 8) as u8 & 0x0F),
(section_length & 0xFF) as u8,
(ext >> 8) as u8,
(ext & 0xFF) as u8,
0xC0 | ((version & 0x1F) << 1) | 0x01,
section_number,
section_number, ];
v.extend_from_slice(payload);
let crc = dvb_common::crc32_mpeg2::compute(&v);
v.extend_from_slice(&crc.to_be_bytes());
v
}
fn pat_section(tsid: u16, version: u8, entries: &[(u16, u16)]) -> Vec<u8> {
let mut body = Vec::new();
for &(pn, pid) in entries {
body.extend_from_slice(&pn.to_be_bytes());
body.push(0xE0 | ((pid >> 8) as u8 & 0x1F));
body.push((pid & 0xFF) as u8);
}
long_section(0x00, tsid, version, 0, &body)
}
fn pmt_section(program_number: u16, version: u8, pcr_pid: u16) -> Vec<u8> {
let body = [
0xE0 | ((pcr_pid >> 8) as u8 & 0x1F),
(pcr_pid & 0xFF) as u8,
0xF0,
0x00,
0x02,
0xE0 | (((pcr_pid + 1) >> 8) as u8 & 0x1F),
((pcr_pid + 1) & 0xFF) as u8,
0xF0,
0x00,
];
long_section(0x02, program_number, version, 0, &body)
}
#[test]
fn pat_emits_once_suppresses_repeat_reemits_on_version_change() {
let mut demux = SiDemux::builder().build();
let pat_v0 = pat_section(0x0001, 0, &[(1, 0x0100)]);
let pat_v1 = pat_section(0x0001, 1, &[(1, 0x0100)]);
let pkt_v0 = ts_packet(0x0000, &pat_v0);
let pkt_v1 = ts_packet(0x0000, &pat_v1);
let n0: Vec<_> = demux.feed(&pkt_v0).collect();
assert_eq!(n0.len(), 1, "PAT v0 should emit one event");
assert_eq!(n0[0].table_id(), 0x00);
assert_eq!(n0[0].version(), Some(0));
let n1: Vec<_> = demux.feed(&pkt_v0).collect();
assert_eq!(n1.len(), 0, "repeat PAT should be suppressed");
let n2: Vec<_> = demux.feed(&pkt_v1).collect();
assert_eq!(n2.len(), 1, "PAT v1 should re-emit");
assert_eq!(n2[0].version(), Some(1));
let s = demux.stats();
assert_eq!(s.sections_completed, 3);
assert_eq!(s.emitted, 2);
assert_eq!(s.suppressed, 1);
assert_eq!(s.crc_failures, 0);
}
#[test]
fn follow_pat_registers_pmt_pid_and_emits_typed_pmt() {
use crate::tables::AnyTable;
let mut demux = SiDemux::builder().build();
let pat = pat_section(0x0001, 0, &[(1, 0x0100)]);
let pat_evts: Vec<_> = demux.feed(&ts_packet(0x0000, &pat)).collect();
assert_eq!(pat_evts.len(), 1);
let pmt = pmt_section(1, 0, 0x0100);
let pmt_evts: Vec<_> = demux.feed(&ts_packet(0x0100, &pmt)).collect();
assert_eq!(pmt_evts.len(), 1, "PMT on the followed PID should emit");
assert_eq!(pmt_evts[0].pid(), Pid::new(0x0100));
match pmt_evts[0].table().unwrap() {
AnyTable::Pmt(p) => assert_eq!(p.program_number, 1),
other => panic!("expected Pmt, got {other:?}"),
}
}
#[test]
fn corrupted_crc_sdt_dropped_and_counted() {
let mut demux = SiDemux::builder().build();
let mut sdt = long_section(0x42, 0x0001, 0, 0, &[0xDE, 0xAD, 0xBE, 0xEF]);
sdt[8] ^= 0xFF;
let evts: Vec<_> = demux.feed(&ts_packet(0x0011, &sdt)).collect();
assert_eq!(evts.len(), 0, "corrupted SDT must not emit");
let s = demux.stats();
assert_eq!(s.crc_failures, 1);
assert_eq!(s.emitted, 0);
assert_eq!(s.sections_completed, 1);
}
#[test]
fn gate_capacity_evicts_fifo_and_reemits() {
let mut demux = SiDemux::builder().gate_capacity(2).build();
let a = long_section(0x4E, 0x0001, 0, 0, &[0x01]);
let b = long_section(0x4E, 0x0002, 0, 0, &[0x02]);
let c = long_section(0x4E, 0x0003, 0, 0, &[0x03]);
assert_eq!(demux.feed(&ts_packet(0x0012, &a)).count(), 1);
assert_eq!(demux.feed(&ts_packet(0x0012, &b)).count(), 1);
assert_eq!(demux.feed(&ts_packet(0x0012, &c)).count(), 1);
assert_eq!(demux.stats().gate_evictions, 1);
assert_eq!(demux.feed(&ts_packet(0x0012, &a)).count(), 1);
}
#[test]
fn garbage_packet_counted_no_panic() {
let mut demux = SiDemux::builder().build();
let garbage = [0x00u8; TS_PACKET_SIZE]; let evts: Vec<_> = demux.feed(&garbage).collect();
assert_eq!(evts.len(), 0);
assert_eq!(demux.stats().malformed_packets, 1);
assert_eq!(demux.stats().packets, 1);
}
#[test]
fn emit_repeats_bypasses_suppression() {
let mut demux = SiDemux::builder().emit_repeats(true).build();
let pat = pat_section(0x0001, 0, &[(1, 0x0100)]);
let pkt = ts_packet(0x0000, &pat);
assert_eq!(demux.feed(&pkt).count(), 1);
assert_eq!(demux.feed(&pkt).count(), 1, "emit_repeats re-emits");
assert_eq!(demux.stats().suppressed, 0);
assert_eq!(demux.stats().emitted, 2);
}
}