use bytes::Bytes;
use crate::crc;
use crate::packet::Header;
use crate::payload::AnyPayload;
use crate::ts::PacketReassembler;
// Layout constants for the 4-byte transport-stream packet header read by
// `parse_ts_header`.

/// Sync byte expected at offset 0 of every TS packet.
const TS_SYNC: u8 = 0x47;
/// Minimum buffer length `parse_ts_header` accepts; payload slices end at this offset.
const TS_PACKET_SIZE: usize = 188;
/// Bit of header byte 1 that carries the payload-unit-start indicator (PUSI).
const PUSI_MASK: u8 = 0x40;
/// Bits of header byte 1 that hold the upper five bits of the 13-bit PID.
const PID_HI_MASK: u8 = 0x1F;
/// Bit of header byte 3 signalling that an adaptation field precedes the payload.
const ADAPTATION_FLAG: u8 = 0x20;
/// Bit of header byte 3 signalling that the packet carries a payload.
const PAYLOAD_FLAG: u8 = 0x10;
/// Header fields extracted from one TS packet by `parse_ts_header`.
struct TsInfo {
/// 13-bit packet identifier assembled from header bytes 1 and 2.
pid: u16,
/// Whether the payload-unit-start indicator bit is set.
pusi: bool,
/// Offset within the packet at which the payload begins (after the 4-byte
/// header and any adaptation field).
payload_start: usize,
}
/// Extracts the PID, PUSI flag and payload offset from one transport-stream packet.
///
/// Returns `None` when:
/// - `buf` holds fewer than `TS_PACKET_SIZE` bytes or does not start with `TS_SYNC`;
/// - the payload flag in header byte 3 is clear (this includes adaptation-only
///   packets, which callers therefore see as unparseable);
/// - an adaptation field is present and leaves no room for payload bytes.
///
/// Only the first `TS_PACKET_SIZE` bytes are considered; trailing bytes are ignored.
fn parse_ts_header(buf: &[u8]) -> Option<TsInfo> {
    if buf.len() < TS_PACKET_SIZE || buf[0] != TS_SYNC {
        return None;
    }

    let (flags_and_pid_hi, pid_lo, control) = (buf[1], buf[2], buf[3]);
    if (control & PAYLOAD_FLAG) == 0 {
        return None;
    }

    let payload_start = if (control & ADAPTATION_FLAG) != 0 {
        // Byte 4 is the adaptation_field_length; the field itself follows it.
        // The length check above guarantees index 4 exists.
        let end_of_adaptation = 4 + 1 + usize::from(buf[4]);
        // With both adaptation field and payload signalled, at least one
        // payload byte must remain (adaptation_field_length <= 182). Anything
        // larger would hand an empty or out-of-range payload downstream.
        if end_of_adaptation >= TS_PACKET_SIZE {
            return None;
        }
        end_of_adaptation
    } else {
        4
    };

    Some(TsInfo {
        pid: (u16::from(flags_and_pid_hi & PID_HI_MASK) << 8) | u16::from(pid_lo),
        pusi: (flags_and_pid_hi & PUSI_MASK) != 0,
        payload_start,
    })
}
/// One reassembled T2-MI packet produced by `T2miPump`.
///
/// Within this file an event is only constructed in
/// `T2miPump::drain_reasm_into`, after `crc::validate_crc` has accepted the
/// packet bytes.
#[derive(Debug, Clone)]
pub struct T2miEvent {
/// Raw bytes of the packet, exactly as handed over by the reassembler.
bytes: Bytes,
}
impl T2miEvent {
    /// Borrows the raw bytes backing this event.
    #[must_use]
    pub fn bytes(&self) -> &Bytes {
        &self.bytes
    }

    /// Returns the first byte of the packet, which is used as the packet type.
    ///
    /// # Panics
    ///
    /// Panics if the underlying buffer is empty.
    #[must_use]
    pub fn packet_type(&self) -> u8 {
        self.bytes[0]
    }

