use bytes::Bytes;
use crate::crc;
use crate::packet::Header;
use crate::payload::AnyPayload;
use crate::ts::PacketReassembler;
const TS_SYNC_BYTE: u8 = 0x47;
const TS_PACKET_SIZE: usize = 188;
const PUSI_MASK: u8 = 0x40;
const PID_MASK_HI: u8 = 0x1F;
const ADAPTATION_FLAG: u8 = 0x20;
const PAYLOAD_FLAG: u8 = 0x10;
struct TsInfo {
pid: u16,
pusi: bool,
payload_start: usize,
}
fn parse_ts_header(buf: &[u8]) -> Option<TsInfo> {
if buf.len() < TS_PACKET_SIZE || buf[0] != TS_SYNC_BYTE {
return None;
}
let b1 = buf[1];
let b3 = buf[3];
let pusi = (b1 & PUSI_MASK) != 0;
let pid = (((b1 & PID_MASK_HI) as u16) << 8) | (buf[2] as u16);
let has_adaptation = (b3 & ADAPTATION_FLAG) != 0;
let has_payload = (b3 & PAYLOAD_FLAG) != 0;
if !has_payload {
return None;
}
let mut cursor: usize = 4;
if has_adaptation {
let af_len = buf[cursor] as usize;
cursor += 1 + af_len;
if cursor > TS_PACKET_SIZE {
return None;
}
}
Some(TsInfo {
pid,
pusi,
payload_start: cursor,
})
}
#[derive(Debug, Clone)]
pub struct T2miEvent {
bytes: Bytes,
}
impl T2miEvent {
#[must_use]
pub fn bytes(&self) -> &Bytes {
&self.bytes
}
#[must_use]
pub fn packet_type(&self) -> u8 {
self.bytes[0]
}
pub fn header(&self) -> crate::Result<Header> {
use dvb_common::Parse;
Header::parse(&self.bytes)
}
fn payload_parts(&self) -> crate::Result<(u8, &[u8])> {
let payload_bytes = Header::raw_payload_bytes(&self.bytes)?;
let packet_type = self.bytes[0];
Ok((packet_type, payload_bytes))
}
pub fn payload(&self) -> crate::Result<AnyPayload<'_>> {
let (packet_type, payload_bytes) = self.payload_parts()?;
Ok(match AnyPayload::dispatch(packet_type, payload_bytes) {
Some(result) => result?,
None => AnyPayload::Unknown {
packet_type,
body: payload_bytes,
},
})
}
pub fn payload_with(
&self,
registry: &crate::payload::PayloadRegistry,
) -> crate::Result<AnyPayload<'_>> {
let (packet_type, payload_bytes) = self.payload_parts()?;
Ok(
match AnyPayload::dispatch_with(registry, packet_type, payload_bytes) {
Some(result) => result?,
None => AnyPayload::Unknown {
packet_type,
body: payload_bytes,
},
},
)
}
}
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[non_exhaustive]
pub struct Stats {
pub ts_packets: u64,
pub t2mi_packets: u64,
pub crc_failures: u64,
pub malformed_packets: u64,
}
pub struct T2miPump {
mode: PumpMode,
reasm: PacketReassembler,
stats: Stats,
scratch: Vec<T2miEvent>,
raw_started: bool,
}
enum PumpMode {
Ts { pid: u16 },
Raw,
}
impl T2miPump {
#[must_use]
pub fn new(pid: u16) -> Self {
Self {
mode: PumpMode::Ts { pid },
reasm: PacketReassembler::new(),
stats: Stats::default(),
scratch: Vec::new(),
raw_started: false,
}
}
#[must_use]
pub fn raw() -> Self {
Self {
mode: PumpMode::Raw,
reasm: PacketReassembler::new(),
stats: Stats::default(),
scratch: Vec::new(),
raw_started: false,
}
}
#[must_use]
pub fn stats(&self) -> Stats {
self.stats
}
pub fn feed_ts(&mut self, packet: &[u8]) -> impl Iterator<Item = T2miEvent> + '_ {
self.scratch.clear();
match self.mode {
PumpMode::Raw => {
self.stats.malformed_packets += 1;
}
PumpMode::Ts { pid: filter_pid } => {
self.stats.ts_packets += 1;
match parse_ts_header(packet) {
None => {
self.stats.malformed_packets += 1;
}
Some(info) => {
if info.pid == filter_pid {
let payload = &packet[info.payload_start..TS_PACKET_SIZE];
self.reasm.feed(payload, info.pusi);
Self::drain_reasm_into(
&mut self.reasm,
&mut self.stats,
&mut self.scratch,
);
}
}
}
}
}
self.scratch.drain(..)
}
pub fn feed_raw(&mut self, data: &[u8]) -> impl Iterator<Item = T2miEvent> + '_ {
self.scratch.clear();
match self.mode {
PumpMode::Ts { .. } => {
self.stats.malformed_packets += 1;
}
PumpMode::Raw => {
if !self.raw_started {
let mut buf = Vec::with_capacity(1 + data.len());
buf.push(0x00); buf.extend_from_slice(data);
self.reasm.feed(&buf, true);
self.raw_started = true;
} else {
self.reasm.feed(data, false);
}
Self::drain_reasm_into(&mut self.reasm, &mut self.stats, &mut self.scratch);
}
}
self.scratch.drain(..)
}
fn drain_reasm_into(
reasm: &mut PacketReassembler,
stats: &mut Stats,
scratch: &mut Vec<T2miEvent>,
) {
for raw in reasm.drain_packets() {
stats.t2mi_packets += 1;
match crc::validate_crc(&raw) {
Ok(()) => scratch.push(T2miEvent { bytes: raw }),
Err(_) => stats.crc_failures += 1,
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use dvb_common::crc32_mpeg2;
fn make_t2mi_packet(packet_type: u8, payload: &[u8]) -> Vec<u8> {
let payload_len_bits = (payload.len() * 8) as u16;
let mut pkt = Vec::with_capacity(6 + payload.len() + 4);
pkt.push(packet_type);
pkt.push(0x01); pkt.push(0x00); pkt.push(0x00); pkt.extend_from_slice(&payload_len_bits.to_be_bytes());
pkt.extend_from_slice(payload);
let crc = crc32_mpeg2::compute(&pkt);
pkt.extend_from_slice(&crc.to_be_bytes());
pkt
}
fn ts_packet(pid: u16, t2mi_data: &[u8], pusi: bool, pointer_field: u8) -> [u8; 188] {
let mut pkt = [0xFFu8; 188];
pkt[0] = TS_SYNC_BYTE;
pkt[1] = if pusi { PUSI_MASK } else { 0 };
pkt[1] |= ((pid >> 8) as u8) & PID_MASK_HI;
pkt[2] = (pid & 0xFF) as u8;
pkt[3] = PAYLOAD_FLAG; if pusi {
pkt[4] = pointer_field;
let start = 5 + pointer_field as usize;
assert!(
start + t2mi_data.len() <= 188,
"T2-MI data too large for one TS packet"
);
pkt[start..start + t2mi_data.len()].copy_from_slice(t2mi_data);
} else {
let start = 4;
assert!(
start + t2mi_data.len() <= 188,
"T2-MI data too large for one TS packet"
);
pkt[start..start + t2mi_data.len()].copy_from_slice(t2mi_data);
}
pkt
}
#[test]
fn ts_packet_emits_one_event_with_typed_payload() {
let bbframe_payload = [0x01u8, 0x02, 0x00];
let t2mi = make_t2mi_packet(0x00, &bbframe_payload);
let pkt = ts_packet(0x0006, &t2mi, true, 0);
let mut pump = T2miPump::new(0x0006);
let events: Vec<_> = pump.feed_ts(&pkt).collect();
assert_eq!(events.len(), 1, "expected exactly one event");
assert_eq!(events[0].packet_type(), 0x00);
let payload = events[0].payload().expect("payload parse should succeed");
assert!(
matches!(payload, AnyPayload::Bbframe(_)),
"expected Bbframe, got {payload:?}"
);
let stats = pump.stats();
assert_eq!(stats.ts_packets, 1);
assert_eq!(stats.t2mi_packets, 1);
assert_eq!(stats.crc_failures, 0);
assert_eq!(stats.malformed_packets, 0);
}
#[test]
fn corrupted_crc_drops_packet_and_counts() {
let payload = [0x00u8, 0x00, 0x00]; let mut t2mi = make_t2mi_packet(0x00, &payload);
*t2mi.last_mut().unwrap() ^= 0xFF;
let pkt = ts_packet(0x0006, &t2mi, true, 0);
let mut pump = T2miPump::new(0x0006);
let events: Vec<_> = pump.feed_ts(&pkt).collect();
assert_eq!(events.len(), 0, "corrupted packet must not emit");
let stats = pump.stats();
assert_eq!(stats.crc_failures, 1);
assert_eq!(stats.t2mi_packets, 1); }
#[test]
fn feed_raw_split_across_two_calls_emits_one_event() {
let ts_payload = [0x00u8; 11];
let t2mi = make_t2mi_packet(0x20, &ts_payload);
let split = 6;
let first = &t2mi[..split];
let second = &t2mi[split..];
let mut pump = T2miPump::raw();
let ev1: Vec<_> = pump.feed_raw(first).collect();
assert_eq!(ev1.len(), 0, "no complete packet yet after first chunk");
let ev2: Vec<_> = pump.feed_raw(second).collect();
assert_eq!(
ev2.len(),
1,
"one event after second chunk completes the packet"
);
let stats = pump.stats();
assert_eq!(stats.t2mi_packets, 1);
assert_eq!(stats.crc_failures, 0);
}
#[test]
fn garbage_ts_packet_counted_no_panic() {
let mut pump = T2miPump::new(0x0006);
let garbage = [0x00u8; 188]; let events: Vec<_> = pump.feed_ts(&garbage).collect();
assert_eq!(events.len(), 0);
assert_eq!(pump.stats().malformed_packets, 1);
assert_eq!(pump.stats().ts_packets, 1);
}
#[test]
fn wrong_pid_ts_packet_ignored() {
let payload = [0x00u8, 0x00, 0x00];
let t2mi = make_t2mi_packet(0x00, &payload);
let pkt = ts_packet(0x0100, &t2mi, true, 0);
let mut pump = T2miPump::new(0x0006);
let events: Vec<_> = pump.feed_ts(&pkt).collect();
assert_eq!(events.len(), 0, "wrong-PID packet must not emit");
let stats = pump.stats();
assert_eq!(stats.ts_packets, 1);
assert_eq!(stats.t2mi_packets, 0);
assert_eq!(stats.crc_failures, 0);
assert_eq!(stats.malformed_packets, 0);
}
#[test]
fn event_header_lazy_parse_matches_packet_type() {
let payload = [0x00u8; 11]; let t2mi = make_t2mi_packet(0x20, &payload);
let pkt = ts_packet(0x0010, &t2mi, true, 0);
let mut pump = T2miPump::new(0x0010);
let events: Vec<_> = pump.feed_ts(&pkt).collect();
assert_eq!(events.len(), 1);
let hdr = events[0].header().expect("header parse should succeed");
assert_eq!(hdr.packet_type as u8, 0x20);
assert_eq!(hdr.packet_count, 0x01);
}
#[test]
fn stats_accumulate_across_feeds() {
let payload = [0x00u8, 0x00, 0x00];
let t2mi = make_t2mi_packet(0x00, &payload);
let pkt = ts_packet(0x0006, &t2mi, true, 0);
let mut pump = T2miPump::new(0x0006);
pump.feed_ts(&pkt).for_each(drop);
pump.feed_ts(&pkt).for_each(drop);
let stats = pump.stats();
assert_eq!(stats.ts_packets, 2);
assert_eq!(stats.t2mi_packets, 2);
}
#[test]
fn payload_with_dispatches_custom_registered_type() {
use crate::payload::registry::PayloadRegistry;
use crate::traits::PayloadDef;
use dvb_common::Parse;
#[derive(Debug)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
struct TestPrivatePayload {
val: u8,
}
impl<'a> Parse<'a> for TestPrivatePayload {
type Error = crate::Error;
fn parse(bytes: &'a [u8]) -> crate::Result<Self> {
if bytes.is_empty() {
return Err(crate::Error::BufferTooShort {
need: 1,
have: 0,
what: "TestPrivatePayload",
});
}
Ok(Self { val: bytes[0] })
}
}
impl<'a> PayloadDef<'a> for TestPrivatePayload {
const PACKET_TYPE: u8 = 0x00;
const NAME: &'static str = "TEST_PRIVATE";
}
let mut reg = PayloadRegistry::new();
reg.register::<TestPrivatePayload>();
let private_payload = [0x42u8, 0x02, 0x00];
let t2mi = make_t2mi_packet(0x00, &private_payload);
let pkt = ts_packet(0x0006, &t2mi, true, 0);
let mut pump = T2miPump::new(0x0006);
let events: Vec<_> = pump.feed_ts(&pkt).collect();
assert_eq!(events.len(), 1, "expected one event");
let result = events[0].payload_with(®).expect("payload_with parse");
match result {
AnyPayload::Other {
packet_type,
ref value,
} => {
assert_eq!(packet_type, 0x00);
let downcast = value.downcast_ref::<TestPrivatePayload>().unwrap();
assert_eq!(downcast.val, 0x42);
}
other => panic!("expected Other, got {other:?}"),
}
let built_in = events[0].payload().expect("payload parse");
assert!(
matches!(built_in, AnyPayload::Bbframe(_)),
"expected Bbframe via built-in dispatch, got {built_in:?}"
);
}
#[test]
fn payload_with_dispatches_genuinely_private_packet_type() {
use crate::payload::registry::PayloadRegistry;
use crate::traits::PayloadDef;
use dvb_common::Parse;
#[derive(Debug)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
struct PrivatePayload {
val: u8,
}
impl<'a> Parse<'a> for PrivatePayload {
type Error = crate::Error;
fn parse(bytes: &'a [u8]) -> crate::Result<Self> {
if bytes.is_empty() {
return Err(crate::Error::BufferTooShort {
need: 1,
have: 0,
what: "PrivatePayload",
});
}
Ok(Self { val: bytes[0] })
}
}
impl<'a> PayloadDef<'a> for PrivatePayload {
const PACKET_TYPE: u8 = 0x42;
const NAME: &'static str = "PRIVATE_0X42";
}
let mut reg = PayloadRegistry::new();
reg.register::<PrivatePayload>();
let private_body = [0xABu8];
let t2mi = make_t2mi_packet(0x42, &private_body);
let pkt = ts_packet(0x0006, &t2mi, true, 0);
let mut pump = T2miPump::new(0x0006);
let events: Vec<_> = pump.feed_ts(&pkt).collect();
assert_eq!(events.len(), 1, "expected one event");
let result = events[0].payload_with(®).expect("payload_with parse");
match result {
AnyPayload::Other {
packet_type,
ref value,
} => {
assert_eq!(packet_type, 0x42);
let downcast = value.downcast_ref::<PrivatePayload>().unwrap();
assert_eq!(downcast.val, 0xAB);
}
other => panic!("expected Other, got {other:?}"),
}
let no_reg = events[0].payload().expect("payload without registry");
match no_reg {
AnyPayload::Unknown {
packet_type,
body: _,
} => {
assert_eq!(packet_type, 0x42);
}
other => panic!("expected Unknown, got {other:?}"),
}
}
}