    /// Parses the packet header on demand; nothing is cached between calls.
    ///
    /// # Errors
    ///
    /// Propagates whatever error `Header::parse` reports for these bytes.
    pub fn header(&self) -> crate::Result<Header> {
        use dvb_common::Parse;

        Header::parse(&self.bytes)
    }

    /// Parses the packet body into a typed payload on demand.
    ///
    /// The header is parsed first to locate the payload bytes, which are then
    /// dispatched on the packet-type byte. When `AnyPayload::dispatch` has no
    /// parser for that type, the body is returned as `AnyPayload::Unknown`.
    ///
    /// # Errors
    ///
    /// Fails if the header cannot be parsed, if the header's payload range
    /// cannot be taken from the bytes, or if the typed payload parser fails.
    pub fn payload(&self) -> crate::Result<AnyPayload<'_>> {
        use dvb_common::Parse;

        let header = Header::parse(&self.bytes)?;
        let body = header.payload_bytes(&self.bytes)?;
        let packet_type = self.bytes[0];

        match AnyPayload::dispatch(packet_type, body) {
            Some(parsed) => Ok(parsed?),
            None => Ok(AnyPayload::Unknown { packet_type, body }),
        }
    }
}
/// Running counters maintained by `T2miPump`; all start at zero.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct Stats {
/// Number of `feed_ts` calls made on a TS-mode pump, counted before the
/// packet header is validated.
pub ts_packets: u64,
/// Number of packets handed over by the reassembler, counted before CRC
/// validation (so CRC failures are included).
pub t2mi_packets: u64,
/// Number of reassembled packets rejected by `crc::validate_crc`.
pub crc_failures: u64,
/// Number of TS packets whose header could not be parsed, plus calls to a
/// feed method that does not match the pump's mode.
pub malformed_packets: u64,
}
/// Turns incoming bytes (TS packets or a raw stream, depending on mode) into
/// CRC-checked `T2miEvent`s while keeping `Stats` up to date.
pub struct T2miPump {
/// Which feed method this pump accepts; fixed at construction.
mode: PumpMode,
/// Reassembler that receives payload bytes and yields complete packets.
reasm: PacketReassembler,
/// Counters reported by `stats()`.
stats: Stats,
/// Events produced by the current feed call; cleared at the start of each
/// call and drained by the iterator that call returns.
scratch: Vec<T2miEvent>,
/// Whether `feed_raw` has already forwarded its first chunk. The first
/// chunk is fed with a leading 0x00 byte and the start flag set.
raw_started: bool,
}
/// Input mode of a `T2miPump`.
enum PumpMode {
/// Input arrives via `feed_ts`; only TS packets whose PID equals `pid` are
/// forwarded to the reassembler.
Ts { pid: u16 },
/// Input arrives via `feed_raw` as a byte stream without TS framing.
Raw,
}
impl T2miPump {
#[must_use]
pub fn new(pid: u16) -> Self {
Self {
mode: PumpMode::Ts { pid },
reasm: PacketReassembler::new(),
stats: Stats::default(),
scratch: Vec::new(),
raw_started: false,
}
}
#[must_use]
pub fn raw() -> Self {
Self {
mode: PumpMode::Raw,
reasm: PacketReassembler::new(),
stats: Stats::default(),
scratch: Vec::new(),
raw_started: false,
}
}
#[must_use]
pub fn stats(&self) -> Stats {
self.stats
}
pub fn feed_ts(&mut self, packet: &[u8]) -> impl Iterator<Item = T2miEvent> + '_ {
self.scratch.clear();
match self.mode {
PumpMode::Raw => {
self.stats.malformed_packets += 1;
}
PumpMode::Ts { pid: filter_pid } => {
self.stats.ts_packets += 1;
match parse_ts_header(packet) {
None => {
self.stats.malformed_packets += 1;
}
Some(info) => {
if info.pid == filter_pid {
let payload = &packet[info.payload_start..TS_PACKET_SIZE];
self.reasm.feed(payload, info.pusi);
Self::drain_reasm_into(
&mut self.reasm,
&mut self.stats,
&mut self.scratch,
);
}
}
}
}
}
self.scratch.drain(..)
}
pub fn feed_raw(&mut self, data: &[u8]) -> impl Iterator<Item = T2miEvent> + '_ {
self.scratch.clear();
match self.mode {
PumpMode::Ts { .. } => {
self.stats.malformed_packets += 1;
}
PumpMode::Raw => {
if !self.raw_started {
let mut buf = Vec::with_capacity(1 + data.len());
buf.push(0x00); buf.extend_from_slice(data);
self.reasm.feed(&buf, true);
self.raw_started = true;
} else {
self.reasm.feed(data, false);
}
Self::drain_reasm_into(&mut self.reasm, &mut self.stats, &mut self.scratch);
}
}
self.scratch.drain(..)
}
fn drain_reasm_into(
reasm: &mut PacketReassembler,
stats: &mut Stats,
scratch: &mut Vec<T2miEvent>,
) {
for raw in reasm.drain_packets() {
stats.t2mi_packets += 1;
match crc::validate_crc(&raw) {
Ok(()) => scratch.push(T2miEvent { bytes: raw }),
Err(_) => stats.crc_failures += 1,
}
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use dvb_common::crc32_mpeg2;

    /// Builds a T2-MI packet: a 6-byte header (type, count 0x01, two zero
    /// bytes, big-endian payload length in bits), the payload, and a
    /// big-endian CRC computed over everything before it.
    fn make_t2mi_packet(packet_type: u8, payload: &[u8]) -> Vec<u8> {
        let payload_len_bits = (payload.len() * 8) as u16;

        let mut pkt = Vec::with_capacity(6 + payload.len() + 4);
        pkt.extend_from_slice(&[packet_type, 0x01, 0x00, 0x00]);
        pkt.extend_from_slice(&payload_len_bits.to_be_bytes());
        pkt.extend_from_slice(payload);

        let crc = crc32_mpeg2::compute(&pkt);
        pkt.extend_from_slice(&crc.to_be_bytes());
        pkt
    }

    /// Wraps `t2mi_data` in a single 188-byte TS packet on `pid`, padded with
    /// 0xFF. With `pusi` set, byte 4 holds `pointer_field` and the data starts
    /// `pointer_field` bytes after it; otherwise the data starts at byte 4.
    fn ts_packet(pid: u16, t2mi_data: &[u8], pusi: bool, pointer_field: u8) -> [u8; 188] {
        let mut pkt = [0xFFu8; 188];
        let pusi_bit = if pusi { PUSI_MASK } else { 0 };

        pkt[0] = TS_SYNC;
        pkt[1] = pusi_bit | (((pid >> 8) as u8) & PID_HI_MASK);
        pkt[2] = (pid & 0xFF) as u8;
        pkt[3] = PAYLOAD_FLAG;

        let start = if pusi {
            pkt[4] = pointer_field;
            5 + pointer_field as usize
        } else {
            4
        };
        let end = start + t2mi_data.len();
        assert!(end <= 188, "T2-MI data too large for one TS packet");
        pkt[start..end].copy_from_slice(t2mi_data);
        pkt
    }

    /// A single TS packet carrying a whole T2-MI packet yields one typed event.
    #[test]
    fn ts_packet_emits_one_event_with_typed_payload() {
        let t2mi = make_t2mi_packet(0x00, &[0x01u8, 0x02, 0x00]);
        let ts = ts_packet(0x0006, &t2mi, true, 0);

        let mut pump = T2miPump::new(0x0006);
        let events: Vec<_> = pump.feed_ts(&ts).collect();

        assert_eq!(events.len(), 1, "expected exactly one event");
        let event = &events[0];
        assert_eq!(event.packet_type(), 0x00);

        let payload = event.payload().expect("payload parse should succeed");
        assert!(
            matches!(payload, AnyPayload::Bbframe(_)),
            "expected Bbframe, got {payload:?}"
        );

        assert_eq!(
            pump.stats(),
            Stats {
                ts_packets: 1,
                t2mi_packets: 1,
                crc_failures: 0,
                malformed_packets: 0,
            }
        );
    }

    /// A packet with a damaged CRC is counted but never emitted.
    #[test]
    fn corrupted_crc_drops_packet_and_counts() {
        let mut t2mi = make_t2mi_packet(0x00, &[0x00u8, 0x00, 0x00]);
        let last = t2mi.len() - 1;
        t2mi[last] ^= 0xFF;
        let ts = ts_packet(0x0006, &t2mi, true, 0);

        let mut pump = T2miPump::new(0x0006);
        let emitted = pump.feed_ts(&ts).count();
        assert_eq!(emitted, 0, "corrupted packet must not emit");

        let stats = pump.stats();
        assert_eq!(stats.crc_failures, 1);
        assert_eq!(stats.t2mi_packets, 1);
    }

    /// Raw mode reassembles a packet delivered in two chunks.
    #[test]
    fn feed_raw_split_across_two_calls_emits_one_event() {
        let t2mi = make_t2mi_packet(0x20, &[0x00u8; 11]);
        let (head, tail) = t2mi.split_at(6);

        let mut pump = T2miPump::raw();

        let after_head = pump.feed_raw(head).count();
        assert_eq!(after_head, 0, "no complete packet yet after first chunk");

        let after_tail = pump.feed_raw(tail).count();
        assert_eq!(
            after_tail, 1,
            "one event after second chunk completes the packet"
        );

        let stats = pump.stats();
        assert_eq!(stats.t2mi_packets, 1);
        assert_eq!(stats.crc_failures, 0);
    }

    /// A buffer without the sync byte is counted as malformed and ignored.
    #[test]
    fn garbage_ts_packet_counted_no_panic() {
        let garbage = [0x00u8; 188];
        let mut pump = T2miPump::new(0x0006);

        let emitted = pump.feed_ts(&garbage).count();
        assert_eq!(emitted, 0);

        let stats = pump.stats();
        assert_eq!(stats.malformed_packets, 1);
        assert_eq!(stats.ts_packets, 1);
    }

    /// Packets on a PID other than the configured one are counted but skipped.
    #[test]
    fn wrong_pid_ts_packet_ignored() {
        let t2mi = make_t2mi_packet(0x00, &[0x00u8, 0x00, 0x00]);
        let ts = ts_packet(0x0100, &t2mi, true, 0);

        let mut pump = T2miPump::new(0x0006);
        let emitted = pump.feed_ts(&ts).count();
        assert_eq!(emitted, 0, "wrong-PID packet must not emit");

        assert_eq!(
            pump.stats(),
            Stats {
                ts_packets: 1,
                t2mi_packets: 0,
                crc_failures: 0,
                malformed_packets: 0,
            }
        );
    }

    /// The lazily parsed header agrees with the bytes that were fed in.
    #[test]
    fn event_header_lazy_parse_matches_packet_type() {
        let t2mi = make_t2mi_packet(0x20, &[0x00u8; 11]);
        let ts = ts_packet(0x0010, &t2mi, true, 0);

        let mut pump = T2miPump::new(0x0010);
        let events: Vec<_> = pump.feed_ts(&ts).collect();
        assert_eq!(events.len(), 1);

        let hdr = events[0].header().expect("header parse should succeed");
        assert_eq!(hdr.packet_type as u8, 0x20);
        assert_eq!(hdr.packet_count, 0x01);
    }

    /// Counters keep growing across successive feed calls.
    #[test]
    fn stats_accumulate_across_feeds() {
        let t2mi = make_t2mi_packet(0x00, &[0x00u8, 0x00, 0x00]);
        let ts = ts_packet(0x0006, &t2mi, true, 0);

        let mut pump = T2miPump::new(0x0006);
        for _ in 0..2 {
            pump.feed_ts(&ts).for_each(drop);
        }

        let stats = pump.stats();
        assert_eq!(stats.ts_packets, 2);
        assert_eq!(stats.t2mi_packets, 2);
    }
}